diff --git a/fedn/network/combiner/aggregators/fedavg.py b/fedn/network/combiner/aggregators/fedavg.py index 9ed0adf3c..0d965cfa0 100644 --- a/fedn/network/combiner/aggregators/fedavg.py +++ b/fedn/network/combiner/aggregators/fedavg.py @@ -1,9 +1,11 @@ +import traceback + from fedn.common.log_config import logger from fedn.network.combiner.aggregators.aggregatorbase import AggregatorBase class Aggregator(AggregatorBase): - """ Local SGD / Federated Averaging (FedAvg) aggregator. Computes a weighted mean + """Local SGD / Federated Averaging (FedAvg) aggregator. Computes a weighted mean of parameter updates. :param id: A reference to id of :class: `fedn.network.combiner.Combiner` @@ -48,8 +50,7 @@ def combine_models(self, helper=None, delete_models=True, parameters=None): nr_aggregated_models = 0 total_examples = 0 - logger.info( - "AGGREGATOR({}): Aggregating model updates... ".format(self.name)) + logger.info("AGGREGATOR({}): Aggregating model updates... ".format(self.name)) while not self.model_updates.empty(): try: @@ -61,8 +62,7 @@ def combine_models(self, helper=None, delete_models=True, parameters=None): logger.info("AGGREGATOR({}): Loading model metadata {}.".format(self.name, model_update.model_update_id)) model_next, metadata = self.load_model_update(model_update, helper) - logger.info( - "AGGREGATOR({}): Processing model update {}, metadata: {} ".format(self.name, model_update.model_update_id, metadata)) + logger.info("AGGREGATOR({}): Processing model update {}, metadata: {} ".format(self.name, model_update.model_update_id, metadata)) # Increment total number of examples total_examples += metadata["num_examples"] @@ -70,19 +70,18 @@ def combine_models(self, helper=None, delete_models=True, parameters=None): if nr_aggregated_models == 0: model = model_next else: - model = helper.increment_average( - model, model_next, metadata["num_examples"], total_examples) + model = helper.increment_average(model, model_next, metadata["num_examples"], total_examples) nr_aggregated_models += 1 # Delete model from storage if delete_models: self.modelservice.temp_model_storage.delete(model_update.model_update_id) - logger.info( - "AGGREGATOR({}): Deleted model update {} from storage.".format(self.name, model_update.model_update_id)) + logger.info("AGGREGATOR({}): Deleted model update {} from storage.".format(self.name, model_update.model_update_id)) self.model_updates.task_done() except Exception as e: - logger.error( - "AGGREGATOR({}): Error encoutered while processing model update {}, skipping this update.".format(self.name, e)) + tb = traceback.format_exc() + logger.error(f"AGGREGATOR({self.name}): Error encoutered while processing model update: {e}") + logger.error(tb) self.model_updates.task_done() data["nr_aggregated_models"] = nr_aggregated_models