openzeppelin_relayer/jobs/handlers/
transaction_status_handler.rs1use actix_web::web::ThinData;
8use apalis::prelude::{Attempt, Data, *};
9use eyre::Result;
10use tracing::{debug, instrument};
11
12use crate::{
13 constants::WORKER_DEFAULT_MAXIMUM_RETRIES,
14 domain::{get_relayer_transaction, get_transaction_by_id, Transaction},
15 jobs::{handle_result, Job, TransactionStatusCheck},
16 models::DefaultAppState,
17 observability::request_id::set_request_id,
18};
19
20#[instrument(
21 level = "info",
22 skip(job, state),
23 fields(
24 request_id = ?job.request_id,
25 job_id = %job.message_id,
26 job_type = %job.job_type.to_string(),
27 attempt = %attempt.current(),
28 tx_id = %job.data.transaction_id,
29 relayer_id = %job.data.relayer_id,
30 ),
31 err
32)]
33pub async fn transaction_status_handler(
34 job: Job<TransactionStatusCheck>,
35 state: Data<ThinData<DefaultAppState>>,
36 attempt: Attempt,
37) -> Result<(), Error> {
38 if let Some(request_id) = job.request_id.clone() {
39 set_request_id(request_id);
40 }
41
42 debug!("handling transaction status check");
43
44 let result = handle_request(job.data, state).await;
45
46 handle_result(
47 result,
48 attempt,
49 "Transaction Status",
50 WORKER_DEFAULT_MAXIMUM_RETRIES,
51 )
52}
53
54async fn handle_request(
55 status_request: TransactionStatusCheck,
56 state: Data<ThinData<DefaultAppState>>,
57) -> Result<()> {
58 let relayer_transaction =
59 get_relayer_transaction(status_request.relayer_id.clone(), &state).await?;
60
61 let transaction = get_transaction_by_id(status_request.transaction_id, &state).await?;
62
63 relayer_transaction
64 .handle_transaction_status(transaction)
65 .await?;
66
67 debug!("status check handled successfully");
68
69 Ok(())
70}
71
72#[cfg(test)]
73mod tests {
74 use super::*;
75 use apalis::prelude::Attempt;
76 use std::collections::HashMap;
77
78 #[tokio::test]
79 async fn test_status_check_job_validation() {
80 let check_job = TransactionStatusCheck::new("tx123", "relayer-1");
82 let job = Job::new(crate::jobs::JobType::TransactionStatusCheck, check_job);
83
84 assert_eq!(job.data.transaction_id, "tx123");
86 assert_eq!(job.data.relayer_id, "relayer-1");
87 assert!(job.data.metadata.is_none());
88 }
89
90 #[tokio::test]
91 async fn test_status_check_with_metadata() {
92 let mut metadata = HashMap::new();
94 metadata.insert("retry_count".to_string(), "2".to_string());
95 metadata.insert("last_status".to_string(), "pending".to_string());
96
97 let check_job =
98 TransactionStatusCheck::new("tx123", "relayer-1").with_metadata(metadata.clone());
99
100 assert!(check_job.metadata.is_some());
102 let job_metadata = check_job.metadata.unwrap();
103 assert_eq!(job_metadata.get("retry_count").unwrap(), "2");
104 assert_eq!(job_metadata.get("last_status").unwrap(), "pending");
105 }
106
107 #[tokio::test]
108 async fn test_status_handler_attempt_tracking() {
109 let first_attempt = Attempt::default();
111 assert_eq!(first_attempt.current(), 0);
112
113 let second_attempt = Attempt::default();
114 second_attempt.increment();
115 assert_eq!(second_attempt.current(), 1);
116
117 let final_attempt = Attempt::default();
118 for _ in 0..WORKER_DEFAULT_MAXIMUM_RETRIES {
119 final_attempt.increment();
120 }
121 assert_eq!(final_attempt.current(), WORKER_DEFAULT_MAXIMUM_RETRIES);
122 }
123}