Skip to content

Commit b5f3237

Browse files
mchenaniinsipxDakota Brinkrygine
authored
fix(group): make MLS group thread safe #1349 (#1404)
--------- Co-authored-by: Andrew Plaza <github@andrewplaza.dev> Co-authored-by: Dakota Brink <github@codabrink> Co-authored-by: Ry Racherbaumer <ry@xmtp.com>
1 parent 5e0761f commit b5f3237

File tree

16 files changed

+813
-616
lines changed

16 files changed

+813
-616
lines changed

bindings_ffi/src/mls.rs

+50-9
Original file line numberDiff line numberDiff line change
@@ -1317,13 +1317,13 @@ impl FfiConversation {
13171317
Ok(())
13181318
}
13191319

1320-
pub fn find_messages(
1320+
pub async fn find_messages(
13211321
&self,
13221322
opts: FfiListMessagesOptions,
13231323
) -> Result<Vec<FfiMessage>, GenericError> {
13241324
let delivery_status = opts.delivery_status.map(|status| status.into());
13251325
let direction = opts.direction.map(|dir| dir.into());
1326-
let kind = match self.conversation_type()? {
1326+
let kind = match self.conversation_type().await? {
13271327
FfiConversationType::Group => None,
13281328
FfiConversationType::Dm => Some(GroupMessageKind::Application),
13291329
FfiConversationType::Sync => None,
@@ -1445,7 +1445,7 @@ impl FfiConversation {
14451445

14461446
pub fn group_image_url_square(&self) -> Result<String, GenericError> {
14471447
let provider = self.inner.mls_provider()?;
1448-
Ok(self.inner.group_image_url_square(provider)?)
1448+
Ok(self.inner.group_image_url_square(&provider)?)
14491449
}
14501450

14511451
pub async fn update_group_description(
@@ -1461,7 +1461,7 @@ impl FfiConversation {
14611461

14621462
pub fn group_description(&self) -> Result<String, GenericError> {
14631463
let provider = self.inner.mls_provider()?;
1464-
Ok(self.inner.group_description(provider)?)
1464+
Ok(self.inner.group_description(&provider)?)
14651465
}
14661466

14671467
pub async fn update_group_pinned_frame_url(
@@ -1593,9 +1593,9 @@ impl FfiConversation {
15931593
self.inner.added_by_inbox_id().map_err(Into::into)
15941594
}
15951595

1596-
pub fn group_metadata(&self) -> Result<Arc<FfiConversationMetadata>, GenericError> {
1596+
pub async fn group_metadata(&self) -> Result<Arc<FfiConversationMetadata>, GenericError> {
15971597
let provider = self.inner.mls_provider()?;
1598-
let metadata = self.inner.metadata(provider)?;
1598+
let metadata = self.inner.metadata(&provider).await?;
15991599
Ok(Arc::new(FfiConversationMetadata {
16001600
inner: Arc::new(metadata),
16011601
}))
@@ -1605,9 +1605,9 @@ impl FfiConversation {
16051605
self.inner.dm_inbox_id().map_err(Into::into)
16061606
}
16071607

1608-
pub fn conversation_type(&self) -> Result<FfiConversationType, GenericError> {
1608+
pub async fn conversation_type(&self) -> Result<FfiConversationType, GenericError> {
16091609
let provider = self.inner.mls_provider()?;
1610-
let conversation_type = self.inner.conversation_type(&provider)?;
1610+
let conversation_type = self.inner.conversation_type(&provider).await?;
16111611
Ok(conversation_type.into())
16121612
}
16131613
}
@@ -2104,6 +2104,9 @@ mod tests {
21042104
.await
21052105
.unwrap();
21062106

2107+
let conn = client.inner_client.context().store().conn().unwrap();
2108+
conn.register_triggers();
2109+
21072110
register_client(&ffi_inbox_owner, &client).await;
21082111
client
21092112
}
@@ -2595,6 +2598,8 @@ mod tests {
25952598
async fn test_can_stream_group_messages_for_updates() {
25962599
let alix = new_test_client().await;
25972600
let bo = new_test_client().await;
2601+
let alix_provider = alix.inner_client.mls_provider().unwrap();
2602+
let bo_provider = bo.inner_client.mls_provider().unwrap();
25982603

25992604
// Stream all group messages
26002605
let message_callbacks = Arc::new(RustStreamCallback::default());
@@ -2627,21 +2632,29 @@ mod tests {
26272632
.unwrap();
26282633
let bo_group = &bo_groups[0];
26292634
bo_group.sync().await.unwrap();
2635+
2636+
// alix published + processed group creation and name update
2637+
assert_eq!(alix_provider.conn_ref().intents_published(), 2);
2638+
assert_eq!(alix_provider.conn_ref().intents_deleted(), 2);
2639+
26302640
bo_group
26312641
.update_group_name("Old Name2".to_string())
26322642
.await
26332643
.unwrap();
26342644
message_callbacks.wait_for_delivery(None).await.unwrap();
2645+
assert_eq!(bo_provider.conn_ref().intents_published(), 1);
26352646

26362647
alix_group.send(b"Hello there".to_vec()).await.unwrap();
26372648
message_callbacks.wait_for_delivery(None).await.unwrap();
2649+
assert_eq!(alix_provider.conn_ref().intents_published(), 3);
26382650

26392651
let dm = bo
26402652
.conversations()
26412653
.create_dm(alix.account_address.clone())
26422654
.await
26432655
.unwrap();
26442656
dm.send(b"Hello again".to_vec()).await.unwrap();
2657+
assert_eq!(bo_provider.conn_ref().intents_published(), 3);
26452658
message_callbacks.wait_for_delivery(None).await.unwrap();
26462659

26472660
// Uncomment the following lines to add more group name updates
@@ -2650,6 +2663,8 @@ mod tests {
26502663
.await
26512664
.unwrap();
26522665
message_callbacks.wait_for_delivery(None).await.unwrap();
2666+
message_callbacks.wait_for_delivery(None).await.unwrap();
2667+
assert_eq!(bo_provider.conn_ref().intents_published(), 4);
26532668

26542669
assert_eq!(message_callbacks.message_count(), 6);
26552670

@@ -2693,9 +2708,11 @@ mod tests {
26932708

26942709
let bo_messages1 = bo_group1
26952710
.find_messages(FfiListMessagesOptions::default())
2711+
.await
26962712
.unwrap();
26972713
let bo_messages5 = bo_group5
26982714
.find_messages(FfiListMessagesOptions::default())
2715+
.await
26992716
.unwrap();
27002717
assert_eq!(bo_messages1.len(), 0);
27012718
assert_eq!(bo_messages5.len(), 0);
@@ -2707,9 +2724,11 @@ mod tests {
27072724

27082725
let bo_messages1 = bo_group1
27092726
.find_messages(FfiListMessagesOptions::default())
2727+
.await
27102728
.unwrap();
27112729
let bo_messages5 = bo_group5
27122730
.find_messages(FfiListMessagesOptions::default())
2731+
.await
27132732
.unwrap();
27142733
assert_eq!(bo_messages1.len(), 1);
27152734
assert_eq!(bo_messages5.len(), 1);
@@ -2828,11 +2847,13 @@ mod tests {
28282847
alix_group.sync().await.unwrap();
28292848
let alix_messages = alix_group
28302849
.find_messages(FfiListMessagesOptions::default())
2850+
.await
28312851
.unwrap();
28322852

28332853
bo_group.sync().await.unwrap();
28342854
let bo_messages = bo_group
28352855
.find_messages(FfiListMessagesOptions::default())
2856+
.await
28362857
.unwrap();
28372858
assert_eq!(bo_messages.len(), 9);
28382859
assert_eq!(alix_messages.len(), 10);
@@ -3016,15 +3037,19 @@ mod tests {
30163037
// Get the message count for all the clients
30173038
let caro_messages = caro_group
30183039
.find_messages(FfiListMessagesOptions::default())
3040+
.await
30193041
.unwrap();
30203042
let alix_messages = alix_group
30213043
.find_messages(FfiListMessagesOptions::default())
3044+
.await
30223045
.unwrap();
30233046
let bo_messages = bo_group
30243047
.find_messages(FfiListMessagesOptions::default())
3048+
.await
30253049
.unwrap();
30263050
let bo2_messages = bo2_group
30273051
.find_messages(FfiListMessagesOptions::default())
3052+
.await
30283053
.unwrap();
30293054

30303055
assert_eq!(caro_messages.len(), 5);
@@ -3080,9 +3105,11 @@ mod tests {
30803105

30813106
let alix_messages = alix_group
30823107
.find_messages(FfiListMessagesOptions::default())
3108+
.await
30833109
.unwrap();
30843110
let bo_messages = bo_group
30853111
.find_messages(FfiListMessagesOptions::default())
3112+
.await
30863113
.unwrap();
30873114

30883115
let alix_can_see_bo_message = alix_messages
@@ -3189,6 +3216,7 @@ mod tests {
31893216

31903217
let bo_messages = bo_group
31913218
.find_messages(FfiListMessagesOptions::default())
3219+
.await
31923220
.unwrap();
31933221
assert_eq!(bo_messages.len(), 0);
31943222

@@ -3204,8 +3232,12 @@ mod tests {
32043232

32053233
let bo_messages = bo_group
32063234
.find_messages(FfiListMessagesOptions::default())
3235+
.await
32073236
.unwrap();
3208-
assert!(bo_messages.first().unwrap().kind == FfiConversationMessageKind::MembershipChange);
3237+
assert_eq!(
3238+
bo_messages.first().unwrap().kind,
3239+
FfiConversationMessageKind::MembershipChange
3240+
);
32093241
assert_eq!(bo_messages.len(), 1);
32103242

32113243
let bo_members = bo_group.list_members().await.unwrap();
@@ -3263,6 +3295,7 @@ mod tests {
32633295

32643296
let bo_messages1 = bo_group
32653297
.find_messages(FfiListMessagesOptions::default())
3298+
.await
32663299
.unwrap();
32673300
assert_eq!(bo_messages1.len(), first_msg_check);
32683301

@@ -3275,6 +3308,7 @@ mod tests {
32753308

32763309
let alix_messages = alix_group
32773310
.find_messages(FfiListMessagesOptions::default())
3311+
.await
32783312
.unwrap();
32793313
assert_eq!(alix_messages.len(), second_msg_check);
32803314

@@ -3284,6 +3318,7 @@ mod tests {
32843318

32853319
let bo_messages2 = bo_group
32863320
.find_messages(FfiListMessagesOptions::default())
3321+
.await
32873322
.unwrap();
32883323
assert_eq!(bo_messages2.len(), second_msg_check);
32893324
assert_eq!(message_callbacks.message_count(), second_msg_check as u32);
@@ -4529,15 +4564,19 @@ mod tests {
45294564
// Get messages for both participants in both conversations
45304565
let alix_dm_messages = alix_dm
45314566
.find_messages(FfiListMessagesOptions::default())
4567+
.await
45324568
.unwrap();
45334569
let bo_dm_messages = bo_dm
45344570
.find_messages(FfiListMessagesOptions::default())
4571+
.await
45354572
.unwrap();
45364573
let alix_group_messages = alix_group
45374574
.find_messages(FfiListMessagesOptions::default())
4575+
.await
45384576
.unwrap();
45394577
let bo_group_messages = bo_group
45404578
.find_messages(FfiListMessagesOptions::default())
4579+
.await
45414580
.unwrap();
45424581

45434582
// Verify DM messages
@@ -4658,13 +4697,15 @@ mod tests {
46584697
.await
46594698
.unwrap()[0]
46604699
.find_messages(FfiListMessagesOptions::default())
4700+
.await
46614701
.unwrap();
46624702
let bo_dm_messages = client_b
46634703
.conversations()
46644704
.list(FfiListConversationsOptions::default())
46654705
.await
46664706
.unwrap()[0]
46674707
.find_messages(FfiListMessagesOptions::default())
4708+
.await
46684709
.unwrap();
46694710
assert_eq!(alix_dm_messages[0].content, "Hello in DM".as_bytes());
46704711
assert_eq!(bo_dm_messages[0].content, "Hello in DM".as_bytes());

bindings_node/src/conversation.rs

+12-10
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ impl Conversation {
161161
}
162162

163163
#[napi]
164-
pub fn find_messages(&self, opts: Option<ListMessagesOptions>) -> Result<Vec<Message>> {
164+
pub async fn find_messages(&self, opts: Option<ListMessagesOptions>) -> Result<Vec<Message>> {
165165
let opts = opts.unwrap_or_default();
166166
let group = MlsGroup::new(
167167
self.inner_client.clone(),
@@ -171,6 +171,7 @@ impl Conversation {
171171
let provider = group.mls_provider().map_err(ErrorWrapper::from)?;
172172
let conversation_type = group
173173
.conversation_type(&provider)
174+
.await
174175
.map_err(ErrorWrapper::from)?;
175176
let kind = match conversation_type {
176177
ConversationType::Group => None,
@@ -250,7 +251,7 @@ impl Conversation {
250251
);
251252

252253
let admin_list = group
253-
.admin_list(group.mls_provider().map_err(ErrorWrapper::from)?)
254+
.admin_list(&group.mls_provider().map_err(ErrorWrapper::from)?)
254255
.map_err(ErrorWrapper::from)?;
255256

256257
Ok(admin_list)
@@ -265,7 +266,7 @@ impl Conversation {
265266
);
266267

267268
let super_admin_list = group
268-
.super_admin_list(group.mls_provider().map_err(ErrorWrapper::from)?)
269+
.super_admin_list(&group.mls_provider().map_err(ErrorWrapper::from)?)
269270
.map_err(ErrorWrapper::from)?;
270271

271272
Ok(super_admin_list)
@@ -451,7 +452,7 @@ impl Conversation {
451452
);
452453

453454
let group_name = group
454-
.group_name(group.mls_provider().map_err(ErrorWrapper::from)?)
455+
.group_name(&group.mls_provider().map_err(ErrorWrapper::from)?)
455456
.map_err(ErrorWrapper::from)?;
456457

457458
Ok(group_name)
@@ -482,7 +483,7 @@ impl Conversation {
482483
);
483484

484485
let group_image_url_square = group
485-
.group_image_url_square(group.mls_provider().map_err(ErrorWrapper::from)?)
486+
.group_image_url_square(&group.mls_provider().map_err(ErrorWrapper::from)?)
486487
.map_err(ErrorWrapper::from)?;
487488

488489
Ok(group_image_url_square)
@@ -513,7 +514,7 @@ impl Conversation {
513514
);
514515

515516
let group_description = group
516-
.group_description(group.mls_provider().map_err(ErrorWrapper::from)?)
517+
.group_description(&group.mls_provider().map_err(ErrorWrapper::from)?)
517518
.map_err(ErrorWrapper::from)?;
518519

519520
Ok(group_description)
@@ -544,7 +545,7 @@ impl Conversation {
544545
);
545546

546547
let group_pinned_frame_url = group
547-
.group_pinned_frame_url(group.mls_provider().map_err(ErrorWrapper::from)?)
548+
.group_pinned_frame_url(&group.mls_provider().map_err(ErrorWrapper::from)?)
548549
.map_err(ErrorWrapper::from)?;
549550

550551
Ok(group_pinned_frame_url)
@@ -587,7 +588,7 @@ impl Conversation {
587588

588589
Ok(
589590
group
590-
.is_active(group.mls_provider().map_err(ErrorWrapper::from)?)
591+
.is_active(&group.mls_provider().map_err(ErrorWrapper::from)?)
591592
.map_err(ErrorWrapper::from)?,
592593
)
593594
}
@@ -604,15 +605,16 @@ impl Conversation {
604605
}
605606

606607
#[napi]
607-
pub fn group_metadata(&self) -> Result<GroupMetadata> {
608+
pub async fn group_metadata(&self) -> Result<GroupMetadata> {
608609
let group = MlsGroup::new(
609610
self.inner_client.clone(),
610611
self.group_id.clone(),
611612
self.created_at_ns,
612613
);
613614

614615
let metadata = group
615-
.metadata(group.mls_provider().map_err(ErrorWrapper::from)?)
616+
.metadata(&group.mls_provider().map_err(ErrorWrapper::from)?)
617+
.await
616618
.map_err(ErrorWrapper::from)?;
617619

618620
Ok(GroupMetadata { inner: metadata })

0 commit comments

Comments
 (0)