diff --git a/chain-signatures/Cargo.lock b/chain-signatures/Cargo.lock index fecf0a24..90bf3f84 100644 --- a/chain-signatures/Cargo.lock +++ b/chain-signatures/Cargo.lock @@ -1310,7 +1310,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" dependencies = [ "bytes", + "futures-core", "memchr", + "pin-project-lite", + "tokio", + "tokio-util", ] [[package]] @@ -1601,6 +1605,36 @@ dependencies = [ "syn 2.0.72", ] +[[package]] +name = "deadpool" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6541a3916932fe57768d4be0b1ffb5ec7cbf74ca8c903fdfd5c0fe8aa958f0ed" +dependencies = [ + "deadpool-runtime", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-redis" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfae6799b68a735270e4344ee3e834365f707c72da09c9a8bb89b45cc3351395" +dependencies = [ + "deadpool", + "redis", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" +dependencies = [ + "tokio", +] + [[package]] name = "debugid" version = "0.7.3" @@ -3190,6 +3224,7 @@ dependencies = [ "chrono", "clap", "crypto-shared", + "deadpool-redis", "google-datastore1", "google-secretmanager1", "hex", @@ -4131,6 +4166,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi 0.3.9", + "libc", +] + [[package]] name = "num_threads" version = "0.1.7" @@ -4702,13 +4747,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc6baebe319ef5e4b470f248335620098d1c2e9261e995be05f56f719ca4bdb2" dependencies = [ "arc-swap", + "async-trait", + "bytes", "combine", + "futures-util", "itoa", "num-bigint 0.4.6", "percent-encoding 2.3.1", + "pin-project-lite", "ryu", "sha1_smol", "socket2 0.5.7", + "tokio", + "tokio-util", "url 2.5.2", ] diff --git a/chain-signatures/node/Cargo.toml b/chain-signatures/node/Cargo.toml index cdd8ce22..81ca8408 100644 --- a/chain-signatures/node/Cargo.toml +++ b/chain-signatures/node/Cargo.toml @@ -61,4 +61,5 @@ http = "1.1.0" prometheus = { version = "0.13.3" } once_cell = "1.13.1" redis = "0.27.2" +deadpool-redis = "0.18.0" sysinfo = "0.32.0" diff --git a/chain-signatures/node/src/cli.rs b/chain-signatures/node/src/cli.rs index 04569f46..85042f6a 100644 --- a/chain-signatures/node/src/cli.rs +++ b/chain-signatures/node/src/cli.rs @@ -5,6 +5,7 @@ use crate::storage::presignature_storage::LockPresignatureRedisStorage; use crate::storage::triple_storage::LockTripleRedisStorage; use crate::{http_client, indexer, mesh, storage, web}; use clap::Parser; +use deadpool_redis::Runtime; use local_ip_address::local_ip; use near_account_id::AccountId; use near_crypto::{InMemorySigner, SecretKey}; @@ -210,12 +211,15 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { let redis_url: Url = Url::parse(storage_options.redis_url.as_str())?; + let redis_cfg = deadpool_redis::Config::from_url(redis_url); + let redis_pool = redis_cfg.create_pool(Some(Runtime::Tokio1)).unwrap(); + let triple_storage: LockTripleRedisStorage = Arc::new(RwLock::new( - storage::triple_storage::init(redis_url.clone(), &account_id), + storage::triple_storage::init(redis_pool.clone(), &account_id), )); let presignature_storage: LockPresignatureRedisStorage = Arc::new(RwLock::new( - storage::presignature_storage::init(redis_url, &account_id), + storage::presignature_storage::init(redis_pool.clone(), &account_id), )); let sign_sk = sign_sk.unwrap_or_else(|| account_sk.clone()); diff --git a/chain-signatures/node/src/protocol/consensus.rs b/chain-signatures/node/src/protocol/consensus.rs index 793fcb20..af070acb 100644 --- a/chain-signatures/node/src/protocol/consensus.rs +++ b/chain-signatures/node/src/protocol/consensus.rs @@ -361,14 +361,14 @@ impl ConsensusProtocol for WaitingForConsensusState { // Clear triples from storage before starting the new epoch. This is necessary if the node has accumulated // triples from previous epochs. If it was not able to clear the previous triples, we'll leave them as-is - if let Err(err) = ctx.triple_storage().write().await.clear() { + if let Err(err) = ctx.triple_storage().write().await.clear().await { tracing::error!( ?err, "failed to clear triples from storage on new epoch start" ); } - if let Err(err) = ctx.presignature_storage().write().await.clear() { + if let Err(err) = ctx.presignature_storage().write().await.clear().await { tracing::error!( ?err, "failed to clear presignatures from storage on new epoch start" diff --git a/chain-signatures/node/src/protocol/presignature.rs b/chain-signatures/node/src/protocol/presignature.rs index 1b9828bf..c0238a3a 100644 --- a/chain-signatures/node/src/protocol/presignature.rs +++ b/chain-signatures/node/src/protocol/presignature.rs @@ -189,7 +189,13 @@ impl PresignatureManager { tracing::info!(id = ?presignature.id, "inserting presignature"); // Remove from taken list if it was there self.gc.remove(&presignature.id); - if let Err(e) = self.presignature_storage.write().await.insert(presignature) { + if let Err(e) = self + .presignature_storage + .write() + .await + .insert(presignature) + .await + { tracing::error!(?e, "failed to insert presignature"); } } @@ -203,6 +209,7 @@ impl PresignatureManager { .write() .await .insert_mine(presignature) + .await { tracing::error!(?e, "failed to insert mine presignature"); } @@ -214,6 +221,7 @@ impl PresignatureManager { .write() .await .contains(id) + .await .map_err(|e| { tracing::warn!(?e, "failed to check if presignature exist"); }) @@ -226,6 +234,7 @@ impl PresignatureManager { .write() .await .contains_mine(id) + .await .map_err(|e| { tracing::warn!(?e, "failed to check if mine presignature exist"); }) @@ -233,15 +242,16 @@ impl PresignatureManager { } pub async fn take(&mut self, id: PresignatureId) -> Result { - if let Some(presignature) = - self.presignature_storage - .write() - .await - .take(&id) - .map_err(|e| { - tracing::error!(?e, "failed to look for presignature"); - GenerationError::PresignatureIsMissing(id) - })? + if let Some(presignature) = self + .presignature_storage + .write() + .await + .take(&id) + .await + .map_err(|e| { + tracing::error!(?e, "failed to look for presignature"); + GenerationError::PresignatureIsMissing(id) + })? { self.gc.insert(id, Instant::now()); tracing::info!(id, "took presignature"); @@ -266,6 +276,7 @@ impl PresignatureManager { .write() .await .take_mine() + .await .map_err(|e| { tracing::error!(?e, "failed to look for mine presignature"); }) @@ -283,6 +294,7 @@ impl PresignatureManager { .write() .await .count_all() + .await .map_err(|e| { tracing::error!(?e, "failed to count all presignatures"); }) @@ -295,6 +307,7 @@ impl PresignatureManager { .write() .await .count_mine() + .await .map_err(|e| { tracing::error!(?e, "failed to count mine presignatures"); }) diff --git a/chain-signatures/node/src/protocol/triple.rs b/chain-signatures/node/src/protocol/triple.rs index 95c42db6..dece6b98 100644 --- a/chain-signatures/node/src/protocol/triple.rs +++ b/chain-signatures/node/src/protocol/triple.rs @@ -147,7 +147,7 @@ impl TripleManager { pub async fn insert(&mut self, triple: Triple) { tracing::info!(id = triple.id, "inserting triple"); self.gc.remove(&triple.id); - if let Err(e) = self.triple_storage.write().await.insert(triple) { + if let Err(e) = self.triple_storage.write().await.insert(triple).await { tracing::warn!(?e, "failed to insert triple"); } } @@ -155,7 +155,7 @@ impl TripleManager { pub async fn insert_mine(&mut self, triple: Triple) { tracing::debug!(id = triple.id, "inserting mine triple"); self.gc.remove(&triple.id); - if let Err(e) = self.triple_storage.write().await.insert_mine(triple) { + if let Err(e) = self.triple_storage.write().await.insert_mine(triple).await { tracing::warn!(?e, "failed to insert mine triple"); } } @@ -165,6 +165,7 @@ impl TripleManager { .write() .await .contains(id) + .await .map_err(|e| tracing::warn!(?e, "failed to check if triple exists")) .unwrap_or(false) } @@ -174,6 +175,7 @@ impl TripleManager { .write() .await .contains_mine(id) + .await .map_err(|e| tracing::warn!(?e, "failed to check if mine triple exists")) .unwrap_or(false) } @@ -187,7 +189,7 @@ impl TripleManager { id0: TripleId, id1: TripleId, ) -> Result<(Triple, Triple), GenerationError> { - let triple_0 = match self.triple_storage.write().await.take(&id0) { + let triple_0 = match self.triple_storage.write().await.take(&id0).await { Ok(Some(triple)) => triple, Ok(None) => { if self.generators.contains_key(&id0) { @@ -207,10 +209,10 @@ impl TripleManager { } }; - let triple_1 = match self.triple_storage.write().await.take(&id1) { + let triple_1 = match self.triple_storage.write().await.take(&id1).await { Ok(Some(triple)) => triple, Ok(None) => { - if let Err(e) = self.triple_storage.write().await.insert(triple_0) { + if let Err(e) = self.triple_storage.write().await.insert(triple_0).await { tracing::warn!(id0, ?e, "failed to insert triple back"); } if self.generators.contains_key(&id1) { @@ -226,7 +228,7 @@ impl TripleManager { } Err(e) => { tracing::warn!(id1, ?e, "failed to take triple"); - if let Err(e) = self.triple_storage.write().await.insert(triple_0) { + if let Err(e) = self.triple_storage.write().await.insert(triple_0).await { tracing::warn!(id0, ?e, "failed to insert triple back"); } return Err(GenerationError::TripleIsMissing(id1)); @@ -249,7 +251,7 @@ impl TripleManager { tracing::warn!("not enough mine triples"); return None; } - let triple_0 = match self.triple_storage.write().await.take_mine() { + let triple_0 = match self.triple_storage.write().await.take_mine().await { Ok(Some(triple)) => triple, Ok(None) => { tracing::warn!("no mine triple left"); @@ -261,10 +263,16 @@ impl TripleManager { } }; - let triple_1 = match self.triple_storage.write().await.take_mine() { + let triple_1 = match self.triple_storage.write().await.take_mine().await { Ok(Some(triple)) => triple, Ok(None) => { - if let Err(e) = self.triple_storage.write().await.insert_mine(triple_0) { + if let Err(e) = self + .triple_storage + .write() + .await + .insert_mine(triple_0) + .await + { tracing::warn!(?e, "failed to insert mine triple back"); } tracing::warn!("no mine triple left"); @@ -272,7 +280,13 @@ impl TripleManager { } Err(e) => { tracing::warn!(?e, "failed to take mine triple"); - if let Err(e) = self.triple_storage.write().await.insert_mine(triple_0) { + if let Err(e) = self + .triple_storage + .write() + .await + .insert_mine(triple_0) + .await + { tracing::warn!(?e, "failed to insert mine triple back"); } return None; @@ -289,12 +303,22 @@ impl TripleManager { /// Returns the number of unspent triples available in the manager. pub async fn count_all(&self) -> usize { - self.triple_storage.write().await.count_all().unwrap_or(0) + self.triple_storage + .write() + .await + .count_all() + .await + .unwrap_or(0) } /// Returns the number of unspent triples assigned to this node. pub async fn count_mine(&self) -> usize { - self.triple_storage.write().await.count_mine().unwrap_or(0) + self.triple_storage + .write() + .await + .count_mine() + .await + .unwrap_or(0) } /// Returns if there's any unspent triple in the manager. diff --git a/chain-signatures/node/src/storage/presignature_storage.rs b/chain-signatures/node/src/storage/presignature_storage.rs index c820167b..e955dacc 100644 --- a/chain-signatures/node/src/storage/presignature_storage.rs +++ b/chain-signatures/node/src/storage/presignature_storage.rs @@ -1,10 +1,10 @@ use std::sync::Arc; use anyhow::Ok; +use deadpool_redis::Pool; use near_sdk::AccountId; -use redis::{Commands, Connection, FromRedisValue, RedisWrite, ToRedisArgs}; +use redis::{AsyncCommands, FromRedisValue, RedisWrite, ToRedisArgs}; use tokio::sync::RwLock; -use url::Url; use crate::protocol::presignature::{Presignature, PresignatureId}; @@ -14,98 +14,99 @@ pub type LockPresignatureRedisStorage = Arc>; // Can be used to "clear" redis storage in case of a breaking change const PRESIGNATURE_STORAGE_VERSION: &str = "v1"; -pub fn init(redis_url: Url, node_account_id: &AccountId) -> PresignatureRedisStorage { - PresignatureRedisStorage::new(redis_url, node_account_id) +pub fn init(redis_pool: Pool, node_account_id: &AccountId) -> PresignatureRedisStorage { + PresignatureRedisStorage { + redis_pool, + node_account_id: node_account_id.clone(), + } } pub struct PresignatureRedisStorage { - redis_connection: Connection, + redis_pool: Pool, node_account_id: AccountId, } impl PresignatureRedisStorage { - fn new(redis_url: Url, node_account_id: &AccountId) -> Self { - Self { - redis_connection: redis::Client::open(redis_url.as_str()) - .expect("Failed to connect to Redis") - .get_connection() - .expect("Failed to get Redis connection"), - node_account_id: node_account_id.clone(), - } - } -} - -impl PresignatureRedisStorage { - pub fn insert(&mut self, presignature: Presignature) -> PresigResult<()> { - self.redis_connection + pub async fn insert(&mut self, presignature: Presignature) -> PresigResult<()> { + let mut connection = self.redis_pool.get().await?; + connection .hset::<&str, PresignatureId, Presignature, ()>( - &self.presignature_key(), + &self.presig_key(), presignature.id, presignature, - )?; + ) + .await?; Ok(()) } - pub fn insert_mine(&mut self, presignature: Presignature) -> PresigResult<()> { - self.redis_connection - .sadd::<&str, PresignatureId, ()>(&self.mine_key(), presignature.id)?; - self.insert(presignature)?; + pub async fn insert_mine(&mut self, presignature: Presignature) -> PresigResult<()> { + let mut connection = self.redis_pool.get().await?; + connection + .sadd::<&str, PresignatureId, ()>(&self.mine_key(), presignature.id) + .await?; + self.insert(presignature).await?; Ok(()) } - pub fn contains(&mut self, id: &PresignatureId) -> PresigResult { - let result: bool = self.redis_connection.hexists(self.presignature_key(), id)?; + pub async fn contains(&mut self, id: &PresignatureId) -> PresigResult { + let mut connection = self.redis_pool.get().await?; + let result: bool = connection.hexists(self.presig_key(), id).await?; Ok(result) } - pub fn contains_mine(&mut self, id: &PresignatureId) -> PresigResult { - let result: bool = self.redis_connection.sismember(self.mine_key(), id)?; + pub async fn contains_mine(&mut self, id: &PresignatureId) -> PresigResult { + let mut connection = self.redis_pool.get().await?; + let result: bool = connection.sismember(self.mine_key(), id).await?; Ok(result) } - pub fn take(&mut self, id: &PresignatureId) -> PresigResult> { - if self.contains_mine(id)? { + pub async fn take(&mut self, id: &PresignatureId) -> PresigResult> { + let mut connection = self.redis_pool.get().await?; + if self.contains_mine(id).await? { tracing::error!("Can not take mine presignature as foreign: {:?}", id); return Ok(None); } - let result: Option = - self.redis_connection.hget(self.presignature_key(), id)?; + let result: Option = connection.hget(self.presig_key(), id).await?; match result { Some(presignature) => { - self.redis_connection - .hdel::<&str, PresignatureId, ()>(&self.presignature_key(), *id)?; + connection + .hdel::<&str, PresignatureId, ()>(&self.presig_key(), *id) + .await?; Ok(Some(presignature)) } None => Ok(None), } } - pub fn take_mine(&mut self) -> PresigResult> { - let id: Option = self.redis_connection.spop(self.mine_key())?; + pub async fn take_mine(&mut self) -> PresigResult> { + let mut connection = self.redis_pool.get().await?; + let id: Option = connection.spop(self.mine_key()).await?; match id { - Some(id) => self.take(&id), + Some(id) => self.take(&id).await, None => Ok(None), } } - pub fn count_all(&mut self) -> PresigResult { - let result: usize = self.redis_connection.hlen(self.presignature_key())?; + pub async fn count_all(&mut self) -> PresigResult { + let mut connection = self.redis_pool.get().await?; + let result: usize = connection.hlen(self.presig_key()).await?; Ok(result) } - pub fn count_mine(&mut self) -> PresigResult { - let result: usize = self.redis_connection.scard(self.mine_key())?; + pub async fn count_mine(&mut self) -> PresigResult { + let mut connection = self.redis_pool.get().await?; + let result: usize = connection.scard(self.mine_key()).await?; Ok(result) } - pub fn clear(&mut self) -> PresigResult<()> { - self.redis_connection - .del::<&str, ()>(&self.presignature_key())?; - self.redis_connection.del::<&str, ()>(&self.mine_key())?; + pub async fn clear(&mut self) -> PresigResult<()> { + let mut connection = self.redis_pool.get().await?; + connection.del::<&str, ()>(&self.presig_key()).await?; + connection.del::<&str, ()>(&self.mine_key()).await?; Ok(()) } - fn presignature_key(&self) -> String { + fn presig_key(&self) -> String { format!( "presignatures:{}:{}", PRESIGNATURE_STORAGE_VERSION, self.node_account_id diff --git a/chain-signatures/node/src/storage/triple_storage.rs b/chain-signatures/node/src/storage/triple_storage.rs index f24039de..98ef1418 100644 --- a/chain-signatures/node/src/storage/triple_storage.rs +++ b/chain-signatures/node/src/storage/triple_storage.rs @@ -1,11 +1,11 @@ use crate::protocol::triple::{Triple, TripleId}; use std::sync::Arc; -use redis::{Commands, Connection, FromRedisValue, RedisWrite, ToRedisArgs}; +use deadpool_redis::Pool; +use redis::{AsyncCommands, FromRedisValue, RedisWrite, ToRedisArgs}; use tokio::sync::RwLock; use near_account_id::AccountId; -use url::Url; pub type LockTripleRedisStorage = Arc>; type TripleResult = std::result::Result; @@ -13,89 +13,88 @@ type TripleResult = std::result::Result; // Can be used to "clear" redis storage in case of a breaking change const TRIPLE_STORAGE_VERSION: &str = "v1"; -pub fn init(redis_url: Url, account_id: &AccountId) -> TripleRedisStorage { - TripleRedisStorage::new(redis_url, account_id) +pub fn init(redis_pool: Pool, account_id: &AccountId) -> TripleRedisStorage { + TripleRedisStorage { + redis_pool, + node_account_id: account_id.clone(), + } } pub struct TripleRedisStorage { - redis_connection: Connection, + redis_pool: Pool, node_account_id: AccountId, } impl TripleRedisStorage { - fn new(redis_url: Url, account_id: &AccountId) -> Self { - Self { - redis_connection: redis::Client::open(redis_url.as_str()) - .expect("Failed to connect to Redis") - .get_connection() - .expect("Failed to get Redis connection"), - node_account_id: account_id.clone(), - } - } - - pub fn insert(&mut self, triple: Triple) -> TripleResult<()> { - self.redis_connection.hset::<&str, TripleId, Triple, ()>( - &self.triple_key(), - triple.id, - triple, - )?; + pub async fn insert(&mut self, triple: Triple) -> TripleResult<()> { + let mut conn = self.redis_pool.get().await?; + conn.hset::<&str, TripleId, Triple, ()>(&self.triple_key(), triple.id, triple) + .await?; Ok(()) } - pub fn insert_mine(&mut self, triple: Triple) -> TripleResult<()> { - self.redis_connection - .sadd::<&str, TripleId, ()>(&self.mine_key(), triple.id)?; - self.insert(triple)?; + pub async fn insert_mine(&mut self, triple: Triple) -> TripleResult<()> { + let mut conn = self.redis_pool.get().await?; + conn.sadd::<&str, TripleId, ()>(&self.mine_key(), triple.id) + .await?; + self.insert(triple).await?; Ok(()) } - pub fn contains(&mut self, id: &TripleId) -> TripleResult { - let result: bool = self.redis_connection.hexists(self.triple_key(), id)?; + pub async fn contains(&mut self, id: &TripleId) -> TripleResult { + let mut conn = self.redis_pool.get().await?; + let result: bool = conn.hexists(self.triple_key(), id).await?; Ok(result) } - pub fn contains_mine(&mut self, id: &TripleId) -> TripleResult { - let result: bool = self.redis_connection.sismember(self.mine_key(), id)?; + pub async fn contains_mine(&mut self, id: &TripleId) -> TripleResult { + let mut conn = self.redis_pool.get().await?; + let result: bool = conn.sismember(self.mine_key(), id).await?; Ok(result) } - pub fn take(&mut self, id: &TripleId) -> TripleResult> { - if self.contains_mine(id)? { + pub async fn take(&mut self, id: &TripleId) -> TripleResult> { + let mut conn = self.redis_pool.get().await?; + if self.contains_mine(id).await? { tracing::error!("Can not take mine triple as foreign: {:?}", id); return Ok(None); } - let result: Option = self.redis_connection.hget(self.triple_key(), id)?; + let result: Option = conn.hget(self.triple_key(), id).await?; match result { Some(triple) => { - self.redis_connection - .hdel::<&str, TripleId, ()>(&self.triple_key(), *id)?; + conn.hdel::<&str, TripleId, ()>(&self.triple_key(), *id) + .await?; Ok(Some(triple)) } None => Ok(None), } } - pub fn take_mine(&mut self) -> TripleResult> { - let id: Option = self.redis_connection.spop(self.mine_key())?; + pub async fn take_mine(&mut self) -> TripleResult> { + let mut conn = self.redis_pool.get().await?; + let id: Option = conn.spop(self.mine_key()).await?; match id { - Some(id) => self.take(&id), + Some(id) => self.take(&id).await, None => Ok(None), } } - pub fn count_all(&mut self) -> TripleResult { - let result: usize = self.redis_connection.hlen(self.triple_key())?; + pub async fn count_all(&mut self) -> TripleResult { + let mut conn = self.redis_pool.get().await?; + let result: usize = conn.hlen(self.triple_key()).await?; Ok(result) } - pub fn count_mine(&mut self) -> TripleResult { - let result: usize = self.redis_connection.scard(self.mine_key())?; + pub async fn count_mine(&mut self) -> TripleResult { + let mut conn = self.redis_pool.get().await?; + let result: usize = conn.scard(self.mine_key()).await?; Ok(result) } - pub fn clear(&mut self) -> TripleResult<()> { - self.redis_connection.del::<&str, ()>(&self.triple_key())?; - self.redis_connection.del::<&str, ()>(&self.mine_key())?; + pub async fn clear(&mut self) -> TripleResult<()> { + let mut conn = self.redis_pool.get().await?; + conn.del::<&str, ()>(&self.triple_key()).await?; + conn.del::<&str, ()>(&self.mine_key()).await?; Ok(()) } diff --git a/integration-tests/chain-signatures/Cargo.lock b/integration-tests/chain-signatures/Cargo.lock index 6f2d03ed..67e02718 100644 --- a/integration-tests/chain-signatures/Cargo.lock +++ b/integration-tests/chain-signatures/Cargo.lock @@ -377,9 +377,9 @@ checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" [[package]] name = "async-trait" -version = "0.1.80" +version = "0.1.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca" +checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", @@ -1397,7 +1397,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" dependencies = [ "bytes", + "futures-core", "memchr", + "pin-project-lite", + "tokio", + "tokio-util", ] [[package]] @@ -1713,6 +1717,36 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "deadpool" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6541a3916932fe57768d4be0b1ffb5ec7cbf74ca8c903fdfd5c0fe8aa958f0ed" +dependencies = [ + "deadpool-runtime", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-redis" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfae6799b68a735270e4344ee3e834365f707c72da09c9a8bb89b45cc3351395" +dependencies = [ + "deadpool", + "redis", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" +dependencies = [ + "tokio", +] + [[package]] name = "debugid" version = "0.7.3" @@ -3284,6 +3318,7 @@ dependencies = [ "cait-sith", "clap", "crypto-shared", + "deadpool-redis", "ecdsa 0.16.9", "elliptic-curve 0.13.8", "ethers-core", @@ -3715,6 +3750,7 @@ dependencies = [ "chrono", "clap", "crypto-shared", + "deadpool-redis", "google-datastore1", "google-secretmanager1", "hex", @@ -5387,13 +5423,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc6baebe319ef5e4b470f248335620098d1c2e9261e995be05f56f719ca4bdb2" dependencies = [ "arc-swap", + "async-trait", + "bytes", "combine", + "futures-util", "itoa", "num-bigint 0.4.6", "percent-encoding 2.3.1", + "pin-project-lite", "ryu", "sha1_smol", "socket2 0.5.7", + "tokio", + "tokio-util", "url 2.5.1", ] diff --git a/integration-tests/chain-signatures/Cargo.toml b/integration-tests/chain-signatures/Cargo.toml index 0712fba4..1b80084b 100644 --- a/integration-tests/chain-signatures/Cargo.toml +++ b/integration-tests/chain-signatures/Cargo.toml @@ -28,6 +28,7 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } thiserror = "1" url = { version = "2.4.0", features = ["serde"] } +deadpool-redis = "0.18.0" # crypto dependencies cait-sith = { git = "https://github.com/LIT-Protocol/cait-sith.git", features = [ diff --git a/integration-tests/chain-signatures/src/lib.rs b/integration-tests/chain-signatures/src/lib.rs index 328061cf..028f203a 100644 --- a/integration-tests/chain-signatures/src/lib.rs +++ b/integration-tests/chain-signatures/src/lib.rs @@ -3,6 +3,7 @@ pub mod execute; pub mod local; pub mod utils; +use deadpool_redis::Pool; use std::collections::HashMap; use self::local::NodeConfig; @@ -25,7 +26,6 @@ use near_workspaces::types::{KeyType, SecretKey}; use near_workspaces::{Account, AccountId, Contract, Worker}; use serde_json::json; use testcontainers::{Container, GenericImage}; -use url::Url; const NETWORK: &str = "mpc_it_network"; @@ -158,10 +158,10 @@ impl Nodes<'_> { pub async fn triple_storage( &self, - redis_url: Url, + redis_pool: Pool, account_id: &AccountId, ) -> TripleRedisStorage { - storage::triple_storage::init(redis_url, account_id) + storage::triple_storage::init(redis_pool, account_id) } pub async fn gcp_services(&self) -> anyhow::Result> { diff --git a/integration-tests/chain-signatures/tests/cases/mod.rs b/integration-tests/chain-signatures/tests/cases/mod.rs index 36007a21..c3daac37 100644 --- a/integration-tests/chain-signatures/tests/cases/mod.rs +++ b/integration-tests/chain-signatures/tests/cases/mod.rs @@ -8,6 +8,7 @@ use cait_sith::protocol::Participant; use cait_sith::triples::{TriplePub, TripleShare}; use cait_sith::PresignOutput; use crypto_shared::{self, derive_epsilon, derive_key, x_coordinate, ScalarExt}; +use deadpool_redis::Runtime; use elliptic_curve::CurveArithmetic; use integration_tests_chain_signatures::containers::{self, DockerClient}; use integration_tests_chain_signatures::MultichainConfig; @@ -217,8 +218,10 @@ async fn test_triple_persistence() -> anyhow::Result<()> { docker_client.create_network(docker_network).await?; let redis = containers::Redis::run(&docker_client, docker_network).await?; let redis_url = Url::parse(redis.internal_address.as_str())?; + let redis_cfg = deadpool_redis::Config::from_url(redis_url); + let redis_pool = redis_cfg.create_pool(Some(Runtime::Tokio1)).unwrap(); let triple_storage: storage::triple_storage::LockTripleRedisStorage = Arc::new(RwLock::new( - storage::triple_storage::init(redis_url, &AccountId::from_str("test.near").unwrap()), + storage::triple_storage::init(redis_pool.clone(), &AccountId::from_str("test.near").unwrap()), )); let mut triple_manager = TripleManager::new( @@ -304,8 +307,10 @@ async fn test_presignature_persistence() -> anyhow::Result<()> { docker_client.create_network(docker_network).await?; let redis = containers::Redis::run(&docker_client, docker_network).await?; let redis_url = Url::parse(redis.internal_address.as_str())?; + let redis_cfg = deadpool_redis::Config::from_url(redis_url); + let redis_pool = redis_cfg.create_pool(Some(Runtime::Tokio1)).unwrap(); let presignature_storage: LockPresignatureRedisStorage = Arc::new(RwLock::new( - storage::presignature_storage::init(redis_url, &AccountId::from_str("test.near").unwrap()), + storage::presignature_storage::init(redis_pool.clone(), &AccountId::from_str("test.near").unwrap()), )); let mut presignature_manager = PresignatureManager::new( Participant::from(0),