Skip to main content

nautilus_core/
time.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! The core `AtomicTime` for real-time and static clocks.
17//!
18//! This module provides an atomic time abstraction that supports both real-time and static
19//! clocks. It ensures thread-safe operations and monotonic time retrieval with nanosecond precision.
20//!
21//! # Modes
22//!
23//! - **Real-time mode:** The clock continuously syncs with system wall-clock time (via
24//!   [`SystemTime::now()`]). To ensure strict monotonic increments across multiple threads,
25//!   the internal updates use an atomic compare-and-exchange loop (`time_since_epoch`).
26//!   While this guarantees that every new timestamp is at least one nanosecond greater than the
27//!   last, it may introduce higher contention if many threads call it heavily.
28//!
29//! - **Static mode:** The clock is manually controlled via [`AtomicTime::set_time`] or [`AtomicTime::increment_time`],
30//!   which can be useful for simulations or backtesting. You can switch modes at runtime using
31//!   [`AtomicTime::make_realtime`] or [`AtomicTime::make_static`]. In **static mode**, we use
32//!   acquire/release semantics so that updates from one thread can be observed by another;
33//!   however, we do not enforce strict global ordering for manual updates. If you need strong,
34//!   multi-threaded ordering in **static mode**, you must coordinate higher-level synchronization yourself.
35
36use std::{
37    ops::Deref,
38    sync::{
39        OnceLock,
40        atomic::{AtomicBool, AtomicU64, Ordering},
41    },
42    time::{Duration, SystemTime, UNIX_EPOCH},
43};
44
45use crate::{
46    UnixNanos,
47    datetime::{NANOSECONDS_IN_MICROSECOND, NANOSECONDS_IN_MILLISECOND, NANOSECONDS_IN_SECOND},
48};
49
/// Process-wide atomic clock intended for **real-time mode**.
///
/// The cell starts empty and is populated lazily by [`get_atomic_clock_realtime`], which
/// initializes it with `AtomicTime::default()` (a real-time clock). Prefer that accessor
/// over touching the cell directly.
///
/// Once initialized, the clock synchronizes with the system clock and hands out
/// strictly increasing timestamps across threads.
pub static ATOMIC_CLOCK_REALTIME: OnceLock<AtomicTime> = OnceLock::new();
55
/// Process-wide atomic clock intended for **static mode**.
///
/// The cell starts empty and is populated lazily by [`get_atomic_clock_static`], which
/// initializes it with a static clock whose value is `UnixNanos::default()`.
///
/// In **static mode** the time value only changes when it is set or incremented
/// manually, which is useful for backtesting or simulated time control.
pub static ATOMIC_CLOCK_STATIC: OnceLock<AtomicTime> = OnceLock::new();
61
62/// Returns a static reference to the global atomic clock in **real-time mode**.
63///
64/// This clock uses [`AtomicTime::time_since_epoch`] under the hood, ensuring strictly increasing
65/// timestamps across threads.
66pub fn get_atomic_clock_realtime() -> &'static AtomicTime {
67    ATOMIC_CLOCK_REALTIME.get_or_init(AtomicTime::default)
68}
69
70/// Returns a static reference to the global atomic clock in **static mode**.
71///
72/// This clock allows manual time control via [`AtomicTime::set_time`] or [`AtomicTime::increment_time`],
73/// and does not automatically sync with system time.
74pub fn get_atomic_clock_static() -> &'static AtomicTime {
75    ATOMIC_CLOCK_STATIC.get_or_init(|| AtomicTime::new(false, UnixNanos::default()))
76}
77
78/// Returns the duration since the UNIX epoch based on [`SystemTime::now()`].
79///
80/// # Panics
81///
82/// Panics if the system time is set before the UNIX epoch.
83#[inline(always)]
84#[must_use]
85pub fn duration_since_unix_epoch() -> Duration {
86    // The expect() is acceptable here because:
87    // - SystemTime failure indicates catastrophic system clock issues
88    // - This would affect the entire application's ability to function
89    // - Alternative error handling would complicate all time-dependent code paths
90    // - Such failures are extremely rare in practice and indicate hardware/OS problems
91    wall_clock_now()
92        .duration_since(UNIX_EPOCH)
93        .expect("Error calling `SystemTime`")
94}
95
/// Samples the wall clock and returns it as a [`SystemTime`].
///
/// This is the single seam through which the module reads wall-clock time:
///
/// - Normal builds return [`SystemTime::now()`].
/// - Simulation builds (`simulation` feature + `cfg(madsim)`) return the virtual wall-clock
///   time of the madsim deterministic scheduler when a madsim runtime is active. Outside a
///   runtime (e.g. plain `#[rstest]` test bodies) they fall back to [`SystemTime::now()`],
///   which under `cfg(madsim)` is libc-intercepted by madsim and resolves to the same real
///   syscall it would in a normal build.
///
/// The returned value keeps Unix-epoch semantics (unlike `tokio::time::Instant`, which is
/// monotonic and carries no epoch).
#[inline(always)]
#[must_use]
fn wall_clock_now() -> SystemTime {
    #[cfg(all(feature = "simulation", madsim))]
    {
        // `try_current` yields `None` when no madsim runtime is active. Using
        // `SystemTime::now()` in that case mirrors what madsim's own libc shim does for
        // `clock_gettime` outside a runtime. Production paths running under simulation
        // are always inside a runtime, so they keep receiving virtual time.
        madsim::time::TimeHandle::try_current()
            .map_or_else(SystemTime::now, |handle| handle.now_time())
    }
    #[cfg(not(all(feature = "simulation", madsim)))]
    {
        SystemTime::now()
    }
}
127
128/// Returns the current UNIX time in nanoseconds, based on [`SystemTime::now()`].
129///
130/// # Panics
131///
132/// Panics if the duration in nanoseconds exceeds `u64::MAX`.
133#[inline(always)]
134#[must_use]
135#[expect(
136    clippy::cast_possible_truncation,
137    reason = "value is guarded by the assert above (ns <= u64::MAX)"
138)]
139pub fn nanos_since_unix_epoch() -> u64 {
140    let ns = duration_since_unix_epoch().as_nanos();
141    assert!(
142        ns <= u128::from(u64::MAX),
143        "System time overflow: value exceeds u64::MAX nanoseconds"
144    );
145    ns as u64
146}
147
/// Thread-safe clock value that can run as a real-time clock or as a static clock.
///
/// [`AtomicTime`] keeps its timestamp in an [`AtomicU64`] and its mode in an [`AtomicBool`],
/// so every operation works through an immutable (`&self`) reference.
///
/// The `realtime` flag indicates which mode the clock is currently in.
/// Memory orderings used by the methods on this type:
/// - **Acquire/Release** for reading/writing the timestamp in **static mode**.
/// - **Compare-and-exchange (`AcqRel`)** in **real-time mode** to guarantee monotonic increments.
/// - **`SeqCst`** when the mode flag is switched.
///
/// The type also dereferences to its inner [`AtomicU64`] (see the `Deref` impl), which gives
/// direct access to the stored timestamp.
#[repr(C)]
#[derive(Debug)]
pub struct AtomicTime {
    /// Mode flag: `true` while the clock is in **real-time mode**, `false` in **static mode**.
    pub realtime: AtomicBool,
    /// The last recorded time (in UNIX nanoseconds). Advanced with compare-and-exchange
    /// in **real-time mode**; written with `store` / `fetch_update` in **static mode**.
    pub timestamp_ns: AtomicU64,
}
167
168impl Deref for AtomicTime {
169    type Target = AtomicU64;
170
171    fn deref(&self) -> &Self::Target {
172        &self.timestamp_ns
173    }
174}
175
176impl Default for AtomicTime {
177    /// Creates a new default [`AtomicTime`] instance in **real-time mode**, starting at the current system time.
178    fn default() -> Self {
179        Self::new(true, UnixNanos::default())
180    }
181}
182
183impl AtomicTime {
184    /// Creates a new [`AtomicTime`] instance.
185    ///
186    /// - If `realtime` is `true`, the provided `time` is used only as an initial placeholder
187    ///   and will quickly be overridden by calls to [`AtomicTime::time_since_epoch`].
188    /// - If `realtime` is `false`, this clock starts in **static mode**, with the given `time`
189    ///   as its current value.
190    #[must_use]
191    pub fn new(realtime: bool, time: UnixNanos) -> Self {
192        Self {
193            realtime: AtomicBool::new(realtime),
194            timestamp_ns: AtomicU64::new(time.into()),
195        }
196    }
197
198    /// Returns the current time in nanoseconds, based on the clock’s mode.
199    ///
200    /// - In **real-time mode**, calls [`AtomicTime::time_since_epoch`], ensuring strictly increasing
201    ///   timestamps across threads, using `AcqRel` semantics for the underlying atomic.
202    /// - In **static mode**, reads the stored time using [`Ordering::Acquire`]. Updates by other
203    ///   threads using [`AtomicTime::set_time`] or [`AtomicTime::increment_time`] (Release/AcqRel)
204    ///   will be visible here.
205    ///
206    /// # Thread Safety
207    ///
208    /// The mode check is not atomic with the subsequent read/update. If another thread
209    /// switches modes between the check and the operation, one stale-mode result may be
210    /// returned. This is intentional: mode switching is a setup-time operation and should
211    /// not occur concurrently with time operations.
212    #[must_use]
213    pub fn get_time_ns(&self) -> UnixNanos {
214        if self.realtime.load(Ordering::Acquire) {
215            self.time_since_epoch()
216        } else {
217            UnixNanos::from(self.timestamp_ns.load(Ordering::Acquire))
218        }
219    }
220
221    /// Returns the current time as microseconds.
222    #[must_use]
223    pub fn get_time_us(&self) -> u64 {
224        self.get_time_ns().as_u64() / NANOSECONDS_IN_MICROSECOND
225    }
226
227    /// Returns the current time as milliseconds.
228    #[must_use]
229    pub fn get_time_ms(&self) -> u64 {
230        self.get_time_ns().as_u64() / NANOSECONDS_IN_MILLISECOND
231    }
232
233    /// Returns the current time as seconds.
234    #[must_use]
235    #[expect(
236        clippy::cast_precision_loss,
237        reason = "Precision loss acceptable for time conversion"
238    )]
239    pub fn get_time(&self) -> f64 {
240        self.get_time_ns().as_f64() / (NANOSECONDS_IN_SECOND as f64)
241    }
242
243    /// Manually sets a new time for the clock (only possible in **static mode**).
244    ///
245    /// This uses an atomic store with [`Ordering::Release`], so any thread reading with
246    /// [`Ordering::Acquire`] will see the updated time. This does *not* enforce a total ordering
247    /// among all threads, but is enough to ensure that once a thread sees this update, it also
248    /// sees all writes made before this call in the writing thread.
249    ///
250    /// Typically used in single-threaded scenarios or coordinated concurrency in **static mode**,
251    /// since there's no global ordering across threads.
252    ///
253    /// # Panics
254    ///
255    /// Panics if invoked when in real-time mode.
256    ///
257    /// # Thread Safety
258    ///
259    /// The mode check is not atomic with the subsequent store. If another thread calls
260    /// `make_realtime()` between the check and store, the invariant can be violated.
261    /// This is intentional: mode switching is a setup-time operation and should not
262    /// occur concurrently with time operations. Callers must ensure mode switches are
263    /// complete before resuming time operations.
264    pub fn set_time(&self, time: UnixNanos) {
265        assert!(
266            !self.realtime.load(Ordering::SeqCst),
267            "Cannot set time while clock is in realtime mode"
268        );
269
270        self.store(time.into(), Ordering::Release);
271
272        debug_assert!(
273            !self.realtime.load(Ordering::SeqCst),
274            "Invariant: clock must remain in static mode across `set_time`"
275        );
276    }
277
278    /// Increments the current (static-mode) time by `delta` nanoseconds and returns the updated value.
279    ///
280    /// Internally this uses [`AtomicU64::fetch_update`] with [`Ordering::AcqRel`] to ensure the increment is
281    /// atomic and visible to readers using `Acquire` loads.
282    ///
283    /// # Errors
284    ///
285    /// Returns an error if the increment would overflow `u64::MAX` or if called
286    /// while the clock is in real-time mode.
287    ///
288    /// # Thread Safety
289    ///
290    /// The mode check is not atomic with the subsequent update. If another thread calls
291    /// `make_realtime()` between the check and update, the invariant can be violated.
292    /// This is intentional: mode switching is a setup-time operation and should not
293    /// occur concurrently with time operations. Callers must ensure mode switches are
294    /// complete before resuming time operations.
295    pub fn increment_time(&self, delta: u64) -> anyhow::Result<UnixNanos> {
296        anyhow::ensure!(
297            !self.realtime.load(Ordering::SeqCst),
298            "Cannot increment time while clock is in realtime mode"
299        );
300
301        let previous =
302            match self
303                .timestamp_ns
304                .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
305                    current.checked_add(delta)
306                }) {
307                Ok(prev) => prev,
308                Err(_) => anyhow::bail!("Cannot increment time beyond u64::MAX"),
309            };
310
311        debug_assert!(
312            !self.realtime.load(Ordering::SeqCst),
313            "Invariant: clock must remain in static mode across `increment_time`"
314        );
315
316        Ok(UnixNanos::from(previous + delta))
317    }
318
319    /// Retrieves and updates the current “real-time” clock, returning a strictly increasing
320    /// timestamp based on system time.
321    ///
322    /// Internally:
323    /// - We fetch `now` from [`SystemTime::now()`].
324    /// - We do an atomic compare-and-exchange (using [`Ordering::AcqRel`]) to ensure the stored
325    ///   timestamp is never less than the last timestamp.
326    ///
327    /// This ensures:
328    /// 1. **Monotonic increments**: The returned timestamp is strictly greater than the previous
329    ///    one (by at least 1 nanosecond).
330    /// 2. **No backward jumps**: If the OS time moves backward, we ignore that shift to preserve
331    ///    monotonicity.
332    /// 3. **Visibility**: In a multi-threaded environment, other threads see the updated value
333    ///    once this compare-and-exchange completes.
334    ///
335    /// # Panics
336    ///
337    /// Panics if the internal counter has reached `u64::MAX`, which would indicate the process has
338    /// been running for longer than the representable range (~584 years) *or* the clock was
339    /// manually corrupted.
340    pub fn time_since_epoch(&self) -> UnixNanos {
341        // This method guarantees strict consistency but may incur a performance cost under
342        // high contention due to retries in the `compare_exchange` loop.
343        let now = nanos_since_unix_epoch();
344
345        loop {
346            // Acquire to observe the latest stored value
347            let last = self.load(Ordering::Acquire);
348            // Ensure we never wrap past u64::MAX – treat that as a fatal error
349            let incremented = last
350                .checked_add(1)
351                .expect("AtomicTime overflow: reached u64::MAX");
352            let next = now.max(incremented);
353            // AcqRel on success ensures this new value is published,
354            // Acquire on failure reloads if we lost a CAS race.
355            //
356            // Note that under heavy contention (many threads calling this in tight loops),
357            // the CAS loop may increase latency.
358            //
359            // However, in practice, the loop terminates quickly because:
360            // - System time naturally advances between iterations
361            // - Each iteration increments time by at least 1ns, preventing ABA problems
362            // - True contention requiring retry is rare in normal usage patterns
363            //
364            // The concurrent stress test (4 threads × 100k iterations) validates this approach.
365            if self
366                .compare_exchange(last, next, Ordering::AcqRel, Ordering::Acquire)
367                .is_ok()
368            {
369                debug_assert!(
370                    next > last,
371                    "Invariant: time is strictly monotonic across CAS"
372                );
373                return UnixNanos::from(next);
374            }
375        }
376    }
377
378    /// Switches the clock to **real-time mode** (`realtime = true`).
379    ///
380    /// If transitioning from static mode, the internal counter is reset to the current
381    /// wall-clock time so that [`AtomicTime::time_since_epoch`] does not carry forward a
382    /// timestamp set during static mode (e.g. a backtest far in the future).
383    ///
384    /// Uses [`Ordering::SeqCst`] for the mode flag to ensure global ordering.
385    pub fn make_realtime(&self) {
386        if !self.realtime.swap(true, Ordering::SeqCst) {
387            self.timestamp_ns
388                .store(nanos_since_unix_epoch(), Ordering::Release);
389        }
390    }
391
392    /// Switches the clock to **static mode** (`realtime = false`).
393    ///
394    /// If transitioning from real-time mode, the internal counter is snapshotted to the
395    /// current wall-clock time so that subsequent static reads return a reasonable value
396    /// rather than a stale or zero placeholder.
397    ///
398    /// Uses [`Ordering::SeqCst`] for the mode flag to ensure global ordering.
399    pub fn make_static(&self) {
400        if self.realtime.swap(false, Ordering::SeqCst) {
401            self.timestamp_ns
402                .store(nanos_since_unix_epoch(), Ordering::Release);
403        }
404    }
405}
406
407#[cfg(test)]
408mod tests {
409    use std::sync::Arc;
410
411    use rstest::*;
412
413    use super::*;
414
415    #[rstest]
416    fn test_global_clocks_initialization() {
417        let realtime_clock = get_atomic_clock_realtime();
418        assert!(realtime_clock.get_time_ns().as_u64() > 0);
419
420        let static_clock = get_atomic_clock_static();
421        static_clock.set_time(UnixNanos::from(500_000_000)); // 500 ms
422        assert_eq!(static_clock.get_time_ns().as_u64(), 500_000_000);
423    }
424
425    #[rstest]
426    fn test_mode_switching() {
427        let time = AtomicTime::new(true, UnixNanos::default());
428
429        // Verify real-time mode
430        let realtime_ns = time.get_time_ns();
431        assert!(realtime_ns.as_u64() > 0);
432
433        // Switch to static mode
434        time.make_static();
435        time.set_time(UnixNanos::from(1_000_000_000)); // 1 second
436        let static_ns = time.get_time_ns();
437        assert_eq!(static_ns.as_u64(), 1_000_000_000);
438
439        // Switch back to real-time mode
440        time.make_realtime();
441        let new_realtime_ns = time.get_time_ns();
442        assert!(new_realtime_ns.as_u64() > static_ns.as_u64());
443    }
444
445    #[rstest]
446    #[should_panic(expected = "Cannot set time while clock is in realtime mode")]
447    fn test_set_time_panics_in_realtime_mode() {
448        let clock = AtomicTime::new(true, UnixNanos::default());
449        clock.set_time(UnixNanos::from(123));
450    }
451
452    #[rstest]
453    fn test_increment_time_returns_error_in_realtime_mode() {
454        let clock = AtomicTime::new(true, UnixNanos::default());
455        let result = clock.increment_time(1);
456        assert!(result.is_err());
457        assert!(
458            result
459                .unwrap_err()
460                .to_string()
461                .contains("Cannot increment time while clock is in realtime mode")
462        );
463    }
464
465    #[rstest]
466    #[should_panic(expected = "AtomicTime overflow")]
467    fn test_time_since_epoch_overflow_panics() {
468        use std::sync::atomic::{AtomicBool, AtomicU64};
469
470        // Manually construct a clock with the counter already at u64::MAX
471        let clock = AtomicTime {
472            realtime: AtomicBool::new(true),
473            timestamp_ns: AtomicU64::new(u64::MAX),
474        };
475
476        // This call will attempt to add 1 and must panic
477        let _ = clock.time_since_epoch();
478    }
479
480    #[rstest]
481    fn test_make_static_snapshots_wall_time() {
482        // A fresh realtime clock that has never been read starts with timestamp_ns = 0.
483        // Switching to static should snapshot wall time, not leave it at 0.
484        let clock = AtomicTime::new(true, UnixNanos::default());
485        clock.make_static();
486        let ts = clock.get_time_ns();
487        assert!(
488            ts.as_u64() > 1_650_000_000_000_000_000,
489            "Expected wall-clock snapshot, was {ts}"
490        );
491    }
492
493    #[rstest]
494    fn test_make_realtime_resets_future_timestamp() {
495        // If static mode set the clock into the future, switching to realtime
496        // should reset to wall time so timestamps are not poisoned.
497        let clock = AtomicTime::new(false, UnixNanos::from(u64::MAX - 1_000));
498        clock.make_realtime();
499        let ts = clock.get_time_ns();
500        // Should be near current wall time, not near u64::MAX
501        let now = nanos_since_unix_epoch();
502        assert!(
503            ts.as_u64() <= now + 1_000_000_000, // within 1 second
504            "Expected wall-clock time, was {ts} (now={now})"
505        );
506    }
507
508    #[rstest]
509    fn test_make_static_idempotent() {
510        // Calling make_static on an already-static clock should not change the time
511        let clock = AtomicTime::new(false, UnixNanos::from(42));
512        clock.make_static();
513        assert_eq!(clock.get_time_ns(), UnixNanos::from(42));
514    }
515
516    #[rstest]
517    fn test_make_realtime_idempotent() {
518        // Calling make_realtime on an already-realtime clock should not reset the counter
519        let clock = AtomicTime::new(true, UnixNanos::default());
520        let ts1 = clock.get_time_ns();
521        clock.make_realtime(); // already realtime, should be a no-op
522        let ts2 = clock.get_time_ns();
523        assert!(ts2 >= ts1);
524    }
525
526    #[rstest]
527    fn test_static_time_is_stable() {
528        // Create a clock in static mode with an initial value
529        let clock = AtomicTime::new(false, UnixNanos::from(42));
530        let time1 = clock.get_time_ns();
531
532        // Sleep a bit to give the system time to change, if the clock were using real-time
533        std::thread::sleep(std::time::Duration::from_millis(10));
534        let time2 = clock.get_time_ns();
535
536        // In static mode, the value should remain unchanged
537        assert_eq!(time1, time2);
538    }
539
540    #[rstest]
541    fn test_increment_time() {
542        // Start in static mode
543        let time = AtomicTime::new(false, UnixNanos::from(0));
544
545        let updated_time = time.increment_time(500).unwrap();
546        assert_eq!(updated_time.as_u64(), 500);
547
548        let updated_time = time.increment_time(1_000).unwrap();
549        assert_eq!(updated_time.as_u64(), 1_500);
550    }
551
552    #[rstest]
553    fn test_increment_time_overflow_errors() {
554        let time = AtomicTime::new(false, UnixNanos::from(u64::MAX - 5));
555
556        let err = time.increment_time(10).unwrap_err();
557        assert_eq!(err.to_string(), "Cannot increment time beyond u64::MAX");
558    }
559
560    #[rstest]
561    #[expect(
562        clippy::cast_possible_truncation,
563        clippy::cast_possible_wrap,
564        reason = "Intentional cast for Python interop"
565    )]
566    fn test_nanos_since_unix_epoch_vs_system_time() {
567        let unix_nanos = nanos_since_unix_epoch();
568        let system_ns = duration_since_unix_epoch().as_nanos() as u64;
569        assert!((unix_nanos as i64 - system_ns as i64).abs() < NANOSECONDS_IN_SECOND as i64);
570    }
571
572    #[rstest]
573    fn test_time_since_epoch_monotonicity() {
574        let clock = get_atomic_clock_realtime();
575        let mut previous = clock.time_since_epoch();
576        for _ in 0..1_000_000 {
577            let current = clock.time_since_epoch();
578            assert!(current > previous);
579            previous = current;
580        }
581    }
582
583    #[rstest]
584    fn test_time_since_epoch_strictly_increasing_concurrent() {
585        let time = Arc::new(AtomicTime::new(true, UnixNanos::default()));
586        let num_threads = 4;
587        let iterations = 100_000;
588        let mut handles = Vec::with_capacity(num_threads);
589
590        for thread_id in 0..num_threads {
591            let time_clone = Arc::clone(&time);
592
593            let handle = std::thread::spawn(move || {
594                let mut previous = time_clone.time_since_epoch().as_u64();
595
596                for i in 0..iterations {
597                    let current = time_clone.time_since_epoch().as_u64();
598                    assert!(
599                        current > previous,
600                        "Thread {thread_id}: iteration {i}: time did not increase: previous={previous}, current={current}",
601                    );
602                    previous = current;
603                }
604            });
605
606            handles.push(handle);
607        }
608
609        for handle in handles {
610            handle.join().unwrap();
611        }
612    }
613
614    #[rstest]
615    fn test_duration_since_unix_epoch() {
616        let time = AtomicTime::new(true, UnixNanos::default());
617        let duration = Duration::from_nanos(time.get_time_ns().into());
618        let now = SystemTime::now();
619
620        // Check if the duration is close to the actual difference between now and UNIX_EPOCH
621        let delta = now
622            .duration_since(UNIX_EPOCH)
623            .unwrap()
624            .checked_sub(duration);
625        assert!(delta.unwrap_or_default() < Duration::from_millis(100));
626
627        // Check if the duration is greater than a certain value (assuming the test is run after that point)
628        assert!(duration > Duration::from_mins(27_500_000));
629    }
630
631    #[rstest]
632    fn test_unix_timestamp_is_monotonic_increasing() {
633        let time = AtomicTime::new(true, UnixNanos::default());
634        let result1 = time.get_time();
635        let result2 = time.get_time();
636        let result3 = time.get_time();
637        let result4 = time.get_time();
638        let result5 = time.get_time();
639
640        assert!(result2 >= result1);
641        assert!(result3 >= result2);
642        assert!(result4 >= result3);
643        assert!(result5 >= result4);
644        assert!(result1 > 1_650_000_000.0);
645    }
646
647    #[rstest]
648    fn test_unix_timestamp_ms_is_monotonic_increasing() {
649        let time = AtomicTime::new(true, UnixNanos::default());
650        let result1 = time.get_time_ms();
651        let result2 = time.get_time_ms();
652        let result3 = time.get_time_ms();
653        let result4 = time.get_time_ms();
654        let result5 = time.get_time_ms();
655
656        assert!(result2 >= result1);
657        assert!(result3 >= result2);
658        assert!(result4 >= result3);
659        assert!(result5 >= result4);
660        assert!(result1 >= 1_650_000_000_000);
661    }
662
663    #[rstest]
664    fn test_unix_timestamp_us_is_monotonic_increasing() {
665        let time = AtomicTime::new(true, UnixNanos::default());
666        let result1 = time.get_time_us();
667        let result2 = time.get_time_us();
668        let result3 = time.get_time_us();
669        let result4 = time.get_time_us();
670        let result5 = time.get_time_us();
671
672        assert!(result2 >= result1);
673        assert!(result3 >= result2);
674        assert!(result4 >= result3);
675        assert!(result5 >= result4);
676        assert!(result1 > 1_650_000_000_000_000);
677    }
678
679    #[rstest]
680    fn test_unix_timestamp_ns_is_monotonic_increasing() {
681        let time = AtomicTime::new(true, UnixNanos::default());
682        let result1 = time.get_time_ns();
683        let result2 = time.get_time_ns();
684        let result3 = time.get_time_ns();
685        let result4 = time.get_time_ns();
686        let result5 = time.get_time_ns();
687
688        assert!(result2 >= result1);
689        assert!(result3 >= result2);
690        assert!(result4 >= result3);
691        assert!(result5 >= result4);
692        assert!(result1.as_u64() > 1_650_000_000_000_000_000);
693    }
694
695    #[rstest]
696    fn test_acquire_release_contract_static_mode() {
697        // This test explicitly proves the Acquire/Release memory ordering contract:
698        // - Writer thread uses set_time() which does Release store (see AtomicTime::set_time)
699        // - Reader thread uses get_time_ns() which does Acquire load (see AtomicTime::get_time_ns)
700        // - The Release-Acquire pair ensures all writes before Release are visible after Acquire
701
702        let clock = Arc::new(AtomicTime::new(false, UnixNanos::from(0)));
703        let aux_data = Arc::new(AtomicU64::new(0));
704        let done = Arc::new(AtomicBool::new(false));
705
706        // Writer thread: updates auxiliary data, then releases via set_time
707        let writer_clock = Arc::clone(&clock);
708        let writer_aux = Arc::clone(&aux_data);
709        let writer_done = Arc::clone(&done);
710
711        let writer = std::thread::spawn(move || {
712            for i in 1..=1_000u64 {
713                writer_aux.store(i, Ordering::Relaxed);
714
715                // Release store via set_time creates a release fence - all prior writes (including aux_data)
716                // must be visible to any thread that observes this time value via Acquire load
717                writer_clock.set_time(UnixNanos::from(i * 1000));
718
719                // Yield to encourage interleaving
720                std::thread::yield_now();
721            }
722            writer_done.store(true, Ordering::Release);
723        });
724
725        // Reader thread: acquires via get_time_ns, then checks auxiliary data
726        let reader_clock = Arc::clone(&clock);
727        let reader_aux = Arc::clone(&aux_data);
728        let reader_done = Arc::clone(&done);
729
730        let reader = std::thread::spawn(move || {
731            let mut last_time = 0u64;
732            let mut max_aux_seen = 0u64;
733
734            // Poll until writer is done, with no iteration limit
735            while !reader_done.load(Ordering::Acquire) {
736                let current_time = reader_clock.get_time_ns().as_u64();
737
738                if current_time > last_time {
739                    // The Acquire in get_time_ns synchronizes with the Release in set_time,
740                    // making aux_data visible
741                    let aux_value = reader_aux.load(Ordering::Relaxed);
742
743                    // Invariant: aux_value must never go backwards (proves Release-Acquire sync works)
744                    if aux_value > 0 {
745                        assert!(
746                            aux_value >= max_aux_seen,
747                            "Acquire/Release contract violated: aux went backwards from {max_aux_seen} to {aux_value}"
748                        );
749                        max_aux_seen = aux_value;
750                    }
751
752                    last_time = current_time;
753                }
754
755                std::thread::yield_now();
756            }
757
758            // Check final state after writer completes to ensure we observe updates
759            let final_time = reader_clock.get_time_ns().as_u64();
760            if final_time > last_time {
761                let final_aux = reader_aux.load(Ordering::Relaxed);
762                if final_aux > 0 {
763                    assert!(
764                        final_aux >= max_aux_seen,
765                        "Acquire/Release contract violated: final aux {final_aux} < max {max_aux_seen}"
766                    );
767                    max_aux_seen = final_aux;
768                }
769            }
770
771            max_aux_seen
772        });
773
774        writer.join().unwrap();
775        let max_observed = reader.join().unwrap();
776
777        // Ensure the reader actually observed updates (not vacuously satisfied)
778        assert!(max_observed > 0, "Reader must observe writer updates");
779    }
780
781    #[rstest]
782    fn test_acquire_release_contract_increment_time() {
783        // Similar test for increment_time, which uses fetch_update with AcqRel (see AtomicTime::increment_time)
784
785        let clock = Arc::new(AtomicTime::new(false, UnixNanos::from(0)));
786        let aux_data = Arc::new(AtomicU64::new(0));
787        let done = Arc::new(AtomicBool::new(false));
788
789        let writer_clock = Arc::clone(&clock);
790        let writer_aux = Arc::clone(&aux_data);
791        let writer_done = Arc::clone(&done);
792
793        let writer = std::thread::spawn(move || {
794            for i in 1..=1_000u64 {
795                writer_aux.store(i, Ordering::Relaxed);
796                let _ = writer_clock.increment_time(1000).unwrap();
797                std::thread::yield_now();
798            }
799            writer_done.store(true, Ordering::Release);
800        });
801
802        let reader_clock = Arc::clone(&clock);
803        let reader_aux = Arc::clone(&aux_data);
804        let reader_done = Arc::clone(&done);
805
806        let reader = std::thread::spawn(move || {
807            let mut last_time = 0u64;
808            let mut max_aux = 0u64;
809
810            // Poll until writer is done, with no iteration limit
811            while !reader_done.load(Ordering::Acquire) {
812                let current_time = reader_clock.get_time_ns().as_u64();
813
814                if current_time > last_time {
815                    let aux_value = reader_aux.load(Ordering::Relaxed);
816
817                    // Invariant: aux_value must never regress (proves AcqRel sync works)
818                    if aux_value > 0 {
819                        assert!(
820                            aux_value >= max_aux,
821                            "AcqRel contract violated: aux regressed from {max_aux} to {aux_value}"
822                        );
823                        max_aux = aux_value;
824                    }
825
826                    last_time = current_time;
827                }
828
829                std::thread::yield_now();
830            }
831
832            // Check final state after writer completes to ensure we observe updates
833            let final_time = reader_clock.get_time_ns().as_u64();
834            if final_time > last_time {
835                let final_aux = reader_aux.load(Ordering::Relaxed);
836                if final_aux > 0 {
837                    assert!(
838                        final_aux >= max_aux,
839                        "AcqRel contract violated: final aux {final_aux} < max {max_aux}"
840                    );
841                    max_aux = final_aux;
842                }
843            }
844
845            max_aux
846        });
847
848        writer.join().unwrap();
849        let max_observed = reader.join().unwrap();
850
851        // Ensure the reader actually observed updates (not vacuously satisfied)
852        assert!(max_observed > 0, "Reader must observe writer updates");
853    }
854
855    // The wall-clock seam (`wall_clock_now`) routes through madsim's virtual
856    // clock under simulation. Sleeping for 60 virtual seconds must advance
857    // the value returned by `nanos_since_unix_epoch` by 60s in wall-clock
858    // terms. If the cfg gate fell through to `SystemTime::now()`, the elapsed
859    // value would only reflect real wall-clock time (~0ms) and the assertion
860    // would fail.
861    #[cfg(all(feature = "simulation", madsim))]
862    #[madsim::test]
863    async fn test_wall_clock_advances_with_virtual_time() {
864        let before = nanos_since_unix_epoch();
865        madsim::time::sleep(std::time::Duration::from_secs(60)).await;
866        let after = nanos_since_unix_epoch();
867
868        let elapsed_ns = after.saturating_sub(before);
869        let sixty_seconds_ns = 60 * NANOSECONDS_IN_SECOND;
870        assert!(
871            elapsed_ns >= sixty_seconds_ns,
872            "wall clock did not advance by full virtual sleep: elapsed={elapsed_ns}ns"
873        );
874    }
875}