1#[path = "core_streams.rs"]
19mod streams;
20
21use std::{
22 fmt::Debug,
23 sync::{
24 Arc,
25 atomic::{AtomicBool, Ordering},
26 },
27};
28
29use ahash::AHashMap;
30use anyhow::Context;
31#[cfg(feature = "python")]
32use chrono::{DateTime, Utc};
33use ibapi::{
34 contracts::{Contract, Currency as IBCurrency, Exchange as IBExchange, SecurityType, Symbol},
35 market_data::historical::ToDuration,
36};
37use nautilus_common::{
38 clients::DataClient,
39 live::{get_runtime, runner::get_data_event_sender},
40 messages::{
41 DataEvent, DataResponse,
42 data::{
43 BarsResponse, InstrumentResponse, InstrumentsResponse, QuotesResponse, RequestBars,
44 RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars,
45 SubscribeBookDeltas, SubscribeIndexPrices, SubscribeOptionGreeks, SubscribeQuotes,
46 SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
47 UnsubscribeIndexPrices, UnsubscribeOptionGreeks, UnsubscribeQuotes, UnsubscribeTrades,
48 },
49 },
50};
51#[cfg(feature = "python")]
52use nautilus_core::{Params, UUID4};
53use nautilus_core::{
54 UnixNanos,
55 time::{AtomicTime, get_atomic_clock_realtime},
56};
57#[cfg(feature = "python")]
58use nautilus_model::data::{Bar, BarType, Data, QuoteTick, TradeTick};
59use nautilus_model::{
60 enums::BookType,
61 identifiers::{ClientId, InstrumentId, Venue},
62 instruments::{Instrument, any::InstrumentAny},
63};
64#[cfg(feature = "python")]
65use pyo3::{IntoPyObjectExt, prelude::*};
66use tokio::task::JoinHandle;
67use tokio_util::sync::CancellationToken;
68
69use self::streams::{
70 handle_historical_bars_subscription, handle_index_price_subscription,
71 handle_market_depth_subscription, handle_option_greeks_subscription, handle_quote_subscription,
72 handle_realtime_bars_subscription, handle_tick_by_tick_quote_subscription,
73 handle_trade_subscription,
74};
75use super::{
76 cache::{OptionGreeksCache, QuoteCache},
77 convert::{
78 bar_type_to_ib_bar_size, calculate_duration, calculate_duration_segments,
79 chrono_to_ib_datetime, ib_bar_to_nautilus_bar, price_type_to_ib_what_to_show,
80 },
81};
82use crate::{
83 common::{consts::IB_VENUE, shared_client::SharedClientHandle},
84 config::InteractiveBrokersDataClientConfig,
85 providers::instruments::InteractiveBrokersInstrumentProvider,
86};
87
88#[cfg_attr(
94 feature = "python",
95 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.interactive_brokers")
96)]
97pub struct InteractiveBrokersDataClient {
98 client_id: ClientId,
100 config: InteractiveBrokersDataClientConfig,
102 instrument_provider: Arc<InteractiveBrokersInstrumentProvider>,
104 is_connected: AtomicBool,
106 cancellation_token: CancellationToken,
108 tasks: Vec<JoinHandle<()>>,
110 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
112 subscriptions: Arc<tokio::sync::Mutex<AHashMap<InstrumentId, SubscriptionInfo>>>,
114 option_greeks_subscriptions: Arc<tokio::sync::Mutex<AHashMap<InstrumentId, CancellationToken>>>,
116 quote_cache: Arc<tokio::sync::Mutex<QuoteCache>>,
118 option_greeks_cache: Arc<tokio::sync::Mutex<OptionGreeksCache>>,
120 clock: &'static AtomicTime,
122 ib_client: Option<SharedClientHandle>,
124 last_bars: Arc<tokio::sync::Mutex<AHashMap<String, ibapi::market_data::realtime::Bar>>>,
126 bar_timeout_tasks: Arc<tokio::sync::Mutex<AHashMap<String, tokio::task::JoinHandle<()>>>>,
128}
129
130#[derive(Debug)]
132#[allow(dead_code)]
133struct SubscriptionInfo {
134 instrument_id: InstrumentId,
136 subscription_type: SubscriptionType,
138 cancellation_token: CancellationToken,
140}
141
142#[derive(Debug, Clone)]
144enum SubscriptionType {
145 Quotes,
147 IndexPrices,
149 Trades,
151 Bars,
153 BookDeltas,
155}
156
157#[cfg(feature = "python")]
158static DATA_EVENT_CALLBACK: std::sync::OnceLock<std::sync::Mutex<Option<Py<PyAny>>>> =
159 std::sync::OnceLock::new();
160
161#[cfg(feature = "python")]
162thread_local! {
163 static DATA_EVENT_BRIDGE_INITIALIZED: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
164}
165
166#[cfg(feature = "python")]
167fn data_event_callback() -> &'static std::sync::Mutex<Option<Py<PyAny>>> {
168 DATA_EVENT_CALLBACK.get_or_init(|| std::sync::Mutex::new(None))
169}
170
171#[cfg(feature = "python")]
172fn string_hash_map_to_params(
173 map: Option<std::collections::HashMap<String, String>>,
174) -> Option<Params> {
175 map.map(|m| {
176 let mut params = Params::new();
177 for (key, value) in m {
178 params.insert(key, serde_json::Value::String(value));
179 }
180 params
181 })
182}
183
184#[cfg(feature = "python")]
185fn unix_nanos_to_datetime(nanos: Option<u64>) -> Option<DateTime<Utc>> {
186 nanos.and_then(|value| {
187 let secs = value / 1_000_000_000;
188 let nanos_part = (value % 1_000_000_000) as u32;
189 DateTime::from_timestamp(secs as i64, nanos_part)
190 })
191}
192
193#[cfg(feature = "python")]
194fn u64_to_nonzero_usize(value: u64) -> Option<std::num::NonZeroUsize> {
195 std::num::NonZeroUsize::new(value as usize)
196}
197
198#[cfg(feature = "python")]
199fn u16_to_nonzero_usize(value: u16) -> Option<std::num::NonZeroUsize> {
200 std::num::NonZeroUsize::new(value as usize)
201}
202
203fn parse_start_ns(params: Option<&nautilus_core::Params>) -> Option<UnixNanos> {
204 params
205 .and_then(|params| params.get_u64("start_ns"))
206 .or_else(|| {
207 params
208 .and_then(|params| params.get_str("start_ns"))
209 .and_then(|value| value.parse::<u64>().ok())
210 })
211 .map(UnixNanos::from)
212}
213
214#[cfg(feature = "python")]
215fn py_list_from_quotes(py: Python<'_>, values: Vec<QuoteTick>) -> PyResult<Py<PyAny>> {
216 let items: PyResult<Vec<Py<PyAny>>> = values
217 .into_iter()
218 .map(|value| value.into_py_any(py))
219 .collect();
220 items?.into_py_any(py)
221}
222
223#[cfg(feature = "python")]
224fn py_list_from_trades(py: Python<'_>, values: Vec<TradeTick>) -> PyResult<Py<PyAny>> {
225 let items: PyResult<Vec<Py<PyAny>>> = values
226 .into_iter()
227 .map(|value| value.into_py_any(py))
228 .collect();
229 items?.into_py_any(py)
230}
231
232#[cfg(feature = "python")]
233fn py_list_from_bars(py: Python<'_>, values: Vec<Bar>) -> PyResult<Py<PyAny>> {
234 let items: PyResult<Vec<Py<PyAny>>> = values
235 .into_iter()
236 .map(|value| value.into_py_any(py))
237 .collect();
238 items?.into_py_any(py)
239}
240
241#[cfg(feature = "python")]
242fn dispatch_python_data_event(
243 py: Python<'_>,
244 callback: &Py<PyAny>,
245 event: DataEvent,
246) -> PyResult<()> {
247 let (kind, correlation_id, payload) = match event {
248 DataEvent::Data(data) => match data {
249 Data::Quote(quote) => ("quote", None, quote.into_py_any(py)?),
250 Data::Trade(trade) => ("trade", None, trade.into_py_any(py)?),
251 Data::Bar(bar) => ("bar", None, bar.into_py_any(py)?),
252 Data::Delta(delta) => ("delta", None, delta.into_py_any(py)?),
253 Data::IndexPriceUpdate(index_price) => {
254 ("index_price", None, index_price.into_py_any(py)?)
255 }
256 other => {
257 tracing::debug!("Ignoring unsupported IB data event payload: {:?}", other);
258 return Ok(());
259 }
260 },
261 DataEvent::OptionGreeks(greeks) => ("option_greeks", None, greeks.into_py_any(py)?),
262 DataEvent::Instrument(instrument) => (
263 "instrument",
264 None,
265 nautilus_model::python::instruments::instrument_any_to_pyobject(py, instrument)?,
266 ),
267 DataEvent::Response(response) => match response {
268 DataResponse::Instrument(response) => (
269 "instrument_response",
270 Some(response.correlation_id.to_string()),
271 nautilus_model::python::instruments::instrument_any_to_pyobject(py, response.data)?,
272 ),
273 DataResponse::Instruments(response) => (
274 "instruments_response",
275 Some(response.correlation_id.to_string()),
276 response
277 .data
278 .into_iter()
279 .map(|instrument| {
280 nautilus_model::python::instruments::instrument_any_to_pyobject(
281 py, instrument,
282 )
283 })
284 .collect::<PyResult<Vec<_>>>()?
285 .into_py_any(py)?,
286 ),
287 DataResponse::Quotes(response) => (
288 "quotes_response",
289 Some(response.correlation_id.to_string()),
290 py_list_from_quotes(py, response.data)?,
291 ),
292 DataResponse::Trades(response) => (
293 "trades_response",
294 Some(response.correlation_id.to_string()),
295 py_list_from_trades(py, response.data)?,
296 ),
297 DataResponse::Bars(response) => (
298 "bars_response",
299 Some(response.correlation_id.to_string()),
300 py_list_from_bars(py, response.data)?,
301 ),
302 other => {
303 tracing::debug!("Ignoring unsupported IB data response payload: {:?}", other);
304 return Ok(());
305 }
306 },
307 other => {
308 tracing::debug!("Ignoring unsupported IB data event variant: {:?}", other);
309 return Ok(());
310 }
311 };
312
313 callback.call1(py, (kind, correlation_id, payload))?;
314 Ok(())
315}
316
317impl InteractiveBrokersDataClient {
318 pub fn new(
330 client_id: ClientId,
331 config: InteractiveBrokersDataClientConfig,
332 instrument_provider: Arc<InteractiveBrokersInstrumentProvider>,
333 ) -> anyhow::Result<Self> {
334 let clock = get_atomic_clock_realtime();
335 let data_sender = get_data_event_sender();
336
337 Ok(Self {
338 client_id,
339 config,
340 instrument_provider,
341 is_connected: AtomicBool::new(false),
342 cancellation_token: CancellationToken::new(),
343 tasks: Vec::new(),
344 data_sender,
345 subscriptions: Arc::new(tokio::sync::Mutex::new(AHashMap::new())),
346 option_greeks_subscriptions: Arc::new(tokio::sync::Mutex::new(AHashMap::new())),
347 quote_cache: Arc::new(tokio::sync::Mutex::new(QuoteCache::new())),
348 option_greeks_cache: Arc::new(tokio::sync::Mutex::new(OptionGreeksCache::new())),
349 clock,
350 ib_client: None,
351 last_bars: Arc::new(tokio::sync::Mutex::new(AHashMap::new())),
352 bar_timeout_tasks: Arc::new(tokio::sync::Mutex::new(AHashMap::new())),
353 })
354 }
355
356 #[cfg(feature = "python")]
357 pub(crate) fn new_for_python(
358 config: InteractiveBrokersDataClientConfig,
359 instrument_provider: crate::providers::instruments::InteractiveBrokersInstrumentProvider,
360 ) -> anyhow::Result<Self> {
361 Self::ensure_python_event_bridge();
362 let client_id = ClientId::from(format!("IB-{:03}", config.client_id));
363 Self::new(client_id, config, Arc::new(instrument_provider))
364 }
365
366 #[cfg(feature = "python")]
367 pub(crate) fn register_python_event_callback(&self, callback: Py<PyAny>) {
368 *data_event_callback()
369 .lock()
370 .expect("data event callback mutex poisoned") = Some(callback);
371 }
372
373 #[cfg(feature = "python")]
374 fn ensure_python_event_bridge() {
375 if nautilus_common::live::runner::try_get_data_event_sender().is_some() {
376 return;
377 }
378
379 DATA_EVENT_BRIDGE_INITIALIZED.with(|initialized| {
380 if initialized.replace(true) {
381 return;
382 }
383
384 let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
385 nautilus_common::live::runner::set_data_event_sender(sender);
386
387 get_runtime().spawn(async move {
388 while let Some(event) = receiver.recv().await {
389 Python::attach(|py| {
390 let callback_guard = data_event_callback()
391 .lock()
392 .expect("data event callback mutex poisoned");
393
394 let Some(callback) = callback_guard.as_ref() else {
395 return;
396 };
397
398 if let Err(e) = dispatch_python_data_event(py, callback, event) {
399 tracing::error!("Failed to dispatch IB data event to Python: {e}");
400 }
401 });
402 }
403 });
404 });
405 }
406
407 #[cfg(feature = "python")]
408 pub(crate) fn subscribe_quotes_for_python(
409 &mut self,
410 instrument_id: InstrumentId,
411 params: Option<std::collections::HashMap<String, String>>,
412 ) -> anyhow::Result<()> {
413 let cmd = SubscribeQuotes {
414 instrument_id,
415 client_id: Some(self.client_id()),
416 venue: Some(instrument_id.venue),
417 command_id: UUID4::new(),
418 ts_init: self.clock.get_time_ns(),
419 correlation_id: None,
420 params: string_hash_map_to_params(params),
421 };
422 DataClient::subscribe_quotes(self, cmd)
423 }
424
425 #[cfg(feature = "python")]
426 pub(crate) fn subscribe_index_prices_for_python(
427 &mut self,
428 instrument_id: InstrumentId,
429 ) -> anyhow::Result<()> {
430 let cmd = SubscribeIndexPrices {
431 instrument_id,
432 client_id: Some(self.client_id()),
433 venue: Some(instrument_id.venue),
434 command_id: UUID4::new(),
435 ts_init: self.clock.get_time_ns(),
436 correlation_id: None,
437 params: None,
438 };
439 DataClient::subscribe_index_prices(self, cmd)
440 }
441
442 #[cfg(feature = "python")]
443 pub(crate) fn subscribe_option_greeks_for_python(
444 &mut self,
445 instrument_id: InstrumentId,
446 ) -> anyhow::Result<()> {
447 let cmd = SubscribeOptionGreeks {
448 instrument_id,
449 client_id: Some(self.client_id()),
450 venue: Some(instrument_id.venue),
451 command_id: UUID4::new(),
452 ts_init: self.clock.get_time_ns(),
453 correlation_id: None,
454 params: None,
455 };
456 DataClient::subscribe_option_greeks(self, cmd)
457 }
458
459 #[cfg(feature = "python")]
460 pub(crate) fn subscribe_trades_for_python(
461 &mut self,
462 instrument_id: InstrumentId,
463 ) -> anyhow::Result<()> {
464 let cmd = SubscribeTrades {
465 instrument_id,
466 client_id: Some(self.client_id()),
467 venue: Some(instrument_id.venue),
468 command_id: UUID4::new(),
469 ts_init: self.clock.get_time_ns(),
470 correlation_id: None,
471 params: None,
472 };
473 DataClient::subscribe_trades(self, cmd)
474 }
475
476 #[cfg(feature = "python")]
477 pub(crate) fn subscribe_bars_for_python(
478 &mut self,
479 bar_type: BarType,
480 params: Option<std::collections::HashMap<String, String>>,
481 ) -> anyhow::Result<()> {
482 let cmd = SubscribeBars {
483 bar_type,
484 client_id: Some(self.client_id()),
485 venue: Some(bar_type.instrument_id().venue),
486 command_id: UUID4::new(),
487 ts_init: self.clock.get_time_ns(),
488 correlation_id: None,
489 params: string_hash_map_to_params(params),
490 };
491 DataClient::subscribe_bars(self, cmd)
492 }
493
494 #[cfg(feature = "python")]
495 pub(crate) fn subscribe_book_deltas_for_python(
496 &mut self,
497 instrument_id: InstrumentId,
498 depth: Option<u16>,
499 params: Option<std::collections::HashMap<String, String>>,
500 ) -> anyhow::Result<()> {
501 let cmd = SubscribeBookDeltas {
502 instrument_id,
503 book_type: BookType::L2_MBP,
504 client_id: Some(self.client_id()),
505 venue: Some(instrument_id.venue),
506 command_id: UUID4::new(),
507 ts_init: self.clock.get_time_ns(),
508 depth: u16_to_nonzero_usize(depth.unwrap_or(20)),
509 managed: true,
510 correlation_id: None,
511 params: string_hash_map_to_params(params),
512 };
513 DataClient::subscribe_book_deltas(self, cmd)
514 }
515
516 #[cfg(feature = "python")]
517 pub(crate) fn unsubscribe_quotes_for_python(
518 &mut self,
519 instrument_id: InstrumentId,
520 ) -> anyhow::Result<()> {
521 let cmd = UnsubscribeQuotes {
522 instrument_id,
523 client_id: Some(self.client_id()),
524 venue: Some(instrument_id.venue),
525 command_id: UUID4::new(),
526 ts_init: self.clock.get_time_ns(),
527 correlation_id: None,
528 params: None,
529 };
530 DataClient::unsubscribe_quotes(self, &cmd)
531 }
532
533 #[cfg(feature = "python")]
534 pub(crate) fn unsubscribe_index_prices_for_python(
535 &mut self,
536 instrument_id: InstrumentId,
537 ) -> anyhow::Result<()> {
538 let cmd = UnsubscribeIndexPrices {
539 instrument_id,
540 client_id: Some(self.client_id()),
541 venue: Some(instrument_id.venue),
542 command_id: UUID4::new(),
543 ts_init: self.clock.get_time_ns(),
544 correlation_id: None,
545 params: None,
546 };
547 DataClient::unsubscribe_index_prices(self, &cmd)
548 }
549
550 #[cfg(feature = "python")]
551 pub(crate) fn unsubscribe_option_greeks_for_python(
552 &mut self,
553 instrument_id: InstrumentId,
554 ) -> anyhow::Result<()> {
555 let cmd = UnsubscribeOptionGreeks {
556 instrument_id,
557 client_id: Some(self.client_id()),
558 venue: Some(instrument_id.venue),
559 command_id: UUID4::new(),
560 ts_init: self.clock.get_time_ns(),
561 correlation_id: None,
562 params: None,
563 };
564 DataClient::unsubscribe_option_greeks(self, &cmd)
565 }
566
567 #[cfg(feature = "python")]
568 pub(crate) fn unsubscribe_trades_for_python(
569 &mut self,
570 instrument_id: InstrumentId,
571 ) -> anyhow::Result<()> {
572 let cmd = UnsubscribeTrades {
573 instrument_id,
574 client_id: Some(self.client_id()),
575 venue: Some(instrument_id.venue),
576 command_id: UUID4::new(),
577 ts_init: self.clock.get_time_ns(),
578 correlation_id: None,
579 params: None,
580 };
581 DataClient::unsubscribe_trades(self, &cmd)
582 }
583
584 #[cfg(feature = "python")]
585 pub(crate) fn unsubscribe_bars_for_python(&mut self, bar_type: BarType) -> anyhow::Result<()> {
586 let cmd = UnsubscribeBars {
587 bar_type,
588 client_id: Some(self.client_id()),
589 venue: Some(bar_type.instrument_id().venue),
590 command_id: UUID4::new(),
591 ts_init: self.clock.get_time_ns(),
592 correlation_id: None,
593 params: None,
594 };
595 DataClient::unsubscribe_bars(self, &cmd)
596 }
597
598 #[cfg(feature = "python")]
599 pub(crate) fn unsubscribe_book_deltas_for_python(
600 &mut self,
601 instrument_id: InstrumentId,
602 ) -> anyhow::Result<()> {
603 let cmd = UnsubscribeBookDeltas {
604 instrument_id,
605 client_id: Some(self.client_id()),
606 venue: Some(instrument_id.venue),
607 command_id: UUID4::new(),
608 ts_init: self.clock.get_time_ns(),
609 correlation_id: None,
610 params: None,
611 };
612 DataClient::unsubscribe_book_deltas(self, &cmd)
613 }
614
615 #[cfg(feature = "python")]
616 pub(crate) fn request_quotes_for_python(
617 &self,
618 instrument_id: InstrumentId,
619 limit: Option<u64>,
620 start: Option<u64>,
621 end: Option<u64>,
622 request_id: Option<String>,
623 ) -> anyhow::Result<()> {
624 let req = RequestQuotes {
625 instrument_id,
626 start: unix_nanos_to_datetime(start),
627 end: unix_nanos_to_datetime(end),
628 limit: u64_to_nonzero_usize(limit.unwrap_or(10_000)),
629 client_id: Some(self.client_id()),
630 request_id: request_id.map_or_else(UUID4::new, UUID4::from),
631 ts_init: self.clock.get_time_ns(),
632 params: None,
633 };
634 DataClient::request_quotes(self, req)
635 }
636
637 #[cfg(feature = "python")]
638 pub(crate) fn request_trades_for_python(
639 &self,
640 instrument_id: InstrumentId,
641 limit: Option<u64>,
642 start: Option<u64>,
643 end: Option<u64>,
644 request_id: Option<String>,
645 ) -> anyhow::Result<()> {
646 let req = RequestTrades {
647 instrument_id,
648 start: unix_nanos_to_datetime(start),
649 end: unix_nanos_to_datetime(end),
650 limit: u64_to_nonzero_usize(limit.unwrap_or(10_000)),
651 client_id: Some(self.client_id()),
652 request_id: request_id.map_or_else(UUID4::new, UUID4::from),
653 ts_init: self.clock.get_time_ns(),
654 params: None,
655 };
656 DataClient::request_trades(self, req)
657 }
658
659 #[cfg(feature = "python")]
660 pub(crate) fn request_bars_for_python(
661 &self,
662 bar_type: BarType,
663 limit: Option<u64>,
664 start: Option<u64>,
665 end: Option<u64>,
666 request_id: Option<String>,
667 ) -> anyhow::Result<()> {
668 let req = RequestBars {
669 bar_type,
670 start: unix_nanos_to_datetime(start),
671 end: unix_nanos_to_datetime(end),
672 limit: u64_to_nonzero_usize(limit.unwrap_or(1_000)),
673 client_id: Some(self.client_id()),
674 request_id: request_id.map_or_else(UUID4::new, UUID4::from),
675 ts_init: self.clock.get_time_ns(),
676 params: None,
677 };
678 DataClient::request_bars(self, req)
679 }
680
681 #[cfg(feature = "python")]
682 pub(crate) fn request_instrument_for_python(
683 &self,
684 instrument_id: InstrumentId,
685 params: Option<std::collections::HashMap<String, String>>,
686 ) -> anyhow::Result<()> {
687 let req = RequestInstrument {
688 client_id: Some(self.client_id()),
689 instrument_id,
690 start: None,
691 end: None,
692 request_id: UUID4::new(),
693 ts_init: self.clock.get_time_ns(),
694 params: string_hash_map_to_params(params),
695 };
696 DataClient::request_instrument(self, req)
697 }
698
699 #[cfg(feature = "python")]
700 pub(crate) fn request_instruments_for_python(
701 &self,
702 venue: Option<Venue>,
703 params: Option<std::collections::HashMap<String, String>>,
704 ) -> anyhow::Result<()> {
705 let req = RequestInstruments {
706 client_id: Some(self.client_id()),
707 venue,
708 start: None,
709 end: None,
710 request_id: UUID4::new(),
711 ts_init: self.clock.get_time_ns(),
712 params: string_hash_map_to_params(params),
713 };
714 DataClient::request_instruments(self, req)
715 }
716
717 fn venue_id(&self) -> Venue {
718 *IB_VENUE
719 }
720
721 #[allow(dead_code)] pub(crate) fn get_ib_client(&self) -> Option<&Arc<ibapi::Client>> {
725 self.ib_client.as_ref().map(|h| h.as_arc())
726 }
727
728 #[allow(dead_code)] pub(crate) fn instrument_provider(&self) -> Arc<InteractiveBrokersInstrumentProvider> {
731 Arc::clone(&self.instrument_provider)
732 }
733
734 pub async fn batch_load_instruments(
748 &self,
749 instrument_ids: Vec<InstrumentId>,
750 ) -> anyhow::Result<Vec<InstrumentId>> {
751 log::debug!(
752 "Batch loading {} IB instruments through data client",
753 instrument_ids.len()
754 );
755 let client = self
756 .ib_client
757 .as_ref()
758 .context("IB client not connected. Call connect() first")?;
759
760 let loaded = self
761 .instrument_provider
762 .batch_load(client, instrument_ids, None)
763 .await?;
764 log::debug!("Batch loaded {} IB instruments", loaded.len());
765 Ok(loaded)
766 }
767
768 pub async fn fetch_option_chain_by_range(
786 &self,
787 underlying_symbol: &str,
788 exchange: Option<&str>,
789 currency: Option<&str>,
790 expiry_min: Option<&str>,
791 expiry_max: Option<&str>,
792 ) -> anyhow::Result<usize> {
793 log::debug!(
794 "Fetching IB option chain by range (symbol={}, exchange={:?}, currency={:?}, expiry_min={:?}, expiry_max={:?})",
795 underlying_symbol,
796 exchange,
797 currency,
798 expiry_min,
799 expiry_max
800 );
801 let client = self
802 .ib_client
803 .as_ref()
804 .context("IB client not connected. Call connect() first")?;
805
806 let underlying = Contract {
807 contract_id: 0,
808 symbol: Symbol::from(underlying_symbol.to_string()),
809 security_type: SecurityType::Stock,
810 last_trade_date_or_contract_month: String::new(),
811 strike: 0.0,
812 right: String::new(),
813 multiplier: String::new(),
814 exchange: IBExchange::from(exchange.unwrap_or("SMART")),
815 currency: IBCurrency::from(currency.unwrap_or("USD")),
816 local_symbol: String::new(),
817 primary_exchange: IBExchange::default(),
818 trading_class: String::new(),
819 include_expired: false,
820 security_id_type: String::new(),
821 security_id: String::new(),
822 combo_legs_description: String::new(),
823 combo_legs: Vec::new(),
824 delta_neutral_contract: None,
825 issuer_id: String::new(),
826 description: String::new(),
827 last_trade_date: None,
828 };
829
830 let count = self
831 .instrument_provider
832 .fetch_option_chain_by_range(client, &underlying, expiry_min, expiry_max)
833 .await?;
834 log::debug!(
835 "Fetched {} IB option instruments for {}",
836 count,
837 underlying_symbol
838 );
839 Ok(count)
840 }
841
842 pub async fn fetch_futures_chain(
858 &self,
859 symbol: &str,
860 exchange: Option<&str>,
861 currency: Option<&str>,
862 min_expiry_days: Option<u32>,
863 max_expiry_days: Option<u32>,
864 ) -> anyhow::Result<usize> {
865 log::debug!(
866 "Fetching IB futures chain (symbol={}, exchange={:?}, currency={:?}, min_days={:?}, max_days={:?})",
867 symbol,
868 exchange,
869 currency,
870 min_expiry_days,
871 max_expiry_days
872 );
873 let client = self
874 .ib_client
875 .as_ref()
876 .context("IB client not connected. Call connect() first")?;
877
878 let count = self
879 .instrument_provider
880 .fetch_futures_chain(
881 client,
882 symbol,
883 exchange.unwrap_or(""),
884 currency.unwrap_or("USD"),
885 min_expiry_days,
886 max_expiry_days,
887 )
888 .await?;
889 log::debug!("Fetched {} IB futures instruments for {}", count, symbol);
890 Ok(count)
891 }
892
893 pub async fn fetch_bag_contract(
907 &self,
908 bag_contract: &ibapi::contracts::Contract,
909 ) -> anyhow::Result<usize> {
910 log::debug!(
911 "Fetching IB BAG contract details (contract_id={}, exchange={}, symbol={})",
912 bag_contract.contract_id,
913 bag_contract.exchange.as_str(),
914 bag_contract.symbol.as_str()
915 );
916 let client = self
917 .ib_client
918 .as_ref()
919 .context("IB client not connected. Call connect() first")?;
920
921 let count = self
922 .instrument_provider
923 .fetch_bag_contract(client, bag_contract)
924 .await?;
925 log::debug!("Fetched {} BAG instruments", count);
926 Ok(count)
927 }
928}
929
930impl Debug for InteractiveBrokersDataClient {
931 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
932 f.debug_struct(stringify!(InteractiveBrokersDataClient))
933 .field("client_id", &self.client_id)
934 .field("config", &self.config)
935 .field("is_connected", &self.is_connected.load(Ordering::Relaxed))
936 .field("has_ib_client", &self.ib_client.is_some())
937 .finish_non_exhaustive()
938 }
939}
940
941#[async_trait::async_trait(?Send)]
942impl DataClient for InteractiveBrokersDataClient {
943 fn client_id(&self) -> ClientId {
944 self.client_id
945 }
946
947 fn venue(&self) -> Option<Venue> {
948 Some(self.venue_id())
949 }
950
951 fn start(&mut self) -> anyhow::Result<()> {
952 tracing::info!(
953 client_id = %self.client_id,
954 "Starting Interactive Brokers data client"
955 );
956 Ok(())
957 }
958
959 fn stop(&mut self) -> anyhow::Result<()> {
960 tracing::info!(
961 "Stopping Interactive Brokers data client {id}",
962 id = self.client_id
963 );
964 self.cancellation_token.cancel();
965 self.is_connected.store(false, Ordering::Relaxed);
966
967 for task in &self.tasks {
969 task.abort();
970 }
971 self.tasks.clear();
972
973 Ok(())
974 }
975
976 fn reset(&mut self) -> anyhow::Result<()> {
977 tracing::debug!(
978 "Resetting Interactive Brokers data client {id}",
979 id = self.client_id
980 );
981 self.is_connected.store(false, Ordering::Relaxed);
982 self.cancellation_token = CancellationToken::new();
983 self.tasks.clear();
984
985 {
987 let mut subscriptions = self.subscriptions.blocking_lock();
988 subscriptions.clear();
989 }
990 {
991 let mut subscriptions = self.option_greeks_subscriptions.blocking_lock();
992 subscriptions.clear();
993 }
994 {
995 let mut cache = self.quote_cache.blocking_lock();
996 cache.clear();
997 }
998 {
999 let mut cache = self.option_greeks_cache.blocking_lock();
1000 cache.clear();
1001 }
1002
1003 Ok(())
1004 }
1005
1006 fn dispose(&mut self) -> anyhow::Result<()> {
1007 self.stop()
1008 }
1009
1010 async fn connect(&mut self) -> anyhow::Result<()> {
1011 tracing::debug!("Connecting Interactive Brokers data client...");
1012
1013 let handle = crate::common::shared_client::get_or_connect(
1014 &self.config.host,
1015 self.config.port,
1016 self.config.client_id,
1017 self.config.connection_timeout,
1018 )
1019 .await
1020 .context("Failed to connect to IB Gateway/TWS")?;
1021
1022 let client = handle.as_arc();
1023
1024 tracing::info!(
1025 "Connected to IB Gateway/TWS at {}:{} (client_id: {})",
1026 self.config.host,
1027 self.config.port,
1028 self.config.client_id
1029 );
1030
1031 if self.config.market_data_type != crate::config::MarketDataType::Realtime {
1033 let ib_data_type: ibapi::market_data::MarketDataType =
1034 self.config.market_data_type.into();
1035 client
1036 .switch_market_data_type(ib_data_type)
1037 .await
1038 .context("Failed to switch market data type")?;
1039 tracing::info!("Set market data type to {:?}", self.config.market_data_type);
1040 }
1041
1042 self.ib_client = Some(handle);
1043 self.is_connected.store(true, Ordering::Relaxed);
1044
1045 tracing::debug!("Initializing IB data instrument provider");
1047 if let Err(e) = self.instrument_provider.initialize().await {
1048 tracing::warn!("Failed to initialize instrument provider: {}", e);
1049 }
1050
1051 tracing::debug!("Loading configured IB data instruments");
1052
1053 if let Err(e) = self
1054 .instrument_provider
1055 .load_all_async(
1056 self.ib_client.as_ref().unwrap().as_arc().as_ref(),
1057 None,
1058 None,
1059 false,
1060 )
1061 .await
1062 {
1063 tracing::warn!("Failed to load instruments on startup: {}", e);
1064 }
1065
1066 let instrument_count = self.instrument_provider.count();
1067 if instrument_count > 0 {
1068 tracing::info!(
1069 "Data client connected with {} instruments in provider cache",
1070 instrument_count
1071 );
1072 }
1073
1074 tracing::info!("Connected Interactive Brokers data client");
1075 Ok(())
1076 }
1077
1078 async fn disconnect(&mut self) -> anyhow::Result<()> {
1079 tracing::debug!("Disconnecting Interactive Brokers data client...");
1080
1081 self.stop()?;
1082 self.ib_client = None;
1083 self.is_connected.store(false, Ordering::Relaxed);
1084 tracing::info!("Disconnected Interactive Brokers data client");
1085 Ok(())
1086 }
1087
1088 fn is_connected(&self) -> bool {
1089 self.is_connected.load(Ordering::Relaxed)
1090 }
1091
1092 fn is_disconnected(&self) -> bool {
1093 !self.is_connected()
1094 }
1095
1096 fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
1098 tracing::debug!("Subscribing to quotes for {}", cmd.instrument_id);
1099
1100 let client = self
1101 .ib_client
1102 .as_ref()
1103 .context("IB client not connected. Call connect() first")?;
1104
1105 let instrument = self
1107 .instrument_provider
1108 .find(&cmd.instrument_id)
1109 .context(format!(
1110 "Instrument {} not found in provider",
1111 cmd.instrument_id
1112 ))?;
1113
1114 let price_precision = instrument.price_precision();
1115 let size_precision = instrument.size_precision();
1116
1117 let contract = self
1119 .instrument_provider
1120 .resolve_contract_for_instrument(cmd.instrument_id)
1121 .context("Failed to convert instrument_id to IB contract")?;
1122
1123 let is_bag = matches!(
1126 contract.security_type,
1127 ibapi::contracts::SecurityType::Spread
1128 ) || !contract.combo_legs.is_empty();
1129 let batch_quotes = cmd
1130 .params
1131 .as_ref()
1132 .and_then(|params| params.get_str("batch_quotes"))
1133 .map_or(self.config.batch_quotes, |s| {
1134 s == "true" || s == "True" || s == "1"
1135 });
1136
1137 let use_market_data = is_bag || batch_quotes;
1138
1139 let instrument_id = cmd.instrument_id;
1140 let data_sender = self.data_sender.clone();
1141 let quote_cache = Arc::clone(&self.quote_cache);
1142 let clock = self.clock;
1143
1144 let price_magnifier = self.instrument_provider.get_price_magnifier(&instrument_id) as f64;
1146
1147 let subscription_token = CancellationToken::new();
1149
1150 let client_clone = client.as_arc().clone();
1152 let subscription_token_clone = subscription_token.clone();
1153 let ignore_size_updates = self.config.ignore_quote_tick_size_updates;
1154
1155 let task = get_runtime().spawn(async move {
1156 if use_market_data {
1157 tracing::debug!(
1159 "Using market_data subscription for {} (BAG: {}, batch_quotes: {})",
1160 instrument_id,
1161 is_bag,
1162 batch_quotes
1163 );
1164
1165 if let Err(e) = handle_quote_subscription(
1166 client_clone,
1167 contract,
1168 instrument_id,
1169 price_precision,
1170 size_precision,
1171 data_sender,
1172 quote_cache,
1173 clock,
1174 subscription_token_clone,
1175 ignore_size_updates,
1176 )
1177 .await
1178 {
1179 tracing::error!("Quote subscription error for {}: {:?}", instrument_id, e);
1180 }
1181 } else {
1182 tracing::debug!(
1185 "Attempting tick_by_tick_bid_ask subscription for {}",
1186 instrument_id
1187 );
1188
1189 match handle_tick_by_tick_quote_subscription(
1190 client_clone.clone(),
1191 contract.clone(),
1192 instrument_id,
1193 price_precision,
1194 size_precision,
1195 data_sender.clone(),
1196 clock,
1197 subscription_token_clone.clone(),
1198 price_magnifier,
1199 )
1200 .await
1201 {
1202 Ok(()) => {
1203 }
1205 Err(e) => {
1206 tracing::warn!(
1207 "tick_by_tick_bid_ask failed for {} (may be BAG contract), falling back to market_data: {:?}",
1208 instrument_id,
1209 e
1210 );
1211 if let Err(fallback_err) = handle_quote_subscription(
1213 client_clone,
1214 contract,
1215 instrument_id,
1216 price_precision,
1217 size_precision,
1218 data_sender,
1219 quote_cache,
1220 clock,
1221 subscription_token_clone,
1222 ignore_size_updates,
1223 )
1224 .await
1225 {
1226 tracing::error!(
1227 "Quote subscription fallback also failed for {}: {:?}",
1228 instrument_id,
1229 fallback_err
1230 );
1231 } else {
1232 tracing::info!(
1233 "Successfully subscribed to {} using market_data fallback",
1234 instrument_id
1235 );
1236 }
1237 }
1238 }
1239 }
1240 });
1241
1242 self.tasks.push(task);
1243
1244 let mut subscriptions = self.subscriptions.blocking_lock();
1246 subscriptions.insert(
1247 cmd.instrument_id,
1248 SubscriptionInfo {
1249 instrument_id: cmd.instrument_id,
1250 subscription_type: SubscriptionType::Quotes,
1251 cancellation_token: subscription_token,
1252 },
1253 );
1254
1255 tracing::info!(
1256 "Quote subscription started for {} (method: {})",
1257 cmd.instrument_id,
1258 if use_market_data {
1259 "market_data"
1260 } else {
1261 "tick_by_tick_bid_ask"
1262 }
1263 );
1264 Ok(())
1265 }
1266
1267 fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
1268 tracing::debug!("Subscribing to index prices for {}", cmd.instrument_id);
1269
1270 let client = self
1271 .ib_client
1272 .as_ref()
1273 .context("IB client not connected. Call connect() first")?;
1274
1275 let instrument = self
1276 .instrument_provider
1277 .find(&cmd.instrument_id)
1278 .context(format!(
1279 "Instrument {} not found in provider",
1280 cmd.instrument_id
1281 ))?;
1282
1283 let contract = self
1284 .instrument_provider
1285 .resolve_contract_for_instrument(cmd.instrument_id)
1286 .context("Failed to convert instrument_id to IB contract")?;
1287
1288 if !matches!(contract.security_type, SecurityType::Index) {
1289 tracing::warn!(
1290 "Index price subscription not supported for security type {:?} on {}",
1291 contract.security_type,
1292 cmd.instrument_id
1293 );
1294 return Ok(());
1295 }
1296
1297 let price_precision = instrument.price_precision();
1298 let price_magnifier = self
1299 .instrument_provider
1300 .get_price_magnifier(&cmd.instrument_id);
1301 let instrument_id = cmd.instrument_id;
1302 let data_sender = self.data_sender.clone();
1303 let clock = self.clock;
1304
1305 let subscription_token = CancellationToken::new();
1306
1307 let client_clone = client.as_arc().clone();
1308 let subscription_token_clone = subscription_token.clone();
1309
1310 let task = get_runtime().spawn(async move {
1311 if let Err(e) = handle_index_price_subscription(
1312 client_clone,
1313 contract,
1314 instrument_id,
1315 price_precision,
1316 price_magnifier,
1317 data_sender,
1318 clock,
1319 subscription_token_clone,
1320 )
1321 .await
1322 {
1323 tracing::error!(
1324 "Index price subscription error for {}: {:?}",
1325 instrument_id,
1326 e
1327 );
1328 }
1329 });
1330
1331 self.tasks.push(task);
1332
1333 let mut subscriptions = self.subscriptions.blocking_lock();
1334 subscriptions.insert(
1335 cmd.instrument_id,
1336 SubscriptionInfo {
1337 instrument_id: cmd.instrument_id,
1338 subscription_type: SubscriptionType::IndexPrices,
1339 cancellation_token: subscription_token,
1340 },
1341 );
1342
1343 tracing::info!("Index price subscription started for {}", cmd.instrument_id);
1344 Ok(())
1345 }
1346
1347 fn subscribe_option_greeks(&mut self, cmd: SubscribeOptionGreeks) -> anyhow::Result<()> {
1348 tracing::debug!("Subscribing to option greeks for {}", cmd.instrument_id);
1349
1350 let client = self
1351 .ib_client
1352 .as_ref()
1353 .context("IB client not connected. Call connect() first")?;
1354
1355 let instrument = self
1356 .instrument_provider
1357 .find(&cmd.instrument_id)
1358 .context(format!(
1359 "Instrument {} not found in provider",
1360 cmd.instrument_id
1361 ))?;
1362
1363 if !matches!(
1364 instrument,
1365 InstrumentAny::OptionContract(_)
1366 | InstrumentAny::FuturesContract(_)
1367 | InstrumentAny::CryptoOption(_)
1368 ) && !matches!(
1369 self.instrument_provider
1370 .resolve_contract_for_instrument(cmd.instrument_id)?
1371 .security_type,
1372 SecurityType::Option | SecurityType::FuturesOption
1373 ) {
1374 tracing::warn!(
1375 "Option greeks subscription is only supported for option instruments: {}",
1376 cmd.instrument_id
1377 );
1378 return Ok(());
1379 }
1380
1381 let contract = self
1382 .instrument_provider
1383 .resolve_contract_for_instrument(cmd.instrument_id)
1384 .context("Failed to convert instrument_id to IB contract")?;
1385
1386 let instrument_id = cmd.instrument_id;
1387 let data_sender = self.data_sender.clone();
1388 let option_greeks_cache = Arc::clone(&self.option_greeks_cache);
1389 let clock = self.clock;
1390 let subscription_token = CancellationToken::new();
1391 let subscription_token_clone = subscription_token.clone();
1392 let client_clone = client.as_arc().clone();
1393
1394 let task = get_runtime().spawn(async move {
1395 if let Err(e) = handle_option_greeks_subscription(
1396 client_clone,
1397 contract,
1398 instrument_id,
1399 data_sender,
1400 option_greeks_cache,
1401 clock,
1402 subscription_token_clone,
1403 )
1404 .await
1405 {
1406 tracing::error!(
1407 "Option greeks subscription error for {}: {:?}",
1408 instrument_id,
1409 e
1410 );
1411 }
1412 });
1413
1414 self.tasks.push(task);
1415
1416 let mut subscriptions = self.option_greeks_subscriptions.blocking_lock();
1417 if let Some(existing) = subscriptions.insert(cmd.instrument_id, subscription_token) {
1418 existing.cancel();
1419 }
1420
1421 tracing::info!(
1422 "Option greeks subscription started for {}",
1423 cmd.instrument_id
1424 );
1425 Ok(())
1426 }
1427
1428 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
1429 tracing::debug!("Unsubscribing from quotes for {}", cmd.instrument_id);
1430
1431 let mut subscriptions = self.subscriptions.blocking_lock();
1432 if let Some(sub_info) = subscriptions.remove(&cmd.instrument_id) {
1433 sub_info.cancellation_token.cancel();
1434 tracing::info!("Unsubscribed from quotes for {}", cmd.instrument_id);
1435 } else {
1436 tracing::warn!(
1437 "No active quote subscription found for {}",
1438 cmd.instrument_id
1439 );
1440 }
1441
1442 {
1444 }
1447
1448 Ok(())
1449 }
1450
1451 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1452 tracing::debug!("Unsubscribing from index prices for {}", cmd.instrument_id);
1453
1454 let mut subscriptions = self.subscriptions.blocking_lock();
1455 if let Some(sub_info) = subscriptions.remove(&cmd.instrument_id) {
1456 sub_info.cancellation_token.cancel();
1457 tracing::info!("Unsubscribed from index prices for {}", cmd.instrument_id);
1458 } else {
1459 tracing::warn!(
1460 "No active index price subscription found for {}",
1461 cmd.instrument_id
1462 );
1463 }
1464
1465 Ok(())
1466 }
1467
1468 fn unsubscribe_option_greeks(&mut self, cmd: &UnsubscribeOptionGreeks) -> anyhow::Result<()> {
1469 tracing::debug!("Unsubscribing from option greeks for {}", cmd.instrument_id);
1470
1471 let mut subscriptions = self.option_greeks_subscriptions.blocking_lock();
1472 if let Some(subscription_token) = subscriptions.remove(&cmd.instrument_id) {
1473 subscription_token.cancel();
1474 tracing::info!("Unsubscribed from option greeks for {}", cmd.instrument_id);
1475 } else {
1476 tracing::warn!(
1477 "No active option greeks subscription found for {}",
1478 cmd.instrument_id
1479 );
1480 }
1481
1482 Ok(())
1483 }
1484
1485 fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
1486 tracing::debug!("Subscribing to trades for {}", cmd.instrument_id);
1487
1488 let client = self
1489 .ib_client
1490 .as_ref()
1491 .context("IB client not connected. Call connect() first")?;
1492
1493 let instrument = self
1495 .instrument_provider
1496 .find(&cmd.instrument_id)
1497 .context(format!(
1498 "Instrument {} not found in provider",
1499 cmd.instrument_id
1500 ))?;
1501
1502 if matches!(instrument, InstrumentAny::CurrencyPair(_)) {
1504 tracing::error!(
1505 "Interactive Brokers does not support trades for CurrencyPair instruments: {}",
1506 cmd.instrument_id
1507 );
1508 return Ok(());
1509 }
1510
1511 let price_precision = instrument.price_precision();
1512 let size_precision = instrument.size_precision();
1513
1514 let contract = self
1516 .instrument_provider
1517 .resolve_contract_for_instrument(cmd.instrument_id)
1518 .context("Failed to convert instrument_id to IB contract")?;
1519
1520 let instrument_id = cmd.instrument_id;
1521 let data_sender = self.data_sender.clone();
1522 let clock = self.clock;
1523
1524 let subscription_token = CancellationToken::new();
1526
1527 let client_clone = client.as_arc().clone();
1529 let subscription_token_clone = subscription_token.clone();
1530
1531 let task = get_runtime().spawn(async move {
1532 if let Err(e) = handle_trade_subscription(
1533 client_clone,
1534 contract,
1535 instrument_id,
1536 price_precision,
1537 size_precision,
1538 data_sender,
1539 clock,
1540 subscription_token_clone,
1541 )
1542 .await
1543 {
1544 tracing::error!("Trade subscription error for {}: {:?}", instrument_id, e);
1545 }
1546 });
1547
1548 self.tasks.push(task);
1549
1550 let mut subscriptions = self.subscriptions.blocking_lock();
1552 subscriptions.insert(
1553 cmd.instrument_id,
1554 SubscriptionInfo {
1555 instrument_id: cmd.instrument_id,
1556 subscription_type: SubscriptionType::Trades,
1557 cancellation_token: subscription_token,
1558 },
1559 );
1560
1561 tracing::info!("Trade subscription started for {}", cmd.instrument_id);
1562 Ok(())
1563 }
1564
1565 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
1566 tracing::debug!("Unsubscribing from trades for {}", cmd.instrument_id);
1567
1568 let mut subscriptions = self.subscriptions.blocking_lock();
1569 if let Some(sub_info) = subscriptions.remove(&cmd.instrument_id) {
1570 sub_info.cancellation_token.cancel();
1571 tracing::info!("Unsubscribed from trades for {}", cmd.instrument_id);
1572 } else {
1573 tracing::warn!(
1574 "No active trade subscription found for {}",
1575 cmd.instrument_id
1576 );
1577 }
1578
1579 Ok(())
1580 }
1581
1582 fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
1583 tracing::debug!("Subscribing to bars for {}", cmd.bar_type);
1584
1585 let client = self
1586 .ib_client
1587 .as_ref()
1588 .context("IB client not connected. Call connect() first")?;
1589
1590 let instrument_id = cmd.bar_type.instrument_id();
1592 let instrument = self
1593 .instrument_provider
1594 .find(&instrument_id)
1595 .context(format!("Instrument {instrument_id} not found in provider"))?;
1596
1597 let price_precision = instrument.price_precision();
1598 let size_precision = instrument.size_precision();
1599
1600 let contract = self
1602 .instrument_provider
1603 .resolve_contract_for_instrument(instrument_id)
1604 .context("Failed to convert instrument_id to IB contract")?;
1605
1606 let bar_type = cmd.bar_type;
1607 let bar_type_str = bar_type.to_string();
1608 let data_sender = self.data_sender.clone();
1609 let clock = self.clock;
1610 let last_bars = Arc::clone(&self.last_bars);
1611 let bar_timeout_tasks = Arc::clone(&self.bar_timeout_tasks);
1612 let handle_revised_bars = self.config.handle_revised_bars;
1613 let use_rth = self.config.use_regular_trading_hours;
1614 let start_ns = parse_start_ns(cmd.params.as_ref());
1615
1616 let subscription_token = CancellationToken::new();
1618
1619 let client_clone = client.as_arc().clone();
1621 let subscription_token_clone = subscription_token.clone();
1622
1623 let task = get_runtime().spawn(async move {
1624 let result = if bar_type.spec().timedelta().num_seconds() == 5 {
1625 handle_realtime_bars_subscription(
1626 client_clone,
1627 contract,
1628 bar_type,
1629 bar_type_str,
1630 instrument_id,
1631 price_precision,
1632 size_precision,
1633 data_sender,
1634 clock,
1635 last_bars,
1636 bar_timeout_tasks,
1637 handle_revised_bars,
1638 subscription_token_clone,
1639 )
1640 .await
1641 } else {
1642 handle_historical_bars_subscription(
1643 client_clone,
1644 contract,
1645 bar_type,
1646 price_type_to_ib_what_to_show(bar_type.spec().price_type),
1647 price_precision,
1648 size_precision,
1649 use_rth,
1650 start_ns,
1651 data_sender,
1652 handle_revised_bars,
1653 clock,
1654 subscription_token_clone,
1655 )
1656 .await
1657 };
1658
1659 if let Err(e) = result {
1660 tracing::error!("Bars subscription error for {}: {:?}", bar_type, e);
1661 }
1662 });
1663
1664 self.tasks.push(task);
1665
1666 let mut subscriptions = self.subscriptions.blocking_lock();
1668 subscriptions.insert(
1669 instrument_id,
1670 SubscriptionInfo {
1671 instrument_id,
1672 subscription_type: SubscriptionType::Bars,
1673 cancellation_token: subscription_token,
1674 },
1675 );
1676
1677 tracing::info!("Real-time bars subscription started for {}", bar_type);
1678 Ok(())
1679 }
1680
1681 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1682 tracing::debug!("Unsubscribing from bars for {}", cmd.bar_type);
1683
1684 let instrument_id = cmd.bar_type.instrument_id();
1685 let mut subscriptions = self.subscriptions.blocking_lock();
1686 if let Some(sub_info) = subscriptions.remove(&instrument_id) {
1687 sub_info.cancellation_token.cancel();
1688 tracing::info!("Unsubscribed from bars for {}", cmd.bar_type);
1689 } else {
1690 tracing::warn!("No active bar subscription found for {}", cmd.bar_type);
1691 }
1692
1693 Ok(())
1694 }
1695
1696 fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
1697 tracing::debug!("Subscribing to book deltas for {}", cmd.instrument_id);
1698
1699 if cmd.book_type == BookType::L3_MBO {
1701 tracing::error!(
1702 "Cannot subscribe to order book deltas: L3_MBO data is not published by Interactive Brokers. Valid book types are L1_MBP, L2_MBP"
1703 );
1704 return Ok(());
1705 }
1706
1707 let client = self
1708 .ib_client
1709 .as_ref()
1710 .context("IB client not connected. Call connect() first")?;
1711
1712 let instrument = self
1714 .instrument_provider
1715 .find(&cmd.instrument_id)
1716 .context(format!(
1717 "Instrument {} not found in provider",
1718 cmd.instrument_id
1719 ))?;
1720
1721 let price_precision = instrument.price_precision();
1722 let size_precision = instrument.size_precision();
1723
1724 let contract = self
1726 .instrument_provider
1727 .resolve_contract_for_instrument(cmd.instrument_id)
1728 .context("Failed to convert instrument_id to IB contract")?;
1729
1730 let instrument_id = cmd.instrument_id;
1731 let data_sender = self.data_sender.clone();
1732 let clock = self.clock;
1733
1734 let subscription_token = CancellationToken::new();
1736
1737 let depth_rows = cmd.depth.map_or(20, |d| d.get() as i32);
1739
1740 let is_smart_depth = cmd
1742 .params
1743 .as_ref()
1744 .and_then(|params| params.get_str("is_smart_depth"))
1745 .is_none_or(|s| s == "true" || s == "True" || s == "1");
1746
1747 let client_clone = client.as_arc().clone();
1749 let subscription_token_clone = subscription_token.clone();
1750
1751 let task = get_runtime().spawn(async move {
1752 if let Err(e) = handle_market_depth_subscription(
1753 client_clone,
1754 contract,
1755 instrument_id,
1756 price_precision,
1757 size_precision,
1758 depth_rows,
1759 is_smart_depth,
1760 data_sender,
1761 clock,
1762 subscription_token_clone,
1763 )
1764 .await
1765 {
1766 tracing::error!(
1767 "Market depth subscription error for {}: {:?}",
1768 instrument_id,
1769 e
1770 );
1771 }
1772 });
1773
1774 self.tasks.push(task);
1775
1776 let mut subscriptions = self.subscriptions.blocking_lock();
1778 subscriptions.insert(
1779 cmd.instrument_id,
1780 SubscriptionInfo {
1781 instrument_id: cmd.instrument_id,
1782 subscription_type: SubscriptionType::BookDeltas,
1783 cancellation_token: subscription_token,
1784 },
1785 );
1786
1787 tracing::info!(
1788 "Market depth subscription started for {}",
1789 cmd.instrument_id
1790 );
1791 Ok(())
1792 }
1793
1794 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1795 tracing::debug!("Unsubscribing from book deltas for {}", cmd.instrument_id);
1796
1797 let mut subscriptions = self.subscriptions.blocking_lock();
1798 if let Some(sub_info) = subscriptions.remove(&cmd.instrument_id) {
1799 sub_info.cancellation_token.cancel();
1800 tracing::info!("Unsubscribed from book deltas for {}", cmd.instrument_id);
1801 } else {
1802 tracing::warn!(
1803 "No active book delta subscription found for {}",
1804 cmd.instrument_id
1805 );
1806 }
1807
1808 Ok(())
1809 }
1810
1811 fn request_instrument(&self, cmd: RequestInstrument) -> anyhow::Result<()> {
1813 tracing::debug!("Requesting instrument: {}", cmd.instrument_id);
1814
1815 let force_update = cmd
1817 .params
1818 .as_ref()
1819 .and_then(|params| params.get_str("force_instrument_update"))
1820 .is_some_and(|s| s == "true" || s == "True" || s == "1");
1821
1822 let instrument =
1824 if force_update || self.instrument_provider.find(&cmd.instrument_id).is_none() {
1825 let client = self
1827 .ib_client
1828 .as_ref()
1829 .context("IB client not connected. Call connect() first")?;
1830 let instrument_provider = Arc::clone(&self.instrument_provider);
1831 let instrument_id = cmd.instrument_id;
1832 let data_sender = self.data_sender.clone();
1833 let clock = self.clock;
1834 let request_id = cmd.request_id;
1835 let client_id = cmd.client_id.unwrap_or(self.client_id);
1836 let params = cmd.params.clone();
1837 let start_nanos = cmd.start.map(|dt| {
1838 UnixNanos::from(
1839 dt.timestamp_nanos_opt()
1840 .unwrap_or_else(|| dt.timestamp() * 1_000_000_000)
1841 as u64,
1842 )
1843 });
1844 let end_nanos = cmd.end.map(|dt| {
1845 UnixNanos::from(
1846 dt.timestamp_nanos_opt()
1847 .unwrap_or_else(|| dt.timestamp() * 1_000_000_000)
1848 as u64,
1849 )
1850 });
1851
1852 let client_clone = client.as_arc().clone();
1853
1854 get_runtime().spawn(async move {
1855 if let Err(e) = instrument_provider
1856 .fetch_contract_details(&client_clone, instrument_id, false, None)
1857 .await
1858 {
1859 tracing::error!(
1860 "Failed to fetch contract details for {}: {:?}",
1861 instrument_id,
1862 e
1863 );
1864 return;
1865 }
1866
1867 if let Some(instrument) = instrument_provider.find(&instrument_id) {
1868 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1869 request_id,
1870 client_id,
1871 instrument_id,
1872 instrument,
1873 start_nanos,
1874 end_nanos,
1875 clock.get_time_ns(),
1876 params,
1877 )));
1878
1879 if let Err(e) = data_sender.send(DataEvent::Response(response)) {
1880 tracing::error!("Failed to send instrument response: {e}");
1881 }
1882 }
1883 });
1884
1885 return Ok(());
1887 } else {
1888 self.instrument_provider
1890 .find(&cmd.instrument_id)
1891 .context(format!(
1892 "Instrument {} not found in provider",
1893 cmd.instrument_id
1894 ))?
1895 };
1896
1897 let start_nanos = cmd.start.map(|dt| {
1898 UnixNanos::from(
1899 dt.timestamp_nanos_opt()
1900 .unwrap_or_else(|| dt.timestamp() * 1_000_000_000) as u64,
1901 )
1902 });
1903 let end_nanos = cmd.end.map(|dt| {
1904 UnixNanos::from(
1905 dt.timestamp_nanos_opt()
1906 .unwrap_or_else(|| dt.timestamp() * 1_000_000_000) as u64,
1907 )
1908 });
1909
1910 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1911 cmd.request_id,
1912 cmd.client_id.unwrap_or(self.client_id),
1913 cmd.instrument_id,
1914 instrument,
1915 start_nanos,
1916 end_nanos,
1917 self.clock.get_time_ns(),
1918 cmd.params,
1919 )));
1920
1921 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
1922 tracing::error!("Failed to send instrument response: {e}");
1923 }
1924
1925 Ok(())
1926 }
1927
1928 fn request_instruments(&self, cmd: RequestInstruments) -> anyhow::Result<()> {
1929 tracing::debug!("Requesting all instruments for venue: {:?}", cmd.venue);
1930
1931 let client = self
1932 .ib_client
1933 .as_ref()
1934 .context("IB client not connected. Call connect() first")?;
1935
1936 let force_update = cmd
1938 .params
1939 .as_ref()
1940 .and_then(|params| params.get_str("force_instrument_update"))
1941 .is_some_and(|s| s == "true" || s == "True" || s == "1");
1942
1943 let mut contracts_to_load: Vec<ibapi::contracts::Contract> = Vec::new();
1945
1946 if let Some(params) = &cmd.params
1947 && let Some(ib_contracts_json_str) = params.get_str("ib_contracts")
1948 {
1949 match crate::common::contracts::parse_contracts_from_json_array(ib_contracts_json_str) {
1951 Ok(contracts) => {
1952 tracing::info!(
1953 "Parsed {} contracts from ib_contracts JSON",
1954 contracts.len()
1955 );
1956 log::debug!("Parsed ib_contracts payload: {}", ib_contracts_json_str);
1957 contracts_to_load = contracts;
1958 }
1959 Err(e) => {
1960 tracing::warn!(
1961 "Failed to parse ib_contracts JSON: {}. Continuing without contracts",
1962 e
1963 );
1964 }
1965 }
1966 }
1967
1968 let instrument_provider = Arc::clone(&self.instrument_provider);
1970 let client_clone = client.as_arc().clone();
1971 let data_sender = self.data_sender.clone();
1972 let clock = self.clock;
1973 let request_id = cmd.request_id;
1974 let client_id = cmd.client_id.unwrap_or(self.client_id);
1975 let venue = cmd.venue.unwrap_or(*IB_VENUE);
1976 let params = cmd.params.clone();
1977 let start_nanos = cmd.start.map(|dt| {
1978 UnixNanos::from(
1979 dt.timestamp_nanos_opt()
1980 .unwrap_or_else(|| dt.timestamp() * 1_000_000_000) as u64,
1981 )
1982 });
1983 let end_nanos = cmd.end.map(|dt| {
1984 UnixNanos::from(
1985 dt.timestamp_nanos_opt()
1986 .unwrap_or_else(|| dt.timestamp() * 1_000_000_000) as u64,
1987 )
1988 });
1989
1990 if !contracts_to_load.is_empty() || force_update {
1992 let contracts_to_load_clone = contracts_to_load;
1993
1994 get_runtime().spawn(async move {
1995 let mut loaded_instrument_ids = Vec::new();
1996
1997 if !contracts_to_load_clone.is_empty() {
1999 for contract in contracts_to_load_clone {
2000 log::debug!(
2001 "Loading instrument from IB contract spec (sec_type={:?}, symbol={}, local_symbol={}, exchange={}, expiry={})",
2002 contract.security_type,
2003 contract.symbol.as_str(),
2004 contract.local_symbol.as_str(),
2005 contract.exchange.as_str(),
2006 contract.last_trade_date_or_contract_month.as_str()
2007 );
2008 if let Ok(instrument_id) =
2010 crate::common::parse::ib_contract_to_instrument_id_simple(&contract)
2011 {
2012 if instrument_provider.find(&instrument_id).is_none() {
2013 if let Err(e) = instrument_provider
2014 .fetch_contract_details(
2015 &client_clone,
2016 instrument_id,
2017 false,
2018 None,
2019 )
2020 .await
2021 {
2022 tracing::warn!(
2023 "Failed to load contract for {}: {}",
2024 instrument_id,
2025 e
2026 );
2027 } else {
2028 loaded_instrument_ids.push(instrument_id);
2029 }
2030 } else {
2031 loaded_instrument_ids.push(instrument_id);
2032 }
2033 }
2034 }
2035 }
2036
2037 if force_update {
2039 let all_instrument_ids: Vec<InstrumentId> = instrument_provider
2040 .get_all()
2041 .into_iter()
2042 .map(|inst| inst.id())
2043 .collect();
2044
2045 if !all_instrument_ids.is_empty()
2046 && let Ok(mut reloaded_ids) = instrument_provider
2047 .batch_load(&client_clone, all_instrument_ids, None)
2048 .await
2049 {
2050 loaded_instrument_ids.append(&mut reloaded_ids);
2051 }
2052 }
2053
2054 let instruments = instrument_provider.get_all();
2056
2057 let response = DataResponse::Instruments(InstrumentsResponse::new(
2058 request_id,
2059 client_id,
2060 venue,
2061 instruments,
2062 start_nanos,
2063 end_nanos,
2064 clock.get_time_ns(),
2065 params,
2066 ));
2067
2068 if let Err(e) = data_sender.send(DataEvent::Response(response)) {
2069 tracing::error!("Failed to send instruments response: {e}");
2070 } else {
2071 tracing::info!(
2072 "Successfully sent {} instruments response (loaded {} new instruments)",
2073 instrument_provider.count(),
2074 loaded_instrument_ids.len()
2075 );
2076 }
2077 });
2078 } else {
2079 let instruments = self.instrument_provider.get_all();
2081
2082 let response = DataResponse::Instruments(InstrumentsResponse::new(
2083 cmd.request_id,
2084 cmd.client_id.unwrap_or(self.client_id),
2085 venue,
2086 instruments,
2087 start_nanos,
2088 end_nanos,
2089 self.clock.get_time_ns(),
2090 cmd.params,
2091 ));
2092
2093 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
2094 tracing::error!("Failed to send instruments response: {e}");
2095 } else {
2096 tracing::info!(
2097 "Successfully sent {} instruments response",
2098 self.instrument_provider.count()
2099 );
2100 }
2101 }
2102
2103 Ok(())
2104 }
2105
2106 fn request_quotes(&self, cmd: RequestQuotes) -> anyhow::Result<()> {
2107 tracing::debug!("Requesting quotes for {}", cmd.instrument_id);
2108
2109 let client = self
2110 .ib_client
2111 .as_ref()
2112 .context("IB client not connected. Call connect() first")?;
2113
2114 let instrument = self
2116 .instrument_provider
2117 .find(&cmd.instrument_id)
2118 .context(format!(
2119 "Instrument {} not found in provider",
2120 cmd.instrument_id
2121 ))?;
2122
2123 let price_precision = instrument.price_precision();
2124 let size_precision = instrument.size_precision();
2125
2126 let contract = self
2128 .instrument_provider
2129 .resolve_contract_for_instrument(cmd.instrument_id)
2130 .context("Failed to convert instrument_id to IB contract")?;
2131
2132 let number_of_ticks = cmd.limit.map_or(1000, |l| l.get() as i32).min(1000);
2134
2135 let instrument_id = cmd.instrument_id;
2136 let data_sender = self.data_sender.clone();
2137 let clock = self.clock;
2138 let request_id = cmd.request_id;
2139 let client_id = cmd.client_id.unwrap_or(self.client_id);
2140 let params = cmd.params.clone();
2141 let start_nanos = cmd.start.map(|dt| {
2142 UnixNanos::from(
2143 dt.timestamp_nanos_opt()
2144 .unwrap_or_else(|| dt.timestamp() * 1_000_000_000) as u64,
2145 )
2146 });
2147 let end_nanos = cmd.end.map(|dt| {
2148 UnixNanos::from(
2149 dt.timestamp_nanos_opt()
2150 .unwrap_or_else(|| dt.timestamp() * 1_000_000_000) as u64,
2151 )
2152 });
2153
2154 let client_clone = client.as_arc().clone();
2156 let limit = cmd.limit.map_or(1000, |l| l.get());
2157 let start_nanos_clone = start_nanos;
2158 let end_nanos_clone = end_nanos;
2159 let cmd_start = cmd.start;
2160 let cmd_end = cmd.end;
2161
2162 get_runtime().spawn(async move {
2163 let mut all_quotes = Vec::new();
2164 let mut current_end_date = cmd_end;
2166 if current_end_date.is_none() {
2167 current_end_date = Some(chrono::Utc::now());
2168 }
2169 let current_start_date = cmd_start;
2170
2171 loop {
2173 let should_continue =
2174 if let (Some(start), Some(end)) = (current_start_date, current_end_date) {
2175 end > start
2176 } else {
2177 false
2178 };
2179
2180 if !should_continue && all_quotes.len() >= limit {
2181 break;
2182 }
2183
2184 let current_end_ib = current_end_date.as_ref().map(chrono_to_ib_datetime);
2185
2186 match client_clone
2188 .historical_ticks_bid_ask(
2189 &contract,
2190 current_start_date.as_ref().map(chrono_to_ib_datetime),
2191 current_end_ib,
2192 number_of_ticks,
2193 ibapi::market_data::TradingHours::Regular,
2194 false, )
2196 .await
2197 {
2198 Ok(mut subscription) => {
2199 let mut batch_quotes = Vec::new();
2200
2201 while let Some(tick) = subscription.next().await {
2202 let ts_event =
2203 super::convert::ib_timestamp_to_unix_nanos(&tick.timestamp);
2204 let ts_init = clock.get_time_ns();
2205
2206 match super::parse::parse_quote_tick(
2207 instrument_id,
2208 Some(tick.price_bid),
2209 Some(tick.price_ask),
2210 Some(tick.size_bid as f64),
2211 Some(tick.size_ask as f64),
2212 price_precision,
2213 size_precision,
2214 ts_event,
2215 ts_init,
2216 ) {
2217 Ok(quote_tick) => batch_quotes.push(quote_tick),
2218 Err(e) => {
2219 tracing::warn!("Failed to parse quote tick: {:?}", e);
2220 }
2221 }
2222 }
2223
2224 if batch_quotes.is_empty() {
2225 break;
2226 }
2227
2228 if let Some(min_tick) = batch_quotes.iter().min_by_key(|t| t.ts_init) {
2231 let min_ts_nanos = min_tick.ts_init.as_u64();
2232 let min_ts_seconds = (min_ts_nanos / 1_000_000_000) as i64;
2234 let min_ts_nanos_remainder = (min_ts_nanos % 1_000_000_000) as u32;
2235 current_end_date = chrono::DateTime::from_timestamp(
2236 min_ts_seconds,
2237 min_ts_nanos_remainder,
2238 );
2239 }
2240
2241 all_quotes.extend(batch_quotes);
2242
2243 if let (Some(start_dt), Some(end_dt)) =
2245 (current_start_date, current_end_date)
2246 && end_dt <= start_dt
2247 {
2248 if let Some(end_limit) = end_nanos_clone {
2250 all_quotes.retain(|q| q.ts_init <= end_limit);
2251 }
2252
2253 if let Some(start_limit) = start_nanos_clone {
2254 all_quotes.retain(|q| q.ts_init >= start_limit);
2255 }
2256 break;
2257 }
2258
2259 if all_quotes.len() >= limit {
2261 break;
2262 }
2263 }
2264 Err(e) => {
2265 tracing::error!(
2266 "Historical quotes request failed for {}: {:?}",
2267 instrument_id,
2268 e
2269 );
2270 break;
2271 }
2272 }
2273 }
2274
2275 if let Some(end_limit) = end_nanos_clone {
2277 all_quotes.retain(|q| q.ts_init <= end_limit);
2278 }
2279
2280 all_quotes.sort_by_key(|q| q.ts_init);
2282
2283 let quotes_count = all_quotes.len();
2284 let response = DataResponse::Quotes(QuotesResponse::new(
2285 request_id,
2286 client_id,
2287 instrument_id,
2288 all_quotes,
2289 start_nanos_clone,
2290 end_nanos_clone,
2291 clock.get_time_ns(),
2292 params,
2293 ));
2294
2295 if let Err(e) = data_sender.send(DataEvent::Response(response)) {
2296 tracing::error!("Failed to send quotes response: {e}");
2297 } else {
2298 tracing::info!(
2299 "Successfully sent {} quotes for {}",
2300 quotes_count,
2301 instrument_id
2302 );
2303 }
2304 });
2305
2306 Ok(())
2307 }
2308
2309 fn request_trades(&self, cmd: RequestTrades) -> anyhow::Result<()> {
2310 tracing::debug!("Requesting trades for {}", cmd.instrument_id);
2311
2312 let client = self
2313 .ib_client
2314 .as_ref()
2315 .context("IB client not connected. Call connect() first")?;
2316
2317 let instrument = self
2319 .instrument_provider
2320 .find(&cmd.instrument_id)
2321 .context(format!(
2322 "Instrument {} not found in provider",
2323 cmd.instrument_id
2324 ))?;
2325
2326 if matches!(instrument, InstrumentAny::CurrencyPair(_)) {
2328 tracing::error!(
2329 "Interactive Brokers does not support trades for CurrencyPair instruments: {}",
2330 cmd.instrument_id
2331 );
2332 return Ok(());
2333 }
2334
2335 let price_precision = instrument.price_precision();
2336 let size_precision = instrument.size_precision();
2337
2338 let contract = self
2340 .instrument_provider
2341 .resolve_contract_for_instrument(cmd.instrument_id)
2342 .context("Failed to convert instrument_id to IB contract")?;
2343
2344 let number_of_ticks = cmd.limit.map_or(1000, |l| l.get() as i32).min(1000);
2346
2347 let instrument_id = cmd.instrument_id;
2348 let data_sender = self.data_sender.clone();
2349 let clock = self.clock;
2350 let request_id = cmd.request_id;
2351 let client_id = cmd.client_id.unwrap_or(self.client_id);
2352 let params = cmd.params.clone();
2353 let start_nanos = cmd.start.map(|dt| {
2354 UnixNanos::from(
2355 dt.timestamp_nanos_opt()
2356 .unwrap_or_else(|| dt.timestamp() * 1_000_000_000) as u64,
2357 )
2358 });
2359 let end_nanos = cmd.end.map(|dt| {
2360 UnixNanos::from(
2361 dt.timestamp_nanos_opt()
2362 .unwrap_or_else(|| dt.timestamp() * 1_000_000_000) as u64,
2363 )
2364 });
2365
2366 let client_clone = client.as_arc().clone();
2368 let limit = cmd.limit.map_or(1000, |l| l.get());
2369 let start_nanos_clone = start_nanos;
2370 let end_nanos_clone = end_nanos;
2371 let cmd_start = cmd.start;
2372 let cmd_end = cmd.end;
2373
2374 get_runtime().spawn(async move {
2375 let mut all_trades = Vec::new();
2376 let mut current_end_date = cmd_end;
2378 if current_end_date.is_none() {
2379 current_end_date = Some(chrono::Utc::now());
2380 }
2381 let current_start_date = cmd_start;
2382
2383 loop {
2385 let should_continue =
2386 if let (Some(start), Some(end)) = (current_start_date, current_end_date) {
2387 end > start
2388 } else {
2389 false
2390 };
2391
2392 if !should_continue && all_trades.len() >= limit {
2393 break;
2394 }
2395
2396 let current_end_ib = current_end_date.as_ref().map(chrono_to_ib_datetime);
2397
2398 match client_clone
2400 .historical_ticks_trade(
2401 &contract,
2402 current_start_date.as_ref().map(chrono_to_ib_datetime),
2403 current_end_ib,
2404 number_of_ticks,
2405 ibapi::market_data::TradingHours::Regular,
2406 )
2407 .await
2408 {
2409 Ok(mut subscription) => {
2410 let mut batch_trades = Vec::new();
2411
2412 while let Some(tick) = subscription.next().await {
2413 let ts_event =
2414 super::convert::ib_timestamp_to_unix_nanos(&tick.timestamp);
2415 let ts_init = clock.get_time_ns();
2416
2417 let trade_id = None;
2419
2420 match super::parse::parse_trade_tick(
2421 instrument_id,
2422 tick.price,
2423 tick.size as f64,
2424 price_precision,
2425 size_precision,
2426 ts_event,
2427 ts_init,
2428 trade_id,
2429 ) {
2430 Ok(trade_tick) => batch_trades.push(trade_tick),
2431 Err(e) => {
2432 tracing::warn!("Failed to parse trade tick: {:?}", e);
2433 }
2434 }
2435 }
2436
2437 if batch_trades.is_empty() {
2438 break;
2439 }
2440
2441 if let Some(min_tick) = batch_trades.iter().min_by_key(|t| t.ts_init) {
2444 let min_ts_nanos = min_tick.ts_init.as_u64();
2445 let min_ts_seconds = (min_ts_nanos / 1_000_000_000) as i64;
2447 let min_ts_nanos_remainder = (min_ts_nanos % 1_000_000_000) as u32;
2448 current_end_date = chrono::DateTime::from_timestamp(
2449 min_ts_seconds,
2450 min_ts_nanos_remainder,
2451 );
2452 }
2453
2454 all_trades.extend(batch_trades);
2455
2456 if let (Some(start_dt), Some(end_dt)) =
2458 (current_start_date, current_end_date)
2459 && end_dt <= start_dt
2460 {
2461 if let Some(end_limit) = end_nanos_clone {
2463 all_trades.retain(|t| t.ts_init <= end_limit);
2464 }
2465
2466 if let Some(start_limit) = start_nanos_clone {
2467 all_trades.retain(|t| t.ts_init >= start_limit);
2468 }
2469 break;
2470 }
2471
2472 if all_trades.len() >= limit {
2474 break;
2475 }
2476 }
2477 Err(e) => {
2478 tracing::error!(
2479 "Historical trades request failed for {}: {:?}",
2480 instrument_id,
2481 e
2482 );
2483 break;
2484 }
2485 }
2486 }
2487
2488 if let Some(end_limit) = end_nanos_clone {
2490 all_trades.retain(|t| t.ts_init <= end_limit);
2491 }
2492
2493 all_trades.sort_by_key(|t| t.ts_init);
2495
2496 let trades_count = all_trades.len();
2497 let response = DataResponse::Trades(TradesResponse::new(
2498 request_id,
2499 client_id,
2500 instrument_id,
2501 all_trades,
2502 start_nanos_clone,
2503 end_nanos_clone,
2504 clock.get_time_ns(),
2505 params,
2506 ));
2507
2508 if let Err(e) = data_sender.send(DataEvent::Response(response)) {
2509 tracing::error!("Failed to send trades response: {e}");
2510 } else {
2511 tracing::info!(
2512 "Successfully sent {} trades for {}",
2513 trades_count,
2514 instrument_id
2515 );
2516 }
2517 });
2518
2519 Ok(())
2520 }
2521
2522 fn request_bars(&self, cmd: RequestBars) -> anyhow::Result<()> {
2523 tracing::debug!("Requesting bars for {}", cmd.bar_type);
2524
2525 if !cmd.bar_type.spec().is_time_aggregated() {
2527 tracing::error!(
2528 "Cannot request {} bars: only time bars are aggregated by Interactive Brokers",
2529 cmd.bar_type
2530 );
2531 return Ok(());
2532 }
2533
2534 let client = self
2535 .ib_client
2536 .as_ref()
2537 .context("IB client not connected. Call connect() first")?;
2538
2539 let instrument_id = cmd.bar_type.instrument_id();
2541 let instrument = self
2542 .instrument_provider
2543 .find(&instrument_id)
2544 .context(format!("Instrument {instrument_id} not found in provider"))?;
2545
2546 let price_precision = instrument.price_precision();
2547 let size_precision = instrument.size_precision();
2548
2549 let contract = self
2551 .instrument_provider
2552 .resolve_contract_for_instrument(instrument_id)
2553 .context("Failed to convert instrument_id to IB contract")?;
2554
2555 let ib_bar_size = bar_type_to_ib_bar_size(&cmd.bar_type)
2557 .context("Failed to convert bar type to IB bar size")?;
2558 let ib_what_to_show = price_type_to_ib_what_to_show(cmd.bar_type.spec().price_type);
2559
2560 let segments = if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
2562 calculate_duration_segments(start, end)
2563 } else {
2564 let end_date = cmd.end.unwrap_or_else(chrono::Utc::now);
2565 let duration = calculate_duration(cmd.start, cmd.end).unwrap_or_else(|_| 1i32.days());
2566 vec![(end_date, duration)]
2567 };
2568
2569 let bar_type = cmd.bar_type;
2570 let data_sender = self.data_sender.clone();
2571 let clock = self.clock;
2572 let request_id = cmd.request_id;
2573 let client_id = cmd.client_id.unwrap_or(self.client_id);
2574 let params = cmd.params.clone();
2575 let start_nanos = cmd.start.map(|dt| {
2576 UnixNanos::from(
2577 dt.timestamp_nanos_opt()
2578 .unwrap_or_else(|| dt.timestamp() * 1_000_000_000) as u64,
2579 )
2580 });
2581 let end_nanos = cmd.end.map(|dt| {
2582 UnixNanos::from(
2583 dt.timestamp_nanos_opt()
2584 .unwrap_or_else(|| dt.timestamp() * 1_000_000_000) as u64,
2585 )
2586 });
2587
2588 let client_clone = client.as_arc().clone();
2590
2591 get_runtime().spawn(async move {
2592 let mut all_bars = Vec::new();
2593
2594 for (seg_end, seg_duration) in segments {
2595 let end_ib = chrono_to_ib_datetime(&seg_end);
2596
2597 match client_clone
2598 .historical_data(
2599 &contract,
2600 Some(end_ib),
2601 seg_duration,
2602 ib_bar_size,
2603 Some(ib_what_to_show),
2604 ibapi::market_data::TradingHours::Regular,
2605 )
2606 .await
2607 {
2608 Ok(historical_data) => {
2609 for ib_bar in &historical_data.bars {
2611 match ib_bar_to_nautilus_bar(
2612 ib_bar,
2613 bar_type,
2614 price_precision,
2615 size_precision,
2616 ) {
2617 Ok(bar) => all_bars.push(bar),
2618 Err(e) => {
2619 tracing::warn!(
2620 "Failed to convert IB bar to Nautilus bar: {:?}",
2621 e
2622 );
2623 }
2624 }
2625 }
2626 }
2627 Err(e) => {
2628 tracing::error!(
2629 "Historical data request failed for {} segment: {:?}",
2630 bar_type,
2631 e
2632 );
2633 }
2636 }
2637 }
2638
2639 let bars_count = all_bars.len();
2641 if bars_count == 0 {
2642 tracing::warn!("No bar data received for {}", bar_type);
2643 }
2644
2645 all_bars.sort_by_key(|b| b.ts_event);
2647
2648 let response = DataResponse::Bars(BarsResponse::new(
2649 request_id,
2650 client_id,
2651 bar_type,
2652 all_bars,
2653 start_nanos,
2654 end_nanos,
2655 clock.get_time_ns(),
2656 params,
2657 ));
2658
2659 if let Err(e) = data_sender.send(DataEvent::Response(response)) {
2660 tracing::error!("Failed to send bars response: {e}");
2661 } else {
2662 tracing::info!(
2663 "Successfully sent {} bars for {} (segmented)",
2664 bars_count,
2665 bar_type
2666 );
2667 }
2668 });
2669
2670 Ok(())
2671 }
2672}
2673
2674impl Drop for InteractiveBrokersDataClient {
2675 fn drop(&mut self) {
2676 let _ = self.stop();
2677 }
2678}