use rand::Rng;
use std::future::Future;
use std::time::Duration;

use super::rpc_selector::RpcSelector;
use crate::config::ServerConfig;
use crate::constants::RETRY_JITTER_PERCENT;

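/// Computes the delay to wait before the next retry attempt.
///
/// Uses exponential backoff (`base_delay_ms * 2^attempt`) capped at `max_delay_ms`,
/// then applies random jitter via [`apply_jitter`]. Returns a zero duration when
/// either delay parameter is zero, which disables waiting between attempts.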
pub fn calculate_retry_delay(attempt: u8, base_delay_ms: u64, max_delay_ms: u64) -> Duration {
    if base_delay_ms == 0 || max_delay_ms == 0 {
        return Duration::from_millis(0);
    }

    let exp_backoff = if attempt > 63 {
        max_delay_ms
    } else {
        let multiplier = 1u64.checked_shl(attempt as u32).unwrap_or(u64::MAX);
        base_delay_ms.saturating_mul(multiplier)
    };

    let delay_ms = exp_backoff.min(max_delay_ms);

    apply_jitter(delay_ms)
}

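/// Randomly adds or subtracts up to `RETRY_JITTER_PERCENT` of `delay_ms` so that
/// concurrent callers do not retry in lockstep. Delays too small to produce a
/// non-zero jitter range are returned unchanged.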
fn apply_jitter(delay_ms: u64) -> Duration {
    if delay_ms == 0 {
        return Duration::from_millis(0);
    }

    let jitter_range = (delay_ms as f64 * RETRY_JITTER_PERCENT).floor() as u64;

    if jitter_range == 0 {
        return Duration::from_millis(delay_ms);
    }

    let mut rng = rand::rng();
    let jitter_value = rng.random_range(0..=jitter_range);

    let final_delay = if rng.random_bool(0.5) {
        delay_ms.saturating_add(jitter_value)
    } else {
        delay_ms.saturating_sub(jitter_value)
    };

    Duration::from_millis(final_delay)
}

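/// Internal classification used by `try_with_retries` to tell the failover loop
/// whether an error was non-retriable or the per-provider retry budget was
/// exhausted.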
#[derive(Debug)]
enum InternalRetryError<E> {
    NonRetriable(E),
    RetriesExhausted(E),
}

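/// Retry and failover policy for RPC calls: attempts per provider, number of
/// provider failovers, and the backoff delay bounds in milliseconds. Setting both
/// delay values to zero disables waiting between attempts.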
#[derive(Debug, Clone)]
pub struct RetryConfig {
    pub max_retries: u8,
    pub max_failovers: u8,
    pub base_delay_ms: u64,
    pub max_delay_ms: u64,
}

impl RetryConfig {
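    /// Creates a new `RetryConfig`.
    ///
    /// # Panics
    ///
    /// Panics if exactly one of `base_delay_ms` / `max_delay_ms` is zero, or if
    /// both are non-zero and `max_delay_ms` is smaller than `base_delay_ms`.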
    pub fn new(max_retries: u8, max_failovers: u8, base_delay_ms: u64, max_delay_ms: u64) -> Self {
        if (base_delay_ms == 0) != (max_delay_ms == 0) {
            panic!(
                "Delay values must be consistent: both zero (no delays) or both non-zero. Got base_delay_ms={}, max_delay_ms={}",
                base_delay_ms, max_delay_ms
            );
        }

        if base_delay_ms > 0 && max_delay_ms > 0 && max_delay_ms < base_delay_ms {
            panic!(
                "max_delay_ms ({}) must be >= base_delay_ms ({}) when both are non-zero",
                max_delay_ms, base_delay_ms
            );
        }

        Self {
            max_retries,
            max_failovers,
            base_delay_ms,
            max_delay_ms,
        }
    }

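    /// Builds a `RetryConfig` from the provider retry settings exposed by
    /// `ServerConfig::from_env()`.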
    pub fn from_env() -> Self {
        let config = ServerConfig::from_env();
        Self::new(
            config.provider_max_retries,
            config.provider_max_failovers,
            config.provider_retry_base_delay_ms,
            config.provider_retry_max_delay_ms,
        )
    }
}

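/// Executes an RPC operation with per-provider retries and cross-provider failover.
///
/// For each provider obtained from `selector`, the operation is attempted up to
/// `config.max_retries` times with exponential backoff and jitter between attempts.
/// When a provider's retries are exhausted it is marked as failed and the call
/// fails over to the next provider, up to `config.max_failovers` times (bounded by
/// the number of configured providers). Non-retriable errors are returned
/// immediately; `should_mark_provider_failed` decides whether such an error also
/// removes the provider from rotation. The last available provider is never marked
/// as failed. When `config` is `None`, `RetryConfig::from_env()` is used.
///
/// Illustrative usage (a sketch only; `MyProvider`, `MyError`, and their methods
/// are placeholders for whatever provider and error type the caller actually uses):
///
/// ```ignore
/// let selector = RpcSelector::new(vec![RpcConfig::new("http://localhost:8545".to_string())])?;
/// let result: Result<u64, MyError> = retry_rpc_call(
///     &selector,
///     "get_block_number",
///     |e| e.is_transient(),        // is_retriable_error
///     |e| e.is_connection_error(), // should_mark_provider_failed
///     |url| MyProvider::connect(url).map_err(MyError::from),
///     |provider| async move { provider.get_block_number().await },
///     None, // fall back to RetryConfig::from_env()
/// )
/// .await;
/// ```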
pub async fn retry_rpc_call<P, T, E, F, Fut, I>(
    selector: &RpcSelector,
    operation_name: &str,
    is_retriable_error: impl Fn(&E) -> bool,
    should_mark_provider_failed: impl Fn(&E) -> bool,
    provider_initializer: I,
    operation: F,
    config: Option<RetryConfig>,
) -> Result<T, E>
where
    P: Clone,
    E: std::fmt::Display + From<String>,
    F: Fn(P) -> Fut,
    Fut: Future<Output = Result<T, E>>,
    I: Fn(&str) -> Result<P, E>,
{
    let config = config.unwrap_or_else(RetryConfig::from_env);
    let total_providers = selector.provider_count();
    let max_failovers = std::cmp::min(config.max_failovers as usize, total_providers - 1);
    let mut failover_count = 0;
    let mut total_attempts = 0;
    let mut last_error = None;

    tracing::debug!(
        operation_name = %operation_name,
        max_retries = %config.max_retries,
        max_failovers = %max_failovers,
        total_providers = %total_providers,
        "starting rpc call"
    );

    while failover_count <= max_failovers && selector.available_provider_count() > 0 {
        let (provider, provider_url) =
            match get_provider(selector, operation_name, &provider_initializer) {
                Ok((provider, url)) => (provider, url),
                Err(e) => {
                    last_error = Some(e);
                    failover_count += 1;

                    if failover_count > max_failovers || selector.available_provider_count() == 0 {
                        break;
                    }

                    selector.mark_current_as_failed();
                    continue;
                }
            };

        tracing::debug!(
            provider_url = %provider_url,
            operation_name = %operation_name,
            "selected provider"
        );

        match try_with_retries(
            &provider,
            &provider_url,
            operation_name,
            &operation,
            &is_retriable_error,
            &config,
            &mut total_attempts,
        )
        .await
        {
            Ok(result) => {
                tracing::debug!(
                    operation_name = %operation_name,
                    provider_url = %provider_url,
                    total_attempts = %total_attempts,
                    "rpc call succeeded"
                );
                return Ok(result);
            }
            Err(internal_err) => {
                match internal_err {
                    InternalRetryError::NonRetriable(original_err) => {
                        if should_mark_provider_failed(&original_err)
                            && selector.available_provider_count() > 1
                        {
                            tracing::warn!(
                                error = %original_err,
                                provider_url = %provider_url,
                                operation_name = %operation_name,
                                "non-retriable error should mark provider as failed, marking as failed and switching to next provider"
                            );
                            selector.mark_current_as_failed();
                        }
                        return Err(original_err);
                    }
                    InternalRetryError::RetriesExhausted(original_err) => {
                        last_error = Some(original_err);

                        if selector.available_provider_count() > 1 {
                            tracing::warn!(
                                max_retries = %config.max_retries,
                                provider_url = %provider_url,
                                operation_name = %operation_name,
                                error = %last_error.as_ref().unwrap(),
                                failover_count = %(failover_count + 1),
                                max_failovers = %max_failovers,
                                "all retry attempts failed, marking as failed and switching to next provider"
                            );
                            selector.mark_current_as_failed();
                            failover_count += 1;
                        } else {
                            tracing::warn!(
                                max_retries = %config.max_retries,
                                provider_url = %provider_url,
                                operation_name = %operation_name,
                                error = %last_error.as_ref().unwrap(),
                                "all retry attempts failed, this is the last available provider, not marking as failed"
                            );
                            break;
                        }
                    }
                }
            }
        }
    }

    match &last_error {
        Some(e) => {
            tracing::error!(
                operation_name = %operation_name,
                total_attempts = %total_attempts,
                failover_count = %failover_count,
                error = %e,
                "rpc call failed after attempts across providers"
            );
        }
        None => {
            tracing::error!(
                operation_name = %operation_name,
                total_attempts = %total_attempts,
                failover_count = %failover_count,
                "rpc call failed after attempts across providers with no error details"
            );
        }
    }

    let error_message = match &last_error {
        Some(e) => format!(
            "RPC call '{}' failed after {} total attempts across {} providers: {}",
            operation_name, total_attempts, failover_count, e
        ),
        None => format!(
            "RPC call '{}' failed after {} total attempts across {} providers with no error details",
            operation_name, total_attempts, failover_count
        ),
    };

    Err(last_error.unwrap_or_else(|| E::from(error_message)))
}

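/// Resolves the currently selected provider URL from the selector and runs
/// `provider_initializer` against it, converting selector failures into `E`
/// through its `From<String>` impl.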
fn get_provider<P, E, I>(
    selector: &RpcSelector,
    operation_name: &str,
    provider_initializer: &I,
) -> Result<(P, String), E>
where
    E: std::fmt::Display + From<String>,
    I: Fn(&str) -> Result<P, E>,
{
    let provider_url = selector
        .get_client(|url| Ok::<_, eyre::Report>(url.to_string()))
        .map_err(|e| {
            let err_msg = format!("Failed to get provider URL for {}: {}", operation_name, e);
            tracing::warn!(operation_name = %operation_name, error = %e, "failed to get provider url");
            E::from(err_msg)
        })?;

    let provider = provider_initializer(&provider_url).map_err(|e| {
        tracing::warn!(
            provider_url = %provider_url,
            operation_name = %operation_name,
            error = %e,
            "failed to initialize provider"
        );
        e
    })?;

    Ok((provider, provider_url))
}

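/// Runs `operation` against a single provider, retrying retriable errors up to
/// `config.max_retries` times with backoff between attempts. A `max_retries` of
/// 0 or 1 performs exactly one attempt and reports any error as `NonRetriable`.
/// `total_attempts` is incremented once per attempt so the caller can report a
/// running total across providers.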
async fn try_with_retries<P, T, E, F, Fut>(
    provider: &P,
    provider_url: &str,
    operation_name: &str,
    operation: &F,
    is_retriable_error: &impl Fn(&E) -> bool,
    config: &RetryConfig,
    total_attempts: &mut usize,
) -> Result<T, InternalRetryError<E>>
where
    P: Clone,
    E: std::fmt::Display + From<String>,
    F: Fn(P) -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    if config.max_retries <= 1 {
        *total_attempts += 1;
        return operation(provider.clone())
            .await
            .map_err(InternalRetryError::NonRetriable);
    }

    for current_attempt_idx in 0..config.max_retries {
        *total_attempts += 1;

        match operation(provider.clone()).await {
            Ok(result) => {
                tracing::debug!(
                    operation_name = %operation_name,
                    provider_url = %provider_url,
                    attempt = %(current_attempt_idx + 1),
                    max_retries = %config.max_retries,
                    total_attempts = %*total_attempts,
                    "rpc call succeeded"
                );
                return Ok(result);
            }
            Err(e) => {
                let is_retriable = is_retriable_error(&e);
                let is_last_attempt = current_attempt_idx + 1 >= config.max_retries;

                tracing::warn!(
                    operation_name = %operation_name,
                    provider_url = %provider_url,
                    attempt = %(current_attempt_idx + 1),
                    max_retries = %config.max_retries,
                    error = %e,
                    retriable = %is_retriable,
                    "rpc call failed"
                );

                if !is_retriable {
                    return Err(InternalRetryError::NonRetriable(e));
                }

                if is_last_attempt {
                    tracing::warn!(
                        max_retries = %config.max_retries,
                        operation_name = %operation_name,
                        provider_url = %provider_url,
                        error = %e,
                        "all retries exhausted"
                    );
                    return Err(InternalRetryError::RetriesExhausted(e));
                }

                let delay = calculate_retry_delay(
                    current_attempt_idx + 1,
                    config.base_delay_ms,
                    config.max_delay_ms,
                );

                tracing::debug!(
                    operation_name = %operation_name,
                    provider_url = %provider_url,
                    delay = ?delay,
                    next_attempt = %(current_attempt_idx + 2),
                    max_retries = %config.max_retries,
                    "retrying rpc call after delay"
                );
                tokio::time::sleep(delay).await;
            }
        }
    }

    unreachable!(
        "Loop should have returned if max_retries > 1; max_retries=0 or 1 case is handled above."
    );
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::RpcConfig;
    use lazy_static::lazy_static;
    use std::cmp::Ordering;
    use std::env;
    use std::sync::atomic::{AtomicU8, Ordering as AtomicOrdering};
    use std::sync::Arc;
    use std::sync::Mutex;

    lazy_static! {
        static ref RETRY_TEST_ENV_MUTEX: Mutex<()> = Mutex::new(());
    }

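    /// Minimal error type used to exercise the retry helpers in tests.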
    #[derive(Debug, Clone)]
    struct TestError(String);

    impl std::fmt::Display for TestError {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "TestError: {}", self.0)
        }
    }

    impl From<String> for TestError {
        fn from(msg: String) -> Self {
            TestError(msg)
        }
    }

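    /// Sets environment variables for a test and restores (or removes) the
    /// previous values when dropped.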
    struct EnvGuard {
        keys: Vec<String>,
        old_values: Vec<Option<String>>,
    }

    impl EnvGuard {
        fn new() -> Self {
            Self {
                keys: Vec::new(),
                old_values: Vec::new(),
            }
        }

        fn set(&mut self, key: &str, value: &str) {
            let old_value = env::var(key).ok();
            self.keys.push(key.to_string());
            self.old_values.push(old_value);
            env::set_var(key, value);
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            for i in 0..self.keys.len() {
                match &self.old_values[i] {
                    Some(value) => env::set_var(&self.keys[i], value),
                    None => env::remove_var(&self.keys[i]),
                }
            }
        }
    }

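    /// Installs the environment variables the configuration loader expects,
    /// returning a guard that restores the previous environment on drop.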
    fn setup_test_env() -> EnvGuard {
        let mut guard = EnvGuard::new();
        guard.set("API_KEY", "fake-api-key-for-tests-01234567890123456789");
        guard.set("PROVIDER_MAX_RETRIES", "2");
        guard.set("PROVIDER_MAX_FAILOVERS", "1");
        guard.set("PROVIDER_RETRY_BASE_DELAY_MS", "1");
        guard.set("PROVIDER_RETRY_MAX_DELAY_MS", "5");
        guard.set("REDIS_URL", "redis://localhost:6379");
        guard.set(
            "RELAYER_PRIVATE_KEY",
            "0x1234567890123456789012345678901234567890123456789012345678901234",
        );
        guard
    }

    #[test]
    fn test_calculate_retry_delay() {
        let base_delay_ms = 10;
        let max_delay_ms = 10000;

        let expected_backoffs = [10, 20, 40, 80, 160, 320];

        for (i, expected) in expected_backoffs.iter().enumerate() {
            let attempt = i as u8;
            let delay = calculate_retry_delay(attempt, base_delay_ms, max_delay_ms);

            let min_expected = (*expected as f64 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u128;
            let max_expected = (*expected as f64 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128;

            assert!(
                (min_expected..=max_expected).contains(&delay.as_millis()),
                "Delay {} outside expected range {}..={}",
                delay.as_millis(),
                min_expected,
                max_expected
            );
        }

        let base_delay_ms = 100;
        let max_delay_ms = 1000;
        let delay = calculate_retry_delay(4, base_delay_ms, max_delay_ms);
        let min_expected = (max_delay_ms as f64 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u128;
        let max_expected = (max_delay_ms as f64 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128;
        assert!(
            (min_expected..=max_expected).contains(&delay.as_millis()),
            "Delay {} outside expected range {}..={}",
            delay.as_millis(),
            min_expected,
            max_expected
        );

        assert_eq!(calculate_retry_delay(5, 0, 1000).as_millis(), 0);
        assert_eq!(calculate_retry_delay(5, 100, 0).as_millis(), 0);
        assert_eq!(calculate_retry_delay(5, 0, 0).as_millis(), 0);

        let max_delay_ms = 10_000;
        let delay = calculate_retry_delay(u8::MAX, 1, max_delay_ms);
        assert!(
            delay.as_millis()
                <= (max_delay_ms as f64 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128
        );
    }

    #[test]
    fn test_apply_jitter() {
        let base_delay = 1000;
        let jittered = apply_jitter(base_delay);

        let min_expected = (base_delay as f64 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u64;
        let max_expected = (base_delay as f64 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u64;

        assert!(
            (min_expected as u128..=max_expected as u128).contains(&jittered.as_millis()),
            "Jittered value {} outside expected range {}..={}",
            jittered.as_millis(),
            min_expected,
            max_expected
        );

        assert_eq!(apply_jitter(0).as_millis(), 0);

        for delay in 1..5 {
            let jittered = apply_jitter(delay);
            let jitter_range = (delay as f64 * RETRY_JITTER_PERCENT).floor() as u64;

            if jitter_range == 0 {
                assert_eq!(jittered.as_millis(), delay as u128);
            } else {
                let min_expected = delay.saturating_sub(jitter_range);
                let max_expected = delay.saturating_add(jitter_range);
                assert!(
                    (min_expected as u128..=max_expected as u128).contains(&jittered.as_millis()),
                    "Jittered value {} outside expected range {}..={}",
                    jittered.as_millis(),
                    min_expected,
                    max_expected
                );
            }
        }

        let base_delay = 10000;
        let iterations = 200;
        let mut additions = 0;
        let mut subtractions = 0;

        for _ in 0..iterations {
            let jittered = apply_jitter(base_delay);
            let j_millis = jittered.as_millis();
            let b_delay = base_delay as u128;

            match j_millis.cmp(&b_delay) {
                Ordering::Greater => {
                    additions += 1;
                }
                Ordering::Less => {
                    subtractions += 1;
                }
                Ordering::Equal => {}
            }
        }

        assert!(additions > 0, "No additions were observed");
        assert!(subtractions > 0, "No subtractions were observed");
    }

    #[test]
    fn test_retry_config() {
        let config = RetryConfig::new(5, 2, 100, 5000);
        assert_eq!(config.max_retries, 5);
        assert_eq!(config.max_failovers, 2);
        assert_eq!(config.base_delay_ms, 100);
        assert_eq!(config.max_delay_ms, 5000);
    }

    #[test]
    fn test_retry_config_from_env() {
        let _lock = RETRY_TEST_ENV_MUTEX
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        let mut guard = setup_test_env();
        guard.set("REDIS_URL", "redis://localhost:6379");
        guard.set(
            "RELAYER_PRIVATE_KEY",
            "0x1234567890123456789012345678901234567890123456789012345678901234",
        );

        let config = RetryConfig::from_env();
        assert_eq!(config.max_retries, 2);
        assert_eq!(config.max_failovers, 1);
        assert_eq!(config.base_delay_ms, 1);
        assert_eq!(config.max_delay_ms, 5);
    }

    #[test]
    fn test_calculate_retry_delay_edge_cases() {
        let delay = calculate_retry_delay(0, 100, 1000);
        let min_expected = (100.0 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u128;
        let max_expected = (100.0 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128;
        assert!(
            (min_expected..=max_expected).contains(&delay.as_millis()),
            "Delay {} outside expected range {}..={}",
            delay.as_millis(),
            min_expected,
            max_expected
        );

        let delay = calculate_retry_delay(5, 100, 100);
        let min_expected = (100.0 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u128;
        let max_expected = (100.0 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128;
        assert!(
            (min_expected..=max_expected).contains(&delay.as_millis()),
            "Delay {} outside expected range {}..={}",
            delay.as_millis(),
            min_expected,
            max_expected
        );

        let delay = calculate_retry_delay(60, 1000, u64::MAX);
        assert!(delay.as_millis() > 0);

        let delay = calculate_retry_delay(1, 1, 1);
        assert_eq!(delay.as_millis(), 1);
    }

    #[test]
    fn test_retry_config_validation() {
        let _config = RetryConfig::new(3, 1, 100, 1000);
        let _config = RetryConfig::new(3, 1, 0, 0); // both delays zero: delays disabled
        let _config = RetryConfig::new(3, 1, 100, 100); // equal delays are allowed
        let _config = RetryConfig::new(0, 0, 1, 1); // zero retries/failovers are allowed
        let _config = RetryConfig::new(255, 255, 1, 1000); // extreme values are allowed
    }

    #[test]
    #[should_panic(
        expected = "max_delay_ms (50) must be >= base_delay_ms (100) when both are non-zero"
    )]
    fn test_retry_config_validation_panic_delay_ordering() {
        let _config = RetryConfig::new(3, 1, 100, 50);
    }

    #[test]
    #[should_panic(
        expected = "Delay values must be consistent: both zero (no delays) or both non-zero"
    )]
    fn test_retry_config_validation_panic_inconsistent_delays_base_zero() {
        let _config = RetryConfig::new(3, 1, 0, 1000);
    }

    #[test]
    #[should_panic(
        expected = "Delay values must be consistent: both zero (no delays) or both non-zero"
    )]
    fn test_retry_config_validation_panic_inconsistent_delays_max_zero() {
        let _config = RetryConfig::new(3, 1, 100, 0);
    }

    #[test]
    fn test_get_provider() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let initializer =
            |url: &str| -> Result<String, TestError> { Ok(format!("provider-{}", url)) };

        let result = get_provider(&selector, "test_operation", &initializer);
        assert!(result.is_ok());
        let (provider, url) = result.unwrap();
        assert_eq!(url, "http://localhost:8545");
        assert_eq!(provider, "provider-http://localhost:8545");

        let initializer = |_: &str| -> Result<String, TestError> {
            Err(TestError("Failed to initialize".to_string()))
        };

        let result = get_provider(&selector, "test_operation", &initializer);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(format!("{}", err).contains("Failed to initialize"));
    }

    #[tokio::test]
    async fn test_try_with_retries() {
        let provider = "test_provider".to_string();
        let provider_url = "http://localhost:8545";
        let mut total_attempts = 0;
        let config = RetryConfig::new(3, 1, 5, 10);

        let operation = |p: String| async move {
            assert_eq!(p, "test_provider");
            Ok::<_, TestError>(42)
        };

        let result = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| false,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
        assert_eq!(total_attempts, 1);

        let attempts = Arc::new(AtomicU8::new(0));
        let attempts_clone = attempts.clone();
        let operation = move |_: String| {
            let attempts = attempts_clone.clone();
            async move {
                let current = attempts.fetch_add(1, AtomicOrdering::SeqCst);
                if current < 2 {
                    Err(TestError("Retriable error".to_string()))
                } else {
                    Ok(42)
                }
            }
        };

        let mut total_attempts = 0;
        let result = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| true,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
        assert_eq!(total_attempts, 3);

        let operation = |_: String| async { Err(TestError("Non-retriable error".to_string())) };

        let mut total_attempts = 0;
        let result: Result<i32, InternalRetryError<TestError>> = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| false,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_err());
        assert_eq!(total_attempts, 1);
        let err = result.unwrap_err();
        assert!(matches!(err, InternalRetryError::NonRetriable(_)));

        let operation = |_: String| async { Err(TestError("Always fails".to_string())) };

        let mut total_attempts = 0;
        let result: Result<i32, InternalRetryError<TestError>> = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| true,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_err());
        assert_eq!(total_attempts, 3);
        let error = result.unwrap_err();
        assert!(matches!(error, InternalRetryError::RetriesExhausted(_)));
    }

    #[tokio::test]
    async fn test_try_with_retries_max_retries_zero() {
        let provider = "test_provider".to_string();
        let provider_url = "http://localhost:8545";
        let mut total_attempts = 0;
        let config = RetryConfig::new(0, 1, 5, 10);

        let operation = |_p: String| async move { Ok::<_, TestError>(42) };

        let result = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| false,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);

        let operation = |_: String| async { Err(TestError("Always fails".to_string())) };

        let mut total_attempts = 0;
        let result: Result<i32, InternalRetryError<TestError>> = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| true,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_err());
        let error = result.unwrap_err();
        assert!(matches!(error, InternalRetryError::NonRetriable(_)));
    }

    #[tokio::test]
    async fn test_try_with_retries_max_retries_one() {
        let provider = "test_provider".to_string();
        let provider_url = "http://localhost:8545";
        let mut total_attempts = 0;
        let config = RetryConfig::new(1, 1, 5, 10);

        let operation = |p: String| async move {
            assert_eq!(p, "test_provider");
            Ok::<_, TestError>(42)
        };

        let result = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| false,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);

        let operation = |_: String| async { Err(TestError("Always fails".to_string())) };

        let mut total_attempts = 0;
        let result: Result<i32, InternalRetryError<TestError>> = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| true,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_err());
        let error = result.unwrap_err();
        assert!(matches!(error, InternalRetryError::NonRetriable(_)));
    }

    #[tokio::test]
    async fn test_non_retriable_error_does_not_mark_provider_failed() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation =
            |_provider: String| async move { Err(TestError("Non-retriable error".to_string())) };

        let config = RetryConfig::new(3, 1, 0, 0);

        let initial_available_count = selector.available_provider_count();

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false,
            |_| false,
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert_eq!(
            initial_available_count, final_available_count,
            "Provider count should remain the same for non-retriable errors"
        );
    }

    #[tokio::test]
    async fn test_retriable_error_marks_provider_failed_after_retries_exhausted() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation = |_provider: String| async { Err(TestError("Retriable error".to_string())) };

        let config = RetryConfig::new(2, 1, 0, 0);

        let initial_available_count = selector.available_provider_count();

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true,
            |_| true,
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert!(final_available_count < initial_available_count,
            "At least one provider should be marked as failed after retriable errors exhaust retries");
    }

    #[tokio::test]
    async fn test_retry_rpc_call_success() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let attempts = Arc::new(AtomicU8::new(0));
        let attempts_clone = attempts.clone();

        let provider_initializer =
            |_url: &str| -> Result<String, TestError> { Ok("mock_provider".to_string()) };

        let operation = move |_provider: String| {
            let attempts = attempts_clone.clone();
            async move {
                attempts.fetch_add(1, AtomicOrdering::SeqCst);
                Ok::<_, TestError>(42)
            }
        };

        let config = RetryConfig::new(1, 1, 0, 0);

        let result = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false,
            |_| false,
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_ok(), "Expected OK result but got: {:?}", result);
        assert_eq!(result.unwrap(), 42);
        assert_eq!(attempts.load(AtomicOrdering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_retry_rpc_call_with_provider_failover() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let current_provider = Arc::new(Mutex::new(String::new()));
        let current_provider_clone = current_provider.clone();

        let provider_initializer = move |url: &str| -> Result<String, TestError> {
            let mut provider = current_provider_clone.lock().unwrap();
            *provider = url.to_string();
            Ok(url.to_string())
        };

        let operation = move |provider: String| async move {
            if provider.contains("8545") {
                Err(TestError("First provider error".to_string()))
            } else {
                Ok(42)
            }
        };

        let config = RetryConfig::new(2, 1, 0, 0);

        let result = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true,
            |_| true,
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_ok(), "Expected OK result but got: {:?}", result);
        assert_eq!(result.unwrap(), 42);

        let final_provider = current_provider.lock().unwrap().clone();
        assert!(
            final_provider.contains("8546"),
            "Wrong provider selected: {}",
            final_provider
        );
    }

    #[tokio::test]
    async fn test_retry_rpc_call_all_providers_fail() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let provider_initializer =
            |_: &str| -> Result<String, TestError> { Ok("mock_provider".to_string()) };

        let operation = |_: String| async { Err(TestError("Always fails".to_string())) };

        let config = RetryConfig::new(2, 1, 0, 0);

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true,
            |_| false,
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err(), "Expected an error but got: {:?}", result);
    }

    #[tokio::test]
    async fn test_retry_rpc_call_with_default_config() {
        let (_guard, selector) = {
            let _lock = RETRY_TEST_ENV_MUTEX
                .lock()
                .unwrap_or_else(|e| e.into_inner());
            let guard = setup_test_env();

            let configs = vec![RpcConfig::new("http://localhost:8545".to_string())];
            let selector = RpcSelector::new(configs).expect("Failed to create selector");
            (guard, selector)
        };

        let provider_initializer =
            |_url: &str| -> Result<String, TestError> { Ok("mock_provider".to_string()) };

        let operation = |_provider: String| async move { Ok::<_, TestError>(42) };

        let result = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false,
            |_| false,
            provider_initializer,
            operation,
            None,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
    }

    #[tokio::test]
    async fn test_retry_rpc_call_provider_initialization_failures() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let attempt_count = Arc::new(AtomicU8::new(0));
        let attempt_count_clone = attempt_count.clone();

        let provider_initializer = move |url: &str| -> Result<String, TestError> {
            let count = attempt_count_clone.fetch_add(1, AtomicOrdering::SeqCst);
            if count == 0 && url.contains("8545") {
                Err(TestError("First provider init failed".to_string()))
            } else {
                Ok(url.to_string())
            }
        };

        let operation = |_provider: String| async move { Ok::<_, TestError>(42) };

        let config = RetryConfig::new(2, 1, 0, 0);

        let result = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true,
            |_| false,
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
        assert!(attempt_count.load(AtomicOrdering::SeqCst) >= 2);
    }

    #[test]
    fn test_get_provider_selector_errors() {
        let _guard = setup_test_env();

        let configs = vec![RpcConfig::new("http://localhost:8545".to_string())];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let _ = selector.get_current_url().unwrap();
        selector.mark_current_as_failed();

        let provider_initializer =
            |url: &str| -> Result<String, TestError> { Ok(format!("provider-{}", url)) };

        let result = get_provider(&selector, "test_operation", &provider_initializer);
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_last_provider_never_marked_as_failed() {
        let _guard = setup_test_env();

        let configs = vec![RpcConfig::new("http://localhost:8545".to_string())];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation = |_provider: String| async { Err(TestError("Always fails".to_string())) };

        let config = RetryConfig::new(2, 1, 0, 0);

        let initial_available_count = selector.available_provider_count();
        assert_eq!(initial_available_count, 1);

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true,
            |_| true,
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert_eq!(
            final_available_count, initial_available_count,
            "Last provider should never be marked as failed"
        );
        assert_eq!(
            final_available_count, 1,
            "Should still have 1 provider available"
        );
    }

    #[tokio::test]
    async fn test_last_provider_behavior_with_multiple_providers() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
            RpcConfig::new("http://localhost:8547".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation = |_provider: String| async { Err(TestError("Always fails".to_string())) };

        let config = RetryConfig::new(2, 2, 0, 0);

        let initial_available_count = selector.available_provider_count();
        assert_eq!(initial_available_count, 3);

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true,
            |_| true,
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert_eq!(
            final_available_count, 1,
            "Should have exactly 1 provider left (the last one should not be marked as failed)"
        );
    }

    #[tokio::test]
    async fn test_non_retriable_error_should_mark_provider_failed() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation = |_provider: String| async move {
            Err(TestError("Critical non-retriable error".to_string()))
        };

        let config = RetryConfig::new(3, 1, 0, 0);

        let initial_available_count = selector.available_provider_count();
        assert_eq!(initial_available_count, 2);

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false,
            |e| e.0.contains("Critical"),
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert_eq!(final_available_count, 1,
            "Provider should be marked as failed when should_mark_provider_failed returns true for non-retriable error");
    }

    #[tokio::test]
    async fn test_non_retriable_error_should_not_mark_provider_failed() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation = |_provider: String| async move {
            Err(TestError("Minor non-retriable error".to_string()))
        };

        let config = RetryConfig::new(3, 1, 0, 0);

        let initial_available_count = selector.available_provider_count();
        assert_eq!(initial_available_count, 2);

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false,
            |e| e.0.contains("Critical"),
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert_eq!(final_available_count, initial_available_count,
            "Provider should NOT be marked as failed when should_mark_provider_failed returns false for non-retriable error");
    }

    #[tokio::test]
    async fn test_retriable_error_ignores_should_mark_provider_failed() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation =
            |_provider: String| async { Err(TestError("Retriable network error".to_string())) };

        let config = RetryConfig::new(2, 1, 0, 0);

        let initial_available_count = selector.available_provider_count();
        assert_eq!(initial_available_count, 2);

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true,
            |_| false,
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert!(final_available_count < initial_available_count,
            "Provider should be marked as failed when retriable errors exhaust retries, regardless of should_mark_provider_failed");
    }

    #[tokio::test]
    async fn test_mixed_error_scenarios_with_different_marking_behavior() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation =
            |_provider: String| async move { Err(TestError("Critical network error".to_string())) };

        let config = RetryConfig::new(1, 1, 0, 0);
        let initial_count = selector.available_provider_count();

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false,
            |e| e.0.contains("Critical"),
            provider_initializer,
            operation,
            Some(config.clone()),
        )
        .await;

        assert!(result.is_err());
        let after_critical_count = selector.available_provider_count();
        assert_eq!(
            after_critical_count,
            initial_count - 1,
            "Critical error should mark provider as failed"
        );

        let operation =
            |_provider: String| async move { Err(TestError("Minor validation error".to_string())) };

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false,
            |e| e.0.contains("Critical"),
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());
        let final_count = selector.available_provider_count();
        assert_eq!(
            final_count, after_critical_count,
            "Minor error should NOT mark provider as failed"
        );
    }

    #[tokio::test]
    async fn test_should_mark_provider_failed_respects_last_provider_protection() {
        let _guard = setup_test_env();

        let configs = vec![RpcConfig::new("http://localhost:8545".to_string())];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation =
            |_provider: String| async move { Err(TestError("Critical network error".to_string())) };

        let config = RetryConfig::new(1, 1, 0, 0);

        let initial_available_count = selector.available_provider_count();
        assert_eq!(initial_available_count, 1);

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false,
            |e| e.0.contains("Critical"),
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert_eq!(final_available_count, initial_available_count,
            "Last provider should never be marked as failed, regardless of should_mark_provider_failed");
        assert_eq!(
            final_available_count, 1,
            "Should still have 1 provider available"
        );
    }

    #[tokio::test]
    async fn test_should_mark_provider_failed_with_multiple_providers_last_protection() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new(configs).expect("Failed to create selector");

        let attempt_count = Arc::new(AtomicU8::new(0));
        let attempt_count_clone = attempt_count.clone();

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation = move |_provider: String| {
            let attempt_count = attempt_count_clone.clone();
            async move {
                let count = attempt_count.fetch_add(1, AtomicOrdering::SeqCst);
                Err(TestError(format!("Critical error #{}", count)))
            }
        };

        let config = RetryConfig::new(1, 1, 0, 0);

        let initial_available_count = selector.available_provider_count();
        assert_eq!(initial_available_count, 2);

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false,
            |e| e.0.contains("Critical"),
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert_eq!(
            final_available_count, 1,
            "First provider should be marked as failed, but last provider should be protected"
        );
    }
}