Skip to content

Commit

Permalink
sweep: #8025 explicitly disconnect Stomp before reconnecting
Browse files Browse the repository at this point in the history
  • Loading branch information
chaen authored and web-flow committed Feb 3, 2025
1 parent d9da8cc commit a4914ca
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 2 deletions.
34 changes: 32 additions & 2 deletions src/DIRAC/Resources/MessageQueue/Simple/StompInterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,32 @@
from DIRAC import gConfig
from DIRAC.Core.Utilities.ReturnValues import convertToReturnValue, returnValueOrRaise

# Setting for the reconnection handling by stomp interface.
# See e.g. the description of Transport class in
# https://github.com/jasonrbriggs/stomp.py/blob/master/stomp/transport.py

RECONNECT_SLEEP_INITIAL = 1 # [s] Initial delay before reattempting to establish a connection.
RECONNECT_SLEEP_INCREASE = 0.5 # Factor by which sleep delay is increased 0.5 means increase by 50%.
RECONNECT_SLEEP_MAX = 120 # [s] The maximum delay that can be reached independent of increasing procedure.
RECONNECT_SLEEP_JITTER = 0.1 # Random factor to add. 0.1 means a random number from 0 to 10% of the current time.
RECONNECT_ATTEMPTS_MAX = 1e4 # Maximum attempts to reconnect.

OUTGOING_HEARTBEAT_MS = 15_000
INCOMING_HEARTBEAT_MS = 15_000
STOMP_TIMEOUT = 60


DEFAULT_CONNECTION_KWARGS = {
"keepalive": True,
"timeout": STOMP_TIMEOUT,
"heartbeats": (OUTGOING_HEARTBEAT_MS, INCOMING_HEARTBEAT_MS),
"reconnect_sleep_initial": RECONNECT_SLEEP_INITIAL,
"reconnect_sleep_increase": RECONNECT_SLEEP_INCREASE,
"reconnect_sleep_max": RECONNECT_SLEEP_MAX,
"reconnect_sleep_jitter": RECONNECT_SLEEP_JITTER,
"reconnect_attempts_max": RECONNECT_ATTEMPTS_MAX,
}


def _resolve_brokers(alias: str, port: int, ipv4Only: bool = False, ipv6Only: bool = False) -> list[tuple[str, int]]:
"""
Expand Down Expand Up @@ -207,7 +233,9 @@ def _connectAndSubscribe(
as a callback to the reconnect listener
"""

# We need to explicitely call disconnect to avoid leaving
# threads behind
conn.disconnect()
conn.connect(username=username, passcode=password, wait=True)
for dest in destinations:
subscribtionID = getSubscriptionID(broker, dest)
Expand Down Expand Up @@ -283,6 +311,7 @@ def __init__(self, host: str, port: int, username: str, password: str, destinati
:param kwargs: given to ~stomp.Connection constructor
"""
brokers = _resolve_brokers(host, port)

super().__init__(brokers, *args, **kwargs)

self.connect(username, password, True)
Expand All @@ -306,6 +335,7 @@ def send(self, body, **kwargs):
try:
super().send(self._destination, body, **kwargs)
except stomp.exception.StompException:
self.disconnect()
self.connect(self._username, self._password, True)
else:
return True
Expand Down Expand Up @@ -400,6 +430,6 @@ def createProducer(
raise ValueError("There should be exactly one destination given in parameter or in the CS")
destination = csDestinations[0]

producer = StompProducer(host, port, username, password, destination)
producer = StompProducer(host, port, username, password, destination, **DEFAULT_CONNECTION_KWARGS)

return producer
3 changes: 3 additions & 0 deletions src/DIRAC/Resources/MessageQueue/StompMQConnector.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ def connect(self, parameters=None):

for _ in range(10):
try:
# We need to explicitely call disconnect to avoid leaving
# threads behind
self.connection.disconnect()
self.connection.connect(username=user, passcode=password, wait=True)

if self.connection.is_connected():
Expand Down

0 comments on commit a4914ca

Please sign in to comment.