diff --git a/Cargo.lock b/Cargo.lock index df23a775..e761b0a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1144,6 +1144,7 @@ version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "58678a64de2fced2bdec6bca052a6716a0efe692d6e3f53d1bda6a1def64cfc0" dependencies = [ + "bytes", "once_cell", "protobuf-support", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index 5dccffd6..694e0a6b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,7 @@ bytes = { version = "1.5" } chrono = { version = "0.4.32" } mediatype = "0.19" once_cell = { version = "1.19" } -protobuf = { version = "3.3" } +protobuf = { version = "3.3", features = ["with-bytes"] } rand = { version = "0.8" } regex = { version = "1.10" } url = { version = "2.5" } diff --git a/build.rs b/build.rs index 0d941b75..7241f816 100644 --- a/build.rs +++ b/build.rs @@ -16,6 +16,8 @@ use std::fs; use std::path::Path; use std::path::PathBuf; +use protobuf_codegen::Customize; + const UPROTOCOL_BASE_URI: &str = "https://raw.githubusercontent.com/eclipse-uprotocol/up-spec/main/up-core-api/uprotocol"; @@ -26,7 +28,6 @@ fn main() -> Result<(), Box> { format!("{}/uuid.proto", UPROTOCOL_BASE_URI).as_str(), format!("{}/uri.proto", UPROTOCOL_BASE_URI).as_str(), format!("{}/uattributes.proto", UPROTOCOL_BASE_URI).as_str(), - format!("{}/upayload.proto", UPROTOCOL_BASE_URI).as_str(), format!("{}/umessage.proto", UPROTOCOL_BASE_URI).as_str(), format!("{}/ustatus.proto", UPROTOCOL_BASE_URI).as_str(), // not used in the SDK yet, but for completeness sake @@ -73,6 +74,7 @@ fn get_and_build_protos( .protoc() // use vendored protoc instead of relying on user provided protobuf installation .protoc_path(&protoc_bin_vendored::protoc_bin_path().unwrap()) + .customize(Customize::default().tokio_bytes(true)) .include(proto_folder) .inputs(proto_files) .cargo_out_dir(output_folder) diff --git a/src/lib.rs b/src/lib.rs index 764dd5e2..b5b40166 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,22 +44,17 @@ // up_core_api types used and augmented by up_rust - symbols re-exported to toplevel, errors are module-specific mod rpc; -pub use rpc::{RpcClient, RpcClientResult}; -pub use rpc::{RpcMapper, RpcMapperError}; -pub use rpc::{RpcPayload, RpcPayloadResult, RpcResult}; +pub use rpc::{RpcClient, RpcClientResult, RpcResult}; mod uattributes; pub use uattributes::{ PublishValidator, RequestValidator, ResponseValidator, UAttributesValidator, UAttributesValidators, }; -pub use uattributes::{UAttributes, UAttributesError, UMessageType, UPriority}; +pub use uattributes::{UAttributes, UAttributesError, UMessageType, UPayloadFormat, UPriority}; mod umessage; -pub use umessage::{UMessage, UMessageBuilder, UMessageBuilderError}; - -mod upayload; -pub use upayload::{UPayload, UPayloadError, UPayloadFormat}; +pub use umessage::{UMessage, UMessageBuilder, UMessageError}; mod uri; pub use uri::{UUri, UUriError}; diff --git a/src/rpc.rs b/src/rpc.rs index fae00401..8b877acb 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -12,9 +12,7 @@ ********************************************************************************/ mod rpcclient; -mod rpcmapper; mod rpcresult; pub use rpcclient::*; -pub use rpcmapper::*; pub use rpcresult::*; diff --git a/src/rpc/rpcclient.rs b/src/rpc/rpcclient.rs index 84a3b770..00c3ba8b 100644 --- a/src/rpc/rpcclient.rs +++ b/src/rpc/rpcclient.rs @@ -13,9 +13,9 @@ use async_trait::async_trait; -use crate::{RpcMapperError, UMessage, UPayload, UPriority, UUri}; +use crate::{UMessage, UMessageError, UUri}; -pub type RpcClientResult = Result; +pub type RpcClientResult = Result; /// `RpcClient` is an interface used by code generators for uProtocol services defined in `.proto` files such as /// the core uProtocol services found in [uProtocol Core API](https://github.com/eclipse-uprotocol/up-spec/tree/main/up-core-api). @@ -35,21 +35,10 @@ pub trait RpcClient: Send + Sync { /// /// * `method` - The URI of the method to be invoked. For example, in long form: "/example.hello_world/1/rpc.SayHello". /// * `request` - The request message to be sent to the server. - /// * `priority` - The priority to use for sending the request and corresponding response messages. Must be at least - /// [`UPriority::UPRIORITY_CS4`], which is also the default if not specified explicitly. - /// * `ttl` - The request's time-to-live in milliseconds. - /// * `token` - The authorization token to use for TAP. /// /// # Returns /// /// Returns a `RpcClientResult` which contains the response message. /// If the invocation fails, it contains a `UStatus` detailing the failure reason. - async fn invoke_method( - &self, - method: UUri, - request: UPayload, - priority: Option, - ttl: Option, - token: Option, - ) -> RpcClientResult; + async fn invoke_method(&self, method: UUri, request: UMessage) -> RpcClientResult; } diff --git a/src/rpc/rpcmapper.rs b/src/rpc/rpcmapper.rs deleted file mode 100644 index 91f3fd7d..00000000 --- a/src/rpc/rpcmapper.rs +++ /dev/null @@ -1,528 +0,0 @@ -/******************************************************************************** - * Copyright (c) 2023 Contributors to the Eclipse Foundation - * - * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - * - * This program and the accompanying materials are made available under the - * terms of the Apache License Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - ********************************************************************************/ - -use std::{default::Default, fmt}; - -use protobuf::{well_known_types::any::Any, MessageFull}; - -use crate::{RpcClientResult, UPayload, UPayloadFormat, UStatus}; - -pub type RpcPayloadResult = Result; - -#[derive(Clone)] -pub struct RpcPayload { - pub status: UStatus, - pub payload: Option, -} - -#[derive(Debug)] -pub enum RpcMapperError { - UnexpectedError(String), - InvalidPayload(String), - UnknownType(String), - ProtobufError(String), -} - -impl fmt::Display for RpcMapperError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - RpcMapperError::UnexpectedError(msg) => write!(f, "Unexpected error: {msg}"), - RpcMapperError::InvalidPayload(msg) => write!(f, "Invalid payload: {msg}",), - RpcMapperError::UnknownType(msg) => write!(f, "Unknown type: {msg}"), - RpcMapperError::ProtobufError(msg) => write!(f, "Protobuf error: {msg}"), - } - } -} - -/// `RpcMapper` is a structure that provides static methods to wrap an RPC request with -/// an RPC response (uP-L2). APIs that return a `Message` assume that the payload is -/// protobuf-serialized `com.google.protobuf.Any` (USerializationHint.PROTOBUF), and will -/// return an error if anything else is passed. -pub struct RpcMapper; - -impl RpcMapper { - /// Maps the payload data returned by a peer to the expected return type of the RPC method. - /// - /// # Parameters - /// - /// - `response`: A `Result` of type [`RpcClientResult`], representing the response from an RPC call. - /// - /// # Type Parameters - /// - /// - `T`: The expected return type of the RPC method. - /// - /// # Returns - /// - /// Returns a `Result` either containing the expected return type of the RPC method wrapped, - /// or an [`RpcMapperError`]. - /// - /// # Errors - /// - /// This function can return an [`RpcMapperError`] in the following cases: - /// - /// - `InvalidPayload`: If the payload received in the response cannot be decoded into the expected return type `T`. - /// This error includes the detailed error message from the decoding process. - /// - /// - `UnknownType`: If the payload is present but cannot be decoded into a protobuf `Any` type. - /// This typically indicates an issue with the payload format or the expected type `T`. - /// - pub fn map_response(response: RpcClientResult) -> Result - where - T: MessageFull + Default, - { - let message = response?; // Directly returns in case of error - - let Some(payload) = message.payload.into_option() else { - return Err(RpcMapperError::InvalidPayload( - "Payload is empty".to_string(), - )); - }; - if payload.data.is_empty() { - return Err(RpcMapperError::InvalidPayload( - "Payload is empty".to_string(), - )); - } - Any::try_from(payload) - .map_err(|_e| { - RpcMapperError::UnknownType("Couldn't decode payload into Any".to_string()) - }) - .and_then(|any| match any.unpack::() { - Ok(Some(m)) => Ok(m), - Ok(None) => Err(RpcMapperError::InvalidPayload(String::from( - "Any object is not of expected type", - ))), - Err(error) => Err(RpcMapperError::InvalidPayload(error.to_string())), - }) - } - - /// This function checks if a `RpcClientResult` contains a protobuf status type, - /// - if that is so it extracts the status code from the protobuf status and - /// - returns an [`RpcPayloadResult`] result with `UStatus::Ok()` and No(ne) [`UPayload`] if the protobuf status was Ok - /// - returns an [`RpcPayloadResult`] result with a failed `UStatus` (mirroring the protobuf status) and No(ne) [`UPayload`] if the protobuf status was not Ok - /// - if the payload did not contain a protobuf status, return [`RpcPayloadResult`] result with `UStatus::Ok()` and the original payload in Some([`UPayload`]) - /// - /// The usage idea is to apply this function to a `RpcClient::invoke_method()` result, then match the return to see if it's gotten a(ny) valid response, and - /// apply `RpcMapper::map_result()` in case a payload was returned and a specific payload type is expected. - /// - /// # Errors - /// - /// This function can return an `RpcMapperError` in the following cases: - /// - /// - `UnknownType`: If the payload is present but cannot be decoded into a protobuf `Any` type. This indicates an issue with the payload format. - /// - /// - Other errors propagated from the `RpcClientResult` processing, including failure in unpacking a protobuf status or other issues encountered during processing. - /// - /// # Note - /// There is one conscious deviation from the Java SDK: this implementation returns a `failed` status in every case where there's not a protobuf status - /// in the payload. In such cases, the payload is still passed on as a function result so it can be used in further decoding attempts. So there are two - /// things to check with the return from this function: - /// - is there [`UStatus`] information (transporting info about the status of an operation, sent from a remote service)? - /// - is there payload data passed in the result, to be decoded by the caller. - /// - // TODO This entire thing feels klunky and kludgy; this needs to be revisited... - pub fn map_response_to_result(response: RpcClientResult) -> RpcPayloadResult { - let message = response?; // Directly returns in case of error - let Some(payload) = message.payload.into_option() else { - return Err(RpcMapperError::InvalidPayload( - "Payload is empty".to_string(), - )); - }; - if payload.data.is_empty() { - return Err(RpcMapperError::InvalidPayload( - "Payload is empty".to_string(), - )); - } - Any::try_from(payload) - .map_err(|_e| { - RpcMapperError::UnknownType("Couldn't decode payload into Any".to_string()) - }) - .and_then(|any| { - match Self::unpack_any::(&any) { - Ok(proto_status) => Ok(RpcPayload { - status: proto_status, - payload: None, - }), - Err(_error) => { - // in this branch, we couldn't decode the payload into a protobuf-status, but there is something else there to pass on - UPayload::try_from(&any) - .map_err(|e| RpcMapperError::InvalidPayload(e.to_string())) - .map(|payload| RpcPayload { - status: UStatus::fail(format!( - "Unexpected any-payload type {}", - any.type_url - )), - payload: Some(payload), // get the original payload back to avoid having to .clone() payload, above - }) - } - } - }) - } - - /// Packs a protobuf message into a `UPayload` object. - /// - /// This function is used to encapsulate a strongly-typed data object into a `UPayload`, - /// which allows for more generic data handling. It leverages Prost's protobuf encoding for - /// serializing the data. - /// - /// # Type Parameters - /// - /// * `T`: The type of the data to be packed. - /// - /// # Parameters - /// - /// * `data`: The data to pack. - /// - /// # Returns - /// - /// The payload containing the packed data. - /// - /// # Errors - /// - /// Returns an `RpcMapperError` if the protobuf serialization of the data exceeds 2^32 - 1 bytes. - pub fn pack_payload(data: &T) -> Result { - let buf = data.write_to_bytes().map_err(|_e| { - RpcMapperError::ProtobufError(String::from("failed to serialize payload to protobuf")) - })?; - Ok(UPayload { - data: buf, - format: UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF.into(), - ..Default::default() - }) - } - - /// Unpacks a given `UPayload` into a protobuf message. - /// - /// This function is used to extract strongly-typed data from a `UPayload` object, leveraging - /// Prost's protobuf decoding capabilities for deserialization. - /// - /// # Type Parameters - /// - /// * `T`: The target type of the data to be unpacked. - /// - /// # Parameters - /// - /// * `payload`: The `UPayload` object containing the data to be unpacked. - /// - /// # Returns - /// - /// * `Ok(T)`: The deserialized protobuf message contained in the payload. - /// - /// # Errors - /// - /// Returns an `RpcMapperError` if the unpacking process fails, for example if the payload could - /// not be deserialized into the target protobuf type `T`. - pub fn unpack_payload( - payload: UPayload, - ) -> Result { - Any::try_from(payload) - .map_err(|_e| RpcMapperError::UnknownType("Couldn't decode payload".to_string())) - .and_then(|any| { - T::parse_from_bytes(any.value.as_slice()) - .map_err(|error| RpcMapperError::InvalidPayload(error.to_string())) - }) - } - - /// Packs a given `data` of type `T` into a protbuf `Any` object. - /// - /// This function is useful for converting strongly-typed data into an `Any` - /// object for use in message-passing scenarios where the type needs to be - /// encoded as `Any`. - /// - /// # Type Parameters - /// - /// * `T`: The type of the data to be packed. - /// - /// # Parameters - /// - /// * `data`: The data of type `T` that will be packed into the returned `Any` object. - /// - /// # Returns - /// - /// * `Ok(Any)`: A protobuf `Any` object containing the packed `data`. - /// * `Err(RpcMapperError)`: An error that occurred during the packing process. - /// - /// # Errors - /// - /// Returns an `RpcMapperError` if the packing process fails. - pub fn pack_any(data: &T) -> Result { - Any::pack(data).map_err(|error| RpcMapperError::InvalidPayload(error.to_string())) - } - - /// Unpacks a given protbuf `Any` object into a data of type `T`. - /// - /// This function is used to convert an `Any` object back into its original - /// strongly-typed data. It's essentially the reverse operation of `pack_any`. - /// - /// # Type Parameters - /// - /// * `T`: The expected type of the unpacked data. This type must implement `prost::Name` - /// for type URL validation and `std::default::Default` for initializing the type. - /// - /// # Parameters - /// - /// * `any`: The `Any` object that will be unpacked. - /// - /// # Returns - /// - /// * `Ok(T)`: A `T` object containing the unpacked data. - /// * `Err(RpcMapperError)`: An error that occurred during the unpacking process. - /// - /// # Errors - /// - /// Returns an `RpcMapperError` if the unpacking process fails, for example due to type mismatch - /// or if the data inside `Any` could not be decoded into type `T`. - pub fn unpack_any(any: &Any) -> Result { - match any.unpack() { - Err(error) => Err(RpcMapperError::InvalidPayload(error.to_string())), - Ok(v) => match v { - Some(msg) => Ok(msg), - None => Err(RpcMapperError::ProtobufError(String::from( - "Any object does not contain payload", - ))), - }, - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use bytes::{Buf, BufMut}; - use protobuf::MessageField; - - use crate::{UCode, UMessage}; - - fn build_status_response(code: UCode, msg: &str) -> RpcClientResult { - let status: UStatus = UStatus::fail_with_code(code, msg); - let any = RpcMapper::pack_any(&status)?; - let payload = - UPayload::try_from(any).map_err(|e| RpcMapperError::InvalidPayload(e.to_string()))?; - let message = UMessage { - payload: MessageField::some(payload), - ..Default::default() - }; - Ok(message) - } - - fn build_empty_payload_response() -> RpcClientResult { - let payload = UPayload { - data: vec![], - ..Default::default() - }; - let message = UMessage { - payload: MessageField::some(payload), - ..Default::default() - }; - Ok(message) - } - - fn build_number_response(number: i32) -> RpcClientResult { - let any: Any = Any { - type_url: "type.googleapis.com/Int32Value".to_string(), - value: number.to_be_bytes().into(), - ..Default::default() - }; - let payload = - UPayload::try_from(any).map_err(|e| RpcMapperError::InvalidPayload(e.to_string()))?; - let message = UMessage { - payload: MessageField::some(payload), - ..Default::default() - }; - Ok(message) - } - - #[test] - fn test_map_response_to_result_happy_path() { - let result = RpcMapper::map_response_to_result(build_number_response(3)).unwrap(); - - assert!(result.status.is_failed()); // TODO this seems strange - - let payload = result.payload.unwrap(); - let any = Any::try_from(payload).unwrap(); - assert_eq!("type.googleapis.com/Int32Value", any.type_url); - let value = (&any.value[..]).get_i32(); - assert_eq!(value, 3); - } - - #[test] - fn test_compose_that_returns_status() { - let response = build_status_response(UCode::INVALID_ARGUMENT, "boom"); - - let result = RpcMapper::map_response_to_result(response).unwrap(); - - assert!(result.status.is_failed()); - assert_eq!(result.status.get_code(), UCode::INVALID_ARGUMENT); - assert_eq!(result.status.message.unwrap(), "boom"); - } - - #[test] - fn test_compose_with_failure() { - let response = Err(RpcMapperError::UnexpectedError("Boom".to_string())); - let result = RpcMapper::map_response_to_result(response); - - assert!(result.is_err()); - assert_eq!(result.err().unwrap().to_string(), "Unexpected error: Boom"); - } - - #[test] - fn test_fail_invoke_method_when_invoke_method_returns_a_status_using_map_response_to_rpc_response( - ) { - let response = build_status_response(UCode::INVALID_ARGUMENT, "boom"); - let result = RpcMapper::map_response_to_result(response).unwrap(); - - assert!(result.status.is_failed()); - assert_eq!(UCode::INVALID_ARGUMENT, result.status.get_code()); - assert_eq!("boom", result.status.message.unwrap()); - } - - #[test] - fn test_fail_invoke_method_when_invoke_method_returns_a_bad_proto_using_map_response_to_rpc_response( - ) { - let response = build_number_response(42); - let result = RpcMapper::map_response_to_result(response).unwrap(); - - assert!(result.status.is_failed()); - assert_eq!( - result.status.message.unwrap(), - "Unexpected any-payload type type.googleapis.com/Int32Value" - ); - } - - // Create a generic UMessage for use in test cases - fn build_umessage_for_test() -> UMessage { - let arbitrary_proto = crate::up_core_api::file::FileBatch::default(); - let any = RpcMapper::pack_any(&arbitrary_proto).unwrap(); - - let payload = UPayload::try_from(any).unwrap(); - UMessage { - payload: MessageField::some(payload), - ..Default::default() - } - } - - #[test] - fn test_success_invoke_method_happy_flow_using_map_response_to_rpc_response() { - let response_message = build_umessage_for_test(); - let result = RpcMapper::map_response_to_result(Ok(response_message.clone())).unwrap(); - - assert!(result.status.is_failed()); - assert_eq!(result.payload.unwrap(), response_message.payload.unwrap()); - } - - #[test] - fn test_success_invoke_method_happy_flow_using_map_response() { - let response_message = build_umessage_for_test(); - let file_batch = - RpcMapper::map_response::(Ok(response_message)) - .unwrap(); - - assert_eq!(file_batch, crate::up_core_api::file::FileBatch::default()); - } - - #[test] - fn test_fail_invoke_method_when_invoke_method_returns_a_status_using_map_response() { - let response = build_status_response(UCode::ABORTED, "hello"); - let e = RpcMapper::map_response::(response); - - assert!(e.is_err()); - } - - #[test] - fn test_fail_invoke_method_when_invoke_method_returns_a_bad_proto_using_map_response() { - let response = build_number_response(42); - let e = RpcMapper::map_response::(response); - - assert!(e.is_err()); - } - - // all these stub-using tests, what do they add? - - #[test] - fn test_success_invoke_method_that_has_null_payload_map_response() { - let response = Err(RpcMapperError::InvalidPayload( - "not a CloudEvent".to_string(), - )); - let result = RpcMapper::map_response::(response); - - assert!(result.is_err()); - assert_eq!( - result.err().unwrap().to_string(), - "Invalid payload: not a CloudEvent" - ); - } - - #[test] - fn test_success_invoke_method_that_has_null_payload_map_response_to_result() { - let response = Err(RpcMapperError::InvalidPayload( - "Invalid payload".to_string(), - )); - let result = RpcMapper::map_response_to_result(response); - - assert!(result.is_err()); - assert_eq!( - result.err().unwrap().to_string(), - "Invalid payload: Invalid payload" - ); - } - - #[test] - fn test_success_invoke_method_happy_flow_that_returns_status_using_map_response() { - let response = build_status_response(UCode::OK, "all good"); - let s = RpcMapper::map_response::(response).unwrap(); - let ustatus = s; - - assert_eq!(UCode::OK, ustatus.get_code()); - assert_eq!("all good", ustatus.message.unwrap()); - } - - #[test] - fn test_success_invoke_method_happy_flow_that_returns_status_using_map_response_to_result_to_rpc_response( - ) { - let response = build_status_response(UCode::OK, "all good"); - let s = RpcMapper::map_response_to_result(response).unwrap(); - - assert!(s.status.is_success()); - assert_eq!(s.status.get_code(), UCode::OK); - } - - #[test] - fn test_unpack_payload_failed() { - let payload = Any { - type_url: "type.googleapis.com/Int32Value".to_string(), - value: { - let mut buf = vec![]; - buf.put_i32(42); - buf - }, - ..Default::default() - }; - - let result: Result = RpcMapper::unpack_any::(&payload); - - assert!(result.is_err()); - } - - #[test] - fn test_invalid_payload_that_is_not_type_any() { - let response = build_empty_payload_response(); - let result = RpcMapper::map_response::(response); - assert!(result.is_err()); - } - - #[test] - fn test_invalid_payload_that_is_not_type_any_map_to_result() { - let response = build_empty_payload_response(); - let result = RpcMapper::map_response_to_result(response); - assert!(result.is_err()); - } -} diff --git a/src/umessage.rs b/src/umessage.rs index 35111039..d27f9ff9 100644 --- a/src/umessage.rs +++ b/src/umessage.rs @@ -17,3 +17,92 @@ mod umessagetype; pub use umessagebuilder::*; pub use crate::up_core_api::umessage::UMessage; + +use crate::{UAttributesError, UPayloadFormat}; +use protobuf::{well_known_types::any::Any, Message}; + +#[derive(Debug)] +pub enum UMessageError { + AttributesValidationError(UAttributesError), + DataSerializationError(protobuf::Error), + PayloadError(String), +} + +impl std::fmt::Display for UMessageError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::AttributesValidationError(e) => f.write_fmt(format_args!( + "Builder state is not consistent with message type: {}", + e + )), + Self::DataSerializationError(e) => { + f.write_fmt(format_args!("Failed to serialize payload: {}", e)) + } + Self::PayloadError(e) => f.write_fmt(format_args!("UMessage payload error: {}", e)), + } + } +} + +impl std::error::Error for UMessageError {} + +impl From for UMessageError { + fn from(value: UAttributesError) -> Self { + Self::AttributesValidationError(value) + } +} + +impl From for UMessageError { + fn from(value: protobuf::Error) -> Self { + Self::DataSerializationError(value) + } +} + +impl From<&str> for UMessageError { + fn from(value: &str) -> Self { + Self::PayloadError(value.into()) + } +} + +impl UMessage { + /// Extracts the payload-contained protobuf message from a `UMessage`. + /// + /// This function is used to extract strongly-typed data from a `UMessage` object, + /// taking into account `UMessage::UPayloadFormat` (will only succeed if payload format is + /// `UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF` or `UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY`) + /// + /// # Type Parameters + /// + /// * `T`: The target type of the data to be unpacked. + /// + /// # Returns + /// + /// * `Ok(T)`: The deserialized protobuf message contained in the payload. + /// + /// # Errors + /// + /// * Err(`UMessageError`) if the unpacking process fails, for example if the payload could + /// not be deserialized into the target type `T`. + pub fn extract_protobuf_payload(&self) -> Result { + if let Some(payload) = self.payload.as_ref() { + match self.attributes.payload_format.enum_value_or_default() { + UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF => { + return T::parse_from_bytes(payload.as_ref()) + .map_err(UMessageError::DataSerializationError); + } + UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY => { + return Any::parse_from_bytes(payload.as_ref()) + .map_err(UMessageError::DataSerializationError) + .and_then(|any| { + T::parse_from_bytes(any.value.as_slice()) + .map_err(UMessageError::DataSerializationError) + }); + } + _ => Err(UMessageError::from( + "Unknown/invalid/unsupported payload format", + )), + } + } else { + Err(UMessageError::from("Payload is empty")) + } + } +} diff --git a/src/umessage/umessagebuilder.rs b/src/umessage/umessagebuilder.rs index d9bea6d8..13babad9 100644 --- a/src/umessage/umessagebuilder.rs +++ b/src/umessage/umessagebuilder.rs @@ -12,47 +12,16 @@ ********************************************************************************/ use bytes::Bytes; -use protobuf::{Enum, EnumOrUnknown, Message}; +use protobuf::{well_known_types::any::Any, Enum, EnumOrUnknown, Message, MessageFull}; -use crate::uattributes::{NotificationValidator, UAttributesError}; +use crate::uattributes::NotificationValidator; use crate::{ PublishValidator, RequestValidator, ResponseValidator, UAttributes, UAttributesValidator, - UCode, UMessage, UMessageType, UPayload, UPayloadFormat, UPriority, UUIDBuilder, UUri, UUID, + UCode, UMessage, UMessageError, UMessageType, UPayloadFormat, UPriority, UUIDBuilder, UUri, + UUID, }; -#[derive(Debug)] -pub enum UMessageBuilderError { - DataSerializationError(protobuf::Error), - AttributesValidationError(UAttributesError), -} - -impl std::fmt::Display for UMessageBuilderError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::DataSerializationError(e) => { - f.write_fmt(format_args!("Failed to serialize payload: {}", e)) - } - Self::AttributesValidationError(e) => f.write_fmt(format_args!( - "Builder state is not consistent with message type: {}", - e - )), - } - } -} - -impl std::error::Error for UMessageBuilderError {} - -impl From for UMessageBuilderError { - fn from(value: UAttributesError) -> Self { - Self::AttributesValidationError(value) - } -} - -impl From for UMessageBuilderError { - fn from(value: protobuf::Error) -> Self { - Self::DataSerializationError(value) - } -} +const PRIORITY_DEFAULT: UPriority = UPriority::UPRIORITY_CS1; /// A builder for creating [`UMessage`]s. /// @@ -84,7 +53,7 @@ impl Default for UMessageBuilder { payload: None, payload_format: UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED, permission_level: None, - priority: UPriority::UPRIORITY_CS1, + priority: UPriority::UPRIORITY_UNSPECIFIED, request_id: None, sink: None, source: None, @@ -112,9 +81,9 @@ impl UMessageBuilder { /// # fn main() -> Result<(), Box> { /// let topic = UUri::try_from("my-vehicle/4210/1/B24D")?; /// let message = UMessageBuilder::publish(topic.clone()) - /// .build_with_payload("closed".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; + /// .build_with_payload("closed", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; /// assert_eq!(message.attributes.type_, UMessageType::UMESSAGE_TYPE_PUBLISH.into()); - /// assert_eq!(message.attributes.priority, UPriority::UPRIORITY_CS1.into()); + /// assert_eq!(message.attributes.priority, UPriority::UPRIORITY_UNSPECIFIED.into()); /// assert_eq!(message.attributes.source, Some(topic).into()); /// # Ok(()) /// # } @@ -146,9 +115,9 @@ impl UMessageBuilder { /// let origin = UUri::try_from("my-vehicle/4210/5/F20B")?; /// let destination = UUri::try_from("my-cloud/CCDD/2/75FD")?; /// let message = UMessageBuilder::notification(origin.clone(), destination.clone()) - /// .build_with_payload("unexpected movement".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; + /// .build_with_payload("unexpected movement", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; /// assert_eq!(message.attributes.type_, UMessageType::UMESSAGE_TYPE_NOTIFICATION.into()); - /// assert_eq!(message.attributes.priority, UPriority::UPRIORITY_CS1.into()); + /// assert_eq!(message.attributes.priority, UPriority::UPRIORITY_UNSPECIFIED.into()); /// assert_eq!(message.attributes.source, Some(origin).into()); /// assert_eq!(message.attributes.sink, Some(destination).into()); /// # Ok(()) @@ -187,7 +156,7 @@ impl UMessageBuilder { /// let method_to_invoke = UUri::try_from("my-vehicle/4210/5/64AB")?; /// let reply_to_address = UUri::try_from("my-cloud/BA4C/1/0")?; /// let message = UMessageBuilder::request(method_to_invoke.clone(), reply_to_address.clone(), 5000) - /// .build_with_payload("lock".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; + /// .build_with_payload("lock", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; /// assert_eq!(message.attributes.type_, UMessageType::UMESSAGE_TYPE_REQUEST.into()); /// assert_eq!(message.attributes.priority, UPriority::UPRIORITY_CS4.into()); /// assert_eq!(message.attributes.source, Some(reply_to_address).into()); @@ -282,7 +251,7 @@ impl UMessageBuilder { /// let request_message_id = UUIDBuilder::build(); /// let request_message = UMessageBuilder::request(method_to_invoke.clone(), reply_to_address.clone(), 5000) /// .with_message_id(request_message_id.clone()) // normally not needed, used only for asserts below - /// .build_with_payload("lock".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; + /// .build_with_payload("lock", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; /// /// let response_message = UMessageBuilder::response_for_request(&request_message.attributes) /// .with_priority(UPriority::UPRIORITY_CS5) @@ -340,11 +309,11 @@ impl UMessageBuilder { /// builder.with_priority(UPriority::UPRIORITY_CS2); /// let message_one = builder /// .with_message_id(UUIDBuilder::build()) - /// .build_with_payload("closed".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; + /// .build_with_payload("closed", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; /// let message_two = builder /// // use new message ID but retain all other attributes /// .with_message_id(UUIDBuilder::build()) - /// .build_with_payload("open".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; + /// .build_with_payload("open", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; /// assert_ne!(message_one.attributes.id, message_two.attributes.id); /// assert_eq!(message_one.attributes.source, message_two.attributes.source); /// assert_eq!(message_one.attributes.priority, UPriority::UPRIORITY_CS2.into()); @@ -375,6 +344,10 @@ impl UMessageBuilder { /// /// The builder. /// + /// # Panics + /// + /// if the builder is used for creating an RPC message but the given priority is less than CS4. + /// /// # Examples /// /// ```rust @@ -384,7 +357,7 @@ impl UMessageBuilder { /// let topic = UUri::try_from("my-vehicle/4210/1/B24D")?; /// let message = UMessageBuilder::publish(topic) /// .with_priority(UPriority::UPRIORITY_CS5) - /// .build_with_payload("closed".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; + /// .build_with_payload("closed", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; /// assert_eq!(message.attributes.priority, UPriority::UPRIORITY_CS5.into()); /// # Ok(()) /// # } @@ -395,7 +368,14 @@ impl UMessageBuilder { { assert!(priority.value() >= UPriority::UPRIORITY_CS4.value()) } - self.priority = priority; + if priority != PRIORITY_DEFAULT { + // only set priority explicitly if it differs from the default priority + self.priority = priority; + } else { + // in all other cases set to UNSPECIFIED which will result in the + // priority not being included in the serialized protobuf + self.priority = UPriority::UPRIORITY_UNSPECIFIED; + } self } @@ -457,7 +437,7 @@ impl UMessageBuilder { /// let token = String::from("this-is-my-token"); /// let message = UMessageBuilder::request(method_to_invoke, reply_to_address, 5000) /// .with_token(token.clone()) - /// .build_with_payload("lock".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; + /// .build_with_payload("lock", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; /// assert_eq!(message.attributes.token, Some(token)); /// # Ok(()) /// # } @@ -493,7 +473,7 @@ impl UMessageBuilder { /// let reply_to_address = UUri::try_from("my-cloud/BA4C/1/0")?; /// let message = UMessageBuilder::request(method_to_invoke, reply_to_address, 5000) /// .with_permission_level(12) - /// .build_with_payload("lock".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; + /// .build_with_payload("lock", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; /// assert_eq!(message.attributes.permission_level, Some(12)); /// # Ok(()) /// # } @@ -553,7 +533,7 @@ impl UMessageBuilder { /// # Errors /// /// If the properties set on the builder do not represent a consistent set of [`UAttributes`], - /// a [`UMessageBuilderError::AttributesValidationError`] is returned. + /// a [`UMessageError::AttributesValidationError`] is returned. /// /// # Examples /// @@ -562,7 +542,7 @@ impl UMessageBuilder { /// The recommended way to use the `UMessageBuilder`. /// /// ```rust - /// use up_rust::{UAttributes, UAttributesValidators, UMessageBuilder, UMessageBuilderError, UMessageType, UPriority, UUIDBuilder, UUri}; + /// use up_rust::{UAttributes, UAttributesValidators, UMessageBuilder, UMessageError, UMessageType, UPriority, UUIDBuilder, UUri}; /// /// # fn main() -> Result<(), Box> { /// let invoked_method = UUri::try_from("my-vehicle/4210/5/64AB")?; @@ -599,7 +579,7 @@ impl UMessageBuilder { /// # Ok(()) /// # } /// ``` - pub fn build(&self) -> Result { + pub fn build(&self) -> Result { let message_id = self .message_id .clone() @@ -615,26 +595,16 @@ impl UMessageBuilder { permission_level: self.permission_level, commstatus: self.comm_status, reqid: self.request_id.clone().into(), + payload_format: self.payload_format.into(), ..Default::default() }; self.validator .validate(&attributes) - .map_err(UMessageBuilderError::from) - .map(|_| { - let payload = self - .payload - .as_ref() - .map(|bytes| bytes.to_vec()) - .map(|data| UPayload { - format: self.payload_format.into(), - data, - ..Default::default() - }); - UMessage { - attributes: Some(attributes).into(), - payload: payload.into(), - ..Default::default() - } + .map_err(UMessageError::from) + .map(|_| UMessage { + attributes: Some(attributes).into(), + payload: self.payload.to_owned(), + ..Default::default() }) } @@ -652,7 +622,7 @@ impl UMessageBuilder { /// # Errors /// /// If the properties set on the builder do not represent a consistent set of [`UAttributes`], - /// a [`UMessageBuilderError::AttributesValidationError`] is returned. + /// a [`UMessageError::AttributesValidationError`] is returned. /// /// # Examples /// @@ -662,18 +632,19 @@ impl UMessageBuilder { /// # fn main() -> Result<(), Box> { /// let topic = UUri::try_from("my-vehicle/4210/1/B24D")?; /// let message = UMessageBuilder::publish(topic) - /// .build_with_payload("locked".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; + /// .build_with_payload("locked", UPayloadFormat::UPAYLOAD_FORMAT_TEXT)?; /// assert!(message.payload.is_some()); /// # Ok(()) /// # } /// ``` - pub fn build_with_payload( + pub fn build_with_payload>( &mut self, - payload: Bytes, + payload: T, format: UPayloadFormat, - ) -> Result { - self.payload = Some(payload); + ) -> Result { + self.payload = Some(payload.into()); self.payload_format = format; + self.build() } @@ -682,23 +653,23 @@ impl UMessageBuilder { /// # Arguments /// /// * `payload` - The data to set as payload. - /// * `format` - The payload format. /// /// # Returns /// - /// A message ready to be sent using [`crate::UTransport::send`]. + /// A message ready to be sent using [`crate::UTransport::send`]. The message will have + /// [`UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF`] set as its payload format. /// /// # Errors /// - /// If the given payload cannot be serialized into a byte array, a [`UMessageBuilderError::DataSerializationError`] is returned. + /// If the given payload cannot be serialized into a protobuf byte array, a [`UMessageError::DataSerializationError`] is returned. /// If the properties set on the builder do not represent a consistent set of [`UAttributes`], - /// a [`UMessageBuilderError::AttributesValidationError`] is returned. + /// a [`UMessageError::AttributesValidationError`] is returned. /// /// # Examples /// /// ```rust /// use protobuf::{Enum, Message}; - /// use up_rust::{UCode, UMessageBuilder, UMessageType, UPriority, UStatus, UUIDBuilder, UUri}; + /// use up_rust::{UCode, UMessageBuilder, UMessageType, UPayloadFormat, UPriority, UStatus, UUIDBuilder, UUri}; /// /// # fn main() -> Result<(), Box> { /// let invoked_method = UUri::try_from("my-vehicle/4210/5/64AB")?; @@ -710,23 +681,76 @@ impl UMessageBuilder { /// .with_comm_status(UCode::INVALID_ARGUMENT.value()) /// .build_with_protobuf_payload(&UStatus::fail("failed to parse request"))?; /// assert!(message.payload.is_some()); + /// assert_eq!(message.attributes.payload_format.enum_value().unwrap(), UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF); /// # Ok(()) /// # } /// ``` pub fn build_with_protobuf_payload( &mut self, payload: &T, - ) -> Result { + ) -> Result { payload .write_to_bytes() - .map_err(UMessageBuilderError::from) + .map_err(UMessageError::from) .and_then(|serialized_payload| { self.build_with_payload( - serialized_payload.into(), + serialized_payload, UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF, ) }) } + + /// Creates the message based on the builder's state and some payload. + /// + /// # Arguments + /// + /// * `payload` - The data to set as payload. + /// + /// # Returns + /// + /// A message ready to be sent using [`crate::UTransport::send`]. The message will have + /// [`UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY`] set as its payload format. + /// + /// # Errors + /// + /// If the given payload cannot be serialized into a protobuf byte array, a [`UMessageError::DataSerializationError`] is returned. + /// If the properties set on the builder do not represent a consistent set of [`UAttributes`], + /// a [`UMessageError::AttributesValidationError`] is returned. + /// + /// # Examples + /// + /// ```rust + /// use protobuf::{Enum, Message}; + /// use up_rust::{UCode, UMessageBuilder, UMessageType, UPayloadFormat, UPriority, UStatus, UUIDBuilder, UUri}; + /// + /// # fn main() -> Result<(), Box> { + /// let invoked_method = UUri::try_from("my-vehicle/4210/5/64AB")?; + /// let reply_to_address = UUri::try_from("my-cloud/BA4C/1/0")?; + /// let request_id = UUIDBuilder::build(); + /// // a service implementation would normally use + /// // `UMessageBuilder::response_for_request(&request_message.attributes)` instead + /// let message = UMessageBuilder::response(reply_to_address, request_id, invoked_method) + /// .with_comm_status(UCode::INVALID_ARGUMENT.value()) + /// .build_with_wrapped_protobuf_payload(&UStatus::fail("failed to parse request"))?; + /// assert!(message.payload.is_some()); + /// assert_eq!(message.attributes.payload_format.enum_value().unwrap(), UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY); + /// # Ok(()) + /// # } + /// ``` + pub fn build_with_wrapped_protobuf_payload( + &mut self, + payload: &T, + ) -> Result { + Any::pack(payload) + .map_err(UMessageError::DataSerializationError) + .and_then(|any| any.write_to_bytes().map_err(UMessageError::from)) + .and_then(|serialized_payload| { + self.build_with_payload( + serialized_payload, + UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY, + ) + }) + } } #[cfg(test)] @@ -813,11 +837,11 @@ mod tests { let mut builder = UMessageBuilder::publish(topic); let message_one = builder .with_message_id(UUIDBuilder::build()) - .build_with_payload("locked".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT) + .build_with_payload("locked", UPayloadFormat::UPAYLOAD_FORMAT_TEXT) .expect("should have been able to create message"); let message_two = builder .with_message_id(UUIDBuilder::build()) - .build_with_payload("unlocked".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT) + .build_with_payload("unlocked", UPayloadFormat::UPAYLOAD_FORMAT_TEXT) .expect("should have been able to create message"); assert_eq!(message_one.attributes.type_, message_two.attributes.type_); assert_ne!(message_one.attributes.id, message_two.attributes.id); @@ -833,7 +857,7 @@ mod tests { .with_message_id(message_id.clone()) .with_priority(UPriority::UPRIORITY_CS2) .with_ttl(5000) - .build_with_payload("locked".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT) + .build_with_payload("locked", UPayloadFormat::UPAYLOAD_FORMAT_TEXT) .expect("should have been able to create message"); assert_eq!(message.attributes.id, Some(message_id).into()); assert_eq!(message.attributes.priority, UPriority::UPRIORITY_CS2.into()); @@ -859,7 +883,7 @@ mod tests { .with_permission_level(5) .with_priority(UPriority::UPRIORITY_CS4) .with_token(token.clone()) - .build_with_payload("unlock".into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT) + .build_with_payload("unlock", UPayloadFormat::UPAYLOAD_FORMAT_TEXT) .expect("should have been able to create message"); assert_eq!(message.attributes.id, Some(message_id).into()); diff --git a/src/utransport.rs b/src/utransport.rs index 70e6c84f..1ddd0998 100644 --- a/src/utransport.rs +++ b/src/utransport.rs @@ -44,7 +44,7 @@ use crate::{UMessage, UStatus, UUri}; /// async fn on_receive(&self, msg: UMessage) { /// let mut inner_foo = self.inner_foo.lock().unwrap(); /// if let Some(payload) = msg.payload.as_ref() { -/// *inner_foo = format!("latest message length: {}", payload.data.len()); +/// *inner_foo = format!("latest message length: {}", payload.len()); /// } /// } ///