1use crate::{
7 constants::DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS,
8 domain::transaction::{stellar::fetch_next_sequence_from_chain, Transaction},
9 jobs::{JobProducer, JobProducerTrait, TransactionRequest},
10 models::{
11 produce_transaction_update_notification_payload, NetworkTransactionRequest,
12 RelayerNetworkPolicy, RelayerRepoModel, TransactionError, TransactionRepoModel,
13 TransactionStatus, TransactionUpdateRequest,
14 },
15 repositories::{
16 RelayerRepositoryStorage, Repository, TransactionCounterRepositoryStorage,
17 TransactionCounterTrait, TransactionRepository, TransactionRepositoryStorage,
18 },
19 services::{Signer, StellarProvider, StellarProviderTrait, StellarSigner},
20 utils::calculate_scheduled_timestamp,
21};
22use async_trait::async_trait;
23use eyre::Result;
24use std::sync::Arc;
25use tracing::info;
26
27use super::lane_gate;
28
29#[allow(dead_code)]
30pub struct StellarRelayerTransaction<R, T, J, S, P, C>
31where
32 R: Repository<RelayerRepoModel, String>,
33 T: TransactionRepository,
34 J: JobProducerTrait,
35 S: Signer,
36 P: StellarProviderTrait,
37 C: TransactionCounterTrait,
38{
39 relayer: RelayerRepoModel,
40 relayer_repository: Arc<R>,
41 transaction_repository: Arc<T>,
42 job_producer: Arc<J>,
43 signer: Arc<S>,
44 provider: P,
45 transaction_counter_service: Arc<C>,
46}
47
48#[allow(dead_code)]
49impl<R, T, J, S, P, C> StellarRelayerTransaction<R, T, J, S, P, C>
50where
51 R: Repository<RelayerRepoModel, String>,
52 T: TransactionRepository,
53 J: JobProducerTrait,
54 S: Signer,
55 P: StellarProviderTrait,
56 C: TransactionCounterTrait,
57{
58 #[allow(clippy::too_many_arguments)]
74 pub fn new(
75 relayer: RelayerRepoModel,
76 relayer_repository: Arc<R>,
77 transaction_repository: Arc<T>,
78 job_producer: Arc<J>,
79 signer: Arc<S>,
80 provider: P,
81 transaction_counter_service: Arc<C>,
82 ) -> Result<Self, TransactionError> {
83 Ok(Self {
84 relayer,
85 relayer_repository,
86 transaction_repository,
87 job_producer,
88 signer,
89 provider,
90 transaction_counter_service,
91 })
92 }
93
94 pub fn provider(&self) -> &P {
95 &self.provider
96 }
97
98 pub fn relayer(&self) -> &RelayerRepoModel {
99 &self.relayer
100 }
101
102 pub fn job_producer(&self) -> &J {
103 &self.job_producer
104 }
105
106 pub fn transaction_repository(&self) -> &T {
107 &self.transaction_repository
108 }
109
110 pub fn signer(&self) -> &S {
111 &self.signer
112 }
113
114 pub fn transaction_counter_service(&self) -> &C {
115 &self.transaction_counter_service
116 }
117
118 pub fn concurrent_transactions_enabled(&self) -> bool {
119 if let RelayerNetworkPolicy::Stellar(policy) = &self.relayer().policies {
120 policy
121 .concurrent_transactions
122 .unwrap_or(DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS)
123 } else {
124 DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS
125 }
126 }
127
128 pub async fn send_transaction_request_job(
130 &self,
131 tx: &TransactionRepoModel,
132 delay_seconds: Option<i64>,
133 ) -> Result<(), TransactionError> {
134 let job = TransactionRequest::new(tx.id.clone(), tx.relayer_id.clone());
135 let scheduled_on = delay_seconds.map(calculate_scheduled_timestamp);
136 self.job_producer()
137 .produce_transaction_request_job(job, scheduled_on)
138 .await?;
139 Ok(())
140 }
141
142 pub(super) async fn send_transaction_update_notification(
144 &self,
145 tx: &TransactionRepoModel,
146 ) -> Result<(), TransactionError> {
147 if let Some(notification_id) = &self.relayer().notification_id {
148 self.job_producer()
149 .produce_send_notification_job(
150 produce_transaction_update_notification_payload(notification_id, tx),
151 None,
152 )
153 .await
154 .map_err(|e| {
155 TransactionError::UnexpectedError(format!("Failed to send notification: {}", e))
156 })?;
157 }
158 Ok(())
159 }
160
161 pub async fn finalize_transaction_state(
163 &self,
164 tx_id: String,
165 update_req: TransactionUpdateRequest,
166 ) -> Result<TransactionRepoModel, TransactionError> {
167 let updated_tx = self
168 .transaction_repository()
169 .partial_update(tx_id, update_req)
170 .await?;
171
172 self.send_transaction_update_notification(&updated_tx)
173 .await?;
174 Ok(updated_tx)
175 }
176
177 pub async fn enqueue_next_pending_transaction(
178 &self,
179 finished_tx_id: &str,
180 ) -> Result<(), TransactionError> {
181 if !self.concurrent_transactions_enabled() {
182 if let Some(next) = self
183 .find_oldest_pending_for_relayer(&self.relayer().id)
184 .await?
185 {
186 info!(to_tx_id = %next.id, finished_tx_id = %finished_tx_id, "handing over lane");
188 lane_gate::pass_to(&self.relayer().id, finished_tx_id, &next.id);
189 self.send_transaction_request_job(&next, None).await?;
190 } else {
191 info!(finished_tx_id = %finished_tx_id, "releasing relayer lane");
192 lane_gate::free(&self.relayer().id, finished_tx_id);
193 }
194 }
195 Ok(())
196 }
197
198 async fn find_oldest_pending_for_relayer(
200 &self,
201 relayer_id: &str,
202 ) -> Result<Option<TransactionRepoModel>, TransactionError> {
203 let pending_txs = self
204 .transaction_repository()
205 .find_by_status(relayer_id, &[TransactionStatus::Pending])
206 .await
207 .map_err(TransactionError::from)?;
208
209 Ok(pending_txs.into_iter().next())
210 }
211
212 pub async fn sync_sequence_from_chain(
215 &self,
216 relayer_address: &str,
217 ) -> Result<(), TransactionError> {
218 info!(address = %relayer_address, "syncing sequence number from chain");
219
220 let next_usable_seq = fetch_next_sequence_from_chain(self.provider(), relayer_address)
222 .await
223 .map_err(TransactionError::UnexpectedError)?;
224
225 self.transaction_counter_service()
227 .set(&self.relayer().id, relayer_address, next_usable_seq)
228 .await
229 .map_err(|e| {
230 TransactionError::UnexpectedError(format!(
231 "Failed to update sequence counter: {}",
232 e
233 ))
234 })?;
235
236 info!(sequence = %next_usable_seq, "updated local sequence counter");
237 Ok(())
238 }
239
240 pub async fn reset_transaction_for_retry(
243 &self,
244 tx: TransactionRepoModel,
245 ) -> Result<TransactionRepoModel, TransactionError> {
246 info!("resetting transaction for retry through pipeline");
247
248 let update_req = tx.create_reset_update_request()?;
250
251 let reset_tx = self
253 .transaction_repository()
254 .partial_update(tx.id.clone(), update_req)
255 .await?;
256
257 info!("transaction reset successfully to pre-prepare state");
258 Ok(reset_tx)
259 }
260}
261
262#[async_trait]
263impl<R, T, J, S, P, C> Transaction for StellarRelayerTransaction<R, T, J, S, P, C>
264where
265 R: Repository<RelayerRepoModel, String> + Send + Sync,
266 T: TransactionRepository + Send + Sync,
267 J: JobProducerTrait + Send + Sync,
268 S: Signer + Send + Sync,
269 P: StellarProviderTrait + Send + Sync,
270 C: TransactionCounterTrait + Send + Sync,
271{
272 async fn prepare_transaction(
273 &self,
274 tx: TransactionRepoModel,
275 ) -> Result<TransactionRepoModel, TransactionError> {
276 self.prepare_transaction_impl(tx).await
277 }
278
279 async fn submit_transaction(
280 &self,
281 tx: TransactionRepoModel,
282 ) -> Result<TransactionRepoModel, TransactionError> {
283 self.submit_transaction_impl(tx).await
284 }
285
286 async fn resubmit_transaction(
287 &self,
288 tx: TransactionRepoModel,
289 ) -> Result<TransactionRepoModel, TransactionError> {
290 Ok(tx)
291 }
292
293 async fn handle_transaction_status(
294 &self,
295 tx: TransactionRepoModel,
296 ) -> Result<TransactionRepoModel, TransactionError> {
297 self.handle_transaction_status_impl(tx).await
298 }
299
300 async fn cancel_transaction(
301 &self,
302 tx: TransactionRepoModel,
303 ) -> Result<TransactionRepoModel, TransactionError> {
304 Ok(tx)
305 }
306
307 async fn replace_transaction(
308 &self,
309 _old_tx: TransactionRepoModel,
310 _new_tx_request: NetworkTransactionRequest,
311 ) -> Result<TransactionRepoModel, TransactionError> {
312 Ok(_old_tx)
313 }
314
315 async fn sign_transaction(
316 &self,
317 tx: TransactionRepoModel,
318 ) -> Result<TransactionRepoModel, TransactionError> {
319 Ok(tx)
320 }
321
322 async fn validate_transaction(
323 &self,
324 _tx: TransactionRepoModel,
325 ) -> Result<bool, TransactionError> {
326 Ok(true)
327 }
328}
329
330pub type DefaultStellarTransaction = StellarRelayerTransaction<
331 RelayerRepositoryStorage,
332 TransactionRepositoryStorage,
333 JobProducer,
334 StellarSigner,
335 StellarProvider,
336 TransactionCounterRepositoryStorage,
337>;
338
339#[cfg(test)]
340mod tests {
341 use super::*;
342 use crate::models::{NetworkTransactionData, RepositoryError};
343 use std::sync::Arc;
344
345 use crate::domain::transaction::stellar::test_helpers::*;
346
347 #[test]
348 fn new_returns_ok() {
349 let relayer = create_test_relayer();
350 let mocks = default_test_mocks();
351 let result = StellarRelayerTransaction::new(
352 relayer,
353 Arc::new(mocks.relayer_repo),
354 Arc::new(mocks.tx_repo),
355 Arc::new(mocks.job_producer),
356 Arc::new(mocks.signer),
357 mocks.provider,
358 Arc::new(mocks.counter),
359 );
360 assert!(result.is_ok());
361 }
362
363 #[test]
364 fn accessor_methods_return_correct_references() {
365 let relayer = create_test_relayer();
366 let mocks = default_test_mocks();
367 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
368
369 assert_eq!(handler.relayer().id, "relayer-1");
371 assert_eq!(handler.relayer().address, TEST_PK);
372
373 let _ = handler.provider();
375 let _ = handler.job_producer();
376 let _ = handler.transaction_repository();
377 let _ = handler.signer();
378 let _ = handler.transaction_counter_service();
379 }
380
381 #[tokio::test]
382 async fn send_transaction_request_job_success() {
383 let relayer = create_test_relayer();
384 let mut mocks = default_test_mocks();
385
386 mocks
387 .job_producer
388 .expect_produce_transaction_request_job()
389 .withf(|job, delay| {
390 job.transaction_id == "tx-1" && job.relayer_id == "relayer-1" && delay.is_none()
391 })
392 .times(1)
393 .returning(|_, _| Box::pin(async { Ok(()) }));
394
395 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
396 let tx = create_test_transaction(&relayer.id);
397
398 let result = handler.send_transaction_request_job(&tx, None).await;
399 assert!(result.is_ok());
400 }
401
402 #[tokio::test]
403 async fn send_transaction_request_job_with_delay() {
404 let relayer = create_test_relayer();
405 let mut mocks = default_test_mocks();
406
407 mocks
408 .job_producer
409 .expect_produce_transaction_request_job()
410 .withf(|job, delay| {
411 job.transaction_id == "tx-1"
412 && job.relayer_id == "relayer-1"
413 && delay.is_some()
414 && delay.unwrap() > chrono::Utc::now().timestamp()
415 })
416 .times(1)
417 .returning(|_, _| Box::pin(async { Ok(()) }));
418
419 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
420 let tx = create_test_transaction(&relayer.id);
421
422 let result = handler.send_transaction_request_job(&tx, Some(60)).await;
423 assert!(result.is_ok());
424 }
425
426 #[tokio::test]
427 async fn finalize_transaction_state_success() {
428 let relayer = create_test_relayer();
429 let mut mocks = default_test_mocks();
430
431 mocks
433 .tx_repo
434 .expect_partial_update()
435 .withf(|tx_id, update| {
436 tx_id == "tx-1"
437 && update.status == Some(TransactionStatus::Confirmed)
438 && update.status_reason == Some("Transaction confirmed".to_string())
439 })
440 .times(1)
441 .returning(|tx_id, update| {
442 let mut tx = create_test_transaction("relayer-1");
443 tx.id = tx_id;
444 tx.status = update.status.unwrap();
445 tx.status_reason = update.status_reason;
446 tx.confirmed_at = update.confirmed_at;
447 Ok::<_, RepositoryError>(tx)
448 });
449
450 mocks
452 .job_producer
453 .expect_produce_send_notification_job()
454 .times(1)
455 .returning(|_, _| Box::pin(async { Ok(()) }));
456
457 let handler = make_stellar_tx_handler(relayer, mocks);
458
459 let update_request = TransactionUpdateRequest {
460 status: Some(TransactionStatus::Confirmed),
461 status_reason: Some("Transaction confirmed".to_string()),
462 confirmed_at: Some("2023-01-01T00:00:00Z".to_string()),
463 ..Default::default()
464 };
465
466 let result = handler
467 .finalize_transaction_state("tx-1".to_string(), update_request)
468 .await;
469
470 assert!(result.is_ok());
471 let updated_tx = result.unwrap();
472 assert_eq!(updated_tx.status, TransactionStatus::Confirmed);
473 assert_eq!(
474 updated_tx.status_reason,
475 Some("Transaction confirmed".to_string())
476 );
477 }
478
479 #[tokio::test]
480 async fn enqueue_next_pending_transaction_with_pending_tx() {
481 let relayer = create_test_relayer();
482 let mut mocks = default_test_mocks();
483
484 let mut pending_tx = create_test_transaction(&relayer.id);
486 pending_tx.id = "pending-tx-1".to_string();
487
488 mocks
489 .tx_repo
490 .expect_find_by_status()
491 .withf(|relayer_id, statuses| {
492 relayer_id == "relayer-1" && statuses == [TransactionStatus::Pending]
493 })
494 .times(1)
495 .returning(move |_, _| {
496 let mut tx = create_test_transaction("relayer-1");
497 tx.id = "pending-tx-1".to_string();
498 Ok(vec![tx])
499 });
500
501 mocks
503 .job_producer
504 .expect_produce_transaction_request_job()
505 .withf(|job, delay| job.transaction_id == "pending-tx-1" && delay.is_none())
506 .times(1)
507 .returning(|_, _| Box::pin(async { Ok(()) }));
508
509 let handler = make_stellar_tx_handler(relayer, mocks);
510
511 let result = handler
512 .enqueue_next_pending_transaction("finished-tx")
513 .await;
514 assert!(result.is_ok());
515 }
516
517 #[tokio::test]
518 async fn enqueue_next_pending_transaction_no_pending_tx() {
519 let relayer = create_test_relayer();
520 let mut mocks = default_test_mocks();
521
522 mocks
524 .tx_repo
525 .expect_find_by_status()
526 .times(1)
527 .returning(|_, _| Ok(vec![]));
528
529 let handler = make_stellar_tx_handler(relayer, mocks);
530
531 let result = handler
532 .enqueue_next_pending_transaction("finished-tx")
533 .await;
534 assert!(result.is_ok());
535 }
536
537 #[tokio::test]
538 async fn test_sync_sequence_from_chain() {
539 let relayer = create_test_relayer();
540 let mut mocks = default_test_mocks();
541
542 mocks
544 .provider
545 .expect_get_account()
546 .withf(|addr| addr == TEST_PK)
547 .times(1)
548 .returning(|_| {
549 Box::pin(async {
550 use soroban_rs::xdr::{
551 AccountEntry, AccountEntryExt, AccountId, PublicKey, SequenceNumber,
552 String32, Thresholds, Uint256,
553 };
554 use stellar_strkey::ed25519;
555
556 let pk = ed25519::PublicKey::from_string(TEST_PK).unwrap();
558 let account_id = AccountId(PublicKey::PublicKeyTypeEd25519(Uint256(pk.0)));
559
560 Ok(AccountEntry {
561 account_id,
562 balance: 1000000,
563 seq_num: SequenceNumber(100),
564 num_sub_entries: 0,
565 inflation_dest: None,
566 flags: 0,
567 home_domain: String32::default(),
568 thresholds: Thresholds([1, 1, 1, 1]),
569 signers: Default::default(),
570 ext: AccountEntryExt::V0,
571 })
572 })
573 });
574
575 mocks
577 .counter
578 .expect_set()
579 .withf(|relayer_id, addr, seq| {
580 relayer_id == "relayer-1" && addr == TEST_PK && *seq == 101
581 })
582 .times(1)
583 .returning(|_, _, _| Box::pin(async { Ok(()) }));
584
585 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
586
587 let result = handler.sync_sequence_from_chain(&relayer.address).await;
588 assert!(result.is_ok());
589 }
590
591 #[tokio::test]
592 async fn test_sync_sequence_from_chain_provider_error() {
593 let relayer = create_test_relayer();
594 let mut mocks = default_test_mocks();
595
596 mocks
598 .provider
599 .expect_get_account()
600 .times(1)
601 .returning(|_| Box::pin(async { Err(eyre::eyre!("Account not found")) }));
602
603 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
604
605 let result = handler.sync_sequence_from_chain(&relayer.address).await;
606 assert!(result.is_err());
607 match result.unwrap_err() {
608 TransactionError::UnexpectedError(msg) => {
609 assert!(msg.contains("Failed to fetch account from chain"));
610 }
611 _ => panic!("Expected UnexpectedError"),
612 }
613 }
614
615 #[tokio::test]
616 async fn test_sync_sequence_from_chain_counter_error() {
617 let relayer = create_test_relayer();
618 let mut mocks = default_test_mocks();
619
620 mocks.provider.expect_get_account().times(1).returning(|_| {
622 Box::pin(async {
623 use soroban_rs::xdr::{
624 AccountEntry, AccountEntryExt, AccountId, PublicKey, SequenceNumber, String32,
625 Thresholds, Uint256,
626 };
627 use stellar_strkey::ed25519;
628
629 let pk = ed25519::PublicKey::from_string(TEST_PK).unwrap();
631 let account_id = AccountId(PublicKey::PublicKeyTypeEd25519(Uint256(pk.0)));
632
633 Ok(AccountEntry {
634 account_id,
635 balance: 1000000,
636 seq_num: SequenceNumber(100),
637 num_sub_entries: 0,
638 inflation_dest: None,
639 flags: 0,
640 home_domain: String32::default(),
641 thresholds: Thresholds([1, 1, 1, 1]),
642 signers: Default::default(),
643 ext: AccountEntryExt::V0,
644 })
645 })
646 });
647
648 mocks.counter.expect_set().times(1).returning(|_, _, _| {
650 Box::pin(async {
651 Err(RepositoryError::Unknown(
652 "Counter update failed".to_string(),
653 ))
654 })
655 });
656
657 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
658
659 let result = handler.sync_sequence_from_chain(&relayer.address).await;
660 assert!(result.is_err());
661 match result.unwrap_err() {
662 TransactionError::UnexpectedError(msg) => {
663 assert!(msg.contains("Failed to update sequence counter"));
664 }
665 _ => panic!("Expected UnexpectedError"),
666 }
667 }
668
669 #[test]
670 fn test_concurrent_transactions_enabled() {
671 let mut relayer = create_test_relayer();
673 if let RelayerNetworkPolicy::Stellar(ref mut policy) = relayer.policies {
674 policy.concurrent_transactions = Some(true);
675 }
676 let mocks = default_test_mocks();
677 let handler = make_stellar_tx_handler(relayer, mocks);
678 assert!(handler.concurrent_transactions_enabled());
679
680 let mut relayer = create_test_relayer();
682 if let RelayerNetworkPolicy::Stellar(ref mut policy) = relayer.policies {
683 policy.concurrent_transactions = Some(false);
684 }
685 let mocks = default_test_mocks();
686 let handler = make_stellar_tx_handler(relayer, mocks);
687 assert!(!handler.concurrent_transactions_enabled());
688
689 let relayer = create_test_relayer();
691 let mocks = default_test_mocks();
692 let handler = make_stellar_tx_handler(relayer, mocks);
693 assert_eq!(
694 handler.concurrent_transactions_enabled(),
695 DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS
696 );
697 }
698
699 #[tokio::test]
700 async fn test_enqueue_next_pending_transaction_with_concurrency_enabled() {
701 let mut relayer = create_test_relayer();
703 if let RelayerNetworkPolicy::Stellar(ref mut policy) = relayer.policies {
704 policy.concurrent_transactions = Some(true);
705 }
706 let mut mocks = default_test_mocks();
707
708 mocks.tx_repo.expect_find_by_status().times(0); mocks
713 .job_producer
714 .expect_produce_transaction_request_job()
715 .times(0); let handler = make_stellar_tx_handler(relayer, mocks);
718
719 let result = handler
720 .enqueue_next_pending_transaction("finished-tx")
721 .await;
722 assert!(result.is_ok());
723 }
724
725 #[tokio::test]
726 async fn test_reset_transaction_for_retry() {
727 let relayer = create_test_relayer();
728 let mut mocks = default_test_mocks();
729
730 let mut tx = create_test_transaction(&relayer.id);
732 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
733 data.sequence_number = Some(42);
734 data.signatures.push(dummy_signature());
735 data.hash = Some("test-hash".to_string());
736 data.signed_envelope_xdr = Some("test-xdr".to_string());
737 }
738
739 mocks
741 .tx_repo
742 .expect_partial_update()
743 .withf(|tx_id, upd| {
744 tx_id == "tx-1"
745 && upd.status == Some(TransactionStatus::Pending)
746 && upd.sent_at.is_none()
747 && upd.confirmed_at.is_none()
748 })
749 .times(1)
750 .returning(|id, upd| {
751 let mut tx = create_test_transaction("relayer-1");
752 tx.id = id;
753 tx.status = upd.status.unwrap();
754 if let Some(network_data) = upd.network_data {
755 tx.network_data = network_data;
756 }
757 Ok::<_, RepositoryError>(tx)
758 });
759
760 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
761
762 let result = handler.reset_transaction_for_retry(tx).await;
763 assert!(result.is_ok());
764
765 let reset_tx = result.unwrap();
766 assert_eq!(reset_tx.status, TransactionStatus::Pending);
767
768 if let NetworkTransactionData::Stellar(data) = &reset_tx.network_data {
770 assert!(data.sequence_number.is_none());
771 assert!(data.signatures.is_empty());
772 assert!(data.hash.is_none());
773 assert!(data.signed_envelope_xdr.is_none());
774 } else {
775 panic!("Expected Stellar transaction data");
776 }
777 }
778}