Skip to main content

nautilus_bitmex/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the WebSocket client integration for the [BitMEX](https://bitmex.com) WebSocket API.
17//!
18//! This module defines and implements a [`BitmexWebSocketClient`] for
19//! connecting to BitMEX WebSocket streams. It handles authentication (when credentials
20//! are provided), manages subscriptions to market data and account update channels,
21//! and emits venue-specific message types for consumers to parse.
22
23use std::{
24    sync::{
25        Arc,
26        atomic::{AtomicBool, AtomicU8, Ordering},
27    },
28    time::Duration,
29};
30
31use arc_swap::ArcSwap;
32use dashmap::DashMap;
33use futures_util::Stream;
34use nautilus_common::live::get_runtime;
35use nautilus_core::{
36    consts::NAUTILUS_USER_AGENT,
37    env::{get_env_var, get_or_env_var_opt},
38};
39use nautilus_model::{
40    data::bar::BarType,
41    identifiers::{AccountId, InstrumentId},
42    instruments::{Instrument, InstrumentAny},
43};
44use nautilus_network::{
45    http::USER_AGENT,
46    mode::ConnectionMode,
47    websocket::{
48        AUTHENTICATION_TIMEOUT_SECS, AuthTracker, PingHandler, SubscriptionState, TransportBackend,
49        WebSocketClient, WebSocketConfig, channel_message_handler,
50    },
51};
52use tokio_tungstenite::tungstenite::Message;
53use ustr::Ustr;
54
55use super::{
56    enums::{BitmexWsAuthAction, BitmexWsAuthChannel, BitmexWsOperation, BitmexWsTopic},
57    error::BitmexWsError,
58    handler::{BitmexWsFeedHandler, HandlerCommand},
59    messages::{BitmexAuthentication, BitmexSubscription, BitmexWsMessage},
60    parse::{is_index_symbol, topic_from_bar_spec},
61};
62use crate::common::{
63    consts::{BITMEX_WS_TOPIC_DELIMITER, BITMEX_WS_URL},
64    credential::{Credential, credential_env_vars},
65    enums::BitmexEnvironment,
66};
67
68/// Provides a WebSocket client for connecting to the [BitMEX](https://bitmex.com) real-time API.
69///
70/// Key runtime patterns:
71/// - Authentication handshakes are managed by the internal auth tracker, ensuring resubscriptions
72///   occur only after BitMEX acknowledges `authKey` messages.
73/// - The subscription state maintains pending and confirmed topics so reconnection replay is
74///   deterministic and per-topic errors are surfaced.
75#[derive(Clone, Debug)]
76pub struct BitmexWebSocketClient {
77    url: String,
78    credential: Option<Credential>,
79    heartbeat: Option<u64>,
80    account_id: AccountId,
81    auth_tracker: AuthTracker,
82    signal: Arc<AtomicBool>,
83    connection_mode: Arc<ArcSwap<AtomicU8>>,
84    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
85    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<BitmexWsMessage>>>,
86    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
87    subscriptions: SubscriptionState,
88    tracked_subscriptions: Arc<DashMap<String, ()>>,
89    instruments: Arc<DashMap<Ustr, InstrumentAny>>,
90    transport_backend: TransportBackend,
91    proxy_url: Option<String>,
92}
93
94impl BitmexWebSocketClient {
95    /// Creates a new [`BitmexWebSocketClient`] instance.
96    ///
97    /// # Errors
98    ///
99    /// Returns an error if only one of `api_key` or `api_secret` is provided (both or neither required).
100    pub fn new(
101        url: Option<String>,
102        api_key: Option<String>,
103        api_secret: Option<String>,
104        account_id: Option<AccountId>,
105        heartbeat: u64,
106        transport_backend: TransportBackend,
107        proxy_url: Option<String>,
108    ) -> anyhow::Result<Self> {
109        let credential = match (api_key, api_secret) {
110            (Some(key), Some(secret)) => Some(Credential::new(key, secret)),
111            (None, None) => None,
112            _ => anyhow::bail!("Both `api_key` and `api_secret` must be provided together"),
113        };
114
115        let account_id = account_id.unwrap_or(AccountId::from("BITMEX-master"));
116
117        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
118        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
119
120        // Placeholder channel until connect() creates the real one
121        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
122
123        Ok(Self {
124            url: url.unwrap_or(BITMEX_WS_URL.to_string()),
125            credential,
126            heartbeat: Some(heartbeat),
127            account_id,
128            auth_tracker: AuthTracker::new(),
129            signal: Arc::new(AtomicBool::new(false)),
130            connection_mode,
131            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
132            out_rx: None,
133            task_handle: None,
134            subscriptions: SubscriptionState::new(BITMEX_WS_TOPIC_DELIMITER),
135            tracked_subscriptions: Arc::new(DashMap::new()),
136            instruments: Arc::new(DashMap::new()),
137            transport_backend,
138            proxy_url,
139        })
140    }
141
142    /// Creates a new [`BitmexWebSocketClient`] with environment variable credential resolution.
143    ///
144    /// If `api_key` or `api_secret` are not provided, they will be loaded from
145    /// environment variables based on the `environment`:
146    /// - Testnet: `BITMEX_TESTNET_API_KEY`, `BITMEX_TESTNET_API_SECRET`
147    /// - Mainnet: `BITMEX_API_KEY`, `BITMEX_API_SECRET`
148    ///
149    /// # Errors
150    ///
151    /// Returns an error if only one of `api_key` or `api_secret` is provided.
152    #[expect(clippy::too_many_arguments)]
153    pub fn new_with_env(
154        url: Option<String>,
155        api_key: Option<String>,
156        api_secret: Option<String>,
157        account_id: Option<AccountId>,
158        heartbeat: u64,
159        environment: BitmexEnvironment,
160        transport_backend: TransportBackend,
161        proxy_url: Option<String>,
162    ) -> anyhow::Result<Self> {
163        let (api_key_env, api_secret_env) = credential_env_vars(environment);
164
165        let key = get_or_env_var_opt(api_key, api_key_env);
166        let secret = get_or_env_var_opt(api_secret, api_secret_env);
167
168        Self::new(
169            url,
170            key,
171            secret,
172            account_id,
173            heartbeat,
174            transport_backend,
175            proxy_url,
176        )
177    }
178
179    /// Creates a new authenticated [`BitmexWebSocketClient`] using environment variables.
180    ///
181    /// # Errors
182    ///
183    /// Returns an error if environment variables are not set or credentials are invalid.
184    pub fn from_env() -> anyhow::Result<Self> {
185        let url = get_env_var("BITMEX_WS_URL")?;
186        let (key_var, secret_var) = credential_env_vars(BitmexEnvironment::Mainnet);
187        let api_key = get_env_var(key_var)?;
188        let api_secret = get_env_var(secret_var)?;
189
190        Self::new(
191            Some(url),
192            Some(api_key),
193            Some(api_secret),
194            None,
195            5,
196            TransportBackend::default(),
197            None,
198        )
199    }
200
201    /// Returns the websocket url being used by the client.
202    #[must_use]
203    pub const fn url(&self) -> &str {
204        self.url.as_str()
205    }
206
207    /// Returns the public API key being used by the client.
208    #[must_use]
209    pub fn api_key(&self) -> Option<&str> {
210        self.credential.as_ref().map(|c| c.api_key())
211    }
212
213    /// Returns a masked version of the API key for logging purposes.
214    #[must_use]
215    pub fn api_key_masked(&self) -> Option<String> {
216        self.credential.as_ref().map(|c| c.api_key_masked())
217    }
218
219    /// Returns a value indicating whether the client is active.
220    #[must_use]
221    pub fn is_active(&self) -> bool {
222        let connection_mode_arc = self.connection_mode.load();
223        ConnectionMode::from_atomic(&connection_mode_arc).is_active()
224            && !self.signal.load(Ordering::Relaxed)
225    }
226
227    /// Returns a value indicating whether the client is closed.
228    #[must_use]
229    pub fn is_closed(&self) -> bool {
230        let connection_mode_arc = self.connection_mode.load();
231        ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
232            || self.signal.load(Ordering::Relaxed)
233    }
234
235    /// Returns the account ID.
236    #[must_use]
237    pub fn account_id(&self) -> AccountId {
238        self.account_id
239    }
240
241    /// Sets the account ID.
242    pub fn set_account_id(&mut self, account_id: AccountId) {
243        self.account_id = account_id;
244    }
245
246    /// Bulk-replaces the instrument cache with the given instruments.
247    pub fn cache_instruments(&self, instruments: &[InstrumentAny]) {
248        self.instruments.clear();
249        for inst in instruments {
250            self.instruments
251                .insert(inst.raw_symbol().inner(), inst.clone());
252        }
253    }
254
255    /// Upserts a single instrument into the cache.
256    pub fn cache_instrument(&self, instrument: InstrumentAny) {
257        self.instruments
258            .insert(instrument.raw_symbol().inner(), instrument);
259    }
260
261    /// Retrieves an instrument from the cache by symbol.
262    #[must_use]
263    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
264        self.instruments
265            .get(symbol)
266            .map(|entry| entry.value().clone())
267    }
268
269    /// Connect to the BitMEX WebSocket server.
270    ///
271    /// # Errors
272    ///
273    /// Returns an error if the WebSocket connection fails or authentication fails (if credentials provided).
274    ///
275    pub async fn connect(&mut self) -> Result<(), BitmexWsError> {
276        let (client, raw_rx) = self.connect_inner().await?;
277
278        // Reset shutdown signal so is_active() works after close+reconnect
279        self.signal.store(false, Ordering::Relaxed);
280
281        // Replace connection state so all clones see the underlying WebSocketClient's state
282        self.connection_mode.store(client.connection_mode_atomic());
283
284        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<BitmexWsMessage>();
285        self.out_rx = Some(Arc::new(out_rx));
286
287        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
288        *self.cmd_tx.write().await = cmd_tx.clone();
289
290        // Send WebSocketClient to handler
291        if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
292            return Err(BitmexWsError::ClientError(format!(
293                "Failed to send WebSocketClient to handler: {e}"
294            )));
295        }
296
297        let signal = self.signal.clone();
298        let credential = self.credential.clone();
299        let auth_tracker = self.auth_tracker.clone();
300        let subscriptions = self.subscriptions.clone();
301        let cmd_tx_for_reconnect = cmd_tx.clone();
302
303        let stream_handle = get_runtime().spawn(async move {
304            let mut handler = BitmexWsFeedHandler::new(
305                signal.clone(),
306                cmd_rx,
307                raw_rx,
308                out_tx,
309                auth_tracker.clone(),
310                subscriptions.clone(),
311            );
312
313            // Helper closure to resubscribe all tracked subscriptions after reconnection
314            let resubscribe_all = || {
315                // Use SubscriptionState as source of truth for what to restore
316                let topics = subscriptions.all_topics();
317
318                if topics.is_empty() {
319                    return;
320                }
321
322                log::debug!(
323                    "Resubscribing to confirmed subscriptions: count={}",
324                    topics.len()
325                );
326
327                for topic in &topics {
328                    subscriptions.mark_subscribe(topic.as_str());
329                }
330
331                // Serialize subscription messages
332                let mut payloads = Vec::with_capacity(topics.len());
333                for topic in &topics {
334                    let message = BitmexSubscription {
335                        op: BitmexWsOperation::Subscribe,
336                        args: vec![Ustr::from(topic.as_ref())],
337                    };
338
339                    if let Ok(payload) = serde_json::to_string(&message) {
340                        payloads.push(payload);
341                    }
342                }
343
344                if let Err(e) =
345                    cmd_tx_for_reconnect.send(HandlerCommand::Subscribe { topics: payloads })
346                {
347                    log::error!("Failed to send resubscribe command: {e}");
348                }
349            };
350
351            // Run message processing with reconnection handling
352            loop {
353                match handler.next().await {
354                    Some(BitmexWsMessage::Reconnected) => {
355                        if signal.load(Ordering::Relaxed) {
356                            continue;
357                        }
358
359                        log::info!("WebSocket reconnected");
360
361                        // Mark all confirmed subscriptions as failed so they transition to pending state
362                        let confirmed_topics: Vec<String> = {
363                            let confirmed = subscriptions.confirmed();
364                            let mut topics = Vec::new();
365
366                            for entry in confirmed.iter() {
367                                let (channel, symbols) = entry.pair();
368
369                                if *channel == BitmexWsTopic::Instrument.as_ref() {
370                                    continue;
371                                }
372
373                                for symbol in symbols {
374                                    if symbol.is_empty() {
375                                        topics.push(channel.to_string());
376                                    } else {
377                                        topics.push(format!("{channel}:{symbol}"));
378                                    }
379                                }
380                            }
381
382                            topics
383                        };
384
385                        if !confirmed_topics.is_empty() {
386                            log::debug!(
387                                "Marking confirmed subscriptions as pending for replay: count={}",
388                                confirmed_topics.len()
389                            );
390
391                            for topic in confirmed_topics {
392                                subscriptions.mark_failure(&topic);
393                            }
394                        }
395
396                        if let Some(cred) = &credential {
397                            log::debug!("Re-authenticating after reconnection");
398
399                            let expires =
400                                (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
401                            let signature = cred.sign("GET", "/realtime", expires, "");
402
403                            let auth_message = BitmexAuthentication {
404                                op: BitmexWsAuthAction::AuthKeyExpires,
405                                args: (cred.api_key().to_string(), expires, signature),
406                            };
407
408                            if let Ok(payload) = serde_json::to_string(&auth_message) {
409                                if let Err(e) = cmd_tx_for_reconnect
410                                    .send(HandlerCommand::Authenticate { payload })
411                                {
412                                    log::error!("Failed to send reconnection auth command: {e}");
413                                }
414                            } else {
415                                log::error!("Failed to serialize reconnection auth message");
416                            }
417                        }
418
419                        // Unauthenticated sessions resubscribe immediately after reconnection,
420                        // authenticated sessions wait for Authenticated message
421                        if credential.is_none() {
422                            log::debug!("No authentication required, resubscribing immediately");
423                            resubscribe_all();
424                        }
425
426                        if handler.send(BitmexWsMessage::Reconnected).is_err() {
427                            log::error!("Failed to forward reconnect event (receiver dropped)");
428                            break;
429                        }
430                    }
431                    Some(BitmexWsMessage::Authenticated) => {
432                        log::debug!("Authenticated after reconnection, resubscribing");
433                        resubscribe_all();
434                    }
435                    Some(msg) => {
436                        if handler.send(msg).is_err() {
437                            log::error!("Failed to send message (receiver dropped)");
438                            break;
439                        }
440                    }
441                    None => {
442                        // Stream ended - check if it's a stop signal
443                        if handler.is_stopped() {
444                            log::debug!("Stop signal received, ending message processing");
445                            break;
446                        }
447                        // Otherwise it's an unexpected stream end
448                        log::warn!("WebSocket stream ended unexpectedly");
449                        break;
450                    }
451                }
452            }
453
454            log::debug!("Handler task exiting");
455        });
456
457        self.task_handle = Some(Arc::new(stream_handle));
458
459        if self.credential.is_some()
460            && let Err(e) = self.authenticate().await
461        {
462            if let Some(handle) = self.task_handle.take() {
463                handle.abort();
464            }
465            self.signal.store(true, Ordering::Relaxed);
466            return Err(e);
467        }
468
469        // Subscribe to instrument topic
470        let instrument_topic = BitmexWsTopic::Instrument.as_ref().to_string();
471        self.subscriptions.mark_subscribe(&instrument_topic);
472        self.tracked_subscriptions.insert(instrument_topic, ());
473
474        let subscribe_msg = BitmexSubscription {
475            op: BitmexWsOperation::Subscribe,
476            args: vec![Ustr::from(BitmexWsTopic::Instrument.as_ref())],
477        };
478
479        match serde_json::to_string(&subscribe_msg) {
480            Ok(subscribe_json) => {
481                if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Subscribe {
482                    topics: vec![subscribe_json],
483                }) {
484                    log::error!("Failed to send subscribe command for instruments: {e}");
485                } else {
486                    log::debug!("Subscribed to all instruments");
487                }
488            }
489            Err(e) => {
490                log::error!("Failed to serialize subscribe message: {e}");
491            }
492        }
493
494        Ok(())
495    }
496
497    /// Connect to the WebSocket and return a message receiver.
498    ///
499    /// # Errors
500    ///
501    /// Returns an error if the WebSocket connection fails or if authentication fails (when credentials are provided).
502    async fn connect_inner(
503        &self,
504    ) -> Result<
505        (
506            WebSocketClient,
507            tokio::sync::mpsc::UnboundedReceiver<Message>,
508        ),
509        BitmexWsError,
510    > {
511        let (message_handler, rx) = channel_message_handler();
512
513        // No-op ping handler: handler owns the WebSocketClient and responds to pings directly
514        // in the message loop for minimal latency (see handler.rs pong response)
515        let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
516            // Handler responds to pings internally via select! loop
517        });
518
519        let config = WebSocketConfig {
520            url: self.url.clone(),
521            headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
522            heartbeat: self.heartbeat,
523            heartbeat_msg: None,
524            reconnect_timeout_ms: Some(5_000),
525            reconnect_delay_initial_ms: None, // Use default
526            reconnect_delay_max_ms: None,     // Use default
527            reconnect_backoff_factor: None,   // Use default
528            reconnect_jitter_ms: None,        // Use default
529            reconnect_max_attempts: None,
530            idle_timeout_ms: None,
531            backend: self.transport_backend,
532            proxy_url: self.proxy_url.clone(),
533        };
534
535        let keyed_quotas = vec![];
536        let client = WebSocketClient::connect(
537            config,
538            Some(message_handler),
539            Some(ping_handler),
540            None, // post_reconnection
541            keyed_quotas,
542            None, // default_quota
543        )
544        .await
545        .map_err(|e| BitmexWsError::ClientError(e.to_string()))?;
546
547        Ok((client, rx))
548    }
549
550    /// Authenticate the WebSocket connection using the provided credentials.
551    ///
552    /// # Errors
553    ///
554    /// Returns an error if the WebSocket is not connected, if authentication fails,
555    /// or if credentials are not available.
556    async fn authenticate(&self) -> Result<(), BitmexWsError> {
557        let credential = match &self.credential {
558            Some(credential) => credential,
559            None => {
560                return Err(BitmexWsError::AuthenticationError(
561                    "API credentials not available to authenticate".to_string(),
562                ));
563            }
564        };
565
566        let receiver = self.auth_tracker.begin();
567
568        let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
569        let signature = credential.sign("GET", "/realtime", expires, "");
570
571        let auth_message = BitmexAuthentication {
572            op: BitmexWsAuthAction::AuthKeyExpires,
573            args: (credential.api_key().to_string(), expires, signature),
574        };
575
576        let auth_json = serde_json::to_string(&auth_message).map_err(|e| {
577            let msg = format!("Failed to serialize auth message: {e}");
578            self.auth_tracker.fail(msg.clone());
579            BitmexWsError::AuthenticationError(msg)
580        })?;
581
582        // Send Authenticate command to handler
583        self.cmd_tx
584            .read()
585            .await
586            .send(HandlerCommand::Authenticate { payload: auth_json })
587            .map_err(|e| {
588                let msg = format!("Failed to send authenticate command: {e}");
589                self.auth_tracker.fail(msg.clone());
590                BitmexWsError::AuthenticationError(msg)
591            })?;
592
593        self.auth_tracker
594            .wait_for_result::<BitmexWsError>(
595                Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS),
596                receiver,
597            )
598            .await
599    }
600
601    /// Wait until the WebSocket connection is active.
602    ///
603    /// # Errors
604    ///
605    /// Returns an error if the connection times out.
606    pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), BitmexWsError> {
607        let timeout = Duration::from_secs_f64(timeout_secs);
608
609        tokio::time::timeout(timeout, async {
610            while !self.is_active() {
611                tokio::time::sleep(Duration::from_millis(10)).await;
612            }
613        })
614        .await
615        .map_err(|_| {
616            BitmexWsError::ClientError(format!(
617                "WebSocket connection timeout after {timeout_secs} seconds"
618            ))
619        })?;
620
621        Ok(())
622    }
623
624    /// Provides the internal stream as a channel-based stream.
625    ///
626    /// # Panics
627    ///
628    /// This function panics:
629    /// - If the websocket is not connected.
630    /// - If `stream` has already been called somewhere else (stream receiver is then taken).
631    pub fn stream(&mut self) -> impl Stream<Item = BitmexWsMessage> + use<> {
632        let rx = self
633            .out_rx
634            .take()
635            .expect("Stream receiver already taken or not connected");
636        let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
637        async_stream::stream! {
638            while let Some(msg) = rx.recv().await {
639                yield msg;
640            }
641        }
642    }
643
644    /// Closes the client.
645    ///
646    /// # Errors
647    ///
648    /// Returns an error if the WebSocket is not connected or if closing fails.
649    pub async fn close(&mut self) -> Result<(), BitmexWsError> {
650        log::debug!("Starting close process");
651
652        self.signal.store(true, Ordering::Relaxed);
653
654        // Send Disconnect command to handler
655        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
656            log::debug!(
657                "Failed to send disconnect command (handler may already be shut down): {e}"
658            );
659        }
660
661        // Clean up task handle with timeout
662        if let Some(task_handle) = self.task_handle.take() {
663            match Arc::try_unwrap(task_handle) {
664                Ok(handle) => {
665                    log::debug!("Waiting for task handle to complete");
666                    match tokio::time::timeout(Duration::from_secs(2), handle).await {
667                        Ok(Ok(())) => log::debug!("Task handle completed successfully"),
668                        Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
669                        Err(_) => {
670                            log::warn!(
671                                "Timeout waiting for task handle, task may still be running"
672                            );
673                            // The task will be dropped and should clean up automatically
674                        }
675                    }
676                }
677                Err(arc_handle) => {
678                    log::debug!(
679                        "Cannot take ownership of task handle - other references exist, aborting task"
680                    );
681                    arc_handle.abort();
682                }
683            }
684        } else {
685            log::debug!("No task handle to await");
686        }
687
688        log::debug!("Closed");
689
690        Ok(())
691    }
692
693    /// Subscribe to the specified topics.
694    ///
695    /// # Errors
696    ///
697    /// Returns an error if the WebSocket is not connected or if sending the subscription message fails.
698    pub async fn subscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
699        log::debug!("Subscribing to topics: {topics:?}");
700
701        for topic in &topics {
702            self.subscriptions.mark_subscribe(topic.as_str());
703            self.tracked_subscriptions.insert(topic.clone(), ());
704        }
705
706        // Serialize subscription messages
707        let mut payloads = Vec::with_capacity(topics.len());
708        for topic in &topics {
709            let message = BitmexSubscription {
710                op: BitmexWsOperation::Subscribe,
711                args: vec![Ustr::from(topic.as_ref())],
712            };
713            let payload = serde_json::to_string(&message).map_err(|e| {
714                BitmexWsError::SubscriptionError(format!("Failed to serialize subscription: {e}"))
715            })?;
716            payloads.push(payload);
717        }
718
719        // Send Subscribe command to handler
720        let cmd = HandlerCommand::Subscribe { topics: payloads };
721
722        self.send_cmd(cmd).await.map_err(|e| {
723            BitmexWsError::SubscriptionError(format!("Failed to send subscribe command: {e}"))
724        })
725    }
726
727    /// Unsubscribe from the specified topics.
728    ///
729    /// # Errors
730    ///
731    /// Returns an error if the WebSocket is not connected or if sending the unsubscription message fails.
732    async fn unsubscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
733        log::debug!("Attempting to unsubscribe from topics: {topics:?}");
734
735        if self.signal.load(Ordering::Relaxed) {
736            log::debug!("Shutdown signal detected, skipping unsubscribe");
737            return Ok(());
738        }
739
740        for topic in &topics {
741            self.subscriptions.mark_unsubscribe(topic.as_str());
742            self.tracked_subscriptions.remove(topic);
743        }
744
745        // Serialize unsubscription messages
746        let mut payloads = Vec::with_capacity(topics.len());
747        for topic in &topics {
748            let message = BitmexSubscription {
749                op: BitmexWsOperation::Unsubscribe,
750                args: vec![Ustr::from(topic.as_ref())],
751            };
752
753            if let Ok(payload) = serde_json::to_string(&message) {
754                payloads.push(payload);
755            }
756        }
757
758        // Send Unsubscribe command to handler
759        let cmd = HandlerCommand::Unsubscribe { topics: payloads };
760
761        if let Err(e) = self.send_cmd(cmd).await {
762            log::debug!("Failed to send unsubscribe command: {e}");
763        }
764
765        Ok(())
766    }
767
768    /// Get the current number of active subscriptions.
769    #[must_use]
770    pub fn subscription_count(&self) -> usize {
771        self.subscriptions.len()
772    }
773
774    pub fn get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
775        let symbol = instrument_id.symbol.inner();
776        let confirmed = self.subscriptions.confirmed();
777        let mut channels = Vec::with_capacity(confirmed.len());
778
779        for entry in confirmed.iter() {
780            let (channel, symbols) = entry.pair();
781            if symbols.contains(&symbol) {
782                // Return the full topic string (e.g., "orderBookL2:XBTUSD")
783                channels.push(format!("{channel}:{symbol}"));
784            } else {
785                let has_channel_marker = symbols.iter().any(|s| s.is_empty());
786                if has_channel_marker
787                    && (*channel == BitmexWsAuthChannel::Execution.as_ref()
788                        || *channel == BitmexWsAuthChannel::Order.as_ref())
789                {
790                    // These are account-level subscriptions without symbols
791                    channels.push(channel.to_string());
792                }
793            }
794        }
795
796        channels
797    }
798
799    /// Subscribe to instrument updates for all instruments on the venue.
800    ///
801    /// # Errors
802    ///
803    /// Returns an error if the WebSocket is not connected or if the subscription fails.
804    pub async fn subscribe_instruments(&self) -> Result<(), BitmexWsError> {
805        // Already subscribed automatically on connection
806        log::debug!("Already subscribed to all instruments on connection, skipping");
807        Ok(())
808    }
809
810    /// Subscribe to instrument updates (mark/index prices) for the specified instrument.
811    ///
812    /// # Errors
813    ///
814    /// Returns an error if the WebSocket is not connected or if the subscription fails.
815    pub async fn subscribe_instrument(
816        &self,
817        instrument_id: InstrumentId,
818    ) -> Result<(), BitmexWsError> {
819        // Already subscribed to all instruments on connection
820        log::debug!(
821            "Already subscribed to all instruments on connection (includes {instrument_id}), skipping"
822        );
823        Ok(())
824    }
825
826    /// Subscribe to order book updates for the specified instrument.
827    ///
828    /// # Errors
829    ///
830    /// Returns an error if the WebSocket is not connected or if the subscription fails.
831    pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
832        let topic = BitmexWsTopic::OrderBookL2;
833        let symbol = instrument_id.symbol.inner();
834        self.subscribe(vec![format!("{topic}:{symbol}")]).await
835    }
836
837    /// Subscribe to order book L2 (25 levels) updates for the specified instrument.
838    ///
839    /// # Errors
840    ///
841    /// Returns an error if the WebSocket is not connected or if the subscription fails.
842    pub async fn subscribe_book_25(
843        &self,
844        instrument_id: InstrumentId,
845    ) -> Result<(), BitmexWsError> {
846        let topic = BitmexWsTopic::OrderBookL2_25;
847        let symbol = instrument_id.symbol.inner();
848        self.subscribe(vec![format!("{topic}:{symbol}")]).await
849    }
850
851    /// Subscribe to order book depth 10 updates for the specified instrument.
852    ///
853    /// # Errors
854    ///
855    /// Returns an error if the WebSocket is not connected or if the subscription fails.
856    pub async fn subscribe_book_depth10(
857        &self,
858        instrument_id: InstrumentId,
859    ) -> Result<(), BitmexWsError> {
860        let topic = BitmexWsTopic::OrderBook10;
861        let symbol = instrument_id.symbol.inner();
862        self.subscribe(vec![format!("{topic}:{symbol}")]).await
863    }
864
865    /// Subscribe to quote updates for the specified instrument.
866    ///
867    /// Note: Index symbols (starting with '.') do not have quotes and will be silently ignored.
868    ///
869    /// # Errors
870    ///
871    /// Returns an error if the WebSocket is not connected or if the subscription fails.
872    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
873        let symbol = instrument_id.symbol.inner();
874
875        // Index symbols don't have quotes (bid/ask), only a single price
876        if is_index_symbol(&instrument_id.symbol.inner()) {
877            log::warn!("Ignoring quote subscription for index symbol: {symbol}");
878            return Ok(());
879        }
880
881        let topic = BitmexWsTopic::Quote;
882        self.subscribe(vec![format!("{topic}:{symbol}")]).await
883    }
884
885    /// Subscribe to trade updates for the specified instrument.
886    ///
887    /// Note: Index symbols (starting with '.') do not have trades and will be silently ignored.
888    ///
889    /// # Errors
890    ///
891    /// Returns an error if the WebSocket is not connected or if the subscription fails.
892    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
893        let symbol = instrument_id.symbol.inner();
894
895        // Index symbols don't have trades
896        if is_index_symbol(&symbol) {
897            log::warn!("Ignoring trade subscription for index symbol: {symbol}");
898            return Ok(());
899        }
900
901        let topic = BitmexWsTopic::Trade;
902        self.subscribe(vec![format!("{topic}:{symbol}")]).await
903    }
904
905    /// Subscribe to mark price updates for the specified instrument.
906    ///
907    /// # Errors
908    ///
909    /// Returns an error if the WebSocket is not connected or if the subscription fails.
910    pub async fn subscribe_mark_prices(
911        &self,
912        instrument_id: InstrumentId,
913    ) -> Result<(), BitmexWsError> {
914        self.subscribe_instrument(instrument_id).await
915    }
916
917    /// Subscribe to index price updates for the specified instrument.
918    ///
919    /// # Errors
920    ///
921    /// Returns an error if the WebSocket is not connected or if the subscription fails.
922    pub async fn subscribe_index_prices(
923        &self,
924        instrument_id: InstrumentId,
925    ) -> Result<(), BitmexWsError> {
926        self.subscribe_instrument(instrument_id).await
927    }
928
929    /// Subscribe to funding rate updates for the specified instrument.
930    ///
931    /// # Errors
932    ///
933    /// Returns an error if the WebSocket is not connected or if the subscription fails.
934    pub async fn subscribe_funding_rates(
935        &self,
936        instrument_id: InstrumentId,
937    ) -> Result<(), BitmexWsError> {
938        let topic = BitmexWsTopic::Funding;
939        let symbol = instrument_id.symbol.inner();
940        self.subscribe(vec![format!("{topic}:{symbol}")]).await
941    }
942
943    /// Subscribe to bar updates for the specified bar type.
944    ///
945    /// # Errors
946    ///
947    /// Returns an error if the WebSocket is not connected or if the subscription fails.
948    pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
949        let topic = topic_from_bar_spec(bar_type.spec());
950        let symbol = bar_type.instrument_id().symbol.inner();
951        self.subscribe(vec![format!("{topic}:{symbol}")]).await
952    }
953
954    /// Unsubscribe from instrument updates for all instruments on the venue.
955    ///
956    /// # Errors
957    ///
958    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
959    pub async fn unsubscribe_instruments(&self) -> Result<(), BitmexWsError> {
960        // No-op: instruments are required for proper operation
961        log::debug!(
962            "Instruments subscription maintained for proper operation, skipping unsubscribe"
963        );
964        Ok(())
965    }
966
967    /// Unsubscribe from instrument updates (mark/index prices) for the specified instrument.
968    ///
969    /// # Errors
970    ///
971    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
972    pub async fn unsubscribe_instrument(
973        &self,
974        instrument_id: InstrumentId,
975    ) -> Result<(), BitmexWsError> {
976        // No-op: instruments are required for proper operation
977        log::debug!(
978            "Instruments subscription maintained for proper operation (includes {instrument_id}), skipping unsubscribe"
979        );
980        Ok(())
981    }
982
983    /// Unsubscribe from order book updates for the specified instrument.
984    ///
985    /// # Errors
986    ///
987    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
988    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
989        let topic = BitmexWsTopic::OrderBookL2;
990        let symbol = instrument_id.symbol.inner();
991        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
992    }
993
994    /// Unsubscribe from order book L2 (25 levels) updates for the specified instrument.
995    ///
996    /// # Errors
997    ///
998    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
999    pub async fn unsubscribe_book_25(
1000        &self,
1001        instrument_id: InstrumentId,
1002    ) -> Result<(), BitmexWsError> {
1003        let topic = BitmexWsTopic::OrderBookL2_25;
1004        let symbol = instrument_id.symbol.inner();
1005        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1006    }
1007
1008    /// Unsubscribe from order book depth 10 updates for the specified instrument.
1009    ///
1010    /// # Errors
1011    ///
1012    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1013    pub async fn unsubscribe_book_depth10(
1014        &self,
1015        instrument_id: InstrumentId,
1016    ) -> Result<(), BitmexWsError> {
1017        let topic = BitmexWsTopic::OrderBook10;
1018        let symbol = instrument_id.symbol.inner();
1019        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1020    }
1021
1022    /// Unsubscribe from quote updates for the specified instrument.
1023    ///
1024    /// # Errors
1025    ///
1026    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1027    pub async fn unsubscribe_quotes(
1028        &self,
1029        instrument_id: InstrumentId,
1030    ) -> Result<(), BitmexWsError> {
1031        let symbol = instrument_id.symbol.inner();
1032
1033        // Index symbols don't have quotes
1034        if is_index_symbol(&symbol) {
1035            return Ok(());
1036        }
1037
1038        let topic = BitmexWsTopic::Quote;
1039        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1040    }
1041
1042    /// Unsubscribe from trade updates for the specified instrument.
1043    ///
1044    /// # Errors
1045    ///
1046    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1047    pub async fn unsubscribe_trades(
1048        &self,
1049        instrument_id: InstrumentId,
1050    ) -> Result<(), BitmexWsError> {
1051        let symbol = instrument_id.symbol.inner();
1052
1053        // Index symbols don't have trades
1054        if is_index_symbol(&symbol) {
1055            return Ok(());
1056        }
1057
1058        let topic = BitmexWsTopic::Trade;
1059        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1060    }
1061
1062    /// Unsubscribe from mark price updates for the specified instrument.
1063    ///
1064    /// # Errors
1065    ///
1066    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1067    pub async fn unsubscribe_mark_prices(
1068        &self,
1069        instrument_id: InstrumentId,
1070    ) -> Result<(), BitmexWsError> {
1071        // No-op: instrument channel shared with index prices
1072        log::debug!(
1073            "Mark prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
1074        );
1075        Ok(())
1076    }
1077
1078    /// Unsubscribe from index price updates for the specified instrument.
1079    ///
1080    /// # Errors
1081    ///
1082    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1083    pub async fn unsubscribe_index_prices(
1084        &self,
1085        instrument_id: InstrumentId,
1086    ) -> Result<(), BitmexWsError> {
1087        // No-op: instrument channel shared with mark prices
1088        log::debug!(
1089            "Index prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
1090        );
1091        Ok(())
1092    }
1093
1094    /// Unsubscribe from funding rate updates for the specified instrument.
1095    ///
1096    /// # Errors
1097    ///
1098    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1099    pub async fn unsubscribe_funding_rates(
1100        &self,
1101        instrument_id: InstrumentId,
1102    ) -> Result<(), BitmexWsError> {
1103        // No-op: unsubscribing during shutdown causes race conditions
1104        log::debug!(
1105            "Funding rates for {instrument_id}, skipping unsubscribe to avoid shutdown race"
1106        );
1107        Ok(())
1108    }
1109
1110    /// Unsubscribe from bar updates for the specified bar type.
1111    ///
1112    /// # Errors
1113    ///
1114    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1115    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
1116        let topic = topic_from_bar_spec(bar_type.spec());
1117        let symbol = bar_type.instrument_id().symbol.inner();
1118        self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1119    }
1120
1121    /// Subscribe to order updates for the authenticated account.
1122    ///
1123    /// # Errors
1124    ///
1125    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1126    pub async fn subscribe_orders(&self) -> Result<(), BitmexWsError> {
1127        if self.credential.is_none() {
1128            return Err(BitmexWsError::MissingCredentials);
1129        }
1130        self.subscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1131            .await
1132    }
1133
1134    /// Subscribe to execution updates for the authenticated account.
1135    ///
1136    /// # Errors
1137    ///
1138    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1139    pub async fn subscribe_executions(&self) -> Result<(), BitmexWsError> {
1140        if self.credential.is_none() {
1141            return Err(BitmexWsError::MissingCredentials);
1142        }
1143        self.subscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1144            .await
1145    }
1146
1147    /// Subscribe to position updates for the authenticated account.
1148    ///
1149    /// # Errors
1150    ///
1151    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1152    pub async fn subscribe_positions(&self) -> Result<(), BitmexWsError> {
1153        if self.credential.is_none() {
1154            return Err(BitmexWsError::MissingCredentials);
1155        }
1156        self.subscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1157            .await
1158    }
1159
1160    /// Subscribe to margin updates for the authenticated account.
1161    ///
1162    /// # Errors
1163    ///
1164    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1165    pub async fn subscribe_margin(&self) -> Result<(), BitmexWsError> {
1166        if self.credential.is_none() {
1167            return Err(BitmexWsError::MissingCredentials);
1168        }
1169        self.subscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1170            .await
1171    }
1172
1173    /// Subscribe to wallet updates for the authenticated account.
1174    ///
1175    /// # Errors
1176    ///
1177    /// Returns an error if the WebSocket is not connected, not authenticated, or if the subscription fails.
1178    pub async fn subscribe_wallet(&self) -> Result<(), BitmexWsError> {
1179        if self.credential.is_none() {
1180            return Err(BitmexWsError::MissingCredentials);
1181        }
1182        self.subscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1183            .await
1184    }
1185
1186    /// Unsubscribe from order updates for the authenticated account.
1187    ///
1188    /// # Errors
1189    ///
1190    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1191    pub async fn unsubscribe_orders(&self) -> Result<(), BitmexWsError> {
1192        self.unsubscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1193            .await
1194    }
1195
1196    /// Unsubscribe from execution updates for the authenticated account.
1197    ///
1198    /// # Errors
1199    ///
1200    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1201    pub async fn unsubscribe_executions(&self) -> Result<(), BitmexWsError> {
1202        self.unsubscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1203            .await
1204    }
1205
1206    /// Unsubscribe from position updates for the authenticated account.
1207    ///
1208    /// # Errors
1209    ///
1210    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1211    pub async fn unsubscribe_positions(&self) -> Result<(), BitmexWsError> {
1212        self.unsubscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1213            .await
1214    }
1215
1216    /// Unsubscribe from margin updates for the authenticated account.
1217    ///
1218    /// # Errors
1219    ///
1220    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1221    pub async fn unsubscribe_margin(&self) -> Result<(), BitmexWsError> {
1222        self.unsubscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1223            .await
1224    }
1225
1226    /// Unsubscribe from wallet updates for the authenticated account.
1227    ///
1228    /// # Errors
1229    ///
1230    /// Returns an error if the WebSocket is not connected or if the unsubscription fails.
1231    pub async fn unsubscribe_wallet(&self) -> Result<(), BitmexWsError> {
1232        self.unsubscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1233            .await
1234    }
1235
1236    /// Sends a command to the handler.
1237    async fn send_cmd(&self, cmd: HandlerCommand) -> Result<(), BitmexWsError> {
1238        self.cmd_tx
1239            .read()
1240            .await
1241            .send(cmd)
1242            .map_err(|e| BitmexWsError::ClientError(format!("Handler not available: {e}")))
1243    }
1244}
1245
1246#[cfg(test)]
1247mod tests {
1248    use ahash::AHashSet;
1249    use rstest::rstest;
1250    use ustr::Ustr;
1251
1252    use super::*;
1253
1254    #[rstest]
1255    fn test_reconnect_topics_restoration_logic() {
1256        // Create real client with credentials
1257        let client = BitmexWebSocketClient::new(
1258            Some("ws://test.com".to_string()),
1259            Some("test_key".to_string()),
1260            Some("test_secret".to_string()),
1261            Some(AccountId::new("BITMEX-TEST")),
1262            5,
1263            TransportBackend::default(),
1264            None,
1265        )
1266        .unwrap();
1267
1268        // Populate subscriptions like they would be during normal operation
1269        let subs = client.subscriptions.confirmed();
1270        subs.insert(Ustr::from(BitmexWsTopic::Trade.as_ref()), {
1271            let mut set = AHashSet::new();
1272            set.insert(Ustr::from("XBTUSD"));
1273            set.insert(Ustr::from("ETHUSD"));
1274            set
1275        });
1276
1277        subs.insert(Ustr::from(BitmexWsTopic::OrderBookL2.as_ref()), {
1278            let mut set = AHashSet::new();
1279            set.insert(Ustr::from("XBTUSD"));
1280            set
1281        });
1282
1283        // Private channels (no symbols)
1284        subs.insert(Ustr::from(BitmexWsAuthChannel::Order.as_ref()), {
1285            let mut set = AHashSet::new();
1286            set.insert(Ustr::from(""));
1287            set
1288        });
1289        subs.insert(Ustr::from(BitmexWsAuthChannel::Position.as_ref()), {
1290            let mut set = AHashSet::new();
1291            set.insert(Ustr::from(""));
1292            set
1293        });
1294
1295        // Test the actual reconnection topic building logic
1296        let mut topics_to_restore = Vec::new();
1297
1298        for entry in subs.iter() {
1299            let (channel, symbols) = entry.pair();
1300            for symbol in symbols {
1301                if symbol.is_empty() {
1302                    topics_to_restore.push(channel.to_string());
1303                } else {
1304                    topics_to_restore.push(format!("{channel}:{symbol}"));
1305                }
1306            }
1307        }
1308
1309        // Verify it builds the correct restoration topics
1310        assert!(topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref())));
1311        assert!(topics_to_restore.contains(&format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref())));
1312        assert!(
1313            topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref()))
1314        );
1315        assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Order.as_ref().to_string()));
1316        assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Position.as_ref().to_string()));
1317        assert_eq!(topics_to_restore.len(), 5);
1318    }
1319
1320    #[rstest]
1321    fn test_reconnect_auth_message_building() {
1322        // Test with credentials
1323        let client_with_creds = BitmexWebSocketClient::new(
1324            Some("ws://test.com".to_string()),
1325            Some("test_key".to_string()),
1326            Some("test_secret".to_string()),
1327            Some(AccountId::new("BITMEX-TEST")),
1328            5,
1329            TransportBackend::default(),
1330            None,
1331        )
1332        .unwrap();
1333
1334        // Test the actual auth message building logic from lines 220-228
1335        if let Some(cred) = &client_with_creds.credential {
1336            let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
1337            let signature = cred.sign("GET", "/realtime", expires, "");
1338
1339            let auth_message = BitmexAuthentication {
1340                op: BitmexWsAuthAction::AuthKeyExpires,
1341                args: (cred.api_key().to_string(), expires, signature),
1342            };
1343
1344            // Verify auth message structure
1345            assert_eq!(auth_message.op, BitmexWsAuthAction::AuthKeyExpires);
1346            assert_eq!(auth_message.args.0, "test_key");
1347            assert!(auth_message.args.1 > 0); // expires should be positive
1348            assert!(!auth_message.args.2.is_empty()); // signature should exist
1349        } else {
1350            panic!("Client should have credentials");
1351        }
1352
1353        // Test without credentials
1354        let client_no_creds = BitmexWebSocketClient::new(
1355            Some("ws://test.com".to_string()),
1356            None,
1357            None,
1358            Some(AccountId::new("BITMEX-TEST")),
1359            5,
1360            TransportBackend::default(),
1361            None,
1362        )
1363        .unwrap();
1364
1365        assert!(client_no_creds.credential.is_none());
1366    }
1367
1368    #[rstest]
1369    fn test_subscription_state_after_unsubscribe() {
1370        let client = BitmexWebSocketClient::new(
1371            Some("ws://test.com".to_string()),
1372            Some("test_key".to_string()),
1373            Some("test_secret".to_string()),
1374            Some(AccountId::new("BITMEX-TEST")),
1375            5,
1376            TransportBackend::default(),
1377            None,
1378        )
1379        .unwrap();
1380
1381        // Set up initial subscriptions
1382        let subs = client.subscriptions.confirmed();
1383        subs.insert(Ustr::from(BitmexWsTopic::Trade.as_ref()), {
1384            let mut set = AHashSet::new();
1385            set.insert(Ustr::from("XBTUSD"));
1386            set.insert(Ustr::from("ETHUSD"));
1387            set
1388        });
1389
1390        subs.insert(Ustr::from(BitmexWsTopic::OrderBookL2.as_ref()), {
1391            let mut set = AHashSet::new();
1392            set.insert(Ustr::from("XBTUSD"));
1393            set
1394        });
1395
1396        // Simulate unsubscribe logic (like from unsubscribe() method lines 586-599)
1397        let topic = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1398        if let Some((channel, symbol)) = topic.split_once(':')
1399            && let Some(mut entry) = subs.get_mut(&Ustr::from(channel))
1400        {
1401            entry.remove(&Ustr::from(symbol));
1402            if entry.is_empty() {
1403                drop(entry);
1404                subs.remove(&Ustr::from(channel));
1405            }
1406        }
1407
1408        // Build restoration topics after unsubscribe
1409        let mut topics_to_restore = Vec::new();
1410
1411        for entry in subs.iter() {
1412            let (channel, symbols) = entry.pair();
1413            for symbol in symbols {
1414                if symbol.is_empty() {
1415                    topics_to_restore.push(channel.to_string());
1416                } else {
1417                    topics_to_restore.push(format!("{channel}:{symbol}"));
1418                }
1419            }
1420        }
1421
1422        // Should have XBTUSD trade but not ETHUSD trade
1423        let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1424        let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1425        let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1426
1427        assert!(topics_to_restore.contains(&trade_xbt));
1428        assert!(!topics_to_restore.contains(&trade_eth));
1429        assert!(topics_to_restore.contains(&book_xbt));
1430        assert_eq!(topics_to_restore.len(), 2);
1431    }
1432
1433    #[rstest]
1434    fn test_race_unsubscribe_failure_recovery() {
1435        // Simulates the race condition where venue rejects an unsubscribe request.
1436        // The adapter must perform the 3-step recovery:
1437        // 1. confirm_unsubscribe() - clear pending_unsubscribe
1438        // 2. mark_subscribe() - mark as subscribing again
1439        // 3. confirm_subscribe() - restore to confirmed state
1440        let client = BitmexWebSocketClient::new(
1441            Some("ws://test.com".to_string()),
1442            None,
1443            None,
1444            Some(AccountId::new("BITMEX-TEST")),
1445            5,
1446            TransportBackend::default(),
1447            None,
1448        )
1449        .unwrap();
1450
1451        let topic = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1452
1453        // Initial subscribe flow
1454        client.subscriptions.mark_subscribe(&topic);
1455        client.subscriptions.confirm_subscribe(&topic);
1456        assert_eq!(client.subscriptions.len(), 1);
1457
1458        // User unsubscribes
1459        client.subscriptions.mark_unsubscribe(&topic);
1460        assert_eq!(client.subscriptions.len(), 0);
1461        assert_eq!(
1462            client.subscriptions.pending_unsubscribe_topics(),
1463            vec![topic.clone()]
1464        );
1465
1466        // Venue REJECTS the unsubscribe (error message)
1467        // Adapter must perform 3-step recovery (from lines 1884-1891)
1468        client.subscriptions.confirm_unsubscribe(&topic); // Step 1: clear pending_unsubscribe
1469        client.subscriptions.mark_subscribe(&topic); // Step 2: mark as subscribing
1470        client.subscriptions.confirm_subscribe(&topic); // Step 3: confirm subscription
1471
1472        // Verify recovery: topic should be back in confirmed state
1473        assert_eq!(client.subscriptions.len(), 1);
1474        assert!(client.subscriptions.pending_unsubscribe_topics().is_empty());
1475        assert!(client.subscriptions.pending_subscribe_topics().is_empty());
1476
1477        // Verify topic is in all_topics() for reconnect
1478        let all = client.subscriptions.all_topics();
1479        assert_eq!(all.len(), 1);
1480        assert!(all.contains(&topic));
1481    }
1482
1483    #[rstest]
1484    fn test_race_resubscribe_before_unsubscribe_ack() {
1485        // Simulates: User unsubscribes, then immediately resubscribes before
1486        // the unsubscribe ACK arrives from the venue.
1487        // This is the race condition fixed in the subscription tracker.
1488        let client = BitmexWebSocketClient::new(
1489            Some("ws://test.com".to_string()),
1490            None,
1491            None,
1492            Some(AccountId::new("BITMEX-TEST")),
1493            5,
1494            TransportBackend::default(),
1495            None,
1496        )
1497        .unwrap();
1498
1499        let topic = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1500
1501        // Initial subscribe
1502        client.subscriptions.mark_subscribe(&topic);
1503        client.subscriptions.confirm_subscribe(&topic);
1504        assert_eq!(client.subscriptions.len(), 1);
1505
1506        // User unsubscribes
1507        client.subscriptions.mark_unsubscribe(&topic);
1508        assert_eq!(client.subscriptions.len(), 0);
1509        assert_eq!(
1510            client.subscriptions.pending_unsubscribe_topics(),
1511            vec![topic.clone()]
1512        );
1513
1514        // User immediately changes mind and resubscribes (before unsubscribe ACK)
1515        client.subscriptions.mark_subscribe(&topic);
1516        assert_eq!(
1517            client.subscriptions.pending_subscribe_topics(),
1518            vec![topic.clone()]
1519        );
1520
1521        // NOW the unsubscribe ACK arrives - should NOT clear pending_subscribe
1522        client.subscriptions.confirm_unsubscribe(&topic);
1523        assert!(client.subscriptions.pending_unsubscribe_topics().is_empty());
1524        assert_eq!(
1525            client.subscriptions.pending_subscribe_topics(),
1526            vec![topic.clone()]
1527        );
1528
1529        // Subscribe ACK arrives
1530        client.subscriptions.confirm_subscribe(&topic);
1531        assert_eq!(client.subscriptions.len(), 1);
1532        assert!(client.subscriptions.pending_subscribe_topics().is_empty());
1533
1534        // Verify final state is correct
1535        let all = client.subscriptions.all_topics();
1536        assert_eq!(all.len(), 1);
1537        assert!(all.contains(&topic));
1538    }
1539
1540    #[rstest]
1541    fn test_race_channel_level_reconnection_with_pending_states() {
1542        // Simulates reconnection with mixed pending states including channel-level subscriptions.
1543        let client = BitmexWebSocketClient::new(
1544            Some("ws://test.com".to_string()),
1545            Some("test_key".to_string()),
1546            Some("test_secret".to_string()),
1547            Some(AccountId::new("BITMEX-TEST")),
1548            5,
1549            TransportBackend::default(),
1550            None,
1551        )
1552        .unwrap();
1553
1554        // Set up mixed state before reconnection
1555        // Confirmed: trade:XBTUSD
1556        let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1557        client.subscriptions.mark_subscribe(&trade_xbt);
1558        client.subscriptions.confirm_subscribe(&trade_xbt);
1559
1560        // Confirmed: order (channel-level, no symbol)
1561        let order_channel = BitmexWsAuthChannel::Order.as_ref();
1562        client.subscriptions.mark_subscribe(order_channel);
1563        client.subscriptions.confirm_subscribe(order_channel);
1564
1565        // Pending subscribe: trade:ETHUSD
1566        let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1567        client.subscriptions.mark_subscribe(&trade_eth);
1568
1569        // Pending unsubscribe: orderBookL2:XBTUSD (user cancelled)
1570        let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1571        client.subscriptions.mark_subscribe(&book_xbt);
1572        client.subscriptions.confirm_subscribe(&book_xbt);
1573        client.subscriptions.mark_unsubscribe(&book_xbt);
1574
1575        // Get topics for reconnection
1576        let topics_to_restore = client.subscriptions.all_topics();
1577
1578        // Should include: confirmed + pending_subscribe (NOT pending_unsubscribe)
1579        assert_eq!(topics_to_restore.len(), 3);
1580        assert!(topics_to_restore.contains(&trade_xbt));
1581        assert!(topics_to_restore.contains(&order_channel.to_string()));
1582        assert!(topics_to_restore.contains(&trade_eth));
1583        assert!(!topics_to_restore.contains(&book_xbt)); // Excluded
1584
1585        // Verify channel-level marker is handled correctly
1586        // order channel should not have ':' delimiter
1587        for topic in &topics_to_restore {
1588            if topic == order_channel {
1589                assert!(
1590                    !topic.contains(':'),
1591                    "Channel-level topic should not have delimiter"
1592                );
1593            }
1594        }
1595    }
1596}