Skip to main content

nautilus_bitmex/
execution.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the BitMEX adapter.
17
18use std::{
19    future::Future,
20    sync::{
21        Arc, Mutex,
22        atomic::{AtomicBool, Ordering},
23    },
24    time::{Duration, Instant},
25};
26
27use ahash::AHashMap;
28use anyhow::Context;
29use async_trait::async_trait;
30use futures_util::{StreamExt, pin_mut};
31use nautilus_common::{
32    clients::ExecutionClient,
33    enums::LogLevel,
34    live::{get_runtime, runner::get_exec_event_sender},
35    messages::execution::{
36        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
37        GenerateFillReportsBuilder, GenerateOrderStatusReport, GenerateOrderStatusReports,
38        GenerateOrderStatusReportsBuilder, GeneratePositionStatusReports,
39        GeneratePositionStatusReportsBuilder, ModifyOrder, QueryAccount, QueryOrder, SubmitOrder,
40        SubmitOrderList,
41    },
42};
43use nautilus_core::{
44    UnixNanos,
45    time::{AtomicTime, get_atomic_clock_realtime},
46};
47use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
48use nautilus_model::{
49    accounts::AccountAny,
50    enums::{AccountType, OmsType, OrderSide, OrderType},
51    identifiers::{
52        AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Venue, VenueOrderId,
53    },
54    instruments::{Instrument, InstrumentAny},
55    orders::{Order, OrderAny},
56    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
57    types::{AccountBalance, MarginBalance},
58};
59use rust_decimal::prelude::ToPrimitive;
60use tokio::task::JoinHandle;
61use ustr::Ustr;
62
63use crate::{
64    broadcast::{
65        canceller::{CancelBroadcaster, CancelBroadcasterConfig},
66        submitter::{SubmitBroadcaster, SubmitBroadcasterConfig},
67    },
68    common::{
69        enums::BitmexPegPriceType,
70        parse::{parse_peg_offset_value, parse_peg_price_type},
71    },
72    config::BitmexExecClientConfig,
73    http::client::BitmexHttpClient,
74    websocket::{
75        client::BitmexWebSocketClient,
76        dispatch::{self, OrderIdentity, WsDispatchState},
77    },
78};
79
/// Live execution client for the BitMEX venue.
///
/// Bundles the HTTP and WebSocket clients, the submit/cancel broadcasters and the
/// handles of the background tasks this client spawns on the shared runtime.
#[derive(Debug)]
pub struct BitmexExecutionClient {
    /// Shared execution client core (identifiers, cache access, lifecycle flags).
    core: ExecutionClientCore,
    /// Realtime atomic clock used to timestamp emitted events and reports.
    clock: &'static AtomicTime,
    /// Adapter configuration supplied at construction.
    config: BitmexExecClientConfig,
    /// Emitter used to publish order and account events.
    emitter: ExecutionEventEmitter,
    /// HTTP client used for instrument/account/report requests and single-transport submits.
    http_client: BitmexHttpClient,
    /// WebSocket client whose stream is consumed by the dispatch task.
    ws_client: BitmexWebSocketClient,
    /// State shared with the WebSocket dispatch task (includes the order identities).
    ws_dispatch_state: Arc<WsDispatchState>,
    /// Submit broadcaster, used when a submit requests more than one try.
    _submitter: SubmitBroadcaster,
    /// Cancel broadcaster; started on connect and stopped on disconnect.
    _canceller: CancelBroadcaster,
    /// Handle of the task draining the WebSocket stream, if running.
    ws_stream_handle: Option<JoinHandle<()>>,
    /// Handles of fire-and-forget tasks spawned via `spawn_task`.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Handle of the dead man's switch heartbeat task, if running.
    dms_task_handle: Option<JoinHandle<()>>,
    /// Flag polled by the dead man's switch heartbeat loop; cleared to stop it.
    dms_running: Arc<AtomicBool>,
}
96
97impl BitmexExecutionClient {
98    fn log_report_receipt(count: usize, report_type: &str, log_level: LogLevel) {
99        let plural = if count == 1 { "" } else { "s" };
100        let message = format!("Received {count} {report_type}{plural}");
101
102        match log_level {
103            LogLevel::Off => {}
104            LogLevel::Trace => log::trace!("{message}"),
105            LogLevel::Debug => log::debug!("{message}"),
106            LogLevel::Info => log::info!("{message}"),
107            LogLevel::Warning => log::warn!("{message}"),
108            LogLevel::Error => log::error!("{message}"),
109        }
110    }
111
112    /// Creates a new [`BitmexExecutionClient`].
113    ///
114    /// # Errors
115    ///
116    /// Returns an error if either the HTTP or WebSocket client fail to construct.
117    pub fn new(core: ExecutionClientCore, config: BitmexExecClientConfig) -> anyhow::Result<Self> {
118        if !config.has_api_credentials() {
119            anyhow::bail!("BitMEX execution client requires API key and secret");
120        }
121
122        let trader_id = core.trader_id;
123        let account_id = config.account_id.unwrap_or(core.account_id);
124        let clock = get_atomic_clock_realtime();
125        let emitter =
126            ExecutionEventEmitter::new(clock, trader_id, account_id, AccountType::Margin, None);
127        let http_client = BitmexHttpClient::new(
128            Some(config.http_base_url()),
129            config.api_key.clone(),
130            config.api_secret.clone(),
131            config.environment,
132            config.http_timeout_secs,
133            config.max_retries,
134            config.retry_delay_initial_ms,
135            config.retry_delay_max_ms,
136            config.recv_window_ms,
137            config.max_requests_per_second,
138            config.max_requests_per_minute,
139            config.proxy_url.clone(),
140        )
141        .context("failed to construct BitMEX HTTP client")?;
142        let ws_client = BitmexWebSocketClient::new_with_env(
143            Some(config.ws_url()),
144            config.api_key.clone(),
145            config.api_secret.clone(),
146            Some(account_id),
147            config.heartbeat_interval_secs,
148            config.environment,
149            config.transport_backend,
150            config.proxy_url.clone(),
151        )
152        .context("failed to construct BitMEX execution websocket client")?;
153
154        let pool_size = config.submitter_pool_size.unwrap_or(1);
155        let submitter_proxy_urls = match &config.submitter_proxy_urls {
156            Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
157            None => vec![config.proxy_url.clone(); pool_size],
158        };
159
160        let submitter_config = SubmitBroadcasterConfig {
161            pool_size,
162            api_key: config.api_key.clone(),
163            api_secret: config.api_secret.clone(),
164            base_url: config.base_url_http.clone(),
165            environment: config.environment,
166            timeout_secs: config.http_timeout_secs,
167            max_retries: config.max_retries,
168            retry_delay_ms: config.retry_delay_initial_ms,
169            retry_delay_max_ms: config.retry_delay_max_ms,
170            recv_window_ms: config.recv_window_ms,
171            max_requests_per_second: config.max_requests_per_second,
172            max_requests_per_minute: config.max_requests_per_minute,
173            proxy_urls: submitter_proxy_urls,
174            ..Default::default()
175        };
176
177        let _submitter = SubmitBroadcaster::new(submitter_config)
178            .context("failed to create SubmitBroadcaster")?;
179
180        let canceller_pool_size = config.canceller_pool_size.unwrap_or(1);
181        let canceller_proxy_urls = match &config.canceller_proxy_urls {
182            Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
183            None => vec![config.proxy_url.clone(); canceller_pool_size],
184        };
185
186        let canceller_config = CancelBroadcasterConfig {
187            pool_size: canceller_pool_size,
188            api_key: config.api_key.clone(),
189            api_secret: config.api_secret.clone(),
190            base_url: config.base_url_http.clone(),
191            environment: config.environment,
192            timeout_secs: config.http_timeout_secs,
193            max_retries: config.max_retries,
194            retry_delay_ms: config.retry_delay_initial_ms,
195            retry_delay_max_ms: config.retry_delay_max_ms,
196            recv_window_ms: config.recv_window_ms,
197            max_requests_per_second: config.max_requests_per_second,
198            max_requests_per_minute: config.max_requests_per_minute,
199            proxy_urls: canceller_proxy_urls,
200            ..Default::default()
201        };
202
203        let _canceller = CancelBroadcaster::new(canceller_config)
204            .context("failed to create CancelBroadcaster")?;
205
206        Ok(Self {
207            core,
208            clock,
209            config,
210            emitter,
211            http_client,
212            ws_client,
213            ws_dispatch_state: Arc::new(WsDispatchState::default()),
214            _submitter,
215            _canceller,
216            ws_stream_handle: None,
217            pending_tasks: Mutex::new(Vec::new()),
218            dms_task_handle: None,
219            dms_running: Arc::new(AtomicBool::new(false)),
220        })
221    }
222
223    fn spawn_task<F>(&self, label: &'static str, fut: F)
224    where
225        F: Future<Output = anyhow::Result<()>> + Send + 'static,
226    {
227        let handle = get_runtime().spawn(async move {
228            if let Err(e) = fut.await {
229                log::error!("{label}: {e:?}");
230            }
231        });
232
233        let mut guard = self
234            .pending_tasks
235            .lock()
236            .expect("pending task lock poisoned");
237
238        // Remove completed tasks to prevent unbounded growth
239        guard.retain(|h| !h.is_finished());
240        guard.push(handle);
241    }
242
243    fn abort_pending_tasks(&self) {
244        let mut guard = self
245            .pending_tasks
246            .lock()
247            .expect("pending task lock poisoned");
248
249        for handle in guard.drain(..) {
250            handle.abort();
251        }
252    }
253
254    /// Populates `order_identities` for an order if not already present.
255    ///
256    /// Needed for cancel/modify commands on orders loaded via reconciliation
257    /// (which bypass `submit_order` and therefore have no identity entry).
258    fn ensure_order_identity(
259        &self,
260        client_order_id: ClientOrderId,
261        strategy_id: StrategyId,
262        instrument_id: InstrumentId,
263    ) {
264        if self
265            .ws_dispatch_state
266            .order_identities
267            .contains_key(&client_order_id)
268        {
269            return;
270        }
271
272        let cache = self.core.cache();
273        let (order_side, order_type) = cache
274            .order(&client_order_id)
275            .map_or((OrderSide::NoOrderSide, OrderType::Market), |o| {
276                (o.order_side(), o.order_type())
277            });
278        drop(cache);
279
280        self.ws_dispatch_state.order_identities.insert(
281            client_order_id,
282            OrderIdentity {
283                instrument_id,
284                strategy_id,
285                order_side,
286                order_type,
287            },
288        );
289        self.ws_dispatch_state.insert_accepted(client_order_id);
290    }
291
292    fn start_deadmans_switch(&mut self) {
293        let Some(timeout_secs) = self.config.deadmans_switch_timeout_secs else {
294            return;
295        };
296
297        let timeout_ms = timeout_secs * 1000;
298        let interval_secs = (timeout_secs / 4).max(1);
299
300        log::info!(
301            "Starting dead man's switch: timeout={timeout_secs}s, refresh_interval={interval_secs}s",
302        );
303
304        self.dms_running.store(true, Ordering::SeqCst);
305        let running = self.dms_running.clone();
306        let http_client = self.http_client.clone();
307
308        let handle = get_runtime().spawn(async move {
309            while running.load(Ordering::SeqCst) {
310                if let Err(e) = http_client.cancel_all_after(timeout_ms).await {
311                    log::warn!("Dead man's switch heartbeat failed: {e}");
312                }
313                tokio::time::sleep(Duration::from_secs(interval_secs)).await;
314            }
315        });
316
317        self.dms_task_handle = Some(handle);
318    }
319
320    async fn stop_deadmans_switch(&mut self) {
321        if self.config.deadmans_switch_timeout_secs.is_none() {
322            return;
323        }
324
325        self.dms_running.store(false, Ordering::SeqCst);
326
327        // Abort and await loop shutdown so disconnect does not block on sleep/HTTP timeout.
328        if let Some(handle) = self.dms_task_handle.take() {
329            handle.abort();
330            let _ = handle.await;
331        }
332
333        log::info!("Disarming dead man's switch");
334
335        if let Err(e) = self.http_client.cancel_all_after(0).await {
336            log::warn!("Failed to disarm dead man's switch: {e}");
337        }
338    }
339
340    async fn ensure_instruments_initialized_async(&self) -> anyhow::Result<()> {
341        if self.core.instruments_initialized() {
342            return Ok(());
343        }
344
345        let mut instruments: Vec<InstrumentAny> = {
346            let cache = self.core.cache();
347            cache
348                .instruments(&self.core.venue, None)
349                .into_iter()
350                .cloned()
351                .collect()
352        };
353
354        if instruments.is_empty() {
355            let http = self.http_client.clone();
356            instruments = http
357                .request_instruments(self.config.active_only)
358                .await
359                .context("failed to request BitMEX instruments")?;
360        } else {
361            log::debug!(
362                "Reusing {} cached BitMEX instruments for execution client initialization",
363                instruments.len()
364            );
365        }
366
367        instruments.sort_by_key(|instrument| instrument.id());
368
369        self.http_client.cache_instruments(&instruments);
370        self.ws_client.cache_instruments(&instruments);
371        for instrument in &instruments {
372            self._submitter.cache_instrument(instrument);
373            self._canceller.cache_instrument(instrument);
374        }
375
376        self.core.set_instruments_initialized();
377        Ok(())
378    }
379
380    async fn refresh_account_state(&self) -> anyhow::Result<()> {
381        let account_state = self
382            .http_client
383            .request_account_state(self.core.account_id)
384            .await
385            .context("failed to request BitMEX account state")?;
386
387        self.emitter.send_account_state(account_state);
388        Ok(())
389    }
390
391    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
392        let account_id = self.core.account_id;
393
394        if self.core.cache().account(&account_id).is_some() {
395            log::info!("Account {account_id} registered");
396            return Ok(());
397        }
398
399        let start = Instant::now();
400        let timeout = Duration::from_secs_f64(timeout_secs);
401        let interval = Duration::from_millis(10);
402
403        loop {
404            tokio::time::sleep(interval).await;
405
406            if self.core.cache().account(&account_id).is_some() {
407                log::info!("Account {account_id} registered");
408                return Ok(());
409            }
410
411            if start.elapsed() >= timeout {
412                anyhow::bail!(
413                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
414                );
415            }
416        }
417    }
418
419    fn start_ws_stream(&mut self) {
420        if self.ws_stream_handle.is_some() {
421            return;
422        }
423
424        let stream = self.ws_client.stream();
425        let emitter = self.emitter.clone();
426        let state = Arc::clone(&self.ws_dispatch_state);
427        let account_id = self.core.account_id;
428        let clock = self.clock;
429
430        // Build symbol-keyed instrument map, preferring core cache then HTTP client cache
431        let mut instruments_by_symbol: AHashMap<Ustr, InstrumentAny> = self
432            .core
433            .cache()
434            .instruments(&self.core.venue, None)
435            .into_iter()
436            .map(|inst| (inst.symbol().inner(), inst.clone()))
437            .collect();
438
439        if instruments_by_symbol.is_empty() {
440            for (key, inst) in self.http_client.instruments_cache.load().iter() {
441                instruments_by_symbol.insert(*key, inst.clone());
442            }
443        }
444
445        let handle = get_runtime().spawn(async move {
446            pin_mut!(stream);
447            let mut order_type_cache: AHashMap<ClientOrderId, OrderType> = AHashMap::new();
448            let mut order_symbol_cache: AHashMap<ClientOrderId, Ustr> = AHashMap::new();
449            let mut insts_by_symbol = instruments_by_symbol;
450
451            while let Some(message) = stream.next().await {
452                dispatch::dispatch_ws_message(
453                    clock.get_time_ns(),
454                    message,
455                    &emitter,
456                    &state,
457                    &mut insts_by_symbol,
458                    &mut order_type_cache,
459                    &mut order_symbol_cache,
460                    account_id,
461                );
462            }
463        });
464
465        self.ws_stream_handle = Some(handle);
466    }
467
468    fn submit_cached_order(
469        &self,
470        order: &OrderAny,
471        submit_tries: Option<usize>,
472        peg_price_type: Option<BitmexPegPriceType>,
473        peg_offset_value: Option<f64>,
474        task_label: &'static str,
475    ) {
476        if order.is_closed() {
477            log::warn!("Cannot submit closed order {}", order.client_order_id());
478            return;
479        }
480
481        self.emitter.emit_order_submitted(order);
482
483        let strategy_id = order.strategy_id();
484        let instrument_id = order.instrument_id();
485        let client_order_id = order.client_order_id();
486        let order_side = order.order_side();
487        let order_type = order.order_type();
488
489        self.ws_dispatch_state.order_identities.insert(
490            client_order_id,
491            OrderIdentity {
492                instrument_id,
493                strategy_id,
494                order_side,
495                order_type,
496            },
497        );
498
499        let use_broadcaster = submit_tries.is_some_and(|n| n > 1);
500        let http_client = self.http_client.clone();
501        let submitter = self._submitter.clone_for_async();
502        let ws_dispatch_state = self.ws_dispatch_state.clone();
503        let emitter = self.emitter.clone();
504        let clock = self.clock;
505        let quantity = order.quantity();
506        let time_in_force = order.time_in_force();
507        let price = order.price();
508        let trigger_price = order.trigger_price();
509        let trigger_type = order.trigger_type();
510        let trailing_offset = order.trailing_offset().and_then(|d| d.to_f64());
511        let trailing_offset_type = order.trailing_offset_type();
512        let display_qty = order.display_qty();
513        let post_only = order.is_post_only();
514        let reduce_only = order.is_reduce_only();
515        let order_list_id = order.order_list_id();
516        let contingency_type = order.contingency_type();
517
518        self.spawn_task(task_label, async move {
519            let result = if use_broadcaster {
520                submitter
521                    .broadcast_submit(
522                        instrument_id,
523                        client_order_id,
524                        order_side,
525                        order_type,
526                        quantity,
527                        time_in_force,
528                        price,
529                        trigger_price,
530                        trigger_type,
531                        trailing_offset,
532                        trailing_offset_type,
533                        display_qty,
534                        post_only,
535                        reduce_only,
536                        order_list_id,
537                        contingency_type,
538                        submit_tries,
539                        peg_price_type,
540                        peg_offset_value,
541                    )
542                    .await
543            } else {
544                http_client
545                    .submit_order(
546                        instrument_id,
547                        client_order_id,
548                        order_side,
549                        order_type,
550                        quantity,
551                        time_in_force,
552                        price,
553                        trigger_price,
554                        trigger_type,
555                        trailing_offset,
556                        trailing_offset_type,
557                        display_qty,
558                        post_only,
559                        reduce_only,
560                        order_list_id,
561                        contingency_type,
562                        peg_price_type,
563                        peg_offset_value,
564                    )
565                    .await
566            };
567
568            match result {
569                Ok(_report) => {
570                    // The WS dispatch handles all lifecycle events for tracked orders.
571                    // Forwarding the HTTP response as a report would cause the ExecEngine
572                    // to generate inferred fills that conflict with real fills from the
573                    // Execution table WS stream.
574                }
575                Err(e) => {
576                    let error_msg = e.to_string();
577
578                    // If all transports returned "Duplicate clOrdID", the order likely exists
579                    // but the success response was lost. Wait for WebSocket confirmation.
580                    if error_msg.contains("IDEMPOTENT_DUPLICATE") {
581                        log::warn!(
582                            "Order {client_order_id} may exist (duplicate clOrdID from all transports), \
583                             awaiting WebSocket confirmation",
584                        );
585                        return Ok(());
586                    }
587
588                    ws_dispatch_state.order_identities.remove(&client_order_id);
589                    let ts_event = clock.get_time_ns();
590                    emitter.emit_order_rejected_event(
591                        strategy_id,
592                        instrument_id,
593                        client_order_id,
594                        &format!("submit-order-error: {error_msg}"),
595                        ts_event,
596                        post_only,
597                    );
598                }
599            }
600            Ok(())
601        });
602    }
603}
604
605#[async_trait(?Send)]
606impl ExecutionClient for BitmexExecutionClient {
607    fn is_connected(&self) -> bool {
608        self.core.is_connected()
609    }
610
611    fn client_id(&self) -> ClientId {
612        self.core.client_id
613    }
614
615    fn account_id(&self) -> AccountId {
616        self.core.account_id
617    }
618
619    fn venue(&self) -> Venue {
620        self.core.venue
621    }
622
623    fn oms_type(&self) -> OmsType {
624        self.core.oms_type
625    }
626
627    fn get_account(&self) -> Option<AccountAny> {
628        self.core.cache().account(&self.core.account_id).cloned()
629    }
630
631    fn generate_account_state(
632        &self,
633        balances: Vec<AccountBalance>,
634        margins: Vec<MarginBalance>,
635        reported: bool,
636        ts_event: UnixNanos,
637    ) -> anyhow::Result<()> {
638        self.emitter
639            .emit_account_state(balances, margins, reported, ts_event);
640        Ok(())
641    }
642
643    fn start(&mut self) -> anyhow::Result<()> {
644        if self.core.is_started() {
645            return Ok(());
646        }
647
648        self.emitter.set_sender(get_exec_event_sender());
649        self.core.set_started();
650        log::info!(
651            "BitMEX execution client started: client_id={}, account_id={}, environment={}, submitter_pool_size={:?}, canceller_pool_size={:?}, proxy_url={:?}, submitter_proxy_urls={:?}, canceller_proxy_urls={:?}",
652            self.core.client_id,
653            self.core.account_id,
654            self.config.environment,
655            self.config.submitter_pool_size,
656            self.config.canceller_pool_size,
657            self.config.proxy_url,
658            self.config.submitter_proxy_urls,
659            self.config.canceller_proxy_urls,
660        );
661        Ok(())
662    }
663
664    fn stop(&mut self) -> anyhow::Result<()> {
665        if self.core.is_stopped() {
666            return Ok(());
667        }
668
669        self.core.set_stopped();
670        self.core.set_disconnected();
671
672        if let Some(handle) = self.ws_stream_handle.take() {
673            handle.abort();
674        }
675
676        if let Some(handle) = self.dms_task_handle.take() {
677            handle.abort();
678        }
679        self.dms_running.store(false, Ordering::SeqCst);
680        self.abort_pending_tasks();
681        log::info!("BitMEX execution client {} stopped", self.core.client_id);
682        Ok(())
683    }
684
    /// Connects the client: loads instruments, opens the WebSocket, starts the
    /// broadcasters, subscribes to the private topics and waits for the account.
    ///
    /// Returns immediately if the core already reports the client as connected.
    ///
    /// # Errors
    ///
    /// Returns the first error raised by any step. A failed margin subscription is the
    /// one exception: it is only logged at debug level.
    ///
    /// NOTE(review): every step uses `?` and no cleanup is visible here, so a failure
    /// after `ws_client.connect()` appears to leave the WebSocket (and possibly the
    /// broadcasters and stream task) running while the core stays unconnected — confirm
    /// whether callers are expected to call `disconnect`/`stop` after a failed connect.
    async fn connect(&mut self) -> anyhow::Result<()> {
        if self.core.is_connected() {
            return Ok(());
        }

        // Reset cancellation token so HTTP requests succeed after reconnect
        self.http_client.reset_cancellation_token();

        self.ensure_instruments_initialized_async().await?;

        self.ws_client.connect().await?;
        self.ws_client.wait_until_active(10.0).await?;

        // Start submitter/canceller after WS connection succeeds
        self._submitter.start().await?;
        self._canceller.start().await?;

        self.ws_client.subscribe_orders().await?;
        self.ws_client.subscribe_executions().await?;
        self.ws_client.subscribe_positions().await?;
        self.ws_client.subscribe_wallet().await?;
        // Unlike the subscriptions above, a margin subscription failure is tolerated
        if let Err(e) = self.ws_client.subscribe_margin().await {
            log::debug!("Margin subscription unavailable: {e:?}");
        }

        // The stream task must be running before the account state is requested and awaited
        self.start_ws_stream();
        self.refresh_account_state().await?;
        self.await_account_registered(30.0).await?;

        // Only arm the dead man's switch once the client is fully connected
        self.core.set_connected();
        self.start_deadmans_switch();
        log::info!("Connected: client_id={}", self.core.client_id);
        Ok(())
    }
719
    /// Disconnects the client: disarms the dead man's switch, cancels outstanding HTTP
    /// requests, stops the broadcasters, closes the WebSocket and aborts background tasks.
    ///
    /// Returns immediately if the core already reports the client as disconnected.
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok(())`; a WebSocket close error is only
    /// logged as a warning.
    async fn disconnect(&mut self) -> anyhow::Result<()> {
        if self.core.is_disconnected() {
            return Ok(());
        }

        // Disarm DMS before cancelling requests (needs working HTTP)
        self.stop_deadmans_switch().await;

        self.http_client.cancel_all_requests();
        self._submitter.stop().await;
        self._canceller.stop().await;

        // Closing is best-effort: teardown continues even if the close fails
        if let Err(e) = self.ws_client.close().await {
            log::warn!("Error while closing BitMEX execution websocket: {e:?}");
        }

        // Stop the task that drains the WebSocket stream
        if let Some(handle) = self.ws_stream_handle.take() {
            handle.abort();
        }

        self.abort_pending_tasks();
        self.core.set_disconnected();
        log::info!("Disconnected: client_id={}", self.core.client_id);
        Ok(())
    }
745
746    async fn generate_order_status_report(
747        &self,
748        cmd: &GenerateOrderStatusReport,
749    ) -> anyhow::Result<Option<OrderStatusReport>> {
750        let instrument_id = cmd
751            .instrument_id
752            .context("BitMEX generate_order_status_report requires an instrument identifier")?;
753
754        self.http_client
755            .query_order(
756                instrument_id,
757                cmd.client_order_id,
758                cmd.venue_order_id.map(|id| VenueOrderId::from(id.as_str())),
759            )
760            .await
761            .context("failed to query BitMEX order status")
762    }
763
764    async fn generate_order_status_reports(
765        &self,
766        cmd: &GenerateOrderStatusReports,
767    ) -> anyhow::Result<Vec<OrderStatusReport>> {
768        let start_dt = cmd.start.map(|nanos| nanos.to_datetime_utc());
769        let end_dt = cmd.end.map(|nanos| nanos.to_datetime_utc());
770
771        let mut reports = self
772            .http_client
773            .request_order_status_reports(cmd.instrument_id, cmd.open_only, start_dt, end_dt, None)
774            .await
775            .context("failed to request BitMEX order status reports")?;
776
777        if let Some(start) = cmd.start {
778            reports.retain(|report| report.ts_last >= start);
779        }
780
781        if let Some(end) = cmd.end {
782            reports.retain(|report| report.ts_last <= end);
783        }
784
785        Self::log_report_receipt(reports.len(), "OrderStatusReport", cmd.log_receipt_level);
786
787        Ok(reports)
788    }
789
790    async fn generate_fill_reports(
791        &self,
792        cmd: GenerateFillReports,
793    ) -> anyhow::Result<Vec<FillReport>> {
794        let start_dt = cmd.start.map(|nanos| nanos.to_datetime_utc());
795        let end_dt = cmd.end.map(|nanos| nanos.to_datetime_utc());
796
797        let mut reports = self
798            .http_client
799            .request_fill_reports(cmd.instrument_id, start_dt, end_dt, None)
800            .await
801            .context("failed to request BitMEX fill reports")?;
802
803        if let Some(order_id) = cmd.venue_order_id {
804            reports.retain(|report| report.venue_order_id.as_str() == order_id.as_str());
805        }
806
807        if let Some(start) = cmd.start {
808            reports.retain(|report| report.ts_event >= start);
809        }
810
811        if let Some(end) = cmd.end {
812            reports.retain(|report| report.ts_event <= end);
813        }
814
815        Self::log_report_receipt(reports.len(), "FillReport", cmd.log_receipt_level);
816
817        Ok(reports)
818    }
819
820    async fn generate_position_status_reports(
821        &self,
822        cmd: &GeneratePositionStatusReports,
823    ) -> anyhow::Result<Vec<PositionStatusReport>> {
824        let mut reports = self
825            .http_client
826            .request_position_status_reports()
827            .await
828            .context("failed to request BitMEX position reports")?;
829
830        if let Some(instrument_id) = cmd.instrument_id {
831            reports.retain(|report| report.instrument_id == instrument_id);
832        }
833
834        if let Some(start) = cmd.start {
835            reports.retain(|report| report.ts_last >= start);
836        }
837
838        if let Some(end) = cmd.end {
839            reports.retain(|report| report.ts_last <= end);
840        }
841
842        Self::log_report_receipt(reports.len(), "PositionStatusReport", cmd.log_receipt_level);
843
844        Ok(reports)
845    }
846
847    async fn generate_mass_status(
848        &self,
849        lookback_mins: Option<u64>,
850    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
851        log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
852
853        let ts_now = self.clock.get_time_ns();
854        let start = lookback_mins.map(|mins| {
855            let lookback_ns = mins.saturating_mul(60).saturating_mul(1_000_000_000);
856            UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
857        });
858
859        let order_cmd = GenerateOrderStatusReportsBuilder::default()
860            .ts_init(ts_now)
861            .open_only(false)
862            .start(start)
863            .build()
864            .map_err(|e| anyhow::anyhow!("{e}"))?;
865
866        let fill_cmd = GenerateFillReportsBuilder::default()
867            .ts_init(ts_now)
868            .start(start)
869            .build()
870            .map_err(|e| anyhow::anyhow!("{e}"))?;
871
872        let position_cmd = GeneratePositionStatusReportsBuilder::default()
873            .ts_init(ts_now)
874            .start(start)
875            .build()
876            .map_err(|e| anyhow::anyhow!("{e}"))?;
877
878        let (order_reports, fill_reports, position_reports) = tokio::try_join!(
879            self.generate_order_status_reports(&order_cmd),
880            self.generate_fill_reports(fill_cmd),
881            self.generate_position_status_reports(&position_cmd),
882        )?;
883
884        let mut mass_status = ExecutionMassStatus::new(
885            self.core.client_id,
886            self.core.account_id,
887            self.core.venue,
888            ts_now,
889            None,
890        );
891        mass_status.add_order_reports(order_reports);
892        mass_status.add_fill_reports(fill_reports);
893        mass_status.add_position_reports(position_reports);
894
895        Ok(Some(mass_status))
896    }
897
898    fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
899        let http_client = self.http_client.clone();
900        let emitter = self.emitter.clone();
901        let account_id = self.core.account_id;
902
903        self.spawn_task("query_account", async move {
904            match http_client.request_account_state(account_id).await {
905                Ok(account_state) => emitter.send_account_state(account_state),
906                Err(e) => log::error!("BitMEX query account failed: {e:?}"),
907            }
908            Ok(())
909        });
910
911        Ok(())
912    }
913
914    fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
915        let http_client = self.http_client.clone();
916        let instrument_id = cmd.instrument_id;
917        let client_order_id = Some(cmd.client_order_id);
918        let venue_order_id = cmd.venue_order_id;
919        let emitter = self.emitter.clone();
920
921        self.spawn_task("query_order", async move {
922            match http_client
923                .request_order_status_report(instrument_id, client_order_id, venue_order_id)
924                .await
925            {
926                Ok(report) => emitter.send_order_status_report(report),
927                Err(e) => log::error!("BitMEX query order failed: {e:?}"),
928            }
929            Ok(())
930        });
931
932        Ok(())
933    }
934
935    fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
936        let submit_tries = cmd
937            .params
938            .as_ref()
939            .and_then(|p| p.get_usize("submit_tries"))
940            .filter(|&n| n > 0);
941
942        let peg_price_type = parse_peg_price_type(cmd.params.as_ref())?;
943        let peg_offset_value = parse_peg_offset_value(cmd.params.as_ref())?;
944
945        let order = self
946            .core
947            .cache()
948            .order(&cmd.client_order_id)
949            .cloned()
950            .ok_or_else(|| {
951                anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
952            })?;
953
954        self.submit_cached_order(
955            &order,
956            submit_tries,
957            peg_price_type,
958            peg_offset_value,
959            "submit_order",
960        );
961        Ok(())
962    }
963
964    fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
965        if cmd.order_list.client_order_ids.is_empty() {
966            log::debug!("submit_order_list called with empty order list");
967            return Ok(());
968        }
969
970        let submit_tries = cmd
971            .params
972            .as_ref()
973            .and_then(|p| p.get_usize("submit_tries"))
974            .filter(|&n| n > 0);
975
976        let peg_price_type = parse_peg_price_type(cmd.params.as_ref())?;
977        let peg_offset_value = parse_peg_offset_value(cmd.params.as_ref())?;
978
979        let orders = self.core.get_orders_for_list(&cmd.order_list)?;
980
981        log::info!(
982            "Submitting BitMEX order list: order_list_id={}, count={}",
983            cmd.order_list.id,
984            orders.len(),
985        );
986
987        for order in orders {
988            self.submit_cached_order(
989                &order,
990                submit_tries,
991                peg_price_type,
992                peg_offset_value,
993                "submit_order_list_item",
994            );
995        }
996
997        Ok(())
998    }
999
1000    fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
1001        self.ensure_order_identity(cmd.client_order_id, cmd.strategy_id, cmd.instrument_id);
1002        let http_client = self.http_client.clone();
1003        let emitter = self.emitter.clone();
1004        let instrument_id = cmd.instrument_id;
1005        let client_order_id = Some(cmd.client_order_id);
1006        let venue_order_id = cmd.venue_order_id;
1007        let quantity = cmd.quantity;
1008        let price = cmd.price;
1009        let trigger_price = cmd.trigger_price;
1010
1011        self.spawn_task("modify_order", async move {
1012            match http_client
1013                .modify_order(
1014                    instrument_id,
1015                    client_order_id,
1016                    venue_order_id,
1017                    quantity,
1018                    price,
1019                    trigger_price,
1020                )
1021                .await
1022            {
1023                Ok(report) => emitter.send_order_status_report(report),
1024                Err(e) => log::error!("BitMEX modify order failed: {e:?}"),
1025            }
1026            Ok(())
1027        });
1028
1029        Ok(())
1030    }
1031
1032    fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
1033        self.ensure_order_identity(cmd.client_order_id, cmd.strategy_id, cmd.instrument_id);
1034        let canceller = self._canceller.clone_for_async();
1035        let emitter = self.emitter.clone();
1036        let dispatch_state = Arc::clone(&self.ws_dispatch_state);
1037        let instrument_id = cmd.instrument_id;
1038        let client_order_id = Some(cmd.client_order_id);
1039        let venue_order_id = cmd.venue_order_id;
1040
1041        self.spawn_task("cancel_order", async move {
1042            match canceller
1043                .broadcast_cancel(instrument_id, client_order_id, venue_order_id)
1044                .await
1045            {
1046                Ok(Some(report)) => {
1047                    if let Some(cid) = &report.client_order_id {
1048                        dispatch_state.tombstone_order(cid);
1049                    }
1050                    emitter.send_order_status_report(report);
1051                }
1052                Ok(None) => {
1053                    log::debug!("Order already cancelled: {client_order_id:?}");
1054                }
1055                Err(e) => log::error!("BitMEX cancel order failed: {e:?}"),
1056            }
1057            Ok(())
1058        });
1059
1060        Ok(())
1061    }
1062
1063    fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1064        let canceller = self._canceller.clone_for_async();
1065        let emitter = self.emitter.clone();
1066        let dispatch_state = Arc::clone(&self.ws_dispatch_state);
1067        let instrument_id = cmd.instrument_id;
1068        let order_side = if cmd.order_side == OrderSide::NoOrderSide {
1069            log::debug!(
1070                "BitMEX cancel_all_orders received NoOrderSide for {instrument_id}, using unfiltered cancel-all",
1071            );
1072            None
1073        } else {
1074            Some(cmd.order_side)
1075        };
1076
1077        self.spawn_task("cancel_all_orders", async move {
1078            match canceller
1079                .broadcast_cancel_all(instrument_id, order_side)
1080                .await
1081            {
1082                Ok(reports) => {
1083                    for report in &reports {
1084                        if let Some(cid) = &report.client_order_id {
1085                            dispatch_state.tombstone_order(cid);
1086                        }
1087                    }
1088
1089                    for report in reports {
1090                        emitter.send_order_status_report(report);
1091                    }
1092                }
1093                Err(e) => log::error!("BitMEX cancel all failed: {e:?}"),
1094            }
1095            Ok(())
1096        });
1097
1098        Ok(())
1099    }
1100
1101    fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
1102        let canceller = self._canceller.clone_for_async();
1103        let emitter = self.emitter.clone();
1104        let dispatch_state = Arc::clone(&self.ws_dispatch_state);
1105        let instrument_id = cmd.instrument_id;
1106
1107        let client_ids: Vec<ClientOrderId> = cmd
1108            .cancels
1109            .iter()
1110            .map(|cancel| cancel.client_order_id)
1111            .collect();
1112
1113        let venue_ids: Vec<VenueOrderId> = cmd
1114            .cancels
1115            .iter()
1116            .filter_map(|cancel| cancel.venue_order_id)
1117            .collect();
1118
1119        let client_ids_opt = if client_ids.is_empty() {
1120            None
1121        } else {
1122            Some(client_ids)
1123        };
1124
1125        let venue_ids_opt = if venue_ids.is_empty() {
1126            None
1127        } else {
1128            Some(venue_ids)
1129        };
1130
1131        self.spawn_task("batch_cancel_orders", async move {
1132            match canceller
1133                .broadcast_batch_cancel(instrument_id, client_ids_opt, venue_ids_opt)
1134                .await
1135            {
1136                Ok(reports) => {
1137                    for report in &reports {
1138                        if let Some(cid) = &report.client_order_id {
1139                            dispatch_state.tombstone_order(cid);
1140                        }
1141                    }
1142
1143                    for report in reports {
1144                        emitter.send_order_status_report(report);
1145                    }
1146                }
1147                Err(e) => log::error!("BitMEX batch cancel failed: {e:?}"),
1148            }
1149            Ok(())
1150        });
1151
1152        Ok(())
1153    }
1154}