Skip to main content

nautilus_common/live/
timer.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live timer implementation using Tokio for real-time scheduling.
17
18use std::{
19    num::NonZeroU64,
20    sync::{
21        Arc,
22        atomic::{self, AtomicU64},
23    },
24};
25
26#[cfg(feature = "python")]
27use nautilus_core::consts::NAUTILUS_PREFIX;
28use nautilus_core::{
29    UUID4, UnixNanos,
30    correctness::{FAILED, check_valid_string_utf8},
31    datetime::floor_to_nearest_microsecond,
32    time::get_atomic_clock_realtime,
33};
34#[cfg(feature = "python")]
35use pyo3::{Py, PyAny, Python};
36use tokio::{
37    task::JoinHandle,
38    time::{Duration, Instant},
39};
40use ustr::Ustr;
41
42use super::runtime::get_runtime;
43use crate::{
44    runner::TimeEventSender,
45    timer::{TimeEvent, TimeEventCallback, TimeEventHandler, Timer},
46};
47
48/// A live timer for use with a `LiveClock`.
49///
50/// `LiveTimer` triggers events at specified intervals in a real-time environment,
51/// using Tokio's async runtime to handle scheduling and execution.
52///
53/// # Threading
54///
55/// The timer runs on the runtime thread that created it and dispatches events across threads as needed.
56#[derive(Debug)]
57pub struct LiveTimer {
58    /// The name of the timer.
59    pub name: Ustr,
60    /// The start time of the timer in UNIX nanoseconds.
61    pub interval_ns: NonZeroU64,
62    /// The start time of the timer in UNIX nanoseconds.
63    pub start_time_ns: UnixNanos,
64    /// The optional stop time of the timer in UNIX nanoseconds.
65    pub stop_time_ns: Option<UnixNanos>,
66    /// If the timer should fire immediately at start time.
67    pub fire_immediately: bool,
68    next_time_ns: Arc<AtomicU64>,
69    callback: TimeEventCallback,
70    task_handle: Option<JoinHandle<()>>,
71    sender: Option<Arc<dyn TimeEventSender>>,
72}
73
74impl LiveTimer {
75    /// Creates a new [`LiveTimer`] instance.
76    ///
77    /// # Panics
78    ///
79    /// Panics if `name` is not a valid string.
80    #[must_use]
81    pub fn new(
82        name: Ustr,
83        interval_ns: NonZeroU64,
84        start_time_ns: UnixNanos,
85        stop_time_ns: Option<UnixNanos>,
86        callback: TimeEventCallback,
87        fire_immediately: bool,
88        sender: Option<Arc<dyn TimeEventSender>>,
89    ) -> Self {
90        check_valid_string_utf8(name, stringify!(name)).expect(FAILED);
91
92        let next_time_ns = if fire_immediately {
93            start_time_ns.as_u64()
94        } else {
95            start_time_ns.as_u64() + interval_ns.get()
96        };
97
98        log::debug!("Creating timer '{name}'");
99
100        Self {
101            name,
102            interval_ns,
103            start_time_ns,
104            stop_time_ns,
105            fire_immediately,
106            next_time_ns: Arc::new(AtomicU64::new(next_time_ns)),
107            callback,
108            task_handle: None,
109            sender,
110        }
111    }
112
113    /// Returns the next time in UNIX nanoseconds when the timer will fire.
114    ///
115    /// Provides the scheduled time for the next event based on the current state of the timer.
116    #[must_use]
117    pub fn next_time_ns(&self) -> UnixNanos {
118        UnixNanos::from(self.next_time_ns.load(atomic::Ordering::SeqCst))
119    }
120
121    /// Returns whether the timer is expired.
122    ///
123    /// An expired timer will not trigger any further events.
124    /// A timer that has not been started is not expired.
125    #[must_use]
126    pub fn is_expired(&self) -> bool {
127        self.task_handle
128            .as_ref()
129            .is_some_and(tokio::task::JoinHandle::is_finished)
130    }
131
132    /// Starts the timer.
133    ///
134    /// Time events will begin triggering at the specified intervals.
135    /// The generated events are handled by the provided callback function.
136    ///
137    /// # Panics
138    ///
139    /// Panics if using a Rust callback (`Rust` or `RustLocal`) without a `TimeEventSender`.
140    #[allow(unused_variables)]
141    pub fn start(&mut self) {
142        let event_name = self.name;
143        let stop_time_ns = self.stop_time_ns;
144        let interval_ns = self.interval_ns.get();
145        let callback = self.callback.clone();
146
147        // Get current time
148        let clock = get_atomic_clock_realtime();
149        let now_ns = clock.get_time_ns();
150
151        // Check if the timer's alert time is in the past and adjust if needed
152        let now_raw = now_ns.as_u64();
153        let mut observed_next = self.next_time_ns.load(atomic::Ordering::SeqCst);
154
155        if observed_next <= now_raw {
156            loop {
157                match self.next_time_ns.compare_exchange(
158                    observed_next,
159                    now_raw,
160                    atomic::Ordering::SeqCst,
161                    atomic::Ordering::SeqCst,
162                ) {
163                    Ok(_) => {
164                        if observed_next < now_raw {
165                            let original = UnixNanos::from(observed_next);
166                            log::warn!(
167                                "Timer '{event_name}' alert time {} was in the past, adjusted to current time for immediate fire",
168                                original.to_rfc3339(),
169                            );
170                        }
171                        observed_next = now_raw;
172                        break;
173                    }
174                    Err(actual) => {
175                        observed_next = actual;
176                        if observed_next > now_raw {
177                            break;
178                        }
179                    }
180                }
181            }
182        }
183
184        // Floor the next time to the nearest microsecond which is within the timers accuracy
185        let mut next_time_ns = UnixNanos::from(floor_to_nearest_microsecond(observed_next));
186        let next_time_atomic = self.next_time_ns.clone();
187        next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
188
189        let sender = self.sender.clone();
190
191        let rt = get_runtime();
192        let handle = rt.spawn(async move {
193            let clock = get_atomic_clock_realtime();
194
195            // 1-millisecond delay to account for the overhead of initializing a tokio timer
196            let overhead = Duration::from_millis(1);
197            let delay_ns = next_time_ns.saturating_sub(now_ns.as_u64());
198            let mut delay = Duration::from_nanos(delay_ns);
199
200            // Subtract the estimated startup overhead; saturating to zero for sub-ms delays
201            if delay > overhead {
202                delay -= overhead;
203            } else {
204                delay = Duration::from_nanos(0);
205            }
206
207            let start = Instant::now() + delay;
208
209            let mut timer = tokio::time::interval_at(start, Duration::from_nanos(interval_ns));
210
211            loop {
212                // `timer.tick` is cancellation safe, if the cancel branch completes
213                // first then no tick has been consumed (no event was ready).
214                timer.tick().await;
215                let now_ns = clock.get_time_ns();
216
217                let event = TimeEvent::new(event_name, UUID4::new(), next_time_ns, now_ns);
218
219                match callback {
220                    #[cfg(feature = "python")]
221                    TimeEventCallback::Python(ref callback) => {
222                        call_python_with_time_event(event, callback);
223                    }
224                    TimeEventCallback::Rust(_) | TimeEventCallback::RustLocal(_) => {
225                        debug_assert!(
226                            sender.is_some(),
227                            "LiveTimer with Rust callback requires TimeEventSender"
228                        );
229                        let sender = sender
230                            .as_ref()
231                            .expect("timer event sender was unset for Rust callback system");
232
233                        // TODO: This clone happens on a Tokio worker thread. For `RustLocal`
234                        // callbacks containing `Rc`, this violates thread safety (Rc::clone
235                        // is not thread-safe). The callback should be stored separately and
236                        // looked up by timer name on the receiving thread, rather than being
237                        // cloned here. This affects any code using RustLocal with LiveTimer.
238                        let handler = TimeEventHandler::new(event, callback.clone());
239                        sender.send(handler);
240                    }
241                }
242
243                // Prepare next time interval
244                next_time_ns += interval_ns;
245                next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
246
247                // Check if expired
248                if let Some(stop_time_ns) = stop_time_ns
249                    && std::cmp::max(next_time_ns, now_ns) >= stop_time_ns
250                {
251                    break; // Timer expired
252                }
253            }
254        });
255
256        self.task_handle = Some(handle);
257    }
258
259    /// Cancels the timer.
260    ///
261    /// The timer will not generate a final event.
262    pub fn cancel(&mut self) {
263        log::debug!("Cancel timer '{}'", self.name);
264
265        if let Some(ref handle) = self.task_handle {
266            handle.abort();
267        }
268    }
269}
270
271impl Timer for LiveTimer {
272    fn is_expired(&self) -> bool {
273        self.task_handle
274            .as_ref()
275            .is_some_and(tokio::task::JoinHandle::is_finished)
276    }
277
278    fn cancel(&mut self) {
279        Self::cancel(self);
280    }
281}
282
283#[cfg(feature = "python")]
284fn call_python_with_time_event(event: TimeEvent, callback: &Py<PyAny>) {
285    use nautilus_core::python::IntoPyObjectNautilusExt;
286    use pyo3::types::PyCapsule;
287
288    Python::attach(|py| {
289        // Create a new PyCapsule that owns `event` and registers a destructor so
290        // the contained `TimeEvent` is properly freed once the capsule is
291        // garbage-collected by Python. Without the destructor the memory would
292        // leak because the capsule would not know how to drop the Rust value.
293
294        // Register a destructor that simply drops the `TimeEvent` once the
295        // capsule is freed on the Python side.
296        let capsule: Py<PyAny> = PyCapsule::new_with_destructor(py, event, None, |_, _| {})
297            .expect("Error creating `PyCapsule`")
298            .into_py_any_unwrap(py);
299
300        match callback.call1(py, (capsule,)) {
301            Ok(_) => {}
302            Err(e) => eprintln!("{NAUTILUS_PREFIX} Error on callback: {e:?}"),
303        }
304    });
305}
306
307#[cfg(test)]
308mod tests {
309    use std::{num::NonZeroU64, sync::Arc};
310
311    use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime};
312    use rstest::*;
313    use ustr::Ustr;
314
315    use super::LiveTimer;
316    use crate::{
317        runner::TimeEventSender,
318        timer::{TimeEventCallback, TimeEventHandler},
319    };
320
321    #[rstest]
322    fn test_live_timer_fire_immediately_field() {
323        let timer = LiveTimer::new(
324            Ustr::from("TEST_TIMER"),
325            NonZeroU64::new(1000).unwrap(),
326            UnixNanos::from(100),
327            None,
328            TimeEventCallback::from(|_| {}),
329            true, // fire_immediately = true
330            None, // time_event_sender
331        );
332
333        // Verify the field is set correctly
334        assert!(timer.fire_immediately);
335
336        // With fire_immediately=true, next_time_ns should be start_time_ns
337        assert_eq!(timer.next_time_ns(), UnixNanos::from(100));
338    }
339
340    #[rstest]
341    fn test_live_timer_fire_immediately_false_field() {
342        let timer = LiveTimer::new(
343            Ustr::from("TEST_TIMER"),
344            NonZeroU64::new(1000).unwrap(),
345            UnixNanos::from(100),
346            None,
347            TimeEventCallback::from(|_| {}),
348            false, // fire_immediately = false
349            None,  // time_event_sender
350        );
351
352        // Verify the field is set correctly
353        assert!(!timer.fire_immediately);
354
355        // With fire_immediately=false, next_time_ns should be start_time_ns + interval
356        assert_eq!(timer.next_time_ns(), UnixNanos::from(1100));
357    }
358
359    #[rstest]
360    fn test_live_timer_adjusts_past_due_start_time() {
361        #[derive(Debug)]
362        struct NoopSender;
363
364        impl TimeEventSender for NoopSender {
365            fn send(&self, _handler: TimeEventHandler) {}
366        }
367
368        let sender = Arc::new(NoopSender);
369        let mut timer = LiveTimer::new(
370            Ustr::from("PAST_TIMER"),
371            NonZeroU64::new(1).unwrap(),
372            UnixNanos::from(0),
373            None,
374            TimeEventCallback::from(|_| {}),
375            true,
376            Some(sender),
377        );
378
379        let before = get_atomic_clock_realtime().get_time_ns();
380
381        timer.start();
382
383        assert!(timer.next_time_ns() >= before);
384
385        timer.cancel();
386    }
387}