Skip to main content

nautilus_interactive_brokers/data/
core.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Core data client implementation for Interactive Brokers.
17
18#[path = "core_streams.rs"]
19mod streams;
20
21use std::{
22    fmt::Debug,
23    sync::{
24        Arc,
25        atomic::{AtomicBool, Ordering},
26    },
27};
28
29use ahash::AHashMap;
30use anyhow::Context;
31#[cfg(feature = "python")]
32use chrono::{DateTime, Utc};
33use ibapi::{
34    contracts::{Contract, Currency as IBCurrency, Exchange as IBExchange, SecurityType, Symbol},
35    market_data::historical::ToDuration,
36};
37use nautilus_common::{
38    clients::DataClient,
39    live::{get_runtime, runner::get_data_event_sender},
40    messages::{
41        DataEvent, DataResponse,
42        data::{
43            BarsResponse, InstrumentResponse, InstrumentsResponse, QuotesResponse, RequestBars,
44            RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars,
45            SubscribeBookDeltas, SubscribeIndexPrices, SubscribeOptionGreeks, SubscribeQuotes,
46            SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
47            UnsubscribeIndexPrices, UnsubscribeOptionGreeks, UnsubscribeQuotes, UnsubscribeTrades,
48        },
49    },
50};
51#[cfg(feature = "python")]
52use nautilus_core::{Params, UUID4};
53use nautilus_core::{
54    UnixNanos,
55    time::{AtomicTime, get_atomic_clock_realtime},
56};
57#[cfg(feature = "python")]
58use nautilus_model::data::{Bar, BarType, Data, QuoteTick, TradeTick};
59use nautilus_model::{
60    enums::BookType,
61    identifiers::{ClientId, InstrumentId, Venue},
62    instruments::{Instrument, any::InstrumentAny},
63};
64#[cfg(feature = "python")]
65use pyo3::{IntoPyObjectExt, prelude::*};
66use tokio::task::JoinHandle;
67use tokio_util::sync::CancellationToken;
68
69use self::streams::{
70    handle_historical_bars_subscription, handle_index_price_subscription,
71    handle_market_depth_subscription, handle_option_greeks_subscription, handle_quote_subscription,
72    handle_realtime_bars_subscription, handle_tick_by_tick_quote_subscription,
73    handle_trade_subscription,
74};
75use super::{
76    cache::{OptionGreeksCache, QuoteCache},
77    convert::{
78        bar_type_to_ib_bar_size, calculate_duration, calculate_duration_segments,
79        chrono_to_ib_datetime, ib_bar_to_nautilus_bar, price_type_to_ib_what_to_show,
80    },
81};
82use crate::{
83    common::{consts::IB_VENUE, shared_client::SharedClientHandle},
84    config::InteractiveBrokersDataClientConfig,
85    providers::instruments::InteractiveBrokersInstrumentProvider,
86};
87
/// Interactive Brokers data client.
///
/// This client provides market data functionality using the `rust-ibapi` library.
/// It manages subscriptions, handles historical data requests, and streams
/// market data to NautilusTrader.
///
/// A freshly constructed client holds no IB connection (`ib_client` is `None`),
/// reports itself as disconnected, and starts with empty subscription maps and
/// caches. Mutable state that is handed to spawned tasks is wrapped in
/// `Arc<tokio::sync::Mutex<_>>` so it can be shared across them.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.interactive_brokers")
)]
pub struct InteractiveBrokersDataClient {
    /// Client identifier.
    client_id: ClientId,
    /// Configuration for the client.
    config: InteractiveBrokersDataClientConfig,
    /// Instrument provider.
    instrument_provider: Arc<InteractiveBrokersInstrumentProvider>,
    /// Connection state (initialised to `false` by `new`).
    is_connected: AtomicBool,
    /// Cancellation token for stopping tasks.
    cancellation_token: CancellationToken,
    /// Active task handles.
    tasks: Vec<JoinHandle<()>>,
    /// Data event sender (obtained from `get_data_event_sender()` at construction).
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Active subscriptions mapped by instrument ID.
    subscriptions: Arc<tokio::sync::Mutex<AHashMap<InstrumentId, SubscriptionInfo>>>,
    /// Active option greeks subscriptions mapped by instrument ID.
    ///
    /// The value is the cancellation token belonging to that subscription.
    option_greeks_subscriptions: Arc<tokio::sync::Mutex<AHashMap<InstrumentId, CancellationToken>>>,
    /// Quote cache for accumulating tick updates.
    quote_cache: Arc<tokio::sync::Mutex<QuoteCache>>,
    /// Option greeks cache for merging IB option-computation ticks.
    option_greeks_cache: Arc<tokio::sync::Mutex<OptionGreeksCache>>,
    /// Clock for timestamping (the process-wide realtime atomic clock).
    clock: &'static AtomicTime,
    /// IB API client (shared per host/port/client_id when both data and execution connect).
    ///
    /// `None` until a connection has been established.
    ib_client: Option<SharedClientHandle>,
    /// Last bar for each bar type (for bar completion timeout tracking).
    ///
    /// NOTE(review): keys are presumably the string form of the bar type — confirm
    /// against the code that populates this map.
    last_bars: Arc<tokio::sync::Mutex<AHashMap<String, ibapi::market_data::realtime::Bar>>>,
    /// Active timeout tasks for bar completion.
    bar_timeout_tasks: Arc<tokio::sync::Mutex<AHashMap<String, tokio::task::JoinHandle<()>>>>,
}
129
/// Information about an active subscription.
///
/// One value of this type is stored per instrument in the client's
/// `subscriptions` map.
#[derive(Debug)]
// NOTE(review): dead-code lint is silenced for the whole struct; presumably some
// fields are kept for diagnostics only — confirm which ones are actually read.
#[allow(dead_code)]
struct SubscriptionInfo {
    /// Instrument ID for the subscription.
    instrument_id: InstrumentId,
    /// Subscription type.
    subscription_type: SubscriptionType,
    /// Cancellation token for this specific subscription.
    cancellation_token: CancellationToken,
}
141
/// Type of subscription.
///
/// Tags a [`SubscriptionInfo`] with the kind of market data stream it represents.
#[derive(Debug, Clone)]
enum SubscriptionType {
    /// Quote subscription.
    Quotes,
    /// Index price subscription.
    IndexPrices,
    /// Trade subscription.
    Trades,
    /// Bar subscription.
    Bars,
    /// Order book delta subscription.
    BookDeltas,
}
156
// Process-wide slot holding the Python callable that receives data events.
// The `OnceLock` is filled lazily by `data_event_callback()`; the inner
// `Option` stays `None` until a callback has been registered.
#[cfg(feature = "python")]
static DATA_EVENT_CALLBACK: std::sync::OnceLock<std::sync::Mutex<Option<Py<PyAny>>>> =
    std::sync::OnceLock::new();

// Per-thread flag recording whether this thread has already installed the
// Python data event bridge, so the bridge is set up at most once per thread.
#[cfg(feature = "python")]
thread_local! {
    static DATA_EVENT_BRIDGE_INITIALIZED: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
}
165
/// Returns the global callback slot, creating it (empty) on first access.
#[cfg(feature = "python")]
fn data_event_callback() -> &'static std::sync::Mutex<Option<Py<PyAny>>> {
    let empty_slot = || std::sync::Mutex::new(None);
    DATA_EVENT_CALLBACK.get_or_init(empty_slot)
}
170
/// Converts an optional string-to-string map into optional [`Params`].
///
/// Every value is stored as a JSON string under its original key. `None` in
/// yields `None` out; an empty map yields empty `Params`.
#[cfg(feature = "python")]
fn string_hash_map_to_params(
    map: Option<std::collections::HashMap<String, String>>,
) -> Option<Params> {
    let entries = map?;

    let mut params = Params::new();
    entries.into_iter().for_each(|(key, value)| {
        params.insert(key, serde_json::Value::String(value));
    });

    Some(params)
}
183
/// Converts an optional count of nanoseconds since the UNIX epoch into a UTC
/// timestamp.
///
/// Returns `None` when the input is `None` or when `DateTime::from_timestamp`
/// rejects the split seconds/nanoseconds pair.
#[cfg(feature = "python")]
fn unix_nanos_to_datetime(nanos: Option<u64>) -> Option<DateTime<Utc>> {
    const NANOS_PER_SEC: u64 = 1_000_000_000;

    let total = nanos?;
    // `u64::MAX / 1e9` is far below `i64::MAX`, so the seconds cast cannot wrap,
    // and the remainder is always below 1e9 and therefore fits in `u32`.
    let whole_secs = (total / NANOS_PER_SEC) as i64;
    let subsec_nanos = (total % NANOS_PER_SEC) as u32;

    DateTime::from_timestamp(whole_secs, subsec_nanos)
}
192
193#[cfg(feature = "python")]
/// Converts a `u64` into an optional non-zero `usize`.
///
/// Returns `None` for `0`. Values that do not fit in `usize` (only possible on
/// targets where `usize` is narrower than 64 bits) saturate to `usize::MAX`
/// instead of wrapping.
fn u64_to_nonzero_usize(value: u64) -> Option<std::num::NonZeroUsize> {
    // A plain `as` cast would silently drop the high bits on narrow targets,
    // turning a huge limit into a small one (or into zero, i.e. `None`).
    let clamped = usize::try_from(value).unwrap_or(usize::MAX);
    std::num::NonZeroUsize::new(clamped)
}
197
198#[cfg(feature = "python")]
/// Converts a `u16` into an optional non-zero `usize`; `0` maps to `None`.
fn u16_to_nonzero_usize(value: u16) -> Option<std::num::NonZeroUsize> {
    let widened = usize::from(value);
    std::num::NonZeroUsize::new(widened)
}
202
203fn parse_start_ns(params: Option<&nautilus_core::Params>) -> Option<UnixNanos> {
204    params
205        .and_then(|params| params.get_u64("start_ns"))
206        .or_else(|| {
207            params
208                .and_then(|params| params.get_str("start_ns"))
209                .and_then(|value| value.parse::<u64>().ok())
210        })
211        .map(UnixNanos::from)
212}
213
/// Converts quote ticks into a single Python object wrapping the converted
/// items, failing on the first tick that cannot be converted.
#[cfg(feature = "python")]
fn py_list_from_quotes(py: Python<'_>, values: Vec<QuoteTick>) -> PyResult<Py<PyAny>> {
    values
        .into_iter()
        .map(|quote| quote.into_py_any(py))
        .collect::<PyResult<Vec<Py<PyAny>>>>()?
        .into_py_any(py)
}
222
/// Converts trade ticks into a single Python object wrapping the converted
/// items, failing on the first tick that cannot be converted.
#[cfg(feature = "python")]
fn py_list_from_trades(py: Python<'_>, values: Vec<TradeTick>) -> PyResult<Py<PyAny>> {
    values
        .into_iter()
        .map(|trade| trade.into_py_any(py))
        .collect::<PyResult<Vec<Py<PyAny>>>>()?
        .into_py_any(py)
}
231
/// Converts bars into a single Python object wrapping the converted items,
/// failing on the first bar that cannot be converted.
#[cfg(feature = "python")]
fn py_list_from_bars(py: Python<'_>, values: Vec<Bar>) -> PyResult<Py<PyAny>> {
    values
        .into_iter()
        .map(|bar| bar.into_py_any(py))
        .collect::<PyResult<Vec<Py<PyAny>>>>()?
        .into_py_any(py)
}
240
/// Converts a [`DataEvent`] into Python objects and passes it to `callback`.
///
/// The callback is invoked as `callback(kind, correlation_id, payload)`:
///
/// * `kind` - a string tag naming the payload (`"quote"`, `"trade"`, `"bar"`,
///   `"delta"`, `"index_price"`, `"option_greeks"`, `"instrument"`, or one of the
///   `"*_response"` tags).
/// * `correlation_id` - the response's correlation ID as a string for response
///   events, `None` for everything else.
/// * `payload` - the converted data object, or a collection of them for
///   multi-item responses.
///
/// Events, data payloads, and responses that have no mapping here are logged
/// at debug level and dropped without calling the callback.
///
/// # Errors
///
/// Returns the Python error raised while converting the payload or by the
/// callback itself.
#[cfg(feature = "python")]
fn dispatch_python_data_event(
    py: Python<'_>,
    callback: &Py<PyAny>,
    event: DataEvent,
) -> PyResult<()> {
    use nautilus_model::python::instruments::instrument_any_to_pyobject;

    let (kind, correlation_id, payload) = match event {
        // Streaming market data.
        DataEvent::Data(Data::Quote(quote)) => ("quote", None, quote.into_py_any(py)?),
        DataEvent::Data(Data::Trade(trade)) => ("trade", None, trade.into_py_any(py)?),
        DataEvent::Data(Data::Bar(bar)) => ("bar", None, bar.into_py_any(py)?),
        DataEvent::Data(Data::Delta(delta)) => ("delta", None, delta.into_py_any(py)?),
        DataEvent::Data(Data::IndexPriceUpdate(index_price)) => {
            ("index_price", None, index_price.into_py_any(py)?)
        }
        DataEvent::Data(other) => {
            tracing::debug!("Ignoring unsupported IB data event payload: {:?}", other);
            return Ok(());
        }

        // Other streaming events.
        DataEvent::OptionGreeks(greeks) => ("option_greeks", None, greeks.into_py_any(py)?),
        DataEvent::Instrument(instrument) => (
            "instrument",
            None,
            instrument_any_to_pyobject(py, instrument)?,
        ),

        // Responses to explicit requests carry their correlation ID.
        DataEvent::Response(DataResponse::Instrument(resp)) => (
            "instrument_response",
            Some(resp.correlation_id.to_string()),
            instrument_any_to_pyobject(py, resp.data)?,
        ),
        DataEvent::Response(DataResponse::Instruments(resp)) => (
            "instruments_response",
            Some(resp.correlation_id.to_string()),
            resp.data
                .into_iter()
                .map(|instrument| instrument_any_to_pyobject(py, instrument))
                .collect::<PyResult<Vec<_>>>()?
                .into_py_any(py)?,
        ),
        DataEvent::Response(DataResponse::Quotes(resp)) => (
            "quotes_response",
            Some(resp.correlation_id.to_string()),
            py_list_from_quotes(py, resp.data)?,
        ),
        DataEvent::Response(DataResponse::Trades(resp)) => (
            "trades_response",
            Some(resp.correlation_id.to_string()),
            py_list_from_trades(py, resp.data)?,
        ),
        DataEvent::Response(DataResponse::Bars(resp)) => (
            "bars_response",
            Some(resp.correlation_id.to_string()),
            py_list_from_bars(py, resp.data)?,
        ),
        DataEvent::Response(other) => {
            tracing::debug!("Ignoring unsupported IB data response payload: {:?}", other);
            return Ok(());
        }

        other => {
            tracing::debug!("Ignoring unsupported IB data event variant: {:?}", other);
            return Ok(());
        }
    };

    callback.call1(py, (kind, correlation_id, payload))?;
    Ok(())
}
316
317impl InteractiveBrokersDataClient {
318    /// Create a new `InteractiveBrokersDataClient`.
319    ///
320    /// # Arguments
321    ///
322    /// * `client_id` - Client identifier
323    /// * `config` - Configuration for the client
324    /// * `instrument_provider` - Instrument provider
325    ///
326    /// # Errors
327    ///
328    /// Returns an error if client creation fails.
329    pub fn new(
330        client_id: ClientId,
331        config: InteractiveBrokersDataClientConfig,
332        instrument_provider: Arc<InteractiveBrokersInstrumentProvider>,
333    ) -> anyhow::Result<Self> {
334        let clock = get_atomic_clock_realtime();
335        let data_sender = get_data_event_sender();
336
337        Ok(Self {
338            client_id,
339            config,
340            instrument_provider,
341            is_connected: AtomicBool::new(false),
342            cancellation_token: CancellationToken::new(),
343            tasks: Vec::new(),
344            data_sender,
345            subscriptions: Arc::new(tokio::sync::Mutex::new(AHashMap::new())),
346            option_greeks_subscriptions: Arc::new(tokio::sync::Mutex::new(AHashMap::new())),
347            quote_cache: Arc::new(tokio::sync::Mutex::new(QuoteCache::new())),
348            option_greeks_cache: Arc::new(tokio::sync::Mutex::new(OptionGreeksCache::new())),
349            clock,
350            ib_client: None,
351            last_bars: Arc::new(tokio::sync::Mutex::new(AHashMap::new())),
352            bar_timeout_tasks: Arc::new(tokio::sync::Mutex::new(AHashMap::new())),
353        })
354    }
355
356    #[cfg(feature = "python")]
357    pub(crate) fn new_for_python(
358        config: InteractiveBrokersDataClientConfig,
359        instrument_provider: crate::providers::instruments::InteractiveBrokersInstrumentProvider,
360    ) -> anyhow::Result<Self> {
361        Self::ensure_python_event_bridge();
362        let client_id = ClientId::from(format!("IB-{:03}", config.client_id));
363        Self::new(client_id, config, Arc::new(instrument_provider))
364    }
365
366    #[cfg(feature = "python")]
367    pub(crate) fn register_python_event_callback(&self, callback: Py<PyAny>) {
368        *data_event_callback()
369            .lock()
370            .expect("data event callback mutex poisoned") = Some(callback);
371    }
372
373    #[cfg(feature = "python")]
374    fn ensure_python_event_bridge() {
375        if nautilus_common::live::runner::try_get_data_event_sender().is_some() {
376            return;
377        }
378
379        DATA_EVENT_BRIDGE_INITIALIZED.with(|initialized| {
380            if initialized.replace(true) {
381                return;
382            }
383
384            let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
385            nautilus_common::live::runner::set_data_event_sender(sender);
386
387            get_runtime().spawn(async move {
388                while let Some(event) = receiver.recv().await {
389                    Python::attach(|py| {
390                        let callback_guard = data_event_callback()
391                            .lock()
392                            .expect("data event callback mutex poisoned");
393
394                        let Some(callback) = callback_guard.as_ref() else {
395                            return;
396                        };
397
398                        if let Err(e) = dispatch_python_data_event(py, callback, event) {
399                            tracing::error!("Failed to dispatch IB data event to Python: {e}");
400                        }
401                    });
402                }
403            });
404        });
405    }
406
407    #[cfg(feature = "python")]
408    pub(crate) fn subscribe_quotes_for_python(
409        &mut self,
410        instrument_id: InstrumentId,
411        params: Option<std::collections::HashMap<String, String>>,
412    ) -> anyhow::Result<()> {
413        let cmd = SubscribeQuotes {
414            instrument_id,
415            client_id: Some(self.client_id()),
416            venue: Some(instrument_id.venue),
417            command_id: UUID4::new(),
418            ts_init: self.clock.get_time_ns(),
419            correlation_id: None,
420            params: string_hash_map_to_params(params),
421        };
422        DataClient::subscribe_quotes(self, cmd)
423    }
424
425    #[cfg(feature = "python")]
426    pub(crate) fn subscribe_index_prices_for_python(
427        &mut self,
428        instrument_id: InstrumentId,
429    ) -> anyhow::Result<()> {
430        let cmd = SubscribeIndexPrices {
431            instrument_id,
432            client_id: Some(self.client_id()),
433            venue: Some(instrument_id.venue),
434            command_id: UUID4::new(),
435            ts_init: self.clock.get_time_ns(),
436            correlation_id: None,
437            params: None,
438        };
439        DataClient::subscribe_index_prices(self, cmd)
440    }
441
442    #[cfg(feature = "python")]
443    pub(crate) fn subscribe_option_greeks_for_python(
444        &mut self,
445        instrument_id: InstrumentId,
446    ) -> anyhow::Result<()> {
447        let cmd = SubscribeOptionGreeks {
448            instrument_id,
449            client_id: Some(self.client_id()),
450            venue: Some(instrument_id.venue),
451            command_id: UUID4::new(),
452            ts_init: self.clock.get_time_ns(),
453            correlation_id: None,
454            params: None,
455        };
456        DataClient::subscribe_option_greeks(self, cmd)
457    }
458
459    #[cfg(feature = "python")]
460    pub(crate) fn subscribe_trades_for_python(
461        &mut self,
462        instrument_id: InstrumentId,
463    ) -> anyhow::Result<()> {
464        let cmd = SubscribeTrades {
465            instrument_id,
466            client_id: Some(self.client_id()),
467            venue: Some(instrument_id.venue),
468            command_id: UUID4::new(),
469            ts_init: self.clock.get_time_ns(),
470            correlation_id: None,
471            params: None,
472        };
473        DataClient::subscribe_trades(self, cmd)
474    }
475
476    #[cfg(feature = "python")]
477    pub(crate) fn subscribe_bars_for_python(
478        &mut self,
479        bar_type: BarType,
480        params: Option<std::collections::HashMap<String, String>>,
481    ) -> anyhow::Result<()> {
482        let cmd = SubscribeBars {
483            bar_type,
484            client_id: Some(self.client_id()),
485            venue: Some(bar_type.instrument_id().venue),
486            command_id: UUID4::new(),
487            ts_init: self.clock.get_time_ns(),
488            correlation_id: None,
489            params: string_hash_map_to_params(params),
490        };
491        DataClient::subscribe_bars(self, cmd)
492    }
493
494    #[cfg(feature = "python")]
495    pub(crate) fn subscribe_book_deltas_for_python(
496        &mut self,
497        instrument_id: InstrumentId,
498        depth: Option<u16>,
499        params: Option<std::collections::HashMap<String, String>>,
500    ) -> anyhow::Result<()> {
501        let cmd = SubscribeBookDeltas {
502            instrument_id,
503            book_type: BookType::L2_MBP,
504            client_id: Some(self.client_id()),
505            venue: Some(instrument_id.venue),
506            command_id: UUID4::new(),
507            ts_init: self.clock.get_time_ns(),
508            depth: u16_to_nonzero_usize(depth.unwrap_or(20)),
509            managed: true,
510            correlation_id: None,
511            params: string_hash_map_to_params(params),
512        };
513        DataClient::subscribe_book_deltas(self, cmd)
514    }
515
516    #[cfg(feature = "python")]
517    pub(crate) fn unsubscribe_quotes_for_python(
518        &mut self,
519        instrument_id: InstrumentId,
520    ) -> anyhow::Result<()> {
521        let cmd = UnsubscribeQuotes {
522            instrument_id,
523            client_id: Some(self.client_id()),
524            venue: Some(instrument_id.venue),
525            command_id: UUID4::new(),
526            ts_init: self.clock.get_time_ns(),
527            correlation_id: None,
528            params: None,
529        };
530        DataClient::unsubscribe_quotes(self, &cmd)
531    }
532
533    #[cfg(feature = "python")]
534    pub(crate) fn unsubscribe_index_prices_for_python(
535        &mut self,
536        instrument_id: InstrumentId,
537    ) -> anyhow::Result<()> {
538        let cmd = UnsubscribeIndexPrices {
539            instrument_id,
540            client_id: Some(self.client_id()),
541            venue: Some(instrument_id.venue),
542            command_id: UUID4::new(),
543            ts_init: self.clock.get_time_ns(),
544            correlation_id: None,
545            params: None,
546        };
547        DataClient::unsubscribe_index_prices(self, &cmd)
548    }
549
550    #[cfg(feature = "python")]
551    pub(crate) fn unsubscribe_option_greeks_for_python(
552        &mut self,
553        instrument_id: InstrumentId,
554    ) -> anyhow::Result<()> {
555        let cmd = UnsubscribeOptionGreeks {
556            instrument_id,
557            client_id: Some(self.client_id()),
558            venue: Some(instrument_id.venue),
559            command_id: UUID4::new(),
560            ts_init: self.clock.get_time_ns(),
561            correlation_id: None,
562            params: None,
563        };
564        DataClient::unsubscribe_option_greeks(self, &cmd)
565    }
566
567    #[cfg(feature = "python")]
568    pub(crate) fn unsubscribe_trades_for_python(
569        &mut self,
570        instrument_id: InstrumentId,
571    ) -> anyhow::Result<()> {
572        let cmd = UnsubscribeTrades {
573            instrument_id,
574            client_id: Some(self.client_id()),
575            venue: Some(instrument_id.venue),
576            command_id: UUID4::new(),
577            ts_init: self.clock.get_time_ns(),
578            correlation_id: None,
579            params: None,
580        };
581        DataClient::unsubscribe_trades(self, &cmd)
582    }
583
584    #[cfg(feature = "python")]
585    pub(crate) fn unsubscribe_bars_for_python(&mut self, bar_type: BarType) -> anyhow::Result<()> {
586        let cmd = UnsubscribeBars {
587            bar_type,
588            client_id: Some(self.client_id()),
589            venue: Some(bar_type.instrument_id().venue),
590            command_id: UUID4::new(),
591            ts_init: self.clock.get_time_ns(),
592            correlation_id: None,
593            params: None,
594        };
595        DataClient::unsubscribe_bars(self, &cmd)
596    }
597
598    #[cfg(feature = "python")]
599    pub(crate) fn unsubscribe_book_deltas_for_python(
600        &mut self,
601        instrument_id: InstrumentId,
602    ) -> anyhow::Result<()> {
603        let cmd = UnsubscribeBookDeltas {
604            instrument_id,
605            client_id: Some(self.client_id()),
606            venue: Some(instrument_id.venue),
607            command_id: UUID4::new(),
608            ts_init: self.clock.get_time_ns(),
609            correlation_id: None,
610            params: None,
611        };
612        DataClient::unsubscribe_book_deltas(self, &cmd)
613    }
614
615    #[cfg(feature = "python")]
616    pub(crate) fn request_quotes_for_python(
617        &self,
618        instrument_id: InstrumentId,
619        limit: Option<u64>,
620        start: Option<u64>,
621        end: Option<u64>,
622        request_id: Option<String>,
623    ) -> anyhow::Result<()> {
624        let req = RequestQuotes {
625            instrument_id,
626            start: unix_nanos_to_datetime(start),
627            end: unix_nanos_to_datetime(end),
628            limit: u64_to_nonzero_usize(limit.unwrap_or(10_000)),
629            client_id: Some(self.client_id()),
630            request_id: request_id.map_or_else(UUID4::new, UUID4::from),
631            ts_init: self.clock.get_time_ns(),
632            params: None,
633        };
634        DataClient::request_quotes(self, req)
635    }
636
637    #[cfg(feature = "python")]
638    pub(crate) fn request_trades_for_python(
639        &self,
640        instrument_id: InstrumentId,
641        limit: Option<u64>,
642        start: Option<u64>,
643        end: Option<u64>,
644        request_id: Option<String>,
645    ) -> anyhow::Result<()> {
646        let req = RequestTrades {
647            instrument_id,
648            start: unix_nanos_to_datetime(start),
649            end: unix_nanos_to_datetime(end),
650            limit: u64_to_nonzero_usize(limit.unwrap_or(10_000)),
651            client_id: Some(self.client_id()),
652            request_id: request_id.map_or_else(UUID4::new, UUID4::from),
653            ts_init: self.clock.get_time_ns(),
654            params: None,
655        };
656        DataClient::request_trades(self, req)
657    }
658
659    #[cfg(feature = "python")]
660    pub(crate) fn request_bars_for_python(
661        &self,
662        bar_type: BarType,
663        limit: Option<u64>,
664        start: Option<u64>,
665        end: Option<u64>,
666        request_id: Option<String>,
667    ) -> anyhow::Result<()> {
668        let req = RequestBars {
669            bar_type,
670            start: unix_nanos_to_datetime(start),
671            end: unix_nanos_to_datetime(end),
672            limit: u64_to_nonzero_usize(limit.unwrap_or(1_000)),
673            client_id: Some(self.client_id()),
674            request_id: request_id.map_or_else(UUID4::new, UUID4::from),
675            ts_init: self.clock.get_time_ns(),
676            params: None,
677        };
678        DataClient::request_bars(self, req)
679    }
680
681    #[cfg(feature = "python")]
682    pub(crate) fn request_instrument_for_python(
683        &self,
684        instrument_id: InstrumentId,
685        params: Option<std::collections::HashMap<String, String>>,
686    ) -> anyhow::Result<()> {
687        let req = RequestInstrument {
688            client_id: Some(self.client_id()),
689            instrument_id,
690            start: None,
691            end: None,
692            request_id: UUID4::new(),
693            ts_init: self.clock.get_time_ns(),
694            params: string_hash_map_to_params(params),
695        };
696        DataClient::request_instrument(self, req)
697    }
698
699    #[cfg(feature = "python")]
700    pub(crate) fn request_instruments_for_python(
701        &self,
702        venue: Option<Venue>,
703        params: Option<std::collections::HashMap<String, String>>,
704    ) -> anyhow::Result<()> {
705        let req = RequestInstruments {
706            client_id: Some(self.client_id()),
707            venue,
708            start: None,
709            end: None,
710            request_id: UUID4::new(),
711            ts_init: self.clock.get_time_ns(),
712            params: string_hash_map_to_params(params),
713        };
714        DataClient::request_instruments(self, req)
715    }
716
717    fn venue_id(&self) -> Venue {
718        *IB_VENUE
719    }
720
721    /// Get a reference to the IB client if connected.
722    /// This is used internally for provider method calls.
723    #[allow(dead_code)] // Library API - may be used by other modules or PyO3 bindings
724    pub(crate) fn get_ib_client(&self) -> Option<&Arc<ibapi::Client>> {
725        self.ib_client.as_ref().map(|h| h.as_arc())
726    }
727
728    /// Get a reference to the instrument provider.
729    #[allow(dead_code)] // Library API - may be used by other modules or PyO3 bindings
730    pub(crate) fn instrument_provider(&self) -> Arc<InteractiveBrokersInstrumentProvider> {
731        Arc::clone(&self.instrument_provider)
732    }
733
734    /// Batch load multiple instrument IDs using the internal IB client.
735    ///
736    /// This method calls the provider's batch_load with the data client's IB client.
737    ///
738    /// # Arguments
739    ///
740    /// * `instrument_ids` - Vector of instrument IDs to load
741    ///
742    /// # Errors
743    ///
744    /// Returns an error if:
745    /// - The client is not connected
746    /// - The provider batch_load fails
747    pub async fn batch_load_instruments(
748        &self,
749        instrument_ids: Vec<InstrumentId>,
750    ) -> anyhow::Result<Vec<InstrumentId>> {
751        log::debug!(
752            "Batch loading {} IB instruments through data client",
753            instrument_ids.len()
754        );
755        let client = self
756            .ib_client
757            .as_ref()
758            .context("IB client not connected. Call connect() first")?;
759
760        let loaded = self
761            .instrument_provider
762            .batch_load(client, instrument_ids, None)
763            .await?;
764        log::debug!("Batch loaded {} IB instruments", loaded.len());
765        Ok(loaded)
766    }
767
768    /// Fetch option chain for an underlying contract with expiry filtering.
769    ///
770    /// This method calls the provider's fetch_option_chain_by_range with the data client's IB client.
771    ///
772    /// # Arguments
773    ///
774    /// * `underlying_symbol` - The underlying symbol (e.g., "AAPL")
775    /// * `exchange` - The exchange (defaults to "SMART")
776    /// * `currency` - The currency (defaults to "USD")
777    /// * `expiry_min` - Minimum expiry date string (YYYYMMDD format, optional)
778    /// * `expiry_max` - Maximum expiry date string (YYYYMMDD format, optional)
779    ///
780    /// # Errors
781    ///
782    /// Returns an error if:
783    /// - The client is not connected
784    /// - The provider method fails
785    pub async fn fetch_option_chain_by_range(
786        &self,
787        underlying_symbol: &str,
788        exchange: Option<&str>,
789        currency: Option<&str>,
790        expiry_min: Option<&str>,
791        expiry_max: Option<&str>,
792    ) -> anyhow::Result<usize> {
793        log::debug!(
794            "Fetching IB option chain by range (symbol={}, exchange={:?}, currency={:?}, expiry_min={:?}, expiry_max={:?})",
795            underlying_symbol,
796            exchange,
797            currency,
798            expiry_min,
799            expiry_max
800        );
801        let client = self
802            .ib_client
803            .as_ref()
804            .context("IB client not connected. Call connect() first")?;
805
806        let underlying = Contract {
807            contract_id: 0,
808            symbol: Symbol::from(underlying_symbol.to_string()),
809            security_type: SecurityType::Stock,
810            last_trade_date_or_contract_month: String::new(),
811            strike: 0.0,
812            right: String::new(),
813            multiplier: String::new(),
814            exchange: IBExchange::from(exchange.unwrap_or("SMART")),
815            currency: IBCurrency::from(currency.unwrap_or("USD")),
816            local_symbol: String::new(),
817            primary_exchange: IBExchange::default(),
818            trading_class: String::new(),
819            include_expired: false,
820            security_id_type: String::new(),
821            security_id: String::new(),
822            combo_legs_description: String::new(),
823            combo_legs: Vec::new(),
824            delta_neutral_contract: None,
825            issuer_id: String::new(),
826            description: String::new(),
827            last_trade_date: None,
828        };
829
830        let count = self
831            .instrument_provider
832            .fetch_option_chain_by_range(client, &underlying, expiry_min, expiry_max)
833            .await?;
834        log::debug!(
835            "Fetched {} IB option instruments for {}",
836            count,
837            underlying_symbol
838        );
839        Ok(count)
840    }
841
842    /// Fetch futures chain for a given underlying symbol.
843    ///
844    /// This method calls the provider's fetch_futures_chain with the data client's IB client.
845    ///
846    /// # Arguments
847    ///
848    /// * `symbol` - The underlying symbol (e.g., "ES")
849    /// * `exchange` - The exchange (defaults to empty string for all exchanges)
850    /// * `currency` - The currency (defaults to "USD")
851    ///
852    /// # Errors
853    ///
854    /// Returns an error if:
855    /// - The client is not connected
856    /// - The provider method fails
857    pub async fn fetch_futures_chain(
858        &self,
859        symbol: &str,
860        exchange: Option<&str>,
861        currency: Option<&str>,
862        min_expiry_days: Option<u32>,
863        max_expiry_days: Option<u32>,
864    ) -> anyhow::Result<usize> {
865        log::debug!(
866            "Fetching IB futures chain (symbol={}, exchange={:?}, currency={:?}, min_days={:?}, max_days={:?})",
867            symbol,
868            exchange,
869            currency,
870            min_expiry_days,
871            max_expiry_days
872        );
873        let client = self
874            .ib_client
875            .as_ref()
876            .context("IB client not connected. Call connect() first")?;
877
878        let count = self
879            .instrument_provider
880            .fetch_futures_chain(
881                client,
882                symbol,
883                exchange.unwrap_or(""),
884                currency.unwrap_or("USD"),
885                min_expiry_days,
886                max_expiry_days,
887            )
888            .await?;
889        log::debug!("Fetched {} IB futures instruments for {}", count, symbol);
890        Ok(count)
891    }
892
893    /// Fetch BAG (spread) contract details.
894    ///
895    /// This method calls the provider's fetch_bag_contract with the data client's IB client.
896    ///
897    /// # Arguments
898    ///
899    /// * `bag_contract` - The BAG contract with populated combo_legs
900    ///
901    /// # Errors
902    ///
903    /// Returns an error if:
904    /// - The client is not connected
905    /// - The provider method fails
906    pub async fn fetch_bag_contract(
907        &self,
908        bag_contract: &ibapi::contracts::Contract,
909    ) -> anyhow::Result<usize> {
910        log::debug!(
911            "Fetching IB BAG contract details (contract_id={}, exchange={}, symbol={})",
912            bag_contract.contract_id,
913            bag_contract.exchange.as_str(),
914            bag_contract.symbol.as_str()
915        );
916        let client = self
917            .ib_client
918            .as_ref()
919            .context("IB client not connected. Call connect() first")?;
920
921        let count = self
922            .instrument_provider
923            .fetch_bag_contract(client, bag_contract)
924            .await?;
925        log::debug!("Fetched {} BAG instruments", count);
926        Ok(count)
927    }
928}
929
930impl Debug for InteractiveBrokersDataClient {
931    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
932        f.debug_struct(stringify!(InteractiveBrokersDataClient))
933            .field("client_id", &self.client_id)
934            .field("config", &self.config)
935            .field("is_connected", &self.is_connected.load(Ordering::Relaxed))
936            .field("has_ib_client", &self.ib_client.is_some())
937            .finish_non_exhaustive()
938    }
939}
940
941#[async_trait::async_trait(?Send)]
942impl DataClient for InteractiveBrokersDataClient {
943    fn client_id(&self) -> ClientId {
944        self.client_id
945    }
946
947    fn venue(&self) -> Option<Venue> {
948        Some(self.venue_id())
949    }
950
951    fn start(&mut self) -> anyhow::Result<()> {
952        tracing::info!(
953            client_id = %self.client_id,
954            "Starting Interactive Brokers data client"
955        );
956        Ok(())
957    }
958
959    fn stop(&mut self) -> anyhow::Result<()> {
960        tracing::info!(
961            "Stopping Interactive Brokers data client {id}",
962            id = self.client_id
963        );
964        self.cancellation_token.cancel();
965        self.is_connected.store(false, Ordering::Relaxed);
966
967        // Cancel all tasks
968        for task in &self.tasks {
969            task.abort();
970        }
971        self.tasks.clear();
972
973        Ok(())
974    }
975
976    fn reset(&mut self) -> anyhow::Result<()> {
977        tracing::debug!(
978            "Resetting Interactive Brokers data client {id}",
979            id = self.client_id
980        );
981        self.is_connected.store(false, Ordering::Relaxed);
982        self.cancellation_token = CancellationToken::new();
983        self.tasks.clear();
984
985        // Clear subscriptions and cache
986        {
987            let mut subscriptions = self.subscriptions.blocking_lock();
988            subscriptions.clear();
989        }
990        {
991            let mut subscriptions = self.option_greeks_subscriptions.blocking_lock();
992            subscriptions.clear();
993        }
994        {
995            let mut cache = self.quote_cache.blocking_lock();
996            cache.clear();
997        }
998        {
999            let mut cache = self.option_greeks_cache.blocking_lock();
1000            cache.clear();
1001        }
1002
1003        Ok(())
1004    }
1005
1006    fn dispose(&mut self) -> anyhow::Result<()> {
1007        self.stop()
1008    }
1009
1010    async fn connect(&mut self) -> anyhow::Result<()> {
1011        tracing::debug!("Connecting Interactive Brokers data client...");
1012
1013        let handle = crate::common::shared_client::get_or_connect(
1014            &self.config.host,
1015            self.config.port,
1016            self.config.client_id,
1017            self.config.connection_timeout,
1018        )
1019        .await
1020        .context("Failed to connect to IB Gateway/TWS")?;
1021
1022        let client = handle.as_arc();
1023
1024        tracing::info!(
1025            "Connected to IB Gateway/TWS at {}:{} (client_id: {})",
1026            self.config.host,
1027            self.config.port,
1028            self.config.client_id
1029        );
1030
1031        // Set market data type if not default
1032        if self.config.market_data_type != crate::config::MarketDataType::Realtime {
1033            let ib_data_type: ibapi::market_data::MarketDataType =
1034                self.config.market_data_type.into();
1035            client
1036                .switch_market_data_type(ib_data_type)
1037                .await
1038                .context("Failed to switch market data type")?;
1039            tracing::info!("Set market data type to {:?}", self.config.market_data_type);
1040        }
1041
1042        self.ib_client = Some(handle);
1043        self.is_connected.store(true, Ordering::Relaxed);
1044
1045        // Initialize provider and load instruments from cache if configured
1046        tracing::debug!("Initializing IB data instrument provider");
1047        if let Err(e) = self.instrument_provider.initialize().await {
1048            tracing::warn!("Failed to initialize instrument provider: {}", e);
1049        }
1050
1051        tracing::debug!("Loading configured IB data instruments");
1052
1053        if let Err(e) = self
1054            .instrument_provider
1055            .load_all_async(
1056                self.ib_client.as_ref().unwrap().as_arc().as_ref(),
1057                None,
1058                None,
1059                false,
1060            )
1061            .await
1062        {
1063            tracing::warn!("Failed to load instruments on startup: {}", e);
1064        }
1065
1066        let instrument_count = self.instrument_provider.count();
1067        if instrument_count > 0 {
1068            tracing::info!(
1069                "Data client connected with {} instruments in provider cache",
1070                instrument_count
1071            );
1072        }
1073
1074        tracing::info!("Connected Interactive Brokers data client");
1075        Ok(())
1076    }
1077
1078    async fn disconnect(&mut self) -> anyhow::Result<()> {
1079        tracing::debug!("Disconnecting Interactive Brokers data client...");
1080
1081        self.stop()?;
1082        self.ib_client = None;
1083        self.is_connected.store(false, Ordering::Relaxed);
1084        tracing::info!("Disconnected Interactive Brokers data client");
1085        Ok(())
1086    }
1087
1088    fn is_connected(&self) -> bool {
1089        self.is_connected.load(Ordering::Relaxed)
1090    }
1091
1092    fn is_disconnected(&self) -> bool {
1093        !self.is_connected()
1094    }
1095
1096    // Subscription handlers
1097    fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
1098        tracing::debug!("Subscribing to quotes for {}", cmd.instrument_id);
1099
1100        let client = self
1101            .ib_client
1102            .as_ref()
1103            .context("IB client not connected. Call connect() first")?;
1104
1105        // Get instrument from provider
1106        let instrument = self
1107            .instrument_provider
1108            .find(&cmd.instrument_id)
1109            .context(format!(
1110                "Instrument {} not found in provider",
1111                cmd.instrument_id
1112            ))?;
1113
1114        let price_precision = instrument.price_precision();
1115        let size_precision = instrument.size_precision();
1116
1117        // Convert instrument_id to IB contract
1118        let contract = self
1119            .instrument_provider
1120            .resolve_contract_for_instrument(cmd.instrument_id)
1121            .context("Failed to convert instrument_id to IB contract")?;
1122
1123        // Check if contract is BAG (spread) or if batch_quotes parameter is set
1124        // BAG contracts have SecurityType::Spread or combo_legs populated
1125        let is_bag = matches!(
1126            contract.security_type,
1127            ibapi::contracts::SecurityType::Spread
1128        ) || !contract.combo_legs.is_empty();
1129        let batch_quotes = cmd
1130            .params
1131            .as_ref()
1132            .and_then(|params| params.get_str("batch_quotes"))
1133            .map_or(self.config.batch_quotes, |s| {
1134                s == "true" || s == "True" || s == "1"
1135            });
1136
1137        let use_market_data = is_bag || batch_quotes;
1138
1139        let instrument_id = cmd.instrument_id;
1140        let data_sender = self.data_sender.clone();
1141        let quote_cache = Arc::clone(&self.quote_cache);
1142        let clock = self.clock;
1143
1144        // Get price magnifier from instrument provider
1145        let price_magnifier = self.instrument_provider.get_price_magnifier(&instrument_id) as f64;
1146
1147        // Create subscription-specific cancellation token
1148        let subscription_token = CancellationToken::new();
1149
1150        // Spawn subscription task
1151        let client_clone = client.as_arc().clone();
1152        let subscription_token_clone = subscription_token.clone();
1153        let ignore_size_updates = self.config.ignore_quote_tick_size_updates;
1154
1155        let task = get_runtime().spawn(async move {
1156            if use_market_data {
1157                // Use market_data (reqMktData) for BAG contracts or when batch_quotes is requested
1158                tracing::debug!(
1159                    "Using market_data subscription for {} (BAG: {}, batch_quotes: {})",
1160                    instrument_id,
1161                    is_bag,
1162                    batch_quotes
1163                );
1164
1165                if let Err(e) = handle_quote_subscription(
1166                    client_clone,
1167                    contract,
1168                    instrument_id,
1169                    price_precision,
1170                    size_precision,
1171                    data_sender,
1172                    quote_cache,
1173                    clock,
1174                    subscription_token_clone,
1175                    ignore_size_updates,
1176                )
1177                .await
1178                {
1179                    tracing::error!("Quote subscription error for {}: {:?}", instrument_id, e);
1180                }
1181            } else {
1182                // Try tick_by_tick_bid_ask first for regular contracts (better performance)
1183                // Fallback to market_data if it fails (e.g., for BAG contracts not detected upfront)
1184                tracing::debug!(
1185                    "Attempting tick_by_tick_bid_ask subscription for {}",
1186                    instrument_id
1187                );
1188
1189                match handle_tick_by_tick_quote_subscription(
1190                    client_clone.clone(),
1191                    contract.clone(),
1192                    instrument_id,
1193                    price_precision,
1194                    size_precision,
1195                    data_sender.clone(),
1196                    clock,
1197                    subscription_token_clone.clone(),
1198                    price_magnifier,
1199                )
1200                .await
1201                {
1202                    Ok(()) => {
1203                        // Success - subscription is active
1204                    }
1205                    Err(e) => {
1206                        tracing::warn!(
1207                            "tick_by_tick_bid_ask failed for {} (may be BAG contract), falling back to market_data: {:?}",
1208                            instrument_id,
1209                            e
1210                        );
1211                        // Fallback to market_data (reqMktData) - works for BAG contracts
1212                        if let Err(fallback_err) = handle_quote_subscription(
1213                            client_clone,
1214                            contract,
1215                            instrument_id,
1216                            price_precision,
1217                            size_precision,
1218                            data_sender,
1219                            quote_cache,
1220                            clock,
1221                            subscription_token_clone,
1222                            ignore_size_updates,
1223                        )
1224                        .await
1225                        {
1226                            tracing::error!(
1227                                "Quote subscription fallback also failed for {}: {:?}",
1228                                instrument_id,
1229                                fallback_err
1230                            );
1231                        } else {
1232                            tracing::info!(
1233                                "Successfully subscribed to {} using market_data fallback",
1234                                instrument_id
1235                            );
1236                        }
1237                    }
1238                }
1239            }
1240        });
1241
1242        self.tasks.push(task);
1243
1244        // Record subscription
1245        let mut subscriptions = self.subscriptions.blocking_lock();
1246        subscriptions.insert(
1247            cmd.instrument_id,
1248            SubscriptionInfo {
1249                instrument_id: cmd.instrument_id,
1250                subscription_type: SubscriptionType::Quotes,
1251                cancellation_token: subscription_token,
1252            },
1253        );
1254
1255        tracing::info!(
1256            "Quote subscription started for {} (method: {})",
1257            cmd.instrument_id,
1258            if use_market_data {
1259                "market_data"
1260            } else {
1261                "tick_by_tick_bid_ask"
1262            }
1263        );
1264        Ok(())
1265    }
1266
1267    fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
1268        tracing::debug!("Subscribing to index prices for {}", cmd.instrument_id);
1269
1270        let client = self
1271            .ib_client
1272            .as_ref()
1273            .context("IB client not connected. Call connect() first")?;
1274
1275        let instrument = self
1276            .instrument_provider
1277            .find(&cmd.instrument_id)
1278            .context(format!(
1279                "Instrument {} not found in provider",
1280                cmd.instrument_id
1281            ))?;
1282
1283        let contract = self
1284            .instrument_provider
1285            .resolve_contract_for_instrument(cmd.instrument_id)
1286            .context("Failed to convert instrument_id to IB contract")?;
1287
1288        if !matches!(contract.security_type, SecurityType::Index) {
1289            tracing::warn!(
1290                "Index price subscription not supported for security type {:?} on {}",
1291                contract.security_type,
1292                cmd.instrument_id
1293            );
1294            return Ok(());
1295        }
1296
1297        let price_precision = instrument.price_precision();
1298        let price_magnifier = self
1299            .instrument_provider
1300            .get_price_magnifier(&cmd.instrument_id);
1301        let instrument_id = cmd.instrument_id;
1302        let data_sender = self.data_sender.clone();
1303        let clock = self.clock;
1304
1305        let subscription_token = CancellationToken::new();
1306
1307        let client_clone = client.as_arc().clone();
1308        let subscription_token_clone = subscription_token.clone();
1309
1310        let task = get_runtime().spawn(async move {
1311            if let Err(e) = handle_index_price_subscription(
1312                client_clone,
1313                contract,
1314                instrument_id,
1315                price_precision,
1316                price_magnifier,
1317                data_sender,
1318                clock,
1319                subscription_token_clone,
1320            )
1321            .await
1322            {
1323                tracing::error!(
1324                    "Index price subscription error for {}: {:?}",
1325                    instrument_id,
1326                    e
1327                );
1328            }
1329        });
1330
1331        self.tasks.push(task);
1332
1333        let mut subscriptions = self.subscriptions.blocking_lock();
1334        subscriptions.insert(
1335            cmd.instrument_id,
1336            SubscriptionInfo {
1337                instrument_id: cmd.instrument_id,
1338                subscription_type: SubscriptionType::IndexPrices,
1339                cancellation_token: subscription_token,
1340            },
1341        );
1342
1343        tracing::info!("Index price subscription started for {}", cmd.instrument_id);
1344        Ok(())
1345    }
1346
1347    fn subscribe_option_greeks(&mut self, cmd: SubscribeOptionGreeks) -> anyhow::Result<()> {
1348        tracing::debug!("Subscribing to option greeks for {}", cmd.instrument_id);
1349
1350        let client = self
1351            .ib_client
1352            .as_ref()
1353            .context("IB client not connected. Call connect() first")?;
1354
1355        let instrument = self
1356            .instrument_provider
1357            .find(&cmd.instrument_id)
1358            .context(format!(
1359                "Instrument {} not found in provider",
1360                cmd.instrument_id
1361            ))?;
1362
1363        if !matches!(
1364            instrument,
1365            InstrumentAny::OptionContract(_)
1366                | InstrumentAny::FuturesContract(_)
1367                | InstrumentAny::CryptoOption(_)
1368        ) && !matches!(
1369            self.instrument_provider
1370                .resolve_contract_for_instrument(cmd.instrument_id)?
1371                .security_type,
1372            SecurityType::Option | SecurityType::FuturesOption
1373        ) {
1374            tracing::warn!(
1375                "Option greeks subscription is only supported for option instruments: {}",
1376                cmd.instrument_id
1377            );
1378            return Ok(());
1379        }
1380
1381        let contract = self
1382            .instrument_provider
1383            .resolve_contract_for_instrument(cmd.instrument_id)
1384            .context("Failed to convert instrument_id to IB contract")?;
1385
1386        let instrument_id = cmd.instrument_id;
1387        let data_sender = self.data_sender.clone();
1388        let option_greeks_cache = Arc::clone(&self.option_greeks_cache);
1389        let clock = self.clock;
1390        let subscription_token = CancellationToken::new();
1391        let subscription_token_clone = subscription_token.clone();
1392        let client_clone = client.as_arc().clone();
1393
1394        let task = get_runtime().spawn(async move {
1395            if let Err(e) = handle_option_greeks_subscription(
1396                client_clone,
1397                contract,
1398                instrument_id,
1399                data_sender,
1400                option_greeks_cache,
1401                clock,
1402                subscription_token_clone,
1403            )
1404            .await
1405            {
1406                tracing::error!(
1407                    "Option greeks subscription error for {}: {:?}",
1408                    instrument_id,
1409                    e
1410                );
1411            }
1412        });
1413
1414        self.tasks.push(task);
1415
1416        let mut subscriptions = self.option_greeks_subscriptions.blocking_lock();
1417        if let Some(existing) = subscriptions.insert(cmd.instrument_id, subscription_token) {
1418            existing.cancel();
1419        }
1420
1421        tracing::info!(
1422            "Option greeks subscription started for {}",
1423            cmd.instrument_id
1424        );
1425        Ok(())
1426    }
1427
1428    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
1429        tracing::debug!("Unsubscribing from quotes for {}", cmd.instrument_id);
1430
1431        let mut subscriptions = self.subscriptions.blocking_lock();
1432        if let Some(sub_info) = subscriptions.remove(&cmd.instrument_id) {
1433            sub_info.cancellation_token.cancel();
1434            tracing::info!("Unsubscribed from quotes for {}", cmd.instrument_id);
1435        } else {
1436            tracing::warn!(
1437                "No active quote subscription found for {}",
1438                cmd.instrument_id
1439            );
1440        }
1441
1442        // Clear quote cache for this instrument
1443        {
1444            // Quote cache doesn't have per-instrument clear, but we can clear all
1445            // In practice, the cache will naturally expire as new quotes arrive
1446        }
1447
1448        Ok(())
1449    }
1450
1451    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1452        tracing::debug!("Unsubscribing from index prices for {}", cmd.instrument_id);
1453
1454        let mut subscriptions = self.subscriptions.blocking_lock();
1455        if let Some(sub_info) = subscriptions.remove(&cmd.instrument_id) {
1456            sub_info.cancellation_token.cancel();
1457            tracing::info!("Unsubscribed from index prices for {}", cmd.instrument_id);
1458        } else {
1459            tracing::warn!(
1460                "No active index price subscription found for {}",
1461                cmd.instrument_id
1462            );
1463        }
1464
1465        Ok(())
1466    }
1467
1468    fn unsubscribe_option_greeks(&mut self, cmd: &UnsubscribeOptionGreeks) -> anyhow::Result<()> {
1469        tracing::debug!("Unsubscribing from option greeks for {}", cmd.instrument_id);
1470
1471        let mut subscriptions = self.option_greeks_subscriptions.blocking_lock();
1472        if let Some(subscription_token) = subscriptions.remove(&cmd.instrument_id) {
1473            subscription_token.cancel();
1474            tracing::info!("Unsubscribed from option greeks for {}", cmd.instrument_id);
1475        } else {
1476            tracing::warn!(
1477                "No active option greeks subscription found for {}",
1478                cmd.instrument_id
1479            );
1480        }
1481
1482        Ok(())
1483    }
1484
1485    fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
1486        tracing::debug!("Subscribing to trades for {}", cmd.instrument_id);
1487
1488        let client = self
1489            .ib_client
1490            .as_ref()
1491            .context("IB client not connected. Call connect() first")?;
1492
1493        // Get instrument from provider
1494        let instrument = self
1495            .instrument_provider
1496            .find(&cmd.instrument_id)
1497            .context(format!(
1498                "Instrument {} not found in provider",
1499                cmd.instrument_id
1500            ))?;
1501
1502        // Check if instrument is a CurrencyPair (IB doesn't support trades for CurrencyPair)
1503        if matches!(instrument, InstrumentAny::CurrencyPair(_)) {
1504            tracing::error!(
1505                "Interactive Brokers does not support trades for CurrencyPair instruments: {}",
1506                cmd.instrument_id
1507            );
1508            return Ok(());
1509        }
1510
1511        let price_precision = instrument.price_precision();
1512        let size_precision = instrument.size_precision();
1513
1514        // Convert instrument_id to IB contract
1515        let contract = self
1516            .instrument_provider
1517            .resolve_contract_for_instrument(cmd.instrument_id)
1518            .context("Failed to convert instrument_id to IB contract")?;
1519
1520        let instrument_id = cmd.instrument_id;
1521        let data_sender = self.data_sender.clone();
1522        let clock = self.clock;
1523
1524        // Create subscription-specific cancellation token
1525        let subscription_token = CancellationToken::new();
1526
1527        // Spawn subscription task
1528        let client_clone = client.as_arc().clone();
1529        let subscription_token_clone = subscription_token.clone();
1530
1531        let task = get_runtime().spawn(async move {
1532            if let Err(e) = handle_trade_subscription(
1533                client_clone,
1534                contract,
1535                instrument_id,
1536                price_precision,
1537                size_precision,
1538                data_sender,
1539                clock,
1540                subscription_token_clone,
1541            )
1542            .await
1543            {
1544                tracing::error!("Trade subscription error for {}: {:?}", instrument_id, e);
1545            }
1546        });
1547
1548        self.tasks.push(task);
1549
1550        // Record subscription
1551        let mut subscriptions = self.subscriptions.blocking_lock();
1552        subscriptions.insert(
1553            cmd.instrument_id,
1554            SubscriptionInfo {
1555                instrument_id: cmd.instrument_id,
1556                subscription_type: SubscriptionType::Trades,
1557                cancellation_token: subscription_token,
1558            },
1559        );
1560
1561        tracing::info!("Trade subscription started for {}", cmd.instrument_id);
1562        Ok(())
1563    }
1564
1565    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
1566        tracing::debug!("Unsubscribing from trades for {}", cmd.instrument_id);
1567
1568        let mut subscriptions = self.subscriptions.blocking_lock();
1569        if let Some(sub_info) = subscriptions.remove(&cmd.instrument_id) {
1570            sub_info.cancellation_token.cancel();
1571            tracing::info!("Unsubscribed from trades for {}", cmd.instrument_id);
1572        } else {
1573            tracing::warn!(
1574                "No active trade subscription found for {}",
1575                cmd.instrument_id
1576            );
1577        }
1578
1579        Ok(())
1580    }
1581
1582    fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
1583        tracing::debug!("Subscribing to bars for {}", cmd.bar_type);
1584
1585        let client = self
1586            .ib_client
1587            .as_ref()
1588            .context("IB client not connected. Call connect() first")?;
1589
1590        // Get instrument from provider
1591        let instrument_id = cmd.bar_type.instrument_id();
1592        let instrument = self
1593            .instrument_provider
1594            .find(&instrument_id)
1595            .context(format!("Instrument {instrument_id} not found in provider"))?;
1596
1597        let price_precision = instrument.price_precision();
1598        let size_precision = instrument.size_precision();
1599
1600        // Convert instrument_id to IB contract
1601        let contract = self
1602            .instrument_provider
1603            .resolve_contract_for_instrument(instrument_id)
1604            .context("Failed to convert instrument_id to IB contract")?;
1605
1606        let bar_type = cmd.bar_type;
1607        let bar_type_str = bar_type.to_string();
1608        let data_sender = self.data_sender.clone();
1609        let clock = self.clock;
1610        let last_bars = Arc::clone(&self.last_bars);
1611        let bar_timeout_tasks = Arc::clone(&self.bar_timeout_tasks);
1612        let handle_revised_bars = self.config.handle_revised_bars;
1613        let use_rth = self.config.use_regular_trading_hours;
1614        let start_ns = parse_start_ns(cmd.params.as_ref());
1615
1616        // Create subscription-specific cancellation token
1617        let subscription_token = CancellationToken::new();
1618
1619        // Spawn subscription task
1620        let client_clone = client.as_arc().clone();
1621        let subscription_token_clone = subscription_token.clone();
1622
1623        let task = get_runtime().spawn(async move {
1624            let result = if bar_type.spec().timedelta().num_seconds() == 5 {
1625                handle_realtime_bars_subscription(
1626                    client_clone,
1627                    contract,
1628                    bar_type,
1629                    bar_type_str,
1630                    instrument_id,
1631                    price_precision,
1632                    size_precision,
1633                    data_sender,
1634                    clock,
1635                    last_bars,
1636                    bar_timeout_tasks,
1637                    handle_revised_bars,
1638                    subscription_token_clone,
1639                )
1640                .await
1641            } else {
1642                handle_historical_bars_subscription(
1643                    client_clone,
1644                    contract,
1645                    bar_type,
1646                    price_type_to_ib_what_to_show(bar_type.spec().price_type),
1647                    price_precision,
1648                    size_precision,
1649                    use_rth,
1650                    start_ns,
1651                    data_sender,
1652                    handle_revised_bars,
1653                    clock,
1654                    subscription_token_clone,
1655                )
1656                .await
1657            };
1658
1659            if let Err(e) = result {
1660                tracing::error!("Bars subscription error for {}: {:?}", bar_type, e);
1661            }
1662        });
1663
1664        self.tasks.push(task);
1665
1666        // Record subscription
1667        let mut subscriptions = self.subscriptions.blocking_lock();
1668        subscriptions.insert(
1669            instrument_id,
1670            SubscriptionInfo {
1671                instrument_id,
1672                subscription_type: SubscriptionType::Bars,
1673                cancellation_token: subscription_token,
1674            },
1675        );
1676
1677        tracing::info!("Real-time bars subscription started for {}", bar_type);
1678        Ok(())
1679    }
1680
1681    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1682        tracing::debug!("Unsubscribing from bars for {}", cmd.bar_type);
1683
1684        let instrument_id = cmd.bar_type.instrument_id();
1685        let mut subscriptions = self.subscriptions.blocking_lock();
1686        if let Some(sub_info) = subscriptions.remove(&instrument_id) {
1687            sub_info.cancellation_token.cancel();
1688            tracing::info!("Unsubscribed from bars for {}", cmd.bar_type);
1689        } else {
1690            tracing::warn!("No active bar subscription found for {}", cmd.bar_type);
1691        }
1692
1693        Ok(())
1694    }
1695
1696    fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
1697        tracing::debug!("Subscribing to book deltas for {}", cmd.instrument_id);
1698
1699        // Validate book type (IB doesn't support L3_MBO)
1700        if cmd.book_type == BookType::L3_MBO {
1701            tracing::error!(
1702                "Cannot subscribe to order book deltas: L3_MBO data is not published by Interactive Brokers. Valid book types are L1_MBP, L2_MBP"
1703            );
1704            return Ok(());
1705        }
1706
1707        let client = self
1708            .ib_client
1709            .as_ref()
1710            .context("IB client not connected. Call connect() first")?;
1711
1712        // Get instrument from provider
1713        let instrument = self
1714            .instrument_provider
1715            .find(&cmd.instrument_id)
1716            .context(format!(
1717                "Instrument {} not found in provider",
1718                cmd.instrument_id
1719            ))?;
1720
1721        let price_precision = instrument.price_precision();
1722        let size_precision = instrument.size_precision();
1723
1724        // Convert instrument_id to IB contract
1725        let contract = self
1726            .instrument_provider
1727            .resolve_contract_for_instrument(cmd.instrument_id)
1728            .context("Failed to convert instrument_id to IB contract")?;
1729
1730        let instrument_id = cmd.instrument_id;
1731        let data_sender = self.data_sender.clone();
1732        let clock = self.clock;
1733
1734        // Create subscription-specific cancellation token
1735        let subscription_token = CancellationToken::new();
1736
1737        // Get depth from command or default to 20 (Python default)
1738        let depth_rows = cmd.depth.map_or(20, |d| d.get() as i32);
1739
1740        // Get is_smart_depth from params or default to true
1741        let is_smart_depth = cmd
1742            .params
1743            .as_ref()
1744            .and_then(|params| params.get_str("is_smart_depth"))
1745            .is_none_or(|s| s == "true" || s == "True" || s == "1");
1746
1747        // Spawn subscription task
1748        let client_clone = client.as_arc().clone();
1749        let subscription_token_clone = subscription_token.clone();
1750
1751        let task = get_runtime().spawn(async move {
1752            if let Err(e) = handle_market_depth_subscription(
1753                client_clone,
1754                contract,
1755                instrument_id,
1756                price_precision,
1757                size_precision,
1758                depth_rows,
1759                is_smart_depth,
1760                data_sender,
1761                clock,
1762                subscription_token_clone,
1763            )
1764            .await
1765            {
1766                tracing::error!(
1767                    "Market depth subscription error for {}: {:?}",
1768                    instrument_id,
1769                    e
1770                );
1771            }
1772        });
1773
1774        self.tasks.push(task);
1775
1776        // Record subscription
1777        let mut subscriptions = self.subscriptions.blocking_lock();
1778        subscriptions.insert(
1779            cmd.instrument_id,
1780            SubscriptionInfo {
1781                instrument_id: cmd.instrument_id,
1782                subscription_type: SubscriptionType::BookDeltas,
1783                cancellation_token: subscription_token,
1784            },
1785        );
1786
1787        tracing::info!(
1788            "Market depth subscription started for {}",
1789            cmd.instrument_id
1790        );
1791        Ok(())
1792    }
1793
1794    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1795        tracing::debug!("Unsubscribing from book deltas for {}", cmd.instrument_id);
1796
1797        let mut subscriptions = self.subscriptions.blocking_lock();
1798        if let Some(sub_info) = subscriptions.remove(&cmd.instrument_id) {
1799            sub_info.cancellation_token.cancel();
1800            tracing::info!("Unsubscribed from book deltas for {}", cmd.instrument_id);
1801        } else {
1802            tracing::warn!(
1803                "No active book delta subscription found for {}",
1804                cmd.instrument_id
1805            );
1806        }
1807
1808        Ok(())
1809    }
1810
1811    // Request handlers
1812    fn request_instrument(&self, cmd: RequestInstrument) -> anyhow::Result<()> {
1813        tracing::debug!("Requesting instrument: {}", cmd.instrument_id);
1814
1815        // Check if force_instrument_update is requested
1816        let force_update = cmd
1817            .params
1818            .as_ref()
1819            .and_then(|params| params.get_str("force_instrument_update"))
1820            .is_some_and(|s| s == "true" || s == "True" || s == "1");
1821
1822        // Get instrument from provider (or load if not found or force_update)
1823        let instrument =
1824            if force_update || self.instrument_provider.find(&cmd.instrument_id).is_none() {
1825                // Need to load instrument - spawn async task
1826                let client = self
1827                    .ib_client
1828                    .as_ref()
1829                    .context("IB client not connected. Call connect() first")?;
1830                let instrument_provider = Arc::clone(&self.instrument_provider);
1831                let instrument_id = cmd.instrument_id;
1832                let data_sender = self.data_sender.clone();
1833                let clock = self.clock;
1834                let request_id = cmd.request_id;
1835                let client_id = cmd.client_id.unwrap_or(self.client_id);
1836                let params = cmd.params.clone();
1837                let start_nanos = cmd.start.map(|dt| {
1838                    UnixNanos::from(
1839                        dt.timestamp_nanos_opt()
1840                            .unwrap_or_else(|| dt.timestamp() * 1_000_000_000)
1841                            as u64,
1842                    )
1843                });
1844                let end_nanos = cmd.end.map(|dt| {
1845                    UnixNanos::from(
1846                        dt.timestamp_nanos_opt()
1847                            .unwrap_or_else(|| dt.timestamp() * 1_000_000_000)
1848                            as u64,
1849                    )
1850                });
1851
1852                let client_clone = client.as_arc().clone();
1853
1854                get_runtime().spawn(async move {
1855                    if let Err(e) = instrument_provider
1856                        .fetch_contract_details(&client_clone, instrument_id, false, None)
1857                        .await
1858                    {
1859                        tracing::error!(
1860                            "Failed to fetch contract details for {}: {:?}",
1861                            instrument_id,
1862                            e
1863                        );
1864                        return;
1865                    }
1866
1867                    if let Some(instrument) = instrument_provider.find(&instrument_id) {
1868                        let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1869                            request_id,
1870                            client_id,
1871                            instrument_id,
1872                            instrument,
1873                            start_nanos,
1874                            end_nanos,
1875                            clock.get_time_ns(),
1876                            params,
1877                        )));
1878
1879                        if let Err(e) = data_sender.send(DataEvent::Response(response)) {
1880                            tracing::error!("Failed to send instrument response: {e}");
1881                        }
1882                    }
1883                });
1884
1885                // Return early, response will be sent async
1886                return Ok(());
1887            } else {
1888                // Instrument already in provider
1889                self.instrument_provider
1890                    .find(&cmd.instrument_id)
1891                    .context(format!(
1892                        "Instrument {} not found in provider",
1893                        cmd.instrument_id
1894                    ))?
1895            };
1896
1897        let start_nanos = cmd.start.map(|dt| {
1898            UnixNanos::from(
1899                dt.timestamp_nanos_opt()
1900                    .unwrap_or_else(|| dt.timestamp() * 1_000_000_000) as u64,
1901            )
1902        });
1903        let end_nanos = cmd.end.map(|dt| {
1904            UnixNanos::from(
1905                dt.timestamp_nanos_opt()
1906                    .unwrap_or_else(|| dt.timestamp() * 1_000_000_000) as u64,
1907            )
1908        });
1909
1910        let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1911            cmd.request_id,
1912            cmd.client_id.unwrap_or(self.client_id),
1913            cmd.instrument_id,
1914            instrument,
1915            start_nanos,
1916            end_nanos,
1917            self.clock.get_time_ns(),
1918            cmd.params,
1919        )));
1920
1921        if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
1922            tracing::error!("Failed to send instrument response: {e}");
1923        }
1924
1925        Ok(())
1926    }
1927
1928    fn request_instruments(&self, cmd: RequestInstruments) -> anyhow::Result<()> {
1929        tracing::debug!("Requesting all instruments for venue: {:?}", cmd.venue);
1930
1931        let client = self
1932            .ib_client
1933            .as_ref()
1934            .context("IB client not connected. Call connect() first")?;
1935
1936        // Check for force_instrument_update
1937        let force_update = cmd
1938            .params
1939            .as_ref()
1940            .and_then(|params| params.get_str("force_instrument_update"))
1941            .is_some_and(|s| s == "true" || s == "True" || s == "1");
1942
1943        // Check if ib_contracts parameter is provided for batch loading
1944        let mut contracts_to_load: Vec<ibapi::contracts::Contract> = Vec::new();
1945
1946        if let Some(params) = &cmd.params
1947            && let Some(ib_contracts_json_str) = params.get_str("ib_contracts")
1948        {
1949            // Parse JSON string containing array of contracts
1950            match crate::common::contracts::parse_contracts_from_json_array(ib_contracts_json_str) {
1951                Ok(contracts) => {
1952                    tracing::info!(
1953                        "Parsed {} contracts from ib_contracts JSON",
1954                        contracts.len()
1955                    );
1956                    log::debug!("Parsed ib_contracts payload: {}", ib_contracts_json_str);
1957                    contracts_to_load = contracts;
1958                }
1959                Err(e) => {
1960                    tracing::warn!(
1961                        "Failed to parse ib_contracts JSON: {}. Continuing without contracts",
1962                        e
1963                    );
1964                }
1965            }
1966        }
1967
1968        // If force_update is requested or we need to batch load, spawn async task
1969        let instrument_provider = Arc::clone(&self.instrument_provider);
1970        let client_clone = client.as_arc().clone();
1971        let data_sender = self.data_sender.clone();
1972        let clock = self.clock;
1973        let request_id = cmd.request_id;
1974        let client_id = cmd.client_id.unwrap_or(self.client_id);
1975        let venue = cmd.venue.unwrap_or(*IB_VENUE);
1976        let params = cmd.params.clone();
1977        let start_nanos = cmd.start.map(|dt| {
1978            UnixNanos::from(
1979                dt.timestamp_nanos_opt()
1980                    .unwrap_or_else(|| dt.timestamp() * 1_000_000_000) as u64,
1981            )
1982        });
1983        let end_nanos = cmd.end.map(|dt| {
1984            UnixNanos::from(
1985                dt.timestamp_nanos_opt()
1986                    .unwrap_or_else(|| dt.timestamp() * 1_000_000_000) as u64,
1987            )
1988        });
1989
1990        // Handle batch loading if contracts are provided or force_update is requested
1991        if !contracts_to_load.is_empty() || force_update {
1992            let contracts_to_load_clone = contracts_to_load;
1993
1994            get_runtime().spawn(async move {
1995                let mut loaded_instrument_ids = Vec::new();
1996
1997                // Load instruments from contracts if provided
1998                if !contracts_to_load_clone.is_empty() {
1999                    for contract in contracts_to_load_clone {
2000                        log::debug!(
2001                            "Loading instrument from IB contract spec (sec_type={:?}, symbol={}, local_symbol={}, exchange={}, expiry={})",
2002                            contract.security_type,
2003                            contract.symbol.as_str(),
2004                            contract.local_symbol.as_str(),
2005                            contract.exchange.as_str(),
2006                            contract.last_trade_date_or_contract_month.as_str()
2007                        );
2008                        // Convert contract to instrument ID and load
2009                        if let Ok(instrument_id) =
2010                            crate::common::parse::ib_contract_to_instrument_id_simple(&contract)
2011                        {
2012                            if instrument_provider.find(&instrument_id).is_none() {
2013                                if let Err(e) = instrument_provider
2014                                    .fetch_contract_details(
2015                                        &client_clone,
2016                                        instrument_id,
2017                                        false,
2018                                        None,
2019                                    )
2020                                    .await
2021                                {
2022                                    tracing::warn!(
2023                                        "Failed to load contract for {}: {}",
2024                                        instrument_id,
2025                                        e
2026                                    );
2027                                } else {
2028                                    loaded_instrument_ids.push(instrument_id);
2029                                }
2030                            } else {
2031                                loaded_instrument_ids.push(instrument_id);
2032                            }
2033                        }
2034                    }
2035                }
2036
2037                // If force_update, also reload all existing instruments
2038                if force_update {
2039                    let all_instrument_ids: Vec<InstrumentId> = instrument_provider
2040                        .get_all()
2041                        .into_iter()
2042                        .map(|inst| inst.id())
2043                        .collect();
2044
2045                    if !all_instrument_ids.is_empty()
2046                        && let Ok(mut reloaded_ids) = instrument_provider
2047                            .batch_load(&client_clone, all_instrument_ids, None)
2048                            .await
2049                    {
2050                        loaded_instrument_ids.append(&mut reloaded_ids);
2051                    }
2052                }
2053
2054                // Get all instruments from provider after loading
2055                let instruments = instrument_provider.get_all();
2056
2057                let response = DataResponse::Instruments(InstrumentsResponse::new(
2058                    request_id,
2059                    client_id,
2060                    venue,
2061                    instruments,
2062                    start_nanos,
2063                    end_nanos,
2064                    clock.get_time_ns(),
2065                    params,
2066                ));
2067
2068                if let Err(e) = data_sender.send(DataEvent::Response(response)) {
2069                    tracing::error!("Failed to send instruments response: {e}");
2070                } else {
2071                    tracing::info!(
2072                        "Successfully sent {} instruments response (loaded {} new instruments)",
2073                        instrument_provider.count(),
2074                        loaded_instrument_ids.len()
2075                    );
2076                }
2077            });
2078        } else {
2079            // Get all instruments from provider (no loading needed)
2080            let instruments = self.instrument_provider.get_all();
2081
2082            let response = DataResponse::Instruments(InstrumentsResponse::new(
2083                cmd.request_id,
2084                cmd.client_id.unwrap_or(self.client_id),
2085                venue,
2086                instruments,
2087                start_nanos,
2088                end_nanos,
2089                self.clock.get_time_ns(),
2090                cmd.params,
2091            ));
2092
2093            if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
2094                tracing::error!("Failed to send instruments response: {e}");
2095            } else {
2096                tracing::info!(
2097                    "Successfully sent {} instruments response",
2098                    self.instrument_provider.count()
2099                );
2100            }
2101        }
2102
2103        Ok(())
2104    }
2105
2106    fn request_quotes(&self, cmd: RequestQuotes) -> anyhow::Result<()> {
2107        tracing::debug!("Requesting quotes for {}", cmd.instrument_id);
2108
2109        let client = self
2110            .ib_client
2111            .as_ref()
2112            .context("IB client not connected. Call connect() first")?;
2113
2114        // Get instrument from provider
2115        let instrument = self
2116            .instrument_provider
2117            .find(&cmd.instrument_id)
2118            .context(format!(
2119                "Instrument {} not found in provider",
2120                cmd.instrument_id
2121            ))?;
2122
2123        let price_precision = instrument.price_precision();
2124        let size_precision = instrument.size_precision();
2125
2126        // Convert instrument_id to IB contract
2127        let contract = self
2128            .instrument_provider
2129            .resolve_contract_for_instrument(cmd.instrument_id)
2130            .context("Failed to convert instrument_id to IB contract")?;
2131
2132        // Determine number of ticks from limit or default to 1000
2133        let number_of_ticks = cmd.limit.map_or(1000, |l| l.get() as i32).min(1000);
2134
2135        let instrument_id = cmd.instrument_id;
2136        let data_sender = self.data_sender.clone();
2137        let clock = self.clock;
2138        let request_id = cmd.request_id;
2139        let client_id = cmd.client_id.unwrap_or(self.client_id);
2140        let params = cmd.params.clone();
2141        let start_nanos = cmd.start.map(|dt| {
2142            UnixNanos::from(
2143                dt.timestamp_nanos_opt()
2144                    .unwrap_or_else(|| dt.timestamp() * 1_000_000_000) as u64,
2145            )
2146        });
2147        let end_nanos = cmd.end.map(|dt| {
2148            UnixNanos::from(
2149                dt.timestamp_nanos_opt()
2150                    .unwrap_or_else(|| dt.timestamp() * 1_000_000_000) as u64,
2151            )
2152        });
2153
2154        // Spawn async task to handle the request with pagination
2155        let client_clone = client.as_arc().clone();
2156        let limit = cmd.limit.map_or(1000, |l| l.get());
2157        let start_nanos_clone = start_nanos;
2158        let end_nanos_clone = end_nanos;
2159        let cmd_start = cmd.start;
2160        let cmd_end = cmd.end;
2161
2162        get_runtime().spawn(async move {
2163            let mut all_quotes = Vec::new();
2164            // Work backwards from end_date, updating end to the earliest tick received
2165            let mut current_end_date = cmd_end;
2166            if current_end_date.is_none() {
2167                current_end_date = Some(chrono::Utc::now());
2168            }
2169            let current_start_date = cmd_start;
2170
2171            // Pagination loop: continue while (start exists and end > start) or (len < limit)
2172            loop {
2173                let should_continue =
2174                    if let (Some(start), Some(end)) = (current_start_date, current_end_date) {
2175                        end > start
2176                    } else {
2177                        false
2178                    };
2179
2180                if !should_continue && all_quotes.len() >= limit {
2181                    break;
2182                }
2183
2184                let current_end_ib = current_end_date.as_ref().map(chrono_to_ib_datetime);
2185
2186                // Make request for this batch
2187                match client_clone
2188                    .historical_ticks_bid_ask(
2189                        &contract,
2190                        current_start_date.as_ref().map(chrono_to_ib_datetime),
2191                        current_end_ib,
2192                        number_of_ticks,
2193                        ibapi::market_data::TradingHours::Regular,
2194                        false, // ignore_size
2195                    )
2196                    .await
2197                {
2198                    Ok(mut subscription) => {
2199                        let mut batch_quotes = Vec::new();
2200
2201                        while let Some(tick) = subscription.next().await {
2202                            let ts_event =
2203                                super::convert::ib_timestamp_to_unix_nanos(&tick.timestamp);
2204                            let ts_init = clock.get_time_ns();
2205
2206                            match super::parse::parse_quote_tick(
2207                                instrument_id,
2208                                Some(tick.price_bid),
2209                                Some(tick.price_ask),
2210                                Some(tick.size_bid as f64),
2211                                Some(tick.size_ask as f64),
2212                                price_precision,
2213                                size_precision,
2214                                ts_event,
2215                                ts_init,
2216                            ) {
2217                                Ok(quote_tick) => batch_quotes.push(quote_tick),
2218                                Err(e) => {
2219                                    tracing::warn!("Failed to parse quote tick: {:?}", e);
2220                                }
2221                            }
2222                        }
2223
2224                        if batch_quotes.is_empty() {
2225                            break;
2226                        }
2227
2228                        // Update current_end_date to the minimum ts_init from this batch for next iteration
2229                        // This works backwards in time
2230                        if let Some(min_tick) = batch_quotes.iter().min_by_key(|t| t.ts_init) {
2231                            let min_ts_nanos = min_tick.ts_init.as_u64();
2232                            // Convert UnixNanos to DateTime<Utc>
2233                            let min_ts_seconds = (min_ts_nanos / 1_000_000_000) as i64;
2234                            let min_ts_nanos_remainder = (min_ts_nanos % 1_000_000_000) as u32;
2235                            current_end_date = chrono::DateTime::from_timestamp(
2236                                min_ts_seconds,
2237                                min_ts_nanos_remainder,
2238                            );
2239                        }
2240
2241                        all_quotes.extend(batch_quotes);
2242
2243                        // Check if we should continue - need start and current_end > start
2244                        if let (Some(start_dt), Some(end_dt)) =
2245                            (current_start_date, current_end_date)
2246                            && end_dt <= start_dt
2247                        {
2248                            // Filter out quotes after end_date_time and before start
2249                            if let Some(end_limit) = end_nanos_clone {
2250                                all_quotes.retain(|q| q.ts_init <= end_limit);
2251                            }
2252
2253                            if let Some(start_limit) = start_nanos_clone {
2254                                all_quotes.retain(|q| q.ts_init >= start_limit);
2255                            }
2256                            break;
2257                        }
2258
2259                        // Break if we've reached the limit
2260                        if all_quotes.len() >= limit {
2261                            break;
2262                        }
2263                    }
2264                    Err(e) => {
2265                        tracing::error!(
2266                            "Historical quotes request failed for {}: {:?}",
2267                            instrument_id,
2268                            e
2269                        );
2270                        break;
2271                    }
2272                }
2273            }
2274
2275            // Filter out ticks after end_date_time if specified
2276            if let Some(end_limit) = end_nanos_clone {
2277                all_quotes.retain(|q| q.ts_init <= end_limit);
2278            }
2279
2280            // Sort by ts_init
2281            all_quotes.sort_by_key(|q| q.ts_init);
2282
2283            let quotes_count = all_quotes.len();
2284            let response = DataResponse::Quotes(QuotesResponse::new(
2285                request_id,
2286                client_id,
2287                instrument_id,
2288                all_quotes,
2289                start_nanos_clone,
2290                end_nanos_clone,
2291                clock.get_time_ns(),
2292                params,
2293            ));
2294
2295            if let Err(e) = data_sender.send(DataEvent::Response(response)) {
2296                tracing::error!("Failed to send quotes response: {e}");
2297            } else {
2298                tracing::info!(
2299                    "Successfully sent {} quotes for {}",
2300                    quotes_count,
2301                    instrument_id
2302                );
2303            }
2304        });
2305
2306        Ok(())
2307    }
2308
2309    fn request_trades(&self, cmd: RequestTrades) -> anyhow::Result<()> {
2310        tracing::debug!("Requesting trades for {}", cmd.instrument_id);
2311
2312        let client = self
2313            .ib_client
2314            .as_ref()
2315            .context("IB client not connected. Call connect() first")?;
2316
2317        // Get instrument from provider
2318        let instrument = self
2319            .instrument_provider
2320            .find(&cmd.instrument_id)
2321            .context(format!(
2322                "Instrument {} not found in provider",
2323                cmd.instrument_id
2324            ))?;
2325
2326        // Check if instrument is a CurrencyPair (IB doesn't support trades for CurrencyPair)
2327        if matches!(instrument, InstrumentAny::CurrencyPair(_)) {
2328            tracing::error!(
2329                "Interactive Brokers does not support trades for CurrencyPair instruments: {}",
2330                cmd.instrument_id
2331            );
2332            return Ok(());
2333        }
2334
2335        let price_precision = instrument.price_precision();
2336        let size_precision = instrument.size_precision();
2337
2338        // Convert instrument_id to IB contract
2339        let contract = self
2340            .instrument_provider
2341            .resolve_contract_for_instrument(cmd.instrument_id)
2342            .context("Failed to convert instrument_id to IB contract")?;
2343
2344        // Determine number of ticks from limit or default to 1000
2345        let number_of_ticks = cmd.limit.map_or(1000, |l| l.get() as i32).min(1000);
2346
2347        let instrument_id = cmd.instrument_id;
2348        let data_sender = self.data_sender.clone();
2349        let clock = self.clock;
2350        let request_id = cmd.request_id;
2351        let client_id = cmd.client_id.unwrap_or(self.client_id);
2352        let params = cmd.params.clone();
2353        let start_nanos = cmd.start.map(|dt| {
2354            UnixNanos::from(
2355                dt.timestamp_nanos_opt()
2356                    .unwrap_or_else(|| dt.timestamp() * 1_000_000_000) as u64,
2357            )
2358        });
2359        let end_nanos = cmd.end.map(|dt| {
2360            UnixNanos::from(
2361                dt.timestamp_nanos_opt()
2362                    .unwrap_or_else(|| dt.timestamp() * 1_000_000_000) as u64,
2363            )
2364        });
2365
2366        // Spawn async task to handle the request with pagination
2367        let client_clone = client.as_arc().clone();
2368        let limit = cmd.limit.map_or(1000, |l| l.get());
2369        let start_nanos_clone = start_nanos;
2370        let end_nanos_clone = end_nanos;
2371        let cmd_start = cmd.start;
2372        let cmd_end = cmd.end;
2373
2374        get_runtime().spawn(async move {
2375            let mut all_trades = Vec::new();
2376            // Work backwards from end_date, updating end to the earliest tick received
2377            let mut current_end_date = cmd_end;
2378            if current_end_date.is_none() {
2379                current_end_date = Some(chrono::Utc::now());
2380            }
2381            let current_start_date = cmd_start;
2382
2383            // Pagination loop: continue while (start exists and end > start) or (len < limit)
2384            loop {
2385                let should_continue =
2386                    if let (Some(start), Some(end)) = (current_start_date, current_end_date) {
2387                        end > start
2388                    } else {
2389                        false
2390                    };
2391
2392                if !should_continue && all_trades.len() >= limit {
2393                    break;
2394                }
2395
2396                let current_end_ib = current_end_date.as_ref().map(chrono_to_ib_datetime);
2397
2398                // Make request for this batch
2399                match client_clone
2400                    .historical_ticks_trade(
2401                        &contract,
2402                        current_start_date.as_ref().map(chrono_to_ib_datetime),
2403                        current_end_ib,
2404                        number_of_ticks,
2405                        ibapi::market_data::TradingHours::Regular,
2406                    )
2407                    .await
2408                {
2409                    Ok(mut subscription) => {
2410                        let mut batch_trades = Vec::new();
2411
2412                        while let Some(tick) = subscription.next().await {
2413                            let ts_event =
2414                                super::convert::ib_timestamp_to_unix_nanos(&tick.timestamp);
2415                            let ts_init = clock.get_time_ns();
2416
2417                            // Generate trade ID from exchange and special conditions if available
2418                            let trade_id = None;
2419
2420                            match super::parse::parse_trade_tick(
2421                                instrument_id,
2422                                tick.price,
2423                                tick.size as f64,
2424                                price_precision,
2425                                size_precision,
2426                                ts_event,
2427                                ts_init,
2428                                trade_id,
2429                            ) {
2430                                Ok(trade_tick) => batch_trades.push(trade_tick),
2431                                Err(e) => {
2432                                    tracing::warn!("Failed to parse trade tick: {:?}", e);
2433                                }
2434                            }
2435                        }
2436
2437                        if batch_trades.is_empty() {
2438                            break;
2439                        }
2440
2441                        // Update current_end_date to the minimum ts_init from this batch for next iteration
2442                        // This works backwards in time
2443                        if let Some(min_tick) = batch_trades.iter().min_by_key(|t| t.ts_init) {
2444                            let min_ts_nanos = min_tick.ts_init.as_u64();
2445                            // Convert UnixNanos to DateTime<Utc>
2446                            let min_ts_seconds = (min_ts_nanos / 1_000_000_000) as i64;
2447                            let min_ts_nanos_remainder = (min_ts_nanos % 1_000_000_000) as u32;
2448                            current_end_date = chrono::DateTime::from_timestamp(
2449                                min_ts_seconds,
2450                                min_ts_nanos_remainder,
2451                            );
2452                        }
2453
2454                        all_trades.extend(batch_trades);
2455
2456                        // Check if we should continue - need start and current_end > start
2457                        if let (Some(start_dt), Some(end_dt)) =
2458                            (current_start_date, current_end_date)
2459                            && end_dt <= start_dt
2460                        {
2461                            // Filter out trades after end_date_time and before start
2462                            if let Some(end_limit) = end_nanos_clone {
2463                                all_trades.retain(|t| t.ts_init <= end_limit);
2464                            }
2465
2466                            if let Some(start_limit) = start_nanos_clone {
2467                                all_trades.retain(|t| t.ts_init >= start_limit);
2468                            }
2469                            break;
2470                        }
2471
2472                        // Break if we've reached the limit
2473                        if all_trades.len() >= limit {
2474                            break;
2475                        }
2476                    }
2477                    Err(e) => {
2478                        tracing::error!(
2479                            "Historical trades request failed for {}: {:?}",
2480                            instrument_id,
2481                            e
2482                        );
2483                        break;
2484                    }
2485                }
2486            }
2487
2488            // Filter out ticks after end_date_time if specified
2489            if let Some(end_limit) = end_nanos_clone {
2490                all_trades.retain(|t| t.ts_init <= end_limit);
2491            }
2492
2493            // Sort by ts_init
2494            all_trades.sort_by_key(|t| t.ts_init);
2495
2496            let trades_count = all_trades.len();
2497            let response = DataResponse::Trades(TradesResponse::new(
2498                request_id,
2499                client_id,
2500                instrument_id,
2501                all_trades,
2502                start_nanos_clone,
2503                end_nanos_clone,
2504                clock.get_time_ns(),
2505                params,
2506            ));
2507
2508            if let Err(e) = data_sender.send(DataEvent::Response(response)) {
2509                tracing::error!("Failed to send trades response: {e}");
2510            } else {
2511                tracing::info!(
2512                    "Successfully sent {} trades for {}",
2513                    trades_count,
2514                    instrument_id
2515                );
2516            }
2517        });
2518
2519        Ok(())
2520    }
2521
2522    fn request_bars(&self, cmd: RequestBars) -> anyhow::Result<()> {
2523        tracing::debug!("Requesting bars for {}", cmd.bar_type);
2524
2525        // Validate bar spec (only time-aggregated bars are supported)
2526        if !cmd.bar_type.spec().is_time_aggregated() {
2527            tracing::error!(
2528                "Cannot request {} bars: only time bars are aggregated by Interactive Brokers",
2529                cmd.bar_type
2530            );
2531            return Ok(());
2532        }
2533
2534        let client = self
2535            .ib_client
2536            .as_ref()
2537            .context("IB client not connected. Call connect() first")?;
2538
2539        // Get instrument from provider
2540        let instrument_id = cmd.bar_type.instrument_id();
2541        let instrument = self
2542            .instrument_provider
2543            .find(&instrument_id)
2544            .context(format!("Instrument {instrument_id} not found in provider"))?;
2545
2546        let price_precision = instrument.price_precision();
2547        let size_precision = instrument.size_precision();
2548
2549        // Convert instrument_id to IB contract
2550        let contract = self
2551            .instrument_provider
2552            .resolve_contract_for_instrument(instrument_id)
2553            .context("Failed to convert instrument_id to IB contract")?;
2554
2555        // Convert bar type to IB formats
2556        let ib_bar_size = bar_type_to_ib_bar_size(&cmd.bar_type)
2557            .context("Failed to convert bar type to IB bar size")?;
2558        let ib_what_to_show = price_type_to_ib_what_to_show(cmd.bar_type.spec().price_type);
2559
2560        // Calculate segments to break down the request if needed
2561        let segments = if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
2562            calculate_duration_segments(start, end)
2563        } else {
2564            let end_date = cmd.end.unwrap_or_else(chrono::Utc::now);
2565            let duration = calculate_duration(cmd.start, cmd.end).unwrap_or_else(|_| 1i32.days());
2566            vec![(end_date, duration)]
2567        };
2568
2569        let bar_type = cmd.bar_type;
2570        let data_sender = self.data_sender.clone();
2571        let clock = self.clock;
2572        let request_id = cmd.request_id;
2573        let client_id = cmd.client_id.unwrap_or(self.client_id);
2574        let params = cmd.params.clone();
2575        let start_nanos = cmd.start.map(|dt| {
2576            UnixNanos::from(
2577                dt.timestamp_nanos_opt()
2578                    .unwrap_or_else(|| dt.timestamp() * 1_000_000_000) as u64,
2579            )
2580        });
2581        let end_nanos = cmd.end.map(|dt| {
2582            UnixNanos::from(
2583                dt.timestamp_nanos_opt()
2584                    .unwrap_or_else(|| dt.timestamp() * 1_000_000_000) as u64,
2585            )
2586        });
2587
2588        // Spawn async task to handle the request with segmentation
2589        let client_clone = client.as_arc().clone();
2590
2591        get_runtime().spawn(async move {
2592            let mut all_bars = Vec::new();
2593
2594            for (seg_end, seg_duration) in segments {
2595                let end_ib = chrono_to_ib_datetime(&seg_end);
2596
2597                match client_clone
2598                    .historical_data(
2599                        &contract,
2600                        Some(end_ib),
2601                        seg_duration,
2602                        ib_bar_size,
2603                        Some(ib_what_to_show),
2604                        ibapi::market_data::TradingHours::Regular,
2605                    )
2606                    .await
2607                {
2608                    Ok(historical_data) => {
2609                        // Convert IB bars to Nautilus bars
2610                        for ib_bar in &historical_data.bars {
2611                            match ib_bar_to_nautilus_bar(
2612                                ib_bar,
2613                                bar_type,
2614                                price_precision,
2615                                size_precision,
2616                            ) {
2617                                Ok(bar) => all_bars.push(bar),
2618                                Err(e) => {
2619                                    tracing::warn!(
2620                                        "Failed to convert IB bar to Nautilus bar: {:?}",
2621                                        e
2622                                    );
2623                                }
2624                            }
2625                        }
2626                    }
2627                    Err(e) => {
2628                        tracing::error!(
2629                            "Historical data request failed for {} segment: {:?}",
2630                            bar_type,
2631                            e
2632                        );
2633                        // We continue with other segments if one fails?
2634                        // For now keep going to return what we have
2635                    }
2636                }
2637            }
2638
2639            // Return aggregated results
2640            let bars_count = all_bars.len();
2641            if bars_count == 0 {
2642                tracing::warn!("No bar data received for {}", bar_type);
2643            }
2644
2645            // Sort bars by timestamp as segments might overlap or be out of order from IB
2646            all_bars.sort_by_key(|b| b.ts_event);
2647
2648            let response = DataResponse::Bars(BarsResponse::new(
2649                request_id,
2650                client_id,
2651                bar_type,
2652                all_bars,
2653                start_nanos,
2654                end_nanos,
2655                clock.get_time_ns(),
2656                params,
2657            ));
2658
2659            if let Err(e) = data_sender.send(DataEvent::Response(response)) {
2660                tracing::error!("Failed to send bars response: {e}");
2661            } else {
2662                tracing::info!(
2663                    "Successfully sent {} bars for {} (segmented)",
2664                    bars_count,
2665                    bar_type
2666                );
2667            }
2668        });
2669
2670        Ok(())
2671    }
2672}
2673
2674impl Drop for InteractiveBrokersDataClient {
2675    fn drop(&mut self) {
2676        let _ = self.stop();
2677    }
2678}