1use std::{
23 collections::VecDeque,
24 fmt::Debug,
25 sync::{
26 Arc,
27 atomic::{AtomicBool, Ordering},
28 },
29};
30
31use ahash::AHashMap;
32use nautilus_network::{
33 RECONNECTED,
34 retry::{RetryManager, create_websocket_retry_manager},
35 websocket::{SubscriptionState, WebSocketClient},
36};
37use tokio_tungstenite::tungstenite::Message;
38use ustr::Ustr;
39
40use super::{
41 DydxWsError, DydxWsResult,
42 client::DYDX_RATE_LIMIT_KEY_SUBSCRIPTION,
43 enums::{DydxWsChannel, DydxWsMessage, DydxWsOutputMessage},
44 error::DydxWebSocketError,
45 messages::{
46 DydxCandle, DydxMarketsContents, DydxOrderbookContents, DydxOrderbookSnapshotContents,
47 DydxSubscription, DydxTradeContents, DydxWsBlockHeightMessage, DydxWsCandlesMessage,
48 DydxWsChannelBatchDataMsg, DydxWsChannelDataMsg, DydxWsConnectedMsg, DydxWsFeedMessage,
49 DydxWsGenericMsg, DydxWsMarketsMessage, DydxWsOrderbookMessage,
50 DydxWsParentSubaccountsMessage, DydxWsSubaccountsChannelContents,
51 DydxWsSubaccountsChannelData, DydxWsSubaccountsMessage, DydxWsSubaccountsSubscribed,
52 DydxWsSubscriptionMsg, DydxWsTradesMessage,
53 },
54};
55
56#[derive(Debug, Clone)]
58pub enum HandlerCommand {
59 RegisterSubscription {
61 topic: String,
62 subscription: DydxSubscription,
63 },
64 UnregisterSubscription { topic: String },
66 SendText(String),
68 Disconnect,
70}
71
72pub struct FeedHandler {
77 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
78 out_tx: tokio::sync::mpsc::UnboundedSender<DydxWsOutputMessage>,
79 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
80 client: WebSocketClient,
81 signal: Arc<AtomicBool>,
82 retry_manager: RetryManager<DydxWsError>,
83 subscriptions: SubscriptionState,
84 subscription_messages: AHashMap<String, DydxSubscription>,
85 message_buffer: VecDeque<DydxWsOutputMessage>,
86 book_sequence: AHashMap<String, u64>,
87}
88
89impl Debug for FeedHandler {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 f.debug_struct(stringify!(FeedHandler))
92 .field("subscriptions", &self.subscriptions.len())
93 .finish_non_exhaustive()
94 }
95}
96
97impl FeedHandler {
98 #[must_use]
100 pub fn new(
101 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
102 out_tx: tokio::sync::mpsc::UnboundedSender<DydxWsOutputMessage>,
103 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
104 client: WebSocketClient,
105 signal: Arc<AtomicBool>,
106 subscriptions: SubscriptionState,
107 ) -> Self {
108 Self {
109 cmd_rx,
110 out_tx,
111 raw_rx,
112 client,
113 signal,
114 retry_manager: create_websocket_retry_manager(),
115 subscriptions,
116 subscription_messages: AHashMap::new(),
117 message_buffer: VecDeque::new(),
118 book_sequence: AHashMap::new(),
119 }
120 }
121
122 async fn send_with_retry(
123 &self,
124 payload: String,
125 rate_limit_keys: Option<&[Ustr]>,
126 ) -> Result<(), DydxWsError> {
127 let keys_owned: Option<Vec<Ustr>> = rate_limit_keys.map(|k| k.to_vec());
128 self.retry_manager
129 .execute_with_retry(
130 "websocket_send",
131 || {
132 let payload = payload.clone();
133 let keys = keys_owned.clone();
134 async move {
135 self.client
136 .send_text(payload, keys.as_deref())
137 .await
138 .map_err(|e| DydxWsError::ClientError(format!("Send failed: {e}")))
139 }
140 },
141 should_retry_dydx_error,
142 create_dydx_timeout_error,
143 )
144 .await
145 }
146
147 pub async fn run(&mut self) {
154 log::debug!("WebSocket handler started");
155
156 loop {
157 if !self.message_buffer.is_empty() {
159 let msg = self.message_buffer.pop_front().unwrap();
160 if self.out_tx.send(msg).is_err() {
161 log::debug!("Receiver dropped, stopping handler");
162 break;
163 }
164 continue;
165 }
166
167 tokio::select! {
168 Some(cmd) = self.cmd_rx.recv() => {
169 if self.handle_command(cmd).await {
170 break;
171 }
172 }
173
174 Some(msg) = self.raw_rx.recv() => {
175 log::trace!("Handler received raw message");
176 let msgs = self.process_raw_message(msg).await;
177 if !msgs.is_empty() {
178 let mut iter = msgs.into_iter();
179 let first = iter.next().expect("non-empty vec has first element");
181 self.message_buffer.extend(iter);
182 log::trace!("Handler sending message: {:?}", std::mem::discriminant(&first));
183 if self.out_tx.send(first).is_err() {
184 log::debug!("Receiver dropped, stopping handler");
185 break;
186 }
187 }
188 }
189
190 else => {
191 log::debug!("Handler shutting down: channels closed");
192 break;
193 }
194 }
195
196 if self.signal.load(Ordering::Acquire) {
197 log::debug!("Handler received stop signal");
198 break;
199 }
200 }
201 }
202
203 async fn process_raw_message(&mut self, msg: Message) -> Vec<DydxWsOutputMessage> {
204 match msg {
205 Message::Text(txt) => {
206 if txt == RECONNECTED {
207 self.clear_state();
208
209 if let Err(e) = self.replay_subscriptions().await {
210 log::error!("Failed to replay subscriptions after reconnect: {e}");
211 }
212 return vec![DydxWsOutputMessage::Reconnected];
213 }
214
215 match serde_json::from_str::<DydxWsFeedMessage>(&txt) {
217 Ok(feed_msg) => {
218 return self.handle_feed_message(feed_msg);
219 }
220 Err(e) => {
221 if txt.contains("v4_subaccounts") {
222 log::warn!(
223 "[WS_DESER] Failed to parse v4_subaccounts as DydxWsFeedMessage: {e}\nRaw: {txt}"
224 );
225 }
226 }
227 }
228
229 match serde_json::from_str::<serde_json::Value>(&txt) {
231 Ok(val) => match serde_json::from_value::<DydxWsGenericMsg>(val.clone()) {
232 Ok(meta) => {
233 let result = if meta.is_connected() {
234 serde_json::from_value::<DydxWsConnectedMsg>(val)
235 .map(DydxWsMessage::Connected)
236 } else if meta.is_subscribed() {
237 log::debug!("Processing subscribed message via fallback path");
238
239 if let Ok(sub_msg) =
240 serde_json::from_value::<DydxWsSubscriptionMsg>(val.clone())
241 {
242 if sub_msg.channel == DydxWsChannel::Subaccounts {
243 log::debug!("Parsing subaccounts subscription (fallback)");
244 serde_json::from_value::<DydxWsSubaccountsSubscribed>(val)
245 .map(DydxWsMessage::SubaccountsSubscribed)
246 .or_else(|e| {
247 log::warn!(
248 "Failed to parse subaccounts subscription: {e}"
249 );
250 Ok(DydxWsMessage::Subscribed(sub_msg))
251 })
252 } else {
253 Ok(DydxWsMessage::Subscribed(sub_msg))
254 }
255 } else {
256 serde_json::from_value::<DydxWsSubscriptionMsg>(val)
257 .map(DydxWsMessage::Subscribed)
258 }
259 } else if meta.is_unsubscribed() {
260 serde_json::from_value::<DydxWsSubscriptionMsg>(val)
261 .map(DydxWsMessage::Unsubscribed)
262 } else if meta.is_error() {
263 serde_json::from_value::<DydxWebSocketError>(val)
264 .map(DydxWsMessage::Error)
265 } else if meta.is_unknown() {
266 log::warn!("Received unknown WebSocket message type: {txt}",);
267 Ok(DydxWsMessage::Raw(val))
268 } else {
269 Ok(DydxWsMessage::Raw(val))
270 };
271
272 match result {
273 Ok(dydx_msg) => self.handle_dydx_message(dydx_msg).await,
274 Err(e) => {
275 log::error!(
276 "Failed to parse WebSocket message: {e}. Message type: {:?}, Channel: {:?}. Raw: {txt}",
277 meta.msg_type,
278 meta.channel,
279 );
280 vec![]
281 }
282 }
283 }
284 Err(e) => {
285 log::error!(
286 "Failed to parse WebSocket message envelope (DydxWsGenericMsg): {e}\nRaw JSON:\n{txt}"
287 );
288 vec![]
289 }
290 },
291 Err(e) => {
292 let err = DydxWebSocketError::from_message(e.to_string());
293 vec![DydxWsOutputMessage::Error(err)]
294 }
295 }
296 }
297 Message::Pong(_data) => vec![],
298 Message::Ping(_data) => vec![],
299 Message::Binary(_bin) => vec![],
300 Message::Close(_frame) => {
301 log::info!("WebSocket close frame received");
302 vec![]
303 }
304 Message::Frame(_) => vec![],
305 }
306 }
307
308 async fn handle_dydx_message(&mut self, msg: DydxWsMessage) -> Vec<DydxWsOutputMessage> {
309 match self.handle_message(msg).await {
310 Ok(msgs) => msgs,
311 Err(e) => {
312 log::error!("Error handling message: {e}");
313 vec![]
314 }
315 }
316 }
317
318 fn handle_feed_message(&mut self, feed_msg: DydxWsFeedMessage) -> Vec<DydxWsOutputMessage> {
319 log::trace!(
320 "Handling feed message: {:?}",
321 std::mem::discriminant(&feed_msg)
322 );
323
324 match feed_msg {
325 DydxWsFeedMessage::Subaccounts(msg) => self.handle_subaccounts(msg),
326 DydxWsFeedMessage::Orderbook(msg) => self.handle_orderbook(msg),
327 DydxWsFeedMessage::Trades(msg) => self.handle_trades(msg),
328 DydxWsFeedMessage::Markets(msg) => self.handle_markets_feed(msg),
329 DydxWsFeedMessage::Candles(msg) => self.handle_candles_feed(msg),
330 DydxWsFeedMessage::ParentSubaccounts(msg) => self.handle_parent_subaccounts(msg),
331 DydxWsFeedMessage::BlockHeight(msg) => self.handle_block_height_feed(msg),
332 }
333 }
334
335 fn handle_subaccounts(&self, msg: DydxWsSubaccountsMessage) -> Vec<DydxWsOutputMessage> {
336 match msg {
337 DydxWsSubaccountsMessage::Subscribed(data) => {
338 let topic =
339 self.topic_from_msg(&DydxWsChannel::Subaccounts, &Some(data.id.clone()));
340 self.subscriptions.confirm_subscribe(&topic);
341 log::debug!("Forwarding subaccount subscription to execution client");
342 vec![DydxWsOutputMessage::SubaccountSubscribed(Box::new(data))]
343 }
344 DydxWsSubaccountsMessage::ChannelData(data) => {
345 let has_orders = data.contents.orders.as_ref().is_some_and(|o| !o.is_empty());
346 let has_fills = data.contents.fills.as_ref().is_some_and(|f| !f.is_empty());
347
348 if has_orders || has_fills {
349 log::debug!(
350 "Received {} order(s), {} fill(s) - forwarding to execution client",
351 data.contents.orders.as_ref().map_or(0, |o| o.len()),
352 data.contents.fills.as_ref().map_or(0, |f| f.len())
353 );
354 vec![DydxWsOutputMessage::SubaccountsChannelData(Box::new(data))]
355 } else {
356 vec![]
357 }
358 }
359 DydxWsSubaccountsMessage::Unsubscribed(data) => {
360 let topic = self.topic_from_msg(&DydxWsChannel::Subaccounts, &data.id);
361 self.subscriptions.confirm_unsubscribe(&topic);
362 vec![]
363 }
364 }
365 }
366
367 fn handle_orderbook(&mut self, msg: DydxWsOrderbookMessage) -> Vec<DydxWsOutputMessage> {
368 match msg {
369 DydxWsOrderbookMessage::Subscribed(data) => {
370 let topic = self.topic_from_msg(&DydxWsChannel::Orderbook, &data.id);
371 self.subscriptions.confirm_subscribe(&topic);
372
373 if let Some(id) = &data.id {
374 self.book_sequence.insert(id.clone(), data.message_id);
375 }
376
377 self.deserialize_orderbook_snapshot(&data)
378 }
379 DydxWsOrderbookMessage::ChannelData(data) => {
380 if let Some(id) = &data.id {
381 if let Some(last_id) = self.book_sequence.get(id)
382 && data.message_id <= *last_id
383 {
384 log::warn!(
385 "Orderbook sequence regression for {id}: last {last_id}, received {}",
386 data.message_id
387 );
388 }
389 self.book_sequence.insert(id.clone(), data.message_id);
390 }
391 self.deserialize_orderbook_update(&data)
392 }
393 DydxWsOrderbookMessage::ChannelBatchData(data) => {
394 if let Some(id) = &data.id {
395 if let Some(last_id) = self.book_sequence.get(id)
396 && data.message_id <= *last_id
397 {
398 log::warn!(
399 "Orderbook batch sequence regression for {id}: last {last_id}, received {}",
400 data.message_id
401 );
402 }
403 self.book_sequence.insert(id.clone(), data.message_id);
404 }
405 self.deserialize_orderbook_batch(&data)
406 }
407 DydxWsOrderbookMessage::Unsubscribed(data) => {
408 let topic = self.topic_from_msg(&DydxWsChannel::Orderbook, &data.id);
409 self.subscriptions.confirm_unsubscribe(&topic);
410
411 if let Some(id) = &data.id {
412 self.book_sequence.remove(id);
413 }
414 vec![]
415 }
416 }
417 }
418
419 fn handle_trades(&self, msg: DydxWsTradesMessage) -> Vec<DydxWsOutputMessage> {
420 match msg {
421 DydxWsTradesMessage::Subscribed(data) => {
422 let topic = self.topic_from_msg(&DydxWsChannel::Trades, &data.id);
423 self.subscriptions.confirm_subscribe(&topic);
424 self.deserialize_trades(&data)
425 }
426 DydxWsTradesMessage::ChannelData(data) => self.deserialize_trades(&data),
427 DydxWsTradesMessage::Unsubscribed(data) => {
428 let topic = self.topic_from_msg(&DydxWsChannel::Trades, &data.id);
429 self.subscriptions.confirm_unsubscribe(&topic);
430 vec![]
431 }
432 }
433 }
434
435 fn handle_markets_feed(&self, msg: DydxWsMarketsMessage) -> Vec<DydxWsOutputMessage> {
436 match msg {
437 DydxWsMarketsMessage::Subscribed(data) => {
438 let topic = self.topic_from_msg(&DydxWsChannel::Markets, &data.id);
439 self.subscriptions.confirm_subscribe(&topic);
440 self.deserialize_markets(&data)
441 }
442 DydxWsMarketsMessage::ChannelData(data) => self.deserialize_markets(&data),
443 DydxWsMarketsMessage::Unsubscribed(data) => {
444 let topic = self.topic_from_msg(&DydxWsChannel::Markets, &data.id);
445 self.subscriptions.confirm_unsubscribe(&topic);
446 vec![]
447 }
448 }
449 }
450
451 fn handle_candles_feed(&self, msg: DydxWsCandlesMessage) -> Vec<DydxWsOutputMessage> {
452 match msg {
453 DydxWsCandlesMessage::Subscribed(data) => {
454 let topic = self.topic_from_msg(&DydxWsChannel::Candles, &data.id);
455 self.subscriptions.confirm_subscribe(&topic);
456 vec![]
457 }
458 DydxWsCandlesMessage::ChannelData(data) => self.deserialize_candles(&data),
459 DydxWsCandlesMessage::Unsubscribed(data) => {
460 let topic = self.topic_from_msg(&DydxWsChannel::Candles, &data.id);
461 self.subscriptions.confirm_unsubscribe(&topic);
462 vec![]
463 }
464 }
465 }
466
467 fn handle_parent_subaccounts(
468 &self,
469 msg: DydxWsParentSubaccountsMessage,
470 ) -> Vec<DydxWsOutputMessage> {
471 match msg {
472 DydxWsParentSubaccountsMessage::Subscribed(data) => {
473 let topic = self.topic_from_msg(&DydxWsChannel::ParentSubaccounts, &data.id);
474 self.subscriptions.confirm_subscribe(&topic);
475 self.deserialize_parent_subaccounts(&data)
476 }
477 DydxWsParentSubaccountsMessage::ChannelData(data) => {
478 self.deserialize_parent_subaccounts(&data)
479 }
480 DydxWsParentSubaccountsMessage::Unsubscribed(data) => {
481 let topic = self.topic_from_msg(&DydxWsChannel::ParentSubaccounts, &data.id);
482 self.subscriptions.confirm_unsubscribe(&topic);
483 vec![]
484 }
485 }
486 }
487
488 fn handle_block_height_feed(&self, msg: DydxWsBlockHeightMessage) -> Vec<DydxWsOutputMessage> {
489 match msg {
490 DydxWsBlockHeightMessage::Subscribed(data) => {
491 let topic =
492 self.topic_from_msg(&DydxWsChannel::BlockHeight, &Some(data.id.clone()));
493 self.subscriptions.confirm_subscribe(&topic);
494
495 match data.contents.height.parse::<u64>() {
496 Ok(height) => vec![DydxWsOutputMessage::BlockHeight {
497 height,
498 time: data.contents.time,
499 }],
500 Err(e) => {
501 log::warn!("Failed to parse block height from subscription: {e}");
502 vec![]
503 }
504 }
505 }
506 DydxWsBlockHeightMessage::ChannelData(data) => {
507 match data.contents.block_height.parse::<u64>() {
508 Ok(height) => vec![DydxWsOutputMessage::BlockHeight {
509 height,
510 time: data.contents.time,
511 }],
512 Err(e) => {
513 log::warn!("Failed to parse block height from channel data: {e}");
514 vec![]
515 }
516 }
517 }
518 DydxWsBlockHeightMessage::Unsubscribed(data) => {
519 let topic = self.topic_from_msg(&DydxWsChannel::BlockHeight, &data.id);
520 self.subscriptions.confirm_unsubscribe(&topic);
521 vec![]
522 }
523 }
524 }
525
526 fn deserialize_trades(&self, data: &DydxWsChannelDataMsg) -> Vec<DydxWsOutputMessage> {
527 let Some(id) = data.id.clone() else {
528 log::error!("Missing id for trades channel");
529 return vec![];
530 };
531
532 match serde_json::from_value::<DydxTradeContents>(data.contents.clone()) {
533 Ok(contents) => vec![DydxWsOutputMessage::Trades { id, contents }],
534 Err(e) => {
535 log::error!("Failed to deserialize trade contents: {e}");
536 vec![]
537 }
538 }
539 }
540
541 fn deserialize_orderbook_snapshot(
542 &self,
543 data: &DydxWsChannelDataMsg,
544 ) -> Vec<DydxWsOutputMessage> {
545 let Some(id) = data.id.clone() else {
546 log::error!("Missing id for orderbook snapshot");
547 return vec![];
548 };
549
550 match serde_json::from_value::<DydxOrderbookSnapshotContents>(data.contents.clone()) {
551 Ok(contents) => vec![DydxWsOutputMessage::OrderbookSnapshot { id, contents }],
552 Err(e) => {
553 log::error!("Failed to deserialize orderbook snapshot: {e}");
554 vec![]
555 }
556 }
557 }
558
559 fn deserialize_orderbook_update(
560 &self,
561 data: &DydxWsChannelDataMsg,
562 ) -> Vec<DydxWsOutputMessage> {
563 let Some(id) = data.id.clone() else {
564 log::error!("Missing id for orderbook update");
565 return vec![];
566 };
567
568 match serde_json::from_value::<DydxOrderbookContents>(data.contents.clone()) {
569 Ok(contents) => vec![DydxWsOutputMessage::OrderbookUpdate { id, contents }],
570 Err(e) => {
571 log::error!("Failed to deserialize orderbook contents: {e}");
572 vec![]
573 }
574 }
575 }
576
577 fn deserialize_orderbook_batch(
578 &self,
579 data: &DydxWsChannelBatchDataMsg,
580 ) -> Vec<DydxWsOutputMessage> {
581 let Some(id) = data.id.clone() else {
582 log::error!("Missing id for orderbook batch");
583 return vec![];
584 };
585
586 match serde_json::from_value::<Vec<DydxOrderbookContents>>(data.contents.clone()) {
587 Ok(updates) => vec![DydxWsOutputMessage::OrderbookBatch { id, updates }],
588 Err(e) => {
589 log::error!("Failed to deserialize orderbook batch: {e}");
590 vec![]
591 }
592 }
593 }
594
595 fn deserialize_candles(&self, data: &DydxWsChannelDataMsg) -> Vec<DydxWsOutputMessage> {
596 let Some(id) = data.id.clone() else {
597 log::error!("Missing id for candles channel");
598 return vec![];
599 };
600
601 match serde_json::from_value::<DydxCandle>(data.contents.clone()) {
602 Ok(contents) => vec![DydxWsOutputMessage::Candles { id, contents }],
603 Err(e) => {
604 log::error!("Failed to deserialize candle contents: {e}");
605 vec![]
606 }
607 }
608 }
609
610 fn deserialize_markets(&self, data: &DydxWsChannelDataMsg) -> Vec<DydxWsOutputMessage> {
611 match serde_json::from_value::<DydxMarketsContents>(data.contents.clone()) {
612 Ok(contents) => vec![DydxWsOutputMessage::Markets(contents)],
613 Err(e) => {
614 log::error!("Failed to deserialize markets contents: {e}");
615 vec![]
616 }
617 }
618 }
619
620 fn deserialize_parent_subaccounts(
621 &self,
622 data: &DydxWsChannelDataMsg,
623 ) -> Vec<DydxWsOutputMessage> {
624 match serde_json::from_value::<DydxWsSubaccountsChannelContents>(data.contents.clone()) {
625 Ok(contents) => {
626 let has_orders = contents.orders.as_ref().is_some_and(|o| !o.is_empty());
627 let has_fills = contents.fills.as_ref().is_some_and(|f| !f.is_empty());
628
629 if has_orders || has_fills {
630 let channel_data = DydxWsSubaccountsChannelData {
631 connection_id: data.connection_id.clone(),
632 message_id: data.message_id,
633 id: data.id.clone().unwrap_or_default(),
634 version: data.version.clone().unwrap_or_default(),
635 contents,
636 };
637 vec![DydxWsOutputMessage::SubaccountsChannelData(Box::new(
638 channel_data,
639 ))]
640 } else {
641 vec![]
642 }
643 }
644 Err(e) => {
645 log::error!("Failed to deserialize parent subaccounts contents: {e}");
646 vec![]
647 }
648 }
649 }
650
651 async fn handle_command(&mut self, command: HandlerCommand) -> bool {
652 match command {
653 HandlerCommand::RegisterSubscription {
654 topic,
655 subscription,
656 } => {
657 self.subscription_messages.insert(topic, subscription);
658 }
659 HandlerCommand::UnregisterSubscription { topic } => {
660 self.subscription_messages.remove(&topic);
661 }
662 HandlerCommand::SendText(text) => {
663 if let Err(e) = self
664 .send_with_retry(text, Some(DYDX_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()))
665 .await
666 {
667 log::error!("Failed to send WebSocket text after retries: {e}");
668 }
669 }
670 HandlerCommand::Disconnect => {
671 log::debug!("Disconnect command received");
672 self.client.disconnect().await;
673 return true;
674 }
675 }
676 false
677 }
678
679 fn topic_from_msg(&self, channel: &DydxWsChannel, id: &Option<String>) -> String {
680 match id {
681 Some(id) => format!(
682 "{}{}{}",
683 channel.as_ref(),
684 self.subscriptions.delimiter(),
685 id
686 ),
687 None => channel.as_ref().to_string(),
688 }
689 }
690
691 fn clear_state(&mut self) {
692 let buffer_count = self.message_buffer.len();
693 let seq_count = self.book_sequence.len();
694 self.message_buffer.clear();
695 self.book_sequence.clear();
696 log::debug!(
697 "Cleared reconnect state: message_buffer={buffer_count}, book_sequence={seq_count}"
698 );
699 }
700
701 async fn replay_subscriptions(&self) -> DydxWsResult<()> {
702 let topics = self.subscriptions.all_topics();
703 for topic in topics {
704 let Some(subscription) = self.subscription_messages.get(&topic).cloned() else {
705 log::warn!("No preserved subscription message for topic: {topic}");
706 continue;
707 };
708
709 let payload = serde_json::to_string(&subscription)?;
710 self.subscriptions.mark_subscribe(&topic);
711
712 if let Err(e) = self
713 .send_with_retry(payload, Some(DYDX_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()))
714 .await
715 {
716 self.subscriptions.mark_failure(&topic);
717 return Err(e);
718 }
719 }
720
721 Ok(())
722 }
723
724 pub async fn handle_message(
732 &mut self,
733 msg: DydxWsMessage,
734 ) -> DydxWsResult<Vec<DydxWsOutputMessage>> {
735 match msg {
736 DydxWsMessage::Connected(_) => {
737 log::info!("dYdX WebSocket connected");
738 Ok(vec![])
739 }
740 DydxWsMessage::Subscribed(sub) => {
741 log::debug!("Subscribed to {} (id: {:?})", sub.channel, sub.id);
742 let topic = self.topic_from_msg(&sub.channel, &sub.id);
743 self.subscriptions.confirm_subscribe(&topic);
744 Ok(vec![])
745 }
746 DydxWsMessage::SubaccountsSubscribed(msg) => {
747 log::debug!("Subaccounts subscribed with initial state (fallback path)");
748 let topic = self.topic_from_msg(&DydxWsChannel::Subaccounts, &Some(msg.id.clone()));
749 self.subscriptions.confirm_subscribe(&topic);
750 Ok(vec![DydxWsOutputMessage::SubaccountSubscribed(Box::new(
751 msg,
752 ))])
753 }
754 DydxWsMessage::Unsubscribed(unsub) => {
755 log::debug!("Unsubscribed from {} (id: {:?})", unsub.channel, unsub.id);
756 let topic = self.topic_from_msg(&unsub.channel, &unsub.id);
757 self.subscriptions.confirm_unsubscribe(&topic);
758 Ok(vec![])
759 }
760 DydxWsMessage::Error(err) => Ok(vec![DydxWsOutputMessage::Error(err)]),
761 DydxWsMessage::Reconnected => {
762 self.clear_state();
763
764 if let Err(e) = self.replay_subscriptions().await {
765 log::error!("Failed to replay subscriptions after reconnect message: {e}");
766 }
767 Ok(vec![DydxWsOutputMessage::Reconnected])
768 }
769 DydxWsMessage::Pong => Ok(vec![]),
770 DydxWsMessage::Raw(_) => Ok(vec![]),
771 }
772 }
773}
774
775fn should_retry_dydx_error(error: &DydxWsError) -> bool {
777 match error {
778 DydxWsError::Transport(_) => true,
779 DydxWsError::Send(_) => true,
780 DydxWsError::ClientError(msg) => {
781 let msg_lower = msg.to_lowercase();
782 msg_lower.contains("timeout")
783 || msg_lower.contains("timed out")
784 || msg_lower.contains("connection")
785 || msg_lower.contains("network")
786 }
787 DydxWsError::NotConnected
788 | DydxWsError::Json(_)
789 | DydxWsError::Parse(_)
790 | DydxWsError::Authentication(_)
791 | DydxWsError::Subscription(_)
792 | DydxWsError::Venue(_) => false,
793 }
794}
795
796fn create_dydx_timeout_error(msg: String) -> DydxWsError {
798 DydxWsError::ClientError(msg)
799}