Skip to main content

nautilus_network/ratelimiter/
clock.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Time sources for rate limiters.
17//!
18//! The time sources contained in this module allow the rate limiter
19//! to be (optionally) independent of std, and additionally
20//! allow mocking the passage of time.
21//!
//! You can supply a custom time source by implementing both [`Reference`]
//! and [`Clock`] for your own types, and by implementing `Add<Nanos>` for
//! your [`Reference`] type.
25use std::{
26    fmt::Debug,
27    future::Future,
28    ops::Add,
29    prelude::v1::*,
30    sync::{
31        Arc,
32        atomic::{AtomicU64, Ordering},
33    },
34    time::Duration,
35};
36
37use super::nanos::Nanos;
38use crate::dst::time::Instant;
39
40/// A measurement from a clock.
41pub trait Reference:
42    Sized + Add<Nanos, Output = Self> + PartialEq + Eq + Ord + Copy + Clone + Send + Sync + Debug
43{
44    /// Determines the time that separates two measurements of a
45    /// clock. Implementations of this must perform a saturating
46    /// subtraction - if the `earlier` timestamp should be later,
47    /// `duration_since` must return the zero duration.
48    fn duration_since(&self, earlier: Self) -> Nanos;
49
50    /// Returns a reference point that lies at most `duration` in the
51    /// past from the current reference. If an underflow should occur,
52    /// returns the current reference.
53    #[must_use]
54    fn saturating_sub(&self, duration: Nanos) -> Self;
55}
56
57/// A time source used by rate limiters.
58pub trait Clock: Clone {
59    /// A measurement of a monotonically increasing clock.
60    type Instant: Reference;
61
62    /// Returns a measurement of the clock.
63    fn now(&self) -> Self::Instant;
64
65    /// Waits for `duration` on this clock's time base.
66    ///
67    /// Implementations must advance on the same clock as [`Clock::now`] so
68    /// callers using `sleep` together with `now` observe consistent time
69    /// under both real and simulated runtimes.
70    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + '_;
71}
72
73impl Reference for Duration {
74    /// The internal duration between this point and another.
75    fn duration_since(&self, earlier: Self) -> Nanos {
76        self.checked_sub(earlier)
77            .unwrap_or_else(|| Self::new(0, 0))
78            .into()
79    }
80
81    /// The internal duration between this point and another.
82    fn saturating_sub(&self, duration: Nanos) -> Self {
83        self.checked_sub(duration.into()).unwrap_or(*self)
84    }
85}
86
87impl Add<Nanos> for Duration {
88    type Output = Self;
89
90    fn add(self, other: Nanos) -> Self {
91        let other: Self = other.into();
92        self + other
93    }
94}
95
/// A mock implementation of a clock. All it does is keep track of
/// what "now" is (relative to some point meaningful to the program),
/// and returns that.
///
/// # Thread Safety
///
/// The mock time is represented as an atomic u64 count of nanoseconds, behind an [`Arc`].
/// Clones of this clock will all show the same time, even if the original advances.
#[derive(Debug, Clone, Default)]
pub struct FakeRelativeClock {
    now: Arc<AtomicU64>,
}

impl FakeRelativeClock {
    /// Advances the fake clock by the given amount.
    ///
    /// The stored time saturates at `u64::MAX` nanoseconds instead of
    /// overflowing, so an oversized advance can never wrap the counter and
    /// make the clock appear to run backwards.
    ///
    /// # Panics
    ///
    /// Panics if `by` cannot be represented as a `u64` number of nanoseconds (i.e., exceeds 584 years).
    pub fn advance(&self, by: Duration) {
        let by: u64 = by
            .as_nanos()
            .try_into()
            .expect("Cannot represent durations greater than 584 years");

        // `fetch_update` retries the compare-and-swap for us when another
        // thread advances concurrently. The closure always yields `Some`,
        // so the update itself cannot fail and the result is safe to drop.
        let _ = self
            .now
            .fetch_update(Ordering::Release, Ordering::Acquire, |current| {
                Some(current.saturating_add(by))
            });
    }
}

impl PartialEq for FakeRelativeClock {
    /// Two clocks are equal when they currently show the same time; they do
    /// not need to share the same underlying counter.
    fn eq(&self, other: &Self) -> bool {
        self.now.load(Ordering::Relaxed) == other.now.load(Ordering::Relaxed)
    }
}
139
140impl Clock for FakeRelativeClock {
141    type Instant = Nanos;
142
143    fn now(&self) -> Self::Instant {
144        self.now.load(Ordering::Relaxed).into()
145    }
146
147    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + '_ {
148        self.advance(duration);
149        std::future::ready(())
150    }
151}
152
/// A stateless clock whose readings are [`Instant`] values.
#[derive(Debug, Default, Clone)]
pub struct MonotonicClock;
156
157impl Add<Nanos> for Instant {
158    type Output = Self;
159
160    fn add(self, other: Nanos) -> Self {
161        let other: Duration = other.into();
162        self + other
163    }
164}
165
166impl Reference for Instant {
167    fn duration_since(&self, earlier: Self) -> Nanos {
168        if earlier < *self {
169            (*self - earlier).into()
170        } else {
171            Nanos::from(Duration::new(0, 0))
172        }
173    }
174
175    fn saturating_sub(&self, duration: Nanos) -> Self {
176        self.checked_sub(duration.into()).unwrap_or(*self)
177    }
178}
179
180impl Clock for MonotonicClock {
181    type Instant = Instant;
182
183    fn now(&self) -> Self::Instant {
184        Instant::now()
185    }
186
187    async fn sleep(&self, duration: Duration) {
188        #[cfg(not(all(feature = "simulation", madsim)))]
189        tokio::time::sleep(duration).await;
190        #[cfg(all(feature = "simulation", madsim))]
191        madsim::time::sleep(duration).await;
192    }
193}
194
#[cfg(test)]
mod test {
    use std::{sync::Arc, thread, time::Duration};

    use rstest::rstest;

    use super::*;

    // Ten threads each advance the shared fake clock a million times by one
    // nanosecond and check that every advance is visible as a strictly later
    // reading.
    #[rstest]
    fn fake_clock_parallel_advances() {
        let clock = Arc::new(FakeRelativeClock::default());
        let workers: Vec<_> = (0..10)
            .map(|_| {
                let clock = Arc::clone(&clock);

                thread::spawn(move || {
                    for _ in 0..1_000_000 {
                        let before = clock.now();
                        clock.advance(Duration::from_nanos(1));
                        assert!(clock.now() > before);
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }
    }

    // Exercises `Add<Nanos> for Duration`.
    #[rstest]
    fn duration_addition_coverage() {
        let base = Duration::from_secs(1);
        let bumped = base + Nanos::from(1);
        assert!(bumped > base);
    }

    // Under madsim, `MonotonicClock::sleep` runs on the virtual clock with
    // sub-ms scheduling epsilon. If the cfg gate fell through to real tokio,
    // `sleep` would block on the OS scheduler with ~5-15ms of jitter and the
    // tight upper bound would fail.
    #[cfg(all(feature = "simulation", madsim))]
    #[madsim::test]
    async fn test_monotonic_clock_sleep_uses_virtual_time() {
        let clock = MonotonicClock;
        let started = Instant::now();
        clock.sleep(Duration::from_millis(100)).await;
        let elapsed = started.elapsed();
        assert!(elapsed >= Duration::from_millis(100));
        assert!(
            elapsed < Duration::from_millis(101),
            "virtual sleep showed real-tokio jitter: {elapsed:?}"
        );
    }
}