1pub static DYDX_RATE_LIMIT_KEY_SUBSCRIPTION: LazyLock<[Ustr; 1]> =
37 LazyLock::new(|| [Ustr::from("subscription")]);
38
39pub const DYDX_WS_TOPIC_DELIMITER: char = ':';
41
42pub static DYDX_WS_SUBSCRIPTION_QUOTA: LazyLock<Quota> = LazyLock::new(|| {
44 Quota::per_second(NonZeroU32::new(2).expect("non-zero")).expect("valid constant")
45});
46
47use std::{
48 num::NonZeroU32,
49 sync::{
50 Arc, LazyLock,
51 atomic::{AtomicBool, AtomicU8, Ordering},
52 },
53 time::Duration,
54};
55
56use arc_swap::ArcSwap;
57use dashmap::DashMap;
58use nautilus_common::live::get_runtime;
59use nautilus_model::{
60 data::BarType,
61 identifiers::{AccountId, InstrumentId},
62 instruments::InstrumentAny,
63};
64use nautilus_network::{
65 mode::ConnectionMode,
66 ratelimiter::quota::Quota,
67 websocket::{
68 AuthTracker, SubscriptionState, TransportBackend, WebSocketClient, WebSocketConfig,
69 channel_message_handler,
70 },
71};
72use ustr::Ustr;
73
74use super::{
75 dispatch::DydxWsDispatchState,
76 enums::{DydxWsChannel, DydxWsOperation, DydxWsOutputMessage},
77 error::{DydxWsError, DydxWsResult},
78 handler::{FeedHandler, HandlerCommand},
79 messages::DydxSubscription,
80};
81use crate::{
82 common::{credential::DydxCredential, instrument_cache::InstrumentCache},
83 execution::encoder::ClientOrderIdEncoder,
84};
85
86#[derive(Debug)]
110#[cfg_attr(
111 feature = "python",
112 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.dydx", from_py_object)
113)]
114#[cfg_attr(
115 feature = "python",
116 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.dydx")
117)]
118pub struct DydxWebSocketClient {
119 url: String,
120 credential: Option<Arc<DydxCredential>>,
121 requires_auth: bool,
122 auth_tracker: AuthTracker,
123 subscriptions: SubscriptionState,
124 connection_mode: Arc<ArcSwap<AtomicU8>>,
125 signal: Arc<AtomicBool>,
126 instrument_cache: Arc<InstrumentCache>,
127 account_id: Option<AccountId>,
128 heartbeat: Option<u64>,
129 cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
130 out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<DydxWsOutputMessage>>,
131 handler_task: Option<Arc<tokio::task::JoinHandle<()>>>,
132 encoder: Arc<ClientOrderIdEncoder>,
133 bar_types: Arc<DashMap<String, BarType>>,
134 bars_timestamp_on_close: Arc<AtomicBool>,
135 ws_dispatch_state: Arc<DydxWsDispatchState>,
136 transport_backend: TransportBackend,
137 proxy_url: Option<String>,
138}
139
140impl Clone for DydxWebSocketClient {
141 fn clone(&self) -> Self {
142 Self {
143 url: self.url.clone(),
144 credential: self.credential.clone(),
145 requires_auth: self.requires_auth,
146 auth_tracker: self.auth_tracker.clone(),
147 subscriptions: self.subscriptions.clone(),
148 connection_mode: self.connection_mode.clone(),
149 signal: self.signal.clone(),
150 instrument_cache: self.instrument_cache.clone(),
151 account_id: self.account_id,
152 heartbeat: self.heartbeat,
153 cmd_tx: self.cmd_tx.clone(),
154 out_rx: None, handler_task: None, encoder: self.encoder.clone(),
157 bar_types: self.bar_types.clone(),
158 bars_timestamp_on_close: self.bars_timestamp_on_close.clone(),
159 ws_dispatch_state: self.ws_dispatch_state.clone(),
160 transport_backend: self.transport_backend,
161 proxy_url: self.proxy_url.clone(),
162 }
163 }
164}
165
166impl DydxWebSocketClient {
167 #[must_use]
172 pub fn new_public(url: String, heartbeat: Option<u64>, proxy_url: Option<String>) -> Self {
173 Self::new_public_with_cache(
174 url,
175 Arc::new(InstrumentCache::new()),
176 heartbeat,
177 TransportBackend::default(),
178 proxy_url,
179 )
180 }
181
182 #[must_use]
186 pub fn new_public_with_cache(
187 url: String,
188 instrument_cache: Arc<InstrumentCache>,
189 heartbeat: Option<u64>,
190 transport_backend: TransportBackend,
191 proxy_url: Option<String>,
192 ) -> Self {
193 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
195
196 Self {
197 url,
198 credential: None,
199 requires_auth: false,
200 auth_tracker: AuthTracker::new(),
201 subscriptions: SubscriptionState::new(DYDX_WS_TOPIC_DELIMITER),
202 connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
203 ConnectionMode::Closed as u8,
204 ))),
205 signal: Arc::new(AtomicBool::new(false)),
206 instrument_cache,
207 account_id: None,
208 heartbeat,
209 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
210 out_rx: None,
211 handler_task: None,
212 encoder: Arc::new(ClientOrderIdEncoder::new()),
213 bar_types: Arc::new(DashMap::new()),
214 bars_timestamp_on_close: Arc::new(AtomicBool::new(true)),
215 ws_dispatch_state: Arc::new(DydxWsDispatchState::default()),
216 transport_backend,
217 proxy_url,
218 }
219 }
220
221 #[must_use]
226 pub fn new_private(
227 url: String,
228 credential: DydxCredential,
229 account_id: AccountId,
230 heartbeat: Option<u64>,
231 proxy_url: Option<String>,
232 ) -> Self {
233 Self::new_private_with_cache(
234 url,
235 credential,
236 account_id,
237 Arc::new(InstrumentCache::new()),
238 heartbeat,
239 TransportBackend::default(),
240 proxy_url,
241 )
242 }
243
244 #[must_use]
248 pub fn new_private_with_cache(
249 url: String,
250 credential: DydxCredential,
251 account_id: AccountId,
252 instrument_cache: Arc<InstrumentCache>,
253 heartbeat: Option<u64>,
254 transport_backend: TransportBackend,
255 proxy_url: Option<String>,
256 ) -> Self {
257 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
259
260 Self {
261 url,
262 credential: Some(Arc::new(credential)),
263 requires_auth: true,
264 auth_tracker: AuthTracker::new(),
265 subscriptions: SubscriptionState::new(DYDX_WS_TOPIC_DELIMITER),
266 connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
267 ConnectionMode::Closed as u8,
268 ))),
269 signal: Arc::new(AtomicBool::new(false)),
270 instrument_cache,
271 account_id: Some(account_id),
272 heartbeat,
273 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
274 out_rx: None,
275 handler_task: None,
276 encoder: Arc::new(ClientOrderIdEncoder::new()),
277 bar_types: Arc::new(DashMap::new()),
278 bars_timestamp_on_close: Arc::new(AtomicBool::new(true)),
279 ws_dispatch_state: Arc::new(DydxWsDispatchState::default()),
280 transport_backend,
281 proxy_url,
282 }
283 }
284
285 #[must_use]
287 pub fn credential(&self) -> Option<&Arc<DydxCredential>> {
288 self.credential.as_ref()
289 }
290
291 #[must_use]
293 pub fn is_connected(&self) -> bool {
294 let mode = self.connection_mode.load();
295 let mode_u8 = mode.load(Ordering::Relaxed);
296 matches!(
297 mode_u8,
298 x if x == ConnectionMode::Active as u8 || x == ConnectionMode::Reconnect as u8
299 )
300 }
301
302 #[must_use]
304 pub fn url(&self) -> &str {
305 &self.url
306 }
307
308 #[must_use]
312 pub fn connection_mode_atomic(&self) -> Arc<ArcSwap<AtomicU8>> {
313 self.connection_mode.clone()
314 }
315
316 pub fn set_account_id(&mut self, account_id: AccountId) {
318 self.account_id = Some(account_id);
319 }
320
321 #[must_use]
323 pub fn account_id(&self) -> Option<AccountId> {
324 self.account_id
325 }
326
327 pub fn set_instrument_cache(&mut self, cache: Arc<InstrumentCache>) {
333 self.instrument_cache = cache;
334 }
335
336 pub fn cache_instrument(&self, instrument: InstrumentAny) {
340 self.instrument_cache.insert_instrument_only(instrument);
341 }
342
343 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
347 log::debug!(
348 "Caching {} instruments in WebSocket client",
349 instruments.len()
350 );
351 self.instrument_cache.insert_instruments_only(instruments);
352 }
353
354 #[must_use]
356 pub fn instrument_cache(&self) -> &Arc<InstrumentCache> {
357 &self.instrument_cache
358 }
359
360 #[must_use]
362 pub fn encoder(&self) -> &Arc<ClientOrderIdEncoder> {
363 &self.encoder
364 }
365
366 #[must_use]
368 pub fn bar_types(&self) -> &Arc<DashMap<String, BarType>> {
369 &self.bar_types
370 }
371
372 pub fn ws_dispatch_state(&self) -> &Arc<DydxWsDispatchState> {
374 &self.ws_dispatch_state
375 }
376
377 pub fn set_bars_timestamp_on_close(&self, value: bool) {
379 self.bars_timestamp_on_close.store(value, Ordering::Relaxed);
380 }
381
382 #[must_use]
384 pub fn bars_timestamp_on_close(&self) -> bool {
385 self.bars_timestamp_on_close.load(Ordering::Relaxed)
386 }
387
388 #[must_use]
392 pub fn all_instruments(&self) -> Vec<InstrumentAny> {
393 self.instrument_cache.all_instruments()
394 }
395
396 #[must_use]
398 pub fn cached_instruments_count(&self) -> usize {
399 self.instrument_cache.len()
400 }
401
402 #[must_use]
406 pub fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
407 self.instrument_cache.get(instrument_id)
408 }
409
410 #[must_use]
414 pub fn get_instrument_by_market(&self, ticker: &str) -> Option<InstrumentAny> {
415 self.instrument_cache.get_by_market(ticker)
416 }
417
418 pub fn take_receiver(
421 &mut self,
422 ) -> Option<tokio::sync::mpsc::UnboundedReceiver<DydxWsOutputMessage>> {
423 self.out_rx.take()
424 }
425
426 pub fn stream(
434 &mut self,
435 ) -> impl futures_util::Stream<Item = DydxWsOutputMessage> + Send + 'static {
436 let mut rx = self
437 .out_rx
438 .take()
439 .expect("Message stream receiver already taken or not connected");
440
441 async_stream::stream! {
442 while let Some(msg) = rx.recv().await {
443 yield msg;
444 }
445 }
446 }
447
448 pub async fn connect(&mut self) -> DydxWsResult<()> {
457 if self.is_connected() {
458 return Ok(());
459 }
460
461 self.signal.store(false, Ordering::Release);
463
464 let (message_handler, raw_rx) = channel_message_handler();
465
466 let cfg = WebSocketConfig {
467 url: self.url.clone(),
468 headers: vec![],
469 heartbeat: self.heartbeat,
470 heartbeat_msg: None,
471 reconnect_timeout_ms: Some(15_000),
472 reconnect_delay_initial_ms: Some(250),
473 reconnect_delay_max_ms: Some(5_000),
474 reconnect_backoff_factor: Some(2.0),
475 reconnect_jitter_ms: Some(200),
476 reconnect_max_attempts: None,
477 idle_timeout_ms: None,
478 backend: self.transport_backend,
479 proxy_url: self.proxy_url.clone(),
480 };
481
482 let client = WebSocketClient::connect(
483 cfg,
484 Some(message_handler),
485 None,
486 None,
487 vec![],
488 Some(*DYDX_WS_SUBSCRIPTION_QUOTA),
489 )
490 .await
491 .map_err(|e| DydxWsError::Transport(e.to_string()))?;
492
493 self.connection_mode.store(client.connection_mode_atomic());
495
496 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
498 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<DydxWsOutputMessage>();
499
500 {
502 let mut guard = self.cmd_tx.write().await;
503 *guard = cmd_tx;
504 }
505 self.out_rx = Some(out_rx);
506
507 let signal = self.signal.clone();
509 let subscriptions = self.subscriptions.clone();
510
511 let handler_task = get_runtime().spawn(async move {
512 let mut handler =
513 FeedHandler::new(cmd_rx, out_tx, raw_rx, client, signal, subscriptions);
514 handler.run().await;
515 });
516
517 self.handler_task = Some(Arc::new(handler_task));
518 log::info!("Connected dYdX WebSocket: {}", self.url);
519 Ok(())
520 }
521
522 pub async fn disconnect(&mut self) -> DydxWsResult<()> {
531 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
533 log::debug!("Failed to send disconnect command: {e}");
534 }
535
536 self.signal.store(true, Ordering::Release);
538
539 if let Some(task_handle) = self.handler_task.take() {
541 match Arc::try_unwrap(task_handle) {
542 Ok(handle) => {
543 let abort_handle = handle.abort_handle();
544 match tokio::time::timeout(Duration::from_secs(2), handle).await {
545 Ok(Ok(())) => log::debug!("Handler task completed"),
546 Ok(Err(e)) => log::error!("Handler task error: {e:?}"),
547 Err(_) => {
548 log::warn!("Timeout waiting for handler task, aborting");
549 abort_handle.abort();
550 }
551 }
552 }
553 Err(arc_handle) => {
554 log::debug!("Cannot unwrap task handle, aborting");
555 arc_handle.abort();
556 }
557 }
558 }
559
560 self.connection_mode
562 .store(Arc::new(AtomicU8::new(ConnectionMode::Closed as u8)));
563
564 self.out_rx = None;
565
566 log::debug!("Disconnected dYdX WebSocket");
567 Ok(())
568 }
569
570 async fn send_text_inner(&self, text: &str) -> DydxWsResult<()> {
571 self.cmd_tx
572 .read()
573 .await
574 .send(HandlerCommand::SendText(text.to_string()))
575 .map_err(|e| {
576 DydxWsError::Transport(format!("Failed to send command to handler: {e}"))
577 })?;
578 Ok(())
579 }
580
581 pub fn send_command(&self, cmd: HandlerCommand) -> DydxWsResult<()> {
587 if let Ok(guard) = self.cmd_tx.try_read() {
588 guard.send(cmd).map_err(|e| {
589 DydxWsError::Transport(format!("Failed to send command to handler: {e}"))
590 })?;
591 } else {
592 return Err(DydxWsError::Transport(
593 "Failed to acquire lock on command channel".to_string(),
594 ));
595 }
596 Ok(())
597 }
598
599 fn ticker_from_instrument_id(instrument_id: &InstrumentId) -> String {
600 let mut s = instrument_id.symbol.as_str().to_string();
601 if let Some(stripped) = s.strip_suffix("-PERP") {
602 s = stripped.to_string();
603 }
604 s
605 }
606
607 fn topic(channel: DydxWsChannel, id: Option<&str>) -> String {
608 match id {
609 Some(id) => format!("{}{}{}", channel.as_ref(), DYDX_WS_TOPIC_DELIMITER, id),
610 None => channel.as_ref().to_string(),
611 }
612 }
613
614 async fn send_and_track_subscribe(
615 &self,
616 sub: DydxSubscription,
617 topic: &str,
618 ) -> DydxWsResult<()> {
619 self.subscriptions.mark_subscribe(topic);
620
621 if let Ok(cmd_tx) = self.cmd_tx.try_read() {
622 let _ = cmd_tx.send(HandlerCommand::RegisterSubscription {
623 topic: topic.to_string(),
624 subscription: sub.clone(),
625 });
626 }
627
628 let payload = serde_json::to_string(&sub)?;
629 if let Err(e) = self.send_text_inner(&payload).await {
630 self.subscriptions.mark_failure(topic);
631 self.subscriptions.remove_reference(topic);
632 return Err(e);
633 }
634 Ok(())
635 }
636
637 async fn send_and_track_unsubscribe(
638 &self,
639 sub: DydxSubscription,
640 topic: &str,
641 ) -> DydxWsResult<()> {
642 self.subscriptions.mark_unsubscribe(topic);
643
644 let payload = serde_json::to_string(&sub)?;
645 if let Err(e) = self.send_text_inner(&payload).await {
646 self.subscriptions.add_reference(topic);
647 self.subscriptions.mark_subscribe(topic);
648 return Err(e);
649 }
650
651 if let Ok(cmd_tx) = self.cmd_tx.try_read() {
652 let _ = cmd_tx.send(HandlerCommand::UnregisterSubscription {
653 topic: topic.to_string(),
654 });
655 }
656
657 Ok(())
658 }
659
660 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
670 let ticker = Self::ticker_from_instrument_id(&instrument_id);
671 let topic = Self::topic(DydxWsChannel::Trades, Some(&ticker));
672 if !self.subscriptions.add_reference(&topic) {
673 return Ok(());
674 }
675
676 let sub = DydxSubscription {
677 op: DydxWsOperation::Subscribe,
678 channel: DydxWsChannel::Trades,
679 id: Some(ticker),
680 };
681
682 self.send_and_track_subscribe(sub, &topic).await
683 }
684
685 pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
691 let ticker = Self::ticker_from_instrument_id(&instrument_id);
692 let topic = Self::topic(DydxWsChannel::Trades, Some(&ticker));
693 if !self.subscriptions.remove_reference(&topic) {
694 return Ok(());
695 }
696
697 let sub = DydxSubscription {
698 op: DydxWsOperation::Unsubscribe,
699 channel: DydxWsChannel::Trades,
700 id: Some(ticker),
701 };
702
703 self.send_and_track_unsubscribe(sub, &topic).await
704 }
705
706 pub async fn subscribe_orderbook(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
716 let ticker = Self::ticker_from_instrument_id(&instrument_id);
717 let topic = Self::topic(DydxWsChannel::Orderbook, Some(&ticker));
718 if !self.subscriptions.add_reference(&topic) {
719 return Ok(());
720 }
721
722 let sub = DydxSubscription {
723 op: DydxWsOperation::Subscribe,
724 channel: DydxWsChannel::Orderbook,
725 id: Some(ticker),
726 };
727
728 self.send_and_track_subscribe(sub, &topic).await
729 }
730
731 pub async fn unsubscribe_orderbook(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
737 let ticker = Self::ticker_from_instrument_id(&instrument_id);
738 let topic = Self::topic(DydxWsChannel::Orderbook, Some(&ticker));
739 if !self.subscriptions.remove_reference(&topic) {
740 return Ok(());
741 }
742
743 let sub = DydxSubscription {
744 op: DydxWsOperation::Unsubscribe,
745 channel: DydxWsChannel::Orderbook,
746 id: Some(ticker),
747 };
748
749 self.send_and_track_unsubscribe(sub, &topic).await
750 }
751
752 pub async fn subscribe_candles(
762 &self,
763 instrument_id: InstrumentId,
764 resolution: &str,
765 ) -> DydxWsResult<()> {
766 let ticker = Self::ticker_from_instrument_id(&instrument_id);
767 let id = format!("{ticker}/{resolution}");
768 let topic = Self::topic(DydxWsChannel::Candles, Some(&id));
769 if !self.subscriptions.add_reference(&topic) {
770 return Ok(());
771 }
772
773 let sub = DydxSubscription {
774 op: DydxWsOperation::Subscribe,
775 channel: DydxWsChannel::Candles,
776 id: Some(id),
777 };
778
779 self.send_and_track_subscribe(sub, &topic).await
780 }
781
782 pub async fn unsubscribe_candles(
788 &self,
789 instrument_id: InstrumentId,
790 resolution: &str,
791 ) -> DydxWsResult<()> {
792 let ticker = Self::ticker_from_instrument_id(&instrument_id);
793 let id = format!("{ticker}/{resolution}");
794 let topic = Self::topic(DydxWsChannel::Candles, Some(&id));
795 if !self.subscriptions.remove_reference(&topic) {
796 return Ok(());
797 }
798
799 let sub = DydxSubscription {
800 op: DydxWsOperation::Unsubscribe,
801 channel: DydxWsChannel::Candles,
802 id: Some(id),
803 };
804
805 self.send_and_track_unsubscribe(sub, &topic).await
806 }
807
808 pub async fn subscribe_markets(&self) -> DydxWsResult<()> {
818 let topic = Self::topic(DydxWsChannel::Markets, None);
819 if !self.subscriptions.add_reference(&topic) {
820 return Ok(());
821 }
822
823 let sub = DydxSubscription {
824 op: DydxWsOperation::Subscribe,
825 channel: DydxWsChannel::Markets,
826 id: None,
827 };
828
829 self.send_and_track_subscribe(sub, &topic).await
830 }
831
832 pub async fn unsubscribe_markets(&self) -> DydxWsResult<()> {
838 let topic = Self::topic(DydxWsChannel::Markets, None);
839 if !self.subscriptions.remove_reference(&topic) {
840 return Ok(());
841 }
842
843 let sub = DydxSubscription {
844 op: DydxWsOperation::Unsubscribe,
845 channel: DydxWsChannel::Markets,
846 id: None,
847 };
848
849 self.send_and_track_unsubscribe(sub, &topic).await
850 }
851
852 pub async fn subscribe_subaccount(
866 &self,
867 address: &str,
868 subaccount_number: u32,
869 ) -> DydxWsResult<()> {
870 if !self.requires_auth {
871 return Err(DydxWsError::Authentication(
872 "Subaccount subscriptions require authentication. Use new_private() to create an authenticated client".to_string(),
873 ));
874 }
875 let id = format!("{address}/{subaccount_number}");
876 let topic = Self::topic(DydxWsChannel::Subaccounts, Some(&id));
877 if !self.subscriptions.add_reference(&topic) {
878 return Ok(());
879 }
880
881 let sub = DydxSubscription {
882 op: DydxWsOperation::Subscribe,
883 channel: DydxWsChannel::Subaccounts,
884 id: Some(id),
885 };
886
887 self.send_and_track_subscribe(sub, &topic).await
888 }
889
890 pub async fn unsubscribe_subaccount(
896 &self,
897 address: &str,
898 subaccount_number: u32,
899 ) -> DydxWsResult<()> {
900 let id = format!("{address}/{subaccount_number}");
901 let topic = Self::topic(DydxWsChannel::Subaccounts, Some(&id));
902 if !self.subscriptions.remove_reference(&topic) {
903 return Ok(());
904 }
905
906 let sub = DydxSubscription {
907 op: DydxWsOperation::Unsubscribe,
908 channel: DydxWsChannel::Subaccounts,
909 id: Some(id),
910 };
911
912 self.send_and_track_unsubscribe(sub, &topic).await
913 }
914
915 pub async fn subscribe_block_height(&self) -> DydxWsResult<()> {
925 let topic = Self::topic(DydxWsChannel::BlockHeight, None);
926 if !self.subscriptions.add_reference(&topic) {
927 return Ok(());
928 }
929
930 let sub = DydxSubscription {
931 op: DydxWsOperation::Subscribe,
932 channel: DydxWsChannel::BlockHeight,
933 id: None,
934 };
935
936 self.send_and_track_subscribe(sub, &topic).await
937 }
938
939 pub async fn unsubscribe_block_height(&self) -> DydxWsResult<()> {
945 let topic = Self::topic(DydxWsChannel::BlockHeight, None);
946 if !self.subscriptions.remove_reference(&topic) {
947 return Ok(());
948 }
949
950 let sub = DydxSubscription {
951 op: DydxWsOperation::Unsubscribe,
952 channel: DydxWsChannel::BlockHeight,
953 id: None,
954 };
955
956 self.send_and_track_unsubscribe(sub, &topic).await
957 }
958}