openzeppelin_relayer/jobs/handlers/
solana_swap_request_handler.rs

//! Solana swap request handling worker implementation.
//!
//! This module implements the Solana token swap request handling worker that processes
//! token swap jobs, both those pulled from the queue and those triggered by a cron schedule.
5
6use actix_web::web::ThinData;
7use apalis::prelude::{Attempt, Data, *};
8use eyre::Result;
9use tracing::{debug, info, instrument};
10
11use crate::{
12    constants::WORKER_DEFAULT_MAXIMUM_RETRIES,
13    domain::{create_solana_relayer, get_relayer_by_id, SolanaRelayerDexTrait},
14    jobs::{handle_result, Job, SolanaTokenSwapRequest},
15    models::DefaultAppState,
16    observability::request_id::set_request_id,
17    repositories::Repository,
18};
19
20/// Handles incoming swap jobs from the queue.
21///
22/// # Arguments
23/// * `job` - The notification job containing recipient and message details
24/// * `context` - Application state containing notification services
25///
26/// # Returns
27/// * `Result<(), Error>` - Success or failure of notification processing
28#[instrument(
29    level = "info",
30    skip(job, context),
31    fields(
32        request_id = ?job.request_id,
33        job_id = %job.message_id,
34        job_type = %job.job_type.to_string(),
35        attempt = %attempt.current(),
36        relayer_id = %job.data.relayer_id,
37    ),
38    err
39)]
40pub async fn solana_token_swap_request_handler(
41    job: Job<SolanaTokenSwapRequest>,
42    context: Data<ThinData<DefaultAppState>>,
43    attempt: Attempt,
44) -> Result<(), Error> {
45    if let Some(request_id) = job.request_id.clone() {
46        set_request_id(request_id);
47    }
48
49    debug!("handling solana token swap request");
50
51    let result = handle_request(job.data, context).await;
52
53    handle_result(
54        result,
55        attempt,
56        "SolanaTokenSwapRequest",
57        WORKER_DEFAULT_MAXIMUM_RETRIES,
58    )
59}
60
/// Payload-less job type received by the cron-driven swap handler.
///
/// It has no fields; the handler ignores the value and takes the relayer id from
/// worker data instead.
// NOTE(review): `Default` is presumably what the cron scheduler uses to build a job
// per tick — confirm against the apalis-cron version in use.
#[derive(Debug, Clone, Default)]
pub struct CronReminder();
63
64/// Handles incoming swap jobs from the cron queue.
65#[instrument(
66    level = "info",
67    skip(_job, data, relayer_id),
68    fields(
69        job_type = "solana_token_swap_cron",
70        attempt = %attempt.current(),
71    ),
72    err
73)]
74pub async fn solana_token_swap_cron_handler(
75    _job: CronReminder,
76    relayer_id: Data<String>,
77    data: Data<ThinData<DefaultAppState>>,
78    attempt: Attempt,
79) -> Result<(), Error> {
80    info!(
81        relayer_id = %*relayer_id,
82        "handling solana token swap cron request"
83    );
84
85    let result = handle_request(
86        SolanaTokenSwapRequest {
87            relayer_id: relayer_id.to_string(),
88        },
89        data,
90    )
91    .await;
92
93    handle_result(
94        result,
95        attempt,
96        "SolanaTokenSwapRequest",
97        WORKER_DEFAULT_MAXIMUM_RETRIES,
98    )
99}
100
101async fn handle_request(
102    request: SolanaTokenSwapRequest,
103    context: Data<ThinData<DefaultAppState>>,
104) -> Result<()> {
105    debug!("processing solana token swap");
106
107    let relayer_model = get_relayer_by_id(request.relayer_id.clone(), &context).await?;
108    let signer_model = context
109        .signer_repository
110        .get_by_id(relayer_model.signer_id.clone())
111        .await?;
112
113    let relayer = create_solana_relayer(
114        relayer_model,
115        signer_model,
116        context.relayer_repository(),
117        context.network_repository(),
118        context.transaction_repository(),
119        context.job_producer(),
120    )
121    .await?;
122
123    relayer
124        .handle_token_swap_request(request.relayer_id.clone())
125        .await
126        .map_err(|e| eyre::eyre!("Failed to handle solana token swap request: {}", e))?;
127
128    Ok(())
129}
130
#[cfg(test)]
mod tests {
    // No unit tests are defined for this module yet.
}