1use std::{
24 sync::{
25 Arc,
26 atomic::{AtomicBool, AtomicU8, Ordering},
27 },
28 time::Duration,
29};
30
31use arc_swap::ArcSwap;
32use dashmap::DashMap;
33use futures_util::Stream;
34use nautilus_common::live::get_runtime;
35use nautilus_core::{
36 consts::NAUTILUS_USER_AGENT,
37 env::{get_env_var, get_or_env_var_opt},
38};
39use nautilus_model::{
40 data::bar::BarType,
41 identifiers::{AccountId, InstrumentId},
42 instruments::{Instrument, InstrumentAny},
43};
44use nautilus_network::{
45 http::USER_AGENT,
46 mode::ConnectionMode,
47 websocket::{
48 AUTHENTICATION_TIMEOUT_SECS, AuthTracker, PingHandler, SubscriptionState, TransportBackend,
49 WebSocketClient, WebSocketConfig, channel_message_handler,
50 },
51};
52use tokio_tungstenite::tungstenite::Message;
53use ustr::Ustr;
54
55use super::{
56 enums::{BitmexWsAuthAction, BitmexWsAuthChannel, BitmexWsOperation, BitmexWsTopic},
57 error::BitmexWsError,
58 handler::{BitmexWsFeedHandler, HandlerCommand},
59 messages::{BitmexAuthentication, BitmexSubscription, BitmexWsMessage},
60 parse::{is_index_symbol, topic_from_bar_spec},
61};
62use crate::common::{
63 consts::{BITMEX_WS_TOPIC_DELIMITER, BITMEX_WS_URL},
64 credential::{Credential, credential_env_vars},
65 enums::BitmexEnvironment,
66};
67
/// WebSocket client for BitMEX streaming topics.
///
/// Holds the connection settings, optional API credentials, and the channels
/// used to communicate with the handler task spawned by `connect`.
#[derive(Clone, Debug)]
pub struct BitmexWebSocketClient {
    /// WebSocket endpoint URL passed to the underlying client configuration.
    url: String,
    /// API credential used for authentication; `None` for unauthenticated use.
    credential: Option<Credential>,
    /// Heartbeat setting forwarded to the underlying client configuration
    /// (presumably an interval in seconds; confirm against `WebSocketConfig`).
    heartbeat: Option<u64>,
    /// Account identifier associated with this client.
    account_id: AccountId,
    /// Tracks the outcome of authentication requests.
    auth_tracker: AuthTracker,
    /// Stop flag: set to `true` by `close` and cleared by `connect`.
    signal: Arc<AtomicBool>,
    /// Connection mode; replaced with the live client's atomic on `connect`.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Sender for commands to the handler task; replaced on every `connect`.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver for handler output; populated by `connect` and taken by `stream`.
    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<BitmexWsMessage>>>,
    /// Handle of the handler task spawned by `connect`.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Subscription state shared with the handler task.
    subscriptions: SubscriptionState,
    /// Topics requested through `subscribe` and not yet removed by `unsubscribe`.
    tracked_subscriptions: Arc<DashMap<String, ()>>,
    /// Instrument cache keyed by raw symbol.
    instruments: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Transport backend forwarded to the underlying client configuration.
    transport_backend: TransportBackend,
    /// Optional proxy URL forwarded to the underlying client configuration.
    proxy_url: Option<String>,
}
93
94impl BitmexWebSocketClient {
95 pub fn new(
101 url: Option<String>,
102 api_key: Option<String>,
103 api_secret: Option<String>,
104 account_id: Option<AccountId>,
105 heartbeat: u64,
106 transport_backend: TransportBackend,
107 proxy_url: Option<String>,
108 ) -> anyhow::Result<Self> {
109 let credential = match (api_key, api_secret) {
110 (Some(key), Some(secret)) => Some(Credential::new(key, secret)),
111 (None, None) => None,
112 _ => anyhow::bail!("Both `api_key` and `api_secret` must be provided together"),
113 };
114
115 let account_id = account_id.unwrap_or(AccountId::from("BITMEX-master"));
116
117 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
118 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
119
120 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
122
123 Ok(Self {
124 url: url.unwrap_or(BITMEX_WS_URL.to_string()),
125 credential,
126 heartbeat: Some(heartbeat),
127 account_id,
128 auth_tracker: AuthTracker::new(),
129 signal: Arc::new(AtomicBool::new(false)),
130 connection_mode,
131 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
132 out_rx: None,
133 task_handle: None,
134 subscriptions: SubscriptionState::new(BITMEX_WS_TOPIC_DELIMITER),
135 tracked_subscriptions: Arc::new(DashMap::new()),
136 instruments: Arc::new(DashMap::new()),
137 transport_backend,
138 proxy_url,
139 })
140 }
141
142 #[expect(clippy::too_many_arguments)]
153 pub fn new_with_env(
154 url: Option<String>,
155 api_key: Option<String>,
156 api_secret: Option<String>,
157 account_id: Option<AccountId>,
158 heartbeat: u64,
159 environment: BitmexEnvironment,
160 transport_backend: TransportBackend,
161 proxy_url: Option<String>,
162 ) -> anyhow::Result<Self> {
163 let (api_key_env, api_secret_env) = credential_env_vars(environment);
164
165 let key = get_or_env_var_opt(api_key, api_key_env);
166 let secret = get_or_env_var_opt(api_secret, api_secret_env);
167
168 Self::new(
169 url,
170 key,
171 secret,
172 account_id,
173 heartbeat,
174 transport_backend,
175 proxy_url,
176 )
177 }
178
179 pub fn from_env() -> anyhow::Result<Self> {
185 let url = get_env_var("BITMEX_WS_URL")?;
186 let (key_var, secret_var) = credential_env_vars(BitmexEnvironment::Mainnet);
187 let api_key = get_env_var(key_var)?;
188 let api_secret = get_env_var(secret_var)?;
189
190 Self::new(
191 Some(url),
192 Some(api_key),
193 Some(api_secret),
194 None,
195 5,
196 TransportBackend::default(),
197 None,
198 )
199 }
200
    /// Returns the WebSocket URL this client was configured with.
    #[must_use]
    pub const fn url(&self) -> &str {
        self.url.as_str()
    }
206
207 #[must_use]
209 pub fn api_key(&self) -> Option<&str> {
210 self.credential.as_ref().map(|c| c.api_key())
211 }
212
213 #[must_use]
215 pub fn api_key_masked(&self) -> Option<String> {
216 self.credential.as_ref().map(|c| c.api_key_masked())
217 }
218
219 #[must_use]
221 pub fn is_active(&self) -> bool {
222 let connection_mode_arc = self.connection_mode.load();
223 ConnectionMode::from_atomic(&connection_mode_arc).is_active()
224 && !self.signal.load(Ordering::Relaxed)
225 }
226
227 #[must_use]
229 pub fn is_closed(&self) -> bool {
230 let connection_mode_arc = self.connection_mode.load();
231 ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
232 || self.signal.load(Ordering::Relaxed)
233 }
234
    /// Returns the account ID associated with this client.
    #[must_use]
    pub fn account_id(&self) -> AccountId {
        self.account_id
    }
240
    /// Replaces the account ID associated with this client.
    pub fn set_account_id(&mut self, account_id: AccountId) {
        self.account_id = account_id;
    }
245
246 pub fn cache_instruments(&self, instruments: &[InstrumentAny]) {
248 self.instruments.clear();
249 for inst in instruments {
250 self.instruments
251 .insert(inst.raw_symbol().inner(), inst.clone());
252 }
253 }
254
255 pub fn cache_instrument(&self, instrument: InstrumentAny) {
257 self.instruments
258 .insert(instrument.raw_symbol().inner(), instrument);
259 }
260
261 #[must_use]
263 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
264 self.instruments
265 .get(symbol)
266 .map(|entry| entry.value().clone())
267 }
268
    /// Connects to the WebSocket endpoint and starts the handler task.
    ///
    /// In order: opens the connection via `connect_inner`, clears the stop
    /// signal, adopts the client's connection-mode atomic, creates fresh output
    /// and command channels, hands the client to the handler, spawns the message
    /// loop on the shared runtime, authenticates when credentials are configured,
    /// and finally subscribes to the instrument topic.
    ///
    /// # Errors
    ///
    /// Returns an error if the connection cannot be established, if the client
    /// cannot be handed to the handler, or if authentication fails.
    pub async fn connect(&mut self) -> Result<(), BitmexWsError> {
        let (client, raw_rx) = self.connect_inner().await?;

        // Clear any stop request left over from a previous `close`
        self.signal.store(false, Ordering::Relaxed);

        // Adopt the client's mode atomic so `is_active`/`is_closed` observe it
        self.connection_mode.store(client.connection_mode_atomic());

        // Output channel: the handler task sends, `stream` later takes the receiver
        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<BitmexWsMessage>();
        self.out_rx = Some(Arc::new(out_rx));

        // Command channel: replaces the sender installed by `new` or a prior connect
        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
        *self.cmd_tx.write().await = cmd_tx.clone();

        // Hand the client to the handler; queued until the handler task drains it
        if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
            return Err(BitmexWsError::ClientError(format!(
                "Failed to send WebSocketClient to handler: {e}"
            )));
        }

        // Clones moved into the spawned task
        let signal = self.signal.clone();
        let credential = self.credential.clone();
        let auth_tracker = self.auth_tracker.clone();
        let subscriptions = self.subscriptions.clone();
        let cmd_tx_for_reconnect = cmd_tx.clone();

        let stream_handle = get_runtime().spawn(async move {
            let mut handler = BitmexWsFeedHandler::new(
                signal.clone(),
                cmd_rx,
                raw_rx,
                out_tx,
                auth_tracker.clone(),
                subscriptions.clone(),
            );

            // Re-issues a subscribe command for every topic reported by
            // `subscriptions.all_topics()`
            let resubscribe_all = || {
                let topics = subscriptions.all_topics();

                if topics.is_empty() {
                    return;
                }

                log::debug!(
                    "Resubscribing to confirmed subscriptions: count={}",
                    topics.len()
                );

                for topic in &topics {
                    subscriptions.mark_subscribe(topic.as_str());
                }

                let mut payloads = Vec::with_capacity(topics.len());
                for topic in &topics {
                    let message = BitmexSubscription {
                        op: BitmexWsOperation::Subscribe,
                        args: vec![Ustr::from(topic.as_ref())],
                    };

                    // Topics that fail to serialize are skipped without logging
                    if let Ok(payload) = serde_json::to_string(&message) {
                        payloads.push(payload);
                    }
                }

                if let Err(e) =
                    cmd_tx_for_reconnect.send(HandlerCommand::Subscribe { topics: payloads })
                {
                    log::error!("Failed to send resubscribe command: {e}");
                }
            };

            loop {
                match handler.next().await {
                    Some(BitmexWsMessage::Reconnected) => {
                        // Ignore reconnects once a stop has been requested
                        if signal.load(Ordering::Relaxed) {
                            continue;
                        }

                        log::info!("WebSocket reconnected");

                        // Rebuild topic strings from the confirmed map, skipping
                        // the instrument channel
                        let confirmed_topics: Vec<String> = {
                            let confirmed = subscriptions.confirmed();
                            let mut topics = Vec::new();

                            for entry in confirmed.iter() {
                                let (channel, symbols) = entry.pair();

                                if *channel == BitmexWsTopic::Instrument.as_ref() {
                                    continue;
                                }

                                for symbol in symbols {
                                    // An empty symbol denotes a channel-level topic
                                    if symbol.is_empty() {
                                        topics.push(channel.to_string());
                                    } else {
                                        topics.push(format!("{channel}:{symbol}"));
                                    }
                                }
                            }

                            topics
                        };

                        if !confirmed_topics.is_empty() {
                            log::debug!(
                                "Marking confirmed subscriptions as pending for replay: count={}",
                                confirmed_topics.len()
                            );

                            for topic in confirmed_topics {
                                subscriptions.mark_failure(&topic);
                            }
                        }

                        // With credentials, re-authenticate first; resubscription
                        // then happens in the `Authenticated` arm below
                        if let Some(cred) = &credential {
                            log::debug!("Re-authenticating after reconnection");

                            let expires =
                                (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
                            let signature = cred.sign("GET", "/realtime", expires, "");

                            let auth_message = BitmexAuthentication {
                                op: BitmexWsAuthAction::AuthKeyExpires,
                                args: (cred.api_key().to_string(), expires, signature),
                            };

                            if let Ok(payload) = serde_json::to_string(&auth_message) {
                                if let Err(e) = cmd_tx_for_reconnect
                                    .send(HandlerCommand::Authenticate { payload })
                                {
                                    log::error!("Failed to send reconnection auth command: {e}");
                                }
                            } else {
                                log::error!("Failed to serialize reconnection auth message");
                            }
                        }

                        if credential.is_none() {
                            log::debug!("No authentication required, resubscribing immediately");
                            resubscribe_all();
                        }

                        // Forward the reconnect event to the stream consumer
                        if handler.send(BitmexWsMessage::Reconnected).is_err() {
                            log::error!("Failed to forward reconnect event (receiver dropped)");
                            break;
                        }
                    }
                    // Consumed here and not forwarded to the stream consumer.
                    // NOTE(review): presumably this arm also runs for the initial
                    // authentication, not only after a reconnect; confirm.
                    Some(BitmexWsMessage::Authenticated) => {
                        log::debug!("Authenticated after reconnection, resubscribing");
                        resubscribe_all();
                    }
                    Some(msg) => {
                        if handler.send(msg).is_err() {
                            log::error!("Failed to send message (receiver dropped)");
                            break;
                        }
                    }
                    None => {
                        // Both cases end the loop; they differ only in what is logged
                        if handler.is_stopped() {
                            log::debug!("Stop signal received, ending message processing");
                            break;
                        }
                        log::warn!("WebSocket stream ended unexpectedly");
                        break;
                    }
                }
            }

            log::debug!("Handler task exiting");
        });

        self.task_handle = Some(Arc::new(stream_handle));

        // On authentication failure, abort the handler task and raise the stop flag
        if self.credential.is_some()
            && let Err(e) = self.authenticate().await
        {
            if let Some(handle) = self.task_handle.take() {
                handle.abort();
            }
            self.signal.store(true, Ordering::Relaxed);
            return Err(e);
        }

        // Always subscribe to the instrument topic on connect
        let instrument_topic = BitmexWsTopic::Instrument.as_ref().to_string();
        self.subscriptions.mark_subscribe(&instrument_topic);
        self.tracked_subscriptions.insert(instrument_topic, ());

        let subscribe_msg = BitmexSubscription {
            op: BitmexWsOperation::Subscribe,
            args: vec![Ustr::from(BitmexWsTopic::Instrument.as_ref())],
        };

        // Failures here are logged only; `connect` still returns `Ok`
        match serde_json::to_string(&subscribe_msg) {
            Ok(subscribe_json) => {
                if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Subscribe {
                    topics: vec![subscribe_json],
                }) {
                    log::error!("Failed to send subscribe command for instruments: {e}");
                } else {
                    log::debug!("Subscribed to all instruments");
                }
            }
            Err(e) => {
                log::error!("Failed to serialize subscribe message: {e}");
            }
        }

        Ok(())
    }
496
497 async fn connect_inner(
503 &self,
504 ) -> Result<
505 (
506 WebSocketClient,
507 tokio::sync::mpsc::UnboundedReceiver<Message>,
508 ),
509 BitmexWsError,
510 > {
511 let (message_handler, rx) = channel_message_handler();
512
513 let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
516 });
518
519 let config = WebSocketConfig {
520 url: self.url.clone(),
521 headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
522 heartbeat: self.heartbeat,
523 heartbeat_msg: None,
524 reconnect_timeout_ms: Some(5_000),
525 reconnect_delay_initial_ms: None, reconnect_delay_max_ms: None, reconnect_backoff_factor: None, reconnect_jitter_ms: None, reconnect_max_attempts: None,
530 idle_timeout_ms: None,
531 backend: self.transport_backend,
532 proxy_url: self.proxy_url.clone(),
533 };
534
535 let keyed_quotas = vec![];
536 let client = WebSocketClient::connect(
537 config,
538 Some(message_handler),
539 Some(ping_handler),
540 None, keyed_quotas,
542 None, )
544 .await
545 .map_err(|e| BitmexWsError::ClientError(e.to_string()))?;
546
547 Ok((client, rx))
548 }
549
550 async fn authenticate(&self) -> Result<(), BitmexWsError> {
557 let credential = match &self.credential {
558 Some(credential) => credential,
559 None => {
560 return Err(BitmexWsError::AuthenticationError(
561 "API credentials not available to authenticate".to_string(),
562 ));
563 }
564 };
565
566 let receiver = self.auth_tracker.begin();
567
568 let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
569 let signature = credential.sign("GET", "/realtime", expires, "");
570
571 let auth_message = BitmexAuthentication {
572 op: BitmexWsAuthAction::AuthKeyExpires,
573 args: (credential.api_key().to_string(), expires, signature),
574 };
575
576 let auth_json = serde_json::to_string(&auth_message).map_err(|e| {
577 let msg = format!("Failed to serialize auth message: {e}");
578 self.auth_tracker.fail(msg.clone());
579 BitmexWsError::AuthenticationError(msg)
580 })?;
581
582 self.cmd_tx
584 .read()
585 .await
586 .send(HandlerCommand::Authenticate { payload: auth_json })
587 .map_err(|e| {
588 let msg = format!("Failed to send authenticate command: {e}");
589 self.auth_tracker.fail(msg.clone());
590 BitmexWsError::AuthenticationError(msg)
591 })?;
592
593 self.auth_tracker
594 .wait_for_result::<BitmexWsError>(
595 Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS),
596 receiver,
597 )
598 .await
599 }
600
601 pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), BitmexWsError> {
607 let timeout = Duration::from_secs_f64(timeout_secs);
608
609 tokio::time::timeout(timeout, async {
610 while !self.is_active() {
611 tokio::time::sleep(Duration::from_millis(10)).await;
612 }
613 })
614 .await
615 .map_err(|_| {
616 BitmexWsError::ClientError(format!(
617 "WebSocket connection timeout after {timeout_secs} seconds"
618 ))
619 })?;
620
621 Ok(())
622 }
623
624 pub fn stream(&mut self) -> impl Stream<Item = BitmexWsMessage> + use<> {
632 let rx = self
633 .out_rx
634 .take()
635 .expect("Stream receiver already taken or not connected");
636 let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
637 async_stream::stream! {
638 while let Some(msg) = rx.recv().await {
639 yield msg;
640 }
641 }
642 }
643
    /// Closes the client: raises the stop flag, asks the handler to disconnect,
    /// and waits up to two seconds for the handler task to finish.
    ///
    /// # Errors
    ///
    /// Failures along the way are logged; the visible code always returns `Ok`.
    pub async fn close(&mut self) -> Result<(), BitmexWsError> {
        log::debug!("Starting close process");

        // Raise the stop flag first so `is_active` turns false immediately
        self.signal.store(true, Ordering::Relaxed);

        // Best effort: the handler may already have exited
        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
            log::debug!(
                "Failed to send disconnect command (handler may already be shut down): {e}"
            );
        }

        if let Some(task_handle) = self.task_handle.take() {
            // Awaiting the task requires sole ownership of the join handle
            match Arc::try_unwrap(task_handle) {
                Ok(handle) => {
                    log::debug!("Waiting for task handle to complete");
                    match tokio::time::timeout(Duration::from_secs(2), handle).await {
                        Ok(Ok(())) => log::debug!("Task handle completed successfully"),
                        Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
                        Err(_) => {
                            // The handle was moved into the timeout and is dropped
                            // here without aborting the task
                            log::warn!(
                                "Timeout waiting for task handle, task may still be running"
                            );
                        }
                    }
                }
                Err(arc_handle) => {
                    // A clone of this client still holds the handle; abort instead
                    log::debug!(
                        "Cannot take ownership of task handle - other references exist, aborting task"
                    );
                    arc_handle.abort();
                }
            }
        } else {
            log::debug!("No task handle to await");
        }

        log::debug!("Closed");

        Ok(())
    }
692
693 pub async fn subscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
699 log::debug!("Subscribing to topics: {topics:?}");
700
701 for topic in &topics {
702 self.subscriptions.mark_subscribe(topic.as_str());
703 self.tracked_subscriptions.insert(topic.clone(), ());
704 }
705
706 let mut payloads = Vec::with_capacity(topics.len());
708 for topic in &topics {
709 let message = BitmexSubscription {
710 op: BitmexWsOperation::Subscribe,
711 args: vec![Ustr::from(topic.as_ref())],
712 };
713 let payload = serde_json::to_string(&message).map_err(|e| {
714 BitmexWsError::SubscriptionError(format!("Failed to serialize subscription: {e}"))
715 })?;
716 payloads.push(payload);
717 }
718
719 let cmd = HandlerCommand::Subscribe { topics: payloads };
721
722 self.send_cmd(cmd).await.map_err(|e| {
723 BitmexWsError::SubscriptionError(format!("Failed to send subscribe command: {e}"))
724 })
725 }
726
727 async fn unsubscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
733 log::debug!("Attempting to unsubscribe from topics: {topics:?}");
734
735 if self.signal.load(Ordering::Relaxed) {
736 log::debug!("Shutdown signal detected, skipping unsubscribe");
737 return Ok(());
738 }
739
740 for topic in &topics {
741 self.subscriptions.mark_unsubscribe(topic.as_str());
742 self.tracked_subscriptions.remove(topic);
743 }
744
745 let mut payloads = Vec::with_capacity(topics.len());
747 for topic in &topics {
748 let message = BitmexSubscription {
749 op: BitmexWsOperation::Unsubscribe,
750 args: vec![Ustr::from(topic.as_ref())],
751 };
752
753 if let Ok(payload) = serde_json::to_string(&message) {
754 payloads.push(payload);
755 }
756 }
757
758 let cmd = HandlerCommand::Unsubscribe { topics: payloads };
760
761 if let Err(e) = self.send_cmd(cmd).await {
762 log::debug!("Failed to send unsubscribe command: {e}");
763 }
764
765 Ok(())
766 }
767
    /// Returns the subscription count reported by the subscription state.
    #[must_use]
    pub fn subscription_count(&self) -> usize {
        self.subscriptions.len()
    }
773
774 pub fn get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
775 let symbol = instrument_id.symbol.inner();
776 let confirmed = self.subscriptions.confirmed();
777 let mut channels = Vec::with_capacity(confirmed.len());
778
779 for entry in confirmed.iter() {
780 let (channel, symbols) = entry.pair();
781 if symbols.contains(&symbol) {
782 channels.push(format!("{channel}:{symbol}"));
784 } else {
785 let has_channel_marker = symbols.iter().any(|s| s.is_empty());
786 if has_channel_marker
787 && (*channel == BitmexWsAuthChannel::Execution.as_ref()
788 || *channel == BitmexWsAuthChannel::Order.as_ref())
789 {
790 channels.push(channel.to_string());
792 }
793 }
794 }
795
796 channels
797 }
798
    /// No-op: the instrument topic is already subscribed by `connect`.
    ///
    /// # Errors
    ///
    /// Never returns an error; the `Result` keeps the signature uniform with
    /// the other subscribe methods.
    pub async fn subscribe_instruments(&self) -> Result<(), BitmexWsError> {
        log::debug!("Already subscribed to all instruments on connection, skipping");
        Ok(())
    }
809
    /// No-op: the instrument topic subscribed by `connect` already covers
    /// `instrument_id`, which is used only in the log message.
    ///
    /// # Errors
    ///
    /// Never returns an error.
    pub async fn subscribe_instrument(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        log::debug!(
            "Already subscribed to all instruments on connection (includes {instrument_id}), skipping"
        );
        Ok(())
    }
825
826 pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
832 let topic = BitmexWsTopic::OrderBookL2;
833 let symbol = instrument_id.symbol.inner();
834 self.subscribe(vec![format!("{topic}:{symbol}")]).await
835 }
836
837 pub async fn subscribe_book_25(
843 &self,
844 instrument_id: InstrumentId,
845 ) -> Result<(), BitmexWsError> {
846 let topic = BitmexWsTopic::OrderBookL2_25;
847 let symbol = instrument_id.symbol.inner();
848 self.subscribe(vec![format!("{topic}:{symbol}")]).await
849 }
850
851 pub async fn subscribe_book_depth10(
857 &self,
858 instrument_id: InstrumentId,
859 ) -> Result<(), BitmexWsError> {
860 let topic = BitmexWsTopic::OrderBook10;
861 let symbol = instrument_id.symbol.inner();
862 self.subscribe(vec![format!("{topic}:{symbol}")]).await
863 }
864
865 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
873 let symbol = instrument_id.symbol.inner();
874
875 if is_index_symbol(&instrument_id.symbol.inner()) {
877 log::warn!("Ignoring quote subscription for index symbol: {symbol}");
878 return Ok(());
879 }
880
881 let topic = BitmexWsTopic::Quote;
882 self.subscribe(vec![format!("{topic}:{symbol}")]).await
883 }
884
885 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
893 let symbol = instrument_id.symbol.inner();
894
895 if is_index_symbol(&symbol) {
897 log::warn!("Ignoring trade subscription for index symbol: {symbol}");
898 return Ok(());
899 }
900
901 let topic = BitmexWsTopic::Trade;
902 self.subscribe(vec![format!("{topic}:{symbol}")]).await
903 }
904
    /// Delegates to `subscribe_instrument`, which is a no-op because the
    /// instrument topic is subscribed on connect.
    ///
    /// # Errors
    ///
    /// Propagates the result of `subscribe_instrument`.
    pub async fn subscribe_mark_prices(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        self.subscribe_instrument(instrument_id).await
    }
916
    /// Delegates to `subscribe_instrument`, which is a no-op because the
    /// instrument topic is subscribed on connect.
    ///
    /// # Errors
    ///
    /// Propagates the result of `subscribe_instrument`.
    pub async fn subscribe_index_prices(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        self.subscribe_instrument(instrument_id).await
    }
928
929 pub async fn subscribe_funding_rates(
935 &self,
936 instrument_id: InstrumentId,
937 ) -> Result<(), BitmexWsError> {
938 let topic = BitmexWsTopic::Funding;
939 let symbol = instrument_id.symbol.inner();
940 self.subscribe(vec![format!("{topic}:{symbol}")]).await
941 }
942
943 pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
949 let topic = topic_from_bar_spec(bar_type.spec());
950 let symbol = bar_type.instrument_id().symbol.inner();
951 self.subscribe(vec![format!("{topic}:{symbol}")]).await
952 }
953
    /// No-op: the instrument subscription is deliberately kept alive, so this
    /// only logs.
    ///
    /// # Errors
    ///
    /// Never returns an error.
    pub async fn unsubscribe_instruments(&self) -> Result<(), BitmexWsError> {
        log::debug!(
            "Instruments subscription maintained for proper operation, skipping unsubscribe"
        );
        Ok(())
    }
966
    /// No-op: the instrument subscription is deliberately kept alive;
    /// `instrument_id` is used only in the log message.
    ///
    /// # Errors
    ///
    /// Never returns an error.
    pub async fn unsubscribe_instrument(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        log::debug!(
            "Instruments subscription maintained for proper operation (includes {instrument_id}), skipping unsubscribe"
        );
        Ok(())
    }
982
983 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
989 let topic = BitmexWsTopic::OrderBookL2;
990 let symbol = instrument_id.symbol.inner();
991 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
992 }
993
994 pub async fn unsubscribe_book_25(
1000 &self,
1001 instrument_id: InstrumentId,
1002 ) -> Result<(), BitmexWsError> {
1003 let topic = BitmexWsTopic::OrderBookL2_25;
1004 let symbol = instrument_id.symbol.inner();
1005 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1006 }
1007
1008 pub async fn unsubscribe_book_depth10(
1014 &self,
1015 instrument_id: InstrumentId,
1016 ) -> Result<(), BitmexWsError> {
1017 let topic = BitmexWsTopic::OrderBook10;
1018 let symbol = instrument_id.symbol.inner();
1019 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1020 }
1021
1022 pub async fn unsubscribe_quotes(
1028 &self,
1029 instrument_id: InstrumentId,
1030 ) -> Result<(), BitmexWsError> {
1031 let symbol = instrument_id.symbol.inner();
1032
1033 if is_index_symbol(&symbol) {
1035 return Ok(());
1036 }
1037
1038 let topic = BitmexWsTopic::Quote;
1039 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1040 }
1041
1042 pub async fn unsubscribe_trades(
1048 &self,
1049 instrument_id: InstrumentId,
1050 ) -> Result<(), BitmexWsError> {
1051 let symbol = instrument_id.symbol.inner();
1052
1053 if is_index_symbol(&symbol) {
1055 return Ok(());
1056 }
1057
1058 let topic = BitmexWsTopic::Trade;
1059 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1060 }
1061
    /// No-op: per the log message, mark prices use the shared instrument
    /// channel, so nothing is unsubscribed.
    ///
    /// # Errors
    ///
    /// Never returns an error.
    pub async fn unsubscribe_mark_prices(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        log::debug!(
            "Mark prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
        );
        Ok(())
    }
1077
    /// No-op: per the log message, index prices use the shared instrument
    /// channel, so nothing is unsubscribed.
    ///
    /// # Errors
    ///
    /// Never returns an error.
    pub async fn unsubscribe_index_prices(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        log::debug!(
            "Index prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
        );
        Ok(())
    }
1093
    /// No-op: only logs; no unsubscribe command is sent. The log message
    /// attributes this to avoiding a shutdown race.
    ///
    /// NOTE(review): `subscribe_funding_rates` does subscribe to the funding
    /// topic, so that subscription stays active after this call; confirm this
    /// is intended.
    ///
    /// # Errors
    ///
    /// Never returns an error.
    pub async fn unsubscribe_funding_rates(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        log::debug!(
            "Funding rates for {instrument_id}, skipping unsubscribe to avoid shutdown race"
        );
        Ok(())
    }
1109
1110 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
1116 let topic = topic_from_bar_spec(bar_type.spec());
1117 let symbol = bar_type.instrument_id().symbol.inner();
1118 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1119 }
1120
1121 pub async fn subscribe_orders(&self) -> Result<(), BitmexWsError> {
1127 if self.credential.is_none() {
1128 return Err(BitmexWsError::MissingCredentials);
1129 }
1130 self.subscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1131 .await
1132 }
1133
1134 pub async fn subscribe_executions(&self) -> Result<(), BitmexWsError> {
1140 if self.credential.is_none() {
1141 return Err(BitmexWsError::MissingCredentials);
1142 }
1143 self.subscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1144 .await
1145 }
1146
1147 pub async fn subscribe_positions(&self) -> Result<(), BitmexWsError> {
1153 if self.credential.is_none() {
1154 return Err(BitmexWsError::MissingCredentials);
1155 }
1156 self.subscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1157 .await
1158 }
1159
1160 pub async fn subscribe_margin(&self) -> Result<(), BitmexWsError> {
1166 if self.credential.is_none() {
1167 return Err(BitmexWsError::MissingCredentials);
1168 }
1169 self.subscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1170 .await
1171 }
1172
1173 pub async fn subscribe_wallet(&self) -> Result<(), BitmexWsError> {
1179 if self.credential.is_none() {
1180 return Err(BitmexWsError::MissingCredentials);
1181 }
1182 self.subscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1183 .await
1184 }
1185
1186 pub async fn unsubscribe_orders(&self) -> Result<(), BitmexWsError> {
1192 self.unsubscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1193 .await
1194 }
1195
1196 pub async fn unsubscribe_executions(&self) -> Result<(), BitmexWsError> {
1202 self.unsubscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1203 .await
1204 }
1205
1206 pub async fn unsubscribe_positions(&self) -> Result<(), BitmexWsError> {
1212 self.unsubscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1213 .await
1214 }
1215
1216 pub async fn unsubscribe_margin(&self) -> Result<(), BitmexWsError> {
1222 self.unsubscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1223 .await
1224 }
1225
1226 pub async fn unsubscribe_wallet(&self) -> Result<(), BitmexWsError> {
1232 self.unsubscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1233 .await
1234 }
1235
1236 async fn send_cmd(&self, cmd: HandlerCommand) -> Result<(), BitmexWsError> {
1238 self.cmd_tx
1239 .read()
1240 .await
1241 .send(cmd)
1242 .map_err(|e| BitmexWsError::ClientError(format!("Handler not available: {e}")))
1243 }
1244}
1245
1246#[cfg(test)]
1247mod tests {
1248 use ahash::AHashSet;
1249 use rstest::rstest;
1250 use ustr::Ustr;
1251
1252 use super::*;
1253
1254 #[rstest]
1255 fn test_reconnect_topics_restoration_logic() {
1256 let client = BitmexWebSocketClient::new(
1258 Some("ws://test.com".to_string()),
1259 Some("test_key".to_string()),
1260 Some("test_secret".to_string()),
1261 Some(AccountId::new("BITMEX-TEST")),
1262 5,
1263 TransportBackend::default(),
1264 None,
1265 )
1266 .unwrap();
1267
1268 let subs = client.subscriptions.confirmed();
1270 subs.insert(Ustr::from(BitmexWsTopic::Trade.as_ref()), {
1271 let mut set = AHashSet::new();
1272 set.insert(Ustr::from("XBTUSD"));
1273 set.insert(Ustr::from("ETHUSD"));
1274 set
1275 });
1276
1277 subs.insert(Ustr::from(BitmexWsTopic::OrderBookL2.as_ref()), {
1278 let mut set = AHashSet::new();
1279 set.insert(Ustr::from("XBTUSD"));
1280 set
1281 });
1282
1283 subs.insert(Ustr::from(BitmexWsAuthChannel::Order.as_ref()), {
1285 let mut set = AHashSet::new();
1286 set.insert(Ustr::from(""));
1287 set
1288 });
1289 subs.insert(Ustr::from(BitmexWsAuthChannel::Position.as_ref()), {
1290 let mut set = AHashSet::new();
1291 set.insert(Ustr::from(""));
1292 set
1293 });
1294
1295 let mut topics_to_restore = Vec::new();
1297
1298 for entry in subs.iter() {
1299 let (channel, symbols) = entry.pair();
1300 for symbol in symbols {
1301 if symbol.is_empty() {
1302 topics_to_restore.push(channel.to_string());
1303 } else {
1304 topics_to_restore.push(format!("{channel}:{symbol}"));
1305 }
1306 }
1307 }
1308
1309 assert!(topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref())));
1311 assert!(topics_to_restore.contains(&format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref())));
1312 assert!(
1313 topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref()))
1314 );
1315 assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Order.as_ref().to_string()));
1316 assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Position.as_ref().to_string()));
1317 assert_eq!(topics_to_restore.len(), 5);
1318 }
1319
1320 #[rstest]
1321 fn test_reconnect_auth_message_building() {
1322 let client_with_creds = BitmexWebSocketClient::new(
1324 Some("ws://test.com".to_string()),
1325 Some("test_key".to_string()),
1326 Some("test_secret".to_string()),
1327 Some(AccountId::new("BITMEX-TEST")),
1328 5,
1329 TransportBackend::default(),
1330 None,
1331 )
1332 .unwrap();
1333
1334 if let Some(cred) = &client_with_creds.credential {
1336 let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
1337 let signature = cred.sign("GET", "/realtime", expires, "");
1338
1339 let auth_message = BitmexAuthentication {
1340 op: BitmexWsAuthAction::AuthKeyExpires,
1341 args: (cred.api_key().to_string(), expires, signature),
1342 };
1343
1344 assert_eq!(auth_message.op, BitmexWsAuthAction::AuthKeyExpires);
1346 assert_eq!(auth_message.args.0, "test_key");
1347 assert!(auth_message.args.1 > 0); assert!(!auth_message.args.2.is_empty()); } else {
1350 panic!("Client should have credentials");
1351 }
1352
1353 let client_no_creds = BitmexWebSocketClient::new(
1355 Some("ws://test.com".to_string()),
1356 None,
1357 None,
1358 Some(AccountId::new("BITMEX-TEST")),
1359 5,
1360 TransportBackend::default(),
1361 None,
1362 )
1363 .unwrap();
1364
1365 assert!(client_no_creds.credential.is_none());
1366 }
1367
1368 #[rstest]
1369 fn test_subscription_state_after_unsubscribe() {
1370 let client = BitmexWebSocketClient::new(
1371 Some("ws://test.com".to_string()),
1372 Some("test_key".to_string()),
1373 Some("test_secret".to_string()),
1374 Some(AccountId::new("BITMEX-TEST")),
1375 5,
1376 TransportBackend::default(),
1377 None,
1378 )
1379 .unwrap();
1380
1381 let subs = client.subscriptions.confirmed();
1383 subs.insert(Ustr::from(BitmexWsTopic::Trade.as_ref()), {
1384 let mut set = AHashSet::new();
1385 set.insert(Ustr::from("XBTUSD"));
1386 set.insert(Ustr::from("ETHUSD"));
1387 set
1388 });
1389
1390 subs.insert(Ustr::from(BitmexWsTopic::OrderBookL2.as_ref()), {
1391 let mut set = AHashSet::new();
1392 set.insert(Ustr::from("XBTUSD"));
1393 set
1394 });
1395
1396 let topic = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1398 if let Some((channel, symbol)) = topic.split_once(':')
1399 && let Some(mut entry) = subs.get_mut(&Ustr::from(channel))
1400 {
1401 entry.remove(&Ustr::from(symbol));
1402 if entry.is_empty() {
1403 drop(entry);
1404 subs.remove(&Ustr::from(channel));
1405 }
1406 }
1407
1408 let mut topics_to_restore = Vec::new();
1410
1411 for entry in subs.iter() {
1412 let (channel, symbols) = entry.pair();
1413 for symbol in symbols {
1414 if symbol.is_empty() {
1415 topics_to_restore.push(channel.to_string());
1416 } else {
1417 topics_to_restore.push(format!("{channel}:{symbol}"));
1418 }
1419 }
1420 }
1421
1422 let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1424 let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1425 let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1426
1427 assert!(topics_to_restore.contains(&trade_xbt));
1428 assert!(!topics_to_restore.contains(&trade_eth));
1429 assert!(topics_to_restore.contains(&book_xbt));
1430 assert_eq!(topics_to_restore.len(), 2);
1431 }
1432
1433 #[rstest]
1434 fn test_race_unsubscribe_failure_recovery() {
1435 let client = BitmexWebSocketClient::new(
1441 Some("ws://test.com".to_string()),
1442 None,
1443 None,
1444 Some(AccountId::new("BITMEX-TEST")),
1445 5,
1446 TransportBackend::default(),
1447 None,
1448 )
1449 .unwrap();
1450
1451 let topic = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1452
1453 client.subscriptions.mark_subscribe(&topic);
1455 client.subscriptions.confirm_subscribe(&topic);
1456 assert_eq!(client.subscriptions.len(), 1);
1457
1458 client.subscriptions.mark_unsubscribe(&topic);
1460 assert_eq!(client.subscriptions.len(), 0);
1461 assert_eq!(
1462 client.subscriptions.pending_unsubscribe_topics(),
1463 vec![topic.clone()]
1464 );
1465
1466 client.subscriptions.confirm_unsubscribe(&topic); client.subscriptions.mark_subscribe(&topic); client.subscriptions.confirm_subscribe(&topic); assert_eq!(client.subscriptions.len(), 1);
1474 assert!(client.subscriptions.pending_unsubscribe_topics().is_empty());
1475 assert!(client.subscriptions.pending_subscribe_topics().is_empty());
1476
1477 let all = client.subscriptions.all_topics();
1479 assert_eq!(all.len(), 1);
1480 assert!(all.contains(&topic));
1481 }
1482
1483 #[rstest]
1484 fn test_race_resubscribe_before_unsubscribe_ack() {
1485 let client = BitmexWebSocketClient::new(
1489 Some("ws://test.com".to_string()),
1490 None,
1491 None,
1492 Some(AccountId::new("BITMEX-TEST")),
1493 5,
1494 TransportBackend::default(),
1495 None,
1496 )
1497 .unwrap();
1498
1499 let topic = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1500
1501 client.subscriptions.mark_subscribe(&topic);
1503 client.subscriptions.confirm_subscribe(&topic);
1504 assert_eq!(client.subscriptions.len(), 1);
1505
1506 client.subscriptions.mark_unsubscribe(&topic);
1508 assert_eq!(client.subscriptions.len(), 0);
1509 assert_eq!(
1510 client.subscriptions.pending_unsubscribe_topics(),
1511 vec![topic.clone()]
1512 );
1513
1514 client.subscriptions.mark_subscribe(&topic);
1516 assert_eq!(
1517 client.subscriptions.pending_subscribe_topics(),
1518 vec![topic.clone()]
1519 );
1520
1521 client.subscriptions.confirm_unsubscribe(&topic);
1523 assert!(client.subscriptions.pending_unsubscribe_topics().is_empty());
1524 assert_eq!(
1525 client.subscriptions.pending_subscribe_topics(),
1526 vec![topic.clone()]
1527 );
1528
1529 client.subscriptions.confirm_subscribe(&topic);
1531 assert_eq!(client.subscriptions.len(), 1);
1532 assert!(client.subscriptions.pending_subscribe_topics().is_empty());
1533
1534 let all = client.subscriptions.all_topics();
1536 assert_eq!(all.len(), 1);
1537 assert!(all.contains(&topic));
1538 }
1539
1540 #[rstest]
1541 fn test_race_channel_level_reconnection_with_pending_states() {
1542 let client = BitmexWebSocketClient::new(
1544 Some("ws://test.com".to_string()),
1545 Some("test_key".to_string()),
1546 Some("test_secret".to_string()),
1547 Some(AccountId::new("BITMEX-TEST")),
1548 5,
1549 TransportBackend::default(),
1550 None,
1551 )
1552 .unwrap();
1553
1554 let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1557 client.subscriptions.mark_subscribe(&trade_xbt);
1558 client.subscriptions.confirm_subscribe(&trade_xbt);
1559
1560 let order_channel = BitmexWsAuthChannel::Order.as_ref();
1562 client.subscriptions.mark_subscribe(order_channel);
1563 client.subscriptions.confirm_subscribe(order_channel);
1564
1565 let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1567 client.subscriptions.mark_subscribe(&trade_eth);
1568
1569 let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1571 client.subscriptions.mark_subscribe(&book_xbt);
1572 client.subscriptions.confirm_subscribe(&book_xbt);
1573 client.subscriptions.mark_unsubscribe(&book_xbt);
1574
1575 let topics_to_restore = client.subscriptions.all_topics();
1577
1578 assert_eq!(topics_to_restore.len(), 3);
1580 assert!(topics_to_restore.contains(&trade_xbt));
1581 assert!(topics_to_restore.contains(&order_channel.to_string()));
1582 assert!(topics_to_restore.contains(&trade_eth));
1583 assert!(!topics_to_restore.contains(&book_xbt)); for topic in &topics_to_restore {
1588 if topic == order_channel {
1589 assert!(
1590 !topic.contains(':'),
1591 "Channel-level topic should not have delimiter"
1592 );
1593 }
1594 }
1595 }
1596}