openzeppelin_relayer/repositories/transaction/transaction_redis.rs

1//! Redis-backed implementation of the TransactionRepository.
2
3use crate::models::{
4    NetworkTransactionData, PaginationQuery, RepositoryError, TransactionRepoModel,
5    TransactionStatus, TransactionUpdateRequest,
6};
7use crate::repositories::redis_base::RedisRepository;
8use crate::repositories::{
9    BatchRetrievalResult, PaginatedResult, Repository, TransactionRepository,
10};
11use async_trait::async_trait;
12use redis::aio::ConnectionManager;
13use redis::AsyncCommands;
14use std::fmt;
15use std::sync::Arc;
16use tracing::{debug, error, warn};
17
18const RELAYER_PREFIX: &str = "relayer";
19const TX_PREFIX: &str = "tx";
20const STATUS_PREFIX: &str = "status";
21const NONCE_PREFIX: &str = "nonce";
22const TX_TO_RELAYER_PREFIX: &str = "tx_to_relayer";
23const RELAYER_LIST_KEY: &str = "relayer_list";
24
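/// Redis-backed implementation of [`TransactionRepository`].
///
/// Key layout (every key is namespaced by `key_prefix`):
/// - `{prefix}:relayer:{relayer_id}:tx:{tx_id}` stores the serialized transaction JSON
/// - `{prefix}:relayer:tx_to_relayer:{tx_id}` maps a transaction ID back to its relayer ID
/// - `{prefix}:relayer:{relayer_id}:status:{status}` is a set of transaction IDs per status
/// - `{prefix}:relayer:{relayer_id}:nonce:{nonce}` holds the transaction ID using that EVM nonce
/// - `{prefix}:relayer_list` is a set of all relayer IDs known to this repository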
25#[derive(Clone)]
26pub struct RedisTransactionRepository {
27    pub client: Arc<ConnectionManager>,
28    pub key_prefix: String,
29}
30
31impl RedisRepository for RedisTransactionRepository {}
32
33impl RedisTransactionRepository {
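    /// Builds a repository on top of a shared [`ConnectionManager`]; the
    /// `key_prefix` must be non-empty and namespaces every key this repository
    /// writes.
    ///
    /// A construction sketch (assumes an async context and a reachable Redis
    /// server; the URL, the `oz-relayer` prefix, and the `tx-123` ID are
    /// illustrative, and error handling is elided):
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use redis::aio::ConnectionManager;
    ///
    /// let client = redis::Client::open("redis://127.0.0.1:6379")?;
    /// let manager = ConnectionManager::new(client).await?;
    /// let repo = RedisTransactionRepository::new(Arc::new(manager), "oz-relayer".to_string())?;
    /// let tx = repo.get_by_id("tx-123".to_string()).await?; // hypothetical transaction ID
    /// ```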
34    pub fn new(
35        connection_manager: Arc<ConnectionManager>,
36        key_prefix: String,
37    ) -> Result<Self, RepositoryError> {
38        if key_prefix.is_empty() {
39            return Err(RepositoryError::InvalidData(
40                "Redis key prefix cannot be empty".to_string(),
41            ));
42        }
43
44        Ok(Self {
45            client: connection_manager,
46            key_prefix,
47        })
48    }
49
50    /// Generate key for transaction data: relayer:{relayer_id}:tx:{tx_id}
51    fn tx_key(&self, relayer_id: &str, tx_id: &str) -> String {
52        format!(
53            "{}:{}:{}:{}:{}",
54            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX, tx_id
55        )
56    }
57
58    /// Generate key for reverse lookup: tx_to_relayer:{tx_id}
59    fn tx_to_relayer_key(&self, tx_id: &str) -> String {
60        format!(
61            "{}:{}:{}:{}",
62            self.key_prefix, RELAYER_PREFIX, TX_TO_RELAYER_PREFIX, tx_id
63        )
64    }
65
66    /// Generate key for relayer status index: relayer:{relayer_id}:status:{status}
67    fn relayer_status_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
68        format!(
69            "{}:{}:{}:{}:{}",
70            self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_PREFIX, status
71        )
72    }
73
74    /// Generate key for relayer nonce index: relayer:{relayer_id}:nonce:{nonce}
75    fn relayer_nonce_key(&self, relayer_id: &str, nonce: u64) -> String {
76        format!(
77            "{}:{}:{}:{}:{}",
78            self.key_prefix, RELAYER_PREFIX, relayer_id, NONCE_PREFIX, nonce
79        )
80    }
81
82    /// Generate key for relayer list: relayer_list (set of all relayer IDs)
83    fn relayer_list_key(&self) -> String {
84        format!("{}:{}", self.key_prefix, RELAYER_LIST_KEY)
85    }
86
87    /// Batch fetch transactions by IDs using reverse lookup
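    ///
    /// Resolves each ID to its relayer with one `MGET` over the reverse-lookup
    /// keys, then fetches the payloads with a second `MGET`. IDs that cannot be
    /// resolved or are missing are reported in `failed_ids`; payloads that fail
    /// to deserialize are logged and skipped.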
88    async fn get_transactions_by_ids(
89        &self,
90        ids: &[String],
91    ) -> Result<BatchRetrievalResult<TransactionRepoModel>, RepositoryError> {
92        if ids.is_empty() {
93            debug!("no transaction IDs provided for batch fetch");
94            return Ok(BatchRetrievalResult {
95                results: vec![],
96                failed_ids: vec![],
97            });
98        }
99
100        let mut conn = self.client.as_ref().clone();
101
102        let reverse_keys: Vec<String> = ids.iter().map(|id| self.tx_to_relayer_key(id)).collect();
103
104        debug!(count = %ids.len(), "fetching relayer IDs for transactions");
105
106        let relayer_ids: Vec<Option<String>> = conn
107            .mget(&reverse_keys)
108            .await
109            .map_err(|e| self.map_redis_error(e, "batch_fetch_relayer_ids"))?;
110
111        let mut tx_keys = Vec::new();
112        let mut valid_ids = Vec::new();
113        let mut failed_ids = Vec::new();
114        for (i, relayer_id) in relayer_ids.into_iter().enumerate() {
115            match relayer_id {
116                Some(relayer_id) => {
117                    tx_keys.push(self.tx_key(&relayer_id, &ids[i]));
118                    valid_ids.push(ids[i].clone());
119                }
120                None => {
121                    warn!(tx_id = %ids[i], "no relayer found for transaction");
122                    failed_ids.push(ids[i].clone());
123                }
124            }
125        }
126
127        if tx_keys.is_empty() {
128            debug!("no valid transactions found for batch fetch");
129            return Ok(BatchRetrievalResult {
130                results: vec![],
131                failed_ids,
132            });
133        }
134
135        debug!(count = %tx_keys.len(), "batch fetching transaction data");
136
137        let values: Vec<Option<String>> = conn
138            .mget(&tx_keys)
139            .await
140            .map_err(|e| self.map_redis_error(e, "batch_fetch_transactions"))?;
141
142        let mut transactions = Vec::new();
143        let mut failed_count = 0;
        // Keep appending to `failed_ids` from the reverse-lookup phase so those
        // failures are reported alongside payloads missing from the second MGET.
145        for (i, value) in values.into_iter().enumerate() {
146            match value {
147                Some(json) => {
148                    match self.deserialize_entity::<TransactionRepoModel>(
149                        &json,
150                        &valid_ids[i],
151                        "transaction",
152                    ) {
153                        Ok(tx) => transactions.push(tx),
154                        Err(e) => {
155                            failed_count += 1;
156                            error!(tx_id = %valid_ids[i], error = %e, "failed to deserialize transaction");
157                            // Continue processing other transactions
158                        }
159                    }
160                }
161                None => {
162                    warn!(tx_id = %valid_ids[i], "transaction not found in batch fetch");
163                    failed_ids.push(valid_ids[i].clone());
164                }
165            }
166        }
167
168        if failed_count > 0 {
169            warn!(failed_count = %failed_count, total_count = %valid_ids.len(), "failed to deserialize transactions in batch");
170        }
171
172        debug!(count = %transactions.len(), "successfully fetched transactions");
173        Ok(BatchRetrievalResult {
174            results: transactions,
175            failed_ids,
176        })
177    }
178
179    /// Extract nonce from EVM transaction data
180    fn extract_nonce(&self, network_data: &NetworkTransactionData) -> Option<u64> {
181        match network_data.get_evm_transaction_data() {
182            Ok(tx_data) => tx_data.nonce,
183            Err(_) => {
184                debug!("no EVM transaction data available for nonce extraction");
185                None
186            }
187        }
188    }
189
190    /// Update indexes atomically with comprehensive error handling
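    ///
    /// All writes go through one MULTI/EXEC pipeline: the relayer ID is added to
    /// the relayer list, the transaction ID is added to its status set, and the
    /// nonce key is written when the EVM data carries a nonce. When `old_tx` is
    /// given, stale status-set membership and nonce keys are removed in the same
    /// pipeline.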
191    async fn update_indexes(
192        &self,
193        tx: &TransactionRepoModel,
194        old_tx: Option<&TransactionRepoModel>,
195    ) -> Result<(), RepositoryError> {
196        let mut conn = self.client.as_ref().clone();
197        let mut pipe = redis::pipe();
198        pipe.atomic();
199
200        debug!(tx_id = %tx.id, "updating indexes for transaction");
201
202        // Add relayer to the global relayer list
203        let relayer_list_key = self.relayer_list_key();
204        pipe.sadd(&relayer_list_key, &tx.relayer_id);
205
206        // Handle status index updates
207        let new_status_key = self.relayer_status_key(&tx.relayer_id, &tx.status);
208        pipe.sadd(&new_status_key, &tx.id);
209
210        if let Some(nonce) = self.extract_nonce(&tx.network_data) {
211            let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
212            pipe.set(&nonce_key, &tx.id);
213            debug!(tx_id = %tx.id, nonce = %nonce, "added nonce index for transaction");
214        }
215
216        // Remove old indexes if updating
217        if let Some(old) = old_tx {
218            if old.status != tx.status {
219                let old_status_key = self.relayer_status_key(&old.relayer_id, &old.status);
220                pipe.srem(&old_status_key, &tx.id);
221                debug!(tx_id = %tx.id, old_status = %old.status, new_status = %tx.status, "removing old status index for transaction");
222            }
223
224            // Handle nonce index cleanup
225            if let Some(old_nonce) = self.extract_nonce(&old.network_data) {
226                let new_nonce = self.extract_nonce(&tx.network_data);
227                if Some(old_nonce) != new_nonce {
228                    let old_nonce_key = self.relayer_nonce_key(&old.relayer_id, old_nonce);
229                    pipe.del(&old_nonce_key);
230                    debug!(tx_id = %tx.id, old_nonce = %old_nonce, new_nonce = ?new_nonce, "removing old nonce index for transaction");
231                }
232            }
233        }
234
235        // Execute all operations in a single pipeline
236        pipe.exec_async(&mut conn).await.map_err(|e| {
237            error!(tx_id = %tx.id, error = %e, "index update pipeline failed for transaction");
238            self.map_redis_error(e, &format!("update_indexes_for_tx_{}", tx.id))
239        })?;
240
241        debug!(tx_id = %tx.id, "successfully updated indexes for transaction");
242        Ok(())
243    }
244
245    /// Remove all indexes with error recovery
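    ///
    /// Clears the status-set entry, the nonce key (if present), and the reverse
    /// lookup for the transaction in a single atomic pipeline.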
246    async fn remove_all_indexes(&self, tx: &TransactionRepoModel) -> Result<(), RepositoryError> {
247        let mut conn = self.client.as_ref().clone();
248        let mut pipe = redis::pipe();
249        pipe.atomic();
250
251        debug!(tx_id = %tx.id, "removing all indexes for transaction");
252
253        // Remove from status index
254        let status_key = self.relayer_status_key(&tx.relayer_id, &tx.status);
255        pipe.srem(&status_key, &tx.id);
256
257        // Remove nonce index if exists
258        if let Some(nonce) = self.extract_nonce(&tx.network_data) {
259            let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
260            pipe.del(&nonce_key);
261            debug!(tx_id = %tx.id, nonce = %nonce, "removing nonce index for transaction");
262        }
263
264        // Remove reverse lookup
265        let reverse_key = self.tx_to_relayer_key(&tx.id);
266        pipe.del(&reverse_key);
267
268        pipe.exec_async(&mut conn).await.map_err(|e| {
269            error!(tx_id = %tx.id, error = %e, "index removal failed for transaction");
270            self.map_redis_error(e, &format!("remove_indexes_for_tx_{}", tx.id))
271        })?;
272
273        debug!(tx_id = %tx.id, "successfully removed all indexes for transaction");
274        Ok(())
275    }
276}
277
278impl fmt::Debug for RedisTransactionRepository {
279    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
280        f.debug_struct("RedisTransactionRepository")
281            .field("client", &"<ConnectionManager>")
282            .field("key_prefix", &self.key_prefix)
283            .finish()
284    }
285}
286
287#[async_trait]
288impl Repository<TransactionRepoModel, String> for RedisTransactionRepository {
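    /// Stores a new transaction. Uniqueness is enforced through the
    /// `tx_to_relayer` reverse-lookup key: if it already exists, the call fails
    /// with `ConstraintViolation`. The payload and the reverse lookup are
    /// written in one atomic pipeline, and the secondary indexes are updated
    /// afterwards.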
289    async fn create(
290        &self,
291        entity: TransactionRepoModel,
292    ) -> Result<TransactionRepoModel, RepositoryError> {
293        if entity.id.is_empty() {
294            return Err(RepositoryError::InvalidData(
295                "Transaction ID cannot be empty".to_string(),
296            ));
297        }
298
299        let key = self.tx_key(&entity.relayer_id, &entity.id);
300        let reverse_key = self.tx_to_relayer_key(&entity.id);
301        let mut conn = self.client.as_ref().clone();
302
303        debug!(tx_id = %entity.id, "creating transaction");
304
305        let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
306
307        // Check if transaction already exists by checking reverse lookup
308        let existing: Option<String> = conn
309            .get(&reverse_key)
310            .await
311            .map_err(|e| self.map_redis_error(e, "create_transaction_check"))?;
312
313        if existing.is_some() {
314            return Err(RepositoryError::ConstraintViolation(format!(
315                "Transaction with ID {} already exists",
316                entity.id
317            )));
318        }
319
320        // Use atomic pipeline for consistency
321        let mut pipe = redis::pipe();
322        pipe.atomic();
323        pipe.set(&key, &value);
324        pipe.set(&reverse_key, &entity.relayer_id);
325
326        pipe.exec_async(&mut conn)
327            .await
328            .map_err(|e| self.map_redis_error(e, "create_transaction"))?;
329
330        // Update indexes separately to handle partial failures gracefully
331        if let Err(e) = self.update_indexes(&entity, None).await {
332            error!(tx_id = %entity.id, error = %e, "failed to update indexes for new transaction");
333            return Err(e);
334        }
335
336        debug!(tx_id = %entity.id, "successfully created transaction");
337        Ok(entity)
338    }
339
340    async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
341        if id.is_empty() {
342            return Err(RepositoryError::InvalidData(
343                "Transaction ID cannot be empty".to_string(),
344            ));
345        }
346
347        let mut conn = self.client.as_ref().clone();
348
349        debug!(tx_id = %id, "fetching transaction");
350
351        let reverse_key = self.tx_to_relayer_key(&id);
352        let relayer_id: Option<String> = conn
353            .get(&reverse_key)
354            .await
355            .map_err(|e| self.map_redis_error(e, "get_transaction_reverse_lookup"))?;
356
357        let relayer_id = match relayer_id {
358            Some(relayer_id) => relayer_id,
359            None => {
360                debug!(tx_id = %id, "transaction not found (no reverse lookup)");
361                return Err(RepositoryError::NotFound(format!(
362                    "Transaction with ID {} not found",
363                    id
364                )));
365            }
366        };
367
368        let key = self.tx_key(&relayer_id, &id);
369        let value: Option<String> = conn
370            .get(&key)
371            .await
372            .map_err(|e| self.map_redis_error(e, "get_transaction_by_id"))?;
373
374        match value {
375            Some(json) => {
376                let tx =
377                    self.deserialize_entity::<TransactionRepoModel>(&json, &id, "transaction")?;
378                debug!(tx_id = %id, "successfully fetched transaction");
379                Ok(tx)
380            }
381            None => {
382                debug!(tx_id = %id, "transaction not found");
383                Err(RepositoryError::NotFound(format!(
384                    "Transaction with ID {} not found",
385                    id
386                )))
387            }
388        }
389    }
390
391    async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
392        let mut conn = self.client.as_ref().clone();
393
394        debug!("fetching all transaction IDs");
395
396        // Get all relayer IDs
397        let relayer_list_key = self.relayer_list_key();
398        let relayer_ids: Vec<String> = conn
399            .smembers(&relayer_list_key)
400            .await
401            .map_err(|e| self.map_redis_error(e, "list_all_relayer_ids"))?;
402
403        debug!(count = %relayer_ids.len(), "found relayers");
404
405        // Collect all transaction IDs from all relayers
406        let mut all_tx_ids = Vec::new();
407        for relayer_id in relayer_ids {
408            let pattern = format!(
409                "{}:{}:{}:{}:*",
410                self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
411            );
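            // SCAN walks the keyspace incrementally (unlike KEYS it does not block
            // the server); iteration is complete once the returned cursor is 0.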
412            let mut cursor = 0;
413            loop {
414                let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
415                    .cursor_arg(cursor)
416                    .arg("MATCH")
417                    .arg(&pattern)
418                    .query_async(&mut conn)
419                    .await
420                    .map_err(|e| self.map_redis_error(e, "list_all_scan_keys"))?;
421
422                // Extract transaction IDs from keys
423                for key in keys {
424                    if let Some(tx_id) = key.split(':').next_back() {
425                        all_tx_ids.push(tx_id.to_string());
426                    }
427                }
428
429                cursor = next_cursor;
430                if cursor == 0 {
431                    break;
432                }
433            }
434        }
435
436        debug!(count = %all_tx_ids.len(), "found transaction IDs");
437
438        let transactions = self.get_transactions_by_ids(&all_tx_ids).await?;
439        Ok(transactions.results)
440    }
441
442    async fn list_paginated(
443        &self,
444        query: PaginationQuery,
445    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
446        if query.per_page == 0 {
447            return Err(RepositoryError::InvalidData(
448                "per_page must be greater than 0".to_string(),
449            ));
450        }
451
452        let mut conn = self.client.as_ref().clone();
453
454        debug!(page = %query.page, per_page = %query.per_page, "fetching paginated transactions");
455
456        // Get all relayer IDs
457        let relayer_list_key = self.relayer_list_key();
458        let relayer_ids: Vec<String> = conn
459            .smembers(&relayer_list_key)
460            .await
461            .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_ids"))?;
462
463        // Collect all transaction IDs from all relayers
464        let mut all_tx_ids = Vec::new();
465        for relayer_id in relayer_ids {
466            let pattern = format!(
467                "{}:{}:{}:{}:*",
468                self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
469            );
470            let mut cursor = 0;
471            loop {
472                let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
473                    .cursor_arg(cursor)
474                    .arg("MATCH")
475                    .arg(&pattern)
476                    .query_async(&mut conn)
477                    .await
478                    .map_err(|e| self.map_redis_error(e, "list_paginated_scan_keys"))?;
479
480                // Extract transaction IDs from keys
481                for key in keys {
482                    if let Some(tx_id) = key.split(':').next_back() {
483                        all_tx_ids.push(tx_id.to_string());
484                    }
485                }
486
487                cursor = next_cursor;
488                if cursor == 0 {
489                    break;
490                }
491            }
492        }
493
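        // Pagination is applied in memory over the collected IDs; `page` is 1-based.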
494        let total = all_tx_ids.len() as u64;
495        let start = ((query.page - 1) * query.per_page) as usize;
496        let end = (start + query.per_page as usize).min(all_tx_ids.len());
497
498        if start >= all_tx_ids.len() {
499            debug!(page = %query.page, total = %total, "page is beyond available data");
500            return Ok(PaginatedResult {
501                items: vec![],
502                total,
503                page: query.page,
504                per_page: query.per_page,
505            });
506        }
507
508        let page_ids = &all_tx_ids[start..end];
509        let items = self.get_transactions_by_ids(page_ids).await?;
510
511        debug!(count = %items.results.len(), page = %query.page, "successfully fetched transactions for page");
512
513        Ok(PaginatedResult {
514            items: items.results.clone(),
515            total,
516            page: query.page,
517            per_page: query.per_page,
518        })
519    }
520
521    async fn update(
522        &self,
523        id: String,
524        entity: TransactionRepoModel,
525    ) -> Result<TransactionRepoModel, RepositoryError> {
526        if id.is_empty() {
527            return Err(RepositoryError::InvalidData(
528                "Transaction ID cannot be empty".to_string(),
529            ));
530        }
531
532        debug!(tx_id = %id, "updating transaction");
533
534        // Get the old transaction for index cleanup
535        let old_tx = self.get_by_id(id.clone()).await?;
536
537        let key = self.tx_key(&entity.relayer_id, &id);
538        let mut conn = self.client.as_ref().clone();
539
540        let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
541
542        // Update transaction
543        let _: () = conn
544            .set(&key, value)
545            .await
546            .map_err(|e| self.map_redis_error(e, "update_transaction"))?;
547
548        // Update indexes
549        self.update_indexes(&entity, Some(&old_tx)).await?;
550
551        debug!(tx_id = %id, "successfully updated transaction");
552        Ok(entity)
553    }
554
555    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
556        if id.is_empty() {
557            return Err(RepositoryError::InvalidData(
558                "Transaction ID cannot be empty".to_string(),
559            ));
560        }
561
562        debug!(tx_id = %id, "deleting transaction");
563
564        // Get transaction first for index cleanup
565        let tx = self.get_by_id(id.clone()).await?;
566
567        let key = self.tx_key(&tx.relayer_id, &id);
568        let reverse_key = self.tx_to_relayer_key(&id);
569        let mut conn = self.client.as_ref().clone();
570
571        let mut pipe = redis::pipe();
572        pipe.atomic();
573        pipe.del(&key);
574        pipe.del(&reverse_key);
575
576        pipe.exec_async(&mut conn)
577            .await
578            .map_err(|e| self.map_redis_error(e, "delete_transaction"))?;
579
580        // Remove indexes (log errors but don't fail the delete)
581        if let Err(e) = self.remove_all_indexes(&tx).await {
582            error!(tx_id = %id, error = %e, "failed to remove indexes for deleted transaction");
583        }
584
585        debug!(tx_id = %id, "successfully deleted transaction");
586        Ok(())
587    }
588
589    async fn count(&self) -> Result<usize, RepositoryError> {
590        let mut conn = self.client.as_ref().clone();
591
592        debug!("counting transactions");
593
594        // Get all relayer IDs
595        let relayer_list_key = self.relayer_list_key();
596        let relayer_ids: Vec<String> = conn
597            .smembers(&relayer_list_key)
598            .await
599            .map_err(|e| self.map_redis_error(e, "count_relayer_ids"))?;
600
601        // Count transactions across all relayers
602        let mut total_count = 0;
603        for relayer_id in relayer_ids {
604            let pattern = format!(
605                "{}:{}:{}:{}:*",
606                self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
607            );
608            let mut cursor = 0;
609            loop {
610                let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
611                    .cursor_arg(cursor)
612                    .arg("MATCH")
613                    .arg(&pattern)
614                    .query_async(&mut conn)
615                    .await
616                    .map_err(|e| self.map_redis_error(e, "count_scan_keys"))?;
617
618                total_count += keys.len();
619
620                cursor = next_cursor;
621                if cursor == 0 {
622                    break;
623                }
624            }
625        }
626
627        debug!(count = %total_count, "transaction count");
628        Ok(total_count)
629    }
630
631    async fn has_entries(&self) -> Result<bool, RepositoryError> {
632        let mut conn = self.client.as_ref().clone();
633        let relayer_list_key = self.relayer_list_key();
634
635        debug!("checking if transaction entries exist");
636
637        let exists: bool = conn
638            .exists(&relayer_list_key)
639            .await
640            .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;
641
642        debug!(exists = %exists, "transaction entries exist");
643        Ok(exists)
644    }
645
646    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
647        let mut conn = self.client.as_ref().clone();
648        let relayer_list_key = self.relayer_list_key();
649
650        debug!("dropping all transaction entries");
651
652        // Get all relayer IDs first
653        let relayer_ids: Vec<String> = conn
654            .smembers(&relayer_list_key)
655            .await
656            .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_relayer_ids"))?;
657
658        if relayer_ids.is_empty() {
659            debug!("no transaction entries to drop");
660            return Ok(());
661        }
662
663        // Use pipeline for atomic operations
664        let mut pipe = redis::pipe();
665        pipe.atomic();
666
667        // Delete all transactions and their indexes for each relayer
668        for relayer_id in &relayer_ids {
669            // Get all transaction IDs for this relayer
670            let pattern = format!(
671                "{}:{}:{}:{}:*",
672                self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
673            );
674            let mut cursor = 0;
675            let mut tx_ids = Vec::new();
676
677            loop {
678                let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
679                    .cursor_arg(cursor)
680                    .arg("MATCH")
681                    .arg(&pattern)
682                    .query_async(&mut conn)
683                    .await
684                    .map_err(|e| self.map_redis_error(e, "drop_all_entries_scan"))?;
685
686                // Extract transaction IDs from keys and delete keys
687                for key in keys {
688                    pipe.del(&key);
689                    if let Some(tx_id) = key.split(':').next_back() {
690                        tx_ids.push(tx_id.to_string());
691                    }
692                }
693
694                cursor = next_cursor;
695                if cursor == 0 {
696                    break;
697                }
698            }
699
700            // Delete reverse lookup keys and indexes
701            for tx_id in tx_ids {
702                let reverse_key = self.tx_to_relayer_key(&tx_id);
703                pipe.del(&reverse_key);
704
705                // Delete status indexes (we can't know the specific status, so we'll clean up known ones)
706                for status in &[
707                    TransactionStatus::Pending,
708                    TransactionStatus::Sent,
709                    TransactionStatus::Confirmed,
710                    TransactionStatus::Failed,
711                    TransactionStatus::Canceled,
712                ] {
713                    let status_key = self.relayer_status_key(relayer_id, status);
714                    pipe.srem(&status_key, &tx_id);
715                }
716            }
717        }
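        // Nonce index keys are left in place here; `find_by_nonce` treats a dangling
        // nonce entry as stale and returns `None`, so leftovers are tolerated.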
718
719        // Delete the relayer list key
720        pipe.del(&relayer_list_key);
721
722        pipe.exec_async(&mut conn)
723            .await
724            .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;
725
726        debug!(count = %relayer_ids.len(), "dropped all transaction entries for relayers");
727        Ok(())
728    }
729}
730
731#[async_trait]
732impl TransactionRepository for RedisTransactionRepository {
733    async fn find_by_relayer_id(
734        &self,
735        relayer_id: &str,
736        query: PaginationQuery,
737    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
738        let mut conn = self.client.as_ref().clone();
739
740        // Scan for all transaction keys for this relayer
741        let pattern = format!(
742            "{}:{}:{}:{}:*",
743            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
744        );
745        let mut all_tx_ids = Vec::new();
746        let mut cursor = 0;
747
748        loop {
749            let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
750                .cursor_arg(cursor)
751                .arg("MATCH")
752                .arg(&pattern)
753                .query_async(&mut conn)
754                .await
755                .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_scan"))?;
756
757            // Extract transaction IDs from keys
758            for key in keys {
759                if let Some(tx_id) = key.split(':').next_back() {
760                    all_tx_ids.push(tx_id.to_string());
761                }
762            }
763
764            cursor = next_cursor;
765            if cursor == 0 {
766                break;
767            }
768        }
769
770        let total = all_tx_ids.len() as u64;
771        let start = ((query.page - 1) * query.per_page) as usize;
772        let end = (start + query.per_page as usize).min(all_tx_ids.len());

        // Return an empty page instead of panicking when `start` is past the end of
        // the collected IDs (mirrors the guard in `list_paginated`).
        if start >= all_tx_ids.len() {
            return Ok(PaginatedResult {
                items: vec![],
                total,
                page: query.page,
                per_page: query.per_page,
            });
        }

        let page_ids = &all_tx_ids[start..end];
775        let items = self.get_transactions_by_ids(page_ids).await?;
776
777        Ok(PaginatedResult {
778            items: items.results.clone(),
779            total,
780            page: query.page,
781            per_page: query.per_page,
782        })
783    }
784
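    /// Looks up transactions through the per-relayer status sets maintained by
    /// `update_indexes`, de-duplicates the IDs, and batch-fetches the records.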
785    async fn find_by_status(
786        &self,
787        relayer_id: &str,
788        statuses: &[TransactionStatus],
789    ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
790        let mut conn = self.client.as_ref().clone();
791        let mut all_ids = Vec::new();
792
793        // Collect IDs from all status sets
794        for status in statuses {
795            let status_key = self.relayer_status_key(relayer_id, status);
796            let ids: Vec<String> = conn
797                .smembers(status_key)
798                .await
799                .map_err(|e| self.map_redis_error(e, "find_by_status"))?;
800
801            all_ids.extend(ids);
802        }
803
804        // Remove duplicates and batch fetch
805        all_ids.sort();
806        all_ids.dedup();
807
808        let transactions = self.get_transactions_by_ids(&all_ids).await?;
809        Ok(transactions.results)
810    }
811
812    async fn find_by_nonce(
813        &self,
814        relayer_id: &str,
815        nonce: u64,
816    ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
817        let mut conn = self.client.as_ref().clone();
818        let nonce_key = self.relayer_nonce_key(relayer_id, nonce);
819
820        // Get transaction ID with this nonce for this relayer (should be single value)
821        let tx_id: Option<String> = conn
822            .get(nonce_key)
823            .await
824            .map_err(|e| self.map_redis_error(e, "find_by_nonce"))?;
825
826        match tx_id {
827            Some(tx_id) => {
828                match self.get_by_id(tx_id.clone()).await {
829                    Ok(tx) => Ok(Some(tx)),
830                    Err(RepositoryError::NotFound(_)) => {
831                        // Transaction was deleted but index wasn't cleaned up
832                        warn!(relayer_id = %relayer_id, nonce = %nonce, "stale nonce index found for relayer");
833                        Ok(None)
834                    }
835                    Err(e) => Err(e),
836                }
837            }
838            None => Ok(None),
839        }
840    }
841
842    async fn update_status(
843        &self,
844        tx_id: String,
845        status: TransactionStatus,
846    ) -> Result<TransactionRepoModel, RepositoryError> {
847        let update = TransactionUpdateRequest {
848            status: Some(status),
849            ..Default::default()
850        };
851        self.partial_update(tx_id, update).await
852    }
853
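    /// Loads the current transaction, applies the requested field changes via
    /// `apply_partial_update`, persists the result, and refreshes the
    /// status/nonce indexes against the previous state.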
854    async fn partial_update(
855        &self,
856        tx_id: String,
857        update: TransactionUpdateRequest,
858    ) -> Result<TransactionRepoModel, RepositoryError> {
859        // Get current transaction
860        let mut tx = self.get_by_id(tx_id.clone()).await?;
861        let old_tx = tx.clone(); // Keep copy for index updates
862
863        // Apply partial updates using the model's business logic
864        tx.apply_partial_update(update);
865
866        // Update transaction and indexes atomically
867        let key = self.tx_key(&tx.relayer_id, &tx_id);
868        let mut conn = self.client.as_ref().clone();
869
870        let value = self.serialize_entity(&tx, |t| &t.id, "transaction")?;
871
872        let _: () = conn
873            .set(&key, value)
874            .await
875            .map_err(|e| self.map_redis_error(e, "partial_update"))?;
876
877        self.update_indexes(&tx, Some(&old_tx)).await?;
878        Ok(tx)
879    }
880
881    async fn update_network_data(
882        &self,
883        tx_id: String,
884        network_data: NetworkTransactionData,
885    ) -> Result<TransactionRepoModel, RepositoryError> {
886        let update = TransactionUpdateRequest {
887            network_data: Some(network_data),
888            ..Default::default()
889        };
890        self.partial_update(tx_id, update).await
891    }
892
893    async fn set_sent_at(
894        &self,
895        tx_id: String,
896        sent_at: String,
897    ) -> Result<TransactionRepoModel, RepositoryError> {
898        let update = TransactionUpdateRequest {
899            sent_at: Some(sent_at),
900            ..Default::default()
901        };
902        self.partial_update(tx_id, update).await
903    }
904
905    async fn set_confirmed_at(
906        &self,
907        tx_id: String,
908        confirmed_at: String,
909    ) -> Result<TransactionRepoModel, RepositoryError> {
910        let update = TransactionUpdateRequest {
911            confirmed_at: Some(confirmed_at),
912            ..Default::default()
913        };
914        self.partial_update(tx_id, update).await
915    }
916}
917
918#[cfg(test)]
919mod tests {
920    use super::*;
921    use crate::models::{evm::Speed, EvmTransactionData, NetworkType};
922    use alloy::primitives::U256;
923    use lazy_static::lazy_static;
924    use redis::Client;
925    use std::str::FromStr;
926    use tokio;
927    use uuid::Uuid;
928
929    use tokio::sync::Mutex;
930
931    // Use a mutex to ensure tests don't run in parallel when modifying env vars
932    lazy_static! {
933        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
934    }
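
    // These tests are marked #[ignore] because they need a live Redis server.
    // They read REDIS_TEST_URL (defaulting to redis://127.0.0.1:6379) and can be
    // run with something like:
    //
    //     REDIS_TEST_URL=redis://127.0.0.1:6379 cargo test -- --ignored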
935
936    // Helper function to create test transactions
937    fn create_test_transaction(id: &str) -> TransactionRepoModel {
938        TransactionRepoModel {
939            id: id.to_string(),
940            relayer_id: "relayer-1".to_string(),
941            status: TransactionStatus::Pending,
942            status_reason: None,
943            created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
944            sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
945            confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
946            valid_until: None,
947            delete_at: None,
948            network_type: NetworkType::Evm,
949            priced_at: None,
950            hashes: vec![],
951            network_data: NetworkTransactionData::Evm(EvmTransactionData {
952                gas_price: Some(1000000000),
953                gas_limit: Some(21000),
954                nonce: Some(1),
955                value: U256::from_str("1000000000000000000").unwrap(),
956                data: Some("0x".to_string()),
957                from: "0xSender".to_string(),
958                to: Some("0xRecipient".to_string()),
959                chain_id: 1,
960                signature: None,
961                hash: Some(format!("0x{}", id)),
962                speed: Some(Speed::Fast),
963                max_fee_per_gas: None,
964                max_priority_fee_per_gas: None,
965                raw: None,
966            }),
967            noop_count: None,
968            is_canceled: Some(false),
969        }
970    }
971
972    fn create_test_transaction_with_relayer(id: &str, relayer_id: &str) -> TransactionRepoModel {
973        let mut tx = create_test_transaction(id);
974        tx.relayer_id = relayer_id.to_string();
975        tx
976    }
977
978    fn create_test_transaction_with_status(
979        id: &str,
980        relayer_id: &str,
981        status: TransactionStatus,
982    ) -> TransactionRepoModel {
983        let mut tx = create_test_transaction_with_relayer(id, relayer_id);
984        tx.status = status;
985        tx
986    }
987
988    fn create_test_transaction_with_nonce(
989        id: &str,
990        nonce: u64,
991        relayer_id: &str,
992    ) -> TransactionRepoModel {
993        let mut tx = create_test_transaction_with_relayer(id, relayer_id);
994        if let NetworkTransactionData::Evm(ref mut evm_data) = tx.network_data {
995            evm_data.nonce = Some(nonce);
996        }
997        tx
998    }
999
1000    async fn setup_test_repo() -> RedisTransactionRepository {
        // Use REDIS_TEST_URL when set; otherwise fall back to a local test Redis instance.
1002        let redis_url = std::env::var("REDIS_TEST_URL")
1003            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
1004
1005        let client = Client::open(redis_url).expect("Failed to create Redis client");
1006        let connection_manager = ConnectionManager::new(client)
1007            .await
1008            .expect("Failed to create connection manager");
1009
1010        let random_id = Uuid::new_v4().to_string();
1011        let key_prefix = format!("test_prefix:{}", random_id);
1012
1013        RedisTransactionRepository::new(Arc::new(connection_manager), key_prefix)
1014            .expect("Failed to create RedisTransactionRepository")
1015    }
1016
1017    #[tokio::test]
1018    #[ignore = "Requires active Redis instance"]
1019    async fn test_new_repository_creation() {
1020        let repo = setup_test_repo().await;
1021        assert!(repo.key_prefix.contains("test_prefix"));
1022    }
1023
1024    #[tokio::test]
1025    #[ignore = "Requires active Redis instance"]
1026    async fn test_new_repository_empty_prefix_fails() {
1027        let redis_url = std::env::var("REDIS_TEST_URL")
1028            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
1029        let client = Client::open(redis_url).expect("Failed to create Redis client");
1030        let connection_manager = ConnectionManager::new(client)
1031            .await
1032            .expect("Failed to create connection manager");
1033
1034        let result = RedisTransactionRepository::new(Arc::new(connection_manager), "".to_string());
1035        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1036    }
1037
1038    #[tokio::test]
1039    #[ignore = "Requires active Redis instance"]
1040    async fn test_key_generation() {
1041        let repo = setup_test_repo().await;
1042
1043        assert!(repo
1044            .tx_key("relayer-1", "test-id")
1045            .contains(":relayer:relayer-1:tx:test-id"));
1046        assert!(repo
1047            .tx_to_relayer_key("test-id")
1048            .contains(":relayer:tx_to_relayer:test-id"));
1049        assert!(repo.relayer_list_key().contains(":relayer_list"));
1050        assert!(repo
1051            .relayer_status_key("relayer-1", &TransactionStatus::Pending)
1052            .contains(":relayer:relayer-1:status:Pending"));
1053        assert!(repo
1054            .relayer_nonce_key("relayer-1", 42)
1055            .contains(":relayer:relayer-1:nonce:42"));
1056    }
1057
1058    #[tokio::test]
1059    #[ignore = "Requires active Redis instance"]
1060    async fn test_serialize_deserialize_transaction() {
1061        let repo = setup_test_repo().await;
1062        let tx = create_test_transaction("test-1");
1063
1064        let serialized = repo
1065            .serialize_entity(&tx, |t| &t.id, "transaction")
1066            .expect("Serialization should succeed");
1067        let deserialized: TransactionRepoModel = repo
1068            .deserialize_entity(&serialized, "test-1", "transaction")
1069            .expect("Deserialization should succeed");
1070
1071        assert_eq!(tx.id, deserialized.id);
1072        assert_eq!(tx.relayer_id, deserialized.relayer_id);
1073        assert_eq!(tx.status, deserialized.status);
1074    }
1075
1076    #[tokio::test]
1077    #[ignore = "Requires active Redis instance"]
1078    async fn test_extract_nonce() {
1079        let repo = setup_test_repo().await;
1080        let random_id = Uuid::new_v4().to_string();
1081        let relayer_id = Uuid::new_v4().to_string();
1082        let tx_with_nonce = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1083
1084        let nonce = repo.extract_nonce(&tx_with_nonce.network_data);
1085        assert_eq!(nonce, Some(42));
1086    }
1087
1088    #[tokio::test]
1089    #[ignore = "Requires active Redis instance"]
1090    async fn test_create_transaction() {
1091        let repo = setup_test_repo().await;
1092        let random_id = Uuid::new_v4().to_string();
1093        let tx = create_test_transaction(&random_id);
1094
1095        let result = repo.create(tx.clone()).await.unwrap();
1096        assert_eq!(result.id, tx.id);
1097    }
1098
1099    #[tokio::test]
1100    #[ignore = "Requires active Redis instance"]
1101    async fn test_get_transaction() {
1102        let repo = setup_test_repo().await;
1103        let random_id = Uuid::new_v4().to_string();
1104        let tx = create_test_transaction(&random_id);
1105
1106        repo.create(tx.clone()).await.unwrap();
1107        let stored = repo.get_by_id(random_id.to_string()).await.unwrap();
1108        assert_eq!(stored.id, tx.id);
1109        assert_eq!(stored.relayer_id, tx.relayer_id);
1110    }
1111
1112    #[tokio::test]
1113    #[ignore = "Requires active Redis instance"]
1114    async fn test_update_transaction() {
1115        let repo = setup_test_repo().await;
1116        let random_id = Uuid::new_v4().to_string();
1117        let mut tx = create_test_transaction(&random_id);
1118
1119        repo.create(tx.clone()).await.unwrap();
1120        tx.status = TransactionStatus::Confirmed;
1121
1122        let updated = repo.update(random_id.to_string(), tx).await.unwrap();
1123        assert!(matches!(updated.status, TransactionStatus::Confirmed));
1124    }
1125
1126    #[tokio::test]
1127    #[ignore = "Requires active Redis instance"]
1128    async fn test_delete_transaction() {
1129        let repo = setup_test_repo().await;
1130        let random_id = Uuid::new_v4().to_string();
1131        let tx = create_test_transaction(&random_id);
1132
1133        repo.create(tx).await.unwrap();
1134        repo.delete_by_id(random_id.to_string()).await.unwrap();
1135
1136        let result = repo.get_by_id(random_id.to_string()).await;
1137        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1138    }
1139
1140    #[tokio::test]
1141    #[ignore = "Requires active Redis instance"]
1142    async fn test_list_all_transactions() {
1143        let repo = setup_test_repo().await;
1144        let random_id = Uuid::new_v4().to_string();
1145        let random_id2 = Uuid::new_v4().to_string();
1146
1147        let tx1 = create_test_transaction(&random_id);
1148        let tx2 = create_test_transaction(&random_id2);
1149
1150        repo.create(tx1).await.unwrap();
1151        repo.create(tx2).await.unwrap();
1152
1153        let transactions = repo.list_all().await.unwrap();
1154        assert!(transactions.len() >= 2);
1155    }
1156
1157    #[tokio::test]
1158    #[ignore = "Requires active Redis instance"]
1159    async fn test_count_transactions() {
1160        let repo = setup_test_repo().await;
1161        let random_id = Uuid::new_v4().to_string();
1162        let tx = create_test_transaction(&random_id);
1163
1164        let count = repo.count().await.unwrap();
1165        repo.create(tx).await.unwrap();
1166        assert!(repo.count().await.unwrap() > count);
1167    }
1168
1169    #[tokio::test]
1170    #[ignore = "Requires active Redis instance"]
1171    async fn test_get_nonexistent_transaction() {
1172        let repo = setup_test_repo().await;
1173        let result = repo.get_by_id("nonexistent".to_string()).await;
1174        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1175    }
1176
1177    #[tokio::test]
1178    #[ignore = "Requires active Redis instance"]
1179    async fn test_duplicate_transaction_creation() {
1180        let repo = setup_test_repo().await;
1181        let random_id = Uuid::new_v4().to_string();
1182
1183        let tx = create_test_transaction(&random_id);
1184
1185        repo.create(tx.clone()).await.unwrap();
1186        let result = repo.create(tx).await;
1187
1188        assert!(matches!(
1189            result,
1190            Err(RepositoryError::ConstraintViolation(_))
1191        ));
1192    }
1193
1194    #[tokio::test]
1195    #[ignore = "Requires active Redis instance"]
1196    async fn test_update_nonexistent_transaction() {
1197        let repo = setup_test_repo().await;
1198        let tx = create_test_transaction("test-1");
1199
1200        let result = repo.update("nonexistent".to_string(), tx).await;
1201        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1202    }
1203
1204    #[tokio::test]
1205    #[ignore = "Requires active Redis instance"]
1206    async fn test_list_paginated() {
1207        let repo = setup_test_repo().await;
1208
1209        // Create multiple transactions
1210        for _ in 1..=10 {
1211            let random_id = Uuid::new_v4().to_string();
1212            let tx = create_test_transaction(&random_id);
1213            repo.create(tx).await.unwrap();
1214        }
1215
1216        // Test first page with 3 items per page
1217        let query = PaginationQuery {
1218            page: 1,
1219            per_page: 3,
1220        };
1221        let result = repo.list_paginated(query).await.unwrap();
1222        assert_eq!(result.items.len(), 3);
1223        assert!(result.total >= 10);
1224        assert_eq!(result.page, 1);
1225        assert_eq!(result.per_page, 3);
1226
1227        // Test empty page (beyond total items)
1228        let query = PaginationQuery {
1229            page: 1000,
1230            per_page: 3,
1231        };
1232        let result = repo.list_paginated(query).await.unwrap();
1233        assert_eq!(result.items.len(), 0);
1234    }
1235
1236    #[tokio::test]
1237    #[ignore = "Requires active Redis instance"]
1238    async fn test_find_by_relayer_id() {
1239        let repo = setup_test_repo().await;
1240        let random_id = Uuid::new_v4().to_string();
1241        let random_id2 = Uuid::new_v4().to_string();
1242        let random_id3 = Uuid::new_v4().to_string();
1243
1244        let tx1 = create_test_transaction_with_relayer(&random_id, "relayer-1");
1245        let tx2 = create_test_transaction_with_relayer(&random_id2, "relayer-1");
1246        let tx3 = create_test_transaction_with_relayer(&random_id3, "relayer-2");
1247
1248        repo.create(tx1).await.unwrap();
1249        repo.create(tx2).await.unwrap();
1250        repo.create(tx3).await.unwrap();
1251
1252        // Test finding transactions for relayer-1
1253        let query = PaginationQuery {
1254            page: 1,
1255            per_page: 10,
1256        };
1257        let result = repo
1258            .find_by_relayer_id("relayer-1", query.clone())
1259            .await
1260            .unwrap();
1261        assert!(result.total >= 2);
1262        assert!(result.items.len() >= 2);
1263        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-1"));
1264
1265        // Test finding transactions for relayer-2
1266        let result = repo
1267            .find_by_relayer_id("relayer-2", query.clone())
1268            .await
1269            .unwrap();
1270        assert!(result.total >= 1);
1271        assert!(!result.items.is_empty());
1272        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-2"));
1273
1274        // Test finding transactions for non-existent relayer
1275        let result = repo
1276            .find_by_relayer_id("non-existent", query.clone())
1277            .await
1278            .unwrap();
1279        assert_eq!(result.total, 0);
1280        assert_eq!(result.items.len(), 0);
1281    }
1282
1283    #[tokio::test]
1284    #[ignore = "Requires active Redis instance"]
1285    async fn test_find_by_status() {
1286        let repo = setup_test_repo().await;
1287        let random_id = Uuid::new_v4().to_string();
1288        let random_id2 = Uuid::new_v4().to_string();
1289        let random_id3 = Uuid::new_v4().to_string();
1290        let relayer_id = Uuid::new_v4().to_string();
1291        let tx1 = create_test_transaction_with_status(
1292            &random_id,
1293            &relayer_id,
1294            TransactionStatus::Pending,
1295        );
1296        let tx2 =
1297            create_test_transaction_with_status(&random_id2, &relayer_id, TransactionStatus::Sent);
1298        let tx3 = create_test_transaction_with_status(
1299            &random_id3,
1300            &relayer_id,
1301            TransactionStatus::Confirmed,
1302        );
1303
1304        repo.create(tx1).await.unwrap();
1305        repo.create(tx2).await.unwrap();
1306        repo.create(tx3).await.unwrap();
1307
1308        // Test finding pending transactions
1309        let result = repo
1310            .find_by_status(&relayer_id, &[TransactionStatus::Pending])
1311            .await
1312            .unwrap();
1313        assert_eq!(result.len(), 1);
1314        assert_eq!(result[0].status, TransactionStatus::Pending);
1315
1316        // Test finding multiple statuses
1317        let result = repo
1318            .find_by_status(
1319                &relayer_id,
1320                &[TransactionStatus::Pending, TransactionStatus::Sent],
1321            )
1322            .await
1323            .unwrap();
1324        assert_eq!(result.len(), 2);
1325
1326        // Test finding non-existent status
1327        let result = repo
1328            .find_by_status(&relayer_id, &[TransactionStatus::Failed])
1329            .await
1330            .unwrap();
1331        assert_eq!(result.len(), 0);
1332    }
1333
1334    #[tokio::test]
1335    #[ignore = "Requires active Redis instance"]
1336    async fn test_find_by_nonce() {
1337        let repo = setup_test_repo().await;
1338        let random_id = Uuid::new_v4().to_string();
1339        let random_id2 = Uuid::new_v4().to_string();
1340        let relayer_id = Uuid::new_v4().to_string();
1341
1342        let tx1 = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1343        let tx2 = create_test_transaction_with_nonce(&random_id2, 43, &relayer_id);
1344
1345        repo.create(tx1.clone()).await.unwrap();
1346        repo.create(tx2).await.unwrap();
1347
1348        // Test finding existing nonce
1349        let result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
1350        assert!(result.is_some());
1351        assert_eq!(result.unwrap().id, random_id);
1352
1353        // Test finding non-existent nonce
1354        let result = repo.find_by_nonce(&relayer_id, 99).await.unwrap();
1355        assert!(result.is_none());
1356
1357        // Test finding nonce for non-existent relayer
1358        let result = repo.find_by_nonce("non-existent", 42).await.unwrap();
1359        assert!(result.is_none());
1360    }
1361
1362    #[tokio::test]
1363    #[ignore = "Requires active Redis instance"]
1364    async fn test_update_status() {
1365        let repo = setup_test_repo().await;
1366        let random_id = Uuid::new_v4().to_string();
1367        let tx = create_test_transaction(&random_id);
1368
1369        repo.create(tx).await.unwrap();
1370        let updated = repo
1371            .update_status(random_id.to_string(), TransactionStatus::Confirmed)
1372            .await
1373            .unwrap();
1374        assert_eq!(updated.status, TransactionStatus::Confirmed);
1375    }
1376
1377    #[tokio::test]
1378    #[ignore = "Requires active Redis instance"]
1379    async fn test_partial_update() {
1380        let repo = setup_test_repo().await;
1381        let random_id = Uuid::new_v4().to_string();
1382        let tx = create_test_transaction(&random_id);
1383
1384        repo.create(tx).await.unwrap();
1385
1386        let update = TransactionUpdateRequest {
1387            status: Some(TransactionStatus::Sent),
1388            status_reason: Some("Transaction sent".to_string()),
1389            sent_at: Some("2025-01-27T16:00:00.000000+00:00".to_string()),
1390            confirmed_at: None,
1391            network_data: None,
1392            hashes: None,
1393            is_canceled: None,
1394            priced_at: None,
1395            noop_count: None,
1396            delete_at: None,
1397        };
1398
1399        let updated = repo
1400            .partial_update(random_id.to_string(), update)
1401            .await
1402            .unwrap();
1403        assert_eq!(updated.status, TransactionStatus::Sent);
1404        assert_eq!(updated.status_reason, Some("Transaction sent".to_string()));
1405        assert_eq!(
1406            updated.sent_at,
1407            Some("2025-01-27T16:00:00.000000+00:00".to_string())
1408        );
1409    }
1410
1411    #[tokio::test]
1412    #[ignore = "Requires active Redis instance"]
1413    async fn test_set_sent_at() {
1414        let repo = setup_test_repo().await;
1415        let random_id = Uuid::new_v4().to_string();
1416        let tx = create_test_transaction(&random_id);
1417
1418        repo.create(tx).await.unwrap();
1419        let updated = repo
1420            .set_sent_at(
1421                random_id.to_string(),
1422                "2025-01-27T16:00:00.000000+00:00".to_string(),
1423            )
1424            .await
1425            .unwrap();
1426        assert_eq!(
1427            updated.sent_at,
1428            Some("2025-01-27T16:00:00.000000+00:00".to_string())
1429        );
1430    }
1431
1432    #[tokio::test]
1433    #[ignore = "Requires active Redis instance"]
1434    async fn test_set_confirmed_at() {
1435        let repo = setup_test_repo().await;
1436        let random_id = Uuid::new_v4().to_string();
1437        let tx = create_test_transaction(&random_id);
1438
1439        repo.create(tx).await.unwrap();
1440        let updated = repo
1441            .set_confirmed_at(
1442                random_id.to_string(),
1443                "2025-01-27T16:00:00.000000+00:00".to_string(),
1444            )
1445            .await
1446            .unwrap();
1447        assert_eq!(
1448            updated.confirmed_at,
1449            Some("2025-01-27T16:00:00.000000+00:00".to_string())
1450        );
1451    }
1452
1453    #[tokio::test]
1454    #[ignore = "Requires active Redis instance"]
1455    async fn test_update_network_data() {
1456        let repo = setup_test_repo().await;
1457        let random_id = Uuid::new_v4().to_string();
1458        let tx = create_test_transaction(&random_id);
1459
1460        repo.create(tx).await.unwrap();
1461
1462        let new_network_data = NetworkTransactionData::Evm(EvmTransactionData {
1463            gas_price: Some(2000000000),
1464            gas_limit: Some(42000),
1465            nonce: Some(2),
1466            value: U256::from_str("2000000000000000000").unwrap(),
1467            data: Some("0x1234".to_string()),
1468            from: "0xNewSender".to_string(),
1469            to: Some("0xNewRecipient".to_string()),
1470            chain_id: 1,
1471            signature: None,
1472            hash: Some("0xnewhash".to_string()),
1473            speed: Some(Speed::SafeLow),
1474            max_fee_per_gas: None,
1475            max_priority_fee_per_gas: None,
1476            raw: None,
1477        });
1478
1479        let updated = repo
1480            .update_network_data(random_id.to_string(), new_network_data.clone())
1481            .await
1482            .unwrap();
1483        assert_eq!(
1484            updated
1485                .network_data
1486                .get_evm_transaction_data()
1487                .unwrap()
1488                .hash,
1489            new_network_data.get_evm_transaction_data().unwrap().hash
1490        );
1491    }
1492
1493    #[tokio::test]
1494    #[ignore = "Requires active Redis instance"]
1495    async fn test_debug_implementation() {
1496        let repo = setup_test_repo().await;
1497        let debug_str = format!("{:?}", repo);
1498        assert!(debug_str.contains("RedisTransactionRepository"));
1499        assert!(debug_str.contains("test_prefix"));
1500    }
1501
1502    #[tokio::test]
1503    #[ignore = "Requires active Redis instance"]
1504    async fn test_error_handling_empty_id() {
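            // Empty IDs should be rejected up front with InvalidData across the
            // get / update / delete paths alike.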
1505        let repo = setup_test_repo().await;
1506
1507        let result = repo.get_by_id("".to_string()).await;
1508        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1509
1510        let result = repo
1511            .update("".to_string(), create_test_transaction("test"))
1512            .await;
1513        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1514
1515        let result = repo.delete_by_id("".to_string()).await;
1516        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1517    }
1518
1519    #[tokio::test]
1520    #[ignore = "Requires active Redis instance"]
1521    async fn test_pagination_validation() {
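            // A zero `per_page` is invalid input; the repository should surface
            // RepositoryError::InvalidData rather than returning an empty page.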
1522        let repo = setup_test_repo().await;
1523
1524        let query = PaginationQuery {
1525            page: 1,
1526            per_page: 0,
1527        };
1528        let result = repo.list_paginated(query).await;
1529        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1530    }
1531
1532    #[tokio::test]
1533    #[ignore = "Requires active Redis instance"]
1534    async fn test_index_consistency() {
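            // Exercises index maintenance: changing a transaction's nonce via `update`
            // should drop the old relayer/nonce index entry and add one for the new
            // nonce, so `find_by_nonce` stops resolving the stale value.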
1535        let repo = setup_test_repo().await;
1536        let random_id = Uuid::new_v4().to_string();
1537        let relayer_id = Uuid::new_v4().to_string();
1538        let tx = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1539
1540        // Create transaction
1541        repo.create(tx.clone()).await.unwrap();
1542
1543        // Verify it can be found by nonce
1544        let found = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
1545        assert!(found.is_some());
1546
1547        // Update the transaction with a new nonce
1548        let mut updated_tx = tx.clone();
1549        if let NetworkTransactionData::Evm(ref mut evm_data) = updated_tx.network_data {
1550            evm_data.nonce = Some(43);
1551        }
1552
1553        repo.update(random_id.to_string(), updated_tx)
1554            .await
1555            .unwrap();
1556
1557        // Verify old nonce index is cleaned up
1558        let old_nonce_result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
1559        assert!(old_nonce_result.is_none());
1560
1561        // Verify new nonce index works
1562        let new_nonce_result = repo.find_by_nonce(&relayer_id, 43).await.unwrap();
1563        assert!(new_nonce_result.is_some());
1564    }
1565
1566    #[tokio::test]
1567    #[ignore = "Requires active Redis instance"]
1568    async fn test_has_entries() {
1569        let repo = setup_test_repo().await;
1570        assert!(!repo.has_entries().await.unwrap());
1571
1572        let tx_id = uuid::Uuid::new_v4().to_string();
1573        let tx = create_test_transaction(&tx_id);
1574        repo.create(tx.clone()).await.unwrap();
1575
1576        assert!(repo.has_entries().await.unwrap());
1577    }
1578
1579    #[tokio::test]
1580    #[ignore = "Requires active Redis instance"]
1581    async fn test_drop_all_entries() {
1582        let repo = setup_test_repo().await;
1583        let tx_id = uuid::Uuid::new_v4().to_string();
1584        let tx = create_test_transaction(&tx_id);
1585        repo.create(tx.clone()).await.unwrap();
1586        assert!(repo.has_entries().await.unwrap());
1587
1588        repo.drop_all_entries().await.unwrap();
1589        assert!(!repo.has_entries().await.unwrap());
1590    }
1591
1592    // Tests for delete_at field setting on final status updates
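        // NOTE (sketch): these tests assume the repository stamps `delete_at` when a
        // transaction moves into a final status (Canceled, Confirmed, Failed, Expired),
        // deriving the offset from the TRANSACTION_EXPIRATION_HOURS env var, roughly:
        //
        //     let hours: i64 = std::env::var("TRANSACTION_EXPIRATION_HOURS")
        //         .ok()
        //         .and_then(|v| v.parse().ok())
        //         .unwrap_or(DEFAULT_HOURS); // DEFAULT_HOURS is hypothetical here
        //     let delete_at =
        //         (chrono::Utc::now() + chrono::Duration::hours(hours)).to_rfc3339();
        //
        // The actual helper lives in the implementation above; this sketch only explains
        // why the assertions below compare against a tolerance window around `hours`.
        // ENV_MUTEX serializes these tests because they mutate process-wide env state.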
1593    #[tokio::test]
1594    #[ignore = "Requires active Redis instance"]
1595    async fn test_update_status_sets_delete_at_for_final_statuses() {
1596        let _lock = ENV_MUTEX.lock().await;
1597
1598        use chrono::{DateTime, Duration, Utc};
1599        use std::env;
1600
1601        // Override the expiration window; ENV_MUTEX (held above) prevents conflicts with other env-mutating tests
1602        env::set_var("TRANSACTION_EXPIRATION_HOURS", "6");
1603
1604        let repo = setup_test_repo().await;
1605
1606        let final_statuses = [
1607            TransactionStatus::Canceled,
1608            TransactionStatus::Confirmed,
1609            TransactionStatus::Failed,
1610            TransactionStatus::Expired,
1611        ];
1612
1613        for (i, status) in final_statuses.iter().enumerate() {
1614            let tx_id = format!("test-final-{}-{}", i, Uuid::new_v4());
1615            let mut tx = create_test_transaction(&tx_id);
1616
1617            // Ensure transaction has no delete_at initially and is in pending state
1618            tx.delete_at = None;
1619            tx.status = TransactionStatus::Pending;
1620
1621            repo.create(tx).await.unwrap();
1622
1623            let before_update = Utc::now();
1624
1625            // Update to final status
1626            let updated = repo
1627                .update_status(tx_id.clone(), status.clone())
1628                .await
1629                .unwrap();
1630
1631            // Should have delete_at set
1632            assert!(
1633                updated.delete_at.is_some(),
1634                "delete_at should be set for status: {:?}",
1635                status
1636            );
1637
1638            // Verify the timestamp is reasonable (approximately 6 hours from now)
1639            let delete_at_str = updated.delete_at.unwrap();
1640            let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
1641                .expect("delete_at should be valid RFC3339")
1642                .with_timezone(&Utc);
1643
1644            let duration_from_before = delete_at.signed_duration_since(before_update);
1645            let expected_duration = Duration::hours(6);
1646            let tolerance = Duration::minutes(5);
1647
1648            assert!(
1649                duration_from_before >= expected_duration - tolerance
1650                    && duration_from_before <= expected_duration + tolerance,
1651                "delete_at should be approximately 6 hours from now for status: {:?}. Duration: {:?}",
1652                status, duration_from_before
1653            );
1654        }
1655
1656        // Cleanup
1657        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1658    }
1659
1660    #[tokio::test]
1661    #[ignore = "Requires active Redis instance"]
1662    async fn test_update_status_does_not_set_delete_at_for_non_final_statuses() {
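            // Counterpart to the test above: Pending, Sent, Submitted and Mined are not
            // final states, so no expiration timestamp should be attached on update.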
1663        let _lock = ENV_MUTEX.lock().await;
1664
1665        use std::env;
1666
1667        env::set_var("TRANSACTION_EXPIRATION_HOURS", "4");
1668
1669        let repo = setup_test_repo().await;
1670
1671        let non_final_statuses = [
1672            TransactionStatus::Pending,
1673            TransactionStatus::Sent,
1674            TransactionStatus::Submitted,
1675            TransactionStatus::Mined,
1676        ];
1677
1678        for (i, status) in non_final_statuses.iter().enumerate() {
1679            let tx_id = format!("test-non-final-{}-{}", i, Uuid::new_v4());
1680            let mut tx = create_test_transaction(&tx_id);
1681            tx.delete_at = None;
1682            tx.status = TransactionStatus::Pending;
1683
1684            repo.create(tx).await.unwrap();
1685
1686            // Update to non-final status
1687            let updated = repo
1688                .update_status(tx_id.clone(), status.clone())
1689                .await
1690                .unwrap();
1691
1692            // Should NOT have delete_at set
1693            assert!(
1694                updated.delete_at.is_none(),
1695                "delete_at should NOT be set for status: {:?}",
1696                status
1697            );
1698        }
1699
1700        // Cleanup
1701        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1702    }
1703
1704    #[tokio::test]
1705    #[ignore = "Requires active Redis instance"]
1706    async fn test_partial_update_sets_delete_at_for_final_statuses() {
1707        let _lock = ENV_MUTEX.lock().await;
1708
1709        use chrono::{DateTime, Duration, Utc};
1710        use std::env;
1711
1712        env::set_var("TRANSACTION_EXPIRATION_HOURS", "8");
1713
1714        let repo = setup_test_repo().await;
1715        let tx_id = format!("test-partial-final-{}", Uuid::new_v4());
1716        let mut tx = create_test_transaction(&tx_id);
1717        tx.delete_at = None;
1718        tx.status = TransactionStatus::Pending;
1719
1720        repo.create(tx).await.unwrap();
1721
1722        let before_update = Utc::now();
1723
1724        // Use partial_update to set status to Confirmed (final status)
1725        let update = TransactionUpdateRequest {
1726            status: Some(TransactionStatus::Confirmed),
1727            status_reason: Some("Transaction completed".to_string()),
1728            confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
1729            ..Default::default()
1730        };
1731
1732        let updated = repo.partial_update(tx_id.clone(), update).await.unwrap();
1733
1734        // Should have delete_at set
1735        assert!(
1736            updated.delete_at.is_some(),
1737            "delete_at should be set when updating to Confirmed status"
1738        );
1739
1740        // Verify the timestamp is reasonable (approximately 8 hours from now)
1741        let delete_at_str = updated.delete_at.unwrap();
1742        let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
1743            .expect("delete_at should be valid RFC3339")
1744            .with_timezone(&Utc);
1745
1746        let duration_from_before = delete_at.signed_duration_since(before_update);
1747        let expected_duration = Duration::hours(8);
1748        let tolerance = Duration::minutes(5);
1749
1750        assert!(
1751            duration_from_before >= expected_duration - tolerance
1752                && duration_from_before <= expected_duration + tolerance,
1753            "delete_at should be approximately 8 hours from now. Duration: {:?}",
1754            duration_from_before
1755        );
1756
1757        // Also verify other fields were updated
1758        assert_eq!(updated.status, TransactionStatus::Confirmed);
1759        assert_eq!(
1760            updated.status_reason,
1761            Some("Transaction completed".to_string())
1762        );
1763        assert_eq!(
1764            updated.confirmed_at,
1765            Some("2023-01-01T12:05:00Z".to_string())
1766        );
1767
1768        // Cleanup
1769        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1770    }
1771
1772    #[tokio::test]
1773    #[ignore = "Requires active Redis instance"]
1774    async fn test_update_status_preserves_existing_delete_at() {
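            // An explicit delete_at set before the status change should win; the update
            // must not overwrite it with a freshly computed expiration.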
1775        let _lock = ENV_MUTEX.lock().await;
1776
1777        use std::env;
1778
1779        env::set_var("TRANSACTION_EXPIRATION_HOURS", "2");
1780
1781        let repo = setup_test_repo().await;
1782        let tx_id = format!("test-preserve-delete-at-{}", Uuid::new_v4());
1783        let mut tx = create_test_transaction(&tx_id);
1784
1785        // Set an existing delete_at value
1786        let existing_delete_at = "2025-01-01T12:00:00Z".to_string();
1787        tx.delete_at = Some(existing_delete_at.clone());
1788        tx.status = TransactionStatus::Pending;
1789
1790        repo.create(tx).await.unwrap();
1791
1792        // Update to final status
1793        let updated = repo
1794            .update_status(tx_id.clone(), TransactionStatus::Confirmed)
1795            .await
1796            .unwrap();
1797
1798        // Should preserve the existing delete_at value
1799        assert_eq!(
1800            updated.delete_at,
1801            Some(existing_delete_at),
1802            "Existing delete_at should be preserved when updating to final status"
1803        );
1804
1805        // Cleanup
1806        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1807    }

1808    #[tokio::test]
1809    #[ignore = "Requires active Redis instance"]
1810    async fn test_partial_update_without_status_change_preserves_delete_at() {
1811        let _lock = ENV_MUTEX.lock().await;
1812
1813        use std::env;
1814
1815        env::set_var("TRANSACTION_EXPIRATION_HOURS", "3");
1816
1817        let repo = setup_test_repo().await;
1818        let tx_id = format!("test-preserve-no-status-{}", Uuid::new_v4());
1819        let mut tx = create_test_transaction(&tx_id);
1820        tx.delete_at = None;
1821        tx.status = TransactionStatus::Pending;
1822
1823        repo.create(tx).await.unwrap();
1824
1825        // First, update to final status to set delete_at
1826        let updated1 = repo
1827            .update_status(tx_id.clone(), TransactionStatus::Confirmed)
1828            .await
1829            .unwrap();
1830
1831        assert!(updated1.delete_at.is_some());
1832        let original_delete_at = updated1.delete_at.clone();
1833
1834        // Now update other fields without changing status
1835        let update = TransactionUpdateRequest {
1836            status: None, // No status change
1837            status_reason: Some("Updated reason".to_string()),
1838            confirmed_at: Some("2023-01-01T12:10:00Z".to_string()),
1839            ..Default::default()
1840        };
1841
1842        let updated2 = repo.partial_update(tx_id.clone(), update).await.unwrap();
1843
1844        // delete_at should be preserved
1845        assert_eq!(
1846            updated2.delete_at, original_delete_at,
1847            "delete_at should be preserved when status is not updated"
1848        );
1849
1850        // Other fields should be updated
1851        assert_eq!(updated2.status, TransactionStatus::Confirmed); // Unchanged
1852        assert_eq!(updated2.status_reason, Some("Updated reason".to_string()));
1853        assert_eq!(
1854            updated2.confirmed_at,
1855            Some("2023-01-01T12:10:00Z".to_string())
1856        );
1857
1858        // Cleanup
1859        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
1860    }
1861}