Skip to main content

nautilus_bybit/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the Bybit adapter.
17
18use std::{
19    future::Future,
20    sync::{
21        Arc,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use ahash::{AHashMap, AHashSet};
27use anyhow::Context;
28use futures_util::{StreamExt, pin_mut};
29use nautilus_common::{
30    clients::DataClient,
31    live::{runner::get_data_event_sender, runtime::get_runtime},
32    messages::{
33        DataEvent,
34        data::{
35            BarsResponse, BookResponse, DataResponse, ForwardPricesResponse, FundingRatesResponse,
36            InstrumentResponse, InstrumentsResponse, RequestBars, RequestBookSnapshot,
37            RequestForwardPrices, RequestFundingRates, RequestInstrument, RequestInstruments,
38            RequestTrades, SubscribeBars, SubscribeBookDeltas, SubscribeFundingRates,
39            SubscribeIndexPrices, SubscribeInstrumentStatus, SubscribeMarkPrices,
40            SubscribeOptionGreeks, SubscribeQuotes, SubscribeTrades, TradesResponse,
41            UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeFundingRates,
42            UnsubscribeIndexPrices, UnsubscribeInstrumentStatus, UnsubscribeMarkPrices,
43            UnsubscribeOptionGreeks, UnsubscribeQuotes, UnsubscribeTrades,
44        },
45    },
46};
47use nautilus_core::{
48    AtomicMap, AtomicSet,
49    datetime::datetime_to_unix_nanos,
50    time::{AtomicTime, get_atomic_clock_realtime},
51};
52use nautilus_model::{
53    data::{BarType, Data, ForwardPrice, OrderBookDeltas_API, QuoteTick},
54    enums::{BookType, MarketStatusAction},
55    identifiers::{ClientId, InstrumentId, Venue},
56    instruments::{Instrument, InstrumentAny},
57    orderbook::book::OrderBook,
58};
59use rust_decimal::Decimal;
60use tokio::{task::JoinHandle, time::Duration};
61use tokio_util::sync::CancellationToken;
62use ustr::Ustr;
63
64use crate::{
65    common::{
66        consts::{BYBIT_DEFAULT_ORDERBOOK_DEPTH, BYBIT_VENUE},
67        enums::BybitProductType,
68        parse::{extract_raw_symbol, make_bybit_symbol},
69        status::diff_and_emit_statuses,
70        symbol::BybitSymbol,
71    },
72    config::BybitDataClientConfig,
73    http::client::BybitHttpClient,
74    websocket::{
75        client::BybitWebSocketClient,
76        messages::BybitWsMessage,
77        parse::{
78            parse_kline_topic, parse_millis_i64, parse_orderbook_deltas, parse_orderbook_quote,
79            parse_ticker_linear_funding, parse_ticker_linear_index_price,
80            parse_ticker_linear_mark_price, parse_ticker_linear_quote, parse_ticker_option_greeks,
81            parse_ticker_option_index_price, parse_ticker_option_mark_price,
82            parse_ticker_option_quote, parse_ws_kline_bar, parse_ws_trade_tick,
83        },
84    },
85};
86
87/// Live market data client for Bybit.
88#[derive(Debug)]
89pub struct BybitDataClient {
90    client_id: ClientId,
91    config: BybitDataClientConfig,
92    http_client: BybitHttpClient,
93    ws_clients: Vec<BybitWebSocketClient>,
94    is_connected: AtomicBool,
95    cancellation_token: CancellationToken,
96    tasks: Vec<JoinHandle<()>>,
97    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
98    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
99    book_depths: Arc<AtomicMap<InstrumentId, u32>>,
100    quote_depths: Arc<AtomicMap<InstrumentId, u32>>,
101    ticker_subs: Arc<AtomicMap<InstrumentId, AHashSet<&'static str>>>,
102    trade_subs: Arc<AtomicSet<InstrumentId>>,
103    option_greeks_subs: Arc<AtomicSet<InstrumentId>>,
104    instrument_status_subs: Arc<AtomicSet<InstrumentId>>,
105    status_cache: Arc<AtomicMap<InstrumentId, MarketStatusAction>>,
106    clock: &'static AtomicTime,
107}
108
109impl BybitDataClient {
110    /// Creates a new [`BybitDataClient`] instance.
111    ///
112    /// # Errors
113    ///
114    /// Returns an error if the client fails to initialize.
115    pub fn new(client_id: ClientId, config: BybitDataClientConfig) -> anyhow::Result<Self> {
116        let clock = get_atomic_clock_realtime();
117        let data_sender = get_data_event_sender();
118
119        let http_client = if let (Some(api_key), Some(api_secret)) =
120            (config.api_key.clone(), config.api_secret.clone())
121        {
122            BybitHttpClient::with_credentials(
123                api_key,
124                api_secret,
125                Some(config.http_base_url()),
126                config.http_timeout_secs,
127                config.max_retries,
128                config.retry_delay_initial_ms,
129                config.retry_delay_max_ms,
130                config.recv_window_ms,
131                config.proxy_url.clone(),
132            )?
133        } else {
134            BybitHttpClient::new(
135                Some(config.http_base_url()),
136                config.http_timeout_secs,
137                config.max_retries,
138                config.retry_delay_initial_ms,
139                config.retry_delay_max_ms,
140                config.recv_window_ms,
141                config.proxy_url.clone(),
142            )?
143        };
144
145        // Create a WebSocket client for each product type (default to Linear if empty)
146        let product_types = if config.product_types.is_empty() {
147            vec![BybitProductType::Linear]
148        } else {
149            config.product_types.clone()
150        };
151
152        let ws_clients: Vec<BybitWebSocketClient> = product_types
153            .iter()
154            .map(|product_type| {
155                BybitWebSocketClient::new_public_with(
156                    *product_type,
157                    config.environment,
158                    Some(config.ws_public_url_for(*product_type)),
159                    config.heartbeat_interval_secs,
160                    config.transport_backend,
161                    config.proxy_url.clone(),
162                )
163            })
164            .collect();
165
166        Ok(Self {
167            client_id,
168            config,
169            http_client,
170            ws_clients,
171            is_connected: AtomicBool::new(false),
172            cancellation_token: CancellationToken::new(),
173            tasks: Vec::new(),
174            data_sender,
175            instruments: Arc::new(AtomicMap::new()),
176            book_depths: Arc::new(AtomicMap::new()),
177            quote_depths: Arc::new(AtomicMap::new()),
178            ticker_subs: Arc::new(AtomicMap::new()),
179            trade_subs: Arc::new(AtomicSet::new()),
180            option_greeks_subs: Arc::new(AtomicSet::new()),
181            instrument_status_subs: Arc::new(AtomicSet::new()),
182            status_cache: Arc::new(AtomicMap::new()),
183            clock,
184        })
185    }
186
187    fn venue(&self) -> Venue {
188        *BYBIT_VENUE
189    }
190
191    fn get_ws_client_for_product(
192        &self,
193        product_type: BybitProductType,
194    ) -> Option<&BybitWebSocketClient> {
195        self.ws_clients
196            .iter()
197            .find(|ws| ws.product_type() == Some(product_type))
198    }
199
200    fn get_product_type_for_instrument(
201        &self,
202        instrument_id: InstrumentId,
203    ) -> Option<BybitProductType> {
204        let guard = self.instruments.load();
205        guard
206            .get(&instrument_id)
207            .and_then(|_| BybitProductType::from_suffix(instrument_id.symbol.as_str()))
208    }
209
210    fn spawn_ws<F>(&self, fut: F, context: &'static str)
211    where
212        F: Future<Output = anyhow::Result<()>> + Send + 'static,
213    {
214        get_runtime().spawn(async move {
215            if let Err(e) = fut.await {
216                log::error!("{context}: {e:?}");
217            }
218        });
219    }
220
221    fn spawn_instrument_status_polling(
222        &mut self,
223        product_types: &[BybitProductType],
224        poll_secs: u64,
225    ) {
226        let http = self.http_client.clone();
227        let sender = self.data_sender.clone();
228        let instruments = self.instruments.clone();
229        let status_cache = self.status_cache.clone();
230        let status_subs = self.instrument_status_subs.clone();
231        let cancel = self.cancellation_token.clone();
232        let clock = self.clock;
233        let product_types = product_types.to_vec();
234
235        let handle = get_runtime().spawn(async move {
236            let mut interval = tokio::time::interval(Duration::from_secs(poll_secs));
237            interval.tick().await; // Skip first immediate tick
238
239            loop {
240                tokio::select! {
241                    _ = interval.tick() => {
242                        if status_subs.is_empty() {
243                            continue;
244                        }
245
246                        // Accumulate statuses from all product types before diffing
247                        let mut all_statuses = AHashMap::new();
248
249                        for &pt in &product_types {
250                            match http.request_instrument_statuses(pt).await {
251                                Ok(new_statuses) => {
252                                    let inst_guard = instruments.load();
253                                    for (id, action) in new_statuses {
254                                        if inst_guard.contains_key(&id) {
255                                            all_statuses.insert(id, action);
256                                        }
257                                    }
258                                }
259                                Err(e) => {
260                                    log::warn!("Bybit instrument status poll failed for {pt:?}: {e}");
261                                }
262                            }
263                        }
264
265                        let ts = clock.get_time_ns();
266                        let mut cache = (**status_cache.load()).clone();
267                        let subs_guard = status_subs.load();
268                        diff_and_emit_statuses(
269                            &all_statuses, &mut cache, Some(&subs_guard), &sender, ts, ts,
270                        );
271                        status_cache.store(cache);
272                    }
273                    () = cancel.cancelled() => {
274                        log::debug!("Bybit instrument status polling task cancelled");
275                        break;
276                    }
277                }
278            }
279        });
280        self.tasks.push(handle);
281        log::info!("Instrument status polling started: interval={poll_secs}s");
282    }
283}
284
285fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
286    if let Err(e) = sender.send(DataEvent::Data(data)) {
287        log::error!("Failed to emit data event: {e}");
288    }
289}
290
291/// Cached funding state per symbol: (funding_rate, next_funding_time, funding_interval_hour).
292type FundingCacheEntry = (Option<String>, Option<String>, Option<String>);
293
294#[expect(clippy::too_many_arguments)]
295fn handle_ws_message(
296    message: &BybitWsMessage,
297    data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
298    instruments: &AHashMap<Ustr, InstrumentAny>,
299    product_type: Option<BybitProductType>,
300    trade_subs: &Arc<AtomicSet<InstrumentId>>,
301    ticker_subs: &Arc<AtomicMap<InstrumentId, AHashSet<&'static str>>>,
302    quote_depths: &Arc<AtomicMap<InstrumentId, u32>>,
303    book_depths: &Arc<AtomicMap<InstrumentId, u32>>,
304    option_greeks_subs: &Arc<AtomicSet<InstrumentId>>,
305    bar_types_cache: &Arc<AtomicMap<String, BarType>>,
306    quote_cache: &mut AHashMap<InstrumentId, QuoteTick>,
307    funding_cache: &mut AHashMap<Ustr, FundingCacheEntry>,
308    clock: &AtomicTime,
309) {
310    let ts_init = clock.get_time_ns();
311    let resolve = |raw_symbol: &Ustr| -> Option<&InstrumentAny> {
312        let key = product_type.map_or(*raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
313        instruments.get(&key)
314    };
315
316    match message {
317        BybitWsMessage::Orderbook(msg) => {
318            let Some(instrument) = resolve(&msg.data.s) else {
319                log::warn!("Unknown symbol in orderbook update: {}", msg.data.s);
320                return;
321            };
322            let instrument_id = instrument.id();
323
324            // Emit deltas if subscribed to book
325            let has_book_sub = book_depths.contains_key(&instrument_id);
326
327            if has_book_sub {
328                match parse_orderbook_deltas(msg, instrument, ts_init) {
329                    Ok(deltas) => {
330                        send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
331                    }
332                    Err(e) => log::error!("Failed to parse orderbook deltas: {e}"),
333                }
334            }
335
336            // Emit quote from best bid/ask if subscribed
337            let has_quote_sub = quote_depths.contains_key(&instrument_id);
338            let has_ticker_quote_sub = ticker_subs
339                .load()
340                .get(&instrument_id)
341                .is_some_and(|s| s.contains("quotes"));
342
343            if has_quote_sub || has_ticker_quote_sub {
344                let last_quote = quote_cache.get(&instrument_id);
345                match parse_orderbook_quote(msg, instrument, last_quote, ts_init) {
346                    Ok(quote) => {
347                        quote_cache.insert(instrument_id, quote);
348                        send_data(data_sender, Data::Quote(quote));
349                    }
350                    Err(e) => log::error!("Failed to parse orderbook quote: {e}"),
351                }
352            }
353        }
354        BybitWsMessage::Trade(msg) => {
355            for trade in &msg.data {
356                let Some(instrument) = resolve(&trade.s) else {
357                    continue;
358                };
359                let instrument_id = instrument.id();
360                if !trade_subs.contains(&instrument_id) {
361                    continue;
362                }
363
364                match parse_ws_trade_tick(trade, instrument, ts_init) {
365                    Ok(tick) => send_data(data_sender, Data::Trade(tick)),
366                    Err(e) => log::error!("Failed to parse trade tick: {e}"),
367                }
368            }
369        }
370        BybitWsMessage::Kline(msg) => {
371            let Ok((_, raw_symbol)) = parse_kline_topic(msg.topic.as_str()) else {
372                log::warn!("Invalid kline topic: {}", msg.topic);
373                return;
374            };
375            let ustr_symbol = Ustr::from(raw_symbol);
376            let Some(instrument) = resolve(&ustr_symbol) else {
377                log::warn!("Unknown symbol in kline update: {raw_symbol}");
378                return;
379            };
380            let topic_key = msg.topic.as_str();
381            let Some(bar_type) = bar_types_cache.load().get(topic_key).copied() else {
382                log::warn!("No bar type cached for kline topic: {topic_key}");
383                return;
384            };
385
386            for kline in &msg.data {
387                if !kline.confirm {
388                    continue;
389                }
390
391                match parse_ws_kline_bar(kline, instrument, bar_type, true, ts_init) {
392                    Ok(bar) => send_data(data_sender, Data::Bar(bar)),
393                    Err(e) => log::error!("Failed to parse kline bar: {e}"),
394                }
395            }
396        }
397        BybitWsMessage::TickerLinear(msg) => {
398            let Some(instrument) = resolve(&msg.data.symbol) else {
399                log::warn!("Unknown symbol in ticker update: {}", msg.data.symbol);
400                return;
401            };
402            let instrument_id = instrument.id();
403            let subs = ticker_subs.load();
404            let sub_set = subs.get(&instrument_id);
405
406            if sub_set.is_some_and(|s| s.contains("quotes")) && msg.data.bid1_price.is_some() {
407                match parse_ticker_linear_quote(msg, instrument, ts_init) {
408                    Ok(quote) => {
409                        let last = quote_cache.get(&instrument_id);
410                        if last.is_none_or(|q| *q != quote) {
411                            quote_cache.insert(instrument_id, quote);
412                            send_data(data_sender, Data::Quote(quote));
413                        }
414                    }
415                    Err(e) => log::debug!("Skipping partial ticker update: {e}"),
416                }
417            }
418
419            let ts_event = match parse_millis_i64(msg.ts, "ticker.ts") {
420                Ok(ts) => ts,
421                Err(e) => {
422                    log::error!("Failed to parse ticker timestamp: {e}");
423                    return;
424                }
425            };
426
427            if sub_set.is_some_and(|s| s.contains("funding")) {
428                let cache_entry = funding_cache
429                    .entry(msg.data.symbol)
430                    .or_insert((None, None, None));
431                let mut changed = false;
432
433                if let Some(rate) = &msg.data.funding_rate
434                    && cache_entry.0.as_ref() != Some(rate)
435                {
436                    cache_entry.0 = Some(rate.clone());
437                    changed = true;
438                }
439
440                if let Some(next_time) = &msg.data.next_funding_time
441                    && cache_entry.1.as_ref() != Some(next_time)
442                {
443                    cache_entry.1 = Some(next_time.clone());
444                    changed = true;
445                }
446
447                if let Some(interval) = &msg.data.funding_interval_hour {
448                    cache_entry.2 = Some(interval.clone());
449                }
450
451                if changed && cache_entry.0.is_some() {
452                    let mut merged = msg.data.clone();
453
454                    if merged.funding_rate.is_none() {
455                        merged.funding_rate.clone_from(&cache_entry.0);
456                    }
457
458                    if merged.next_funding_time.is_none() {
459                        merged.next_funding_time.clone_from(&cache_entry.1);
460                    }
461
462                    if merged.funding_interval_hour.is_none() {
463                        merged.funding_interval_hour.clone_from(&cache_entry.2);
464                    }
465
466                    match parse_ticker_linear_funding(&merged, instrument_id, ts_event, ts_init) {
467                        Ok(update) => {
468                            if let Err(e) = data_sender.send(DataEvent::FundingRate(update)) {
469                                log::error!("Failed to emit funding rate event: {e}");
470                            }
471                        }
472                        Err(e) => log::error!("Failed to parse ticker linear funding: {e}"),
473                    }
474                }
475            }
476
477            if sub_set.is_some_and(|s| s.contains("mark_prices")) && msg.data.mark_price.is_some() {
478                match parse_ticker_linear_mark_price(&msg.data, instrument, ts_event, ts_init) {
479                    Ok(update) => send_data(data_sender, Data::MarkPriceUpdate(update)),
480                    Err(e) => log::debug!("Skipping mark price update: {e}"),
481                }
482            }
483
484            if sub_set.is_some_and(|s| s.contains("index_prices")) && msg.data.index_price.is_some()
485            {
486                match parse_ticker_linear_index_price(&msg.data, instrument, ts_event, ts_init) {
487                    Ok(update) => send_data(data_sender, Data::IndexPriceUpdate(update)),
488                    Err(e) => log::debug!("Skipping index price update: {e}"),
489                }
490            }
491        }
492        BybitWsMessage::TickerOption(msg) => {
493            let Some(instrument) = resolve(&msg.data.symbol) else {
494                log::warn!(
495                    "Unknown symbol in option ticker update: {}",
496                    msg.data.symbol
497                );
498                return;
499            };
500            let instrument_id = instrument.id();
501            let subs = ticker_subs.load();
502            let sub_set = subs.get(&instrument_id);
503
504            if sub_set.is_some_and(|s| s.contains("quotes")) {
505                match parse_ticker_option_quote(msg, instrument, ts_init) {
506                    Ok(quote) => {
507                        let last = quote_cache.get(&instrument_id);
508                        if last.is_none_or(|q| *q != quote) {
509                            quote_cache.insert(instrument_id, quote);
510                            send_data(data_sender, Data::Quote(quote));
511                        }
512                    }
513                    Err(e) => log::error!("Failed to parse ticker option quote: {e}"),
514                }
515            }
516
517            if sub_set.is_some_and(|s| s.contains("mark_prices")) {
518                match parse_ticker_option_mark_price(msg, instrument, ts_init) {
519                    Ok(update) => send_data(data_sender, Data::MarkPriceUpdate(update)),
520                    Err(e) => log::error!("Failed to parse ticker option mark price: {e}"),
521                }
522            }
523
524            if sub_set.is_some_and(|s| s.contains("index_prices")) {
525                match parse_ticker_option_index_price(msg, instrument, ts_init) {
526                    Ok(update) => send_data(data_sender, Data::IndexPriceUpdate(update)),
527                    Err(e) => log::error!("Failed to parse ticker option index price: {e}"),
528                }
529            }
530
531            if option_greeks_subs.contains(&instrument_id) {
532                match parse_ticker_option_greeks(msg, instrument, ts_init) {
533                    Ok(greeks) => {
534                        if let Err(e) = data_sender.send(DataEvent::OptionGreeks(greeks)) {
535                            log::error!("Failed to send option greeks: {e}");
536                        }
537                    }
538                    Err(e) => log::error!("Failed to parse option greeks: {e}"),
539                }
540            }
541        }
542        BybitWsMessage::Reconnected => {
543            quote_cache.clear();
544            funding_cache.clear();
545            log::info!("WebSocket reconnected, cleared caches");
546        }
547        BybitWsMessage::Error(e) => {
548            log::error!(
549                "Bybit WebSocket error: code={} message={}",
550                e.code,
551                e.message
552            );
553        }
554        BybitWsMessage::Auth(_)
555        | BybitWsMessage::OrderResponse(_)
556        | BybitWsMessage::AccountOrder(_)
557        | BybitWsMessage::AccountExecution(_)
558        | BybitWsMessage::AccountWallet(_)
559        | BybitWsMessage::AccountPosition(_) => {}
560    }
561}
562
563fn upsert_instrument(
564    cache: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
565    instrument: InstrumentAny,
566) {
567    cache.insert(instrument.id(), instrument);
568}
569
570#[async_trait::async_trait(?Send)]
571impl DataClient for BybitDataClient {
572    fn client_id(&self) -> ClientId {
573        self.client_id
574    }
575
576    fn venue(&self) -> Option<Venue> {
577        Some(self.venue())
578    }
579
580    fn start(&mut self) -> anyhow::Result<()> {
581        log::info!(
582            "Started: client_id={}, product_types={:?}, environment={:?}, proxy_url={:?}",
583            self.client_id,
584            self.config.product_types,
585            self.config.environment,
586            self.config.proxy_url,
587        );
588        Ok(())
589    }
590
591    fn stop(&mut self) -> anyhow::Result<()> {
592        log::info!("Stopping {id}", id = self.client_id);
593        self.cancellation_token.cancel();
594        self.is_connected.store(false, Ordering::Relaxed);
595        Ok(())
596    }
597
598    fn reset(&mut self) -> anyhow::Result<()> {
599        log::debug!("Resetting {id}", id = self.client_id);
600        self.is_connected.store(false, Ordering::Relaxed);
601        self.cancellation_token = CancellationToken::new();
602        self.tasks.clear();
603        self.book_depths.store(AHashMap::new());
604        self.quote_depths.store(AHashMap::new());
605        self.ticker_subs.store(AHashMap::new());
606        self.option_greeks_subs.store(AHashSet::new());
607        self.instrument_status_subs.store(AHashSet::new());
608        self.status_cache.store(AHashMap::new());
609        Ok(())
610    }
611
612    fn dispose(&mut self) -> anyhow::Result<()> {
613        log::debug!("Disposing {id}", id = self.client_id);
614        self.stop()
615    }
616
617    async fn connect(&mut self) -> anyhow::Result<()> {
618        if self.is_connected() {
619            return Ok(());
620        }
621
622        let product_types = if self.config.product_types.is_empty() {
623            vec![BybitProductType::Linear]
624        } else {
625            self.config.product_types.clone()
626        };
627
628        let mut all_instruments = Vec::new();
629
630        for product_type in &product_types {
631            let fetched = self
632                .http_client
633                .request_instruments(*product_type, None, None)
634                .await
635                .with_context(|| {
636                    format!("failed to request Bybit instruments for {product_type:?}")
637                })?;
638
639            self.http_client.cache_instruments(&fetched);
640
641            self.instruments.rcu(|m| {
642                for instrument in &fetched {
643                    m.insert(instrument.id(), instrument.clone());
644                }
645            });
646
647            all_instruments.extend(fetched);
648        }
649
650        // Seed instrument status cache from initial fetch
651        if self
652            .config
653            .instrument_status_poll_secs
654            .is_some_and(|s| s > 0)
655        {
656            // Collect all statuses first (without holding the lock across await)
657            let mut collected_statuses = Vec::new();
658
659            for product_type in &product_types {
660                match self
661                    .http_client
662                    .request_instrument_statuses(*product_type)
663                    .await
664                {
665                    Ok(statuses) => collected_statuses.push(statuses),
666                    Err(e) => {
667                        log::warn!(
668                            "Failed to seed instrument status cache for {product_type:?}: {e}"
669                        );
670                    }
671                }
672            }
673
674            let inst_guard = self.instruments.load();
675            let mut status_map = AHashMap::new();
676
677            for statuses in collected_statuses {
678                for (id, action) in statuses {
679                    if inst_guard.contains_key(&id) {
680                        status_map.insert(id, action);
681                    }
682                }
683            }
684            log::info!(
685                "Seeded instrument status cache with {} entries",
686                status_map.len()
687            );
688            self.status_cache.store(status_map);
689        }
690
691        for instrument in all_instruments {
692            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
693                log::warn!("Failed to send instrument: {e}");
694            }
695        }
696
697        // Build instruments map keyed by full Nautilus symbol for parsing
698        let instruments_by_symbol: Arc<AHashMap<Ustr, InstrumentAny>> = {
699            let guard = self.instruments.load();
700            let mut map = AHashMap::new();
701            for instrument in guard.values() {
702                map.insert(instrument.id().symbol.inner(), instrument.clone());
703            }
704            Arc::new(map)
705        };
706
707        for ws_client in &mut self.ws_clients {
708            ws_client
709                .connect()
710                .await
711                .context("failed to connect Bybit WebSocket")?;
712            ws_client
713                .wait_until_active(10.0)
714                .await
715                .context("WebSocket did not become active")?;
716
717            let stream = ws_client.stream();
718            let product_type = ws_client.product_type();
719            let sender = self.data_sender.clone();
720            let trade_subs = self.trade_subs.clone();
721            let ticker_subs = self.ticker_subs.clone();
722            let quote_depths = self.quote_depths.clone();
723            let book_depths = self.book_depths.clone();
724            let option_greeks_subs = self.option_greeks_subs.clone();
725            let bar_types_cache = ws_client.bar_types_cache().clone();
726            let instruments = Arc::clone(&instruments_by_symbol);
727            let clock = self.clock;
728            let cancel = self.cancellation_token.clone();
729
730            let handle = get_runtime().spawn(async move {
731                let mut quote_cache: AHashMap<InstrumentId, QuoteTick> = AHashMap::new();
732                let mut funding_cache: AHashMap<Ustr, FundingCacheEntry> = AHashMap::new();
733
734                pin_mut!(stream);
735
736                loop {
737                    tokio::select! {
738                        Some(message) = stream.next() => {
739                            handle_ws_message(
740                                &message,
741                                &sender,
742                                &instruments,
743                                product_type,
744                                &trade_subs,
745                                &ticker_subs,
746                                &quote_depths,
747                                &book_depths,
748                                &option_greeks_subs,
749                                &bar_types_cache,
750                                &mut quote_cache,
751                                &mut funding_cache,
752                                clock,
753                            );
754                        }
755                        () = cancel.cancelled() => {
756                            log::debug!("WebSocket stream task cancelled");
757                            break;
758                        }
759                    }
760                }
761            });
762            self.tasks.push(handle);
763        }
764
765        // Spawn instrument status polling task
766        if let Some(poll_secs) = self.config.instrument_status_poll_secs
767            && poll_secs > 0
768        {
769            self.spawn_instrument_status_polling(&product_types, poll_secs);
770        }
771
772        self.is_connected.store(true, Ordering::Release);
773        log::info!("Connected: client_id={}", self.client_id);
774        Ok(())
775    }
776
777    async fn disconnect(&mut self) -> anyhow::Result<()> {
778        if self.is_disconnected() {
779            return Ok(());
780        }
781
782        self.cancellation_token.cancel();
783
784        // Reinitialize token so reconnect can spawn new stream tasks
785        self.cancellation_token = CancellationToken::new();
786
787        for ws_client in &mut self.ws_clients {
788            if let Err(e) = ws_client.close().await {
789                log::warn!("Error closing WebSocket: {e:?}");
790            }
791        }
792
793        // Allow time for unsubscribe confirmations
794        tokio::time::sleep(Duration::from_millis(500)).await;
795
796        let handles: Vec<_> = self.tasks.drain(..).collect();
797        for handle in handles {
798            if let Err(e) = handle.await {
799                log::error!("Error joining WebSocket task: {e}");
800            }
801        }
802
803        self.book_depths.store(AHashMap::new());
804        self.quote_depths.store(AHashMap::new());
805        self.ticker_subs.store(AHashMap::new());
806        self.trade_subs.store(AHashSet::new());
807        self.option_greeks_subs.store(AHashSet::new());
808        self.instrument_status_subs.store(AHashSet::new());
809        self.status_cache.store(AHashMap::new());
810        self.is_connected.store(false, Ordering::Release);
811        log::info!("Disconnected: client_id={}", self.client_id);
812        Ok(())
813    }
814
815    fn is_connected(&self) -> bool {
816        self.is_connected.load(Ordering::Relaxed)
817    }
818
819    fn is_disconnected(&self) -> bool {
820        !self.is_connected()
821    }
822
823    fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
824        if cmd.book_type != BookType::L2_MBP {
825            anyhow::bail!("Bybit only supports L2_MBP order book deltas");
826        }
827
828        let depth = cmd
829            .depth
830            .map_or(BYBIT_DEFAULT_ORDERBOOK_DEPTH, |d| d.get() as u32);
831
832        if !matches!(depth, 1 | 50 | 200 | 500) {
833            anyhow::bail!("invalid depth {depth}; valid values are 1, 50, 200, or 500");
834        }
835
836        let instrument_id = cmd.instrument_id;
837        let product_type = self
838            .get_product_type_for_instrument(instrument_id)
839            .unwrap_or(BybitProductType::Linear);
840
841        let ws = self
842            .get_ws_client_for_product(product_type)
843            .context("no WebSocket client for product type")?
844            .clone();
845
846        let book_depths = Arc::clone(&self.book_depths);
847
848        self.spawn_ws(
849            async move {
850                ws.subscribe_orderbook(instrument_id, depth)
851                    .await
852                    .context("orderbook subscription")?;
853                book_depths.insert(instrument_id, depth);
854                Ok(())
855            },
856            "order book delta subscription",
857        );
858
859        Ok(())
860    }
861
862    fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
863        let instrument_id = cmd.instrument_id;
864        let product_type = self
865            .get_product_type_for_instrument(instrument_id)
866            .unwrap_or(BybitProductType::Linear);
867
868        let ws = self
869            .get_ws_client_for_product(product_type)
870            .context("no WebSocket client for product type")?
871            .clone();
872
873        // SPOT ticker channel doesn't include bid/ask, use orderbook depth=1
874        if product_type == BybitProductType::Spot {
875            let depth = 1;
876            self.quote_depths.insert(instrument_id, depth);
877
878            self.spawn_ws(
879                async move {
880                    ws.subscribe_orderbook(instrument_id, depth)
881                        .await
882                        .context("orderbook subscription for quotes")
883                },
884                "quote subscription (spot orderbook)",
885            );
886        } else {
887            let mut should_subscribe = false;
888            self.ticker_subs.rcu(|m| {
889                let entry = m.entry(instrument_id).or_default();
890                should_subscribe = entry.is_empty();
891                entry.insert("quotes");
892            });
893
894            if should_subscribe {
895                self.spawn_ws(
896                    async move {
897                        ws.subscribe_ticker(instrument_id)
898                            .await
899                            .context("ticker subscription")
900                    },
901                    "quote subscription",
902                );
903            }
904        }
905        Ok(())
906    }
907
908    fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
909        let instrument_id = cmd.instrument_id;
910        let product_type = self
911            .get_product_type_for_instrument(instrument_id)
912            .unwrap_or(BybitProductType::Linear);
913
914        self.trade_subs.insert(instrument_id);
915
916        let ws = self
917            .get_ws_client_for_product(product_type)
918            .context("no WebSocket client for product type")?
919            .clone();
920
921        self.spawn_ws(
922            async move {
923                ws.subscribe_trades(instrument_id)
924                    .await
925                    .context("trades subscription")
926            },
927            "trade subscription",
928        );
929        Ok(())
930    }
931
932    fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
933        let instrument_id = cmd.instrument_id;
934        let product_type = self
935            .get_product_type_for_instrument(instrument_id)
936            .unwrap_or(BybitProductType::Linear);
937
938        if product_type == BybitProductType::Spot || product_type == BybitProductType::Option {
939            anyhow::bail!("Funding rates not available for {product_type:?} instruments");
940        }
941
942        let mut should_subscribe = false;
943        self.ticker_subs.rcu(|m| {
944            let entry = m.entry(instrument_id).or_default();
945            should_subscribe = entry.is_empty();
946            entry.insert("funding");
947        });
948
949        if should_subscribe {
950            let ws = self
951                .get_ws_client_for_product(product_type)
952                .context("no WebSocket client for product type")?
953                .clone();
954
955            self.spawn_ws(
956                async move {
957                    ws.subscribe_ticker(instrument_id)
958                        .await
959                        .context("ticker subscription for funding rates")
960                },
961                "funding rate subscription",
962            );
963        }
964        Ok(())
965    }
966
967    fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
968        let instrument_id = cmd.instrument_id;
969        let product_type = self
970            .get_product_type_for_instrument(instrument_id)
971            .unwrap_or(BybitProductType::Linear);
972
973        if product_type == BybitProductType::Spot {
974            anyhow::bail!("Mark prices not available for Spot instruments");
975        }
976
977        let mut should_subscribe = false;
978        self.ticker_subs.rcu(|m| {
979            let entry = m.entry(instrument_id).or_default();
980            should_subscribe = entry.is_empty();
981            entry.insert("mark_prices");
982        });
983
984        if should_subscribe {
985            let ws = self
986                .get_ws_client_for_product(product_type)
987                .context("no WebSocket client for product type")?
988                .clone();
989
990            self.spawn_ws(
991                async move {
992                    ws.subscribe_ticker(instrument_id)
993                        .await
994                        .context("ticker subscription for mark prices")
995                },
996                "mark price subscription",
997            );
998        }
999        Ok(())
1000    }
1001
1002    fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
1003        let instrument_id = cmd.instrument_id;
1004        let product_type = self
1005            .get_product_type_for_instrument(instrument_id)
1006            .unwrap_or(BybitProductType::Linear);
1007
1008        if product_type == BybitProductType::Spot {
1009            anyhow::bail!("Index prices not available for Spot instruments");
1010        }
1011
1012        let mut should_subscribe = false;
1013        self.ticker_subs.rcu(|m| {
1014            let entry = m.entry(instrument_id).or_default();
1015            should_subscribe = entry.is_empty();
1016            entry.insert("index_prices");
1017        });
1018
1019        if should_subscribe {
1020            let ws = self
1021                .get_ws_client_for_product(product_type)
1022                .context("no WebSocket client for product type")?
1023                .clone();
1024
1025            self.spawn_ws(
1026                async move {
1027                    ws.subscribe_ticker(instrument_id)
1028                        .await
1029                        .context("ticker subscription for index prices")
1030                },
1031                "index price subscription",
1032            );
1033        }
1034        Ok(())
1035    }
1036
1037    fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
1038        let bar_type = cmd.bar_type;
1039        let instrument_id = bar_type.instrument_id();
1040        let product_type = self
1041            .get_product_type_for_instrument(instrument_id)
1042            .unwrap_or(BybitProductType::Linear);
1043
1044        if product_type == BybitProductType::Option {
1045            anyhow::bail!("Bybit does not support kline/bar data for options");
1046        }
1047
1048        let ws = self
1049            .get_ws_client_for_product(product_type)
1050            .context("no WebSocket client for product type")?
1051            .clone();
1052
1053        self.spawn_ws(
1054            async move {
1055                ws.subscribe_bars(bar_type)
1056                    .await
1057                    .context("bars subscription")
1058            },
1059            "bar subscription",
1060        );
1061        Ok(())
1062    }
1063
1064    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1065        let instrument_id = cmd.instrument_id;
1066        let depth = self
1067            .book_depths
1068            .load()
1069            .get(&instrument_id)
1070            .copied()
1071            .unwrap_or(BYBIT_DEFAULT_ORDERBOOK_DEPTH);
1072        self.book_depths.remove(&instrument_id);
1073
1074        let product_type = self
1075            .get_product_type_for_instrument(instrument_id)
1076            .unwrap_or(BybitProductType::Linear);
1077
1078        // Check if spot quote subscription is using the same depth
1079        let quote_using_same_depth = self
1080            .quote_depths
1081            .load()
1082            .get(&instrument_id)
1083            .is_some_and(|&d| d == depth);
1084
1085        if quote_using_same_depth {
1086            return Ok(());
1087        }
1088
1089        let ws = self
1090            .get_ws_client_for_product(product_type)
1091            .context("no WebSocket client for product type")?
1092            .clone();
1093
1094        self.spawn_ws(
1095            async move {
1096                ws.unsubscribe_orderbook(instrument_id, depth)
1097                    .await
1098                    .context("orderbook unsubscribe")
1099            },
1100            "order book unsubscribe",
1101        );
1102        Ok(())
1103    }
1104
1105    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
1106        let instrument_id = cmd.instrument_id;
1107        let product_type = self
1108            .get_product_type_for_instrument(instrument_id)
1109            .unwrap_or(BybitProductType::Linear);
1110
1111        let ws = self
1112            .get_ws_client_for_product(product_type)
1113            .context("no WebSocket client for product type")?
1114            .clone();
1115
1116        if product_type == BybitProductType::Spot {
1117            let depth = self
1118                .quote_depths
1119                .load()
1120                .get(&instrument_id)
1121                .copied()
1122                .unwrap_or(1);
1123            self.quote_depths.remove(&instrument_id);
1124
1125            // Check if book deltas subscription is using the same depth
1126            let book_using_same_depth = self
1127                .book_depths
1128                .load()
1129                .get(&instrument_id)
1130                .is_some_and(|&d| d == depth);
1131
1132            if !book_using_same_depth {
1133                self.spawn_ws(
1134                    async move {
1135                        ws.unsubscribe_orderbook(instrument_id, depth)
1136                            .await
1137                            .context("orderbook unsubscribe for quotes")
1138                    },
1139                    "quote unsubscribe (spot orderbook)",
1140                );
1141            }
1142        } else {
1143            let mut should_unsubscribe = false;
1144            self.ticker_subs.rcu(|m| {
1145                if let Some(entry) = m.get_mut(&instrument_id) {
1146                    entry.remove("quotes");
1147                    if entry.is_empty() {
1148                        m.remove(&instrument_id);
1149                        should_unsubscribe = true;
1150                    } else {
1151                        should_unsubscribe = false;
1152                    }
1153                } else {
1154                    should_unsubscribe = false;
1155                }
1156            });
1157
1158            if should_unsubscribe {
1159                self.spawn_ws(
1160                    async move {
1161                        ws.unsubscribe_ticker(instrument_id)
1162                            .await
1163                            .context("ticker unsubscribe")
1164                    },
1165                    "quote unsubscribe",
1166                );
1167            }
1168        }
1169        Ok(())
1170    }
1171
1172    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
1173        let instrument_id = cmd.instrument_id;
1174        let product_type = self
1175            .get_product_type_for_instrument(instrument_id)
1176            .unwrap_or(BybitProductType::Linear);
1177
1178        self.trade_subs.remove(&instrument_id);
1179
1180        let ws = self
1181            .get_ws_client_for_product(product_type)
1182            .context("no WebSocket client for product type")?
1183            .clone();
1184
1185        self.spawn_ws(
1186            async move {
1187                ws.unsubscribe_trades(instrument_id)
1188                    .await
1189                    .context("trades unsubscribe")
1190            },
1191            "trade unsubscribe",
1192        );
1193        Ok(())
1194    }
1195
1196    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1197        let instrument_id = cmd.instrument_id;
1198        let product_type = self
1199            .get_product_type_for_instrument(instrument_id)
1200            .unwrap_or(BybitProductType::Linear);
1201
1202        let mut should_unsubscribe = false;
1203        self.ticker_subs.rcu(|m| {
1204            if let Some(entry) = m.get_mut(&instrument_id) {
1205                entry.remove("funding");
1206                if entry.is_empty() {
1207                    m.remove(&instrument_id);
1208                    should_unsubscribe = true;
1209                } else {
1210                    should_unsubscribe = false;
1211                }
1212            } else {
1213                should_unsubscribe = false;
1214            }
1215        });
1216
1217        if should_unsubscribe {
1218            let ws = self
1219                .get_ws_client_for_product(product_type)
1220                .context("no WebSocket client for product type")?
1221                .clone();
1222
1223            self.spawn_ws(
1224                async move {
1225                    ws.unsubscribe_ticker(instrument_id)
1226                        .await
1227                        .context("ticker unsubscribe for funding rates")
1228                },
1229                "funding rate unsubscribe",
1230            );
1231        }
1232        Ok(())
1233    }
1234
1235    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1236        let instrument_id = cmd.instrument_id;
1237        let product_type = self
1238            .get_product_type_for_instrument(instrument_id)
1239            .unwrap_or(BybitProductType::Linear);
1240
1241        let mut should_unsubscribe = false;
1242        self.ticker_subs.rcu(|m| {
1243            if let Some(entry) = m.get_mut(&instrument_id) {
1244                entry.remove("mark_prices");
1245                if entry.is_empty() {
1246                    m.remove(&instrument_id);
1247                    should_unsubscribe = true;
1248                } else {
1249                    should_unsubscribe = false;
1250                }
1251            } else {
1252                should_unsubscribe = false;
1253            }
1254        });
1255
1256        if should_unsubscribe {
1257            let ws = self
1258                .get_ws_client_for_product(product_type)
1259                .context("no WebSocket client for product type")?
1260                .clone();
1261
1262            self.spawn_ws(
1263                async move {
1264                    ws.unsubscribe_ticker(instrument_id)
1265                        .await
1266                        .context("ticker unsubscribe for mark prices")
1267                },
1268                "mark price unsubscribe",
1269            );
1270        }
1271        Ok(())
1272    }
1273
1274    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1275        let instrument_id = cmd.instrument_id;
1276        let product_type = self
1277            .get_product_type_for_instrument(instrument_id)
1278            .unwrap_or(BybitProductType::Linear);
1279
1280        let mut should_unsubscribe = false;
1281        self.ticker_subs.rcu(|m| {
1282            if let Some(entry) = m.get_mut(&instrument_id) {
1283                entry.remove("index_prices");
1284                if entry.is_empty() {
1285                    m.remove(&instrument_id);
1286                    should_unsubscribe = true;
1287                } else {
1288                    should_unsubscribe = false;
1289                }
1290            } else {
1291                should_unsubscribe = false;
1292            }
1293        });
1294
1295        if should_unsubscribe {
1296            let ws = self
1297                .get_ws_client_for_product(product_type)
1298                .context("no WebSocket client for product type")?
1299                .clone();
1300
1301            self.spawn_ws(
1302                async move {
1303                    ws.unsubscribe_ticker(instrument_id)
1304                        .await
1305                        .context("ticker unsubscribe for index prices")
1306                },
1307                "index price unsubscribe",
1308            );
1309        }
1310        Ok(())
1311    }
1312
1313    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1314        let bar_type = cmd.bar_type;
1315        let instrument_id = bar_type.instrument_id();
1316        let product_type = self
1317            .get_product_type_for_instrument(instrument_id)
1318            .unwrap_or(BybitProductType::Linear);
1319
1320        let ws = self
1321            .get_ws_client_for_product(product_type)
1322            .context("no WebSocket client for product type")?
1323            .clone();
1324
1325        self.spawn_ws(
1326            async move {
1327                ws.unsubscribe_bars(bar_type)
1328                    .await
1329                    .context("bars unsubscribe")
1330            },
1331            "bar unsubscribe",
1332        );
1333        Ok(())
1334    }
1335
1336    fn subscribe_option_greeks(&mut self, cmd: SubscribeOptionGreeks) -> anyhow::Result<()> {
1337        let instrument_id = cmd.instrument_id;
1338        self.option_greeks_subs.insert(instrument_id);
1339
1340        let mut should_subscribe = false;
1341        self.ticker_subs.rcu(|m| {
1342            let entry = m.entry(instrument_id).or_default();
1343            should_subscribe = entry.is_empty();
1344            entry.insert("option_greeks");
1345        });
1346
1347        if should_subscribe {
1348            let product_type = self
1349                .get_product_type_for_instrument(instrument_id)
1350                .unwrap_or(BybitProductType::Option);
1351
1352            let ws = self
1353                .get_ws_client_for_product(product_type)
1354                .context("no WebSocket client for product type")?
1355                .clone();
1356
1357            self.spawn_ws(
1358                async move {
1359                    ws.subscribe_ticker(instrument_id)
1360                        .await
1361                        .context("ticker subscription for option greeks")
1362                },
1363                "option greeks subscription",
1364            );
1365        }
1366        Ok(())
1367    }
1368
1369    fn unsubscribe_option_greeks(&mut self, cmd: &UnsubscribeOptionGreeks) -> anyhow::Result<()> {
1370        let instrument_id = cmd.instrument_id;
1371        self.option_greeks_subs.remove(&instrument_id);
1372
1373        let mut should_unsubscribe = false;
1374        self.ticker_subs.rcu(|m| {
1375            if let Some(entry) = m.get_mut(&instrument_id) {
1376                entry.remove("option_greeks");
1377                if entry.is_empty() {
1378                    m.remove(&instrument_id);
1379                    should_unsubscribe = true;
1380                } else {
1381                    should_unsubscribe = false;
1382                }
1383            } else {
1384                should_unsubscribe = false;
1385            }
1386        });
1387
1388        if should_unsubscribe {
1389            let product_type = self
1390                .get_product_type_for_instrument(instrument_id)
1391                .unwrap_or(BybitProductType::Option);
1392
1393            let ws = self
1394                .get_ws_client_for_product(product_type)
1395                .context("no WebSocket client for product type")?
1396                .clone();
1397
1398            self.spawn_ws(
1399                async move {
1400                    ws.unsubscribe_ticker(instrument_id)
1401                        .await
1402                        .context("ticker unsubscribe for option greeks")
1403                },
1404                "option greeks unsubscribe",
1405            );
1406        }
1407        Ok(())
1408    }
1409
1410    fn subscribe_instrument_status(
1411        &mut self,
1412        cmd: SubscribeInstrumentStatus,
1413    ) -> anyhow::Result<()> {
1414        log::debug!(
1415            "subscribe_instrument_status: {id} (status changes detected via periodic instrument info polling)",
1416            id = cmd.instrument_id,
1417        );
1418        self.instrument_status_subs.insert(cmd.instrument_id);
1419        Ok(())
1420    }
1421
1422    fn unsubscribe_instrument_status(
1423        &mut self,
1424        cmd: &UnsubscribeInstrumentStatus,
1425    ) -> anyhow::Result<()> {
1426        log::debug!(
1427            "unsubscribe_instrument_status: {id}",
1428            id = cmd.instrument_id,
1429        );
1430        self.instrument_status_subs.remove(&cmd.instrument_id);
1431        Ok(())
1432    }
1433
1434    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
1435        let http = self.http_client.clone();
1436        let sender = self.data_sender.clone();
1437        let instruments_cache = self.instruments.clone();
1438        let request_id = request.request_id;
1439        let client_id = request.client_id.unwrap_or(self.client_id);
1440        let venue = self.venue();
1441        let start = request.start;
1442        let end = request.end;
1443        let params = request.params;
1444        let clock = self.clock;
1445        let start_nanos = datetime_to_unix_nanos(start);
1446        let end_nanos = datetime_to_unix_nanos(end);
1447        let product_types = if self.config.product_types.is_empty() {
1448            vec![BybitProductType::Linear]
1449        } else {
1450            self.config.product_types.clone()
1451        };
1452
1453        get_runtime().spawn(async move {
1454            let mut all_instruments = Vec::new();
1455
1456            for product_type in product_types {
1457                match http.request_instruments(product_type, None, None).await {
1458                    Ok(instruments) => {
1459                        for instrument in instruments {
1460                            upsert_instrument(&instruments_cache, instrument.clone());
1461                            all_instruments.push(instrument);
1462                        }
1463                    }
1464                    Err(e) => {
1465                        log::error!("Failed to fetch instruments for {product_type:?}: {e:?}");
1466                    }
1467                }
1468            }
1469
1470            let response = DataResponse::Instruments(InstrumentsResponse::new(
1471                request_id,
1472                client_id,
1473                venue,
1474                all_instruments,
1475                start_nanos,
1476                end_nanos,
1477                clock.get_time_ns(),
1478                params,
1479            ));
1480
1481            if let Err(e) = sender.send(DataEvent::Response(response)) {
1482                log::error!("Failed to send instruments response: {e}");
1483            }
1484        });
1485
1486        Ok(())
1487    }
1488
1489    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
1490        let http = self.http_client.clone();
1491        let sender = self.data_sender.clone();
1492        let instruments = self.instruments.clone();
1493        let instrument_id = request.instrument_id;
1494        let request_id = request.request_id;
1495        let client_id = request.client_id.unwrap_or(self.client_id);
1496        let start = request.start;
1497        let end = request.end;
1498        let params = request.params;
1499        let clock = self.clock;
1500        let start_nanos = datetime_to_unix_nanos(start);
1501        let end_nanos = datetime_to_unix_nanos(end);
1502
1503        let product_type = BybitProductType::from_suffix(instrument_id.symbol.as_str())
1504            .unwrap_or(BybitProductType::Linear);
1505        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str()).to_string();
1506
1507        get_runtime().spawn(async move {
1508            match http
1509                .request_instruments(product_type, Some(raw_symbol), None)
1510                .await
1511                .context("fetch instrument from API")
1512            {
1513                Ok(fetched) => {
1514                    if let Some(instrument) = fetched.into_iter().find(|i| i.id() == instrument_id)
1515                    {
1516                        upsert_instrument(&instruments, instrument.clone());
1517
1518                        let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1519                            request_id,
1520                            client_id,
1521                            instrument.id(),
1522                            instrument,
1523                            start_nanos,
1524                            end_nanos,
1525                            clock.get_time_ns(),
1526                            params,
1527                        )));
1528
1529                        if let Err(e) = sender.send(DataEvent::Response(response)) {
1530                            log::error!("Failed to send instrument response: {e}");
1531                        }
1532                    } else {
1533                        log::error!("Instrument not found: {instrument_id}");
1534                    }
1535                }
1536                Err(e) => log::error!("Instrument request failed: {e:?}"),
1537            }
1538        });
1539
1540        Ok(())
1541    }
1542
1543    fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
1544        let http = self.http_client.clone();
1545        let sender = self.data_sender.clone();
1546        let instrument_id = request.instrument_id;
1547        let depth = request.depth.map(|n| n.get() as u32);
1548        let request_id = request.request_id;
1549        let client_id = request.client_id.unwrap_or(self.client_id);
1550        let params = request.params;
1551        let clock = self.clock;
1552
1553        let product_type = BybitProductType::from_suffix(instrument_id.symbol.as_str())
1554            .unwrap_or(BybitProductType::Linear);
1555
1556        get_runtime().spawn(async move {
1557            match http
1558                .request_orderbook_snapshot(product_type, instrument_id, depth)
1559                .await
1560                .context("failed to request book snapshot from Bybit")
1561            {
1562                Ok(deltas) => {
1563                    let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
1564                    if let Err(e) = book.apply_deltas(&deltas) {
1565                        log::error!("Failed to apply book deltas for {instrument_id}: {e}");
1566                        return;
1567                    }
1568
1569                    let response = DataResponse::Book(BookResponse::new(
1570                        request_id,
1571                        client_id,
1572                        instrument_id,
1573                        book,
1574                        None,
1575                        None,
1576                        clock.get_time_ns(),
1577                        params,
1578                    ));
1579
1580                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1581                        log::error!("Failed to send book snapshot response: {e}");
1582                    }
1583                }
1584                Err(e) => log::error!("Book snapshot request failed for {instrument_id}: {e:?}"),
1585            }
1586        });
1587
1588        Ok(())
1589    }
1590
1591    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1592        let http = self.http_client.clone();
1593        let sender = self.data_sender.clone();
1594        let instrument_id = request.instrument_id;
1595        let start = request.start;
1596        let end = request.end;
1597        let limit = request.limit.map(|n| n.get() as u32);
1598        let request_id = request.request_id;
1599        let client_id = request.client_id.unwrap_or(self.client_id);
1600        let params = request.params;
1601        let clock = self.clock;
1602        let start_nanos = datetime_to_unix_nanos(start);
1603        let end_nanos = datetime_to_unix_nanos(end);
1604
1605        let product_type = BybitProductType::from_suffix(instrument_id.symbol.as_str())
1606            .unwrap_or(BybitProductType::Linear);
1607
1608        get_runtime().spawn(async move {
1609            match http
1610                .request_trades(product_type, instrument_id, limit)
1611                .await
1612                .context("failed to request trades from Bybit")
1613            {
1614                Ok(trades) => {
1615                    let response = DataResponse::Trades(TradesResponse::new(
1616                        request_id,
1617                        client_id,
1618                        instrument_id,
1619                        trades,
1620                        start_nanos,
1621                        end_nanos,
1622                        clock.get_time_ns(),
1623                        params,
1624                    ));
1625
1626                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1627                        log::error!("Failed to send trades response: {e}");
1628                    }
1629                }
1630                Err(e) => log::error!("Trade request failed: {e:?}"),
1631            }
1632        });
1633
1634        Ok(())
1635    }
1636
1637    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
1638        let http = self.http_client.clone();
1639        let sender = self.data_sender.clone();
1640        let bar_type = request.bar_type;
1641        let start = request.start;
1642        let end = request.end;
1643        let limit = request.limit.map(|n| n.get() as u32);
1644        let request_id = request.request_id;
1645        let client_id = request.client_id.unwrap_or(self.client_id);
1646        let params = request.params;
1647        let clock = self.clock;
1648        let start_nanos = datetime_to_unix_nanos(start);
1649        let end_nanos = datetime_to_unix_nanos(end);
1650
1651        let instrument_id = bar_type.instrument_id();
1652        let product_type = BybitProductType::from_suffix(instrument_id.symbol.as_str())
1653            .unwrap_or(BybitProductType::Linear);
1654
1655        get_runtime().spawn(async move {
1656            match http
1657                .request_bars(product_type, bar_type, start, end, limit, true)
1658                .await
1659                .context("failed to request bars from Bybit")
1660            {
1661                Ok(bars) => {
1662                    let response = DataResponse::Bars(BarsResponse::new(
1663                        request_id,
1664                        client_id,
1665                        bar_type,
1666                        bars,
1667                        start_nanos,
1668                        end_nanos,
1669                        clock.get_time_ns(),
1670                        params,
1671                    ));
1672
1673                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1674                        log::error!("Failed to send bars response: {e}");
1675                    }
1676                }
1677                Err(e) => log::error!("Bar request failed: {e:?}"),
1678            }
1679        });
1680
1681        Ok(())
1682    }
1683
1684    fn request_funding_rates(&self, request: RequestFundingRates) -> anyhow::Result<()> {
1685        let http = self.http_client.clone();
1686        let sender = self.data_sender.clone();
1687        let instrument_id = request.instrument_id;
1688        let start = request.start;
1689        let end = request.end;
1690        let limit = request.limit.map(|n| n.get() as u32);
1691        let request_id = request.request_id;
1692        let client_id = request.client_id.unwrap_or(self.client_id);
1693        let params = request.params;
1694        let clock = self.clock;
1695        let start_nanos = datetime_to_unix_nanos(start);
1696        let end_nanos = datetime_to_unix_nanos(end);
1697
1698        let product_type = BybitProductType::from_suffix(instrument_id.symbol.as_str())
1699            .unwrap_or(BybitProductType::Linear);
1700
1701        if product_type == BybitProductType::Spot || product_type == BybitProductType::Option {
1702            anyhow::bail!("Funding rates not available for {product_type} instruments");
1703        }
1704
1705        get_runtime().spawn(async move {
1706            match http
1707                .request_funding_rates(product_type, instrument_id, start, end, limit)
1708                .await
1709                .context("failed to request funding rates from Bybit")
1710            {
1711                Ok(funding_rates) => {
1712                    let response = DataResponse::FundingRates(FundingRatesResponse::new(
1713                        request_id,
1714                        client_id,
1715                        instrument_id,
1716                        funding_rates,
1717                        start_nanos,
1718                        end_nanos,
1719                        clock.get_time_ns(),
1720                        params,
1721                    ));
1722
1723                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1724                        log::error!("Failed to send funding rates response: {e}");
1725                    }
1726                }
1727                Err(e) => log::error!("Funding rates request failed for {instrument_id}: {e:?}"),
1728            }
1729        });
1730
1731        Ok(())
1732    }
1733
1734    fn request_forward_prices(&self, request: RequestForwardPrices) -> anyhow::Result<()> {
1735        let underlying = request.underlying.to_string();
1736        let instrument_id = request.instrument_id;
1737        let http_client = self.http_client.clone();
1738        let sender = self.data_sender.clone();
1739        let request_id = request.request_id;
1740        let client_id = self.client_id();
1741        let params = request.params;
1742        let clock = self.clock;
1743        let venue = *BYBIT_VENUE;
1744
1745        get_runtime().spawn(async move {
1746            let result = if let Some(inst_id) = instrument_id {
1747                // Single-instrument path: fetch ticker for one symbol
1748                let raw_symbol = extract_raw_symbol(inst_id.symbol.as_str()).to_string();
1749                log::info!(
1750                    "Requesting forward price for {underlying} (single instrument: {raw_symbol})"
1751                );
1752
1753                let params = crate::http::query::BybitTickersParams {
1754                    category: BybitProductType::Option,
1755                    symbol: Some(raw_symbol.clone()),
1756                    base_coin: None,
1757                    exp_date: None,
1758                };
1759
1760                match http_client.request_option_tickers_raw_with_params(&params).await {
1761                    Ok(tickers) => {
1762                        let ts = clock.get_time_ns();
1763                        let forward_prices: Vec<ForwardPrice> = tickers
1764                            .into_iter()
1765                            .filter_map(|t| {
1766                                let up: Decimal = t.underlying_price.parse().ok()?;
1767                                if up.is_zero() {
1768                                    return None;
1769                                }
1770                                Some(ForwardPrice::new(inst_id, up, None, ts, ts))
1771                            })
1772                            .collect();
1773
1774                        log::info!(
1775                            "Fetched {} forward price for {underlying} (single instrument: {raw_symbol})",
1776                            forward_prices.len(),
1777                        );
1778                        Ok((forward_prices, ts))
1779                    }
1780                    Err(e) => Err(e),
1781                }
1782            } else {
1783                // Bulk path: fetch all option tickers
1784                log::info!("Requesting option forward prices for base_coin={underlying} (bulk)");
1785
1786                match http_client.request_option_tickers_raw(&underlying).await {
1787                    Ok(tickers) => {
1788                        let ts = clock.get_time_ns();
1789
1790                        // Deduplicate: all options at the same expiry share the same
1791                        // forward price. Extract expiry prefix (e.g. "BTC-28FEB26" from
1792                        // "BTC-28FEB26-65000-C") and keep only one entry per expiry.
1793                        let mut seen_expiries = std::collections::HashSet::new();
1794                        let forward_prices: Vec<ForwardPrice> = tickers
1795                            .into_iter()
1796                            .filter_map(|t| {
1797                                let up: Decimal = t.underlying_price.parse().ok()?;
1798                                if up.is_zero() {
1799                                    return None;
1800                                }
1801                                let parts: Vec<&str> = t.symbol.splitn(3, '-').collect();
1802                                let expiry_key = if parts.len() >= 2 {
1803                                    format!("{}-{}", parts[0], parts[1])
1804                                } else {
1805                                    t.symbol.to_string()
1806                                };
1807
1808                                if !seen_expiries.insert(expiry_key) {
1809                                    return None;
1810                                }
1811                                Some(ForwardPrice::new(
1812                                    BybitSymbol::new(format!("{}-OPTION", t.symbol))
1813                                        .map(|s| s.to_instrument_id())
1814                                        .ok()?,
1815                                    up,
1816                                    None,
1817                                    ts,
1818                                    ts,
1819                                ))
1820                            })
1821                            .collect();
1822
1823                        log::info!(
1824                            "Fetched {} forward prices (per-expiry) for {underlying}",
1825                            forward_prices.len(),
1826                        );
1827                        Ok((forward_prices, ts))
1828                    }
1829                    Err(e) => Err(e),
1830                }
1831            };
1832
1833            match result {
1834                Ok((forward_prices, ts)) => {
1835                    let response = DataResponse::ForwardPrices(ForwardPricesResponse::new(
1836                        request_id,
1837                        client_id,
1838                        venue,
1839                        forward_prices,
1840                        ts,
1841                        params,
1842                    ));
1843
1844                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1845                        log::error!("Failed to send forward prices response: {e}");
1846                    }
1847                }
1848                Err(e) => {
1849                    log::error!("Forward prices request failed for {underlying}: {e:?}");
1850                }
1851            }
1852        });
1853
1854        Ok(())
1855    }
1856}
1857
1858#[cfg(test)]
1859mod tests {
1860    use std::sync::Arc;
1861
1862    use ahash::{AHashMap, AHashSet};
1863    use nautilus_common::messages::DataEvent;
1864    use nautilus_core::{AtomicMap, AtomicSet, UnixNanos, time::get_atomic_clock_realtime};
1865    use nautilus_model::{
1866        data::{BarType, Data, QuoteTick},
1867        enums::AggressorSide,
1868        identifiers::InstrumentId,
1869        instruments::{Instrument, InstrumentAny},
1870        types::{Price, Quantity},
1871    };
1872    use rstest::rstest;
1873    use ustr::Ustr;
1874
1875    use super::handle_ws_message;
1876    use crate::{
1877        common::{
1878            enums::BybitProductType,
1879            parse::{parse_linear_instrument, parse_option_instrument},
1880            testing::load_test_json,
1881        },
1882        http::models::{
1883            BybitFeeRate, BybitInstrumentLinearResponse, BybitInstrumentOptionResponse,
1884        },
1885        websocket::messages::{
1886            BybitWsMessage, BybitWsOrderbookDepthMsg, BybitWsTickerLinearMsg,
1887            BybitWsTickerOptionMsg, BybitWsTradeMsg,
1888        },
1889    };
1890
1891    fn sample_fee_rate(
1892        symbol: &str,
1893        taker: &str,
1894        maker: &str,
1895        base_coin: Option<&str>,
1896    ) -> BybitFeeRate {
1897        BybitFeeRate {
1898            symbol: Ustr::from(symbol),
1899            taker_fee_rate: taker.to_string(),
1900            maker_fee_rate: maker.to_string(),
1901            base_coin: base_coin.map(Ustr::from),
1902        }
1903    }
1904
1905    fn linear_instrument() -> InstrumentAny {
1906        let json = load_test_json("http_get_instruments_linear.json");
1907        let response: BybitInstrumentLinearResponse = serde_json::from_str(&json).unwrap();
1908        let instrument = &response.result.list[0];
1909        let fee_rate = sample_fee_rate("BTCUSDT", "0.00055", "0.0001", Some("BTC"));
1910        let ts = UnixNanos::new(1_700_000_000_000_000_000);
1911        parse_linear_instrument(instrument, &fee_rate, ts, ts).unwrap()
1912    }
1913
1914    fn option_instrument() -> InstrumentAny {
1915        let json = load_test_json("http_get_instruments_option.json");
1916        let response: BybitInstrumentOptionResponse = serde_json::from_str(&json).unwrap();
1917        let instrument = &response.result.list[0];
1918        let ts = UnixNanos::new(1_700_000_000_000_000_000);
1919        parse_option_instrument(instrument, None, ts, ts).unwrap()
1920    }
1921
1922    fn build_instruments(instruments: &[InstrumentAny]) -> AHashMap<Ustr, InstrumentAny> {
1923        let mut map = AHashMap::new();
1924        for inst in instruments {
1925            map.insert(inst.id().symbol.inner(), inst.clone());
1926        }
1927        map
1928    }
1929
1930    #[expect(clippy::type_complexity)]
1931    fn empty_subs() -> (
1932        Arc<AtomicSet<InstrumentId>>,
1933        Arc<AtomicMap<InstrumentId, AHashSet<&'static str>>>,
1934        Arc<AtomicMap<InstrumentId, u32>>,
1935        Arc<AtomicMap<InstrumentId, u32>>,
1936        Arc<AtomicSet<InstrumentId>>,
1937        Arc<AtomicMap<String, BarType>>,
1938    ) {
1939        (
1940            Arc::new(AtomicSet::new()),
1941            Arc::new(AtomicMap::new()),
1942            Arc::new(AtomicMap::new()),
1943            Arc::new(AtomicMap::new()),
1944            Arc::new(AtomicSet::new()),
1945            Arc::new(AtomicMap::new()),
1946        )
1947    }
1948
1949    #[rstest]
1950    fn test_handle_trade_message_emits_trade_tick() {
1951        let instrument = linear_instrument();
1952        let instruments = build_instruments(std::slice::from_ref(&instrument));
1953        let (trade_subs, ticker_subs, quote_depths, book_depths, greeks_subs, bar_types) =
1954            empty_subs();
1955        trade_subs.insert(instrument.id());
1956        let mut quote_cache = AHashMap::new();
1957        let mut funding_cache = AHashMap::new();
1958        let clock = get_atomic_clock_realtime();
1959
1960        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
1961
1962        let json = load_test_json("ws_public_trade.json");
1963        let msg: BybitWsTradeMsg = serde_json::from_str(&json).unwrap();
1964        let ws_msg = BybitWsMessage::Trade(msg);
1965
1966        handle_ws_message(
1967            &ws_msg,
1968            &tx,
1969            &instruments,
1970            Some(BybitProductType::Linear),
1971            &trade_subs,
1972            &ticker_subs,
1973            &quote_depths,
1974            &book_depths,
1975            &greeks_subs,
1976            &bar_types,
1977            &mut quote_cache,
1978            &mut funding_cache,
1979            clock,
1980        );
1981
1982        let event = rx.try_recv().unwrap();
1983        match event {
1984            DataEvent::Data(Data::Trade(tick)) => {
1985                assert_eq!(tick.instrument_id, instrument.id());
1986                assert_eq!(tick.price, instrument.make_price(27451.00));
1987                assert_eq!(tick.size, instrument.make_qty(0.010, None));
1988                assert_eq!(tick.aggressor_side, AggressorSide::Buyer);
1989            }
1990            other => panic!("Expected Trade data event, found {other:?}"),
1991        }
1992    }
1993
1994    #[rstest]
1995    fn test_handle_trade_message_unknown_symbol_no_event() {
1996        let instruments = AHashMap::new();
1997        let (trade_subs, ticker_subs, quote_depths, book_depths, greeks_subs, bar_types) =
1998            empty_subs();
1999        let mut quote_cache = AHashMap::new();
2000        let mut funding_cache = AHashMap::new();
2001        let clock = get_atomic_clock_realtime();
2002
2003        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
2004
2005        let json = load_test_json("ws_public_trade.json");
2006        let msg: BybitWsTradeMsg = serde_json::from_str(&json).unwrap();
2007        let ws_msg = BybitWsMessage::Trade(msg);
2008
2009        handle_ws_message(
2010            &ws_msg,
2011            &tx,
2012            &instruments,
2013            Some(BybitProductType::Linear),
2014            &trade_subs,
2015            &ticker_subs,
2016            &quote_depths,
2017            &book_depths,
2018            &greeks_subs,
2019            &bar_types,
2020            &mut quote_cache,
2021            &mut funding_cache,
2022            clock,
2023        );
2024
2025        assert!(rx.try_recv().is_err());
2026    }
2027
2028    #[rstest]
2029    fn test_handle_orderbook_message_emits_deltas_and_quote() {
2030        let instrument = linear_instrument();
2031        let instrument_id = instrument.id();
2032        let instruments = build_instruments(&[instrument]);
2033        let (trade_subs, ticker_subs, quote_depths, book_depths, greeks_subs, bar_types) =
2034            empty_subs();
2035
2036        book_depths.insert(instrument_id, 1);
2037        quote_depths.insert(instrument_id, 1);
2038
2039        let mut quote_cache = AHashMap::new();
2040        let mut funding_cache = AHashMap::new();
2041        let clock = get_atomic_clock_realtime();
2042
2043        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
2044
2045        let json = load_test_json("ws_orderbook_snapshot.json");
2046        let msg: BybitWsOrderbookDepthMsg = serde_json::from_str(&json).unwrap();
2047        let ws_msg = BybitWsMessage::Orderbook(msg);
2048
2049        handle_ws_message(
2050            &ws_msg,
2051            &tx,
2052            &instruments,
2053            Some(BybitProductType::Linear),
2054            &trade_subs,
2055            &ticker_subs,
2056            &quote_depths,
2057            &book_depths,
2058            &greeks_subs,
2059            &bar_types,
2060            &mut quote_cache,
2061            &mut funding_cache,
2062            clock,
2063        );
2064
2065        let event1 = rx.try_recv().unwrap();
2066        assert!(matches!(event1, DataEvent::Data(Data::Deltas(_))));
2067
2068        let event2 = rx.try_recv().unwrap();
2069        assert!(matches!(event2, DataEvent::Data(Data::Quote(_))));
2070    }
2071
2072    #[rstest]
2073    fn test_handle_orderbook_message_no_sub_no_event() {
2074        let instrument = linear_instrument();
2075        let instruments = build_instruments(&[instrument]);
2076        let (trade_subs, ticker_subs, quote_depths, book_depths, greeks_subs, bar_types) =
2077            empty_subs();
2078        let mut quote_cache = AHashMap::new();
2079        let mut funding_cache = AHashMap::new();
2080        let clock = get_atomic_clock_realtime();
2081
2082        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
2083
2084        let json = load_test_json("ws_orderbook_snapshot.json");
2085        let msg: BybitWsOrderbookDepthMsg = serde_json::from_str(&json).unwrap();
2086        let ws_msg = BybitWsMessage::Orderbook(msg);
2087
2088        handle_ws_message(
2089            &ws_msg,
2090            &tx,
2091            &instruments,
2092            Some(BybitProductType::Linear),
2093            &trade_subs,
2094            &ticker_subs,
2095            &quote_depths,
2096            &book_depths,
2097            &greeks_subs,
2098            &bar_types,
2099            &mut quote_cache,
2100            &mut funding_cache,
2101            clock,
2102        );
2103
2104        assert!(rx.try_recv().is_err());
2105    }
2106
2107    #[rstest]
2108    fn test_handle_ticker_linear_emits_quote() {
2109        let instrument = linear_instrument();
2110        let instrument_id = instrument.id();
2111        let instruments = build_instruments(&[instrument]);
2112        let (trade_subs, ticker_subs, quote_depths, book_depths, greeks_subs, bar_types) =
2113            empty_subs();
2114
2115        let mut subs = AHashSet::new();
2116        subs.insert("quotes");
2117        ticker_subs.insert(instrument_id, subs);
2118
2119        let mut quote_cache = AHashMap::new();
2120        let mut funding_cache = AHashMap::new();
2121        let clock = get_atomic_clock_realtime();
2122
2123        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
2124
2125        let json = load_test_json("ws_ticker_linear.json");
2126        let msg: BybitWsTickerLinearMsg = serde_json::from_str(&json).unwrap();
2127        let ws_msg = BybitWsMessage::TickerLinear(msg);
2128
2129        handle_ws_message(
2130            &ws_msg,
2131            &tx,
2132            &instruments,
2133            Some(BybitProductType::Linear),
2134            &trade_subs,
2135            &ticker_subs,
2136            &quote_depths,
2137            &book_depths,
2138            &greeks_subs,
2139            &bar_types,
2140            &mut quote_cache,
2141            &mut funding_cache,
2142            clock,
2143        );
2144
2145        let event = rx.try_recv().unwrap();
2146        assert!(matches!(event, DataEvent::Data(Data::Quote(_))));
2147        assert!(quote_cache.contains_key(&instrument_id));
2148    }
2149
2150    #[rstest]
2151    fn test_handle_ticker_linear_funding_dedup() {
2152        let instrument = linear_instrument();
2153        let instrument_id = instrument.id();
2154        let instruments = build_instruments(&[instrument]);
2155        let (trade_subs, ticker_subs, quote_depths, book_depths, greeks_subs, bar_types) =
2156            empty_subs();
2157
2158        let mut subs = AHashSet::new();
2159        subs.insert("funding");
2160        ticker_subs.insert(instrument_id, subs);
2161
2162        let mut quote_cache = AHashMap::new();
2163        let mut funding_cache = AHashMap::new();
2164        let clock = get_atomic_clock_realtime();
2165
2166        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
2167
2168        let json = load_test_json("ws_ticker_linear.json");
2169        let msg: BybitWsTickerLinearMsg = serde_json::from_str(&json).unwrap();
2170        let ws_msg = BybitWsMessage::TickerLinear(msg.clone());
2171
2172        handle_ws_message(
2173            &ws_msg,
2174            &tx,
2175            &instruments,
2176            Some(BybitProductType::Linear),
2177            &trade_subs,
2178            &ticker_subs,
2179            &quote_depths,
2180            &book_depths,
2181            &greeks_subs,
2182            &bar_types,
2183            &mut quote_cache,
2184            &mut funding_cache,
2185            clock,
2186        );
2187
2188        let event = rx.try_recv().unwrap();
2189        assert!(matches!(event, DataEvent::FundingRate(_)));
2190
2191        // Send same message again, funding unchanged so should be deduped
2192        let ws_msg2 = BybitWsMessage::TickerLinear(msg);
2193        handle_ws_message(
2194            &ws_msg2,
2195            &tx,
2196            &instruments,
2197            Some(BybitProductType::Linear),
2198            &trade_subs,
2199            &ticker_subs,
2200            &quote_depths,
2201            &book_depths,
2202            &greeks_subs,
2203            &bar_types,
2204            &mut quote_cache,
2205            &mut funding_cache,
2206            clock,
2207        );
2208
2209        assert!(rx.try_recv().is_err());
2210    }
2211
2212    #[rstest]
2213    fn test_handle_ticker_linear_mark_and_index_prices() {
2214        let instrument = linear_instrument();
2215        let instrument_id = instrument.id();
2216        let instruments = build_instruments(&[instrument]);
2217        let (trade_subs, ticker_subs, quote_depths, book_depths, greeks_subs, bar_types) =
2218            empty_subs();
2219
2220        let mut subs = AHashSet::new();
2221        subs.insert("mark_prices");
2222        subs.insert("index_prices");
2223        ticker_subs.insert(instrument_id, subs);
2224
2225        let mut quote_cache = AHashMap::new();
2226        let mut funding_cache = AHashMap::new();
2227        let clock = get_atomic_clock_realtime();
2228
2229        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
2230
2231        let json = load_test_json("ws_ticker_linear.json");
2232        let msg: BybitWsTickerLinearMsg = serde_json::from_str(&json).unwrap();
2233        let ws_msg = BybitWsMessage::TickerLinear(msg);
2234
2235        handle_ws_message(
2236            &ws_msg,
2237            &tx,
2238            &instruments,
2239            Some(BybitProductType::Linear),
2240            &trade_subs,
2241            &ticker_subs,
2242            &quote_depths,
2243            &book_depths,
2244            &greeks_subs,
2245            &bar_types,
2246            &mut quote_cache,
2247            &mut funding_cache,
2248            clock,
2249        );
2250
2251        let event1 = rx.try_recv().unwrap();
2252        assert!(matches!(event1, DataEvent::Data(Data::MarkPriceUpdate(_))));
2253
2254        let event2 = rx.try_recv().unwrap();
2255        assert!(matches!(event2, DataEvent::Data(Data::IndexPriceUpdate(_))));
2256    }
2257
2258    #[rstest]
2259    fn test_handle_reconnected_clears_caches() {
2260        let instruments = AHashMap::new();
2261        let (trade_subs, ticker_subs, quote_depths, book_depths, greeks_subs, bar_types) =
2262            empty_subs();
2263        let mut quote_cache = AHashMap::new();
2264        let mut funding_cache = AHashMap::new();
2265        let clock = get_atomic_clock_realtime();
2266
2267        let instrument_id = InstrumentId::from("BTCUSDT-LINEAR.BYBIT");
2268        quote_cache.insert(
2269            instrument_id,
2270            QuoteTick::new(
2271                instrument_id,
2272                Price::from("100.00"),
2273                Price::from("101.00"),
2274                Quantity::from("1.0"),
2275                Quantity::from("1.0"),
2276                UnixNanos::default(),
2277                UnixNanos::default(),
2278            ),
2279        );
2280        funding_cache.insert(
2281            Ustr::from("BTCUSDT"),
2282            (
2283                Some("-0.001".to_string()),
2284                Some("1000".to_string()),
2285                Some("8".to_string()),
2286            ),
2287        );
2288
2289        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
2290
2291        handle_ws_message(
2292            &BybitWsMessage::Reconnected,
2293            &tx,
2294            &instruments,
2295            None,
2296            &trade_subs,
2297            &ticker_subs,
2298            &quote_depths,
2299            &book_depths,
2300            &greeks_subs,
2301            &bar_types,
2302            &mut quote_cache,
2303            &mut funding_cache,
2304            clock,
2305        );
2306
2307        assert!(quote_cache.is_empty());
2308        assert!(funding_cache.is_empty());
2309    }
2310
2311    #[rstest]
2312    fn test_handle_ticker_option_greeks() {
2313        // Use the option instrument but key it by the ticker fixture symbol
2314        // (fixture instrument is ETH-26JUN26-16000-P, ticker fixture is BTC-6JAN23-17500-C)
2315        let instrument = option_instrument();
2316        let instrument_id = instrument.id();
2317
2318        // Key the instrument by the fixture ticker symbol with OPTION suffix
2319        let ticker_key = Ustr::from("BTC-6JAN23-17500-C-OPTION");
2320        let mut instruments = AHashMap::new();
2321        instruments.insert(ticker_key, instrument);
2322
2323        let (trade_subs, ticker_subs, quote_depths, book_depths, greeks_subs, bar_types) =
2324            empty_subs();
2325        greeks_subs.insert(instrument_id);
2326
2327        let mut quote_cache = AHashMap::new();
2328        let mut funding_cache = AHashMap::new();
2329        let clock = get_atomic_clock_realtime();
2330
2331        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
2332
2333        let json = load_test_json("ws_ticker_option.json");
2334        let msg: BybitWsTickerOptionMsg = serde_json::from_str(&json).unwrap();
2335        let ws_msg = BybitWsMessage::TickerOption(msg);
2336
2337        handle_ws_message(
2338            &ws_msg,
2339            &tx,
2340            &instruments,
2341            Some(BybitProductType::Option),
2342            &trade_subs,
2343            &ticker_subs,
2344            &quote_depths,
2345            &book_depths,
2346            &greeks_subs,
2347            &bar_types,
2348            &mut quote_cache,
2349            &mut funding_cache,
2350            clock,
2351        );
2352
2353        let event = rx.try_recv().unwrap();
2354        assert!(matches!(event, DataEvent::OptionGreeks(_)));
2355    }
2356
2357    #[rstest]
2358    fn test_handle_execution_message_ignored_by_data() {
2359        let instruments = AHashMap::new();
2360        let (trade_subs, ticker_subs, quote_depths, book_depths, greeks_subs, bar_types) =
2361            empty_subs();
2362        let mut quote_cache = AHashMap::new();
2363        let mut funding_cache = AHashMap::new();
2364        let clock = get_atomic_clock_realtime();
2365
2366        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
2367
2368        let json = load_test_json("ws_account_order.json");
2369        let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
2370            serde_json::from_str(&json).unwrap();
2371        let ws_msg = BybitWsMessage::AccountOrder(msg);
2372
2373        handle_ws_message(
2374            &ws_msg,
2375            &tx,
2376            &instruments,
2377            None,
2378            &trade_subs,
2379            &ticker_subs,
2380            &quote_depths,
2381            &book_depths,
2382            &greeks_subs,
2383            &bar_types,
2384            &mut quote_cache,
2385            &mut funding_cache,
2386            clock,
2387        );
2388
2389        assert!(rx.try_recv().is_err());
2390    }
2391
2392    #[rstest]
2393    fn test_instrument_resolution_with_product_type() {
2394        let instrument = linear_instrument();
2395
2396        let mut map = AHashMap::new();
2397        map.insert(instrument.id().symbol.inner(), instrument.clone());
2398
2399        let (trade_subs, ticker_subs, quote_depths, book_depths, greeks_subs, bar_types) =
2400            empty_subs();
2401        trade_subs.insert(instrument.id());
2402        let mut quote_cache = AHashMap::new();
2403        let mut funding_cache = AHashMap::new();
2404        let clock = get_atomic_clock_realtime();
2405        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
2406
2407        let json = load_test_json("ws_public_trade.json");
2408        let msg: BybitWsTradeMsg = serde_json::from_str(&json).unwrap();
2409
2410        // With None product_type, raw symbol "BTCUSDT" does not match "BTCUSDT-LINEAR"
2411        handle_ws_message(
2412            &BybitWsMessage::Trade(msg.clone()),
2413            &tx,
2414            &map,
2415            None,
2416            &trade_subs,
2417            &ticker_subs,
2418            &quote_depths,
2419            &book_depths,
2420            &greeks_subs,
2421            &bar_types,
2422            &mut quote_cache,
2423            &mut funding_cache,
2424            clock,
2425        );
2426        assert!(rx.try_recv().is_err());
2427
2428        // With product_type=Linear, "BTCUSDT" -> "BTCUSDT-LINEAR" matches
2429        handle_ws_message(
2430            &BybitWsMessage::Trade(msg),
2431            &tx,
2432            &map,
2433            Some(BybitProductType::Linear),
2434            &trade_subs,
2435            &ticker_subs,
2436            &quote_depths,
2437            &book_depths,
2438            &greeks_subs,
2439            &bar_types,
2440            &mut quote_cache,
2441            &mut funding_cache,
2442            clock,
2443        );
2444
2445        let event = rx.try_recv().unwrap();
2446        assert!(matches!(event, DataEvent::Data(Data::Trade(_))));
2447    }
2448
2449    #[rstest]
2450    fn test_handle_trade_filters_by_subscription() {
2451        let instrument = linear_instrument();
2452        let instruments = build_instruments(std::slice::from_ref(&instrument));
2453        let (trade_subs, ticker_subs, quote_depths, book_depths, greeks_subs, bar_types) =
2454            empty_subs();
2455        let mut quote_cache = AHashMap::new();
2456        let mut funding_cache = AHashMap::new();
2457        let clock = get_atomic_clock_realtime();
2458        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
2459
2460        let json = load_test_json("ws_public_trade.json");
2461        let msg: BybitWsTradeMsg = serde_json::from_str(&json).unwrap();
2462
2463        // Without subscription, trade should be filtered out
2464        handle_ws_message(
2465            &BybitWsMessage::Trade(msg.clone()),
2466            &tx,
2467            &instruments,
2468            Some(BybitProductType::Linear),
2469            &trade_subs,
2470            &ticker_subs,
2471            &quote_depths,
2472            &book_depths,
2473            &greeks_subs,
2474            &bar_types,
2475            &mut quote_cache,
2476            &mut funding_cache,
2477            clock,
2478        );
2479        assert!(rx.try_recv().is_err());
2480
2481        // With subscription, trade should be emitted
2482        trade_subs.insert(instrument.id());
2483        handle_ws_message(
2484            &BybitWsMessage::Trade(msg),
2485            &tx,
2486            &instruments,
2487            Some(BybitProductType::Linear),
2488            &trade_subs,
2489            &ticker_subs,
2490            &quote_depths,
2491            &book_depths,
2492            &greeks_subs,
2493            &bar_types,
2494            &mut quote_cache,
2495            &mut funding_cache,
2496            clock,
2497        );
2498        let event = rx.try_recv().unwrap();
2499        assert!(matches!(event, DataEvent::Data(Data::Trade(_))));
2500    }
2501}