Skip to content

Commit 234888c

Browse files
authored
Some lazer exporter unit tests (#165)
1 parent 9482f3d commit 234888c

File tree

3 files changed

+317
-4
lines changed

3 files changed

+317
-4
lines changed

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ http = "1.3.1"
5757
url = { version = "2.5.4", features = ["serde"] }
5858
pyth-lazer-publisher-sdk = "0.1.5"
5959

60+
[dev-dependencies]
61+
tempfile = "3.20.0"
62+
6063
[profile.release]
6164
panic = 'abort'
6265

src/agent/services/lazer_exporter.rs

Lines changed: 311 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ use {
2121
protobuf::Message as ProtobufMessage,
2222
pyth_lazer_publisher_sdk::transaction::SignedLazerTransaction,
2323
reqwest::Client,
24-
serde::Deserialize,
24+
serde::{
25+
Deserialize,
26+
Serialize,
27+
},
2528
std::{
2629
path::PathBuf,
2730
sync::Arc,
@@ -207,7 +210,7 @@ impl RelayerSessionTask {
207210
}
208211

209212
// TODO: This is copied from history-service; move to Lazer protocol sdk.
210-
#[derive(Deserialize)]
213+
#[derive(Debug, Serialize, Deserialize)]
211214
struct SymbolResponse {
212215
pub pyth_lazer_id: u32,
213216
#[serde(rename = "name")]
@@ -450,3 +453,309 @@ mod lazer_exporter {
450453
}
451454
}
452455
}
456+
457+
#[cfg(test)]
458+
mod tests {
459+
use {
460+
crate::agent::{
461+
services::lazer_exporter::{
462+
Config,
463+
RELAYER_CHANNEL_CAPACITY,
464+
RelayerSessionTask,
465+
SymbolResponse,
466+
lazer_exporter::lazer_exporter,
467+
},
468+
state::{
469+
local,
470+
local::{
471+
LocalStore,
472+
PriceInfo,
473+
},
474+
},
475+
},
476+
ed25519_dalek::{
477+
Signer,
478+
SigningKey,
479+
},
480+
futures_util::StreamExt,
481+
prometheus_client::registry::Registry,
482+
protobuf::{
483+
Message,
484+
MessageField,
485+
well_known_types::timestamp::Timestamp,
486+
},
487+
pyth_lazer_publisher_sdk::{
488+
publisher_update::{
489+
FeedUpdate,
490+
PriceUpdate,
491+
PublisherUpdate,
492+
feed_update::{
493+
self,
494+
Update,
495+
},
496+
},
497+
transaction::{
498+
Ed25519SignatureData,
499+
LazerTransaction,
500+
SignatureData,
501+
SignedLazerTransaction,
502+
lazer_transaction::{
503+
self,
504+
Payload,
505+
},
506+
signature_data::Data::Ed25519,
507+
},
508+
},
509+
pyth_sdk_solana::state::PriceStatus,
510+
std::{
511+
io::Write,
512+
net::SocketAddr,
513+
path::PathBuf,
514+
sync::{
515+
Arc,
516+
Once,
517+
},
518+
time::Duration,
519+
},
520+
tempfile::NamedTempFile,
521+
tokio::{
522+
net::TcpListener,
523+
sync::{
524+
broadcast::{
525+
self,
526+
error::TryRecvError,
527+
},
528+
mpsc,
529+
},
530+
},
531+
url::Url,
532+
warp::Filter,
533+
};
534+
535+
static INIT: Once = Once::new();
536+
537+
fn init_tracing() {
538+
INIT.call_once(|| {
539+
tracing_subscriber::fmt()
540+
.with_max_level(tracing::Level::DEBUG)
541+
.with_test_writer() // Send output to test output
542+
.init();
543+
});
544+
}
545+
546+
pub async fn run_mock_history_server(addr: SocketAddr) {
547+
let route = warp::path!("history" / "v1" / "symbols")
548+
.and(warp::get())
549+
.map(|| {
550+
let response = vec![SymbolResponse {
551+
pyth_lazer_id: 1,
552+
_name: "BTCUSD".to_string(),
553+
_symbol: "Crypto.BTC/USD".to_string(),
554+
_description: "BITCOIN / US DOLLAR".to_string(),
555+
_asset_type: "crypto".to_string(),
556+
_exponent: -8,
557+
_cmc_id: Some(1),
558+
_interval: None,
559+
_min_publishers: 1,
560+
_min_channel: "real_time".to_string(),
561+
_state: "stable".to_string(),
562+
_schedule: "America/New_York;O,O,O,O,O,O,O;".to_string(),
563+
hermes_id: Some(
564+
"e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43"
565+
.to_string(),
566+
),
567+
}];
568+
warp::reply::json(&response)
569+
});
570+
warp::serve(route).run(addr).await;
571+
}
572+
573+
fn get_private_key() -> SigningKey {
574+
SigningKey::from_keypair_bytes(&[
575+
105, 175, 146, 91, 32, 145, 164, 199, 37, 111, 139, 255, 44, 225, 5, 247, 154, 170,
576+
238, 70, 47, 15, 9, 48, 102, 87, 180, 50, 50, 38, 148, 243, 62, 148, 219, 72, 222, 170,
577+
8, 246, 176, 33, 205, 29, 118, 11, 220, 163, 214, 204, 46, 49, 132, 94, 170, 173, 244,
578+
39, 179, 211, 177, 70, 252, 31,
579+
])
580+
.unwrap()
581+
}
582+
583+
fn get_private_key_file() -> NamedTempFile {
584+
let private_key_string = "[105,175,146,91,32,145,164,199,37,111,139,255,44,225,5,247,154,170,238,70,47,15,9,48,102,87,180,50,50,38,148,243,62,148,219,72,222,170,8,246,176,33,205,29,118,11,220,163,214,204,46,49,132,94,170,173,244,39,179,211,177,70,252,31]";
585+
let mut temp_file = NamedTempFile::new().unwrap();
586+
temp_file
587+
.as_file_mut()
588+
.write(private_key_string.as_bytes())
589+
.unwrap();
590+
temp_file.flush().unwrap();
591+
temp_file
592+
}
593+
594+
#[tokio::test]
595+
async fn test_lazer_exporter() {
596+
init_tracing();
597+
598+
let history_addr = "127.0.0.1:12345".parse().unwrap();
599+
tokio::spawn(async move {
600+
run_mock_history_server(history_addr).await;
601+
});
602+
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
603+
604+
let state = Arc::new(local::Store::new(&mut Registry::default()));
605+
let (relayer_sender, mut relayer_receiver) = broadcast::channel(RELAYER_CHANNEL_CAPACITY);
606+
let private_key_file = get_private_key_file();
607+
608+
let config = Config {
609+
history_url: Url::parse("http://127.0.0.1:12345").unwrap(),
610+
relayer_urls: vec![Url::parse("http://127.0.0.1:12346").unwrap()],
611+
authorization_token: "token1".to_string(),
612+
publish_keypair_path: PathBuf::from(private_key_file.path()),
613+
publish_interval_duration: Duration::from_secs(1),
614+
};
615+
tokio::spawn(lazer_exporter(config, state.clone(), relayer_sender));
616+
617+
tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
618+
match relayer_receiver.try_recv() {
619+
Err(TryRecvError::Empty) => (),
620+
_ => panic!("channel should be empty"),
621+
}
622+
623+
let btc_id = pyth_sdk::Identifier::from_hex(
624+
"e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43",
625+
)
626+
.unwrap();
627+
let price = PriceInfo {
628+
status: PriceStatus::Trading,
629+
price: 100_000_00000000i64,
630+
conf: 1_00000000u64,
631+
timestamp: Default::default(),
632+
};
633+
state.update(btc_id, price).await.unwrap();
634+
tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
635+
match relayer_receiver.try_recv() {
636+
Ok(transaction) => {
637+
let lazer_transaction =
638+
LazerTransaction::parse_from_bytes(transaction.payload.unwrap().as_slice())
639+
.unwrap();
640+
let publisher_update =
641+
if let lazer_transaction::Payload::PublisherUpdate(publisher_update) =
642+
lazer_transaction.payload.unwrap()
643+
{
644+
publisher_update
645+
} else {
646+
panic!("expected publisher_update")
647+
};
648+
assert_eq!(publisher_update.updates.len(), 1);
649+
let feed_update = &publisher_update.updates[0];
650+
assert_eq!(feed_update.feed_id, Some(1u32));
651+
let price_update = if let feed_update::Update::PriceUpdate(price_update) =
652+
feed_update.clone().update.unwrap()
653+
{
654+
price_update
655+
} else {
656+
panic!("expected price_update")
657+
};
658+
assert_eq!(price_update.price, Some(100_000_00000000i64));
659+
}
660+
_ => panic!("channel should have a transaction waiting"),
661+
}
662+
}
663+
664+
pub async fn run_mock_relayer(
665+
addr: SocketAddr,
666+
back_sender: mpsc::Sender<SignedLazerTransaction>,
667+
) {
668+
let listener = TcpListener::bind(addr).await.unwrap();
669+
670+
tokio::spawn(async move {
671+
let Ok((stream, _)) = listener.accept().await else {
672+
panic!("failed to accept mock relayer websocket connection");
673+
};
674+
let ws_stream = tokio_tungstenite::accept_async(stream)
675+
.await
676+
.expect("handshake failed");
677+
let (_, mut read) = ws_stream.split();
678+
while let Some(msg) = read.next().await {
679+
if let Ok(msg) = msg {
680+
if msg.is_binary() {
681+
tracing::info!("Received binary message: {msg:?}");
682+
let transaction =
683+
SignedLazerTransaction::parse_from_bytes(msg.into_data().as_ref())
684+
.unwrap();
685+
back_sender.clone().send(transaction).await.unwrap();
686+
}
687+
} else {
688+
tracing::error!("Received a malformed message: {msg:?}");
689+
}
690+
}
691+
});
692+
}
693+
694+
#[tokio::test]
695+
async fn test_relayer_session() {
696+
init_tracing();
697+
698+
let (back_sender, mut back_receiver) = mpsc::channel(RELAYER_CHANNEL_CAPACITY);
699+
let relayer_addr = "127.0.0.1:12346".parse().unwrap();
700+
run_mock_relayer(relayer_addr, back_sender).await;
701+
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
702+
703+
let (relayer_sender, relayer_receiver) = broadcast::channel(RELAYER_CHANNEL_CAPACITY);
704+
705+
let mut relayer_session_task = RelayerSessionTask {
706+
// connection state
707+
url: Url::parse("ws://127.0.0.1:12346").unwrap(),
708+
token: "token1".to_string(),
709+
receiver: relayer_receiver,
710+
};
711+
tokio::spawn(async move { relayer_session_task.run().await });
712+
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
713+
714+
let transaction = get_signed_lazer_transaction();
715+
relayer_sender
716+
.send(transaction.clone())
717+
.expect("relayer_sender.send failed");
718+
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
719+
let received_transaction = back_receiver
720+
.recv()
721+
.await
722+
.expect("back_receiver.recv failed");
723+
assert_eq!(transaction, received_transaction);
724+
}
725+
726+
fn get_signed_lazer_transaction() -> SignedLazerTransaction {
727+
let publisher_update = PublisherUpdate {
728+
updates: vec![FeedUpdate {
729+
feed_id: Some(1),
730+
source_timestamp: MessageField::some(Timestamp::now()),
731+
update: Some(Update::PriceUpdate(PriceUpdate {
732+
price: Some(1_000_000_000i64),
733+
..PriceUpdate::default()
734+
})),
735+
special_fields: Default::default(),
736+
}],
737+
publisher_timestamp: MessageField::some(Timestamp::now()),
738+
special_fields: Default::default(),
739+
};
740+
let lazer_transaction = LazerTransaction {
741+
payload: Some(Payload::PublisherUpdate(publisher_update)),
742+
special_fields: Default::default(),
743+
};
744+
let buf = lazer_transaction.write_to_bytes().unwrap();
745+
let signing_key = get_private_key();
746+
let signature = signing_key.sign(&buf);
747+
let signature_data = SignatureData {
748+
data: Some(Ed25519(Ed25519SignatureData {
749+
signature: Some(signature.to_bytes().into()),
750+
public_key: Some(signing_key.verifying_key().to_bytes().into()),
751+
special_fields: Default::default(),
752+
})),
753+
special_fields: Default::default(),
754+
};
755+
SignedLazerTransaction {
756+
signature_data: MessageField::some(signature_data),
757+
payload: Some(buf),
758+
special_fields: Default::default(),
759+
}
760+
}
761+
}

0 commit comments

Comments
 (0)