From a4914cabbf08e31ac790ff07e908fe8b7ce69948 Mon Sep 17 00:00:00 2001 From: chaen Date: Mon, 3 Feb 2025 18:59:50 +0100 Subject: [PATCH] sweep: #8025 explicitly disconnect Stomp before reconnecting --- .../MessageQueue/Simple/StompInterface.py | 34 +++++++++++++++++-- .../MessageQueue/StompMQConnector.py | 3 ++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/DIRAC/Resources/MessageQueue/Simple/StompInterface.py b/src/DIRAC/Resources/MessageQueue/Simple/StompInterface.py index 526df164f1f..dbc7de010e1 100644 --- a/src/DIRAC/Resources/MessageQueue/Simple/StompInterface.py +++ b/src/DIRAC/Resources/MessageQueue/Simple/StompInterface.py @@ -10,6 +10,32 @@ from DIRAC import gConfig from DIRAC.Core.Utilities.ReturnValues import convertToReturnValue, returnValueOrRaise +# Settings for the reconnection handling by the stomp interface. +# See e.g. the description of the Transport class in +# https://github.com/jasonrbriggs/stomp.py/blob/master/stomp/transport.py + +RECONNECT_SLEEP_INITIAL = 1 # [s] Initial delay before reattempting to establish a connection. +RECONNECT_SLEEP_INCREASE = 0.5 # Factor by which the sleep delay is increased; 0.5 means an increase of 50%. +RECONNECT_SLEEP_MAX = 120 # [s] The maximum delay that can be reached, independent of the increase procedure. +RECONNECT_SLEEP_JITTER = 0.1 # Random factor to add; 0.1 means a random number from 0 to 10% of the current sleep delay. +RECONNECT_ATTEMPTS_MAX = 1e4 # Maximum number of reconnection attempts. 
+ +OUTGOING_HEARTBEAT_MS = 15_000 +INCOMING_HEARTBEAT_MS = 15_000 +STOMP_TIMEOUT = 60 + + +DEFAULT_CONNECTION_KWARGS = { + "keepalive": True, + "timeout": STOMP_TIMEOUT, + "heartbeats": (OUTGOING_HEARTBEAT_MS, INCOMING_HEARTBEAT_MS), + "reconnect_sleep_initial": RECONNECT_SLEEP_INITIAL, + "reconnect_sleep_increase": RECONNECT_SLEEP_INCREASE, + "reconnect_sleep_max": RECONNECT_SLEEP_MAX, + "reconnect_sleep_jitter": RECONNECT_SLEEP_JITTER, + "reconnect_attempts_max": RECONNECT_ATTEMPTS_MAX, +} + def _resolve_brokers(alias: str, port: int, ipv4Only: bool = False, ipv6Only: bool = False) -> list[tuple[str, int]]: """ @@ -207,7 +233,9 @@ def _connectAndSubscribe( as a callback to the reconnect listener """ - + # We need to explicitly call disconnect to avoid leaving + # threads behind + conn.disconnect() conn.connect(username=username, passcode=password, wait=True) for dest in destinations: subscribtionID = getSubscriptionID(broker, dest) @@ -283,6 +311,7 @@ def __init__(self, host: str, port: int, username: str, password: str, destinati :param kwargs: given to ~stomp.Connection constructor """ brokers = _resolve_brokers(host, port) + super().__init__(brokers, *args, **kwargs) self.connect(username, password, True) @@ -306,6 +335,7 @@ def send(self, body, **kwargs): try: super().send(self._destination, body, **kwargs) except stomp.exception.StompException: + self.disconnect() self.connect(self._username, self._password, True) else: return True @@ -400,6 +430,6 @@ def createProducer( raise ValueError("There should be exactly one destination given in parameter or in the CS") destination = csDestinations[0] - producer = StompProducer(host, port, username, password, destination) + producer = StompProducer(host, port, username, password, destination, **DEFAULT_CONNECTION_KWARGS) return producer diff --git a/src/DIRAC/Resources/MessageQueue/StompMQConnector.py b/src/DIRAC/Resources/MessageQueue/StompMQConnector.py index 4f00f7bc18f..aeec67b0309 100644 --- 
a/src/DIRAC/Resources/MessageQueue/StompMQConnector.py +++ b/src/DIRAC/Resources/MessageQueue/StompMQConnector.py @@ -197,6 +197,9 @@ def connect(self, parameters=None): for _ in range(10): try: + # We need to explicitly call disconnect to avoid leaving + # threads behind + self.connection.disconnect() self.connection.connect(username=user, passcode=password, wait=True) if self.connection.is_connected():