diff --git a/.github/ruff.toml b/.github/ruff.toml new file mode 100644 index 0000000..eee3cf0 --- /dev/null +++ b/.github/ruff.toml @@ -0,0 +1,37 @@ +# ruff.toml + +line-length = 120 + +# Exclude specific files or directories. +exclude = [ + "build/", + "dist/", + ".venv/", + "__pycache__/", + +] + +[lint] +# Enable all basic PEP 8 checks and other common linting rules. +select = [ + "E", # PEP 8 rules (formatting issues) + "F", # Pyflakes rules (undefined names, etc.) + "W", # Additional PEP 8 rules (warning-level issues) + "I", # Import order checker (ensuring imports are ordered correctly) + "ERA", # flake8-eradicate (identifying commented-out code) + "N" # Naming convention rules +] + +ignore = [ + "F811", # Ignore the error for multimethod function + "N999" # Invalid module name: 'up-python' +] + +[lint.flake8-annotations] +allow-star-arg-any = true + +[format] +quote-style = "preserve" +indent-style = "space" +docstring-code-format = true +docstring-code-line-length = 100 diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 0000000..cdd647b --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,43 @@ +# This workflow uses actions that are not certified by GitHub. +# They are provided by a third-party and are governed by +# separate terms of service, privacy policy, and support +# documentation. + +name: Run Linter + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +permissions: + contents: read + +jobs: + lint: + + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v3 + + + - name: Set up Python + uses: actions/setup-python@v3 + with: + python-version: '3.x' + + - name: Install Dependencies + run: | + python -m pip install --upgrade pip + python -m pip install ruff + + - name: Lint Python Code with Ruff + run: | + ruff check --config .github/ruff.toml + + - name: Check Python Code Formatting with Ruff + run: | + ruff format --check --config .github/ruff.toml diff --git a/.github/workflows/python-publish.yml b/.github/workflows/python-publish.yml new file mode 100644 index 0000000..85cb004 --- /dev/null +++ b/.github/workflows/python-publish.yml @@ -0,0 +1,49 @@ +# This workflow will upload a Python Package using Twine when a release is created +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python#publishing-to-package-registries + +# This workflow uses actions that are not certified by GitHub. +# They are provided by a third-party and are governed by +# separate terms of service, privacy policy, and support +# documentation. + +name: Upload Python Package + +on: + release: + types: [published] + workflow_dispatch: + +permissions: + contents: read + +jobs: + deploy: + + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v3 + with: + python-version: '3.x' + + - name: Install Poetry + run: | + python -m pip install --upgrade pip + python -m pip install poetry + + - name: Install dependencies + run: | + poetry install + + - name: Build package + run: poetry build + + - name: Publish package + uses: pypa/gh-action-pypi-publish@27b31702a0e7fc50959f5ad993c78deac1bdfc29 + with: + user: __token__ + password: ${{ secrets.PYPI_TOKEN }} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..79863dc --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.vscode/** +**/.coverage +**/__pycache__ diff --git a/CODE_OF_CONDUCT.adoc b/CODE_OF_CONDUCT.adoc new file mode 100644 index 0000000..b348bd3 --- /dev/null +++ b/CODE_OF_CONDUCT.adoc @@ -0,0 +1,46 @@ +# Code of Conduct + +The Eclipse Foundation has a code of conduct that all projects must adhere to. the standard code of conduct that all Eclipse projects must adhere are defined in the following documents: + +- https://raw.githubusercontent.com/eclipse/.github/master/CODE_OF_CONDUCT.md[Eclipse Code of Conduct] +- https://www.eclipse.org/projects/handbook/[Eclipse Foundation Handbook] +- https://www.eclipse.org/legal/committerguidelines.php[Eclipse Committer Due Diligence Guidelines] + +We (the committers) aim to run this project in an open and fair manner where contributions are encouraged from all companies and individuals. We aim for high quality, well documented and tested software that can be used in production environments. + +This document shall outline additional roles and responsibilities for committers and project leads to ensure the project is well maintained and supported to be useable in production environments. + +## Committers +https://www.eclipse.org/projects/handbook/#roles-cm[Eclipse-uProtocol Committers] play a vital role to ensure contributions from others (and themselves) follow the vision and mission of the project as well. In this section we will outline how committers are nominated, retired, and their duties while in service. + +### Duties + +* Contribute to specifications by providing feedback, code contributions in up-spec, up-core-api repos +* Ensure all contributors (including themselves) adhere to this code of conduct, the Eclipse Foundation Handbook, and the vision & mission of the project +* Make _significant_ code contributions to one or more repositories in the Eclipse-uProtocol project +* Review and provide feedback to pull requests from other contributors +* _Actively_ participate in weekly/bi-weekly project meetings + +### Nomination +Contributors are nominated by a uProtocol Committer when they meet the https://www.eclipse.org/projects/handbook/#elections-committer[Eclipse Committer Nomination Process] requirements and are actively performing the duties of a committer mentioned above. + +### Retirement +Per https://www.eclipse.org/projects/handbook/#elections-retire-cm[Eclipse Foundation Handbook], Committers may retire for one of the following reasons: + +1. Their own volition +2. By the project lead (with supporting justification) + +Non-exhaustive examples for early retirement might be: + - Inactivity over extended period of time + - Repeated violations of this code of conduct (ex. obstructing progress during discussions/PRs without valid justification, intentional damage of various repos/projects, etc...) + +All communication regarding committer nominations and retirement, *SHALL* be sent to the uprotocol-dev@eclipse.org mailing list. + + +## Project Lead +In addition to the duties mentioned in https://www.eclipse.org/projects/handbook/#roles-pl[Eclipse Contributor Handbook], project leads *MUST* also fulfill the Committer <> defined above. + + +''' + +NOTE: Violation to this code of conduct should be reported to the https://gitlab.eclipse.org/eclipsefdn/emo-team/emo/-/issues[Eclipse Foundation Management Office (EMO)] \ No newline at end of file diff --git a/CONTRIBUTING.adoc b/CONTRIBUTING.adoc new file mode 100644 index 0000000..d5bb791 --- /dev/null +++ b/CONTRIBUTING.adoc @@ -0,0 +1,36 @@ += Contributing to Eclipse uProtocol + +Thanks for your interest in this project. Contributions are welcome! + +== Developer resources + +Information regarding source code management, builds, coding standards, and +more. + +https://projects.eclipse.org/proposals/eclipse-uprotocol + +The project maintains the following source code repositories + +* https://github.com/eclipse-uprotocol + +== Eclipse Contributor Agreement + +Before your contribution can be accepted by the project team contributors must +electronically sign the Eclipse Contributor Agreement (ECA). + +* http://www.eclipse.org/legal/ECA.php + +Commits that are provided by non-committers must have a Signed-off-by field in +the footer indicating that the author is aware of the terms by which the +contribution has been provided to the project. The non-committer must +additionally have an Eclipse Foundation account and must have a signed Eclipse +Contributor Agreement (ECA) on file. + +For more information, please see the Eclipse Committer Handbook: +https://www.eclipse.org/projects/handbook/#resources-commit + +== Contact + +Contact the project developers via the project's "dev" list. + +* https://accounts.eclipse.org/mailing-list/uprotocol-dev \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..f49a4e1 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/README.adoc b/README.adoc new file mode 100644 index 0000000..44181b9 --- /dev/null +++ b/README.adoc @@ -0,0 +1,94 @@ += uProtocol Client MQTTv5 Python Library +:toc: +:toclevels: 4 +:sectnums: +:source-highlighter: coderay + +== Overview + +The following is the uProtocol client library that implements uTransport, defined in https://github.com/eclipse-uprotocol/up-python[uProtocol Python Library] using MQTTv5. + +--- + +== Getting Started + +--- + +=== Installing the library + +Run the following in your command line to install up_client_mqtt5_python into your site-packages: + +---- +cd up-client-mqtt5-python +pip install . +---- + +--- + +=== Import Guide + + +==== Importing MQTT5UTransport +---- +from up_client_mqtt5_python.mqtt5_utransport import MQTT5UTransport +---- + + +==== Creating an instance of MQTT5UTransport: +---- +mqtt5_publisher = MQTT5UTransport("client_id", "host_name", port) +---- +* client_id (String): Device Id of your created MQTT Client +* host_name (String): IP Address of the MQTT Broker +* port (Integer): Port Number of your MQTT Broker + + +==== Setting SSL Parameters for MQTT5UTransport (optional) +---- +mqtt5_publisher.create_tls_context("certificate_filename", "key_filename", "key_pass_phrase", ssl_method, verify_mode, check_hostname) +---- +* certificate_filename (String): Path to certificate file +* key_filename (String): Path to file containing private key +* key_pass_phrase (String): Password +* ssl_method (_SSLMethod): Selection of encryption protocol. Default version is `ssl.PROTOCOL_TLSv1_2`. +* verify_mode (VerifyMode): Selection of certification requirements for Client. Default is `ssl.CERT_NONE`. +* check_hostname (Boolean): Whether to match the peer cert's hostname. Default is `False`. + + +==== Connecting to the MQTT Broker +---- +mqtt5_publisher.connect() +---- + +--- + +=== Using UTransport APIs + +==== register_listener() +---- +mqtt5_subscriber.register_listener(topic, listener) +---- +* topic (UUri): Topic that the client will register to. This UUri is mapped to an MQTT topic according to the https://github.com/eclipse-uprotocol/up-spec/blob/main/up-l1/mqtt.adoc[up-spec for MQTT]. +* listener (UListener): The listener that will be triggered when a message is received on the registered topic. + +==== unregister_listener() +---- +mqtt5_subscriber.unregister_listener(topic, listener) +---- +* topic (UUri): Topic that the client will unregister from. This UUri is mapped to an MQTT topic according to https://github.com/eclipse-uprotocol/up-spec/blob/main/up-l1/mqtt.adoc[up-spec for MQTT]. +* listener (UListener): The listener that will be unregistered from the topic. + +==== send() +---- +mqtt5_publisher.send(umsg) +---- +* umsg (UMessage): Message that the publisher will send to the MQTT Broker. The topic will be constructed according to the UAttributes in the message. + +=== Running MQTT Pub/Sub Examples + +To run the provided MQTT Pub/Sub Examples, you will need to run the "eclipse-mosquitto" MQTT Broker inside docker. First, you will need to have docker and docker-compose on your system. Once you have this, run the following commands: +---- +cd examples/docker_mosquitto +docker-compose up -d --build +---- +Once done, you can run `mqtt5_pub.py` and `mqtt5_sub.py` in the `examples` folder to test publishing and subscribing using the MQTTv5 UTransport. \ No newline at end of file diff --git a/README.md b/README.md deleted file mode 100644 index 53c5c05..0000000 --- a/README.md +++ /dev/null @@ -1,2 +0,0 @@ -# up-client-mqtt5-python -Python uPClient for MQTT5 to be used by up-simulator and others diff --git a/SECURITY.adoc b/SECURITY.adoc new file mode 100644 index 0000000..3400d8e --- /dev/null +++ b/SECURITY.adoc @@ -0,0 +1,14 @@ +# Security Policy + +This project implements the Eclipse Foundation Security Policy + +* https://www.eclipse.org/security + +## Supported Versions + +TBD + +## Reporting a Vulnerability + +Please report vulnerabilities to the Eclipse Foundation Security Team at +security@eclipse.org \ No newline at end of file diff --git a/clean_project.py b/clean_project.py new file mode 100644 index 0000000..8a2ae69 --- /dev/null +++ b/clean_project.py @@ -0,0 +1,22 @@ +import os +import shutil + + +def clean_project(): + # Remove build/ directory + if os.path.exists('build'): + shutil.rmtree('build') + + # Remove dist/ directory + if os.path.exists('dist'): + shutil.rmtree('dist') + + # Remove *.egg-info/ directories + egg_info_directories = [d for d in os.listdir() if d.endswith('.egg-info')] + for egg_info_directory in egg_info_directories: + shutil.rmtree(egg_info_directory) + + +if __name__ == "__main__": + clean_project() + print("Cleanup complete.") diff --git a/examples/docker_mosquitto/docker-compose.yml b/examples/docker_mosquitto/docker-compose.yml new file mode 100644 index 0000000..e2344cf --- /dev/null +++ b/examples/docker_mosquitto/docker-compose.yml @@ -0,0 +1,43 @@ +# ------------------------------------------------------------------------- +# +# SPDX-FileCopyrightText: Copyright (c) 2023 Contributors to the +# Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# SPDX-FileType: SOURCE +# SPDX-License-Identifier: Apache-2.0 +# +# ------------------------------------------------------------------------- + +version: "3.7" +services: + mqtt5: + image: eclipse-mosquitto + container_name: mqtt5 + ports: + - "1883:1883" + - "9001:9001" + volumes: + - type: bind + source: ./mosquitto/config + target: /mosquitto/config + read_only: true + - type: bind + source: ./mosquitto/data + target: /mosquitto/data + - type: bind + source: ./mosquitto/log + target: /mosquitto/log diff --git a/examples/docker_mosquitto/mosquitto/config/mosquitto.conf b/examples/docker_mosquitto/mosquitto/config/mosquitto.conf new file mode 100644 index 0000000..ca7564e --- /dev/null +++ b/examples/docker_mosquitto/mosquitto/config/mosquitto.conf @@ -0,0 +1,36 @@ +# ------------------------------------------------------------------------- +# +# Copyright (c) 2024 General Motors GTO LLC +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# SPDX-FileType: SOURCE +# SPDX-FileCopyrightText: 2024 General Motors GTO LLC +# SPDX-License-Identifier: Apache-2.0 +# +# ------------------------------------------------------------------------- + +allow_anonymous true +listener 1883 +listener 9001 +protocol websockets +log_type all +log_dest stdout +log_timestamp true +log_timestamp_format %Y-%m-%dT%H:%M:%S + +max_queued_messages 0 diff --git a/examples/mqtt5_pub.py b/examples/mqtt5_pub.py new file mode 100644 index 0000000..1a1a958 --- /dev/null +++ b/examples/mqtt5_pub.py @@ -0,0 +1,61 @@ +""" +SPDX-FileCopyrightText: Copyright (c) 2023 Contributors to the +Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +SPDX-FileType: SOURCE +SPDX-License-Identifier: Apache-2.0 +""" + +import logging +import time + +from uprotocol.communication.upayload import UPayload +from uprotocol.transport.builder.umessagebuilder import UMessageBuilder +from uprotocol.v1.umessage_pb2 import UMessage +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.uattributes_pb2 import UPayloadFormat + +from up_client_mqtt5_python.mqtt5_utransport import MQTT5UTransport + +logging.basicConfig(format='%(levelname)s| %(filename)s:%(lineno)s %(message)s') +logger = logging.getLogger('File:Line# Debugger') +logger.setLevel(logging.DEBUG) + + +def build_source(): + return UUri(authority_name="vcu.matthew.com", ue_id=0x4D2, ue_version_major=1, resource_id=0x8000) + + +def build_sink(): + return UUri(authority_name="vcu.matthew.com", ue_id=0x1111, ue_version_major=0x22, resource_id=0x3333) + + +def build_timestamp_upayload(): + return UPayload.pack_from_data_and_format(b"hi all", UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF) + + +def build_umessage(payload, source=build_source(), sink=build_sink()): + return UMessageBuilder.notification(source=source, sink=sink).build_from_upayload(payload) + + +if __name__ == "__main__": + mqtt5_publisher = MQTT5UTransport(build_sink(), "client_pub", "127.0.0.1", 8883, False) + mqtt5_publisher.connect() + umsg: UMessage = build_umessage(build_timestamp_upayload()) + while True: + mqtt5_publisher.send(umsg) + time.sleep(1) diff --git a/examples/mqtt5_sub.py b/examples/mqtt5_sub.py new file mode 100644 index 0000000..1b22aea --- /dev/null +++ b/examples/mqtt5_sub.py @@ -0,0 +1,72 @@ +""" +SPDX-FileCopyrightText: Copyright (c) 2023 Contributors to the +Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +SPDX-FileType: SOURCE +SPDX-License-Identifier: Apache-2.0 +""" + +import logging +import time + +from uprotocol.transport.ulistener import UListener +from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.umessage_pb2 import UMessage +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus + +from up_client_mqtt5_python.mqtt5_utransport import MQTT5UTransport + +logging.basicConfig(format='%(levelname)s| %(filename)s:%(lineno)s %(message)s') +logger = logging.getLogger('File:Line# Debugger') +logger.setLevel(logging.DEBUG) + + +class MQTT5UListener(UListener): + def __init__(self) -> None: + pass + + def on_receive(self, umsg: UMessage) -> None: + """ + Method called to handle/process events.

+ Sends UMessage data directly to Test Manager + @param topic: Topic the underlying source of the message. + @param payload: Payload of the message. + @param attributes: Transportation attributes. + @return Returns an Ack every time a message is received and processed. + """ + print(umsg) + + return UStatus(code=UCode.OK, message="all good") + + +def build_source(): + return UUri(authority_name="vcu.matthew.com", ue_id=1234, ue_version_major=1, resource_id=0x8000) + + +def build_sink(): + return UUri(authority_name="vcu.matthew.com", ue_id=0xFFFF, ue_version_major=0xFF, resource_id=0xFFFF) + + +if __name__ == "__main__": + mqtt5_subscriber = MQTT5UTransport(build_source(), "client_sub", "127.0.0.1", 8883, False) + mqtt5_subscriber.connect() + source: UUri = build_source() + listener: MQTT5UListener = MQTT5UListener() + mqtt5_subscriber.register_listener(source, listener, build_sink()) + while True: + time.sleep(10) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..7d23cac --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,21 @@ +[tool.poetry] +name = "up-client-mqtt5-python" +version = "0.0.0-dev" +description = "MQTT5-specific uProtocol library for building and using UUri, UUID, UAttributes, UTransport, and more." +authors = ["Matthew D'Alonzo "] +license = "The Apache License, Version 2.0" +readme = "README.adoc" +repository = "https://github.com/eclipse-uprotocol/up-python" +packages = [{ include = "up_client_mqtt5_python" }, + { include = "tests" } + +] + +[tool.poetry.dependencies] +python = "^3.8" +up-python = "0.2.0-dev0" +paho-mqtt = "2.0.0" + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" \ No newline at end of file diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_mqtt5_utransport_send.py b/tests/test_mqtt5_utransport_send.py new file mode 100644 index 0000000..bfe3e83 --- /dev/null +++ b/tests/test_mqtt5_utransport_send.py @@ -0,0 +1,133 @@ +""" +SPDX-FileCopyrightText: Copyright (c) 2023 Contributors to the +Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +SPDX-FileType: SOURCE +SPDX-License-Identifier: Apache-2.0 +""" + +import json +import logging + +from google.protobuf.timestamp_pb2 import Timestamp +from uprotocol.communication.upayload import UPayload +from uprotocol.transport.builder.umessagebuilder import UMessageBuilder +from uprotocol.transport.ulistener import UListener +from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.umessage_pb2 import UMessage +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus + +from tests.testsupport.broker import fake_broker # noqa: F401, F811 +from up_client_mqtt5_python.mqtt5_utransport import MQTT5UTransport + +logging.basicConfig(format="%(levelname)s| %(filename)s:%(lineno)s %(message)s") +logger = logging.getLogger("File:Line# Debugger") +logger.setLevel(logging.DEBUG) + + +def build_source(): + return UUri(authority_name="vcu.matthew.com", ue_id=1234, ue_version_major=1, resource_id=0x8000) + + +def build_sink(): + return UUri(authority_name="vcu.matthew.com", ue_id=1234, ue_version_major=1, resource_id=0) + + +def build_format_protobuf_upayload(): + return UPayload.pack(Timestamp(seconds=1000, nanos=1000)) + + +def build_format_protobuf_any_upayload(): + return UPayload.pack_to_any(Timestamp(seconds=1000, nanos=1000)) + + +def build_format_protobuf_json_upayload(): + json_data = {"key1": "value1", "key2": "value2"} + return UPayload(value=json.dumps(json_data).encode("utf-8"), format=3) + + +def build_umessage(payload, source=build_source()): + return UMessageBuilder.publish(source=source).build_from_upayload(payload) + + +class MQTT5UListener(UListener): + def __init__(self) -> None: + pass + + def on_receive(self, umsg: UMessage) -> None: + """ + Method called to handle/process events.

+ Sends UMessage data directly to Test Manager + @param topic: Topic the underlying source of the message. + @param payload: Payload of the message. + @param attributes: Transportation attributes. + @return Returns an Ack every time a message is received and processed. + """ + print(umsg) + + return UStatus(code=UCode.OK, message="all good") + + +class TestMQTT5UTransportSend: + def test_utransport_send_valid_format_protobuf(self, fake_broker): # noqa: F811 + transport = MQTT5UTransport( + client_id="test_client", + host_name="localhost", + port=fake_broker.port, + cloud_device=True, + ) + transport.connect() + umsg: UMessage = build_umessage(build_format_protobuf_upayload()) + status = transport.send(umsg) + assert status.code == UCode.OK + + def test_utransport_send_valid_format_protobuf_any(self, fake_broker): # noqa: F811 + transport = MQTT5UTransport( + client_id="test_client", + host_name="localhost", + port=fake_broker.port, + cloud_device=True, + ) + transport.connect() + umsg: UMessage = build_umessage(build_format_protobuf_any_upayload()) + status = transport.send(umsg) + assert status.code == UCode.OK + + def test_utransport_send_valid_format_json(self, fake_broker): # noqa: F811 + transport = MQTT5UTransport( + client_id="test_client", + host_name="localhost", + port=fake_broker.port, + cloud_device=True, + ) + transport.connect() + umsg: UMessage = build_umessage(build_format_protobuf_json_upayload()) + status = transport.send(umsg) + assert status.code == UCode.OK + + def test_utransport_register_listener_valid(self, fake_broker): # noqa: F811 + transport = MQTT5UTransport( + client_id="test_client", + host_name="localhost", + port=fake_broker.port, + cloud_device=True, + ) + transport.connect() + topic: UUri = build_source() + status = transport.register_listener(topic, MQTT5UListener()) + assert status.code == UCode.OK diff --git a/tests/testsupport/broker.py b/tests/testsupport/broker.py new file mode 100644 index 0000000..ad41ae3 --- /dev/null +++ b/tests/testsupport/broker.py @@ -0,0 +1,127 @@ +# Copyright (C) 2021 +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0. +# +# SPDX-License-Identifier: EPL-2.0 +# +# This Source Code may also be made available under the following Secondary +# Licenses when the conditions for such availability set forth in the Eclipse +# Public License, v. 2.0 are satisfied: GNU General Public License as published +# by the Free Software Foundation, either version 2 of the License, or (at your +# option) any later version, with the GNU Classpath Exception which is available +# at https://www.gnu.org/software/classpath/license.html. + +import contextlib +import socket +import socketserver +import threading + +import pytest + +from tests.testsupport import paho_test + + +class FakeBroker: + def __init__(self): + # Bind to "localhost" for maximum performance, as described in: + # http://docs.python.org/howto/sockets.html#ipc + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.settimeout(5) + sock.bind(("localhost", 0)) + self.port = sock.getsockname()[1] + sock.listen(1) + + self._sock = sock + self._conn = None + + def start(self): + if self._sock is None: + raise ValueError("Socket is not open") + + (conn, address) = self._sock.accept() + conn.settimeout(5) + self._conn = conn + + def finish(self): + if self._conn is not None: + self._conn.close() + self._conn = None + + if self._sock is not None: + self._sock.close() + self._sock = None + + def receive_packet(self, num_bytes): + if self._conn is None: + raise ValueError("Connection is not open") + + packet_in = self._conn.recv(num_bytes) + return packet_in + + def send_packet(self, packet_out): + if self._conn is None: + raise ValueError("Connection is not open") + + count = self._conn.send(packet_out) + return count + + def expect_packet(self, name, packet): + if self._conn is None: + raise ValueError("Connection is not open") + + paho_test.expect_packet(self._conn, name, packet) + + +@pytest.fixture +def fake_broker(): + broker = FakeBroker() + + yield broker + + broker.finish() + + +class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + pass + + +class FakeWebsocketBroker(threading.Thread): + def __init__(self): + super().__init__() + + self.host = "localhost" + self.port = -1 # Will be set by `serve()` + + self._server = None + self._running = True + self.handler_cls = False + + @contextlib.contextmanager + def serve(self, tcphandler): + self._server = ThreadedTCPServer((self.host, 0), tcphandler) + + try: + self.start() + self.port = self._server.server_address[1] + + if not self._running: + raise RuntimeError("Error starting server") + yield + finally: + if self._server: + self._server.shutdown() + self._server.server_close() + + def run(self): + self._running = True + self._server.serve_forever() + + +@pytest.fixture +def fake_websocket_broker(): + broker = FakeWebsocketBroker() + + yield broker diff --git a/tests/testsupport/consts.py b/tests/testsupport/consts.py new file mode 100644 index 0000000..da0b7d6 --- /dev/null +++ b/tests/testsupport/consts.py @@ -0,0 +1,20 @@ +# Copyright (C) 2021 +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0. +# +# SPDX-License-Identifier: EPL-2.0 +# +# This Source Code may also be made available under the following Secondary +# Licenses when the conditions for such availability set forth in the Eclipse +# Public License, v. 2.0 are satisfied: GNU General Public License as published +# by the Free Software Foundation, either version 2 of the License, or (at your +# option) any later version, with the GNU Classpath Exception which is available +# at https://www.gnu.org/software/classpath/license.html. + +import pathlib + +tests_path = pathlib.Path(__file__).parent +lib_path = tests_path.parent +ssl_path = tests_path / "ssl" diff --git a/tests/testsupport/debug_helpers.py b/tests/testsupport/debug_helpers.py new file mode 100644 index 0000000..db62f15 --- /dev/null +++ b/tests/testsupport/debug_helpers.py @@ -0,0 +1,258 @@ +# Copyright (C) 2021 +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0. +# +# SPDX-License-Identifier: EPL-2.0 +# +# This Source Code may also be made available under the following Secondary +# Licenses when the conditions for such availability set forth in the Eclipse +# Public License, v. 2.0 are satisfied: GNU General Public License as published +# by the Free Software Foundation, either version 2 of the License, or (at your +# option) any later version, with the GNU Classpath Exception which is available +# at https://www.gnu.org/software/classpath/license.html. + +import binascii +import struct +from typing import Tuple + + +def dump_packet(prefix: str, data: bytes) -> None: + try: + data = to_string(data) + print(prefix, ": ", data, sep="") + except struct.error: + data = binascii.b2a_hex(data).decode("utf8") + print(prefix, " (not decoded): 0x", data, sep="") + + +def remaining_length(packet: bytes) -> Tuple[bytes, int]: + l = min(5, len(packet)) # noqa: E741 + all_bytes = struct.unpack("!" + "B" * l, packet[:l]) + mult = 1 + rl = 0 + for i in range(1, l - 1): + byte = all_bytes[i] + + rl += (byte & 127) * mult + mult *= 128 + if byte & 128 == 0: + packet = packet[i + 1 :] + break + + return (packet, rl) + + +def to_hex_string(packet: bytes) -> str: + if not packet: + return "" + + s = "" + while len(packet) > 0: + packet0 = struct.unpack("!B", packet[0]) + s = s + hex(packet0[0]) + " " + packet = packet[1:] + + return s + + +def to_string(packet: bytes) -> str: + if not packet: + return "" + + packet0 = struct.unpack("!B%ds" % (len(packet) - 1), bytes(packet)) + packet0 = packet0[0] + cmd = packet0 & 0xF0 + if cmd == 0x00: + # Reserved + return "0x00" + elif cmd == 0x10: + # CONNECT + (packet, rl) = remaining_length(packet) + pack_format = "!H" + str(len(packet) - 2) + "s" + (slen, packet) = struct.unpack(pack_format, packet) + pack_format = "!" + str(slen) + "sBBH" + str(len(packet) - slen - 4) + "s" + (protocol, proto_ver, flags, keepalive, packet) = struct.unpack(pack_format, packet) + kind = "clean-session" if flags & 2 else "durable" + s = f"CONNECT, proto={protocol}{proto_ver}, keepalive={keepalive}, {kind}" + + pack_format = "!H" + str(len(packet) - 2) + "s" + (slen, packet) = struct.unpack(pack_format, packet) + pack_format = "!" + str(slen) + "s" + str(len(packet) - slen) + "s" + (client_id, packet) = struct.unpack(pack_format, packet) + s = s + ", id=" + str(client_id) + + if flags & 4: + pack_format = "!H" + str(len(packet) - 2) + "s" + (slen, packet) = struct.unpack(pack_format, packet) + pack_format = "!" + str(slen) + "s" + str(len(packet) - slen) + "s" + (will_topic, packet) = struct.unpack(pack_format, packet) + s = s + ", will-topic=" + str(will_topic) + + pack_format = "!H" + str(len(packet) - 2) + "s" + (slen, packet) = struct.unpack(pack_format, packet) + pack_format = "!" + str(slen) + "s" + str(len(packet) - slen) + "s" + (will_message, packet) = struct.unpack(pack_format, packet) + s = s + ", will-message=" + will_message + + s = s + ", will-qos=" + str((flags & 24) >> 3) + s = s + ", will-retain=" + str((flags & 32) >> 5) + + if flags & 128: + pack_format = "!H" + str(len(packet) - 2) + "s" + (slen, packet) = struct.unpack(pack_format, packet) + pack_format = "!" + str(slen) + "s" + str(len(packet) - slen) + "s" + (username, packet) = struct.unpack(pack_format, packet) + s = s + ", username=" + str(username) + + if flags & 64: + pack_format = "!H" + str(len(packet) - 2) + "s" + (slen, packet) = struct.unpack(pack_format, packet) + pack_format = "!" + str(slen) + "s" + str(len(packet) - slen) + "s" + (password, packet) = struct.unpack(pack_format, packet) + s = s + ", password=" + str(password) + + if flags & 1: + s = s + ", reserved=1" + + return s + elif cmd == 0x20: + # CONNACK + if len(packet) == 4: + (cmd, rl, resv, rc) = struct.unpack("!BBBB", packet) + return "CONNACK, rl=" + str(rl) + ", res=" + str(resv) + ", rc=" + str(rc) + elif len(packet) == 5: + (cmd, rl, flags, reason_code, proplen) = struct.unpack("!BBBBB", packet) + return ( + "CONNACK, rl=" + + str(rl) + + ", flags=" + + str(flags) + + ", rc=" + + str(reason_code) + + ", proplen=" + + str(proplen) + ) + else: + return "CONNACK, (not decoded)" + + elif cmd == 0x30: + # PUBLISH + dup = (packet0 & 0x08) >> 3 + qos = (packet0 & 0x06) >> 1 + retain = packet0 & 0x01 + (packet, rl) = remaining_length(packet) + pack_format = "!H" + str(len(packet) - 2) + "s" + (tlen, packet) = struct.unpack(pack_format, packet) + pack_format = "!" + str(tlen) + "s" + str(len(packet) - tlen) + "s" + (topic, packet) = struct.unpack(pack_format, packet) + s = ( + "PUBLISH, rl=" + + str(rl) + + ", topic=" + + str(topic) + + ", qos=" + + str(qos) + + ", retain=" + + str(retain) + + ", dup=" + + str(dup) + ) + if qos > 0: + pack_format = "!H" + str(len(packet) - 2) + "s" + (mid, packet) = struct.unpack(pack_format, packet) + s = s + ", mid=" + str(mid) + + s = s + ", payload=" + str(packet) + return s + elif cmd == 0x40: + # PUBACK + if len(packet) == 5: + (cmd, rl, mid, reason_code) = struct.unpack("!BBHB", packet) + return "PUBACK, rl=" + str(rl) + ", mid=" + str(mid) + ", reason_code=" + str(reason_code) + else: + (cmd, rl, mid) = struct.unpack("!BBH", packet) + return "PUBACK, rl=" + str(rl) + ", mid=" + str(mid) + elif cmd == 0x50: + # PUBREC + if len(packet) == 5: + (cmd, rl, mid, reason_code) = struct.unpack("!BBHB", packet) + return "PUBREC, rl=" + str(rl) + ", mid=" + str(mid) + ", reason_code=" + str(reason_code) + else: + (cmd, rl, mid) = struct.unpack("!BBH", packet) + return "PUBREC, rl=" + str(rl) + ", mid=" + str(mid) + elif cmd == 0x60: + # PUBREL + dup = (packet0 & 0x08) >> 3 + (cmd, rl, mid) = struct.unpack("!BBH", packet) + return "PUBREL, rl=" + str(rl) + ", mid=" + str(mid) + ", dup=" + str(dup) + elif cmd == 0x70: + # PUBCOMP + (cmd, rl, mid) = struct.unpack("!BBH", packet) + return "PUBCOMP, rl=" + str(rl) + ", mid=" + str(mid) + elif cmd == 0x80: + # SUBSCRIBE + (packet, rl) = remaining_length(packet) + pack_format = "!H" + str(len(packet) - 2) + "s" + (mid, packet) = struct.unpack(pack_format, packet) + s = "SUBSCRIBE, rl=" + str(rl) + ", mid=" + str(mid) + topic_index = 0 + while len(packet) > 0: + pack_format = "!H" + str(len(packet) - 2) + "s" + (tlen, packet) = struct.unpack(pack_format, packet) + pack_format = "!" + str(tlen) + "sB" + str(len(packet) - tlen - 1) + "s" + (topic, qos, packet) = struct.unpack(pack_format, packet) + s = s + ", topic" + str(topic_index) + "=" + str(topic) + "," + str(qos) + return s + elif cmd == 0x90: + # SUBACK + (packet, rl) = remaining_length(packet) + pack_format = "!H" + str(len(packet) - 2) + "s" + (mid, packet) = struct.unpack(pack_format, packet) + pack_format = "!" + "B" * len(packet) + granted_qos = struct.unpack(pack_format, packet) + + s = "SUBACK, rl=" + str(rl) + ", mid=" + str(mid) + ", granted_qos=" + str(granted_qos[0]) + for i in range(1, len(granted_qos) - 1): + s = s + ", " + str(granted_qos[i]) + return s + elif cmd == 0xA0: + # UNSUBSCRIBE + (packet, rl) = remaining_length(packet) + pack_format = "!H" + str(len(packet) - 2) + "s" + (mid, packet) = struct.unpack(pack_format, packet) + s = "UNSUBSCRIBE, rl=" + str(rl) + ", mid=" + str(mid) + topic_index = 0 + while len(packet) > 0: + pack_format = "!H" + str(len(packet) - 2) + "s" + (tlen, packet) = struct.unpack(pack_format, packet) + pack_format = "!" + str(tlen) + "s" + str(len(packet) - tlen) + "s" + (topic, packet) = struct.unpack(pack_format, packet) + s = s + ", topic" + str(topic_index) + "=" + str(topic) + return s + elif cmd == 0xB0: + # UNSUBACK + (cmd, rl, mid) = struct.unpack("!BBH", packet) + return "UNSUBACK, rl=" + str(rl) + ", mid=" + str(mid) + elif cmd == 0xC0: + # PINGREQ + (cmd, rl) = struct.unpack("!BB", packet) + return "PINGREQ, rl=" + str(rl) + elif cmd == 0xD0: + # PINGRESP + (cmd, rl) = struct.unpack("!BB", packet) + return "PINGRESP, rl=" + str(rl) + elif cmd == 0xE0: + # DISCONNECT + if len(packet) == 3: + (cmd, rl, reason_code) = struct.unpack("!BBB", packet) + return "DISCONNECT, rl=" + str(rl) + ", reason_code=" + str(reason_code) + else: + (cmd, rl) = struct.unpack("!BB", packet) + return "DISCONNECT, rl=" + str(rl) + elif cmd == 0xF0: + # AUTH + (cmd, rl) = struct.unpack("!BB", packet) + return "AUTH, rl=" + str(rl) + raise ValueError(f"Unknown packet type {cmd}") diff --git a/tests/testsupport/mqtt5_props.py b/tests/testsupport/mqtt5_props.py new file mode 100644 index 0000000..4fc1907 --- /dev/null +++ b/tests/testsupport/mqtt5_props.py @@ -0,0 +1,98 @@ +# Copyright (C) 2021 +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0. +# +# SPDX-License-Identifier: EPL-2.0 +# +# This Source Code may also be made available under the following Secondary +# Licenses when the conditions for such availability set forth in the Eclipse +# Public License, v. 2.0 are satisfied: GNU General Public License as published +# by the Free Software Foundation, either version 2 of the License, or (at your +# option) any later version, with the GNU Classpath Exception which is available +# at https://www.gnu.org/software/classpath/license.html. + +import struct + +PROP_PAYLOAD_FORMAT_INDICATOR = 1 +PROP_MESSAGE_EXPIRY_INTERVAL = 2 +PROP_CONTENT_TYPE = 3 +PROP_RESPONSE_TOPIC = 8 +PROP_CORRELATION_DATA = 9 +PROP_SUBSCRIPTION_IDENTIFIER = 11 +PROP_SESSION_EXPIRY_INTERVAL = 17 +PROP_ASSIGNED_CLIENT_IDENTIFIER = 18 +PROP_SERVER_KEEP_ALIVE = 19 +PROP_AUTHENTICATION_METHOD = 21 +PROP_AUTHENTICATION_DATA = 22 +PROP_REQUEST_PROBLEM_INFO = 23 +PROP_WILL_DELAY_INTERVAL = 24 +PROP_REQUEST_RESPONSE_INFO = 25 +PROP_RESPONSE_INFO = 26 +PROP_SERVER_REFERENCE = 28 +PROP_REASON_STRING = 31 +PROP_RECEIVE_MAXIMUM = 33 +PROP_TOPIC_ALIAS_MAXIMUM = 34 +PROP_TOPIC_ALIAS = 35 +PROP_MAXIMUM_QOS = 36 +PROP_RETAIN_AVAILABLE = 37 +PROP_USER_PROPERTY = 38 +PROP_MAXIMUM_PACKET_SIZE = 39 +PROP_WILDCARD_SUB_AVAILABLE = 40 +PROP_SUBSCRIPTION_ID_AVAILABLE = 41 +PROP_SHARED_SUB_AVAILABLE = 42 + + +def gen_byte_prop(identifier, byte): + prop = struct.pack("BB", identifier, byte) + return prop + + +def gen_uint16_prop(identifier, word): + prop = struct.pack("!BH", identifier, word) + return prop + + +def gen_uint32_prop(identifier, word): + prop = struct.pack("!BI", identifier, word) + return prop + + +def gen_string_prop(identifier, s): + s = s.encode("utf-8") + prop = struct.pack(f"!BH{len(s)}s", identifier, len(s), s) + return prop + + +def gen_string_pair_prop(identifier, s1, s2): + s1 = s1.encode("utf-8") + s2 = s2.encode("utf-8") + prop = struct.pack(f"!BH{len(s1)}sH{len(s2)}s", identifier, len(s1), s1, len(s2), s2) + return prop + + +def gen_varint_prop(identifier, val): + v = pack_varint(val) + return struct.pack(f"!B{len(v)}s", identifier, v) + + +def pack_varint(varint): + s = b"" + while True: + byte = varint % 128 + varint = varint // 128 + # If there are more digits to encode, set the top bit of this digit + if varint > 0: + byte = byte | 0x80 + + s = s + struct.pack("!B", byte) + if varint == 0: + return s + + +def prop_finalise(props): + if props is None: + return pack_varint(0) + else: + return pack_varint(len(props)) + props diff --git a/tests/testsupport/paho_test.py b/tests/testsupport/paho_test.py new file mode 100644 index 0000000..f531f5a --- /dev/null +++ b/tests/testsupport/paho_test.py @@ -0,0 +1,555 @@ +# Copyright (C) 2021 +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0. +# +# SPDX-License-Identifier: EPL-2.0 +# +# This Source Code may also be made available under the following Secondary +# Licenses when the conditions for such availability set forth in the Eclipse +# Public License, v. 2.0 are satisfied: GNU General Public License as published +# by the Free Software Foundation, either version 2 of the License, or (at your +# option) any later version, with the GNU Classpath Exception which is available +# at https://www.gnu.org/software/classpath/license.html. + +import contextlib +import os +import socket +import struct +import time + +from tests.testsupport.consts import ssl_path +from tests.testsupport.debug_helpers import dump_packet + +try: + import ssl +except ImportError: + ssl = None + +from tests.testsupport import mqtt5_props + + +def bind_to_any_free_port(sock) -> int: + """ + Bind a socket to an available port on localhost, + and return the port number. + """ + sock.bind(("localhost", 0)) + return sock.getsockname()[1] + + +def create_server_socket(): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(10) + port = bind_to_any_free_port(sock) + sock.listen(5) + return (sock, port) + + +def create_server_socket_ssl(*, verify_mode=None, alpn_protocols=None): + assert ssl, "SSL not available" + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + context.load_verify_locations(str(ssl_path / "all-ca.crt")) + context.load_cert_chain( + str(ssl_path / "server.crt"), + str(ssl_path / "server.key"), + ) + if verify_mode: + context.verify_mode = verify_mode + + if alpn_protocols is not None: + context.set_alpn_protocols(alpn_protocols) + + ssock = context.wrap_socket(sock, server_side=True) + ssock.settimeout(10) + port = bind_to_any_free_port(ssock) + ssock.listen(5) + return (ssock, port) + + +def expect_packet(sock, name, expected): + rlen = len(expected) if len(expected) > 0 else 1 + + packet_recvd = b"" + try: + while len(packet_recvd) < rlen: + data = sock.recv(rlen - len(packet_recvd)) + if len(data) == 0: + break + packet_recvd += data + except socket.timeout: # pragma: no cover + pass + + assert packet_matches(name, packet_recvd, expected) + return True + + +def expect_no_packet(sock, delay=1): + """expect that nothing is received within given delay""" + sock.settimeout(delay) + try: + previous_timeout = sock.gettimeout() + data = sock.recv(1024) + except socket.timeout: + data = None + finally: + sock.settimeout(previous_timeout) + + if data is not None: + dump_packet("Received unexpected", data) + + assert data is None, "shouldn't receive any data" + + +def packet_matches(name, recvd, expected): + if recvd != expected: # pragma: no cover + print(f"FAIL: Received incorrect {name}.") + dump_packet("Received", recvd) + dump_packet("Expected", expected) + return False + else: + return True + + +def gen_connect( + client_id, + clean_session=True, + keepalive=60, + username=None, + password=None, + will_topic=None, + will_qos=0, + will_retain=False, + will_payload=b"", + proto_ver=4, + connect_reserved=False, + properties=b"", + will_properties=b"", + session_expiry=-1, +): + if (proto_ver & 0x7F) == 3 or proto_ver == 0: + remaining_length = 12 + elif (proto_ver & 0x7F) == 4 or proto_ver == 5: + remaining_length = 10 + else: + raise ValueError + + if client_id is not None: + client_id = client_id.encode("utf-8") + remaining_length = remaining_length + 2 + len(client_id) + else: + remaining_length = remaining_length + 2 + + connect_flags = 0 + + if connect_reserved: + connect_flags = connect_flags | 0x01 + + if clean_session: + connect_flags = connect_flags | 0x02 + + if proto_ver == 5: + if properties == b"": + properties += mqtt5_props.gen_uint16_prop(mqtt5_props.PROP_RECEIVE_MAXIMUM, 20) + + if session_expiry != -1: + properties += mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, session_expiry) + + properties = mqtt5_props.prop_finalise(properties) + remaining_length += len(properties) + + if will_topic is not None: + will_topic = will_topic.encode("utf-8") + remaining_length = remaining_length + 2 + len(will_topic) + 2 + len(will_payload) + connect_flags = connect_flags | 0x04 | ((will_qos & 0x03) << 3) + if will_retain: + connect_flags = connect_flags | 32 + if proto_ver == 5: + will_properties = mqtt5_props.prop_finalise(will_properties) + remaining_length += len(will_properties) + + if username is not None: + username = username.encode("utf-8") + remaining_length = remaining_length + 2 + len(username) + connect_flags = connect_flags | 0x80 + if password is not None: + password = password.encode("utf-8") + connect_flags = connect_flags | 0x40 + remaining_length = remaining_length + 2 + len(password) + + rl = pack_remaining_length(remaining_length) + packet = struct.pack("!B" + str(len(rl)) + "s", 0x10, rl) + if (proto_ver & 0x7F) == 3 or proto_ver == 0: + packet = packet + struct.pack( + "!H6sBBH", + len(b"MQIsdp"), + b"MQIsdp", + proto_ver, + connect_flags, + keepalive, + ) + elif (proto_ver & 0x7F) == 4 or proto_ver == 5: + packet = packet + struct.pack( + "!H4sBBH", + len(b"MQTT"), + b"MQTT", + proto_ver, + connect_flags, + keepalive, + ) + + if proto_ver == 5: + packet += properties + + if client_id is not None: + packet = packet + struct.pack("!H" + str(len(client_id)) + "s", len(client_id), bytes(client_id)) + else: + packet = packet + struct.pack("!H", 0) + + if will_topic is not None: + packet += will_properties + packet = packet + struct.pack("!H" + str(len(will_topic)) + "s", len(will_topic), will_topic) + if len(will_payload) > 0: + packet = packet + struct.pack( + "!H" + str(len(will_payload)) + "s", + len(will_payload), + will_payload.encode("utf8"), + ) + else: + packet = packet + struct.pack("!H", 0) + + if username is not None: + packet = packet + struct.pack("!H" + str(len(username)) + "s", len(username), username) + if password is not None: + packet = packet + struct.pack("!H" + str(len(password)) + "s", len(password), password) + return packet + + +def gen_connack(flags=0, rc=0, proto_ver=4, properties=b"", property_helper=True): + if proto_ver == 5: + if property_helper: + if properties is not None: + properties = ( + mqtt5_props.gen_uint16_prop(mqtt5_props.PROP_TOPIC_ALIAS_MAXIMUM, 10) + + properties + + mqtt5_props.gen_uint16_prop(mqtt5_props.PROP_RECEIVE_MAXIMUM, 20) + ) + else: + properties = b"" + properties = mqtt5_props.prop_finalise(properties) + + packet = struct.pack("!BBBB", 32, 2 + len(properties), flags, rc) + properties + else: + packet = struct.pack("!BBBB", 32, 2, flags, rc) + + return packet + + +def gen_publish( + topic, + qos, + payload=None, + retain=False, + dup=False, + mid=0, + proto_ver=4, + properties=b"", +): + if isinstance(topic, str): + topic = topic.encode("utf-8") + rl = 2 + len(topic) + pack_format = "H" + str(len(topic)) + "s" + if qos > 0: + rl = rl + 2 + pack_format = pack_format + "H" + + if proto_ver == 5: + properties = mqtt5_props.prop_finalise(properties) + rl += len(properties) + # This will break if len(properties) > 127 + pack_format = pack_format + "%ds" % (len(properties)) + + if payload is not None: + payload = payload.encode("utf-8") + rl = rl + len(payload) + pack_format = pack_format + str(len(payload)) + "s" + else: + payload = b"" + pack_format = pack_format + "0s" + + rlpacked = pack_remaining_length(rl) + cmd = 48 | (qos << 1) + if retain: + cmd = cmd + 1 + if dup: + cmd = cmd + 8 + + if proto_ver == 5: + if qos > 0: + return struct.pack( + "!B" + str(len(rlpacked)) + "s" + pack_format, + cmd, + rlpacked, + len(topic), + topic, + mid, + properties, + payload, + ) + else: + return struct.pack( + "!B" + str(len(rlpacked)) + "s" + pack_format, + cmd, + rlpacked, + len(topic), + topic, + properties, + payload, + ) + else: + if qos > 0: + return struct.pack( + "!B" + str(len(rlpacked)) + "s" + pack_format, + cmd, + rlpacked, + len(topic), + topic, + mid, + payload, + ) + else: + return struct.pack( + "!B" + str(len(rlpacked)) + "s" + pack_format, + cmd, + rlpacked, + len(topic), + topic, + payload, + ) + + +def _gen_command_with_mid(cmd, mid, proto_ver=4, reason_code=-1, properties=None): + if proto_ver == 5 and (reason_code != -1 or properties is not None): + if reason_code == -1: + reason_code = 0 + + if properties is None: + return struct.pack("!BBHB", cmd, 3, mid, reason_code) + elif properties == "": + return struct.pack("!BBHBB", cmd, 4, mid, reason_code, 0) + else: + properties = mqtt5_props.prop_finalise(properties) + pack_format = "!BBHB" + str(len(properties)) + "s" + return struct.pack( + pack_format, + cmd, + 2 + 1 + len(properties), + mid, + reason_code, + properties, + ) + else: + return struct.pack("!BBH", cmd, 2, mid) + + +def gen_puback(mid, proto_ver=4, reason_code=-1, properties=None): + return _gen_command_with_mid(64, mid, proto_ver, reason_code, properties) + + +def gen_pubrec(mid, proto_ver=4, reason_code=-1, properties=None): + return _gen_command_with_mid(80, mid, proto_ver, reason_code, properties) + + +def gen_pubrel(mid, dup=False, proto_ver=4, reason_code=-1, properties=None): + if dup: + cmd = 96 + 8 + 2 + else: + cmd = 96 + 2 + return _gen_command_with_mid(cmd, mid, proto_ver, reason_code, properties) + + +def gen_pubcomp(mid, proto_ver=4, reason_code=-1, properties=None): + return _gen_command_with_mid(112, mid, proto_ver, reason_code, properties) + + +def gen_subscribe(mid, topic, qos, cmd=130, proto_ver=4, properties=b""): + topic = topic.encode("utf-8") + packet = struct.pack("!B", cmd) + if proto_ver == 5: + if properties == b"": + packet += pack_remaining_length(2 + 1 + 2 + len(topic) + 1) + pack_format = "!HBH" + str(len(topic)) + "sB" + return packet + struct.pack(pack_format, mid, 0, len(topic), topic, qos) + else: + properties = mqtt5_props.prop_finalise(properties) + packet += pack_remaining_length(2 + 1 + 2 + len(topic) + len(properties)) + pack_format = "!H" + str(len(properties)) + "s" + "H" + str(len(topic)) + "sB" + return packet + struct.pack(pack_format, mid, properties, len(topic), topic, qos) + else: + packet += pack_remaining_length(2 + 2 + len(topic) + 1) + pack_format = "!HH" + str(len(topic)) + "sB" + return packet + struct.pack(pack_format, mid, len(topic), topic, qos) + + +def gen_suback(mid, qos, proto_ver=4): + if proto_ver == 5: + return struct.pack("!BBHBB", 144, 2 + 1 + 1, mid, 0, qos) + else: + return struct.pack("!BBHB", 144, 2 + 1, mid, qos) + + +def gen_unsubscribe(mid, topic, cmd=162, proto_ver=4, properties=b""): + topic = topic.encode("utf-8") + if proto_ver == 5: + if properties == b"": + pack_format = "!BBHBH" + str(len(topic)) + "s" + return struct.pack( + pack_format, + cmd, + 2 + 2 + len(topic) + 1, + mid, + 0, + len(topic), + topic, + ) + else: + properties = mqtt5_props.prop_finalise(properties) + packet = struct.pack("!B", cmd) + l = 2 + 2 + len(topic) + 1 + len(properties) # noqa: E741 + packet += pack_remaining_length(l) + pack_format = "!HB" + str(len(properties)) + "sH" + str(len(topic)) + "s" + packet += struct.pack( + pack_format, + mid, + len(properties), + properties, + len(topic), + topic, + ) + return packet + else: + pack_format = "!BBHH" + str(len(topic)) + "s" + return struct.pack(pack_format, cmd, 2 + 2 + len(topic), mid, len(topic), topic) + + +def gen_unsubscribe_multiple(mid, topics, proto_ver=4): + packet = b"" + remaining_length = 0 + for t in topics: + t = t.encode("utf-8") + remaining_length += 2 + len(t) + packet += struct.pack("!H" + str(len(t)) + "s", len(t), t) + + if proto_ver == 5: + remaining_length += 2 + 1 + + return struct.pack("!BBHB", 162, remaining_length, mid, 0) + packet + else: + remaining_length += 2 + + return struct.pack("!BBH", 162, remaining_length, mid) + packet + + +def gen_unsuback(mid, reason_code=0, proto_ver=4): + if proto_ver == 5: + if isinstance(reason_code, list): + reason_code_count = len(reason_code) + p = struct.pack("!BBHB", 176, 3 + reason_code_count, mid, 0) + for r in reason_code: + p += struct.pack("B", r) + return p + else: + return struct.pack("!BBHBB", 176, 4, mid, 0, reason_code) + else: + return struct.pack("!BBH", 176, 2, mid) + + +def gen_pingreq(): + return struct.pack("!BB", 192, 0) + + +def gen_pingresp(): + return struct.pack("!BB", 208, 0) + + +def _gen_short(cmd, reason_code=-1, proto_ver=5, properties=None): + if proto_ver == 5 and (reason_code != -1 or properties is not None): + if reason_code == -1: + reason_code = 0 + + if properties is None: + return struct.pack("!BBB", cmd, 1, reason_code) + elif properties == "": + return struct.pack("!BBBB", cmd, 2, reason_code, 0) + else: + properties = mqtt5_props.prop_finalise(properties) + return struct.pack("!BBB", cmd, 1 + len(properties), reason_code) + properties + else: + return struct.pack("!BB", cmd, 0) + + +def gen_disconnect(reason_code=-1, proto_ver=4, properties=None): + return _gen_short(0xE0, reason_code, proto_ver, properties) + + +def gen_auth(reason_code=-1, properties=None): + return _gen_short(0xF0, reason_code, 5, properties) + + +def pack_remaining_length(remaining_length): + s = b"" + while True: + byte = remaining_length % 128 + remaining_length = remaining_length // 128 + # If there are more digits to encode, set the top bit of this digit + if remaining_length > 0: + byte = byte | 0x80 + + s = s + struct.pack("!B", byte) + if remaining_length == 0: + return s + + +def loop_until_keyboard_interrupt(mqttc): + """ + Call loop() in a loop until KeyboardInterrupt is received. + + This is used by the test clients in `lib/clients`; + the client spawner will send a SIGINT to the client process + when it wants the client to stop, so we should catch that + and stop the client gracefully. + """ + try: + while True: + mqttc.loop() + except KeyboardInterrupt: + pass + + +@contextlib.contextmanager +def wait_for_keyboard_interrupt(): + """ + Run the code in the context manager, then wait for a KeyboardInterrupt. + + This is used by the test clients in `lib/clients`; + the client spawner will send a SIGINT to the client process + when it wants the client to stop, so we should catch that + and stop the client gracefully. + """ + yield # If we get a KeyboardInterrupt during the block, it's too soon! + try: + while True: + time.sleep(0.1) + except KeyboardInterrupt: + pass + + +def get_test_server_port() -> int: + """ + Get the port number for the test server. + """ + return int(os.environ["PAHO_SERVER_PORT"]) diff --git a/up_client_mqtt5_python/__init__.py b/up_client_mqtt5_python/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/up_client_mqtt5_python/mqtt5_utransport.py b/up_client_mqtt5_python/mqtt5_utransport.py new file mode 100644 index 0000000..66e1fbb --- /dev/null +++ b/up_client_mqtt5_python/mqtt5_utransport.py @@ -0,0 +1,299 @@ +""" +SPDX-FileCopyrightText: Copyright (c) 2023 Contributors to the +Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +SPDX-FileType: SOURCE +SPDX-License-Identifier: Apache-2.0 +""" + +import logging +import ssl +import threading +from concurrent.futures import Future +from typing import Dict, List + +import paho.mqtt.client as mqtt +from uprotocol.transport.ulistener import UListener +from uprotocol.transport.utransport import UTransport +from uprotocol.v1.uattributes_pb2 import UAttributes, UMessageType +from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.umessage_pb2 import UMessage +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus +from uprotocol.v1.uuid_pb2 import UUID + +from up_client_mqtt5_python.utils.utils import ( + build_attributes_from_mqtt_properties, + build_message_from_mqtt_message_and_attributes, + build_mqtt_properties_from_attributes, + uuri_field_resolver, +) + +logging.basicConfig(format="%(levelname)s| %(filename)s:%(lineno)s %(message)s") +logger = logging.getLogger("File:Line# Debugger") +logger.setLevel(logging.DEBUG) + + +class MQTT5UTransport(UTransport): + """ + MQTTv5 Transport for UProtocol + """ + + def __init__(self, source: UUri, client_id: str, host_name: str, port: int, cloud_device: bool) -> None: + """ + Creates a UEntity with an MQTTv5 Connection, as well as tracking a + list of registered listeners. + @param client_id: ID of the MQTT Client + @param host_name: Address of the MQTT Broker + @param port: Port of the MQTT Broker + @param cloud_device: Whether or not your device lives in the cloud. + """ + + self.source = source + self.host_name = host_name + self.port = port + self.cloud_device = cloud_device + self.context = None + + self._connected_signal = threading.Event() + + self.topic_to_listener: Dict[str, List[UListener]] = {} + self.reqid_to_future: Dict[bytes, Future] = {} + + self._mqtt_client = mqtt.Client( + mqtt.CallbackAPIVersion.VERSION2, + client_id=client_id, + protocol=mqtt.MQTTv5, + ) + + self._mqtt_client.enable_logger() + + def create_tls_context( + self, + certificate_filename: str = None, + key_filename: str = None, + key_pass_phrase: str = None, + ssl_method=ssl.PROTOCOL_TLSv1_2, + verify_mode=ssl.CERT_NONE, + check_hostname=False, + ) -> None: + """ + Creates a TLS Context for the MQTT Broker. + @param certificate_filename: Filename of the certificate + @param key_filename: Filename of the key + @param key_pass_phrase: Passphrase for the key + @param ssl_method: SSL Method + @param verify_mode: Verification Mode + @param check_hostname: Whether or not to check the hostname + @return: None + """ + + self.context = ssl.SSLContext(protocol=ssl_method) + self.context.verify_mode = verify_mode + self.context.check_hostname = check_hostname + if certificate_filename is not None: + self.context.load_cert_chain(certificate_filename, key_filename, key_pass_phrase) + self._mqtt_client.tls_set_context(self.context) + + def connect(self): + """ + Connects to the MQTT Broker. + @return: None + """ + self._mqtt_client.on_message = self._listen + logger.info("%s Connecting to MQTT Broker", self.__class__.__name__) + self._mqtt_client.connect( + host=self.host_name, + port=self.port, + clean_start=False, + keepalive=60, + ) + logger.info("%s Connected to MQTT Broker", self.__class__.__name__) + self._mqtt_client.loop_start() + logger.info("%s started MQTT Loop", self.__class__.__name__) + + def _listen(self, client, userdata, msg): + """ + Listens for and processes messages from MQTT Broker. + @param client: + @param userdata: + @param msg: + @return: None + """ + logger.info(f"Received Message on MQTT: {msg}") + + attributes: UAttributes = build_attributes_from_mqtt_properties(msg.properties) + umsg: UMessage = build_message_from_mqtt_message_and_attributes(msg, attributes) + + message_type_handlers = { + UMessageType.UMESSAGE_TYPE_UNSPECIFIED: self._handle_unspecified_message, + UMessageType.UMESSAGE_TYPE_PUBLISH: self._handle_gen_message, + UMessageType.UMESSAGE_TYPE_REQUEST: self._handle_gen_message, + UMessageType.UMESSAGE_TYPE_NOTIFICATION: self._handle_gen_message, + UMessageType.UMESSAGE_TYPE_RESPONSE: self._handle_response_message, + } + + handler = message_type_handlers.get(attributes.type) + if handler: + handler(msg.topic, umsg) + else: + raise ValueError("Unsupported message type: " + UMessageType.Name(attributes.type)) + + def _handle_unspecified_message(self, topic: str, umsg: UMessage): + logger.info("%s Unspecified Message Received", self.__class__.__name__) + logger.info(f"Message Details: {umsg}") + logger.info(f"Unspecified Message received on topic {topic}") + + def _handle_response_message(self, topic: str, umsg: UMessage): + request_id: UUID = umsg.attributes.reqid + request_id_b: bytes = request_id.SerializeToString() + + if request_id_b in self.reqid_to_future: + respose_future: Future = self.reqid_to_future[request_id_b] + respose_future.set_result(umsg) + + del self.reqid_to_future[request_id_b] + + def _handle_gen_message(self, topic: str, umsg: UMessage): + + topic = topic.replace("FFFF", "+").replace("FF", "+") + + pieces_of_topic = topic.split("/") + for x in self.topic_to_listener.keys(): + pieces_of_x = x.split("/") + if len(pieces_of_topic) == len(pieces_of_x): + matches = True + for i in range(len(pieces_of_topic)): + if pieces_of_x[i] == "+": + continue + if pieces_of_x[i] != pieces_of_topic[i]: + matches = False + if matches: + logger.info("%s Handle Message on Topic", self.__class__.__name__) + for listener in self.topic_to_listener[x]: + listener.on_receive(umsg) + + def mqtt_topic_builder(self, source: UUri, sink: UUri = None) -> str: + """ + Builds MQTT topic based on whether the topic authority is + local or remote. + @param topic: UUri with which MQTT topics are built + @param msg_type: Whether the topic is for sending or + registering a listener + @return: returns MQTT Topic + """ + + device = "c" if self.cloud_device else "d" + if source != UUri(): + src_auth_name = source.authority_name if source != UUri() else "+" + src_ue_id = uuri_field_resolver(source.ue_id, 0xFFFF, "+") + src_ue_version_major = uuri_field_resolver(source.ue_version_major, 0xFF, "+") + src_resource_id = uuri_field_resolver(source.resource_id, 0xFFFF, "+") + topic = device + "/" + src_auth_name + "/" + src_ue_id + "/" + src_ue_version_major + "/" + src_resource_id + if sink is not None and sink != UUri(): + sink_auth_name = sink.authority_name + sink_ue_id = uuri_field_resolver(sink.ue_id, 0xFFFF, "+") + sink_ue_version_major = uuri_field_resolver(sink.ue_version_major, 0xFF, "+") + sink_resource_id = uuri_field_resolver(sink.resource_id, 0xFFFF, "+") + topic += "/" + sink_auth_name + "/" + sink_ue_id + "/" + sink_ue_version_major + "/" + sink_resource_id + return topic + + def send(self, message: UMessage) -> UStatus: + """ + Transmits UPayload to the topic using the attributes defined in + UTransportAttributes. + @param umsg: UMessage to be sent to MQTT + @return:Returns OKSTATUS if the payload has been successfully + sent (ACK'ed), otherwise it returns FAILSTATUS + with the appropriate failure. + """ + + payload: bytes = message.payload + + publish_properties = build_mqtt_properties_from_attributes(message.attributes) + + self._mqtt_client.publish( + topic=self.mqtt_topic_builder(source=message.attributes.source, sink=message.attributes.sink), + payload=payload, + qos=1, + properties=publish_properties, + ) + + return UStatus(code=UCode.OK, message="OK") + + def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: + """ + Register listener to be called when UPayload is received for the + specific topic. + @param topic:Resolved UUri for where the message arrived via + the underlying transport technology. + @param listener:The method to execute to process the date for the + topic. + @return:Returns OKSTATUS if the listener is registered + correctly, otherwise it returns FAILSTATUS with the + appropriate failure. + """ + + mqtt_topic = self.mqtt_topic_builder(source=source_filter, sink=sink_filter) + logger.info("%s Registering Listener for Topic: %s", self.__class__.__name__, mqtt_topic) + + self.topic_to_listener.setdefault(mqtt_topic, []).append(listener) + + self._mqtt_client.subscribe(topic=mqtt_topic, qos=1) + + self._mqtt_client.loop_start() + + return UStatus(code=UCode.OK, message="OK") + + def unregister_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri) -> UStatus: + """ + Register listener to be called when UPayload is received for the + specific topic. + @param topic:Resolved UUri for where the message arrived via + the underlying transport technology. + @param listener:The method to execute to process the date for the + topic. + @return:Returns OKSTATUS if the listener is registered + correctly, otherwise it returns FAILSTATUS with the + appropriate failure. + """ + mqtt_topic = self.mqtt_topic_builder(source=source_filter, sink=sink_filter) + + if mqtt_topic in self.topic_to_listener: + if len(self.topic_to_listener[mqtt_topic]) > 1: + self.topic_to_listener[mqtt_topic].remove(listener) + else: + del self.topic_to_listener[mqtt_topic] + + self._mqtt_client.unsubscribe(topic=mqtt_topic) + + return UStatus(code=UCode.OK, message="OK") + + def get_source(self) -> UUri: + """ + Returns the source of the MQTT Transport. + @return: UUri source + """ + return self.source + + def close(self): + """ + Closes the MQTT Connection. + @return: None + """ + self._mqtt_client.disconnect() + self._mqtt_client.loop_stop() diff --git a/up_client_mqtt5_python/utils/utils.py b/up_client_mqtt5_python/utils/utils.py new file mode 100644 index 0000000..cc05c88 --- /dev/null +++ b/up_client_mqtt5_python/utils/utils.py @@ -0,0 +1,146 @@ +""" +SPDX-FileCopyrightText: Copyright (c) 2024 Contributors to the +Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +SPDX-FileType: SOURCE +SPDX-License-Identifier: Apache-2.0 +""" + +import paho.mqtt.client as mqtt +from uprotocol.communication.upayload import UPayload +from uprotocol.transport.builder.umessagebuilder import UMessageBuilder +from uprotocol.uri.serializer.uriserializer import UriSerializer +from uprotocol.uuid.serializer.uuidserializer import UuidSerializer +from uprotocol.v1.uattributes_pb2 import UAttributes, UMessageType +from uprotocol.v1.umessage_pb2 import UMessage + + +def build_message_from_mqtt_message_and_attributes(msg: mqtt.MQTTMessage, attributes: UAttributes) -> UMessage: + """ + Build a message from a MQTT message and UAttributes + :param msg: MQTT message + :param attributes: UAttributes attributes + :return: UMessage message + """ + payload_data: UPayload = UPayload(msg.payload, attributes.payload_format) + if attributes.type == UMessageType.UMESSAGE_TYPE_RESPONSE: + return UMessageBuilder.response(attributes.source, attributes.sink, attributes.reqid).build_from_upayload( + payload_data + ) + elif attributes.type == UMessageType.UMESSAGE_TYPE_PUBLISH: + return UMessageBuilder.publish(attributes.source).build_from_upayload(payload_data) + elif attributes.type == UMessageType.UMESSAGE_TYPE_REQUEST: + return UMessageBuilder.request(attributes.source, attributes.sink, attributes.ttl).build_from_upayload( + payload_data + ) + elif attributes.type == UMessageType.UMESSAGE_TYPE_NOTIFICATION: + return UMessageBuilder.notification(attributes.source, attributes.sink).build_from_upayload(payload_data) + + +def build_attributes_from_mqtt_properties(publish_properties) -> UAttributes: + """ + Build UAttributes from MQTT properties + :param properties: MQTT properties + :return: UAttributes attributes + """ + attributes: UAttributes = UAttributes() + for user_property in publish_properties.UserProperty: + if user_property[0] == "1": + attributes.id.CopyFrom(UuidSerializer.deserialize(user_property[1])) + elif user_property[0] == "2": + attributes.type = int(user_property[1]) + elif user_property[0] == "3": + attributes.source.CopyFrom(UriSerializer.deserialize(user_property[1])) + elif user_property[0] == "4": + attributes.sink.CopyFrom(UriSerializer.deserialize(user_property[1])) + elif user_property[0] == "5": + attributes.priority = int(user_property[1]) + elif user_property[0] == "6": + attributes.ttl = int(user_property[1]) + elif user_property[0] == "7": + attributes.permission_level = int(user_property[1]) + elif user_property[0] == "8": + attributes.commstatus = int(user_property[1]) + elif user_property[0] == "9": + attributes.reqid.CopyFrom(UuidSerializer.deserialize(user_property[1])) + elif user_property[0] == "10": + attributes.token = user_property[1] + elif user_property[0] == "11": + attributes.traceparent = user_property[1] + elif user_property[0] == "12": + attributes.payload_format = int(user_property[1]) + return attributes + + +def build_mqtt_properties_from_attributes(attributes: UAttributes): + """ + Build MQTT properties from UAttributes + :param attributes: UAttributes attributes + :return: MQTT properties + """ + publish_properties = mqtt.Properties(mqtt.PacketTypes.PUBLISH) + publish_properties.UserProperty = [] + try: + if attributes.HasField("id"): + publish_properties.UserProperty.append(("1", UuidSerializer.serialize(attributes.id))) + publish_properties.UserProperty.append(("2", str(attributes.type))) + if attributes.HasField("source"): + publish_properties.UserProperty.append(("3", UriSerializer.serialize(attributes.source))) + if attributes.HasField("sink"): + publish_properties.UserProperty.append( + ( + "4", + UriSerializer.serialize(attributes.sink), + ) + ) + publish_properties.UserProperty.append(("5", str(attributes.priority))) + if attributes.HasField("ttl"): + publish_properties.UserProperty.append(("6", str(attributes.ttl))) + if attributes.HasField("permission_level"): + publish_properties.UserProperty.append(("7", str(attributes.permission_level))) + if attributes.HasField("commstatus"): + publish_properties.UserProperty.append(("8", str(attributes.commstatus))) + if attributes.type == UMessageType.UMESSAGE_TYPE_RESPONSE: + publish_properties.UserProperty.append( + ( + "9", + UuidSerializer.serialize(attributes.reqid), + ) + ) + if attributes.HasField("token"): + publish_properties.UserProperty.append(("10", attributes.token)) + if attributes.HasField("traceparent"): + publish_properties.UserProperty.append(("11", attributes.traceparent)) + except ValueError as e: + raise ValueError(e) from e + + return publish_properties + + +def length_resolver(field): + return "0" + field if len(field) % 2 == 1 else field + + +def uuri_field_resolver(field, wildcard_value, wild_return="+"): + """ + Returns self if value isn't wild or empty, else returns wildcard_value + :param field: field to resolve + :wildcard_value: wildcard value of the field + :return: resolved field + """ + hex_val = length_resolver(f'{field:x}') + return hex_val if field != wildcard_value else wild_return