nautilus_core/time.rs
// -------------------------------------------------------------------------------------------------
// Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
// https://nautechsystems.io
//
// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// -------------------------------------------------------------------------------------------------

//! The core `AtomicTime` for real-time and static clocks.
//!
//! This module provides an atomic time abstraction that supports both real-time and static
//! clocks. It ensures thread-safe operations and monotonic time retrieval with nanosecond precision.
//!
//! # Modes
//!
//! - **Real-time mode:** The clock continuously syncs with system wall-clock time (via
//! [`SystemTime::now()`]). To ensure strict monotonic increments across multiple threads,
//! the internal updates use an atomic compare-and-exchange loop (`time_since_epoch`).
//! While this guarantees that every new timestamp is at least one nanosecond greater than the
//! last, it may introduce higher contention if many threads call it heavily.
//!
//! - **Static mode:** The clock is manually controlled via [`AtomicTime::set_time`] or [`AtomicTime::increment_time`],
//! which can be useful for simulations or backtesting. You can switch modes at runtime using
//! [`AtomicTime::make_realtime`] or [`AtomicTime::make_static`]. In **static mode**, we use
//! acquire/release semantics so that updates from one thread can be observed by another;
//! however, we do not enforce strict global ordering for manual updates. If you need strong,
//! multi-threaded ordering in **static mode**, you must coordinate higher-level synchronization yourself.
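//!
//! # Examples
//!
//! A minimal sketch of both modes (import paths assume the `nautilus_core` crate-root
//! re-export of [`UnixNanos`] and the public `time` module):
//!
//! ```
//! use nautilus_core::{UnixNanos, time::AtomicTime};
//!
//! // Static mode: time only moves when told to
//! let clock = AtomicTime::new(false, UnixNanos::from(0));
//! clock.set_time(UnixNanos::from(1_000));
//! assert_eq!(clock.get_time_ns().as_u64(), 1_000);
//!
//! // Real-time mode: strictly increasing wall-clock timestamps
//! clock.make_realtime();
//! assert!(clock.get_time_ns().as_u64() > 1_000);
//! ```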

use std::{
    ops::Deref,
    sync::{
        OnceLock,
        atomic::{AtomicBool, AtomicU64, Ordering},
    },
    time::{Duration, SystemTime, UNIX_EPOCH},
};

use crate::{
    UnixNanos,
    datetime::{NANOSECONDS_IN_MICROSECOND, NANOSECONDS_IN_MILLISECOND, NANOSECONDS_IN_SECOND},
};

/// Global atomic time in **real-time mode** for use across the system.
///
/// This clock operates in **real-time mode**, synchronizing with the system clock.
/// It provides globally unique, strictly increasing timestamps across threads.
pub static ATOMIC_CLOCK_REALTIME: OnceLock<AtomicTime> = OnceLock::new();

/// Global atomic time in **static mode** for use across the system.
///
/// This clock operates in **static mode**, where the time value can be set or incremented
/// manually. Useful for backtesting or simulated time control.
pub static ATOMIC_CLOCK_STATIC: OnceLock<AtomicTime> = OnceLock::new();

/// Returns a static reference to the global atomic clock in **real-time mode**.
///
/// This clock uses [`AtomicTime::time_since_epoch`] under the hood, ensuring strictly increasing
/// timestamps across threads.
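///
/// # Examples
///
/// Illustrative sketch (module path assumed to be `nautilus_core::time`):
///
/// ```
/// use nautilus_core::time::get_atomic_clock_realtime;
///
/// let clock = get_atomic_clock_realtime();
/// let t1 = clock.get_time_ns();
/// let t2 = clock.get_time_ns();
/// assert!(t2 > t1); // strictly increasing across calls
/// ```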
pub fn get_atomic_clock_realtime() -> &'static AtomicTime {
    ATOMIC_CLOCK_REALTIME.get_or_init(AtomicTime::default)
}

/// Returns a static reference to the global atomic clock in **static mode**.
///
/// This clock allows manual time control via [`AtomicTime::set_time`] or [`AtomicTime::increment_time`],
/// and does not automatically sync with system time.
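///
/// # Examples
///
/// Illustrative sketch (paths assume the `nautilus_core` crate root re-exports [`UnixNanos`]):
///
/// ```
/// use nautilus_core::{UnixNanos, time::get_atomic_clock_static};
///
/// let clock = get_atomic_clock_static();
/// clock.set_time(UnixNanos::from(1_000_000_000)); // 1 second after the UNIX epoch
/// assert_eq!(clock.get_time_ns().as_u64(), 1_000_000_000);
///
/// let next = clock.increment_time(500).unwrap();
/// assert_eq!(next.as_u64(), 1_000_000_500);
/// ```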
pub fn get_atomic_clock_static() -> &'static AtomicTime {
    ATOMIC_CLOCK_STATIC.get_or_init(|| AtomicTime::new(false, UnixNanos::default()))
}

/// Returns the duration since the UNIX epoch, based on the wall-clock seam (`wall_clock_now`,
/// which resolves to [`SystemTime::now()`] outside simulation).
///
/// # Panics
///
/// Panics if the system time is set before the UNIX epoch.
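///
/// # Examples
///
/// Illustrative sketch (module path assumed to be `nautilus_core::time`):
///
/// ```
/// use nautilus_core::time::duration_since_unix_epoch;
///
/// let since_epoch = duration_since_unix_epoch();
/// assert!(since_epoch.as_secs() > 0);
/// ```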
#[inline(always)]
#[must_use]
pub fn duration_since_unix_epoch() -> Duration {
    // The expect() is acceptable here because:
    // - SystemTime failure indicates catastrophic system clock issues
    // - This would affect the entire application's ability to function
    // - Alternative error handling would complicate all time-dependent code paths
    // - Such failures are extremely rare in practice and indicate hardware/OS problems
    wall_clock_now()
        .duration_since(UNIX_EPOCH)
        .expect("Error calling `SystemTime`")
}

/// Returns the current wall-clock time as [`SystemTime`].
///
/// Under simulation (`simulation` + `cfg(madsim)`), returns virtual wall-clock
/// time from the madsim deterministic scheduler when called from inside a
/// madsim runtime. When called outside a runtime (e.g. plain `#[rstest]` test
/// bodies), falls back to [`SystemTime::now()`], which under `cfg(madsim)` is
/// libc-intercepted by madsim and resolves to the same real syscall it would
/// in a normal build. Under normal builds, returns [`SystemTime::now()`].
///
/// This is the wall-clock seam. It preserves Unix-epoch semantics (unlike
/// `tokio::time::Instant` which is monotonic and carries no epoch).
#[inline(always)]
#[must_use]
fn wall_clock_now() -> SystemTime {
    #[cfg(not(all(feature = "simulation", madsim)))]
    {
        SystemTime::now()
    }
    #[cfg(all(feature = "simulation", madsim))]
    {
        // `try_current` returns `None` when no madsim runtime is active.
        // Falling back to `SystemTime::now()` matches what madsim's own libc
        // shim does for `clock_gettime` outside a runtime; production paths
        // running under simulation are always inside a runtime, so they
        // continue to receive virtual time.
        match madsim::time::TimeHandle::try_current() {
            Some(handle) => handle.now_time(),
            None => SystemTime::now(),
        }
    }
}

/// Returns the current UNIX time in nanoseconds, based on the wall-clock seam (`wall_clock_now`).
///
/// # Panics
///
/// Panics if the duration in nanoseconds exceeds `u64::MAX`.
#[inline(always)]
#[must_use]
#[expect(
    clippy::cast_possible_truncation,
    reason = "value is guarded by the assert above (ns <= u64::MAX)"
)]
pub fn nanos_since_unix_epoch() -> u64 {
    let ns = duration_since_unix_epoch().as_nanos();
    assert!(
        ns <= u128::from(u64::MAX),
        "System time overflow: value exceeds u64::MAX nanoseconds"
    );
    ns as u64
}

/// Represents an atomic timekeeping structure.
///
/// [`AtomicTime`] can act as a real-time clock or static clock based on its mode.
/// It uses an [`AtomicU64`] to atomically update the value using only immutable
/// references.
///
/// The `realtime` flag indicates which mode the clock is currently in.
/// For concurrency, this struct uses atomic operations with appropriate memory orderings:
/// - **Acquire/Release** for reading/writing in **static mode**.
/// - **Compare-and-exchange (`AcqRel`)** in real-time mode to guarantee monotonic increments.
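///
/// # Examples
///
/// A minimal sketch (import paths assume the `nautilus_core` crate layout):
///
/// ```
/// use nautilus_core::{UnixNanos, time::AtomicTime};
///
/// // Static mode: fully manual control
/// let static_clock = AtomicTime::new(false, UnixNanos::from(42));
/// assert_eq!(static_clock.get_time_ns().as_u64(), 42);
///
/// // Real-time mode: the seed value is superseded by wall-clock time on first read
/// let realtime_clock = AtomicTime::new(true, UnixNanos::default());
/// assert!(realtime_clock.get_time_ns().as_u64() > 0);
/// ```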
#[repr(C)]
#[derive(Debug)]
pub struct AtomicTime {
    /// Indicates whether the clock is operating in **real-time mode** (`true`) or **static mode** (`false`)
    pub realtime: AtomicBool,
    /// The last recorded time (in UNIX nanoseconds). Updated atomically with compare-and-exchange
    /// in **real-time mode**, or simple store/fetch in **static mode**.
    pub timestamp_ns: AtomicU64,
}

impl Deref for AtomicTime {
    type Target = AtomicU64;

    fn deref(&self) -> &Self::Target {
        &self.timestamp_ns
    }
}

impl Default for AtomicTime {
    /// Creates a new default [`AtomicTime`] instance in **real-time mode**; the zero placeholder is
    /// superseded by wall-clock time on the first read.
    fn default() -> Self {
        Self::new(true, UnixNanos::default())
    }
}

impl AtomicTime {
    /// Creates a new [`AtomicTime`] instance.
    ///
    /// - If `realtime` is `true`, the provided `time` is used only as an initial placeholder
    /// and will quickly be overridden by calls to [`AtomicTime::time_since_epoch`].
    /// - If `realtime` is `false`, this clock starts in **static mode**, with the given `time`
    /// as its current value.
    #[must_use]
    pub fn new(realtime: bool, time: UnixNanos) -> Self {
        Self {
            realtime: AtomicBool::new(realtime),
            timestamp_ns: AtomicU64::new(time.into()),
        }
    }

    /// Returns the current time in nanoseconds, based on the clock’s mode.
    ///
    /// - In **real-time mode**, calls [`AtomicTime::time_since_epoch`], ensuring strictly increasing
    /// timestamps across threads, using `AcqRel` semantics for the underlying atomic.
    /// - In **static mode**, reads the stored time using [`Ordering::Acquire`]. Updates by other
    /// threads using [`AtomicTime::set_time`] or [`AtomicTime::increment_time`] (Release/AcqRel)
    /// will be visible here.
    ///
    /// # Thread Safety
    ///
    /// The mode check is not atomic with the subsequent read/update. If another thread
    /// switches modes between the check and the operation, one stale-mode result may be
    /// returned. This is intentional: mode switching is a setup-time operation and should
    /// not occur concurrently with time operations.
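    ///
    /// # Examples
    ///
    /// Illustrative sketch (paths assume the `nautilus_core` crate layout); the related
    /// getters below return the same instant at coarser resolutions:
    ///
    /// ```
    /// use nautilus_core::{UnixNanos, time::AtomicTime};
    ///
    /// let clock = AtomicTime::new(false, UnixNanos::from(1_500_000_000)); // 1.5s after the epoch
    /// assert_eq!(clock.get_time_ns().as_u64(), 1_500_000_000);
    /// assert_eq!(clock.get_time_us(), 1_500_000);
    /// assert_eq!(clock.get_time_ms(), 1_500);
    /// assert!((clock.get_time() - 1.5).abs() < f64::EPSILON);
    /// ```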
    #[must_use]
    pub fn get_time_ns(&self) -> UnixNanos {
        if self.realtime.load(Ordering::Acquire) {
            self.time_since_epoch()
        } else {
            UnixNanos::from(self.timestamp_ns.load(Ordering::Acquire))
        }
    }

    /// Returns the current UNIX time in whole microseconds.
    #[must_use]
    pub fn get_time_us(&self) -> u64 {
        self.get_time_ns().as_u64() / NANOSECONDS_IN_MICROSECOND
    }

    /// Returns the current UNIX time in whole milliseconds.
    #[must_use]
    pub fn get_time_ms(&self) -> u64 {
        self.get_time_ns().as_u64() / NANOSECONDS_IN_MILLISECOND
    }

    /// Returns the current UNIX time in seconds as an `f64`.
    #[must_use]
    #[expect(
        clippy::cast_precision_loss,
        reason = "Precision loss acceptable for time conversion"
    )]
    pub fn get_time(&self) -> f64 {
        self.get_time_ns().as_f64() / (NANOSECONDS_IN_SECOND as f64)
    }

    /// Manually sets a new time for the clock (only possible in **static mode**).
    ///
    /// This uses an atomic store with [`Ordering::Release`], so any thread reading with
    /// [`Ordering::Acquire`] will see the updated time. This does *not* enforce a total ordering
    /// among all threads, but is enough to ensure that once a thread sees this update, it also
    /// sees all writes made before this call in the writing thread.
    ///
    /// Typically used in single-threaded scenarios or coordinated concurrency in **static mode**,
    /// since there's no global ordering across threads.
    ///
    /// # Panics
    ///
    /// Panics if invoked when in real-time mode.
    ///
    /// # Thread Safety
    ///
    /// The mode check is not atomic with the subsequent store. If another thread calls
    /// `make_realtime()` between the check and store, the invariant can be violated.
    /// This is intentional: mode switching is a setup-time operation and should not
    /// occur concurrently with time operations. Callers must ensure mode switches are
    /// complete before resuming time operations.
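    ///
    /// # Examples
    ///
    /// Illustrative sketch (paths assume the `nautilus_core` crate layout):
    ///
    /// ```
    /// use nautilus_core::{UnixNanos, time::AtomicTime};
    ///
    /// let clock = AtomicTime::new(false, UnixNanos::default());
    /// clock.set_time(UnixNanos::from(500_000_000)); // 500ms after the epoch
    /// assert_eq!(clock.get_time_ns().as_u64(), 500_000_000);
    /// ```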
    pub fn set_time(&self, time: UnixNanos) {
        assert!(
            !self.realtime.load(Ordering::SeqCst),
            "Cannot set time while clock is in realtime mode"
        );

        self.store(time.into(), Ordering::Release);

        debug_assert!(
            !self.realtime.load(Ordering::SeqCst),
            "Invariant: clock must remain in static mode across `set_time`"
        );
    }

    /// Increments the current (static-mode) time by `delta` nanoseconds and returns the updated value.
    ///
    /// Internally this uses [`AtomicU64::fetch_update`] with [`Ordering::AcqRel`] to ensure the increment is
    /// atomic and visible to readers using `Acquire` loads.
    ///
    /// # Errors
    ///
    /// Returns an error if the increment would overflow `u64::MAX` or if called
    /// while the clock is in real-time mode.
    ///
    /// # Thread Safety
    ///
    /// The mode check is not atomic with the subsequent update. If another thread calls
    /// `make_realtime()` between the check and update, the invariant can be violated.
    /// This is intentional: mode switching is a setup-time operation and should not
    /// occur concurrently with time operations. Callers must ensure mode switches are
    /// complete before resuming time operations.
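    ///
    /// # Examples
    ///
    /// Illustrative sketch (paths assume the `nautilus_core` crate layout):
    ///
    /// ```
    /// use nautilus_core::{UnixNanos, time::AtomicTime};
    ///
    /// let clock = AtomicTime::new(false, UnixNanos::from(0));
    /// assert_eq!(clock.increment_time(500).unwrap().as_u64(), 500);
    /// assert_eq!(clock.increment_time(1_000).unwrap().as_u64(), 1_500);
    ///
    /// // Overflow is reported as an error rather than wrapping
    /// let clock = AtomicTime::new(false, UnixNanos::from(u64::MAX - 5));
    /// assert!(clock.increment_time(10).is_err());
    /// ```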
    pub fn increment_time(&self, delta: u64) -> anyhow::Result<UnixNanos> {
        anyhow::ensure!(
            !self.realtime.load(Ordering::SeqCst),
            "Cannot increment time while clock is in realtime mode"
        );

        let previous =
            match self
                .timestamp_ns
                .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                    current.checked_add(delta)
                }) {
                Ok(prev) => prev,
                Err(_) => anyhow::bail!("Cannot increment time beyond u64::MAX"),
            };

        debug_assert!(
            !self.realtime.load(Ordering::SeqCst),
            "Invariant: clock must remain in static mode across `increment_time`"
        );

        Ok(UnixNanos::from(previous + delta))
    }

    /// Retrieves and updates the current “real-time” clock, returning a strictly increasing
    /// timestamp based on system time.
    ///
    /// Internally:
    /// - We fetch `now` via [`nanos_since_unix_epoch`] (the system wall clock).
    /// - We do an atomic compare-and-exchange (using [`Ordering::AcqRel`]) to ensure the stored
    /// timestamp is never less than the last timestamp.
    ///
    /// This ensures:
    /// 1. **Monotonic increments**: The returned timestamp is strictly greater than the previous
    /// one (by at least 1 nanosecond).
    /// 2. **No backward jumps**: If the OS time moves backward, we ignore that shift to preserve
    /// monotonicity.
    /// 3. **Visibility**: In a multi-threaded environment, other threads see the updated value
    /// once this compare-and-exchange completes.
    ///
    /// # Panics
    ///
    /// Panics if the internal counter has reached `u64::MAX`, which would indicate the stored
    /// timestamp has exhausted the representable range (~584 years after the UNIX epoch) *or*
    /// the clock was manually corrupted.
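    ///
    /// # Examples
    ///
    /// Illustrative sketch (paths assume the `nautilus_core` crate layout):
    ///
    /// ```
    /// use nautilus_core::{UnixNanos, time::AtomicTime};
    ///
    /// let clock = AtomicTime::new(true, UnixNanos::default());
    /// let t1 = clock.time_since_epoch();
    /// let t2 = clock.time_since_epoch();
    /// assert!(t2 > t1); // strictly increasing even if the OS clock stalls or steps back
    /// ```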
    pub fn time_since_epoch(&self) -> UnixNanos {
        // This method guarantees strict consistency but may incur a performance cost under
        // high contention due to retries in the `compare_exchange` loop.
        let now = nanos_since_unix_epoch();

        loop {
            // Acquire to observe the latest stored value
            let last = self.load(Ordering::Acquire);
            // Ensure we never wrap past u64::MAX – treat that as a fatal error
            let incremented = last
                .checked_add(1)
                .expect("AtomicTime overflow: reached u64::MAX");
            let next = now.max(incremented);
            // AcqRel on success ensures this new value is published,
            // Acquire on failure reloads if we lost a CAS race.
            //
            // Note that under heavy contention (many threads calling this in tight loops),
            // the CAS loop may increase latency.
            //
            // However, in practice, the loop terminates quickly because:
            // - System time naturally advances between iterations
            // - Each iteration increments time by at least 1ns, preventing ABA problems
            // - True contention requiring retry is rare in normal usage patterns
            //
            // The concurrent stress test (4 threads × 100k iterations) validates this approach.
            if self
                .compare_exchange(last, next, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                debug_assert!(
                    next > last,
                    "Invariant: time is strictly monotonic across CAS"
                );
                return UnixNanos::from(next);
            }
        }
    }

    /// Switches the clock to **real-time mode** (`realtime = true`).
    ///
    /// If transitioning from static mode, the internal counter is reset to the current
    /// wall-clock time so that [`AtomicTime::time_since_epoch`] does not carry forward a
    /// timestamp set during static mode (e.g. a backtest far in the future).
    ///
    /// Uses [`Ordering::SeqCst`] for the mode flag to ensure global ordering.
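    ///
    /// # Examples
    ///
    /// Illustrative sketch (paths assume the `nautilus_core` crate layout):
    ///
    /// ```
    /// use nautilus_core::{UnixNanos, time::AtomicTime};
    ///
    /// // A static clock parked far in the future...
    /// let clock = AtomicTime::new(false, UnixNanos::from(u64::MAX - 1_000));
    /// clock.make_realtime();
    /// // ...is reset to wall-clock time rather than carrying the stale value forward
    /// assert!(clock.get_time_ns().as_u64() < u64::MAX - 1_000);
    /// ```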
    pub fn make_realtime(&self) {
        if !self.realtime.swap(true, Ordering::SeqCst) {
            self.timestamp_ns
                .store(nanos_since_unix_epoch(), Ordering::Release);
        }
    }

    /// Switches the clock to **static mode** (`realtime = false`).
    ///
    /// If transitioning from real-time mode, the internal counter is snapshotted to the
    /// current wall-clock time so that subsequent static reads return a reasonable value
    /// rather than a stale or zero placeholder.
    ///
    /// Uses [`Ordering::SeqCst`] for the mode flag to ensure global ordering.
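    ///
    /// # Examples
    ///
    /// Illustrative sketch (paths assume the `nautilus_core` crate layout):
    ///
    /// ```
    /// use nautilus_core::{UnixNanos, time::AtomicTime};
    ///
    /// let clock = AtomicTime::new(true, UnixNanos::default());
    /// clock.make_static();
    /// // The counter was snapshotted to wall-clock time at the switch
    /// assert!(clock.get_time_ns().as_u64() > 0);
    ///
    /// // From here on, time only moves when told to
    /// clock.set_time(UnixNanos::from(1));
    /// assert_eq!(clock.get_time_ns().as_u64(), 1);
    /// ```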
    pub fn make_static(&self) {
        if self.realtime.swap(false, Ordering::SeqCst) {
            self.timestamp_ns
                .store(nanos_since_unix_epoch(), Ordering::Release);
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use rstest::*;

    use super::*;

    #[rstest]
    fn test_global_clocks_initialization() {
        let realtime_clock = get_atomic_clock_realtime();
        assert!(realtime_clock.get_time_ns().as_u64() > 0);

        let static_clock = get_atomic_clock_static();
        static_clock.set_time(UnixNanos::from(500_000_000)); // 500 ms
        assert_eq!(static_clock.get_time_ns().as_u64(), 500_000_000);
    }

    #[rstest]
    fn test_mode_switching() {
        let time = AtomicTime::new(true, UnixNanos::default());

        // Verify real-time mode
        let realtime_ns = time.get_time_ns();
        assert!(realtime_ns.as_u64() > 0);

        // Switch to static mode
        time.make_static();
        time.set_time(UnixNanos::from(1_000_000_000)); // 1 second
        let static_ns = time.get_time_ns();
        assert_eq!(static_ns.as_u64(), 1_000_000_000);

        // Switch back to real-time mode
        time.make_realtime();
        let new_realtime_ns = time.get_time_ns();
        assert!(new_realtime_ns.as_u64() > static_ns.as_u64());
    }

    #[rstest]
    #[should_panic(expected = "Cannot set time while clock is in realtime mode")]
    fn test_set_time_panics_in_realtime_mode() {
        let clock = AtomicTime::new(true, UnixNanos::default());
        clock.set_time(UnixNanos::from(123));
    }

    #[rstest]
    fn test_increment_time_returns_error_in_realtime_mode() {
        let clock = AtomicTime::new(true, UnixNanos::default());
        let result = clock.increment_time(1);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Cannot increment time while clock is in realtime mode")
        );
    }

    #[rstest]
    #[should_panic(expected = "AtomicTime overflow")]
    fn test_time_since_epoch_overflow_panics() {
        use std::sync::atomic::{AtomicBool, AtomicU64};

        // Manually construct a clock with the counter already at u64::MAX
        let clock = AtomicTime {
            realtime: AtomicBool::new(true),
            timestamp_ns: AtomicU64::new(u64::MAX),
        };

        // This call will attempt to add 1 and must panic
        let _ = clock.time_since_epoch();
    }

    #[rstest]
    fn test_make_static_snapshots_wall_time() {
        // A fresh realtime clock that has never been read starts with timestamp_ns = 0.
        // Switching to static should snapshot wall time, not leave it at 0.
        let clock = AtomicTime::new(true, UnixNanos::default());
        clock.make_static();
        let ts = clock.get_time_ns();
        assert!(
            ts.as_u64() > 1_650_000_000_000_000_000,
            "Expected wall-clock snapshot, was {ts}"
        );
    }

    #[rstest]
    fn test_make_realtime_resets_future_timestamp() {
        // If static mode set the clock into the future, switching to realtime
        // should reset to wall time so timestamps are not poisoned.
        let clock = AtomicTime::new(false, UnixNanos::from(u64::MAX - 1_000));
        clock.make_realtime();
        let ts = clock.get_time_ns();
        // Should be near current wall time, not near u64::MAX
        let now = nanos_since_unix_epoch();
        assert!(
            ts.as_u64() <= now + 1_000_000_000, // within 1 second
            "Expected wall-clock time, was {ts} (now={now})"
        );
    }

    #[rstest]
    fn test_make_static_idempotent() {
        // Calling make_static on an already-static clock should not change the time
        let clock = AtomicTime::new(false, UnixNanos::from(42));
        clock.make_static();
        assert_eq!(clock.get_time_ns(), UnixNanos::from(42));
    }

    #[rstest]
    fn test_make_realtime_idempotent() {
        // Calling make_realtime on an already-realtime clock should not reset the counter
        let clock = AtomicTime::new(true, UnixNanos::default());
        let ts1 = clock.get_time_ns();
        clock.make_realtime(); // already realtime, should be a no-op
        let ts2 = clock.get_time_ns();
        assert!(ts2 >= ts1);
    }

    #[rstest]
    fn test_static_time_is_stable() {
        // Create a clock in static mode with an initial value
        let clock = AtomicTime::new(false, UnixNanos::from(42));
        let time1 = clock.get_time_ns();

        // Sleep a bit to give the system time to change, if the clock were using real-time
        std::thread::sleep(std::time::Duration::from_millis(10));
        let time2 = clock.get_time_ns();

        // In static mode, the value should remain unchanged
        assert_eq!(time1, time2);
    }

    #[rstest]
    fn test_increment_time() {
        // Start in static mode
        let time = AtomicTime::new(false, UnixNanos::from(0));

        let updated_time = time.increment_time(500).unwrap();
        assert_eq!(updated_time.as_u64(), 500);

        let updated_time = time.increment_time(1_000).unwrap();
        assert_eq!(updated_time.as_u64(), 1_500);
    }

    #[rstest]
    fn test_increment_time_overflow_errors() {
        let time = AtomicTime::new(false, UnixNanos::from(u64::MAX - 5));

        let err = time.increment_time(10).unwrap_err();
        assert_eq!(err.to_string(), "Cannot increment time beyond u64::MAX");
    }

    #[rstest]
    #[expect(
        clippy::cast_possible_truncation,
        clippy::cast_possible_wrap,
        reason = "Intentional narrowing casts for the tolerance comparison"
    )]
    fn test_nanos_since_unix_epoch_vs_system_time() {
        let unix_nanos = nanos_since_unix_epoch();
        let system_ns = duration_since_unix_epoch().as_nanos() as u64;
        assert!((unix_nanos as i64 - system_ns as i64).abs() < NANOSECONDS_IN_SECOND as i64);
    }

    #[rstest]
    fn test_time_since_epoch_monotonicity() {
        let clock = get_atomic_clock_realtime();
        let mut previous = clock.time_since_epoch();
        for _ in 0..1_000_000 {
            let current = clock.time_since_epoch();
            assert!(current > previous);
            previous = current;
        }
    }

    #[rstest]
    fn test_time_since_epoch_strictly_increasing_concurrent() {
        let time = Arc::new(AtomicTime::new(true, UnixNanos::default()));
        let num_threads = 4;
        let iterations = 100_000;
        let mut handles = Vec::with_capacity(num_threads);

        for thread_id in 0..num_threads {
            let time_clone = Arc::clone(&time);

            let handle = std::thread::spawn(move || {
                let mut previous = time_clone.time_since_epoch().as_u64();

                for i in 0..iterations {
                    let current = time_clone.time_since_epoch().as_u64();
                    assert!(
                        current > previous,
                        "Thread {thread_id}: iteration {i}: time did not increase: previous={previous}, current={current}",
                    );
                    previous = current;
                }
            });

            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }
    }

    #[rstest]
    fn test_duration_since_unix_epoch() {
        let time = AtomicTime::new(true, UnixNanos::default());
        let duration = Duration::from_nanos(time.get_time_ns().into());
        let now = SystemTime::now();

        // Check if the duration is close to the actual difference between now and UNIX_EPOCH
        let delta = now
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .checked_sub(duration);
        assert!(delta.unwrap_or_default() < Duration::from_millis(100));

        // Check that the duration is after a fixed point in the past (~April 2022),
        // assuming the test is run after that date
        assert!(duration > Duration::from_secs(1_650_000_000));
    }

    #[rstest]
    fn test_unix_timestamp_is_monotonic_increasing() {
        let time = AtomicTime::new(true, UnixNanos::default());
        let result1 = time.get_time();
        let result2 = time.get_time();
        let result3 = time.get_time();
        let result4 = time.get_time();
        let result5 = time.get_time();

        assert!(result2 >= result1);
        assert!(result3 >= result2);
        assert!(result4 >= result3);
        assert!(result5 >= result4);
        assert!(result1 > 1_650_000_000.0);
    }

    #[rstest]
    fn test_unix_timestamp_ms_is_monotonic_increasing() {
        let time = AtomicTime::new(true, UnixNanos::default());
        let result1 = time.get_time_ms();
        let result2 = time.get_time_ms();
        let result3 = time.get_time_ms();
        let result4 = time.get_time_ms();
        let result5 = time.get_time_ms();

        assert!(result2 >= result1);
        assert!(result3 >= result2);
        assert!(result4 >= result3);
        assert!(result5 >= result4);
        assert!(result1 >= 1_650_000_000_000);
    }

    #[rstest]
    fn test_unix_timestamp_us_is_monotonic_increasing() {
        let time = AtomicTime::new(true, UnixNanos::default());
        let result1 = time.get_time_us();
        let result2 = time.get_time_us();
        let result3 = time.get_time_us();
        let result4 = time.get_time_us();
        let result5 = time.get_time_us();

        assert!(result2 >= result1);
        assert!(result3 >= result2);
        assert!(result4 >= result3);
        assert!(result5 >= result4);
        assert!(result1 > 1_650_000_000_000_000);
    }

    #[rstest]
    fn test_unix_timestamp_ns_is_monotonic_increasing() {
        let time = AtomicTime::new(true, UnixNanos::default());
        let result1 = time.get_time_ns();
        let result2 = time.get_time_ns();
        let result3 = time.get_time_ns();
        let result4 = time.get_time_ns();
        let result5 = time.get_time_ns();

        assert!(result2 >= result1);
        assert!(result3 >= result2);
        assert!(result4 >= result3);
        assert!(result5 >= result4);
        assert!(result1.as_u64() > 1_650_000_000_000_000_000);
    }

    #[rstest]
    fn test_acquire_release_contract_static_mode() {
        // This test explicitly proves the Acquire/Release memory ordering contract:
        // - Writer thread uses set_time() which does Release store (see AtomicTime::set_time)
        // - Reader thread uses get_time_ns() which does Acquire load (see AtomicTime::get_time_ns)
        // - The Release-Acquire pair ensures all writes before Release are visible after Acquire

        let clock = Arc::new(AtomicTime::new(false, UnixNanos::from(0)));
        let aux_data = Arc::new(AtomicU64::new(0));
        let done = Arc::new(AtomicBool::new(false));

        // Writer thread: updates auxiliary data, then releases via set_time
        let writer_clock = Arc::clone(&clock);
        let writer_aux = Arc::clone(&aux_data);
        let writer_done = Arc::clone(&done);

        let writer = std::thread::spawn(move || {
            for i in 1..=1_000u64 {
                writer_aux.store(i, Ordering::Relaxed);

                // Release store via set_time creates a release fence - all prior writes (including aux_data)
                // must be visible to any thread that observes this time value via Acquire load
                writer_clock.set_time(UnixNanos::from(i * 1000));

                // Yield to encourage interleaving
                std::thread::yield_now();
            }
            writer_done.store(true, Ordering::Release);
        });

        // Reader thread: acquires via get_time_ns, then checks auxiliary data
        let reader_clock = Arc::clone(&clock);
        let reader_aux = Arc::clone(&aux_data);
        let reader_done = Arc::clone(&done);

        let reader = std::thread::spawn(move || {
            let mut last_time = 0u64;
            let mut max_aux_seen = 0u64;

            // Poll until writer is done, with no iteration limit
            while !reader_done.load(Ordering::Acquire) {
                let current_time = reader_clock.get_time_ns().as_u64();

                if current_time > last_time {
                    // The Acquire in get_time_ns synchronizes with the Release in set_time,
                    // making aux_data visible
                    let aux_value = reader_aux.load(Ordering::Relaxed);

                    // Invariant: aux_value must never go backwards (proves Release-Acquire sync works)
                    if aux_value > 0 {
                        assert!(
                            aux_value >= max_aux_seen,
                            "Acquire/Release contract violated: aux went backwards from {max_aux_seen} to {aux_value}"
                        );
                        max_aux_seen = aux_value;
                    }

                    last_time = current_time;
                }

                std::thread::yield_now();
            }

            // Check final state after writer completes to ensure we observe updates
            let final_time = reader_clock.get_time_ns().as_u64();
            if final_time > last_time {
                let final_aux = reader_aux.load(Ordering::Relaxed);
                if final_aux > 0 {
                    assert!(
                        final_aux >= max_aux_seen,
                        "Acquire/Release contract violated: final aux {final_aux} < max {max_aux_seen}"
                    );
                    max_aux_seen = final_aux;
                }
            }

            max_aux_seen
        });

        writer.join().unwrap();
        let max_observed = reader.join().unwrap();

        // Ensure the reader actually observed updates (not vacuously satisfied)
        assert!(max_observed > 0, "Reader must observe writer updates");
    }

    #[rstest]
    fn test_acquire_release_contract_increment_time() {
        // Similar test for increment_time, which uses fetch_update with AcqRel (see AtomicTime::increment_time)

        let clock = Arc::new(AtomicTime::new(false, UnixNanos::from(0)));
        let aux_data = Arc::new(AtomicU64::new(0));
        let done = Arc::new(AtomicBool::new(false));

        let writer_clock = Arc::clone(&clock);
        let writer_aux = Arc::clone(&aux_data);
        let writer_done = Arc::clone(&done);

        let writer = std::thread::spawn(move || {
            for i in 1..=1_000u64 {
                writer_aux.store(i, Ordering::Relaxed);
                let _ = writer_clock.increment_time(1000).unwrap();
                std::thread::yield_now();
            }
            writer_done.store(true, Ordering::Release);
        });

        let reader_clock = Arc::clone(&clock);
        let reader_aux = Arc::clone(&aux_data);
        let reader_done = Arc::clone(&done);

        let reader = std::thread::spawn(move || {
            let mut last_time = 0u64;
            let mut max_aux = 0u64;

            // Poll until writer is done, with no iteration limit
            while !reader_done.load(Ordering::Acquire) {
                let current_time = reader_clock.get_time_ns().as_u64();

                if current_time > last_time {
                    let aux_value = reader_aux.load(Ordering::Relaxed);

                    // Invariant: aux_value must never regress (proves AcqRel sync works)
                    if aux_value > 0 {
                        assert!(
                            aux_value >= max_aux,
                            "AcqRel contract violated: aux regressed from {max_aux} to {aux_value}"
                        );
                        max_aux = aux_value;
                    }

                    last_time = current_time;
                }

                std::thread::yield_now();
            }

            // Check final state after writer completes to ensure we observe updates
            let final_time = reader_clock.get_time_ns().as_u64();
            if final_time > last_time {
                let final_aux = reader_aux.load(Ordering::Relaxed);
                if final_aux > 0 {
                    assert!(
                        final_aux >= max_aux,
                        "AcqRel contract violated: final aux {final_aux} < max {max_aux}"
                    );
                    max_aux = final_aux;
                }
            }

            max_aux
        });

        writer.join().unwrap();
        let max_observed = reader.join().unwrap();

        // Ensure the reader actually observed updates (not vacuously satisfied)
        assert!(max_observed > 0, "Reader must observe writer updates");
    }

    // The wall-clock seam (`wall_clock_now`) routes through madsim's virtual
    // clock under simulation. Sleeping for 60 virtual seconds must advance
    // the value returned by `nanos_since_unix_epoch` by 60s in wall-clock
    // terms. If the cfg gate fell through to `SystemTime::now()`, the elapsed
    // value would only reflect real wall-clock time (~0ms) and the assertion
    // would fail.
    #[cfg(all(feature = "simulation", madsim))]
    #[madsim::test]
    async fn test_wall_clock_advances_with_virtual_time() {
        let before = nanos_since_unix_epoch();
        madsim::time::sleep(std::time::Duration::from_secs(60)).await;
        let after = nanos_since_unix_epoch();

        let elapsed_ns = after.saturating_sub(before);
        let sixty_seconds_ns = 60 * NANOSECONDS_IN_SECOND;
        assert!(
            elapsed_ns >= sixty_seconds_ns,
            "wall clock did not advance by full virtual sleep: elapsed={elapsed_ns}ns"
        );
    }
}