Skip to main content

nautilus_dydx/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket client for dYdX v4 API.
17//!
18//! This client provides streaming connectivity to dYdX's WebSocket API for both
19//! public market data and private account updates.
20//!
21//! # Authentication
22//!
23//! dYdX v4 uses Cosmos SDK wallet-based authentication. Unlike traditional exchanges:
24//! - **Public channels** require no authentication.
25//! - **Private channels** (subaccounts) only require the wallet address in the subscription message.
26//! - No signature or API key is needed for WebSocket connections themselves.
27//!
28//! # References
29//!
30//! <https://docs.dydx.trade/developers/indexer/websockets>
31
32/// Pre-interned rate limit key for subscription operations (subscribe/unsubscribe).
33///
34/// dYdX allows up to 2 subscription messages per second per connection.
35/// See: <https://docs.dydx.trade/developers/indexer/websockets#rate-limits>
36pub static DYDX_RATE_LIMIT_KEY_SUBSCRIPTION: LazyLock<[Ustr; 1]> =
37    LazyLock::new(|| [Ustr::from("subscription")]);
38
39/// WebSocket topic delimiter for dYdX (channel:symbol format).
40pub const DYDX_WS_TOPIC_DELIMITER: char = ':';
41
42/// Default WebSocket quota for dYdX subscriptions (2 messages per second).
43pub static DYDX_WS_SUBSCRIPTION_QUOTA: LazyLock<Quota> = LazyLock::new(|| {
44    Quota::per_second(NonZeroU32::new(2).expect("non-zero")).expect("valid constant")
45});
46
47use std::{
48    num::NonZeroU32,
49    sync::{
50        Arc, LazyLock,
51        atomic::{AtomicBool, AtomicU8, Ordering},
52    },
53    time::Duration,
54};
55
56use arc_swap::ArcSwap;
57use dashmap::DashMap;
58use nautilus_common::live::get_runtime;
59use nautilus_model::{
60    data::BarType,
61    identifiers::{AccountId, InstrumentId},
62    instruments::InstrumentAny,
63};
64use nautilus_network::{
65    mode::ConnectionMode,
66    ratelimiter::quota::Quota,
67    websocket::{
68        AuthTracker, SubscriptionState, TransportBackend, WebSocketClient, WebSocketConfig,
69        channel_message_handler,
70    },
71};
72use ustr::Ustr;
73
74use super::{
75    dispatch::DydxWsDispatchState,
76    enums::{DydxWsChannel, DydxWsOperation, DydxWsOutputMessage},
77    error::{DydxWsError, DydxWsResult},
78    handler::{FeedHandler, HandlerCommand},
79    messages::DydxSubscription,
80};
81use crate::{
82    common::{credential::DydxCredential, instrument_cache::InstrumentCache},
83    execution::encoder::ClientOrderIdEncoder,
84};
85
86/// WebSocket client for dYdX v4 market data and account streams.
87///
88/// # Authentication
89///
90/// dYdX v4 does not require traditional API key signatures for WebSocket connections.
91/// Public channels work without any credentials. Private channels (subaccounts) only
92/// need the wallet address included in the subscription message.
93///
94/// The [`DydxCredential`] stored in this client is used for:
95/// - Providing the wallet address for private channel subscriptions
96/// - Transaction signing (when placing orders via the validator node)
97///
98/// It is **NOT** used for WebSocket message signing or authentication.
99///
100/// # Architecture
101///
102/// This client follows a two-layer architecture:
103/// - **Outer client** (this struct): Orchestrates connection and maintains Python-accessible state
104/// - **Inner handler**: Owns WebSocketClient exclusively and processes messages in a dedicated task
105///
106/// Communication uses lock-free channels:
107/// - Commands flow from client → handler via `cmd_tx`
108/// - Parsed events flow from handler → client via `out_rx`
109#[derive(Debug)]
110#[cfg_attr(
111    feature = "python",
112    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.dydx", from_py_object)
113)]
114#[cfg_attr(
115    feature = "python",
116    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.dydx")
117)]
118pub struct DydxWebSocketClient {
119    url: String,
120    credential: Option<Arc<DydxCredential>>,
121    requires_auth: bool,
122    auth_tracker: AuthTracker,
123    subscriptions: SubscriptionState,
124    connection_mode: Arc<ArcSwap<AtomicU8>>,
125    signal: Arc<AtomicBool>,
126    instrument_cache: Arc<InstrumentCache>,
127    account_id: Option<AccountId>,
128    heartbeat: Option<u64>,
129    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
130    out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<DydxWsOutputMessage>>,
131    handler_task: Option<Arc<tokio::task::JoinHandle<()>>>,
132    encoder: Arc<ClientOrderIdEncoder>,
133    bar_types: Arc<DashMap<String, BarType>>,
134    bars_timestamp_on_close: Arc<AtomicBool>,
135    ws_dispatch_state: Arc<DydxWsDispatchState>,
136    transport_backend: TransportBackend,
137    proxy_url: Option<String>,
138}
139
140impl Clone for DydxWebSocketClient {
141    fn clone(&self) -> Self {
142        Self {
143            url: self.url.clone(),
144            credential: self.credential.clone(),
145            requires_auth: self.requires_auth,
146            auth_tracker: self.auth_tracker.clone(),
147            subscriptions: self.subscriptions.clone(),
148            connection_mode: self.connection_mode.clone(),
149            signal: self.signal.clone(),
150            instrument_cache: self.instrument_cache.clone(),
151            account_id: self.account_id,
152            heartbeat: self.heartbeat,
153            cmd_tx: self.cmd_tx.clone(),
154            out_rx: None,       // Cannot clone receiver - only one owner allowed
155            handler_task: None, // Cannot clone task handle
156            encoder: self.encoder.clone(),
157            bar_types: self.bar_types.clone(),
158            bars_timestamp_on_close: self.bars_timestamp_on_close.clone(),
159            ws_dispatch_state: self.ws_dispatch_state.clone(),
160            transport_backend: self.transport_backend,
161            proxy_url: self.proxy_url.clone(),
162        }
163    }
164}
165
166impl DydxWebSocketClient {
167    /// Creates a new public WebSocket client for market data.
168    ///
169    /// This creates a new independent instrument cache. To share a cache with
170    /// the HTTP client, use [`Self::new_public_with_cache`] instead.
171    #[must_use]
172    pub fn new_public(url: String, heartbeat: Option<u64>, proxy_url: Option<String>) -> Self {
173        Self::new_public_with_cache(
174            url,
175            Arc::new(InstrumentCache::new()),
176            heartbeat,
177            TransportBackend::default(),
178            proxy_url,
179        )
180    }
181
182    /// Creates a new public WebSocket client with a shared instrument cache.
183    ///
184    /// Use this when you want to share instrument data with the HTTP client.
185    #[must_use]
186    pub fn new_public_with_cache(
187        url: String,
188        instrument_cache: Arc<InstrumentCache>,
189        heartbeat: Option<u64>,
190        transport_backend: TransportBackend,
191        proxy_url: Option<String>,
192    ) -> Self {
193        // Create dummy command channel (will be replaced on connect)
194        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
195
196        Self {
197            url,
198            credential: None,
199            requires_auth: false,
200            auth_tracker: AuthTracker::new(),
201            subscriptions: SubscriptionState::new(DYDX_WS_TOPIC_DELIMITER),
202            connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
203                ConnectionMode::Closed as u8,
204            ))),
205            signal: Arc::new(AtomicBool::new(false)),
206            instrument_cache,
207            account_id: None,
208            heartbeat,
209            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
210            out_rx: None,
211            handler_task: None,
212            encoder: Arc::new(ClientOrderIdEncoder::new()),
213            bar_types: Arc::new(DashMap::new()),
214            bars_timestamp_on_close: Arc::new(AtomicBool::new(true)),
215            ws_dispatch_state: Arc::new(DydxWsDispatchState::default()),
216            transport_backend,
217            proxy_url,
218        }
219    }
220
221    /// Creates a new private WebSocket client for account updates.
222    ///
223    /// This creates a new independent instrument cache. To share a cache with
224    /// the HTTP client, use [`Self::new_private_with_cache`] instead.
225    #[must_use]
226    pub fn new_private(
227        url: String,
228        credential: DydxCredential,
229        account_id: AccountId,
230        heartbeat: Option<u64>,
231        proxy_url: Option<String>,
232    ) -> Self {
233        Self::new_private_with_cache(
234            url,
235            credential,
236            account_id,
237            Arc::new(InstrumentCache::new()),
238            heartbeat,
239            TransportBackend::default(),
240            proxy_url,
241        )
242    }
243
244    /// Creates a new private WebSocket client with a shared instrument cache.
245    ///
246    /// Use this when you want to share instrument data with the HTTP client.
247    #[must_use]
248    pub fn new_private_with_cache(
249        url: String,
250        credential: DydxCredential,
251        account_id: AccountId,
252        instrument_cache: Arc<InstrumentCache>,
253        heartbeat: Option<u64>,
254        transport_backend: TransportBackend,
255        proxy_url: Option<String>,
256    ) -> Self {
257        // Create dummy command channel (will be replaced on connect)
258        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
259
260        Self {
261            url,
262            credential: Some(Arc::new(credential)),
263            requires_auth: true,
264            auth_tracker: AuthTracker::new(),
265            subscriptions: SubscriptionState::new(DYDX_WS_TOPIC_DELIMITER),
266            connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
267                ConnectionMode::Closed as u8,
268            ))),
269            signal: Arc::new(AtomicBool::new(false)),
270            instrument_cache,
271            account_id: Some(account_id),
272            heartbeat,
273            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
274            out_rx: None,
275            handler_task: None,
276            encoder: Arc::new(ClientOrderIdEncoder::new()),
277            bar_types: Arc::new(DashMap::new()),
278            bars_timestamp_on_close: Arc::new(AtomicBool::new(true)),
279            ws_dispatch_state: Arc::new(DydxWsDispatchState::default()),
280            transport_backend,
281            proxy_url,
282        }
283    }
284
285    /// Returns the credential associated with this client, if any.
286    #[must_use]
287    pub fn credential(&self) -> Option<&Arc<DydxCredential>> {
288        self.credential.as_ref()
289    }
290
291    /// Returns `true` when the client is connected.
292    #[must_use]
293    pub fn is_connected(&self) -> bool {
294        let mode = self.connection_mode.load();
295        let mode_u8 = mode.load(Ordering::Relaxed);
296        matches!(
297            mode_u8,
298            x if x == ConnectionMode::Active as u8 || x == ConnectionMode::Reconnect as u8
299        )
300    }
301
302    /// Returns the URL of this WebSocket client.
303    #[must_use]
304    pub fn url(&self) -> &str {
305        &self.url
306    }
307
308    /// Returns a clone of the connection mode atomic reference.
309    ///
310    /// This is primarily used for Python bindings that need to monitor connection state.
311    #[must_use]
312    pub fn connection_mode_atomic(&self) -> Arc<ArcSwap<AtomicU8>> {
313        self.connection_mode.clone()
314    }
315
316    /// Sets the account ID for account message parsing.
317    pub fn set_account_id(&mut self, account_id: AccountId) {
318        self.account_id = Some(account_id);
319    }
320
321    /// Returns the account ID if set.
322    #[must_use]
323    pub fn account_id(&self) -> Option<AccountId> {
324        self.account_id
325    }
326
327    /// Replaces the instrument cache with an externally shared one.
328    ///
329    /// Use this to share the HTTP client's cache (which includes CLOB pair ID
330    /// and market ticker indices) with the WebSocket client. Must be called
331    /// before `connect()`.
332    pub fn set_instrument_cache(&mut self, cache: Arc<InstrumentCache>) {
333        self.instrument_cache = cache;
334    }
335
336    /// Caches a single instrument.
337    ///
338    /// Any existing instrument with the same ID will be replaced.
339    pub fn cache_instrument(&self, instrument: InstrumentAny) {
340        self.instrument_cache.insert_instrument_only(instrument);
341    }
342
343    /// Caches multiple instruments.
344    ///
345    /// Any existing instruments with the same IDs will be replaced.
346    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
347        log::debug!(
348            "Caching {} instruments in WebSocket client",
349            instruments.len()
350        );
351        self.instrument_cache.insert_instruments_only(instruments);
352    }
353
354    /// Returns a reference to the shared instrument cache.
355    #[must_use]
356    pub fn instrument_cache(&self) -> &Arc<InstrumentCache> {
357        &self.instrument_cache
358    }
359
360    /// Returns a reference to the shared client order ID encoder.
361    #[must_use]
362    pub fn encoder(&self) -> &Arc<ClientOrderIdEncoder> {
363        &self.encoder
364    }
365
366    /// Returns a reference to the bar type registrations map.
367    #[must_use]
368    pub fn bar_types(&self) -> &Arc<DashMap<String, BarType>> {
369        &self.bar_types
370    }
371
372    /// Returns a reference to the shared WebSocket dispatch state.
373    pub fn ws_dispatch_state(&self) -> &Arc<DydxWsDispatchState> {
374        &self.ws_dispatch_state
375    }
376
377    /// Sets whether bar timestamps use the close time.
378    pub fn set_bars_timestamp_on_close(&self, value: bool) {
379        self.bars_timestamp_on_close.store(value, Ordering::Relaxed);
380    }
381
382    /// Returns whether bar timestamps use the close time.
383    #[must_use]
384    pub fn bars_timestamp_on_close(&self) -> bool {
385        self.bars_timestamp_on_close.load(Ordering::Relaxed)
386    }
387
388    /// Returns all cached instruments.
389    ///
390    /// This is a snapshot of the current cache contents.
391    #[must_use]
392    pub fn all_instruments(&self) -> Vec<InstrumentAny> {
393        self.instrument_cache.all_instruments()
394    }
395
396    /// Returns the number of cached instruments.
397    #[must_use]
398    pub fn cached_instruments_count(&self) -> usize {
399        self.instrument_cache.len()
400    }
401
402    /// Retrieves an instrument from the cache by InstrumentId.
403    ///
404    /// Returns `None` if the instrument is not found.
405    #[must_use]
406    pub fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
407        self.instrument_cache.get(instrument_id)
408    }
409
410    /// Retrieves an instrument from the cache by market ticker (e.g., "BTC-USD").
411    ///
412    /// Returns `None` if the instrument is not found.
413    #[must_use]
414    pub fn get_instrument_by_market(&self, ticker: &str) -> Option<InstrumentAny> {
415        self.instrument_cache.get_by_market(ticker)
416    }
417
418    /// Takes ownership of the inbound message receiver.
419    /// Returns None if the receiver has already been taken or not connected.
420    pub fn take_receiver(
421        &mut self,
422    ) -> Option<tokio::sync::mpsc::UnboundedReceiver<DydxWsOutputMessage>> {
423        self.out_rx.take()
424    }
425
426    /// Returns a stream of venue-specific WebSocket messages.
427    ///
428    /// Takes ownership of the message receiver and returns it as a `Stream`.
429    ///
430    /// # Panics
431    ///
432    /// Panics if the receiver has already been taken.
433    pub fn stream(
434        &mut self,
435    ) -> impl futures_util::Stream<Item = DydxWsOutputMessage> + Send + 'static {
436        let mut rx = self
437            .out_rx
438            .take()
439            .expect("Message stream receiver already taken or not connected");
440
441        async_stream::stream! {
442            while let Some(msg) = rx.recv().await {
443                yield msg;
444            }
445        }
446    }
447
448    /// Connects the websocket client in handler mode with automatic reconnection.
449    ///
450    /// Spawns a background handler task that owns the WebSocketClient and processes
451    /// raw messages into venue-specific [`DydxWsOutputMessage`] values.
452    ///
453    /// # Errors
454    ///
455    /// Returns an error if the connection cannot be established.
456    pub async fn connect(&mut self) -> DydxWsResult<()> {
457        if self.is_connected() {
458            return Ok(());
459        }
460
461        // Reset stop signal from any previous disconnect
462        self.signal.store(false, Ordering::Release);
463
464        let (message_handler, raw_rx) = channel_message_handler();
465
466        let cfg = WebSocketConfig {
467            url: self.url.clone(),
468            headers: vec![],
469            heartbeat: self.heartbeat,
470            heartbeat_msg: None,
471            reconnect_timeout_ms: Some(15_000),
472            reconnect_delay_initial_ms: Some(250),
473            reconnect_delay_max_ms: Some(5_000),
474            reconnect_backoff_factor: Some(2.0),
475            reconnect_jitter_ms: Some(200),
476            reconnect_max_attempts: None,
477            idle_timeout_ms: None,
478            backend: self.transport_backend,
479            proxy_url: self.proxy_url.clone(),
480        };
481
482        let client = WebSocketClient::connect(
483            cfg,
484            Some(message_handler),
485            None,
486            None,
487            vec![],
488            Some(*DYDX_WS_SUBSCRIPTION_QUOTA),
489        )
490        .await
491        .map_err(|e| DydxWsError::Transport(e.to_string()))?;
492
493        // Update connection state atomically
494        self.connection_mode.store(client.connection_mode_atomic());
495
496        // Create fresh channels for this connection
497        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
498        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<DydxWsOutputMessage>();
499
500        // Update the shared cmd_tx so all clones see the new sender
501        {
502            let mut guard = self.cmd_tx.write().await;
503            *guard = cmd_tx;
504        }
505        self.out_rx = Some(out_rx);
506
507        // Spawn handler task
508        let signal = self.signal.clone();
509        let subscriptions = self.subscriptions.clone();
510
511        let handler_task = get_runtime().spawn(async move {
512            let mut handler =
513                FeedHandler::new(cmd_rx, out_tx, raw_rx, client, signal, subscriptions);
514            handler.run().await;
515        });
516
517        self.handler_task = Some(Arc::new(handler_task));
518        log::info!("Connected dYdX WebSocket: {}", self.url);
519        Ok(())
520    }
521
522    /// Disconnects the websocket client gracefully.
523    ///
524    /// Sends a disconnect command to the handler, sets the stop signal, then
525    /// awaits the handler task with a timeout before aborting.
526    ///
527    /// # Errors
528    ///
529    /// Returns an error if the underlying client cannot be accessed.
530    pub async fn disconnect(&mut self) -> DydxWsResult<()> {
531        // 1. Send disconnect command so the handler can close the WS connection
532        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
533            log::debug!("Failed to send disconnect command: {e}");
534        }
535
536        // 2. Set stop signal with Release ordering
537        self.signal.store(true, Ordering::Release);
538
539        // 3. Await handler task with timeout, abort if stuck
540        if let Some(task_handle) = self.handler_task.take() {
541            match Arc::try_unwrap(task_handle) {
542                Ok(handle) => {
543                    let abort_handle = handle.abort_handle();
544                    match tokio::time::timeout(Duration::from_secs(2), handle).await {
545                        Ok(Ok(())) => log::debug!("Handler task completed"),
546                        Ok(Err(e)) => log::error!("Handler task error: {e:?}"),
547                        Err(_) => {
548                            log::warn!("Timeout waiting for handler task, aborting");
549                            abort_handle.abort();
550                        }
551                    }
552                }
553                Err(arc_handle) => {
554                    log::debug!("Cannot unwrap task handle, aborting");
555                    arc_handle.abort();
556                }
557            }
558        }
559
560        // Reset connection mode to Closed
561        self.connection_mode
562            .store(Arc::new(AtomicU8::new(ConnectionMode::Closed as u8)));
563
564        self.out_rx = None;
565
566        log::debug!("Disconnected dYdX WebSocket");
567        Ok(())
568    }
569
570    async fn send_text_inner(&self, text: &str) -> DydxWsResult<()> {
571        self.cmd_tx
572            .read()
573            .await
574            .send(HandlerCommand::SendText(text.to_string()))
575            .map_err(|e| {
576                DydxWsError::Transport(format!("Failed to send command to handler: {e}"))
577            })?;
578        Ok(())
579    }
580
581    /// Sends a command to the handler.
582    ///
583    /// # Errors
584    ///
585    /// Returns an error if the handler task has terminated.
586    pub fn send_command(&self, cmd: HandlerCommand) -> DydxWsResult<()> {
587        if let Ok(guard) = self.cmd_tx.try_read() {
588            guard.send(cmd).map_err(|e| {
589                DydxWsError::Transport(format!("Failed to send command to handler: {e}"))
590            })?;
591        } else {
592            return Err(DydxWsError::Transport(
593                "Failed to acquire lock on command channel".to_string(),
594            ));
595        }
596        Ok(())
597    }
598
599    fn ticker_from_instrument_id(instrument_id: &InstrumentId) -> String {
600        let mut s = instrument_id.symbol.as_str().to_string();
601        if let Some(stripped) = s.strip_suffix("-PERP") {
602            s = stripped.to_string();
603        }
604        s
605    }
606
607    fn topic(channel: DydxWsChannel, id: Option<&str>) -> String {
608        match id {
609            Some(id) => format!("{}{}{}", channel.as_ref(), DYDX_WS_TOPIC_DELIMITER, id),
610            None => channel.as_ref().to_string(),
611        }
612    }
613
614    async fn send_and_track_subscribe(
615        &self,
616        sub: DydxSubscription,
617        topic: &str,
618    ) -> DydxWsResult<()> {
619        self.subscriptions.mark_subscribe(topic);
620
621        if let Ok(cmd_tx) = self.cmd_tx.try_read() {
622            let _ = cmd_tx.send(HandlerCommand::RegisterSubscription {
623                topic: topic.to_string(),
624                subscription: sub.clone(),
625            });
626        }
627
628        let payload = serde_json::to_string(&sub)?;
629        if let Err(e) = self.send_text_inner(&payload).await {
630            self.subscriptions.mark_failure(topic);
631            self.subscriptions.remove_reference(topic);
632            return Err(e);
633        }
634        Ok(())
635    }
636
637    async fn send_and_track_unsubscribe(
638        &self,
639        sub: DydxSubscription,
640        topic: &str,
641    ) -> DydxWsResult<()> {
642        self.subscriptions.mark_unsubscribe(topic);
643
644        let payload = serde_json::to_string(&sub)?;
645        if let Err(e) = self.send_text_inner(&payload).await {
646            self.subscriptions.add_reference(topic);
647            self.subscriptions.mark_subscribe(topic);
648            return Err(e);
649        }
650
651        if let Ok(cmd_tx) = self.cmd_tx.try_read() {
652            let _ = cmd_tx.send(HandlerCommand::UnregisterSubscription {
653                topic: topic.to_string(),
654            });
655        }
656
657        Ok(())
658    }
659
660    /// Subscribes to public trade updates for a specific instrument.
661    ///
662    /// # Errors
663    ///
664    /// Returns an error if the subscription request fails.
665    ///
666    /// # References
667    ///
668    /// <https://docs.dydx.trade/developers/indexer/websockets#trades-channel>
669    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
670        let ticker = Self::ticker_from_instrument_id(&instrument_id);
671        let topic = Self::topic(DydxWsChannel::Trades, Some(&ticker));
672        if !self.subscriptions.add_reference(&topic) {
673            return Ok(());
674        }
675
676        let sub = DydxSubscription {
677            op: DydxWsOperation::Subscribe,
678            channel: DydxWsChannel::Trades,
679            id: Some(ticker),
680        };
681
682        self.send_and_track_subscribe(sub, &topic).await
683    }
684
685    /// Unsubscribes from public trade updates for a specific instrument.
686    ///
687    /// # Errors
688    ///
689    /// Returns an error if the unsubscription request fails.
690    pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
691        let ticker = Self::ticker_from_instrument_id(&instrument_id);
692        let topic = Self::topic(DydxWsChannel::Trades, Some(&ticker));
693        if !self.subscriptions.remove_reference(&topic) {
694            return Ok(());
695        }
696
697        let sub = DydxSubscription {
698            op: DydxWsOperation::Unsubscribe,
699            channel: DydxWsChannel::Trades,
700            id: Some(ticker),
701        };
702
703        self.send_and_track_unsubscribe(sub, &topic).await
704    }
705
706    /// Subscribes to orderbook updates for a specific instrument.
707    ///
708    /// # Errors
709    ///
710    /// Returns an error if the subscription request fails.
711    ///
712    /// # References
713    ///
714    /// <https://docs.dydx.trade/developers/indexer/websockets#orderbook-channel>
715    pub async fn subscribe_orderbook(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
716        let ticker = Self::ticker_from_instrument_id(&instrument_id);
717        let topic = Self::topic(DydxWsChannel::Orderbook, Some(&ticker));
718        if !self.subscriptions.add_reference(&topic) {
719            return Ok(());
720        }
721
722        let sub = DydxSubscription {
723            op: DydxWsOperation::Subscribe,
724            channel: DydxWsChannel::Orderbook,
725            id: Some(ticker),
726        };
727
728        self.send_and_track_subscribe(sub, &topic).await
729    }
730
731    /// Unsubscribes from orderbook updates for a specific instrument.
732    ///
733    /// # Errors
734    ///
735    /// Returns an error if the unsubscription request fails.
736    pub async fn unsubscribe_orderbook(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
737        let ticker = Self::ticker_from_instrument_id(&instrument_id);
738        let topic = Self::topic(DydxWsChannel::Orderbook, Some(&ticker));
739        if !self.subscriptions.remove_reference(&topic) {
740            return Ok(());
741        }
742
743        let sub = DydxSubscription {
744            op: DydxWsOperation::Unsubscribe,
745            channel: DydxWsChannel::Orderbook,
746            id: Some(ticker),
747        };
748
749        self.send_and_track_unsubscribe(sub, &topic).await
750    }
751
752    /// Subscribes to candle/kline updates for a specific instrument.
753    ///
754    /// # Errors
755    ///
756    /// Returns an error if the subscription request fails.
757    ///
758    /// # References
759    ///
760    /// <https://docs.dydx.trade/developers/indexer/websockets#candles-channel>
761    pub async fn subscribe_candles(
762        &self,
763        instrument_id: InstrumentId,
764        resolution: &str,
765    ) -> DydxWsResult<()> {
766        let ticker = Self::ticker_from_instrument_id(&instrument_id);
767        let id = format!("{ticker}/{resolution}");
768        let topic = Self::topic(DydxWsChannel::Candles, Some(&id));
769        if !self.subscriptions.add_reference(&topic) {
770            return Ok(());
771        }
772
773        let sub = DydxSubscription {
774            op: DydxWsOperation::Subscribe,
775            channel: DydxWsChannel::Candles,
776            id: Some(id),
777        };
778
779        self.send_and_track_subscribe(sub, &topic).await
780    }
781
782    /// Unsubscribes from candle/kline updates for a specific instrument.
783    ///
784    /// # Errors
785    ///
786    /// Returns an error if the unsubscription request fails.
787    pub async fn unsubscribe_candles(
788        &self,
789        instrument_id: InstrumentId,
790        resolution: &str,
791    ) -> DydxWsResult<()> {
792        let ticker = Self::ticker_from_instrument_id(&instrument_id);
793        let id = format!("{ticker}/{resolution}");
794        let topic = Self::topic(DydxWsChannel::Candles, Some(&id));
795        if !self.subscriptions.remove_reference(&topic) {
796            return Ok(());
797        }
798
799        let sub = DydxSubscription {
800            op: DydxWsOperation::Unsubscribe,
801            channel: DydxWsChannel::Candles,
802            id: Some(id),
803        };
804
805        self.send_and_track_unsubscribe(sub, &topic).await
806    }
807
808    /// Subscribes to market updates for all instruments.
809    ///
810    /// # Errors
811    ///
812    /// Returns an error if the subscription request fails.
813    ///
814    /// # References
815    ///
816    /// <https://docs.dydx.trade/developers/indexer/websockets#markets-channel>
817    pub async fn subscribe_markets(&self) -> DydxWsResult<()> {
818        let topic = Self::topic(DydxWsChannel::Markets, None);
819        if !self.subscriptions.add_reference(&topic) {
820            return Ok(());
821        }
822
823        let sub = DydxSubscription {
824            op: DydxWsOperation::Subscribe,
825            channel: DydxWsChannel::Markets,
826            id: None,
827        };
828
829        self.send_and_track_subscribe(sub, &topic).await
830    }
831
832    /// Unsubscribes from market updates.
833    ///
834    /// # Errors
835    ///
836    /// Returns an error if the unsubscription request fails.
837    pub async fn unsubscribe_markets(&self) -> DydxWsResult<()> {
838        let topic = Self::topic(DydxWsChannel::Markets, None);
839        if !self.subscriptions.remove_reference(&topic) {
840            return Ok(());
841        }
842
843        let sub = DydxSubscription {
844            op: DydxWsOperation::Unsubscribe,
845            channel: DydxWsChannel::Markets,
846            id: None,
847        };
848
849        self.send_and_track_unsubscribe(sub, &topic).await
850    }
851
852    /// Subscribes to subaccount updates (orders, fills, positions, balances).
853    ///
854    /// This requires authentication and will only work for private WebSocket clients
855    /// created with [`Self::new_private`].
856    ///
857    /// # Errors
858    ///
859    /// Returns an error if the client was not created with credentials or if the
860    /// subscription request fails.
861    ///
862    /// # References
863    ///
864    /// <https://docs.dydx.trade/developers/indexer/websockets#subaccounts-channel>
865    pub async fn subscribe_subaccount(
866        &self,
867        address: &str,
868        subaccount_number: u32,
869    ) -> DydxWsResult<()> {
870        if !self.requires_auth {
871            return Err(DydxWsError::Authentication(
872                "Subaccount subscriptions require authentication. Use new_private() to create an authenticated client".to_string(),
873            ));
874        }
875        let id = format!("{address}/{subaccount_number}");
876        let topic = Self::topic(DydxWsChannel::Subaccounts, Some(&id));
877        if !self.subscriptions.add_reference(&topic) {
878            return Ok(());
879        }
880
881        let sub = DydxSubscription {
882            op: DydxWsOperation::Subscribe,
883            channel: DydxWsChannel::Subaccounts,
884            id: Some(id),
885        };
886
887        self.send_and_track_subscribe(sub, &topic).await
888    }
889
890    /// Unsubscribes from subaccount updates.
891    ///
892    /// # Errors
893    ///
894    /// Returns an error if the unsubscription request fails.
895    pub async fn unsubscribe_subaccount(
896        &self,
897        address: &str,
898        subaccount_number: u32,
899    ) -> DydxWsResult<()> {
900        let id = format!("{address}/{subaccount_number}");
901        let topic = Self::topic(DydxWsChannel::Subaccounts, Some(&id));
902        if !self.subscriptions.remove_reference(&topic) {
903            return Ok(());
904        }
905
906        let sub = DydxSubscription {
907            op: DydxWsOperation::Unsubscribe,
908            channel: DydxWsChannel::Subaccounts,
909            id: Some(id),
910        };
911
912        self.send_and_track_unsubscribe(sub, &topic).await
913    }
914
915    /// Subscribes to block height updates.
916    ///
917    /// # Errors
918    ///
919    /// Returns an error if the subscription request fails.
920    ///
921    /// # References
922    ///
923    /// <https://docs.dydx.trade/developers/indexer/websockets#block-height-channel>
924    pub async fn subscribe_block_height(&self) -> DydxWsResult<()> {
925        let topic = Self::topic(DydxWsChannel::BlockHeight, None);
926        if !self.subscriptions.add_reference(&topic) {
927            return Ok(());
928        }
929
930        let sub = DydxSubscription {
931            op: DydxWsOperation::Subscribe,
932            channel: DydxWsChannel::BlockHeight,
933            id: None,
934        };
935
936        self.send_and_track_subscribe(sub, &topic).await
937    }
938
939    /// Unsubscribes from block height updates.
940    ///
941    /// # Errors
942    ///
943    /// Returns an error if the unsubscription request fails.
944    pub async fn unsubscribe_block_height(&self) -> DydxWsResult<()> {
945        let topic = Self::topic(DydxWsChannel::BlockHeight, None);
946        if !self.subscriptions.remove_reference(&topic) {
947            return Ok(());
948        }
949
950        let sub = DydxSubscription {
951            op: DydxWsOperation::Unsubscribe,
952            channel: DydxWsChannel::BlockHeight,
953            id: None,
954        };
955
956        self.send_and_track_unsubscribe(sub, &topic).await
957    }
958}