diff --git a/requirements.txt b/requirements.txt index 4b7d565..8b01c5e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ up-python==0.2.0-dev -eclipse-zenoh==0.11.0 +eclipse-zenoh==1.0.0-alpha.6 diff --git a/up_transport_zenoh/examples/publish.py b/up_transport_zenoh/examples/publish.py index 57d6d3f..6e03f87 100644 --- a/up_transport_zenoh/examples/publish.py +++ b/up_transport_zenoh/examples/publish.py @@ -23,14 +23,14 @@ from up_transport_zenoh.examples.common_uuri import get_zenoh_default_config from up_transport_zenoh.uptransportzenoh import UPTransportZenoh -source = UUri(authority_name="vehicle1", ue_id=18) +source = UUri(authority_name="publisher", ue_id=1, ue_version_major=1) publisher = UPTransportZenoh.new(get_zenoh_default_config(), source) async def publish_to_zenoh(): # create uuri - uuri = UUri(ue_id=4, ue_version_major=1, resource_id=0x8000) - builder = UMessageBuilder.publish(uuri) + source.resource_id = 0x8001 + builder = UMessageBuilder.publish(source) payload = UPayload.pack(UUri()) umessage = builder.build_from_upayload(payload) status = await publisher.send(umessage) diff --git a/up_transport_zenoh/examples/subscribe.py b/up_transport_zenoh/examples/subscribe.py index 688ea0a..b5fee2c 100644 --- a/up_transport_zenoh/examples/subscribe.py +++ b/up_transport_zenoh/examples/subscribe.py @@ -33,10 +33,10 @@ async def on_receive(self, msg: UMessage) -> None: return UStatus(message="Received event") -source = UUri(authority_name="vehicle1", ue_id=18) +source = UUri(authority_name="subscriber", ue_id=9) transport = UPTransportZenoh.new(get_zenoh_default_config(), source) # create topic uuri -uuri = UUri(ue_id=4, ue_version_major=1, resource_id=0x8000) +uuri = UUri(authority_name="publisher", ue_id=1, ue_version_major=1, resource_id=0x8001) async def subscribe_to_zenoh_if_subscription_service_is_not_running(): diff --git a/up_transport_zenoh/uptransportzenoh.py b/up_transport_zenoh/uptransportzenoh.py index 344cd51..c46f614 100644 --- a/up_transport_zenoh/uptransportzenoh.py +++ b/up_transport_zenoh/uptransportzenoh.py @@ -29,8 +29,8 @@ from uprotocol.v1.umessage_pb2 import UMessage from uprotocol.v1.uri_pb2 import UUri from uprotocol.v1.ustatus_pb2 import UStatus -from zenoh import Config, Query, Queryable, Sample, Session, Subscriber, Value -from zenoh.keyexpr import KeyExpr +from zenoh import Config, Query, Queryable, Sample, Session, Subscriber +from zenoh.zenoh import KeyExpr from up_transport_zenoh.zenohutils import MessageFlag, ZenohUtils @@ -93,7 +93,7 @@ def send_publish_notification(self, zenoh_key: str, payload: bytes, attributes: logging.debug(f"Priority: {priority}") logging.debug(f"Attachment: {attachment}") - self.session.put(keyexpr=zenoh_key, value=payload, attachment=attachment, priority=priority) + self.session.put(key_expr=zenoh_key, payload=payload, attachment=attachment, priority=priority) msg = "Successfully sent data to Zenoh" logging.debug(f"SUCCESS:{msg}") return UStatus(code=UCode.OK, message=msg) @@ -111,8 +111,8 @@ def send_request(self, zenoh_key: str, payload: bytes, attributes: UAttributes) return UStatus(code=UCode.INVALID_ARGUMENT, message=msg) resp_callback = None for saved_zenoh_key, listener in self.rpc_callback_map.items(): - keyexpr_zenohkey = KeyExpr.new(zenoh_key) - keyexpr_savedkey = KeyExpr.new(saved_zenoh_key) + keyexpr_zenohkey = KeyExpr(zenoh_key) + keyexpr_savedkey = KeyExpr(saved_zenoh_key) if keyexpr_zenohkey.intersects(keyexpr_savedkey): resp_callback = self.rpc_callback_map.get(saved_zenoh_key) @@ -138,7 +138,7 @@ def handle_response(reply: Query.reply) -> None: logging.debug(msg) return UStatus(code=UCode.INTERNAL, message=msg) # Create UMessage - msg = UMessage(attributes=u_attribute, payload=sample.payload) + msg = UMessage(attributes=u_attribute, payload=bytes(sample.payload)) asyncio.run(resp_callback.on_receive(msg)) except Exception: msg = f"Error while parsing Zenoh reply: {reply.error}" @@ -148,21 +148,19 @@ def handle_response(reply: Query.reply) -> None: # Send query ttl = attributes.ttl / 1000 if attributes.ttl is not None else 1000 - value = Value(payload) # Send the query - get_builder = self.session.get( - zenoh_key, - zenoh.Queue(), - target=zenoh.QueryTarget.BEST_MATCHING(), + replies = self.session.get( + selector=zenoh_key, + target=zenoh.QueryTarget.BEST_MATCHING, attachment=attachment, - value=value, + payload=payload, timeout=ttl, ) def get_response(): try: - for reply in get_builder.receiver: - if reply.is_ok: + for reply in replies: + if reply.ok: handle_response(reply) break except Exception: @@ -192,11 +190,9 @@ def send_response(self, payload: bytes, attributes: UAttributes) -> UStatus: msg = "Query doesn't exist" logging.debug(msg) return UStatus(code=UCode.INTERNAL, message=msg) # Send back the query - value = Value(payload) - reply = Sample(query.key_expr, value, attachment=attachment) try: - query.reply(reply) + query.reply(query.key_expr, payload, attachment=attachment) msg = "Successfully sent rpc response to Zenoh" logging.debug(f"SUCCESS:{msg}") return UStatus(code=UCode.OK, message=msg) @@ -223,7 +219,7 @@ def callback(sample: Sample) -> None: msg = "Unable to decode attributes" logging.debug(msg) return UStatus(code=UCode.INTERNAL, message=msg) - message = UMessage(attributes=u_attribute, payload=sample.payload) + message = UMessage(attributes=u_attribute, payload=bytes(sample.payload)) asyncio.run(listener.on_receive(message)) # Create Zenoh subscriber @@ -260,7 +256,7 @@ def callback(query: Query) -> None: logging.debug(msg) return UStatus(code=UCode.INTERNAL, message=msg) - message = UMessage(attributes=u_attribute, payload=query.value.payload if query.value else None) + message = UMessage(attributes=u_attribute, payload=bytes(query.payload) if query.payload else None) self.query_map[u_attribute.id.SerializeToString()] = query asyncio.run(listener.on_receive(message)) @@ -286,7 +282,6 @@ async def send(self, message: UMessage) -> UStatus: source = attributes.source sink = attributes.sink zenoh_key = ZenohUtils.to_zenoh_key_string(self.authority_name, source, sink) - if not source: return UStatus(code=UCode.INVALID_ARGUMENT, message="attributes.source shouldn't be empty") payload = message.payload or b'' diff --git a/up_transport_zenoh/zenohutils.py b/up_transport_zenoh/zenohutils.py index e19e824..220fdf2 100644 --- a/up_transport_zenoh/zenohutils.py +++ b/up_transport_zenoh/zenohutils.py @@ -20,14 +20,12 @@ from uprotocol.uri.factory.uri_factory import UriFactory from uprotocol.v1.uattributes_pb2 import ( UAttributes, - UPayloadFormat, UPriority, ) from uprotocol.v1.ucode_pb2 import UCode from uprotocol.v1.uri_pb2 import UUri from uprotocol.v1.ustatus_pb2 import UStatus -from zenoh import Encoding, Priority -from zenoh.value import Attachment +from zenoh import Priority, ZBytes UATTRIBUTE_VERSION: int = 1 @@ -72,77 +70,71 @@ def get_uauth_from_uuri(uri: UUri) -> Union[str, UStatus]: @staticmethod def to_zenoh_key_string(authority_name: str, src_uri: UUri, dst_uri: UUri = None) -> str: src = ZenohUtils.uri_to_zenoh_key(authority_name, src_uri) - dst = ZenohUtils.uri_to_zenoh_key(authority_name, dst_uri) if dst_uri else "{}/{}/{}/{}" + dst = ZenohUtils.uri_to_zenoh_key(authority_name, dst_uri) if dst_uri and dst_uri != UUri() else "{}/{}/{}/{}" return f"up/{src}/{dst}" @staticmethod def map_zenoh_priority(upriority: UPriority) -> Priority: mapping = { - UPriority.UPRIORITY_CS0: Priority.BACKGROUND(), - UPriority.UPRIORITY_CS1: Priority.DATA_LOW(), - UPriority.UPRIORITY_CS2: Priority.DATA(), - UPriority.UPRIORITY_CS3: Priority.DATA_HIGH(), - UPriority.UPRIORITY_CS4: Priority.INTERACTIVE_LOW(), - UPriority.UPRIORITY_CS5: Priority.INTERACTIVE_HIGH(), - UPriority.UPRIORITY_CS6: Priority.REAL_TIME(), - UPriority.UPRIORITY_UNSPECIFIED: Priority.DATA_LOW(), + UPriority.UPRIORITY_CS0: Priority.BACKGROUND, + UPriority.UPRIORITY_CS1: Priority.DATA_LOW, + UPriority.UPRIORITY_CS2: Priority.DATA, + UPriority.UPRIORITY_CS3: Priority.DATA_HIGH, + UPriority.UPRIORITY_CS4: Priority.INTERACTIVE_LOW, + UPriority.UPRIORITY_CS5: Priority.INTERACTIVE_HIGH, + UPriority.UPRIORITY_CS6: Priority.REAL_TIME, + UPriority.UPRIORITY_UNSPECIFIED: Priority.DATA_LOW, } return mapping[upriority] - @staticmethod - def to_upayload_format(encoding: Encoding) -> UPayloadFormat: - try: - value = int(encoding.suffix) - return value if UPayloadFormat.Name(value) else None - except (ValueError, AttributeError): - return None - @staticmethod def uattributes_to_attachment(uattributes: UAttributes): - attachment = [("", UATTRIBUTE_VERSION.to_bytes(1, byteorder='little')), ("", uattributes.SerializeToString())] - return attachment + # Convert the version number to bytes (assuming 1 as in the Rust example) + version_bytes = UATTRIBUTE_VERSION.to_bytes(1, byteorder='little') + + # Serialize the UAttributes to bytes + uattributes_bytes = uattributes.SerializeToString() + + # Combine version bytes and uattributes bytes into one list of bytes + attachment_bytes = [version_bytes, uattributes_bytes] + + # Convert the combined bytes to ZBytes + return attachment_bytes @staticmethod - def attachment_to_uattributes(attachment: Attachment) -> UAttributes: + def attachment_to_uattributes(attachment: ZBytes) -> UAttributes: try: - version = None - version_found = False - uattributes = None - - items = attachment.items() - for pair in items: - if not version_found: - version = pair[1] - version_found = True - else: - # Process UAttributes data - uattributes = UAttributes() - uattributes.ParseFromString(pair[1]) - break - - if version is None: - msg = f"UAttributes version is empty (should be {UATTRIBUTE_VERSION})" - logging.debug(msg) - raise UStatusError.from_code_message(code=UCode.INVALID_ARGUMENT, message=msg) + # Convert ZBytes to a list of bytes + attachment_bytes = attachment.deserialize(list) - if not version_found: - msg = "UAttributes version is missing in the attachment" + # Ensure there is at least one byte for the version + if len(attachment_bytes) < 1: + msg = "Unable to get the UAttributes version" logging.debug(msg) raise UStatusError.from_code_message(code=UCode.INVALID_ARGUMENT, message=msg) - if version != UATTRIBUTE_VERSION.to_bytes(1, byteorder='little'): + # Check the version + version = int.from_bytes(bytes(attachment_bytes[0]), byteorder='big') + if version != UATTRIBUTE_VERSION: msg = f"UAttributes version is {version} (should be {UATTRIBUTE_VERSION})" logging.debug(msg) raise UStatusError.from_code_message(code=UCode.INVALID_ARGUMENT, message=msg) - if uattributes is None: + # Get the attributes from the remaining bytes + uattributes_data = bytes(attachment_bytes[1]) + if not uattributes_data: msg = "Unable to get the UAttributes" logging.debug(msg) raise UStatusError.from_code_message(code=UCode.INVALID_ARGUMENT, message=msg) + # Parse the UAttributes from the bytes + uattributes = UAttributes() + uattributes.ParseFromString(uattributes_data) + return uattributes + except Exception as e: - msg = f"Failed to convert Attachment to UAttributes: {e}" + msg = f"Failed to convert Attachment to UAttributes: {str(e)}" logging.debug(msg) raise UStatusError.from_code_message(code=UCode.INVALID_ARGUMENT, message=msg)