Skip to content

Commit

Permalink
chore(boundary): make salt-fetching schedule strictly periodic (#4032)
Browse files Browse the repository at this point in the history
Co-authored-by: IDX GitLab Automation <idx@dfinity.org>
  • Loading branch information
nikolay-komarevskiy and sa-idx-admin authored Feb 21, 2025
1 parent e3d3eb8 commit a5d225c
Showing 1 changed file with 81 additions and 60 deletions.
141 changes: 81 additions & 60 deletions rs/boundary_node/ic_boundary/src/salt_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use prometheus::{
register_int_counter_vec_with_registry, register_int_gauge_with_registry, IntCounterVec,
IntGauge, Registry,
};
use salt_sharing_api::GetSaltResponse;
use salt_sharing_api::{GetSaltError, GetSaltResponse};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{sync::Arc, time::Duration};
use tokio::time::sleep;
use tokio::time::{interval, MissedTickBehavior};
use tracing::warn;

const SERVICE: &str = "AnonymizationSaltFetcher";
Expand Down Expand Up @@ -54,7 +54,7 @@ impl Metrics {
fetches: register_int_counter_vec_with_registry!(
format!("{METRIC_PREFIX}_fetches"),
format!("Count of salt fetches and their outcome"),
&["result"],
&["status", "message"],
registry
)
.unwrap(),
Expand Down Expand Up @@ -86,70 +86,91 @@ impl AnonymizationSaltFetcher {
metrics: Metrics::new(registry),
}
}
}

#[async_trait]
impl Run for Arc<AnonymizationSaltFetcher> {
async fn run(&mut self) -> Result<(), Error> {
loop {
let query_response = match self
.agent
.execute_update(
&self.canister_id,
&self.canister_id,
"get_salt",
Encode!().unwrap(),
nonce(),
)
.await
{
Ok(response) => match response {
Some(response) => response,
None => {
warn!("{SERVICE}: got empty response from the canister");
continue;
}
},
Err(err) => {
warn!("{SERVICE}: failed to get salt from the canister: {err:#}");
continue;
}
};
async fn fetch_salt(&self) {
let update_fetch_metric = |status: &str, message: &str| {
self.metrics
.fetches
.with_label_values(&[status, message])
.inc();
};

let salt_response = match Decode!(&query_response, GetSaltResponse) {
Ok(response) => response,
Err(err) => {
warn!("{SERVICE}: failed to decode candid response: {err:?}");
continue;
let query_response = match self
.agent
.execute_update(
&self.canister_id,
&self.canister_id,
"get_salt",
Encode!().unwrap(),
nonce(),
)
.await
{
Ok(response) => match response {
Some(response) => response,
None => {
update_fetch_metric("failure", "empty_response");
warn!("{SERVICE}: got empty response from the canister");
return;
}
};
},
Err(err) => {
update_fetch_metric("failure", "update_call_failure");
warn!("{SERVICE}: failed to get salt from the canister: {err:#}");
return;
}
};

let status = if salt_response.is_ok() {
"success"
} else {
"failure"
};
self.metrics.fetches.with_label_values(&[status]).inc();
let salt_response = match Decode!(&query_response, GetSaltResponse) {
Ok(response) => response,
Err(err) => {
update_fetch_metric("failure", "response_decoding_failure");
warn!("{SERVICE}: failed to decode candid response: {err:?}");
return;
}
};

match salt_response {
Ok(resp) => {
// Overwrite salt used for hashing sensitive data
self.anonymization_salt.store(Some(Arc::new(resp.salt)));
// Update metrics
self.metrics.last_salt_id.set(resp.salt_id as i64);
self.metrics.last_successful_fetch.set(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as i64,
);
}
Err(err) => {
warn!("{SERVICE}: get_salt failed: {err:?}");
}
match salt_response {
Ok(resp) => {
update_fetch_metric("success", "");
// Overwrite salt (used for hashing sensitive data)
self.anonymization_salt.store(Some(Arc::new(resp.salt)));
// Update metrics
self.metrics.last_salt_id.set(resp.salt_id as i64);
self.metrics.last_successful_fetch.set(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as i64,
);
}
Err(err) => {
let message = match err {
GetSaltError::SaltNotInitialized => "salt_not_initialized",
GetSaltError::Unauthorized => "unauthorized",
GetSaltError::Internal(_) => "internal",
};
update_fetch_metric("failure", message);
warn!("{SERVICE}: get_salt failed: {err:?}");
}
}
}
}

sleep(self.polling_interval).await;
#[async_trait]
impl Run for Arc<AnonymizationSaltFetcher> {
async fn run(&mut self) -> Result<(), Error> {
// Create an interval to enable strictly periodic execution
let mut interval = interval(self.polling_interval);
// Skip missed ticks to prevent timing drift and maintain absolute schedule
// Example: with 5s interval, if fetch_salt() takes 7s at 0s:
// 0s: first execution starts
// 7s: first execution completes (5s tick was missed)
// 10s: next execution starts (skips to next absolute tick)
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
interval.tick().await;
self.fetch_salt().await;
}
}
}

0 comments on commit a5d225c

Please sign in to comment.