1use std::{
19 future::Future,
20 sync::{
21 Arc,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use ahash::{AHashMap, AHashSet};
27use anyhow::Context;
28use futures_util::{StreamExt, pin_mut};
29use nautilus_common::{
30 clients::DataClient,
31 live::{runner::get_data_event_sender, runtime::get_runtime},
32 messages::{
33 DataEvent,
34 data::{
35 BarsResponse, BookResponse, DataResponse, ForwardPricesResponse, FundingRatesResponse,
36 InstrumentResponse, InstrumentsResponse, RequestBars, RequestBookSnapshot,
37 RequestForwardPrices, RequestFundingRates, RequestInstrument, RequestInstruments,
38 RequestTrades, SubscribeBars, SubscribeBookDeltas, SubscribeFundingRates,
39 SubscribeIndexPrices, SubscribeInstrumentStatus, SubscribeMarkPrices,
40 SubscribeOptionGreeks, SubscribeQuotes, SubscribeTrades, TradesResponse,
41 UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeFundingRates,
42 UnsubscribeIndexPrices, UnsubscribeInstrumentStatus, UnsubscribeMarkPrices,
43 UnsubscribeOptionGreeks, UnsubscribeQuotes, UnsubscribeTrades,
44 },
45 },
46};
47use nautilus_core::{
48 AtomicMap, AtomicSet,
49 datetime::datetime_to_unix_nanos,
50 time::{AtomicTime, get_atomic_clock_realtime},
51};
52use nautilus_model::{
53 data::{BarType, Data, ForwardPrice, OrderBookDeltas_API, QuoteTick},
54 enums::{BookType, MarketStatusAction},
55 identifiers::{ClientId, InstrumentId, Venue},
56 instruments::{Instrument, InstrumentAny},
57 orderbook::book::OrderBook,
58};
59use rust_decimal::Decimal;
60use tokio::{task::JoinHandle, time::Duration};
61use tokio_util::sync::CancellationToken;
62use ustr::Ustr;
63
64use crate::{
65 common::{
66 consts::{BYBIT_DEFAULT_ORDERBOOK_DEPTH, BYBIT_VENUE},
67 enums::BybitProductType,
68 parse::{extract_raw_symbol, make_bybit_symbol},
69 status::diff_and_emit_statuses,
70 symbol::BybitSymbol,
71 },
72 config::BybitDataClientConfig,
73 http::client::BybitHttpClient,
74 websocket::{
75 client::BybitWebSocketClient,
76 messages::BybitWsMessage,
77 parse::{
78 parse_kline_topic, parse_millis_i64, parse_orderbook_deltas, parse_orderbook_quote,
79 parse_ticker_linear_funding, parse_ticker_linear_index_price,
80 parse_ticker_linear_mark_price, parse_ticker_linear_quote, parse_ticker_option_greeks,
81 parse_ticker_option_index_price, parse_ticker_option_mark_price,
82 parse_ticker_option_quote, parse_ws_kline_bar, parse_ws_trade_tick,
83 },
84 },
85};
86
87#[derive(Debug)]
89pub struct BybitDataClient {
90 client_id: ClientId,
91 config: BybitDataClientConfig,
92 http_client: BybitHttpClient,
93 ws_clients: Vec<BybitWebSocketClient>,
94 is_connected: AtomicBool,
95 cancellation_token: CancellationToken,
96 tasks: Vec<JoinHandle<()>>,
97 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
98 instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
99 book_depths: Arc<AtomicMap<InstrumentId, u32>>,
100 quote_depths: Arc<AtomicMap<InstrumentId, u32>>,
101 ticker_subs: Arc<AtomicMap<InstrumentId, AHashSet<&'static str>>>,
102 trade_subs: Arc<AtomicSet<InstrumentId>>,
103 option_greeks_subs: Arc<AtomicSet<InstrumentId>>,
104 instrument_status_subs: Arc<AtomicSet<InstrumentId>>,
105 status_cache: Arc<AtomicMap<InstrumentId, MarketStatusAction>>,
106 clock: &'static AtomicTime,
107}
108
109impl BybitDataClient {
110 pub fn new(client_id: ClientId, config: BybitDataClientConfig) -> anyhow::Result<Self> {
116 let clock = get_atomic_clock_realtime();
117 let data_sender = get_data_event_sender();
118
119 let http_client = if let (Some(api_key), Some(api_secret)) =
120 (config.api_key.clone(), config.api_secret.clone())
121 {
122 BybitHttpClient::with_credentials(
123 api_key,
124 api_secret,
125 Some(config.http_base_url()),
126 config.http_timeout_secs,
127 config.max_retries,
128 config.retry_delay_initial_ms,
129 config.retry_delay_max_ms,
130 config.recv_window_ms,
131 config.proxy_url.clone(),
132 )?
133 } else {
134 BybitHttpClient::new(
135 Some(config.http_base_url()),
136 config.http_timeout_secs,
137 config.max_retries,
138 config.retry_delay_initial_ms,
139 config.retry_delay_max_ms,
140 config.recv_window_ms,
141 config.proxy_url.clone(),
142 )?
143 };
144
145 let product_types = if config.product_types.is_empty() {
147 vec![BybitProductType::Linear]
148 } else {
149 config.product_types.clone()
150 };
151
152 let ws_clients: Vec<BybitWebSocketClient> = product_types
153 .iter()
154 .map(|product_type| {
155 BybitWebSocketClient::new_public_with(
156 *product_type,
157 config.environment,
158 Some(config.ws_public_url_for(*product_type)),
159 config.heartbeat_interval_secs,
160 config.transport_backend,
161 config.proxy_url.clone(),
162 )
163 })
164 .collect();
165
166 Ok(Self {
167 client_id,
168 config,
169 http_client,
170 ws_clients,
171 is_connected: AtomicBool::new(false),
172 cancellation_token: CancellationToken::new(),
173 tasks: Vec::new(),
174 data_sender,
175 instruments: Arc::new(AtomicMap::new()),
176 book_depths: Arc::new(AtomicMap::new()),
177 quote_depths: Arc::new(AtomicMap::new()),
178 ticker_subs: Arc::new(AtomicMap::new()),
179 trade_subs: Arc::new(AtomicSet::new()),
180 option_greeks_subs: Arc::new(AtomicSet::new()),
181 instrument_status_subs: Arc::new(AtomicSet::new()),
182 status_cache: Arc::new(AtomicMap::new()),
183 clock,
184 })
185 }
186
187 fn venue(&self) -> Venue {
188 *BYBIT_VENUE
189 }
190
191 fn get_ws_client_for_product(
192 &self,
193 product_type: BybitProductType,
194 ) -> Option<&BybitWebSocketClient> {
195 self.ws_clients
196 .iter()
197 .find(|ws| ws.product_type() == Some(product_type))
198 }
199
200 fn get_product_type_for_instrument(
201 &self,
202 instrument_id: InstrumentId,
203 ) -> Option<BybitProductType> {
204 let guard = self.instruments.load();
205 guard
206 .get(&instrument_id)
207 .and_then(|_| BybitProductType::from_suffix(instrument_id.symbol.as_str()))
208 }
209
210 fn spawn_ws<F>(&self, fut: F, context: &'static str)
211 where
212 F: Future<Output = anyhow::Result<()>> + Send + 'static,
213 {
214 get_runtime().spawn(async move {
215 if let Err(e) = fut.await {
216 log::error!("{context}: {e:?}");
217 }
218 });
219 }
220
221 fn spawn_instrument_status_polling(
222 &mut self,
223 product_types: &[BybitProductType],
224 poll_secs: u64,
225 ) {
226 let http = self.http_client.clone();
227 let sender = self.data_sender.clone();
228 let instruments = self.instruments.clone();
229 let status_cache = self.status_cache.clone();
230 let status_subs = self.instrument_status_subs.clone();
231 let cancel = self.cancellation_token.clone();
232 let clock = self.clock;
233 let product_types = product_types.to_vec();
234
235 let handle = get_runtime().spawn(async move {
236 let mut interval = tokio::time::interval(Duration::from_secs(poll_secs));
237 interval.tick().await; loop {
240 tokio::select! {
241 _ = interval.tick() => {
242 if status_subs.is_empty() {
243 continue;
244 }
245
246 let mut all_statuses = AHashMap::new();
248
249 for &pt in &product_types {
250 match http.request_instrument_statuses(pt).await {
251 Ok(new_statuses) => {
252 let inst_guard = instruments.load();
253 for (id, action) in new_statuses {
254 if inst_guard.contains_key(&id) {
255 all_statuses.insert(id, action);
256 }
257 }
258 }
259 Err(e) => {
260 log::warn!("Bybit instrument status poll failed for {pt:?}: {e}");
261 }
262 }
263 }
264
265 let ts = clock.get_time_ns();
266 let mut cache = (**status_cache.load()).clone();
267 let subs_guard = status_subs.load();
268 diff_and_emit_statuses(
269 &all_statuses, &mut cache, Some(&subs_guard), &sender, ts, ts,
270 );
271 status_cache.store(cache);
272 }
273 () = cancel.cancelled() => {
274 log::debug!("Bybit instrument status polling task cancelled");
275 break;
276 }
277 }
278 }
279 });
280 self.tasks.push(handle);
281 log::info!("Instrument status polling started: interval={poll_secs}s");
282 }
283}
284
285fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
286 if let Err(e) = sender.send(DataEvent::Data(data)) {
287 log::error!("Failed to emit data event: {e}");
288 }
289}
290
291type FundingCacheEntry = (Option<String>, Option<String>, Option<String>);
293
294#[expect(clippy::too_many_arguments)]
295fn handle_ws_message(
296 message: &BybitWsMessage,
297 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
298 instruments: &AHashMap<Ustr, InstrumentAny>,
299 product_type: Option<BybitProductType>,
300 trade_subs: &Arc<AtomicSet<InstrumentId>>,
301 ticker_subs: &Arc<AtomicMap<InstrumentId, AHashSet<&'static str>>>,
302 quote_depths: &Arc<AtomicMap<InstrumentId, u32>>,
303 book_depths: &Arc<AtomicMap<InstrumentId, u32>>,
304 option_greeks_subs: &Arc<AtomicSet<InstrumentId>>,
305 bar_types_cache: &Arc<AtomicMap<String, BarType>>,
306 quote_cache: &mut AHashMap<InstrumentId, QuoteTick>,
307 funding_cache: &mut AHashMap<Ustr, FundingCacheEntry>,
308 clock: &AtomicTime,
309) {
310 let ts_init = clock.get_time_ns();
311 let resolve = |raw_symbol: &Ustr| -> Option<&InstrumentAny> {
312 let key = product_type.map_or(*raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
313 instruments.get(&key)
314 };
315
316 match message {
317 BybitWsMessage::Orderbook(msg) => {
318 let Some(instrument) = resolve(&msg.data.s) else {
319 log::warn!("Unknown symbol in orderbook update: {}", msg.data.s);
320 return;
321 };
322 let instrument_id = instrument.id();
323
324 let has_book_sub = book_depths.contains_key(&instrument_id);
326
327 if has_book_sub {
328 match parse_orderbook_deltas(msg, instrument, ts_init) {
329 Ok(deltas) => {
330 send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
331 }
332 Err(e) => log::error!("Failed to parse orderbook deltas: {e}"),
333 }
334 }
335
336 let has_quote_sub = quote_depths.contains_key(&instrument_id);
338 let has_ticker_quote_sub = ticker_subs
339 .load()
340 .get(&instrument_id)
341 .is_some_and(|s| s.contains("quotes"));
342
343 if has_quote_sub || has_ticker_quote_sub {
344 let last_quote = quote_cache.get(&instrument_id);
345 match parse_orderbook_quote(msg, instrument, last_quote, ts_init) {
346 Ok(quote) => {
347 quote_cache.insert(instrument_id, quote);
348 send_data(data_sender, Data::Quote(quote));
349 }
350 Err(e) => log::error!("Failed to parse orderbook quote: {e}"),
351 }
352 }
353 }
354 BybitWsMessage::Trade(msg) => {
355 for trade in &msg.data {
356 let Some(instrument) = resolve(&trade.s) else {
357 continue;
358 };
359 let instrument_id = instrument.id();
360 if !trade_subs.contains(&instrument_id) {
361 continue;
362 }
363
364 match parse_ws_trade_tick(trade, instrument, ts_init) {
365 Ok(tick) => send_data(data_sender, Data::Trade(tick)),
366 Err(e) => log::error!("Failed to parse trade tick: {e}"),
367 }
368 }
369 }
370 BybitWsMessage::Kline(msg) => {
371 let Ok((_, raw_symbol)) = parse_kline_topic(msg.topic.as_str()) else {
372 log::warn!("Invalid kline topic: {}", msg.topic);
373 return;
374 };
375 let ustr_symbol = Ustr::from(raw_symbol);
376 let Some(instrument) = resolve(&ustr_symbol) else {
377 log::warn!("Unknown symbol in kline update: {raw_symbol}");
378 return;
379 };
380 let topic_key = msg.topic.as_str();
381 let Some(bar_type) = bar_types_cache.load().get(topic_key).copied() else {
382 log::warn!("No bar type cached for kline topic: {topic_key}");
383 return;
384 };
385
386 for kline in &msg.data {
387 if !kline.confirm {
388 continue;
389 }
390
391 match parse_ws_kline_bar(kline, instrument, bar_type, true, ts_init) {
392 Ok(bar) => send_data(data_sender, Data::Bar(bar)),
393 Err(e) => log::error!("Failed to parse kline bar: {e}"),
394 }
395 }
396 }
397 BybitWsMessage::TickerLinear(msg) => {
398 let Some(instrument) = resolve(&msg.data.symbol) else {
399 log::warn!("Unknown symbol in ticker update: {}", msg.data.symbol);
400 return;
401 };
402 let instrument_id = instrument.id();
403 let subs = ticker_subs.load();
404 let sub_set = subs.get(&instrument_id);
405
406 if sub_set.is_some_and(|s| s.contains("quotes")) && msg.data.bid1_price.is_some() {
407 match parse_ticker_linear_quote(msg, instrument, ts_init) {
408 Ok(quote) => {
409 let last = quote_cache.get(&instrument_id);
410 if last.is_none_or(|q| *q != quote) {
411 quote_cache.insert(instrument_id, quote);
412 send_data(data_sender, Data::Quote(quote));
413 }
414 }
415 Err(e) => log::debug!("Skipping partial ticker update: {e}"),
416 }
417 }
418
419 let ts_event = match parse_millis_i64(msg.ts, "ticker.ts") {
420 Ok(ts) => ts,
421 Err(e) => {
422 log::error!("Failed to parse ticker timestamp: {e}");
423 return;
424 }
425 };
426
427 if sub_set.is_some_and(|s| s.contains("funding")) {
428 let cache_entry = funding_cache
429 .entry(msg.data.symbol)
430 .or_insert((None, None, None));
431 let mut changed = false;
432
433 if let Some(rate) = &msg.data.funding_rate
434 && cache_entry.0.as_ref() != Some(rate)
435 {
436 cache_entry.0 = Some(rate.clone());
437 changed = true;
438 }
439
440 if let Some(next_time) = &msg.data.next_funding_time
441 && cache_entry.1.as_ref() != Some(next_time)
442 {
443 cache_entry.1 = Some(next_time.clone());
444 changed = true;
445 }
446
447 if let Some(interval) = &msg.data.funding_interval_hour {
448 cache_entry.2 = Some(interval.clone());
449 }
450
451 if changed && cache_entry.0.is_some() {
452 let mut merged = msg.data.clone();
453
454 if merged.funding_rate.is_none() {
455 merged.funding_rate.clone_from(&cache_entry.0);
456 }
457
458 if merged.next_funding_time.is_none() {
459 merged.next_funding_time.clone_from(&cache_entry.1);
460 }
461
462 if merged.funding_interval_hour.is_none() {
463 merged.funding_interval_hour.clone_from(&cache_entry.2);
464 }
465
466 match parse_ticker_linear_funding(&merged, instrument_id, ts_event, ts_init) {
467 Ok(update) => {
468 if let Err(e) = data_sender.send(DataEvent::FundingRate(update)) {
469 log::error!("Failed to emit funding rate event: {e}");
470 }
471 }
472 Err(e) => log::error!("Failed to parse ticker linear funding: {e}"),
473 }
474 }
475 }
476
477 if sub_set.is_some_and(|s| s.contains("mark_prices")) && msg.data.mark_price.is_some() {
478 match parse_ticker_linear_mark_price(&msg.data, instrument, ts_event, ts_init) {
479 Ok(update) => send_data(data_sender, Data::MarkPriceUpdate(update)),
480 Err(e) => log::debug!("Skipping mark price update: {e}"),
481 }
482 }
483
484 if sub_set.is_some_and(|s| s.contains("index_prices")) && msg.data.index_price.is_some()
485 {
486 match parse_ticker_linear_index_price(&msg.data, instrument, ts_event, ts_init) {
487 Ok(update) => send_data(data_sender, Data::IndexPriceUpdate(update)),
488 Err(e) => log::debug!("Skipping index price update: {e}"),
489 }
490 }
491 }
492 BybitWsMessage::TickerOption(msg) => {
493 let Some(instrument) = resolve(&msg.data.symbol) else {
494 log::warn!(
495 "Unknown symbol in option ticker update: {}",
496 msg.data.symbol
497 );
498 return;
499 };
500 let instrument_id = instrument.id();
501 let subs = ticker_subs.load();
502 let sub_set = subs.get(&instrument_id);
503
504 if sub_set.is_some_and(|s| s.contains("quotes")) {
505 match parse_ticker_option_quote(msg, instrument, ts_init) {
506 Ok(quote) => {
507 let last = quote_cache.get(&instrument_id);
508 if last.is_none_or(|q| *q != quote) {
509 quote_cache.insert(instrument_id, quote);
510 send_data(data_sender, Data::Quote(quote));
511 }
512 }
513 Err(e) => log::error!("Failed to parse ticker option quote: {e}"),
514 }
515 }
516
517 if sub_set.is_some_and(|s| s.contains("mark_prices")) {
518 match parse_ticker_option_mark_price(msg, instrument, ts_init) {
519 Ok(update) => send_data(data_sender, Data::MarkPriceUpdate(update)),
520 Err(e) => log::error!("Failed to parse ticker option mark price: {e}"),
521 }
522 }
523
524 if sub_set.is_some_and(|s| s.contains("index_prices")) {
525 match parse_ticker_option_index_price(msg, instrument, ts_init) {
526 Ok(update) => send_data(data_sender, Data::IndexPriceUpdate(update)),
527 Err(e) => log::error!("Failed to parse ticker option index price: {e}"),
528 }
529 }
530
531 if option_greeks_subs.contains(&instrument_id) {
532 match parse_ticker_option_greeks(msg, instrument, ts_init) {
533 Ok(greeks) => {
534 if let Err(e) = data_sender.send(DataEvent::OptionGreeks(greeks)) {
535 log::error!("Failed to send option greeks: {e}");
536 }
537 }
538 Err(e) => log::error!("Failed to parse option greeks: {e}"),
539 }
540 }
541 }
542 BybitWsMessage::Reconnected => {
543 quote_cache.clear();
544 funding_cache.clear();
545 log::info!("WebSocket reconnected, cleared caches");
546 }
547 BybitWsMessage::Error(e) => {
548 log::error!(
549 "Bybit WebSocket error: code={} message={}",
550 e.code,
551 e.message
552 );
553 }
554 BybitWsMessage::Auth(_)
555 | BybitWsMessage::OrderResponse(_)
556 | BybitWsMessage::AccountOrder(_)
557 | BybitWsMessage::AccountExecution(_)
558 | BybitWsMessage::AccountWallet(_)
559 | BybitWsMessage::AccountPosition(_) => {}
560 }
561}
562
563fn upsert_instrument(
564 cache: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
565 instrument: InstrumentAny,
566) {
567 cache.insert(instrument.id(), instrument);
568}
569
570#[async_trait::async_trait(?Send)]
571impl DataClient for BybitDataClient {
572 fn client_id(&self) -> ClientId {
573 self.client_id
574 }
575
576 fn venue(&self) -> Option<Venue> {
577 Some(self.venue())
578 }
579
580 fn start(&mut self) -> anyhow::Result<()> {
581 log::info!(
582 "Started: client_id={}, product_types={:?}, environment={:?}, proxy_url={:?}",
583 self.client_id,
584 self.config.product_types,
585 self.config.environment,
586 self.config.proxy_url,
587 );
588 Ok(())
589 }
590
591 fn stop(&mut self) -> anyhow::Result<()> {
592 log::info!("Stopping {id}", id = self.client_id);
593 self.cancellation_token.cancel();
594 self.is_connected.store(false, Ordering::Relaxed);
595 Ok(())
596 }
597
598 fn reset(&mut self) -> anyhow::Result<()> {
599 log::debug!("Resetting {id}", id = self.client_id);
600 self.is_connected.store(false, Ordering::Relaxed);
601 self.cancellation_token = CancellationToken::new();
602 self.tasks.clear();
603 self.book_depths.store(AHashMap::new());
604 self.quote_depths.store(AHashMap::new());
605 self.ticker_subs.store(AHashMap::new());
606 self.option_greeks_subs.store(AHashSet::new());
607 self.instrument_status_subs.store(AHashSet::new());
608 self.status_cache.store(AHashMap::new());
609 Ok(())
610 }
611
612 fn dispose(&mut self) -> anyhow::Result<()> {
613 log::debug!("Disposing {id}", id = self.client_id);
614 self.stop()
615 }
616
617 async fn connect(&mut self) -> anyhow::Result<()> {
618 if self.is_connected() {
619 return Ok(());
620 }
621
622 let product_types = if self.config.product_types.is_empty() {
623 vec![BybitProductType::Linear]
624 } else {
625 self.config.product_types.clone()
626 };
627
628 let mut all_instruments = Vec::new();
629
630 for product_type in &product_types {
631 let fetched = self
632 .http_client
633 .request_instruments(*product_type, None, None)
634 .await
635 .with_context(|| {
636 format!("failed to request Bybit instruments for {product_type:?}")
637 })?;
638
639 self.http_client.cache_instruments(&fetched);
640
641 self.instruments.rcu(|m| {
642 for instrument in &fetched {
643 m.insert(instrument.id(), instrument.clone());
644 }
645 });
646
647 all_instruments.extend(fetched);
648 }
649
650 if self
652 .config
653 .instrument_status_poll_secs
654 .is_some_and(|s| s > 0)
655 {
656 let mut collected_statuses = Vec::new();
658
659 for product_type in &product_types {
660 match self
661 .http_client
662 .request_instrument_statuses(*product_type)
663 .await
664 {
665 Ok(statuses) => collected_statuses.push(statuses),
666 Err(e) => {
667 log::warn!(
668 "Failed to seed instrument status cache for {product_type:?}: {e}"
669 );
670 }
671 }
672 }
673
674 let inst_guard = self.instruments.load();
675 let mut status_map = AHashMap::new();
676
677 for statuses in collected_statuses {
678 for (id, action) in statuses {
679 if inst_guard.contains_key(&id) {
680 status_map.insert(id, action);
681 }
682 }
683 }
684 log::info!(
685 "Seeded instrument status cache with {} entries",
686 status_map.len()
687 );
688 self.status_cache.store(status_map);
689 }
690
691 for instrument in all_instruments {
692 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
693 log::warn!("Failed to send instrument: {e}");
694 }
695 }
696
697 let instruments_by_symbol: Arc<AHashMap<Ustr, InstrumentAny>> = {
699 let guard = self.instruments.load();
700 let mut map = AHashMap::new();
701 for instrument in guard.values() {
702 map.insert(instrument.id().symbol.inner(), instrument.clone());
703 }
704 Arc::new(map)
705 };
706
707 for ws_client in &mut self.ws_clients {
708 ws_client
709 .connect()
710 .await
711 .context("failed to connect Bybit WebSocket")?;
712 ws_client
713 .wait_until_active(10.0)
714 .await
715 .context("WebSocket did not become active")?;
716
717 let stream = ws_client.stream();
718 let product_type = ws_client.product_type();
719 let sender = self.data_sender.clone();
720 let trade_subs = self.trade_subs.clone();
721 let ticker_subs = self.ticker_subs.clone();
722 let quote_depths = self.quote_depths.clone();
723 let book_depths = self.book_depths.clone();
724 let option_greeks_subs = self.option_greeks_subs.clone();
725 let bar_types_cache = ws_client.bar_types_cache().clone();
726 let instruments = Arc::clone(&instruments_by_symbol);
727 let clock = self.clock;
728 let cancel = self.cancellation_token.clone();
729
730 let handle = get_runtime().spawn(async move {
731 let mut quote_cache: AHashMap<InstrumentId, QuoteTick> = AHashMap::new();
732 let mut funding_cache: AHashMap<Ustr, FundingCacheEntry> = AHashMap::new();
733
734 pin_mut!(stream);
735
736 loop {
737 tokio::select! {
738 Some(message) = stream.next() => {
739 handle_ws_message(
740 &message,
741 &sender,
742 &instruments,
743 product_type,
744 &trade_subs,
745 &ticker_subs,
746 "e_depths,
747 &book_depths,
748 &option_greeks_subs,
749 &bar_types_cache,
750 &mut quote_cache,
751 &mut funding_cache,
752 clock,
753 );
754 }
755 () = cancel.cancelled() => {
756 log::debug!("WebSocket stream task cancelled");
757 break;
758 }
759 }
760 }
761 });
762 self.tasks.push(handle);
763 }
764
765 if let Some(poll_secs) = self.config.instrument_status_poll_secs
767 && poll_secs > 0
768 {
769 self.spawn_instrument_status_polling(&product_types, poll_secs);
770 }
771
772 self.is_connected.store(true, Ordering::Release);
773 log::info!("Connected: client_id={}", self.client_id);
774 Ok(())
775 }
776
777 async fn disconnect(&mut self) -> anyhow::Result<()> {
778 if self.is_disconnected() {
779 return Ok(());
780 }
781
782 self.cancellation_token.cancel();
783
784 self.cancellation_token = CancellationToken::new();
786
787 for ws_client in &mut self.ws_clients {
788 if let Err(e) = ws_client.close().await {
789 log::warn!("Error closing WebSocket: {e:?}");
790 }
791 }
792
793 tokio::time::sleep(Duration::from_millis(500)).await;
795
796 let handles: Vec<_> = self.tasks.drain(..).collect();
797 for handle in handles {
798 if let Err(e) = handle.await {
799 log::error!("Error joining WebSocket task: {e}");
800 }
801 }
802
803 self.book_depths.store(AHashMap::new());
804 self.quote_depths.store(AHashMap::new());
805 self.ticker_subs.store(AHashMap::new());
806 self.trade_subs.store(AHashSet::new());
807 self.option_greeks_subs.store(AHashSet::new());
808 self.instrument_status_subs.store(AHashSet::new());
809 self.status_cache.store(AHashMap::new());
810 self.is_connected.store(false, Ordering::Release);
811 log::info!("Disconnected: client_id={}", self.client_id);
812 Ok(())
813 }
814
815 fn is_connected(&self) -> bool {
816 self.is_connected.load(Ordering::Relaxed)
817 }
818
819 fn is_disconnected(&self) -> bool {
820 !self.is_connected()
821 }
822
823 fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
824 if cmd.book_type != BookType::L2_MBP {
825 anyhow::bail!("Bybit only supports L2_MBP order book deltas");
826 }
827
828 let depth = cmd
829 .depth
830 .map_or(BYBIT_DEFAULT_ORDERBOOK_DEPTH, |d| d.get() as u32);
831
832 if !matches!(depth, 1 | 50 | 200 | 500) {
833 anyhow::bail!("invalid depth {depth}; valid values are 1, 50, 200, or 500");
834 }
835
836 let instrument_id = cmd.instrument_id;
837 let product_type = self
838 .get_product_type_for_instrument(instrument_id)
839 .unwrap_or(BybitProductType::Linear);
840
841 let ws = self
842 .get_ws_client_for_product(product_type)
843 .context("no WebSocket client for product type")?
844 .clone();
845
846 let book_depths = Arc::clone(&self.book_depths);
847
848 self.spawn_ws(
849 async move {
850 ws.subscribe_orderbook(instrument_id, depth)
851 .await
852 .context("orderbook subscription")?;
853 book_depths.insert(instrument_id, depth);
854 Ok(())
855 },
856 "order book delta subscription",
857 );
858
859 Ok(())
860 }
861
862 fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
863 let instrument_id = cmd.instrument_id;
864 let product_type = self
865 .get_product_type_for_instrument(instrument_id)
866 .unwrap_or(BybitProductType::Linear);
867
868 let ws = self
869 .get_ws_client_for_product(product_type)
870 .context("no WebSocket client for product type")?
871 .clone();
872
873 if product_type == BybitProductType::Spot {
875 let depth = 1;
876 self.quote_depths.insert(instrument_id, depth);
877
878 self.spawn_ws(
879 async move {
880 ws.subscribe_orderbook(instrument_id, depth)
881 .await
882 .context("orderbook subscription for quotes")
883 },
884 "quote subscription (spot orderbook)",
885 );
886 } else {
887 let mut should_subscribe = false;
888 self.ticker_subs.rcu(|m| {
889 let entry = m.entry(instrument_id).or_default();
890 should_subscribe = entry.is_empty();
891 entry.insert("quotes");
892 });
893
894 if should_subscribe {
895 self.spawn_ws(
896 async move {
897 ws.subscribe_ticker(instrument_id)
898 .await
899 .context("ticker subscription")
900 },
901 "quote subscription",
902 );
903 }
904 }
905 Ok(())
906 }
907
908 fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
909 let instrument_id = cmd.instrument_id;
910 let product_type = self
911 .get_product_type_for_instrument(instrument_id)
912 .unwrap_or(BybitProductType::Linear);
913
914 self.trade_subs.insert(instrument_id);
915
916 let ws = self
917 .get_ws_client_for_product(product_type)
918 .context("no WebSocket client for product type")?
919 .clone();
920
921 self.spawn_ws(
922 async move {
923 ws.subscribe_trades(instrument_id)
924 .await
925 .context("trades subscription")
926 },
927 "trade subscription",
928 );
929 Ok(())
930 }
931
932 fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
933 let instrument_id = cmd.instrument_id;
934 let product_type = self
935 .get_product_type_for_instrument(instrument_id)
936 .unwrap_or(BybitProductType::Linear);
937
938 if product_type == BybitProductType::Spot || product_type == BybitProductType::Option {
939 anyhow::bail!("Funding rates not available for {product_type:?} instruments");
940 }
941
942 let mut should_subscribe = false;
943 self.ticker_subs.rcu(|m| {
944 let entry = m.entry(instrument_id).or_default();
945 should_subscribe = entry.is_empty();
946 entry.insert("funding");
947 });
948
949 if should_subscribe {
950 let ws = self
951 .get_ws_client_for_product(product_type)
952 .context("no WebSocket client for product type")?
953 .clone();
954
955 self.spawn_ws(
956 async move {
957 ws.subscribe_ticker(instrument_id)
958 .await
959 .context("ticker subscription for funding rates")
960 },
961 "funding rate subscription",
962 );
963 }
964 Ok(())
965 }
966
967 fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
968 let instrument_id = cmd.instrument_id;
969 let product_type = self
970 .get_product_type_for_instrument(instrument_id)
971 .unwrap_or(BybitProductType::Linear);
972
973 if product_type == BybitProductType::Spot {
974 anyhow::bail!("Mark prices not available for Spot instruments");
975 }
976
977 let mut should_subscribe = false;
978 self.ticker_subs.rcu(|m| {
979 let entry = m.entry(instrument_id).or_default();
980 should_subscribe = entry.is_empty();
981 entry.insert("mark_prices");
982 });
983
984 if should_subscribe {
985 let ws = self
986 .get_ws_client_for_product(product_type)
987 .context("no WebSocket client for product type")?
988 .clone();
989
990 self.spawn_ws(
991 async move {
992 ws.subscribe_ticker(instrument_id)
993 .await
994 .context("ticker subscription for mark prices")
995 },
996 "mark price subscription",
997 );
998 }
999 Ok(())
1000 }
1001
1002 fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
1003 let instrument_id = cmd.instrument_id;
1004 let product_type = self
1005 .get_product_type_for_instrument(instrument_id)
1006 .unwrap_or(BybitProductType::Linear);
1007
1008 if product_type == BybitProductType::Spot {
1009 anyhow::bail!("Index prices not available for Spot instruments");
1010 }
1011
1012 let mut should_subscribe = false;
1013 self.ticker_subs.rcu(|m| {
1014 let entry = m.entry(instrument_id).or_default();
1015 should_subscribe = entry.is_empty();
1016 entry.insert("index_prices");
1017 });
1018
1019 if should_subscribe {
1020 let ws = self
1021 .get_ws_client_for_product(product_type)
1022 .context("no WebSocket client for product type")?
1023 .clone();
1024
1025 self.spawn_ws(
1026 async move {
1027 ws.subscribe_ticker(instrument_id)
1028 .await
1029 .context("ticker subscription for index prices")
1030 },
1031 "index price subscription",
1032 );
1033 }
1034 Ok(())
1035 }
1036
1037 fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
1038 let bar_type = cmd.bar_type;
1039 let instrument_id = bar_type.instrument_id();
1040 let product_type = self
1041 .get_product_type_for_instrument(instrument_id)
1042 .unwrap_or(BybitProductType::Linear);
1043
1044 if product_type == BybitProductType::Option {
1045 anyhow::bail!("Bybit does not support kline/bar data for options");
1046 }
1047
1048 let ws = self
1049 .get_ws_client_for_product(product_type)
1050 .context("no WebSocket client for product type")?
1051 .clone();
1052
1053 self.spawn_ws(
1054 async move {
1055 ws.subscribe_bars(bar_type)
1056 .await
1057 .context("bars subscription")
1058 },
1059 "bar subscription",
1060 );
1061 Ok(())
1062 }
1063
1064 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1065 let instrument_id = cmd.instrument_id;
1066 let depth = self
1067 .book_depths
1068 .load()
1069 .get(&instrument_id)
1070 .copied()
1071 .unwrap_or(BYBIT_DEFAULT_ORDERBOOK_DEPTH);
1072 self.book_depths.remove(&instrument_id);
1073
1074 let product_type = self
1075 .get_product_type_for_instrument(instrument_id)
1076 .unwrap_or(BybitProductType::Linear);
1077
1078 let quote_using_same_depth = self
1080 .quote_depths
1081 .load()
1082 .get(&instrument_id)
1083 .is_some_and(|&d| d == depth);
1084
1085 if quote_using_same_depth {
1086 return Ok(());
1087 }
1088
1089 let ws = self
1090 .get_ws_client_for_product(product_type)
1091 .context("no WebSocket client for product type")?
1092 .clone();
1093
1094 self.spawn_ws(
1095 async move {
1096 ws.unsubscribe_orderbook(instrument_id, depth)
1097 .await
1098 .context("orderbook unsubscribe")
1099 },
1100 "order book unsubscribe",
1101 );
1102 Ok(())
1103 }
1104
1105 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
1106 let instrument_id = cmd.instrument_id;
1107 let product_type = self
1108 .get_product_type_for_instrument(instrument_id)
1109 .unwrap_or(BybitProductType::Linear);
1110
1111 let ws = self
1112 .get_ws_client_for_product(product_type)
1113 .context("no WebSocket client for product type")?
1114 .clone();
1115
1116 if product_type == BybitProductType::Spot {
1117 let depth = self
1118 .quote_depths
1119 .load()
1120 .get(&instrument_id)
1121 .copied()
1122 .unwrap_or(1);
1123 self.quote_depths.remove(&instrument_id);
1124
1125 let book_using_same_depth = self
1127 .book_depths
1128 .load()
1129 .get(&instrument_id)
1130 .is_some_and(|&d| d == depth);
1131
1132 if !book_using_same_depth {
1133 self.spawn_ws(
1134 async move {
1135 ws.unsubscribe_orderbook(instrument_id, depth)
1136 .await
1137 .context("orderbook unsubscribe for quotes")
1138 },
1139 "quote unsubscribe (spot orderbook)",
1140 );
1141 }
1142 } else {
1143 let mut should_unsubscribe = false;
1144 self.ticker_subs.rcu(|m| {
1145 if let Some(entry) = m.get_mut(&instrument_id) {
1146 entry.remove("quotes");
1147 if entry.is_empty() {
1148 m.remove(&instrument_id);
1149 should_unsubscribe = true;
1150 } else {
1151 should_unsubscribe = false;
1152 }
1153 } else {
1154 should_unsubscribe = false;
1155 }
1156 });
1157
1158 if should_unsubscribe {
1159 self.spawn_ws(
1160 async move {
1161 ws.unsubscribe_ticker(instrument_id)
1162 .await
1163 .context("ticker unsubscribe")
1164 },
1165 "quote unsubscribe",
1166 );
1167 }
1168 }
1169 Ok(())
1170 }
1171
1172 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
1173 let instrument_id = cmd.instrument_id;
1174 let product_type = self
1175 .get_product_type_for_instrument(instrument_id)
1176 .unwrap_or(BybitProductType::Linear);
1177
1178 self.trade_subs.remove(&instrument_id);
1179
1180 let ws = self
1181 .get_ws_client_for_product(product_type)
1182 .context("no WebSocket client for product type")?
1183 .clone();
1184
1185 self.spawn_ws(
1186 async move {
1187 ws.unsubscribe_trades(instrument_id)
1188 .await
1189 .context("trades unsubscribe")
1190 },
1191 "trade unsubscribe",
1192 );
1193 Ok(())
1194 }
1195
1196 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1197 let instrument_id = cmd.instrument_id;
1198 let product_type = self
1199 .get_product_type_for_instrument(instrument_id)
1200 .unwrap_or(BybitProductType::Linear);
1201
1202 let mut should_unsubscribe = false;
1203 self.ticker_subs.rcu(|m| {
1204 if let Some(entry) = m.get_mut(&instrument_id) {
1205 entry.remove("funding");
1206 if entry.is_empty() {
1207 m.remove(&instrument_id);
1208 should_unsubscribe = true;
1209 } else {
1210 should_unsubscribe = false;
1211 }
1212 } else {
1213 should_unsubscribe = false;
1214 }
1215 });
1216
1217 if should_unsubscribe {
1218 let ws = self
1219 .get_ws_client_for_product(product_type)
1220 .context("no WebSocket client for product type")?
1221 .clone();
1222
1223 self.spawn_ws(
1224 async move {
1225 ws.unsubscribe_ticker(instrument_id)
1226 .await
1227 .context("ticker unsubscribe for funding rates")
1228 },
1229 "funding rate unsubscribe",
1230 );
1231 }
1232 Ok(())
1233 }
1234
1235 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1236 let instrument_id = cmd.instrument_id;
1237 let product_type = self
1238 .get_product_type_for_instrument(instrument_id)
1239 .unwrap_or(BybitProductType::Linear);
1240
1241 let mut should_unsubscribe = false;
1242 self.ticker_subs.rcu(|m| {
1243 if let Some(entry) = m.get_mut(&instrument_id) {
1244 entry.remove("mark_prices");
1245 if entry.is_empty() {
1246 m.remove(&instrument_id);
1247 should_unsubscribe = true;
1248 } else {
1249 should_unsubscribe = false;
1250 }
1251 } else {
1252 should_unsubscribe = false;
1253 }
1254 });
1255
1256 if should_unsubscribe {
1257 let ws = self
1258 .get_ws_client_for_product(product_type)
1259 .context("no WebSocket client for product type")?
1260 .clone();
1261
1262 self.spawn_ws(
1263 async move {
1264 ws.unsubscribe_ticker(instrument_id)
1265 .await
1266 .context("ticker unsubscribe for mark prices")
1267 },
1268 "mark price unsubscribe",
1269 );
1270 }
1271 Ok(())
1272 }
1273
1274 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1275 let instrument_id = cmd.instrument_id;
1276 let product_type = self
1277 .get_product_type_for_instrument(instrument_id)
1278 .unwrap_or(BybitProductType::Linear);
1279
1280 let mut should_unsubscribe = false;
1281 self.ticker_subs.rcu(|m| {
1282 if let Some(entry) = m.get_mut(&instrument_id) {
1283 entry.remove("index_prices");
1284 if entry.is_empty() {
1285 m.remove(&instrument_id);
1286 should_unsubscribe = true;
1287 } else {
1288 should_unsubscribe = false;
1289 }
1290 } else {
1291 should_unsubscribe = false;
1292 }
1293 });
1294
1295 if should_unsubscribe {
1296 let ws = self
1297 .get_ws_client_for_product(product_type)
1298 .context("no WebSocket client for product type")?
1299 .clone();
1300
1301 self.spawn_ws(
1302 async move {
1303 ws.unsubscribe_ticker(instrument_id)
1304 .await
1305 .context("ticker unsubscribe for index prices")
1306 },
1307 "index price unsubscribe",
1308 );
1309 }
1310 Ok(())
1311 }
1312
1313 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1314 let bar_type = cmd.bar_type;
1315 let instrument_id = bar_type.instrument_id();
1316 let product_type = self
1317 .get_product_type_for_instrument(instrument_id)
1318 .unwrap_or(BybitProductType::Linear);
1319
1320 let ws = self
1321 .get_ws_client_for_product(product_type)
1322 .context("no WebSocket client for product type")?
1323 .clone();
1324
1325 self.spawn_ws(
1326 async move {
1327 ws.unsubscribe_bars(bar_type)
1328 .await
1329 .context("bars unsubscribe")
1330 },
1331 "bar unsubscribe",
1332 );
1333 Ok(())
1334 }
1335
1336 fn subscribe_option_greeks(&mut self, cmd: SubscribeOptionGreeks) -> anyhow::Result<()> {
1337 let instrument_id = cmd.instrument_id;
1338 self.option_greeks_subs.insert(instrument_id);
1339
1340 let mut should_subscribe = false;
1341 self.ticker_subs.rcu(|m| {
1342 let entry = m.entry(instrument_id).or_default();
1343 should_subscribe = entry.is_empty();
1344 entry.insert("option_greeks");
1345 });
1346
1347 if should_subscribe {
1348 let product_type = self
1349 .get_product_type_for_instrument(instrument_id)
1350 .unwrap_or(BybitProductType::Option);
1351
1352 let ws = self
1353 .get_ws_client_for_product(product_type)
1354 .context("no WebSocket client for product type")?
1355 .clone();
1356
1357 self.spawn_ws(
1358 async move {
1359 ws.subscribe_ticker(instrument_id)
1360 .await
1361 .context("ticker subscription for option greeks")
1362 },
1363 "option greeks subscription",
1364 );
1365 }
1366 Ok(())
1367 }
1368
1369 fn unsubscribe_option_greeks(&mut self, cmd: &UnsubscribeOptionGreeks) -> anyhow::Result<()> {
1370 let instrument_id = cmd.instrument_id;
1371 self.option_greeks_subs.remove(&instrument_id);
1372
1373 let mut should_unsubscribe = false;
1374 self.ticker_subs.rcu(|m| {
1375 if let Some(entry) = m.get_mut(&instrument_id) {
1376 entry.remove("option_greeks");
1377 if entry.is_empty() {
1378 m.remove(&instrument_id);
1379 should_unsubscribe = true;
1380 } else {
1381 should_unsubscribe = false;
1382 }
1383 } else {
1384 should_unsubscribe = false;
1385 }
1386 });
1387
1388 if should_unsubscribe {
1389 let product_type = self
1390 .get_product_type_for_instrument(instrument_id)
1391 .unwrap_or(BybitProductType::Option);
1392
1393 let ws = self
1394 .get_ws_client_for_product(product_type)
1395 .context("no WebSocket client for product type")?
1396 .clone();
1397
1398 self.spawn_ws(
1399 async move {
1400 ws.unsubscribe_ticker(instrument_id)
1401 .await
1402 .context("ticker unsubscribe for option greeks")
1403 },
1404 "option greeks unsubscribe",
1405 );
1406 }
1407 Ok(())
1408 }
1409
1410 fn subscribe_instrument_status(
1411 &mut self,
1412 cmd: SubscribeInstrumentStatus,
1413 ) -> anyhow::Result<()> {
1414 log::debug!(
1415 "subscribe_instrument_status: {id} (status changes detected via periodic instrument info polling)",
1416 id = cmd.instrument_id,
1417 );
1418 self.instrument_status_subs.insert(cmd.instrument_id);
1419 Ok(())
1420 }
1421
1422 fn unsubscribe_instrument_status(
1423 &mut self,
1424 cmd: &UnsubscribeInstrumentStatus,
1425 ) -> anyhow::Result<()> {
1426 log::debug!(
1427 "unsubscribe_instrument_status: {id}",
1428 id = cmd.instrument_id,
1429 );
1430 self.instrument_status_subs.remove(&cmd.instrument_id);
1431 Ok(())
1432 }
1433
1434 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
1435 let http = self.http_client.clone();
1436 let sender = self.data_sender.clone();
1437 let instruments_cache = self.instruments.clone();
1438 let request_id = request.request_id;
1439 let client_id = request.client_id.unwrap_or(self.client_id);
1440 let venue = self.venue();
1441 let start = request.start;
1442 let end = request.end;
1443 let params = request.params;
1444 let clock = self.clock;
1445 let start_nanos = datetime_to_unix_nanos(start);
1446 let end_nanos = datetime_to_unix_nanos(end);
1447 let product_types = if self.config.product_types.is_empty() {
1448 vec![BybitProductType::Linear]
1449 } else {
1450 self.config.product_types.clone()
1451 };
1452
1453 get_runtime().spawn(async move {
1454 let mut all_instruments = Vec::new();
1455
1456 for product_type in product_types {
1457 match http.request_instruments(product_type, None, None).await {
1458 Ok(instruments) => {
1459 for instrument in instruments {
1460 upsert_instrument(&instruments_cache, instrument.clone());
1461 all_instruments.push(instrument);
1462 }
1463 }
1464 Err(e) => {
1465 log::error!("Failed to fetch instruments for {product_type:?}: {e:?}");
1466 }
1467 }
1468 }
1469
1470 let response = DataResponse::Instruments(InstrumentsResponse::new(
1471 request_id,
1472 client_id,
1473 venue,
1474 all_instruments,
1475 start_nanos,
1476 end_nanos,
1477 clock.get_time_ns(),
1478 params,
1479 ));
1480
1481 if let Err(e) = sender.send(DataEvent::Response(response)) {
1482 log::error!("Failed to send instruments response: {e}");
1483 }
1484 });
1485
1486 Ok(())
1487 }
1488
1489 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
1490 let http = self.http_client.clone();
1491 let sender = self.data_sender.clone();
1492 let instruments = self.instruments.clone();
1493 let instrument_id = request.instrument_id;
1494 let request_id = request.request_id;
1495 let client_id = request.client_id.unwrap_or(self.client_id);
1496 let start = request.start;
1497 let end = request.end;
1498 let params = request.params;
1499 let clock = self.clock;
1500 let start_nanos = datetime_to_unix_nanos(start);
1501 let end_nanos = datetime_to_unix_nanos(end);
1502
1503 let product_type = BybitProductType::from_suffix(instrument_id.symbol.as_str())
1504 .unwrap_or(BybitProductType::Linear);
1505 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str()).to_string();
1506
1507 get_runtime().spawn(async move {
1508 match http
1509 .request_instruments(product_type, Some(raw_symbol), None)
1510 .await
1511 .context("fetch instrument from API")
1512 {
1513 Ok(fetched) => {
1514 if let Some(instrument) = fetched.into_iter().find(|i| i.id() == instrument_id)
1515 {
1516 upsert_instrument(&instruments, instrument.clone());
1517
1518 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1519 request_id,
1520 client_id,
1521 instrument.id(),
1522 instrument,
1523 start_nanos,
1524 end_nanos,
1525 clock.get_time_ns(),
1526 params,
1527 )));
1528
1529 if let Err(e) = sender.send(DataEvent::Response(response)) {
1530 log::error!("Failed to send instrument response: {e}");
1531 }
1532 } else {
1533 log::error!("Instrument not found: {instrument_id}");
1534 }
1535 }
1536 Err(e) => log::error!("Instrument request failed: {e:?}"),
1537 }
1538 });
1539
1540 Ok(())
1541 }
1542
1543 fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
1544 let http = self.http_client.clone();
1545 let sender = self.data_sender.clone();
1546 let instrument_id = request.instrument_id;
1547 let depth = request.depth.map(|n| n.get() as u32);
1548 let request_id = request.request_id;
1549 let client_id = request.client_id.unwrap_or(self.client_id);
1550 let params = request.params;
1551 let clock = self.clock;
1552
1553 let product_type = BybitProductType::from_suffix(instrument_id.symbol.as_str())
1554 .unwrap_or(BybitProductType::Linear);
1555
1556 get_runtime().spawn(async move {
1557 match http
1558 .request_orderbook_snapshot(product_type, instrument_id, depth)
1559 .await
1560 .context("failed to request book snapshot from Bybit")
1561 {
1562 Ok(deltas) => {
1563 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
1564 if let Err(e) = book.apply_deltas(&deltas) {
1565 log::error!("Failed to apply book deltas for {instrument_id}: {e}");
1566 return;
1567 }
1568
1569 let response = DataResponse::Book(BookResponse::new(
1570 request_id,
1571 client_id,
1572 instrument_id,
1573 book,
1574 None,
1575 None,
1576 clock.get_time_ns(),
1577 params,
1578 ));
1579
1580 if let Err(e) = sender.send(DataEvent::Response(response)) {
1581 log::error!("Failed to send book snapshot response: {e}");
1582 }
1583 }
1584 Err(e) => log::error!("Book snapshot request failed for {instrument_id}: {e:?}"),
1585 }
1586 });
1587
1588 Ok(())
1589 }
1590
1591 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1592 let http = self.http_client.clone();
1593 let sender = self.data_sender.clone();
1594 let instrument_id = request.instrument_id;
1595 let start = request.start;
1596 let end = request.end;
1597 let limit = request.limit.map(|n| n.get() as u32);
1598 let request_id = request.request_id;
1599 let client_id = request.client_id.unwrap_or(self.client_id);
1600 let params = request.params;
1601 let clock = self.clock;
1602 let start_nanos = datetime_to_unix_nanos(start);
1603 let end_nanos = datetime_to_unix_nanos(end);
1604
1605 let product_type = BybitProductType::from_suffix(instrument_id.symbol.as_str())
1606 .unwrap_or(BybitProductType::Linear);
1607
1608 get_runtime().spawn(async move {
1609 match http
1610 .request_trades(product_type, instrument_id, limit)
1611 .await
1612 .context("failed to request trades from Bybit")
1613 {
1614 Ok(trades) => {
1615 let response = DataResponse::Trades(TradesResponse::new(
1616 request_id,
1617 client_id,
1618 instrument_id,
1619 trades,
1620 start_nanos,
1621 end_nanos,
1622 clock.get_time_ns(),
1623 params,
1624 ));
1625
1626 if let Err(e) = sender.send(DataEvent::Response(response)) {
1627 log::error!("Failed to send trades response: {e}");
1628 }
1629 }
1630 Err(e) => log::error!("Trade request failed: {e:?}"),
1631 }
1632 });
1633
1634 Ok(())
1635 }
1636
1637 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
1638 let http = self.http_client.clone();
1639 let sender = self.data_sender.clone();
1640 let bar_type = request.bar_type;
1641 let start = request.start;
1642 let end = request.end;
1643 let limit = request.limit.map(|n| n.get() as u32);
1644 let request_id = request.request_id;
1645 let client_id = request.client_id.unwrap_or(self.client_id);
1646 let params = request.params;
1647 let clock = self.clock;
1648 let start_nanos = datetime_to_unix_nanos(start);
1649 let end_nanos = datetime_to_unix_nanos(end);
1650
1651 let instrument_id = bar_type.instrument_id();
1652 let product_type = BybitProductType::from_suffix(instrument_id.symbol.as_str())
1653 .unwrap_or(BybitProductType::Linear);
1654
1655 get_runtime().spawn(async move {
1656 match http
1657 .request_bars(product_type, bar_type, start, end, limit, true)
1658 .await
1659 .context("failed to request bars from Bybit")
1660 {
1661 Ok(bars) => {
1662 let response = DataResponse::Bars(BarsResponse::new(
1663 request_id,
1664 client_id,
1665 bar_type,
1666 bars,
1667 start_nanos,
1668 end_nanos,
1669 clock.get_time_ns(),
1670 params,
1671 ));
1672
1673 if let Err(e) = sender.send(DataEvent::Response(response)) {
1674 log::error!("Failed to send bars response: {e}");
1675 }
1676 }
1677 Err(e) => log::error!("Bar request failed: {e:?}"),
1678 }
1679 });
1680
1681 Ok(())
1682 }
1683
1684 fn request_funding_rates(&self, request: RequestFundingRates) -> anyhow::Result<()> {
1685 let http = self.http_client.clone();
1686 let sender = self.data_sender.clone();
1687 let instrument_id = request.instrument_id;
1688 let start = request.start;
1689 let end = request.end;
1690 let limit = request.limit.map(|n| n.get() as u32);
1691 let request_id = request.request_id;
1692 let client_id = request.client_id.unwrap_or(self.client_id);
1693 let params = request.params;
1694 let clock = self.clock;
1695 let start_nanos = datetime_to_unix_nanos(start);
1696 let end_nanos = datetime_to_unix_nanos(end);
1697
1698 let product_type = BybitProductType::from_suffix(instrument_id.symbol.as_str())
1699 .unwrap_or(BybitProductType::Linear);
1700
1701 if product_type == BybitProductType::Spot || product_type == BybitProductType::Option {
1702 anyhow::bail!("Funding rates not available for {product_type} instruments");
1703 }
1704
1705 get_runtime().spawn(async move {
1706 match http
1707 .request_funding_rates(product_type, instrument_id, start, end, limit)
1708 .await
1709 .context("failed to request funding rates from Bybit")
1710 {
1711 Ok(funding_rates) => {
1712 let response = DataResponse::FundingRates(FundingRatesResponse::new(
1713 request_id,
1714 client_id,
1715 instrument_id,
1716 funding_rates,
1717 start_nanos,
1718 end_nanos,
1719 clock.get_time_ns(),
1720 params,
1721 ));
1722
1723 if let Err(e) = sender.send(DataEvent::Response(response)) {
1724 log::error!("Failed to send funding rates response: {e}");
1725 }
1726 }
1727 Err(e) => log::error!("Funding rates request failed for {instrument_id}: {e:?}"),
1728 }
1729 });
1730
1731 Ok(())
1732 }
1733
1734 fn request_forward_prices(&self, request: RequestForwardPrices) -> anyhow::Result<()> {
1735 let underlying = request.underlying.to_string();
1736 let instrument_id = request.instrument_id;
1737 let http_client = self.http_client.clone();
1738 let sender = self.data_sender.clone();
1739 let request_id = request.request_id;
1740 let client_id = self.client_id();
1741 let params = request.params;
1742 let clock = self.clock;
1743 let venue = *BYBIT_VENUE;
1744
1745 get_runtime().spawn(async move {
1746 let result = if let Some(inst_id) = instrument_id {
1747 let raw_symbol = extract_raw_symbol(inst_id.symbol.as_str()).to_string();
1749 log::info!(
1750 "Requesting forward price for {underlying} (single instrument: {raw_symbol})"
1751 );
1752
1753 let params = crate::http::query::BybitTickersParams {
1754 category: BybitProductType::Option,
1755 symbol: Some(raw_symbol.clone()),
1756 base_coin: None,
1757 exp_date: None,
1758 };
1759
1760 match http_client.request_option_tickers_raw_with_params(¶ms).await {
1761 Ok(tickers) => {
1762 let ts = clock.get_time_ns();
1763 let forward_prices: Vec<ForwardPrice> = tickers
1764 .into_iter()
1765 .filter_map(|t| {
1766 let up: Decimal = t.underlying_price.parse().ok()?;
1767 if up.is_zero() {
1768 return None;
1769 }
1770 Some(ForwardPrice::new(inst_id, up, None, ts, ts))
1771 })
1772 .collect();
1773
1774 log::info!(
1775 "Fetched {} forward price for {underlying} (single instrument: {raw_symbol})",
1776 forward_prices.len(),
1777 );
1778 Ok((forward_prices, ts))
1779 }
1780 Err(e) => Err(e),
1781 }
1782 } else {
1783 log::info!("Requesting option forward prices for base_coin={underlying} (bulk)");
1785
1786 match http_client.request_option_tickers_raw(&underlying).await {
1787 Ok(tickers) => {
1788 let ts = clock.get_time_ns();
1789
1790 let mut seen_expiries = std::collections::HashSet::new();
1794 let forward_prices: Vec<ForwardPrice> = tickers
1795 .into_iter()
1796 .filter_map(|t| {
1797 let up: Decimal = t.underlying_price.parse().ok()?;
1798 if up.is_zero() {
1799 return None;
1800 }
1801 let parts: Vec<&str> = t.symbol.splitn(3, '-').collect();
1802 let expiry_key = if parts.len() >= 2 {
1803 format!("{}-{}", parts[0], parts[1])
1804 } else {
1805 t.symbol.to_string()
1806 };
1807
1808 if !seen_expiries.insert(expiry_key) {
1809 return None;
1810 }
1811 Some(ForwardPrice::new(
1812 BybitSymbol::new(format!("{}-OPTION", t.symbol))
1813 .map(|s| s.to_instrument_id())
1814 .ok()?,
1815 up,
1816 None,
1817 ts,
1818 ts,
1819 ))
1820 })
1821 .collect();
1822
1823 log::info!(
1824 "Fetched {} forward prices (per-expiry) for {underlying}",
1825 forward_prices.len(),
1826 );
1827 Ok((forward_prices, ts))
1828 }
1829 Err(e) => Err(e),
1830 }
1831 };
1832
1833 match result {
1834 Ok((forward_prices, ts)) => {
1835 let response = DataResponse::ForwardPrices(ForwardPricesResponse::new(
1836 request_id,
1837 client_id,
1838 venue,
1839 forward_prices,
1840 ts,
1841 params,
1842 ));
1843
1844 if let Err(e) = sender.send(DataEvent::Response(response)) {
1845 log::error!("Failed to send forward prices response: {e}");
1846 }
1847 }
1848 Err(e) => {
1849 log::error!("Forward prices request failed for {underlying}: {e:?}");
1850 }
1851 }
1852 });
1853
1854 Ok(())
1855 }
1856}
1857
1858#[cfg(test)]
1859mod tests {
1860 use std::sync::Arc;
1861
1862 use ahash::{AHashMap, AHashSet};
1863 use nautilus_common::messages::DataEvent;
1864 use nautilus_core::{AtomicMap, AtomicSet, UnixNanos, time::get_atomic_clock_realtime};
1865 use nautilus_model::{
1866 data::{BarType, Data, QuoteTick},
1867 enums::AggressorSide,
1868 identifiers::InstrumentId,
1869 instruments::{Instrument, InstrumentAny},
1870 types::{Price, Quantity},
1871 };
1872 use rstest::rstest;
1873 use ustr::Ustr;
1874
1875 use super::handle_ws_message;
1876 use crate::{
1877 common::{
1878 enums::BybitProductType,
1879 parse::{parse_linear_instrument, parse_option_instrument},
1880 testing::load_test_json,
1881 },
1882 http::models::{
1883 BybitFeeRate, BybitInstrumentLinearResponse, BybitInstrumentOptionResponse,
1884 },
1885 websocket::messages::{
1886 BybitWsMessage, BybitWsOrderbookDepthMsg, BybitWsTickerLinearMsg,
1887 BybitWsTickerOptionMsg, BybitWsTradeMsg,
1888 },
1889 };
1890
1891 fn sample_fee_rate(
1892 symbol: &str,
1893 taker: &str,
1894 maker: &str,
1895 base_coin: Option<&str>,
1896 ) -> BybitFeeRate {
1897 BybitFeeRate {
1898 symbol: Ustr::from(symbol),
1899 taker_fee_rate: taker.to_string(),
1900 maker_fee_rate: maker.to_string(),
1901 base_coin: base_coin.map(Ustr::from),
1902 }
1903 }
1904
1905 fn linear_instrument() -> InstrumentAny {
1906 let json = load_test_json("http_get_instruments_linear.json");
1907 let response: BybitInstrumentLinearResponse = serde_json::from_str(&json).unwrap();
1908 let instrument = &response.result.list[0];
1909 let fee_rate = sample_fee_rate("BTCUSDT", "0.00055", "0.0001", Some("BTC"));
1910 let ts = UnixNanos::new(1_700_000_000_000_000_000);
1911 parse_linear_instrument(instrument, &fee_rate, ts, ts).unwrap()
1912 }
1913
1914 fn option_instrument() -> InstrumentAny {
1915 let json = load_test_json("http_get_instruments_option.json");
1916 let response: BybitInstrumentOptionResponse = serde_json::from_str(&json).unwrap();
1917 let instrument = &response.result.list[0];
1918 let ts = UnixNanos::new(1_700_000_000_000_000_000);
1919 parse_option_instrument(instrument, None, ts, ts).unwrap()
1920 }
1921
1922 fn build_instruments(instruments: &[InstrumentAny]) -> AHashMap<Ustr, InstrumentAny> {
1923 let mut map = AHashMap::new();
1924 for inst in instruments {
1925 map.insert(inst.id().symbol.inner(), inst.clone());
1926 }
1927 map
1928 }
1929
1930 #[expect(clippy::type_complexity)]
1931 fn empty_subs() -> (
1932 Arc<AtomicSet<InstrumentId>>,
1933 Arc<AtomicMap<InstrumentId, AHashSet<&'static str>>>,
1934 Arc<AtomicMap<InstrumentId, u32>>,
1935 Arc<AtomicMap<InstrumentId, u32>>,
1936 Arc<AtomicSet<InstrumentId>>,
1937 Arc<AtomicMap<String, BarType>>,
1938 ) {
1939 (
1940 Arc::new(AtomicSet::new()),
1941 Arc::new(AtomicMap::new()),
1942 Arc::new(AtomicMap::new()),
1943 Arc::new(AtomicMap::new()),
1944 Arc::new(AtomicSet::new()),
1945 Arc::new(AtomicMap::new()),
1946 )
1947 }
1948
1949 #[rstest]
1950 fn test_handle_trade_message_emits_trade_tick() {
1951 let instrument = linear_instrument();
1952 let instruments = build_instruments(std::slice::from_ref(&instrument));
1953 let (trade_subs, ticker_subs, quote_depths, book_depths, greeks_subs, bar_types) =
1954 empty_subs();
1955 trade_subs.insert(instrument.id());
1956 let mut quote_cache = AHashMap::new();
1957 let mut funding_cache = AHashMap::new();
1958 let clock = get_atomic_clock_realtime();
1959
1960 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
1961
1962 let json = load_test_json("ws_public_trade.json");
1963 let msg: BybitWsTradeMsg = serde_json::from_str(&json).unwrap();
1964 let ws_msg = BybitWsMessage::Trade(msg);
1965
1966 handle_ws_message(
1967 &ws_msg,
1968 &tx,
1969 &instruments,
1970 Some(BybitProductType::Linear),
1971 &trade_subs,
1972 &ticker_subs,
1973 "e_depths,
1974 &book_depths,
1975 &greeks_subs,
1976 &bar_types,
1977 &mut quote_cache,
1978 &mut funding_cache,
1979 clock,
1980 );
1981
1982 let event = rx.try_recv().unwrap();
1983 match event {
1984 DataEvent::Data(Data::Trade(tick)) => {
1985 assert_eq!(tick.instrument_id, instrument.id());
1986 assert_eq!(tick.price, instrument.make_price(27451.00));
1987 assert_eq!(tick.size, instrument.make_qty(0.010, None));
1988 assert_eq!(tick.aggressor_side, AggressorSide::Buyer);
1989 }
1990 other => panic!("Expected Trade data event, found {other:?}"),
1991 }
1992 }
1993
1994 #[rstest]
1995 fn test_handle_trade_message_unknown_symbol_no_event() {
1996 let instruments = AHashMap::new();
1997 let (trade_subs, ticker_subs, quote_depths, book_depths, greeks_subs, bar_types) =
1998 empty_subs();
1999 let mut quote_cache = AHashMap::new();
2000 let mut funding_cache = AHashMap::new();
2001 let clock = get_atomic_clock_realtime();
2002
2003 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
2004
2005 let json = load_test_json("ws_public_trade.json");
2006 let msg: BybitWsTradeMsg = serde_json::from_str(&json).unwrap();
2007 let ws_msg = BybitWsMessage::Trade(msg);
2008
2009 handle_ws_message(
2010 &ws_msg,
2011 &tx,
2012 &instruments,
2013 Some(BybitProductType::Linear),
2014 &trade_subs,
2015 &ticker_subs,
2016 "e_depths,
2017 &book_depths,
2018 &greeks_subs,
2019 &bar_types,
2020 &mut quote_cache,
2021 &mut funding_cache,
2022 clock,
2023 );
2024
2025 assert!(rx.try_recv().is_err());
2026 }
2027
2028 #[rstest]
2029 fn test_handle_orderbook_message_emits_deltas_and_quote() {
2030 let instrument = linear_instrument();
2031 let instrument_id = instrument.id();
2032 let instruments = build_instruments(&[instrument]);
2033 let (trade_subs, ticker_subs, quote_depths, book_depths, greeks_subs, bar_types) =
2034 empty_subs();
2035
2036 book_depths.insert(instrument_id, 1);
2037 quote_depths.insert(instrument_id, 1);
2038
2039 let mut quote_cache = AHashMap::new();
2040 let mut funding_cache = AHashMap::new();
2041 let clock = get_atomic_clock_realtime();
2042
2043 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
2044
2045 let json = load_test_json("ws_orderbook_snapshot.json");
2046 let msg: BybitWsOrderbookDepthMsg = serde_json::from_str(&json).unwrap();
2047 let ws_msg = BybitWsMessage::Orderbook(msg);
2048
2049 handle_ws_message(
2050 &ws_msg,
2051 &tx,
2052 &instruments,
2053 Some(BybitProductType::Linear),
2054 &trade_subs,
2055 &ticker_subs,
2056 "e_depths,
2057 &book_depths,
2058 &greeks_subs,
2059 &bar_types,
2060 &mut quote_cache,
2061 &mut funding_cache,
2062 clock,
2063 );
2064
2065 let event1 = rx.try_recv().unwrap();
2066 assert!(matches!(event1, DataEvent::Data(Data::Deltas(_))));
2067
2068 let event2 = rx.try_recv().unwrap();
2069 assert!(matches!(event2, DataEvent::Data(Data::Quote(_))));
2070 }
2071
2072 #[rstest]
2073 fn test_handle_orderbook_message_no_sub_no_event() {
2074 let instrument = linear_instrument();
2075 let instruments = build_instruments(&[instrument]);
2076 let (trade_subs, ticker_subs, quote_depths, book_depths, greeks_subs, bar_types) =
2077 empty_subs();
2078 let mut quote_cache = AHashMap::new();
2079 let mut funding_cache = AHashMap::new();
2080 let clock = get_atomic_clock_realtime();
2081
2082 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
2083
2084 let json = load_test_json("ws_orderbook_snapshot.json");
2085 let msg: BybitWsOrderbookDepthMsg = serde_json::from_str(&json).unwrap();
2086 let ws_msg = BybitWsMessage::Orderbook(msg);
2087
2088 handle_ws_message(
2089 &ws_msg,
2090 &tx,
2091 &instruments,
2092 Some(BybitProductType::Linear),
2093 &trade_subs,
2094 &ticker_subs,
2095 "e_depths,
2096 &book_depths,
2097 &greeks_subs,
2098 &bar_types,
2099 &mut quote_cache,
2100 &mut funding_cache,
2101 clock,
2102 );
2103
2104 assert!(rx.try_recv().is_err());
2105 }
2106
2107 #[rstest]
2108 fn test_handle_ticker_linear_emits_quote() {
2109 let instrument = linear_instrument();
2110 let instrument_id = instrument.id();
2111 let instruments = build_instruments(&[instrument]);
2112 let (trade_subs, ticker_subs, quote_depths, book_depths, greeks_subs, bar_types) =
2113 empty_subs();
2114
2115 let mut subs = AHashSet::new();
2116 subs.insert("quotes");
2117 ticker_subs.insert(instrument_id, subs);
2118
2119 let mut quote_cache = AHashMap::new();
2120 let mut funding_cache = AHashMap::new();
2121 let clock = get_atomic_clock_realtime();
2122
2123 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
2124
2125 let json = load_test_json("ws_ticker_linear.json");
2126 let msg: BybitWsTickerLinearMsg = serde_json::from_str(&json).unwrap();
2127 let ws_msg = BybitWsMessage::TickerLinear(msg);
2128
2129 handle_ws_message(
2130 &ws_msg,
2131 &tx,
2132 &instruments,
2133 Some(BybitProductType::Linear),
2134 &trade_subs,
2135 &ticker_subs,
2136 "e_depths,
2137 &book_depths,
2138 &greeks_subs,
2139 &bar_types,
2140 &mut quote_cache,
2141 &mut funding_cache,
2142 clock,
2143 );
2144
2145 let event = rx.try_recv().unwrap();
2146 assert!(matches!(event, DataEvent::Data(Data::Quote(_))));
2147 assert!(quote_cache.contains_key(&instrument_id));
2148 }
2149
2150 #[rstest]
2151 fn test_handle_ticker_linear_funding_dedup() {
2152 let instrument = linear_instrument();
2153 let instrument_id = instrument.id();
2154 let instruments = build_instruments(&[instrument]);
2155 let (trade_subs, ticker_subs, quote_depths, book_depths, greeks_subs, bar_types) =
2156 empty_subs();
2157
2158 let mut subs = AHashSet::new();
2159 subs.insert("funding");
2160 ticker_subs.insert(instrument_id, subs);
2161
2162 let mut quote_cache = AHashMap::new();
2163 let mut funding_cache = AHashMap::new();
2164 let clock = get_atomic_clock_realtime();
2165
2166 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
2167
2168 let json = load_test_json("ws_ticker_linear.json");
2169 let msg: BybitWsTickerLinearMsg = serde_json::from_str(&json).unwrap();
2170 let ws_msg = BybitWsMessage::TickerLinear(msg.clone());
2171
2172 handle_ws_message(
2173 &ws_msg,
2174 &tx,
2175 &instruments,
2176 Some(BybitProductType::Linear),
2177 &trade_subs,
2178 &ticker_subs,
2179 "e_depths,
2180 &book_depths,
2181 &greeks_subs,
2182 &bar_types,
2183 &mut quote_cache,
2184 &mut funding_cache,
2185 clock,
2186 );
2187
2188 let event = rx.try_recv().unwrap();
2189 assert!(matches!(event, DataEvent::FundingRate(_)));
2190
2191 let ws_msg2 = BybitWsMessage::TickerLinear(msg);
2193 handle_ws_message(
2194 &ws_msg2,
2195 &tx,
2196 &instruments,
2197 Some(BybitProductType::Linear),
2198 &trade_subs,
2199 &ticker_subs,
2200 "e_depths,
2201 &book_depths,
2202 &greeks_subs,
2203 &bar_types,
2204 &mut quote_cache,
2205 &mut funding_cache,
2206 clock,
2207 );
2208
2209 assert!(rx.try_recv().is_err());
2210 }
2211
2212 #[rstest]
2213 fn test_handle_ticker_linear_mark_and_index_prices() {
2214 let instrument = linear_instrument();
2215 let instrument_id = instrument.id();
2216 let instruments = build_instruments(&[instrument]);
2217 let (trade_subs, ticker_subs, quote_depths, book_depths, greeks_subs, bar_types) =
2218 empty_subs();
2219
2220 let mut subs = AHashSet::new();
2221 subs.insert("mark_prices");
2222 subs.insert("index_prices");
2223 ticker_subs.insert(instrument_id, subs);
2224
2225 let mut quote_cache = AHashMap::new();
2226 let mut funding_cache = AHashMap::new();
2227 let clock = get_atomic_clock_realtime();
2228
2229 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
2230
2231 let json = load_test_json("ws_ticker_linear.json");
2232 let msg: BybitWsTickerLinearMsg = serde_json::from_str(&json).unwrap();
2233 let ws_msg = BybitWsMessage::TickerLinear(msg);
2234
2235 handle_ws_message(
2236 &ws_msg,
2237 &tx,
2238 &instruments,
2239 Some(BybitProductType::Linear),
2240 &trade_subs,
2241 &ticker_subs,
2242 "e_depths,
2243 &book_depths,
2244 &greeks_subs,
2245 &bar_types,
2246 &mut quote_cache,
2247 &mut funding_cache,
2248 clock,
2249 );
2250
2251 let event1 = rx.try_recv().unwrap();
2252 assert!(matches!(event1, DataEvent::Data(Data::MarkPriceUpdate(_))));
2253
2254 let event2 = rx.try_recv().unwrap();
2255 assert!(matches!(event2, DataEvent::Data(Data::IndexPriceUpdate(_))));
2256 }
2257
2258 #[rstest]
2259 fn test_handle_reconnected_clears_caches() {
2260 let instruments = AHashMap::new();
2261 let (trade_subs, ticker_subs, quote_depths, book_depths, greeks_subs, bar_types) =
2262 empty_subs();
2263 let mut quote_cache = AHashMap::new();
2264 let mut funding_cache = AHashMap::new();
2265 let clock = get_atomic_clock_realtime();
2266
2267 let instrument_id = InstrumentId::from("BTCUSDT-LINEAR.BYBIT");
2268 quote_cache.insert(
2269 instrument_id,
2270 QuoteTick::new(
2271 instrument_id,
2272 Price::from("100.00"),
2273 Price::from("101.00"),
2274 Quantity::from("1.0"),
2275 Quantity::from("1.0"),
2276 UnixNanos::default(),
2277 UnixNanos::default(),
2278 ),
2279 );
2280 funding_cache.insert(
2281 Ustr::from("BTCUSDT"),
2282 (
2283 Some("-0.001".to_string()),
2284 Some("1000".to_string()),
2285 Some("8".to_string()),
2286 ),
2287 );
2288
2289 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
2290
2291 handle_ws_message(
2292 &BybitWsMessage::Reconnected,
2293 &tx,
2294 &instruments,
2295 None,
2296 &trade_subs,
2297 &ticker_subs,
2298 "e_depths,
2299 &book_depths,
2300 &greeks_subs,
2301 &bar_types,
2302 &mut quote_cache,
2303 &mut funding_cache,
2304 clock,
2305 );
2306
2307 assert!(quote_cache.is_empty());
2308 assert!(funding_cache.is_empty());
2309 }
2310
2311 #[rstest]
2312 fn test_handle_ticker_option_greeks() {
2313 let instrument = option_instrument();
2316 let instrument_id = instrument.id();
2317
2318 let ticker_key = Ustr::from("BTC-6JAN23-17500-C-OPTION");
2320 let mut instruments = AHashMap::new();
2321 instruments.insert(ticker_key, instrument);
2322
2323 let (trade_subs, ticker_subs, quote_depths, book_depths, greeks_subs, bar_types) =
2324 empty_subs();
2325 greeks_subs.insert(instrument_id);
2326
2327 let mut quote_cache = AHashMap::new();
2328 let mut funding_cache = AHashMap::new();
2329 let clock = get_atomic_clock_realtime();
2330
2331 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
2332
2333 let json = load_test_json("ws_ticker_option.json");
2334 let msg: BybitWsTickerOptionMsg = serde_json::from_str(&json).unwrap();
2335 let ws_msg = BybitWsMessage::TickerOption(msg);
2336
2337 handle_ws_message(
2338 &ws_msg,
2339 &tx,
2340 &instruments,
2341 Some(BybitProductType::Option),
2342 &trade_subs,
2343 &ticker_subs,
2344 "e_depths,
2345 &book_depths,
2346 &greeks_subs,
2347 &bar_types,
2348 &mut quote_cache,
2349 &mut funding_cache,
2350 clock,
2351 );
2352
2353 let event = rx.try_recv().unwrap();
2354 assert!(matches!(event, DataEvent::OptionGreeks(_)));
2355 }
2356
2357 #[rstest]
2358 fn test_handle_execution_message_ignored_by_data() {
2359 let instruments = AHashMap::new();
2360 let (trade_subs, ticker_subs, quote_depths, book_depths, greeks_subs, bar_types) =
2361 empty_subs();
2362 let mut quote_cache = AHashMap::new();
2363 let mut funding_cache = AHashMap::new();
2364 let clock = get_atomic_clock_realtime();
2365
2366 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
2367
2368 let json = load_test_json("ws_account_order.json");
2369 let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
2370 serde_json::from_str(&json).unwrap();
2371 let ws_msg = BybitWsMessage::AccountOrder(msg);
2372
2373 handle_ws_message(
2374 &ws_msg,
2375 &tx,
2376 &instruments,
2377 None,
2378 &trade_subs,
2379 &ticker_subs,
2380 "e_depths,
2381 &book_depths,
2382 &greeks_subs,
2383 &bar_types,
2384 &mut quote_cache,
2385 &mut funding_cache,
2386 clock,
2387 );
2388
2389 assert!(rx.try_recv().is_err());
2390 }
2391
2392 #[rstest]
2393 fn test_instrument_resolution_with_product_type() {
2394 let instrument = linear_instrument();
2395
2396 let mut map = AHashMap::new();
2397 map.insert(instrument.id().symbol.inner(), instrument.clone());
2398
2399 let (trade_subs, ticker_subs, quote_depths, book_depths, greeks_subs, bar_types) =
2400 empty_subs();
2401 trade_subs.insert(instrument.id());
2402 let mut quote_cache = AHashMap::new();
2403 let mut funding_cache = AHashMap::new();
2404 let clock = get_atomic_clock_realtime();
2405 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
2406
2407 let json = load_test_json("ws_public_trade.json");
2408 let msg: BybitWsTradeMsg = serde_json::from_str(&json).unwrap();
2409
2410 handle_ws_message(
2412 &BybitWsMessage::Trade(msg.clone()),
2413 &tx,
2414 &map,
2415 None,
2416 &trade_subs,
2417 &ticker_subs,
2418 "e_depths,
2419 &book_depths,
2420 &greeks_subs,
2421 &bar_types,
2422 &mut quote_cache,
2423 &mut funding_cache,
2424 clock,
2425 );
2426 assert!(rx.try_recv().is_err());
2427
2428 handle_ws_message(
2430 &BybitWsMessage::Trade(msg),
2431 &tx,
2432 &map,
2433 Some(BybitProductType::Linear),
2434 &trade_subs,
2435 &ticker_subs,
2436 "e_depths,
2437 &book_depths,
2438 &greeks_subs,
2439 &bar_types,
2440 &mut quote_cache,
2441 &mut funding_cache,
2442 clock,
2443 );
2444
2445 let event = rx.try_recv().unwrap();
2446 assert!(matches!(event, DataEvent::Data(Data::Trade(_))));
2447 }
2448
2449 #[rstest]
2450 fn test_handle_trade_filters_by_subscription() {
2451 let instrument = linear_instrument();
2452 let instruments = build_instruments(std::slice::from_ref(&instrument));
2453 let (trade_subs, ticker_subs, quote_depths, book_depths, greeks_subs, bar_types) =
2454 empty_subs();
2455 let mut quote_cache = AHashMap::new();
2456 let mut funding_cache = AHashMap::new();
2457 let clock = get_atomic_clock_realtime();
2458 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
2459
2460 let json = load_test_json("ws_public_trade.json");
2461 let msg: BybitWsTradeMsg = serde_json::from_str(&json).unwrap();
2462
2463 handle_ws_message(
2465 &BybitWsMessage::Trade(msg.clone()),
2466 &tx,
2467 &instruments,
2468 Some(BybitProductType::Linear),
2469 &trade_subs,
2470 &ticker_subs,
2471 "e_depths,
2472 &book_depths,
2473 &greeks_subs,
2474 &bar_types,
2475 &mut quote_cache,
2476 &mut funding_cache,
2477 clock,
2478 );
2479 assert!(rx.try_recv().is_err());
2480
2481 trade_subs.insert(instrument.id());
2483 handle_ws_message(
2484 &BybitWsMessage::Trade(msg),
2485 &tx,
2486 &instruments,
2487 Some(BybitProductType::Linear),
2488 &trade_subs,
2489 &ticker_subs,
2490 "e_depths,
2491 &book_depths,
2492 &greeks_subs,
2493 &bar_types,
2494 &mut quote_cache,
2495 &mut funding_cache,
2496 clock,
2497 );
2498 let event = rx.try_recv().unwrap();
2499 assert!(matches!(event, DataEvent::Data(Data::Trade(_))));
2500 }
2501}