Skip to content

Commit

Permalink
use redis connection pool
Browse files Browse the repository at this point in the history
  • Loading branch information
volovyks committed Oct 29, 2024
1 parent f10bafc commit 41a46ce
Show file tree
Hide file tree
Showing 12 changed files with 265 additions and 124 deletions.
51 changes: 51 additions & 0 deletions chain-signatures/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions chain-signatures/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,5 @@ http = "1.1.0"
prometheus = { version = "0.13.3" }
once_cell = "1.13.1"
redis = "0.27.2"
deadpool-redis = "0.18.0"
sysinfo = "0.32.0"
8 changes: 6 additions & 2 deletions chain-signatures/node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::storage::presignature_storage::LockPresignatureRedisStorage;
use crate::storage::triple_storage::LockTripleRedisStorage;
use crate::{http_client, indexer, mesh, storage, web};
use clap::Parser;
use deadpool_redis::Runtime;
use local_ip_address::local_ip;
use near_account_id::AccountId;
use near_crypto::{InMemorySigner, SecretKey};
Expand Down Expand Up @@ -210,12 +211,15 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {

let redis_url: Url = Url::parse(storage_options.redis_url.as_str())?;

let redis_cfg = deadpool_redis::Config::from_url(redis_url);
let redis_pool = redis_cfg.create_pool(Some(Runtime::Tokio1)).unwrap();

let triple_storage: LockTripleRedisStorage = Arc::new(RwLock::new(
storage::triple_storage::init(redis_url.clone(), &account_id),
storage::triple_storage::init(redis_pool.clone(), &account_id),
));

let presignature_storage: LockPresignatureRedisStorage = Arc::new(RwLock::new(
storage::presignature_storage::init(redis_url, &account_id),
storage::presignature_storage::init(redis_pool.clone(), &account_id),
));

let sign_sk = sign_sk.unwrap_or_else(|| account_sk.clone());
Expand Down
4 changes: 2 additions & 2 deletions chain-signatures/node/src/protocol/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,14 +361,14 @@ impl ConsensusProtocol for WaitingForConsensusState {

// Clear triples from storage before starting the new epoch. This is necessary if the node has accumulated
// triples from previous epochs. If it was not able to clear the previous triples, we'll leave them as-is
if let Err(err) = ctx.triple_storage().write().await.clear() {
if let Err(err) = ctx.triple_storage().write().await.clear().await {
tracing::error!(
?err,
"failed to clear triples from storage on new epoch start"
);
}

if let Err(err) = ctx.presignature_storage().write().await.clear() {
if let Err(err) = ctx.presignature_storage().write().await.clear().await {
tracing::error!(
?err,
"failed to clear presignatures from storage on new epoch start"
Expand Down
33 changes: 23 additions & 10 deletions chain-signatures/node/src/protocol/presignature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,13 @@ impl PresignatureManager {
tracing::info!(id = ?presignature.id, "inserting presignature");
// Remove from taken list if it was there
self.gc.remove(&presignature.id);
if let Err(e) = self.presignature_storage.write().await.insert(presignature) {
if let Err(e) = self
.presignature_storage
.write()
.await
.insert(presignature)
.await
{
tracing::error!(?e, "failed to insert presignature");
}
}
Expand All @@ -203,6 +209,7 @@ impl PresignatureManager {
.write()
.await
.insert_mine(presignature)
.await
{
tracing::error!(?e, "failed to insert mine presignature");
}
Expand All @@ -214,6 +221,7 @@ impl PresignatureManager {
.write()
.await
.contains(id)
.await
.map_err(|e| {
tracing::warn!(?e, "failed to check if presignature exist");
})
Expand All @@ -226,22 +234,24 @@ impl PresignatureManager {
.write()
.await
.contains_mine(id)
.await
.map_err(|e| {
tracing::warn!(?e, "failed to check if mine presignature exist");
})
.unwrap_or(false)
}

pub async fn take(&mut self, id: PresignatureId) -> Result<Presignature, GenerationError> {
if let Some(presignature) =
self.presignature_storage
.write()
.await
.take(&id)
.map_err(|e| {
tracing::error!(?e, "failed to look for presignature");
GenerationError::PresignatureIsMissing(id)
})?
if let Some(presignature) = self
.presignature_storage
.write()
.await
.take(&id)
.await
.map_err(|e| {
tracing::error!(?e, "failed to look for presignature");
GenerationError::PresignatureIsMissing(id)
})?
{
self.gc.insert(id, Instant::now());
tracing::info!(id, "took presignature");
Expand All @@ -266,6 +276,7 @@ impl PresignatureManager {
.write()
.await
.take_mine()
.await
.map_err(|e| {
tracing::error!(?e, "failed to look for mine presignature");
})
Expand All @@ -283,6 +294,7 @@ impl PresignatureManager {
.write()
.await
.count_all()
.await
.map_err(|e| {
tracing::error!(?e, "failed to count all presignatures");
})
Expand All @@ -295,6 +307,7 @@ impl PresignatureManager {
.write()
.await
.count_mine()
.await
.map_err(|e| {
tracing::error!(?e, "failed to count mine presignatures");
})
Expand Down
48 changes: 36 additions & 12 deletions chain-signatures/node/src/protocol/triple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,15 @@ impl TripleManager {
pub async fn insert(&mut self, triple: Triple) {
tracing::info!(id = triple.id, "inserting triple");
self.gc.remove(&triple.id);
if let Err(e) = self.triple_storage.write().await.insert(triple) {
if let Err(e) = self.triple_storage.write().await.insert(triple).await {
tracing::warn!(?e, "failed to insert triple");
}
}

pub async fn insert_mine(&mut self, triple: Triple) {
tracing::debug!(id = triple.id, "inserting mine triple");
self.gc.remove(&triple.id);
if let Err(e) = self.triple_storage.write().await.insert_mine(triple) {
if let Err(e) = self.triple_storage.write().await.insert_mine(triple).await {
tracing::warn!(?e, "failed to insert mine triple");
}
}
Expand All @@ -165,6 +165,7 @@ impl TripleManager {
.write()
.await
.contains(id)
.await
.map_err(|e| tracing::warn!(?e, "failed to check if triple exists"))
.unwrap_or(false)
}
Expand All @@ -174,6 +175,7 @@ impl TripleManager {
.write()
.await
.contains_mine(id)
.await
.map_err(|e| tracing::warn!(?e, "failed to check if mine triple exists"))
.unwrap_or(false)
}
Expand All @@ -187,7 +189,7 @@ impl TripleManager {
id0: TripleId,
id1: TripleId,
) -> Result<(Triple, Triple), GenerationError> {
let triple_0 = match self.triple_storage.write().await.take(&id0) {
let triple_0 = match self.triple_storage.write().await.take(&id0).await {
Ok(Some(triple)) => triple,
Ok(None) => {
if self.generators.contains_key(&id0) {
Expand All @@ -207,10 +209,10 @@ impl TripleManager {
}
};

let triple_1 = match self.triple_storage.write().await.take(&id1) {
let triple_1 = match self.triple_storage.write().await.take(&id1).await {
Ok(Some(triple)) => triple,
Ok(None) => {
if let Err(e) = self.triple_storage.write().await.insert(triple_0) {
if let Err(e) = self.triple_storage.write().await.insert(triple_0).await {
tracing::warn!(id0, ?e, "failed to insert triple back");
}
if self.generators.contains_key(&id1) {
Expand All @@ -226,7 +228,7 @@ impl TripleManager {
}
Err(e) => {
tracing::warn!(id1, ?e, "failed to take triple");
if let Err(e) = self.triple_storage.write().await.insert(triple_0) {
if let Err(e) = self.triple_storage.write().await.insert(triple_0).await {
tracing::warn!(id0, ?e, "failed to insert triple back");
}
return Err(GenerationError::TripleIsMissing(id1));
Expand All @@ -249,7 +251,7 @@ impl TripleManager {
tracing::warn!("not enough mine triples");
return None;
}
let triple_0 = match self.triple_storage.write().await.take_mine() {
let triple_0 = match self.triple_storage.write().await.take_mine().await {
Ok(Some(triple)) => triple,
Ok(None) => {
tracing::warn!("no mine triple left");
Expand All @@ -261,18 +263,30 @@ impl TripleManager {
}
};

let triple_1 = match self.triple_storage.write().await.take_mine() {
let triple_1 = match self.triple_storage.write().await.take_mine().await {
Ok(Some(triple)) => triple,
Ok(None) => {
if let Err(e) = self.triple_storage.write().await.insert_mine(triple_0) {
if let Err(e) = self
.triple_storage
.write()
.await
.insert_mine(triple_0)
.await
{
tracing::warn!(?e, "failed to insert mine triple back");
}
tracing::warn!("no mine triple left");
return None;
}
Err(e) => {
tracing::warn!(?e, "failed to take mine triple");
if let Err(e) = self.triple_storage.write().await.insert_mine(triple_0) {
if let Err(e) = self
.triple_storage
.write()
.await
.insert_mine(triple_0)
.await
{
tracing::warn!(?e, "failed to insert mine triple back");
}
return None;
Expand All @@ -289,12 +303,22 @@ impl TripleManager {

/// Returns the number of unspent triples available in the manager.
pub async fn count_all(&self) -> usize {
self.triple_storage.write().await.count_all().unwrap_or(0)
self.triple_storage
.write()
.await
.count_all()
.await
.unwrap_or(0)
}

/// Returns the number of unspent triples assigned to this node.
pub async fn count_mine(&self) -> usize {
self.triple_storage.write().await.count_mine().unwrap_or(0)
self.triple_storage
.write()
.await
.count_mine()
.await
.unwrap_or(0)
}

/// Returns if there's any unspent triple in the manager.
Expand Down
Loading

0 comments on commit 41a46ce

Please sign in to comment.