1use crate::{
24 constants::STELLAR_SMALLEST_UNIT_NAME,
25 domain::{
26 transaction::stellar::fetch_next_sequence_from_chain, BalanceResponse, SignDataRequest,
27 SignDataResponse, SignTransactionExternalResponse, SignTransactionExternalResponseStellar,
28 SignTransactionRequest, SignTypedDataRequest,
29 },
30 jobs::{JobProducerTrait, RelayerHealthCheck, TransactionRequest},
31 models::{
32 produce_relayer_disabled_payload, DeletePendingTransactionsResponse, DisabledReason,
33 HealthCheckFailure, JsonRpcRequest, JsonRpcResponse, NetworkRepoModel, NetworkRpcRequest,
34 NetworkRpcResult, NetworkTransactionRequest, NetworkType, RelayerRepoModel, RelayerStatus,
35 RepositoryError, StellarNetwork, StellarRpcResult, TransactionRepoModel, TransactionStatus,
36 },
37 repositories::{NetworkRepository, RelayerRepository, Repository, TransactionRepository},
38 services::{
39 StellarProvider, StellarProviderTrait, StellarSignTrait, StellarSigner,
40 TransactionCounterService, TransactionCounterServiceTrait,
41 },
42 utils::calculate_scheduled_timestamp,
43};
44use async_trait::async_trait;
45use eyre::Result;
46use std::sync::Arc;
47use tracing::{debug, info, warn};
48
49use crate::domain::relayer::{Relayer, RelayerError};
50
51pub struct StellarRelayerDependencies<RR, NR, TR, J, TCS>
53where
54 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
55 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
56 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
57 J: JobProducerTrait + Send + Sync + 'static,
58 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
59{
60 pub relayer_repository: Arc<RR>,
61 pub network_repository: Arc<NR>,
62 pub transaction_repository: Arc<TR>,
63 pub transaction_counter_service: Arc<TCS>,
64 pub job_producer: Arc<J>,
65}
66
67impl<RR, NR, TR, J, TCS> StellarRelayerDependencies<RR, NR, TR, J, TCS>
68where
69 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
70 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
71 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
72 J: JobProducerTrait + Send + Sync,
73 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
74{
75 pub fn new(
89 relayer_repository: Arc<RR>,
90 network_repository: Arc<NR>,
91 transaction_repository: Arc<TR>,
92 transaction_counter_service: Arc<TCS>,
93 job_producer: Arc<J>,
94 ) -> Self {
95 Self {
96 relayer_repository,
97 network_repository,
98 transaction_repository,
99 transaction_counter_service,
100 job_producer,
101 }
102 }
103}
104
105#[allow(dead_code)]
106pub struct StellarRelayer<P, RR, NR, TR, J, TCS, S>
107where
108 P: StellarProviderTrait + Send + Sync,
109 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
110 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
111 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
112 J: JobProducerTrait + Send + Sync + 'static,
113 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
114 S: StellarSignTrait + Send + Sync + 'static,
115{
116 relayer: RelayerRepoModel,
117 signer: S,
118 network: StellarNetwork,
119 provider: P,
120 relayer_repository: Arc<RR>,
121 network_repository: Arc<NR>,
122 transaction_repository: Arc<TR>,
123 transaction_counter_service: Arc<TCS>,
124 job_producer: Arc<J>,
125}
126
127pub type DefaultStellarRelayer<J, TR, NR, RR, TCR> =
128 StellarRelayer<StellarProvider, RR, NR, TR, J, TransactionCounterService<TCR>, StellarSigner>;
129
130impl<P, RR, NR, TR, J, TCS, S> StellarRelayer<P, RR, NR, TR, J, TCS, S>
131where
132 P: StellarProviderTrait + Send + Sync,
133 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
134 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
135 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
136 J: JobProducerTrait + Send + Sync + 'static,
137 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
138 S: StellarSignTrait + Send + Sync + 'static,
139{
140 #[allow(clippy::too_many_arguments)]
158 pub async fn new(
159 relayer: RelayerRepoModel,
160 signer: S,
161 provider: P,
162 dependencies: StellarRelayerDependencies<RR, NR, TR, J, TCS>,
163 ) -> Result<Self, RelayerError> {
164 let network_repo = dependencies
165 .network_repository
166 .get_by_name(NetworkType::Stellar, &relayer.network)
167 .await
168 .ok()
169 .flatten()
170 .ok_or_else(|| {
171 RelayerError::NetworkConfiguration(format!("Network {} not found", relayer.network))
172 })?;
173
174 let network = StellarNetwork::try_from(network_repo)?;
175
176 Ok(Self {
177 relayer,
178 signer,
179 network,
180 provider,
181 relayer_repository: dependencies.relayer_repository,
182 network_repository: dependencies.network_repository,
183 transaction_repository: dependencies.transaction_repository,
184 transaction_counter_service: dependencies.transaction_counter_service,
185 job_producer: dependencies.job_producer,
186 })
187 }
188
189 async fn sync_sequence(&self) -> Result<(), RelayerError> {
190 info!(
191 "Syncing sequence for relayer: {} ({})",
192 self.relayer.id, self.relayer.address
193 );
194
195 let next = fetch_next_sequence_from_chain(&self.provider, &self.relayer.address)
196 .await
197 .map_err(RelayerError::ProviderError)?;
198
199 info!(
200 "Setting next sequence {} for relayer {}",
201 next, self.relayer.id
202 );
203 self.transaction_counter_service
204 .set(next)
205 .await
206 .map_err(RelayerError::from)?;
207 Ok(())
208 }
209}
210
211#[async_trait]
212impl<P, RR, NR, TR, J, TCS, S> Relayer for StellarRelayer<P, RR, NR, TR, J, TCS, S>
213where
214 P: StellarProviderTrait + Send + Sync,
215 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
216 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
217 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
218 J: JobProducerTrait + Send + Sync + 'static,
219 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
220 S: StellarSignTrait + Send + Sync + 'static,
221{
222 async fn process_transaction_request(
223 &self,
224 network_transaction: NetworkTransactionRequest,
225 ) -> Result<TransactionRepoModel, RelayerError> {
226 let network_model = self
227 .network_repository
228 .get_by_name(NetworkType::Stellar, &self.relayer.network)
229 .await?
230 .ok_or_else(|| {
231 RelayerError::NetworkConfiguration(format!(
232 "Network {} not found",
233 self.relayer.network
234 ))
235 })?;
236 let transaction =
237 TransactionRepoModel::try_from((&network_transaction, &self.relayer, &network_model))?;
238
239 self.transaction_repository
240 .create(transaction.clone())
241 .await
242 .map_err(|e| RepositoryError::TransactionFailure(e.to_string()))?;
243
244 self.job_producer
245 .produce_transaction_request_job(
246 TransactionRequest::new(transaction.id.clone(), transaction.relayer_id.clone()),
247 None,
248 )
249 .await?;
250
251 Ok(transaction)
252 }
253
254 async fn get_balance(&self) -> Result<BalanceResponse, RelayerError> {
255 let account_entry = self
256 .provider
257 .get_account(&self.relayer.address)
258 .await
259 .map_err(|e| {
260 RelayerError::ProviderError(format!("Failed to fetch account for balance: {}", e))
261 })?;
262
263 Ok(BalanceResponse {
264 balance: account_entry.balance as u128,
265 unit: STELLAR_SMALLEST_UNIT_NAME.to_string(),
266 })
267 }
268
269 async fn get_status(&self) -> Result<RelayerStatus, RelayerError> {
270 let relayer_model = &self.relayer;
271
272 let account_entry = self
273 .provider
274 .get_account(&relayer_model.address)
275 .await
276 .map_err(|e| {
277 RelayerError::ProviderError(format!("Failed to get account details: {}", e))
278 })?;
279
280 let sequence_number_str = account_entry.seq_num.0.to_string();
281
282 let balance_response = self.get_balance().await?;
283
284 let pending_statuses = [TransactionStatus::Pending, TransactionStatus::Submitted];
285 let pending_transactions = self
286 .transaction_repository
287 .find_by_status(&relayer_model.id, &pending_statuses[..])
288 .await
289 .map_err(RelayerError::from)?;
290 let pending_transactions_count = pending_transactions.len() as u64;
291
292 let confirmed_statuses = [TransactionStatus::Confirmed];
293 let confirmed_transactions = self
294 .transaction_repository
295 .find_by_status(&relayer_model.id, &confirmed_statuses[..])
296 .await
297 .map_err(RelayerError::from)?;
298
299 let last_confirmed_transaction_timestamp = confirmed_transactions
300 .iter()
301 .filter_map(|tx| tx.confirmed_at.as_ref())
302 .max()
303 .cloned();
304
305 Ok(RelayerStatus::Stellar {
306 balance: balance_response.balance.to_string(),
307 pending_transactions_count,
308 last_confirmed_transaction_timestamp,
309 system_disabled: relayer_model.system_disabled,
310 paused: relayer_model.paused,
311 sequence_number: sequence_number_str,
312 })
313 }
314
315 async fn delete_pending_transactions(
316 &self,
317 ) -> Result<DeletePendingTransactionsResponse, RelayerError> {
318 println!("Stellar delete_pending_transactions...");
319 Ok(DeletePendingTransactionsResponse {
320 queued_for_cancellation_transaction_ids: vec![],
321 failed_to_queue_transaction_ids: vec![],
322 total_processed: 0,
323 })
324 }
325
326 async fn sign_data(&self, _request: SignDataRequest) -> Result<SignDataResponse, RelayerError> {
327 Err(RelayerError::NotSupported(
328 "Signing data not supported for Stellar".to_string(),
329 ))
330 }
331
332 async fn sign_typed_data(
333 &self,
334 _request: SignTypedDataRequest,
335 ) -> Result<SignDataResponse, RelayerError> {
336 Err(RelayerError::NotSupported(
337 "Signing typed data not supported for Stellar".to_string(),
338 ))
339 }
340
341 async fn rpc(
342 &self,
343 _request: JsonRpcRequest<NetworkRpcRequest>,
344 ) -> Result<JsonRpcResponse<NetworkRpcResult>, RelayerError> {
345 println!("Stellar rpc...");
346 Ok(JsonRpcResponse {
347 id: None,
348 jsonrpc: "2.0".to_string(),
349 result: Some(NetworkRpcResult::Stellar(
350 StellarRpcResult::GenericRpcResult("".to_string()),
351 )),
352 error: None,
353 })
354 }
355
356 async fn validate_min_balance(&self) -> Result<(), RelayerError> {
357 Ok(())
358 }
359
360 async fn check_health(&self) -> Result<(), Vec<HealthCheckFailure>> {
361 debug!(
362 "running health checks for Stellar relayer {}",
363 self.relayer.id
364 );
365
366 match self.sync_sequence().await {
367 Ok(_) => {
368 debug!(
369 "all health checks passed for Stellar relayer {}",
370 self.relayer.id
371 );
372 Ok(())
373 }
374 Err(e) => {
375 let reason = HealthCheckFailure::SequenceSyncFailed(e.to_string());
376 warn!("health checks failed: {:?}", reason);
377 Err(vec![reason])
378 }
379 }
380 }
381
382 async fn initialize_relayer(&self) -> Result<(), RelayerError> {
383 debug!("initializing Stellar relayer {}", self.relayer.id);
384
385 match self.check_health().await {
386 Ok(_) => {
387 if self.relayer.system_disabled {
389 self.relayer_repository
391 .enable_relayer(self.relayer.id.clone())
392 .await?;
393 }
394
395 info!(
396 "Stellar relayer initialized successfully: {}",
397 self.relayer.id
398 );
399 Ok(())
400 }
401 Err(failures) => {
402 let reason = DisabledReason::from_health_failures(failures).unwrap_or_else(|| {
404 DisabledReason::SequenceSyncFailed("Unknown error".to_string())
405 });
406
407 warn!(reason = %reason, "disabling relayer");
408 let updated_relayer = self
409 .relayer_repository
410 .disable_relayer(self.relayer.id.clone(), reason.clone())
411 .await?;
412
413 if let Some(notification_id) = &self.relayer.notification_id {
415 self.job_producer
416 .produce_send_notification_job(
417 produce_relayer_disabled_payload(
418 notification_id,
419 &updated_relayer,
420 &reason.safe_description(),
421 ),
422 None,
423 )
424 .await?;
425 }
426
427 self.job_producer
429 .produce_relayer_health_check_job(
430 RelayerHealthCheck::new(self.relayer.id.clone()),
431 Some(calculate_scheduled_timestamp(10)),
432 )
433 .await?;
434
435 Ok(())
436 }
437 }
438 }
439
440 async fn sign_transaction(
441 &self,
442 request: &SignTransactionRequest,
443 ) -> Result<SignTransactionExternalResponse, RelayerError> {
444 let stellar_req = match request {
445 SignTransactionRequest::Stellar(req) => req,
446 _ => {
447 return Err(RelayerError::NotSupported(
448 "Invalid request type for Stellar relayer".to_string(),
449 ))
450 }
451 };
452
453 let response = self
455 .signer
456 .sign_xdr_transaction(&stellar_req.unsigned_xdr, &self.network.passphrase)
457 .await
458 .map_err(RelayerError::SignerError)?;
459
460 let signature_bytes = &response.signature.signature.0;
462 let signature_string =
463 base64::Engine::encode(&base64::engine::general_purpose::STANDARD, signature_bytes);
464
465 Ok(SignTransactionExternalResponse::Stellar(
466 SignTransactionExternalResponseStellar {
467 signed_xdr: response.signed_xdr,
468 signature: signature_string,
469 },
470 ))
471 }
472}
473
474#[cfg(test)]
475mod tests {
476 use super::*;
477 use crate::{
478 config::{NetworkConfigCommon, StellarNetworkConfig},
479 constants::STELLAR_SMALLEST_UNIT_NAME,
480 domain::{SignTransactionRequestStellar, SignXdrTransactionResponseStellar},
481 jobs::MockJobProducerTrait,
482 models::{
483 NetworkConfigData, NetworkRepoModel, NetworkType, RelayerNetworkPolicy,
484 RelayerRepoModel, RelayerStellarPolicy, SignerError,
485 },
486 repositories::{
487 InMemoryNetworkRepository, MockRelayerRepository, MockTransactionRepository,
488 },
489 services::{
490 MockStellarProviderTrait, MockStellarSignTrait, MockTransactionCounterServiceTrait,
491 },
492 };
493 use eyre::eyre;
494 use mockall::predicate::*;
495 use soroban_rs::xdr::{
496 AccountEntry, AccountEntryExt, AccountId, DecoratedSignature, PublicKey, SequenceNumber,
497 Signature, SignatureHint, String32, Thresholds, Uint256, VecM,
498 };
499 use std::future::ready;
500 use std::sync::Arc;
501
502 struct TestCtx {
504 relayer_model: RelayerRepoModel,
505 network_repository: Arc<InMemoryNetworkRepository>,
506 }
507
508 impl Default for TestCtx {
509 fn default() -> Self {
510 let network_repository = Arc::new(InMemoryNetworkRepository::new());
511
512 let relayer_model = RelayerRepoModel {
513 id: "test-relayer-id".to_string(),
514 name: "Test Relayer".to_string(),
515 network: "testnet".to_string(),
516 paused: false,
517 network_type: NetworkType::Stellar,
518 signer_id: "signer-id".to_string(),
519 policies: RelayerNetworkPolicy::Stellar(RelayerStellarPolicy::default()),
520 address: "GAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAWHF".to_string(),
521 notification_id: Some("notification-id".to_string()),
522 system_disabled: false,
523 custom_rpc_urls: None,
524 ..Default::default()
525 };
526
527 TestCtx {
528 relayer_model,
529 network_repository,
530 }
531 }
532 }
533
534 impl TestCtx {
535 async fn setup_network(&self) {
536 let test_network = NetworkRepoModel {
537 id: "stellar:testnet".to_string(),
538 name: "testnet".to_string(),
539 network_type: NetworkType::Stellar,
540 config: NetworkConfigData::Stellar(StellarNetworkConfig {
541 common: NetworkConfigCommon {
542 network: "testnet".to_string(),
543 from: None,
544 rpc_urls: Some(vec!["https://horizon-testnet.stellar.org".to_string()]),
545 explorer_urls: None,
546 average_blocktime_ms: Some(5000),
547 is_testnet: Some(true),
548 tags: None,
549 },
550 passphrase: Some("Test SDF Network ; September 2015".to_string()),
551 }),
552 };
553
554 self.network_repository.create(test_network).await.unwrap();
555 }
556 }
557
558 #[tokio::test]
559 async fn test_sync_sequence_success() {
560 let ctx = TestCtx::default();
561 ctx.setup_network().await;
562 let relayer_model = ctx.relayer_model.clone();
563 let mut provider = MockStellarProviderTrait::new();
564 provider
565 .expect_get_account()
566 .with(eq(relayer_model.address.clone()))
567 .returning(|_| {
568 Box::pin(async {
569 Ok(AccountEntry {
570 account_id: AccountId(PublicKey::PublicKeyTypeEd25519(Uint256([0; 32]))),
571 balance: 0,
572 ext: AccountEntryExt::V0,
573 flags: 0,
574 home_domain: String32::default(),
575 inflation_dest: None,
576 seq_num: SequenceNumber(5),
577 num_sub_entries: 0,
578 signers: VecM::default(),
579 thresholds: Thresholds([0, 0, 0, 0]),
580 })
581 })
582 });
583 let mut counter = MockTransactionCounterServiceTrait::new();
584 counter
585 .expect_set()
586 .with(eq(6u64))
587 .returning(|_| Box::pin(async { Ok(()) }));
588 let relayer_repo = MockRelayerRepository::new();
589 let tx_repo = MockTransactionRepository::new();
590 let job_producer = MockJobProducerTrait::new();
591 let signer = MockStellarSignTrait::new();
592
593 let relayer = StellarRelayer::new(
594 relayer_model.clone(),
595 signer,
596 provider,
597 StellarRelayerDependencies::new(
598 Arc::new(relayer_repo),
599 ctx.network_repository.clone(),
600 Arc::new(tx_repo),
601 Arc::new(counter),
602 Arc::new(job_producer),
603 ),
604 )
605 .await
606 .unwrap();
607
608 let result = relayer.sync_sequence().await;
609 assert!(result.is_ok());
610 }
611
612 #[tokio::test]
613 async fn test_sync_sequence_provider_error() {
614 let ctx = TestCtx::default();
615 ctx.setup_network().await;
616 let relayer_model = ctx.relayer_model.clone();
617 let mut provider = MockStellarProviderTrait::new();
618 provider
619 .expect_get_account()
620 .with(eq(relayer_model.address.clone()))
621 .returning(|_| Box::pin(async { Err(eyre!("fail")) }));
622 let counter = MockTransactionCounterServiceTrait::new();
623 let relayer_repo = MockRelayerRepository::new();
624 let tx_repo = MockTransactionRepository::new();
625 let job_producer = MockJobProducerTrait::new();
626 let signer = MockStellarSignTrait::new();
627
628 let relayer = StellarRelayer::new(
629 relayer_model.clone(),
630 signer,
631 provider,
632 StellarRelayerDependencies::new(
633 Arc::new(relayer_repo),
634 ctx.network_repository.clone(),
635 Arc::new(tx_repo),
636 Arc::new(counter),
637 Arc::new(job_producer),
638 ),
639 )
640 .await
641 .unwrap();
642
643 let result = relayer.sync_sequence().await;
644 assert!(matches!(result, Err(RelayerError::ProviderError(_))));
645 }
646
647 #[tokio::test]
648 async fn test_get_status_success_stellar() {
649 let ctx = TestCtx::default();
650 ctx.setup_network().await;
651 let relayer_model = ctx.relayer_model.clone();
652 let mut provider_mock = MockStellarProviderTrait::new();
653 let mut tx_repo_mock = MockTransactionRepository::new();
654 let relayer_repo_mock = MockRelayerRepository::new();
655 let job_producer_mock = MockJobProducerTrait::new();
656 let counter_mock = MockTransactionCounterServiceTrait::new();
657
658 provider_mock.expect_get_account().times(2).returning(|_| {
659 Box::pin(ready(Ok(AccountEntry {
660 account_id: AccountId(PublicKey::PublicKeyTypeEd25519(Uint256([0; 32]))),
661 balance: 10000000,
662 seq_num: SequenceNumber(12345),
663 ext: AccountEntryExt::V0,
664 flags: 0,
665 home_domain: String32::default(),
666 inflation_dest: None,
667 num_sub_entries: 0,
668 signers: VecM::default(),
669 thresholds: Thresholds([0, 0, 0, 0]),
670 })))
671 });
672
673 tx_repo_mock
674 .expect_find_by_status()
675 .withf(|relayer_id, statuses| {
676 relayer_id == "test-relayer-id"
677 && statuses == [TransactionStatus::Pending, TransactionStatus::Submitted]
678 })
679 .returning(|_, _| Ok(vec![]) as Result<Vec<TransactionRepoModel>, RepositoryError>)
680 .once();
681
682 let confirmed_tx = TransactionRepoModel {
683 id: "tx1_stellar".to_string(),
684 relayer_id: relayer_model.id.clone(),
685 status: TransactionStatus::Confirmed,
686 confirmed_at: Some("2023-02-01T12:00:00Z".to_string()),
687 ..TransactionRepoModel::default()
688 };
689 tx_repo_mock
690 .expect_find_by_status()
691 .withf(|relayer_id, statuses| {
692 relayer_id == "test-relayer-id" && statuses == [TransactionStatus::Confirmed]
693 })
694 .returning(move |_, _| {
695 Ok(vec![confirmed_tx.clone()]) as Result<Vec<TransactionRepoModel>, RepositoryError>
696 })
697 .once();
698 let signer = MockStellarSignTrait::new();
699
700 let stellar_relayer = StellarRelayer::new(
701 relayer_model.clone(),
702 signer,
703 provider_mock,
704 StellarRelayerDependencies::new(
705 Arc::new(relayer_repo_mock),
706 ctx.network_repository.clone(),
707 Arc::new(tx_repo_mock),
708 Arc::new(counter_mock),
709 Arc::new(job_producer_mock),
710 ),
711 )
712 .await
713 .unwrap();
714
715 let status = stellar_relayer.get_status().await.unwrap();
716
717 match status {
718 RelayerStatus::Stellar {
719 balance,
720 pending_transactions_count,
721 last_confirmed_transaction_timestamp,
722 system_disabled,
723 paused,
724 sequence_number,
725 } => {
726 assert_eq!(balance, "10000000");
727 assert_eq!(pending_transactions_count, 0);
728 assert_eq!(
729 last_confirmed_transaction_timestamp,
730 Some("2023-02-01T12:00:00Z".to_string())
731 );
732 assert_eq!(system_disabled, relayer_model.system_disabled);
733 assert_eq!(paused, relayer_model.paused);
734 assert_eq!(sequence_number, "12345");
735 }
736 _ => panic!("Expected Stellar RelayerStatus"),
737 }
738 }
739
740 #[tokio::test]
741 async fn test_get_status_stellar_provider_error() {
742 let ctx = TestCtx::default();
743 ctx.setup_network().await;
744 let relayer_model = ctx.relayer_model.clone();
745 let mut provider_mock = MockStellarProviderTrait::new();
746 let tx_repo_mock = MockTransactionRepository::new();
747 let relayer_repo_mock = MockRelayerRepository::new();
748 let job_producer_mock = MockJobProducerTrait::new();
749 let counter_mock = MockTransactionCounterServiceTrait::new();
750
751 provider_mock
752 .expect_get_account()
753 .with(eq(relayer_model.address.clone()))
754 .returning(|_| Box::pin(async { Err(eyre!("Stellar provider down")) }));
755 let signer = MockStellarSignTrait::new();
756
757 let stellar_relayer = StellarRelayer::new(
758 relayer_model.clone(),
759 signer,
760 provider_mock,
761 StellarRelayerDependencies::new(
762 Arc::new(relayer_repo_mock),
763 ctx.network_repository.clone(),
764 Arc::new(tx_repo_mock),
765 Arc::new(counter_mock),
766 Arc::new(job_producer_mock),
767 ),
768 )
769 .await
770 .unwrap();
771
772 let result = stellar_relayer.get_status().await;
773 assert!(result.is_err());
774 match result.err().unwrap() {
775 RelayerError::ProviderError(msg) => {
776 assert!(msg.contains("Failed to get account details"))
777 }
778 _ => panic!("Expected ProviderError for get_account failure"),
779 }
780 }
781
782 #[tokio::test]
783 async fn test_get_balance_success() {
784 let ctx = TestCtx::default();
785 ctx.setup_network().await;
786 let relayer_model = ctx.relayer_model.clone();
787 let mut provider = MockStellarProviderTrait::new();
788 let expected_balance = 100_000_000i64; provider
791 .expect_get_account()
792 .with(eq(relayer_model.address.clone()))
793 .returning(move |_| {
794 Box::pin(async move {
795 Ok(AccountEntry {
796 account_id: AccountId(PublicKey::PublicKeyTypeEd25519(Uint256([0; 32]))),
797 balance: expected_balance,
798 ext: AccountEntryExt::V0,
799 flags: 0,
800 home_domain: String32::default(),
801 inflation_dest: None,
802 seq_num: SequenceNumber(5),
803 num_sub_entries: 0,
804 signers: VecM::default(),
805 thresholds: Thresholds([0, 0, 0, 0]),
806 })
807 })
808 });
809
810 let relayer_repo = Arc::new(MockRelayerRepository::new());
811 let tx_repo = Arc::new(MockTransactionRepository::new());
812 let job_producer = Arc::new(MockJobProducerTrait::new());
813 let counter = Arc::new(MockTransactionCounterServiceTrait::new());
814 let signer = MockStellarSignTrait::new();
815
816 let relayer = StellarRelayer::new(
817 relayer_model,
818 signer,
819 provider,
820 StellarRelayerDependencies::new(
821 relayer_repo,
822 ctx.network_repository.clone(),
823 tx_repo,
824 counter,
825 job_producer,
826 ),
827 )
828 .await
829 .unwrap();
830
831 let result = relayer.get_balance().await;
832 assert!(result.is_ok());
833 let balance_response = result.unwrap();
834 assert_eq!(balance_response.balance, expected_balance as u128);
835 assert_eq!(balance_response.unit, STELLAR_SMALLEST_UNIT_NAME);
836 }
837
838 #[tokio::test]
839 async fn test_get_balance_provider_error() {
840 let ctx = TestCtx::default();
841 ctx.setup_network().await;
842 let relayer_model = ctx.relayer_model.clone();
843 let mut provider = MockStellarProviderTrait::new();
844
845 provider
846 .expect_get_account()
847 .with(eq(relayer_model.address.clone()))
848 .returning(|_| Box::pin(async { Err(eyre!("provider failed")) }));
849
850 let relayer_repo = Arc::new(MockRelayerRepository::new());
851 let tx_repo = Arc::new(MockTransactionRepository::new());
852 let job_producer = Arc::new(MockJobProducerTrait::new());
853 let counter = Arc::new(MockTransactionCounterServiceTrait::new());
854 let signer = MockStellarSignTrait::new();
855
856 let relayer = StellarRelayer::new(
857 relayer_model,
858 signer,
859 provider,
860 StellarRelayerDependencies::new(
861 relayer_repo,
862 ctx.network_repository.clone(),
863 tx_repo,
864 counter,
865 job_producer,
866 ),
867 )
868 .await
869 .unwrap();
870
871 let result = relayer.get_balance().await;
872 assert!(result.is_err());
873 match result.err().unwrap() {
874 RelayerError::ProviderError(msg) => {
875 assert!(msg.contains("Failed to fetch account for balance: provider failed"));
876 }
877 _ => panic!("Unexpected error type"),
878 }
879 }
880
881 #[tokio::test]
882 async fn test_sign_transaction_success() {
883 let ctx = TestCtx::default();
884 ctx.setup_network().await;
885 let relayer_model = ctx.relayer_model.clone();
886 let provider = MockStellarProviderTrait::new();
887 let mut signer = MockStellarSignTrait::new();
888
889 let unsigned_xdr = "AAAAAgAAAAD///8AAAAAAAAAAQAAAAAAAAACAAAAAQAAAAAAAAAB";
890 let expected_signed_xdr =
891 "AAAAAgAAAAD///8AAAAAAAABAAAAAAAAAAIAAAABAAAAAAAAAAEAAAABAAAAA...";
892 let expected_signature = DecoratedSignature {
893 hint: SignatureHint([1, 2, 3, 4]),
894 signature: Signature([5u8; 64].try_into().unwrap()),
895 };
896 let expected_signature_for_closure = expected_signature.clone();
897
898 signer
899 .expect_sign_xdr_transaction()
900 .with(eq(unsigned_xdr), eq("Test SDF Network ; September 2015"))
901 .returning(move |_, _| {
902 Ok(SignXdrTransactionResponseStellar {
903 signed_xdr: expected_signed_xdr.to_string(),
904 signature: expected_signature_for_closure.clone(),
905 })
906 });
907
908 let relayer_repo = Arc::new(MockRelayerRepository::new());
909 let tx_repo = Arc::new(MockTransactionRepository::new());
910 let job_producer = Arc::new(MockJobProducerTrait::new());
911 let counter = Arc::new(MockTransactionCounterServiceTrait::new());
912
913 let relayer = StellarRelayer::new(
914 relayer_model,
915 signer,
916 provider,
917 StellarRelayerDependencies::new(
918 relayer_repo,
919 ctx.network_repository.clone(),
920 tx_repo,
921 counter,
922 job_producer,
923 ),
924 )
925 .await
926 .unwrap();
927
928 let request = SignTransactionRequest::Stellar(SignTransactionRequestStellar {
929 unsigned_xdr: unsigned_xdr.to_string(),
930 });
931 let result = relayer.sign_transaction(&request).await;
932 assert!(result.is_ok());
933
934 match result.unwrap() {
935 SignTransactionExternalResponse::Stellar(response) => {
936 assert_eq!(response.signed_xdr, expected_signed_xdr);
937 let expected_signature_base64 = base64::Engine::encode(
939 &base64::engine::general_purpose::STANDARD,
940 &expected_signature.signature.0,
941 );
942 assert_eq!(response.signature, expected_signature_base64);
943 }
944 _ => panic!("Expected Stellar response"),
945 }
946 }
947
948 #[tokio::test]
949 async fn test_sign_transaction_signer_error() {
950 let ctx = TestCtx::default();
951 ctx.setup_network().await;
952 let relayer_model = ctx.relayer_model.clone();
953 let provider = MockStellarProviderTrait::new();
954 let mut signer = MockStellarSignTrait::new();
955
956 let unsigned_xdr = "INVALID_XDR";
957
958 signer
959 .expect_sign_xdr_transaction()
960 .with(eq(unsigned_xdr), eq("Test SDF Network ; September 2015"))
961 .returning(|_, _| Err(SignerError::SigningError("Invalid XDR format".to_string())));
962
963 let relayer_repo = Arc::new(MockRelayerRepository::new());
964 let tx_repo = Arc::new(MockTransactionRepository::new());
965 let job_producer = Arc::new(MockJobProducerTrait::new());
966 let counter = Arc::new(MockTransactionCounterServiceTrait::new());
967
968 let relayer = StellarRelayer::new(
969 relayer_model,
970 signer,
971 provider,
972 StellarRelayerDependencies::new(
973 relayer_repo,
974 ctx.network_repository.clone(),
975 tx_repo,
976 counter,
977 job_producer,
978 ),
979 )
980 .await
981 .unwrap();
982
983 let request = SignTransactionRequest::Stellar(SignTransactionRequestStellar {
984 unsigned_xdr: unsigned_xdr.to_string(),
985 });
986 let result = relayer.sign_transaction(&request).await;
987 assert!(result.is_err());
988
989 match result.err().unwrap() {
990 RelayerError::SignerError(err) => match err {
991 SignerError::SigningError(msg) => {
992 assert_eq!(msg, "Invalid XDR format");
993 }
994 _ => panic!("Expected SigningError"),
995 },
996 _ => panic!("Expected RelayerError::SignerError"),
997 }
998 }
999
1000 #[tokio::test]
1001 async fn test_sign_transaction_with_different_network_passphrase() {
1002 let ctx = TestCtx::default();
1003 let custom_network = NetworkRepoModel {
1005 id: "stellar:mainnet".to_string(),
1006 name: "mainnet".to_string(),
1007 network_type: NetworkType::Stellar,
1008 config: NetworkConfigData::Stellar(StellarNetworkConfig {
1009 common: NetworkConfigCommon {
1010 network: "mainnet".to_string(),
1011 from: None,
1012 rpc_urls: Some(vec!["https://horizon.stellar.org".to_string()]),
1013 explorer_urls: None,
1014 average_blocktime_ms: Some(5000),
1015 is_testnet: Some(false),
1016 tags: None,
1017 },
1018 passphrase: Some("Public Global Stellar Network ; September 2015".to_string()),
1019 }),
1020 };
1021 ctx.network_repository.create(custom_network).await.unwrap();
1022
1023 let mut relayer_model = ctx.relayer_model.clone();
1024 relayer_model.network = "mainnet".to_string();
1025
1026 let provider = MockStellarProviderTrait::new();
1027 let mut signer = MockStellarSignTrait::new();
1028
1029 let unsigned_xdr = "AAAAAgAAAAD///8AAAAAAAAAAQAAAAAAAAACAAAAAQAAAAAAAAAB";
1030 let expected_signature = DecoratedSignature {
1031 hint: SignatureHint([10, 20, 30, 40]),
1032 signature: Signature([15u8; 64].try_into().unwrap()),
1033 };
1034 let expected_signature_for_closure = expected_signature.clone();
1035
1036 signer
1037 .expect_sign_xdr_transaction()
1038 .with(
1039 eq(unsigned_xdr),
1040 eq("Public Global Stellar Network ; September 2015"),
1041 )
1042 .returning(move |_, _| {
1043 Ok(SignXdrTransactionResponseStellar {
1044 signed_xdr: "mainnet_signed_xdr".to_string(),
1045 signature: expected_signature_for_closure.clone(),
1046 })
1047 });
1048
1049 let relayer_repo = Arc::new(MockRelayerRepository::new());
1050 let tx_repo = Arc::new(MockTransactionRepository::new());
1051 let job_producer = Arc::new(MockJobProducerTrait::new());
1052 let counter = Arc::new(MockTransactionCounterServiceTrait::new());
1053
1054 let relayer = StellarRelayer::new(
1055 relayer_model,
1056 signer,
1057 provider,
1058 StellarRelayerDependencies::new(
1059 relayer_repo,
1060 ctx.network_repository.clone(),
1061 tx_repo,
1062 counter,
1063 job_producer,
1064 ),
1065 )
1066 .await
1067 .unwrap();
1068
1069 let request = SignTransactionRequest::Stellar(SignTransactionRequestStellar {
1070 unsigned_xdr: unsigned_xdr.to_string(),
1071 });
1072 let result = relayer.sign_transaction(&request).await;
1073 assert!(result.is_ok());
1074
1075 match result.unwrap() {
1076 SignTransactionExternalResponse::Stellar(response) => {
1077 assert_eq!(response.signed_xdr, "mainnet_signed_xdr");
1078 let expected_signature_string = base64::Engine::encode(
1080 &base64::engine::general_purpose::STANDARD,
1081 &expected_signature.signature.0,
1082 );
1083 assert_eq!(response.signature, expected_signature_string);
1084 }
1085 _ => panic!("Expected Stellar response"),
1086 }
1087 }
1088
1089 #[tokio::test]
1090 async fn test_initialize_relayer_disables_when_validation_fails() {
1091 let ctx = TestCtx::default();
1092 ctx.setup_network().await;
1093 let mut relayer_model = ctx.relayer_model.clone();
1094 relayer_model.system_disabled = false; relayer_model.notification_id = Some("test-notification-id".to_string());
1096
1097 let mut provider = MockStellarProviderTrait::new();
1098 let mut relayer_repo = MockRelayerRepository::new();
1099 let mut job_producer = MockJobProducerTrait::new();
1100
1101 provider
1103 .expect_get_account()
1104 .returning(|_| Box::pin(ready(Err(eyre!("RPC error")))));
1105
1106 let mut disabled_relayer = relayer_model.clone();
1108 disabled_relayer.system_disabled = true;
1109 relayer_repo
1110 .expect_disable_relayer()
1111 .withf(|id, reason| {
1112 id == "test-relayer-id"
1113 && matches!(reason, crate::models::DisabledReason::SequenceSyncFailed(_))
1114 })
1115 .returning(move |_, _| Ok(disabled_relayer.clone()));
1116
1117 job_producer
1119 .expect_produce_send_notification_job()
1120 .returning(|_, _| Box::pin(async { Ok(()) }));
1121
1122 job_producer
1124 .expect_produce_relayer_health_check_job()
1125 .returning(|_, _| Box::pin(async { Ok(()) }));
1126
1127 let tx_repo = MockTransactionRepository::new();
1128 let counter = MockTransactionCounterServiceTrait::new();
1129 let signer = MockStellarSignTrait::new();
1130
1131 let relayer = StellarRelayer::new(
1132 relayer_model.clone(),
1133 signer,
1134 provider,
1135 StellarRelayerDependencies::new(
1136 Arc::new(relayer_repo),
1137 ctx.network_repository.clone(),
1138 Arc::new(tx_repo),
1139 Arc::new(counter),
1140 Arc::new(job_producer),
1141 ),
1142 )
1143 .await
1144 .unwrap();
1145
1146 let result = relayer.initialize_relayer().await;
1147 assert!(result.is_ok());
1148 }
1149
1150 #[tokio::test]
1151 async fn test_initialize_relayer_enables_when_validation_passes_and_was_disabled() {
1152 let ctx = TestCtx::default();
1153 ctx.setup_network().await;
1154 let mut relayer_model = ctx.relayer_model.clone();
1155 relayer_model.system_disabled = true; let mut provider = MockStellarProviderTrait::new();
1158 let mut relayer_repo = MockRelayerRepository::new();
1159
1160 provider.expect_get_account().returning(|_| {
1162 Box::pin(ready(Ok(AccountEntry {
1163 account_id: AccountId(PublicKey::PublicKeyTypeEd25519(Uint256([0; 32]))),
1164 balance: 1000000000, seq_num: SequenceNumber(1),
1166 num_sub_entries: 0,
1167 inflation_dest: None,
1168 flags: 0,
1169 home_domain: String32::default(),
1170 thresholds: Thresholds([0; 4]),
1171 signers: VecM::default(),
1172 ext: AccountEntryExt::V0,
1173 })))
1174 });
1175
1176 let mut enabled_relayer = relayer_model.clone();
1178 enabled_relayer.system_disabled = false;
1179 relayer_repo
1180 .expect_enable_relayer()
1181 .with(eq("test-relayer-id".to_string()))
1182 .returning(move |_| Ok(enabled_relayer.clone()));
1183
1184 let tx_repo = MockTransactionRepository::new();
1185 let mut counter = MockTransactionCounterServiceTrait::new();
1186 counter
1187 .expect_set()
1188 .returning(|_| Box::pin(async { Ok(()) }));
1189 let signer = MockStellarSignTrait::new();
1190 let job_producer = MockJobProducerTrait::new();
1191
1192 let relayer = StellarRelayer::new(
1193 relayer_model.clone(),
1194 signer,
1195 provider,
1196 StellarRelayerDependencies::new(
1197 Arc::new(relayer_repo),
1198 ctx.network_repository.clone(),
1199 Arc::new(tx_repo),
1200 Arc::new(counter),
1201 Arc::new(job_producer),
1202 ),
1203 )
1204 .await
1205 .unwrap();
1206
1207 let result = relayer.initialize_relayer().await;
1208 assert!(result.is_ok());
1209 }
1210
1211 #[tokio::test]
1212 async fn test_initialize_relayer_no_action_when_enabled_and_validation_passes() {
1213 let ctx = TestCtx::default();
1214 ctx.setup_network().await;
1215 let mut relayer_model = ctx.relayer_model.clone();
1216 relayer_model.system_disabled = false; let mut provider = MockStellarProviderTrait::new();
1219
1220 provider.expect_get_account().returning(|_| {
1222 Box::pin(ready(Ok(AccountEntry {
1223 account_id: AccountId(PublicKey::PublicKeyTypeEd25519(Uint256([0; 32]))),
1224 balance: 1000000000, seq_num: SequenceNumber(1),
1226 num_sub_entries: 0,
1227 inflation_dest: None,
1228 flags: 0,
1229 home_domain: String32::default(),
1230 thresholds: Thresholds([0; 4]),
1231 signers: VecM::default(),
1232 ext: AccountEntryExt::V0,
1233 })))
1234 });
1235
1236 let tx_repo = MockTransactionRepository::new();
1239 let mut counter = MockTransactionCounterServiceTrait::new();
1240 counter
1241 .expect_set()
1242 .returning(|_| Box::pin(async { Ok(()) }));
1243 let signer = MockStellarSignTrait::new();
1244 let job_producer = MockJobProducerTrait::new();
1245 let relayer_repo = MockRelayerRepository::new();
1246
1247 let relayer = StellarRelayer::new(
1248 relayer_model.clone(),
1249 signer,
1250 provider,
1251 StellarRelayerDependencies::new(
1252 Arc::new(relayer_repo),
1253 ctx.network_repository.clone(),
1254 Arc::new(tx_repo),
1255 Arc::new(counter),
1256 Arc::new(job_producer),
1257 ),
1258 )
1259 .await
1260 .unwrap();
1261
1262 let result = relayer.initialize_relayer().await;
1263 assert!(result.is_ok());
1264 }
1265
1266 #[tokio::test]
1267 async fn test_initialize_relayer_sends_notification_when_disabled() {
1268 let ctx = TestCtx::default();
1269 ctx.setup_network().await;
1270 let mut relayer_model = ctx.relayer_model.clone();
1271 relayer_model.system_disabled = false; relayer_model.notification_id = Some("test-notification-id".to_string());
1273
1274 let mut provider = MockStellarProviderTrait::new();
1275 let mut relayer_repo = MockRelayerRepository::new();
1276 let mut job_producer = MockJobProducerTrait::new();
1277
1278 provider
1280 .expect_get_account()
1281 .returning(|_| Box::pin(ready(Err(eyre!("Sequence sync failed")))));
1282
1283 let mut disabled_relayer = relayer_model.clone();
1285 disabled_relayer.system_disabled = true;
1286 relayer_repo
1287 .expect_disable_relayer()
1288 .withf(|id, reason| {
1289 id == "test-relayer-id"
1290 && matches!(reason, crate::models::DisabledReason::SequenceSyncFailed(_))
1291 })
1292 .returning(move |_, _| Ok(disabled_relayer.clone()));
1293
1294 job_producer
1296 .expect_produce_send_notification_job()
1297 .returning(|_, _| Box::pin(async { Ok(()) }));
1298
1299 job_producer
1301 .expect_produce_relayer_health_check_job()
1302 .returning(|_, _| Box::pin(async { Ok(()) }));
1303
1304 let tx_repo = MockTransactionRepository::new();
1305 let counter = MockTransactionCounterServiceTrait::new();
1306 let signer = MockStellarSignTrait::new();
1307
1308 let relayer = StellarRelayer::new(
1309 relayer_model.clone(),
1310 signer,
1311 provider,
1312 StellarRelayerDependencies::new(
1313 Arc::new(relayer_repo),
1314 ctx.network_repository.clone(),
1315 Arc::new(tx_repo),
1316 Arc::new(counter),
1317 Arc::new(job_producer),
1318 ),
1319 )
1320 .await
1321 .unwrap();
1322
1323 let result = relayer.initialize_relayer().await;
1324 assert!(result.is_ok());
1325 }
1326
1327 #[tokio::test]
1328 async fn test_initialize_relayer_no_notification_when_no_notification_id() {
1329 let ctx = TestCtx::default();
1330 ctx.setup_network().await;
1331 let mut relayer_model = ctx.relayer_model.clone();
1332 relayer_model.system_disabled = false; relayer_model.notification_id = None; let mut provider = MockStellarProviderTrait::new();
1336 let mut relayer_repo = MockRelayerRepository::new();
1337
1338 provider
1340 .expect_get_account()
1341 .returning(|_| Box::pin(ready(Err(eyre!("Sequence sync failed")))));
1342
1343 let mut disabled_relayer = relayer_model.clone();
1345 disabled_relayer.system_disabled = true;
1346 relayer_repo
1347 .expect_disable_relayer()
1348 .withf(|id, reason| {
1349 id == "test-relayer-id"
1350 && matches!(reason, crate::models::DisabledReason::SequenceSyncFailed(_))
1351 })
1352 .returning(move |_, _| Ok(disabled_relayer.clone()));
1353
1354 let mut job_producer = MockJobProducerTrait::new();
1357 job_producer
1358 .expect_produce_relayer_health_check_job()
1359 .returning(|_, _| Box::pin(async { Ok(()) }));
1360
1361 let tx_repo = MockTransactionRepository::new();
1362 let counter = MockTransactionCounterServiceTrait::new();
1363 let signer = MockStellarSignTrait::new();
1364
1365 let relayer = StellarRelayer::new(
1366 relayer_model.clone(),
1367 signer,
1368 provider,
1369 StellarRelayerDependencies::new(
1370 Arc::new(relayer_repo),
1371 ctx.network_repository.clone(),
1372 Arc::new(tx_repo),
1373 Arc::new(counter),
1374 Arc::new(job_producer),
1375 ),
1376 )
1377 .await
1378 .unwrap();
1379
1380 let result = relayer.initialize_relayer().await;
1381 assert!(result.is_ok());
1382 }
1383}