1use crate::{
6 jobs::{
7 notification_handler, relayer_health_check_handler, solana_token_swap_cron_handler,
8 solana_token_swap_request_handler, transaction_cleanup_handler,
9 transaction_request_handler, transaction_status_handler, transaction_submission_handler,
10 BackoffRetryPolicy, JobProducerTrait,
11 },
12 models::{
13 NetworkRepoModel, NotificationRepoModel, RelayerRepoModel, SignerRepoModel,
14 ThinDataAppState, TransactionRepoModel,
15 },
16 repositories::{
17 ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
18 Repository, TransactionCounterTrait, TransactionRepository,
19 },
20};
21use apalis::{layers::ErrorHandlingLayer, prelude::*};
22use apalis_cron::CronStream;
23use eyre::Result;
24use std::{str::FromStr, time::Duration};
25use tokio::signal::unix::SignalKind;
26use tracing::{debug, error, info};
27
/// Concurrency used for workers that do not specify their own value in
/// `get_worker_configs`.
const DEFAULT_CONCURRENCY: usize = 10;
/// First argument passed to each worker builder's `.rate_limit(..)` call.
/// Presumably the number of jobs allowed per `DEFAULT_RATE_LIMIT_DURATION` —
/// confirm against the rate-limit layer's documentation.
const DEFAULT_RATE_LIMIT: u64 = 100;
/// Second argument passed to each worker builder's `.rate_limit(..)` call.
const DEFAULT_RATE_LIMIT_DURATION: Duration = Duration::from_secs(1);

// Worker names. Each one is both the lookup key into `get_worker_configs()`
// and the name handed to `WorkerBuilder::new` in `initialize_workers`.
const TRANSACTION_REQUEST: &str = "transaction_request";
const TRANSACTION_SENDER: &str = "transaction_sender";
const TRANSACTION_STATUS_CHECKER: &str = "transaction_status_checker";
const NOTIFICATION_SENDER: &str = "notification_sender";
const SOLANA_TOKEN_SWAP_REQUEST: &str = "solana_token_swap_request";
const TRANSACTION_CLEANUP: &str = "transaction_cleanup";
const RELAYER_HEALTH_CHECK: &str = "relayer_health_check";
40
/// Static settings for one background worker.
///
/// Instances are produced by `get_worker_configs` and read by
/// `initialize_workers` when it assembles each worker.
#[derive(Debug, Clone)]
pub struct WorkerConfig {
    /// Worker name; used to look the config up and passed to `WorkerBuilder::new`.
    pub name: &'static str,
    /// Value passed to the worker builder's `.concurrency(..)` call.
    pub concurrency: usize,
    /// First argument of the worker builder's `.rate_limit(..)` call.
    pub rate_limit: u64,
    /// Second argument of the worker builder's `.rate_limit(..)` call.
    pub rate_limit_duration: Duration,
    /// Cron expression for schedule-driven workers; `None` for workers that
    /// are fed from a queue backend.
    pub cron_schedule: Option<&'static str>,
}
51
52impl WorkerConfig {
53 pub const fn new(name: &'static str, concurrency: usize) -> Self {
54 Self {
55 name,
56 concurrency,
57 rate_limit: DEFAULT_RATE_LIMIT,
58 rate_limit_duration: DEFAULT_RATE_LIMIT_DURATION,
59 cron_schedule: None,
60 }
61 }
62
63 pub const fn with_cron(name: &'static str, concurrency: usize, cron: &'static str) -> Self {
64 Self {
65 name,
66 concurrency,
67 rate_limit: DEFAULT_RATE_LIMIT,
68 rate_limit_duration: DEFAULT_RATE_LIMIT_DURATION,
69 cron_schedule: Some(cron),
70 }
71 }
72}
73
74pub fn get_worker_configs() -> Vec<WorkerConfig> {
83 vec![
84 WorkerConfig::new(TRANSACTION_REQUEST, DEFAULT_CONCURRENCY), WorkerConfig::new(TRANSACTION_SENDER, DEFAULT_CONCURRENCY), WorkerConfig::new(TRANSACTION_STATUS_CHECKER, 20), WorkerConfig::new(NOTIFICATION_SENDER, DEFAULT_CONCURRENCY), WorkerConfig::new(SOLANA_TOKEN_SWAP_REQUEST, 25), WorkerConfig::with_cron(
94 TRANSACTION_CLEANUP,
95 1, "0 */30 * * * *", ),
98 WorkerConfig::new(RELAYER_HEALTH_CHECK, 10), ]
101}
102
103pub async fn initialize_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
104 app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
105) -> Result<()>
106where
107 J: JobProducerTrait + Send + Sync + 'static,
108 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
109 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
110 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
111 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
112 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
113 TCR: TransactionCounterTrait + Send + Sync + 'static,
114 PR: PluginRepositoryTrait + Send + Sync + 'static,
115 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
116{
117 let queue = app_state.job_producer().get_queue().await?;
118 let configs = get_worker_configs();
119
120 let get_config = |name: &str| -> &WorkerConfig {
122 configs
123 .iter()
124 .find(|c| c.name == name)
125 .unwrap_or_else(|| panic!("Worker config for '{}' should exist", name))
126 };
127
128 let tx_req_config = get_config(TRANSACTION_REQUEST);
129 let transaction_request_queue_worker = WorkerBuilder::new(tx_req_config.name)
130 .layer(ErrorHandlingLayer::new())
131 .enable_tracing()
132 .catch_panic()
133 .rate_limit(tx_req_config.rate_limit, tx_req_config.rate_limit_duration)
134 .retry(BackoffRetryPolicy::default())
135 .concurrency(tx_req_config.concurrency)
136 .data(app_state.clone())
137 .backend(queue.transaction_request_queue.clone())
138 .build_fn(transaction_request_handler);
139
140 let tx_sub_config = get_config(TRANSACTION_SENDER);
141 let transaction_submission_queue_worker = WorkerBuilder::new(tx_sub_config.name)
142 .layer(ErrorHandlingLayer::new())
143 .enable_tracing()
144 .catch_panic()
145 .rate_limit(tx_sub_config.rate_limit, tx_sub_config.rate_limit_duration)
146 .retry(BackoffRetryPolicy::default())
147 .concurrency(tx_sub_config.concurrency)
148 .data(app_state.clone())
149 .backend(queue.transaction_submission_queue.clone())
150 .build_fn(transaction_submission_handler);
151
152 let tx_status_config = get_config(TRANSACTION_STATUS_CHECKER);
153 let transaction_status_queue_worker = WorkerBuilder::new(tx_status_config.name)
154 .layer(ErrorHandlingLayer::new())
155 .catch_panic()
156 .enable_tracing()
157 .rate_limit(
158 tx_status_config.rate_limit,
159 tx_status_config.rate_limit_duration,
160 )
161 .retry(BackoffRetryPolicy::default())
162 .concurrency(tx_status_config.concurrency)
163 .data(app_state.clone())
164 .backend(queue.transaction_status_queue.clone())
165 .build_fn(transaction_status_handler);
166
167 let notif_config = get_config(NOTIFICATION_SENDER);
168 let notification_queue_worker = WorkerBuilder::new(notif_config.name)
169 .layer(ErrorHandlingLayer::new())
170 .enable_tracing()
171 .catch_panic()
172 .rate_limit(notif_config.rate_limit, notif_config.rate_limit_duration)
173 .retry(BackoffRetryPolicy::default())
174 .concurrency(notif_config.concurrency)
175 .data(app_state.clone())
176 .backend(queue.notification_queue.clone())
177 .build_fn(notification_handler);
178
179 let swap_config = get_config(SOLANA_TOKEN_SWAP_REQUEST);
180 let solana_token_swap_request_queue_worker = WorkerBuilder::new(swap_config.name)
181 .layer(ErrorHandlingLayer::new())
182 .enable_tracing()
183 .catch_panic()
184 .rate_limit(swap_config.rate_limit, swap_config.rate_limit_duration)
185 .retry(BackoffRetryPolicy::default())
186 .concurrency(swap_config.concurrency)
187 .data(app_state.clone())
188 .backend(queue.solana_token_swap_request_queue.clone())
189 .build_fn(solana_token_swap_request_handler);
190
191 let cleanup_config = get_config(TRANSACTION_CLEANUP);
192 let transaction_cleanup_queue_worker = WorkerBuilder::new(cleanup_config.name)
193 .layer(ErrorHandlingLayer::new())
194 .enable_tracing()
195 .catch_panic()
196 .rate_limit(
197 cleanup_config.rate_limit,
198 cleanup_config.rate_limit_duration,
199 )
200 .retry(BackoffRetryPolicy::default())
201 .concurrency(cleanup_config.concurrency)
202 .data(app_state.clone())
203 .backend(CronStream::new(
204 apalis_cron::Schedule::from_str(
205 cleanup_config
206 .cron_schedule
207 .expect("TRANSACTION_CLEANUP should have cron schedule"),
208 )
209 .expect("Valid cron schedule"),
210 ))
211 .build_fn(transaction_cleanup_handler);
212
213 let health_config = get_config(RELAYER_HEALTH_CHECK);
214 let relayer_health_check_worker = WorkerBuilder::new(health_config.name)
215 .layer(ErrorHandlingLayer::new())
216 .enable_tracing()
217 .catch_panic()
218 .rate_limit(health_config.rate_limit, health_config.rate_limit_duration)
219 .retry(BackoffRetryPolicy::default())
220 .concurrency(health_config.concurrency)
221 .data(app_state.clone())
222 .backend(queue.relayer_health_check_queue.clone())
223 .build_fn(relayer_health_check_handler);
224
225 let monitor = Monitor::new()
226 .register(transaction_request_queue_worker)
227 .register(transaction_submission_queue_worker)
228 .register(transaction_status_queue_worker)
229 .register(notification_queue_worker)
230 .register(solana_token_swap_request_queue_worker)
231 .register(transaction_cleanup_queue_worker)
232 .register(relayer_health_check_worker)
233 .on_event(monitor_handle_event)
234 .shutdown_timeout(Duration::from_millis(5000));
235
236 let monitor_future = monitor.run_with_signal(async {
237 let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
238 .expect("Failed to create SIGINT signal");
239 let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
240 .expect("Failed to create SIGTERM signal");
241
242 info!("Monitor started");
243
244 tokio::select! {
245 _ = sigint.recv() => info!("Received SIGINT."),
246 _ = sigterm.recv() => info!("Received SIGTERM."),
247 };
248
249 info!("Monitor shutting down");
250
251 Ok(())
252 });
253 tokio::spawn(async move {
254 if let Err(e) = monitor_future.await {
255 error!(error = %e, "monitor error");
256 }
257 });
258 info!("Monitor shutdown complete");
259 Ok(())
260}
261
262fn filter_relayers_for_swap(relayers: Vec<RelayerRepoModel>) -> Vec<RelayerRepoModel> {
268 relayers
269 .into_iter()
270 .filter(|relayer| {
271 let policy = relayer.policies.get_solana_policy();
272 let swap_config = match policy.get_swap_config() {
273 Some(config) => config,
274 None => {
275 debug!(relayer_id = %relayer.id, "No swap configuration specified; skipping");
276 return false;
277 }
278 };
279
280 if swap_config.cron_schedule.is_none() {
281 debug!(relayer_id = %relayer.id, "No cron schedule specified; skipping");
282 return false;
283 }
284 true
285 })
286 .collect()
287}
288
289pub async fn initialize_solana_swap_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
292 app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
293) -> Result<()>
294where
295 J: JobProducerTrait + Send + Sync + 'static,
296 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
297 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
298 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
299 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
300 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
301 TCR: TransactionCounterTrait + Send + Sync + 'static,
302 PR: PluginRepositoryTrait + Send + Sync + 'static,
303 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
304{
305 let active_relayers = app_state.relayer_repository.list_active().await?;
306 let solena_relayers_with_swap_enabled = filter_relayers_for_swap(active_relayers);
307
308 if solena_relayers_with_swap_enabled.is_empty() {
309 info!("No solana relayers with swap enabled");
310 return Ok(());
311 }
312 info!(
313 "Found {} solana relayers with swap enabled",
314 solena_relayers_with_swap_enabled.len()
315 );
316
317 let mut workers = Vec::new();
318
319 for relayer in solena_relayers_with_swap_enabled {
320 debug!(relayer = ?relayer, "found solana relayer with swap enabled");
321
322 let policy = relayer.policies.get_solana_policy();
323 let swap_config = match policy.get_swap_config() {
324 Some(config) => config,
325 None => {
326 info!("No swap configuration specified; skipping validation.");
327 continue;
328 }
329 };
330
331 let calendar_schedule = match swap_config.cron_schedule {
332 Some(schedule) => apalis_cron::Schedule::from_str(&schedule).unwrap(),
333 None => {
334 debug!(relayer = ?relayer, "no swap cron schedule found for relayer");
335 continue;
336 }
337 };
338
339 let worker = WorkerBuilder::new(format!("solana-swap-schedule-{}", relayer.id.clone()))
341 .layer(ErrorHandlingLayer::new())
342 .enable_tracing()
343 .catch_panic()
344 .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
345 .retry(BackoffRetryPolicy::default())
346 .concurrency(1)
347 .data(relayer.id.clone())
348 .data(app_state.clone())
349 .backend(CronStream::new(calendar_schedule))
350 .build_fn(solana_token_swap_cron_handler);
351
352 workers.push(worker);
353 info!(
354 "Created worker for solana relayer with swap enabled: {:?}",
355 relayer
356 );
357 }
358
359 let mut monitor = Monitor::new()
360 .on_event(monitor_handle_event)
361 .shutdown_timeout(Duration::from_millis(5000));
362
363 for worker in workers {
365 monitor = monitor.register(worker);
366 }
367
368 let monitor_future = monitor.run_with_signal(async {
369 let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
370 .expect("Failed to create SIGINT signal");
371 let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
372 .expect("Failed to create SIGTERM signal");
373
374 info!("Solana Swap Monitor started");
375
376 tokio::select! {
377 _ = sigint.recv() => info!("Received SIGINT."),
378 _ = sigterm.recv() => info!("Received SIGTERM."),
379 };
380
381 info!("Solana Swap Monitor shutting down");
382
383 Ok(())
384 });
385 tokio::spawn(async move {
386 if let Err(e) = monitor_future.await {
387 error!(error = %e, "monitor error");
388 }
389 });
390 Ok(())
391}
392
393fn monitor_handle_event(e: Worker<Event>) {
394 let worker_id = e.id();
395 match e.inner() {
396 Event::Engage(task_id) => {
397 debug!(worker_id = %worker_id, task_id = %task_id, "worker got a job");
398 }
399 Event::Error(e) => {
400 error!(worker_id = %worker_id, error = %e, "worker encountered an error");
401 }
402 Event::Exit => {
403 debug!(worker_id = %worker_id, "worker exited");
404 }
405 Event::Idle => {
406 debug!(worker_id = %worker_id, "worker is idle");
407 }
408 Event::Start => {
409 debug!(worker_id = %worker_id, "worker started");
410 }
411 Event::Stop => {
412 debug!(worker_id = %worker_id, "worker stopped");
413 }
414 _ => {}
415 }
416}
417
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{
        NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy, RelayerRepoModel, RelayerSolanaPolicy,
        RelayerSolanaSwapConfig,
    };

    /// Fixture: an unpaused EVM relayer on "sepolia" with the default EVM policy.
    fn create_test_evm_relayer(id: &str) -> RelayerRepoModel {
        let policies = RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default());

        RelayerRepoModel {
            id: id.to_string(),
            name: format!("EVM Relayer {}", id),
            network: "sepolia".to_string(),
            paused: false,
            network_type: NetworkType::Evm,
            policies,
            signer_id: "test-signer".to_string(),
            address: "0x742d35Cc6634C0532925a3b8D8C2e48a73F6ba2E".to_string(),
            system_disabled: false,
            ..Default::default()
        }
    }

    /// Fixture: an unpaused Solana relayer on "mainnet-beta" whose policy
    /// carries a swap config with the given (optional) cron schedule.
    fn create_test_solana_relayer_with_swap(
        id: &str,
        cron_schedule: Option<String>,
    ) -> RelayerRepoModel {
        let swap_config = RelayerSolanaSwapConfig {
            strategy: None,
            cron_schedule,
            min_balance_threshold: Some(5_000_000_000),
            jupiter_swap_options: None,
        };
        let solana_policy = RelayerSolanaPolicy {
            min_balance: Some(1_000_000_000),
            allowed_tokens: None,
            allowed_programs: None,
            max_signatures: None,
            max_tx_data_size: None,
            fee_payment_strategy: None,
            fee_margin_percentage: None,
            allowed_accounts: None,
            disallowed_accounts: None,
            max_allowed_fee_lamports: None,
            swap_config: Some(swap_config),
        };

        RelayerRepoModel {
            id: id.to_string(),
            name: format!("Solana Relayer {}", id),
            network: "mainnet-beta".to_string(),
            paused: false,
            network_type: NetworkType::Solana,
            policies: RelayerNetworkPolicy::Solana(solana_policy),
            signer_id: "test-signer".to_string(),
            address: "5zWma6gn4QxRfC6xZk6KfpXWXXgV3Xt6VzPpXMKCMYW5".to_string(),
            system_disabled: false,
            ..Default::default()
        }
    }

    #[test]
    fn test_filter_relayers_for_swap_with_empty_list() {
        let filtered = filter_relayers_for_swap(Vec::new());

        assert_eq!(
            filtered.len(),
            0,
            "Should return empty list when no relayers provided"
        );
    }

    #[test]
    fn test_filter_relayers_for_swap_filters_non_solana() {
        let evm_only = vec![
            create_test_evm_relayer("evm-1"),
            create_test_evm_relayer("evm-2"),
        ];

        let filtered = filter_relayers_for_swap(evm_only);

        assert_eq!(
            filtered.len(),
            0,
            "Should filter out all non-Solana relayers"
        );
    }

    #[test]
    fn test_filter_relayers_for_swap_filters_no_cron_schedule() {
        let without_cron = vec![
            create_test_solana_relayer_with_swap("solana-1", None),
            create_test_solana_relayer_with_swap("solana-2", None),
        ];

        let filtered = filter_relayers_for_swap(without_cron);

        assert_eq!(
            filtered.len(),
            0,
            "Should filter out Solana relayers without cron schedule"
        );
    }

    #[test]
    fn test_filter_relayers_for_swap_includes_valid_relayers() {
        let first = create_test_solana_relayer_with_swap("solana-1", Some("0 0 * * * *".to_string()));
        let second =
            create_test_solana_relayer_with_swap("solana-2", Some("0 */2 * * * *".to_string()));

        let filtered = filter_relayers_for_swap(vec![first, second]);

        assert_eq!(
            filtered.len(),
            2,
            "Should include all Solana relayers with cron schedule"
        );
        // Input order must be preserved.
        assert_eq!(filtered[0].id, "solana-1");
        assert_eq!(filtered[1].id, "solana-2");
    }

    #[test]
    fn test_filter_relayers_for_swap_with_mixed_relayers() {
        let mixed = vec![
            create_test_evm_relayer("evm-1"),
            create_test_solana_relayer_with_swap("solana-no-cron", None),
            create_test_solana_relayer_with_swap(
                "solana-with-cron-1",
                Some("0 0 * * * *".to_string()),
            ),
            create_test_evm_relayer("evm-2"),
            create_test_solana_relayer_with_swap(
                "solana-with-cron-2",
                Some("0 */3 * * * *".to_string()),
            ),
        ];

        let filtered = filter_relayers_for_swap(mixed);

        assert_eq!(
            filtered.len(),
            2,
            "Should only include Solana relayers with cron schedule"
        );

        let ids: Vec<&str> = filtered.iter().map(|r| r.id.as_str()).collect();
        for kept in ["solana-with-cron-1", "solana-with-cron-2"].iter() {
            assert!(ids.contains(kept), "Should include {}", kept);
        }
        assert!(!ids.contains(&"evm-1"), "Should not include EVM relayers");
        assert!(
            !ids.contains(&"solana-no-cron"),
            "Should not include Solana without cron"
        );
    }

    #[test]
    fn test_filter_relayers_for_swap_preserves_relayer_data() {
        let cron = "0 1 * * * *".to_string();
        let input = vec![create_test_solana_relayer_with_swap(
            "test-relayer",
            Some(cron.clone()),
        )];

        let filtered = filter_relayers_for_swap(input);
        assert_eq!(filtered.len(), 1);

        let kept = &filtered[0];
        assert_eq!(kept.id, "test-relayer");
        assert_eq!(kept.name, "Solana Relayer test-relayer");
        assert_eq!(kept.network_type, NetworkType::Solana);

        let policy = kept.policies.get_solana_policy();
        let swap_config = policy.get_swap_config().expect("Should have swap config");
        assert_eq!(swap_config.cron_schedule.as_ref(), Some(&cron));
    }

    #[test]
    fn test_get_worker_configs_returns_all_workers() {
        assert_eq!(
            get_worker_configs().len(),
            7,
            "Should have 7 standard workers"
        );
    }

    #[test]
    fn test_worker_configs_have_correct_names() {
        let configs = get_worker_configs();
        let names: Vec<&str> = configs.iter().map(|c| c.name).collect();

        // (worker name, failure message) pairs, checked in declaration order.
        let expectations = [
            (TRANSACTION_REQUEST, "Should include transaction_request"),
            (TRANSACTION_SENDER, "Should include transaction_sender"),
            (
                TRANSACTION_STATUS_CHECKER,
                "Should include transaction_status_checker",
            ),
            (NOTIFICATION_SENDER, "Should include notification_sender"),
            (
                SOLANA_TOKEN_SWAP_REQUEST,
                "Should include solana_token_swap_request",
            ),
            (TRANSACTION_CLEANUP, "Should include transaction_cleanup"),
            (RELAYER_HEALTH_CHECK, "Should include relayer_health_check"),
        ];

        for (name, message) in expectations.iter() {
            assert!(names.contains(name), "{}", message);
        }
    }

    #[test]
    fn test_worker_configs_have_correct_concurrency() {
        let configs = get_worker_configs();

        for config in &configs {
            let (expected, reason): (usize, &str) = match config.name {
                TRANSACTION_REQUEST => (
                    10,
                    "transaction_request should have concurrency 10 (I/O-bound)",
                ),
                TRANSACTION_SENDER => (
                    10,
                    "transaction_sender should have concurrency 10 (I/O-bound)",
                ),
                TRANSACTION_STATUS_CHECKER => (
                    20,
                    "transaction_status_checker should have concurrency 20 (read-only RPC)",
                ),
                NOTIFICATION_SENDER => (
                    10,
                    "notification_sender should have concurrency 10 (I/O-bound)",
                ),
                SOLANA_TOKEN_SWAP_REQUEST => (
                    25,
                    "solana_token_swap_request should have concurrency 25 (DEX operations)",
                ),
                TRANSACTION_CLEANUP => (
                    1,
                    "transaction_cleanup should have concurrency 1 (DB conflicts)",
                ),
                RELAYER_HEALTH_CHECK => (
                    10,
                    "relayer_health_check should have concurrency 10 (independent checks)",
                ),
                other => panic!("Unexpected worker name: {}", other),
            };

            assert_eq!(config.concurrency, expected, "{}", reason);
        }
    }

    #[test]
    fn test_worker_configs_have_default_rate_limits() {
        for config in get_worker_configs().iter() {
            assert_eq!(
                config.rate_limit, DEFAULT_RATE_LIMIT,
                "Worker {} should have default rate limit",
                config.name
            );
            assert_eq!(
                config.rate_limit_duration, DEFAULT_RATE_LIMIT_DURATION,
                "Worker {} should have default rate limit duration",
                config.name
            );
        }
    }

    #[test]
    fn test_worker_config_constants_are_unique() {
        let configs = get_worker_configs();
        let names: Vec<&str> = configs.iter().map(|c| c.name).collect();
        // A set drops duplicates, so equal sizes mean every name is distinct.
        let distinct: std::collections::BTreeSet<&str> = names.iter().copied().collect();

        assert_eq!(
            names.len(),
            distinct.len(),
            "All worker names should be unique"
        );
    }

    #[test]
    fn test_worker_constants_match_strings() {
        let pairs = [
            (TRANSACTION_REQUEST, "transaction_request"),
            (TRANSACTION_SENDER, "transaction_sender"),
            (TRANSACTION_STATUS_CHECKER, "transaction_status_checker"),
            (NOTIFICATION_SENDER, "notification_sender"),
            (SOLANA_TOKEN_SWAP_REQUEST, "solana_token_swap_request"),
            (TRANSACTION_CLEANUP, "transaction_cleanup"),
            (RELAYER_HEALTH_CHECK, "relayer_health_check"),
        ];

        for (constant, literal) in pairs.iter() {
            assert_eq!(constant, literal);
        }
    }

    #[test]
    fn test_default_configuration_constants() {
        let one_second = Duration::from_secs(1);

        assert_eq!(
            DEFAULT_CONCURRENCY, 10,
            "Default concurrency should be 10 for I/O-bound ops"
        );
        assert_eq!(
            DEFAULT_RATE_LIMIT, 100,
            "Default rate limit should be 100 for higher throughput"
        );
        assert_eq!(
            DEFAULT_RATE_LIMIT_DURATION, one_second,
            "Default rate limit duration should be 1 second"
        );
    }

    #[test]
    fn test_specialized_worker_concurrency() {
        let configs = get_worker_configs();
        // Returns the configured concurrency for `name`, panicking with
        // `missing` when the worker is not present.
        let concurrency_of = |name: &str, missing: &str| -> usize {
            configs
                .iter()
                .find(|c| c.name == name)
                .expect(missing)
                .concurrency
        };

        assert_eq!(
            concurrency_of(SOLANA_TOKEN_SWAP_REQUEST, "Should have solana swap worker"),
            25,
            "Solana swap should handle 25 concurrent requests (DEX operations)"
        );
        assert_eq!(
            concurrency_of(TRANSACTION_STATUS_CHECKER, "Should have status checker"),
            20,
            "Status checker should have high concurrency 20 (read-only RPC calls)"
        );
        assert_eq!(
            concurrency_of(TRANSACTION_CLEANUP, "Should have cleanup worker"),
            1,
            "Cleanup should run single-threaded to avoid DB conflicts"
        );
        assert_eq!(
            concurrency_of(RELAYER_HEALTH_CHECK, "Should have health check worker"),
            10,
            "Health checks should handle 10 concurrent relayers"
        );
    }

    #[test]
    fn test_cron_scheduled_workers() {
        let configs = get_worker_configs();
        let cleanup = configs
            .iter()
            .find(|c| c.name == TRANSACTION_CLEANUP)
            .expect("Should have cleanup worker");

        assert!(
            cleanup.cron_schedule.is_some(),
            "Cleanup worker should have a cron schedule"
        );
        assert_eq!(
            cleanup.cron_schedule,
            Some("0 */30 * * * *"),
            "Cleanup should run every 30 minutes"
        );

        // The expression must also be accepted by the cron parser.
        let schedule_str = cleanup.cron_schedule.unwrap();
        let parsed = apalis_cron::Schedule::from_str(schedule_str);
        assert!(
            parsed.is_ok(),
            "Cron schedule should be valid: {}",
            schedule_str
        );
    }

    #[test]
    fn test_non_cron_workers_have_no_schedule() {
        let configs = get_worker_configs();
        let queue_workers = [
            TRANSACTION_REQUEST,
            TRANSACTION_SENDER,
            TRANSACTION_STATUS_CHECKER,
            NOTIFICATION_SENDER,
            SOLANA_TOKEN_SWAP_REQUEST,
            RELAYER_HEALTH_CHECK,
        ];

        for worker_name in queue_workers.iter() {
            let worker = configs
                .iter()
                .find(|c| c.name == *worker_name)
                .unwrap_or_else(|| panic!("Should have worker: {}", worker_name));

            assert!(
                worker.cron_schedule.is_none(),
                "Queue-based worker {} should not have cron schedule",
                worker_name
            );
        }
    }
}