1use std::{
19 future::Future,
20 sync::{
21 Arc, Mutex,
22 atomic::{AtomicBool, Ordering},
23 },
24 time::{Duration, Instant},
25};
26
27use ahash::AHashMap;
28use anyhow::Context;
29use async_trait::async_trait;
30use futures_util::{StreamExt, pin_mut};
31use nautilus_common::{
32 clients::ExecutionClient,
33 enums::LogLevel,
34 live::{get_runtime, runner::get_exec_event_sender},
35 messages::execution::{
36 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
37 GenerateFillReportsBuilder, GenerateOrderStatusReport, GenerateOrderStatusReports,
38 GenerateOrderStatusReportsBuilder, GeneratePositionStatusReports,
39 GeneratePositionStatusReportsBuilder, ModifyOrder, QueryAccount, QueryOrder, SubmitOrder,
40 SubmitOrderList,
41 },
42};
43use nautilus_core::{
44 UnixNanos,
45 time::{AtomicTime, get_atomic_clock_realtime},
46};
47use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
48use nautilus_model::{
49 accounts::AccountAny,
50 enums::{AccountType, OmsType, OrderSide, OrderType},
51 identifiers::{
52 AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Venue, VenueOrderId,
53 },
54 instruments::{Instrument, InstrumentAny},
55 orders::{Order, OrderAny},
56 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
57 types::{AccountBalance, MarginBalance},
58};
59use rust_decimal::prelude::ToPrimitive;
60use tokio::task::JoinHandle;
61use ustr::Ustr;
62
63use crate::{
64 broadcast::{
65 canceller::{CancelBroadcaster, CancelBroadcasterConfig},
66 submitter::{SubmitBroadcaster, SubmitBroadcasterConfig},
67 },
68 common::{
69 enums::BitmexPegPriceType,
70 parse::{parse_peg_offset_value, parse_peg_price_type},
71 },
72 config::BitmexExecClientConfig,
73 http::client::BitmexHttpClient,
74 websocket::{
75 client::BitmexWebSocketClient,
76 dispatch::{self, OrderIdentity, WsDispatchState},
77 },
78};
79
/// BitMEX execution client.
///
/// Bundles the shared execution core with the HTTP client, the WebSocket
/// client, the submit/cancel broadcasters and the handles of the background
/// tasks spawned by this adapter.
#[derive(Debug)]
pub struct BitmexExecutionClient {
    /// Shared core providing identifiers, cache access and lifecycle flags.
    core: ExecutionClientCore,
    /// Real-time atomic clock used to timestamp emitted events.
    clock: &'static AtomicTime,
    /// Client configuration supplied at construction.
    config: BitmexExecClientConfig,
    /// Emitter used to publish execution events and reports.
    emitter: ExecutionEventEmitter,
    /// HTTP client used for REST requests (orders, reports, account state).
    http_client: BitmexHttpClient,
    /// WebSocket client providing the execution message stream.
    ws_client: BitmexWebSocketClient,
    /// State shared with the WebSocket dispatch task (e.g. order identities).
    ws_dispatch_state: Arc<WsDispatchState>,
    /// Broadcaster used when an order submission requests more than one try.
    _submitter: SubmitBroadcaster,
    /// Broadcaster used for cancel, cancel-all and batch-cancel requests.
    _canceller: CancelBroadcaster,
    /// Handle of the task consuming the WebSocket stream, if one is running.
    ws_stream_handle: Option<JoinHandle<()>>,
    /// Handles of spawned request tasks, kept so they can be aborted later.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Handle of the dead man's switch heartbeat task, if one is running.
    dms_task_handle: Option<JoinHandle<()>>,
    /// Flag polled by the dead man's switch task to decide whether to keep looping.
    dms_running: Arc<AtomicBool>,
}
96
97impl BitmexExecutionClient {
98 fn log_report_receipt(count: usize, report_type: &str, log_level: LogLevel) {
99 let plural = if count == 1 { "" } else { "s" };
100 let message = format!("Received {count} {report_type}{plural}");
101
102 match log_level {
103 LogLevel::Off => {}
104 LogLevel::Trace => log::trace!("{message}"),
105 LogLevel::Debug => log::debug!("{message}"),
106 LogLevel::Info => log::info!("{message}"),
107 LogLevel::Warning => log::warn!("{message}"),
108 LogLevel::Error => log::error!("{message}"),
109 }
110 }
111
112 pub fn new(core: ExecutionClientCore, config: BitmexExecClientConfig) -> anyhow::Result<Self> {
118 if !config.has_api_credentials() {
119 anyhow::bail!("BitMEX execution client requires API key and secret");
120 }
121
122 let trader_id = core.trader_id;
123 let account_id = config.account_id.unwrap_or(core.account_id);
124 let clock = get_atomic_clock_realtime();
125 let emitter =
126 ExecutionEventEmitter::new(clock, trader_id, account_id, AccountType::Margin, None);
127 let http_client = BitmexHttpClient::new(
128 Some(config.http_base_url()),
129 config.api_key.clone(),
130 config.api_secret.clone(),
131 config.environment,
132 config.http_timeout_secs,
133 config.max_retries,
134 config.retry_delay_initial_ms,
135 config.retry_delay_max_ms,
136 config.recv_window_ms,
137 config.max_requests_per_second,
138 config.max_requests_per_minute,
139 config.proxy_url.clone(),
140 )
141 .context("failed to construct BitMEX HTTP client")?;
142 let ws_client = BitmexWebSocketClient::new_with_env(
143 Some(config.ws_url()),
144 config.api_key.clone(),
145 config.api_secret.clone(),
146 Some(account_id),
147 config.heartbeat_interval_secs,
148 config.environment,
149 config.transport_backend,
150 config.proxy_url.clone(),
151 )
152 .context("failed to construct BitMEX execution websocket client")?;
153
154 let pool_size = config.submitter_pool_size.unwrap_or(1);
155 let submitter_proxy_urls = match &config.submitter_proxy_urls {
156 Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
157 None => vec![config.proxy_url.clone(); pool_size],
158 };
159
160 let submitter_config = SubmitBroadcasterConfig {
161 pool_size,
162 api_key: config.api_key.clone(),
163 api_secret: config.api_secret.clone(),
164 base_url: config.base_url_http.clone(),
165 environment: config.environment,
166 timeout_secs: config.http_timeout_secs,
167 max_retries: config.max_retries,
168 retry_delay_ms: config.retry_delay_initial_ms,
169 retry_delay_max_ms: config.retry_delay_max_ms,
170 recv_window_ms: config.recv_window_ms,
171 max_requests_per_second: config.max_requests_per_second,
172 max_requests_per_minute: config.max_requests_per_minute,
173 proxy_urls: submitter_proxy_urls,
174 ..Default::default()
175 };
176
177 let _submitter = SubmitBroadcaster::new(submitter_config)
178 .context("failed to create SubmitBroadcaster")?;
179
180 let canceller_pool_size = config.canceller_pool_size.unwrap_or(1);
181 let canceller_proxy_urls = match &config.canceller_proxy_urls {
182 Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
183 None => vec![config.proxy_url.clone(); canceller_pool_size],
184 };
185
186 let canceller_config = CancelBroadcasterConfig {
187 pool_size: canceller_pool_size,
188 api_key: config.api_key.clone(),
189 api_secret: config.api_secret.clone(),
190 base_url: config.base_url_http.clone(),
191 environment: config.environment,
192 timeout_secs: config.http_timeout_secs,
193 max_retries: config.max_retries,
194 retry_delay_ms: config.retry_delay_initial_ms,
195 retry_delay_max_ms: config.retry_delay_max_ms,
196 recv_window_ms: config.recv_window_ms,
197 max_requests_per_second: config.max_requests_per_second,
198 max_requests_per_minute: config.max_requests_per_minute,
199 proxy_urls: canceller_proxy_urls,
200 ..Default::default()
201 };
202
203 let _canceller = CancelBroadcaster::new(canceller_config)
204 .context("failed to create CancelBroadcaster")?;
205
206 Ok(Self {
207 core,
208 clock,
209 config,
210 emitter,
211 http_client,
212 ws_client,
213 ws_dispatch_state: Arc::new(WsDispatchState::default()),
214 _submitter,
215 _canceller,
216 ws_stream_handle: None,
217 pending_tasks: Mutex::new(Vec::new()),
218 dms_task_handle: None,
219 dms_running: Arc::new(AtomicBool::new(false)),
220 })
221 }
222
223 fn spawn_task<F>(&self, label: &'static str, fut: F)
224 where
225 F: Future<Output = anyhow::Result<()>> + Send + 'static,
226 {
227 let handle = get_runtime().spawn(async move {
228 if let Err(e) = fut.await {
229 log::error!("{label}: {e:?}");
230 }
231 });
232
233 let mut guard = self
234 .pending_tasks
235 .lock()
236 .expect("pending task lock poisoned");
237
238 guard.retain(|h| !h.is_finished());
240 guard.push(handle);
241 }
242
243 fn abort_pending_tasks(&self) {
244 let mut guard = self
245 .pending_tasks
246 .lock()
247 .expect("pending task lock poisoned");
248
249 for handle in guard.drain(..) {
250 handle.abort();
251 }
252 }
253
254 fn ensure_order_identity(
259 &self,
260 client_order_id: ClientOrderId,
261 strategy_id: StrategyId,
262 instrument_id: InstrumentId,
263 ) {
264 if self
265 .ws_dispatch_state
266 .order_identities
267 .contains_key(&client_order_id)
268 {
269 return;
270 }
271
272 let cache = self.core.cache();
273 let (order_side, order_type) = cache
274 .order(&client_order_id)
275 .map_or((OrderSide::NoOrderSide, OrderType::Market), |o| {
276 (o.order_side(), o.order_type())
277 });
278 drop(cache);
279
280 self.ws_dispatch_state.order_identities.insert(
281 client_order_id,
282 OrderIdentity {
283 instrument_id,
284 strategy_id,
285 order_side,
286 order_type,
287 },
288 );
289 self.ws_dispatch_state.insert_accepted(client_order_id);
290 }
291
292 fn start_deadmans_switch(&mut self) {
293 let Some(timeout_secs) = self.config.deadmans_switch_timeout_secs else {
294 return;
295 };
296
297 let timeout_ms = timeout_secs * 1000;
298 let interval_secs = (timeout_secs / 4).max(1);
299
300 log::info!(
301 "Starting dead man's switch: timeout={timeout_secs}s, refresh_interval={interval_secs}s",
302 );
303
304 self.dms_running.store(true, Ordering::SeqCst);
305 let running = self.dms_running.clone();
306 let http_client = self.http_client.clone();
307
308 let handle = get_runtime().spawn(async move {
309 while running.load(Ordering::SeqCst) {
310 if let Err(e) = http_client.cancel_all_after(timeout_ms).await {
311 log::warn!("Dead man's switch heartbeat failed: {e}");
312 }
313 tokio::time::sleep(Duration::from_secs(interval_secs)).await;
314 }
315 });
316
317 self.dms_task_handle = Some(handle);
318 }
319
320 async fn stop_deadmans_switch(&mut self) {
321 if self.config.deadmans_switch_timeout_secs.is_none() {
322 return;
323 }
324
325 self.dms_running.store(false, Ordering::SeqCst);
326
327 if let Some(handle) = self.dms_task_handle.take() {
329 handle.abort();
330 let _ = handle.await;
331 }
332
333 log::info!("Disarming dead man's switch");
334
335 if let Err(e) = self.http_client.cancel_all_after(0).await {
336 log::warn!("Failed to disarm dead man's switch: {e}");
337 }
338 }
339
340 async fn ensure_instruments_initialized_async(&self) -> anyhow::Result<()> {
341 if self.core.instruments_initialized() {
342 return Ok(());
343 }
344
345 let mut instruments: Vec<InstrumentAny> = {
346 let cache = self.core.cache();
347 cache
348 .instruments(&self.core.venue, None)
349 .into_iter()
350 .cloned()
351 .collect()
352 };
353
354 if instruments.is_empty() {
355 let http = self.http_client.clone();
356 instruments = http
357 .request_instruments(self.config.active_only)
358 .await
359 .context("failed to request BitMEX instruments")?;
360 } else {
361 log::debug!(
362 "Reusing {} cached BitMEX instruments for execution client initialization",
363 instruments.len()
364 );
365 }
366
367 instruments.sort_by_key(|instrument| instrument.id());
368
369 self.http_client.cache_instruments(&instruments);
370 self.ws_client.cache_instruments(&instruments);
371 for instrument in &instruments {
372 self._submitter.cache_instrument(instrument);
373 self._canceller.cache_instrument(instrument);
374 }
375
376 self.core.set_instruments_initialized();
377 Ok(())
378 }
379
380 async fn refresh_account_state(&self) -> anyhow::Result<()> {
381 let account_state = self
382 .http_client
383 .request_account_state(self.core.account_id)
384 .await
385 .context("failed to request BitMEX account state")?;
386
387 self.emitter.send_account_state(account_state);
388 Ok(())
389 }
390
391 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
392 let account_id = self.core.account_id;
393
394 if self.core.cache().account(&account_id).is_some() {
395 log::info!("Account {account_id} registered");
396 return Ok(());
397 }
398
399 let start = Instant::now();
400 let timeout = Duration::from_secs_f64(timeout_secs);
401 let interval = Duration::from_millis(10);
402
403 loop {
404 tokio::time::sleep(interval).await;
405
406 if self.core.cache().account(&account_id).is_some() {
407 log::info!("Account {account_id} registered");
408 return Ok(());
409 }
410
411 if start.elapsed() >= timeout {
412 anyhow::bail!(
413 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
414 );
415 }
416 }
417 }
418
419 fn start_ws_stream(&mut self) {
420 if self.ws_stream_handle.is_some() {
421 return;
422 }
423
424 let stream = self.ws_client.stream();
425 let emitter = self.emitter.clone();
426 let state = Arc::clone(&self.ws_dispatch_state);
427 let account_id = self.core.account_id;
428 let clock = self.clock;
429
430 let mut instruments_by_symbol: AHashMap<Ustr, InstrumentAny> = self
432 .core
433 .cache()
434 .instruments(&self.core.venue, None)
435 .into_iter()
436 .map(|inst| (inst.symbol().inner(), inst.clone()))
437 .collect();
438
439 if instruments_by_symbol.is_empty() {
440 for (key, inst) in self.http_client.instruments_cache.load().iter() {
441 instruments_by_symbol.insert(*key, inst.clone());
442 }
443 }
444
445 let handle = get_runtime().spawn(async move {
446 pin_mut!(stream);
447 let mut order_type_cache: AHashMap<ClientOrderId, OrderType> = AHashMap::new();
448 let mut order_symbol_cache: AHashMap<ClientOrderId, Ustr> = AHashMap::new();
449 let mut insts_by_symbol = instruments_by_symbol;
450
451 while let Some(message) = stream.next().await {
452 dispatch::dispatch_ws_message(
453 clock.get_time_ns(),
454 message,
455 &emitter,
456 &state,
457 &mut insts_by_symbol,
458 &mut order_type_cache,
459 &mut order_symbol_cache,
460 account_id,
461 );
462 }
463 });
464
465 self.ws_stream_handle = Some(handle);
466 }
467
468 fn submit_cached_order(
469 &self,
470 order: &OrderAny,
471 submit_tries: Option<usize>,
472 peg_price_type: Option<BitmexPegPriceType>,
473 peg_offset_value: Option<f64>,
474 task_label: &'static str,
475 ) {
476 if order.is_closed() {
477 log::warn!("Cannot submit closed order {}", order.client_order_id());
478 return;
479 }
480
481 self.emitter.emit_order_submitted(order);
482
483 let strategy_id = order.strategy_id();
484 let instrument_id = order.instrument_id();
485 let client_order_id = order.client_order_id();
486 let order_side = order.order_side();
487 let order_type = order.order_type();
488
489 self.ws_dispatch_state.order_identities.insert(
490 client_order_id,
491 OrderIdentity {
492 instrument_id,
493 strategy_id,
494 order_side,
495 order_type,
496 },
497 );
498
499 let use_broadcaster = submit_tries.is_some_and(|n| n > 1);
500 let http_client = self.http_client.clone();
501 let submitter = self._submitter.clone_for_async();
502 let ws_dispatch_state = self.ws_dispatch_state.clone();
503 let emitter = self.emitter.clone();
504 let clock = self.clock;
505 let quantity = order.quantity();
506 let time_in_force = order.time_in_force();
507 let price = order.price();
508 let trigger_price = order.trigger_price();
509 let trigger_type = order.trigger_type();
510 let trailing_offset = order.trailing_offset().and_then(|d| d.to_f64());
511 let trailing_offset_type = order.trailing_offset_type();
512 let display_qty = order.display_qty();
513 let post_only = order.is_post_only();
514 let reduce_only = order.is_reduce_only();
515 let order_list_id = order.order_list_id();
516 let contingency_type = order.contingency_type();
517
518 self.spawn_task(task_label, async move {
519 let result = if use_broadcaster {
520 submitter
521 .broadcast_submit(
522 instrument_id,
523 client_order_id,
524 order_side,
525 order_type,
526 quantity,
527 time_in_force,
528 price,
529 trigger_price,
530 trigger_type,
531 trailing_offset,
532 trailing_offset_type,
533 display_qty,
534 post_only,
535 reduce_only,
536 order_list_id,
537 contingency_type,
538 submit_tries,
539 peg_price_type,
540 peg_offset_value,
541 )
542 .await
543 } else {
544 http_client
545 .submit_order(
546 instrument_id,
547 client_order_id,
548 order_side,
549 order_type,
550 quantity,
551 time_in_force,
552 price,
553 trigger_price,
554 trigger_type,
555 trailing_offset,
556 trailing_offset_type,
557 display_qty,
558 post_only,
559 reduce_only,
560 order_list_id,
561 contingency_type,
562 peg_price_type,
563 peg_offset_value,
564 )
565 .await
566 };
567
568 match result {
569 Ok(_report) => {
570 }
575 Err(e) => {
576 let error_msg = e.to_string();
577
578 if error_msg.contains("IDEMPOTENT_DUPLICATE") {
581 log::warn!(
582 "Order {client_order_id} may exist (duplicate clOrdID from all transports), \
583 awaiting WebSocket confirmation",
584 );
585 return Ok(());
586 }
587
588 ws_dispatch_state.order_identities.remove(&client_order_id);
589 let ts_event = clock.get_time_ns();
590 emitter.emit_order_rejected_event(
591 strategy_id,
592 instrument_id,
593 client_order_id,
594 &format!("submit-order-error: {error_msg}"),
595 ts_event,
596 post_only,
597 );
598 }
599 }
600 Ok(())
601 });
602 }
603}
604
605#[async_trait(?Send)]
606impl ExecutionClient for BitmexExecutionClient {
607 fn is_connected(&self) -> bool {
608 self.core.is_connected()
609 }
610
611 fn client_id(&self) -> ClientId {
612 self.core.client_id
613 }
614
615 fn account_id(&self) -> AccountId {
616 self.core.account_id
617 }
618
619 fn venue(&self) -> Venue {
620 self.core.venue
621 }
622
623 fn oms_type(&self) -> OmsType {
624 self.core.oms_type
625 }
626
627 fn get_account(&self) -> Option<AccountAny> {
628 self.core.cache().account(&self.core.account_id).cloned()
629 }
630
631 fn generate_account_state(
632 &self,
633 balances: Vec<AccountBalance>,
634 margins: Vec<MarginBalance>,
635 reported: bool,
636 ts_event: UnixNanos,
637 ) -> anyhow::Result<()> {
638 self.emitter
639 .emit_account_state(balances, margins, reported, ts_event);
640 Ok(())
641 }
642
643 fn start(&mut self) -> anyhow::Result<()> {
644 if self.core.is_started() {
645 return Ok(());
646 }
647
648 self.emitter.set_sender(get_exec_event_sender());
649 self.core.set_started();
650 log::info!(
651 "BitMEX execution client started: client_id={}, account_id={}, environment={}, submitter_pool_size={:?}, canceller_pool_size={:?}, proxy_url={:?}, submitter_proxy_urls={:?}, canceller_proxy_urls={:?}",
652 self.core.client_id,
653 self.core.account_id,
654 self.config.environment,
655 self.config.submitter_pool_size,
656 self.config.canceller_pool_size,
657 self.config.proxy_url,
658 self.config.submitter_proxy_urls,
659 self.config.canceller_proxy_urls,
660 );
661 Ok(())
662 }
663
664 fn stop(&mut self) -> anyhow::Result<()> {
665 if self.core.is_stopped() {
666 return Ok(());
667 }
668
669 self.core.set_stopped();
670 self.core.set_disconnected();
671
672 if let Some(handle) = self.ws_stream_handle.take() {
673 handle.abort();
674 }
675
676 if let Some(handle) = self.dms_task_handle.take() {
677 handle.abort();
678 }
679 self.dms_running.store(false, Ordering::SeqCst);
680 self.abort_pending_tasks();
681 log::info!("BitMEX execution client {} stopped", self.core.client_id);
682 Ok(())
683 }
684
685 async fn connect(&mut self) -> anyhow::Result<()> {
686 if self.core.is_connected() {
687 return Ok(());
688 }
689
690 self.http_client.reset_cancellation_token();
692
693 self.ensure_instruments_initialized_async().await?;
694
695 self.ws_client.connect().await?;
696 self.ws_client.wait_until_active(10.0).await?;
697
698 self._submitter.start().await?;
700 self._canceller.start().await?;
701
702 self.ws_client.subscribe_orders().await?;
703 self.ws_client.subscribe_executions().await?;
704 self.ws_client.subscribe_positions().await?;
705 self.ws_client.subscribe_wallet().await?;
706 if let Err(e) = self.ws_client.subscribe_margin().await {
707 log::debug!("Margin subscription unavailable: {e:?}");
708 }
709
710 self.start_ws_stream();
711 self.refresh_account_state().await?;
712 self.await_account_registered(30.0).await?;
713
714 self.core.set_connected();
715 self.start_deadmans_switch();
716 log::info!("Connected: client_id={}", self.core.client_id);
717 Ok(())
718 }
719
720 async fn disconnect(&mut self) -> anyhow::Result<()> {
721 if self.core.is_disconnected() {
722 return Ok(());
723 }
724
725 self.stop_deadmans_switch().await;
727
728 self.http_client.cancel_all_requests();
729 self._submitter.stop().await;
730 self._canceller.stop().await;
731
732 if let Err(e) = self.ws_client.close().await {
733 log::warn!("Error while closing BitMEX execution websocket: {e:?}");
734 }
735
736 if let Some(handle) = self.ws_stream_handle.take() {
737 handle.abort();
738 }
739
740 self.abort_pending_tasks();
741 self.core.set_disconnected();
742 log::info!("Disconnected: client_id={}", self.core.client_id);
743 Ok(())
744 }
745
746 async fn generate_order_status_report(
747 &self,
748 cmd: &GenerateOrderStatusReport,
749 ) -> anyhow::Result<Option<OrderStatusReport>> {
750 let instrument_id = cmd
751 .instrument_id
752 .context("BitMEX generate_order_status_report requires an instrument identifier")?;
753
754 self.http_client
755 .query_order(
756 instrument_id,
757 cmd.client_order_id,
758 cmd.venue_order_id.map(|id| VenueOrderId::from(id.as_str())),
759 )
760 .await
761 .context("failed to query BitMEX order status")
762 }
763
764 async fn generate_order_status_reports(
765 &self,
766 cmd: &GenerateOrderStatusReports,
767 ) -> anyhow::Result<Vec<OrderStatusReport>> {
768 let start_dt = cmd.start.map(|nanos| nanos.to_datetime_utc());
769 let end_dt = cmd.end.map(|nanos| nanos.to_datetime_utc());
770
771 let mut reports = self
772 .http_client
773 .request_order_status_reports(cmd.instrument_id, cmd.open_only, start_dt, end_dt, None)
774 .await
775 .context("failed to request BitMEX order status reports")?;
776
777 if let Some(start) = cmd.start {
778 reports.retain(|report| report.ts_last >= start);
779 }
780
781 if let Some(end) = cmd.end {
782 reports.retain(|report| report.ts_last <= end);
783 }
784
785 Self::log_report_receipt(reports.len(), "OrderStatusReport", cmd.log_receipt_level);
786
787 Ok(reports)
788 }
789
790 async fn generate_fill_reports(
791 &self,
792 cmd: GenerateFillReports,
793 ) -> anyhow::Result<Vec<FillReport>> {
794 let start_dt = cmd.start.map(|nanos| nanos.to_datetime_utc());
795 let end_dt = cmd.end.map(|nanos| nanos.to_datetime_utc());
796
797 let mut reports = self
798 .http_client
799 .request_fill_reports(cmd.instrument_id, start_dt, end_dt, None)
800 .await
801 .context("failed to request BitMEX fill reports")?;
802
803 if let Some(order_id) = cmd.venue_order_id {
804 reports.retain(|report| report.venue_order_id.as_str() == order_id.as_str());
805 }
806
807 if let Some(start) = cmd.start {
808 reports.retain(|report| report.ts_event >= start);
809 }
810
811 if let Some(end) = cmd.end {
812 reports.retain(|report| report.ts_event <= end);
813 }
814
815 Self::log_report_receipt(reports.len(), "FillReport", cmd.log_receipt_level);
816
817 Ok(reports)
818 }
819
820 async fn generate_position_status_reports(
821 &self,
822 cmd: &GeneratePositionStatusReports,
823 ) -> anyhow::Result<Vec<PositionStatusReport>> {
824 let mut reports = self
825 .http_client
826 .request_position_status_reports()
827 .await
828 .context("failed to request BitMEX position reports")?;
829
830 if let Some(instrument_id) = cmd.instrument_id {
831 reports.retain(|report| report.instrument_id == instrument_id);
832 }
833
834 if let Some(start) = cmd.start {
835 reports.retain(|report| report.ts_last >= start);
836 }
837
838 if let Some(end) = cmd.end {
839 reports.retain(|report| report.ts_last <= end);
840 }
841
842 Self::log_report_receipt(reports.len(), "PositionStatusReport", cmd.log_receipt_level);
843
844 Ok(reports)
845 }
846
847 async fn generate_mass_status(
848 &self,
849 lookback_mins: Option<u64>,
850 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
851 log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
852
853 let ts_now = self.clock.get_time_ns();
854 let start = lookback_mins.map(|mins| {
855 let lookback_ns = mins.saturating_mul(60).saturating_mul(1_000_000_000);
856 UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
857 });
858
859 let order_cmd = GenerateOrderStatusReportsBuilder::default()
860 .ts_init(ts_now)
861 .open_only(false)
862 .start(start)
863 .build()
864 .map_err(|e| anyhow::anyhow!("{e}"))?;
865
866 let fill_cmd = GenerateFillReportsBuilder::default()
867 .ts_init(ts_now)
868 .start(start)
869 .build()
870 .map_err(|e| anyhow::anyhow!("{e}"))?;
871
872 let position_cmd = GeneratePositionStatusReportsBuilder::default()
873 .ts_init(ts_now)
874 .start(start)
875 .build()
876 .map_err(|e| anyhow::anyhow!("{e}"))?;
877
878 let (order_reports, fill_reports, position_reports) = tokio::try_join!(
879 self.generate_order_status_reports(&order_cmd),
880 self.generate_fill_reports(fill_cmd),
881 self.generate_position_status_reports(&position_cmd),
882 )?;
883
884 let mut mass_status = ExecutionMassStatus::new(
885 self.core.client_id,
886 self.core.account_id,
887 self.core.venue,
888 ts_now,
889 None,
890 );
891 mass_status.add_order_reports(order_reports);
892 mass_status.add_fill_reports(fill_reports);
893 mass_status.add_position_reports(position_reports);
894
895 Ok(Some(mass_status))
896 }
897
898 fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
899 let http_client = self.http_client.clone();
900 let emitter = self.emitter.clone();
901 let account_id = self.core.account_id;
902
903 self.spawn_task("query_account", async move {
904 match http_client.request_account_state(account_id).await {
905 Ok(account_state) => emitter.send_account_state(account_state),
906 Err(e) => log::error!("BitMEX query account failed: {e:?}"),
907 }
908 Ok(())
909 });
910
911 Ok(())
912 }
913
914 fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
915 let http_client = self.http_client.clone();
916 let instrument_id = cmd.instrument_id;
917 let client_order_id = Some(cmd.client_order_id);
918 let venue_order_id = cmd.venue_order_id;
919 let emitter = self.emitter.clone();
920
921 self.spawn_task("query_order", async move {
922 match http_client
923 .request_order_status_report(instrument_id, client_order_id, venue_order_id)
924 .await
925 {
926 Ok(report) => emitter.send_order_status_report(report),
927 Err(e) => log::error!("BitMEX query order failed: {e:?}"),
928 }
929 Ok(())
930 });
931
932 Ok(())
933 }
934
935 fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
936 let submit_tries = cmd
937 .params
938 .as_ref()
939 .and_then(|p| p.get_usize("submit_tries"))
940 .filter(|&n| n > 0);
941
942 let peg_price_type = parse_peg_price_type(cmd.params.as_ref())?;
943 let peg_offset_value = parse_peg_offset_value(cmd.params.as_ref())?;
944
945 let order = self
946 .core
947 .cache()
948 .order(&cmd.client_order_id)
949 .cloned()
950 .ok_or_else(|| {
951 anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
952 })?;
953
954 self.submit_cached_order(
955 &order,
956 submit_tries,
957 peg_price_type,
958 peg_offset_value,
959 "submit_order",
960 );
961 Ok(())
962 }
963
964 fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
965 if cmd.order_list.client_order_ids.is_empty() {
966 log::debug!("submit_order_list called with empty order list");
967 return Ok(());
968 }
969
970 let submit_tries = cmd
971 .params
972 .as_ref()
973 .and_then(|p| p.get_usize("submit_tries"))
974 .filter(|&n| n > 0);
975
976 let peg_price_type = parse_peg_price_type(cmd.params.as_ref())?;
977 let peg_offset_value = parse_peg_offset_value(cmd.params.as_ref())?;
978
979 let orders = self.core.get_orders_for_list(&cmd.order_list)?;
980
981 log::info!(
982 "Submitting BitMEX order list: order_list_id={}, count={}",
983 cmd.order_list.id,
984 orders.len(),
985 );
986
987 for order in orders {
988 self.submit_cached_order(
989 &order,
990 submit_tries,
991 peg_price_type,
992 peg_offset_value,
993 "submit_order_list_item",
994 );
995 }
996
997 Ok(())
998 }
999
1000 fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
1001 self.ensure_order_identity(cmd.client_order_id, cmd.strategy_id, cmd.instrument_id);
1002 let http_client = self.http_client.clone();
1003 let emitter = self.emitter.clone();
1004 let instrument_id = cmd.instrument_id;
1005 let client_order_id = Some(cmd.client_order_id);
1006 let venue_order_id = cmd.venue_order_id;
1007 let quantity = cmd.quantity;
1008 let price = cmd.price;
1009 let trigger_price = cmd.trigger_price;
1010
1011 self.spawn_task("modify_order", async move {
1012 match http_client
1013 .modify_order(
1014 instrument_id,
1015 client_order_id,
1016 venue_order_id,
1017 quantity,
1018 price,
1019 trigger_price,
1020 )
1021 .await
1022 {
1023 Ok(report) => emitter.send_order_status_report(report),
1024 Err(e) => log::error!("BitMEX modify order failed: {e:?}"),
1025 }
1026 Ok(())
1027 });
1028
1029 Ok(())
1030 }
1031
1032 fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
1033 self.ensure_order_identity(cmd.client_order_id, cmd.strategy_id, cmd.instrument_id);
1034 let canceller = self._canceller.clone_for_async();
1035 let emitter = self.emitter.clone();
1036 let dispatch_state = Arc::clone(&self.ws_dispatch_state);
1037 let instrument_id = cmd.instrument_id;
1038 let client_order_id = Some(cmd.client_order_id);
1039 let venue_order_id = cmd.venue_order_id;
1040
1041 self.spawn_task("cancel_order", async move {
1042 match canceller
1043 .broadcast_cancel(instrument_id, client_order_id, venue_order_id)
1044 .await
1045 {
1046 Ok(Some(report)) => {
1047 if let Some(cid) = &report.client_order_id {
1048 dispatch_state.tombstone_order(cid);
1049 }
1050 emitter.send_order_status_report(report);
1051 }
1052 Ok(None) => {
1053 log::debug!("Order already cancelled: {client_order_id:?}");
1054 }
1055 Err(e) => log::error!("BitMEX cancel order failed: {e:?}"),
1056 }
1057 Ok(())
1058 });
1059
1060 Ok(())
1061 }
1062
1063 fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1064 let canceller = self._canceller.clone_for_async();
1065 let emitter = self.emitter.clone();
1066 let dispatch_state = Arc::clone(&self.ws_dispatch_state);
1067 let instrument_id = cmd.instrument_id;
1068 let order_side = if cmd.order_side == OrderSide::NoOrderSide {
1069 log::debug!(
1070 "BitMEX cancel_all_orders received NoOrderSide for {instrument_id}, using unfiltered cancel-all",
1071 );
1072 None
1073 } else {
1074 Some(cmd.order_side)
1075 };
1076
1077 self.spawn_task("cancel_all_orders", async move {
1078 match canceller
1079 .broadcast_cancel_all(instrument_id, order_side)
1080 .await
1081 {
1082 Ok(reports) => {
1083 for report in &reports {
1084 if let Some(cid) = &report.client_order_id {
1085 dispatch_state.tombstone_order(cid);
1086 }
1087 }
1088
1089 for report in reports {
1090 emitter.send_order_status_report(report);
1091 }
1092 }
1093 Err(e) => log::error!("BitMEX cancel all failed: {e:?}"),
1094 }
1095 Ok(())
1096 });
1097
1098 Ok(())
1099 }
1100
1101 fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
1102 let canceller = self._canceller.clone_for_async();
1103 let emitter = self.emitter.clone();
1104 let dispatch_state = Arc::clone(&self.ws_dispatch_state);
1105 let instrument_id = cmd.instrument_id;
1106
1107 let client_ids: Vec<ClientOrderId> = cmd
1108 .cancels
1109 .iter()
1110 .map(|cancel| cancel.client_order_id)
1111 .collect();
1112
1113 let venue_ids: Vec<VenueOrderId> = cmd
1114 .cancels
1115 .iter()
1116 .filter_map(|cancel| cancel.venue_order_id)
1117 .collect();
1118
1119 let client_ids_opt = if client_ids.is_empty() {
1120 None
1121 } else {
1122 Some(client_ids)
1123 };
1124
1125 let venue_ids_opt = if venue_ids.is_empty() {
1126 None
1127 } else {
1128 Some(venue_ids)
1129 };
1130
1131 self.spawn_task("batch_cancel_orders", async move {
1132 match canceller
1133 .broadcast_batch_cancel(instrument_id, client_ids_opt, venue_ids_opt)
1134 .await
1135 {
1136 Ok(reports) => {
1137 for report in &reports {
1138 if let Some(cid) = &report.client_order_id {
1139 dispatch_state.tombstone_order(cid);
1140 }
1141 }
1142
1143 for report in reports {
1144 emitter.send_order_status_report(report);
1145 }
1146 }
1147 Err(e) => log::error!("BitMEX batch cancel failed: {e:?}"),
1148 }
1149 Ok(())
1150 });
1151
1152 Ok(())
1153 }
1154}