openzeppelin_relayer/jobs/handlers/
notification_handler.rs1use actix_web::web::ThinData;
7use apalis::prelude::{Attempt, Data, *};
8use eyre::Result;
9use tracing::{debug, instrument};
10
11use crate::{
12 constants::WORKER_DEFAULT_MAXIMUM_RETRIES,
13 jobs::{handle_result, Job, NotificationSend},
14 models::DefaultAppState,
15 observability::request_id::set_request_id,
16 repositories::Repository,
17 services::WebhookNotificationService,
18};
19
20#[instrument(
29 level = "info",
30 skip(job, context),
31 fields(
32 request_id = ?job.request_id,
33 job_id = %job.message_id,
34 job_type = %job.job_type.to_string(),
35 attempt = %attempt.current(),
36 notification_id = %job.data.notification_id,
37 ),
38 err
39)]
40pub async fn notification_handler(
41 job: Job<NotificationSend>,
42 context: Data<ThinData<DefaultAppState>>,
43 attempt: Attempt,
44) -> Result<(), Error> {
45 if let Some(request_id) = job.request_id.clone() {
46 set_request_id(request_id);
47 }
48
49 debug!("handling notification");
50
51 let result = handle_request(job.data, context).await;
52
53 handle_result(
54 result,
55 attempt,
56 "Notification",
57 WORKER_DEFAULT_MAXIMUM_RETRIES,
58 )
59}
60
61async fn handle_request(
62 request: NotificationSend,
63 context: Data<ThinData<DefaultAppState>>,
64) -> Result<()> {
65 debug!("sending notification");
66 let notification = context
67 .notification_repository
68 .get_by_id(request.notification_id)
69 .await?;
70
71 let notification_service =
72 WebhookNotificationService::new(notification.url, notification.signing_key);
73
74 notification_service
75 .send_notification(request.notification)
76 .await?;
77
78 Ok(())
79}
80
81#[cfg(test)]
82mod tests {
83 use super::*;
84 use crate::models::{
85 EvmTransactionResponse, NetworkType, RelayerDisabledPayload, RelayerEvmPolicy,
86 RelayerNetworkPolicyResponse, RelayerResponse, TransactionResponse, TransactionStatus,
87 WebhookNotification, WebhookPayload, U256,
88 };
89
90 #[tokio::test]
91 async fn test_notification_job_creation() {
92 let payload = WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
94 EvmTransactionResponse {
95 id: "tx123".to_string(),
96 hash: Some("0x123".to_string()),
97 status: TransactionStatus::Confirmed,
98 status_reason: None,
99 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
100 sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
101 confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
102 gas_price: Some(1000000000),
103 gas_limit: Some(21000),
104 nonce: Some(1),
105 value: U256::from(1000000000000000000_u64),
106 from: "0xabc".to_string(),
107 to: Some("0xdef".to_string()),
108 relayer_id: "relayer-1".to_string(),
109 data: None,
110 max_fee_per_gas: None,
111 max_priority_fee_per_gas: None,
112 signature: None,
113 speed: None,
114 },
115 )));
116
117 let notification = WebhookNotification::new("test_event".to_string(), payload);
119 let notification_job =
120 NotificationSend::new("notification-1".to_string(), notification.clone());
121
122 let job = Job::new(crate::jobs::JobType::NotificationSend, notification_job);
124
125 assert_eq!(job.data.notification_id, "notification-1");
127 assert_eq!(job.data.notification.event, "test_event");
128 }
129
130 #[tokio::test]
131 async fn test_notification_job_with_different_payloads() {
132 let transaction_payload = WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
135 EvmTransactionResponse {
136 id: "tx123".to_string(),
137 hash: Some("0x123".to_string()),
138 status: TransactionStatus::Confirmed,
139 status_reason: None,
140 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
141 sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
142 confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
143 gas_price: Some(1000000000),
144 gas_limit: Some(21000),
145 nonce: Some(1),
146 value: U256::from(1000000000000000000_u64),
147 from: "0xabc".to_string(),
148 to: Some("0xdef".to_string()),
149 relayer_id: "relayer-1".to_string(),
150 data: None,
151 max_fee_per_gas: None,
152 max_priority_fee_per_gas: None,
153 signature: None,
154 speed: None,
155 },
156 )));
157
158 let string_notification =
159 WebhookNotification::new("transaction_payload".to_string(), transaction_payload);
160 let job = NotificationSend::new("notification-string".to_string(), string_notification);
161 assert_eq!(job.notification.event, "transaction_payload");
162
163 let relayer_disabled = WebhookPayload::RelayerDisabled(Box::new(RelayerDisabledPayload {
164 relayer: RelayerResponse {
165 id: "relayer-1".to_string(),
166 name: "relayer-1".to_string(),
167 network: "ethereum".to_string(),
168 network_type: NetworkType::Evm,
169 paused: false,
170 policies: Some(RelayerNetworkPolicyResponse::Evm(
171 RelayerEvmPolicy {
172 gas_price_cap: None,
173 whitelist_receivers: None,
174 eip1559_pricing: None,
175 private_transactions: Some(false),
176 min_balance: Some(0),
177 gas_limit_estimation: None,
178 }
179 .into(),
180 )),
181 signer_id: "signer-1".to_string(),
182 notification_id: None,
183 custom_rpc_urls: None,
184 address: Some("0xabc".to_string()),
185 system_disabled: Some(false),
186 ..Default::default()
187 },
188 disable_reason: "test".to_string(),
189 }));
190 let object_notification =
191 WebhookNotification::new("object_event".to_string(), relayer_disabled);
192 let job = NotificationSend::new("notification-object".to_string(), object_notification);
193 assert_eq!(job.notification.event, "object_event");
194 }
195}