1use std::{
19 num::NonZeroU64,
20 sync::{
21 Arc,
22 atomic::{self, AtomicU64},
23 },
24};
25
26#[cfg(feature = "python")]
27use nautilus_core::consts::NAUTILUS_PREFIX;
28use nautilus_core::{
29 UUID4, UnixNanos,
30 correctness::{FAILED, check_valid_string_utf8},
31 datetime::floor_to_nearest_microsecond,
32 time::get_atomic_clock_realtime,
33};
34#[cfg(feature = "python")]
35use pyo3::{Py, PyAny, Python};
36use tokio::{
37 task::JoinHandle,
38 time::{Duration, Instant},
39};
40use ustr::Ustr;
41
42use super::runtime::get_runtime;
43use crate::{
44 runner::TimeEventSender,
45 timer::{TimeEvent, TimeEventCallback, TimeEventHandler, Timer},
46};
47
48#[derive(Debug)]
57pub struct LiveTimer {
58 pub name: Ustr,
60 pub interval_ns: NonZeroU64,
62 pub start_time_ns: UnixNanos,
64 pub stop_time_ns: Option<UnixNanos>,
66 pub fire_immediately: bool,
68 next_time_ns: Arc<AtomicU64>,
69 callback: TimeEventCallback,
70 task_handle: Option<JoinHandle<()>>,
71 sender: Option<Arc<dyn TimeEventSender>>,
72}
73
74impl LiveTimer {
75 #[must_use]
81 pub fn new(
82 name: Ustr,
83 interval_ns: NonZeroU64,
84 start_time_ns: UnixNanos,
85 stop_time_ns: Option<UnixNanos>,
86 callback: TimeEventCallback,
87 fire_immediately: bool,
88 sender: Option<Arc<dyn TimeEventSender>>,
89 ) -> Self {
90 check_valid_string_utf8(name, stringify!(name)).expect(FAILED);
91
92 let next_time_ns = if fire_immediately {
93 start_time_ns.as_u64()
94 } else {
95 start_time_ns.as_u64() + interval_ns.get()
96 };
97
98 log::debug!("Creating timer '{name}'");
99
100 Self {
101 name,
102 interval_ns,
103 start_time_ns,
104 stop_time_ns,
105 fire_immediately,
106 next_time_ns: Arc::new(AtomicU64::new(next_time_ns)),
107 callback,
108 task_handle: None,
109 sender,
110 }
111 }
112
113 #[must_use]
117 pub fn next_time_ns(&self) -> UnixNanos {
118 UnixNanos::from(self.next_time_ns.load(atomic::Ordering::SeqCst))
119 }
120
121 #[must_use]
126 pub fn is_expired(&self) -> bool {
127 self.task_handle
128 .as_ref()
129 .is_some_and(tokio::task::JoinHandle::is_finished)
130 }
131
132 #[allow(unused_variables)]
141 pub fn start(&mut self) {
142 let event_name = self.name;
143 let stop_time_ns = self.stop_time_ns;
144 let interval_ns = self.interval_ns.get();
145 let callback = self.callback.clone();
146
147 let clock = get_atomic_clock_realtime();
149 let now_ns = clock.get_time_ns();
150
151 let now_raw = now_ns.as_u64();
153 let mut observed_next = self.next_time_ns.load(atomic::Ordering::SeqCst);
154
155 if observed_next <= now_raw {
156 loop {
157 match self.next_time_ns.compare_exchange(
158 observed_next,
159 now_raw,
160 atomic::Ordering::SeqCst,
161 atomic::Ordering::SeqCst,
162 ) {
163 Ok(_) => {
164 if observed_next < now_raw {
165 let original = UnixNanos::from(observed_next);
166 log::warn!(
167 "Timer '{event_name}' alert time {} was in the past, adjusted to current time for immediate fire",
168 original.to_rfc3339(),
169 );
170 }
171 observed_next = now_raw;
172 break;
173 }
174 Err(actual) => {
175 observed_next = actual;
176 if observed_next > now_raw {
177 break;
178 }
179 }
180 }
181 }
182 }
183
184 let mut next_time_ns = UnixNanos::from(floor_to_nearest_microsecond(observed_next));
186 let next_time_atomic = self.next_time_ns.clone();
187 next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
188
189 let sender = self.sender.clone();
190
191 let rt = get_runtime();
192 let handle = rt.spawn(async move {
193 let clock = get_atomic_clock_realtime();
194
195 let overhead = Duration::from_millis(1);
197 let delay_ns = next_time_ns.saturating_sub(now_ns.as_u64());
198 let mut delay = Duration::from_nanos(delay_ns);
199
200 if delay > overhead {
202 delay -= overhead;
203 } else {
204 delay = Duration::from_nanos(0);
205 }
206
207 let start = Instant::now() + delay;
208
209 let mut timer = tokio::time::interval_at(start, Duration::from_nanos(interval_ns));
210
211 loop {
212 timer.tick().await;
215 let now_ns = clock.get_time_ns();
216
217 let event = TimeEvent::new(event_name, UUID4::new(), next_time_ns, now_ns);
218
219 match callback {
220 #[cfg(feature = "python")]
221 TimeEventCallback::Python(ref callback) => {
222 call_python_with_time_event(event, callback);
223 }
224 TimeEventCallback::Rust(_) | TimeEventCallback::RustLocal(_) => {
225 debug_assert!(
226 sender.is_some(),
227 "LiveTimer with Rust callback requires TimeEventSender"
228 );
229 let sender = sender
230 .as_ref()
231 .expect("timer event sender was unset for Rust callback system");
232
233 let handler = TimeEventHandler::new(event, callback.clone());
239 sender.send(handler);
240 }
241 }
242
243 next_time_ns += interval_ns;
245 next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
246
247 if let Some(stop_time_ns) = stop_time_ns
249 && std::cmp::max(next_time_ns, now_ns) >= stop_time_ns
250 {
251 break; }
253 }
254 });
255
256 self.task_handle = Some(handle);
257 }
258
259 pub fn cancel(&mut self) {
263 log::debug!("Cancel timer '{}'", self.name);
264
265 if let Some(ref handle) = self.task_handle {
266 handle.abort();
267 }
268 }
269}
270
271impl Timer for LiveTimer {
272 fn is_expired(&self) -> bool {
273 self.task_handle
274 .as_ref()
275 .is_some_and(tokio::task::JoinHandle::is_finished)
276 }
277
278 fn cancel(&mut self) {
279 Self::cancel(self);
280 }
281}
282
/// Calls the Python `callback` with `event` wrapped in a `PyCapsule`.
///
/// An error raised by the callback is printed to stderr and otherwise ignored.
///
/// # Panics
///
/// Panics if the `PyCapsule` cannot be created.
#[cfg(feature = "python")]
fn call_python_with_time_event(event: TimeEvent, callback: &Py<PyAny>) {
    use nautilus_core::python::IntoPyObjectNautilusExt;
    use pyo3::types::PyCapsule;

    Python::attach(|py| {
        // The capsule takes `event` by value, is unnamed, and gets a no-op destructor closure
        let wrapped = PyCapsule::new_with_destructor(py, event, None, |_, _| {})
            .expect("Error creating `PyCapsule`");
        let capsule: Py<PyAny> = wrapped.into_py_any_unwrap(py);

        if let Err(e) = callback.call1(py, (capsule,)) {
            eprintln!("{NAUTILUS_PREFIX} Error on callback: {e:?}");
        }
    });
}
306
#[cfg(test)]
mod tests {
    use std::{num::NonZeroU64, sync::Arc};

    use nautilus_core::{
        UnixNanos, datetime::floor_to_nearest_microsecond, time::get_atomic_clock_realtime,
    };
    use rstest::*;
    use ustr::Ustr;

    use super::LiveTimer;
    use crate::{
        runner::TimeEventSender,
        timer::{TimeEventCallback, TimeEventHandler},
    };

    /// Builds an unstarted timer with a 1000 ns interval starting at t = 100 ns,
    /// without a stop time or an event sender.
    fn unstarted_timer(fire_immediately: bool) -> LiveTimer {
        LiveTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(1000).unwrap(),
            UnixNanos::from(100),
            None,
            TimeEventCallback::from(|_| {}),
            fire_immediately,
            None,
        )
    }

    #[rstest]
    fn test_live_timer_fire_immediately_field() {
        let timer = unstarted_timer(true);

        assert!(timer.fire_immediately);

        // With immediate fire the first event is scheduled at the start time itself
        assert_eq!(timer.next_time_ns(), UnixNanos::from(100));
    }

    #[rstest]
    fn test_live_timer_fire_immediately_false_field() {
        let timer = unstarted_timer(false);

        assert!(!timer.fire_immediately);

        // Without immediate fire the first event is scheduled at start time + interval
        assert_eq!(timer.next_time_ns(), UnixNanos::from(1100));
    }

    #[rstest]
    fn test_live_timer_adjusts_past_due_start_time() {
        #[derive(Debug)]
        struct NoopSender;

        impl TimeEventSender for NoopSender {
            fn send(&self, _handler: TimeEventHandler) {}
        }

        let sender = Arc::new(NoopSender);
        let mut timer = LiveTimer::new(
            Ustr::from("PAST_TIMER"),
            NonZeroU64::new(1).unwrap(),
            UnixNanos::from(0),
            None,
            TimeEventCallback::from(|_| {}),
            true,
            Some(sender),
        );

        let before = get_atomic_clock_realtime().get_time_ns();

        timer.start();

        // `start` floors the adjusted time to the microsecond, so it may land slightly
        // below an unfloored `before`; compare against the floored lower bound instead
        let earliest = UnixNanos::from(floor_to_nearest_microsecond(before.as_u64()));
        assert!(timer.next_time_ns() >= earliest);

        timer.cancel();
    }
}