Skip to content

Commit 573bdc6

Browse files
authored
Simplify external API interface (#206)
1. Make all methods of `Conversations` static, and accept a `Client` argument, so that `Conversations` doesn't need to be instantiated anywhere. `Conversations` doesn't really hold state anyway and effectively functions as a wrapper for `Client` right now, so it doesn't make sense for consumers to have to instantiate it and hold onto it. 2. Call `process_outbound_messages()` on init and on message send, so that consumers don't have to manually call it. 3. Limit the connection pool size to 1 for the ephemeral DB instance, because when multiple connections are made, each one operates on a separate database. In the event that a connection is requested from the pool while another one is ongoing, it will wait for a time limit before expiring with an error. An added plus of this is it's much easier for tests to detect deadlocks. This simplifies the client API quite a bit, as can be seen in `bindings_ffi` and in the `CLI`. Ideally, we could have spawned a separate thread to call `process_outbound_messages` so that the existing thread is not blocked, but that requires additional refactoring. This requires us to enforce `XmtpApiClient` is `Send + Sync` (which is easy), but also comes up against issues with lifetimes for `Client`, for which a reference is needed in the other thread that may outlive the current thread. This possibly requires us to start using `Arcs` again.
1 parent 60347f8 commit 573bdc6

File tree

7 files changed

+209
-297
lines changed

7 files changed

+209
-297
lines changed

bindings_ffi/src/lib.rs

+10-14
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ use log::info;
66
use logger::FfiLogger;
77
use std::error::Error;
88
use std::sync::Arc;
9-
use xmtp::conversation::ListMessagesOptions;
9+
use xmtp::conversation::{ListMessagesOptions, SecretConversation};
10+
use xmtp::conversations::Conversations;
1011
use xmtp::storage::StoredMessage;
1112
use xmtp::types::Address;
1213
use xmtp_networking::grpc_api_helper::Client as TonicApiClient;
@@ -111,9 +112,7 @@ impl FfiConversations {
111112
&self,
112113
wallet_address: String,
113114
) -> Result<Arc<FfiConversation>, GenericError> {
114-
let conversations = xmtp::conversations::Conversations::new(self.inner_client.as_ref());
115-
let convo = conversations
116-
.new_secret_conversation(wallet_address)
115+
let convo = SecretConversation::new(self.inner_client.as_ref(), wallet_address)
117116
.map_err(|e| e.to_string())?;
118117
// TODO: This should happen as part of `new_secret_conversation` and should only send to new participants
119118
convo.initialize().await.map_err(|e| e.to_string())?;
@@ -128,8 +127,9 @@ impl FfiConversations {
128127

129128
pub async fn list(&self) -> Result<Vec<Arc<FfiConversation>>, GenericError> {
130129
let inner = self.inner_client.as_ref();
131-
let conversations = xmtp::conversations::Conversations::new(inner);
132-
let convo_list = conversations.list(true).await.map_err(|e| e.to_string())?;
130+
let convo_list = Conversations::list(inner, true)
131+
.await
132+
.map_err(|e| e.to_string())?;
133133
let out: Vec<Arc<FfiConversation>> = convo_list
134134
.into_iter()
135135
.map(|convo| {
@@ -173,15 +173,10 @@ impl FfiConversation {
173173
let conversation = xmtp::conversation::SecretConversation::new(
174174
self.inner_client.as_ref(),
175175
self.peer_address.clone(),
176-
);
177-
let conversations = xmtp::conversations::Conversations::new(self.inner_client.as_ref());
178-
176+
)
177+
.map_err(|e| e.to_string())?;
179178
conversation
180179
.send(content_bytes)
181-
.map_err(|e| e.to_string())?;
182-
183-
conversations
184-
.process_outbound_messages()
185180
.await
186181
.map_err(|e| e.to_string())?;
187182

@@ -195,7 +190,8 @@ impl FfiConversation {
195190
let conversation = xmtp::conversation::SecretConversation::new(
196191
self.inner_client.as_ref(),
197192
self.peer_address.clone(),
198-
);
193+
)
194+
.map_err(|e| e.to_string())?;
199195
let options: ListMessagesOptions = opts.to_options();
200196

201197
let messages: Vec<Arc<FfiMessage>> = conversation

examples/cli/cli-client.rs

+4-12
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,7 @@ async fn main() {
160160
.unwrap();
161161

162162
recv(&client).await.unwrap();
163-
let conversations = Conversations::new(&client);
164-
let convo_list = conversations.list(true).await.unwrap();
163+
let convo_list = Conversations::list(&client, true).await.unwrap();
165164

166165
for (index, convo) in convo_list.iter().enumerate() {
167166
info!(
@@ -253,23 +252,16 @@ async fn register(cli: &Cli, use_local_db: bool, wallet_seed: &u64) -> Result<()
253252
}
254253

255254
async fn send(client: Client, addr: &str, msg: &String) -> Result<(), CliError> {
256-
let conversations = Conversations::new(&client);
257-
let conversation = conversations
258-
.new_secret_conversation(addr.to_string())
259-
.unwrap();
255+
let conversation = SecretConversation::new(&client, addr.to_string()).unwrap();
260256
conversation.initialize().await.unwrap();
261-
conversation.send_text(msg).unwrap();
262-
conversations.process_outbound_messages().await.unwrap();
257+
conversation.send_text(msg).await.unwrap();
263258
info!("Message successfully sent");
264259

265260
Ok(())
266261
}
267262

268263
async fn recv(client: &Client) -> Result<(), CliError> {
269-
let conversations = Conversations::new(client);
270-
conversations.save_inbound_messages()?;
271-
conversations.process_inbound_messages()?;
272-
264+
Conversations::receive(client)?;
273265
Ok(())
274266
}
275267

xmtp/src/client.rs

+7
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use vodozemac::olm::PreKeyMessage;
99
use crate::{
1010
account::Account,
1111
contact::{Contact, ContactError},
12+
conversations::Conversations,
1213
session::SessionManager,
1314
storage::{
1415
now, DbConnection, EncryptedMessageStore, StorageError, StoredInstallation, StoredSession,
@@ -119,6 +120,12 @@ where
119120
}
120121

121122
self.is_initialized = true;
123+
124+
// Send any unsent messages
125+
if let Err(err) = Conversations::process_outbound_messages(&self).await {
126+
log::error!("Could not process outbound messages on init: {:?}", err)
127+
}
128+
122129
Ok(())
123130
}
124131

xmtp/src/conversation.rs

+23-32
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::{
22
client::ClientError,
33
codecs::{text::TextCodec, CodecError, ContentCodec},
44
contact::Contact,
5+
conversations::Conversations,
56
invitation::{Invitation, InvitationError},
67
message::PayloadError,
78
session::SessionError,
@@ -103,19 +104,12 @@ impl<'c, A> SecretConversation<'c, A>
103104
where
104105
A: XmtpApiClient,
105106
{
106-
pub fn new(client: &'c Client<A>, peer_address: Address) -> Self {
107-
Self {
107+
// Instantiate the conversation and insert all the necessary records into the database
108+
pub fn new(client: &'c Client<A>, peer_address: Address) -> Result<Self, ConversationError> {
109+
let obj = Self {
108110
client,
109111
peer_address,
110-
}
111-
}
112-
113-
// Instantiate the conversation and insert all the necessary records into the database
114-
pub(crate) fn create(
115-
client: &'c Client<A>,
116-
peer_address: Address,
117-
) -> Result<Self, ConversationError> {
118-
let obj = Self::new(client, peer_address);
112+
};
119113
let conn = &mut client.store.conn()?;
120114

121115
obj.client.store.insert_or_ignore_user_with_conn(
@@ -148,7 +142,7 @@ where
148142
self.peer_address.clone()
149143
}
150144

151-
pub fn send(&self, content_bytes: Vec<u8>) -> Result<(), ConversationError> {
145+
pub async fn send(&self, content_bytes: Vec<u8>) -> Result<(), ConversationError> {
152146
NewStoredMessage::new(
153147
self.convo_id(),
154148
self.client.account.addr(),
@@ -158,15 +152,19 @@ where
158152
)
159153
.store(&mut self.client.store.conn().unwrap())?;
160154

155+
if let Err(err) = Conversations::process_outbound_messages(&self.client).await {
156+
log::error!("Could not process outbound messages on init: {:?}", err)
157+
}
158+
161159
Ok(())
162160
}
163161

164-
pub fn send_text(&self, text: &str) -> Result<(), ConversationError> {
162+
pub async fn send_text(&self, text: &str) -> Result<(), ConversationError> {
165163
// TODO support other codecs
166164
let encoded_content = TextCodec::encode(text.to_string())?;
167165
let content_bytes = encoded_content.encode_to_vec();
168166

169-
self.send(content_bytes)
167+
self.send(content_bytes).await
170168
}
171169

172170
pub async fn list_messages(
@@ -247,10 +245,7 @@ mod tests {
247245
codecs::{text::TextCodec, ContentCodec},
248246
conversation::ListMessagesOptions,
249247
conversations::Conversations,
250-
mock_xmtp_api_client::MockXmtpApiClient,
251-
test_utils::test_utils::{
252-
gen_test_client, gen_test_client_internal, gen_test_conversation,
253-
},
248+
test_utils::test_utils::{gen_test_client, gen_test_conversation, gen_two_test_clients},
254249
};
255250

256251
#[tokio::test]
@@ -260,18 +255,16 @@ mod tests {
260255
let convo_id = format!(":{}:{}", peer_address, client.wallet_address());
261256
assert!(client.store.get_conversation(&convo_id).unwrap().is_none());
262257

263-
let conversations = Conversations::new(&client);
264-
let conversation = gen_test_conversation(&conversations, peer_address).await;
258+
let conversation = gen_test_conversation(&client, peer_address).await;
265259
assert!(conversation.peer_address() == peer_address);
266260
assert!(client.store.get_conversation(&convo_id).unwrap().is_some());
267261
}
268262

269263
#[tokio::test]
270264
async fn test_send_text() {
271265
let client = gen_test_client().await;
272-
let conversations = Conversations::new(&client);
273-
let conversation = gen_test_conversation(&conversations, "0x000").await;
274-
conversation.send_text("Hello, world!").unwrap();
266+
let conversation = gen_test_conversation(&client, "0x000").await;
267+
conversation.send_text("Hello, world!").await.unwrap();
275268

276269
let message = &client.store.get_unprocessed_messages().unwrap()[0];
277270
let content = EncodedContent::decode(&message.content[..]).unwrap();
@@ -280,17 +273,15 @@ mod tests {
280273

281274
#[tokio::test]
282275
async fn test_list_messages() {
283-
let api_client = MockXmtpApiClient::new();
284-
let client = gen_test_client_internal(api_client.clone()).await;
285-
let recipient = gen_test_client_internal(api_client.clone()).await;
286-
let conversations = Conversations::new(&client);
287-
let conversation =
288-
gen_test_conversation(&conversations, recipient.account.addr().as_str()).await;
276+
let (client, recipient) = gen_two_test_clients().await;
277+
let conversation = gen_test_conversation(&client, recipient.account.addr().as_str()).await;
289278
conversation.initialize().await.unwrap();
290-
conversation.send_text("Hello, world!").unwrap();
291-
conversation.send_text("Hello, again").unwrap();
279+
conversation.send_text("Hello, world!").await.unwrap();
280+
conversation.send_text("Hello, again").await.unwrap();
292281

293-
conversations.process_outbound_messages().await.unwrap();
282+
Conversations::process_outbound_messages(&client)
283+
.await
284+
.unwrap();
294285

295286
let results = conversation
296287
.list_messages(&ListMessagesOptions::default())

0 commit comments

Comments
 (0)