openzeppelin_relayer/jobs/
queue.rs

//! Queue management module for job processing.
//!
//! This module provides a Redis-backed queue implementation for handling different types of jobs:
//! - Transaction requests
//! - Transaction submissions
//! - Transaction status checks
//! - Notifications
//! - Solana swap requests
//! - Relayer health checks
10use std::{env, sync::Arc};
11
12use apalis_redis::{Config, ConnectionManager, RedisStorage};
13use color_eyre::{eyre, Result};
14use serde::{Deserialize, Serialize};
15use tokio::time::{timeout, Duration};
16use tracing::error;
17
18use crate::config::ServerConfig;
19
20use super::{
21    Job, NotificationSend, RelayerHealthCheck, SolanaTokenSwapRequest, TransactionRequest,
22    TransactionSend, TransactionStatusCheck,
23};
24
25#[derive(Clone, Debug)]
26pub struct Queue {
27    pub transaction_request_queue: RedisStorage<Job<TransactionRequest>>,
28    pub transaction_submission_queue: RedisStorage<Job<TransactionSend>>,
29    pub transaction_status_queue: RedisStorage<Job<TransactionStatusCheck>>,
30    pub notification_queue: RedisStorage<Job<NotificationSend>>,
31    pub solana_token_swap_request_queue: RedisStorage<Job<SolanaTokenSwapRequest>>,
32    pub relayer_health_check_queue: RedisStorage<Job<RelayerHealthCheck>>,
33}
34
35impl Queue {
36    async fn storage<T: Serialize + for<'de> Deserialize<'de>>(
37        namespace: &str,
38        shared: Arc<ConnectionManager>,
39    ) -> Result<RedisStorage<T>> {
40        let config = Config::default()
41            .set_namespace(namespace)
42            .set_enqueue_scheduled(Duration::from_secs(1)); // Sets the polling interval for scheduled jobs from default 30 seconds
43
44        Ok(RedisStorage::new_with_config((*shared).clone(), config))
45    }
46
47    pub async fn setup() -> Result<Self> {
48        let config = ServerConfig::from_env();
49        let redis_url = config.redis_url.clone();
50        let redis_connection_timeout_ms = config.redis_connection_timeout_ms;
51        let conn = match timeout(Duration::from_millis(redis_connection_timeout_ms), apalis_redis::connect(redis_url.clone())).await {
52            Ok(result) => result.map_err(|e| {
53                error!(redis_url = %redis_url, error = %e, "failed to connect to redis");
54                eyre::eyre!("Failed to connect to Redis. Please ensure Redis is running and accessible at {}. Error: {}", redis_url, e)
55            })?,
56            Err(_) => {
57                error!(redis_url = %redis_url, "timeout connecting to redis");
58                return Err(eyre::eyre!("Timed out after {} milliseconds while connecting to Redis at {}", redis_connection_timeout_ms, redis_url));
59            }
60        };
61
62        let shared = Arc::new(conn);
63        // use REDIS_KEY_PREFIX only if set, otherwise do not use it
64        let redis_key_prefix = env::var("REDIS_KEY_PREFIX")
65            .ok()
66            .filter(|v| !v.is_empty())
67            .map(|value| format!("{value}:queue:"))
68            .unwrap_or_default();
69        Ok(Self {
70            transaction_request_queue: Self::storage(
71                &format!("{}transaction_request_queue", redis_key_prefix),
72                shared.clone(),
73            )
74            .await?,
75            transaction_submission_queue: Self::storage(
76                &format!("{}transaction_submission_queue", redis_key_prefix),
77                shared.clone(),
78            )
79            .await?,
80            transaction_status_queue: Self::storage(
81                &format!("{}transaction_status_queue", redis_key_prefix),
82                shared.clone(),
83            )
84            .await?,
85            notification_queue: Self::storage(
86                &format!("{}notification_queue", redis_key_prefix),
87                shared.clone(),
88            )
89            .await?,
90            solana_token_swap_request_queue: Self::storage(
91                &format!("{}solana_token_swap_request_queue", redis_key_prefix),
92                shared.clone(),
93            )
94            .await?,
95            relayer_health_check_queue: Self::storage(
96                &format!("{}relayer_health_check_queue", redis_key_prefix),
97                shared.clone(),
98            )
99            .await?,
100        })
101    }
102}
103
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that a namespace set on the apalis Redis `Config` is read back unchanged.
    /// No Redis connection is opened.
    #[tokio::test]
    async fn test_queue_storage_configuration() {
        let expected_namespace = "test_namespace";
        let storage_config = Config::default().set_namespace(expected_namespace);

        assert_eq!(storage_config.get_namespace(), expected_namespace);
    }

    /// Stand-in for `Queue` that records only the namespace string of each queue,
    /// so the names can be checked without a Redis connection.
    #[derive(Clone, Debug)]
    struct MockQueue {
        pub namespace_transaction_request: String,
        pub namespace_transaction_submission: String,
        pub namespace_transaction_status: String,
        pub namespace_notification: String,
        pub namespace_solana_token_swap_request_queue: String,
        pub namespace_relayer_health_check_queue: String,
    }

    impl MockQueue {
        /// Builds the mock with the un-prefixed namespace of every queue.
        fn new() -> Self {
            Self {
                namespace_transaction_request: String::from("transaction_request_queue"),
                namespace_transaction_submission: String::from("transaction_submission_queue"),
                namespace_transaction_status: String::from("transaction_status_queue"),
                namespace_notification: String::from("notification_queue"),
                namespace_solana_token_swap_request_queue: String::from(
                    "solana_token_swap_request_queue",
                ),
                namespace_relayer_health_check_queue: String::from("relayer_health_check_queue"),
            }
        }
    }

    /// Verifies every namespace stored in `MockQueue` against its expected literal.
    #[test]
    fn test_queue_namespaces() {
        let mock_queue = MockQueue::new();

        // (actual, expected) pairs, one per queue.
        let expectations = [
            (
                mock_queue.namespace_transaction_request.as_str(),
                "transaction_request_queue",
            ),
            (
                mock_queue.namespace_transaction_submission.as_str(),
                "transaction_submission_queue",
            ),
            (
                mock_queue.namespace_transaction_status.as_str(),
                "transaction_status_queue",
            ),
            (
                mock_queue.namespace_notification.as_str(),
                "notification_queue",
            ),
            (
                mock_queue.namespace_solana_token_swap_request_queue.as_str(),
                "solana_token_swap_request_queue",
            ),
            (
                mock_queue.namespace_relayer_health_check_queue.as_str(),
                "relayer_health_check_queue",
            ),
        ];

        for (actual, expected) in expectations {
            assert_eq!(actual, expected);
        }
    }
}