Skip to main content

nautilus_backtest/
accumulator.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Time event accumulation and scheduling for the backtest engine.
17
18use std::{cmp::Reverse, collections::BinaryHeap};
19
20use nautilus_common::{clock::TestClock, timer::TimeEventHandler};
21use nautilus_core::UnixNanos;
22
/// Provides a means of accumulating and draining time event handlers using a priority queue.
///
/// Events are maintained in timestamp order using a binary heap, allowing efficient
/// retrieval of the next event to process.
///
/// The precise ordering is whatever `TimeEventHandler`'s `Ord` implementation defines
/// (not visible in this file); the tests in this module expect the earliest `ts_event`
/// to come out first, with same-timestamp events ordered by event name.
#[derive(Debug)]
pub struct TimeEventAccumulator {
    // `BinaryHeap` is a max-heap; wrapping each handler in `Reverse` inverts the
    // comparison so that `peek`/`pop` return the smallest handler instead.
    heap: BinaryHeap<Reverse<TimeEventHandler>>,
}
31
32impl TimeEventAccumulator {
33    /// Creates a new [`TimeEventAccumulator`] instance.
34    #[must_use]
35    pub fn new() -> Self {
36        Self {
37            heap: BinaryHeap::new(),
38        }
39    }
40
41    /// Advance the given clock to the `to_time_ns` and push events to the heap.
42    pub fn advance_clock(&mut self, clock: &mut TestClock, to_time_ns: UnixNanos, set_time: bool) {
43        let events = clock.advance_time(to_time_ns, set_time);
44        let handlers = clock.match_handlers(events);
45        for handler in handlers {
46            self.heap.push(Reverse(handler));
47        }
48    }
49
50    /// Peek at the next event timestamp without removing it.
51    ///
52    /// Returns `None` if the heap is empty.
53    #[must_use]
54    pub fn peek_next_time(&self) -> Option<UnixNanos> {
55        self.heap.peek().map(|h| h.0.event.ts_event)
56    }
57
58    /// Pop the next event if its timestamp is at or before `ts`.
59    ///
60    /// Returns `None` if the heap is empty or the next event is after `ts`.
61    pub fn pop_next_at_or_before(&mut self, ts: UnixNanos) -> Option<TimeEventHandler> {
62        if self.heap.peek().is_some_and(|h| h.0.event.ts_event <= ts) {
63            self.heap.pop().map(|h| h.0)
64        } else {
65            None
66        }
67    }
68
69    /// Check if the heap is empty.
70    #[must_use]
71    pub fn is_empty(&self) -> bool {
72        self.heap.is_empty()
73    }
74
75    /// Get the number of events in the heap.
76    #[must_use]
77    pub fn len(&self) -> usize {
78        self.heap.len()
79    }
80
81    /// Clear all events from the heap.
82    pub fn clear(&mut self) {
83        self.heap.clear();
84    }
85
86    /// Drain all events from the heap in timestamp order.
87    ///
88    /// This is provided for backwards compatibility with code that expects
89    /// batch processing. For iterative processing, prefer `pop_next_at_or_before`.
90    pub fn drain(&mut self) -> Vec<TimeEventHandler> {
91        let mut handlers = Vec::with_capacity(self.heap.len());
92        while let Some(scheduled) = self.heap.pop() {
93            handlers.push(scheduled.0);
94        }
95        handlers
96    }
97}
98
99impl Default for TimeEventAccumulator {
100    /// Creates a new default [`TimeEventAccumulator`] instance.
101    fn default() -> Self {
102        Self::new()
103    }
104}
105
#[cfg(all(test, feature = "python"))]
mod tests {
    use nautilus_common::timer::{TimeEvent, TimeEventCallback};
    use nautilus_core::UUID4;
    use pyo3::{Py, Python, prelude::*, types::PyList};
    use rstest::*;
    use ustr::Ustr;

    use super::*;

    /// Builds a callback bound to the `append` method of the given Python list.
    fn append_callback(list: &Bound<'_, PyList>) -> TimeEventCallback {
        let bound_append = list.getattr("append").unwrap();
        TimeEventCallback::from(Py::from(bound_append).into_any())
    }

    /// Builds a `TimeEvent` named `name` whose two timestamps are both `ts`.
    fn event_at(name: &str, ts: u64) -> TimeEvent {
        TimeEvent::new(Ustr::from(name), UUID4::new(), ts.into(), ts.into())
    }

    /// Pushes a handler for `event` straight onto the accumulator's heap.
    fn enqueue(
        accumulator: &mut TimeEventAccumulator,
        event: TimeEvent,
        callback: TimeEventCallback,
    ) {
        accumulator
            .heap
            .push(Reverse(TimeEventHandler::new(event, callback)));
    }

    #[rstest]
    fn test_accumulator_pop_in_order() {
        Python::initialize();
        Python::attach(|py| {
            let py_list = PyList::empty(py);
            let callback = append_callback(&py_list);
            let mut accumulator = TimeEventAccumulator::new();

            // Deliberately inserted out of timestamp order.
            let inserted = [
                ("TEST_EVENT_1", 100u64),
                ("TEST_EVENT_2", 300),
                ("TEST_EVENT_3", 200),
            ];
            for (name, ts) in inserted {
                enqueue(&mut accumulator, event_at(name, ts), callback.clone());
            }
            assert_eq!(accumulator.len(), 3);

            for expected_ts in [100u64, 200, 300] {
                let popped = accumulator.pop_next_at_or_before(1000.into()).unwrap();
                assert_eq!(popped.event.ts_event.as_u64(), expected_ts);
            }

            assert!(accumulator.is_empty());
        });
    }

    #[rstest]
    fn test_accumulator_pop_same_timestamp_in_name_order() {
        Python::initialize();
        Python::attach(|py| {
            let py_list = PyList::empty(py);
            let callback = append_callback(&py_list);
            let mut accumulator = TimeEventAccumulator::new();

            let spread_event = event_at("SPREAD_QUOTE_ESM4", 100);
            let time_bar_event = event_at("TIME_BAR_ESM4-2-MINUTE-ASK-INTERNAL", 100);

            // The time bar goes in first, yet the spread quote must come out first.
            enqueue(&mut accumulator, time_bar_event.clone(), callback.clone());
            enqueue(&mut accumulator, spread_event.clone(), callback);

            for expected in [&spread_event, &time_bar_event] {
                let popped = accumulator.pop_next_at_or_before(100.into()).unwrap();
                assert_eq!(popped.event.ts_event, expected.ts_event);
                assert_eq!(popped.event.name, expected.name);
            }
        });
    }

    #[rstest]
    fn test_accumulator_pop_respects_timestamp() {
        Python::initialize();
        Python::attach(|py| {
            let py_list = PyList::empty(py);
            let callback = append_callback(&py_list);
            let mut accumulator = TimeEventAccumulator::new();

            enqueue(
                &mut accumulator,
                event_at("TEST_EVENT_1", 100),
                callback.clone(),
            );
            enqueue(&mut accumulator, event_at("TEST_EVENT_2", 300), callback);

            let first = accumulator.pop_next_at_or_before(200.into()).unwrap();
            assert_eq!(first.event.ts_event.as_u64(), 100);

            // Event at 300 should not be returned with ts=200
            assert!(accumulator.pop_next_at_or_before(200.into()).is_none());

            let second = accumulator.pop_next_at_or_before(300.into()).unwrap();
            assert_eq!(second.event.ts_event.as_u64(), 300);
        });
    }

    #[rstest]
    fn test_peek_next_time() {
        Python::initialize();
        Python::attach(|py| {
            let py_list = PyList::empty(py);
            let callback = append_callback(&py_list);
            let mut accumulator = TimeEventAccumulator::new();

            assert!(accumulator.peek_next_time().is_none());

            enqueue(
                &mut accumulator,
                event_at("TEST_EVENT_1", 200),
                callback.clone(),
            );
            assert_eq!(accumulator.peek_next_time(), Some(200.into()));

            // An earlier event becomes the new head of the queue.
            enqueue(&mut accumulator, event_at("TEST_EVENT_2", 100), callback);
            assert_eq!(accumulator.peek_next_time(), Some(100.into()));
        });
    }

    #[rstest]
    fn test_drain_returns_in_order() {
        Python::initialize();
        Python::attach(|py| {
            let py_list = PyList::empty(py);
            let callback = append_callback(&py_list);
            let mut accumulator = TimeEventAccumulator::new();

            for ts in [300u64, 100, 200] {
                enqueue(&mut accumulator, event_at("TEST", ts), callback.clone());
            }

            let drained: Vec<u64> = accumulator
                .drain()
                .iter()
                .map(|handler| handler.event.ts_event.as_u64())
                .collect();

            assert_eq!(drained, vec![100, 200, 300]);
            assert!(accumulator.is_empty());
        });
    }

    #[rstest]
    fn test_interleaved_push_pop_maintains_order() {
        Python::initialize();
        Python::attach(|py| {
            let py_list = PyList::empty(py);
            let callback = append_callback(&py_list);
            let mut accumulator = TimeEventAccumulator::new();

            for ts in [100u64, 300] {
                enqueue(&mut accumulator, event_at("TEST", ts), callback.clone());
            }

            let first = accumulator.pop_next_at_or_before(1000.into()).unwrap();
            let mut popped_timestamps = vec![first.event.ts_event.as_u64()];

            // Simulate callback scheduling new event at 150 (between popped 100 and pending 300)
            enqueue(&mut accumulator, event_at("NEW", 150), callback);

            popped_timestamps.extend(
                std::iter::from_fn(|| accumulator.pop_next_at_or_before(1000.into()))
                    .map(|handler| handler.event.ts_event.as_u64()),
            );

            assert_eq!(popped_timestamps, vec![100, 150, 300]);
        });
    }

    #[rstest]
    fn test_same_timestamp_events() {
        Python::initialize();
        Python::attach(|py| {
            let py_list = PyList::empty(py);
            let callback = append_callback(&py_list);
            let mut accumulator = TimeEventAccumulator::new();

            for i in 0..3 {
                enqueue(
                    &mut accumulator,
                    event_at(&format!("EVENT_{i}"), 100),
                    callback.clone(),
                );
            }

            let popped: Vec<TimeEventHandler> =
                std::iter::from_fn(|| accumulator.pop_next_at_or_before(100.into())).collect();

            assert!(
                popped
                    .iter()
                    .all(|handler| handler.event.ts_event.as_u64() == 100)
            );
            assert_eq!(popped.len(), 3);
        });
    }

    #[rstest]
    fn test_pop_at_exact_timestamp_boundary() {
        Python::initialize();
        Python::attach(|py| {
            let py_list = PyList::empty(py);
            let mut accumulator = TimeEventAccumulator::new();

            enqueue(
                &mut accumulator,
                event_at("TEST", 100),
                append_callback(&py_list),
            );

            // A bound equal to the event timestamp is inclusive.
            let popped = accumulator.pop_next_at_or_before(100.into());
            assert_eq!(
                popped.map(|handler| handler.event.ts_event.as_u64()),
                Some(100)
            );

            enqueue(
                &mut accumulator,
                event_at("TEST2", 200),
                append_callback(&py_list),
            );

            assert!(accumulator.pop_next_at_or_before(199.into()).is_none());
            assert!(accumulator.pop_next_at_or_before(200.into()).is_some());
        });
    }
}