From b86d1ab832166a5f5a70c017b0cd221adcdca867 Mon Sep 17 00:00:00 2001 From: Matthew Di Ferrante Date: Mon, 3 Mar 2025 18:51:15 +0100 Subject: [PATCH 1/2] use log --- worker/src/operations/heartbeat/service.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker/src/operations/heartbeat/service.rs b/worker/src/operations/heartbeat/service.rs index 4032c0a..5b3ded5 100644 --- a/worker/src/operations/heartbeat/service.rs +++ b/worker/src/operations/heartbeat/service.rs @@ -94,10 +94,10 @@ impl HeartbeatService { match Self::send_heartbeat(&client, state.get_endpoint().await, wallet_clone.clone(), docker_service.clone(), metrics_store.clone()).await { Ok(_) => { state.update_last_heartbeat().await; - Console::success("Synced with orchestrator"); // Updated message to reflect sync + log::info!("Synced with orchestrator"); // Updated message to reflect sync } Err(e) => { - Console::error(&format!("Failed to sync with orchestrator: {:?}", e)); // Updated error message + log::error!("{}", &format!("Failed to sync with orchestrator: {:?}", e)); // Updated error message } } } From bea279a5ad4d7fca811d0b802dcc886d6ce4cd6c Mon Sep 17 00:00:00 2001 From: Matthew Di Ferrante Date: Mon, 3 Mar 2025 21:43:35 +0100 Subject: [PATCH 2/2] add state persistence, check at most last 24h --- Cargo.lock | 34 +++++--- validator/Cargo.toml | 5 ++ validator/src/main.rs | 4 +- validator/src/validators/synthetic_data.rs | 97 +++++++++++++++++++++- 4 files changed, 125 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 63e8a8e..bb60848 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -368,7 +368,7 @@ dependencies = [ "itoa", "serde", "serde_json", - "winnow", + "winnow 0.6.24", ] [[package]] @@ -799,7 +799,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1eda2711ab2e1fb517fc6e2ffa9728c9a232e296d16810810e6957b781a1b8bc" dependencies = [ "serde", - "winnow", + "winnow 0.6.24", ] [[package]] @@ -5719,9 +5719,9 @@ dependencies = [ [[package]] name = "toml" -version = "0.8.19" +version = "0.8.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1ed1f98e3fdc28d6d910e6737ae6ab1a93bf1985935a1193e68f93eeb68d24e" +checksum = "cd87a5cdd6ffab733b2f74bc4fd7ee5fff6634124999ac278c35fc78c6120148" dependencies = [ "serde", "serde_spanned", @@ -5740,15 +5740,15 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.22.22" +version = "0.22.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ae48d6208a266e853d946088ed816055e556cc6028c5e8e2b84d9fa5dd7c7f5" +checksum = "17b4795ff5edd201c7cd6dca065ae59972ce77d1b80fa0a84d94950ece7d1474" dependencies = [ "indexmap 2.7.0", "serde", "serde_spanned", "toml_datetime", - "winnow", + "winnow 0.7.3", ] [[package]] @@ -5975,6 +5975,7 @@ dependencies = [ "alloy", "anyhow", "clap 4.5.27", + "directories", "env_logger 0.11.6", "log", "nalgebra", @@ -5983,7 +5984,9 @@ dependencies = [ "serde", "serde_json", "shared", + "tempfile", "tokio", + "toml", "url", ] @@ -6571,6 +6574,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "winnow" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7f4ea97f6f78012141bcdb6a216b2609f0979ada50b20ca5b52dde2eac2bb1" +dependencies = [ + "memchr", +] + [[package]] name = "wit-bindgen-rt" version = "0.33.0" @@ -6750,7 +6762,7 @@ dependencies = [ "tracing", "uds_windows", "windows-sys 0.59.0", - "winnow", + "winnow 0.6.24", "xdg-home", "zbus_macros", "zbus_names", @@ -6780,7 +6792,7 @@ checksum = "856b7a38811f71846fd47856ceee8bccaec8399ff53fb370247e66081ace647b" dependencies = [ "serde", "static_assertions", - "winnow", + "winnow 0.6.24", "zvariant", ] @@ -6926,7 +6938,7 @@ dependencies = [ "enumflags2", "serde", "static_assertions", - "winnow", + "winnow 0.6.24", "zvariant_derive", "zvariant_utils", ] @@ -6955,5 +6967,5 @@ dependencies = [ "serde", "static_assertions", "syn 2.0.92", - "winnow", + "winnow 0.6.24", ] diff --git a/validator/Cargo.toml b/validator/Cargo.toml index 5b22a7c..280c5a4 100644 --- a/validator/Cargo.toml +++ b/validator/Cargo.toml @@ -8,6 +8,7 @@ actix-web = "4.9.0" alloy = { version = "0.9.2", features = ["full"] } anyhow = "1.0.95" clap = { version = "4.5.26", features = ["derive"] } +directories = "6.0.0" env_logger = "0.11.6" log = "0.4.25" nalgebra = "0.33.2" @@ -17,4 +18,8 @@ serde = "1.0.217" serde_json = "1.0.135" shared = { path = "../shared" } tokio = "1.43.0" +toml = "0.8.20" url = "2.5.4" + +[dev-dependencies] +tempfile = "=3.14.0" diff --git a/validator/src/main.rs b/validator/src/main.rs index 27a8add..f73acc1 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -93,7 +93,7 @@ fn main() { let pool_id = args.pool_id.clone(); let mut synthetic_validator = match contracts.synthetic_data_validator.clone() { - Some(validator) => SyntheticDataValidator::new(pool_id.unwrap(), validator), + Some(validator) => SyntheticDataValidator::new(None, pool_id.unwrap(), validator), None => { error!("Synthetic data validator not found"); std::process::exit(1); @@ -178,7 +178,7 @@ fn main() { #[cfg(test)] mod tests { - + use actix_web::{test, App}; use actix_web::{ web::{self, post}, diff --git a/validator/src/validators/synthetic_data.rs b/validator/src/validators/synthetic_data.rs index 08f2ad4..d9e342e 100644 --- a/validator/src/validators/synthetic_data.rs +++ b/validator/src/validators/synthetic_data.rs @@ -1,14 +1,37 @@ use alloy::primitives::U256; use anyhow::{Context, Result}; +use directories::ProjectDirs; +use log::debug; use log::{error, info}; +use serde::{Deserialize, Serialize}; +use std::fs; +use std::path::Path; +use std::path::PathBuf; +use toml; use crate::validators::Validator; use shared::web3::contracts::implementations::work_validators::synthetic_data_validator::SyntheticDataWorkValidator; +fn get_default_state_dir() -> Option { + ProjectDirs::from("com", "prime", "validator") + .map(|proj_dirs| proj_dirs.data_local_dir().to_string_lossy().into_owned()) +} + +fn state_filename(pool_id: &str) -> String { + format!("work_state_{}.toml", pool_id) +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct PersistedWorkState { + pool_id: U256, + last_validation_timestamp: U256, +} + pub struct SyntheticDataValidator { pool_id: U256, validator: SyntheticDataWorkValidator, last_validation_timestamp: U256, + state_dir: Option, } impl Validator for SyntheticDataValidator { @@ -20,16 +43,84 @@ impl Validator for SyntheticDataValidator { } impl SyntheticDataValidator { - pub fn new(pool_id_str: String, validator: SyntheticDataWorkValidator) -> Self { + pub fn new( + state_dir: Option, + pool_id_str: String, + validator: SyntheticDataWorkValidator, + ) -> Self { let pool_id = pool_id_str.parse::().expect("Invalid pool ID"); + let default_state_dir = get_default_state_dir(); + debug!("Default state dir: {:?}", default_state_dir); + let state_path = state_dir + .map(PathBuf::from) + .or_else(|| default_state_dir.map(PathBuf::from)); + debug!("State path: {:?}", state_path); + let mut last_validation_timestamp: Option<_> = None; + + // Try to load state, log info if creating new file + if let Some(path) = &state_path { + let state_file = path.join(state_filename(&pool_id.to_string())); + if !state_file.exists() { + debug!( + "No state file found at {:?}, will create on first state change", + state_file + ); + } else if let Ok(Some(loaded_state)) = + SyntheticDataValidator::load_state(path, &pool_id.to_string()) + { + debug!("Loaded previous state from {:?}", state_file); + last_validation_timestamp = Some(loaded_state.last_validation_timestamp); + } else { + debug!("Failed to load state from {:?}", state_file); + } + } + + // if no last time, set it to 24 hours ago, as nothing before that can be invalidated + if last_validation_timestamp.is_none() { + last_validation_timestamp = Some(U256::from( + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("Failed to get current timestamp") + .as_secs() + .saturating_sub(24 * 60 * 60), + )); + } Self { pool_id, validator, - last_validation_timestamp: U256::from(0), + last_validation_timestamp: last_validation_timestamp.unwrap(), + state_dir: state_path.clone(), } } + fn save_state(&self) -> Result<()> { + // Get values without block_on + let state = PersistedWorkState { + pool_id: self.pool_id, + last_validation_timestamp: self.last_validation_timestamp, + }; + + if let Some(ref state_dir) = self.state_dir { + fs::create_dir_all(state_dir)?; + let state_path = state_dir.join(state_filename(&self.pool_id.to_string())); + let toml = toml::to_string_pretty(&state)?; + fs::write(&state_path, toml)?; + debug!("Saved state to {:?}", state_path); + } + Ok(()) + } + + fn load_state(state_dir: &Path, pool_id: &str) -> Result> { + let state_path = state_dir.join(state_filename(pool_id)); + if state_path.exists() { + let contents = fs::read_to_string(state_path)?; + let state: PersistedWorkState = toml::from_str(&contents)?; + return Ok(Some(state)); + } + Ok(None) + } + pub async fn validate_work(&mut self) -> Result<()> { info!("Validating work for pool ID: {:?}", self.pool_id); @@ -70,6 +161,8 @@ impl SyntheticDataValidator { .as_secs(), ); + self.save_state()?; + Ok(()) } }