1use std::{any::Any, collections::BTreeMap, fmt::Debug, ops::Deref, time::Duration};
19
20use ahash::AHashMap;
21use chrono::{DateTime, Utc};
22use nautilus_core::{
23 AtomicTime, UnixNanos,
24 correctness::{check_positive_u64, check_predicate_true, check_valid_string_utf8},
25 string::formatting::Separable,
26};
27use ustr::Ustr;
28
29use crate::timer::{
30 TestTimer, TimeEvent, TimeEventCallback, TimeEventHandler, Timer, create_valid_interval,
31};
32
33pub trait Clock: Debug + Any {
39 fn utc_now(&self) -> DateTime<Utc> {
41 DateTime::from_timestamp_nanos(self.timestamp_ns().as_i64())
42 }
43
44 fn timestamp_ns(&self) -> UnixNanos;
46
47 fn timestamp_us(&self) -> u64;
49
50 fn timestamp_ms(&self) -> u64;
52
53 fn timestamp(&self) -> f64;
55
56 fn timer_names(&self) -> Vec<&str>;
58
59 fn timer_count(&self) -> usize;
61
62 fn timer_exists(&self, name: &Ustr) -> bool;
64
65 fn register_default_handler(&mut self, callback: TimeEventCallback);
68
69 fn get_handler(&self, event: TimeEvent) -> TimeEventHandler;
73
74 fn set_time_alert(
88 &mut self,
89 name: &str,
90 alert_time: DateTime<Utc>,
91 callback: Option<TimeEventCallback>,
92 allow_past: Option<bool>,
93 ) -> anyhow::Result<()> {
94 self.set_time_alert_ns(name, alert_time.into(), callback, allow_past)
95 }
96
97 fn set_time_alert_ns(
118 &mut self,
119 name: &str,
120 alert_time_ns: UnixNanos,
121 callback: Option<TimeEventCallback>,
122 allow_past: Option<bool>,
123 ) -> anyhow::Result<()>;
124
125 #[expect(clippy::too_many_arguments)]
141 fn set_timer(
142 &mut self,
143 name: &str,
144 interval: Duration,
145 start_time: Option<DateTime<Utc>>,
146 stop_time: Option<DateTime<Utc>>,
147 callback: Option<TimeEventCallback>,
148 allow_past: Option<bool>,
149 fire_immediately: Option<bool>,
150 ) -> anyhow::Result<()> {
151 self.set_timer_ns(
152 name,
153 interval.as_nanos() as u64,
154 start_time.map(UnixNanos::from),
155 stop_time.map(UnixNanos::from),
156 callback,
157 allow_past,
158 fire_immediately,
159 )
160 }
161
162 #[expect(clippy::too_many_arguments)]
190 fn set_timer_ns(
191 &mut self,
192 name: &str,
193 interval_ns: u64,
194 start_time_ns: Option<UnixNanos>,
195 stop_time_ns: Option<UnixNanos>,
196 callback: Option<TimeEventCallback>,
197 allow_past: Option<bool>,
198 fire_immediately: Option<bool>,
199 ) -> anyhow::Result<()>;
200
201 fn next_time_ns(&self, name: &str) -> Option<UnixNanos>;
205
206 fn cancel_timer(&mut self, name: &str);
208
209 fn cancel_timers(&mut self);
211
212 fn reset(&mut self);
214}
215
216impl dyn Clock {
217 pub fn as_any(&self) -> &dyn std::any::Any {
219 self
220 }
221 pub fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
223 self
224 }
225}
226
227#[derive(Debug, Default)]
232pub struct CallbackRegistry {
233 default_callback: Option<TimeEventCallback>,
234 callbacks: AHashMap<Ustr, TimeEventCallback>,
235}
236
237impl CallbackRegistry {
238 #[must_use]
240 pub fn new() -> Self {
241 Self {
242 default_callback: None,
243 callbacks: AHashMap::new(),
244 }
245 }
246
247 pub fn register_default_handler(&mut self, callback: TimeEventCallback) {
249 self.default_callback = Some(callback);
250 }
251
252 pub fn register_callback(&mut self, name: Ustr, callback: TimeEventCallback) {
254 self.callbacks.insert(name, callback);
255 }
256
257 #[must_use]
259 pub fn has_any_callback(&self, name: &Ustr) -> bool {
260 self.callbacks.contains_key(name) || self.default_callback.is_some()
261 }
262
263 #[must_use]
265 pub fn get_callback(&self, name: &Ustr) -> Option<TimeEventCallback> {
266 self.callbacks
267 .get(name)
268 .cloned()
269 .or_else(|| self.default_callback.clone())
270 }
271
272 #[must_use]
278 pub fn get_handler(&self, event: TimeEvent) -> TimeEventHandler {
279 let callback = self
280 .get_callback(&event.name)
281 .unwrap_or_else(|| panic!("Event '{}' should have associated handler", event.name));
282
283 TimeEventHandler::new(event, callback)
284 }
285
286 pub fn clear(&mut self) {
288 self.callbacks.clear();
289 }
290}
291
292pub fn validate_and_prepare_time_alert(
300 name: &str,
301 mut alert_time_ns: UnixNanos,
302 allow_past: Option<bool>,
303 ts_now: UnixNanos,
304) -> anyhow::Result<(Ustr, UnixNanos)> {
305 check_valid_string_utf8(name, stringify!(name))?;
306
307 let name = Ustr::from(name);
308 let allow_past = allow_past.unwrap_or(true);
309
310 if alert_time_ns < ts_now {
311 if allow_past {
312 alert_time_ns = ts_now;
313 log::warn!(
314 "Timer '{name}' alert time {} was in the past, adjusted to current time for immediate firing",
315 alert_time_ns.to_rfc3339(),
316 );
317 } else {
318 anyhow::bail!(
319 "Timer '{name}' alert time {} was in the past (current time is {ts_now})",
320 alert_time_ns.to_rfc3339(),
321 );
322 }
323 }
324
325 Ok((name, alert_time_ns))
326}
327
328pub fn validate_and_prepare_timer(
337 name: &str,
338 interval_ns: u64,
339 start_time_ns: Option<UnixNanos>,
340 stop_time_ns: Option<UnixNanos>,
341 allow_past: Option<bool>,
342 fire_immediately: Option<bool>,
343 ts_now: UnixNanos,
344) -> anyhow::Result<(Ustr, UnixNanos, Option<UnixNanos>, bool, bool)> {
345 check_valid_string_utf8(name, stringify!(name))?;
346 check_positive_u64(interval_ns, stringify!(interval_ns))?;
347
348 let name = Ustr::from(name);
349 let allow_past = allow_past.unwrap_or(true);
350 let fire_immediately = fire_immediately.unwrap_or(false);
351
352 let mut start_time_ns = start_time_ns.unwrap_or_default();
353
354 if start_time_ns == 0 {
355 start_time_ns = ts_now;
357 } else if !allow_past {
358 let next_event_time = if fire_immediately {
359 start_time_ns
360 } else {
361 start_time_ns + interval_ns
362 };
363
364 if next_event_time < ts_now {
365 anyhow::bail!(
366 "Timer '{name}' next event time {} would be in the past (current time is {ts_now})",
367 next_event_time.to_rfc3339(),
368 );
369 }
370 }
371
372 if let Some(stop_time) = stop_time_ns {
373 if stop_time <= start_time_ns {
374 anyhow::bail!(
375 "Timer '{name}' stop time {} must be after start time {}",
376 stop_time.to_rfc3339(),
377 start_time_ns.to_rfc3339(),
378 );
379 }
380
381 if !allow_past && stop_time <= ts_now {
382 anyhow::bail!(
383 "Timer '{name}' stop time {} is in the past (current time is {ts_now})",
384 stop_time.to_rfc3339(),
385 );
386 }
387 }
388
389 Ok((
390 name,
391 start_time_ns,
392 stop_time_ns,
393 allow_past,
394 fire_immediately,
395 ))
396}
397
398#[derive(Debug)]
406pub struct TestClock {
407 time: AtomicTime,
408 timers: BTreeMap<Ustr, TestTimer>,
410 callbacks: CallbackRegistry,
411}
412
413impl TestClock {
414 #[must_use]
416 pub fn new() -> Self {
417 Self {
418 time: AtomicTime::new(false, UnixNanos::default()),
419 timers: BTreeMap::new(),
420 callbacks: CallbackRegistry::new(),
421 }
422 }
423
424 #[must_use]
426 pub const fn get_timers(&self) -> &BTreeMap<Ustr, TestTimer> {
427 &self.timers
428 }
429
430 pub fn advance_time(&mut self, to_time_ns: UnixNanos, set_time: bool) -> Vec<TimeEvent> {
447 const WARN_TIME_EVENTS_THRESHOLD: usize = 1_000_000;
448
449 let from_time_ns = self.time.get_time_ns();
450
451 assert!(
452 to_time_ns >= from_time_ns,
453 "Invariant: time must be non-decreasing, `to_time_ns` {to_time_ns} < `from_time_ns` {from_time_ns}"
454 );
455
456 if set_time {
457 self.time.set_time(to_time_ns);
458 }
459
460 let mut events: Vec<TimeEvent> = Vec::new();
462 self.timers.retain(|_, timer| {
463 timer.advance(to_time_ns).for_each(|event| {
464 events.push(event);
465 });
466
467 !timer.is_expired()
468 });
469
470 if events.len() >= WARN_TIME_EVENTS_THRESHOLD {
471 log::warn!(
472 "Allocated {} time events during clock advancement from {} to {}, \
473 consider stopping the timer between large time ranges with no data points",
474 events.len().separate_with_commas(),
475 from_time_ns,
476 to_time_ns
477 );
478 }
479
480 events.sort_by_key(|a| a.ts_event);
481 events
482 }
483
484 #[must_use]
494 pub fn match_handlers(&self, events: Vec<TimeEvent>) -> Vec<TimeEventHandler> {
495 events
496 .into_iter()
497 .map(|event| self.callbacks.get_handler(event))
498 .collect()
499 }
500
501 fn replace_existing_timer_if_needed(&mut self, name: &Ustr) {
502 replace_existing_timer(&mut self.timers, name);
503 }
504}
505
506impl Default for TestClock {
507 fn default() -> Self {
509 Self::new()
510 }
511}
512
513impl Deref for TestClock {
514 type Target = AtomicTime;
515
516 fn deref(&self) -> &Self::Target {
517 &self.time
518 }
519}
520
521impl Clock for TestClock {
522 fn timestamp_ns(&self) -> UnixNanos {
523 self.time.get_time_ns()
524 }
525
526 fn timestamp_us(&self) -> u64 {
527 self.time.get_time_us()
528 }
529
530 fn timestamp_ms(&self) -> u64 {
531 self.time.get_time_ms()
532 }
533
534 fn timestamp(&self) -> f64 {
535 self.time.get_time()
536 }
537
538 fn timer_names(&self) -> Vec<&str> {
539 self.timers
540 .iter()
541 .filter(|(_, timer)| !timer.is_expired())
542 .map(|(k, _)| k.as_str())
543 .collect()
544 }
545
546 fn timer_count(&self) -> usize {
547 self.timers
548 .iter()
549 .filter(|(_, timer)| !timer.is_expired())
550 .count()
551 }
552
553 fn timer_exists(&self, name: &Ustr) -> bool {
554 self.timers.contains_key(name)
555 }
556
557 fn register_default_handler(&mut self, callback: TimeEventCallback) {
558 self.callbacks.register_default_handler(callback);
559 }
560
561 fn get_handler(&self, event: TimeEvent) -> TimeEventHandler {
567 self.callbacks.get_handler(event)
568 }
569
570 fn set_time_alert_ns(
571 &mut self,
572 name: &str,
573 alert_time_ns: UnixNanos,
574 callback: Option<TimeEventCallback>,
575 allow_past: Option<bool>,
576 ) -> anyhow::Result<()> {
577 let ts_now = self.get_time_ns();
578 let (name, alert_time_ns) =
579 validate_and_prepare_time_alert(name, alert_time_ns, allow_past, ts_now)?;
580
581 self.replace_existing_timer_if_needed(&name);
582
583 check_predicate_true(
584 callback.is_some() | self.callbacks.has_any_callback(&name),
585 "No callbacks provided",
586 )?;
587
588 if let Some(callback) = callback {
589 self.callbacks.register_callback(name, callback);
590 }
591
592 let interval_ns = create_valid_interval((alert_time_ns - ts_now).into());
594 let fire_immediately = alert_time_ns == ts_now;
595
596 let timer = TestTimer::new(
597 name,
598 interval_ns,
599 ts_now,
600 Some(alert_time_ns),
601 fire_immediately,
602 );
603 self.timers.insert(name, timer);
604
605 Ok(())
606 }
607
608 fn set_timer_ns(
609 &mut self,
610 name: &str,
611 interval_ns: u64,
612 start_time_ns: Option<UnixNanos>,
613 stop_time_ns: Option<UnixNanos>,
614 callback: Option<TimeEventCallback>,
615 allow_past: Option<bool>,
616 fire_immediately: Option<bool>,
617 ) -> anyhow::Result<()> {
618 let ts_now = self.get_time_ns();
619 let (name, start_time_ns, stop_time_ns, _allow_past, fire_immediately) =
620 validate_and_prepare_timer(
621 name,
622 interval_ns,
623 start_time_ns,
624 stop_time_ns,
625 allow_past,
626 fire_immediately,
627 ts_now,
628 )?;
629
630 check_predicate_true(
631 callback.is_some() | self.callbacks.has_any_callback(&name),
632 "No callbacks provided",
633 )?;
634
635 self.replace_existing_timer_if_needed(&name);
636
637 if let Some(callback) = callback {
638 self.callbacks.register_callback(name, callback);
639 }
640
641 let interval_ns = create_valid_interval(interval_ns);
642
643 let timer = TestTimer::new(
644 name,
645 interval_ns,
646 start_time_ns,
647 stop_time_ns,
648 fire_immediately,
649 );
650 self.timers.insert(name, timer);
651
652 Ok(())
653 }
654
655 fn next_time_ns(&self, name: &str) -> Option<UnixNanos> {
656 self.timers
657 .get(&Ustr::from(name))
658 .map(|timer| timer.next_time_ns())
659 }
660
661 fn cancel_timer(&mut self, name: &str) {
662 let timer = self.timers.remove(&Ustr::from(name));
663 if let Some(mut timer) = timer {
664 timer.cancel();
665 }
666 }
667
668 fn cancel_timers(&mut self) {
669 for timer in &mut self.timers.values_mut() {
670 timer.cancel();
671 }
672
673 self.timers.clear();
674 }
675
676 fn reset(&mut self) {
677 self.time = AtomicTime::new(false, UnixNanos::default());
678 self.timers = BTreeMap::new();
679 self.callbacks.clear();
680 }
681}
682
683pub(crate) fn replace_existing_timer<T: Timer>(timers: &mut BTreeMap<Ustr, T>, name: &Ustr) {
684 let is_expired = timers.get(name).map(|t| t.is_expired());
685 match is_expired {
686 Some(true) => {
687 timers.remove(name);
688 }
689 Some(false) => {
690 if let Some(mut timer) = timers.remove(name) {
691 timer.cancel();
692 }
693 log::warn!("Timer '{name}' replaced");
694 }
695 None => {}
696 }
697}
698
699#[cfg(test)]
700mod tests {
701 use std::{
702 sync::{Arc, Mutex},
703 time::Duration,
704 };
705
706 use nautilus_core::{MUTEX_POISONED, UnixNanos};
707 use rstest::{fixture, rstest};
708 use ustr::Ustr;
709
710 use super::*;
711 use crate::timer::{TimeEvent, TimeEventCallback};
712
713 #[derive(Debug, Default)]
714 struct TestCallback {
715 called: Arc<Mutex<bool>>,
717 }
718
719 impl TestCallback {
720 fn new(called: Arc<Mutex<bool>>) -> Self {
721 Self { called }
722 }
723 }
724
725 impl From<TestCallback> for TimeEventCallback {
726 fn from(callback: TestCallback) -> Self {
727 Self::from(move |_event: TimeEvent| {
728 if let Ok(mut called) = callback.called.lock() {
729 *called = true;
730 }
731 })
732 }
733 }
734
735 #[fixture]
736 pub fn test_clock() -> TestClock {
737 let mut clock = TestClock::new();
738 clock.register_default_handler(TestCallback::default().into());
739 clock
740 }
741
742 #[rstest]
743 fn test_time_monotonicity(mut test_clock: TestClock) {
744 let initial_time = test_clock.timestamp_ns();
745 test_clock.advance_time(UnixNanos::from(*initial_time + 1000), true);
746 assert!(test_clock.timestamp_ns() > initial_time);
747 }
748
749 #[rstest]
750 fn test_timer_registration(mut test_clock: TestClock) {
751 test_clock
752 .set_time_alert_ns(
753 "test_timer",
754 (*test_clock.timestamp_ns() + 1000).into(),
755 None,
756 None,
757 )
758 .unwrap();
759 assert_eq!(test_clock.timer_count(), 1);
760 assert_eq!(test_clock.timer_names(), vec!["test_timer"]);
761 }
762
763 #[rstest]
764 fn test_timer_expiration(mut test_clock: TestClock) {
765 let alert_time = (*test_clock.timestamp_ns() + 1000).into();
766 test_clock
767 .set_time_alert_ns("test_timer", alert_time, None, None)
768 .unwrap();
769 let events = test_clock.advance_time(alert_time, true);
770 assert_eq!(events.len(), 1);
771 assert_eq!(events[0].name.as_str(), "test_timer");
772 }
773
774 #[rstest]
775 fn test_timer_cancellation(mut test_clock: TestClock) {
776 test_clock
777 .set_time_alert_ns(
778 "test_timer",
779 (*test_clock.timestamp_ns() + 1000).into(),
780 None,
781 None,
782 )
783 .unwrap();
784 assert_eq!(test_clock.timer_count(), 1);
785 test_clock.cancel_timer("test_timer");
786 assert_eq!(test_clock.timer_count(), 0);
787 }
788
789 #[rstest]
790 fn test_time_advancement(mut test_clock: TestClock) {
791 let start_time = test_clock.timestamp_ns();
792 test_clock
793 .set_timer_ns("test_timer", 1000, Some(start_time), None, None, None, None)
794 .unwrap();
795 let events = test_clock.advance_time(UnixNanos::from(*start_time + 2500), true);
796 assert_eq!(events.len(), 2);
797 assert_eq!(*events[0].ts_event, *start_time + 1000);
798 assert_eq!(*events[1].ts_event, *start_time + 2000);
799 }
800
801 #[rstest]
802 fn test_default_and_custom_callbacks() {
803 let mut clock = TestClock::new();
804 let default_called = Arc::new(Mutex::new(false));
805 let custom_called = Arc::new(Mutex::new(false));
806
807 let default_callback = TestCallback::new(Arc::clone(&default_called));
808 let custom_callback = TestCallback::new(Arc::clone(&custom_called));
809
810 clock.register_default_handler(TimeEventCallback::from(default_callback));
811 clock
812 .set_time_alert_ns(
813 "default_timer",
814 (*clock.timestamp_ns() + 1000).into(),
815 None,
816 None,
817 )
818 .unwrap();
819 clock
820 .set_time_alert_ns(
821 "custom_timer",
822 (*clock.timestamp_ns() + 1000).into(),
823 Some(TimeEventCallback::from(custom_callback)),
824 None,
825 )
826 .unwrap();
827
828 let events = clock.advance_time(UnixNanos::from(*clock.timestamp_ns() + 1000), true);
829 let handlers = clock.match_handlers(events);
830
831 for handler in handlers {
832 handler.callback.call(handler.event);
833 }
834
835 assert!(*default_called.lock().expect(MUTEX_POISONED));
836 assert!(*custom_called.lock().expect(MUTEX_POISONED));
837 }
838
839 #[rstest]
840 fn test_timer_with_rust_local_callback() {
841 use std::{cell::RefCell, rc::Rc};
842
843 let mut clock = TestClock::new();
844 let call_count = Rc::new(RefCell::new(0_u32));
845 let call_count_clone = Rc::clone(&call_count);
846
847 let callback: Rc<dyn Fn(TimeEvent)> = Rc::new(move |_event: TimeEvent| {
849 *call_count_clone.borrow_mut() += 1;
850 });
851
852 clock
853 .set_time_alert_ns(
854 "local_timer",
855 (*clock.timestamp_ns() + 1000).into(),
856 Some(TimeEventCallback::from(callback)),
857 None,
858 )
859 .unwrap();
860
861 let events = clock.advance_time(UnixNanos::from(*clock.timestamp_ns() + 1000), true);
862 let handlers = clock.match_handlers(events);
863
864 for handler in handlers {
865 handler.callback.call(handler.event);
866 }
867
868 assert_eq!(*call_count.borrow(), 1);
869 }
870
871 #[rstest]
872 fn test_multiple_timers(mut test_clock: TestClock) {
873 let start_time = test_clock.timestamp_ns();
874 test_clock
875 .set_timer_ns("timer1", 1000, Some(start_time), None, None, None, None)
876 .unwrap();
877 test_clock
878 .set_timer_ns("timer2", 2000, Some(start_time), None, None, None, None)
879 .unwrap();
880 let events = test_clock.advance_time(UnixNanos::from(*start_time + 2000), true);
881 assert_eq!(events.len(), 3);
882 assert_eq!(events[0].name.as_str(), "timer1");
883 assert_eq!(events[1].name.as_str(), "timer1");
884 assert_eq!(events[2].name.as_str(), "timer2");
885 }
886
887 #[rstest]
888 fn test_allow_past_parameter_true(mut test_clock: TestClock) {
889 test_clock.set_time(UnixNanos::from(2000));
890 let current_time = test_clock.timestamp_ns();
891 let past_time = UnixNanos::from(current_time.as_u64() - 1000);
892
893 test_clock
895 .set_time_alert_ns("past_timer", past_time, None, Some(true))
896 .unwrap();
897
898 assert_eq!(test_clock.timer_count(), 1);
900 assert_eq!(test_clock.timer_names(), vec!["past_timer"]);
901
902 let next_time = test_clock.next_time_ns("past_timer").unwrap();
904 assert!(next_time >= current_time);
905 }
906
907 #[rstest]
908 fn test_allow_past_parameter_false(mut test_clock: TestClock) {
909 test_clock.set_time(UnixNanos::from(2000));
910 let current_time = test_clock.timestamp_ns();
911 let past_time = current_time - 1000;
912
913 let result = test_clock.set_time_alert_ns("past_timer", past_time, None, Some(false));
915
916 assert!(result.is_err());
918 assert!(format!("{}", result.unwrap_err()).contains("was in the past"));
919
920 assert_eq!(test_clock.timer_count(), 0);
922 assert!(test_clock.timer_names().is_empty());
923 }
924
925 #[rstest]
926 fn test_invalid_stop_time_validation(mut test_clock: TestClock) {
927 test_clock.set_time(UnixNanos::from(2000));
928 let current_time = test_clock.timestamp_ns();
929 let start_time = current_time + 1000;
930 let stop_time = current_time + 500; let result = test_clock.set_timer_ns(
934 "invalid_timer",
935 100,
936 Some(start_time),
937 Some(stop_time),
938 None,
939 None,
940 None,
941 );
942
943 assert!(result.is_err());
945 assert!(format!("{}", result.unwrap_err()).contains("must be after start time"));
946
947 assert_eq!(test_clock.timer_count(), 0);
949 }
950
951 #[rstest]
952 fn test_set_timer_ns_fire_immediately_true(mut test_clock: TestClock) {
953 let start_time = test_clock.timestamp_ns();
954 let interval_ns = 1000;
955
956 test_clock
957 .set_timer_ns(
958 "fire_immediately_timer",
959 interval_ns,
960 Some(start_time),
961 None,
962 None,
963 None,
964 Some(true),
965 )
966 .unwrap();
967
968 let events = test_clock.advance_time(start_time + 2500, true);
970
971 assert_eq!(events.len(), 3);
973 assert_eq!(*events[0].ts_event, *start_time); assert_eq!(*events[1].ts_event, *start_time + 1000); assert_eq!(*events[2].ts_event, *start_time + 2000); }
977
978 #[rstest]
979 fn test_set_timer_ns_fire_immediately_false(mut test_clock: TestClock) {
980 let start_time = test_clock.timestamp_ns();
981 let interval_ns = 1000;
982
983 test_clock
984 .set_timer_ns(
985 "normal_timer",
986 interval_ns,
987 Some(start_time),
988 None,
989 None,
990 None,
991 Some(false),
992 )
993 .unwrap();
994
995 let events = test_clock.advance_time(start_time + 2500, true);
997
998 assert_eq!(events.len(), 2);
1000 assert_eq!(*events[0].ts_event, *start_time + 1000); assert_eq!(*events[1].ts_event, *start_time + 2000); }
1003
1004 #[rstest]
1005 fn test_set_timer_ns_fire_immediately_default_is_false(mut test_clock: TestClock) {
1006 let start_time = test_clock.timestamp_ns();
1007 let interval_ns = 1000;
1008
1009 test_clock
1011 .set_timer_ns(
1012 "default_timer",
1013 interval_ns,
1014 Some(start_time),
1015 None,
1016 None,
1017 None,
1018 None,
1019 )
1020 .unwrap();
1021
1022 let events = test_clock.advance_time(start_time + 1500, true);
1023
1024 assert_eq!(events.len(), 1);
1026 assert_eq!(*events[0].ts_event, *start_time + 1000); }
1028
1029 #[rstest]
1030 fn test_set_timer_ns_fire_immediately_with_zero_start_time(mut test_clock: TestClock) {
1031 test_clock.set_time(5000.into());
1032 let interval_ns = 1000;
1033
1034 test_clock
1035 .set_timer_ns(
1036 "zero_start_timer",
1037 interval_ns,
1038 None,
1039 None,
1040 None,
1041 None,
1042 Some(true),
1043 )
1044 .unwrap();
1045
1046 let events = test_clock.advance_time(UnixNanos::from(7000), true);
1047
1048 assert_eq!(events.len(), 3);
1051 assert_eq!(*events[0].ts_event, 5000); assert_eq!(*events[1].ts_event, 6000);
1053 assert_eq!(*events[2].ts_event, 7000);
1054 }
1055
1056 #[rstest]
1057 fn test_multiple_timers_different_fire_immediately_settings(mut test_clock: TestClock) {
1058 let start_time = test_clock.timestamp_ns();
1059 let interval_ns = 1000;
1060
1061 test_clock
1063 .set_timer_ns(
1064 "immediate_timer",
1065 interval_ns,
1066 Some(start_time),
1067 None,
1068 None,
1069 None,
1070 Some(true),
1071 )
1072 .unwrap();
1073
1074 test_clock
1076 .set_timer_ns(
1077 "normal_timer",
1078 interval_ns,
1079 Some(start_time),
1080 None,
1081 None,
1082 None,
1083 Some(false),
1084 )
1085 .unwrap();
1086
1087 let events = test_clock.advance_time(start_time + 1500, true);
1088
1089 assert_eq!(events.len(), 3);
1091
1092 let mut event_times: Vec<u64> = events.iter().map(|e| e.ts_event.as_u64()).collect();
1094 event_times.sort_unstable();
1095
1096 assert_eq!(event_times[0], start_time.as_u64()); assert_eq!(event_times[1], start_time.as_u64() + 1000); assert_eq!(event_times[2], start_time.as_u64() + 1000); }
1100
1101 #[rstest]
1102 fn test_timer_name_collision_overwrites(mut test_clock: TestClock) {
1103 let start_time = test_clock.timestamp_ns();
1104
1105 test_clock
1107 .set_timer_ns(
1108 "collision_timer",
1109 1000,
1110 Some(start_time),
1111 None,
1112 None,
1113 None,
1114 None,
1115 )
1116 .unwrap();
1117
1118 let result = test_clock.set_timer_ns(
1120 "collision_timer",
1121 2000,
1122 Some(start_time),
1123 None,
1124 None,
1125 None,
1126 None,
1127 );
1128
1129 assert!(result.is_ok());
1130 assert_eq!(test_clock.timer_count(), 1);
1132
1133 let next_time = test_clock.next_time_ns("collision_timer").unwrap();
1135 assert_eq!(next_time, start_time + 2000);
1137 }
1138
1139 #[rstest]
1140 fn test_timer_zero_interval_error(mut test_clock: TestClock) {
1141 let start_time = test_clock.timestamp_ns();
1142
1143 let result =
1145 test_clock.set_timer_ns("zero_interval", 0, Some(start_time), None, None, None, None);
1146
1147 assert!(result.is_err());
1148 assert_eq!(test_clock.timer_count(), 0);
1149 }
1150
1151 #[rstest]
1152 fn test_timer_empty_name_error(mut test_clock: TestClock) {
1153 let start_time = test_clock.timestamp_ns();
1154
1155 let result = test_clock.set_timer_ns("", 1000, Some(start_time), None, None, None, None);
1157
1158 assert!(result.is_err());
1159 assert_eq!(test_clock.timer_count(), 0);
1160 }
1161
1162 #[rstest]
1163 fn test_timer_exists(mut test_clock: TestClock) {
1164 let name = Ustr::from("exists_timer");
1165 assert!(!test_clock.timer_exists(&name));
1166
1167 test_clock
1168 .set_time_alert_ns(
1169 name.as_str(),
1170 (*test_clock.timestamp_ns() + 1_000).into(),
1171 None,
1172 None,
1173 )
1174 .unwrap();
1175
1176 assert!(test_clock.timer_exists(&name));
1177 }
1178
1179 #[rstest]
1180 fn test_timer_rejects_past_stop_time_when_not_allowed(mut test_clock: TestClock) {
1181 test_clock.set_time(UnixNanos::from(10_000));
1182 let current = test_clock.timestamp_ns();
1183
1184 let result = test_clock.set_timer_ns(
1185 "past_stop",
1186 10_000,
1187 Some(current - 500),
1188 Some(current - 100),
1189 None,
1190 Some(false),
1191 None,
1192 );
1193
1194 let err = result.expect_err("expected stop time validation error");
1195 let err_msg = err.to_string();
1196 assert!(err_msg.contains("stop time"));
1197 assert!(err_msg.contains("in the past"));
1198 }
1199
1200 #[rstest]
1201 fn test_timer_accepts_future_stop_time(mut test_clock: TestClock) {
1202 let current = test_clock.timestamp_ns();
1203
1204 let result = test_clock.set_timer_ns(
1205 "future_stop",
1206 1_000,
1207 Some(current),
1208 Some(current + 10_000),
1209 None,
1210 Some(false),
1211 None,
1212 );
1213
1214 assert!(result.is_ok());
1215 }
1216
1217 #[rstest]
1218 fn test_timer_fire_immediately_at_exact_stop_time(mut test_clock: TestClock) {
1219 let start_time = test_clock.timestamp_ns();
1220 let interval_ns = 1000;
1221 let stop_time = start_time + interval_ns; test_clock
1224 .set_timer_ns(
1225 "exact_stop",
1226 interval_ns,
1227 Some(start_time),
1228 Some(stop_time),
1229 None,
1230 None,
1231 Some(true),
1232 )
1233 .unwrap();
1234
1235 let events = test_clock.advance_time(stop_time, true);
1236
1237 assert_eq!(events.len(), 2);
1239 assert_eq!(*events[0].ts_event, *start_time); assert_eq!(*events[1].ts_event, *stop_time); }
1242
1243 #[rstest]
1244 fn test_timer_advance_to_exact_next_time(mut test_clock: TestClock) {
1245 let start_time = test_clock.timestamp_ns();
1246 let interval_ns = 1000;
1247
1248 test_clock
1249 .set_timer_ns(
1250 "exact_advance",
1251 interval_ns,
1252 Some(start_time),
1253 None,
1254 None,
1255 None,
1256 Some(false),
1257 )
1258 .unwrap();
1259
1260 let next_time = test_clock.next_time_ns("exact_advance").unwrap();
1262 let events = test_clock.advance_time(next_time, true);
1263
1264 assert_eq!(events.len(), 1);
1265 assert_eq!(*events[0].ts_event, *next_time);
1266 }
1267
1268 #[rstest]
1269 fn test_allow_past_bar_aggregation_use_case(mut test_clock: TestClock) {
1270 test_clock.set_time(UnixNanos::from(100_500)); let bar_start_time = UnixNanos::from(100_000); let interval_ns = 1000; let result = test_clock.set_timer_ns(
1280 "bar_timer",
1281 interval_ns,
1282 Some(bar_start_time),
1283 None,
1284 None,
1285 Some(false), Some(false), );
1288
1289 assert!(result.is_ok());
1291 assert_eq!(test_clock.timer_count(), 1);
1292
1293 let next_time = test_clock.next_time_ns("bar_timer").unwrap();
1295 assert_eq!(*next_time, 101_000);
1296 }
1297
1298 #[rstest]
1299 fn test_allow_past_false_rejects_when_next_event_in_past(mut test_clock: TestClock) {
1300 test_clock.set_time(UnixNanos::from(102_000)); let past_start_time = UnixNanos::from(100_000); let interval_ns = 1000; let result = test_clock.set_timer_ns(
1309 "past_event_timer",
1310 interval_ns,
1311 Some(past_start_time),
1312 None,
1313 None,
1314 Some(false), Some(false), );
1317
1318 assert!(result.is_err());
1320 assert!(
1321 result
1322 .unwrap_err()
1323 .to_string()
1324 .contains("would be in the past")
1325 );
1326 }
1327
1328 #[rstest]
1329 fn test_allow_past_false_with_fire_immediately_true(mut test_clock: TestClock) {
1330 test_clock.set_time(UnixNanos::from(100_500)); let past_start_time = UnixNanos::from(100_000); let interval_ns = 1000;
1334
1335 let result = test_clock.set_timer_ns(
1338 "immediate_past_timer",
1339 interval_ns,
1340 Some(past_start_time),
1341 None,
1342 None,
1343 Some(false), Some(true), );
1346
1347 assert!(result.is_err());
1349 assert!(
1350 result
1351 .unwrap_err()
1352 .to_string()
1353 .contains("would be in the past")
1354 );
1355 }
1356
1357 #[rstest]
1358 fn test_cancel_timer_during_execution(mut test_clock: TestClock) {
1359 let start_time = test_clock.timestamp_ns();
1360
1361 test_clock
1362 .set_timer_ns(
1363 "cancel_test",
1364 1000,
1365 Some(start_time),
1366 None,
1367 None,
1368 None,
1369 None,
1370 )
1371 .unwrap();
1372
1373 assert_eq!(test_clock.timer_count(), 1);
1374
1375 test_clock.cancel_timer("cancel_test");
1377
1378 assert_eq!(test_clock.timer_count(), 0);
1379
1380 let events = test_clock.advance_time(start_time + 2000, true);
1382 assert_eq!(events.len(), 0);
1383 }
1384
1385 #[rstest]
1386 fn test_cancel_all_timers(mut test_clock: TestClock) {
1387 test_clock
1389 .set_timer_ns("timer1", 1000, None, None, None, None, None)
1390 .unwrap();
1391 test_clock
1392 .set_timer_ns("timer2", 1500, None, None, None, None, None)
1393 .unwrap();
1394 test_clock
1395 .set_timer_ns("timer3", 2000, None, None, None, None, None)
1396 .unwrap();
1397
1398 assert_eq!(test_clock.timer_count(), 3);
1399
1400 test_clock.cancel_timers();
1402
1403 assert_eq!(test_clock.timer_count(), 0);
1404
1405 let events = test_clock.advance_time(UnixNanos::from(5000), true);
1407 assert_eq!(events.len(), 0);
1408 }
1409
1410 #[rstest]
1411 fn test_clock_reset_clears_timers(mut test_clock: TestClock) {
1412 test_clock
1413 .set_timer_ns("reset_test", 1000, None, None, None, None, None)
1414 .unwrap();
1415
1416 assert_eq!(test_clock.timer_count(), 1);
1417
1418 test_clock.reset();
1420
1421 assert_eq!(test_clock.timer_count(), 0);
1422 assert_eq!(test_clock.timestamp_ns(), UnixNanos::default()); }
1424
1425 #[rstest]
1426 fn test_set_time_alert_default_impl(mut test_clock: TestClock) {
1427 let current_time = test_clock.utc_now();
1428 let alert_time = current_time + chrono::Duration::seconds(1);
1429
1430 test_clock
1432 .set_time_alert("alert_test", alert_time, None, None)
1433 .unwrap();
1434
1435 assert_eq!(test_clock.timer_count(), 1);
1436 assert_eq!(test_clock.timer_names(), vec!["alert_test"]);
1437
1438 let expected_ns = UnixNanos::from(alert_time);
1440 let next_time = test_clock.next_time_ns("alert_test").unwrap();
1441
1442 let diff = if next_time >= expected_ns {
1444 next_time.as_u64() - expected_ns.as_u64()
1445 } else {
1446 expected_ns.as_u64() - next_time.as_u64()
1447 };
1448 assert!(
1449 diff < 1000,
1450 "Timer should be set within 1 microsecond of expected time"
1451 );
1452 }
1453
1454 #[rstest]
1455 fn test_set_timer_default_impl(mut test_clock: TestClock) {
1456 let current_time = test_clock.utc_now();
1457 let start_time = current_time + chrono::Duration::seconds(1);
1458 let interval = Duration::from_millis(500);
1459
1460 test_clock
1462 .set_timer(
1463 "timer_test",
1464 interval,
1465 Some(start_time),
1466 None,
1467 None,
1468 None,
1469 None,
1470 )
1471 .unwrap();
1472
1473 assert_eq!(test_clock.timer_count(), 1);
1474 assert_eq!(test_clock.timer_names(), vec!["timer_test"]);
1475
1476 let start_ns = UnixNanos::from(start_time);
1478 let interval_ns = interval.as_nanos() as u64;
1479
1480 let events = test_clock.advance_time(start_ns + interval_ns * 3, true);
1481 assert_eq!(events.len(), 3); assert_eq!(*events[0].ts_event, *start_ns + interval_ns);
1485 assert_eq!(*events[1].ts_event, *start_ns + interval_ns * 2);
1486 assert_eq!(*events[2].ts_event, *start_ns + interval_ns * 3);
1487 }
1488
1489 #[rstest]
1490 fn test_set_timer_with_stop_time_default_impl(mut test_clock: TestClock) {
1491 let current_time = test_clock.utc_now();
1492 let start_time = current_time + chrono::Duration::seconds(1);
1493 let stop_time = current_time + chrono::Duration::seconds(3);
1494 let interval = Duration::from_secs(1);
1495
1496 test_clock
1498 .set_timer(
1499 "timer_with_stop",
1500 interval,
1501 Some(start_time),
1502 Some(stop_time),
1503 None,
1504 None,
1505 None,
1506 )
1507 .unwrap();
1508
1509 assert_eq!(test_clock.timer_count(), 1);
1510
1511 let stop_ns = UnixNanos::from(stop_time);
1513 let events = test_clock.advance_time(stop_ns + 1000, true);
1514
1515 assert_eq!(events.len(), 2);
1517
1518 let start_ns = UnixNanos::from(start_time);
1519 let interval_ns = interval.as_nanos() as u64;
1520 assert_eq!(*events[0].ts_event, *start_ns + interval_ns);
1521 assert_eq!(*events[1].ts_event, *start_ns + interval_ns * 2);
1522 }
1523
1524 #[rstest]
1525 fn test_set_timer_fire_immediately_default_impl(mut test_clock: TestClock) {
1526 let current_time = test_clock.utc_now();
1527 let start_time = current_time + chrono::Duration::seconds(1);
1528 let interval = Duration::from_millis(500);
1529
1530 test_clock
1532 .set_timer(
1533 "immediate_timer",
1534 interval,
1535 Some(start_time),
1536 None,
1537 None,
1538 None,
1539 Some(true),
1540 )
1541 .unwrap();
1542
1543 let start_ns = UnixNanos::from(start_time);
1544 let interval_ns = interval.as_nanos() as u64;
1545
1546 let events = test_clock.advance_time(start_ns + interval_ns, true);
1548
1549 assert_eq!(events.len(), 2);
1551 assert_eq!(*events[0].ts_event, *start_ns); assert_eq!(*events[1].ts_event, *start_ns + interval_ns); }
1554
1555 #[rstest]
1556 fn test_set_time_alert_when_alert_time_equals_current_time(mut test_clock: TestClock) {
1557 let current_time = test_clock.timestamp_ns();
1558
1559 test_clock
1561 .set_time_alert_ns("alert_at_current_time", current_time, None, None)
1562 .unwrap();
1563
1564 assert_eq!(test_clock.timer_count(), 1);
1565
1566 let events = test_clock.advance_time(current_time, true);
1568
1569 assert_eq!(events.len(), 1);
1571 assert_eq!(events[0].name.as_str(), "alert_at_current_time");
1572 assert_eq!(*events[0].ts_event, *current_time);
1573 }
1574
1575 #[rstest]
1576 fn test_cancel_and_reschedule_same_name(mut test_clock: TestClock) {
1577 let start = test_clock.timestamp_ns();
1578
1579 test_clock
1580 .set_time_alert_ns("timer", UnixNanos::from(*start + 1000), None, None)
1581 .unwrap();
1582 assert_eq!(test_clock.timer_count(), 1);
1583
1584 test_clock.cancel_timer("timer");
1585 assert_eq!(test_clock.timer_count(), 0);
1586
1587 test_clock
1588 .set_time_alert_ns("timer", UnixNanos::from(*start + 2000), None, None)
1589 .unwrap();
1590 assert_eq!(test_clock.timer_count(), 1);
1591
1592 let events = test_clock.advance_time(UnixNanos::from(*start + 1500), true);
1593 assert!(events.is_empty());
1594
1595 let events = test_clock.advance_time(UnixNanos::from(*start + 2000), true);
1596 assert_eq!(events.len(), 1);
1597 assert_eq!(*events[0].ts_event, *start + 2000);
1598 }
1599
1600 #[rstest]
1601 fn test_multiple_timers_same_timestamp_all_fire(mut test_clock: TestClock) {
1602 let fire_time = UnixNanos::from(*test_clock.timestamp_ns() + 1000);
1603
1604 for i in 0..5 {
1605 test_clock
1606 .set_time_alert_ns(&format!("timer_{i}"), fire_time, None, None)
1607 .unwrap();
1608 }
1609 assert_eq!(test_clock.timer_count(), 5);
1610
1611 let events = test_clock.advance_time(fire_time, true);
1612 assert_eq!(events.len(), 5);
1613 for event in &events {
1614 assert_eq!(*event.ts_event, *fire_time);
1615 }
1616 }
1617
1618 #[rstest]
1619 fn test_events_ordered_by_timestamp_after_advance() {
1620 let mut clock = TestClock::new();
1621 clock.register_default_handler(TestCallback::default().into());
1622 let start = clock.timestamp_ns();
1623
1624 clock
1625 .set_time_alert_ns("third", UnixNanos::from(*start + 300), None, None)
1626 .unwrap();
1627 clock
1628 .set_time_alert_ns("first", UnixNanos::from(*start + 100), None, None)
1629 .unwrap();
1630 clock
1631 .set_time_alert_ns("second", UnixNanos::from(*start + 200), None, None)
1632 .unwrap();
1633
1634 let events = clock.advance_time(UnixNanos::from(*start + 400), true);
1635 assert_eq!(events.len(), 3);
1636 assert_eq!(events[0].name.as_str(), "first");
1637 assert_eq!(events[1].name.as_str(), "second");
1638 assert_eq!(events[2].name.as_str(), "third");
1639 }
1640
1641 #[rstest]
1642 fn test_large_interval_does_not_overflow(mut test_clock: TestClock) {
1643 let start = test_clock.timestamp_ns();
1644 let large_interval: u64 = 1_000_000_000 * 60 * 60 * 24 * 365; test_clock
1647 .set_timer_ns(
1648 "large_interval",
1649 large_interval,
1650 Some(start),
1651 None,
1652 None,
1653 None,
1654 None,
1655 )
1656 .unwrap();
1657
1658 let events = test_clock.advance_time(UnixNanos::from(*start + large_interval), true);
1659 assert_eq!(events.len(), 1);
1660 assert_eq!(*events[0].ts_event, *start + large_interval);
1661 }
1662
1663 #[rstest]
1664 fn test_near_zero_interval_fires_correctly(mut test_clock: TestClock) {
1665 let start = test_clock.timestamp_ns();
1666
1667 test_clock
1668 .set_timer_ns("tiny", 1, Some(start), None, None, None, None)
1669 .unwrap();
1670
1671 let events = test_clock.advance_time(UnixNanos::from(*start + 10), true);
1672 assert_eq!(events.len(), 10);
1673
1674 for i in 1..events.len() {
1675 assert!(events[i].ts_event >= events[i - 1].ts_event);
1676 }
1677 }
1678
1679 #[rstest]
1680 fn test_repeated_advance_to_same_time_no_double_fire(mut test_clock: TestClock) {
1681 let fire_time = UnixNanos::from(*test_clock.timestamp_ns() + 1000);
1682
1683 test_clock
1684 .set_time_alert_ns("once", fire_time, None, None)
1685 .unwrap();
1686
1687 let events1 = test_clock.advance_time(fire_time, true);
1688 assert_eq!(events1.len(), 1);
1689
1690 let events2 = test_clock.advance_time(fire_time, true);
1691 assert!(events2.is_empty());
1692 }
1693
1694 #[rstest]
1695 fn test_advance_with_no_timers(mut test_clock: TestClock) {
1696 let start = test_clock.timestamp_ns();
1697
1698 let events = test_clock.advance_time(UnixNanos::from(*start + 1000), true);
1699 assert!(events.is_empty());
1700 assert_eq!(*test_clock.timestamp_ns(), *start + 1000);
1701 }
1702}