openzeppelin_relayer/bootstrap/
initialize_workers.rs

1//! Worker initialization
2//!
3//! This module contains functions for initializing background workers,
4//! including job processors and other long-running tasks.
5use crate::{
6    jobs::{
7        notification_handler, relayer_health_check_handler, solana_token_swap_cron_handler,
8        solana_token_swap_request_handler, transaction_cleanup_handler,
9        transaction_request_handler, transaction_status_handler, transaction_submission_handler,
10        BackoffRetryPolicy, JobProducerTrait,
11    },
12    models::{
13        NetworkRepoModel, NotificationRepoModel, RelayerRepoModel, SignerRepoModel,
14        ThinDataAppState, TransactionRepoModel,
15    },
16    repositories::{
17        ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
18        Repository, TransactionCounterTrait, TransactionRepository,
19    },
20};
21use apalis::{layers::ErrorHandlingLayer, prelude::*};
22use apalis_cron::CronStream;
23use eyre::Result;
24use std::{str::FromStr, time::Duration};
25use tokio::signal::unix::SignalKind;
26use tracing::{debug, error, info};
27
28// Worker configuration constants
29const DEFAULT_CONCURRENCY: usize = 10;
30const DEFAULT_RATE_LIMIT: u64 = 100;
31const DEFAULT_RATE_LIMIT_DURATION: Duration = Duration::from_secs(1);
32
33const TRANSACTION_REQUEST: &str = "transaction_request";
34const TRANSACTION_SENDER: &str = "transaction_sender";
35const TRANSACTION_STATUS_CHECKER: &str = "transaction_status_checker";
36const NOTIFICATION_SENDER: &str = "notification_sender";
37const SOLANA_TOKEN_SWAP_REQUEST: &str = "solana_token_swap_request";
38const TRANSACTION_CLEANUP: &str = "transaction_cleanup";
39const RELAYER_HEALTH_CHECK: &str = "relayer_health_check";
40
41/// Configuration for a worker
42#[derive(Debug, Clone)]
43pub struct WorkerConfig {
44    pub name: &'static str,
45    pub concurrency: usize,
46    pub rate_limit: u64,
47    pub rate_limit_duration: Duration,
48    /// Optional cron schedule for periodic workers (e.g., cleanup tasks)
49    pub cron_schedule: Option<&'static str>,
50}
51
52impl WorkerConfig {
53    pub const fn new(name: &'static str, concurrency: usize) -> Self {
54        Self {
55            name,
56            concurrency,
57            rate_limit: DEFAULT_RATE_LIMIT,
58            rate_limit_duration: DEFAULT_RATE_LIMIT_DURATION,
59            cron_schedule: None,
60        }
61    }
62
63    pub const fn with_cron(name: &'static str, concurrency: usize, cron: &'static str) -> Self {
64        Self {
65            name,
66            concurrency,
67            rate_limit: DEFAULT_RATE_LIMIT,
68            rate_limit_duration: DEFAULT_RATE_LIMIT_DURATION,
69            cron_schedule: Some(cron),
70        }
71    }
72}
73
74/// Get worker configurations for all standard workers
75///
76/// Concurrency reasoning:
77/// - Transaction processing (request/sender/status): I/O-bound blockchain calls → high concurrency
78/// - Notifications: I/O-bound webhook calls → high concurrency
79/// - Solana swap: DEX operations with multiple tokens → very high concurrency
80/// - Cleanup: Resource-intensive DB operations → low concurrency to avoid conflicts
81/// - Health checks: Multiple relayers checked independently → moderate-high concurrency
82pub fn get_worker_configs() -> Vec<WorkerConfig> {
83    vec![
84        // High concurrency for I/O-bound blockchain operations
85        WorkerConfig::new(TRANSACTION_REQUEST, DEFAULT_CONCURRENCY), // 10: Validate/parse requests
86        WorkerConfig::new(TRANSACTION_SENDER, DEFAULT_CONCURRENCY),  // 10: Submit to blockchains
87        WorkerConfig::new(TRANSACTION_STATUS_CHECKER, 20), // 20: Poll RPC endpoints (read-only)
88        // High concurrency for webhook notifications (I/O-bound HTTP calls)
89        WorkerConfig::new(NOTIFICATION_SENDER, DEFAULT_CONCURRENCY), // 10: Send webhooks
90        // Very high concurrency for DEX operations (multiple tokens/swaps)
91        WorkerConfig::new(SOLANA_TOKEN_SWAP_REQUEST, 25), // 25: Handle swap requests
92        // Low concurrency for resource-intensive periodic cleanup (cron-based)
93        WorkerConfig::with_cron(
94            TRANSACTION_CLEANUP,
95            1,                // 1: Avoid DB conflicts
96            "0 */30 * * * *", // Every 30 minutes
97        ),
98        // Moderate-high concurrency for independent health checks
99        WorkerConfig::new(RELAYER_HEALTH_CHECK, 10), // 10: Check multiple relayers
100    ]
101}
102
103pub async fn initialize_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
104    app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
105) -> Result<()>
106where
107    J: JobProducerTrait + Send + Sync + 'static,
108    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
109    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
110    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
111    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
112    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
113    TCR: TransactionCounterTrait + Send + Sync + 'static,
114    PR: PluginRepositoryTrait + Send + Sync + 'static,
115    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
116{
117    let queue = app_state.job_producer().get_queue().await?;
118    let configs = get_worker_configs();
119
120    // Helper function to find config by name
121    let get_config = |name: &str| -> &WorkerConfig {
122        configs
123            .iter()
124            .find(|c| c.name == name)
125            .unwrap_or_else(|| panic!("Worker config for '{}' should exist", name))
126    };
127
128    let tx_req_config = get_config(TRANSACTION_REQUEST);
129    let transaction_request_queue_worker = WorkerBuilder::new(tx_req_config.name)
130        .layer(ErrorHandlingLayer::new())
131        .enable_tracing()
132        .catch_panic()
133        .rate_limit(tx_req_config.rate_limit, tx_req_config.rate_limit_duration)
134        .retry(BackoffRetryPolicy::default())
135        .concurrency(tx_req_config.concurrency)
136        .data(app_state.clone())
137        .backend(queue.transaction_request_queue.clone())
138        .build_fn(transaction_request_handler);
139
140    let tx_sub_config = get_config(TRANSACTION_SENDER);
141    let transaction_submission_queue_worker = WorkerBuilder::new(tx_sub_config.name)
142        .layer(ErrorHandlingLayer::new())
143        .enable_tracing()
144        .catch_panic()
145        .rate_limit(tx_sub_config.rate_limit, tx_sub_config.rate_limit_duration)
146        .retry(BackoffRetryPolicy::default())
147        .concurrency(tx_sub_config.concurrency)
148        .data(app_state.clone())
149        .backend(queue.transaction_submission_queue.clone())
150        .build_fn(transaction_submission_handler);
151
152    let tx_status_config = get_config(TRANSACTION_STATUS_CHECKER);
153    let transaction_status_queue_worker = WorkerBuilder::new(tx_status_config.name)
154        .layer(ErrorHandlingLayer::new())
155        .catch_panic()
156        .enable_tracing()
157        .rate_limit(
158            tx_status_config.rate_limit,
159            tx_status_config.rate_limit_duration,
160        )
161        .retry(BackoffRetryPolicy::default())
162        .concurrency(tx_status_config.concurrency)
163        .data(app_state.clone())
164        .backend(queue.transaction_status_queue.clone())
165        .build_fn(transaction_status_handler);
166
167    let notif_config = get_config(NOTIFICATION_SENDER);
168    let notification_queue_worker = WorkerBuilder::new(notif_config.name)
169        .layer(ErrorHandlingLayer::new())
170        .enable_tracing()
171        .catch_panic()
172        .rate_limit(notif_config.rate_limit, notif_config.rate_limit_duration)
173        .retry(BackoffRetryPolicy::default())
174        .concurrency(notif_config.concurrency)
175        .data(app_state.clone())
176        .backend(queue.notification_queue.clone())
177        .build_fn(notification_handler);
178
179    let swap_config = get_config(SOLANA_TOKEN_SWAP_REQUEST);
180    let solana_token_swap_request_queue_worker = WorkerBuilder::new(swap_config.name)
181        .layer(ErrorHandlingLayer::new())
182        .enable_tracing()
183        .catch_panic()
184        .rate_limit(swap_config.rate_limit, swap_config.rate_limit_duration)
185        .retry(BackoffRetryPolicy::default())
186        .concurrency(swap_config.concurrency)
187        .data(app_state.clone())
188        .backend(queue.solana_token_swap_request_queue.clone())
189        .build_fn(solana_token_swap_request_handler);
190
191    let cleanup_config = get_config(TRANSACTION_CLEANUP);
192    let transaction_cleanup_queue_worker = WorkerBuilder::new(cleanup_config.name)
193        .layer(ErrorHandlingLayer::new())
194        .enable_tracing()
195        .catch_panic()
196        .rate_limit(
197            cleanup_config.rate_limit,
198            cleanup_config.rate_limit_duration,
199        )
200        .retry(BackoffRetryPolicy::default())
201        .concurrency(cleanup_config.concurrency)
202        .data(app_state.clone())
203        .backend(CronStream::new(
204            apalis_cron::Schedule::from_str(
205                cleanup_config
206                    .cron_schedule
207                    .expect("TRANSACTION_CLEANUP should have cron schedule"),
208            )
209            .expect("Valid cron schedule"),
210        ))
211        .build_fn(transaction_cleanup_handler);
212
213    let health_config = get_config(RELAYER_HEALTH_CHECK);
214    let relayer_health_check_worker = WorkerBuilder::new(health_config.name)
215        .layer(ErrorHandlingLayer::new())
216        .enable_tracing()
217        .catch_panic()
218        .rate_limit(health_config.rate_limit, health_config.rate_limit_duration)
219        .retry(BackoffRetryPolicy::default())
220        .concurrency(health_config.concurrency)
221        .data(app_state.clone())
222        .backend(queue.relayer_health_check_queue.clone())
223        .build_fn(relayer_health_check_handler);
224
225    let monitor = Monitor::new()
226        .register(transaction_request_queue_worker)
227        .register(transaction_submission_queue_worker)
228        .register(transaction_status_queue_worker)
229        .register(notification_queue_worker)
230        .register(solana_token_swap_request_queue_worker)
231        .register(transaction_cleanup_queue_worker)
232        .register(relayer_health_check_worker)
233        .on_event(monitor_handle_event)
234        .shutdown_timeout(Duration::from_millis(5000));
235
236    let monitor_future = monitor.run_with_signal(async {
237        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
238            .expect("Failed to create SIGINT signal");
239        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
240            .expect("Failed to create SIGTERM signal");
241
242        info!("Monitor started");
243
244        tokio::select! {
245            _ = sigint.recv() => info!("Received SIGINT."),
246            _ = sigterm.recv() => info!("Received SIGTERM."),
247        };
248
249        info!("Monitor shutting down");
250
251        Ok(())
252    });
253    tokio::spawn(async move {
254        if let Err(e) = monitor_future.await {
255            error!(error = %e, "monitor error");
256        }
257    });
258    info!("Monitor shutdown complete");
259    Ok(())
260}
261
262/// Filters relayers to find those eligible for swap workers
263/// Returns relayers that have:
264/// 1. Solana network type
265/// 2. Swap configuration
266/// 3. Cron schedule defined
267fn filter_relayers_for_swap(relayers: Vec<RelayerRepoModel>) -> Vec<RelayerRepoModel> {
268    relayers
269        .into_iter()
270        .filter(|relayer| {
271            let policy = relayer.policies.get_solana_policy();
272            let swap_config = match policy.get_swap_config() {
273                Some(config) => config,
274                None => {
275                    debug!(relayer_id = %relayer.id, "No swap configuration specified; skipping");
276                    return false;
277                }
278            };
279
280            if swap_config.cron_schedule.is_none() {
281                debug!(relayer_id = %relayer.id, "No cron schedule specified; skipping");
282                return false;
283            }
284            true
285        })
286        .collect()
287}
288
289/// Initializes the Solana swap workers
290/// This function creates and registers workers for Solana relayers that have swap enabled and cron schedule set.
291pub async fn initialize_solana_swap_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
292    app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
293) -> Result<()>
294where
295    J: JobProducerTrait + Send + Sync + 'static,
296    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
297    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
298    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
299    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
300    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
301    TCR: TransactionCounterTrait + Send + Sync + 'static,
302    PR: PluginRepositoryTrait + Send + Sync + 'static,
303    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
304{
305    let active_relayers = app_state.relayer_repository.list_active().await?;
306    let solena_relayers_with_swap_enabled = filter_relayers_for_swap(active_relayers);
307
308    if solena_relayers_with_swap_enabled.is_empty() {
309        info!("No solana relayers with swap enabled");
310        return Ok(());
311    }
312    info!(
313        "Found {} solana relayers with swap enabled",
314        solena_relayers_with_swap_enabled.len()
315    );
316
317    let mut workers = Vec::new();
318
319    for relayer in solena_relayers_with_swap_enabled {
320        debug!(relayer = ?relayer, "found solana relayer with swap enabled");
321
322        let policy = relayer.policies.get_solana_policy();
323        let swap_config = match policy.get_swap_config() {
324            Some(config) => config,
325            None => {
326                info!("No swap configuration specified; skipping validation.");
327                continue;
328            }
329        };
330
331        let calendar_schedule = match swap_config.cron_schedule {
332            Some(schedule) => apalis_cron::Schedule::from_str(&schedule).unwrap(),
333            None => {
334                debug!(relayer = ?relayer, "no swap cron schedule found for relayer");
335                continue;
336            }
337        };
338
339        // Create worker and add to the workers vector
340        let worker = WorkerBuilder::new(format!("solana-swap-schedule-{}", relayer.id.clone()))
341            .layer(ErrorHandlingLayer::new())
342            .enable_tracing()
343            .catch_panic()
344            .rate_limit(DEFAULT_RATE_LIMIT, DEFAULT_RATE_LIMIT_DURATION)
345            .retry(BackoffRetryPolicy::default())
346            .concurrency(1)
347            .data(relayer.id.clone())
348            .data(app_state.clone())
349            .backend(CronStream::new(calendar_schedule))
350            .build_fn(solana_token_swap_cron_handler);
351
352        workers.push(worker);
353        info!(
354            "Created worker for solana relayer with swap enabled: {:?}",
355            relayer
356        );
357    }
358
359    let mut monitor = Monitor::new()
360        .on_event(monitor_handle_event)
361        .shutdown_timeout(Duration::from_millis(5000));
362
363    // Register all workers with the monitor
364    for worker in workers {
365        monitor = monitor.register(worker);
366    }
367
368    let monitor_future = monitor.run_with_signal(async {
369        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
370            .expect("Failed to create SIGINT signal");
371        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
372            .expect("Failed to create SIGTERM signal");
373
374        info!("Solana Swap Monitor started");
375
376        tokio::select! {
377            _ = sigint.recv() => info!("Received SIGINT."),
378            _ = sigterm.recv() => info!("Received SIGTERM."),
379        };
380
381        info!("Solana Swap Monitor shutting down");
382
383        Ok(())
384    });
385    tokio::spawn(async move {
386        if let Err(e) = monitor_future.await {
387            error!(error = %e, "monitor error");
388        }
389    });
390    Ok(())
391}
392
393fn monitor_handle_event(e: Worker<Event>) {
394    let worker_id = e.id();
395    match e.inner() {
396        Event::Engage(task_id) => {
397            debug!(worker_id = %worker_id, task_id = %task_id, "worker got a job");
398        }
399        Event::Error(e) => {
400            error!(worker_id = %worker_id, error = %e, "worker encountered an error");
401        }
402        Event::Exit => {
403            debug!(worker_id = %worker_id, "worker exited");
404        }
405        Event::Idle => {
406            debug!(worker_id = %worker_id, "worker is idle");
407        }
408        Event::Start => {
409            debug!(worker_id = %worker_id, "worker started");
410        }
411        Event::Stop => {
412            debug!(worker_id = %worker_id, "worker stopped");
413        }
414        _ => {}
415    }
416}
417
418#[cfg(test)]
419mod tests {
420    use super::*;
421    use crate::models::{
422        NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy, RelayerRepoModel, RelayerSolanaPolicy,
423        RelayerSolanaSwapConfig,
424    };
425
426    fn create_test_evm_relayer(id: &str) -> RelayerRepoModel {
427        RelayerRepoModel {
428            id: id.to_string(),
429            name: format!("EVM Relayer {}", id),
430            network: "sepolia".to_string(),
431            paused: false,
432            network_type: NetworkType::Evm,
433            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default()),
434            signer_id: "test-signer".to_string(),
435            address: "0x742d35Cc6634C0532925a3b8D8C2e48a73F6ba2E".to_string(),
436            system_disabled: false,
437            ..Default::default()
438        }
439    }
440
441    fn create_test_solana_relayer_with_swap(
442        id: &str,
443        cron_schedule: Option<String>,
444    ) -> RelayerRepoModel {
445        RelayerRepoModel {
446            id: id.to_string(),
447            name: format!("Solana Relayer {}", id),
448            network: "mainnet-beta".to_string(),
449            paused: false,
450            network_type: NetworkType::Solana,
451            policies: RelayerNetworkPolicy::Solana(RelayerSolanaPolicy {
452                min_balance: Some(1000000000),
453                allowed_tokens: None,
454                allowed_programs: None,
455                max_signatures: None,
456                max_tx_data_size: None,
457                fee_payment_strategy: None,
458                fee_margin_percentage: None,
459                allowed_accounts: None,
460                disallowed_accounts: None,
461                max_allowed_fee_lamports: None,
462                swap_config: Some(RelayerSolanaSwapConfig {
463                    strategy: None,
464                    cron_schedule,
465                    min_balance_threshold: Some(5000000000),
466                    jupiter_swap_options: None,
467                }),
468            }),
469            signer_id: "test-signer".to_string(),
470            address: "5zWma6gn4QxRfC6xZk6KfpXWXXgV3Xt6VzPpXMKCMYW5".to_string(),
471            system_disabled: false,
472            ..Default::default()
473        }
474    }
475
476    #[test]
477    fn test_filter_relayers_for_swap_with_empty_list() {
478        let relayers = vec![];
479        let filtered = filter_relayers_for_swap(relayers);
480
481        assert_eq!(
482            filtered.len(),
483            0,
484            "Should return empty list when no relayers provided"
485        );
486    }
487
488    #[test]
489    fn test_filter_relayers_for_swap_filters_non_solana() {
490        let relayers = vec![
491            create_test_evm_relayer("evm-1"),
492            create_test_evm_relayer("evm-2"),
493        ];
494
495        let filtered = filter_relayers_for_swap(relayers);
496
497        assert_eq!(
498            filtered.len(),
499            0,
500            "Should filter out all non-Solana relayers"
501        );
502    }
503
504    #[test]
505    fn test_filter_relayers_for_swap_filters_no_cron_schedule() {
506        let relayers = vec![
507            create_test_solana_relayer_with_swap("solana-1", None),
508            create_test_solana_relayer_with_swap("solana-2", None),
509        ];
510
511        let filtered = filter_relayers_for_swap(relayers);
512
513        assert_eq!(
514            filtered.len(),
515            0,
516            "Should filter out Solana relayers without cron schedule"
517        );
518    }
519
520    #[test]
521    fn test_filter_relayers_for_swap_includes_valid_relayers() {
522        let relayers = vec![
523            create_test_solana_relayer_with_swap("solana-1", Some("0 0 * * * *".to_string())),
524            create_test_solana_relayer_with_swap("solana-2", Some("0 */2 * * * *".to_string())),
525        ];
526
527        let filtered = filter_relayers_for_swap(relayers);
528
529        assert_eq!(
530            filtered.len(),
531            2,
532            "Should include all Solana relayers with cron schedule"
533        );
534        assert_eq!(filtered[0].id, "solana-1");
535        assert_eq!(filtered[1].id, "solana-2");
536    }
537
538    #[test]
539    fn test_filter_relayers_for_swap_with_mixed_relayers() {
540        let relayers = vec![
541            create_test_evm_relayer("evm-1"),
542            create_test_solana_relayer_with_swap("solana-no-cron", None),
543            create_test_solana_relayer_with_swap(
544                "solana-with-cron-1",
545                Some("0 0 * * * *".to_string()),
546            ),
547            create_test_evm_relayer("evm-2"),
548            create_test_solana_relayer_with_swap(
549                "solana-with-cron-2",
550                Some("0 */3 * * * *".to_string()),
551            ),
552        ];
553
554        let filtered = filter_relayers_for_swap(relayers);
555
556        assert_eq!(
557            filtered.len(),
558            2,
559            "Should only include Solana relayers with cron schedule"
560        );
561
562        // Verify the correct relayers were included
563        let ids: Vec<&str> = filtered.iter().map(|r| r.id.as_str()).collect();
564        assert!(
565            ids.contains(&"solana-with-cron-1"),
566            "Should include solana-with-cron-1"
567        );
568        assert!(
569            ids.contains(&"solana-with-cron-2"),
570            "Should include solana-with-cron-2"
571        );
572        assert!(!ids.contains(&"evm-1"), "Should not include EVM relayers");
573        assert!(
574            !ids.contains(&"solana-no-cron"),
575            "Should not include Solana without cron"
576        );
577    }
578
579    #[test]
580    fn test_filter_relayers_for_swap_preserves_relayer_data() {
581        let cron = "0 1 * * * *".to_string();
582        let relayers = vec![create_test_solana_relayer_with_swap(
583            "test-relayer",
584            Some(cron.clone()),
585        )];
586
587        let filtered = filter_relayers_for_swap(relayers);
588
589        assert_eq!(filtered.len(), 1);
590
591        let relayer = &filtered[0];
592        assert_eq!(relayer.id, "test-relayer");
593        assert_eq!(relayer.name, "Solana Relayer test-relayer");
594        assert_eq!(relayer.network_type, NetworkType::Solana);
595
596        // Verify swap config is preserved
597        let policy = relayer.policies.get_solana_policy();
598        let swap_config = policy.get_swap_config().expect("Should have swap config");
599        assert_eq!(swap_config.cron_schedule.as_ref(), Some(&cron));
600    }
601
602    // ===== Worker Configuration Tests =====
603
604    #[test]
605    fn test_get_worker_configs_returns_all_workers() {
606        let configs = get_worker_configs();
607
608        assert_eq!(configs.len(), 7, "Should have 7 standard workers");
609    }
610
611    #[test]
612    fn test_worker_configs_have_correct_names() {
613        let configs = get_worker_configs();
614        let names: Vec<&str> = configs.iter().map(|c| c.name).collect();
615
616        // Verify all expected workers are present
617        assert!(
618            names.contains(&TRANSACTION_REQUEST),
619            "Should include transaction_request"
620        );
621        assert!(
622            names.contains(&TRANSACTION_SENDER),
623            "Should include transaction_sender"
624        );
625        assert!(
626            names.contains(&TRANSACTION_STATUS_CHECKER),
627            "Should include transaction_status_checker"
628        );
629        assert!(
630            names.contains(&NOTIFICATION_SENDER),
631            "Should include notification_sender"
632        );
633        assert!(
634            names.contains(&SOLANA_TOKEN_SWAP_REQUEST),
635            "Should include solana_token_swap_request"
636        );
637        assert!(
638            names.contains(&TRANSACTION_CLEANUP),
639            "Should include transaction_cleanup"
640        );
641        assert!(
642            names.contains(&RELAYER_HEALTH_CHECK),
643            "Should include relayer_health_check"
644        );
645    }
646
647    #[test]
648    fn test_worker_configs_have_correct_concurrency() {
649        let configs = get_worker_configs();
650
651        for config in configs {
652            match config.name {
653                TRANSACTION_REQUEST => assert_eq!(
654                    config.concurrency, 10,
655                    "transaction_request should have concurrency 10 (I/O-bound)"
656                ),
657                TRANSACTION_SENDER => assert_eq!(
658                    config.concurrency, 10,
659                    "transaction_sender should have concurrency 10 (I/O-bound)"
660                ),
661                TRANSACTION_STATUS_CHECKER => assert_eq!(
662                    config.concurrency, 20,
663                    "transaction_status_checker should have concurrency 20 (read-only RPC)"
664                ),
665                NOTIFICATION_SENDER => assert_eq!(
666                    config.concurrency, 10,
667                    "notification_sender should have concurrency 10 (I/O-bound)"
668                ),
669                SOLANA_TOKEN_SWAP_REQUEST => assert_eq!(
670                    config.concurrency, 25,
671                    "solana_token_swap_request should have concurrency 25 (DEX operations)"
672                ),
673                TRANSACTION_CLEANUP => assert_eq!(
674                    config.concurrency, 1,
675                    "transaction_cleanup should have concurrency 1 (DB conflicts)"
676                ),
677                RELAYER_HEALTH_CHECK => assert_eq!(
678                    config.concurrency, 10,
679                    "relayer_health_check should have concurrency 10 (independent checks)"
680                ),
681                _ => panic!("Unexpected worker name: {}", config.name),
682            }
683        }
684    }
685
686    #[test]
687    fn test_worker_configs_have_default_rate_limits() {
688        let configs = get_worker_configs();
689
690        for config in configs {
691            assert_eq!(
692                config.rate_limit, DEFAULT_RATE_LIMIT,
693                "Worker {} should have default rate limit",
694                config.name
695            );
696            assert_eq!(
697                config.rate_limit_duration, DEFAULT_RATE_LIMIT_DURATION,
698                "Worker {} should have default rate limit duration",
699                config.name
700            );
701        }
702    }
703
704    #[test]
705    fn test_worker_config_constants_are_unique() {
706        let configs = get_worker_configs();
707        let names: Vec<&str> = configs.iter().map(|c| c.name).collect();
708        let mut unique_names = names.clone();
709        unique_names.sort();
710        unique_names.dedup();
711
712        assert_eq!(
713            names.len(),
714            unique_names.len(),
715            "All worker names should be unique"
716        );
717    }
718
719    #[test]
720    fn test_worker_constants_match_strings() {
721        // Verify constants are set correctly
722        assert_eq!(TRANSACTION_REQUEST, "transaction_request");
723        assert_eq!(TRANSACTION_SENDER, "transaction_sender");
724        assert_eq!(TRANSACTION_STATUS_CHECKER, "transaction_status_checker");
725        assert_eq!(NOTIFICATION_SENDER, "notification_sender");
726        assert_eq!(SOLANA_TOKEN_SWAP_REQUEST, "solana_token_swap_request");
727        assert_eq!(TRANSACTION_CLEANUP, "transaction_cleanup");
728        assert_eq!(RELAYER_HEALTH_CHECK, "relayer_health_check");
729    }
730
731    #[test]
732    fn test_default_configuration_constants() {
733        // Verify defaults are sensible for I/O-bound operations
734        assert_eq!(
735            DEFAULT_CONCURRENCY, 10,
736            "Default concurrency should be 10 for I/O-bound ops"
737        );
738        assert_eq!(
739            DEFAULT_RATE_LIMIT, 100,
740            "Default rate limit should be 100 for higher throughput"
741        );
742        assert_eq!(
743            DEFAULT_RATE_LIMIT_DURATION,
744            Duration::from_secs(1),
745            "Default rate limit duration should be 1 second"
746        );
747    }
748
749    #[test]
750    fn test_specialized_worker_concurrency() {
751        let configs = get_worker_configs();
752
753        // Very high concurrency for DEX swap operations
754        let swap_worker = configs
755            .iter()
756            .find(|c| c.name == SOLANA_TOKEN_SWAP_REQUEST)
757            .expect("Should have solana swap worker");
758        assert_eq!(
759            swap_worker.concurrency, 25,
760            "Solana swap should handle 25 concurrent requests (DEX operations)"
761        );
762
763        // Highest concurrency for read-only status checks
764        let status_worker = configs
765            .iter()
766            .find(|c| c.name == TRANSACTION_STATUS_CHECKER)
767            .expect("Should have status checker");
768        assert_eq!(
769            status_worker.concurrency, 20,
770            "Status checker should have high concurrency 20 (read-only RPC calls)"
771        );
772
773        // Low concurrency for cleanup (resource-intensive, avoid DB conflicts)
774        let cleanup_worker = configs
775            .iter()
776            .find(|c| c.name == TRANSACTION_CLEANUP)
777            .expect("Should have cleanup worker");
778        assert_eq!(
779            cleanup_worker.concurrency, 1,
780            "Cleanup should run single-threaded to avoid DB conflicts"
781        );
782
783        // High concurrency for independent health checks
784        let health_worker = configs
785            .iter()
786            .find(|c| c.name == RELAYER_HEALTH_CHECK)
787            .expect("Should have health check worker");
788        assert_eq!(
789            health_worker.concurrency, 10,
790            "Health checks should handle 10 concurrent relayers"
791        );
792    }
793
794    #[test]
795    fn test_cron_scheduled_workers() {
796        let configs = get_worker_configs();
797
798        // Find cleanup worker which should have a cron schedule
799        let cleanup_worker = configs
800            .iter()
801            .find(|c| c.name == TRANSACTION_CLEANUP)
802            .expect("Should have cleanup worker");
803
804        assert!(
805            cleanup_worker.cron_schedule.is_some(),
806            "Cleanup worker should have a cron schedule"
807        );
808
809        assert_eq!(
810            cleanup_worker.cron_schedule,
811            Some("0 */30 * * * *"),
812            "Cleanup should run every 30 minutes"
813        );
814
815        // Verify the cron schedule is valid
816        let schedule_str = cleanup_worker.cron_schedule.unwrap();
817        let schedule = apalis_cron::Schedule::from_str(schedule_str);
818        assert!(
819            schedule.is_ok(),
820            "Cron schedule should be valid: {}",
821            schedule_str
822        );
823    }
824
825    #[test]
826    fn test_non_cron_workers_have_no_schedule() {
827        let configs = get_worker_configs();
828
829        // Regular queue-based workers should not have cron schedules
830        let queue_workers = vec![
831            TRANSACTION_REQUEST,
832            TRANSACTION_SENDER,
833            TRANSACTION_STATUS_CHECKER,
834            NOTIFICATION_SENDER,
835            SOLANA_TOKEN_SWAP_REQUEST,
836            RELAYER_HEALTH_CHECK,
837        ];
838
839        for worker_name in queue_workers {
840            let worker = configs
841                .iter()
842                .find(|c| c.name == worker_name)
843                .unwrap_or_else(|| panic!("Should have worker: {}", worker_name));
844
845            assert!(
846                worker.cron_schedule.is_none(),
847                "Queue-based worker {} should not have cron schedule",
848                worker_name
849            );
850        }
851    }
852}