nautilus_deribit/websocket/handler.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3// https://nautechsystems.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for Deribit.
17//!
18//! The handler runs in a dedicated Tokio task as the I/O boundary between the client
19//! orchestrator and the network layer. It exclusively owns the `WebSocketClient` and
20//! processes commands from the client via an unbounded channel.
21
22use std::{
23 collections::VecDeque,
24 sync::{
25 Arc, Mutex,
26 atomic::{AtomicBool, AtomicU64, Ordering},
27 },
28};
29
30use ahash::AHashMap;
31use nautilus_common::cache::fifo::FifoCache;
32use nautilus_core::{AtomicSet, AtomicTime, UUID4, UnixNanos, time::get_atomic_clock_realtime};
33use nautilus_model::{
34 data::{Bar, Data, InstrumentStatus},
35 enums::MarketStatusAction,
36 events::{AccountState, OrderCancelRejected, OrderModifyRejected, OrderRejected},
37 identifiers::{
38 AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TraderId, VenueOrderId,
39 },
40 instruments::{Instrument, InstrumentAny},
41};
42use nautilus_network::{
43 RECONNECTED,
44 retry::{RetryManager, create_websocket_retry_manager},
45 websocket::{AuthTracker, SubscriptionState, WebSocketClient},
46};
47use tokio_tungstenite::tungstenite::Message;
48use ustr::Ustr;
49
50use super::{
51 enums::{DeribitBookMsgType, DeribitHeartbeatType, DeribitWsChannel},
52 error::DeribitWsError,
53 messages::{
54 DeribitAuthResult, DeribitBookMsg, DeribitCancelAllByInstrumentParams, DeribitCancelParams,
55 DeribitChartMsg, DeribitEditParams, DeribitHeartbeatParams, DeribitInstrumentStateMsg,
56 DeribitJsonRpcRequest, DeribitOrderMsg, DeribitOrderParams, DeribitOrderResponse,
57 DeribitPerpetualMsg, DeribitPortfolioMsg, DeribitQuoteMsg, DeribitSubscribeParams,
58 DeribitTickerMsg, DeribitTradeMsg, DeribitUserTradeMsg, DeribitWsMessage,
59 NautilusWsMessage, parse_raw_message,
60 },
61 parse::{
62 OrderEventType, determine_order_event_type, parse_book_msg, parse_chart_msg,
63 parse_order_accepted, parse_order_canceled, parse_order_expired, parse_order_updated,
64 parse_perpetual_to_funding_rate, parse_quote_msg, parse_ticker_to_index_price,
65 parse_ticker_to_mark_price, parse_ticker_to_option_greeks, parse_trades_data,
66 parse_user_order_msg, parse_user_trade_msg, resolution_to_bar_type,
67 },
68};
69use crate::common::{
70 consts::{DERIBIT_POST_ONLY_ERROR_CODE, DERIBIT_RATE_LIMIT_KEY_ORDER, DERIBIT_VENUE},
71 enums::DeribitInstrumentState,
72 parse::parse_portfolio_to_account_state,
73};
74
/// Type of pending request for request ID correlation.
///
/// The handler stores one value per in-flight JSON-RPC request, keyed by the
/// request ID, and removes it again when the matching response arrives or when
/// the request could not be serialized or sent. Order-related variants carry
/// the identifiers that were supplied with the originating command.
#[derive(Debug, Clone)]
pub enum PendingRequestType {
    /// Authentication request.
    Authenticate,
    /// Subscribe request with requested channels.
    ///
    /// The channel list is kept so each channel can be confirmed once the
    /// venue acknowledges the request.
    Subscribe { channels: Vec<String> },
    /// Unsubscribe request with requested channels.
    Unsubscribe { channels: Vec<String> },
    /// Set heartbeat request.
    SetHeartbeat,
    /// Test/ping request (heartbeat response).
    Test,
    /// Buy order request.
    Buy {
        /// Client order ID supplied with the buy command.
        client_order_id: ClientOrderId,
        /// Trader that issued the order.
        trader_id: TraderId,
        /// Strategy that issued the order.
        strategy_id: StrategyId,
        /// Instrument the order is for.
        instrument_id: InstrumentId,
    },
    /// Sell order request.
    Sell {
        /// Client order ID supplied with the sell command.
        client_order_id: ClientOrderId,
        /// Trader that issued the order.
        trader_id: TraderId,
        /// Strategy that issued the order.
        strategy_id: StrategyId,
        /// Instrument the order is for.
        instrument_id: InstrumentId,
    },
    /// Edit order request.
    Edit {
        /// Client order ID of the order being edited.
        client_order_id: ClientOrderId,
        /// Trader that issued the edit.
        trader_id: TraderId,
        /// Strategy that issued the edit.
        strategy_id: StrategyId,
        /// Instrument the order is for.
        instrument_id: InstrumentId,
    },
    /// Cancel order request.
    Cancel {
        /// Client order ID of the order being canceled.
        client_order_id: ClientOrderId,
        /// Trader that issued the cancel.
        trader_id: TraderId,
        /// Strategy that issued the cancel.
        strategy_id: StrategyId,
        /// Instrument the order is for.
        instrument_id: InstrumentId,
    },
    /// Cancel all orders by instrument request.
    CancelAllByInstrument { instrument_id: InstrumentId },
    /// Get order state request.
    GetOrderState {
        /// Client order ID of the order being queried.
        client_order_id: ClientOrderId,
        /// Trader that issued the query.
        trader_id: TraderId,
        /// Strategy that issued the query.
        strategy_id: StrategyId,
        /// Instrument the order is for.
        instrument_id: InstrumentId,
    },
}
126
/// Commands sent from the client to the handler.
///
/// Each variant is dispatched by `DeribitWsFeedHandler::process_command`.
/// Order-related variants pair the venue request parameters with the
/// identifiers needed to correlate the eventual response.
#[allow(missing_debug_implementations)]
pub enum HandlerCommand {
    /// Set the active WebSocket client.
    SetClient(WebSocketClient),
    /// Disconnect the WebSocket.
    ///
    /// The handler gives up its client when processing this command.
    Disconnect,
    /// Authenticate with credentials.
    Authenticate {
        /// Serialized auth params (DeribitAuthParams or DeribitRefreshTokenParams).
        auth_params: serde_json::Value,
    },
    /// Enable heartbeat with interval.
    ///
    /// The handler logs the interval as a number of seconds.
    SetHeartbeat { interval: u64 },
    /// Initialize the instrument cache.
    ///
    /// Replaces any previously cached instruments.
    InitializeInstruments(Vec<InstrumentAny>),
    /// Update a single instrument in the cache.
    UpdateInstrument(Box<InstrumentAny>),
    /// Subscribe to channels.
    Subscribe { channels: Vec<String> },
    /// Unsubscribe from channels.
    Unsubscribe { channels: Vec<String> },
    /// Submit a buy order.
    Buy {
        /// Venue order parameters sent as the request params.
        params: DeribitOrderParams,
        /// Client order ID used to correlate the response.
        client_order_id: ClientOrderId,
        /// Trader submitting the order.
        trader_id: TraderId,
        /// Strategy submitting the order.
        strategy_id: StrategyId,
        /// Instrument the order is for.
        instrument_id: InstrumentId,
    },
    /// Submit a sell order.
    Sell {
        /// Venue order parameters sent as the request params.
        params: DeribitOrderParams,
        /// Client order ID used to correlate the response.
        client_order_id: ClientOrderId,
        /// Trader submitting the order.
        trader_id: TraderId,
        /// Strategy submitting the order.
        strategy_id: StrategyId,
        /// Instrument the order is for.
        instrument_id: InstrumentId,
    },
    /// Edit an existing order.
    Edit {
        /// Venue edit parameters sent as the request params.
        params: DeribitEditParams,
        /// Client order ID of the order being edited.
        client_order_id: ClientOrderId,
        /// Trader requesting the edit.
        trader_id: TraderId,
        /// Strategy requesting the edit.
        strategy_id: StrategyId,
        /// Instrument the order is for.
        instrument_id: InstrumentId,
    },
    /// Cancel an existing order.
    Cancel {
        /// Venue cancel parameters sent as the request params.
        params: DeribitCancelParams,
        /// Client order ID of the order being canceled.
        client_order_id: ClientOrderId,
        /// Trader requesting the cancel.
        trader_id: TraderId,
        /// Strategy requesting the cancel.
        strategy_id: StrategyId,
        /// Instrument the order is for.
        instrument_id: InstrumentId,
    },
    /// Cancel all orders by instrument.
    CancelAllByInstrument {
        /// Venue parameters naming the instrument to cancel on.
        params: DeribitCancelAllByInstrumentParams,
        /// Instrument whose orders are being canceled.
        instrument_id: InstrumentId,
    },
    /// Get order state.
    GetOrderState {
        /// Order ID sent as the `order_id` request parameter.
        order_id: String,
        /// Client order ID of the order being queried.
        client_order_id: ClientOrderId,
        /// Trader requesting the state.
        trader_id: TraderId,
        /// Strategy requesting the state.
        strategy_id: StrategyId,
        /// Instrument the order is for.
        instrument_id: InstrumentId,
    },
}
195
/// Context for an order submitted via this handler.
///
/// Stores the original trader/strategy/client IDs from the buy/sell command
/// so they can be used when processing user.orders subscription updates.
#[derive(Debug, Clone)]
pub struct OrderContext {
    /// Client order ID from the originating command.
    pub client_order_id: ClientOrderId,
    /// Trader ID from the originating command.
    pub trader_id: TraderId,
    /// Strategy ID from the originating command.
    pub strategy_id: StrategyId,
    /// Instrument ID from the originating command.
    pub instrument_id: InstrumentId,
}
207
/// Deribit WebSocket feed handler.
///
/// Runs in a dedicated Tokio task, processing commands and raw WebSocket messages.
#[allow(missing_debug_implementations)]
pub struct DeribitWsFeedHandler {
    /// Clock used to stamp `ts_init` on parsed messages.
    clock: &'static AtomicTime,
    /// Shared flag supplied by the owner.
    // NOTE(review): presumably a stop signal for the run loop; its use is not visible here.
    signal: Arc<AtomicBool>,
    /// Active WebSocket client; `None` until `SetClient` and after `Disconnect`.
    inner: Option<WebSocketClient>,
    /// Receives commands from the client.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Receives raw WebSocket messages.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Sends parsed messages out of the handler.
    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    /// Tracks authentication success, failure and invalidation on reconnect.
    auth_tracker: AuthTracker,
    /// Subscription bookkeeping; channels are confirmed here on venue responses.
    subscriptions_state: SubscriptionState,
    /// Retry policy applied to outgoing WebSocket sends.
    retry_manager: RetryManager<DeribitWsError>,
    /// Instruments keyed by raw symbol.
    instruments_cache: AHashMap<Ustr, InstrumentAny>,
    /// Shared set of instruments with option greeks subscriptions.
    // NOTE(review): presumably gates ticker fan-out; usage is outside this chunk.
    option_greeks_subs: Arc<AtomicSet<InstrumentId>>,
    /// Shared set of instruments with mark price subscriptions.
    mark_price_subs: Arc<AtomicSet<InstrumentId>>,
    /// Shared set of instruments with index price subscriptions.
    index_price_subs: Arc<AtomicSet<InstrumentId>>,
    /// Source of JSON-RPC request IDs; starts at 1.
    request_id_counter: AtomicU64,
    /// In-flight requests keyed by request ID, for response correlation.
    pending_requests: AHashMap<u64, PendingRequestType>,
    /// Account ID for order/fill reports, if known.
    account_id: Option<AccountId>,
    /// Order contexts keyed by venue order ID.
    order_contexts: AHashMap<VenueOrderId, OrderContext>,
    /// Bounded cache of venue order IDs; cleared on reconnect.
    // NOTE(review): presumably orders for which an accepted event was already emitted.
    emitted_accepted: FifoCache<VenueOrderId, 10_000>,
    /// Bounded cache of client order IDs already reported as terminal, used to
    /// suppress duplicate terminal events.
    terminal_orders: FifoCache<ClientOrderId, 10_000>,
    /// Bars held back per key; cleared on reconnect.
    // NOTE(review): key is presumably the chart channel name; confirm in the chart handler.
    pending_bars: AHashMap<String, Bar>,
    /// Bar timestamping option passed in at construction.
    bars_timestamp_on_close: bool,
    /// Last account state per key; cleared on reconnect.
    last_account_states: AHashMap<String, AccountState>,
    /// Last seen book sequence number per key; cleared on reconnect.
    book_sequence: AHashMap<Ustr, u64>,
    /// Book channels waiting to be resubscribed after an unsubscribe is acknowledged.
    pending_book_resync: Vec<String>,
    /// Queue of parsed messages awaiting emission; cleared on reconnect.
    pending_outgoing: VecDeque<NautilusWsMessage>,
    /// Shared list that collects venue subscribe rejections.
    subscribe_errors: Arc<Mutex<Vec<String>>>,
}
240
241impl DeribitWsFeedHandler {
242 /// Creates a new feed handler.
243 #[expect(clippy::too_many_arguments)]
244 #[must_use]
245 pub fn new(
246 signal: Arc<AtomicBool>,
247 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
248 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
249 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
250 auth_tracker: AuthTracker,
251 subscriptions_state: SubscriptionState,
252 option_greeks_subs: Arc<AtomicSet<InstrumentId>>,
253 mark_price_subs: Arc<AtomicSet<InstrumentId>>,
254 index_price_subs: Arc<AtomicSet<InstrumentId>>,
255 account_id: Option<AccountId>,
256 bars_timestamp_on_close: bool,
257 subscribe_errors: Arc<Mutex<Vec<String>>>,
258 ) -> Self {
259 Self {
260 clock: get_atomic_clock_realtime(),
261 signal,
262 inner: None,
263 cmd_rx,
264 raw_rx,
265 out_tx,
266 auth_tracker,
267 subscriptions_state,
268 retry_manager: create_websocket_retry_manager(),
269 instruments_cache: AHashMap::new(),
270 option_greeks_subs,
271 mark_price_subs,
272 index_price_subs,
273 request_id_counter: AtomicU64::new(1),
274 pending_requests: AHashMap::new(),
275 account_id,
276 order_contexts: AHashMap::new(),
277 emitted_accepted: FifoCache::new(),
278 terminal_orders: FifoCache::new(),
279 pending_bars: AHashMap::new(),
280 bars_timestamp_on_close,
281 last_account_states: AHashMap::new(),
282 book_sequence: AHashMap::new(),
283 pending_book_resync: Vec::new(),
284 pending_outgoing: VecDeque::new(),
285 subscribe_errors,
286 }
287 }
288
289 /// Sets the account ID for order/fill reports.
290 pub fn set_account_id(&mut self, account_id: AccountId) {
291 self.account_id = Some(account_id);
292 }
293
294 /// Returns the account ID.
295 #[must_use]
296 pub fn account_id(&self) -> Option<AccountId> {
297 self.account_id
298 }
299
300 fn clear_state(&mut self) {
301 let pending_count = self.pending_requests.len();
302 let emitted_count = self.emitted_accepted.len();
303 let bars_count = self.pending_bars.len();
304 let account_count = self.last_account_states.len();
305 let book_count = self.book_sequence.len();
306 let outgoing_count = self.pending_outgoing.len();
307
308 self.pending_requests.clear();
309 self.emitted_accepted.clear();
310 self.pending_bars.clear();
311 self.last_account_states.clear();
312 self.book_sequence.clear();
313 self.pending_book_resync.clear();
314 self.pending_outgoing.clear();
315
316 log::debug!(
317 "Reset state: pending_requests={pending_count}, emitted_accepted={emitted_count}, \
318 pending_bars={bars_count}, account_states={account_count}, book_sequence={book_count}, \
319 pending_outgoing={outgoing_count}"
320 );
321 }
322
323 /// Generates a unique request ID.
324 fn next_request_id(&self) -> u64 {
325 self.request_id_counter.fetch_add(1, Ordering::Relaxed)
326 }
327
328 /// Returns the current timestamp.
329 fn ts_init(&self) -> UnixNanos {
330 self.clock.get_time_ns()
331 }
332
333 /// Checks if there's a pending buy/sell request for the given client_order_id.
334 ///
335 /// This is used to avoid emitting duplicate OrderAccepted events from the
336 /// user.orders subscription when the response path will also emit an event.
337 fn is_pending_order(&self, client_order_id: &ClientOrderId) -> bool {
338 self.pending_requests.values().any(|req| match req {
339 PendingRequestType::Buy {
340 client_order_id: id,
341 ..
342 }
343 | PendingRequestType::Sell {
344 client_order_id: id,
345 ..
346 } => id == client_order_id,
347 _ => false,
348 })
349 }
350
351 /// Gets the OrderContext from a pending buy/sell request by client_order_id.
352 ///
353 /// Returns None if no pending request found.
354 fn get_pending_order_context(&self, client_order_id: &ClientOrderId) -> Option<OrderContext> {
355 for req in self.pending_requests.values() {
356 match req {
357 PendingRequestType::Buy {
358 client_order_id: id,
359 trader_id,
360 strategy_id,
361 instrument_id,
362 }
363 | PendingRequestType::Sell {
364 client_order_id: id,
365 trader_id,
366 strategy_id,
367 instrument_id,
368 } if id == client_order_id => {
369 return Some(OrderContext {
370 client_order_id: *id,
371 trader_id: *trader_id,
372 strategy_id: *strategy_id,
373 instrument_id: *instrument_id,
374 });
375 }
376 _ => {}
377 }
378 }
379 None
380 }
381
382 async fn send_tracked_request(
383 &mut self,
384 request_id: u64,
385 payload: Result<String, DeribitWsError>,
386 rate_limit_keys: Option<&[Ustr]>,
387 ) -> Result<(), DeribitWsError> {
388 let payload = match payload {
389 Ok(p) => p,
390 Err(e) => {
391 self.pending_requests.remove(&request_id);
392 return Err(e);
393 }
394 };
395 let result = self.send_with_retry(payload, rate_limit_keys).await;
396 if result.is_err() {
397 self.pending_requests.remove(&request_id);
398 }
399 result
400 }
401
402 /// Sends a message over the WebSocket with retry logic.
403 async fn send_with_retry(
404 &self,
405 payload: String,
406 rate_limit_keys: Option<&[Ustr]>,
407 ) -> Result<(), DeribitWsError> {
408 if let Some(client) = &self.inner {
409 let keys_owned: Option<Vec<Ustr>> = rate_limit_keys.map(|k| k.to_vec());
410 self.retry_manager
411 .execute_with_retry(
412 "websocket_send",
413 || {
414 let payload = payload.clone();
415 let keys = keys_owned.clone();
416 async move {
417 client
418 .send_text(payload, keys.as_deref())
419 .await
420 .map_err(|e| DeribitWsError::Send(e.to_string()))
421 }
422 },
423 |e| matches!(e, DeribitWsError::Send(_)),
424 DeribitWsError::Timeout,
425 )
426 .await
427 } else {
428 Err(DeribitWsError::NotConnected)
429 }
430 }
431
432 /// Handles a subscribe command.
433 ///
434 /// Note: The client has already called `mark_subscribe` before sending this command.
435 async fn handle_subscribe(&mut self, channels: Vec<String>) -> Result<(), DeribitWsError> {
436 let request_id = self.next_request_id();
437
438 // Track this request for response correlation
439 self.pending_requests.insert(
440 request_id,
441 PendingRequestType::Subscribe {
442 channels: channels.clone(),
443 },
444 );
445
446 // Deribit requires private/subscribe for authenticated channels
447 let method = if channels
448 .iter()
449 .any(|ch| DeribitWsChannel::requires_auth(ch))
450 {
451 "private/subscribe"
452 } else {
453 "public/subscribe"
454 };
455
456 let request = DeribitJsonRpcRequest::new(
457 request_id,
458 method,
459 DeribitSubscribeParams {
460 channels: channels.clone(),
461 },
462 );
463
464 let payload =
465 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()));
466
467 log::debug!("Subscribing to channels: request_id={request_id}, channels={channels:?}");
468 self.send_tracked_request(request_id, payload, None).await
469 }
470
471 /// Handles an unsubscribe command.
472 async fn handle_unsubscribe(&mut self, channels: Vec<String>) -> Result<(), DeribitWsError> {
473 let request_id = self.next_request_id();
474
475 // Track this request for response correlation
476 self.pending_requests.insert(
477 request_id,
478 PendingRequestType::Unsubscribe {
479 channels: channels.clone(),
480 },
481 );
482
483 let method = if channels
484 .iter()
485 .any(|ch| DeribitWsChannel::requires_auth(ch))
486 {
487 "private/unsubscribe"
488 } else {
489 "public/unsubscribe"
490 };
491
492 let request = DeribitJsonRpcRequest::new(
493 request_id,
494 method,
495 DeribitSubscribeParams {
496 channels: channels.clone(),
497 },
498 );
499
500 let payload =
501 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()));
502
503 log::debug!("Unsubscribing from channels: request_id={request_id}, channels={channels:?}");
504 self.send_tracked_request(request_id, payload, None).await
505 }
506
507 /// Handles enabling heartbeat.
508 async fn handle_set_heartbeat(&mut self, interval: u64) -> Result<(), DeribitWsError> {
509 let request_id = self.next_request_id();
510
511 // Track this request for response correlation
512 self.pending_requests
513 .insert(request_id, PendingRequestType::SetHeartbeat);
514
515 let request = DeribitJsonRpcRequest::new(
516 request_id,
517 "public/set_heartbeat",
518 DeribitHeartbeatParams { interval },
519 );
520
521 let payload =
522 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()));
523
524 log::debug!(
525 "Enabling heartbeat with interval: request_id={request_id}, interval={interval} seconds"
526 );
527 self.send_tracked_request(request_id, payload, None).await
528 }
529
530 /// Responds to a heartbeat test_request.
531 async fn handle_heartbeat_test_request(&mut self) -> Result<(), DeribitWsError> {
532 let request_id = self.next_request_id();
533
534 // Track this request for response correlation
535 self.pending_requests
536 .insert(request_id, PendingRequestType::Test);
537
538 let request = DeribitJsonRpcRequest::new(request_id, "public/test", serde_json::json!({}));
539
540 let payload =
541 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()));
542
543 log::trace!("Responding to heartbeat test_request: request_id={request_id}");
544 self.send_tracked_request(request_id, payload, None).await
545 }
546
547 /// Handles a buy order command.
548 async fn handle_buy(
549 &mut self,
550 params: DeribitOrderParams,
551 client_order_id: ClientOrderId,
552 trader_id: TraderId,
553 strategy_id: StrategyId,
554 instrument_id: InstrumentId,
555 ) -> Result<(), DeribitWsError> {
556 let request_id = self.next_request_id();
557
558 self.pending_requests.insert(
559 request_id,
560 PendingRequestType::Buy {
561 client_order_id,
562 trader_id,
563 strategy_id,
564 instrument_id,
565 },
566 );
567
568 let request = DeribitJsonRpcRequest::new(request_id, "private/buy", params);
569
570 let payload =
571 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()));
572
573 log::debug!("Sending buy order: request_id={request_id}");
574 self.send_tracked_request(
575 request_id,
576 payload,
577 Some(DERIBIT_RATE_LIMIT_KEY_ORDER.as_slice()),
578 )
579 .await
580 }
581
582 /// Handles a sell order command.
583 async fn handle_sell(
584 &mut self,
585 params: DeribitOrderParams,
586 client_order_id: ClientOrderId,
587 trader_id: TraderId,
588 strategy_id: StrategyId,
589 instrument_id: InstrumentId,
590 ) -> Result<(), DeribitWsError> {
591 let request_id = self.next_request_id();
592
593 self.pending_requests.insert(
594 request_id,
595 PendingRequestType::Sell {
596 client_order_id,
597 trader_id,
598 strategy_id,
599 instrument_id,
600 },
601 );
602
603 let request = DeribitJsonRpcRequest::new(request_id, "private/sell", params);
604
605 let payload =
606 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()));
607
608 log::debug!("Sending sell order: request_id={request_id}");
609 self.send_tracked_request(
610 request_id,
611 payload,
612 Some(DERIBIT_RATE_LIMIT_KEY_ORDER.as_slice()),
613 )
614 .await
615 }
616
617 /// Handles an edit order command.
618 async fn handle_edit(
619 &mut self,
620 params: DeribitEditParams,
621 client_order_id: ClientOrderId,
622 trader_id: TraderId,
623 strategy_id: StrategyId,
624 instrument_id: InstrumentId,
625 ) -> Result<(), DeribitWsError> {
626 let request_id = self.next_request_id();
627 let order_id = params.order_id.clone();
628
629 self.pending_requests.insert(
630 request_id,
631 PendingRequestType::Edit {
632 client_order_id,
633 trader_id,
634 strategy_id,
635 instrument_id,
636 },
637 );
638
639 let request = DeribitJsonRpcRequest::new(request_id, "private/edit", params);
640
641 let payload =
642 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()));
643
644 log::debug!("Sending edit order: request_id={request_id}, order_id={order_id}");
645 self.send_tracked_request(
646 request_id,
647 payload,
648 Some(DERIBIT_RATE_LIMIT_KEY_ORDER.as_slice()),
649 )
650 .await
651 }
652
653 /// Handles a cancel order command.
654 async fn handle_cancel(
655 &mut self,
656 params: DeribitCancelParams,
657 client_order_id: ClientOrderId,
658 trader_id: TraderId,
659 strategy_id: StrategyId,
660 instrument_id: InstrumentId,
661 ) -> Result<(), DeribitWsError> {
662 let request_id = self.next_request_id();
663 let order_id = params.order_id.clone();
664
665 self.pending_requests.insert(
666 request_id,
667 PendingRequestType::Cancel {
668 client_order_id,
669 trader_id,
670 strategy_id,
671 instrument_id,
672 },
673 );
674
675 let request = DeribitJsonRpcRequest::new(request_id, "private/cancel", params);
676
677 let payload =
678 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()));
679
680 log::debug!("Sending cancel order: request_id={request_id}, order_id={order_id}");
681 self.send_tracked_request(
682 request_id,
683 payload,
684 Some(DERIBIT_RATE_LIMIT_KEY_ORDER.as_slice()),
685 )
686 .await
687 }
688
689 /// Handles cancel all orders by instrument command.
690 async fn handle_cancel_all_by_instrument(
691 &mut self,
692 params: DeribitCancelAllByInstrumentParams,
693 instrument_id: InstrumentId,
694 ) -> Result<(), DeribitWsError> {
695 let request_id = self.next_request_id();
696 let instrument_name = params.instrument_name.clone();
697
698 // Track this request for response correlation
699 self.pending_requests.insert(
700 request_id,
701 PendingRequestType::CancelAllByInstrument { instrument_id },
702 );
703
704 let request =
705 DeribitJsonRpcRequest::new(request_id, "private/cancel_all_by_instrument", params);
706
707 let payload =
708 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()));
709
710 log::debug!(
711 "Sending cancel_all_by_instrument: request_id={request_id}, instrument={instrument_name}"
712 );
713 self.send_tracked_request(
714 request_id,
715 payload,
716 Some(DERIBIT_RATE_LIMIT_KEY_ORDER.as_slice()),
717 )
718 .await
719 }
720
721 /// Handles get order state command.
722 async fn handle_get_order_state(
723 &mut self,
724 order_id: String,
725 client_order_id: ClientOrderId,
726 trader_id: TraderId,
727 strategy_id: StrategyId,
728 instrument_id: InstrumentId,
729 ) -> Result<(), DeribitWsError> {
730 let request_id = self.next_request_id();
731
732 // Track this request for response correlation
733 self.pending_requests.insert(
734 request_id,
735 PendingRequestType::GetOrderState {
736 client_order_id,
737 trader_id,
738 strategy_id,
739 instrument_id,
740 },
741 );
742
743 let params = serde_json::json!({
744 "order_id": order_id
745 });
746
747 let request = DeribitJsonRpcRequest::new(request_id, "private/get_order_state", params);
748
749 let payload =
750 serde_json::to_string(&request).map_err(|e| DeribitWsError::Json(e.to_string()));
751
752 log::debug!("Sending get_order_state: request_id={request_id}, order_id={order_id}");
753 self.send_tracked_request(
754 request_id,
755 payload,
756 Some(DERIBIT_RATE_LIMIT_KEY_ORDER.as_slice()),
757 )
758 .await
759 }
760
761 /// Processes a command from the client.
762 async fn process_command(&mut self, cmd: HandlerCommand) {
763 match cmd {
764 HandlerCommand::SetClient(client) => {
765 log::debug!("Setting WebSocket client");
766 self.inner = Some(client);
767 }
768 HandlerCommand::Disconnect => {
769 log::debug!("Disconnecting WebSocket");
770
771 if let Some(client) = self.inner.take() {
772 client.disconnect().await;
773 }
774 }
775 HandlerCommand::Authenticate { auth_params } => {
776 let request_id = self.next_request_id();
777 log::debug!("Authenticating: request_id={request_id}");
778
779 // Track this request for response correlation
780 self.pending_requests
781 .insert(request_id, PendingRequestType::Authenticate);
782
783 let request = DeribitJsonRpcRequest::new(request_id, "public/auth", auth_params);
784 match serde_json::to_string(&request) {
785 Ok(payload) => {
786 if let Err(e) = self.send_with_retry(payload, None).await {
787 self.pending_requests.remove(&request_id);
788 log::error!("Authentication send failed: {e}");
789 self.auth_tracker.fail(format!("Send failed: {e}"));
790 }
791 }
792 Err(e) => {
793 self.pending_requests.remove(&request_id);
794 log::error!("Failed to serialize auth request: {e}");
795 self.auth_tracker.fail(format!("Serialization failed: {e}"));
796 }
797 }
798 }
799 HandlerCommand::SetHeartbeat { interval } => {
800 if let Err(e) = self.handle_set_heartbeat(interval).await {
801 log::error!("Set heartbeat failed: {e}");
802 }
803 }
804 HandlerCommand::InitializeInstruments(instruments) => {
805 log::info!("Handler received {} instruments", instruments.len());
806 self.instruments_cache.clear();
807 for inst in instruments {
808 self.instruments_cache
809 .insert(inst.raw_symbol().inner(), inst);
810 }
811 }
812 HandlerCommand::UpdateInstrument(instrument) => {
813 log::trace!("Updating instrument: {}", instrument.raw_symbol());
814 self.instruments_cache
815 .insert(instrument.raw_symbol().inner(), *instrument);
816 }
817 HandlerCommand::Subscribe { channels } => {
818 if let Err(e) = self.handle_subscribe(channels).await {
819 log::error!("Subscribe failed: {e}");
820 }
821 }
822 HandlerCommand::Unsubscribe { channels } => {
823 // User-initiated unsubscribe cancels any pending book resync
824 // for these channels so we don't re-subscribe against user intent
825 self.pending_book_resync.retain(|ch| !channels.contains(ch));
826
827 if let Err(e) = self.handle_unsubscribe(channels).await {
828 log::error!("Unsubscribe failed: {e}");
829 }
830 }
831 HandlerCommand::Buy {
832 params,
833 client_order_id,
834 trader_id,
835 strategy_id,
836 instrument_id,
837 } => {
838 if let Err(e) = self
839 .handle_buy(
840 params,
841 client_order_id,
842 trader_id,
843 strategy_id,
844 instrument_id,
845 )
846 .await
847 {
848 log::error!("Buy order failed: {e}");
849 }
850 }
851 HandlerCommand::Sell {
852 params,
853 client_order_id,
854 trader_id,
855 strategy_id,
856 instrument_id,
857 } => {
858 if let Err(e) = self
859 .handle_sell(
860 params,
861 client_order_id,
862 trader_id,
863 strategy_id,
864 instrument_id,
865 )
866 .await
867 {
868 log::error!("Sell order failed: {e}");
869 }
870 }
871 HandlerCommand::Edit {
872 params,
873 client_order_id,
874 trader_id,
875 strategy_id,
876 instrument_id,
877 } => {
878 if let Err(e) = self
879 .handle_edit(
880 params,
881 client_order_id,
882 trader_id,
883 strategy_id,
884 instrument_id,
885 )
886 .await
887 {
888 log::error!("Edit order failed: {e}");
889 }
890 }
891 HandlerCommand::Cancel {
892 params,
893 client_order_id,
894 trader_id,
895 strategy_id,
896 instrument_id,
897 } => {
898 if let Err(e) = self
899 .handle_cancel(
900 params,
901 client_order_id,
902 trader_id,
903 strategy_id,
904 instrument_id,
905 )
906 .await
907 {
908 log::error!("Cancel order failed: {e}");
909 }
910 }
911 HandlerCommand::CancelAllByInstrument {
912 params,
913 instrument_id,
914 } => {
915 if let Err(e) = self
916 .handle_cancel_all_by_instrument(params, instrument_id)
917 .await
918 {
919 log::error!("Cancel all by instrument failed: {e}");
920 }
921 }
922 HandlerCommand::GetOrderState {
923 order_id,
924 client_order_id,
925 trader_id,
926 strategy_id,
927 instrument_id,
928 } => {
929 if let Err(e) = self
930 .handle_get_order_state(
931 order_id,
932 client_order_id,
933 trader_id,
934 strategy_id,
935 instrument_id,
936 )
937 .await
938 {
939 log::error!("Get order state failed: {e}");
940 }
941 }
942 }
943 }
944
945 /// Processes a raw WebSocket message.
946 async fn process_raw_message(&mut self, text: &str) -> Option<NautilusWsMessage> {
947 if text == RECONNECTED {
948 log::info!("Received reconnection signal");
949
950 self.auth_tracker.invalidate();
951 self.clear_state();
952
953 return Some(NautilusWsMessage::Reconnected);
954 }
955
956 // Parse the JSON-RPC message
957 let ws_msg = match parse_raw_message(text) {
958 Ok(msg) => msg,
959 Err(e) => {
960 log::warn!("Failed to parse message: {e}");
961 return None;
962 }
963 };
964
965 let ts_init = self.ts_init();
966
967 match ws_msg {
968 DeribitWsMessage::Response(response) => {
969 // Look up the request type by ID for explicit correlation
970 if let Some(request_id) = response.id
971 && let Some(request_type) = self.pending_requests.remove(&request_id)
972 {
973 match request_type {
974 PendingRequestType::Authenticate => {
975 if let Some(error) = &response.error {
976 let reason = format!(
977 "Authentication error code={}: {}",
978 error.code, error.message
979 );
980 log::error!(
981 "Authentication failed: code={}, message={}, request_id={}",
982 error.code,
983 error.message,
984 request_id
985 );
986 self.auth_tracker.fail(reason.clone());
987 return Some(NautilusWsMessage::AuthenticationFailed(reason));
988 } else if let Some(result) = &response.result {
989 match serde_json::from_value::<DeribitAuthResult>(result.clone()) {
990 Ok(auth_result) => {
991 self.auth_tracker.succeed();
992 log::debug!(
993 "WebSocket authenticated successfully (request_id={}, scope={}, expires_in={}s)",
994 request_id,
995 auth_result.scope,
996 auth_result.expires_in
997 );
998 return Some(NautilusWsMessage::Authenticated(Box::new(
999 auth_result,
1000 )));
1001 }
1002 Err(e) => {
1003 let reason = format!("Failed to parse auth result: {e}");
1004 log::error!("{reason}: request_id={request_id}");
1005 self.auth_tracker.fail(reason.clone());
1006 return Some(NautilusWsMessage::AuthenticationFailed(
1007 reason,
1008 ));
1009 }
1010 }
1011 }
1012 }
1013 PendingRequestType::Subscribe { channels } => {
1014 if let Some(error) = &response.error {
1015 log::error!(
1016 "Subscribe failed: code={}, message={}, channels={:?}, request_id={}",
1017 error.code,
1018 error.message,
1019 channels,
1020 request_id
1021 );
1022
1023 if let Ok(mut errors) = self.subscribe_errors.lock() {
1024 errors.push(format!(
1025 "Subscribe rejected: code={}, message={}",
1026 error.code, error.message,
1027 ));
1028 }
1029 } else {
1030 // Confirm each channel in the subscription
1031 for ch in &channels {
1032 self.subscriptions_state.confirm_subscribe(ch);
1033 log::debug!("Subscription confirmed: {ch}");
1034 }
1035 }
1036 }
1037 PendingRequestType::Unsubscribe { channels } => {
1038 if let Some(error) = &response.error {
1039 log::error!(
1040 "Unsubscribe failed: code={}, message={}, channels={:?}, request_id={}",
1041 error.code,
1042 error.message,
1043 channels,
1044 request_id
1045 );
1046 } else {
1047 for ch in &channels {
1048 self.subscriptions_state.confirm_unsubscribe(ch);
1049 log::debug!("Unsubscription confirmed: {ch}");
1050 }
1051 }
1052
1053 // Resubscribe channels pending book resync (kept in
1054 // pending_book_resync until a fresh snapshot arrives)
1055 if !self.pending_book_resync.is_empty() {
1056 let resync: Vec<String> = channels
1057 .iter()
1058 .filter(|ch| self.pending_book_resync.contains(ch))
1059 .cloned()
1060 .collect();
1061
1062 if !resync.is_empty() {
1063 let _ = self.handle_subscribe(resync).await;
1064 }
1065 }
1066 }
1067 PendingRequestType::SetHeartbeat => {
1068 if let Some(error) = &response.error {
1069 log::error!(
1070 "Set heartbeat failed: code={}, message={}, request_id={}",
1071 error.code,
1072 error.message,
1073 request_id
1074 );
1075 } else {
1076 log::debug!("Heartbeat enabled (request_id={request_id})");
1077 }
1078 }
1079 PendingRequestType::Test => {
1080 if let Some(error) = &response.error {
1081 log::warn!(
1082 "Heartbeat test failed: code={}, message={}, request_id={}",
1083 error.code,
1084 error.message,
1085 request_id
1086 );
1087 } else {
1088 log::trace!(
1089 "Heartbeat test acknowledged (request_id={request_id})"
1090 );
1091 }
1092 }
1093 PendingRequestType::Cancel {
1094 client_order_id,
1095 trader_id,
1096 strategy_id,
1097 instrument_id,
1098 } => {
1099 if let Some(result) = &response.result {
1100 match serde_json::from_value::<DeribitOrderMsg>(result.clone()) {
1101 Ok(order_msg) => {
1102 let venue_order_id =
1103 VenueOrderId::new(order_msg.order_id.as_str());
1104 log::debug!(
1105 "Cancel confirmed: venue_order_id={venue_order_id}, \
1106 client_order_id={client_order_id}, state={}",
1107 order_msg.order_state
1108 );
1109
1110 // Emit OrderCanceled from the response path so we
1111 // do not lose the event during a reconnection gap.
1112 // Both paths check terminal_orders to suppress
1113 // duplicates regardless of which arrives first.
1114 if order_msg.order_state == "cancelled"
1115 && !self.terminal_orders.contains(&client_order_id)
1116 {
1117 let instrument_name_ustr =
1118 Ustr::from(order_msg.instrument_name.as_str());
1119
1120 if let Some(instrument) =
1121 self.instruments_cache.get(&instrument_name_ustr)
1122 && let Some(account_id) = self.account_id
1123 {
1124 let event = parse_order_canceled(
1125 &order_msg,
1126 instrument,
1127 account_id,
1128 trader_id,
1129 strategy_id,
1130 ts_init,
1131 );
1132 // Keep order_contexts so the subscription
1133 // path resolves the same client_order_id
1134 // and hits the terminal_orders dedup check
1135 self.terminal_orders.add(client_order_id);
1136 return Some(NautilusWsMessage::OrderCanceled(
1137 event,
1138 ));
1139 }
1140 }
1141 }
1142 Err(e) => {
1143 log::error!(
1144 "Failed to parse cancel response: request_id={request_id}, error={e}"
1145 );
1146 }
1147 }
1148 } else if let Some(error) = &response.error {
1149 log::error!(
1150 "Cancel rejected: code={}, message={}, client_order_id={}",
1151 error.code,
1152 error.message,
1153 client_order_id
1154 );
1155 return Some(NautilusWsMessage::OrderCancelRejected(
1156 OrderCancelRejected::new(
1157 trader_id,
1158 strategy_id,
1159 instrument_id,
1160 client_order_id,
1161 ustr::ustr(&format!(
1162 "code={}: {}",
1163 error.code, error.message
1164 )),
1165 UUID4::new(),
1166 ts_init,
1167 ts_init,
1168 false,
1169 None, // venue_order_id not available in error response
1170 self.account_id,
1171 ),
1172 ));
1173 }
1174 }
1175 PendingRequestType::CancelAllByInstrument { instrument_id } => {
1176 if let Some(result) = &response.result {
1177 match serde_json::from_value::<u64>(result.clone()) {
1178 Ok(count) => {
1179 log::info!(
1180 "Cancelled {count} orders for instrument {instrument_id}"
1181 );
1182 // Individual order status updates come via user.orders subscription
1183 }
1184 Err(e) => {
1185 log::warn!("Failed to parse cancel_all response: {e}");
1186 }
1187 }
1188 } else if let Some(error) = &response.error {
1189 log::error!(
1190 "Cancel all by instrument rejected: code={}, message={}, instrument_id={}",
1191 error.code,
1192 error.message,
1193 instrument_id
1194 );
1195 }
1196 }
1197 PendingRequestType::Buy {
1198 client_order_id,
1199 trader_id,
1200 strategy_id,
1201 instrument_id,
1202 }
1203 | PendingRequestType::Sell {
1204 client_order_id,
1205 trader_id,
1206 strategy_id,
1207 instrument_id,
1208 } => {
1209 if let Some(result) = &response.result {
1210 match serde_json::from_value::<DeribitOrderResponse>(result.clone())
1211 {
1212 Ok(order_response) => {
1213 let venue_order_id_str = &order_response.order.order_id;
1214 let venue_order_id =
1215 VenueOrderId::new(venue_order_id_str.as_str());
1216 let order_state = &order_response.order.order_state;
1217 log::debug!(
1218 "Order response: venue_order_id={venue_order_id}, client_order_id={client_order_id}, state={order_state}"
1219 );
1220
1221 self.order_contexts.insert(
1222 venue_order_id,
1223 OrderContext {
1224 client_order_id,
1225 trader_id,
1226 strategy_id,
1227 instrument_id,
1228 },
1229 );
1230
1231 // Skip OrderAccepted if order already reached terminal state
1232 if self.terminal_orders.contains(&client_order_id) {
1233 log::debug!(
1234 "Skipping OrderAccepted for terminal order: client_order_id={client_order_id}"
1235 );
1236 self.emitted_accepted.add(venue_order_id);
1237 } else if order_state == "filled" {
1238 // Order went directly Submitted -> Filled (e.g., market orders)
1239 log::debug!(
1240 "Skipping OrderAccepted for already filled order: venue_order_id={venue_order_id}, client_order_id={client_order_id}"
1241 );
1242 self.terminal_orders.add(client_order_id);
1243 self.emitted_accepted.add(venue_order_id);
1244 } else {
1245 let instrument_name_ustr = Ustr::from(
1246 order_response.order.instrument_name.as_str(),
1247 );
1248
1249 if let Some(instrument) =
1250 self.instruments_cache.get(&instrument_name_ustr)
1251 {
1252 if let Some(account_id) = self.account_id {
1253 let event = parse_order_accepted(
1254 &order_response.order,
1255 instrument,
1256 account_id,
1257 trader_id,
1258 strategy_id,
1259 ts_init,
1260 );
1261 // Mark OrderAccepted as emitted to prevent duplicate from subscription
1262 self.emitted_accepted.add(venue_order_id);
1263 return Some(NautilusWsMessage::OrderAccepted(
1264 event,
1265 ));
1266 } else {
1267 log::warn!(
1268 "Cannot create OrderAccepted: account_id not set"
1269 );
1270 }
1271 } else {
1272 log::warn!(
1273 "Instrument {instrument_name_ustr} not found in cache for order response"
1274 );
1275 }
1276 }
1277 }
1278 Err(e) => {
1279 log::error!(
1280 "Failed to parse order response: request_id={request_id}, error={e}"
1281 );
1282 return Some(NautilusWsMessage::OrderRejected(
1283 OrderRejected::new(
1284 trader_id,
1285 strategy_id,
1286 instrument_id,
1287 client_order_id,
1288 self.account_id
1289 .unwrap_or(AccountId::new("DERIBIT-UNKNOWN")),
1290 ustr::ustr(&format!(
1291 "Failed to parse response: {e}"
1292 )),
1293 UUID4::new(),
1294 ts_init,
1295 ts_init,
1296 false,
1297 false,
1298 ),
1299 ));
1300 }
1301 }
1302 } else if let Some(error) = &response.error {
1303 let due_post_only = error.code == DERIBIT_POST_ONLY_ERROR_CODE;
1304 let reason = if let Some(data) = &error.data {
1305 format!(
1306 "code={}: {} (data: {})",
1307 error.code, error.message, data
1308 )
1309 } else {
1310 format!("code={}: {}", error.code, error.message)
1311 };
1312
1313 log::debug!(
1314 "Order rejected: {reason}, client_order_id={client_order_id}"
1315 );
1316 return Some(NautilusWsMessage::OrderRejected(OrderRejected::new(
1317 trader_id,
1318 strategy_id,
1319 instrument_id,
1320 client_order_id,
1321 self.account_id.unwrap_or(AccountId::new("DERIBIT-UNKNOWN")),
1322 ustr::ustr(&reason),
1323 UUID4::new(),
1324 ts_init,
1325 ts_init,
1326 false,
1327 due_post_only,
1328 )));
1329 }
1330 }
1331 PendingRequestType::Edit {
1332 client_order_id,
1333 trader_id,
1334 strategy_id,
1335 instrument_id,
1336 } => {
1337 if let Some(result) = &response.result {
1338 match serde_json::from_value::<DeribitOrderResponse>(result.clone())
1339 {
1340 Ok(order_response) => {
1341 let venue_order_id =
1342 VenueOrderId::new(&order_response.order.order_id);
1343 log::info!(
1344 "Order updated: venue_order_id={}, client_order_id={}, state={}",
1345 venue_order_id,
1346 client_order_id,
1347 order_response.order.order_state
1348 );
1349
1350 self.order_contexts.insert(
1351 venue_order_id,
1352 OrderContext {
1353 client_order_id,
1354 trader_id,
1355 strategy_id,
1356 instrument_id,
1357 },
1358 );
1359
1360 let instrument_name_ustr = Ustr::from(
1361 order_response.order.instrument_name.as_str(),
1362 );
1363
1364 if let Some(instrument) =
1365 self.instruments_cache.get(&instrument_name_ustr)
1366 {
1367 if let Some(account_id) = self.account_id {
1368 let event = parse_order_updated(
1369 &order_response.order,
1370 instrument,
1371 account_id,
1372 trader_id,
1373 strategy_id,
1374 ts_init,
1375 );
1376 return Some(NautilusWsMessage::OrderUpdated(
1377 event,
1378 ));
1379 } else {
1380 log::warn!(
1381 "Cannot create OrderUpdated: account_id not set"
1382 );
1383 }
1384 } else {
1385 log::warn!(
1386 "Instrument {instrument_name_ustr} not found in cache for edit response"
1387 );
1388 }
1389 }
1390 Err(e) => {
1391 log::error!(
1392 "Failed to parse edit response: request_id={request_id}, error={e}"
1393 );
1394 return Some(NautilusWsMessage::OrderModifyRejected(
1395 OrderModifyRejected::new(
1396 trader_id,
1397 strategy_id,
1398 instrument_id,
1399 client_order_id,
1400 ustr::ustr(&format!(
1401 "Failed to parse response: {e}"
1402 )),
1403 UUID4::new(),
1404 ts_init,
1405 ts_init,
1406 false,
1407 None, // venue_order_id not available
1408 self.account_id,
1409 ),
1410 ));
1411 }
1412 }
1413 } else if let Some(error) = &response.error {
1414 log::error!(
1415 "Order modify rejected: code={}, message={}, client_order_id={}",
1416 error.code,
1417 error.message,
1418 client_order_id
1419 );
1420 return Some(NautilusWsMessage::OrderModifyRejected(
1421 OrderModifyRejected::new(
1422 trader_id,
1423 strategy_id,
1424 instrument_id,
1425 client_order_id,
1426 ustr::ustr(&format!(
1427 "code={}: {}",
1428 error.code, error.message
1429 )),
1430 UUID4::new(),
1431 ts_init,
1432 ts_init,
1433 false,
1434 None, // venue_order_id not available
1435 self.account_id,
1436 ),
1437 ));
1438 }
1439 }
1440 PendingRequestType::GetOrderState {
1441 client_order_id,
1442 trader_id: _,
1443 strategy_id: _,
1444 instrument_id: _,
1445 } => {
1446 if let Some(result) = &response.result {
1447 match serde_json::from_value::<DeribitOrderMsg>(result.clone()) {
1448 Ok(order_msg) => {
1449 log::info!(
1450 "Order state received: venue_order_id={}, client_order_id={}, state={}",
1451 order_msg.order_id,
1452 client_order_id,
1453 order_msg.order_state
1454 );
1455
1456 // Convert to OrderStatusReport
1457 let instrument_name_ustr = order_msg.instrument_name;
1458
1459 if let Some(instrument) =
1460 self.instruments_cache.get(&instrument_name_ustr)
1461 {
1462 if let Some(account_id) = self.account_id {
1463 match parse_user_order_msg(
1464 &order_msg, instrument, account_id, ts_init,
1465 ) {
1466 Ok(report) => {
1467 return Some(
1468 NautilusWsMessage::OrderStatusReports(
1469 vec![report],
1470 ),
1471 );
1472 }
1473 Err(e) => {
1474 log::warn!(
1475 "Failed to parse get_order_state response to report: {e}"
1476 );
1477 }
1478 }
1479 } else {
1480 log::warn!(
1481 "Cannot create OrderStatusReport: account_id not set"
1482 );
1483 }
1484 } else {
1485 log::warn!(
1486 "Instrument {instrument_name_ustr} not found in cache for get_order_state response"
1487 );
1488 }
1489 }
1490 Err(e) => {
1491 log::error!(
1492 "Failed to parse get_order_state response: request_id={request_id}, error={e}"
1493 );
1494 }
1495 }
1496 } else if let Some(error) = &response.error {
1497 log::error!(
1498 "Get order state failed: code={}, message={}, client_order_id={}",
1499 error.code,
1500 error.message,
1501 client_order_id
1502 );
1503 }
1504 }
1505 }
1506 } else if let Some(request_id) = response.id {
1507 // Response with ID but no matching pending request
1508 if let Some(error) = &response.error {
1509 // Log orphaned error response with all available context
1510 log::error!(
1511 "Deribit error for unknown request: code={}, message={}, request_id={}, data={:?}",
1512 error.code,
1513 error.message,
1514 request_id,
1515 error.data
1516 );
1517 return Some(NautilusWsMessage::Error(DeribitWsError::DeribitError {
1518 code: error.code,
1519 message: error.message.clone(),
1520 }));
1521 } else {
1522 // Success response but no pending request - likely already processed
1523 log::debug!(
1524 "Received response for unknown request_id={}, result present: {}",
1525 request_id,
1526 response.result.is_some()
1527 );
1528 }
1529 } else if let Some(error) = &response.error {
1530 // Error response with no ID (shouldn't happen in JSON-RPC 2.0, but handle it)
1531 log::error!(
1532 "Deribit error with no request_id: code={}, message={}, data={:?}",
1533 error.code,
1534 error.message,
1535 error.data
1536 );
1537 return Some(NautilusWsMessage::Error(DeribitWsError::DeribitError {
1538 code: error.code,
1539 message: error.message.clone(),
1540 }));
1541 }
1542 None
1543 }
1544 DeribitWsMessage::Notification(notification) => {
1545 let channel = ¬ification.params.channel;
1546 let data = ¬ification.params.data;
1547
1548 // Determine channel type and parse accordingly
1549 if let Some(channel_type) = DeribitWsChannel::from_channel_string(channel) {
1550 match channel_type {
1551 DeribitWsChannel::Trades => {
1552 // Parse trade messages
1553 match serde_json::from_value::<Vec<DeribitTradeMsg>>(data.clone()) {
1554 Ok(trades) => {
1555 log::debug!("Received {} trades", trades.len());
1556 let data_vec = parse_trades_data(
1557 &trades,
1558 &self.instruments_cache,
1559 ts_init,
1560 );
1561
1562 if data_vec.is_empty() {
1563 log::debug!(
1564 "No trades parsed - instrument cache size: {}",
1565 self.instruments_cache.len()
1566 );
1567 } else {
1568 log::debug!("Parsed {} trade ticks", data_vec.len());
1569 return Some(NautilusWsMessage::Data(data_vec));
1570 }
1571 }
1572 Err(e) => {
1573 log::warn!("Failed to deserialize trades: {e}");
1574 }
1575 }
1576 }
1577 DeribitWsChannel::Book => {
1578 // Parse order book messages
1579 match serde_json::from_value::<DeribitBookMsg>(data.clone()) {
1580 Ok(book_msg) => {
1581 if let Some(instrument) =
1582 self.instruments_cache.get(&book_msg.instrument_name)
1583 {
1584 let inst_name = book_msg.instrument_name.to_string();
1585 let awaiting_resync =
1586 self.pending_book_resync.iter().any(|ch| {
1587 ch.starts_with("book.")
1588 && ch
1589 .split('.')
1590 .nth(1)
1591 .is_some_and(|s| s == inst_name)
1592 });
1593
1594 if awaiting_resync
1595 && book_msg.msg_type == DeribitBookMsgType::Change
1596 {
1597 // Drop deltas while awaiting resync snapshot
1598 } else if awaiting_resync
1599 && book_msg.msg_type == DeribitBookMsgType::Snapshot
1600 {
1601 self.pending_book_resync.retain(|ch| {
1602 !(ch.starts_with("book.")
1603 && ch
1604 .split('.')
1605 .nth(1)
1606 .is_some_and(|s| s == inst_name))
1607 });
1608 self.book_sequence.insert(
1609 book_msg.instrument_name,
1610 book_msg.change_id,
1611 );
1612
1613 match parse_book_msg(&book_msg, instrument, ts_init) {
1614 Ok(deltas) => {
1615 return Some(NautilusWsMessage::Deltas(deltas));
1616 }
1617 Err(e) => {
1618 log::warn!("Failed to parse book message: {e}");
1619 }
1620 }
1621 } else if book_msg.msg_type == DeribitBookMsgType::Change
1622 && let Some(prev_id) = book_msg.prev_change_id
1623 && let Some(&last_id) =
1624 self.book_sequence.get(&book_msg.instrument_name)
1625 && prev_id != last_id
1626 {
1627 log::error!(
1628 "Book sequence gap for {}: expected prev_change_id={}, was {} \
1629 - dropping delta, forcing resync",
1630 book_msg.instrument_name,
1631 last_id,
1632 prev_id
1633 );
1634 self.book_sequence.remove(&book_msg.instrument_name);
1635
1636 let book_channels: Vec<String> = self
1637 .subscriptions_state
1638 .all_topics()
1639 .into_iter()
1640 .filter(|t| {
1641 t.starts_with("book.")
1642 && t.split('.')
1643 .nth(1)
1644 .is_some_and(|s| s == inst_name)
1645 })
1646 .collect();
1647
1648 if !book_channels.is_empty() {
1649 for ch in &book_channels {
1650 self.subscriptions_state.mark_failure(ch);
1651 }
1652 // Defer resubscribe until unsubscribe ack
1653 self.pending_book_resync
1654 .extend(book_channels.clone());
1655 let _ =
1656 self.handle_unsubscribe(book_channels).await;
1657 }
1658 } else {
1659 self.book_sequence.insert(
1660 book_msg.instrument_name,
1661 book_msg.change_id,
1662 );
1663
1664 match parse_book_msg(&book_msg, instrument, ts_init) {
1665 Ok(deltas) => {
1666 return Some(NautilusWsMessage::Deltas(deltas));
1667 }
1668 Err(e) => {
1669 log::warn!("Failed to parse book message: {e}");
1670 }
1671 }
1672 }
1673 } else {
1674 log::warn!(
1675 "Book message received but instrument '{}' not found in cache (cache size: {})",
1676 book_msg.instrument_name,
1677 self.instruments_cache.len()
1678 );
1679 }
1680 }
1681 Err(e) => {
1682 log::warn!(
1683 "Failed to deserialize book message: {e}, channel: {channel}"
1684 );
1685 }
1686 }
1687 }
1688 DeribitWsChannel::Ticker => {
1689 if let Ok(ticker_msg) =
1690 serde_json::from_value::<DeribitTickerMsg>(data.clone())
1691 && let Some(instrument) =
1692 self.instruments_cache.get(&ticker_msg.instrument_name)
1693 {
1694 // Emit OptionGreeks only if subscribed
1695 if self.option_greeks_subs.contains(&instrument.id())
1696 && let Some(option_greeks) = parse_ticker_to_option_greeks(
1697 &ticker_msg,
1698 instrument,
1699 ts_init,
1700 )
1701 {
1702 let _ = self
1703 .out_tx
1704 .send(NautilusWsMessage::OptionGreeks(option_greeks));
1705 }
1706
1707 let instrument_id = instrument.id();
1708 let mut data_vec = Vec::new();
1709
1710 // Emit MarkPriceUpdate only if subscribed
1711 if self.mark_price_subs.contains(&instrument_id) {
1712 match parse_ticker_to_mark_price(
1713 &ticker_msg,
1714 instrument,
1715 ts_init,
1716 ) {
1717 Ok(mark_price) => {
1718 data_vec.push(Data::MarkPriceUpdate(mark_price));
1719 }
1720 Err(e) => log::warn!("Failed to parse mark price: {e}"),
1721 }
1722 }
1723
1724 // Emit IndexPriceUpdate only if subscribed
1725 if self.index_price_subs.contains(&instrument_id) {
1726 match parse_ticker_to_index_price(
1727 &ticker_msg,
1728 instrument,
1729 ts_init,
1730 ) {
1731 Ok(index_price) => {
1732 data_vec.push(Data::IndexPriceUpdate(index_price));
1733 }
1734 Err(e) => log::warn!("Failed to parse index price: {e}"),
1735 }
1736 }
1737
1738 if !data_vec.is_empty() {
1739 return Some(NautilusWsMessage::Data(data_vec));
1740 }
1741 }
1742 }
1743 DeribitWsChannel::Perpetual => {
1744 // Parse perpetual channel for funding rate updates
1745 // This channel is dedicated to perpetual instruments and provides
1746 // the interest (funding) rate
1747 match serde_json::from_value::<DeribitPerpetualMsg>(data.clone()) {
1748 Ok(perpetual_msg) => {
1749 // Extract instrument name from channel: perpetual.{instrument}.{interval}
1750 let parts: Vec<&str> = channel.split('.').collect();
1751 if parts.len() >= 2 {
1752 let instrument_name = Ustr::from(parts[1]);
1753
1754 if let Some(instrument) =
1755 self.instruments_cache.get(&instrument_name)
1756 {
1757 let funding_rate = parse_perpetual_to_funding_rate(
1758 &perpetual_msg,
1759 instrument,
1760 ts_init,
1761 );
1762 return Some(NautilusWsMessage::FundingRates(vec![
1763 funding_rate,
1764 ]));
1765 } else {
1766 log::warn!(
1767 "Instrument {} not found in cache (cache size: {})",
1768 instrument_name,
1769 self.instruments_cache.len()
1770 );
1771 }
1772 }
1773 }
1774 Err(e) => {
1775 log::warn!(
1776 "Failed to deserialize perpetual message: {e}, data: {data}"
1777 );
1778 }
1779 }
1780 }
1781 DeribitWsChannel::Quote => {
1782 // Parse quote messages
1783 if let Ok(quote_msg) =
1784 serde_json::from_value::<DeribitQuoteMsg>(data.clone())
1785 && let Some(instrument) =
1786 self.instruments_cache.get("e_msg.instrument_name)
1787 {
1788 match parse_quote_msg("e_msg, instrument, ts_init) {
1789 Ok(quote) => {
1790 return Some(NautilusWsMessage::Data(vec![Data::Quote(
1791 quote,
1792 )]));
1793 }
1794 Err(e) => {
1795 log::warn!("Failed to parse quote message: {e}");
1796 }
1797 }
1798 }
1799 }
1800 DeribitWsChannel::InstrumentState => {
1801 match serde_json::from_value::<DeribitInstrumentStateMsg>(data.clone())
1802 {
1803 Ok(state_msg) => {
1804 log::info!(
1805 "Instrument state change: {} -> {} (timestamp: {})",
1806 state_msg.instrument_name,
1807 state_msg.state,
1808 state_msg.timestamp
1809 );
1810
1811 let instrument_id = if let Some(instrument) =
1812 self.instruments_cache.get(&state_msg.instrument_name)
1813 {
1814 instrument.id()
1815 } else {
1816 log::debug!(
1817 "Instrument '{}' not in cache, constructing ID",
1818 state_msg.instrument_name
1819 );
1820 InstrumentId::new(
1821 Symbol::new(state_msg.instrument_name),
1822 *DERIBIT_VENUE,
1823 )
1824 };
1825
1826 let action = MarketStatusAction::from(state_msg.state);
1827 let is_trading =
1828 Some(state_msg.state == DeribitInstrumentState::Started);
1829 let ts_event = UnixNanos::from(state_msg.timestamp * 1_000_000);
1830 let status = InstrumentStatus::new(
1831 instrument_id,
1832 action,
1833 ts_event,
1834 ts_init,
1835 None,
1836 None,
1837 is_trading,
1838 None,
1839 None,
1840 );
1841 return Some(NautilusWsMessage::InstrumentStatus(status));
1842 }
1843 Err(e) => {
1844 log::warn!("Failed to parse instrument status message: {e}");
1845 }
1846 }
1847 }
1848 DeribitWsChannel::ChartTrades => {
1849 // Parse chart.trades messages into Bar objects using emit-on-next pattern.
1850 // Deribit sends updates for the current bar as it builds. We only emit
1851 // a bar when we receive a bar with a different timestamp, confirming
1852 // the previous bar is closed.
1853 if let Ok(chart_msg) =
1854 serde_json::from_value::<DeribitChartMsg>(data.clone())
1855 {
1856 // Extract instrument and resolution from channel
1857 // Channel format: chart.trades.{instrument}.{resolution}
1858 let parts: Vec<&str> = channel.split('.').collect();
1859 if parts.len() >= 4 {
1860 let instrument_name = Ustr::from(parts[2]);
1861 let resolution = parts[3];
1862
1863 if let Some(instrument) =
1864 self.instruments_cache.get(&instrument_name)
1865 {
1866 let instrument_id = instrument.id();
1867
1868 match resolution_to_bar_type(instrument_id, resolution) {
1869 Ok(bar_type) => {
1870 let price_precision = instrument.price_precision();
1871 let size_precision = instrument.size_precision();
1872
1873 match parse_chart_msg(
1874 &chart_msg,
1875 bar_type,
1876 price_precision,
1877 size_precision,
1878 self.bars_timestamp_on_close,
1879 ts_init,
1880 ) {
1881 Ok(new_bar) => {
1882 // Check if we have a pending bar for this channel
1883 let channel_key = channel.clone();
1884
1885 if let Some(pending_bar) =
1886 self.pending_bars.get(&channel_key)
1887 {
1888 // If new bar has different timestamp, the pending bar is closed
1889 if new_bar.ts_event
1890 != pending_bar.ts_event
1891 {
1892 let closed_bar = *pending_bar;
1893 self.pending_bars
1894 .insert(channel_key, new_bar);
1895 log::debug!(
1896 "Emitting closed bar: {closed_bar:?}"
1897 );
1898 return Some(
1899 NautilusWsMessage::Data(vec![
1900 Data::Bar(closed_bar),
1901 ]),
1902 );
1903 }
1904 // Same timestamp - update pending bar with latest values
1905 self.pending_bars
1906 .insert(channel_key, new_bar);
1907 } else {
1908 // First bar for this channel - store as pending
1909 self.pending_bars
1910 .insert(channel_key, new_bar);
1911 }
1912 }
1913 Err(e) => {
1914 log::warn!(
1915 "Failed to parse chart message to bar: {e}"
1916 );
1917 }
1918 }
1919 }
1920 Err(e) => {
1921 log::warn!(
1922 "Failed to create BarType from resolution {resolution}: {e}"
1923 );
1924 }
1925 }
1926 } else {
1927 log::warn!(
1928 "Instrument {instrument_name} not found in cache for chart data"
1929 );
1930 }
1931 }
1932 }
1933 }
1934 DeribitWsChannel::UserOrders => {
1935 // Handle both array and single object responses
1936 let orders_result =
1937 serde_json::from_value::<Vec<DeribitOrderMsg>>(data.clone())
1938 .or_else(|_| {
1939 serde_json::from_value::<DeribitOrderMsg>(data.clone())
1940 .map(|order| vec![order])
1941 });
1942
1943 match orders_result {
1944 Ok(orders) => {
1945 log::debug!("Received {} user order updates", orders.len());
1946
1947 // Require account_id for parsing
1948 let Some(account_id) = self.account_id else {
1949 log::warn!("Cannot parse user orders: account_id not set");
1950 return Some(NautilusWsMessage::Raw(data.clone()));
1951 };
1952
1953 let mut outgoing = Vec::new();
1954
1955 // Process each order and emit appropriate events
1956 for order in &orders {
1957 let venue_order_id_str = &order.order_id;
1958 let venue_order_id =
1959 VenueOrderId::new(venue_order_id_str.as_str());
1960 let instrument_name = order.instrument_name;
1961
1962 let Some(instrument) =
1963 self.instruments_cache.get(&instrument_name)
1964 else {
1965 log::warn!(
1966 "Instrument {instrument_name} not found in cache"
1967 );
1968 continue;
1969 };
1970
1971 // Look up OrderContext for this order
1972 // First check order_contexts (for orders whose response has been processed)
1973 // Then check pending_requests (for orders whose response hasn't arrived yet)
1974 // If neither found, this is a true external order
1975 let context =
1976 self.order_contexts.get(&venue_order_id).cloned();
1977
1978 // Extract client_order_id from order label for pending check
1979 let label_client_order_id = order
1980 .label
1981 .as_ref()
1982 .filter(|l| !l.is_empty())
1983 .map(ClientOrderId::new);
1984
1985 // Check for pending request if not in order_contexts
1986 let pending_context = if context.is_none() {
1987 if let Some(client_id) = &label_client_order_id {
1988 self.get_pending_order_context(client_id)
1989 } else {
1990 None
1991 }
1992 } else {
1993 None
1994 };
1995
1996 // Check if order has a pending request for context resolution
1997 let has_pending_request =
1998 if let Some(client_id) = &label_client_order_id {
1999 self.is_pending_order(client_id)
2000 } else {
2001 false
2002 };
2003
2004 let effective_context = context.or(pending_context);
2005 let is_known_order =
2006 effective_context.is_some() || has_pending_request;
2007
2008 // Determine event type based on order state
2009 let event_type = determine_order_event_type(
2010 &order.order_state,
2011 !is_known_order, // is_new if we don't know about it
2012 false, // not from edit response
2013 );
2014
2015 let (trader_id, strategy_id, client_order_id) =
2016 if let Some(ctx) = effective_context {
2017 (
2018 ctx.trader_id,
2019 ctx.strategy_id,
2020 ctx.client_order_id,
2021 )
2022 } else {
2023 // External order - use default values
2024 // Note: These won't match any strategy, which is correct
2025 (
2026 TraderId::new("EXTERNAL-000"),
2027 StrategyId::new("EXTERNAL"),
2028 ClientOrderId::new(venue_order_id_str),
2029 )
2030 };
2031
2032 match event_type {
2033 OrderEventType::Accepted => {
2034 // Skip if order already reached terminal state (race condition)
2035 if self.terminal_orders.contains(&client_order_id) {
2036 log::debug!(
2037 "Skipping OrderAccepted for terminal order: client_order_id={client_order_id}"
2038 );
2039 continue;
2040 }
2041
2042 // Check if we already emitted OrderAccepted for this order
2043 // This prevents duplicates from both response and subscription paths
2044 if self.emitted_accepted.contains(&venue_order_id) {
2045 log::trace!(
2046 "Skipping duplicate OrderAccepted: venue_order_id={venue_order_id}"
2047 );
2048 continue;
2049 }
2050
2051 let event = parse_order_accepted(
2052 order,
2053 instrument,
2054 account_id,
2055 trader_id,
2056 strategy_id,
2057 ts_init,
2058 );
2059
2060 // Mark OrderAccepted as emitted
2061 self.emitted_accepted.add(venue_order_id);
2062
2063 log::debug!(
2064 "Emitting OrderAccepted: venue_order_id={venue_order_id}, is_known={is_known_order}"
2065 );
2066 outgoing
2067 .push(NautilusWsMessage::OrderAccepted(event));
2068 }
2069 OrderEventType::Canceled => {
2070 // Skip if already emitted from the cancel
2071 // response path
2072 if self.terminal_orders.contains(&client_order_id) {
2073 log::trace!(
2074 "Skipping duplicate OrderCanceled: client_order_id={client_order_id}"
2075 );
2076 continue;
2077 }
2078
2079 let event = parse_order_canceled(
2080 order,
2081 instrument,
2082 account_id,
2083 trader_id,
2084 strategy_id,
2085 ts_init,
2086 );
2087 log::debug!(
2088 "Emitting OrderCanceled: venue_order_id={venue_order_id}"
2089 );
2090 self.terminal_orders.add(client_order_id);
2091 self.order_contexts.remove(&venue_order_id);
2092 self.emitted_accepted.remove(&venue_order_id);
2093 outgoing
2094 .push(NautilusWsMessage::OrderCanceled(event));
2095 }
2096 OrderEventType::Expired => {
2097 let event = parse_order_expired(
2098 order,
2099 instrument,
2100 account_id,
2101 trader_id,
2102 strategy_id,
2103 ts_init,
2104 );
2105 log::debug!(
2106 "Emitting OrderExpired: venue_order_id={venue_order_id}"
2107 );
2108 self.terminal_orders.add(client_order_id);
2109 self.order_contexts.remove(&venue_order_id);
2110 self.emitted_accepted.remove(&venue_order_id);
2111 outgoing
2112 .push(NautilusWsMessage::OrderExpired(event));
2113 }
2114 OrderEventType::Updated => {
2115 // Emit OrderStatusReport for updates
2116 // This includes quantity/price changes from modify
2117 match parse_user_order_msg(
2118 order, instrument, account_id, ts_init,
2119 ) {
2120 Ok(report) => {
2121 log::debug!(
2122 "Emitting OrderStatusReport (updated): venue_order_id={venue_order_id}"
2123 );
2124 outgoing.push(
2125 NautilusWsMessage::OrderStatusReports(
2126 vec![report],
2127 ),
2128 );
2129 }
2130 Err(e) => {
2131 log::warn!(
2132 "Failed to parse order update: {e}"
2133 );
2134 }
2135 }
2136 }
2137 OrderEventType::None => {
2138 // Fills handled via user.trades, track terminal state
2139 // for race condition prevention
2140 if matches!(
2141 order.order_state.as_str(),
2142 "filled" | "rejected"
2143 ) {
2144 log::debug!(
2145 "Recording terminal order: venue_order_id={venue_order_id}, state={}",
2146 order.order_state
2147 );
2148 self.terminal_orders.add(client_order_id);
2149 self.order_contexts.remove(&venue_order_id);
2150 self.emitted_accepted.remove(&venue_order_id);
2151 } else {
2152 log::trace!(
2153 "No event to emit for order {}, state={}",
2154 venue_order_id,
2155 order.order_state
2156 );
2157 }
2158 }
2159 }
2160 }
2161
2162 if !outgoing.is_empty() {
2163 self.pending_outgoing.extend(outgoing);
2164 }
2165 }
2166 Err(e) => {
2167 log::warn!("Failed to deserialize user orders: {e}");
2168 }
2169 }
2170 }
2171 DeribitWsChannel::UserTrades => {
2172 // Handle both array and single object responses
2173 let trades_result =
2174 serde_json::from_value::<Vec<DeribitUserTradeMsg>>(data.clone())
2175 .or_else(|_| {
2176 serde_json::from_value::<DeribitUserTradeMsg>(data.clone())
2177 .map(|trade| vec![trade])
2178 });
2179
2180 match trades_result {
2181 Ok(trades) => {
2182 log::debug!("Received {} user trade updates", trades.len());
2183
2184 let Some(account_id) = self.account_id else {
2185 log::warn!("Cannot parse user trades: account_id not set");
2186 return Some(NautilusWsMessage::Raw(data.clone()));
2187 };
2188
2189 let mut reports = Vec::with_capacity(trades.len());
2190 for trade in &trades {
2191 let instrument_name = trade.instrument_name;
2192
2193 if let Some(instrument) =
2194 self.instruments_cache.get(&instrument_name)
2195 {
2196 match parse_user_trade_msg(
2197 trade, instrument, account_id, ts_init,
2198 ) {
2199 Ok(report) => {
2200 log::debug!(
2201 "Parsed fill report: {} @ {}",
2202 report.trade_id,
2203 report.last_px
2204 );
2205 reports.push(report);
2206 }
2207 Err(e) => {
2208 log::warn!(
2209 "Failed to parse trade {}: {e}",
2210 trade.trade_id
2211 );
2212 }
2213 }
2214 } else {
2215 log::warn!(
2216 "Instrument {instrument_name} not found in cache"
2217 );
2218 }
2219 }
2220
2221 if !reports.is_empty() {
2222 return Some(NautilusWsMessage::FillReports(reports));
2223 }
2224 }
2225 Err(e) => {
2226 log::warn!("Failed to deserialize user trades: {e}");
2227 }
2228 }
2229 }
2230 DeribitWsChannel::UserPortfolio => {
2231 match serde_json::from_value::<DeribitPortfolioMsg>(data.clone()) {
2232 Ok(portfolio) => {
2233 // Skip zero-balance currencies (common with cross-collateral)
2234 // Only check equity and balance - initial_margin can be non-zero
2235 // for all currencies when cross-collateral is enabled
2236 if portfolio.equity.is_zero() && portfolio.balance.is_zero() {
2237 log::trace!(
2238 "Skipping zero-balance portfolio for {}",
2239 portfolio.currency
2240 );
2241 return None;
2242 }
2243
2244 // Require account_id for parsing
2245 let Some(account_id) = self.account_id else {
2246 log::warn!("Cannot parse portfolio: account_id not set");
2247 return None;
2248 };
2249
2250 match parse_portfolio_to_account_state(
2251 &portfolio, account_id, ts_init,
2252 ) {
2253 Ok(account_state) => {
2254 // Check for duplicate per currency
2255 let currency_key = portfolio.currency.clone();
2256
2257 if let Some(last) =
2258 self.last_account_states.get(¤cy_key)
2259 && account_state.has_same_balances_and_margins(last)
2260 {
2261 log::trace!(
2262 "Skipping duplicate portfolio update for {}",
2263 portfolio.currency
2264 );
2265 return None;
2266 }
2267
2268 self.last_account_states
2269 .insert(currency_key, account_state.clone());
2270 return Some(NautilusWsMessage::AccountState(
2271 account_state,
2272 ));
2273 }
2274 Err(e) => {
2275 log::warn!(
2276 "Failed to parse portfolio to AccountState: {e}"
2277 );
2278 }
2279 }
2280 }
2281 Err(e) => {
2282 log::warn!("Failed to deserialize portfolio: {e}");
2283 }
2284 }
2285 }
2286 _ => {
2287 // Unhandled channel - return raw
2288 log::trace!("Unhandled channel: {channel}");
2289 return Some(NautilusWsMessage::Raw(data.clone()));
2290 }
2291 }
2292 } else {
2293 log::trace!("Unknown channel: {channel}");
2294 return Some(NautilusWsMessage::Raw(data.clone()));
2295 }
2296 None
2297 }
2298 DeribitWsMessage::Heartbeat(heartbeat) => {
2299 match heartbeat.heartbeat_type {
2300 DeribitHeartbeatType::TestRequest => {
2301 log::trace!(
2302 "Received heartbeat test_request - responding with public/test"
2303 );
2304
2305 if let Err(e) = self.handle_heartbeat_test_request().await {
2306 log::error!("Failed to respond to heartbeat test_request: {e}");
2307
2308 // Return error to signal connection may be unhealthy
2309 return Some(NautilusWsMessage::Error(DeribitWsError::Send(format!(
2310 "Heartbeat response failed: {e}"
2311 ))));
2312 }
2313 }
2314 DeribitHeartbeatType::Heartbeat => {
2315 log::trace!("Received heartbeat acknowledgment");
2316 }
2317 }
2318 None
2319 }
2320 DeribitWsMessage::Error(err) => {
2321 log::error!("Deribit error {}: {}", err.code, err.message);
2322 Some(NautilusWsMessage::Error(DeribitWsError::DeribitError {
2323 code: err.code,
2324 message: err.message,
2325 }))
2326 }
2327 DeribitWsMessage::Reconnected => Some(NautilusWsMessage::Reconnected),
2328 }
2329 }
2330
2331 /// Main message processing loop.
2332 ///
2333 /// Returns `None` when the handler should stop.
2334 /// Messages that need client-side handling (e.g., Reconnected) are returned.
2335 /// Data messages are sent directly to `out_tx` for the user stream.
2336 pub async fn next(&mut self) -> Option<NautilusWsMessage> {
2337 loop {
2338 if let Some(msg) = self.pending_outgoing.pop_front() {
2339 match msg {
2340 NautilusWsMessage::Reconnected
2341 | NautilusWsMessage::Authenticated(_)
2342 | NautilusWsMessage::AuthenticationFailed(_) => {
2343 return Some(msg);
2344 }
2345 _ => {
2346 let _ = self.out_tx.send(msg);
2347 continue;
2348 }
2349 }
2350 }
2351
2352 tokio::select! {
2353 // Process commands from client
2354 Some(cmd) = self.cmd_rx.recv() => {
2355 self.process_command(cmd).await;
2356 }
2357 // Process raw WebSocket messages
2358 Some(msg) = self.raw_rx.recv() => {
2359 match msg {
2360 Message::Text(text) => {
2361 if let Some(nautilus_msg) = self.process_raw_message(&text).await {
2362 // Send data messages to user stream
2363 match &nautilus_msg {
2364 NautilusWsMessage::Data(_)
2365 | NautilusWsMessage::Deltas(_)
2366 | NautilusWsMessage::Instrument(_)
2367 | NautilusWsMessage::InstrumentStatus(_)
2368 | NautilusWsMessage::OptionGreeks(_)
2369 | NautilusWsMessage::Raw(_)
2370 | NautilusWsMessage::Error(_) => {
2371 let _ = self.out_tx.send(nautilus_msg);
2372 }
2373 NautilusWsMessage::FundingRates(rates) => {
2374 let msg_to_send =
2375 NautilusWsMessage::FundingRates(rates.clone());
2376
2377 if let Err(e) = self.out_tx.send(msg_to_send) {
2378 log::error!("Failed to send funding rates: {e}");
2379 }
2380 }
2381 NautilusWsMessage::OrderStatusReports(_)
2382 | NautilusWsMessage::FillReports(_)
2383 | NautilusWsMessage::OrderAccepted(_)
2384 | NautilusWsMessage::OrderCanceled(_)
2385 | NautilusWsMessage::OrderExpired(_)
2386 | NautilusWsMessage::OrderUpdated(_)
2387 | NautilusWsMessage::OrderRejected(_)
2388 | NautilusWsMessage::OrderCancelRejected(_)
2389 | NautilusWsMessage::OrderModifyRejected(_)
2390 | NautilusWsMessage::AccountState(_) => {
2391 let _ = self.out_tx.send(nautilus_msg);
2392 }
2393 // Return messages that need client-side handling
2394 NautilusWsMessage::Reconnected
2395 | NautilusWsMessage::Authenticated(_)
2396 | NautilusWsMessage::AuthenticationFailed(_) => {
2397 return Some(nautilus_msg);
2398 }
2399 }
2400 }
2401 }
2402 Message::Ping(data) => {
2403 // Respond to ping with pong
2404 if let Some(client) = &self.inner {
2405 let _ = client.send_pong(data.to_vec()).await;
2406 }
2407 }
2408 Message::Close(_) => {
2409 log::info!("Received close frame");
2410 }
2411 _ => {}
2412 }
2413 }
2414 // Check for stop signal
2415 () = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
2416 if self.signal.load(Ordering::Relaxed) {
2417 log::debug!("Stop signal received");
2418 return None;
2419 }
2420 }
2421 }
2422 }
2423 }
2424}