// openzeppelin_relayer/domain/transaction/stellar/submit.rs
use chrono::Utc;
6use tracing::{info, warn};
7
8use super::{utils::is_bad_sequence_error, StellarRelayerTransaction};
9use crate::{
10 constants::{STELLAR_BAD_SEQUENCE_RETRY_DELAY_SECONDS, STELLAR_STATUS_CHECK_JOB_DELAY_SECONDS},
11 jobs::{JobProducerTrait, TransactionStatusCheck},
12 models::{
13 NetworkTransactionData, RelayerRepoModel, TransactionError, TransactionRepoModel,
14 TransactionStatus, TransactionUpdateRequest,
15 },
16 repositories::{Repository, TransactionCounterTrait, TransactionRepository},
17 services::{Signer, StellarProviderTrait},
18 utils::calculate_scheduled_timestamp,
19};
20
21impl<R, T, J, S, P, C> StellarRelayerTransaction<R, T, J, S, P, C>
22where
23 R: Repository<RelayerRepoModel, String> + Send + Sync,
24 T: TransactionRepository + Send + Sync,
25 J: JobProducerTrait + Send + Sync,
26 S: Signer + Send + Sync,
27 P: StellarProviderTrait + Send + Sync,
28 C: TransactionCounterTrait + Send + Sync,
29{
30 pub async fn submit_transaction_impl(
33 &self,
34 tx: TransactionRepoModel,
35 ) -> Result<TransactionRepoModel, TransactionError> {
36 info!("submitting stellar transaction");
37
38 match self.submit_core(tx.clone()).await {
40 Ok(submitted_tx) => Ok(submitted_tx),
41 Err(error) => {
42 self.handle_submit_failure(tx, error).await
44 }
45 }
46 }
47
48 async fn submit_core(
50 &self,
51 tx: TransactionRepoModel,
52 ) -> Result<TransactionRepoModel, TransactionError> {
53 let stellar_data = tx.network_data.get_stellar_transaction_data()?;
54 let tx_envelope = stellar_data
55 .get_envelope_for_submission()
56 .map_err(TransactionError::from)?;
57
58 let hash = self
59 .provider()
60 .send_transaction(&tx_envelope)
61 .await
62 .map_err(TransactionError::from)?;
63
64 let tx_hash_hex = hex::encode(hash.as_slice());
65 let updated_stellar_data = stellar_data.with_hash(tx_hash_hex.clone());
66
67 let mut hashes = tx.hashes.clone();
68 hashes.push(tx_hash_hex);
69
70 let update_req = TransactionUpdateRequest {
71 status: Some(TransactionStatus::Submitted),
72 sent_at: Some(Utc::now().to_rfc3339()),
73 network_data: Some(NetworkTransactionData::Stellar(updated_stellar_data)),
74 hashes: Some(hashes),
75 ..Default::default()
76 };
77
78 let updated_tx = self
79 .transaction_repository()
80 .partial_update(tx.id.clone(), update_req)
81 .await?;
82
83 self.job_producer()
85 .produce_check_transaction_status_job(
86 TransactionStatusCheck::new(updated_tx.id.clone(), updated_tx.relayer_id.clone()),
87 Some(calculate_scheduled_timestamp(
88 STELLAR_STATUS_CHECK_JOB_DELAY_SECONDS,
89 )),
90 )
91 .await?;
92
93 self.send_transaction_update_notification(&updated_tx)
95 .await?;
96
97 Ok(updated_tx)
98 }
99
100 async fn handle_submit_failure(
103 &self,
104 tx: TransactionRepoModel,
105 error: TransactionError,
106 ) -> Result<TransactionRepoModel, TransactionError> {
107 let error_reason = format!("Submission failed: {}", error);
108 let tx_id = tx.id.clone();
109 warn!(reason = %error_reason, "transaction submission failed");
110
111 if is_bad_sequence_error(&error_reason) {
112 if let Ok(stellar_data) = tx.network_data.get_stellar_transaction_data() {
114 info!("syncing sequence from chain after bad sequence error");
115 match self
116 .sync_sequence_from_chain(&stellar_data.source_account)
117 .await
118 {
119 Ok(()) => {
120 info!("successfully synced sequence from chain");
121 }
122 Err(sync_error) => {
123 warn!(error = %sync_error, "failed to sync sequence from chain");
124 }
125 }
126 }
127
128 info!("bad sequence error detected, resetting and re-enqueueing");
130
131 match self.reset_transaction_for_retry(tx.clone()).await {
133 Ok(reset_tx) => {
134 if let Err(e) = self
136 .send_transaction_request_job(
137 &reset_tx,
138 Some(calculate_scheduled_timestamp(
139 STELLAR_BAD_SEQUENCE_RETRY_DELAY_SECONDS,
140 )),
141 )
142 .await
143 {
144 warn!(error = %e, "failed to re-enqueue transaction after reset");
145 } else {
146 info!("transaction reset and re-enqueued for retry through pipeline");
147 }
148
149 return Ok(reset_tx);
151 }
152 Err(reset_error) => {
153 warn!(error = %reset_error, "failed to reset transaction for retry");
154 }
156 }
157 }
158
159 let update_request = TransactionUpdateRequest {
162 status: Some(TransactionStatus::Failed),
163 status_reason: Some(error_reason.clone()),
164 ..Default::default()
165 };
166 let _failed_tx = match self
167 .finalize_transaction_state(tx_id.clone(), update_request)
168 .await
169 {
170 Ok(updated_tx) => updated_tx,
171 Err(finalize_error) => {
172 warn!(error = %finalize_error, "failed to mark transaction as failed, continuing with lane cleanup");
173 tx
174 }
175 };
176
177 if let Err(enqueue_error) = self.enqueue_next_pending_transaction(&tx_id).await {
179 warn!(error = %enqueue_error, "failed to enqueue next pending transaction after submission failure");
180 }
181
182 info!(error = %error_reason, "transaction submission failure handled");
183
184 Err(error)
185 }
186
187 pub async fn resubmit_transaction_impl(
189 &self,
190 tx: TransactionRepoModel,
191 ) -> Result<TransactionRepoModel, TransactionError> {
192 self.submit_transaction_impl(tx).await
193 }
194}
195
#[cfg(test)]
mod tests {
    use super::*;
    use soroban_rs::xdr::{Hash, WriteXdr};

    use crate::domain::transaction::stellar::test_helpers::*;

    mod submit_transaction_tests {
        use crate::models::RepositoryError;

        use super::*;

        /// A successful provider call stores the transaction as `Submitted`,
        /// schedules one status check and sends one notification.
        #[tokio::test]
        async fn submit_transaction_happy_path() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            // Provider accepts the transaction and returns a hash.
            mocks
                .provider
                .expect_send_transaction()
                .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));

            // The repository echoes the requested status back.
            mocks
                .tx_repo
                .expect_partial_update()
                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
                .returning(|id, upd| {
                    let mut tx = create_test_transaction("relayer-1");
                    tx.id = id;
                    tx.status = upd.status.unwrap();
                    Ok::<_, RepositoryError>(tx)
                });

            mocks
                .job_producer
                .expect_produce_check_transaction_status_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));
            mocks
                .job_producer
                .expect_produce_send_notification_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            let handler = make_stellar_tx_handler(relayer.clone(), mocks);

            let mut tx = create_test_transaction(&relayer.id);
            if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
                d.signatures.push(dummy_signature());
            }

            let res = handler.submit_transaction_impl(tx).await.unwrap();
            assert_eq!(res.status, TransactionStatus::Submitted);
        }

        /// A provider error marks the transaction `Failed` and the original
        /// error is returned to the caller.
        #[tokio::test]
        async fn submit_transaction_provider_error_marks_failed() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            // Provider rejects the submission.
            mocks
                .provider
                .expect_send_transaction()
                .returning(|_| Box::pin(async { Err(eyre::eyre!("Network error")) }));

            // The failure path updates the transaction to `Failed`.
            mocks
                .tx_repo
                .expect_partial_update()
                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
                .returning(|id, upd| {
                    let mut tx = create_test_transaction("relayer-1");
                    tx.id = id;
                    tx.status = upd.status.unwrap();
                    Ok::<_, RepositoryError>(tx)
                });

            mocks
                .job_producer
                .expect_produce_send_notification_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            // No pending transactions are queued behind the failed one.
            mocks
                .tx_repo
                .expect_find_by_status()
                .returning(|_, _| Ok(vec![]));

            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
            let mut tx = create_test_transaction(&relayer.id);
            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
                data.signatures.push(dummy_signature());
                data.sequence_number = Some(42);
            }

            let res = handler.submit_transaction_impl(tx).await;

            assert!(res.is_err());
            // The `matches!` result must be asserted; as a bare statement it
            // is evaluated and silently discarded.
            assert!(matches!(
                res.unwrap_err(),
                TransactionError::UnexpectedError(_)
            ));
        }

        /// A repository error while storing the `Submitted` state is handled
        /// like any other submission failure: the transaction is marked
        /// `Failed` and an error is returned.
        #[tokio::test]
        async fn submit_transaction_repository_error_marks_failed() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            // Provider accepts the transaction.
            mocks
                .provider
                .expect_send_transaction()
                .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));

            // Storing the `Submitted` state fails.
            mocks
                .tx_repo
                .expect_partial_update()
                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
                .returning(|_, _| Err(RepositoryError::Unknown("Database error".to_string())));

            // The follow-up update to `Failed` succeeds.
            mocks
                .tx_repo
                .expect_partial_update()
                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
                .returning(|id, upd| {
                    let mut tx = create_test_transaction("relayer-1");
                    tx.id = id;
                    tx.status = upd.status.unwrap();
                    Ok::<_, RepositoryError>(tx)
                });

            mocks
                .job_producer
                .expect_produce_send_notification_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            // No pending transactions are queued behind the failed one.
            mocks
                .tx_repo
                .expect_find_by_status()
                .returning(|_, _| Ok(vec![]));

            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
            let mut tx = create_test_transaction(&relayer.id);
            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
                data.signatures.push(dummy_signature());
                data.sequence_number = Some(42);
            }

            let res = handler.submit_transaction_impl(tx).await;

            assert!(res.is_err());
        }

        /// Submission succeeds for a transaction whose `signed_envelope_xdr`
        /// field has been populated from its own submission envelope.
        #[tokio::test]
        async fn submit_transaction_uses_signed_envelope_xdr() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            // Build a transaction that carries a pre-encoded signed envelope.
            let mut tx = create_test_transaction(&relayer.id);
            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
                data.signatures.push(dummy_signature());
                let envelope = data.get_envelope_for_submission().unwrap();
                let xdr = envelope
                    .to_xdr_base64(soroban_rs::xdr::Limits::none())
                    .unwrap();
                data.signed_envelope_xdr = Some(xdr);
            }

            mocks
                .provider
                .expect_send_transaction()
                .returning(|_| Box::pin(async { Ok(Hash([2u8; 32])) }));

            mocks
                .tx_repo
                .expect_partial_update()
                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
                .returning(|id, upd| {
                    let mut tx = create_test_transaction("relayer-1");
                    tx.id = id;
                    tx.status = upd.status.unwrap();
                    Ok::<_, RepositoryError>(tx)
                });

            mocks
                .job_producer
                .expect_produce_check_transaction_status_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));
            mocks
                .job_producer
                .expect_produce_send_notification_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
            let res = handler.submit_transaction_impl(tx).await.unwrap();

            assert_eq!(res.status, TransactionStatus::Submitted);
        }

        /// `resubmit_transaction_impl` behaves like `submit_transaction_impl`
        /// on the happy path.
        #[tokio::test]
        async fn resubmit_transaction_delegates_to_submit() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            mocks
                .provider
                .expect_send_transaction()
                .returning(|_| Box::pin(async { Ok(Hash([1u8; 32])) }));

            mocks
                .tx_repo
                .expect_partial_update()
                .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
                .returning(|id, upd| {
                    let mut tx = create_test_transaction("relayer-1");
                    tx.id = id;
                    tx.status = upd.status.unwrap();
                    Ok::<_, RepositoryError>(tx)
                });

            mocks
                .job_producer
                .expect_produce_check_transaction_status_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));
            mocks
                .job_producer
                .expect_produce_send_notification_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            let handler = make_stellar_tx_handler(relayer.clone(), mocks);

            let mut tx = create_test_transaction(&relayer.id);
            if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
                d.signatures.push(dummy_signature());
            }

            let res = handler.resubmit_transaction_impl(tx).await.unwrap();
            assert_eq!(res.status, TransactionStatus::Submitted);
        }

        /// After a failed submission the next pending transaction of the same
        /// relayer is handed to the job producer.
        #[tokio::test]
        async fn submit_transaction_failure_enqueues_next_transaction() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            // Provider rejects the submission.
            mocks
                .provider
                .expect_send_transaction()
                .returning(|_| Box::pin(async { Err(eyre::eyre!("Network error")) }));

            // The failure path updates the transaction to `Failed`.
            mocks
                .tx_repo
                .expect_partial_update()
                .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
                .returning(|id, upd| {
                    let mut tx = create_test_transaction("relayer-1");
                    tx.id = id;
                    tx.status = upd.status.unwrap();
                    Ok::<_, RepositoryError>(tx)
                });

            mocks
                .job_producer
                .expect_produce_send_notification_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            // One pending transaction is waiting behind the failed one.
            let mut pending_tx = create_test_transaction(&relayer.id);
            pending_tx.id = "next-pending-tx".to_string();
            pending_tx.status = TransactionStatus::Pending;
            let captured_pending_tx = pending_tx.clone();
            mocks
                .tx_repo
                .expect_find_by_status()
                .with(
                    mockall::predicate::eq(relayer.id.clone()),
                    mockall::predicate::eq(vec![TransactionStatus::Pending]),
                )
                .times(1)
                .returning(move |_, _| Ok(vec![captured_pending_tx.clone()]));

            // Exactly that pending transaction must be enqueued.
            mocks
                .job_producer
                .expect_produce_transaction_request_job()
                .withf(move |job, _delay| job.transaction_id == "next-pending-tx")
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
            let mut tx = create_test_transaction(&relayer.id);
            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
                data.signatures.push(dummy_signature());
                data.sequence_number = Some(42);
            }

            let res = handler.submit_transaction_impl(tx).await;

            assert!(res.is_err());
            // The `matches!` result must be asserted; as a bare statement it
            // is evaluated and silently discarded.
            assert!(matches!(
                res.unwrap_err(),
                TransactionError::UnexpectedError(_)
            ));
        }

        /// A bad-sequence error resets the transaction to `Pending`, clears
        /// its sequence/signature/hash/envelope data and re-enqueues it, and
        /// the call returns `Ok`.
        #[tokio::test]
        async fn test_submit_bad_sequence_resets_and_retries() {
            let relayer = create_test_relayer();
            let mut mocks = default_test_mocks();

            // Provider fails with an error text containing `TxBadSeq`.
            mocks.provider.expect_send_transaction().returning(|_| {
                Box::pin(async { Err(eyre::eyre!("transaction submission failed: TxBadSeq")) })
            });

            // The account is fetched exactly once (presumably by the
            // sequence sync from chain).
            mocks.provider.expect_get_account().times(1).returning(|_| {
                Box::pin(async {
                    use soroban_rs::xdr::{
                        AccountEntry, AccountEntryExt, AccountId, PublicKey, SequenceNumber,
                        String32, Thresholds, Uint256,
                    };
                    use stellar_strkey::ed25519;

                    let pk = ed25519::PublicKey::from_string(TEST_PK).unwrap();
                    let account_id = AccountId(PublicKey::PublicKeyTypeEd25519(Uint256(pk.0)));

                    Ok(AccountEntry {
                        account_id,
                        balance: 1000000,
                        seq_num: SequenceNumber(100),
                        num_sub_entries: 0,
                        inflation_dest: None,
                        flags: 0,
                        home_domain: String32::default(),
                        thresholds: Thresholds([1, 1, 1, 1]),
                        signers: Default::default(),
                        ext: AccountEntryExt::V0,
                    })
                })
            });

            // The counter is written exactly once.
            mocks
                .counter
                .expect_set()
                .times(1)
                .returning(|_, _, _| Box::pin(async { Ok(()) }));

            // The reset stores the transaction as `Pending`; the mock applies
            // the requested network data so the assertions below can inspect it.
            mocks
                .tx_repo
                .expect_partial_update()
                .withf(|_, upd| upd.status == Some(TransactionStatus::Pending))
                .times(1)
                .returning(|id, upd| {
                    let mut tx = create_test_transaction("relayer-1");
                    tx.id = id;
                    tx.status = upd.status.unwrap();
                    if let Some(network_data) = upd.network_data {
                        tx.network_data = network_data;
                    }
                    Ok::<_, RepositoryError>(tx)
                });

            // The reset transaction is re-enqueued exactly once.
            mocks
                .job_producer
                .expect_produce_transaction_request_job()
                .times(1)
                .returning(|_, _| Box::pin(async { Ok(()) }));

            let handler = make_stellar_tx_handler(relayer.clone(), mocks);
            let mut tx = create_test_transaction(&relayer.id);
            if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
                data.signatures.push(dummy_signature());
                data.sequence_number = Some(42);
            }

            let result = handler.submit_transaction_impl(tx).await;

            assert!(result.is_ok());
            let reset_tx = result.unwrap();
            assert_eq!(reset_tx.status, TransactionStatus::Pending);

            if let NetworkTransactionData::Stellar(data) = &reset_tx.network_data {
                assert!(data.sequence_number.is_none());
                assert!(data.signatures.is_empty());
                assert!(data.hash.is_none());
                assert!(data.signed_envelope_xdr.is_none());
            } else {
                panic!("Expected Stellar transaction data");
            }
        }
    }
}