openzeppelin_relayer/jobs/handlers/
transaction_submission_handler.rs1use actix_web::web::ThinData;
9use apalis::prelude::{Attempt, Data, *};
10use eyre::Result;
11use tracing::{debug, info, instrument};
12
13use crate::{
14 constants::WORKER_DEFAULT_MAXIMUM_RETRIES,
15 domain::{get_relayer_transaction, get_transaction_by_id, Transaction},
16 jobs::{handle_result, Job, TransactionCommand, TransactionSend},
17 models::DefaultAppState,
18 observability::request_id::set_request_id,
19};
20
21#[instrument(
22 level = "info",
23 skip(job, state),
24 fields(
25 request_id = ?job.request_id,
26 job_id = %job.message_id,
27 job_type = %job.job_type.to_string(),
28 attempt = %attempt.current(),
29 tx_id = %job.data.transaction_id,
30 relayer_id = %job.data.relayer_id,
31 command = ?job.data.command,
32 ),
33 err
34)]
35pub async fn transaction_submission_handler(
36 job: Job<TransactionSend>,
37 state: Data<ThinData<DefaultAppState>>,
38 attempt: Attempt,
39) -> Result<(), Error> {
40 if let Some(request_id) = job.request_id.clone() {
41 set_request_id(request_id);
42 }
43
44 debug!("handling transaction submission");
45
46 let result = handle_request(job.data, state).await;
47
48 handle_result(
49 result,
50 attempt,
51 "Transaction Sender",
52 WORKER_DEFAULT_MAXIMUM_RETRIES,
53 )
54}
55
56async fn handle_request(
57 status_request: TransactionSend,
58 state: Data<ThinData<DefaultAppState>>,
59) -> Result<()> {
60 let relayer_transaction =
61 get_relayer_transaction(status_request.relayer_id.clone(), &state).await?;
62
63 let transaction = get_transaction_by_id(status_request.transaction_id, &state).await?;
64
65 match status_request.command {
66 TransactionCommand::Submit => {
67 relayer_transaction.submit_transaction(transaction).await?;
68 }
69 TransactionCommand::Cancel { reason } => {
70 info!(
71 reason = %reason,
72 "cancelling transaction"
73 );
74 relayer_transaction.submit_transaction(transaction).await?;
75 }
76 TransactionCommand::Resubmit => {
77 debug!("resubmitting transaction with updated parameters");
78 relayer_transaction
79 .resubmit_transaction(transaction)
80 .await?;
81 }
82 TransactionCommand::Resend => {
83 debug!("resending transaction");
84 relayer_transaction.submit_transaction(transaction).await?;
85 }
86 };
87
88 debug!("transaction handled successfully");
89
90 Ok(())
91}
92
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Jobs built from the `submit` and `cancel` constructors carry the
    /// expected command and ids, and a plain submit job has no metadata.
    #[tokio::test]
    async fn test_submission_handler_job_validation() {
        // Submit job.
        let submit_job = TransactionSend::submit("tx123", "relayer-1");
        let job = Job::new(crate::jobs::JobType::TransactionSend, submit_job);

        assert!(
            matches!(job.data.command, TransactionCommand::Submit),
            "Expected Submit command"
        );
        assert_eq!(job.data.transaction_id, "tx123");
        assert_eq!(job.data.relayer_id, "relayer-1");
        assert!(job.data.metadata.is_none());

        // Cancel job: the reason must be carried inside the command.
        let cancel_job = TransactionSend::cancel("tx123", "relayer-1", "user requested");
        let job = Job::new(crate::jobs::JobType::TransactionSend, cancel_job);

        match job.data.command {
            TransactionCommand::Cancel { reason } => {
                assert_eq!(reason, "user requested");
            }
            _ => panic!("Expected Cancel command"),
        }
    }

    /// Metadata passed to `with_metadata` is stored on the job and readable
    /// by key.
    #[tokio::test]
    async fn test_submission_job_with_metadata() {
        let mut metadata = HashMap::new();
        metadata.insert("gas_price".to_string(), "20000000000".to_string());

        // The map is not needed afterwards, so it is moved rather than cloned.
        let submit_job = TransactionSend::submit("tx123", "relayer-1").with_metadata(metadata);

        let job_metadata = submit_job
            .metadata
            .expect("with_metadata should attach the metadata map");
        assert_eq!(
            job_metadata
                .get("gas_price")
                .expect("gas_price entry should be present"),
            "20000000000"
        );
    }
}