diff --git a/fedn/fedn/network/clients/client.py b/fedn/fedn/network/clients/client.py index 691b592d5..ba7aabf9d 100644 --- a/fedn/fedn/network/clients/client.py +++ b/fedn/fedn/network/clients/client.py @@ -94,14 +94,14 @@ def __init__(self, config): combiner_config = self.assign() self.connect(combiner_config) - self._initialize_dispatcher(config) + self._initialize_dispatcher(self.config) self._initialize_helper(combiner_config) if not self.helper: logger.warning("Failed to retrieve helper class settings: {}".format( combiner_config)) - self._subscribe_to_combiner(config) + self._subscribe_to_combiner(self.config) self.state = ClientState.idle @@ -686,12 +686,6 @@ def process_request(self): except grpc.RpcError as e: logger.critical(f"GRPC process_request: An error occurred during process request: {e}") - def _handle_combiner_failure(self): - """ Register failed combiner connection.""" - self._missed_heartbeat += 1 - if self._missed_heartbeat > self.config['reconnect_after_missed_heartbeat']: - self.disconnect() - def _send_heartbeat(self, update_frequency=2.0): """Send a heartbeat to the combiner. @@ -709,14 +703,18 @@ def _send_heartbeat(self, update_frequency=2.0): except grpc.RpcError as e: status_code = e.code() if status_code == grpc.StatusCode.UNAVAILABLE: - logger.warning("GRPC hearbeat: server unavailable during send heartbeat. Retrying.") + self._missed_heartbeat += 1 + logger.error("GRPC hearbeat: combiner unavailable, retrying (attempt {}/{}).".format(self._missed_heartbeat, + self.config['reconnect_after_missed_heartbeat'])) + if self._missed_heartbeat > self.config['reconnect_after_missed_heartbeat']: + self.disconnect() if status_code == grpc.StatusCode.UNAUTHENTICATED: details = e.details() if details == 'Token expired': - logger.warning("GRPC hearbeat: Token expired. Reconnecting.") - self.detach() + logger.error("GRPC hearbeat: Token expired. Disconnecting.") + self.disconnect() + sys.exit("Unauthorized. Token expired. Please obtain a new token.") logger.debug(e) - self._handle_combiner_failure() time.sleep(update_frequency) if not self._connected: @@ -780,11 +778,13 @@ def run(self): if self.state != old_state: logger.info("Client in {} state.".format(ClientStateToString(self.state))) if not self._connected: - logger.info("Detached from combiner.") - # TODO: Implement a check/condition to ulitmately close down if too many reattachment attepts have failed. s - self.attach() + logger.warning("Client lost connection to combiner. Attempting to reconnect to FEDn network.") + combiner_config = self.assign() + self.connect(combiner_config) self._subscribe_to_combiner(self.config) + cnt = 0 if self.error_state: - return + logger.error("Client in error state. Terminiating.") + sys.exit("Client in error state. Terminiating.") except KeyboardInterrupt: logger.info("Shutting down.")