Skip to content

Commit 434b679

Browse files
committed
Use broadcast::channel for multiplex to relayers
1 parent 4f66a50 commit 434b679

File tree

1 file changed

+37
-23
lines changed

1 file changed

+37
-23
lines changed

src/agent/services/lazer_exporter.rs

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,7 @@ use {
3333
tokio::{
3434
net::TcpStream,
3535
select,
36-
sync::mpsc::{
37-
self,
38-
Receiver,
39-
},
36+
sync::broadcast,
4037
task::JoinHandle,
4138
},
4239
tokio_tungstenite::{
@@ -80,7 +77,7 @@ impl RelayerWsSession {
8077
&mut self,
8178
signed_lazer_transaction: &SignedLazerTransaction,
8279
) -> Result<()> {
83-
tracing::debug!("price_update: {:?}", signed_lazer_transaction);
80+
tracing::debug!("signed_lazer_transaction: {:?}", signed_lazer_transaction);
8481
let buf = signed_lazer_transaction.write_to_bytes()?;
8582
self.ws_sender
8683
.send(TungsteniteMessage::from(buf.clone()))
@@ -113,7 +110,7 @@ struct RelayerSessionTask {
113110
// connection state
114111
url: Url,
115112
token: String,
116-
receiver: Receiver<SignedLazerTransaction>,
113+
receiver: broadcast::Receiver<SignedLazerTransaction>,
117114
}
118115

119116
impl RelayerSessionTask {
@@ -168,11 +165,25 @@ impl RelayerSessionTask {
168165

169166
loop {
170167
select! {
171-
Some(transaction) = self.receiver.recv() => {
172-
if let Err(e) = relayer_ws_session.send_transaction(&transaction).await
173-
{
174-
tracing::error!("Error publishing transaction to Lazer relayer: {e:?}");
175-
bail!("Failed to publish transaction to Lazer relayer: {e:?}");
168+
recv_result = self.receiver.recv() => {
169+
match recv_result {
170+
Ok(transaction) => {
171+
if let Err(e) = relayer_ws_session.send_transaction(&transaction).await {
172+
tracing::error!("Error publishing transaction to Lazer relayer: {e:?}");
173+
bail!("Failed to publish transaction to Lazer relayer: {e:?}");
174+
}
175+
},
176+
Err(e) => {
177+
match e {
178+
broadcast::error::RecvError::Closed => {
179+
tracing::error!("transaction broadcast channel closed");
180+
bail!("transaction broadcast channel closed");
181+
}
182+
broadcast::error::RecvError::Lagged(skipped_count) => {
183+
tracing::warn!("transaction broadcast channel lagged by {skipped_count} messages");
184+
}
185+
}
186+
}
176187
}
177188
}
178189
// Handle messages from the relayers, such as errors if we send a bad update
@@ -237,23 +248,23 @@ async fn fetch_symbols(history_url: &Url) -> Result<Vec<SymbolResponse>> {
237248
#[instrument(skip(config, state))]
238249
pub fn lazer_exporter(config: Config, state: Arc<state::State>) -> Vec<JoinHandle<()>> {
239250
let mut handles = vec![];
240-
let mut relayer_senders = vec![];
251+
252+
// can safely drop first receiver for ease of iteration
253+
let (relayer_sender, _) = broadcast::channel(RELAYER_CHANNEL_CAPACITY);
241254

242255
for url in config.relayer_urls.iter() {
243-
let (sender, receiver) = mpsc::channel(RELAYER_CHANNEL_CAPACITY);
244256
let mut task = RelayerSessionTask {
245-
url: url.clone(),
246-
token: config.authorization_token.to_owned(),
247-
receiver,
257+
url: url.clone(),
258+
token: config.authorization_token.to_owned(),
259+
receiver: relayer_sender.subscribe(),
248260
};
249261
handles.push(tokio::spawn(async move { task.run().await }));
250-
relayer_senders.push(sender);
251262
}
252263

253264
handles.push(tokio::spawn(lazer_exporter::lazer_exporter(
254265
config.clone(),
255266
state,
256-
relayer_senders,
267+
relayer_sender,
257268
)));
258269

259270
handles
@@ -305,7 +316,7 @@ mod lazer_exporter {
305316
collections::HashMap,
306317
sync::Arc,
307318
},
308-
tokio::sync::mpsc::Sender,
319+
tokio::sync::broadcast::Sender,
309320
};
310321

311322
fn get_signing_key(config: &Config) -> Result<SigningKey> {
@@ -329,7 +340,7 @@ mod lazer_exporter {
329340
pub async fn lazer_exporter<S>(
330341
config: Config,
331342
state: Arc<S>,
332-
relayer_senders: Vec<Sender<SignedLazerTransaction>>,
343+
relayer_sender: Sender<SignedLazerTransaction>,
333344
) where
334345
S: LocalStore,
335346
S: Send + Sync + 'static,
@@ -428,9 +439,12 @@ mod lazer_exporter {
428439
payload: Some(buf),
429440
special_fields: Default::default(),
430441
};
431-
futures::future::join_all(relayer_senders.iter().map(|relayer_sender|
432-
relayer_sender.send(signed_lazer_transaction.clone()))
433-
).await;
442+
match relayer_sender.send(signed_lazer_transaction.clone()) {
443+
Ok(_) => (),
444+
Err(e) => {
445+
tracing::error!("Error sending transaction to relayer receivers: {e}");
446+
}
447+
}
434448
}
435449
}
436450
}

0 commit comments

Comments
 (0)