Skip to content

Commit 3e43b68

Browse files
authored
Merge branch 'main' into mc/thread-safe-groups
2 parents 37ed3ec + 5e0761f commit 3e43b68

File tree

2 files changed

+54
-22
lines changed

2 files changed

+54
-22
lines changed

xmtp_mls/src/groups/mls_sync.rs

+38-18
Original file line numberDiff line numberDiff line change
@@ -1603,33 +1603,53 @@ pub(crate) mod tests {
16031603

16041604
use super::*;
16051605
use crate::builder::ClientBuilder;
1606-
use futures::future;
16071606
use std::sync::Arc;
16081607
use xmtp_cryptography::utils::generate_local_wallet;
16091608

1610-
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
1611-
#[cfg_attr(not(target_arch = "wasm32"), tokio::test(flavor = "multi_thread"))]
1609+
/// This test is not reproducible in webassembly, b/c webassembly has only one thread.
1610+
#[cfg_attr(
1611+
not(target_arch = "wasm32"),
1612+
tokio::test(flavor = "multi_thread", worker_threads = 10)
1613+
)]
1614+
#[cfg(not(target_family = "wasm"))]
16121615
async fn publish_intents_worst_case_scenario() {
16131616
let wallet = generate_local_wallet();
1614-
let amal = Arc::new(ClientBuilder::new_test_client(&wallet).await);
1615-
let amal_group: Arc<MlsGroup<_>> =
1616-
Arc::new(amal.create_group(None, Default::default()).unwrap());
1617+
let amal_a = Arc::new(ClientBuilder::new_test_client(&wallet).await);
1618+
let amal_group_a: Arc<MlsGroup<_>> =
1619+
Arc::new(amal_a.create_group(None, Default::default()).unwrap());
16171620

1618-
amal_group.send_message_optimistic(b"1").unwrap();
1619-
amal_group.send_message_optimistic(b"2").unwrap();
1620-
amal_group.send_message_optimistic(b"3").unwrap();
1621-
amal_group.send_message_optimistic(b"4").unwrap();
1622-
amal_group.send_message_optimistic(b"5").unwrap();
1623-
amal_group.send_message_optimistic(b"6").unwrap();
1621+
let conn = amal_a.context().store().conn().unwrap();
1622+
let provider: Arc<XmtpOpenMlsProvider> = Arc::new(conn.into());
16241623

1625-
let conn = amal.context().store().conn().unwrap();
1626-
let provider: XmtpOpenMlsProvider = conn.into();
1624+
// create group intent
1625+
amal_group_a.sync().await.unwrap();
1626+
assert_eq!(provider.conn_ref().intents_deleted(), 1);
16271627

1628-
let mut futures = vec![];
1629-
for _ in 0..10 {
1630-
futures.push(amal_group.publish_intents(&provider))
1628+
for _ in 0..100 {
1629+
let s = xmtp_common::rand_string::<100>();
1630+
amal_group_a.send_message_optimistic(s.as_bytes()).unwrap();
1631+
}
1632+
1633+
let mut set = tokio::task::JoinSet::new();
1634+
for _ in 0..50 {
1635+
let g = amal_group_a.clone();
1636+
let p = provider.clone();
1637+
set.spawn(async move { g.publish_intents(&p).await });
1638+
}
1639+
1640+
let res = set.join_all().await;
1641+
let errs: Vec<&Result<_, _>> = res.iter().filter(|r| r.is_err()).collect();
1642+
errs.iter().for_each(|e| {
1643+
tracing::error!("{}", e.as_ref().unwrap_err());
1644+
});
1645+
1646+
let published = provider.conn_ref().intents_published();
1647+
assert_eq!(published, 101);
1648+
let created = provider.conn_ref().intents_created();
1649+
assert_eq!(created, 101);
1650+
if !errs.is_empty() {
1651+
panic!("Errors during publish");
16311652
}
1632-
future::join_all(futures).await;
16331653
}
16341654

16351655
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]

xmtp_mls/src/storage/encrypted_store/group_intent.rs

+16-4
Original file line numberDiff line numberDiff line change
@@ -214,10 +214,22 @@ impl DbConnection {
214214
})?;
215215

216216
match res {
217-
// If nothing matched the query, return an error. Either ID or state was wrong
218-
0 => Err(StorageError::NotFound(format!(
219-
"ToPublish intent {intent_id} for publish"
220-
))),
217+
// If nothing matched the query, check if its already published, otherwise return an error. Either ID or state was wrong
218+
0 => {
219+
let already_published = self.raw_query(|conn| {
220+
dsl::group_intents
221+
.filter(dsl::id.eq(intent_id))
222+
.first::<StoredGroupIntent>(conn)
223+
});
224+
225+
if already_published.is_ok() {
226+
Ok(())
227+
} else {
228+
Err(StorageError::NotFound(format!(
229+
"Published intent {intent_id} for commit"
230+
)))
231+
}
232+
}
221233
_ => Ok(()),
222234
}
223235
}

0 commit comments

Comments
 (0)