Add async aggregator #3455
Mostly LGTM
"""
with _update_lock:
    new_ctx = FLContext()
    new_ctx.model = self.model
Not related to this PR, but are we using this "self.model" anywhere?
if rc != _AcceptWaitRC.IS_SET:
    self.log_warning(fl_ctx, f"abnormal result {rc} waiting for accept thread")
If the accept thread has an "abnormal" RC, should we just return here, or raise an exception?
This can only happen when the "accept" thread times out (e.g. the original aggregator's accept method got stuck). I think you are right that we should raise an exception, since there is no good way to recover.
Fixes # .
Description
This PR implements a wrapper aggregator that runs another aggregator's "accept" method in a separate thread. When "accept" processing is time-consuming, as some customers have reported, this can drastically reduce the time the SAG workflow is blocked while processing client submissions.
To use this aggregator, configure the original aggregator as a component, pass its ID to the wrapper aggregator (AsyncAggregator) through "aggregator_id", and point the workflow's "aggregator_id" at the wrapper:
"components": [
  {
    "id": "aggregator",
    "path": "nvflare.app_common.aggregators.intime_accumulate_model_aggregator.InTimeAccumulateWeightedAggregator",
    "args": {
      "expected_data_kind": "WEIGHTS"
    }
  },
  {
    "id": "aggr_wrapper",
    "path": "nvflare.app_common.aggregators.async.AsyncAggregator",
    "args": {
      "aggregator_id": "aggregator"
    }
  },
  ...
],
"workflows": [
  {
    "id": "scatter_and_gather",
    "path": "nvflare.app_common.workflows.scatter_and_gather.ScatterAndGather",
    "args": {
      "min_clients": "{min_clients}",
      "num_rounds": 2,
      "start_round": 0,
      "wait_time_after_min_received": "{wait_time}",
      "aggregator_id": "aggr_wrapper",
      "persistor_id": "persistor",
      "shareable_generator_id": "shareable_generator",
      "train_task_name": "train",
      "train_timeout": 6000,
      "ignore_result_error": true
    }
  }
]
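For readers who want a feel for the wrapping idea in the description, here is a minimal standalone sketch. It is not the code in this PR and does not use the NVFlare API: SumAggregator, AsyncAggregatorSketch, accept_delay and drain_timeout are all hypothetical names, and plain numbers stand in for client submissions. It assumes one background worker thread that runs the wrapped accept(), and an aggregate() that waits for pending submissions and raises if they do not finish in time, in line with the review discussion about raising on an abnormal wait result.

```python
import queue
import threading
import time


class SumAggregator:
    """Hypothetical stand-in for the wrapped aggregator (not an NVFlare class)."""

    def __init__(self, accept_delay=0.0):
        self.total = 0
        self.accept_delay = accept_delay

    def accept(self, value):
        time.sleep(self.accept_delay)  # simulate time-consuming processing
        self.total += value
        return True

    def aggregate(self):
        return self.total


class AsyncAggregatorSketch:
    """Runs the wrapped aggregator's accept() on a background thread."""

    def __init__(self, inner, drain_timeout=5.0):
        self.inner = inner
        self.drain_timeout = drain_timeout
        self.errors = []               # exceptions raised by inner.accept()
        self._pending = queue.Queue()
        self._cond = threading.Condition()
        self._outstanding = 0          # submissions queued but not yet processed
        threading.Thread(target=self._run, daemon=True).start()

    def _run(self):
        while True:
            value = self._pending.get()
            try:
                self.inner.accept(value)
            except Exception as exc:   # keep the worker alive, remember the failure
                self.errors.append(exc)
            finally:
                with self._cond:
                    self._outstanding -= 1
                    self._cond.notify_all()

    def accept(self, value):
        # Return right away; the real work happens on the worker thread.
        with self._cond:
            self._outstanding += 1
        self._pending.put(value)
        return True

    def aggregate(self):
        # Wait until every queued submission is processed, but not forever.
        with self._cond:
            finished = self._cond.wait_for(
                lambda: self._outstanding == 0, timeout=self.drain_timeout
            )
        if not finished:
            raise RuntimeError("timed out waiting for the accept thread")
        if self.errors:
            raise RuntimeError(f"accept failed: {self.errors[0]!r}")
        return self.inner.aggregate()


if __name__ == "__main__":
    agg = AsyncAggregatorSketch(SumAggregator(accept_delay=0.05))
    for submission in (1, 2, 3):
        agg.accept(submission)         # does not block on the slow accept
    print(agg.aggregate())
```

In this sketch the caller of accept() only pays for a queue insert, while aggregate() is the single point where the workflow waits, which is also where a stuck accept surfaces as an exception instead of a silent warning.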