From 3760db33ba949d1eeb5dc12785f940d528e90028 Mon Sep 17 00:00:00 2001 From: Daniel Krippner Date: Fri, 17 May 2024 16:02:22 +0200 Subject: [PATCH 1/3] Adopt changes from uProtocol Spec The UPayload structure has been removed from UMessage. The code has been adapted accordingly. Also-by: Kai Hudalla --- build.rs | 1 - src/lib.rs | 11 +- src/rpc.rs | 2 - src/rpc/rpcclient.rs | 17 +- src/rpc/rpcmapper.rs | 528 -------------------------------- src/umessage.rs | 89 ++++++ src/umessage/umessagebuilder.rs | 141 +++++---- src/utransport.rs | 2 +- 8 files changed, 172 insertions(+), 619 deletions(-) delete mode 100644 src/rpc/rpcmapper.rs diff --git a/build.rs b/build.rs index 0d941b75..3c4616f9 100644 --- a/build.rs +++ b/build.rs @@ -26,7 +26,6 @@ fn main() -> Result<(), Box> { format!("{}/uuid.proto", UPROTOCOL_BASE_URI).as_str(), format!("{}/uri.proto", UPROTOCOL_BASE_URI).as_str(), format!("{}/uattributes.proto", UPROTOCOL_BASE_URI).as_str(), - format!("{}/upayload.proto", UPROTOCOL_BASE_URI).as_str(), format!("{}/umessage.proto", UPROTOCOL_BASE_URI).as_str(), format!("{}/ustatus.proto", UPROTOCOL_BASE_URI).as_str(), // not used in the SDK yet, but for completeness sake diff --git a/src/lib.rs b/src/lib.rs index 764dd5e2..b5b40166 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,22 +44,17 @@ // up_core_api types used and augmented by up_rust - symbols re-exported to toplevel, errors are module-specific mod rpc; -pub use rpc::{RpcClient, RpcClientResult}; -pub use rpc::{RpcMapper, RpcMapperError}; -pub use rpc::{RpcPayload, RpcPayloadResult, RpcResult}; +pub use rpc::{RpcClient, RpcClientResult, RpcResult}; mod uattributes; pub use uattributes::{ PublishValidator, RequestValidator, ResponseValidator, UAttributesValidator, UAttributesValidators, }; -pub use uattributes::{UAttributes, UAttributesError, UMessageType, UPriority}; +pub use uattributes::{UAttributes, UAttributesError, UMessageType, UPayloadFormat, UPriority}; mod umessage; -pub use umessage::{UMessage, UMessageBuilder, UMessageBuilderError}; - -mod upayload; -pub use upayload::{UPayload, UPayloadError, UPayloadFormat}; +pub use umessage::{UMessage, UMessageBuilder, UMessageError}; mod uri; pub use uri::{UUri, UUriError}; diff --git a/src/rpc.rs b/src/rpc.rs index fae00401..8b877acb 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -12,9 +12,7 @@ ********************************************************************************/ mod rpcclient; -mod rpcmapper; mod rpcresult; pub use rpcclient::*; -pub use rpcmapper::*; pub use rpcresult::*; diff --git a/src/rpc/rpcclient.rs b/src/rpc/rpcclient.rs index 84a3b770..00c3ba8b 100644 --- a/src/rpc/rpcclient.rs +++ b/src/rpc/rpcclient.rs @@ -13,9 +13,9 @@ use async_trait::async_trait; -use crate::{RpcMapperError, UMessage, UPayload, UPriority, UUri}; +use crate::{UMessage, UMessageError, UUri}; -pub type RpcClientResult = Result; +pub type RpcClientResult = Result; /// `RpcClient` is an interface used by code generators for uProtocol services defined in `.proto` files such as /// the core uProtocol services found in [uProtocol Core API](https://github.com/eclipse-uprotocol/up-spec/tree/main/up-core-api). @@ -35,21 +35,10 @@ pub trait RpcClient: Send + Sync { /// /// * `method` - The URI of the method to be invoked. For example, in long form: "/example.hello_world/1/rpc.SayHello". /// * `request` - The request message to be sent to the server. - /// * `priority` - The priority to use for sending the request and corresponding response messages. Must be at least - /// [`UPriority::UPRIORITY_CS4`], which is also the default if not specified explicitly. - /// * `ttl` - The request's time-to-live in milliseconds. - /// * `token` - The authorization token to use for TAP. /// /// # Returns /// /// Returns a `RpcClientResult` which contains the response message. /// If the invocation fails, it contains a `UStatus` detailing the failure reason. - async fn invoke_method( - &self, - method: UUri, - request: UPayload, - priority: Option, - ttl: Option, - token: Option, - ) -> RpcClientResult; + async fn invoke_method(&self, method: UUri, request: UMessage) -> RpcClientResult; } diff --git a/src/rpc/rpcmapper.rs b/src/rpc/rpcmapper.rs deleted file mode 100644 index 91f3fd7d..00000000 --- a/src/rpc/rpcmapper.rs +++ /dev/null @@ -1,528 +0,0 @@ -/******************************************************************************** - * Copyright (c) 2023 Contributors to the Eclipse Foundation - * - * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - * - * This program and the accompanying materials are made available under the - * terms of the Apache License Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - ********************************************************************************/ - -use std::{default::Default, fmt}; - -use protobuf::{well_known_types::any::Any, MessageFull}; - -use crate::{RpcClientResult, UPayload, UPayloadFormat, UStatus}; - -pub type RpcPayloadResult = Result; - -#[derive(Clone)] -pub struct RpcPayload { - pub status: UStatus, - pub payload: Option, -} - -#[derive(Debug)] -pub enum RpcMapperError { - UnexpectedError(String), - InvalidPayload(String), - UnknownType(String), - ProtobufError(String), -} - -impl fmt::Display for RpcMapperError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - RpcMapperError::UnexpectedError(msg) => write!(f, "Unexpected error: {msg}"), - RpcMapperError::InvalidPayload(msg) => write!(f, "Invalid payload: {msg}",), - RpcMapperError::UnknownType(msg) => write!(f, "Unknown type: {msg}"), - RpcMapperError::ProtobufError(msg) => write!(f, "Protobuf error: {msg}"), - } - } -} - -/// `RpcMapper` is a structure that provides static methods to wrap an RPC request with -/// an RPC response (uP-L2). APIs that return a `Message` assume that the payload is -/// protobuf-serialized `com.google.protobuf.Any` (USerializationHint.PROTOBUF), and will -/// return an error if anything else is passed. -pub struct RpcMapper; - -impl RpcMapper { - /// Maps the payload data returned by a peer to the expected return type of the RPC method. - /// - /// # Parameters - /// - /// - `response`: A `Result` of type [`RpcClientResult`], representing the response from an RPC call. - /// - /// # Type Parameters - /// - /// - `T`: The expected return type of the RPC method. - /// - /// # Returns - /// - /// Returns a `Result` either containing the expected return type of the RPC method wrapped, - /// or an [`RpcMapperError`]. - /// - /// # Errors - /// - /// This function can return an [`RpcMapperError`] in the following cases: - /// - /// - `InvalidPayload`: If the payload received in the response cannot be decoded into the expected return type `T`. - /// This error includes the detailed error message from the decoding process. - /// - /// - `UnknownType`: If the payload is present but cannot be decoded into a protobuf `Any` type. - /// This typically indicates an issue with the payload format or the expected type `T`. - /// - pub fn map_response(response: RpcClientResult) -> Result - where - T: MessageFull + Default, - { - let message = response?; // Directly returns in case of error - - let Some(payload) = message.payload.into_option() else { - return Err(RpcMapperError::InvalidPayload( - "Payload is empty".to_string(), - )); - }; - if payload.data.is_empty() { - return Err(RpcMapperError::InvalidPayload( - "Payload is empty".to_string(), - )); - } - Any::try_from(payload) - .map_err(|_e| { - RpcMapperError::UnknownType("Couldn't decode payload into Any".to_string()) - }) - .and_then(|any| match any.unpack::() { - Ok(Some(m)) => Ok(m), - Ok(None) => Err(RpcMapperError::InvalidPayload(String::from( - "Any object is not of expected type", - ))), - Err(error) => Err(RpcMapperError::InvalidPayload(error.to_string())), - }) - } - - /// This function checks if a `RpcClientResult` contains a protobuf status type, - /// - if that is so it extracts the status code from the protobuf status and - /// - returns an [`RpcPayloadResult`] result with `UStatus::Ok()` and No(ne) [`UPayload`] if the protobuf status was Ok - /// - returns an [`RpcPayloadResult`] result with a failed `UStatus` (mirroring the protobuf status) and No(ne) [`UPayload`] if the protobuf status was not Ok - /// - if the payload did not contain a protobuf status, return [`RpcPayloadResult`] result with `UStatus::Ok()` and the original payload in Some([`UPayload`]) - /// - /// The usage idea is to apply this function to a `RpcClient::invoke_method()` result, then match the return to see if it's gotten a(ny) valid response, and - /// apply `RpcMapper::map_result()` in case a payload was returned and a specific payload type is expected. - /// - /// # Errors - /// - /// This function can return an `RpcMapperError` in the following cases: - /// - /// - `UnknownType`: If the payload is present but cannot be decoded into a protobuf `Any` type. This indicates an issue with the payload format. - /// - /// - Other errors propagated from the `RpcClientResult` processing, including failure in unpacking a protobuf status or other issues encountered during processing. - /// - /// # Note - /// There is one conscious deviation from the Java SDK: this implementation returns a `failed` status in every case where there's not a protobuf status - /// in the payload. In such cases, the payload is still passed on as a function result so it can be used in further decoding attempts. So there are two - /// things to check with the return from this function: - /// - is there [`UStatus`] information (transporting info about the status of an operation, sent from a remote service)? - /// - is there payload data passed in the result, to be decoded by the caller. - /// - // TODO This entire thing feels klunky and kludgy; this needs to be revisited... - pub fn map_response_to_result(response: RpcClientResult) -> RpcPayloadResult { - let message = response?; // Directly returns in case of error - let Some(payload) = message.payload.into_option() else { - return Err(RpcMapperError::InvalidPayload( - "Payload is empty".to_string(), - )); - }; - if payload.data.is_empty() { - return Err(RpcMapperError::InvalidPayload( - "Payload is empty".to_string(), - )); - } - Any::try_from(payload) - .map_err(|_e| { - RpcMapperError::UnknownType("Couldn't decode payload into Any".to_string()) - }) - .and_then(|any| { - match Self::unpack_any::(&any) { - Ok(proto_status) => Ok(RpcPayload { - status: proto_status, - payload: None, - }), - Err(_error) => { - // in this branch, we couldn't decode the payload into a protobuf-status, but there is something else there to pass on - UPayload::try_from(&any) - .map_err(|e| RpcMapperError::InvalidPayload(e.to_string())) - .map(|payload| RpcPayload { - status: UStatus::fail(format!( - "Unexpected any-payload type {}", - any.type_url - )), - payload: Some(payload), // get the original payload back to avoid having to .clone() payload, above - }) - } - } - }) - } - - /// Packs a protobuf message into a `UPayload` object. - /// - /// This function is used to encapsulate a strongly-typed data object into a `UPayload`, - /// which allows for more generic data handling. It leverages Prost's protobuf encoding for - /// serializing the data. - /// - /// # Type Parameters - /// - /// * `T`: The type of the data to be packed. - /// - /// # Parameters - /// - /// * `data`: The data to pack. - /// - /// # Returns - /// - /// The payload containing the packed data. - /// - /// # Errors - /// - /// Returns an `RpcMapperError` if the protobuf serialization of the data exceeds 2^32 - 1 bytes. - pub fn pack_payload(data: &T) -> Result { - let buf = data.write_to_bytes().map_err(|_e| { - RpcMapperError::ProtobufError(String::from("failed to serialize payload to protobuf")) - })?; - Ok(UPayload { - data: buf, - format: UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF.into(), - ..Default::default() - }) - } - - /// Unpacks a given `UPayload` into a protobuf message. - /// - /// This function is used to extract strongly-typed data from a `UPayload` object, leveraging - /// Prost's protobuf decoding capabilities for deserialization. - /// - /// # Type Parameters - /// - /// * `T`: The target type of the data to be unpacked. - /// - /// # Parameters - /// - /// * `payload`: The `UPayload` object containing the data to be unpacked. - /// - /// # Returns - /// - /// * `Ok(T)`: The deserialized protobuf message contained in the payload. - /// - /// # Errors - /// - /// Returns an `RpcMapperError` if the unpacking process fails, for example if the payload could - /// not be deserialized into the target protobuf type `T`. - pub fn unpack_payload( - payload: UPayload, - ) -> Result { - Any::try_from(payload) - .map_err(|_e| RpcMapperError::UnknownType("Couldn't decode payload".to_string())) - .and_then(|any| { - T::parse_from_bytes(any.value.as_slice()) - .map_err(|error| RpcMapperError::InvalidPayload(error.to_string())) - }) - } - - /// Packs a given `data` of type `T` into a protbuf `Any` object. - /// - /// This function is useful for converting strongly-typed data into an `Any` - /// object for use in message-passing scenarios where the type needs to be - /// encoded as `Any`. - /// - /// # Type Parameters - /// - /// * `T`: The type of the data to be packed. - /// - /// # Parameters - /// - /// * `data`: The data of type `T` that will be packed into the returned `Any` object. - /// - /// # Returns - /// - /// * `Ok(Any)`: A protobuf `Any` object containing the packed `data`. - /// * `Err(RpcMapperError)`: An error that occurred during the packing process. - /// - /// # Errors - /// - /// Returns an `RpcMapperError` if the packing process fails. - pub fn pack_any(data: &T) -> Result { - Any::pack(data).map_err(|error| RpcMapperError::InvalidPayload(error.to_string())) - } - - /// Unpacks a given protbuf `Any` object into a data of type `T`. - /// - /// This function is used to convert an `Any` object back into its original - /// strongly-typed data. It's essentially the reverse operation of `pack_any`. - /// - /// # Type Parameters - /// - /// * `T`: The expected type of the unpacked data. This type must implement `prost::Name` - /// for type URL validation and `std::default::Default` for initializing the type. - /// - /// # Parameters - /// - /// * `any`: The `Any` object that will be unpacked. - /// - /// # Returns - /// - /// * `Ok(T)`: A `T` object containing the unpacked data. - /// * `Err(RpcMapperError)`: An error that occurred during the unpacking process. - /// - /// # Errors - /// - /// Returns an `RpcMapperError` if the unpacking process fails, for example due to type mismatch - /// or if the data inside `Any` could not be decoded into type `T`. - pub fn unpack_any(any: &Any) -> Result { - match any.unpack() { - Err(error) => Err(RpcMapperError::InvalidPayload(error.to_string())), - Ok(v) => match v { - Some(msg) => Ok(msg), - None => Err(RpcMapperError::ProtobufError(String::from( - "Any object does not contain payload", - ))), - }, - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use bytes::{Buf, BufMut}; - use protobuf::MessageField; - - use crate::{UCode, UMessage}; - - fn build_status_response(code: UCode, msg: &str) -> RpcClientResult { - let status: UStatus = UStatus::fail_with_code(code, msg); - let any = RpcMapper::pack_any(&status)?; - let payload = - UPayload::try_from(any).map_err(|e| RpcMapperError::InvalidPayload(e.to_string()))?; - let message = UMessage { - payload: MessageField::some(payload), - ..Default::default() - }; - Ok(message) - } - - fn build_empty_payload_response() -> RpcClientResult { - let payload = UPayload { - data: vec![], - ..Default::default() - }; - let message = UMessage { - payload: MessageField::some(payload), - ..Default::default() - }; - Ok(message) - } - - fn build_number_response(number: i32) -> RpcClientResult { - let any: Any = Any { - type_url: "type.googleapis.com/Int32Value".to_string(), - value: number.to_be_bytes().into(), - ..Default::default() - }; - let payload = - UPayload::try_from(any).map_err(|e| RpcMapperError::InvalidPayload(e.to_string()))?; - let message = UMessage { - payload: MessageField::some(payload), - ..Default::default() - }; - Ok(message) - } - - #[test] - fn test_map_response_to_result_happy_path() { - let result = RpcMapper::map_response_to_result(build_number_response(3)).unwrap(); - - assert!(result.status.is_failed()); // TODO this seems strange - - let payload = result.payload.unwrap(); - let any = Any::try_from(payload).unwrap(); - assert_eq!("type.googleapis.com/Int32Value", any.type_url); - let value = (&any.value[..]).get_i32(); - assert_eq!(value, 3); - } - - #[test] - fn test_compose_that_returns_status() { - let response = build_status_response(UCode::INVALID_ARGUMENT, "boom"); - - let result = RpcMapper::map_response_to_result(response).unwrap(); - - assert!(result.status.is_failed()); - assert_eq!(result.status.get_code(), UCode::INVALID_ARGUMENT); - assert_eq!(result.status.message.unwrap(), "boom"); - } - - #[test] - fn test_compose_with_failure() { - let response = Err(RpcMapperError::UnexpectedError("Boom".to_string())); - let result = RpcMapper::map_response_to_result(response); - - assert!(result.is_err()); - assert_eq!(result.err().unwrap().to_string(), "Unexpected error: Boom"); - } - - #[test] - fn test_fail_invoke_method_when_invoke_method_returns_a_status_using_map_response_to_rpc_response( - ) { - let response = build_status_response(UCode::INVALID_ARGUMENT, "boom"); - let result = RpcMapper::map_response_to_result(response).unwrap(); - - assert!(result.status.is_failed()); - assert_eq!(UCode::INVALID_ARGUMENT, result.status.get_code()); - assert_eq!("boom", result.status.message.unwrap()); - } - - #[test] - fn test_fail_invoke_method_when_invoke_method_returns_a_bad_proto_using_map_response_to_rpc_response( - ) { - let response = build_number_response(42); - let result = RpcMapper::map_response_to_result(response).unwrap(); - - assert!(result.status.is_failed()); - assert_eq!( - result.status.message.unwrap(), - "Unexpected any-payload type type.googleapis.com/Int32Value" - ); - } - - // Create a generic UMessage for use in test cases - fn build_umessage_for_test() -> UMessage { - let arbitrary_proto = crate::up_core_api::file::FileBatch::default(); - let any = RpcMapper::pack_any(&arbitrary_proto).unwrap(); - - let payload = UPayload::try_from(any).unwrap(); - UMessage { - payload: MessageField::some(payload), - ..Default::default() - } - } - - #[test] - fn test_success_invoke_method_happy_flow_using_map_response_to_rpc_response() { - let response_message = build_umessage_for_test(); - let result = RpcMapper::map_response_to_result(Ok(response_message.clone())).unwrap(); - - assert!(result.status.is_failed()); - assert_eq!(result.payload.unwrap(), response_message.payload.unwrap()); - } - - #[test] - fn test_success_invoke_method_happy_flow_using_map_response() { - let response_message = build_umessage_for_test(); - let file_batch = - RpcMapper::map_response::(Ok(response_message)) - .unwrap(); - - assert_eq!(file_batch, crate::up_core_api::file::FileBatch::default()); - } - - #[test] - fn test_fail_invoke_method_when_invoke_method_returns_a_status_using_map_response() { - let response = build_status_response(UCode::ABORTED, "hello"); - let e = RpcMapper::map_response::(response); - - assert!(e.is_err()); - } - - #[test] - fn test_fail_invoke_method_when_invoke_method_returns_a_bad_proto_using_map_response() { - let response = build_number_response(42); - let e = RpcMapper::map_response::(response); - - assert!(e.is_err()); - } - - // all these stub-using tests, what do they add? - - #[test] - fn test_success_invoke_method_that_has_null_payload_map_response() { - let response = Err(RpcMapperError::InvalidPayload( - "not a CloudEvent".to_string(), - )); - let result = RpcMapper::map_response::(response); - - assert!(result.is_err()); - assert_eq!( - result.err().unwrap().to_string(), - "Invalid payload: not a CloudEvent" - ); - } - - #[test] - fn test_success_invoke_method_that_has_null_payload_map_response_to_result() { - let response = Err(RpcMapperError::InvalidPayload( - "Invalid payload".to_string(), - )); - let result = RpcMapper::map_response_to_result(response); - - assert!(result.is_err()); - assert_eq!( - result.err().unwrap().to_string(), - "Invalid payload: Invalid payload" - ); - } - - #[test] - fn test_success_invoke_method_happy_flow_that_returns_status_using_map_response() { - let response = build_status_response(UCode::OK, "all good"); - let s = RpcMapper::map_response::(response).unwrap(); - let ustatus = s; - - assert_eq!(UCode::OK, ustatus.get_code()); - assert_eq!("all good", ustatus.message.unwrap()); - } - - #[test] - fn test_success_invoke_method_happy_flow_that_returns_status_using_map_response_to_result_to_rpc_response( - ) { - let response = build_status_response(UCode::OK, "all good"); - let s = RpcMapper::map_response_to_result(response).unwrap(); - - assert!(s.status.is_success()); - assert_eq!(s.status.get_code(), UCode::OK); - } - - #[test] - fn test_unpack_payload_failed() { - let payload = Any { - type_url: "type.googleapis.com/Int32Value".to_string(), - value: { - let mut buf = vec![]; - buf.put_i32(42); - buf - }, - ..Default::default() - }; - - let result: Result = RpcMapper::unpack_any::(&payload); - - assert!(result.is_err()); - } - - #[test] - fn test_invalid_payload_that_is_not_type_any() { - let response = build_empty_payload_response(); - let result = RpcMapper::map_response::(response); - assert!(result.is_err()); - } - - #[test] - fn test_invalid_payload_that_is_not_type_any_map_to_result() { - let response = build_empty_payload_response(); - let result = RpcMapper::map_response_to_result(response); - assert!(result.is_err()); - } -} diff --git a/src/umessage.rs b/src/umessage.rs index 35111039..d27f9ff9 100644 --- a/src/umessage.rs +++ b/src/umessage.rs @@ -17,3 +17,92 @@ mod umessagetype; pub use umessagebuilder::*; pub use crate::up_core_api::umessage::UMessage; + +use crate::{UAttributesError, UPayloadFormat}; +use protobuf::{well_known_types::any::Any, Message}; + +#[derive(Debug)] +pub enum UMessageError { + AttributesValidationError(UAttributesError), + DataSerializationError(protobuf::Error), + PayloadError(String), +} + +impl std::fmt::Display for UMessageError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::AttributesValidationError(e) => f.write_fmt(format_args!( + "Builder state is not consistent with message type: {}", + e + )), + Self::DataSerializationError(e) => { + f.write_fmt(format_args!("Failed to serialize payload: {}", e)) + } + Self::PayloadError(e) => f.write_fmt(format_args!("UMessage payload error: {}", e)), + } + } +} + +impl std::error::Error for UMessageError {} + +impl From for UMessageError { + fn from(value: UAttributesError) -> Self { + Self::AttributesValidationError(value) + } +} + +impl From for UMessageError { + fn from(value: protobuf::Error) -> Self { + Self::DataSerializationError(value) + } +} + +impl From<&str> for UMessageError { + fn from(value: &str) -> Self { + Self::PayloadError(value.into()) + } +} + +impl UMessage { + /// Extracts the payload-contained protobuf message from a `UMessage`. + /// + /// This function is used to extract strongly-typed data from a `UMessage` object, + /// taking into account `UMessage::UPayloadFormat` (will only succeed if payload format is + /// `UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF` or `UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY`) + /// + /// # Type Parameters + /// + /// * `T`: The target type of the data to be unpacked. + /// + /// # Returns + /// + /// * `Ok(T)`: The deserialized protobuf message contained in the payload. + /// + /// # Errors + /// + /// * Err(`UMessageError`) if the unpacking process fails, for example if the payload could + /// not be deserialized into the target type `T`. + pub fn extract_protobuf_payload(&self) -> Result { + if let Some(payload) = self.payload.as_ref() { + match self.attributes.payload_format.enum_value_or_default() { + UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF => { + return T::parse_from_bytes(payload.as_ref()) + .map_err(UMessageError::DataSerializationError); + } + UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY => { + return Any::parse_from_bytes(payload.as_ref()) + .map_err(UMessageError::DataSerializationError) + .and_then(|any| { + T::parse_from_bytes(any.value.as_slice()) + .map_err(UMessageError::DataSerializationError) + }); + } + _ => Err(UMessageError::from( + "Unknown/invalid/unsupported payload format", + )), + } + } else { + Err(UMessageError::from("Payload is empty")) + } + } +} diff --git a/src/umessage/umessagebuilder.rs b/src/umessage/umessagebuilder.rs index d9bea6d8..47b9c8ad 100644 --- a/src/umessage/umessagebuilder.rs +++ b/src/umessage/umessagebuilder.rs @@ -12,48 +12,15 @@ ********************************************************************************/ use bytes::Bytes; -use protobuf::{Enum, EnumOrUnknown, Message}; +use protobuf::{well_known_types::any::Any, Enum, EnumOrUnknown, Message, MessageFull}; -use crate::uattributes::{NotificationValidator, UAttributesError}; +use crate::uattributes::NotificationValidator; use crate::{ PublishValidator, RequestValidator, ResponseValidator, UAttributes, UAttributesValidator, - UCode, UMessage, UMessageType, UPayload, UPayloadFormat, UPriority, UUIDBuilder, UUri, UUID, + UCode, UMessage, UMessageError, UMessageType, UPayloadFormat, UPriority, UUIDBuilder, UUri, + UUID, }; -#[derive(Debug)] -pub enum UMessageBuilderError { - DataSerializationError(protobuf::Error), - AttributesValidationError(UAttributesError), -} - -impl std::fmt::Display for UMessageBuilderError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::DataSerializationError(e) => { - f.write_fmt(format_args!("Failed to serialize payload: {}", e)) - } - Self::AttributesValidationError(e) => f.write_fmt(format_args!( - "Builder state is not consistent with message type: {}", - e - )), - } - } -} - -impl std::error::Error for UMessageBuilderError {} - -impl From for UMessageBuilderError { - fn from(value: UAttributesError) -> Self { - Self::AttributesValidationError(value) - } -} - -impl From for UMessageBuilderError { - fn from(value: protobuf::Error) -> Self { - Self::DataSerializationError(value) - } -} - /// A builder for creating [`UMessage`]s. /// /// Messages are being used by a uEntity to inform other entities about the occurrence of events @@ -553,7 +520,7 @@ impl UMessageBuilder { /// # Errors /// /// If the properties set on the builder do not represent a consistent set of [`UAttributes`], - /// a [`UMessageBuilderError::AttributesValidationError`] is returned. + /// a [`UMessageError::AttributesValidationError`] is returned. /// /// # Examples /// @@ -562,7 +529,7 @@ impl UMessageBuilder { /// The recommended way to use the `UMessageBuilder`. /// /// ```rust - /// use up_rust::{UAttributes, UAttributesValidators, UMessageBuilder, UMessageBuilderError, UMessageType, UPriority, UUIDBuilder, UUri}; + /// use up_rust::{UAttributes, UAttributesValidators, UMessageBuilder, UMessageError, UMessageType, UPriority, UUIDBuilder, UUri}; /// /// # fn main() -> Result<(), Box> { /// let invoked_method = UUri::try_from("my-vehicle/4210/5/64AB")?; @@ -599,7 +566,7 @@ impl UMessageBuilder { /// # Ok(()) /// # } /// ``` - pub fn build(&self) -> Result { + pub fn build(&self) -> Result { let message_id = self .message_id .clone() @@ -615,26 +582,16 @@ impl UMessageBuilder { permission_level: self.permission_level, commstatus: self.comm_status, reqid: self.request_id.clone().into(), + payload_format: self.payload_format.into(), ..Default::default() }; self.validator .validate(&attributes) - .map_err(UMessageBuilderError::from) - .map(|_| { - let payload = self - .payload - .as_ref() - .map(|bytes| bytes.to_vec()) - .map(|data| UPayload { - format: self.payload_format.into(), - data, - ..Default::default() - }); - UMessage { - attributes: Some(attributes).into(), - payload: payload.into(), - ..Default::default() - } + .map_err(UMessageError::from) + .map(|_| UMessage { + attributes: Some(attributes).into(), + payload: self.payload.as_ref().map(|bytes| bytes.to_vec()), + ..Default::default() }) } @@ -652,7 +609,7 @@ impl UMessageBuilder { /// # Errors /// /// If the properties set on the builder do not represent a consistent set of [`UAttributes`], - /// a [`UMessageBuilderError::AttributesValidationError`] is returned. + /// a [`UMessageError::AttributesValidationError`] is returned. /// /// # Examples /// @@ -671,9 +628,10 @@ impl UMessageBuilder { &mut self, payload: Bytes, format: UPayloadFormat, - ) -> Result { + ) -> Result { self.payload = Some(payload); self.payload_format = format; + self.build() } @@ -682,23 +640,23 @@ impl UMessageBuilder { /// # Arguments /// /// * `payload` - The data to set as payload. - /// * `format` - The payload format. /// /// # Returns /// - /// A message ready to be sent using [`crate::UTransport::send`]. + /// A message ready to be sent using [`crate::UTransport::send`]. The message will have + /// [`UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF`] set as its payload format. /// /// # Errors /// - /// If the given payload cannot be serialized into a byte array, a [`UMessageBuilderError::DataSerializationError`] is returned. + /// If the given payload cannot be serialized into a protobuf byte array, a [`UMessageError::DataSerializationError`] is returned. /// If the properties set on the builder do not represent a consistent set of [`UAttributes`], - /// a [`UMessageBuilderError::AttributesValidationError`] is returned. + /// a [`UMessageError::AttributesValidationError`] is returned. /// /// # Examples /// /// ```rust /// use protobuf::{Enum, Message}; - /// use up_rust::{UCode, UMessageBuilder, UMessageType, UPriority, UStatus, UUIDBuilder, UUri}; + /// use up_rust::{UCode, UMessageBuilder, UMessageType, UPayloadFormat, UPriority, UStatus, UUIDBuilder, UUri}; /// /// # fn main() -> Result<(), Box> { /// let invoked_method = UUri::try_from("my-vehicle/4210/5/64AB")?; @@ -710,16 +668,17 @@ impl UMessageBuilder { /// .with_comm_status(UCode::INVALID_ARGUMENT.value()) /// .build_with_protobuf_payload(&UStatus::fail("failed to parse request"))?; /// assert!(message.payload.is_some()); + /// assert_eq!(message.attributes.payload_format.enum_value().unwrap(), UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF); /// # Ok(()) /// # } /// ``` pub fn build_with_protobuf_payload( &mut self, payload: &T, - ) -> Result { + ) -> Result { payload .write_to_bytes() - .map_err(UMessageBuilderError::from) + .map_err(UMessageError::from) .and_then(|serialized_payload| { self.build_with_payload( serialized_payload.into(), @@ -727,6 +686,58 @@ impl UMessageBuilder { ) }) } + + /// Creates the message based on the builder's state and some payload. + /// + /// # Arguments + /// + /// * `payload` - The data to set as payload. + /// + /// # Returns + /// + /// A message ready to be sent using [`crate::UTransport::send`]. The message will have + /// [`UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY`] set as its payload format. + /// + /// # Errors + /// + /// If the given payload cannot be serialized into a protobuf byte array, a [`UMessageError::DataSerializationError`] is returned. + /// If the properties set on the builder do not represent a consistent set of [`UAttributes`], + /// a [`UMessageError::AttributesValidationError`] is returned. + /// + /// # Examples + /// + /// ```rust + /// use protobuf::{Enum, Message}; + /// use up_rust::{UCode, UMessageBuilder, UMessageType, UPayloadFormat, UPriority, UStatus, UUIDBuilder, UUri}; + /// + /// # fn main() -> Result<(), Box> { + /// let invoked_method = UUri::try_from("my-vehicle/4210/5/64AB")?; + /// let reply_to_address = UUri::try_from("my-cloud/BA4C/1/0")?; + /// let request_id = UUIDBuilder::build(); + /// // a service implementation would normally use + /// // `UMessageBuilder::response_for_request(&request_message.attributes)` instead + /// let message = UMessageBuilder::response(reply_to_address, request_id, invoked_method) + /// .with_comm_status(UCode::INVALID_ARGUMENT.value()) + /// .build_with_wrapped_protobuf_payload(&UStatus::fail("failed to parse request"))?; + /// assert!(message.payload.is_some()); + /// assert_eq!(message.attributes.payload_format.enum_value().unwrap(), UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY); + /// # Ok(()) + /// # } + /// ``` + pub fn build_with_wrapped_protobuf_payload( + &mut self, + payload: &T, + ) -> Result { + Any::pack(payload) + .map_err(UMessageError::DataSerializationError) + .and_then(|any| any.write_to_bytes().map_err(UMessageError::from)) + .and_then(|serialized_payload| { + self.build_with_payload( + serialized_payload.into(), + UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY, + ) + }) + } } #[cfg(test)] diff --git a/src/utransport.rs b/src/utransport.rs index 70e6c84f..1ddd0998 100644 --- a/src/utransport.rs +++ b/src/utransport.rs @@ -44,7 +44,7 @@ use crate::{UMessage, UStatus, UUri}; /// async fn on_receive(&self, msg: UMessage) { /// let mut inner_foo = self.inner_foo.lock().unwrap(); /// if let Some(payload) = msg.payload.as_ref() { -/// *inner_foo = format!("latest message length: {}", payload.data.len()); +/// *inner_foo = format!("latest message length: {}", payload.len()); /// } /// } /// From c57d8d3cea390ace251dfd86f7d9593967980b89 Mon Sep 17 00:00:00 2001 From: Kai Hudalla Date: Tue, 21 May 2024 11:01:03 +0200 Subject: [PATCH 2/3] Add support for parsing byte arrays as Bytes The Protobuf crate supports using Bytes instead of Vec when parsing byte arrays from protobuf structs. This can help reduce copying of data shared by multiple consumers of messages. Protobuf's support for using Bytes has been enabled. UMessageBuilder's functions for building with payload have been adapted to accept any payload that can be converted to Bytes. --- Cargo.lock | 1 + Cargo.toml | 2 +- build.rs | 3 +++ src/umessage/umessagebuilder.rs | 40 ++++++++++++++++----------------- 4 files changed, 25 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index df23a775..e761b0a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1144,6 +1144,7 @@ version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "58678a64de2fced2bdec6bca052a6716a0efe692d6e3f53d1bda6a1def64cfc0" dependencies = [ + "bytes", "once_cell", "protobuf-support", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index 5dccffd6..694e0a6b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,7 @@ bytes = { version = "1.5" } chrono = { version = "0.4.32" } mediatype = "0.19" once_cell = { version = "1.19" } -protobuf = { version = "3.3" } +protobuf = { version = "3.3", features = ["with-bytes"] } rand = { version = "0.8" } regex = { version = "1.10" } url = { version = "2.5" } diff --git a/build.rs b/build.rs index 3c4616f9..7241f816 100644 --- a/build.rs +++ b/build.rs @@ -16,6 +16,8 @@ use std::fs; use std::path::Path; use std::path::PathBuf; +use protobuf_codegen::Customize; + const UPROTOCOL_BASE_URI: &str = "https://raw.githubusercontent.com/eclipse-uprotocol/up-spec/main/up-core-api/uprotocol"; @@ -72,6 +74,7 @@ fn get_and_build_protos( .protoc() // use vendored protoc instead of relying on user provided protobuf installation .protoc_path(&protoc_bin_vendored::protoc_bin_path().unwrap()) + .customize(Customize::default().tokio_bytes(true)) .include(proto_folder) .inputs(proto_files) .cargo_out_dir(output_folder) diff --git a/src/umessage/umessagebuilder.rs b/src/umessage/umessagebuilder.rs index 47b9c8ad..89ed1494 100644 --- a/src/umessage/umessagebuilder.rs +++ b/src/umessage/umessagebuilder.rs @@ -79,7 +79,7 @@ impl UMessageBuilder { /// # fn main() -> Result<(), Box> { /// let topic = UUri::try_from("my-vehicle/4210/1/B24D")?; /// let message = UMessageBuilder::publish(topic.clone()) - /// .build_with_payload("closed".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; + /// .build_with_payload("closed", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; /// assert_eq!(message.attributes.type_, UMessageType::UMESSAGE_TYPE_PUBLISH.into()); /// assert_eq!(message.attributes.priority, UPriority::UPRIORITY_CS1.into()); /// assert_eq!(message.attributes.source, Some(topic).into()); @@ -113,7 +113,7 @@ impl UMessageBuilder { /// let origin = UUri::try_from("my-vehicle/4210/5/F20B")?; /// let destination = UUri::try_from("my-cloud/CCDD/2/75FD")?; /// let message = UMessageBuilder::notification(origin.clone(), destination.clone()) - /// .build_with_payload("unexpected movement".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; + /// .build_with_payload("unexpected movement", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; /// assert_eq!(message.attributes.type_, UMessageType::UMESSAGE_TYPE_NOTIFICATION.into()); /// assert_eq!(message.attributes.priority, UPriority::UPRIORITY_CS1.into()); /// assert_eq!(message.attributes.source, Some(origin).into()); @@ -154,7 +154,7 @@ impl UMessageBuilder { /// let method_to_invoke = UUri::try_from("my-vehicle/4210/5/64AB")?; /// let reply_to_address = UUri::try_from("my-cloud/BA4C/1/0")?; /// let message = UMessageBuilder::request(method_to_invoke.clone(), reply_to_address.clone(), 5000) - /// .build_with_payload("lock".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; + /// .build_with_payload("lock", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; /// assert_eq!(message.attributes.type_, UMessageType::UMESSAGE_TYPE_REQUEST.into()); /// assert_eq!(message.attributes.priority, UPriority::UPRIORITY_CS4.into()); /// assert_eq!(message.attributes.source, Some(reply_to_address).into()); @@ -249,7 +249,7 @@ impl UMessageBuilder { /// let request_message_id = UUIDBuilder::build(); /// let request_message = UMessageBuilder::request(method_to_invoke.clone(), reply_to_address.clone(), 5000) /// .with_message_id(request_message_id.clone()) // normally not needed, used only for asserts below - /// .build_with_payload("lock".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; + /// .build_with_payload("lock", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; /// /// let response_message = UMessageBuilder::response_for_request(&request_message.attributes) /// .with_priority(UPriority::UPRIORITY_CS5) @@ -307,11 +307,11 @@ impl UMessageBuilder { /// builder.with_priority(UPriority::UPRIORITY_CS2); /// let message_one = builder /// .with_message_id(UUIDBuilder::build()) - /// .build_with_payload("closed".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; + /// .build_with_payload("closed", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; /// let message_two = builder /// // use new message ID but retain all other attributes /// .with_message_id(UUIDBuilder::build()) - /// .build_with_payload("open".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; + /// .build_with_payload("open", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; /// assert_ne!(message_one.attributes.id, message_two.attributes.id); /// assert_eq!(message_one.attributes.source, message_two.attributes.source); /// assert_eq!(message_one.attributes.priority, UPriority::UPRIORITY_CS2.into()); @@ -351,7 +351,7 @@ impl UMessageBuilder { /// let topic = UUri::try_from("my-vehicle/4210/1/B24D")?; /// let message = UMessageBuilder::publish(topic) /// .with_priority(UPriority::UPRIORITY_CS5) - /// .build_with_payload("closed".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; + /// .build_with_payload("closed", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; /// assert_eq!(message.attributes.priority, UPriority::UPRIORITY_CS5.into()); /// # Ok(()) /// # } @@ -424,7 +424,7 @@ impl UMessageBuilder { /// let token = String::from("this-is-my-token"); /// let message = UMessageBuilder::request(method_to_invoke, reply_to_address, 5000) /// .with_token(token.clone()) - /// .build_with_payload("lock".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; + /// .build_with_payload("lock", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; /// assert_eq!(message.attributes.token, Some(token)); /// # Ok(()) /// # } @@ -460,7 +460,7 @@ impl UMessageBuilder { /// let reply_to_address = UUri::try_from("my-cloud/BA4C/1/0")?; /// let message = UMessageBuilder::request(method_to_invoke, reply_to_address, 5000) /// .with_permission_level(12) - /// .build_with_payload("lock".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; + /// .build_with_payload("lock", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; /// assert_eq!(message.attributes.permission_level, Some(12)); /// # Ok(()) /// # } @@ -590,7 +590,7 @@ impl UMessageBuilder { .map_err(UMessageError::from) .map(|_| UMessage { attributes: Some(attributes).into(), - payload: self.payload.as_ref().map(|bytes| bytes.to_vec()), + payload: self.payload.to_owned(), ..Default::default() }) } @@ -619,17 +619,17 @@ impl UMessageBuilder { /// # fn main() -> Result<(), Box> { /// let topic = UUri::try_from("my-vehicle/4210/1/B24D")?; /// let message = UMessageBuilder::publish(topic) - /// .build_with_payload("locked".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; + /// .build_with_payload("locked", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; /// assert!(message.payload.is_some()); /// # Ok(()) /// # } /// ``` - pub fn build_with_payload( + pub fn build_with_payload>( &mut self, - payload: Bytes, + payload: T, format: UPayloadFormat, ) -> Result { - self.payload = Some(payload); + self.payload = Some(payload.into()); self.payload_format = format; self.build() @@ -681,7 +681,7 @@ impl UMessageBuilder { .map_err(UMessageError::from) .and_then(|serialized_payload| { self.build_with_payload( - serialized_payload.into(), + serialized_payload, UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF, ) }) @@ -733,7 +733,7 @@ impl UMessageBuilder { .and_then(|any| any.write_to_bytes().map_err(UMessageError::from)) .and_then(|serialized_payload| { self.build_with_payload( - serialized_payload.into(), + serialized_payload, UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY, ) }) @@ -824,11 +824,11 @@ mod tests { let mut builder = UMessageBuilder::publish(topic); let message_one = builder .with_message_id(UUIDBuilder::build()) - .build_with_payload("locked".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT) + .build_with_payload("locked", UPayloadFormat::UPAYLOAD_FORMAT_TEXT) .expect("should have been able to create message"); let message_two = builder .with_message_id(UUIDBuilder::build()) - .build_with_payload("unlocked".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT) + .build_with_payload("unlocked", UPayloadFormat::UPAYLOAD_FORMAT_TEXT) .expect("should have been able to create message"); assert_eq!(message_one.attributes.type_, message_two.attributes.type_); assert_ne!(message_one.attributes.id, message_two.attributes.id); @@ -844,7 +844,7 @@ mod tests { .with_message_id(message_id.clone()) .with_priority(UPriority::UPRIORITY_CS2) .with_ttl(5000) - .build_with_payload("locked".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT) + .build_with_payload("locked", UPayloadFormat::UPAYLOAD_FORMAT_TEXT) .expect("should have been able to create message"); assert_eq!(message.attributes.id, Some(message_id).into()); assert_eq!(message.attributes.priority, UPriority::UPRIORITY_CS2.into()); @@ -870,7 +870,7 @@ mod tests { .with_permission_level(5) .with_priority(UPriority::UPRIORITY_CS4) .with_token(token.clone()) - .build_with_payload("unlock".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT) + .build_with_payload("unlock", UPayloadFormat::UPAYLOAD_FORMAT_TEXT) .expect("should have been able to create message"); assert_eq!(message.attributes.id, Some(message_id).into()); From a15872c349b169c8e6df2a50154c86d495d439b9 Mon Sep 17 00:00:00 2001 From: Kai Hudalla Date: Tue, 21 May 2024 11:26:26 +0200 Subject: [PATCH 3/3] Only set message priority if not default priority --- src/umessage/umessagebuilder.rs | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/src/umessage/umessagebuilder.rs b/src/umessage/umessagebuilder.rs index 89ed1494..13babad9 100644 --- a/src/umessage/umessagebuilder.rs +++ b/src/umessage/umessagebuilder.rs @@ -21,6 +21,8 @@ use crate::{ UUID, }; +const PRIORITY_DEFAULT: UPriority = UPriority::UPRIORITY_CS1; + /// A builder for creating [`UMessage`]s. /// /// Messages are being used by a uEntity to inform other entities about the occurrence of events @@ -51,7 +53,7 @@ impl Default for UMessageBuilder { payload: None, payload_format: UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED, permission_level: None, - priority: UPriority::UPRIORITY_CS1, + priority: UPriority::UPRIORITY_UNSPECIFIED, request_id: None, sink: None, source: None, @@ -81,7 +83,7 @@ impl UMessageBuilder { /// let message = UMessageBuilder::publish(topic.clone()) /// .build_with_payload("closed", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; /// assert_eq!(message.attributes.type_, UMessageType::UMESSAGE_TYPE_PUBLISH.into()); - /// assert_eq!(message.attributes.priority, UPriority::UPRIORITY_CS1.into()); + /// assert_eq!(message.attributes.priority, UPriority::UPRIORITY_UNSPECIFIED.into()); /// assert_eq!(message.attributes.source, Some(topic).into()); /// # Ok(()) /// # } @@ -115,7 +117,7 @@ impl UMessageBuilder { /// let message = UMessageBuilder::notification(origin.clone(), destination.clone()) /// .build_with_payload("unexpected movement", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; /// assert_eq!(message.attributes.type_, UMessageType::UMESSAGE_TYPE_NOTIFICATION.into()); - /// assert_eq!(message.attributes.priority, UPriority::UPRIORITY_CS1.into()); + /// assert_eq!(message.attributes.priority, UPriority::UPRIORITY_UNSPECIFIED.into()); /// assert_eq!(message.attributes.source, Some(origin).into()); /// assert_eq!(message.attributes.sink, Some(destination).into()); /// # Ok(()) @@ -342,6 +344,10 @@ impl UMessageBuilder { /// /// The builder. /// + /// # Panics + /// + /// if the builder is used for creating an RPC message but the given priority is less than CS4. + /// /// # Examples /// /// ```rust @@ -362,7 +368,14 @@ impl UMessageBuilder { { assert!(priority.value() >= UPriority::UPRIORITY_CS4.value()) } - self.priority = priority; + if priority != PRIORITY_DEFAULT { + // only set priority explicitly if it differs from the default priority + self.priority = priority; + } else { + // in all other cases set to UNSPECIFIED which will result in the + // priority not being included in the serialized protobuf + self.priority = UPriority::UPRIORITY_UNSPECIFIED; + } self }