Skip to main content

nautilus_bitmex/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the BitMEX adapter.
17
18use std::{
19    future::Future,
20    sync::{
21        Arc,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use futures_util::StreamExt;
29use nautilus_common::{
30    cache::quote::QuoteCache,
31    clients::DataClient,
32    live::{runner::get_data_event_sender, runtime::get_runtime},
33    messages::{
34        DataEvent,
35        data::{
36            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
37            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
38            SubscribeBookDeltas, SubscribeBookDepth10, SubscribeFundingRates, SubscribeIndexPrices,
39            SubscribeInstrument, SubscribeInstrumentStatus, SubscribeInstruments,
40            SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
41            UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeFundingRates,
42            UnsubscribeIndexPrices, UnsubscribeInstrumentStatus, UnsubscribeMarkPrices,
43            UnsubscribeQuotes, UnsubscribeTrades,
44        },
45    },
46};
47use nautilus_core::{
48    AtomicMap, UnixNanos,
49    datetime::datetime_to_unix_nanos,
50    time::{AtomicTime, get_atomic_clock_realtime},
51};
52use nautilus_model::{
53    data::{Data, InstrumentStatus},
54    enums::{BookType, MarketStatusAction},
55    identifiers::{ClientId, InstrumentId, Venue},
56    instruments::{Instrument, InstrumentAny},
57    types::Price,
58};
59use tokio::{task::JoinHandle, time::Duration};
60use tokio_util::sync::CancellationToken;
61use ustr::Ustr;
62
63use crate::{
64    common::{
65        consts::BITMEX_VENUE,
66        enums::BitmexInstrumentState,
67        parse::{
68            parse_contracts_quantity, parse_instrument_id, parse_optional_datetime_to_unix_nanos,
69        },
70    },
71    config::BitmexDataClientConfig,
72    http::{
73        client::BitmexHttpClient,
74        parse::{InstrumentParseResult, parse_instrument_any},
75    },
76    websocket::{
77        client::BitmexWebSocketClient,
78        enums::{BitmexAction, BitmexBookChannel, BitmexWsTopic},
79        messages::{BitmexQuoteMsg, BitmexTableMessage, BitmexWsMessage},
80        parse::{
81            parse_book_msg_vec, parse_book10_msg_vec, parse_funding_msg, parse_instrument_msg,
82            parse_trade_bin_msg_vec, parse_trade_msg_vec,
83        },
84    },
85};
86
87#[derive(Debug)]
88pub struct BitmexDataClient {
89    client_id: ClientId,
90    clock: &'static AtomicTime,
91    config: BitmexDataClientConfig,
92    http_client: BitmexHttpClient,
93    ws_client: Option<BitmexWebSocketClient>,
94    is_connected: AtomicBool,
95    cancellation_token: CancellationToken,
96    tasks: Vec<JoinHandle<()>>,
97    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
98    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
99    book_channels: Arc<AtomicMap<InstrumentId, BitmexBookChannel>>,
100    instrument_refresh_active: bool,
101}
102
103impl BitmexDataClient {
104    /// Creates a new [`BitmexDataClient`] instance.
105    ///
106    /// # Errors
107    ///
108    /// Returns an error if the HTTP client cannot be constructed.
109    pub fn new(client_id: ClientId, config: BitmexDataClientConfig) -> anyhow::Result<Self> {
110        let clock = get_atomic_clock_realtime();
111        let data_sender = get_data_event_sender();
112
113        let http_client = BitmexHttpClient::new(
114            Some(config.http_base_url()),
115            config.api_key.clone(),
116            config.api_secret.clone(),
117            config.environment,
118            config.http_timeout_secs,
119            config.max_retries,
120            config.retry_delay_initial_ms,
121            config.retry_delay_max_ms,
122            config.recv_window_ms,
123            config.max_requests_per_second,
124            config.max_requests_per_minute,
125            config.proxy_url.clone(),
126        )
127        .context("failed to construct BitMEX HTTP client")?;
128
129        Ok(Self {
130            client_id,
131            clock,
132            config,
133            http_client,
134            ws_client: None,
135            is_connected: AtomicBool::new(false),
136            cancellation_token: CancellationToken::new(),
137            tasks: Vec::new(),
138            data_sender,
139            instruments: Arc::new(AtomicMap::new()),
140            book_channels: Arc::new(AtomicMap::new()),
141            instrument_refresh_active: false,
142        })
143    }
144
145    fn venue(&self) -> Venue {
146        *BITMEX_VENUE
147    }
148
149    fn ws_client(&self) -> anyhow::Result<&BitmexWebSocketClient> {
150        self.ws_client
151            .as_ref()
152            .context("websocket client not initialized; call connect first")
153    }
154
155    fn ws_client_mut(&mut self) -> anyhow::Result<&mut BitmexWebSocketClient> {
156        self.ws_client
157            .as_mut()
158            .context("websocket client not initialized; call connect first")
159    }
160
161    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
162        if let Err(e) = sender.send(DataEvent::Data(data)) {
163            log::error!("Failed to emit data event: {e}");
164        }
165    }
166
167    fn spawn_ws<F>(&self, fut: F, context: &'static str)
168    where
169        F: Future<Output = anyhow::Result<()>> + Send + 'static,
170    {
171        get_runtime().spawn(async move {
172            if let Err(e) = fut.await {
173                log::error!("{context}: {e:?}");
174            }
175        });
176    }
177
178    fn spawn_stream_task(
179        &mut self,
180        stream: impl futures_util::Stream<Item = BitmexWsMessage> + Send + 'static,
181    ) {
182        let data_sender = self.data_sender.clone();
183        let instruments = Arc::clone(&self.instruments);
184        let cancellation = self.cancellation_token.clone();
185        let clock = self.clock;
186
187        let instruments_by_symbol: AHashMap<Ustr, InstrumentAny> = {
188            let guard = instruments.load();
189            guard
190                .values()
191                .map(|inst| (inst.symbol().inner(), inst.clone()))
192                .collect()
193        };
194
195        let handle = get_runtime().spawn(async move {
196            tokio::pin!(stream);
197            let mut quote_cache = QuoteCache::new();
198            let mut insts_by_symbol = instruments_by_symbol;
199
200            loop {
201                tokio::select! {
202                    maybe_msg = stream.next() => {
203                        match maybe_msg {
204                            Some(msg) => Self::handle_ws_message(
205                                clock.get_time_ns(),
206                                msg,
207                                &data_sender,
208                                &instruments,
209                                &mut insts_by_symbol,
210                                &mut quote_cache,
211                            ),
212                            None => {
213                                log::debug!("BitMEX websocket stream ended");
214                                break;
215                            }
216                        }
217                    }
218                    () = cancellation.cancelled() => {
219                        log::debug!("BitMEX websocket stream task cancelled");
220                        break;
221                    }
222                }
223            }
224        });
225
226        self.tasks.push(handle);
227    }
228
229    fn handle_ws_message(
230        ts_init: UnixNanos,
231        message: BitmexWsMessage,
232        sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
233        instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
234        instruments_by_symbol: &mut AHashMap<Ustr, InstrumentAny>,
235        quote_cache: &mut QuoteCache,
236    ) {
237        match message {
238            BitmexWsMessage::Table(table_msg) => {
239                match table_msg {
240                    BitmexTableMessage::OrderBookL2 { action, data }
241                    | BitmexTableMessage::OrderBookL2_25 { action, data } => {
242                        if !data.is_empty() {
243                            let parsed =
244                                parse_book_msg_vec(data, action, instruments_by_symbol, ts_init);
245
246                            for d in parsed {
247                                Self::send_data(sender, d);
248                            }
249                        }
250                    }
251                    BitmexTableMessage::OrderBook10 { data, .. } => {
252                        if !data.is_empty() {
253                            let parsed = parse_book10_msg_vec(data, instruments_by_symbol, ts_init);
254                            for d in parsed {
255                                Self::send_data(sender, d);
256                            }
257                        }
258                    }
259                    BitmexTableMessage::Quote { data, .. } => {
260                        handle_quote_messages(
261                            data,
262                            instruments_by_symbol,
263                            quote_cache,
264                            ts_init,
265                            sender,
266                        );
267                    }
268                    BitmexTableMessage::Trade { data, .. } => {
269                        if !data.is_empty() {
270                            let parsed = parse_trade_msg_vec(data, instruments_by_symbol, ts_init);
271                            for d in parsed {
272                                Self::send_data(sender, d);
273                            }
274                        }
275                    }
276                    BitmexTableMessage::TradeBin1m { action, data } => {
277                        if action != BitmexAction::Partial && !data.is_empty() {
278                            let parsed = parse_trade_bin_msg_vec(
279                                data,
280                                &BitmexWsTopic::TradeBin1m,
281                                instruments_by_symbol,
282                                ts_init,
283                            );
284
285                            for d in parsed {
286                                Self::send_data(sender, d);
287                            }
288                        }
289                    }
290                    BitmexTableMessage::TradeBin5m { action, data } => {
291                        if action != BitmexAction::Partial && !data.is_empty() {
292                            let parsed = parse_trade_bin_msg_vec(
293                                data,
294                                &BitmexWsTopic::TradeBin5m,
295                                instruments_by_symbol,
296                                ts_init,
297                            );
298
299                            for d in parsed {
300                                Self::send_data(sender, d);
301                            }
302                        }
303                    }
304                    BitmexTableMessage::TradeBin1h { action, data } => {
305                        if action != BitmexAction::Partial && !data.is_empty() {
306                            let parsed = parse_trade_bin_msg_vec(
307                                data,
308                                &BitmexWsTopic::TradeBin1h,
309                                instruments_by_symbol,
310                                ts_init,
311                            );
312
313                            for d in parsed {
314                                Self::send_data(sender, d);
315                            }
316                        }
317                    }
318                    BitmexTableMessage::TradeBin1d { action, data } => {
319                        if action != BitmexAction::Partial && !data.is_empty() {
320                            let parsed = parse_trade_bin_msg_vec(
321                                data,
322                                &BitmexWsTopic::TradeBin1d,
323                                instruments_by_symbol,
324                                ts_init,
325                            );
326
327                            for d in parsed {
328                                Self::send_data(sender, d);
329                            }
330                        }
331                    }
332                    BitmexTableMessage::Instrument { action, data } => {
333                        Self::handle_instrument_msg(
334                            action,
335                            data,
336                            ts_init,
337                            sender,
338                            instruments,
339                            instruments_by_symbol,
340                        );
341                    }
342                    BitmexTableMessage::Funding { data, .. } => {
343                        for msg in data {
344                            let update = parse_funding_msg(&msg, ts_init);
345                            log::debug!(
346                                "Funding rate update: instrument={}, rate={}",
347                                update.instrument_id,
348                                update.rate,
349                            );
350
351                            if let Err(e) = sender.send(DataEvent::FundingRate(update)) {
352                                log::error!("Failed to emit funding rate event: {e}");
353                            }
354                        }
355                    }
356                    // Ignore execution-only tables on data client
357                    BitmexTableMessage::Order { .. }
358                    | BitmexTableMessage::Execution { .. }
359                    | BitmexTableMessage::Position { .. }
360                    | BitmexTableMessage::Wallet { .. }
361                    | BitmexTableMessage::Margin { .. } => {
362                        log::debug!("Ignoring trading message on data client");
363                    }
364                    _ => {
365                        log::warn!("Unhandled table message type on data client");
366                    }
367                }
368            }
369            BitmexWsMessage::Reconnected => {
370                quote_cache.clear();
371                log::info!("BitMEX websocket reconnected");
372            }
373            BitmexWsMessage::Authenticated => {
374                log::debug!("BitMEX websocket authenticated");
375            }
376        }
377    }
378
379    fn handle_instrument_msg(
380        action: BitmexAction,
381        data: Vec<crate::websocket::messages::BitmexInstrumentMsg>,
382        ts_init: UnixNanos,
383        sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
384        instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
385        instruments_by_symbol: &mut AHashMap<Ustr, InstrumentAny>,
386    ) {
387        match action {
388            BitmexAction::Partial | BitmexAction::Insert => {
389                let mut new_instruments = Vec::with_capacity(data.len());
390                let mut temp_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
391
392                let data_for_prices = data.clone();
393
394                for msg in data {
395                    match msg.try_into() {
396                        Ok(http_inst) => match parse_instrument_any(&http_inst, ts_init) {
397                            InstrumentParseResult::Ok(boxed) => {
398                                let instrument_any = *boxed;
399                                let symbol = instrument_any.symbol().inner();
400                                temp_cache.insert(symbol, instrument_any.clone());
401                                new_instruments.push(instrument_any);
402                            }
403                            InstrumentParseResult::Unsupported { .. }
404                            | InstrumentParseResult::Inactive { .. } => {}
405                            InstrumentParseResult::Failed {
406                                symbol,
407                                instrument_type,
408                                error,
409                            } => {
410                                log::warn!(
411                                    "Failed to parse instrument {symbol} ({instrument_type:?}): {error}"
412                                );
413                            }
414                        },
415                        Err(e) => {
416                            log::debug!("Skipping instrument (missing required fields): {e}");
417                        }
418                    }
419                }
420
421                instruments.rcu(|m| {
422                    for inst in &new_instruments {
423                        m.insert(inst.id(), inst.clone());
424                    }
425                });
426
427                for (symbol, inst) in &temp_cache {
428                    instruments_by_symbol.insert(*symbol, inst.clone());
429                }
430
431                for inst in new_instruments {
432                    if let Err(e) = sender.send(DataEvent::Instrument(inst)) {
433                        log::error!("Failed to send instrument event: {e}");
434                    }
435                }
436
437                for msg in data_for_prices {
438                    for d in parse_instrument_msg(&msg, &temp_cache, ts_init) {
439                        Self::send_data(sender, d);
440                    }
441                }
442            }
443            BitmexAction::Update => {
444                for msg in &data {
445                    if let Some(state_str) = &msg.state
446                        && let Ok(state) = serde_json::from_str::<BitmexInstrumentState>(&format!(
447                            "\"{state_str}\""
448                        ))
449                    {
450                        let instrument_id = parse_instrument_id(msg.symbol);
451                        let action = MarketStatusAction::from(&state);
452                        let is_trading = Some(state == BitmexInstrumentState::Open);
453                        let ts_event = parse_optional_datetime_to_unix_nanos(
454                            &Some(msg.timestamp),
455                            "timestamp",
456                        );
457                        let status = InstrumentStatus::new(
458                            instrument_id,
459                            action,
460                            ts_event,
461                            ts_init,
462                            None,
463                            None,
464                            is_trading,
465                            None,
466                            None,
467                        );
468
469                        if let Err(e) = sender.send(DataEvent::InstrumentStatus(status)) {
470                            log::error!("Failed to send instrument status: {e}");
471                        }
472                    }
473                }
474
475                // Parse mark/index price data
476                for msg in data {
477                    for d in parse_instrument_msg(&msg, instruments_by_symbol, ts_init) {
478                        Self::send_data(sender, d);
479                    }
480                }
481            }
482            BitmexAction::Delete => {
483                log::info!(
484                    "Received instrument delete action for {} instrument(s)",
485                    data.len(),
486                );
487            }
488        }
489    }
490
491    async fn bootstrap_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
492        let http = self.http_client.clone();
493        let mut instruments = http
494            .request_instruments(self.config.active_only)
495            .await
496            .context("failed to request BitMEX instruments")?;
497
498        instruments.sort_by_key(|instrument| instrument.id());
499
500        self.instruments.rcu(|m| {
501            m.clear();
502            for instrument in &instruments {
503                m.insert(instrument.id(), instrument.clone());
504            }
505        });
506
507        self.http_client.cache_instruments(&instruments);
508
509        if let Some(ws) = &self.ws_client {
510            ws.cache_instruments(&instruments);
511        }
512
513        for instrument in &instruments {
514            if let Err(e) = self
515                .data_sender
516                .send(DataEvent::Instrument(instrument.clone()))
517            {
518                log::warn!(
519                    "Failed to send instrument event for {}: {e}",
520                    instrument.id()
521                );
522            }
523        }
524
525        Ok(instruments)
526    }
527
528    fn is_connected(&self) -> bool {
529        self.is_connected.load(Ordering::Relaxed)
530    }
531
532    fn is_disconnected(&self) -> bool {
533        !self.is_connected()
534    }
535
536    fn maybe_spawn_instrument_refresh(&mut self) {
537        let Some(minutes) = self.config.update_instruments_interval_mins else {
538            return;
539        };
540
541        if minutes == 0 || self.instrument_refresh_active {
542            return;
543        }
544
545        let interval_secs = minutes.saturating_mul(60);
546        if interval_secs == 0 {
547            return;
548        }
549
550        let interval = Duration::from_secs(interval_secs);
551        let cancellation = self.cancellation_token.clone();
552        let instruments_cache = Arc::clone(&self.instruments);
553        let active_only = self.config.active_only;
554        let client_id = self.client_id;
555        let http_client = self.http_client.clone();
556
557        let handle = get_runtime().spawn(async move {
558            let http_client = http_client;
559
560            loop {
561                let sleep = tokio::time::sleep(interval);
562                tokio::pin!(sleep);
563                tokio::select! {
564                    () = cancellation.cancelled() => {
565                        log::debug!("BitMEX instrument refresh task cancelled");
566                        break;
567                    }
568                    () = &mut sleep => {
569                        match http_client.request_instruments(active_only).await {
570                            Ok(mut instruments) => {
571                                instruments.sort_by_key(|instrument| instrument.id());
572
573                                instruments_cache.rcu(|m| {
574                                    m.clear();
575                                    for instrument in &instruments {
576                                        m.insert(instrument.id(), instrument.clone());
577                                    }
578                                });
579
580                                http_client.cache_instruments(&instruments);
581
582                                log::debug!("BitMEX instruments refreshed: client_id={client_id}");
583                            }
584                            Err(e) => {
585                                log::warn!("Failed to refresh BitMEX instruments: client_id={client_id}, error={e:?}");
586                            }
587                        }
588                    }
589                }
590            }
591        });
592
593        self.tasks.push(handle);
594        self.instrument_refresh_active = true;
595    }
596}
597
598#[async_trait::async_trait(?Send)]
599impl DataClient for BitmexDataClient {
600    fn client_id(&self) -> ClientId {
601        self.client_id
602    }
603
604    fn venue(&self) -> Option<Venue> {
605        Some(self.venue())
606    }
607
608    fn start(&mut self) -> anyhow::Result<()> {
609        log::info!(
610            "Starting BitMEX data client: client_id={}, environment={}, proxy_url={:?}",
611            self.client_id,
612            self.config.environment,
613            self.config.proxy_url,
614        );
615        Ok(())
616    }
617
618    fn stop(&mut self) -> anyhow::Result<()> {
619        log::info!("Stopping BitMEX data client {id}", id = self.client_id);
620        self.cancellation_token.cancel();
621        self.is_connected.store(false, Ordering::Relaxed);
622        self.instrument_refresh_active = false;
623        Ok(())
624    }
625
626    fn reset(&mut self) -> anyhow::Result<()> {
627        log::debug!("Resetting BitMEX data client {id}", id = self.client_id);
628        self.is_connected.store(false, Ordering::Relaxed);
629        self.cancellation_token = CancellationToken::new();
630        self.tasks.clear();
631        self.book_channels.store(AHashMap::new());
632        self.instrument_refresh_active = false;
633        Ok(())
634    }
635
636    fn dispose(&mut self) -> anyhow::Result<()> {
637        self.stop()
638    }
639
640    async fn connect(&mut self) -> anyhow::Result<()> {
641        if self.is_connected() {
642            return Ok(());
643        }
644
645        if self.ws_client.is_none() {
646            let ws = BitmexWebSocketClient::new_with_env(
647                Some(self.config.ws_url()),
648                self.config.api_key.clone(),
649                self.config.api_secret.clone(),
650                None,
651                self.config.heartbeat_interval_secs.unwrap_or(5),
652                self.config.environment,
653                self.config.transport_backend,
654                self.config.proxy_url.clone(),
655            )
656            .context("failed to construct BitMEX websocket client")?;
657            self.ws_client = Some(ws);
658        }
659
660        self.bootstrap_instruments().await?;
661
662        let ws = self.ws_client_mut()?;
663        ws.connect()
664            .await
665            .context("failed to connect BitMEX websocket")?;
666        ws.wait_until_active(10.0)
667            .await
668            .context("BitMEX websocket did not become active")?;
669
670        let stream = ws.stream();
671        self.spawn_stream_task(stream);
672        self.maybe_spawn_instrument_refresh();
673
674        self.is_connected.store(true, Ordering::Relaxed);
675        log::info!("Connected");
676        Ok(())
677    }
678
679    async fn disconnect(&mut self) -> anyhow::Result<()> {
680        if self.is_disconnected() {
681            return Ok(());
682        }
683
684        self.cancellation_token.cancel();
685
686        if let Some(ws) = self.ws_client.as_mut()
687            && let Err(e) = ws.close().await
688        {
689            log::warn!("Error while closing BitMEX websocket: {e:?}");
690        }
691
692        for handle in self.tasks.drain(..) {
693            if let Err(e) = handle.await {
694                log::error!("Error joining websocket task: {e:?}");
695            }
696        }
697
698        self.cancellation_token = CancellationToken::new();
699        self.is_connected.store(false, Ordering::Relaxed);
700        self.book_channels.store(AHashMap::new());
701        self.instrument_refresh_active = false;
702
703        log::info!("Disconnected");
704        Ok(())
705    }
706
707    fn is_connected(&self) -> bool {
708        self.is_connected()
709    }
710
711    fn is_disconnected(&self) -> bool {
712        self.is_disconnected()
713    }
714
715    fn subscribe_instruments(&mut self, _cmd: SubscribeInstruments) -> anyhow::Result<()> {
716        let ws = self.ws_client()?.clone();
717
718        self.spawn_ws(
719            async move {
720                ws.subscribe_instruments()
721                    .await
722                    .map_err(|e| anyhow::anyhow!(e))
723            },
724            "BitMEX instruments subscription",
725        );
726        Ok(())
727    }
728
729    fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
730        let instrument_id = cmd.instrument_id;
731
732        if let Some(instrument) = self.instruments.load().get(&instrument_id).cloned() {
733            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
734                log::error!("Failed to send instrument event for {instrument_id}: {e}");
735            }
736            return Ok(());
737        }
738
739        log::warn!("Instrument {instrument_id} not found in BitMEX cache");
740
741        let ws = self.ws_client()?.clone();
742        self.spawn_ws(
743            async move {
744                ws.subscribe_instrument(instrument_id)
745                    .await
746                    .map_err(|e| anyhow::anyhow!(e))
747            },
748            "BitMEX instrument subscription",
749        );
750
751        Ok(())
752    }
753
754    fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
755        if cmd.book_type != BookType::L2_MBP {
756            anyhow::bail!("BitMEX only supports L2_MBP order book deltas");
757        }
758
759        let instrument_id = cmd.instrument_id;
760        let depth = cmd.depth.map_or(0, |d| d.get());
761        let channel = if depth > 0 && depth <= 25 {
762            if depth != 25 {
763                log::info!(
764                    "BitMEX only supports depth 25 for L2 deltas, using L2_25 for requested depth {depth}"
765                );
766            }
767            BitmexBookChannel::OrderBookL2_25
768        } else {
769            BitmexBookChannel::OrderBookL2
770        };
771
772        let ws = self.ws_client()?.clone();
773        let book_channels = Arc::clone(&self.book_channels);
774
775        self.spawn_ws(
776            async move {
777                match channel {
778                    BitmexBookChannel::OrderBookL2 => ws
779                        .subscribe_book(instrument_id)
780                        .await
781                        .map_err(|e| anyhow::anyhow!(e))?,
782                    BitmexBookChannel::OrderBookL2_25 => ws
783                        .subscribe_book_25(instrument_id)
784                        .await
785                        .map_err(|e| anyhow::anyhow!(e))?,
786                    BitmexBookChannel::OrderBook10 => unreachable!(),
787                }
788                book_channels.insert(instrument_id, channel);
789                Ok(())
790            },
791            "BitMEX book delta subscription",
792        );
793
794        Ok(())
795    }
796
797    fn subscribe_book_depth10(&mut self, cmd: SubscribeBookDepth10) -> anyhow::Result<()> {
798        let instrument_id = cmd.instrument_id;
799        let ws = self.ws_client()?.clone();
800        let book_channels = Arc::clone(&self.book_channels);
801
802        self.spawn_ws(
803            async move {
804                ws.subscribe_book_depth10(instrument_id)
805                    .await
806                    .map_err(|e| anyhow::anyhow!(e))?;
807                book_channels.insert(instrument_id, BitmexBookChannel::OrderBook10);
808                Ok(())
809            },
810            "BitMEX book depth10 subscription",
811        );
812        Ok(())
813    }
814
815    fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
816        let instrument_id = cmd.instrument_id;
817        let ws = self.ws_client()?.clone();
818
819        self.spawn_ws(
820            async move {
821                ws.subscribe_quotes(instrument_id)
822                    .await
823                    .map_err(|e| anyhow::anyhow!(e))
824            },
825            "BitMEX quote subscription",
826        );
827        Ok(())
828    }
829
830    fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
831        let instrument_id = cmd.instrument_id;
832        let ws = self.ws_client()?.clone();
833
834        self.spawn_ws(
835            async move {
836                ws.subscribe_trades(instrument_id)
837                    .await
838                    .map_err(|e| anyhow::anyhow!(e))
839            },
840            "BitMEX trade subscription",
841        );
842        Ok(())
843    }
844
845    fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
846        let instrument_id = cmd.instrument_id;
847        let ws = self.ws_client()?.clone();
848
849        self.spawn_ws(
850            async move {
851                ws.subscribe_mark_prices(instrument_id)
852                    .await
853                    .map_err(|e| anyhow::anyhow!(e))
854            },
855            "BitMEX mark price subscription",
856        );
857        Ok(())
858    }
859
860    fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
861        let instrument_id = cmd.instrument_id;
862        let ws = self.ws_client()?.clone();
863
864        self.spawn_ws(
865            async move {
866                ws.subscribe_index_prices(instrument_id)
867                    .await
868                    .map_err(|e| anyhow::anyhow!(e))
869            },
870            "BitMEX index price subscription",
871        );
872        Ok(())
873    }
874
875    fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
876        let instrument_id = cmd.instrument_id;
877        let ws = self.ws_client()?.clone();
878
879        self.spawn_ws(
880            async move {
881                ws.subscribe_funding_rates(instrument_id)
882                    .await
883                    .map_err(|e| anyhow::anyhow!(e))
884            },
885            "BitMEX funding rate subscription",
886        );
887        Ok(())
888    }
889
890    fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
891        let bar_type = cmd.bar_type;
892        let ws = self.ws_client()?.clone();
893
894        self.spawn_ws(
895            async move {
896                ws.subscribe_bars(bar_type)
897                    .await
898                    .map_err(|e| anyhow::anyhow!(e))
899            },
900            "BitMEX bar subscription",
901        );
902        Ok(())
903    }
904
905    fn subscribe_instrument_status(
906        &mut self,
907        cmd: SubscribeInstrumentStatus,
908    ) -> anyhow::Result<()> {
909        let instrument_id = cmd.instrument_id;
910        let ws = self.ws_client()?.clone();
911
912        self.spawn_ws(
913            async move {
914                ws.subscribe_instrument(instrument_id)
915                    .await
916                    .map_err(|e| anyhow::anyhow!(e))
917            },
918            "BitMEX instrument status subscription",
919        );
920        Ok(())
921    }
922
923    fn unsubscribe_instrument_status(
924        &mut self,
925        cmd: &UnsubscribeInstrumentStatus,
926    ) -> anyhow::Result<()> {
927        let instrument_id = cmd.instrument_id;
928        let ws = self.ws_client()?.clone();
929
930        self.spawn_ws(
931            async move {
932                ws.unsubscribe_instrument(instrument_id)
933                    .await
934                    .map_err(|e| anyhow::anyhow!(e))
935            },
936            "BitMEX instrument status unsubscribe",
937        );
938        Ok(())
939    }
940
941    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
942        let instrument_id = cmd.instrument_id;
943        let ws = self.ws_client()?.clone();
944        let book_channels = Arc::clone(&self.book_channels);
945
946        self.spawn_ws(
947            async move {
948                let channel = book_channels.load().get(&instrument_id).copied();
949                book_channels.remove(&instrument_id);
950
951                match channel {
952                    Some(BitmexBookChannel::OrderBookL2) => ws
953                        .unsubscribe_book(instrument_id)
954                        .await
955                        .map_err(|e| anyhow::anyhow!(e))?,
956                    Some(BitmexBookChannel::OrderBookL2_25) => ws
957                        .unsubscribe_book_25(instrument_id)
958                        .await
959                        .map_err(|e| anyhow::anyhow!(e))?,
960                    Some(BitmexBookChannel::OrderBook10) => ws
961                        .unsubscribe_book_depth10(instrument_id)
962                        .await
963                        .map_err(|e| anyhow::anyhow!(e))?,
964                    None => ws
965                        .unsubscribe_book(instrument_id)
966                        .await
967                        .map_err(|e| anyhow::anyhow!(e))?,
968                }
969                Ok(())
970            },
971            "BitMEX book delta unsubscribe",
972        );
973        Ok(())
974    }
975
976    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
977        let instrument_id = cmd.instrument_id;
978        let ws = self.ws_client()?.clone();
979        let book_channels = Arc::clone(&self.book_channels);
980
981        self.spawn_ws(
982            async move {
983                book_channels.remove(&instrument_id);
984                ws.unsubscribe_book_depth10(instrument_id)
985                    .await
986                    .map_err(|e| anyhow::anyhow!(e))
987            },
988            "BitMEX book depth10 unsubscribe",
989        );
990        Ok(())
991    }
992
993    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
994        let instrument_id = cmd.instrument_id;
995        let ws = self.ws_client()?.clone();
996
997        self.spawn_ws(
998            async move {
999                ws.unsubscribe_quotes(instrument_id)
1000                    .await
1001                    .map_err(|e| anyhow::anyhow!(e))
1002            },
1003            "BitMEX quote unsubscribe",
1004        );
1005        Ok(())
1006    }
1007
1008    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
1009        let instrument_id = cmd.instrument_id;
1010        let ws = self.ws_client()?.clone();
1011
1012        self.spawn_ws(
1013            async move {
1014                ws.unsubscribe_trades(instrument_id)
1015                    .await
1016                    .map_err(|e| anyhow::anyhow!(e))
1017            },
1018            "BitMEX trade unsubscribe",
1019        );
1020        Ok(())
1021    }
1022
1023    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1024        let ws = self.ws_client()?.clone();
1025        let instrument_id = cmd.instrument_id;
1026
1027        self.spawn_ws(
1028            async move {
1029                ws.unsubscribe_mark_prices(instrument_id)
1030                    .await
1031                    .map_err(|e| anyhow::anyhow!(e))
1032            },
1033            "BitMEX mark price unsubscribe",
1034        );
1035        Ok(())
1036    }
1037
1038    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1039        let ws = self.ws_client()?.clone();
1040        let instrument_id = cmd.instrument_id;
1041
1042        self.spawn_ws(
1043            async move {
1044                ws.unsubscribe_index_prices(instrument_id)
1045                    .await
1046                    .map_err(|e| anyhow::anyhow!(e))
1047            },
1048            "BitMEX index price unsubscribe",
1049        );
1050        Ok(())
1051    }
1052
1053    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1054        let ws = self.ws_client()?.clone();
1055        let instrument_id = cmd.instrument_id;
1056
1057        self.spawn_ws(
1058            async move {
1059                ws.unsubscribe_funding_rates(instrument_id)
1060                    .await
1061                    .map_err(|e| anyhow::anyhow!(e))
1062            },
1063            "BitMEX funding rate unsubscribe",
1064        );
1065        Ok(())
1066    }
1067
1068    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1069        let bar_type = cmd.bar_type;
1070        let ws = self.ws_client()?.clone();
1071
1072        self.spawn_ws(
1073            async move {
1074                ws.unsubscribe_bars(bar_type)
1075                    .await
1076                    .map_err(|e| anyhow::anyhow!(e))
1077            },
1078            "BitMEX bar unsubscribe",
1079        );
1080        Ok(())
1081    }
1082
1083    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
1084        if let Some(req_venue) = request.venue
1085            && req_venue != self.venue()
1086        {
1087            log::warn!("Ignoring mismatched venue in instruments request: {req_venue}");
1088        }
1089        let venue = self.venue();
1090
1091        let http = self.http_client.clone();
1092        let instruments_cache = Arc::clone(&self.instruments);
1093        let sender = self.data_sender.clone();
1094        let request_id = request.request_id;
1095        let client_id = request.client_id.unwrap_or(self.client_id);
1096        let params = request.params;
1097        let start_nanos = datetime_to_unix_nanos(request.start);
1098        let end_nanos = datetime_to_unix_nanos(request.end);
1099        let clock = self.clock;
1100        let active_only = self.config.active_only;
1101
1102        get_runtime().spawn(async move {
1103            let http_client = http;
1104            match http_client
1105                .request_instruments(active_only)
1106                .await
1107                .context("failed to request instruments from BitMEX")
1108            {
1109                Ok(instruments) => {
1110                    instruments_cache.rcu(|m| {
1111                        m.clear();
1112                        for instrument in &instruments {
1113                            m.insert(instrument.id(), instrument.clone());
1114                        }
1115                    });
1116                    http_client.cache_instruments(&instruments);
1117
1118                    let response = DataResponse::Instruments(InstrumentsResponse::new(
1119                        request_id,
1120                        client_id,
1121                        venue,
1122                        instruments,
1123                        start_nanos,
1124                        end_nanos,
1125                        clock.get_time_ns(),
1126                        params,
1127                    ));
1128
1129                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1130                        log::error!("Failed to send instruments response: {e}");
1131                    }
1132                }
1133                Err(e) => log::error!("Instrument request failed: {e:?}"),
1134            }
1135        });
1136
1137        Ok(())
1138    }
1139
1140    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
1141        if let Some(instrument) = self.instruments.load().get(&request.instrument_id).cloned() {
1142            let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1143                request.request_id,
1144                request.client_id.unwrap_or(self.client_id),
1145                instrument.id(),
1146                instrument,
1147                datetime_to_unix_nanos(request.start),
1148                datetime_to_unix_nanos(request.end),
1149                self.clock.get_time_ns(),
1150                request.params,
1151            )));
1152
1153            if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
1154                log::error!("Failed to send instrument response: {e}");
1155            }
1156            return Ok(());
1157        }
1158
1159        let http_client = self.http_client.clone();
1160        let instruments_cache = Arc::clone(&self.instruments);
1161        let sender = self.data_sender.clone();
1162        let instrument_id = request.instrument_id;
1163        let request_id = request.request_id;
1164        let client_id = request.client_id.unwrap_or(self.client_id);
1165        let start = request.start;
1166        let end = request.end;
1167        let params = request.params;
1168        let clock = self.clock;
1169
1170        get_runtime().spawn(async move {
1171            match http_client
1172                .request_instrument(instrument_id)
1173                .await
1174                .context("failed to request instrument from BitMEX")
1175            {
1176                Ok(Some(instrument)) => {
1177                    http_client.cache_instrument(instrument.clone());
1178                    instruments_cache.insert(instrument.id(), instrument.clone());
1179
1180                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1181                        request_id,
1182                        client_id,
1183                        instrument.id(),
1184                        instrument,
1185                        datetime_to_unix_nanos(start),
1186                        datetime_to_unix_nanos(end),
1187                        clock.get_time_ns(),
1188                        params,
1189                    )));
1190
1191                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1192                        log::error!("Failed to send instrument response: {e}");
1193                    }
1194                }
1195                Ok(None) => log::warn!("BitMEX instrument {instrument_id} not found"),
1196                Err(e) => log::error!("Instrument request failed: {e:?}"),
1197            }
1198        });
1199
1200        Ok(())
1201    }
1202
1203    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1204        let http = self.http_client.clone();
1205        let sender = self.data_sender.clone();
1206        let instrument_id = request.instrument_id;
1207        let start = request.start;
1208        let end = request.end;
1209        let limit = request.limit.map(|n| n.get() as u32);
1210        let request_id = request.request_id;
1211        let client_id = request.client_id.unwrap_or(self.client_id);
1212        let params = request.params;
1213        let clock = self.clock;
1214        let start_nanos = datetime_to_unix_nanos(start);
1215        let end_nanos = datetime_to_unix_nanos(end);
1216
1217        get_runtime().spawn(async move {
1218            match http
1219                .request_trades(instrument_id, start, end, limit)
1220                .await
1221                .context("failed to request trades from BitMEX")
1222            {
1223                Ok(trades) => {
1224                    let response = DataResponse::Trades(TradesResponse::new(
1225                        request_id,
1226                        client_id,
1227                        instrument_id,
1228                        trades,
1229                        start_nanos,
1230                        end_nanos,
1231                        clock.get_time_ns(),
1232                        params,
1233                    ));
1234
1235                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1236                        log::error!("Failed to send trades response: {e}");
1237                    }
1238                }
1239                Err(e) => log::error!("Trade request failed: {e:?}"),
1240            }
1241        });
1242
1243        Ok(())
1244    }
1245
1246    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
1247        let http = self.http_client.clone();
1248        let sender = self.data_sender.clone();
1249        let bar_type = request.bar_type;
1250        let start = request.start;
1251        let end = request.end;
1252        let limit = request.limit.map(|n| n.get() as u32);
1253        let request_id = request.request_id;
1254        let client_id = request.client_id.unwrap_or(self.client_id);
1255        let params = request.params;
1256        let clock = self.clock;
1257        let start_nanos = datetime_to_unix_nanos(start);
1258        let end_nanos = datetime_to_unix_nanos(end);
1259
1260        get_runtime().spawn(async move {
1261            match http
1262                .request_bars(bar_type, start, end, limit, false)
1263                .await
1264                .context("failed to request bars from BitMEX")
1265            {
1266                Ok(bars) => {
1267                    let response = DataResponse::Bars(BarsResponse::new(
1268                        request_id,
1269                        client_id,
1270                        bar_type,
1271                        bars,
1272                        start_nanos,
1273                        end_nanos,
1274                        clock.get_time_ns(),
1275                        params,
1276                    ));
1277
1278                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1279                        log::error!("Failed to send bars response: {e}");
1280                    }
1281                }
1282                Err(e) => log::error!("Bar request failed: {e:?}"),
1283            }
1284        });
1285
1286        Ok(())
1287    }
1288}
1289
1290fn handle_quote_messages(
1291    data: Vec<BitmexQuoteMsg>,
1292    instruments_by_symbol: &AHashMap<Ustr, InstrumentAny>,
1293    quote_cache: &mut QuoteCache,
1294    ts_init: UnixNanos,
1295    sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1296) {
1297    for msg in data {
1298        let Some(instrument) = instruments_by_symbol.get(&msg.symbol) else {
1299            log::error!(
1300                "Instrument cache miss: quote dropped for symbol={}",
1301                msg.symbol,
1302            );
1303            continue;
1304        };
1305
1306        let instrument_id = instrument.id();
1307        let price_precision = instrument.price_precision();
1308
1309        let bid_price = msg.bid_price.map(|p| Price::new(p, price_precision));
1310        let ask_price = msg.ask_price.map(|p| Price::new(p, price_precision));
1311        let bid_size = msg
1312            .bid_size
1313            .map(|s| parse_contracts_quantity(s, instrument));
1314        let ask_size = msg
1315            .ask_size
1316            .map(|s| parse_contracts_quantity(s, instrument));
1317        let ts_event = UnixNanos::from(msg.timestamp);
1318
1319        match quote_cache.process(
1320            instrument_id,
1321            bid_price,
1322            ask_price,
1323            bid_size,
1324            ask_size,
1325            ts_event,
1326            ts_init,
1327        ) {
1328            Ok(quote) => {
1329                if let Err(e) = sender.send(DataEvent::Data(Data::Quote(quote))) {
1330                    log::error!("Failed to emit data event: {e}");
1331                }
1332            }
1333            Err(e) => {
1334                log::warn!("Failed to process quote for {}: {e}", msg.symbol);
1335            }
1336        }
1337    }
1338}