openzeppelin_relayer/jobs/
job_producer.rs

//! Job producer module for enqueueing jobs to Redis queues.
//!
//! Provides functionality for producing various types of jobs:
//! - Transaction processing jobs
//! - Transaction submission jobs
//! - Status monitoring jobs
//! - Notification jobs
//! - Solana token swap request jobs
//! - Relayer health check jobs
8
9use crate::{
10    jobs::{
11        Job, NotificationSend, Queue, RelayerHealthCheck, TransactionRequest, TransactionSend,
12        TransactionStatusCheck,
13    },
14    models::RelayerError,
15    observability::request_id::get_request_id,
16};
17use apalis::prelude::Storage;
18use apalis_redis::RedisError;
19use async_trait::async_trait;
20use serde::Serialize;
21use thiserror::Error;
22use tokio::sync::Mutex;
23use tracing::{error, info};
24
25use super::{JobType, SolanaTokenSwapRequest};
26
27#[cfg(test)]
28use mockall::automock;
29
30#[derive(Debug, Error, Serialize)]
31pub enum JobProducerError {
32    #[error("Queue error: {0}")]
33    QueueError(String),
34}
35
36impl From<RedisError> for JobProducerError {
37    fn from(_: RedisError) -> Self {
38        JobProducerError::QueueError("Queue error".to_string())
39    }
40}
41
42impl From<JobProducerError> for RelayerError {
43    fn from(_: JobProducerError) -> Self {
44        RelayerError::QueueError("Queue error".to_string())
45    }
46}
47
48#[derive(Debug)]
49pub struct JobProducer {
50    queue: Mutex<Queue>,
51}
52
53impl Clone for JobProducer {
54    fn clone(&self) -> Self {
55        // We can't clone the Mutex directly, but we can create a new one with a cloned Queue
56        // This requires getting the lock first
57        let queue = self
58            .queue
59            .try_lock()
60            .expect("Failed to lock queue for cloning")
61            .clone();
62
63        Self {
64            queue: Mutex::new(queue),
65        }
66    }
67}
68
69#[async_trait]
70#[cfg_attr(test, automock)]
71pub trait JobProducerTrait: Send + Sync {
72    async fn produce_transaction_request_job(
73        &self,
74        transaction_process_job: TransactionRequest,
75        scheduled_on: Option<i64>,
76    ) -> Result<(), JobProducerError>;
77
78    async fn produce_submit_transaction_job(
79        &self,
80        transaction_submit_job: TransactionSend,
81        scheduled_on: Option<i64>,
82    ) -> Result<(), JobProducerError>;
83
84    async fn produce_check_transaction_status_job(
85        &self,
86        transaction_status_check_job: TransactionStatusCheck,
87        scheduled_on: Option<i64>,
88    ) -> Result<(), JobProducerError>;
89
90    async fn produce_send_notification_job(
91        &self,
92        notification_send_job: NotificationSend,
93        scheduled_on: Option<i64>,
94    ) -> Result<(), JobProducerError>;
95
96    async fn produce_solana_token_swap_request_job(
97        &self,
98        solana_swap_request_job: SolanaTokenSwapRequest,
99        scheduled_on: Option<i64>,
100    ) -> Result<(), JobProducerError>;
101
102    async fn produce_relayer_health_check_job(
103        &self,
104        relayer_health_check_job: RelayerHealthCheck,
105        scheduled_on: Option<i64>,
106    ) -> Result<(), JobProducerError>;
107
108    async fn get_queue(&self) -> Result<Queue, JobProducerError>;
109}
110
111impl JobProducer {
112    pub fn new(queue: Queue) -> Self {
113        Self {
114            queue: Mutex::new(queue.clone()),
115        }
116    }
117
118    pub async fn get_queue(&self) -> Result<Queue, JobProducerError> {
119        let queue = self.queue.lock().await;
120
121        Ok(queue.clone())
122    }
123}
124
125#[async_trait]
126impl JobProducerTrait for JobProducer {
127    async fn get_queue(&self) -> Result<Queue, JobProducerError> {
128        let queue = self.queue.lock().await;
129
130        Ok(queue.clone())
131    }
132
133    async fn produce_transaction_request_job(
134        &self,
135        transaction_process_job: TransactionRequest,
136        scheduled_on: Option<i64>,
137    ) -> Result<(), JobProducerError> {
138        info!(
139            "Producing transaction request job: {:?}",
140            transaction_process_job
141        );
142        let mut queue = self.queue.lock().await;
143        let job = Job::new(JobType::TransactionRequest, transaction_process_job)
144            .with_request_id(get_request_id());
145
146        match scheduled_on {
147            Some(scheduled_on) => {
148                queue
149                    .transaction_request_queue
150                    .schedule(job, scheduled_on)
151                    .await?;
152            }
153            None => {
154                queue.transaction_request_queue.push(job).await?;
155            }
156        }
157        info!("Transaction job produced successfully");
158
159        Ok(())
160    }
161
162    async fn produce_submit_transaction_job(
163        &self,
164        transaction_submit_job: TransactionSend,
165        scheduled_on: Option<i64>,
166    ) -> Result<(), JobProducerError> {
167        let mut queue = self.queue.lock().await;
168        let job = Job::new(JobType::TransactionSend, transaction_submit_job)
169            .with_request_id(get_request_id());
170
171        match scheduled_on {
172            Some(on) => {
173                queue.transaction_submission_queue.schedule(job, on).await?;
174            }
175            None => {
176                queue.transaction_submission_queue.push(job).await?;
177            }
178        }
179        info!("Transaction Submit job produced successfully");
180
181        Ok(())
182    }
183
184    async fn produce_check_transaction_status_job(
185        &self,
186        transaction_status_check_job: TransactionStatusCheck,
187        scheduled_on: Option<i64>,
188    ) -> Result<(), JobProducerError> {
189        let mut queue = self.queue.lock().await;
190        let job = Job::new(
191            JobType::TransactionStatusCheck,
192            transaction_status_check_job,
193        )
194        .with_request_id(get_request_id());
195        match scheduled_on {
196            Some(on) => {
197                queue.transaction_status_queue.schedule(job, on).await?;
198            }
199            None => {
200                queue.transaction_status_queue.push(job).await?;
201            }
202        }
203        info!("Transaction Status Check job produced successfully");
204        Ok(())
205    }
206
207    async fn produce_send_notification_job(
208        &self,
209        notification_send_job: NotificationSend,
210        scheduled_on: Option<i64>,
211    ) -> Result<(), JobProducerError> {
212        let mut queue = self.queue.lock().await;
213        let job = Job::new(JobType::NotificationSend, notification_send_job)
214            .with_request_id(get_request_id());
215
216        match scheduled_on {
217            Some(on) => {
218                queue.notification_queue.schedule(job, on).await?;
219            }
220            None => {
221                queue.notification_queue.push(job).await?;
222            }
223        }
224
225        info!("Notification Send job produced successfully");
226        Ok(())
227    }
228
229    async fn produce_solana_token_swap_request_job(
230        &self,
231        solana_swap_request_job: SolanaTokenSwapRequest,
232        scheduled_on: Option<i64>,
233    ) -> Result<(), JobProducerError> {
234        let mut queue = self.queue.lock().await;
235        let job = Job::new(JobType::SolanaTokenSwapRequest, solana_swap_request_job)
236            .with_request_id(get_request_id());
237
238        match scheduled_on {
239            Some(on) => {
240                queue
241                    .solana_token_swap_request_queue
242                    .schedule(job, on)
243                    .await?;
244            }
245            None => {
246                queue.solana_token_swap_request_queue.push(job).await?;
247            }
248        }
249
250        info!("Solana token swap job produced successfully");
251        Ok(())
252    }
253
254    async fn produce_relayer_health_check_job(
255        &self,
256        relayer_health_check_job: RelayerHealthCheck,
257        scheduled_on: Option<i64>,
258    ) -> Result<(), JobProducerError> {
259        let job = Job::new(
260            JobType::RelayerHealthCheck,
261            relayer_health_check_job.clone(),
262        )
263        .with_request_id(get_request_id());
264
265        let mut queue = self.queue.lock().await;
266
267        match scheduled_on {
268            Some(scheduled_on) => {
269                queue
270                    .relayer_health_check_queue
271                    .schedule(job, scheduled_on)
272                    .await?;
273            }
274            None => {
275                queue.relayer_health_check_queue.push(job).await?;
276            }
277        }
278
279        Ok(())
280    }
281}
282
283#[cfg(test)]
284mod tests {
285    use super::*;
286    use crate::models::{
287        EvmTransactionResponse, TransactionResponse, TransactionStatus, WebhookNotification,
288        WebhookPayload, U256,
289    };
290    use crate::utils::calculate_scheduled_timestamp;
291
292    #[derive(Clone, Debug)]
293    // Define a simplified queue for testing without using complex mocks
294    struct TestRedisStorage<T> {
295        pub push_called: bool,
296        pub schedule_called: bool,
297        _phantom: std::marker::PhantomData<T>,
298    }
299
300    impl<T> TestRedisStorage<T> {
301        fn new() -> Self {
302            Self {
303                push_called: false,
304                schedule_called: false,
305                _phantom: std::marker::PhantomData,
306            }
307        }
308
309        async fn push(&mut self, _job: T) -> Result<(), JobProducerError> {
310            self.push_called = true;
311            Ok(())
312        }
313
314        async fn schedule(&mut self, _job: T, _timestamp: i64) -> Result<(), JobProducerError> {
315            self.schedule_called = true;
316            Ok(())
317        }
318    }
319
320    // A test version of the Queue
321    #[derive(Clone, Debug)]
322    struct TestQueue {
323        pub transaction_request_queue: TestRedisStorage<Job<TransactionRequest>>,
324        pub transaction_submission_queue: TestRedisStorage<Job<TransactionSend>>,
325        pub transaction_status_queue: TestRedisStorage<Job<TransactionStatusCheck>>,
326        pub notification_queue: TestRedisStorage<Job<NotificationSend>>,
327        pub solana_token_swap_request_queue: TestRedisStorage<Job<SolanaTokenSwapRequest>>,
328        pub relayer_health_check_queue: TestRedisStorage<Job<RelayerHealthCheck>>,
329    }
330
331    impl TestQueue {
332        fn new() -> Self {
333            Self {
334                transaction_request_queue: TestRedisStorage::new(),
335                transaction_submission_queue: TestRedisStorage::new(),
336                transaction_status_queue: TestRedisStorage::new(),
337                notification_queue: TestRedisStorage::new(),
338                solana_token_swap_request_queue: TestRedisStorage::new(),
339                relayer_health_check_queue: TestRedisStorage::new(),
340            }
341        }
342    }
343
344    // A test version of JobProducer
345    struct TestJobProducer {
346        queue: Mutex<TestQueue>,
347    }
348
349    impl Clone for TestJobProducer {
350        fn clone(&self) -> Self {
351            let queue = self
352                .queue
353                .try_lock()
354                .expect("Failed to lock queue for cloning")
355                .clone();
356            Self {
357                queue: Mutex::new(queue),
358            }
359        }
360    }
361
362    impl TestJobProducer {
363        fn new() -> Self {
364            Self {
365                queue: Mutex::new(TestQueue::new()),
366            }
367        }
368
369        async fn get_queue(&self) -> TestQueue {
370            self.queue.lock().await.clone()
371        }
372    }
373
374    #[async_trait]
375    impl JobProducerTrait for TestJobProducer {
376        async fn get_queue(&self) -> Result<Queue, JobProducerError> {
377            unimplemented!("get_queue not used in tests")
378        }
379
380        async fn produce_transaction_request_job(
381            &self,
382            transaction_process_job: TransactionRequest,
383            scheduled_on: Option<i64>,
384        ) -> Result<(), JobProducerError> {
385            let mut queue = self.queue.lock().await;
386            let job = Job::new(JobType::TransactionRequest, transaction_process_job);
387
388            match scheduled_on {
389                Some(scheduled_on) => {
390                    queue
391                        .transaction_request_queue
392                        .schedule(job, scheduled_on)
393                        .await?;
394                }
395                None => {
396                    queue.transaction_request_queue.push(job).await?;
397                }
398            }
399
400            Ok(())
401        }
402
403        async fn produce_submit_transaction_job(
404            &self,
405            transaction_submit_job: TransactionSend,
406            scheduled_on: Option<i64>,
407        ) -> Result<(), JobProducerError> {
408            let mut queue = self.queue.lock().await;
409            let job = Job::new(JobType::TransactionSend, transaction_submit_job);
410
411            match scheduled_on {
412                Some(on) => {
413                    queue.transaction_submission_queue.schedule(job, on).await?;
414                }
415                None => {
416                    queue.transaction_submission_queue.push(job).await?;
417                }
418            }
419
420            Ok(())
421        }
422
423        async fn produce_check_transaction_status_job(
424            &self,
425            transaction_status_check_job: TransactionStatusCheck,
426            scheduled_on: Option<i64>,
427        ) -> Result<(), JobProducerError> {
428            let mut queue = self.queue.lock().await;
429            let job = Job::new(
430                JobType::TransactionStatusCheck,
431                transaction_status_check_job,
432            );
433
434            match scheduled_on {
435                Some(on) => {
436                    queue.transaction_status_queue.schedule(job, on).await?;
437                }
438                None => {
439                    queue.transaction_status_queue.push(job).await?;
440                }
441            }
442
443            Ok(())
444        }
445
446        async fn produce_send_notification_job(
447            &self,
448            notification_send_job: NotificationSend,
449            scheduled_on: Option<i64>,
450        ) -> Result<(), JobProducerError> {
451            let mut queue = self.queue.lock().await;
452            let job = Job::new(JobType::NotificationSend, notification_send_job);
453
454            match scheduled_on {
455                Some(on) => {
456                    queue.notification_queue.schedule(job, on).await?;
457                }
458                None => {
459                    queue.notification_queue.push(job).await?;
460                }
461            }
462
463            Ok(())
464        }
465
466        async fn produce_solana_token_swap_request_job(
467            &self,
468            solana_token_swap_request_job: SolanaTokenSwapRequest,
469            scheduled_on: Option<i64>,
470        ) -> Result<(), JobProducerError> {
471            let mut queue = self.queue.lock().await;
472            let job = Job::new(
473                JobType::SolanaTokenSwapRequest,
474                solana_token_swap_request_job,
475            );
476
477            match scheduled_on {
478                Some(on) => {
479                    queue
480                        .solana_token_swap_request_queue
481                        .schedule(job, on)
482                        .await?;
483                }
484                None => {
485                    queue.solana_token_swap_request_queue.push(job).await?;
486                }
487            }
488
489            Ok(())
490        }
491
492        async fn produce_relayer_health_check_job(
493            &self,
494            relayer_health_check_job: RelayerHealthCheck,
495            scheduled_on: Option<i64>,
496        ) -> Result<(), JobProducerError> {
497            let mut queue = self.queue.lock().await;
498            let job = Job::new(JobType::RelayerHealthCheck, relayer_health_check_job);
499
500            match scheduled_on {
501                Some(scheduled_on) => {
502                    queue
503                        .relayer_health_check_queue
504                        .schedule(job, scheduled_on)
505                        .await?;
506                }
507                None => {
508                    queue.relayer_health_check_queue.push(job).await?;
509                }
510            }
511
512            Ok(())
513        }
514    }
515
516    #[tokio::test]
517    async fn test_job_producer_operations() {
518        let producer = TestJobProducer::new();
519
520        // Test transaction request job
521        let request = TransactionRequest::new("tx123", "relayer-1");
522        let result = producer
523            .produce_transaction_request_job(request, None)
524            .await;
525        assert!(result.is_ok());
526
527        let queue = producer.get_queue().await;
528        assert!(queue.transaction_request_queue.push_called);
529
530        // Test scheduled job
531        let producer = TestJobProducer::new();
532        let request = TransactionRequest::new("tx123", "relayer-1");
533        let scheduled_timestamp = calculate_scheduled_timestamp(10); // Schedule for 10 seconds from now
534        let result = producer
535            .produce_transaction_request_job(request, Some(scheduled_timestamp))
536            .await;
537        assert!(result.is_ok());
538
539        let queue = producer.get_queue().await;
540        assert!(queue.transaction_request_queue.schedule_called);
541    }
542
543    #[tokio::test]
544    async fn test_submit_transaction_job() {
545        let producer = TestJobProducer::new();
546
547        // Test submit transaction job
548        let submit_job = TransactionSend::submit("tx123", "relayer-1");
549        let result = producer
550            .produce_submit_transaction_job(submit_job, None)
551            .await;
552        assert!(result.is_ok());
553
554        let queue = producer.get_queue().await;
555        assert!(queue.transaction_submission_queue.push_called);
556    }
557
558    #[tokio::test]
559    async fn test_check_status_job() {
560        let producer = TestJobProducer::new();
561
562        // Test status check job
563        let status_job = TransactionStatusCheck::new("tx123", "relayer-1");
564        let result = producer
565            .produce_check_transaction_status_job(status_job, None)
566            .await;
567        assert!(result.is_ok());
568
569        let queue = producer.get_queue().await;
570        assert!(queue.transaction_status_queue.push_called);
571    }
572
573    #[tokio::test]
574    async fn test_notification_job() {
575        let producer = TestJobProducer::new();
576
577        // Create a simple notification for testing
578        let notification = WebhookNotification::new(
579            "test_event".to_string(),
580            WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
581                EvmTransactionResponse {
582                    id: "tx123".to_string(),
583                    hash: Some("0x123".to_string()),
584                    status: TransactionStatus::Confirmed,
585                    status_reason: None,
586                    created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
587                    sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
588                    confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
589                    gas_price: Some(1000000000),
590                    gas_limit: Some(21000),
591                    nonce: Some(1),
592                    value: U256::from(1000000000000000000_u64),
593                    from: "0xabc".to_string(),
594                    to: Some("0xdef".to_string()),
595                    relayer_id: "relayer-1".to_string(),
596                    data: None,
597                    max_fee_per_gas: None,
598                    max_priority_fee_per_gas: None,
599                    signature: None,
600                    speed: None,
601                },
602            ))),
603        );
604        let job = NotificationSend::new("notification-1".to_string(), notification);
605
606        let result = producer.produce_send_notification_job(job, None).await;
607        assert!(result.is_ok());
608
609        let queue = producer.get_queue().await;
610        assert!(queue.notification_queue.push_called);
611    }
612
613    #[tokio::test]
614    async fn test_relayer_health_check_job() {
615        let producer = TestJobProducer::new();
616
617        // Test immediate health check job
618        let health_check = RelayerHealthCheck::new("relayer-1".to_string());
619        let result = producer
620            .produce_relayer_health_check_job(health_check, None)
621            .await;
622        assert!(result.is_ok());
623
624        let queue = producer.get_queue().await;
625        assert!(queue.relayer_health_check_queue.push_called);
626
627        // Test scheduled health check job
628        let producer = TestJobProducer::new();
629        let health_check = RelayerHealthCheck::new("relayer-1".to_string());
630        let scheduled_timestamp = calculate_scheduled_timestamp(60);
631        let result = producer
632            .produce_relayer_health_check_job(health_check, Some(scheduled_timestamp))
633            .await;
634        assert!(result.is_ok());
635
636        let queue = producer.get_queue().await;
637        assert!(queue.relayer_health_check_queue.schedule_called);
638    }
639
640    #[test]
641    fn test_job_producer_error_conversion() {
642        // Test error conversion without using specific Redis error types
643        let job_error = JobProducerError::QueueError("Test error".to_string());
644        let relayer_error: RelayerError = job_error.into();
645
646        match relayer_error {
647            RelayerError::QueueError(msg) => {
648                assert_eq!(msg, "Queue error");
649            }
650            _ => panic!("Unexpected error type"),
651        }
652    }
653
654    #[tokio::test]
655    async fn test_get_queue() {
656        let producer = TestJobProducer::new();
657
658        // Get the queue
659        let queue = producer.get_queue().await;
660
661        // Verify the queue is valid and has the expected structure
662        assert!(!queue.transaction_request_queue.push_called);
663        assert!(!queue.transaction_request_queue.schedule_called);
664        assert!(!queue.transaction_submission_queue.push_called);
665        assert!(!queue.notification_queue.push_called);
666        assert!(!queue.solana_token_swap_request_queue.push_called);
667        assert!(!queue.relayer_health_check_queue.push_called);
668    }
669
670    #[tokio::test]
671    async fn test_produce_relayer_health_check_job_immediate() {
672        let producer = TestJobProducer::new();
673
674        // Test immediate health check job (no scheduling)
675        let health_check = RelayerHealthCheck::new("relayer-1".to_string());
676        let result = producer
677            .produce_relayer_health_check_job(health_check, None)
678            .await;
679
680        // Should succeed
681        assert!(result.is_ok());
682
683        // Verify the job was pushed (not scheduled)
684        let queue = producer.get_queue().await;
685        assert!(queue.relayer_health_check_queue.push_called);
686        assert!(!queue.relayer_health_check_queue.schedule_called);
687
688        // Other queues should not be affected
689        assert!(!queue.transaction_request_queue.push_called);
690        assert!(!queue.transaction_submission_queue.push_called);
691        assert!(!queue.transaction_status_queue.push_called);
692        assert!(!queue.notification_queue.push_called);
693        assert!(!queue.solana_token_swap_request_queue.push_called);
694    }
695
696    #[tokio::test]
697    async fn test_produce_relayer_health_check_job_scheduled() {
698        let producer = TestJobProducer::new();
699
700        // Test scheduled health check job
701        let health_check = RelayerHealthCheck::new("relayer-2".to_string());
702        let scheduled_timestamp = calculate_scheduled_timestamp(300); // 5 minutes from now
703        let result = producer
704            .produce_relayer_health_check_job(health_check, Some(scheduled_timestamp))
705            .await;
706
707        // Should succeed
708        assert!(result.is_ok());
709
710        // Verify the job was scheduled (not pushed)
711        let queue = producer.get_queue().await;
712        assert!(queue.relayer_health_check_queue.schedule_called);
713        assert!(!queue.relayer_health_check_queue.push_called);
714
715        // Other queues should not be affected
716        assert!(!queue.transaction_request_queue.push_called);
717        assert!(!queue.transaction_submission_queue.push_called);
718        assert!(!queue.transaction_status_queue.push_called);
719        assert!(!queue.notification_queue.push_called);
720        assert!(!queue.solana_token_swap_request_queue.push_called);
721    }
722
723    #[tokio::test]
724    async fn test_produce_relayer_health_check_job_multiple_relayers() {
725        let producer = TestJobProducer::new();
726
727        // Produce health check jobs for multiple relayers
728        let relayer_ids = vec!["relayer-1", "relayer-2", "relayer-3"];
729
730        for relayer_id in &relayer_ids {
731            let health_check = RelayerHealthCheck::new(relayer_id.to_string());
732            let result = producer
733                .produce_relayer_health_check_job(health_check, None)
734                .await;
735            assert!(result.is_ok());
736        }
737
738        // Verify jobs were produced
739        let queue = producer.get_queue().await;
740        assert!(queue.relayer_health_check_queue.push_called);
741    }
742}