openzeppelin_relayer/jobs/handlers/
transaction_request_handler.rs

1//! Transaction request handler for processing incoming transaction jobs.
2//!
3//! Handles the validation and preparation of transactions before they are
4//! submitted to the network
5use actix_web::web::ThinData;
6use apalis::prelude::{Attempt, Context, Data, TaskId, Worker, *};
7use apalis_redis::RedisContext;
8use eyre::Result;
9use tracing::{debug, instrument};
10
11use crate::{
12    constants::WORKER_DEFAULT_MAXIMUM_RETRIES,
13    domain::{get_relayer_transaction, get_transaction_by_id, Transaction},
14    jobs::{handle_result, Job, TransactionRequest},
15    models::DefaultAppState,
16    observability::request_id::set_request_id,
17};
18
19#[instrument(
20    level = "info",
21    skip(job, state, _worker, _ctx),
22    fields(
23        request_id = ?job.request_id,
24        job_id = %job.message_id,
25        job_type = %job.job_type.to_string(),
26        attempt = %attempt.current(),
27        tx_id = %job.data.transaction_id,
28        relayer_id = %job.data.relayer_id,
29        task_id = %task_id.to_string(),
30    ),
31    err
32)]
33pub async fn transaction_request_handler(
34    job: Job<TransactionRequest>,
35    state: Data<ThinData<DefaultAppState>>,
36    attempt: Attempt,
37    _worker: Worker<Context>,
38    task_id: TaskId,
39    _ctx: RedisContext,
40) -> Result<(), Error> {
41    if let Some(request_id) = job.request_id.clone() {
42        set_request_id(request_id);
43    }
44
45    debug!("handling transaction request");
46
47    let result = handle_request(job.data, state).await;
48
49    handle_result(
50        result,
51        attempt,
52        "Transaction Request",
53        WORKER_DEFAULT_MAXIMUM_RETRIES,
54    )
55}
56
57async fn handle_request(
58    request: TransactionRequest,
59    state: Data<ThinData<DefaultAppState>>,
60) -> Result<()> {
61    let relayer_transaction = get_relayer_transaction(request.relayer_id, &state).await?;
62
63    let transaction = get_transaction_by_id(request.transaction_id, &state).await?;
64
65    relayer_transaction.prepare_transaction(transaction).await?;
66
67    debug!("transaction request handled successfully");
68
69    Ok(())
70}
71
72#[cfg(test)]
73mod tests {
74    use super::*;
75    use apalis::prelude::Attempt;
76
77    #[tokio::test]
78    async fn test_handler_result_processing() {
79        // This test focuses only on the interaction with handle_result
80        // which we can test without mocking the entire state
81
82        // Create a minimal job
83        let request = TransactionRequest::new("tx123", "relayer-1");
84        let job = Job::new(crate::jobs::JobType::TransactionRequest, request);
85
86        // Create a test attempt
87        let attempt = Attempt::default();
88
89        // We cannot fully test the transaction_request_handler without extensive mocking
90        // of the domain layer, but we can verify our test setup is correct
91        assert_eq!(job.data.transaction_id, "tx123");
92        assert_eq!(job.data.relayer_id, "relayer-1");
93        assert_eq!(attempt.current(), 0);
94    }
95
96    // Note: Fully testing the functionality would require either:
97    // 1. Dependency injection for all external dependencies
98    // 2. Feature flags to enable mock implementations
99    // 3. Integration tests with a real or test database
100
101    // For now, these tests serve as placeholders to be expanded
102    // when the appropriate testing infrastructure is in place.
103}