Feature/SK-732 | Introduces FedYogi and FedAdaGrad aggregators #580
@@ -5,7 +5,7 @@ Aggregators

Overview
---------
Aggregators handle combinations of model updates received by the combiner into a combiner-level global model.
Aggregators are responsible for combining client model updates into a combiner-level global model.
During a training session, the combiners will instantiate an Aggregator and use it to process the incoming model updates from clients.

.. image:: img/aggregators.png
@@ -21,20 +21,19 @@ As multiple clients submit updates, the aggregation queue accumulates. Once spec
begins processing the queue, aggregating models according to the specifics of the scheme (e.g., FedAvg, FedAdam).

Using different aggregators
----------------------------
Using built-in Aggregators
--------------------------

FEDn supports the following aggregation algorithms:

- FedAvg (default)
- FedAdam
- FedYogi
- FedAdaGrad
- FedAdam (FedOpt)
- FedYogi (FedOpt)
- FedAdaGrad (FedOpt)

The implementation of the methods from the FedOpt family follows the implementation in this paper: https://arxiv.org/pdf/2003.00295.pdf

Training sessions can be configured to use a given aggregator method. For example, to use FedAdam:
Training sessions can be configured to use a given aggregator. For example, to use FedAdam:

.. code:: python
@@ -49,49 +48,71 @@ Training sessions can be configured to use a given aggregator method. For exampl
            "beta2": 0.99,
            "tau": 1e-4
        },
        "model_id": seed_model['model'],
        "rounds": 10
    }

    result_fedadam = client.start_session(**session_config)
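For orientation, a complete ``session_config`` for the FedAdam example could look as follows. This is a sketch reconstructed around the fragment above: the ``aggregator`` value (here ``"fedopt"``), the ``helper`` and the experiment ``id`` are illustrative assumptions rather than values taken from this diff, and ``client`` and ``seed_model`` are assumed to be defined as in the surrounding example.

.. code:: python

    # Sketch only: keys and values not shown in the diff above are illustrative assumptions.
    session_config = {
        "helper": "numpyhelper",
        "id": "experiment_fedadam",
        "aggregator": "fedopt",          # assumed plugin name for the FedOpt aggregator
        "aggregator_kwargs": {
            "serveropt": "adam",
            "learning_rate": 1e-3,
            "beta1": 0.9,
            "beta2": 0.99,
            "tau": 1e-4
        },
        "model_id": seed_model['model'],
        "rounds": 10
    }

    result_fedadam = client.start_session(**session_config)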
.. note::

    The FedOpt methods use server-side momentum. FEDn resets the aggregator for each new session. This means that momentum terms
    will also be reset, i.e. the history will be forgotten. When using FedAdam, FedYogi and FedAdaGrad, the user needs to strike a
    The FedOpt family of methods uses server-side momentum. FEDn resets the aggregator for each new session.
    This means that the history will also be reset, i.e. the momentum terms will be forgotten.
    When using FedAdam, FedYogi and FedAdaGrad, the user needs to strike a
    balance between the number of rounds in the session from a convergence and utility perspective.

.. note::

    The parameters in ``aggregator_kwargs`` are hyperparameters for the FedOpt family aggregators. The
    data types for these parameters (str, float) are validated by the aggregator and an error
    will be issued if passing parameter values of incompatible type. All hyperparameters are
    given above for completeness. It is primarily the ``learning_rate`` that will require tuning.
Several additional parameters that guide the general behavior of the aggregation flow can be configured:

- Round timeout: The maximal time the combiner waits before processing the update queue.
- Buffer size: The maximal allowed length of the queue before processing it.
- Whether to retain or delete model update files after they have been processed (default is to delete them); see the configuration sketch below.
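These settings are typically supplied with the session configuration. A minimal sketch, assuming the keys are named ``round_timeout`` and ``buffer_size`` (the key names and values are assumptions and should be checked against the API documentation):

.. code:: python

    session_config = {
        "helper": "numpyhelper",
        "id": "experiment_queue_tuning",
        "rounds": 10,
        # Assumed key names, for illustration only:
        "round_timeout": 180,   # seconds the combiner waits before processing the queue
        "buffer_size": 100,     # maximal queue length before aggregation is triggered
    }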
See the API documentation for

Extending FEDn with new aggregators
Extending FEDn with new Aggregators
-----------------------------------

A developer can extend FEDn with his/her own Aggregator(s) by implementing the interface specified in
:py:mod:`fedn.network.combiner.aggregators.aggregatorbase.AggregatorBase`. The developer implements two following methods:
:py:mod:`fedn.network.combiner.aggregators.aggregatorbase.AggregatorBase`. This involves implementing the two methods:

- ``on_model_update`` (optional)
- ``combine_models``
- ``on_model_update`` (perform model update validation before the update is placed on the queue, optional)
- ``combine_models`` (process the queue and aggregate updates)

**on_model_update**

The on_model_update has access to the complete model update including the metadata passed on by the clients (as specified in the training entrypoint,
see compute package). The base class implements a default callback that checks that all metadata assumed by the aggregation algorithms FedAvg and FedOpt
is present in the metadata. However, the callback could also be used to implement custom preprocessing and additional checks including strategies
The ``on_model_update`` callback receives the model update messages from clients (including all metadata) and can be used to perform validation and
potential transformation of the model update before it is placed on the aggregation queue (see image above).
The base class implements a default callback that checks that all metadata assumed by the aggregation algorithms FedAvg and FedOpt
is available. The callback could also be used to implement custom pre-processing and additional checks, including strategies

[Review comment] new line should not be here?

to filter out updates that are suspected to be corrupted or malicious.
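As an illustration, a custom callback could extend the default checks with an additional filter. The sketch below is hypothetical: the exact signature, return convention, and base-class behavior of ``on_model_update`` are not shown in this diff, so consult ``AggregatorBase`` for the actual contract.

.. code:: python

    from fedn.common.log_config import logger
    from fedn.network.combiner.aggregators.aggregatorbase import AggregatorBase


    class Aggregator(AggregatorBase):
        """Sketch: custom validation in on_model_update (signature and return value are assumptions)."""

        def on_model_update(self, model_update):
            try:
                # Reuse the default metadata checks from the base class (assumed to return a boolean).
                if not super().on_model_update(model_update):
                    return False
                # Custom check (illustrative): reject obviously malformed updates,
                # e.g. updates whose metadata is missing entirely.
                # if not model_update.meta:
                #     return False
                return True
            except Exception as e:
                logger.error("Rejecting model update: {}".format(e))
                return False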
**combine_models**

This method is responsible for processing the model update queue and in doing so produce an aggregated model. This is the main extension point where the numerical detail of the aggregation scheme is implemented. The best way to understand how to implement this methods is to study the already implemented algorithms:
When a certain criterion is met, e.g. if all clients have sent updates, or the round has timed out, the ``combine_models`` method
processes the model update queue, producing an aggregated model. This is the main extension point where the
numerical details of the aggregation scheme are implemented. The best way to understand how to implement this method
is to study the built-in aggregation algorithms:

- :py:mod:`fedn.network.combiner.aggregators.fedavg` (weighted average of parameters)
- :py:mod:`fedn.network.combiner.aggregators.fedopt` (compute pseudo-gradients and apply a server-side optimizer; a schematic sketch of this update follows below)
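To make the pseudo-gradient idea concrete, the following is a schematic numpy sketch of a FedAdam-style server update on flattened weight vectors. It illustrates the general scheme from the FedOpt paper cited above; it is not the exact FEDn implementation.

.. code:: python

    import numpy as np

    def fedadam_step(global_weights, aggregated_update, m, v,
                     learning_rate=1e-3, beta1=0.9, beta2=0.99, tau=1e-4):
        """Illustrative FedAdam server step (schematic, not FEDn's code)."""
        # Pseudo-gradient: aggregated client update minus the previous global model.
        pseudo_gradient = aggregated_update - global_weights

        # Server-side momentum terms, carried between rounds within a session.
        m = beta1 * m + (1.0 - beta1) * pseudo_gradient
        v = beta2 * v + (1.0 - beta2) * np.square(pseudo_gradient)

        # Apply the server-side optimizer step.
        new_weights = global_weights + learning_rate * m / (np.sqrt(v) + tau)
        return new_weights, m, v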
To add an aggregator plugin ``myaggregator``, the developer implements the interface and places a file called ‘myaggregator.py’ in the folder ‘fedn.network.combiner.aggregators’.
This extension can then simply be used as such:

.. code:: python

    session_config = {
        "helper": "numpyhelper",
        "id": "experiment_myaggregator",
        "aggregator": "myaggregator",
        "rounds": 10
    }

    result_myaggregator = client.start_session(**session_config)
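For reference, a minimal skeleton of what ‘myaggregator.py’ could contain is sketched below. The constructor and ``combine_models`` signatures follow the ``AggregatorBase`` interface as it appears in this PR; the method body is left schematic and the ``super().__init__`` wiring is an assumption.

.. code:: python

    from fedn.common.log_config import logger
    from fedn.network.combiner.aggregators.aggregatorbase import AggregatorBase


    class Aggregator(AggregatorBase):
        """Skeleton of a custom aggregator plugin (schematic example)."""

        def __init__(self, storage, server, modelservice, round_handler):
            super().__init__(storage, server, modelservice, round_handler)
            self.name = "myaggregator"

        def combine_models(self, helper=None, delete_models=True, parameters=None):
            data = {'time_model_load': 0.0, 'time_model_aggregation': 0.0}
            model = None
            nr_aggregated_models = 0

            # Process the incoming model update queue here, e.g. by computing a
            # weighted average of parameters using the helper instance. See
            # fedn.network.combiner.aggregators.fedavg for a complete example.

            logger.info("AGGREGATOR({}): Aggregation completed, aggregated {} models.".format(self.name, nr_aggregated_models))
            return model, data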
Large diffs are not rendered by default.
@@ -1,6 +1,6 @@
import ast
import math

from fedn.common.exceptions import InvalidParameterError
from fedn.common.log_config import logger
from fedn.network.combiner.aggregators.aggregatorbase import AggregatorBase
@@ -10,8 +10,12 @@ class Aggregator(AggregatorBase):

    Implementation following: https://arxiv.org/pdf/2003.00295.pdf

[Review comment] class name should be something more related to FedOpt? like FedOptAggregator?
[Reply] No, this is how the plugin architecture is implemented, it has to be called Aggregator.

    Aggregate pseudo gradients computed by subtracting the model
    This aggregator computes pseudo gradients by subtracting the model
    update from the global model weights from the previous round.
    A server-side scheme is then applied, currently supported schemes
    are "adam", "yogi", "adagrad".

    :param id: A reference to id of :class: `fedn.network.combiner.Combiner`
    :type id: str
@@ -34,16 +38,7 @@ def __init__(self, storage, server, modelservice, round_handler):
        self.v = None
        self.m = None

        # Server side default hyperparameters. Note that these may need fine tuning.
        self.default_params = {
            'serveropt': 'adam',
            'learning_rate': 1e-3,
            'beta1': 0.9,
            'beta2': 0.99,
            'tau': 1e-4,
        }

    def combine_models(self, helper=None, delete_models=True, params=None):
    def combine_models(self, helper=None, delete_models=True, parameters=None):
        """Compute pseudo gradients using model updates in the queue.

        :param helper: An instance of :class: `fedn.utils.helpers.helpers.HelperBase`, ML framework specific helper, defaults to None
@@ -54,24 +49,55 @@ def combine_models(self, helper=None, delete_models=True, params=None):
        :type max_nr_models: int, optional
        :param delete_models: Delete models from storage after aggregation, defaults to True
        :type delete_models: bool, optional
        :param params: Additional key-word arguments.
        :type params: dict
        :param parameters: Aggregator hyperparameters.
        :type parameters: `fedn.utils.parameters.Parameters`, optional
        :return: The global model and metadata

[Review comment] :class: then reference to the class

        :rtype: tuple
        """

        params = ast.literal_eval(params)
        data = {}
        data['time_model_load'] = 0.0
        data['time_model_aggregation'] = 0.0

        # Override default hyperparameters:
        if params:
            for key, value in self.default_params.items():
                if key not in params:
                    params[key] = value
        # Define parameter schema
        parameter_schema = {
            'serveropt': str,

[Review comment] I would have this a required attribute of AggregatorBase, even if it can be empty.
[Reply] Let's think about this for a bit longer, I think we should discuss this whole Parameter thing in more depth and make sure we have something that we want to use across the whole code-base.

            'learning_rate': float,
            'beta1': float,
            'beta2': float,
            'tau': float,
        }

        try:
            parameters.validate(parameter_schema)
        except InvalidParameterError as e:
            logger.error("Aggregator {} received invalid parameters. Reason {}".format(self.name, e))
            return None, data

        # Default hyperparameters. Note that these may need fine tuning.
        default_parameters = {
            'serveropt': 'adam',
            'learning_rate': 1e-3,
            'beta1': 0.9,
            'beta2': 0.99,
            'tau': 1e-4,
        }

        # Validate parameters
        if parameters:
            try:
                parameters.validate(parameter_schema)
            except InvalidParameterError as e:
                logger.error("Aggregator {} received invalid parameters. Reason {}".format(self.name, e))
                return None, data
        else:
            params = self.default_params
            logger.info("Aggregator {} using default parameters.", format(self.name))
            parameters = self.default_parameters

        # Override missing parameters with defaults
        for key, value in default_parameters.items():
            if key not in parameters:
                parameters[key] = value

        model = None

        nr_aggregated_models = 0
@@ -114,12 +140,12 @@ def combine_models(self, helper=None, delete_models=True, params=None):
                    "AGGREGATOR({}): Error encountered while processing model update {}, skipping this update.".format(self.name, e))
                self.model_updates.task_done()

        if params['serveropt'] == 'adam':
            model = self.serveropt_adam(helper, pseudo_gradient, model_old, params)
        elif params['serveropt'] == 'yogi':
            model = self.serveropt_yogi(helper, pseudo_gradient, model_old, params)
        elif params['serveropt'] == 'adagrad':
            model = self.serveropt_adagrad(helper, pseudo_gradient, model_old, params)
        if parameters['serveropt'] == 'adam':
            model = self.serveropt_adam(helper, pseudo_gradient, model_old, parameters)
        elif parameters['serveropt'] == 'yogi':
            model = self.serveropt_yogi(helper, pseudo_gradient, model_old, parameters)
        elif parameters['serveropt'] == 'adagrad':
            model = self.serveropt_adagrad(helper, pseudo_gradient, model_old, parameters)
        else:
            logger.error("Unsupported server optimizer passed to FedOpt.")
            return
@@ -129,7 +155,7 @@ def combine_models(self, helper=None, delete_models=True, params=None):
        logger.info("AGGREGATOR({}): Aggregation completed, aggregated {} models.".format(self.name, nr_aggregated_models))
        return model, data

    def serveropt_adam(self, helper, pseudo_gradient, model_old, params):
    def serveropt_adam(self, helper, pseudo_gradient, model_old, parameters):
        """ Server side optimization, FedAdam.

        :param helper: instance of helper class.

@@ -138,15 +164,15 @@ def serveropt_adam(self, helper, pseudo_gradient, model_old, params):
        :type pseudo_gradient: As defined by helper.
        :param model_old: The current global model.
        :type model_old: As defined in helper.
        :param params: Hyperparamters for the aggregator.
        :type params: dict
        :param parameters: Hyperparameters for the aggregator.
        :type parameters: dict
        :return: new model weights.
        :rtype: as defined by helper.
        """
        beta1 = params['beta1']
        beta2 = params['beta2']
        learning_rate = params['learning_rate']
        tau = params['tau']
        beta1 = parameters['beta1']
        beta2 = parameters['beta2']
        learning_rate = parameters['learning_rate']
        tau = parameters['tau']

        if not self.v:
            self.v = helper.ones(pseudo_gradient, math.pow(tau, 2))
@@ -165,7 +191,7 @@ def serveropt_adam(self, helper, pseudo_gradient, model_old, params):

        return model

    def serveropt_yogi(self, helper, pseudo_gradient, model_old, params):
    def serveropt_yogi(self, helper, pseudo_gradient, model_old, parameters):
        """ Server side optimization, FedYogi.

        :param helper: instance of helper class.

@@ -174,16 +200,16 @@ def serveropt_yogi(self, helper, pseudo_gradient, model_old, params):
        :type pseudo_gradient: As defined by helper.
        :param model_old: The current global model.
        :type model_old: As defined in helper.
        :param params: Hyperparamters for the aggregator.
        :type params: dict
        :param parameters: Hyperparameters for the aggregator.
        :type parameters: dict
        :return: new model weights.
        :rtype: as defined by helper.
        """

        beta1 = params['beta1']
        beta2 = params['beta2']
        learning_rate = params['learning_rate']
        tau = params['tau']
        beta1 = parameters['beta1']
        beta2 = parameters['beta2']
        learning_rate = parameters['learning_rate']
        tau = parameters['tau']

        if not self.v:
            self.v = helper.ones(pseudo_gradient, math.pow(tau, 2))
@@ -204,7 +230,7 @@ def serveropt_yogi(self, helper, pseudo_gradient, model_old, params):

        return model

    def serveropt_adagrad(self, helper, pseudo_gradient, model_old, params):
    def serveropt_adagrad(self, helper, pseudo_gradient, model_old, parameters):
        """ Server side optimization, FedAdaGrad.

        :param helper: instance of helper class.

@@ -213,15 +239,15 @@ def serveropt_adagrad(self, helper, pseudo_gradient, model_old, params):
        :type pseudo_gradient: As defined by helper.
        :param model_old: The current global model.
        :type model_old: As defined in helper.
        :param params: Hyperparamters for the aggregator.
        :type params: dict
        :param parameters: Hyperparameters for the aggregator.
        :type parameters: dict
        :return: new model weights.
        :rtype: as defined by helper.
        """

        beta1 = params['beta1']
        learning_rate = params['learning_rate']
        tau = params['tau']
        beta1 = parameters['beta1']
        learning_rate = parameters['learning_rate']
        tau = parameters['tau']

        if not self.v:
            self.v = helper.ones(pseudo_gradient, math.pow(tau, 2))
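For orientation, the three supported schemes differ mainly in how the second-moment term ``v`` is accumulated from the pseudo-gradient. The following schematic numpy sketch illustrates the update rules from the FedOpt paper; it is an illustration only and is not part of this diff.

import numpy as np

def second_moment_update(v, pseudo_gradient, serveropt, beta2=0.99):
    """Illustrative second-moment updates for the three FedOpt schemes."""
    g2 = np.square(pseudo_gradient)
    if serveropt == 'adam':
        return beta2 * v + (1.0 - beta2) * g2
    if serveropt == 'yogi':
        return v - (1.0 - beta2) * g2 * np.sign(v - g2)
    if serveropt == 'adagrad':
        return v + g2
    raise ValueError("Unsupported server optimizer: {}".format(serveropt))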
[Review comment] typo, "placed"