1use chrono::Utc;
7use solana_sdk::signature::Signature;
8use std::str::FromStr;
9use tracing::{debug, error, info, warn};
10
11use super::SolanaRelayerTransaction;
12use crate::{
13 jobs::{JobProducerTrait, TransactionStatusCheck},
14 models::{
15 produce_transaction_update_notification_payload, RelayerRepoModel, SolanaTransactionStatus,
16 TransactionError, TransactionRepoModel, TransactionStatus, TransactionUpdateRequest,
17 },
18 repositories::{transaction::TransactionRepository, RelayerRepository, Repository},
19 services::provider::SolanaProviderTrait,
20 utils::calculate_scheduled_timestamp,
21};
22
/// Base delay, in seconds, before a Solana transaction status check is run again.
///
/// Used as-is when re-checking after a `Processed` or `Confirmed` result, and doubled when
/// re-queueing after a failed status lookup.
const SOLANA_DEFAULT_STATUS_RETRY_DELAY_SECONDS: i64 = 10;
25
26impl<P, RR, TR, J> SolanaRelayerTransaction<P, RR, TR, J>
27where
28 P: SolanaProviderTrait,
29 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
30 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
31 J: JobProducerTrait + Send + Sync + 'static,
32{
33 pub async fn handle_transaction_status_impl(
35 &self,
36 tx: TransactionRepoModel,
37 ) -> Result<TransactionRepoModel, TransactionError> {
38 info!("handling solana transaction status");
39
40 if matches!(
42 tx.status,
43 TransactionStatus::Confirmed | TransactionStatus::Failed | TransactionStatus::Expired
44 ) {
45 info!(status = ?tx.status, "transaction already in final state");
46 return Ok(tx);
47 }
48
49 match self.check_and_update_status(tx.clone()).await {
51 Ok(updated_tx) => Ok(updated_tx),
52 Err(error) => {
53 match error {
55 TransactionError::ValidationError(_) => {
56 Err(error)
58 }
59 _ => {
60 self.handle_status_check_failure(tx, error).await
62 }
63 }
64 }
65 }
66 }
67
68 async fn handle_status_check_failure(
71 &self,
72 tx: TransactionRepoModel,
73 error: TransactionError,
74 ) -> Result<TransactionRepoModel, TransactionError> {
75 warn!(error = %error, "failed to get solana transaction status, re-queueing check");
76
77 if let Err(requeue_error) = self
78 .schedule_status_check(&tx, Some(2 * SOLANA_DEFAULT_STATUS_RETRY_DELAY_SECONDS))
79 .await
80 {
81 warn!(error = %requeue_error, "failed to requeue status check for transaction");
82 }
83
84 info!(error = %error, "transaction status check failure handled, will retry later");
85
86 Err(error)
88 }
89
90 async fn check_and_update_status(
92 &self,
93 tx: TransactionRepoModel,
94 ) -> Result<TransactionRepoModel, TransactionError> {
95 let solana_data = tx.network_data.get_solana_transaction_data()?;
97 let signature_str = solana_data.signature.as_ref().ok_or_else(|| {
98 TransactionError::ValidationError("Transaction signature is missing".to_string())
99 })?;
100
101 let signature = Signature::from_str(signature_str).map_err(|e| {
102 TransactionError::ValidationError(format!("Invalid signature format: {}", e))
103 })?;
104
105 let solana_status = self
107 .provider()
108 .get_transaction_status(&signature)
109 .await
110 .map_err(|e| {
111 TransactionError::UnexpectedError(format!(
112 "Failed to get Solana transaction status for tx {} (signature {}): {}",
113 tx.id, signature_str, e
114 ))
115 })?;
116
117 println!("solana_status: {:?}", solana_status);
118
119 match solana_status {
121 SolanaTransactionStatus::Processed => self.handle_processed_status(tx).await,
122 SolanaTransactionStatus::Confirmed => self.handle_confirmed_status(tx).await,
123 SolanaTransactionStatus::Finalized => self.handle_finalized_status(tx).await,
124 SolanaTransactionStatus::Failed => self.handle_failed_status(tx).await,
125 }
126 }
127
128 async fn update_transaction_status_if_needed(
130 &self,
131 tx: TransactionRepoModel,
132 new_status: TransactionStatus,
133 ) -> Result<TransactionRepoModel, TransactionError> {
134 if tx.status != new_status {
135 let update_request = TransactionUpdateRequest {
136 status: Some(new_status.clone()),
137 confirmed_at: if matches!(new_status, TransactionStatus::Confirmed) {
138 Some(Utc::now().to_rfc3339())
139 } else {
140 None
141 },
142 ..Default::default()
143 };
144 return self
145 .finalize_transaction_state(tx.id.clone(), update_request)
146 .await;
147 }
148 Ok(tx)
149 }
150
151 async fn schedule_status_check(
153 &self,
154 tx: &TransactionRepoModel,
155 delay_seconds: Option<i64>,
156 ) -> Result<(), TransactionError> {
157 let delay = delay_seconds.map(calculate_scheduled_timestamp);
158 self.job_producer()
159 .produce_check_transaction_status_job(
160 TransactionStatusCheck::new(tx.id.clone(), tx.relayer_id.clone()),
161 delay,
162 )
163 .await
164 .map_err(|e| {
165 TransactionError::UnexpectedError(format!("Failed to schedule status check: {}", e))
166 })
167 }
168
169 async fn handle_processed_status(
171 &self,
172 tx: TransactionRepoModel,
173 ) -> Result<TransactionRepoModel, TransactionError> {
174 info!("transaction is processed but waiting for supermajority confirmation");
175
176 self.schedule_status_check(&tx, Some(SOLANA_DEFAULT_STATUS_RETRY_DELAY_SECONDS))
178 .await?;
179
180 Ok(tx)
182 }
183
184 async fn handle_confirmed_status(
188 &self,
189 tx: TransactionRepoModel,
190 ) -> Result<TransactionRepoModel, TransactionError> {
191 debug!("transaction is confirmed by supermajority");
192
193 let updated_tx = self
195 .update_transaction_status_if_needed(tx, TransactionStatus::Mined)
196 .await?;
197
198 self.schedule_status_check(&updated_tx, Some(SOLANA_DEFAULT_STATUS_RETRY_DELAY_SECONDS))
200 .await?;
201
202 Ok(updated_tx)
203 }
204
205 async fn handle_finalized_status(
209 &self,
210 tx: TransactionRepoModel,
211 ) -> Result<TransactionRepoModel, TransactionError> {
212 info!("transaction is finalized and irreversible");
213
214 self.update_transaction_status_if_needed(tx, TransactionStatus::Confirmed)
216 .await
217 }
218
219 async fn handle_failed_status(
221 &self,
222 tx: TransactionRepoModel,
223 ) -> Result<TransactionRepoModel, TransactionError> {
224 warn!("transaction failed on-chain");
225
226 self.update_transaction_status_if_needed(tx, TransactionStatus::Failed)
228 .await
229 }
230
231 async fn finalize_transaction_state(
233 &self,
234 tx_id: String,
235 update_req: TransactionUpdateRequest,
236 ) -> Result<TransactionRepoModel, TransactionError> {
237 let updated_tx = self
239 .transaction_repository()
240 .partial_update(tx_id, update_req)
241 .await
242 .map_err(|e| TransactionError::UnexpectedError(e.to_string()))?;
243
244 self.send_transaction_update_notification(&updated_tx)
246 .await?;
247
248 Ok(updated_tx)
249 }
250
251 async fn send_transaction_update_notification(
253 &self,
254 tx: &TransactionRepoModel,
255 ) -> Result<(), TransactionError> {
256 if let Some(notification_id) = &self.relayer().notification_id {
257 info!("sending webhook notification for transaction");
258
259 let notification_payload =
260 produce_transaction_update_notification_payload(notification_id, tx);
261
262 if let Err(e) = self
263 .job_producer()
264 .produce_send_notification_job(notification_payload, None)
265 .await
266 {
267 error!(error = %e, "failed to produce notification job");
268 }
269 }
270
271 Ok(())
272 }
273}
274
275#[cfg(test)]
276mod tests {
277 use super::*;
278 use crate::{
279 jobs::MockJobProducerTrait,
280 models::{NetworkTransactionData, SolanaTransactionData},
281 repositories::{MockRelayerRepository, MockTransactionRepository},
282 services::{MockSolanaProviderTrait, SolanaProviderError},
283 utils::mocks::mockutils::{create_mock_solana_relayer, create_mock_solana_transaction},
284 };
285 use eyre::Result;
286 use mockall::predicate::*;
287 use std::sync::Arc;
288
289 fn create_tx_with_signature(
291 status: TransactionStatus,
292 signature: Option<&str>,
293 ) -> TransactionRepoModel {
294 let mut tx = create_mock_solana_transaction();
295 tx.status = status;
296 if let Some(sig) = signature {
297 tx.network_data = NetworkTransactionData::Solana(SolanaTransactionData {
298 transaction: "test".to_string(),
299 signature: Some(sig.to_string()),
300 });
301 }
302 tx
303 }
304
305 #[tokio::test]
306 async fn test_handle_status_already_final() {
307 let provider = Arc::new(MockSolanaProviderTrait::new());
308 let relayer_repo = Arc::new(MockRelayerRepository::new());
309 let tx_repo = Arc::new(MockTransactionRepository::new());
310 let job_producer = Arc::new(MockJobProducerTrait::new());
311 let relayer = create_mock_solana_relayer("test-relayer".to_string(), false);
312
313 let handler =
314 SolanaRelayerTransaction::new(relayer, relayer_repo, provider, tx_repo, job_producer)
315 .unwrap();
316
317 let tx_confirmed = create_tx_with_signature(TransactionStatus::Confirmed, None);
319 let result = handler
320 .handle_transaction_status_impl(tx_confirmed.clone())
321 .await;
322 assert!(result.is_ok());
323 assert_eq!(result.unwrap().id, tx_confirmed.id);
324
325 let tx_failed = create_tx_with_signature(TransactionStatus::Failed, None);
327 let result = handler
328 .handle_transaction_status_impl(tx_failed.clone())
329 .await;
330 assert!(result.is_ok());
331 assert_eq!(result.unwrap().id, tx_failed.id);
332
333 let tx_expired = create_tx_with_signature(TransactionStatus::Expired, None);
335 let result = handler
336 .handle_transaction_status_impl(tx_expired.clone())
337 .await;
338 assert!(result.is_ok());
339 assert_eq!(result.unwrap().id, tx_expired.id);
340 }
341
342 #[tokio::test]
343 async fn test_handle_status_processed() -> Result<()> {
344 let mut provider = MockSolanaProviderTrait::new();
345 let relayer_repo = Arc::new(MockRelayerRepository::new());
346 let tx_repo = Arc::new(MockTransactionRepository::new());
347 let mut job_producer = MockJobProducerTrait::new();
348
349 let signature_str =
350 "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
351 let tx = create_tx_with_signature(TransactionStatus::Pending, Some(signature_str));
352
353 provider
354 .expect_get_transaction_status()
355 .with(eq(Signature::from_str(signature_str)?))
356 .times(1)
357 .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Processed) }));
358
359 job_producer
360 .expect_produce_check_transaction_status_job()
361 .withf(|check, delay| check.transaction_id == "test" && delay.is_some())
362 .times(1)
363 .returning(|_, _| Box::pin(async { Ok(()) }));
364
365 let handler = SolanaRelayerTransaction::new(
366 create_mock_solana_relayer("test-relayer".to_string(), false),
367 relayer_repo,
368 Arc::new(provider),
369 tx_repo,
370 Arc::new(job_producer),
371 )?;
372
373 let result = handler.handle_transaction_status_impl(tx.clone()).await;
374
375 assert!(result.is_ok());
376 let updated_tx = result.unwrap();
377 assert_eq!(updated_tx.id, tx.id);
378 assert_eq!(updated_tx.status, TransactionStatus::Pending); Ok(())
380 }
381
382 #[tokio::test]
383 async fn test_handle_status_confirmed() -> Result<()> {
384 let mut provider = MockSolanaProviderTrait::new();
385 let relayer_repo = Arc::new(MockRelayerRepository::new());
386 let mut tx_repo = MockTransactionRepository::new();
387 let mut job_producer = MockJobProducerTrait::new();
388
389 let signature_str =
390 "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
391 let tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
392
393 provider
394 .expect_get_transaction_status()
395 .with(eq(Signature::from_str(signature_str)?))
396 .times(1)
397 .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Confirmed) }));
398
399 job_producer
400 .expect_produce_check_transaction_status_job()
401 .withf(|check, delay| check.transaction_id == "test" && delay.is_some())
402 .times(1)
403 .returning(|_, _| Box::pin(async { Ok(()) }));
404
405 let tx_id = tx.id.clone();
406
407 tx_repo
408 .expect_partial_update()
409 .withf(move |tx_id_param, update_req| {
410 tx_id_param == &tx_id && update_req.status == Some(TransactionStatus::Mined)
411 })
412 .times(1)
413 .returning(move |_, _| {
414 Ok(create_tx_with_signature(
415 TransactionStatus::Mined,
416 Some(signature_str),
417 ))
418 });
419
420 let handler = SolanaRelayerTransaction::new(
421 create_mock_solana_relayer("test-relayer".to_string(), false),
422 relayer_repo,
423 Arc::new(provider),
424 Arc::new(tx_repo),
425 Arc::new(job_producer),
426 )?;
427
428 let result = handler.handle_transaction_status_impl(tx.clone()).await;
429
430 assert!(result.is_ok());
431 let updated_tx = result.unwrap();
432 assert_eq!(updated_tx.id, tx.id);
433 assert_eq!(updated_tx.status, TransactionStatus::Mined);
434 Ok(())
435 }
436
437 #[tokio::test]
438 async fn test_handle_status_finalized() -> Result<()> {
439 let mut provider = MockSolanaProviderTrait::new();
440 let relayer_repo = Arc::new(MockRelayerRepository::new());
441 let mut tx_repo = MockTransactionRepository::new();
442 let job_producer = MockJobProducerTrait::new();
443
444 let signature_str =
445 "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
446 let tx = create_tx_with_signature(TransactionStatus::Mined, Some(signature_str));
447
448 provider
449 .expect_get_transaction_status()
450 .with(eq(Signature::from_str(signature_str)?))
451 .times(1)
452 .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Finalized) }));
453
454 let tx_id = tx.id.clone();
455
456 tx_repo
457 .expect_partial_update()
458 .withf(move |tx_id_param, update_req| {
459 tx_id_param == &tx_id && update_req.status == Some(TransactionStatus::Confirmed)
460 })
461 .times(1)
462 .returning(move |_, _| {
463 Ok(create_tx_with_signature(
464 TransactionStatus::Confirmed,
465 Some(signature_str),
466 ))
467 });
468
469 let handler = SolanaRelayerTransaction::new(
470 create_mock_solana_relayer("test-relayer".to_string(), false),
471 relayer_repo,
472 Arc::new(provider),
473 Arc::new(tx_repo),
474 Arc::new(job_producer),
475 )?;
476
477 let result = handler.handle_transaction_status_impl(tx.clone()).await;
478
479 assert!(result.is_ok());
480 let updated_tx = result.unwrap();
481 assert_eq!(updated_tx.id, tx.id);
482 assert_eq!(updated_tx.status, TransactionStatus::Confirmed);
483 Ok(())
484 }
485
486 #[tokio::test]
487 async fn test_handle_status_provider_error() -> Result<()> {
488 let mut provider = MockSolanaProviderTrait::new();
489 let relayer_repo = Arc::new(MockRelayerRepository::new());
490 let tx_repo = Arc::new(MockTransactionRepository::new());
491 let mut job_producer = MockJobProducerTrait::new();
492
493 let signature_str = "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
494 let tx = create_tx_with_signature(TransactionStatus::Pending, Some(signature_str));
495 let error_message = "Provider is down";
496
497 provider
498 .expect_get_transaction_status()
499 .with(eq(Signature::from_str(signature_str)?))
500 .times(1)
501 .returning(move |_| {
502 Box::pin(async { Err(SolanaProviderError::RpcError(error_message.to_string())) })
503 });
504
505 job_producer
506 .expect_produce_check_transaction_status_job()
507 .withf(|check, delay| check.transaction_id == "test" && delay.is_some())
508 .times(1)
509 .returning(|_, _| Box::pin(async { Ok(()) }));
510
511 let handler = SolanaRelayerTransaction::new(
512 create_mock_solana_relayer("test-relayer".to_string(), false),
513 relayer_repo,
514 Arc::new(provider),
515 tx_repo,
516 Arc::new(job_producer),
517 )?;
518
519 let result = handler.handle_transaction_status_impl(tx.clone()).await;
520
521 assert!(result.is_err());
522 let err = result.unwrap_err();
523 assert!(matches!(err, TransactionError::UnexpectedError(_)));
524 Ok(())
525 }
526
527 #[tokio::test]
528 async fn test_handle_status_failed() -> Result<()> {
529 let mut provider = MockSolanaProviderTrait::new();
530 let relayer_repo = Arc::new(MockRelayerRepository::new());
531 let mut tx_repo = MockTransactionRepository::new();
532 let job_producer = MockJobProducerTrait::new();
533
534 let signature_str =
535 "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
536 let tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
537
538 provider
539 .expect_get_transaction_status()
540 .with(eq(Signature::from_str(signature_str)?))
541 .times(1)
542 .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Failed) }));
543
544 let tx_id = tx.id.clone();
545
546 tx_repo
547 .expect_partial_update()
548 .withf(move |tx_id_param, update_req| {
549 tx_id_param == &tx_id && update_req.status == Some(TransactionStatus::Failed)
550 })
551 .times(1)
552 .returning(move |_, _| {
553 Ok(create_tx_with_signature(
554 TransactionStatus::Failed,
555 Some(signature_str),
556 ))
557 });
558
559 let handler = SolanaRelayerTransaction::new(
560 create_mock_solana_relayer("test-relayer".to_string(), false),
561 relayer_repo,
562 Arc::new(provider),
563 Arc::new(tx_repo),
564 Arc::new(job_producer),
565 )?;
566
567 let result = handler.handle_transaction_status_impl(tx.clone()).await;
568
569 assert!(result.is_ok());
570 let updated_tx = result.unwrap();
571 assert_eq!(updated_tx.id, tx.id);
572 assert_eq!(updated_tx.status, TransactionStatus::Failed);
573 Ok(())
574 }
575}