openzeppelin_relayer/jobs/handlers/
notification_handler.rs

1//! Notification handling worker implementation.
2//!
3//! This module implements the notification handling worker that processes
4//! notification jobs from the queue.
5
6use actix_web::web::ThinData;
7use apalis::prelude::{Attempt, Data, *};
8use eyre::Result;
9use tracing::{debug, instrument};
10
11use crate::{
12    constants::WORKER_DEFAULT_MAXIMUM_RETRIES,
13    jobs::{handle_result, Job, NotificationSend},
14    models::DefaultAppState,
15    observability::request_id::set_request_id,
16    repositories::Repository,
17    services::WebhookNotificationService,
18};
19
20/// Handles incoming notification jobs from the queue.
21///
22/// # Arguments
23/// * `job` - The notification job containing recipient and message details
24/// * `context` - Application state containing notification services
25///
26/// # Returns
27/// * `Result<(), Error>` - Success or failure of notification processing
28#[instrument(
29    level = "info",
30    skip(job, context),
31    fields(
32        request_id = ?job.request_id,
33        job_id = %job.message_id,
34        job_type = %job.job_type.to_string(),
35        attempt = %attempt.current(),
36        notification_id = %job.data.notification_id,
37    ),
38    err
39)]
40pub async fn notification_handler(
41    job: Job<NotificationSend>,
42    context: Data<ThinData<DefaultAppState>>,
43    attempt: Attempt,
44) -> Result<(), Error> {
45    if let Some(request_id) = job.request_id.clone() {
46        set_request_id(request_id);
47    }
48
49    debug!("handling notification");
50
51    let result = handle_request(job.data, context).await;
52
53    handle_result(
54        result,
55        attempt,
56        "Notification",
57        WORKER_DEFAULT_MAXIMUM_RETRIES,
58    )
59}
60
61async fn handle_request(
62    request: NotificationSend,
63    context: Data<ThinData<DefaultAppState>>,
64) -> Result<()> {
65    debug!("sending notification");
66    let notification = context
67        .notification_repository
68        .get_by_id(request.notification_id)
69        .await?;
70
71    let notification_service =
72        WebhookNotificationService::new(notification.url, notification.signing_key);
73
74    notification_service
75        .send_notification(request.notification)
76        .await?;
77
78    Ok(())
79}
80
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{
        EvmTransactionResponse, NetworkType, RelayerDisabledPayload, RelayerEvmPolicy,
        RelayerNetworkPolicyResponse, RelayerResponse, TransactionResponse, TransactionStatus,
        WebhookNotification, WebhookPayload, U256,
    };

    /// Builds the confirmed EVM transaction payload used by both tests.
    fn confirmed_evm_transaction_payload() -> WebhookPayload {
        let transaction = EvmTransactionResponse {
            id: "tx123".to_string(),
            hash: Some("0x123".to_string()),
            status: TransactionStatus::Confirmed,
            status_reason: None,
            created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
            sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
            confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
            gas_price: Some(1000000000),
            gas_limit: Some(21000),
            nonce: Some(1),
            value: U256::from(1000000000000000000_u64),
            from: "0xabc".to_string(),
            to: Some("0xdef".to_string()),
            relayer_id: "relayer-1".to_string(),
            data: None,
            max_fee_per_gas: None,
            max_priority_fee_per_gas: None,
            signature: None,
            speed: None,
        };

        WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(transaction)))
    }

    /// A job wrapping a notification exposes the id and event it was built with.
    #[tokio::test]
    async fn test_notification_job_creation() {
        let payload = confirmed_evm_transaction_payload();

        // Wrap the payload in a notification, then in a send request.
        let notification = WebhookNotification::new("test_event".to_string(), payload);
        let send_request =
            NotificationSend::new("notification-1".to_string(), notification.clone());

        // Wrap the send request in the queue job envelope.
        let job = Job::new(crate::jobs::JobType::NotificationSend, send_request);

        // The envelope must carry the request through untouched.
        assert_eq!(job.data.notification_id, "notification-1");
        assert_eq!(job.data.notification.event, "test_event");
    }

    /// `NotificationSend` keeps the event name for each payload variant.
    #[tokio::test]
    async fn test_notification_job_with_different_payloads() {
        // Variant 1: a transaction payload.
        let transaction_notification = WebhookNotification::new(
            "transaction_payload".to_string(),
            confirmed_evm_transaction_payload(),
        );
        let transaction_job = NotificationSend::new(
            "notification-string".to_string(),
            transaction_notification,
        );
        assert_eq!(transaction_job.notification.event, "transaction_payload");

        // Variant 2: a relayer-disabled payload.
        let evm_policy = RelayerEvmPolicy {
            gas_price_cap: None,
            whitelist_receivers: None,
            eip1559_pricing: None,
            private_transactions: Some(false),
            min_balance: Some(0),
            gas_limit_estimation: None,
        };
        let relayer = RelayerResponse {
            id: "relayer-1".to_string(),
            name: "relayer-1".to_string(),
            network: "ethereum".to_string(),
            network_type: NetworkType::Evm,
            paused: false,
            policies: Some(RelayerNetworkPolicyResponse::Evm(evm_policy.into())),
            signer_id: "signer-1".to_string(),
            notification_id: None,
            custom_rpc_urls: None,
            address: Some("0xabc".to_string()),
            system_disabled: Some(false),
            ..Default::default()
        };
        let disabled_payload = WebhookPayload::RelayerDisabled(Box::new(RelayerDisabledPayload {
            relayer,
            disable_reason: "test".to_string(),
        }));

        let disabled_notification =
            WebhookNotification::new("object_event".to_string(), disabled_payload);
        let disabled_job =
            NotificationSend::new("notification-object".to_string(), disabled_notification);
        assert_eq!(disabled_job.notification.event, "object_event");
    }
}