From 8e937f662777b76f877e671ca4c303b4731b756b Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Mon, 8 Apr 2024 13:11:50 +0200 Subject: [PATCH 1/5] Increase the time between client active status reports --- fedn/fedn/network/clients/client.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/fedn/fedn/network/clients/client.py b/fedn/fedn/network/clients/client.py index 691b592d5..8f2c6d99b 100644 --- a/fedn/fedn/network/clients/client.py +++ b/fedn/fedn/network/clients/client.py @@ -773,16 +773,18 @@ def run(self): cnt = 0 old_state = self.state while True: - time.sleep(1) + time.sleep(5) if cnt == 0: logger.info("Client is active, waiting for model update requests.") cnt = 1 if self.state != old_state: logger.info("Client in {} state.".format(ClientStateToString(self.state))) if not self._connected: - logger.info("Detached from combiner.") - # TODO: Implement a check/condition to ulitmately close down if too many reattachment attepts have failed. s - self.attach() + logger.warning("Detached from combiner.") + # TODO: Implement a check/condition to ulitmately close down if too many reattachment attepts have failed. 
+ # Attach to the FEDn network (get combiner) + combiner_config = self.assign() + self.connect(combiner_config) self._subscribe_to_combiner(self.config) if self.error_state: return From daf7ef10c5103a1a5298b0c3b6fd49493cae0fab Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Mon, 8 Apr 2024 13:12:45 +0200 Subject: [PATCH 2/5] Reverted that change --- fedn/fedn/network/clients/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fedn/fedn/network/clients/client.py b/fedn/fedn/network/clients/client.py index 8f2c6d99b..de55ade72 100644 --- a/fedn/fedn/network/clients/client.py +++ b/fedn/fedn/network/clients/client.py @@ -773,7 +773,7 @@ def run(self): cnt = 0 old_state = self.state while True: - time.sleep(5) + time.sleep(1) if cnt == 0: logger.info("Client is active, waiting for model update requests.") cnt = 1 From 95c0f51d03508c30ce3c9969d4625d47a498e324 Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Mon, 8 Apr 2024 13:27:48 +0200 Subject: [PATCH 3/5] Disconnect if the reason for missed heartbeat is expired token --- fedn/fedn/network/clients/client.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/fedn/fedn/network/clients/client.py b/fedn/fedn/network/clients/client.py index de55ade72..21015b5e2 100644 --- a/fedn/fedn/network/clients/client.py +++ b/fedn/fedn/network/clients/client.py @@ -710,13 +710,14 @@ def _send_heartbeat(self, update_frequency=2.0): status_code = e.code() if status_code == grpc.StatusCode.UNAVAILABLE: logger.warning("GRPC hearbeat: server unavailable during send heartbeat. Retrying.") + self._handle_combiner_failure() if status_code == grpc.StatusCode.UNAUTHENTICATED: details = e.details() if details == 'Token expired': - logger.warning("GRPC hearbeat: Token expired. Reconnecting.") + logger.warning("GRPC hearbeat: Token expired. 
Disconnecting.") self.detach() + exit(0) logger.debug(e) - self._handle_combiner_failure() time.sleep(update_frequency) if not self._connected: @@ -780,13 +781,15 @@ def run(self): if self.state != old_state: logger.info("Client in {} state.".format(ClientStateToString(self.state))) if not self._connected: - logger.warning("Detached from combiner.") + reconnection_attempt = 1 + logger.warning("Client is not connected to combiner. Attempting to reconnect (attempt={})".format(reconnection_attempt)) # TODO: Implement a check/condition to ulitmately close down if too many reattachment attepts have failed. # Attach to the FEDn network (get combiner) combiner_config = self.assign() self.connect(combiner_config) self._subscribe_to_combiner(self.config) if self.error_state: + logger.error("Client in error state. Terminiating.") return except KeyboardInterrupt: logger.info("Shutting down.") From a6796943926b1f5e76deb5fe72c2a0ead6972ff7 Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Mon, 8 Apr 2024 13:45:10 +0200 Subject: [PATCH 4/5] Logging and error messages --- fedn/fedn/network/clients/client.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/fedn/fedn/network/clients/client.py b/fedn/fedn/network/clients/client.py index 21015b5e2..a7f1c45f7 100644 --- a/fedn/fedn/network/clients/client.py +++ b/fedn/fedn/network/clients/client.py @@ -94,14 +94,14 @@ def __init__(self, config): combiner_config = self.assign() self.connect(combiner_config) - self._initialize_dispatcher(config) + self._initialize_dispatcher(self.config) self._initialize_helper(combiner_config) if not self.helper: logger.warning("Failed to retrieve helper class settings: {}".format( combiner_config)) - self._subscribe_to_combiner(config) + self._subscribe_to_combiner(self.config) self.state = ClientState.idle @@ -715,8 +715,8 @@ def _send_heartbeat(self, update_frequency=2.0): details = e.details() if details == 'Token expired': logger.warning("GRPC hearbeat: Token 
expired. Disconnecting.") - self.detach() - exit(0) + self.disconnect() + sys.exit("Unauthorized. Token expired. Please obtain a new token.") logger.debug(e) time.sleep(update_frequency) @@ -781,15 +781,13 @@ def run(self): if self.state != old_state: logger.info("Client in {} state.".format(ClientStateToString(self.state))) if not self._connected: - reconnection_attempt = 1 - logger.warning("Client is not connected to combiner. Attempting to reconnect (attempt={})".format(reconnection_attempt)) - # TODO: Implement a check/condition to ulitmately close down if too many reattachment attepts have failed. - # Attach to the FEDn network (get combiner) + logger.warning("Client lost connection to combiner. Attempting to reconnect to FEDn network (attempt={})".format(reconnection_attempt)) combiner_config = self.assign() self.connect(combiner_config) self._subscribe_to_combiner(self.config) + cnt = 0 if self.error_state: logger.error("Client in error state. Terminiating.") - return + sys.exit("Client in error state. 
Terminiating.") except KeyboardInterrupt: logger.info("Shutting down.") From 13f7d01b49c0e19866a9e78ed5ca497e27ae176e Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Mon, 8 Apr 2024 14:17:54 +0200 Subject: [PATCH 5/5] Bugfix --- fedn/fedn/network/clients/client.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/fedn/fedn/network/clients/client.py b/fedn/fedn/network/clients/client.py index a7f1c45f7..ba7aabf9d 100644 --- a/fedn/fedn/network/clients/client.py +++ b/fedn/fedn/network/clients/client.py @@ -686,12 +686,6 @@ def process_request(self): except grpc.RpcError as e: logger.critical(f"GRPC process_request: An error occurred during process request: {e}") - def _handle_combiner_failure(self): - """ Register failed combiner connection.""" - self._missed_heartbeat += 1 - if self._missed_heartbeat > self.config['reconnect_after_missed_heartbeat']: - self.disconnect() - def _send_heartbeat(self, update_frequency=2.0): """Send a heartbeat to the combiner. @@ -709,12 +703,15 @@ def _send_heartbeat(self, update_frequency=2.0): except grpc.RpcError as e: status_code = e.code() if status_code == grpc.StatusCode.UNAVAILABLE: - logger.warning("GRPC hearbeat: server unavailable during send heartbeat. Retrying.") - self._handle_combiner_failure() + self._missed_heartbeat += 1 + logger.error("GRPC hearbeat: combiner unavailable, retrying (attempt {}/{}).".format(self._missed_heartbeat, + self.config['reconnect_after_missed_heartbeat'])) + if self._missed_heartbeat > self.config['reconnect_after_missed_heartbeat']: + self.disconnect() if status_code == grpc.StatusCode.UNAUTHENTICATED: details = e.details() if details == 'Token expired': - logger.warning("GRPC hearbeat: Token expired. Disconnecting.") + logger.error("GRPC hearbeat: Token expired. Disconnecting.") self.disconnect() sys.exit("Unauthorized. Token expired. 
Please obtain a new token.") logger.debug(e) @@ -781,7 +778,7 @@ def run(self): if self.state != old_state: logger.info("Client in {} state.".format(ClientStateToString(self.state))) if not self._connected: - logger.warning("Client lost connection to combiner. Attempting to reconnect to FEDn network (attempt={})".format(reconnection_attempt)) + logger.warning("Client lost connection to combiner. Attempting to reconnect to FEDn network.") combiner_config = self.assign() self.connect(combiner_config) self._subscribe_to_combiner(self.config)