1#[path = "core_helpers.rs"]
19mod core_helpers;
20#[path = "core_orders.rs"]
21mod core_orders;
22#[path = "core_updates.rs"]
23mod core_updates;
24#[cfg(test)]
25#[path = "core_tests.rs"]
26mod tests;
27
28#[cfg(feature = "python")]
29use std::{
30 cell::{Cell, RefCell},
31 collections::HashMap,
32 rc::Rc,
33};
34use std::{
35 collections::VecDeque,
36 fmt::Debug,
37 str::FromStr,
38 sync::{
39 Arc, Mutex,
40 atomic::{AtomicBool, Ordering},
41 },
42 time::Duration,
43};
44
45use ahash::AHashMap;
46use anyhow::Context;
47use ibapi::{
49 accounts::PositionUpdate,
50 client::Client,
51 orders::{
52 ExecutionData, ExecutionFilter, Executions, OcaType, OrderStatus as IBOrderStatus,
53 OrderUpdate, Orders,
54 },
55};
56use nautilus_common::{
57 cache::Cache,
58 clients::ExecutionClient,
59 enums::LogLevel,
60 factories::OrderEventFactory,
61 live::{get_runtime, runner::get_exec_event_sender},
62 messages::{
63 ExecutionEvent,
64 execution::{
65 BatchCancelOrders, CancelAllOrders, CancelOrder, ExecutionReport, GenerateFillReports,
66 GenerateFillReportsBuilder, GenerateOrderStatusReport, GenerateOrderStatusReports,
67 GenerateOrderStatusReportsBuilder, GeneratePositionStatusReports,
68 GeneratePositionStatusReportsBuilder, ModifyOrder, QueryAccount, QueryOrder,
69 SubmitOrder, SubmitOrderList,
70 },
71 },
72 msgbus::{send_account_state, switchboard::MessagingSwitchboard},
73};
74use nautilus_core::{
75 UUID4, UnixNanos,
76 time::{AtomicTime, get_atomic_clock_realtime},
77};
78use nautilus_live::ExecutionClientCore;
79#[cfg(feature = "python")]
80use nautilus_model::events::{OrderAcceptedBatch, OrderCanceledBatch, OrderSubmittedBatch};
81#[cfg(feature = "python")]
82use nautilus_model::identifiers::{ExecAlgorithmId, OrderListId, PositionId};
83#[cfg(feature = "python")]
84use nautilus_model::orders::OrderList;
85#[cfg(feature = "python")]
86use nautilus_model::python::events::order::order_event_to_pyobject;
87use nautilus_model::{
88 accounts::AccountAny,
89 enums::{
90 LiquiditySide, OmsType, OrderSide, OrderType, PositionSideSpecified, TrailingOffsetType,
91 },
92 events::{
93 AccountState, OrderAccepted, OrderCanceled, OrderEventAny, OrderPendingCancel,
94 OrderRejected, OrderSubmitted,
95 },
96 identifiers::{
97 AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, Venue,
98 VenueOrderId,
99 },
100 instruments::Instrument,
101 orders::{Order, any::OrderAny},
102 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
103 types::{AccountBalance, Currency, MarginBalance, Money, Price, Quantity},
104};
105#[cfg(feature = "python")]
106use nautilus_model::{enums::AccountType, events::OrderInitialized};
107#[cfg(feature = "python")]
108use pyo3::{IntoPyObjectExt, prelude::*};
109use rust_decimal::Decimal;
110use tokio::task::JoinHandle;
111use ustr::Ustr;
112
113use super::{
114 account::{PositionTracker, create_position_tracker},
115 parse::{parse_execution_time, parse_execution_to_fill_report, parse_order_status_to_report},
116 transform::nautilus_order_to_ib_order,
117};
118#[cfg(feature = "python")]
119use crate::common::consts::IB_VENUE;
120use crate::{
121 common::{
122 parse::{ib_contract_to_instrument_id_simple, is_spread_instrument_id},
123 shared_client::SharedClientHandle,
124 },
125 config::InteractiveBrokersExecClientConfig,
126 providers::instruments::InteractiveBrokersInstrumentProvider,
127};
128
/// Interactive Brokers execution client.
///
/// Bundles the execution client core, the adapter configuration, the instrument
/// provider, the optional shared IB client handle, and a set of mutex-guarded lookup
/// tables (several of which are cloned into the task spawned by `submit_order`).
/// With the `python` feature enabled the type is exposed as an unsendable `pyo3` class.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(
        module = "nautilus_trader.core.nautilus_pyo3.interactive_brokers",
        unsendable
    )
)]
pub struct InteractiveBrokersExecutionClient {
    /// Core execution state: source of the client ID, account ID, venue, OMS type and cache.
    core: ExecutionClientCore,
    /// Adapter configuration (host, port, client ID, account ID, timeouts, feature flags).
    config: InteractiveBrokersExecClientConfig,
    /// Instrument provider; initialized and loaded during `connect`.
    instrument_provider: Arc<InteractiveBrokersInstrumentProvider>,
    /// Connection flag; set at the end of `connect`, cleared by `disconnect`.
    is_connected: AtomicBool,
    /// Shared IB client handle; `Some` once `connect` obtains it, `None` after `disconnect`.
    ib_client: Option<SharedClientHandle>,
    /// Join handles of tasks spawned by `submit_order`; aborted by `abort_pending_tasks`.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Next local order ID; starts at 0 and is seeded with the starting ID in `connect`.
    next_order_id: Arc<Mutex<i32>>,
    /// Task handle aborted by `abort_pending_tasks`.
    /// NOTE(review): presumably the order update stream task — confirm against `start_order_updates`.
    order_update_handle: Mutex<Option<JoinHandle<()>>>,
    /// Client order ID -> `i32` (presumably the IB order ID — TODO confirm).
    order_id_map: Arc<Mutex<AHashMap<ClientOrderId, i32>>>,
    /// `i32` (presumably the IB order ID — TODO confirm) -> client order ID.
    venue_order_id_map: Arc<Mutex<AHashMap<i32, ClientOrderId>>>,
    /// String key -> `(f64, String)`.
    /// NOTE(review): presumably execution ID -> (commission, currency) — confirm.
    commission_cache: Arc<Mutex<AHashMap<String, (f64, String)>>>,
    /// `i32` key (presumably the IB order ID — TODO confirm) -> instrument ID.
    instrument_id_map: Arc<Mutex<AHashMap<i32, InstrumentId>>>,
    /// `i32` key (presumably the IB order ID — TODO confirm) -> trader ID.
    trader_id_map: Arc<Mutex<AHashMap<i32, TraderId>>>,
    /// `i32` key (presumably the IB order ID — TODO confirm) -> strategy ID.
    strategy_id_map: Arc<Mutex<AHashMap<i32, StrategyId>>>,
    /// Per client order ID, a set of strings.
    /// NOTE(review): presumably fill identifiers already seen for spread orders — confirm.
    spread_fill_tracking: Arc<Mutex<AHashMap<ClientOrderId, ahash::AHashSet<String>>>>,
    /// Position tracker created by `create_position_tracker`; shared with the position
    /// tracking/subscription helpers during `connect`.
    position_tracker: PositionTracker,
    /// Per client order ID, a price (presumably the running average fill price — TODO confirm).
    order_avg_prices: Arc<Mutex<AHashMap<ClientOrderId, Price>>>,
    /// Per client order ID, a queue of buffered `PendingComboFill` entries.
    pending_combo_fills: Arc<Mutex<AHashMap<ClientOrderId, VecDeque<PendingComboFill>>>>,
    /// Per client order ID, a queue of `(Decimal, Price)` pairs.
    /// NOTE(review): presumably (quantity, average price) snapshots for combo fills — confirm.
    pending_combo_fill_avgs: Arc<Mutex<AHashMap<ClientOrderId, VecDeque<(Decimal, Price)>>>>,
    /// Per client order ID, a `(Decimal, Decimal)` pair.
    /// NOTE(review): presumably (filled, remaining) quantities — confirm.
    order_fill_progress: Arc<Mutex<AHashMap<ClientOrderId, (Decimal, Decimal)>>>,
    /// Set of client order IDs shared with the submit task.
    /// NOTE(review): presumably orders for which an accepted event was already emitted — confirm.
    accepted_orders: Arc<Mutex<ahash::AHashSet<ClientOrderId>>>,
    /// Set of client order IDs (presumably those with a cancel in flight — TODO confirm).
    pending_cancel_orders: Arc<Mutex<ahash::AHashSet<ClientOrderId>>>,
}
188
/// Fill details queued per client order ID in the client's `pending_combo_fills` map.
///
/// NOTE(review): presumably holds combo (spread) fills until they can be emitted as
/// fill events — the consuming code is not in this part of the file; confirm.
#[derive(Clone, Debug)]
struct PendingComboFill {
    /// Account the fill belongs to.
    account_id: AccountId,
    /// Instrument that was filled.
    instrument_id: InstrumentId,
    /// Venue-assigned order ID.
    venue_order_id: VenueOrderId,
    /// Trade ID of the fill.
    trade_id: TradeId,
    /// Side of the order.
    order_side: OrderSide,
    /// Quantity of this fill.
    last_qty: Quantity,
    /// Price of this fill.
    last_px: Price,
    /// Commission for this fill.
    commission: Money,
    /// Liquidity side of this fill.
    liquidity_side: LiquiditySide,
    /// Client order ID of the order.
    client_order_id: ClientOrderId,
    /// Event timestamp.
    ts_event: UnixNanos,
    /// Initialization timestamp.
    ts_init: UnixNanos,
}
204
/// Process-wide slot for the Python callback that receives execution events.
///
/// Lazily initialized to an empty (`None`) slot by `exec_event_callback`; filled by
/// `register_python_event_callback`.
#[cfg(feature = "python")]
static EXEC_EVENT_CALLBACK: std::sync::OnceLock<std::sync::Mutex<Option<Py<PyAny>>>> =
    std::sync::OnceLock::new();
208
#[cfg(feature = "python")]
thread_local! {
    // Per-thread flag that `ensure_python_event_bridge` flips to `true` the first time it
    // installs the execution event bridge, so the same thread never installs it twice.
    static EXEC_EVENT_BRIDGE_INITIALIZED: Cell<bool> = const { Cell::new(false) };
}
213
/// Returns the global Python callback slot, creating it empty on first access.
#[cfg(feature = "python")]
fn exec_event_callback() -> &'static std::sync::Mutex<Option<Py<PyAny>>> {
    // `Mutex<Option<_>>::default()` is a mutex around `None`.
    EXEC_EVENT_CALLBACK.get_or_init(std::sync::Mutex::default)
}
218
/// Converts an optional string-to-string map into `nautilus_core::Params`.
///
/// Every value is wrapped as a JSON string. `None` is passed through as `None`.
#[cfg(feature = "python")]
fn string_hash_map_to_params(
    params: Option<HashMap<String, String>>,
) -> Option<nautilus_core::Params> {
    let entries = params?;
    let converted = entries.into_iter().fold(
        nautilus_core::Params::new(),
        |mut acc, (name, text)| {
            acc.insert(name, serde_json::Value::String(text));
            acc
        },
    );
    Some(converted)
}
231
/// Converts an execution event to a Python object and passes it to `callback`.
///
/// The callback is invoked as `callback(kind, payload)` where `kind` is a string tag
/// naming the payload type. `ExecutionReport::OrderWithFills` reports are not forwarded;
/// the function returns `Ok(())` for them without calling the callback.
///
/// # Errors
///
/// Returns an error if the conversion to a Python object or the callback call fails.
#[cfg(feature = "python")]
fn dispatch_python_exec_event(
    py: Python<'_>,
    callback: &Py<PyAny>,
    event: ExecutionEvent,
) -> PyResult<()> {
    let (tag, object) = match event {
        ExecutionEvent::Order(inner) => ("order_event", order_event_to_pyobject(py, inner)?),
        ExecutionEvent::OrderSubmittedBatch(events) => (
            "order_submitted_batch",
            order_submitted_batch_to_pyobject(py, events)?,
        ),
        ExecutionEvent::OrderAcceptedBatch(events) => (
            "order_accepted_batch",
            order_accepted_batch_to_pyobject(py, events)?,
        ),
        ExecutionEvent::OrderCanceledBatch(events) => (
            "order_canceled_batch",
            order_canceled_batch_to_pyobject(py, events)?,
        ),
        ExecutionEvent::Report(ExecutionReport::Order(inner)) => {
            ("order_report", (*inner).into_py_any(py)?)
        }
        ExecutionEvent::Report(ExecutionReport::Fill(inner)) => {
            ("fill_report", (*inner).into_py_any(py)?)
        }
        ExecutionEvent::Report(ExecutionReport::Position(inner)) => {
            ("position_report", (*inner).into_py_any(py)?)
        }
        ExecutionEvent::Report(ExecutionReport::MassStatus(inner)) => {
            ("mass_status_report", (*inner).into_py_any(py)?)
        }
        // Not forwarded to Python.
        ExecutionEvent::Report(ExecutionReport::OrderWithFills(..)) => return Ok(()),
        ExecutionEvent::Account(state) => ("account_state", state.into_py_any(py)?),
    };

    callback.call1(py, (tag, object)).map(|_returned| ())
}
271
/// Converts a batch of accepted events into a Python list of order event objects.
///
/// # Errors
///
/// Returns the first conversion error encountered.
#[cfg(feature = "python")]
fn order_accepted_batch_to_pyobject(
    py: Python<'_>,
    batch: OrderAcceptedBatch,
) -> PyResult<Py<PyAny>> {
    let mut objects: Vec<Py<PyAny>> = Vec::new();
    for accepted in batch.into_iter() {
        objects.push(order_event_to_pyobject(
            py,
            OrderEventAny::Accepted(accepted),
        )?);
    }
    objects.into_py_any(py)
}
283
/// Converts a batch of submitted events into a Python list of order event objects.
///
/// # Errors
///
/// Returns the first conversion error encountered.
#[cfg(feature = "python")]
fn order_submitted_batch_to_pyobject(
    py: Python<'_>,
    batch: OrderSubmittedBatch,
) -> PyResult<Py<PyAny>> {
    let mut objects: Vec<Py<PyAny>> = Vec::new();
    for submitted in batch.into_iter() {
        objects.push(order_event_to_pyobject(
            py,
            OrderEventAny::Submitted(submitted),
        )?);
    }
    objects.into_py_any(py)
}
295
/// Converts a batch of canceled events into a Python list of order event objects.
///
/// # Errors
///
/// Returns the first conversion error encountered.
#[cfg(feature = "python")]
fn order_canceled_batch_to_pyobject(
    py: Python<'_>,
    batch: OrderCanceledBatch,
) -> PyResult<Py<PyAny>> {
    let mut objects: Vec<Py<PyAny>> = Vec::new();
    for canceled in batch.into_iter() {
        objects.push(order_event_to_pyobject(
            py,
            OrderEventAny::Canceled(canceled),
        )?);
    }
    objects.into_py_any(py)
}
307
308impl Debug for InteractiveBrokersExecutionClient {
309 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
310 f.debug_struct(stringify!(InteractiveBrokersExecutionClient))
311 .field("core", &self.core)
312 .field("config", &self.config)
313 .field("instrument_provider", &self.instrument_provider)
314 .field("is_connected", &self.is_connected.load(Ordering::Relaxed))
315 .field("ib_client", &self.ib_client.is_some())
316 .finish_non_exhaustive()
317 }
318}
319
320impl InteractiveBrokersExecutionClient {
321 pub fn new(
333 mut core: ExecutionClientCore,
334 config: InteractiveBrokersExecClientConfig,
335 instrument_provider: Arc<InteractiveBrokersInstrumentProvider>,
336 ) -> anyhow::Result<Self> {
337 if let Some(account_id) = &config.account_id {
339 core.account_id = AccountId::from(account_id.clone());
340 }
341
342 Ok(Self {
343 core,
344 config,
345 instrument_provider,
346 is_connected: AtomicBool::new(false),
347 ib_client: None,
348 pending_tasks: Mutex::new(Vec::new()),
349 next_order_id: Arc::new(Mutex::new(0)),
350 order_update_handle: Mutex::new(None),
351 order_id_map: Arc::new(Mutex::new(AHashMap::new())),
352 venue_order_id_map: Arc::new(Mutex::new(AHashMap::new())),
353 commission_cache: Arc::new(Mutex::new(AHashMap::new())),
354 instrument_id_map: Arc::new(Mutex::new(AHashMap::new())),
355 trader_id_map: Arc::new(Mutex::new(AHashMap::new())),
356 strategy_id_map: Arc::new(Mutex::new(AHashMap::new())),
357 spread_fill_tracking: Arc::new(Mutex::new(AHashMap::new())),
358 position_tracker: create_position_tracker(),
359 order_avg_prices: Arc::new(Mutex::new(AHashMap::new())),
360 pending_combo_fills: Arc::new(Mutex::new(AHashMap::new())),
361 pending_combo_fill_avgs: Arc::new(Mutex::new(AHashMap::new())),
362 order_fill_progress: Arc::new(Mutex::new(AHashMap::new())),
363 accepted_orders: Arc::new(Mutex::new(ahash::AHashSet::new())),
364 pending_cancel_orders: Arc::new(Mutex::new(ahash::AHashSet::new())),
365 })
366 }
367
368 #[cfg(feature = "python")]
369 pub(crate) fn new_for_python(
370 mut config: InteractiveBrokersExecClientConfig,
371 instrument_provider: crate::providers::instruments::InteractiveBrokersInstrumentProvider,
372 ) -> anyhow::Result<Self> {
373 Self::ensure_python_event_bridge();
374
375 let account_id_value = config
376 .account_id
377 .clone()
378 .unwrap_or_else(|| "UNKNOWN".to_string());
379 let normalized_account_id = if account_id_value.starts_with("IB-") {
380 account_id_value
381 } else {
382 format!("IB-{account_id_value}")
383 };
384
385 config.account_id = Some(normalized_account_id.clone());
386
387 let core = ExecutionClientCore::new(
388 TraderId::from("TRADER-001"),
389 ClientId::from("IB"),
390 *IB_VENUE,
391 OmsType::Netting,
392 AccountId::from(normalized_account_id),
393 AccountType::Margin,
394 None,
395 Rc::new(RefCell::new(Cache::default())),
396 );
397
398 Self::new(core, config, Arc::new(instrument_provider))
399 }
400
401 #[cfg(feature = "python")]
402 pub(crate) fn register_python_event_callback(&self, callback: Py<PyAny>) {
403 *exec_event_callback()
404 .lock()
405 .expect("execution event callback mutex poisoned") = Some(callback);
406 }
407
408 #[cfg(feature = "python")]
409 fn ensure_python_event_bridge() {
410 if nautilus_common::live::runner::try_get_exec_event_sender().is_some() {
411 return;
412 }
413
414 EXEC_EVENT_BRIDGE_INITIALIZED.with(|initialized| {
415 if initialized.replace(true) {
416 return;
417 }
418
419 let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
420 nautilus_common::live::runner::set_exec_event_sender(sender);
421
422 get_runtime().spawn(async move {
423 while let Some(event) = receiver.recv().await {
424 Python::attach(|py| {
425 let callback_guard = exec_event_callback()
426 .lock()
427 .expect("execution event callback mutex poisoned");
428
429 let Some(callback) = callback_guard.as_ref() else {
430 return;
431 };
432
433 if let Err(e) = dispatch_python_exec_event(py, callback, event) {
434 tracing::error!("Failed to dispatch IB execution event to Python: {e}");
435 }
436 });
437 }
438 });
439 });
440 }
441
442 #[cfg(feature = "python")]
443 #[allow(clippy::needless_pass_by_value)]
444 pub(crate) fn submit_order_for_python(
445 &self,
446 trader_id: TraderId,
447 order: OrderAny,
448 instrument_id: InstrumentId,
449 strategy_id: StrategyId,
450 exec_algorithm_id: Option<ExecAlgorithmId>,
451 position_id: Option<PositionId>,
452 params: Option<HashMap<String, String>>,
453 ) -> anyhow::Result<()> {
454 self.cache_order_for_python(order.clone(), position_id)?;
455
456 let cmd = SubmitOrder {
457 trader_id,
458 client_id: Some(self.client_id()),
459 strategy_id,
460 instrument_id,
461 client_order_id: order.client_order_id(),
462 order_init: order.init_event().clone(),
463 exec_algorithm_id,
464 position_id,
465 params: string_hash_map_to_params(params),
466 command_id: UUID4::new(),
467 ts_init: get_atomic_clock_realtime().get_time_ns(),
468 };
469
470 ExecutionClient::submit_order(self, cmd)
471 }
472
473 #[cfg(feature = "python")]
474 #[allow(clippy::needless_pass_by_value)]
475 pub(crate) fn submit_order_list_for_python(
476 &self,
477 trader_id: TraderId,
478 strategy_id: StrategyId,
479 orders: Vec<OrderAny>,
480 exec_algorithm_id: Option<ExecAlgorithmId>,
481 position_id: Option<PositionId>,
482 params: Option<HashMap<String, String>>,
483 ) -> anyhow::Result<()> {
484 if orders.is_empty() {
485 anyhow::bail!("Order list cannot be empty");
486 }
487
488 for order in &orders {
489 self.cache_order_for_python(order.clone(), position_id)?;
490 }
491
492 let ts_init = get_atomic_clock_realtime().get_time_ns();
493 let instrument_id = orders[0].instrument_id();
494 let client_order_ids: Vec<ClientOrderId> =
495 orders.iter().map(|o| o.client_order_id()).collect();
496 let order_list_id = OrderListId::from(UUID4::new().to_string());
497 let order_list = OrderList::new(
498 order_list_id,
499 instrument_id,
500 strategy_id,
501 client_order_ids,
502 ts_init,
503 );
504 let order_inits: Vec<OrderInitialized> =
505 orders.iter().map(|o| o.init_event().clone()).collect();
506
507 let cmd = SubmitOrderList::new(
508 trader_id,
509 Some(self.client_id()),
510 strategy_id,
511 order_list,
512 order_inits,
513 exec_algorithm_id,
514 position_id,
515 string_hash_map_to_params(params),
516 UUID4::new(),
517 ts_init,
518 );
519
520 ExecutionClient::submit_order_list(self, cmd)
521 }
522
523 #[cfg(feature = "python")]
524 pub(crate) fn modify_order_for_python(
525 &self,
526 trader_id: TraderId,
527 strategy_id: StrategyId,
528 client_order_id: ClientOrderId,
529 venue_order_id: Option<VenueOrderId>,
530 instrument_id: InstrumentId,
531 quantity: Option<Quantity>,
532 price: Option<Price>,
533 trigger_price: Option<Price>,
534 params: Option<HashMap<String, String>>,
535 ) -> anyhow::Result<()> {
536 let cmd = ModifyOrder {
537 trader_id,
538 client_id: Some(self.client_id()),
539 strategy_id,
540 instrument_id,
541 client_order_id,
542 venue_order_id,
543 quantity,
544 price,
545 trigger_price,
546 params: string_hash_map_to_params(params),
547 command_id: UUID4::new(),
548 ts_init: get_atomic_clock_realtime().get_time_ns(),
549 };
550
551 ExecutionClient::modify_order(self, cmd)
552 }
553
554 #[cfg(feature = "python")]
555 pub(crate) fn cancel_order_for_python(
556 &self,
557 trader_id: TraderId,
558 strategy_id: StrategyId,
559 client_order_id: ClientOrderId,
560 venue_order_id: Option<VenueOrderId>,
561 instrument_id: InstrumentId,
562 params: Option<HashMap<String, String>>,
563 ) -> anyhow::Result<()> {
564 let cmd = CancelOrder {
565 trader_id,
566 client_id: Some(self.client_id()),
567 strategy_id,
568 instrument_id,
569 client_order_id,
570 venue_order_id,
571 params: string_hash_map_to_params(params),
572 command_id: UUID4::new(),
573 ts_init: get_atomic_clock_realtime().get_time_ns(),
574 };
575
576 ExecutionClient::cancel_order(self, cmd)
577 }
578
579 #[cfg(feature = "python")]
580 pub(crate) fn cancel_all_orders_for_python(
581 &self,
582 trader_id: TraderId,
583 strategy_id: StrategyId,
584 instrument_id: InstrumentId,
585 order_side: OrderSide,
586 params: Option<HashMap<String, String>>,
587 ) -> anyhow::Result<()> {
588 let cmd = CancelAllOrders {
589 trader_id,
590 client_id: Some(self.client_id()),
591 strategy_id,
592 instrument_id,
593 order_side,
594 command_id: UUID4::new(),
595 ts_init: get_atomic_clock_realtime().get_time_ns(),
596 params: string_hash_map_to_params(params),
597 };
598
599 ExecutionClient::cancel_all_orders(self, cmd)
600 }
601
602 #[cfg(feature = "python")]
603 pub(crate) fn batch_cancel_orders_for_python(
604 &self,
605 trader_id: TraderId,
606 strategy_id: StrategyId,
607 instrument_id: InstrumentId,
608 client_order_ids: Vec<ClientOrderId>,
609 params: Option<HashMap<String, String>>,
610 ) -> anyhow::Result<()> {
611 let ts_init = get_atomic_clock_realtime().get_time_ns();
612 let cancels = client_order_ids
613 .into_iter()
614 .map(|client_order_id| CancelOrder {
615 trader_id,
616 client_id: Some(self.client_id()),
617 strategy_id,
618 instrument_id,
619 client_order_id,
620 venue_order_id: None,
621 command_id: UUID4::new(),
622 ts_init,
623 params: None,
624 })
625 .collect();
626
627 let cmd = BatchCancelOrders {
628 trader_id,
629 client_id: Some(self.client_id()),
630 strategy_id,
631 instrument_id,
632 cancels,
633 command_id: UUID4::new(),
634 ts_init,
635 params: string_hash_map_to_params(params),
636 };
637
638 ExecutionClient::batch_cancel_orders(self, cmd)
639 }
640
641 #[cfg(feature = "python")]
642 pub(crate) fn query_account_for_python(&self, trader_id: TraderId) -> anyhow::Result<()> {
643 let cmd = QueryAccount {
644 trader_id,
645 client_id: Some(self.client_id()),
646 account_id: ExecutionClient::account_id(self),
647 command_id: UUID4::new(),
648 ts_init: get_atomic_clock_realtime().get_time_ns(),
649 params: None,
650 };
651
652 ExecutionClient::query_account(self, cmd)
653 }
654
655 #[cfg(feature = "python")]
656 pub(crate) fn query_order_for_python(
657 &self,
658 trader_id: TraderId,
659 strategy_id: StrategyId,
660 instrument_id: InstrumentId,
661 client_order_id: ClientOrderId,
662 venue_order_id: Option<VenueOrderId>,
663 ) -> anyhow::Result<()> {
664 let cmd = QueryOrder {
665 trader_id,
666 client_id: Some(self.client_id()),
667 strategy_id,
668 instrument_id,
669 client_order_id,
670 venue_order_id,
671 command_id: UUID4::new(),
672 ts_init: get_atomic_clock_realtime().get_time_ns(),
673 params: None,
674 };
675
676 ExecutionClient::query_order(self, cmd)
677 }
678
679 #[cfg(feature = "python")]
680 pub(crate) async fn generate_order_status_report_for_python(
681 &self,
682 instrument_id: Option<InstrumentId>,
683 client_order_id: Option<ClientOrderId>,
684 venue_order_id: Option<VenueOrderId>,
685 ) -> anyhow::Result<Option<OrderStatusReport>> {
686 let cmd = GenerateOrderStatusReport {
687 command_id: UUID4::new(),
688 ts_init: get_atomic_clock_realtime().get_time_ns(),
689 instrument_id,
690 client_order_id,
691 venue_order_id,
692 params: None,
693 correlation_id: None,
694 };
695
696 self.generate_order_status_report(&cmd).await
697 }
698
699 #[cfg(feature = "python")]
700 pub(crate) async fn generate_order_status_reports_for_python(
701 &self,
702 open_only: bool,
703 instrument_id: Option<InstrumentId>,
704 start: Option<u64>,
705 end: Option<u64>,
706 ) -> anyhow::Result<Vec<OrderStatusReport>> {
707 let start_ns = start.map(nautilus_core::UnixNanos::from);
708 let end_ns = end.map(nautilus_core::UnixNanos::from);
709
710 let cmd = GenerateOrderStatusReports {
711 command_id: UUID4::new(),
712 ts_init: get_atomic_clock_realtime().get_time_ns(),
713 open_only,
714 instrument_id,
715 start: start_ns,
716 end: end_ns,
717 params: None,
718 log_receipt_level: LogLevel::Info,
719 correlation_id: None,
720 };
721
722 let mut reports = self.generate_order_status_reports(&cmd).await?;
723
724 if open_only {
725 use nautilus_model::enums::OrderStatus;
726 reports.retain(|report| {
727 matches!(
728 report.order_status,
729 OrderStatus::Initialized
730 | OrderStatus::Submitted
731 | OrderStatus::Accepted
732 | OrderStatus::Triggered
733 | OrderStatus::PendingUpdate
734 | OrderStatus::PendingCancel
735 )
736 });
737 }
738
739 if start_ns.is_some() || end_ns.is_some() {
740 reports.retain(|report| {
741 let ts = report.ts_last;
742
743 if let Some(start) = start_ns
744 && ts < start
745 {
746 return false;
747 }
748
749 if let Some(end) = end_ns
750 && ts > end
751 {
752 return false;
753 }
754
755 true
756 });
757 }
758
759 Ok(reports)
760 }
761
762 #[cfg(feature = "python")]
763 pub(crate) async fn generate_fill_reports_for_python(
764 &self,
765 instrument_id: Option<InstrumentId>,
766 venue_order_id: Option<VenueOrderId>,
767 start: Option<u64>,
768 end: Option<u64>,
769 ) -> anyhow::Result<Vec<FillReport>> {
770 let cmd = GenerateFillReports {
771 command_id: UUID4::new(),
772 ts_init: get_atomic_clock_realtime().get_time_ns(),
773 instrument_id,
774 venue_order_id,
775 start: start.map(nautilus_core::UnixNanos::from),
776 end: end.map(nautilus_core::UnixNanos::from),
777 params: None,
778 log_receipt_level: LogLevel::Info,
779 correlation_id: None,
780 };
781
782 self.generate_fill_reports(cmd).await
783 }
784
785 #[cfg(feature = "python")]
786 pub(crate) async fn generate_position_status_reports_for_python(
787 &self,
788 instrument_id: Option<InstrumentId>,
789 start: Option<u64>,
790 end: Option<u64>,
791 ) -> anyhow::Result<Vec<PositionStatusReport>> {
792 let cmd = GeneratePositionStatusReports {
793 command_id: UUID4::new(),
794 ts_init: get_atomic_clock_realtime().get_time_ns(),
795 instrument_id,
796 start: start.map(nautilus_core::UnixNanos::from),
797 end: end.map(nautilus_core::UnixNanos::from),
798 params: None,
799 log_receipt_level: LogLevel::Info,
800 correlation_id: None,
801 };
802
803 self.generate_position_status_reports(&cmd).await
804 }
805
806 #[cfg(feature = "python")]
807 pub(crate) fn cache_order_for_python(
808 &self,
809 order: OrderAny,
810 position_id: Option<PositionId>,
811 ) -> anyhow::Result<()> {
812 self.core
813 .cache_mut()
814 .add_order(order, position_id, Some(self.client_id()), true)
815 }
816
817 fn reserve_next_local_order_id(next_order_id: &Arc<Mutex<i32>>) -> anyhow::Result<i32> {
818 let mut guard = next_order_id
819 .lock()
820 .map_err(|_| anyhow::anyhow!("Failed to lock next order ID"))?;
821 anyhow::ensure!(
822 *guard > 0,
823 "No valid Interactive Brokers order ID available"
824 );
825 let order_id = *guard;
826 *guard += 1;
827 Ok(order_id)
828 }
829
830 async fn get_next_order_id(&self) -> anyhow::Result<i32> {
836 let client = self.ib_client.as_ref().context("IB client not connected")?;
837
838 let timeout_dur = Duration::from_secs(self.config.request_timeout);
839 let order_id = tokio::time::timeout(timeout_dur, client.next_valid_order_id())
840 .await
841 .context("Timeout getting next order ID")??;
842 Ok(order_id)
843 }
844
845 async fn get_highest_open_order_id(&self, client: &Client) -> anyhow::Result<Option<i32>> {
846 let timeout_dur = Duration::from_secs(self.config.request_timeout);
847 let mut subscription = tokio::time::timeout(timeout_dur, client.all_open_orders())
848 .await
849 .context("Timeout requesting open orders for next order ID initialization")??;
850 let mut highest_order_id = None;
851
852 while let Some(order_result) = subscription.next().await {
853 match order_result {
854 Ok(Orders::OrderData(data)) => {
855 highest_order_id = Some(
856 highest_order_id
857 .map_or(data.order_id, |current: i32| current.max(data.order_id)),
858 );
859 }
860 Ok(_) => {}
861 Err(e) => {
862 tracing::debug!(
863 "Ignoring open-order event while initializing next order ID: {e}"
864 );
865 }
866 }
867 }
868
869 Ok(highest_order_id)
870 }
871
872 fn abort_pending_tasks(&self) {
874 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
875 for task in tasks.drain(..) {
876 task.abort();
877 }
878
879 if let Some(handle) = self
880 .order_update_handle
881 .lock()
882 .expect(MUTEX_POISONED)
883 .take()
884 {
885 handle.abort();
886 }
887 }
888}
889
890#[async_trait::async_trait(?Send)]
892impl ExecutionClient for InteractiveBrokersExecutionClient {
    /// Returns the current value of the connection flag (relaxed atomic load).
    fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::Relaxed)
    }
896
    /// Returns the client ID stored on the core.
    fn client_id(&self) -> ClientId {
        self.core.client_id
    }
900
    /// Returns the account ID stored on the core.
    fn account_id(&self) -> AccountId {
        self.core.account_id
    }
904
    /// Returns the venue stored on the core.
    fn venue(&self) -> Venue {
        self.core.venue
    }
908
    /// Returns the OMS type stored on the core.
    fn oms_type(&self) -> OmsType {
        self.core.oms_type
    }
912
913 fn get_account(&self) -> Option<AccountAny> {
914 self.core.cache().account(&self.core.account_id).cloned()
915 }
916
917 fn generate_account_state(
918 &self,
919 balances: Vec<AccountBalance>,
920 margins: Vec<MarginBalance>,
921 reported: bool,
922 ts_event: UnixNanos,
923 ) -> anyhow::Result<()> {
924 let factory = OrderEventFactory::new(
925 self.core.trader_id,
926 self.core.account_id,
927 self.core.account_type,
928 self.core.base_currency,
929 );
930 let state = factory.generate_account_state(
931 balances,
932 margins,
933 reported,
934 ts_event,
935 get_atomic_clock_realtime().get_time_ns(),
936 );
937 get_exec_event_sender()
938 .send(ExecutionEvent::Account(state))
939 .map_err(|e| anyhow::anyhow!("Failed to send account state: {e}"))
940 }
941
    /// No-op: performs no work and always returns `Ok(())`.
    fn start(&mut self) -> anyhow::Result<()> {
        Ok(())
    }
946
    /// Aborts all pending tasks (see `abort_pending_tasks`) and returns `Ok(())`.
    fn stop(&mut self) -> anyhow::Result<()> {
        self.abort_pending_tasks();
        Ok(())
    }
951
952 fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
953 let client = self.ib_client.as_ref().context("IB client not connected")?;
954
955 let order_id_map = Arc::clone(&self.order_id_map);
956 let venue_order_id_map = Arc::clone(&self.venue_order_id_map);
957 let instrument_id_map = Arc::clone(&self.instrument_id_map);
958 let trader_id_map = Arc::clone(&self.trader_id_map);
959 let strategy_id_map = Arc::clone(&self.strategy_id_map);
960 let next_order_id = Arc::clone(&self.next_order_id);
961 let instrument_provider = Arc::clone(&self.instrument_provider);
962 let exec_sender = get_exec_event_sender();
963 let clock = get_atomic_clock_realtime();
964 let accepted_orders = Arc::clone(&self.accepted_orders);
965
966 let client_clone = client.as_arc().clone();
967
968 let account_id = self.core.account_id;
969
970 let handle = get_runtime().spawn(async move {
971 if let Err(e) = Self::handle_submit_order_async(
972 &cmd,
973 &client_clone,
974 &order_id_map,
975 &venue_order_id_map,
976 &instrument_id_map,
977 &trader_id_map,
978 &strategy_id_map,
979 &next_order_id,
980 &instrument_provider,
981 &exec_sender,
982 clock,
983 account_id,
984 &accepted_orders,
985 )
986 .await
987 {
988 tracing::error!("Error submitting order: {e}");
989 }
990 });
991
992 self.pending_tasks
993 .lock()
994 .map_err(|_| anyhow::anyhow!("Failed to lock pending tasks"))?
995 .push(handle);
996
997 Ok(())
998 }
999
    /// Connects the execution client to IB Gateway/TWS and performs startup
    /// initialization.
    ///
    /// Returns immediately when already connected. Otherwise, in order:
    /// 1. obtains the shared IB client handle,
    /// 2. initializes the instrument provider and loads instruments (failures are only
    ///    logged as warnings),
    /// 3. preloads cached spread instruments,
    /// 4. seeds the local order ID counter from the next valid ID and the highest open
    ///    order ID,
    /// 5. starts the order update stream,
    /// 6. subscribes to the account summary, initializes position tracking, subscribes
    ///    to PnL and, when configured, to position updates (failures are only logged as
    ///    warnings),
    /// 7. marks the client and core as connected.
    ///
    /// # Errors
    ///
    /// Returns an error if obtaining the IB client handle, preloading spread instruments,
    /// fetching the order IDs, locking the order ID counter, or starting order updates
    /// fails. The IB client handle stays set if a later step fails.
    async fn connect(&mut self) -> anyhow::Result<()> {
        if self.is_connected.load(Ordering::Relaxed) {
            log::debug!("Interactive Brokers execution client already connected");
            return Ok(());
        }

        tracing::info!("Connecting Interactive Brokers execution client...");
        log::debug!(
            "Execution client config host={} port={} client_id={} account_id={:?} request_timeout={} connection_timeout={} fetch_all_open_orders={} track_option_exercise_from_position_update={}",
            self.config.host,
            self.config.port,
            self.config.client_id,
            self.config.account_id,
            self.config.request_timeout,
            self.config.connection_timeout,
            self.config.fetch_all_open_orders,
            self.config.track_option_exercise_from_position_update
        );

        // Obtain the shared client handle for this host/port/client ID.
        let handle = crate::common::shared_client::get_or_connect(
            &self.config.host,
            self.config.port,
            self.config.client_id,
            self.config.connection_timeout,
        )
        .await
        .context("Failed to connect to IB Gateway/TWS")?;

        tracing::info!(
            "Connected to IB Gateway/TWS at {}:{} (client_id: {})",
            self.config.host,
            self.config.port,
            self.config.client_id
        );

        self.ib_client = Some(handle);

        log::debug!("Initializing IB execution instrument provider");
        // Best effort: a failure is logged and the connection continues.
        if let Err(e) = self.instrument_provider.initialize().await {
            tracing::warn!("Failed to initialize instrument provider: {}", e);
        }

        log::debug!("Loading configured IB execution instruments");

        // Best effort: a failure is logged and the connection continues.
        // The `unwrap` cannot fail: `ib_client` was set to `Some` just above.
        if let Err(e) = self
            .instrument_provider
            .load_all_async(
                self.ib_client.as_ref().unwrap().as_arc().as_ref(),
                None,
                None,
                false,
            )
            .await
        {
            tracing::warn!("Failed to load instruments on startup: {}", e);
        }

        let client = self.ib_client.as_ref().unwrap().as_arc();
        log::debug!("Preloading cached spread instruments for execution client");
        // Unlike the two steps above, a failure here aborts the connect.
        self.preload_cached_spread_instruments(client.as_ref())
            .await?;

        log::debug!("Requesting next valid IB order ID");
        let next_id = self.get_next_order_id().await?;
        log::debug!("Requesting highest open IB order ID");
        let highest_open_order_id = self.get_highest_open_order_id(client.as_ref()).await?;
        // Start above both the reported next ID and any existing open order ID.
        let starting_order_id = highest_open_order_id
            .map(|order_id| next_id.max(order_id.saturating_add(1)))
            .unwrap_or(next_id);
        if starting_order_id != next_id {
            tracing::info!(
                "Adjusted next Interactive Brokers order ID from {} to {} based on existing open orders",
                next_id,
                starting_order_id
            );
        } else {
            tracing::info!(
                "Initialized next Interactive Brokers order ID to {}",
                starting_order_id
            );
        }
        {
            // Scoped so the guard is released before the next await.
            let mut id = self
                .next_order_id
                .lock()
                .map_err(|_| anyhow::anyhow!("Failed to lock next order ID"))?;
            *id = starting_order_id;
        }

        log::debug!("Starting IB order update stream");
        self.start_order_updates().await?;

        let client_for_account = Arc::clone(client);
        let account_id = self.core.account_id;
        // NOTE(review): this clone is not used anywhere below — confirm it can be removed.
        let _exec_client_core = self.core.clone();
        log::debug!("Subscribing to IB account summary for {}", account_id);
        match crate::execution::account::subscribe_account_summary(&client_for_account, account_id)
            .await
        {
            Ok((balances, margins)) => {
                tracing::info!(
                    "Received account summary: {} balances, {} margins",
                    balances.len(),
                    margins.len()
                );
                let ts_event = get_atomic_clock_realtime().get_time_ns();

                // Publish the received balances and margins as a reported account state.
                if let Err(e) = ExecutionClient::generate_account_state(
                    self, balances, margins, true, ts_event,
                ) {
                    tracing::warn!("Failed to generate account state: {}", e);
                }
            }
            Err(e) => {
                tracing::warn!("Failed to subscribe to account summary: {}", e);
            }
        }

        let client_for_positions_init = Arc::clone(client);
        let position_tracker_init = Arc::clone(&self.position_tracker);

        log::debug!("Initializing IB execution position tracking");
        // Best effort: a failure is logged and the connection continues.
        if let Err(e) = crate::execution::account::initialize_position_tracking(
            &client_for_positions_init,
            self.core.account_id,
            position_tracker_init,
        )
        .await
        {
            tracing::warn!("Failed to initialize position tracking: {}", e);
        }

        let client_for_pnl = Arc::clone(client);
        log::debug!("Subscribing to IB PnL updates");

        // Best effort: a failure is logged and the connection continues.
        if let Err(e) =
            crate::execution::account::subscribe_pnl(&client_for_pnl, self.core.account_id).await
        {
            tracing::warn!("Failed to subscribe to PnL: {}", e);
        }

        // Position update subscription is opt-in via configuration.
        if self.config.track_option_exercise_from_position_update {
            let client_for_positions = Arc::clone(client);
            let position_tracker_clone = Arc::clone(&self.position_tracker);
            let instrument_provider_clone = Arc::clone(&self.instrument_provider);

            log::debug!("Subscribing to IB position updates for option exercise tracking");

            if let Err(e) = crate::execution::account::subscribe_positions(
                &client_for_positions,
                self.core.account_id,
                position_tracker_clone,
                instrument_provider_clone,
            )
            .await
            {
                tracing::warn!("Failed to subscribe to positions: {}", e);
            }
        }

        self.is_connected.store(true, Ordering::Relaxed);
        self.core.set_connected();

        tracing::info!("Connected Interactive Brokers execution client");
        Ok(())
    }
1179
1180 async fn disconnect(&mut self) -> anyhow::Result<()> {
1181 if !self.is_connected.load(Ordering::Relaxed) {
1182 log::debug!("Interactive Brokers execution client already disconnected");
1183 return Ok(());
1184 }
1185
1186 tracing::info!("Disconnecting Interactive Brokers execution client...");
1187
1188 self.abort_pending_tasks();
1190
1191 if self.ib_client.is_some() {
1195 tracing::debug!("Dropping IB client connection");
1196 }
1197
1198 self.ib_client = None;
1199 self.is_connected.store(false, Ordering::Relaxed);
1200 self.core.set_disconnected();
1201
1202 tracing::info!("Disconnected Interactive Brokers execution client");
1203 Ok(())
1204 }
1205
1206 async fn generate_order_status_report(
1207 &self,
1208 cmd: &GenerateOrderStatusReport,
1209 ) -> anyhow::Result<Option<OrderStatusReport>> {
1210 let plural_cmd = GenerateOrderStatusReports {
1211 command_id: cmd.command_id,
1212 ts_init: cmd.ts_init,
1213 open_only: false,
1214 instrument_id: cmd.instrument_id,
1215 start: None,
1216 end: None,
1217 params: cmd.params.clone(),
1218 log_receipt_level: LogLevel::Info,
1219 correlation_id: cmd.correlation_id,
1220 };
1221
1222 let reports = self.generate_order_status_reports(&plural_cmd).await?;
1223
1224 let report = reports.into_iter().find(|r| {
1226 let matches_client = if let Some(filter_client_id) = cmd.client_order_id {
1227 r.client_order_id == Some(filter_client_id)
1228 } else {
1229 true
1230 };
1231 let matches_venue = if let Some(filter_venue_id) = cmd.venue_order_id {
1232 r.venue_order_id == filter_venue_id
1233 } else {
1234 true
1235 };
1236 matches_client && matches_venue
1237 });
1238
1239 Ok(report)
1240 }
1241
1242 async fn generate_order_status_reports(
1243 &self,
1244 cmd: &GenerateOrderStatusReports,
1245 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1246 let client = self.ib_client.as_ref().context("IB client not connected")?;
1247
1248 let timeout_dur = Duration::from_secs(self.config.request_timeout);
1249 let mut subscription = tokio::time::timeout(timeout_dur, client.all_open_orders())
1250 .await
1251 .context("Timeout requesting open orders")??;
1252 let mut reports = Vec::new();
1253 let ts_init = get_atomic_clock_realtime().get_time_ns();
1254
1255 while let Some(order_result) = subscription.next().await {
1256 match order_result {
1257 Ok(Orders::OrderData(data)) => {
1258 let instrument_id = ib_contract_to_instrument_id_simple(&data.contract)
1260 .context("Failed to convert contract to instrument ID")?;
1261
1262 if let Some(filter_id) = cmd.instrument_id {
1264 if instrument_id != filter_id {
1265 continue;
1266 }
1267 }
1268
1269 match parse_order_status_to_report(
1272 &IBOrderStatus {
1273 order_id: data.order_id,
1274 status: data.order_state.status.clone(),
1275 filled: 0.0, remaining: 0.0, average_fill_price: 0.0, perm_id: data.order.perm_id,
1279 parent_id: 0, last_fill_price: 0.0, client_id: data.order.client_id,
1282 why_held: String::new(), market_cap_price: 0.0, },
1285 Some(&data.order),
1286 instrument_id,
1287 self.core.account_id,
1288 &self.instrument_provider,
1289 ts_init,
1290 ) {
1291 Ok(report) => reports.push(report),
1292 Err(e) => {
1293 tracing::warn!("Failed to parse order status report: {e}");
1294 }
1295 }
1296 }
1297 Ok(_) => {
1298 }
1300 Err(e) => {
1301 tracing::warn!("Error receiving order data: {e}");
1302 }
1303 }
1304 }
1305
1306 Ok(reports)
1307 }
1308
1309 async fn generate_fill_reports(
1310 &self,
1311 cmd: GenerateFillReports,
1312 ) -> anyhow::Result<Vec<FillReport>> {
1313 let client = self.ib_client.as_ref().context("IB client not connected")?;
1314
1315 let account_code = self.core.account_id.to_string();
1317
1318 let time_filter = if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
1320 let start_dt = start.to_datetime_utc();
1323 let end_dt = end.to_datetime_utc();
1324 format!("{} {}", start_dt.format("%Y%m%d"), end_dt.format("%Y%m%d"))
1325 } else {
1326 String::new()
1327 };
1328
1329 let filter = ExecutionFilter {
1330 client_id: None,
1331 account_code,
1332 time: time_filter,
1333 symbol: String::new(),
1334 security_type: String::new(),
1335 exchange: String::new(),
1336 side: String::new(),
1337 last_n_days: 0,
1338 specific_dates: Vec::new(),
1339 };
1340
1341 let timeout_dur = Duration::from_secs(self.config.request_timeout);
1342 let mut subscription = tokio::time::timeout(timeout_dur, client.executions(filter))
1343 .await
1344 .context("Timeout requesting executions")??;
1345 let mut reports = Vec::new();
1346 let ts_init = get_atomic_clock_realtime().get_time_ns();
1347 let mut current_exec_data: Option<ExecutionData> = None;
1348
1349 while let Some(exec_result) = subscription.next().await {
1350 match exec_result {
1351 Ok(Executions::ExecutionData(exec_data)) => {
1352 current_exec_data = Some(exec_data);
1353 }
1354 Ok(Executions::CommissionReport(commission)) => {
1355 if let Some(exec_data) = current_exec_data.take() {
1356 let instrument_id =
1358 ib_contract_to_instrument_id_simple(&exec_data.contract)
1359 .context("Failed to convert contract to instrument ID")?;
1360
1361 if let Some(filter_id) = cmd.instrument_id
1363 && instrument_id != filter_id
1364 {
1365 continue;
1366 }
1367
1368 match parse_execution_to_fill_report(
1370 &exec_data.execution,
1371 &exec_data.contract,
1372 commission.commission,
1373 &commission.currency,
1374 instrument_id,
1375 self.core.account_id,
1376 &self.instrument_provider,
1377 ts_init,
1378 None, ) {
1380 Ok(report) => reports.push(report),
1381 Err(e) => {
1382 tracing::warn!("Failed to parse fill report: {e}");
1383 }
1384 }
1385 }
1386 }
1387 Ok(_) => {
1388 }
1390 Err(e) => {
1391 tracing::warn!("Error receiving execution data: {e}");
1392 }
1393 }
1394 }
1395
1396 Ok(reports)
1397 }
1398
1399 async fn generate_position_status_reports(
1400 &self,
1401 cmd: &GeneratePositionStatusReports,
1402 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1403 let client = self.ib_client.as_ref().context("IB client not connected")?;
1404
1405 let timeout_dur = Duration::from_secs(self.config.request_timeout);
1406 let mut subscription = tokio::time::timeout(timeout_dur, client.positions())
1407 .await
1408 .context("Timeout requesting positions")??;
1409 let mut reports = Vec::new();
1410 let ts_init = get_atomic_clock_realtime().get_time_ns();
1411
1412 while let Some(position_result) = subscription.next().await {
1415 match position_result {
1416 Ok(PositionUpdate::Position(position)) => {
1417 if position.account != self.core.account_id.to_string() {
1419 continue;
1420 }
1421
1422 let instrument_id = ib_contract_to_instrument_id_simple(&position.contract)
1424 .context("Failed to convert contract to instrument ID")?;
1425
1426 if let Some(filter_id) = cmd.instrument_id
1428 && instrument_id != filter_id
1429 {
1430 continue;
1431 }
1432
1433 let instrument = self
1435 .instrument_provider
1436 .find(&instrument_id)
1437 .context("Instrument not found")?;
1438
1439 let position_side = if position.position == 0.0 {
1441 PositionSideSpecified::Flat
1442 } else if position.position > 0.0 {
1443 PositionSideSpecified::Long
1444 } else {
1445 PositionSideSpecified::Short
1446 };
1447
1448 let quantity =
1449 Quantity::new(position.position.abs(), instrument.size_precision());
1450
1451 let avg_px_open = if position.average_cost > 0.0 {
1454 let price_magnifier =
1455 self.instrument_provider.get_price_magnifier(&instrument_id) as f64;
1456 let multiplier = instrument.multiplier().as_f64();
1457 let converted_avg_cost =
1458 position.average_cost / (multiplier * price_magnifier);
1459 let price_precision = instrument.price_precision();
1460 Some(
1461 rust_decimal::Decimal::from_f64_retain(converted_avg_cost)
1462 .and_then(|d| {
1463 let rounded = d.round_dp(price_precision as u32);
1465 Some(rounded)
1466 })
1467 .unwrap_or_default(),
1468 )
1469 } else {
1470 None
1471 };
1472
1473 let report = PositionStatusReport::new(
1474 self.core.account_id,
1475 instrument_id,
1476 position_side,
1477 quantity,
1478 ts_init, ts_init, None, None, avg_px_open,
1483 );
1484
1485 reports.push(report);
1486 }
1487 Ok(PositionUpdate::PositionEnd) => {
1488 break;
1490 }
1491 Err(e) => {
1492 tracing::warn!("Error receiving position data: {e}");
1493 }
1494 }
1495 }
1496
1497 Ok(reports)
1498 }
1499
1500 async fn generate_mass_status(
1501 &self,
1502 lookback_mins: Option<u64>,
1503 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1504 let ts_now = get_atomic_clock_realtime().get_time_ns();
1505 let start = lookback_mins.map(|mins| {
1506 let lookback_ns = mins * 60 * 1_000_000_000;
1507 UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
1508 });
1509
1510 let order_cmd = GenerateOrderStatusReportsBuilder::default()
1511 .ts_init(ts_now)
1512 .open_only(false)
1513 .start(start)
1514 .build()
1515 .map_err(|e| anyhow::anyhow!("{e}"))?;
1516
1517 let fill_cmd = GenerateFillReportsBuilder::default()
1518 .ts_init(ts_now)
1519 .start(start)
1520 .build()
1521 .map_err(|e| anyhow::anyhow!("{e}"))?;
1522
1523 let position_cmd = GeneratePositionStatusReportsBuilder::default()
1524 .ts_init(ts_now)
1525 .start(start)
1526 .build()
1527 .map_err(|e| anyhow::anyhow!("{e}"))?;
1528
1529 let (order_reports, fill_reports, position_reports) = tokio::try_join!(
1530 self.generate_order_status_reports(&order_cmd),
1531 self.generate_fill_reports(fill_cmd),
1532 self.generate_position_status_reports(&position_cmd),
1533 )?;
1534
1535 tracing::info!(
1536 "generate_mass_status: {} order reports, {} fill reports, {} position reports",
1537 order_reports.len(),
1538 fill_reports.len(),
1539 position_reports.len()
1540 );
1541
1542 let mut mass_status = ExecutionMassStatus::new(
1543 self.core.client_id,
1544 self.core.account_id,
1545 self.core.venue,
1546 ts_now,
1547 Some(UUID4::new()),
1548 );
1549
1550 mass_status.add_order_reports(order_reports);
1551 mass_status.add_fill_reports(fill_reports);
1552 mass_status.add_position_reports(position_reports);
1553
1554 Ok(Some(mass_status))
1555 }
1556
1557 fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
1558 let client = self.ib_client.as_ref().context("IB client not connected")?;
1559
1560 let client_clone = client.as_arc().clone();
1561 let account_id = self.core.account_id;
1562 let account_type = self.core.account_type;
1563 let base_currency = self.core.base_currency;
1564 let clock = get_atomic_clock_realtime();
1565 let request_timeout_secs = self.config.request_timeout;
1566
1567 let handle = get_runtime().spawn(async move {
1568 let timeout_dur = Duration::from_secs(request_timeout_secs);
1569 let result = tokio::time::timeout(
1570 timeout_dur,
1571 crate::execution::account::subscribe_account_summary(&client_clone, account_id),
1572 )
1573 .await;
1574
1575 match result {
1576 Ok(Ok((balances, margins))) => {
1577 let ts_event = clock.get_time_ns();
1578 let ts_now = clock.get_time_ns();
1579
1580 let account_state = AccountState::new(
1581 account_id,
1582 account_type,
1583 balances,
1584 margins,
1585 true,
1586 UUID4::new(),
1587 ts_event,
1588 ts_now,
1589 base_currency,
1590 );
1591
1592 let endpoint = MessagingSwitchboard::portfolio_update_account();
1593 send_account_state(endpoint, &account_state);
1594 }
1595 Ok(Err(e)) => {
1596 tracing::error!("Failed to query account state: {e}");
1597 }
1598 Err(_) => {
1599 tracing::error!("Timeout waiting for account summary");
1600 }
1601 }
1602 });
1603
1604 self.pending_tasks
1605 .lock()
1606 .map_err(|_| anyhow::anyhow!("Failed to lock pending tasks"))?
1607 .push(handle);
1608
1609 Ok(())
1610 }
1611
1612 fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
1613 let client = self.ib_client.as_ref().context("IB client not connected")?;
1614 let client_order_id = cmd.client_order_id;
1615 let trader_id = cmd.trader_id;
1616 let strategy_id = cmd.strategy_id;
1617 let instrument_id = cmd.instrument_id;
1618
1619 let target_ib_order_id: i32 = if let Some(venue_order_id) = &cmd.venue_order_id {
1620 venue_order_id
1621 .as_str()
1622 .parse()
1623 .context("Failed to parse venue_order_id as IB order id")?
1624 } else {
1625 let map = self
1626 .order_id_map
1627 .lock()
1628 .map_err(|_| anyhow::anyhow!("Failed to lock order_id_map"))?;
1629 *map.get(&cmd.client_order_id)
1630 .context("No venue order id for client_order_id")?
1631 };
1632
1633 let client_clone = client.as_arc().clone();
1634 let instrument_id_map = Arc::clone(&self.instrument_id_map);
1635 let instrument_provider = Arc::clone(&self.instrument_provider);
1636 let account_id = self.core.account_id;
1637 let exec_sender = get_exec_event_sender();
1638 let ts_init = get_atomic_clock_realtime().get_time_ns();
1639 let request_timeout_secs = self.config.request_timeout;
1640 let pending_cancel_orders = Arc::clone(&self.pending_cancel_orders);
1641
1642 let handle = get_runtime().spawn(async move {
1643 let timeout_dur = Duration::from_secs(request_timeout_secs);
1644 let mut subscription =
1645 match tokio::time::timeout(timeout_dur, client_clone.all_open_orders()).await {
1646 Ok(Ok(s)) => s,
1647 Ok(Err(e)) => {
1648 tracing::error!("query_order: failed to request open orders: {e}");
1649 return;
1650 }
1651 Err(_) => {
1652 tracing::error!("query_order: timeout requesting open orders");
1653 return;
1654 }
1655 };
1656
1657 while let Some(order_result) = subscription.next().await {
1658 if let Ok(Orders::OrderData(data)) = order_result {
1659 if data.order_id != target_ib_order_id {
1660 continue;
1661 }
1662
1663 let instrument_id = match instrument_id_map.lock() {
1664 Ok(map) => map.get(&data.order_id).copied(),
1665 Err(_) => None,
1666 };
1667 let instrument_id = match instrument_id {
1668 Some(id) => id,
1669 None => match ib_contract_to_instrument_id_simple(&data.contract) {
1670 Ok(id) => id,
1671 Err(e) => {
1672 tracing::warn!("query_order: failed to convert contract: {e}");
1673 return;
1674 }
1675 },
1676 };
1677
1678 let report = match parse_order_status_to_report(
1679 &IBOrderStatus {
1680 order_id: data.order_id,
1681 status: data.order_state.status.clone(),
1682 filled: 0.0,
1683 remaining: 0.0,
1684 average_fill_price: 0.0,
1685 perm_id: data.order.perm_id,
1686 parent_id: 0,
1687 last_fill_price: 0.0,
1688 client_id: data.order.client_id,
1689 why_held: String::new(),
1690 market_cap_price: 0.0,
1691 },
1692 Some(&data.order),
1693 instrument_id,
1694 account_id,
1695 &instrument_provider,
1696 ts_init,
1697 ) {
1698 Ok(r) => r,
1699 Err(e) => {
1700 tracing::warn!("query_order: failed to parse order status: {e}");
1701 return;
1702 }
1703 };
1704
1705 if exec_sender
1706 .send(ExecutionEvent::Report(ExecutionReport::Order(Box::new(
1707 report,
1708 ))))
1709 .is_err()
1710 {
1711 tracing::error!("query_order: failed to send order status report");
1712 }
1713 return;
1714 }
1715 }
1716
1717 let was_pending_cancel = pending_cancel_orders
1718 .lock()
1719 .map(|mut pending| pending.remove(&client_order_id))
1720 .unwrap_or(false);
1721
1722 if was_pending_cancel {
1723 let event = OrderCanceled::new(
1724 trader_id,
1725 strategy_id,
1726 instrument_id,
1727 client_order_id,
1728 UUID4::new(),
1729 ts_init,
1730 ts_init,
1731 false,
1732 Some(VenueOrderId::from(target_ib_order_id.to_string())),
1733 Some(account_id),
1734 );
1735
1736 if exec_sender
1737 .send(ExecutionEvent::Order(OrderEventAny::Canceled(event)))
1738 .is_err()
1739 {
1740 tracing::error!("query_order: failed to send inferred order canceled event");
1741 } else {
1742 tracing::info!(
1743 "query_order: inferred cancel for {} from missing open order {}",
1744 client_order_id,
1745 target_ib_order_id
1746 );
1747 }
1748 return;
1749 }
1750
1751 tracing::debug!(
1752 "query_order: order {} not found in open orders (may be filled or canceled)",
1753 target_ib_order_id
1754 );
1755 });
1756
1757 self.pending_tasks
1758 .lock()
1759 .map_err(|_| anyhow::anyhow!("Failed to lock pending tasks"))?
1760 .push(handle);
1761
1762 Ok(())
1763 }
1764
1765 fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
1766 let client = self.ib_client.as_ref().context("IB client not connected")?;
1767
1768 let orders = self.core.get_orders_for_list(&cmd.order_list)?;
1769
1770 let order_id_map = Arc::clone(&self.order_id_map);
1771 let venue_order_id_map = Arc::clone(&self.venue_order_id_map);
1772 let instrument_id_map = Arc::clone(&self.instrument_id_map);
1773 let trader_id_map = Arc::clone(&self.trader_id_map);
1774 let strategy_id_map = Arc::clone(&self.strategy_id_map);
1775 let next_order_id = Arc::clone(&self.next_order_id);
1776 let instrument_provider = Arc::clone(&self.instrument_provider);
1777 let exec_sender = get_exec_event_sender();
1778 let clock = get_atomic_clock_realtime();
1779 let account_id = self.core.account_id;
1780 let strategy_id = cmd.strategy_id;
1781 let accepted_orders = Arc::clone(&self.accepted_orders);
1782 let client_clone = client.as_arc().clone();
1783
1784 let handle = get_runtime().spawn(async move {
1785 if let Err(e) = Self::handle_submit_order_list_async(
1786 &cmd,
1787 &orders,
1788 &client_clone,
1789 &order_id_map,
1790 &venue_order_id_map,
1791 &instrument_id_map,
1792 &trader_id_map,
1793 &strategy_id_map,
1794 &next_order_id,
1795 &instrument_provider,
1796 &exec_sender,
1797 clock,
1798 account_id,
1799 strategy_id,
1800 &accepted_orders,
1801 )
1802 .await
1803 {
1804 tracing::error!("Error submitting order list: {e}");
1805 }
1806 });
1807
1808 self.pending_tasks
1809 .lock()
1810 .map_err(|_| anyhow::anyhow!("Failed to lock pending tasks"))?
1811 .push(handle);
1812
1813 Ok(())
1814 }
1815
1816 fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
1817 let client = self.ib_client.as_ref().context("IB client not connected")?;
1818
1819 let original_order = {
1821 let cache = self.core.cache();
1822 cache
1823 .order(&cmd.client_order_id)
1824 .cloned()
1825 .context("Order not found in cache")?
1826 };
1827
1828 let order_id_map = Arc::clone(&self.order_id_map);
1829 let venue_order_id_map = Arc::clone(&self.venue_order_id_map);
1830 let instrument_provider = Arc::clone(&self.instrument_provider);
1831 let exec_sender = get_exec_event_sender();
1832 let clock = get_atomic_clock_realtime();
1833 let account_id = self.core.account_id;
1834 let client_clone = client.as_arc().clone();
1835 let original_order = Arc::new(original_order);
1836
1837 let handle = get_runtime().spawn(async move {
1838 if let Err(e) = Self::handle_modify_order_async(
1839 &cmd,
1840 &client_clone,
1841 &order_id_map,
1842 &venue_order_id_map,
1843 &instrument_provider,
1844 &exec_sender,
1845 clock,
1846 account_id,
1847 &original_order,
1848 )
1849 .await
1850 {
1851 tracing::error!("Error modifying order: {e}");
1852 }
1853 });
1854
1855 self.pending_tasks
1856 .lock()
1857 .map_err(|_| anyhow::anyhow!("Failed to lock pending tasks"))?
1858 .push(handle);
1859
1860 Ok(())
1861 }
1862
1863 fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
1864 let client = self.ib_client.as_ref().context("IB client not connected")?;
1865
1866 let order_id_map = Arc::clone(&self.order_id_map);
1867 let instrument_id_map = Arc::clone(&self.instrument_id_map);
1868 let trader_id_map = Arc::clone(&self.trader_id_map);
1869 let strategy_id_map = Arc::clone(&self.strategy_id_map);
1870 let pending_cancel_orders = Arc::clone(&self.pending_cancel_orders);
1871 let exec_sender = get_exec_event_sender();
1872 let clock = get_atomic_clock_realtime();
1873 let account_id = self.core.account_id;
1874 let client_clone = client.as_arc().clone();
1875
1876 let handle = get_runtime().spawn(async move {
1877 if let Err(e) = Self::handle_cancel_order_async(
1878 &cmd,
1879 &client_clone,
1880 &order_id_map,
1881 &instrument_id_map,
1882 &trader_id_map,
1883 &strategy_id_map,
1884 &pending_cancel_orders,
1885 &exec_sender,
1886 clock.get_time_ns(),
1887 account_id,
1888 )
1889 .await
1890 {
1891 tracing::error!("Error canceling order: {e}");
1892 }
1893 });
1894
1895 self.pending_tasks
1896 .lock()
1897 .map_err(|_| anyhow::anyhow!("Failed to lock pending tasks"))?
1898 .push(handle);
1899
1900 Ok(())
1901 }
1902
1903 fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1904 let client = self.ib_client.as_ref().context("IB client not connected")?;
1905
1906 if cmd.order_side != OrderSide::NoOrderSide {
1908 tracing::warn!(
1909 "Interactive Brokers does not support order_side filtering for cancel all orders; \
1910 ignoring order_side={:?} and canceling all orders",
1911 cmd.order_side
1912 );
1913 }
1914
1915 let orders_to_cancel: Vec<(ClientOrderId, Option<VenueOrderId>)> = {
1918 let cache = self.core.cache();
1919 let mut orders_to_cancel: Vec<(ClientOrderId, Option<VenueOrderId>)> = cache
1920 .orders_open(
1921 None, Some(&cmd.instrument_id), None, None, None, )
1927 .iter()
1928 .map(|order| (order.client_order_id(), order.venue_order_id()))
1929 .collect();
1930
1931 if orders_to_cancel.is_empty() {
1932 let instrument_id_map = self
1933 .instrument_id_map
1934 .lock()
1935 .map_err(|_| anyhow::anyhow!("Failed to lock instrument ID map"))?;
1936
1937 let venue_map = self
1938 .venue_order_id_map
1939 .lock()
1940 .map_err(|_| anyhow::anyhow!("Failed to lock venue order ID map"))?;
1941
1942 orders_to_cancel.extend(instrument_id_map.iter().filter_map(
1943 |(order_id, instrument_id)| {
1944 (*instrument_id == cmd.instrument_id)
1945 .then_some(*order_id)
1946 .and_then(|ib_order_id| {
1947 venue_map.get(&ib_order_id).copied().map(|client_order_id| {
1948 (
1949 client_order_id,
1950 Some(VenueOrderId::from(ib_order_id.to_string())),
1951 )
1952 })
1953 })
1954 },
1955 ));
1956 }
1957
1958 orders_to_cancel.sort_by_key(|(client_order_id, _)| client_order_id.to_string());
1959 orders_to_cancel.dedup_by_key(|(client_order_id, _)| *client_order_id);
1960 orders_to_cancel
1961 };
1962
1963 if orders_to_cancel.is_empty() {
1964 tracing::info!("No open orders to cancel");
1965 return Ok(());
1966 }
1967
1968 tracing::info!(
1969 "Canceling {} open order(s) for instrument {}",
1970 orders_to_cancel.len(),
1971 cmd.instrument_id
1972 );
1973
1974 let client_clone = client.as_arc().clone();
1975 let order_id_map = Arc::clone(&self.order_id_map);
1976 let instrument_id_map = Arc::clone(&self.instrument_id_map);
1977 let trader_id_map = Arc::clone(&self.trader_id_map);
1978 let strategy_id_map = Arc::clone(&self.strategy_id_map);
1979 let pending_cancel_orders = Arc::clone(&self.pending_cancel_orders);
1980 let exec_sender = get_exec_event_sender();
1981 let clock = get_atomic_clock_realtime();
1982 let account_id = self.core.account_id;
1983
1984 let handle = get_runtime().spawn(async move {
1985 if let Err(e) = Self::handle_cancel_all_orders_async(
1986 &client_clone,
1987 &order_id_map,
1988 &instrument_id_map,
1989 &trader_id_map,
1990 &strategy_id_map,
1991 &pending_cancel_orders,
1992 &exec_sender,
1993 clock.get_time_ns(),
1994 account_id,
1995 orders_to_cancel,
1996 )
1997 .await
1998 {
1999 tracing::error!("Error canceling all orders: {e}");
2000 }
2001 });
2002
2003 self.pending_tasks
2004 .lock()
2005 .map_err(|_| anyhow::anyhow!("Failed to lock pending tasks"))?
2006 .push(handle);
2007
2008 Ok(())
2009 }
2010
2011 fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
2012 for cancel_cmd in cmd.cancels {
2014 self.cancel_order(cancel_cmd)?;
2015 }
2016 Ok(())
2017 }
2018}
2019
2020#[allow(dead_code)]
2021impl InteractiveBrokersExecutionClient {
2022 async fn handle_cancel_order_async(
2028 cmd: &CancelOrder,
2029 client: &Arc<Client>,
2030 order_id_map: &Arc<Mutex<AHashMap<ClientOrderId, i32>>>,
2031 instrument_id_map: &Arc<Mutex<AHashMap<i32, InstrumentId>>>,
2032 trader_id_map: &Arc<Mutex<AHashMap<i32, TraderId>>>,
2033 strategy_id_map: &Arc<Mutex<AHashMap<i32, StrategyId>>>,
2034 pending_cancel_orders: &Arc<Mutex<ahash::AHashSet<ClientOrderId>>>,
2035 exec_sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
2036 ts_init: UnixNanos,
2037 account_id: AccountId,
2038 ) -> anyhow::Result<()> {
2039 let ib_order_id = if let Some(venue_order_id) = &cmd.venue_order_id {
2040 venue_order_id
2042 .as_str()
2043 .parse::<i32>()
2044 .map_err(|e| anyhow::anyhow!("Failed to parse venue order ID: {e}"))?
2045 } else {
2046 let map = order_id_map
2048 .lock()
2049 .map_err(|_| anyhow::anyhow!("Failed to lock order ID map"))?;
2050 *map.get(&cmd.client_order_id)
2051 .context("No IB order ID mapping found for client order ID")?
2052 };
2053
2054 client
2055 .cancel_order(ib_order_id, "")
2056 .await
2057 .context("Failed to cancel order with IB")?;
2058
2059 Self::emit_order_pending_cancel(
2060 ib_order_id,
2061 cmd.client_order_id,
2062 instrument_id_map,
2063 trader_id_map,
2064 strategy_id_map,
2065 pending_cancel_orders,
2066 exec_sender,
2067 ts_init,
2068 account_id,
2069 )?;
2070
2071 Ok(())
2072 }
2073
2074 async fn handle_cancel_all_orders_async(
2075 client: &Arc<Client>,
2076 order_id_map: &Arc<Mutex<AHashMap<ClientOrderId, i32>>>,
2077 instrument_id_map: &Arc<Mutex<AHashMap<i32, InstrumentId>>>,
2078 trader_id_map: &Arc<Mutex<AHashMap<i32, TraderId>>>,
2079 strategy_id_map: &Arc<Mutex<AHashMap<i32, StrategyId>>>,
2080 pending_cancel_orders: &Arc<Mutex<ahash::AHashSet<ClientOrderId>>>,
2081 exec_sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
2082 ts_init: UnixNanos,
2083 account_id: AccountId,
2084 orders_to_cancel: Vec<(ClientOrderId, Option<VenueOrderId>)>,
2085 ) -> anyhow::Result<()> {
2086 let ib_order_ids: Vec<(ClientOrderId, i32)> = {
2088 let order_id_map_guard = order_id_map
2089 .lock()
2090 .map_err(|_| anyhow::anyhow!("Failed to lock order ID map"))?;
2091
2092 orders_to_cancel
2093 .into_iter()
2094 .filter_map(|(client_order_id, venue_order_id)| {
2095 if let Some(venue_order_id) = venue_order_id {
2096 return venue_order_id
2097 .as_str()
2098 .parse::<i32>()
2099 .ok()
2100 .map(|ib_order_id| (client_order_id, ib_order_id));
2101 }
2102
2103 order_id_map_guard
2104 .get(&client_order_id)
2105 .copied()
2106 .map(|ib_order_id| (client_order_id, ib_order_id))
2107 })
2108 .collect()
2109 };
2110
2111 for (client_order_id, ib_order_id) in ib_order_ids {
2113 if let Err(e) = client.cancel_order(ib_order_id, "").await {
2114 tracing::error!(
2115 "Failed to cancel order {} (IB order ID: {}): {e}",
2116 client_order_id,
2117 ib_order_id
2118 );
2119 } else {
2120 if let Err(e) = Self::emit_order_pending_cancel(
2121 ib_order_id,
2122 client_order_id,
2123 instrument_id_map,
2124 trader_id_map,
2125 strategy_id_map,
2126 pending_cancel_orders,
2127 exec_sender,
2128 ts_init,
2129 account_id,
2130 ) {
2131 tracing::error!(
2132 "Failed to emit pending cancel for order {} (IB order ID: {}): {e}",
2133 client_order_id,
2134 ib_order_id
2135 );
2136 }
2137 tracing::debug!(
2138 "Canceled order {} (IB order ID: {})",
2139 client_order_id,
2140 ib_order_id
2141 );
2142 }
2143 }
2144
2145 tracing::info!("Finished canceling all orders");
2146
2147 Ok(())
2148 }
2149
2150 #[allow(clippy::too_many_arguments)]
2151 fn emit_order_pending_cancel(
2152 order_id: i32,
2153 client_order_id: ClientOrderId,
2154 instrument_id_map: &Arc<Mutex<AHashMap<i32, InstrumentId>>>,
2155 trader_id_map: &Arc<Mutex<AHashMap<i32, TraderId>>>,
2156 strategy_id_map: &Arc<Mutex<AHashMap<i32, StrategyId>>>,
2157 pending_cancel_orders: &Arc<Mutex<ahash::AHashSet<ClientOrderId>>>,
2158 exec_sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
2159 ts_init: UnixNanos,
2160 account_id: AccountId,
2161 ) -> anyhow::Result<()> {
2162 let mut pending = pending_cancel_orders
2163 .lock()
2164 .map_err(|_| anyhow::anyhow!("Failed to lock pending cancel orders map"))?;
2165 if !pending.insert(client_order_id) {
2166 return Ok(());
2167 }
2168 drop(pending);
2169
2170 let instrument_id = Self::get_mapped_instrument_id(order_id, instrument_id_map)?
2171 .context("Instrument ID not found for pending cancel order")?;
2172 let (trader_id, strategy_id) =
2173 Self::get_required_order_actor_ids(order_id, trader_id_map, strategy_id_map)?;
2174
2175 let event = OrderPendingCancel::new(
2176 trader_id,
2177 strategy_id,
2178 instrument_id,
2179 client_order_id,
2180 account_id,
2181 UUID4::new(),
2182 ts_init,
2183 ts_init,
2184 false,
2185 Some(VenueOrderId::from(order_id.to_string())),
2186 );
2187
2188 exec_sender
2189 .send(ExecutionEvent::Order(OrderEventAny::PendingCancel(event)))
2190 .map_err(|e| anyhow::anyhow!("Failed to send order pending cancel event: {e}"))?;
2191
2192 Ok(())
2193 }
2194}
2195
/// Shared message text, presumably used when reporting a poisoned mutex;
/// its use sites are outside this section of the file — TODO confirm.
const MUTEX_POISONED: &str = "Mutex poisoned";