From 9e835992cbe2f23c9de29405416fddf2a083c39e Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Mon, 29 Jan 2024 21:08:47 +0100 Subject: [PATCH] Use logging module for controller --- examples/README.md | 5 +- examples/async-simulation/Experiment.ipynb | 4 +- fedn/fedn/network/combiner/roundhandler.py | 32 +++--- fedn/fedn/network/controller/control.py | 112 ++++++-------------- fedn/fedn/network/controller/controlbase.py | 36 ++----- 5 files changed, 59 insertions(+), 130 deletions(-) diff --git a/examples/README.md b/examples/README.md index 337a047c8..52adf95e2 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,8 +1,9 @@ ## Examples -The examples distributed here in this folder are regularly maintained by Scaleout and are part of the continuous integrations tests. +The examples distributed here in this folder are maintained by Scaleout. ### External examples -Below we maintain a list of examples provided both by the Scaleout core team and users. They may or may not be tested with the latest release of FEDn, please refer to the README of each specific project/example for detail. If you have a project that you want to include in this list, talk to a core developer in [Discord](https://discord.gg/CCRgjpMsVA). +Below we maintain a list of examples provided both by the Scaleout core team and users. They may or may not be tested with the latest release of FEDn, please refer to the README of each specific project/example for detail. +If you have a project that you want to include in this list, talk to a core developer in [Discord](https://discord.gg/CCRgjpMsVA). - [C++ version of the MNIST example](https://github.com/scaleoutsystems/examples) Also executes in Intel SGX TEE via Gramine. - [Human activity recognition use case from the FEDn paper](https://github.com/scaleoutsystems/examples) IoT/cross-device. 
diff --git a/examples/async-simulation/Experiment.ipynb b/examples/async-simulation/Experiment.ipynb index 12a9aee42..f8ef5583a 100644 --- a/examples/async-simulation/Experiment.ipynb +++ b/examples/async-simulation/Experiment.ipynb @@ -80,14 +80,14 @@ }, { "cell_type": "code", - "execution_count": 70, + "execution_count": 74, "id": "f0380d35", "metadata": {}, "outputs": [], "source": [ "session_config_fedavg = {\n", " \"helper\": \"numpyhelper\",\n", - " \"session_id\": \"experiment_fedavg4\",\n", + " \"session_id\": \"experiment_fedavg6\",\n", " \"aggregator\": \"fedavg\",\n", " \"model_id\": seed_model['model_id'],\n", " \"rounds\": 1,\n", diff --git a/fedn/fedn/network/combiner/roundhandler.py b/fedn/fedn/network/combiner/roundhandler.py index 416f12188..3956858e0 100644 --- a/fedn/fedn/network/combiner/roundhandler.py +++ b/fedn/fedn/network/combiner/roundhandler.py @@ -54,8 +54,7 @@ def push_round_config(self, round_config): round_config['_job_id'] = str(uuid.uuid4()) self.round_configs.put(round_config) except Exception: - logger.warning( - "ROUNDCONTROL: Failed to push round config.") + logger.warning("Failed to push round config.") raise return round_config['_job_id'] @@ -100,9 +99,7 @@ def load_model_update_str(self, model_id, retry=3): while tries < retry: tries += 1 if not model_str or sys.getsizeof(model_str) == 80: - logger.warning( - "ROUNDCONTROL: Model download failed. retrying") - + logger.warning("Model download failed. 
retrying") time.sleep(1) model_str = self.modelservice.get_model(model_id) @@ -170,7 +167,7 @@ def _training_round(self, config, clients): try: helper = get_helper(config['helper_type']) - logger.info("ROUNDCONTROL: Config delete_models_storage: {}".format(config['delete_models_storage'])) + logger.info("Config delete_models_storage: {}".format(config['delete_models_storage'])) if config['delete_models_storage'] == 'True': delete_models = True else: @@ -209,9 +206,9 @@ def stage_model(self, model_id, timeout_retry=3, retry=2): # If the model is already in memory at the server we do not need to do anything. if self.modelservice.temp_model_storage.exist(model_id): - logger.info("ROUNDCONTROL: Model already exists in memory, skipping model staging.") + logger.info("Model already exists in memory, skipping model staging.") return - logger.info("ROUNDCONTROL: Model Staging, fetching model from storage...") + logger.info("Model Staging, fetching model from storage...") # If not, download it and stage it in memory at the combiner. 
tries = 0 while True: @@ -220,12 +217,11 @@ def stage_model(self, model_id, timeout_retry=3, retry=2): if model: break except Exception: - logger.info("ROUNDCONTROL: Could not fetch model from storage backend, retrying.") + logger.info("Could not fetch model from storage backend, retrying.") time.sleep(timeout_retry) tries += 1 if tries > retry: - logger.info( - "ROUNDCONTROL: Failed to stage model {} from storage backend!".format(model_id)) + logger.info("Failed to stage model {} from storage backend!".format(model_id)) raise self.modelservice.set_model(model, model_id) @@ -246,8 +242,7 @@ def _assign_round_clients(self, n, type="trainers"): elif type == "trainers": clients = self.server.get_active_trainers() else: - logger.info( - "ROUNDCONTROL(ERROR): {} is not a supported type of client".format(type)) + logger.error("(ERROR): {} is not a supported type of client".format(type)) raise # If the number of requested trainers exceeds the number of available, use all available. @@ -315,8 +310,7 @@ def execute_training_round(self, config): :rtype: dict """ - logger.info( - "ROUNDCONTROL: Processing training round, job_id {}".format(config['_job_id'])) + logger.info("Processing training round, job_id {}".format(config['_job_id'])) data = {} data['config'] = config @@ -341,7 +335,7 @@ def execute_training_round(self, config): data['model_id'] = model_id logger.info( - "ROUNDCONTROL: TRAINING ROUND COMPLETED. Aggregated model id: {}, Job id: {}".format(model_id, config['_job_id'])) + "TRAINING ROUND COMPLETED. 
Aggregated model id: {}, Job id: {}".format(model_id, config['_job_id'])) # Delete temp model self.modelservice.temp_model_storage.delete(config['model_id']) @@ -374,14 +368,12 @@ def run(self, polling_interval=1.0): elif round_config['task'] == 'validation' or round_config['task'] == 'inference': self.execute_validation_round(round_config) else: - logger.warning( - "ROUNDCONTROL: Round config contains unkown task type.") + logger.warning("config contains unkown task type.") else: round_meta = {} round_meta['status'] = "Failed" round_meta['reason'] = "Failed to meet client allocation requirements for this round config." - logger.warning( - "ROUNDCONTROL: {0}".format(round_meta['reason'])) + logger.warning("{0}".format(round_meta['reason'])) self.round_configs.task_done() except queue.Empty: diff --git a/fedn/fedn/network/controller/control.py b/fedn/fedn/network/controller/control.py index a2ff6f4cc..255e77b2e 100644 --- a/fedn/fedn/network/controller/control.py +++ b/fedn/fedn/network/controller/control.py @@ -91,14 +91,11 @@ def session(self, config): """ if self._state == ReducerState.instructing: - print( - "Controller already in INSTRUCTING state. A session is in progress.", - flush=True, - ) + logger.info("Controller already in INSTRUCTING state. 
A session is in progress.") return if not self.statestore.get_latest_model(): - print("No model in model chain, please provide a seed model!") + logger.info("No model in model chain, please provide a seed model!") return self._state = ReducerState.instructing @@ -107,11 +104,6 @@ def session(self, config): ) self.create_session(config) - if not self.statestore.get_latest_model(): - print( - "No model in model chain, please provide a seed model!", - flush=True, - ) self._state = ReducerState.monitoring last_round = int(self.get_latest_round_id()) @@ -130,17 +122,9 @@ def session(self, config): try: _, round_data = self.round(config, str(current_round)) except TypeError as e: - print( - "Could not unpack data from round: {0}".format(e), - flush=True, - ) - - print( - "CONTROL: Round completed with status {}".format( - round_data["status"] - ), - flush=True, - ) + logger.error("Failed to execute round: {0}".format(e)) + + logger.info("Round completed with status {}".format(round_data["status"])) config["model_id"] = self.statestore.get_latest_model() @@ -160,7 +144,7 @@ def round(self, session_config, round_id): self.create_round({'round_id': round_id, 'status': "Pending"}) if len(self.network.get_combiners()) < 1: - print("CONTROLLER: Round cannot start, no combiners connected!", flush=True) + logger.warning("Round cannot start, no combiners connected!", flush=True) self.set_round_status(round_id, 'Failed') return None, self.statestore.get_round(round_id) @@ -179,10 +163,10 @@ def round(self, session_config, round_id): round_start = self.evaluate_round_start_policy(participating_combiners) if round_start: - print("CONTROL: round start policy met, {} participating combiners.".format( - len(participating_combiners)), flush=True) + logger.info("round start policy met, {} participating combiners.".format( + len(participating_combiners))) else: - print("CONTROL: Round start policy not met, skipping round!", flush=True) + logger.warning("Round start policy not met, 
skipping round!") self.set_round_status(round_id, 'Failed') return None, self.statestore.get_round(round_id) @@ -193,22 +177,21 @@ def round(self, session_config, round_id): # Wait until participating combiners have produced an updated global model, # or round times out. def do_if_round_times_out(result): - print("CONTROL: Round timed out!", flush=True) + logger.warning("Round timed out!") - @retry(wait=wait_random(min=1.0, max=2.0), - stop=stop_after_delay(session_config['round_timeout']), - retry_error_callback=do_if_round_times_out, - retry=retry_if_exception_type(CombinersNotDoneException)) + @ retry(wait=wait_random(min=1.0, max=2.0), + stop=stop_after_delay(session_config['round_timeout']), + retry_error_callback=do_if_round_times_out, + retry=retry_if_exception_type(CombinersNotDoneException)) def combiners_done(): round = self.statestore.get_round(round_id) if 'combiners' not in round: - # TODO: use logger - print("CONTROL: Waiting for combiners to update model...", flush=True) + logger.info("Waiting for combiners to update model...") raise CombinersNotDoneException("Combiners have not yet reported.") if len(round['combiners']) < len(participating_combiners): - print("CONTROL: Waiting for combiners to update model...", flush=True) + logger.info("Waiting for combiners to update model...") raise CombinersNotDoneException("All combiners have not yet reported.") return True @@ -218,8 +201,8 @@ def combiners_done(): # Due to the distributed nature of the computation, there might be a # delay before combiners have reported the round data to the db, # so we need some robustness here. 
@retry(wait=wait_random(min=0.1, max=1.0),
failed to update model in round with config {}".format( - session_config - ), - flush=True, - ) + logger.error("Failed to commit model to global model trail.") self.set_round_status(round_id, 'Failed') return None, self.statestore.get_round(round_id) @@ -293,12 +264,7 @@ def check_combiners_done_reporting(): for combiner, combiner_config in validating_combiners: try: - print( - "CONTROL: Submitting validation round to combiner {}".format( - combiner - ), - flush=True, - ) + logger.info("Submitting validation round to combiner {}".format(combiner)) combiner.submit(combiner_config) except CombinerUnavailableError: self._handle_unavailable_combiner(combiner) @@ -322,10 +288,6 @@ def reduce(self, combiners): i = 1 model = None - # Check if there are any combiners to reduce - if len(combiners) == 0: - print("REDUCER: No combiners to reduce!", flush=True) - return model, meta for combiner in combiners: name = combiner['name'] @@ -348,7 +310,7 @@ def reduce(self, combiners): model_next = load_model_from_BytesIO(data, helper) meta["time_load_model"] += time.time() - tic tic = time.time() - model = helper.increment_average(model, model_next, i, i) + model = helper.increment_average(model, model_next, 1.0, i) meta["time_aggregate_model"] += time.time() - tic except Exception: tic = time.time() @@ -368,13 +330,13 @@ def infer_instruct(self, config): # Check/set instucting state if self.__state == ReducerState.instructing: - print("Already set in INSTRUCTING state", flush=True) + logger.info("Already set in INSTRUCTING state") return self.__state = ReducerState.instructing # Check for a model chain if not self.statestore.latest_model(): - print("No model in model chain, please seed the alliance!") + logger.info("No model in model chain, please seed the alliance!") # Set reducer in monitoring state self.__state = ReducerState.monitoring @@ -383,7 +345,7 @@ def infer_instruct(self, config): try: self.inference_round(config) except TypeError: - print("Could not unpack data from 
round...", flush=True) + logger.error("Round failed.") # Set reducer in idle state self.__state = ReducerState.idle @@ -399,7 +361,7 @@ def inference_round(self, config): # Check for at least one combiner in statestore if len(self.network.get_combiners()) < 1: - print("REDUCER: No combiners connected!") + logger.warning("REDUCER: No combiners connected!") return round_data # Setup combiner configuration @@ -415,17 +377,9 @@ def inference_round(self, config): # Test round start policy round_start = self.check_round_start_policy(validating_combiners) if round_start: - print( - "CONTROL: round start policy met, participating combiners {}".format( - validating_combiners - ), - flush=True, - ) + logger.info("Round start policy met, participating combiners {}".format(validating_combiners)) else: - print( - "CONTROL: Round start policy not met, skipping round!", - flush=True, - ) + logger.warning("Round start policy not met, skipping round!") return None # Synch combiners with latest model and trigger inference diff --git a/fedn/fedn/network/controller/controlbase.py b/fedn/fedn/network/controller/controlbase.py index 2e17d0b97..cc5416fde 100644 --- a/fedn/fedn/network/controller/controlbase.py +++ b/fedn/fedn/network/controller/controlbase.py @@ -4,6 +4,7 @@ from time import sleep import fedn.utils.helpers.helpers +from fedn.common.log_config import logger from fedn.network.api.network import Network from fedn.network.combiner.interfaces import CombinerUnavailableError from fedn.network.state import ReducerState @@ -50,19 +51,13 @@ def __init__(self, statestore): if storage_config: not_ready = False else: - print( - "REDUCER CONTROL: Storage backend not configured, waiting...", - flush=True, - ) + logger.info("Storage backend not configured, waiting...") sleep(5) tries += 1 if tries > MAX_TRIES_BACKEND: raise Exception except Exception: - print( - "REDUCER CONTROL: Failed to retrive storage configuration, exiting.", - flush=True, - ) + logger.error("Failed to retrive 
storage configuration, exiting.") raise MisconfiguredStorageBackend() if storage_config["storage_type"] == "S3": @@ -70,10 +65,7 @@ def __init__(self, statestore): storage_config["storage_config"] ) else: - print( - "REDUCER CONTROL: Unsupported storage backend, exiting.", - flush=True, - ) + logger.error("Unsupported storage backend, exiting.") raise UnsupportedStorageBackend() if self.statestore.is_inited(): @@ -163,10 +155,7 @@ def get_compute_package_name(self): package_name = definition["storage_file_name"] return package_name except (IndexError, KeyError): - print( - "No context filename set for compute context definition", - flush=True, - ) + logger.error("No context filename set for compute context definition") return None else: return None @@ -260,24 +249,17 @@ def commit(self, model_id, model=None, session_id=None): helper = self.get_helper() if model is not None: - print( - "CONTROL: Saving model file temporarily to disk...", flush=True - ) + logger.info("Saving model file temporarily to disk...") outfile_name = helper.save(model) - print("CONTROL: Uploading model to Minio...", flush=True) + logger.info("CONTROL: Uploading model to Minio...") model_id = self.model_repository.set_model( outfile_name, is_file=True ) - print("CONTROL: Deleting temporary model file...", flush=True) + logger.info("CONTROL: Deleting temporary model file...") os.unlink(outfile_name) - print( - "CONTROL: Committing model {} to global model trail in statestore...".format( - model_id - ), - flush=True, - ) + logger.info("Committing model {} to global model trail in statestore...".format(model_id)) self.statestore.set_latest_model(model_id, session_id) def get_combiner(self, name):