Skip to content

Add async aggregator #3455

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open

Add async aggregator #3455

wants to merge 2 commits into from

Conversation

yanchengnv
Copy link
Collaborator

@yanchengnv yanchengnv commented Apr 28, 2025

Fixes # .

Description

This PR implements a wrapper aggregator that can be used to perform another aggregator's "accept" method in a separate thread. This can drastically reduce the block time of the SAG workflow when processing client submission if the "accept" processing is time-consuming, as reported by some customers.

To use this aggregator, the user simply puts the original aggregator as a component in this wrapper aggregator (AsyncAggregator):

"components": [
{
"id": "aggregator",
"path": "nvflare.app_common.aggregators.intime_accumulate_model_aggregator.InTimeAccumulateWeightedAggregator",
"args": {
"expected_data_kind": "WEIGHTS"
}
},
{
"id": "aggr_wrapper",
"path": "nvflare.app_common.aggregators.async.AsyncAggregator",
"args": {
"aggregator_id": "aggregator"
}
},
...
],
"workflows": [
{
"id": "scatter_and_gather",
"path": "nvflare.app_common.workflows.scatter_and_gather.ScatterAndGather",
"args": {
"min_clients": "{min_clients}",
"num_rounds": 2,
"start_round": 0,
"wait_time_after_min_received": "{wait_time}",
"aggregator_id": "aggr_wrapper",
"persistor_id": "persistor",
"shareable_generator_id": "shareable_generator",
"train_task_name": "train",
"train_timeout": 6000,
"ignore_result_error": true
}
}
]

Types of changes

  • Non-breaking change (fix or new feature that would not break existing functionality).
  • Breaking change (fix or new feature that would cause existing functionality to change).
  • New tests added to cover the changes.
  • Quick tests passed locally by running ./runtest.sh.
  • In-line docstrings updated.
  • Documentation updated.

Copy link
Collaborator

@YuanTingHsieh YuanTingHsieh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly LGTM

"""
with _update_lock:
new_ctx = FLContext()
new_ctx.model = self.model
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no related to this PR, but are we using this "self.model" anywhere?

Comment on lines +155 to +156
if rc != _AcceptWaitRC.IS_SET:
self.log_warning(fl_ctx, f"abnormal result {rc} waiting for accept thread")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the accept thread has "abnormal" RC, should we just return here? or raise Exception?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could only happen when the "accept" thread is timed out (e.g. the original aggregator's accept method got stuck). I think you are right that we should raise exception, since there won't be a good way to recover.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants