1use std::{
21 fmt::Debug,
22 sync::{
23 Arc,
24 atomic::{AtomicBool, AtomicU8, Ordering},
25 },
26 time::Duration,
27};
28
29use arc_swap::ArcSwap;
30use dashmap::DashMap;
31use nautilus_common::live::get_runtime;
32use nautilus_core::{AtomicMap, AtomicSet, UUID4, consts::NAUTILUS_USER_AGENT};
33use nautilus_model::{
34 data::BarType,
35 enums::{AggregationSource, OrderSide, OrderType, PriceType, TimeInForce, TriggerType},
36 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
37 instruments::{Instrument, InstrumentAny},
38 types::{Price, Quantity},
39};
40use nautilus_network::{
41 backoff::ExponentialBackoff,
42 mode::ConnectionMode,
43 websocket::{
44 AuthTracker, PingHandler, SubscriptionState, TransportBackend, WebSocketClient,
45 WebSocketConfig, channel_message_handler,
46 },
47};
48use serde_json::Value;
49use tokio_util::sync::CancellationToken;
50use ustr::Ustr;
51
52use crate::{
53 common::{
54 consts::{BYBIT_NAUTILUS_BROKER_ID, BYBIT_WS_TOPIC_DELIMITER},
55 credential::Credential,
56 enums::{
57 BybitEnvironment, BybitOrderSide, BybitOrderType, BybitPositionIdx, BybitProductType,
58 BybitTimeInForce, BybitTpSlMode, BybitWsOrderRequestOp, resolve_trigger_type,
59 },
60 parse::{
61 bar_spec_to_bybit_interval, extract_base_coin, extract_raw_symbol, map_time_in_force,
62 spot_leverage, spot_market_unit, trigger_direction,
63 },
64 symbol::BybitSymbol,
65 urls::{bybit_ws_private_url, bybit_ws_public_url, bybit_ws_trade_url},
66 },
67 websocket::{
68 dispatch::PendingOperation,
69 enums::{BybitWsOperation, BybitWsPrivateChannel, BybitWsPublicChannel},
70 error::{BybitWsError, BybitWsResult},
71 handler::{BybitWsFeedHandler, HandlerCommand},
72 messages::{
73 BybitAuthRequest, BybitSubscription, BybitWsAmendOrderParams, BybitWsBatchCancelItem,
74 BybitWsBatchCancelOrderArgs, BybitWsBatchPlaceItem, BybitWsBatchPlaceOrderArgs,
75 BybitWsCancelOrderParams, BybitWsHeader, BybitWsMessage, BybitWsPlaceOrderParams,
76 BybitWsRequest,
77 },
78 },
79};
80
81const WEBSOCKET_AUTH_WINDOW_MS: i64 = 5_000;
82const AUTH_WAIT_TIMEOUT: Duration = Duration::from_secs(5);
83pub const BATCH_PROCESSING_LIMIT: usize = 20;
84
85#[derive(Debug, Clone)]
87pub struct PendingPyRequest {
88 pub client_order_id: ClientOrderId,
89 pub operation: PendingOperation,
90 pub trader_id: TraderId,
91 pub strategy_id: StrategyId,
92 pub instrument_id: InstrumentId,
93 pub venue_order_id: Option<VenueOrderId>,
94}
95
96#[cfg_attr(feature = "python", pyo3::pyclass(from_py_object))]
98#[cfg_attr(
99 feature = "python",
100 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.bybit")
101)]
102pub struct BybitWebSocketClient {
103 url: String,
104 environment: BybitEnvironment,
105 product_type: Option<BybitProductType>,
106 credential: Option<Credential>,
107 requires_auth: bool,
108 auth_tracker: AuthTracker,
109 heartbeat: Option<u64>,
110 connection_mode: Arc<ArcSwap<AtomicU8>>,
111 cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
112 out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<BybitWsMessage>>>,
113 signal: Arc<AtomicBool>,
114 task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
115 subscriptions: SubscriptionState,
116 account_id: Option<AccountId>,
117 mm_level: Arc<AtomicU8>,
118 bar_types_cache: Arc<AtomicMap<String, BarType>>,
119 instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
120 trade_subs: Arc<AtomicSet<InstrumentId>>,
121 option_greeks_subs: Arc<AtomicSet<InstrumentId>>,
122 bars_timestamp_on_close: Arc<AtomicBool>,
123 pending_py_requests: Arc<DashMap<String, Vec<PendingPyRequest>>>,
124 transport_backend: TransportBackend,
125 cancellation_token: CancellationToken,
126 proxy_url: Option<String>,
127}
128
129impl Debug for BybitWebSocketClient {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 f.debug_struct(stringify!(BybitWebSocketClient))
132 .field("url", &self.url)
133 .field("environment", &self.environment)
134 .field("product_type", &self.product_type)
135 .field("requires_auth", &self.requires_auth)
136 .field("heartbeat", &self.heartbeat)
137 .field("confirmed_subscriptions", &self.subscriptions.len())
138 .finish()
139 }
140}
141
142impl Clone for BybitWebSocketClient {
143 fn clone(&self) -> Self {
144 Self {
145 url: self.url.clone(),
146 environment: self.environment,
147 product_type: self.product_type,
148 credential: self.credential.clone(),
149 requires_auth: self.requires_auth,
150 auth_tracker: self.auth_tracker.clone(),
151 heartbeat: self.heartbeat,
152 connection_mode: Arc::clone(&self.connection_mode),
153 cmd_tx: Arc::clone(&self.cmd_tx),
154 out_rx: None, signal: Arc::clone(&self.signal),
156 task_handle: None, subscriptions: self.subscriptions.clone(),
158 account_id: self.account_id,
159 mm_level: Arc::clone(&self.mm_level),
160 bar_types_cache: Arc::clone(&self.bar_types_cache),
161 instruments_cache: Arc::clone(&self.instruments_cache),
162 trade_subs: Arc::clone(&self.trade_subs),
163 option_greeks_subs: Arc::clone(&self.option_greeks_subs),
164 bars_timestamp_on_close: Arc::clone(&self.bars_timestamp_on_close),
165 pending_py_requests: Arc::clone(&self.pending_py_requests),
166 transport_backend: self.transport_backend,
167 cancellation_token: self.cancellation_token.clone(),
168 proxy_url: self.proxy_url.clone(),
169 }
170 }
171}
172
173impl BybitWebSocketClient {
174 #[must_use]
176 pub fn new_public(url: Option<String>, heartbeat: u64) -> Self {
177 Self::new_public_with(
178 BybitProductType::Linear,
179 BybitEnvironment::Mainnet,
180 url,
181 heartbeat,
182 TransportBackend::default(),
183 None,
184 )
185 }
186
187 #[must_use]
189 pub fn new_public_with(
190 product_type: BybitProductType,
191 environment: BybitEnvironment,
192 url: Option<String>,
193 heartbeat: u64,
194 transport_backend: TransportBackend,
195 proxy_url: Option<String>,
196 ) -> Self {
197 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
198
199 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
200 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
201
202 Self {
203 url: url.unwrap_or_else(|| bybit_ws_public_url(product_type, environment)),
204 environment,
205 product_type: Some(product_type),
206 credential: None,
207 requires_auth: false,
208 auth_tracker: AuthTracker::new(),
209 heartbeat: Some(heartbeat),
210 connection_mode,
211 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
212 out_rx: None,
213 signal: Arc::new(AtomicBool::new(false)),
214 task_handle: None,
215 subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
216 bar_types_cache: Arc::new(AtomicMap::new()),
217 instruments_cache: Arc::new(AtomicMap::new()),
218 trade_subs: Arc::new(AtomicSet::new()),
219 option_greeks_subs: Arc::new(AtomicSet::new()),
220 bars_timestamp_on_close: Arc::new(AtomicBool::new(true)),
221 pending_py_requests: Arc::new(DashMap::new()),
222 account_id: None,
223 mm_level: Arc::new(AtomicU8::new(0)),
224 transport_backend,
225 cancellation_token: CancellationToken::new(),
226 proxy_url,
227 }
228 }
229
230 #[must_use]
238 pub fn new_private(
239 environment: BybitEnvironment,
240 api_key: Option<String>,
241 api_secret: Option<String>,
242 url: Option<String>,
243 heartbeat: u64,
244 transport_backend: TransportBackend,
245 proxy_url: Option<String>,
246 ) -> Self {
247 let credential = Credential::resolve(api_key, api_secret, environment);
248
249 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
250
251 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
252 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
253
254 Self {
255 url: url.unwrap_or_else(|| bybit_ws_private_url(environment).to_string()),
256 environment,
257 product_type: None,
258 credential,
259 requires_auth: true,
260 auth_tracker: AuthTracker::new(),
261 heartbeat: Some(heartbeat),
262 connection_mode,
263 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
264 out_rx: None,
265 signal: Arc::new(AtomicBool::new(false)),
266 task_handle: None,
267 subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
268 bar_types_cache: Arc::new(AtomicMap::new()),
269 instruments_cache: Arc::new(AtomicMap::new()),
270 trade_subs: Arc::new(AtomicSet::new()),
271 option_greeks_subs: Arc::new(AtomicSet::new()),
272 bars_timestamp_on_close: Arc::new(AtomicBool::new(true)),
273 pending_py_requests: Arc::new(DashMap::new()),
274 account_id: None,
275 mm_level: Arc::new(AtomicU8::new(0)),
276 transport_backend,
277 cancellation_token: CancellationToken::new(),
278 proxy_url,
279 }
280 }
281
282 #[must_use]
290 pub fn new_trade(
291 environment: BybitEnvironment,
292 api_key: Option<String>,
293 api_secret: Option<String>,
294 url: Option<String>,
295 heartbeat: u64,
296 transport_backend: TransportBackend,
297 proxy_url: Option<String>,
298 ) -> Self {
299 let credential = Credential::resolve(api_key, api_secret, environment);
300
301 let (cmd_tx, _) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
302
303 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
304 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
305
306 Self {
307 url: url.unwrap_or_else(|| bybit_ws_trade_url(environment).to_string()),
308 environment,
309 product_type: None,
310 credential,
311 requires_auth: true,
312 auth_tracker: AuthTracker::new(),
313 heartbeat: Some(heartbeat),
314 connection_mode,
315 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
316 out_rx: None,
317 signal: Arc::new(AtomicBool::new(false)),
318 task_handle: None,
319 subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
320 bar_types_cache: Arc::new(AtomicMap::new()),
321 instruments_cache: Arc::new(AtomicMap::new()),
322 trade_subs: Arc::new(AtomicSet::new()),
323 option_greeks_subs: Arc::new(AtomicSet::new()),
324 bars_timestamp_on_close: Arc::new(AtomicBool::new(true)),
325 pending_py_requests: Arc::new(DashMap::new()),
326 account_id: None,
327 mm_level: Arc::new(AtomicU8::new(0)),
328 transport_backend,
329 cancellation_token: CancellationToken::new(),
330 proxy_url,
331 }
332 }
333
334 pub async fn connect(&mut self) -> BybitWsResult<()> {
341 const MAX_RETRIES: u32 = 5;
342 const CONNECTION_TIMEOUT_SECS: u64 = 10;
343
344 self.signal.store(false, Ordering::Relaxed);
345
346 let (raw_handler, raw_rx) = channel_message_handler();
347
348 let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
351 });
353
354 let ping_msg = serde_json::to_string(&BybitSubscription {
355 op: BybitWsOperation::Ping,
356 args: vec![],
357 req_id: None,
358 })?;
359
360 let config = WebSocketConfig {
361 url: self.url.clone(),
362 headers: Self::default_headers(),
363 heartbeat: self.heartbeat,
364 heartbeat_msg: Some(ping_msg),
365 reconnect_timeout_ms: Some(5_000),
366 reconnect_delay_initial_ms: Some(500),
367 reconnect_delay_max_ms: Some(5_000),
368 reconnect_backoff_factor: Some(1.5),
369 reconnect_jitter_ms: Some(250),
370 reconnect_max_attempts: None,
371 idle_timeout_ms: None,
372 backend: self.transport_backend,
373 proxy_url: self.proxy_url.clone(),
374 };
375
376 let mut backoff = ExponentialBackoff::new(
378 Duration::from_millis(500),
379 Duration::from_millis(5000),
380 2.0,
381 250,
382 false,
383 )
384 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
385
386 #[allow(unused_assignments)]
387 let mut last_error = String::new();
388 let mut attempt = 0;
389 let client = loop {
390 attempt += 1;
391
392 match tokio::time::timeout(
393 Duration::from_secs(CONNECTION_TIMEOUT_SECS),
394 WebSocketClient::connect(
395 config.clone(),
396 Some(raw_handler.clone()),
397 Some(ping_handler.clone()),
398 None,
399 vec![],
400 None,
401 ),
402 )
403 .await
404 {
405 Ok(Ok(client)) => {
406 if attempt > 1 {
407 log::info!("WebSocket connection established after {attempt} attempts");
408 }
409 break client;
410 }
411 Ok(Err(e)) => {
412 last_error = e.to_string();
413 log::warn!(
414 "WebSocket connection attempt failed: attempt={attempt}, max_retries={MAX_RETRIES}, url={}, error={last_error}",
415 self.url
416 );
417 }
418 Err(_) => {
419 last_error = format!(
420 "Connection timeout after {CONNECTION_TIMEOUT_SECS}s (possible DNS resolution failure)"
421 );
422 log::warn!(
423 "WebSocket connection attempt timed out: attempt={attempt}, max_retries={MAX_RETRIES}, url={}",
424 self.url
425 );
426 }
427 }
428
429 if attempt >= MAX_RETRIES {
430 return Err(BybitWsError::Transport(format!(
431 "Failed to connect to {} after {MAX_RETRIES} attempts: {}. \
432 If this is a DNS error, check your network configuration and DNS settings.",
433 self.url,
434 if last_error.is_empty() {
435 "unknown error"
436 } else {
437 &last_error
438 }
439 )));
440 }
441
442 let delay = backoff.next_duration();
443 log::debug!(
444 "Retrying in {delay:?} (attempt {}/{MAX_RETRIES})",
445 attempt + 1
446 );
447 tokio::time::sleep(delay).await;
448 };
449
450 self.connection_mode.store(client.connection_mode_atomic());
451 client.set_auth_tracker(self.auth_tracker.clone(), self.requires_auth);
452
453 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<BybitWsMessage>();
454 self.out_rx = Some(Arc::new(out_rx));
455
456 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
457 *self.cmd_tx.write().await = cmd_tx.clone();
458
459 let cmd = HandlerCommand::SetClient(client);
460
461 self.send_cmd(cmd).await?;
462
463 let signal = Arc::clone(&self.signal);
464 let subscriptions = self.subscriptions.clone();
465 let credential = self.credential.clone();
466 let requires_auth = self.requires_auth;
467 let cmd_tx_for_reconnect = cmd_tx.clone();
468 let auth_tracker = self.auth_tracker.clone();
469 let auth_tracker_for_handler = auth_tracker.clone();
470
471 let stream_handle = get_runtime().spawn(async move {
472 let mut handler = BybitWsFeedHandler::new(
473 signal.clone(),
474 cmd_rx,
475 raw_rx,
476 auth_tracker_for_handler,
477 subscriptions.clone(),
478 );
479
480 let resubscribe_all = || async {
482 let topics = subscriptions.all_topics();
483
484 if topics.is_empty() {
485 return;
486 }
487
488 log::debug!(
489 "Resubscribing to confirmed subscriptions: count={}",
490 topics.len()
491 );
492
493 for topic in &topics {
494 subscriptions.mark_subscribe(topic.as_str());
495 }
496
497 let mut payloads = Vec::with_capacity(topics.len());
498 for topic in &topics {
499 let message = BybitSubscription {
500 op: BybitWsOperation::Subscribe,
501 args: vec![topic.clone()],
502 req_id: Some(topic.clone()),
503 };
504
505 if let Ok(payload) = serde_json::to_string(&message) {
506 payloads.push(payload);
507 }
508 }
509
510 let cmd = HandlerCommand::Subscribe { topics: payloads };
511
512 if let Err(e) = cmd_tx_for_reconnect.send(cmd) {
513 log::error!("Failed to send resubscribe command: {e}");
514 }
515 };
516
517 loop {
519 match handler.next().await {
520 Some(BybitWsMessage::Reconnected) => {
521 if signal.load(Ordering::Relaxed) {
522 continue;
523 }
524
525 log::info!("WebSocket reconnected");
526
527 let confirmed_topics: Vec<String> = {
529 let confirmed = subscriptions.confirmed();
530 let mut topics = Vec::new();
531
532 for entry in confirmed.iter() {
533 let (channel, symbols) = entry.pair();
534 for symbol in symbols {
535 if symbol.is_empty() {
536 topics.push(channel.to_string());
537 } else {
538 topics.push(format!("{channel}.{symbol}"));
539 }
540 }
541 }
542 topics
543 };
544
545 if !confirmed_topics.is_empty() {
546 log::debug!(
547 "Marking confirmed subscriptions as pending for replay: count={}",
548 confirmed_topics.len()
549 );
550
551 for topic in confirmed_topics {
552 subscriptions.mark_failure(&topic);
553 }
554 }
555
556 if requires_auth {
557 log::debug!("Re-authenticating after reconnection");
558
559 if let Some(cred) = &credential {
560 let _rx = auth_tracker.begin();
562
563 let expires = chrono::Utc::now().timestamp_millis()
564 + WEBSOCKET_AUTH_WINDOW_MS;
565 let signature = cred.sign_websocket_auth(expires);
566
567 let auth_message = BybitAuthRequest {
568 op: BybitWsOperation::Auth,
569 args: vec![
570 Value::String(cred.api_key().to_string()),
571 Value::Number(expires.into()),
572 Value::String(signature),
573 ],
574 };
575
576 if let Ok(payload) = serde_json::to_string(&auth_message) {
577 let cmd = HandlerCommand::Authenticate { payload };
578 if let Err(e) = cmd_tx_for_reconnect.send(cmd) {
579 log::error!(
580 "Failed to send reconnection auth command: error={e}"
581 );
582 }
583 } else {
584 log::error!("Failed to serialize reconnection auth message");
585 }
586 }
587 }
588
589 if !requires_auth {
592 log::debug!("No authentication required, resubscribing immediately");
593 resubscribe_all().await;
594 }
595
596 if out_tx.send(BybitWsMessage::Reconnected).is_err() {
598 log::debug!("Receiver dropped, stopping");
599 break;
600 }
601 }
602 Some(BybitWsMessage::Auth(ref auth)) => {
603 let is_success = auth.success.unwrap_or(false) || auth.ret_code == Some(0);
604 if is_success {
605 log::debug!("Authenticated, resubscribing");
606 resubscribe_all().await;
607 }
608
609 if out_tx.send(BybitWsMessage::Auth(auth.clone())).is_err() {
610 log::error!("Failed to send message (receiver dropped)");
611 break;
612 }
613 }
614 Some(msg) => {
615 if out_tx.send(msg).is_err() {
616 log::error!("Failed to send message (receiver dropped)");
617 break;
618 }
619 }
620 None => {
621 if handler.is_stopped() {
623 log::debug!("Stop signal received, ending message processing");
624 break;
625 }
626 log::warn!("WebSocket stream ended unexpectedly");
628 break;
629 }
630 }
631 }
632
633 log::debug!("Handler task exiting");
634 });
635
636 self.task_handle = Some(Arc::new(stream_handle));
637
638 if requires_auth && let Err(e) = self.authenticate_if_required().await {
639 return Err(e);
640 }
641
642 Ok(())
643 }
644
645 pub async fn close(&mut self) -> BybitWsResult<()> {
647 log::debug!("Starting close process");
648
649 self.signal.store(true, Ordering::Relaxed);
650
651 let cmd = HandlerCommand::Disconnect;
652 if let Err(e) = self.cmd_tx.read().await.send(cmd) {
653 log::debug!(
654 "Failed to send disconnect command (handler may already be shut down): {e}"
655 );
656 }
657
658 if let Some(task_handle) = self.task_handle.take() {
659 match Arc::try_unwrap(task_handle) {
660 Ok(handle) => {
661 log::debug!("Waiting for task handle to complete");
662 match tokio::time::timeout(Duration::from_secs(2), handle).await {
663 Ok(Ok(())) => log::debug!("Task handle completed successfully"),
664 Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
665 Err(_) => {
666 log::warn!(
667 "Timeout waiting for task handle, task may still be running"
668 );
669 }
670 }
671 }
672 Err(arc_handle) => {
673 log::debug!(
674 "Cannot take ownership of task handle - other references exist, aborting task"
675 );
676 arc_handle.abort();
677 }
678 }
679 } else {
680 log::debug!("No task handle to await");
681 }
682
683 self.auth_tracker.invalidate();
684
685 log::debug!("Closed");
686
687 Ok(())
688 }
689
690 #[must_use]
692 pub fn is_active(&self) -> bool {
693 let connection_mode_arc = self.connection_mode.load();
694 ConnectionMode::from_atomic(&connection_mode_arc).is_active()
695 && !self.signal.load(Ordering::Relaxed)
696 }
697
698 pub fn is_closed(&self) -> bool {
700 let connection_mode_arc = self.connection_mode.load();
701 ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
702 || self.signal.load(Ordering::Relaxed)
703 }
704
705 pub async fn wait_until_active(&self, timeout_secs: f64) -> BybitWsResult<()> {
711 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
712
713 tokio::time::timeout(timeout, async {
714 while !self.is_active() {
715 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
716 }
717 })
718 .await
719 .map_err(|_| {
720 BybitWsError::ClientError(format!(
721 "WebSocket connection timeout after {timeout_secs} seconds"
722 ))
723 })?;
724
725 Ok(())
726 }
727
728 pub async fn subscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
730 if topics.is_empty() {
731 return Ok(());
732 }
733
734 log::debug!("Subscribing to topics: {topics:?}");
735
736 let mut topics_to_send = Vec::new();
738
739 for topic in topics {
740 if self.subscriptions.add_reference(&topic) {
742 self.subscriptions.mark_subscribe(&topic);
743 topics_to_send.push(topic.clone());
744 } else {
745 log::debug!("Already subscribed to {topic}, skipping duplicate subscription");
746 }
747 }
748
749 if topics_to_send.is_empty() {
750 return Ok(());
751 }
752
753 let mut payloads = Vec::with_capacity(topics_to_send.len());
755 for topic in &topics_to_send {
756 let message = BybitSubscription {
757 op: BybitWsOperation::Subscribe,
758 args: vec![topic.clone()],
759 req_id: Some(topic.clone()),
760 };
761 let payload = serde_json::to_string(&message).map_err(|e| {
762 BybitWsError::Json(format!("Failed to serialize subscription: {e}"))
763 })?;
764 payloads.push(payload);
765 }
766
767 let cmd = HandlerCommand::Subscribe { topics: payloads };
768 self.cmd_tx
769 .read()
770 .await
771 .send(cmd)
772 .map_err(|e| BybitWsError::Send(format!("Failed to send subscribe command: {e}")))?;
773
774 Ok(())
775 }
776
777 pub async fn unsubscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
779 if topics.is_empty() {
780 return Ok(());
781 }
782
783 log::debug!("Attempting to unsubscribe from topics: {topics:?}");
784
785 if self.signal.load(Ordering::Relaxed) {
786 log::debug!("Shutdown signal detected, skipping unsubscribe");
787 return Ok(());
788 }
789
790 let mut topics_to_send = Vec::new();
792
793 for topic in topics {
794 if self.subscriptions.remove_reference(&topic) {
796 self.subscriptions.mark_unsubscribe(&topic);
797 topics_to_send.push(topic.clone());
798 } else {
799 log::debug!("Topic {topic} still has active subscriptions, not unsubscribing");
800 }
801 }
802
803 if topics_to_send.is_empty() {
804 return Ok(());
805 }
806
807 let mut payloads = Vec::with_capacity(topics_to_send.len());
809 for topic in &topics_to_send {
810 let message = BybitSubscription {
811 op: BybitWsOperation::Unsubscribe,
812 args: vec![topic.clone()],
813 req_id: Some(topic.clone()),
814 };
815
816 if let Ok(payload) = serde_json::to_string(&message) {
817 payloads.push(payload);
818 }
819 }
820
821 let cmd = HandlerCommand::Unsubscribe { topics: payloads };
822 if let Err(e) = self.cmd_tx.read().await.send(cmd) {
823 log::debug!("Failed to send unsubscribe command: error={e}");
824 }
825
826 Ok(())
827 }
828
829 pub fn stream(&mut self) -> impl futures_util::Stream<Item = BybitWsMessage> + use<> {
835 let rx = self
836 .out_rx
837 .take()
838 .expect("Stream receiver already taken or client not connected");
839 let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
840 async_stream::stream! {
841 while let Some(msg) = rx.recv().await {
842 yield msg;
843 }
844 }
845 }
846
847 #[must_use]
849 pub fn subscription_count(&self) -> usize {
850 self.subscriptions.len()
851 }
852
853 #[must_use]
855 pub fn credential(&self) -> Option<&Credential> {
856 self.credential.as_ref()
857 }
858
859 pub fn set_account_id(&mut self, account_id: AccountId) {
861 self.account_id = Some(account_id);
862 }
863
864 pub fn set_mm_level(&self, mm_level: u8) {
866 self.mm_level.store(mm_level, Ordering::Relaxed);
867 }
868
869 #[must_use]
871 pub fn account_id(&self) -> Option<AccountId> {
872 self.account_id
873 }
874
875 #[must_use]
877 pub fn product_type(&self) -> Option<BybitProductType> {
878 self.product_type
879 }
880
881 #[must_use]
883 pub fn bar_types_cache(&self) -> &Arc<AtomicMap<String, BarType>> {
884 &self.bar_types_cache
885 }
886
887 pub fn cache_instrument(&self, instrument: InstrumentAny) {
889 self.instruments_cache
890 .insert(instrument.id().symbol.inner(), instrument);
891 }
892
893 #[must_use]
895 pub fn instruments_snapshot(&self) -> ahash::AHashMap<Ustr, InstrumentAny> {
896 (**self.instruments_cache.load()).clone()
897 }
898
899 pub fn set_bars_timestamp_on_close(&self, value: bool) {
901 self.bars_timestamp_on_close.store(value, Ordering::Relaxed);
902 }
903
904 #[must_use]
906 pub fn bars_timestamp_on_close(&self) -> bool {
907 self.bars_timestamp_on_close.load(Ordering::Relaxed)
908 }
909
910 pub fn add_option_greeks_sub(&self, instrument_id: InstrumentId) {
912 self.option_greeks_subs.insert(instrument_id);
913 }
914
915 pub fn remove_option_greeks_sub(&self, instrument_id: &InstrumentId) {
917 self.option_greeks_subs.remove(instrument_id);
918 }
919
920 #[must_use]
922 pub fn option_greeks_subs(&self) -> &Arc<AtomicSet<InstrumentId>> {
923 &self.option_greeks_subs
924 }
925
926 #[must_use]
928 pub fn trade_subs(&self) -> &Arc<AtomicSet<InstrumentId>> {
929 &self.trade_subs
930 }
931
932 #[must_use]
934 pub fn pending_py_requests(&self) -> &Arc<DashMap<String, Vec<PendingPyRequest>>> {
935 &self.pending_py_requests
936 }
937
938 #[must_use]
940 pub fn instruments_cache_ref(&self) -> &Arc<AtomicMap<Ustr, InstrumentAny>> {
941 &self.instruments_cache
942 }
943
944 pub async fn subscribe_orderbook(
954 &self,
955 instrument_id: InstrumentId,
956 depth: u32,
957 ) -> BybitWsResult<()> {
958 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
959 let topic = format!(
960 "{}.{depth}.{raw_symbol}",
961 BybitWsPublicChannel::OrderBook.as_ref()
962 );
963 self.subscribe(vec![topic]).await
964 }
965
966 pub async fn unsubscribe_orderbook(
968 &self,
969 instrument_id: InstrumentId,
970 depth: u32,
971 ) -> BybitWsResult<()> {
972 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
973 let topic = format!(
974 "{}.{depth}.{raw_symbol}",
975 BybitWsPublicChannel::OrderBook.as_ref()
976 );
977 self.unsubscribe(vec![topic]).await
978 }
979
980 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
990 self.trade_subs.insert(instrument_id);
991 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
992 let topic_symbol = match self.product_type {
994 Some(BybitProductType::Option) => extract_base_coin(raw_symbol),
995 _ => raw_symbol,
996 };
997 let topic = format!(
998 "{}.{topic_symbol}",
999 BybitWsPublicChannel::PublicTrade.as_ref()
1000 );
1001 self.subscribe(vec![topic]).await
1002 }
1003
1004 pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
1006 self.trade_subs.remove(&instrument_id);
1007 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1008 let topic_symbol = match self.product_type {
1009 Some(BybitProductType::Option) => extract_base_coin(raw_symbol),
1010 _ => raw_symbol,
1011 };
1012 let topic = format!(
1013 "{}.{topic_symbol}",
1014 BybitWsPublicChannel::PublicTrade.as_ref()
1015 );
1016 self.unsubscribe(vec![topic]).await
1017 }
1018
1019 pub async fn subscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
1029 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1030 let topic = format!("{}.{raw_symbol}", BybitWsPublicChannel::Tickers.as_ref());
1031 self.subscribe(vec![topic]).await
1032 }
1033
1034 pub async fn unsubscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
1036 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1037 let topic = format!("{}.{raw_symbol}", BybitWsPublicChannel::Tickers.as_ref());
1038 self.unsubscribe(vec![topic]).await
1039 }
1040
1041 pub async fn subscribe_bars(&self, bar_type: BarType) -> BybitWsResult<()> {
1051 if self.product_type == Some(BybitProductType::Option) {
1052 return Err(BybitWsError::ClientError(
1053 "Bybit does not support kline/bar data for options".to_string(),
1054 ));
1055 }
1056
1057 let spec = bar_type.spec();
1058
1059 if spec.price_type != PriceType::Last {
1060 return Err(BybitWsError::ClientError(format!(
1061 "Invalid bar type: Bybit bars only support LAST price type, received {}",
1062 spec.price_type
1063 )));
1064 }
1065
1066 if bar_type.aggregation_source() != AggregationSource::External {
1067 return Err(BybitWsError::ClientError(format!(
1068 "Invalid bar type: Bybit bars only support EXTERNAL aggregation source, received {}",
1069 bar_type.aggregation_source()
1070 )));
1071 }
1072
1073 let interval = bar_spec_to_bybit_interval(spec.aggregation, spec.step.get() as u64)
1074 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1075
1076 let instrument_id = bar_type.instrument_id();
1077 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1078 let topic = format!(
1079 "{}.{}.{raw_symbol}",
1080 BybitWsPublicChannel::Kline.as_ref(),
1081 interval
1082 );
1083
1084 if self.subscriptions.get_reference_count(&topic) == 0 {
1086 self.bar_types_cache.insert(topic.clone(), bar_type);
1087 }
1088
1089 self.subscribe(vec![topic]).await
1090 }
1091
1092 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> BybitWsResult<()> {
1094 let spec = bar_type.spec();
1095 let interval = bar_spec_to_bybit_interval(spec.aggregation, spec.step.get() as u64)
1096 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1097
1098 let instrument_id = bar_type.instrument_id();
1099 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1100 let topic = format!(
1101 "{}.{}.{raw_symbol}",
1102 BybitWsPublicChannel::Kline.as_ref(),
1103 interval
1104 );
1105
1106 if self.subscriptions.get_reference_count(&topic) == 1 {
1108 self.bar_types_cache.remove(&topic);
1109 }
1110
1111 self.unsubscribe(vec![topic]).await
1112 }
1113
1114 pub async fn subscribe_orders(&self) -> BybitWsResult<()> {
1124 if !self.requires_auth {
1125 return Err(BybitWsError::Authentication(
1126 "Order subscription requires authentication".to_string(),
1127 ));
1128 }
1129 self.subscribe(vec![BybitWsPrivateChannel::Order.as_ref().to_string()])
1130 .await
1131 }
1132
1133 pub async fn unsubscribe_orders(&self) -> BybitWsResult<()> {
1135 self.unsubscribe(vec![BybitWsPrivateChannel::Order.as_ref().to_string()])
1136 .await
1137 }
1138
1139 pub async fn subscribe_executions(&self) -> BybitWsResult<()> {
1149 if !self.requires_auth {
1150 return Err(BybitWsError::Authentication(
1151 "Execution subscription requires authentication".to_string(),
1152 ));
1153 }
1154 self.subscribe(vec![BybitWsPrivateChannel::Execution.as_ref().to_string()])
1155 .await
1156 }
1157
1158 pub async fn unsubscribe_executions(&self) -> BybitWsResult<()> {
1160 self.unsubscribe(vec![BybitWsPrivateChannel::Execution.as_ref().to_string()])
1161 .await
1162 }
1163
1164 pub async fn subscribe_positions(&self) -> BybitWsResult<()> {
1174 if !self.requires_auth {
1175 return Err(BybitWsError::Authentication(
1176 "Position subscription requires authentication".to_string(),
1177 ));
1178 }
1179 self.subscribe(vec![BybitWsPrivateChannel::Position.as_ref().to_string()])
1180 .await
1181 }
1182
1183 pub async fn unsubscribe_positions(&self) -> BybitWsResult<()> {
1185 self.unsubscribe(vec![BybitWsPrivateChannel::Position.as_ref().to_string()])
1186 .await
1187 }
1188
1189 pub async fn subscribe_wallet(&self) -> BybitWsResult<()> {
1199 if !self.requires_auth {
1200 return Err(BybitWsError::Authentication(
1201 "Wallet subscription requires authentication".to_string(),
1202 ));
1203 }
1204 self.subscribe(vec![BybitWsPrivateChannel::Wallet.as_ref().to_string()])
1205 .await
1206 }
1207
1208 pub async fn unsubscribe_wallet(&self) -> BybitWsResult<()> {
1210 self.unsubscribe(vec![BybitWsPrivateChannel::Wallet.as_ref().to_string()])
1211 .await
1212 }
1213
1214 async fn require_authenticated(&self) -> BybitWsResult<()> {
1217 if self.is_closed() {
1218 return Err(BybitWsError::ClientError(
1219 "WebSocket client is closed".to_string(),
1220 ));
1221 }
1222
1223 if self.auth_tracker.is_authenticated() {
1224 return Ok(());
1225 }
1226
1227 tokio::select! {
1228 authenticated = self.auth_tracker.wait_for_authenticated(AUTH_WAIT_TIMEOUT) => {
1229 if authenticated {
1230 Ok(())
1231 } else {
1232 Err(BybitWsError::Authentication(
1233 "Must be authenticated".to_string(),
1234 ))
1235 }
1236 }
1237 () = async {
1238 loop {
1239 tokio::time::sleep(Duration::from_millis(100)).await;
1240
1241 if self.is_closed() {
1242 return;
1243 }
1244 }
1245 } => {
1246 Err(BybitWsError::ClientError(
1247 "WebSocket client closed during authentication wait".to_string(),
1248 ))
1249 }
1250 }
1251 }
1252
1253 pub async fn place_order(&self, params: BybitWsPlaceOrderParams) -> BybitWsResult<String> {
1259 self.require_authenticated().await?;
1260
1261 let req_id = UUID4::new().to_string();
1262
1263 let referer = if self.include_referer_header(params.time_in_force) {
1264 Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
1265 } else {
1266 None
1267 };
1268
1269 let request = BybitWsRequest {
1270 req_id: Some(req_id.clone()),
1271 op: BybitWsOrderRequestOp::Create,
1272 header: BybitWsHeader::with_referer(referer),
1273 args: vec![params],
1274 };
1275
1276 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1277 self.send_text(&payload).await?;
1278
1279 Ok(req_id)
1280 }
1281
1282 pub async fn amend_order(&self, params: BybitWsAmendOrderParams) -> BybitWsResult<String> {
1288 self.require_authenticated().await?;
1289
1290 let req_id = UUID4::new().to_string();
1291
1292 let request = BybitWsRequest {
1293 req_id: Some(req_id.clone()),
1294 op: BybitWsOrderRequestOp::Amend,
1295 header: BybitWsHeader::now(),
1296 args: vec![params],
1297 };
1298
1299 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1300 self.send_text(&payload).await?;
1301
1302 Ok(req_id)
1303 }
1304
1305 pub async fn cancel_order(&self, params: BybitWsCancelOrderParams) -> BybitWsResult<String> {
1311 self.require_authenticated().await?;
1312
1313 let req_id = UUID4::new().to_string();
1314
1315 let request = BybitWsRequest {
1316 req_id: Some(req_id.clone()),
1317 op: BybitWsOrderRequestOp::Cancel,
1318 header: BybitWsHeader::now(),
1319 args: vec![params],
1320 };
1321
1322 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1323 self.send_text(&payload).await?;
1324
1325 Ok(req_id)
1326 }
1327
1328 pub async fn batch_place_orders(
1334 &self,
1335 orders: Vec<BybitWsPlaceOrderParams>,
1336 ) -> BybitWsResult<Vec<String>> {
1337 self.require_authenticated().await?;
1338
1339 if orders.is_empty() {
1340 log::warn!("Batch place orders called with empty orders list");
1341 return Ok(vec![]);
1342 }
1343
1344 let mut req_ids = Vec::new();
1345
1346 for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1347 let req_id = self.batch_place_orders_chunk(chunk.to_vec()).await?;
1348 req_ids.push(req_id);
1349 }
1350
1351 Ok(req_ids)
1352 }
1353
1354 async fn batch_place_orders_chunk(
1355 &self,
1356 orders: Vec<BybitWsPlaceOrderParams>,
1357 ) -> BybitWsResult<String> {
1358 let category = orders[0].category;
1359 let batch_req_id = UUID4::new().to_string();
1360
1361 let mm_level = self.mm_level.load(Ordering::Relaxed);
1362 let has_non_post_only = orders
1363 .iter()
1364 .any(|o| !matches!(o.time_in_force, Some(BybitTimeInForce::PostOnly)));
1365 let referer = if has_non_post_only || mm_level == 0 {
1366 Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
1367 } else {
1368 None
1369 };
1370
1371 let request_items: Vec<BybitWsBatchPlaceItem> = orders
1372 .into_iter()
1373 .map(|order| BybitWsBatchPlaceItem {
1374 symbol: order.symbol,
1375 side: order.side,
1376 order_type: order.order_type,
1377 qty: order.qty,
1378 is_leverage: order.is_leverage,
1379 market_unit: order.market_unit,
1380 price: order.price,
1381 time_in_force: order.time_in_force,
1382 order_link_id: order.order_link_id,
1383 reduce_only: order.reduce_only,
1384 close_on_trigger: order.close_on_trigger,
1385 trigger_price: order.trigger_price,
1386 trigger_by: order.trigger_by,
1387 trigger_direction: order.trigger_direction,
1388 tpsl_mode: order.tpsl_mode,
1389 take_profit: order.take_profit,
1390 stop_loss: order.stop_loss,
1391 tp_trigger_by: order.tp_trigger_by,
1392 sl_trigger_by: order.sl_trigger_by,
1393 sl_trigger_price: order.sl_trigger_price,
1394 tp_trigger_price: order.tp_trigger_price,
1395 sl_order_type: order.sl_order_type,
1396 tp_order_type: order.tp_order_type,
1397 sl_limit_price: order.sl_limit_price,
1398 tp_limit_price: order.tp_limit_price,
1399 order_iv: order.order_iv,
1400 mmp: order.mmp,
1401 position_idx: order.position_idx,
1402 })
1403 .collect();
1404
1405 let args = BybitWsBatchPlaceOrderArgs {
1406 category,
1407 request: request_items,
1408 };
1409
1410 let request = BybitWsRequest {
1411 req_id: Some(batch_req_id.clone()),
1412 op: BybitWsOrderRequestOp::CreateBatch,
1413 header: BybitWsHeader::with_referer(referer),
1414 args: vec![args],
1415 };
1416
1417 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1418 self.send_text(&payload).await?;
1419
1420 Ok(batch_req_id)
1421 }
1422
1423 pub async fn batch_amend_orders(
1429 &self,
1430 orders: Vec<BybitWsAmendOrderParams>,
1431 ) -> BybitWsResult<Vec<String>> {
1432 self.require_authenticated().await?;
1433
1434 if orders.is_empty() {
1435 log::warn!("Batch amend orders called with empty orders list");
1436 return Ok(vec![]);
1437 }
1438
1439 let mut req_ids = Vec::new();
1440
1441 for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1442 let req_id = self.batch_amend_orders_chunk(chunk.to_vec()).await?;
1443 req_ids.push(req_id);
1444 }
1445
1446 Ok(req_ids)
1447 }
1448
1449 async fn batch_amend_orders_chunk(
1450 &self,
1451 orders: Vec<BybitWsAmendOrderParams>,
1452 ) -> BybitWsResult<String> {
1453 let batch_req_id = UUID4::new().to_string();
1454
1455 let request = BybitWsRequest {
1456 req_id: Some(batch_req_id.clone()),
1457 op: BybitWsOrderRequestOp::AmendBatch,
1458 header: BybitWsHeader::now(),
1459 args: orders,
1460 };
1461
1462 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1463 self.send_text(&payload).await?;
1464
1465 Ok(batch_req_id)
1466 }
1467
1468 pub async fn batch_cancel_orders(
1474 &self,
1475 orders: Vec<BybitWsCancelOrderParams>,
1476 ) -> BybitWsResult<Vec<String>> {
1477 self.require_authenticated().await?;
1478
1479 if orders.is_empty() {
1480 log::warn!("Batch cancel orders called with empty orders list");
1481 return Ok(vec![]);
1482 }
1483
1484 let mut req_ids = Vec::new();
1485
1486 for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1487 let req_id = self.batch_cancel_orders_chunk(chunk.to_vec()).await?;
1488 req_ids.push(req_id);
1489 }
1490
1491 Ok(req_ids)
1492 }
1493
1494 async fn batch_cancel_orders_chunk(
1495 &self,
1496 orders: Vec<BybitWsCancelOrderParams>,
1497 ) -> BybitWsResult<String> {
1498 if orders.is_empty() {
1499 return Ok(String::new());
1500 }
1501
1502 let category = orders[0].category;
1503 let batch_req_id = UUID4::new().to_string();
1504
1505 let request_items: Vec<BybitWsBatchCancelItem> = orders
1506 .into_iter()
1507 .map(|order| BybitWsBatchCancelItem {
1508 symbol: order.symbol,
1509 order_id: order.order_id,
1510 order_link_id: order.order_link_id,
1511 })
1512 .collect();
1513
1514 let args = BybitWsBatchCancelOrderArgs {
1515 category,
1516 request: request_items,
1517 };
1518
1519 let request = BybitWsRequest {
1520 req_id: Some(batch_req_id.clone()),
1521 op: BybitWsOrderRequestOp::CancelBatch,
1522 header: BybitWsHeader::now(),
1523 args: vec![args],
1524 };
1525
1526 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1527 self.send_text(&payload).await?;
1528
1529 Ok(batch_req_id)
1530 }
1531
1532 #[expect(clippy::too_many_arguments)]
1538 pub async fn submit_order(
1539 &self,
1540 product_type: BybitProductType,
1541 instrument_id: InstrumentId,
1542 client_order_id: ClientOrderId,
1543 order_side: OrderSide,
1544 order_type: OrderType,
1545 quantity: Quantity,
1546 is_quote_quantity: bool,
1547 time_in_force: Option<TimeInForce>,
1548 price: Option<Price>,
1549 trigger_price: Option<Price>,
1550 trigger_type: Option<TriggerType>,
1551 post_only: Option<bool>,
1552 reduce_only: Option<bool>,
1553 is_leverage: bool,
1554 position_idx: Option<BybitPositionIdx>,
1555 ) -> BybitWsResult<String> {
1556 let params = self.build_place_order_params(
1557 product_type,
1558 instrument_id,
1559 client_order_id,
1560 order_side,
1561 order_type,
1562 quantity,
1563 is_quote_quantity,
1564 time_in_force,
1565 price,
1566 trigger_price,
1567 trigger_type,
1568 post_only,
1569 reduce_only,
1570 is_leverage,
1571 None,
1572 None,
1573 position_idx,
1574 )?;
1575
1576 self.place_order(params).await
1577 }
1578
1579 pub async fn modify_order(
1585 &self,
1586 product_type: BybitProductType,
1587 instrument_id: InstrumentId,
1588 client_order_id: ClientOrderId,
1589 venue_order_id: Option<VenueOrderId>,
1590 quantity: Option<Quantity>,
1591 price: Option<Price>,
1592 ) -> BybitWsResult<String> {
1593 let params = self.build_amend_order_params(
1594 product_type,
1595 instrument_id,
1596 venue_order_id,
1597 Some(client_order_id),
1598 quantity,
1599 price,
1600 )?;
1601
1602 self.amend_order(params).await
1603 }
1604
1605 pub async fn cancel_order_by_id(
1611 &self,
1612 product_type: BybitProductType,
1613 instrument_id: InstrumentId,
1614 client_order_id: ClientOrderId,
1615 venue_order_id: Option<VenueOrderId>,
1616 ) -> BybitWsResult<String> {
1617 let params = self.build_cancel_order_params(
1618 product_type,
1619 instrument_id,
1620 venue_order_id,
1621 Some(client_order_id),
1622 )?;
1623
1624 self.cancel_order(params).await
1625 }
1626
1627 #[expect(clippy::too_many_arguments)]
1629 pub fn build_place_order_params(
1630 &self,
1631 product_type: BybitProductType,
1632 instrument_id: InstrumentId,
1633 client_order_id: ClientOrderId,
1634 order_side: OrderSide,
1635 order_type: OrderType,
1636 quantity: Quantity,
1637 is_quote_quantity: bool,
1638 time_in_force: Option<TimeInForce>,
1639 price: Option<Price>,
1640 trigger_price: Option<Price>,
1641 trigger_type: Option<TriggerType>,
1642 post_only: Option<bool>,
1643 reduce_only: Option<bool>,
1644 is_leverage: bool,
1645 take_profit: Option<Price>,
1646 stop_loss: Option<Price>,
1647 position_idx: Option<BybitPositionIdx>,
1648 ) -> BybitWsResult<BybitWsPlaceOrderParams> {
1649 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1650 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1651 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1652
1653 let bybit_side = match order_side {
1654 OrderSide::Buy => BybitOrderSide::Buy,
1655 OrderSide::Sell => BybitOrderSide::Sell,
1656 _ => {
1657 return Err(BybitWsError::ClientError(format!(
1658 "Invalid order side: {order_side:?}"
1659 )));
1660 }
1661 };
1662
1663 let (bybit_order_type, is_stop_order) = match order_type {
1664 OrderType::Market => (BybitOrderType::Market, false),
1665 OrderType::Limit => (BybitOrderType::Limit, false),
1666 OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
1667 OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
1668 _ => {
1669 return Err(BybitWsError::ClientError(format!(
1670 "Unsupported order type: {order_type:?}"
1671 )));
1672 }
1673 };
1674
1675 let bybit_tif =
1676 map_time_in_force(bybit_order_type, time_in_force, post_only).map_err(|tif| {
1677 BybitWsError::ClientError(format!("Unsupported time in force: {tif:?}"))
1678 })?;
1679 let market_unit = spot_market_unit(product_type, bybit_order_type, is_quote_quantity);
1680 let is_leverage_value = spot_leverage(product_type, is_leverage);
1681 let trigger_dir =
1682 trigger_direction(order_type, order_side, is_stop_order).map(|d| d as i32);
1683
1684 let params = if is_stop_order {
1685 BybitWsPlaceOrderParams {
1686 category: product_type,
1687 symbol: raw_symbol,
1688 side: bybit_side,
1689 order_type: bybit_order_type,
1690 qty: quantity.to_string(),
1691 is_leverage: is_leverage_value,
1692 market_unit,
1693 price: price.map(|p| p.to_string()),
1694 time_in_force: bybit_tif,
1695 order_link_id: Some(client_order_id.to_string()),
1696 reduce_only: reduce_only.filter(|&r| r),
1697 close_on_trigger: None,
1698 trigger_price: trigger_price.map(|p| p.to_string()),
1699 trigger_by: Some(resolve_trigger_type(trigger_type)),
1700 trigger_direction: trigger_dir,
1701 tpsl_mode: if take_profit.is_some() || stop_loss.is_some() {
1702 Some(BybitTpSlMode::Full)
1703 } else {
1704 None
1705 },
1706 take_profit: take_profit.map(|p| p.to_string()),
1707 stop_loss: stop_loss.map(|p| p.to_string()),
1708 tp_trigger_by: take_profit.map(|_| resolve_trigger_type(trigger_type)),
1709 sl_trigger_by: stop_loss.map(|_| resolve_trigger_type(trigger_type)),
1710 sl_trigger_price: None,
1711 tp_trigger_price: None,
1712 sl_order_type: None,
1713 tp_order_type: None,
1714 sl_limit_price: None,
1715 tp_limit_price: None,
1716 order_iv: None,
1717 mmp: None,
1718 position_idx,
1719 }
1720 } else {
1721 BybitWsPlaceOrderParams {
1722 category: product_type,
1723 symbol: raw_symbol,
1724 side: bybit_side,
1725 order_type: bybit_order_type,
1726 qty: quantity.to_string(),
1727 is_leverage: is_leverage_value,
1728 market_unit,
1729 price: price.map(|p| p.to_string()),
1730 time_in_force: bybit_tif,
1731 order_link_id: Some(client_order_id.to_string()),
1732 reduce_only: reduce_only.filter(|&r| r),
1733 close_on_trigger: None,
1734 trigger_price: None,
1735 trigger_by: None,
1736 trigger_direction: None,
1737 tpsl_mode: if take_profit.is_some() || stop_loss.is_some() {
1738 Some(BybitTpSlMode::Full)
1739 } else {
1740 None
1741 },
1742 take_profit: take_profit.map(|p| p.to_string()),
1743 stop_loss: stop_loss.map(|p| p.to_string()),
1744 tp_trigger_by: take_profit.map(|_| resolve_trigger_type(trigger_type)),
1745 sl_trigger_by: stop_loss.map(|_| resolve_trigger_type(trigger_type)),
1746 sl_trigger_price: None,
1747 tp_trigger_price: None,
1748 sl_order_type: None,
1749 tp_order_type: None,
1750 sl_limit_price: None,
1751 tp_limit_price: None,
1752 order_iv: None,
1753 mmp: None,
1754 position_idx,
1755 }
1756 };
1757
1758 Ok(params)
1759 }
1760
1761 pub fn build_amend_order_params(
1763 &self,
1764 product_type: BybitProductType,
1765 instrument_id: InstrumentId,
1766 venue_order_id: Option<VenueOrderId>,
1767 client_order_id: Option<ClientOrderId>,
1768 quantity: Option<Quantity>,
1769 price: Option<Price>,
1770 ) -> BybitWsResult<BybitWsAmendOrderParams> {
1771 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1772 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1773 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1774
1775 Ok(BybitWsAmendOrderParams {
1776 category: product_type,
1777 symbol: raw_symbol,
1778 order_id: venue_order_id.map(|v| v.to_string()),
1779 order_link_id: client_order_id.map(|c| c.to_string()),
1780 qty: quantity.map(|q| q.to_string()),
1781 price: price.map(|p| p.to_string()),
1782 trigger_price: None,
1783 take_profit: None,
1784 stop_loss: None,
1785 tp_trigger_by: None,
1786 sl_trigger_by: None,
1787 order_iv: None,
1788 })
1789 }
1790
1791 pub fn build_cancel_order_params(
1798 &self,
1799 product_type: BybitProductType,
1800 instrument_id: InstrumentId,
1801 venue_order_id: Option<VenueOrderId>,
1802 client_order_id: Option<ClientOrderId>,
1803 ) -> BybitWsResult<BybitWsCancelOrderParams> {
1804 if venue_order_id.is_none() && client_order_id.is_none() {
1805 return Err(BybitWsError::ClientError(
1806 "Either venue_order_id or client_order_id must be provided".to_string(),
1807 ));
1808 }
1809
1810 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1811 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1812 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1813
1814 Ok(BybitWsCancelOrderParams {
1815 category: product_type,
1816 symbol: raw_symbol,
1817 order_id: venue_order_id.map(|v| v.to_string()),
1818 order_link_id: client_order_id.map(|c| c.to_string()),
1819 })
1820 }
1821
1822 fn include_referer_header(&self, time_in_force: Option<BybitTimeInForce>) -> bool {
1823 let is_post_only = matches!(time_in_force, Some(BybitTimeInForce::PostOnly));
1824 let mm_level = self.mm_level.load(Ordering::Relaxed);
1825 !(is_post_only && mm_level > 0)
1826 }
1827
1828 fn default_headers() -> Vec<(String, String)> {
1829 vec![
1830 ("Content-Type".to_string(), "application/json".to_string()),
1831 ("User-Agent".to_string(), NAUTILUS_USER_AGENT.to_string()),
1832 ]
1833 }
1834
1835 async fn authenticate_if_required(&self) -> BybitWsResult<()> {
1836 if !self.requires_auth {
1837 return Ok(());
1838 }
1839
1840 let credential = self.credential.as_ref().ok_or_else(|| {
1841 BybitWsError::Authentication("Credentials required for authentication".to_string())
1842 })?;
1843
1844 let expires = chrono::Utc::now().timestamp_millis() + WEBSOCKET_AUTH_WINDOW_MS;
1845 let signature = credential.sign_websocket_auth(expires);
1846
1847 let auth_message = BybitAuthRequest {
1848 op: BybitWsOperation::Auth,
1849 args: vec![
1850 Value::String(credential.api_key().to_string()),
1851 Value::Number(expires.into()),
1852 Value::String(signature),
1853 ],
1854 };
1855
1856 let payload = serde_json::to_string(&auth_message)?;
1857
1858 let _rx = self.auth_tracker.begin();
1860
1861 self.cmd_tx
1862 .read()
1863 .await
1864 .send(HandlerCommand::Authenticate { payload })
1865 .map_err(|e| BybitWsError::Send(format!("Failed to send auth command: {e}")))?;
1866
1867 Ok(())
1868 }
1869
1870 async fn send_text(&self, text: &str) -> BybitWsResult<()> {
1871 let cmd = HandlerCommand::SendText {
1872 payload: text.to_string(),
1873 };
1874
1875 self.send_cmd(cmd).await
1876 }
1877
1878 async fn send_cmd(&self, cmd: HandlerCommand) -> BybitWsResult<()> {
1879 self.cmd_tx
1880 .read()
1881 .await
1882 .send(cmd)
1883 .map_err(|e| BybitWsError::Send(e.to_string()))
1884 }
1885}
1886
1887#[cfg(test)]
1888mod tests {
1889 use rstest::rstest;
1890
1891 use super::*;
1892 use crate::{
1893 common::{enums::BybitMarketUnit, testing::load_test_json},
1894 websocket::{messages::BybitWsFrame, parse_bybit_ws_frame},
1895 };
1896
1897 #[rstest]
1898 fn classify_orderbook_snapshot() {
1899 let json: Value = serde_json::from_str(&load_test_json("ws_orderbook_snapshot.json"))
1900 .expect("invalid fixture");
1901 let frame = parse_bybit_ws_frame(json);
1902 assert!(matches!(frame, BybitWsFrame::Orderbook(_)));
1903 }
1904
1905 #[rstest]
1906 fn classify_trade_snapshot() {
1907 let json: Value =
1908 serde_json::from_str(&load_test_json("ws_public_trade.json")).expect("invalid fixture");
1909 let frame = parse_bybit_ws_frame(json);
1910 assert!(matches!(frame, BybitWsFrame::Trade(_)));
1911 }
1912
1913 #[rstest]
1914 fn classify_ticker_linear_snapshot() {
1915 let json: Value = serde_json::from_str(&load_test_json("ws_ticker_linear.json"))
1916 .expect("invalid fixture");
1917 let frame = parse_bybit_ws_frame(json);
1918 assert!(matches!(frame, BybitWsFrame::TickerLinear(_)));
1919 }
1920
1921 #[rstest]
1922 fn classify_ticker_option_snapshot() {
1923 let json: Value = serde_json::from_str(&load_test_json("ws_ticker_option.json"))
1924 .expect("invalid fixture");
1925 let frame = parse_bybit_ws_frame(json);
1926 assert!(matches!(frame, BybitWsFrame::TickerOption(_)));
1927 }
1928
1929 #[rstest]
1930 fn test_race_unsubscribe_failure_recovery() {
1931 let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
1932 let topic = "publicTrade.BTCUSDT";
1933
1934 subscriptions.mark_subscribe(topic);
1935 subscriptions.confirm_subscribe(topic);
1936 assert_eq!(subscriptions.len(), 1);
1937
1938 subscriptions.mark_unsubscribe(topic);
1939 assert_eq!(subscriptions.len(), 0);
1940 assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
1941
1942 subscriptions.confirm_unsubscribe(topic);
1943 subscriptions.mark_subscribe(topic);
1944 subscriptions.confirm_subscribe(topic);
1945
1946 assert_eq!(subscriptions.len(), 1);
1947 assert!(subscriptions.pending_unsubscribe_topics().is_empty());
1948 assert!(subscriptions.pending_subscribe_topics().is_empty());
1949
1950 let all = subscriptions.all_topics();
1951 assert_eq!(all.len(), 1);
1952 assert!(all.contains(&topic.to_string()));
1953 }
1954
1955 #[rstest]
1956 fn test_race_resubscribe_before_unsubscribe_ack() {
1957 let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
1958 let topic = "orderbook.50.BTCUSDT";
1959
1960 subscriptions.mark_subscribe(topic);
1961 subscriptions.confirm_subscribe(topic);
1962 assert_eq!(subscriptions.len(), 1);
1963
1964 subscriptions.mark_unsubscribe(topic);
1965 assert_eq!(subscriptions.len(), 0);
1966 assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
1967
1968 subscriptions.mark_subscribe(topic);
1969 assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
1970
1971 subscriptions.confirm_unsubscribe(topic);
1972 assert!(subscriptions.pending_unsubscribe_topics().is_empty());
1973 assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
1974
1975 subscriptions.confirm_subscribe(topic);
1976 assert_eq!(subscriptions.len(), 1);
1977 assert!(subscriptions.pending_subscribe_topics().is_empty());
1978
1979 let all = subscriptions.all_topics();
1980 assert_eq!(all.len(), 1);
1981 assert!(all.contains(&topic.to_string()));
1982 }
1983
1984 #[rstest]
1985 fn test_race_late_subscribe_confirmation_after_unsubscribe() {
1986 let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
1987 let topic = "tickers.ETHUSDT";
1988
1989 subscriptions.mark_subscribe(topic);
1990 assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
1991
1992 subscriptions.mark_unsubscribe(topic);
1993 assert!(subscriptions.pending_subscribe_topics().is_empty());
1994 assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
1995
1996 subscriptions.confirm_subscribe(topic);
1997 assert_eq!(subscriptions.len(), 0);
1998 assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
1999
2000 subscriptions.confirm_unsubscribe(topic);
2001
2002 assert!(subscriptions.is_empty());
2003 assert!(subscriptions.all_topics().is_empty());
2004 }
2005
2006 #[rstest]
2007 fn test_race_reconnection_with_pending_states() {
2008 let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
2009
2010 let trade_btc = "publicTrade.BTCUSDT";
2011 subscriptions.mark_subscribe(trade_btc);
2012 subscriptions.confirm_subscribe(trade_btc);
2013
2014 let trade_eth = "publicTrade.ETHUSDT";
2015 subscriptions.mark_subscribe(trade_eth);
2016
2017 let book_btc = "orderbook.50.BTCUSDT";
2018 subscriptions.mark_subscribe(book_btc);
2019 subscriptions.confirm_subscribe(book_btc);
2020 subscriptions.mark_unsubscribe(book_btc);
2021
2022 let topics_to_restore = subscriptions.all_topics();
2023
2024 assert_eq!(topics_to_restore.len(), 2);
2025 assert!(topics_to_restore.contains(&trade_btc.to_string()));
2026 assert!(topics_to_restore.contains(&trade_eth.to_string()));
2027 assert!(!topics_to_restore.contains(&book_btc.to_string()));
2028 }
2029
2030 #[rstest]
2031 fn test_race_duplicate_subscribe_messages_idempotent() {
2032 let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
2033 let topic = "publicTrade.BTCUSDT";
2034
2035 subscriptions.mark_subscribe(topic);
2036 subscriptions.confirm_subscribe(topic);
2037 assert_eq!(subscriptions.len(), 1);
2038
2039 subscriptions.mark_subscribe(topic);
2040 assert!(subscriptions.pending_subscribe_topics().is_empty());
2041 assert_eq!(subscriptions.len(), 1);
2042
2043 subscriptions.confirm_subscribe(topic);
2044 assert_eq!(subscriptions.len(), 1);
2045
2046 let all = subscriptions.all_topics();
2047 assert_eq!(all.len(), 1);
2048 assert_eq!(all[0], topic);
2049 }
2050
2051 #[rstest]
2052 #[case::spot_with_leverage(BybitProductType::Spot, true, Some(1))]
2053 #[case::spot_without_leverage(BybitProductType::Spot, false, Some(0))]
2054 #[case::linear_with_leverage(BybitProductType::Linear, true, None)]
2055 #[case::linear_without_leverage(BybitProductType::Linear, false, None)]
2056 #[case::inverse_with_leverage(BybitProductType::Inverse, true, None)]
2057 #[case::option_with_leverage(BybitProductType::Option, true, None)]
2058 fn test_is_leverage_parameter(
2059 #[case] product_type: BybitProductType,
2060 #[case] is_leverage: bool,
2061 #[case] expected: Option<i32>,
2062 ) {
2063 let symbol = match product_type {
2064 BybitProductType::Spot => "BTCUSDT-SPOT.BYBIT",
2065 BybitProductType::Linear => "ETHUSDT-LINEAR.BYBIT",
2066 BybitProductType::Inverse => "BTCUSD-INVERSE.BYBIT",
2067 BybitProductType::Option => "BTC-31MAY24-50000-C-OPTION.BYBIT",
2068 };
2069
2070 let instrument_id = InstrumentId::from(symbol);
2071 let client_order_id = ClientOrderId::from("test-order-1");
2072 let quantity = Quantity::from("1.0");
2073
2074 let client = BybitWebSocketClient::new_trade(
2075 BybitEnvironment::Testnet,
2076 Some("test-key".to_string()),
2077 Some("test-secret".to_string()),
2078 None,
2079 20,
2080 TransportBackend::default(),
2081 None,
2082 );
2083
2084 let params = client
2085 .build_place_order_params(
2086 product_type,
2087 instrument_id,
2088 client_order_id,
2089 OrderSide::Buy,
2090 OrderType::Limit,
2091 quantity,
2092 false,
2093 Some(TimeInForce::Gtc),
2094 Some(Price::from("50000.0")),
2095 None,
2096 None,
2097 None,
2098 None,
2099 is_leverage,
2100 None,
2101 None,
2102 None,
2103 )
2104 .expect("Failed to build params");
2105
2106 assert_eq!(params.is_leverage, expected);
2107 }
2108
2109 #[rstest]
2110 #[case::spot_market_quote_quantity(
2111 BybitProductType::Spot,
2112 OrderType::Market,
2113 true,
2114 Some(BybitMarketUnit::QuoteCoin)
2115 )]
2116 #[case::spot_market_base_quantity(
2117 BybitProductType::Spot,
2118 OrderType::Market,
2119 false,
2120 Some(BybitMarketUnit::BaseCoin)
2121 )]
2122 #[case::spot_limit_no_unit(BybitProductType::Spot, OrderType::Limit, false, None)]
2123 #[case::spot_limit_quote(BybitProductType::Spot, OrderType::Limit, true, None)]
2124 #[case::linear_market_no_unit(BybitProductType::Linear, OrderType::Market, false, None)]
2125 #[case::inverse_market_no_unit(BybitProductType::Inverse, OrderType::Market, true, None)]
2126 fn test_is_quote_quantity_parameter(
2127 #[case] product_type: BybitProductType,
2128 #[case] order_type: OrderType,
2129 #[case] is_quote_quantity: bool,
2130 #[case] expected: Option<BybitMarketUnit>,
2131 ) {
2132 let symbol = match product_type {
2133 BybitProductType::Spot => "BTCUSDT-SPOT.BYBIT",
2134 BybitProductType::Linear => "ETHUSDT-LINEAR.BYBIT",
2135 BybitProductType::Inverse => "BTCUSD-INVERSE.BYBIT",
2136 BybitProductType::Option => "BTC-31MAY24-50000-C-OPTION.BYBIT",
2137 };
2138
2139 let instrument_id = InstrumentId::from(symbol);
2140 let client_order_id = ClientOrderId::from("test-order-1");
2141 let quantity = Quantity::from("1.0");
2142
2143 let client = BybitWebSocketClient::new_trade(
2144 BybitEnvironment::Testnet,
2145 Some("test-key".to_string()),
2146 Some("test-secret".to_string()),
2147 None,
2148 20,
2149 TransportBackend::default(),
2150 None,
2151 );
2152
2153 let params = client
2154 .build_place_order_params(
2155 product_type,
2156 instrument_id,
2157 client_order_id,
2158 OrderSide::Buy,
2159 order_type,
2160 quantity,
2161 is_quote_quantity,
2162 Some(TimeInForce::Gtc),
2163 if order_type == OrderType::Market {
2164 None
2165 } else {
2166 Some(Price::from("50000.0"))
2167 },
2168 None,
2169 None,
2170 None,
2171 None,
2172 false,
2173 None,
2174 None,
2175 None,
2176 )
2177 .expect("Failed to build params");
2178
2179 assert_eq!(params.market_unit, expected);
2180 }
2181}