1use std::{future::Future, marker::PhantomData, time::Duration};
19
20use serde::{Deserialize, Serialize};
21use tokio_util::sync::CancellationToken;
22
23use crate::{backoff::ExponentialBackoff, dst};
24
/// Configuration for retry behavior with exponential backoff.
///
/// All durations are expressed in milliseconds. Deserializes with
/// per-field defaults and rejects unknown keys.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct RetryConfig {
    /// Maximum number of retries after the initial attempt
    /// (so `max_retries + 1` total attempts).
    pub max_retries: u32,
    /// Initial backoff delay before the first (non-immediate) retry.
    pub initial_delay_ms: u64,
    /// Upper bound on the backoff delay.
    pub max_delay_ms: u64,
    /// Multiplier applied to the delay after each retry.
    pub backoff_factor: f64,
    /// Maximum random jitter added to each backoff delay.
    pub jitter_ms: u64,
    /// Per-attempt timeout; `None` lets each attempt run indefinitely.
    pub operation_timeout_ms: Option<u64>,
    /// When `true`, the first retry fires immediately with no backoff delay.
    pub immediate_first: bool,
    /// Overall time budget across all attempts (including delays);
    /// `None` means no overall limit.
    pub max_elapsed_ms: Option<u64>,
}
49
50impl Default for RetryConfig {
51 fn default() -> Self {
52 Self {
53 max_retries: 3,
54 initial_delay_ms: 1_000,
55 max_delay_ms: 10_000,
56 backoff_factor: 2.0,
57 jitter_ms: 100,
58 operation_timeout_ms: Some(30_000),
59 immediate_first: false,
60 max_elapsed_ms: None,
61 }
62 }
63}
64
/// Drives retries of fallible async operations according to a [`RetryConfig`].
///
/// The type parameter `E` is the error type produced by the operations being
/// retried; no value of `E` is stored — it is pinned via `PhantomData`.
#[derive(Clone, Debug)]
pub struct RetryManager<E> {
    /// Retry/backoff/timeout settings applied to every operation.
    config: RetryConfig,
    /// Carries the error type parameter without storing a value.
    _phantom: PhantomData<E>,
}
73
impl<E> RetryManager<E>
where
    E: std::error::Error,
{
    /// Creates a new retry manager with the given configuration.
    #[must_use]
    pub const fn new(config: RetryConfig) -> Self {
        Self {
            config,
            _phantom: PhantomData,
        }
    }

    /// Formats the error message emitted when the overall elapsed-time budget
    /// is exhausted.
    ///
    /// Counts are reported 1-based (`attempt + 1` of `max_retries + 1` total
    /// attempts); saturating adds guard against overflow when `max_retries`
    /// is `u32::MAX`.
    #[inline(always)]
    fn budget_exceeded_msg(&self, attempt: u32) -> String {
        format!(
            "Retry budget exceeded ({}/{})",
            attempt.saturating_add(1),
            self.config.max_retries.saturating_add(1)
        )
    }

    /// Core retry loop shared by the public entry points.
    ///
    /// Repeatedly invokes `operation`, retrying failures for which
    /// `should_retry` returns `true`, until one of: success, a non-retryable
    /// error, `max_retries` exhausted, the `max_elapsed_ms` budget exceeded,
    /// or `cancel` fires. Per-attempt timeouts (`operation_timeout_ms`) are
    /// converted into errors via `create_error` and run through the same
    /// `should_retry` predicate as ordinary failures.
    pub async fn execute_with_retry_inner<F, Fut, T>(
        &self,
        operation_name: &str,
        mut operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
        cancel: Option<&CancellationToken>,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        // Invalid backoff parameters surface immediately as a config error.
        let mut backoff = ExponentialBackoff::new(
            Duration::from_millis(self.config.initial_delay_ms),
            Duration::from_millis(self.config.max_delay_ms),
            self.config.backoff_factor,
            self.config.jitter_ms,
            self.config.immediate_first,
        )
        .map_err(|e| create_error(format!("Invalid configuration: {e}")))?;

        let mut attempt = 0;
        let start_time = dst::time::Instant::now();

        loop {
            // Bail out up front if the caller has already canceled.
            if let Some(token) = cancel
                && token.is_cancelled()
            {
                log::debug!("Operation '{operation_name}' canceled after {attempt} attempts");
                return Err(create_error("canceled".to_string()));
            }

            // Enforce the overall elapsed-time budget before starting another attempt.
            if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                let elapsed = start_time.elapsed();
                if elapsed.as_millis() >= u128::from(max_elapsed_ms) {
                    return Err(create_error(self.budget_exceeded_msg(attempt)));
                }
            }

            // Run one attempt, racing it against the per-attempt timeout and/or
            // cancellation. `biased` checks branches in declaration order, so a
            // completed attempt wins over a simultaneous cancel. The outer `Err`
            // of `result` means the attempt timed out; the inner result is the
            // operation's own outcome.
            let result = match (self.config.operation_timeout_ms, cancel) {
                (Some(timeout_ms), Some(token)) => {
                    tokio::select! {
                        biased;
                        result = dst::time::timeout(Duration::from_millis(timeout_ms), operation()) => result,
                        () = token.cancelled() => {
                            log::debug!("Operation '{operation_name}' canceled during execution");
                            return Err(create_error("canceled".to_string()));
                        }
                    }
                }
                (Some(timeout_ms), None) => {
                    dst::time::timeout(Duration::from_millis(timeout_ms), operation()).await
                }
                (None, Some(token)) => {
                    tokio::select! {
                        biased;
                        result = operation() => Ok(result),
                        () = token.cancelled() => {
                            log::debug!("Operation '{operation_name}' canceled during execution");
                            return Err(create_error("canceled".to_string()));
                        }
                    }
                }
                (None, None) => Ok(operation().await),
            };

            match result {
                Ok(Ok(success)) => {
                    if attempt > 0 {
                        log::trace!(
                            "Operation '{operation_name}' succeeded after {} attempts",
                            attempt + 1
                        );
                    }
                    return Ok(success);
                }
                // Operation completed but failed: retry only if the predicate
                // allows and the retry budget is not exhausted.
                Ok(Err(e)) => {
                    if !should_retry(&e) {
                        log::trace!("Operation '{operation_name}' non-retryable error: {e}");
                        return Err(e);
                    }

                    if attempt >= self.config.max_retries {
                        log::trace!(
                            "Operation '{operation_name}' retries exhausted after {} attempts: {e}",
                            attempt + 1
                        );
                        return Err(e);
                    }

                    let mut delay = backoff.next_duration();

                    // Clamp the delay so we never sleep past the overall budget;
                    // if the budget is already spent, fail now.
                    if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                        let elapsed = start_time.elapsed();
                        let remaining =
                            Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);

                        if remaining.is_zero() {
                            return Err(create_error(self.budget_exceeded_msg(attempt)));
                        }

                        delay = delay.min(remaining);
                    }

                    log::trace!(
                        "Operation '{operation_name}' attempt {} failed, retrying in {}ms: {e}",
                        attempt + 1,
                        delay.as_millis()
                    );

                    // Zero delay (e.g. `immediate_first`): yield once so other
                    // tasks can run, then retry immediately.
                    if delay.is_zero() {
                        tokio::task::yield_now().await;
                        attempt += 1;
                        continue;
                    }

                    // Sleep out the backoff, racing against cancellation when a
                    // token was supplied.
                    if let Some(token) = cancel {
                        tokio::select! {
                            biased;
                            () = dst::time::sleep(delay) => {},
                            () = token.cancelled() => {
                                log::debug!("Operation '{operation_name}' canceled during retry delay (attempt {})", attempt + 1);
                                return Err(create_error("canceled".to_string()));
                            }
                        }
                    } else {
                        dst::time::sleep(delay).await;
                    }
                    attempt += 1;
                }
                // Attempt timed out: synthesize an error and run it through the
                // same retry policy as an ordinary failure.
                Err(_) => {
                    // NOTE: `unwrap_or(0)` is unreachable in practice — a timeout
                    // can only occur when `operation_timeout_ms` is `Some`.
                    let e = create_error(format!(
                        "Timed out after {}ms",
                        self.config.operation_timeout_ms.unwrap_or(0)
                    ));

                    if !should_retry(&e) {
                        log::trace!("Operation '{operation_name}' non-retryable timeout: {e}");
                        return Err(e);
                    }

                    if attempt >= self.config.max_retries {
                        log::trace!(
                            "Operation '{operation_name}' retries exhausted after timeout ({} attempts): {e}",
                            attempt + 1
                        );
                        return Err(e);
                    }

                    let mut delay = backoff.next_duration();

                    // Same budget clamp as the failure path above.
                    if let Some(max_elapsed_ms) = self.config.max_elapsed_ms {
                        let elapsed = start_time.elapsed();
                        let remaining =
                            Duration::from_millis(max_elapsed_ms).saturating_sub(elapsed);

                        if remaining.is_zero() {
                            return Err(create_error(self.budget_exceeded_msg(attempt)));
                        }

                        delay = delay.min(remaining);
                    }

                    log::trace!(
                        "Operation '{operation_name}' attempt {} timed out, retrying in {}ms: {e}",
                        attempt + 1,
                        delay.as_millis()
                    );

                    if delay.is_zero() {
                        tokio::task::yield_now().await;
                        attempt += 1;
                        continue;
                    }

                    if let Some(token) = cancel {
                        tokio::select! {
                            biased;
                            () = dst::time::sleep(delay) => {},
                            () = token.cancelled() => {
                                log::debug!("Operation '{operation_name}' canceled during retry delay (attempt {})", attempt + 1);
                                return Err(create_error("canceled".to_string()));
                            }
                        }
                    } else {
                        dst::time::sleep(delay).await;
                    }
                    attempt += 1;
                }
            }
        }
    }

    /// Executes `operation` with retries and no cancellation token.
    ///
    /// See [`Self::execute_with_retry_inner`] for the full retry semantics.
    pub async fn execute_with_retry<F, Fut, T>(
        &self,
        operation_name: &str,
        operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        self.execute_with_retry_inner(operation_name, operation, should_retry, create_error, None)
            .await
    }

    /// Executes `operation` with retries, aborting promptly (between attempts,
    /// during an attempt, or during a retry delay) when `cancellation_token`
    /// fires.
    ///
    /// See [`Self::execute_with_retry_inner`] for the full retry semantics.
    pub async fn execute_with_retry_with_cancel<F, Fut, T>(
        &self,
        operation_name: &str,
        operation: F,
        should_retry: impl Fn(&E) -> bool,
        create_error: impl Fn(String) -> E,
        cancellation_token: &CancellationToken,
    ) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        self.execute_with_retry_inner(
            operation_name,
            operation,
            should_retry,
            create_error,
            Some(cancellation_token),
        )
        .await
    }
}
354
355#[must_use]
357pub fn create_default_retry_manager<E>() -> RetryManager<E>
358where
359 E: std::error::Error,
360{
361 RetryManager::new(RetryConfig::default())
362}
363
364#[must_use]
366pub const fn create_http_retry_manager<E>() -> RetryManager<E>
367where
368 E: std::error::Error,
369{
370 let config = RetryConfig {
371 max_retries: 3,
372 initial_delay_ms: 1_000,
373 max_delay_ms: 10_000,
374 backoff_factor: 2.0,
375 jitter_ms: 1_000,
376 operation_timeout_ms: Some(60_000), immediate_first: false,
378 max_elapsed_ms: Some(180_000), };
380 RetryManager::new(config)
381}
382
383#[must_use]
385pub const fn create_websocket_retry_manager<E>() -> RetryManager<E>
386where
387 E: std::error::Error,
388{
389 let config = RetryConfig {
390 max_retries: 5,
391 initial_delay_ms: 1_000,
392 max_delay_ms: 10_000,
393 backoff_factor: 2.0,
394 jitter_ms: 1_000,
395 operation_timeout_ms: Some(30_000), immediate_first: true,
397 max_elapsed_ms: Some(120_000), };
399 RetryManager::new(config)
400}
401
#[cfg(test)]
mod test_utils {
    /// Error type used by the retry tests, with one variant per retry category.
    #[derive(Debug, thiserror::Error)]
    pub enum TestError {
        #[error("Retryable error: {0}")]
        Retryable(String),
        #[error("Non-retryable error: {0}")]
        NonRetryable(String),
        #[error("Timeout error: {0}")]
        Timeout(String),
    }

    /// Retry predicate used by the tests: only `Retryable` errors are retried.
    pub fn should_retry_test_error(error: &TestError) -> bool {
        match error {
            TestError::Retryable(_) => true,
            TestError::NonRetryable(_) | TestError::Timeout(_) => false,
        }
    }

    /// Error factory handed to the retry manager; wraps messages as timeouts.
    pub fn create_test_error(msg: String) -> TestError {
        TestError::Timeout(msg)
    }
}
422
#[cfg(test)]
mod tests {
    //! Retry-manager tests, runnable either on tokio or under madsim
    //! deterministic simulation (`feature = "simulation"` + `madsim` cfg).

    use std::sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    };

    #[cfg(all(feature = "simulation", madsim))]
    use madsim::task::{spawn, yield_now};
    use nautilus_core::MUTEX_POISONED;
    use rstest::rstest;
    #[cfg(not(all(feature = "simulation", madsim)))]
    use tokio::task::{spawn, yield_now};

    use super::{test_utils::*, *};
    use crate::dst::time;

    // Iteration caps so the polling helpers below panic instead of hanging.
    const MAX_WAIT_ITERS: usize = 10_000;
    const MAX_ADVANCE_ITERS: usize = 10_000;

    /// Advances the simulated clock by `d` (madsim variant).
    #[cfg(all(feature = "simulation", madsim))]
    pub(crate) async fn advance_clock(d: Duration) {
        madsim::time::advance(d);
        madsim::task::yield_now().await;
    }

    /// Advances tokio's test clock by `d` (requires a paused clock,
    /// e.g. `tokio::test(start_paused = true)`).
    #[cfg(not(all(feature = "simulation", madsim)))]
    pub(crate) async fn advance_clock(d: Duration) {
        tokio::time::advance(d).await;
    }

    /// Yields to the scheduler until `condition` holds; panics after
    /// `MAX_WAIT_ITERS` yields. Does not advance the clock.
    pub(crate) async fn yield_until<F>(mut condition: F)
    where
        F: FnMut() -> bool,
    {
        for _ in 0..MAX_WAIT_ITERS {
            if condition() {
                return;
            }
            yield_now().await;
        }

        panic!("yield_until timed out waiting for condition");
    }

    /// Advances the clock 1ms at a time until `condition` holds; panics after
    /// `MAX_ADVANCE_ITERS` steps.
    pub(crate) async fn advance_until<F>(mut condition: F)
    where
        F: FnMut() -> bool,
    {
        for _ in 0..MAX_ADVANCE_ITERS {
            if condition() {
                return;
            }
            advance_clock(Duration::from_millis(1)).await;
            yield_now().await;
        }

        panic!("advance_until timed out waiting for condition");
    }

    #[rstest]
    fn test_retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.initial_delay_ms, 1_000);
        assert_eq!(config.max_delay_ms, 10_000);
        #[allow(clippy::float_cmp)]
        {
            assert_eq!(config.backoff_factor, 2.0);
        }
        assert_eq!(config.jitter_ms, 100);
        assert_eq!(config.operation_timeout_ms, Some(30_000));
        assert!(!config.immediate_first);
        assert_eq!(config.max_elapsed_ms, None);
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_retry_manager_success_first_attempt() {
        let manager = RetryManager::new(RetryConfig::default());

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Ok::<i32, TestError>(42) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert_eq!(result.unwrap(), 42);
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_retry_manager_non_retryable_error() {
        let manager = RetryManager::new(RetryConfig::default());

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Err::<i32, TestError>(TestError::NonRetryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_retry_manager_retryable_error_exhausted() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let result = manager
            .execute_with_retry(
                "test_operation",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        // The last operation error (not a synthesized one) is returned.
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Retryable(_)));
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_timeout_path() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        // Operation sleeps past the 50ms per-attempt timeout on every attempt.
        let result = manager
            .execute_with_retry(
                "test_timeout",
                || async {
                    time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_max_elapsed_time_budget() {
        let config = RetryConfig {
            max_retries: 10,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(200),
        };
        let manager = RetryManager::new(config);

        let start = time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_budget",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        // The 200ms budget must stop retries well before the 10-retry limit.
        let elapsed = start.elapsed();
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        assert!(elapsed.as_millis() >= 150);
        assert!(elapsed.as_millis() < 1000);
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_budget_exceeded_message_format() {
        let config = RetryConfig {
            max_retries: 5,
            initial_delay_ms: 10,
            max_delay_ms: 20,
            backoff_factor: 1.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(35),
        };
        let manager = RetryManager::new(config);

        let result = manager
            .execute_with_retry(
                "test_budget_msg",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();

        assert!(error_msg.contains("Retry budget exceeded"));
        assert!(error_msg.contains("/6)"));

        // Parse "(current/total)" out of the message and sanity-check both
        // numbers (1-based, total = max_retries + 1).
        if let Some(captures) = error_msg.strip_prefix("Timeout error: Retry budget exceeded (")
            && let Some(nums) = captures.strip_suffix(")")
        {
            let parts: Vec<&str> = nums.split('/').collect();
            assert_eq!(parts.len(), 2);
            let current: u32 = parts[0].parse().unwrap();
            let total: u32 = parts[1].parse().unwrap();

            assert_eq!(total, 6, "Total should be max_retries + 1");
            assert!(current <= total, "Current attempt should not exceed total");
            assert!(current >= 1, "Current attempt should be at least 1");
        }
    }

    #[cfg_attr(
        not(all(feature = "simulation", madsim)),
        tokio::test(start_paused = true)
    )]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_budget_exceeded_edge_cases() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 1.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(100),
        };
        let manager = RetryManager::new(config);

        let attempt_count = Arc::new(AtomicU32::new(0));
        let count_clone = attempt_count.clone();

        let handle = spawn(async move {
            manager
                .execute_with_retry(
                    "test_first_attempt",
                    move || {
                        let count = count_clone.clone();
                        async move {
                            count.fetch_add(1, Ordering::SeqCst);
                            Err::<i32, TestError>(TestError::Retryable("test".to_string()))
                        }
                    },
                    should_retry_test_error,
                    create_test_error,
                )
                .await
        });

        // Let the first attempt run, then push the clock past the 100ms budget.
        yield_until(|| attempt_count.load(Ordering::SeqCst) >= 1).await;

        advance_clock(Duration::from_millis(101)).await;
        yield_now().await;

        let result = handle.await.unwrap();
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();

        assert!(
            error_msg.contains("(2/3)"),
            "Expected (2/3) but got: {error_msg}"
        );
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_budget_exceeded_no_overflow() {
        // max_retries = u32::MAX exercises the saturating adds in the
        // budget-exceeded message.
        let config = RetryConfig {
            max_retries: u32::MAX,
            initial_delay_ms: 10,
            max_delay_ms: 20,
            backoff_factor: 1.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(1),
        };
        let manager = RetryManager::new(config);

        let result = manager
            .execute_with_retry(
                "test_overflow",
                || async { Err::<i32, TestError>(TestError::Retryable("test".to_string())) },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();

        assert!(error_msg.contains("Retry budget exceeded"));
        assert!(error_msg.contains(&format!("/{}", u32::MAX)));
    }

    #[rstest]
    fn test_http_retry_manager_config() {
        let manager = create_http_retry_manager::<TestError>();
        assert_eq!(manager.config.max_retries, 3);
        assert!(!manager.config.immediate_first);
        assert_eq!(manager.config.max_elapsed_ms, Some(180_000));
    }

    #[rstest]
    fn test_websocket_retry_manager_config() {
        let manager = create_websocket_retry_manager::<TestError>();
        assert_eq!(manager.config.max_retries, 5);
        assert!(manager.config.immediate_first);
        assert_eq!(manager.config.max_elapsed_ms, Some(120_000));
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_timeout_respects_retry_predicate() {
        let config = RetryConfig {
            max_retries: 3,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        // Predicate rejects timeouts, so the first timeout must end the loop.
        let should_not_retry_timeouts = |error: &TestError| !matches!(error, TestError::Timeout(_));

        let result = manager
            .execute_with_retry(
                "test_timeout_non_retryable",
                || async {
                    time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_not_retry_timeouts,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_timeout_retries_when_predicate_allows() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(50),
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let should_retry_timeouts = |error: &TestError| matches!(error, TestError::Timeout(_));

        let start = time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_timeout_retryable",
                || async {
                    time::sleep(Duration::from_millis(100)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_timeouts,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();

        // Multiple timed-out attempts plus delays: total must exceed one
        // attempt's 50ms timeout.
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
        assert!(elapsed.as_millis() > 80);
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_successful_retry_after_failures() {
        let config = RetryConfig {
            max_retries: 3,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        // Fails twice, then succeeds on the third attempt.
        let result = manager
            .execute_with_retry(
                "test_eventual_success",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
                        if attempts < 2 {
                            Err(TestError::Retryable("temporary failure".to_string()))
                        } else {
                            Ok(42)
                        }
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert_eq!(result.unwrap(), 42);
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
    }

    #[cfg_attr(
        not(all(feature = "simulation", madsim)),
        tokio::test(start_paused = true)
    )]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_immediate_first_retry() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 100,
            max_delay_ms: 200,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: true,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        // Record the elapsed time at each attempt to verify delay placement.
        let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
        let times_clone = attempt_times.clone();
        let start = time::Instant::now();

        let handle = spawn({
            let times_clone = times_clone.clone();
            async move {
                let _ = manager
                    .execute_with_retry(
                        "test_immediate",
                        move || {
                            let times = times_clone.clone();
                            async move {
                                times.lock().expect(MUTEX_POISONED).push(start.elapsed());
                                Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                            }
                        },
                        should_retry_test_error,
                        create_test_error,
                    )
                    .await;
            }
        });

        // First two attempts happen without any clock advance (immediate_first),
        // the third only after advancing 100ms.
        yield_until(|| attempt_times.lock().expect(MUTEX_POISONED).len() >= 2).await;

        advance_clock(Duration::from_millis(100)).await;
        yield_now().await;

        yield_until(|| attempt_times.lock().expect(MUTEX_POISONED).len() >= 3).await;

        handle.await.unwrap();

        let times = attempt_times.lock().expect(MUTEX_POISONED);
        assert_eq!(times.len(), 3);
        assert!(times[1] <= Duration::from_millis(1));
        assert!(times[2] >= Duration::from_millis(100));
        assert!(times[2] <= Duration::from_millis(110));
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_operation_without_timeout() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        // With no per-attempt timeout, a 50ms operation completes normally.
        let start = time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_no_timeout",
                || async {
                    time::sleep(Duration::from_millis(50)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert_eq!(result.unwrap(), 42);
        assert!(elapsed.as_millis() >= 30);
        assert!(elapsed.as_millis() < 200);
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_zero_retries() {
        let config = RetryConfig {
            max_retries: 0,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let result = manager
            .execute_with_retry(
                "test_no_retries",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        // max_retries = 0 means exactly one attempt, no retries.
        assert!(result.is_err());
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 1);
    }

    #[cfg_attr(
        not(all(feature = "simulation", madsim)),
        tokio::test(start_paused = true)
    )]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_jitter_applied() {
        let config = RetryConfig {
            max_retries: 2,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            jitter_ms: 50,
            operation_timeout_ms: None,
            backoff_factor: 2.0,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        // Measure the gap between consecutive attempts to observe the jitter.
        let delays = Arc::new(std::sync::Mutex::new(Vec::new()));
        let delays_clone = delays.clone();
        let last_time = Arc::new(std::sync::Mutex::new(time::Instant::now()));
        let last_time_clone = last_time.clone();

        let handle = spawn({
            let delays_clone = delays_clone.clone();
            async move {
                let _ = manager
                    .execute_with_retry(
                        "test_jitter",
                        move || {
                            let delays = delays_clone.clone();
                            let last_time = last_time_clone.clone();
                            async move {
                                let now = time::Instant::now();
                                let delay = {
                                    let mut last = last_time.lock().expect(MUTEX_POISONED);
                                    let d = now.duration_since(*last);
                                    *last = now;
                                    d
                                };
                                delays.lock().expect(MUTEX_POISONED).push(delay);
                                Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                            }
                        },
                        should_retry_test_error,
                        create_test_error,
                    )
                    .await;
            }
        });

        yield_until(|| !delays.lock().expect(MUTEX_POISONED).is_empty()).await;
        advance_until(|| delays.lock().expect(MUTEX_POISONED).len() >= 2).await;
        advance_until(|| delays.lock().expect(MUTEX_POISONED).len() >= 3).await;

        handle.await.unwrap();

        // Each retry gap must sit within [base, base*2 + jitter] bounds.
        let delays = delays.lock().expect(MUTEX_POISONED);
        for delay in delays.iter().skip(1) {
            assert!(delay.as_millis() >= 50);
            assert!(delay.as_millis() <= 151);
        }
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_max_elapsed_stops_early() {
        let config = RetryConfig {
            max_retries: 100,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 1.5,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: Some(150),
        };
        let manager = RetryManager::new(config);

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let start = time::Instant::now();
        let result = manager
            .execute_with_retry(
                "test_elapsed_limit",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        let elapsed = start.elapsed();
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));

        // The 150ms budget must cut things off far below the 100-retry limit.
        let attempts = attempt_counter.load(Ordering::SeqCst);
        assert!(attempts < 10);
        assert!(elapsed.as_millis() >= 100);
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_mixed_errors_retry_behavior() {
        let config = RetryConfig {
            max_retries: 5,
            initial_delay_ms: 10,
            max_delay_ms: 50,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        // Two retryable failures, then a non-retryable one: the loop must stop
        // at attempt 3 and never reach the Ok branch.
        let result = manager
            .execute_with_retry(
                "test_mixed_errors",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        let attempts = counter.fetch_add(1, Ordering::SeqCst);
                        match attempts {
                            0 => Err(TestError::Retryable("retry 1".to_string())),
                            1 => Err(TestError::Retryable("retry 2".to_string())),
                            2 => Err(TestError::NonRetryable("stop here".to_string())),
                            _ => Ok(42),
                        }
                    }
                },
                should_retry_test_error,
                create_test_error,
            )
            .await;

        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
        assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_cancellation_during_retry_delay() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig {
            max_retries: 10,
            initial_delay_ms: 500,
            max_delay_ms: 1000,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let token = CancellationToken::new();
        let token_clone = token.clone();

        // Cancel at 100ms — inside the first 500ms retry delay.
        spawn(async move {
            time::sleep(Duration::from_millis(100)).await;
            token_clone.cancel();
        });

        let attempt_counter = Arc::new(AtomicU32::new(0));
        let counter_clone = attempt_counter.clone();

        let start = time::Instant::now();
        let result = manager
            .execute_with_retry_with_cancel(
                "test_cancellation",
                move || {
                    let counter = counter_clone.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
                    }
                },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        let elapsed = start.elapsed();

        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));

        // Cancellation must cut the 500ms delay short.
        assert!(elapsed.as_millis() < 600);

        let attempts = attempt_counter.load(Ordering::SeqCst);
        assert!(attempts >= 1);
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_cancellation_during_operation_execution() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig {
            max_retries: 5,
            initial_delay_ms: 50,
            max_delay_ms: 100,
            backoff_factor: 2.0,
            jitter_ms: 0,
            operation_timeout_ms: None,
            immediate_first: false,
            max_elapsed_ms: None,
        };
        let manager = RetryManager::new(config);

        let token = CancellationToken::new();
        let token_clone = token.clone();

        // Cancel at 50ms — while the 200ms operation is still running.
        spawn(async move {
            time::sleep(Duration::from_millis(50)).await;
            token_clone.cancel();
        });

        let start = time::Instant::now();
        let result = manager
            .execute_with_retry_with_cancel(
                "test_cancellation_during_op",
                || async {
                    time::sleep(Duration::from_millis(200)).await;
                    Ok::<i32, TestError>(42)
                },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        let elapsed = start.elapsed();

        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));

        // Cancellation must abort the in-flight operation before it finishes.
        assert!(elapsed.as_millis() < 250);
    }

    #[cfg_attr(not(all(feature = "simulation", madsim)), tokio::test)]
    #[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
    async fn test_cancellation_error_message() {
        use tokio_util::sync::CancellationToken;

        let config = RetryConfig::default();
        let manager = RetryManager::new(config);

        // Pre-canceled token: the loop must bail out before the first attempt.
        let token = CancellationToken::new();
        token.cancel();

        let result = manager
            .execute_with_retry_with_cancel(
                "test_operation",
                || async { Ok::<i32, TestError>(42) },
                should_retry_test_error,
                create_test_error,
                &token,
            )
            .await;

        assert!(result.is_err());
        let error_msg = format!("{}", result.unwrap_err());
        assert!(error_msg.contains("canceled"));
    }
}
1328
1329#[cfg(test)]
1330mod proptest_tests {
1331 use std::sync::{
1332 Arc,
1333 atomic::{AtomicU32, Ordering},
1334 };
1335
1336 #[cfg(all(feature = "simulation", madsim))]
1337 use madsim::task::spawn;
1338 use nautilus_core::MUTEX_POISONED;
1339 use proptest::prelude::*;
1340 use rstest::rstest;
1342 #[cfg(not(all(feature = "simulation", madsim)))]
1343 use tokio::task::spawn;
1344
1345 #[cfg(not(all(feature = "simulation", madsim)))]
1346 use super::tests::{advance_until, yield_until};
1347 use super::{test_utils::*, tests::advance_clock, *};
1348 use crate::dst::time;
1349
    /// Builds the madsim runtime used when running under deterministic simulation.
    #[cfg(all(feature = "simulation", madsim))]
    fn build_paused_runtime() -> madsim::runtime::Runtime {
        madsim::runtime::Runtime::new()
    }
1358
1359 #[cfg(not(all(feature = "simulation", madsim)))]
1360 fn build_paused_runtime() -> tokio::runtime::Runtime {
1361 tokio::runtime::Builder::new_current_thread()
1362 .enable_time()
1363 .start_paused(true)
1364 .build()
1365 .unwrap()
1366 }
1367
1368 proptest! {
1369 #[rstest]
1370 fn test_retry_config_valid_ranges(
1371 max_retries in 0u32..100,
1372 initial_delay_ms in 1u64..10_000,
1373 max_delay_ms in 1u64..60_000,
1374 backoff_factor in 1.0f64..10.0,
1375 jitter_ms in 0u64..1_000,
1376 operation_timeout_ms in prop::option::of(1u64..120_000),
1377 immediate_first in any::<bool>(),
1378 max_elapsed_ms in prop::option::of(1u64..300_000)
1379 ) {
1380 let max_delay_ms = max_delay_ms.max(initial_delay_ms);
1382
1383 let config = RetryConfig {
1384 max_retries,
1385 initial_delay_ms,
1386 max_delay_ms,
1387 backoff_factor,
1388 jitter_ms,
1389 operation_timeout_ms,
1390 immediate_first,
1391 max_elapsed_ms,
1392 };
1393
1394 let _manager = RetryManager::<std::io::Error>::new(config);
1396 }
1397
1398 #[rstest]
1399 fn test_retry_attempts_bounded(
1400 max_retries in 0u32..5,
1401 initial_delay_ms in 1u64..10,
1402 backoff_factor in 1.0f64..2.0,
1403 ) {
1404 let rt = build_paused_runtime();
1405
1406 let config = RetryConfig {
1407 max_retries,
1408 initial_delay_ms,
1409 max_delay_ms: initial_delay_ms * 2,
1410 backoff_factor,
1411 jitter_ms: 0,
1412 operation_timeout_ms: None,
1413 immediate_first: false,
1414 max_elapsed_ms: None,
1415 };
1416
1417 let manager = RetryManager::new(config);
1418 let attempt_counter = Arc::new(AtomicU32::new(0));
1419 let counter_clone = attempt_counter.clone();
1420
1421 let _result = rt.block_on(manager.execute_with_retry(
1422 "prop_test",
1423 move || {
1424 let counter = counter_clone.clone();
1425 async move {
1426 counter.fetch_add(1, Ordering::SeqCst);
1427 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1428 }
1429 },
1430 |e: &TestError| matches!(e, TestError::Retryable(_)),
1431 TestError::Timeout,
1432 ));
1433
1434 let attempts = attempt_counter.load(Ordering::SeqCst);
1435 prop_assert_eq!(attempts, max_retries + 1);
1437 }
1438
1439 #[rstest]
1440 fn test_timeout_always_respected(
1441 timeout_ms in 10u64..50,
1442 operation_delay_ms in 60u64..100,
1443 ) {
1444 let rt = build_paused_runtime();
1445
1446 let config = RetryConfig {
1447 max_retries: 0, initial_delay_ms: 10,
1449 max_delay_ms: 100,
1450 backoff_factor: 2.0,
1451 jitter_ms: 0,
1452 operation_timeout_ms: Some(timeout_ms),
1453 immediate_first: false,
1454 max_elapsed_ms: None,
1455 };
1456
1457 let manager = RetryManager::new(config);
1458
1459 let result = rt.block_on(async {
1460 let operation_future = manager.execute_with_retry(
1461 "timeout_test",
1462 move || async move {
1463 time::sleep(Duration::from_millis(operation_delay_ms)).await;
1464 Ok::<i32, TestError>(42)
1465 },
1466 |_: &TestError| true,
1467 TestError::Timeout,
1468 );
1469
1470 advance_clock(Duration::from_millis(timeout_ms + 10)).await;
1472 operation_future.await
1473 });
1474
1475 prop_assert!(result.is_err());
1477 prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
1478 }
1479
1480 #[rstest]
1481 fn test_max_elapsed_always_respected(
1482 max_elapsed_ms in 20u64..50,
1483 delay_per_retry in 15u64..30,
1484 max_retries in 10u32..20,
1485 ) {
1486 let rt = build_paused_runtime();
1487
1488 let config = RetryConfig {
1490 max_retries,
1491 initial_delay_ms: delay_per_retry,
1492 max_delay_ms: delay_per_retry * 2,
1493 backoff_factor: 1.0, jitter_ms: 0,
1495 operation_timeout_ms: None,
1496 immediate_first: false,
1497 max_elapsed_ms: Some(max_elapsed_ms),
1498 };
1499
1500 let manager = RetryManager::new(config);
1501 let attempt_counter = Arc::new(AtomicU32::new(0));
1502 let counter_clone = attempt_counter.clone();
1503
1504 let result = rt.block_on(async {
1505 let operation_future = manager.execute_with_retry(
1506 "elapsed_test",
1507 move || {
1508 let counter = counter_clone.clone();
1509 async move {
1510 counter.fetch_add(1, Ordering::SeqCst);
1511 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1512 }
1513 },
1514 |e: &TestError| matches!(e, TestError::Retryable(_)),
1515 TestError::Timeout,
1516 );
1517
1518 advance_clock(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
1520 operation_future.await
1521 });
1522
1523 let attempts = attempt_counter.load(Ordering::SeqCst);
1524
1525 prop_assert!(result.is_err());
1527 prop_assert!(matches!(result.unwrap_err(), TestError::Timeout(_)));
1528
1529 prop_assert!(attempts <= max_retries + 1);
1531 }
1532
1533 #[rstest]
1534 fn test_jitter_bounds(
1535 jitter_ms in 0u64..20,
1536 base_delay_ms in 10u64..30,
1537 ) {
1538 let rt = build_paused_runtime();
1539
1540 let config = RetryConfig {
1541 max_retries: 2,
1542 initial_delay_ms: base_delay_ms,
1543 max_delay_ms: base_delay_ms * 2,
1544 backoff_factor: 1.0, jitter_ms,
1546 operation_timeout_ms: None,
1547 immediate_first: false,
1548 max_elapsed_ms: None,
1549 };
1550
1551 let manager = RetryManager::new(config);
1552 let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
1553 let attempt_times_for_block = attempt_times.clone();
1554
1555 rt.block_on(async move {
1556 #[cfg(not(all(feature = "simulation", madsim)))]
1557 let attempt_times_for_wait = attempt_times_for_block.clone();
1558 let handle = spawn({
1559 let attempt_times_for_task = attempt_times_for_block.clone();
1560 let manager = manager;
1561 async move {
1562 let start_time = time::Instant::now();
1563 let _ = manager
1564 .execute_with_retry(
1565 "jitter_test",
1566 move || {
1567 let attempt_times_inner = attempt_times_for_task.clone();
1568 async move {
1569 attempt_times_inner
1570 .lock()
1571 .unwrap()
1572 .push(start_time.elapsed());
1573 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1574 }
1575 },
1576 |e: &TestError| matches!(e, TestError::Retryable(_)),
1577 TestError::Timeout,
1578 )
1579 .await;
1580 }
1581 });
1582
1583 #[cfg(not(all(feature = "simulation", madsim)))]
1588 {
1589 yield_until(|| !attempt_times_for_wait.lock().expect(MUTEX_POISONED).is_empty()).await;
1590 advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 2).await;
1591 advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 3).await;
1592 }
1593
1594 handle.await.unwrap();
1595 });
1596
1597 let times = attempt_times.lock().expect(MUTEX_POISONED);
1598
1599 prop_assert!(times.len() >= 2);
1601
1602 prop_assert!(times[0].as_millis() < 5);
1604
1605 for i in 1..times.len() {
1607 let delay_from_previous = if i == 1 {
1608 times[i].checked_sub(times[0]).unwrap()
1609 } else {
1610 times[i].checked_sub(times[i - 1]).unwrap()
1611 };
1612
1613 prop_assert!(
1615 delay_from_previous.as_millis() >= u128::from(base_delay_ms),
1616 "Retry {} delay {}ms is less than base {}ms",
1617 i, delay_from_previous.as_millis(), base_delay_ms
1618 );
1619
1620 prop_assert!(
1622 delay_from_previous.as_millis() <= u128::from(base_delay_ms + jitter_ms + 1),
1623 "Retry {} delay {}ms exceeds base {} + jitter {}",
1624 i, delay_from_previous.as_millis(), base_delay_ms, jitter_ms
1625 );
1626 }
1627 }
1628
1629 #[rstest]
1630 fn test_immediate_first_property(
1631 immediate_first in any::<bool>(),
1632 initial_delay_ms in 10u64..30,
1633 ) {
1634 let rt = build_paused_runtime();
1635
1636 let config = RetryConfig {
1637 max_retries: 2,
1638 initial_delay_ms,
1639 max_delay_ms: initial_delay_ms * 2,
1640 backoff_factor: 2.0,
1641 jitter_ms: 0,
1642 operation_timeout_ms: None,
1643 immediate_first,
1644 max_elapsed_ms: None,
1645 };
1646
1647 let manager = RetryManager::new(config);
1648 let attempt_times = Arc::new(std::sync::Mutex::new(Vec::new()));
1649 let attempt_times_for_block = attempt_times.clone();
1650
1651 rt.block_on(async move {
1652 #[cfg(not(all(feature = "simulation", madsim)))]
1653 let attempt_times_for_wait = attempt_times_for_block.clone();
1654 let handle = spawn({
1655 let attempt_times_for_task = attempt_times_for_block.clone();
1656 let manager = manager;
1657 async move {
1658 let start = time::Instant::now();
1659 let _ = manager
1660 .execute_with_retry(
1661 "immediate_test",
1662 move || {
1663 let attempt_times_inner = attempt_times_for_task.clone();
1664 async move {
1665 let elapsed = start.elapsed();
1666 attempt_times_inner.lock().expect(MUTEX_POISONED).push(elapsed);
1667 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1668 }
1669 },
1670 |e: &TestError| matches!(e, TestError::Retryable(_)),
1671 TestError::Timeout,
1672 )
1673 .await;
1674 }
1675 });
1676
1677 #[cfg(not(all(feature = "simulation", madsim)))]
1681 {
1682 yield_until(|| !attempt_times_for_wait.lock().expect(MUTEX_POISONED).is_empty()).await;
1683 advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 2).await;
1684 advance_until(|| attempt_times_for_wait.lock().expect(MUTEX_POISONED).len() >= 3).await;
1685 }
1686
1687 handle.await.unwrap();
1688 });
1689
1690 let times = attempt_times.lock().expect(MUTEX_POISONED);
1691 prop_assert!(times.len() >= 2);
1692
1693 if immediate_first {
1694 prop_assert!(times[1].as_millis() < 20,
1696 "With immediate_first=true, first retry took {}ms",
1697 times[1].as_millis());
1698 } else {
1699 prop_assert!(times[1].as_millis() >= u128::from(initial_delay_ms - 1),
1701 "With immediate_first=false, first retry was too fast: {}ms",
1702 times[1].as_millis());
1703 }
1704 }
1705
1706 #[rstest]
1707 fn test_non_retryable_stops_immediately(
1708 attempt_before_non_retryable in 0usize..3,
1709 max_retries in 3u32..5,
1710 ) {
1711 let rt = build_paused_runtime();
1712
1713 let config = RetryConfig {
1714 max_retries,
1715 initial_delay_ms: 10,
1716 max_delay_ms: 100,
1717 backoff_factor: 2.0,
1718 jitter_ms: 0,
1719 operation_timeout_ms: None,
1720 immediate_first: false,
1721 max_elapsed_ms: None,
1722 };
1723
1724 let manager = RetryManager::new(config);
1725 let attempt_counter = Arc::new(AtomicU32::new(0));
1726 let counter_clone = attempt_counter.clone();
1727
1728 let result: Result<i32, TestError> = rt.block_on(manager.execute_with_retry(
1729 "non_retryable_test",
1730 move || {
1731 let counter = counter_clone.clone();
1732 async move {
1733 let attempts = counter.fetch_add(1, Ordering::SeqCst) as usize;
1734 if attempts == attempt_before_non_retryable {
1735 Err(TestError::NonRetryable("stop".to_string()))
1736 } else {
1737 Err(TestError::Retryable("retry".to_string()))
1738 }
1739 }
1740 },
1741 |e: &TestError| matches!(e, TestError::Retryable(_)),
1742 TestError::Timeout,
1743 ));
1744
1745 let attempts = attempt_counter.load(Ordering::SeqCst) as usize;
1746
1747 prop_assert!(result.is_err());
1748 prop_assert!(matches!(result.unwrap_err(), TestError::NonRetryable(_)));
1749 prop_assert_eq!(attempts, attempt_before_non_retryable + 1);
1751 }
1752
1753 #[rstest]
1754 fn test_cancellation_stops_immediately(
1755 cancel_after_ms in 10u64..100,
1756 initial_delay_ms in 200u64..500,
1757 ) {
1758 use tokio_util::sync::CancellationToken;
1759
1760 let rt = build_paused_runtime();
1761
1762 let config = RetryConfig {
1763 max_retries: 10,
1764 initial_delay_ms,
1765 max_delay_ms: initial_delay_ms * 2,
1766 backoff_factor: 2.0,
1767 jitter_ms: 0,
1768 operation_timeout_ms: None,
1769 immediate_first: false,
1770 max_elapsed_ms: None,
1771 };
1772
1773 let manager = RetryManager::new(config);
1774 let token = CancellationToken::new();
1775 let token_clone = token.clone();
1776
1777 let result: Result<i32, TestError> = rt.block_on(async {
1778 spawn(async move {
1780 time::sleep(Duration::from_millis(cancel_after_ms)).await;
1781 token_clone.cancel();
1782 });
1783
1784 let operation_future = manager.execute_with_retry_with_cancel(
1785 "cancellation_test",
1786 || async {
1787 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1788 },
1789 |e: &TestError| matches!(e, TestError::Retryable(_)),
1790 create_test_error,
1791 &token,
1792 );
1793
1794 advance_clock(Duration::from_millis(cancel_after_ms + 10)).await;
1796 operation_future.await
1797 });
1798
1799 prop_assert!(result.is_err());
1801 let error_msg = format!("{}", result.unwrap_err());
1802 prop_assert!(error_msg.contains("canceled"));
1803 }
1804
1805 #[rstest]
1806 fn test_budget_clamp_prevents_overshoot(
1807 max_elapsed_ms in 10u64..30,
1808 delay_per_retry in 20u64..50,
1809 ) {
1810 let rt = build_paused_runtime();
1811
1812 let config = RetryConfig {
1814 max_retries: 5,
1815 initial_delay_ms: delay_per_retry,
1816 max_delay_ms: delay_per_retry * 2,
1817 backoff_factor: 1.0,
1818 jitter_ms: 0,
1819 operation_timeout_ms: None,
1820 immediate_first: false,
1821 max_elapsed_ms: Some(max_elapsed_ms),
1822 };
1823
1824 let manager = RetryManager::new(config);
1825
1826 let _result = rt.block_on(async {
1827 let operation_future = manager.execute_with_retry(
1828 "budget_clamp_test",
1829 || async {
1830 Err::<i32, TestError>(TestError::Retryable("fail".to_string()))
1832 },
1833 |e: &TestError| matches!(e, TestError::Retryable(_)),
1834 create_test_error,
1835 );
1836
1837 advance_clock(Duration::from_millis(max_elapsed_ms + delay_per_retry)).await;
1839 operation_future.await
1840 });
1841
1842 }
1845
1846 #[rstest]
1847 fn test_success_on_kth_attempt(
1848 k in 1usize..5,
1849 initial_delay_ms in 5u64..20,
1850 ) {
1851 let rt = build_paused_runtime();
1852
1853 let config = RetryConfig {
1854 max_retries: 10, initial_delay_ms,
1856 max_delay_ms: initial_delay_ms * 4,
1857 backoff_factor: 2.0,
1858 jitter_ms: 0,
1859 operation_timeout_ms: None,
1860 immediate_first: false,
1861 max_elapsed_ms: None,
1862 };
1863
1864 let manager = RetryManager::new(config);
1865 let attempt_counter = Arc::new(AtomicU32::new(0));
1866 let counter_clone = attempt_counter.clone();
1867 let target_k = k;
1868
1869 let (result, _elapsed) = rt.block_on(async {
1870 let start = time::Instant::now();
1871
1872 let operation_future = manager.execute_with_retry(
1873 "kth_attempt_test",
1874 move || {
1875 let counter = counter_clone.clone();
1876 async move {
1877 let attempt = counter.fetch_add(1, Ordering::SeqCst) as usize;
1878 if attempt + 1 == target_k {
1879 Ok(42)
1880 } else {
1881 Err(TestError::Retryable("retry".to_string()))
1882 }
1883 }
1884 },
1885 |e: &TestError| matches!(e, TestError::Retryable(_)),
1886 create_test_error,
1887 );
1888
1889 for _ in 0..k {
1891 advance_clock(Duration::from_millis(initial_delay_ms * 4)).await;
1892 }
1893
1894 let result = operation_future.await;
1895 let elapsed = start.elapsed();
1896
1897 (result, elapsed)
1898 });
1899
1900 let attempts = attempt_counter.load(Ordering::SeqCst) as usize;
1901
1902 prop_assert!(result.is_ok());
1904 prop_assert_eq!(result.unwrap(), 42);
1905 prop_assert_eq!(attempts, k);
1906 }
1907 }
1908}