Skip to content

Commit c9b5d6b

Browse files
authored
Reproduce and fix streaming issue (#550)
## Summary - Reproduces the streaming issue seen in RN iOS in pure Rust. - Adds a hacky version of an Authenticator so that we can send publish requests to the dev network without getting auth errors back - Sets `http2_keep_alive_interval` to 10 seconds to fix the disconnect issue. ## Notes Issue is only reproducible in the dev network and only with ~15 seconds of inactivity in the stream.
1 parent 144d214 commit c9b5d6b

File tree

8 files changed

+243
-3
lines changed

8 files changed

+243
-3
lines changed

Cargo.lock

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bindings_ffi/Cargo.lock

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bindings_ffi/src/v2.rs

+35-1
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,9 @@ mod tests {
502502
use futures::stream;
503503
use xmtp_proto::api_client::{Envelope, Error as ApiError};
504504

505-
use crate::v2::FfiV2Subscription;
505+
use crate::v2::{
506+
create_v2_client, FfiEnvelope, FfiPublishRequest, FfiV2SubscribeRequest, FfiV2Subscription,
507+
};
506508

507509
// Try a query on a test topic, and make sure we get a response
508510
#[tokio::test]
@@ -577,4 +579,36 @@ mod tests {
577579
let second = stream_handler.next().await;
578580
assert!(second.is_err());
579581
}
582+
583+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
584+
async fn test_e2e() {
585+
let client = create_v2_client("http://localhost:5556".to_string(), false)
586+
.await
587+
.unwrap();
588+
let content_topic = "/xmtp/0/foo";
589+
590+
let subscription = client
591+
.subscribe(FfiV2SubscribeRequest {
592+
content_topics: vec![content_topic.to_string()],
593+
})
594+
.await
595+
.unwrap();
596+
597+
client
598+
.publish(
599+
FfiPublishRequest {
600+
envelopes: vec![FfiEnvelope {
601+
content_topic: content_topic.to_string(),
602+
timestamp_ns: 3,
603+
message: vec![1, 2, 3],
604+
}],
605+
},
606+
"".to_string(),
607+
)
608+
.await
609+
.unwrap();
610+
611+
let sub_result = subscription.next().await.unwrap();
612+
assert_eq!(sub_result.content_topic, content_topic.to_string());
613+
}
580614
}

xmtp_api_grpc/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ version = "0.1.0"
77
async-stream = "0.3.5"
88
base64 = "0.21.0"
99
futures = "0.3.29"
10+
hex.workspace = true
1011
http-body = "0.4.5"
1112
hyper = "0.14.26"
1213
log = { version = "0.4", features = ["std"] }
@@ -24,6 +25,7 @@ tonic = { workspace = true, features = [
2425
tower = "0.4.13"
2526
webpki-roots = "0.23.0"
2627
xmtp_proto = { path = "../xmtp_proto", features = ["proto_full", "grpc"] }
28+
xmtp_v2 = { path = "../xmtp_v2" }
2729

2830
[dev-dependencies]
2931
uuid = { version = "1.3.1", features = ["v4"] }

xmtp_api_grpc/src/auth_token.rs

+101
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
use base64::Engine;
2+
use prost::Message;
3+
use xmtp_proto::xmtp::message_api::v1::{AuthData, Token};
4+
use xmtp_proto::xmtp::message_contents::private_key_bundle::Version;
5+
use xmtp_proto::xmtp::message_contents::signature::{EcdsaCompact, Union as SignatureUnion};
6+
use xmtp_proto::xmtp::message_contents::{
7+
private_key::Union as PrivateKeyUnion, PrivateKeyBundle, PrivateKeyBundleV1, PublicKey,
8+
Signature,
9+
};
10+
use xmtp_v2::k256_helper::sign_keccak_256;
11+
12+
fn create_auth_data(wallet_address: String) -> AuthData {
13+
AuthData {
14+
wallet_addr: wallet_address,
15+
created_ns: std::time::SystemTime::now()
16+
.duration_since(std::time::UNIX_EPOCH)
17+
.expect("Time went backwards")
18+
.as_nanos() as u64,
19+
}
20+
}
21+
22+
pub struct Authenticator {
23+
identity_key: PublicKey,
24+
wallet_address: String,
25+
private_key_bytes: Vec<u8>,
26+
}
27+
28+
impl Authenticator {
29+
pub fn new(
30+
identity_key: PublicKey,
31+
wallet_address: String,
32+
private_key_bytes: Vec<u8>,
33+
) -> Self {
34+
Self {
35+
identity_key,
36+
wallet_address,
37+
private_key_bytes,
38+
}
39+
}
40+
41+
pub fn create_token(&self) -> String {
42+
let auth_data = create_auth_data(self.wallet_address.clone());
43+
let mut serialized = Vec::new();
44+
auth_data
45+
.encode(&mut serialized)
46+
.expect("serialization failed");
47+
48+
let signature = self.sign(serialized.as_slice());
49+
50+
let token = Token {
51+
identity_key: Some(self.identity_key.clone()),
52+
auth_data_bytes: serialized,
53+
auth_data_signature: Some(signature),
54+
};
55+
let mut token_bytes = Vec::new();
56+
let _ = token.encode(&mut token_bytes);
57+
58+
let token_base64 = base64::engine::general_purpose::STANDARD.encode(&token_bytes);
59+
token_base64
60+
}
61+
62+
fn sign(&self, bytes_to_sign: &[u8]) -> Signature {
63+
let (sig, recovery) = sign_keccak_256(self.private_key_bytes.as_slice(), bytes_to_sign)
64+
.expect("signature failed");
65+
66+
Signature {
67+
union: Some(SignatureUnion::EcdsaCompact(EcdsaCompact {
68+
bytes: sig,
69+
recovery: recovery as u32,
70+
})),
71+
}
72+
}
73+
74+
pub fn from_bytes(private_key_bundle_bytes: Vec<u8>, wallet_address: String) -> Self {
75+
let bundle = PrivateKeyBundle::decode(&mut private_key_bundle_bytes.as_slice())
76+
.expect("deserialization");
77+
let identity_key = match bundle.version {
78+
Some(Version::V1(PrivateKeyBundleV1 {
79+
identity_key,
80+
pre_keys: _,
81+
})) => identity_key.unwrap(),
82+
_ => panic!("missing identity key"),
83+
};
84+
85+
let private_key_bytes = match identity_key.union {
86+
Some(PrivateKeyUnion::Secp256k1(inner)) => inner.bytes.clone(),
87+
_ => panic!("missing private key bytes"),
88+
};
89+
90+
Self {
91+
wallet_address,
92+
identity_key: identity_key.public_key.unwrap(),
93+
private_key_bytes,
94+
}
95+
}
96+
97+
pub fn from_hex(private_key_bundle_string: String, wallet_address: String) -> Self {
98+
let decoded_bytes = hex::decode(private_key_bundle_string).unwrap();
99+
Self::from_bytes(decoded_bytes, wallet_address)
100+
}
101+
}

xmtp_api_grpc/src/grpc_api_helper.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ async fn create_tls_channel(address: String) -> Result<Channel, Error> {
3333
.map_err(|e| Error::new(ErrorKind::SetupError).with(e))?
3434
.keep_alive_while_idle(true)
3535
.connect_timeout(Duration::from_secs(5))
36-
.http2_keep_alive_interval(Duration::from_secs(3))
37-
.keep_alive_timeout(Duration::from_secs(5))
36+
.http2_keep_alive_interval(Duration::from_secs(10))
37+
.keep_alive_timeout(Duration::from_secs(25))
3838
.tls_config(ClientTlsConfig::new())
3939
.map_err(|e| Error::new(ErrorKind::SetupError).with(e))?
4040
.connect()

xmtp_api_grpc/src/lib.rs

+90
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
pub mod auth_token;
12
pub mod grpc_api_helper;
23

34
pub const LOCALHOST_ADDRESS: &str = "http://localhost:5556";
@@ -9,6 +10,8 @@ pub use grpc_api_helper::Client;
910
mod tests {
1011
use std::time::{SystemTime, UNIX_EPOCH};
1112

13+
use self::auth_token::Authenticator;
14+
1215
use super::*;
1316
use futures::StreamExt;
1417
use xmtp_proto::{
@@ -18,6 +21,9 @@ mod tests {
1821
},
1922
};
2023

24+
const PRIVATE_KEY_BUNDLE_HEX: &str = "0a88030ac20108eec0888ae33112220a201cd19d1d6e129cb8f8ba4bd85aae10ffcc97a3de939d85f9bc378d47e6ba83711a940108eec0888ae33112460a440a40130cfb1cd667f48585f90372fe4b529da318e83221a3bfd1446ef6cf00d173543fed831d1517d310b05bd5ab138fde22af50a3ffce1aa72da8c7084e9bab0e4910011a430a4104c4eb77c3b2eaacaca12e2b55c6c42dc33f4518a5690bb49cd6ae0e0a652e59fbc9defd98242d30a0737a13c3461cac1edc0f8e3007d65b1637382088ac1cd3d712c00108a4c1888ae33112220a2062e553bceac5247e7bebfdcc8c31959965603e442f79c6346028060ab2129e931a920108a4c1888ae33112440a420a40d12c6ab6eb1874edd3044fdc753543516130bd4d1db11024bd81cd9c2c4bb6b6138e85ed313f387ea7707e09090659b580ee22f42f022c4521e4a11ab7abddfc1a430a4104175097c31bbe1700729f1f1ede87b8bd21a5bc62e4bb4c963e0de885080048bd31138b657fd9146aa8255f1c57c4fa1f8cb7b30bed8803eed48d6a3e67e71ccf";
25+
const WALLET_ADDRESS: &str = "0xA38A1f04B29dea1de621E17447fB4efB11BFfBdf";
26+
2127
// Return the json serialization of an Envelope with bytes
2228
pub fn test_envelope(topic: String) -> Envelope {
2329
let time_since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
@@ -29,6 +35,16 @@ mod tests {
2935
}
3036
}
3137

38+
fn get_auth_token() -> String {
39+
// This is a private key bundle exported from the JS SDK and hex encoded
40+
let authenticator = Authenticator::from_hex(
41+
PRIVATE_KEY_BUNDLE_HEX.to_string(),
42+
WALLET_ADDRESS.to_string(),
43+
);
44+
45+
authenticator.create_token()
46+
}
47+
3248
#[tokio::test]
3349
async fn grpc_query_test() {
3450
let mut client = Client::create(LOCALHOST_ADDRESS.to_string(), false)
@@ -210,4 +226,78 @@ mod tests {
210226
let value_2 = stream.next().await.unwrap().unwrap();
211227
assert_eq!(value_2.content_topic, topic_2.to_string());
212228
}
229+
230+
#[tokio::test]
231+
async fn test_dev_publish() {
232+
let auth_token = get_auth_token();
233+
let dev_client = Client::create(DEV_ADDRESS.to_string(), true).await.unwrap();
234+
dev_client
235+
.publish(
236+
auth_token,
237+
PublishRequest {
238+
envelopes: vec![Envelope {
239+
content_topic: "/xmtp/0/foo/2".to_string(),
240+
timestamp_ns: 3,
241+
message: vec![1, 2, 3],
242+
}],
243+
},
244+
)
245+
.await
246+
.unwrap();
247+
}
248+
249+
#[tokio::test]
250+
async fn long_lived_subscribe_test() {
251+
let auth_token = get_auth_token();
252+
tokio::time::timeout(std::time::Duration::from_secs(30), async move {
253+
let client = Client::create(DEV_ADDRESS.to_string(), true).await.unwrap();
254+
255+
let topic = uuid::Uuid::new_v4();
256+
let mut subscription = client
257+
.subscribe2(SubscribeRequest {
258+
content_topics: vec![topic.to_string()],
259+
})
260+
.await
261+
.unwrap();
262+
263+
client
264+
.publish(
265+
auth_token.to_string(),
266+
PublishRequest {
267+
envelopes: vec![test_envelope(topic.to_string())],
268+
},
269+
)
270+
.await
271+
.unwrap();
272+
273+
// Sleep to give the response time to come back
274+
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
275+
276+
let mut next_message = subscription.next().await.unwrap();
277+
if let Err(err) = next_message {
278+
panic!("Message 1 Error: {}", err);
279+
}
280+
281+
tokio::time::sleep(std::time::Duration::from_secs(15)).await;
282+
client
283+
.publish(
284+
auth_token.to_string(),
285+
PublishRequest {
286+
envelopes: vec![test_envelope(topic.to_string())],
287+
},
288+
)
289+
.await
290+
.unwrap();
291+
292+
// Sleep to give the response time to come back
293+
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
294+
295+
next_message = subscription.next().await.unwrap();
296+
if let Err(err) = next_message {
297+
panic!("Message 2 Error: {}", err);
298+
}
299+
})
300+
.await
301+
.expect("Timed out");
302+
}
213303
}

xmtp_v2/src/k256_helper.rs

+9
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,15 @@ pub fn sign_sha256(secret_key: &[u8], message: &[u8]) -> Result<(Vec<u8>, u8), S
4343
Ok((signature.to_vec(), recovery_id.to_byte()))
4444
}
4545

46+
pub fn sign_keccak_256(secret_key: &[u8], message: &[u8]) -> Result<(Vec<u8>, u8), String> {
47+
let signing_key = SigningKey::from_bytes(secret_key).map_err(|e| e.to_string())?;
48+
let hash = Keccak256::new().chain(message);
49+
let (signature, recovery_id) = signing_key
50+
.sign_digest_recoverable::<Keccak256>(hash)
51+
.map_err(|e| e.to_string())?;
52+
Ok((signature.to_vec(), recovery_id.to_byte()))
53+
}
54+
4655
/// Verify given a compact signature, recovery_id, digest, and public key in uncompressed format
4756
/// NOTE: the recovery_id situation is not necessary, but it is a good sanity check
4857
pub fn verify_sha256(

0 commit comments

Comments
 (0)