Skip to content

Commit 44e7374

Browse files
committed
Lazer exporter fixes
1 parent 234888c commit 44e7374

File tree

3 files changed

+94
-68
lines changed

3 files changed

+94
-68
lines changed

Cargo.lock

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "pyth-agent"
3-
version = "3.0.1"
3+
version = "3.0.2"
44
edition = "2024"
55

66
[[bin]]
@@ -10,6 +10,7 @@ path = "src/bin/agent.rs"
1010
[dependencies]
1111
anyhow = "1.0.81"
1212
backoff = "0.4.0"
13+
base64 = "0.22.1"
1314
ed25519-dalek = "2.1.1"
1415
serde = { version = "1.0.197", features = ["derive", "rc"] }
1516
async-trait = "0.1.79"

src/agent/services/lazer_exporter.rs

Lines changed: 90 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use {
22
crate::agent::state,
33
anyhow::{
4+
Context,
45
Result,
56
anyhow,
67
bail,
@@ -9,6 +10,11 @@ use {
910
ExponentialBackoffBuilder,
1011
backoff::Backoff,
1112
},
13+
base64::{
14+
Engine,
15+
prelude::BASE64_STANDARD,
16+
},
17+
ed25519_dalek::SigningKey,
1218
futures_util::{
1319
SinkExt,
1420
stream::{
@@ -25,6 +31,7 @@ use {
2531
Deserialize,
2632
Serialize,
2733
},
34+
solana_sdk::signature::keypair,
2835
std::{
2936
path::PathBuf,
3037
sync::Arc,
@@ -59,18 +66,23 @@ pub const RELAYER_CHANNEL_CAPACITY: usize = 1000;
5966

6067
#[derive(Clone, Debug, Deserialize)]
6168
pub struct Config {
62-
pub history_url: Url,
63-
pub relayer_urls: Vec<Url>,
64-
pub authorization_token: String,
65-
pub publish_keypair_path: PathBuf,
69+
pub history_url: Url,
70+
pub relayer_urls: Vec<Url>,
71+
pub publish_keypair_path: PathBuf,
6672
#[serde(with = "humantime_serde", default = "default_publish_interval")]
67-
pub publish_interval_duration: Duration,
73+
pub publish_interval_duration: Duration,
74+
#[serde(with = "humantime_serde", default = "default_symbol_fetch_interval")]
75+
pub symbol_fetch_interval_duration: Duration,
6876
}
6977

7078
fn default_publish_interval() -> Duration {
7179
Duration::from_millis(200)
7280
}
7381

82+
fn default_symbol_fetch_interval() -> Duration {
83+
Duration::from_secs(60 * 60)
84+
}
85+
7486
struct RelayerWsSession {
7587
ws_sender: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, TungsteniteMessage>,
7688
}
@@ -248,17 +260,44 @@ async fn fetch_symbols(history_url: &Url) -> Result<Vec<SymbolResponse>> {
248260
Ok(data)
249261
}
250262

263+
fn get_signing_key(config: &Config) -> Result<SigningKey> {
264+
// Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher
265+
let publish_keypair = match keypair::read_keypair_file(&config.publish_keypair_path) {
266+
Ok(k) => k,
267+
Err(e) => {
268+
tracing::error!(
269+
error = ?e,
270+
publish_keypair_path = config.publish_keypair_path.display().to_string(),
271+
"Reading publish keypair returned an error. ",
272+
);
273+
bail!("Reading publish keypair returned an error. ");
274+
}
275+
};
276+
277+
SigningKey::from_keypair_bytes(&publish_keypair.to_bytes())
278+
.context("Failed to create signing key from keypair")
279+
}
280+
251281
#[instrument(skip(config, state))]
252282
pub fn lazer_exporter(config: Config, state: Arc<state::State>) -> Vec<JoinHandle<()>> {
253283
let mut handles = vec![];
254284

285+
let signing_key = match get_signing_key(&config) {
286+
Ok(signing_key) => signing_key,
287+
Err(e) => {
288+
// This is fatal as we can't publish without the key.
289+
tracing::error!("failed to get Lazer signing key: {e:?}");
290+
panic!("failed to get Lazer signing key")
291+
}
292+
};
293+
255294
// can safely drop first receiver for ease of iteration
256295
let (relayer_sender, _) = broadcast::channel(RELAYER_CHANNEL_CAPACITY);
257296

258297
for url in config.relayer_urls.iter() {
259298
let mut task = RelayerSessionTask {
260299
url: url.clone(),
261-
token: config.authorization_token.to_owned(),
300+
token: BASE64_STANDARD.encode(signing_key.verifying_key().to_bytes()),
262301
receiver: relayer_sender.subscribe(),
263302
};
264303
handles.push(tokio::spawn(async move { task.run().await }));
@@ -268,6 +307,7 @@ pub fn lazer_exporter(config: Config, state: Arc<state::State>) -> Vec<JoinHandl
268307
config.clone(),
269308
state,
270309
relayer_sender,
310+
signing_key,
271311
)));
272312

273313
handles
@@ -284,11 +324,6 @@ mod lazer_exporter {
284324
},
285325
state::local::LocalStore,
286326
},
287-
anyhow::{
288-
Context,
289-
Result,
290-
bail,
291-
},
292327
ed25519_dalek::{
293328
Signer,
294329
SigningKey,
@@ -314,74 +349,33 @@ mod lazer_exporter {
314349
signature_data::Data::Ed25519,
315350
},
316351
},
317-
solana_sdk::signer::keypair,
318352
std::{
319353
collections::HashMap,
320354
sync::Arc,
321355
},
322356
tokio::sync::broadcast::Sender,
357+
url::Url,
323358
};
324359

325-
fn get_signing_key(config: &Config) -> Result<SigningKey> {
326-
// Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher
327-
let publish_keypair = match keypair::read_keypair_file(&config.publish_keypair_path) {
328-
Ok(k) => k,
329-
Err(e) => {
330-
tracing::error!(
331-
error = ?e,
332-
publish_keypair_path = config.publish_keypair_path.display().to_string(),
333-
"Reading publish keypair returned an error. ",
334-
);
335-
bail!("Reading publish keypair returned an error. ");
336-
}
337-
};
338-
339-
SigningKey::from_keypair_bytes(&publish_keypair.to_bytes())
340-
.context("Failed to create signing key from keypair")
341-
}
342-
343360
pub async fn lazer_exporter<S>(
344361
config: Config,
345362
state: Arc<S>,
346363
relayer_sender: Sender<SignedLazerTransaction>,
364+
signing_key: SigningKey,
347365
) where
348366
S: LocalStore,
349367
S: Send + Sync + 'static,
350368
{
351-
let signing_key = match get_signing_key(&config) {
352-
Ok(signing_key) => signing_key,
353-
Err(e) => {
354-
tracing::error!("lazer_exporter signing key failure: {e:?}");
355-
return;
356-
}
357-
};
358-
359-
// TODO: Re-fetch on an interval?
360-
let lazer_symbols: HashMap<pyth_sdk::Identifier, SymbolResponse> =
361-
match fetch_symbols(&config.history_url).await {
362-
Ok(symbols) => symbols
363-
.into_iter()
364-
.filter_map(|symbol| {
365-
let hermes_id = symbol.hermes_id.clone()?;
366-
match pyth_sdk::Identifier::from_hex(hermes_id.clone()) {
367-
Ok(id) => Some((id, symbol)),
368-
Err(e) => {
369-
tracing::warn!("Failed to parse hermes_id {}: {e:?}", hermes_id);
370-
None
371-
}
372-
}
373-
})
374-
.collect(),
375-
Err(e) => {
376-
tracing::error!("Failed to fetch Lazer symbols: {e:?}");
377-
return;
378-
}
379-
};
380-
369+
let mut lazer_symbols = get_lazer_symbol_map(&config.history_url).await;
381370
let mut publish_interval = tokio::time::interval(config.publish_interval_duration);
371+
let mut symbol_fetch_interval =
372+
tokio::time::interval(config.symbol_fetch_interval_duration);
382373

383374
loop {
384375
tokio::select! {
376+
_ = symbol_fetch_interval.tick() => {
377+
lazer_symbols = get_lazer_symbol_map(&config.history_url).await;
378+
},
385379
_ = publish_interval.tick() => {
386380
let publisher_timestamp = MessageField::some(Timestamp::now());
387381
let mut publisher_update = PublisherUpdate {
@@ -452,6 +446,30 @@ mod lazer_exporter {
452446
}
453447
}
454448
}
449+
450+
async fn get_lazer_symbol_map(
451+
history_url: &Url,
452+
) -> HashMap<pyth_sdk::Identifier, SymbolResponse> {
453+
match fetch_symbols(history_url).await {
454+
Ok(symbols) => symbols
455+
.into_iter()
456+
.filter_map(|symbol| {
457+
let hermes_id = symbol.hermes_id.clone()?;
458+
match pyth_sdk::Identifier::from_hex(hermes_id.clone()) {
459+
Ok(id) => Some((id, symbol)),
460+
Err(e) => {
461+
tracing::warn!("Failed to parse hermes_id {}: {e:?}", hermes_id);
462+
None
463+
}
464+
}
465+
})
466+
.collect(),
467+
Err(e) => {
468+
tracing::error!("Failed to fetch Lazer symbols: {e:?}");
469+
HashMap::new()
470+
}
471+
}
472+
}
455473
}
456474

457475
#[cfg(test)]
@@ -604,15 +622,21 @@ mod tests {
604622
let state = Arc::new(local::Store::new(&mut Registry::default()));
605623
let (relayer_sender, mut relayer_receiver) = broadcast::channel(RELAYER_CHANNEL_CAPACITY);
606624
let private_key_file = get_private_key_file();
625+
let private_key = get_private_key();
607626

608627
let config = Config {
609-
history_url: Url::parse("http://127.0.0.1:12345").unwrap(),
610-
relayer_urls: vec![Url::parse("http://127.0.0.1:12346").unwrap()],
611-
authorization_token: "token1".to_string(),
612-
publish_keypair_path: PathBuf::from(private_key_file.path()),
613-
publish_interval_duration: Duration::from_secs(1),
628+
history_url: Url::parse("http://127.0.0.1:12345").unwrap(),
629+
relayer_urls: vec![Url::parse("http://127.0.0.1:12346").unwrap()],
630+
publish_keypair_path: PathBuf::from(private_key_file.path()),
631+
publish_interval_duration: Duration::from_secs(1),
632+
symbol_fetch_interval_duration: Duration::from_secs(60 * 60),
614633
};
615-
tokio::spawn(lazer_exporter(config, state.clone(), relayer_sender));
634+
tokio::spawn(lazer_exporter(
635+
config,
636+
state.clone(),
637+
relayer_sender,
638+
private_key,
639+
));
616640

617641
tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
618642
match relayer_receiver.try_recv() {

0 commit comments

Comments
 (0)