Make choice of aggregator dynamic at session level and additional aggregators (FedOpt) #498

Merged
merged 36 commits into from
Jan 25, 2024
357198a
Removed unused base class
Dec 14, 2023
f59e0bf
work in progress, fedadam
Dec 28, 2023
fa791cf
Fix race condition in docker-compose template
Dec 28, 2023
7a20a6b
Merge branch 'bugfix/GH-496' into feature/fedopt
Dec 28, 2023
9aaf22e
Working fedopt, sgd as server side optimizer
Dec 28, 2023
8cceef3
Working fedopt, sgd as server side optimizer
Dec 30, 2023
353dcf4
Simple notebook demonstrating use of API to run an experiment and to …
Jan 2, 2024
e01eb8e
Make it possible to configure the aggregator per session
Jan 2, 2024
be80527
code checks
Jan 2, 2024
846fafa
Set initial model_id in session config
Jan 11, 2024
be5051b
Merge branch 'master' into feature/fedopt
Jan 11, 2024
e8238b0
fedadam working for pytorch
Jan 19, 2024
94f23c4
Rename numpyarrayhelper to numpyhelper
Jan 21, 2024
a88e557
Updated helper interface with numerics primitives
Jan 22, 2024
9e5ad43
PyTorch models now use list of numpy ndarray as format
Jan 23, 2024
5868624
kerashelper and pytorchhelper consolidated into one numpyhelper
Jan 23, 2024
d214884
Cleaned a bit in examples and added documentation
Jan 23, 2024
6583fc6
removed non working healthcheck
Jan 23, 2024
dc9502b
codechecks
Jan 23, 2024
3df3036
add back inference entrypoint
Jan 23, 2024
21f8065
Update integration tests
Jan 23, 2024
6e0611b
codechecks
Jan 23, 2024
d16f671
Fix imports
Jan 23, 2024
33742e4
Removed unused arguments to combine_models
Jan 23, 2024
29212a6
Refactor helper module and update unit tests
Jan 23, 2024
6a8396d
Refactor helper module
Jan 24, 2024
db1eb74
codecheck
Jan 24, 2024
f000351
Improve aggrgator interface
Jan 24, 2024
d6049ea
codecheck
Jan 24, 2024
a35d1fe
Removed addition to fedn.yaml
Jan 24, 2024
847d6ba
Updated docstrings
Jan 24, 2024
bd3c411
Changed RoundControl to RoundHandler to avoid confusion with the glob…
Jan 24, 2024
5a80fcf
Clean up notebook
Jan 24, 2024
f70a7d5
Moved notebook into pytorch example folder
Jan 24, 2024
b1b57ae
Added notebooks in torch example folder to gitignore
Jan 24, 2024
109b62e
Updated docstrings
Jan 25, 2024
90 changes: 43 additions & 47 deletions examples/notebooks/API_Example.ipynb
Member

I would add this to .gitignore after it has been committed.

Member Author

Done. I also moved it into the mnist-pytorch example.

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions fedn/fedn/network/combiner/aggregators/aggregatorbase.py
@@ -19,18 +19,18 @@ class AggregatorBase(ABC):
:type server: class: `fedn.network.combiner.Combiner`
:param modelservice: A handle to the model service :class: `fedn.network.combiner.modelservice.ModelService`
:type modelservice: class: `fedn.network.combiner.modelservice.ModelService`
:param control: A handle to the :class: `fedn.network.combiner.round.RoundController`
:type control: class: `fedn.network.combiner.round.RoundController`
:param control: A handle to the :class: `fedn.network.combiner.round.RoundHandler`
Member

roundhelper

:type control: class: `fedn.network.combiner.round.RoundHandler`
"""

@abstractmethod
def __init__(self, storage, server, modelservice, control):
def __init__(self, storage, server, modelservice, round_handler):
""" Initialize the aggregator."""
self.name = self.__class__.__name__
self.storage = storage
self.server = server
self.modelservice = modelservice
self.control = control
self.round_handler = round_handler
self.model_updates = queue.Queue()

@abstractmethod
@@ -58,7 +58,7 @@ def on_model_update(self, model_update):
and then puts the update id on the aggregation queue.
Override in subclass as needed.

:param model_update: A ModelUpdate message.
:param model_update: fedn.network.grpc.fedn.proto.ModelUpdate
:type model_id: str
"""
try:
@@ -114,7 +114,7 @@ def load_model_update(self, model_update, helper):
:rtype: tuple
"""
model_id = model_update.model_update_id
model = self.control.load_model_update(helper, model_id)
model = self.round_handler.load_model_update(helper, model_id)
# Get relevant metadata
data = json.loads(model_update.meta)['training_metadata']
config = json.loads(json.loads(model_update.meta)['config'])
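The two `json.loads` calls in the hunk above suggest that `meta` is a JSON document whose `config` field is itself a JSON-encoded string. A minimal sketch of that decoding, assuming a hypothetical payload (the function name `parse_update_meta` and the field values are invented for illustration and are not part of FEDn):

```python
import json


def parse_update_meta(meta: str):
    """Return (training_metadata, config) from a ModelUpdate-style meta string."""
    outer = json.loads(meta)
    training_metadata = outer["training_metadata"]
    # `config` is stored as a JSON string inside the outer JSON document,
    # so it has to be decoded a second time.
    config = json.loads(outer["config"])
    return training_metadata, config


# Hypothetical payload, shaped only after the keys visible in the diff.
example_meta = json.dumps({
    "training_metadata": {"num_examples": 600},
    "config": json.dumps({"round_id": "1"}),
})

training_metadata, config = parse_update_meta(example_meta)
```

Decoding `meta` once and reusing the result, as above, avoids parsing the same string twice.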
@@ -141,8 +141,8 @@ def get_aggregator(aggregator_module_name, storage, server, modelservice, contro
:type server: class: `fedn.network.combiner.Combiner`
:param modelservice: A handle to the model service :class: `fedn.network.combiner.modelservice.ModelService`
:type modelservice: class: `fedn.network.combiner.modelservice.ModelService`
:param control: A handle to the :class: `fedn.network.combiner.round.RoundController`
:type control: class: `fedn.network.combiner.round.RoundController`
:param control: A handle to the :class: `fedn.network.combiner.round.RoundHandler`
Member

roundhelper

:type control: class: `fedn.network.combiner.round.RoundHandler`
:return: An aggregator instance.
:rtype: class: `fedn.combiner.aggregators.AggregatorBase`
"""
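The body of `get_aggregator` is not shown in this diff. Since each aggregator module (`fedavg.py`, `fedopt.py`) defines a class named `Aggregator`, one plausible way to select an aggregator by module name is a dynamic import. The sketch below is an assumption, not the FEDn implementation: `load_aggregator`, the module name `demo_aggregators_fedavg`, and the stand-in class are all hypothetical.

```python
import importlib
import sys
import types

# Stand-in plugin module so the sketch runs without FEDn installed.
# The module name "demo_aggregators_fedavg" is hypothetical.
_plugin = types.ModuleType("demo_aggregators_fedavg")


class _DemoAggregator:
    def __init__(self, storage, server, modelservice, round_handler):
        self.name = "fedavg"
        self.round_handler = round_handler


_plugin.Aggregator = _DemoAggregator
sys.modules[_plugin.__name__] = _plugin


def load_aggregator(module_name, storage, server, modelservice, round_handler):
    """Import a plugin module by name and instantiate its `Aggregator` class."""
    module = importlib.import_module(module_name)
    return module.Aggregator(storage, server, modelservice, round_handler)


aggregator = load_aggregator(
    "demo_aggregators_fedavg", None, None, None, round_handler="rh"
)
```

With this pattern, switching aggregators per session only requires passing a different module name.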
8 changes: 4 additions & 4 deletions fedn/fedn/network/combiner/aggregators/fedavg.py
@@ -14,15 +14,15 @@ class Aggregator(AggregatorBase):
:type server: class: `fedn.network.combiner.Combiner`
:param modelservice: A handle to the model service :class: `fedn.network.combiner.modelservice.ModelService`
:type modelservice: class: `fedn.network.combiner.modelservice.ModelService`
:param control: A handle to the :class: `fedn.network.combiner.round.RoundController`
:type control: class: `fedn.network.combiner.round.RoundController`
:param control: A handle to the :class: `fedn.network.combiner.round.RoundHandler`
Member

roundhelper

:type control: class: `fedn.network.combiner.round.RoundHandler`

"""

def __init__(self, storage, server, modelservice, control):
def __init__(self, storage, server, modelservice, round_handler):
"""Constructor method"""

super().__init__(storage, server, modelservice, control)
super().__init__(storage, server, modelservice, round_handler)

self.name = "fedavg"

10 changes: 5 additions & 5 deletions fedn/fedn/network/combiner/aggregators/fedopt.py
@@ -20,14 +20,14 @@ class Aggregator(AggregatorBase):
:type server: class: `fedn.network.combiner.Combiner`
:param modelservice: A handle to the model service :class: `fedn.network.combiner.modelservice.ModelService`
:type modelservice: class: `fedn.network.combiner.modelservice.ModelService`
:param control: A handle to the :class: `fedn.network.combiner.round.RoundController`
:type control: class: `fedn.network.combiner.round.RoundController`
:param control: A handle to the :class: `fedn.network.combiner.round.RoundHandler`
Member

roundhelper

:type control: class: `fedn.network.combiner.round.RoundHandler`

"""

def __init__(self, storage, server, modelservice, control):
def __init__(self, storage, server, modelservice, round_handler):

super().__init__(storage, server, modelservice, control)
super().__init__(storage, server, modelservice, round_handler)

self.name = "fedopt"
self.v = None
@@ -81,7 +81,7 @@ def combine_models(self, helper=None, delete_models=True):
total_examples += metadata['num_examples']

if nr_aggregated_models == 0:
model_old = self.control.load_model_update(helper, model_update.model_id)
model_old = self.round_handler.load_model_update(helper, model_update.model_id)
pseudo_gradient = helper.subtract(model_next, model_old)
else:
pseudo_gradient_next = helper.subtract(model_next, model_old)
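The hunk above computes a pseudo-gradient as the difference between a client model and the previous global model. A minimal sketch of the general FedOpt idea with plain SGD as the server-side optimizer, assuming models are flat lists of floats and that client contributions are weighted by `num_examples`; the function names are illustrative and the running-average formula is an assumption, not taken from the FEDn helper:

```python
def subtract(a, b):
    """Element-wise a - b for two flat parameter lists."""
    return [x - y for x, y in zip(a, b)]


def weighted_pseudo_gradient(global_model, updates):
    """Example-weighted running mean of (client_model - global_model).

    `updates` is an iterable of (client_model, num_examples) pairs.
    """
    average = None
    total_examples = 0
    for client_model, num_examples in updates:
        gradient = subtract(client_model, global_model)
        total_examples += num_examples
        if average is None:
            average = gradient
        else:
            weight = num_examples / total_examples
            average = [a + weight * (g - a) for a, g in zip(average, gradient)]
    return average


def server_sgd_step(global_model, pseudo_gradient, learning_rate=1.0):
    """Move the global model along the averaged pseudo-gradient."""
    return [w + learning_rate * g for w, g in zip(global_model, pseudo_gradient)]


global_model = [0.0, 0.0]
updates = [([1.0, 2.0], 100), ([3.0, 4.0], 300)]

pseudo_gradient = weighted_pseudo_gradient(global_model, updates)
new_global = server_sgd_step(global_model, pseudo_gradient, learning_rate=1.0)
```

With a server learning rate of 1.0 this reduces to example-weighted federated averaging; adaptive variants such as FedAdam replace `server_sgd_step` with a stateful update.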
14 changes: 7 additions & 7 deletions fedn/fedn/network/combiner/combiner.py
@@ -16,7 +16,7 @@
set_log_stream)
from fedn.network.combiner.connect import ConnectorCombiner, Status
from fedn.network.combiner.modelservice import ModelService
from fedn.network.combiner.round import RoundController
from fedn.network.combiner.roundhandler import RoundHandler
from fedn.network.grpc.server import Server
from fedn.network.storage.s3.repository import Repository
from fedn.network.storage.statestore.mongostatestore import MongoStateStore
@@ -134,10 +134,10 @@ def __init__(self, config):
self.server = Server(self, self.modelservice, grpc_config)

# Set up round controller
self.control = RoundController(self.repository, self, self.modelservice)
self.round_handler = RoundHandler(self.repository, self, self.modelservice)

# Start thread for round controller
threading.Thread(target=self.control.run, daemon=True).start()
threading.Thread(target=self.round_handler.run, daemon=True).start()

# Start thread for client status updates: TODO: Should be configurable
threading.Thread(target=self._deamon_thread_client_status, daemon=True).start()
@@ -385,7 +385,7 @@ def _flush_model_update_queue(self):
:return: True if successful, else False
"""

q = self.control.aggregator.model_updates
q = self.round_handler.aggregator.model_updates
try:
with q.mutex:
q.queue.clear()
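The flush shown above clears the underlying deque of a `queue.Queue` while holding the queue's own lock. A self-contained sketch of the same pattern, with a hypothetical function name and item values; it relies on the `mutex` and `queue` attributes of CPython's `queue.Queue`:

```python
import queue


def flush_queue(q: queue.Queue) -> bool:
    """Drop all pending items; return True if successful, else False."""
    try:
        with q.mutex:        # the lock that put() and get() also acquire
            q.queue.clear()  # the underlying collections.deque
        return True
    except Exception:
        return False


updates = queue.Queue()
for update_id in ("a", "b", "c"):
    updates.put(update_id)

flushed = flush_queue(updates)
```

Note that clearing the deque this way does not reset the counter used by `Queue.join()`, so this pattern is only appropriate when `join()` and `task_done()` are not used on the queue.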
@@ -418,7 +418,7 @@ def Start(self, control: fedn.ControlRequest, context):

logger.debug("grpc.Combiner.Start: Round config {}".format(config))

job_id = self.control.push_round_config(config)
job_id = self.round_handler.push_round_config(config)
logger.info("grcp.Combiner.Start: Pushed round config (job_id): {}".format(job_id))

response = fedn.ControlResponse()
@@ -442,7 +442,7 @@ def SetAggregator(self, control: fedn.ControlRequest, context):
for parameter in control.parameter:
aggregator = parameter.value

status = self.control.set_aggregator(aggregator)
status = self.round_handler.set_aggregator(aggregator)

response = fedn.ControlResponse()
if status:
@@ -723,7 +723,7 @@ def SendModelUpdate(self, request, context):
:return: the response
:rtype: :class:`fedn.network.grpc.fedn_pb2.Response`
"""
self.control.aggregator.on_model_update(request)
self.round_handler.aggregator.on_model_update(request)

response = fedn.Response()
response.response = "RECEIVED ModelUpdate {} from client {}".format(
@@ -13,11 +13,11 @@ class ModelUpdateError(Exception):
pass


class RoundController:
""" Round controller.
class RoundHandler:
""" Round handler.

The round controller recieves round configurations from the global controller
and coordinates model updates and aggregation, and model validations.
The round handler processes requests from the global controller
to produce model updates and perform model validations.

:param aggregator_name: The name of the aggregator plugin module.
:type aggregator_name: str
@@ -30,7 +30,7 @@ class RoundHandler:
"""

def __init__(self, storage, server, modelservice):
""" Initialize the RoundController."""
""" Initialize the RoundHandler."""

self.round_configs = queue.Queue()
self.storage = storage
@@ -58,9 +58,9 @@ def push_round_config(self, round_config):
return round_config['_job_id']

def load_model_update(self, helper, model_id):
"""Load model update in its native format.
"""Load model update with id model_id into its memory representation.

:param helper: An instance of :class: `fedn.utils.helpers.helpers.HelperBase`, ML framework specific helper, defaults to None
:param helper: An instance of :class: `fedn.utils.helpers.helpers.HelperBase`
:type helper: class: `fedn.utils.helpers.helpers.HelperBase`
:param model_id: The ID of the model update, UUID in str format
:type model_id: str
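Taken together, the hunks in this PR show a handler that keeps round configurations in a `queue.Queue`, returns a `_job_id` from `push_round_config`, and runs in a daemon thread started by the combiner. A minimal sketch of that producer/consumer pattern follows. The class `MiniRoundHandler`, the `completed` queue, the UUID-based job id, and the polling interval are assumptions made for illustration and are not taken from the FEDn source:

```python
import queue
import threading
import uuid


class MiniRoundHandler:
    """Toy stand-in for a round handler: queue configs, process them in a thread."""

    def __init__(self):
        self.round_configs = queue.Queue()
        self.completed = queue.Queue()  # hypothetical, used to observe results

    def push_round_config(self, round_config):
        """Tag the config with a job id, enqueue it, and return the id."""
        round_config['_job_id'] = str(uuid.uuid4())
        self.round_configs.put(round_config)
        return round_config['_job_id']

    def run(self, polling_interval=0.1):
        """Process queued round configs until the process exits."""
        while True:
            try:
                config = self.round_configs.get(timeout=polling_interval)
            except queue.Empty:
                continue
            # Placeholder for the real work (training or validation round).
            self.completed.put(config['_job_id'])


handler = MiniRoundHandler()
threading.Thread(target=handler.run, daemon=True).start()

job_id = handler.push_round_config({"task": "training"})
finished_job_id = handler.completed.get(timeout=5)
```

Because the worker is a daemon thread, it does not keep the process alive once the main thread finishes.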