Skip to content

Commit 798e5da

Browse files
nplasterercameronvoellcodabrink
authored
Correctly set shouldPush for messages (#1707)
* generate the protos and the database changes * store should push * update places to set the should push * Update schema_gen.rs * Update schema_gen.rs * dont pull in the payer proto changes * undo the toml change * fix test utils * add should push mapping for content types (#1715) * add should push mapping for content types * add comment --------- Co-authored-by: cameronvoell <cameronvoell@users.noreply.github.com> * fix the borrow * cargo fmt * add should_push * set to false for other content types * update the defaults * should push stored on inten * should push stored on inten (#1717) * add tests for it * fix the linte --------- Co-authored-by: Cameron Voell <1103838+cameronvoell@users.noreply.github.com> Co-authored-by: cameronvoell <cameronvoell@users.noreply.github.com> Co-authored-by: Dakota Brink <git@kota.is> Co-authored-by: Dakota Brink <779390+codabrink@users.noreply.github.com>
1 parent a821d7e commit 798e5da

File tree

15 files changed

+1147
-961
lines changed

15 files changed

+1147
-961
lines changed

xmtp_api/src/test_utils.rs

+1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub fn build_group_messages(num_messages: usize, group_id: Vec<u8>) -> Vec<Group
4343
group_id: group_id.clone(),
4444
data: vec![i as u8],
4545
sender_hmac: vec![],
46+
should_push: Some(true),
4647
})),
4748
})
4849
}

xmtp_api_grpc/src/replication_client.rs

+1
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,7 @@ impl XmtpMlsClient for ClientV4 {
335335
group_id: req.group_id.clone(),
336336
data: v1_group_message.data,
337337
sender_hmac: v1_group_message.sender_hmac,
338+
should_push: v1_group_message.should_push,
338339
})),
339340
})
340341
})

xmtp_content_types/src/lib.rs

+60
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,66 @@ pub enum CodecError {
2121
Decode(String),
2222
}
2323

24+
pub enum ContentType {
25+
Text,
26+
GroupMembershipChange,
27+
GroupUpdated,
28+
Reaction,
29+
ReadReceipt,
30+
Reply,
31+
Attachment,
32+
RemoteAttachment,
33+
MultiRemoteAttachment,
34+
TransactionReference,
35+
}
36+
37+
impl TryFrom<&str> for ContentType {
38+
type Error = String;
39+
40+
fn try_from(type_id: &str) -> Result<Self, Self::Error> {
41+
match type_id {
42+
text::TextCodec::TYPE_ID => Ok(Self::Text),
43+
membership_change::GroupMembershipChangeCodec::TYPE_ID => {
44+
Ok(Self::GroupMembershipChange)
45+
}
46+
group_updated::GroupUpdatedCodec::TYPE_ID => Ok(Self::GroupUpdated),
47+
reaction::ReactionCodec::TYPE_ID => Ok(Self::Reaction),
48+
read_receipt::ReadReceiptCodec::TYPE_ID => Ok(Self::ReadReceipt),
49+
reply::ReplyCodec::TYPE_ID => Ok(Self::Reply),
50+
attachment::AttachmentCodec::TYPE_ID => Ok(Self::Attachment),
51+
remote_attachment::RemoteAttachmentCodec::TYPE_ID => Ok(Self::RemoteAttachment),
52+
multi_remote_attachment::MultiRemoteAttachmentCodec::TYPE_ID => {
53+
Ok(Self::MultiRemoteAttachment)
54+
}
55+
transaction_reference::TransactionReferenceCodec::TYPE_ID => {
56+
Ok(Self::TransactionReference)
57+
}
58+
_ => Err(format!("Unknown content type ID: {}", type_id)),
59+
}
60+
}
61+
}
62+
63+
// Represents whether this message type should send pushed notification when received by a user
64+
pub fn should_push(content_type_id: String) -> bool {
65+
let content_type = ContentType::try_from(content_type_id.as_str()).ok();
66+
if let Some(content_type) = content_type {
67+
match content_type {
68+
ContentType::Text => true,
69+
ContentType::GroupMembershipChange => false,
70+
ContentType::GroupUpdated => false,
71+
ContentType::Reaction => false,
72+
ContentType::ReadReceipt => false,
73+
ContentType::Reply => true,
74+
ContentType::Attachment => true,
75+
ContentType::RemoteAttachment => true,
76+
ContentType::MultiRemoteAttachment => true,
77+
ContentType::TransactionReference => true,
78+
}
79+
} else {
80+
false
81+
}
82+
}
83+
2484
pub trait ContentCodec<T> {
2585
fn content_type() -> ContentTypeId;
2686
fn encode(content: T) -> Result<EncodedContent, CodecError>;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
ALTER TABLE group_messages
2+
DROP COLUMN should_push;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
ALTER TABLE group_intents
2+
ADD COLUMN should_push BOOLEAN NOT NULL DEFAULT TRUE;

xmtp_mls/src/groups/device_sync.rs

+25-15
Original file line numberDiff line numberDiff line change
@@ -438,15 +438,20 @@ where
438438
let content = DeviceSyncContent::Request(request.clone());
439439
let content_bytes = serde_json::to_vec(&content)?;
440440

441-
let _message_id = sync_group.prepare_message(&content_bytes, provider, {
442-
let request = request.clone();
443-
move |now| PlaintextEnvelope {
444-
content: Some(Content::V2(V2 {
445-
message_type: Some(MessageType::DeviceSyncRequest(request)),
446-
idempotency_key: now.to_string(),
447-
})),
448-
}
449-
})?;
441+
let _message_id = sync_group.prepare_message(
442+
&content_bytes,
443+
provider,
444+
{
445+
let request = request.clone();
446+
move |now| PlaintextEnvelope {
447+
content: Some(Content::V2(V2 {
448+
message_type: Some(MessageType::DeviceSyncRequest(request)),
449+
idempotency_key: now.to_string(),
450+
})),
451+
}
452+
},
453+
false,
454+
)?;
450455

451456
// publish the intent
452457
sync_group.publish_intents(provider).await?;
@@ -507,12 +512,17 @@ where
507512
(content_bytes, contents)
508513
};
509514

510-
sync_group.prepare_message(&content_bytes, provider, |now| PlaintextEnvelope {
511-
content: Some(Content::V2(V2 {
512-
message_type: Some(MessageType::DeviceSyncReply(contents)),
513-
idempotency_key: now.to_string(),
514-
})),
515-
})?;
515+
sync_group.prepare_message(
516+
&content_bytes,
517+
provider,
518+
|now| PlaintextEnvelope {
519+
content: Some(Content::V2(V2 {
520+
message_type: Some(MessageType::DeviceSyncReply(contents)),
521+
idempotency_key: now.to_string(),
522+
})),
523+
},
524+
false,
525+
)?;
516526

517527
sync_group.publish_intents(provider).await?;
518528

xmtp_mls/src/groups/device_sync/preference_sync.rs

+11-6
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,17 @@ impl UserPreferenceUpdate {
3131
.collect::<Result<Vec<_>, _>>()?;
3232
let update_proto = UserPreferenceUpdateProto { contents: updates };
3333
let content_bytes = serde_json::to_vec(&update_proto)?;
34-
sync_group.prepare_message(&content_bytes, &provider, |now| PlaintextEnvelope {
35-
content: Some(Content::V2(V2 {
36-
message_type: Some(MessageType::UserPreferenceUpdate(update_proto)),
37-
idempotency_key: now.to_string(),
38-
})),
39-
})?;
34+
sync_group.prepare_message(
35+
&content_bytes,
36+
&provider,
37+
|now| PlaintextEnvelope {
38+
content: Some(Content::V2(V2 {
39+
message_type: Some(MessageType::UserPreferenceUpdate(update_proto)),
40+
idempotency_key: now.to_string(),
41+
})),
42+
},
43+
false,
44+
)?;
4045

4146
sync_group.publish_intents(&provider).await?;
4247

xmtp_mls/src/groups/intents.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,11 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
6161
provider: &XmtpOpenMlsProvider,
6262
intent_kind: IntentKind,
6363
intent_data: Vec<u8>,
64+
should_push: bool,
6465
) -> Result<StoredGroupIntent, GroupError> {
6566
let res = provider.transaction(|provider| {
6667
let conn = provider.conn_ref();
67-
self.queue_intent_with_conn(conn, intent_kind, intent_data)
68+
self.queue_intent_with_conn(conn, intent_kind, intent_data, should_push)
6869
});
6970

7071
res
@@ -75,6 +76,7 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
7576
conn: &DbConnection,
7677
intent_kind: IntentKind,
7778
intent_data: Vec<u8>,
79+
should_push: bool,
7880
) -> Result<StoredGroupIntent, GroupError> {
7981
if intent_kind == IntentKind::SendMessage {
8082
self.maybe_insert_key_update_intent(conn)?;
@@ -84,6 +86,7 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
8486
intent_kind,
8587
self.group_id.clone(),
8688
intent_data,
89+
should_push,
8790
))?;
8891

8992
if intent_kind != IntentKind::SendMessage {
@@ -99,7 +102,7 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
99102
let now_ns = xmtp_common::time::now_ns();
100103
let elapsed_ns = now_ns - last_rotated_at_ns;
101104
if elapsed_ns > GROUP_KEY_ROTATION_INTERVAL_NS {
102-
self.queue_intent_with_conn(conn, IntentKind::KeyUpdate, vec![])?;
105+
self.queue_intent_with_conn(conn, IntentKind::KeyUpdate, vec![], false)?;
103106
}
104107
Ok(())
105108
}

xmtp_mls/src/groups/mls_sync.rs

+13-3
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ struct PublishIntentData {
175175
staged_commit: Option<Vec<u8>>,
176176
post_commit_action: Option<Vec<u8>>,
177177
payload_to_publish: Vec<u8>,
178+
should_send_push_notification: bool,
178179
}
179180

180181
impl<ScopedClient> MlsGroup<ScopedClient>
@@ -1234,6 +1235,7 @@ where
12341235
payload_to_publish,
12351236
post_commit_action,
12361237
staged_commit,
1238+
should_send_push_notification
12371239
})) => {
12381240
let payload_slice = payload_to_publish.as_slice();
12391241
let has_staged_commit = staged_commit.is_some();
@@ -1255,7 +1257,7 @@ where
12551257
intent.id
12561258
);
12571259

1258-
let messages = self.prepare_group_messages(vec![payload_slice])?;
1260+
let messages = self.prepare_group_messages(vec![(payload_slice, should_send_push_notification)])?;
12591261
self.client
12601262
.api()
12611263
.send_group_messages(messages)
@@ -1330,6 +1332,7 @@ where
13301332
payload_to_publish: msg.tls_serialize_detached()?,
13311333
post_commit_action: None,
13321334
staged_commit: None,
1335+
should_send_push_notification: intent.should_push,
13331336
}))
13341337
}
13351338
IntentKind::KeyUpdate => {
@@ -1343,6 +1346,7 @@ where
13431346
payload_to_publish: commit.tls_serialize_detached()?,
13441347
staged_commit: get_and_clear_pending_commit(openmls_group, provider)?,
13451348
post_commit_action: None,
1349+
should_send_push_notification: intent.should_push,
13461350
}))
13471351
}
13481352
IntentKind::MetadataUpdate => {
@@ -1365,6 +1369,7 @@ where
13651369
payload_to_publish: commit_bytes,
13661370
staged_commit: get_and_clear_pending_commit(openmls_group, provider)?,
13671371
post_commit_action: None,
1372+
should_send_push_notification: intent.should_push,
13681373
}))
13691374
}
13701375
IntentKind::UpdateAdminList => {
@@ -1386,6 +1391,7 @@ where
13861391
payload_to_publish: commit_bytes,
13871392
staged_commit: get_and_clear_pending_commit(openmls_group, provider)?,
13881393
post_commit_action: None,
1394+
should_send_push_notification: intent.should_push,
13891395
}))
13901396
}
13911397
IntentKind::UpdatePermission => {
@@ -1405,6 +1411,7 @@ where
14051411
payload_to_publish: commit_bytes,
14061412
staged_commit: get_and_clear_pending_commit(openmls_group, provider)?,
14071413
post_commit_action: None,
1414+
should_send_push_notification: intent.should_push,
14081415
}))
14091416
}
14101417
}
@@ -1496,6 +1503,7 @@ where
14961503
provider,
14971504
IntentKind::UpdateGroupMembership,
14981505
intent_data.into(),
1506+
false,
14991507
)?;
15001508

15011509
self.sync_until_intent_resolved(provider, intent.id).await
@@ -1689,7 +1697,7 @@ where
16891697
#[tracing::instrument(level = "trace", skip_all)]
16901698
pub(super) fn prepare_group_messages(
16911699
&self,
1692-
payloads: Vec<&[u8]>,
1700+
payloads: Vec<(&[u8], bool)>,
16931701
) -> Result<Vec<GroupMessageInput>, GroupError> {
16941702
let hmac_key = self
16951703
.hmac_keys(0..=0)?
@@ -1699,7 +1707,7 @@ where
16991707
Hmac::<Sha256>::new_from_slice(&hmac_key.key).expect("HMAC can take key of any size");
17001708

17011709
let mut result = vec![];
1702-
for payload in payloads {
1710+
for (payload, should_push) in payloads {
17031711
let mut sender_hmac = sender_hmac.clone();
17041712
sender_hmac.update(payload);
17051713
let sender_hmac = sender_hmac.finalize();
@@ -1708,6 +1716,7 @@ where
17081716
version: Some(GroupMessageInputVersion::V1(GroupMessageInputV1 {
17091717
data: payload.to_vec(),
17101718
sender_hmac: sender_hmac.into_bytes().to_vec(),
1719+
should_push: Some(should_push),
17111720
})),
17121721
});
17131722
}
@@ -1907,6 +1916,7 @@ async fn apply_update_group_membership_intent(
19071916
payload_to_publish: commit.tls_serialize_detached()?,
19081917
post_commit_action: post_commit_action.map(|action| action.to_bytes()),
19091918
staged_commit: Some(staged_commit),
1919+
should_send_push_notification: false,
19101920
}))
19111921
}
19121922

0 commit comments

Comments
 (0)