1use std::{collections::HashMap, fs, path::Path, str::FromStr, sync::Arc};
19
20use anyhow::Context;
21use chrono::{DateTime, Utc};
22use dashmap::DashMap;
23use ibapi::contracts::{ComboLegOpenClose, Contract, Exchange, SecurityType, Symbol};
24#[cfg(test)]
25use nautilus_model::instruments::Instrument;
26use nautilus_model::{
27 identifiers::{InstrumentId, Venue},
28 instruments::InstrumentAny,
29};
30use serde::{Deserialize, Serialize};
31
32use crate::{
33 common::parse::{
34 determine_venue_from_contract, instrument_id_to_ib_contract, is_spread_instrument_id,
35 parse_spread_instrument_id_to_legs, possible_exchanges_for_venue,
36 },
37 config::InteractiveBrokersInstrumentProviderConfig,
38 providers::parse::{
39 create_spread_instrument_id, parse_ib_contract_to_instrument, parse_spread_instrument_any,
40 },
41};
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
45struct InstrumentCache {
46 cache_timestamp: DateTime<Utc>,
48 contract_id_to_instrument_id: Vec<(i32, String)>,
50 price_magnifiers: Vec<(String, i32)>,
52 instruments: Vec<(String, String)>, }
55
56#[cfg_attr(
61 feature = "python",
62 pyo3::pyclass(
63 module = "nautilus_trader.core.nautilus_pyo3.interactive_brokers",
64 unsendable,
65 from_py_object
66 )
67)]
68#[derive(Debug, Clone)]
69pub struct InteractiveBrokersInstrumentProvider {
70 config: InteractiveBrokersInstrumentProviderConfig,
72 contract_id_to_instrument_id: Arc<DashMap<i32, InstrumentId>>,
74 instruments: Arc<DashMap<InstrumentId, InstrumentAny>>,
76 contract_details: Arc<DashMap<InstrumentId, ibapi::contracts::ContractDetails>>,
78 contracts: Arc<DashMap<InstrumentId, Contract>>,
80 price_magnifiers: Arc<DashMap<InstrumentId, i32>>,
82}
83
84impl InteractiveBrokersInstrumentProvider {
85 pub fn new(config: InteractiveBrokersInstrumentProviderConfig) -> Self {
91 Self {
92 config,
93 contract_id_to_instrument_id: Arc::new(DashMap::new()),
94 instruments: Arc::new(DashMap::new()),
95 contract_details: Arc::new(DashMap::new()),
96 contracts: Arc::new(DashMap::new()),
97 price_magnifiers: Arc::new(DashMap::new()),
98 }
99 }
100
101 #[cfg(test)]
102 pub(crate) fn insert_test_instrument(
103 &self,
104 instrument: InstrumentAny,
105 contract_id: i32,
106 price_magnifier: i32,
107 ) {
108 let instrument_id = instrument.id();
109 self.instruments.insert(instrument_id, instrument);
110 self.contract_id_to_instrument_id
111 .insert(contract_id, instrument_id);
112 self.contracts.insert(
113 instrument_id,
114 Contract {
115 contract_id,
116 ..Default::default()
117 },
118 );
119 self.price_magnifiers.insert(instrument_id, price_magnifier);
120 }
121
122 pub async fn initialize(&self) -> anyhow::Result<()> {
131 if let Some(ref cache_path) = self.config.cache_path {
132 match self.load_cache(cache_path).await {
133 Ok(cache_loaded) => {
134 if cache_loaded {
135 tracing::info!(
136 "Initialized provider with {} instruments from cache",
137 self.count()
138 );
139 } else {
140 tracing::debug!(
141 "Cache file not found or expired, starting with empty cache"
142 );
143 }
144 }
145 Err(e) => {
146 tracing::warn!("Failed to load cache during initialization: {}", e);
147 }
148 }
149 }
150 Ok(())
151 }
152
153 pub fn determine_venue(
166 &self,
167 contract: &Contract,
168 contract_details: Option<&ibapi::contracts::ContractDetails>,
169 ) -> Venue {
170 let valid_exchanges = contract_details.map(|details| details.valid_exchanges.join(","));
171 let venue_str = determine_venue_from_contract(
172 contract,
173 &self.config.symbol_to_mic_venue,
174 self.config.convert_exchange_to_mic_venue,
175 valid_exchanges.as_deref(),
176 );
177 Venue::from(venue_str.as_str())
178 }
179
180 pub fn symbology_method(&self) -> crate::config::SymbologyMethod {
182 self.config.symbology_method
183 }
184
185 #[must_use]
195 pub fn find(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
196 self.instruments
197 .get(instrument_id)
198 .map(|entry| entry.value().clone())
199 }
200
201 #[must_use]
211 pub fn find_by_contract_id(&self, contract_id: i32) -> Option<InstrumentAny> {
212 self.contract_id_to_instrument_id
213 .get(&contract_id)
214 .and_then(|entry| self.find(entry.value()))
215 }
216
217 #[must_use]
227 pub fn get_instrument_id_by_contract_id(&self, contract_id: i32) -> Option<InstrumentId> {
228 self.contract_id_to_instrument_id
229 .get(&contract_id)
230 .map(|entry| *entry.value())
231 }
232
233 #[must_use]
243 pub fn is_filtered_sec_type(&self, sec_type: &str) -> bool {
244 self.config.filter_sec_types.contains(sec_type)
245 }
246
247 #[must_use]
253 pub fn get_all(&self) -> Vec<InstrumentAny> {
254 self.instruments
255 .iter()
256 .map(|entry| entry.value().clone())
257 .collect()
258 }
259
260 #[must_use]
266 pub fn count(&self) -> usize {
267 self.instruments.len()
268 }
269
270 #[must_use]
287 pub fn get_price_magnifier(&self, instrument_id: &InstrumentId) -> i32 {
288 if let Some(magnifier) = self.price_magnifiers.get(instrument_id) {
290 return normalize_price_magnifier(*magnifier.value());
291 }
292
293 if let Some(details) = self.contract_details.get(instrument_id) {
295 let magnifier = normalize_price_magnifier(details.value().price_magnifier);
296 self.price_magnifiers.insert(*instrument_id, magnifier);
298 return magnifier;
299 }
300
301 if self.instruments.contains_key(instrument_id) {
303 tracing::debug!(
304 "Price magnifier not found for instrument {} (has instrument but no contract details), using default 1",
305 instrument_id
306 );
307 } else {
308 tracing::trace!(
309 "Price magnifier not found for instrument {} (instrument not loaded), using default 1",
310 instrument_id
311 );
312 }
313
314 1
316 }
317
318 pub async fn get_instrument(
336 &self,
337 client: &ibapi::Client,
338 contract: &Contract,
339 ) -> anyhow::Result<Option<InstrumentAny>> {
340 log::debug!(
341 "IB get_instrument request sec_type={:?} con_id={} symbol={} local_symbol={} exchange={} expiry={}",
342 contract.security_type,
343 contract.contract_id,
344 contract.symbol.as_str(),
345 contract.local_symbol.as_str(),
346 contract.exchange.as_str(),
347 contract.last_trade_date_or_contract_month.as_str()
348 );
349 let sec_type_str = format!("{:?}", contract.security_type);
351 if self.is_filtered_sec_type(&sec_type_str) {
352 tracing::warn!(
353 "Skipping filtered security type {} for contract",
354 sec_type_str
355 );
356 return Ok(None);
357 }
358
359 let contract_id = contract.contract_id;
360
361 if let Some(cached_instrument_id) = self.contract_id_to_instrument_id.get(&contract_id) {
363 log::debug!(
364 "IB get_instrument cache hit for contract_id={} -> {}",
365 contract_id,
366 cached_instrument_id.value()
367 );
368
369 if let Some(instrument) = self.find(cached_instrument_id.value()) {
370 return Ok(Some(instrument));
371 }
372 }
373
374 if contract.security_type == SecurityType::Spread && !contract.combo_legs.is_empty() {
376 self.fetch_bag_contract(client, contract).await?;
378
379 if let Some(spread_instrument_id) = self.contract_id_to_instrument_id.get(&contract_id)
381 {
382 return Ok(self.find(spread_instrument_id.value()));
383 }
384 }
385
386 let details_vec = client
388 .contract_details(contract)
389 .await
390 .context("Failed to fetch contract details from IB")?;
391
392 log::debug!(
393 "IB get_instrument received {} contract details for sec_type={:?} symbol={} local_symbol={}",
394 details_vec.len(),
395 contract.security_type,
396 contract.symbol.as_str(),
397 contract.local_symbol.as_str()
398 );
399
400 if details_vec.is_empty() {
401 tracing::warn!("No contract details returned for contract {}", contract_id);
402 return Ok(None);
403 }
404
405 let details = &details_vec[0];
406 log::debug!(
407 "IB get_instrument using first detail sec_type={:?} con_id={} local_symbol={} exchange={} under_con_id={}",
408 details.contract.security_type,
409 details.contract.contract_id,
410 details.contract.local_symbol.as_str(),
411 details.contract.exchange.as_str(),
412 details.under_contract_id
413 );
414 let venue = self.determine_venue(&details.contract, Some(details));
415 let instrument_id = match self.config.symbology_method {
416 crate::config::SymbologyMethod::Simplified => {
417 crate::common::parse::ib_contract_to_instrument_id_simplified(
418 &details.contract,
419 Some(venue),
420 )
421 }
422 crate::config::SymbologyMethod::Raw => {
423 crate::common::parse::ib_contract_to_instrument_id_raw(
424 &details.contract,
425 Some(venue),
426 )
427 }
428 }
429 .context("Failed to convert contract to instrument ID")?;
430
431 log::debug!(
432 "IB get_instrument mapped to instrument_id={}",
433 instrument_id
434 );
435
436 let instrument = parse_ib_contract_to_instrument(details, instrument_id)
438 .context("Failed to parse instrument")?;
439
440 self.instruments.insert(instrument_id, instrument.clone());
441 self.contract_details.insert(instrument_id, details.clone());
442 self.contracts
443 .insert(instrument_id, details.contract.clone());
444 self.contract_id_to_instrument_id
445 .insert(details.contract.contract_id, instrument_id);
446 self.price_magnifiers
447 .insert(instrument_id, details.price_magnifier);
448
449 Ok(Some(instrument))
450 }
451
452 #[must_use]
464 pub fn instrument_id_to_ib_contract_details(
465 &self,
466 instrument_id: &InstrumentId,
467 ) -> Option<ibapi::contracts::ContractDetails> {
468 self.contract_details
469 .get(instrument_id)
470 .map(|entry| entry.value().clone())
471 }
472
473 #[must_use]
474 pub fn instrument_id_to_ib_contract(&self, instrument_id: &InstrumentId) -> Option<Contract> {
475 self.contracts
476 .get(instrument_id)
477 .map(|entry| entry.value().clone())
478 }
479
480 pub fn resolve_contract_for_instrument(
481 &self,
482 instrument_id: InstrumentId,
483 ) -> anyhow::Result<Contract> {
484 if let Some(contract) = self.instrument_id_to_ib_contract(&instrument_id) {
485 return Ok(contract);
486 }
487
488 if let Some(details) = self.instrument_id_to_ib_contract_details(&instrument_id) {
489 return Ok(details.contract);
490 }
491
492 instrument_id_to_ib_contract(instrument_id, None)
493 }
494
495 pub async fn load_async(
509 &self,
510 client: &ibapi::Client,
511 instrument_id: InstrumentId,
512 filters: Option<HashMap<String, String>>,
513 ) -> anyhow::Result<()> {
514 let filters: Option<HashMap<String, String>> = filters;
515 let force_instrument_update = filters
516 .as_ref()
517 .and_then(|f| f.get("force_instrument_update"))
518 .map(|v| v == "true")
519 .unwrap_or(false);
520
521 self.fetch_contract_details(client, instrument_id, force_instrument_update, filters)
522 .await
523 }
524
525 pub async fn load_with_return_async(
543 &self,
544 client: &ibapi::Client,
545 instrument_id: InstrumentId,
546 filters: Option<HashMap<String, String>>,
547 ) -> anyhow::Result<Option<InstrumentId>> {
548 let filters: Option<HashMap<String, String>> = filters;
549 let force_instrument_update = filters
550 .as_ref()
551 .and_then(|f| f.get("force_instrument_update"))
552 .map(|v| v == "true")
553 .unwrap_or(false);
554
555 if is_spread_instrument_id(&instrument_id) {
556 self.fetch_spread_instrument(client, instrument_id, force_instrument_update, filters)
557 .await?;
558 } else {
559 self.fetch_contract_details(client, instrument_id, force_instrument_update, filters)
560 .await?;
561 }
562
563 if self.instruments.contains_key(&instrument_id) {
564 Ok(Some(instrument_id))
565 } else {
566 Ok(None)
567 }
568 }
569
570 pub async fn load_ids_async(
584 &self,
585 client: &ibapi::Client,
586 instrument_ids: Vec<InstrumentId>,
587 filters: Option<HashMap<String, String>>,
588 ) -> anyhow::Result<()> {
589 let filters: Option<HashMap<String, String>> = filters;
590 let force_instrument_update = filters
591 .as_ref()
592 .and_then(|f| f.get("force_instrument_update"))
593 .map(|v| v == "true")
594 .unwrap_or(false);
595
596 for instrument_id in instrument_ids {
597 let load_result = if is_spread_instrument_id(&instrument_id) {
598 self.fetch_spread_instrument(
599 client,
600 instrument_id,
601 force_instrument_update,
602 filters.clone(),
603 )
604 .await
605 .map(|_| ())
606 } else {
607 self.fetch_contract_details(
608 client,
609 instrument_id,
610 force_instrument_update,
611 filters.clone(),
612 )
613 .await
614 };
615
616 if let Err(e) = load_result {
617 tracing::warn!("Failed to load instrument {}: {}", instrument_id, e);
618 }
619 }
620 Ok(())
621 }
622
623 pub async fn load_ids_with_return_async(
641 &self,
642 client: &ibapi::Client,
643 instrument_ids: Vec<InstrumentId>,
644 filters: Option<HashMap<String, String>>,
645 ) -> anyhow::Result<Vec<InstrumentId>> {
646 let mut loaded_ids = Vec::new();
647
648 let force_instrument_update = filters
649 .as_ref()
650 .and_then(|f| f.get("force_instrument_update"))
651 .map(|v| v == "true")
652 .unwrap_or(false);
653
654 for instrument_id in instrument_ids {
655 let load_result = if is_spread_instrument_id(&instrument_id) {
656 self.fetch_spread_instrument(
657 client,
658 instrument_id,
659 force_instrument_update,
660 filters.clone(),
661 )
662 .await
663 .map(|loaded| loaded.then_some(()))
664 } else {
665 self.fetch_contract_details(
666 client,
667 instrument_id,
668 force_instrument_update,
669 filters.clone(),
670 )
671 .await
672 .map(Some)
673 };
674
675 if load_result.is_ok() {
676 loaded_ids.push(instrument_id);
677 }
678 }
679
680 Ok(loaded_ids)
681 }
682
683 fn create_bag_contract_from_legs(
684 &self,
685 leg_contract_details: &[(ibapi::contracts::ContractDetails, i32)],
686 instrument_id: Option<InstrumentId>,
687 bag_contract: Option<&Contract>,
688 ) -> anyhow::Result<Contract> {
689 if let Some(bag_contract) = bag_contract {
690 return Ok(bag_contract.clone());
691 }
692
693 let (first_details, _) = leg_contract_details
694 .first()
695 .ok_or_else(|| anyhow::anyhow!("Cannot create BAG contract without leg details"))?;
696
697 let combo_legs = leg_contract_details
698 .iter()
699 .map(|(details, ratio)| ibapi::contracts::ComboLeg {
700 contract_id: details.contract.contract_id,
701 ratio: ratio.abs(),
702 action: if *ratio > 0 {
703 "BUY".to_string()
704 } else {
705 "SELL".to_string()
706 },
707 exchange: details.contract.exchange.to_string(),
708 open_close: ComboLegOpenClose::Same,
709 short_sale_slot: 0,
710 designated_location: String::new(),
711 exempt_code: -1,
712 })
713 .collect();
714
715 Ok(Contract {
716 contract_id: 0,
717 symbol: first_details.contract.symbol.clone(),
718 security_type: SecurityType::Spread,
719 exchange: Exchange::from("SMART"),
720 currency: first_details.contract.currency.clone(),
721 local_symbol: instrument_id.map_or_else(String::new, |id| id.symbol.to_string()),
722 combo_legs_description: instrument_id
723 .map(|id| format!("Spread: {}", id.symbol))
724 .unwrap_or_else(|| "Spread".to_string()),
725 combo_legs,
726 ..Default::default()
727 })
728 }
729
730 pub async fn fetch_spread_instrument(
750 &self,
751 client: &ibapi::Client,
752 spread_instrument_id: InstrumentId,
753 force_instrument_update: bool,
754 filters: Option<HashMap<String, String>>,
755 ) -> anyhow::Result<bool> {
756 if !force_instrument_update && self.instruments.contains_key(&spread_instrument_id) {
758 tracing::debug!("Spread instrument {} already cached", spread_instrument_id);
759 return Ok(true);
760 }
761
762 let leg_tuples = parse_spread_instrument_id_to_legs(&spread_instrument_id)
764 .context("Failed to parse spread instrument ID to leg tuples")?;
765
766 if leg_tuples.is_empty() {
767 tracing::error!("Spread instrument {} has no legs", spread_instrument_id);
768 return Ok(false);
769 }
770
771 tracing::info!(
772 "Loading spread instrument {} with {} legs",
773 spread_instrument_id,
774 leg_tuples.len()
775 );
776
777 let mut leg_contract_details = Vec::new();
779
780 for (leg_instrument_id, ratio) in &leg_tuples {
781 tracing::info!(
782 "Loading leg instrument: {} (ratio: {})",
783 leg_instrument_id,
784 ratio
785 );
786
787 self.fetch_contract_details(
789 client,
790 *leg_instrument_id,
791 force_instrument_update,
792 filters.clone(),
793 )
794 .await
795 .with_context(|| format!("Failed to load leg instrument: {}", leg_instrument_id))?;
796
797 let leg_details = self
799 .contract_details
800 .get(leg_instrument_id)
801 .map(|entry| entry.value().clone())
802 .ok_or_else(|| {
803 anyhow::anyhow!(
804 "Leg instrument {} not found in contract details after loading",
805 leg_instrument_id
806 )
807 })?;
808
809 leg_contract_details.push((leg_details, *ratio));
810 }
811
812 let timestamp = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
814 let leg_details_refs: Vec<(&ibapi::contracts::ContractDetails, i32)> =
815 leg_contract_details.iter().map(|(d, r)| (d, *r)).collect();
816
817 let bag_contract = self.create_bag_contract_from_legs(
818 &leg_contract_details,
819 Some(spread_instrument_id),
820 None,
821 )?;
822 let spread_instrument = parse_spread_instrument_any(
823 spread_instrument_id,
824 &leg_details_refs,
825 Some(&bag_contract),
826 Some(timestamp),
827 )
828 .context("Failed to parse spread instrument")?;
829
830 self.instruments
832 .insert(spread_instrument_id, spread_instrument);
833 self.contracts.insert(spread_instrument_id, bag_contract);
834
835 if let Some((first_details, _)) = leg_contract_details.first() {
836 self.price_magnifiers
837 .insert(spread_instrument_id, first_details.price_magnifier);
838 }
839
840 tracing::info!(
841 "Successfully loaded spread instrument {}",
842 spread_instrument_id
843 );
844 Ok(true)
845 }
846
847 pub async fn load_all_async(
864 &self,
865 client: &ibapi::Client,
866 instrument_ids: Option<Vec<InstrumentId>>,
867 contracts: Option<Vec<Contract>>,
868 force_instrument_update: bool,
869 ) -> anyhow::Result<Vec<InstrumentId>> {
870 let mut loaded_ids = Vec::new();
871
872 let ids_to_load =
874 instrument_ids.unwrap_or_else(|| self.config.load_ids.iter().cloned().collect());
875
876 if !ids_to_load.is_empty() {
877 let mut filters = std::collections::HashMap::new();
878
879 if force_instrument_update {
880 filters.insert("force_instrument_update".to_string(), "true".to_string());
881 }
882 let filters = if filters.is_empty() {
883 None
884 } else {
885 Some(filters)
886 };
887
888 let ids_result = self
889 .load_ids_with_return_async(client, ids_to_load, filters)
890 .await
891 .context("Failed to load instruments from IDs")?;
892 loaded_ids.extend(ids_result);
893 }
894
895 let mut contracts_to_load = contracts.unwrap_or_default();
897 if contracts_to_load.is_empty() {
898 for contract_json in &self.config.load_contracts {
899 let contract = crate::common::contracts::parse_contract_from_json(contract_json)
900 .context("Failed to parse contract from config JSON")?;
901 contracts_to_load.push(contract);
902 }
903 }
904
905 if !contracts_to_load.is_empty() {
906 for contract in contracts_to_load {
907 match self.get_instrument(client, &contract).await {
908 Ok(Some(instrument)) => {
909 use nautilus_model::instruments::Instrument;
910 let instrument_id = instrument.id();
911 loaded_ids.push(instrument_id);
912 tracing::debug!("Loaded instrument {} from contract", instrument_id);
913 }
914 Ok(None) => {
915 tracing::warn!("Failed to load instrument from contract: {:?}", contract);
916 }
917 Err(e) => {
918 tracing::warn!(
919 "Error loading instrument from contract {:?}: {}",
920 contract,
921 e
922 );
923 }
924 }
925 }
926 }
927
928 if loaded_ids.is_empty() {
929 tracing::debug!("load_all_async called but no instruments were loaded");
930 } else {
931 tracing::info!("load_all_async loaded {} instruments", loaded_ids.len());
932 }
933
934 Ok(loaded_ids)
935 }
936}
937
/// Clamps a price magnifier to a usable value: positive inputs are returned unchanged,
/// zero and negative inputs become `1`.
fn normalize_price_magnifier(price_magnifier: i32) -> i32 {
    price_magnifier.max(1)
}
945
946impl InteractiveBrokersInstrumentProvider {
947 pub async fn fetch_contract_details(
958 &self,
959 client: &ibapi::Client,
960 instrument_id: InstrumentId,
961 force_instrument_update: bool,
962 filters: Option<HashMap<String, String>>,
963 ) -> anyhow::Result<()> {
964 if !force_instrument_update {
965 if self.instruments.contains_key(&instrument_id) {
966 tracing::debug!(
967 "Instrument {} already cached, skipping fetch",
968 instrument_id
969 );
970 return Ok(());
971 }
972 }
973 let exchange = filters
975 .as_ref()
976 .and_then(|f| f.get("exchange"))
977 .map(|s| s.as_str());
978
979 let exchanges_to_try: Vec<String> = if let Some(exchange) = exchange {
980 vec![exchange.to_string()]
981 } else {
982 possible_exchanges_for_venue(instrument_id.venue.as_str())
983 };
984
985 let mut details_vec = Vec::new();
986 let mut last_error = None;
987
988 for candidate_exchange in exchanges_to_try {
989 let contract = instrument_id_to_ib_contract(instrument_id, Some(candidate_exchange.as_str()))
990 .with_context(|| format!("Failed to convert instrument_id {} to IB contract. Check that the instrument ID format is correct and the venue/symbol are valid.", instrument_id))?;
991
992 match client.contract_details(&contract).await {
993 Ok(result) if !result.is_empty() => {
994 details_vec = result;
995 break;
996 }
997 Ok(_) => {}
998 Err(e) => {
999 last_error = Some((candidate_exchange.clone(), e.to_string()));
1000 }
1001 }
1002 }
1003
1004 if details_vec.is_empty() {
1005 if let Some((candidate_exchange, error)) = last_error {
1006 tracing::warn!(
1007 "Failed to fetch contract details for {} on {}: {}",
1008 instrument_id,
1009 candidate_exchange,
1010 error
1011 );
1012 } else {
1013 tracing::warn!(
1014 "No contract details returned for {} - instrument may not exist in IB or contract specification is incomplete",
1015 instrument_id
1016 );
1017 }
1018 return Ok(());
1019 }
1020
1021 let details = &details_vec[0];
1023
1024 let sec_type_str = format!("{:?}", details.contract.security_type);
1026 if self.is_filtered_sec_type(&sec_type_str) {
1027 tracing::warn!("Skipping filtered security type: {}", sec_type_str);
1028 return Ok(());
1029 }
1030
1031 let parsed_instrument = parse_ib_contract_to_instrument(details, instrument_id)
1033 .context("Failed to parse IB contract to Nautilus instrument")?;
1034
1035 if force_instrument_update || !self.instruments.contains_key(&instrument_id) {
1044 self.instruments.insert(instrument_id, parsed_instrument);
1045 self.contract_details.insert(instrument_id, details.clone());
1046 self.contracts
1047 .insert(instrument_id, details.contract.clone());
1048 self.contract_id_to_instrument_id
1049 .insert(details.contract.contract_id, instrument_id);
1050 self.price_magnifiers
1051 .insert(instrument_id, details.price_magnifier);
1052 }
1053
1054 tracing::info!(
1055 "Successfully loaded instrument: {} (price_magnifier: {})",
1056 instrument_id,
1057 details.price_magnifier
1058 );
1059 Ok(())
1060 }
1061
1062 pub async fn batch_load(
1080 &self,
1081 client: &ibapi::Client,
1082 instrument_ids: Vec<InstrumentId>,
1083 filters: Option<&[String]>,
1084 ) -> anyhow::Result<Vec<InstrumentId>> {
1085 let mut loaded_ids = Vec::new();
1086
1087 let filtered_ids: Vec<InstrumentId> = if let Some(filter_list) = filters {
1089 instrument_ids
1095 .into_iter()
1096 .filter(|instrument_id| {
1097 for filter in filter_list {
1099 if instrument_id
1101 .symbol
1102 .as_str()
1103 .to_lowercase()
1104 .contains(&filter.to_lowercase())
1105 {
1106 return true;
1107 }
1108
1109 if instrument_id.venue.as_str() == filter {
1111 return true;
1112 }
1113
1114 if let Some(contract_details) = self.contract_details.get(instrument_id) {
1116 let sec_type_str =
1117 format!("{:?}", contract_details.contract.security_type);
1118
1119 if sec_type_str.to_uppercase().contains(&filter.to_uppercase()) {
1120 return true;
1121 }
1122 }
1123 }
1124 false
1125 })
1126 .collect()
1127 } else {
1128 instrument_ids
1129 };
1130
1131 let filtered_count = filtered_ids.len();
1133 for instrument_id in filtered_ids {
1134 match self
1135 .fetch_contract_details(client, instrument_id, false, None)
1136 .await
1137 {
1138 Ok(()) => loaded_ids.push(instrument_id),
1139 Err(e) => {
1140 tracing::warn!("Failed to load instrument {}: {}", instrument_id, e);
1141 }
1142 }
1143 }
1144
1145 tracing::info!(
1146 "Batch loaded {} instruments ({} after filtering)",
1147 loaded_ids.len(),
1148 filtered_count
1149 );
1150
1151 if !loaded_ids.is_empty()
1153 && let Some(ref cache_path) = self.config.cache_path
1154 && let Err(e) = self.save_cache(cache_path).await
1155 {
1156 tracing::warn!("Failed to save instrument cache to {}: {}", cache_path, e);
1157 }
1158
1159 Ok(loaded_ids)
1160 }
1161
1162 pub async fn fetch_option_chain_by_range(
1183 &self,
1184 client: &ibapi::Client,
1185 underlying: &Contract,
1186 expiry_min: Option<&str>,
1187 expiry_max: Option<&str>,
1188 ) -> anyhow::Result<usize> {
1189 tracing::info!(
1190 "Building option chain for {}.{} (sec_type={:?}, contract_id={}, expiry_min={:?}, expiry_max={:?}, config_min_days={:?}, config_max_days={:?})",
1191 underlying.symbol.as_str(),
1192 underlying.exchange.as_str(),
1193 underlying.security_type,
1194 underlying.contract_id,
1195 expiry_min,
1196 expiry_max,
1197 self.config.min_expiry_days,
1198 self.config.max_expiry_days,
1199 );
1200
1201 let symbol = underlying.symbol.as_str();
1203 let exchange = underlying.exchange.as_str();
1204
1205 let mut option_chain_stream = client
1206 .option_chain(
1207 symbol,
1208 exchange,
1209 underlying.security_type.clone(),
1210 underlying.contract_id,
1211 )
1212 .await
1213 .context("Failed to request option chain from IB")?;
1214
1215 let mut total_loaded = 0;
1216
1217 let now = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1219
1220 let mut all_expirations = Vec::new();
1222
1223 while let Some(result) = option_chain_stream.next().await {
1224 if let Ok(chain) = result {
1225 tracing::debug!(
1226 "Received option chain metadata exchange={} trading_class={} expirations={} strikes={}",
1227 chain.exchange,
1228 chain.trading_class,
1229 chain.expirations.len(),
1230 chain.strikes.len(),
1231 );
1232
1233 for expiration in &chain.expirations {
1234 let date_filter_pass = match (expiry_min, expiry_max) {
1236 (Some(min), Some(max)) => {
1237 expiration.as_str() >= min && expiration.as_str() <= max
1238 }
1239 (Some(min), None) => expiration.as_str() >= min,
1240 (None, Some(max)) => expiration.as_str() <= max,
1241 (None, None) => true,
1242 };
1243
1244 let days_filter_pass = {
1246 let expiry_ns = crate::providers::parse::expiry_timestring_to_unix_nanos(
1247 expiration.as_str(),
1248 None,
1249 )
1250 .unwrap_or(now);
1251 let days_until_expiry = (expiry_ns.as_u64().saturating_sub(now.as_u64()))
1252 / (24 * 60 * 60 * 1_000_000_000);
1253
1254 let min_days_ok = self
1255 .config
1256 .min_expiry_days
1257 .is_none_or(|min| days_until_expiry >= min as u64);
1258 let max_days_ok = self
1259 .config
1260 .max_expiry_days
1261 .is_none_or(|max| days_until_expiry <= max as u64);
1262
1263 min_days_ok && max_days_ok
1264 };
1265
1266 if date_filter_pass && days_filter_pass && !all_expirations.contains(expiration)
1267 {
1268 all_expirations.push(expiration.clone());
1269 }
1270 }
1271 }
1272 }
1273
1274 tracing::info!(
1275 "Filtered {} option expirations for {}.{}",
1276 all_expirations.len(),
1277 underlying.symbol.as_str(),
1278 underlying.exchange.as_str(),
1279 );
1280
1281 for expiration in all_expirations {
1283 tracing::info!(
1284 "Requesting option contract details for {}.{} expiry {}",
1285 underlying.symbol.as_str(),
1286 underlying.exchange.as_str(),
1287 expiration,
1288 );
1289
1290 let option_contract = Contract {
1291 contract_id: 0,
1292 symbol: underlying.symbol.clone(),
1293 security_type: if underlying.security_type == SecurityType::Future {
1294 SecurityType::FuturesOption
1295 } else {
1296 SecurityType::Option
1297 },
1298 last_trade_date_or_contract_month: expiration.clone(),
1299 strike: 0.0,
1300 right: String::new(),
1301 multiplier: String::new(),
1302 exchange: underlying.exchange.clone(),
1303 currency: underlying.currency.clone(),
1304 local_symbol: String::new(),
1305 primary_exchange: Exchange::default(),
1306 trading_class: String::new(),
1307 include_expired: false,
1308 security_id_type: String::new(),
1309 security_id: String::new(),
1310 combo_legs_description: String::new(),
1311 combo_legs: Vec::new(),
1312 delta_neutral_contract: None,
1313 issuer_id: String::new(),
1314 description: String::new(),
1315 last_trade_date: None,
1316 };
1317
1318 match client.contract_details(&option_contract).await {
1319 Ok(details_vec) => {
1320 tracing::info!(
1321 "Received {} raw option contract details for {}.{} expiry {}",
1322 details_vec.len(),
1323 underlying.symbol.as_str(),
1324 underlying.exchange.as_str(),
1325 expiration,
1326 );
1327
1328 for details in details_vec {
1329 if details.under_contract_id != underlying.contract_id {
1331 continue;
1332 }
1333
1334 let contract_id = details.contract.contract_id;
1335
1336 if self.contract_id_to_instrument_id.contains_key(&contract_id) {
1337 continue;
1338 }
1339
1340 let venue = self.determine_venue(&details.contract, Some(&details));
1341 let instrument_id = match self.config.symbology_method {
1342 crate::config::SymbologyMethod::Simplified => {
1343 crate::common::parse::ib_contract_to_instrument_id_simplified(
1344 &details.contract,
1345 Some(venue),
1346 )
1347 }
1348 crate::config::SymbologyMethod::Raw => {
1349 crate::common::parse::ib_contract_to_instrument_id_raw(
1350 &details.contract,
1351 Some(venue),
1352 )
1353 }
1354 }
1355 .context("Failed to convert IB contract to instrument ID")?;
1356
1357 let sec_type_str = format!("{:?}", details.contract.security_type);
1358 if self.is_filtered_sec_type(&sec_type_str) {
1359 continue;
1360 }
1361
1362 match parse_ib_contract_to_instrument(&details, instrument_id) {
1363 Ok(parsed_instrument) => {
1364 self.instruments.insert(instrument_id, parsed_instrument);
1365 self.contract_details.insert(instrument_id, details.clone());
1366 self.contracts
1367 .insert(instrument_id, details.contract.clone());
1368 self.contract_id_to_instrument_id
1369 .insert(contract_id, instrument_id);
1370 self.price_magnifiers
1371 .insert(instrument_id, details.price_magnifier);
1372 total_loaded += 1;
1373 }
1374 Err(e) => {
1375 tracing::warn!("Failed to parse option instrument: {}", e);
1376 }
1377 }
1378 }
1379 }
1380 Err(e) => {
1381 tracing::warn!(
1382 "Failed to fetch contract details for expiration {}: {}",
1383 expiration,
1384 e
1385 );
1386 }
1387 }
1388 }
1389
1390 tracing::info!(
1391 "Successfully loaded {} option instruments from chain for {}.{}",
1392 total_loaded,
1393 underlying.symbol.as_str(),
1394 underlying.exchange.as_str(),
1395 );
1396
1397 if total_loaded > 0
1399 && let Some(ref cache_path) = self.config.cache_path
1400 && let Err(e) = self.save_cache(cache_path).await
1401 {
1402 tracing::warn!("Failed to save instrument cache to {}: {}", cache_path, e);
1403 }
1404
1405 Ok(total_loaded)
1406 }
1407
1408 pub async fn fetch_futures_chain(
1428 &self,
1429 client: &ibapi::Client,
1430 symbol: &str,
1431 exchange: &str,
1432 currency: &str,
1433 min_expiry_days: Option<u32>,
1434 max_expiry_days: Option<u32>,
1435 ) -> anyhow::Result<usize> {
1436 tracing::info!(
1437 "Building futures chain for {}.{} (currency={}, min_days={:?}, max_days={:?}, config_min_days={:?}, config_max_days={:?})",
1438 symbol,
1439 exchange,
1440 currency,
1441 min_expiry_days,
1442 max_expiry_days,
1443 self.config.min_expiry_days,
1444 self.config.max_expiry_days,
1445 );
1446
1447 let futures_contract = Contract {
1449 contract_id: 0, symbol: Symbol::from(symbol.to_string()),
1451 security_type: SecurityType::Future,
1452 last_trade_date_or_contract_month: String::new(),
1453 strike: 0.0,
1454 right: String::new(),
1455 multiplier: String::new(),
1456 exchange: Exchange::from(exchange.to_string()),
1457 currency: ibapi::contracts::Currency::from(currency.to_string()),
1458 local_symbol: String::new(),
1459 primary_exchange: Exchange::default(),
1460 trading_class: String::new(),
1461 include_expired: false,
1462 security_id_type: String::new(),
1463 security_id: String::new(),
1464 combo_legs_description: String::new(),
1465 combo_legs: Vec::new(),
1466 delta_neutral_contract: None,
1467 issuer_id: String::new(),
1468 description: String::new(),
1469 last_trade_date: None,
1470 };
1471
1472 let details_vec = client
1474 .contract_details(&futures_contract)
1475 .await
1476 .context("Failed to fetch futures chain from IB")?;
1477
1478 tracing::info!(
1479 "Received {} raw futures contract details for {}.{}",
1480 details_vec.len(),
1481 symbol,
1482 exchange,
1483 );
1484
1485 let mut total_loaded = 0;
1486 let now = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1487
1488 for details in details_vec {
1489 let contract_id = details.contract.contract_id;
1490
1491 if self.contract_id_to_instrument_id.contains_key(&contract_id) {
1493 continue;
1494 }
1495
1496 let venue = self.determine_venue(&details.contract, Some(&details));
1498 let instrument_id = match self.config.symbology_method {
1499 crate::config::SymbologyMethod::Simplified => {
1500 crate::common::parse::ib_contract_to_instrument_id_simplified(
1501 &details.contract,
1502 Some(venue),
1503 )
1504 }
1505 crate::config::SymbologyMethod::Raw => {
1506 crate::common::parse::ib_contract_to_instrument_id_raw(
1507 &details.contract,
1508 Some(venue),
1509 )
1510 }
1511 }
1512 .context("Failed to convert IB contract to instrument ID")?;
1513
1514 let sec_type_str = format!("{:?}", details.contract.security_type);
1516 if self.is_filtered_sec_type(&sec_type_str) {
1517 continue;
1518 }
1519
1520 if !details
1522 .contract
1523 .last_trade_date_or_contract_month
1524 .is_empty()
1525 && let Ok(expiry_ns) = crate::providers::parse::expiry_timestring_to_unix_nanos(
1526 &details.contract.last_trade_date_or_contract_month,
1527 Some(&details),
1528 )
1529 {
1530 let days_until_expiry = (expiry_ns.as_u64().saturating_sub(now.as_u64()))
1531 / (24 * 60 * 60 * 1_000_000_000);
1532
1533 let min_days_ok = min_expiry_days
1534 .or(self.config.min_expiry_days)
1535 .is_none_or(|min| days_until_expiry >= min as u64);
1536 let max_days_ok = max_expiry_days
1537 .or(self.config.max_expiry_days)
1538 .is_none_or(|max| days_until_expiry <= max as u64);
1539
1540 if !min_days_ok || !max_days_ok {
1541 continue;
1542 }
1543 }
1544
1545 match parse_ib_contract_to_instrument(&details, instrument_id) {
1547 Ok(parsed_instrument) => {
1548 self.instruments.insert(instrument_id, parsed_instrument);
1550 self.contract_details.insert(instrument_id, details.clone());
1551 self.contracts
1552 .insert(instrument_id, details.contract.clone());
1553 self.contract_id_to_instrument_id
1554 .insert(contract_id, instrument_id);
1555 self.price_magnifiers
1556 .insert(instrument_id, details.price_magnifier);
1557 total_loaded += 1;
1558 }
1559 Err(e) => {
1560 tracing::warn!("Failed to parse futures instrument: {}", e);
1561 }
1562 }
1563 }
1564
1565 tracing::info!(
1566 "Successfully loaded {} futures instruments from chain",
1567 total_loaded
1568 );
1569
1570 if total_loaded > 0
1572 && let Some(ref cache_path) = self.config.cache_path
1573 && let Err(e) = self.save_cache(cache_path).await
1574 {
1575 tracing::warn!("Failed to save instrument cache to {}: {}", cache_path, e);
1576 }
1577
1578 Ok(total_loaded)
1579 }
1580
1581 pub async fn fetch_bag_contract(
1605 &self,
1606 client: &ibapi::Client,
1607 bag_contract: &Contract,
1608 ) -> anyhow::Result<usize> {
1609 if bag_contract.security_type != SecurityType::Spread || bag_contract.combo_legs.is_empty()
1611 {
1612 anyhow::bail!(
1613 "Invalid BAG contract: must have security_type=Spread and non-empty combo_legs"
1614 );
1615 }
1616
1617 tracing::info!(
1618 "Loading BAG contract with {} legs",
1619 bag_contract.combo_legs.len()
1620 );
1621
1622 let mut leg_contract_details = Vec::new();
1624 let mut leg_tuples = Vec::new();
1625
1626 for combo_leg in &bag_contract.combo_legs {
1627 let leg_contract = Contract {
1629 contract_id: combo_leg.contract_id, symbol: bag_contract.symbol.clone(), security_type: SecurityType::Option, last_trade_date_or_contract_month: String::new(),
1633 strike: 0.0,
1634 right: String::new(),
1635 multiplier: String::new(),
1636 exchange: Exchange::from(combo_leg.exchange.as_str()),
1637 currency: bag_contract.currency.clone(), local_symbol: String::new(),
1639 primary_exchange: Exchange::default(),
1640 trading_class: String::new(),
1641 include_expired: false,
1642 security_id_type: String::new(),
1643 security_id: String::new(),
1644 combo_legs_description: String::new(),
1645 combo_legs: Vec::new(),
1646 delta_neutral_contract: None,
1647 issuer_id: String::new(),
1648 description: String::new(),
1649 last_trade_date: None,
1650 };
1651
1652 let leg_details_vec =
1654 client
1655 .contract_details(&leg_contract)
1656 .await
1657 .with_context(|| {
1658 format!(
1659 "Failed to fetch contract details for leg conId {}",
1660 combo_leg.contract_id
1661 )
1662 })?;
1663
1664 if leg_details_vec.is_empty() {
1665 tracing::warn!(
1666 "No contract details returned for leg conId {}",
1667 combo_leg.contract_id
1668 );
1669 continue;
1670 }
1671
1672 let leg_details = &leg_details_vec[0];
1673 let leg_contract_id = leg_details.contract.contract_id;
1674
1675 let leg_instrument_id =
1677 if let Some(cached_id) = self.contract_id_to_instrument_id.get(&leg_contract_id) {
1678 *cached_id.value()
1679 } else {
1680 let leg_venue = self.determine_venue(&leg_details.contract, Some(leg_details));
1682 let leg_instrument_id = match self.config.symbology_method {
1683 crate::config::SymbologyMethod::Simplified => {
1684 crate::common::parse::ib_contract_to_instrument_id_simplified(
1685 &leg_details.contract,
1686 Some(leg_venue),
1687 )
1688 }
1689 crate::config::SymbologyMethod::Raw => {
1690 crate::common::parse::ib_contract_to_instrument_id_raw(
1691 &leg_details.contract,
1692 Some(leg_venue),
1693 )
1694 }
1695 }
1696 .context("Failed to convert leg contract to instrument ID")?;
1697
1698 let leg_instrument =
1700 parse_ib_contract_to_instrument(leg_details, leg_instrument_id)
1701 .context("Failed to parse leg instrument")?;
1702
1703 self.instruments.insert(leg_instrument_id, leg_instrument);
1704 self.contract_details
1705 .insert(leg_instrument_id, leg_details.clone());
1706 self.contracts
1707 .insert(leg_instrument_id, leg_details.contract.clone());
1708 self.contract_id_to_instrument_id
1709 .insert(leg_contract_id, leg_instrument_id);
1710 self.price_magnifiers
1711 .insert(leg_instrument_id, leg_details.price_magnifier);
1712
1713 leg_instrument_id
1714 };
1715
1716 let ratio = if combo_leg.action == "BUY" {
1718 combo_leg.ratio
1719 } else {
1720 -combo_leg.ratio
1721 };
1722
1723 let leg_details_clone = self
1725 .contract_details
1726 .get(&leg_instrument_id)
1727 .map(|entry| entry.value().clone())
1728 .ok_or_else(|| {
1729 anyhow::anyhow!(
1730 "Contract details not found for leg {} after loading",
1731 leg_instrument_id
1732 )
1733 })?;
1734
1735 leg_contract_details.push((leg_details_clone, ratio));
1736 leg_tuples.push((leg_instrument_id, ratio));
1737 }
1738
1739 if leg_tuples.is_empty() {
1740 anyhow::bail!("No valid legs loaded for BAG contract");
1741 }
1742
1743 let spread_instrument_id = create_spread_instrument_id(&leg_tuples)
1745 .context("Failed to create spread instrument ID from leg tuples")?;
1746
1747 if self.instruments.contains_key(&spread_instrument_id) {
1749 tracing::info!("Spread instrument {} already cached", spread_instrument_id);
1750 return Ok(0);
1751 }
1752
1753 let bag_details_vec = client
1755 .contract_details(bag_contract)
1756 .await
1757 .context("Failed to fetch BAG contract details from IB")?;
1758
1759 if bag_details_vec.is_empty() {
1760 tracing::warn!("No contract details returned for BAG contract");
1761 return Ok(0);
1762 }
1763
1764 let bag_details = &bag_details_vec[0];
1765 let bag_contract_id = bag_details.contract.contract_id;
1766
1767 let timestamp = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1769
1770 let leg_details_refs: Vec<(&ibapi::contracts::ContractDetails, i32)> =
1772 leg_contract_details.iter().map(|(d, r)| (d, *r)).collect();
1773
1774 let spread_instrument = parse_spread_instrument_any(
1775 spread_instrument_id,
1776 &leg_details_refs,
1777 Some(&bag_details.contract),
1778 Some(timestamp),
1779 )
1780 .context("Failed to parse spread instrument")?;
1781
1782 self.instruments
1784 .insert(spread_instrument_id, spread_instrument);
1785 self.contract_details
1786 .insert(spread_instrument_id, bag_details.clone());
1787 self.contracts
1788 .insert(spread_instrument_id, bag_details.contract.clone());
1789 self.contract_id_to_instrument_id
1790 .insert(bag_contract_id, spread_instrument_id);
1791 self.price_magnifiers
1792 .insert(spread_instrument_id, bag_details.price_magnifier);
1793
1794 tracing::info!(
1795 "Successfully loaded spread instrument {} with {} legs",
1796 spread_instrument_id,
1797 leg_tuples.len()
1798 );
1799
1800 if let Some(ref cache_path) = self.config.cache_path
1802 && let Err(e) = self.save_cache(cache_path).await
1803 {
1804 tracing::warn!("Failed to save instrument cache to {}: {}", cache_path, e);
1805 }
1806
1807 Ok(1)
1808 }
1809
1810 pub async fn save_cache(&self, cache_path: &str) -> anyhow::Result<()> {
1820 let cache = InstrumentCache {
1821 cache_timestamp: Utc::now(),
1822 contract_id_to_instrument_id: self
1823 .contract_id_to_instrument_id
1824 .iter()
1825 .map(|entry| (*entry.key(), entry.value().to_string()))
1826 .collect(),
1827 price_magnifiers: self
1828 .price_magnifiers
1829 .iter()
1830 .map(|entry| (entry.key().to_string(), *entry.value()))
1831 .collect(),
1832 instruments: self
1833 .instruments
1834 .iter()
1835 .map(|entry| {
1836 let instrument_id = entry.key().to_string();
1837 let json =
1838 serde_json::to_string(entry.value()).unwrap_or_else(|_| String::new());
1839 (instrument_id, json)
1840 })
1841 .collect(),
1842 };
1843
1844 if let Some(parent) = Path::new(cache_path).parent() {
1846 fs::create_dir_all(parent)?;
1847 }
1848
1849 let json = serde_json::to_string_pretty(&cache)?;
1851 fs::write(cache_path, json)?;
1852 tracing::info!(
1853 "Saved instrument cache to {} ({} instruments)",
1854 cache_path,
1855 cache.instruments.len()
1856 );
1857 Ok(())
1858 }
1859
1860 pub async fn load_cache(&self, cache_path: &str) -> anyhow::Result<bool> {
1874 if !Path::new(cache_path).exists() {
1876 tracing::debug!("Cache file does not exist: {}", cache_path);
1877 return Ok(false);
1878 }
1879
1880 let json = fs::read_to_string(cache_path)?;
1882 let cache: InstrumentCache = serde_json::from_str(&json)?;
1883
1884 if let Some(validity_days) = self.config.cache_validity_days {
1886 let cache_age = Utc::now() - cache.cache_timestamp;
1887 let max_age = chrono::Duration::days(validity_days as i64);
1888 if cache_age > max_age {
1889 tracing::info!(
1890 "Cache is expired (age: {} days, max: {} days). Ignoring cache",
1891 cache_age.num_days(),
1892 validity_days
1893 );
1894 return Ok(false);
1895 }
1896 }
1897
1898 let mut loaded_count = 0;
1900
1901 for (instrument_id_str, instrument_json) in &cache.instruments {
1902 match InstrumentId::from_str(instrument_id_str) {
1903 Ok(instrument_id) => match serde_json::from_str::<InstrumentAny>(instrument_json) {
1904 Ok(instrument) => {
1905 self.instruments.insert(instrument_id, instrument);
1906
1907 if let Ok(value) =
1908 serde_json::from_str::<serde_json::Value>(instrument_json)
1909 && let Some(contract_json) =
1910 value.get("info").and_then(|info| info.get("contract"))
1911 && let Ok(contract) =
1912 crate::common::contracts::parse_contract_from_json(contract_json)
1913 {
1914 self.contracts.insert(instrument_id, contract);
1915 }
1916 loaded_count += 1;
1917 }
1918 Err(e) => {
1919 tracing::warn!(
1920 "Failed to deserialize instrument {}: {}",
1921 instrument_id_str,
1922 e
1923 );
1924 }
1925 },
1926 Err(e) => {
1927 tracing::warn!("Failed to parse instrument ID {}: {}", instrument_id_str, e);
1928 }
1929 }
1930 }
1931
1932 for (contract_id, instrument_id_str) in &cache.contract_id_to_instrument_id {
1934 if let Ok(instrument_id) = InstrumentId::from_str(instrument_id_str) {
1935 self.contract_id_to_instrument_id
1936 .insert(*contract_id, instrument_id);
1937 }
1938 }
1939
1940 for (instrument_id_str, magnifier) in &cache.price_magnifiers {
1942 if let Ok(instrument_id) = InstrumentId::from_str(instrument_id_str) {
1943 self.price_magnifiers.insert(instrument_id, *magnifier);
1944 }
1945 }
1946
1947 tracing::info!(
1948 "Loaded instrument cache from {} ({} instruments, created at {})",
1949 cache_path,
1950 loaded_count,
1951 cache.cache_timestamp
1952 );
1953 Ok(true)
1954 }
1955}
1956
#[cfg(test)]
mod tests {
    use std::fs;

    use nautilus_core::UnixNanos;
    use nautilus_model::{
        identifiers::{Symbol, Venue},
        instruments::CurrencyPair,
        types::{Price, Quantity, currency::Currency},
    };
    use tempfile::TempDir;

    use super::*;

    // Builds a provider whose cache file lives in a fresh temporary directory. The `TempDir`
    // is returned so the directory stays alive for the duration of the test.
    fn create_test_provider_with_cache() -> (InteractiveBrokersInstrumentProvider, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let cache_file = temp_dir.path().join("test_cache.json");
        let cache_path = cache_file.to_str().unwrap().to_string();

        let config = InteractiveBrokersInstrumentProviderConfig::builder()
            .cache_path(cache_path)
            .cache_validity_days(7u32)
            .build();

        (InteractiveBrokersInstrumentProvider::new(config), temp_dir)
    }

    // Builds a minimal EUR/USD currency pair carrying the given instrument ID.
    fn create_test_instrument(instrument_id: InstrumentId) -> InstrumentAny {
        let pair = CurrencyPair::new(
            instrument_id,
            Symbol::from("EUR/USD"),
            Currency::from("EUR"),
            Currency::from("USD"),
            4,
            0,
            Price::from("0.0001"),
            Quantity::from(1),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            UnixNanos::default(),
            UnixNanos::default(),
        );
        pair.into()
    }

    // Instrument ID for `raw_symbol` on the IDEALPRO venue.
    fn idealpro_id(raw_symbol: &str) -> InstrumentId {
        InstrumentId::new(Symbol::from(raw_symbol), Venue::from("IDEALPRO"))
    }

    #[tokio::test]
    async fn test_save_cache() {
        let (provider, _temp_dir) = create_test_provider_with_cache();
        let cache_path = provider.config.cache_path.clone().unwrap();

        // Seed two instruments with their contract-ID mappings and magnifiers.
        for (contract_id, raw_symbol) in [(100, "EUR/USD"), (200, "GBP/USD")] {
            let instrument_id = idealpro_id(raw_symbol);
            provider
                .instruments
                .insert(instrument_id, create_test_instrument(instrument_id));
            provider
                .contract_id_to_instrument_id
                .insert(contract_id, instrument_id);
            provider.price_magnifiers.insert(instrument_id, 1);
        }

        let outcome = provider.save_cache(&cache_path).await;
        assert!(outcome.is_ok(), "save_cache should succeed");

        assert!(Path::new(&cache_path).exists(), "Cache file should exist");

        let written = fs::read_to_string(&cache_path).unwrap();
        assert!(
            written.contains("EUR/USD"),
            "Cache should contain instrument data"
        );
        assert!(
            written.contains("cache_timestamp"),
            "Cache should contain timestamp"
        );
    }

    #[tokio::test]
    async fn test_load_cache_valid() {
        let (provider, _temp_dir) = create_test_provider_with_cache();
        let cache_path = provider.config.cache_path.clone().unwrap();

        // Write a cache containing a single instrument.
        let instrument_id = idealpro_id("EUR/USD");
        provider
            .instruments
            .insert(instrument_id, create_test_instrument(instrument_id));
        provider
            .contract_id_to_instrument_id
            .insert(100, instrument_id);
        provider.price_magnifiers.insert(instrument_id, 1);
        provider.save_cache(&cache_path).await.unwrap();

        // A fresh provider pointed at the same file should pick the instrument up.
        let fresh_config = InteractiveBrokersInstrumentProviderConfig::builder()
            .cache_path(cache_path.clone())
            .cache_validity_days(7u32)
            .build();
        let new_provider = InteractiveBrokersInstrumentProvider::new(fresh_config);

        let outcome = new_provider.load_cache(&cache_path).await;
        assert!(outcome.is_ok(), "load_cache should succeed");
        assert!(
            outcome.unwrap(),
            "load_cache should return true for valid cache"
        );

        assert!(
            new_provider.find(&instrument_id).is_some(),
            "Instrument should be loaded from cache"
        );
        assert_eq!(new_provider.count(), 1, "Provider should have 1 instrument");
    }

    #[tokio::test]
    async fn test_load_cache_missing_file() {
        let (provider, _temp_dir) = create_test_provider_with_cache();

        let outcome = provider.load_cache("/nonexistent/path/cache.json").await;
        assert!(
            outcome.is_ok(),
            "load_cache should not error on missing file"
        );
        assert!(
            !outcome.unwrap(),
            "load_cache should return false for missing file"
        );
    }

    #[tokio::test]
    async fn test_load_cache_expired() {
        let (provider, _temp_dir) = create_test_provider_with_cache();
        let cache_path = provider.config.cache_path.clone().unwrap();

        // Ten days old, against a seven-day validity window.
        let stale_cache = InstrumentCache {
            cache_timestamp: Utc::now() - chrono::Duration::days(10),
            contract_id_to_instrument_id: vec![],
            price_magnifiers: vec![],
            instruments: vec![],
        };
        let serialized = serde_json::to_string_pretty(&stale_cache).unwrap();
        fs::write(&cache_path, serialized).unwrap();

        let outcome = provider.load_cache(&cache_path).await;
        assert!(
            outcome.is_ok(),
            "load_cache should not error on expired cache"
        );
        assert!(
            !outcome.unwrap(),
            "load_cache should return false for expired cache"
        );
    }
}