Skip to main content

nautilus_interactive_brokers/historical/
client.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Historical data client for Interactive Brokers.
17
18use std::{fmt::Debug, str::FromStr, sync::Arc};
19
20use anyhow::Context;
21use chrono::{DateTime, Utc};
22use ibapi::{
23    client::Client,
24    contracts::Contract,
25    market_data::{TradingHours, historical},
26};
27use nautilus_core::UnixNanos;
28use nautilus_model::{
29    data::{Bar, BarSpecification, BarType, Data, QuoteTick, TradeTick},
30    enums::{AggregationSource, AggressorSide, BarAggregation, PriceType},
31    identifiers::InstrumentId,
32    instruments::{Instrument, any::InstrumentAny},
33    types::{Price, Quantity},
34};
35
36use crate::{
37    data::convert::{
38        bar_type_to_ib_bar_size, chrono_to_ib_datetime, ib_bar_to_nautilus_bar,
39        ib_timestamp_to_unix_nanos, price_type_to_ib_what_to_show,
40    },
41    providers::instruments::InteractiveBrokersInstrumentProvider,
42};
43
44/// Historical data client for Interactive Brokers.
45///
46/// This client provides methods for requesting historical bars and ticks
47/// for backtesting and research purposes.
48#[cfg_attr(
49    feature = "python",
50    pyo3::pyclass(
51        module = "nautilus_trader.core.nautilus_pyo3.interactive_brokers",
52        subclass,
53        from_py_object
54    )
55)]
56pub struct HistoricalInteractiveBrokersClient {
57    /// IB API client.
58    ib_client: Arc<Client>,
59    /// Instrument provider.
60    instrument_provider: Arc<InteractiveBrokersInstrumentProvider>,
61}
62
63impl Clone for HistoricalInteractiveBrokersClient {
64    fn clone(&self) -> Self {
65        Self {
66            ib_client: Arc::clone(&self.ib_client),
67            instrument_provider: Arc::clone(&self.instrument_provider),
68        }
69    }
70}
71
72impl Debug for HistoricalInteractiveBrokersClient {
73    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74        f.debug_struct(stringify!(HistoricalInteractiveBrokersClient))
75            .field("ib_client", &"<Client>")
76            .field("instrument_provider", &"<InstrumentProvider>")
77            .finish()
78    }
79}
80
81impl HistoricalInteractiveBrokersClient {
82    /// Create a new historical data client.
83    ///
84    /// # Arguments
85    ///
86    /// * `ib_client` - The IB API client
87    /// * `instrument_provider` - The instrument provider
88    pub fn new(
89        ib_client: Arc<Client>,
90        instrument_provider: Arc<InteractiveBrokersInstrumentProvider>,
91    ) -> Self {
92        Self {
93            ib_client,
94            instrument_provider,
95        }
96    }
97
98    /// Request historical bars.
99    ///
100    /// # Arguments
101    ///
102    /// * `bar_specifications` - List of bar specifications (e.g., "1-HOUR-LAST")
103    /// * `end_date_time` - End date for bars
104    /// * `start_date_time` - Optional start date
105    /// * `duration` - Optional duration string (e.g., "1 D")
106    /// * `contracts` - List of IB contracts
107    /// * `instrument_ids` - List of instrument IDs
108    /// * `use_rth` - Use regular trading hours only
109    /// * `timeout` - Request timeout in seconds
110    ///
111    /// # Errors
112    ///
113    /// Returns an error if the request fails.
114    #[allow(clippy::too_many_arguments)]
115    pub async fn request_bars(
116        &self,
117        bar_specifications: Vec<&str>,
118        end_date_time: DateTime<Utc>,
119        start_date_time: Option<DateTime<Utc>>,
120        duration: Option<&str>,
121        contracts: Option<Vec<Contract>>,
122        instrument_ids: Option<Vec<InstrumentId>>,
123        use_rth: bool,
124        timeout: u64,
125    ) -> anyhow::Result<Vec<Bar>> {
126        // Validate inputs
127        if start_date_time.is_some() && duration.is_some() {
128            anyhow::bail!("Either start_date_time or duration should be provided, not both");
129        }
130
131        if let Some(start) = start_date_time
132            && start >= end_date_time
133        {
134            anyhow::bail!("Start date must be before end date");
135        }
136
137        let contracts = contracts.unwrap_or_default();
138        let instrument_ids = instrument_ids.unwrap_or_default();
139
140        if contracts.is_empty() && instrument_ids.is_empty() {
141            anyhow::bail!("Either contracts or instrument_ids must be provided");
142        }
143
144        // Convert instrument IDs to contracts using instrument provider
145        let mut all_contracts = contracts;
146
147        for instrument_id in instrument_ids {
148            // Try to find instrument in provider first
149            if self.instrument_provider.find(&instrument_id).is_none() {
150                // Auto-fetch if not cached
151                if let Err(e) = self
152                    .instrument_provider
153                    .fetch_contract_details(&self.ib_client, instrument_id, false, None)
154                    .await
155                {
156                    tracing::warn!(
157                        "Failed to auto-fetch contract details for {}: {}",
158                        instrument_id,
159                        e
160                    );
161                }
162            }
163
164            // Try to convert instrument ID to contract
165            if let Ok(contract) = self
166                .instrument_provider
167                .resolve_contract_for_instrument(instrument_id)
168            {
169                all_contracts.push(contract);
170            } else {
171                tracing::warn!(
172                    "Failed to convert instrument_id {} to IB contract, skipping",
173                    instrument_id
174                );
175            }
176        }
177
178        // Auto-fetch contracts if not cached (by contract ID)
179        for contract in &all_contracts {
180            if let Some(instrument_id) = self
181                .instrument_provider
182                .get_instrument_id_by_contract_id(contract.contract_id)
183                && self.instrument_provider.find(&instrument_id).is_none()
184                && let Err(e) = self
185                    .instrument_provider
186                    .fetch_contract_details(&self.ib_client, instrument_id, false, None)
187                    .await
188            {
189                tracing::warn!(
190                    "Failed to auto-fetch contract details for contract ID {}: {}",
191                    contract.contract_id,
192                    e
193                );
194            }
195        }
196
197        if all_contracts.is_empty() {
198            anyhow::bail!("No valid contracts found after conversion");
199        }
200
201        let trading_hours = if use_rth {
202            TradingHours::Regular
203        } else {
204            TradingHours::Extended
205        };
206
207        let mut all_bars = Vec::new();
208
209        for contract in all_contracts {
210            for bar_spec_str in &bar_specifications {
211                // Parse bar spec (e.g., "1-HOUR-LAST")
212                let parts: Vec<&str> = bar_spec_str.split('-').collect();
213                if parts.len() != 3 {
214                    anyhow::bail!("Invalid bar specification format: {}", bar_spec_str);
215                }
216
217                let step = parts[0].parse::<usize>()?;
218                let aggregation = parts[1].to_lowercase();
219                let price_type = parts[2].to_uppercase();
220
221                let bar_spec = match aggregation.as_str() {
222                    "second" => BarSpecification::new(
223                        step,
224                        BarAggregation::Second,
225                        PriceType::from_str(&price_type).unwrap_or(PriceType::Last),
226                    ),
227                    "minute" => BarSpecification::new(
228                        step,
229                        BarAggregation::Minute,
230                        PriceType::from_str(&price_type).unwrap_or(PriceType::Last),
231                    ),
232                    "hour" => BarSpecification::new(
233                        step,
234                        BarAggregation::Hour,
235                        PriceType::from_str(&price_type).unwrap_or(PriceType::Last),
236                    ),
237                    "day" => BarSpecification::new(
238                        step,
239                        BarAggregation::Day,
240                        PriceType::from_str(&price_type).unwrap_or(PriceType::Last),
241                    ),
242                    "week" => BarSpecification::new(
243                        step,
244                        BarAggregation::Week,
245                        PriceType::from_str(&price_type).unwrap_or(PriceType::Last),
246                    ),
247                    _ => anyhow::bail!("Unsupported aggregation: {}", aggregation),
248                };
249
250                let instrument_id = self.resolve_instrument_id(&contract).await?;
251                let bar_type_with_id =
252                    BarType::new(instrument_id, bar_spec, AggregationSource::External);
253
254                // Convert bar type to IB parameters
255                let ib_bar_size = bar_type_to_ib_bar_size(&bar_type_with_id)?;
256                let ib_what_to_show = price_type_to_ib_what_to_show(bar_spec.price_type);
257
258                // Calculate duration segments
259                let segments =
260                    self.calculate_duration_segments(start_date_time, end_date_time, duration);
261
262                for (segment_end, segment_duration) in segments {
263                    tracing::info!(
264                        "Requesting historical bars ending on {} with duration {}",
265                        segment_end,
266                        segment_duration
267                    );
268
269                    let historical_data = tokio::time::timeout(
270                        std::time::Duration::from_secs(timeout),
271                        self.ib_client.historical_data(
272                            &contract,
273                            Some(chrono_to_ib_datetime(&segment_end)),
274                            segment_duration,
275                            ib_bar_size,
276                            Some(ib_what_to_show),
277                            trading_hours,
278                        ),
279                    )
280                    .await
281                    .context(format!(
282                        "Historical data request timed out after {} seconds",
283                        timeout
284                    ))??;
285
286                    // Get precision from instrument if available
287                    let (price_precision, size_precision) =
288                        if let Some(instrument) = self.instrument_provider.find(&instrument_id) {
289                            (instrument.price_precision(), instrument.size_precision())
290                        } else {
291                            (5, 0) // Default fallback
292                        };
293
294                    // Create new bar_type with correct instrument_id
295                    for ib_bar in &historical_data.bars {
296                        let nautilus_bar = ib_bar_to_nautilus_bar(
297                            ib_bar,
298                            bar_type_with_id,
299                            price_precision,
300                            size_precision,
301                        )?;
302                        all_bars.push(nautilus_bar);
303                    }
304
305                    tracing::info!("Retrieved {} bars in batch", historical_data.bars.len());
306                }
307            }
308        }
309
310        // Sort by timestamp
311        all_bars.sort_by_key(|b| b.ts_event);
312
313        Ok(all_bars)
314    }
315
316    /// Request historical ticks with pagination support.
317    ///
318    /// # Arguments
319    ///
320    /// * `tick_type` - "TRADES" or "BID_ASK"
321    /// * `start_date_time` - Start date
322    /// * `end_date_time` - End date
323    /// * `contracts` - List of IB contracts
324    /// * `instrument_ids` - List of instrument IDs
325    /// * `use_rth` - Use regular trading hours only
326    /// * `timeout` - Request timeout in seconds
327    ///
328    /// # Errors
329    ///
330    /// Returns an error if the request fails.
331    #[allow(clippy::too_many_arguments)]
332    pub async fn request_ticks(
333        &self,
334        tick_type: &str,
335        start_date_time: DateTime<Utc>,
336        end_date_time: DateTime<Utc>,
337        contracts: Option<Vec<Contract>>,
338        instrument_ids: Option<Vec<InstrumentId>>,
339        use_rth: bool,
340        _timeout: u64,
341    ) -> anyhow::Result<Vec<Data>> {
342        if tick_type != "TRADES" && tick_type != "BID_ASK" {
343            anyhow::bail!("tick_type must be 'TRADES' or 'BID_ASK'");
344        }
345
346        if start_date_time >= end_date_time {
347            anyhow::bail!("Start date must be before end date");
348        }
349
350        let contracts = contracts.unwrap_or_default();
351        let instrument_ids = instrument_ids.unwrap_or_default();
352
353        if contracts.is_empty() && instrument_ids.is_empty() {
354            anyhow::bail!("Either contracts or instrument_ids must be provided");
355        }
356
357        let trading_hours = if use_rth {
358            TradingHours::Regular
359        } else {
360            TradingHours::Extended
361        };
362
363        // Convert instrument IDs to contracts and auto-fetch if not cached
364        let mut all_contracts = contracts;
365
366        for instrument_id in instrument_ids {
367            // Auto-fetch if not cached
368            if self.instrument_provider.find(&instrument_id).is_none()
369                && let Err(e) = self
370                    .instrument_provider
371                    .fetch_contract_details(&self.ib_client, instrument_id, false, None)
372                    .await
373            {
374                tracing::warn!(
375                    "Failed to auto-fetch contract details for {}: {}",
376                    instrument_id,
377                    e
378                );
379            }
380
381            if let Ok(contract) = self
382                .instrument_provider
383                .resolve_contract_for_instrument(instrument_id)
384            {
385                all_contracts.push(contract);
386            } else {
387                tracing::warn!(
388                    "Failed to convert instrument_id {} to IB contract, skipping",
389                    instrument_id
390                );
391            }
392        }
393
394        // Auto-fetch contracts if not cached
395        for contract in &all_contracts {
396            if let Some(instrument_id) = self
397                .instrument_provider
398                .get_instrument_id_by_contract_id(contract.contract_id)
399                && self.instrument_provider.find(&instrument_id).is_none()
400                && let Err(e) = self
401                    .instrument_provider
402                    .fetch_contract_details(&self.ib_client, instrument_id, false, None)
403                    .await
404            {
405                tracing::warn!(
406                    "Failed to auto-fetch contract details for contract ID {}: {}",
407                    contract.contract_id,
408                    e
409                );
410            }
411        }
412
413        if all_contracts.is_empty() {
414            anyhow::bail!("No valid contracts found after conversion");
415        }
416
417        let mut all_ticks = Vec::new();
418
419        for contract in all_contracts {
420            let instrument_id = self.resolve_instrument_id(&contract).await?;
421
422            // Get precision from instrument if available
423            let (price_precision, size_precision) =
424                if let Some(instrument) = self.instrument_provider.find(&instrument_id) {
425                    (instrument.price_precision(), instrument.size_precision())
426                } else {
427                    (5, 0) // Default fallback
428                };
429
430            // Pagination loop for ticks (similar to Python _handle_timestamp_iteration)
431            let mut current_end_date = end_date_time;
432            let current_start_date = start_date_time;
433            let end_date_time_ns = UnixNanos::from(
434                end_date_time
435                    .timestamp_nanos_opt()
436                    .unwrap_or_else(|| end_date_time.timestamp() * 1_000_000_000)
437                    as u64,
438            );
439
440            match tick_type {
441                "TRADES" => {
442                    loop {
443                        // Make request for this batch
444                        let mut subscription = self
445                            .ib_client
446                            .historical_ticks_trade(
447                                &contract,
448                                Some(chrono_to_ib_datetime(&current_start_date)),
449                                Some(chrono_to_ib_datetime(&current_end_date)),
450                                1000, // Number of ticks per request
451                                trading_hours,
452                            )
453                            .await?;
454
455                        let mut batch_ticks = Vec::new();
456
457                        while let Some(tick) = subscription.next().await {
458                            let ts_event = ib_timestamp_to_unix_nanos(&tick.timestamp);
459
460                            // Filter out ticks after end_date_time
461                            if ts_event > end_date_time_ns {
462                                continue;
463                            }
464
465                            let ts_init = ts_event;
466
467                            let price = Price::new(tick.price, price_precision);
468                            let size = Quantity::new(tick.size as f64, size_precision);
469
470                            let trade_tick = TradeTick::new(
471                                instrument_id,
472                                price,
473                                size,
474                                AggressorSide::NoAggressor,
475                                crate::common::parse::generate_ib_trade_id(
476                                    ts_event,
477                                    tick.price,
478                                    tick.size as f64,
479                                ),
480                                ts_event,
481                                ts_init,
482                            );
483
484                            batch_ticks.push(Data::Trade(trade_tick));
485                        }
486
487                        if batch_ticks.is_empty() {
488                            break;
489                        }
490
491                        // Update current_end_date to the minimum ts_event from this batch for next iteration
492                        // This works backwards in time
493                        if let Some(min_tick) = batch_ticks.iter().min_by_key(|t| match t {
494                            Data::Trade(t) => t.ts_event,
495                            _ => UnixNanos::default(),
496                        }) {
497                            let min_ts_nanos = match min_tick {
498                                Data::Trade(t) => t.ts_event.as_u64(),
499                                _ => break,
500                            };
501
502                            if let Some(new_end) = retreat_end_datetime(min_ts_nanos) {
503                                current_end_date = new_end;
504                            } else {
505                                break;
506                            }
507                        }
508
509                        all_ticks.extend(batch_ticks);
510
511                        // Check if we should continue - need current_end > current_start
512                        if !should_continue_backward_pagination(
513                            current_end_date,
514                            current_start_date,
515                        ) {
516                            break;
517                        }
518
519                        // Filter out ticks after end_date_time if needed
520                        all_ticks.retain(|t| match t {
521                            Data::Trade(t) => t.ts_event <= end_date_time_ns,
522                            Data::Quote(q) => q.ts_event <= end_date_time_ns,
523                            _ => true,
524                        });
525                    }
526                }
527                "BID_ASK" => {
528                    loop {
529                        // Make request for this batch
530                        let mut subscription = self
531                            .ib_client
532                            .historical_ticks_bid_ask(
533                                &contract,
534                                Some(chrono_to_ib_datetime(&current_start_date)),
535                                Some(chrono_to_ib_datetime(&current_end_date)),
536                                1000,
537                                trading_hours,
538                                false, // ignore_size
539                            )
540                            .await?;
541
542                        let mut batch_ticks = Vec::new();
543
544                        while let Some(tick) = subscription.next().await {
545                            let ts_event = ib_timestamp_to_unix_nanos(&tick.timestamp);
546
547                            // Filter out ticks after end_date_time
548                            if ts_event > end_date_time_ns {
549                                continue;
550                            }
551
552                            let ts_init = ts_event;
553
554                            let bid_price = Price::new(tick.price_bid, price_precision);
555                            let bid_size = Quantity::new(tick.size_bid as f64, size_precision);
556                            let ask_price = Price::new(tick.price_ask, price_precision);
557                            let ask_size = Quantity::new(tick.size_ask as f64, size_precision);
558
559                            let quote_tick = QuoteTick::new(
560                                instrument_id,
561                                bid_price,
562                                ask_price,
563                                bid_size,
564                                ask_size,
565                                ts_event,
566                                ts_init,
567                            );
568
569                            batch_ticks.push(Data::Quote(quote_tick));
570                        }
571
572                        if batch_ticks.is_empty() {
573                            break;
574                        }
575
576                        // Update current_end_date to the minimum ts_event from this batch for next iteration
577                        if let Some(min_tick) = batch_ticks.iter().min_by_key(|t| match t {
578                            Data::Quote(q) => q.ts_event,
579                            _ => UnixNanos::default(),
580                        }) {
581                            let min_ts_nanos = match min_tick {
582                                Data::Quote(q) => q.ts_event.as_u64(),
583                                _ => break,
584                            };
585
586                            if let Some(new_end) = retreat_end_datetime(min_ts_nanos) {
587                                current_end_date = new_end;
588                            } else {
589                                break;
590                            }
591                        }
592
593                        all_ticks.extend(batch_ticks);
594
595                        // Check if we should continue
596                        if !should_continue_backward_pagination(
597                            current_end_date,
598                            current_start_date,
599                        ) {
600                            break;
601                        }
602
603                        // Filter out ticks after end_date_time if needed
604                        all_ticks.retain(|t| match t {
605                            Data::Trade(t) => t.ts_event <= end_date_time_ns,
606                            Data::Quote(q) => q.ts_event <= end_date_time_ns,
607                            _ => true,
608                        });
609                    }
610                }
611                _ => unreachable!(),
612            }
613        }
614
615        // Sort by timestamp
616        all_ticks.sort_by_key(|tick| match tick {
617            Data::Trade(t) => t.ts_event,
618            Data::Quote(q) => q.ts_event,
619            _ => UnixNanos::default(),
620        });
621
622        Ok(all_ticks)
623    }
624
625    /// Request instruments given instrument IDs or contracts.
626    ///
627    /// This method uses the instrument provider to load and return instruments.
628    ///
629    /// # Arguments
630    ///
631    /// * `instrument_ids` - Optional list of instrument IDs
632    /// * `contracts` - Optional list of IB contracts
633    ///
634    /// # Returns
635    ///
636    /// Returns a list of instruments.
637    ///
638    /// # Errors
639    ///
640    /// Returns an error if loading fails.
641    pub async fn request_instruments(
642        &self,
643        instrument_ids: Option<Vec<InstrumentId>>,
644        contracts: Option<Vec<Contract>>,
645    ) -> anyhow::Result<Vec<InstrumentAny>> {
646        let instrument_ids = instrument_ids.unwrap_or_default();
647        let contracts = contracts.unwrap_or_default();
648
649        if instrument_ids.is_empty() && contracts.is_empty() {
650            anyhow::bail!("Either instrument_ids or contracts must be provided");
651        }
652
653        let mut loaded_instruments = Vec::new();
654
655        // Load instruments from instrument IDs
656        for instrument_id in instrument_ids {
657            // Try fetching from provider if not already loaded
658            if self.instrument_provider.find(&instrument_id).is_none()
659                && let Err(e) = self
660                    .instrument_provider
661                    .fetch_contract_details(&self.ib_client, instrument_id, false, None)
662                    .await
663            {
664                tracing::warn!(
665                    "Failed to fetch contract details for {}: {}",
666                    instrument_id,
667                    e
668                );
669                continue;
670            }
671
672            if let Some(instrument) = self.instrument_provider.find(&instrument_id) {
673                loaded_instruments.push(instrument);
674            }
675        }
676
677        // Load instruments from contracts (equivalent to Python's _fetch_instruments_if_not_cached)
678        for contract in contracts {
679            // Try to find instrument by contract ID first
680            let instrument_id = if let Some(cached_id) = self
681                .instrument_provider
682                .get_instrument_id_by_contract_id(contract.contract_id)
683            {
684                Some(cached_id)
685            } else {
686                // Convert contract to instrument ID using provider's venue determination
687                // This matches Python's logic: venue = instrument_provider.determine_venue_from_contract(contract)
688                let venue = self.instrument_provider.determine_venue(&contract, None);
689                match self.instrument_provider.symbology_method() {
690                    crate::config::SymbologyMethod::Simplified => {
691                        crate::common::parse::ib_contract_to_instrument_id_simplified(
692                            &contract,
693                            Some(venue),
694                        )
695                        .ok()
696                    }
697                    crate::config::SymbologyMethod::Raw => {
698                        crate::common::parse::ib_contract_to_instrument_id_raw(
699                            &contract,
700                            Some(venue),
701                        )
702                        .ok()
703                    }
704                }
705            };
706
707            if let Some(instrument_id) = instrument_id {
708                // Check if already loaded (skip if already in results)
709                if loaded_instruments.iter().any(|i| i.id() == instrument_id) {
710                    continue;
711                }
712
713                // Fetch if not cached (matching Python: if not self._client._cache.instrument(instrument_id))
714                if self.instrument_provider.find(&instrument_id).is_none() {
715                    tracing::info!("Fetching Instrument for: {}", instrument_id);
716
717                    if let Err(e) = self
718                        .instrument_provider
719                        .fetch_contract_details(&self.ib_client, instrument_id, false, None)
720                        .await
721                    {
722                        tracing::warn!(
723                            "Failed to fetch contract details for {}: {}",
724                            instrument_id,
725                            e
726                        );
727                        continue;
728                    }
729                }
730
731                if let Some(instrument) = self.instrument_provider.find(&instrument_id) {
732                    loaded_instruments.push(instrument);
733                }
734            } else {
735                // Fallback: try using get_instrument which handles BAG contracts
736                if let Ok(Some(instrument)) = self
737                    .instrument_provider
738                    .get_instrument(&self.ib_client, &contract)
739                    .await
740                {
741                    if !loaded_instruments.iter().any(|i| i.id() == instrument.id()) {
742                        loaded_instruments.push(instrument);
743                    }
744                }
745            }
746        }
747
748        tracing::info!("Loaded {} instruments", loaded_instruments.len());
749
750        Ok(loaded_instruments)
751    }
752
753    /// Calculate duration segments for a time range.
754    ///
755    /// This breaks down large date ranges into smaller segments that IB can handle.
756    ///
757    /// # Arguments
758    ///
759    /// * `start_date` - Optional start date
760    /// * `end_date` - End date
761    /// * `duration` - Optional duration string
762    ///
763    /// # Returns
764    ///
765    /// Returns a list of (end_date, duration) tuples.
766    fn calculate_duration_segments(
767        &self,
768        start_date: Option<DateTime<Utc>>,
769        end_date: DateTime<Utc>,
770        duration: Option<&str>,
771    ) -> Vec<(DateTime<Utc>, historical::Duration)> {
772        // If duration is specified, use it directly
773        if let Some(dur_str) = duration {
774            if let Ok(dur) = dur_str.parse::<historical::Duration>() {
775                return vec![(end_date, dur)];
776            } else {
777                tracing::warn!("Invalid duration format: {}, using default", dur_str);
778            }
779        }
780
781        // Calculate from start/end dates - matching Python's comprehensive breakdown
782        if let Some(start) = start_date {
783            let total_delta = end_date.signed_duration_since(start);
784            let total_days = total_delta.num_days();
785
786            let mut segments = Vec::new();
787
788            // Calculate full years in the time delta (matching Python: years = total_delta.days // 365)
789            let years = total_days / 365;
790            let minus_years_date = if years > 0 {
791                end_date - chrono::Duration::days(365 * years)
792            } else {
793                end_date
794            };
795
796            // Calculate remaining days after subtracting full years (matching Python logic)
797            let days = if years > 0 {
798                let remaining_delta = minus_years_date.signed_duration_since(start);
799                remaining_delta.num_days()
800            } else {
801                total_days
802            };
803
804            let minus_days_date = if days > 0 {
805                minus_years_date - chrono::Duration::days(days)
806            } else {
807                minus_years_date
808            };
809
810            // Calculate remaining time in seconds after subtracting years and days
811            // Matching Python: hours*3600 + minutes*60 + seconds + subsecond
812            let remaining_delta = minus_days_date.signed_duration_since(start);
813            // Extract time components from the remaining delta
814            let total_secs = remaining_delta.num_seconds();
815            let hours = total_secs / 3600;
816            let minutes = (total_secs % 3600) / 60;
817            let secs = total_secs % 60;
818            // Check for subsecond precision (milliseconds, microseconds, nanoseconds)
819            let subsecond = if remaining_delta.num_milliseconds() % 1000 > 0
820                || remaining_delta.num_microseconds().unwrap_or(0) % 1000 > 0
821                || remaining_delta.num_nanoseconds().unwrap_or(0) % 1000 > 0
822            {
823                1
824            } else {
825                0
826            };
827            let seconds = hours * 3600 + minutes * 60 + secs + subsecond;
828
829            // Build segments in order: years, days, seconds (matching Python order)
830            if years > 0 {
831                segments.push((end_date, historical::Duration::years(years as i32)));
832            }
833
834            if days > 0 {
835                segments.push((minus_years_date, historical::Duration::days(days as i32)));
836            }
837
838            if seconds > 0 {
839                segments.push((
840                    minus_days_date,
841                    historical::Duration::seconds(seconds as i32),
842                ));
843            }
844
845            if segments.is_empty() {
846                // Default to 1 day if calculation results in nothing
847                segments.push((end_date, historical::Duration::days(1)));
848            }
849
850            segments
851        } else {
852            // Default to 1 day if no start date
853            vec![(end_date, historical::Duration::days(1))]
854        }
855    }
856
857    async fn resolve_instrument_id(&self, contract: &Contract) -> anyhow::Result<InstrumentId> {
858        if let Some(instrument_id) = self
859            .instrument_provider
860            .get_instrument_id_by_contract_id(contract.contract_id)
861        {
862            return Ok(instrument_id);
863        }
864
865        let venue = self.instrument_provider.determine_venue(contract, None);
866        let parsed = match self.instrument_provider.symbology_method() {
867            crate::config::SymbologyMethod::Simplified => {
868                crate::common::parse::ib_contract_to_instrument_id_simplified(contract, Some(venue))
869                    .ok()
870            }
871            crate::config::SymbologyMethod::Raw => {
872                crate::common::parse::ib_contract_to_instrument_id_raw(contract, Some(venue)).ok()
873            }
874        };
875
876        if let Some(instrument_id) = parsed {
877            return Ok(instrument_id);
878        }
879
880        if let Ok(Some(instrument)) = self
881            .instrument_provider
882            .get_instrument(&self.ib_client, contract)
883            .await
884        {
885            return Ok(instrument.id());
886        }
887
888        anyhow::bail!(
889            "Failed to resolve instrument ID for contract {}:{}:{}",
890            contract.symbol,
891            contract.security_type,
892            contract.exchange
893        );
894    }
895}
896
897fn retreat_end_datetime(min_ts_nanos: u64) -> Option<DateTime<Utc>> {
898    let new_end_nanos = min_ts_nanos.saturating_sub(1_000_000); // 1ms
899    let seconds = (new_end_nanos / 1_000_000_000) as i64;
900    let nanos = (new_end_nanos % 1_000_000_000) as u32;
901    chrono::DateTime::from_timestamp(seconds, nanos)
902}
903
904fn should_continue_backward_pagination(
905    current_end_date: DateTime<Utc>,
906    current_start_date: DateTime<Utc>,
907) -> bool {
908    current_end_date > current_start_date
909}
910
#[cfg(test)]
mod tests {
    use chrono::{TimeZone, Utc};
    use rstest::rstest;

    use super::{retreat_end_datetime, should_continue_backward_pagination};

    #[rstest]
    fn test_retreat_end_datetime_subtracts_one_millisecond() {
        const ONE_MS_NANOS: u64 = 1_000_000;
        let input_nanos = 1_700_000_000_123_456_789_u64;

        let retreated_nanos = retreat_end_datetime(input_nanos)
            .unwrap()
            .timestamp_nanos_opt()
            .unwrap();

        assert_eq!(retreated_nanos as u64, input_nanos - ONE_MS_NANOS);
    }

    #[rstest]
    fn test_retreat_end_datetime_saturates_at_zero() {
        // The input is smaller than the 1ms step, so the result clamps to the epoch.
        let retreated = retreat_end_datetime(500_000).unwrap();

        assert_eq!(retreated.timestamp_nanos_opt(), Some(0));
    }

    #[rstest]
    fn test_should_continue_backward_pagination_true_when_end_after_start() {
        let window_start = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
        let window_end = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 1).unwrap();

        let keep_going = should_continue_backward_pagination(window_end, window_start);

        assert!(keep_going);
    }

    #[rstest]
    fn test_should_continue_backward_pagination_false_when_end_equal_start() {
        let instant = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();

        let keep_going = should_continue_backward_pagination(instant, instant);

        assert!(!keep_going);
    }
}