Skip to content

Commit e5b2bf1

Browse files
committed
wip
1 parent 653f6c9 commit e5b2bf1

File tree

7 files changed

+67
-58
lines changed

7 files changed

+67
-58
lines changed

bindings_node/src/conversation.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::{ops::Deref, sync::Arc};
2-
use futures::TryFutureExt;
2+
33
use napi::{
44
bindgen_prelude::{Result, Uint8Array},
55
threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFunctionCallMode},

xmtp_mls/src/client.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -845,8 +845,8 @@ where
845845
async move {
846846
tracing::info!(
847847
inbox_id = self.inbox_id(),
848-
"current epoch for [{}] in sync_all_groups()",
849-
self.inbox_id(),
848+
"[{}] syncing group",
849+
self.inbox_id()
850850
);
851851
tracing::info!(
852852
inbox_id = self.inbox_id(),

xmtp_mls/src/groups/intents.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -725,7 +725,6 @@ impl TryFrom<Vec<u8>> for PostCommitAction {
725725
pub(crate) mod tests {
726726
#[cfg(target_arch = "wasm32")]
727727
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_dedicated_worker);
728-
729728
use openmls::prelude::{MlsMessageBodyIn, MlsMessageIn, ProcessedMessageContent};
730729
use tls_codec::Deserialize;
731730
use xmtp_cryptography::utils::generate_local_wallet;
@@ -866,9 +865,8 @@ pub(crate) mod tests {
866865
let provider = group.client.mls_provider().unwrap();
867866
let decrypted_message = match group
868867
.load_mls_group_with_lock(&provider, |mut mls_group| {
869-
mls_group
870-
.process_message(&provider, mls_message)
871-
.map_err(|e| GroupError::Generic(e.to_string()))
868+
Ok(mls_group
869+
.process_message(&provider, mls_message).unwrap())
872870
}) {
873871
Ok(message) => message,
874872
Err(err) => panic!("Error: {:?}", err),

xmtp_mls/src/groups/members.rs

-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ where
4141
provider: &XmtpOpenMlsProvider,
4242
) -> Result<Vec<GroupMember>, GroupError> {
4343
let group_membership = self.load_mls_group_with_lock(provider, |mls_group| {
44-
// Extract group membership from extensions
4544
Ok(extract_group_membership(mls_group.extensions())?)
4645
})?;
4746
let requests = group_membership

xmtp_mls/src/groups/mls_sync.rs

+22-7
Original file line numberDiff line numberDiff line change
@@ -360,9 +360,8 @@ where
360360
if intent.state == IntentState::Committed {
361361
return Ok(IntentState::Committed);
362362
}
363-
let group_epoch = mls_group.epoch();
364-
365363
let message_epoch = message.epoch();
364+
let group_epoch = mls_group.epoch();
366365
debug!(
367366
inbox_id = self.client.inbox_id(),
368367
installation_id = hex::encode(self.client.installation_id()),
@@ -702,6 +701,14 @@ where
702701
let intent = provider
703702
.conn_ref()
704703
.find_group_intent_by_payload_hash(sha256(envelope.data.as_slice()));
704+
tracing::info!(
705+
inbox_id = self.client.inbox_id(),
706+
group_id = hex::encode(&self.group_id),
707+
msg_id = envelope.id,
708+
"Processing envelope with hash {:?}",
709+
hex::encode(sha256(envelope.data.as_slice()))
710+
);
711+
705712
match intent {
706713
// Intent with the payload hash matches
707714
Ok(Some(intent)) => {
@@ -729,6 +736,14 @@ where
729736
}
730737
// No matching intent found
731738
Ok(None) => {
739+
tracing::info!(
740+
inbox_id = self.client.inbox_id(),
741+
group_id = hex::encode(&self.group_id),
742+
msg_id = envelope.id,
743+
"client [{}] is about to process external envelope [{}]",
744+
self.client.inbox_id(),
745+
envelope.id
746+
);
732747
self.process_external_message(provider, message, envelope)
733748
.await
734749
}
@@ -792,7 +807,10 @@ where
792807
for message in messages.into_iter() {
793808
let result = retry_async!(
794809
Retry::default(),
795-
(async { self.consume_message(&message, provider.conn_ref()).await })
810+
(async {
811+
self.consume_message(&message, provider.conn_ref())
812+
.await
813+
})
796814
);
797815
if let Err(e) = result {
798816
let is_retryable = e.is_retryable();
@@ -1139,10 +1157,7 @@ where
11391157
return Ok(());
11401158
}
11411159
// determine how long of an interval in time to use before updating list
1142-
let interval_ns = match update_interval_ns {
1143-
Some(val) => val,
1144-
None => SYNC_UPDATE_INSTALLATIONS_INTERVAL_NS,
1145-
};
1160+
let interval_ns = update_interval_ns.unwrap_or_else(|| SYNC_UPDATE_INSTALLATIONS_INTERVAL_NS);
11461161

11471162
let now_ns = crate::utils::time::now_ns();
11481163
let last_ns = provider

xmtp_mls/src/groups/mod.rs

+31-42
Original file line numberDiff line numberDiff line change
@@ -332,19 +332,6 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
332332
}
333333

334334
// Load the stored OpenMLS group from the OpenMLS provider's keystore
335-
#[tracing::instrument(level = "trace", skip_all)]
336-
pub(crate) fn load_mls_group(
337-
&self,
338-
provider: impl OpenMlsProvider,
339-
) -> Result<OpenMlsGroup, GroupError> {
340-
let mls_group =
341-
OpenMlsGroup::load(provider.storage(), &GroupId::from_slice(&self.group_id))
342-
.map_err(|_| GroupError::GroupNotFound)?
343-
.ok_or(GroupError::GroupNotFound)?;
344-
345-
Ok(mls_group)
346-
}
347-
348335
#[tracing::instrument(level = "trace", skip_all)]
349336
pub(crate) fn load_mls_group_with_lock<F, R>(
350337
&self,
@@ -358,7 +345,6 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
358345
let group_id = self.group_id.clone();
359346

360347
// Acquire the lock synchronously using blocking_lock
361-
362348
let _lock = MLS_COMMIT_LOCK.get_lock_sync(group_id.clone())?;
363349
// Load the MLS group
364350
let mls_group =
@@ -370,6 +356,7 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
370356
operation(mls_group)
371357
}
372358

359+
// Load the stored OpenMLS group from the OpenMLS provider's keystore
373360
#[tracing::instrument(level = "trace", skip_all)]
374361
pub(crate) async fn load_mls_group_with_lock_async<F, E, R, Fut>(
375362
&self,
@@ -1173,33 +1160,35 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
11731160
///
11741161
/// If the current user has been kicked out of the group, `is_active` will return `false`
11751162
pub fn is_active(&self, provider: impl OpenMlsProvider) -> Result<bool, GroupError> {
1176-
self.load_mls_group_with_lock(provider, |mls_group| Ok(mls_group.is_active()))
1163+
self.load_mls_group_with_lock(provider, |mls_group|
1164+
Ok(mls_group.is_active())
1165+
)
11771166
}
11781167

11791168
/// Get the `GroupMetadata` of the group.
11801169
pub fn metadata(&self, provider: impl OpenMlsProvider) -> Result<GroupMetadata, GroupError> {
1181-
self.load_mls_group_with_lock(provider, |mls_group| {
1170+
self.load_mls_group_with_lock(provider, |mls_group|
11821171
Ok(extract_group_metadata(&mls_group)?)
1183-
})
1172+
)
11841173
}
11851174

11861175
/// Get the `GroupMutableMetadata` of the group.
11871176
pub fn mutable_metadata(
11881177
&self,
11891178
provider: impl OpenMlsProvider,
11901179
) -> Result<GroupMutableMetadata, GroupError> {
1191-
self.load_mls_group_with_lock(provider, |mls_group| {
1180+
self.load_mls_group_with_lock(provider, |mls_group|
11921181
Ok(GroupMutableMetadata::try_from(&mls_group)?)
1193-
})
1182+
)
11941183
}
11951184

11961185
pub fn permissions(&self) -> Result<GroupMutablePermissions, GroupError> {
11971186
let conn = self.context().store().conn()?;
11981187
let provider = XmtpOpenMlsProvider::new(conn);
11991188

1200-
self.load_mls_group_with_lock(&provider, |mls_group| {
1189+
self.load_mls_group_with_lock(&provider, |mls_group|
12011190
Ok(extract_group_permissions(&mls_group)?)
1202-
})
1191+
)
12031192
}
12041193

12051194
/// Used for testing that dm group validation works as expected.
@@ -1920,17 +1909,17 @@ pub(crate) mod tests {
19201909

19211910
// Check Amal's MLS group state.
19221911
let amal_db = XmtpOpenMlsProvider::from(amal.context.store().conn().unwrap());
1923-
let amal_members_len = amal_group.load_mls_group_with_lock(&amal_db, |amal_mls_group| {
1924-
Ok(amal_mls_group.members().count())
1925-
}).unwrap();
1912+
let amal_members_len = amal_group.load_mls_group_with_lock(&amal_db, |mls_group|
1913+
Ok(mls_group.members().count())
1914+
).unwrap();
19261915

19271916
assert_eq!(amal_members_len, 3);
19281917

19291918
// Check Bola's MLS group state.
19301919
let bola_db = XmtpOpenMlsProvider::from(bola.context.store().conn().unwrap());
1931-
let bola_members_len = bola_group.load_mls_group_with_lock(&bola_db, |bola_mls_group| {
1932-
Ok(bola_mls_group.members().count())
1933-
}).unwrap();
1920+
let bola_members_len = bola_group.load_mls_group_with_lock(&bola_db, |mls_group|
1921+
Ok(mls_group.members().count())
1922+
).unwrap();
19341923

19351924
assert_eq!(bola_members_len, 3);
19361925

@@ -2009,8 +1998,8 @@ pub(crate) mod tests {
20091998
Ok(mls_group) // Return the updated group if necessary
20101999
}).unwrap();
20112000

2012-
force_add_member(&alix, &bo, &alix_group, &mut mls_group, &provider).await;
20132001
// Now add bo to the group
2002+
force_add_member(&alix, &bo, &alix_group, &mut mls_group, &provider).await;
20142003

20152004
// Bo should not be able to actually read this group
20162005
bo.sync_welcomes(&bo.store().conn().unwrap()).await.unwrap();
@@ -2134,9 +2123,9 @@ pub(crate) mod tests {
21342123
assert_eq!(messages.len(), 2);
21352124

21362125
let provider: XmtpOpenMlsProvider = client.context.store().conn().unwrap().into();
2137-
let pending_commit_is_none = group.load_mls_group_with_lock(&provider, |mls_group| {
2126+
let pending_commit_is_none = group.load_mls_group_with_lock(&provider, |mls_group|
21382127
Ok(mls_group.pending_commit().is_none())
2139-
}).unwrap();
2128+
).unwrap();
21402129

21412130
assert!(pending_commit_is_none);
21422131

@@ -2317,9 +2306,9 @@ pub(crate) mod tests {
23172306
assert!(new_installations_were_added.is_ok());
23182307

23192308
group.sync().await.unwrap();
2320-
let num_members = group.load_mls_group_with_lock(&provider, |mls_group| {
2309+
let num_members = group.load_mls_group_with_lock(&provider, |mls_group|
23212310
Ok(mls_group.members().collect::<Vec<_>>().len())
2322-
}).unwrap();
2311+
).unwrap();
23232312

23242313
assert_eq!(num_members, 3);
23252314
}
@@ -3899,9 +3888,9 @@ pub(crate) mod tests {
38993888
)
39003889
.unwrap();
39013890
assert!(valid_dm_group
3902-
.load_mls_group_with_lock(client.mls_provider().unwrap(), |mls_group| {
3891+
.load_mls_group_with_lock(client.mls_provider().unwrap(), |mls_group|
39033892
validate_dm_group(&client, &mls_group, added_by_inbox)
3904-
})
3893+
)
39053894
.is_ok());
39063895

39073896
// Test case 2: Invalid conversation type
@@ -3917,9 +3906,9 @@ pub(crate) mod tests {
39173906
)
39183907
.unwrap();
39193908
assert!(matches!(
3920-
invalid_type_group.load_mls_group_with_lock(client.mls_provider().unwrap(), |mls_group| {
3909+
invalid_type_group.load_mls_group_with_lock(client.mls_provider().unwrap(), |mls_group|
39213910
validate_dm_group(&client, &mls_group, added_by_inbox)
3922-
}),
3911+
),
39233912
Err(GroupError::Generic(msg)) if msg.contains("Invalid conversation type")
39243913
));
39253914
// Test case 3: Missing DmMembers
@@ -3939,9 +3928,9 @@ pub(crate) mod tests {
39393928
)
39403929
.unwrap();
39413930
assert!(matches!(
3942-
mismatched_dm_members_group.load_mls_group_with_lock(client.mls_provider().unwrap(), |mls_group| {
3931+
mismatched_dm_members_group.load_mls_group_with_lock(client.mls_provider().unwrap(), |mls_group|
39433932
validate_dm_group(&client, &mls_group, added_by_inbox)
3944-
}),
3933+
),
39453934
Err(GroupError::Generic(msg)) if msg.contains("DM members do not match expected inboxes")
39463935
));
39473936

@@ -3961,9 +3950,9 @@ pub(crate) mod tests {
39613950
)
39623951
.unwrap();
39633952
assert!(matches!(
3964-
non_empty_admin_list_group.load_mls_group_with_lock(client.mls_provider().unwrap(), |mls_group| {
3953+
non_empty_admin_list_group.load_mls_group_with_lock(client.mls_provider().unwrap(), |mls_group|
39653954
validate_dm_group(&client, &mls_group, added_by_inbox)
3966-
}),
3955+
),
39673956
Err(GroupError::Generic(msg)) if msg.contains("DM group must have empty admin and super admin lists")
39683957
));
39693958

@@ -3982,9 +3971,9 @@ pub(crate) mod tests {
39823971
)
39833972
.unwrap();
39843973
assert!(matches!(
3985-
invalid_permissions_group.load_mls_group_with_lock(client.mls_provider().unwrap(), |mls_group| {
3974+
invalid_permissions_group.load_mls_group_with_lock(client.mls_provider().unwrap(), |mls_group|
39863975
validate_dm_group(&client, &mls_group, added_by_inbox)
3987-
}),
3976+
),
39883977
Err(GroupError::Generic(msg)) if msg.contains("Invalid permissions for DM group")
39893978
));
39903979
}

xmtp_mls/src/groups/subscriptions.rs

+9-1
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,24 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
4141
let process_result = retry_async!(
4242
Retry::default(),
4343
(async {
44+
let client_id = &client_id;
4445
let msgv1 = &msgv1;
4546
self.context()
4647
.store()
4748
.transaction_async(|provider| async move {
4849
let prov_ref = &provider; // Borrow provider instead of moving it
4950
self.load_mls_group_with_lock_async(
5051
prov_ref,
51-
|mut mls_group| async move {
52+
|mls_group| async move {
5253
// Attempt processing immediately, but fail if the message is not an Application Message
5354
// Returning an error should roll back the DB tx
55+
tracing::info!(
56+
inbox_id = self.client.inbox_id(),
57+
group_id = hex::encode(&self.group_id),
58+
msg_id = msgv1.id,
59+
"current epoch for [{}] in process_stream_entry()",
60+
client_id,
61+
);
5462
self.process_message(&prov_ref, msgv1, false)
5563
.await
5664
// NOTE: We want to make sure we retry an error in process_message

0 commit comments

Comments
 (0)