Skip to main content

nautilus_deribit/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for Deribit.
17//!
18//! The handler runs in a dedicated Tokio task as the I/O boundary between the client
19//! orchestrator and the network layer. It exclusively owns the `WebSocketClient` and
20//! processes commands from the client via an unbounded channel.
21
22use std::{
23    collections::VecDeque,
24    sync::{
25        Arc, Mutex,
26        atomic::{AtomicBool, AtomicU64, Ordering},
27    },
28};
29
30use ahash::AHashMap;
31use nautilus_common::cache::fifo::FifoCache;
32use nautilus_core::{AtomicSet, AtomicTime, UUID4, UnixNanos, time::get_atomic_clock_realtime};
33use nautilus_model::{
34    data::{Bar, Data, InstrumentStatus},
35    enums::MarketStatusAction,
36    events::{AccountState, OrderCancelRejected, OrderModifyRejected, OrderRejected},
37    identifiers::{
38        AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TraderId, VenueOrderId,
39    },
40    instruments::{Instrument, InstrumentAny},
41};
42use nautilus_network::{
43    RECONNECTED,
44    retry::{RetryManager, create_websocket_retry_manager},
45    websocket::{AuthTracker, SubscriptionState, WebSocketClient},
46};
47use tokio_tungstenite::tungstenite::Message;
48use ustr::Ustr;
49
50use super::{
51    enums::{DeribitBookMsgType, DeribitHeartbeatType, DeribitWsChannel},
52    error::DeribitWsError,
53    messages::{
54        DeribitAuthResult, DeribitBookMsg, DeribitCancelAllByInstrumentParams, DeribitCancelParams,
55        DeribitChartMsg, DeribitEditParams, DeribitHeartbeatParams, DeribitInstrumentStateMsg,
56        DeribitJsonRpcRequest, DeribitOrderMsg, DeribitOrderParams, DeribitOrderResponse,
57        DeribitPerpetualMsg, DeribitPortfolioMsg, DeribitQuoteMsg, DeribitSubscribeParams,
58        DeribitTickerMsg, DeribitTradeMsg, DeribitUserTradeMsg, DeribitWsMessage,
59        NautilusWsMessage, parse_raw_message,
60    },
61    parse::{
62        OrderEventType, determine_order_event_type, parse_book_msg, parse_chart_msg,
63        parse_order_accepted, parse_order_canceled, parse_order_expired, parse_order_updated,
64        parse_perpetual_to_funding_rate, parse_quote_msg, parse_ticker_to_index_price,
65        parse_ticker_to_mark_price, parse_ticker_to_option_greeks, parse_trades_data,
66        parse_user_order_msg, parse_user_trade_msg, resolution_to_bar_type,
67    },
68};
69use crate::common::{
70    consts::{DERIBIT_POST_ONLY_ERROR_CODE, DERIBIT_RATE_LIMIT_KEY_ORDER, DERIBIT_VENUE},
71    enums::DeribitInstrumentState,
72    parse::parse_portfolio_to_account_state,
73};
74
/// Type of pending request for request ID correlation.
///
/// The handler keeps one value of this enum per in-flight JSON-RPC request in
/// its `pending_requests` map, keyed by the numeric request ID. Order-related
/// variants carry the identifiers supplied with the originating command.
#[derive(Debug, Clone)]
pub enum PendingRequestType {
    /// Authentication request (`public/auth`).
    Authenticate,
    /// Subscribe request with requested channels.
    Subscribe { channels: Vec<String> },
    /// Unsubscribe request with requested channels.
    Unsubscribe { channels: Vec<String> },
    /// Set heartbeat request (`public/set_heartbeat`).
    SetHeartbeat,
    /// Test/ping request (`public/test`), sent in response to a heartbeat test request.
    Test,
    /// Buy order request (`private/buy`).
    Buy {
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
    },
    /// Sell order request (`private/sell`).
    Sell {
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
    },
    /// Edit order request (`private/edit`).
    Edit {
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
    },
    /// Cancel order request (`private/cancel`).
    Cancel {
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
    },
    /// Cancel all orders by instrument request (`private/cancel_all_by_instrument`).
    CancelAllByInstrument { instrument_id: InstrumentId },
    /// Get order state request (`private/get_order_state`).
    GetOrderState {
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
    },
}
126
/// Commands sent from the client to the handler.
///
/// The handler receives these over its unbounded command channel and
/// dispatches each one in `process_command`.
#[allow(missing_debug_implementations)]
pub enum HandlerCommand {
    /// Set the active WebSocket client.
    SetClient(WebSocketClient),
    /// Disconnect the WebSocket.
    ///
    /// The handler takes its current client (if any) and disconnects it.
    Disconnect,
    /// Authenticate with credentials.
    Authenticate {
        /// Serialized auth params (DeribitAuthParams or DeribitRefreshTokenParams).
        auth_params: serde_json::Value,
    },
    /// Enable heartbeat with interval (logged by the handler as seconds).
    SetHeartbeat { interval: u64 },
    /// Initialize the instrument cache.
    ///
    /// Replaces the whole cache: existing entries are cleared before the new
    /// instruments are inserted, keyed by raw symbol.
    InitializeInstruments(Vec<InstrumentAny>),
    /// Update a single instrument in the cache.
    UpdateInstrument(Box<InstrumentAny>),
    /// Subscribe to channels.
    Subscribe { channels: Vec<String> },
    /// Unsubscribe from channels.
    ///
    /// Also removes these channels from the handler's pending book resync list.
    Unsubscribe { channels: Vec<String> },
    /// Submit a buy order.
    Buy {
        params: DeribitOrderParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
    },
    /// Submit a sell order.
    Sell {
        params: DeribitOrderParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
    },
    /// Edit an existing order.
    Edit {
        params: DeribitEditParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
    },
    /// Cancel an existing order.
    Cancel {
        params: DeribitCancelParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
    },
    /// Cancel all orders by instrument.
    CancelAllByInstrument {
        params: DeribitCancelAllByInstrumentParams,
        instrument_id: InstrumentId,
    },
    /// Get order state.
    GetOrderState {
        /// Order ID sent as the `order_id` parameter of the request.
        order_id: String,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
    },
}
195
/// Context for an order submitted via this handler.
///
/// Stores the original trader/strategy/client IDs from the buy/sell command
/// so they can be used when processing user.orders subscription updates.
#[derive(Debug, Clone)]
pub struct OrderContext {
    /// Client order ID from the originating command.
    pub client_order_id: ClientOrderId,
    /// Trader ID from the originating command.
    pub trader_id: TraderId,
    /// Strategy ID from the originating command.
    pub strategy_id: StrategyId,
    /// Instrument ID from the originating command.
    pub instrument_id: InstrumentId,
}
207
/// Deribit WebSocket feed handler.
///
/// Runs in a dedicated Tokio task, processing commands and raw WebSocket messages.
#[allow(missing_debug_implementations)]
pub struct DeribitWsFeedHandler {
    /// Clock read by `ts_init` to timestamp events.
    clock: &'static AtomicTime,
    // NOTE(review): presumably a shared stop/shutdown flag — confirm against the run loop.
    signal: Arc<AtomicBool>,
    /// Active WebSocket client; `None` until `SetClient` arrives and again after `Disconnect`.
    inner: Option<WebSocketClient>,
    /// Incoming commands from the client.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Incoming raw WebSocket messages.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Outgoing parsed messages.
    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    /// Authentication tracker; marked failed when the auth request cannot be serialized or sent.
    auth_tracker: AuthTracker,
    subscriptions_state: SubscriptionState,
    /// Retry manager wrapping outbound sends in `send_with_retry`.
    retry_manager: RetryManager<DeribitWsError>,
    /// Instruments keyed by raw symbol.
    instruments_cache: AHashMap<Ustr, InstrumentAny>,
    option_greeks_subs: Arc<AtomicSet<InstrumentId>>,
    mark_price_subs: Arc<AtomicSet<InstrumentId>>,
    index_price_subs: Arc<AtomicSet<InstrumentId>>,
    /// Source of JSON-RPC request IDs; starts at 1 and is incremented per request.
    request_id_counter: AtomicU64,
    /// In-flight requests keyed by request ID.
    pending_requests: AHashMap<u64, PendingRequestType>,
    /// Account ID, if known.
    account_id: Option<AccountId>,
    /// Order contexts keyed by venue order ID.
    order_contexts: AHashMap<VenueOrderId, OrderContext>,
    emitted_accepted: FifoCache<VenueOrderId, 10_000>,
    terminal_orders: FifoCache<ClientOrderId, 10_000>,
    pending_bars: AHashMap<String, Bar>,
    bars_timestamp_on_close: bool,
    last_account_states: AHashMap<String, AccountState>,
    book_sequence: AHashMap<Ustr, u64>,
    /// Channels awaiting a book resync; a user unsubscribe removes its channels from this list.
    pending_book_resync: Vec<String>,
    pending_outgoing: VecDeque<NautilusWsMessage>,
    subscribe_errors: Arc<Mutex<Vec<String>>>,
}
240
241impl DeribitWsFeedHandler {
242    /// Creates a new feed handler.
243    #[expect(clippy::too_many_arguments)]
244    #[must_use]
245    pub fn new(
246        signal: Arc<AtomicBool>,
247        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
248        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
249        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
250        auth_tracker: AuthTracker,
251        subscriptions_state: SubscriptionState,
252        option_greeks_subs: Arc<AtomicSet<InstrumentId>>,
253        mark_price_subs: Arc<AtomicSet<InstrumentId>>,
254        index_price_subs: Arc<AtomicSet<InstrumentId>>,
255        account_id: Option<AccountId>,
256        bars_timestamp_on_close: bool,
257        subscribe_errors: Arc<Mutex<Vec<String>>>,
258    ) -> Self {
259        Self {
260            clock: get_atomic_clock_realtime(),
261            signal,
262            inner: None,
263            cmd_rx,
264            raw_rx,
265            out_tx,
266            auth_tracker,
267            subscriptions_state,
268            retry_manager: create_websocket_retry_manager(),
269            instruments_cache: AHashMap::new(),
270            option_greeks_subs,
271            mark_price_subs,
272            index_price_subs,
273            request_id_counter: AtomicU64::new(1),
274            pending_requests: AHashMap::new(),
275            account_id,
276            order_contexts: AHashMap::new(),
277            emitted_accepted: FifoCache::new(),
278            terminal_orders: FifoCache::new(),
279            pending_bars: AHashMap::new(),
280            bars_timestamp_on_close,
281            last_account_states: AHashMap::new(),
282            book_sequence: AHashMap::new(),
283            pending_book_resync: Vec::new(),
284            pending_outgoing: VecDeque::new(),
285            subscribe_errors,
286        }
287    }
288
289    /// Sets the account ID for order/fill reports.
290    pub fn set_account_id(&mut self, account_id: AccountId) {
291        self.account_id = Some(account_id);
292    }
293
294    /// Returns the account ID.
295    #[must_use]
296    pub fn account_id(&self) -> Option<AccountId> {
297        self.account_id
298    }
299
300    fn clear_state(&mut self) {
301        let pending_count = self.pending_requests.len();
302        let emitted_count = self.emitted_accepted.len();
303        let bars_count = self.pending_bars.len();
304        let account_count = self.last_account_states.len();
305        let book_count = self.book_sequence.len();
306        let outgoing_count = self.pending_outgoing.len();
307
308        self.pending_requests.clear();
309        self.emitted_accepted.clear();
310        self.pending_bars.clear();
311        self.last_account_states.clear();
312        self.book_sequence.clear();
313        self.pending_book_resync.clear();
314        self.pending_outgoing.clear();
315
316        log::debug!(
317            "Reset state: pending_requests={pending_count}, emitted_accepted={emitted_count}, \
318            pending_bars={bars_count}, account_states={account_count}, book_sequence={book_count}, \
319            pending_outgoing={outgoing_count}"
320        );
321    }
322
323    /// Generates a unique request ID.
324    fn next_request_id(&self) -> u64 {
325        self.request_id_counter.fetch_add(1, Ordering::Relaxed)
326    }
327
328    /// Returns the current timestamp.
329    fn ts_init(&self) -> UnixNanos {
330        self.clock.get_time_ns()
331    }
332
333    /// Checks if there's a pending buy/sell request for the given client_order_id.
334    ///
335    /// This is used to avoid emitting duplicate OrderAccepted events from the
336    /// user.orders subscription when the response path will also emit an event.
337    fn is_pending_order(&self, client_order_id: &ClientOrderId) -> bool {
338        self.pending_requests.values().any(|req| match req {
339            PendingRequestType::Buy {
340                client_order_id: id,
341                ..
342            }
343            | PendingRequestType::Sell {
344                client_order_id: id,
345                ..
346            } => id == client_order_id,
347            _ => false,
348        })
349    }
350
351    /// Gets the OrderContext from a pending buy/sell request by client_order_id.
352    ///
353    /// Returns None if no pending request found.
354    fn get_pending_order_context(&self, client_order_id: &ClientOrderId) -> Option<OrderContext> {
355        for req in self.pending_requests.values() {
356            match req {
357                PendingRequestType::Buy {
358                    client_order_id: id,
359                    trader_id,
360                    strategy_id,
361                    instrument_id,
362                }
363                | PendingRequestType::Sell {
364                    client_order_id: id,
365                    trader_id,
366                    strategy_id,
367                    instrument_id,
368                } if id == client_order_id => {
369                    return Some(OrderContext {
370                        client_order_id: *id,
371                        trader_id: *trader_id,
372                        strategy_id: *strategy_id,
373                        instrument_id: *instrument_id,
374                    });
375                }
376                _ => {}
377            }
378        }
379        None
380    }
381
382    async fn send_tracked_request(
383        &mut self,
384        request_id: u64,
385        payload: Result<String, DeribitWsError>,
386        rate_limit_keys: Option<&[Ustr]>,
387    ) -> Result<(), DeribitWsError> {
388        let payload = match payload {
389            Ok(p) => p,
390            Err(e) => {
391                self.pending_requests.remove(&request_id);
392                return Err(e);
393            }
394        };
395        let result = self.send_with_retry(payload, rate_limit_keys).await;
396        if result.is_err() {
397            self.pending_requests.remove(&request_id);
398        }
399        result
400    }
401
402    /// Sends a message over the WebSocket with retry logic.
403    async fn send_with_retry(
404        &self,
405        payload: String,
406        rate_limit_keys: Option<&[Ustr]>,
407    ) -> Result<(), DeribitWsError> {
408        if let Some(client) = &self.inner {
409            let keys_owned: Option<Vec<Ustr>> = rate_limit_keys.map(|k| k.to_vec());
410            self.retry_manager
411                .execute_with_retry(
412                    "websocket_send",
413                    || {
414                        let payload = payload.clone();
415                        let keys = keys_owned.clone();
416                        async move {
417                            client
418                                .send_text(payload, keys.as_deref())
419                                .await
420                                .map_err(|e| DeribitWsError::Send(e.to_string()))
421                        }
422                    },
423                    |e| matches!(e, DeribitWsError::Send(_)),
424                    DeribitWsError::Timeout,
425                )
426                .await
427        } else {
428            Err(DeribitWsError::NotConnected)
429        }
430    }
431
432    /// Handles a subscribe command.
433    ///
434    /// Note: The client has already called `mark_subscribe` before sending this command.
435    async fn handle_subscribe(&mut self, channels: Vec<String>) -> Result<(), DeribitWsError> {
436        let request_id = self.next_request_id();
437
438        // Track this request for response correlation
439        self.pending_requests.insert(
440            request_id,
441            PendingRequestType::Subscribe {
442                channels: channels.clone(),
443            },
444        );
445
446        // Deribit requires private/subscribe for authenticated channels
447        let method = if channels
448            .iter()
449            .any(|ch| DeribitWsChannel::requires_auth(ch))
450        {
451            "private/subscribe"
452        } else {
453            "public/subscribe"
454        };
455
456        let request = DeribitJsonRpcRequest::new(
457            request_id,
458            method,
459            DeribitSubscribeParams {
460                channels: channels.clone(),
461            },
462        );
463
464        let payload =
465            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()));
466
467        log::debug!("Subscribing to channels: request_id={request_id}, channels={channels:?}");
468        self.send_tracked_request(request_id, payload, None).await
469    }
470
471    /// Handles an unsubscribe command.
472    async fn handle_unsubscribe(&mut self, channels: Vec<String>) -> Result<(), DeribitWsError> {
473        let request_id = self.next_request_id();
474
475        // Track this request for response correlation
476        self.pending_requests.insert(
477            request_id,
478            PendingRequestType::Unsubscribe {
479                channels: channels.clone(),
480            },
481        );
482
483        let method = if channels
484            .iter()
485            .any(|ch| DeribitWsChannel::requires_auth(ch))
486        {
487            "private/unsubscribe"
488        } else {
489            "public/unsubscribe"
490        };
491
492        let request = DeribitJsonRpcRequest::new(
493            request_id,
494            method,
495            DeribitSubscribeParams {
496                channels: channels.clone(),
497            },
498        );
499
500        let payload =
501            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()));
502
503        log::debug!("Unsubscribing from channels: request_id={request_id}, channels={channels:?}");
504        self.send_tracked_request(request_id, payload, None).await
505    }
506
507    /// Handles enabling heartbeat.
508    async fn handle_set_heartbeat(&mut self, interval: u64) -> Result<(), DeribitWsError> {
509        let request_id = self.next_request_id();
510
511        // Track this request for response correlation
512        self.pending_requests
513            .insert(request_id, PendingRequestType::SetHeartbeat);
514
515        let request = DeribitJsonRpcRequest::new(
516            request_id,
517            "public/set_heartbeat",
518            DeribitHeartbeatParams { interval },
519        );
520
521        let payload =
522            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()));
523
524        log::debug!(
525            "Enabling heartbeat with interval: request_id={request_id}, interval={interval} seconds"
526        );
527        self.send_tracked_request(request_id, payload, None).await
528    }
529
530    /// Responds to a heartbeat test_request.
531    async fn handle_heartbeat_test_request(&mut self) -> Result<(), DeribitWsError> {
532        let request_id = self.next_request_id();
533
534        // Track this request for response correlation
535        self.pending_requests
536            .insert(request_id, PendingRequestType::Test);
537
538        let request = DeribitJsonRpcRequest::new(request_id, "public/test", serde_json::json!({}));
539
540        let payload =
541            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()));
542
543        log::trace!("Responding to heartbeat test_request: request_id={request_id}");
544        self.send_tracked_request(request_id, payload, None).await
545    }
546
547    /// Handles a buy order command.
548    async fn handle_buy(
549        &mut self,
550        params: DeribitOrderParams,
551        client_order_id: ClientOrderId,
552        trader_id: TraderId,
553        strategy_id: StrategyId,
554        instrument_id: InstrumentId,
555    ) -> Result<(), DeribitWsError> {
556        let request_id = self.next_request_id();
557
558        self.pending_requests.insert(
559            request_id,
560            PendingRequestType::Buy {
561                client_order_id,
562                trader_id,
563                strategy_id,
564                instrument_id,
565            },
566        );
567
568        let request = DeribitJsonRpcRequest::new(request_id, "private/buy", params);
569
570        let payload =
571            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()));
572
573        log::debug!("Sending buy order: request_id={request_id}");
574        self.send_tracked_request(
575            request_id,
576            payload,
577            Some(DERIBIT_RATE_LIMIT_KEY_ORDER.as_slice()),
578        )
579        .await
580    }
581
582    /// Handles a sell order command.
583    async fn handle_sell(
584        &mut self,
585        params: DeribitOrderParams,
586        client_order_id: ClientOrderId,
587        trader_id: TraderId,
588        strategy_id: StrategyId,
589        instrument_id: InstrumentId,
590    ) -> Result<(), DeribitWsError> {
591        let request_id = self.next_request_id();
592
593        self.pending_requests.insert(
594            request_id,
595            PendingRequestType::Sell {
596                client_order_id,
597                trader_id,
598                strategy_id,
599                instrument_id,
600            },
601        );
602
603        let request = DeribitJsonRpcRequest::new(request_id, "private/sell", params);
604
605        let payload =
606            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()));
607
608        log::debug!("Sending sell order: request_id={request_id}");
609        self.send_tracked_request(
610            request_id,
611            payload,
612            Some(DERIBIT_RATE_LIMIT_KEY_ORDER.as_slice()),
613        )
614        .await
615    }
616
617    /// Handles an edit order command.
618    async fn handle_edit(
619        &mut self,
620        params: DeribitEditParams,
621        client_order_id: ClientOrderId,
622        trader_id: TraderId,
623        strategy_id: StrategyId,
624        instrument_id: InstrumentId,
625    ) -> Result<(), DeribitWsError> {
626        let request_id = self.next_request_id();
627        let order_id = params.order_id.clone();
628
629        self.pending_requests.insert(
630            request_id,
631            PendingRequestType::Edit {
632                client_order_id,
633                trader_id,
634                strategy_id,
635                instrument_id,
636            },
637        );
638
639        let request = DeribitJsonRpcRequest::new(request_id, "private/edit", params);
640
641        let payload =
642            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()));
643
644        log::debug!("Sending edit order: request_id={request_id}, order_id={order_id}");
645        self.send_tracked_request(
646            request_id,
647            payload,
648            Some(DERIBIT_RATE_LIMIT_KEY_ORDER.as_slice()),
649        )
650        .await
651    }
652
653    /// Handles a cancel order command.
654    async fn handle_cancel(
655        &mut self,
656        params: DeribitCancelParams,
657        client_order_id: ClientOrderId,
658        trader_id: TraderId,
659        strategy_id: StrategyId,
660        instrument_id: InstrumentId,
661    ) -> Result<(), DeribitWsError> {
662        let request_id = self.next_request_id();
663        let order_id = params.order_id.clone();
664
665        self.pending_requests.insert(
666            request_id,
667            PendingRequestType::Cancel {
668                client_order_id,
669                trader_id,
670                strategy_id,
671                instrument_id,
672            },
673        );
674
675        let request = DeribitJsonRpcRequest::new(request_id, "private/cancel", params);
676
677        let payload =
678            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()));
679
680        log::debug!("Sending cancel order: request_id={request_id}, order_id={order_id}");
681        self.send_tracked_request(
682            request_id,
683            payload,
684            Some(DERIBIT_RATE_LIMIT_KEY_ORDER.as_slice()),
685        )
686        .await
687    }
688
689    /// Handles cancel all orders by instrument command.
690    async fn handle_cancel_all_by_instrument(
691        &mut self,
692        params: DeribitCancelAllByInstrumentParams,
693        instrument_id: InstrumentId,
694    ) -> Result<(), DeribitWsError> {
695        let request_id = self.next_request_id();
696        let instrument_name = params.instrument_name.clone();
697
698        // Track this request for response correlation
699        self.pending_requests.insert(
700            request_id,
701            PendingRequestType::CancelAllByInstrument { instrument_id },
702        );
703
704        let request =
705            DeribitJsonRpcRequest::new(request_id, "private/cancel_all_by_instrument", params);
706
707        let payload =
708            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()));
709
710        log::debug!(
711            "Sending cancel_all_by_instrument: request_id={request_id}, instrument={instrument_name}"
712        );
713        self.send_tracked_request(
714            request_id,
715            payload,
716            Some(DERIBIT_RATE_LIMIT_KEY_ORDER.as_slice()),
717        )
718        .await
719    }
720
721    /// Handles get order state command.
722    async fn handle_get_order_state(
723        &mut self,
724        order_id: String,
725        client_order_id: ClientOrderId,
726        trader_id: TraderId,
727        strategy_id: StrategyId,
728        instrument_id: InstrumentId,
729    ) -> Result<(), DeribitWsError> {
730        let request_id = self.next_request_id();
731
732        // Track this request for response correlation
733        self.pending_requests.insert(
734            request_id,
735            PendingRequestType::GetOrderState {
736                client_order_id,
737                trader_id,
738                strategy_id,
739                instrument_id,
740            },
741        );
742
743        let params = serde_json::json!({
744            "order_id": order_id
745        });
746
747        let request = DeribitJsonRpcRequest::new(request_id, "private/get_order_state", params);
748
749        let payload =
750            serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()));
751
752        log::debug!("Sending get_order_state: request_id={request_id}, order_id={order_id}");
753        self.send_tracked_request(
754            request_id,
755            payload,
756            Some(DERIBIT_RATE_LIMIT_KEY_ORDER.as_slice()),
757        )
758        .await
759    }
760
761    /// Processes a command from the client.
762    async fn process_command(&mut self, cmd: HandlerCommand) {
763        match cmd {
764            HandlerCommand::SetClient(client) => {
765                log::debug!("Setting WebSocket client");
766                self.inner = Some(client);
767            }
768            HandlerCommand::Disconnect => {
769                log::debug!("Disconnecting WebSocket");
770
771                if let Some(client) = self.inner.take() {
772                    client.disconnect().await;
773                }
774            }
775            HandlerCommand::Authenticate { auth_params } => {
776                let request_id = self.next_request_id();
777                log::debug!("Authenticating: request_id={request_id}");
778
779                // Track this request for response correlation
780                self.pending_requests
781                    .insert(request_id, PendingRequestType::Authenticate);
782
783                let request = DeribitJsonRpcRequest::new(request_id, "public/auth", auth_params);
784                match serde_json::to_string(&request) {
785                    Ok(payload) => {
786                        if let Err(e) = self.send_with_retry(payload, None).await {
787                            self.pending_requests.remove(&request_id);
788                            log::error!("Authentication send failed: {e}");
789                            self.auth_tracker.fail(format!("Send failed: {e}"));
790                        }
791                    }
792                    Err(e) => {
793                        self.pending_requests.remove(&request_id);
794                        log::error!("Failed to serialize auth request: {e}");
795                        self.auth_tracker.fail(format!("Serialization failed: {e}"));
796                    }
797                }
798            }
799            HandlerCommand::SetHeartbeat { interval } => {
800                if let Err(e) = self.handle_set_heartbeat(interval).await {
801                    log::error!("Set heartbeat failed: {e}");
802                }
803            }
804            HandlerCommand::InitializeInstruments(instruments) => {
805                log::info!("Handler received {} instruments", instruments.len());
806                self.instruments_cache.clear();
807                for inst in instruments {
808                    self.instruments_cache
809                        .insert(inst.raw_symbol().inner(), inst);
810                }
811            }
812            HandlerCommand::UpdateInstrument(instrument) => {
813                log::trace!("Updating instrument: {}", instrument.raw_symbol());
814                self.instruments_cache
815                    .insert(instrument.raw_symbol().inner(), *instrument);
816            }
817            HandlerCommand::Subscribe { channels } => {
818                if let Err(e) = self.handle_subscribe(channels).await {
819                    log::error!("Subscribe failed: {e}");
820                }
821            }
822            HandlerCommand::Unsubscribe { channels } => {
823                // User-initiated unsubscribe cancels any pending book resync
824                // for these channels so we don't re-subscribe against user intent
825                self.pending_book_resync.retain(|ch| !channels.contains(ch));
826
827                if let Err(e) = self.handle_unsubscribe(channels).await {
828                    log::error!("Unsubscribe failed: {e}");
829                }
830            }
831            HandlerCommand::Buy {
832                params,
833                client_order_id,
834                trader_id,
835                strategy_id,
836                instrument_id,
837            } => {
838                if let Err(e) = self
839                    .handle_buy(
840                        params,
841                        client_order_id,
842                        trader_id,
843                        strategy_id,
844                        instrument_id,
845                    )
846                    .await
847                {
848                    log::error!("Buy order failed: {e}");
849                }
850            }
851            HandlerCommand::Sell {
852                params,
853                client_order_id,
854                trader_id,
855                strategy_id,
856                instrument_id,
857            } => {
858                if let Err(e) = self
859                    .handle_sell(
860                        params,
861                        client_order_id,
862                        trader_id,
863                        strategy_id,
864                        instrument_id,
865                    )
866                    .await
867                {
868                    log::error!("Sell order failed: {e}");
869                }
870            }
871            HandlerCommand::Edit {
872                params,
873                client_order_id,
874                trader_id,
875                strategy_id,
876                instrument_id,
877            } => {
878                if let Err(e) = self
879                    .handle_edit(
880                        params,
881                        client_order_id,
882                        trader_id,
883                        strategy_id,
884                        instrument_id,
885                    )
886                    .await
887                {
888                    log::error!("Edit order failed: {e}");
889                }
890            }
891            HandlerCommand::Cancel {
892                params,
893                client_order_id,
894                trader_id,
895                strategy_id,
896                instrument_id,
897            } => {
898                if let Err(e) = self
899                    .handle_cancel(
900                        params,
901                        client_order_id,
902                        trader_id,
903                        strategy_id,
904                        instrument_id,
905                    )
906                    .await
907                {
908                    log::error!("Cancel order failed: {e}");
909                }
910            }
911            HandlerCommand::CancelAllByInstrument {
912                params,
913                instrument_id,
914            } => {
915                if let Err(e) = self
916                    .handle_cancel_all_by_instrument(params, instrument_id)
917                    .await
918                {
919                    log::error!("Cancel all by instrument failed: {e}");
920                }
921            }
922            HandlerCommand::GetOrderState {
923                order_id,
924                client_order_id,
925                trader_id,
926                strategy_id,
927                instrument_id,
928            } => {
929                if let Err(e) = self
930                    .handle_get_order_state(
931                        order_id,
932                        client_order_id,
933                        trader_id,
934                        strategy_id,
935                        instrument_id,
936                    )
937                    .await
938                {
939                    log::error!("Get order state failed: {e}");
940                }
941            }
942        }
943    }
944
945    /// Processes a raw WebSocket message.
946    async fn process_raw_message(&mut self, text: &str) -> Option<NautilusWsMessage> {
947        if text == RECONNECTED {
948            log::info!("Received reconnection signal");
949
950            self.auth_tracker.invalidate();
951            self.clear_state();
952
953            return Some(NautilusWsMessage::Reconnected);
954        }
955
956        // Parse the JSON-RPC message
957        let ws_msg = match parse_raw_message(text) {
958            Ok(msg) => msg,
959            Err(e) => {
960                log::warn!("Failed to parse message: {e}");
961                return None;
962            }
963        };
964
965        let ts_init = self.ts_init();
966
967        match ws_msg {
968            DeribitWsMessage::Response(response) => {
969                // Look up the request type by ID for explicit correlation
970                if let Some(request_id) = response.id
971                    && let Some(request_type) = self.pending_requests.remove(&request_id)
972                {
973                    match request_type {
974                        PendingRequestType::Authenticate => {
975                            if let Some(error) = &response.error {
976                                let reason = format!(
977                                    "Authentication error code={}: {}",
978                                    error.code, error.message
979                                );
980                                log::error!(
981                                    "Authentication failed: code={}, message={}, request_id={}",
982                                    error.code,
983                                    error.message,
984                                    request_id
985                                );
986                                self.auth_tracker.fail(reason.clone());
987                                return Some(NautilusWsMessage::AuthenticationFailed(reason));
988                            } else if let Some(result) = &response.result {
989                                match serde_json::from_value::<DeribitAuthResult>(result.clone()) {
990                                    Ok(auth_result) => {
991                                        self.auth_tracker.succeed();
992                                        log::debug!(
993                                            "WebSocket authenticated successfully (request_id={}, scope={}, expires_in={}s)",
994                                            request_id,
995                                            auth_result.scope,
996                                            auth_result.expires_in
997                                        );
998                                        return Some(NautilusWsMessage::Authenticated(Box::new(
999                                            auth_result,
1000                                        )));
1001                                    }
1002                                    Err(e) => {
1003                                        let reason = format!("Failed to parse auth result: {e}");
1004                                        log::error!("{reason}: request_id={request_id}");
1005                                        self.auth_tracker.fail(reason.clone());
1006                                        return Some(NautilusWsMessage::AuthenticationFailed(
1007                                            reason,
1008                                        ));
1009                                    }
1010                                }
1011                            }
1012                        }
1013                        PendingRequestType::Subscribe { channels } => {
1014                            if let Some(error) = &response.error {
1015                                log::error!(
1016                                    "Subscribe failed: code={}, message={}, channels={:?}, request_id={}",
1017                                    error.code,
1018                                    error.message,
1019                                    channels,
1020                                    request_id
1021                                );
1022
1023                                if let Ok(mut errors) = self.subscribe_errors.lock() {
1024                                    errors.push(format!(
1025                                        "Subscribe rejected: code={}, message={}",
1026                                        error.code, error.message,
1027                                    ));
1028                                }
1029                            } else {
1030                                // Confirm each channel in the subscription
1031                                for ch in &channels {
1032                                    self.subscriptions_state.confirm_subscribe(ch);
1033                                    log::debug!("Subscription confirmed: {ch}");
1034                                }
1035                            }
1036                        }
1037                        PendingRequestType::Unsubscribe { channels } => {
1038                            if let Some(error) = &response.error {
1039                                log::error!(
1040                                    "Unsubscribe failed: code={}, message={}, channels={:?}, request_id={}",
1041                                    error.code,
1042                                    error.message,
1043                                    channels,
1044                                    request_id
1045                                );
1046                            } else {
1047                                for ch in &channels {
1048                                    self.subscriptions_state.confirm_unsubscribe(ch);
1049                                    log::debug!("Unsubscription confirmed: {ch}");
1050                                }
1051                            }
1052
1053                            // Resubscribe channels pending book resync (kept in
1054                            // pending_book_resync until a fresh snapshot arrives)
1055                            if !self.pending_book_resync.is_empty() {
1056                                let resync: Vec<String> = channels
1057                                    .iter()
1058                                    .filter(|ch| self.pending_book_resync.contains(ch))
1059                                    .cloned()
1060                                    .collect();
1061
1062                                if !resync.is_empty() {
1063                                    let _ = self.handle_subscribe(resync).await;
1064                                }
1065                            }
1066                        }
1067                        PendingRequestType::SetHeartbeat => {
1068                            if let Some(error) = &response.error {
1069                                log::error!(
1070                                    "Set heartbeat failed: code={}, message={}, request_id={}",
1071                                    error.code,
1072                                    error.message,
1073                                    request_id
1074                                );
1075                            } else {
1076                                log::debug!("Heartbeat enabled (request_id={request_id})");
1077                            }
1078                        }
1079                        PendingRequestType::Test => {
1080                            if let Some(error) = &response.error {
1081                                log::warn!(
1082                                    "Heartbeat test failed: code={}, message={}, request_id={}",
1083                                    error.code,
1084                                    error.message,
1085                                    request_id
1086                                );
1087                            } else {
1088                                log::trace!(
1089                                    "Heartbeat test acknowledged (request_id={request_id})"
1090                                );
1091                            }
1092                        }
1093                        PendingRequestType::Cancel {
1094                            client_order_id,
1095                            trader_id,
1096                            strategy_id,
1097                            instrument_id,
1098                        } => {
1099                            if let Some(result) = &response.result {
1100                                match serde_json::from_value::<DeribitOrderMsg>(result.clone()) {
1101                                    Ok(order_msg) => {
1102                                        let venue_order_id =
1103                                            VenueOrderId::new(order_msg.order_id.as_str());
1104                                        log::debug!(
1105                                            "Cancel confirmed: venue_order_id={venue_order_id}, \
1106                                            client_order_id={client_order_id}, state={}",
1107                                            order_msg.order_state
1108                                        );
1109
1110                                        // Emit OrderCanceled from the response path so we
1111                                        // do not lose the event during a reconnection gap.
1112                                        // Both paths check terminal_orders to suppress
1113                                        // duplicates regardless of which arrives first.
1114                                        if order_msg.order_state == "cancelled"
1115                                            && !self.terminal_orders.contains(&client_order_id)
1116                                        {
1117                                            let instrument_name_ustr =
1118                                                Ustr::from(order_msg.instrument_name.as_str());
1119
1120                                            if let Some(instrument) =
1121                                                self.instruments_cache.get(&instrument_name_ustr)
1122                                                && let Some(account_id) = self.account_id
1123                                            {
1124                                                let event = parse_order_canceled(
1125                                                    &order_msg,
1126                                                    instrument,
1127                                                    account_id,
1128                                                    trader_id,
1129                                                    strategy_id,
1130                                                    ts_init,
1131                                                );
1132                                                // Keep order_contexts so the subscription
1133                                                // path resolves the same client_order_id
1134                                                // and hits the terminal_orders dedup check
1135                                                self.terminal_orders.add(client_order_id);
1136                                                return Some(NautilusWsMessage::OrderCanceled(
1137                                                    event,
1138                                                ));
1139                                            }
1140                                        }
1141                                    }
1142                                    Err(e) => {
1143                                        log::error!(
1144                                            "Failed to parse cancel response: request_id={request_id}, error={e}"
1145                                        );
1146                                    }
1147                                }
1148                            } else if let Some(error) = &response.error {
1149                                log::error!(
1150                                    "Cancel rejected: code={}, message={}, client_order_id={}",
1151                                    error.code,
1152                                    error.message,
1153                                    client_order_id
1154                                );
1155                                return Some(NautilusWsMessage::OrderCancelRejected(
1156                                    OrderCancelRejected::new(
1157                                        trader_id,
1158                                        strategy_id,
1159                                        instrument_id,
1160                                        client_order_id,
1161                                        ustr::ustr(&format!(
1162                                            "code={}: {}",
1163                                            error.code, error.message
1164                                        )),
1165                                        UUID4::new(),
1166                                        ts_init,
1167                                        ts_init,
1168                                        false,
1169                                        None, // venue_order_id not available in error response
1170                                        self.account_id,
1171                                    ),
1172                                ));
1173                            }
1174                        }
1175                        PendingRequestType::CancelAllByInstrument { instrument_id } => {
1176                            if let Some(result) = &response.result {
1177                                match serde_json::from_value::<u64>(result.clone()) {
1178                                    Ok(count) => {
1179                                        log::info!(
1180                                            "Cancelled {count} orders for instrument {instrument_id}"
1181                                        );
1182                                        // Individual order status updates come via user.orders subscription
1183                                    }
1184                                    Err(e) => {
1185                                        log::warn!("Failed to parse cancel_all response: {e}");
1186                                    }
1187                                }
1188                            } else if let Some(error) = &response.error {
1189                                log::error!(
1190                                    "Cancel all by instrument rejected: code={}, message={}, instrument_id={}",
1191                                    error.code,
1192                                    error.message,
1193                                    instrument_id
1194                                );
1195                            }
1196                        }
1197                        PendingRequestType::Buy {
1198                            client_order_id,
1199                            trader_id,
1200                            strategy_id,
1201                            instrument_id,
1202                        }
1203                        | PendingRequestType::Sell {
1204                            client_order_id,
1205                            trader_id,
1206                            strategy_id,
1207                            instrument_id,
1208                        } => {
1209                            if let Some(result) = &response.result {
1210                                match serde_json::from_value::<DeribitOrderResponse>(result.clone())
1211                                {
1212                                    Ok(order_response) => {
1213                                        let venue_order_id_str = &order_response.order.order_id;
1214                                        let venue_order_id =
1215                                            VenueOrderId::new(venue_order_id_str.as_str());
1216                                        let order_state = &order_response.order.order_state;
1217                                        log::debug!(
1218                                            "Order response: venue_order_id={venue_order_id}, client_order_id={client_order_id}, state={order_state}"
1219                                        );
1220
1221                                        self.order_contexts.insert(
1222                                            venue_order_id,
1223                                            OrderContext {
1224                                                client_order_id,
1225                                                trader_id,
1226                                                strategy_id,
1227                                                instrument_id,
1228                                            },
1229                                        );
1230
1231                                        // Skip OrderAccepted if order already reached terminal state
1232                                        if self.terminal_orders.contains(&client_order_id) {
1233                                            log::debug!(
1234                                                "Skipping OrderAccepted for terminal order: client_order_id={client_order_id}"
1235                                            );
1236                                            self.emitted_accepted.add(venue_order_id);
1237                                        } else if order_state == "filled" {
1238                                            // Order went directly Submitted -> Filled (e.g., market orders)
1239                                            log::debug!(
1240                                                "Skipping OrderAccepted for already filled order: venue_order_id={venue_order_id}, client_order_id={client_order_id}"
1241                                            );
1242                                            self.terminal_orders.add(client_order_id);
1243                                            self.emitted_accepted.add(venue_order_id);
1244                                        } else {
1245                                            let instrument_name_ustr = Ustr::from(
1246                                                order_response.order.instrument_name.as_str(),
1247                                            );
1248
1249                                            if let Some(instrument) =
1250                                                self.instruments_cache.get(&instrument_name_ustr)
1251                                            {
1252                                                if let Some(account_id) = self.account_id {
1253                                                    let event = parse_order_accepted(
1254                                                        &order_response.order,
1255                                                        instrument,
1256                                                        account_id,
1257                                                        trader_id,
1258                                                        strategy_id,
1259                                                        ts_init,
1260                                                    );
1261                                                    // Mark OrderAccepted as emitted to prevent duplicate from subscription
1262                                                    self.emitted_accepted.add(venue_order_id);
1263                                                    return Some(NautilusWsMessage::OrderAccepted(
1264                                                        event,
1265                                                    ));
1266                                                } else {
1267                                                    log::warn!(
1268                                                        "Cannot create OrderAccepted: account_id not set"
1269                                                    );
1270                                                }
1271                                            } else {
1272                                                log::warn!(
1273                                                    "Instrument {instrument_name_ustr} not found in cache for order response"
1274                                                );
1275                                            }
1276                                        }
1277                                    }
1278                                    Err(e) => {
1279                                        log::error!(
1280                                            "Failed to parse order response: request_id={request_id}, error={e}"
1281                                        );
1282                                        return Some(NautilusWsMessage::OrderRejected(
1283                                            OrderRejected::new(
1284                                                trader_id,
1285                                                strategy_id,
1286                                                instrument_id,
1287                                                client_order_id,
1288                                                self.account_id
1289                                                    .unwrap_or(AccountId::new("DERIBIT-UNKNOWN")),
1290                                                ustr::ustr(&format!(
1291                                                    "Failed to parse response: {e}"
1292                                                )),
1293                                                UUID4::new(),
1294                                                ts_init,
1295                                                ts_init,
1296                                                false,
1297                                                false,
1298                                            ),
1299                                        ));
1300                                    }
1301                                }
1302                            } else if let Some(error) = &response.error {
1303                                let due_post_only = error.code == DERIBIT_POST_ONLY_ERROR_CODE;
1304                                let reason = if let Some(data) = &error.data {
1305                                    format!(
1306                                        "code={}: {} (data: {})",
1307                                        error.code, error.message, data
1308                                    )
1309                                } else {
1310                                    format!("code={}: {}", error.code, error.message)
1311                                };
1312
1313                                log::debug!(
1314                                    "Order rejected: {reason}, client_order_id={client_order_id}"
1315                                );
1316                                return Some(NautilusWsMessage::OrderRejected(OrderRejected::new(
1317                                    trader_id,
1318                                    strategy_id,
1319                                    instrument_id,
1320                                    client_order_id,
1321                                    self.account_id.unwrap_or(AccountId::new("DERIBIT-UNKNOWN")),
1322                                    ustr::ustr(&reason),
1323                                    UUID4::new(),
1324                                    ts_init,
1325                                    ts_init,
1326                                    false,
1327                                    due_post_only,
1328                                )));
1329                            }
1330                        }
1331                        PendingRequestType::Edit {
1332                            client_order_id,
1333                            trader_id,
1334                            strategy_id,
1335                            instrument_id,
1336                        } => {
1337                            if let Some(result) = &response.result {
1338                                match serde_json::from_value::<DeribitOrderResponse>(result.clone())
1339                                {
1340                                    Ok(order_response) => {
1341                                        let venue_order_id =
1342                                            VenueOrderId::new(&order_response.order.order_id);
1343                                        log::info!(
1344                                            "Order updated: venue_order_id={}, client_order_id={}, state={}",
1345                                            venue_order_id,
1346                                            client_order_id,
1347                                            order_response.order.order_state
1348                                        );
1349
1350                                        self.order_contexts.insert(
1351                                            venue_order_id,
1352                                            OrderContext {
1353                                                client_order_id,
1354                                                trader_id,
1355                                                strategy_id,
1356                                                instrument_id,
1357                                            },
1358                                        );
1359
1360                                        let instrument_name_ustr = Ustr::from(
1361                                            order_response.order.instrument_name.as_str(),
1362                                        );
1363
1364                                        if let Some(instrument) =
1365                                            self.instruments_cache.get(&instrument_name_ustr)
1366                                        {
1367                                            if let Some(account_id) = self.account_id {
1368                                                let event = parse_order_updated(
1369                                                    &order_response.order,
1370                                                    instrument,
1371                                                    account_id,
1372                                                    trader_id,
1373                                                    strategy_id,
1374                                                    ts_init,
1375                                                );
1376                                                return Some(NautilusWsMessage::OrderUpdated(
1377                                                    event,
1378                                                ));
1379                                            } else {
1380                                                log::warn!(
1381                                                    "Cannot create OrderUpdated: account_id not set"
1382                                                );
1383                                            }
1384                                        } else {
1385                                            log::warn!(
1386                                                "Instrument {instrument_name_ustr} not found in cache for edit response"
1387                                            );
1388                                        }
1389                                    }
1390                                    Err(e) => {
1391                                        log::error!(
1392                                            "Failed to parse edit response: request_id={request_id}, error={e}"
1393                                        );
1394                                        return Some(NautilusWsMessage::OrderModifyRejected(
1395                                            OrderModifyRejected::new(
1396                                                trader_id,
1397                                                strategy_id,
1398                                                instrument_id,
1399                                                client_order_id,
1400                                                ustr::ustr(&format!(
1401                                                    "Failed to parse response: {e}"
1402                                                )),
1403                                                UUID4::new(),
1404                                                ts_init,
1405                                                ts_init,
1406                                                false,
1407                                                None, // venue_order_id not available
1408                                                self.account_id,
1409                                            ),
1410                                        ));
1411                                    }
1412                                }
1413                            } else if let Some(error) = &response.error {
1414                                log::error!(
1415                                    "Order modify rejected: code={}, message={}, client_order_id={}",
1416                                    error.code,
1417                                    error.message,
1418                                    client_order_id
1419                                );
1420                                return Some(NautilusWsMessage::OrderModifyRejected(
1421                                    OrderModifyRejected::new(
1422                                        trader_id,
1423                                        strategy_id,
1424                                        instrument_id,
1425                                        client_order_id,
1426                                        ustr::ustr(&format!(
1427                                            "code={}: {}",
1428                                            error.code, error.message
1429                                        )),
1430                                        UUID4::new(),
1431                                        ts_init,
1432                                        ts_init,
1433                                        false,
1434                                        None, // venue_order_id not available
1435                                        self.account_id,
1436                                    ),
1437                                ));
1438                            }
1439                        }
1440                        PendingRequestType::GetOrderState {
1441                            client_order_id,
1442                            trader_id: _,
1443                            strategy_id: _,
1444                            instrument_id: _,
1445                        } => {
1446                            if let Some(result) = &response.result {
1447                                match serde_json::from_value::<DeribitOrderMsg>(result.clone()) {
1448                                    Ok(order_msg) => {
1449                                        log::info!(
1450                                            "Order state received: venue_order_id={}, client_order_id={}, state={}",
1451                                            order_msg.order_id,
1452                                            client_order_id,
1453                                            order_msg.order_state
1454                                        );
1455
1456                                        // Convert to OrderStatusReport
1457                                        let instrument_name_ustr = order_msg.instrument_name;
1458
1459                                        if let Some(instrument) =
1460                                            self.instruments_cache.get(&instrument_name_ustr)
1461                                        {
1462                                            if let Some(account_id) = self.account_id {
1463                                                match parse_user_order_msg(
1464                                                    &order_msg, instrument, account_id, ts_init,
1465                                                ) {
1466                                                    Ok(report) => {
1467                                                        return Some(
1468                                                            NautilusWsMessage::OrderStatusReports(
1469                                                                vec![report],
1470                                                            ),
1471                                                        );
1472                                                    }
1473                                                    Err(e) => {
1474                                                        log::warn!(
1475                                                            "Failed to parse get_order_state response to report: {e}"
1476                                                        );
1477                                                    }
1478                                                }
1479                                            } else {
1480                                                log::warn!(
1481                                                    "Cannot create OrderStatusReport: account_id not set"
1482                                                );
1483                                            }
1484                                        } else {
1485                                            log::warn!(
1486                                                "Instrument {instrument_name_ustr} not found in cache for get_order_state response"
1487                                            );
1488                                        }
1489                                    }
1490                                    Err(e) => {
1491                                        log::error!(
1492                                            "Failed to parse get_order_state response: request_id={request_id}, error={e}"
1493                                        );
1494                                    }
1495                                }
1496                            } else if let Some(error) = &response.error {
1497                                log::error!(
1498                                    "Get order state failed: code={}, message={}, client_order_id={}",
1499                                    error.code,
1500                                    error.message,
1501                                    client_order_id
1502                                );
1503                            }
1504                        }
1505                    }
1506                } else if let Some(request_id) = response.id {
1507                    // Response with ID but no matching pending request
1508                    if let Some(error) = &response.error {
1509                        // Log orphaned error response with all available context
1510                        log::error!(
1511                            "Deribit error for unknown request: code={}, message={}, request_id={}, data={:?}",
1512                            error.code,
1513                            error.message,
1514                            request_id,
1515                            error.data
1516                        );
1517                        return Some(NautilusWsMessage::Error(DeribitWsError::DeribitError {
1518                            code: error.code,
1519                            message: error.message.clone(),
1520                        }));
1521                    } else {
1522                        // Success response but no pending request - likely already processed
1523                        log::debug!(
1524                            "Received response for unknown request_id={}, result present: {}",
1525                            request_id,
1526                            response.result.is_some()
1527                        );
1528                    }
1529                } else if let Some(error) = &response.error {
1530                    // Error response with no ID (shouldn't happen in JSON-RPC 2.0, but handle it)
1531                    log::error!(
1532                        "Deribit error with no request_id: code={}, message={}, data={:?}",
1533                        error.code,
1534                        error.message,
1535                        error.data
1536                    );
1537                    return Some(NautilusWsMessage::Error(DeribitWsError::DeribitError {
1538                        code: error.code,
1539                        message: error.message.clone(),
1540                    }));
1541                }
1542                None
1543            }
1544            DeribitWsMessage::Notification(notification) => {
1545                let channel = &notification.params.channel;
1546                let data = &notification.params.data;
1547
1548                // Determine channel type and parse accordingly
1549                if let Some(channel_type) = DeribitWsChannel::from_channel_string(channel) {
1550                    match channel_type {
1551                        DeribitWsChannel::Trades => {
1552                            // Parse trade messages
1553                            match serde_json::from_value::<Vec<DeribitTradeMsg>>(data.clone()) {
1554                                Ok(trades) => {
1555                                    log::debug!("Received {} trades", trades.len());
1556                                    let data_vec = parse_trades_data(
1557                                        &trades,
1558                                        &self.instruments_cache,
1559                                        ts_init,
1560                                    );
1561
1562                                    if data_vec.is_empty() {
1563                                        log::debug!(
1564                                            "No trades parsed - instrument cache size: {}",
1565                                            self.instruments_cache.len()
1566                                        );
1567                                    } else {
1568                                        log::debug!("Parsed {} trade ticks", data_vec.len());
1569                                        return Some(NautilusWsMessage::Data(data_vec));
1570                                    }
1571                                }
1572                                Err(e) => {
1573                                    log::warn!("Failed to deserialize trades: {e}");
1574                                }
1575                            }
1576                        }
1577                        DeribitWsChannel::Book => {
1578                            // Parse order book messages
1579                            match serde_json::from_value::<DeribitBookMsg>(data.clone()) {
1580                                Ok(book_msg) => {
1581                                    if let Some(instrument) =
1582                                        self.instruments_cache.get(&book_msg.instrument_name)
1583                                    {
1584                                        let inst_name = book_msg.instrument_name.to_string();
1585                                        let awaiting_resync =
1586                                            self.pending_book_resync.iter().any(|ch| {
1587                                                ch.starts_with("book.")
1588                                                    && ch
1589                                                        .split('.')
1590                                                        .nth(1)
1591                                                        .is_some_and(|s| s == inst_name)
1592                                            });
1593
1594                                        if awaiting_resync
1595                                            && book_msg.msg_type == DeribitBookMsgType::Change
1596                                        {
1597                                            // Drop deltas while awaiting resync snapshot
1598                                        } else if awaiting_resync
1599                                            && book_msg.msg_type == DeribitBookMsgType::Snapshot
1600                                        {
1601                                            self.pending_book_resync.retain(|ch| {
1602                                                !(ch.starts_with("book.")
1603                                                    && ch
1604                                                        .split('.')
1605                                                        .nth(1)
1606                                                        .is_some_and(|s| s == inst_name))
1607                                            });
1608                                            self.book_sequence.insert(
1609                                                book_msg.instrument_name,
1610                                                book_msg.change_id,
1611                                            );
1612
1613                                            match parse_book_msg(&book_msg, instrument, ts_init) {
1614                                                Ok(deltas) => {
1615                                                    return Some(NautilusWsMessage::Deltas(deltas));
1616                                                }
1617                                                Err(e) => {
1618                                                    log::warn!("Failed to parse book message: {e}");
1619                                                }
1620                                            }
1621                                        } else if book_msg.msg_type == DeribitBookMsgType::Change
1622                                            && let Some(prev_id) = book_msg.prev_change_id
1623                                            && let Some(&last_id) =
1624                                                self.book_sequence.get(&book_msg.instrument_name)
1625                                            && prev_id != last_id
1626                                        {
1627                                            log::error!(
1628                                                "Book sequence gap for {}: expected prev_change_id={}, was {} \
1629                                                - dropping delta, forcing resync",
1630                                                book_msg.instrument_name,
1631                                                last_id,
1632                                                prev_id
1633                                            );
1634                                            self.book_sequence.remove(&book_msg.instrument_name);
1635
1636                                            let book_channels: Vec<String> = self
1637                                                .subscriptions_state
1638                                                .all_topics()
1639                                                .into_iter()
1640                                                .filter(|t| {
1641                                                    t.starts_with("book.")
1642                                                        && t.split('.')
1643                                                            .nth(1)
1644                                                            .is_some_and(|s| s == inst_name)
1645                                                })
1646                                                .collect();
1647
1648                                            if !book_channels.is_empty() {
1649                                                for ch in &book_channels {
1650                                                    self.subscriptions_state.mark_failure(ch);
1651                                                }
1652                                                // Defer resubscribe until unsubscribe ack
1653                                                self.pending_book_resync
1654                                                    .extend(book_channels.clone());
1655                                                let _ =
1656                                                    self.handle_unsubscribe(book_channels).await;
1657                                            }
1658                                        } else {
1659                                            self.book_sequence.insert(
1660                                                book_msg.instrument_name,
1661                                                book_msg.change_id,
1662                                            );
1663
1664                                            match parse_book_msg(&book_msg, instrument, ts_init) {
1665                                                Ok(deltas) => {
1666                                                    return Some(NautilusWsMessage::Deltas(deltas));
1667                                                }
1668                                                Err(e) => {
1669                                                    log::warn!("Failed to parse book message: {e}");
1670                                                }
1671                                            }
1672                                        }
1673                                    } else {
1674                                        log::warn!(
1675                                            "Book message received but instrument '{}' not found in cache (cache size: {})",
1676                                            book_msg.instrument_name,
1677                                            self.instruments_cache.len()
1678                                        );
1679                                    }
1680                                }
1681                                Err(e) => {
1682                                    log::warn!(
1683                                        "Failed to deserialize book message: {e}, channel: {channel}"
1684                                    );
1685                                }
1686                            }
1687                        }
1688                        DeribitWsChannel::Ticker => {
1689                            if let Ok(ticker_msg) =
1690                                serde_json::from_value::<DeribitTickerMsg>(data.clone())
1691                                && let Some(instrument) =
1692                                    self.instruments_cache.get(&ticker_msg.instrument_name)
1693                            {
1694                                // Emit OptionGreeks only if subscribed
1695                                if self.option_greeks_subs.contains(&instrument.id())
1696                                    && let Some(option_greeks) = parse_ticker_to_option_greeks(
1697                                        &ticker_msg,
1698                                        instrument,
1699                                        ts_init,
1700                                    )
1701                                {
1702                                    let _ = self
1703                                        .out_tx
1704                                        .send(NautilusWsMessage::OptionGreeks(option_greeks));
1705                                }
1706
1707                                let instrument_id = instrument.id();
1708                                let mut data_vec = Vec::new();
1709
1710                                // Emit MarkPriceUpdate only if subscribed
1711                                if self.mark_price_subs.contains(&instrument_id) {
1712                                    match parse_ticker_to_mark_price(
1713                                        &ticker_msg,
1714                                        instrument,
1715                                        ts_init,
1716                                    ) {
1717                                        Ok(mark_price) => {
1718                                            data_vec.push(Data::MarkPriceUpdate(mark_price));
1719                                        }
1720                                        Err(e) => log::warn!("Failed to parse mark price: {e}"),
1721                                    }
1722                                }
1723
1724                                // Emit IndexPriceUpdate only if subscribed
1725                                if self.index_price_subs.contains(&instrument_id) {
1726                                    match parse_ticker_to_index_price(
1727                                        &ticker_msg,
1728                                        instrument,
1729                                        ts_init,
1730                                    ) {
1731                                        Ok(index_price) => {
1732                                            data_vec.push(Data::IndexPriceUpdate(index_price));
1733                                        }
1734                                        Err(e) => log::warn!("Failed to parse index price: {e}"),
1735                                    }
1736                                }
1737
1738                                if !data_vec.is_empty() {
1739                                    return Some(NautilusWsMessage::Data(data_vec));
1740                                }
1741                            }
1742                        }
1743                        DeribitWsChannel::Perpetual => {
1744                            // Parse perpetual channel for funding rate updates
1745                            // This channel is dedicated to perpetual instruments and provides
1746                            // the interest (funding) rate
1747                            match serde_json::from_value::<DeribitPerpetualMsg>(data.clone()) {
1748                                Ok(perpetual_msg) => {
1749                                    // Extract instrument name from channel: perpetual.{instrument}.{interval}
1750                                    let parts: Vec<&str> = channel.split('.').collect();
1751                                    if parts.len() >= 2 {
1752                                        let instrument_name = Ustr::from(parts[1]);
1753
1754                                        if let Some(instrument) =
1755                                            self.instruments_cache.get(&instrument_name)
1756                                        {
1757                                            let funding_rate = parse_perpetual_to_funding_rate(
1758                                                &perpetual_msg,
1759                                                instrument,
1760                                                ts_init,
1761                                            );
1762                                            return Some(NautilusWsMessage::FundingRates(vec![
1763                                                funding_rate,
1764                                            ]));
1765                                        } else {
1766                                            log::warn!(
1767                                                "Instrument {} not found in cache (cache size: {})",
1768                                                instrument_name,
1769                                                self.instruments_cache.len()
1770                                            );
1771                                        }
1772                                    }
1773                                }
1774                                Err(e) => {
1775                                    log::warn!(
1776                                        "Failed to deserialize perpetual message: {e}, data: {data}"
1777                                    );
1778                                }
1779                            }
1780                        }
1781                        DeribitWsChannel::Quote => {
1782                            // Parse quote messages
1783                            if let Ok(quote_msg) =
1784                                serde_json::from_value::<DeribitQuoteMsg>(data.clone())
1785                                && let Some(instrument) =
1786                                    self.instruments_cache.get(&quote_msg.instrument_name)
1787                            {
1788                                match parse_quote_msg(&quote_msg, instrument, ts_init) {
1789                                    Ok(quote) => {
1790                                        return Some(NautilusWsMessage::Data(vec![Data::Quote(
1791                                            quote,
1792                                        )]));
1793                                    }
1794                                    Err(e) => {
1795                                        log::warn!("Failed to parse quote message: {e}");
1796                                    }
1797                                }
1798                            }
1799                        }
1800                        DeribitWsChannel::InstrumentState => {
1801                            match serde_json::from_value::<DeribitInstrumentStateMsg>(data.clone())
1802                            {
1803                                Ok(state_msg) => {
1804                                    log::info!(
1805                                        "Instrument state change: {} -> {} (timestamp: {})",
1806                                        state_msg.instrument_name,
1807                                        state_msg.state,
1808                                        state_msg.timestamp
1809                                    );
1810
1811                                    let instrument_id = if let Some(instrument) =
1812                                        self.instruments_cache.get(&state_msg.instrument_name)
1813                                    {
1814                                        instrument.id()
1815                                    } else {
1816                                        log::debug!(
1817                                            "Instrument '{}' not in cache, constructing ID",
1818                                            state_msg.instrument_name
1819                                        );
1820                                        InstrumentId::new(
1821                                            Symbol::new(state_msg.instrument_name),
1822                                            *DERIBIT_VENUE,
1823                                        )
1824                                    };
1825
1826                                    let action = MarketStatusAction::from(state_msg.state);
1827                                    let is_trading =
1828                                        Some(state_msg.state == DeribitInstrumentState::Started);
1829                                    let ts_event = UnixNanos::from(state_msg.timestamp * 1_000_000);
1830                                    let status = InstrumentStatus::new(
1831                                        instrument_id,
1832                                        action,
1833                                        ts_event,
1834                                        ts_init,
1835                                        None,
1836                                        None,
1837                                        is_trading,
1838                                        None,
1839                                        None,
1840                                    );
1841                                    return Some(NautilusWsMessage::InstrumentStatus(status));
1842                                }
1843                                Err(e) => {
1844                                    log::warn!("Failed to parse instrument status message: {e}");
1845                                }
1846                            }
1847                        }
1848                        DeribitWsChannel::ChartTrades => {
1849                            // Parse chart.trades messages into Bar objects using emit-on-next pattern.
1850                            // Deribit sends updates for the current bar as it builds. We only emit
1851                            // a bar when we receive a bar with a different timestamp, confirming
1852                            // the previous bar is closed.
1853                            if let Ok(chart_msg) =
1854                                serde_json::from_value::<DeribitChartMsg>(data.clone())
1855                            {
1856                                // Extract instrument and resolution from channel
1857                                // Channel format: chart.trades.{instrument}.{resolution}
1858                                let parts: Vec<&str> = channel.split('.').collect();
1859                                if parts.len() >= 4 {
1860                                    let instrument_name = Ustr::from(parts[2]);
1861                                    let resolution = parts[3];
1862
1863                                    if let Some(instrument) =
1864                                        self.instruments_cache.get(&instrument_name)
1865                                    {
1866                                        let instrument_id = instrument.id();
1867
1868                                        match resolution_to_bar_type(instrument_id, resolution) {
1869                                            Ok(bar_type) => {
1870                                                let price_precision = instrument.price_precision();
1871                                                let size_precision = instrument.size_precision();
1872
1873                                                match parse_chart_msg(
1874                                                    &chart_msg,
1875                                                    bar_type,
1876                                                    price_precision,
1877                                                    size_precision,
1878                                                    self.bars_timestamp_on_close,
1879                                                    ts_init,
1880                                                ) {
1881                                                    Ok(new_bar) => {
1882                                                        // Check if we have a pending bar for this channel
1883                                                        let channel_key = channel.clone();
1884
1885                                                        if let Some(pending_bar) =
1886                                                            self.pending_bars.get(&channel_key)
1887                                                        {
1888                                                            // If new bar has different timestamp, the pending bar is closed
1889                                                            if new_bar.ts_event
1890                                                                != pending_bar.ts_event
1891                                                            {
1892                                                                let closed_bar = *pending_bar;
1893                                                                self.pending_bars
1894                                                                    .insert(channel_key, new_bar);
1895                                                                log::debug!(
1896                                                                    "Emitting closed bar: {closed_bar:?}"
1897                                                                );
1898                                                                return Some(
1899                                                                    NautilusWsMessage::Data(vec![
1900                                                                        Data::Bar(closed_bar),
1901                                                                    ]),
1902                                                                );
1903                                                            }
1904                                                            // Same timestamp - update pending bar with latest values
1905                                                            self.pending_bars
1906                                                                .insert(channel_key, new_bar);
1907                                                        } else {
1908                                                            // First bar for this channel - store as pending
1909                                                            self.pending_bars
1910                                                                .insert(channel_key, new_bar);
1911                                                        }
1912                                                    }
1913                                                    Err(e) => {
1914                                                        log::warn!(
1915                                                            "Failed to parse chart message to bar: {e}"
1916                                                        );
1917                                                    }
1918                                                }
1919                                            }
1920                                            Err(e) => {
1921                                                log::warn!(
1922                                                    "Failed to create BarType from resolution {resolution}: {e}"
1923                                                );
1924                                            }
1925                                        }
1926                                    } else {
1927                                        log::warn!(
1928                                            "Instrument {instrument_name} not found in cache for chart data"
1929                                        );
1930                                    }
1931                                }
1932                            }
1933                        }
1934                        DeribitWsChannel::UserOrders => {
1935                            // Handle both array and single object responses
1936                            let orders_result =
1937                                serde_json::from_value::<Vec<DeribitOrderMsg>>(data.clone())
1938                                    .or_else(|_| {
1939                                        serde_json::from_value::<DeribitOrderMsg>(data.clone())
1940                                            .map(|order| vec![order])
1941                                    });
1942
1943                            match orders_result {
1944                                Ok(orders) => {
1945                                    log::debug!("Received {} user order updates", orders.len());
1946
1947                                    // Require account_id for parsing
1948                                    let Some(account_id) = self.account_id else {
1949                                        log::warn!("Cannot parse user orders: account_id not set");
1950                                        return Some(NautilusWsMessage::Raw(data.clone()));
1951                                    };
1952
1953                                    let mut outgoing = Vec::new();
1954
1955                                    // Process each order and emit appropriate events
1956                                    for order in &orders {
1957                                        let venue_order_id_str = &order.order_id;
1958                                        let venue_order_id =
1959                                            VenueOrderId::new(venue_order_id_str.as_str());
1960                                        let instrument_name = order.instrument_name;
1961
1962                                        let Some(instrument) =
1963                                            self.instruments_cache.get(&instrument_name)
1964                                        else {
1965                                            log::warn!(
1966                                                "Instrument {instrument_name} not found in cache"
1967                                            );
1968                                            continue;
1969                                        };
1970
1971                                        // Look up OrderContext for this order
1972                                        // First check order_contexts (for orders whose response has been processed)
1973                                        // Then check pending_requests (for orders whose response hasn't arrived yet)
1974                                        // If neither found, this is a true external order
1975                                        let context =
1976                                            self.order_contexts.get(&venue_order_id).cloned();
1977
1978                                        // Extract client_order_id from order label for pending check
1979                                        let label_client_order_id = order
1980                                            .label
1981                                            .as_ref()
1982                                            .filter(|l| !l.is_empty())
1983                                            .map(ClientOrderId::new);
1984
1985                                        // Check for pending request if not in order_contexts
1986                                        let pending_context = if context.is_none() {
1987                                            if let Some(client_id) = &label_client_order_id {
1988                                                self.get_pending_order_context(client_id)
1989                                            } else {
1990                                                None
1991                                            }
1992                                        } else {
1993                                            None
1994                                        };
1995
1996                                        // Check if order has a pending request for context resolution
1997                                        let has_pending_request =
1998                                            if let Some(client_id) = &label_client_order_id {
1999                                                self.is_pending_order(client_id)
2000                                            } else {
2001                                                false
2002                                            };
2003
2004                                        let effective_context = context.or(pending_context);
2005                                        let is_known_order =
2006                                            effective_context.is_some() || has_pending_request;
2007
2008                                        // Determine event type based on order state
2009                                        let event_type = determine_order_event_type(
2010                                            &order.order_state,
2011                                            !is_known_order, // is_new if we don't know about it
2012                                            false,           // not from edit response
2013                                        );
2014
2015                                        let (trader_id, strategy_id, client_order_id) =
2016                                            if let Some(ctx) = effective_context {
2017                                                (
2018                                                    ctx.trader_id,
2019                                                    ctx.strategy_id,
2020                                                    ctx.client_order_id,
2021                                                )
2022                                            } else {
2023                                                // External order - use default values
2024                                                // Note: These won't match any strategy, which is correct
2025                                                (
2026                                                    TraderId::new("EXTERNAL-000"),
2027                                                    StrategyId::new("EXTERNAL"),
2028                                                    ClientOrderId::new(venue_order_id_str),
2029                                                )
2030                                            };
2031
2032                                        match event_type {
2033                                            OrderEventType::Accepted => {
2034                                                // Skip if order already reached terminal state (race condition)
2035                                                if self.terminal_orders.contains(&client_order_id) {
2036                                                    log::debug!(
2037                                                        "Skipping OrderAccepted for terminal order: client_order_id={client_order_id}"
2038                                                    );
2039                                                    continue;
2040                                                }
2041
2042                                                // Check if we already emitted OrderAccepted for this order
2043                                                // This prevents duplicates from both response and subscription paths
2044                                                if self.emitted_accepted.contains(&venue_order_id) {
2045                                                    log::trace!(
2046                                                        "Skipping duplicate OrderAccepted: venue_order_id={venue_order_id}"
2047                                                    );
2048                                                    continue;
2049                                                }
2050
2051                                                let event = parse_order_accepted(
2052                                                    order,
2053                                                    instrument,
2054                                                    account_id,
2055                                                    trader_id,
2056                                                    strategy_id,
2057                                                    ts_init,
2058                                                );
2059
2060                                                // Mark OrderAccepted as emitted
2061                                                self.emitted_accepted.add(venue_order_id);
2062
2063                                                log::debug!(
2064                                                    "Emitting OrderAccepted: venue_order_id={venue_order_id}, is_known={is_known_order}"
2065                                                );
2066                                                outgoing
2067                                                    .push(NautilusWsMessage::OrderAccepted(event));
2068                                            }
2069                                            OrderEventType::Canceled => {
2070                                                // Skip if already emitted from the cancel
2071                                                // response path
2072                                                if self.terminal_orders.contains(&client_order_id) {
2073                                                    log::trace!(
2074                                                        "Skipping duplicate OrderCanceled: client_order_id={client_order_id}"
2075                                                    );
2076                                                    continue;
2077                                                }
2078
2079                                                let event = parse_order_canceled(
2080                                                    order,
2081                                                    instrument,
2082                                                    account_id,
2083                                                    trader_id,
2084                                                    strategy_id,
2085                                                    ts_init,
2086                                                );
2087                                                log::debug!(
2088                                                    "Emitting OrderCanceled: venue_order_id={venue_order_id}"
2089                                                );
2090                                                self.terminal_orders.add(client_order_id);
2091                                                self.order_contexts.remove(&venue_order_id);
2092                                                self.emitted_accepted.remove(&venue_order_id);
2093                                                outgoing
2094                                                    .push(NautilusWsMessage::OrderCanceled(event));
2095                                            }
2096                                            OrderEventType::Expired => {
2097                                                let event = parse_order_expired(
2098                                                    order,
2099                                                    instrument,
2100                                                    account_id,
2101                                                    trader_id,
2102                                                    strategy_id,
2103                                                    ts_init,
2104                                                );
2105                                                log::debug!(
2106                                                    "Emitting OrderExpired: venue_order_id={venue_order_id}"
2107                                                );
2108                                                self.terminal_orders.add(client_order_id);
2109                                                self.order_contexts.remove(&venue_order_id);
2110                                                self.emitted_accepted.remove(&venue_order_id);
2111                                                outgoing
2112                                                    .push(NautilusWsMessage::OrderExpired(event));
2113                                            }
2114                                            OrderEventType::Updated => {
2115                                                // Emit OrderStatusReport for updates
2116                                                // This includes quantity/price changes from modify
2117                                                match parse_user_order_msg(
2118                                                    order, instrument, account_id, ts_init,
2119                                                ) {
2120                                                    Ok(report) => {
2121                                                        log::debug!(
2122                                                            "Emitting OrderStatusReport (updated): venue_order_id={venue_order_id}"
2123                                                        );
2124                                                        outgoing.push(
2125                                                            NautilusWsMessage::OrderStatusReports(
2126                                                                vec![report],
2127                                                            ),
2128                                                        );
2129                                                    }
2130                                                    Err(e) => {
2131                                                        log::warn!(
2132                                                            "Failed to parse order update: {e}"
2133                                                        );
2134                                                    }
2135                                                }
2136                                            }
2137                                            OrderEventType::None => {
2138                                                // Fills handled via user.trades, track terminal state
2139                                                // for race condition prevention
2140                                                if matches!(
2141                                                    order.order_state.as_str(),
2142                                                    "filled" | "rejected"
2143                                                ) {
2144                                                    log::debug!(
2145                                                        "Recording terminal order: venue_order_id={venue_order_id}, state={}",
2146                                                        order.order_state
2147                                                    );
2148                                                    self.terminal_orders.add(client_order_id);
2149                                                    self.order_contexts.remove(&venue_order_id);
2150                                                    self.emitted_accepted.remove(&venue_order_id);
2151                                                } else {
2152                                                    log::trace!(
2153                                                        "No event to emit for order {}, state={}",
2154                                                        venue_order_id,
2155                                                        order.order_state
2156                                                    );
2157                                                }
2158                                            }
2159                                        }
2160                                    }
2161
2162                                    if !outgoing.is_empty() {
2163                                        self.pending_outgoing.extend(outgoing);
2164                                    }
2165                                }
2166                                Err(e) => {
2167                                    log::warn!("Failed to deserialize user orders: {e}");
2168                                }
2169                            }
2170                        }
2171                        DeribitWsChannel::UserTrades => {
2172                            // Handle both array and single object responses
2173                            let trades_result =
2174                                serde_json::from_value::<Vec<DeribitUserTradeMsg>>(data.clone())
2175                                    .or_else(|_| {
2176                                        serde_json::from_value::<DeribitUserTradeMsg>(data.clone())
2177                                            .map(|trade| vec![trade])
2178                                    });
2179
2180                            match trades_result {
2181                                Ok(trades) => {
2182                                    log::debug!("Received {} user trade updates", trades.len());
2183
2184                                    let Some(account_id) = self.account_id else {
2185                                        log::warn!("Cannot parse user trades: account_id not set");
2186                                        return Some(NautilusWsMessage::Raw(data.clone()));
2187                                    };
2188
2189                                    let mut reports = Vec::with_capacity(trades.len());
2190                                    for trade in &trades {
2191                                        let instrument_name = trade.instrument_name;
2192
2193                                        if let Some(instrument) =
2194                                            self.instruments_cache.get(&instrument_name)
2195                                        {
2196                                            match parse_user_trade_msg(
2197                                                trade, instrument, account_id, ts_init,
2198                                            ) {
2199                                                Ok(report) => {
2200                                                    log::debug!(
2201                                                        "Parsed fill report: {} @ {}",
2202                                                        report.trade_id,
2203                                                        report.last_px
2204                                                    );
2205                                                    reports.push(report);
2206                                                }
2207                                                Err(e) => {
2208                                                    log::warn!(
2209                                                        "Failed to parse trade {}: {e}",
2210                                                        trade.trade_id
2211                                                    );
2212                                                }
2213                                            }
2214                                        } else {
2215                                            log::warn!(
2216                                                "Instrument {instrument_name} not found in cache"
2217                                            );
2218                                        }
2219                                    }
2220
2221                                    if !reports.is_empty() {
2222                                        return Some(NautilusWsMessage::FillReports(reports));
2223                                    }
2224                                }
2225                                Err(e) => {
2226                                    log::warn!("Failed to deserialize user trades: {e}");
2227                                }
2228                            }
2229                        }
2230                        DeribitWsChannel::UserPortfolio => {
2231                            match serde_json::from_value::<DeribitPortfolioMsg>(data.clone()) {
2232                                Ok(portfolio) => {
2233                                    // Skip zero-balance currencies (common with cross-collateral)
2234                                    // Only check equity and balance - initial_margin can be non-zero
2235                                    // for all currencies when cross-collateral is enabled
2236                                    if portfolio.equity.is_zero() && portfolio.balance.is_zero() {
2237                                        log::trace!(
2238                                            "Skipping zero-balance portfolio for {}",
2239                                            portfolio.currency
2240                                        );
2241                                        return None;
2242                                    }
2243
2244                                    // Require account_id for parsing
2245                                    let Some(account_id) = self.account_id else {
2246                                        log::warn!("Cannot parse portfolio: account_id not set");
2247                                        return None;
2248                                    };
2249
2250                                    match parse_portfolio_to_account_state(
2251                                        &portfolio, account_id, ts_init,
2252                                    ) {
2253                                        Ok(account_state) => {
2254                                            // Check for duplicate per currency
2255                                            let currency_key = portfolio.currency.clone();
2256
2257                                            if let Some(last) =
2258                                                self.last_account_states.get(&currency_key)
2259                                                && account_state.has_same_balances_and_margins(last)
2260                                            {
2261                                                log::trace!(
2262                                                    "Skipping duplicate portfolio update for {}",
2263                                                    portfolio.currency
2264                                                );
2265                                                return None;
2266                                            }
2267
2268                                            self.last_account_states
2269                                                .insert(currency_key, account_state.clone());
2270                                            return Some(NautilusWsMessage::AccountState(
2271                                                account_state,
2272                                            ));
2273                                        }
2274                                        Err(e) => {
2275                                            log::warn!(
2276                                                "Failed to parse portfolio to AccountState: {e}"
2277                                            );
2278                                        }
2279                                    }
2280                                }
2281                                Err(e) => {
2282                                    log::warn!("Failed to deserialize portfolio: {e}");
2283                                }
2284                            }
2285                        }
2286                        _ => {
2287                            // Unhandled channel - return raw
2288                            log::trace!("Unhandled channel: {channel}");
2289                            return Some(NautilusWsMessage::Raw(data.clone()));
2290                        }
2291                    }
2292                } else {
2293                    log::trace!("Unknown channel: {channel}");
2294                    return Some(NautilusWsMessage::Raw(data.clone()));
2295                }
2296                None
2297            }
2298            DeribitWsMessage::Heartbeat(heartbeat) => {
2299                match heartbeat.heartbeat_type {
2300                    DeribitHeartbeatType::TestRequest => {
2301                        log::trace!(
2302                            "Received heartbeat test_request - responding with public/test"
2303                        );
2304
2305                        if let Err(e) = self.handle_heartbeat_test_request().await {
2306                            log::error!("Failed to respond to heartbeat test_request: {e}");
2307
2308                            // Return error to signal connection may be unhealthy
2309                            return Some(NautilusWsMessage::Error(DeribitWsError::Send(format!(
2310                                "Heartbeat response failed: {e}"
2311                            ))));
2312                        }
2313                    }
2314                    DeribitHeartbeatType::Heartbeat => {
2315                        log::trace!("Received heartbeat acknowledgment");
2316                    }
2317                }
2318                None
2319            }
2320            DeribitWsMessage::Error(err) => {
2321                log::error!("Deribit error {}: {}", err.code, err.message);
2322                Some(NautilusWsMessage::Error(DeribitWsError::DeribitError {
2323                    code: err.code,
2324                    message: err.message,
2325                }))
2326            }
2327            DeribitWsMessage::Reconnected => Some(NautilusWsMessage::Reconnected),
2328        }
2329    }
2330
2331    /// Main message processing loop.
2332    ///
2333    /// Returns `None` when the handler should stop.
2334    /// Messages that need client-side handling (e.g., Reconnected) are returned.
2335    /// Data messages are sent directly to `out_tx` for the user stream.
2336    pub async fn next(&mut self) -> Option<NautilusWsMessage> {
2337        loop {
2338            if let Some(msg) = self.pending_outgoing.pop_front() {
2339                match msg {
2340                    NautilusWsMessage::Reconnected
2341                    | NautilusWsMessage::Authenticated(_)
2342                    | NautilusWsMessage::AuthenticationFailed(_) => {
2343                        return Some(msg);
2344                    }
2345                    _ => {
2346                        let _ = self.out_tx.send(msg);
2347                        continue;
2348                    }
2349                }
2350            }
2351
2352            tokio::select! {
2353                // Process commands from client
2354                Some(cmd) = self.cmd_rx.recv() => {
2355                    self.process_command(cmd).await;
2356                }
2357                // Process raw WebSocket messages
2358                Some(msg) = self.raw_rx.recv() => {
2359                    match msg {
2360                        Message::Text(text) => {
2361                            if let Some(nautilus_msg) = self.process_raw_message(&text).await {
2362                                // Send data messages to user stream
2363                                match &nautilus_msg {
2364                                    NautilusWsMessage::Data(_)
2365                                    | NautilusWsMessage::Deltas(_)
2366                                    | NautilusWsMessage::Instrument(_)
2367                                    | NautilusWsMessage::InstrumentStatus(_)
2368                                    | NautilusWsMessage::OptionGreeks(_)
2369                                    | NautilusWsMessage::Raw(_)
2370                                    | NautilusWsMessage::Error(_) => {
2371                                        let _ = self.out_tx.send(nautilus_msg);
2372                                    }
2373                                    NautilusWsMessage::FundingRates(rates) => {
2374                                        let msg_to_send =
2375                                            NautilusWsMessage::FundingRates(rates.clone());
2376
2377                                        if let Err(e) = self.out_tx.send(msg_to_send) {
2378                                            log::error!("Failed to send funding rates: {e}");
2379                                        }
2380                                    }
2381                                    NautilusWsMessage::OrderStatusReports(_)
2382                                    | NautilusWsMessage::FillReports(_)
2383                                    | NautilusWsMessage::OrderAccepted(_)
2384                                    | NautilusWsMessage::OrderCanceled(_)
2385                                    | NautilusWsMessage::OrderExpired(_)
2386                                    | NautilusWsMessage::OrderUpdated(_)
2387                                    | NautilusWsMessage::OrderRejected(_)
2388                                    | NautilusWsMessage::OrderCancelRejected(_)
2389                                    | NautilusWsMessage::OrderModifyRejected(_)
2390                                    | NautilusWsMessage::AccountState(_) => {
2391                                        let _ = self.out_tx.send(nautilus_msg);
2392                                    }
2393                                    // Return messages that need client-side handling
2394                                    NautilusWsMessage::Reconnected
2395                                    | NautilusWsMessage::Authenticated(_)
2396                                    | NautilusWsMessage::AuthenticationFailed(_) => {
2397                                        return Some(nautilus_msg);
2398                                    }
2399                                }
2400                            }
2401                        }
2402                        Message::Ping(data) => {
2403                            // Respond to ping with pong
2404                            if let Some(client) = &self.inner {
2405                                let _ = client.send_pong(data.to_vec()).await;
2406                            }
2407                        }
2408                        Message::Close(_) => {
2409                            log::info!("Received close frame");
2410                        }
2411                        _ => {}
2412                    }
2413                }
2414                // Check for stop signal
2415                () = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
2416                    if self.signal.load(Ordering::Relaxed) {
2417                        log::debug!("Stop signal received");
2418                        return None;
2419                    }
2420                }
2421            }
2422        }
2423    }
2424}