1use crate::config::ServerConfig;
4use crate::constants::FINAL_TRANSACTION_STATUSES;
5use crate::domain::transaction::common::is_final_state;
6use crate::metrics::{
7 TRANSACTIONS_BY_STATUS, TRANSACTIONS_CREATED, TRANSACTIONS_FAILED,
8 TRANSACTIONS_INSUFFICIENT_FEE_FAILED, TRANSACTIONS_INSUFFICIENT_FEE_SUCCESS,
9 TRANSACTIONS_SUBMITTED, TRANSACTIONS_SUCCESS, TRANSACTIONS_TRY_AGAIN_LATER_FAILED,
10 TRANSACTIONS_TRY_AGAIN_LATER_SUCCESS, TRANSACTION_PROCESSING_TIME,
11};
12use crate::models::{
13 NetworkTransactionData, PaginationQuery, RepositoryError, TransactionRepoModel,
14 TransactionStatus, TransactionUpdateRequest,
15};
16use crate::repositories::redis_base::RedisRepository;
17use crate::repositories::{
18 BatchDeleteResult, BatchRetrievalResult, PaginatedResult, Repository, TransactionDeleteRequest,
19 TransactionRepository,
20};
21use crate::utils::RedisConnections;
22use async_trait::async_trait;
23use chrono::Utc;
24use redis::{AsyncCommands, Script};
25use std::fmt;
26use std::sync::Arc;
27use tracing::{debug, error, warn};
28
// Key-segment constants. The key-builder methods below join these with ':'
// after the repository's `key_prefix`.

/// Segment placed right after the key prefix in per-relayer keys.
const RELAYER_PREFIX: &str = "relayer";
/// Segment that precedes a transaction ID in a transaction data key.
const TX_PREFIX: &str = "tx";
/// Segment of the per-status key that is read with SCARD/SMEMBERS (a plain set).
const STATUS_PREFIX: &str = "status";
/// Segment of the per-status key that is written with ZADD (a sorted set).
const STATUS_SORTED_PREFIX: &str = "status_sorted";
/// Segment of the per-relayer nonce key whose value is a transaction ID.
const NONCE_PREFIX: &str = "nonce";
/// Segment of the reverse-lookup key whose value is the owning relayer ID.
const TX_TO_RELAYER_PREFIX: &str = "tx_to_relayer";
/// Suffix of the key holding the set of relayer IDs that have transactions.
const RELAYER_LIST_KEY: &str = "relayer_list";
/// Segment of the per-relayer sorted set scored by the `created_at` timestamp.
const TX_BY_CREATED_AT_PREFIX: &str = "tx_by_created_at";
37
/// Transaction repository that stores its data and indexes in Redis.
#[derive(Clone)]
pub struct RedisTransactionRepository {
    /// Shared Redis connections; methods select `primary()` or `reader()`
    /// per operation.
    pub connections: Arc<RedisConnections>,
    /// Prefix prepended to every key built by the key helpers. `new` rejects
    /// an empty value.
    pub key_prefix: String,
}
43
// Empty impl: nothing is overridden here.
// NOTE(review): `get_connection`, `map_redis_error`, `serialize_entity` and
// `deserialize_entity` are presumably provided by this trait — confirm in
// `repositories::redis_base`.
impl RedisRepository for RedisTransactionRepository {}
45
46impl RedisTransactionRepository {
47 pub fn new(
48 connections: Arc<RedisConnections>,
49 key_prefix: String,
50 ) -> Result<Self, RepositoryError> {
51 if key_prefix.is_empty() {
52 return Err(RepositoryError::InvalidData(
53 "Redis key prefix cannot be empty".to_string(),
54 ));
55 }
56
57 Ok(Self {
58 connections,
59 key_prefix,
60 })
61 }
62
63 fn tx_key(&self, relayer_id: &str, tx_id: &str) -> String {
65 format!(
66 "{}:{}:{}:{}:{}",
67 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX, tx_id
68 )
69 }
70
71 fn tx_to_relayer_key(&self, tx_id: &str) -> String {
73 format!(
74 "{}:{}:{}:{}",
75 self.key_prefix, RELAYER_PREFIX, TX_TO_RELAYER_PREFIX, tx_id
76 )
77 }
78
79 fn relayer_status_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
81 format!(
82 "{}:{}:{}:{}:{}",
83 self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_PREFIX, status
84 )
85 }
86
87 fn relayer_status_sorted_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
90 format!(
91 "{}:{}:{}:{}:{}",
92 self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_SORTED_PREFIX, status
93 )
94 }
95
96 fn relayer_nonce_key(&self, relayer_id: &str, nonce: u64) -> String {
98 format!(
99 "{}:{}:{}:{}:{}",
100 self.key_prefix, RELAYER_PREFIX, relayer_id, NONCE_PREFIX, nonce
101 )
102 }
103
104 fn relayer_list_key(&self) -> String {
106 format!("{}:{}", self.key_prefix, RELAYER_LIST_KEY)
107 }
108
109 fn relayer_tx_by_created_at_key(&self, relayer_id: &str) -> String {
111 format!(
112 "{}:{}:{}:{}",
113 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_BY_CREATED_AT_PREFIX
114 )
115 }
116
117 fn tx_key_parts(&self, tx_id: &str) -> (String, String, String) {
122 let lookup_key = self.tx_to_relayer_key(tx_id);
123 let key_prefix = format!("{}:{}:", self.key_prefix, RELAYER_PREFIX);
124 let key_suffix = format!(":{TX_PREFIX}:{tx_id}");
125 (lookup_key, key_prefix, key_suffix)
126 }
127
128 async fn run_atomic_script(
135 &self,
136 lua: &str,
137 tx_id: &str,
138 extra_args: &[&str],
139 op_name: &str,
140 ) -> Result<TransactionRepoModel, RepositoryError> {
141 const MAX_RETRIES: u32 = 3;
142 const BASE_BACKOFF_MS: u64 = 100;
143
144 let (lookup_key, key_prefix, key_suffix) = self.tx_key_parts(tx_id);
145 let script = Script::new(lua);
146 let mut last_error = None;
147
148 for attempt in 0..MAX_RETRIES {
149 let backoff = BASE_BACKOFF_MS * 2u64.pow(attempt);
150
151 let mut conn = match self
152 .get_connection(self.connections.primary(), op_name)
153 .await
154 {
155 Ok(conn) => conn,
156 Err(e) => {
157 last_error = Some(e);
158 if attempt < MAX_RETRIES - 1 {
159 warn!(tx_id = %tx_id, attempt, op = %op_name, "connection failed, retrying");
160 tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
161 continue;
162 }
163 return Err(last_error.unwrap());
164 }
165 };
166
167 let mut invocation = script.prepare_invoke();
168 invocation
169 .key(&lookup_key)
170 .arg(&key_prefix)
171 .arg(&key_suffix);
172 for arg in extra_args {
173 invocation.arg(*arg);
174 }
175
176 match invocation.invoke_async::<Option<String>>(&mut conn).await {
177 Ok(result) => {
178 let json = result.ok_or_else(|| {
179 RepositoryError::NotFound(format!("Transaction with ID {tx_id} not found"))
180 })?;
181 return self.deserialize_entity::<TransactionRepoModel>(
182 &json,
183 tx_id,
184 "transaction",
185 );
186 }
187 Err(e) => {
188 last_error = Some(self.map_redis_error(e, op_name));
189 if attempt < MAX_RETRIES - 1 {
190 warn!(
191 tx_id = %tx_id, attempt, op = %op_name,
192 "atomic script failed, retrying"
193 );
194 tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
195 continue;
196 }
197 return Err(last_error.unwrap());
198 }
199 }
200 }
201 Err(last_error.unwrap_or_else(|| {
202 RepositoryError::UnexpectedError(format!("retry loop exhausted for {op_name}"))
203 }))
204 }
205
206 async fn run_script_with_retry_vec(
210 &self,
211 script: &Script,
212 lookup_key: &str,
213 key_prefix: &str,
214 key_suffix: &str,
215 extra_args: &[&str],
216 op_name: &str,
217 ) -> Result<Option<Vec<String>>, RepositoryError> {
218 const MAX_RETRIES: u32 = 3;
219 const BASE_BACKOFF_MS: u64 = 100;
220
221 let mut last_error = None;
222
223 for attempt in 0..MAX_RETRIES {
224 let backoff = BASE_BACKOFF_MS * 2u64.pow(attempt);
225
226 let mut conn = match self
227 .get_connection(self.connections.primary(), op_name)
228 .await
229 {
230 Ok(conn) => conn,
231 Err(e) => {
232 last_error = Some(e);
233 if attempt < MAX_RETRIES - 1 {
234 warn!(op = %op_name, attempt, "connection failed, retrying");
235 tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
236 continue;
237 }
238 return Err(last_error.unwrap());
239 }
240 };
241
242 let mut invocation = script.prepare_invoke();
243 invocation.key(lookup_key).arg(key_prefix).arg(key_suffix);
244 for arg in extra_args {
245 invocation.arg(*arg);
246 }
247
248 match invocation
251 .invoke_async::<Option<Vec<String>>>(&mut conn)
252 .await
253 {
254 Ok(result) => return Ok(result),
255 Err(e) => {
256 last_error = Some(self.map_redis_error(e, op_name));
257 if attempt < MAX_RETRIES - 1 {
258 warn!(op = %op_name, attempt, "script failed, retrying");
259 tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
260 continue;
261 }
262 return Err(last_error.unwrap());
263 }
264 }
265 }
266 Err(last_error.unwrap_or_else(|| {
267 RepositoryError::UnexpectedError(format!("retry loop exhausted for {op_name}"))
268 }))
269 }
270
271 fn timestamp_to_score(&self, timestamp: &str) -> f64 {
273 chrono::DateTime::parse_from_rfc3339(timestamp)
274 .map(|dt| dt.timestamp_millis() as f64)
275 .unwrap_or_else(|_| {
276 warn!(timestamp = %timestamp, "failed to parse timestamp, using 0");
277 0.0
278 })
279 }
280
281 fn status_sorted_score(&self, tx: &TransactionRepoModel) -> f64 {
285 if tx.status == TransactionStatus::Confirmed {
286 if let Some(ref confirmed_at) = tx.confirmed_at {
288 return self.timestamp_to_score(confirmed_at);
289 }
290 warn!(tx_id = %tx.id, "Confirmed transaction missing confirmed_at, using created_at");
292 }
293 self.timestamp_to_score(&tx.created_at)
294 }
295
296 async fn get_transactions_by_ids(
298 &self,
299 ids: &[String],
300 ) -> Result<BatchRetrievalResult<TransactionRepoModel>, RepositoryError> {
301 if ids.is_empty() {
302 debug!("no transaction IDs provided for batch fetch");
303 return Ok(BatchRetrievalResult {
304 results: vec![],
305 failed_ids: vec![],
306 });
307 }
308
309 let mut conn = self
310 .get_connection(self.connections.reader(), "batch_fetch_transactions")
311 .await?;
312
313 let reverse_keys: Vec<String> = ids.iter().map(|id| self.tx_to_relayer_key(id)).collect();
314
315 debug!(count = %ids.len(), "fetching relayer IDs for transactions");
316
317 let relayer_ids: Vec<Option<String>> = conn
318 .mget(&reverse_keys)
319 .await
320 .map_err(|e| self.map_redis_error(e, "batch_fetch_relayer_ids"))?;
321
322 let mut tx_keys = Vec::new();
323 let mut valid_ids = Vec::new();
324 let mut failed_ids = Vec::new();
325 for (i, relayer_id) in relayer_ids.into_iter().enumerate() {
326 match relayer_id {
327 Some(relayer_id) => {
328 tx_keys.push(self.tx_key(&relayer_id, &ids[i]));
329 valid_ids.push(ids[i].clone());
330 }
331 None => {
332 warn!(tx_id = %ids[i], "no relayer found for transaction");
333 failed_ids.push(ids[i].clone());
334 }
335 }
336 }
337
338 if tx_keys.is_empty() {
339 debug!("no valid transactions found for batch fetch");
340 return Ok(BatchRetrievalResult {
341 results: vec![],
342 failed_ids,
343 });
344 }
345
346 debug!(count = %tx_keys.len(), "batch fetching transaction data");
347
348 let values: Vec<Option<String>> = conn
349 .mget(&tx_keys)
350 .await
351 .map_err(|e| self.map_redis_error(e, "batch_fetch_transactions"))?;
352
353 let mut transactions = Vec::new();
354 let mut failed_count = 0;
355 for (i, value) in values.into_iter().enumerate() {
356 match value {
357 Some(json) => {
358 match self.deserialize_entity::<TransactionRepoModel>(
359 &json,
360 &valid_ids[i],
361 "transaction",
362 ) {
363 Ok(tx) => transactions.push(tx),
364 Err(e) => {
365 failed_count += 1;
366 error!(tx_id = %valid_ids[i], error = %e, "failed to deserialize transaction");
367 }
369 }
370 }
371 None => {
372 warn!(tx_id = %valid_ids[i], "transaction not found in batch fetch");
373 failed_ids.push(valid_ids[i].clone());
374 }
375 }
376 }
377
378 if failed_count > 0 {
379 warn!(failed_count = %failed_count, total_count = %valid_ids.len(), "failed to deserialize transactions in batch");
380 }
381
382 debug!(count = %transactions.len(), "successfully fetched transactions");
383 Ok(BatchRetrievalResult {
384 results: transactions,
385 failed_ids,
386 })
387 }
388
389 fn extract_nonce(&self, network_data: &NetworkTransactionData) -> Option<u64> {
391 match network_data.get_evm_transaction_data() {
392 Ok(tx_data) => tx_data.nonce,
393 Err(_) => {
394 debug!("no EVM transaction data available for nonce extraction");
395 None
396 }
397 }
398 }
399
400 async fn ensure_status_sorted_set(
417 &self,
418 relayer_id: &str,
419 status: &TransactionStatus,
420 ) -> Result<u64, RepositoryError> {
421 let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
422 let legacy_key = self.relayer_status_key(relayer_id, status);
423
424 let legacy_ids = {
426 let mut conn = self
427 .get_connection(self.connections.primary(), "ensure_status_sorted_set_check")
428 .await?;
429
430 let legacy_count: u64 = conn
432 .scard(&legacy_key)
433 .await
434 .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_scard"))?;
435
436 if legacy_count == 0 {
437 let sorted_count: u64 = conn
439 .zcard(&sorted_key)
440 .await
441 .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_zcard"))?;
442 return Ok(sorted_count);
443 }
444
445 debug!(
447 relayer_id = %relayer_id,
448 status = %status,
449 legacy_count = %legacy_count,
450 "migrating status set to sorted set"
451 );
452
453 let ids: Vec<String> = conn
454 .smembers(&legacy_key)
455 .await
456 .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_smembers"))?;
457
458 ids
459 };
461
462 if legacy_ids.is_empty() {
463 return Ok(0);
464 }
465
466 let transactions = self.get_transactions_by_ids(&legacy_ids).await?;
468
469 let mut conn = self
471 .get_connection(
472 self.connections.primary(),
473 "ensure_status_sorted_set_migrate",
474 )
475 .await?;
476
477 if transactions.results.is_empty() {
478 let _: () = conn
480 .del(&legacy_key)
481 .await
482 .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_del_stale"))?;
483 return Ok(0);
484 }
485
486 let mut pipe = redis::pipe();
489 pipe.atomic();
490
491 for tx in &transactions.results {
492 let score = self.status_sorted_score(tx);
493 pipe.zadd(&sorted_key, &tx.id, score);
494 }
495
496 pipe.del(&legacy_key);
498
499 pipe.query_async::<()>(&mut conn)
500 .await
501 .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_migrate"))?;
502
503 let migrated_count = transactions.results.len() as u64;
504 debug!(
505 relayer_id = %relayer_id,
506 status = %status,
507 migrated_count = %migrated_count,
508 "completed migration of status set to sorted set"
509 );
510
511 Ok(migrated_count)
512 }
513
514 async fn update_indexes(
516 &self,
517 tx: &TransactionRepoModel,
518 old_tx: Option<&TransactionRepoModel>,
519 ) -> Result<(), RepositoryError> {
520 let mut conn = self
521 .get_connection(self.connections.primary(), "update_indexes")
522 .await?;
523 let mut pipe = redis::pipe();
524 pipe.atomic();
525
526 debug!(tx_id = %tx.id, "updating indexes for transaction");
527
528 let relayer_list_key = self.relayer_list_key();
530 pipe.sadd(&relayer_list_key, &tx.relayer_id);
531
532 let status_score = self.status_sorted_score(tx);
535 let created_at_score = self.timestamp_to_score(&tx.created_at);
537
538 let new_status_sorted_key = self.relayer_status_sorted_key(&tx.relayer_id, &tx.status);
540 pipe.zadd(&new_status_sorted_key, &tx.id, status_score);
541 debug!(tx_id = %tx.id, status = %tx.status, score = %status_score, "added transaction to status sorted set");
542
543 if let Some(nonce) = self.extract_nonce(&tx.network_data) {
544 let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
545 pipe.set(&nonce_key, &tx.id);
546 debug!(tx_id = %tx.id, nonce = %nonce, "added nonce index for transaction");
547 }
548
549 let relayer_sorted_key = self.relayer_tx_by_created_at_key(&tx.relayer_id);
551 pipe.zadd(&relayer_sorted_key, &tx.id, created_at_score);
552 debug!(tx_id = %tx.id, score = %created_at_score, "added transaction to sorted set by created_at");
553
554 if let Some(old) = old_tx {
556 if old.status != tx.status {
557 let old_status_sorted_key =
559 self.relayer_status_sorted_key(&old.relayer_id, &old.status);
560 pipe.zrem(&old_status_sorted_key, &tx.id);
561
562 let old_status_legacy_key = self.relayer_status_key(&old.relayer_id, &old.status);
564 pipe.srem(&old_status_legacy_key, &tx.id);
565
566 debug!(tx_id = %tx.id, old_status = %old.status, new_status = %tx.status, "removing old status indexes for transaction");
567 }
568
569 if let Some(old_nonce) = self.extract_nonce(&old.network_data) {
571 let new_nonce = self.extract_nonce(&tx.network_data);
572 if Some(old_nonce) != new_nonce {
573 let old_nonce_key = self.relayer_nonce_key(&old.relayer_id, old_nonce);
574 pipe.del(&old_nonce_key);
575 debug!(tx_id = %tx.id, old_nonce = %old_nonce, new_nonce = ?new_nonce, "removing old nonce index for transaction");
576 }
577 }
578 }
579
580 pipe.exec_async(&mut conn).await.map_err(|e| {
582 error!(tx_id = %tx.id, error = %e, "index update pipeline failed for transaction");
583 self.map_redis_error(e, &format!("update_indexes_for_tx_{}", tx.id))
584 })?;
585
586 debug!(tx_id = %tx.id, "successfully updated indexes for transaction");
587 Ok(())
588 }
589
590 async fn remove_all_indexes(&self, tx: &TransactionRepoModel) -> Result<(), RepositoryError> {
592 let mut conn = self
593 .get_connection(self.connections.primary(), "remove_all_indexes")
594 .await?;
595 let mut pipe = redis::pipe();
596 pipe.atomic();
597
598 debug!(tx_id = %tx.id, "removing all indexes for transaction");
599
600 for status in &[
604 TransactionStatus::Canceled,
605 TransactionStatus::Pending,
606 TransactionStatus::Sent,
607 TransactionStatus::Submitted,
608 TransactionStatus::Mined,
609 TransactionStatus::Confirmed,
610 TransactionStatus::Failed,
611 TransactionStatus::Expired,
612 ] {
613 let status_sorted_key = self.relayer_status_sorted_key(&tx.relayer_id, status);
615 pipe.zrem(&status_sorted_key, &tx.id);
616
617 let status_legacy_key = self.relayer_status_key(&tx.relayer_id, status);
619 pipe.srem(&status_legacy_key, &tx.id);
620 }
621
622 if let Some(nonce) = self.extract_nonce(&tx.network_data) {
624 let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
625 pipe.del(&nonce_key);
626 debug!(tx_id = %tx.id, nonce = %nonce, "removing nonce index for transaction");
627 }
628
629 let relayer_sorted_key = self.relayer_tx_by_created_at_key(&tx.relayer_id);
631 pipe.zrem(&relayer_sorted_key, &tx.id);
632 debug!(tx_id = %tx.id, "removing transaction from sorted set by created_at");
633
634 let reverse_key = self.tx_to_relayer_key(&tx.id);
636 pipe.del(&reverse_key);
637
638 pipe.exec_async(&mut conn).await.map_err(|e| {
639 error!(tx_id = %tx.id, error = %e, "index removal failed for transaction");
640 self.map_redis_error(e, &format!("remove_indexes_for_tx_{}", tx.id))
641 })?;
642
643 debug!(tx_id = %tx.id, "successfully removed all indexes for transaction");
644 Ok(())
645 }
646
647 fn track_status_change_metrics(
649 &self,
650 _original_tx: &TransactionRepoModel,
651 updated_tx: &TransactionRepoModel,
652 old_status: &TransactionStatus,
653 new_status: &TransactionStatus,
654 ) {
655 let network_type = format!("{:?}", updated_tx.network_type).to_lowercase();
656 let relayer_id = updated_tx.relayer_id.as_str();
657
658 if *old_status != TransactionStatus::Submitted
660 && *new_status == TransactionStatus::Submitted
661 {
662 TRANSACTIONS_SUBMITTED
663 .with_label_values(&[relayer_id, &network_type])
664 .inc();
665
666 if let Ok(created_time) = chrono::DateTime::parse_from_rfc3339(&updated_tx.created_at) {
667 let processing_seconds =
668 (Utc::now() - created_time.with_timezone(&Utc)).num_seconds() as f64;
669 TRANSACTION_PROCESSING_TIME
670 .with_label_values(&[relayer_id, &network_type, "creation_to_submission"])
671 .observe(processing_seconds);
672 }
673 }
674
675 if old_status != new_status {
677 let old_status_str = format!("{old_status:?}").to_lowercase();
678 let old_status_gauge = TRANSACTIONS_BY_STATUS.with_label_values(&[
679 relayer_id,
680 &network_type,
681 &old_status_str,
682 ]);
683 let clamped_value = (old_status_gauge.get() - 1.0).max(0.0);
684 old_status_gauge.set(clamped_value);
685
686 let new_status_str = format!("{new_status:?}").to_lowercase();
687 TRANSACTIONS_BY_STATUS
688 .with_label_values(&[relayer_id, &network_type, &new_status_str])
689 .inc();
690 }
691
692 let was_final = is_final_state(old_status);
694 let is_final = is_final_state(new_status);
695
696 if !was_final && is_final {
697 let meta = updated_tx.metadata.as_ref();
698 let had_insufficient_fee = meta.is_some_and(|m| m.insufficient_fee_retries > 0);
699 let had_try_again_later = meta.is_some_and(|m| m.try_again_later_retries > 0);
700
701 match new_status {
702 TransactionStatus::Confirmed => {
703 TRANSACTIONS_SUCCESS
704 .with_label_values(&[relayer_id, &network_type])
705 .inc();
706 if had_insufficient_fee {
707 TRANSACTIONS_INSUFFICIENT_FEE_SUCCESS
708 .with_label_values(&[relayer_id, &network_type])
709 .inc();
710 }
711 if had_try_again_later {
712 TRANSACTIONS_TRY_AGAIN_LATER_SUCCESS
713 .with_label_values(&[relayer_id, &network_type])
714 .inc();
715 }
716
717 if let (Some(sent_at_str), Some(confirmed_at_str)) =
718 (&updated_tx.sent_at, &updated_tx.confirmed_at)
719 {
720 if let (Ok(sent_time), Ok(confirmed_time)) = (
721 chrono::DateTime::parse_from_rfc3339(sent_at_str),
722 chrono::DateTime::parse_from_rfc3339(confirmed_at_str),
723 ) {
724 let processing_seconds = (confirmed_time.with_timezone(&Utc)
725 - sent_time.with_timezone(&Utc))
726 .num_seconds()
727 as f64;
728 TRANSACTION_PROCESSING_TIME
729 .with_label_values(&[
730 relayer_id,
731 &network_type,
732 "submission_to_confirmation",
733 ])
734 .observe(processing_seconds);
735 }
736 }
737
738 if let Ok(created_time) =
739 chrono::DateTime::parse_from_rfc3339(&updated_tx.created_at)
740 {
741 if let Some(confirmed_at_str) = &updated_tx.confirmed_at {
742 if let Ok(confirmed_time) =
743 chrono::DateTime::parse_from_rfc3339(confirmed_at_str)
744 {
745 let processing_seconds = (confirmed_time.with_timezone(&Utc)
746 - created_time.with_timezone(&Utc))
747 .num_seconds()
748 as f64;
749 TRANSACTION_PROCESSING_TIME
750 .with_label_values(&[
751 relayer_id,
752 &network_type,
753 "creation_to_confirmation",
754 ])
755 .observe(processing_seconds);
756 }
757 }
758 }
759 }
760 TransactionStatus::Failed => {
761 let failure_reason = updated_tx
762 .status_reason
763 .as_deref()
764 .map(|reason| {
765 if reason.starts_with("Submission failed:") {
766 "submission_failed"
767 } else if reason.starts_with("Preparation failed:") {
768 "preparation_failed"
769 } else {
770 "failed"
771 }
772 })
773 .unwrap_or("failed");
774 TRANSACTIONS_FAILED
775 .with_label_values(&[relayer_id, &network_type, failure_reason])
776 .inc();
777 }
778 TransactionStatus::Expired => {
779 TRANSACTIONS_FAILED
780 .with_label_values(&[relayer_id, &network_type, "expired"])
781 .inc();
782 }
783 TransactionStatus::Canceled => {
784 TRANSACTIONS_FAILED
785 .with_label_values(&[relayer_id, &network_type, "canceled"])
786 .inc();
787 }
788 _ => {}
789 }
790
791 if *new_status != TransactionStatus::Confirmed {
793 if had_insufficient_fee {
794 TRANSACTIONS_INSUFFICIENT_FEE_FAILED
795 .with_label_values(&[relayer_id, &network_type])
796 .inc();
797 }
798 if had_try_again_later {
799 TRANSACTIONS_TRY_AGAIN_LATER_FAILED
800 .with_label_values(&[relayer_id, &network_type])
801 .inc();
802 }
803 }
804 }
805 }
806}
807
808impl fmt::Debug for RedisTransactionRepository {
809 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
810 f.debug_struct("RedisTransactionRepository")
811 .field("connections", &"<RedisConnections>")
812 .field("key_prefix", &self.key_prefix)
813 .finish()
814 }
815}
816
817#[async_trait]
818impl Repository<TransactionRepoModel, String> for RedisTransactionRepository {
819 async fn create(
820 &self,
821 entity: TransactionRepoModel,
822 ) -> Result<TransactionRepoModel, RepositoryError> {
823 if entity.id.is_empty() {
824 return Err(RepositoryError::InvalidData(
825 "Transaction ID cannot be empty".to_string(),
826 ));
827 }
828
829 let key = self.tx_key(&entity.relayer_id, &entity.id);
830 let reverse_key = self.tx_to_relayer_key(&entity.id);
831 let mut conn = self
832 .get_connection(self.connections.primary(), "create")
833 .await?;
834
835 debug!(tx_id = %entity.id, "creating transaction");
836
837 let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
838
839 let existing: Option<String> = conn
841 .get(&reverse_key)
842 .await
843 .map_err(|e| self.map_redis_error(e, "create_transaction_check"))?;
844
845 if existing.is_some() {
846 return Err(RepositoryError::ConstraintViolation(format!(
847 "Transaction with ID {} already exists",
848 entity.id
849 )));
850 }
851
852 let mut pipe = redis::pipe();
854 pipe.atomic();
855 pipe.set(&key, &value);
856 pipe.set(&reverse_key, &entity.relayer_id);
857
858 pipe.exec_async(&mut conn)
859 .await
860 .map_err(|e| self.map_redis_error(e, "create_transaction"))?;
861
862 if let Err(e) = self.update_indexes(&entity, None).await {
864 error!(tx_id = %entity.id, error = %e, "failed to update indexes for new transaction");
865 return Err(e);
866 }
867
868 let network_type = format!("{:?}", entity.network_type).to_lowercase();
870 let relayer_id = entity.relayer_id.as_str();
871 TRANSACTIONS_CREATED
872 .with_label_values(&[relayer_id, &network_type])
873 .inc();
874
875 let status = &entity.status;
877 let status_str = format!("{status:?}").to_lowercase();
878 TRANSACTIONS_BY_STATUS
879 .with_label_values(&[relayer_id, &network_type, &status_str])
880 .inc();
881
882 debug!(tx_id = %entity.id, "successfully created transaction");
883 Ok(entity)
884 }
885
886 async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
887 if id.is_empty() {
888 return Err(RepositoryError::InvalidData(
889 "Transaction ID cannot be empty".to_string(),
890 ));
891 }
892
893 let mut conn = self
894 .get_connection(self.connections.reader(), "get_by_id")
895 .await?;
896
897 debug!(tx_id = %id, "fetching transaction");
898
899 let reverse_key = self.tx_to_relayer_key(&id);
900 let relayer_id: Option<String> = conn
901 .get(&reverse_key)
902 .await
903 .map_err(|e| self.map_redis_error(e, "get_transaction_reverse_lookup"))?;
904
905 let relayer_id = match relayer_id {
906 Some(relayer_id) => relayer_id,
907 None => {
908 debug!(tx_id = %id, "transaction not found (no reverse lookup)");
909 return Err(RepositoryError::NotFound(format!(
910 "Transaction with ID {id} not found"
911 )));
912 }
913 };
914
915 let key = self.tx_key(&relayer_id, &id);
916 let value: Option<String> = conn
917 .get(&key)
918 .await
919 .map_err(|e| self.map_redis_error(e, "get_transaction_by_id"))?;
920
921 match value {
922 Some(json) => {
923 let tx =
924 self.deserialize_entity::<TransactionRepoModel>(&json, &id, "transaction")?;
925 debug!(tx_id = %id, "successfully fetched transaction");
926 Ok(tx)
927 }
928 None => {
929 debug!(tx_id = %id, "transaction not found");
930 Err(RepositoryError::NotFound(format!(
931 "Transaction with ID {id} not found"
932 )))
933 }
934 }
935 }
936
937 async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
939 let mut conn = self
940 .get_connection(self.connections.reader(), "list_all")
941 .await?;
942
943 debug!("fetching all transactions sorted by created_at (newest first)");
944
945 let relayer_list_key = self.relayer_list_key();
947 let relayer_ids: Vec<String> = conn
948 .smembers(&relayer_list_key)
949 .await
950 .map_err(|e| self.map_redis_error(e, "list_all_relayer_ids"))?;
951
952 debug!(count = %relayer_ids.len(), "found relayers");
953
954 let mut all_tx_ids = Vec::new();
956 for relayer_id in relayer_ids {
957 let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
958 let tx_ids: Vec<String> = redis::cmd("ZRANGE")
959 .arg(&relayer_sorted_key)
960 .arg(0)
961 .arg(-1)
962 .arg("REV")
963 .query_async(&mut conn)
964 .await
965 .map_err(|e| self.map_redis_error(e, "list_all_relayer_sorted"))?;
966
967 all_tx_ids.extend(tx_ids);
968 }
969
970 drop(conn);
972
973 let batch_result = self.get_transactions_by_ids(&all_tx_ids).await?;
975 let mut all_transactions = batch_result.results;
976
977 all_transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
979
980 debug!(count = %all_transactions.len(), "found transactions");
981 Ok(all_transactions)
982 }
983
984 async fn list_paginated(
986 &self,
987 query: PaginationQuery,
988 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
989 if query.per_page == 0 {
990 return Err(RepositoryError::InvalidData(
991 "per_page must be greater than 0".to_string(),
992 ));
993 }
994
995 let mut conn = self
996 .get_connection(self.connections.reader(), "list_paginated")
997 .await?;
998
999 debug!(page = %query.page, per_page = %query.per_page, "fetching paginated transactions sorted by created_at (newest first)");
1000
1001 let relayer_list_key = self.relayer_list_key();
1003 let relayer_ids: Vec<String> = conn
1004 .smembers(&relayer_list_key)
1005 .await
1006 .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_ids"))?;
1007
1008 let mut all_tx_ids = Vec::new();
1010 for relayer_id in relayer_ids {
1011 let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
1012 let tx_ids: Vec<String> = redis::cmd("ZRANGE")
1013 .arg(&relayer_sorted_key)
1014 .arg(0)
1015 .arg(-1)
1016 .arg("REV")
1017 .query_async(&mut conn)
1018 .await
1019 .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_sorted"))?;
1020
1021 all_tx_ids.extend(tx_ids);
1022 }
1023
1024 drop(conn);
1026
1027 let batch_result = self.get_transactions_by_ids(&all_tx_ids).await?;
1029 let mut all_transactions = batch_result.results;
1030
1031 all_transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
1033
1034 let total = all_transactions.len() as u64;
1035 let start = ((query.page - 1) * query.per_page) as usize;
1036 let end = (start + query.per_page as usize).min(all_transactions.len());
1037
1038 if start >= all_transactions.len() {
1039 debug!(page = %query.page, total = %total, "page is beyond available data");
1040 return Ok(PaginatedResult {
1041 items: vec![],
1042 total,
1043 page: query.page,
1044 per_page: query.per_page,
1045 });
1046 }
1047
1048 let items = all_transactions[start..end].to_vec();
1049
1050 debug!(count = %items.len(), page = %query.page, "successfully fetched transactions for page");
1051
1052 Ok(PaginatedResult {
1053 items,
1054 total,
1055 page: query.page,
1056 per_page: query.per_page,
1057 })
1058 }
1059
1060 async fn update(
1061 &self,
1062 id: String,
1063 entity: TransactionRepoModel,
1064 ) -> Result<TransactionRepoModel, RepositoryError> {
1065 if id.is_empty() {
1066 return Err(RepositoryError::InvalidData(
1067 "Transaction ID cannot be empty".to_string(),
1068 ));
1069 }
1070
1071 debug!(tx_id = %id, "updating transaction");
1072
1073 let old_tx = self.get_by_id(id.clone()).await?;
1075
1076 let key = self.tx_key(&entity.relayer_id, &id);
1077 let mut conn = self
1078 .get_connection(self.connections.primary(), "update")
1079 .await?;
1080
1081 let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
1082
1083 let _: () = conn
1085 .set(&key, value)
1086 .await
1087 .map_err(|e| self.map_redis_error(e, "update_transaction"))?;
1088
1089 self.update_indexes(&entity, Some(&old_tx)).await?;
1091
1092 debug!(tx_id = %id, "successfully updated transaction");
1093 Ok(entity)
1094 }
1095
1096 async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
1097 if id.is_empty() {
1098 return Err(RepositoryError::InvalidData(
1099 "Transaction ID cannot be empty".to_string(),
1100 ));
1101 }
1102
1103 debug!(tx_id = %id, "deleting transaction");
1104
1105 let tx = self.get_by_id(id.clone()).await?;
1107
1108 let key = self.tx_key(&tx.relayer_id, &id);
1109 let reverse_key = self.tx_to_relayer_key(&id);
1110 let mut conn = self
1111 .get_connection(self.connections.primary(), "delete_by_id")
1112 .await?;
1113
1114 let mut pipe = redis::pipe();
1115 pipe.atomic();
1116 pipe.del(&key);
1117 pipe.del(&reverse_key);
1118
1119 pipe.exec_async(&mut conn)
1120 .await
1121 .map_err(|e| self.map_redis_error(e, "delete_transaction"))?;
1122
1123 if let Err(e) = self.remove_all_indexes(&tx).await {
1125 error!(tx_id = %id, error = %e, "failed to remove indexes for deleted transaction");
1126 }
1127
1128 debug!(tx_id = %id, "successfully deleted transaction");
1129 Ok(())
1130 }
1131
1132 async fn count(&self) -> Result<usize, RepositoryError> {
1134 let mut conn = self
1135 .get_connection(self.connections.reader(), "count")
1136 .await?;
1137
1138 debug!("counting transactions");
1139
1140 let relayer_list_key = self.relayer_list_key();
1142 let relayer_ids: Vec<String> = conn
1143 .smembers(&relayer_list_key)
1144 .await
1145 .map_err(|e| self.map_redis_error(e, "count_relayer_ids"))?;
1146
1147 let mut total_count = 0usize;
1148 for relayer_id in relayer_ids {
1149 let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
1150 let count: usize = conn
1151 .zcard(&relayer_sorted_key)
1152 .await
1153 .map_err(|e| self.map_redis_error(e, "count_relayer_transactions"))?;
1154 total_count += count;
1155 }
1156
1157 debug!(count = %total_count, "transaction count");
1158 Ok(total_count)
1159 }
1160
1161 async fn has_entries(&self) -> Result<bool, RepositoryError> {
1162 let mut conn = self
1163 .get_connection(self.connections.reader(), "has_entries")
1164 .await?;
1165 let relayer_list_key = self.relayer_list_key();
1166
1167 debug!("checking if transaction entries exist");
1168
1169 let exists: bool = conn
1170 .exists(&relayer_list_key)
1171 .await
1172 .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;
1173
1174 debug!(exists = %exists, "transaction entries exist");
1175 Ok(exists)
1176 }
1177
1178 async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
1179 let mut conn = self
1180 .get_connection(self.connections.primary(), "drop_all_entries")
1181 .await?;
1182 let relayer_list_key = self.relayer_list_key();
1183
1184 debug!("dropping all transaction entries");
1185
1186 let relayer_ids: Vec<String> = conn
1188 .smembers(&relayer_list_key)
1189 .await
1190 .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_relayer_ids"))?;
1191
1192 if relayer_ids.is_empty() {
1193 debug!("no transaction entries to drop");
1194 return Ok(());
1195 }
1196
1197 let mut pipe = redis::pipe();
1199 pipe.atomic();
1200
1201 for relayer_id in &relayer_ids {
1203 let pattern = format!(
1205 "{}:{}:{}:{}:*",
1206 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
1207 );
1208 let mut cursor = 0;
1209 let mut tx_ids = Vec::new();
1210
1211 loop {
1212 let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
1213 .cursor_arg(cursor)
1214 .arg("MATCH")
1215 .arg(&pattern)
1216 .query_async(&mut conn)
1217 .await
1218 .map_err(|e| self.map_redis_error(e, "drop_all_entries_scan"))?;
1219
1220 for key in keys {
1222 pipe.del(&key);
1223 if let Some(tx_id) = key.split(':').next_back() {
1224 tx_ids.push(tx_id.to_string());
1225 }
1226 }
1227
1228 cursor = next_cursor;
1229 if cursor == 0 {
1230 break;
1231 }
1232 }
1233
1234 for tx_id in tx_ids {
1236 let reverse_key = self.tx_to_relayer_key(&tx_id);
1237 pipe.del(&reverse_key);
1238
1239 for status in &[
1242 TransactionStatus::Canceled,
1243 TransactionStatus::Pending,
1244 TransactionStatus::Sent,
1245 TransactionStatus::Submitted,
1246 TransactionStatus::Mined,
1247 TransactionStatus::Confirmed,
1248 TransactionStatus::Failed,
1249 TransactionStatus::Expired,
1250 ] {
1251 let status_sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1253 pipe.zrem(&status_sorted_key, &tx_id);
1254
1255 let status_key = self.relayer_status_key(relayer_id, status);
1257 pipe.srem(&status_key, &tx_id);
1258 }
1259 }
1260
1261 let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
1263 pipe.del(&relayer_sorted_key);
1264 }
1265
1266 pipe.del(&relayer_list_key);
1268
1269 pipe.exec_async(&mut conn)
1270 .await
1271 .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;
1272
1273 debug!(count = %relayer_ids.len(), "dropped all transaction entries for relayers");
1274 Ok(())
1275 }
1276}
1277
1278#[async_trait]
1279impl TransactionRepository for RedisTransactionRepository {
1280 async fn find_by_relayer_id(
1281 &self,
1282 relayer_id: &str,
1283 query: PaginationQuery,
1284 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
1285 let mut conn = self
1286 .get_connection(self.connections.reader(), "find_by_relayer_id")
1287 .await?;
1288
1289 debug!(relayer_id = %relayer_id, page = %query.page, per_page = %query.per_page, "fetching transactions for relayer sorted by created_at (newest first)");
1290
1291 let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
1292
1293 let sorted_set_count: u64 = conn
1295 .zcard(&relayer_sorted_key)
1296 .await
1297 .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_count"))?;
1298
1299 if sorted_set_count == 0 {
1302 debug!(relayer_id = %relayer_id, "no transactions found for relayer (sorted set is empty)");
1303 return Ok(PaginatedResult {
1304 items: vec![],
1305 total: 0,
1306 page: query.page,
1307 per_page: query.per_page,
1308 });
1309 }
1310
1311 let total = sorted_set_count;
1312
1313 let start = ((query.page - 1) * query.per_page) as isize;
1315 let end = start + query.per_page as isize - 1;
1316
1317 if start as u64 >= total {
1318 debug!(relayer_id = %relayer_id, page = %query.page, total = %total, "page is beyond available data");
1319 return Ok(PaginatedResult {
1320 items: vec![],
1321 total,
1322 page: query.page,
1323 per_page: query.per_page,
1324 });
1325 }
1326
1327 let page_ids: Vec<String> = redis::cmd("ZRANGE")
1329 .arg(&relayer_sorted_key)
1330 .arg(start)
1331 .arg(end)
1332 .arg("REV")
1333 .query_async(&mut conn)
1334 .await
1335 .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_sorted"))?;
1336
1337 drop(conn);
1339
1340 let items = self.get_transactions_by_ids(&page_ids).await?;
1341
1342 debug!(relayer_id = %relayer_id, count = %items.results.len(), page = %query.page, "successfully fetched transactions for relayer");
1343
1344 Ok(PaginatedResult {
1345 items: items.results,
1346 total,
1347 page: query.page,
1348 per_page: query.per_page,
1349 })
1350 }
1351
1352 async fn find_by_status(
1354 &self,
1355 relayer_id: &str,
1356 statuses: &[TransactionStatus],
1357 ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
1358 for status in statuses {
1360 self.ensure_status_sorted_set(relayer_id, status).await?;
1361 }
1362
1363 let mut conn = self
1365 .get_connection(self.connections.reader(), "find_by_status")
1366 .await?;
1367
1368 let mut all_ids: Vec<String> = Vec::new();
1369 for status in statuses {
1370 let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1372 let ids: Vec<String> = redis::cmd("ZRANGE")
1373 .arg(&sorted_key)
1374 .arg(0)
1375 .arg(-1)
1376 .arg("REV") .query_async(&mut conn)
1378 .await
1379 .map_err(|e| self.map_redis_error(e, "find_by_status"))?;
1380
1381 all_ids.extend(ids);
1382 }
1383
1384 drop(conn);
1386
1387 if all_ids.is_empty() {
1388 return Ok(vec![]);
1389 }
1390
1391 all_ids.sort();
1393 all_ids.dedup();
1394
1395 let mut transactions = self.get_transactions_by_ids(&all_ids).await?;
1397
1398 transactions
1400 .results
1401 .sort_by(|a, b| b.created_at.cmp(&a.created_at));
1402
1403 Ok(transactions.results)
1404 }
1405
1406 async fn find_by_status_paginated(
1407 &self,
1408 relayer_id: &str,
1409 statuses: &[TransactionStatus],
1410 query: PaginationQuery,
1411 oldest_first: bool,
1412 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
1413 for status in statuses {
1415 self.ensure_status_sorted_set(relayer_id, status).await?;
1416 }
1417
1418 let mut conn = self
1419 .get_connection(self.connections.reader(), "find_by_status_paginated")
1420 .await?;
1421
1422 if statuses.len() == 1 {
1424 let sorted_key = self.relayer_status_sorted_key(relayer_id, &statuses[0]);
1425
1426 let total: u64 = conn
1428 .zcard(&sorted_key)
1429 .await
1430 .map_err(|e| self.map_redis_error(e, "find_by_status_paginated_count"))?;
1431
1432 if total == 0 {
1433 return Ok(PaginatedResult {
1434 items: vec![],
1435 total: 0,
1436 page: query.page,
1437 per_page: query.per_page,
1438 });
1439 }
1440
1441 let start = ((query.page.saturating_sub(1)) * query.per_page) as isize;
1443 let end = start + query.per_page as isize - 1;
1444
1445 let mut cmd = redis::cmd("ZRANGE");
1448 cmd.arg(&sorted_key).arg(start).arg(end);
1449 if !oldest_first {
1450 cmd.arg("REV");
1451 }
1452 let page_ids: Vec<String> = cmd
1453 .query_async(&mut conn)
1454 .await
1455 .map_err(|e| self.map_redis_error(e, "find_by_status_paginated"))?;
1456
1457 drop(conn);
1459
1460 let transactions = self.get_transactions_by_ids(&page_ids).await?;
1461
1462 debug!(
1463 relayer_id = %relayer_id,
1464 status = %statuses[0],
1465 total = %total,
1466 page = %query.page,
1467 page_size = %transactions.results.len(),
1468 "fetched paginated transactions by single status"
1469 );
1470
1471 return Ok(PaginatedResult {
1472 items: transactions.results,
1473 total,
1474 page: query.page,
1475 per_page: query.per_page,
1476 });
1477 }
1478
1479 let mut all_ids: Vec<(String, f64)> = Vec::new();
1481 for status in statuses {
1482 let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1483
1484 let ids_with_scores: Vec<(String, f64)> = redis::cmd("ZRANGE")
1486 .arg(&sorted_key)
1487 .arg(0)
1488 .arg(-1)
1489 .arg("WITHSCORES")
1490 .query_async(&mut conn)
1491 .await
1492 .map_err(|e| self.map_redis_error(e, "find_by_status_paginated_multi"))?;
1493
1494 all_ids.extend(ids_with_scores);
1495 }
1496
1497 drop(conn);
1499
1500 let mut id_map: std::collections::HashMap<String, f64> = std::collections::HashMap::new();
1502 for (id, score) in all_ids {
1503 id_map
1504 .entry(id)
1505 .and_modify(|s| {
1506 if oldest_first {
1508 if score < *s {
1509 *s = score
1510 }
1511 } else if score > *s {
1512 *s = score
1513 }
1514 })
1515 .or_insert(score);
1516 }
1517
1518 let mut sorted_ids: Vec<(String, f64)> = id_map.into_iter().collect();
1520 if oldest_first {
1521 sorted_ids.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
1522 } else {
1523 sorted_ids.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1524 }
1525
1526 let total = sorted_ids.len() as u64;
1527
1528 if total == 0 {
1529 return Ok(PaginatedResult {
1530 items: vec![],
1531 total: 0,
1532 page: query.page,
1533 per_page: query.per_page,
1534 });
1535 }
1536
1537 let start = ((query.page.saturating_sub(1)) * query.per_page) as usize;
1539 let page_ids: Vec<String> = sorted_ids
1540 .into_iter()
1541 .skip(start)
1542 .take(query.per_page as usize)
1543 .map(|(id, _)| id)
1544 .collect();
1545
1546 let transactions = self.get_transactions_by_ids(&page_ids).await?;
1548
1549 debug!(
1550 relayer_id = %relayer_id,
1551 total = %total,
1552 page = %query.page,
1553 page_size = %transactions.results.len(),
1554 "fetched paginated transactions by status"
1555 );
1556
1557 Ok(PaginatedResult {
1558 items: transactions.results,
1559 total,
1560 page: query.page,
1561 per_page: query.per_page,
1562 })
1563 }
1564
1565 async fn find_by_nonce(
1566 &self,
1567 relayer_id: &str,
1568 nonce: u64,
1569 ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
1570 let mut conn = self
1571 .get_connection(self.connections.reader(), "find_by_nonce")
1572 .await?;
1573 let nonce_key = self.relayer_nonce_key(relayer_id, nonce);
1574
1575 let tx_id: Option<String> = conn
1577 .get(nonce_key)
1578 .await
1579 .map_err(|e| self.map_redis_error(e, "find_by_nonce"))?;
1580
1581 match tx_id {
1582 Some(tx_id) => {
1583 match self.get_by_id(tx_id.clone()).await {
1584 Ok(tx) => Ok(Some(tx)),
1585 Err(RepositoryError::NotFound(_)) => {
1586 warn!(relayer_id = %relayer_id, nonce = %nonce, "stale nonce index found for relayer");
1588 Ok(None)
1589 }
1590 Err(e) => Err(e),
1591 }
1592 }
1593 None => Ok(None),
1594 }
1595 }
1596
1597 async fn update_status(
1598 &self,
1599 tx_id: String,
1600 status: TransactionStatus,
1601 ) -> Result<TransactionRepoModel, RepositoryError> {
1602 let update = TransactionUpdateRequest {
1603 status: Some(status),
1604 ..Default::default()
1605 };
1606 self.partial_update(tx_id, update).await
1607 }
1608
    /// Applies `update` to the stored transaction as a JSON patch inside one
    /// Lua script, then refreshes the indexes and status metrics.
    ///
    /// Script behavior, as written below:
    /// * resolves the relayer id from `KEYS[1]`, builds the transaction key as
    ///   `ARGV[1] .. relayer_id .. ARGV[2]`, and returns `false` if either the
    ///   relayer id or the transaction is missing;
    /// * if the stored status is final and the patch carries a status, returns
    ///   the stored document unchanged as both "old" and "new";
    /// * otherwise copies every top-level patch key onto the document, sets
    ///   `delete_at` from `ARGV[4]` when that argument is non-empty and
    ///   `delete_at` is not already set, and writes the result back.
    ///
    /// # Errors
    /// * `InvalidData` if the patch cannot be serialized.
    /// * `NotFound` if the script returns no result.
    /// * `UnexpectedError` if the script returns anything but two elements.
    /// * Any error from running the script, deserializing, or updating indexes.
    async fn partial_update(
        &self,
        tx_id: String,
        update: TransactionUpdateRequest,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        let patch_json = serde_json::to_string(&update).map_err(|e| {
            RepositoryError::InvalidData(format!("Failed to serialize update patch: {e}"))
        })?;

        // Only a patch that moves the transaction to a final status carries an
        // expiry timestamp; it is passed as an empty string otherwise.
        let delete_at_value = if let Some(ref status) = update.status {
            if FINAL_TRANSACTION_STATUSES.contains(status) {
                let expiration_hours = ServerConfig::get_transaction_expiration_hours();
                let seconds = (expiration_hours * 3600.0) as i64;
                let delete_time = Utc::now() + chrono::Duration::seconds(seconds);
                Some(delete_time.to_rfc3339())
            } else {
                None
            }
        } else {
            None
        };
        let delete_at_arg = delete_at_value.as_deref().unwrap_or("");

        // NOTE(review): presumably the reverse-lookup key plus the text before
        // and after the relayer id in the transaction key — confirm against
        // `tx_key_parts`.
        let (lookup_key, key_prefix, key_suffix) = self.tx_key_parts(&tx_id);

        let patch_script = Script::new(
            r#"
            local relayer_id = redis.call('GET', KEYS[1])
            if not relayer_id then return false end

            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
            local current = redis.call('GET', tx_key)
            if not current then return false end

            local tx = cjson.decode(current)
            local patch = cjson.decode(ARGV[3])

            -- Guard: reject status changes on finalized transactions.
            -- A stale worker must not resurrect a tx that another worker
            -- already moved to a terminal state.
            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
            if final_states[tx["status"]] and patch["status"] then
                return {current, current}
            end

            local old_snapshot = current

            -- lua-cjson cannot distinguish empty Lua tables from empty
            -- arrays, so a decode/encode round-trip turns [] into {}.
            -- Record which keys held [] in the stored doc and the patch
            -- so we can restore them after cjson.encode.
            -- NOTE: this relies on each array-typed field having a unique key
            -- name across the entire JSON document (including nested objects).
            -- If the model ever introduces duplicate key names at different
            -- nesting levels (e.g. metadata.hashes), the gsub below could
            -- restore the wrong occurrence.
            local empty_arrs = {}
            for k in string.gmatch(current, '"([^"]+)"%s*:%s*%[%s*%]') do
                empty_arrs[k] = true
            end
            for k in string.gmatch(ARGV[3], '"([^"]+)"%s*:%s*%[%s*%]') do
                empty_arrs[k] = true
            end

            for k, v in pairs(patch) do
                tx[k] = v
            end

            -- Apply delete_at if transitioning to a final state and not already set
            if ARGV[4] ~= '' and (not tx["delete_at"] or tx["delete_at"] == cjson.null) then
                tx["delete_at"] = ARGV[4]
            end

            local updated = cjson.encode(tx)

            -- Restore empty arrays that cjson.encode converted to {}
            for k, _ in pairs(empty_arrs) do
                updated = string.gsub(
                    updated, '"'..k..'"%s*:%s*{}', '"'..k..'":[]', 1
                )
            end

            redis.call('SET', tx_key, updated)
            return {old_snapshot, updated}
            "#,
        );

        // The extra arguments are read by the script as ARGV[3] (patch JSON)
        // and ARGV[4] (delete_at, or empty).
        let result: Option<Vec<String>> = self
            .run_script_with_retry_vec(
                &patch_script,
                &lookup_key,
                &key_prefix,
                &key_suffix,
                &[&patch_json, delete_at_arg],
                "partial_update",
            )
            .await?;

        let parts = result.ok_or_else(|| {
            RepositoryError::NotFound(format!("Transaction with ID {tx_id} not found"))
        })?;

        if parts.len() != 2 {
            return Err(RepositoryError::UnexpectedError(format!(
                "partial_update script returned {} elements, expected 2",
                parts.len()
            )));
        }

        // The script returns the document before and after the patch.
        let old_json = &parts[0];
        let new_json = &parts[1];

        let original_tx =
            self.deserialize_entity::<TransactionRepoModel>(old_json, &tx_id, "transaction")?;
        let updated_tx =
            self.deserialize_entity::<TransactionRepoModel>(new_json, &tx_id, "transaction")?;

        self.update_indexes(&updated_tx, Some(&original_tx)).await?;

        debug!(tx_id = %tx_id, "successfully updated transaction via patch");

        // Metrics are recorded only when the status actually changed.
        if original_tx.status != updated_tx.status {
            self.track_status_change_metrics(
                &original_tx,
                &updated_tx,
                &original_tx.status,
                &updated_tx.status,
            );
        }

        Ok(updated_tx)
    }
1753
1754 async fn update_network_data(
1755 &self,
1756 tx_id: String,
1757 network_data: NetworkTransactionData,
1758 ) -> Result<TransactionRepoModel, RepositoryError> {
1759 let update = TransactionUpdateRequest {
1760 network_data: Some(network_data),
1761 ..Default::default()
1762 };
1763 self.partial_update(tx_id, update).await
1764 }
1765
1766 async fn set_sent_at(
1767 &self,
1768 tx_id: String,
1769 sent_at: String,
1770 ) -> Result<TransactionRepoModel, RepositoryError> {
1771 let update = TransactionUpdateRequest {
1772 sent_at: Some(sent_at),
1773 ..Default::default()
1774 };
1775 self.partial_update(tx_id, update).await
1776 }
1777
    /// Increments `metadata.consecutive_failures` and `metadata.total_failures`
    /// of a transaction inside one Lua script.
    ///
    /// The script treats a missing counter as `0`, replaces a non-table
    /// `metadata` with a fresh table, and rewrites only the `metadata` member
    /// of the stored JSON text. A transaction already in a final state is
    /// returned unchanged. The script returns `false` when the relayer lookup
    /// or the transaction is missing.
    ///
    /// NOTE(review): how `run_atomic_script` maps a `false` result (presumably
    /// to `NotFound`) is not visible here — confirm.
    async fn increment_status_check_failures(
        &self,
        tx_id: String,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        self.run_atomic_script(
            r#"
            local function set_obj(json, key, tbl)
                local enc = cjson.encode(tbl)
                local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1)
                if n > 0 then return r end
                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
                if n > 0 then return r end
                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
            end

            local relayer_id = redis.call('GET', KEYS[1])
            if not relayer_id then return false end

            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
            local current = redis.call('GET', tx_key)
            if not current then return false end

            local tx = cjson.decode(current)
            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
            if final_states[tx["status"]] then return current end

            local metadata = tx["metadata"]
            if type(metadata) ~= 'table' then metadata = {} end
            metadata["consecutive_failures"] = (metadata["consecutive_failures"] or 0) + 1
            metadata["total_failures"] = (metadata["total_failures"] or 0) + 1

            local updated = set_obj(current, "metadata", metadata)
            redis.call('SET', tx_key, updated)
            return updated
            "#,
            &tx_id,
            &[],
            "increment_status_check_failures",
        )
        .await
    }
1819
    /// Resets `metadata.consecutive_failures` of a transaction to `0` inside
    /// one Lua script.
    ///
    /// The script replaces a non-table `metadata` with a fresh table and
    /// rewrites only the `metadata` member of the stored JSON text; other
    /// metadata fields are carried over as decoded. A transaction already in a
    /// final state is returned unchanged. The script returns `false` when the
    /// relayer lookup or the transaction is missing.
    ///
    /// NOTE(review): how `run_atomic_script` maps a `false` result (presumably
    /// to `NotFound`) is not visible here — confirm.
    async fn reset_status_check_consecutive_failures(
        &self,
        tx_id: String,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        self.run_atomic_script(
            r#"
            local function set_obj(json, key, tbl)
                local enc = cjson.encode(tbl)
                local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1)
                if n > 0 then return r end
                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
                if n > 0 then return r end
                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
            end

            local relayer_id = redis.call('GET', KEYS[1])
            if not relayer_id then return false end

            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
            local current = redis.call('GET', tx_key)
            if not current then return false end

            local tx = cjson.decode(current)
            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
            if final_states[tx["status"]] then return current end

            local metadata = tx["metadata"]
            if type(metadata) ~= 'table' then metadata = {} end
            metadata["consecutive_failures"] = 0

            local updated = set_obj(current, "metadata", metadata)
            redis.call('SET', tx_key, updated)
            return updated
            "#,
            &tx_id,
            &[],
            "reset_status_check_consecutive_failures",
        )
        .await
    }
1860
    /// Records one insufficient-fee retry for a transaction inside one Lua
    /// script: increments `metadata.insufficient_fee_retries` (a missing
    /// counter counts as `0`) and sets `sent_at` to the supplied value.
    ///
    /// `sent_at` reaches the script as `ARGV[3]`. The script rewrites only the
    /// `sent_at` and `metadata` members of the stored JSON text. A transaction
    /// already in a final state is returned unchanged. The script returns
    /// `false` when the relayer lookup or the transaction is missing.
    ///
    /// NOTE(review): how `run_atomic_script` maps a `false` result (presumably
    /// to `NotFound`) is not visible here — confirm.
    async fn record_stellar_insufficient_fee_retry(
        &self,
        tx_id: String,
        sent_at: String,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        self.run_atomic_script(
            r#"
            local function set_str(json, key, val)
                local enc = cjson.encode(val)
                local r, n = string.gsub(json, '"'..key..'"%s*:%s*"[^"]*"', '"'..key..'":'..enc, 1)
                if n > 0 then return r end
                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
                if n > 0 then return r end
                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
            end
            local function set_obj(json, key, tbl)
                local enc = cjson.encode(tbl)
                local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1)
                if n > 0 then return r end
                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
                if n > 0 then return r end
                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
            end

            local relayer_id = redis.call('GET', KEYS[1])
            if not relayer_id then return false end

            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
            local current = redis.call('GET', tx_key)
            if not current then return false end

            local tx = cjson.decode(current)
            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
            if final_states[tx["status"]] then return current end

            local metadata = tx["metadata"]
            if type(metadata) ~= 'table' then metadata = {} end
            metadata["insufficient_fee_retries"] = (metadata["insufficient_fee_retries"] or 0) + 1

            local updated = set_str(current, "sent_at", ARGV[3])
            updated = set_obj(updated, "metadata", metadata)
            redis.call('SET', tx_key, updated)
            return updated
            "#,
            &tx_id,
            &[&sent_at],
            "record_stellar_insufficient_fee_retry",
        )
        .await
    }
1911
    /// Records one try-again-later retry for a transaction inside one Lua
    /// script: increments `metadata.try_again_later_retries` (a missing counter
    /// counts as `0`) and sets `sent_at` to the supplied value.
    ///
    /// `sent_at` reaches the script as `ARGV[3]`. The script rewrites only the
    /// `sent_at` and `metadata` members of the stored JSON text. A transaction
    /// already in a final state is returned unchanged. The script returns
    /// `false` when the relayer lookup or the transaction is missing.
    ///
    /// NOTE(review): how `run_atomic_script` maps a `false` result (presumably
    /// to `NotFound`) is not visible here — confirm.
    async fn record_stellar_try_again_later_retry(
        &self,
        tx_id: String,
        sent_at: String,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        self.run_atomic_script(
            r#"
            local function set_str(json, key, val)
                local enc = cjson.encode(val)
                local r, n = string.gsub(json, '"'..key..'"%s*:%s*"[^"]*"', '"'..key..'":'..enc, 1)
                if n > 0 then return r end
                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
                if n > 0 then return r end
                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
            end
            local function set_obj(json, key, tbl)
                local enc = cjson.encode(tbl)
                local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1)
                if n > 0 then return r end
                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
                if n > 0 then return r end
                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
            end

            local relayer_id = redis.call('GET', KEYS[1])
            if not relayer_id then return false end

            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
            local current = redis.call('GET', tx_key)
            if not current then return false end

            local tx = cjson.decode(current)
            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
            if final_states[tx["status"]] then return current end

            local metadata = tx["metadata"]
            if type(metadata) ~= 'table' then metadata = {} end
            metadata["try_again_later_retries"] = (metadata["try_again_later_retries"] or 0) + 1

            local updated = set_str(current, "sent_at", ARGV[3])
            updated = set_obj(updated, "metadata", metadata)
            redis.call('SET', tx_key, updated)
            return updated
            "#,
            &tx_id,
            &[&sent_at],
            "record_stellar_try_again_later_retry",
        )
        .await
    }
1962
1963 async fn set_confirmed_at(
1964 &self,
1965 tx_id: String,
1966 confirmed_at: String,
1967 ) -> Result<TransactionRepoModel, RepositoryError> {
1968 let update = TransactionUpdateRequest {
1969 confirmed_at: Some(confirmed_at),
1970 ..Default::default()
1971 };
1972 self.partial_update(tx_id, update).await
1973 }
1974
1975 async fn count_by_status(
1979 &self,
1980 relayer_id: &str,
1981 statuses: &[TransactionStatus],
1982 ) -> Result<u64, RepositoryError> {
1983 let mut conn = self
1984 .get_connection(self.connections.reader(), "count_by_status")
1985 .await?;
1986 let mut total_count: u64 = 0;
1987
1988 for status in statuses {
1989 self.ensure_status_sorted_set(relayer_id, status).await?;
1991
1992 let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1993 let count: u64 = conn
1994 .zcard(&sorted_key)
1995 .await
1996 .map_err(|e| self.map_redis_error(e, "count_by_status"))?;
1997 total_count += count;
1998 }
1999
2000 debug!(relayer_id = %relayer_id, count = %total_count, "counted transactions by status");
2001 Ok(total_count)
2002 }
2003
2004 async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError> {
2005 if ids.is_empty() {
2006 debug!("no transaction IDs provided for batch delete");
2007 return Ok(BatchDeleteResult::default());
2008 }
2009
2010 debug!(count = %ids.len(), "batch deleting transactions by IDs (with fetch)");
2011
2012 let batch_result = self.get_transactions_by_ids(&ids).await?;
2014
2015 let requests: Vec<TransactionDeleteRequest> = batch_result
2017 .results
2018 .iter()
2019 .map(|tx| TransactionDeleteRequest {
2020 id: tx.id.clone(),
2021 relayer_id: tx.relayer_id.clone(),
2022 nonce: self.extract_nonce(&tx.network_data),
2023 })
2024 .collect();
2025
2026 let mut result = self.delete_by_requests(requests).await?;
2028
2029 for id in batch_result.failed_ids {
2031 result
2032 .failed
2033 .push((id.clone(), format!("Transaction with ID {id} not found")));
2034 }
2035
2036 Ok(result)
2037 }
2038
2039 async fn delete_by_requests(
2040 &self,
2041 requests: Vec<TransactionDeleteRequest>,
2042 ) -> Result<BatchDeleteResult, RepositoryError> {
2043 if requests.is_empty() {
2044 debug!("no delete requests provided for batch delete");
2045 return Ok(BatchDeleteResult::default());
2046 }
2047
2048 debug!(count = %requests.len(), "batch deleting transactions by requests (no fetch)");
2049 let mut conn = self
2050 .get_connection(self.connections.primary(), "batch_delete_no_fetch")
2051 .await?;
2052 let mut pipe = redis::pipe();
2053 pipe.atomic();
2054
2055 let all_statuses = [
2057 TransactionStatus::Canceled,
2058 TransactionStatus::Pending,
2059 TransactionStatus::Sent,
2060 TransactionStatus::Submitted,
2061 TransactionStatus::Mined,
2062 TransactionStatus::Confirmed,
2063 TransactionStatus::Failed,
2064 TransactionStatus::Expired,
2065 ];
2066
2067 for req in &requests {
2069 let tx_key = self.tx_key(&req.relayer_id, &req.id);
2071 pipe.del(&tx_key);
2072
2073 let reverse_key = self.tx_to_relayer_key(&req.id);
2075 pipe.del(&reverse_key);
2076
2077 for status in &all_statuses {
2079 let status_sorted_key = self.relayer_status_sorted_key(&req.relayer_id, status);
2080 pipe.zrem(&status_sorted_key, &req.id);
2081
2082 let status_legacy_key = self.relayer_status_key(&req.relayer_id, status);
2083 pipe.srem(&status_legacy_key, &req.id);
2084 }
2085
2086 if let Some(nonce) = req.nonce {
2088 let nonce_key = self.relayer_nonce_key(&req.relayer_id, nonce);
2089 pipe.del(&nonce_key);
2090 }
2091
2092 let relayer_sorted_key = self.relayer_tx_by_created_at_key(&req.relayer_id);
2094 pipe.zrem(&relayer_sorted_key, &req.id);
2095 }
2096
2097 match pipe.exec_async(&mut conn).await {
2099 Ok(_) => {
2100 let deleted_count = requests.len();
2101 debug!(
2102 deleted_count = %deleted_count,
2103 "batch delete completed"
2104 );
2105 Ok(BatchDeleteResult {
2106 deleted_count,
2107 failed: vec![],
2108 })
2109 }
2110 Err(e) => {
2111 error!(error = %e, "batch delete pipeline failed");
2112 let failed: Vec<(String, String)> = requests
2114 .iter()
2115 .map(|req| (req.id.clone(), format!("Redis pipeline error: {e}")))
2116 .collect();
2117 Ok(BatchDeleteResult {
2118 deleted_count: 0,
2119 failed,
2120 })
2121 }
2122 }
2123 }
2124}
2125
2126#[cfg(test)]
2127mod tests {
2128 use super::*;
2129 use crate::models::{evm::Speed, EvmTransactionData, NetworkType};
2130 use alloy::primitives::U256;
2131 use deadpool_redis::{Config, Runtime};
2132 use lazy_static::lazy_static;
2133 use std::str::FromStr;
2134 use tokio;
2135 use uuid::Uuid;
2136
2137 use tokio::sync::Mutex;
2138
    lazy_static! {
        // Process-wide async mutex shared by the tests in this module.
        // NOTE(review): presumably serializes tests that mutate environment
        // variables; no use is visible in this part of the file — confirm.
        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
    }
2143
2144 fn create_test_transaction(id: &str) -> TransactionRepoModel {
2146 TransactionRepoModel {
2147 id: id.to_string(),
2148 relayer_id: "relayer-1".to_string(),
2149 status: TransactionStatus::Pending,
2150 status_reason: None,
2151 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
2152 sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
2153 confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
2154 valid_until: None,
2155 delete_at: None,
2156 network_type: NetworkType::Evm,
2157 priced_at: None,
2158 hashes: vec![],
2159 network_data: NetworkTransactionData::Evm(EvmTransactionData {
2160 gas_price: Some(1000000000),
2161 gas_limit: Some(21000),
2162 nonce: Some(1),
2163 value: U256::from_str("1000000000000000000").unwrap(),
2164 data: Some("0x".to_string()),
2165 from: "0xSender".to_string(),
2166 to: Some("0xRecipient".to_string()),
2167 chain_id: 1,
2168 signature: None,
2169 hash: Some(format!("0x{id}")),
2170 speed: Some(Speed::Fast),
2171 max_fee_per_gas: None,
2172 max_priority_fee_per_gas: None,
2173 raw: None,
2174 }),
2175 noop_count: None,
2176 is_canceled: Some(false),
2177 metadata: None,
2178 }
2179 }
2180
2181 fn create_test_transaction_with_relayer(id: &str, relayer_id: &str) -> TransactionRepoModel {
2182 let mut tx = create_test_transaction(id);
2183 tx.relayer_id = relayer_id.to_string();
2184 tx
2185 }
2186
2187 fn create_test_transaction_with_status(
2188 id: &str,
2189 relayer_id: &str,
2190 status: TransactionStatus,
2191 ) -> TransactionRepoModel {
2192 let mut tx = create_test_transaction_with_relayer(id, relayer_id);
2193 tx.status = status;
2194 tx
2195 }
2196
2197 fn create_test_transaction_with_nonce(
2198 id: &str,
2199 nonce: u64,
2200 relayer_id: &str,
2201 ) -> TransactionRepoModel {
2202 let mut tx = create_test_transaction_with_relayer(id, relayer_id);
2203 if let NetworkTransactionData::Evm(ref mut evm_data) = tx.network_data {
2204 evm_data.nonce = Some(nonce);
2205 }
2206 tx
2207 }
2208
2209 async fn setup_test_repo() -> RedisTransactionRepository {
2210 let redis_url = std::env::var("REDIS_TEST_URL")
2212 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
2213
2214 let cfg = Config::from_url(&redis_url);
2215 let pool = Arc::new(
2216 cfg.builder()
2217 .expect("Failed to create pool builder")
2218 .max_size(16)
2219 .runtime(Runtime::Tokio1)
2220 .build()
2221 .expect("Failed to build Redis pool"),
2222 );
2223
2224 let connections = Arc::new(RedisConnections::new_single_pool(pool));
2226
2227 let random_id = Uuid::new_v4().to_string();
2228 let key_prefix = format!("test_prefix:{random_id}");
2229
2230 RedisTransactionRepository::new(connections, key_prefix)
2231 .expect("Failed to create RedisTransactionRepository")
2232 }
2233
2234 #[tokio::test]
2235 #[ignore = "Requires active Redis instance"]
2236 async fn test_new_repository_creation() {
2237 let repo = setup_test_repo().await;
2238 assert!(repo.key_prefix.contains("test_prefix"));
2239 }
2240
2241 #[tokio::test]
2242 #[ignore = "Requires active Redis instance"]
2243 async fn test_new_repository_empty_prefix_fails() {
2244 let redis_url = std::env::var("REDIS_TEST_URL")
2245 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
2246 let cfg = Config::from_url(&redis_url);
2247 let pool = Arc::new(
2248 cfg.builder()
2249 .expect("Failed to create pool builder")
2250 .max_size(16)
2251 .runtime(Runtime::Tokio1)
2252 .build()
2253 .expect("Failed to build Redis pool"),
2254 );
2255 let connections = Arc::new(RedisConnections::new_single_pool(pool));
2256
2257 let result = RedisTransactionRepository::new(connections, "".to_string());
2258 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
2259 }
2260
2261 #[tokio::test]
2262 #[ignore = "Requires active Redis instance"]
2263 async fn test_key_generation() {
2264 let repo = setup_test_repo().await;
2265
2266 assert!(repo
2267 .tx_key("relayer-1", "test-id")
2268 .contains(":relayer:relayer-1:tx:test-id"));
2269 assert!(repo
2270 .tx_to_relayer_key("test-id")
2271 .contains(":relayer:tx_to_relayer:test-id"));
2272 assert!(repo.relayer_list_key().contains(":relayer_list"));
2273 assert!(repo
2274 .relayer_status_key("relayer-1", &TransactionStatus::Pending)
2275 .contains(":relayer:relayer-1:status:Pending"));
2276 assert!(repo
2277 .relayer_nonce_key("relayer-1", 42)
2278 .contains(":relayer:relayer-1:nonce:42"));
2279 }
2280
2281 #[tokio::test]
2282 #[ignore = "Requires active Redis instance"]
2283 async fn test_serialize_deserialize_transaction() {
2284 let repo = setup_test_repo().await;
2285 let tx = create_test_transaction("test-1");
2286
2287 let serialized = repo
2288 .serialize_entity(&tx, |t| &t.id, "transaction")
2289 .expect("Serialization should succeed");
2290 let deserialized: TransactionRepoModel = repo
2291 .deserialize_entity(&serialized, "test-1", "transaction")
2292 .expect("Deserialization should succeed");
2293
2294 assert_eq!(tx.id, deserialized.id);
2295 assert_eq!(tx.relayer_id, deserialized.relayer_id);
2296 assert_eq!(tx.status, deserialized.status);
2297 }
2298
2299 #[tokio::test]
2300 #[ignore = "Requires active Redis instance"]
2301 async fn test_extract_nonce() {
2302 let repo = setup_test_repo().await;
2303 let random_id = Uuid::new_v4().to_string();
2304 let relayer_id = Uuid::new_v4().to_string();
2305 let tx_with_nonce = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
2306
2307 let nonce = repo.extract_nonce(&tx_with_nonce.network_data);
2308 assert_eq!(nonce, Some(42));
2309 }
2310
2311 #[tokio::test]
2312 #[ignore = "Requires active Redis instance"]
2313 async fn test_create_transaction() {
2314 let repo = setup_test_repo().await;
2315 let random_id = Uuid::new_v4().to_string();
2316 let tx = create_test_transaction(&random_id);
2317
2318 let result = repo.create(tx.clone()).await.unwrap();
2319 assert_eq!(result.id, tx.id);
2320 }
2321
2322 #[tokio::test]
2323 #[ignore = "Requires active Redis instance"]
2324 async fn test_get_transaction() {
2325 let repo = setup_test_repo().await;
2326 let random_id = Uuid::new_v4().to_string();
2327 let tx = create_test_transaction(&random_id);
2328
2329 repo.create(tx.clone()).await.unwrap();
2330 let stored = repo.get_by_id(random_id.to_string()).await.unwrap();
2331 assert_eq!(stored.id, tx.id);
2332 assert_eq!(stored.relayer_id, tx.relayer_id);
2333 }
2334
2335 #[tokio::test]
2336 #[ignore = "Requires active Redis instance"]
2337 async fn test_update_transaction() {
2338 let repo = setup_test_repo().await;
2339 let random_id = Uuid::new_v4().to_string();
2340 let mut tx = create_test_transaction(&random_id);
2341
2342 repo.create(tx.clone()).await.unwrap();
2343 tx.status = TransactionStatus::Confirmed;
2344
2345 let updated = repo.update(random_id.to_string(), tx).await.unwrap();
2346 assert!(matches!(updated.status, TransactionStatus::Confirmed));
2347 }
2348
2349 #[tokio::test]
2350 #[ignore = "Requires active Redis instance"]
2351 async fn test_delete_transaction() {
2352 let repo = setup_test_repo().await;
2353 let random_id = Uuid::new_v4().to_string();
2354 let tx = create_test_transaction(&random_id);
2355
2356 repo.create(tx).await.unwrap();
2357 repo.delete_by_id(random_id.to_string()).await.unwrap();
2358
2359 let result = repo.get_by_id(random_id.to_string()).await;
2360 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
2361 }
2362
2363 #[tokio::test]
2364 #[ignore = "Requires active Redis instance"]
2365 async fn test_list_all_transactions() {
2366 let repo = setup_test_repo().await;
2367 let random_id = Uuid::new_v4().to_string();
2368 let random_id2 = Uuid::new_v4().to_string();
2369
2370 let tx1 = create_test_transaction(&random_id);
2371 let tx2 = create_test_transaction(&random_id2);
2372
2373 repo.create(tx1).await.unwrap();
2374 repo.create(tx2).await.unwrap();
2375
2376 let transactions = repo.list_all().await.unwrap();
2377 assert!(transactions.len() >= 2);
2378 }
2379
2380 #[tokio::test]
2381 #[ignore = "Requires active Redis instance"]
2382 async fn test_count_transactions() {
2383 let repo = setup_test_repo().await;
2384 let random_id = Uuid::new_v4().to_string();
2385 let tx = create_test_transaction(&random_id);
2386
2387 let count = repo.count().await.unwrap();
2388 repo.create(tx).await.unwrap();
2389 assert!(repo.count().await.unwrap() > count);
2390 }
2391
2392 #[tokio::test]
2393 #[ignore = "Requires active Redis instance"]
2394 async fn test_get_nonexistent_transaction() {
2395 let repo = setup_test_repo().await;
2396 let result = repo.get_by_id("nonexistent".to_string()).await;
2397 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
2398 }
2399
2400 #[tokio::test]
2401 #[ignore = "Requires active Redis instance"]
2402 async fn test_duplicate_transaction_creation() {
2403 let repo = setup_test_repo().await;
2404 let random_id = Uuid::new_v4().to_string();
2405
2406 let tx = create_test_transaction(&random_id);
2407
2408 repo.create(tx.clone()).await.unwrap();
2409 let result = repo.create(tx).await;
2410
2411 assert!(matches!(
2412 result,
2413 Err(RepositoryError::ConstraintViolation(_))
2414 ));
2415 }
2416
2417 #[tokio::test]
2418 #[ignore = "Requires active Redis instance"]
2419 async fn test_update_nonexistent_transaction() {
2420 let repo = setup_test_repo().await;
2421 let tx = create_test_transaction("test-1");
2422
2423 let result = repo.update("nonexistent".to_string(), tx).await;
2424 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
2425 }
2426
2427 #[tokio::test]
2428 #[ignore = "Requires active Redis instance"]
2429 async fn test_list_paginated() {
2430 let repo = setup_test_repo().await;
2431
2432 for _ in 1..=10 {
2434 let random_id = Uuid::new_v4().to_string();
2435 let tx = create_test_transaction(&random_id);
2436 repo.create(tx).await.unwrap();
2437 }
2438
2439 let query = PaginationQuery {
2441 page: 1,
2442 per_page: 3,
2443 };
2444 let result = repo.list_paginated(query).await.unwrap();
2445 assert_eq!(result.items.len(), 3);
2446 assert!(result.total >= 10);
2447 assert_eq!(result.page, 1);
2448 assert_eq!(result.per_page, 3);
2449
2450 let query = PaginationQuery {
2452 page: 1000,
2453 per_page: 3,
2454 };
2455 let result = repo.list_paginated(query).await.unwrap();
2456 assert_eq!(result.items.len(), 0);
2457 }
2458
2459 #[tokio::test]
2460 #[ignore = "Requires active Redis instance"]
2461 async fn test_find_by_relayer_id() {
2462 let repo = setup_test_repo().await;
2463 let random_id = Uuid::new_v4().to_string();
2464 let random_id2 = Uuid::new_v4().to_string();
2465 let random_id3 = Uuid::new_v4().to_string();
2466
2467 let tx1 = create_test_transaction_with_relayer(&random_id, "relayer-1");
2468 let tx2 = create_test_transaction_with_relayer(&random_id2, "relayer-1");
2469 let tx3 = create_test_transaction_with_relayer(&random_id3, "relayer-2");
2470
2471 repo.create(tx1).await.unwrap();
2472 repo.create(tx2).await.unwrap();
2473 repo.create(tx3).await.unwrap();
2474
2475 let query = PaginationQuery {
2477 page: 1,
2478 per_page: 10,
2479 };
2480 let result = repo
2481 .find_by_relayer_id("relayer-1", query.clone())
2482 .await
2483 .unwrap();
2484 assert!(result.total >= 2);
2485 assert!(result.items.len() >= 2);
2486 assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-1"));
2487
2488 let result = repo
2490 .find_by_relayer_id("relayer-2", query.clone())
2491 .await
2492 .unwrap();
2493 assert!(result.total >= 1);
2494 assert!(!result.items.is_empty());
2495 assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-2"));
2496
2497 let result = repo
2499 .find_by_relayer_id("non-existent", query.clone())
2500 .await
2501 .unwrap();
2502 assert_eq!(result.total, 0);
2503 assert_eq!(result.items.len(), 0);
2504 }
2505
2506 #[tokio::test]
2507 #[ignore = "Requires active Redis instance"]
2508 async fn test_find_by_relayer_id_sorted_by_created_at_newest_first() {
2509 let repo = setup_test_repo().await;
2510 let relayer_id = Uuid::new_v4().to_string();
2511
2512 let mut tx1 = create_test_transaction_with_relayer("test-1", &relayer_id);
2514 tx1.created_at = "2025-01-27T10:00:00.000000+00:00".to_string(); let mut tx2 = create_test_transaction_with_relayer("test-2", &relayer_id);
2517 tx2.created_at = "2025-01-27T12:00:00.000000+00:00".to_string(); let mut tx3 = create_test_transaction_with_relayer("test-3", &relayer_id);
2520 tx3.created_at = "2025-01-27T14:00:00.000000+00:00".to_string(); repo.create(tx2.clone()).await.unwrap(); repo.create(tx1.clone()).await.unwrap(); repo.create(tx3.clone()).await.unwrap(); let query = PaginationQuery {
2528 page: 1,
2529 per_page: 10,
2530 };
2531 let result = repo.find_by_relayer_id(&relayer_id, query).await.unwrap();
2532
2533 assert_eq!(result.total, 3);
2534 assert_eq!(result.items.len(), 3);
2535
2536 assert_eq!(
2538 result.items[0].id, "test-3",
2539 "First item should be newest (test-3)"
2540 );
2541 assert_eq!(
2542 result.items[0].created_at,
2543 "2025-01-27T14:00:00.000000+00:00"
2544 );
2545
2546 assert_eq!(
2547 result.items[1].id, "test-2",
2548 "Second item should be middle (test-2)"
2549 );
2550 assert_eq!(
2551 result.items[1].created_at,
2552 "2025-01-27T12:00:00.000000+00:00"
2553 );
2554
2555 assert_eq!(
2556 result.items[2].id, "test-1",
2557 "Third item should be oldest (test-1)"
2558 );
2559 assert_eq!(
2560 result.items[2].created_at,
2561 "2025-01-27T10:00:00.000000+00:00"
2562 );
2563 }
2564
2565 #[tokio::test]
2566 #[ignore = "Requires active Redis instance"]
2567 async fn test_find_by_status() {
2568 let repo = setup_test_repo().await;
2569 let random_id = Uuid::new_v4().to_string();
2570 let random_id2 = Uuid::new_v4().to_string();
2571 let random_id3 = Uuid::new_v4().to_string();
2572 let relayer_id = Uuid::new_v4().to_string();
2573 let tx1 = create_test_transaction_with_status(
2574 &random_id,
2575 &relayer_id,
2576 TransactionStatus::Pending,
2577 );
2578 let tx2 =
2579 create_test_transaction_with_status(&random_id2, &relayer_id, TransactionStatus::Sent);
2580 let tx3 = create_test_transaction_with_status(
2581 &random_id3,
2582 &relayer_id,
2583 TransactionStatus::Confirmed,
2584 );
2585
2586 repo.create(tx1).await.unwrap();
2587 repo.create(tx2).await.unwrap();
2588 repo.create(tx3).await.unwrap();
2589
2590 let result = repo
2592 .find_by_status(&relayer_id, &[TransactionStatus::Pending])
2593 .await
2594 .unwrap();
2595 assert_eq!(result.len(), 1);
2596 assert_eq!(result[0].status, TransactionStatus::Pending);
2597
2598 let result = repo
2600 .find_by_status(
2601 &relayer_id,
2602 &[TransactionStatus::Pending, TransactionStatus::Sent],
2603 )
2604 .await
2605 .unwrap();
2606 assert_eq!(result.len(), 2);
2607
2608 let result = repo
2610 .find_by_status(&relayer_id, &[TransactionStatus::Failed])
2611 .await
2612 .unwrap();
2613 assert_eq!(result.len(), 0);
2614 }
2615
2616 #[tokio::test]
2617 #[ignore = "Requires active Redis instance"]
2618 async fn test_find_by_status_paginated() {
2619 let repo = setup_test_repo().await;
2620 let relayer_id = Uuid::new_v4().to_string();
2621
2622 for i in 1..=5 {
2624 let tx_id = Uuid::new_v4().to_string();
2625 let mut tx = create_test_transaction_with_status(
2626 &tx_id,
2627 &relayer_id,
2628 TransactionStatus::Pending,
2629 );
2630 tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
2631 repo.create(tx).await.unwrap();
2632 }
2633
2634 for i in 6..=7 {
2636 let tx_id = Uuid::new_v4().to_string();
2637 let mut tx = create_test_transaction_with_status(
2638 &tx_id,
2639 &relayer_id,
2640 TransactionStatus::Confirmed,
2641 );
2642 tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
2643 repo.create(tx).await.unwrap();
2644 }
2645
2646 let query = PaginationQuery {
2648 page: 1,
2649 per_page: 2,
2650 };
2651 let result = repo
2652 .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2653 .await
2654 .unwrap();
2655
2656 assert_eq!(result.total, 5);
2657 assert_eq!(result.items.len(), 2);
2658 assert_eq!(result.page, 1);
2659 assert_eq!(result.per_page, 2);
2660
2661 let query = PaginationQuery {
2663 page: 2,
2664 per_page: 2,
2665 };
2666 let result = repo
2667 .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2668 .await
2669 .unwrap();
2670
2671 assert_eq!(result.total, 5);
2672 assert_eq!(result.items.len(), 2);
2673 assert_eq!(result.page, 2);
2674
2675 let query = PaginationQuery {
2677 page: 3,
2678 per_page: 2,
2679 };
2680 let result = repo
2681 .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2682 .await
2683 .unwrap();
2684
2685 assert_eq!(result.total, 5);
2686 assert_eq!(result.items.len(), 1);
2687
2688 let query = PaginationQuery {
2690 page: 1,
2691 per_page: 10,
2692 };
2693 let result = repo
2694 .find_by_status_paginated(
2695 &relayer_id,
2696 &[TransactionStatus::Pending, TransactionStatus::Confirmed],
2697 query,
2698 false,
2699 )
2700 .await
2701 .unwrap();
2702
2703 assert_eq!(result.total, 7);
2704 assert_eq!(result.items.len(), 7);
2705
2706 let query = PaginationQuery {
2708 page: 1,
2709 per_page: 10,
2710 };
2711 let result = repo
2712 .find_by_status_paginated(&relayer_id, &[TransactionStatus::Failed], query, false)
2713 .await
2714 .unwrap();
2715
2716 assert_eq!(result.total, 0);
2717 assert_eq!(result.items.len(), 0);
2718 }
2719
2720 #[tokio::test]
2721 #[ignore = "Requires active Redis instance"]
2722 async fn test_find_by_status_paginated_oldest_first() {
2723 let repo = setup_test_repo().await;
2724 let relayer_id = Uuid::new_v4().to_string();
2725
2726 for i in 1..=5 {
2728 let tx_id = format!("tx{}-{}", i, Uuid::new_v4());
2729 let mut tx = create_test_transaction(&tx_id);
2730 tx.relayer_id = relayer_id.clone();
2731 tx.status = TransactionStatus::Pending;
2732 tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
2733 repo.create(tx).await.unwrap();
2734 }
2735
2736 let query = PaginationQuery {
2738 page: 1,
2739 per_page: 3,
2740 };
2741 let result = repo
2742 .find_by_status_paginated(
2743 &relayer_id,
2744 &[TransactionStatus::Pending],
2745 query.clone(),
2746 true,
2747 )
2748 .await
2749 .unwrap();
2750
2751 assert_eq!(result.total, 5);
2752 assert_eq!(result.items.len(), 3);
2753 assert!(
2755 result.items[0].created_at < result.items[1].created_at,
2756 "First item should be older than second"
2757 );
2758 assert!(
2759 result.items[1].created_at < result.items[2].created_at,
2760 "Second item should be older than third"
2761 );
2762
2763 let result_newest = repo
2765 .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2766 .await
2767 .unwrap();
2768
2769 assert_eq!(result_newest.items.len(), 3);
2770 assert!(
2772 result_newest.items[0].created_at > result_newest.items[1].created_at,
2773 "First item should be newer than second"
2774 );
2775 assert!(
2776 result_newest.items[1].created_at > result_newest.items[2].created_at,
2777 "Second item should be newer than third"
2778 );
2779 }
2780
2781 #[tokio::test]
2782 #[ignore = "Requires active Redis instance"]
2783 async fn test_find_by_status_paginated_oldest_first_single_item() {
2784 let repo = setup_test_repo().await;
2785 let relayer_id = Uuid::new_v4().to_string();
2786
2787 let timestamps = [
2789 "2025-01-27T08:00:00.000000+00:00", "2025-01-27T10:00:00.000000+00:00", "2025-01-27T12:00:00.000000+00:00", ];
2793
2794 let mut oldest_id = String::new();
2795 let mut newest_id = String::new();
2796
2797 for (i, timestamp) in timestamps.iter().enumerate() {
2798 let tx_id = format!("tx-{}-{}", i, Uuid::new_v4());
2799 if i == 0 {
2800 oldest_id = tx_id.clone();
2801 }
2802 if i == 2 {
2803 newest_id = tx_id.clone();
2804 }
2805 let mut tx = create_test_transaction(&tx_id);
2806 tx.relayer_id = relayer_id.clone();
2807 tx.status = TransactionStatus::Pending;
2808 tx.created_at = timestamp.to_string();
2809 repo.create(tx).await.unwrap();
2810 }
2811
2812 let query = PaginationQuery {
2814 page: 1,
2815 per_page: 1,
2816 };
2817 let result = repo
2818 .find_by_status_paginated(
2819 &relayer_id,
2820 &[TransactionStatus::Pending],
2821 query.clone(),
2822 true,
2823 )
2824 .await
2825 .unwrap();
2826
2827 assert_eq!(result.total, 3);
2828 assert_eq!(result.items.len(), 1);
2829 assert_eq!(
2830 result.items[0].id, oldest_id,
2831 "With oldest_first=true and per_page=1, should return the oldest transaction"
2832 );
2833
2834 let result = repo
2836 .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2837 .await
2838 .unwrap();
2839
2840 assert_eq!(result.items.len(), 1);
2841 assert_eq!(
2842 result.items[0].id, newest_id,
2843 "With oldest_first=false and per_page=1, should return the newest transaction"
2844 );
2845 }
2846
2847 #[tokio::test]
2848 #[ignore = "Requires active Redis instance"]
2849 async fn test_find_by_nonce() {
2850 let repo = setup_test_repo().await;
2851 let random_id = Uuid::new_v4().to_string();
2852 let random_id2 = Uuid::new_v4().to_string();
2853 let relayer_id = Uuid::new_v4().to_string();
2854
2855 let tx1 = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
2856 let tx2 = create_test_transaction_with_nonce(&random_id2, 43, &relayer_id);
2857
2858 repo.create(tx1.clone()).await.unwrap();
2859 repo.create(tx2).await.unwrap();
2860
2861 let result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
2863 assert!(result.is_some());
2864 assert_eq!(result.unwrap().id, random_id);
2865
2866 let result = repo.find_by_nonce(&relayer_id, 99).await.unwrap();
2868 assert!(result.is_none());
2869
2870 let result = repo.find_by_nonce("non-existent", 42).await.unwrap();
2872 assert!(result.is_none());
2873 }
2874
2875 #[tokio::test]
2876 #[ignore = "Requires active Redis instance"]
2877 async fn test_update_status() {
2878 let repo = setup_test_repo().await;
2879 let random_id = Uuid::new_v4().to_string();
2880 let tx = create_test_transaction(&random_id);
2881
2882 repo.create(tx).await.unwrap();
2883 let updated = repo
2884 .update_status(random_id.to_string(), TransactionStatus::Confirmed)
2885 .await
2886 .unwrap();
2887 assert_eq!(updated.status, TransactionStatus::Confirmed);
2888 }
2889
2890 #[tokio::test]
2891 #[ignore = "Requires active Redis instance"]
2892 async fn test_partial_update() {
2893 let repo = setup_test_repo().await;
2894 let random_id = Uuid::new_v4().to_string();
2895 let tx = create_test_transaction(&random_id);
2896
2897 repo.create(tx).await.unwrap();
2898
2899 let update = TransactionUpdateRequest {
2900 status: Some(TransactionStatus::Sent),
2901 status_reason: Some("Transaction sent".to_string()),
2902 sent_at: Some("2025-01-27T16:00:00.000000+00:00".to_string()),
2903 confirmed_at: None,
2904 network_data: None,
2905 hashes: None,
2906 is_canceled: None,
2907 priced_at: None,
2908 noop_count: None,
2909 delete_at: None,
2910 metadata: None,
2911 };
2912
2913 let updated = repo
2914 .partial_update(random_id.to_string(), update)
2915 .await
2916 .unwrap();
2917 assert_eq!(updated.status, TransactionStatus::Sent);
2918 assert_eq!(updated.status_reason, Some("Transaction sent".to_string()));
2919 assert_eq!(
2920 updated.sent_at,
2921 Some("2025-01-27T16:00:00.000000+00:00".to_string())
2922 );
2923 }
2924
2925 #[tokio::test]
2926 #[ignore = "Requires active Redis instance"]
2927 async fn test_set_sent_at() {
2928 let repo = setup_test_repo().await;
2929 let random_id = Uuid::new_v4().to_string();
2930 let tx = create_test_transaction(&random_id);
2931
2932 repo.create(tx).await.unwrap();
2933 let updated = repo
2934 .set_sent_at(
2935 random_id.to_string(),
2936 "2025-01-27T16:00:00.000000+00:00".to_string(),
2937 )
2938 .await
2939 .unwrap();
2940 assert_eq!(
2941 updated.sent_at,
2942 Some("2025-01-27T16:00:00.000000+00:00".to_string())
2943 );
2944 }
2945
2946 #[tokio::test]
2947 #[ignore = "Requires active Redis instance"]
2948 async fn test_set_confirmed_at() {
2949 let repo = setup_test_repo().await;
2950 let random_id = Uuid::new_v4().to_string();
2951 let tx = create_test_transaction(&random_id);
2952
2953 repo.create(tx).await.unwrap();
2954 let updated = repo
2955 .set_confirmed_at(
2956 random_id.to_string(),
2957 "2025-01-27T16:00:00.000000+00:00".to_string(),
2958 )
2959 .await
2960 .unwrap();
2961 assert_eq!(
2962 updated.confirmed_at,
2963 Some("2025-01-27T16:00:00.000000+00:00".to_string())
2964 );
2965 }
2966
2967 #[tokio::test]
2968 #[ignore = "Requires active Redis instance"]
2969 async fn test_update_network_data() {
2970 let repo = setup_test_repo().await;
2971 let random_id = Uuid::new_v4().to_string();
2972 let tx = create_test_transaction(&random_id);
2973
2974 repo.create(tx).await.unwrap();
2975
2976 let new_network_data = NetworkTransactionData::Evm(EvmTransactionData {
2977 gas_price: Some(2000000000),
2978 gas_limit: Some(42000),
2979 nonce: Some(2),
2980 value: U256::from_str("2000000000000000000").unwrap(),
2981 data: Some("0x1234".to_string()),
2982 from: "0xNewSender".to_string(),
2983 to: Some("0xNewRecipient".to_string()),
2984 chain_id: 1,
2985 signature: None,
2986 hash: Some("0xnewhash".to_string()),
2987 speed: Some(Speed::SafeLow),
2988 max_fee_per_gas: None,
2989 max_priority_fee_per_gas: None,
2990 raw: None,
2991 });
2992
2993 let updated = repo
2994 .update_network_data(random_id.to_string(), new_network_data.clone())
2995 .await
2996 .unwrap();
2997 assert_eq!(
2998 updated
2999 .network_data
3000 .get_evm_transaction_data()
3001 .unwrap()
3002 .hash,
3003 new_network_data.get_evm_transaction_data().unwrap().hash
3004 );
3005 }
3006
3007 #[tokio::test]
3008 #[ignore = "Requires active Redis instance"]
3009 async fn test_debug_implementation() {
3010 let repo = setup_test_repo().await;
3011 let debug_str = format!("{repo:?}");
3012 assert!(debug_str.contains("RedisTransactionRepository"));
3013 assert!(debug_str.contains("test_prefix"));
3014 }
3015
3016 #[tokio::test]
3017 #[ignore = "Requires active Redis instance"]
3018 async fn test_error_handling_empty_id() {
3019 let repo = setup_test_repo().await;
3020
3021 let result = repo.get_by_id("".to_string()).await;
3022 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
3023
3024 let result = repo
3025 .update("".to_string(), create_test_transaction("test"))
3026 .await;
3027 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
3028
3029 let result = repo.delete_by_id("".to_string()).await;
3030 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
3031 }
3032
3033 #[tokio::test]
3034 #[ignore = "Requires active Redis instance"]
3035 async fn test_pagination_validation() {
3036 let repo = setup_test_repo().await;
3037
3038 let query = PaginationQuery {
3039 page: 1,
3040 per_page: 0,
3041 };
3042 let result = repo.list_paginated(query).await;
3043 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
3044 }
3045
3046 #[tokio::test]
3047 #[ignore = "Requires active Redis instance"]
3048 async fn test_index_consistency() {
3049 let repo = setup_test_repo().await;
3050 let random_id = Uuid::new_v4().to_string();
3051 let relayer_id = Uuid::new_v4().to_string();
3052 let tx = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
3053
3054 repo.create(tx.clone()).await.unwrap();
3056
3057 let found = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
3059 assert!(found.is_some());
3060
3061 let mut updated_tx = tx.clone();
3063 if let NetworkTransactionData::Evm(ref mut evm_data) = updated_tx.network_data {
3064 evm_data.nonce = Some(43);
3065 }
3066
3067 repo.update(random_id.to_string(), updated_tx)
3068 .await
3069 .unwrap();
3070
3071 let old_nonce_result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
3073 assert!(old_nonce_result.is_none());
3074
3075 let new_nonce_result = repo.find_by_nonce(&relayer_id, 43).await.unwrap();
3077 assert!(new_nonce_result.is_some());
3078 }
3079
3080 #[tokio::test]
3081 #[ignore = "Requires active Redis instance"]
3082 async fn test_has_entries() {
3083 let repo = setup_test_repo().await;
3084 assert!(!repo.has_entries().await.unwrap());
3085
3086 let tx_id = uuid::Uuid::new_v4().to_string();
3087 let tx = create_test_transaction(&tx_id);
3088 repo.create(tx.clone()).await.unwrap();
3089
3090 assert!(repo.has_entries().await.unwrap());
3091 }
3092
3093 #[tokio::test]
3094 #[ignore = "Requires active Redis instance"]
3095 async fn test_drop_all_entries() {
3096 let repo = setup_test_repo().await;
3097 let tx_id = uuid::Uuid::new_v4().to_string();
3098 let tx = create_test_transaction(&tx_id);
3099 repo.create(tx.clone()).await.unwrap();
3100 assert!(repo.has_entries().await.unwrap());
3101
3102 repo.drop_all_entries().await.unwrap();
3103 assert!(!repo.has_entries().await.unwrap());
3104 }
3105
3106 #[tokio::test]
3108 #[ignore = "Requires active Redis instance"]
3109 async fn test_update_status_sets_delete_at_for_final_statuses() {
3110 let _lock = ENV_MUTEX.lock().await;
3111
3112 use chrono::{DateTime, Duration, Utc};
3113 use std::env;
3114
3115 env::set_var("TRANSACTION_EXPIRATION_HOURS", "6");
3117
3118 let repo = setup_test_repo().await;
3119
3120 let final_statuses = [
3121 TransactionStatus::Canceled,
3122 TransactionStatus::Confirmed,
3123 TransactionStatus::Failed,
3124 TransactionStatus::Expired,
3125 ];
3126
3127 for (i, status) in final_statuses.iter().enumerate() {
3128 let tx_id = format!("test-final-{}-{}", i, Uuid::new_v4());
3129 let mut tx = create_test_transaction(&tx_id);
3130
3131 tx.delete_at = None;
3133 tx.status = TransactionStatus::Pending;
3134
3135 repo.create(tx).await.unwrap();
3136
3137 let before_update = Utc::now();
3138
3139 let updated = repo
3141 .update_status(tx_id.clone(), status.clone())
3142 .await
3143 .unwrap();
3144
3145 assert!(
3147 updated.delete_at.is_some(),
3148 "delete_at should be set for status: {status:?}"
3149 );
3150
3151 let delete_at_str = updated.delete_at.unwrap();
3153 let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
3154 .expect("delete_at should be valid RFC3339")
3155 .with_timezone(&Utc);
3156
3157 let duration_from_before = delete_at.signed_duration_since(before_update);
3158 let expected_duration = Duration::hours(6);
3159 let tolerance = Duration::minutes(5);
3160
3161 assert!(
3162 duration_from_before >= expected_duration - tolerance
3163 && duration_from_before <= expected_duration + tolerance,
3164 "delete_at should be approximately 6 hours from now for status: {status:?}. Duration: {duration_from_before:?}"
3165 );
3166 }
3167
3168 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3170 }
3171
3172 #[tokio::test]
3173 #[ignore = "Requires active Redis instance"]
3174 async fn test_update_status_does_not_set_delete_at_for_non_final_statuses() {
3175 let _lock = ENV_MUTEX.lock().await;
3176
3177 use std::env;
3178
3179 env::set_var("TRANSACTION_EXPIRATION_HOURS", "4");
3180
3181 let repo = setup_test_repo().await;
3182
3183 let non_final_statuses = [
3184 TransactionStatus::Pending,
3185 TransactionStatus::Sent,
3186 TransactionStatus::Submitted,
3187 TransactionStatus::Mined,
3188 ];
3189
3190 for (i, status) in non_final_statuses.iter().enumerate() {
3191 let tx_id = format!("test-non-final-{}-{}", i, Uuid::new_v4());
3192 let mut tx = create_test_transaction(&tx_id);
3193 tx.delete_at = None;
3194 tx.status = TransactionStatus::Pending;
3195
3196 repo.create(tx).await.unwrap();
3197
3198 let updated = repo
3200 .update_status(tx_id.clone(), status.clone())
3201 .await
3202 .unwrap();
3203
3204 assert!(
3206 updated.delete_at.is_none(),
3207 "delete_at should NOT be set for status: {status:?}"
3208 );
3209 }
3210
3211 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3213 }
3214
3215 #[tokio::test]
3216 #[ignore = "Requires active Redis instance"]
3217 async fn test_partial_update_sets_delete_at_for_final_statuses() {
3218 let _lock = ENV_MUTEX.lock().await;
3219
3220 use chrono::{DateTime, Duration, Utc};
3221 use std::env;
3222
3223 env::set_var("TRANSACTION_EXPIRATION_HOURS", "8");
3224
3225 let repo = setup_test_repo().await;
3226 let tx_id = format!("test-partial-final-{}", Uuid::new_v4());
3227 let mut tx = create_test_transaction(&tx_id);
3228 tx.delete_at = None;
3229 tx.status = TransactionStatus::Pending;
3230
3231 repo.create(tx).await.unwrap();
3232
3233 let before_update = Utc::now();
3234
3235 let update = TransactionUpdateRequest {
3237 status: Some(TransactionStatus::Confirmed),
3238 status_reason: Some("Transaction completed".to_string()),
3239 confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
3240 ..Default::default()
3241 };
3242
3243 let updated = repo.partial_update(tx_id.clone(), update).await.unwrap();
3244
3245 assert!(
3247 updated.delete_at.is_some(),
3248 "delete_at should be set when updating to Confirmed status"
3249 );
3250
3251 let delete_at_str = updated.delete_at.unwrap();
3253 let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
3254 .expect("delete_at should be valid RFC3339")
3255 .with_timezone(&Utc);
3256
3257 let duration_from_before = delete_at.signed_duration_since(before_update);
3258 let expected_duration = Duration::hours(8);
3259 let tolerance = Duration::minutes(5);
3260
3261 assert!(
3262 duration_from_before >= expected_duration - tolerance
3263 && duration_from_before <= expected_duration + tolerance,
3264 "delete_at should be approximately 8 hours from now. Duration: {duration_from_before:?}"
3265 );
3266
3267 assert_eq!(updated.status, TransactionStatus::Confirmed);
3269 assert_eq!(
3270 updated.status_reason,
3271 Some("Transaction completed".to_string())
3272 );
3273 assert_eq!(
3274 updated.confirmed_at,
3275 Some("2023-01-01T12:05:00Z".to_string())
3276 );
3277
3278 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3280 }
3281
3282 #[tokio::test]
3283 #[ignore = "Requires active Redis instance"]
3284 async fn test_update_status_preserves_existing_delete_at() {
3285 let _lock = ENV_MUTEX.lock().await;
3286
3287 use std::env;
3288
3289 env::set_var("TRANSACTION_EXPIRATION_HOURS", "2");
3290
3291 let repo = setup_test_repo().await;
3292 let tx_id = format!("test-preserve-delete-at-{}", Uuid::new_v4());
3293 let mut tx = create_test_transaction(&tx_id);
3294
3295 let existing_delete_at = "2025-01-01T12:00:00Z".to_string();
3297 tx.delete_at = Some(existing_delete_at.clone());
3298 tx.status = TransactionStatus::Pending;
3299
3300 repo.create(tx).await.unwrap();
3301
3302 let updated = repo
3304 .update_status(tx_id.clone(), TransactionStatus::Confirmed)
3305 .await
3306 .unwrap();
3307
3308 assert_eq!(
3310 updated.delete_at,
3311 Some(existing_delete_at),
3312 "Existing delete_at should be preserved when updating to final status"
3313 );
3314
3315 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3317 }
3318 #[tokio::test]
3319 #[ignore = "Requires active Redis instance"]
3320 async fn test_partial_update_without_status_change_preserves_delete_at() {
3321 let _lock = ENV_MUTEX.lock().await;
3322
3323 use std::env;
3324
3325 env::set_var("TRANSACTION_EXPIRATION_HOURS", "3");
3326
3327 let repo = setup_test_repo().await;
3328 let tx_id = format!("test-preserve-no-status-{}", Uuid::new_v4());
3329 let mut tx = create_test_transaction(&tx_id);
3330 tx.delete_at = None;
3331 tx.status = TransactionStatus::Pending;
3332
3333 repo.create(tx).await.unwrap();
3334
3335 let updated1 = repo
3337 .update_status(tx_id.clone(), TransactionStatus::Confirmed)
3338 .await
3339 .unwrap();
3340
3341 assert!(updated1.delete_at.is_some());
3342 let original_delete_at = updated1.delete_at.clone();
3343
3344 let update = TransactionUpdateRequest {
3346 status: None, status_reason: Some("Updated reason".to_string()),
3348 confirmed_at: Some("2023-01-01T12:10:00Z".to_string()),
3349 ..Default::default()
3350 };
3351
3352 let updated2 = repo.partial_update(tx_id.clone(), update).await.unwrap();
3353
3354 assert_eq!(
3356 updated2.delete_at, original_delete_at,
3357 "delete_at should be preserved when status is not updated"
3358 );
3359
3360 assert_eq!(updated2.status, TransactionStatus::Confirmed); assert_eq!(updated2.status_reason, Some("Updated reason".to_string()));
3363 assert_eq!(
3364 updated2.confirmed_at,
3365 Some("2023-01-01T12:10:00Z".to_string())
3366 );
3367
3368 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3370 }
3371
3372 #[tokio::test]
3375 #[ignore = "Requires active Redis instance"]
3376 async fn test_delete_by_ids_empty_list() {
3377 let repo = setup_test_repo().await;
3378 let tx_id = format!("test-empty-{}", Uuid::new_v4());
3379
3380 let tx = create_test_transaction(&tx_id);
3382 repo.create(tx).await.unwrap();
3383
3384 let result = repo.delete_by_ids(vec![]).await.unwrap();
3386
3387 assert_eq!(result.deleted_count, 0);
3388 assert!(result.failed.is_empty());
3389
3390 assert!(repo.get_by_id(tx_id).await.is_ok());
3392 }
3393
3394 #[tokio::test]
3395 #[ignore = "Requires active Redis instance"]
3396 async fn test_delete_by_ids_single_transaction() {
3397 let repo = setup_test_repo().await;
3398 let tx_id = format!("test-single-{}", Uuid::new_v4());
3399
3400 let tx = create_test_transaction(&tx_id);
3401 repo.create(tx).await.unwrap();
3402
3403 let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
3404
3405 assert_eq!(result.deleted_count, 1);
3406 assert!(result.failed.is_empty());
3407
3408 assert!(repo.get_by_id(tx_id).await.is_err());
3410 }
3411
3412 #[tokio::test]
3413 #[ignore = "Requires active Redis instance"]
3414 async fn test_delete_by_ids_multiple_transactions() {
3415 let repo = setup_test_repo().await;
3416 let base_id = Uuid::new_v4();
3417
3418 let mut created_ids = Vec::new();
3420 for i in 1..=5 {
3421 let tx_id = format!("test-multi-{base_id}-{i}");
3422 let tx = create_test_transaction(&tx_id);
3423 repo.create(tx).await.unwrap();
3424 created_ids.push(tx_id);
3425 }
3426
3427 let ids_to_delete = vec![
3429 created_ids[0].clone(),
3430 created_ids[2].clone(),
3431 created_ids[4].clone(),
3432 ];
3433 let result = repo.delete_by_ids(ids_to_delete).await.unwrap();
3434
3435 assert_eq!(result.deleted_count, 3);
3436 assert!(result.failed.is_empty());
3437
3438 assert!(repo.get_by_id(created_ids[0].clone()).await.is_err());
3440 assert!(repo.get_by_id(created_ids[1].clone()).await.is_ok()); assert!(repo.get_by_id(created_ids[2].clone()).await.is_err());
3442 assert!(repo.get_by_id(created_ids[3].clone()).await.is_ok()); assert!(repo.get_by_id(created_ids[4].clone()).await.is_err());
3444 }
3445
3446 #[tokio::test]
3447 #[ignore = "Requires active Redis instance"]
3448 async fn test_delete_by_ids_nonexistent_transactions() {
3449 let repo = setup_test_repo().await;
3450 let base_id = Uuid::new_v4();
3451
3452 let ids_to_delete = vec![
3454 format!("nonexistent-{}-1", base_id),
3455 format!("nonexistent-{}-2", base_id),
3456 ];
3457 let result = repo.delete_by_ids(ids_to_delete.clone()).await.unwrap();
3458
3459 assert_eq!(result.deleted_count, 0);
3460 assert_eq!(result.failed.len(), 2);
3461
3462 let failed_ids: Vec<&String> = result.failed.iter().map(|(id, _)| id).collect();
3464 assert!(failed_ids.contains(&&ids_to_delete[0]));
3465 assert!(failed_ids.contains(&&ids_to_delete[1]));
3466 }
3467
3468 #[tokio::test]
3469 #[ignore = "Requires active Redis instance"]
3470 async fn test_delete_by_ids_mixed_existing_and_nonexistent() {
3471 let repo = setup_test_repo().await;
3472 let base_id = Uuid::new_v4();
3473
3474 let existing_ids: Vec<String> = (1..=3)
3476 .map(|i| format!("test-mixed-existing-{base_id}-{i}"))
3477 .collect();
3478
3479 for id in &existing_ids {
3480 let tx = create_test_transaction(id);
3481 repo.create(tx).await.unwrap();
3482 }
3483
3484 let nonexistent_ids: Vec<String> = (1..=2)
3485 .map(|i| format!("test-mixed-nonexistent-{base_id}-{i}"))
3486 .collect();
3487
3488 let ids_to_delete = vec![
3490 existing_ids[0].clone(),
3491 nonexistent_ids[0].clone(),
3492 existing_ids[1].clone(),
3493 nonexistent_ids[1].clone(),
3494 ];
3495 let result = repo.delete_by_ids(ids_to_delete).await.unwrap();
3496
3497 assert_eq!(result.deleted_count, 2);
3498 assert_eq!(result.failed.len(), 2);
3499
3500 assert!(repo.get_by_id(existing_ids[0].clone()).await.is_err());
3502 assert!(repo.get_by_id(existing_ids[1].clone()).await.is_err());
3503
3504 assert!(repo.get_by_id(existing_ids[2].clone()).await.is_ok());
3506 }
3507
3508 #[tokio::test]
3509 #[ignore = "Requires active Redis instance"]
3510 async fn test_delete_by_ids_removes_all_indexes() {
3511 let repo = setup_test_repo().await;
3512 let relayer_id = format!("relayer-{}", Uuid::new_v4());
3513 let tx_id = format!("test-indexes-{}", Uuid::new_v4());
3514
3515 let mut tx = create_test_transaction(&tx_id);
3517 tx.relayer_id = relayer_id.clone();
3518 tx.status = TransactionStatus::Confirmed;
3519 repo.create(tx).await.unwrap();
3520
3521 let found = repo
3523 .find_by_status(&relayer_id, &[TransactionStatus::Confirmed])
3524 .await
3525 .unwrap();
3526 assert!(found.iter().any(|t| t.id == tx_id));
3527
3528 let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
3530 assert_eq!(result.deleted_count, 1);
3531
3532 let found_after = repo
3534 .find_by_status(&relayer_id, &[TransactionStatus::Confirmed])
3535 .await
3536 .unwrap();
3537 assert!(!found_after.iter().any(|t| t.id == tx_id));
3538
3539 assert!(repo.get_by_id(tx_id).await.is_err());
3541 }
3542
3543 #[tokio::test]
3544 #[ignore = "Requires active Redis instance"]
3545 async fn test_delete_by_ids_removes_nonce_index() {
3546 let repo = setup_test_repo().await;
3547 let relayer_id = format!("relayer-{}", Uuid::new_v4());
3548 let tx_id = format!("test-nonce-{}", Uuid::new_v4());
3549 let nonce = 12345u64;
3550
3551 let tx = create_test_transaction_with_nonce(&tx_id, nonce, &relayer_id);
3553 repo.create(tx).await.unwrap();
3554
3555 let found = repo.find_by_nonce(&relayer_id, nonce).await.unwrap();
3557 assert!(found.is_some());
3558 assert_eq!(found.unwrap().id, tx_id);
3559
3560 let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
3562 assert_eq!(result.deleted_count, 1);
3563
3564 let found_after = repo.find_by_nonce(&relayer_id, nonce).await.unwrap();
3566 assert!(found_after.is_none());
3567 }
3568
3569 #[tokio::test]
3570 #[ignore = "Requires active Redis instance"]
3571 async fn test_delete_by_ids_large_batch() {
3572 let repo = setup_test_repo().await;
3573 let base_id = Uuid::new_v4();
3574
3575 let count = 50;
3577 let mut created_ids = Vec::new();
3578
3579 for i in 0..count {
3580 let tx_id = format!("test-large-{base_id}-{i}");
3581 let tx = create_test_transaction(&tx_id);
3582 repo.create(tx).await.unwrap();
3583 created_ids.push(tx_id);
3584 }
3585
3586 let result = repo.delete_by_ids(created_ids.clone()).await.unwrap();
3588
3589 assert_eq!(result.deleted_count, count);
3590 assert!(result.failed.is_empty());
3591
3592 for id in created_ids {
3594 assert!(repo.get_by_id(id).await.is_err());
3595 }
3596 }
3597
3598 #[tokio::test]
3599 #[ignore = "Requires active Redis instance"]
3600 async fn test_delete_by_ids_preserves_other_relayer_transactions() {
3601 let repo = setup_test_repo().await;
3602 let relayer_1 = format!("relayer-1-{}", Uuid::new_v4());
3603 let relayer_2 = format!("relayer-2-{}", Uuid::new_v4());
3604 let tx_id_1 = format!("tx-relayer-1-{}", Uuid::new_v4());
3605 let tx_id_2 = format!("tx-relayer-2-{}", Uuid::new_v4());
3606
3607 let tx1 = create_test_transaction_with_relayer(&tx_id_1, &relayer_1);
3609 let tx2 = create_test_transaction_with_relayer(&tx_id_2, &relayer_2);
3610
3611 repo.create(tx1).await.unwrap();
3612 repo.create(tx2).await.unwrap();
3613
3614 let result = repo.delete_by_ids(vec![tx_id_1.clone()]).await.unwrap();
3616
3617 assert_eq!(result.deleted_count, 1);
3618
3619 assert!(repo.get_by_id(tx_id_1).await.is_err());
3621
3622 let remaining = repo.get_by_id(tx_id_2).await.unwrap();
3624 assert_eq!(remaining.relayer_id, relayer_2);
3625 }
3626
3627 #[tokio::test]
3630 #[ignore = "Requires active Redis instance"]
3631 async fn test_increment_status_check_failures_no_prior_metadata() {
3632 let _lock = ENV_MUTEX.lock().await;
3633 let repo = setup_test_repo().await;
3634 let relayer_id = Uuid::new_v4().to_string();
3635 let tx_id = Uuid::new_v4().to_string();
3636 let mut tx =
3637 create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3638 tx.metadata = None;
3639 repo.create(tx).await.unwrap();
3640
3641 let updated = repo.increment_status_check_failures(tx_id).await.unwrap();
3642
3643 let meta = updated.metadata.expect("metadata should be set");
3644 assert_eq!(meta.consecutive_failures, 1);
3645 assert_eq!(meta.total_failures, 1);
3646 assert_eq!(meta.insufficient_fee_retries, 0);
3647 }
3648
3649 #[tokio::test]
3650 #[ignore = "Requires active Redis instance"]
3651 async fn test_increment_status_check_failures_accumulates() {
3652 let _lock = ENV_MUTEX.lock().await;
3653 let repo = setup_test_repo().await;
3654 let relayer_id = Uuid::new_v4().to_string();
3655 let tx_id = Uuid::new_v4().to_string();
3656 let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3657 repo.create(tx).await.unwrap();
3658
3659 repo.increment_status_check_failures(tx_id.clone())
3660 .await
3661 .unwrap();
3662 repo.increment_status_check_failures(tx_id.clone())
3663 .await
3664 .unwrap();
3665 let updated = repo.increment_status_check_failures(tx_id).await.unwrap();
3666
3667 let meta = updated.metadata.unwrap();
3668 assert_eq!(meta.consecutive_failures, 3);
3669 assert_eq!(meta.total_failures, 3);
3670 }
3671
3672 #[tokio::test]
3673 #[ignore = "Requires active Redis instance"]
3674 async fn test_increment_status_check_failures_noop_on_final_state() {
3675 let _lock = ENV_MUTEX.lock().await;
3676 let repo = setup_test_repo().await;
3677 let relayer_id = Uuid::new_v4().to_string();
3678 let tx_id = Uuid::new_v4().to_string();
3679 let tx =
3680 create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Confirmed);
3681 repo.create(tx).await.unwrap();
3682
3683 let result = repo.increment_status_check_failures(tx_id).await.unwrap();
3684
3685 assert!(result.metadata.is_none());
3687 assert_eq!(result.status, TransactionStatus::Confirmed);
3688 }
3689
3690 #[tokio::test]
3691 #[ignore = "Requires active Redis instance"]
3692 async fn test_increment_status_check_failures_not_found() {
3693 let _lock = ENV_MUTEX.lock().await;
3694 let repo = setup_test_repo().await;
3695
3696 let result = repo
3697 .increment_status_check_failures("nonexistent".to_string())
3698 .await;
3699
3700 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
3701 }
3702
3703 #[tokio::test]
3706 #[ignore = "Requires active Redis instance"]
3707 async fn test_reset_consecutive_failures() {
3708 let _lock = ENV_MUTEX.lock().await;
3709 let repo = setup_test_repo().await;
3710 let relayer_id = Uuid::new_v4().to_string();
3711 let tx_id = Uuid::new_v4().to_string();
3712 let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3713 repo.create(tx).await.unwrap();
3714
3715 repo.increment_status_check_failures(tx_id.clone())
3717 .await
3718 .unwrap();
3719 repo.increment_status_check_failures(tx_id.clone())
3720 .await
3721 .unwrap();
3722
3723 let updated = repo
3724 .reset_status_check_consecutive_failures(tx_id)
3725 .await
3726 .unwrap();
3727
3728 let meta = updated.metadata.unwrap();
3729 assert_eq!(meta.consecutive_failures, 0);
3730 assert_eq!(meta.total_failures, 2);
3732 }
3733
3734 #[tokio::test]
3735 #[ignore = "Requires active Redis instance"]
3736 async fn test_reset_consecutive_failures_noop_on_final_state() {
3737 let _lock = ENV_MUTEX.lock().await;
3738 let repo = setup_test_repo().await;
3739 let relayer_id = Uuid::new_v4().to_string();
3740 let tx_id = Uuid::new_v4().to_string();
3741 let mut tx =
3742 create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Failed);
3743 tx.metadata = Some(crate::models::TransactionMetadata {
3744 consecutive_failures: 5,
3745 total_failures: 10,
3746 insufficient_fee_retries: 0,
3747 try_again_later_retries: 0,
3748 });
3749 repo.create(tx).await.unwrap();
3750
3751 let result = repo
3752 .reset_status_check_consecutive_failures(tx_id)
3753 .await
3754 .unwrap();
3755
3756 let meta = result.metadata.unwrap();
3758 assert_eq!(meta.consecutive_failures, 5);
3759 }
3760
3761 #[tokio::test]
3762 #[ignore = "Requires active Redis instance"]
3763 async fn test_reset_consecutive_failures_not_found() {
3764 let _lock = ENV_MUTEX.lock().await;
3765 let repo = setup_test_repo().await;
3766
3767 let result = repo
3768 .reset_status_check_consecutive_failures("nonexistent".to_string())
3769 .await;
3770
3771 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
3772 }
3773
3774 #[tokio::test]
3777 #[ignore = "Requires active Redis instance"]
3778 async fn test_record_insufficient_fee_retry() {
3779 let _lock = ENV_MUTEX.lock().await;
3780 let repo = setup_test_repo().await;
3781 let relayer_id = Uuid::new_v4().to_string();
3782 let tx_id = Uuid::new_v4().to_string();
3783 let mut tx =
3784 create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3785 tx.sent_at = None;
3786 repo.create(tx).await.unwrap();
3787
3788 let updated = repo
3789 .record_stellar_insufficient_fee_retry(tx_id, "2025-03-18T10:00:00Z".to_string())
3790 .await
3791 .unwrap();
3792
3793 assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:00:00Z"));
3794 let meta = updated.metadata.unwrap();
3795 assert_eq!(meta.insufficient_fee_retries, 1);
3796 assert_eq!(meta.consecutive_failures, 0);
3797 assert_eq!(meta.total_failures, 0);
3798 }
3799
3800 #[tokio::test]
3801 #[ignore = "Requires active Redis instance"]
3802 async fn test_record_insufficient_fee_retry_accumulates() {
3803 let _lock = ENV_MUTEX.lock().await;
3804 let repo = setup_test_repo().await;
3805 let relayer_id = Uuid::new_v4().to_string();
3806 let tx_id = Uuid::new_v4().to_string();
3807 let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3808 repo.create(tx).await.unwrap();
3809
3810 repo.record_stellar_insufficient_fee_retry(
3811 tx_id.clone(),
3812 "2025-03-18T10:00:00Z".to_string(),
3813 )
3814 .await
3815 .unwrap();
3816
3817 let updated = repo
3818 .record_stellar_insufficient_fee_retry(tx_id, "2025-03-18T10:01:00Z".to_string())
3819 .await
3820 .unwrap();
3821
3822 assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:01:00Z"));
3823 let meta = updated.metadata.unwrap();
3824 assert_eq!(meta.insufficient_fee_retries, 2);
3825 }
3826
3827 #[tokio::test]
3828 #[ignore = "Requires active Redis instance"]
3829 async fn test_record_insufficient_fee_retry_noop_on_final_state() {
3830 let _lock = ENV_MUTEX.lock().await;
3831 let repo = setup_test_repo().await;
3832 let relayer_id = Uuid::new_v4().to_string();
3833 let tx_id = Uuid::new_v4().to_string();
3834 let mut tx =
3835 create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Confirmed);
3836 tx.sent_at = Some("old-time".to_string());
3837 repo.create(tx).await.unwrap();
3838
3839 let result = repo
3840 .record_stellar_insufficient_fee_retry(tx_id, "new-time".to_string())
3841 .await
3842 .unwrap();
3843
3844 assert_eq!(result.sent_at.as_deref(), Some("old-time"));
3846 assert!(result.metadata.is_none());
3847 }
3848
3849 #[tokio::test]
3850 #[ignore = "Requires active Redis instance"]
3851 async fn test_record_insufficient_fee_retry_not_found() {
3852 let _lock = ENV_MUTEX.lock().await;
3853 let repo = setup_test_repo().await;
3854
3855 let result = repo
3856 .record_stellar_insufficient_fee_retry(
3857 "nonexistent".to_string(),
3858 "2025-03-18T10:00:00Z".to_string(),
3859 )
3860 .await;
3861
3862 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
3863 }
3864
3865 #[tokio::test]
3868 #[ignore = "Requires active Redis instance"]
3869 async fn test_record_try_again_later_retry() {
3870 let _lock = ENV_MUTEX.lock().await;
3871 let repo = setup_test_repo().await;
3872 let relayer_id = Uuid::new_v4().to_string();
3873 let tx_id = Uuid::new_v4().to_string();
3874 let mut tx =
3875 create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3876 tx.sent_at = None;
3877 repo.create(tx).await.unwrap();
3878
3879 let updated = repo
3880 .record_stellar_try_again_later_retry(tx_id, "2025-03-18T10:00:00Z".to_string())
3881 .await
3882 .unwrap();
3883
3884 assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:00:00Z"));
3885 let meta = updated.metadata.unwrap();
3886 assert_eq!(meta.try_again_later_retries, 1);
3887 assert_eq!(meta.consecutive_failures, 0);
3888 assert_eq!(meta.total_failures, 0);
3889 }
3890
3891 #[tokio::test]
3892 #[ignore = "Requires active Redis instance"]
3893 async fn test_record_try_again_later_retry_accumulates() {
3894 let _lock = ENV_MUTEX.lock().await;
3895 let repo = setup_test_repo().await;
3896 let relayer_id = Uuid::new_v4().to_string();
3897 let tx_id = Uuid::new_v4().to_string();
3898 let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3899 repo.create(tx).await.unwrap();
3900
3901 repo.record_stellar_try_again_later_retry(
3902 tx_id.clone(),
3903 "2025-03-18T10:00:00Z".to_string(),
3904 )
3905 .await
3906 .unwrap();
3907
3908 let updated = repo
3909 .record_stellar_try_again_later_retry(tx_id, "2025-03-18T10:01:00Z".to_string())
3910 .await
3911 .unwrap();
3912
3913 assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:01:00Z"));
3914 let meta = updated.metadata.unwrap();
3915 assert_eq!(meta.try_again_later_retries, 2);
3916 }
3917
3918 #[tokio::test]
3919 #[ignore = "Requires active Redis instance"]
3920 async fn test_record_try_again_later_retry_noop_on_final_state() {
3921 let _lock = ENV_MUTEX.lock().await;
3922 let repo = setup_test_repo().await;
3923 let relayer_id = Uuid::new_v4().to_string();
3924 let tx_id = Uuid::new_v4().to_string();
3925 let mut tx =
3926 create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Confirmed);
3927 tx.sent_at = Some("old-time".to_string());
3928 repo.create(tx).await.unwrap();
3929
3930 let result = repo
3931 .record_stellar_try_again_later_retry(tx_id, "new-time".to_string())
3932 .await
3933 .unwrap();
3934
3935 assert_eq!(result.sent_at.as_deref(), Some("old-time"));
3937 assert!(result.metadata.is_none());
3938 }
3939
3940 #[tokio::test]
3941 #[ignore = "Requires active Redis instance"]
3942 async fn test_record_try_again_later_retry_not_found() {
3943 let _lock = ENV_MUTEX.lock().await;
3944 let repo = setup_test_repo().await;
3945
3946 let result = repo
3947 .record_stellar_try_again_later_retry(
3948 "nonexistent".to_string(),
3949 "2025-03-18T10:00:00Z".to_string(),
3950 )
3951 .await;
3952
3953 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
3954 }
3955
3956 #[tokio::test]
3959 #[ignore = "Requires active Redis instance"]
3960 async fn test_increment_failures_preserves_try_again_later_retries() {
3961 let _lock = ENV_MUTEX.lock().await;
3962 let repo = setup_test_repo().await;
3963 let relayer_id = Uuid::new_v4().to_string();
3964 let tx_id = Uuid::new_v4().to_string();
3965 let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3966 repo.create(tx).await.unwrap();
3967
3968 repo.record_stellar_try_again_later_retry(
3970 tx_id.clone(),
3971 "2025-03-18T10:00:00Z".to_string(),
3972 )
3973 .await
3974 .unwrap();
3975
3976 let updated = repo.increment_status_check_failures(tx_id).await.unwrap();
3978
3979 let meta = updated.metadata.unwrap();
3980 assert_eq!(
3981 meta.try_again_later_retries, 1,
3982 "try_again_later_retries must survive increment_status_check_failures"
3983 );
3984 assert_eq!(meta.consecutive_failures, 1);
3985 assert_eq!(meta.total_failures, 1);
3986 }
3987
3988 #[tokio::test]
3989 #[ignore = "Requires active Redis instance"]
3990 async fn test_increment_failures_preserves_insufficient_fee_retries() {
3991 let _lock = ENV_MUTEX.lock().await;
3992 let repo = setup_test_repo().await;
3993 let relayer_id = Uuid::new_v4().to_string();
3994 let tx_id = Uuid::new_v4().to_string();
3995 let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3996 repo.create(tx).await.unwrap();
3997
3998 repo.record_stellar_insufficient_fee_retry(
4000 tx_id.clone(),
4001 "2025-03-18T10:00:00Z".to_string(),
4002 )
4003 .await
4004 .unwrap();
4005
4006 let updated = repo.increment_status_check_failures(tx_id).await.unwrap();
4008
4009 let meta = updated.metadata.unwrap();
4010 assert_eq!(
4011 meta.insufficient_fee_retries, 1,
4012 "insufficient_fee_retries must survive increment_status_check_failures"
4013 );
4014 assert_eq!(meta.consecutive_failures, 1);
4015 }
4016
4017 #[tokio::test]
4018 #[ignore = "Requires active Redis instance"]
4019 async fn test_reset_failures_preserves_retry_counters() {
4020 let _lock = ENV_MUTEX.lock().await;
4021 let repo = setup_test_repo().await;
4022 let relayer_id = Uuid::new_v4().to_string();
4023 let tx_id = Uuid::new_v4().to_string();
4024 let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
4025 repo.create(tx).await.unwrap();
4026
4027 repo.record_stellar_try_again_later_retry(
4029 tx_id.clone(),
4030 "2025-03-18T10:00:00Z".to_string(),
4031 )
4032 .await
4033 .unwrap();
4034 repo.record_stellar_insufficient_fee_retry(
4035 tx_id.clone(),
4036 "2025-03-18T10:01:00Z".to_string(),
4037 )
4038 .await
4039 .unwrap();
4040
4041 repo.increment_status_check_failures(tx_id.clone())
4043 .await
4044 .unwrap();
4045 let updated = repo
4046 .reset_status_check_consecutive_failures(tx_id)
4047 .await
4048 .unwrap();
4049
4050 let meta = updated.metadata.unwrap();
4051 assert_eq!(meta.consecutive_failures, 0);
4052 assert_eq!(meta.total_failures, 1);
4053 assert_eq!(
4054 meta.try_again_later_retries, 1,
4055 "try_again_later_retries must survive reset"
4056 );
4057 assert_eq!(
4058 meta.insufficient_fee_retries, 1,
4059 "insufficient_fee_retries must survive reset"
4060 );
4061 }
4062
4063 #[tokio::test]
4064 #[ignore = "Requires active Redis instance"]
4065 async fn test_fee_and_try_again_later_retries_independent() {
4066 let _lock = ENV_MUTEX.lock().await;
4067 let repo = setup_test_repo().await;
4068 let relayer_id = Uuid::new_v4().to_string();
4069 let tx_id = Uuid::new_v4().to_string();
4070 let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
4071 repo.create(tx).await.unwrap();
4072
4073 repo.record_stellar_try_again_later_retry(
4075 tx_id.clone(),
4076 "2025-03-18T10:00:00Z".to_string(),
4077 )
4078 .await
4079 .unwrap();
4080 repo.record_stellar_try_again_later_retry(
4081 tx_id.clone(),
4082 "2025-03-18T10:01:00Z".to_string(),
4083 )
4084 .await
4085 .unwrap();
4086
4087 let updated = repo
4089 .record_stellar_insufficient_fee_retry(tx_id, "2025-03-18T10:02:00Z".to_string())
4090 .await
4091 .unwrap();
4092
4093 let meta = updated.metadata.unwrap();
4094 assert_eq!(
4095 meta.try_again_later_retries, 2,
4096 "try_again_later_retries must survive insufficient_fee_retry"
4097 );
4098 assert_eq!(meta.insufficient_fee_retries, 1);
4099 }
4100}