diff --git a/ChangeLog.txt b/ChangeLog.txt index 0e13b7d0..77d51d0d 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,12 +1,18 @@ v2.0.0 - 2023-xx-xx =================== +- **BREAKING** Added callback_api_version. This break *ALL* users of paho-mqtt Client class. + See migrations.md for details on how to upgrade. + tl; dr; add CallbackAPIVersion.VERSION1 to first argument of Client() - **BREAKING** Drop support for Python 2.7, Python 3.5 and Python 3.6 Minimum tested version is Python 3.7 Python version up to Python 3.12 are tested. - **BREAKING** connect_srv changed it signature to take an additional bind_port parameter. This is a breaking change, but in previous version connect_srv was broken anyway. Closes #493. +- Add version 2 of user-callback which allow to access MQTTv5 reason code & properties that were + missing from on_publish callback. Also it's more consistent in parameter order or between + MQTTv3 and MQTTv5. - Add types to Client class, which caused few change which should be compatible. Known risk of breaking changes: - Use enum for returned error code (like MQTT_ERR_SUCCESS). It use an IntEnum diff --git a/migrations.md b/migrations.md new file mode 100644 index 00000000..db1b9da5 --- /dev/null +++ b/migrations.md @@ -0,0 +1,189 @@ +# Migration + +## Change between version 1.x and 2.0 + +### Improved typing + +Version 2.0 improved typing, but this would be compatible with existing code. +The most likely issue are some integer that are now better type, like `dup` on MQTTMessage. + +That means that code that used `if msg.dup == 1:` will need to be change to `if msg.dup:` (the later version +for with both paho-mqtt 1.x and 2.0). + +### Versioned the user callbacks + +Version 2.0 of paho-mqtt introduced versioning of user-callback. To fix some inconsistency in callback +arguments and to provide better support for MQTTv5, version 2.0 changed the arguments passed to user-callback. + +You can still use old version of callback, you are just require to tell paho-mqtt that you opt for this +version. For that just change your client creation from: +``` +# OLD code +>>> mqttc = mqtt.Client() + +# NEW code +>>> mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1) +``` + +That it, remaining of the code could stay unchanged. + +The version 1 of callback is deprecated, but is still supported in version 2.x. If you want to upgrade to newer version of API callback, you will need to update your callbacks: + +#### on_connect + +``` +# OLD code for MQTTv3 +def on_connect(client, userdata, flags, rc): + if flags["session present"] == 1: + # ... + if rc == 0: + # success connect + if rc > 0: + # error processing + +# OLD code for MQTTv5 +def on_connect(client, userdata, flags, reason_code, properties): + if flags["session present"] == 1: + # ... + if reason_code == 0: + # success connect + +# NEW code for both version +def on_connect(client, userdata, flags, reason_code, properties): + if flags.session_present: + # ... + if reason_code == 0: + # success connect + if reason_code > 0: + # error processing +``` + +Be careful that for MQTTv3, `rc` (an integer) changed to `reason_code` (an instance of ReasonCodes), and the numeric value changed. +The numeric value 0 means success for both, so as in above example, using `reason_code == 0`, `reason_code != 0` or other comparison with zero +is fine. +But if you had comparison with other value, you will need to update the code. It's recommended to compare to string value: + +``` +# OLD code for MQTTv3 +def on_connect(client, userdata, flags, rc): + if rc == 1: + # handle bad protocol version + if rc == CONNACK_REFUSED_IDENTIFIER_REJECTED: + # handle bad identifier + +# NEW code +def on_connect(client, userdata, flags, reason_code, properties): + if rc == "Unsupported protocol version": + # handle bad protocol version + if rc == "Client identifier not valid": + # handle bad identifier +``` + +#### on_disconnect + +``` +# OLD code for MQTTv3 +def on_disconnect(client, userdata, rc): + if rc == 0: + # success disconnect + if rc > 0: + # error processing + +# OLD code for MQTTv5 +def on_disconnect(client, userdata, reason_code, properties): + if reason_code: + # error processing + +# NEW code for both version +def on_disconnect(client, userdata, flags, reason_code, properties): + if reason_code == 0: + # success disconnect + if reason_code > 0: + # error processing +``` + + +#### on_subscribe + +``` +# OLD code for MQTTv3 +def on_subscribe(client, userdata, mid, granted_qos): + for sub_result in granted_qos: + if sub_result == 1: + # process QoS == 1 + if sub_result == 0x80: + # error processing + +# OLD code for MQTTv5 +def on_disconnect(client, userdata, mid, reason_codes, properties): + for sub_result in reason_codes: + if sub_result == 1: + # process QoS == 1 + # Any reason code >= 128 is a failure. + if sub_result >= 128: + # error processing + +# NEW code for both version +def on_subscribe(client, userdata, mid, reason_codes, properties): + for sub_result in reason_codes: + if sub_result == 1: + # process QoS == 1 + # Any reason code >= 128 is a failure. + if sub_result >= 128: + # error processing +``` + + +#### on_unsubscribe + +``` +# OLD code for MQTTv3 +def on_unsubscribe(client, userdata, mid): + # ... + +# OLD code for MQTTv5 +def on_unsubscribe(client, userdata, mid, properties, reason_codes): + # In OLD version, reason_codes could be a list or a single ReasonCodes object + if isinstance(reason_codes, list): + for unsub_result in reason_codes: + # Any reason code >= 128 is a failure. + if reason_codes[0] >= 128: + # error processing + else: + # Any reason code >= 128 is a failure. + if reason_codes > 128: + # error processing + + +# NEW code for both version +def on_subscribe(client, userdata, mid, reason_codes, properties): + # In NEW version, reason_codes is always a list. Empty for MQTTv3 + for unsub_result in reason_codes: + # Any reason code >= 128 is a failure. + if reason_codes[0] >= 128: + # error processing +``` + + +#### on_publish + +``` +# OLD code +def on_publish(client, userdata, mid): + # ... + + +# NEW code +def on_publish(client, userdata, mid, reason_codes, properties): + # ... +``` + + +#### on_message + +No change for this callback. +``` +# OLD & NEW code +def on_message(client, userdata, message): + # ... +``` \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 2b62fb8d..e990812d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -90,6 +90,9 @@ disallow_untyped_defs = true [tool.pytest.ini_options] addopts = ["-r", "xs"] testpaths = "tests src" +filterwarnings = [ + "ignore:Callback API version 1 is deprecated, update to latest version" +] [tool.ruff] exclude = ["test/lib/python/*"] diff --git a/src/paho/mqtt/client.py b/src/paho/mqtt/client.py index c4c93753..aaf37307 100644 --- a/src/paho/mqtt/client.py +++ b/src/paho/mqtt/client.py @@ -34,9 +34,12 @@ import urllib.parse import urllib.request import uuid +import warnings from typing import TYPE_CHECKING, Any, Callable, Dict, Iterator, List, NamedTuple, Sequence, Tuple, Union, cast -from .enums import ConnackCode, ConnectionState, LogLevel, MessageState, MessageType, MQTTErrorCode, MQTTProtocolVersion, PahoClientMode +from paho.mqtt.packettypes import PacketTypes + +from .enums import CallbackAPIVersion, ConnackCode, ConnectionState, LogLevel, MessageState, MessageType, MQTTErrorCode, MQTTProtocolVersion, PahoClientMode from .matcher import MQTTMatcher from .properties import Properties from .reasoncodes import ReasonCodes @@ -209,10 +212,27 @@ class _OutPacket(TypedDict): class ConnectFlags(NamedTuple): + """Contains additional information for on_connect + + session_present: this flag is useful for clients that are + using clean session set to False only (MQTTv3) or clean_start = False (MQTTv5). + In that case, if client that reconnects to a broker that it has previously + connected to, this flag indicates whether the broker still has the + session information for the client. If true, the session still exists. + """ + session_present: bool class DisconnectFlags(NamedTuple): + """Contains additional information of on_disconnect + + is_disconnect_packet_from_server: tell whether this on_disconnect call is the result + of receiving an DISCONNECT packet from the broker or if the on_disconnect is only + generated by the client library. + When true, the reason code is generated by the broker. + """ + is_disconnect_packet_from_server: bool @@ -293,8 +313,11 @@ def error_string(mqtt_errno: MQTTErrorCode) -> str: return "Unknown error." -def connack_string(connack_code: int) -> str: - """Return the string associated with a CONNACK result.""" +def connack_string(connack_code: int|ReasonCodes) -> str: + """Return the string associated with a CONNACK result or CONNACK reason code.""" + if isinstance(connack_code, ReasonCodes): + return str(connack_code) + if connack_code == CONNACK_ACCEPTED: return "Connection Accepted." elif connack_code == CONNACK_REFUSED_PROTOCOL_VERSION: @@ -311,6 +334,55 @@ def connack_string(connack_code: int) -> str: return "Connection Refused: unknown reason." +def convert_connack_rc_to_reason_code(connack_code: ConnackCode) -> ReasonCodes: + """Convert a MQTTv3 / MQTTv3.1.1 connack result to Reason code. + + Be careful that the numeric value isn't the same, for example: + >>> ConnackCode.CONNACK_REFUSED_SERVER_UNAVAILABLE == 3 + >>> convert_connack_rc_to_reason_code(ConnackCode.CONNACK_REFUSED_SERVER_UNAVAILABLE) == 136 + + It's recommended to compare by names + >>> code_to_test = ReasonCodes(PacketTypes.CONNACK, "Server unavailable") + >>> convert_connack_rc_to_reason_code(ConnackCode.CONNACK_REFUSED_SERVER_UNAVAILABLE) == code_to_test + """ + if connack_code == ConnackCode.CONNACK_ACCEPTED: + return ReasonCodes(PacketTypes.CONNACK, "Success") + if connack_code == ConnackCode.CONNACK_REFUSED_PROTOCOL_VERSION: + return ReasonCodes(PacketTypes.CONNACK, "Unsupported protocol version") + if connack_code == ConnackCode.CONNACK_REFUSED_IDENTIFIER_REJECTED: + return ReasonCodes(PacketTypes.CONNACK, "Client identifier not valid") + if connack_code == ConnackCode.CONNACK_REFUSED_SERVER_UNAVAILABLE: + return ReasonCodes(PacketTypes.CONNACK, "Server unavailable") + if connack_code == ConnackCode.CONNACK_REFUSED_BAD_USERNAME_PASSWORD: + return ReasonCodes(PacketTypes.CONNACK, "Bad user name or password") + if connack_code == ConnackCode.CONNACK_REFUSED_NOT_AUTHORIZED: + return ReasonCodes(PacketTypes.CONNACK, "Not authorized") + + return ReasonCodes(PacketTypes.CONNACK, "Unspecified error") + + +def convert_disconnect_error_code_to_reason_code(rc: MQTTErrorCode) -> ReasonCodes: + """Convert an MQTTErrorCode to Reason code. + + This is used in on_disconnect callback to have a consistent API. + + Be careful that the numeric value isn't the same, for example: + >>> MQTTErrorCode.MQTT_ERR_PROTOCOL == 2 + >>> convert_disconnect_error_code_to_reason_code(MQTTErrorCode.MQTT_ERR_PROTOCOL) == 130 + + It's recommended to compare by names + >>> code_to_test = ReasonCodes(PacketTypes.DISCONNECT, "Protocol error") + >>> convert_disconnect_error_code_to_reason_code(MQTTErrorCode.MQTT_ERR_PROTOCOL) == code_to_test + """ + if rc == MQTTErrorCode.MQTT_ERR_SUCCESS: + return ReasonCodes(PacketTypes.DISCONNECT, "Success") + if rc == MQTTErrorCode.MQTT_ERR_KEEPALIVE: + return ReasonCodes(PacketTypes.DISCONNECT, "Keep alive timeout") + if rc == MQTTErrorCode.MQTT_ERR_CONN_LOST: + return ReasonCodes(PacketTypes.DISCONNECT, "Unspecified error") + return ReasonCodes(PacketTypes.DISCONNECT, "Unspecified error") + + def base62( num: int, base: str = string.digits + string.ascii_letters, @@ -589,6 +661,7 @@ def on_connect(client, userdata, flags, rc): def __init__( self, + callback_api_version: CallbackAPIVersion, client_id: str = "", clean_session: bool | None = None, userdata: Any = None, @@ -597,7 +670,13 @@ def __init__( reconnect_on_failure: bool = True, manual_ack: bool = False, ) -> None: - """client_id is the unique client id string used when connecting to the + """ + callback_api_version define the API version for user-callback (on_connect, on_publish,...). + This field is required and it's recommended to use the latest version (CallbackAPIVersion.API_VERSION2). + See each callback for description of API for each version. The file migrations.md contains details on + how to migrate between version. + + client_id is the unique client id string used when connecting to the broker. If client_id is zero length or None, then the behaviour is defined by which protocol version is in use. If using MQTT v3.1.1, then a zero length client id will be sent to the broker and the broker will @@ -651,6 +730,22 @@ def __init__( self._keepalive = 60 self._connect_timeout = 5.0 self._client_mode = MQTT_CLIENT + self._callback_api_version = callback_api_version + + if self._callback_api_version == CallbackAPIVersion.VERSION1: + warnings.warn( + "Callback API version 1 is deprecated, update to latest version", + category=DeprecationWarning, + stacklevel=2, + ) + if isinstance(self._callback_api_version, str): + # Help user to migrate, it probably provided a client id + # as first arguments + raise ValueError( + "Unsupported callback API version: version 2.0 added a callback_api_version, see migrations.md for details" + ) + if self._callback_api_version not in CallbackAPIVersion: + raise ValueError("Unsupported callback API version") if protocol == MQTTv5: if clean_session is not None: @@ -1716,7 +1811,7 @@ def loop_read(self, max_packets: int = 1) -> MQTTErrorCode: return MQTTErrorCode.MQTT_ERR_NO_CONN rc = self._packet_read() if rc > 0: - return self._loop_rc_handle(rc) # type: ignore + return self._loop_rc_handle(rc) elif rc == MQTTErrorCode.MQTT_ERR_AGAIN: return MQTTErrorCode.MQTT_ERR_SUCCESS return MQTTErrorCode.MQTT_ERR_SUCCESS @@ -1739,7 +1834,7 @@ def loop_write(self, max_packets: int = 1) -> MQTTErrorCode: if rc == MQTTErrorCode.MQTT_ERR_AGAIN: return MQTTErrorCode.MQTT_ERR_SUCCESS elif rc > 0: - return self._loop_rc_handle(rc) # type: ignore + return self._loop_rc_handle(rc) else: return MQTTErrorCode.MQTT_ERR_SUCCESS finally: @@ -1780,7 +1875,10 @@ def loop_misc(self) -> MQTTErrorCode: else: rc = MQTTErrorCode.MQTT_ERR_KEEPALIVE - self._do_on_disconnect(rc) + self._do_on_disconnect( + packet_from_broker=False, + v1_rc=rc, + ) return MQTTErrorCode.MQTT_ERR_CONN_LOST @@ -1987,6 +2085,14 @@ def loop_stop(self, force: bool = False) -> MQTTErrorCode: return MQTTErrorCode.MQTT_ERR_SUCCESS + @property + def callback_api_version(self) -> CallbackAPIVersion: + """ + Return the callback API version used for user-callback. See docstring for + each user-callback (on_connect, on_publish, ...) for details. + """ + return self._callback_api_version + @property def on_log(self) -> CallbackOnLog | None: """If implemented, called when the client has log information. @@ -2028,7 +2134,7 @@ def on_pre_connect(self) -> CallbackOnPreConnect | None: def on_pre_connect(self, func: CallbackOnPreConnect | None) -> None: """ Define the pre_connect callback implementation. - Expected signature: + Expected signature (for all callback API version): connect_callback(client, userdata) client: the client instance for this callback @@ -2059,22 +2165,31 @@ def on_connect(self) -> CallbackOnConnect | None: def on_connect(self, func: CallbackOnConnect | None) -> None: """ Define the connect callback implementation. - Expected signature for MQTT v3.1 and v3.1.1 is: - connect_callback(client, userdata, flags, rc) + Expected signature for callback API version 2: + connect_callback(client, userdata, connect_flags, reason_code, properties) - and for MQTT v5.0: - connect_callback(client, userdata, flags, reasonCode, properties) + Expected signature for callback API version 1 change with MQTT protocol version: + * For MQTT v3.1 and v3.1.1 it's: + connect_callback(client, userdata, flags, rc) + + * For MQTT it's v5.0: + connect_callback(client, userdata, flags, reason_code, properties) - client: the client instance for this callback - userdata: the private user data as set in Client() or user_data_set() - flags: response flags sent by the broker - rc: the connection result - reasonCode: the MQTT v5.0 reason code: an instance of the ReasonCode class. - ReasonCode may be compared to integer. - properties: the MQTT v5.0 properties returned from the broker. An instance - of the Properties class. - For MQTT v3.1 and v3.1.1 properties is not provided but for compatibility - with MQTT v5.0, we recommend adding properties=None. + + client: the client instance for this callback + userdata: the private user data as set in Client() or user_data_set() + connect_flags: the flags for this connection, it has type ConnectFlags + reason_code: the connection reason code received from the broken. + In MQTT v5.0 it's the reason code defined by the standard. + In MQTT v3, we convert return code to a reason code, see + convert_connack_rc_to_reason_code(). + ReasonCode may be compared to integer. + properties: the MQTT v5.0 properties received from the broker. An instance + of the Properties class. + For MQTT v3.1 and v3.1.1 properties is not provided and an empty Properties + object is always used. + flags: response flags sent by the broker + rc: the connection result flags is a dict that contains response flags from the broker: flags['session present'] - this flag is useful for clients that are @@ -2117,8 +2232,8 @@ def on_connect_fail(self) -> CallbackOnConnectFail | None: def on_connect_fail(self, func: CallbackOnConnectFail | None) -> None: """ Define the connection failure callback implementation - Expected signature is: - on_connect_fail(client, userdata) + Expected signature is (for all callback_api_version): + connect_fail_callback(client, userdata) client: the client instance for this callback userdata: the private user data as set in Client() or user_data_set() @@ -2148,22 +2263,30 @@ def on_subscribe(self) -> CallbackOnSubscribe | None: def on_subscribe(self, func: CallbackOnSubscribe | None) -> None: """ Define the subscribe callback implementation. - Expected signature for MQTT v3.1.1 and v3.1 is: - subscribe_callback(client, userdata, mid, granted_qos) - - and for MQTT v5.0: - subscribe_callback(client, userdata, mid, reasonCodes, properties) - - client: the client instance for this callback - userdata: the private user data as set in Client() or user_data_set() - mid: matches the mid variable returned from the corresponding - subscribe() call. - granted_qos: list of integers that give the QoS level the broker has - granted for each of the different subscription requests. - reasonCodes: the MQTT v5.0 reason codes received from the broker for each - subscription. A list of ReasonCodes instances. - properties: the MQTT v5.0 properties received from the broker. A - list of Properties class instances. + Expected signature for callback API version 2: + subscribe_callback(client, userdata, mid, reason_code_list, properties) + + Expected signature for callback API version 1 change with MQTT protocol version: + * For MQTT v3.1 and v3.1.1 it's: + subscribe_callback(client, userdata, mid, granted_qos) + + * For MQTT v5.0 it's: + subscribe_callback(client, userdata, mid, reason_code_list, properties) + + client: the client instance for this callback + userdata: the private user data as set in Client() or user_data_set() + mid: matches the mid variable returned from the corresponding + subscribe() call. + reason_code_list: reason codes received from the broker for each subscription. + In MQTT v5.0 it's the reason code defined by the standard. + In MQTT v3, we convert granted QoS to a reason code. + It's a list of ReasonCodes instances. + properties: the MQTT v5.0 properties received from the broker. An instance + of the Properties class. + For MQTT v3.1 and v3.1.1 properties is not provided and an empty Properties + object is always used. + granted_qos: list of integers that give the QoS level the broker has + granted for each of the different subscription requests. Decorator: @client.subscribe_callback() (```client``` is the name of the instance which this callback is being attached to) @@ -2193,8 +2316,8 @@ def on_message(self) -> CallbackOnMessage | None: def on_message(self, func: CallbackOnMessage | None) -> None: """ Define the message received callback implementation. - Expected signature is: - on_message_callback(client, userdata, message) + Expected signature is (for all callback API version): + message_callback(client, userdata, message) client: the client instance for this callback userdata: the private user data as set in Client() or user_data_set() @@ -2232,13 +2355,28 @@ def on_publish(self) -> CallbackOnPublish | None: def on_publish(self, func: CallbackOnPublish | None) -> None: """ Define the published message callback implementation. - Expected signature is: - on_publish_callback(client, userdata, mid) - - client: the client instance for this callback - userdata: the private user data as set in Client() or user_data_set() - mid: matches the mid variable returned from the corresponding - publish() call, to allow outgoing messages to be tracked. + Expected signature for callback API version 2: + publish_callback(client, userdata, mid, reason_code, properties) + + Expected signature for callback API version 1: + publish_callback(client, userdata, mid) + + client: the client instance for this callback + userdata: the private user data as set in Client() or user_data_set() + mid: matches the mid variable returned from the corresponding + publish() call, to allow outgoing messages to be tracked. + reason_code: the connection reason code received from the broken. + In MQTT v5.0 it's the reason code defined by the standard. + In MQTT v3 it's always the reason code Success + properties: the MQTT v5.0 properties received from the broker. An instance + of the Properties class. + For MQTT v3.1 and v3.1.1 properties is not provided and an empty Properties + object is always used. + + Note: for QoS = 0, the reason_code and the properties don't really exist, it's the client + library that generate them. It's always an empty properties and a success reason code. + Because the (MQTTv5) standard don't have reason code for PUBLISH packet, the library create them + at PUBACK packet, as if the message was sent with QoS = 1. Decorator: @client.publish_callback() (```client``` is the name of the instance which this callback is being attached to) @@ -2265,20 +2403,31 @@ def on_unsubscribe(self) -> CallbackOnUnsubscribe | None: def on_unsubscribe(self, func: CallbackOnUnsubscribe | None) -> None: """ Define the unsubscribe callback implementation. - Expected signature for MQTT v3.1.1 and v3.1 is: - unsubscribe_callback(client, userdata, mid) - - and for MQTT v5.0: - unsubscribe_callback(client, userdata, mid, properties, reasonCodes) - - client: the client instance for this callback - userdata: the private user data as set in Client() or user_data_set() - mid: matches the mid variable returned from the corresponding - unsubscribe() call. - properties: the MQTT v5.0 properties received from the broker. A - list of Properties class instances. - reasonCodes: the MQTT v5.0 reason codes received from the broker for each - unsubscribe topic. A list of ReasonCodes instances + Expected signature for callback API version 2: + unsubscribe_callback(client, userdata, mid, reason_code_list, properties) + + Expected signature for callback API version 1 change with MQTT protocol version: + * For MQTT v3.1 and v3.1.1 it's: + unsubscribe_callback(client, userdata, mid) + + * For MQTT v5.0 it's: + unsubscribe_callback(client, userdata, mid, properties, v1_reason_codes) + + client: the client instance for this callback + userdata: the private user data as set in Client() or user_data_set() + mid: matches the mid variable returned from the corresponding + unsubscribe() call. + reason_code_list: reason codes received from the broker for each unsubscription. + In MQTT v5.0 it's the reason code defined by the standard. + In MQTT v3, there is not equivalent from broken and empty list + is always used. + properties: the MQTT v5.0 properties received from the broker. An instance + of the Properties class. + For MQTT v3.1 and v3.1.1 properties is not provided and an empty Properties + object is always used. + v1_reason_codes: the MQTT v5.0 reason codes received from the broker for each + unsubscribe topic. A list of ReasonCodes instances OR a single + ReasonCodes when we unsubscribe from a single topic. Decorator: @client.unsubscribe_callback() (```client``` is the name of the instance which this callback is being attached to) @@ -2304,19 +2453,33 @@ def on_disconnect(self) -> CallbackOnDisconnect | None: def on_disconnect(self, func: CallbackOnDisconnect | None) -> None: """ Define the disconnect callback implementation. - Expected signature for MQTT v3.1.1 and v3.1 is: - disconnect_callback(client, userdata, rc) - - and for MQTT v5.0: - disconnect_callback(client, userdata, reasonCode, properties) - - client: the client instance for this callback - userdata: the private user data as set in Client() or user_data_set() - rc: the disconnection result - The rc parameter indicates the disconnection state. If - MQTT_ERR_SUCCESS (0), the callback was called in response to - a disconnect() call. If any other value the disconnection - was unexpected, such as might be caused by a network error. + Expected signature for callback API version 2: + disconnect_callback(client, userdata, disconnect_flags, reason_code, properties) + + Expected signature for callback API version 1 change with MQTT protocol version: + * For MQTT v3.1 and v3.1.1 it's: + disconnect_callback(client, userdata, rc) + + * For MQTT it's v5.0: + disconnect_callback(client, userdata, reason_code, properties) + + client: the client instance for this callback + userdata: the private user data as set in Client() or user_data_set() + disconnect_flags: the flags for this disconnection, it has type DisconnectFlags + reason_code: the disconnection reason code possibly received from the broker (see flags). + In MQTT v5.0 it's the reason code defined by the standard. + In MQTT v3 it's never received from the broker, we convert an MQTTErrorCode, + see convert_disconnect_error_code_to_reason_code(). + ReasonCode may be compared to integer. + properties: the MQTT v5.0 properties received from the broker. An instance + of the Properties class. + For MQTT v3.1 and v3.1.1 properties is not provided and an empty Properties + object is always used. + rc: the disconnection result + The rc parameter indicates the disconnection state. If + MQTT_ERR_SUCCESS (0), the callback was called in response to + a disconnect() call. If any other value the disconnection + was unexpected, such as might be caused by a network error. Decorator: @client.disconnect_callback() (```client``` is the name of the instance which this callback is being attached to) @@ -2344,7 +2507,7 @@ def on_socket_open(self, func: CallbackOnSocket | None) -> None: This should be used to register the socket to an external event loop for reading. - Expected signature is: + Expected signature is (for all callback API version): socket_open_callback(client, userdata, socket) client: the client instance for this callback @@ -2391,7 +2554,7 @@ def on_socket_close(self, func: CallbackOnSocket | None) -> None: This should be used to unregister the socket from an external event loop for reading. - Expected signature is: + Expected signature is (for all callback API version): socket_close_callback(client, userdata, socket) client: the client instance for this callback @@ -2438,7 +2601,7 @@ def on_socket_register_write(self, func: CallbackOnSocket | None) -> None: This should be used to register the socket with an external event loop for writing. - Expected signature is: + Expected signature is (for all callback API version): socket_register_write_callback(client, userdata, socket) client: the client instance for this callback @@ -2492,7 +2655,7 @@ def on_socket_unregister_write( This should be used to unregister the socket from an external event loop for writing. - Expected signature is: + Expected signature is (for all callback API version): socket_unregister_write_callback(client, userdata, socket) client: the client instance for this callback @@ -2579,16 +2742,15 @@ def message_callback_remove(self, sub: str) -> None: def _loop_rc_handle( self, - rc: MQTTErrorCode | ReasonCodes | None, - properties: Properties | None = None, - ) -> MQTTErrorCode | ReasonCodes | None: + rc: MQTTErrorCode, + ) -> MQTTErrorCode: if rc: self._sock_close() if self._state == mqtt_cs_disconnecting: rc = MQTTErrorCode.MQTT_ERR_SUCCESS - self._do_on_disconnect(rc, properties) + self._do_on_disconnect(packet_from_broker=False, v1_rc=rc) return rc @@ -2730,12 +2892,24 @@ def _packet_write(self) -> MQTTErrorCode: on_publish = self.on_publish if on_publish: - # For now only v1 is implemented - on_publish = cast(CallbackOnPublish_v1, on_publish) with self._in_callback_mutex: try: - on_publish( - self, self._userdata, packet['mid']) + if self._callback_api_version == CallbackAPIVersion.VERSION1: + on_publish = cast(CallbackOnPublish_v1, on_publish) + + on_publish(self, self._userdata, packet["mid"]) + elif self._callback_api_version == CallbackAPIVersion.VERSION2: + on_publish = cast(CallbackOnPublish_v2, on_publish) + + on_publish( + self, + self._userdata, + packet["mid"], + ReasonCodes(PacketTypes.PUBACK), + Properties(PacketTypes.PUBACK), + ) + else: + raise RuntimeError("Unsupported callback API version") except Exception as err: self._easy_log( MQTT_LOG_ERR, 'Caught exception in on_publish: %s', err) @@ -2752,7 +2926,10 @@ def _packet_write(self) -> MQTTErrorCode: with self._msgtime_mutex: self._last_msg_out = time_func() - self._do_on_disconnect(MQTTErrorCode.MQTT_ERR_SUCCESS) + self._do_on_disconnect( + packet_from_broker=False, + v1_rc=MQTTErrorCode.MQTT_ERR_SUCCESS, + ) self._sock_close() return MQTTErrorCode.MQTT_ERR_SUCCESS @@ -2795,7 +2972,10 @@ def _check_keepalive(self) -> None: self._send_pingreq() except Exception: self._sock_close() - self._do_on_disconnect(MQTT_ERR_CONN_LOST) + self._do_on_disconnect( + packet_from_broker=False, + v1_rc=MQTTErrorCode.MQTT_ERR_CONN_LOST, + ) else: with self._msgtime_mutex: self._last_msg_out = now @@ -2808,7 +2988,10 @@ def _check_keepalive(self) -> None: else: rc = MQTTErrorCode.MQTT_ERR_KEEPALIVE - self._do_on_disconnect(rc) + self._do_on_disconnect( + packet_from_broker=False, + v1_rc=rc, + ) def _mid_generate(self) -> int: with self._mid_generate_mutex: @@ -3376,6 +3559,8 @@ def _handle_connack(self) -> MQTTErrorCode: properties.unpack(self._in_packet['packet'][2:]) else: (flags, result) = struct.unpack("!BB", self._in_packet['packet']) + reason = convert_connack_rc_to_reason_code(result) + properties = None if self._protocol == MQTTv311: if result == CONNACK_REFUSED_PROTOCOL_VERSION: if not self._reconnect_on_failure: @@ -3422,16 +3607,36 @@ def _handle_connack(self) -> MQTTErrorCode: flags_dict['session present'] = flags & 0x01 with self._in_callback_mutex: try: - if self._protocol == MQTTv5: - on_connect = cast(CallbackOnConnect_v1_mqtt5, on_connect) + if self._callback_api_version == CallbackAPIVersion.VERSION1: + if self._protocol == MQTTv5: + on_connect = cast(CallbackOnConnect_v1_mqtt5, on_connect) - on_connect(self, self._userdata, - flags_dict, reason, properties) - else: - on_connect = cast(CallbackOnConnect_v1_mqtt3, on_connect) + on_connect(self, self._userdata, + flags_dict, reason, properties) + else: + on_connect = cast(CallbackOnConnect_v1_mqtt3, on_connect) + + on_connect( + self, self._userdata, flags_dict, result) + elif self._callback_api_version == CallbackAPIVersion.VERSION2: + on_connect = cast(CallbackOnConnect_v2, on_connect) + + connect_flags = ConnectFlags( + session_present=flags_dict['session present'] > 0 + ) + + if properties is None: + properties = Properties(PacketTypes.CONNACK) on_connect( - self, self._userdata, flags_dict, result) + self, + self._userdata, + connect_flags, + reason, + properties, + ) + else: + raise RuntimeError("Unsupported callback API version") except Exception as err: self._easy_log( MQTT_LOG_ERR, 'Caught exception in on_connect: %s', err) @@ -3522,7 +3727,13 @@ def _handle_disconnect(self) -> None: properties ) - self._loop_rc_handle(reasonCode, properties) + self._sock_close() + self._do_on_disconnect( + packet_from_broker=True, + v1_rc=MQTTErrorCode.MQTT_ERR_SUCCESS, # If reason is absent (remaining length < 1), it means normal disconnection + reason=reasonCode, + properties=properties, + ) def _handle_suback(self) -> None: self._easy_log(MQTT_LOG_DEBUG, "Received SUBACK") @@ -3532,13 +3743,12 @@ def _handle_suback(self) -> None: if self._protocol == MQTTv5: properties = Properties(SUBACK >> 4) props, props_len = properties.unpack(packet) - reasoncodes = [ - ReasonCodes(SUBACK >> 4, identifier=c) - for c in packet[props_len:] - ] + reasoncodes = [ReasonCodes(SUBACK >> 4, identifier=c) for c in packet[props_len:]] else: pack_format = f"!{'B' * len(packet)}" granted_qos = struct.unpack(pack_format, packet) + reasoncodes = [ReasonCodes(SUBACK >> 4, identifier=c) for c in granted_qos] + properties = Properties(SUBACK >> 4) with self._callback_mutex: on_subscribe = self.on_subscribe @@ -3546,16 +3756,29 @@ def _handle_suback(self) -> None: if on_subscribe: with self._in_callback_mutex: # Don't call loop_write after _send_publish() try: - if self._protocol == MQTTv5: - on_subscribe = cast(CallbackOnSubscribe_v1_mqtt5, on_subscribe) + if self._callback_api_version == CallbackAPIVersion.VERSION1: + if self._protocol == MQTTv5: + on_subscribe = cast(CallbackOnSubscribe_v1_mqtt5, on_subscribe) - on_subscribe( - self, self._userdata, mid, reasoncodes, properties) - else: - on_subscribe = cast(CallbackOnSubscribe_v1_mqtt3, on_subscribe) + on_subscribe( + self, self._userdata, mid, reasoncodes, properties) + else: + on_subscribe = cast(CallbackOnSubscribe_v1_mqtt3, on_subscribe) + + on_subscribe( + self, self._userdata, mid, granted_qos) + elif self._callback_api_version == CallbackAPIVersion.VERSION2: + on_subscribe = cast(CallbackOnSubscribe_v2, on_subscribe) on_subscribe( - self, self._userdata, mid, granted_qos) + self, + self._userdata, + mid, + reasoncodes, + properties, + ) + else: + raise RuntimeError("Unsupported callback API version") except Exception as err: self._easy_log( MQTT_LOG_ERR, 'Caught exception in on_subscribe: %s', err) @@ -3760,10 +3983,9 @@ def _handle_unsuback(self) -> MQTTErrorCode: ReasonCodes(UNSUBACK >> 4, identifier=c) for c in packet[props_len:] ] - - reasoncodes: ReasonCodes | list[ReasonCodes] = reasoncodes_list - if len(reasoncodes_list) == 1: - reasoncodes = reasoncodes_list[0] + else: + reasoncodes_list = [] + properties = Properties(UNSUBACK >> 4) self._easy_log(MQTT_LOG_DEBUG, "Received UNSUBACK (Mid: %d)", mid) with self._callback_mutex: @@ -3772,15 +3994,35 @@ def _handle_unsuback(self) -> MQTTErrorCode: if on_unsubscribe: with self._in_callback_mutex: try: - if self._protocol == MQTTv5: - on_unsubscribe = cast(CallbackOnUnsubscribe_v1_mqtt5, on_unsubscribe) + if self._callback_api_version == CallbackAPIVersion.VERSION1: + if self._protocol == MQTTv5: + on_unsubscribe = cast(CallbackOnUnsubscribe_v1_mqtt5, on_unsubscribe) + + reasoncodes: ReasonCodes | list[ReasonCodes] = reasoncodes_list + if len(reasoncodes_list) == 1: + reasoncodes = reasoncodes_list[0] + + on_unsubscribe( + self, self._userdata, mid, properties, reasoncodes) + else: + on_unsubscribe = cast(CallbackOnUnsubscribe_v1_mqtt3, on_unsubscribe) + + on_unsubscribe(self, self._userdata, mid) + elif self._callback_api_version == CallbackAPIVersion.VERSION2: + on_unsubscribe = cast(CallbackOnUnsubscribe_v2, on_unsubscribe) + + if properties is None: + properties = Properties(PacketTypes.CONNACK) on_unsubscribe( - self, self._userdata, mid, properties, reasoncodes) + self, + self._userdata, + mid, + reasoncodes_list, + properties, + ) else: - on_unsubscribe = cast(CallbackOnUnsubscribe_v1_mqtt3, on_unsubscribe) - - on_unsubscribe(self, self._userdata, mid) + raise RuntimeError("Unsupported callback API version") except Exception as err: self._easy_log( MQTT_LOG_ERR, 'Caught exception in on_unsubscribe: %s', err) @@ -3791,7 +4033,9 @@ def _handle_unsuback(self) -> MQTTErrorCode: def _do_on_disconnect( self, - rc: MQTTErrorCode | ReasonCodes, + packet_from_broker: bool, + v1_rc: MQTTErrorCode, + reason: ReasonCodes | None = None, properties: Properties | None = None, ) -> None: with self._callback_mutex: @@ -3800,34 +4044,69 @@ def _do_on_disconnect( if on_disconnect: with self._in_callback_mutex: try: - if self._protocol == MQTTv5: - on_disconnect = cast(CallbackOnDisconnect_v1_mqtt5, on_disconnect) + if self._callback_api_version == CallbackAPIVersion.VERSION1: + if self._protocol == MQTTv5: + on_disconnect = cast(CallbackOnDisconnect_v1_mqtt5, on_disconnect) + + if packet_from_broker: + on_disconnect(self, self._userdata, reason, properties) + else: + on_disconnect(self, self._userdata, v1_rc, None) + else: + on_disconnect = cast(CallbackOnDisconnect_v1_mqtt3, on_disconnect) + + on_disconnect(self, self._userdata, v1_rc) + elif self._callback_api_version == CallbackAPIVersion.VERSION2: + on_disconnect = cast(CallbackOnDisconnect_v2, on_disconnect) + + disconnect_flags = DisconnectFlags( + is_disconnect_packet_from_server=packet_from_broker + ) + + if reason is None: + reason = convert_disconnect_error_code_to_reason_code(v1_rc) + + if properties is None: + properties = Properties(PacketTypes.DISCONNECT) on_disconnect( - self, self._userdata, rc, properties) + self, + self._userdata, + disconnect_flags, + reason, + properties, + ) else: - on_disconnect = cast(CallbackOnDisconnect_v1_mqtt3, on_disconnect) - # rc could be a ReasonCode only in handle_disconnect with MQTTv5 - rc = cast(MQTTErrorCode, rc) - - on_disconnect(self, self._userdata, rc) + raise RuntimeError("Unsupported callback API version") except Exception as err: self._easy_log( MQTT_LOG_ERR, 'Caught exception in on_disconnect: %s', err) if not self.suppress_exceptions: raise - def _do_on_publish(self, mid: int) -> MQTTErrorCode: + def _do_on_publish(self, mid: int, reason_code: ReasonCodes, properties: Properties) -> MQTTErrorCode: with self._callback_mutex: on_publish = self.on_publish if on_publish: - # For now only v1 is implemented - on_publish = cast(CallbackOnPublish_v1, on_publish) - with self._in_callback_mutex: try: - on_publish(self, self._userdata, mid) + if self._callback_api_version == CallbackAPIVersion.VERSION1: + on_publish = cast(CallbackOnPublish_v1, on_publish) + + on_publish(self, self._userdata, mid) + elif self._callback_api_version == CallbackAPIVersion.VERSION2: + on_publish = cast(CallbackOnPublish_v2, on_publish) + + on_publish( + self, + self._userdata, + mid, + reason_code, + properties, + ) + else: + raise RuntimeError("Unsupported callback API version") except Exception as err: self._easy_log( MQTT_LOG_ERR, 'Caught exception in on_publish: %s', err) @@ -3856,12 +4135,12 @@ def _handle_pubackcomp( packet_type_enum = PUBACK if cmd == "PUBACK" else PUBCOMP packet_type = packet_type_enum.value >> 4 mid, = struct.unpack("!H", self._in_packet['packet'][:2]) + reasonCode = ReasonCodes(packet_type) + properties = Properties(packet_type) if self._protocol == MQTTv5: if self._in_packet['remaining_length'] > 2: - reasonCode = ReasonCodes(packet_type) reasonCode.unpack(self._in_packet['packet'][2:]) if self._in_packet['remaining_length'] > 3: - properties = Properties(packet_type) props, props_len = properties.unpack( self._in_packet['packet'][3:]) self._easy_log(MQTT_LOG_DEBUG, "Received %s (Mid: %d)", cmd, mid) @@ -3869,7 +4148,7 @@ def _handle_pubackcomp( with self._out_message_mutex: if mid in self._out_messages: # Only inform the client the message has been sent once. - rc = self._do_on_publish(mid) + rc = self._do_on_publish(mid, reasonCode, properties) return rc return MQTTErrorCode.MQTT_ERR_SUCCESS diff --git a/src/paho/mqtt/enums.py b/src/paho/mqtt/enums.py index 33f18cb6..77fefa21 100644 --- a/src/paho/mqtt/enums.py +++ b/src/paho/mqtt/enums.py @@ -28,6 +28,11 @@ class MQTTProtocolVersion(enum.IntEnum): MQTTv5 = 5 +class CallbackAPIVersion(enum.Enum): + VERSION1 = 1 + VERSION2 = 2 + + class MessageType(enum.IntEnum): CONNECT = 0x10 CONNACK = 0x20 diff --git a/src/paho/mqtt/publish.py b/src/paho/mqtt/publish.py index 0e68a82a..260a4800 100644 --- a/src/paho/mqtt/publish.py +++ b/src/paho/mqtt/publish.py @@ -24,6 +24,10 @@ from collections.abc import Iterable from typing import TYPE_CHECKING, Any, List, Tuple, Union +from paho.mqtt.enums import CallbackAPIVersion +from paho.mqtt.properties import Properties +from paho.mqtt.reasoncodes import ReasonCodes + from .. import mqtt from . import client as paho @@ -72,22 +76,17 @@ def _do_publish(client: paho.Client): raise TypeError('message must be a dict, tuple, or list') -def _on_connect(client, userdata, flags, rc): - """Internal callback""" - #pylint: disable=invalid-name, unused-argument - - if rc == 0: +def _on_connect(client: paho.Client, userdata: MessagesList, flags, reason_code, properties): + """Internal v5 callback""" + if reason_code == 0: if len(userdata) > 0: _do_publish(client) else: - raise mqtt.MQTTException(paho.connack_string(rc)) + raise mqtt.MQTTException(paho.connack_string(reason_code)) -def _on_connect_v5(client: paho.Client, userdata: MessagesList, flags, rc, properties): - """Internal v5 callback""" - _on_connect(client, userdata, flags, rc) def _on_publish( - client: paho.Client, userdata: collections.deque[MessagesList], mid: int + client: paho.Client, userdata: collections.deque[MessagesList], mid: int, reason_codes: ReasonCodes, properties: Properties, ) -> None: """Internal callback""" #pylint: disable=unused-argument @@ -176,15 +175,16 @@ def multiple( if not isinstance(msgs, Iterable): raise TypeError('msgs must be an iterable') - - client = paho.Client(client_id=client_id, userdata=collections.deque(msgs), - protocol=protocol, transport=transport) + client = paho.Client( + CallbackAPIVersion.VERSION2, + client_id=client_id, + userdata=collections.deque(msgs), + protocol=protocol, + transport=transport, + ) client.on_publish = _on_publish - if protocol == mqtt.client.MQTTv5: - client.on_connect = _on_connect_v5 # type: ignore - else: - client.on_connect = _on_connect # type: ignore + client.on_connect = _on_connect # type: ignore if proxy_args is not None: client.proxy_set(**proxy_args) diff --git a/src/paho/mqtt/subscribe.py b/src/paho/mqtt/subscribe.py index 076e35f2..c90cd7d5 100644 --- a/src/paho/mqtt/subscribe.py +++ b/src/paho/mqtt/subscribe.py @@ -23,10 +23,10 @@ from . import client as paho -def _on_connect_v5(client, userdata, flags, rc, properties): +def _on_connect(client, userdata, flags, reason_code, properties): """Internal callback""" - if rc != 0: - raise mqtt.MQTTException(paho.connack_string(rc)) + if reason_code != 0: + raise mqtt.MQTTException(paho.connack_string(reason_code)) if isinstance(userdata['topics'], list): for topic in userdata['topics']: @@ -34,10 +34,6 @@ def _on_connect_v5(client, userdata, flags, rc, properties): else: client.subscribe(userdata['topics'], userdata['qos']) -def _on_connect(client, userdata, flags, rc): - """Internal v5 callback""" - _on_connect_v5(client, userdata, flags, rc, None) - def _on_message_callback(client, userdata, message): """Internal callback""" @@ -142,14 +138,17 @@ def callback(callback, topics, qos=0, userdata=None, hostname="localhost", 'qos':qos, 'userdata':userdata} - client = paho.Client(client_id=client_id, userdata=callback_userdata, - protocol=protocol, transport=transport, - clean_session=clean_session) + client = paho.Client( + paho.CallbackAPIVersion.VERSION2, + client_id=client_id, + userdata=callback_userdata, + protocol=protocol, + transport=transport, + clean_session=clean_session, + ) + client.on_message = _on_message_callback - if protocol == mqtt.client.MQTTv5: - client.on_connect = _on_connect_v5 - else: - client.on_connect = _on_connect + client.on_connect = _on_connect if proxy_args is not None: client.proxy_set(**proxy_args) diff --git a/tests/lib/clients/01-asyncio.py b/tests/lib/clients/01-asyncio.py index eeab4433..a8ba9280 100644 --- a/tests/lib/clients/01-asyncio.py +++ b/tests/lib/clients/01-asyncio.py @@ -70,7 +70,7 @@ def on_disconnect(client, userdata, rc): disconnected = loop.create_future() - client = mqtt.Client(client_id=client_id) + client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION1, client_id=client_id) client.on_connect = on_connect client.on_message = on_message client.on_publish = on_publish diff --git a/tests/lib/clients/01-decorators.py b/tests/lib/clients/01-decorators.py index 7e3633a2..55e6d485 100644 --- a/tests/lib/clients/01-decorators.py +++ b/tests/lib/clients/01-decorators.py @@ -2,7 +2,7 @@ from tests.paho_test import get_test_server_port, loop_until_keyboard_interrupt -mqttc = mqtt.Client("decorators-test", clean_session=True) +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "decorators-test", clean_session=True) payload = b"" diff --git a/tests/lib/clients/01-keepalive-pingreq.py b/tests/lib/clients/01-keepalive-pingreq.py index e6ab8545..ffa6383b 100644 --- a/tests/lib/clients/01-keepalive-pingreq.py +++ b/tests/lib/clients/01-keepalive-pingreq.py @@ -7,7 +7,7 @@ def on_connect(mqttc, obj, flags, rc): assert rc == 0, f"Connect failed ({rc})" -mqttc = mqtt.Client("01-keepalive-pingreq") +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "01-keepalive-pingreq") mqttc.on_connect = on_connect mqttc.connect("localhost", get_test_server_port(), keepalive=4) diff --git a/tests/lib/clients/01-no-clean-session.py b/tests/lib/clients/01-no-clean-session.py index a620ef99..62966130 100644 --- a/tests/lib/clients/01-no-clean-session.py +++ b/tests/lib/clients/01-no-clean-session.py @@ -2,7 +2,7 @@ from tests.paho_test import get_test_server_port, loop_until_keyboard_interrupt -mqttc = mqtt.Client("01-no-clean-session", clean_session=False) +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "01-no-clean-session", clean_session=False) mqttc.connect("localhost", get_test_server_port()) loop_until_keyboard_interrupt(mqttc) diff --git a/tests/lib/clients/01-reconnect-on-failure.py b/tests/lib/clients/01-reconnect-on-failure.py index afbd3b77..907a2f18 100644 --- a/tests/lib/clients/01-reconnect-on-failure.py +++ b/tests/lib/clients/01-reconnect-on-failure.py @@ -7,7 +7,7 @@ def on_connect(mqttc, obj, flags, rc): mqttc.publish("reconnect/test", "message") -mqttc = mqtt.Client("01-reconnect-on-failure", reconnect_on_failure=False) +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "01-reconnect-on-failure", reconnect_on_failure=False) mqttc.on_connect = on_connect with wait_for_keyboard_interrupt(): diff --git a/tests/lib/clients/01-unpwd-empty-password-set.py b/tests/lib/clients/01-unpwd-empty-password-set.py index c1115800..40a1a434 100644 --- a/tests/lib/clients/01-unpwd-empty-password-set.py +++ b/tests/lib/clients/01-unpwd-empty-password-set.py @@ -2,7 +2,7 @@ from tests.paho_test import get_test_server_port, loop_until_keyboard_interrupt -mqttc = mqtt.Client("01-unpwd-set") +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "01-unpwd-set") mqttc.username_pw_set("uname", "") mqttc.connect("localhost", get_test_server_port()) diff --git a/tests/lib/clients/01-unpwd-empty-set.py b/tests/lib/clients/01-unpwd-empty-set.py index 300c18a1..b65d6797 100644 --- a/tests/lib/clients/01-unpwd-empty-set.py +++ b/tests/lib/clients/01-unpwd-empty-set.py @@ -2,7 +2,7 @@ from tests.paho_test import get_test_server_port, loop_until_keyboard_interrupt -mqttc = mqtt.Client("01-unpwd-set") +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "01-unpwd-set") mqttc.username_pw_set("", "") mqttc.connect("localhost", get_test_server_port()) diff --git a/tests/lib/clients/01-unpwd-set.py b/tests/lib/clients/01-unpwd-set.py index a872a28c..763297f5 100644 --- a/tests/lib/clients/01-unpwd-set.py +++ b/tests/lib/clients/01-unpwd-set.py @@ -2,7 +2,7 @@ from tests.paho_test import get_test_server_port, loop_until_keyboard_interrupt -mqttc = mqtt.Client("01-unpwd-set") +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "01-unpwd-set") mqttc.username_pw_set("uname", ";'[08gn=#") mqttc.connect("localhost", get_test_server_port()) diff --git a/tests/lib/clients/01-unpwd-unicode-set.py b/tests/lib/clients/01-unpwd-unicode-set.py index b0d12f9d..58445417 100644 --- a/tests/lib/clients/01-unpwd-unicode-set.py +++ b/tests/lib/clients/01-unpwd-unicode-set.py @@ -3,7 +3,7 @@ from tests.paho_test import get_test_server_port, loop_until_keyboard_interrupt -mqttc = mqtt.Client("01-unpwd-unicode-set") +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "01-unpwd-unicode-set") username = "\u00fas\u00e9rn\u00e1m\u00e9-h\u00e9ll\u00f3" password = "h\u00e9ll\u00f3" diff --git a/tests/lib/clients/01-will-set.py b/tests/lib/clients/01-will-set.py index b273da32..c3310a49 100644 --- a/tests/lib/clients/01-will-set.py +++ b/tests/lib/clients/01-will-set.py @@ -2,7 +2,7 @@ from tests.paho_test import get_test_server_port, loop_until_keyboard_interrupt -mqttc = mqtt.Client("01-will-set") +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "01-will-set") mqttc.will_set("topic/on/unexpected/disconnect", "will message", 1, True) mqttc.connect("localhost", get_test_server_port()) diff --git a/tests/lib/clients/01-will-unpwd-set.py b/tests/lib/clients/01-will-unpwd-set.py index 3510716e..31bb976c 100644 --- a/tests/lib/clients/01-will-unpwd-set.py +++ b/tests/lib/clients/01-will-unpwd-set.py @@ -2,7 +2,7 @@ from tests.paho_test import get_test_server_port, loop_until_keyboard_interrupt -mqttc = mqtt.Client("01-will-unpwd-set") +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "01-will-unpwd-set") mqttc.username_pw_set("oibvvwqw", "#'^2hg9a&nm38*us") mqttc.will_set("will-topic", "will message", 2, False) diff --git a/tests/lib/clients/01-zero-length-clientid.py b/tests/lib/clients/01-zero-length-clientid.py index fcd2d446..992efedc 100644 --- a/tests/lib/clients/01-zero-length-clientid.py +++ b/tests/lib/clients/01-zero-length-clientid.py @@ -12,7 +12,7 @@ def on_disconnect(mqttc, obj, rc): mqttc.loop() -mqttc = mqtt.Client("", clean_session=True, protocol=mqtt.MQTTv311) +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "", clean_session=True, protocol=mqtt.MQTTv311) mqttc.on_connect = on_connect mqttc.on_disconnect = on_disconnect diff --git a/tests/lib/clients/02-subscribe-qos0.py b/tests/lib/clients/02-subscribe-qos0.py index ee602ca4..2444ffb0 100644 --- a/tests/lib/clients/02-subscribe-qos0.py +++ b/tests/lib/clients/02-subscribe-qos0.py @@ -12,7 +12,7 @@ def on_subscribe(mqttc, obj, mid, granted_qos): mqttc.disconnect() -mqttc = mqtt.Client("subscribe-qos0-test", clean_session=True) +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "subscribe-qos0-test", clean_session=True) mqttc.on_connect = on_connect mqttc.on_subscribe = on_subscribe diff --git a/tests/lib/clients/02-subscribe-qos1.py b/tests/lib/clients/02-subscribe-qos1.py index 9bbe044a..2079de6e 100644 --- a/tests/lib/clients/02-subscribe-qos1.py +++ b/tests/lib/clients/02-subscribe-qos1.py @@ -12,7 +12,7 @@ def on_subscribe(mqttc, obj, mid, granted_qos): mqttc.disconnect() -mqttc = mqtt.Client("subscribe-qos1-test", clean_session=True) +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "subscribe-qos1-test", clean_session=True) mqttc.on_connect = on_connect mqttc.on_subscribe = on_subscribe diff --git a/tests/lib/clients/02-subscribe-qos2.py b/tests/lib/clients/02-subscribe-qos2.py index 678bf17c..4776a769 100644 --- a/tests/lib/clients/02-subscribe-qos2.py +++ b/tests/lib/clients/02-subscribe-qos2.py @@ -12,7 +12,7 @@ def on_subscribe(mqttc, obj, mid, granted_qos): mqttc.disconnect() -mqttc = mqtt.Client("subscribe-qos2-test", clean_session=True) +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "subscribe-qos2-test", clean_session=True) mqttc.on_connect = on_connect mqttc.on_subscribe = on_subscribe diff --git a/tests/lib/clients/02-unsubscribe.py b/tests/lib/clients/02-unsubscribe.py index b3998dc4..de36bfb6 100644 --- a/tests/lib/clients/02-unsubscribe.py +++ b/tests/lib/clients/02-unsubscribe.py @@ -12,7 +12,7 @@ def on_unsubscribe(mqttc, obj, mid): mqttc.disconnect() -mqttc = mqtt.Client("unsubscribe-test", clean_session=True) +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "unsubscribe-test", clean_session=True) mqttc.on_connect = on_connect mqttc.on_unsubscribe = on_unsubscribe diff --git a/tests/lib/clients/03-publish-b2c-qos1.py b/tests/lib/clients/03-publish-b2c-qos1.py index 9efc928d..81beef96 100644 --- a/tests/lib/clients/03-publish-b2c-qos1.py +++ b/tests/lib/clients/03-publish-b2c-qos1.py @@ -17,7 +17,7 @@ def on_connect(mqttc, obj, flags, rc): assert rc == 0, f"Connect failed ({rc})" -mqttc = mqtt.Client("publish-qos1-test") +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "publish-qos1-test") mqttc.on_connect = on_connect mqttc.on_message = on_message diff --git a/tests/lib/clients/03-publish-b2c-qos2.py b/tests/lib/clients/03-publish-b2c-qos2.py index 246cb097..96e01d5e 100644 --- a/tests/lib/clients/03-publish-b2c-qos2.py +++ b/tests/lib/clients/03-publish-b2c-qos2.py @@ -17,7 +17,7 @@ def on_connect(mqttc, obj, flags, rc): assert rc == 0, f"Connect failed ({rc})" -mqttc = mqtt.Client("publish-qos2-test", clean_session=True) +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "publish-qos2-test", clean_session=True) mqttc.on_connect = on_connect mqttc.on_message = on_message diff --git a/tests/lib/clients/03-publish-c2b-qos1-disconnect.py b/tests/lib/clients/03-publish-c2b-qos1-disconnect.py index 17a0df3d..52fa5167 100644 --- a/tests/lib/clients/03-publish-c2b-qos1-disconnect.py +++ b/tests/lib/clients/03-publish-c2b-qos1-disconnect.py @@ -24,7 +24,7 @@ def on_publish(mqttc, obj, mid): mqttc.disconnect() -mqttc = mqtt.Client("publish-qos1-test", clean_session=False) +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "publish-qos1-test", clean_session=False) mqttc.on_connect = on_connect mqttc.on_disconnect = on_disconnect mqttc.on_publish = on_publish diff --git a/tests/lib/clients/03-publish-c2b-qos2-disconnect.py b/tests/lib/clients/03-publish-c2b-qos2-disconnect.py index 6e0fddfe..fe7c2ae2 100644 --- a/tests/lib/clients/03-publish-c2b-qos2-disconnect.py +++ b/tests/lib/clients/03-publish-c2b-qos2-disconnect.py @@ -22,7 +22,7 @@ def on_publish(mqttc, obj, mid): mqttc.disconnect() -mqttc = mqtt.Client("publish-qos2-test", clean_session=False) +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "publish-qos2-test", clean_session=False) mqttc.on_connect = on_connect mqttc.on_disconnect = on_disconnect mqttc.on_publish = on_publish diff --git a/tests/lib/clients/03-publish-fill-inflight.py b/tests/lib/clients/03-publish-fill-inflight.py index f954dd13..0a7eb857 100644 --- a/tests/lib/clients/03-publish-fill-inflight.py +++ b/tests/lib/clients/03-publish-fill-inflight.py @@ -28,7 +28,7 @@ def on_disconnect(mqttc, rc, properties): logging.basicConfig(level=logging.DEBUG) logging.info(str(mqtt)) -mqttc = mqtt.Client("publish-qos1-test") +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "publish-qos1-test") mqttc.max_inflight_messages_set(10) mqttc.on_connect = on_connect mqttc.on_disconnect = on_disconnect diff --git a/tests/lib/clients/03-publish-qos0-no-payload.py b/tests/lib/clients/03-publish-qos0-no-payload.py index 37458a83..7ac8e351 100644 --- a/tests/lib/clients/03-publish-qos0-no-payload.py +++ b/tests/lib/clients/03-publish-qos0-no-payload.py @@ -16,7 +16,7 @@ def on_publish(mqttc, obj, mid): mqttc.disconnect() -mqttc = mqtt.Client("publish-qos0-test-np", clean_session=True) +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "publish-qos0-test-np", clean_session=True) mqttc.on_connect = on_connect mqttc.on_publish = on_publish diff --git a/tests/lib/clients/03-publish-qos0.py b/tests/lib/clients/03-publish-qos0.py index 85386dda..dc6ac745 100644 --- a/tests/lib/clients/03-publish-qos0.py +++ b/tests/lib/clients/03-publish-qos0.py @@ -18,7 +18,7 @@ def on_publish(mqttc, obj, mid): mqttc.disconnect() -mqttc = mqtt.Client("publish-qos0-test", clean_session=True) +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "publish-qos0-test", clean_session=True) mqttc.on_connect = on_connect mqttc.on_publish = on_publish diff --git a/tests/lib/clients/04-retain-qos0.py b/tests/lib/clients/04-retain-qos0.py index b50a7381..bdfa660d 100644 --- a/tests/lib/clients/04-retain-qos0.py +++ b/tests/lib/clients/04-retain-qos0.py @@ -8,7 +8,7 @@ def on_connect(mqttc, obj, flags, rc): mqttc.publish("retain/qos0/test", "retained message", 0, True) -mqttc = mqtt.Client("retain-qos0-test", clean_session=True) +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "retain-qos0-test", clean_session=True) mqttc.on_connect = on_connect mqttc.connect("localhost", get_test_server_port()) diff --git a/tests/lib/clients/08-ssl-connect-alpn.py b/tests/lib/clients/08-ssl-connect-alpn.py index 513f2b4c..f830e506 100755 --- a/tests/lib/clients/08-ssl-connect-alpn.py +++ b/tests/lib/clients/08-ssl-connect-alpn.py @@ -10,7 +10,7 @@ def on_connect(mqttc, obj, flags, rc): mqttc.disconnect() -mqttc = mqtt.Client("08-ssl-connect-alpn", clean_session=True) +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "08-ssl-connect-alpn", clean_session=True) mqttc.tls_set( os.path.join(os.environ["PAHO_SSL_PATH"], "all-ca.crt"), os.path.join(os.environ["PAHO_SSL_PATH"], "client.crt"), diff --git a/tests/lib/clients/08-ssl-connect-cert-auth-pw.py b/tests/lib/clients/08-ssl-connect-cert-auth-pw.py index f72c3956..4681dc42 100644 --- a/tests/lib/clients/08-ssl-connect-cert-auth-pw.py +++ b/tests/lib/clients/08-ssl-connect-cert-auth-pw.py @@ -10,7 +10,7 @@ def on_connect(mqttc, obj, flags, rc): mqttc.disconnect() -mqttc = mqtt.Client("08-ssl-connect-crt-auth-pw") +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "08-ssl-connect-crt-auth-pw") mqttc.tls_set( os.path.join(os.environ["PAHO_SSL_PATH"], "all-ca.crt"), os.path.join(os.environ["PAHO_SSL_PATH"], "client-pw.crt"), diff --git a/tests/lib/clients/08-ssl-connect-cert-auth.py b/tests/lib/clients/08-ssl-connect-cert-auth.py index 53be547b..a34409a3 100644 --- a/tests/lib/clients/08-ssl-connect-cert-auth.py +++ b/tests/lib/clients/08-ssl-connect-cert-auth.py @@ -10,7 +10,7 @@ def on_connect(mqttc, obj, flags, rc): mqttc.disconnect() -mqttc = mqtt.Client("08-ssl-connect-crt-auth") +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "08-ssl-connect-crt-auth") mqttc.tls_set( os.path.join(os.environ["PAHO_SSL_PATH"], "all-ca.crt"), os.path.join(os.environ["PAHO_SSL_PATH"], "client.crt"), diff --git a/tests/lib/clients/08-ssl-connect-no-auth.py b/tests/lib/clients/08-ssl-connect-no-auth.py index ee8989f3..9d86f288 100644 --- a/tests/lib/clients/08-ssl-connect-no-auth.py +++ b/tests/lib/clients/08-ssl-connect-no-auth.py @@ -10,7 +10,7 @@ def on_connect(mqttc, obj, flags, rc): mqttc.disconnect() -mqttc = mqtt.Client("08-ssl-connect-no-auth") +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "08-ssl-connect-no-auth") mqttc.tls_set(os.path.join(os.environ["PAHO_SSL_PATH"], "all-ca.crt")) mqttc.on_connect = on_connect diff --git a/tests/lib/clients/08-ssl-fake-cacert.py b/tests/lib/clients/08-ssl-fake-cacert.py index 04b146ed..ffa53645 100644 --- a/tests/lib/clients/08-ssl-fake-cacert.py +++ b/tests/lib/clients/08-ssl-fake-cacert.py @@ -10,7 +10,7 @@ def on_connect(mqttc, obj, flags, rc): raise RuntimeError("Connection should have failed!") -mqttc = mqtt.Client("08-ssl-fake-cacert") +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "08-ssl-fake-cacert") mqttc.tls_set( os.path.join(os.environ["PAHO_SSL_PATH"], "test-fake-root-ca.crt"), os.path.join(os.environ["PAHO_SSL_PATH"], "client.crt"), diff --git a/tests/lib/test_08_ssl_bad_cacert.py b/tests/lib/test_08_ssl_bad_cacert.py index 1cb5ec8b..0031a2fd 100644 --- a/tests/lib/test_08_ssl_bad_cacert.py +++ b/tests/lib/test_08_ssl_bad_cacert.py @@ -4,5 +4,5 @@ def test_08_ssl_bad_cacert(): with pytest.raises(IOError): - mqttc = mqtt.Client("08-ssl-bad-cacert") + mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "08-ssl-bad-cacert") mqttc.tls_set("this/file/doesnt/exist") diff --git a/tests/test_client.py b/tests/test_client.py index 9775f90f..f0ff5c4f 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -3,6 +3,9 @@ import paho.mqtt.client as client import pytest +from paho.mqtt.enums import CallbackAPIVersion +from paho.mqtt.packettypes import PacketTypes +from paho.mqtt.properties import Properties from paho.mqtt.reasoncodes import ReasonCodes import tests.paho_test as paho_test @@ -11,21 +14,26 @@ from tests.testsupport.broker import fake_broker # noqa: F401 -@pytest.mark.parametrize("proto_ver", [ - (client.MQTTv31), - (client.MQTTv311), +@pytest.mark.parametrize("proto_ver,callback_version", [ + (client.MQTTv31, CallbackAPIVersion.VERSION1), + (client.MQTTv31, CallbackAPIVersion.VERSION2), + (client.MQTTv311, CallbackAPIVersion.VERSION1), + (client.MQTTv311, CallbackAPIVersion.VERSION2), ]) class Test_connect: """ Tests on connect/disconnect behaviour of the client """ - def test_01_con_discon_success(self, proto_ver, fake_broker): + def test_01_con_discon_success(self, proto_ver, callback_version, fake_broker): mqttc = client.Client( - "01-con-discon-success", protocol=proto_ver) + callback_version, + "01-con-discon-success", + protocol=proto_ver, + ) - def on_connect(mqttc, obj, flags, rc): - assert rc == 0 + def on_connect(mqttc, obj, flags, rc_or_reason_code, properties_or_none=None): + assert rc_or_reason_code == 0 mqttc.disconnect() mqttc.on_connect = on_connect @@ -59,12 +67,17 @@ def on_connect(mqttc, obj, flags, rc): packet_in = fake_broker.receive_packet(1) assert not packet_in # Check connection is closed - def test_01_con_failure_rc(self, proto_ver, fake_broker): + def test_01_con_failure_rc(self, proto_ver, callback_version, fake_broker): mqttc = client.Client( - "01-con-failure-rc", protocol=proto_ver) + callback_version, "01-con-failure-rc", protocol=proto_ver) - def on_connect(mqttc, obj, flags, rc): - assert rc == 1 + def on_connect(mqttc, obj, flags, rc_or_reason_code, properties_or_none=None): + assert rc_or_reason_code > 0 + assert rc_or_reason_code != 0 + if callback_version == CallbackAPIVersion.VERSION1: + assert rc_or_reason_code == 1 + else: + assert rc_or_reason_code == ReasonCodes(PacketTypes.CONNACK, "Unsupported protocol version") mqttc.on_connect = on_connect @@ -100,7 +113,7 @@ class Test_connect_v5: def test_01_broker_no_support(self, fake_broker): mqttc = client.Client( - "01-broker-no-support", protocol=client.MQTTv5) + CallbackAPIVersion.VERSION2, "01-broker-no-support", protocol=client.MQTTv5) def on_connect(mqttc, obj, flags, reason, properties): assert reason == 132 @@ -137,10 +150,13 @@ def on_connect(mqttc, obj, flags, reason, properties): mqttc.loop_stop() +@pytest.mark.parametrize("callback_version", [ + (CallbackAPIVersion.VERSION1), + (CallbackAPIVersion.VERSION2), +]) class TestPublishBroker2Client: - - def test_invalid_utf8_topic(self, fake_broker): - mqttc = client.Client("client-id") + def test_invalid_utf8_topic(self, callback_version, fake_broker): + mqttc = client.Client(callback_version, "client-id") def on_message(client, userdata, msg): with pytest.raises(UnicodeDecodeError): @@ -181,8 +197,8 @@ def on_message(client, userdata, msg): packet_in = fake_broker.receive_packet(1) assert not packet_in # Check connection is closed - def test_valid_utf8_topic_recv(self, fake_broker): - mqttc = client.Client("client-id") + def test_valid_utf8_topic_recv(self, callback_version, fake_broker): + mqttc = client.Client(callback_version, "client-id") # It should be non-ascii multi-bytes character topic = unicodedata.lookup('SNOWMAN') @@ -227,8 +243,8 @@ def on_message(client, userdata, msg): packet_in = fake_broker.receive_packet(1) assert not packet_in # Check connection is closed - def test_valid_utf8_topic_publish(self, fake_broker): - mqttc = client.Client("client-id") + def test_valid_utf8_topic_publish(self, callback_version, fake_broker): + mqttc = client.Client(callback_version, "client-id") # It should be non-ascii multi-bytes character topic = unicodedata.lookup('SNOWMAN') @@ -273,8 +289,8 @@ def test_valid_utf8_topic_publish(self, fake_broker): packet_in = fake_broker.receive_packet(1) assert not packet_in # Check connection is closed - def test_message_callback(self, fake_broker): - mqttc = client.Client("client-id") + def test_message_callback(self, callback_version, fake_broker): + mqttc = client.Client(callback_version, "client-id") userdata = { 'on_message': 0, 'callback1': 0, @@ -403,12 +419,18 @@ def test_change_error_code_to_enum(self): # operation assert rc_ok + 1 == 1 + def test_migration_callback_version(self): + with pytest.raises(ValueError, match="see migrations.md"): + _ = client.Client("client-id") + def test_callback_v1_mqtt3(self, fake_broker): callback_called = [] - mqttc = client.Client( - "client-id", - userdata=callback_called, - ) + with pytest.deprecated_call(): + mqttc = client.Client( + CallbackAPIVersion.VERSION1, + "client-id", + userdata=callback_called, + ) def on_connect(cl, userdata, flags, rc): assert isinstance(cl, client.Client) @@ -526,3 +548,142 @@ def on_disconnect(cl, userdata, rc): packet_in = fake_broker.receive_packet(1) assert not packet_in # Check connection is closed + + def test_callback_v2_mqtt3(self, fake_broker): + callback_called = [] + mqttc = client.Client( + CallbackAPIVersion.VERSION2, + "client-id", + userdata=callback_called, + ) + + def on_connect(cl, userdata, flags, reason, properties): + assert isinstance(cl, client.Client) + assert isinstance(flags, client.ConnectFlags) + assert isinstance(reason, ReasonCodes) + assert isinstance(properties, Properties) + assert reason == 0 + assert properties.isEmpty() + userdata.append("on_connect") + cl.subscribe([("topic", 0)]) + + def on_subscribe(cl, userdata, mid, reason_code_list, properties): + assert isinstance(cl, client.Client) + assert isinstance(mid, int) + assert isinstance(reason_code_list, list) + assert isinstance(reason_code_list[0], ReasonCodes) + assert isinstance(properties, Properties) + assert properties.isEmpty() + userdata.append("on_subscribe") + cl.publish("topic", "payload", 2) + + def on_publish(cl, userdata, mid, reason_code, properties): + assert isinstance(cl, client.Client) + assert isinstance(mid, int) + assert isinstance(reason_code, ReasonCodes) + assert isinstance(properties, Properties) + assert properties.isEmpty() + userdata.append("on_publish") + + def on_message(cl, userdata, message): + assert isinstance(cl, client.Client) + assert isinstance(message, client.MQTTMessage) + userdata.append("on_message") + cl.unsubscribe("topic") + + def on_unsubscribe(cl, userdata, mid, reason_code_list, properties): + assert isinstance(cl, client.Client) + assert isinstance(mid, int) + assert isinstance(reason_code_list, list) + assert len(reason_code_list) == 0 + assert isinstance(properties, Properties) + assert properties.isEmpty() + userdata.append("on_unsubscribe") + cl.disconnect() + + def on_disconnect(cl, userdata, flags, reason_code, properties): + assert isinstance(cl, client.Client) + assert isinstance(flags, client.DisconnectFlags) + assert isinstance(reason_code, ReasonCodes) + assert isinstance(properties, Properties) + assert properties.isEmpty() + userdata.append("on_disconnect") + + mqttc.on_connect = on_connect + mqttc.on_subscribe = on_subscribe + mqttc.on_publish = on_publish + mqttc.on_message = on_message + mqttc.on_unsubscribe = on_unsubscribe + mqttc.on_disconnect = on_disconnect + + mqttc.enable_logger() + mqttc.connect_async("localhost", fake_broker.port) + mqttc.loop_start() + + try: + fake_broker.start() + + connect_packet = paho_test.gen_connect( + "client-id", keepalive=60) + fake_broker.expect_packet("connect", connect_packet) + + connack_packet = paho_test.gen_connack(rc=0) + count = fake_broker.send_packet(connack_packet) + assert count # Check connection was not closed + assert count == len(connack_packet) + + subscribe_packet = paho_test.gen_subscribe(1, "topic", 0) + fake_broker.expect_packet("subscribe", subscribe_packet) + + suback_packet = paho_test.gen_suback(1, 0) + count = fake_broker.send_packet(suback_packet) + assert count # Check connection was not closed + assert count == len(suback_packet) + + publish_packet = paho_test.gen_publish("topic", 2, "payload", mid=2) + fake_broker.expect_packet("publish", publish_packet) + + pubrec_packet = paho_test.gen_pubrec(mid=2) + count = fake_broker.send_packet(pubrec_packet) + assert count # Check connection was not closed + assert count == len(pubrec_packet) + + pubrel_packet = paho_test.gen_pubrel(mid=2) + fake_broker.expect_packet("pubrel", pubrel_packet) + + pubcomp_packet = paho_test.gen_pubcomp(mid=2) + count = fake_broker.send_packet(pubcomp_packet) + assert count # Check connection was not closed + assert count == len(pubcomp_packet) + + publish_from_broker_packet = paho_test.gen_publish("topic", qos=0, payload="payload", mid=99) + count = fake_broker.send_packet(publish_from_broker_packet) + assert count # Check connection was not closed + assert count == len(publish_from_broker_packet) + + unsubscribe_packet = paho_test.gen_unsubscribe(mid=3, topic="topic") + fake_broker.expect_packet("unsubscribe", unsubscribe_packet) + + suback_packet = paho_test.gen_unsuback(mid=3) + count = fake_broker.send_packet(suback_packet) + assert count # Check connection was not closed + assert count == len(suback_packet) + + disconnect_packet = paho_test.gen_disconnect() + fake_broker.expect_packet("disconnect", disconnect_packet) + + assert callback_called == [ + "on_connect", + "on_subscribe", + "on_publish", + "on_message", + "on_unsubscribe", + "on_disconnect", + ] + + finally: + mqttc.disconnect() + mqttc.loop_stop() + + packet_in = fake_broker.receive_packet(1) + assert not packet_in # Check connection is closed diff --git a/tests/test_mqttv5.py b/tests/test_mqttv5.py index a09800e3..29bc2feb 100644 --- a/tests/test_mqttv5.py +++ b/tests/test_mqttv5.py @@ -25,6 +25,7 @@ import paho.mqtt import paho.mqtt.client +from paho.mqtt.enums import CallbackAPIVersion from paho.mqtt.packettypes import PacketTypes from paho.mqtt.properties import Properties from paho.mqtt.subscribeoptions import SubscribeOptions @@ -116,8 +117,11 @@ def register(self, client): def cleanRetained(port): callback = Callbacks() - curclient = paho.mqtt.client.Client(b"clean retained", - protocol=paho.mqtt.client.MQTTv5) + curclient = paho.mqtt.client.Client( + CallbackAPIVersion.VERSION1, + b"clean retained", + protocol=paho.mqtt.client.MQTTv5, + ) curclient.loop_start() callback.register(curclient) curclient.connect(host="localhost", port=port) @@ -139,8 +143,11 @@ def cleanup(port): clientids = ("aclient", "bclient") for clientid in clientids: - curclient = paho.mqtt.client.Client(clientid.encode( - "utf-8"), protocol=paho.mqtt.client.MQTTv5) + curclient = paho.mqtt.client.Client( + CallbackAPIVersion.VERSION1, + clientid.encode("utf-8"), + protocol=paho.mqtt.client.MQTTv5, + ) curclient.loop_start() curclient.connect(host="localhost", port=port, clean_start=True) time.sleep(.1) @@ -189,10 +196,10 @@ def setUpClass(cls): #aclient = mqtt_client.Client(b"\xEF\xBB\xBF" + "myclientid".encode("utf-8")) #aclient = mqtt_client.Client("myclientid".encode("utf-8")) - aclient = paho.mqtt.client.Client(b"aclient", protocol=paho.mqtt.client.MQTTv5) + aclient = paho.mqtt.client.Client(CallbackAPIVersion.VERSION1, b"aclient", protocol=paho.mqtt.client.MQTTv5) callback.register(aclient) - bclient = paho.mqtt.client.Client(b"bclient", protocol=paho.mqtt.client.MQTTv5) + bclient = paho.mqtt.client.Client(CallbackAPIVersion.VERSION1, b"bclient", protocol=paho.mqtt.client.MQTTv5) callback2.register(bclient) @classmethod @@ -324,7 +331,7 @@ def test_zero_length_clientid(self): callback0 = Callbacks() - client0 = paho.mqtt.client.Client(protocol=paho.mqtt.client.MQTTv5) + client0 = paho.mqtt.client.Client(CallbackAPIVersion.VERSION1, protocol=paho.mqtt.client.MQTTv5) callback0.register(client0) client0.loop_start() # should not be rejected @@ -336,7 +343,7 @@ def test_zero_length_clientid(self): client0.disconnect() client0.loop_stop() - client0 = paho.mqtt.client.Client(protocol=paho.mqtt.client.MQTTv5) + client0 = paho.mqtt.client.Client(CallbackAPIVersion.VERSION1, protocol=paho.mqtt.client.MQTTv5) callback0.register(client0) client0.loop_start() client0.connect(host="localhost", port=self._test_broker_port) # should work @@ -349,7 +356,8 @@ def test_zero_length_clientid(self): # when we supply a client id, we should not get one assigned client0 = paho.mqtt.client.Client( - "client0", protocol=paho.mqtt.client.MQTTv5) + CallbackAPIVersion.VERSION1, "client0", protocol=paho.mqtt.client.MQTTv5, + ) callback0.register(client0) client0.loop_start() client0.connect(host="localhost", port=self._test_broker_port) # should work @@ -367,7 +375,8 @@ def test_offline_message_queueing(self): clientid = b"offline message queueing" oclient = paho.mqtt.client.Client( - clientid, protocol=paho.mqtt.client.MQTTv5) + CallbackAPIVersion.VERSION1, clientid, protocol=paho.mqtt.client.MQTTv5, + ) ocallback.register(oclient) connect_properties = Properties(PacketTypes.CONNECT) connect_properties.SessionExpiryInterval = 99999 @@ -390,7 +399,8 @@ def test_offline_message_queueing(self): bclient.loop_stop() oclient = paho.mqtt.client.Client( - clientid, protocol=paho.mqtt.client.MQTTv5) + CallbackAPIVersion.VERSION1, clientid, protocol=paho.mqtt.client.MQTTv5, + ) ocallback.register(oclient) oclient.loop_start() oclient.connect(host="localhost", port=self._test_broker_port, clean_start=False) @@ -412,7 +422,8 @@ def test_overlapping_subscriptions(self): clientid = b"overlapping subscriptions" oclient = paho.mqtt.client.Client( - clientid, protocol=paho.mqtt.client.MQTTv5) + CallbackAPIVersion.VERSION1, clientid, protocol=paho.mqtt.client.MQTTv5, + ) ocallback.register(oclient) oclient.loop_start() @@ -447,7 +458,8 @@ def test_subscribe_failure(self): ocallback = Callbacks() clientid = b"subscribe failure" oclient = paho.mqtt.client.Client( - clientid, protocol=paho.mqtt.client.MQTTv5) + CallbackAPIVersion.VERSION1, clientid, protocol=paho.mqtt.client.MQTTv5, + ) ocallback.register(oclient) oclient.loop_start() oclient.connect(host="localhost", port=self._test_broker_port) @@ -493,8 +505,11 @@ def test_unsubscribe(self): def new_client(self, clientid): callback = Callbacks() - client = paho.mqtt.client.Client(clientid.encode( - "utf-8"), protocol=paho.mqtt.client.MQTTv5) + client = paho.mqtt.client.Client( + CallbackAPIVersion.VERSION1, + clientid.encode("utf-8"), + protocol=paho.mqtt.client.MQTTv5, + ) callback.register(client) client.loop_start() return client, callback diff --git a/tests/test_websocket_integration.py b/tests/test_websocket_integration.py index 80872f9e..b44f3475 100644 --- a/tests/test_websocket_integration.py +++ b/tests/test_websocket_integration.py @@ -48,6 +48,7 @@ def test_unexpected_response(self, proto_ver, proto_name, fake_websocket_broker) """ Server responds with a valid code, but it's not what the client expected """ mqttc = client.Client( + client.CallbackAPIVersion.VERSION1, "test_unexpected_response", protocol=proto_ver, transport="websockets" @@ -92,6 +93,7 @@ def test_no_upgrade(self, proto_ver, proto_name, fake_websocket_broker, """ Server doesn't respond with 'connection: upgrade' """ mqttc = client.Client( + client.CallbackAPIVersion.VERSION1, "test_no_upgrade", protocol=proto_ver, transport="websockets" @@ -110,6 +112,7 @@ def test_bad_secret_key(self, proto_ver, proto_name, fake_websocket_broker, """ Server doesn't give anything after connection: upgrade """ mqttc = client.Client( + client.CallbackAPIVersion.VERSION1, "test_bad_secret_key", protocol=proto_ver, transport="websockets" @@ -168,6 +171,7 @@ def test_successful_connection(self, proto_ver, proto_name, """ Connect successfully, on correct path """ mqttc = client.Client( + client.CallbackAPIVersion.VERSION1, "test_successful_connection", protocol=proto_ver, transport="websockets" @@ -190,6 +194,7 @@ def test_correct_path(self, proto_ver, proto_name, fake_websocket_broker, """ Make sure it can connect on user specified paths """ mqttc = client.Client( + client.CallbackAPIVersion.VERSION1, "test_correct_path", protocol=proto_ver, transport="websockets" @@ -225,6 +230,7 @@ def test_correct_auth(self, proto_ver, proto_name, fake_websocket_broker, """ Make sure it sends the right auth headers """ mqttc = client.Client( + client.CallbackAPIVersion.VERSION1, "test_correct_path", protocol=proto_ver, transport="websockets"