Skip to content

Commit 3103e7b

Browse files
authored
fix sync worker err loop (#1378)
1 parent 76114b2 commit 3103e7b

File tree

8 files changed

+352
-216
lines changed

8 files changed

+352
-216
lines changed

bindings_ffi/src/mls.rs

+4-11
Original file line numberDiff line numberDiff line change
@@ -440,23 +440,16 @@ impl FfiXmtpClient {
440440
.register_identity(signature_request.clone())
441441
.await?;
442442

443-
self.maybe_start_sync_worker().await?;
443+
self.maybe_start_sync_worker();
444444

445445
Ok(())
446446
}
447447

448448
/// Starts the sync worker if the history sync url is present.
449-
async fn maybe_start_sync_worker(&self) -> Result<(), GenericError> {
450-
if self.inner_client.history_sync_url().is_none() {
451-
return Ok(());
449+
fn maybe_start_sync_worker(&self) {
450+
if self.inner_client.history_sync_url().is_some() {
451+
self.inner_client.start_sync_worker();
452452
}
453-
454-
self.inner_client
455-
.start_sync_worker()
456-
.await
457-
.map_err(GenericError::from_error)?;
458-
459-
Ok(())
460453
}
461454

462455
pub async fn send_sync_request(&self, kind: FfiDeviceSyncKind) -> Result<(), GenericError> {

examples/cli/cli-client.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ async fn main() -> color_eyre::eyre::Result<()> {
443443
let conn = client.store().conn().unwrap();
444444
let provider = client.mls_provider().unwrap();
445445
client.sync_welcomes(&conn).await.unwrap();
446-
client.start_sync_worker().await.unwrap();
446+
client.start_sync_worker();
447447
client
448448
.send_sync_request(&provider, DeviceSyncKind::MessageHistory)
449449
.await

xmtp_mls/Cargo.toml

+4-4
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,8 @@ harness = false
192192
name = "identity"
193193
required-features = ["bench"]
194194

195-
[[bench]]
196-
harness = false
197-
name = "sync"
198-
required-features = ["bench"]
195+
#[[bench]]
196+
#harness = false
197+
#name = "sync"
198+
#required-features = ["bench"]
199199

xmtp_mls/benches/sync.rs

+48-54
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,49 @@
1-
//! Benchmarking for syncing functions
2-
use crate::tracing::Instrument;
3-
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
4-
use tokio::runtime::{Builder, Runtime};
5-
use xmtp_mls::utils::bench::{bench_async_setup, BENCH_ROOT_SPAN};
6-
use xmtp_mls::utils::bench::{clients, init_logging};
1+
// //! Benchmarking for syncing functions
2+
// use crate::tracing::Instrument;
3+
// use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
4+
// use tokio::runtime::{Builder, Runtime};
5+
// use xmtp_mls::utils::bench::{bench_async_setup, BENCH_ROOT_SPAN};
6+
// use xmtp_mls::utils::bench::{clients, init_logging};
7+
//
8+
// #[macro_use]
9+
// extern crate tracing;
10+
//
11+
// fn setup() -> Runtime {
12+
// Builder::new_multi_thread()
13+
// .enable_time()
14+
// .enable_io()
15+
// .thread_name("xmtp-bencher")
16+
// .build()
17+
// .unwrap()
18+
// }
19+
//
20+
// fn start_sync_worker(c: &mut Criterion) {
21+
// init_logging();
22+
//
23+
// let runtime = setup();
24+
// let mut benchmark_group = c.benchmark_group("start_sync_worker");
25+
// benchmark_group.sample_size(10);
26+
// benchmark_group.bench_function("start_sync_worker", |b| {
27+
// let span = trace_span!(BENCH_ROOT_SPAN);
28+
// b.to_async(&runtime).iter_batched(
29+
// || {
30+
// bench_async_setup(|| async {
31+
// let client = clients::new_client(true).await;
32+
// // set history sync URL
33+
// (client, span.clone())
34+
// })
35+
// },
36+
// |(client, span)| async move { client.start_sync_worker().instrument(span) },
37+
// BatchSize::SmallInput,
38+
// )
39+
// });
40+
//
41+
// benchmark_group.finish();
42+
// }
743

8-
#[macro_use]
9-
extern crate tracing;
10-
11-
fn setup() -> Runtime {
12-
Builder::new_multi_thread()
13-
.enable_time()
14-
.enable_io()
15-
.thread_name("xmtp-bencher")
16-
.build()
17-
.unwrap()
18-
}
19-
20-
fn start_sync_worker(c: &mut Criterion) {
21-
init_logging();
22-
23-
let runtime = setup();
24-
let mut benchmark_group = c.benchmark_group("start_sync_worker");
25-
benchmark_group.sample_size(10);
26-
benchmark_group.bench_function("start_sync_worker", |b| {
27-
let span = trace_span!(BENCH_ROOT_SPAN);
28-
b.to_async(&runtime).iter_batched(
29-
|| {
30-
bench_async_setup(|| async {
31-
let client = clients::new_client(true).await;
32-
// set history sync URL
33-
(client, span.clone())
34-
})
35-
},
36-
|(client, span)| async move {
37-
client
38-
.start_sync_worker()
39-
.instrument(span)
40-
.await
41-
.unwrap()
42-
},
43-
BatchSize::SmallInput,
44-
)
45-
});
46-
47-
benchmark_group.finish();
48-
}
49-
50-
criterion_group!(
51-
name = sync;
52-
config = Criterion::default().sample_size(10);
53-
targets = start_sync_worker
54-
);
55-
criterion_main!(sync);
44+
// criterion_group!(
45+
// name = sync;
46+
// config = Criterion::default().sample_size(10);
47+
// targets = start_sync_worker
48+
// );
49+
// criterion_main!(sync);

xmtp_mls/src/client.rs

+20-6
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,13 @@ use crate::{
4040
mutex_registry::MutexRegistry,
4141
retry::Retry,
4242
retry_async, retryable,
43-
storage::wallet_addresses::WalletEntry,
4443
storage::{
4544
consent_record::{ConsentState, ConsentType, StoredConsentRecord},
4645
db_connection::DbConnection,
4746
group::{GroupMembershipState, GroupQueryArgs, StoredGroup},
4847
group_message::StoredGroupMessage,
4948
refresh_state::EntityKind,
49+
wallet_addresses::WalletEntry,
5050
EncryptedMessageStore, StorageError,
5151
},
5252
subscriptions::{LocalEventError, LocalEvents},
@@ -253,6 +253,25 @@ where
253253
}
254254
}
255255

256+
impl<ApiClient, V> Client<ApiClient, V>
257+
where
258+
ApiClient: XmtpApi + Send + Sync + 'static,
259+
V: SmartContractSignatureVerifier + Send + Sync + 'static,
260+
{
261+
/// Reconnect to the client's database if it has previously been released
262+
pub fn reconnect_db(&self) -> Result<(), ClientError> {
263+
self.context.store.reconnect()?;
264+
// restart all the workers
265+
// TODO: The only worker we have right now are the
266+
// sync workers. if we have other workers we
267+
// should create a better way to track them.
268+
if self.history_sync_url.is_some() {
269+
self.start_sync_worker();
270+
}
271+
Ok(())
272+
}
273+
}
274+
256275
impl<ApiClient, V> Client<ApiClient, V>
257276
where
258277
ApiClient: XmtpApi,
@@ -467,11 +486,6 @@ where
467486
Ok(())
468487
}
469488

470-
/// Reconnect to the client's database if it has previously been released
471-
pub fn reconnect_db(&self) -> Result<(), ClientError> {
472-
self.context.store.reconnect()?;
473-
Ok(())
474-
}
475489
/// Get a reference to the client's identity struct
476490
pub fn identity(&self) -> &Identity {
477491
&self.context.identity

0 commit comments

Comments
 (0)