Skip to main content

nautilus_bybit/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Bybit WebSocket client providing public market data streaming.
17//!
18//! Bybit API reference <https://bybit-exchange.github.io/docs/>.
19
20use std::{
21    fmt::Debug,
22    sync::{
23        Arc,
24        atomic::{AtomicBool, AtomicU8, Ordering},
25    },
26    time::Duration,
27};
28
29use arc_swap::ArcSwap;
30use dashmap::DashMap;
31use nautilus_common::live::get_runtime;
32use nautilus_core::{AtomicMap, AtomicSet, UUID4, consts::NAUTILUS_USER_AGENT};
33use nautilus_model::{
34    data::BarType,
35    enums::{AggregationSource, OrderSide, OrderType, PriceType, TimeInForce, TriggerType},
36    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
37    instruments::{Instrument, InstrumentAny},
38    types::{Price, Quantity},
39};
40use nautilus_network::{
41    backoff::ExponentialBackoff,
42    mode::ConnectionMode,
43    websocket::{
44        AuthTracker, PingHandler, SubscriptionState, TransportBackend, WebSocketClient,
45        WebSocketConfig, channel_message_handler,
46    },
47};
48use serde_json::Value;
49use tokio_util::sync::CancellationToken;
50use ustr::Ustr;
51
52use crate::{
53    common::{
54        consts::{BYBIT_NAUTILUS_BROKER_ID, BYBIT_WS_TOPIC_DELIMITER},
55        credential::Credential,
56        enums::{
57            BybitEnvironment, BybitOrderSide, BybitOrderType, BybitPositionIdx, BybitProductType,
58            BybitTimeInForce, BybitTpSlMode, BybitWsOrderRequestOp, resolve_trigger_type,
59        },
60        parse::{
61            bar_spec_to_bybit_interval, extract_base_coin, extract_raw_symbol, map_time_in_force,
62            spot_leverage, spot_market_unit, trigger_direction,
63        },
64        symbol::BybitSymbol,
65        urls::{bybit_ws_private_url, bybit_ws_public_url, bybit_ws_trade_url},
66    },
67    websocket::{
68        dispatch::PendingOperation,
69        enums::{BybitWsOperation, BybitWsPrivateChannel, BybitWsPublicChannel},
70        error::{BybitWsError, BybitWsResult},
71        handler::{BybitWsFeedHandler, HandlerCommand},
72        messages::{
73            BybitAuthRequest, BybitSubscription, BybitWsAmendOrderParams, BybitWsBatchCancelItem,
74            BybitWsBatchCancelOrderArgs, BybitWsBatchPlaceItem, BybitWsBatchPlaceOrderArgs,
75            BybitWsCancelOrderParams, BybitWsHeader, BybitWsMessage, BybitWsPlaceOrderParams,
76            BybitWsRequest,
77        },
78    },
79};
80
81const WEBSOCKET_AUTH_WINDOW_MS: i64 = 5_000;
82const AUTH_WAIT_TIMEOUT: Duration = Duration::from_secs(5);
83pub const BATCH_PROCESSING_LIMIT: usize = 20;
84
85/// Tracks a pending Python execution request for OrderResponse correlation.
86#[derive(Debug, Clone)]
87pub struct PendingPyRequest {
88    pub client_order_id: ClientOrderId,
89    pub operation: PendingOperation,
90    pub trader_id: TraderId,
91    pub strategy_id: StrategyId,
92    pub instrument_id: InstrumentId,
93    pub venue_order_id: Option<VenueOrderId>,
94}
95
96/// Public/market data WebSocket client for Bybit.
97#[cfg_attr(feature = "python", pyo3::pyclass(from_py_object))]
98#[cfg_attr(
99    feature = "python",
100    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.bybit")
101)]
102pub struct BybitWebSocketClient {
103    url: String,
104    environment: BybitEnvironment,
105    product_type: Option<BybitProductType>,
106    credential: Option<Credential>,
107    requires_auth: bool,
108    auth_tracker: AuthTracker,
109    heartbeat: Option<u64>,
110    connection_mode: Arc<ArcSwap<AtomicU8>>,
111    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
112    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<BybitWsMessage>>>,
113    signal: Arc<AtomicBool>,
114    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
115    subscriptions: SubscriptionState,
116    account_id: Option<AccountId>,
117    mm_level: Arc<AtomicU8>,
118    bar_types_cache: Arc<AtomicMap<String, BarType>>,
119    instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
120    trade_subs: Arc<AtomicSet<InstrumentId>>,
121    option_greeks_subs: Arc<AtomicSet<InstrumentId>>,
122    bars_timestamp_on_close: Arc<AtomicBool>,
123    pending_py_requests: Arc<DashMap<String, Vec<PendingPyRequest>>>,
124    transport_backend: TransportBackend,
125    cancellation_token: CancellationToken,
126    proxy_url: Option<String>,
127}
128
129impl Debug for BybitWebSocketClient {
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        f.debug_struct(stringify!(BybitWebSocketClient))
132            .field("url", &self.url)
133            .field("environment", &self.environment)
134            .field("product_type", &self.product_type)
135            .field("requires_auth", &self.requires_auth)
136            .field("heartbeat", &self.heartbeat)
137            .field("confirmed_subscriptions", &self.subscriptions.len())
138            .finish()
139    }
140}
141
142impl Clone for BybitWebSocketClient {
143    fn clone(&self) -> Self {
144        Self {
145            url: self.url.clone(),
146            environment: self.environment,
147            product_type: self.product_type,
148            credential: self.credential.clone(),
149            requires_auth: self.requires_auth,
150            auth_tracker: self.auth_tracker.clone(),
151            heartbeat: self.heartbeat,
152            connection_mode: Arc::clone(&self.connection_mode),
153            cmd_tx: Arc::clone(&self.cmd_tx),
154            out_rx: None, // Each clone gets its own receiver
155            signal: Arc::clone(&self.signal),
156            task_handle: None, // Each clone gets its own task handle
157            subscriptions: self.subscriptions.clone(),
158            account_id: self.account_id,
159            mm_level: Arc::clone(&self.mm_level),
160            bar_types_cache: Arc::clone(&self.bar_types_cache),
161            instruments_cache: Arc::clone(&self.instruments_cache),
162            trade_subs: Arc::clone(&self.trade_subs),
163            option_greeks_subs: Arc::clone(&self.option_greeks_subs),
164            bars_timestamp_on_close: Arc::clone(&self.bars_timestamp_on_close),
165            pending_py_requests: Arc::clone(&self.pending_py_requests),
166            transport_backend: self.transport_backend,
167            cancellation_token: self.cancellation_token.clone(),
168            proxy_url: self.proxy_url.clone(),
169        }
170    }
171}
172
173impl BybitWebSocketClient {
174    /// Creates a new Bybit public WebSocket client.
175    #[must_use]
176    pub fn new_public(url: Option<String>, heartbeat: u64) -> Self {
177        Self::new_public_with(
178            BybitProductType::Linear,
179            BybitEnvironment::Mainnet,
180            url,
181            heartbeat,
182            TransportBackend::default(),
183            None,
184        )
185    }
186
187    /// Creates a new Bybit public WebSocket client targeting the specified product/environment.
188    #[must_use]
189    pub fn new_public_with(
190        product_type: BybitProductType,
191        environment: BybitEnvironment,
192        url: Option<String>,
193        heartbeat: u64,
194        transport_backend: TransportBackend,
195        proxy_url: Option<String>,
196    ) -> Self {
197        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
198
199        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
200        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
201
202        Self {
203            url: url.unwrap_or_else(|| bybit_ws_public_url(product_type, environment)),
204            environment,
205            product_type: Some(product_type),
206            credential: None,
207            requires_auth: false,
208            auth_tracker: AuthTracker::new(),
209            heartbeat: Some(heartbeat),
210            connection_mode,
211            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
212            out_rx: None,
213            signal: Arc::new(AtomicBool::new(false)),
214            task_handle: None,
215            subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
216            bar_types_cache: Arc::new(AtomicMap::new()),
217            instruments_cache: Arc::new(AtomicMap::new()),
218            trade_subs: Arc::new(AtomicSet::new()),
219            option_greeks_subs: Arc::new(AtomicSet::new()),
220            bars_timestamp_on_close: Arc::new(AtomicBool::new(true)),
221            pending_py_requests: Arc::new(DashMap::new()),
222            account_id: None,
223            mm_level: Arc::new(AtomicU8::new(0)),
224            transport_backend,
225            cancellation_token: CancellationToken::new(),
226            proxy_url,
227        }
228    }
229
230    /// Creates a new Bybit private WebSocket client.
231    ///
232    /// If `api_key` or `api_secret` are not provided, they will be loaded from
233    /// environment variables based on the environment:
234    /// - Demo: `BYBIT_DEMO_API_KEY`, `BYBIT_DEMO_API_SECRET`
235    /// - Testnet: `BYBIT_TESTNET_API_KEY`, `BYBIT_TESTNET_API_SECRET`
236    /// - Mainnet: `BYBIT_API_KEY`, `BYBIT_API_SECRET`
237    #[must_use]
238    pub fn new_private(
239        environment: BybitEnvironment,
240        api_key: Option<String>,
241        api_secret: Option<String>,
242        url: Option<String>,
243        heartbeat: u64,
244        transport_backend: TransportBackend,
245        proxy_url: Option<String>,
246    ) -> Self {
247        let credential = Credential::resolve(api_key, api_secret, environment);
248
249        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
250
251        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
252        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
253
254        Self {
255            url: url.unwrap_or_else(|| bybit_ws_private_url(environment).to_string()),
256            environment,
257            product_type: None,
258            credential,
259            requires_auth: true,
260            auth_tracker: AuthTracker::new(),
261            heartbeat: Some(heartbeat),
262            connection_mode,
263            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
264            out_rx: None,
265            signal: Arc::new(AtomicBool::new(false)),
266            task_handle: None,
267            subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
268            bar_types_cache: Arc::new(AtomicMap::new()),
269            instruments_cache: Arc::new(AtomicMap::new()),
270            trade_subs: Arc::new(AtomicSet::new()),
271            option_greeks_subs: Arc::new(AtomicSet::new()),
272            bars_timestamp_on_close: Arc::new(AtomicBool::new(true)),
273            pending_py_requests: Arc::new(DashMap::new()),
274            account_id: None,
275            mm_level: Arc::new(AtomicU8::new(0)),
276            transport_backend,
277            cancellation_token: CancellationToken::new(),
278            proxy_url,
279        }
280    }
281
282    /// Creates a new Bybit trade WebSocket client for order operations.
283    ///
284    /// If `api_key` or `api_secret` are not provided, they will be loaded from
285    /// environment variables based on the environment:
286    /// - Demo: `BYBIT_DEMO_API_KEY`, `BYBIT_DEMO_API_SECRET`
287    /// - Testnet: `BYBIT_TESTNET_API_KEY`, `BYBIT_TESTNET_API_SECRET`
288    /// - Mainnet: `BYBIT_API_KEY`, `BYBIT_API_SECRET`
289    #[must_use]
290    pub fn new_trade(
291        environment: BybitEnvironment,
292        api_key: Option<String>,
293        api_secret: Option<String>,
294        url: Option<String>,
295        heartbeat: u64,
296        transport_backend: TransportBackend,
297        proxy_url: Option<String>,
298    ) -> Self {
299        let credential = Credential::resolve(api_key, api_secret, environment);
300
301        let (cmd_tx, _) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
302
303        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
304        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
305
306        Self {
307            url: url.unwrap_or_else(|| bybit_ws_trade_url(environment).to_string()),
308            environment,
309            product_type: None,
310            credential,
311            requires_auth: true,
312            auth_tracker: AuthTracker::new(),
313            heartbeat: Some(heartbeat),
314            connection_mode,
315            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
316            out_rx: None,
317            signal: Arc::new(AtomicBool::new(false)),
318            task_handle: None,
319            subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
320            bar_types_cache: Arc::new(AtomicMap::new()),
321            instruments_cache: Arc::new(AtomicMap::new()),
322            trade_subs: Arc::new(AtomicSet::new()),
323            option_greeks_subs: Arc::new(AtomicSet::new()),
324            bars_timestamp_on_close: Arc::new(AtomicBool::new(true)),
325            pending_py_requests: Arc::new(DashMap::new()),
326            account_id: None,
327            mm_level: Arc::new(AtomicU8::new(0)),
328            transport_backend,
329            cancellation_token: CancellationToken::new(),
330            proxy_url,
331        }
332    }
333
334    /// Establishes the WebSocket connection.
335    ///
336    /// # Errors
337    ///
338    /// Returns an error if the underlying WebSocket connection cannot be established,
339    /// after retrying multiple times with exponential backoff.
340    pub async fn connect(&mut self) -> BybitWsResult<()> {
341        const MAX_RETRIES: u32 = 5;
342        const CONNECTION_TIMEOUT_SECS: u64 = 10;
343
344        self.signal.store(false, Ordering::Relaxed);
345
346        let (raw_handler, raw_rx) = channel_message_handler();
347
348        // No-op ping handler: handler owns the WebSocketClient and responds to pings directly
349        // in the message loop for minimal latency (see handler.rs pong response)
350        let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
351            // Handler responds to pings internally via select! loop
352        });
353
354        let ping_msg = serde_json::to_string(&BybitSubscription {
355            op: BybitWsOperation::Ping,
356            args: vec![],
357            req_id: None,
358        })?;
359
360        let config = WebSocketConfig {
361            url: self.url.clone(),
362            headers: Self::default_headers(),
363            heartbeat: self.heartbeat,
364            heartbeat_msg: Some(ping_msg),
365            reconnect_timeout_ms: Some(5_000),
366            reconnect_delay_initial_ms: Some(500),
367            reconnect_delay_max_ms: Some(5_000),
368            reconnect_backoff_factor: Some(1.5),
369            reconnect_jitter_ms: Some(250),
370            reconnect_max_attempts: None,
371            idle_timeout_ms: None,
372            backend: self.transport_backend,
373            proxy_url: self.proxy_url.clone(),
374        };
375
376        // Retry initial connection with exponential backoff to handle transient DNS/network issues
377        let mut backoff = ExponentialBackoff::new(
378            Duration::from_millis(500),
379            Duration::from_millis(5000),
380            2.0,
381            250,
382            false,
383        )
384        .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
385
386        #[allow(unused_assignments)]
387        let mut last_error = String::new();
388        let mut attempt = 0;
389        let client = loop {
390            attempt += 1;
391
392            match tokio::time::timeout(
393                Duration::from_secs(CONNECTION_TIMEOUT_SECS),
394                WebSocketClient::connect(
395                    config.clone(),
396                    Some(raw_handler.clone()),
397                    Some(ping_handler.clone()),
398                    None,
399                    vec![],
400                    None,
401                ),
402            )
403            .await
404            {
405                Ok(Ok(client)) => {
406                    if attempt > 1 {
407                        log::info!("WebSocket connection established after {attempt} attempts");
408                    }
409                    break client;
410                }
411                Ok(Err(e)) => {
412                    last_error = e.to_string();
413                    log::warn!(
414                        "WebSocket connection attempt failed: attempt={attempt}, max_retries={MAX_RETRIES}, url={}, error={last_error}",
415                        self.url
416                    );
417                }
418                Err(_) => {
419                    last_error = format!(
420                        "Connection timeout after {CONNECTION_TIMEOUT_SECS}s (possible DNS resolution failure)"
421                    );
422                    log::warn!(
423                        "WebSocket connection attempt timed out: attempt={attempt}, max_retries={MAX_RETRIES}, url={}",
424                        self.url
425                    );
426                }
427            }
428
429            if attempt >= MAX_RETRIES {
430                return Err(BybitWsError::Transport(format!(
431                    "Failed to connect to {} after {MAX_RETRIES} attempts: {}. \
432                    If this is a DNS error, check your network configuration and DNS settings.",
433                    self.url,
434                    if last_error.is_empty() {
435                        "unknown error"
436                    } else {
437                        &last_error
438                    }
439                )));
440            }
441
442            let delay = backoff.next_duration();
443            log::debug!(
444                "Retrying in {delay:?} (attempt {}/{MAX_RETRIES})",
445                attempt + 1
446            );
447            tokio::time::sleep(delay).await;
448        };
449
450        self.connection_mode.store(client.connection_mode_atomic());
451        client.set_auth_tracker(self.auth_tracker.clone(), self.requires_auth);
452
453        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<BybitWsMessage>();
454        self.out_rx = Some(Arc::new(out_rx));
455
456        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
457        *self.cmd_tx.write().await = cmd_tx.clone();
458
459        let cmd = HandlerCommand::SetClient(client);
460
461        self.send_cmd(cmd).await?;
462
463        let signal = Arc::clone(&self.signal);
464        let subscriptions = self.subscriptions.clone();
465        let credential = self.credential.clone();
466        let requires_auth = self.requires_auth;
467        let cmd_tx_for_reconnect = cmd_tx.clone();
468        let auth_tracker = self.auth_tracker.clone();
469        let auth_tracker_for_handler = auth_tracker.clone();
470
471        let stream_handle = get_runtime().spawn(async move {
472            let mut handler = BybitWsFeedHandler::new(
473                signal.clone(),
474                cmd_rx,
475                raw_rx,
476                auth_tracker_for_handler,
477                subscriptions.clone(),
478            );
479
480            // Helper closure to resubscribe all tracked subscriptions after reconnection
481            let resubscribe_all = || async {
482                let topics = subscriptions.all_topics();
483
484                if topics.is_empty() {
485                    return;
486                }
487
488                log::debug!(
489                    "Resubscribing to confirmed subscriptions: count={}",
490                    topics.len()
491                );
492
493                for topic in &topics {
494                    subscriptions.mark_subscribe(topic.as_str());
495                }
496
497                let mut payloads = Vec::with_capacity(topics.len());
498                for topic in &topics {
499                    let message = BybitSubscription {
500                        op: BybitWsOperation::Subscribe,
501                        args: vec![topic.clone()],
502                        req_id: Some(topic.clone()),
503                    };
504
505                    if let Ok(payload) = serde_json::to_string(&message) {
506                        payloads.push(payload);
507                    }
508                }
509
510                let cmd = HandlerCommand::Subscribe { topics: payloads };
511
512                if let Err(e) = cmd_tx_for_reconnect.send(cmd) {
513                    log::error!("Failed to send resubscribe command: {e}");
514                }
515            };
516
517            // Run message processing with reconnection handling
518            loop {
519                match handler.next().await {
520                    Some(BybitWsMessage::Reconnected) => {
521                        if signal.load(Ordering::Relaxed) {
522                            continue;
523                        }
524
525                        log::info!("WebSocket reconnected");
526
527                        // Mark all confirmed subscriptions as failed so they transition to pending state
528                        let confirmed_topics: Vec<String> = {
529                            let confirmed = subscriptions.confirmed();
530                            let mut topics = Vec::new();
531
532                            for entry in confirmed.iter() {
533                                let (channel, symbols) = entry.pair();
534                                for symbol in symbols {
535                                    if symbol.is_empty() {
536                                        topics.push(channel.to_string());
537                                    } else {
538                                        topics.push(format!("{channel}.{symbol}"));
539                                    }
540                                }
541                            }
542                            topics
543                        };
544
545                        if !confirmed_topics.is_empty() {
546                            log::debug!(
547                                "Marking confirmed subscriptions as pending for replay: count={}",
548                                confirmed_topics.len()
549                            );
550
551                            for topic in confirmed_topics {
552                                subscriptions.mark_failure(&topic);
553                            }
554                        }
555
556                        if requires_auth {
557                            log::debug!("Re-authenticating after reconnection");
558
559                            if let Some(cred) = &credential {
560                                // Begin auth attempt so succeed() will update state
561                                let _rx = auth_tracker.begin();
562
563                                let expires = chrono::Utc::now().timestamp_millis()
564                                    + WEBSOCKET_AUTH_WINDOW_MS;
565                                let signature = cred.sign_websocket_auth(expires);
566
567                                let auth_message = BybitAuthRequest {
568                                    op: BybitWsOperation::Auth,
569                                    args: vec![
570                                        Value::String(cred.api_key().to_string()),
571                                        Value::Number(expires.into()),
572                                        Value::String(signature),
573                                    ],
574                                };
575
576                                if let Ok(payload) = serde_json::to_string(&auth_message) {
577                                    let cmd = HandlerCommand::Authenticate { payload };
578                                    if let Err(e) = cmd_tx_for_reconnect.send(cmd) {
579                                        log::error!(
580                                            "Failed to send reconnection auth command: error={e}"
581                                        );
582                                    }
583                                } else {
584                                    log::error!("Failed to serialize reconnection auth message");
585                                }
586                            }
587                        }
588
589                        // Unauthenticated sessions resubscribe immediately after reconnection,
590                        // authenticated sessions wait for Auth message
591                        if !requires_auth {
592                            log::debug!("No authentication required, resubscribing immediately");
593                            resubscribe_all().await;
594                        }
595
596                        // Forward to out_tx so caller sees the Reconnected message
597                        if out_tx.send(BybitWsMessage::Reconnected).is_err() {
598                            log::debug!("Receiver dropped, stopping");
599                            break;
600                        }
601                    }
602                    Some(BybitWsMessage::Auth(ref auth)) => {
603                        let is_success = auth.success.unwrap_or(false) || auth.ret_code == Some(0);
604                        if is_success {
605                            log::debug!("Authenticated, resubscribing");
606                            resubscribe_all().await;
607                        }
608
609                        if out_tx.send(BybitWsMessage::Auth(auth.clone())).is_err() {
610                            log::error!("Failed to send message (receiver dropped)");
611                            break;
612                        }
613                    }
614                    Some(msg) => {
615                        if out_tx.send(msg).is_err() {
616                            log::error!("Failed to send message (receiver dropped)");
617                            break;
618                        }
619                    }
620                    None => {
621                        // Stream ended - check if it's a stop signal
622                        if handler.is_stopped() {
623                            log::debug!("Stop signal received, ending message processing");
624                            break;
625                        }
626                        // Otherwise it's an unexpected stream end
627                        log::warn!("WebSocket stream ended unexpectedly");
628                        break;
629                    }
630                }
631            }
632
633            log::debug!("Handler task exiting");
634        });
635
636        self.task_handle = Some(Arc::new(stream_handle));
637
638        if requires_auth && let Err(e) = self.authenticate_if_required().await {
639            return Err(e);
640        }
641
642        Ok(())
643    }
644
645    /// Disconnects the WebSocket client and stops the background task.
646    pub async fn close(&mut self) -> BybitWsResult<()> {
647        log::debug!("Starting close process");
648
649        self.signal.store(true, Ordering::Relaxed);
650
651        let cmd = HandlerCommand::Disconnect;
652        if let Err(e) = self.cmd_tx.read().await.send(cmd) {
653            log::debug!(
654                "Failed to send disconnect command (handler may already be shut down): {e}"
655            );
656        }
657
658        if let Some(task_handle) = self.task_handle.take() {
659            match Arc::try_unwrap(task_handle) {
660                Ok(handle) => {
661                    log::debug!("Waiting for task handle to complete");
662                    match tokio::time::timeout(Duration::from_secs(2), handle).await {
663                        Ok(Ok(())) => log::debug!("Task handle completed successfully"),
664                        Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
665                        Err(_) => {
666                            log::warn!(
667                                "Timeout waiting for task handle, task may still be running"
668                            );
669                        }
670                    }
671                }
672                Err(arc_handle) => {
673                    log::debug!(
674                        "Cannot take ownership of task handle - other references exist, aborting task"
675                    );
676                    arc_handle.abort();
677                }
678            }
679        } else {
680            log::debug!("No task handle to await");
681        }
682
683        self.auth_tracker.invalidate();
684
685        log::debug!("Closed");
686
687        Ok(())
688    }
689
690    /// Returns a value indicating whether the client is active.
691    #[must_use]
692    pub fn is_active(&self) -> bool {
693        let connection_mode_arc = self.connection_mode.load();
694        ConnectionMode::from_atomic(&connection_mode_arc).is_active()
695            && !self.signal.load(Ordering::Relaxed)
696    }
697
698    /// Returns a value indicating whether the client is closed.
699    pub fn is_closed(&self) -> bool {
700        let connection_mode_arc = self.connection_mode.load();
701        ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
702            || self.signal.load(Ordering::Relaxed)
703    }
704
705    /// Waits until the WebSocket client becomes active or times out.
706    ///
707    /// # Errors
708    ///
709    /// Returns an error if the timeout is exceeded before the client becomes active.
710    pub async fn wait_until_active(&self, timeout_secs: f64) -> BybitWsResult<()> {
711        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
712
713        tokio::time::timeout(timeout, async {
714            while !self.is_active() {
715                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
716            }
717        })
718        .await
719        .map_err(|_| {
720            BybitWsError::ClientError(format!(
721                "WebSocket connection timeout after {timeout_secs} seconds"
722            ))
723        })?;
724
725        Ok(())
726    }
727
728    /// Subscribe to the provided topic strings.
729    pub async fn subscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
730        if topics.is_empty() {
731            return Ok(());
732        }
733
734        log::debug!("Subscribing to topics: {topics:?}");
735
736        // Use reference counting to deduplicate subscriptions
737        let mut topics_to_send = Vec::new();
738
739        for topic in topics {
740            // Returns true if this is the first subscription (ref count 0 -> 1)
741            if self.subscriptions.add_reference(&topic) {
742                self.subscriptions.mark_subscribe(&topic);
743                topics_to_send.push(topic.clone());
744            } else {
745                log::debug!("Already subscribed to {topic}, skipping duplicate subscription");
746            }
747        }
748
749        if topics_to_send.is_empty() {
750            return Ok(());
751        }
752
753        // Serialize subscription messages
754        let mut payloads = Vec::with_capacity(topics_to_send.len());
755        for topic in &topics_to_send {
756            let message = BybitSubscription {
757                op: BybitWsOperation::Subscribe,
758                args: vec![topic.clone()],
759                req_id: Some(topic.clone()),
760            };
761            let payload = serde_json::to_string(&message).map_err(|e| {
762                BybitWsError::Json(format!("Failed to serialize subscription: {e}"))
763            })?;
764            payloads.push(payload);
765        }
766
767        let cmd = HandlerCommand::Subscribe { topics: payloads };
768        self.cmd_tx
769            .read()
770            .await
771            .send(cmd)
772            .map_err(|e| BybitWsError::Send(format!("Failed to send subscribe command: {e}")))?;
773
774        Ok(())
775    }
776
777    /// Unsubscribe from the provided topics.
778    pub async fn unsubscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
779        if topics.is_empty() {
780            return Ok(());
781        }
782
783        log::debug!("Attempting to unsubscribe from topics: {topics:?}");
784
785        if self.signal.load(Ordering::Relaxed) {
786            log::debug!("Shutdown signal detected, skipping unsubscribe");
787            return Ok(());
788        }
789
790        // Use reference counting to avoid unsubscribing while other consumers still need the topic
791        let mut topics_to_send = Vec::new();
792
793        for topic in topics {
794            // Returns true if this was the last subscription (ref count 1 -> 0)
795            if self.subscriptions.remove_reference(&topic) {
796                self.subscriptions.mark_unsubscribe(&topic);
797                topics_to_send.push(topic.clone());
798            } else {
799                log::debug!("Topic {topic} still has active subscriptions, not unsubscribing");
800            }
801        }
802
803        if topics_to_send.is_empty() {
804            return Ok(());
805        }
806
807        // Serialize unsubscription messages
808        let mut payloads = Vec::with_capacity(topics_to_send.len());
809        for topic in &topics_to_send {
810            let message = BybitSubscription {
811                op: BybitWsOperation::Unsubscribe,
812                args: vec![topic.clone()],
813                req_id: Some(topic.clone()),
814            };
815
816            if let Ok(payload) = serde_json::to_string(&message) {
817                payloads.push(payload);
818            }
819        }
820
821        let cmd = HandlerCommand::Unsubscribe { topics: payloads };
822        if let Err(e) = self.cmd_tx.read().await.send(cmd) {
823            log::debug!("Failed to send unsubscribe command: error={e}");
824        }
825
826        Ok(())
827    }
828
829    /// Returns a stream of venue-typed [`BybitWsMessage`] items.
830    ///
831    /// # Panics
832    ///
833    /// Panics if called before [`Self::connect`] or if the stream has already been taken.
834    pub fn stream(&mut self) -> impl futures_util::Stream<Item = BybitWsMessage> + use<> {
835        let rx = self
836            .out_rx
837            .take()
838            .expect("Stream receiver already taken or client not connected");
839        let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
840        async_stream::stream! {
841            while let Some(msg) = rx.recv().await {
842                yield msg;
843            }
844        }
845    }
846
847    /// Returns the number of currently registered subscriptions.
848    #[must_use]
849    pub fn subscription_count(&self) -> usize {
850        self.subscriptions.len()
851    }
852
853    /// Returns the credential associated with this client, if any.
854    #[must_use]
855    pub fn credential(&self) -> Option<&Credential> {
856        self.credential.as_ref()
857    }
858
859    /// Sets the account ID for account message parsing.
860    pub fn set_account_id(&mut self, account_id: AccountId) {
861        self.account_id = Some(account_id);
862    }
863
864    /// Sets the account market maker level.
865    pub fn set_mm_level(&self, mm_level: u8) {
866        self.mm_level.store(mm_level, Ordering::Relaxed);
867    }
868
869    /// Returns the account ID if set.
870    #[must_use]
871    pub fn account_id(&self) -> Option<AccountId> {
872        self.account_id
873    }
874
875    /// Returns the product type for public connections.
876    #[must_use]
877    pub fn product_type(&self) -> Option<BybitProductType> {
878        self.product_type
879    }
880
881    /// Returns a reference to the bar types cache.
882    #[must_use]
883    pub fn bar_types_cache(&self) -> &Arc<AtomicMap<String, BarType>> {
884        &self.bar_types_cache
885    }
886
887    /// Adds an instrument to the shared instruments cache.
888    pub fn cache_instrument(&self, instrument: InstrumentAny) {
889        self.instruments_cache
890            .insert(instrument.id().symbol.inner(), instrument);
891    }
892
893    /// Returns a snapshot of the instruments cache keyed by symbol.
894    #[must_use]
895    pub fn instruments_snapshot(&self) -> ahash::AHashMap<Ustr, InstrumentAny> {
896        (**self.instruments_cache.load()).clone()
897    }
898
899    /// Sets whether bar timestamps use the close time.
900    pub fn set_bars_timestamp_on_close(&self, value: bool) {
901        self.bars_timestamp_on_close.store(value, Ordering::Relaxed);
902    }
903
904    /// Returns whether bar timestamps use the close time.
905    #[must_use]
906    pub fn bars_timestamp_on_close(&self) -> bool {
907        self.bars_timestamp_on_close.load(Ordering::Relaxed)
908    }
909
910    /// Adds an instrument ID to the option greeks subscription set.
911    pub fn add_option_greeks_sub(&self, instrument_id: InstrumentId) {
912        self.option_greeks_subs.insert(instrument_id);
913    }
914
915    /// Removes an instrument ID from the option greeks subscription set.
916    pub fn remove_option_greeks_sub(&self, instrument_id: &InstrumentId) {
917        self.option_greeks_subs.remove(instrument_id);
918    }
919
920    /// Returns a reference to the option greeks subscription set.
921    #[must_use]
922    pub fn option_greeks_subs(&self) -> &Arc<AtomicSet<InstrumentId>> {
923        &self.option_greeks_subs
924    }
925
926    /// Returns a reference to the trade subscriptions set.
927    #[must_use]
928    pub fn trade_subs(&self) -> &Arc<AtomicSet<InstrumentId>> {
929        &self.trade_subs
930    }
931
932    /// Returns a reference to the pending Python requests map.
933    #[must_use]
934    pub fn pending_py_requests(&self) -> &Arc<DashMap<String, Vec<PendingPyRequest>>> {
935        &self.pending_py_requests
936    }
937
938    /// Returns a reference to the live instruments cache Arc.
939    #[must_use]
940    pub fn instruments_cache_ref(&self) -> &Arc<AtomicMap<Ustr, InstrumentAny>> {
941        &self.instruments_cache
942    }
943
944    /// Subscribes to orderbook updates for a specific instrument.
945    ///
946    /// # Errors
947    ///
948    /// Returns an error if the subscription request fails.
949    ///
950    /// # References
951    ///
952    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/orderbook>
953    pub async fn subscribe_orderbook(
954        &self,
955        instrument_id: InstrumentId,
956        depth: u32,
957    ) -> BybitWsResult<()> {
958        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
959        let topic = format!(
960            "{}.{depth}.{raw_symbol}",
961            BybitWsPublicChannel::OrderBook.as_ref()
962        );
963        self.subscribe(vec![topic]).await
964    }
965
966    /// Unsubscribes from orderbook updates for a specific instrument.
967    pub async fn unsubscribe_orderbook(
968        &self,
969        instrument_id: InstrumentId,
970        depth: u32,
971    ) -> BybitWsResult<()> {
972        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
973        let topic = format!(
974            "{}.{depth}.{raw_symbol}",
975            BybitWsPublicChannel::OrderBook.as_ref()
976        );
977        self.unsubscribe(vec![topic]).await
978    }
979
980    /// Subscribes to public trade updates for a specific instrument.
981    ///
982    /// # Errors
983    ///
984    /// Returns an error if the subscription request fails.
985    ///
986    /// # References
987    ///
988    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/trade>
989    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
990        self.trade_subs.insert(instrument_id);
991        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
992        // Bybit option trades use baseCoin topic (e.g. publicTrade.BTC)
993        let topic_symbol = match self.product_type {
994            Some(BybitProductType::Option) => extract_base_coin(raw_symbol),
995            _ => raw_symbol,
996        };
997        let topic = format!(
998            "{}.{topic_symbol}",
999            BybitWsPublicChannel::PublicTrade.as_ref()
1000        );
1001        self.subscribe(vec![topic]).await
1002    }
1003
1004    /// Unsubscribes from public trade updates for a specific instrument.
1005    pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
1006        self.trade_subs.remove(&instrument_id);
1007        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1008        let topic_symbol = match self.product_type {
1009            Some(BybitProductType::Option) => extract_base_coin(raw_symbol),
1010            _ => raw_symbol,
1011        };
1012        let topic = format!(
1013            "{}.{topic_symbol}",
1014            BybitWsPublicChannel::PublicTrade.as_ref()
1015        );
1016        self.unsubscribe(vec![topic]).await
1017    }
1018
1019    /// Subscribes to ticker updates for a specific instrument.
1020    ///
1021    /// # Errors
1022    ///
1023    /// Returns an error if the subscription request fails.
1024    ///
1025    /// # References
1026    ///
1027    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/ticker>
1028    pub async fn subscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
1029        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1030        let topic = format!("{}.{raw_symbol}", BybitWsPublicChannel::Tickers.as_ref());
1031        self.subscribe(vec![topic]).await
1032    }
1033
1034    /// Unsubscribes from ticker updates for a specific instrument.
1035    pub async fn unsubscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
1036        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1037        let topic = format!("{}.{raw_symbol}", BybitWsPublicChannel::Tickers.as_ref());
1038        self.unsubscribe(vec![topic]).await
1039    }
1040
1041    /// Subscribes to kline/candlestick updates for a specific instrument.
1042    ///
1043    /// # Errors
1044    ///
1045    /// Returns an error if the subscription request fails.
1046    ///
1047    /// # References
1048    ///
1049    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/kline>
1050    pub async fn subscribe_bars(&self, bar_type: BarType) -> BybitWsResult<()> {
1051        if self.product_type == Some(BybitProductType::Option) {
1052            return Err(BybitWsError::ClientError(
1053                "Bybit does not support kline/bar data for options".to_string(),
1054            ));
1055        }
1056
1057        let spec = bar_type.spec();
1058
1059        if spec.price_type != PriceType::Last {
1060            return Err(BybitWsError::ClientError(format!(
1061                "Invalid bar type: Bybit bars only support LAST price type, received {}",
1062                spec.price_type
1063            )));
1064        }
1065
1066        if bar_type.aggregation_source() != AggregationSource::External {
1067            return Err(BybitWsError::ClientError(format!(
1068                "Invalid bar type: Bybit bars only support EXTERNAL aggregation source, received {}",
1069                bar_type.aggregation_source()
1070            )));
1071        }
1072
1073        let interval = bar_spec_to_bybit_interval(spec.aggregation, spec.step.get() as u64)
1074            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1075
1076        let instrument_id = bar_type.instrument_id();
1077        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1078        let topic = format!(
1079            "{}.{}.{raw_symbol}",
1080            BybitWsPublicChannel::Kline.as_ref(),
1081            interval
1082        );
1083
1084        // Coordinate with reference counting to avoid duplicate cache entries
1085        if self.subscriptions.get_reference_count(&topic) == 0 {
1086            self.bar_types_cache.insert(topic.clone(), bar_type);
1087        }
1088
1089        self.subscribe(vec![topic]).await
1090    }
1091
1092    /// Unsubscribes from kline/candlestick updates for a specific instrument.
1093    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> BybitWsResult<()> {
1094        let spec = bar_type.spec();
1095        let interval = bar_spec_to_bybit_interval(spec.aggregation, spec.step.get() as u64)
1096            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1097
1098        let instrument_id = bar_type.instrument_id();
1099        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1100        let topic = format!(
1101            "{}.{}.{raw_symbol}",
1102            BybitWsPublicChannel::Kline.as_ref(),
1103            interval
1104        );
1105
1106        // Coordinate with reference counting to preserve cache for other subscribers
1107        if self.subscriptions.get_reference_count(&topic) == 1 {
1108            self.bar_types_cache.remove(&topic);
1109        }
1110
1111        self.unsubscribe(vec![topic]).await
1112    }
1113
1114    /// Subscribes to order updates.
1115    ///
1116    /// # Errors
1117    ///
1118    /// Returns an error if the subscription request fails or if not authenticated.
1119    ///
1120    /// # References
1121    ///
1122    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/order>
1123    pub async fn subscribe_orders(&self) -> BybitWsResult<()> {
1124        if !self.requires_auth {
1125            return Err(BybitWsError::Authentication(
1126                "Order subscription requires authentication".to_string(),
1127            ));
1128        }
1129        self.subscribe(vec![BybitWsPrivateChannel::Order.as_ref().to_string()])
1130            .await
1131    }
1132
1133    /// Unsubscribes from order updates.
1134    pub async fn unsubscribe_orders(&self) -> BybitWsResult<()> {
1135        self.unsubscribe(vec![BybitWsPrivateChannel::Order.as_ref().to_string()])
1136            .await
1137    }
1138
1139    /// Subscribes to execution/fill updates.
1140    ///
1141    /// # Errors
1142    ///
1143    /// Returns an error if the subscription request fails or if not authenticated.
1144    ///
1145    /// # References
1146    ///
1147    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/execution>
1148    pub async fn subscribe_executions(&self) -> BybitWsResult<()> {
1149        if !self.requires_auth {
1150            return Err(BybitWsError::Authentication(
1151                "Execution subscription requires authentication".to_string(),
1152            ));
1153        }
1154        self.subscribe(vec![BybitWsPrivateChannel::Execution.as_ref().to_string()])
1155            .await
1156    }
1157
1158    /// Unsubscribes from execution/fill updates.
1159    pub async fn unsubscribe_executions(&self) -> BybitWsResult<()> {
1160        self.unsubscribe(vec![BybitWsPrivateChannel::Execution.as_ref().to_string()])
1161            .await
1162    }
1163
1164    /// Subscribes to position updates.
1165    ///
1166    /// # Errors
1167    ///
1168    /// Returns an error if the subscription request fails or if not authenticated.
1169    ///
1170    /// # References
1171    ///
1172    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/position>
1173    pub async fn subscribe_positions(&self) -> BybitWsResult<()> {
1174        if !self.requires_auth {
1175            return Err(BybitWsError::Authentication(
1176                "Position subscription requires authentication".to_string(),
1177            ));
1178        }
1179        self.subscribe(vec![BybitWsPrivateChannel::Position.as_ref().to_string()])
1180            .await
1181    }
1182
1183    /// Unsubscribes from position updates.
1184    pub async fn unsubscribe_positions(&self) -> BybitWsResult<()> {
1185        self.unsubscribe(vec![BybitWsPrivateChannel::Position.as_ref().to_string()])
1186            .await
1187    }
1188
1189    /// Subscribes to wallet/balance updates.
1190    ///
1191    /// # Errors
1192    ///
1193    /// Returns an error if the subscription request fails or if not authenticated.
1194    ///
1195    /// # References
1196    ///
1197    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/wallet>
1198    pub async fn subscribe_wallet(&self) -> BybitWsResult<()> {
1199        if !self.requires_auth {
1200            return Err(BybitWsError::Authentication(
1201                "Wallet subscription requires authentication".to_string(),
1202            ));
1203        }
1204        self.subscribe(vec![BybitWsPrivateChannel::Wallet.as_ref().to_string()])
1205            .await
1206    }
1207
1208    /// Unsubscribes from wallet/balance updates.
1209    pub async fn unsubscribe_wallet(&self) -> BybitWsResult<()> {
1210        self.unsubscribe(vec![BybitWsPrivateChannel::Wallet.as_ref().to_string()])
1211            .await
1212    }
1213
1214    /// Waits for the session to be authenticated, aborting early if the client
1215    /// enters a terminal state (closed or disconnecting) during the wait.
1216    async fn require_authenticated(&self) -> BybitWsResult<()> {
1217        if self.is_closed() {
1218            return Err(BybitWsError::ClientError(
1219                "WebSocket client is closed".to_string(),
1220            ));
1221        }
1222
1223        if self.auth_tracker.is_authenticated() {
1224            return Ok(());
1225        }
1226
1227        tokio::select! {
1228            authenticated = self.auth_tracker.wait_for_authenticated(AUTH_WAIT_TIMEOUT) => {
1229                if authenticated {
1230                    Ok(())
1231                } else {
1232                    Err(BybitWsError::Authentication(
1233                        "Must be authenticated".to_string(),
1234                    ))
1235                }
1236            }
1237            () = async {
1238                loop {
1239                    tokio::time::sleep(Duration::from_millis(100)).await;
1240
1241                    if self.is_closed() {
1242                        return;
1243                    }
1244                }
1245            } => {
1246                Err(BybitWsError::ClientError(
1247                    "WebSocket client closed during authentication wait".to_string(),
1248                ))
1249            }
1250        }
1251    }
1252
1253    /// Places an order via WebSocket, returning the request ID for correlation.
1254    ///
1255    /// # Errors
1256    ///
1257    /// Returns an error if the order request fails or if not authenticated.
1258    pub async fn place_order(&self, params: BybitWsPlaceOrderParams) -> BybitWsResult<String> {
1259        self.require_authenticated().await?;
1260
1261        let req_id = UUID4::new().to_string();
1262
1263        let referer = if self.include_referer_header(params.time_in_force) {
1264            Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
1265        } else {
1266            None
1267        };
1268
1269        let request = BybitWsRequest {
1270            req_id: Some(req_id.clone()),
1271            op: BybitWsOrderRequestOp::Create,
1272            header: BybitWsHeader::with_referer(referer),
1273            args: vec![params],
1274        };
1275
1276        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1277        self.send_text(&payload).await?;
1278
1279        Ok(req_id)
1280    }
1281
1282    /// Amends an existing order via WebSocket, returning the request ID for correlation.
1283    ///
1284    /// # Errors
1285    ///
1286    /// Returns an error if the amend request fails or if not authenticated.
1287    pub async fn amend_order(&self, params: BybitWsAmendOrderParams) -> BybitWsResult<String> {
1288        self.require_authenticated().await?;
1289
1290        let req_id = UUID4::new().to_string();
1291
1292        let request = BybitWsRequest {
1293            req_id: Some(req_id.clone()),
1294            op: BybitWsOrderRequestOp::Amend,
1295            header: BybitWsHeader::now(),
1296            args: vec![params],
1297        };
1298
1299        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1300        self.send_text(&payload).await?;
1301
1302        Ok(req_id)
1303    }
1304
1305    /// Cancels an order via WebSocket, returning the request ID for correlation.
1306    ///
1307    /// # Errors
1308    ///
1309    /// Returns an error if the cancel request fails or if not authenticated.
1310    pub async fn cancel_order(&self, params: BybitWsCancelOrderParams) -> BybitWsResult<String> {
1311        self.require_authenticated().await?;
1312
1313        let req_id = UUID4::new().to_string();
1314
1315        let request = BybitWsRequest {
1316            req_id: Some(req_id.clone()),
1317            op: BybitWsOrderRequestOp::Cancel,
1318            header: BybitWsHeader::now(),
1319            args: vec![params],
1320        };
1321
1322        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1323        self.send_text(&payload).await?;
1324
1325        Ok(req_id)
1326    }
1327
1328    /// Batch creates multiple orders via WebSocket, returning the request ID for correlation.
1329    ///
1330    /// # Errors
1331    ///
1332    /// Returns an error if the batch request fails or if not authenticated.
1333    pub async fn batch_place_orders(
1334        &self,
1335        orders: Vec<BybitWsPlaceOrderParams>,
1336    ) -> BybitWsResult<Vec<String>> {
1337        self.require_authenticated().await?;
1338
1339        if orders.is_empty() {
1340            log::warn!("Batch place orders called with empty orders list");
1341            return Ok(vec![]);
1342        }
1343
1344        let mut req_ids = Vec::new();
1345
1346        for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1347            let req_id = self.batch_place_orders_chunk(chunk.to_vec()).await?;
1348            req_ids.push(req_id);
1349        }
1350
1351        Ok(req_ids)
1352    }
1353
1354    async fn batch_place_orders_chunk(
1355        &self,
1356        orders: Vec<BybitWsPlaceOrderParams>,
1357    ) -> BybitWsResult<String> {
1358        let category = orders[0].category;
1359        let batch_req_id = UUID4::new().to_string();
1360
1361        let mm_level = self.mm_level.load(Ordering::Relaxed);
1362        let has_non_post_only = orders
1363            .iter()
1364            .any(|o| !matches!(o.time_in_force, Some(BybitTimeInForce::PostOnly)));
1365        let referer = if has_non_post_only || mm_level == 0 {
1366            Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
1367        } else {
1368            None
1369        };
1370
1371        let request_items: Vec<BybitWsBatchPlaceItem> = orders
1372            .into_iter()
1373            .map(|order| BybitWsBatchPlaceItem {
1374                symbol: order.symbol,
1375                side: order.side,
1376                order_type: order.order_type,
1377                qty: order.qty,
1378                is_leverage: order.is_leverage,
1379                market_unit: order.market_unit,
1380                price: order.price,
1381                time_in_force: order.time_in_force,
1382                order_link_id: order.order_link_id,
1383                reduce_only: order.reduce_only,
1384                close_on_trigger: order.close_on_trigger,
1385                trigger_price: order.trigger_price,
1386                trigger_by: order.trigger_by,
1387                trigger_direction: order.trigger_direction,
1388                tpsl_mode: order.tpsl_mode,
1389                take_profit: order.take_profit,
1390                stop_loss: order.stop_loss,
1391                tp_trigger_by: order.tp_trigger_by,
1392                sl_trigger_by: order.sl_trigger_by,
1393                sl_trigger_price: order.sl_trigger_price,
1394                tp_trigger_price: order.tp_trigger_price,
1395                sl_order_type: order.sl_order_type,
1396                tp_order_type: order.tp_order_type,
1397                sl_limit_price: order.sl_limit_price,
1398                tp_limit_price: order.tp_limit_price,
1399                order_iv: order.order_iv,
1400                mmp: order.mmp,
1401                position_idx: order.position_idx,
1402            })
1403            .collect();
1404
1405        let args = BybitWsBatchPlaceOrderArgs {
1406            category,
1407            request: request_items,
1408        };
1409
1410        let request = BybitWsRequest {
1411            req_id: Some(batch_req_id.clone()),
1412            op: BybitWsOrderRequestOp::CreateBatch,
1413            header: BybitWsHeader::with_referer(referer),
1414            args: vec![args],
1415        };
1416
1417        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1418        self.send_text(&payload).await?;
1419
1420        Ok(batch_req_id)
1421    }
1422
1423    /// Batch amends multiple orders via WebSocket.
1424    ///
1425    /// # Errors
1426    ///
1427    /// Returns an error if the batch request fails or if not authenticated.
1428    pub async fn batch_amend_orders(
1429        &self,
1430        orders: Vec<BybitWsAmendOrderParams>,
1431    ) -> BybitWsResult<Vec<String>> {
1432        self.require_authenticated().await?;
1433
1434        if orders.is_empty() {
1435            log::warn!("Batch amend orders called with empty orders list");
1436            return Ok(vec![]);
1437        }
1438
1439        let mut req_ids = Vec::new();
1440
1441        for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1442            let req_id = self.batch_amend_orders_chunk(chunk.to_vec()).await?;
1443            req_ids.push(req_id);
1444        }
1445
1446        Ok(req_ids)
1447    }
1448
1449    async fn batch_amend_orders_chunk(
1450        &self,
1451        orders: Vec<BybitWsAmendOrderParams>,
1452    ) -> BybitWsResult<String> {
1453        let batch_req_id = UUID4::new().to_string();
1454
1455        let request = BybitWsRequest {
1456            req_id: Some(batch_req_id.clone()),
1457            op: BybitWsOrderRequestOp::AmendBatch,
1458            header: BybitWsHeader::now(),
1459            args: orders,
1460        };
1461
1462        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1463        self.send_text(&payload).await?;
1464
1465        Ok(batch_req_id)
1466    }
1467
1468    /// Batch cancels multiple orders via WebSocket, returning the request ID for correlation.
1469    ///
1470    /// # Errors
1471    ///
1472    /// Returns an error if the batch request fails or if not authenticated.
1473    pub async fn batch_cancel_orders(
1474        &self,
1475        orders: Vec<BybitWsCancelOrderParams>,
1476    ) -> BybitWsResult<Vec<String>> {
1477        self.require_authenticated().await?;
1478
1479        if orders.is_empty() {
1480            log::warn!("Batch cancel orders called with empty orders list");
1481            return Ok(vec![]);
1482        }
1483
1484        let mut req_ids = Vec::new();
1485
1486        for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1487            let req_id = self.batch_cancel_orders_chunk(chunk.to_vec()).await?;
1488            req_ids.push(req_id);
1489        }
1490
1491        Ok(req_ids)
1492    }
1493
1494    async fn batch_cancel_orders_chunk(
1495        &self,
1496        orders: Vec<BybitWsCancelOrderParams>,
1497    ) -> BybitWsResult<String> {
1498        if orders.is_empty() {
1499            return Ok(String::new());
1500        }
1501
1502        let category = orders[0].category;
1503        let batch_req_id = UUID4::new().to_string();
1504
1505        let request_items: Vec<BybitWsBatchCancelItem> = orders
1506            .into_iter()
1507            .map(|order| BybitWsBatchCancelItem {
1508                symbol: order.symbol,
1509                order_id: order.order_id,
1510                order_link_id: order.order_link_id,
1511            })
1512            .collect();
1513
1514        let args = BybitWsBatchCancelOrderArgs {
1515            category,
1516            request: request_items,
1517        };
1518
1519        let request = BybitWsRequest {
1520            req_id: Some(batch_req_id.clone()),
1521            op: BybitWsOrderRequestOp::CancelBatch,
1522            header: BybitWsHeader::now(),
1523            args: vec![args],
1524        };
1525
1526        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1527        self.send_text(&payload).await?;
1528
1529        Ok(batch_req_id)
1530    }
1531
1532    /// Submits an order using Nautilus domain objects.
1533    ///
1534    /// # Errors
1535    ///
1536    /// Returns an error if order submission fails or if not authenticated.
1537    #[expect(clippy::too_many_arguments)]
1538    pub async fn submit_order(
1539        &self,
1540        product_type: BybitProductType,
1541        instrument_id: InstrumentId,
1542        client_order_id: ClientOrderId,
1543        order_side: OrderSide,
1544        order_type: OrderType,
1545        quantity: Quantity,
1546        is_quote_quantity: bool,
1547        time_in_force: Option<TimeInForce>,
1548        price: Option<Price>,
1549        trigger_price: Option<Price>,
1550        trigger_type: Option<TriggerType>,
1551        post_only: Option<bool>,
1552        reduce_only: Option<bool>,
1553        is_leverage: bool,
1554        position_idx: Option<BybitPositionIdx>,
1555    ) -> BybitWsResult<String> {
1556        let params = self.build_place_order_params(
1557            product_type,
1558            instrument_id,
1559            client_order_id,
1560            order_side,
1561            order_type,
1562            quantity,
1563            is_quote_quantity,
1564            time_in_force,
1565            price,
1566            trigger_price,
1567            trigger_type,
1568            post_only,
1569            reduce_only,
1570            is_leverage,
1571            None,
1572            None,
1573            position_idx,
1574        )?;
1575
1576        self.place_order(params).await
1577    }
1578
1579    /// Modifies an existing order using Nautilus domain objects.
1580    ///
1581    /// # Errors
1582    ///
1583    /// Returns an error if modification fails or if not authenticated.
1584    pub async fn modify_order(
1585        &self,
1586        product_type: BybitProductType,
1587        instrument_id: InstrumentId,
1588        client_order_id: ClientOrderId,
1589        venue_order_id: Option<VenueOrderId>,
1590        quantity: Option<Quantity>,
1591        price: Option<Price>,
1592    ) -> BybitWsResult<String> {
1593        let params = self.build_amend_order_params(
1594            product_type,
1595            instrument_id,
1596            venue_order_id,
1597            Some(client_order_id),
1598            quantity,
1599            price,
1600        )?;
1601
1602        self.amend_order(params).await
1603    }
1604
1605    /// Cancels an order using Nautilus domain objects.
1606    ///
1607    /// # Errors
1608    ///
1609    /// Returns an error if cancellation fails or if not authenticated.
1610    pub async fn cancel_order_by_id(
1611        &self,
1612        product_type: BybitProductType,
1613        instrument_id: InstrumentId,
1614        client_order_id: ClientOrderId,
1615        venue_order_id: Option<VenueOrderId>,
1616    ) -> BybitWsResult<String> {
1617        let params = self.build_cancel_order_params(
1618            product_type,
1619            instrument_id,
1620            venue_order_id,
1621            Some(client_order_id),
1622        )?;
1623
1624        self.cancel_order(params).await
1625    }
1626
1627    /// Builds order params for placing an order.
1628    #[expect(clippy::too_many_arguments)]
1629    pub fn build_place_order_params(
1630        &self,
1631        product_type: BybitProductType,
1632        instrument_id: InstrumentId,
1633        client_order_id: ClientOrderId,
1634        order_side: OrderSide,
1635        order_type: OrderType,
1636        quantity: Quantity,
1637        is_quote_quantity: bool,
1638        time_in_force: Option<TimeInForce>,
1639        price: Option<Price>,
1640        trigger_price: Option<Price>,
1641        trigger_type: Option<TriggerType>,
1642        post_only: Option<bool>,
1643        reduce_only: Option<bool>,
1644        is_leverage: bool,
1645        take_profit: Option<Price>,
1646        stop_loss: Option<Price>,
1647        position_idx: Option<BybitPositionIdx>,
1648    ) -> BybitWsResult<BybitWsPlaceOrderParams> {
1649        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1650            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1651        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1652
1653        let bybit_side = match order_side {
1654            OrderSide::Buy => BybitOrderSide::Buy,
1655            OrderSide::Sell => BybitOrderSide::Sell,
1656            _ => {
1657                return Err(BybitWsError::ClientError(format!(
1658                    "Invalid order side: {order_side:?}"
1659                )));
1660            }
1661        };
1662
1663        let (bybit_order_type, is_stop_order) = match order_type {
1664            OrderType::Market => (BybitOrderType::Market, false),
1665            OrderType::Limit => (BybitOrderType::Limit, false),
1666            OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
1667            OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
1668            _ => {
1669                return Err(BybitWsError::ClientError(format!(
1670                    "Unsupported order type: {order_type:?}"
1671                )));
1672            }
1673        };
1674
1675        let bybit_tif =
1676            map_time_in_force(bybit_order_type, time_in_force, post_only).map_err(|tif| {
1677                BybitWsError::ClientError(format!("Unsupported time in force: {tif:?}"))
1678            })?;
1679        let market_unit = spot_market_unit(product_type, bybit_order_type, is_quote_quantity);
1680        let is_leverage_value = spot_leverage(product_type, is_leverage);
1681        let trigger_dir =
1682            trigger_direction(order_type, order_side, is_stop_order).map(|d| d as i32);
1683
1684        let params = if is_stop_order {
1685            BybitWsPlaceOrderParams {
1686                category: product_type,
1687                symbol: raw_symbol,
1688                side: bybit_side,
1689                order_type: bybit_order_type,
1690                qty: quantity.to_string(),
1691                is_leverage: is_leverage_value,
1692                market_unit,
1693                price: price.map(|p| p.to_string()),
1694                time_in_force: bybit_tif,
1695                order_link_id: Some(client_order_id.to_string()),
1696                reduce_only: reduce_only.filter(|&r| r),
1697                close_on_trigger: None,
1698                trigger_price: trigger_price.map(|p| p.to_string()),
1699                trigger_by: Some(resolve_trigger_type(trigger_type)),
1700                trigger_direction: trigger_dir,
1701                tpsl_mode: if take_profit.is_some() || stop_loss.is_some() {
1702                    Some(BybitTpSlMode::Full)
1703                } else {
1704                    None
1705                },
1706                take_profit: take_profit.map(|p| p.to_string()),
1707                stop_loss: stop_loss.map(|p| p.to_string()),
1708                tp_trigger_by: take_profit.map(|_| resolve_trigger_type(trigger_type)),
1709                sl_trigger_by: stop_loss.map(|_| resolve_trigger_type(trigger_type)),
1710                sl_trigger_price: None,
1711                tp_trigger_price: None,
1712                sl_order_type: None,
1713                tp_order_type: None,
1714                sl_limit_price: None,
1715                tp_limit_price: None,
1716                order_iv: None,
1717                mmp: None,
1718                position_idx,
1719            }
1720        } else {
1721            BybitWsPlaceOrderParams {
1722                category: product_type,
1723                symbol: raw_symbol,
1724                side: bybit_side,
1725                order_type: bybit_order_type,
1726                qty: quantity.to_string(),
1727                is_leverage: is_leverage_value,
1728                market_unit,
1729                price: price.map(|p| p.to_string()),
1730                time_in_force: bybit_tif,
1731                order_link_id: Some(client_order_id.to_string()),
1732                reduce_only: reduce_only.filter(|&r| r),
1733                close_on_trigger: None,
1734                trigger_price: None,
1735                trigger_by: None,
1736                trigger_direction: None,
1737                tpsl_mode: if take_profit.is_some() || stop_loss.is_some() {
1738                    Some(BybitTpSlMode::Full)
1739                } else {
1740                    None
1741                },
1742                take_profit: take_profit.map(|p| p.to_string()),
1743                stop_loss: stop_loss.map(|p| p.to_string()),
1744                tp_trigger_by: take_profit.map(|_| resolve_trigger_type(trigger_type)),
1745                sl_trigger_by: stop_loss.map(|_| resolve_trigger_type(trigger_type)),
1746                sl_trigger_price: None,
1747                tp_trigger_price: None,
1748                sl_order_type: None,
1749                tp_order_type: None,
1750                sl_limit_price: None,
1751                tp_limit_price: None,
1752                order_iv: None,
1753                mmp: None,
1754                position_idx,
1755            }
1756        };
1757
1758        Ok(params)
1759    }
1760
1761    /// Builds order params for amending an order.
1762    pub fn build_amend_order_params(
1763        &self,
1764        product_type: BybitProductType,
1765        instrument_id: InstrumentId,
1766        venue_order_id: Option<VenueOrderId>,
1767        client_order_id: Option<ClientOrderId>,
1768        quantity: Option<Quantity>,
1769        price: Option<Price>,
1770    ) -> BybitWsResult<BybitWsAmendOrderParams> {
1771        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1772            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1773        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1774
1775        Ok(BybitWsAmendOrderParams {
1776            category: product_type,
1777            symbol: raw_symbol,
1778            order_id: venue_order_id.map(|v| v.to_string()),
1779            order_link_id: client_order_id.map(|c| c.to_string()),
1780            qty: quantity.map(|q| q.to_string()),
1781            price: price.map(|p| p.to_string()),
1782            trigger_price: None,
1783            take_profit: None,
1784            stop_loss: None,
1785            tp_trigger_by: None,
1786            sl_trigger_by: None,
1787            order_iv: None,
1788        })
1789    }
1790
1791    /// Builds order params for canceling an order via WebSocket.
1792    ///
1793    /// # Errors
1794    ///
1795    /// Returns an error if symbol parsing fails or if neither venue_order_id
1796    /// nor client_order_id is provided.
1797    pub fn build_cancel_order_params(
1798        &self,
1799        product_type: BybitProductType,
1800        instrument_id: InstrumentId,
1801        venue_order_id: Option<VenueOrderId>,
1802        client_order_id: Option<ClientOrderId>,
1803    ) -> BybitWsResult<BybitWsCancelOrderParams> {
1804        if venue_order_id.is_none() && client_order_id.is_none() {
1805            return Err(BybitWsError::ClientError(
1806                "Either venue_order_id or client_order_id must be provided".to_string(),
1807            ));
1808        }
1809
1810        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1811            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1812        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1813
1814        Ok(BybitWsCancelOrderParams {
1815            category: product_type,
1816            symbol: raw_symbol,
1817            order_id: venue_order_id.map(|v| v.to_string()),
1818            order_link_id: client_order_id.map(|c| c.to_string()),
1819        })
1820    }
1821
1822    fn include_referer_header(&self, time_in_force: Option<BybitTimeInForce>) -> bool {
1823        let is_post_only = matches!(time_in_force, Some(BybitTimeInForce::PostOnly));
1824        let mm_level = self.mm_level.load(Ordering::Relaxed);
1825        !(is_post_only && mm_level > 0)
1826    }
1827
1828    fn default_headers() -> Vec<(String, String)> {
1829        vec![
1830            ("Content-Type".to_string(), "application/json".to_string()),
1831            ("User-Agent".to_string(), NAUTILUS_USER_AGENT.to_string()),
1832        ]
1833    }
1834
1835    async fn authenticate_if_required(&self) -> BybitWsResult<()> {
1836        if !self.requires_auth {
1837            return Ok(());
1838        }
1839
1840        let credential = self.credential.as_ref().ok_or_else(|| {
1841            BybitWsError::Authentication("Credentials required for authentication".to_string())
1842        })?;
1843
1844        let expires = chrono::Utc::now().timestamp_millis() + WEBSOCKET_AUTH_WINDOW_MS;
1845        let signature = credential.sign_websocket_auth(expires);
1846
1847        let auth_message = BybitAuthRequest {
1848            op: BybitWsOperation::Auth,
1849            args: vec![
1850                Value::String(credential.api_key().to_string()),
1851                Value::Number(expires.into()),
1852                Value::String(signature),
1853            ],
1854        };
1855
1856        let payload = serde_json::to_string(&auth_message)?;
1857
1858        // Begin auth attempt so succeed() will update state
1859        let _rx = self.auth_tracker.begin();
1860
1861        self.cmd_tx
1862            .read()
1863            .await
1864            .send(HandlerCommand::Authenticate { payload })
1865            .map_err(|e| BybitWsError::Send(format!("Failed to send auth command: {e}")))?;
1866
1867        Ok(())
1868    }
1869
1870    async fn send_text(&self, text: &str) -> BybitWsResult<()> {
1871        let cmd = HandlerCommand::SendText {
1872            payload: text.to_string(),
1873        };
1874
1875        self.send_cmd(cmd).await
1876    }
1877
1878    async fn send_cmd(&self, cmd: HandlerCommand) -> BybitWsResult<()> {
1879        self.cmd_tx
1880            .read()
1881            .await
1882            .send(cmd)
1883            .map_err(|e| BybitWsError::Send(e.to_string()))
1884    }
1885}
1886
1887#[cfg(test)]
1888mod tests {
1889    use rstest::rstest;
1890
1891    use super::*;
1892    use crate::{
1893        common::{enums::BybitMarketUnit, testing::load_test_json},
1894        websocket::{messages::BybitWsFrame, parse_bybit_ws_frame},
1895    };
1896
1897    #[rstest]
1898    fn classify_orderbook_snapshot() {
1899        let json: Value = serde_json::from_str(&load_test_json("ws_orderbook_snapshot.json"))
1900            .expect("invalid fixture");
1901        let frame = parse_bybit_ws_frame(json);
1902        assert!(matches!(frame, BybitWsFrame::Orderbook(_)));
1903    }
1904
1905    #[rstest]
1906    fn classify_trade_snapshot() {
1907        let json: Value =
1908            serde_json::from_str(&load_test_json("ws_public_trade.json")).expect("invalid fixture");
1909        let frame = parse_bybit_ws_frame(json);
1910        assert!(matches!(frame, BybitWsFrame::Trade(_)));
1911    }
1912
1913    #[rstest]
1914    fn classify_ticker_linear_snapshot() {
1915        let json: Value = serde_json::from_str(&load_test_json("ws_ticker_linear.json"))
1916            .expect("invalid fixture");
1917        let frame = parse_bybit_ws_frame(json);
1918        assert!(matches!(frame, BybitWsFrame::TickerLinear(_)));
1919    }
1920
1921    #[rstest]
1922    fn classify_ticker_option_snapshot() {
1923        let json: Value = serde_json::from_str(&load_test_json("ws_ticker_option.json"))
1924            .expect("invalid fixture");
1925        let frame = parse_bybit_ws_frame(json);
1926        assert!(matches!(frame, BybitWsFrame::TickerOption(_)));
1927    }
1928
1929    #[rstest]
1930    fn test_race_unsubscribe_failure_recovery() {
1931        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
1932        let topic = "publicTrade.BTCUSDT";
1933
1934        subscriptions.mark_subscribe(topic);
1935        subscriptions.confirm_subscribe(topic);
1936        assert_eq!(subscriptions.len(), 1);
1937
1938        subscriptions.mark_unsubscribe(topic);
1939        assert_eq!(subscriptions.len(), 0);
1940        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
1941
1942        subscriptions.confirm_unsubscribe(topic);
1943        subscriptions.mark_subscribe(topic);
1944        subscriptions.confirm_subscribe(topic);
1945
1946        assert_eq!(subscriptions.len(), 1);
1947        assert!(subscriptions.pending_unsubscribe_topics().is_empty());
1948        assert!(subscriptions.pending_subscribe_topics().is_empty());
1949
1950        let all = subscriptions.all_topics();
1951        assert_eq!(all.len(), 1);
1952        assert!(all.contains(&topic.to_string()));
1953    }
1954
1955    #[rstest]
1956    fn test_race_resubscribe_before_unsubscribe_ack() {
1957        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
1958        let topic = "orderbook.50.BTCUSDT";
1959
1960        subscriptions.mark_subscribe(topic);
1961        subscriptions.confirm_subscribe(topic);
1962        assert_eq!(subscriptions.len(), 1);
1963
1964        subscriptions.mark_unsubscribe(topic);
1965        assert_eq!(subscriptions.len(), 0);
1966        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
1967
1968        subscriptions.mark_subscribe(topic);
1969        assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
1970
1971        subscriptions.confirm_unsubscribe(topic);
1972        assert!(subscriptions.pending_unsubscribe_topics().is_empty());
1973        assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
1974
1975        subscriptions.confirm_subscribe(topic);
1976        assert_eq!(subscriptions.len(), 1);
1977        assert!(subscriptions.pending_subscribe_topics().is_empty());
1978
1979        let all = subscriptions.all_topics();
1980        assert_eq!(all.len(), 1);
1981        assert!(all.contains(&topic.to_string()));
1982    }
1983
1984    #[rstest]
1985    fn test_race_late_subscribe_confirmation_after_unsubscribe() {
1986        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
1987        let topic = "tickers.ETHUSDT";
1988
1989        subscriptions.mark_subscribe(topic);
1990        assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
1991
1992        subscriptions.mark_unsubscribe(topic);
1993        assert!(subscriptions.pending_subscribe_topics().is_empty());
1994        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
1995
1996        subscriptions.confirm_subscribe(topic);
1997        assert_eq!(subscriptions.len(), 0);
1998        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
1999
2000        subscriptions.confirm_unsubscribe(topic);
2001
2002        assert!(subscriptions.is_empty());
2003        assert!(subscriptions.all_topics().is_empty());
2004    }
2005
2006    #[rstest]
2007    fn test_race_reconnection_with_pending_states() {
2008        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
2009
2010        let trade_btc = "publicTrade.BTCUSDT";
2011        subscriptions.mark_subscribe(trade_btc);
2012        subscriptions.confirm_subscribe(trade_btc);
2013
2014        let trade_eth = "publicTrade.ETHUSDT";
2015        subscriptions.mark_subscribe(trade_eth);
2016
2017        let book_btc = "orderbook.50.BTCUSDT";
2018        subscriptions.mark_subscribe(book_btc);
2019        subscriptions.confirm_subscribe(book_btc);
2020        subscriptions.mark_unsubscribe(book_btc);
2021
2022        let topics_to_restore = subscriptions.all_topics();
2023
2024        assert_eq!(topics_to_restore.len(), 2);
2025        assert!(topics_to_restore.contains(&trade_btc.to_string()));
2026        assert!(topics_to_restore.contains(&trade_eth.to_string()));
2027        assert!(!topics_to_restore.contains(&book_btc.to_string()));
2028    }
2029
2030    #[rstest]
2031    fn test_race_duplicate_subscribe_messages_idempotent() {
2032        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
2033        let topic = "publicTrade.BTCUSDT";
2034
2035        subscriptions.mark_subscribe(topic);
2036        subscriptions.confirm_subscribe(topic);
2037        assert_eq!(subscriptions.len(), 1);
2038
2039        subscriptions.mark_subscribe(topic);
2040        assert!(subscriptions.pending_subscribe_topics().is_empty());
2041        assert_eq!(subscriptions.len(), 1);
2042
2043        subscriptions.confirm_subscribe(topic);
2044        assert_eq!(subscriptions.len(), 1);
2045
2046        let all = subscriptions.all_topics();
2047        assert_eq!(all.len(), 1);
2048        assert_eq!(all[0], topic);
2049    }
2050
2051    #[rstest]
2052    #[case::spot_with_leverage(BybitProductType::Spot, true, Some(1))]
2053    #[case::spot_without_leverage(BybitProductType::Spot, false, Some(0))]
2054    #[case::linear_with_leverage(BybitProductType::Linear, true, None)]
2055    #[case::linear_without_leverage(BybitProductType::Linear, false, None)]
2056    #[case::inverse_with_leverage(BybitProductType::Inverse, true, None)]
2057    #[case::option_with_leverage(BybitProductType::Option, true, None)]
2058    fn test_is_leverage_parameter(
2059        #[case] product_type: BybitProductType,
2060        #[case] is_leverage: bool,
2061        #[case] expected: Option<i32>,
2062    ) {
2063        let symbol = match product_type {
2064            BybitProductType::Spot => "BTCUSDT-SPOT.BYBIT",
2065            BybitProductType::Linear => "ETHUSDT-LINEAR.BYBIT",
2066            BybitProductType::Inverse => "BTCUSD-INVERSE.BYBIT",
2067            BybitProductType::Option => "BTC-31MAY24-50000-C-OPTION.BYBIT",
2068        };
2069
2070        let instrument_id = InstrumentId::from(symbol);
2071        let client_order_id = ClientOrderId::from("test-order-1");
2072        let quantity = Quantity::from("1.0");
2073
2074        let client = BybitWebSocketClient::new_trade(
2075            BybitEnvironment::Testnet,
2076            Some("test-key".to_string()),
2077            Some("test-secret".to_string()),
2078            None,
2079            20,
2080            TransportBackend::default(),
2081            None,
2082        );
2083
2084        let params = client
2085            .build_place_order_params(
2086                product_type,
2087                instrument_id,
2088                client_order_id,
2089                OrderSide::Buy,
2090                OrderType::Limit,
2091                quantity,
2092                false,
2093                Some(TimeInForce::Gtc),
2094                Some(Price::from("50000.0")),
2095                None,
2096                None,
2097                None,
2098                None,
2099                is_leverage,
2100                None,
2101                None,
2102                None,
2103            )
2104            .expect("Failed to build params");
2105
2106        assert_eq!(params.is_leverage, expected);
2107    }
2108
2109    #[rstest]
2110    #[case::spot_market_quote_quantity(
2111        BybitProductType::Spot,
2112        OrderType::Market,
2113        true,
2114        Some(BybitMarketUnit::QuoteCoin)
2115    )]
2116    #[case::spot_market_base_quantity(
2117        BybitProductType::Spot,
2118        OrderType::Market,
2119        false,
2120        Some(BybitMarketUnit::BaseCoin)
2121    )]
2122    #[case::spot_limit_no_unit(BybitProductType::Spot, OrderType::Limit, false, None)]
2123    #[case::spot_limit_quote(BybitProductType::Spot, OrderType::Limit, true, None)]
2124    #[case::linear_market_no_unit(BybitProductType::Linear, OrderType::Market, false, None)]
2125    #[case::inverse_market_no_unit(BybitProductType::Inverse, OrderType::Market, true, None)]
2126    fn test_is_quote_quantity_parameter(
2127        #[case] product_type: BybitProductType,
2128        #[case] order_type: OrderType,
2129        #[case] is_quote_quantity: bool,
2130        #[case] expected: Option<BybitMarketUnit>,
2131    ) {
2132        let symbol = match product_type {
2133            BybitProductType::Spot => "BTCUSDT-SPOT.BYBIT",
2134            BybitProductType::Linear => "ETHUSDT-LINEAR.BYBIT",
2135            BybitProductType::Inverse => "BTCUSD-INVERSE.BYBIT",
2136            BybitProductType::Option => "BTC-31MAY24-50000-C-OPTION.BYBIT",
2137        };
2138
2139        let instrument_id = InstrumentId::from(symbol);
2140        let client_order_id = ClientOrderId::from("test-order-1");
2141        let quantity = Quantity::from("1.0");
2142
2143        let client = BybitWebSocketClient::new_trade(
2144            BybitEnvironment::Testnet,
2145            Some("test-key".to_string()),
2146            Some("test-secret".to_string()),
2147            None,
2148            20,
2149            TransportBackend::default(),
2150            None,
2151        );
2152
2153        let params = client
2154            .build_place_order_params(
2155                product_type,
2156                instrument_id,
2157                client_order_id,
2158                OrderSide::Buy,
2159                order_type,
2160                quantity,
2161                is_quote_quantity,
2162                Some(TimeInForce::Gtc),
2163                if order_type == OrderType::Market {
2164                    None
2165                } else {
2166                    Some(Price::from("50000.0"))
2167                },
2168                None,
2169                None,
2170                None,
2171                None,
2172                false,
2173                None,
2174                None,
2175                None,
2176            )
2177            .expect("Failed to build params");
2178
2179        assert_eq!(params.market_unit, expected);
2180    }
2181}