From 6b921be414d021d84dcfd68ffb0a3ccec96bd0ed Mon Sep 17 00:00:00 2001 From: Matthew D'Alonzo Date: Tue, 16 Jul 2024 11:25:06 -0400 Subject: [PATCH] Update examples to work with Java MQTT --- examples/mqtt5_pub.py | 5 +-- examples/mqtt5_sub.py | 6 ++- up_client_mqtt5_python/mqtt5_utransport.py | 45 ++++++++++++++-------- up_client_mqtt5_python/utils/utils.py | 30 ++++++++++----- 4 files changed, 55 insertions(+), 31 deletions(-) diff --git a/examples/mqtt5_pub.py b/examples/mqtt5_pub.py index 3474d12..d9a6afb 100644 --- a/examples/mqtt5_pub.py +++ b/examples/mqtt5_pub.py @@ -39,9 +39,8 @@ def build_source(): return UUri(authority_name="vcu.matthew.com", ue_id=1234, ue_version_major=1, resource_id=0x8000) - def build_sink(): - return UUri(authority_name="vcu.matthew.com", ue_id=1234, ue_version_major=1, resource_id=0) + return UUri(authority_name="vcu.matthew.com", ue_id=0xFFFF, ue_version_major=0xFF, resource_id=0xFFFF) def build_timestamp_upayload(): @@ -53,7 +52,7 @@ def build_umessage(payload, source=build_source()): if __name__ == "__main__": - mqtt5_publisher = MQTT5UTransport(build_sink(), "client_pub", "127.0.0.1", 1883, False) + mqtt5_publisher = MQTT5UTransport(build_sink(), "client_pub", "127.0.0.1", 8883, False) mqtt5_publisher.connect() umsg: UMessage = build_umessage(build_timestamp_upayload()) while True: diff --git a/examples/mqtt5_sub.py b/examples/mqtt5_sub.py index 07599dd..e4408fb 100644 --- a/examples/mqtt5_sub.py +++ b/examples/mqtt5_sub.py @@ -57,12 +57,14 @@ def on_receive(self, umsg: UMessage) -> None: def build_source(): return UUri(authority_name="vcu.matthew.com", ue_id=1234, ue_version_major=1, resource_id=0x8000) +def build_sink(): + return UUri(authority_name="vcu.matthew.com", ue_id=0xFFFF, ue_version_major=0xFF, resource_id=0xFFFF) if __name__ == "__main__": - mqtt5_subscriber = MQTT5UTransport(build_source(), "client_sub", "127.0.0.1", 1883, False) + mqtt5_subscriber = MQTT5UTransport(build_source(), "client_sub", "127.0.0.1", 8883, False) mqtt5_subscriber.connect() source: UUri = build_source() listener: MQTT5UListener = MQTT5UListener() - mqtt5_subscriber.register_listener(source, listener) + mqtt5_subscriber.register_listener(source, listener, build_sink()) while True: time.sleep(10) diff --git a/up_client_mqtt5_python/mqtt5_utransport.py b/up_client_mqtt5_python/mqtt5_utransport.py index daa96a8..aadebe2 100644 --- a/up_client_mqtt5_python/mqtt5_utransport.py +++ b/up_client_mqtt5_python/mqtt5_utransport.py @@ -40,6 +40,7 @@ build_attributes_from_mqtt_properties, build_message_from_mqtt_message_and_attributes, build_mqtt_properties_from_attributes, + uuri_field_resolver, ) logging.basicConfig(format="%(levelname)s| %(filename)s:%(lineno)s %(message)s") @@ -70,7 +71,7 @@ def __init__(self, source: UUri, client_id: str, host_name: str, port: int, clou self._connected_signal = threading.Event() - self.topic_to_listener: Dict[bytes, List[UListener]] = {} + self.topic_to_listener: Dict[str, List[UListener]] = {} self.reqid_to_future: Dict[bytes, Future] = {} self._mqtt_client = mqtt.Client( @@ -133,11 +134,13 @@ def _listen(self, client, userdata, msg): @param msg: @return: None """ + logger.info(f"Received Message on MQTT: {msg}") attributes: UAttributes = build_attributes_from_mqtt_properties(msg.properties) umsg: UMessage = build_message_from_mqtt_message_and_attributes(msg, attributes) message_type_handlers = { + UMessageType.UMESSAGE_TYPE_UNSPECIFIED: self._handle_unspecified_message, UMessageType.UMESSAGE_TYPE_PUBLISH: self._handle_publish_message, UMessageType.UMESSAGE_TYPE_REQUEST: self._handle_publish_message, UMessageType.UMESSAGE_TYPE_RESPONSE: self._handle_response_message, @@ -147,9 +150,14 @@ def _listen(self, client, userdata, msg): if handler: handler(msg.topic, umsg) else: - raise ValueError("Unsupported message type: " + attributes.type) + raise ValueError("Unsupported message type: " + UMessageType.Name(attributes.type)) - def _handle_response_message(self, umsg: UMessage): + def _handle_unspecified_message(self, topic: str, umsg: UMessage): + logger.info("%s Unspecified Message Received", self.__class__.__name__) + logger.info(f"Message Details: {umsg}") + logger.info(f"Unspecified Message received on topic {topic}") + + def _handle_response_message(self, topic: str, umsg: UMessage): request_id: UUID = umsg.attributes.reqid request_id_b: bytes = request_id.SerializeToString() @@ -161,7 +169,7 @@ def _handle_response_message(self, umsg: UMessage): def _handle_publish_message(self, topic: str, umsg: UMessage): if topic in self.topic_to_listener: - logger.info("%s Handle Topic", self.__class__.__name__) + logger.info("%s Handle Publish Message on Topic", self.__class__.__name__) for listener in self.topic_to_listener[topic]: listener.on_receive(umsg) @@ -179,35 +187,36 @@ def mqtt_topic_builder(self, source: UUri, sink: UUri = None) -> str: """ device = "c" if self.cloud_device else "d" - src_auth_name = source.authority_name if source != UUri() else "+" - src_ue_id = source.ue_id if source != UUri() and source.ue_id != 0xFFFF else "+" - src_ue_version_major = source.ue_version_major if source != UUri() and source.ue_version_major != 0xFF else "+" - src_resource_id = source.resource_id if source != UUri() and source.resource_id != 0xFFFF else "+" + if source != UUri(): + src_auth_name = source.authority_name if source != UUri() else "+" + src_ue_id = uuri_field_resolver(source.ue_id, 0xFFFF, "ffff") + src_ue_version_major = uuri_field_resolver(source.ue_version_major, 0xFF, "ff") + src_resource_id = uuri_field_resolver(source.resource_id, 0xFFFF, "ffff") topic = ( device + "/" + src_auth_name + "/" - + str(src_ue_id) + + src_ue_id + "/" - + str(src_ue_version_major) + + src_ue_version_major + "/" - + str(src_resource_id) + + src_resource_id ) if sink is not None and sink != UUri(): sink_auth_name = sink.authority_name - sink_ue_id = sink.ue_id if sink.ue_id != 0xFFFF else "+" - sink_ue_version_major = sink.ue_version_major if sink.ue_version_major != 0xFF else "+" - sink_resource_id = sink.resource_id if sink.resource_id != 0xFFFF else "+" + sink_ue_id = uuri_field_resolver(sink.ue_id, 0xFFFF, "ffff") + sink_ue_version_major = uuri_field_resolver(sink.ue_version_major, 0xFF, "ff") + sink_resource_id = uuri_field_resolver(sink.resource_id, 0xFFFF, "ffff") topic += ( "/" + sink_auth_name + "/" - + str(sink_ue_id) + + sink_ue_id + "/" - + str(sink_ue_version_major) + + sink_ue_version_major + "/" - + str(sink_resource_id) + + sink_resource_id ) return topic @@ -248,6 +257,8 @@ def register_listener(self, source_filter: UUri, listener: UListener, sink_filte """ mqtt_topic = self.mqtt_topic_builder(source=source_filter, sink=sink_filter) + logger.info("%s Registering Listener for Topic: %s", self.__class__.__name__, mqtt_topic) + self.topic_to_listener.setdefault(mqtt_topic, []).append(listener) self._mqtt_client.subscribe(topic=mqtt_topic, qos=1) diff --git a/up_client_mqtt5_python/utils/utils.py b/up_client_mqtt5_python/utils/utils.py index 113f5ac..e50f973 100644 --- a/up_client_mqtt5_python/utils/utils.py +++ b/up_client_mqtt5_python/utils/utils.py @@ -51,7 +51,6 @@ def build_message_from_mqtt_message_and_attributes(msg: mqtt.MQTTMessage, attrib elif attributes.type == UMessageType.UMESSAGE_TYPE_NOTIFICATION: return UMessageBuilder.notification(attributes.source, attributes.sink).build_from_upayload(payload_data) - def build_attributes_from_mqtt_properties(publish_properties) -> UAttributes: """ Build UAttributes from MQTT properties @@ -63,19 +62,19 @@ def build_attributes_from_mqtt_properties(publish_properties) -> UAttributes: if user_property[0] == "1": attributes.id.CopyFrom(UuidSerializer.deserialize(user_property[1])) elif user_property[0] == "2": - attributes.type = UMessageType.Value(user_property[1]) + attributes.type = int(user_property[1]) elif user_property[0] == "3": attributes.source.CopyFrom(UriSerializer.deserialize(user_property[1])) elif user_property[0] == "4": attributes.sink.CopyFrom(UriSerializer.deserialize(user_property[1])) elif user_property[0] == "5": - attributes.priority = UPriority.Value(user_property[1]) + attributes.priority = int(user_property[1]) elif user_property[0] == "6": attributes.ttl = int(user_property[1]) elif user_property[0] == "7": attributes.permission_level = int(user_property[1]) elif user_property[0] == "8": - attributes.commstatus = UCode.Value(user_property[1]) + attributes.commstatus = int(user_property[1]) elif user_property[0] == "9": attributes.reqid.CopyFrom(UuidSerializer.deserialize(user_property[1])) elif user_property[0] == "10": @@ -83,7 +82,7 @@ def build_attributes_from_mqtt_properties(publish_properties) -> UAttributes: elif user_property[0] == "11": attributes.traceparent = user_property[1] elif user_property[0] == "12": - attributes.payload_format = user_property[1] + attributes.payload_format = int(user_property[1]) return attributes @@ -98,7 +97,7 @@ def build_mqtt_properties_from_attributes(attributes: UAttributes): try: if attributes.HasField("id"): publish_properties.UserProperty.append(("1", UuidSerializer.serialize(attributes.id))) - publish_properties.UserProperty.append(("2", UMessageType.Name(attributes.type))) + publish_properties.UserProperty.append(("2", str(attributes.type))) if attributes.HasField("source"): publish_properties.UserProperty.append(("3", UriSerializer.serialize(attributes.source))) if attributes.HasField("sink"): @@ -107,14 +106,14 @@ def build_mqtt_properties_from_attributes(attributes: UAttributes): "4", UriSerializer.serialize(attributes.sink), ) - ) - publish_properties.UserProperty.append(("5", UPriority.Name(attributes.priority))) + ) + publish_properties.UserProperty.append(("5", str(attributes.priority))) if attributes.HasField("ttl"): publish_properties.UserProperty.append(("6", str(attributes.ttl))) if attributes.HasField("permission_level"): publish_properties.UserProperty.append(("7", str(attributes.permission_level))) if attributes.HasField("commstatus"): - publish_properties.UserProperty.append(("8", UCode.Name(attributes.commstatus))) + publish_properties.UserProperty.append(("8", str(attributes.commstatus))) if attributes.type == UMessageType.UMESSAGE_TYPE_RESPONSE: publish_properties.UserProperty.append( ( @@ -130,3 +129,16 @@ def build_mqtt_properties_from_attributes(attributes: UAttributes): raise ValueError(e) from e return publish_properties + +def length_resolver(field): + return "0" + field if len(field) % 2 == 1 else field + +def uuri_field_resolver(field, wildcard_value, wild_return="+"): + """ + Returns self if value isn't wild or empty, else returns wildcard_value + :param field: field to resolve + :wildcard_value: wildcard value of the field + :return: resolved field + """ + hex_val = length_resolver(f'{field:x}') + return hex_val if field != wildcard_value else wild_return \ No newline at end of file