Skip to main content

nautilus_dydx/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Message handler for dYdX WebSocket streams.
17//!
18//! The handler owns the WebSocketClient exclusively and runs in a dedicated
19//! Tokio task within the lock-free I/O boundary. It deserializes raw messages
20//! into venue-specific types without converting to Nautilus domain objects.
21
22use std::{
23    collections::VecDeque,
24    fmt::Debug,
25    sync::{
26        Arc,
27        atomic::{AtomicBool, Ordering},
28    },
29};
30
31use ahash::AHashMap;
32use nautilus_network::{
33    RECONNECTED,
34    retry::{RetryManager, create_websocket_retry_manager},
35    websocket::{SubscriptionState, WebSocketClient},
36};
37use tokio_tungstenite::tungstenite::Message;
38use ustr::Ustr;
39
40use super::{
41    DydxWsError, DydxWsResult,
42    client::DYDX_RATE_LIMIT_KEY_SUBSCRIPTION,
43    enums::{DydxWsChannel, DydxWsMessage, DydxWsOutputMessage},
44    error::DydxWebSocketError,
45    messages::{
46        DydxCandle, DydxMarketsContents, DydxOrderbookContents, DydxOrderbookSnapshotContents,
47        DydxSubscription, DydxTradeContents, DydxWsBlockHeightMessage, DydxWsCandlesMessage,
48        DydxWsChannelBatchDataMsg, DydxWsChannelDataMsg, DydxWsConnectedMsg, DydxWsFeedMessage,
49        DydxWsGenericMsg, DydxWsMarketsMessage, DydxWsOrderbookMessage,
50        DydxWsParentSubaccountsMessage, DydxWsSubaccountsChannelContents,
51        DydxWsSubaccountsChannelData, DydxWsSubaccountsMessage, DydxWsSubaccountsSubscribed,
52        DydxWsSubscriptionMsg, DydxWsTradesMessage,
53    },
54};
55
56/// Commands sent to the feed handler.
57#[derive(Debug, Clone)]
58pub enum HandlerCommand {
59    /// Registers a subscription message for replay.
60    RegisterSubscription {
61        topic: String,
62        subscription: DydxSubscription,
63    },
64    /// Unregisters a subscription message.
65    UnregisterSubscription { topic: String },
66    /// Sends a text message via WebSocket.
67    SendText(String),
68    /// Disconnects the WebSocket client.
69    Disconnect,
70}
71
72/// Deserializes incoming WebSocket messages into venue-specific types.
73///
74/// The handler owns the WebSocketClient exclusively within the lock-free I/O boundary,
75/// eliminating RwLock contention on the hot path.
76pub struct FeedHandler {
77    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
78    out_tx: tokio::sync::mpsc::UnboundedSender<DydxWsOutputMessage>,
79    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
80    client: WebSocketClient,
81    signal: Arc<AtomicBool>,
82    retry_manager: RetryManager<DydxWsError>,
83    subscriptions: SubscriptionState,
84    subscription_messages: AHashMap<String, DydxSubscription>,
85    message_buffer: VecDeque<DydxWsOutputMessage>,
86    book_sequence: AHashMap<String, u64>,
87}
88
89impl Debug for FeedHandler {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        f.debug_struct(stringify!(FeedHandler))
92            .field("subscriptions", &self.subscriptions.len())
93            .finish_non_exhaustive()
94    }
95}
96
97impl FeedHandler {
98    /// Creates a new [`FeedHandler`].
99    #[must_use]
100    pub fn new(
101        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
102        out_tx: tokio::sync::mpsc::UnboundedSender<DydxWsOutputMessage>,
103        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
104        client: WebSocketClient,
105        signal: Arc<AtomicBool>,
106        subscriptions: SubscriptionState,
107    ) -> Self {
108        Self {
109            cmd_rx,
110            out_tx,
111            raw_rx,
112            client,
113            signal,
114            retry_manager: create_websocket_retry_manager(),
115            subscriptions,
116            subscription_messages: AHashMap::new(),
117            message_buffer: VecDeque::new(),
118            book_sequence: AHashMap::new(),
119        }
120    }
121
122    async fn send_with_retry(
123        &self,
124        payload: String,
125        rate_limit_keys: Option<&[Ustr]>,
126    ) -> Result<(), DydxWsError> {
127        let keys_owned: Option<Vec<Ustr>> = rate_limit_keys.map(|k| k.to_vec());
128        self.retry_manager
129            .execute_with_retry(
130                "websocket_send",
131                || {
132                    let payload = payload.clone();
133                    let keys = keys_owned.clone();
134                    async move {
135                        self.client
136                            .send_text(payload, keys.as_deref())
137                            .await
138                            .map_err(|e| DydxWsError::ClientError(format!("Send failed: {e}")))
139                    }
140                },
141                should_retry_dydx_error,
142                create_dydx_timeout_error,
143            )
144            .await
145    }
146
147    /// Main processing loop for the handler.
148    ///
149    /// # Panics
150    ///
151    /// This method will not panic. The `expect` call on `iter.next()` is safe
152    /// because we explicitly check that `msgs` is not empty before calling it.
153    pub async fn run(&mut self) {
154        log::debug!("WebSocket handler started");
155
156        loop {
157            // First drain any buffered messages
158            if !self.message_buffer.is_empty() {
159                let msg = self.message_buffer.pop_front().unwrap();
160                if self.out_tx.send(msg).is_err() {
161                    log::debug!("Receiver dropped, stopping handler");
162                    break;
163                }
164                continue;
165            }
166
167            tokio::select! {
168                Some(cmd) = self.cmd_rx.recv() => {
169                    if self.handle_command(cmd).await {
170                        break;
171                    }
172                }
173
174                Some(msg) = self.raw_rx.recv() => {
175                    log::trace!("Handler received raw message");
176                    let msgs = self.process_raw_message(msg).await;
177                    if !msgs.is_empty() {
178                        let mut iter = msgs.into_iter();
179                        // We just checked that msgs is not empty
180                        let first = iter.next().expect("non-empty vec has first element");
181                        self.message_buffer.extend(iter);
182                        log::trace!("Handler sending message: {:?}", std::mem::discriminant(&first));
183                        if self.out_tx.send(first).is_err() {
184                            log::debug!("Receiver dropped, stopping handler");
185                            break;
186                        }
187                    }
188                }
189
190                else => {
191                    log::debug!("Handler shutting down: channels closed");
192                    break;
193                }
194            }
195
196            if self.signal.load(Ordering::Acquire) {
197                log::debug!("Handler received stop signal");
198                break;
199            }
200        }
201    }
202
203    async fn process_raw_message(&mut self, msg: Message) -> Vec<DydxWsOutputMessage> {
204        match msg {
205            Message::Text(txt) => {
206                if txt == RECONNECTED {
207                    self.clear_state();
208
209                    if let Err(e) = self.replay_subscriptions().await {
210                        log::error!("Failed to replay subscriptions after reconnect: {e}");
211                    }
212                    return vec![DydxWsOutputMessage::Reconnected];
213                }
214
215                // Hot path: zero-copy parse for feed messages (orderbook/trades/candles)
216                match serde_json::from_str::<DydxWsFeedMessage>(&txt) {
217                    Ok(feed_msg) => {
218                        return self.handle_feed_message(feed_msg);
219                    }
220                    Err(e) => {
221                        if txt.contains("v4_subaccounts") {
222                            log::warn!(
223                                "[WS_DESER] Failed to parse v4_subaccounts as DydxWsFeedMessage: {e}\nRaw: {txt}"
224                            );
225                        }
226                    }
227                }
228
229                // Cold path: infrequent control messages (connected/subscribed/error)
230                match serde_json::from_str::<serde_json::Value>(&txt) {
231                    Ok(val) => match serde_json::from_value::<DydxWsGenericMsg>(val.clone()) {
232                        Ok(meta) => {
233                            let result = if meta.is_connected() {
234                                serde_json::from_value::<DydxWsConnectedMsg>(val)
235                                    .map(DydxWsMessage::Connected)
236                            } else if meta.is_subscribed() {
237                                log::debug!("Processing subscribed message via fallback path");
238
239                                if let Ok(sub_msg) =
240                                    serde_json::from_value::<DydxWsSubscriptionMsg>(val.clone())
241                                {
242                                    if sub_msg.channel == DydxWsChannel::Subaccounts {
243                                        log::debug!("Parsing subaccounts subscription (fallback)");
244                                        serde_json::from_value::<DydxWsSubaccountsSubscribed>(val)
245                                            .map(DydxWsMessage::SubaccountsSubscribed)
246                                            .or_else(|e| {
247                                                log::warn!(
248                                                    "Failed to parse subaccounts subscription: {e}"
249                                                );
250                                                Ok(DydxWsMessage::Subscribed(sub_msg))
251                                            })
252                                    } else {
253                                        Ok(DydxWsMessage::Subscribed(sub_msg))
254                                    }
255                                } else {
256                                    serde_json::from_value::<DydxWsSubscriptionMsg>(val)
257                                        .map(DydxWsMessage::Subscribed)
258                                }
259                            } else if meta.is_unsubscribed() {
260                                serde_json::from_value::<DydxWsSubscriptionMsg>(val)
261                                    .map(DydxWsMessage::Unsubscribed)
262                            } else if meta.is_error() {
263                                serde_json::from_value::<DydxWebSocketError>(val)
264                                    .map(DydxWsMessage::Error)
265                            } else if meta.is_unknown() {
266                                log::warn!("Received unknown WebSocket message type: {txt}",);
267                                Ok(DydxWsMessage::Raw(val))
268                            } else {
269                                Ok(DydxWsMessage::Raw(val))
270                            };
271
272                            match result {
273                                Ok(dydx_msg) => self.handle_dydx_message(dydx_msg).await,
274                                Err(e) => {
275                                    log::error!(
276                                        "Failed to parse WebSocket message: {e}. Message type: {:?}, Channel: {:?}. Raw: {txt}",
277                                        meta.msg_type,
278                                        meta.channel,
279                                    );
280                                    vec![]
281                                }
282                            }
283                        }
284                        Err(e) => {
285                            log::error!(
286                                "Failed to parse WebSocket message envelope (DydxWsGenericMsg): {e}\nRaw JSON:\n{txt}"
287                            );
288                            vec![]
289                        }
290                    },
291                    Err(e) => {
292                        let err = DydxWebSocketError::from_message(e.to_string());
293                        vec![DydxWsOutputMessage::Error(err)]
294                    }
295                }
296            }
297            Message::Pong(_data) => vec![],
298            Message::Ping(_data) => vec![],
299            Message::Binary(_bin) => vec![],
300            Message::Close(_frame) => {
301                log::info!("WebSocket close frame received");
302                vec![]
303            }
304            Message::Frame(_) => vec![],
305        }
306    }
307
308    async fn handle_dydx_message(&mut self, msg: DydxWsMessage) -> Vec<DydxWsOutputMessage> {
309        match self.handle_message(msg).await {
310            Ok(msgs) => msgs,
311            Err(e) => {
312                log::error!("Error handling message: {e}");
313                vec![]
314            }
315        }
316    }
317
318    fn handle_feed_message(&mut self, feed_msg: DydxWsFeedMessage) -> Vec<DydxWsOutputMessage> {
319        log::trace!(
320            "Handling feed message: {:?}",
321            std::mem::discriminant(&feed_msg)
322        );
323
324        match feed_msg {
325            DydxWsFeedMessage::Subaccounts(msg) => self.handle_subaccounts(msg),
326            DydxWsFeedMessage::Orderbook(msg) => self.handle_orderbook(msg),
327            DydxWsFeedMessage::Trades(msg) => self.handle_trades(msg),
328            DydxWsFeedMessage::Markets(msg) => self.handle_markets_feed(msg),
329            DydxWsFeedMessage::Candles(msg) => self.handle_candles_feed(msg),
330            DydxWsFeedMessage::ParentSubaccounts(msg) => self.handle_parent_subaccounts(msg),
331            DydxWsFeedMessage::BlockHeight(msg) => self.handle_block_height_feed(msg),
332        }
333    }
334
335    fn handle_subaccounts(&self, msg: DydxWsSubaccountsMessage) -> Vec<DydxWsOutputMessage> {
336        match msg {
337            DydxWsSubaccountsMessage::Subscribed(data) => {
338                let topic =
339                    self.topic_from_msg(&DydxWsChannel::Subaccounts, &Some(data.id.clone()));
340                self.subscriptions.confirm_subscribe(&topic);
341                log::debug!("Forwarding subaccount subscription to execution client");
342                vec![DydxWsOutputMessage::SubaccountSubscribed(Box::new(data))]
343            }
344            DydxWsSubaccountsMessage::ChannelData(data) => {
345                let has_orders = data.contents.orders.as_ref().is_some_and(|o| !o.is_empty());
346                let has_fills = data.contents.fills.as_ref().is_some_and(|f| !f.is_empty());
347
348                if has_orders || has_fills {
349                    log::debug!(
350                        "Received {} order(s), {} fill(s) - forwarding to execution client",
351                        data.contents.orders.as_ref().map_or(0, |o| o.len()),
352                        data.contents.fills.as_ref().map_or(0, |f| f.len())
353                    );
354                    vec![DydxWsOutputMessage::SubaccountsChannelData(Box::new(data))]
355                } else {
356                    vec![]
357                }
358            }
359            DydxWsSubaccountsMessage::Unsubscribed(data) => {
360                let topic = self.topic_from_msg(&DydxWsChannel::Subaccounts, &data.id);
361                self.subscriptions.confirm_unsubscribe(&topic);
362                vec![]
363            }
364        }
365    }
366
367    fn handle_orderbook(&mut self, msg: DydxWsOrderbookMessage) -> Vec<DydxWsOutputMessage> {
368        match msg {
369            DydxWsOrderbookMessage::Subscribed(data) => {
370                let topic = self.topic_from_msg(&DydxWsChannel::Orderbook, &data.id);
371                self.subscriptions.confirm_subscribe(&topic);
372
373                if let Some(id) = &data.id {
374                    self.book_sequence.insert(id.clone(), data.message_id);
375                }
376
377                self.deserialize_orderbook_snapshot(&data)
378            }
379            DydxWsOrderbookMessage::ChannelData(data) => {
380                if let Some(id) = &data.id {
381                    if let Some(last_id) = self.book_sequence.get(id)
382                        && data.message_id <= *last_id
383                    {
384                        log::warn!(
385                            "Orderbook sequence regression for {id}: last {last_id}, received {}",
386                            data.message_id
387                        );
388                    }
389                    self.book_sequence.insert(id.clone(), data.message_id);
390                }
391                self.deserialize_orderbook_update(&data)
392            }
393            DydxWsOrderbookMessage::ChannelBatchData(data) => {
394                if let Some(id) = &data.id {
395                    if let Some(last_id) = self.book_sequence.get(id)
396                        && data.message_id <= *last_id
397                    {
398                        log::warn!(
399                            "Orderbook batch sequence regression for {id}: last {last_id}, received {}",
400                            data.message_id
401                        );
402                    }
403                    self.book_sequence.insert(id.clone(), data.message_id);
404                }
405                self.deserialize_orderbook_batch(&data)
406            }
407            DydxWsOrderbookMessage::Unsubscribed(data) => {
408                let topic = self.topic_from_msg(&DydxWsChannel::Orderbook, &data.id);
409                self.subscriptions.confirm_unsubscribe(&topic);
410
411                if let Some(id) = &data.id {
412                    self.book_sequence.remove(id);
413                }
414                vec![]
415            }
416        }
417    }
418
419    fn handle_trades(&self, msg: DydxWsTradesMessage) -> Vec<DydxWsOutputMessage> {
420        match msg {
421            DydxWsTradesMessage::Subscribed(data) => {
422                let topic = self.topic_from_msg(&DydxWsChannel::Trades, &data.id);
423                self.subscriptions.confirm_subscribe(&topic);
424                self.deserialize_trades(&data)
425            }
426            DydxWsTradesMessage::ChannelData(data) => self.deserialize_trades(&data),
427            DydxWsTradesMessage::Unsubscribed(data) => {
428                let topic = self.topic_from_msg(&DydxWsChannel::Trades, &data.id);
429                self.subscriptions.confirm_unsubscribe(&topic);
430                vec![]
431            }
432        }
433    }
434
435    fn handle_markets_feed(&self, msg: DydxWsMarketsMessage) -> Vec<DydxWsOutputMessage> {
436        match msg {
437            DydxWsMarketsMessage::Subscribed(data) => {
438                let topic = self.topic_from_msg(&DydxWsChannel::Markets, &data.id);
439                self.subscriptions.confirm_subscribe(&topic);
440                self.deserialize_markets(&data)
441            }
442            DydxWsMarketsMessage::ChannelData(data) => self.deserialize_markets(&data),
443            DydxWsMarketsMessage::Unsubscribed(data) => {
444                let topic = self.topic_from_msg(&DydxWsChannel::Markets, &data.id);
445                self.subscriptions.confirm_unsubscribe(&topic);
446                vec![]
447            }
448        }
449    }
450
451    fn handle_candles_feed(&self, msg: DydxWsCandlesMessage) -> Vec<DydxWsOutputMessage> {
452        match msg {
453            DydxWsCandlesMessage::Subscribed(data) => {
454                let topic = self.topic_from_msg(&DydxWsChannel::Candles, &data.id);
455                self.subscriptions.confirm_subscribe(&topic);
456                vec![]
457            }
458            DydxWsCandlesMessage::ChannelData(data) => self.deserialize_candles(&data),
459            DydxWsCandlesMessage::Unsubscribed(data) => {
460                let topic = self.topic_from_msg(&DydxWsChannel::Candles, &data.id);
461                self.subscriptions.confirm_unsubscribe(&topic);
462                vec![]
463            }
464        }
465    }
466
467    fn handle_parent_subaccounts(
468        &self,
469        msg: DydxWsParentSubaccountsMessage,
470    ) -> Vec<DydxWsOutputMessage> {
471        match msg {
472            DydxWsParentSubaccountsMessage::Subscribed(data) => {
473                let topic = self.topic_from_msg(&DydxWsChannel::ParentSubaccounts, &data.id);
474                self.subscriptions.confirm_subscribe(&topic);
475                self.deserialize_parent_subaccounts(&data)
476            }
477            DydxWsParentSubaccountsMessage::ChannelData(data) => {
478                self.deserialize_parent_subaccounts(&data)
479            }
480            DydxWsParentSubaccountsMessage::Unsubscribed(data) => {
481                let topic = self.topic_from_msg(&DydxWsChannel::ParentSubaccounts, &data.id);
482                self.subscriptions.confirm_unsubscribe(&topic);
483                vec![]
484            }
485        }
486    }
487
488    fn handle_block_height_feed(&self, msg: DydxWsBlockHeightMessage) -> Vec<DydxWsOutputMessage> {
489        match msg {
490            DydxWsBlockHeightMessage::Subscribed(data) => {
491                let topic =
492                    self.topic_from_msg(&DydxWsChannel::BlockHeight, &Some(data.id.clone()));
493                self.subscriptions.confirm_subscribe(&topic);
494
495                match data.contents.height.parse::<u64>() {
496                    Ok(height) => vec![DydxWsOutputMessage::BlockHeight {
497                        height,
498                        time: data.contents.time,
499                    }],
500                    Err(e) => {
501                        log::warn!("Failed to parse block height from subscription: {e}");
502                        vec![]
503                    }
504                }
505            }
506            DydxWsBlockHeightMessage::ChannelData(data) => {
507                match data.contents.block_height.parse::<u64>() {
508                    Ok(height) => vec![DydxWsOutputMessage::BlockHeight {
509                        height,
510                        time: data.contents.time,
511                    }],
512                    Err(e) => {
513                        log::warn!("Failed to parse block height from channel data: {e}");
514                        vec![]
515                    }
516                }
517            }
518            DydxWsBlockHeightMessage::Unsubscribed(data) => {
519                let topic = self.topic_from_msg(&DydxWsChannel::BlockHeight, &data.id);
520                self.subscriptions.confirm_unsubscribe(&topic);
521                vec![]
522            }
523        }
524    }
525
526    fn deserialize_trades(&self, data: &DydxWsChannelDataMsg) -> Vec<DydxWsOutputMessage> {
527        let Some(id) = data.id.clone() else {
528            log::error!("Missing id for trades channel");
529            return vec![];
530        };
531
532        match serde_json::from_value::<DydxTradeContents>(data.contents.clone()) {
533            Ok(contents) => vec![DydxWsOutputMessage::Trades { id, contents }],
534            Err(e) => {
535                log::error!("Failed to deserialize trade contents: {e}");
536                vec![]
537            }
538        }
539    }
540
541    fn deserialize_orderbook_snapshot(
542        &self,
543        data: &DydxWsChannelDataMsg,
544    ) -> Vec<DydxWsOutputMessage> {
545        let Some(id) = data.id.clone() else {
546            log::error!("Missing id for orderbook snapshot");
547            return vec![];
548        };
549
550        match serde_json::from_value::<DydxOrderbookSnapshotContents>(data.contents.clone()) {
551            Ok(contents) => vec![DydxWsOutputMessage::OrderbookSnapshot { id, contents }],
552            Err(e) => {
553                log::error!("Failed to deserialize orderbook snapshot: {e}");
554                vec![]
555            }
556        }
557    }
558
559    fn deserialize_orderbook_update(
560        &self,
561        data: &DydxWsChannelDataMsg,
562    ) -> Vec<DydxWsOutputMessage> {
563        let Some(id) = data.id.clone() else {
564            log::error!("Missing id for orderbook update");
565            return vec![];
566        };
567
568        match serde_json::from_value::<DydxOrderbookContents>(data.contents.clone()) {
569            Ok(contents) => vec![DydxWsOutputMessage::OrderbookUpdate { id, contents }],
570            Err(e) => {
571                log::error!("Failed to deserialize orderbook contents: {e}");
572                vec![]
573            }
574        }
575    }
576
577    fn deserialize_orderbook_batch(
578        &self,
579        data: &DydxWsChannelBatchDataMsg,
580    ) -> Vec<DydxWsOutputMessage> {
581        let Some(id) = data.id.clone() else {
582            log::error!("Missing id for orderbook batch");
583            return vec![];
584        };
585
586        match serde_json::from_value::<Vec<DydxOrderbookContents>>(data.contents.clone()) {
587            Ok(updates) => vec![DydxWsOutputMessage::OrderbookBatch { id, updates }],
588            Err(e) => {
589                log::error!("Failed to deserialize orderbook batch: {e}");
590                vec![]
591            }
592        }
593    }
594
595    fn deserialize_candles(&self, data: &DydxWsChannelDataMsg) -> Vec<DydxWsOutputMessage> {
596        let Some(id) = data.id.clone() else {
597            log::error!("Missing id for candles channel");
598            return vec![];
599        };
600
601        match serde_json::from_value::<DydxCandle>(data.contents.clone()) {
602            Ok(contents) => vec![DydxWsOutputMessage::Candles { id, contents }],
603            Err(e) => {
604                log::error!("Failed to deserialize candle contents: {e}");
605                vec![]
606            }
607        }
608    }
609
610    fn deserialize_markets(&self, data: &DydxWsChannelDataMsg) -> Vec<DydxWsOutputMessage> {
611        match serde_json::from_value::<DydxMarketsContents>(data.contents.clone()) {
612            Ok(contents) => vec![DydxWsOutputMessage::Markets(contents)],
613            Err(e) => {
614                log::error!("Failed to deserialize markets contents: {e}");
615                vec![]
616            }
617        }
618    }
619
620    fn deserialize_parent_subaccounts(
621        &self,
622        data: &DydxWsChannelDataMsg,
623    ) -> Vec<DydxWsOutputMessage> {
624        match serde_json::from_value::<DydxWsSubaccountsChannelContents>(data.contents.clone()) {
625            Ok(contents) => {
626                let has_orders = contents.orders.as_ref().is_some_and(|o| !o.is_empty());
627                let has_fills = contents.fills.as_ref().is_some_and(|f| !f.is_empty());
628
629                if has_orders || has_fills {
630                    let channel_data = DydxWsSubaccountsChannelData {
631                        connection_id: data.connection_id.clone(),
632                        message_id: data.message_id,
633                        id: data.id.clone().unwrap_or_default(),
634                        version: data.version.clone().unwrap_or_default(),
635                        contents,
636                    };
637                    vec![DydxWsOutputMessage::SubaccountsChannelData(Box::new(
638                        channel_data,
639                    ))]
640                } else {
641                    vec![]
642                }
643            }
644            Err(e) => {
645                log::error!("Failed to deserialize parent subaccounts contents: {e}");
646                vec![]
647            }
648        }
649    }
650
651    async fn handle_command(&mut self, command: HandlerCommand) -> bool {
652        match command {
653            HandlerCommand::RegisterSubscription {
654                topic,
655                subscription,
656            } => {
657                self.subscription_messages.insert(topic, subscription);
658            }
659            HandlerCommand::UnregisterSubscription { topic } => {
660                self.subscription_messages.remove(&topic);
661            }
662            HandlerCommand::SendText(text) => {
663                if let Err(e) = self
664                    .send_with_retry(text, Some(DYDX_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()))
665                    .await
666                {
667                    log::error!("Failed to send WebSocket text after retries: {e}");
668                }
669            }
670            HandlerCommand::Disconnect => {
671                log::debug!("Disconnect command received");
672                self.client.disconnect().await;
673                return true;
674            }
675        }
676        false
677    }
678
679    fn topic_from_msg(&self, channel: &DydxWsChannel, id: &Option<String>) -> String {
680        match id {
681            Some(id) => format!(
682                "{}{}{}",
683                channel.as_ref(),
684                self.subscriptions.delimiter(),
685                id
686            ),
687            None => channel.as_ref().to_string(),
688        }
689    }
690
691    fn clear_state(&mut self) {
692        let buffer_count = self.message_buffer.len();
693        let seq_count = self.book_sequence.len();
694        self.message_buffer.clear();
695        self.book_sequence.clear();
696        log::debug!(
697            "Cleared reconnect state: message_buffer={buffer_count}, book_sequence={seq_count}"
698        );
699    }
700
701    async fn replay_subscriptions(&self) -> DydxWsResult<()> {
702        let topics = self.subscriptions.all_topics();
703        for topic in topics {
704            let Some(subscription) = self.subscription_messages.get(&topic).cloned() else {
705                log::warn!("No preserved subscription message for topic: {topic}");
706                continue;
707            };
708
709            let payload = serde_json::to_string(&subscription)?;
710            self.subscriptions.mark_subscribe(&topic);
711
712            if let Err(e) = self
713                .send_with_retry(payload, Some(DYDX_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()))
714                .await
715            {
716                self.subscriptions.mark_failure(&topic);
717                return Err(e);
718            }
719        }
720
721        Ok(())
722    }
723
724    /// Handles control messages from the fallback parsing path.
725    ///
726    /// Channel data is handled directly via `handle_feed_message()`.
727    ///
728    /// # Errors
729    ///
730    /// Returns an error if the message cannot be processed.
731    pub async fn handle_message(
732        &mut self,
733        msg: DydxWsMessage,
734    ) -> DydxWsResult<Vec<DydxWsOutputMessage>> {
735        match msg {
736            DydxWsMessage::Connected(_) => {
737                log::info!("dYdX WebSocket connected");
738                Ok(vec![])
739            }
740            DydxWsMessage::Subscribed(sub) => {
741                log::debug!("Subscribed to {} (id: {:?})", sub.channel, sub.id);
742                let topic = self.topic_from_msg(&sub.channel, &sub.id);
743                self.subscriptions.confirm_subscribe(&topic);
744                Ok(vec![])
745            }
746            DydxWsMessage::SubaccountsSubscribed(msg) => {
747                log::debug!("Subaccounts subscribed with initial state (fallback path)");
748                let topic = self.topic_from_msg(&DydxWsChannel::Subaccounts, &Some(msg.id.clone()));
749                self.subscriptions.confirm_subscribe(&topic);
750                Ok(vec![DydxWsOutputMessage::SubaccountSubscribed(Box::new(
751                    msg,
752                ))])
753            }
754            DydxWsMessage::Unsubscribed(unsub) => {
755                log::debug!("Unsubscribed from {} (id: {:?})", unsub.channel, unsub.id);
756                let topic = self.topic_from_msg(&unsub.channel, &unsub.id);
757                self.subscriptions.confirm_unsubscribe(&topic);
758                Ok(vec![])
759            }
760            DydxWsMessage::Error(err) => Ok(vec![DydxWsOutputMessage::Error(err)]),
761            DydxWsMessage::Reconnected => {
762                self.clear_state();
763
764                if let Err(e) = self.replay_subscriptions().await {
765                    log::error!("Failed to replay subscriptions after reconnect message: {e}");
766                }
767                Ok(vec![DydxWsOutputMessage::Reconnected])
768            }
769            DydxWsMessage::Pong => Ok(vec![]),
770            DydxWsMessage::Raw(_) => Ok(vec![]),
771        }
772    }
773}
774
775/// Determines if a dYdX WebSocket error should trigger a retry.
776fn should_retry_dydx_error(error: &DydxWsError) -> bool {
777    match error {
778        DydxWsError::Transport(_) => true,
779        DydxWsError::Send(_) => true,
780        DydxWsError::ClientError(msg) => {
781            let msg_lower = msg.to_lowercase();
782            msg_lower.contains("timeout")
783                || msg_lower.contains("timed out")
784                || msg_lower.contains("connection")
785                || msg_lower.contains("network")
786        }
787        DydxWsError::NotConnected
788        | DydxWsError::Json(_)
789        | DydxWsError::Parse(_)
790        | DydxWsError::Authentication(_)
791        | DydxWsError::Subscription(_)
792        | DydxWsError::Venue(_) => false,
793    }
794}
795
796/// Creates a timeout error for the retry manager.
797fn create_dydx_timeout_error(msg: String) -> DydxWsError {
798    DydxWsError::ClientError(msg)
799}