Skip to main content

nautilus_backtest/
data_iterator.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Multi-stream, time-ordered data iterator for replaying historical market data.
17
18use std::collections::BinaryHeap;
19
20use ahash::AHashMap;
21use nautilus_core::UnixNanos;
22use nautilus_model::data::{Data, HasTsInit};
23
/// Internal convenience struct to keep heap entries ordered by `(ts_init, priority)`.
///
/// Each entry refers to the element at position `index` of the stream that is
/// keyed by `priority`.
#[derive(Debug, Eq, PartialEq)]
struct HeapEntry {
    /// `ts_init` of the referenced element; the primary ordering key.
    ts: UnixNanos,
    /// Priority of the owning stream; also the key used to look the stream up.
    priority: i32,
    /// Position of the referenced element inside its stream.
    index: usize,
}
31
32impl Ord for HeapEntry {
33    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
34        // min-heap on ts, then priority sign (+/-) then index
35        self.ts
36            .cmp(&other.ts)
37            .then_with(|| self.priority.cmp(&other.priority))
38            .then_with(|| self.index.cmp(&other.index))
39            .reverse() // BinaryHeap is max by default -> reverse for min behaviour
40    }
41}
42
43impl PartialOrd for HeapEntry {
44    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
45        Some(self.cmp(other))
46    }
47}
48
/// Multi-stream, time-ordered data iterator used by the backtest engine.
///
/// Every stream is registered under a name and identified internally by an
/// `i32` priority, which is also the key of the per-stream maps below.
#[derive(Debug, Default)]
pub struct BacktestDataIterator {
    /// Stream contents, keyed by priority.
    streams: AHashMap<i32, Vec<Data>>,
    /// Priority -> stream name.
    names: AHashMap<i32, String>,
    /// Stream name -> priority.
    priorities: AHashMap<String, i32>,
    /// Cursor (index of the next element to yield) per stream, keyed by priority.
    indices: AHashMap<i32, usize>,
    /// Pending head element of each stream; left empty while in single-stream mode.
    heap: BinaryHeap<HeapEntry>,
    /// Priority of the only stream when single-stream mode is active, else `None`.
    single_priority: Option<i32>,
    /// Monotonically increasing counter used to assign priorities.
    next_priority_counter: i32,
}
60
61impl BacktestDataIterator {
62    /// Creates a new empty [`BacktestDataIterator`].
63    #[must_use]
64    pub fn new() -> Self {
65        Self {
66            streams: AHashMap::new(),
67            names: AHashMap::new(),
68            priorities: AHashMap::new(),
69            indices: AHashMap::new(),
70            heap: BinaryHeap::new(),
71            single_priority: None,
72            next_priority_counter: 0,
73        }
74    }
75
76    /// Adds (or replaces) a named data stream.
77    ///
78    /// When `append_data` is true the stream gets lower priority on timestamp
79    /// ties; when false (prepend) it wins ties.
80    pub fn add_data(&mut self, name: &str, mut data: Vec<Data>, append_data: bool) {
81        if data.is_empty() {
82            return;
83        }
84
85        // Ensure sorted by ts_init
86        data.sort_by_key(HasTsInit::ts_init);
87
88        let priority = if let Some(p) = self.priorities.get(name) {
89            // Replace existing stream – remove previous traces then re-insert below.
90            *p
91        } else {
92            self.next_priority_counter += 1;
93            let sign = if append_data { 1 } else { -1 };
94            sign * self.next_priority_counter
95        };
96
97        // Remove old state if any
98        self.remove_data(name, true);
99
100        self.streams.insert(priority, data);
101        self.names.insert(priority, name.to_string());
102        self.priorities.insert(name.to_string(), priority);
103        self.indices.insert(priority, 0);
104
105        self.rebuild_heap();
106    }
107
108    /// Removes a named data stream.
109    pub fn remove_data(&mut self, name: &str, complete_remove: bool) {
110        if let Some(priority) = self.priorities.remove(name) {
111            self.streams.remove(&priority);
112            self.indices.remove(&priority);
113            self.names.remove(&priority);
114
115            // Rebuild heap sans removed priority
116            self.heap.retain(|e| e.priority != priority);
117
118            if self.heap.is_empty() {
119                self.single_priority = None;
120            }
121        }
122
123        if complete_remove {
124            // Placeholder for future generator cleanup
125        }
126    }
127
128    /// Sets the cursor of a named stream to `index` (0-based).
129    pub fn set_index(&mut self, name: &str, index: usize) {
130        if let Some(priority) = self.priorities.get(name) {
131            self.indices.insert(*priority, index);
132            self.rebuild_heap();
133        }
134    }
135
136    /// Resets all stream cursors to the beginning.
137    pub fn reset_all_cursors(&mut self) {
138        for idx in self.indices.values_mut() {
139            *idx = 0;
140        }
141        self.rebuild_heap();
142    }
143
144    /// Returns the next [`Data`] element across all streams in chronological order.
145    #[expect(clippy::should_implement_trait)]
146    pub fn next(&mut self) -> Option<Data> {
147        // Fast path for single stream
148        if let Some(p) = self.single_priority {
149            let data = self.streams.get_mut(&p)?;
150            let idx = self.indices.get_mut(&p)?;
151            if *idx >= data.len() {
152                return None;
153            }
154            let element = data[*idx].clone();
155            *idx += 1;
156            return Some(element);
157        }
158
159        // Multi-stream path using heap
160        let entry = self.heap.pop()?;
161        let stream_vec = self.streams.get(&entry.priority)?;
162        let element = stream_vec[entry.index].clone();
163
164        // Advance cursor and push next entry
165        let next_index = entry.index + 1;
166        self.indices.insert(entry.priority, next_index);
167        if next_index < stream_vec.len() {
168            self.heap.push(HeapEntry {
169                ts: stream_vec[next_index].ts_init(),
170                priority: entry.priority,
171                index: next_index,
172            });
173        }
174
175        Some(element)
176    }
177
178    /// Returns whether all streams have been fully consumed.
179    #[must_use]
180    pub fn is_done(&self) -> bool {
181        if let Some(p) = self.single_priority {
182            if let Some(idx) = self.indices.get(&p)
183                && let Some(vec) = self.streams.get(&p)
184            {
185                return *idx >= vec.len();
186            }
187            true
188        } else {
189            self.heap.is_empty()
190        }
191    }
192
193    fn rebuild_heap(&mut self) {
194        self.heap.clear();
195
196        // Determine if we’re in single-stream mode
197        if self.streams.len() == 1 {
198            self.single_priority = self.streams.keys().next().copied();
199            return;
200        }
201        self.single_priority = None;
202
203        for (&priority, vec) in &self.streams {
204            let idx = *self.indices.get(&priority).unwrap_or(&0);
205            if idx < vec.len() {
206                self.heap.push(HeapEntry {
207                    ts: vec[idx].ts_init(),
208                    priority,
209                    index: idx,
210                });
211            }
212        }
213    }
214}
215
#[cfg(test)]
mod tests {
    use nautilus_model::{
        data::QuoteTick,
        identifiers::InstrumentId,
        types::{Price, Quantity},
    };
    use rstest::rstest;

    use super::*;

    /// Builds a quote tick for instrument `id` whose `ts_event` and `ts_init` both equal `ts`.
    fn quote(id: &str, ts: u64) -> Data {
        let inst = InstrumentId::from(id);
        Data::Quote(QuoteTick::new(
            inst,
            Price::from("1.0"),
            Price::from("1.0"),
            Quantity::from(100),
            Quantity::from(100),
            ts.into(),
            ts.into(),
        ))
    }

    /// Drains the iterator and returns the `ts_init` of every element in emission order.
    fn collect_ts(it: &mut BacktestDataIterator) -> Vec<u64> {
        let mut ts = Vec::new();
        while let Some(d) = it.next() {
            ts.push(d.ts_init().as_u64());
        }
        ts
    }

    /// Drains the iterator and returns the instrument id of every element in emission order.
    fn collect_ids(it: &mut BacktestDataIterator) -> Vec<InstrumentId> {
        let mut ids = Vec::new();
        while let Some(d) = it.next() {
            ids.push(d.instrument_id());
        }
        ids
    }

    #[rstest]
    fn test_single_stream_yields_in_order() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![quote("A.B", 100), quote("A.B", 200), quote("A.B", 300)],
            true,
        );

        assert_eq!(collect_ts(&mut it), vec![100, 200, 300]);
        assert!(it.is_done());
    }

    #[rstest]
    fn test_single_stream_exhaustion_returns_none() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1), quote("A.B", 3)], true);
        assert_eq!(it.next().unwrap().ts_init(), UnixNanos::from(1));
        assert_eq!(it.next().unwrap().ts_init(), UnixNanos::from(3));
        assert!(it.next().is_none());
    }

    #[rstest]
    fn test_single_stream_sorts_unsorted_input() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![quote("A.B", 300), quote("A.B", 100), quote("A.B", 200)],
            true,
        );

        assert_eq!(collect_ts(&mut it), vec![100, 200, 300]);
    }

    #[rstest]
    fn test_two_stream_merge_chronological() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 1), quote("A.B", 4)], true);
        it.add_data("s2", vec![quote("C.D", 2), quote("C.D", 3)], false);

        assert_eq!(collect_ts(&mut it), vec![1, 2, 3, 4]);
    }

    #[rstest]
    fn test_three_stream_merge_sorted() {
        let mut it = BacktestDataIterator::new();
        let data_len = 5;
        let d0: Vec<Data> = (0..data_len).map(|k| quote("A.B", 3 * k)).collect();
        let d1: Vec<Data> = (0..data_len).map(|k| quote("C.D", 3 * k + 1)).collect();
        let d2: Vec<Data> = (0..data_len).map(|k| quote("E.F", 3 * k + 2)).collect();
        it.add_data("d0", d0, true);
        it.add_data("d1", d1, true);
        it.add_data("d2", d2, true);

        let ts = collect_ts(&mut it);
        assert_eq!(ts.len(), 15);
        for i in 0..ts.len() - 1 {
            assert!(ts[i] <= ts[i + 1], "Not sorted at index {i}");
        }
    }

    #[rstest]
    fn test_multiple_streams_merge_order() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 100), quote("A.B", 300)], true);
        it.add_data("s2", vec![quote("C.D", 200), quote("C.D", 400)], true);

        assert_eq!(collect_ts(&mut it), vec![100, 200, 300, 400]);
    }

    #[rstest]
    fn test_append_data_priority_default_fifo() {
        let mut it = BacktestDataIterator::new();
        it.add_data("a", vec![quote("A.B", 100)], true);
        it.add_data("b", vec![quote("C.D", 100)], true);

        // Both at same timestamp, FIFO order (a before b)
        let first = it.next().unwrap();
        let second = it.next().unwrap();
        assert!(it.next().is_none());

        assert_eq!(first.ts_init().as_u64(), 100);
        assert_eq!(second.ts_init().as_u64(), 100);
        // Timestamps alone cannot tell the streams apart, so pin the order by instrument.
        assert_eq!(first.instrument_id(), InstrumentId::from("A.B"));
        assert_eq!(second.instrument_id(), InstrumentId::from("C.D"));
    }

    #[rstest]
    fn test_prepend_priority_wins_ties() {
        let mut it = BacktestDataIterator::new();
        // "a" is appended (lower priority), "b" is prepended (higher priority)
        it.add_data("a", vec![quote("A.B", 100)], true);
        it.add_data("b", vec![quote("C.D", 100)], false);

        // "b" (prepend) should come first despite being added second
        let first = it.next().unwrap();
        let second = it.next().unwrap();
        // Prepend stream (negative priority) wins ties over append (positive)
        assert_eq!(first.instrument_id(), InstrumentId::from("C.D"));
        assert_eq!(second.instrument_id(), InstrumentId::from("A.B"));
    }

    #[rstest]
    fn test_is_done_empty_iterator() {
        let it = BacktestDataIterator::new();
        assert!(it.is_done());
    }

    #[rstest]
    fn test_is_done_after_consumption() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1)], true);

        assert!(!it.is_done());
        it.next();
        assert!(it.is_done());
    }

    #[rstest]
    fn test_is_done_multi_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 1)], true);
        it.add_data("s2", vec![quote("C.D", 2)], true);

        assert!(!it.is_done());
        it.next();
        assert!(!it.is_done());
        it.next();
        assert!(it.is_done());
    }

    #[rstest]
    fn test_partial_consumption_then_complete() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![
                quote("A.B", 0),
                quote("A.B", 1),
                quote("A.B", 2),
                quote("A.B", 3),
            ],
            true,
        );

        assert_eq!(it.next().unwrap().ts_init().as_u64(), 0);
        assert_eq!(it.next().unwrap().ts_init().as_u64(), 1);

        let remaining = collect_ts(&mut it);
        assert_eq!(remaining, vec![2, 3]);
        assert!(it.is_done());
    }

    #[rstest]
    fn test_remove_stream_reduces_output() {
        let mut it = BacktestDataIterator::new();
        it.add_data("a", vec![quote("A.B", 1)], true);
        it.add_data("b", vec![quote("C.D", 2)], true);

        it.remove_data("a", false);

        assert_eq!(collect_ts(&mut it), vec![2]);
    }

    #[rstest]
    fn test_remove_all_streams_yields_empty() {
        let mut it = BacktestDataIterator::new();
        it.add_data("x", vec![quote("A.B", 1)], true);
        it.add_data("y", vec![quote("C.D", 2)], true);

        it.remove_data("x", false);
        it.remove_data("y", false);

        assert!(it.next().is_none());
        assert!(it.is_done());
    }

    #[rstest]
    fn test_remove_nonexistent_stream_is_noop() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1)], true);

        it.remove_data("nonexistent", false);

        assert_eq!(collect_ts(&mut it), vec![1]);
    }

    #[rstest]
    fn test_remove_after_full_consumption() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1), quote("A.B", 2)], true);

        collect_ts(&mut it);

        it.remove_data("s", true);
        assert!(it.is_done());
    }

    #[rstest]
    fn test_set_index_rewinds_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![quote("A.B", 10), quote("A.B", 20), quote("A.B", 30)],
            true,
        );

        assert_eq!(it.next().unwrap().ts_init().as_u64(), 10);

        it.set_index("s", 0);

        assert_eq!(collect_ts(&mut it), vec![10, 20, 30]);
    }

    #[rstest]
    fn test_set_index_skips_forward() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![quote("A.B", 10), quote("A.B", 20), quote("A.B", 30)],
            true,
        );

        it.set_index("s", 2);

        assert_eq!(collect_ts(&mut it), vec![30]);
    }

    #[rstest]
    fn test_set_index_nonexistent_stream_is_noop() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1)], true);

        it.set_index("nonexistent", 0);

        assert_eq!(collect_ts(&mut it), vec![1]);
    }

    #[rstest]
    fn test_reset_all_cursors_single_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1), quote("A.B", 2)], true);

        collect_ts(&mut it);
        assert!(it.is_done());

        it.reset_all_cursors();
        assert!(!it.is_done());
        assert_eq!(collect_ts(&mut it), vec![1, 2]);
    }

    #[rstest]
    fn test_reset_all_cursors_multi_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 1), quote("A.B", 3)], true);
        it.add_data("s2", vec![quote("C.D", 2), quote("C.D", 4)], true);

        collect_ts(&mut it);
        assert!(it.is_done());

        it.reset_all_cursors();
        assert_eq!(collect_ts(&mut it), vec![1, 2, 3, 4]);
    }

    #[rstest]
    fn test_readding_data_replaces_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("X", vec![quote("A.B", 1), quote("A.B", 2)], true);
        it.add_data("X", vec![quote("A.B", 10)], true);

        assert_eq!(collect_ts(&mut it), vec![10]);
    }

    #[rstest]
    fn test_add_empty_data_is_noop() {
        let mut it = BacktestDataIterator::new();
        it.add_data("empty", vec![], true);

        assert!(it.is_done());
        assert!(it.next().is_none());
    }

    #[rstest]
    fn test_empty_iterator_returns_none() {
        let mut it = BacktestDataIterator::new();
        assert!(it.next().is_none());
        assert!(it.is_done());
    }

    #[rstest]
    fn test_multiple_add_data_calls_with_different_names() {
        let mut it = BacktestDataIterator::new();
        it.add_data("batch_0", vec![quote("A.B", 1), quote("A.B", 3)], true);
        it.add_data("batch_1", vec![quote("A.B", 2), quote("A.B", 4)], true);

        assert_eq!(collect_ts(&mut it), vec![1, 2, 3, 4]);
    }

    #[rstest]
    fn test_prepend_stream_always_wins_ties_across_batches() {
        // Verifies that a prepend stream (negative priority) wins ties
        // even when added after multiple append streams
        let mut it = BacktestDataIterator::new();
        it.add_data("append_a", vec![quote("A.B", 100)], true);
        it.add_data("append_b", vec![quote("C.D", 100)], true);
        it.add_data("prepend", vec![quote("E.F", 100)], false);

        let first = it.next().unwrap();
        assert_eq!(
            first.instrument_id(),
            InstrumentId::from("E.F"),
            "Prepend stream should always come first in ties"
        );
    }

    #[rstest]
    fn test_equal_timestamps_across_many_streams_preserves_priority_order() {
        // All items at the same timestamp — ordering is strictly by priority
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 50)], true);
        it.add_data("s2", vec![quote("C.D", 50)], true);
        it.add_data("s3", vec![quote("E.F", 50)], true);
        it.add_data("s4", vec![quote("G.H", 50)], true);

        let ids = collect_ids(&mut it);

        // Append streams receive increasing priorities in insertion order, so the
        // exact sequence is fixed. Comparing the whole vector also proves that no
        // item was dropped or duplicated.
        assert_eq!(
            ids,
            vec![
                InstrumentId::from("A.B"),
                InstrumentId::from("C.D"),
                InstrumentId::from("E.F"),
                InstrumentId::from("G.H"),
            ]
        );
    }
}