From 44e73745ca52344726f084d4aed4a6d1d9eddab5 Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Mon, 16 Jun 2025 01:33:49 -0500 Subject: [PATCH 1/5] Lazer exporter fixes --- Cargo.lock | 3 +- Cargo.toml | 3 +- src/agent/services/lazer_exporter.rs | 156 +++++++++++++++------------ 3 files changed, 94 insertions(+), 68 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 56eb557..bd0dce2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3705,11 +3705,12 @@ dependencies = [ [[package]] name = "pyth-agent" -version = "3.0.1" +version = "3.0.2" dependencies = [ "anyhow", "async-trait", "backoff", + "base64 0.22.1", "bincode 2.0.1", "bytemuck", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 12f93d4..0817199 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "3.0.1" +version = "3.0.2" edition = "2024" [[bin]] @@ -10,6 +10,7 @@ path = "src/bin/agent.rs" [dependencies] anyhow = "1.0.81" backoff = "0.4.0" +base64 = "0.22.1" ed25519-dalek = "2.1.1" serde = { version = "1.0.197", features = ["derive", "rc"] } async-trait = "0.1.79" diff --git a/src/agent/services/lazer_exporter.rs b/src/agent/services/lazer_exporter.rs index ab06572..b6a07f8 100644 --- a/src/agent/services/lazer_exporter.rs +++ b/src/agent/services/lazer_exporter.rs @@ -1,6 +1,7 @@ use { crate::agent::state, anyhow::{ + Context, Result, anyhow, bail, @@ -9,6 +10,11 @@ use { ExponentialBackoffBuilder, backoff::Backoff, }, + base64::{ + Engine, + prelude::BASE64_STANDARD, + }, + ed25519_dalek::SigningKey, futures_util::{ SinkExt, stream::{ @@ -25,6 +31,7 @@ use { Deserialize, Serialize, }, + solana_sdk::signature::keypair, std::{ path::PathBuf, sync::Arc, @@ -59,18 +66,23 @@ pub const RELAYER_CHANNEL_CAPACITY: usize = 1000; #[derive(Clone, Debug, Deserialize)] pub struct Config { - pub history_url: Url, - pub relayer_urls: Vec, - pub authorization_token: String, - pub publish_keypair_path: PathBuf, + pub history_url: Url, + pub relayer_urls: Vec, + pub publish_keypair_path: PathBuf, #[serde(with = "humantime_serde", default = "default_publish_interval")] - pub publish_interval_duration: Duration, + pub publish_interval_duration: Duration, + #[serde(with = "humantime_serde", default = "default_symbol_fetch_interval")] + pub symbol_fetch_interval_duration: Duration, } fn default_publish_interval() -> Duration { Duration::from_millis(200) } +fn default_symbol_fetch_interval() -> Duration { + Duration::from_secs(60 * 60) +} + struct RelayerWsSession { ws_sender: SplitSink>, TungsteniteMessage>, } @@ -248,17 +260,44 @@ async fn fetch_symbols(history_url: &Url) -> Result> { Ok(data) } +fn get_signing_key(config: &Config) -> Result { + // Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher + let publish_keypair = match keypair::read_keypair_file(&config.publish_keypair_path) { + Ok(k) => k, + Err(e) => { + tracing::error!( + error = ?e, + publish_keypair_path = config.publish_keypair_path.display().to_string(), + "Reading publish keypair returned an error. ", + ); + bail!("Reading publish keypair returned an error. "); + } + }; + + SigningKey::from_keypair_bytes(&publish_keypair.to_bytes()) + .context("Failed to create signing key from keypair") +} + #[instrument(skip(config, state))] pub fn lazer_exporter(config: Config, state: Arc) -> Vec> { let mut handles = vec![]; + let signing_key = match get_signing_key(&config) { + Ok(signing_key) => signing_key, + Err(e) => { + // This is fatal as we can't publish without the key. + tracing::error!("failed to get Lazer signing key: {e:?}"); + panic!("failed to get Lazer signing key") + } + }; + // can safely drop first receiver for ease of iteration let (relayer_sender, _) = broadcast::channel(RELAYER_CHANNEL_CAPACITY); for url in config.relayer_urls.iter() { let mut task = RelayerSessionTask { url: url.clone(), - token: config.authorization_token.to_owned(), + token: BASE64_STANDARD.encode(signing_key.verifying_key().to_bytes()), receiver: relayer_sender.subscribe(), }; handles.push(tokio::spawn(async move { task.run().await })); @@ -268,6 +307,7 @@ pub fn lazer_exporter(config: Config, state: Arc) -> Vec Result { - // Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher - let publish_keypair = match keypair::read_keypair_file(&config.publish_keypair_path) { - Ok(k) => k, - Err(e) => { - tracing::error!( - error = ?e, - publish_keypair_path = config.publish_keypair_path.display().to_string(), - "Reading publish keypair returned an error. ", - ); - bail!("Reading publish keypair returned an error. "); - } - }; - - SigningKey::from_keypair_bytes(&publish_keypair.to_bytes()) - .context("Failed to create signing key from keypair") - } - pub async fn lazer_exporter( config: Config, state: Arc, relayer_sender: Sender, + signing_key: SigningKey, ) where S: LocalStore, S: Send + Sync + 'static, { - let signing_key = match get_signing_key(&config) { - Ok(signing_key) => signing_key, - Err(e) => { - tracing::error!("lazer_exporter signing key failure: {e:?}"); - return; - } - }; - - // TODO: Re-fetch on an interval? - let lazer_symbols: HashMap = - match fetch_symbols(&config.history_url).await { - Ok(symbols) => symbols - .into_iter() - .filter_map(|symbol| { - let hermes_id = symbol.hermes_id.clone()?; - match pyth_sdk::Identifier::from_hex(hermes_id.clone()) { - Ok(id) => Some((id, symbol)), - Err(e) => { - tracing::warn!("Failed to parse hermes_id {}: {e:?}", hermes_id); - None - } - } - }) - .collect(), - Err(e) => { - tracing::error!("Failed to fetch Lazer symbols: {e:?}"); - return; - } - }; - + let mut lazer_symbols = get_lazer_symbol_map(&config.history_url).await; let mut publish_interval = tokio::time::interval(config.publish_interval_duration); + let mut symbol_fetch_interval = + tokio::time::interval(config.symbol_fetch_interval_duration); loop { tokio::select! { + _ = symbol_fetch_interval.tick() => { + lazer_symbols = get_lazer_symbol_map(&config.history_url).await; + }, _ = publish_interval.tick() => { let publisher_timestamp = MessageField::some(Timestamp::now()); let mut publisher_update = PublisherUpdate { @@ -452,6 +446,30 @@ mod lazer_exporter { } } } + + async fn get_lazer_symbol_map( + history_url: &Url, + ) -> HashMap { + match fetch_symbols(history_url).await { + Ok(symbols) => symbols + .into_iter() + .filter_map(|symbol| { + let hermes_id = symbol.hermes_id.clone()?; + match pyth_sdk::Identifier::from_hex(hermes_id.clone()) { + Ok(id) => Some((id, symbol)), + Err(e) => { + tracing::warn!("Failed to parse hermes_id {}: {e:?}", hermes_id); + None + } + } + }) + .collect(), + Err(e) => { + tracing::error!("Failed to fetch Lazer symbols: {e:?}"); + HashMap::new() + } + } + } } #[cfg(test)] @@ -604,15 +622,21 @@ mod tests { let state = Arc::new(local::Store::new(&mut Registry::default())); let (relayer_sender, mut relayer_receiver) = broadcast::channel(RELAYER_CHANNEL_CAPACITY); let private_key_file = get_private_key_file(); + let private_key = get_private_key(); let config = Config { - history_url: Url::parse("http://127.0.0.1:12345").unwrap(), - relayer_urls: vec![Url::parse("http://127.0.0.1:12346").unwrap()], - authorization_token: "token1".to_string(), - publish_keypair_path: PathBuf::from(private_key_file.path()), - publish_interval_duration: Duration::from_secs(1), + history_url: Url::parse("http://127.0.0.1:12345").unwrap(), + relayer_urls: vec![Url::parse("http://127.0.0.1:12346").unwrap()], + publish_keypair_path: PathBuf::from(private_key_file.path()), + publish_interval_duration: Duration::from_secs(1), + symbol_fetch_interval_duration: Duration::from_secs(60 * 60), }; - tokio::spawn(lazer_exporter(config, state.clone(), relayer_sender)); + tokio::spawn(lazer_exporter( + config, + state.clone(), + relayer_sender, + private_key, + )); tokio::time::sleep(std::time::Duration::from_millis(2000)).await; match relayer_receiver.try_recv() { From a51fbd97c32d964b3911dee2e977bef143634253 Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Wed, 18 Jun 2025 07:46:53 -0500 Subject: [PATCH 2/5] version bump --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bd0dce2..f544279 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3705,7 +3705,7 @@ dependencies = [ [[package]] name = "pyth-agent" -version = "3.0.2" +version = "3.0.3" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 0817199..48900f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "3.0.2" +version = "3.0.3" edition = "2024" [[bin]] From 28f71a53fda73013744d4f7723c2882598076da0 Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Wed, 18 Jun 2025 16:11:09 -0500 Subject: [PATCH 3/5] logging fixes --- src/agent/services/lazer_exporter.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/agent/services/lazer_exporter.rs b/src/agent/services/lazer_exporter.rs index b6a07f8..506ce8d 100644 --- a/src/agent/services/lazer_exporter.rs +++ b/src/agent/services/lazer_exporter.rs @@ -157,8 +157,8 @@ impl RelayerSessionTask { failure_count += 1; let next_backoff = backoff.next_backoff().unwrap_or(max_interval); - tracing::error!( - "relayer session failed with error: {:?}, failure_count: {}; retrying in {:?}", + tracing::warn!( + "relayer session ended with error: {:?}, failure_count: {}; retrying in {:?}", e, failure_count, next_backoff @@ -211,7 +211,7 @@ impl RelayerSessionTask { tracing::error!("Error receiving message from at relayer: {e:?}"); } None => { - tracing::error!("relayer connection closed"); + tracing::warn!("relayer connection closed"); bail!("relayer connection closed"); } } @@ -290,6 +290,8 @@ pub fn lazer_exporter(config: Config, state: Arc) -> Vec) -> Vec Date: Wed, 18 Jun 2025 16:47:41 -0500 Subject: [PATCH 4/5] history http/https fix, logging --- src/agent/services/lazer_exporter.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/agent/services/lazer_exporter.rs b/src/agent/services/lazer_exporter.rs index 506ce8d..89502c0 100644 --- a/src/agent/services/lazer_exporter.rs +++ b/src/agent/services/lazer_exporter.rs @@ -252,7 +252,6 @@ struct SymbolResponse { async fn fetch_symbols(history_url: &Url) -> Result> { let mut url = history_url.clone(); - url.set_scheme("http").map_err(|_| anyhow!("invalid url"))?; url.set_path("/history/v1/symbols"); let client = Client::new(); let response = client.get(url).send().await?.error_for_status()?; @@ -369,6 +368,8 @@ mod lazer_exporter { S: Send + Sync + 'static, { let mut lazer_symbols = get_lazer_symbol_map(&config.history_url).await; + tracing::info!("Retrieved {} Lazer feeds with hermes symbols from symbols endpoint: {}", lazer_symbols.len(), &config.history_url); + let mut publish_interval = tokio::time::interval(config.publish_interval_duration); let mut symbol_fetch_interval = tokio::time::interval(config.symbol_fetch_interval_duration); From 657e6399cab4aa1bdd3262dfc0da7894b1ecb98c Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Wed, 18 Jun 2025 16:49:28 -0500 Subject: [PATCH 5/5] fmt --- src/agent/services/lazer_exporter.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/agent/services/lazer_exporter.rs b/src/agent/services/lazer_exporter.rs index 89502c0..36ba07d 100644 --- a/src/agent/services/lazer_exporter.rs +++ b/src/agent/services/lazer_exporter.rs @@ -368,7 +368,11 @@ mod lazer_exporter { S: Send + Sync + 'static, { let mut lazer_symbols = get_lazer_symbol_map(&config.history_url).await; - tracing::info!("Retrieved {} Lazer feeds with hermes symbols from symbols endpoint: {}", lazer_symbols.len(), &config.history_url); + tracing::info!( + "Retrieved {} Lazer feeds with hermes symbols from symbols endpoint: {}", + lazer_symbols.len(), + &config.history_url + ); let mut publish_interval = tokio::time::interval(config.publish_interval_duration); let mut symbol_fetch_interval =