nautilus_network/retry.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Generic retry mechanism for network operations.

use std::{future::Future, marker::PhantomData, time::Duration};

use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;

use crate::{backoff::ExponentialBackoff, dst};

/// Configuration for retry behavior.
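///
/// # Examples
///
/// A minimal construction sketch (the `nautilus_network::retry` module path is
/// assumed here), overriding selected fields and keeping the remaining defaults:
///
/// ```rust,ignore
/// use nautilus_network::retry::RetryConfig;
///
/// let config = RetryConfig {
///     max_retries: 5,
///     initial_delay_ms: 250,
///     ..Default::default()
/// };
/// assert_eq!(config.max_delay_ms, 10_000); // Unchanged default
/// ```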
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct RetryConfig {
    /// Maximum number of retry attempts (total attempts = 1 initial + `max_retries`).
    pub max_retries: u32,
    /// Initial delay between retries in milliseconds.
    pub initial_delay_ms: u64,
    /// Maximum delay between retries in milliseconds.
    pub max_delay_ms: u64,
    /// Backoff multiplier factor.
    pub backoff_factor: f64,
    /// Maximum jitter in milliseconds to add to delays.
    pub jitter_ms: u64,
    /// Optional timeout for individual operations in milliseconds.
    /// If None, no timeout is applied.
    pub operation_timeout_ms: Option<u64>,
    /// Whether the first retry should happen immediately without delay.
    /// Should be false for HTTP/order operations, true for connection operations.
    pub immediate_first: bool,
    /// Optional maximum total elapsed time for all retries in milliseconds.
    /// If exceeded, retries stop even if `max_retries` hasn't been reached.
    pub max_elapsed_ms: Option<u64>,
}

impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            max_retries: 3,
            initial_delay_ms: 1_000,
            max_delay_ms: 10_000,
            backoff_factor: 2.0,
            jitter_ms: 100,
            operation_timeout_ms: Some(30_000),
            immediate_first: false,
            max_elapsed_ms: None,
        }
    }
}

/// Generic retry manager for network operations.
///
/// Stateless and thread-safe; each operation maintains its own backoff state.
#[derive(Clone, Debug)]
pub struct RetryManager<E> {
    config: RetryConfig,
    _phantom: PhantomData<E>,
}

impl<E> RetryManager<E>
where
    E: std::error::Error,
{
    /// Creates a new retry manager with the given configuration.
    #[must_use]
    pub const fn new(config: RetryConfig) -> Self {
        Self {
            config,
            _phantom: PhantomData,
        }
    }

    /// Formats a retry budget exceeded error message with attempt context.
    #[inline(always)]
    fn budget_exceeded_msg(&self, attempt: u32) -> String {
        format!(
            "Retry budget exceeded ({}/{})",
            attempt.saturating_add(1),
            self.config.max_retries.saturating_add(1)
        )
    }

    /// Executes an operation with retry logic and optional cancellation.
    ///
    /// Cancellation is checked at three points:
    /// (1) Before each operation attempt.
    /// (2) During operation execution (via `tokio::select!`).
    /// (3) During retry delays.
    ///
    /// This means cancellation may be delayed by up to one operation timeout if it occurs mid-execution.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation fails after exhausting all retries,
    /// if the operation times out, if creating the backoff state fails, or if canceled.
    pub async fn execute_with_retry_inner<F, Fut, T>(
        &self,
        operation_name: &str,
        mut operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
        cancel: Option<&CancellationToken>,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        let mut backoff = ExponentialBackoff::new(
            Duration::from_millis(self.config.initial_delay_ms),
            Duration::from_millis(self.config.max_delay_ms),
            self.config.backoff_factor,
            self.config.jitter_ms,
            self.config.immediate_first,
        )
        .map_err(|e| create_error(format!("Invalid configuration: {e}")))?;

        let mut attempt = 0;
        let start_time = dst::time::Instant::now();

        loop {
            if let Some(token) = cancel
                && token.is_cancelled()
            {
                log::debug!("Operation '{operation_name}' canceled after {attempt} attempts");
                return Err(create_error("canceled".to_string()));
            }

            if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                let elapsed = start_time.elapsed();
                if elapsed.as_millis() >= u128::from(max_elapsed_ms) {
                    return Err(create_error(self.budget_exceeded_msg(attempt)));
                }
            }

            let result = match (self.config.operation_timeout_ms, cancel) {
                (Some(timeout_ms), Some(token)) => {
                    tokio::select! {
                        biased;
                        result = dst::time::timeout(Duration::from_millis(timeout_ms), operation()) => result,
                        () = token.cancelled() => {
                            log::debug!("Operation '{operation_name}' canceled during execution");
                            return Err(create_error("canceled".to_string()));
                        }
                    }
                }
                (Some(timeout_ms), None) => {
                    dst::time::timeout(Duration::from_millis(timeout_ms), operation()).await
                }
                (None, Some(token)) => {
                    tokio::select! {
                        biased;
                        result = operation() => Ok(result),
                        () = token.cancelled() => {
                            log::debug!("Operation '{operation_name}' canceled during execution");
                            return Err(create_error("canceled".to_string()));
                        }
                    }
                }
                (None, None) => Ok(operation().await),
            };

            match result {
                Ok(Ok(success)) => {
                    if attempt > 0 {
                        log::trace!(
                            "Operation '{operation_name}' succeeded after {} attempts",
                            attempt + 1
                        );
                    }
                    return Ok(success);
                }
                Ok(Err(e)) => {
                    if !should_retry(&e) {
                        log::trace!("Operation '{operation_name}' non-retryable error: {e}");
                        return Err(e);
                    }

                    if attempt >= self.config.max_retries {
                        log::trace!(
                            "Operation '{operation_name}' retries exhausted after {} attempts: {e}",
                            attempt + 1
                        );
                        return Err(e);
                    }

                    let mut delay = backoff.next_duration();

                    if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                        let elapsed = start_time.elapsed();
                        let remaining =
                            Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);

                        if remaining.is_zero() {
                            return Err(create_error(self.budget_exceeded_msg(attempt)));
                        }

                        delay = delay.min(remaining);
                    }

                    log::trace!(
                        "Operation '{operation_name}' attempt {} failed, retrying in {}ms: {e}",
                        attempt + 1,
                        delay.as_millis()
                    );

                    // Yield even on zero-delay to avoid a busy-wait loop
                    if delay.is_zero() {
                        tokio::task::yield_now().await;
                        attempt += 1;
                        continue;
                    }

                    if let Some(token) = cancel {
                        tokio::select! {
                            biased;
                            () = dst::time::sleep(delay) => {},
                            () = token.cancelled() => {
                                log::debug!("Operation '{operation_name}' canceled during retry delay (attempt {})", attempt + 1);
                                return Err(create_error("canceled".to_string()));
                            }
                        }
                    } else {
                        dst::time::sleep(delay).await;
                    }
                    attempt += 1;
                }
                Err(_) => {
                    let e = create_error(format!(
                        "Timed out after {}ms",
                        self.config.operation_timeout_ms.unwrap_or(0)
                    ));

                    if !should_retry(&e) {
                        log::trace!("Operation '{operation_name}' non-retryable timeout: {e}");
                        return Err(e);
                    }

                    if attempt >= self.config.max_retries {
                        log::trace!(
                            "Operation '{operation_name}' retries exhausted after timeout ({} attempts): {e}",
                            attempt + 1
                        );
                        return Err(e);
                    }

                    let mut delay = backoff.next_duration();

                    if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                        let elapsed = start_time.elapsed();
                        let remaining =
                            Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);

                        if remaining.is_zero() {
                            return Err(create_error(self.budget_exceeded_msg(attempt)));
                        }

                        delay = delay.min(remaining);
                    }

                    log::trace!(
                        "Operation '{operation_name}' attempt {} timed out, retrying in {}ms: {e}",
                        attempt + 1,
                        delay.as_millis()
                    );

                    // Yield even on zero-delay to avoid a busy-wait loop
                    if delay.is_zero() {
                        tokio::task::yield_now().await;
                        attempt += 1;
                        continue;
                    }

                    if let Some(token) = cancel {
                        tokio::select! {
                            biased;
                            () = dst::time::sleep(delay) => {},
                            () = token.cancelled() => {
                                log::debug!("Operation '{operation_name}' canceled during retry delay (attempt {})", attempt + 1);
                                return Err(create_error("canceled".to_string()));
                            }
                        }
                    } else {
                        dst::time::sleep(delay).await;
                    }
                    attempt += 1;
                }
            }
        }
    }

    /// Executes an operation with retry logic.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation fails after exhausting all retries,
    /// if the operation times out, or if creating the backoff state fails.
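    ///
    /// # Examples
    ///
    /// A minimal sketch; `MyError` is a hypothetical error type standing in for
    /// a real retryable error:
    ///
    /// ```rust,ignore
    /// #[derive(Debug, thiserror::Error)]
    /// #[error("transient: {0}")]
    /// struct MyError(String); // Hypothetical; any `std::error::Error` works
    ///
    /// let manager = RetryManager::new(RetryConfig::default());
    /// let result = manager
    ///     .execute_with_retry(
    ///         "fetch_data",
    ///         || async { Ok::<i32, MyError>(42) }, // Produces a fresh future per attempt
    ///         |_e: &MyError| true,                 // Predicate: which errors are retryable
    ///         MyError,                             // Wraps timeout/budget/cancel messages
    ///     )
    ///     .await;
    /// assert_eq!(result.unwrap(), 42);
    /// ```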
    pub async fn execute_with_retry<F, Fut, T>(
        &self,
        operation_name: &str,
        operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        self.execute_with_retry_inner(operation_name, operation, should_retry, create_error, None)
            .await
    }

    /// Executes an operation with retry logic and cancellation support.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation fails after exhausting all retries,
    /// if the operation times out, if creating the backoff state fails, or if canceled.
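    ///
    /// # Examples
    ///
    /// A sketch with a cancellation token (`MyError` is hypothetical, as above):
    ///
    /// ```rust,ignore
    /// let token = CancellationToken::new();
    /// // Elsewhere: calling `token.cancel()` aborts in-flight attempts and retry delays.
    /// let result = manager
    ///     .execute_with_retry_with_cancel(
    ///         "connect",
    ///         || async { Ok::<(), MyError>(()) },
    ///         |_e: &MyError| true,
    ///         MyError,
    ///         &token,
    ///     )
    ///     .await;
    /// ```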
    pub async fn execute_with_retry_with_cancel<F, Fut, T>(
        &self,
        operation_name: &str,
        operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
        cancellation_token: &CancellationToken,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        self.execute_with_retry_inner(
            operation_name,
            operation,
            should_retry,
            create_error,
            Some(cancellation_token),
        )
        .await
    }
}

/// Convenience function to create a retry manager with default configuration.
#[must_use]
pub fn create_default_retry_manager<E>() -> RetryManager<E>
where
    E: std::error::Error,
{
    RetryManager::new(RetryConfig::default())
}

/// Convenience function to create a retry manager for HTTP operations.
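///
/// # Examples
///
/// A sketch choosing the HTTP preset (`MyError` is hypothetical):
///
/// ```rust,ignore
/// // 3 retries, 60s per-request timeout, 180s total budget.
/// let manager = create_http_retry_manager::<MyError>();
/// ```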
#[must_use]
pub const fn create_http_retry_manager<E>() -> RetryManager<E>
where
    E: std::error::Error,
{
    let config = RetryConfig {
        max_retries: 3,
        initial_delay_ms: 1_000,
        max_delay_ms: 10_000,
        backoff_factor: 2.0,
        jitter_ms: 1_000,
        operation_timeout_ms: Some(60_000), // 60s for HTTP requests
        immediate_first: false,
        max_elapsed_ms: Some(180_000), // 3 minutes total budget
    };
    RetryManager::new(config)
}

/// Convenience function to create a retry manager for WebSocket operations.
#[must_use]
pub const fn create_websocket_retry_manager<E>() -> RetryManager<E>
where
    E: std::error::Error,
{
    let config = RetryConfig {
        max_retries: 5,
        initial_delay_ms: 1_000,
        max_delay_ms: 10_000,
        backoff_factor: 2.0,
        jitter_ms: 1_000,
        operation_timeout_ms: Some(30_000), // 30s for WebSocket operations
        immediate_first: true,
        max_elapsed_ms: Some(120_000), // 2 minutes total budget
    };
    RetryManager::new(config)
}

#[cfg(test)]
mod test_utils {
    #[derive(Debug, thiserror::Error)]
    pub enum TestError {
        #[error("Retryable error: {0}")]
        Retryable(String),
        #[error("Non-retryable error: {0}")]
        NonRetryable(String),
        #[error("Timeout error: {0}")]
        Timeout(String),
    }

    pub fn should_retry_test_error(error: &TestError) -> bool {
        matches!(error, TestError::Retryable(_))
    }

    pub fn create_test_error(msg: String) -> TestError {
        TestError::Timeout(msg)
    }
}

// Retry tests run under both real tokio (`#[tokio::test]`, paused-clock when
// the test relies on virtual time advance) and madsim (`#[madsim::test]`,
// virtual time always paused). `tokio::time::advance` has no direct madsim
// equivalent, so explicit clock advances route through `advance_clock` below;
// time reads and sleeps go through the `dst::time` re-export so they pick up
// the runtime-appropriate clock. madsim auto-advances virtual time when all
// tasks block, but `yield_until`-style busy-yield loops keep the runtime
// non-idle, so explicit advances are still needed where they were before.
#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    };

    #[cfg(all(feature = "simulation", madsim))]
    use madsim::task::{spawn, yield_now};
    use nautilus_core::MUTEX_POISONED;
    use rstest::rstest;
    #[cfg(not(all(feature = "simulation", madsim)))]
    use tokio::task::{spawn, yield_now};

    use super::{test_utils::*, *};
    use crate::dst::time;

    const MAX_WAIT_ITERS: usize = 10_000;
    const MAX_ADVANCE_ITERS: usize = 10_000;

    #[cfg(all(feature = "simulation", madsim))]
    pub(crate) async fn advance_clock(d: Duration) {
        madsim::time::advance(d);
        madsim::task::yield_now().await;
    }

    #[cfg(not(all(feature = "simulation", madsim)))]
    pub(crate) async fn advance_clock(d: Duration) {
        tokio::time::advance(d).await;
    }

    pub(crate) async fn yield_until<F>(mut condition: F)
    where
        F: FnMut() -> bool,
    {
        for _ in 0..MAX_WAIT_ITERS {
            if condition() {
                return;
            }
            yield_now().await;
        }

        panic!("yield_until timed out waiting for condition");
    }

    pub(crate) async fn advance_until<F>(mut condition: F)
    where
        F: FnMut() -> bool,
    {
        for _ in 0..MAX_ADVANCE_ITERS {
            if condition() {
                return;
            }
            advance_clock(Duration::from_millis(1)).await;
            yield_now().await;
        }

        panic!("advance_until timed out waiting for condition");
    }

    #[rstest]
    fn test_retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.initial_delay_ms, 1_000);
        assert_eq!(config.max_delay_ms, 10_000);
        #[allow(clippy::float_cmp)]
        {
            assert_eq!(config.backoff_factor, 2.0);
        }
        assert_eq!(config.jitter_ms, 100);
        assert_eq!(config.operation_timeout_ms, Some(30_000));
        assert!(!config.immediate_first);
        assert_eq!(config.max_elapsed_ms, None);
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_retry_manager_success_first_attempt() {
        let manager = RetryManager::new(RetryConfig::default());

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Ok::<i32, TestError>(42) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert_eq!(result.unwrap(), 42);
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_retry_manager_non_retryable_error() {
        let manager = RetryManager::new(RetryConfig::default());

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Err::<i32, TestError>(TestError::NonRetryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_retry_manager_retryable_error_exhausted() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Retryable(_)));
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_timeout_path() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let result = manager
            .execute_with_retry(
                "test_timeout",
                || async {
                    time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_max_elapsed_time_budget() {
        let config = RetryConfig {
            max_retries: 10,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(200),
        };
        let manager = RetryManager::new(config);

        let start = time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_budget",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        assert!(elapsed.as_millis() >= 150);
        assert!(elapsed.as_millis() < 1000);
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_budget_exceeded_message_format() {
        let config = RetryConfig {
            max_retries: 5,
            initial_delay_ms: 10,
            max_delay_ms: 20,
            backoff_factor: 1.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(35),
        };
        let manager = RetryManager::new(config);

        let result = manager
            .execute_with_retry(
                "test_budget_msg",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();

        assert!(error_msg.contains("Retry budget exceeded"));
        assert!(error_msg.contains("/6)"));

        if let Some(captures) = error_msg.strip_prefix("Timeout error: Retry budget exceeded (")
            && let Some(nums) = captures.strip_suffix(")")
        {
            let parts: Vec<&str> = nums.split('/').collect();
            assert_eq!(parts.len(), 2);
            let current: u32 = parts[0].parse().unwrap();
            let total: u32 = parts[1].parse().unwrap();

            assert_eq!(total, 6, "Total should be max_retries + 1");
            assert!(current <= total, "Current attempt should not exceed total");
            assert!(current >= 1, "Current attempt should be at least 1");
        }
    }

    #[cfg_attr(
        not(all(feature = "simulation", madsim)),
        tokio::test(start_paused = true)
    )]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_budget_exceeded_edge_cases() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 1.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(100),
        };
        let manager = RetryManager::new(config);

        let attempt_count = Arc::new(AtomicU32::new(0));
        let count_clone = attempt_count.clone();

        let handle = spawn(async move {
            manager
                .execute_with_retry(
                    "test_first_attempt",
                    move || {
                        let count = count_clone.clone();
                        async move {
                            count.fetch_add(1, Ordering::SeqCst);
                            Err::<i32, TestError>(TestError::Retryable("test".to_string()))
                        }
                    },
                    should_retry_test_error,
                    create_test_error,
                )
                .await
        });

        // Wait for first attempt
        yield_until(|| attempt_count.load(Ordering::SeqCst) >= 1).await;

        // Advance past budget to trigger check at loop start before second attempt
        advance_clock(Duration::from_millis(101)).await;
        yield_now().await;

        let result = handle.await.unwrap();
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();

        // Budget check happens at loop start, so shows (2/3) = "starting 2nd of 3 attempts"
        assert!(
            error_msg.contains("(2/3)"),
            "Expected (2/3) but got: {error_msg}"
        );
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_budget_exceeded_no_overflow() {
        let config = RetryConfig {
            max_retries: u32::MAX,
            initial_delay_ms: 10,
            max_delay_ms: 20,
            backoff_factor: 1.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(1),
        };
        let manager = RetryManager::new(config);

        let result = manager
            .execute_with_retry(
                "test_overflow",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();

        // Should saturate at u32::MAX instead of wrapping to 0
        assert!(error_msg.contains("Retry budget exceeded"));
        assert!(error_msg.contains(&format!("/{}", u32::MAX)));
    }

    #[rstest]
    fn test_http_retry_manager_config() {
        let manager = create_http_retry_manager::<TestError>();
        assert_eq!(manager.config.max_retries, 3);
        assert!(!manager.config.immediate_first);
        assert_eq!(manager.config.max_elapsed_ms, Some(180_000));
    }

    #[rstest]
    fn test_websocket_retry_manager_config() {
        let manager = create_websocket_retry_manager::<TestError>();
        assert_eq!(manager.config.max_retries, 5);
        assert!(manager.config.immediate_first);
        assert_eq!(manager.config.max_elapsed_ms, Some(120_000));
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_timeout_respects_retry_predicate() {
        let config = RetryConfig {
            max_retries: 3,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        // Test with retry predicate that rejects timeouts
        let should_not_retry_timeouts = |error: &TestError| !matches!(error, TestError::Timeout(_));

        let result = manager
            .execute_with_retry(
                "test_timeout_non_retryable",
                || async {
                    time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_not_retry_timeouts,
                create_test_error,
            )
            .await;

        // Should fail immediately without retries since timeout is non-retryable
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_timeout_retries_when_predicate_allows() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        // Test with retry predicate that allows timeouts
        let should_retry_timeouts = |error: &TestError| matches!(error, TestError::Timeout(_));

        let start = time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_timeout_retryable",
                || async {
                    time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_timeouts,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();

        // Should fail after retries (not immediately)
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        // Should have taken time for retries (at least 2 timeouts + delays)
        assert!(elapsed.as_millis() > 80); // More than just one timeout
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_successful_retry_after_failures() {
        let config = RetryConfig {
            max_retries: 3,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_eventual_success",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
                        if attempts < 2 {
                            Err(TestError::Retryable("temporary failure".to_string()))
                        } else {
                            Ok(42)
                        }
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert_eq!(result.unwrap(), 42);
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
    }

    #[cfg_attr(
        not(all(feature = "simulation", madsim)),
        tokio::test(start_paused = true)
    )]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_immediate_first_retry() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 100,
            max_delay_ms: 200,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: true,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
        let times_clone = attempt_times.clone();
        let start = time::Instant::now();

        let handle = spawn({
            let times_clone = times_clone.clone();
            async move {
                let _ = manager
                    .execute_with_retry(
                        "test_immediate",
                        move || {
                            let times = times_clone.clone();
                            async move {
                                times.lock().expect(MUTEX_POISONED).push(start.elapsed());
                                Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                            }
                        },
                        should_retry_test_error,
                        create_test_error,
                    )
                    .await;
            }
        });

        // Allow initial attempt and immediate retry to run without advancing time
        yield_until(|| attempt_times.lock().expect(MUTEX_POISONED).len() >= 2).await;

        // Advance time for the next backoff interval
        advance_clock(Duration::from_millis(100)).await;
        yield_now().await;

        // Wait for the final retry to be recorded
        yield_until(|| attempt_times.lock().expect(MUTEX_POISONED).len() >= 3).await;

        handle.await.unwrap();

        let times = attempt_times.lock().expect(MUTEX_POISONED);
        assert_eq!(times.len(), 3); // Initial + 2 retries

        // First retry should be immediate (within 1ms tolerance)
        assert!(times[1] <= Duration::from_millis(1));
        // Second retry should have backoff delay (at least 100ms from start)
        assert!(times[2] >= Duration::from_millis(100));
        assert!(times[2] <= Duration::from_millis(110));
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_operation_without_timeout() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None, // No timeout
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let start = time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_no_timeout",
                || async {
                    time::sleep(Duration::from_millis(50)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert_eq!(result.unwrap(), 42);
        // Should complete without timing out
        assert!(elapsed.as_millis() >= 30);
        assert!(elapsed.as_millis() < 200);
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_zero_retries() {
        let config = RetryConfig {
            max_retries: 0,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_no_retries",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        // Should only attempt once (no retries)
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 1);
    }

    #[cfg_attr(
        not(all(feature = "simulation", madsim)),
        tokio::test(start_paused = true)
    )]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_jitter_applied() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 50, // Significant jitter
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let delays = Arc::new(std::sync::Mutex::new(Vec::new()));
        let delays_clone = delays.clone();
        let last_time = Arc::new(std::sync::Mutex::new(time::Instant::now()));
        let last_time_clone = last_time.clone();

        let handle = spawn({
            let delays_clone = delays_clone.clone();
            async move {
                let _ = manager
                    .execute_with_retry(
                        "test_jitter",
                        move || {
                            let delays = delays_clone.clone();
                            let last_time = last_time_clone.clone();
                            async move {
                                let now = time::Instant::now();
                                let delay = {
                                    let mut last = last_time.lock().expect(MUTEX_POISONED);
                                    let d = now.duration_since(*last);
                                    *last = now;
                                    d
                                };
                                delays.lock().expect(MUTEX_POISONED).push(delay);
                                Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                            }
                        },
                        should_retry_test_error,
                        create_test_error,
                    )
                    .await;
            }
        });

        yield_until(|| !delays.lock().expect(MUTEX_POISONED).is_empty()).await;
        advance_until(|| delays.lock().expect(MUTEX_POISONED).len() >= 2).await;
        advance_until(|| delays.lock().expect(MUTEX_POISONED).len() >= 3).await;

        handle.await.unwrap();

        let delays = delays.lock().expect(MUTEX_POISONED);
        // Skip the first delay (initial attempt)
        for delay in delays.iter().skip(1) {
            // Each delay should be at least the base delay (50ms for first retry)
            assert!(delay.as_millis() >= 50);
            // But no more than base + jitter (allow small tolerance for step advance)
            assert!(delay.as_millis() <= 151);
        }
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_max_elapsed_stops_early() {
        let config = RetryConfig {
            max_retries: 100, // Very high retry count
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 1.5,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(150), // Should stop after ~3 attempts
        };
        let manager = RetryManager::new(config);

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let start = time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_elapsed_limit",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));

        // Should have stopped due to time limit, not retry count
        let attempts = attempt_counter.load(Ordering::SeqCst);
        assert!(attempts < 10); // Much less than max_retries
        assert!(elapsed.as_millis() >= 100);
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_mixed_errors_retry_behavior() {
        let config = RetryConfig {
            max_retries: 5,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_mixed_errors",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
                        match attempts {
                            0 => Err(TestError::Retryable("retry 1".to_string())),
                            1 => Err(TestError::Retryable("retry 2".to_string())),
                            2 => Err(TestError::NonRetryable("stop here".to_string())),
                            _ => Ok(42),
                        }
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
        // Should stop at the non-retryable error
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_cancellation_during_retry_delay() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig {
            max_retries: 10,
            initial_delay_ms: 500, // Long delay to ensure cancellation happens during sleep
            max_delay_ms: 1000,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let token = CancellationToken::new();
        let token_clone = token.clone();

        // Cancel after a short delay
        spawn(async move {
            time::sleep(Duration::from_millis(100)).await;
            token_clone.cancel();
        });

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let start = time::Instant::now();
        let result = manager
            .execute_with_retry_with_cancel(
                "test_cancellation",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        let elapsed = start.elapsed();

        // Should be canceled quickly
        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));

        // Should not have taken the full delay time
        assert!(elapsed.as_millis() < 600);

        // Should have made at least one attempt
        let attempts = attempt_counter.load(Ordering::SeqCst);
        assert!(attempts >= 1);
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_cancellation_during_operation_execution() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig {
            max_retries: 5,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let token = CancellationToken::new();
        let token_clone = token.clone();

        // Cancel after a short delay
        spawn(async move {
            time::sleep(Duration::from_millis(50)).await;
            token_clone.cancel();
        });

        let start = time::Instant::now();
        let result = manager
            .execute_with_retry_with_cancel(
                "test_cancellation_during_op",
                || async {
                    // Long-running operation
                    time::sleep(Duration::from_millis(200)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        let elapsed = start.elapsed();

        // Should be canceled during the operation
        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));

        // Should not have completed the long operation
        assert!(elapsed.as_millis() < 250);
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_cancellation_error_message() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig::default();
        let manager = RetryManager::new(config);

        let token = CancellationToken::new();
        token.cancel(); // Pre-cancel for immediate cancellation

        let result = manager
            .execute_with_retry_with_cancel(
                "test_operation",
                || async { Ok::<i32, TestError>(42) },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));
    }
}
1328
1329#[cfg(test)]
1330mod proptest_tests {
1331    use std::sync::{
1332        Arc,
1333        atomic::{AtomicU32, Ordering},
1334    };
1335
1336    #[cfg(all(feature = "simulation", madsim))]
1337    use madsim::task::spawn;
1338    use nautilus_core::MUTEX_POISONED;
1339    use proptest::prelude::*;
1340    // Import rstest attribute macro used within proptest! tests
1341    use rstest::rstest;
1342    #[cfg(not(all(feature = "simulation", madsim)))]
1343    use tokio::task::spawn;
1344
1345    #[cfg(not(all(feature = "simulation", madsim)))]
1346    use super::tests::{advance_until, yield_until};
1347    use super::{test_utils::*, tests::advance_clock, *};
1348    use crate::dst::time;
1349
1350    // Each proptest case constructs a runtime to drive the manager via
1351    // `block_on`. Under tokio, that runtime is paused so virtual sleeps
1352    // auto-advance; under madsim, the runtime is the deterministic sim
1353    // runtime, which also runs in virtual time. Both expose `block_on`.
1354    #[cfg(all(feature = "simulation", madsim))]
1355    fn build_paused_runtime() -> madsim::runtime::Runtime {
1356        madsim::runtime::Runtime::new()
1357    }
1358
1359    #[cfg(not(all(feature = "simulation", madsim)))]
1360    fn build_paused_runtime() -> tokio::runtime::Runtime {
1361        tokio::runtime::Builder::new_current_thread()
1362            .enable_time()
1363            .start_paused(true)
1364            .build()
1365            .unwrap()
1366    }
1367
1368    proptest! {
1369        #[rstest]
1370        fn test_retry_config_valid_ranges(
1371            max_retries in 0u32..100,
1372            initial_delay_ms in 1u64..10_000,
1373            max_delay_ms in 1u64..60_000,
1374            backoff_factor in 1.0f64..10.0,
1375            jitter_ms in 0u64..1_000,
1376            operation_timeout_ms in prop::option::of(1u64..120_000),
1377            immediate_first in any::<bool>(),
1378            max_elapsed_ms in prop::option::of(1u64..300_000)
1379        ) {
1380            // Ensure max_delay >= initial_delay for valid config
1381            let max_delay_ms = max_delay_ms.max(initial_delay_ms);
1382
1383            let config = RetryConfig {
1384                max_retries,
1385                initial_delay_ms,
1386                max_delay_ms,
1387                backoff_factor,
1388                jitter_ms,
1389                operation_timeout_ms,
1390                immediate_first,
1391                max_elapsed_ms,
1392            };
1393
1394            // Should always be able to create a RetryManager with valid config
1395            let _manager = RetryManager::<std::io::Error>::new(config);
1396        }
1397
1398        #[rstest]
1399        fn test_retry_attempts_bounded(
1400            max_retries in 0u32..5,
1401            initial_delay_ms in 1u64..10,
1402            backoff_factor in 1.0f64..2.0,
1403        ) {
1404            let rt = build_paused_runtime();
1405
1406            let config = RetryConfig {
1407                max_retries,
1408                initial_delay_ms,
1409                max_delay_ms: initial_delay_ms * 2,
1410                backoff_factor,
1411                jitter_ms: 0,
1412                operation_timeout_ms: None,
1413                immediate_first: false,
1414                max_elapsed_ms: None,
1415            };
1416
1417            let manager = RetryManager::new(config);
1418            let attempt_counter = Arc::new(AtomicU32::new(0));
1419            let counter_clone = attempt_counter.clone();
1420
1421            let _result = rt.block_on(manager.execute_with_retry(
1422                "prop_test",
1423                move || {
1424                    let counter = counter_clone.clone();
1425                    async move {
1426                        counter.fetch_add(1, Ordering::SeqCst);
1427                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1428                    }
1429                },
1430                |e: &TestError| matches!(e, TestError::Retryable(_)),
1431                TestError::Timeout,
1432            ));
1433
1434            let attempts = attempt_counter.load(Ordering::SeqCst);
1435            // Total attempts should be 1 (initial) + max_retries
1436            prop_assert_eq!(attempts, max_retries + 1);
1437        }
1438
        #[rstest]
        fn test_timeout_always_respected(
            timeout_ms in 10u64..50,
            operation_delay_ms in 60u64..100,
        ) {
            let rt = build_paused_runtime();

            let config = RetryConfig {
                max_retries: 0, // No retries to isolate timeout behavior
                initial_delay_ms: 10,
                max_delay_ms: 100,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: Some(timeout_ms),
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);

            let result = rt.block_on(async {
                let operation_future = manager.execute_with_retry(
                    "timeout_test",
                    move || async move {
                        time::sleep(Duration::from_millis(operation_delay_ms)).await;
                        Ok::<i32, TestError>(42)
                    },
                    |_: &TestError| true,
                    TestError::Timeout,
                );

                // Advance time to trigger timeout
                advance_clock(Duration::from_millis(timeout_ms + 10)).await;
                operation_future.await
            });

            // Operation should time out
            prop_assert!(result.is_err());
            prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        }
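
        // Property: once max_elapsed_ms is exceeded, the retry loop stops with
        // a timeout error even though max_retries has not been exhausted.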
        #[rstest]
        fn test_max_elapsed_always_respected(
            max_elapsed_ms in 20u64..50,
            delay_per_retry in 15u64..30,
            max_retries in 10u32..20,
        ) {
            let rt = build_paused_runtime();

            // Set up config where we would exceed max_elapsed_ms before max_retries
            let config = RetryConfig {
                max_retries,
                initial_delay_ms: delay_per_retry,
                max_delay_ms: delay_per_retry * 2,
                backoff_factor: 1.0, // No backoff to make timing predictable
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: Some(max_elapsed_ms),
            };

            let manager = RetryManager::new(config);
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();

            let result = rt.block_on(async {
                let operation_future = manager.execute_with_retry(
                    "elapsed_test",
                    move || {
                        let counter = counter_clone.clone();
                        async move {
                            counter.fetch_add(1, Ordering::SeqCst);
                            Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                        }
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    TestError::Timeout,
                );

                // Advance time past max_elapsed_ms
                advance_clock(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
                operation_future.await
            });

            let attempts = attempt_counter.load(Ordering::SeqCst);

            // Should have failed with timeout error
            prop_assert!(result.is_err());
            prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));

            // Should stop at or under the full retry budget (typically well
            // before it, since max_elapsed_ms expires first)
            prop_assert!(attempts <= max_retries + 1);
        }
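
        // Property: with backoff_factor = 1.0, every retry delay falls within
        // [base_delay_ms, base_delay_ms + jitter_ms], give or take 1ms rounding.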
        #[rstest]
        fn test_jitter_bounds(
            jitter_ms in 0u64..20,
            base_delay_ms in 10u64..30,
        ) {
            let rt = build_paused_runtime();

            let config = RetryConfig {
                max_retries: 2,
                initial_delay_ms: base_delay_ms,
                max_delay_ms: base_delay_ms * 2,
                backoff_factor: 1.0, // No backoff to isolate jitter
                jitter_ms,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);
            let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
            let attempt_times_for_block = attempt_times.clone();

            rt.block_on(async move {
                #[cfg(not(all(feature = "simulation", madsim)))]
                let attempt_times_for_wait = attempt_times_for_block.clone();
                let handle = spawn({
                    let attempt_times_for_task = attempt_times_for_block.clone();
                    let manager = manager;
                    async move {
                        let start_time = time::Instant::now();
                        let _ = manager
                            .execute_with_retry(
                                "jitter_test",
                                move || {
                                    let attempt_times_inner = attempt_times_for_task.clone();
                                    async move {
                                        attempt_times_inner
                                            .lock()
                                            .expect(MUTEX_POISONED)
                                            .push(start_time.elapsed());
                                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                                    }
                                },
                                |e: &TestError| matches!(e, TestError::Retryable(_)),
                                TestError::Timeout,
                            )
                            .await;
                    }
                });

                // Under tokio paused clock, drive virtual time forward in 1ms
                // ticks to release the manager's sleeps; under madsim the
                // runtime auto-advances when all tasks block on virtual time,
                // so awaiting the handle is enough and yields exact timings.
                #[cfg(not(all(feature = "simulation", madsim)))]
                {
                    yield_until(|| !attempt_times_for_wait.lock().expect(MUTEX_POISONED).is_empty()).await;
                    advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 2).await;
                    advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 3).await;
                }

                handle.await.unwrap();
            });

            let times = attempt_times.lock().expect(MUTEX_POISONED);

            // We expect at least 2 attempts total (initial + at least 1 retry)
            prop_assert!(times.len() >= 2);

            // First attempt should be immediate (no delay)
            prop_assert!(times[0].as_millis() < 5);

            // Check subsequent retries have appropriate delays
            for i in 1..times.len() {
                let delay_from_previous = times[i].checked_sub(times[i - 1]).unwrap();

                // The delay should be at least base_delay_ms
                prop_assert!(
                    delay_from_previous.as_millis() >= u128::from(base_delay_ms),
                    "Retry {} delay {}ms is less than base {}ms",
                    i, delay_from_previous.as_millis(), base_delay_ms
                );

                // Delay should be at most base_delay + jitter (1ms slack for rounding)
                prop_assert!(
                    delay_from_previous.as_millis() <= u128::from(base_delay_ms + jitter_ms + 1),
                    "Retry {} delay {}ms exceeds base {} + jitter {}",
                    i, delay_from_previous.as_millis(), base_delay_ms, jitter_ms
                );
            }
        }
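
        // Property: immediate_first = true fires the first retry without delay,
        // while immediate_first = false waits roughly initial_delay_ms.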
        #[rstest]
        fn test_immediate_first_property(
            immediate_first in any::<bool>(),
            initial_delay_ms in 10u64..30,
        ) {
            let rt = build_paused_runtime();

            let config = RetryConfig {
                max_retries: 2,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 2,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);
            let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
            let attempt_times_for_block = attempt_times.clone();

            rt.block_on(async move {
                #[cfg(not(all(feature = "simulation", madsim)))]
                let attempt_times_for_wait = attempt_times_for_block.clone();
                let handle = spawn({
                    let attempt_times_for_task = attempt_times_for_block.clone();
                    let manager = manager;
                    async move {
                        let start = time::Instant::now();
                        let _ = manager
                            .execute_with_retry(
                                "immediate_test",
                                move || {
                                    let attempt_times_inner = attempt_times_for_task.clone();
                                    async move {
                                        let elapsed = start.elapsed();
                                        attempt_times_inner.lock().expect(MUTEX_POISONED).push(elapsed);
                                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                                    }
                                },
                                |e: &TestError| matches!(e, TestError::Retryable(_)),
                                TestError::Timeout,
                            )
                            .await;
                    }
                });

                // See test_jitter_bounds: madsim auto-advances virtual time
                // when all tasks block on it, so awaiting the handle suffices
                // and avoids the 1ms-tick driver's added scheduler overhead.
                #[cfg(not(all(feature = "simulation", madsim)))]
                {
                    yield_until(|| !attempt_times_for_wait.lock().expect(MUTEX_POISONED).is_empty()).await;
                    advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 2).await;
                    advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 3).await;
                }

                handle.await.unwrap();
            });

            let times = attempt_times.lock().expect(MUTEX_POISONED);
            prop_assert!(times.len() >= 2);

            if immediate_first {
                // First retry should be immediate
                prop_assert!(times[1].as_millis() < 20,
                    "With immediate_first=true, first retry took {}ms",
                    times[1].as_millis());
            } else {
                // First retry should have delay
                prop_assert!(times[1].as_millis() >= u128::from(initial_delay_ms - 1),
                    "With immediate_first=false, first retry was too fast: {}ms",
                    times[1].as_millis());
            }
        }
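
        // Property: the first non-retryable error aborts the loop immediately,
        // so the attempt count equals the failing attempt's index + 1.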
        #[rstest]
        fn test_non_retryable_stops_immediately(
            attempt_before_non_retryable in 0usize..3,
            max_retries in 3u32..5,
        ) {
            let rt = build_paused_runtime();

            let config = RetryConfig {
                max_retries,
                initial_delay_ms: 10,
                max_delay_ms: 100,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();

            let result: Result<i32, TestError> = rt.block_on(manager.execute_with_retry(
                "non_retryable_test",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst) as usize;
                        if attempts == attempt_before_non_retryable {
                            Err(TestError::NonRetryable("stop".to_string()))
                        } else {
                            Err(TestError::Retryable("retry".to_string()))
                        }
                    }
                },
                |e: &TestError| matches!(e, TestError::Retryable(_)),
                TestError::Timeout,
            ));

            let attempts = attempt_counter.load(Ordering::SeqCst) as usize;

            prop_assert!(result.is_err());
            prop_assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
            // Should stop exactly when non-retryable error occurs
            prop_assert_eq!(attempts, attempt_before_non_retryable + 1);
        }
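
        // Property: cancelling the token during backoff terminates the retry
        // loop with a "canceled" error long before the retry budget is spent.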
        #[rstest]
        fn test_cancellation_stops_immediately(
            cancel_after_ms in 10u64..100,
            initial_delay_ms in 200u64..500,
        ) {
            use tokio_util::sync::CancellationToken;

            let rt = build_paused_runtime();

            let config = RetryConfig {
                max_retries: 10,
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 2,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);
            let token = CancellationToken::new();
            let token_clone = token.clone();

            let result: Result<i32, TestError> = rt.block_on(async {
                // Spawn cancellation task
                spawn(async move {
                    time::sleep(Duration::from_millis(cancel_after_ms)).await;
                    token_clone.cancel();
                });

                let operation_future = manager.execute_with_retry_with_cancel(
                    "cancellation_test",
                    || async {
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                    &token,
                );

                // Advance time to trigger cancellation
                advance_clock(Duration::from_millis(cancel_after_ms + 10)).await;
                operation_future.await
            });

            // Should be canceled
            prop_assert!(result.is_err());
            let error_msg = format!("{}", result.unwrap_err());
            prop_assert!(error_msg.contains("canceled"));
        }
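
        // Smoke test: configures the next backoff delay to exceed the remaining
        // max_elapsed_ms budget; only termination is exercised here, so no
        // result value is asserted.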
        #[rstest]
        fn test_budget_clamp_prevents_overshoot(
            max_elapsed_ms in 10u64..30,
            delay_per_retry in 20u64..50,
        ) {
            let rt = build_paused_runtime();

            // Configure so that first retry delay would exceed budget
            let config = RetryConfig {
                max_retries: 5,
                initial_delay_ms: delay_per_retry,
                max_delay_ms: delay_per_retry * 2,
                backoff_factor: 1.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: Some(max_elapsed_ms),
            };

            let manager = RetryManager::new(config);

            let _result = rt.block_on(async {
                let operation_future = manager.execute_with_retry(
                    "budget_clamp_test",
                    || async {
                        // Fast operation to focus on delay timing
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                );

                // Advance time past max_elapsed_ms
                advance_clock(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
                operation_future.await
            });

            // Under the paused clock the operation completes without wall-clock
            // delay; the behavior exercised is that the retry manager clamps the
            // backoff delay to the remaining budget and terminates rather than
            // sleeping past max_elapsed_ms.
        }
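
        // Property: an operation that first succeeds on its k-th attempt
        // returns Ok(42) after exactly k attempts, with retry budget to spare.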
        #[rstest]
        fn test_success_on_kth_attempt(
            k in 1usize..5,
            initial_delay_ms in 5u64..20,
        ) {
            let rt = build_paused_runtime();

            let config = RetryConfig {
                max_retries: 10, // More than k
                initial_delay_ms,
                max_delay_ms: initial_delay_ms * 4,
                backoff_factor: 2.0,
                jitter_ms: 0,
                operation_timeout_ms: None,
                immediate_first: false,
                max_elapsed_ms: None,
            };

            let manager = RetryManager::new(config);
            let attempt_counter = Arc::new(AtomicU32::new(0));
            let counter_clone = attempt_counter.clone();
            let target_k = k;

            let (result, _elapsed) = rt.block_on(async {
                let start = time::Instant::now();

                let operation_future = manager.execute_with_retry(
                    "kth_attempt_test",
                    move || {
                        let counter = counter_clone.clone();
                        async move {
                            let attempt = counter.fetch_add(1, Ordering::SeqCst) as usize;
                            if attempt + 1 == target_k {
                                Ok(42)
                            } else {
                                Err(TestError::Retryable("retry".to_string()))
                            }
                        }
                    },
                    |e: &TestError| matches!(e, TestError::Retryable(_)),
                    create_test_error,
                );

                // Advance time to allow enough retries
                for _ in 0..k {
                    advance_clock(Duration::from_millis(initial_delay_ms * 4)).await;
                }

                let result = operation_future.await;
                let elapsed = start.elapsed();

                (result, elapsed)
            });

            let attempts = attempt_counter.load(Ordering::SeqCst) as usize;

            // Using paused Tokio time (start_paused + advance); assert behavior only (no wall-clock timing)
            prop_assert!(result.is_ok());
            prop_assert_eq!(result.unwrap(), 42);
            prop_assert_eq!(attempts, k);
        }
    }
}