openzeppelin_relayer/domain/transaction/stellar/
submit.rs

1//! This module contains the submission-related functionality for Stellar transactions.
2//! It includes methods for submitting transactions with robust error handling,
3//! ensuring proper transaction state management on failure.
4
5use chrono::Utc;
6use tracing::{info, warn};
7
8use super::{utils::is_bad_sequence_error, StellarRelayerTransaction};
9use crate::{
10    constants::{STELLAR_BAD_SEQUENCE_RETRY_DELAY_SECONDS, STELLAR_STATUS_CHECK_JOB_DELAY_SECONDS},
11    jobs::{JobProducerTrait, TransactionStatusCheck},
12    models::{
13        NetworkTransactionData, RelayerRepoModel, TransactionError, TransactionRepoModel,
14        TransactionStatus, TransactionUpdateRequest,
15    },
16    repositories::{Repository, TransactionCounterTrait, TransactionRepository},
17    services::{Signer, StellarProviderTrait},
18    utils::calculate_scheduled_timestamp,
19};
20
21impl<R, T, J, S, P, C> StellarRelayerTransaction<R, T, J, S, P, C>
22where
23    R: Repository<RelayerRepoModel, String> + Send + Sync,
24    T: TransactionRepository + Send + Sync,
25    J: JobProducerTrait + Send + Sync,
26    S: Signer + Send + Sync,
27    P: StellarProviderTrait + Send + Sync,
28    C: TransactionCounterTrait + Send + Sync,
29{
30    /// Main submission method with robust error handling.
31    /// Unlike prepare, submit doesn't claim lanes but still needs proper error handling.
32    pub async fn submit_transaction_impl(
33        &self,
34        tx: TransactionRepoModel,
35    ) -> Result<TransactionRepoModel, TransactionError> {
36        info!("submitting stellar transaction");
37
38        // Call core submission logic with error handling
39        match self.submit_core(tx.clone()).await {
40            Ok(submitted_tx) => Ok(submitted_tx),
41            Err(error) => {
42                // Handle submission failure - mark as failed and send notification
43                self.handle_submit_failure(tx, error).await
44            }
45        }
46    }
47
48    /// Core submission logic - pure business logic without error handling concerns.
49    async fn submit_core(
50        &self,
51        tx: TransactionRepoModel,
52    ) -> Result<TransactionRepoModel, TransactionError> {
53        let stellar_data = tx.network_data.get_stellar_transaction_data()?;
54        let tx_envelope = stellar_data
55            .get_envelope_for_submission()
56            .map_err(TransactionError::from)?;
57
58        let hash = self
59            .provider()
60            .send_transaction(&tx_envelope)
61            .await
62            .map_err(TransactionError::from)?;
63
64        let tx_hash_hex = hex::encode(hash.as_slice());
65        let updated_stellar_data = stellar_data.with_hash(tx_hash_hex.clone());
66
67        let mut hashes = tx.hashes.clone();
68        hashes.push(tx_hash_hex);
69
70        let update_req = TransactionUpdateRequest {
71            status: Some(TransactionStatus::Submitted),
72            sent_at: Some(Utc::now().to_rfc3339()),
73            network_data: Some(NetworkTransactionData::Stellar(updated_stellar_data)),
74            hashes: Some(hashes),
75            ..Default::default()
76        };
77
78        let updated_tx = self
79            .transaction_repository()
80            .partial_update(tx.id.clone(), update_req)
81            .await?;
82
83        // Enqueue status check job
84        self.job_producer()
85            .produce_check_transaction_status_job(
86                TransactionStatusCheck::new(updated_tx.id.clone(), updated_tx.relayer_id.clone()),
87                Some(calculate_scheduled_timestamp(
88                    STELLAR_STATUS_CHECK_JOB_DELAY_SECONDS,
89                )),
90            )
91            .await?;
92
93        // Send notification
94        self.send_transaction_update_notification(&updated_tx)
95            .await?;
96
97        Ok(updated_tx)
98    }
99
100    /// Handles submission failures with comprehensive cleanup and error reporting.
101    /// For bad sequence errors, resets the transaction and re-enqueues it for retry.
102    async fn handle_submit_failure(
103        &self,
104        tx: TransactionRepoModel,
105        error: TransactionError,
106    ) -> Result<TransactionRepoModel, TransactionError> {
107        let error_reason = format!("Submission failed: {}", error);
108        let tx_id = tx.id.clone();
109        warn!(reason = %error_reason, "transaction submission failed");
110
111        if is_bad_sequence_error(&error_reason) {
112            // For bad sequence errors, sync sequence from chain first
113            if let Ok(stellar_data) = tx.network_data.get_stellar_transaction_data() {
114                info!("syncing sequence from chain after bad sequence error");
115                match self
116                    .sync_sequence_from_chain(&stellar_data.source_account)
117                    .await
118                {
119                    Ok(()) => {
120                        info!("successfully synced sequence from chain");
121                    }
122                    Err(sync_error) => {
123                        warn!(error = %sync_error, "failed to sync sequence from chain");
124                    }
125                }
126            }
127
128            // Reset the transaction and re-enqueue it
129            info!("bad sequence error detected, resetting and re-enqueueing");
130
131            // Reset the transaction to pending state
132            match self.reset_transaction_for_retry(tx.clone()).await {
133                Ok(reset_tx) => {
134                    // Re-enqueue the transaction to go through the pipeline again
135                    if let Err(e) = self
136                        .send_transaction_request_job(
137                            &reset_tx,
138                            Some(calculate_scheduled_timestamp(
139                                STELLAR_BAD_SEQUENCE_RETRY_DELAY_SECONDS,
140                            )),
141                        )
142                        .await
143                    {
144                        warn!(error = %e, "failed to re-enqueue transaction after reset");
145                    } else {
146                        info!("transaction reset and re-enqueued for retry through pipeline");
147                    }
148
149                    // Return success since we're handling the retry
150                    return Ok(reset_tx);
151                }
152                Err(reset_error) => {
153                    warn!(error = %reset_error, "failed to reset transaction for retry");
154                    // Fall through to normal failure handling
155                }
156            }
157        }
158
159        // For non-bad-sequence errors or if reset failed, mark as failed
160        // Step 1: Mark transaction as Failed with detailed reason
161        let update_request = TransactionUpdateRequest {
162            status: Some(TransactionStatus::Failed),
163            status_reason: Some(error_reason.clone()),
164            ..Default::default()
165        };
166        let _failed_tx = match self
167            .finalize_transaction_state(tx_id.clone(), update_request)
168            .await
169        {
170            Ok(updated_tx) => updated_tx,
171            Err(finalize_error) => {
172                warn!(error = %finalize_error, "failed to mark transaction as failed, continuing with lane cleanup");
173                tx
174            }
175        };
176
177        // Attempt to enqueue next pending transaction or release lane
178        if let Err(enqueue_error) = self.enqueue_next_pending_transaction(&tx_id).await {
179            warn!(error = %enqueue_error, "failed to enqueue next pending transaction after submission failure");
180        }
181
182        info!(error = %error_reason, "transaction submission failure handled");
183
184        Err(error)
185    }
186
    /// Resubmits a transaction.
    ///
    /// Delegates directly to `submit_transaction_impl`, so a resubmission goes through
    /// exactly the same submission and failure-handling path as a first submission and
    /// returns whatever that call returns.
    pub async fn resubmit_transaction_impl(
        &self,
        tx: TransactionRepoModel,
    ) -> Result<TransactionRepoModel, TransactionError> {
        self.submit_transaction_impl(tx).await
    }
194}
195
196#[cfg(test)]
197mod tests {
198    use super::*;
199    use soroban_rs::xdr::{Hash, WriteXdr};
200
201    use crate::domain::transaction::stellar::test_helpers::*;
202
203    mod submit_transaction_tests {
204        use crate::models::RepositoryError;
205
206        use super::*;
207
208        #[tokio::test]
209        async fn submit_transaction_happy_path() {
210            let relayer = create_test_relayer();
211            let mut mocks = default_test_mocks();
212
213            // provider gives a hash
214            mocks
215                .provider
216                .expect_send_transaction()
217                .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));
218
219            // expect partial update to Submitted
220            mocks
221                .tx_repo
222                .expect_partial_update()
223                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
224                .returning(|id, upd| {
225                    let mut tx = create_test_transaction("relayer-1");
226                    tx.id = id;
227                    tx.status = upd.status.unwrap();
228                    Ok::<_, RepositoryError>(tx)
229                });
230
231            // enqueue status-check & notification
232            mocks
233                .job_producer
234                .expect_produce_check_transaction_status_job()
235                .times(1)
236                .returning(|_, _| Box::pin(async { Ok(()) }));
237            mocks
238                .job_producer
239                .expect_produce_send_notification_job()
240                .times(1)
241                .returning(|_, _| Box::pin(async { Ok(()) }));
242
243            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
244
245            let mut tx = create_test_transaction(&relayer.id);
246            if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
247                d.signatures.push(dummy_signature());
248            }
249
250            let res = handler.submit_transaction_impl(tx).await.unwrap();
251            assert_eq!(res.status, TransactionStatus::Submitted);
252        }
253
254        #[tokio::test]
255        async fn submit_transaction_provider_error_marks_failed() {
256            let relayer = create_test_relayer();
257            let mut mocks = default_test_mocks();
258
259            // Provider fails with non-bad-sequence error
260            mocks
261                .provider
262                .expect_send_transaction()
263                .returning(|_| Box::pin(async { Err(eyre::eyre!("Network error")) }));
264
265            // Mock finalize_transaction_state for failure handling
266            mocks
267                .tx_repo
268                .expect_partial_update()
269                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
270                .returning(|id, upd| {
271                    let mut tx = create_test_transaction("relayer-1");
272                    tx.id = id;
273                    tx.status = upd.status.unwrap();
274                    Ok::<_, RepositoryError>(tx)
275                });
276
277            // Mock notification for failed transaction
278            mocks
279                .job_producer
280                .expect_produce_send_notification_job()
281                .times(1)
282                .returning(|_, _| Box::pin(async { Ok(()) }));
283
284            // Mock find_by_status for enqueue_next_pending_transaction
285            mocks
286                .tx_repo
287                .expect_find_by_status()
288                .returning(|_, _| Ok(vec![])); // No pending transactions
289
290            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
291            let mut tx = create_test_transaction(&relayer.id);
292            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
293                data.signatures.push(dummy_signature());
294                data.sequence_number = Some(42); // Set sequence number
295            }
296
297            let res = handler.submit_transaction_impl(tx).await;
298
299            // Should return error but transaction should be marked as failed
300            assert!(res.is_err());
301            matches!(res.unwrap_err(), TransactionError::UnexpectedError(_));
302        }
303
304        #[tokio::test]
305        async fn submit_transaction_repository_error_marks_failed() {
306            let relayer = create_test_relayer();
307            let mut mocks = default_test_mocks();
308
309            // Provider succeeds
310            mocks
311                .provider
312                .expect_send_transaction()
313                .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));
314
315            // Repository fails on first update (submission)
316            mocks
317                .tx_repo
318                .expect_partial_update()
319                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
320                .returning(|_, _| Err(RepositoryError::Unknown("Database error".to_string())));
321
322            // Mock finalize_transaction_state for failure handling
323            mocks
324                .tx_repo
325                .expect_partial_update()
326                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
327                .returning(|id, upd| {
328                    let mut tx = create_test_transaction("relayer-1");
329                    tx.id = id;
330                    tx.status = upd.status.unwrap();
331                    Ok::<_, RepositoryError>(tx)
332                });
333
334            // Mock notification for failed transaction
335            mocks
336                .job_producer
337                .expect_produce_send_notification_job()
338                .times(1)
339                .returning(|_, _| Box::pin(async { Ok(()) }));
340
341            // Mock find_by_status for enqueue_next_pending_transaction
342            mocks
343                .tx_repo
344                .expect_find_by_status()
345                .returning(|_, _| Ok(vec![])); // No pending transactions
346
347            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
348            let mut tx = create_test_transaction(&relayer.id);
349            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
350                data.signatures.push(dummy_signature());
351                data.sequence_number = Some(42); // Set sequence number
352            }
353
354            let res = handler.submit_transaction_impl(tx).await;
355
356            // Should return error but transaction should be marked as failed
357            assert!(res.is_err());
358        }
359
360        #[tokio::test]
361        async fn submit_transaction_uses_signed_envelope_xdr() {
362            let relayer = create_test_relayer();
363            let mut mocks = default_test_mocks();
364
365            // Create a transaction with signed_envelope_xdr set
366            let mut tx = create_test_transaction(&relayer.id);
367            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
368                data.signatures.push(dummy_signature());
369                // Build and store the signed envelope XDR
370                let envelope = data.get_envelope_for_submission().unwrap();
371                let xdr = envelope
372                    .to_xdr_base64(soroban_rs::xdr::Limits::none())
373                    .unwrap();
374                data.signed_envelope_xdr = Some(xdr);
375            }
376
377            // Provider should receive the envelope decoded from signed_envelope_xdr
378            mocks
379                .provider
380                .expect_send_transaction()
381                .returning(|_| Box::pin(async { Ok(Hash([2u8; 32])) }));
382
383            // Update to Submitted
384            mocks
385                .tx_repo
386                .expect_partial_update()
387                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
388                .returning(|id, upd| {
389                    let mut tx = create_test_transaction("relayer-1");
390                    tx.id = id;
391                    tx.status = upd.status.unwrap();
392                    Ok::<_, RepositoryError>(tx)
393                });
394
395            // Job and notification expectations
396            mocks
397                .job_producer
398                .expect_produce_check_transaction_status_job()
399                .times(1)
400                .returning(|_, _| Box::pin(async { Ok(()) }));
401            mocks
402                .job_producer
403                .expect_produce_send_notification_job()
404                .times(1)
405                .returning(|_, _| Box::pin(async { Ok(()) }));
406
407            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
408            let res = handler.submit_transaction_impl(tx).await.unwrap();
409
410            assert_eq!(res.status, TransactionStatus::Submitted);
411        }
412
413        #[tokio::test]
414        async fn resubmit_transaction_delegates_to_submit() {
415            let relayer = create_test_relayer();
416            let mut mocks = default_test_mocks();
417
418            // provider gives a hash
419            mocks
420                .provider
421                .expect_send_transaction()
422                .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));
423
424            // expect partial update to Submitted
425            mocks
426                .tx_repo
427                .expect_partial_update()
428                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
429                .returning(|id, upd| {
430                    let mut tx = create_test_transaction("relayer-1");
431                    tx.id = id;
432                    tx.status = upd.status.unwrap();
433                    Ok::<_, RepositoryError>(tx)
434                });
435
436            // enqueue status-check & notification
437            mocks
438                .job_producer
439                .expect_produce_check_transaction_status_job()
440                .times(1)
441                .returning(|_, _| Box::pin(async { Ok(()) }));
442            mocks
443                .job_producer
444                .expect_produce_send_notification_job()
445                .times(1)
446                .returning(|_, _| Box::pin(async { Ok(()) }));
447
448            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
449
450            let mut tx = create_test_transaction(&relayer.id);
451            if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
452                d.signatures.push(dummy_signature());
453            }
454
455            let res = handler.resubmit_transaction_impl(tx).await.unwrap();
456            assert_eq!(res.status, TransactionStatus::Submitted);
457        }
458
459        #[tokio::test]
460        async fn submit_transaction_failure_enqueues_next_transaction() {
461            let relayer = create_test_relayer();
462            let mut mocks = default_test_mocks();
463
464            // Provider fails with non-bad-sequence error
465            mocks
466                .provider
467                .expect_send_transaction()
468                .returning(|_| Box::pin(async { Err(eyre::eyre!("Network error")) }));
469
470            // No sync expected for non-bad-sequence errors
471
472            // Mock finalize_transaction_state for failure handling
473            mocks
474                .tx_repo
475                .expect_partial_update()
476                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
477                .returning(|id, upd| {
478                    let mut tx = create_test_transaction("relayer-1");
479                    tx.id = id;
480                    tx.status = upd.status.unwrap();
481                    Ok::<_, RepositoryError>(tx)
482                });
483
484            // Mock notification for failed transaction
485            mocks
486                .job_producer
487                .expect_produce_send_notification_job()
488                .times(1)
489                .returning(|_, _| Box::pin(async { Ok(()) }));
490
491            // Mock find_by_status to return a pending transaction
492            let mut pending_tx = create_test_transaction(&relayer.id);
493            pending_tx.id = "next-pending-tx".to_string();
494            pending_tx.status = TransactionStatus::Pending;
495            let captured_pending_tx = pending_tx.clone();
496            mocks
497                .tx_repo
498                .expect_find_by_status()
499                .with(
500                    mockall::predicate::eq(relayer.id.clone()),
501                    mockall::predicate::eq(vec![TransactionStatus::Pending]),
502                )
503                .times(1)
504                .returning(move |_, _| Ok(vec![captured_pending_tx.clone()]));
505
506            // Mock produce_transaction_request_job for the next pending transaction
507            mocks
508                .job_producer
509                .expect_produce_transaction_request_job()
510                .withf(move |job, _delay| job.transaction_id == "next-pending-tx")
511                .times(1)
512                .returning(|_, _| Box::pin(async { Ok(()) }));
513
514            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
515            let mut tx = create_test_transaction(&relayer.id);
516            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
517                data.signatures.push(dummy_signature());
518                data.sequence_number = Some(42); // Set sequence number
519            }
520
521            let res = handler.submit_transaction_impl(tx).await;
522
523            // Should return error but next transaction should be enqueued
524            assert!(res.is_err());
525            matches!(res.unwrap_err(), TransactionError::UnexpectedError(_));
526        }
527
528        #[tokio::test]
529        async fn test_submit_bad_sequence_resets_and_retries() {
530            let relayer = create_test_relayer();
531            let mut mocks = default_test_mocks();
532
533            // Mock provider to return bad sequence error
534            mocks.provider.expect_send_transaction().returning(|_| {
535                Box::pin(async { Err(eyre::eyre!("transaction submission failed: TxBadSeq")) })
536            });
537
538            // Mock get_account for sync_sequence_from_chain
539            mocks.provider.expect_get_account().times(1).returning(|_| {
540                Box::pin(async {
541                    use soroban_rs::xdr::{
542                        AccountEntry, AccountEntryExt, AccountId, PublicKey, SequenceNumber,
543                        String32, Thresholds, Uint256,
544                    };
545                    use stellar_strkey::ed25519;
546
547                    let pk = ed25519::PublicKey::from_string(TEST_PK).unwrap();
548                    let account_id = AccountId(PublicKey::PublicKeyTypeEd25519(Uint256(pk.0)));
549
550                    Ok(AccountEntry {
551                        account_id,
552                        balance: 1000000,
553                        seq_num: SequenceNumber(100),
554                        num_sub_entries: 0,
555                        inflation_dest: None,
556                        flags: 0,
557                        home_domain: String32::default(),
558                        thresholds: Thresholds([1, 1, 1, 1]),
559                        signers: Default::default(),
560                        ext: AccountEntryExt::V0,
561                    })
562                })
563            });
564
565            // Mock counter set for sync_sequence_from_chain
566            mocks
567                .counter
568                .expect_set()
569                .times(1)
570                .returning(|_, _, _| Box::pin(async { Ok(()) }));
571
572            // Mock partial_update for reset_transaction_for_retry - should reset to Pending
573            mocks
574                .tx_repo
575                .expect_partial_update()
576                .withf(|_, upd| upd.status == Some(TransactionStatus::Pending))
577                .times(1)
578                .returning(|id, upd| {
579                    let mut tx = create_test_transaction("relayer-1");
580                    tx.id = id;
581                    tx.status = upd.status.unwrap();
582                    if let Some(network_data) = upd.network_data {
583                        tx.network_data = network_data;
584                    }
585                    Ok::<_, RepositoryError>(tx)
586                });
587
588            // Mock produce_transaction_request_job for re-enqueue
589            mocks
590                .job_producer
591                .expect_produce_transaction_request_job()
592                .times(1)
593                .returning(|_, _| Box::pin(async { Ok(()) }));
594
595            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
596            let mut tx = create_test_transaction(&relayer.id);
597            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
598                data.signatures.push(dummy_signature());
599                data.sequence_number = Some(42);
600            }
601
602            let result = handler.submit_transaction_impl(tx).await;
603
604            // Should return Ok since we're handling the retry
605            assert!(result.is_ok());
606            let reset_tx = result.unwrap();
607            assert_eq!(reset_tx.status, TransactionStatus::Pending);
608
609            // Verify stellar data was reset
610            if let NetworkTransactionData::Stellar(data) = &reset_tx.network_data {
611                assert!(data.sequence_number.is_none());
612                assert!(data.signatures.is_empty());
613                assert!(data.hash.is_none());
614                assert!(data.signed_envelope_xdr.is_none());
615            } else {
616                panic!("Expected Stellar transaction data");
617            }
618        }
619    }
620}