1use std::{cmp::Reverse, collections::BinaryHeap};
19
20use nautilus_common::{clock::TestClock, timer::TimeEventHandler};
21use nautilus_core::UnixNanos;
22
23#[derive(Debug)]
28pub struct TimeEventAccumulator {
29 heap: BinaryHeap<Reverse<TimeEventHandler>>,
30}
31
32impl TimeEventAccumulator {
33 #[must_use]
35 pub fn new() -> Self {
36 Self {
37 heap: BinaryHeap::new(),
38 }
39 }
40
41 pub fn advance_clock(&mut self, clock: &mut TestClock, to_time_ns: UnixNanos, set_time: bool) {
43 let events = clock.advance_time(to_time_ns, set_time);
44 let handlers = clock.match_handlers(events);
45 for handler in handlers {
46 self.heap.push(Reverse(handler));
47 }
48 }
49
50 #[must_use]
54 pub fn peek_next_time(&self) -> Option<UnixNanos> {
55 self.heap.peek().map(|h| h.0.event.ts_event)
56 }
57
58 pub fn pop_next_at_or_before(&mut self, ts: UnixNanos) -> Option<TimeEventHandler> {
62 if self.heap.peek().is_some_and(|h| h.0.event.ts_event <= ts) {
63 self.heap.pop().map(|h| h.0)
64 } else {
65 None
66 }
67 }
68
69 #[must_use]
71 pub fn is_empty(&self) -> bool {
72 self.heap.is_empty()
73 }
74
75 #[must_use]
77 pub fn len(&self) -> usize {
78 self.heap.len()
79 }
80
81 pub fn clear(&mut self) {
83 self.heap.clear();
84 }
85
86 pub fn drain(&mut self) -> Vec<TimeEventHandler> {
91 let mut handlers = Vec::with_capacity(self.heap.len());
92 while let Some(scheduled) = self.heap.pop() {
93 handlers.push(scheduled.0);
94 }
95 handlers
96 }
97}
98
99impl Default for TimeEventAccumulator {
100 fn default() -> Self {
102 Self::new()
103 }
104}
105
106#[cfg(all(test, feature = "python"))]
107mod tests {
108 use nautilus_common::timer::{TimeEvent, TimeEventCallback};
109 use nautilus_core::UUID4;
110 use pyo3::{Py, Python, prelude::*, types::PyList};
111 use rstest::*;
112 use ustr::Ustr;
113
114 use super::*;
115
116 #[rstest]
117 fn test_accumulator_pop_in_order() {
118 Python::initialize();
119 Python::attach(|py| {
120 let py_list = PyList::empty(py);
121 let py_append = Py::from(py_list.getattr("append").unwrap());
122
123 let mut accumulator = TimeEventAccumulator::new();
124
125 let time_event1 = TimeEvent::new(
126 Ustr::from("TEST_EVENT_1"),
127 UUID4::new(),
128 100.into(),
129 100.into(),
130 );
131 let time_event2 = TimeEvent::new(
132 Ustr::from("TEST_EVENT_2"),
133 UUID4::new(),
134 300.into(),
135 300.into(),
136 );
137 let time_event3 = TimeEvent::new(
138 Ustr::from("TEST_EVENT_3"),
139 UUID4::new(),
140 200.into(),
141 200.into(),
142 );
143
144 let callback = TimeEventCallback::from(py_append.into_any());
145
146 let handler1 = TimeEventHandler::new(time_event1.clone(), callback.clone());
147 let handler2 = TimeEventHandler::new(time_event2.clone(), callback.clone());
148 let handler3 = TimeEventHandler::new(time_event3.clone(), callback);
149
150 accumulator.heap.push(Reverse(handler1));
151 accumulator.heap.push(Reverse(handler2));
152 accumulator.heap.push(Reverse(handler3));
153 assert_eq!(accumulator.len(), 3);
154
155 let popped1 = accumulator.pop_next_at_or_before(1000.into()).unwrap();
156 assert_eq!(popped1.event.ts_event, time_event1.ts_event);
157
158 let popped2 = accumulator.pop_next_at_or_before(1000.into()).unwrap();
159 assert_eq!(popped2.event.ts_event, time_event3.ts_event);
160
161 let popped3 = accumulator.pop_next_at_or_before(1000.into()).unwrap();
162 assert_eq!(popped3.event.ts_event, time_event2.ts_event);
163
164 assert!(accumulator.is_empty());
165 });
166 }
167
168 #[rstest]
169 fn test_accumulator_pop_same_timestamp_in_name_order() {
170 Python::initialize();
171 Python::attach(|py| {
172 let py_list = PyList::empty(py);
173 let py_append = Py::from(py_list.getattr("append").unwrap());
174
175 let mut accumulator = TimeEventAccumulator::new();
176 let callback = TimeEventCallback::from(py_append.into_any());
177
178 let spread_event = TimeEvent::new(
179 Ustr::from("SPREAD_QUOTE_ESM4"),
180 UUID4::new(),
181 100.into(),
182 100.into(),
183 );
184 let time_bar_event = TimeEvent::new(
185 Ustr::from("TIME_BAR_ESM4-2-MINUTE-ASK-INTERNAL"),
186 UUID4::new(),
187 100.into(),
188 100.into(),
189 );
190
191 accumulator.heap.push(Reverse(TimeEventHandler::new(
192 time_bar_event.clone(),
193 callback.clone(),
194 )));
195 accumulator.heap.push(Reverse(TimeEventHandler::new(
196 spread_event.clone(),
197 callback,
198 )));
199
200 let popped1 = accumulator.pop_next_at_or_before(100.into()).unwrap();
201 assert_eq!(popped1.event.ts_event, spread_event.ts_event);
202 assert_eq!(popped1.event.name, spread_event.name);
203
204 let popped2 = accumulator.pop_next_at_or_before(100.into()).unwrap();
205 assert_eq!(popped2.event.ts_event, time_bar_event.ts_event);
206 assert_eq!(popped2.event.name, time_bar_event.name);
207 });
208 }
209
210 #[rstest]
211 fn test_accumulator_pop_respects_timestamp() {
212 Python::initialize();
213 Python::attach(|py| {
214 let py_list = PyList::empty(py);
215 let py_append = Py::from(py_list.getattr("append").unwrap());
216
217 let mut accumulator = TimeEventAccumulator::new();
218
219 let time_event1 = TimeEvent::new(
220 Ustr::from("TEST_EVENT_1"),
221 UUID4::new(),
222 100.into(),
223 100.into(),
224 );
225 let time_event2 = TimeEvent::new(
226 Ustr::from("TEST_EVENT_2"),
227 UUID4::new(),
228 300.into(),
229 300.into(),
230 );
231
232 let callback = TimeEventCallback::from(py_append.into_any());
233
234 accumulator.heap.push(Reverse(TimeEventHandler::new(
235 time_event1.clone(),
236 callback.clone(),
237 )));
238 accumulator.heap.push(Reverse(TimeEventHandler::new(
239 time_event2.clone(),
240 callback,
241 )));
242
243 let popped1 = accumulator.pop_next_at_or_before(200.into()).unwrap();
244 assert_eq!(popped1.event.ts_event, time_event1.ts_event);
245
246 assert!(accumulator.pop_next_at_or_before(200.into()).is_none());
248
249 let popped2 = accumulator.pop_next_at_or_before(300.into()).unwrap();
250 assert_eq!(popped2.event.ts_event, time_event2.ts_event);
251 });
252 }
253
254 #[rstest]
255 fn test_peek_next_time() {
256 Python::initialize();
257 Python::attach(|py| {
258 let py_list = PyList::empty(py);
259 let py_append = Py::from(py_list.getattr("append").unwrap());
260 let callback = TimeEventCallback::from(py_append.into_any());
261
262 let mut accumulator = TimeEventAccumulator::new();
263 assert!(accumulator.peek_next_time().is_none());
264
265 let time_event1 = TimeEvent::new(
266 Ustr::from("TEST_EVENT_1"),
267 UUID4::new(),
268 200.into(),
269 200.into(),
270 );
271 let time_event2 = TimeEvent::new(
272 Ustr::from("TEST_EVENT_2"),
273 UUID4::new(),
274 100.into(),
275 100.into(),
276 );
277
278 accumulator.heap.push(Reverse(TimeEventHandler::new(
279 time_event1,
280 callback.clone(),
281 )));
282 assert_eq!(accumulator.peek_next_time(), Some(200.into()));
283
284 accumulator
285 .heap
286 .push(Reverse(TimeEventHandler::new(time_event2, callback)));
287 assert_eq!(accumulator.peek_next_time(), Some(100.into()));
288 });
289 }
290
291 #[rstest]
292 fn test_drain_returns_in_order() {
293 Python::initialize();
294 Python::attach(|py| {
295 let py_list = PyList::empty(py);
296 let py_append = Py::from(py_list.getattr("append").unwrap());
297 let callback = TimeEventCallback::from(py_append.into_any());
298
299 let mut accumulator = TimeEventAccumulator::new();
300
301 for ts in [300u64, 100, 200] {
302 let event = TimeEvent::new(Ustr::from("TEST"), UUID4::new(), ts.into(), ts.into());
303 accumulator
304 .heap
305 .push(Reverse(TimeEventHandler::new(event, callback.clone())));
306 }
307
308 let handlers = accumulator.drain();
309
310 assert_eq!(handlers.len(), 3);
311 assert_eq!(handlers[0].event.ts_event.as_u64(), 100);
312 assert_eq!(handlers[1].event.ts_event.as_u64(), 200);
313 assert_eq!(handlers[2].event.ts_event.as_u64(), 300);
314 assert!(accumulator.is_empty());
315 });
316 }
317
318 #[rstest]
319 fn test_interleaved_push_pop_maintains_order() {
320 Python::initialize();
321 Python::attach(|py| {
322 let py_list = PyList::empty(py);
323 let py_append = Py::from(py_list.getattr("append").unwrap());
324 let callback = TimeEventCallback::from(py_append.into_any());
325
326 let mut accumulator = TimeEventAccumulator::new();
327 let mut popped_timestamps: Vec<u64> = Vec::new();
328
329 for ts in [100u64, 300] {
330 let event = TimeEvent::new(Ustr::from("TEST"), UUID4::new(), ts.into(), ts.into());
331 accumulator
332 .heap
333 .push(Reverse(TimeEventHandler::new(event, callback.clone())));
334 }
335
336 let handler = accumulator.pop_next_at_or_before(1000.into()).unwrap();
337 popped_timestamps.push(handler.event.ts_event.as_u64());
338
339 let event = TimeEvent::new(Ustr::from("NEW"), UUID4::new(), 150.into(), 150.into());
341 accumulator
342 .heap
343 .push(Reverse(TimeEventHandler::new(event, callback)));
344
345 while let Some(handler) = accumulator.pop_next_at_or_before(1000.into()) {
346 popped_timestamps.push(handler.event.ts_event.as_u64());
347 }
348
349 assert_eq!(popped_timestamps, vec![100, 150, 300]);
350 });
351 }
352
353 #[rstest]
354 fn test_same_timestamp_events() {
355 Python::initialize();
356 Python::attach(|py| {
357 let py_list = PyList::empty(py);
358 let py_append = Py::from(py_list.getattr("append").unwrap());
359 let callback = TimeEventCallback::from(py_append.into_any());
360
361 let mut accumulator = TimeEventAccumulator::new();
362
363 for i in 0..3 {
364 let event = TimeEvent::new(
365 Ustr::from(&format!("EVENT_{i}")),
366 UUID4::new(),
367 100.into(),
368 100.into(),
369 );
370 accumulator
371 .heap
372 .push(Reverse(TimeEventHandler::new(event, callback.clone())));
373 }
374
375 let mut count = 0;
376
377 while let Some(handler) = accumulator.pop_next_at_or_before(100.into()) {
378 assert_eq!(handler.event.ts_event.as_u64(), 100);
379 count += 1;
380 }
381 assert_eq!(count, 3);
382 });
383 }
384
385 #[rstest]
386 fn test_pop_at_exact_timestamp_boundary() {
387 Python::initialize();
388 Python::attach(|py| {
389 let py_list = PyList::empty(py);
390 let py_append = Py::from(py_list.getattr("append").unwrap());
391 let callback = TimeEventCallback::from(py_append.into_any());
392
393 let mut accumulator = TimeEventAccumulator::new();
394
395 let event = TimeEvent::new(Ustr::from("TEST"), UUID4::new(), 100.into(), 100.into());
396 accumulator
397 .heap
398 .push(Reverse(TimeEventHandler::new(event, callback)));
399
400 let handler = accumulator.pop_next_at_or_before(100.into());
401 assert!(handler.is_some());
402 assert_eq!(handler.unwrap().event.ts_event.as_u64(), 100);
403
404 let event2 = TimeEvent::new(Ustr::from("TEST2"), UUID4::new(), 200.into(), 200.into());
405 accumulator.heap.push(Reverse(TimeEventHandler::new(
406 event2,
407 TimeEventCallback::from(Py::from(py_list.getattr("append").unwrap()).into_any()),
408 )));
409
410 assert!(accumulator.pop_next_at_or_before(199.into()).is_none());
411 assert!(accumulator.pop_next_at_or_before(200.into()).is_some());
412 });
413 }
414}