From 0975e88dcc4be1f2df27bb3cc6c2572d03cabe27 Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Tue, 13 May 2025 01:06:47 -0500 Subject: [PATCH] fix: hermes id handling --- src/agent/services/lazer_exporter.rs | 36 ++++++++++++++++++---------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/src/agent/services/lazer_exporter.rs b/src/agent/services/lazer_exporter.rs index 7e0c6c5..92b41f1 100644 --- a/src/agent/services/lazer_exporter.rs +++ b/src/agent/services/lazer_exporter.rs @@ -4,7 +4,6 @@ use { Result, anyhow, }, - ed25519_dalek::SecretKey, futures_util::{ SinkExt, stream::{ @@ -54,14 +53,6 @@ pub struct Config { pub publish_interval_duration: Duration, } -#[derive(Clone, Deserialize)] -struct PublisherSecretKey(SecretKey); -impl std::fmt::Debug for PublisherSecretKey { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "PublisherSecretKey(redacted)") - } -} - fn default_publish_interval() -> Duration { Duration::from_millis(200) } @@ -129,16 +120,27 @@ async fn connect_to_relayers( #[derive(Deserialize)] struct SymbolResponse { pub pyth_lazer_id: u32, + #[serde(rename = "name")] pub _name: String, + #[serde(rename = "symbol")] pub _symbol: String, + #[serde(rename = "description")] pub _description: String, + #[serde(rename = "asset_type")] pub _asset_type: String, + #[serde(rename = "exponent")] pub _exponent: i16, + #[serde(rename = "cmc_id")] pub _cmc_id: Option, + #[serde(rename = "interval")] pub _interval: Option, + #[serde(rename = "min_publishers")] pub _min_publishers: u16, + #[serde(rename = "min_channel")] pub _min_channel: String, + #[serde(rename = "state")] pub _state: String, + #[serde(rename = "schedule")] pub _schedule: String, pub hermes_id: Option, } @@ -245,11 +247,20 @@ mod lazer_exporter { S: Send + Sync + 'static, { // TODO: Re-fetch on an interval? - let lazer_symbols: HashMap = + let lazer_symbols: HashMap = match fetch_symbols(&config.history_url).await { Ok(symbols) => symbols .into_iter() - .filter_map(|symbol| symbol.hermes_id.clone().map(|id| (id, symbol))) + .filter_map(|symbol| { + let hermes_id = symbol.hermes_id.clone()?; + match pyth_sdk::Identifier::from_hex(hermes_id.clone()) { + Ok(id) => Some((id, symbol)), + Err(e) => { + tracing::warn!("Failed to parse hermes_id {}: {e:?}", hermes_id); + None + } + } + }) .collect(), Err(e) => { tracing::error!("Failed to fetch Lazer symbols: {e:?}"); @@ -296,7 +307,7 @@ mod lazer_exporter { // TODO: This read locks and clones local::Store::prices, which may not meet performance needs. for (identifier, price_info) in state.get_all_price_infos().await { - if let Some(symbol) = lazer_symbols.get(&identifier.to_string()) { + if let Some(symbol) = lazer_symbols.get(&identifier) { let source_timestamp_micros = price_info.timestamp.and_utc().timestamp_micros(); let source_timestamp = MessageField::some(Timestamp { seconds: source_timestamp_micros / 1_000_000, @@ -353,7 +364,6 @@ mod lazer_exporter { tracing::error!("Error receiving message from at relayer {relayer_url}: {e:?}"); } None => { - // TODO: Probably still appropriate to return here, but retry in caller. tracing::error!("relayer connection closed"); bail!("relayer connection closed"); }