openzeppelin_relayer/repositories/transaction/transaction_redis.rs

1//! Redis-backed implementation of the TransactionRepository.
2
3use crate::config::ServerConfig;
4use crate::constants::FINAL_TRANSACTION_STATUSES;
5use crate::domain::transaction::common::is_final_state;
6use crate::metrics::{
7    TRANSACTIONS_BY_STATUS, TRANSACTIONS_CREATED, TRANSACTIONS_FAILED,
8    TRANSACTIONS_INSUFFICIENT_FEE_FAILED, TRANSACTIONS_INSUFFICIENT_FEE_SUCCESS,
9    TRANSACTIONS_SUBMITTED, TRANSACTIONS_SUCCESS, TRANSACTIONS_TRY_AGAIN_LATER_FAILED,
10    TRANSACTIONS_TRY_AGAIN_LATER_SUCCESS, TRANSACTION_PROCESSING_TIME,
11};
12use crate::models::{
13    NetworkTransactionData, PaginationQuery, RepositoryError, TransactionRepoModel,
14    TransactionStatus, TransactionUpdateRequest,
15};
16use crate::repositories::redis_base::RedisRepository;
17use crate::repositories::{
18    BatchDeleteResult, BatchRetrievalResult, PaginatedResult, Repository, TransactionDeleteRequest,
19    TransactionRepository,
20};
21use crate::utils::RedisConnections;
22use async_trait::async_trait;
23use chrono::Utc;
24use redis::{AsyncCommands, Script};
25use std::fmt;
26use std::sync::Arc;
27use tracing::{debug, error, warn};
28
29const RELAYER_PREFIX: &str = "relayer";
30const TX_PREFIX: &str = "tx";
31const STATUS_PREFIX: &str = "status";
32const STATUS_SORTED_PREFIX: &str = "status_sorted";
33const NONCE_PREFIX: &str = "nonce";
34const TX_TO_RELAYER_PREFIX: &str = "tx_to_relayer";
35const RELAYER_LIST_KEY: &str = "relayer_list";
36const TX_BY_CREATED_AT_PREFIX: &str = "tx_by_created_at";
37
38#[derive(Clone)]
39pub struct RedisTransactionRepository {
40    pub connections: Arc<RedisConnections>,
41    pub key_prefix: String,
42}
43
44impl RedisRepository for RedisTransactionRepository {}
45
46impl RedisTransactionRepository {
47    pub fn new(
48        connections: Arc<RedisConnections>,
49        key_prefix: String,
50    ) -> Result<Self, RepositoryError> {
51        if key_prefix.is_empty() {
52            return Err(RepositoryError::InvalidData(
53                "Redis key prefix cannot be empty".to_string(),
54            ));
55        }
56
57        Ok(Self {
58            connections,
59            key_prefix,
60        })
61    }
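    // A hypothetical construction sketch (the Arc<RedisConnections> is built
    // elsewhere in the application; the "oz_relayer" prefix is illustrative):
    //
    //   let repo = RedisTransactionRepository::new(connections.clone(), "oz_relayer".to_string())?;
    //   // transaction keys then look like "oz_relayer:relayer:<relayer_id>:tx:<tx_id>"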
62
63    /// Generate key for transaction data: relayer:{relayer_id}:tx:{tx_id}
64    fn tx_key(&self, relayer_id: &str, tx_id: &str) -> String {
65        format!(
66            "{}:{}:{}:{}:{}",
67            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX, tx_id
68        )
69    }
70
71    /// Generate key for reverse lookup: tx_to_relayer:{tx_id}
72    fn tx_to_relayer_key(&self, tx_id: &str) -> String {
73        format!(
74            "{}:{}:{}:{}",
75            self.key_prefix, RELAYER_PREFIX, TX_TO_RELAYER_PREFIX, tx_id
76        )
77    }
78
79    /// Generate key for relayer status index (legacy SET): relayer:{relayer_id}:status:{status}
80    fn relayer_status_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
81        format!(
82            "{}:{}:{}:{}:{}",
83            self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_PREFIX, status
84        )
85    }
86
87    /// Generate key for relayer status sorted index (SORTED SET): relayer:{relayer_id}:status_sorted:{status}
88    /// Score is created_at timestamp in milliseconds for efficient ordering.
89    fn relayer_status_sorted_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
90        format!(
91            "{}:{}:{}:{}:{}",
92            self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_SORTED_PREFIX, status
93        )
94    }
95
96    /// Generate key for relayer nonce index: relayer:{relayer_id}:nonce:{nonce}
97    fn relayer_nonce_key(&self, relayer_id: &str, nonce: u64) -> String {
98        format!(
99            "{}:{}:{}:{}:{}",
100            self.key_prefix, RELAYER_PREFIX, relayer_id, NONCE_PREFIX, nonce
101        )
102    }
103
104    /// Generate key for relayer list: relayer_list (set of all relayer IDs)
105    fn relayer_list_key(&self) -> String {
106        format!("{}:{}", self.key_prefix, RELAYER_LIST_KEY)
107    }
108
109    /// Generate key for relayer's sorted set by created_at: relayer:{relayer_id}:tx_by_created_at
110    fn relayer_tx_by_created_at_key(&self, relayer_id: &str) -> String {
111        format!(
112            "{}:{}:{}:{}",
113            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_BY_CREATED_AT_PREFIX
114        )
115    }
116
117    /// Returns the components needed for Lua scripts to resolve a tx key from
118    /// only the tx_id: (tx_to_relayer lookup key, key prefix, key suffix).
119    /// The Lua script does: `GET KEYS[1]` to get the relayer_id, then
120    /// constructs the tx key as `ARGV[1] .. relayer_id .. ARGV[2]`.
121    fn tx_key_parts(&self, tx_id: &str) -> (String, String, String) {
122        let lookup_key = self.tx_to_relayer_key(tx_id);
123        let key_prefix = format!("{}:{}:", self.key_prefix, RELAYER_PREFIX);
124        let key_suffix = format!(":{TX_PREFIX}:{tx_id}");
125        (lookup_key, key_prefix, key_suffix)
126    }
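    // Illustrative only: the shape of Lua script these parts are designed for,
    // assuming the caller wires them into KEYS/ARGV the way `run_atomic_script`
    // does below. The actual scripts live at their call sites and may differ.
    //
    //   local relayer_id = redis.call('GET', KEYS[1])
    //   if not relayer_id then return false end
    //   local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
    //   local json = redis.call('GET', tx_key)
    //   if not json then return false end
    //   return json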
127
128    /// Executes an atomic Lua script with retry/backoff for transient Redis failures.
129    ///
130    /// Every script receives `KEYS[1]` = tx_to_relayer lookup key and
131    /// `ARGV[1..2]` = key prefix/suffix. `extra_args` are appended as `ARGV[3..]`.
132    /// The script must return the (possibly updated) JSON string or `false` for
133    /// not-found.
134    async fn run_atomic_script(
135        &self,
136        lua: &str,
137        tx_id: &str,
138        extra_args: &[&str],
139        op_name: &str,
140    ) -> Result<TransactionRepoModel, RepositoryError> {
141        const MAX_RETRIES: u32 = 3;
142        const BASE_BACKOFF_MS: u64 = 100;
143
144        let (lookup_key, key_prefix, key_suffix) = self.tx_key_parts(tx_id);
145        let script = Script::new(lua);
146        let mut last_error = None;
147
148        for attempt in 0..MAX_RETRIES {
149            let backoff = BASE_BACKOFF_MS * 2u64.pow(attempt);
150
151            let mut conn = match self
152                .get_connection(self.connections.primary(), op_name)
153                .await
154            {
155                Ok(conn) => conn,
156                Err(e) => {
157                    last_error = Some(e);
158                    if attempt < MAX_RETRIES - 1 {
159                        warn!(tx_id = %tx_id, attempt, op = %op_name, "connection failed, retrying");
160                        tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
161                        continue;
162                    }
163                    return Err(last_error.unwrap());
164                }
165            };
166
167            let mut invocation = script.prepare_invoke();
168            invocation
169                .key(&lookup_key)
170                .arg(&key_prefix)
171                .arg(&key_suffix);
172            for arg in extra_args {
173                invocation.arg(*arg);
174            }
175
176            match invocation.invoke_async::<Option<String>>(&mut conn).await {
177                Ok(result) => {
178                    let json = result.ok_or_else(|| {
179                        RepositoryError::NotFound(format!("Transaction with ID {tx_id} not found"))
180                    })?;
181                    return self.deserialize_entity::<TransactionRepoModel>(
182                        &json,
183                        tx_id,
184                        "transaction",
185                    );
186                }
187                Err(e) => {
188                    last_error = Some(self.map_redis_error(e, op_name));
189                    if attempt < MAX_RETRIES - 1 {
190                        warn!(
191                            tx_id = %tx_id, attempt, op = %op_name,
192                            "atomic script failed, retrying"
193                        );
194                        tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
195                        continue;
196                    }
197                    return Err(last_error.unwrap());
198                }
199            }
200        }
201        Err(last_error.unwrap_or_else(|| {
202            RepositoryError::UnexpectedError(format!("retry loop exhausted for {op_name}"))
203        }))
204    }
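    // Hypothetical call-site sketch (the `UPDATE_STATUS_LUA` constant and the
    // `new_status_json` value are illustrative, not taken from this file):
    //
    //   let updated_tx = self
    //       .run_atomic_script(UPDATE_STATUS_LUA, &tx_id, &[new_status_json.as_str()], "update_status")
    //       .await?;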
205
206    /// Executes a Lua script with retry/backoff, returning a Vec<String> result
207    /// (for scripts that return Lua tables / multi-bulk replies).
208    /// Returns `Ok(None)` when the script returns `false`.
209    async fn run_script_with_retry_vec(
210        &self,
211        script: &Script,
212        lookup_key: &str,
213        key_prefix: &str,
214        key_suffix: &str,
215        extra_args: &[&str],
216        op_name: &str,
217    ) -> Result<Option<Vec<String>>, RepositoryError> {
218        const MAX_RETRIES: u32 = 3;
219        const BASE_BACKOFF_MS: u64 = 100;
220
221        let mut last_error = None;
222
223        for attempt in 0..MAX_RETRIES {
224            let backoff = BASE_BACKOFF_MS * 2u64.pow(attempt);
225
226            let mut conn = match self
227                .get_connection(self.connections.primary(), op_name)
228                .await
229            {
230                Ok(conn) => conn,
231                Err(e) => {
232                    last_error = Some(e);
233                    if attempt < MAX_RETRIES - 1 {
234                        warn!(op = %op_name, attempt, "connection failed, retrying");
235                        tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
236                        continue;
237                    }
238                    return Err(last_error.unwrap());
239                }
240            };
241
242            let mut invocation = script.prepare_invoke();
243            invocation.key(lookup_key).arg(key_prefix).arg(key_suffix);
244            for arg in extra_args {
245                invocation.arg(*arg);
246            }
247
248            // Redis returns `false` from Lua as a Nil bulk reply, which
249            // redis-rs maps to `None` for `Option<Vec<String>>`.
250            match invocation
251                .invoke_async::<Option<Vec<String>>>(&mut conn)
252                .await
253            {
254                Ok(result) => return Ok(result),
255                Err(e) => {
256                    last_error = Some(self.map_redis_error(e, op_name));
257                    if attempt < MAX_RETRIES - 1 {
258                        warn!(op = %op_name, attempt, "script failed, retrying");
259                        tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
260                        continue;
261                    }
262                    return Err(last_error.unwrap());
263                }
264            }
265        }
266        Err(last_error.unwrap_or_else(|| {
267            RepositoryError::UnexpectedError(format!("retry loop exhausted for {op_name}"))
268        }))
269    }
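    // Retry schedule shared by both helpers above: up to 3 attempts, sleeping
    // 100 ms after the first failure and 200 ms after the second
    // (BASE_BACKOFF_MS * 2^attempt); the third failure is returned to the caller.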
270
271    /// Parse timestamp string to score for sorted set (milliseconds since epoch)
272    fn timestamp_to_score(&self, timestamp: &str) -> f64 {
273        chrono::DateTime::parse_from_rfc3339(timestamp)
274            .map(|dt| dt.timestamp_millis() as f64)
275            .unwrap_or_else(|_| {
276                warn!(timestamp = %timestamp, "failed to parse timestamp, using 0");
277                0.0
278            })
279    }
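    // For example, "2024-01-15T10:30:00Z" becomes 1_705_314_600_000.0, so ZSET
    // members sort chronologically by score; unparsable timestamps fall back to
    // score 0.0 and are treated as the oldest entries.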
280
281    /// Compute the appropriate score for a transaction's status sorted set.
282    /// - For Confirmed status: use confirmed_at (on-chain confirmation order)
283    /// - For all other statuses: use created_at (queue/processing order)
284    fn status_sorted_score(&self, tx: &TransactionRepoModel) -> f64 {
285        if tx.status == TransactionStatus::Confirmed {
286            // For Confirmed, prefer confirmed_at for accurate on-chain ordering
287            if let Some(ref confirmed_at) = tx.confirmed_at {
288                return self.timestamp_to_score(confirmed_at);
289            }
290            // Fallback to created_at if confirmed_at not set (shouldn't happen)
291            warn!(tx_id = %tx.id, "Confirmed transaction missing confirmed_at, using created_at");
292        }
293        self.timestamp_to_score(&tx.created_at)
294    }
295
296    /// Batch fetch transactions by IDs using reverse lookup
297    async fn get_transactions_by_ids(
298        &self,
299        ids: &[String],
300    ) -> Result<BatchRetrievalResult<TransactionRepoModel>, RepositoryError> {
301        if ids.is_empty() {
302            debug!("no transaction IDs provided for batch fetch");
303            return Ok(BatchRetrievalResult {
304                results: vec![],
305                failed_ids: vec![],
306            });
307        }
308
309        let mut conn = self
310            .get_connection(self.connections.reader(), "batch_fetch_transactions")
311            .await?;
312
313        let reverse_keys: Vec<String> = ids.iter().map(|id| self.tx_to_relayer_key(id)).collect();
314
315        debug!(count = %ids.len(), "fetching relayer IDs for transactions");
316
317        let relayer_ids: Vec<Option<String>> = conn
318            .mget(&reverse_keys)
319            .await
320            .map_err(|e| self.map_redis_error(e, "batch_fetch_relayer_ids"))?;
321
322        let mut tx_keys = Vec::new();
323        let mut valid_ids = Vec::new();
324        let mut failed_ids = Vec::new();
325        for (i, relayer_id) in relayer_ids.into_iter().enumerate() {
326            match relayer_id {
327                Some(relayer_id) => {
328                    tx_keys.push(self.tx_key(&relayer_id, &ids[i]));
329                    valid_ids.push(ids[i].clone());
330                }
331                None => {
332                    warn!(tx_id = %ids[i], "no relayer found for transaction");
333                    failed_ids.push(ids[i].clone());
334                }
335            }
336        }
337
338        if tx_keys.is_empty() {
339            debug!("no valid transactions found for batch fetch");
340            return Ok(BatchRetrievalResult {
341                results: vec![],
342                failed_ids,
343            });
344        }
345
346        debug!(count = %tx_keys.len(), "batch fetching transaction data");
347
348        let values: Vec<Option<String>> = conn
349            .mget(&tx_keys)
350            .await
351            .map_err(|e| self.map_redis_error(e, "batch_fetch_transactions"))?;
352
353        let mut transactions = Vec::new();
354        let mut failed_count = 0;
355        for (i, value) in values.into_iter().enumerate() {
356            match value {
357                Some(json) => {
358                    match self.deserialize_entity::<TransactionRepoModel>(
359                        &json,
360                        &valid_ids[i],
361                        "transaction",
362                    ) {
363                        Ok(tx) => transactions.push(tx),
364                        Err(e) => {
365                            failed_count += 1;
366                            error!(tx_id = %valid_ids[i], error = %e, "failed to deserialize transaction");
367                            // Continue processing other transactions
368                        }
369                    }
370                }
371                None => {
372                    warn!(tx_id = %valid_ids[i], "transaction not found in batch fetch");
373                    failed_ids.push(valid_ids[i].clone());
374                }
375            }
376        }
377
378        if failed_count > 0 {
379            warn!(failed_count = %failed_count, total_count = %valid_ids.len(), "failed to deserialize transactions in batch");
380        }
381
382        debug!(count = %transactions.len(), "successfully fetched transactions");
383        Ok(BatchRetrievalResult {
384            results: transactions,
385            failed_ids,
386        })
387    }
388
389    /// Extract nonce from EVM transaction data
390    fn extract_nonce(&self, network_data: &NetworkTransactionData) -> Option<u64> {
391        match network_data.get_evm_transaction_data() {
392            Ok(tx_data) => tx_data.nonce,
393            Err(_) => {
394                debug!("no EVM transaction data available for nonce extraction");
395                None
396            }
397        }
398    }
399
400    /// Ensures the status sorted set exists, migrating from legacy SET if needed.
401    ///
402    /// This handles the transition from unordered SETs to sorted SETs for status indexing.
403    /// If the sorted set is empty but the legacy set has data, it migrates the data
404    /// by looking up each transaction's created_at timestamp to compute the score.
405    ///
406    /// # Concurrency
407    /// This function is safe for concurrent calls. If multiple calls race to migrate
408    /// the same status set:
409    /// - ZADD is idempotent (same member + score = no-op)
410    /// - DEL on non-existent key is safe (returns 0)
411    /// - After first successful migration, subsequent calls hit the fast path (ZCARD > 0)
412    ///
413    /// The only downside of concurrent migrations is wasted work, not data corruption.
414    ///
415    /// Returns the count of items in the sorted set after migration.
416    async fn ensure_status_sorted_set(
417        &self,
418        relayer_id: &str,
419        status: &TransactionStatus,
420    ) -> Result<u64, RepositoryError> {
421        let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
422        let legacy_key = self.relayer_status_key(relayer_id, status);
423
424        // Phase 1: Check if migration is needed
425        let legacy_ids = {
426            let mut conn = self
427                .get_connection(self.connections.primary(), "ensure_status_sorted_set_check")
428                .await?;
429
430            // Always check if legacy set has data that needs migration
431            let legacy_count: u64 = conn
432                .scard(&legacy_key)
433                .await
434                .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_scard"))?;
435
436            if legacy_count == 0 {
437                // No legacy data to migrate, return current ZSET count
438                let sorted_count: u64 = conn
439                    .zcard(&sorted_key)
440                    .await
441                    .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_zcard"))?;
442                return Ok(sorted_count);
443            }
444
445            // Migration needed: get all IDs from legacy set
446            debug!(
447                relayer_id = %relayer_id,
448                status = %status,
449                legacy_count = %legacy_count,
450                "migrating status set to sorted set"
451            );
452
453            let ids: Vec<String> = conn
454                .smembers(&legacy_key)
455                .await
456                .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_smembers"))?;
457
458            ids
459            // Connection dropped here before nested call to avoid connection doubling
460        };
461
462        if legacy_ids.is_empty() {
463            return Ok(0);
464        }
465
466        // Phase 2: Fetch transactions (uses its own connection internally)
467        let transactions = self.get_transactions_by_ids(&legacy_ids).await?;
468
469        // Phase 3: Perform migration with a new connection
470        let mut conn = self
471            .get_connection(
472                self.connections.primary(),
473                "ensure_status_sorted_set_migrate",
474            )
475            .await?;
476
477        if transactions.results.is_empty() {
478            // All transactions were stale/deleted, clean up legacy set
479            let _: () = conn
480                .del(&legacy_key)
481                .await
482                .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_del_stale"))?;
483            return Ok(0);
484        }
485
486        // Build sorted set entries and migrate atomically
487        // Use status-aware scoring: confirmed_at for Confirmed, created_at for others
488        let mut pipe = redis::pipe();
489        pipe.atomic();
490
491        for tx in &transactions.results {
492            let score = self.status_sorted_score(tx);
493            pipe.zadd(&sorted_key, &tx.id, score);
494        }
495
496        // Delete legacy set after migration
497        pipe.del(&legacy_key);
498
499        pipe.query_async::<()>(&mut conn)
500            .await
501            .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_migrate"))?;
502
503        let migrated_count = transactions.results.len() as u64;
504        debug!(
505            relayer_id = %relayer_id,
506            status = %status,
507            migrated_count = %migrated_count,
508            "completed migration of status set to sorted set"
509        );
510
511        Ok(migrated_count)
512    }
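    // Roughly, the migration above amounts to this Redis sequence, shown only
    // as a sketch (the real pipeline is built in Rust, and scores come from
    // `status_sorted_score`):
    //
    //   SCARD    <prefix>:relayer:<id>:status:<status>          -- fast path if 0
    //   SMEMBERS <prefix>:relayer:<id>:status:<status>          -- legacy members
    //   MULTI
    //     ZADD   <prefix>:relayer:<id>:status_sorted:<status> <score> <tx_id>   (one per tx)
    //     DEL    <prefix>:relayer:<id>:status:<status>
    //   EXEC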
513
514    /// Update indexes atomically with comprehensive error handling
515    async fn update_indexes(
516        &self,
517        tx: &TransactionRepoModel,
518        old_tx: Option<&TransactionRepoModel>,
519    ) -> Result<(), RepositoryError> {
520        let mut conn = self
521            .get_connection(self.connections.primary(), "update_indexes")
522            .await?;
523        let mut pipe = redis::pipe();
524        pipe.atomic();
525
526        debug!(tx_id = %tx.id, "updating indexes for transaction");
527
528        // Add relayer to the global relayer list
529        let relayer_list_key = self.relayer_list_key();
530        pipe.sadd(&relayer_list_key, &tx.relayer_id);
531
532        // Compute scores for sorted sets
533        // Status sorted set: uses confirmed_at for Confirmed status, created_at for others
534        let status_score = self.status_sorted_score(tx);
535        // Global tx_by_created_at: always uses created_at for consistent ordering
536        let created_at_score = self.timestamp_to_score(&tx.created_at);
537
538        // Handle status index updates - write to SORTED SET (new format)
539        let new_status_sorted_key = self.relayer_status_sorted_key(&tx.relayer_id, &tx.status);
540        pipe.zadd(&new_status_sorted_key, &tx.id, status_score);
541        debug!(tx_id = %tx.id, status = %tx.status, score = %status_score, "added transaction to status sorted set");
542
543        if let Some(nonce) = self.extract_nonce(&tx.network_data) {
544            let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
545            pipe.set(&nonce_key, &tx.id);
546            debug!(tx_id = %tx.id, nonce = %nonce, "added nonce index for transaction");
547        }
548
549        // Add to per-relayer sorted set by created_at (for efficient sorted pagination)
550        let relayer_sorted_key = self.relayer_tx_by_created_at_key(&tx.relayer_id);
551        pipe.zadd(&relayer_sorted_key, &tx.id, created_at_score);
552        debug!(tx_id = %tx.id, score = %created_at_score, "added transaction to sorted set by created_at");
553
554        // Remove old indexes if updating
555        if let Some(old) = old_tx {
556            if old.status != tx.status {
557                // Remove from old status sorted set (new format)
558                let old_status_sorted_key =
559                    self.relayer_status_sorted_key(&old.relayer_id, &old.status);
560                pipe.zrem(&old_status_sorted_key, &tx.id);
561
562                // Also clean up legacy SET if it exists (for migration cleanup)
563                let old_status_legacy_key = self.relayer_status_key(&old.relayer_id, &old.status);
564                pipe.srem(&old_status_legacy_key, &tx.id);
565
566                debug!(tx_id = %tx.id, old_status = %old.status, new_status = %tx.status, "removing old status indexes for transaction");
567            }
568
569            // Handle nonce index cleanup
570            if let Some(old_nonce) = self.extract_nonce(&old.network_data) {
571                let new_nonce = self.extract_nonce(&tx.network_data);
572                if Some(old_nonce) != new_nonce {
573                    let old_nonce_key = self.relayer_nonce_key(&old.relayer_id, old_nonce);
574                    pipe.del(&old_nonce_key);
575                    debug!(tx_id = %tx.id, old_nonce = %old_nonce, new_nonce = ?new_nonce, "removing old nonce index for transaction");
576                }
577            }
578        }
579
580        // Execute all operations in a single pipeline
581        pipe.exec_async(&mut conn).await.map_err(|e| {
582            error!(tx_id = %tx.id, error = %e, "index update pipeline failed for transaction");
583            self.map_redis_error(e, &format!("update_indexes_for_tx_{}", tx.id))
584        })?;
585
586        debug!(tx_id = %tx.id, "successfully updated indexes for transaction");
587        Ok(())
588    }
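    // For a hypothetical EVM tx "tx-1" on relayer "r-1" moving from Pending to
    // Submitted, the pipeline above issues roughly (key prefix elided):
    //
    //   SADD relayer_list r-1
    //   ZADD relayer:r-1:status_sorted:<new status> <status_score> tx-1
    //   SET  relayer:r-1:nonce:<nonce> tx-1          -- only when an EVM nonce is present
    //   ZADD relayer:r-1:tx_by_created_at <created_at_score> tx-1
    //   ZREM relayer:r-1:status_sorted:<old status> tx-1
    //   SREM relayer:r-1:status:<old status> tx-1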
589
590    /// Remove all indexes with error recovery
591    async fn remove_all_indexes(&self, tx: &TransactionRepoModel) -> Result<(), RepositoryError> {
592        let mut conn = self
593            .get_connection(self.connections.primary(), "remove_all_indexes")
594            .await?;
595        let mut pipe = redis::pipe();
596        pipe.atomic();
597
598        debug!(tx_id = %tx.id, "removing all indexes for transaction");
599
600        // Remove from ALL possible status indexes to ensure complete cleanup
601        // This handles cases where a transaction might be in multiple status sets
602        // due to race conditions, partial failures, or bugs
603        for status in &[
604            TransactionStatus::Canceled,
605            TransactionStatus::Pending,
606            TransactionStatus::Sent,
607            TransactionStatus::Submitted,
608            TransactionStatus::Mined,
609            TransactionStatus::Confirmed,
610            TransactionStatus::Failed,
611            TransactionStatus::Expired,
612        ] {
613            // Remove from sorted status set (new format)
614            let status_sorted_key = self.relayer_status_sorted_key(&tx.relayer_id, status);
615            pipe.zrem(&status_sorted_key, &tx.id);
616
617            // Remove from legacy status set (for migration cleanup)
618            let status_legacy_key = self.relayer_status_key(&tx.relayer_id, status);
619            pipe.srem(&status_legacy_key, &tx.id);
620        }
621
622        // Remove nonce index if exists
623        if let Some(nonce) = self.extract_nonce(&tx.network_data) {
624            let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
625            pipe.del(&nonce_key);
626            debug!(tx_id = %tx.id, nonce = %nonce, "removing nonce index for transaction");
627        }
628
629        // Remove from per-relayer sorted set by created_at
630        let relayer_sorted_key = self.relayer_tx_by_created_at_key(&tx.relayer_id);
631        pipe.zrem(&relayer_sorted_key, &tx.id);
632        debug!(tx_id = %tx.id, "removing transaction from sorted set by created_at");
633
634        // Remove reverse lookup
635        let reverse_key = self.tx_to_relayer_key(&tx.id);
636        pipe.del(&reverse_key);
637
638        pipe.exec_async(&mut conn).await.map_err(|e| {
639            error!(tx_id = %tx.id, error = %e, "index removal failed for transaction");
640            self.map_redis_error(e, &format!("remove_indexes_for_tx_{}", tx.id))
641        })?;
642
643        debug!(tx_id = %tx.id, "successfully removed all indexes for transaction");
644        Ok(())
645    }
646
647    /// Track Prometheus metrics when a transaction status changes.
648    fn track_status_change_metrics(
649        &self,
650        _original_tx: &TransactionRepoModel,
651        updated_tx: &TransactionRepoModel,
652        old_status: &TransactionStatus,
653        new_status: &TransactionStatus,
654    ) {
655        let network_type = format!("{:?}", updated_tx.network_type).to_lowercase();
656        let relayer_id = updated_tx.relayer_id.as_str();
657
658        // Track submission (when status changes to Submitted)
659        if *old_status != TransactionStatus::Submitted
660            && *new_status == TransactionStatus::Submitted
661        {
662            TRANSACTIONS_SUBMITTED
663                .with_label_values(&[relayer_id, &network_type])
664                .inc();
665
666            if let Ok(created_time) = chrono::DateTime::parse_from_rfc3339(&updated_tx.created_at) {
667                let processing_seconds =
668                    (Utc::now() - created_time.with_timezone(&Utc)).num_seconds() as f64;
669                TRANSACTION_PROCESSING_TIME
670                    .with_label_values(&[relayer_id, &network_type, "creation_to_submission"])
671                    .observe(processing_seconds);
672            }
673        }
674
675        // Track status distribution (update gauge when status changes)
676        if old_status != new_status {
677            let old_status_str = format!("{old_status:?}").to_lowercase();
678            let old_status_gauge = TRANSACTIONS_BY_STATUS.with_label_values(&[
679                relayer_id,
680                &network_type,
681                &old_status_str,
682            ]);
683            let clamped_value = (old_status_gauge.get() - 1.0).max(0.0);
684            old_status_gauge.set(clamped_value);
685
686            let new_status_str = format!("{new_status:?}").to_lowercase();
687            TRANSACTIONS_BY_STATUS
688                .with_label_values(&[relayer_id, &network_type, &new_status_str])
689                .inc();
690        }
691
692        // Track metrics for final transaction states
693        let was_final = is_final_state(old_status);
694        let is_final = is_final_state(new_status);
695
696        if !was_final && is_final {
697            let meta = updated_tx.metadata.as_ref();
698            let had_insufficient_fee = meta.is_some_and(|m| m.insufficient_fee_retries > 0);
699            let had_try_again_later = meta.is_some_and(|m| m.try_again_later_retries > 0);
700
701            match new_status {
702                TransactionStatus::Confirmed => {
703                    TRANSACTIONS_SUCCESS
704                        .with_label_values(&[relayer_id, &network_type])
705                        .inc();
706                    if had_insufficient_fee {
707                        TRANSACTIONS_INSUFFICIENT_FEE_SUCCESS
708                            .with_label_values(&[relayer_id, &network_type])
709                            .inc();
710                    }
711                    if had_try_again_later {
712                        TRANSACTIONS_TRY_AGAIN_LATER_SUCCESS
713                            .with_label_values(&[relayer_id, &network_type])
714                            .inc();
715                    }
716
717                    if let (Some(sent_at_str), Some(confirmed_at_str)) =
718                        (&updated_tx.sent_at, &updated_tx.confirmed_at)
719                    {
720                        if let (Ok(sent_time), Ok(confirmed_time)) = (
721                            chrono::DateTime::parse_from_rfc3339(sent_at_str),
722                            chrono::DateTime::parse_from_rfc3339(confirmed_at_str),
723                        ) {
724                            let processing_seconds = (confirmed_time.with_timezone(&Utc)
725                                - sent_time.with_timezone(&Utc))
726                            .num_seconds()
727                                as f64;
728                            TRANSACTION_PROCESSING_TIME
729                                .with_label_values(&[
730                                    relayer_id,
731                                    &network_type,
732                                    "submission_to_confirmation",
733                                ])
734                                .observe(processing_seconds);
735                        }
736                    }
737
738                    if let Ok(created_time) =
739                        chrono::DateTime::parse_from_rfc3339(&updated_tx.created_at)
740                    {
741                        if let Some(confirmed_at_str) = &updated_tx.confirmed_at {
742                            if let Ok(confirmed_time) =
743                                chrono::DateTime::parse_from_rfc3339(confirmed_at_str)
744                            {
745                                let processing_seconds = (confirmed_time.with_timezone(&Utc)
746                                    - created_time.with_timezone(&Utc))
747                                .num_seconds()
748                                    as f64;
749                                TRANSACTION_PROCESSING_TIME
750                                    .with_label_values(&[
751                                        relayer_id,
752                                        &network_type,
753                                        "creation_to_confirmation",
754                                    ])
755                                    .observe(processing_seconds);
756                            }
757                        }
758                    }
759                }
760                TransactionStatus::Failed => {
761                    let failure_reason = updated_tx
762                        .status_reason
763                        .as_deref()
764                        .map(|reason| {
765                            if reason.starts_with("Submission failed:") {
766                                "submission_failed"
767                            } else if reason.starts_with("Preparation failed:") {
768                                "preparation_failed"
769                            } else {
770                                "failed"
771                            }
772                        })
773                        .unwrap_or("failed");
774                    TRANSACTIONS_FAILED
775                        .with_label_values(&[relayer_id, &network_type, failure_reason])
776                        .inc();
777                }
778                TransactionStatus::Expired => {
779                    TRANSACTIONS_FAILED
780                        .with_label_values(&[relayer_id, &network_type, "expired"])
781                        .inc();
782                }
783                TransactionStatus::Canceled => {
784                    TRANSACTIONS_FAILED
785                        .with_label_values(&[relayer_id, &network_type, "canceled"])
786                        .inc();
787                }
788                _ => {}
789            }
790
791            // Track retry-related failure metrics for all non-success final states
792            if *new_status != TransactionStatus::Confirmed {
793                if had_insufficient_fee {
794                    TRANSACTIONS_INSUFFICIENT_FEE_FAILED
795                        .with_label_values(&[relayer_id, &network_type])
796                        .inc();
797                }
798                if had_try_again_later {
799                    TRANSACTIONS_TRY_AGAIN_LATER_FAILED
800                        .with_label_values(&[relayer_id, &network_type])
801                        .inc();
802                }
803            }
804        }
805    }
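    // Example walk-through for a hypothetical transaction: Pending -> Submitted
    // increments TRANSACTIONS_SUBMITTED and observes "creation_to_submission";
    // a later Submitted -> Confirmed transition increments TRANSACTIONS_SUCCESS
    // and, when sent_at/confirmed_at are present, observes
    // "submission_to_confirmation" and "creation_to_confirmation". At each step
    // TRANSACTIONS_BY_STATUS is decremented for the old status (clamped at zero)
    // and incremented for the new one.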
806}
807
808impl fmt::Debug for RedisTransactionRepository {
809    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
810        f.debug_struct("RedisTransactionRepository")
811            .field("connections", &"<RedisConnections>")
812            .field("key_prefix", &self.key_prefix)
813            .finish()
814    }
815}
816
817#[async_trait]
818impl Repository<TransactionRepoModel, String> for RedisTransactionRepository {
819    async fn create(
820        &self,
821        entity: TransactionRepoModel,
822    ) -> Result<TransactionRepoModel, RepositoryError> {
823        if entity.id.is_empty() {
824            return Err(RepositoryError::InvalidData(
825                "Transaction ID cannot be empty".to_string(),
826            ));
827        }
828
829        let key = self.tx_key(&entity.relayer_id, &entity.id);
830        let reverse_key = self.tx_to_relayer_key(&entity.id);
831        let mut conn = self
832            .get_connection(self.connections.primary(), "create")
833            .await?;
834
835        debug!(tx_id = %entity.id, "creating transaction");
836
837        let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
838
839        // Check whether the transaction already exists via its reverse-lookup key
840        let existing: Option<String> = conn
841            .get(&reverse_key)
842            .await
843            .map_err(|e| self.map_redis_error(e, "create_transaction_check"))?;
844
845        if existing.is_some() {
846            return Err(RepositoryError::ConstraintViolation(format!(
847                "Transaction with ID {} already exists",
848                entity.id
849            )));
850        }
851
852        // Use atomic pipeline for consistency
853        let mut pipe = redis::pipe();
854        pipe.atomic();
855        pipe.set(&key, &value);
856        pipe.set(&reverse_key, &entity.relayer_id);
857
858        pipe.exec_async(&mut conn)
859            .await
860            .map_err(|e| self.map_redis_error(e, "create_transaction"))?;
861
862        // Update indexes separately to handle partial failures gracefully
863        if let Err(e) = self.update_indexes(&entity, None).await {
864            error!(tx_id = %entity.id, error = %e, "failed to update indexes for new transaction");
865            return Err(e);
866        }
867
868        // Track transaction creation metric
869        let network_type = format!("{:?}", entity.network_type).to_lowercase();
870        let relayer_id = entity.relayer_id.as_str();
871        TRANSACTIONS_CREATED
872            .with_label_values(&[relayer_id, &network_type])
873            .inc();
874
875        // Track initial status distribution (Pending)
876        let status = &entity.status;
877        let status_str = format!("{status:?}").to_lowercase();
878        TRANSACTIONS_BY_STATUS
879            .with_label_values(&[relayer_id, &network_type, &status_str])
880            .inc();
881
882        debug!(tx_id = %entity.id, "successfully created transaction");
883        Ok(entity)
884    }
885
886    async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
887        if id.is_empty() {
888            return Err(RepositoryError::InvalidData(
889                "Transaction ID cannot be empty".to_string(),
890            ));
891        }
892
893        let mut conn = self
894            .get_connection(self.connections.reader(), "get_by_id")
895            .await?;
896
897        debug!(tx_id = %id, "fetching transaction");
898
899        let reverse_key = self.tx_to_relayer_key(&id);
900        let relayer_id: Option<String> = conn
901            .get(&reverse_key)
902            .await
903            .map_err(|e| self.map_redis_error(e, "get_transaction_reverse_lookup"))?;
904
905        let relayer_id = match relayer_id {
906            Some(relayer_id) => relayer_id,
907            None => {
908                debug!(tx_id = %id, "transaction not found (no reverse lookup)");
909                return Err(RepositoryError::NotFound(format!(
910                    "Transaction with ID {id} not found"
911                )));
912            }
913        };
914
915        let key = self.tx_key(&relayer_id, &id);
916        let value: Option<String> = conn
917            .get(&key)
918            .await
919            .map_err(|e| self.map_redis_error(e, "get_transaction_by_id"))?;
920
921        match value {
922            Some(json) => {
923                let tx =
924                    self.deserialize_entity::<TransactionRepoModel>(&json, &id, "transaction")?;
925                debug!(tx_id = %id, "successfully fetched transaction");
926                Ok(tx)
927            }
928            None => {
929                debug!(tx_id = %id, "transaction not found");
930                Err(RepositoryError::NotFound(format!(
931                    "Transaction with ID {id} not found"
932                )))
933            }
934        }
935    }
936
937    // Unoptimized implementation of list_all. Rarely used. find_by_relayer_id is preferred.
938    async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
939        let mut conn = self
940            .get_connection(self.connections.reader(), "list_all")
941            .await?;
942
943        debug!("fetching all transactions sorted by created_at (newest first)");
944
945        // Get all relayer IDs
946        let relayer_list_key = self.relayer_list_key();
947        let relayer_ids: Vec<String> = conn
948            .smembers(&relayer_list_key)
949            .await
950            .map_err(|e| self.map_redis_error(e, "list_all_relayer_ids"))?;
951
952        debug!(count = %relayer_ids.len(), "found relayers");
953
954        // Collect all transaction IDs from all relayers using their sorted sets
955        let mut all_tx_ids = Vec::new();
956        for relayer_id in relayer_ids {
957            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
958            let tx_ids: Vec<String> = redis::cmd("ZRANGE")
959                .arg(&relayer_sorted_key)
960                .arg(0)
961                .arg(-1)
962                .arg("REV")
963                .query_async(&mut conn)
964                .await
965                .map_err(|e| self.map_redis_error(e, "list_all_relayer_sorted"))?;
966
967            all_tx_ids.extend(tx_ids);
968        }
969
970        // Release connection before nested call to avoid connection doubling
971        drop(conn);
972
973        // Batch fetch all transactions at once
974        let batch_result = self.get_transactions_by_ids(&all_tx_ids).await?;
975        let mut all_transactions = batch_result.results;
976
977        // Sort all transactions by created_at (newest first)
978        all_transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
979
980        debug!(count = %all_transactions.len(), "found transactions");
981        Ok(all_transactions)
982    }
983
984    // Unoptimized implementation of list_paginated. Rarely used. find_by_relayer_id is preferred.
985    async fn list_paginated(
986        &self,
987        query: PaginationQuery,
988    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
989        if query.per_page == 0 {
990            return Err(RepositoryError::InvalidData(
991                "per_page must be greater than 0".to_string(),
992            ));
993        }
994
995        let mut conn = self
996            .get_connection(self.connections.reader(), "list_paginated")
997            .await?;
998
999        debug!(page = %query.page, per_page = %query.per_page, "fetching paginated transactions sorted by created_at (newest first)");
1000
1001        // Get all relayer IDs
1002        let relayer_list_key = self.relayer_list_key();
1003        let relayer_ids: Vec<String> = conn
1004            .smembers(&relayer_list_key)
1005            .await
1006            .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_ids"))?;
1007
1008        // Collect all transaction IDs from all relayers using their sorted sets
1009        let mut all_tx_ids = Vec::new();
1010        for relayer_id in relayer_ids {
1011            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
1012            let tx_ids: Vec<String> = redis::cmd("ZRANGE")
1013                .arg(&relayer_sorted_key)
1014                .arg(0)
1015                .arg(-1)
1016                .arg("REV")
1017                .query_async(&mut conn)
1018                .await
1019                .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_sorted"))?;
1020
1021            all_tx_ids.extend(tx_ids);
1022        }
1023
1024        // Release connection before nested call to avoid connection doubling
1025        drop(conn);
1026
1027        // Batch fetch all transactions at once
1028        let batch_result = self.get_transactions_by_ids(&all_tx_ids).await?;
1029        let mut all_transactions = batch_result.results;
1030
1031        // Sort all transactions by created_at (newest first)
1032        all_transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
1033
1034        let total = all_transactions.len() as u64;
1035        let start = ((query.page - 1) * query.per_page) as usize;
1036        let end = (start + query.per_page as usize).min(all_transactions.len());
1037
1038        if start >= all_transactions.len() {
1039            debug!(page = %query.page, total = %total, "page is beyond available data");
1040            return Ok(PaginatedResult {
1041                items: vec![],
1042                total,
1043                page: query.page,
1044                per_page: query.per_page,
1045            });
1046        }
1047
1048        let items = all_transactions[start..end].to_vec();
1049
1050        debug!(count = %items.len(), page = %query.page, "successfully fetched transactions for page");
1051
1052        Ok(PaginatedResult {
1053            items,
1054            total,
1055            page: query.page,
1056            per_page: query.per_page,
1057        })
1058    }
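    // Pagination arithmetic example: page = 2, per_page = 25 selects indices
    // 25..min(50, len) of the merged, newest-first list, i.e. items 26-50.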
1059
1060    async fn update(
1061        &self,
1062        id: String,
1063        entity: TransactionRepoModel,
1064    ) -> Result<TransactionRepoModel, RepositoryError> {
1065        if id.is_empty() {
1066            return Err(RepositoryError::InvalidData(
1067                "Transaction ID cannot be empty".to_string(),
1068            ));
1069        }
1070
1071        debug!(tx_id = %id, "updating transaction");
1072
1073        // Get the old transaction for index cleanup
1074        let old_tx = self.get_by_id(id.clone()).await?;
1075
1076        let key = self.tx_key(&entity.relayer_id, &id);
1077        let mut conn = self
1078            .get_connection(self.connections.primary(), "update")
1079            .await?;
1080
1081        let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
1082
1083        // Update transaction
1084        let _: () = conn
1085            .set(&key, value)
1086            .await
1087            .map_err(|e| self.map_redis_error(e, "update_transaction"))?;
1088
1089        // Update indexes
1090        self.update_indexes(&entity, Some(&old_tx)).await?;
1091
1092        debug!(tx_id = %id, "successfully updated transaction");
1093        Ok(entity)
1094    }
1095
1096    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
1097        if id.is_empty() {
1098            return Err(RepositoryError::InvalidData(
1099                "Transaction ID cannot be empty".to_string(),
1100            ));
1101        }
1102
1103        debug!(tx_id = %id, "deleting transaction");
1104
1105        // Get transaction first for index cleanup
1106        let tx = self.get_by_id(id.clone()).await?;
1107
1108        let key = self.tx_key(&tx.relayer_id, &id);
1109        let reverse_key = self.tx_to_relayer_key(&id);
1110        let mut conn = self
1111            .get_connection(self.connections.primary(), "delete_by_id")
1112            .await?;
1113
1114        let mut pipe = redis::pipe();
1115        pipe.atomic();
1116        pipe.del(&key);
1117        pipe.del(&reverse_key);
1118
1119        pipe.exec_async(&mut conn)
1120            .await
1121            .map_err(|e| self.map_redis_error(e, "delete_transaction"))?;
1122
1123        // Remove indexes (log errors but don't fail the delete)
1124        if let Err(e) = self.remove_all_indexes(&tx).await {
1125            error!(tx_id = %id, error = %e, "failed to remove indexes for deleted transaction");
1126        }
1127
1128        debug!(tx_id = %id, "successfully deleted transaction");
1129        Ok(())
1130    }
1131
1132    // Unoptimized implementation of count. Rarely used. find_by_relayer_id is preferred.
1133    async fn count(&self) -> Result<usize, RepositoryError> {
1134        let mut conn = self
1135            .get_connection(self.connections.reader(), "count")
1136            .await?;
1137
1138        debug!("counting transactions");
1139
1140        // Get all relayer IDs and sum their sorted set counts
1141        let relayer_list_key = self.relayer_list_key();
1142        let relayer_ids: Vec<String> = conn
1143            .smembers(&relayer_list_key)
1144            .await
1145            .map_err(|e| self.map_redis_error(e, "count_relayer_ids"))?;
1146
1147        let mut total_count = 0usize;
1148        for relayer_id in relayer_ids {
1149            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
1150            let count: usize = conn
1151                .zcard(&relayer_sorted_key)
1152                .await
1153                .map_err(|e| self.map_redis_error(e, "count_relayer_transactions"))?;
1154            total_count += count;
1155        }
1156
1157        debug!(count = %total_count, "transaction count");
1158        Ok(total_count)
1159    }
1160
1161    async fn has_entries(&self) -> Result<bool, RepositoryError> {
1162        let mut conn = self
1163            .get_connection(self.connections.reader(), "has_entries")
1164            .await?;
1165        let relayer_list_key = self.relayer_list_key();
1166
1167        debug!("checking if transaction entries exist");
1168
1169        let exists: bool = conn
1170            .exists(&relayer_list_key)
1171            .await
1172            .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;
1173
1174        debug!(exists = %exists, "transaction entries exist");
1175        Ok(exists)
1176    }
1177
1178    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
1179        let mut conn = self
1180            .get_connection(self.connections.primary(), "drop_all_entries")
1181            .await?;
1182        let relayer_list_key = self.relayer_list_key();
1183
1184        debug!("dropping all transaction entries");
1185
1186        // Get all relayer IDs first
1187        let relayer_ids: Vec<String> = conn
1188            .smembers(&relayer_list_key)
1189            .await
1190            .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_relayer_ids"))?;
1191
1192        if relayer_ids.is_empty() {
1193            debug!("no transaction entries to drop");
1194            return Ok(());
1195        }
1196
1197        // Use pipeline for atomic operations
1198        let mut pipe = redis::pipe();
1199        pipe.atomic();
1200
1201        // Delete all transactions and their indexes for each relayer
1202        for relayer_id in &relayer_ids {
1203            // Get all transaction IDs for this relayer
1204            let pattern = format!(
1205                "{}:{}:{}:{}:*",
1206                self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
1207            );
1208            let mut cursor = 0;
1209            let mut tx_ids = Vec::new();
1210
1211            loop {
1212                let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
1213                    .cursor_arg(cursor)
1214                    .arg("MATCH")
1215                    .arg(&pattern)
1216                    .query_async(&mut conn)
1217                    .await
1218                    .map_err(|e| self.map_redis_error(e, "drop_all_entries_scan"))?;
1219
1220                // Extract transaction IDs from keys and delete keys
1221                for key in keys {
1222                    pipe.del(&key);
1223                    if let Some(tx_id) = key.split(':').next_back() {
1224                        tx_ids.push(tx_id.to_string());
1225                    }
1226                }
1227
1228                cursor = next_cursor;
1229                if cursor == 0 {
1230                    break;
1231                }
1232            }
1233
1234            // Delete reverse lookup keys and indexes
1235            for tx_id in tx_ids {
1236                let reverse_key = self.tx_to_relayer_key(&tx_id);
1237                pipe.del(&reverse_key);
1238
1239                // Delete status indexes (we can't know the specific status, so we'll clean up all possible ones)
1240                // This ensures complete cleanup even if there are orphaned entries
1241                for status in &[
1242                    TransactionStatus::Canceled,
1243                    TransactionStatus::Pending,
1244                    TransactionStatus::Sent,
1245                    TransactionStatus::Submitted,
1246                    TransactionStatus::Mined,
1247                    TransactionStatus::Confirmed,
1248                    TransactionStatus::Failed,
1249                    TransactionStatus::Expired,
1250                ] {
1251                    // Remove from sorted status set (new format)
1252                    let status_sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1253                    pipe.zrem(&status_sorted_key, &tx_id);
1254
1255                    // Remove from legacy status set (for migration cleanup)
1256                    let status_key = self.relayer_status_key(relayer_id, status);
1257                    pipe.srem(&status_key, &tx_id);
1258                }
1259            }
1260
1261            // Delete the relayer's sorted set by created_at
1262            let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
1263            pipe.del(&relayer_sorted_key);
1264        }
1265
1266        // Delete the relayer list key
1267        pipe.del(&relayer_list_key);
1268
1269        pipe.exec_async(&mut conn)
1270            .await
1271            .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;
1272
1273        debug!(count = %relayer_ids.len(), "dropped all transaction entries for relayers");
1274        Ok(())
1275    }
1276}
1277
1278#[async_trait]
1279impl TransactionRepository for RedisTransactionRepository {
1280    async fn find_by_relayer_id(
1281        &self,
1282        relayer_id: &str,
1283        query: PaginationQuery,
1284    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
1285        let mut conn = self
1286            .get_connection(self.connections.reader(), "find_by_relayer_id")
1287            .await?;
1288
1289        debug!(relayer_id = %relayer_id, page = %query.page, per_page = %query.per_page, "fetching transactions for relayer sorted by created_at (newest first)");
1290
1291        let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
1292
1293        // Get total count from relayer's sorted set
1294        let sorted_set_count: u64 = conn
1295            .zcard(&relayer_sorted_key)
1296            .await
1297            .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_count"))?;
1298
1299        // If sorted set is empty, return empty result immediately
1300        // All new transactions are automatically added to the sorted set
1301        if sorted_set_count == 0 {
1302            debug!(relayer_id = %relayer_id, "no transactions found for relayer (sorted set is empty)");
1303            return Ok(PaginatedResult {
1304                items: vec![],
1305                total: 0,
1306                page: query.page,
1307                per_page: query.per_page,
1308            });
1309        }
1310
1311        let total = sorted_set_count;
1312
1313        // Calculate pagination range (0-indexed for Redis ZRANGE with REV)
1314        let start = ((query.page - 1) * query.per_page) as isize;
1315        let end = start + query.per_page as isize - 1;
1316
1317        if start as u64 >= total {
1318            debug!(relayer_id = %relayer_id, page = %query.page, total = %total, "page is beyond available data");
1319            return Ok(PaginatedResult {
1320                items: vec![],
1321                total,
1322                page: query.page,
1323                per_page: query.per_page,
1324            });
1325        }
1326
1327        // Get page of transaction IDs from sorted set (newest first using ZRANGE with REV)
1328        let page_ids: Vec<String> = redis::cmd("ZRANGE")
1329            .arg(&relayer_sorted_key)
1330            .arg(start)
1331            .arg(end)
1332            .arg("REV")
1333            .query_async(&mut conn)
1334            .await
1335            .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_sorted"))?;
1336
1337        // Release connection before nested call to avoid connection doubling
1338        drop(conn);
1339
1340        let items = self.get_transactions_by_ids(&page_ids).await?;
1341
1342        debug!(relayer_id = %relayer_id, count = %items.results.len(), page = %query.page, "successfully fetched transactions for relayer");
1343
1344        Ok(PaginatedResult {
1345            items: items.results,
1346            total,
1347            page: query.page,
1348            per_page: query.per_page,
1349        })
1350    }
1351
1352    // Unoptimized implementation of find_by_status. Rarely used. find_by_status_paginated is preferred.
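    // It loads every matching ID, fetches every matching transaction, and sorts in memory,
    // rather than paging directly out of a sorted set.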
1353    async fn find_by_status(
1354        &self,
1355        relayer_id: &str,
1356        statuses: &[TransactionStatus],
1357    ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
1358        // Ensure all status sorted sets are migrated first (releases connection after each)
1359        for status in statuses {
1360            self.ensure_status_sorted_set(relayer_id, status).await?;
1361        }
1362
1363        // Now get a connection and collect all IDs
1364        let mut conn = self
1365            .get_connection(self.connections.reader(), "find_by_status")
1366            .await?;
1367
1368        let mut all_ids: Vec<String> = Vec::new();
1369        for status in statuses {
1370            // Get IDs from sorted set (already ordered by created_at)
1371            let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1372            let ids: Vec<String> = redis::cmd("ZRANGE")
1373                .arg(&sorted_key)
1374                .arg(0)
1375                .arg(-1)
1376                .arg("REV") // Newest first
1377                .query_async(&mut conn)
1378                .await
1379                .map_err(|e| self.map_redis_error(e, "find_by_status"))?;
1380
1381            all_ids.extend(ids);
1382        }
1383
1384        // Release connection before nested call to avoid connection doubling
1385        drop(conn);
1386
1387        if all_ids.is_empty() {
1388            return Ok(vec![]);
1389        }
1390
1391        // Remove duplicates (can happen if a transaction is in multiple status sets due to partial failures)
1392        all_ids.sort();
1393        all_ids.dedup();
1394
1395        // Fetch all transactions and sort by created_at (newest first)
1396        let mut transactions = self.get_transactions_by_ids(&all_ids).await?;
1397
1398        // Sort by created_at descending (newest first)
1399        transactions
1400            .results
1401            .sort_by(|a, b| b.created_at.cmp(&a.created_at));
1402
1403        Ok(transactions.results)
1404    }
1405
1406    async fn find_by_status_paginated(
1407        &self,
1408        relayer_id: &str,
1409        statuses: &[TransactionStatus],
1410        query: PaginationQuery,
1411        oldest_first: bool,
1412    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
1413        // Ensure all status sorted sets are migrated first (releases connection after each)
1414        for status in statuses {
1415            self.ensure_status_sorted_set(relayer_id, status).await?;
1416        }
1417
1418        let mut conn = self
1419            .get_connection(self.connections.reader(), "find_by_status_paginated")
1420            .await?;
1421
1422        // For single status, we can paginate directly from the sorted set
1423        if statuses.len() == 1 {
1424            let sorted_key = self.relayer_status_sorted_key(relayer_id, &statuses[0]);
1425
1426            // Get total count
1427            let total: u64 = conn
1428                .zcard(&sorted_key)
1429                .await
1430                .map_err(|e| self.map_redis_error(e, "find_by_status_paginated_count"))?;
1431
1432            if total == 0 {
1433                return Ok(PaginatedResult {
1434                    items: vec![],
1435                    total: 0,
1436                    page: query.page,
1437                    per_page: query.per_page,
1438                });
1439            }
1440
1441            // Calculate pagination bounds
1442            let start = ((query.page.saturating_sub(1)) * query.per_page) as isize;
1443            let end = start + query.per_page as isize - 1;
1444
1445            // Get page of IDs directly from sorted set
1446            // REV = newest first (descending), no REV = oldest first (ascending)
1447            let mut cmd = redis::cmd("ZRANGE");
1448            cmd.arg(&sorted_key).arg(start).arg(end);
1449            if !oldest_first {
1450                cmd.arg("REV");
1451            }
1452            let page_ids: Vec<String> = cmd
1453                .query_async(&mut conn)
1454                .await
1455                .map_err(|e| self.map_redis_error(e, "find_by_status_paginated"))?;
1456
1457            // Release connection before nested call to avoid connection doubling
1458            drop(conn);
1459
1460            let transactions = self.get_transactions_by_ids(&page_ids).await?;
1461
1462            debug!(
1463                relayer_id = %relayer_id,
1464                status = %statuses[0],
1465                total = %total,
1466                page = %query.page,
1467                page_size = %transactions.results.len(),
1468                "fetched paginated transactions by single status"
1469            );
1470
1471            return Ok(PaginatedResult {
1472                items: transactions.results,
1473                total,
1474                page: query.page,
1475                per_page: query.per_page,
1476            });
1477        }
1478
1479        // For multiple statuses, collect all IDs and merge
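        // Pagination cannot be served from a single sorted set here, so every ID is pulled
        // together with its created_at score and merged/sorted in memory before the page is cut.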
1480        let mut all_ids: Vec<(String, f64)> = Vec::new();
1481        for status in statuses {
1482            let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1483
1484            // Get IDs with scores for proper sorting
1485            let ids_with_scores: Vec<(String, f64)> = redis::cmd("ZRANGE")
1486                .arg(&sorted_key)
1487                .arg(0)
1488                .arg(-1)
1489                .arg("WITHSCORES")
1490                .query_async(&mut conn)
1491                .await
1492                .map_err(|e| self.map_redis_error(e, "find_by_status_paginated_multi"))?;
1493
1494            all_ids.extend(ids_with_scores);
1495        }
1496
1497        // Release connection before nested call to avoid connection doubling
1498        drop(conn);
1499
1500        // Remove duplicates (keep highest/lowest score based on sort order)
1501        let mut id_map: std::collections::HashMap<String, f64> = std::collections::HashMap::new();
1502        for (id, score) in all_ids {
1503            id_map
1504                .entry(id)
1505                .and_modify(|s| {
1506                    // For oldest_first, keep the lowest score; otherwise keep highest
1507                    if oldest_first {
1508                        if score < *s {
1509                            *s = score
1510                        }
1511                    } else if score > *s {
1512                        *s = score
1513                    }
1514                })
1515                .or_insert(score);
1516        }
1517
1518        // Sort by score: descending for newest first, ascending for oldest first
1519        let mut sorted_ids: Vec<(String, f64)> = id_map.into_iter().collect();
1520        if oldest_first {
1521            sorted_ids.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
1522        } else {
1523            sorted_ids.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1524        }
1525
1526        let total = sorted_ids.len() as u64;
1527
1528        if total == 0 {
1529            return Ok(PaginatedResult {
1530                items: vec![],
1531                total: 0,
1532                page: query.page,
1533                per_page: query.per_page,
1534            });
1535        }
1536
1537        // Apply pagination
1538        let start = ((query.page.saturating_sub(1)) * query.per_page) as usize;
1539        let page_ids: Vec<String> = sorted_ids
1540            .into_iter()
1541            .skip(start)
1542            .take(query.per_page as usize)
1543            .map(|(id, _)| id)
1544            .collect();
1545
1546        // Fetch only the transactions for this page
1547        let transactions = self.get_transactions_by_ids(&page_ids).await?;
1548
1549        debug!(
1550            relayer_id = %relayer_id,
1551            total = %total,
1552            page = %query.page,
1553            page_size = %transactions.results.len(),
1554            "fetched paginated transactions by status"
1555        );
1556
1557        Ok(PaginatedResult {
1558            items: transactions.results,
1559            total,
1560            page: query.page,
1561            per_page: query.per_page,
1562        })
1563    }
1564
1565    async fn find_by_nonce(
1566        &self,
1567        relayer_id: &str,
1568        nonce: u64,
1569    ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
1570        let mut conn = self
1571            .get_connection(self.connections.reader(), "find_by_nonce")
1572            .await?;
1573        let nonce_key = self.relayer_nonce_key(relayer_id, nonce);
1574
1575        // Get the transaction ID with this nonce for this relayer (should be a single value)
1576        let tx_id: Option<String> = conn
1577            .get(nonce_key)
1578            .await
1579            .map_err(|e| self.map_redis_error(e, "find_by_nonce"))?;
1580
1581        match tx_id {
1582            Some(tx_id) => {
1583                match self.get_by_id(tx_id.clone()).await {
1584                    Ok(tx) => Ok(Some(tx)),
1585                    Err(RepositoryError::NotFound(_)) => {
1586                        // Transaction was deleted but index wasn't cleaned up
1587                        warn!(relayer_id = %relayer_id, nonce = %nonce, "stale nonce index found for relayer");
1588                        Ok(None)
1589                    }
1590                    Err(e) => Err(e),
1591                }
1592            }
1593            None => Ok(None),
1594        }
1595    }
1596
1597    async fn update_status(
1598        &self,
1599        tx_id: String,
1600        status: TransactionStatus,
1601    ) -> Result<TransactionRepoModel, RepositoryError> {
1602        let update = TransactionUpdateRequest {
1603            status: Some(status),
1604            ..Default::default()
1605        };
1606        self.partial_update(tx_id, update).await
1607    }
1608
1609    async fn partial_update(
1610        &self,
1611        tx_id: String,
1612        update: TransactionUpdateRequest,
1613    ) -> Result<TransactionRepoModel, RepositoryError> {
1614        // Serialize only the non-None fields as a JSON patch.
1615        let patch_json = serde_json::to_string(&update).map_err(|e| {
1616            RepositoryError::InvalidData(format!("Failed to serialize update patch: {e}"))
1617        })?;
1618
1619        // If the update sets a final status, compute delete_at in Rust (depends on server config)
1620        // and include it in the patch so the Lua script applies it atomically.
1621        let delete_at_value = if let Some(ref status) = update.status {
1622            if FINAL_TRANSACTION_STATUSES.contains(status) {
1623                let expiration_hours = ServerConfig::get_transaction_expiration_hours();
1624                let seconds = (expiration_hours * 3600.0) as i64;
1625                let delete_time = Utc::now() + chrono::Duration::seconds(seconds);
1626                Some(delete_time.to_rfc3339())
1627            } else {
1628                None
1629            }
1630        } else {
1631            None
1632        };
1633        let delete_at_arg = delete_at_value.as_deref().unwrap_or("");
1634
1635        let (lookup_key, key_prefix, key_suffix) = self.tx_key_parts(&tx_id);
1636
1637        // Lua script: atomically applies a JSON patch to the stored transaction.
1638        // Guards: rejects status changes on already-finalized transactions.
1639        // Returns a two-element array {old_json, new_json} so Rust has the full
1640        // pre-update state for index cleanup and metrics.
1641        // Returns false if tx not found.
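        // Argument layout the script expects (wired up via tx_key_parts/run_script_with_retry_vec):
        //   KEYS[1] = reverse-lookup key that resolves the tx id to its relayer id,
        //   ARGV[1]/ARGV[2] = tx key prefix/suffix, ARGV[3] = JSON patch,
        //   ARGV[4] = delete_at timestamp ("" when no final status is being set).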
1642        let patch_script = Script::new(
1643            r#"
1644            local relayer_id = redis.call('GET', KEYS[1])
1645            if not relayer_id then return false end
1646
1647            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
1648            local current = redis.call('GET', tx_key)
1649            if not current then return false end
1650
1651            local tx = cjson.decode(current)
1652            local patch = cjson.decode(ARGV[3])
1653
1654            -- Guard: reject status changes on finalized transactions.
1655            -- A stale worker must not resurrect a tx that another worker
1656            -- already moved to a terminal state.
1657            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
1658            if final_states[tx["status"]] and patch["status"] then
1659                return {current, current}
1660            end
1661
1662            local old_snapshot = current
1663
1664            -- lua-cjson cannot distinguish empty Lua tables from empty
1665            -- arrays, so a decode/encode round-trip turns [] into {}.
1666            -- Record which keys held [] in the stored doc and the patch
1667            -- so we can restore them after cjson.encode.
1668            -- NOTE: this relies on each array-typed field having a unique key
1669            -- name across the entire JSON document (including nested objects).
1670            -- If the model ever introduces duplicate key names at different
1671            -- nesting levels (e.g. metadata.hashes), the gsub below could
1672            -- restore the wrong occurrence.
1673            local empty_arrs = {}
1674            for k in string.gmatch(current, '"([^"]+)"%s*:%s*%[%s*%]') do
1675                empty_arrs[k] = true
1676            end
1677            for k in string.gmatch(ARGV[3], '"([^"]+)"%s*:%s*%[%s*%]') do
1678                empty_arrs[k] = true
1679            end
1680
1681            for k, v in pairs(patch) do
1682                tx[k] = v
1683            end
1684
1685            -- Apply delete_at if transitioning to a final state and not already set
1686            if ARGV[4] ~= '' and (not tx["delete_at"] or tx["delete_at"] == cjson.null) then
1687                tx["delete_at"] = ARGV[4]
1688            end
1689
1690            local updated = cjson.encode(tx)
1691
1692            -- Restore empty arrays that cjson.encode converted to {}
1693            for k, _ in pairs(empty_arrs) do
1694                updated = string.gsub(
1695                    updated, '"'..k..'"%s*:%s*{}', '"'..k..'":[]', 1
1696                )
1697            end
1698
1699            redis.call('SET', tx_key, updated)
1700            return {old_snapshot, updated}
1701            "#,
1702        );
1703
1704        let result: Option<Vec<String>> = self
1705            .run_script_with_retry_vec(
1706                &patch_script,
1707                &lookup_key,
1708                &key_prefix,
1709                &key_suffix,
1710                &[&patch_json, delete_at_arg],
1711                "partial_update",
1712            )
1713            .await?;
1714
1715        let parts = result.ok_or_else(|| {
1716            RepositoryError::NotFound(format!("Transaction with ID {tx_id} not found"))
1717        })?;
1718
1719        if parts.len() != 2 {
1720            return Err(RepositoryError::UnexpectedError(format!(
1721                "partial_update script returned {} elements, expected 2",
1722                parts.len()
1723            )));
1724        }
1725
1726        let old_json = &parts[0];
1727        let new_json = &parts[1];
1728
1729        let original_tx =
1730            self.deserialize_entity::<TransactionRepoModel>(old_json, &tx_id, "transaction")?;
1731        let updated_tx =
1732            self.deserialize_entity::<TransactionRepoModel>(new_json, &tx_id, "transaction")?;
1733
1734        // Update indexes using the full pre-update state (status, network_data, nonce, etc.)
1735        self.update_indexes(&updated_tx, Some(&original_tx)).await?;
1736
1737        debug!(tx_id = %tx_id, "successfully updated transaction via patch");
1738
1739        // Track metrics only when the persisted status actually changed.
1740        // The Lua script may silently reject a status patch on already-final
1741        // transactions, so we compare the deserialized before/after states.
1742        if original_tx.status != updated_tx.status {
1743            self.track_status_change_metrics(
1744                &original_tx,
1745                &updated_tx,
1746                &original_tx.status,
1747                &updated_tx.status,
1748            );
1749        }
1750
1751        Ok(updated_tx)
1752    }
1753
1754    async fn update_network_data(
1755        &self,
1756        tx_id: String,
1757        network_data: NetworkTransactionData,
1758    ) -> Result<TransactionRepoModel, RepositoryError> {
1759        let update = TransactionUpdateRequest {
1760            network_data: Some(network_data),
1761            ..Default::default()
1762        };
1763        self.partial_update(tx_id, update).await
1764    }
1765
1766    async fn set_sent_at(
1767        &self,
1768        tx_id: String,
1769        sent_at: String,
1770    ) -> Result<TransactionRepoModel, RepositoryError> {
1771        let update = TransactionUpdateRequest {
1772            sent_at: Some(sent_at),
1773            ..Default::default()
1774        };
1775        self.partial_update(tx_id, update).await
1776    }
1777
1778    async fn increment_status_check_failures(
1779        &self,
1780        tx_id: String,
1781    ) -> Result<TransactionRepoModel, RepositoryError> {
1782        self.run_atomic_script(
1783            r#"
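            -- set_obj patches a single object-valued field with string substitution instead of
            -- a full cjson decode/encode of the document, so empty arrays elsewhere in the
            -- stored JSON are not rewritten as {} (see the note in partial_update).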
1784            local function set_obj(json, key, tbl)
1785                local enc = cjson.encode(tbl)
1786                local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1)
1787                if n > 0 then return r end
1788                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
1789                if n > 0 then return r end
1790                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
1791            end
1792
1793            local relayer_id = redis.call('GET', KEYS[1])
1794            if not relayer_id then return false end
1795
1796            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
1797            local current = redis.call('GET', tx_key)
1798            if not current then return false end
1799
1800            local tx = cjson.decode(current)
1801            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
1802            if final_states[tx["status"]] then return current end
1803
1804            local metadata = tx["metadata"]
1805            if type(metadata) ~= 'table' then metadata = {} end
1806            metadata["consecutive_failures"] = (metadata["consecutive_failures"] or 0) + 1
1807            metadata["total_failures"] = (metadata["total_failures"] or 0) + 1
1808
1809            local updated = set_obj(current, "metadata", metadata)
1810            redis.call('SET', tx_key, updated)
1811            return updated
1812            "#,
1813            &tx_id,
1814            &[],
1815            "increment_status_check_failures",
1816        )
1817        .await
1818    }
1819
1820    async fn reset_status_check_consecutive_failures(
1821        &self,
1822        tx_id: String,
1823    ) -> Result<TransactionRepoModel, RepositoryError> {
1824        self.run_atomic_script(
1825            r#"
1826            local function set_obj(json, key, tbl)
1827                local enc = cjson.encode(tbl)
1828                local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1)
1829                if n > 0 then return r end
1830                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
1831                if n > 0 then return r end
1832                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
1833            end
1834
1835            local relayer_id = redis.call('GET', KEYS[1])
1836            if not relayer_id then return false end
1837
1838            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
1839            local current = redis.call('GET', tx_key)
1840            if not current then return false end
1841
1842            local tx = cjson.decode(current)
1843            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
1844            if final_states[tx["status"]] then return current end
1845
1846            local metadata = tx["metadata"]
1847            if type(metadata) ~= 'table' then metadata = {} end
1848            metadata["consecutive_failures"] = 0
1849
1850            local updated = set_obj(current, "metadata", metadata)
1851            redis.call('SET', tx_key, updated)
1852            return updated
1853            "#,
1854            &tx_id,
1855            &[],
1856            "reset_status_check_consecutive_failures",
1857        )
1858        .await
1859    }
1860
1861    async fn record_stellar_insufficient_fee_retry(
1862        &self,
1863        tx_id: String,
1864        sent_at: String,
1865    ) -> Result<TransactionRepoModel, RepositoryError> {
1866        self.run_atomic_script(
1867            r#"
1868            local function set_str(json, key, val)
1869                local enc = cjson.encode(val)
1870                local r, n = string.gsub(json, '"'..key..'"%s*:%s*"[^"]*"', '"'..key..'":'..enc, 1)
1871                if n > 0 then return r end
1872                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
1873                if n > 0 then return r end
1874                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
1875            end
1876            local function set_obj(json, key, tbl)
1877                local enc = cjson.encode(tbl)
1878                local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1)
1879                if n > 0 then return r end
1880                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
1881                if n > 0 then return r end
1882                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
1883            end
1884
1885            local relayer_id = redis.call('GET', KEYS[1])
1886            if not relayer_id then return false end
1887
1888            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
1889            local current = redis.call('GET', tx_key)
1890            if not current then return false end
1891
1892            local tx = cjson.decode(current)
1893            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
1894            if final_states[tx["status"]] then return current end
1895
1896            local metadata = tx["metadata"]
1897            if type(metadata) ~= 'table' then metadata = {} end
1898            metadata["insufficient_fee_retries"] = (metadata["insufficient_fee_retries"] or 0) + 1
1899
1900            local updated = set_str(current, "sent_at", ARGV[3])
1901            updated = set_obj(updated, "metadata", metadata)
1902            redis.call('SET', tx_key, updated)
1903            return updated
1904            "#,
1905            &tx_id,
1906            &[&sent_at],
1907            "record_stellar_insufficient_fee_retry",
1908        )
1909        .await
1910    }
1911
1912    async fn record_stellar_try_again_later_retry(
1913        &self,
1914        tx_id: String,
1915        sent_at: String,
1916    ) -> Result<TransactionRepoModel, RepositoryError> {
1917        self.run_atomic_script(
1918            r#"
1919            local function set_str(json, key, val)
1920                local enc = cjson.encode(val)
1921                local r, n = string.gsub(json, '"'..key..'"%s*:%s*"[^"]*"', '"'..key..'":'..enc, 1)
1922                if n > 0 then return r end
1923                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
1924                if n > 0 then return r end
1925                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
1926            end
1927            local function set_obj(json, key, tbl)
1928                local enc = cjson.encode(tbl)
1929                local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1)
1930                if n > 0 then return r end
1931                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
1932                if n > 0 then return r end
1933                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
1934            end
1935
1936            local relayer_id = redis.call('GET', KEYS[1])
1937            if not relayer_id then return false end
1938
1939            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
1940            local current = redis.call('GET', tx_key)
1941            if not current then return false end
1942
1943            local tx = cjson.decode(current)
1944            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
1945            if final_states[tx["status"]] then return current end
1946
1947            local metadata = tx["metadata"]
1948            if type(metadata) ~= 'table' then metadata = {} end
1949            metadata["try_again_later_retries"] = (metadata["try_again_later_retries"] or 0) + 1
1950
1951            local updated = set_str(current, "sent_at", ARGV[3])
1952            updated = set_obj(updated, "metadata", metadata)
1953            redis.call('SET', tx_key, updated)
1954            return updated
1955            "#,
1956            &tx_id,
1957            &[&sent_at],
1958            "record_stellar_try_again_later_retry",
1959        )
1960        .await
1961    }
1962
1963    async fn set_confirmed_at(
1964        &self,
1965        tx_id: String,
1966        confirmed_at: String,
1967    ) -> Result<TransactionRepoModel, RepositoryError> {
1968        let update = TransactionUpdateRequest {
1969            confirmed_at: Some(confirmed_at),
1970            ..Default::default()
1971        };
1972        self.partial_update(tx_id, update).await
1973    }
1974
1975    /// Count transactions by status using Redis ZCARD (O(1) per sorted set).
1976    /// Much more efficient than find_by_status when you only need the count.
1977    /// Triggers migration from legacy SETs if needed.
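    /// Note: counts are summed per status set, so a transaction that ended up in more than
    /// one of the requested status sets (possible after a partial index update) is counted
    /// once per set.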
1978    async fn count_by_status(
1979        &self,
1980        relayer_id: &str,
1981        statuses: &[TransactionStatus],
1982    ) -> Result<u64, RepositoryError> {
1983        let mut conn = self
1984            .get_connection(self.connections.reader(), "count_by_status")
1985            .await?;
1986        let mut total_count: u64 = 0;
1987
1988        for status in statuses {
1989            // Ensure sorted set is migrated
1990            self.ensure_status_sorted_set(relayer_id, status).await?;
1991
1992            let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1993            let count: u64 = conn
1994                .zcard(&sorted_key)
1995                .await
1996                .map_err(|e| self.map_redis_error(e, "count_by_status"))?;
1997            total_count += count;
1998        }
1999
2000        debug!(relayer_id = %relayer_id, count = %total_count, "counted transactions by status");
2001        Ok(total_count)
2002    }
2003
2004    async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError> {
2005        if ids.is_empty() {
2006            debug!("no transaction IDs provided for batch delete");
2007            return Ok(BatchDeleteResult::default());
2008        }
2009
2010        debug!(count = %ids.len(), "batch deleting transactions by IDs (with fetch)");
2011
2012        // Fetch transactions to get their data for index cleanup
2013        let batch_result = self.get_transactions_by_ids(&ids).await?;
2014
2015        // Convert to delete requests
2016        let requests: Vec<TransactionDeleteRequest> = batch_result
2017            .results
2018            .iter()
2019            .map(|tx| TransactionDeleteRequest {
2020                id: tx.id.clone(),
2021                relayer_id: tx.relayer_id.clone(),
2022                nonce: self.extract_nonce(&tx.network_data),
2023            })
2024            .collect();
2025
2026        // Delete the transactions that were found
2027        let mut result = self.delete_by_requests(requests).await?;
2028
2029        // Add the IDs that weren't found during fetch
2030        for id in batch_result.failed_ids {
2031            result
2032                .failed
2033                .push((id.clone(), format!("Transaction with ID {id} not found")));
2034        }
2035
2036        Ok(result)
2037    }
2038
2039    async fn delete_by_requests(
2040        &self,
2041        requests: Vec<TransactionDeleteRequest>,
2042    ) -> Result<BatchDeleteResult, RepositoryError> {
2043        if requests.is_empty() {
2044            debug!("no delete requests provided for batch delete");
2045            return Ok(BatchDeleteResult::default());
2046        }
2047
2048        debug!(count = %requests.len(), "batch deleting transactions by requests (no fetch)");
2049        let mut conn = self
2050            .get_connection(self.connections.primary(), "batch_delete_no_fetch")
2051            .await?;
2052        let mut pipe = redis::pipe();
2053        pipe.atomic();
2054
2055        // All possible statuses for index cleanup
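        // The delete requests do not carry the transaction's current status, so the ID is
        // removed from every status index rather than fetching the transaction first.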
2056        let all_statuses = [
2057            TransactionStatus::Canceled,
2058            TransactionStatus::Pending,
2059            TransactionStatus::Sent,
2060            TransactionStatus::Submitted,
2061            TransactionStatus::Mined,
2062            TransactionStatus::Confirmed,
2063            TransactionStatus::Failed,
2064            TransactionStatus::Expired,
2065        ];
2066
2067        // Build pipeline for all deletions and index removals
2068        for req in &requests {
2069            // Delete transaction data
2070            let tx_key = self.tx_key(&req.relayer_id, &req.id);
2071            pipe.del(&tx_key);
2072
2073            // Delete reverse lookup
2074            let reverse_key = self.tx_to_relayer_key(&req.id);
2075            pipe.del(&reverse_key);
2076
2077            // Remove from all possible status indexes
2078            for status in &all_statuses {
2079                let status_sorted_key = self.relayer_status_sorted_key(&req.relayer_id, status);
2080                pipe.zrem(&status_sorted_key, &req.id);
2081
2082                let status_legacy_key = self.relayer_status_key(&req.relayer_id, status);
2083                pipe.srem(&status_legacy_key, &req.id);
2084            }
2085
2086            // Remove nonce index if exists
2087            if let Some(nonce) = req.nonce {
2088                let nonce_key = self.relayer_nonce_key(&req.relayer_id, nonce);
2089                pipe.del(&nonce_key);
2090            }
2091
2092            // Remove from per-relayer sorted set by created_at
2093            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&req.relayer_id);
2094            pipe.zrem(&relayer_sorted_key, &req.id);
2095        }
2096
2097        // Execute the entire pipeline in one round-trip
2098        match pipe.exec_async(&mut conn).await {
2099            Ok(_) => {
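                // DEL/ZREM/SREM on missing keys are not errors, so a successful pipeline
                // counts every request as deleted even if some entries no longer existed.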
2100                let deleted_count = requests.len();
2101                debug!(
2102                    deleted_count = %deleted_count,
2103                    "batch delete completed"
2104                );
2105                Ok(BatchDeleteResult {
2106                    deleted_count,
2107                    failed: vec![],
2108                })
2109            }
2110            Err(e) => {
2111                error!(error = %e, "batch delete pipeline failed");
2112                // Mark all requests as failed
2113                let failed: Vec<(String, String)> = requests
2114                    .iter()
2115                    .map(|req| (req.id.clone(), format!("Redis pipeline error: {e}")))
2116                    .collect();
2117                Ok(BatchDeleteResult {
2118                    deleted_count: 0,
2119                    failed,
2120                })
2121            }
2122        }
2123    }
2124}
2125
2126#[cfg(test)]
2127mod tests {
2128    use super::*;
2129    use crate::models::{evm::Speed, EvmTransactionData, NetworkType};
2130    use alloy::primitives::U256;
2131    use deadpool_redis::{Config, Runtime};
2132    use lazy_static::lazy_static;
2133    use std::str::FromStr;
2134    use tokio;
2135    use uuid::Uuid;
2136
2137    use tokio::sync::Mutex;
2138
2139    // Use a mutex to ensure tests don't run in parallel when modifying env vars
2140    lazy_static! {
2141        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
2142    }
2143
2144    // Helper function to create test transactions
2145    fn create_test_transaction(id: &str) -> TransactionRepoModel {
2146        TransactionRepoModel {
2147            id: id.to_string(),
2148            relayer_id: "relayer-1".to_string(),
2149            status: TransactionStatus::Pending,
2150            status_reason: None,
2151            created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
2152            sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
2153            confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
2154            valid_until: None,
2155            delete_at: None,
2156            network_type: NetworkType::Evm,
2157            priced_at: None,
2158            hashes: vec![],
2159            network_data: NetworkTransactionData::Evm(EvmTransactionData {
2160                gas_price: Some(1000000000),
2161                gas_limit: Some(21000),
2162                nonce: Some(1),
2163                value: U256::from_str("1000000000000000000").unwrap(),
2164                data: Some("0x".to_string()),
2165                from: "0xSender".to_string(),
2166                to: Some("0xRecipient".to_string()),
2167                chain_id: 1,
2168                signature: None,
2169                hash: Some(format!("0x{id}")),
2170                speed: Some(Speed::Fast),
2171                max_fee_per_gas: None,
2172                max_priority_fee_per_gas: None,
2173                raw: None,
2174            }),
2175            noop_count: None,
2176            is_canceled: Some(false),
2177            metadata: None,
2178        }
2179    }
2180
2181    fn create_test_transaction_with_relayer(id: &str, relayer_id: &str) -> TransactionRepoModel {
2182        let mut tx = create_test_transaction(id);
2183        tx.relayer_id = relayer_id.to_string();
2184        tx
2185    }
2186
2187    fn create_test_transaction_with_status(
2188        id: &str,
2189        relayer_id: &str,
2190        status: TransactionStatus,
2191    ) -> TransactionRepoModel {
2192        let mut tx = create_test_transaction_with_relayer(id, relayer_id);
2193        tx.status = status;
2194        tx
2195    }
2196
2197    fn create_test_transaction_with_nonce(
2198        id: &str,
2199        nonce: u64,
2200        relayer_id: &str,
2201    ) -> TransactionRepoModel {
2202        let mut tx = create_test_transaction_with_relayer(id, relayer_id);
2203        if let NetworkTransactionData::Evm(ref mut evm_data) = tx.network_data {
2204            evm_data.nonce = Some(nonce);
2205        }
2206        tx
2207    }
2208
2209    async fn setup_test_repo() -> RedisTransactionRepository {
2210        // Connect to the Redis instance from REDIS_TEST_URL, falling back to a local instance
2211        let redis_url = std::env::var("REDIS_TEST_URL")
2212            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
2213
2214        let cfg = Config::from_url(&redis_url);
2215        let pool = Arc::new(
2216            cfg.builder()
2217                .expect("Failed to create pool builder")
2218                .max_size(16)
2219                .runtime(Runtime::Tokio1)
2220                .build()
2221                .expect("Failed to build Redis pool"),
2222        );
2223
2224        // Create RedisConnections with the same pool for both primary and reader (for testing)
2225        let connections = Arc::new(RedisConnections::new_single_pool(pool));
2226
2227        let random_id = Uuid::new_v4().to_string();
2228        let key_prefix = format!("test_prefix:{random_id}");
2229
2230        RedisTransactionRepository::new(connections, key_prefix)
2231            .expect("Failed to create RedisTransactionRepository")
2232    }
2233
2234    #[tokio::test]
2235    #[ignore = "Requires active Redis instance"]
2236    async fn test_new_repository_creation() {
2237        let repo = setup_test_repo().await;
2238        assert!(repo.key_prefix.contains("test_prefix"));
2239    }
2240
2241    #[tokio::test]
2242    #[ignore = "Requires active Redis instance"]
2243    async fn test_new_repository_empty_prefix_fails() {
2244        let redis_url = std::env::var("REDIS_TEST_URL")
2245            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
2246        let cfg = Config::from_url(&redis_url);
2247        let pool = Arc::new(
2248            cfg.builder()
2249                .expect("Failed to create pool builder")
2250                .max_size(16)
2251                .runtime(Runtime::Tokio1)
2252                .build()
2253                .expect("Failed to build Redis pool"),
2254        );
2255        let connections = Arc::new(RedisConnections::new_single_pool(pool));
2256
2257        let result = RedisTransactionRepository::new(connections, "".to_string());
2258        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
2259    }
2260
2261    #[tokio::test]
2262    #[ignore = "Requires active Redis instance"]
2263    async fn test_key_generation() {
2264        let repo = setup_test_repo().await;
2265
2266        assert!(repo
2267            .tx_key("relayer-1", "test-id")
2268            .contains(":relayer:relayer-1:tx:test-id"));
2269        assert!(repo
2270            .tx_to_relayer_key("test-id")
2271            .contains(":relayer:tx_to_relayer:test-id"));
2272        assert!(repo.relayer_list_key().contains(":relayer_list"));
2273        assert!(repo
2274            .relayer_status_key("relayer-1", &TransactionStatus::Pending)
2275            .contains(":relayer:relayer-1:status:Pending"));
2276        assert!(repo
2277            .relayer_nonce_key("relayer-1", 42)
2278            .contains(":relayer:relayer-1:nonce:42"));
2279    }
2280
2281    #[tokio::test]
2282    #[ignore = "Requires active Redis instance"]
2283    async fn test_serialize_deserialize_transaction() {
2284        let repo = setup_test_repo().await;
2285        let tx = create_test_transaction("test-1");
2286
2287        let serialized = repo
2288            .serialize_entity(&tx, |t| &t.id, "transaction")
2289            .expect("Serialization should succeed");
2290        let deserialized: TransactionRepoModel = repo
2291            .deserialize_entity(&serialized, "test-1", "transaction")
2292            .expect("Deserialization should succeed");
2293
2294        assert_eq!(tx.id, deserialized.id);
2295        assert_eq!(tx.relayer_id, deserialized.relayer_id);
2296        assert_eq!(tx.status, deserialized.status);
2297    }
2298
2299    #[tokio::test]
2300    #[ignore = "Requires active Redis instance"]
2301    async fn test_extract_nonce() {
2302        let repo = setup_test_repo().await;
2303        let random_id = Uuid::new_v4().to_string();
2304        let relayer_id = Uuid::new_v4().to_string();
2305        let tx_with_nonce = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
2306
2307        let nonce = repo.extract_nonce(&tx_with_nonce.network_data);
2308        assert_eq!(nonce, Some(42));
2309    }
2310
2311    #[tokio::test]
2312    #[ignore = "Requires active Redis instance"]
2313    async fn test_create_transaction() {
2314        let repo = setup_test_repo().await;
2315        let random_id = Uuid::new_v4().to_string();
2316        let tx = create_test_transaction(&random_id);
2317
2318        let result = repo.create(tx.clone()).await.unwrap();
2319        assert_eq!(result.id, tx.id);
2320    }
2321
2322    #[tokio::test]
2323    #[ignore = "Requires active Redis instance"]
2324    async fn test_get_transaction() {
2325        let repo = setup_test_repo().await;
2326        let random_id = Uuid::new_v4().to_string();
2327        let tx = create_test_transaction(&random_id);
2328
2329        repo.create(tx.clone()).await.unwrap();
2330        let stored = repo.get_by_id(random_id.to_string()).await.unwrap();
2331        assert_eq!(stored.id, tx.id);
2332        assert_eq!(stored.relayer_id, tx.relayer_id);
2333    }
2334
2335    #[tokio::test]
2336    #[ignore = "Requires active Redis instance"]
2337    async fn test_update_transaction() {
2338        let repo = setup_test_repo().await;
2339        let random_id = Uuid::new_v4().to_string();
2340        let mut tx = create_test_transaction(&random_id);
2341
2342        repo.create(tx.clone()).await.unwrap();
2343        tx.status = TransactionStatus::Confirmed;
2344
2345        let updated = repo.update(random_id.to_string(), tx).await.unwrap();
2346        assert!(matches!(updated.status, TransactionStatus::Confirmed));
2347    }
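
    // A sketch added for illustration (not part of the original suite): it exercises the
    // finalized-status guard in the partial_update Lua script, assuming, like the tests
    // above, that a Redis instance is reachable.
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_partial_update_keeps_final_status_sketch() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let tx = create_test_transaction(&random_id);

        repo.create(tx).await.unwrap();
        // Move the transaction to a final state first.
        repo.update_status(random_id.clone(), TransactionStatus::Confirmed)
            .await
            .unwrap();

        // A later status patch should be ignored by the Lua guard and the stored status kept.
        let update = TransactionUpdateRequest {
            status: Some(TransactionStatus::Failed),
            ..Default::default()
        };
        let updated = repo.partial_update(random_id.clone(), update).await.unwrap();
        assert!(matches!(updated.status, TransactionStatus::Confirmed));
    }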
2348
2349    #[tokio::test]
2350    #[ignore = "Requires active Redis instance"]
2351    async fn test_delete_transaction() {
2352        let repo = setup_test_repo().await;
2353        let random_id = Uuid::new_v4().to_string();
2354        let tx = create_test_transaction(&random_id);
2355
2356        repo.create(tx).await.unwrap();
2357        repo.delete_by_id(random_id.to_string()).await.unwrap();
2358
2359        let result = repo.get_by_id(random_id.to_string()).await;
2360        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
2361    }
2362
2363    #[tokio::test]
2364    #[ignore = "Requires active Redis instance"]
2365    async fn test_list_all_transactions() {
2366        let repo = setup_test_repo().await;
2367        let random_id = Uuid::new_v4().to_string();
2368        let random_id2 = Uuid::new_v4().to_string();
2369
2370        let tx1 = create_test_transaction(&random_id);
2371        let tx2 = create_test_transaction(&random_id2);
2372
2373        repo.create(tx1).await.unwrap();
2374        repo.create(tx2).await.unwrap();
2375
2376        let transactions = repo.list_all().await.unwrap();
2377        assert!(transactions.len() >= 2);
2378    }
2379
2380    #[tokio::test]
2381    #[ignore = "Requires active Redis instance"]
2382    async fn test_count_transactions() {
2383        let repo = setup_test_repo().await;
2384        let random_id = Uuid::new_v4().to_string();
2385        let tx = create_test_transaction(&random_id);
2386
2387        let count = repo.count().await.unwrap();
2388        repo.create(tx).await.unwrap();
2389        assert!(repo.count().await.unwrap() > count);
2390    }
2391
2392    #[tokio::test]
2393    #[ignore = "Requires active Redis instance"]
2394    async fn test_get_nonexistent_transaction() {
2395        let repo = setup_test_repo().await;
2396        let result = repo.get_by_id("nonexistent".to_string()).await;
2397        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
2398    }
2399
2400    #[tokio::test]
2401    #[ignore = "Requires active Redis instance"]
2402    async fn test_duplicate_transaction_creation() {
2403        let repo = setup_test_repo().await;
2404        let random_id = Uuid::new_v4().to_string();
2405
2406        let tx = create_test_transaction(&random_id);
2407
2408        repo.create(tx.clone()).await.unwrap();
2409        let result = repo.create(tx).await;
2410
2411        assert!(matches!(
2412            result,
2413            Err(RepositoryError::ConstraintViolation(_))
2414        ));
2415    }
2416
2417    #[tokio::test]
2418    #[ignore = "Requires active Redis instance"]
2419    async fn test_update_nonexistent_transaction() {
2420        let repo = setup_test_repo().await;
2421        let tx = create_test_transaction("test-1");
2422
2423        let result = repo.update("nonexistent".to_string(), tx).await;
2424        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
2425    }
2426
2427    #[tokio::test]
2428    #[ignore = "Requires active Redis instance"]
2429    async fn test_list_paginated() {
2430        let repo = setup_test_repo().await;
2431
2432        // Create multiple transactions
2433        for _ in 1..=10 {
2434            let random_id = Uuid::new_v4().to_string();
2435            let tx = create_test_transaction(&random_id);
2436            repo.create(tx).await.unwrap();
2437        }
2438
2439        // Test first page with 3 items per page
2440        let query = PaginationQuery {
2441            page: 1,
2442            per_page: 3,
2443        };
2444        let result = repo.list_paginated(query).await.unwrap();
2445        assert_eq!(result.items.len(), 3);
2446        assert!(result.total >= 10);
2447        assert_eq!(result.page, 1);
2448        assert_eq!(result.per_page, 3);
2449
2450        // Test empty page (beyond total items)
2451        let query = PaginationQuery {
2452            page: 1000,
2453            per_page: 3,
2454        };
2455        let result = repo.list_paginated(query).await.unwrap();
2456        assert_eq!(result.items.len(), 0);
2457    }
2458
2459    #[tokio::test]
2460    #[ignore = "Requires active Redis instance"]
2461    async fn test_find_by_relayer_id() {
2462        let repo = setup_test_repo().await;
2463        let random_id = Uuid::new_v4().to_string();
2464        let random_id2 = Uuid::new_v4().to_string();
2465        let random_id3 = Uuid::new_v4().to_string();
2466
2467        let tx1 = create_test_transaction_with_relayer(&random_id, "relayer-1");
2468        let tx2 = create_test_transaction_with_relayer(&random_id2, "relayer-1");
2469        let tx3 = create_test_transaction_with_relayer(&random_id3, "relayer-2");
2470
2471        repo.create(tx1).await.unwrap();
2472        repo.create(tx2).await.unwrap();
2473        repo.create(tx3).await.unwrap();
2474
2475        // Test finding transactions for relayer-1
2476        let query = PaginationQuery {
2477            page: 1,
2478            per_page: 10,
2479        };
2480        let result = repo
2481            .find_by_relayer_id("relayer-1", query.clone())
2482            .await
2483            .unwrap();
2484        assert!(result.total >= 2);
2485        assert!(result.items.len() >= 2);
2486        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-1"));
2487
2488        // Test finding transactions for relayer-2
2489        let result = repo
2490            .find_by_relayer_id("relayer-2", query.clone())
2491            .await
2492            .unwrap();
2493        assert!(result.total >= 1);
2494        assert!(!result.items.is_empty());
2495        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-2"));
2496
2497        // Test finding transactions for non-existent relayer
2498        let result = repo
2499            .find_by_relayer_id("non-existent", query.clone())
2500            .await
2501            .unwrap();
2502        assert_eq!(result.total, 0);
2503        assert_eq!(result.items.len(), 0);
2504    }
2505
2506    #[tokio::test]
2507    #[ignore = "Requires active Redis instance"]
2508    async fn test_find_by_relayer_id_sorted_by_created_at_newest_first() {
2509        let repo = setup_test_repo().await;
2510        let relayer_id = Uuid::new_v4().to_string();
2511
2512        // Create transactions with different created_at timestamps
2513        let mut tx1 = create_test_transaction_with_relayer("test-1", &relayer_id);
2514        tx1.created_at = "2025-01-27T10:00:00.000000+00:00".to_string(); // Oldest
2515
2516        let mut tx2 = create_test_transaction_with_relayer("test-2", &relayer_id);
2517        tx2.created_at = "2025-01-27T12:00:00.000000+00:00".to_string(); // Middle
2518
2519        let mut tx3 = create_test_transaction_with_relayer("test-3", &relayer_id);
2520        tx3.created_at = "2025-01-27T14:00:00.000000+00:00".to_string(); // Newest
2521
2522        // Create transactions in non-chronological order to ensure sorting works
2523        repo.create(tx2.clone()).await.unwrap(); // Middle first
2524        repo.create(tx1.clone()).await.unwrap(); // Oldest second
2525        repo.create(tx3.clone()).await.unwrap(); // Newest last
2526
2527        let query = PaginationQuery {
2528            page: 1,
2529            per_page: 10,
2530        };
2531        let result = repo.find_by_relayer_id(&relayer_id, query).await.unwrap();
2532
2533        assert_eq!(result.total, 3);
2534        assert_eq!(result.items.len(), 3);
2535
2536        // Verify transactions are sorted by created_at descending (newest first)
2537        assert_eq!(
2538            result.items[0].id, "test-3",
2539            "First item should be newest (test-3)"
2540        );
2541        assert_eq!(
2542            result.items[0].created_at,
2543            "2025-01-27T14:00:00.000000+00:00"
2544        );
2545
2546        assert_eq!(
2547            result.items[1].id, "test-2",
2548            "Second item should be middle (test-2)"
2549        );
2550        assert_eq!(
2551            result.items[1].created_at,
2552            "2025-01-27T12:00:00.000000+00:00"
2553        );
2554
2555        assert_eq!(
2556            result.items[2].id, "test-1",
2557            "Third item should be oldest (test-1)"
2558        );
2559        assert_eq!(
2560            result.items[2].created_at,
2561            "2025-01-27T10:00:00.000000+00:00"
2562        );
2563    }
2564
2565    #[tokio::test]
2566    #[ignore = "Requires active Redis instance"]
2567    async fn test_find_by_status() {
2568        let repo = setup_test_repo().await;
2569        let random_id = Uuid::new_v4().to_string();
2570        let random_id2 = Uuid::new_v4().to_string();
2571        let random_id3 = Uuid::new_v4().to_string();
2572        let relayer_id = Uuid::new_v4().to_string();
2573        let tx1 = create_test_transaction_with_status(
2574            &random_id,
2575            &relayer_id,
2576            TransactionStatus::Pending,
2577        );
2578        let tx2 =
2579            create_test_transaction_with_status(&random_id2, &relayer_id, TransactionStatus::Sent);
2580        let tx3 = create_test_transaction_with_status(
2581            &random_id3,
2582            &relayer_id,
2583            TransactionStatus::Confirmed,
2584        );
2585
2586        repo.create(tx1).await.unwrap();
2587        repo.create(tx2).await.unwrap();
2588        repo.create(tx3).await.unwrap();
2589
2590        // Test finding pending transactions
2591        let result = repo
2592            .find_by_status(&relayer_id, &[TransactionStatus::Pending])
2593            .await
2594            .unwrap();
2595        assert_eq!(result.len(), 1);
2596        assert_eq!(result[0].status, TransactionStatus::Pending);
2597
2598        // Test finding multiple statuses
2599        let result = repo
2600            .find_by_status(
2601                &relayer_id,
2602                &[TransactionStatus::Pending, TransactionStatus::Sent],
2603            )
2604            .await
2605            .unwrap();
2606        assert_eq!(result.len(), 2);
2607
2608        // Test finding non-existent status
2609        let result = repo
2610            .find_by_status(&relayer_id, &[TransactionStatus::Failed])
2611            .await
2612            .unwrap();
2613        assert_eq!(result.len(), 0);
2614    }
2615
2616    #[tokio::test]
2617    #[ignore = "Requires active Redis instance"]
2618    async fn test_find_by_status_paginated() {
2619        let repo = setup_test_repo().await;
2620        let relayer_id = Uuid::new_v4().to_string();
2621
2622        // Create 5 pending transactions with different timestamps
2623        for i in 1..=5 {
2624            let tx_id = Uuid::new_v4().to_string();
2625            let mut tx = create_test_transaction_with_status(
2626                &tx_id,
2627                &relayer_id,
2628                TransactionStatus::Pending,
2629            );
2630            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
2631            repo.create(tx).await.unwrap();
2632        }
2633
2634        // Create 2 confirmed transactions
2635        for i in 6..=7 {
2636            let tx_id = Uuid::new_v4().to_string();
2637            let mut tx = create_test_transaction_with_status(
2638                &tx_id,
2639                &relayer_id,
2640                TransactionStatus::Confirmed,
2641            );
2642            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
2643            repo.create(tx).await.unwrap();
2644        }
2645
2646        // Test first page (2 items per page)
2647        let query = PaginationQuery {
2648            page: 1,
2649            per_page: 2,
2650        };
2651        let result = repo
2652            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2653            .await
2654            .unwrap();
2655
2656        assert_eq!(result.total, 5);
2657        assert_eq!(result.items.len(), 2);
2658        assert_eq!(result.page, 1);
2659        assert_eq!(result.per_page, 2);
2660
2661        // Test second page
2662        let query = PaginationQuery {
2663            page: 2,
2664            per_page: 2,
2665        };
2666        let result = repo
2667            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2668            .await
2669            .unwrap();
2670
2671        assert_eq!(result.total, 5);
2672        assert_eq!(result.items.len(), 2);
2673        assert_eq!(result.page, 2);
2674
2675        // Test last page (partial)
2676        let query = PaginationQuery {
2677            page: 3,
2678            per_page: 2,
2679        };
2680        let result = repo
2681            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2682            .await
2683            .unwrap();
2684
2685        assert_eq!(result.total, 5);
2686        assert_eq!(result.items.len(), 1);
2687
2688        // Test multiple statuses
2689        let query = PaginationQuery {
2690            page: 1,
2691            per_page: 10,
2692        };
2693        let result = repo
2694            .find_by_status_paginated(
2695                &relayer_id,
2696                &[TransactionStatus::Pending, TransactionStatus::Confirmed],
2697                query,
2698                false,
2699            )
2700            .await
2701            .unwrap();
2702
2703        assert_eq!(result.total, 7);
2704        assert_eq!(result.items.len(), 7);
2705
2706        // Test empty result
2707        let query = PaginationQuery {
2708            page: 1,
2709            per_page: 10,
2710        };
2711        let result = repo
2712            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Failed], query, false)
2713            .await
2714            .unwrap();
2715
2716        assert_eq!(result.total, 0);
2717        assert_eq!(result.items.len(), 0);
2718    }
2719
2720    #[tokio::test]
2721    #[ignore = "Requires active Redis instance"]
2722    async fn test_find_by_status_paginated_oldest_first() {
2723        let repo = setup_test_repo().await;
2724        let relayer_id = Uuid::new_v4().to_string();
2725
2726        // Create 5 pending transactions with ascending timestamps
2727        for i in 1..=5 {
2728            let tx_id = format!("tx{}-{}", i, Uuid::new_v4());
2729            let mut tx = create_test_transaction(&tx_id);
2730            tx.relayer_id = relayer_id.clone();
2731            tx.status = TransactionStatus::Pending;
2732            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
2733            repo.create(tx).await.unwrap();
2734        }
2735
2736        // Test oldest_first: true - should return oldest transactions first
2737        let query = PaginationQuery {
2738            page: 1,
2739            per_page: 3,
2740        };
2741        let result = repo
2742            .find_by_status_paginated(
2743                &relayer_id,
2744                &[TransactionStatus::Pending],
2745                query.clone(),
2746                true,
2747            )
2748            .await
2749            .unwrap();
2750
2751        assert_eq!(result.total, 5);
2752        assert_eq!(result.items.len(), 3);
2753        // Verify ordering: oldest first (11:00, 12:00, 13:00)
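        // (Plain string comparison works here because the created_at values are
        // fixed-width RFC3339 timestamps with the same UTC offset, so lexicographic
        // order equals chronological order.)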
2754        assert!(
2755            result.items[0].created_at < result.items[1].created_at,
2756            "First item should be older than second"
2757        );
2758        assert!(
2759            result.items[1].created_at < result.items[2].created_at,
2760            "Second item should be older than third"
2761        );
2762
2763        // Contrast with oldest_first: false - should return newest first
2764        let result_newest = repo
2765            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2766            .await
2767            .unwrap();
2768
2769        assert_eq!(result_newest.items.len(), 3);
2770        // Verify ordering: newest first (15:00, 14:00, 13:00)
2771        assert!(
2772            result_newest.items[0].created_at > result_newest.items[1].created_at,
2773            "First item should be newer than second"
2774        );
2775        assert!(
2776            result_newest.items[1].created_at > result_newest.items[2].created_at,
2777            "Second item should be newer than third"
2778        );
2779    }
2780
2781    #[tokio::test]
2782    #[ignore = "Requires active Redis instance"]
2783    async fn test_find_by_status_paginated_oldest_first_single_item() {
2784        let repo = setup_test_repo().await;
2785        let relayer_id = Uuid::new_v4().to_string();
2786
2787        // Create transactions with specific timestamps
2788        let timestamps = [
2789            "2025-01-27T08:00:00.000000+00:00", // oldest
2790            "2025-01-27T10:00:00.000000+00:00", // middle
2791            "2025-01-27T12:00:00.000000+00:00", // newest
2792        ];
2793
2794        let mut oldest_id = String::new();
2795        let mut newest_id = String::new();
2796
2797        for (i, timestamp) in timestamps.iter().enumerate() {
2798            let tx_id = format!("tx-{}-{}", i, Uuid::new_v4());
2799            if i == 0 {
2800                oldest_id = tx_id.clone();
2801            }
2802            if i == 2 {
2803                newest_id = tx_id.clone();
2804            }
2805            let mut tx = create_test_transaction(&tx_id);
2806            tx.relayer_id = relayer_id.clone();
2807            tx.status = TransactionStatus::Pending;
2808            tx.created_at = timestamp.to_string();
2809            repo.create(tx).await.unwrap();
2810        }
2811
2812        // Request just 1 item with oldest_first: true
2813        let query = PaginationQuery {
2814            page: 1,
2815            per_page: 1,
2816        };
2817        let result = repo
2818            .find_by_status_paginated(
2819                &relayer_id,
2820                &[TransactionStatus::Pending],
2821                query.clone(),
2822                true,
2823            )
2824            .await
2825            .unwrap();
2826
2827        assert_eq!(result.total, 3);
2828        assert_eq!(result.items.len(), 1);
2829        assert_eq!(
2830            result.items[0].id, oldest_id,
2831            "With oldest_first=true and per_page=1, should return the oldest transaction"
2832        );
2833
2834        // Contrast with oldest_first: false
2835        let result = repo
2836            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2837            .await
2838            .unwrap();
2839
2840        assert_eq!(result.items.len(), 1);
2841        assert_eq!(
2842            result.items[0].id, newest_id,
2843            "With oldest_first=false and per_page=1, should return the newest transaction"
2844        );
2845    }
2846
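    // Note for the test below: nonce lookups are scoped to the relayer, so the same
    // nonce queried under a different relayer id (third assertion) does not match.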
2847    #[tokio::test]
2848    #[ignore = "Requires active Redis instance"]
2849    async fn test_find_by_nonce() {
2850        let repo = setup_test_repo().await;
2851        let random_id = Uuid::new_v4().to_string();
2852        let random_id2 = Uuid::new_v4().to_string();
2853        let relayer_id = Uuid::new_v4().to_string();
2854
2855        let tx1 = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
2856        let tx2 = create_test_transaction_with_nonce(&random_id2, 43, &relayer_id);
2857
2858        repo.create(tx1.clone()).await.unwrap();
2859        repo.create(tx2).await.unwrap();
2860
2861        // Test finding existing nonce
2862        let result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
2863        assert!(result.is_some());
2864        assert_eq!(result.unwrap().id, random_id);
2865
2866        // Test finding non-existent nonce
2867        let result = repo.find_by_nonce(&relayer_id, 99).await.unwrap();
2868        assert!(result.is_none());
2869
2870        // Test finding nonce for non-existent relayer
2871        let result = repo.find_by_nonce("non-existent", 42).await.unwrap();
2872        assert!(result.is_none());
2873    }
2874
2875    #[tokio::test]
2876    #[ignore = "Requires active Redis instance"]
2877    async fn test_update_status() {
2878        let repo = setup_test_repo().await;
2879        let random_id = Uuid::new_v4().to_string();
2880        let tx = create_test_transaction(&random_id);
2881
2882        repo.create(tx).await.unwrap();
2883        let updated = repo
2884            .update_status(random_id.to_string(), TransactionStatus::Confirmed)
2885            .await
2886            .unwrap();
2887        assert_eq!(updated.status, TransactionStatus::Confirmed);
2888    }
2889
2890    #[tokio::test]
2891    #[ignore = "Requires active Redis instance"]
2892    async fn test_partial_update() {
2893        let repo = setup_test_repo().await;
2894        let random_id = Uuid::new_v4().to_string();
2895        let tx = create_test_transaction(&random_id);
2896
2897        repo.create(tx).await.unwrap();
2898
2899        let update = TransactionUpdateRequest {
2900            status: Some(TransactionStatus::Sent),
2901            status_reason: Some("Transaction sent".to_string()),
2902            sent_at: Some("2025-01-27T16:00:00.000000+00:00".to_string()),
2903            confirmed_at: None,
2904            network_data: None,
2905            hashes: None,
2906            is_canceled: None,
2907            priced_at: None,
2908            noop_count: None,
2909            delete_at: None,
2910            metadata: None,
2911        };
2912
2913        let updated = repo
2914            .partial_update(random_id.to_string(), update)
2915            .await
2916            .unwrap();
2917        assert_eq!(updated.status, TransactionStatus::Sent);
2918        assert_eq!(updated.status_reason, Some("Transaction sent".to_string()));
2919        assert_eq!(
2920            updated.sent_at,
2921            Some("2025-01-27T16:00:00.000000+00:00".to_string())
2922        );
2923    }
2924
2925    #[tokio::test]
2926    #[ignore = "Requires active Redis instance"]
2927    async fn test_set_sent_at() {
2928        let repo = setup_test_repo().await;
2929        let random_id = Uuid::new_v4().to_string();
2930        let tx = create_test_transaction(&random_id);
2931
2932        repo.create(tx).await.unwrap();
2933        let updated = repo
2934            .set_sent_at(
2935                random_id.to_string(),
2936                "2025-01-27T16:00:00.000000+00:00".to_string(),
2937            )
2938            .await
2939            .unwrap();
2940        assert_eq!(
2941            updated.sent_at,
2942            Some("2025-01-27T16:00:00.000000+00:00".to_string())
2943        );
2944    }
2945
2946    #[tokio::test]
2947    #[ignore = "Requires active Redis instance"]
2948    async fn test_set_confirmed_at() {
2949        let repo = setup_test_repo().await;
2950        let random_id = Uuid::new_v4().to_string();
2951        let tx = create_test_transaction(&random_id);
2952
2953        repo.create(tx).await.unwrap();
2954        let updated = repo
2955            .set_confirmed_at(
2956                random_id.to_string(),
2957                "2025-01-27T16:00:00.000000+00:00".to_string(),
2958            )
2959            .await
2960            .unwrap();
2961        assert_eq!(
2962            updated.confirmed_at,
2963            Some("2025-01-27T16:00:00.000000+00:00".to_string())
2964        );
2965    }
2966
2967    #[tokio::test]
2968    #[ignore = "Requires active Redis instance"]
2969    async fn test_update_network_data() {
2970        let repo = setup_test_repo().await;
2971        let random_id = Uuid::new_v4().to_string();
2972        let tx = create_test_transaction(&random_id);
2973
2974        repo.create(tx).await.unwrap();
2975
2976        let new_network_data = NetworkTransactionData::Evm(EvmTransactionData {
2977            gas_price: Some(2000000000),
2978            gas_limit: Some(42000),
2979            nonce: Some(2),
2980            value: U256::from_str("2000000000000000000").unwrap(),
2981            data: Some("0x1234".to_string()),
2982            from: "0xNewSender".to_string(),
2983            to: Some("0xNewRecipient".to_string()),
2984            chain_id: 1,
2985            signature: None,
2986            hash: Some("0xnewhash".to_string()),
2987            speed: Some(Speed::SafeLow),
2988            max_fee_per_gas: None,
2989            max_priority_fee_per_gas: None,
2990            raw: None,
2991        });
2992
2993        let updated = repo
2994            .update_network_data(random_id.to_string(), new_network_data.clone())
2995            .await
2996            .unwrap();
2997        assert_eq!(
2998            updated
2999                .network_data
3000                .get_evm_transaction_data()
3001                .unwrap()
3002                .hash,
3003            new_network_data.get_evm_transaction_data().unwrap().hash
3004        );
3005    }
3006
3007    #[tokio::test]
3008    #[ignore = "Requires active Redis instance"]
3009    async fn test_debug_implementation() {
3010        let repo = setup_test_repo().await;
3011        let debug_str = format!("{repo:?}");
3012        assert!(debug_str.contains("RedisTransactionRepository"));
3013        assert!(debug_str.contains("test_prefix"));
3014    }
3015
3016    #[tokio::test]
3017    #[ignore = "Requires active Redis instance"]
3018    async fn test_error_handling_empty_id() {
3019        let repo = setup_test_repo().await;
3020
3021        let result = repo.get_by_id("".to_string()).await;
3022        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
3023
3024        let result = repo
3025            .update("".to_string(), create_test_transaction("test"))
3026            .await;
3027        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
3028
3029        let result = repo.delete_by_id("".to_string()).await;
3030        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
3031    }
3032
3033    #[tokio::test]
3034    #[ignore = "Requires active Redis instance"]
3035    async fn test_pagination_validation() {
3036        let repo = setup_test_repo().await;
3037
3038        let query = PaginationQuery {
3039            page: 1,
3040            per_page: 0,
3041        };
3042        let result = repo.list_paginated(query).await;
3043        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
3044    }
3045
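    // The next test guards against stale nonce index entries: after a transaction's
    // nonce changes, looking up the old nonce must return nothing, otherwise callers
    // could resolve a transaction whose data no longer carries that nonce.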
3046    #[tokio::test]
3047    #[ignore = "Requires active Redis instance"]
3048    async fn test_index_consistency() {
3049        let repo = setup_test_repo().await;
3050        let random_id = Uuid::new_v4().to_string();
3051        let relayer_id = Uuid::new_v4().to_string();
3052        let tx = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
3053
3054        // Create transaction
3055        repo.create(tx.clone()).await.unwrap();
3056
3057        // Verify it can be found by nonce
3058        let found = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
3059        assert!(found.is_some());
3060
3061        // Update the transaction with a new nonce
3062        let mut updated_tx = tx.clone();
3063        if let NetworkTransactionData::Evm(ref mut evm_data) = updated_tx.network_data {
3064            evm_data.nonce = Some(43);
3065        }
3066
3067        repo.update(random_id.to_string(), updated_tx)
3068            .await
3069            .unwrap();
3070
3071        // Verify old nonce index is cleaned up
3072        let old_nonce_result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
3073        assert!(old_nonce_result.is_none());
3074
3075        // Verify new nonce index works
3076        let new_nonce_result = repo.find_by_nonce(&relayer_id, 43).await.unwrap();
3077        assert!(new_nonce_result.is_some());
3078    }
3079
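    // The initial `!has_entries()` assertion below assumes the repository's key space
    // is empty when the test starts (e.g. a per-run key prefix or a flushed test
    // database); otherwise this check could flake.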
3080    #[tokio::test]
3081    #[ignore = "Requires active Redis instance"]
3082    async fn test_has_entries() {
3083        let repo = setup_test_repo().await;
3084        assert!(!repo.has_entries().await.unwrap());
3085
3086        let tx_id = uuid::Uuid::new_v4().to_string();
3087        let tx = create_test_transaction(&tx_id);
3088        repo.create(tx.clone()).await.unwrap();
3089
3090        assert!(repo.has_entries().await.unwrap());
3091    }
3092
3093    #[tokio::test]
3094    #[ignore = "Requires active Redis instance"]
3095    async fn test_drop_all_entries() {
3096        let repo = setup_test_repo().await;
3097        let tx_id = uuid::Uuid::new_v4().to_string();
3098        let tx = create_test_transaction(&tx_id);
3099        repo.create(tx.clone()).await.unwrap();
3100        assert!(repo.has_entries().await.unwrap());
3101
3102        repo.drop_all_entries().await.unwrap();
3103        assert!(!repo.has_entries().await.unwrap());
3104    }
3105
3106    // Tests for delete_at field setting on final status updates
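    // These tests pin down the expiration contract as observed through the public API:
    // moving a transaction into a final status should stamp `delete_at` roughly
    // TRANSACTION_EXPIRATION_HOURS in the future (the asserts allow a ±5 minute
    // tolerance, e.g. with "6" hours the stamp must land in [now + 5h55m, now + 6h05m]),
    // while non-final statuses and already-set values are left alone. They serialize on
    // ENV_MUTEX because std::env::set_var mutates process-global state.
    //
    // A rough sketch of the computation the tests imply (not the repository's actual
    // code; the fallback value is hypothetical):
    //
    //     let hours: i64 = std::env::var("TRANSACTION_EXPIRATION_HOURS")
    //         .ok()
    //         .and_then(|v| v.parse().ok())
    //         .unwrap_or(/* hypothetical default */ 4);
    //     let delete_at = (Utc::now() + Duration::hours(hours)).to_rfc3339();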
3107    #[tokio::test]
3108    #[ignore = "Requires active Redis instance"]
3109    async fn test_update_status_sets_delete_at_for_final_statuses() {
3110        let _lock = ENV_MUTEX.lock().await;
3111
3112        use chrono::{DateTime, Duration, Utc};
3113        use std::env;
3114
3115        // TRANSACTION_EXPIRATION_HOURS is process-global, so ENV_MUTEX (held above) serializes tests that set it
3116        env::set_var("TRANSACTION_EXPIRATION_HOURS", "6");
3117
3118        let repo = setup_test_repo().await;
3119
3120        let final_statuses = [
3121            TransactionStatus::Canceled,
3122            TransactionStatus::Confirmed,
3123            TransactionStatus::Failed,
3124            TransactionStatus::Expired,
3125        ];
3126
3127        for (i, status) in final_statuses.iter().enumerate() {
3128            let tx_id = format!("test-final-{}-{}", i, Uuid::new_v4());
3129            let mut tx = create_test_transaction(&tx_id);
3130
3131            // Ensure transaction has no delete_at initially and is in pending state
3132            tx.delete_at = None;
3133            tx.status = TransactionStatus::Pending;
3134
3135            repo.create(tx).await.unwrap();
3136
3137            let before_update = Utc::now();
3138
3139            // Update to final status
3140            let updated = repo
3141                .update_status(tx_id.clone(), status.clone())
3142                .await
3143                .unwrap();
3144
3145            // Should have delete_at set
3146            assert!(
3147                updated.delete_at.is_some(),
3148                "delete_at should be set for status: {status:?}"
3149            );
3150
3151            // Verify the timestamp is reasonable (approximately 6 hours from now)
3152            let delete_at_str = updated.delete_at.unwrap();
3153            let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
3154                .expect("delete_at should be valid RFC3339")
3155                .with_timezone(&Utc);
3156
3157            let duration_from_before = delete_at.signed_duration_since(before_update);
3158            let expected_duration = Duration::hours(6);
3159            let tolerance = Duration::minutes(5);
3160
3161            assert!(
3162                duration_from_before >= expected_duration - tolerance
3163                    && duration_from_before <= expected_duration + tolerance,
3164                "delete_at should be approximately 6 hours from now for status: {status:?}. Duration: {duration_from_before:?}"
3165            );
3166        }
3167
3168        // Cleanup
3169        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3170    }
3171
3172    #[tokio::test]
3173    #[ignore = "Requires active Redis instance"]
3174    async fn test_update_status_does_not_set_delete_at_for_non_final_statuses() {
3175        let _lock = ENV_MUTEX.lock().await;
3176
3177        use std::env;
3178
3179        env::set_var("TRANSACTION_EXPIRATION_HOURS", "4");
3180
3181        let repo = setup_test_repo().await;
3182
3183        let non_final_statuses = [
3184            TransactionStatus::Pending,
3185            TransactionStatus::Sent,
3186            TransactionStatus::Submitted,
3187            TransactionStatus::Mined,
3188        ];
3189
3190        for (i, status) in non_final_statuses.iter().enumerate() {
3191            let tx_id = format!("test-non-final-{}-{}", i, Uuid::new_v4());
3192            let mut tx = create_test_transaction(&tx_id);
3193            tx.delete_at = None;
3194            tx.status = TransactionStatus::Pending;
3195
3196            repo.create(tx).await.unwrap();
3197
3198            // Update to non-final status
3199            let updated = repo
3200                .update_status(tx_id.clone(), status.clone())
3201                .await
3202                .unwrap();
3203
3204            // Should NOT have delete_at set
3205            assert!(
3206                updated.delete_at.is_none(),
3207                "delete_at should NOT be set for status: {status:?}"
3208            );
3209        }
3210
3211        // Cleanup
3212        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3213    }
3214
3215    #[tokio::test]
3216    #[ignore = "Requires active Redis instance"]
3217    async fn test_partial_update_sets_delete_at_for_final_statuses() {
3218        let _lock = ENV_MUTEX.lock().await;
3219
3220        use chrono::{DateTime, Duration, Utc};
3221        use std::env;
3222
3223        env::set_var("TRANSACTION_EXPIRATION_HOURS", "8");
3224
3225        let repo = setup_test_repo().await;
3226        let tx_id = format!("test-partial-final-{}", Uuid::new_v4());
3227        let mut tx = create_test_transaction(&tx_id);
3228        tx.delete_at = None;
3229        tx.status = TransactionStatus::Pending;
3230
3231        repo.create(tx).await.unwrap();
3232
3233        let before_update = Utc::now();
3234
3235        // Use partial_update to set status to Confirmed (final status)
3236        let update = TransactionUpdateRequest {
3237            status: Some(TransactionStatus::Confirmed),
3238            status_reason: Some("Transaction completed".to_string()),
3239            confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
3240            ..Default::default()
3241        };
3242
3243        let updated = repo.partial_update(tx_id.clone(), update).await.unwrap();
3244
3245        // Should have delete_at set
3246        assert!(
3247            updated.delete_at.is_some(),
3248            "delete_at should be set when updating to Confirmed status"
3249        );
3250
3251        // Verify the timestamp is reasonable (approximately 8 hours from now)
3252        let delete_at_str = updated.delete_at.unwrap();
3253        let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
3254            .expect("delete_at should be valid RFC3339")
3255            .with_timezone(&Utc);
3256
3257        let duration_from_before = delete_at.signed_duration_since(before_update);
3258        let expected_duration = Duration::hours(8);
3259        let tolerance = Duration::minutes(5);
3260
3261        assert!(
3262            duration_from_before >= expected_duration - tolerance
3263                && duration_from_before <= expected_duration + tolerance,
3264            "delete_at should be approximately 8 hours from now. Duration: {duration_from_before:?}"
3265        );
3266
3267        // Also verify other fields were updated
3268        assert_eq!(updated.status, TransactionStatus::Confirmed);
3269        assert_eq!(
3270            updated.status_reason,
3271            Some("Transaction completed".to_string())
3272        );
3273        assert_eq!(
3274            updated.confirmed_at,
3275            Some("2023-01-01T12:05:00Z".to_string())
3276        );
3277
3278        // Cleanup
3279        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3280    }
3281
3282    #[tokio::test]
3283    #[ignore = "Requires active Redis instance"]
3284    async fn test_update_status_preserves_existing_delete_at() {
3285        let _lock = ENV_MUTEX.lock().await;
3286
3287        use std::env;
3288
3289        env::set_var("TRANSACTION_EXPIRATION_HOURS", "2");
3290
3291        let repo = setup_test_repo().await;
3292        let tx_id = format!("test-preserve-delete-at-{}", Uuid::new_v4());
3293        let mut tx = create_test_transaction(&tx_id);
3294
3295        // Set an existing delete_at value
3296        let existing_delete_at = "2025-01-01T12:00:00Z".to_string();
3297        tx.delete_at = Some(existing_delete_at.clone());
3298        tx.status = TransactionStatus::Pending;
3299
3300        repo.create(tx).await.unwrap();
3301
3302        // Update to final status
3303        let updated = repo
3304            .update_status(tx_id.clone(), TransactionStatus::Confirmed)
3305            .await
3306            .unwrap();
3307
3308        // Should preserve the existing delete_at value
3309        assert_eq!(
3310            updated.delete_at,
3311            Some(existing_delete_at),
3312            "Existing delete_at should be preserved when updating to final status"
3313        );
3314
3315        // Cleanup
3316        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3317    }

3318    #[tokio::test]
3319    #[ignore = "Requires active Redis instance"]
3320    async fn test_partial_update_without_status_change_preserves_delete_at() {
3321        let _lock = ENV_MUTEX.lock().await;
3322
3323        use std::env;
3324
3325        env::set_var("TRANSACTION_EXPIRATION_HOURS", "3");
3326
3327        let repo = setup_test_repo().await;
3328        let tx_id = format!("test-preserve-no-status-{}", Uuid::new_v4());
3329        let mut tx = create_test_transaction(&tx_id);
3330        tx.delete_at = None;
3331        tx.status = TransactionStatus::Pending;
3332
3333        repo.create(tx).await.unwrap();
3334
3335        // First, update to final status to set delete_at
3336        let updated1 = repo
3337            .update_status(tx_id.clone(), TransactionStatus::Confirmed)
3338            .await
3339            .unwrap();
3340
3341        assert!(updated1.delete_at.is_some());
3342        let original_delete_at = updated1.delete_at.clone();
3343
3344        // Now update other fields without changing status
3345        let update = TransactionUpdateRequest {
3346            status: None, // No status change
3347            status_reason: Some("Updated reason".to_string()),
3348            confirmed_at: Some("2023-01-01T12:10:00Z".to_string()),
3349            ..Default::default()
3350        };
3351
3352        let updated2 = repo.partial_update(tx_id.clone(), update).await.unwrap();
3353
3354        // delete_at should be preserved
3355        assert_eq!(
3356            updated2.delete_at, original_delete_at,
3357            "delete_at should be preserved when status is not updated"
3358        );
3359
3360        // Other fields should be updated
3361        assert_eq!(updated2.status, TransactionStatus::Confirmed); // Unchanged
3362        assert_eq!(updated2.status_reason, Some("Updated reason".to_string()));
3363        assert_eq!(
3364            updated2.confirmed_at,
3365            Some("2023-01-01T12:10:00Z".to_string())
3366        );
3367
3368        // Cleanup
3369        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3370    }
3371
3372    // Tests for delete_by_ids batch delete functionality
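    //
    // Taken together, these tests fix the batch-delete contract as observed here:
    // `deleted_count` counts transactions that were actually removed, `failed` lists
    // the ids that could not be deleted (paired with a reason), an empty input is a
    // successful no-op, and deletion also clears the status and nonce lookups so the
    // transaction can no longer be found through them.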
3373
3374    #[tokio::test]
3375    #[ignore = "Requires active Redis instance"]
3376    async fn test_delete_by_ids_empty_list() {
3377        let repo = setup_test_repo().await;
3378        let tx_id = format!("test-empty-{}", Uuid::new_v4());
3379
3380        // Create a transaction to ensure repo is not empty
3381        let tx = create_test_transaction(&tx_id);
3382        repo.create(tx).await.unwrap();
3383
3384        // Delete with empty list should succeed and not affect existing data
3385        let result = repo.delete_by_ids(vec![]).await.unwrap();
3386
3387        assert_eq!(result.deleted_count, 0);
3388        assert!(result.failed.is_empty());
3389
3390        // Original transaction should still exist
3391        assert!(repo.get_by_id(tx_id).await.is_ok());
3392    }
3393
3394    #[tokio::test]
3395    #[ignore = "Requires active Redis instance"]
3396    async fn test_delete_by_ids_single_transaction() {
3397        let repo = setup_test_repo().await;
3398        let tx_id = format!("test-single-{}", Uuid::new_v4());
3399
3400        let tx = create_test_transaction(&tx_id);
3401        repo.create(tx).await.unwrap();
3402
3403        let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
3404
3405        assert_eq!(result.deleted_count, 1);
3406        assert!(result.failed.is_empty());
3407
3408        // Verify transaction was deleted
3409        assert!(repo.get_by_id(tx_id).await.is_err());
3410    }
3411
3412    #[tokio::test]
3413    #[ignore = "Requires active Redis instance"]
3414    async fn test_delete_by_ids_multiple_transactions() {
3415        let repo = setup_test_repo().await;
3416        let base_id = Uuid::new_v4();
3417
3418        // Create multiple transactions
3419        let mut created_ids = Vec::new();
3420        for i in 1..=5 {
3421            let tx_id = format!("test-multi-{base_id}-{i}");
3422            let tx = create_test_transaction(&tx_id);
3423            repo.create(tx).await.unwrap();
3424            created_ids.push(tx_id);
3425        }
3426
3427        // Delete 3 of them
3428        let ids_to_delete = vec![
3429            created_ids[0].clone(),
3430            created_ids[2].clone(),
3431            created_ids[4].clone(),
3432        ];
3433        let result = repo.delete_by_ids(ids_to_delete).await.unwrap();
3434
3435        assert_eq!(result.deleted_count, 3);
3436        assert!(result.failed.is_empty());
3437
3438        // Verify correct transactions were deleted
3439        assert!(repo.get_by_id(created_ids[0].clone()).await.is_err());
3440        assert!(repo.get_by_id(created_ids[1].clone()).await.is_ok()); // Not deleted
3441        assert!(repo.get_by_id(created_ids[2].clone()).await.is_err());
3442        assert!(repo.get_by_id(created_ids[3].clone()).await.is_ok()); // Not deleted
3443        assert!(repo.get_by_id(created_ids[4].clone()).await.is_err());
3444    }
3445
3446    #[tokio::test]
3447    #[ignore = "Requires active Redis instance"]
3448    async fn test_delete_by_ids_nonexistent_transactions() {
3449        let repo = setup_test_repo().await;
3450        let base_id = Uuid::new_v4();
3451
3452        // Try to delete transactions that don't exist
3453        let ids_to_delete = vec![
3454            format!("nonexistent-{}-1", base_id),
3455            format!("nonexistent-{}-2", base_id),
3456        ];
3457        let result = repo.delete_by_ids(ids_to_delete.clone()).await.unwrap();
3458
3459        assert_eq!(result.deleted_count, 0);
3460        assert_eq!(result.failed.len(), 2);
3461
3462        // Verify the failed entries reference the missing IDs
3463        let failed_ids: Vec<&String> = result.failed.iter().map(|(id, _)| id).collect();
3464        assert!(failed_ids.contains(&&ids_to_delete[0]));
3465        assert!(failed_ids.contains(&&ids_to_delete[1]));
3466    }
3467
3468    #[tokio::test]
3469    #[ignore = "Requires active Redis instance"]
3470    async fn test_delete_by_ids_mixed_existing_and_nonexistent() {
3471        let repo = setup_test_repo().await;
3472        let base_id = Uuid::new_v4();
3473
3474        // Create some transactions
3475        let existing_ids: Vec<String> = (1..=3)
3476            .map(|i| format!("test-mixed-existing-{base_id}-{i}"))
3477            .collect();
3478
3479        for id in &existing_ids {
3480            let tx = create_test_transaction(id);
3481            repo.create(tx).await.unwrap();
3482        }
3483
3484        let nonexistent_ids: Vec<String> = (1..=2)
3485            .map(|i| format!("test-mixed-nonexistent-{base_id}-{i}"))
3486            .collect();
3487
3488        // Try to delete mix of existing and non-existing
3489        let ids_to_delete = vec![
3490            existing_ids[0].clone(),
3491            nonexistent_ids[0].clone(),
3492            existing_ids[1].clone(),
3493            nonexistent_ids[1].clone(),
3494        ];
3495        let result = repo.delete_by_ids(ids_to_delete).await.unwrap();
3496
3497        assert_eq!(result.deleted_count, 2);
3498        assert_eq!(result.failed.len(), 2);
3499
3500        // Verify existing transactions were deleted
3501        assert!(repo.get_by_id(existing_ids[0].clone()).await.is_err());
3502        assert!(repo.get_by_id(existing_ids[1].clone()).await.is_err());
3503
3504        // Verify remaining transaction still exists
3505        assert!(repo.get_by_id(existing_ids[2].clone()).await.is_ok());
3506    }
3507
3508    #[tokio::test]
3509    #[ignore = "Requires active Redis instance"]
3510    async fn test_delete_by_ids_removes_all_indexes() {
3511        let repo = setup_test_repo().await;
3512        let relayer_id = format!("relayer-{}", Uuid::new_v4());
3513        let tx_id = format!("test-indexes-{}", Uuid::new_v4());
3514
3515        // Create a transaction with specific status
3516        let mut tx = create_test_transaction(&tx_id);
3517        tx.relayer_id = relayer_id.clone();
3518        tx.status = TransactionStatus::Confirmed;
3519        repo.create(tx).await.unwrap();
3520
3521        // Verify transaction exists and is indexed
3522        let found = repo
3523            .find_by_status(&relayer_id, &[TransactionStatus::Confirmed])
3524            .await
3525            .unwrap();
3526        assert!(found.iter().any(|t| t.id == tx_id));
3527
3528        // Delete the transaction
3529        let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
3530        assert_eq!(result.deleted_count, 1);
3531
3532        // Verify transaction is no longer in status index
3533        let found_after = repo
3534            .find_by_status(&relayer_id, &[TransactionStatus::Confirmed])
3535            .await
3536            .unwrap();
3537        assert!(!found_after.iter().any(|t| t.id == tx_id));
3538
3539        // Verify transaction cannot be found
3540        assert!(repo.get_by_id(tx_id).await.is_err());
3541    }
3542
3543    #[tokio::test]
3544    #[ignore = "Requires active Redis instance"]
3545    async fn test_delete_by_ids_removes_nonce_index() {
3546        let repo = setup_test_repo().await;
3547        let relayer_id = format!("relayer-{}", Uuid::new_v4());
3548        let tx_id = format!("test-nonce-{}", Uuid::new_v4());
3549        let nonce = 12345u64;
3550
3551        // Create a transaction with a specific nonce
3552        let tx = create_test_transaction_with_nonce(&tx_id, nonce, &relayer_id);
3553        repo.create(tx).await.unwrap();
3554
3555        // Verify nonce index works
3556        let found = repo.find_by_nonce(&relayer_id, nonce).await.unwrap();
3557        assert!(found.is_some());
3558        assert_eq!(found.unwrap().id, tx_id);
3559
3560        // Delete the transaction
3561        let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
3562        assert_eq!(result.deleted_count, 1);
3563
3564        // Verify nonce index was cleaned up
3565        let found_after = repo.find_by_nonce(&relayer_id, nonce).await.unwrap();
3566        assert!(found_after.is_none());
3567    }
3568
3569    #[tokio::test]
3570    #[ignore = "Requires active Redis instance"]
3571    async fn test_delete_by_ids_large_batch() {
3572        let repo = setup_test_repo().await;
3573        let base_id = Uuid::new_v4();
3574
3575        // Create many transactions to test batch performance
3576        let count = 50;
3577        let mut created_ids = Vec::new();
3578
3579        for i in 0..count {
3580            let tx_id = format!("test-large-{base_id}-{i}");
3581            let tx = create_test_transaction(&tx_id);
3582            repo.create(tx).await.unwrap();
3583            created_ids.push(tx_id);
3584        }
3585
3586        // Delete all of them in one batch
3587        let result = repo.delete_by_ids(created_ids.clone()).await.unwrap();
3588
3589        assert_eq!(result.deleted_count, count);
3590        assert!(result.failed.is_empty());
3591
3592        // Verify all were deleted
3593        for id in created_ids {
3594            assert!(repo.get_by_id(id).await.is_err());
3595        }
3596    }
3597
3598    #[tokio::test]
3599    #[ignore = "Requires active Redis instance"]
3600    async fn test_delete_by_ids_preserves_other_relayer_transactions() {
3601        let repo = setup_test_repo().await;
3602        let relayer_1 = format!("relayer-1-{}", Uuid::new_v4());
3603        let relayer_2 = format!("relayer-2-{}", Uuid::new_v4());
3604        let tx_id_1 = format!("tx-relayer-1-{}", Uuid::new_v4());
3605        let tx_id_2 = format!("tx-relayer-2-{}", Uuid::new_v4());
3606
3607        // Create transactions for different relayers
3608        let tx1 = create_test_transaction_with_relayer(&tx_id_1, &relayer_1);
3609        let tx2 = create_test_transaction_with_relayer(&tx_id_2, &relayer_2);
3610
3611        repo.create(tx1).await.unwrap();
3612        repo.create(tx2).await.unwrap();
3613
3614        // Delete only relayer-1's transaction
3615        let result = repo.delete_by_ids(vec![tx_id_1.clone()]).await.unwrap();
3616
3617        assert_eq!(result.deleted_count, 1);
3618
3619        // relayer-1's transaction should be deleted
3620        assert!(repo.get_by_id(tx_id_1).await.is_err());
3621
3622        // relayer-2's transaction should still exist
3623        let remaining = repo.get_by_id(tx_id_2).await.unwrap();
3624        assert_eq!(remaining.relayer_id, relayer_2);
3625    }
3626
3627    // ── increment_status_check_failures ─────────────────────────────
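    // Expected behaviour exercised below: each call bumps both `consecutive_failures`
    // and `total_failures` in the transaction's metadata (creating the metadata if it
    // is missing), transactions already in a final state are returned unchanged, and
    // an unknown id yields RepositoryError::NotFound.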
3628
3629    #[tokio::test]
3630    #[ignore = "Requires active Redis instance"]
3631    async fn test_increment_status_check_failures_no_prior_metadata() {
3632        let _lock = ENV_MUTEX.lock().await;
3633        let repo = setup_test_repo().await;
3634        let relayer_id = Uuid::new_v4().to_string();
3635        let tx_id = Uuid::new_v4().to_string();
3636        let mut tx =
3637            create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3638        tx.metadata = None;
3639        repo.create(tx).await.unwrap();
3640
3641        let updated = repo.increment_status_check_failures(tx_id).await.unwrap();
3642
3643        let meta = updated.metadata.expect("metadata should be set");
3644        assert_eq!(meta.consecutive_failures, 1);
3645        assert_eq!(meta.total_failures, 1);
3646        assert_eq!(meta.insufficient_fee_retries, 0);
3647    }
3648
3649    #[tokio::test]
3650    #[ignore = "Requires active Redis instance"]
3651    async fn test_increment_status_check_failures_accumulates() {
3652        let _lock = ENV_MUTEX.lock().await;
3653        let repo = setup_test_repo().await;
3654        let relayer_id = Uuid::new_v4().to_string();
3655        let tx_id = Uuid::new_v4().to_string();
3656        let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3657        repo.create(tx).await.unwrap();
3658
3659        repo.increment_status_check_failures(tx_id.clone())
3660            .await
3661            .unwrap();
3662        repo.increment_status_check_failures(tx_id.clone())
3663            .await
3664            .unwrap();
3665        let updated = repo.increment_status_check_failures(tx_id).await.unwrap();
3666
3667        let meta = updated.metadata.unwrap();
3668        assert_eq!(meta.consecutive_failures, 3);
3669        assert_eq!(meta.total_failures, 3);
3670    }
3671
3672    #[tokio::test]
3673    #[ignore = "Requires active Redis instance"]
3674    async fn test_increment_status_check_failures_noop_on_final_state() {
3675        let _lock = ENV_MUTEX.lock().await;
3676        let repo = setup_test_repo().await;
3677        let relayer_id = Uuid::new_v4().to_string();
3678        let tx_id = Uuid::new_v4().to_string();
3679        let tx =
3680            create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Confirmed);
3681        repo.create(tx).await.unwrap();
3682
3683        let result = repo.increment_status_check_failures(tx_id).await.unwrap();
3684
3685        // Should return unchanged — no metadata mutation on final state
3686        assert!(result.metadata.is_none());
3687        assert_eq!(result.status, TransactionStatus::Confirmed);
3688    }
3689
3690    #[tokio::test]
3691    #[ignore = "Requires active Redis instance"]
3692    async fn test_increment_status_check_failures_not_found() {
3693        let _lock = ENV_MUTEX.lock().await;
3694        let repo = setup_test_repo().await;
3695
3696        let result = repo
3697            .increment_status_check_failures("nonexistent".to_string())
3698            .await;
3699
3700        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
3701    }
3702
3703    // ── reset_status_check_consecutive_failures ─────────────────────
3704
3705    #[tokio::test]
3706    #[ignore = "Requires active Redis instance"]
3707    async fn test_reset_consecutive_failures() {
3708        let _lock = ENV_MUTEX.lock().await;
3709        let repo = setup_test_repo().await;
3710        let relayer_id = Uuid::new_v4().to_string();
3711        let tx_id = Uuid::new_v4().to_string();
3712        let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3713        repo.create(tx).await.unwrap();
3714
3715        // Increment a few times first
3716        repo.increment_status_check_failures(tx_id.clone())
3717            .await
3718            .unwrap();
3719        repo.increment_status_check_failures(tx_id.clone())
3720            .await
3721            .unwrap();
3722
3723        let updated = repo
3724            .reset_status_check_consecutive_failures(tx_id)
3725            .await
3726            .unwrap();
3727
3728        let meta = updated.metadata.unwrap();
3729        assert_eq!(meta.consecutive_failures, 0);
3730        // total_failures should be preserved
3731        assert_eq!(meta.total_failures, 2);
3732    }
3733
3734    #[tokio::test]
3735    #[ignore = "Requires active Redis instance"]
3736    async fn test_reset_consecutive_failures_noop_on_final_state() {
3737        let _lock = ENV_MUTEX.lock().await;
3738        let repo = setup_test_repo().await;
3739        let relayer_id = Uuid::new_v4().to_string();
3740        let tx_id = Uuid::new_v4().to_string();
3741        let mut tx =
3742            create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Failed);
3743        tx.metadata = Some(crate::models::TransactionMetadata {
3744            consecutive_failures: 5,
3745            total_failures: 10,
3746            insufficient_fee_retries: 0,
3747            try_again_later_retries: 0,
3748        });
3749        repo.create(tx).await.unwrap();
3750
3751        let result = repo
3752            .reset_status_check_consecutive_failures(tx_id)
3753            .await
3754            .unwrap();
3755
3756        // Should return unchanged on final state
3757        let meta = result.metadata.unwrap();
3758        assert_eq!(meta.consecutive_failures, 5);
3759    }
3760
3761    #[tokio::test]
3762    #[ignore = "Requires active Redis instance"]
3763    async fn test_reset_consecutive_failures_not_found() {
3764        let _lock = ENV_MUTEX.lock().await;
3765        let repo = setup_test_repo().await;
3766
3767        let result = repo
3768            .reset_status_check_consecutive_failures("nonexistent".to_string())
3769            .await;
3770
3771        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
3772    }
3773
3774    // ── record_stellar_insufficient_fee_retry ───────────────────────
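    // These tests cover the Stellar insufficient-fee retry recorder: each call
    // increments `insufficient_fee_retries`, refreshes `sent_at` with the supplied
    // timestamp, and leaves the failure counters untouched; final-state transactions
    // are returned unchanged and unknown ids yield NotFound. The try-again-later
    // recorder in the next group follows the same contract for
    // `try_again_later_retries`.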
3775
3776    #[tokio::test]
3777    #[ignore = "Requires active Redis instance"]
3778    async fn test_record_insufficient_fee_retry() {
3779        let _lock = ENV_MUTEX.lock().await;
3780        let repo = setup_test_repo().await;
3781        let relayer_id = Uuid::new_v4().to_string();
3782        let tx_id = Uuid::new_v4().to_string();
3783        let mut tx =
3784            create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3785        tx.sent_at = None;
3786        repo.create(tx).await.unwrap();
3787
3788        let updated = repo
3789            .record_stellar_insufficient_fee_retry(tx_id, "2025-03-18T10:00:00Z".to_string())
3790            .await
3791            .unwrap();
3792
3793        assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:00:00Z"));
3794        let meta = updated.metadata.unwrap();
3795        assert_eq!(meta.insufficient_fee_retries, 1);
3796        assert_eq!(meta.consecutive_failures, 0);
3797        assert_eq!(meta.total_failures, 0);
3798    }
3799
3800    #[tokio::test]
3801    #[ignore = "Requires active Redis instance"]
3802    async fn test_record_insufficient_fee_retry_accumulates() {
3803        let _lock = ENV_MUTEX.lock().await;
3804        let repo = setup_test_repo().await;
3805        let relayer_id = Uuid::new_v4().to_string();
3806        let tx_id = Uuid::new_v4().to_string();
3807        let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3808        repo.create(tx).await.unwrap();
3809
3810        repo.record_stellar_insufficient_fee_retry(
3811            tx_id.clone(),
3812            "2025-03-18T10:00:00Z".to_string(),
3813        )
3814        .await
3815        .unwrap();
3816
3817        let updated = repo
3818            .record_stellar_insufficient_fee_retry(tx_id, "2025-03-18T10:01:00Z".to_string())
3819            .await
3820            .unwrap();
3821
3822        assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:01:00Z"));
3823        let meta = updated.metadata.unwrap();
3824        assert_eq!(meta.insufficient_fee_retries, 2);
3825    }
3826
3827    #[tokio::test]
3828    #[ignore = "Requires active Redis instance"]
3829    async fn test_record_insufficient_fee_retry_noop_on_final_state() {
3830        let _lock = ENV_MUTEX.lock().await;
3831        let repo = setup_test_repo().await;
3832        let relayer_id = Uuid::new_v4().to_string();
3833        let tx_id = Uuid::new_v4().to_string();
3834        let mut tx =
3835            create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Confirmed);
3836        tx.sent_at = Some("old-time".to_string());
3837        repo.create(tx).await.unwrap();
3838
3839        let result = repo
3840            .record_stellar_insufficient_fee_retry(tx_id, "new-time".to_string())
3841            .await
3842            .unwrap();
3843
3844        // Should return unchanged on final state
3845        assert_eq!(result.sent_at.as_deref(), Some("old-time"));
3846        assert!(result.metadata.is_none());
3847    }
3848
3849    #[tokio::test]
3850    #[ignore = "Requires active Redis instance"]
3851    async fn test_record_insufficient_fee_retry_not_found() {
3852        let _lock = ENV_MUTEX.lock().await;
3853        let repo = setup_test_repo().await;
3854
3855        let result = repo
3856            .record_stellar_insufficient_fee_retry(
3857                "nonexistent".to_string(),
3858                "2025-03-18T10:00:00Z".to_string(),
3859            )
3860            .await;
3861
3862        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
3863    }
3864
3865    // ── record_stellar_try_again_later_retry ────────────────────────
3866
3867    #[tokio::test]
3868    #[ignore = "Requires active Redis instance"]
3869    async fn test_record_try_again_later_retry() {
3870        let _lock = ENV_MUTEX.lock().await;
3871        let repo = setup_test_repo().await;
3872        let relayer_id = Uuid::new_v4().to_string();
3873        let tx_id = Uuid::new_v4().to_string();
3874        let mut tx =
3875            create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3876        tx.sent_at = None;
3877        repo.create(tx).await.unwrap();
3878
3879        let updated = repo
3880            .record_stellar_try_again_later_retry(tx_id, "2025-03-18T10:00:00Z".to_string())
3881            .await
3882            .unwrap();
3883
3884        assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:00:00Z"));
3885        let meta = updated.metadata.unwrap();
3886        assert_eq!(meta.try_again_later_retries, 1);
3887        assert_eq!(meta.consecutive_failures, 0);
3888        assert_eq!(meta.total_failures, 0);
3889    }
3890
3891    #[tokio::test]
3892    #[ignore = "Requires active Redis instance"]
3893    async fn test_record_try_again_later_retry_accumulates() {
3894        let _lock = ENV_MUTEX.lock().await;
3895        let repo = setup_test_repo().await;
3896        let relayer_id = Uuid::new_v4().to_string();
3897        let tx_id = Uuid::new_v4().to_string();
3898        let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3899        repo.create(tx).await.unwrap();
3900
3901        repo.record_stellar_try_again_later_retry(
3902            tx_id.clone(),
3903            "2025-03-18T10:00:00Z".to_string(),
3904        )
3905        .await
3906        .unwrap();
3907
3908        let updated = repo
3909            .record_stellar_try_again_later_retry(tx_id, "2025-03-18T10:01:00Z".to_string())
3910            .await
3911            .unwrap();
3912
3913        assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:01:00Z"));
3914        let meta = updated.metadata.unwrap();
3915        assert_eq!(meta.try_again_later_retries, 2);
3916    }
3917
3918    #[tokio::test]
3919    #[ignore = "Requires active Redis instance"]
3920    async fn test_record_try_again_later_retry_noop_on_final_state() {
3921        let _lock = ENV_MUTEX.lock().await;
3922        let repo = setup_test_repo().await;
3923        let relayer_id = Uuid::new_v4().to_string();
3924        let tx_id = Uuid::new_v4().to_string();
3925        let mut tx =
3926            create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Confirmed);
3927        tx.sent_at = Some("old-time".to_string());
3928        repo.create(tx).await.unwrap();
3929
3930        let result = repo
3931            .record_stellar_try_again_later_retry(tx_id, "new-time".to_string())
3932            .await
3933            .unwrap();
3934
3935        // Should return unchanged on final state
3936        assert_eq!(result.sent_at.as_deref(), Some("old-time"));
3937        assert!(result.metadata.is_none());
3938    }
3939
3940    #[tokio::test]
3941    #[ignore = "Requires active Redis instance"]
3942    async fn test_record_try_again_later_retry_not_found() {
3943        let _lock = ENV_MUTEX.lock().await;
3944        let repo = setup_test_repo().await;
3945
3946        let result = repo
3947            .record_stellar_try_again_later_retry(
3948                "nonexistent".to_string(),
3949                "2025-03-18T10:00:00Z".to_string(),
3950            )
3951            .await;
3952
3953        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
3954    }
3955
3956    // ── metadata preservation across operations ─────────────────────
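    // Regression guard: every metadata-updating operation must read-modify-write the
    // existing TransactionMetadata rather than replacing it wholesale, so counters set
    // by one operation (retry counts, failure totals) survive the others. Each test
    // below pairs one operation with a previously set counter and asserts it is intact.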
3957
3958    #[tokio::test]
3959    #[ignore = "Requires active Redis instance"]
3960    async fn test_increment_failures_preserves_try_again_later_retries() {
3961        let _lock = ENV_MUTEX.lock().await;
3962        let repo = setup_test_repo().await;
3963        let relayer_id = Uuid::new_v4().to_string();
3964        let tx_id = Uuid::new_v4().to_string();
3965        let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3966        repo.create(tx).await.unwrap();
3967
3968        // Set try_again_later_retries = 1
3969        repo.record_stellar_try_again_later_retry(
3970            tx_id.clone(),
3971            "2025-03-18T10:00:00Z".to_string(),
3972        )
3973        .await
3974        .unwrap();
3975
3976        // Now increment failures — should NOT clobber try_again_later_retries
3977        let updated = repo.increment_status_check_failures(tx_id).await.unwrap();
3978
3979        let meta = updated.metadata.unwrap();
3980        assert_eq!(
3981            meta.try_again_later_retries, 1,
3982            "try_again_later_retries must survive increment_status_check_failures"
3983        );
3984        assert_eq!(meta.consecutive_failures, 1);
3985        assert_eq!(meta.total_failures, 1);
3986    }
3987
3988    #[tokio::test]
3989    #[ignore = "Requires active Redis instance"]
3990    async fn test_increment_failures_preserves_insufficient_fee_retries() {
3991        let _lock = ENV_MUTEX.lock().await;
3992        let repo = setup_test_repo().await;
3993        let relayer_id = Uuid::new_v4().to_string();
3994        let tx_id = Uuid::new_v4().to_string();
3995        let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3996        repo.create(tx).await.unwrap();
3997
3998        // Set insufficient_fee_retries = 1
3999        repo.record_stellar_insufficient_fee_retry(
4000            tx_id.clone(),
4001            "2025-03-18T10:00:00Z".to_string(),
4002        )
4003        .await
4004        .unwrap();
4005
4006        // Now increment failures — should NOT clobber insufficient_fee_retries
4007        let updated = repo.increment_status_check_failures(tx_id).await.unwrap();
4008
4009        let meta = updated.metadata.unwrap();
4010        assert_eq!(
4011            meta.insufficient_fee_retries, 1,
4012            "insufficient_fee_retries must survive increment_status_check_failures"
4013        );
4014        assert_eq!(meta.consecutive_failures, 1);
4015    }
4016
4017    #[tokio::test]
4018    #[ignore = "Requires active Redis instance"]
4019    async fn test_reset_failures_preserves_retry_counters() {
4020        let _lock = ENV_MUTEX.lock().await;
4021        let repo = setup_test_repo().await;
4022        let relayer_id = Uuid::new_v4().to_string();
4023        let tx_id = Uuid::new_v4().to_string();
4024        let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
4025        repo.create(tx).await.unwrap();
4026
4027        // Set both retry counters
4028        repo.record_stellar_try_again_later_retry(
4029            tx_id.clone(),
4030            "2025-03-18T10:00:00Z".to_string(),
4031        )
4032        .await
4033        .unwrap();
4034        repo.record_stellar_insufficient_fee_retry(
4035            tx_id.clone(),
4036            "2025-03-18T10:01:00Z".to_string(),
4037        )
4038        .await
4039        .unwrap();
4040
4041        // Increment then reset consecutive failures
4042        repo.increment_status_check_failures(tx_id.clone())
4043            .await
4044            .unwrap();
4045        let updated = repo
4046            .reset_status_check_consecutive_failures(tx_id)
4047            .await
4048            .unwrap();
4049
4050        let meta = updated.metadata.unwrap();
4051        assert_eq!(meta.consecutive_failures, 0);
4052        assert_eq!(meta.total_failures, 1);
4053        assert_eq!(
4054            meta.try_again_later_retries, 1,
4055            "try_again_later_retries must survive reset"
4056        );
4057        assert_eq!(
4058            meta.insufficient_fee_retries, 1,
4059            "insufficient_fee_retries must survive reset"
4060        );
4061    }
4062
4063    #[tokio::test]
4064    #[ignore = "Requires active Redis instance"]
4065    async fn test_fee_and_try_again_later_retries_independent() {
4066        let _lock = ENV_MUTEX.lock().await;
4067        let repo = setup_test_repo().await;
4068        let relayer_id = Uuid::new_v4().to_string();
4069        let tx_id = Uuid::new_v4().to_string();
4070        let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
4071        repo.create(tx).await.unwrap();
4072
4073        // Set try_again_later_retries = 2
4074        repo.record_stellar_try_again_later_retry(
4075            tx_id.clone(),
4076            "2025-03-18T10:00:00Z".to_string(),
4077        )
4078        .await
4079        .unwrap();
4080        repo.record_stellar_try_again_later_retry(
4081            tx_id.clone(),
4082            "2025-03-18T10:01:00Z".to_string(),
4083        )
4084        .await
4085        .unwrap();
4086
4087        // Set insufficient_fee_retries = 1 — should NOT clobber try_again_later_retries
4088        let updated = repo
4089            .record_stellar_insufficient_fee_retry(tx_id, "2025-03-18T10:02:00Z".to_string())
4090            .await
4091            .unwrap();
4092
4093        let meta = updated.metadata.unwrap();
4094        assert_eq!(
4095            meta.try_again_later_retries, 2,
4096            "try_again_later_retries must survive insufficient_fee_retry"
4097        );
4098        assert_eq!(meta.insufficient_fee_retries, 1);
4099    }
4100}