Skip to content

Commit 41cba8c

Browse files
authored
fix(disappearing-messages): run disappearing messages cleaner worker every second (#1611)
* run disappearing messages worker every second
1 parent cb8ee07 commit 41cba8c

File tree

3 files changed

+53
-17
lines changed

3 files changed

+53
-17
lines changed

bindings_ffi/src/mls.rs

-4
Original file line numberDiff line numberDiff line change
@@ -5080,10 +5080,6 @@ mod tests {
50805080
.unwrap();
50815081
let msg_counts_before_cleanup = alix_messages.len();
50825082

5083-
// Step 7: Start cleanup worker and delete expired messages
5084-
alix.inner_client
5085-
.start_disappearing_messages_cleaner_worker();
5086-
50875083
// Wait for cleanup to complete
50885084
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
50895085

common/src/time.rs

+39
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Time primitives for native and WebAssembly
22
33
use std::fmt;
4+
use tokio::time;
45

56
#[derive(Debug)]
67
pub struct Expired;
@@ -96,3 +97,41 @@ pub async fn sleep(duration: Duration) {
9697
pub async fn sleep(duration: Duration) {
9798
tokio::time::sleep(duration).await
9899
}
100+
101+
pub struct Interval {
102+
#[cfg(target_arch = "wasm32")]
103+
duration: Duration,
104+
105+
#[cfg(not(target_arch = "wasm32"))]
106+
inner: time::Interval,
107+
}
108+
109+
impl Interval {
110+
/// Creates a new interval that ticks every `duration`
111+
pub fn new(duration: Duration) -> Self {
112+
#[cfg(target_arch = "wasm32")]
113+
{
114+
Self { duration }
115+
}
116+
117+
#[cfg(not(target_arch = "wasm32"))]
118+
{
119+
Self {
120+
inner: time::interval(duration),
121+
}
122+
}
123+
}
124+
125+
/// Waits for the next tick of the interval
126+
pub async fn tick(&mut self) {
127+
#[cfg(target_arch = "wasm32")]
128+
{
129+
sleep(self.duration).await;
130+
}
131+
132+
#[cfg(not(target_arch = "wasm32"))]
133+
{
134+
self.inner.tick().await;
135+
}
136+
}
137+
}

xmtp_mls/src/groups/disappearing_messages.rs

+14-13
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,16 @@ use crate::Client;
44
use std::time::Duration;
55
use thiserror::Error;
66
use tokio::sync::OnceCell;
7+
use xmtp_common::time::Interval;
78
use xmtp_id::scw_verifier::SmartContractSignatureVerifier;
89
use xmtp_proto::api_client::trait_impls::XmtpApi;
910

10-
/// Restart the DisappearingMessagesCleanerWorker every 1 sec to delete the expired messages
11+
/// Duration to wait before restarting the worker in case of an error.
1112
pub const WORKER_RESTART_DELAY: Duration = Duration::from_secs(1);
1213

14+
/// Interval at which the DisappearingMessagesCleanerWorker runs to delete expired messages.
15+
pub const INTERVAL_DURATION: Duration = Duration::from_secs(1);
16+
1317
#[derive(Debug, Error)]
1418
pub enum DisappearingMessagesCleanerError {
1519
#[error("storage error: {0}")]
@@ -68,21 +72,18 @@ where
6872
{
6973
/// Iterate on the list of groups and delete expired messages
7074
async fn delete_expired_messages(&mut self) -> Result<(), DisappearingMessagesCleanerError> {
71-
let provider = self.client.mls_provider()?;
72-
match provider.conn_ref().delete_expired_messages() {
73-
Ok(deleted_count) => {
74-
tracing::info!("Successfully deleted {} expired messages", deleted_count);
75-
}
76-
Err(e) => {
77-
tracing::error!("Failed to delete expired messages, error: {:?}", e);
78-
}
79-
}
75+
self.client
76+
.mls_provider()?
77+
.conn_ref()
78+
.delete_expired_messages()?;
8079
Ok(())
8180
}
8281
async fn run(&mut self) -> Result<(), DisappearingMessagesCleanerError> {
83-
if let Err(err) = self.delete_expired_messages().await {
84-
tracing::error!("Error during deletion of expired messages: {:?}", err);
82+
let mut interval = Interval::new(INTERVAL_DURATION);
83+
84+
loop {
85+
interval.tick().await;
86+
self.delete_expired_messages().await?;
8587
}
86-
Ok(())
8788
}
8889
}

0 commit comments

Comments
 (0)