Skip to content

Commit

Permalink
Use logging module for controller
Browse files Browse the repository at this point in the history
  • Loading branch information
Andreas Hellander committed Jan 29, 2024
1 parent bc24cba commit 9e83599
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 130 deletions.
5 changes: 3 additions & 2 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
## Examples
The examples distributed here in this folder are regularly maintained by Scaleout and are part of the continuous integration tests.
The examples distributed here in this folder are maintained by Scaleout.

### External examples
Below we maintain a list of examples provided both by the Scaleout core team and users. They may or may not be tested with the latest release of FEDn, please refer to the README of each specific project/example for details. If you have a project that you want to include in this list, talk to a core developer in [Discord](https://discord.gg/CCRgjpMsVA).
Below we maintain a list of examples provided both by the Scaleout core team and users. They may or may not be tested with the latest release of FEDn, please refer to the README of each specific project/example for details.
If you have a project that you want to include in this list, talk to a core developer in [Discord](https://discord.gg/CCRgjpMsVA).

- [C++ version of the MNIST example](https://github.com/scaleoutsystems/examples) Also executes in Intel SGX TEE via Gramine.
- [Human activity recognition use case from the FEDn paper](https://github.com/scaleoutsystems/examples) IoT/cross-device.
Expand Down
4 changes: 2 additions & 2 deletions examples/async-simulation/Experiment.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@
},
{
"cell_type": "code",
"execution_count": 70,
"execution_count": 74,
"id": "f0380d35",
"metadata": {},
"outputs": [],
"source": [
"session_config_fedavg = {\n",
" \"helper\": \"numpyhelper\",\n",
" \"session_id\": \"experiment_fedavg4\",\n",
" \"session_id\": \"experiment_fedavg6\",\n",
" \"aggregator\": \"fedavg\",\n",
" \"model_id\": seed_model['model_id'],\n",
" \"rounds\": 1,\n",
Expand Down
32 changes: 12 additions & 20 deletions fedn/fedn/network/combiner/roundhandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ def push_round_config(self, round_config):
round_config['_job_id'] = str(uuid.uuid4())
self.round_configs.put(round_config)
except Exception:
logger.warning(
"ROUNDCONTROL: Failed to push round config.")
logger.warning("Failed to push round config.")
raise
return round_config['_job_id']

Expand Down Expand Up @@ -100,9 +99,7 @@ def load_model_update_str(self, model_id, retry=3):
while tries < retry:
tries += 1
if not model_str or sys.getsizeof(model_str) == 80:
logger.warning(
"ROUNDCONTROL: Model download failed. retrying")

logger.warning("Model download failed. retrying")
time.sleep(1)
model_str = self.modelservice.get_model(model_id)

Expand Down Expand Up @@ -170,7 +167,7 @@ def _training_round(self, config, clients):

try:
helper = get_helper(config['helper_type'])
logger.info("ROUNDCONTROL: Config delete_models_storage: {}".format(config['delete_models_storage']))
logger.info("Config delete_models_storage: {}".format(config['delete_models_storage']))
if config['delete_models_storage'] == 'True':
delete_models = True
else:
Expand Down Expand Up @@ -209,9 +206,9 @@ def stage_model(self, model_id, timeout_retry=3, retry=2):

# If the model is already in memory at the server we do not need to do anything.
if self.modelservice.temp_model_storage.exist(model_id):
logger.info("ROUNDCONTROL: Model already exists in memory, skipping model staging.")
logger.info("Model already exists in memory, skipping model staging.")
return
logger.info("ROUNDCONTROL: Model Staging, fetching model from storage...")
logger.info("Model Staging, fetching model from storage...")
# If not, download it and stage it in memory at the combiner.
tries = 0
while True:
Expand All @@ -220,12 +217,11 @@ def stage_model(self, model_id, timeout_retry=3, retry=2):
if model:
break
except Exception:
logger.info("ROUNDCONTROL: Could not fetch model from storage backend, retrying.")
logger.info("Could not fetch model from storage backend, retrying.")
time.sleep(timeout_retry)
tries += 1
if tries > retry:
logger.info(
"ROUNDCONTROL: Failed to stage model {} from storage backend!".format(model_id))
logger.info("Failed to stage model {} from storage backend!".format(model_id))
raise

self.modelservice.set_model(model, model_id)
Expand All @@ -246,8 +242,7 @@ def _assign_round_clients(self, n, type="trainers"):
elif type == "trainers":
clients = self.server.get_active_trainers()
else:
logger.info(
"ROUNDCONTROL(ERROR): {} is not a supported type of client".format(type))
logger.error("(ERROR): {} is not a supported type of client".format(type))
raise

# If the number of requested trainers exceeds the number of available, use all available.
Expand Down Expand Up @@ -315,8 +310,7 @@ def execute_training_round(self, config):
:rtype: dict
"""

logger.info(
"ROUNDCONTROL: Processing training round, job_id {}".format(config['_job_id']))
logger.info("Processing training round, job_id {}".format(config['_job_id']))

data = {}
data['config'] = config
Expand All @@ -341,7 +335,7 @@ def execute_training_round(self, config):
data['model_id'] = model_id

logger.info(
"ROUNDCONTROL: TRAINING ROUND COMPLETED. Aggregated model id: {}, Job id: {}".format(model_id, config['_job_id']))
"TRAINING ROUND COMPLETED. Aggregated model id: {}, Job id: {}".format(model_id, config['_job_id']))

# Delete temp model
self.modelservice.temp_model_storage.delete(config['model_id'])
Expand Down Expand Up @@ -374,14 +368,12 @@ def run(self, polling_interval=1.0):
elif round_config['task'] == 'validation' or round_config['task'] == 'inference':
self.execute_validation_round(round_config)
else:
logger.warning(
"ROUNDCONTROL: Round config contains unkown task type.")
logger.warning("config contains unkown task type.")
else:
round_meta = {}
round_meta['status'] = "Failed"
round_meta['reason'] = "Failed to meet client allocation requirements for this round config."
logger.warning(
"ROUNDCONTROL: {0}".format(round_meta['reason']))
logger.warning("{0}".format(round_meta['reason']))

self.round_configs.task_done()
except queue.Empty:
Expand Down
112 changes: 33 additions & 79 deletions fedn/fedn/network/controller/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,11 @@ def session(self, config):
"""

if self._state == ReducerState.instructing:
print(
"Controller already in INSTRUCTING state. A session is in progress.",
flush=True,
)
logger.info("Controller already in INSTRUCTING state. A session is in progress.")
return

if not self.statestore.get_latest_model():
print("No model in model chain, please provide a seed model!")
logger.info("No model in model chain, please provide a seed model!")
return

self._state = ReducerState.instructing
Expand All @@ -107,11 +104,6 @@ def session(self, config):
)
self.create_session(config)

if not self.statestore.get_latest_model():
print(
"No model in model chain, please provide a seed model!",
flush=True,
)
self._state = ReducerState.monitoring

last_round = int(self.get_latest_round_id())
Expand All @@ -130,17 +122,9 @@ def session(self, config):
try:
_, round_data = self.round(config, str(current_round))
except TypeError as e:
print(
"Could not unpack data from round: {0}".format(e),
flush=True,
)

print(
"CONTROL: Round completed with status {}".format(
round_data["status"]
),
flush=True,
)
logger.error("Failed to execute round: {0}".format(e))

logger.info("Round completed with status {}".format(round_data["status"]))

config["model_id"] = self.statestore.get_latest_model()

Expand All @@ -160,7 +144,7 @@ def round(self, session_config, round_id):
self.create_round({'round_id': round_id, 'status': "Pending"})

if len(self.network.get_combiners()) < 1:
print("CONTROLLER: Round cannot start, no combiners connected!", flush=True)
logger.warning("Round cannot start, no combiners connected!", flush=True)
self.set_round_status(round_id, 'Failed')
return None, self.statestore.get_round(round_id)

Expand All @@ -179,10 +163,10 @@ def round(self, session_config, round_id):
round_start = self.evaluate_round_start_policy(participating_combiners)

if round_start:
print("CONTROL: round start policy met, {} participating combiners.".format(
len(participating_combiners)), flush=True)
logger.info("round start policy met, {} participating combiners.".format(
len(participating_combiners)))
else:
print("CONTROL: Round start policy not met, skipping round!", flush=True)
logger.warning("Round start policy not met, skipping round!")
self.set_round_status(round_id, 'Failed')
return None, self.statestore.get_round(round_id)

Expand All @@ -193,22 +177,21 @@ def round(self, session_config, round_id):
# Wait until participating combiners have produced an updated global model,
# or round times out.
def do_if_round_times_out(result):
print("CONTROL: Round timed out!", flush=True)
logger.warning("Round timed out!")

@retry(wait=wait_random(min=1.0, max=2.0),
stop=stop_after_delay(session_config['round_timeout']),
retry_error_callback=do_if_round_times_out,
retry=retry_if_exception_type(CombinersNotDoneException))
@ retry(wait=wait_random(min=1.0, max=2.0),
stop=stop_after_delay(session_config['round_timeout']),
retry_error_callback=do_if_round_times_out,
retry=retry_if_exception_type(CombinersNotDoneException))
def combiners_done():

round = self.statestore.get_round(round_id)
if 'combiners' not in round:
# TODO: use logger
print("CONTROL: Waiting for combiners to update model...", flush=True)
logger.info("Waiting for combiners to update model...")
raise CombinersNotDoneException("Combiners have not yet reported.")

if len(round['combiners']) < len(participating_combiners):
print("CONTROL: Waiting for combiners to update model...", flush=True)
logger.info("Waiting for combiners to update model...")
raise CombinersNotDoneException("All combiners have not yet reported.")

return True
Expand All @@ -218,8 +201,8 @@ def combiners_done():
# Due to the distributed nature of the computation, there might be a
# delay before combiners have reported the round data to the db,
# so we need some robustness here.
@retry(wait=wait_random(min=0.1, max=1.0),
retry=retry_if_exception_type(KeyError))
@ retry(wait=wait_random(min=0.1, max=1.0),
retry=retry_if_exception_type(KeyError))
def check_combiners_done_reporting():
round = self.statestore.get_round(round_id)
combiners = round['combiners']
Expand All @@ -230,30 +213,26 @@ def check_combiners_done_reporting():
round = self.statestore.get_round(round_id)
round_valid = self.evaluate_round_validity_policy(round)
if not round_valid:
print("REDUCER CONTROL: Round invalid!", flush=True)
logger.error("Round failed. Invalid - evaluate_round_validity_policy: False")
self.set_round_status(round_id, 'Failed')
return None, self.statestore.get_round(round_id)

print("CONTROL: Reducing combiner level models...", flush=True)
logger.info("Reducing combiner level models...")
# Reduce combiner models into a new global model
round_data = {}
try:
round = self.statestore.get_round(round_id)
model, data = self.reduce(round['combiners'])
round_data['reduce'] = data
print("CONTROL: Done reducing models from combiners!", flush=True)
logger.info("Done reducing models from combiners!")
except Exception as e:
print("CONTROL: Failed to reduce models from combiners: {}".format(
e), flush=True)
logger.error("Failed to reduce models from combiners, reason: {}".format(e))
self.set_round_status(round_id, 'Failed')
return None, self.statestore.get_round(round_id)

# Commit the new global model to the model trail
if model is not None:
print(
"CONTROL: Committing global model to model trail...",
flush=True,
)
logger.info("Committing global model to model trail...")
tic = time.time()
model_id = uuid.uuid4()
session_id = (
Expand All @@ -263,17 +242,9 @@ def check_combiners_done_reporting():
)
self.commit(model_id, model, session_id)
round_data["time_commit"] = time.time() - tic
print(
"CONTROL: Done committing global model to model trail!",
flush=True,
)
logger.info("Done committing global model to model trail.")
else:
print(
"REDUCER: failed to update model in round with config {}".format(
session_config
),
flush=True,
)
logger.error("Failed to commit model to global model trail.")
self.set_round_status(round_id, 'Failed')
return None, self.statestore.get_round(round_id)

Expand All @@ -293,12 +264,7 @@ def check_combiners_done_reporting():

for combiner, combiner_config in validating_combiners:
try:
print(
"CONTROL: Submitting validation round to combiner {}".format(
combiner
),
flush=True,
)
logger.info("Submitting validation round to combiner {}".format(combiner))
combiner.submit(combiner_config)
except CombinerUnavailableError:
self._handle_unavailable_combiner(combiner)
Expand All @@ -322,10 +288,6 @@ def reduce(self, combiners):

i = 1
model = None
# Check if there are any combiners to reduce
if len(combiners) == 0:
print("REDUCER: No combiners to reduce!", flush=True)
return model, meta

for combiner in combiners:
name = combiner['name']
Expand All @@ -348,7 +310,7 @@ def reduce(self, combiners):
model_next = load_model_from_BytesIO(data, helper)
meta["time_load_model"] += time.time() - tic
tic = time.time()
model = helper.increment_average(model, model_next, i, i)
model = helper.increment_average(model, model_next, 1.0, i)
meta["time_aggregate_model"] += time.time() - tic
except Exception:
tic = time.time()
Expand All @@ -368,13 +330,13 @@ def infer_instruct(self, config):

# Check/set instucting state
if self.__state == ReducerState.instructing:
print("Already set in INSTRUCTING state", flush=True)
logger.info("Already set in INSTRUCTING state")
return
self.__state = ReducerState.instructing

# Check for a model chain
if not self.statestore.latest_model():
print("No model in model chain, please seed the alliance!")
logger.info("No model in model chain, please seed the alliance!")

# Set reducer in monitoring state
self.__state = ReducerState.monitoring
Expand All @@ -383,7 +345,7 @@ def infer_instruct(self, config):
try:
self.inference_round(config)
except TypeError:
print("Could not unpack data from round...", flush=True)
logger.error("Round failed.")

# Set reducer in idle state
self.__state = ReducerState.idle
Expand All @@ -399,7 +361,7 @@ def inference_round(self, config):

# Check for at least one combiner in statestore
if len(self.network.get_combiners()) < 1:
print("REDUCER: No combiners connected!")
logger.warning("REDUCER: No combiners connected!")
return round_data

# Setup combiner configuration
Expand All @@ -415,17 +377,9 @@ def inference_round(self, config):
# Test round start policy
round_start = self.check_round_start_policy(validating_combiners)
if round_start:
print(
"CONTROL: round start policy met, participating combiners {}".format(
validating_combiners
),
flush=True,
)
logger.info("Round start policy met, participating combiners {}".format(validating_combiners))
else:
print(
"CONTROL: Round start policy not met, skipping round!",
flush=True,
)
logger.warning("Round start policy not met, skipping round!")
return None

# Synch combiners with latest model and trigger inference
Expand Down
Loading

0 comments on commit 9e83599

Please sign in to comment.