Skip to content

Commit

Permalink
Set initial model_id in session config
Browse files Browse the repository at this point in the history
  • Loading branch information
Andreas Hellander committed Jan 11, 2024
1 parent be80527 commit 846fafa
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 50 deletions.
2 changes: 1 addition & 1 deletion examples/mnist-pytorch/client/entrypoint
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def _load_data(data_path, is_train=True):


def _save_model(model, out_path):
""" Save model to disk.
""" Save model to disk.
:param model: The model to save.
:type model: torch.nn.Module
Expand Down
63 changes: 34 additions & 29 deletions examples/notebooks/API_Example.ipynb

Large diffs are not rendered by default.

9 changes: 7 additions & 2 deletions fedn/fedn/network/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,16 @@ def get_round(self, round_id):
response = requests.get(self._get_url(f'get_round?round_id={round_id}'), verify=self.verify)
return response.json()

def start_session(self, session_id=None, aggregator='fedavg', round_timeout=180, rounds=5, round_buffer_size=-1, delete_models=True,
def start_session(self, session_id=None, aggregator='fedavg', model_id=None, round_timeout=180, rounds=5, round_buffer_size=-1, delete_models=True,
validate=True, helper='kerashelper', min_clients=1, requested_clients=8):
""" Start a new session.
:param session_id: The session id to start.
:type session_id: str
:param aggregator: The aggregator plugin to use.
:type aggregator: str
:param model_id: The id of the initial model.
:type model_id: str
:param round_timeout: The round timeout to use in seconds.
:type round_timeout: int
:param rounds: The number of rounds to perform.
Expand All @@ -136,8 +140,9 @@ def start_session(self, session_id=None, aggregator='fedavg', round_timeout=180,
:rtype: dict
"""
response = requests.post(self._get_url('start_session'), json={
'aggregator': aggregator,
'session_id': session_id,
'aggregator': aggregator,
'model_id': model_id,
'round_timeout': round_timeout,
'rounds': rounds,
'round_buffer_size': round_buffer_size,
Expand Down
10 changes: 8 additions & 2 deletions fedn/fedn/network/api/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,7 @@ def start_session(
self,
session_id,
aggregator='fedavg',
model_id=None,
rounds=5,
round_timeout=180,
round_buffer_size=-1,
Expand All @@ -765,6 +766,10 @@ def start_session(
:param session_id: The session id to start.
:type session_id: str
:param aggregator: The aggregator plugin to use.
:type aggregator: str
:param initial_model: The initial model for the session.
:type initial_model: str
:param rounds: The number of rounds to perform.
:type rounds: int
:param round_timeout: The round timeout to use in seconds.
Expand Down Expand Up @@ -831,13 +836,14 @@ def start_session(
validate = False

# Get lastest model as initial model for session
model_id = self.statestore.get_latest_model()
if not model_id:
model_id = self.statestore.get_latest_model()

# Setup session config
session_config = {
"session_id": session_id if session_id else str(uuid.uuid4()),
"round_timeout": round_timeout,
"aggregator": aggregator,
"round_timeout": round_timeout,
"buffer_size": round_buffer_size,
"model_id": model_id,
"rounds": rounds,
Expand Down
7 changes: 3 additions & 4 deletions fedn/fedn/network/controller/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,7 @@ def session(self, config):

last_round = int(self.get_latest_round_id())

# Clear potential stragglers/old model updates at combiners
for combiner in self.network.get_combiners():
# combiner.flush_model_update_queue()
combiner.set_aggregator(config['aggregator'])

# Execute the rounds in this session
Expand All @@ -142,6 +140,8 @@ def session(self, config):
flush=True,
)

config["model_id"] = self.statestore.get_latest_model()

# TODO: Report completion of session
self._state = ReducerState.idle

Expand All @@ -167,8 +167,7 @@ def round(self, session_config, round_id):
round_config["rounds"] = 1
round_config["round_id"] = round_id
round_config["task"] = "training"
round_config["model_id"] = self.statestore.get_latest_model()
round_config["helper_type"] = self.statestore.get_helper()
#round_config["helper_type"] = self.statestore.get_helper()

self.set_round_config(round_id, round_config)

Expand Down
6 changes: 3 additions & 3 deletions fedn/fedn/utils/plugins/helperbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ def __init__(self):
self.name = self.__class__.__name__

@abstractmethod
def increment_average(self, model, model_next, a, W):
def increment_average(self, m1, m2, a, W):
""" Compute one increment of incremental weighted averaging.
:param model: Current model weights in array-like format.
:param model_next: New model weights in array-like format.
:param m1: Current model weights in array-like format.
:param m2: New model weights in array-like format.
:param a: Number of examples in new model.
:param W: Total number of examples.
:return: Incremental weighted average of model weights.
Expand Down
50 changes: 44 additions & 6 deletions fedn/fedn/utils/plugins/numpyarrayhelper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,57 @@
class Helper(HelperBase):
""" FEDn helper class for numpy arrays. """

def increment_average(self, model, model_next, n):
def increment_average(self, m1, m2, n, N):
""" Update an incremental average.
:param model: Current model weights.
:type model: numpy array.
:param model_next: New model weights.
:type model_next: numpy array.
:param m1: Current model weights.
:type m1: numpy ndarray.
:param m2: New model weights.
:type m2: numpy ndarray.
:param n: Number of examples in new model.
:type n: int
:param N: Total number of examples
:return: Incremental weighted average of model weights.
:rtype: :class:`numpy.array`
"""
return np.add(model, (model_next - model) / n)
return np.add(m1, n*(m2 - m1) / N)

def add(self, m1, m2, a=1.0, b=1.0):
""" Add weights.
:param model: Current model weights with keys from torch state_dict.
:type model: OrderedDict
:param model_next: New model weights with keys from torch state_dict.
:type model_next: OrderedDict
:return: Incremental weighted average of model weights.
:rtype: OrderedDict
"""

w = a*m1 + b*m2
return w

def subtract(self, m1, m2, a=1.0, b=1.0):
""" Subtract model weights m2 from m1.
:param model: Current model weights.
:type model: list of numpy arrays.
:param model_next: New model weights.
:type model_next: list of numpy arrays.
:param num_examples: Number of examples in new model.
:type num_examples: int
:param total_examples: Total number of examples.
:type total_examples: int
:return: Incremental weighted average of model weights.
:rtype: list of numpy arrays.
"""

w = a*m1-b*m2
return w

def norm(self, m):
""" Compute the L2 norm of the weights/model update. """

return np.linalg.norm(m)

def save(self, model, path=None):
""" Serialize weights/parameters to file.
Expand Down
6 changes: 3 additions & 3 deletions fedn/fedn/utils/plugins/pytorchhelper.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ def subtract(self, m1, m2, a=1.0, b=1.0):
w[name] = tensorDiff
return w

def norm(self, model):
"""Compute the L1-norm of the tensor. """
def norm(self, m):
"""Compute the L1-norm of the tensor m. """
n = 0.0
for name, val in model.items():
for name, val in m.items():
n += np.linalg.norm(np.array(val), 1)
return n

Expand Down

0 comments on commit 846fafa

Please sign in to comment.