1use std::{
19 future::Future,
20 sync::{
21 Arc,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use futures_util::StreamExt;
29use nautilus_common::{
30 cache::quote::QuoteCache,
31 clients::DataClient,
32 live::{runner::get_data_event_sender, runtime::get_runtime},
33 messages::{
34 DataEvent,
35 data::{
36 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
37 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
38 SubscribeBookDeltas, SubscribeBookDepth10, SubscribeFundingRates, SubscribeIndexPrices,
39 SubscribeInstrument, SubscribeInstrumentStatus, SubscribeInstruments,
40 SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
41 UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeFundingRates,
42 UnsubscribeIndexPrices, UnsubscribeInstrumentStatus, UnsubscribeMarkPrices,
43 UnsubscribeQuotes, UnsubscribeTrades,
44 },
45 },
46};
47use nautilus_core::{
48 AtomicMap, UnixNanos,
49 datetime::datetime_to_unix_nanos,
50 time::{AtomicTime, get_atomic_clock_realtime},
51};
52use nautilus_model::{
53 data::{Data, InstrumentStatus},
54 enums::{BookType, MarketStatusAction},
55 identifiers::{ClientId, InstrumentId, Venue},
56 instruments::{Instrument, InstrumentAny},
57 types::Price,
58};
59use tokio::{task::JoinHandle, time::Duration};
60use tokio_util::sync::CancellationToken;
61use ustr::Ustr;
62
63use crate::{
64 common::{
65 consts::BITMEX_VENUE,
66 enums::BitmexInstrumentState,
67 parse::{
68 parse_contracts_quantity, parse_instrument_id, parse_optional_datetime_to_unix_nanos,
69 },
70 },
71 config::BitmexDataClientConfig,
72 http::{
73 client::BitmexHttpClient,
74 parse::{InstrumentParseResult, parse_instrument_any},
75 },
76 websocket::{
77 client::BitmexWebSocketClient,
78 enums::{BitmexAction, BitmexBookChannel, BitmexWsTopic},
79 messages::{BitmexQuoteMsg, BitmexTableMessage, BitmexWsMessage},
80 parse::{
81 parse_book_msg_vec, parse_book10_msg_vec, parse_funding_msg, parse_instrument_msg,
82 parse_trade_bin_msg_vec, parse_trade_msg_vec,
83 },
84 },
85};
86
/// Live market data client for BitMEX built on an HTTP client and a websocket client.
#[derive(Debug)]
pub struct BitmexDataClient {
    /// Identifier reported by `client_id()` and used as the default client id in responses.
    client_id: ClientId,
    /// Realtime clock used to stamp `ts_init` on emitted data and responses.
    clock: &'static AtomicTime,
    /// Configuration the client was constructed with.
    config: BitmexDataClientConfig,
    /// HTTP client used for instrument bootstrap/refresh and for data requests.
    http_client: BitmexHttpClient,
    /// Websocket client; `None` until `connect` creates it.
    ws_client: Option<BitmexWebSocketClient>,
    /// Set at the end of `connect`; cleared by `stop`, `reset` and `disconnect`.
    is_connected: AtomicBool,
    /// Token cloned into background tasks; cancelled by `stop` and `disconnect`.
    cancellation_token: CancellationToken,
    /// Handles of the stream task and the instrument refresh task; awaited in `disconnect`.
    tasks: Vec<JoinHandle<()>>,
    /// Channel on which data events and responses are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Shared instrument cache keyed by instrument id.
    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
    /// Book channel each instrument was subscribed on, consulted when unsubscribing.
    book_channels: Arc<AtomicMap<InstrumentId, BitmexBookChannel>>,
    /// Guards against spawning the periodic instrument refresh task more than once.
    instrument_refresh_active: bool,
}
102
103impl BitmexDataClient {
104 pub fn new(client_id: ClientId, config: BitmexDataClientConfig) -> anyhow::Result<Self> {
110 let clock = get_atomic_clock_realtime();
111 let data_sender = get_data_event_sender();
112
113 let http_client = BitmexHttpClient::new(
114 Some(config.http_base_url()),
115 config.api_key.clone(),
116 config.api_secret.clone(),
117 config.environment,
118 config.http_timeout_secs,
119 config.max_retries,
120 config.retry_delay_initial_ms,
121 config.retry_delay_max_ms,
122 config.recv_window_ms,
123 config.max_requests_per_second,
124 config.max_requests_per_minute,
125 config.proxy_url.clone(),
126 )
127 .context("failed to construct BitMEX HTTP client")?;
128
129 Ok(Self {
130 client_id,
131 clock,
132 config,
133 http_client,
134 ws_client: None,
135 is_connected: AtomicBool::new(false),
136 cancellation_token: CancellationToken::new(),
137 tasks: Vec::new(),
138 data_sender,
139 instruments: Arc::new(AtomicMap::new()),
140 book_channels: Arc::new(AtomicMap::new()),
141 instrument_refresh_active: false,
142 })
143 }
144
    /// Returns the BitMEX venue constant (`BITMEX_VENUE`).
    fn venue(&self) -> Venue {
        *BITMEX_VENUE
    }
148
149 fn ws_client(&self) -> anyhow::Result<&BitmexWebSocketClient> {
150 self.ws_client
151 .as_ref()
152 .context("websocket client not initialized; call connect first")
153 }
154
155 fn ws_client_mut(&mut self) -> anyhow::Result<&mut BitmexWebSocketClient> {
156 self.ws_client
157 .as_mut()
158 .context("websocket client not initialized; call connect first")
159 }
160
161 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
162 if let Err(e) = sender.send(DataEvent::Data(data)) {
163 log::error!("Failed to emit data event: {e}");
164 }
165 }
166
167 fn spawn_ws<F>(&self, fut: F, context: &'static str)
168 where
169 F: Future<Output = anyhow::Result<()>> + Send + 'static,
170 {
171 get_runtime().spawn(async move {
172 if let Err(e) = fut.await {
173 log::error!("{context}: {e:?}");
174 }
175 });
176 }
177
178 fn spawn_stream_task(
179 &mut self,
180 stream: impl futures_util::Stream<Item = BitmexWsMessage> + Send + 'static,
181 ) {
182 let data_sender = self.data_sender.clone();
183 let instruments = Arc::clone(&self.instruments);
184 let cancellation = self.cancellation_token.clone();
185 let clock = self.clock;
186
187 let instruments_by_symbol: AHashMap<Ustr, InstrumentAny> = {
188 let guard = instruments.load();
189 guard
190 .values()
191 .map(|inst| (inst.symbol().inner(), inst.clone()))
192 .collect()
193 };
194
195 let handle = get_runtime().spawn(async move {
196 tokio::pin!(stream);
197 let mut quote_cache = QuoteCache::new();
198 let mut insts_by_symbol = instruments_by_symbol;
199
200 loop {
201 tokio::select! {
202 maybe_msg = stream.next() => {
203 match maybe_msg {
204 Some(msg) => Self::handle_ws_message(
205 clock.get_time_ns(),
206 msg,
207 &data_sender,
208 &instruments,
209 &mut insts_by_symbol,
210 &mut quote_cache,
211 ),
212 None => {
213 log::debug!("BitMEX websocket stream ended");
214 break;
215 }
216 }
217 }
218 () = cancellation.cancelled() => {
219 log::debug!("BitMEX websocket stream task cancelled");
220 break;
221 }
222 }
223 }
224 });
225
226 self.tasks.push(handle);
227 }
228
229 fn handle_ws_message(
230 ts_init: UnixNanos,
231 message: BitmexWsMessage,
232 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
233 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
234 instruments_by_symbol: &mut AHashMap<Ustr, InstrumentAny>,
235 quote_cache: &mut QuoteCache,
236 ) {
237 match message {
238 BitmexWsMessage::Table(table_msg) => {
239 match table_msg {
240 BitmexTableMessage::OrderBookL2 { action, data }
241 | BitmexTableMessage::OrderBookL2_25 { action, data } => {
242 if !data.is_empty() {
243 let parsed =
244 parse_book_msg_vec(data, action, instruments_by_symbol, ts_init);
245
246 for d in parsed {
247 Self::send_data(sender, d);
248 }
249 }
250 }
251 BitmexTableMessage::OrderBook10 { data, .. } => {
252 if !data.is_empty() {
253 let parsed = parse_book10_msg_vec(data, instruments_by_symbol, ts_init);
254 for d in parsed {
255 Self::send_data(sender, d);
256 }
257 }
258 }
259 BitmexTableMessage::Quote { data, .. } => {
260 handle_quote_messages(
261 data,
262 instruments_by_symbol,
263 quote_cache,
264 ts_init,
265 sender,
266 );
267 }
268 BitmexTableMessage::Trade { data, .. } => {
269 if !data.is_empty() {
270 let parsed = parse_trade_msg_vec(data, instruments_by_symbol, ts_init);
271 for d in parsed {
272 Self::send_data(sender, d);
273 }
274 }
275 }
276 BitmexTableMessage::TradeBin1m { action, data } => {
277 if action != BitmexAction::Partial && !data.is_empty() {
278 let parsed = parse_trade_bin_msg_vec(
279 data,
280 &BitmexWsTopic::TradeBin1m,
281 instruments_by_symbol,
282 ts_init,
283 );
284
285 for d in parsed {
286 Self::send_data(sender, d);
287 }
288 }
289 }
290 BitmexTableMessage::TradeBin5m { action, data } => {
291 if action != BitmexAction::Partial && !data.is_empty() {
292 let parsed = parse_trade_bin_msg_vec(
293 data,
294 &BitmexWsTopic::TradeBin5m,
295 instruments_by_symbol,
296 ts_init,
297 );
298
299 for d in parsed {
300 Self::send_data(sender, d);
301 }
302 }
303 }
304 BitmexTableMessage::TradeBin1h { action, data } => {
305 if action != BitmexAction::Partial && !data.is_empty() {
306 let parsed = parse_trade_bin_msg_vec(
307 data,
308 &BitmexWsTopic::TradeBin1h,
309 instruments_by_symbol,
310 ts_init,
311 );
312
313 for d in parsed {
314 Self::send_data(sender, d);
315 }
316 }
317 }
318 BitmexTableMessage::TradeBin1d { action, data } => {
319 if action != BitmexAction::Partial && !data.is_empty() {
320 let parsed = parse_trade_bin_msg_vec(
321 data,
322 &BitmexWsTopic::TradeBin1d,
323 instruments_by_symbol,
324 ts_init,
325 );
326
327 for d in parsed {
328 Self::send_data(sender, d);
329 }
330 }
331 }
332 BitmexTableMessage::Instrument { action, data } => {
333 Self::handle_instrument_msg(
334 action,
335 data,
336 ts_init,
337 sender,
338 instruments,
339 instruments_by_symbol,
340 );
341 }
342 BitmexTableMessage::Funding { data, .. } => {
343 for msg in data {
344 let update = parse_funding_msg(&msg, ts_init);
345 log::debug!(
346 "Funding rate update: instrument={}, rate={}",
347 update.instrument_id,
348 update.rate,
349 );
350
351 if let Err(e) = sender.send(DataEvent::FundingRate(update)) {
352 log::error!("Failed to emit funding rate event: {e}");
353 }
354 }
355 }
356 BitmexTableMessage::Order { .. }
358 | BitmexTableMessage::Execution { .. }
359 | BitmexTableMessage::Position { .. }
360 | BitmexTableMessage::Wallet { .. }
361 | BitmexTableMessage::Margin { .. } => {
362 log::debug!("Ignoring trading message on data client");
363 }
364 _ => {
365 log::warn!("Unhandled table message type on data client");
366 }
367 }
368 }
369 BitmexWsMessage::Reconnected => {
370 quote_cache.clear();
371 log::info!("BitMEX websocket reconnected");
372 }
373 BitmexWsMessage::Authenticated => {
374 log::debug!("BitMEX websocket authenticated");
375 }
376 }
377 }
378
379 fn handle_instrument_msg(
380 action: BitmexAction,
381 data: Vec<crate::websocket::messages::BitmexInstrumentMsg>,
382 ts_init: UnixNanos,
383 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
384 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
385 instruments_by_symbol: &mut AHashMap<Ustr, InstrumentAny>,
386 ) {
387 match action {
388 BitmexAction::Partial | BitmexAction::Insert => {
389 let mut new_instruments = Vec::with_capacity(data.len());
390 let mut temp_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
391
392 let data_for_prices = data.clone();
393
394 for msg in data {
395 match msg.try_into() {
396 Ok(http_inst) => match parse_instrument_any(&http_inst, ts_init) {
397 InstrumentParseResult::Ok(boxed) => {
398 let instrument_any = *boxed;
399 let symbol = instrument_any.symbol().inner();
400 temp_cache.insert(symbol, instrument_any.clone());
401 new_instruments.push(instrument_any);
402 }
403 InstrumentParseResult::Unsupported { .. }
404 | InstrumentParseResult::Inactive { .. } => {}
405 InstrumentParseResult::Failed {
406 symbol,
407 instrument_type,
408 error,
409 } => {
410 log::warn!(
411 "Failed to parse instrument {symbol} ({instrument_type:?}): {error}"
412 );
413 }
414 },
415 Err(e) => {
416 log::debug!("Skipping instrument (missing required fields): {e}");
417 }
418 }
419 }
420
421 instruments.rcu(|m| {
422 for inst in &new_instruments {
423 m.insert(inst.id(), inst.clone());
424 }
425 });
426
427 for (symbol, inst) in &temp_cache {
428 instruments_by_symbol.insert(*symbol, inst.clone());
429 }
430
431 for inst in new_instruments {
432 if let Err(e) = sender.send(DataEvent::Instrument(inst)) {
433 log::error!("Failed to send instrument event: {e}");
434 }
435 }
436
437 for msg in data_for_prices {
438 for d in parse_instrument_msg(&msg, &temp_cache, ts_init) {
439 Self::send_data(sender, d);
440 }
441 }
442 }
443 BitmexAction::Update => {
444 for msg in &data {
445 if let Some(state_str) = &msg.state
446 && let Ok(state) = serde_json::from_str::<BitmexInstrumentState>(&format!(
447 "\"{state_str}\""
448 ))
449 {
450 let instrument_id = parse_instrument_id(msg.symbol);
451 let action = MarketStatusAction::from(&state);
452 let is_trading = Some(state == BitmexInstrumentState::Open);
453 let ts_event = parse_optional_datetime_to_unix_nanos(
454 &Some(msg.timestamp),
455 "timestamp",
456 );
457 let status = InstrumentStatus::new(
458 instrument_id,
459 action,
460 ts_event,
461 ts_init,
462 None,
463 None,
464 is_trading,
465 None,
466 None,
467 );
468
469 if let Err(e) = sender.send(DataEvent::InstrumentStatus(status)) {
470 log::error!("Failed to send instrument status: {e}");
471 }
472 }
473 }
474
475 for msg in data {
477 for d in parse_instrument_msg(&msg, instruments_by_symbol, ts_init) {
478 Self::send_data(sender, d);
479 }
480 }
481 }
482 BitmexAction::Delete => {
483 log::info!(
484 "Received instrument delete action for {} instrument(s)",
485 data.len(),
486 );
487 }
488 }
489 }
490
491 async fn bootstrap_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
492 let http = self.http_client.clone();
493 let mut instruments = http
494 .request_instruments(self.config.active_only)
495 .await
496 .context("failed to request BitMEX instruments")?;
497
498 instruments.sort_by_key(|instrument| instrument.id());
499
500 self.instruments.rcu(|m| {
501 m.clear();
502 for instrument in &instruments {
503 m.insert(instrument.id(), instrument.clone());
504 }
505 });
506
507 self.http_client.cache_instruments(&instruments);
508
509 if let Some(ws) = &self.ws_client {
510 ws.cache_instruments(&instruments);
511 }
512
513 for instrument in &instruments {
514 if let Err(e) = self
515 .data_sender
516 .send(DataEvent::Instrument(instrument.clone()))
517 {
518 log::warn!(
519 "Failed to send instrument event for {}: {e}",
520 instrument.id()
521 );
522 }
523 }
524
525 Ok(instruments)
526 }
527
    /// Returns the current value of the `is_connected` flag (relaxed load).
    fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::Relaxed)
    }
531
    /// Returns the negation of [`Self::is_connected`].
    fn is_disconnected(&self) -> bool {
        !self.is_connected()
    }
535
536 fn maybe_spawn_instrument_refresh(&mut self) {
537 let Some(minutes) = self.config.update_instruments_interval_mins else {
538 return;
539 };
540
541 if minutes == 0 || self.instrument_refresh_active {
542 return;
543 }
544
545 let interval_secs = minutes.saturating_mul(60);
546 if interval_secs == 0 {
547 return;
548 }
549
550 let interval = Duration::from_secs(interval_secs);
551 let cancellation = self.cancellation_token.clone();
552 let instruments_cache = Arc::clone(&self.instruments);
553 let active_only = self.config.active_only;
554 let client_id = self.client_id;
555 let http_client = self.http_client.clone();
556
557 let handle = get_runtime().spawn(async move {
558 let http_client = http_client;
559
560 loop {
561 let sleep = tokio::time::sleep(interval);
562 tokio::pin!(sleep);
563 tokio::select! {
564 () = cancellation.cancelled() => {
565 log::debug!("BitMEX instrument refresh task cancelled");
566 break;
567 }
568 () = &mut sleep => {
569 match http_client.request_instruments(active_only).await {
570 Ok(mut instruments) => {
571 instruments.sort_by_key(|instrument| instrument.id());
572
573 instruments_cache.rcu(|m| {
574 m.clear();
575 for instrument in &instruments {
576 m.insert(instrument.id(), instrument.clone());
577 }
578 });
579
580 http_client.cache_instruments(&instruments);
581
582 log::debug!("BitMEX instruments refreshed: client_id={client_id}");
583 }
584 Err(e) => {
585 log::warn!("Failed to refresh BitMEX instruments: client_id={client_id}, error={e:?}");
586 }
587 }
588 }
589 }
590 }
591 });
592
593 self.tasks.push(handle);
594 self.instrument_refresh_active = true;
595 }
596}
597
598#[async_trait::async_trait(?Send)]
599impl DataClient for BitmexDataClient {
    /// Returns the client id this data client was constructed with.
    fn client_id(&self) -> ClientId {
        self.client_id
    }
603
    /// Returns the BitMEX venue; always `Some`.
    fn venue(&self) -> Option<Venue> {
        // Resolves to the inherent `venue` method, which takes precedence over this trait method.
        Some(self.venue())
    }
607
608 fn start(&mut self) -> anyhow::Result<()> {
609 log::info!(
610 "Starting BitMEX data client: client_id={}, environment={}, proxy_url={:?}",
611 self.client_id,
612 self.config.environment,
613 self.config.proxy_url,
614 );
615 Ok(())
616 }
617
618 fn stop(&mut self) -> anyhow::Result<()> {
619 log::info!("Stopping BitMEX data client {id}", id = self.client_id);
620 self.cancellation_token.cancel();
621 self.is_connected.store(false, Ordering::Relaxed);
622 self.instrument_refresh_active = false;
623 Ok(())
624 }
625
626 fn reset(&mut self) -> anyhow::Result<()> {
627 log::debug!("Resetting BitMEX data client {id}", id = self.client_id);
628 self.is_connected.store(false, Ordering::Relaxed);
629 self.cancellation_token = CancellationToken::new();
630 self.tasks.clear();
631 self.book_channels.store(AHashMap::new());
632 self.instrument_refresh_active = false;
633 Ok(())
634 }
635
    /// Disposes the client by delegating to `stop`.
    fn dispose(&mut self) -> anyhow::Result<()> {
        self.stop()
    }
639
640 async fn connect(&mut self) -> anyhow::Result<()> {
641 if self.is_connected() {
642 return Ok(());
643 }
644
645 if self.ws_client.is_none() {
646 let ws = BitmexWebSocketClient::new_with_env(
647 Some(self.config.ws_url()),
648 self.config.api_key.clone(),
649 self.config.api_secret.clone(),
650 None,
651 self.config.heartbeat_interval_secs.unwrap_or(5),
652 self.config.environment,
653 self.config.transport_backend,
654 self.config.proxy_url.clone(),
655 )
656 .context("failed to construct BitMEX websocket client")?;
657 self.ws_client = Some(ws);
658 }
659
660 self.bootstrap_instruments().await?;
661
662 let ws = self.ws_client_mut()?;
663 ws.connect()
664 .await
665 .context("failed to connect BitMEX websocket")?;
666 ws.wait_until_active(10.0)
667 .await
668 .context("BitMEX websocket did not become active")?;
669
670 let stream = ws.stream();
671 self.spawn_stream_task(stream);
672 self.maybe_spawn_instrument_refresh();
673
674 self.is_connected.store(true, Ordering::Relaxed);
675 log::info!("Connected");
676 Ok(())
677 }
678
679 async fn disconnect(&mut self) -> anyhow::Result<()> {
680 if self.is_disconnected() {
681 return Ok(());
682 }
683
684 self.cancellation_token.cancel();
685
686 if let Some(ws) = self.ws_client.as_mut()
687 && let Err(e) = ws.close().await
688 {
689 log::warn!("Error while closing BitMEX websocket: {e:?}");
690 }
691
692 for handle in self.tasks.drain(..) {
693 if let Err(e) = handle.await {
694 log::error!("Error joining websocket task: {e:?}");
695 }
696 }
697
698 self.cancellation_token = CancellationToken::new();
699 self.is_connected.store(false, Ordering::Relaxed);
700 self.book_channels.store(AHashMap::new());
701 self.instrument_refresh_active = false;
702
703 log::info!("Disconnected");
704 Ok(())
705 }
706
    /// Reports whether the client is connected.
    fn is_connected(&self) -> bool {
        // Resolves to the inherent `is_connected` method (inherent methods take precedence),
        // so this is not recursive.
        self.is_connected()
    }
710
    /// Reports whether the client is disconnected.
    fn is_disconnected(&self) -> bool {
        // Resolves to the inherent `is_disconnected` method (inherent methods take precedence),
        // so this is not recursive.
        self.is_disconnected()
    }
714
715 fn subscribe_instruments(&mut self, _cmd: SubscribeInstruments) -> anyhow::Result<()> {
716 let ws = self.ws_client()?.clone();
717
718 self.spawn_ws(
719 async move {
720 ws.subscribe_instruments()
721 .await
722 .map_err(|e| anyhow::anyhow!(e))
723 },
724 "BitMEX instruments subscription",
725 );
726 Ok(())
727 }
728
729 fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
730 let instrument_id = cmd.instrument_id;
731
732 if let Some(instrument) = self.instruments.load().get(&instrument_id).cloned() {
733 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
734 log::error!("Failed to send instrument event for {instrument_id}: {e}");
735 }
736 return Ok(());
737 }
738
739 log::warn!("Instrument {instrument_id} not found in BitMEX cache");
740
741 let ws = self.ws_client()?.clone();
742 self.spawn_ws(
743 async move {
744 ws.subscribe_instrument(instrument_id)
745 .await
746 .map_err(|e| anyhow::anyhow!(e))
747 },
748 "BitMEX instrument subscription",
749 );
750
751 Ok(())
752 }
753
754 fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
755 if cmd.book_type != BookType::L2_MBP {
756 anyhow::bail!("BitMEX only supports L2_MBP order book deltas");
757 }
758
759 let instrument_id = cmd.instrument_id;
760 let depth = cmd.depth.map_or(0, |d| d.get());
761 let channel = if depth > 0 && depth <= 25 {
762 if depth != 25 {
763 log::info!(
764 "BitMEX only supports depth 25 for L2 deltas, using L2_25 for requested depth {depth}"
765 );
766 }
767 BitmexBookChannel::OrderBookL2_25
768 } else {
769 BitmexBookChannel::OrderBookL2
770 };
771
772 let ws = self.ws_client()?.clone();
773 let book_channels = Arc::clone(&self.book_channels);
774
775 self.spawn_ws(
776 async move {
777 match channel {
778 BitmexBookChannel::OrderBookL2 => ws
779 .subscribe_book(instrument_id)
780 .await
781 .map_err(|e| anyhow::anyhow!(e))?,
782 BitmexBookChannel::OrderBookL2_25 => ws
783 .subscribe_book_25(instrument_id)
784 .await
785 .map_err(|e| anyhow::anyhow!(e))?,
786 BitmexBookChannel::OrderBook10 => unreachable!(),
787 }
788 book_channels.insert(instrument_id, channel);
789 Ok(())
790 },
791 "BitMEX book delta subscription",
792 );
793
794 Ok(())
795 }
796
797 fn subscribe_book_depth10(&mut self, cmd: SubscribeBookDepth10) -> anyhow::Result<()> {
798 let instrument_id = cmd.instrument_id;
799 let ws = self.ws_client()?.clone();
800 let book_channels = Arc::clone(&self.book_channels);
801
802 self.spawn_ws(
803 async move {
804 ws.subscribe_book_depth10(instrument_id)
805 .await
806 .map_err(|e| anyhow::anyhow!(e))?;
807 book_channels.insert(instrument_id, BitmexBookChannel::OrderBook10);
808 Ok(())
809 },
810 "BitMEX book depth10 subscription",
811 );
812 Ok(())
813 }
814
815 fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
816 let instrument_id = cmd.instrument_id;
817 let ws = self.ws_client()?.clone();
818
819 self.spawn_ws(
820 async move {
821 ws.subscribe_quotes(instrument_id)
822 .await
823 .map_err(|e| anyhow::anyhow!(e))
824 },
825 "BitMEX quote subscription",
826 );
827 Ok(())
828 }
829
830 fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
831 let instrument_id = cmd.instrument_id;
832 let ws = self.ws_client()?.clone();
833
834 self.spawn_ws(
835 async move {
836 ws.subscribe_trades(instrument_id)
837 .await
838 .map_err(|e| anyhow::anyhow!(e))
839 },
840 "BitMEX trade subscription",
841 );
842 Ok(())
843 }
844
845 fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
846 let instrument_id = cmd.instrument_id;
847 let ws = self.ws_client()?.clone();
848
849 self.spawn_ws(
850 async move {
851 ws.subscribe_mark_prices(instrument_id)
852 .await
853 .map_err(|e| anyhow::anyhow!(e))
854 },
855 "BitMEX mark price subscription",
856 );
857 Ok(())
858 }
859
860 fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
861 let instrument_id = cmd.instrument_id;
862 let ws = self.ws_client()?.clone();
863
864 self.spawn_ws(
865 async move {
866 ws.subscribe_index_prices(instrument_id)
867 .await
868 .map_err(|e| anyhow::anyhow!(e))
869 },
870 "BitMEX index price subscription",
871 );
872 Ok(())
873 }
874
875 fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
876 let instrument_id = cmd.instrument_id;
877 let ws = self.ws_client()?.clone();
878
879 self.spawn_ws(
880 async move {
881 ws.subscribe_funding_rates(instrument_id)
882 .await
883 .map_err(|e| anyhow::anyhow!(e))
884 },
885 "BitMEX funding rate subscription",
886 );
887 Ok(())
888 }
889
890 fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
891 let bar_type = cmd.bar_type;
892 let ws = self.ws_client()?.clone();
893
894 self.spawn_ws(
895 async move {
896 ws.subscribe_bars(bar_type)
897 .await
898 .map_err(|e| anyhow::anyhow!(e))
899 },
900 "BitMEX bar subscription",
901 );
902 Ok(())
903 }
904
905 fn subscribe_instrument_status(
906 &mut self,
907 cmd: SubscribeInstrumentStatus,
908 ) -> anyhow::Result<()> {
909 let instrument_id = cmd.instrument_id;
910 let ws = self.ws_client()?.clone();
911
912 self.spawn_ws(
913 async move {
914 ws.subscribe_instrument(instrument_id)
915 .await
916 .map_err(|e| anyhow::anyhow!(e))
917 },
918 "BitMEX instrument status subscription",
919 );
920 Ok(())
921 }
922
923 fn unsubscribe_instrument_status(
924 &mut self,
925 cmd: &UnsubscribeInstrumentStatus,
926 ) -> anyhow::Result<()> {
927 let instrument_id = cmd.instrument_id;
928 let ws = self.ws_client()?.clone();
929
930 self.spawn_ws(
931 async move {
932 ws.unsubscribe_instrument(instrument_id)
933 .await
934 .map_err(|e| anyhow::anyhow!(e))
935 },
936 "BitMEX instrument status unsubscribe",
937 );
938 Ok(())
939 }
940
941 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
942 let instrument_id = cmd.instrument_id;
943 let ws = self.ws_client()?.clone();
944 let book_channels = Arc::clone(&self.book_channels);
945
946 self.spawn_ws(
947 async move {
948 let channel = book_channels.load().get(&instrument_id).copied();
949 book_channels.remove(&instrument_id);
950
951 match channel {
952 Some(BitmexBookChannel::OrderBookL2) => ws
953 .unsubscribe_book(instrument_id)
954 .await
955 .map_err(|e| anyhow::anyhow!(e))?,
956 Some(BitmexBookChannel::OrderBookL2_25) => ws
957 .unsubscribe_book_25(instrument_id)
958 .await
959 .map_err(|e| anyhow::anyhow!(e))?,
960 Some(BitmexBookChannel::OrderBook10) => ws
961 .unsubscribe_book_depth10(instrument_id)
962 .await
963 .map_err(|e| anyhow::anyhow!(e))?,
964 None => ws
965 .unsubscribe_book(instrument_id)
966 .await
967 .map_err(|e| anyhow::anyhow!(e))?,
968 }
969 Ok(())
970 },
971 "BitMEX book delta unsubscribe",
972 );
973 Ok(())
974 }
975
976 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
977 let instrument_id = cmd.instrument_id;
978 let ws = self.ws_client()?.clone();
979 let book_channels = Arc::clone(&self.book_channels);
980
981 self.spawn_ws(
982 async move {
983 book_channels.remove(&instrument_id);
984 ws.unsubscribe_book_depth10(instrument_id)
985 .await
986 .map_err(|e| anyhow::anyhow!(e))
987 },
988 "BitMEX book depth10 unsubscribe",
989 );
990 Ok(())
991 }
992
993 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
994 let instrument_id = cmd.instrument_id;
995 let ws = self.ws_client()?.clone();
996
997 self.spawn_ws(
998 async move {
999 ws.unsubscribe_quotes(instrument_id)
1000 .await
1001 .map_err(|e| anyhow::anyhow!(e))
1002 },
1003 "BitMEX quote unsubscribe",
1004 );
1005 Ok(())
1006 }
1007
1008 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
1009 let instrument_id = cmd.instrument_id;
1010 let ws = self.ws_client()?.clone();
1011
1012 self.spawn_ws(
1013 async move {
1014 ws.unsubscribe_trades(instrument_id)
1015 .await
1016 .map_err(|e| anyhow::anyhow!(e))
1017 },
1018 "BitMEX trade unsubscribe",
1019 );
1020 Ok(())
1021 }
1022
1023 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1024 let ws = self.ws_client()?.clone();
1025 let instrument_id = cmd.instrument_id;
1026
1027 self.spawn_ws(
1028 async move {
1029 ws.unsubscribe_mark_prices(instrument_id)
1030 .await
1031 .map_err(|e| anyhow::anyhow!(e))
1032 },
1033 "BitMEX mark price unsubscribe",
1034 );
1035 Ok(())
1036 }
1037
1038 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1039 let ws = self.ws_client()?.clone();
1040 let instrument_id = cmd.instrument_id;
1041
1042 self.spawn_ws(
1043 async move {
1044 ws.unsubscribe_index_prices(instrument_id)
1045 .await
1046 .map_err(|e| anyhow::anyhow!(e))
1047 },
1048 "BitMEX index price unsubscribe",
1049 );
1050 Ok(())
1051 }
1052
1053 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1054 let ws = self.ws_client()?.clone();
1055 let instrument_id = cmd.instrument_id;
1056
1057 self.spawn_ws(
1058 async move {
1059 ws.unsubscribe_funding_rates(instrument_id)
1060 .await
1061 .map_err(|e| anyhow::anyhow!(e))
1062 },
1063 "BitMEX funding rate unsubscribe",
1064 );
1065 Ok(())
1066 }
1067
1068 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1069 let bar_type = cmd.bar_type;
1070 let ws = self.ws_client()?.clone();
1071
1072 self.spawn_ws(
1073 async move {
1074 ws.unsubscribe_bars(bar_type)
1075 .await
1076 .map_err(|e| anyhow::anyhow!(e))
1077 },
1078 "BitMEX bar unsubscribe",
1079 );
1080 Ok(())
1081 }
1082
1083 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
1084 if let Some(req_venue) = request.venue
1085 && req_venue != self.venue()
1086 {
1087 log::warn!("Ignoring mismatched venue in instruments request: {req_venue}");
1088 }
1089 let venue = self.venue();
1090
1091 let http = self.http_client.clone();
1092 let instruments_cache = Arc::clone(&self.instruments);
1093 let sender = self.data_sender.clone();
1094 let request_id = request.request_id;
1095 let client_id = request.client_id.unwrap_or(self.client_id);
1096 let params = request.params;
1097 let start_nanos = datetime_to_unix_nanos(request.start);
1098 let end_nanos = datetime_to_unix_nanos(request.end);
1099 let clock = self.clock;
1100 let active_only = self.config.active_only;
1101
1102 get_runtime().spawn(async move {
1103 let http_client = http;
1104 match http_client
1105 .request_instruments(active_only)
1106 .await
1107 .context("failed to request instruments from BitMEX")
1108 {
1109 Ok(instruments) => {
1110 instruments_cache.rcu(|m| {
1111 m.clear();
1112 for instrument in &instruments {
1113 m.insert(instrument.id(), instrument.clone());
1114 }
1115 });
1116 http_client.cache_instruments(&instruments);
1117
1118 let response = DataResponse::Instruments(InstrumentsResponse::new(
1119 request_id,
1120 client_id,
1121 venue,
1122 instruments,
1123 start_nanos,
1124 end_nanos,
1125 clock.get_time_ns(),
1126 params,
1127 ));
1128
1129 if let Err(e) = sender.send(DataEvent::Response(response)) {
1130 log::error!("Failed to send instruments response: {e}");
1131 }
1132 }
1133 Err(e) => log::error!("Instrument request failed: {e:?}"),
1134 }
1135 });
1136
1137 Ok(())
1138 }
1139
1140 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
1141 if let Some(instrument) = self.instruments.load().get(&request.instrument_id).cloned() {
1142 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1143 request.request_id,
1144 request.client_id.unwrap_or(self.client_id),
1145 instrument.id(),
1146 instrument,
1147 datetime_to_unix_nanos(request.start),
1148 datetime_to_unix_nanos(request.end),
1149 self.clock.get_time_ns(),
1150 request.params,
1151 )));
1152
1153 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
1154 log::error!("Failed to send instrument response: {e}");
1155 }
1156 return Ok(());
1157 }
1158
1159 let http_client = self.http_client.clone();
1160 let instruments_cache = Arc::clone(&self.instruments);
1161 let sender = self.data_sender.clone();
1162 let instrument_id = request.instrument_id;
1163 let request_id = request.request_id;
1164 let client_id = request.client_id.unwrap_or(self.client_id);
1165 let start = request.start;
1166 let end = request.end;
1167 let params = request.params;
1168 let clock = self.clock;
1169
1170 get_runtime().spawn(async move {
1171 match http_client
1172 .request_instrument(instrument_id)
1173 .await
1174 .context("failed to request instrument from BitMEX")
1175 {
1176 Ok(Some(instrument)) => {
1177 http_client.cache_instrument(instrument.clone());
1178 instruments_cache.insert(instrument.id(), instrument.clone());
1179
1180 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1181 request_id,
1182 client_id,
1183 instrument.id(),
1184 instrument,
1185 datetime_to_unix_nanos(start),
1186 datetime_to_unix_nanos(end),
1187 clock.get_time_ns(),
1188 params,
1189 )));
1190
1191 if let Err(e) = sender.send(DataEvent::Response(response)) {
1192 log::error!("Failed to send instrument response: {e}");
1193 }
1194 }
1195 Ok(None) => log::warn!("BitMEX instrument {instrument_id} not found"),
1196 Err(e) => log::error!("Instrument request failed: {e:?}"),
1197 }
1198 });
1199
1200 Ok(())
1201 }
1202
1203 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1204 let http = self.http_client.clone();
1205 let sender = self.data_sender.clone();
1206 let instrument_id = request.instrument_id;
1207 let start = request.start;
1208 let end = request.end;
1209 let limit = request.limit.map(|n| n.get() as u32);
1210 let request_id = request.request_id;
1211 let client_id = request.client_id.unwrap_or(self.client_id);
1212 let params = request.params;
1213 let clock = self.clock;
1214 let start_nanos = datetime_to_unix_nanos(start);
1215 let end_nanos = datetime_to_unix_nanos(end);
1216
1217 get_runtime().spawn(async move {
1218 match http
1219 .request_trades(instrument_id, start, end, limit)
1220 .await
1221 .context("failed to request trades from BitMEX")
1222 {
1223 Ok(trades) => {
1224 let response = DataResponse::Trades(TradesResponse::new(
1225 request_id,
1226 client_id,
1227 instrument_id,
1228 trades,
1229 start_nanos,
1230 end_nanos,
1231 clock.get_time_ns(),
1232 params,
1233 ));
1234
1235 if let Err(e) = sender.send(DataEvent::Response(response)) {
1236 log::error!("Failed to send trades response: {e}");
1237 }
1238 }
1239 Err(e) => log::error!("Trade request failed: {e:?}"),
1240 }
1241 });
1242
1243 Ok(())
1244 }
1245
1246 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
1247 let http = self.http_client.clone();
1248 let sender = self.data_sender.clone();
1249 let bar_type = request.bar_type;
1250 let start = request.start;
1251 let end = request.end;
1252 let limit = request.limit.map(|n| n.get() as u32);
1253 let request_id = request.request_id;
1254 let client_id = request.client_id.unwrap_or(self.client_id);
1255 let params = request.params;
1256 let clock = self.clock;
1257 let start_nanos = datetime_to_unix_nanos(start);
1258 let end_nanos = datetime_to_unix_nanos(end);
1259
1260 get_runtime().spawn(async move {
1261 match http
1262 .request_bars(bar_type, start, end, limit, false)
1263 .await
1264 .context("failed to request bars from BitMEX")
1265 {
1266 Ok(bars) => {
1267 let response = DataResponse::Bars(BarsResponse::new(
1268 request_id,
1269 client_id,
1270 bar_type,
1271 bars,
1272 start_nanos,
1273 end_nanos,
1274 clock.get_time_ns(),
1275 params,
1276 ));
1277
1278 if let Err(e) = sender.send(DataEvent::Response(response)) {
1279 log::error!("Failed to send bars response: {e}");
1280 }
1281 }
1282 Err(e) => log::error!("Bar request failed: {e:?}"),
1283 }
1284 });
1285
1286 Ok(())
1287 }
1288}
1289
1290fn handle_quote_messages(
1291 data: Vec<BitmexQuoteMsg>,
1292 instruments_by_symbol: &AHashMap<Ustr, InstrumentAny>,
1293 quote_cache: &mut QuoteCache,
1294 ts_init: UnixNanos,
1295 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1296) {
1297 for msg in data {
1298 let Some(instrument) = instruments_by_symbol.get(&msg.symbol) else {
1299 log::error!(
1300 "Instrument cache miss: quote dropped for symbol={}",
1301 msg.symbol,
1302 );
1303 continue;
1304 };
1305
1306 let instrument_id = instrument.id();
1307 let price_precision = instrument.price_precision();
1308
1309 let bid_price = msg.bid_price.map(|p| Price::new(p, price_precision));
1310 let ask_price = msg.ask_price.map(|p| Price::new(p, price_precision));
1311 let bid_size = msg
1312 .bid_size
1313 .map(|s| parse_contracts_quantity(s, instrument));
1314 let ask_size = msg
1315 .ask_size
1316 .map(|s| parse_contracts_quantity(s, instrument));
1317 let ts_event = UnixNanos::from(msg.timestamp);
1318
1319 match quote_cache.process(
1320 instrument_id,
1321 bid_price,
1322 ask_price,
1323 bid_size,
1324 ask_size,
1325 ts_event,
1326 ts_init,
1327 ) {
1328 Ok(quote) => {
1329 if let Err(e) = sender.send(DataEvent::Data(Data::Quote(quote))) {
1330 log::error!("Failed to emit data event: {e}");
1331 }
1332 }
1333 Err(e) => {
1334 log::warn!("Failed to process quote for {}: {e}", msg.symbol);
1335 }
1336 }
1337 }
1338}