1use crate::models::{
4 NetworkTransactionData, PaginationQuery, RepositoryError, TransactionRepoModel,
5 TransactionStatus, TransactionUpdateRequest,
6};
7use crate::repositories::redis_base::RedisRepository;
8use crate::repositories::{
9 BatchRetrievalResult, PaginatedResult, Repository, TransactionRepository,
10};
11use async_trait::async_trait;
12use redis::aio::ConnectionManager;
13use redis::AsyncCommands;
14use std::fmt;
15use std::sync::Arc;
16use tracing::{debug, error, warn};
17
// Key segments used by the key-builder methods below. All keys are additionally
// prefixed with the repository's `key_prefix`.

/// Segment that namespaces every per-relayer key.
const RELAYER_PREFIX: &str = "relayer";
/// Segment preceding the transaction ID in a transaction data key.
const TX_PREFIX: &str = "tx";
/// Segment preceding the status name in a per-relayer status index key.
const STATUS_PREFIX: &str = "status";
/// Segment preceding the nonce in a per-relayer nonce index key.
const NONCE_PREFIX: &str = "nonce";
/// Segment of the reverse-lookup key that maps a transaction ID to its relayer ID.
const TX_TO_RELAYER_PREFIX: &str = "tx_to_relayer";
/// Suffix of the key holding the set of relayer IDs that own transactions.
const RELAYER_LIST_KEY: &str = "relayer_list";
24
/// Transaction repository that stores its data and indexes in Redis.
///
/// Cloning is cheap with respect to the connection: the manager is held behind
/// an `Arc`.
#[derive(Clone)]
pub struct RedisTransactionRepository {
    /// Shared Redis connection manager; each operation clones it to obtain a
    /// mutable handle.
    pub client: Arc<ConnectionManager>,
    /// Namespace prepended to every key this repository builds. `new` rejects
    /// an empty value.
    pub key_prefix: String,
}
30
// Empty impl: nothing is overridden here. The `serialize_entity`,
// `deserialize_entity` and `map_redis_error` helpers called on `self` in this
// file presumably come from this trait's provided methods — TODO confirm.
impl RedisRepository for RedisTransactionRepository {}
32
33impl RedisTransactionRepository {
34 pub fn new(
35 connection_manager: Arc<ConnectionManager>,
36 key_prefix: String,
37 ) -> Result<Self, RepositoryError> {
38 if key_prefix.is_empty() {
39 return Err(RepositoryError::InvalidData(
40 "Redis key prefix cannot be empty".to_string(),
41 ));
42 }
43
44 Ok(Self {
45 client: connection_manager,
46 key_prefix,
47 })
48 }
49
50 fn tx_key(&self, relayer_id: &str, tx_id: &str) -> String {
52 format!(
53 "{}:{}:{}:{}:{}",
54 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX, tx_id
55 )
56 }
57
58 fn tx_to_relayer_key(&self, tx_id: &str) -> String {
60 format!(
61 "{}:{}:{}:{}",
62 self.key_prefix, RELAYER_PREFIX, TX_TO_RELAYER_PREFIX, tx_id
63 )
64 }
65
66 fn relayer_status_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
68 format!(
69 "{}:{}:{}:{}:{}",
70 self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_PREFIX, status
71 )
72 }
73
74 fn relayer_nonce_key(&self, relayer_id: &str, nonce: u64) -> String {
76 format!(
77 "{}:{}:{}:{}:{}",
78 self.key_prefix, RELAYER_PREFIX, relayer_id, NONCE_PREFIX, nonce
79 )
80 }
81
82 fn relayer_list_key(&self) -> String {
84 format!("{}:{}", self.key_prefix, RELAYER_LIST_KEY)
85 }
86
87 async fn get_transactions_by_ids(
89 &self,
90 ids: &[String],
91 ) -> Result<BatchRetrievalResult<TransactionRepoModel>, RepositoryError> {
92 if ids.is_empty() {
93 debug!("no transaction IDs provided for batch fetch");
94 return Ok(BatchRetrievalResult {
95 results: vec![],
96 failed_ids: vec![],
97 });
98 }
99
100 let mut conn = self.client.as_ref().clone();
101
102 let reverse_keys: Vec<String> = ids.iter().map(|id| self.tx_to_relayer_key(id)).collect();
103
104 debug!(count = %ids.len(), "fetching relayer IDs for transactions");
105
106 let relayer_ids: Vec<Option<String>> = conn
107 .mget(&reverse_keys)
108 .await
109 .map_err(|e| self.map_redis_error(e, "batch_fetch_relayer_ids"))?;
110
111 let mut tx_keys = Vec::new();
112 let mut valid_ids = Vec::new();
113 let mut failed_ids = Vec::new();
114 for (i, relayer_id) in relayer_ids.into_iter().enumerate() {
115 match relayer_id {
116 Some(relayer_id) => {
117 tx_keys.push(self.tx_key(&relayer_id, &ids[i]));
118 valid_ids.push(ids[i].clone());
119 }
120 None => {
121 warn!(tx_id = %ids[i], "no relayer found for transaction");
122 failed_ids.push(ids[i].clone());
123 }
124 }
125 }
126
127 if tx_keys.is_empty() {
128 debug!("no valid transactions found for batch fetch");
129 return Ok(BatchRetrievalResult {
130 results: vec![],
131 failed_ids,
132 });
133 }
134
135 debug!(count = %tx_keys.len(), "batch fetching transaction data");
136
137 let values: Vec<Option<String>> = conn
138 .mget(&tx_keys)
139 .await
140 .map_err(|e| self.map_redis_error(e, "batch_fetch_transactions"))?;
141
142 let mut transactions = Vec::new();
143 let mut failed_count = 0;
144 let mut failed_ids = Vec::new();
145 for (i, value) in values.into_iter().enumerate() {
146 match value {
147 Some(json) => {
148 match self.deserialize_entity::<TransactionRepoModel>(
149 &json,
150 &valid_ids[i],
151 "transaction",
152 ) {
153 Ok(tx) => transactions.push(tx),
154 Err(e) => {
155 failed_count += 1;
156 error!(tx_id = %valid_ids[i], error = %e, "failed to deserialize transaction");
157 }
159 }
160 }
161 None => {
162 warn!(tx_id = %valid_ids[i], "transaction not found in batch fetch");
163 failed_ids.push(valid_ids[i].clone());
164 }
165 }
166 }
167
168 if failed_count > 0 {
169 warn!(failed_count = %failed_count, total_count = %valid_ids.len(), "failed to deserialize transactions in batch");
170 }
171
172 debug!(count = %transactions.len(), "successfully fetched transactions");
173 Ok(BatchRetrievalResult {
174 results: transactions,
175 failed_ids,
176 })
177 }
178
179 fn extract_nonce(&self, network_data: &NetworkTransactionData) -> Option<u64> {
181 match network_data.get_evm_transaction_data() {
182 Ok(tx_data) => tx_data.nonce,
183 Err(_) => {
184 debug!("no EVM transaction data available for nonce extraction");
185 None
186 }
187 }
188 }
189
190 async fn update_indexes(
192 &self,
193 tx: &TransactionRepoModel,
194 old_tx: Option<&TransactionRepoModel>,
195 ) -> Result<(), RepositoryError> {
196 let mut conn = self.client.as_ref().clone();
197 let mut pipe = redis::pipe();
198 pipe.atomic();
199
200 debug!(tx_id = %tx.id, "updating indexes for transaction");
201
202 let relayer_list_key = self.relayer_list_key();
204 pipe.sadd(&relayer_list_key, &tx.relayer_id);
205
206 let new_status_key = self.relayer_status_key(&tx.relayer_id, &tx.status);
208 pipe.sadd(&new_status_key, &tx.id);
209
210 if let Some(nonce) = self.extract_nonce(&tx.network_data) {
211 let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
212 pipe.set(&nonce_key, &tx.id);
213 debug!(tx_id = %tx.id, nonce = %nonce, "added nonce index for transaction");
214 }
215
216 if let Some(old) = old_tx {
218 if old.status != tx.status {
219 let old_status_key = self.relayer_status_key(&old.relayer_id, &old.status);
220 pipe.srem(&old_status_key, &tx.id);
221 debug!(tx_id = %tx.id, old_status = %old.status, new_status = %tx.status, "removing old status index for transaction");
222 }
223
224 if let Some(old_nonce) = self.extract_nonce(&old.network_data) {
226 let new_nonce = self.extract_nonce(&tx.network_data);
227 if Some(old_nonce) != new_nonce {
228 let old_nonce_key = self.relayer_nonce_key(&old.relayer_id, old_nonce);
229 pipe.del(&old_nonce_key);
230 debug!(tx_id = %tx.id, old_nonce = %old_nonce, new_nonce = ?new_nonce, "removing old nonce index for transaction");
231 }
232 }
233 }
234
235 pipe.exec_async(&mut conn).await.map_err(|e| {
237 error!(tx_id = %tx.id, error = %e, "index update pipeline failed for transaction");
238 self.map_redis_error(e, &format!("update_indexes_for_tx_{}", tx.id))
239 })?;
240
241 debug!(tx_id = %tx.id, "successfully updated indexes for transaction");
242 Ok(())
243 }
244
245 async fn remove_all_indexes(&self, tx: &TransactionRepoModel) -> Result<(), RepositoryError> {
247 let mut conn = self.client.as_ref().clone();
248 let mut pipe = redis::pipe();
249 pipe.atomic();
250
251 debug!(tx_id = %tx.id, "removing all indexes for transaction");
252
253 let status_key = self.relayer_status_key(&tx.relayer_id, &tx.status);
255 pipe.srem(&status_key, &tx.id);
256
257 if let Some(nonce) = self.extract_nonce(&tx.network_data) {
259 let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
260 pipe.del(&nonce_key);
261 debug!(tx_id = %tx.id, nonce = %nonce, "removing nonce index for transaction");
262 }
263
264 let reverse_key = self.tx_to_relayer_key(&tx.id);
266 pipe.del(&reverse_key);
267
268 pipe.exec_async(&mut conn).await.map_err(|e| {
269 error!(tx_id = %tx.id, error = %e, "index removal failed for transaction");
270 self.map_redis_error(e, &format!("remove_indexes_for_tx_{}", tx.id))
271 })?;
272
273 debug!(tx_id = %tx.id, "successfully removed all indexes for transaction");
274 Ok(())
275 }
276}
277
278impl fmt::Debug for RedisTransactionRepository {
279 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
280 f.debug_struct("RedisTransactionRepository")
281 .field("client", &"<ConnectionManager>")
282 .field("key_prefix", &self.key_prefix)
283 .finish()
284 }
285}
286
287#[async_trait]
288impl Repository<TransactionRepoModel, String> for RedisTransactionRepository {
289 async fn create(
290 &self,
291 entity: TransactionRepoModel,
292 ) -> Result<TransactionRepoModel, RepositoryError> {
293 if entity.id.is_empty() {
294 return Err(RepositoryError::InvalidData(
295 "Transaction ID cannot be empty".to_string(),
296 ));
297 }
298
299 let key = self.tx_key(&entity.relayer_id, &entity.id);
300 let reverse_key = self.tx_to_relayer_key(&entity.id);
301 let mut conn = self.client.as_ref().clone();
302
303 debug!(tx_id = %entity.id, "creating transaction");
304
305 let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
306
307 let existing: Option<String> = conn
309 .get(&reverse_key)
310 .await
311 .map_err(|e| self.map_redis_error(e, "create_transaction_check"))?;
312
313 if existing.is_some() {
314 return Err(RepositoryError::ConstraintViolation(format!(
315 "Transaction with ID {} already exists",
316 entity.id
317 )));
318 }
319
320 let mut pipe = redis::pipe();
322 pipe.atomic();
323 pipe.set(&key, &value);
324 pipe.set(&reverse_key, &entity.relayer_id);
325
326 pipe.exec_async(&mut conn)
327 .await
328 .map_err(|e| self.map_redis_error(e, "create_transaction"))?;
329
330 if let Err(e) = self.update_indexes(&entity, None).await {
332 error!(tx_id = %entity.id, error = %e, "failed to update indexes for new transaction");
333 return Err(e);
334 }
335
336 debug!(tx_id = %entity.id, "successfully created transaction");
337 Ok(entity)
338 }
339
340 async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
341 if id.is_empty() {
342 return Err(RepositoryError::InvalidData(
343 "Transaction ID cannot be empty".to_string(),
344 ));
345 }
346
347 let mut conn = self.client.as_ref().clone();
348
349 debug!(tx_id = %id, "fetching transaction");
350
351 let reverse_key = self.tx_to_relayer_key(&id);
352 let relayer_id: Option<String> = conn
353 .get(&reverse_key)
354 .await
355 .map_err(|e| self.map_redis_error(e, "get_transaction_reverse_lookup"))?;
356
357 let relayer_id = match relayer_id {
358 Some(relayer_id) => relayer_id,
359 None => {
360 debug!(tx_id = %id, "transaction not found (no reverse lookup)");
361 return Err(RepositoryError::NotFound(format!(
362 "Transaction with ID {} not found",
363 id
364 )));
365 }
366 };
367
368 let key = self.tx_key(&relayer_id, &id);
369 let value: Option<String> = conn
370 .get(&key)
371 .await
372 .map_err(|e| self.map_redis_error(e, "get_transaction_by_id"))?;
373
374 match value {
375 Some(json) => {
376 let tx =
377 self.deserialize_entity::<TransactionRepoModel>(&json, &id, "transaction")?;
378 debug!(tx_id = %id, "successfully fetched transaction");
379 Ok(tx)
380 }
381 None => {
382 debug!(tx_id = %id, "transaction not found");
383 Err(RepositoryError::NotFound(format!(
384 "Transaction with ID {} not found",
385 id
386 )))
387 }
388 }
389 }
390
391 async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
392 let mut conn = self.client.as_ref().clone();
393
394 debug!("fetching all transaction IDs");
395
396 let relayer_list_key = self.relayer_list_key();
398 let relayer_ids: Vec<String> = conn
399 .smembers(&relayer_list_key)
400 .await
401 .map_err(|e| self.map_redis_error(e, "list_all_relayer_ids"))?;
402
403 debug!(count = %relayer_ids.len(), "found relayers");
404
405 let mut all_tx_ids = Vec::new();
407 for relayer_id in relayer_ids {
408 let pattern = format!(
409 "{}:{}:{}:{}:*",
410 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
411 );
412 let mut cursor = 0;
413 loop {
414 let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
415 .cursor_arg(cursor)
416 .arg("MATCH")
417 .arg(&pattern)
418 .query_async(&mut conn)
419 .await
420 .map_err(|e| self.map_redis_error(e, "list_all_scan_keys"))?;
421
422 for key in keys {
424 if let Some(tx_id) = key.split(':').next_back() {
425 all_tx_ids.push(tx_id.to_string());
426 }
427 }
428
429 cursor = next_cursor;
430 if cursor == 0 {
431 break;
432 }
433 }
434 }
435
436 debug!(count = %all_tx_ids.len(), "found transaction IDs");
437
438 let transactions = self.get_transactions_by_ids(&all_tx_ids).await?;
439 Ok(transactions.results)
440 }
441
442 async fn list_paginated(
443 &self,
444 query: PaginationQuery,
445 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
446 if query.per_page == 0 {
447 return Err(RepositoryError::InvalidData(
448 "per_page must be greater than 0".to_string(),
449 ));
450 }
451
452 let mut conn = self.client.as_ref().clone();
453
454 debug!(page = %query.page, per_page = %query.per_page, "fetching paginated transactions");
455
456 let relayer_list_key = self.relayer_list_key();
458 let relayer_ids: Vec<String> = conn
459 .smembers(&relayer_list_key)
460 .await
461 .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_ids"))?;
462
463 let mut all_tx_ids = Vec::new();
465 for relayer_id in relayer_ids {
466 let pattern = format!(
467 "{}:{}:{}:{}:*",
468 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
469 );
470 let mut cursor = 0;
471 loop {
472 let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
473 .cursor_arg(cursor)
474 .arg("MATCH")
475 .arg(&pattern)
476 .query_async(&mut conn)
477 .await
478 .map_err(|e| self.map_redis_error(e, "list_paginated_scan_keys"))?;
479
480 for key in keys {
482 if let Some(tx_id) = key.split(':').next_back() {
483 all_tx_ids.push(tx_id.to_string());
484 }
485 }
486
487 cursor = next_cursor;
488 if cursor == 0 {
489 break;
490 }
491 }
492 }
493
494 let total = all_tx_ids.len() as u64;
495 let start = ((query.page - 1) * query.per_page) as usize;
496 let end = (start + query.per_page as usize).min(all_tx_ids.len());
497
498 if start >= all_tx_ids.len() {
499 debug!(page = %query.page, total = %total, "page is beyond available data");
500 return Ok(PaginatedResult {
501 items: vec![],
502 total,
503 page: query.page,
504 per_page: query.per_page,
505 });
506 }
507
508 let page_ids = &all_tx_ids[start..end];
509 let items = self.get_transactions_by_ids(page_ids).await?;
510
511 debug!(count = %items.results.len(), page = %query.page, "successfully fetched transactions for page");
512
513 Ok(PaginatedResult {
514 items: items.results.clone(),
515 total,
516 page: query.page,
517 per_page: query.per_page,
518 })
519 }
520
521 async fn update(
522 &self,
523 id: String,
524 entity: TransactionRepoModel,
525 ) -> Result<TransactionRepoModel, RepositoryError> {
526 if id.is_empty() {
527 return Err(RepositoryError::InvalidData(
528 "Transaction ID cannot be empty".to_string(),
529 ));
530 }
531
532 debug!(tx_id = %id, "updating transaction");
533
534 let old_tx = self.get_by_id(id.clone()).await?;
536
537 let key = self.tx_key(&entity.relayer_id, &id);
538 let mut conn = self.client.as_ref().clone();
539
540 let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
541
542 let _: () = conn
544 .set(&key, value)
545 .await
546 .map_err(|e| self.map_redis_error(e, "update_transaction"))?;
547
548 self.update_indexes(&entity, Some(&old_tx)).await?;
550
551 debug!(tx_id = %id, "successfully updated transaction");
552 Ok(entity)
553 }
554
555 async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
556 if id.is_empty() {
557 return Err(RepositoryError::InvalidData(
558 "Transaction ID cannot be empty".to_string(),
559 ));
560 }
561
562 debug!(tx_id = %id, "deleting transaction");
563
564 let tx = self.get_by_id(id.clone()).await?;
566
567 let key = self.tx_key(&tx.relayer_id, &id);
568 let reverse_key = self.tx_to_relayer_key(&id);
569 let mut conn = self.client.as_ref().clone();
570
571 let mut pipe = redis::pipe();
572 pipe.atomic();
573 pipe.del(&key);
574 pipe.del(&reverse_key);
575
576 pipe.exec_async(&mut conn)
577 .await
578 .map_err(|e| self.map_redis_error(e, "delete_transaction"))?;
579
580 if let Err(e) = self.remove_all_indexes(&tx).await {
582 error!(tx_id = %id, error = %e, "failed to remove indexes for deleted transaction");
583 }
584
585 debug!(tx_id = %id, "successfully deleted transaction");
586 Ok(())
587 }
588
589 async fn count(&self) -> Result<usize, RepositoryError> {
590 let mut conn = self.client.as_ref().clone();
591
592 debug!("counting transactions");
593
594 let relayer_list_key = self.relayer_list_key();
596 let relayer_ids: Vec<String> = conn
597 .smembers(&relayer_list_key)
598 .await
599 .map_err(|e| self.map_redis_error(e, "count_relayer_ids"))?;
600
601 let mut total_count = 0;
603 for relayer_id in relayer_ids {
604 let pattern = format!(
605 "{}:{}:{}:{}:*",
606 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
607 );
608 let mut cursor = 0;
609 loop {
610 let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
611 .cursor_arg(cursor)
612 .arg("MATCH")
613 .arg(&pattern)
614 .query_async(&mut conn)
615 .await
616 .map_err(|e| self.map_redis_error(e, "count_scan_keys"))?;
617
618 total_count += keys.len();
619
620 cursor = next_cursor;
621 if cursor == 0 {
622 break;
623 }
624 }
625 }
626
627 debug!(count = %total_count, "transaction count");
628 Ok(total_count)
629 }
630
631 async fn has_entries(&self) -> Result<bool, RepositoryError> {
632 let mut conn = self.client.as_ref().clone();
633 let relayer_list_key = self.relayer_list_key();
634
635 debug!("checking if transaction entries exist");
636
637 let exists: bool = conn
638 .exists(&relayer_list_key)
639 .await
640 .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;
641
642 debug!(exists = %exists, "transaction entries exist");
643 Ok(exists)
644 }
645
646 async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
647 let mut conn = self.client.as_ref().clone();
648 let relayer_list_key = self.relayer_list_key();
649
650 debug!("dropping all transaction entries");
651
652 let relayer_ids: Vec<String> = conn
654 .smembers(&relayer_list_key)
655 .await
656 .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_relayer_ids"))?;
657
658 if relayer_ids.is_empty() {
659 debug!("no transaction entries to drop");
660 return Ok(());
661 }
662
663 let mut pipe = redis::pipe();
665 pipe.atomic();
666
667 for relayer_id in &relayer_ids {
669 let pattern = format!(
671 "{}:{}:{}:{}:*",
672 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
673 );
674 let mut cursor = 0;
675 let mut tx_ids = Vec::new();
676
677 loop {
678 let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
679 .cursor_arg(cursor)
680 .arg("MATCH")
681 .arg(&pattern)
682 .query_async(&mut conn)
683 .await
684 .map_err(|e| self.map_redis_error(e, "drop_all_entries_scan"))?;
685
686 for key in keys {
688 pipe.del(&key);
689 if let Some(tx_id) = key.split(':').next_back() {
690 tx_ids.push(tx_id.to_string());
691 }
692 }
693
694 cursor = next_cursor;
695 if cursor == 0 {
696 break;
697 }
698 }
699
700 for tx_id in tx_ids {
702 let reverse_key = self.tx_to_relayer_key(&tx_id);
703 pipe.del(&reverse_key);
704
705 for status in &[
707 TransactionStatus::Pending,
708 TransactionStatus::Sent,
709 TransactionStatus::Confirmed,
710 TransactionStatus::Failed,
711 TransactionStatus::Canceled,
712 ] {
713 let status_key = self.relayer_status_key(relayer_id, status);
714 pipe.srem(&status_key, &tx_id);
715 }
716 }
717 }
718
719 pipe.del(&relayer_list_key);
721
722 pipe.exec_async(&mut conn)
723 .await
724 .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;
725
726 debug!(count = %relayer_ids.len(), "dropped all transaction entries for relayers");
727 Ok(())
728 }
729}
730
731#[async_trait]
732impl TransactionRepository for RedisTransactionRepository {
733 async fn find_by_relayer_id(
734 &self,
735 relayer_id: &str,
736 query: PaginationQuery,
737 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
738 let mut conn = self.client.as_ref().clone();
739
740 let pattern = format!(
742 "{}:{}:{}:{}:*",
743 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
744 );
745 let mut all_tx_ids = Vec::new();
746 let mut cursor = 0;
747
748 loop {
749 let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
750 .cursor_arg(cursor)
751 .arg("MATCH")
752 .arg(&pattern)
753 .query_async(&mut conn)
754 .await
755 .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_scan"))?;
756
757 for key in keys {
759 if let Some(tx_id) = key.split(':').next_back() {
760 all_tx_ids.push(tx_id.to_string());
761 }
762 }
763
764 cursor = next_cursor;
765 if cursor == 0 {
766 break;
767 }
768 }
769
770 let total = all_tx_ids.len() as u64;
771 let start = ((query.page - 1) * query.per_page) as usize;
772 let end = (start + query.per_page as usize).min(all_tx_ids.len());
773
774 let page_ids = &all_tx_ids[start..end];
775 let items = self.get_transactions_by_ids(page_ids).await?;
776
777 Ok(PaginatedResult {
778 items: items.results.clone(),
779 total,
780 page: query.page,
781 per_page: query.per_page,
782 })
783 }
784
785 async fn find_by_status(
786 &self,
787 relayer_id: &str,
788 statuses: &[TransactionStatus],
789 ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
790 let mut conn = self.client.as_ref().clone();
791 let mut all_ids = Vec::new();
792
793 for status in statuses {
795 let status_key = self.relayer_status_key(relayer_id, status);
796 let ids: Vec<String> = conn
797 .smembers(status_key)
798 .await
799 .map_err(|e| self.map_redis_error(e, "find_by_status"))?;
800
801 all_ids.extend(ids);
802 }
803
804 all_ids.sort();
806 all_ids.dedup();
807
808 let transactions = self.get_transactions_by_ids(&all_ids).await?;
809 Ok(transactions.results)
810 }
811
812 async fn find_by_nonce(
813 &self,
814 relayer_id: &str,
815 nonce: u64,
816 ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
817 let mut conn = self.client.as_ref().clone();
818 let nonce_key = self.relayer_nonce_key(relayer_id, nonce);
819
820 let tx_id: Option<String> = conn
822 .get(nonce_key)
823 .await
824 .map_err(|e| self.map_redis_error(e, "find_by_nonce"))?;
825
826 match tx_id {
827 Some(tx_id) => {
828 match self.get_by_id(tx_id.clone()).await {
829 Ok(tx) => Ok(Some(tx)),
830 Err(RepositoryError::NotFound(_)) => {
831 warn!(relayer_id = %relayer_id, nonce = %nonce, "stale nonce index found for relayer");
833 Ok(None)
834 }
835 Err(e) => Err(e),
836 }
837 }
838 None => Ok(None),
839 }
840 }
841
842 async fn update_status(
843 &self,
844 tx_id: String,
845 status: TransactionStatus,
846 ) -> Result<TransactionRepoModel, RepositoryError> {
847 let update = TransactionUpdateRequest {
848 status: Some(status),
849 ..Default::default()
850 };
851 self.partial_update(tx_id, update).await
852 }
853
854 async fn partial_update(
855 &self,
856 tx_id: String,
857 update: TransactionUpdateRequest,
858 ) -> Result<TransactionRepoModel, RepositoryError> {
859 let mut tx = self.get_by_id(tx_id.clone()).await?;
861 let old_tx = tx.clone(); tx.apply_partial_update(update);
865
866 let key = self.tx_key(&tx.relayer_id, &tx_id);
868 let mut conn = self.client.as_ref().clone();
869
870 let value = self.serialize_entity(&tx, |t| &t.id, "transaction")?;
871
872 let _: () = conn
873 .set(&key, value)
874 .await
875 .map_err(|e| self.map_redis_error(e, "partial_update"))?;
876
877 self.update_indexes(&tx, Some(&old_tx)).await?;
878 Ok(tx)
879 }
880
881 async fn update_network_data(
882 &self,
883 tx_id: String,
884 network_data: NetworkTransactionData,
885 ) -> Result<TransactionRepoModel, RepositoryError> {
886 let update = TransactionUpdateRequest {
887 network_data: Some(network_data),
888 ..Default::default()
889 };
890 self.partial_update(tx_id, update).await
891 }
892
893 async fn set_sent_at(
894 &self,
895 tx_id: String,
896 sent_at: String,
897 ) -> Result<TransactionRepoModel, RepositoryError> {
898 let update = TransactionUpdateRequest {
899 sent_at: Some(sent_at),
900 ..Default::default()
901 };
902 self.partial_update(tx_id, update).await
903 }
904
905 async fn set_confirmed_at(
906 &self,
907 tx_id: String,
908 confirmed_at: String,
909 ) -> Result<TransactionRepoModel, RepositoryError> {
910 let update = TransactionUpdateRequest {
911 confirmed_at: Some(confirmed_at),
912 ..Default::default()
913 };
914 self.partial_update(tx_id, update).await
915 }
916}
917
918#[cfg(test)]
919mod tests {
920 use super::*;
921 use crate::models::{evm::Speed, EvmTransactionData, NetworkType};
922 use alloy::primitives::U256;
923 use lazy_static::lazy_static;
924 use redis::Client;
925 use std::str::FromStr;
926 use tokio;
927 use uuid::Uuid;
928
929 use tokio::sync::Mutex;
930
931 lazy_static! {
933 static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
934 }
935
936 fn create_test_transaction(id: &str) -> TransactionRepoModel {
938 TransactionRepoModel {
939 id: id.to_string(),
940 relayer_id: "relayer-1".to_string(),
941 status: TransactionStatus::Pending,
942 status_reason: None,
943 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
944 sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
945 confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
946 valid_until: None,
947 delete_at: None,
948 network_type: NetworkType::Evm,
949 priced_at: None,
950 hashes: vec![],
951 network_data: NetworkTransactionData::Evm(EvmTransactionData {
952 gas_price: Some(1000000000),
953 gas_limit: Some(21000),
954 nonce: Some(1),
955 value: U256::from_str("1000000000000000000").unwrap(),
956 data: Some("0x".to_string()),
957 from: "0xSender".to_string(),
958 to: Some("0xRecipient".to_string()),
959 chain_id: 1,
960 signature: None,
961 hash: Some(format!("0x{}", id)),
962 speed: Some(Speed::Fast),
963 max_fee_per_gas: None,
964 max_priority_fee_per_gas: None,
965 raw: None,
966 }),
967 noop_count: None,
968 is_canceled: Some(false),
969 }
970 }
971
972 fn create_test_transaction_with_relayer(id: &str, relayer_id: &str) -> TransactionRepoModel {
973 let mut tx = create_test_transaction(id);
974 tx.relayer_id = relayer_id.to_string();
975 tx
976 }
977
978 fn create_test_transaction_with_status(
979 id: &str,
980 relayer_id: &str,
981 status: TransactionStatus,
982 ) -> TransactionRepoModel {
983 let mut tx = create_test_transaction_with_relayer(id, relayer_id);
984 tx.status = status;
985 tx
986 }
987
988 fn create_test_transaction_with_nonce(
989 id: &str,
990 nonce: u64,
991 relayer_id: &str,
992 ) -> TransactionRepoModel {
993 let mut tx = create_test_transaction_with_relayer(id, relayer_id);
994 if let NetworkTransactionData::Evm(ref mut evm_data) = tx.network_data {
995 evm_data.nonce = Some(nonce);
996 }
997 tx
998 }
999
1000 async fn setup_test_repo() -> RedisTransactionRepository {
1001 let redis_url = std::env::var("REDIS_TEST_URL")
1003 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
1004
1005 let client = Client::open(redis_url).expect("Failed to create Redis client");
1006 let connection_manager = ConnectionManager::new(client)
1007 .await
1008 .expect("Failed to create connection manager");
1009
1010 let random_id = Uuid::new_v4().to_string();
1011 let key_prefix = format!("test_prefix:{}", random_id);
1012
1013 RedisTransactionRepository::new(Arc::new(connection_manager), key_prefix)
1014 .expect("Failed to create RedisTransactionRepository")
1015 }
1016
1017 #[tokio::test]
1018 #[ignore = "Requires active Redis instance"]
1019 async fn test_new_repository_creation() {
1020 let repo = setup_test_repo().await;
1021 assert!(repo.key_prefix.contains("test_prefix"));
1022 }
1023
1024 #[tokio::test]
1025 #[ignore = "Requires active Redis instance"]
1026 async fn test_new_repository_empty_prefix_fails() {
1027 let redis_url = std::env::var("REDIS_TEST_URL")
1028 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
1029 let client = Client::open(redis_url).expect("Failed to create Redis client");
1030 let connection_manager = ConnectionManager::new(client)
1031 .await
1032 .expect("Failed to create connection manager");
1033
1034 let result = RedisTransactionRepository::new(Arc::new(connection_manager), "".to_string());
1035 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1036 }
1037
1038 #[tokio::test]
1039 #[ignore = "Requires active Redis instance"]
1040 async fn test_key_generation() {
1041 let repo = setup_test_repo().await;
1042
1043 assert!(repo
1044 .tx_key("relayer-1", "test-id")
1045 .contains(":relayer:relayer-1:tx:test-id"));
1046 assert!(repo
1047 .tx_to_relayer_key("test-id")
1048 .contains(":relayer:tx_to_relayer:test-id"));
1049 assert!(repo.relayer_list_key().contains(":relayer_list"));
1050 assert!(repo
1051 .relayer_status_key("relayer-1", &TransactionStatus::Pending)
1052 .contains(":relayer:relayer-1:status:Pending"));
1053 assert!(repo
1054 .relayer_nonce_key("relayer-1", 42)
1055 .contains(":relayer:relayer-1:nonce:42"));
1056 }
1057
1058 #[tokio::test]
1059 #[ignore = "Requires active Redis instance"]
1060 async fn test_serialize_deserialize_transaction() {
1061 let repo = setup_test_repo().await;
1062 let tx = create_test_transaction("test-1");
1063
1064 let serialized = repo
1065 .serialize_entity(&tx, |t| &t.id, "transaction")
1066 .expect("Serialization should succeed");
1067 let deserialized: TransactionRepoModel = repo
1068 .deserialize_entity(&serialized, "test-1", "transaction")
1069 .expect("Deserialization should succeed");
1070
1071 assert_eq!(tx.id, deserialized.id);
1072 assert_eq!(tx.relayer_id, deserialized.relayer_id);
1073 assert_eq!(tx.status, deserialized.status);
1074 }
1075
1076 #[tokio::test]
1077 #[ignore = "Requires active Redis instance"]
1078 async fn test_extract_nonce() {
1079 let repo = setup_test_repo().await;
1080 let random_id = Uuid::new_v4().to_string();
1081 let relayer_id = Uuid::new_v4().to_string();
1082 let tx_with_nonce = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1083
1084 let nonce = repo.extract_nonce(&tx_with_nonce.network_data);
1085 assert_eq!(nonce, Some(42));
1086 }
1087
1088 #[tokio::test]
1089 #[ignore = "Requires active Redis instance"]
1090 async fn test_create_transaction() {
1091 let repo = setup_test_repo().await;
1092 let random_id = Uuid::new_v4().to_string();
1093 let tx = create_test_transaction(&random_id);
1094
1095 let result = repo.create(tx.clone()).await.unwrap();
1096 assert_eq!(result.id, tx.id);
1097 }
1098
1099 #[tokio::test]
1100 #[ignore = "Requires active Redis instance"]
1101 async fn test_get_transaction() {
1102 let repo = setup_test_repo().await;
1103 let random_id = Uuid::new_v4().to_string();
1104 let tx = create_test_transaction(&random_id);
1105
1106 repo.create(tx.clone()).await.unwrap();
1107 let stored = repo.get_by_id(random_id.to_string()).await.unwrap();
1108 assert_eq!(stored.id, tx.id);
1109 assert_eq!(stored.relayer_id, tx.relayer_id);
1110 }
1111
1112 #[tokio::test]
1113 #[ignore = "Requires active Redis instance"]
1114 async fn test_update_transaction() {
1115 let repo = setup_test_repo().await;
1116 let random_id = Uuid::new_v4().to_string();
1117 let mut tx = create_test_transaction(&random_id);
1118
1119 repo.create(tx.clone()).await.unwrap();
1120 tx.status = TransactionStatus::Confirmed;
1121
1122 let updated = repo.update(random_id.to_string(), tx).await.unwrap();
1123 assert!(matches!(updated.status, TransactionStatus::Confirmed));
1124 }
1125
1126 #[tokio::test]
1127 #[ignore = "Requires active Redis instance"]
1128 async fn test_delete_transaction() {
1129 let repo = setup_test_repo().await;
1130 let random_id = Uuid::new_v4().to_string();
1131 let tx = create_test_transaction(&random_id);
1132
1133 repo.create(tx).await.unwrap();
1134 repo.delete_by_id(random_id.to_string()).await.unwrap();
1135
1136 let result = repo.get_by_id(random_id.to_string()).await;
1137 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1138 }
1139
1140 #[tokio::test]
1141 #[ignore = "Requires active Redis instance"]
1142 async fn test_list_all_transactions() {
1143 let repo = setup_test_repo().await;
1144 let random_id = Uuid::new_v4().to_string();
1145 let random_id2 = Uuid::new_v4().to_string();
1146
1147 let tx1 = create_test_transaction(&random_id);
1148 let tx2 = create_test_transaction(&random_id2);
1149
1150 repo.create(tx1).await.unwrap();
1151 repo.create(tx2).await.unwrap();
1152
1153 let transactions = repo.list_all().await.unwrap();
1154 assert!(transactions.len() >= 2);
1155 }
1156
1157 #[tokio::test]
1158 #[ignore = "Requires active Redis instance"]
1159 async fn test_count_transactions() {
1160 let repo = setup_test_repo().await;
1161 let random_id = Uuid::new_v4().to_string();
1162 let tx = create_test_transaction(&random_id);
1163
1164 let count = repo.count().await.unwrap();
1165 repo.create(tx).await.unwrap();
1166 assert!(repo.count().await.unwrap() > count);
1167 }
1168
1169 #[tokio::test]
1170 #[ignore = "Requires active Redis instance"]
1171 async fn test_get_nonexistent_transaction() {
1172 let repo = setup_test_repo().await;
1173 let result = repo.get_by_id("nonexistent".to_string()).await;
1174 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1175 }
1176
1177 #[tokio::test]
1178 #[ignore = "Requires active Redis instance"]
1179 async fn test_duplicate_transaction_creation() {
1180 let repo = setup_test_repo().await;
1181 let random_id = Uuid::new_v4().to_string();
1182
1183 let tx = create_test_transaction(&random_id);
1184
1185 repo.create(tx.clone()).await.unwrap();
1186 let result = repo.create(tx).await;
1187
1188 assert!(matches!(
1189 result,
1190 Err(RepositoryError::ConstraintViolation(_))
1191 ));
1192 }
1193
1194 #[tokio::test]
1195 #[ignore = "Requires active Redis instance"]
1196 async fn test_update_nonexistent_transaction() {
1197 let repo = setup_test_repo().await;
1198 let tx = create_test_transaction("test-1");
1199
1200 let result = repo.update("nonexistent".to_string(), tx).await;
1201 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1202 }
1203
1204 #[tokio::test]
1205 #[ignore = "Requires active Redis instance"]
1206 async fn test_list_paginated() {
1207 let repo = setup_test_repo().await;
1208
1209 for _ in 1..=10 {
1211 let random_id = Uuid::new_v4().to_string();
1212 let tx = create_test_transaction(&random_id);
1213 repo.create(tx).await.unwrap();
1214 }
1215
1216 let query = PaginationQuery {
1218 page: 1,
1219 per_page: 3,
1220 };
1221 let result = repo.list_paginated(query).await.unwrap();
1222 assert_eq!(result.items.len(), 3);
1223 assert!(result.total >= 10);
1224 assert_eq!(result.page, 1);
1225 assert_eq!(result.per_page, 3);
1226
1227 let query = PaginationQuery {
1229 page: 1000,
1230 per_page: 3,
1231 };
1232 let result = repo.list_paginated(query).await.unwrap();
1233 assert_eq!(result.items.len(), 0);
1234 }
1235
1236 #[tokio::test]
1237 #[ignore = "Requires active Redis instance"]
1238 async fn test_find_by_relayer_id() {
1239 let repo = setup_test_repo().await;
1240 let random_id = Uuid::new_v4().to_string();
1241 let random_id2 = Uuid::new_v4().to_string();
1242 let random_id3 = Uuid::new_v4().to_string();
1243
1244 let tx1 = create_test_transaction_with_relayer(&random_id, "relayer-1");
1245 let tx2 = create_test_transaction_with_relayer(&random_id2, "relayer-1");
1246 let tx3 = create_test_transaction_with_relayer(&random_id3, "relayer-2");
1247
1248 repo.create(tx1).await.unwrap();
1249 repo.create(tx2).await.unwrap();
1250 repo.create(tx3).await.unwrap();
1251
1252 let query = PaginationQuery {
1254 page: 1,
1255 per_page: 10,
1256 };
1257 let result = repo
1258 .find_by_relayer_id("relayer-1", query.clone())
1259 .await
1260 .unwrap();
1261 assert!(result.total >= 2);
1262 assert!(result.items.len() >= 2);
1263 assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-1"));
1264
1265 let result = repo
1267 .find_by_relayer_id("relayer-2", query.clone())
1268 .await
1269 .unwrap();
1270 assert!(result.total >= 1);
1271 assert!(!result.items.is_empty());
1272 assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-2"));
1273
1274 let result = repo
1276 .find_by_relayer_id("non-existent", query.clone())
1277 .await
1278 .unwrap();
1279 assert_eq!(result.total, 0);
1280 assert_eq!(result.items.len(), 0);
1281 }
1282
1283 #[tokio::test]
1284 #[ignore = "Requires active Redis instance"]
1285 async fn test_find_by_status() {
1286 let repo = setup_test_repo().await;
1287 let random_id = Uuid::new_v4().to_string();
1288 let random_id2 = Uuid::new_v4().to_string();
1289 let random_id3 = Uuid::new_v4().to_string();
1290 let relayer_id = Uuid::new_v4().to_string();
1291 let tx1 = create_test_transaction_with_status(
1292 &random_id,
1293 &relayer_id,
1294 TransactionStatus::Pending,
1295 );
1296 let tx2 =
1297 create_test_transaction_with_status(&random_id2, &relayer_id, TransactionStatus::Sent);
1298 let tx3 = create_test_transaction_with_status(
1299 &random_id3,
1300 &relayer_id,
1301 TransactionStatus::Confirmed,
1302 );
1303
1304 repo.create(tx1).await.unwrap();
1305 repo.create(tx2).await.unwrap();
1306 repo.create(tx3).await.unwrap();
1307
1308 let result = repo
1310 .find_by_status(&relayer_id, &[TransactionStatus::Pending])
1311 .await
1312 .unwrap();
1313 assert_eq!(result.len(), 1);
1314 assert_eq!(result[0].status, TransactionStatus::Pending);
1315
1316 let result = repo
1318 .find_by_status(
1319 &relayer_id,
1320 &[TransactionStatus::Pending, TransactionStatus::Sent],
1321 )
1322 .await
1323 .unwrap();
1324 assert_eq!(result.len(), 2);
1325
1326 let result = repo
1328 .find_by_status(&relayer_id, &[TransactionStatus::Failed])
1329 .await
1330 .unwrap();
1331 assert_eq!(result.len(), 0);
1332 }
1333
1334 #[tokio::test]
1335 #[ignore = "Requires active Redis instance"]
1336 async fn test_find_by_nonce() {
1337 let repo = setup_test_repo().await;
1338 let random_id = Uuid::new_v4().to_string();
1339 let random_id2 = Uuid::new_v4().to_string();
1340 let relayer_id = Uuid::new_v4().to_string();
1341
1342 let tx1 = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1343 let tx2 = create_test_transaction_with_nonce(&random_id2, 43, &relayer_id);
1344
1345 repo.create(tx1.clone()).await.unwrap();
1346 repo.create(tx2).await.unwrap();
1347
1348 let result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
1350 assert!(result.is_some());
1351 assert_eq!(result.unwrap().id, random_id);
1352
1353 let result = repo.find_by_nonce(&relayer_id, 99).await.unwrap();
1355 assert!(result.is_none());
1356
1357 let result = repo.find_by_nonce("non-existent", 42).await.unwrap();
1359 assert!(result.is_none());
1360 }
1361
1362 #[tokio::test]
1363 #[ignore = "Requires active Redis instance"]
1364 async fn test_update_status() {
1365 let repo = setup_test_repo().await;
1366 let random_id = Uuid::new_v4().to_string();
1367 let tx = create_test_transaction(&random_id);
1368
1369 repo.create(tx).await.unwrap();
1370 let updated = repo
1371 .update_status(random_id.to_string(), TransactionStatus::Confirmed)
1372 .await
1373 .unwrap();
1374 assert_eq!(updated.status, TransactionStatus::Confirmed);
1375 }
1376
1377 #[tokio::test]
1378 #[ignore = "Requires active Redis instance"]
1379 async fn test_partial_update() {
1380 let repo = setup_test_repo().await;
1381 let random_id = Uuid::new_v4().to_string();
1382 let tx = create_test_transaction(&random_id);
1383
1384 repo.create(tx).await.unwrap();
1385
1386 let update = TransactionUpdateRequest {
1387 status: Some(TransactionStatus::Sent),
1388 status_reason: Some("Transaction sent".to_string()),
1389 sent_at: Some("2025-01-27T16:00:00.000000+00:00".to_string()),
1390 confirmed_at: None,
1391 network_data: None,
1392 hashes: None,
1393 is_canceled: None,
1394 priced_at: None,
1395 noop_count: None,
1396 delete_at: None,
1397 };
1398
1399 let updated = repo
1400 .partial_update(random_id.to_string(), update)
1401 .await
1402 .unwrap();
1403 assert_eq!(updated.status, TransactionStatus::Sent);
1404 assert_eq!(updated.status_reason, Some("Transaction sent".to_string()));
1405 assert_eq!(
1406 updated.sent_at,
1407 Some("2025-01-27T16:00:00.000000+00:00".to_string())
1408 );
1409 }
1410
1411 #[tokio::test]
1412 #[ignore = "Requires active Redis instance"]
1413 async fn test_set_sent_at() {
1414 let repo = setup_test_repo().await;
1415 let random_id = Uuid::new_v4().to_string();
1416 let tx = create_test_transaction(&random_id);
1417
1418 repo.create(tx).await.unwrap();
1419 let updated = repo
1420 .set_sent_at(
1421 random_id.to_string(),
1422 "2025-01-27T16:00:00.000000+00:00".to_string(),
1423 )
1424 .await
1425 .unwrap();
1426 assert_eq!(
1427 updated.sent_at,
1428 Some("2025-01-27T16:00:00.000000+00:00".to_string())
1429 );
1430 }
1431
1432 #[tokio::test]
1433 #[ignore = "Requires active Redis instance"]
1434 async fn test_set_confirmed_at() {
1435 let repo = setup_test_repo().await;
1436 let random_id = Uuid::new_v4().to_string();
1437 let tx = create_test_transaction(&random_id);
1438
1439 repo.create(tx).await.unwrap();
1440 let updated = repo
1441 .set_confirmed_at(
1442 random_id.to_string(),
1443 "2025-01-27T16:00:00.000000+00:00".to_string(),
1444 )
1445 .await
1446 .unwrap();
1447 assert_eq!(
1448 updated.confirmed_at,
1449 Some("2025-01-27T16:00:00.000000+00:00".to_string())
1450 );
1451 }
1452
1453 #[tokio::test]
1454 #[ignore = "Requires active Redis instance"]
1455 async fn test_update_network_data() {
1456 let repo = setup_test_repo().await;
1457 let random_id = Uuid::new_v4().to_string();
1458 let tx = create_test_transaction(&random_id);
1459
1460 repo.create(tx).await.unwrap();
1461
1462 let new_network_data = NetworkTransactionData::Evm(EvmTransactionData {
1463 gas_price: Some(2000000000),
1464 gas_limit: Some(42000),
1465 nonce: Some(2),
1466 value: U256::from_str("2000000000000000000").unwrap(),
1467 data: Some("0x1234".to_string()),
1468 from: "0xNewSender".to_string(),
1469 to: Some("0xNewRecipient".to_string()),
1470 chain_id: 1,
1471 signature: None,
1472 hash: Some("0xnewhash".to_string()),
1473 speed: Some(Speed::SafeLow),
1474 max_fee_per_gas: None,
1475 max_priority_fee_per_gas: None,
1476 raw: None,
1477 });
1478
1479 let updated = repo
1480 .update_network_data(random_id.to_string(), new_network_data.clone())
1481 .await
1482 .unwrap();
1483 assert_eq!(
1484 updated
1485 .network_data
1486 .get_evm_transaction_data()
1487 .unwrap()
1488 .hash,
1489 new_network_data.get_evm_transaction_data().unwrap().hash
1490 );
1491 }
1492
1493 #[tokio::test]
1494 #[ignore = "Requires active Redis instance"]
1495 async fn test_debug_implementation() {
1496 let repo = setup_test_repo().await;
1497 let debug_str = format!("{:?}", repo);
1498 assert!(debug_str.contains("RedisTransactionRepository"));
1499 assert!(debug_str.contains("test_prefix"));
1500 }
1501
1502 #[tokio::test]
1503 #[ignore = "Requires active Redis instance"]
1504 async fn test_error_handling_empty_id() {
1505 let repo = setup_test_repo().await;
1506
1507 let result = repo.get_by_id("".to_string()).await;
1508 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1509
1510 let result = repo
1511 .update("".to_string(), create_test_transaction("test"))
1512 .await;
1513 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1514
1515 let result = repo.delete_by_id("".to_string()).await;
1516 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1517 }
1518
1519 #[tokio::test]
1520 #[ignore = "Requires active Redis instance"]
1521 async fn test_pagination_validation() {
1522 let repo = setup_test_repo().await;
1523
1524 let query = PaginationQuery {
1525 page: 1,
1526 per_page: 0,
1527 };
1528 let result = repo.list_paginated(query).await;
1529 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1530 }
1531
1532 #[tokio::test]
1533 #[ignore = "Requires active Redis instance"]
1534 async fn test_index_consistency() {
1535 let repo = setup_test_repo().await;
1536 let random_id = Uuid::new_v4().to_string();
1537 let relayer_id = Uuid::new_v4().to_string();
1538 let tx = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1539
1540 repo.create(tx.clone()).await.unwrap();
1542
1543 let found = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
1545 assert!(found.is_some());
1546
1547 let mut updated_tx = tx.clone();
1549 if let NetworkTransactionData::Evm(ref mut evm_data) = updated_tx.network_data {
1550 evm_data.nonce = Some(43);
1551 }
1552
1553 repo.update(random_id.to_string(), updated_tx)
1554 .await
1555 .unwrap();
1556
1557 let old_nonce_result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
1559 assert!(old_nonce_result.is_none());
1560
1561 let new_nonce_result = repo.find_by_nonce(&relayer_id, 43).await.unwrap();
1563 assert!(new_nonce_result.is_some());
1564 }
1565
1566 #[tokio::test]
1567 #[ignore = "Requires active Redis instance"]
1568 async fn test_has_entries() {
1569 let repo = setup_test_repo().await;
1570 assert!(!repo.has_entries().await.unwrap());
1571
1572 let tx_id = uuid::Uuid::new_v4().to_string();
1573 let tx = create_test_transaction(&tx_id);
1574 repo.create(tx.clone()).await.unwrap();
1575
1576 assert!(repo.has_entries().await.unwrap());
1577 }
1578
1579 #[tokio::test]
1580 #[ignore = "Requires active Redis instance"]
1581 async fn test_drop_all_entries() {
1582 let repo = setup_test_repo().await;
1583 let tx_id = uuid::Uuid::new_v4().to_string();
1584 let tx = create_test_transaction(&tx_id);
1585 repo.create(tx.clone()).await.unwrap();
1586 assert!(repo.has_entries().await.unwrap());
1587
1588 repo.drop_all_entries().await.unwrap();
1589 assert!(!repo.has_entries().await.unwrap());
1590 }
1591
1592 #[tokio::test]
1594 #[ignore = "Requires active Redis instance"]
1595 async fn test_update_status_sets_delete_at_for_final_statuses() {
1596 let _lock = ENV_MUTEX.lock().await;
1597
1598 use chrono::{DateTime, Duration, Utc};
1599 use std::env;
1600
1601 env::set_var("TRANSACTION_EXPIRATION_HOURS", "6");
1603
1604 let repo = setup_test_repo().await;
1605
1606 let final_statuses = [
1607 TransactionStatus::Canceled,
1608 TransactionStatus::Confirmed,
1609 TransactionStatus::Failed,
1610 TransactionStatus::Expired,
1611 ];
1612
1613 for (i, status) in final_statuses.iter().enumerate() {
1614 let tx_id = format!("test-final-{}-{}", i, Uuid::new_v4());
1615 let mut tx = create_test_transaction(&tx_id);
1616
1617 tx.delete_at = None;
1619 tx.status = TransactionStatus::Pending;
1620
1621 repo.create(tx).await.unwrap();
1622
1623 let before_update = Utc::now();
1624
1625 let updated = repo
1627 .update_status(tx_id.clone(), status.clone())
1628 .await
1629 .unwrap();
1630
1631 assert!(
1633 updated.delete_at.is_some(),
1634 "delete_at should be set for status: {:?}",
1635 status
1636 );
1637
1638 let delete_at_str = updated.delete_at.unwrap();
1640 let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
1641 .expect("delete_at should be valid RFC3339")
1642 .with_timezone(&Utc);
1643
1644 let duration_from_before = delete_at.signed_duration_since(before_update);
1645 let expected_duration = Duration::hours(6);
1646 let tolerance = Duration::minutes(5);
1647
1648 assert!(
1649 duration_from_before >= expected_duration - tolerance &&
1650 duration_from_before <= expected_duration + tolerance,
1651 "delete_at should be approximately 6 hours from now for status: {:?}. Duration: {:?}",
1652 status, duration_from_before
1653 );
1654 }
1655
1656 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1658 }
1659
1660 #[tokio::test]
1661 #[ignore = "Requires active Redis instance"]
1662 async fn test_update_status_does_not_set_delete_at_for_non_final_statuses() {
1663 let _lock = ENV_MUTEX.lock().await;
1664
1665 use std::env;
1666
1667 env::set_var("TRANSACTION_EXPIRATION_HOURS", "4");
1668
1669 let repo = setup_test_repo().await;
1670
1671 let non_final_statuses = [
1672 TransactionStatus::Pending,
1673 TransactionStatus::Sent,
1674 TransactionStatus::Submitted,
1675 TransactionStatus::Mined,
1676 ];
1677
1678 for (i, status) in non_final_statuses.iter().enumerate() {
1679 let tx_id = format!("test-non-final-{}-{}", i, Uuid::new_v4());
1680 let mut tx = create_test_transaction(&tx_id);
1681 tx.delete_at = None;
1682 tx.status = TransactionStatus::Pending;
1683
1684 repo.create(tx).await.unwrap();
1685
1686 let updated = repo
1688 .update_status(tx_id.clone(), status.clone())
1689 .await
1690 .unwrap();
1691
1692 assert!(
1694 updated.delete_at.is_none(),
1695 "delete_at should NOT be set for status: {:?}",
1696 status
1697 );
1698 }
1699
1700 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1702 }
1703
1704 #[tokio::test]
1705 #[ignore = "Requires active Redis instance"]
1706 async fn test_partial_update_sets_delete_at_for_final_statuses() {
1707 let _lock = ENV_MUTEX.lock().await;
1708
1709 use chrono::{DateTime, Duration, Utc};
1710 use std::env;
1711
1712 env::set_var("TRANSACTION_EXPIRATION_HOURS", "8");
1713
1714 let repo = setup_test_repo().await;
1715 let tx_id = format!("test-partial-final-{}", Uuid::new_v4());
1716 let mut tx = create_test_transaction(&tx_id);
1717 tx.delete_at = None;
1718 tx.status = TransactionStatus::Pending;
1719
1720 repo.create(tx).await.unwrap();
1721
1722 let before_update = Utc::now();
1723
1724 let update = TransactionUpdateRequest {
1726 status: Some(TransactionStatus::Confirmed),
1727 status_reason: Some("Transaction completed".to_string()),
1728 confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
1729 ..Default::default()
1730 };
1731
1732 let updated = repo.partial_update(tx_id.clone(), update).await.unwrap();
1733
1734 assert!(
1736 updated.delete_at.is_some(),
1737 "delete_at should be set when updating to Confirmed status"
1738 );
1739
1740 let delete_at_str = updated.delete_at.unwrap();
1742 let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
1743 .expect("delete_at should be valid RFC3339")
1744 .with_timezone(&Utc);
1745
1746 let duration_from_before = delete_at.signed_duration_since(before_update);
1747 let expected_duration = Duration::hours(8);
1748 let tolerance = Duration::minutes(5);
1749
1750 assert!(
1751 duration_from_before >= expected_duration - tolerance
1752 && duration_from_before <= expected_duration + tolerance,
1753 "delete_at should be approximately 8 hours from now. Duration: {:?}",
1754 duration_from_before
1755 );
1756
1757 assert_eq!(updated.status, TransactionStatus::Confirmed);
1759 assert_eq!(
1760 updated.status_reason,
1761 Some("Transaction completed".to_string())
1762 );
1763 assert_eq!(
1764 updated.confirmed_at,
1765 Some("2023-01-01T12:05:00Z".to_string())
1766 );
1767
1768 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1770 }
1771
1772 #[tokio::test]
1773 #[ignore = "Requires active Redis instance"]
1774 async fn test_update_status_preserves_existing_delete_at() {
1775 let _lock = ENV_MUTEX.lock().await;
1776
1777 use std::env;
1778
1779 env::set_var("TRANSACTION_EXPIRATION_HOURS", "2");
1780
1781 let repo = setup_test_repo().await;
1782 let tx_id = format!("test-preserve-delete-at-{}", Uuid::new_v4());
1783 let mut tx = create_test_transaction(&tx_id);
1784
1785 let existing_delete_at = "2025-01-01T12:00:00Z".to_string();
1787 tx.delete_at = Some(existing_delete_at.clone());
1788 tx.status = TransactionStatus::Pending;
1789
1790 repo.create(tx).await.unwrap();
1791
1792 let updated = repo
1794 .update_status(tx_id.clone(), TransactionStatus::Confirmed)
1795 .await
1796 .unwrap();
1797
1798 assert_eq!(
1800 updated.delete_at,
1801 Some(existing_delete_at),
1802 "Existing delete_at should be preserved when updating to final status"
1803 );
1804
1805 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1807 }
1808 #[tokio::test]
1809 #[ignore = "Requires active Redis instance"]
1810 async fn test_partial_update_without_status_change_preserves_delete_at() {
1811 let _lock = ENV_MUTEX.lock().await;
1812
1813 use std::env;
1814
1815 env::set_var("TRANSACTION_EXPIRATION_HOURS", "3");
1816
1817 let repo = setup_test_repo().await;
1818 let tx_id = format!("test-preserve-no-status-{}", Uuid::new_v4());
1819 let mut tx = create_test_transaction(&tx_id);
1820 tx.delete_at = None;
1821 tx.status = TransactionStatus::Pending;
1822
1823 repo.create(tx).await.unwrap();
1824
1825 let updated1 = repo
1827 .update_status(tx_id.clone(), TransactionStatus::Confirmed)
1828 .await
1829 .unwrap();
1830
1831 assert!(updated1.delete_at.is_some());
1832 let original_delete_at = updated1.delete_at.clone();
1833
1834 let update = TransactionUpdateRequest {
1836 status: None, status_reason: Some("Updated reason".to_string()),
1838 confirmed_at: Some("2023-01-01T12:10:00Z".to_string()),
1839 ..Default::default()
1840 };
1841
1842 let updated2 = repo.partial_update(tx_id.clone(), update).await.unwrap();
1843
1844 assert_eq!(
1846 updated2.delete_at, original_delete_at,
1847 "delete_at should be preserved when status is not updated"
1848 );
1849
1850 assert_eq!(updated2.status, TransactionStatus::Confirmed); assert_eq!(updated2.status_reason, Some("Updated reason".to_string()));
1853 assert_eq!(
1854 updated2.confirmed_at,
1855 Some("2023-01-01T12:10:00Z".to_string())
1856 );
1857
1858 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1860 }
1861}