Skip to content

Commit a9cd738

Browse files
authored
Merge pull request #133 from PrimeIntellect-ai/fix/optimize-validator
fix/optimize-validator
2 parents 29948a7 + bea279a commit a9cd738

File tree

5 files changed

+127
-17
lines changed

5 files changed

+127
-17
lines changed

Cargo.lock

+23-11
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

validator/Cargo.toml

+5
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ actix-web = "4.9.0"
88
alloy = { version = "0.9.2", features = ["full"] }
99
anyhow = "1.0.95"
1010
clap = { version = "4.5.26", features = ["derive"] }
11+
directories = "6.0.0"
1112
env_logger = "0.11.6"
1213
log = "0.4.25"
1314
nalgebra = "0.33.2"
@@ -17,4 +18,8 @@ serde = "1.0.217"
1718
serde_json = "1.0.135"
1819
shared = { path = "../shared" }
1920
tokio = "1.43.0"
21+
toml = "0.8.20"
2022
url = "2.5.4"
23+
24+
[dev-dependencies]
25+
tempfile = "=3.14.0"

validator/src/main.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ fn main() {
9393

9494
let pool_id = args.pool_id.clone();
9595
let mut synthetic_validator = match contracts.synthetic_data_validator.clone() {
96-
Some(validator) => SyntheticDataValidator::new(pool_id.unwrap(), validator),
96+
Some(validator) => SyntheticDataValidator::new(None, pool_id.unwrap(), validator),
9797
None => {
9898
error!("Synthetic data validator not found");
9999
std::process::exit(1);
@@ -178,7 +178,7 @@ fn main() {
178178

179179
#[cfg(test)]
180180
mod tests {
181-
181+
182182
use actix_web::{test, App};
183183
use actix_web::{
184184
web::{self, post},

validator/src/validators/synthetic_data.rs

+95-2
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,37 @@
11
use alloy::primitives::U256;
22
use anyhow::{Context, Result};
3+
use directories::ProjectDirs;
4+
use log::debug;
35
use log::{error, info};
6+
use serde::{Deserialize, Serialize};
7+
use std::fs;
8+
use std::path::Path;
9+
use std::path::PathBuf;
10+
use toml;
411

512
use crate::validators::Validator;
613
use shared::web3::contracts::implementations::work_validators::synthetic_data_validator::SyntheticDataWorkValidator;
714

15+
fn get_default_state_dir() -> Option<String> {
16+
ProjectDirs::from("com", "prime", "validator")
17+
.map(|proj_dirs| proj_dirs.data_local_dir().to_string_lossy().into_owned())
18+
}
19+
20+
fn state_filename(pool_id: &str) -> String {
21+
format!("work_state_{}.toml", pool_id)
22+
}
23+
24+
#[derive(Debug, Clone, Serialize, Deserialize)]
25+
struct PersistedWorkState {
26+
pool_id: U256,
27+
last_validation_timestamp: U256,
28+
}
29+
830
pub struct SyntheticDataValidator {
931
pool_id: U256,
1032
validator: SyntheticDataWorkValidator,
1133
last_validation_timestamp: U256,
34+
state_dir: Option<PathBuf>,
1235
}
1336

1437
impl Validator for SyntheticDataValidator {
@@ -20,16 +43,84 @@ impl Validator for SyntheticDataValidator {
2043
}
2144

2245
impl SyntheticDataValidator {
23-
pub fn new(pool_id_str: String, validator: SyntheticDataWorkValidator) -> Self {
46+
pub fn new(
47+
state_dir: Option<String>,
48+
pool_id_str: String,
49+
validator: SyntheticDataWorkValidator,
50+
) -> Self {
2451
let pool_id = pool_id_str.parse::<U256>().expect("Invalid pool ID");
52+
let default_state_dir = get_default_state_dir();
53+
debug!("Default state dir: {:?}", default_state_dir);
54+
let state_path = state_dir
55+
.map(PathBuf::from)
56+
.or_else(|| default_state_dir.map(PathBuf::from));
57+
debug!("State path: {:?}", state_path);
58+
let mut last_validation_timestamp: Option<_> = None;
59+
60+
// Try to load state, log info if creating new file
61+
if let Some(path) = &state_path {
62+
let state_file = path.join(state_filename(&pool_id.to_string()));
63+
if !state_file.exists() {
64+
debug!(
65+
"No state file found at {:?}, will create on first state change",
66+
state_file
67+
);
68+
} else if let Ok(Some(loaded_state)) =
69+
SyntheticDataValidator::load_state(path, &pool_id.to_string())
70+
{
71+
debug!("Loaded previous state from {:?}", state_file);
72+
last_validation_timestamp = Some(loaded_state.last_validation_timestamp);
73+
} else {
74+
debug!("Failed to load state from {:?}", state_file);
75+
}
76+
}
77+
78+
// if no last time, set it to 24 hours ago, as nothing before that can be invalidated
79+
if last_validation_timestamp.is_none() {
80+
last_validation_timestamp = Some(U256::from(
81+
std::time::SystemTime::now()
82+
.duration_since(std::time::UNIX_EPOCH)
83+
.expect("Failed to get current timestamp")
84+
.as_secs()
85+
.saturating_sub(24 * 60 * 60),
86+
));
87+
}
2588

2689
Self {
2790
pool_id,
2891
validator,
29-
last_validation_timestamp: U256::from(0),
92+
last_validation_timestamp: last_validation_timestamp.unwrap(),
93+
state_dir: state_path.clone(),
3094
}
3195
}
3296

97+
fn save_state(&self) -> Result<()> {
98+
// Get values without block_on
99+
let state = PersistedWorkState {
100+
pool_id: self.pool_id,
101+
last_validation_timestamp: self.last_validation_timestamp,
102+
};
103+
104+
if let Some(ref state_dir) = self.state_dir {
105+
fs::create_dir_all(state_dir)?;
106+
let state_path = state_dir.join(state_filename(&self.pool_id.to_string()));
107+
let toml = toml::to_string_pretty(&state)?;
108+
fs::write(&state_path, toml)?;
109+
debug!("Saved state to {:?}", state_path);
110+
}
111+
Ok(())
112+
}
113+
114+
fn load_state(state_dir: &Path, pool_id: &str) -> Result<Option<PersistedWorkState>> {
115+
let state_path = state_dir.join(state_filename(pool_id));
116+
if state_path.exists() {
117+
let contents = fs::read_to_string(state_path)?;
118+
let state: PersistedWorkState = toml::from_str(&contents)?;
119+
return Ok(Some(state));
120+
}
121+
Ok(None)
122+
}
123+
33124
pub async fn validate_work(&mut self) -> Result<()> {
34125
info!("Validating work for pool ID: {:?}", self.pool_id);
35126

@@ -70,6 +161,8 @@ impl SyntheticDataValidator {
70161
.as_secs(),
71162
);
72163

164+
self.save_state()?;
165+
73166
Ok(())
74167
}
75168
}

worker/src/operations/heartbeat/service.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,10 @@ impl HeartbeatService {
9494
match Self::send_heartbeat(&client, state.get_endpoint().await, wallet_clone.clone(), docker_service.clone(), metrics_store.clone()).await {
9595
Ok(_) => {
9696
state.update_last_heartbeat().await;
97-
Console::success("Synced with orchestrator"); // Updated message to reflect sync
97+
log::info!("Synced with orchestrator"); // Updated message to reflect sync
9898
}
9999
Err(e) => {
100-
Console::error(&format!("Failed to sync with orchestrator: {:?}", e)); // Updated error message
100+
log::error!("{}", &format!("Failed to sync with orchestrator: {:?}", e)); // Updated error message
101101
}
102102
}
103103
}

0 commit comments

Comments
 (0)