Skip to content

Commit

Permalink
Add support for parsing byte arrays as Bytes
Browse files Browse the repository at this point in the history
The Protobuf crate supports using Bytes instead of Vec<u8> when parsing
byte arrays from protobuf structs. This can help reduce copying of
data shared by multiple consumers of messages.

Protobuf's support for using Bytes has been enabled.
UMessageBuilder's functions for building with payload have been
adapted to accept any payload that can be converted to Bytes.
  • Loading branch information
sophokles73 committed May 21, 2024
1 parent 3760db3 commit c57d8d3
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ bytes = { version = "1.5" }
chrono = { version = "0.4.32" }
mediatype = "0.19"
once_cell = { version = "1.19" }
protobuf = { version = "3.3" }
protobuf = { version = "3.3", features = ["with-bytes"] }
rand = { version = "0.8" }
regex = { version = "1.10" }
url = { version = "2.5" }
Expand Down
3 changes: 3 additions & 0 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::fs;
use std::path::Path;
use std::path::PathBuf;

use protobuf_codegen::Customize;

const UPROTOCOL_BASE_URI: &str =
"https://raw.githubusercontent.com/eclipse-uprotocol/up-spec/main/up-core-api/uprotocol";

Expand Down Expand Up @@ -72,6 +74,7 @@ fn get_and_build_protos(
.protoc()
// use vendored protoc instead of relying on user provided protobuf installation
.protoc_path(&protoc_bin_vendored::protoc_bin_path().unwrap())
.customize(Customize::default().tokio_bytes(true))
.include(proto_folder)
.inputs(proto_files)
.cargo_out_dir(output_folder)
Expand Down
40 changes: 20 additions & 20 deletions src/umessage/umessagebuilder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl UMessageBuilder {
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let topic = UUri::try_from("my-vehicle/4210/1/B24D")?;
/// let message = UMessageBuilder::publish(topic.clone())
/// .build_with_payload("closed".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
/// .build_with_payload("closed", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
/// assert_eq!(message.attributes.type_, UMessageType::UMESSAGE_TYPE_PUBLISH.into());
/// assert_eq!(message.attributes.priority, UPriority::UPRIORITY_CS1.into());
/// assert_eq!(message.attributes.source, Some(topic).into());
Expand Down Expand Up @@ -113,7 +113,7 @@ impl UMessageBuilder {
/// let origin = UUri::try_from("my-vehicle/4210/5/F20B")?;
/// let destination = UUri::try_from("my-cloud/CCDD/2/75FD")?;
/// let message = UMessageBuilder::notification(origin.clone(), destination.clone())
/// .build_with_payload("unexpected movement".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
/// .build_with_payload("unexpected movement", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
/// assert_eq!(message.attributes.type_, UMessageType::UMESSAGE_TYPE_NOTIFICATION.into());
/// assert_eq!(message.attributes.priority, UPriority::UPRIORITY_CS1.into());
/// assert_eq!(message.attributes.source, Some(origin).into());
Expand Down Expand Up @@ -154,7 +154,7 @@ impl UMessageBuilder {
/// let method_to_invoke = UUri::try_from("my-vehicle/4210/5/64AB")?;
/// let reply_to_address = UUri::try_from("my-cloud/BA4C/1/0")?;
/// let message = UMessageBuilder::request(method_to_invoke.clone(), reply_to_address.clone(), 5000)
/// .build_with_payload("lock".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
/// .build_with_payload("lock", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
/// assert_eq!(message.attributes.type_, UMessageType::UMESSAGE_TYPE_REQUEST.into());
/// assert_eq!(message.attributes.priority, UPriority::UPRIORITY_CS4.into());
/// assert_eq!(message.attributes.source, Some(reply_to_address).into());
Expand Down Expand Up @@ -249,7 +249,7 @@ impl UMessageBuilder {
/// let request_message_id = UUIDBuilder::build();
/// let request_message = UMessageBuilder::request(method_to_invoke.clone(), reply_to_address.clone(), 5000)
/// .with_message_id(request_message_id.clone()) // normally not needed, used only for asserts below
/// .build_with_payload("lock".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
/// .build_with_payload("lock", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
///
/// let response_message = UMessageBuilder::response_for_request(&request_message.attributes)
/// .with_priority(UPriority::UPRIORITY_CS5)
Expand Down Expand Up @@ -307,11 +307,11 @@ impl UMessageBuilder {
/// builder.with_priority(UPriority::UPRIORITY_CS2);
/// let message_one = builder
/// .with_message_id(UUIDBuilder::build())
/// .build_with_payload("closed".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
/// .build_with_payload("closed", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
/// let message_two = builder
/// // use new message ID but retain all other attributes
/// .with_message_id(UUIDBuilder::build())
/// .build_with_payload("open".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
/// .build_with_payload("open", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
/// assert_ne!(message_one.attributes.id, message_two.attributes.id);
/// assert_eq!(message_one.attributes.source, message_two.attributes.source);
/// assert_eq!(message_one.attributes.priority, UPriority::UPRIORITY_CS2.into());
Expand Down Expand Up @@ -351,7 +351,7 @@ impl UMessageBuilder {
/// let topic = UUri::try_from("my-vehicle/4210/1/B24D")?;
/// let message = UMessageBuilder::publish(topic)
/// .with_priority(UPriority::UPRIORITY_CS5)
/// .build_with_payload("closed".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
/// .build_with_payload("closed", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
/// assert_eq!(message.attributes.priority, UPriority::UPRIORITY_CS5.into());
/// # Ok(())
/// # }
Expand Down Expand Up @@ -424,7 +424,7 @@ impl UMessageBuilder {
/// let token = String::from("this-is-my-token");
/// let message = UMessageBuilder::request(method_to_invoke, reply_to_address, 5000)
/// .with_token(token.clone())
/// .build_with_payload("lock".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
/// .build_with_payload("lock", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
/// assert_eq!(message.attributes.token, Some(token));
/// # Ok(())
/// # }
Expand Down Expand Up @@ -460,7 +460,7 @@ impl UMessageBuilder {
/// let reply_to_address = UUri::try_from("my-cloud/BA4C/1/0")?;
/// let message = UMessageBuilder::request(method_to_invoke, reply_to_address, 5000)
/// .with_permission_level(12)
/// .build_with_payload("lock".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
/// .build_with_payload("lock", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
/// assert_eq!(message.attributes.permission_level, Some(12));
/// # Ok(())
/// # }
Expand Down Expand Up @@ -590,7 +590,7 @@ impl UMessageBuilder {
.map_err(UMessageError::from)
.map(|_| UMessage {
attributes: Some(attributes).into(),
payload: self.payload.as_ref().map(|bytes| bytes.to_vec()),
payload: self.payload.to_owned(),
..Default::default()
})
}
Expand Down Expand Up @@ -619,17 +619,17 @@ impl UMessageBuilder {
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let topic = UUri::try_from("my-vehicle/4210/1/B24D")?;
/// let message = UMessageBuilder::publish(topic)
/// .build_with_payload("locked".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
/// .build_with_payload("locked", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?;
/// assert!(message.payload.is_some());
/// # Ok(())
/// # }
/// ```
pub fn build_with_payload(
pub fn build_with_payload<T: Into<Bytes>>(
&mut self,
payload: Bytes,
payload: T,
format: UPayloadFormat,
) -> Result<UMessage, UMessageError> {
self.payload = Some(payload);
self.payload = Some(payload.into());
self.payload_format = format;

self.build()
Expand Down Expand Up @@ -681,7 +681,7 @@ impl UMessageBuilder {
.map_err(UMessageError::from)
.and_then(|serialized_payload| {
self.build_with_payload(
serialized_payload.into(),
serialized_payload,
UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF,
)
})
Expand Down Expand Up @@ -733,7 +733,7 @@ impl UMessageBuilder {
.and_then(|any| any.write_to_bytes().map_err(UMessageError::from))
.and_then(|serialized_payload| {
self.build_with_payload(
serialized_payload.into(),
serialized_payload,
UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
)
})
Expand Down Expand Up @@ -824,11 +824,11 @@ mod tests {
let mut builder = UMessageBuilder::publish(topic);
let message_one = builder
.with_message_id(UUIDBuilder::build())
.build_with_payload("locked".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)
.build_with_payload("locked", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)
.expect("should have been able to create message");
let message_two = builder
.with_message_id(UUIDBuilder::build())
.build_with_payload("unlocked".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)
.build_with_payload("unlocked", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)
.expect("should have been able to create message");
assert_eq!(message_one.attributes.type_, message_two.attributes.type_);
assert_ne!(message_one.attributes.id, message_two.attributes.id);
Expand All @@ -844,7 +844,7 @@ mod tests {
.with_message_id(message_id.clone())
.with_priority(UPriority::UPRIORITY_CS2)
.with_ttl(5000)
.build_with_payload("locked".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)
.build_with_payload("locked", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)
.expect("should have been able to create message");
assert_eq!(message.attributes.id, Some(message_id).into());
assert_eq!(message.attributes.priority, UPriority::UPRIORITY_CS2.into());
Expand All @@ -870,7 +870,7 @@ mod tests {
.with_permission_level(5)
.with_priority(UPriority::UPRIORITY_CS4)
.with_token(token.clone())
.build_with_payload("unlock".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)
.build_with_payload("unlock", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)
.expect("should have been able to create message");

assert_eq!(message.attributes.id, Some(message_id).into());
Expand Down

0 comments on commit c57d8d3

Please sign in to comment.