Skip to content

Commit 05ad3b4

Browse files
authored
Fix tokio task leak (#151)
* Fix tokio task leak * Using config * Using config file * Bump version * Format * Removing worker group * Fix unused imports
1 parent a9d77e0 commit 05ad3b4

File tree

5 files changed

+45
-19
lines changed

5 files changed

+45
-19
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "pyth-agent"
3-
version = "2.12.1"
3+
version = "2.12.2"
44
edition = "2021"
55

66
[[bin]]

config/config.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ key_store.pyth_price_store_program_key = "3m6sv6HGqEbuyLV84mD7rJn4MAC9LhUa1y1AUN
7373
# takes to fetch all symbols.
7474
# oracle.max_lookup_batch_size = 100
7575

76+
# Minimum time for a subscriber to run
77+
# oracle.subscriber_finished_min_time = "30s"
78+
# Time to sleep if the subscriber do not run for more than the minimum time
79+
# oracle.subscriber_finished_sleep_time = "1s"
80+
7681
# How often to refresh the cached network state (current slot and blockhash).
7782
# It is recommended to set this to slightly less than the network's block time,
7883
# as the slot fetched will be used as the time of the price update.

src/agent/services/oracle.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,7 @@ use {
3333
},
3434
std::{
3535
sync::Arc,
36-
time::{
37-
Duration,
38-
Instant,
39-
},
36+
time::Instant,
4037
},
4138
tokio::task::JoinHandle,
4239
tokio_stream::StreamExt,
@@ -67,6 +64,9 @@ where
6764
)));
6865

6966
if config.oracle.subscriber_enabled {
67+
let min_elapsed_time = config.oracle.subscriber_finished_min_time;
68+
let sleep_time = config.oracle.subscriber_finished_sleep_time;
69+
7070
handles.push(tokio::spawn(async move {
7171
loop {
7272
let current_time = Instant::now();
@@ -78,10 +78,10 @@ where
7878
)
7979
.await
8080
{
81-
tracing::error!(err = ?err, "Subscriber exited unexpectedly.");
82-
if current_time.elapsed() < Duration::from_secs(30) {
83-
tracing::warn!("Subscriber restarting too quickly. Sleeping for 1 second.");
84-
tokio::time::sleep(Duration::from_secs(1)).await;
81+
tracing::error!(?err, "Subscriber exited unexpectedly");
82+
if current_time.elapsed() < min_elapsed_time {
83+
tracing::warn!(?sleep_time, "Subscriber restarting too quickly. Sleeping");
84+
tokio::time::sleep(sleep_time).await;
8585
}
8686
}
8787
}
@@ -134,7 +134,7 @@ where
134134
Oracle::handle_price_account_update(&*state, network, &pubkey, &account)
135135
.await
136136
{
137-
tracing::error!(err = ?err, "Failed to handle account update.");
137+
tracing::error!(?err, "Failed to handle account update");
138138
}
139139
});
140140
}

src/agent/state/oracle.rs

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,14 @@ pub struct Data {
138138
pub publisher_buffer_key: Option<Pubkey>,
139139
}
140140

141+
fn default_subscriber_finished_min_time() -> Duration {
142+
Duration::from_secs(30)
143+
}
144+
145+
fn default_subscriber_finished_sleep_time() -> Duration {
146+
Duration::from_secs(1)
147+
}
148+
141149
#[derive(Clone, Serialize, Deserialize, Debug)]
142150
#[serde(default)]
143151
pub struct Config {
@@ -159,17 +167,26 @@ pub struct Config {
159167
/// socket count at bay, the batches are looked up sequentially,
160168
/// trading off overall time it takes to fetch all symbols.
161169
pub max_lookup_batch_size: usize,
170+
171+
/// Minimum time for a subscriber to run
172+
#[serde(default = "default_subscriber_finished_min_time")]
173+
pub subscriber_finished_min_time: Duration,
174+
/// Time to sleep if the subscriber do not run for more than the minimum time
175+
#[serde(default = "default_subscriber_finished_sleep_time")]
176+
pub subscriber_finished_sleep_time: Duration,
162177
}
163178

164179
impl Default for Config {
165180
fn default() -> Self {
166181
Self {
167-
commitment: CommitmentLevel::Confirmed,
168-
poll_interval_duration: Duration::from_secs(5),
169-
subscriber_enabled: true,
170-
updates_channel_capacity: 10000,
171-
data_channel_capacity: 10000,
172-
max_lookup_batch_size: 100,
182+
commitment: CommitmentLevel::Confirmed,
183+
poll_interval_duration: Duration::from_secs(5),
184+
subscriber_enabled: true,
185+
updates_channel_capacity: 10000,
186+
data_channel_capacity: 10000,
187+
max_lookup_batch_size: 100,
188+
subscriber_finished_min_time: default_subscriber_finished_min_time(),
189+
subscriber_finished_sleep_time: default_subscriber_finished_sleep_time(),
173190
}
174191
}
175192
}
@@ -241,6 +258,7 @@ where
241258
);
242259

243260
data.price_accounts.insert(*account_key, price_entry.into());
261+
drop(data);
244262

245263
Prices::update_global_price(
246264
self,
@@ -333,13 +351,16 @@ where
333351
let mut data = self.into().data.write().await;
334352
log_data_diff(&data, &new_data);
335353
*data = new_data;
354+
let data_publisher_permissions = data.publisher_permissions.clone();
355+
let data_publisher_buffer_key = data.publisher_buffer_key;
356+
drop(data);
336357

337358
Exporter::update_on_chain_state(
338359
self,
339360
network,
340361
publish_keypair,
341-
data.publisher_permissions.clone(),
342-
data.publisher_buffer_key,
362+
data_publisher_permissions,
363+
data_publisher_buffer_key,
343364
)
344365
.await?;
345366

0 commit comments

Comments
 (0)