1use crate::{
10 jobs::{
11 Job, NotificationSend, Queue, RelayerHealthCheck, TransactionRequest, TransactionSend,
12 TransactionStatusCheck,
13 },
14 models::RelayerError,
15 observability::request_id::get_request_id,
16};
17use apalis::prelude::Storage;
18use apalis_redis::RedisError;
19use async_trait::async_trait;
20use serde::Serialize;
21use thiserror::Error;
22use tokio::sync::Mutex;
23use tracing::{error, info};
24
25use super::{JobType, SolanaTokenSwapRequest};
26
27#[cfg(test)]
28use mockall::automock;
29
30#[derive(Debug, Error, Serialize)]
31pub enum JobProducerError {
32 #[error("Queue error: {0}")]
33 QueueError(String),
34}
35
36impl From<RedisError> for JobProducerError {
37 fn from(_: RedisError) -> Self {
38 JobProducerError::QueueError("Queue error".to_string())
39 }
40}
41
42impl From<JobProducerError> for RelayerError {
43 fn from(_: JobProducerError) -> Self {
44 RelayerError::QueueError("Queue error".to_string())
45 }
46}
47
48#[derive(Debug)]
49pub struct JobProducer {
50 queue: Mutex<Queue>,
51}
52
53impl Clone for JobProducer {
54 fn clone(&self) -> Self {
55 let queue = self
58 .queue
59 .try_lock()
60 .expect("Failed to lock queue for cloning")
61 .clone();
62
63 Self {
64 queue: Mutex::new(queue),
65 }
66 }
67}
68
69#[async_trait]
70#[cfg_attr(test, automock)]
71pub trait JobProducerTrait: Send + Sync {
72 async fn produce_transaction_request_job(
73 &self,
74 transaction_process_job: TransactionRequest,
75 scheduled_on: Option<i64>,
76 ) -> Result<(), JobProducerError>;
77
78 async fn produce_submit_transaction_job(
79 &self,
80 transaction_submit_job: TransactionSend,
81 scheduled_on: Option<i64>,
82 ) -> Result<(), JobProducerError>;
83
84 async fn produce_check_transaction_status_job(
85 &self,
86 transaction_status_check_job: TransactionStatusCheck,
87 scheduled_on: Option<i64>,
88 ) -> Result<(), JobProducerError>;
89
90 async fn produce_send_notification_job(
91 &self,
92 notification_send_job: NotificationSend,
93 scheduled_on: Option<i64>,
94 ) -> Result<(), JobProducerError>;
95
96 async fn produce_solana_token_swap_request_job(
97 &self,
98 solana_swap_request_job: SolanaTokenSwapRequest,
99 scheduled_on: Option<i64>,
100 ) -> Result<(), JobProducerError>;
101
102 async fn produce_relayer_health_check_job(
103 &self,
104 relayer_health_check_job: RelayerHealthCheck,
105 scheduled_on: Option<i64>,
106 ) -> Result<(), JobProducerError>;
107
108 async fn get_queue(&self) -> Result<Queue, JobProducerError>;
109}
110
111impl JobProducer {
112 pub fn new(queue: Queue) -> Self {
113 Self {
114 queue: Mutex::new(queue.clone()),
115 }
116 }
117
118 pub async fn get_queue(&self) -> Result<Queue, JobProducerError> {
119 let queue = self.queue.lock().await;
120
121 Ok(queue.clone())
122 }
123}
124
125#[async_trait]
126impl JobProducerTrait for JobProducer {
127 async fn get_queue(&self) -> Result<Queue, JobProducerError> {
128 let queue = self.queue.lock().await;
129
130 Ok(queue.clone())
131 }
132
133 async fn produce_transaction_request_job(
134 &self,
135 transaction_process_job: TransactionRequest,
136 scheduled_on: Option<i64>,
137 ) -> Result<(), JobProducerError> {
138 info!(
139 "Producing transaction request job: {:?}",
140 transaction_process_job
141 );
142 let mut queue = self.queue.lock().await;
143 let job = Job::new(JobType::TransactionRequest, transaction_process_job)
144 .with_request_id(get_request_id());
145
146 match scheduled_on {
147 Some(scheduled_on) => {
148 queue
149 .transaction_request_queue
150 .schedule(job, scheduled_on)
151 .await?;
152 }
153 None => {
154 queue.transaction_request_queue.push(job).await?;
155 }
156 }
157 info!("Transaction job produced successfully");
158
159 Ok(())
160 }
161
162 async fn produce_submit_transaction_job(
163 &self,
164 transaction_submit_job: TransactionSend,
165 scheduled_on: Option<i64>,
166 ) -> Result<(), JobProducerError> {
167 let mut queue = self.queue.lock().await;
168 let job = Job::new(JobType::TransactionSend, transaction_submit_job)
169 .with_request_id(get_request_id());
170
171 match scheduled_on {
172 Some(on) => {
173 queue.transaction_submission_queue.schedule(job, on).await?;
174 }
175 None => {
176 queue.transaction_submission_queue.push(job).await?;
177 }
178 }
179 info!("Transaction Submit job produced successfully");
180
181 Ok(())
182 }
183
184 async fn produce_check_transaction_status_job(
185 &self,
186 transaction_status_check_job: TransactionStatusCheck,
187 scheduled_on: Option<i64>,
188 ) -> Result<(), JobProducerError> {
189 let mut queue = self.queue.lock().await;
190 let job = Job::new(
191 JobType::TransactionStatusCheck,
192 transaction_status_check_job,
193 )
194 .with_request_id(get_request_id());
195 match scheduled_on {
196 Some(on) => {
197 queue.transaction_status_queue.schedule(job, on).await?;
198 }
199 None => {
200 queue.transaction_status_queue.push(job).await?;
201 }
202 }
203 info!("Transaction Status Check job produced successfully");
204 Ok(())
205 }
206
207 async fn produce_send_notification_job(
208 &self,
209 notification_send_job: NotificationSend,
210 scheduled_on: Option<i64>,
211 ) -> Result<(), JobProducerError> {
212 let mut queue = self.queue.lock().await;
213 let job = Job::new(JobType::NotificationSend, notification_send_job)
214 .with_request_id(get_request_id());
215
216 match scheduled_on {
217 Some(on) => {
218 queue.notification_queue.schedule(job, on).await?;
219 }
220 None => {
221 queue.notification_queue.push(job).await?;
222 }
223 }
224
225 info!("Notification Send job produced successfully");
226 Ok(())
227 }
228
229 async fn produce_solana_token_swap_request_job(
230 &self,
231 solana_swap_request_job: SolanaTokenSwapRequest,
232 scheduled_on: Option<i64>,
233 ) -> Result<(), JobProducerError> {
234 let mut queue = self.queue.lock().await;
235 let job = Job::new(JobType::SolanaTokenSwapRequest, solana_swap_request_job)
236 .with_request_id(get_request_id());
237
238 match scheduled_on {
239 Some(on) => {
240 queue
241 .solana_token_swap_request_queue
242 .schedule(job, on)
243 .await?;
244 }
245 None => {
246 queue.solana_token_swap_request_queue.push(job).await?;
247 }
248 }
249
250 info!("Solana token swap job produced successfully");
251 Ok(())
252 }
253
254 async fn produce_relayer_health_check_job(
255 &self,
256 relayer_health_check_job: RelayerHealthCheck,
257 scheduled_on: Option<i64>,
258 ) -> Result<(), JobProducerError> {
259 let job = Job::new(
260 JobType::RelayerHealthCheck,
261 relayer_health_check_job.clone(),
262 )
263 .with_request_id(get_request_id());
264
265 let mut queue = self.queue.lock().await;
266
267 match scheduled_on {
268 Some(scheduled_on) => {
269 queue
270 .relayer_health_check_queue
271 .schedule(job, scheduled_on)
272 .await?;
273 }
274 None => {
275 queue.relayer_health_check_queue.push(job).await?;
276 }
277 }
278
279 Ok(())
280 }
281}
282
283#[cfg(test)]
284mod tests {
285 use super::*;
286 use crate::models::{
287 EvmTransactionResponse, TransactionResponse, TransactionStatus, WebhookNotification,
288 WebhookPayload, U256,
289 };
290 use crate::utils::calculate_scheduled_timestamp;
291
292 #[derive(Clone, Debug)]
293 struct TestRedisStorage<T> {
295 pub push_called: bool,
296 pub schedule_called: bool,
297 _phantom: std::marker::PhantomData<T>,
298 }
299
300 impl<T> TestRedisStorage<T> {
301 fn new() -> Self {
302 Self {
303 push_called: false,
304 schedule_called: false,
305 _phantom: std::marker::PhantomData,
306 }
307 }
308
309 async fn push(&mut self, _job: T) -> Result<(), JobProducerError> {
310 self.push_called = true;
311 Ok(())
312 }
313
314 async fn schedule(&mut self, _job: T, _timestamp: i64) -> Result<(), JobProducerError> {
315 self.schedule_called = true;
316 Ok(())
317 }
318 }
319
320 #[derive(Clone, Debug)]
322 struct TestQueue {
323 pub transaction_request_queue: TestRedisStorage<Job<TransactionRequest>>,
324 pub transaction_submission_queue: TestRedisStorage<Job<TransactionSend>>,
325 pub transaction_status_queue: TestRedisStorage<Job<TransactionStatusCheck>>,
326 pub notification_queue: TestRedisStorage<Job<NotificationSend>>,
327 pub solana_token_swap_request_queue: TestRedisStorage<Job<SolanaTokenSwapRequest>>,
328 pub relayer_health_check_queue: TestRedisStorage<Job<RelayerHealthCheck>>,
329 }
330
331 impl TestQueue {
332 fn new() -> Self {
333 Self {
334 transaction_request_queue: TestRedisStorage::new(),
335 transaction_submission_queue: TestRedisStorage::new(),
336 transaction_status_queue: TestRedisStorage::new(),
337 notification_queue: TestRedisStorage::new(),
338 solana_token_swap_request_queue: TestRedisStorage::new(),
339 relayer_health_check_queue: TestRedisStorage::new(),
340 }
341 }
342 }
343
344 struct TestJobProducer {
346 queue: Mutex<TestQueue>,
347 }
348
349 impl Clone for TestJobProducer {
350 fn clone(&self) -> Self {
351 let queue = self
352 .queue
353 .try_lock()
354 .expect("Failed to lock queue for cloning")
355 .clone();
356 Self {
357 queue: Mutex::new(queue),
358 }
359 }
360 }
361
362 impl TestJobProducer {
363 fn new() -> Self {
364 Self {
365 queue: Mutex::new(TestQueue::new()),
366 }
367 }
368
369 async fn get_queue(&self) -> TestQueue {
370 self.queue.lock().await.clone()
371 }
372 }
373
374 #[async_trait]
375 impl JobProducerTrait for TestJobProducer {
376 async fn get_queue(&self) -> Result<Queue, JobProducerError> {
377 unimplemented!("get_queue not used in tests")
378 }
379
380 async fn produce_transaction_request_job(
381 &self,
382 transaction_process_job: TransactionRequest,
383 scheduled_on: Option<i64>,
384 ) -> Result<(), JobProducerError> {
385 let mut queue = self.queue.lock().await;
386 let job = Job::new(JobType::TransactionRequest, transaction_process_job);
387
388 match scheduled_on {
389 Some(scheduled_on) => {
390 queue
391 .transaction_request_queue
392 .schedule(job, scheduled_on)
393 .await?;
394 }
395 None => {
396 queue.transaction_request_queue.push(job).await?;
397 }
398 }
399
400 Ok(())
401 }
402
403 async fn produce_submit_transaction_job(
404 &self,
405 transaction_submit_job: TransactionSend,
406 scheduled_on: Option<i64>,
407 ) -> Result<(), JobProducerError> {
408 let mut queue = self.queue.lock().await;
409 let job = Job::new(JobType::TransactionSend, transaction_submit_job);
410
411 match scheduled_on {
412 Some(on) => {
413 queue.transaction_submission_queue.schedule(job, on).await?;
414 }
415 None => {
416 queue.transaction_submission_queue.push(job).await?;
417 }
418 }
419
420 Ok(())
421 }
422
423 async fn produce_check_transaction_status_job(
424 &self,
425 transaction_status_check_job: TransactionStatusCheck,
426 scheduled_on: Option<i64>,
427 ) -> Result<(), JobProducerError> {
428 let mut queue = self.queue.lock().await;
429 let job = Job::new(
430 JobType::TransactionStatusCheck,
431 transaction_status_check_job,
432 );
433
434 match scheduled_on {
435 Some(on) => {
436 queue.transaction_status_queue.schedule(job, on).await?;
437 }
438 None => {
439 queue.transaction_status_queue.push(job).await?;
440 }
441 }
442
443 Ok(())
444 }
445
446 async fn produce_send_notification_job(
447 &self,
448 notification_send_job: NotificationSend,
449 scheduled_on: Option<i64>,
450 ) -> Result<(), JobProducerError> {
451 let mut queue = self.queue.lock().await;
452 let job = Job::new(JobType::NotificationSend, notification_send_job);
453
454 match scheduled_on {
455 Some(on) => {
456 queue.notification_queue.schedule(job, on).await?;
457 }
458 None => {
459 queue.notification_queue.push(job).await?;
460 }
461 }
462
463 Ok(())
464 }
465
466 async fn produce_solana_token_swap_request_job(
467 &self,
468 solana_token_swap_request_job: SolanaTokenSwapRequest,
469 scheduled_on: Option<i64>,
470 ) -> Result<(), JobProducerError> {
471 let mut queue = self.queue.lock().await;
472 let job = Job::new(
473 JobType::SolanaTokenSwapRequest,
474 solana_token_swap_request_job,
475 );
476
477 match scheduled_on {
478 Some(on) => {
479 queue
480 .solana_token_swap_request_queue
481 .schedule(job, on)
482 .await?;
483 }
484 None => {
485 queue.solana_token_swap_request_queue.push(job).await?;
486 }
487 }
488
489 Ok(())
490 }
491
492 async fn produce_relayer_health_check_job(
493 &self,
494 relayer_health_check_job: RelayerHealthCheck,
495 scheduled_on: Option<i64>,
496 ) -> Result<(), JobProducerError> {
497 let mut queue = self.queue.lock().await;
498 let job = Job::new(JobType::RelayerHealthCheck, relayer_health_check_job);
499
500 match scheduled_on {
501 Some(scheduled_on) => {
502 queue
503 .relayer_health_check_queue
504 .schedule(job, scheduled_on)
505 .await?;
506 }
507 None => {
508 queue.relayer_health_check_queue.push(job).await?;
509 }
510 }
511
512 Ok(())
513 }
514 }
515
516 #[tokio::test]
517 async fn test_job_producer_operations() {
518 let producer = TestJobProducer::new();
519
520 let request = TransactionRequest::new("tx123", "relayer-1");
522 let result = producer
523 .produce_transaction_request_job(request, None)
524 .await;
525 assert!(result.is_ok());
526
527 let queue = producer.get_queue().await;
528 assert!(queue.transaction_request_queue.push_called);
529
530 let producer = TestJobProducer::new();
532 let request = TransactionRequest::new("tx123", "relayer-1");
533 let scheduled_timestamp = calculate_scheduled_timestamp(10); let result = producer
535 .produce_transaction_request_job(request, Some(scheduled_timestamp))
536 .await;
537 assert!(result.is_ok());
538
539 let queue = producer.get_queue().await;
540 assert!(queue.transaction_request_queue.schedule_called);
541 }
542
543 #[tokio::test]
544 async fn test_submit_transaction_job() {
545 let producer = TestJobProducer::new();
546
547 let submit_job = TransactionSend::submit("tx123", "relayer-1");
549 let result = producer
550 .produce_submit_transaction_job(submit_job, None)
551 .await;
552 assert!(result.is_ok());
553
554 let queue = producer.get_queue().await;
555 assert!(queue.transaction_submission_queue.push_called);
556 }
557
558 #[tokio::test]
559 async fn test_check_status_job() {
560 let producer = TestJobProducer::new();
561
562 let status_job = TransactionStatusCheck::new("tx123", "relayer-1");
564 let result = producer
565 .produce_check_transaction_status_job(status_job, None)
566 .await;
567 assert!(result.is_ok());
568
569 let queue = producer.get_queue().await;
570 assert!(queue.transaction_status_queue.push_called);
571 }
572
573 #[tokio::test]
574 async fn test_notification_job() {
575 let producer = TestJobProducer::new();
576
577 let notification = WebhookNotification::new(
579 "test_event".to_string(),
580 WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
581 EvmTransactionResponse {
582 id: "tx123".to_string(),
583 hash: Some("0x123".to_string()),
584 status: TransactionStatus::Confirmed,
585 status_reason: None,
586 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
587 sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
588 confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
589 gas_price: Some(1000000000),
590 gas_limit: Some(21000),
591 nonce: Some(1),
592 value: U256::from(1000000000000000000_u64),
593 from: "0xabc".to_string(),
594 to: Some("0xdef".to_string()),
595 relayer_id: "relayer-1".to_string(),
596 data: None,
597 max_fee_per_gas: None,
598 max_priority_fee_per_gas: None,
599 signature: None,
600 speed: None,
601 },
602 ))),
603 );
604 let job = NotificationSend::new("notification-1".to_string(), notification);
605
606 let result = producer.produce_send_notification_job(job, None).await;
607 assert!(result.is_ok());
608
609 let queue = producer.get_queue().await;
610 assert!(queue.notification_queue.push_called);
611 }
612
613 #[tokio::test]
614 async fn test_relayer_health_check_job() {
615 let producer = TestJobProducer::new();
616
617 let health_check = RelayerHealthCheck::new("relayer-1".to_string());
619 let result = producer
620 .produce_relayer_health_check_job(health_check, None)
621 .await;
622 assert!(result.is_ok());
623
624 let queue = producer.get_queue().await;
625 assert!(queue.relayer_health_check_queue.push_called);
626
627 let producer = TestJobProducer::new();
629 let health_check = RelayerHealthCheck::new("relayer-1".to_string());
630 let scheduled_timestamp = calculate_scheduled_timestamp(60);
631 let result = producer
632 .produce_relayer_health_check_job(health_check, Some(scheduled_timestamp))
633 .await;
634 assert!(result.is_ok());
635
636 let queue = producer.get_queue().await;
637 assert!(queue.relayer_health_check_queue.schedule_called);
638 }
639
640 #[test]
641 fn test_job_producer_error_conversion() {
642 let job_error = JobProducerError::QueueError("Test error".to_string());
644 let relayer_error: RelayerError = job_error.into();
645
646 match relayer_error {
647 RelayerError::QueueError(msg) => {
648 assert_eq!(msg, "Queue error");
649 }
650 _ => panic!("Unexpected error type"),
651 }
652 }
653
654 #[tokio::test]
655 async fn test_get_queue() {
656 let producer = TestJobProducer::new();
657
658 let queue = producer.get_queue().await;
660
661 assert!(!queue.transaction_request_queue.push_called);
663 assert!(!queue.transaction_request_queue.schedule_called);
664 assert!(!queue.transaction_submission_queue.push_called);
665 assert!(!queue.notification_queue.push_called);
666 assert!(!queue.solana_token_swap_request_queue.push_called);
667 assert!(!queue.relayer_health_check_queue.push_called);
668 }
669
670 #[tokio::test]
671 async fn test_produce_relayer_health_check_job_immediate() {
672 let producer = TestJobProducer::new();
673
674 let health_check = RelayerHealthCheck::new("relayer-1".to_string());
676 let result = producer
677 .produce_relayer_health_check_job(health_check, None)
678 .await;
679
680 assert!(result.is_ok());
682
683 let queue = producer.get_queue().await;
685 assert!(queue.relayer_health_check_queue.push_called);
686 assert!(!queue.relayer_health_check_queue.schedule_called);
687
688 assert!(!queue.transaction_request_queue.push_called);
690 assert!(!queue.transaction_submission_queue.push_called);
691 assert!(!queue.transaction_status_queue.push_called);
692 assert!(!queue.notification_queue.push_called);
693 assert!(!queue.solana_token_swap_request_queue.push_called);
694 }
695
696 #[tokio::test]
697 async fn test_produce_relayer_health_check_job_scheduled() {
698 let producer = TestJobProducer::new();
699
700 let health_check = RelayerHealthCheck::new("relayer-2".to_string());
702 let scheduled_timestamp = calculate_scheduled_timestamp(300); let result = producer
704 .produce_relayer_health_check_job(health_check, Some(scheduled_timestamp))
705 .await;
706
707 assert!(result.is_ok());
709
710 let queue = producer.get_queue().await;
712 assert!(queue.relayer_health_check_queue.schedule_called);
713 assert!(!queue.relayer_health_check_queue.push_called);
714
715 assert!(!queue.transaction_request_queue.push_called);
717 assert!(!queue.transaction_submission_queue.push_called);
718 assert!(!queue.transaction_status_queue.push_called);
719 assert!(!queue.notification_queue.push_called);
720 assert!(!queue.solana_token_swap_request_queue.push_called);
721 }
722
723 #[tokio::test]
724 async fn test_produce_relayer_health_check_job_multiple_relayers() {
725 let producer = TestJobProducer::new();
726
727 let relayer_ids = vec!["relayer-1", "relayer-2", "relayer-3"];
729
730 for relayer_id in &relayer_ids {
731 let health_check = RelayerHealthCheck::new(relayer_id.to_string());
732 let result = producer
733 .produce_relayer_health_check_job(health_check, None)
734 .await;
735 assert!(result.is_ok());
736 }
737
738 let queue = producer.get_queue().await;
740 assert!(queue.relayer_health_check_queue.push_called);
741 }
742}