1use std::collections::BinaryHeap;
19
20use ahash::AHashMap;
21use nautilus_core::UnixNanos;
22use nautilus_model::data::{Data, HasTsInit};
23
24#[derive(Debug, Eq, PartialEq)]
26struct HeapEntry {
27 ts: UnixNanos,
28 priority: i32,
29 index: usize,
30}
31
32impl Ord for HeapEntry {
33 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
34 self.ts
36 .cmp(&other.ts)
37 .then_with(|| self.priority.cmp(&other.priority))
38 .then_with(|| self.index.cmp(&other.index))
39 .reverse() }
41}
42
43impl PartialOrd for HeapEntry {
44 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
45 Some(self.cmp(other))
46 }
47}
48
49#[derive(Debug, Default)]
51pub struct BacktestDataIterator {
52 streams: AHashMap<i32, Vec<Data>>, names: AHashMap<i32, String>, priorities: AHashMap<String, i32>, indices: AHashMap<i32, usize>, heap: BinaryHeap<HeapEntry>,
57 single_priority: Option<i32>,
58 next_priority_counter: i32, }
60
61impl BacktestDataIterator {
62 #[must_use]
64 pub fn new() -> Self {
65 Self {
66 streams: AHashMap::new(),
67 names: AHashMap::new(),
68 priorities: AHashMap::new(),
69 indices: AHashMap::new(),
70 heap: BinaryHeap::new(),
71 single_priority: None,
72 next_priority_counter: 0,
73 }
74 }
75
76 pub fn add_data(&mut self, name: &str, mut data: Vec<Data>, append_data: bool) {
81 if data.is_empty() {
82 return;
83 }
84
85 data.sort_by_key(HasTsInit::ts_init);
87
88 let priority = if let Some(p) = self.priorities.get(name) {
89 *p
91 } else {
92 self.next_priority_counter += 1;
93 let sign = if append_data { 1 } else { -1 };
94 sign * self.next_priority_counter
95 };
96
97 self.remove_data(name, true);
99
100 self.streams.insert(priority, data);
101 self.names.insert(priority, name.to_string());
102 self.priorities.insert(name.to_string(), priority);
103 self.indices.insert(priority, 0);
104
105 self.rebuild_heap();
106 }
107
108 pub fn remove_data(&mut self, name: &str, complete_remove: bool) {
110 if let Some(priority) = self.priorities.remove(name) {
111 self.streams.remove(&priority);
112 self.indices.remove(&priority);
113 self.names.remove(&priority);
114
115 self.heap.retain(|e| e.priority != priority);
117
118 if self.heap.is_empty() {
119 self.single_priority = None;
120 }
121 }
122
123 if complete_remove {
124 }
126 }
127
128 pub fn set_index(&mut self, name: &str, index: usize) {
130 if let Some(priority) = self.priorities.get(name) {
131 self.indices.insert(*priority, index);
132 self.rebuild_heap();
133 }
134 }
135
136 pub fn reset_all_cursors(&mut self) {
138 for idx in self.indices.values_mut() {
139 *idx = 0;
140 }
141 self.rebuild_heap();
142 }
143
144 #[expect(clippy::should_implement_trait)]
146 pub fn next(&mut self) -> Option<Data> {
147 if let Some(p) = self.single_priority {
149 let data = self.streams.get_mut(&p)?;
150 let idx = self.indices.get_mut(&p)?;
151 if *idx >= data.len() {
152 return None;
153 }
154 let element = data[*idx].clone();
155 *idx += 1;
156 return Some(element);
157 }
158
159 let entry = self.heap.pop()?;
161 let stream_vec = self.streams.get(&entry.priority)?;
162 let element = stream_vec[entry.index].clone();
163
164 let next_index = entry.index + 1;
166 self.indices.insert(entry.priority, next_index);
167 if next_index < stream_vec.len() {
168 self.heap.push(HeapEntry {
169 ts: stream_vec[next_index].ts_init(),
170 priority: entry.priority,
171 index: next_index,
172 });
173 }
174
175 Some(element)
176 }
177
178 #[must_use]
180 pub fn is_done(&self) -> bool {
181 if let Some(p) = self.single_priority {
182 if let Some(idx) = self.indices.get(&p)
183 && let Some(vec) = self.streams.get(&p)
184 {
185 return *idx >= vec.len();
186 }
187 true
188 } else {
189 self.heap.is_empty()
190 }
191 }
192
193 fn rebuild_heap(&mut self) {
194 self.heap.clear();
195
196 if self.streams.len() == 1 {
198 self.single_priority = self.streams.keys().next().copied();
199 return;
200 }
201 self.single_priority = None;
202
203 for (&priority, vec) in &self.streams {
204 let idx = *self.indices.get(&priority).unwrap_or(&0);
205 if idx < vec.len() {
206 self.heap.push(HeapEntry {
207 ts: vec[idx].ts_init(),
208 priority,
209 index: idx,
210 });
211 }
212 }
213 }
214}
215
#[cfg(test)]
mod tests {
    use nautilus_model::{
        data::QuoteTick,
        identifiers::InstrumentId,
        types::{Price, Quantity},
    };
    use rstest::rstest;

    use super::*;

    /// Builds a quote for instrument `id`, passing `ts` for both timestamp
    /// arguments of `QuoteTick::new`.
    fn quote(id: &str, ts: u64) -> Data {
        let inst = InstrumentId::from(id);
        Data::Quote(QuoteTick::new(
            inst,
            Price::from("1.0"),
            Price::from("1.0"),
            Quantity::from(100),
            Quantity::from(100),
            ts.into(),
            ts.into(),
        ))
    }

    /// Drains `it` and returns the `ts_init` of every yielded element.
    fn collect_ts(it: &mut BacktestDataIterator) -> Vec<u64> {
        let mut ts = Vec::new();
        while let Some(d) = it.next() {
            ts.push(d.ts_init().as_u64());
        }
        ts
    }

    /// Drains `it` and returns the instrument id of every yielded element.
    fn collect_ids(it: &mut BacktestDataIterator) -> Vec<InstrumentId> {
        let mut ids = Vec::new();
        while let Some(d) = it.next() {
            ids.push(d.instrument_id());
        }
        ids
    }

    #[rstest]
    fn test_single_stream_yields_in_order() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![quote("A.B", 100), quote("A.B", 200), quote("A.B", 300)],
            true,
        );

        assert_eq!(collect_ts(&mut it), vec![100, 200, 300]);
        assert!(it.is_done());
    }

    #[rstest]
    fn test_single_stream_exhaustion_returns_none() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1), quote("A.B", 3)], true);

        assert_eq!(it.next().unwrap().ts_init(), UnixNanos::from(1));
        assert_eq!(it.next().unwrap().ts_init(), UnixNanos::from(3));
        assert!(it.next().is_none());
    }

    #[rstest]
    fn test_single_stream_sorts_unsorted_input() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![quote("A.B", 300), quote("A.B", 100), quote("A.B", 200)],
            true,
        );

        assert_eq!(collect_ts(&mut it), vec![100, 200, 300]);
    }

    #[rstest]
    fn test_two_stream_merge_chronological() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 1), quote("A.B", 4)], true);
        it.add_data("s2", vec![quote("C.D", 2), quote("C.D", 3)], false);

        assert_eq!(collect_ts(&mut it), vec![1, 2, 3, 4]);
    }

    #[rstest]
    fn test_three_stream_merge_sorted() {
        let mut it = BacktestDataIterator::new();
        let data_len = 5;
        let d0: Vec<Data> = (0..data_len).map(|k| quote("A.B", 3 * k)).collect();
        let d1: Vec<Data> = (0..data_len).map(|k| quote("C.D", 3 * k + 1)).collect();
        let d2: Vec<Data> = (0..data_len).map(|k| quote("E.F", 3 * k + 2)).collect();
        it.add_data("d0", d0, true);
        it.add_data("d1", d1, true);
        it.add_data("d2", d2, true);

        let ts = collect_ts(&mut it);
        assert_eq!(ts.len(), 15);
        for (i, pair) in ts.windows(2).enumerate() {
            assert!(pair[0] <= pair[1], "Not sorted at index {i}");
        }
    }

    #[rstest]
    fn test_multiple_streams_merge_order() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 100), quote("A.B", 300)], true);
        it.add_data("s2", vec![quote("C.D", 200), quote("C.D", 400)], true);

        assert_eq!(collect_ts(&mut it), vec![100, 200, 300, 400]);
    }

    #[rstest]
    fn test_append_data_priority_default_fifo() {
        let mut it = BacktestDataIterator::new();
        it.add_data("a", vec![quote("A.B", 100)], true);
        it.add_data("b", vec![quote("C.D", 100)], true);

        // Appended streams receive increasing priorities, so on a timestamp
        // tie the stream that was added first is yielded first.
        let first = it.next().unwrap();
        let second = it.next().unwrap();
        assert_eq!(first.ts_init().as_u64(), 100);
        assert_eq!(second.ts_init().as_u64(), 100);
        assert_eq!(first.instrument_id(), InstrumentId::from("A.B"));
        assert_eq!(second.instrument_id(), InstrumentId::from("C.D"));
        assert!(it.next().is_none());
    }

    #[rstest]
    fn test_prepend_priority_wins_ties() {
        let mut it = BacktestDataIterator::new();
        it.add_data("a", vec![quote("A.B", 100)], true);
        // `append_data = false` assigns a negative priority to this stream.
        it.add_data("b", vec![quote("C.D", 100)], false);

        let first = it.next().unwrap();
        let second = it.next().unwrap();

        assert_eq!(first.instrument_id(), InstrumentId::from("C.D"));
        assert_eq!(second.instrument_id(), InstrumentId::from("A.B"));
    }

    #[rstest]
    fn test_is_done_empty_iterator() {
        let it = BacktestDataIterator::new();
        assert!(it.is_done());
    }

    #[rstest]
    fn test_is_done_after_consumption() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1)], true);

        assert!(!it.is_done());
        it.next();
        assert!(it.is_done());
    }

    #[rstest]
    fn test_is_done_multi_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 1)], true);
        it.add_data("s2", vec![quote("C.D", 2)], true);

        assert!(!it.is_done());
        it.next();
        assert!(!it.is_done());
        it.next();
        assert!(it.is_done());
    }

    #[rstest]
    fn test_partial_consumption_then_complete() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![
                quote("A.B", 0),
                quote("A.B", 1),
                quote("A.B", 2),
                quote("A.B", 3),
            ],
            true,
        );

        assert_eq!(it.next().unwrap().ts_init().as_u64(), 0);
        assert_eq!(it.next().unwrap().ts_init().as_u64(), 1);

        let remaining = collect_ts(&mut it);
        assert_eq!(remaining, vec![2, 3]);
        assert!(it.is_done());
    }

    #[rstest]
    fn test_remove_stream_reduces_output() {
        let mut it = BacktestDataIterator::new();
        it.add_data("a", vec![quote("A.B", 1)], true);
        it.add_data("b", vec![quote("C.D", 2)], true);

        it.remove_data("a", false);

        assert_eq!(collect_ts(&mut it), vec![2]);
    }

    #[rstest]
    fn test_remove_all_streams_yields_empty() {
        let mut it = BacktestDataIterator::new();
        it.add_data("x", vec![quote("A.B", 1)], true);
        it.add_data("y", vec![quote("C.D", 2)], true);

        it.remove_data("x", false);
        it.remove_data("y", false);

        assert!(it.next().is_none());
        assert!(it.is_done());
    }

    #[rstest]
    fn test_remove_nonexistent_stream_is_noop() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1)], true);

        it.remove_data("nonexistent", false);

        assert_eq!(collect_ts(&mut it), vec![1]);
    }

    #[rstest]
    fn test_remove_after_full_consumption() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1), quote("A.B", 2)], true);

        collect_ts(&mut it);

        it.remove_data("s", true);
        assert!(it.is_done());
    }

    #[rstest]
    fn test_set_index_rewinds_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![quote("A.B", 10), quote("A.B", 20), quote("A.B", 30)],
            true,
        );

        assert_eq!(it.next().unwrap().ts_init().as_u64(), 10);

        it.set_index("s", 0);

        assert_eq!(collect_ts(&mut it), vec![10, 20, 30]);
    }

    #[rstest]
    fn test_set_index_skips_forward() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![quote("A.B", 10), quote("A.B", 20), quote("A.B", 30)],
            true,
        );

        it.set_index("s", 2);

        assert_eq!(collect_ts(&mut it), vec![30]);
    }

    #[rstest]
    fn test_set_index_nonexistent_stream_is_noop() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1)], true);

        it.set_index("nonexistent", 0);

        assert_eq!(collect_ts(&mut it), vec![1]);
    }

    #[rstest]
    fn test_reset_all_cursors_single_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1), quote("A.B", 2)], true);

        collect_ts(&mut it);
        assert!(it.is_done());

        it.reset_all_cursors();
        assert!(!it.is_done());
        assert_eq!(collect_ts(&mut it), vec![1, 2]);
    }

    #[rstest]
    fn test_reset_all_cursors_multi_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 1), quote("A.B", 3)], true);
        it.add_data("s2", vec![quote("C.D", 2), quote("C.D", 4)], true);

        collect_ts(&mut it);
        assert!(it.is_done());

        it.reset_all_cursors();
        assert_eq!(collect_ts(&mut it), vec![1, 2, 3, 4]);
    }

    #[rstest]
    fn test_readding_data_replaces_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("X", vec![quote("A.B", 1), quote("A.B", 2)], true);
        it.add_data("X", vec![quote("A.B", 10)], true);

        assert_eq!(collect_ts(&mut it), vec![10]);
    }

    #[rstest]
    fn test_add_empty_data_is_noop() {
        let mut it = BacktestDataIterator::new();
        it.add_data("empty", vec![], true);

        assert!(it.is_done());
        assert!(it.next().is_none());
    }

    #[rstest]
    fn test_empty_iterator_returns_none() {
        let mut it = BacktestDataIterator::new();
        assert!(it.next().is_none());
        assert!(it.is_done());
    }

    #[rstest]
    fn test_multiple_add_data_calls_with_different_names() {
        let mut it = BacktestDataIterator::new();
        it.add_data("batch_0", vec![quote("A.B", 1), quote("A.B", 3)], true);
        it.add_data("batch_1", vec![quote("A.B", 2), quote("A.B", 4)], true);

        assert_eq!(collect_ts(&mut it), vec![1, 2, 3, 4]);
    }

    #[rstest]
    fn test_prepend_stream_always_wins_ties_across_batches() {
        let mut it = BacktestDataIterator::new();
        // Two appended streams first, then one prepended stream, all sharing
        // the same timestamp.
        it.add_data("append_a", vec![quote("A.B", 100)], true);
        it.add_data("append_b", vec![quote("C.D", 100)], true);
        it.add_data("prepend", vec![quote("E.F", 100)], false);

        let first = it.next().unwrap();
        assert_eq!(
            first.instrument_id(),
            InstrumentId::from("E.F"),
            "Prepend stream should always come first in ties"
        );
    }

    #[rstest]
    fn test_equal_timestamps_across_many_streams_preserves_priority_order() {
        let mut it = BacktestDataIterator::new();
        // All four streams are appended, so their priorities increase in
        // registration order.
        it.add_data("s1", vec![quote("A.B", 50)], true);
        it.add_data("s2", vec![quote("C.D", 50)], true);
        it.add_data("s3", vec![quote("E.F", 50)], true);
        it.add_data("s4", vec![quote("G.H", 50)], true);

        let ids = collect_ids(&mut it);

        // With identical timestamps the lower priority value is yielded first,
        // so the output must follow registration order exactly.
        assert_eq!(
            ids,
            vec![
                InstrumentId::from("A.B"),
                InstrumentId::from("C.D"),
                InstrumentId::from("E.F"),
                InstrumentId::from("G.H"),
            ]
        );
    }
}