Feature/SK-732 | Introduces FedYogi and FedAdaGrad aggregators #580

Merged: 32 commits, May 2, 2024
Changes from 19 commits
Commits (32)
dd7f228
Removed unused file
Mar 25, 2024
b26e0c9
Add fedadagrad and fedyogi
Mar 25, 2024
589fbed
Work in progress
Mar 26, 2024
d4653de
latest
Mar 27, 2024
421de7b
resolved conflict
Apr 11, 2024
86931c4
Fixed serialization issue, ast
Apr 12, 2024
8a6f65d
Merge branch 'master' into feature/SK-732
Apr 13, 2024
551b731
Passing hyperparameters to aggregators works
Apr 13, 2024
a83eae5
Added sign method to helper
Apr 16, 2024
67018dd
Moved notebooks to dedicated folder in example root
Apr 16, 2024
805159d
Merge branch 'master' of https://github.com/scaleoutsystems/fedn into…
Apr 18, 2024
90fb195
Updated docs with example of how to use aggregators
Apr 21, 2024
9ec1d02
Merge branch 'master' into feature/SK-732
Apr 26, 2024
5f48bd0
Remover print statement
Apr 26, 2024
eee7766
Check aggregator_kwargs exists
Apr 27, 2024
2b0e148
Added tests
Apr 29, 2024
8df629b
Added tests
Apr 29, 2024
14eedaf
Changed paramter class
Apr 29, 2024
5de2c11
updated method naming
Apr 29, 2024
3fd66ff
combine_model validates parameters
Apr 29, 2024
5d14a09
Updated notebook with reasonable learning rate for adagrad
Apr 29, 2024
fc5c145
latest
Apr 29, 2024
bc32810
Added parameters module
Apr 29, 2024
6367552
Code checks
Apr 29, 2024
f91f4a8
logger info default parameters.
Apr 29, 2024
bc693cb
Fixed aggregators.rst
Apr 29, 2024
a2a55ed
Clarified aggregator docs
Apr 29, 2024
a270530
Fixed typo
Apr 29, 2024
bac12d2
Fixed typo
Apr 29, 2024
356be6a
Added note about hyperparamer validation
Apr 29, 2024
25b7499
Updated docs and notebook
Apr 29, 2024
2c1b794
Minor fixes after review
Apr 30, 2024
67 changes: 44 additions & 23 deletions docs/aggregators.rst
@@ -5,7 +5,7 @@ Aggregators

Overview
---------
Aggregators handle combinations of model updates received by the combiner into a combiner-level global model.
Aggregators are responsible for combining client model updates into a combiner-level global model.
During a training session, the combiners will instantiate an Aggregator and use it to process the incoming model updates from clients.

.. image:: img/aggregators.png
@@ -21,20 +21,19 @@ As multiple clients submit updates, the aggregation queue accumulates. Once spec
begins processing the queue, aggregating models according to the specifics of the scheme (e.g., FedAvg, FedAdam).


Using different aggregators
----------------------------
Using built-in Aggregators
--------------------------

FEDn supports the following aggregation algorithms:

- FedAvg (default)
- FedAdam
- FedYogi
- FedAdaGrad
- FedAdam (FedOpt)
- FedYogi (FedOpt)
- FedAdaGrad (FedOpt)

The FedOpt family of methods follows the implementation described in this paper: https://arxiv.org/pdf/2003.00295.pdf
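In brief (a condensed restatement of the update rules in that paper, added here for reference), the server treats the average client model delta :math:`\Delta_t` as a pseudo-gradient and applies a server-side adaptive optimizer. The keys ``learning_rate``, ``beta1``, ``beta2`` and ``tau`` in ``aggregator_kwargs`` correspond to :math:`\eta`, :math:`\beta_1`, :math:`\beta_2` and :math:`\tau` below:

.. math::

   \Delta_t &= \frac{1}{|S_t|} \sum_{i \in S_t} \left( x_i^{t} - x_t \right) \\
   m_t &= \beta_1 m_{t-1} + (1 - \beta_1)\,\Delta_t \\
   v_t &= v_{t-1} + \Delta_t^{2} \quad \text{(FedAdaGrad)} \\
   v_t &= v_{t-1} - (1 - \beta_2)\,\Delta_t^{2}\,\operatorname{sign}(v_{t-1} - \Delta_t^{2}) \quad \text{(FedYogi)} \\
   v_t &= \beta_2 v_{t-1} + (1 - \beta_2)\,\Delta_t^{2} \quad \text{(FedAdam)} \\
   x_{t+1} &= x_t + \eta \, \frac{m_t}{\sqrt{v_t} + \tau}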


Training sessions can be configured to use a given aggregator method. For example, to use FedAdam:
Training sessions can be configured to use a given aggregator. For example, to use FedAdam:

.. code:: python

@@ -49,49 +48,71 @@ Training sessions can be configured to use a given aggregator method. For exampl
"beta2": 0.99,
"tau": 1e-4
},
"model_id": seed_model['model'],
"rounds": 10
}

result_fedadam = client.start_session(**session_config)

.. note::

The FedOpt methods use server-side momentum. FEDn resets the aggregator for each new session. This means that momentum terms
will also be reset, i.e. the history will be forgotten. When using FedAdam, FedYogi and FedAdaGrad, the user needs to strike a
The FedOpt family of methods uses server-side momentum. FEDn resets the aggregator for each new session.
This means that the history will also be reset, i.e. the momentum terms will be forgotten.
When using FedAdam, FedYogi and FedAdaGrad, the user therefore needs to strike a
balance between convergence and utility when choosing the number of rounds in a session.

.. note::

The ``aggregator_kwargs`` parameter holds hyperparameters for the FedOpt family of aggregators. The
data types of these parameters (str, float) are validated by the aggregator, and an error
is issued if a parameter value of an incompatible type is passed. All hyperparameters are
given above for completeness; in practice it is primarily the ``learning_rate`` that requires tuning.
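As an illustration of this validation (a hypothetical configuration, not one of the examples above), passing a string where a float is expected causes the aggregator to log an error instead of producing a combiner-level model:

.. code:: python

    # Hypothetical session config: "learning_rate" is passed as a string,
    # which does not match the aggregator's parameter schema (float).
    # The FedOpt aggregator logs an error and no aggregated model is produced.
    session_config = {
        "helper": "numpyhelper",
        "id": "experiment_bad_types",   # assumed experiment id
        "aggregator": "fedadam",
        "aggregator_kwargs": {
            "serveropt": "adam",
            "learning_rate": "1e-3",    # wrong type: should be a float
        },
        "rounds": 5
    }

    result = client.start_session(**session_config)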

Several additional parameters that guide general behavior of the aggregation flow can be configured:

- Round timeout: The maximal time the combiner waits before processing the update queue.
- Buffer size: The maximal allowed length of the queue before processing it.
- Whether to retain or delete model update files after they have been processed (default is to delete them)

See the API documentation for details.
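How these values are set depends on the FEDn version and deployment. As a rough, hypothetical sketch only (the key names ``round_timeout`` and ``buffer_size`` below are assumptions, check the API documentation of your release), they are typically supplied alongside the rest of the session configuration:

.. code:: python

    # Hypothetical sketch; key names and semantics are assumptions, not confirmed API.
    session_config = {
        "helper": "numpyhelper",
        "id": "experiment_flow_tuning",
        "aggregator": "fedavg",
        "round_timeout": 180,   # assumed: max seconds to wait before processing the queue
        "buffer_size": -1,      # assumed: -1 waits for all expected client updates
        "rounds": 10
    }

    result = client.start_session(**session_config)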

Extending FEDn with new aggregators
Extending FEDn with new Aggregators
-----------------------------------

A developer can extend FEDn with their own Aggregator(s) by implementing the interface specified in
:py:mod:`fedn.network.combiner.aggregators.aggregatorbase.AggregatorBase`. The developer implements two following methods:
:py:mod:`fedn.network.combiner.aggregators.aggregatorbase.AggregatorBase`. This involves implementing the two methods:

- ``on_model_update`` (optional)
- ``combine_models``
- ``on_model_update`` (perform model update validation before update is placed on queue, optional)
- ``combine_models`` (process the queue and aggregate updates)

**on_model_update**

The on_model_update has access to the complete model update including the metadata passed on by the clients (as specified in the training entrypoint,
see compute package). The base class implements a default callback that checks that all metadata assumed by the aggregation algorithms FedAvg and FedOpt
is present in the metadata. However, the callback could also be used to implement custom preprocessing and additional checks including strategies
The ``on_model_update`` callback receives the model update messages from clients (including all metadata) and can be used to perform validation and
potential transformation of the model update before it is placed on the aggregation queue (see image above).
Review comment (Member): typo, "placed"

The base class implements a default callback that checks that all metadata assumed by the aggregation algorithms FedAvg and FedOpt
is available. The callback could also be used to implement custom pre-processing and additional checks including strategies
to filter out updates that are suspected to be corrupted or malicious.

Review comment (Member): new line should not be here?
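As a minimal sketch of such a filtering check in ``on_model_update`` (the constructor signature follows the built-in aggregators shown below; the ``meta`` field carrying JSON metadata and the ``num_examples`` key are assumptions and should be checked against ``aggregatorbase.py``):

.. code:: python

    import json

    from fedn.common.log_config import logger
    from fedn.network.combiner.aggregators.aggregatorbase import AggregatorBase


    class Aggregator(AggregatorBase):
        """Sketch: reject model updates trained on too few examples."""

        def __init__(self, storage, server, modelservice, round_handler):
            super().__init__(storage, server, modelservice, round_handler)
            self.name = "filteringaggregator"  # hypothetical plugin name

        def on_model_update(self, model_update):
            # Assumption: client metadata is carried as a JSON string on the update message.
            training_metadata = json.loads(model_update.meta)["training_metadata"]
            if training_metadata.get("num_examples", 0) < 10:
                logger.warning("AGGREGATOR({}): rejecting update trained on too few examples.".format(self.name))
                return
            # Accept the update: place it on the aggregation queue (see image above).
            self.model_updates.put(model_update)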

**combine_models**

This method is responsible for processing the model update queue and in doing so produce an aggregated model. This is the main extension point where the numerical detail of the aggregation scheme is implemented. The best way to understand how to implement this methods is to study the already implemented algorithms:
When a certain criterion is met, e.g. all clients have sent updates or the round has timed out, the ``combine_models`` method
processes the model update queue, producing an aggregated model. This is the main extension point where the
numerical details of the aggregation scheme are implemented. The best way to understand how to implement this method
is to study the built-in aggregation algorithms:

- :py:mod:`fedn.network.combiner.aggregators.fedavg` (weighted average of parameters)
- :py:mod:`fedn.network.combiner.aggregators.fedopt` (compute pseudo-gradients and apply a server-side optimizer)

To add an aggregator plugin ``myaggregator``, the developer implements the interface and places a file called ‘myaggregator.py’ in the folder ‘fedn.network.combiner.aggregators’.
This extension can then be used in a training session as follows:

- :py:mod:`fedn.network.combiner.aggregators.fedavg`
- :py:mod:`fedn.network.combiner.aggregators.fedopt`
.. code:: python

session_config = {
"helper": "numpyhelper",
"id": "experiment_myaggregator",
"aggregator": "myaggregator",
"rounds": 10
}

result_myaggregator = client.start_session(**session_config)
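Putting the pieces together, a schematic ``myaggregator.py`` might look like the sketch below. It is modeled on the built-in FedAvg aggregator; the ``round_handler.load_model_update`` and ``helper.increment_average`` calls are assumptions based on the built-in implementations and may differ between FEDn versions.

.. code:: python

    # Hypothetical sketch of fedn/network/combiner/aggregators/myaggregator.py.
    from fedn.common.log_config import logger
    from fedn.network.combiner.aggregators.aggregatorbase import AggregatorBase


    class Aggregator(AggregatorBase):
        """Sketch: plain (unweighted) incremental average of client updates."""

        def __init__(self, storage, server, modelservice, round_handler):
            super().__init__(storage, server, modelservice, round_handler)
            self.name = "myaggregator"

        def combine_models(self, helper=None, delete_models=True, parameters=None):
            data = {"time_model_load": 0.0, "time_model_aggregation": 0.0}
            model = None
            nr_aggregated_models = 0

            while not self.model_updates.empty():
                model_update = self.model_updates.get()
                try:
                    # Assumed round_handler API for loading the update as model parameters.
                    model_next, metadata = self.round_handler.load_model_update(model_update, helper)
                    if model is None:
                        model = model_next
                    else:
                        # Assumed helper API: incremental average with equal weights.
                        model = helper.increment_average(model, model_next, 1, nr_aggregated_models + 1)
                    nr_aggregated_models += 1
                except Exception as e:
                    logger.error("AGGREGATOR({}): skipping update, reason: {}".format(self.name, e))
                self.model_updates.task_done()

            logger.info("AGGREGATOR({}): aggregated {} models.".format(self.name, nr_aggregated_models))
            return model, data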


107 changes: 49 additions & 58 deletions examples/notebooks/Aggregators.ipynb

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions fedn/fedn/common/exceptions.py
@@ -4,3 +4,7 @@ class ModelError(BaseException):

class InvalidClientConfig(BaseException):
pass


class InvalidParameterError(BaseException):
pass
6 changes: 3 additions & 3 deletions fedn/fedn/network/combiner/aggregators/aggregatorbase.py
@@ -34,7 +34,7 @@ def __init__(self, storage, server, modelservice, round_handler):
self.model_updates = queue.Queue()

@abstractmethod
def combine_models(self, nr_expected_models=None, nr_required_models=1, helper=None, timeout=180, delete_models=True, params=None):
def combine_models(self, nr_expected_models=None, nr_required_models=1, helper=None, timeout=180, delete_models=True, parameters=None):
"""Routine for combining model updates. Implemented in subclass.

:param nr_expected_models: Number of expected models. If None, wait for all models.
@@ -47,8 +47,8 @@ def combine_models(self, nr_expected_models=None, nr_required_models=1, helper=N
:type timeout: int
:param delete_models: Delete client models after combining.
:type delete_models: bool
:param params: Additional key-word arguments.
:type params: dict
:param parameters: Additional key-word arguments.
:type parameters: dict
:return: The global model and metadata
:rtype: tuple
"""
2 changes: 1 addition & 1 deletion fedn/fedn/network/combiner/aggregators/fedavg.py
@@ -26,7 +26,7 @@ def __init__(self, storage, server, modelservice, round_handler):

self.name = "fedavg"

def combine_models(self, helper=None, delete_models=True, params=None):
def combine_models(self, helper=None, delete_models=True, parameters=None):
"""Aggregate all model updates in the queue by computing an incremental
weighted average of model parameters.

120 changes: 73 additions & 47 deletions fedn/fedn/network/combiner/aggregators/fedopt.py
@@ -1,6 +1,6 @@
import ast
import math

from fedn.common.exceptions import InvalidParameterError
from fedn.common.log_config import logger
from fedn.network.combiner.aggregators.aggregatorbase import AggregatorBase

@@ -10,8 +10,12 @@ class Aggregator(AggregatorBase):

Implementation following: https://arxiv.org/pdf/2003.00295.pdf
Review comment (Member): class name should be something more related to FedOpt? like FedOptAggregator?

Reply (Member, author): No, this is how the plugin architecture is implemented, it has to be called Aggregator.


Aggregate pseudo gradients computed by subtracting the model
This aggregator computes pseudo gradients by subtracting the model
update from the global model weights from the previous round.
A server-side scheme is then applied, currently supported schemes
are "adam", "yogi", "adagrad".



:param id: A reference to id of :class: `fedn.network.combiner.Combiner`
:type id: str
@@ -34,16 +34,7 @@ def __init__(self, storage, server, modelservice, round_handler):
self.v = None
self.m = None

# Server side default hyperparameters. Note that these may need fine tuning.
self.default_params = {
'serveropt': 'adam',
'learning_rate': 1e-3,
'beta1': 0.9,
'beta2': 0.99,
'tau': 1e-4,
}

def combine_models(self, helper=None, delete_models=True, params=None):
def combine_models(self, helper=None, delete_models=True, parameters=None):
"""Compute pseudo gradients using model updates in the queue.

:param helper: An instance of :class: `fedn.utils.helpers.helpers.HelperBase`, ML framework specific helper, defaults to None
@@ -54,24 +49,55 @@ def combine_models(self, helper=None, delete_models=True, params=None):
:type max_nr_models: int, optional
:param delete_models: Delete models from storage after aggregation, defaults to True
:type delete_models: bool, optional
:param params: Additional key-word arguments.
:type params: dict
:param parameters: Aggregator hyperparameters.
:type parameters: `fedn.utils.parameters.Parameters`, optional
:return: The global model and metadata
Review comment (Member): :class: then reference to the class

:rtype: tuple
"""

params = ast.literal_eval(params)
data = {}
data['time_model_load'] = 0.0
data['time_model_aggregation'] = 0.0

# Override default hyperparameters:
if params:
for key, value in self.default_params.items():
if key not in params:
params[key] = value
# Define parameter schema
parameter_schema = {
'serveropt': str,
Review comment (Member): I would have this a required attribute of AggregatorBase, even if it can be empty

Reply (Member, author): Let's think about this for a bit longer, I think we should discuss this whole Parameter thing in more depth and make sure we have something that we want to use across the whole code-base.

'learning_rate': float,
'beta1': float,
'beta2': float,
'tau': float,
}

try:
parameters.validate(parameter_schema)
except InvalidParameterError as e:
logger.error("Aggregator {} recieved invalid parameters. Reason {}".format(self.name, e))
return None, data

# Default hyperparameters. Note that these may need fine tuning.
default_parameters = {
'serveropt': 'adam',
'learning_rate': 1e-3,
'beta1': 0.9,
'beta2': 0.99,
'tau': 1e-4,
}

# Validate parameters
if parameters:
try:
parameters.validate(parameter_schema)
except InvalidParameterError as e:
logger.error("Aggregator {} recieved invalid parameters. Reason {}".format(self.name, e))
return None, data
else:
params = self.default_params
logger.info("Aggregator {} using default parameteres.", format(self.name))
parameters = default_parameters

# Override missing parameters with defaults
for key, value in default_parameters.items():
if key not in parameters:
parameters[key] = value

model = None
nr_aggregated_models = 0
@@ -114,12 +140,12 @@ def combine_models(self, helper=None, delete_models=True, params=None):
"AGGREGATOR({}): Error encoutered while processing model update {}, skipping this update.".format(self.name, e))
self.model_updates.task_done()

if params['serveropt'] == 'adam':
model = self.serveropt_adam(helper, pseudo_gradient, model_old, params)
elif params['serveropt'] == 'yogi':
model = self.serveropt_yogi(helper, pseudo_gradient, model_old, params)
elif params['serveropt'] == 'adagrad':
model = self.serveropt_adagrad(helper, pseudo_gradient, model_old, params)
if parameters['serveropt'] == 'adam':
model = self.serveropt_adam(helper, pseudo_gradient, model_old, parameters)
elif parameters['serveropt'] == 'yogi':
model = self.serveropt_yogi(helper, pseudo_gradient, model_old, parameters)
elif parameters['serveropt'] == 'adagrad':
model = self.serveropt_adagrad(helper, pseudo_gradient, model_old, parameters)
else:
logger.error("Unsupported server optimizer passed to FedOpt.")
return
@@ -129,7 +155,7 @@ def combine_models(self, helper=None, delete_models=True, params=None):
logger.info("AGGREGATOR({}): Aggregation completed, aggregated {} models.".format(self.name, nr_aggregated_models))
return model, data

def serveropt_adam(self, helper, pseudo_gradient, model_old, params):
def serveropt_adam(self, helper, pseudo_gradient, model_old, parameters):
""" Server side optimization, FedAdam.

:param helper: instance of helper class.
@@ -138,15 +164,15 @@ def serveropt_adam(self, helper, pseudo_gradient, model_old, params):
:type pseudo_gradient: As defined by helper.
:param model_old: The current global model.
:type model_old: As defined in helper.
:param params: Hyperparamters for the aggregator.
:type params: dict
:param parameters: Hyperparameters for the aggregator.
:type parameters: dict
:return: new model weights.
:rtype: as defined by helper.
"""
beta1 = params['beta1']
beta2 = params['beta2']
learning_rate = params['learning_rate']
tau = params['tau']
beta1 = parameters['beta1']
beta2 = parameters['beta2']
learning_rate = parameters['learning_rate']
tau = parameters['tau']

if not self.v:
self.v = helper.ones(pseudo_gradient, math.pow(tau, 2))
@@ -165,7 +191,7 @@ def serveropt_adam(self, helper, pseudo_gradient, model_old, params):

return model

def serveropt_yogi(self, helper, pseudo_gradient, model_old, params):
def serveropt_yogi(self, helper, pseudo_gradient, model_old, parameters):
""" Server side optimization, FedYogi.

:param helper: instance of helper class.
@@ -174,16 +200,16 @@ def serveropt_yogi(self, helper, pseudo_gradient, model_old, params):
:type pseudo_gradient: As defined by helper.
:param model_old: The current global model.
:type model_old: As defined in helper.
:param params: Hyperparamters for the aggregator.
:type params: dict
:param parameters: Hyperparameters for the aggregator.
:type parameters: dict
:return: new model weights.
:rtype: as defined by helper.
"""

beta1 = params['beta1']
beta2 = params['beta2']
learning_rate = params['learning_rate']
tau = params['tau']
beta1 = parameters['beta1']
beta2 = parameters['beta2']
learning_rate = parameters['learning_rate']
tau = parameters['tau']

if not self.v:
self.v = helper.ones(pseudo_gradient, math.pow(tau, 2))
@@ -204,7 +230,7 @@ def serveropt_yogi(self, helper, pseudo_gradient, model_old, params):

return model

def serveropt_adagrad(self, helper, pseudo_gradient, model_old, params):
def serveropt_adagrad(self, helper, pseudo_gradient, model_old, parameters):
""" Server side optimization, FedAdam.

:param helper: instance of helper class.
@@ -213,15 +239,15 @@ def serveropt_adagrad(self, helper, pseudo_gradient, model_old, params):
:type pseudo_gradient: As defined by helper.
:param model_old: The current global model.
:type model_old: As defined in helper.
:param params: Hyperparamters for the aggregator.
:type params: dict
:param parameters: Hyperparameters for the aggregator.
:type parameters: dict
:return: new model weights.
:rtype: as defined by helper.
"""

beta1 = params['beta1']
learning_rate = params['learning_rate']
tau = params['tau']
beta1 = parameters['beta1']
learning_rate = parameters['learning_rate']
tau = parameters['tau']

if not self.v:
self.v = helper.ones(pseudo_gradient, math.pow(tau, 2))