openzeppelin_relayer/jobs/
queue.rs1use std::{env, sync::Arc};
11
12use apalis_redis::{Config, ConnectionManager, RedisStorage};
13use color_eyre::{eyre, Result};
14use serde::{Deserialize, Serialize};
15use tokio::time::{timeout, Duration};
16use tracing::error;
17
18use crate::config::ServerConfig;
19
20use super::{
21 Job, NotificationSend, RelayerHealthCheck, SolanaTokenSwapRequest, TransactionRequest,
22 TransactionSend, TransactionStatusCheck,
23};
24
25#[derive(Clone, Debug)]
26pub struct Queue {
27 pub transaction_request_queue: RedisStorage<Job<TransactionRequest>>,
28 pub transaction_submission_queue: RedisStorage<Job<TransactionSend>>,
29 pub transaction_status_queue: RedisStorage<Job<TransactionStatusCheck>>,
30 pub notification_queue: RedisStorage<Job<NotificationSend>>,
31 pub solana_token_swap_request_queue: RedisStorage<Job<SolanaTokenSwapRequest>>,
32 pub relayer_health_check_queue: RedisStorage<Job<RelayerHealthCheck>>,
33}
34
35impl Queue {
36 async fn storage<T: Serialize + for<'de> Deserialize<'de>>(
37 namespace: &str,
38 shared: Arc<ConnectionManager>,
39 ) -> Result<RedisStorage<T>> {
40 let config = Config::default()
41 .set_namespace(namespace)
42 .set_enqueue_scheduled(Duration::from_secs(1)); Ok(RedisStorage::new_with_config((*shared).clone(), config))
45 }
46
47 pub async fn setup() -> Result<Self> {
48 let config = ServerConfig::from_env();
49 let redis_url = config.redis_url.clone();
50 let redis_connection_timeout_ms = config.redis_connection_timeout_ms;
51 let conn = match timeout(Duration::from_millis(redis_connection_timeout_ms), apalis_redis::connect(redis_url.clone())).await {
52 Ok(result) => result.map_err(|e| {
53 error!(redis_url = %redis_url, error = %e, "failed to connect to redis");
54 eyre::eyre!("Failed to connect to Redis. Please ensure Redis is running and accessible at {}. Error: {}", redis_url, e)
55 })?,
56 Err(_) => {
57 error!(redis_url = %redis_url, "timeout connecting to redis");
58 return Err(eyre::eyre!("Timed out after {} milliseconds while connecting to Redis at {}", redis_connection_timeout_ms, redis_url));
59 }
60 };
61
62 let shared = Arc::new(conn);
63 let redis_key_prefix = env::var("REDIS_KEY_PREFIX")
65 .ok()
66 .filter(|v| !v.is_empty())
67 .map(|value| format!("{value}:queue:"))
68 .unwrap_or_default();
69 Ok(Self {
70 transaction_request_queue: Self::storage(
71 &format!("{}transaction_request_queue", redis_key_prefix),
72 shared.clone(),
73 )
74 .await?,
75 transaction_submission_queue: Self::storage(
76 &format!("{}transaction_submission_queue", redis_key_prefix),
77 shared.clone(),
78 )
79 .await?,
80 transaction_status_queue: Self::storage(
81 &format!("{}transaction_status_queue", redis_key_prefix),
82 shared.clone(),
83 )
84 .await?,
85 notification_queue: Self::storage(
86 &format!("{}notification_queue", redis_key_prefix),
87 shared.clone(),
88 )
89 .await?,
90 solana_token_swap_request_queue: Self::storage(
91 &format!("{}solana_token_swap_request_queue", redis_key_prefix),
92 shared.clone(),
93 )
94 .await?,
95 relayer_health_check_queue: Self::storage(
96 &format!("{}relayer_health_check_queue", redis_key_prefix),
97 shared.clone(),
98 )
99 .await?,
100 })
101 }
102}
103
104#[cfg(test)]
105mod tests {
106 use super::*;
107
108 #[tokio::test]
109 async fn test_queue_storage_configuration() {
110 let namespace = "test_namespace";
112 let config = Config::default().set_namespace(namespace);
113
114 assert_eq!(config.get_namespace(), namespace);
115 }
116
117 #[derive(Clone, Debug)]
119 struct MockQueue {
120 pub namespace_transaction_request: String,
121 pub namespace_transaction_submission: String,
122 pub namespace_transaction_status: String,
123 pub namespace_notification: String,
124 pub namespace_solana_token_swap_request_queue: String,
125 pub namespace_relayer_health_check_queue: String,
126 }
127
128 impl MockQueue {
129 fn new() -> Self {
130 Self {
131 namespace_transaction_request: "transaction_request_queue".to_string(),
132 namespace_transaction_submission: "transaction_submission_queue".to_string(),
133 namespace_transaction_status: "transaction_status_queue".to_string(),
134 namespace_notification: "notification_queue".to_string(),
135 namespace_solana_token_swap_request_queue: "solana_token_swap_request_queue"
136 .to_string(),
137 namespace_relayer_health_check_queue: "relayer_health_check_queue".to_string(),
138 }
139 }
140 }
141
142 #[test]
143 fn test_queue_namespaces() {
144 let mock_queue = MockQueue::new();
145
146 assert_eq!(
147 mock_queue.namespace_transaction_request,
148 "transaction_request_queue"
149 );
150 assert_eq!(
151 mock_queue.namespace_transaction_submission,
152 "transaction_submission_queue"
153 );
154 assert_eq!(
155 mock_queue.namespace_transaction_status,
156 "transaction_status_queue"
157 );
158 assert_eq!(mock_queue.namespace_notification, "notification_queue");
159 assert_eq!(
160 mock_queue.namespace_solana_token_swap_request_queue,
161 "solana_token_swap_request_queue"
162 );
163 assert_eq!(
164 mock_queue.namespace_relayer_health_check_queue,
165 "relayer_health_check_queue"
166 );
167 }
168}