Skip to main content

nautilus_interactive_brokers/providers/
instruments.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Interactive Brokers instrument provider implementation.
17
18use std::{collections::HashMap, fs, path::Path, str::FromStr, sync::Arc};
19
20use anyhow::Context;
21use chrono::{DateTime, Utc};
22use dashmap::DashMap;
23use ibapi::contracts::{ComboLegOpenClose, Contract, Exchange, SecurityType, Symbol};
24#[cfg(test)]
25use nautilus_model::instruments::Instrument;
26use nautilus_model::{
27    identifiers::{InstrumentId, Venue},
28    instruments::InstrumentAny,
29};
30use serde::{Deserialize, Serialize};
31
32use crate::{
33    common::parse::{
34        determine_venue_from_contract, instrument_id_to_ib_contract, is_spread_instrument_id,
35        parse_spread_instrument_id_to_legs, possible_exchanges_for_venue,
36    },
37    config::InteractiveBrokersInstrumentProviderConfig,
38    providers::parse::{
39        create_spread_instrument_id, parse_ib_contract_to_instrument, parse_spread_instrument_any,
40    },
41};
42
43/// Cache structure for persistent instrument caching.
44#[derive(Debug, Clone, Serialize, Deserialize)]
45struct InstrumentCache {
46    /// Timestamp when cache was created.
47    cache_timestamp: DateTime<Utc>,
48    /// Contract ID to Instrument ID mappings.
49    contract_id_to_instrument_id: Vec<(i32, String)>,
50    /// Instrument ID to Price Magnifier mappings.
51    price_magnifiers: Vec<(String, i32)>,
52    /// Instruments serialized as JSON strings (since InstrumentAny is serializable).
53    instruments: Vec<(String, String)>, // (instrument_id, json)
54}
55
56/// Interactive Brokers instrument provider.
57///
58/// This provider fetches contract details from Interactive Brokers using the `rust-ibapi` library
59/// and converts them to NautilusTrader instruments.
60#[cfg_attr(
61    feature = "python",
62    pyo3::pyclass(
63        module = "nautilus_trader.core.nautilus_pyo3.interactive_brokers",
64        unsendable,
65        from_py_object
66    )
67)]
68#[derive(Debug, Clone)]
69pub struct InteractiveBrokersInstrumentProvider {
70    /// Configuration for the provider.
71    config: InteractiveBrokersInstrumentProviderConfig,
72    /// Cache mapping contract IDs to instrument IDs.
73    contract_id_to_instrument_id: Arc<DashMap<i32, InstrumentId>>,
74    /// Cache mapping instrument IDs to instruments.
75    instruments: Arc<DashMap<InstrumentId, InstrumentAny>>,
76    /// Cache mapping instrument IDs to contract details.
77    contract_details: Arc<DashMap<InstrumentId, ibapi::contracts::ContractDetails>>,
78    /// Cache mapping instrument IDs to IB contracts.
79    contracts: Arc<DashMap<InstrumentId, Contract>>,
80    /// Dedicated cache for price magnifiers for fast lookups.
81    price_magnifiers: Arc<DashMap<InstrumentId, i32>>,
82}
83
84impl InteractiveBrokersInstrumentProvider {
85    /// Create a new `InteractiveBrokersInstrumentProvider`.
86    ///
87    /// # Arguments
88    ///
89    /// * `config` - Configuration for the provider
90    pub fn new(config: InteractiveBrokersInstrumentProviderConfig) -> Self {
91        Self {
92            config,
93            contract_id_to_instrument_id: Arc::new(DashMap::new()),
94            instruments: Arc::new(DashMap::new()),
95            contract_details: Arc::new(DashMap::new()),
96            contracts: Arc::new(DashMap::new()),
97            price_magnifiers: Arc::new(DashMap::new()),
98        }
99    }
100
/// Test-only helper that seeds the caches with a single instrument.
///
/// Registers the instrument, the contract-ID mapping, a stub `Contract`
/// carrying only `contract_id` (all other fields default), and the price
/// magnifier. No contract details are stored.
#[cfg(test)]
pub(crate) fn insert_test_instrument(
    &self,
    instrument: InstrumentAny,
    contract_id: i32,
    price_magnifier: i32,
) {
    let id = instrument.id();
    let stub_contract = Contract {
        contract_id,
        ..Default::default()
    };

    self.instruments.insert(id, instrument);
    self.contract_id_to_instrument_id.insert(contract_id, id);
    self.contracts.insert(id, stub_contract);
    self.price_magnifiers.insert(id, price_magnifier);
}
121
122    /// Initialize the provider by loading cache if configured.
123    ///
124    /// This is equivalent to Python's `provider.initialize()` method.
125    /// It loads instruments from cache if `cache_path` is configured and cache is valid.
126    ///
127    /// # Errors
128    ///
129    /// Returns an error if cache loading fails.
130    pub async fn initialize(&self) -> anyhow::Result<()> {
131        if let Some(ref cache_path) = self.config.cache_path {
132            match self.load_cache(cache_path).await {
133                Ok(cache_loaded) => {
134                    if cache_loaded {
135                        tracing::info!(
136                            "Initialized provider with {} instruments from cache",
137                            self.count()
138                        );
139                    } else {
140                        tracing::debug!(
141                            "Cache file not found or expired, starting with empty cache"
142                        );
143                    }
144                }
145                Err(e) => {
146                    tracing::warn!("Failed to load cache during initialization: {}", e);
147                }
148            }
149        }
150        Ok(())
151    }
152
153    /// Determine venue from contract using provider configuration.
154    ///
155    /// This is equivalent to Python's `determine_venue_from_contract` method.
156    /// It uses the config's symbol-to-venue mapping and exchange-to-venue conversion settings.
157    ///
158    /// # Arguments
159    ///
160    /// * `contract` - The IB contract
161    ///
162    /// # Returns
163    ///
164    /// The determined venue.
165    pub fn determine_venue(
166        &self,
167        contract: &Contract,
168        contract_details: Option<&ibapi::contracts::ContractDetails>,
169    ) -> Venue {
170        let valid_exchanges = contract_details.map(|details| details.valid_exchanges.join(","));
171        let venue_str = determine_venue_from_contract(
172            contract,
173            &self.config.symbol_to_mic_venue,
174            self.config.convert_exchange_to_mic_venue,
175            valid_exchanges.as_deref(),
176        );
177        Venue::from(venue_str.as_str())
178    }
179
180    /// Get the symbology method from the provider configuration.
181    pub fn symbology_method(&self) -> crate::config::SymbologyMethod {
182        self.config.symbology_method
183    }
184
185    /// Get an instrument by its ID.
186    ///
187    /// # Arguments
188    ///
189    /// * `instrument_id` - The instrument ID to look up
190    ///
191    /// # Returns
192    ///
193    /// Returns the instrument if found, `None` otherwise.
194    #[must_use]
195    pub fn find(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
196        self.instruments
197            .get(instrument_id)
198            .map(|entry| entry.value().clone())
199    }
200
201    /// Get an instrument by contract ID.
202    ///
203    /// # Arguments
204    ///
205    /// * `contract_id` - The IB contract ID to look up
206    ///
207    /// # Returns
208    ///
209    /// Returns the instrument if found, `None` otherwise.
210    #[must_use]
211    pub fn find_by_contract_id(&self, contract_id: i32) -> Option<InstrumentAny> {
212        self.contract_id_to_instrument_id
213            .get(&contract_id)
214            .and_then(|entry| self.find(entry.value()))
215    }
216
217    /// Get an instrument ID by contract ID.
218    ///
219    /// # Arguments
220    ///
221    /// * `contract_id` - The IB contract ID to look up
222    ///
223    /// # Returns
224    ///
225    /// Returns the instrument ID if found, `None` otherwise.
226    #[must_use]
227    pub fn get_instrument_id_by_contract_id(&self, contract_id: i32) -> Option<InstrumentId> {
228        self.contract_id_to_instrument_id
229            .get(&contract_id)
230            .map(|entry| *entry.value())
231    }
232
233    /// Check if a security type should be filtered.
234    ///
235    /// # Arguments
236    ///
237    /// * `sec_type` - The security type to check
238    ///
239    /// # Returns
240    ///
241    /// Returns `true` if the security type should be filtered.
242    #[must_use]
243    pub fn is_filtered_sec_type(&self, sec_type: &str) -> bool {
244        self.config.filter_sec_types.contains(sec_type)
245    }
246
247    /// Get all cached instruments.
248    ///
249    /// # Returns
250    ///
251    /// Returns a vector of all cached instruments.
252    #[must_use]
253    pub fn get_all(&self) -> Vec<InstrumentAny> {
254        self.instruments
255            .iter()
256            .map(|entry| entry.value().clone())
257            .collect()
258    }
259
260    /// Get the number of cached instruments.
261    ///
262    /// # Returns
263    ///
264    /// Returns the number of cached instruments.
265    #[must_use]
266    pub fn count(&self) -> usize {
267        self.instruments.len()
268    }
269
270    /// Get price magnifier for an instrument ID.
271    ///
272    /// Price magnifier allows execution and strike prices to be reported consistently
273    /// with market data and historical data.
274    ///
275    /// This method first checks the dedicated price magnifier cache for fast lookup.
276    /// If not found, it falls back to checking contract details. If still not found,
277    /// it returns the default value of 1 and logs a warning if the instrument exists.
278    ///
279    /// # Arguments
280    ///
281    /// * `instrument_id` - The instrument ID to look up
282    ///
283    /// # Returns
284    ///
285    /// Returns the price magnifier if found, otherwise 1.
286    #[must_use]
287    pub fn get_price_magnifier(&self, instrument_id: &InstrumentId) -> i32 {
288        // First try dedicated price magnifier cache for fast lookup
289        if let Some(magnifier) = self.price_magnifiers.get(instrument_id) {
290            return normalize_price_magnifier(*magnifier.value());
291        }
292
293        // Fall back to contract details lookup
294        if let Some(details) = self.contract_details.get(instrument_id) {
295            let magnifier = normalize_price_magnifier(details.value().price_magnifier);
296            // Cache it for future fast lookups
297            self.price_magnifiers.insert(*instrument_id, magnifier);
298            return magnifier;
299        }
300
301        // Not found - check if instrument exists (might not have contract details loaded yet)
302        if self.instruments.contains_key(instrument_id) {
303            tracing::debug!(
304                "Price magnifier not found for instrument {} (has instrument but no contract details), using default 1",
305                instrument_id
306            );
307        } else {
308            tracing::trace!(
309                "Price magnifier not found for instrument {} (instrument not loaded), using default 1",
310                instrument_id
311            );
312        }
313
314        // Default to 1 if not found
315        1
316    }
317
318    /// Get an instrument by IB Contract.
319    ///
320    /// This is equivalent to Python's `get_instrument` method.
321    /// Supports BAG contracts by auto-loading legs.
322    ///
323    /// # Arguments
324    ///
325    /// * `client` - The IB API client
326    /// * `contract` - The IB contract to get instrument for
327    ///
328    /// # Returns
329    ///
330    /// Returns the instrument if found, `None` otherwise.
331    ///
332    /// # Errors
333    ///
334    /// Returns an error if fetching fails.
335    pub async fn get_instrument(
336        &self,
337        client: &ibapi::Client,
338        contract: &Contract,
339    ) -> anyhow::Result<Option<InstrumentAny>> {
340        log::debug!(
341            "IB get_instrument request sec_type={:?} con_id={} symbol={} local_symbol={} exchange={} expiry={}",
342            contract.security_type,
343            contract.contract_id,
344            contract.symbol.as_str(),
345            contract.local_symbol.as_str(),
346            contract.exchange.as_str(),
347            contract.last_trade_date_or_contract_month.as_str()
348        );
349        // Check if security type is filtered
350        let sec_type_str = format!("{:?}", contract.security_type);
351        if self.is_filtered_sec_type(&sec_type_str) {
352            tracing::warn!(
353                "Skipping filtered security type {} for contract",
354                sec_type_str
355            );
356            return Ok(None);
357        }
358
359        let contract_id = contract.contract_id;
360
361        // Check if we already have this instrument by contract ID
362        if let Some(cached_instrument_id) = self.contract_id_to_instrument_id.get(&contract_id) {
363            log::debug!(
364                "IB get_instrument cache hit for contract_id={} -> {}",
365                contract_id,
366                cached_instrument_id.value()
367            );
368
369            if let Some(instrument) = self.find(cached_instrument_id.value()) {
370                return Ok(Some(instrument));
371            }
372        }
373
374        // Special handling for BAG contracts
375        if contract.security_type == SecurityType::Spread && !contract.combo_legs.is_empty() {
376            // Load BAG contract (which auto-loads legs and creates spread instrument)
377            self.fetch_bag_contract(client, contract).await?;
378
379            // Get the spread instrument ID that was created
380            if let Some(spread_instrument_id) = self.contract_id_to_instrument_id.get(&contract_id)
381            {
382                return Ok(self.find(spread_instrument_id.value()));
383            }
384        }
385
386        // For non-BAG contracts, fetch contract details and load
387        let details_vec = client
388            .contract_details(contract)
389            .await
390            .context("Failed to fetch contract details from IB")?;
391
392        log::debug!(
393            "IB get_instrument received {} contract details for sec_type={:?} symbol={} local_symbol={}",
394            details_vec.len(),
395            contract.security_type,
396            contract.symbol.as_str(),
397            contract.local_symbol.as_str()
398        );
399
400        if details_vec.is_empty() {
401            tracing::warn!("No contract details returned for contract {}", contract_id);
402            return Ok(None);
403        }
404
405        let details = &details_vec[0];
406        log::debug!(
407            "IB get_instrument using first detail sec_type={:?} con_id={} local_symbol={} exchange={} under_con_id={}",
408            details.contract.security_type,
409            details.contract.contract_id,
410            details.contract.local_symbol.as_str(),
411            details.contract.exchange.as_str(),
412            details.under_contract_id
413        );
414        let venue = self.determine_venue(&details.contract, Some(details));
415        let instrument_id = match self.config.symbology_method {
416            crate::config::SymbologyMethod::Simplified => {
417                crate::common::parse::ib_contract_to_instrument_id_simplified(
418                    &details.contract,
419                    Some(venue),
420                )
421            }
422            crate::config::SymbologyMethod::Raw => {
423                crate::common::parse::ib_contract_to_instrument_id_raw(
424                    &details.contract,
425                    Some(venue),
426                )
427            }
428        }
429        .context("Failed to convert contract to instrument ID")?;
430
431        log::debug!(
432            "IB get_instrument mapped to instrument_id={}",
433            instrument_id
434        );
435
436        // Parse and cache the instrument
437        let instrument = parse_ib_contract_to_instrument(details, instrument_id)
438            .context("Failed to parse instrument")?;
439
440        self.instruments.insert(instrument_id, instrument.clone());
441        self.contract_details.insert(instrument_id, details.clone());
442        self.contracts
443            .insert(instrument_id, details.contract.clone());
444        self.contract_id_to_instrument_id
445            .insert(details.contract.contract_id, instrument_id);
446        self.price_magnifiers
447            .insert(instrument_id, details.price_magnifier);
448
449        Ok(Some(instrument))
450    }
451
452    /// Convert an instrument ID to IB contract details.
453    ///
454    /// This is equivalent to Python's `instrument_id_to_ib_contract_details` method.
455    ///
456    /// # Arguments
457    ///
458    /// * `instrument_id` - The instrument ID to convert
459    ///
460    /// # Returns
461    ///
462    /// Returns the contract details if found, `None` otherwise.
463    #[must_use]
464    pub fn instrument_id_to_ib_contract_details(
465        &self,
466        instrument_id: &InstrumentId,
467    ) -> Option<ibapi::contracts::ContractDetails> {
468        self.contract_details
469            .get(instrument_id)
470            .map(|entry| entry.value().clone())
471    }
472
473    #[must_use]
474    pub fn instrument_id_to_ib_contract(&self, instrument_id: &InstrumentId) -> Option<Contract> {
475        self.contracts
476            .get(instrument_id)
477            .map(|entry| entry.value().clone())
478    }
479
480    pub fn resolve_contract_for_instrument(
481        &self,
482        instrument_id: InstrumentId,
483    ) -> anyhow::Result<Contract> {
484        if let Some(contract) = self.instrument_id_to_ib_contract(&instrument_id) {
485            return Ok(contract);
486        }
487
488        if let Some(details) = self.instrument_id_to_ib_contract_details(&instrument_id) {
489            return Ok(details.contract);
490        }
491
492        instrument_id_to_ib_contract(instrument_id, None)
493    }
494
495    /// Load a single instrument (does not return loaded IDs).
496    ///
497    /// This is equivalent to Python's `load_async` method.
498    ///
499    /// # Arguments
500    ///
501    /// * `client` - The IB API client
502    /// * `instrument_id` - The instrument ID to load
503    /// * `force_instrument_update` - If true, force re-fetch even if already cached
504    ///
505    /// # Errors
506    ///
507    /// Returns an error if loading fails.
508    pub async fn load_async(
509        &self,
510        client: &ibapi::Client,
511        instrument_id: InstrumentId,
512        filters: Option<HashMap<String, String>>,
513    ) -> anyhow::Result<()> {
514        let filters: Option<HashMap<String, String>> = filters;
515        let force_instrument_update = filters
516            .as_ref()
517            .and_then(|f| f.get("force_instrument_update"))
518            .map(|v| v == "true")
519            .unwrap_or(false);
520
521        self.fetch_contract_details(client, instrument_id, force_instrument_update, filters)
522            .await
523    }
524
525    /// Load a single instrument and return the loaded instrument ID.
526    ///
527    /// This is equivalent to Python's `load_with_return_async` method.
528    ///
529    /// # Arguments
530    ///
531    /// * `client` - The IB API client
532    /// * `instrument_id` - The instrument ID to load
533    /// * `force_instrument_update` - If true, force re-fetch even if already cached
534    ///
535    /// # Returns
536    ///
537    /// Returns the loaded instrument ID if successful, `None` otherwise.
538    ///
539    /// # Errors
540    ///
541    /// Returns an error if loading fails.
542    pub async fn load_with_return_async(
543        &self,
544        client: &ibapi::Client,
545        instrument_id: InstrumentId,
546        filters: Option<HashMap<String, String>>,
547    ) -> anyhow::Result<Option<InstrumentId>> {
548        let filters: Option<HashMap<String, String>> = filters;
549        let force_instrument_update = filters
550            .as_ref()
551            .and_then(|f| f.get("force_instrument_update"))
552            .map(|v| v == "true")
553            .unwrap_or(false);
554
555        if is_spread_instrument_id(&instrument_id) {
556            self.fetch_spread_instrument(client, instrument_id, force_instrument_update, filters)
557                .await?;
558        } else {
559            self.fetch_contract_details(client, instrument_id, force_instrument_update, filters)
560                .await?;
561        }
562
563        if self.instruments.contains_key(&instrument_id) {
564            Ok(Some(instrument_id))
565        } else {
566            Ok(None)
567        }
568    }
569
570    /// Load multiple instruments (does not return loaded IDs).
571    ///
572    /// This is equivalent to Python's `load_ids_async` method.
573    ///
574    /// # Arguments
575    ///
576    /// * `client` - The IB API client
577    /// * `instrument_ids` - Vector of instrument IDs to load
578    /// * `force_instrument_update` - If true, force re-fetch even if already cached
579    ///
580    /// # Errors
581    ///
582    /// Returns an error if loading fails.
583    pub async fn load_ids_async(
584        &self,
585        client: &ibapi::Client,
586        instrument_ids: Vec<InstrumentId>,
587        filters: Option<HashMap<String, String>>,
588    ) -> anyhow::Result<()> {
589        let filters: Option<HashMap<String, String>> = filters;
590        let force_instrument_update = filters
591            .as_ref()
592            .and_then(|f| f.get("force_instrument_update"))
593            .map(|v| v == "true")
594            .unwrap_or(false);
595
596        for instrument_id in instrument_ids {
597            let load_result = if is_spread_instrument_id(&instrument_id) {
598                self.fetch_spread_instrument(
599                    client,
600                    instrument_id,
601                    force_instrument_update,
602                    filters.clone(),
603                )
604                .await
605                .map(|_| ())
606            } else {
607                self.fetch_contract_details(
608                    client,
609                    instrument_id,
610                    force_instrument_update,
611                    filters.clone(),
612                )
613                .await
614            };
615
616            if let Err(e) = load_result {
617                tracing::warn!("Failed to load instrument {}: {}", instrument_id, e);
618            }
619        }
620        Ok(())
621    }
622
623    /// Load multiple instruments and return the loaded instrument IDs.
624    ///
625    /// This is equivalent to Python's `load_ids_with_return_async` method.
626    ///
627    /// # Arguments
628    ///
629    /// * `client` - The IB API client
630    /// * `instrument_ids` - Vector of instrument IDs to load
631    /// * `force_instrument_update` - If true, force re-fetch even if already cached
632    ///
633    /// # Returns
634    ///
635    /// Returns a vector of successfully loaded instrument IDs.
636    ///
637    /// # Errors
638    ///
639    /// Returns an error if loading fails.
640    pub async fn load_ids_with_return_async(
641        &self,
642        client: &ibapi::Client,
643        instrument_ids: Vec<InstrumentId>,
644        filters: Option<HashMap<String, String>>,
645    ) -> anyhow::Result<Vec<InstrumentId>> {
646        let mut loaded_ids = Vec::new();
647
648        let force_instrument_update = filters
649            .as_ref()
650            .and_then(|f| f.get("force_instrument_update"))
651            .map(|v| v == "true")
652            .unwrap_or(false);
653
654        for instrument_id in instrument_ids {
655            let load_result = if is_spread_instrument_id(&instrument_id) {
656                self.fetch_spread_instrument(
657                    client,
658                    instrument_id,
659                    force_instrument_update,
660                    filters.clone(),
661                )
662                .await
663                .map(|loaded| loaded.then_some(()))
664            } else {
665                self.fetch_contract_details(
666                    client,
667                    instrument_id,
668                    force_instrument_update,
669                    filters.clone(),
670                )
671                .await
672                .map(Some)
673            };
674
675            if load_result.is_ok() {
676                loaded_ids.push(instrument_id);
677            }
678        }
679
680        Ok(loaded_ids)
681    }
682
683    fn create_bag_contract_from_legs(
684        &self,
685        leg_contract_details: &[(ibapi::contracts::ContractDetails, i32)],
686        instrument_id: Option<InstrumentId>,
687        bag_contract: Option<&Contract>,
688    ) -> anyhow::Result<Contract> {
689        if let Some(bag_contract) = bag_contract {
690            return Ok(bag_contract.clone());
691        }
692
693        let (first_details, _) = leg_contract_details
694            .first()
695            .ok_or_else(|| anyhow::anyhow!("Cannot create BAG contract without leg details"))?;
696
697        let combo_legs = leg_contract_details
698            .iter()
699            .map(|(details, ratio)| ibapi::contracts::ComboLeg {
700                contract_id: details.contract.contract_id,
701                ratio: ratio.abs(),
702                action: if *ratio > 0 {
703                    "BUY".to_string()
704                } else {
705                    "SELL".to_string()
706                },
707                exchange: details.contract.exchange.to_string(),
708                open_close: ComboLegOpenClose::Same,
709                short_sale_slot: 0,
710                designated_location: String::new(),
711                exempt_code: -1,
712            })
713            .collect();
714
715        Ok(Contract {
716            contract_id: 0,
717            symbol: first_details.contract.symbol.clone(),
718            security_type: SecurityType::Spread,
719            exchange: Exchange::from("SMART"),
720            currency: first_details.contract.currency.clone(),
721            local_symbol: instrument_id.map_or_else(String::new, |id| id.symbol.to_string()),
722            combo_legs_description: instrument_id
723                .map(|id| format!("Spread: {}", id.symbol))
724                .unwrap_or_else(|| "Spread".to_string()),
725            combo_legs,
726            ..Default::default()
727        })
728    }
729
730    /// Fetch a spread instrument by loading its individual legs.
731    ///
732    /// This is equivalent to Python's `_fetch_spread_instrument` method.
733    /// It parses the spread instrument ID to extract leg tuples, loads each leg,
734    /// and then creates the spread instrument.
735    ///
736    /// # Arguments
737    ///
738    /// * `client` - The IB API client
739    /// * `spread_instrument_id` - The spread instrument ID to fetch
740    /// * `force_instrument_update` - If true, force re-fetch even if already cached
741    ///
742    /// # Returns
743    ///
744    /// Returns `true` if the spread instrument was successfully loaded, `false` otherwise.
745    ///
746    /// # Errors
747    ///
748    /// Returns an error if parsing or loading fails.
749    pub async fn fetch_spread_instrument(
750        &self,
751        client: &ibapi::Client,
752        spread_instrument_id: InstrumentId,
753        force_instrument_update: bool,
754        filters: Option<HashMap<String, String>>,
755    ) -> anyhow::Result<bool> {
756        // Check if already cached (unless forcing update)
757        if !force_instrument_update && self.instruments.contains_key(&spread_instrument_id) {
758            tracing::debug!("Spread instrument {} already cached", spread_instrument_id);
759            return Ok(true);
760        }
761
762        // Parse the spread ID to get individual legs
763        let leg_tuples = parse_spread_instrument_id_to_legs(&spread_instrument_id)
764            .context("Failed to parse spread instrument ID to leg tuples")?;
765
766        if leg_tuples.is_empty() {
767            tracing::error!("Spread instrument {} has no legs", spread_instrument_id);
768            return Ok(false);
769        }
770
771        tracing::info!(
772            "Loading spread instrument {} with {} legs",
773            spread_instrument_id,
774            leg_tuples.len()
775        );
776
777        // First, load all individual leg instruments to get their contract details
778        let mut leg_contract_details = Vec::new();
779
780        for (leg_instrument_id, ratio) in &leg_tuples {
781            tracing::info!(
782                "Loading leg instrument: {} (ratio: {})",
783                leg_instrument_id,
784                ratio
785            );
786
787            // Load the individual leg instrument
788            self.fetch_contract_details(
789                client,
790                *leg_instrument_id,
791                force_instrument_update,
792                filters.clone(),
793            )
794            .await
795            .with_context(|| format!("Failed to load leg instrument: {}", leg_instrument_id))?;
796
797            // Get the contract details for this leg
798            let leg_details = self
799                .contract_details
800                .get(leg_instrument_id)
801                .map(|entry| entry.value().clone())
802                .ok_or_else(|| {
803                    anyhow::anyhow!(
804                        "Leg instrument {} not found in contract details after loading",
805                        leg_instrument_id
806                    )
807                })?;
808
809            leg_contract_details.push((leg_details, *ratio));
810        }
811
812        // Create the spread instrument
813        let timestamp = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
814        let leg_details_refs: Vec<(&ibapi::contracts::ContractDetails, i32)> =
815            leg_contract_details.iter().map(|(d, r)| (d, *r)).collect();
816
817        let bag_contract = self.create_bag_contract_from_legs(
818            &leg_contract_details,
819            Some(spread_instrument_id),
820            None,
821        )?;
822        let spread_instrument = parse_spread_instrument_any(
823            spread_instrument_id,
824            &leg_details_refs,
825            Some(&bag_contract),
826            Some(timestamp),
827        )
828        .context("Failed to parse spread instrument")?;
829
830        // Cache the spread instrument
831        self.instruments
832            .insert(spread_instrument_id, spread_instrument);
833        self.contracts.insert(spread_instrument_id, bag_contract);
834
835        if let Some((first_details, _)) = leg_contract_details.first() {
836            self.price_magnifiers
837                .insert(spread_instrument_id, first_details.price_magnifier);
838        }
839
840        tracing::info!(
841            "Successfully loaded spread instrument {}",
842            spread_instrument_id
843        );
844        Ok(true)
845    }
846
847    /// Load all instruments from provided IDs and contracts.
848    ///
849    /// This is equivalent to Python's `load_all_async` method.
850    /// Python version loads from config's `_load_ids_on_start` and `_load_contracts_on_start`.
851    /// Rust version accepts these as parameters for flexibility.
852    ///
853    /// # Arguments
854    ///
855    /// * `client` - The IB API client
856    /// * `instrument_ids` - Optional vector of instrument IDs to load
857    /// * `contracts` - Optional vector of IB contracts to load
858    /// * `force_instrument_update` - If true, force re-fetch even if already cached
859    ///
860    /// # Errors
861    ///
862    /// Returns an error if loading fails.
863    pub async fn load_all_async(
864        &self,
865        client: &ibapi::Client,
866        instrument_ids: Option<Vec<InstrumentId>>,
867        contracts: Option<Vec<Contract>>,
868        force_instrument_update: bool,
869    ) -> anyhow::Result<Vec<InstrumentId>> {
870        let mut loaded_ids = Vec::new();
871
872        // Load from instrument IDs
873        let ids_to_load =
874            instrument_ids.unwrap_or_else(|| self.config.load_ids.iter().cloned().collect());
875
876        if !ids_to_load.is_empty() {
877            let mut filters = std::collections::HashMap::new();
878
879            if force_instrument_update {
880                filters.insert("force_instrument_update".to_string(), "true".to_string());
881            }
882            let filters = if filters.is_empty() {
883                None
884            } else {
885                Some(filters)
886            };
887
888            let ids_result = self
889                .load_ids_with_return_async(client, ids_to_load, filters)
890                .await
891                .context("Failed to load instruments from IDs")?;
892            loaded_ids.extend(ids_result);
893        }
894
895        // Load from contracts
896        let mut contracts_to_load = contracts.unwrap_or_default();
897        if contracts_to_load.is_empty() {
898            for contract_json in &self.config.load_contracts {
899                let contract = crate::common::contracts::parse_contract_from_json(contract_json)
900                    .context("Failed to parse contract from config JSON")?;
901                contracts_to_load.push(contract);
902            }
903        }
904
905        if !contracts_to_load.is_empty() {
906            for contract in contracts_to_load {
907                match self.get_instrument(client, &contract).await {
908                    Ok(Some(instrument)) => {
909                        use nautilus_model::instruments::Instrument;
910                        let instrument_id = instrument.id();
911                        loaded_ids.push(instrument_id);
912                        tracing::debug!("Loaded instrument {} from contract", instrument_id);
913                    }
914                    Ok(None) => {
915                        tracing::warn!("Failed to load instrument from contract: {:?}", contract);
916                    }
917                    Err(e) => {
918                        tracing::warn!(
919                            "Error loading instrument from contract {:?}: {}",
920                            contract,
921                            e
922                        );
923                    }
924                }
925            }
926        }
927
928        if loaded_ids.is_empty() {
929            tracing::debug!("load_all_async called but no instruments were loaded");
930        } else {
931            tracing::info!("load_all_async loaded {} instruments", loaded_ids.len());
932        }
933
934        Ok(loaded_ids)
935    }
936}
937
/// Clamp a price magnifier to a usable positive value.
///
/// Positive inputs are returned unchanged; zero and negative inputs become `1`.
fn normalize_price_magnifier(price_magnifier: i32) -> i32 {
    price_magnifier.max(1)
}
945
946impl InteractiveBrokersInstrumentProvider {
947    /// Fetch and cache contract details for an instrument ID using the provided IB client.
948    ///
949    /// # Arguments
950    ///
951    /// * `client` - The IB API client
952    /// * `instrument_id` - The instrument ID to fetch
953    ///
954    /// # Errors
955    ///
956    /// Returns an error if fetching fails.
957    pub async fn fetch_contract_details(
958        &self,
959        client: &ibapi::Client,
960        instrument_id: InstrumentId,
961        force_instrument_update: bool,
962        filters: Option<HashMap<String, String>>,
963    ) -> anyhow::Result<()> {
964        if !force_instrument_update {
965            if self.instruments.contains_key(&instrument_id) {
966                tracing::debug!(
967                    "Instrument {} already cached, skipping fetch",
968                    instrument_id
969                );
970                return Ok(());
971            }
972        }
973        // Convert instrument ID to IB contract
974        let exchange = filters
975            .as_ref()
976            .and_then(|f| f.get("exchange"))
977            .map(|s| s.as_str());
978
979        let exchanges_to_try: Vec<String> = if let Some(exchange) = exchange {
980            vec![exchange.to_string()]
981        } else {
982            possible_exchanges_for_venue(instrument_id.venue.as_str())
983        };
984
985        let mut details_vec = Vec::new();
986        let mut last_error = None;
987
988        for candidate_exchange in exchanges_to_try {
989            let contract = instrument_id_to_ib_contract(instrument_id, Some(candidate_exchange.as_str()))
990                .with_context(|| format!("Failed to convert instrument_id {} to IB contract. Check that the instrument ID format is correct and the venue/symbol are valid.", instrument_id))?;
991
992            match client.contract_details(&contract).await {
993                Ok(result) if !result.is_empty() => {
994                    details_vec = result;
995                    break;
996                }
997                Ok(_) => {}
998                Err(e) => {
999                    last_error = Some((candidate_exchange.clone(), e.to_string()));
1000                }
1001            }
1002        }
1003
1004        if details_vec.is_empty() {
1005            if let Some((candidate_exchange, error)) = last_error {
1006                tracing::warn!(
1007                    "Failed to fetch contract details for {} on {}: {}",
1008                    instrument_id,
1009                    candidate_exchange,
1010                    error
1011                );
1012            } else {
1013                tracing::warn!(
1014                    "No contract details returned for {} - instrument may not exist in IB or contract specification is incomplete",
1015                    instrument_id
1016                );
1017            }
1018            return Ok(());
1019        }
1020
1021        // Process the first contract detail (usually there's only one)
1022        let details = &details_vec[0];
1023
1024        // Check if security type is filtered
1025        let sec_type_str = format!("{:?}", details.contract.security_type);
1026        if self.is_filtered_sec_type(&sec_type_str) {
1027            tracing::warn!("Skipping filtered security type: {}", sec_type_str);
1028            return Ok(());
1029        }
1030
1031        // Parse to Nautilus instrument
1032        let parsed_instrument = parse_ib_contract_to_instrument(details, instrument_id)
1033            .context("Failed to parse IB contract to Nautilus instrument")?;
1034
1035        // TODO: Filter callable support (Python feature)
1036        // Python's filter_callable allows custom filtering of instruments via a Python callable.
1037        // This requires calling Python functions from Rust, which needs GIL handling and Python interop.
1038        // For now, users can filter instruments in Python after loading if needed.
1039        // To implement: Accept PyObject callable in config, call it here with parsed_instrument,
1040        // skip instrument if callable returns False.
1041
1042        // Cache the instrument and mappings (force update if requested)
1043        if force_instrument_update || !self.instruments.contains_key(&instrument_id) {
1044            self.instruments.insert(instrument_id, parsed_instrument);
1045            self.contract_details.insert(instrument_id, details.clone());
1046            self.contracts
1047                .insert(instrument_id, details.contract.clone());
1048            self.contract_id_to_instrument_id
1049                .insert(details.contract.contract_id, instrument_id);
1050            self.price_magnifiers
1051                .insert(instrument_id, details.price_magnifier);
1052        }
1053
1054        tracing::info!(
1055            "Successfully loaded instrument: {} (price_magnifier: {})",
1056            instrument_id,
1057            details.price_magnifier
1058        );
1059        Ok(())
1060    }
1061
1062    /// Batch load multiple instrument IDs.
1063    ///
1064    /// This method fetches and caches contract details for multiple instrument IDs in parallel.
1065    ///
1066    /// # Arguments
1067    ///
1068    /// * `client` - The IB API client
1069    /// * `instrument_ids` - Vector of instrument IDs to load
1070    /// * `filters` - Optional filters to apply (not yet implemented, reserved for future use)
1071    ///
1072    /// # Returns
1073    ///
1074    /// Returns a vector of successfully loaded instrument IDs.
1075    ///
1076    /// # Errors
1077    ///
1078    /// Returns an error if fetching fails.
1079    pub async fn batch_load(
1080        &self,
1081        client: &ibapi::Client,
1082        instrument_ids: Vec<InstrumentId>,
1083        filters: Option<&[String]>,
1084    ) -> anyhow::Result<Vec<InstrumentId>> {
1085        let mut loaded_ids = Vec::new();
1086
1087        // Apply filters if provided
1088        let filtered_ids: Vec<InstrumentId> = if let Some(filter_list) = filters {
1089            // Filter instrument IDs by matching against filter patterns
1090            // Filters can be:
1091            // - Security type filters (e.g., "STK", "OPT", "FUT")
1092            // - Venue filters (e.g., "SMART", "NASDAQ")
1093            // - Symbol patterns (partial matching)
1094            instrument_ids
1095                .into_iter()
1096                .filter(|instrument_id| {
1097                    // Check if instrument matches any filter
1098                    for filter in filter_list {
1099                        // Check symbol match (case-insensitive partial match)
1100                        if instrument_id
1101                            .symbol
1102                            .as_str()
1103                            .to_lowercase()
1104                            .contains(&filter.to_lowercase())
1105                        {
1106                            return true;
1107                        }
1108
1109                        // Check venue match
1110                        if instrument_id.venue.as_str() == filter {
1111                            return true;
1112                        }
1113
1114                        // Check security type (try to infer from instrument)
1115                        if let Some(contract_details) = self.contract_details.get(instrument_id) {
1116                            let sec_type_str =
1117                                format!("{:?}", contract_details.contract.security_type);
1118
1119                            if sec_type_str.to_uppercase().contains(&filter.to_uppercase()) {
1120                                return true;
1121                            }
1122                        }
1123                    }
1124                    false
1125                })
1126                .collect()
1127        } else {
1128            instrument_ids
1129        };
1130
1131        // Load instruments sequentially (can be parallelized in future if needed)
1132        let filtered_count = filtered_ids.len();
1133        for instrument_id in filtered_ids {
1134            match self
1135                .fetch_contract_details(client, instrument_id, false, None)
1136                .await
1137            {
1138                Ok(()) => loaded_ids.push(instrument_id),
1139                Err(e) => {
1140                    tracing::warn!("Failed to load instrument {}: {}", instrument_id, e);
1141                }
1142            }
1143        }
1144
1145        tracing::info!(
1146            "Batch loaded {} instruments ({} after filtering)",
1147            loaded_ids.len(),
1148            filtered_count
1149        );
1150
1151        // Save cache if cache_path is configured
1152        if !loaded_ids.is_empty()
1153            && let Some(ref cache_path) = self.config.cache_path
1154            && let Err(e) = self.save_cache(cache_path).await
1155        {
1156            tracing::warn!("Failed to save instrument cache to {}: {}", cache_path, e);
1157        }
1158
1159        Ok(loaded_ids)
1160    }
1161
1162    /// Fetch option chain for a given underlying contract with expiry filtering.
1163    ///
1164    /// This is equivalent to Python's `get_option_chain_details_by_range`.
1165    /// It uses `contract_details` to fetch options with precise expiry filtering,
1166    /// which is more flexible than the basic `option_chain` API.
1167    ///
1168    /// # Arguments
1169    ///
1170    /// * `client` - The IB API client
1171    /// * `underlying` - The underlying contract
1172    /// * `expiry_min` - Minimum expiry date string (YYYYMMDD format, can be None for no min)
1173    /// * `expiry_max` - Maximum expiry date string (YYYYMMDD format, can be None for no max)
1174    ///
1175    /// # Returns
1176    ///
1177    /// Returns the number of option instruments loaded.
1178    ///
1179    /// # Errors
1180    ///
1181    /// Returns an error if fetching fails.
1182    pub async fn fetch_option_chain_by_range(
1183        &self,
1184        client: &ibapi::Client,
1185        underlying: &Contract,
1186        expiry_min: Option<&str>,
1187        expiry_max: Option<&str>,
1188    ) -> anyhow::Result<usize> {
1189        tracing::info!(
1190            "Building option chain for {}.{} (sec_type={:?}, contract_id={}, expiry_min={:?}, expiry_max={:?}, config_min_days={:?}, config_max_days={:?})",
1191            underlying.symbol.as_str(),
1192            underlying.exchange.as_str(),
1193            underlying.security_type,
1194            underlying.contract_id,
1195            expiry_min,
1196            expiry_max,
1197            self.config.min_expiry_days,
1198            self.config.max_expiry_days,
1199        );
1200
1201        // First, get option chain metadata to determine expirations
1202        let symbol = underlying.symbol.as_str();
1203        let exchange = underlying.exchange.as_str();
1204
1205        let mut option_chain_stream = client
1206            .option_chain(
1207                symbol,
1208                exchange,
1209                underlying.security_type.clone(),
1210                underlying.contract_id,
1211            )
1212            .await
1213            .context("Failed to request option chain from IB")?;
1214
1215        let mut total_loaded = 0;
1216
1217        // Get current time for expiry day calculation
1218        let now = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1219
1220        // Collect all expirations from the metadata
1221        let mut all_expirations = Vec::new();
1222
1223        while let Some(result) = option_chain_stream.next().await {
1224            if let Ok(chain) = result {
1225                tracing::debug!(
1226                    "Received option chain metadata exchange={} trading_class={} expirations={} strikes={}",
1227                    chain.exchange,
1228                    chain.trading_class,
1229                    chain.expirations.len(),
1230                    chain.strikes.len(),
1231                );
1232
1233                for expiration in &chain.expirations {
1234                    // Filter by expiry date string if specified
1235                    let date_filter_pass = match (expiry_min, expiry_max) {
1236                        (Some(min), Some(max)) => {
1237                            expiration.as_str() >= min && expiration.as_str() <= max
1238                        }
1239                        (Some(min), None) => expiration.as_str() >= min,
1240                        (None, Some(max)) => expiration.as_str() <= max,
1241                        (None, None) => true,
1242                    };
1243
1244                    // Filter by expiry days from config if specified
1245                    let days_filter_pass = {
1246                        let expiry_ns = crate::providers::parse::expiry_timestring_to_unix_nanos(
1247                            expiration.as_str(),
1248                            None,
1249                        )
1250                        .unwrap_or(now);
1251                        let days_until_expiry = (expiry_ns.as_u64().saturating_sub(now.as_u64()))
1252                            / (24 * 60 * 60 * 1_000_000_000);
1253
1254                        let min_days_ok = self
1255                            .config
1256                            .min_expiry_days
1257                            .is_none_or(|min| days_until_expiry >= min as u64);
1258                        let max_days_ok = self
1259                            .config
1260                            .max_expiry_days
1261                            .is_none_or(|max| days_until_expiry <= max as u64);
1262
1263                        min_days_ok && max_days_ok
1264                    };
1265
1266                    if date_filter_pass && days_filter_pass && !all_expirations.contains(expiration)
1267                    {
1268                        all_expirations.push(expiration.clone());
1269                    }
1270                }
1271            }
1272        }
1273
1274        tracing::info!(
1275            "Filtered {} option expirations for {}.{}",
1276            all_expirations.len(),
1277            underlying.symbol.as_str(),
1278            underlying.exchange.as_str(),
1279        );
1280
1281        // Now fetch contract details for each expiry using contract_details
1282        for expiration in all_expirations {
1283            tracing::info!(
1284                "Requesting option contract details for {}.{} expiry {}",
1285                underlying.symbol.as_str(),
1286                underlying.exchange.as_str(),
1287                expiration,
1288            );
1289
1290            let option_contract = Contract {
1291                contract_id: 0,
1292                symbol: underlying.symbol.clone(),
1293                security_type: if underlying.security_type == SecurityType::Future {
1294                    SecurityType::FuturesOption
1295                } else {
1296                    SecurityType::Option
1297                },
1298                last_trade_date_or_contract_month: expiration.clone(),
1299                strike: 0.0,
1300                right: String::new(),
1301                multiplier: String::new(),
1302                exchange: underlying.exchange.clone(),
1303                currency: underlying.currency.clone(),
1304                local_symbol: String::new(),
1305                primary_exchange: Exchange::default(),
1306                trading_class: String::new(),
1307                include_expired: false,
1308                security_id_type: String::new(),
1309                security_id: String::new(),
1310                combo_legs_description: String::new(),
1311                combo_legs: Vec::new(),
1312                delta_neutral_contract: None,
1313                issuer_id: String::new(),
1314                description: String::new(),
1315                last_trade_date: None,
1316            };
1317
1318            match client.contract_details(&option_contract).await {
1319                Ok(details_vec) => {
1320                    tracing::info!(
1321                        "Received {} raw option contract details for {}.{} expiry {}",
1322                        details_vec.len(),
1323                        underlying.symbol.as_str(),
1324                        underlying.exchange.as_str(),
1325                        expiration,
1326                    );
1327
1328                    for details in details_vec {
1329                        // Filter by underlying contract ID
1330                        if details.under_contract_id != underlying.contract_id {
1331                            continue;
1332                        }
1333
1334                        let contract_id = details.contract.contract_id;
1335
1336                        if self.contract_id_to_instrument_id.contains_key(&contract_id) {
1337                            continue;
1338                        }
1339
1340                        let venue = self.determine_venue(&details.contract, Some(&details));
1341                        let instrument_id = match self.config.symbology_method {
1342                            crate::config::SymbologyMethod::Simplified => {
1343                                crate::common::parse::ib_contract_to_instrument_id_simplified(
1344                                    &details.contract,
1345                                    Some(venue),
1346                                )
1347                            }
1348                            crate::config::SymbologyMethod::Raw => {
1349                                crate::common::parse::ib_contract_to_instrument_id_raw(
1350                                    &details.contract,
1351                                    Some(venue),
1352                                )
1353                            }
1354                        }
1355                        .context("Failed to convert IB contract to instrument ID")?;
1356
1357                        let sec_type_str = format!("{:?}", details.contract.security_type);
1358                        if self.is_filtered_sec_type(&sec_type_str) {
1359                            continue;
1360                        }
1361
1362                        match parse_ib_contract_to_instrument(&details, instrument_id) {
1363                            Ok(parsed_instrument) => {
1364                                self.instruments.insert(instrument_id, parsed_instrument);
1365                                self.contract_details.insert(instrument_id, details.clone());
1366                                self.contracts
1367                                    .insert(instrument_id, details.contract.clone());
1368                                self.contract_id_to_instrument_id
1369                                    .insert(contract_id, instrument_id);
1370                                self.price_magnifiers
1371                                    .insert(instrument_id, details.price_magnifier);
1372                                total_loaded += 1;
1373                            }
1374                            Err(e) => {
1375                                tracing::warn!("Failed to parse option instrument: {}", e);
1376                            }
1377                        }
1378                    }
1379                }
1380                Err(e) => {
1381                    tracing::warn!(
1382                        "Failed to fetch contract details for expiration {}: {}",
1383                        expiration,
1384                        e
1385                    );
1386                }
1387            }
1388        }
1389
1390        tracing::info!(
1391            "Successfully loaded {} option instruments from chain for {}.{}",
1392            total_loaded,
1393            underlying.symbol.as_str(),
1394            underlying.exchange.as_str(),
1395        );
1396
1397        // Save cache if cache_path is configured
1398        if total_loaded > 0
1399            && let Some(ref cache_path) = self.config.cache_path
1400            && let Err(e) = self.save_cache(cache_path).await
1401        {
1402            tracing::warn!("Failed to save instrument cache to {}: {}", cache_path, e);
1403        }
1404
1405        Ok(total_loaded)
1406    }
1407
1408    /// Fetch and cache futures chain (all futures contracts for a symbol).
1409    ///
1410    /// This method fetches all futures contracts for a given underlying symbol
1411    /// and populates the cache with all individual futures instruments.
1412    ///
1413    /// # Arguments
1414    ///
1415    /// * `client` - The IB API client
1416    /// * `symbol` - The underlying symbol
1417    /// * `exchange` - The exchange (use "" for all exchanges)
1418    /// * `currency` - The currency (use USD as default)
1419    ///
1420    /// # Returns
1421    ///
1422    /// Returns the number of futures instruments loaded.
1423    ///
1424    /// # Errors
1425    ///
1426    /// Returns an error if fetching fails.
1427    pub async fn fetch_futures_chain(
1428        &self,
1429        client: &ibapi::Client,
1430        symbol: &str,
1431        exchange: &str,
1432        currency: &str,
1433        min_expiry_days: Option<u32>,
1434        max_expiry_days: Option<u32>,
1435    ) -> anyhow::Result<usize> {
1436        tracing::info!(
1437            "Building futures chain for {}.{} (currency={}, min_days={:?}, max_days={:?}, config_min_days={:?}, config_max_days={:?})",
1438            symbol,
1439            exchange,
1440            currency,
1441            min_expiry_days,
1442            max_expiry_days,
1443            self.config.min_expiry_days,
1444            self.config.max_expiry_days,
1445        );
1446
1447        // Build futures contract for lookup
1448        let futures_contract = Contract {
1449            contract_id: 0, // 0 for lookup by specification
1450            symbol: Symbol::from(symbol.to_string()),
1451            security_type: SecurityType::Future,
1452            last_trade_date_or_contract_month: String::new(),
1453            strike: 0.0,
1454            right: String::new(),
1455            multiplier: String::new(),
1456            exchange: Exchange::from(exchange.to_string()),
1457            currency: ibapi::contracts::Currency::from(currency.to_string()),
1458            local_symbol: String::new(),
1459            primary_exchange: Exchange::default(),
1460            trading_class: String::new(),
1461            include_expired: false,
1462            security_id_type: String::new(),
1463            security_id: String::new(),
1464            combo_legs_description: String::new(),
1465            combo_legs: Vec::new(),
1466            delta_neutral_contract: None,
1467            issuer_id: String::new(),
1468            description: String::new(),
1469            last_trade_date: None,
1470        };
1471
1472        // Fetch contract details for all matching futures
1473        let details_vec = client
1474            .contract_details(&futures_contract)
1475            .await
1476            .context("Failed to fetch futures chain from IB")?;
1477
1478        tracing::info!(
1479            "Received {} raw futures contract details for {}.{}",
1480            details_vec.len(),
1481            symbol,
1482            exchange,
1483        );
1484
1485        let mut total_loaded = 0;
1486        let now = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1487
1488        for details in details_vec {
1489            let contract_id = details.contract.contract_id;
1490
1491            // Check if already cached
1492            if self.contract_id_to_instrument_id.contains_key(&contract_id) {
1493                continue;
1494            }
1495
1496            // Generate instrument ID using configured symbology method
1497            let venue = self.determine_venue(&details.contract, Some(&details));
1498            let instrument_id = match self.config.symbology_method {
1499                crate::config::SymbologyMethod::Simplified => {
1500                    crate::common::parse::ib_contract_to_instrument_id_simplified(
1501                        &details.contract,
1502                        Some(venue),
1503                    )
1504                }
1505                crate::config::SymbologyMethod::Raw => {
1506                    crate::common::parse::ib_contract_to_instrument_id_raw(
1507                        &details.contract,
1508                        Some(venue),
1509                    )
1510                }
1511            }
1512            .context("Failed to convert IB contract to instrument ID")?;
1513
1514            // Check if security type is filtered
1515            let sec_type_str = format!("{:?}", details.contract.security_type);
1516            if self.is_filtered_sec_type(&sec_type_str) {
1517                continue;
1518            }
1519
1520            // Filter by expiry days for futures
1521            if !details
1522                .contract
1523                .last_trade_date_or_contract_month
1524                .is_empty()
1525                && let Ok(expiry_ns) = crate::providers::parse::expiry_timestring_to_unix_nanos(
1526                    &details.contract.last_trade_date_or_contract_month,
1527                    Some(&details),
1528                )
1529            {
1530                let days_until_expiry = (expiry_ns.as_u64().saturating_sub(now.as_u64()))
1531                    / (24 * 60 * 60 * 1_000_000_000);
1532
1533                let min_days_ok = min_expiry_days
1534                    .or(self.config.min_expiry_days)
1535                    .is_none_or(|min| days_until_expiry >= min as u64);
1536                let max_days_ok = max_expiry_days
1537                    .or(self.config.max_expiry_days)
1538                    .is_none_or(|max| days_until_expiry <= max as u64);
1539
1540                if !min_days_ok || !max_days_ok {
1541                    continue;
1542                }
1543            }
1544
1545            // Parse to Nautilus instrument
1546            match parse_ib_contract_to_instrument(&details, instrument_id) {
1547                Ok(parsed_instrument) => {
1548                    // Cache the instrument and mappings
1549                    self.instruments.insert(instrument_id, parsed_instrument);
1550                    self.contract_details.insert(instrument_id, details.clone());
1551                    self.contracts
1552                        .insert(instrument_id, details.contract.clone());
1553                    self.contract_id_to_instrument_id
1554                        .insert(contract_id, instrument_id);
1555                    self.price_magnifiers
1556                        .insert(instrument_id, details.price_magnifier);
1557                    total_loaded += 1;
1558                }
1559                Err(e) => {
1560                    tracing::warn!("Failed to parse futures instrument: {}", e);
1561                }
1562            }
1563        }
1564
1565        tracing::info!(
1566            "Successfully loaded {} futures instruments from chain",
1567            total_loaded
1568        );
1569
1570        // Save cache if cache_path is configured
1571        if total_loaded > 0
1572            && let Some(ref cache_path) = self.config.cache_path
1573            && let Err(e) = self.save_cache(cache_path).await
1574        {
1575            tracing::warn!("Failed to save instrument cache to {}: {}", cache_path, e);
1576        }
1577
1578        Ok(total_loaded)
1579    }
1580
1581    /// Fetch and cache a BAG (spread) contract.
1582    ///
1583    /// This method fetches contract details for a spread contract by requesting
1584    /// contract details with a BAG contract. The BAG contract should have its
1585    /// combo_legs populated with the individual leg contract IDs.
1586    ///
1587    /// # Arguments
1588    ///
1589    /// * `client` - The IB API client
1590    /// * `bag_contract` - The BAG contract with populated combo_legs
1591    ///
1592    /// # Returns
1593    ///
1594    /// Returns the number of spread instruments loaded (0 or 1).
1595    ///
1596    /// # Errors
1597    ///
1598    /// Returns an error if fetching fails.
1599    ///
1600    /// # Notes
1601    ///
1602    /// This method now auto-loads all leg instruments from combo_legs and creates
1603    /// a proper spread instrument, matching Python's `_load_bag_contract` behavior.
1604    pub async fn fetch_bag_contract(
1605        &self,
1606        client: &ibapi::Client,
1607        bag_contract: &Contract,
1608    ) -> anyhow::Result<usize> {
1609        // Validate BAG contract
1610        if bag_contract.security_type != SecurityType::Spread || bag_contract.combo_legs.is_empty()
1611        {
1612            anyhow::bail!(
1613                "Invalid BAG contract: must have security_type=Spread and non-empty combo_legs"
1614            );
1615        }
1616
1617        tracing::info!(
1618            "Loading BAG contract with {} legs",
1619            bag_contract.combo_legs.len()
1620        );
1621
1622        // First, load all individual leg instruments and collect their details
1623        let mut leg_contract_details = Vec::new();
1624        let mut leg_tuples = Vec::new();
1625
1626        for combo_leg in &bag_contract.combo_legs {
1627            // Create a leg contract using information from the combo leg
1628            let leg_contract = Contract {
1629                contract_id: combo_leg.contract_id,  // Use conId from combo_leg
1630                symbol: bag_contract.symbol.clone(), // Use underlying symbol from BAG
1631                security_type: SecurityType::Option, // Default to Option, will be determined from contract details
1632                last_trade_date_or_contract_month: String::new(),
1633                strike: 0.0,
1634                right: String::new(),
1635                multiplier: String::new(),
1636                exchange: Exchange::from(combo_leg.exchange.as_str()),
1637                currency: bag_contract.currency.clone(), // Use currency from BAG
1638                local_symbol: String::new(),
1639                primary_exchange: Exchange::default(),
1640                trading_class: String::new(),
1641                include_expired: false,
1642                security_id_type: String::new(),
1643                security_id: String::new(),
1644                combo_legs_description: String::new(),
1645                combo_legs: Vec::new(),
1646                delta_neutral_contract: None,
1647                issuer_id: String::new(),
1648                description: String::new(),
1649                last_trade_date: None,
1650            };
1651
1652            // Fetch contract details for this leg
1653            let leg_details_vec =
1654                client
1655                    .contract_details(&leg_contract)
1656                    .await
1657                    .with_context(|| {
1658                        format!(
1659                            "Failed to fetch contract details for leg conId {}",
1660                            combo_leg.contract_id
1661                        )
1662                    })?;
1663
1664            if leg_details_vec.is_empty() {
1665                tracing::warn!(
1666                    "No contract details returned for leg conId {}",
1667                    combo_leg.contract_id
1668                );
1669                continue;
1670            }
1671
1672            let leg_details = &leg_details_vec[0];
1673            let leg_contract_id = leg_details.contract.contract_id;
1674
1675            // Check if leg is already cached
1676            let leg_instrument_id =
1677                if let Some(cached_id) = self.contract_id_to_instrument_id.get(&leg_contract_id) {
1678                    *cached_id.value()
1679                } else {
1680                    // Load the leg instrument
1681                    let leg_venue = self.determine_venue(&leg_details.contract, Some(leg_details));
1682                    let leg_instrument_id = match self.config.symbology_method {
1683                        crate::config::SymbologyMethod::Simplified => {
1684                            crate::common::parse::ib_contract_to_instrument_id_simplified(
1685                                &leg_details.contract,
1686                                Some(leg_venue),
1687                            )
1688                        }
1689                        crate::config::SymbologyMethod::Raw => {
1690                            crate::common::parse::ib_contract_to_instrument_id_raw(
1691                                &leg_details.contract,
1692                                Some(leg_venue),
1693                            )
1694                        }
1695                    }
1696                    .context("Failed to convert leg contract to instrument ID")?;
1697
1698                    // Parse and cache the leg instrument
1699                    let leg_instrument =
1700                        parse_ib_contract_to_instrument(leg_details, leg_instrument_id)
1701                            .context("Failed to parse leg instrument")?;
1702
1703                    self.instruments.insert(leg_instrument_id, leg_instrument);
1704                    self.contract_details
1705                        .insert(leg_instrument_id, leg_details.clone());
1706                    self.contracts
1707                        .insert(leg_instrument_id, leg_details.contract.clone());
1708                    self.contract_id_to_instrument_id
1709                        .insert(leg_contract_id, leg_instrument_id);
1710                    self.price_magnifiers
1711                        .insert(leg_instrument_id, leg_details.price_magnifier);
1712
1713                    leg_instrument_id
1714                };
1715
1716            // Determine ratio (positive for BUY, negative for SELL)
1717            let ratio = if combo_leg.action == "BUY" {
1718                combo_leg.ratio
1719            } else {
1720                -combo_leg.ratio
1721            };
1722
1723            // Get the contract details for this leg (should be cached now)
1724            let leg_details_clone = self
1725                .contract_details
1726                .get(&leg_instrument_id)
1727                .map(|entry| entry.value().clone())
1728                .ok_or_else(|| {
1729                    anyhow::anyhow!(
1730                        "Contract details not found for leg {} after loading",
1731                        leg_instrument_id
1732                    )
1733                })?;
1734
1735            leg_contract_details.push((leg_details_clone, ratio));
1736            leg_tuples.push((leg_instrument_id, ratio));
1737        }
1738
1739        if leg_tuples.is_empty() {
1740            anyhow::bail!("No valid legs loaded for BAG contract");
1741        }
1742
1743        // Create spread instrument ID from leg tuples
1744        let spread_instrument_id = create_spread_instrument_id(&leg_tuples)
1745            .context("Failed to create spread instrument ID from leg tuples")?;
1746
1747        // Check if spread is already cached
1748        if self.instruments.contains_key(&spread_instrument_id) {
1749            tracing::info!("Spread instrument {} already cached", spread_instrument_id);
1750            return Ok(0);
1751        }
1752
1753        // Fetch BAG contract details (for storing the mapping)
1754        let bag_details_vec = client
1755            .contract_details(bag_contract)
1756            .await
1757            .context("Failed to fetch BAG contract details from IB")?;
1758
1759        if bag_details_vec.is_empty() {
1760            tracing::warn!("No contract details returned for BAG contract");
1761            return Ok(0);
1762        }
1763
1764        let bag_details = &bag_details_vec[0];
1765        let bag_contract_id = bag_details.contract.contract_id;
1766
1767        // Create the spread instrument
1768        let timestamp = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1769
1770        // Convert leg_contract_details to the format needed by parse_spread_instrument_id
1771        let leg_details_refs: Vec<(&ibapi::contracts::ContractDetails, i32)> =
1772            leg_contract_details.iter().map(|(d, r)| (d, *r)).collect();
1773
1774        let spread_instrument = parse_spread_instrument_any(
1775            spread_instrument_id,
1776            &leg_details_refs,
1777            Some(&bag_details.contract),
1778            Some(timestamp),
1779        )
1780        .context("Failed to parse spread instrument")?;
1781
1782        // Cache the spread instrument and mappings
1783        self.instruments
1784            .insert(spread_instrument_id, spread_instrument);
1785        self.contract_details
1786            .insert(spread_instrument_id, bag_details.clone());
1787        self.contracts
1788            .insert(spread_instrument_id, bag_details.contract.clone());
1789        self.contract_id_to_instrument_id
1790            .insert(bag_contract_id, spread_instrument_id);
1791        self.price_magnifiers
1792            .insert(spread_instrument_id, bag_details.price_magnifier);
1793
1794        tracing::info!(
1795            "Successfully loaded spread instrument {} with {} legs",
1796            spread_instrument_id,
1797            leg_tuples.len()
1798        );
1799
1800        // Save cache if cache_path is configured
1801        if let Some(ref cache_path) = self.config.cache_path
1802            && let Err(e) = self.save_cache(cache_path).await
1803        {
1804            tracing::warn!("Failed to save instrument cache to {}: {}", cache_path, e);
1805        }
1806
1807        Ok(1)
1808    }
1809
1810    /// Save the current instrument cache to disk.
1811    ///
1812    /// # Arguments
1813    ///
1814    /// * `cache_path` - Path to the cache file
1815    ///
1816    /// # Errors
1817    ///
1818    /// Returns an error if serialization or file I/O fails.
1819    pub async fn save_cache(&self, cache_path: &str) -> anyhow::Result<()> {
1820        let cache = InstrumentCache {
1821            cache_timestamp: Utc::now(),
1822            contract_id_to_instrument_id: self
1823                .contract_id_to_instrument_id
1824                .iter()
1825                .map(|entry| (*entry.key(), entry.value().to_string()))
1826                .collect(),
1827            price_magnifiers: self
1828                .price_magnifiers
1829                .iter()
1830                .map(|entry| (entry.key().to_string(), *entry.value()))
1831                .collect(),
1832            instruments: self
1833                .instruments
1834                .iter()
1835                .map(|entry| {
1836                    let instrument_id = entry.key().to_string();
1837                    let json =
1838                        serde_json::to_string(entry.value()).unwrap_or_else(|_| String::new());
1839                    (instrument_id, json)
1840                })
1841                .collect(),
1842        };
1843
1844        // Ensure parent directory exists
1845        if let Some(parent) = Path::new(cache_path).parent() {
1846            fs::create_dir_all(parent)?;
1847        }
1848
1849        // Write cache to file
1850        let json = serde_json::to_string_pretty(&cache)?;
1851        fs::write(cache_path, json)?;
1852        tracing::info!(
1853            "Saved instrument cache to {} ({} instruments)",
1854            cache_path,
1855            cache.instruments.len()
1856        );
1857        Ok(())
1858    }
1859
1860    /// Load instrument cache from disk if valid.
1861    ///
1862    /// # Arguments
1863    ///
1864    /// * `cache_path` - Path to the cache file
1865    ///
1866    /// # Returns
1867    ///
1868    /// Returns `true` if cache was loaded successfully and is valid, `false` otherwise.
1869    ///
1870    /// # Errors
1871    ///
1872    /// Returns an error if deserialization or file I/O fails (but treats missing file as non-error).
1873    pub async fn load_cache(&self, cache_path: &str) -> anyhow::Result<bool> {
1874        // Check if cache file exists
1875        if !Path::new(cache_path).exists() {
1876            tracing::debug!("Cache file does not exist: {}", cache_path);
1877            return Ok(false);
1878        }
1879
1880        // Load cache from file
1881        let json = fs::read_to_string(cache_path)?;
1882        let cache: InstrumentCache = serde_json::from_str(&json)?;
1883
1884        // Check cache validity
1885        if let Some(validity_days) = self.config.cache_validity_days {
1886            let cache_age = Utc::now() - cache.cache_timestamp;
1887            let max_age = chrono::Duration::days(validity_days as i64);
1888            if cache_age > max_age {
1889                tracing::info!(
1890                    "Cache is expired (age: {} days, max: {} days). Ignoring cache",
1891                    cache_age.num_days(),
1892                    validity_days
1893                );
1894                return Ok(false);
1895            }
1896        }
1897
1898        // Deserialize and restore instruments
1899        let mut loaded_count = 0;
1900
1901        for (instrument_id_str, instrument_json) in &cache.instruments {
1902            match InstrumentId::from_str(instrument_id_str) {
1903                Ok(instrument_id) => match serde_json::from_str::<InstrumentAny>(instrument_json) {
1904                    Ok(instrument) => {
1905                        self.instruments.insert(instrument_id, instrument);
1906
1907                        if let Ok(value) =
1908                            serde_json::from_str::<serde_json::Value>(instrument_json)
1909                            && let Some(contract_json) =
1910                                value.get("info").and_then(|info| info.get("contract"))
1911                            && let Ok(contract) =
1912                                crate::common::contracts::parse_contract_from_json(contract_json)
1913                        {
1914                            self.contracts.insert(instrument_id, contract);
1915                        }
1916                        loaded_count += 1;
1917                    }
1918                    Err(e) => {
1919                        tracing::warn!(
1920                            "Failed to deserialize instrument {}: {}",
1921                            instrument_id_str,
1922                            e
1923                        );
1924                    }
1925                },
1926                Err(e) => {
1927                    tracing::warn!("Failed to parse instrument ID {}: {}", instrument_id_str, e);
1928                }
1929            }
1930        }
1931
1932        // Restore contract ID mappings
1933        for (contract_id, instrument_id_str) in &cache.contract_id_to_instrument_id {
1934            if let Ok(instrument_id) = InstrumentId::from_str(instrument_id_str) {
1935                self.contract_id_to_instrument_id
1936                    .insert(*contract_id, instrument_id);
1937            }
1938        }
1939
1940        // Restore price magnifiers
1941        for (instrument_id_str, magnifier) in &cache.price_magnifiers {
1942            if let Ok(instrument_id) = InstrumentId::from_str(instrument_id_str) {
1943                self.price_magnifiers.insert(instrument_id, *magnifier);
1944            }
1945        }
1946
1947        tracing::info!(
1948            "Loaded instrument cache from {} ({} instruments, created at {})",
1949            cache_path,
1950            loaded_count,
1951            cache.cache_timestamp
1952        );
1953        Ok(true)
1954    }
1955}
1956
1957#[cfg(test)]
1958mod tests {
1959    use std::fs;
1960
1961    use nautilus_core::UnixNanos;
1962    use nautilus_model::{
1963        identifiers::{Symbol, Venue},
1964        instruments::CurrencyPair,
1965        types::{Price, Quantity, currency::Currency},
1966    };
1967    use tempfile::TempDir;
1968
1969    use super::*;
1970
1971    fn create_test_provider_with_cache() -> (InteractiveBrokersInstrumentProvider, TempDir) {
1972        let temp_dir = TempDir::new().unwrap();
1973        let cache_path = temp_dir
1974            .path()
1975            .join("test_cache.json")
1976            .to_str()
1977            .unwrap()
1978            .to_string();
1979
1980        let config = InteractiveBrokersInstrumentProviderConfig::builder()
1981            .cache_path(cache_path)
1982            .cache_validity_days(7u32)
1983            .build();
1984
1985        let provider = InteractiveBrokersInstrumentProvider::new(config);
1986        (provider, temp_dir)
1987    }
1988
1989    fn create_test_instrument(instrument_id: InstrumentId) -> InstrumentAny {
1990        CurrencyPair::new(
1991            instrument_id,
1992            Symbol::from("EUR/USD"),
1993            Currency::from("EUR"),
1994            Currency::from("USD"),
1995            4,
1996            0,
1997            Price::from("0.0001"),
1998            Quantity::from(1),
1999            None,
2000            None,
2001            None,
2002            None,
2003            None,
2004            None,
2005            None,
2006            None,
2007            None,
2008            None,
2009            None,
2010            None,
2011            None,
2012            UnixNanos::default(),
2013            UnixNanos::default(),
2014        )
2015        .into()
2016    }
2017
2018    #[tokio::test]
2019    async fn test_save_cache() {
2020        let (provider, _temp_dir) = create_test_provider_with_cache();
2021        let cache_path = provider.config.cache_path.as_ref().unwrap().clone();
2022
2023        // Add some test instruments
2024        let instrument_id1 = InstrumentId::new(Symbol::from("EUR/USD"), Venue::from("IDEALPRO"));
2025        let instrument_id2 = InstrumentId::new(Symbol::from("GBP/USD"), Venue::from("IDEALPRO"));
2026
2027        let instrument1 = create_test_instrument(instrument_id1);
2028        let instrument2 = create_test_instrument(instrument_id2);
2029
2030        provider.instruments.insert(instrument_id1, instrument1);
2031        provider.instruments.insert(instrument_id2, instrument2);
2032        provider
2033            .contract_id_to_instrument_id
2034            .insert(100, instrument_id1);
2035        provider
2036            .contract_id_to_instrument_id
2037            .insert(200, instrument_id2);
2038        provider.price_magnifiers.insert(instrument_id1, 1);
2039        provider.price_magnifiers.insert(instrument_id2, 1);
2040
2041        // Save cache
2042        let result = provider.save_cache(&cache_path).await;
2043        assert!(result.is_ok(), "save_cache should succeed");
2044
2045        // Verify file exists
2046        assert!(Path::new(&cache_path).exists(), "Cache file should exist");
2047
2048        // Verify file contains JSON
2049        let contents = fs::read_to_string(&cache_path).unwrap();
2050        assert!(
2051            contents.contains("EUR/USD"),
2052            "Cache should contain instrument data"
2053        );
2054        assert!(
2055            contents.contains("cache_timestamp"),
2056            "Cache should contain timestamp"
2057        );
2058    }
2059
2060    #[tokio::test]
2061    async fn test_load_cache_valid() {
2062        let (provider, _temp_dir) = create_test_provider_with_cache();
2063        let cache_path = provider.config.cache_path.as_ref().unwrap().clone();
2064
2065        // First save a cache
2066        let instrument_id = InstrumentId::new(Symbol::from("EUR/USD"), Venue::from("IDEALPRO"));
2067        let instrument = create_test_instrument(instrument_id);
2068
2069        provider
2070            .instruments
2071            .insert(instrument_id, instrument.clone());
2072        provider
2073            .contract_id_to_instrument_id
2074            .insert(100, instrument_id);
2075        provider.price_magnifiers.insert(instrument_id, 1);
2076
2077        provider.save_cache(&cache_path).await.unwrap();
2078
2079        // Create a new provider and load the cache
2080        let new_config = InteractiveBrokersInstrumentProviderConfig::builder()
2081            .cache_path(cache_path.clone())
2082            .cache_validity_days(7u32)
2083            .build();
2084
2085        let new_provider = InteractiveBrokersInstrumentProvider::new(new_config);
2086
2087        let result = new_provider.load_cache(&cache_path).await;
2088        assert!(result.is_ok(), "load_cache should succeed");
2089        assert!(
2090            result.unwrap(),
2091            "load_cache should return true for valid cache"
2092        );
2093
2094        // Verify instrument was loaded
2095        assert!(
2096            new_provider.find(&instrument_id).is_some(),
2097            "Instrument should be loaded from cache"
2098        );
2099        assert_eq!(new_provider.count(), 1, "Provider should have 1 instrument");
2100    }
2101
2102    #[tokio::test]
2103    async fn test_load_cache_missing_file() {
2104        let (provider, _temp_dir) = create_test_provider_with_cache();
2105        let cache_path = "/nonexistent/path/cache.json";
2106
2107        let result = provider.load_cache(cache_path).await;
2108        assert!(
2109            result.is_ok(),
2110            "load_cache should not error on missing file"
2111        );
2112        assert!(
2113            !result.unwrap(),
2114            "load_cache should return false for missing file"
2115        );
2116    }
2117
2118    #[tokio::test]
2119    async fn test_load_cache_expired() {
2120        let (provider, _temp_dir) = create_test_provider_with_cache();
2121        let cache_path = provider.config.cache_path.as_ref().unwrap().clone();
2122
2123        // Create an expired cache manually
2124        let old_timestamp = Utc::now() - chrono::Duration::days(10);
2125        let expired_cache = InstrumentCache {
2126            cache_timestamp: old_timestamp,
2127            contract_id_to_instrument_id: vec![],
2128            price_magnifiers: vec![],
2129            instruments: vec![],
2130        };
2131
2132        let json = serde_json::to_string_pretty(&expired_cache).unwrap();
2133        fs::write(&cache_path, json).unwrap();
2134
2135        // Try to load with validity_days = 7
2136        let result = provider.load_cache(&cache_path).await;
2137        assert!(
2138            result.is_ok(),
2139            "load_cache should not error on expired cache"
2140        );
2141        assert!(
2142            !result.unwrap(),
2143            "load_cache should return false for expired cache"
2144        );
2145    }
2146}