Lazer exporter fixes #166

Open · wants to merge 6 commits into main
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "pyth-agent"
version = "3.0.2"
version = "3.0.3"
edition = "2024"

[[bin]]
@@ -10,6 +10,7 @@ path = "src/bin/agent.rs"
[dependencies]
anyhow = "1.0.81"
backoff = "0.4.0"
base64 = "0.22.1"
ed25519-dalek = "2.1.1"
serde = { version = "1.0.197", features = ["derive", "rc"] }
async-trait = "0.1.79"
169 changes: 100 additions & 69 deletions src/agent/services/lazer_exporter.rs
@@ -1,6 +1,7 @@
use {
crate::agent::state,
anyhow::{
Context,
Result,
anyhow,
bail,
@@ -9,6 +10,11 @@ use {
ExponentialBackoffBuilder,
backoff::Backoff,
},
base64::{
Engine,
prelude::BASE64_STANDARD,
},
ed25519_dalek::SigningKey,
futures_util::{
SinkExt,
stream::{
@@ -25,6 +31,7 @@ use {
Deserialize,
Serialize,
},
solana_sdk::signature::keypair,
std::{
path::PathBuf,
sync::Arc,
@@ -59,18 +66,23 @@ pub const RELAYER_CHANNEL_CAPACITY: usize = 1000;

#[derive(Clone, Debug, Deserialize)]
pub struct Config {
pub history_url: Url,
pub relayer_urls: Vec<Url>,
pub authorization_token: String,
pub publish_keypair_path: PathBuf,
pub history_url: Url,
pub relayer_urls: Vec<Url>,
pub publish_keypair_path: PathBuf,
#[serde(with = "humantime_serde", default = "default_publish_interval")]
pub publish_interval_duration: Duration,
pub publish_interval_duration: Duration,
#[serde(with = "humantime_serde", default = "default_symbol_fetch_interval")]
pub symbol_fetch_interval_duration: Duration,
}

fn default_publish_interval() -> Duration {
Duration::from_millis(200)
}

fn default_symbol_fetch_interval() -> Duration {
Duration::from_secs(60 * 60)
}
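
Since both interval fields are deserialized with `humantime_serde`, they can be given as human-readable duration strings. A minimal sketch of how such fields parse, assuming a JSON source and the `serde_json` and `humantime_serde` crates; the struct below is a pared-down stand-in for `Config`, not the real one:

```rust
use {serde::Deserialize, std::time::Duration};

// Pared-down stand-in for the interval fields of `Config` (illustration only).
#[derive(Deserialize)]
struct Intervals {
    #[serde(with = "humantime_serde")]
    publish_interval_duration: Duration,
    #[serde(with = "humantime_serde")]
    symbol_fetch_interval_duration: Duration,
}

fn main() {
    // "200ms" and "1h" mirror the defaults above.
    let parsed: Intervals = serde_json::from_str(
        r#"{"publish_interval_duration": "200ms", "symbol_fetch_interval_duration": "1h"}"#,
    )
    .expect("durations should parse");
    assert_eq!(parsed.publish_interval_duration, Duration::from_millis(200));
    assert_eq!(parsed.symbol_fetch_interval_duration, Duration::from_secs(3600));
}
```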

struct RelayerWsSession {
ws_sender: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, TungsteniteMessage>,
}
@@ -145,8 +157,8 @@ impl RelayerSessionTask {

failure_count += 1;
let next_backoff = backoff.next_backoff().unwrap_or(max_interval);
tracing::error!(
"relayer session failed with error: {:?}, failure_count: {}; retrying in {:?}",
tracing::warn!(
"relayer session ended with error: {:?}, failure_count: {}; retrying in {:?}",
e,
failure_count,
next_backoff
@@ -199,7 +211,7 @@ impl RelayerSessionTask {
tracing::error!("Error receiving message from at relayer: {e:?}");
}
None => {
tracing::error!("relayer connection closed");
tracing::warn!("relayer connection closed");
bail!("relayer connection closed");
}
}
@@ -240,25 +252,53 @@ struct SymbolResponse {

async fn fetch_symbols(history_url: &Url) -> Result<Vec<SymbolResponse>> {
let mut url = history_url.clone();
url.set_scheme("http").map_err(|_| anyhow!("invalid url"))?;
url.set_path("/history/v1/symbols");
let client = Client::new();
let response = client.get(url).send().await?.error_for_status()?;
let data = response.json().await?;
Ok(data)
}

fn get_signing_key(config: &Config) -> Result<SigningKey> {
// Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher
let publish_keypair = match keypair::read_keypair_file(&config.publish_keypair_path) {
Ok(k) => k,
Err(e) => {
tracing::error!(
error = ?e,
publish_keypair_path = config.publish_keypair_path.display().to_string(),
"Reading publish keypair returned an error. ",
);
bail!("Reading publish keypair returned an error. ");
}
};

SigningKey::from_keypair_bytes(&publish_keypair.to_bytes())
.context("Failed to create signing key from keypair")
}
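
`get_signing_key` reuses the publisher's Solana keypair file and converts it into an `ed25519_dalek::SigningKey`; this works because a Solana keypair is itself a 64-byte ed25519 secret-plus-public pair. A minimal sketch of the round trip, assuming a throwaway keypair generated in memory rather than one read from `publish_keypair_path`:

```rust
use {
    anyhow::Result,
    base64::{Engine, prelude::BASE64_STANDARD},
    ed25519_dalek::SigningKey,
    solana_sdk::signer::{Signer, keypair::Keypair},
};

fn main() -> Result<()> {
    // Throwaway keypair for illustration; the agent reads this from publish_keypair_path.
    let publish_keypair = Keypair::new();

    // Same conversion as get_signing_key: the 64 bytes are ed25519 secret || public.
    let signing_key = SigningKey::from_keypair_bytes(&publish_keypair.to_bytes())?;

    // The derived verifying key matches the Solana pubkey, so the base64 string the
    // exporter logs (and sends as its relayer token) identifies the same publisher.
    assert_eq!(
        signing_key.verifying_key().to_bytes(),
        publish_keypair.pubkey().to_bytes()
    );
    println!("{}", BASE64_STANDARD.encode(signing_key.verifying_key().to_bytes()));
    Ok(())
}
```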

#[instrument(skip(config, state))]
pub fn lazer_exporter(config: Config, state: Arc<state::State>) -> Vec<JoinHandle<()>> {
let mut handles = vec![];

let signing_key = match get_signing_key(&config) {
Ok(signing_key) => signing_key,
Err(e) => {
// This is fatal as we can't publish without the key.
tracing::error!("failed to get Lazer signing key: {e:?}");
panic!("failed to get Lazer signing key")
}
};
let pubkey_base64 = BASE64_STANDARD.encode(signing_key.verifying_key().to_bytes());
tracing::info!("Loaded Lazer signing key; pubkey in base64: {pubkey_base64}");

// can safely drop first receiver for ease of iteration
let (relayer_sender, _) = broadcast::channel(RELAYER_CHANNEL_CAPACITY);

for url in config.relayer_urls.iter() {
let mut task = RelayerSessionTask {
url: url.clone(),
token: config.authorization_token.to_owned(),
token: pubkey_base64.clone(),
receiver: relayer_sender.subscribe(),
};
handles.push(tokio::spawn(async move { task.run().await }));
@@ -268,6 +308,7 @@ pub fn lazer_exporter(config: Config, state: Arc<state::State>) -> Vec<JoinHandl
config.clone(),
state,
relayer_sender,
signing_key,
)));

handles
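
`lazer_exporter` returns one `JoinHandle` per relayer session plus one for the exporter loop. A hypothetical wiring sketch of how a caller might supervise them; the `run_lazer` helper and its error handling are illustrative and not part of this PR:

```rust
use {crate::agent::state, std::sync::Arc, tokio::task::JoinHandle};

// Hypothetical supervisor: await each task in turn and log any that panicked or was cancelled.
async fn run_lazer(config: Config, state: Arc<state::State>) {
    let handles: Vec<JoinHandle<()>> = lazer_exporter(config, state);
    for handle in handles {
        if let Err(e) = handle.await {
            tracing::error!("lazer exporter task failed: {e:?}");
        }
    }
}
```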
@@ -284,11 +325,6 @@ mod lazer_exporter {
},
state::local::LocalStore,
},
anyhow::{
Context,
Result,
bail,
},
ed25519_dalek::{
Signer,
SigningKey,
@@ -314,74 +350,39 @@ mod lazer_exporter {
signature_data::Data::Ed25519,
},
},
solana_sdk::signer::keypair,
std::{
collections::HashMap,
sync::Arc,
},
tokio::sync::broadcast::Sender,
url::Url,
};

fn get_signing_key(config: &Config) -> Result<SigningKey> {
// Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher
let publish_keypair = match keypair::read_keypair_file(&config.publish_keypair_path) {
Ok(k) => k,
Err(e) => {
tracing::error!(
error = ?e,
publish_keypair_path = config.publish_keypair_path.display().to_string(),
"Reading publish keypair returned an error. ",
);
bail!("Reading publish keypair returned an error. ");
}
};

SigningKey::from_keypair_bytes(&publish_keypair.to_bytes())
.context("Failed to create signing key from keypair")
}

pub async fn lazer_exporter<S>(
config: Config,
state: Arc<S>,
relayer_sender: Sender<SignedLazerTransaction>,
signing_key: SigningKey,
) where
S: LocalStore,
S: Send + Sync + 'static,
{
let signing_key = match get_signing_key(&config) {
Ok(signing_key) => signing_key,
Err(e) => {
tracing::error!("lazer_exporter signing key failure: {e:?}");
return;
}
};

// TODO: Re-fetch on an interval?
let lazer_symbols: HashMap<pyth_sdk::Identifier, SymbolResponse> =
match fetch_symbols(&config.history_url).await {
Ok(symbols) => symbols
.into_iter()
.filter_map(|symbol| {
let hermes_id = symbol.hermes_id.clone()?;
match pyth_sdk::Identifier::from_hex(hermes_id.clone()) {
Ok(id) => Some((id, symbol)),
Err(e) => {
tracing::warn!("Failed to parse hermes_id {}: {e:?}", hermes_id);
None
}
}
})
.collect(),
Err(e) => {
tracing::error!("Failed to fetch Lazer symbols: {e:?}");
return;
}
};
let mut lazer_symbols = get_lazer_symbol_map(&config.history_url).await;
tracing::info!(
"Retrieved {} Lazer feeds with hermes symbols from symbols endpoint: {}",
lazer_symbols.len(),
&config.history_url
);

let mut publish_interval = tokio::time::interval(config.publish_interval_duration);
let mut symbol_fetch_interval =
tokio::time::interval(config.symbol_fetch_interval_duration);

loop {
tokio::select! {
_ = symbol_fetch_interval.tick() => {
lazer_symbols = get_lazer_symbol_map(&config.history_url).await;
},
_ = publish_interval.tick() => {
let publisher_timestamp = MessageField::some(Timestamp::now());
let mut publisher_update = PublisherUpdate {
@@ -452,6 +453,30 @@ mod lazer_exporter {
}
}
}

async fn get_lazer_symbol_map(
history_url: &Url,
) -> HashMap<pyth_sdk::Identifier, SymbolResponse> {
match fetch_symbols(history_url).await {
Ok(symbols) => symbols
.into_iter()
.filter_map(|symbol| {
let hermes_id = symbol.hermes_id.clone()?;
match pyth_sdk::Identifier::from_hex(hermes_id.clone()) {
Ok(id) => Some((id, symbol)),
Err(e) => {
tracing::warn!("Failed to parse hermes_id {}: {e:?}", hermes_id);
None
}
}
})
.collect(),
Err(e) => {
tracing::error!("Failed to fetch Lazer symbols: {e:?}");
HashMap::new()
}
}
}
}

#[cfg(test)]
@@ -604,15 +629,21 @@ mod tests {
let state = Arc::new(local::Store::new(&mut Registry::default()));
let (relayer_sender, mut relayer_receiver) = broadcast::channel(RELAYER_CHANNEL_CAPACITY);
let private_key_file = get_private_key_file();
let private_key = get_private_key();

let config = Config {
history_url: Url::parse("http://127.0.0.1:12345").unwrap(),
relayer_urls: vec![Url::parse("http://127.0.0.1:12346").unwrap()],
authorization_token: "token1".to_string(),
publish_keypair_path: PathBuf::from(private_key_file.path()),
publish_interval_duration: Duration::from_secs(1),
history_url: Url::parse("http://127.0.0.1:12345").unwrap(),
relayer_urls: vec![Url::parse("http://127.0.0.1:12346").unwrap()],
publish_keypair_path: PathBuf::from(private_key_file.path()),
publish_interval_duration: Duration::from_secs(1),
symbol_fetch_interval_duration: Duration::from_secs(60 * 60),
};
tokio::spawn(lazer_exporter(config, state.clone(), relayer_sender));
tokio::spawn(lazer_exporter(
config,
state.clone(),
relayer_sender,
private_key,
));

tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
match relayer_receiver.try_recv() {