Skip to main content

nautilus_interactive_brokers/execution/
core.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Core execution client implementation for Interactive Brokers.
17
18#[path = "core_helpers.rs"]
19mod core_helpers;
20#[path = "core_orders.rs"]
21mod core_orders;
22#[path = "core_updates.rs"]
23mod core_updates;
24#[cfg(test)]
25#[path = "core_tests.rs"]
26mod tests;
27
28#[cfg(feature = "python")]
29use std::{
30    cell::{Cell, RefCell},
31    collections::HashMap,
32    rc::Rc,
33};
34use std::{
35    collections::VecDeque,
36    fmt::Debug,
37    str::FromStr,
38    sync::{
39        Arc, Mutex,
40        atomic::{AtomicBool, Ordering},
41    },
42    time::Duration,
43};
44
45use ahash::AHashMap;
46use anyhow::Context;
47// removed unused async_trait
48use ibapi::{
49    accounts::PositionUpdate,
50    client::Client,
51    orders::{
52        ExecutionData, ExecutionFilter, Executions, OcaType, OrderStatus as IBOrderStatus,
53        OrderUpdate, Orders,
54    },
55};
56use nautilus_common::{
57    cache::Cache,
58    clients::ExecutionClient,
59    enums::LogLevel,
60    factories::OrderEventFactory,
61    live::{get_runtime, runner::get_exec_event_sender},
62    messages::{
63        ExecutionEvent,
64        execution::{
65            BatchCancelOrders, CancelAllOrders, CancelOrder, ExecutionReport, GenerateFillReports,
66            GenerateFillReportsBuilder, GenerateOrderStatusReport, GenerateOrderStatusReports,
67            GenerateOrderStatusReportsBuilder, GeneratePositionStatusReports,
68            GeneratePositionStatusReportsBuilder, ModifyOrder, QueryAccount, QueryOrder,
69            SubmitOrder, SubmitOrderList,
70        },
71    },
72    msgbus::{send_account_state, switchboard::MessagingSwitchboard},
73};
74use nautilus_core::{
75    UUID4, UnixNanos,
76    time::{AtomicTime, get_atomic_clock_realtime},
77};
78use nautilus_live::ExecutionClientCore;
79#[cfg(feature = "python")]
80use nautilus_model::events::{OrderAcceptedBatch, OrderCanceledBatch, OrderSubmittedBatch};
81#[cfg(feature = "python")]
82use nautilus_model::identifiers::{ExecAlgorithmId, OrderListId, PositionId};
83#[cfg(feature = "python")]
84use nautilus_model::orders::OrderList;
85#[cfg(feature = "python")]
86use nautilus_model::python::events::order::order_event_to_pyobject;
87use nautilus_model::{
88    accounts::AccountAny,
89    enums::{
90        LiquiditySide, OmsType, OrderSide, OrderType, PositionSideSpecified, TrailingOffsetType,
91    },
92    events::{
93        AccountState, OrderAccepted, OrderCanceled, OrderEventAny, OrderPendingCancel,
94        OrderRejected, OrderSubmitted,
95    },
96    identifiers::{
97        AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, Venue,
98        VenueOrderId,
99    },
100    instruments::Instrument,
101    orders::{Order, any::OrderAny},
102    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
103    types::{AccountBalance, Currency, MarginBalance, Money, Price, Quantity},
104};
105#[cfg(feature = "python")]
106use nautilus_model::{enums::AccountType, events::OrderInitialized};
107#[cfg(feature = "python")]
108use pyo3::{IntoPyObjectExt, prelude::*};
109use rust_decimal::Decimal;
110use tokio::task::JoinHandle;
111use ustr::Ustr;
112
113use super::{
114    account::{PositionTracker, create_position_tracker},
115    parse::{parse_execution_time, parse_execution_to_fill_report, parse_order_status_to_report},
116    transform::nautilus_order_to_ib_order,
117};
118#[cfg(feature = "python")]
119use crate::common::consts::IB_VENUE;
120use crate::{
121    common::{
122        parse::{ib_contract_to_instrument_id_simple, is_spread_instrument_id},
123        shared_client::SharedClientHandle,
124    },
125    config::InteractiveBrokersExecClientConfig,
126    providers::instruments::InteractiveBrokersInstrumentProvider,
127};
128
129/// Interactive Brokers execution client.
130///
131/// This client provides order execution functionality using the `rust-ibapi` library.
132/// It manages order submission, modification, cancellation, and execution reporting.
133#[cfg_attr(
134    feature = "python",
135    pyo3::pyclass(
136        module = "nautilus_trader.core.nautilus_pyo3.interactive_brokers",
137        unsendable
138    )
139)]
140pub struct InteractiveBrokersExecutionClient {
141    /// Core execution client functionality.
142    core: ExecutionClientCore,
143    /// Configuration for the client.
144    config: InteractiveBrokersExecClientConfig,
145    /// Instrument provider.
146    instrument_provider: Arc<InteractiveBrokersInstrumentProvider>,
147    /// Connection state.
148    is_connected: AtomicBool,
149    /// IB API client (shared per host/port/client_id when both data and execution connect).
150    ib_client: Option<SharedClientHandle>,
151    /// Active task handles.
152    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
153    /// Order ID counter.
154    next_order_id: Arc<Mutex<i32>>,
155    /// Order update subscription handle.
156    order_update_handle: Mutex<Option<JoinHandle<()>>>,
157    /// Client order ID to venue order ID mapping.
158    order_id_map: Arc<Mutex<AHashMap<ClientOrderId, i32>>>,
159    /// Venue order ID to client order ID mapping.
160    venue_order_id_map: Arc<Mutex<AHashMap<i32, ClientOrderId>>>,
161    /// Commission cache by execution ID (to merge with fill reports).
162    commission_cache: Arc<Mutex<AHashMap<String, (f64, String)>>>,
163    /// Instrument ID mapping by venue order ID (for order status tracking).
164    instrument_id_map: Arc<Mutex<AHashMap<i32, InstrumentId>>>,
165    /// Trader ID mapping by venue order ID.
166    trader_id_map: Arc<Mutex<AHashMap<i32, TraderId>>>,
167    /// Strategy ID mapping by venue order ID.
168    strategy_id_map: Arc<Mutex<AHashMap<i32, StrategyId>>>,
169    /// Spread fill tracking to avoid duplicate processing.
170    /// Maps client_order_id to set of trade_ids that have been processed.
171    spread_fill_tracking: Arc<Mutex<AHashMap<ClientOrderId, ahash::AHashSet<String>>>>,
172    /// Position tracker for detecting external position changes (e.g., option exercises).
173    position_tracker: PositionTracker,
174    /// Average fill price tracking by client order ID.
175    /// Stores average fill prices from IB order status updates for use in fill reports.
176    order_avg_prices: Arc<Mutex<AHashMap<ClientOrderId, Price>>>,
177    /// Pending spread combo fills waiting for their matching avg fill price chunk.
178    pending_combo_fills: Arc<Mutex<AHashMap<ClientOrderId, VecDeque<PendingComboFill>>>>,
179    /// Pending average-price chunks derived from cumulative order status updates.
180    pending_combo_fill_avgs: Arc<Mutex<AHashMap<ClientOrderId, VecDeque<(Decimal, Price)>>>>,
181    /// Tracks cumulative filled quantity and notional for deriving incremental avg fill chunks.
182    order_fill_progress: Arc<Mutex<AHashMap<ClientOrderId, (Decimal, Decimal)>>>,
183    /// Set of client order IDs that have already emitted an OrderAccepted event.
184    accepted_orders: Arc<Mutex<ahash::AHashSet<ClientOrderId>>>,
185    /// Set of client order IDs that have already emitted an OrderPendingCancel event.
186    pending_cancel_orders: Arc<Mutex<ahash::AHashSet<ClientOrderId>>>,
187}
188
189#[derive(Clone, Debug)]
190struct PendingComboFill {
191    account_id: AccountId,
192    instrument_id: InstrumentId,
193    venue_order_id: VenueOrderId,
194    trade_id: TradeId,
195    order_side: OrderSide,
196    last_qty: Quantity,
197    last_px: Price,
198    commission: Money,
199    liquidity_side: LiquiditySide,
200    client_order_id: ClientOrderId,
201    ts_event: UnixNanos,
202    ts_init: UnixNanos,
203}
204
205#[cfg(feature = "python")]
206static EXEC_EVENT_CALLBACK: std::sync::OnceLock<std::sync::Mutex<Option<Py<PyAny>>>> =
207    std::sync::OnceLock::new();
208
209#[cfg(feature = "python")]
210thread_local! {
211    static EXEC_EVENT_BRIDGE_INITIALIZED: Cell<bool> = const { Cell::new(false) };
212}
213
214#[cfg(feature = "python")]
215fn exec_event_callback() -> &'static std::sync::Mutex<Option<Py<PyAny>>> {
216    EXEC_EVENT_CALLBACK.get_or_init(|| std::sync::Mutex::new(None))
217}
218
219#[cfg(feature = "python")]
220fn string_hash_map_to_params(
221    params: Option<HashMap<String, String>>,
222) -> Option<nautilus_core::Params> {
223    params.map(|items| {
224        let mut mapped = nautilus_core::Params::new();
225        for (key, value) in items {
226            mapped.insert(key, serde_json::Value::String(value));
227        }
228        mapped
229    })
230}
231
232#[cfg(feature = "python")]
233fn dispatch_python_exec_event(
234    py: Python<'_>,
235    callback: &Py<PyAny>,
236    event: ExecutionEvent,
237) -> PyResult<()> {
238    let (kind, payload) = match event {
239        ExecutionEvent::Order(order_event) => {
240            ("order_event", order_event_to_pyobject(py, order_event)?)
241        }
242        ExecutionEvent::OrderSubmittedBatch(batch) => (
243            "order_submitted_batch",
244            order_submitted_batch_to_pyobject(py, batch)?,
245        ),
246        ExecutionEvent::OrderAcceptedBatch(batch) => (
247            "order_accepted_batch",
248            order_accepted_batch_to_pyobject(py, batch)?,
249        ),
250        ExecutionEvent::OrderCanceledBatch(batch) => (
251            "order_canceled_batch",
252            order_canceled_batch_to_pyobject(py, batch)?,
253        ),
254        ExecutionEvent::Report(report) => match report {
255            ExecutionReport::Order(report) => ("order_report", (*report).into_py_any(py)?),
256            ExecutionReport::Fill(report) => ("fill_report", (*report).into_py_any(py)?),
257            ExecutionReport::Position(report) => ("position_report", (*report).into_py_any(py)?),
258            ExecutionReport::MassStatus(report) => {
259                ("mass_status_report", (*report).into_py_any(py)?)
260            }
261            // The IB adapter never emits OrderWithFills; this arm exists only to
262            // keep the match exhaustive against the shared ExecutionReport enum.
263            ExecutionReport::OrderWithFills(..) => return Ok(()),
264        },
265        ExecutionEvent::Account(account_state) => ("account_state", account_state.into_py_any(py)?),
266    };
267
268    callback.call1(py, (kind, payload))?;
269    Ok(())
270}
271
272#[cfg(feature = "python")]
273fn order_accepted_batch_to_pyobject(
274    py: Python<'_>,
275    batch: OrderAcceptedBatch,
276) -> PyResult<Py<PyAny>> {
277    batch
278        .into_iter()
279        .map(|event| order_event_to_pyobject(py, OrderEventAny::Accepted(event)))
280        .collect::<PyResult<Vec<Py<PyAny>>>>()?
281        .into_py_any(py)
282}
283
284#[cfg(feature = "python")]
285fn order_submitted_batch_to_pyobject(
286    py: Python<'_>,
287    batch: OrderSubmittedBatch,
288) -> PyResult<Py<PyAny>> {
289    batch
290        .into_iter()
291        .map(|event| order_event_to_pyobject(py, OrderEventAny::Submitted(event)))
292        .collect::<PyResult<Vec<Py<PyAny>>>>()?
293        .into_py_any(py)
294}
295
296#[cfg(feature = "python")]
297fn order_canceled_batch_to_pyobject(
298    py: Python<'_>,
299    batch: OrderCanceledBatch,
300) -> PyResult<Py<PyAny>> {
301    batch
302        .into_iter()
303        .map(|event| order_event_to_pyobject(py, OrderEventAny::Canceled(event)))
304        .collect::<PyResult<Vec<Py<PyAny>>>>()?
305        .into_py_any(py)
306}
307
308impl Debug for InteractiveBrokersExecutionClient {
309    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
310        f.debug_struct(stringify!(InteractiveBrokersExecutionClient))
311            .field("core", &self.core)
312            .field("config", &self.config)
313            .field("instrument_provider", &self.instrument_provider)
314            .field("is_connected", &self.is_connected.load(Ordering::Relaxed))
315            .field("ib_client", &self.ib_client.is_some())
316            .finish_non_exhaustive()
317    }
318}
319
320impl InteractiveBrokersExecutionClient {
321    /// Creates a new [`InteractiveBrokersExecutionClient`].
322    ///
323    /// # Arguments
324    ///
325    /// * `core` - Core execution client functionality
326    /// * `config` - Configuration for the client
327    /// * `instrument_provider` - Instrument provider
328    ///
329    /// # Errors
330    ///
331    /// Returns an error if client creation fails.
332    pub fn new(
333        mut core: ExecutionClientCore,
334        config: InteractiveBrokersExecClientConfig,
335        instrument_provider: Arc<InteractiveBrokersInstrumentProvider>,
336    ) -> anyhow::Result<Self> {
337        // If account_id is provided in config, use it
338        if let Some(account_id) = &config.account_id {
339            core.account_id = AccountId::from(account_id.clone());
340        }
341
342        Ok(Self {
343            core,
344            config,
345            instrument_provider,
346            is_connected: AtomicBool::new(false),
347            ib_client: None,
348            pending_tasks: Mutex::new(Vec::new()),
349            next_order_id: Arc::new(Mutex::new(0)),
350            order_update_handle: Mutex::new(None),
351            order_id_map: Arc::new(Mutex::new(AHashMap::new())),
352            venue_order_id_map: Arc::new(Mutex::new(AHashMap::new())),
353            commission_cache: Arc::new(Mutex::new(AHashMap::new())),
354            instrument_id_map: Arc::new(Mutex::new(AHashMap::new())),
355            trader_id_map: Arc::new(Mutex::new(AHashMap::new())),
356            strategy_id_map: Arc::new(Mutex::new(AHashMap::new())),
357            spread_fill_tracking: Arc::new(Mutex::new(AHashMap::new())),
358            position_tracker: create_position_tracker(),
359            order_avg_prices: Arc::new(Mutex::new(AHashMap::new())),
360            pending_combo_fills: Arc::new(Mutex::new(AHashMap::new())),
361            pending_combo_fill_avgs: Arc::new(Mutex::new(AHashMap::new())),
362            order_fill_progress: Arc::new(Mutex::new(AHashMap::new())),
363            accepted_orders: Arc::new(Mutex::new(ahash::AHashSet::new())),
364            pending_cancel_orders: Arc::new(Mutex::new(ahash::AHashSet::new())),
365        })
366    }
367
368    #[cfg(feature = "python")]
369    pub(crate) fn new_for_python(
370        mut config: InteractiveBrokersExecClientConfig,
371        instrument_provider: crate::providers::instruments::InteractiveBrokersInstrumentProvider,
372    ) -> anyhow::Result<Self> {
373        Self::ensure_python_event_bridge();
374
375        let account_id_value = config
376            .account_id
377            .clone()
378            .unwrap_or_else(|| "UNKNOWN".to_string());
379        let normalized_account_id = if account_id_value.starts_with("IB-") {
380            account_id_value
381        } else {
382            format!("IB-{account_id_value}")
383        };
384
385        config.account_id = Some(normalized_account_id.clone());
386
387        let core = ExecutionClientCore::new(
388            TraderId::from("TRADER-001"),
389            ClientId::from("IB"),
390            *IB_VENUE,
391            OmsType::Netting,
392            AccountId::from(normalized_account_id),
393            AccountType::Margin,
394            None,
395            Rc::new(RefCell::new(Cache::default())),
396        );
397
398        Self::new(core, config, Arc::new(instrument_provider))
399    }
400
401    #[cfg(feature = "python")]
402    pub(crate) fn register_python_event_callback(&self, callback: Py<PyAny>) {
403        *exec_event_callback()
404            .lock()
405            .expect("execution event callback mutex poisoned") = Some(callback);
406    }
407
408    #[cfg(feature = "python")]
409    fn ensure_python_event_bridge() {
410        if nautilus_common::live::runner::try_get_exec_event_sender().is_some() {
411            return;
412        }
413
414        EXEC_EVENT_BRIDGE_INITIALIZED.with(|initialized| {
415            if initialized.replace(true) {
416                return;
417            }
418
419            let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
420            nautilus_common::live::runner::set_exec_event_sender(sender);
421
422            get_runtime().spawn(async move {
423                while let Some(event) = receiver.recv().await {
424                    Python::attach(|py| {
425                        let callback_guard = exec_event_callback()
426                            .lock()
427                            .expect("execution event callback mutex poisoned");
428
429                        let Some(callback) = callback_guard.as_ref() else {
430                            return;
431                        };
432
433                        if let Err(e) = dispatch_python_exec_event(py, callback, event) {
434                            tracing::error!("Failed to dispatch IB execution event to Python: {e}");
435                        }
436                    });
437                }
438            });
439        });
440    }
441
442    #[cfg(feature = "python")]
443    #[allow(clippy::needless_pass_by_value)]
444    pub(crate) fn submit_order_for_python(
445        &self,
446        trader_id: TraderId,
447        order: OrderAny,
448        instrument_id: InstrumentId,
449        strategy_id: StrategyId,
450        exec_algorithm_id: Option<ExecAlgorithmId>,
451        position_id: Option<PositionId>,
452        params: Option<HashMap<String, String>>,
453    ) -> anyhow::Result<()> {
454        self.cache_order_for_python(order.clone(), position_id)?;
455
456        let cmd = SubmitOrder {
457            trader_id,
458            client_id: Some(self.client_id()),
459            strategy_id,
460            instrument_id,
461            client_order_id: order.client_order_id(),
462            order_init: order.init_event().clone(),
463            exec_algorithm_id,
464            position_id,
465            params: string_hash_map_to_params(params),
466            command_id: UUID4::new(),
467            ts_init: get_atomic_clock_realtime().get_time_ns(),
468        };
469
470        ExecutionClient::submit_order(self, cmd)
471    }
472
473    #[cfg(feature = "python")]
474    #[allow(clippy::needless_pass_by_value)]
475    pub(crate) fn submit_order_list_for_python(
476        &self,
477        trader_id: TraderId,
478        strategy_id: StrategyId,
479        orders: Vec<OrderAny>,
480        exec_algorithm_id: Option<ExecAlgorithmId>,
481        position_id: Option<PositionId>,
482        params: Option<HashMap<String, String>>,
483    ) -> anyhow::Result<()> {
484        if orders.is_empty() {
485            anyhow::bail!("Order list cannot be empty");
486        }
487
488        for order in &orders {
489            self.cache_order_for_python(order.clone(), position_id)?;
490        }
491
492        let ts_init = get_atomic_clock_realtime().get_time_ns();
493        let instrument_id = orders[0].instrument_id();
494        let client_order_ids: Vec<ClientOrderId> =
495            orders.iter().map(|o| o.client_order_id()).collect();
496        let order_list_id = OrderListId::from(UUID4::new().to_string());
497        let order_list = OrderList::new(
498            order_list_id,
499            instrument_id,
500            strategy_id,
501            client_order_ids,
502            ts_init,
503        );
504        let order_inits: Vec<OrderInitialized> =
505            orders.iter().map(|o| o.init_event().clone()).collect();
506
507        let cmd = SubmitOrderList::new(
508            trader_id,
509            Some(self.client_id()),
510            strategy_id,
511            order_list,
512            order_inits,
513            exec_algorithm_id,
514            position_id,
515            string_hash_map_to_params(params),
516            UUID4::new(),
517            ts_init,
518        );
519
520        ExecutionClient::submit_order_list(self, cmd)
521    }
522
523    #[cfg(feature = "python")]
524    pub(crate) fn modify_order_for_python(
525        &self,
526        trader_id: TraderId,
527        strategy_id: StrategyId,
528        client_order_id: ClientOrderId,
529        venue_order_id: Option<VenueOrderId>,
530        instrument_id: InstrumentId,
531        quantity: Option<Quantity>,
532        price: Option<Price>,
533        trigger_price: Option<Price>,
534        params: Option<HashMap<String, String>>,
535    ) -> anyhow::Result<()> {
536        let cmd = ModifyOrder {
537            trader_id,
538            client_id: Some(self.client_id()),
539            strategy_id,
540            instrument_id,
541            client_order_id,
542            venue_order_id,
543            quantity,
544            price,
545            trigger_price,
546            params: string_hash_map_to_params(params),
547            command_id: UUID4::new(),
548            ts_init: get_atomic_clock_realtime().get_time_ns(),
549        };
550
551        ExecutionClient::modify_order(self, cmd)
552    }
553
554    #[cfg(feature = "python")]
555    pub(crate) fn cancel_order_for_python(
556        &self,
557        trader_id: TraderId,
558        strategy_id: StrategyId,
559        client_order_id: ClientOrderId,
560        venue_order_id: Option<VenueOrderId>,
561        instrument_id: InstrumentId,
562        params: Option<HashMap<String, String>>,
563    ) -> anyhow::Result<()> {
564        let cmd = CancelOrder {
565            trader_id,
566            client_id: Some(self.client_id()),
567            strategy_id,
568            instrument_id,
569            client_order_id,
570            venue_order_id,
571            params: string_hash_map_to_params(params),
572            command_id: UUID4::new(),
573            ts_init: get_atomic_clock_realtime().get_time_ns(),
574        };
575
576        ExecutionClient::cancel_order(self, cmd)
577    }
578
579    #[cfg(feature = "python")]
580    pub(crate) fn cancel_all_orders_for_python(
581        &self,
582        trader_id: TraderId,
583        strategy_id: StrategyId,
584        instrument_id: InstrumentId,
585        order_side: OrderSide,
586        params: Option<HashMap<String, String>>,
587    ) -> anyhow::Result<()> {
588        let cmd = CancelAllOrders {
589            trader_id,
590            client_id: Some(self.client_id()),
591            strategy_id,
592            instrument_id,
593            order_side,
594            command_id: UUID4::new(),
595            ts_init: get_atomic_clock_realtime().get_time_ns(),
596            params: string_hash_map_to_params(params),
597        };
598
599        ExecutionClient::cancel_all_orders(self, cmd)
600    }
601
602    #[cfg(feature = "python")]
603    pub(crate) fn batch_cancel_orders_for_python(
604        &self,
605        trader_id: TraderId,
606        strategy_id: StrategyId,
607        instrument_id: InstrumentId,
608        client_order_ids: Vec<ClientOrderId>,
609        params: Option<HashMap<String, String>>,
610    ) -> anyhow::Result<()> {
611        let ts_init = get_atomic_clock_realtime().get_time_ns();
612        let cancels = client_order_ids
613            .into_iter()
614            .map(|client_order_id| CancelOrder {
615                trader_id,
616                client_id: Some(self.client_id()),
617                strategy_id,
618                instrument_id,
619                client_order_id,
620                venue_order_id: None,
621                command_id: UUID4::new(),
622                ts_init,
623                params: None,
624            })
625            .collect();
626
627        let cmd = BatchCancelOrders {
628            trader_id,
629            client_id: Some(self.client_id()),
630            strategy_id,
631            instrument_id,
632            cancels,
633            command_id: UUID4::new(),
634            ts_init,
635            params: string_hash_map_to_params(params),
636        };
637
638        ExecutionClient::batch_cancel_orders(self, cmd)
639    }
640
641    #[cfg(feature = "python")]
642    pub(crate) fn query_account_for_python(&self, trader_id: TraderId) -> anyhow::Result<()> {
643        let cmd = QueryAccount {
644            trader_id,
645            client_id: Some(self.client_id()),
646            account_id: ExecutionClient::account_id(self),
647            command_id: UUID4::new(),
648            ts_init: get_atomic_clock_realtime().get_time_ns(),
649            params: None,
650        };
651
652        ExecutionClient::query_account(self, cmd)
653    }
654
655    #[cfg(feature = "python")]
656    pub(crate) fn query_order_for_python(
657        &self,
658        trader_id: TraderId,
659        strategy_id: StrategyId,
660        instrument_id: InstrumentId,
661        client_order_id: ClientOrderId,
662        venue_order_id: Option<VenueOrderId>,
663    ) -> anyhow::Result<()> {
664        let cmd = QueryOrder {
665            trader_id,
666            client_id: Some(self.client_id()),
667            strategy_id,
668            instrument_id,
669            client_order_id,
670            venue_order_id,
671            command_id: UUID4::new(),
672            ts_init: get_atomic_clock_realtime().get_time_ns(),
673            params: None,
674        };
675
676        ExecutionClient::query_order(self, cmd)
677    }
678
679    #[cfg(feature = "python")]
680    pub(crate) async fn generate_order_status_report_for_python(
681        &self,
682        instrument_id: Option<InstrumentId>,
683        client_order_id: Option<ClientOrderId>,
684        venue_order_id: Option<VenueOrderId>,
685    ) -> anyhow::Result<Option<OrderStatusReport>> {
686        let cmd = GenerateOrderStatusReport {
687            command_id: UUID4::new(),
688            ts_init: get_atomic_clock_realtime().get_time_ns(),
689            instrument_id,
690            client_order_id,
691            venue_order_id,
692            params: None,
693            correlation_id: None,
694        };
695
696        self.generate_order_status_report(&cmd).await
697    }
698
699    #[cfg(feature = "python")]
700    pub(crate) async fn generate_order_status_reports_for_python(
701        &self,
702        open_only: bool,
703        instrument_id: Option<InstrumentId>,
704        start: Option<u64>,
705        end: Option<u64>,
706    ) -> anyhow::Result<Vec<OrderStatusReport>> {
707        let start_ns = start.map(nautilus_core::UnixNanos::from);
708        let end_ns = end.map(nautilus_core::UnixNanos::from);
709
710        let cmd = GenerateOrderStatusReports {
711            command_id: UUID4::new(),
712            ts_init: get_atomic_clock_realtime().get_time_ns(),
713            open_only,
714            instrument_id,
715            start: start_ns,
716            end: end_ns,
717            params: None,
718            log_receipt_level: LogLevel::Info,
719            correlation_id: None,
720        };
721
722        let mut reports = self.generate_order_status_reports(&cmd).await?;
723
724        if open_only {
725            use nautilus_model::enums::OrderStatus;
726            reports.retain(|report| {
727                matches!(
728                    report.order_status,
729                    OrderStatus::Initialized
730                        | OrderStatus::Submitted
731                        | OrderStatus::Accepted
732                        | OrderStatus::Triggered
733                        | OrderStatus::PendingUpdate
734                        | OrderStatus::PendingCancel
735                )
736            });
737        }
738
739        if start_ns.is_some() || end_ns.is_some() {
740            reports.retain(|report| {
741                let ts = report.ts_last;
742
743                if let Some(start) = start_ns
744                    && ts < start
745                {
746                    return false;
747                }
748
749                if let Some(end) = end_ns
750                    && ts > end
751                {
752                    return false;
753                }
754
755                true
756            });
757        }
758
759        Ok(reports)
760    }
761
762    #[cfg(feature = "python")]
763    pub(crate) async fn generate_fill_reports_for_python(
764        &self,
765        instrument_id: Option<InstrumentId>,
766        venue_order_id: Option<VenueOrderId>,
767        start: Option<u64>,
768        end: Option<u64>,
769    ) -> anyhow::Result<Vec<FillReport>> {
770        let cmd = GenerateFillReports {
771            command_id: UUID4::new(),
772            ts_init: get_atomic_clock_realtime().get_time_ns(),
773            instrument_id,
774            venue_order_id,
775            start: start.map(nautilus_core::UnixNanos::from),
776            end: end.map(nautilus_core::UnixNanos::from),
777            params: None,
778            log_receipt_level: LogLevel::Info,
779            correlation_id: None,
780        };
781
782        self.generate_fill_reports(cmd).await
783    }
784
785    #[cfg(feature = "python")]
786    pub(crate) async fn generate_position_status_reports_for_python(
787        &self,
788        instrument_id: Option<InstrumentId>,
789        start: Option<u64>,
790        end: Option<u64>,
791    ) -> anyhow::Result<Vec<PositionStatusReport>> {
792        let cmd = GeneratePositionStatusReports {
793            command_id: UUID4::new(),
794            ts_init: get_atomic_clock_realtime().get_time_ns(),
795            instrument_id,
796            start: start.map(nautilus_core::UnixNanos::from),
797            end: end.map(nautilus_core::UnixNanos::from),
798            params: None,
799            log_receipt_level: LogLevel::Info,
800            correlation_id: None,
801        };
802
803        self.generate_position_status_reports(&cmd).await
804    }
805
806    #[cfg(feature = "python")]
807    pub(crate) fn cache_order_for_python(
808        &self,
809        order: OrderAny,
810        position_id: Option<PositionId>,
811    ) -> anyhow::Result<()> {
812        self.core
813            .cache_mut()
814            .add_order(order, position_id, Some(self.client_id()), true)
815    }
816
817    fn reserve_next_local_order_id(next_order_id: &Arc<Mutex<i32>>) -> anyhow::Result<i32> {
818        let mut guard = next_order_id
819            .lock()
820            .map_err(|_| anyhow::anyhow!("Failed to lock next order ID"))?;
821        anyhow::ensure!(
822            *guard > 0,
823            "No valid Interactive Brokers order ID available"
824        );
825        let order_id = *guard;
826        *guard += 1;
827        Ok(order_id)
828    }
829
830    /// Gets the next valid order ID from IB.
831    ///
832    /// # Errors
833    ///
834    /// Returns an error if getting the next order ID fails.
835    async fn get_next_order_id(&self) -> anyhow::Result<i32> {
836        let client = self.ib_client.as_ref().context("IB client not connected")?;
837
838        let timeout_dur = Duration::from_secs(self.config.request_timeout);
839        let order_id = tokio::time::timeout(timeout_dur, client.next_valid_order_id())
840            .await
841            .context("Timeout getting next order ID")??;
842        Ok(order_id)
843    }
844
845    async fn get_highest_open_order_id(&self, client: &Client) -> anyhow::Result<Option<i32>> {
846        let timeout_dur = Duration::from_secs(self.config.request_timeout);
847        let mut subscription = tokio::time::timeout(timeout_dur, client.all_open_orders())
848            .await
849            .context("Timeout requesting open orders for next order ID initialization")??;
850        let mut highest_order_id = None;
851
852        while let Some(order_result) = subscription.next().await {
853            match order_result {
854                Ok(Orders::OrderData(data)) => {
855                    highest_order_id = Some(
856                        highest_order_id
857                            .map_or(data.order_id, |current: i32| current.max(data.order_id)),
858                    );
859                }
860                Ok(_) => {}
861                Err(e) => {
862                    tracing::debug!(
863                        "Ignoring open-order event while initializing next order ID: {e}"
864                    );
865                }
866            }
867        }
868
869        Ok(highest_order_id)
870    }
871
872    /// Aborts all pending tasks.
873    fn abort_pending_tasks(&self) {
874        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
875        for task in tasks.drain(..) {
876            task.abort();
877        }
878
879        if let Some(handle) = self
880            .order_update_handle
881            .lock()
882            .expect(MUTEX_POISONED)
883            .take()
884        {
885            handle.abort();
886        }
887    }
888}
889
890// Implementation of ExecutionClient trait
891#[async_trait::async_trait(?Send)]
892impl ExecutionClient for InteractiveBrokersExecutionClient {
893    fn is_connected(&self) -> bool {
894        self.is_connected.load(Ordering::Relaxed)
895    }
896
897    fn client_id(&self) -> ClientId {
898        self.core.client_id
899    }
900
901    fn account_id(&self) -> AccountId {
902        self.core.account_id
903    }
904
905    fn venue(&self) -> Venue {
906        self.core.venue
907    }
908
909    fn oms_type(&self) -> OmsType {
910        self.core.oms_type
911    }
912
913    fn get_account(&self) -> Option<AccountAny> {
914        self.core.cache().account(&self.core.account_id).cloned()
915    }
916
917    fn generate_account_state(
918        &self,
919        balances: Vec<AccountBalance>,
920        margins: Vec<MarginBalance>,
921        reported: bool,
922        ts_event: UnixNanos,
923    ) -> anyhow::Result<()> {
924        let factory = OrderEventFactory::new(
925            self.core.trader_id,
926            self.core.account_id,
927            self.core.account_type,
928            self.core.base_currency,
929        );
930        let state = factory.generate_account_state(
931            balances,
932            margins,
933            reported,
934            ts_event,
935            get_atomic_clock_realtime().get_time_ns(),
936        );
937        get_exec_event_sender()
938            .send(ExecutionEvent::Account(state))
939            .map_err(|e| anyhow::anyhow!("Failed to send account state: {e}"))
940    }
941
942    fn start(&mut self) -> anyhow::Result<()> {
943        // Start is handled by connect() for live clients
944        Ok(())
945    }
946
947    fn stop(&mut self) -> anyhow::Result<()> {
948        self.abort_pending_tasks();
949        Ok(())
950    }
951
952    fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
953        let client = self.ib_client.as_ref().context("IB client not connected")?;
954
955        let order_id_map = Arc::clone(&self.order_id_map);
956        let venue_order_id_map = Arc::clone(&self.venue_order_id_map);
957        let instrument_id_map = Arc::clone(&self.instrument_id_map);
958        let trader_id_map = Arc::clone(&self.trader_id_map);
959        let strategy_id_map = Arc::clone(&self.strategy_id_map);
960        let next_order_id = Arc::clone(&self.next_order_id);
961        let instrument_provider = Arc::clone(&self.instrument_provider);
962        let exec_sender = get_exec_event_sender();
963        let clock = get_atomic_clock_realtime();
964        let accepted_orders = Arc::clone(&self.accepted_orders);
965
966        let client_clone = client.as_arc().clone();
967
968        let account_id = self.core.account_id;
969
970        let handle = get_runtime().spawn(async move {
971            if let Err(e) = Self::handle_submit_order_async(
972                &cmd,
973                &client_clone,
974                &order_id_map,
975                &venue_order_id_map,
976                &instrument_id_map,
977                &trader_id_map,
978                &strategy_id_map,
979                &next_order_id,
980                &instrument_provider,
981                &exec_sender,
982                clock,
983                account_id,
984                &accepted_orders,
985            )
986            .await
987            {
988                tracing::error!("Error submitting order: {e}");
989            }
990        });
991
992        self.pending_tasks
993            .lock()
994            .map_err(|_| anyhow::anyhow!("Failed to lock pending tasks"))?
995            .push(handle);
996
997        Ok(())
998    }
999
1000    async fn connect(&mut self) -> anyhow::Result<()> {
1001        if self.is_connected.load(Ordering::Relaxed) {
1002            log::debug!("Interactive Brokers execution client already connected");
1003            return Ok(());
1004        }
1005
1006        tracing::info!("Connecting Interactive Brokers execution client...");
1007        log::debug!(
1008            "Execution client config host={} port={} client_id={} account_id={:?} request_timeout={} connection_timeout={} fetch_all_open_orders={} track_option_exercise_from_position_update={}",
1009            self.config.host,
1010            self.config.port,
1011            self.config.client_id,
1012            self.config.account_id,
1013            self.config.request_timeout,
1014            self.config.connection_timeout,
1015            self.config.fetch_all_open_orders,
1016            self.config.track_option_exercise_from_position_update
1017        );
1018
1019        let handle = crate::common::shared_client::get_or_connect(
1020            &self.config.host,
1021            self.config.port,
1022            self.config.client_id,
1023            self.config.connection_timeout,
1024        )
1025        .await
1026        .context("Failed to connect to IB Gateway/TWS")?;
1027
1028        tracing::info!(
1029            "Connected to IB Gateway/TWS at {}:{} (client_id: {})",
1030            self.config.host,
1031            self.config.port,
1032            self.config.client_id
1033        );
1034
1035        self.ib_client = Some(handle);
1036
1037        // Initialize provider and load instruments from cache if configured
1038        log::debug!("Initializing IB execution instrument provider");
1039        if let Err(e) = self.instrument_provider.initialize().await {
1040            tracing::warn!("Failed to initialize instrument provider: {}", e);
1041        }
1042
1043        // Load instruments from config
1044        log::debug!("Loading configured IB execution instruments");
1045
1046        if let Err(e) = self
1047            .instrument_provider
1048            .load_all_async(
1049                self.ib_client.as_ref().unwrap().as_arc().as_ref(),
1050                None,
1051                None,
1052                false,
1053            )
1054            .await
1055        {
1056            tracing::warn!("Failed to load instruments on startup: {}", e);
1057        }
1058
1059        let client = self.ib_client.as_ref().unwrap().as_arc();
1060        log::debug!("Preloading cached spread instruments for execution client");
1061        self.preload_cached_spread_instruments(client.as_ref())
1062            .await?;
1063
1064        // Get initial next order ID (uses self.ib_client internally)
1065        log::debug!("Requesting next valid IB order ID");
1066        let next_id = self.get_next_order_id().await?;
1067        log::debug!("Requesting highest open IB order ID");
1068        let highest_open_order_id = self.get_highest_open_order_id(client.as_ref()).await?;
1069        let starting_order_id = highest_open_order_id
1070            .map(|order_id| next_id.max(order_id.saturating_add(1)))
1071            .unwrap_or(next_id);
1072        if starting_order_id != next_id {
1073            tracing::info!(
1074                "Adjusted next Interactive Brokers order ID from {} to {} based on existing open orders",
1075                next_id,
1076                starting_order_id
1077            );
1078        } else {
1079            tracing::info!(
1080                "Initialized next Interactive Brokers order ID to {}",
1081                starting_order_id
1082            );
1083        }
1084        {
1085            let mut id = self
1086                .next_order_id
1087                .lock()
1088                .map_err(|_| anyhow::anyhow!("Failed to lock next order ID"))?;
1089            *id = starting_order_id;
1090        }
1091
1092        // Start order update subscription (uses self.ib_client internally)
1093        log::debug!("Starting IB order update stream");
1094        self.start_order_updates().await?;
1095
1096        // Subscribe to account summary and generate initial account state
1097        // Wait for initial account summary to load before proceeding
1098        let client_for_account = Arc::clone(client);
1099        let account_id = self.core.account_id;
1100        let _exec_client_core = self.core.clone(); // Clone core to generate account state
1101        log::debug!("Subscribing to IB account summary for {}", account_id);
1102        match crate::execution::account::subscribe_account_summary(&client_for_account, account_id)
1103            .await
1104        {
1105            Ok((balances, margins)) => {
1106                tracing::info!(
1107                    "Received account summary: {} balances, {} margins",
1108                    balances.len(),
1109                    margins.len()
1110                );
1111                // Generate account state event like Python version
1112                let ts_event = get_atomic_clock_realtime().get_time_ns();
1113
1114                if let Err(e) = ExecutionClient::generate_account_state(
1115                    self, balances, margins, true, // reported
1116                    ts_event,
1117                ) {
1118                    tracing::warn!("Failed to generate account state: {}", e);
1119                }
1120            }
1121            Err(e) => {
1122                tracing::warn!("Failed to subscribe to account summary: {}", e);
1123            }
1124        }
1125
1126        // Initialize position tracking with existing positions
1127        // This avoids processing duplicates from execDetails
1128        let client_for_positions_init = Arc::clone(client);
1129        let position_tracker_init = Arc::clone(&self.position_tracker);
1130
1131        log::debug!("Initializing IB execution position tracking");
1132        if let Err(e) = crate::execution::account::initialize_position_tracking(
1133            &client_for_positions_init,
1134            self.core.account_id,
1135            position_tracker_init,
1136        )
1137        .await
1138        {
1139            tracing::warn!("Failed to initialize position tracking: {}", e);
1140        }
1141
1142        // Subscribe to PnL updates
1143        let client_for_pnl = Arc::clone(client); // Clone Arc
1144
1145        log::debug!("Subscribing to IB PnL updates");
1146
1147        if let Err(e) =
1148            crate::execution::account::subscribe_pnl(&client_for_pnl, self.core.account_id).await
1149        {
1150            tracing::warn!("Failed to subscribe to PnL: {}", e);
1151        }
1152
1153        // Subscribe to position updates for option exercise tracking if enabled
1154        if self.config.track_option_exercise_from_position_update {
1155            let client_for_positions = Arc::clone(client);
1156            let position_tracker_clone = Arc::clone(&self.position_tracker);
1157            let instrument_provider_clone = Arc::clone(&self.instrument_provider);
1158
1159            log::debug!("Subscribing to IB position updates for option exercise tracking");
1160
1161            if let Err(e) = crate::execution::account::subscribe_positions(
1162                &client_for_positions,
1163                self.core.account_id,
1164                position_tracker_clone,
1165                instrument_provider_clone,
1166            )
1167            .await
1168            {
1169                tracing::warn!("Failed to subscribe to positions: {}", e);
1170            }
1171        }
1172
1173        self.is_connected.store(true, Ordering::Relaxed);
1174        self.core.set_connected();
1175
1176        tracing::info!("Connected Interactive Brokers execution client");
1177        Ok(())
1178    }
1179
1180    async fn disconnect(&mut self) -> anyhow::Result<()> {
1181        if !self.is_connected.load(Ordering::Relaxed) {
1182            log::debug!("Interactive Brokers execution client already disconnected");
1183            return Ok(());
1184        }
1185
1186        tracing::info!("Disconnecting Interactive Brokers execution client...");
1187
1188        // Abort pending tasks
1189        self.abort_pending_tasks();
1190
1191        // Disconnect IB client if connected
1192        // The rust-ibapi Client doesn't have an explicit disconnect method
1193        // Connection will be closed when the Arc is dropped
1194        if self.ib_client.is_some() {
1195            tracing::debug!("Dropping IB client connection");
1196        }
1197
1198        self.ib_client = None;
1199        self.is_connected.store(false, Ordering::Relaxed);
1200        self.core.set_disconnected();
1201
1202        tracing::info!("Disconnected Interactive Brokers execution client");
1203        Ok(())
1204    }
1205
1206    async fn generate_order_status_report(
1207        &self,
1208        cmd: &GenerateOrderStatusReport,
1209    ) -> anyhow::Result<Option<OrderStatusReport>> {
1210        let plural_cmd = GenerateOrderStatusReports {
1211            command_id: cmd.command_id,
1212            ts_init: cmd.ts_init,
1213            open_only: false,
1214            instrument_id: cmd.instrument_id,
1215            start: None,
1216            end: None,
1217            params: cmd.params.clone(),
1218            log_receipt_level: LogLevel::Info,
1219            correlation_id: cmd.correlation_id,
1220        };
1221
1222        let reports = self.generate_order_status_reports(&plural_cmd).await?;
1223
1224        // Filter by client_order_id and venue_order_id
1225        let report = reports.into_iter().find(|r| {
1226            let matches_client = if let Some(filter_client_id) = cmd.client_order_id {
1227                r.client_order_id == Some(filter_client_id)
1228            } else {
1229                true
1230            };
1231            let matches_venue = if let Some(filter_venue_id) = cmd.venue_order_id {
1232                r.venue_order_id == filter_venue_id
1233            } else {
1234                true
1235            };
1236            matches_client && matches_venue
1237        });
1238
1239        Ok(report)
1240    }
1241
1242    async fn generate_order_status_reports(
1243        &self,
1244        cmd: &GenerateOrderStatusReports,
1245    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1246        let client = self.ib_client.as_ref().context("IB client not connected")?;
1247
1248        let timeout_dur = Duration::from_secs(self.config.request_timeout);
1249        let mut subscription = tokio::time::timeout(timeout_dur, client.all_open_orders())
1250            .await
1251            .context("Timeout requesting open orders")??;
1252        let mut reports = Vec::new();
1253        let ts_init = get_atomic_clock_realtime().get_time_ns();
1254
1255        while let Some(order_result) = subscription.next().await {
1256            match order_result {
1257                Ok(Orders::OrderData(data)) => {
1258                    // Convert IB contract to instrument ID
1259                    let instrument_id = ib_contract_to_instrument_id_simple(&data.contract)
1260                        .context("Failed to convert contract to instrument ID")?;
1261
1262                    // Filter by instrument_id if specified
1263                    if let Some(filter_id) = cmd.instrument_id {
1264                        if instrument_id != filter_id {
1265                            continue;
1266                        }
1267                    }
1268
1269                    // Parse to order status report using minimal OrderStatus
1270                    // Note: OrderState doesn't have filled/average_fill_price, so we use defaults
1271                    match parse_order_status_to_report(
1272                        &IBOrderStatus {
1273                            order_id: data.order_id,
1274                            status: data.order_state.status.clone(),
1275                            filled: 0.0,             // Not available in OrderState
1276                            remaining: 0.0,          // Not available in OrderState
1277                            average_fill_price: 0.0, // Not available in OrderState
1278                            perm_id: data.order.perm_id,
1279                            parent_id: 0,         // Not available in OrderState
1280                            last_fill_price: 0.0, // Not available in OrderState
1281                            client_id: data.order.client_id,
1282                            why_held: String::new(), // Not available in OrderState
1283                            market_cap_price: 0.0,   // Not available in OrderState
1284                        },
1285                        Some(&data.order),
1286                        instrument_id,
1287                        self.core.account_id,
1288                        &self.instrument_provider,
1289                        ts_init,
1290                    ) {
1291                        Ok(report) => reports.push(report),
1292                        Err(e) => {
1293                            tracing::warn!("Failed to parse order status report: {e}");
1294                        }
1295                    }
1296                }
1297                Ok(_) => {
1298                    // Ignore other order types
1299                }
1300                Err(e) => {
1301                    tracing::warn!("Error receiving order data: {e}");
1302                }
1303            }
1304        }
1305
1306        Ok(reports)
1307    }
1308
1309    async fn generate_fill_reports(
1310        &self,
1311        cmd: GenerateFillReports,
1312    ) -> anyhow::Result<Vec<FillReport>> {
1313        let client = self.ib_client.as_ref().context("IB client not connected")?;
1314
1315        // Get account code from account ID
1316        let account_code = self.core.account_id.to_string();
1317
1318        // Build time filter from start/end if provided
1319        let time_filter = if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
1320            // Format: YYYYMMDD
1321            // Convert UnixNanos to DateTime<Utc> then format
1322            let start_dt = start.to_datetime_utc();
1323            let end_dt = end.to_datetime_utc();
1324            format!("{} {}", start_dt.format("%Y%m%d"), end_dt.format("%Y%m%d"))
1325        } else {
1326            String::new()
1327        };
1328
1329        let filter = ExecutionFilter {
1330            client_id: None,
1331            account_code,
1332            time: time_filter,
1333            symbol: String::new(),
1334            security_type: String::new(),
1335            exchange: String::new(),
1336            side: String::new(),
1337            last_n_days: 0,
1338            specific_dates: Vec::new(),
1339        };
1340
1341        let timeout_dur = Duration::from_secs(self.config.request_timeout);
1342        let mut subscription = tokio::time::timeout(timeout_dur, client.executions(filter))
1343            .await
1344            .context("Timeout requesting executions")??;
1345        let mut reports = Vec::new();
1346        let ts_init = get_atomic_clock_realtime().get_time_ns();
1347        let mut current_exec_data: Option<ExecutionData> = None;
1348
1349        while let Some(exec_result) = subscription.next().await {
1350            match exec_result {
1351                Ok(Executions::ExecutionData(exec_data)) => {
1352                    current_exec_data = Some(exec_data);
1353                }
1354                Ok(Executions::CommissionReport(commission)) => {
1355                    if let Some(exec_data) = current_exec_data.take() {
1356                        // Convert IB contract to instrument ID
1357                        let instrument_id =
1358                            ib_contract_to_instrument_id_simple(&exec_data.contract)
1359                                .context("Failed to convert contract to instrument ID")?;
1360
1361                        // Filter by instrument_id if specified
1362                        if let Some(filter_id) = cmd.instrument_id
1363                            && instrument_id != filter_id
1364                        {
1365                            continue;
1366                        }
1367
1368                        // Parse to fill report
1369                        match parse_execution_to_fill_report(
1370                            &exec_data.execution,
1371                            &exec_data.contract,
1372                            commission.commission,
1373                            &commission.currency,
1374                            instrument_id,
1375                            self.core.account_id,
1376                            &self.instrument_provider,
1377                            ts_init,
1378                            None, // avg_px (not available in historical fills)
1379                        ) {
1380                            Ok(report) => reports.push(report),
1381                            Err(e) => {
1382                                tracing::warn!("Failed to parse fill report: {e}");
1383                            }
1384                        }
1385                    }
1386                }
1387                Ok(_) => {
1388                    // Ignore other message types (Notice, etc.)
1389                }
1390                Err(e) => {
1391                    tracing::warn!("Error receiving execution data: {e}");
1392                }
1393            }
1394        }
1395
1396        Ok(reports)
1397    }
1398
1399    async fn generate_position_status_reports(
1400        &self,
1401        cmd: &GeneratePositionStatusReports,
1402    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1403        let client = self.ib_client.as_ref().context("IB client not connected")?;
1404
1405        let timeout_dur = Duration::from_secs(self.config.request_timeout);
1406        let mut subscription = tokio::time::timeout(timeout_dur, client.positions())
1407            .await
1408            .context("Timeout requesting positions")??;
1409        let mut reports = Vec::new();
1410        let ts_init = get_atomic_clock_realtime().get_time_ns();
1411
1412        // Process positions until PositionEnd; return empty list when none (reconciliation parity:
1413        // never return None/missing for "no positions").
1414        while let Some(position_result) = subscription.next().await {
1415            match position_result {
1416                Ok(PositionUpdate::Position(position)) => {
1417                    // Filter for the specific account
1418                    if position.account != self.core.account_id.to_string() {
1419                        continue;
1420                    }
1421
1422                    // Convert IB contract to instrument ID
1423                    let instrument_id = ib_contract_to_instrument_id_simple(&position.contract)
1424                        .context("Failed to convert contract to instrument ID")?;
1425
1426                    // Filter by instrument_id if specified
1427                    if let Some(filter_id) = cmd.instrument_id
1428                        && instrument_id != filter_id
1429                    {
1430                        continue;
1431                    }
1432
1433                    // Get instrument for precision
1434                    let instrument = self
1435                        .instrument_provider
1436                        .find(&instrument_id)
1437                        .context("Instrument not found")?;
1438
1439                    // Determine position side
1440                    let position_side = if position.position == 0.0 {
1441                        PositionSideSpecified::Flat
1442                    } else if position.position > 0.0 {
1443                        PositionSideSpecified::Long
1444                    } else {
1445                        PositionSideSpecified::Short
1446                    };
1447
1448                    let quantity =
1449                        Quantity::new(position.position.abs(), instrument.size_precision());
1450
1451                    // Convert IB avg_cost to Nautilus Price, accounting for price magnifier and multiplier
1452                    // Python: converted_avg_cost = avg_cost / (multiplier * price_magnifier)
1453                    let avg_px_open = if position.average_cost > 0.0 {
1454                        let price_magnifier =
1455                            self.instrument_provider.get_price_magnifier(&instrument_id) as f64;
1456                        let multiplier = instrument.multiplier().as_f64();
1457                        let converted_avg_cost =
1458                            position.average_cost / (multiplier * price_magnifier);
1459                        let price_precision = instrument.price_precision();
1460                        Some(
1461                            rust_decimal::Decimal::from_f64_retain(converted_avg_cost)
1462                                .and_then(|d| {
1463                                    // Round to price precision
1464                                    let rounded = d.round_dp(price_precision as u32);
1465                                    Some(rounded)
1466                                })
1467                                .unwrap_or_default(),
1468                        )
1469                    } else {
1470                        None
1471                    };
1472
1473                    let report = PositionStatusReport::new(
1474                        self.core.account_id,
1475                        instrument_id,
1476                        position_side,
1477                        quantity,
1478                        ts_init, // ts_last
1479                        ts_init, // ts_init
1480                        None,    // report_id: auto-generated
1481                        None,    // venue_position_id
1482                        avg_px_open,
1483                    );
1484
1485                    reports.push(report);
1486                }
1487                Ok(PositionUpdate::PositionEnd) => {
1488                    // End of position list
1489                    break;
1490                }
1491                Err(e) => {
1492                    tracing::warn!("Error receiving position data: {e}");
1493                }
1494            }
1495        }
1496
1497        Ok(reports)
1498    }
1499
1500    async fn generate_mass_status(
1501        &self,
1502        lookback_mins: Option<u64>,
1503    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1504        let ts_now = get_atomic_clock_realtime().get_time_ns();
1505        let start = lookback_mins.map(|mins| {
1506            let lookback_ns = mins * 60 * 1_000_000_000;
1507            UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
1508        });
1509
1510        let order_cmd = GenerateOrderStatusReportsBuilder::default()
1511            .ts_init(ts_now)
1512            .open_only(false)
1513            .start(start)
1514            .build()
1515            .map_err(|e| anyhow::anyhow!("{e}"))?;
1516
1517        let fill_cmd = GenerateFillReportsBuilder::default()
1518            .ts_init(ts_now)
1519            .start(start)
1520            .build()
1521            .map_err(|e| anyhow::anyhow!("{e}"))?;
1522
1523        let position_cmd = GeneratePositionStatusReportsBuilder::default()
1524            .ts_init(ts_now)
1525            .start(start)
1526            .build()
1527            .map_err(|e| anyhow::anyhow!("{e}"))?;
1528
1529        let (order_reports, fill_reports, position_reports) = tokio::try_join!(
1530            self.generate_order_status_reports(&order_cmd),
1531            self.generate_fill_reports(fill_cmd),
1532            self.generate_position_status_reports(&position_cmd),
1533        )?;
1534
1535        tracing::info!(
1536            "generate_mass_status: {} order reports, {} fill reports, {} position reports",
1537            order_reports.len(),
1538            fill_reports.len(),
1539            position_reports.len()
1540        );
1541
1542        let mut mass_status = ExecutionMassStatus::new(
1543            self.core.client_id,
1544            self.core.account_id,
1545            self.core.venue,
1546            ts_now,
1547            Some(UUID4::new()),
1548        );
1549
1550        mass_status.add_order_reports(order_reports);
1551        mass_status.add_fill_reports(fill_reports);
1552        mass_status.add_position_reports(position_reports);
1553
1554        Ok(Some(mass_status))
1555    }
1556
1557    fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
1558        let client = self.ib_client.as_ref().context("IB client not connected")?;
1559
1560        let client_clone = client.as_arc().clone();
1561        let account_id = self.core.account_id;
1562        let account_type = self.core.account_type;
1563        let base_currency = self.core.base_currency;
1564        let clock = get_atomic_clock_realtime();
1565        let request_timeout_secs = self.config.request_timeout;
1566
1567        let handle = get_runtime().spawn(async move {
1568            let timeout_dur = Duration::from_secs(request_timeout_secs);
1569            let result = tokio::time::timeout(
1570                timeout_dur,
1571                crate::execution::account::subscribe_account_summary(&client_clone, account_id),
1572            )
1573            .await;
1574
1575            match result {
1576                Ok(Ok((balances, margins))) => {
1577                    let ts_event = clock.get_time_ns();
1578                    let ts_now = clock.get_time_ns();
1579
1580                    let account_state = AccountState::new(
1581                        account_id,
1582                        account_type,
1583                        balances,
1584                        margins,
1585                        true,
1586                        UUID4::new(),
1587                        ts_event,
1588                        ts_now,
1589                        base_currency,
1590                    );
1591
1592                    let endpoint = MessagingSwitchboard::portfolio_update_account();
1593                    send_account_state(endpoint, &account_state);
1594                }
1595                Ok(Err(e)) => {
1596                    tracing::error!("Failed to query account state: {e}");
1597                }
1598                Err(_) => {
1599                    tracing::error!("Timeout waiting for account summary");
1600                }
1601            }
1602        });
1603
1604        self.pending_tasks
1605            .lock()
1606            .map_err(|_| anyhow::anyhow!("Failed to lock pending tasks"))?
1607            .push(handle);
1608
1609        Ok(())
1610    }
1611
1612    fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
1613        let client = self.ib_client.as_ref().context("IB client not connected")?;
1614        let client_order_id = cmd.client_order_id;
1615        let trader_id = cmd.trader_id;
1616        let strategy_id = cmd.strategy_id;
1617        let instrument_id = cmd.instrument_id;
1618
1619        let target_ib_order_id: i32 = if let Some(venue_order_id) = &cmd.venue_order_id {
1620            venue_order_id
1621                .as_str()
1622                .parse()
1623                .context("Failed to parse venue_order_id as IB order id")?
1624        } else {
1625            let map = self
1626                .order_id_map
1627                .lock()
1628                .map_err(|_| anyhow::anyhow!("Failed to lock order_id_map"))?;
1629            *map.get(&cmd.client_order_id)
1630                .context("No venue order id for client_order_id")?
1631        };
1632
1633        let client_clone = client.as_arc().clone();
1634        let instrument_id_map = Arc::clone(&self.instrument_id_map);
1635        let instrument_provider = Arc::clone(&self.instrument_provider);
1636        let account_id = self.core.account_id;
1637        let exec_sender = get_exec_event_sender();
1638        let ts_init = get_atomic_clock_realtime().get_time_ns();
1639        let request_timeout_secs = self.config.request_timeout;
1640        let pending_cancel_orders = Arc::clone(&self.pending_cancel_orders);
1641
1642        let handle = get_runtime().spawn(async move {
1643            let timeout_dur = Duration::from_secs(request_timeout_secs);
1644            let mut subscription =
1645                match tokio::time::timeout(timeout_dur, client_clone.all_open_orders()).await {
1646                    Ok(Ok(s)) => s,
1647                    Ok(Err(e)) => {
1648                        tracing::error!("query_order: failed to request open orders: {e}");
1649                        return;
1650                    }
1651                    Err(_) => {
1652                        tracing::error!("query_order: timeout requesting open orders");
1653                        return;
1654                    }
1655                };
1656
1657            while let Some(order_result) = subscription.next().await {
1658                if let Ok(Orders::OrderData(data)) = order_result {
1659                    if data.order_id != target_ib_order_id {
1660                        continue;
1661                    }
1662
1663                    let instrument_id = match instrument_id_map.lock() {
1664                        Ok(map) => map.get(&data.order_id).copied(),
1665                        Err(_) => None,
1666                    };
1667                    let instrument_id = match instrument_id {
1668                        Some(id) => id,
1669                        None => match ib_contract_to_instrument_id_simple(&data.contract) {
1670                            Ok(id) => id,
1671                            Err(e) => {
1672                                tracing::warn!("query_order: failed to convert contract: {e}");
1673                                return;
1674                            }
1675                        },
1676                    };
1677
1678                    let report = match parse_order_status_to_report(
1679                        &IBOrderStatus {
1680                            order_id: data.order_id,
1681                            status: data.order_state.status.clone(),
1682                            filled: 0.0,
1683                            remaining: 0.0,
1684                            average_fill_price: 0.0,
1685                            perm_id: data.order.perm_id,
1686                            parent_id: 0,
1687                            last_fill_price: 0.0,
1688                            client_id: data.order.client_id,
1689                            why_held: String::new(),
1690                            market_cap_price: 0.0,
1691                        },
1692                        Some(&data.order),
1693                        instrument_id,
1694                        account_id,
1695                        &instrument_provider,
1696                        ts_init,
1697                    ) {
1698                        Ok(r) => r,
1699                        Err(e) => {
1700                            tracing::warn!("query_order: failed to parse order status: {e}");
1701                            return;
1702                        }
1703                    };
1704
1705                    if exec_sender
1706                        .send(ExecutionEvent::Report(ExecutionReport::Order(Box::new(
1707                            report,
1708                        ))))
1709                        .is_err()
1710                    {
1711                        tracing::error!("query_order: failed to send order status report");
1712                    }
1713                    return;
1714                }
1715            }
1716
1717            let was_pending_cancel = pending_cancel_orders
1718                .lock()
1719                .map(|mut pending| pending.remove(&client_order_id))
1720                .unwrap_or(false);
1721
1722            if was_pending_cancel {
1723                let event = OrderCanceled::new(
1724                    trader_id,
1725                    strategy_id,
1726                    instrument_id,
1727                    client_order_id,
1728                    UUID4::new(),
1729                    ts_init,
1730                    ts_init,
1731                    false,
1732                    Some(VenueOrderId::from(target_ib_order_id.to_string())),
1733                    Some(account_id),
1734                );
1735
1736                if exec_sender
1737                    .send(ExecutionEvent::Order(OrderEventAny::Canceled(event)))
1738                    .is_err()
1739                {
1740                    tracing::error!("query_order: failed to send inferred order canceled event");
1741                } else {
1742                    tracing::info!(
1743                        "query_order: inferred cancel for {} from missing open order {}",
1744                        client_order_id,
1745                        target_ib_order_id
1746                    );
1747                }
1748                return;
1749            }
1750
1751            tracing::debug!(
1752                "query_order: order {} not found in open orders (may be filled or canceled)",
1753                target_ib_order_id
1754            );
1755        });
1756
1757        self.pending_tasks
1758            .lock()
1759            .map_err(|_| anyhow::anyhow!("Failed to lock pending tasks"))?
1760            .push(handle);
1761
1762        Ok(())
1763    }
1764
1765    fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
1766        let client = self.ib_client.as_ref().context("IB client not connected")?;
1767
1768        let orders = self.core.get_orders_for_list(&cmd.order_list)?;
1769
1770        let order_id_map = Arc::clone(&self.order_id_map);
1771        let venue_order_id_map = Arc::clone(&self.venue_order_id_map);
1772        let instrument_id_map = Arc::clone(&self.instrument_id_map);
1773        let trader_id_map = Arc::clone(&self.trader_id_map);
1774        let strategy_id_map = Arc::clone(&self.strategy_id_map);
1775        let next_order_id = Arc::clone(&self.next_order_id);
1776        let instrument_provider = Arc::clone(&self.instrument_provider);
1777        let exec_sender = get_exec_event_sender();
1778        let clock = get_atomic_clock_realtime();
1779        let account_id = self.core.account_id;
1780        let strategy_id = cmd.strategy_id;
1781        let accepted_orders = Arc::clone(&self.accepted_orders);
1782        let client_clone = client.as_arc().clone();
1783
1784        let handle = get_runtime().spawn(async move {
1785            if let Err(e) = Self::handle_submit_order_list_async(
1786                &cmd,
1787                &orders,
1788                &client_clone,
1789                &order_id_map,
1790                &venue_order_id_map,
1791                &instrument_id_map,
1792                &trader_id_map,
1793                &strategy_id_map,
1794                &next_order_id,
1795                &instrument_provider,
1796                &exec_sender,
1797                clock,
1798                account_id,
1799                strategy_id,
1800                &accepted_orders,
1801            )
1802            .await
1803            {
1804                tracing::error!("Error submitting order list: {e}");
1805            }
1806        });
1807
1808        self.pending_tasks
1809            .lock()
1810            .map_err(|_| anyhow::anyhow!("Failed to lock pending tasks"))?
1811            .push(handle);
1812
1813        Ok(())
1814    }
1815
1816    fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
1817        let client = self.ib_client.as_ref().context("IB client not connected")?;
1818
1819        // Get order from cache before spawning async task (Rc doesn't work across async boundaries)
1820        let original_order = {
1821            let cache = self.core.cache();
1822            cache
1823                .order(&cmd.client_order_id)
1824                .cloned()
1825                .context("Order not found in cache")?
1826        };
1827
1828        let order_id_map = Arc::clone(&self.order_id_map);
1829        let venue_order_id_map = Arc::clone(&self.venue_order_id_map);
1830        let instrument_provider = Arc::clone(&self.instrument_provider);
1831        let exec_sender = get_exec_event_sender();
1832        let clock = get_atomic_clock_realtime();
1833        let account_id = self.core.account_id;
1834        let client_clone = client.as_arc().clone();
1835        let original_order = Arc::new(original_order);
1836
1837        let handle = get_runtime().spawn(async move {
1838            if let Err(e) = Self::handle_modify_order_async(
1839                &cmd,
1840                &client_clone,
1841                &order_id_map,
1842                &venue_order_id_map,
1843                &instrument_provider,
1844                &exec_sender,
1845                clock,
1846                account_id,
1847                &original_order,
1848            )
1849            .await
1850            {
1851                tracing::error!("Error modifying order: {e}");
1852            }
1853        });
1854
1855        self.pending_tasks
1856            .lock()
1857            .map_err(|_| anyhow::anyhow!("Failed to lock pending tasks"))?
1858            .push(handle);
1859
1860        Ok(())
1861    }
1862
1863    fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
1864        let client = self.ib_client.as_ref().context("IB client not connected")?;
1865
1866        let order_id_map = Arc::clone(&self.order_id_map);
1867        let instrument_id_map = Arc::clone(&self.instrument_id_map);
1868        let trader_id_map = Arc::clone(&self.trader_id_map);
1869        let strategy_id_map = Arc::clone(&self.strategy_id_map);
1870        let pending_cancel_orders = Arc::clone(&self.pending_cancel_orders);
1871        let exec_sender = get_exec_event_sender();
1872        let clock = get_atomic_clock_realtime();
1873        let account_id = self.core.account_id;
1874        let client_clone = client.as_arc().clone();
1875
1876        let handle = get_runtime().spawn(async move {
1877            if let Err(e) = Self::handle_cancel_order_async(
1878                &cmd,
1879                &client_clone,
1880                &order_id_map,
1881                &instrument_id_map,
1882                &trader_id_map,
1883                &strategy_id_map,
1884                &pending_cancel_orders,
1885                &exec_sender,
1886                clock.get_time_ns(),
1887                account_id,
1888            )
1889            .await
1890            {
1891                tracing::error!("Error canceling order: {e}");
1892            }
1893        });
1894
1895        self.pending_tasks
1896            .lock()
1897            .map_err(|_| anyhow::anyhow!("Failed to lock pending tasks"))?
1898            .push(handle);
1899
1900        Ok(())
1901    }
1902
1903    fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1904        let client = self.ib_client.as_ref().context("IB client not connected")?;
1905
1906        // Warn if order_side is specified (IB doesn't support side filtering)
1907        if cmd.order_side != OrderSide::NoOrderSide {
1908            tracing::warn!(
1909                "Interactive Brokers does not support order_side filtering for cancel all orders; \
1910                ignoring order_side={:?} and canceling all orders",
1911                cmd.order_side
1912            );
1913        }
1914
1915        // Get open orders from cache before spawning async task (Rc doesn't work across async boundaries)
1916        // Note: In Rust, instrument_id is always required, so we always filter by it
1917        let orders_to_cancel: Vec<(ClientOrderId, Option<VenueOrderId>)> = {
1918            let cache = self.core.cache();
1919            let mut orders_to_cancel: Vec<(ClientOrderId, Option<VenueOrderId>)> = cache
1920                .orders_open(
1921                    None,                     // venue
1922                    Some(&cmd.instrument_id), // instrument_id (always filter by it in Rust)
1923                    None,                     // strategy_id
1924                    None,                     // account_id
1925                    None,                     // side (IB doesn't support side filtering)
1926                )
1927                .iter()
1928                .map(|order| (order.client_order_id(), order.venue_order_id()))
1929                .collect();
1930
1931            if orders_to_cancel.is_empty() {
1932                let instrument_id_map = self
1933                    .instrument_id_map
1934                    .lock()
1935                    .map_err(|_| anyhow::anyhow!("Failed to lock instrument ID map"))?;
1936
1937                let venue_map = self
1938                    .venue_order_id_map
1939                    .lock()
1940                    .map_err(|_| anyhow::anyhow!("Failed to lock venue order ID map"))?;
1941
1942                orders_to_cancel.extend(instrument_id_map.iter().filter_map(
1943                    |(order_id, instrument_id)| {
1944                        (*instrument_id == cmd.instrument_id)
1945                            .then_some(*order_id)
1946                            .and_then(|ib_order_id| {
1947                                venue_map.get(&ib_order_id).copied().map(|client_order_id| {
1948                                    (
1949                                        client_order_id,
1950                                        Some(VenueOrderId::from(ib_order_id.to_string())),
1951                                    )
1952                                })
1953                            })
1954                    },
1955                ));
1956            }
1957
1958            orders_to_cancel.sort_by_key(|(client_order_id, _)| client_order_id.to_string());
1959            orders_to_cancel.dedup_by_key(|(client_order_id, _)| *client_order_id);
1960            orders_to_cancel
1961        };
1962
1963        if orders_to_cancel.is_empty() {
1964            tracing::info!("No open orders to cancel");
1965            return Ok(());
1966        }
1967
1968        tracing::info!(
1969            "Canceling {} open order(s) for instrument {}",
1970            orders_to_cancel.len(),
1971            cmd.instrument_id
1972        );
1973
1974        let client_clone = client.as_arc().clone();
1975        let order_id_map = Arc::clone(&self.order_id_map);
1976        let instrument_id_map = Arc::clone(&self.instrument_id_map);
1977        let trader_id_map = Arc::clone(&self.trader_id_map);
1978        let strategy_id_map = Arc::clone(&self.strategy_id_map);
1979        let pending_cancel_orders = Arc::clone(&self.pending_cancel_orders);
1980        let exec_sender = get_exec_event_sender();
1981        let clock = get_atomic_clock_realtime();
1982        let account_id = self.core.account_id;
1983
1984        let handle = get_runtime().spawn(async move {
1985            if let Err(e) = Self::handle_cancel_all_orders_async(
1986                &client_clone,
1987                &order_id_map,
1988                &instrument_id_map,
1989                &trader_id_map,
1990                &strategy_id_map,
1991                &pending_cancel_orders,
1992                &exec_sender,
1993                clock.get_time_ns(),
1994                account_id,
1995                orders_to_cancel,
1996            )
1997            .await
1998            {
1999                tracing::error!("Error canceling all orders: {e}");
2000            }
2001        });
2002
2003        self.pending_tasks
2004            .lock()
2005            .map_err(|_| anyhow::anyhow!("Failed to lock pending tasks"))?
2006            .push(handle);
2007
2008        Ok(())
2009    }
2010
2011    fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
2012        // Cancel each order in the batch
2013        for cancel_cmd in cmd.cancels {
2014            self.cancel_order(cancel_cmd)?;
2015        }
2016        Ok(())
2017    }
2018}
2019
2020#[allow(dead_code)]
2021impl InteractiveBrokersExecutionClient {
2022    /// Handles cancel all orders asynchronously.
2023    ///
2024    /// # Errors
2025    ///
2026    /// Returns an error if the global cancel request fails.
2027    async fn handle_cancel_order_async(
2028        cmd: &CancelOrder,
2029        client: &Arc<Client>,
2030        order_id_map: &Arc<Mutex<AHashMap<ClientOrderId, i32>>>,
2031        instrument_id_map: &Arc<Mutex<AHashMap<i32, InstrumentId>>>,
2032        trader_id_map: &Arc<Mutex<AHashMap<i32, TraderId>>>,
2033        strategy_id_map: &Arc<Mutex<AHashMap<i32, StrategyId>>>,
2034        pending_cancel_orders: &Arc<Mutex<ahash::AHashSet<ClientOrderId>>>,
2035        exec_sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
2036        ts_init: UnixNanos,
2037        account_id: AccountId,
2038    ) -> anyhow::Result<()> {
2039        let ib_order_id = if let Some(venue_order_id) = &cmd.venue_order_id {
2040            // Use venue order ID directly if available
2041            venue_order_id
2042                .as_str()
2043                .parse::<i32>()
2044                .map_err(|e| anyhow::anyhow!("Failed to parse venue order ID: {e}"))?
2045        } else {
2046            // Otherwise look it up from mapping
2047            let map = order_id_map
2048                .lock()
2049                .map_err(|_| anyhow::anyhow!("Failed to lock order ID map"))?;
2050            *map.get(&cmd.client_order_id)
2051                .context("No IB order ID mapping found for client order ID")?
2052        };
2053
2054        client
2055            .cancel_order(ib_order_id, "")
2056            .await
2057            .context("Failed to cancel order with IB")?;
2058
2059        Self::emit_order_pending_cancel(
2060            ib_order_id,
2061            cmd.client_order_id,
2062            instrument_id_map,
2063            trader_id_map,
2064            strategy_id_map,
2065            pending_cancel_orders,
2066            exec_sender,
2067            ts_init,
2068            account_id,
2069        )?;
2070
2071        Ok(())
2072    }
2073
2074    async fn handle_cancel_all_orders_async(
2075        client: &Arc<Client>,
2076        order_id_map: &Arc<Mutex<AHashMap<ClientOrderId, i32>>>,
2077        instrument_id_map: &Arc<Mutex<AHashMap<i32, InstrumentId>>>,
2078        trader_id_map: &Arc<Mutex<AHashMap<i32, TraderId>>>,
2079        strategy_id_map: &Arc<Mutex<AHashMap<i32, StrategyId>>>,
2080        pending_cancel_orders: &Arc<Mutex<ahash::AHashSet<ClientOrderId>>>,
2081        exec_sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
2082        ts_init: UnixNanos,
2083        account_id: AccountId,
2084        orders_to_cancel: Vec<(ClientOrderId, Option<VenueOrderId>)>,
2085    ) -> anyhow::Result<()> {
2086        // Get all IB order IDs first, then drop the guard before awaiting
2087        let ib_order_ids: Vec<(ClientOrderId, i32)> = {
2088            let order_id_map_guard = order_id_map
2089                .lock()
2090                .map_err(|_| anyhow::anyhow!("Failed to lock order ID map"))?;
2091
2092            orders_to_cancel
2093                .into_iter()
2094                .filter_map(|(client_order_id, venue_order_id)| {
2095                    if let Some(venue_order_id) = venue_order_id {
2096                        return venue_order_id
2097                            .as_str()
2098                            .parse::<i32>()
2099                            .ok()
2100                            .map(|ib_order_id| (client_order_id, ib_order_id));
2101                    }
2102
2103                    order_id_map_guard
2104                        .get(&client_order_id)
2105                        .copied()
2106                        .map(|ib_order_id| (client_order_id, ib_order_id))
2107                })
2108                .collect()
2109        };
2110
2111        // Now cancel each order (guard is dropped, so we can await)
2112        for (client_order_id, ib_order_id) in ib_order_ids {
2113            if let Err(e) = client.cancel_order(ib_order_id, "").await {
2114                tracing::error!(
2115                    "Failed to cancel order {} (IB order ID: {}): {e}",
2116                    client_order_id,
2117                    ib_order_id
2118                );
2119            } else {
2120                if let Err(e) = Self::emit_order_pending_cancel(
2121                    ib_order_id,
2122                    client_order_id,
2123                    instrument_id_map,
2124                    trader_id_map,
2125                    strategy_id_map,
2126                    pending_cancel_orders,
2127                    exec_sender,
2128                    ts_init,
2129                    account_id,
2130                ) {
2131                    tracing::error!(
2132                        "Failed to emit pending cancel for order {} (IB order ID: {}): {e}",
2133                        client_order_id,
2134                        ib_order_id
2135                    );
2136                }
2137                tracing::debug!(
2138                    "Canceled order {} (IB order ID: {})",
2139                    client_order_id,
2140                    ib_order_id
2141                );
2142            }
2143        }
2144
2145        tracing::info!("Finished canceling all orders");
2146
2147        Ok(())
2148    }
2149
2150    #[allow(clippy::too_many_arguments)]
2151    fn emit_order_pending_cancel(
2152        order_id: i32,
2153        client_order_id: ClientOrderId,
2154        instrument_id_map: &Arc<Mutex<AHashMap<i32, InstrumentId>>>,
2155        trader_id_map: &Arc<Mutex<AHashMap<i32, TraderId>>>,
2156        strategy_id_map: &Arc<Mutex<AHashMap<i32, StrategyId>>>,
2157        pending_cancel_orders: &Arc<Mutex<ahash::AHashSet<ClientOrderId>>>,
2158        exec_sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
2159        ts_init: UnixNanos,
2160        account_id: AccountId,
2161    ) -> anyhow::Result<()> {
2162        let mut pending = pending_cancel_orders
2163            .lock()
2164            .map_err(|_| anyhow::anyhow!("Failed to lock pending cancel orders map"))?;
2165        if !pending.insert(client_order_id) {
2166            return Ok(());
2167        }
2168        drop(pending);
2169
2170        let instrument_id = Self::get_mapped_instrument_id(order_id, instrument_id_map)?
2171            .context("Instrument ID not found for pending cancel order")?;
2172        let (trader_id, strategy_id) =
2173            Self::get_required_order_actor_ids(order_id, trader_id_map, strategy_id_map)?;
2174
2175        let event = OrderPendingCancel::new(
2176            trader_id,
2177            strategy_id,
2178            instrument_id,
2179            client_order_id,
2180            account_id,
2181            UUID4::new(),
2182            ts_init,
2183            ts_init,
2184            false,
2185            Some(VenueOrderId::from(order_id.to_string())),
2186        );
2187
2188        exec_sender
2189            .send(ExecutionEvent::Order(OrderEventAny::PendingCancel(event)))
2190            .map_err(|e| anyhow::anyhow!("Failed to send order pending cancel event: {e}"))?;
2191
2192        Ok(())
2193    }
2194}
2195
2196const MUTEX_POISONED: &str = "Mutex poisoned";