From 7a5197e90de8361f5e062273e89359126fa915da Mon Sep 17 00:00:00 2001 From: viktorvaladi Date: Tue, 7 May 2024 17:26:54 +0200 Subject: [PATCH 1/3] ruff autofix --- docs/conf.py | 66 +++++++++---------- examples/async-clients/client/entrypoint.py | 1 - examples/async-clients/run_clients.py | 1 - examples/flower-client/client/entrypoint.py | 1 - examples/mnist-keras/client/entrypoint.py | 1 - fedn/cli/client_cmd.py | 12 +--- fedn/cli/combiner_cmd.py | 11 +--- fedn/cli/config_cmd.py | 3 +- fedn/cli/main.py | 4 +- fedn/cli/model_cmd.py | 7 +- fedn/cli/package_cmd.py | 7 +- fedn/cli/round_cmd.py | 7 +- fedn/cli/run_cmd.py | 16 ++--- fedn/cli/session_cmd.py | 7 +- fedn/cli/shared.py | 3 +- fedn/cli/status_cmd.py | 7 +- fedn/cli/validation_cmd.py | 7 +- fedn/common/certificate/certificate.py | 22 ++----- fedn/common/certificate/certificatemanager.py | 16 ++--- fedn/common/log_config.py | 6 +- fedn/network/api/client.py | 1 - fedn/network/api/interface.py | 10 +-- fedn/network/api/network.py | 1 - fedn/network/api/server.py | 7 -- fedn/network/clients/client.py | 9 +-- fedn/network/clients/connect.py | 6 +- fedn/network/clients/package.py | 1 - .../combiner/aggregators/aggregatorbase.py | 14 ++-- fedn/network/combiner/aggregators/fedavg.py | 12 ++-- fedn/network/combiner/aggregators/fedopt.py | 61 ++++++++--------- fedn/network/combiner/combiner.py | 14 +--- fedn/network/combiner/connect.py | 4 +- fedn/network/combiner/interfaces.py | 4 -- fedn/network/combiner/modelservice.py | 1 - fedn/network/combiner/roundhandler.py | 8 --- fedn/network/controller/control.py | 6 -- fedn/network/controller/controlbase.py | 19 ++---- fedn/network/grpc/__init__.py | 2 +- fedn/network/grpc/server.py | 15 ++--- fedn/network/loadbalancer/firstavailable.py | 1 - fedn/network/loadbalancer/leastpacked.py | 8 +-- fedn/network/storage/models/__init__.py | 3 +- .../storage/models/memorymodelstorage.py | 4 +- .../storage/models/tempmodelstorage.py | 4 +- fedn/network/storage/s3/__init__.py | 3 +- fedn/network/storage/s3/miniorepository.py | 2 - fedn/network/storage/s3/repository.py | 2 - .../storage/statestore/mongostatestore.py | 16 ----- .../storage/statestore/stores/client_store.py | 20 +++--- .../statestore/stores/combiner_store.py | 32 ++++----- .../storage/statestore/stores/model_store.py | 34 +++++----- .../statestore/stores/package_store.py | 28 ++++---- .../storage/statestore/stores/round_store.py | 16 ++--- .../statestore/stores/session_store.py | 16 ++--- .../storage/statestore/stores/shared.py | 4 +- .../storage/statestore/stores/status_store.py | 26 ++++---- .../storage/statestore/stores/store.py | 4 +- .../statestore/stores/validation_store.py | 24 +++---- fedn/utils/dispatcher.py | 12 ++-- fedn/utils/environment.py | 6 +- fedn/utils/helpers/helperbase.py | 1 - fedn/utils/helpers/plugins/numpyhelper.py | 8 --- fedn/utils/plots.py | 41 +++--------- fedn/utils/process.py | 3 +- pyproject.toml | 14 ---- 65 files changed, 259 insertions(+), 473 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index 913c35d9c..869496f66 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -4,31 +4,31 @@ import sphinx_rtd_theme # noqa: F401 # Insert path -sys.path.insert(0, os.path.abspath('../fedn')) +sys.path.insert(0, os.path.abspath("../fedn")) # Project info -project = 'FEDn' -copyright = '2021, Scaleout Systems AB' -author = 'Scaleout Systems AB' +project = "FEDn" +copyright = "2021, Scaleout Systems AB" +author = "Scaleout Systems AB" # The full version, including alpha/beta/rc tags -release = '0.9.2' +release = "0.9.2" # Add any Sphinx extension module names here, as strings extensions = [ - 'sphinx.ext.autodoc', - 'sphinx.ext.doctest', - 'sphinx.ext.intersphinx', - 'sphinx.ext.coverage', - 'sphinx.ext.mathjax', - 'sphinx.ext.ifconfig', - 'sphinx.ext.viewcode', - 'sphinx_rtd_theme', - 'sphinx_code_tabs' + "sphinx.ext.autodoc", + "sphinx.ext.doctest", + "sphinx.ext.intersphinx", + "sphinx.ext.coverage", + "sphinx.ext.mathjax", + "sphinx.ext.ifconfig", + "sphinx.ext.viewcode", + "sphinx_rtd_theme", + "sphinx_code_tabs" ] # The master toctree document. -master_doc = 'index' +master_doc = "index" # Add any paths that contain templates here, relative to this directory. templates_path = [] @@ -39,31 +39,31 @@ exclude_patterns = [] # The theme to use for HTML and HTML Help pages. -html_theme = 'sphinx_rtd_theme' +html_theme = "sphinx_rtd_theme" html_theme_options = { - 'logo_only': True, + "logo_only": True, } # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ['_static'] +html_static_path = ["_static"] # Output file base name for HTML help builder. -htmlhelp_basename = 'fedndocs' +htmlhelp_basename = "fedndocs" # If defined shows an image instead of project name on page top-left (link to index page) -html_logo = '_static/images/scaleout_logo_flat_dark.svg' +html_logo = "_static/images/scaleout_logo_flat_dark.svg" # FEDn logo looks ugly on rtd theme -html_favicon = 'favicon.png' +html_favicon = "favicon.png" # Here we assume that the file is at _static/custom.css html_css_files = [ - 'css/elements.css', - 'css/text.css', - 'css/utilities.css', + "css/elements.css", + "css/text.css", + "css/utilities.css", ] # LaTeX elements @@ -89,14 +89,14 @@ # (source start file, target name, title, # author, documentclass [howto, manual, or own class]). latex_documents = [ - (master_doc, 'fedn.tex', 'FEDn Documentation', - 'Scaleout Systems AB', 'manual'), + (master_doc, "fedn.tex", "FEDn Documentation", + "Scaleout Systems AB", "manual"), ] # One entry per manual page. List of tuples # (source start file, name, description, authors, manual section). man_pages = [ - (master_doc, 'fedn', 'FEDn Documentation', + (master_doc, "fedn", "FEDn Documentation", [author], 1) ] @@ -104,17 +104,17 @@ # (source start file, target name, title, author, # dir menu entry, description, category) texinfo_documents = [ - (master_doc, 'fedn', 'FEDn Documentation', - author, 'fedn', 'One line description of project.', - 'Miscellaneous'), + (master_doc, "fedn", "FEDn Documentation", + author, "fedn", "One line description of project.", + "Miscellaneous"), ] # Bibliographic Dublin Core info. epub_title = project -epub_exclude_files = ['search.html'] +epub_exclude_files = ["search.html"] # Example configuration for intersphinx: refer to the Python standard library. -intersphinx_mapping = {'https://docs.python.org/': None} +intersphinx_mapping = {"https://docs.python.org/": None} -pygments_style = 'sphinx' +pygments_style = "sphinx" diff --git a/examples/async-clients/client/entrypoint.py b/examples/async-clients/client/entrypoint.py index 220b5299b..0b8e10668 100644 --- a/examples/async-clients/client/entrypoint.py +++ b/examples/async-clients/client/entrypoint.py @@ -79,7 +79,6 @@ def make_data(n_min=50, n_max=100): def train(in_model_path, out_model_path): """Train model.""" - # Load model parameters = load_parameters(in_model_path) model = compile_model() diff --git a/examples/async-clients/run_clients.py b/examples/async-clients/run_clients.py index 82da30ad9..f2ce72291 100644 --- a/examples/async-clients/run_clients.py +++ b/examples/async-clients/run_clients.py @@ -68,7 +68,6 @@ def run_client(online_for=120, name="client"): This is repeated for N_CYCLES. """ - conf = copy.deepcopy(client_config) conf["name"] = name diff --git a/examples/flower-client/client/entrypoint.py b/examples/flower-client/client/entrypoint.py index 1a9a8b8cf..a790644ef 100755 --- a/examples/flower-client/client/entrypoint.py +++ b/examples/flower-client/client/entrypoint.py @@ -14,7 +14,6 @@ def _get_node_id(): """Get client number from environment variable.""" - number = os.environ.get("CLIENT_NUMBER", "0") return int(number) diff --git a/examples/mnist-keras/client/entrypoint.py b/examples/mnist-keras/client/entrypoint.py index 5420a78bb..1ed8f2f77 100755 --- a/examples/mnist-keras/client/entrypoint.py +++ b/examples/mnist-keras/client/entrypoint.py @@ -135,7 +135,6 @@ def validate(in_model_path, out_json_path, data_path=None): :param data_path: The path to the data file. :type data_path: str """ - # Load data x_train, y_train = load_data(data_path) x_test, y_test = load_data(data_path, is_train=False) diff --git a/fedn/cli/client_cmd.py b/fedn/cli/client_cmd.py index e72f29569..80b0b3353 100644 --- a/fedn/cli/client_cmd.py +++ b/fedn/cli/client_cmd.py @@ -15,7 +15,6 @@ def validate_client_config(config): :param config: Client config (dict). """ - try: if config["discover_host"] is None or config["discover_host"] == "": raise InvalidClientConfig("Missing required configuration: discover_host") @@ -28,9 +27,7 @@ def validate_client_config(config): @main.group("client") @click.pass_context def client_cmd(ctx): - """ - - :param ctx: + """:param ctx: """ pass @@ -43,8 +40,7 @@ def client_cmd(ctx): @client_cmd.command("list") @click.pass_context def list_clients(ctx, protocol: str, host: str, port: str, token: str = None, n_max: int = None): - """ - Return: + """Return: ------ - count: number of clients - result: list of clients @@ -114,9 +110,7 @@ def client_cmd( reconnect_after_missed_heartbeat, verbosity, ): - """ - - :param ctx: + """:param ctx: :param discoverhost: :param discoverport: :param token: diff --git a/fedn/cli/combiner_cmd.py b/fedn/cli/combiner_cmd.py index 2b4447437..02a797448 100644 --- a/fedn/cli/combiner_cmd.py +++ b/fedn/cli/combiner_cmd.py @@ -12,9 +12,7 @@ @main.group("combiner") @click.pass_context def combiner_cmd(ctx): - """ - - :param ctx: + """:param ctx: """ pass @@ -33,9 +31,7 @@ def combiner_cmd(ctx): @click.option("-in", "--init", required=False, default=None, help="Path to configuration file to (re)init combiner.") @click.pass_context def start_cmd(ctx, discoverhost, discoverport, token, name, host, port, fqdn, secure, verify, max_clients, init): - """ - - :param ctx: + """:param ctx: :param discoverhost: :param discoverport: :param token: @@ -76,8 +72,7 @@ def start_cmd(ctx, discoverhost, discoverport, token, name, host, port, fqdn, se @combiner_cmd.command("list") @click.pass_context def list_combiners(ctx, protocol: str, host: str, port: str, token: str = None, n_max: int = None): - """ - Return: + """Return: ------ - count: number of combiners - result: list of combiners diff --git a/fedn/cli/config_cmd.py b/fedn/cli/config_cmd.py index d5286997f..0b77260c3 100644 --- a/fedn/cli/config_cmd.py +++ b/fedn/cli/config_cmd.py @@ -21,8 +21,7 @@ @main.group("config", invoke_without_command=True) @click.pass_context def config_cmd(ctx): - """ - - Configuration commands for the FEDn CLI. + """- Configuration commands for the FEDn CLI. """ if ctx.invoked_subcommand is None: click.echo("\n--- FEDn Cli Configuration ---\n") diff --git a/fedn/cli/main.py b/fedn/cli/main.py index 52276c418..d6f912e62 100644 --- a/fedn/cli/main.py +++ b/fedn/cli/main.py @@ -9,8 +9,6 @@ @click.group(context_settings=CONTEXT_SETTINGS) @click.pass_context def main(ctx): - """ - - :param ctx: + """:param ctx: """ ctx.obj = dict() diff --git a/fedn/cli/model_cmd.py b/fedn/cli/model_cmd.py index e44793a9f..80a8f795e 100644 --- a/fedn/cli/model_cmd.py +++ b/fedn/cli/model_cmd.py @@ -8,9 +8,7 @@ @main.group("model") @click.pass_context def model_cmd(ctx): - """ - - :param ctx: + """:param ctx: """ pass @@ -23,8 +21,7 @@ def model_cmd(ctx): @model_cmd.command("list") @click.pass_context def list_models(ctx, protocol: str, host: str, port: str, token: str = None, n_max: int = None): - """ - Return: + """Return: ------ - count: number of models - result: list of models diff --git a/fedn/cli/package_cmd.py b/fedn/cli/package_cmd.py index 6d503d414..3c78d9944 100644 --- a/fedn/cli/package_cmd.py +++ b/fedn/cli/package_cmd.py @@ -13,9 +13,7 @@ @main.group("package") @click.pass_context def package_cmd(ctx): - """ - - :param ctx: + """:param ctx: """ pass @@ -51,8 +49,7 @@ def create_cmd(ctx, path, name): @package_cmd.command("list") @click.pass_context def list_packages(ctx, protocol: str, host: str, port: str, token: str = None, n_max: int = None): - """ - Return: + """Return: ------ - count: number of packages - result: list of packages diff --git a/fedn/cli/round_cmd.py b/fedn/cli/round_cmd.py index ca23cafe7..ac42f43ef 100644 --- a/fedn/cli/round_cmd.py +++ b/fedn/cli/round_cmd.py @@ -8,9 +8,7 @@ @main.group("round") @click.pass_context def round_cmd(ctx): - """ - - :param ctx: + """:param ctx: """ pass @@ -23,8 +21,7 @@ def round_cmd(ctx): @round_cmd.command("list") @click.pass_context def list_rounds(ctx, protocol: str, host: str, port: str, token: str = None, n_max: int = None): - """ - Return: + """Return: ------ - count: number of rounds - result: list of rounds diff --git a/fedn/cli/run_cmd.py b/fedn/cli/run_cmd.py index b9fe4528e..123f17320 100644 --- a/fedn/cli/run_cmd.py +++ b/fedn/cli/run_cmd.py @@ -17,9 +17,7 @@ def get_statestore_config_from_file(init): - """ - - :param init: + """:param init: :return: """ with open(init, "r") as file: @@ -43,9 +41,7 @@ def check_helper_config_file(config): @main.group("run") @click.pass_context def run_cmd(ctx): - """ - - :param ctx: + """:param ctx: """ pass @@ -125,9 +121,7 @@ def client_cmd( reconnect_after_missed_heartbeat, verbosity, ): - """ - - :param ctx: + """:param ctx: :param discoverhost: :param discoverport: :param token: @@ -201,9 +195,7 @@ def client_cmd( @click.option("-in", "--init", required=False, default=None, help="Path to configuration file to (re)init combiner.") @click.pass_context def combiner_cmd(ctx, discoverhost, discoverport, token, name, host, port, fqdn, secure, verify, max_clients, init): - """ - - :param ctx: + """:param ctx: :param discoverhost: :param discoverport: :param token: diff --git a/fedn/cli/session_cmd.py b/fedn/cli/session_cmd.py index 55597b5b3..65db98c69 100644 --- a/fedn/cli/session_cmd.py +++ b/fedn/cli/session_cmd.py @@ -8,9 +8,7 @@ @main.group("session") @click.pass_context def session_cmd(ctx): - """ - - :param ctx: + """:param ctx: """ pass @@ -23,8 +21,7 @@ def session_cmd(ctx): @session_cmd.command("list") @click.pass_context def list_sessions(ctx, protocol: str, host: str, port: str, token: str = None, n_max: int = None): - """ - Return: + """Return: ------ - count: number of sessions - result: list of sessions diff --git a/fedn/cli/shared.py b/fedn/cli/shared.py index 2500d9e2b..d32f4ff43 100644 --- a/fedn/cli/shared.py +++ b/fedn/cli/shared.py @@ -65,8 +65,7 @@ def get_client_package_dir(path: str) -> str: # Print response from api (list of entities) def print_response(response, entity_name: str): - """ - Prints the api response to the cli. + """Prints the api response to the cli. :param response: type: array description: list of entities diff --git a/fedn/cli/status_cmd.py b/fedn/cli/status_cmd.py index a4f17e349..078acaf13 100644 --- a/fedn/cli/status_cmd.py +++ b/fedn/cli/status_cmd.py @@ -8,9 +8,7 @@ @main.group("status") @click.pass_context def status_cmd(ctx): - """ - - :param ctx: + """:param ctx: """ pass @@ -23,8 +21,7 @@ def status_cmd(ctx): @status_cmd.command("list") @click.pass_context def list_statuses(ctx, protocol: str, host: str, port: str, token: str = None, n_max: int = None): - """ - Return: + """Return: ------ - count: number of statuses - result: list of statuses diff --git a/fedn/cli/validation_cmd.py b/fedn/cli/validation_cmd.py index 055be0c65..4bf4e63fa 100644 --- a/fedn/cli/validation_cmd.py +++ b/fedn/cli/validation_cmd.py @@ -8,9 +8,7 @@ @main.group("validation") @click.pass_context def validation_cmd(ctx): - """ - - :param ctx: + """:param ctx: """ pass @@ -23,8 +21,7 @@ def validation_cmd(ctx): @validation_cmd.command("list") @click.pass_context def list_validations(ctx, protocol: str, host: str, port: str, token: str = None, n_max: int = None): - """ - Return: + """Return: ------ - count: number of validations - result: list of validations diff --git a/fedn/common/certificate/certificate.py b/fedn/common/certificate/certificate.py index 857a05e7c..3cb09016c 100644 --- a/fedn/common/certificate/certificate.py +++ b/fedn/common/certificate/certificate.py @@ -9,8 +9,7 @@ class Certificate: - """ - Utility to generate unsigned certificates. + """Utility to generate unsigned certificates. """ @@ -37,8 +36,7 @@ def __init__(self, cwd, name=None, key_name="key.pem", cert_name="cert.pem", cre def gen_keypair( self, ): - """ - Generate keypair. + """Generate keypair. """ key = crypto.PKey() @@ -65,9 +63,7 @@ def gen_keypair( certfile.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert)) def set_keypair_raw(self, certificate, privatekey): - """ - - :param certificate: + """:param certificate: :param privatekey: """ with open(self.key_path, "wb") as keyfile: @@ -77,9 +73,7 @@ def set_keypair_raw(self, certificate, privatekey): certfile.write(crypto.dump_certificate(crypto.FILETYPE_PEM, certificate)) def get_keypair_raw(self): - """ - - :return: + """:return: """ with open(self.key_path, "rb") as keyfile: key_buf = keyfile.read() @@ -88,9 +82,7 @@ def get_keypair_raw(self): return copy.deepcopy(cert_buf), copy.deepcopy(key_buf) def get_key(self): - """ - - :return: + """:return: """ with open(self.key_path, "rb") as keyfile: key_buf = keyfile.read() @@ -98,9 +90,7 @@ def get_key(self): return key def get_cert(self): - """ - - :return: + """:return: """ with open(self.cert_path, "rb") as certfile: cert_buf = certfile.read() diff --git a/fedn/common/certificate/certificatemanager.py b/fedn/common/certificate/certificatemanager.py index ce165d862..172d799ed 100644 --- a/fedn/common/certificate/certificatemanager.py +++ b/fedn/common/certificate/certificatemanager.py @@ -4,8 +4,7 @@ class CertificateManager: - """ - Utility to handle certificates for both Reducer and Combiner services. + """Utility to handle certificates for both Reducer and Combiner services. """ @@ -16,8 +15,7 @@ def __init__(self, directory): self.load_all() def get_or_create(self, name): - """ - Look for an existing certificate, if not found, generate a self-signed certificate based on name. + """Look for an existing certificate, if not found, generate a self-signed certificate based on name. :param name: The name used when issuing the certificate. :return: A certificate @@ -33,8 +31,7 @@ def get_or_create(self, name): return cert def add(self, certificate): - """ - Add certificate to certificate list. + """Add certificate to certificate list. :param certificate: :return: Success status (True, False) @@ -46,8 +43,7 @@ def add(self, certificate): return False def load_all(self): - """ - Load all certificates and add to certificates list. + """Load all certificates and add to certificates list. """ for filename in sorted(os.listdir(self.directory)): @@ -59,9 +55,7 @@ def load_all(self): self.certificates.append(c) def find(self, name): - """ - - :param name: Name of certificate + """:param name: Name of certificate :return: certificate if successful, else None """ for cert in self.certificates: diff --git a/fedn/common/log_config.py b/fedn/common/log_config.py index b8aa1218b..0d3ddb96c 100644 --- a/fedn/common/log_config.py +++ b/fedn/common/log_config.py @@ -62,8 +62,7 @@ def emit(self, record): def set_log_level_from_string(level_str): - """ - Set the log level based on a string input. + """Set the log level based on a string input. """ # Mapping of string representation to logging constants level_mapping = { @@ -85,8 +84,7 @@ def set_log_level_from_string(level_str): def set_log_stream(log_file): - """ - Redirect the log stream to a specified file, if log_file is set. + """Redirect the log stream to a specified file, if log_file is set. """ if not log_file: return diff --git a/fedn/network/api/client.py b/fedn/network/api/client.py index 43678fbcf..cd0ca5a7a 100644 --- a/fedn/network/api/client.py +++ b/fedn/network/api/client.py @@ -497,7 +497,6 @@ def get_session(self, id: str): :return: Session. :rtype: dict """ - response = requests.get(self._get_url_api_v1(f"sessions/{id}"), self.verify, headers=self.headers) _json = response.json() diff --git a/fedn/network/api/interface.py b/fedn/network/api/interface.py index 718cd8a18..663add77e 100644 --- a/fedn/network/api/interface.py +++ b/fedn/network/api/interface.py @@ -194,7 +194,6 @@ def set_compute_package(self, file, helper_type: str, name: str = None, descript :return: A json response with success or failure message. :rtype: :class:`flask.Response` """ - if self.control.state() == ReducerState.instructing or self.control.state() == ReducerState.monitoring: return ( jsonify( @@ -307,7 +306,6 @@ def list_compute_packages(self, limit: str = None, skip: str = None, include_act :return: All compute packages as a json response. :rtype: :class:`flask.Response` """ - if limit is not None and skip is not None: limit = int(limit) skip = int(skip) @@ -397,7 +395,6 @@ def _create_checksum(self, name=None): :return: Success or failure boolean, message and the checksum. :rtype: bool, str, str """ - if name is None: name, message = self._get_compute_package_name() if name is None: @@ -418,7 +415,6 @@ def get_checksum(self, name): :return: The checksum as a json object. :rtype: :py:class:`flask.Response` """ - success, message, sum = self._create_checksum(name) if not success: return jsonify({"success": False, "message": message}), 404 @@ -816,7 +812,6 @@ def get_model_descendants(self, model_id: str, limit: str = None): :return: The model descendants for the given model as a json response. :rtype: :class:`flask.Response` """ - if model_id is None: return jsonify({"success": False, "message": "No model id provided."}) @@ -868,8 +863,7 @@ def get_all_rounds(self): "combiners": combiners, } payload[id] = info - else: - return jsonify(payload) + return jsonify(payload) def get_round(self, round_id): """Get a round. @@ -915,7 +909,6 @@ def get_plot_data(self, feature=None): :return: The plot data as json response. :rtype: :py:class:`flask.Response` """ - plot = Plot(self.control.statestore) try: @@ -942,7 +935,6 @@ def list_combiners_data(self, combiners): :return: The combiners data as json response. :rtype: :py:class:`flask.Response` """ - response = self.statestore.list_combiners_data(combiners) arr = [] diff --git a/fedn/network/api/network.py b/fedn/network/api/network.py index cb105f10a..045f8aa34 100644 --- a/fedn/network/api/network.py +++ b/fedn/network/api/network.py @@ -113,7 +113,6 @@ def add_client(self, client): :type client: dict :return: None """ - if self.get_client(client["name"]): return diff --git a/fedn/network/api/server.py b/fedn/network/api/server.py index 5f645e4e2..c196da762 100644 --- a/fedn/network/api/server.py +++ b/fedn/network/api/server.py @@ -105,7 +105,6 @@ def list_models(): Returns: _type_: json """ - session_id = request.args.get("session_id", None) limit = request.args.get("limit", None) skip = request.args.get("skip", None) @@ -161,7 +160,6 @@ def list_clients(): return: All clients as a json object. rtype: json """ - limit = request.args.get("limit", None) skip = request.args.get("skip", None) status = request.args.get("status", None) @@ -202,7 +200,6 @@ def list_combiners(): return: All combiners as a json object. rtype: json """ - limit = request.args.get("limit", None) skip = request.args.get("skip", None) @@ -389,7 +386,6 @@ def list_compute_packages(): return: The compute package as a json object. rtype: json """ - limit = request.args.get("limit", None) skip = request.args.get("skip", None) include_active = request.args.get("include_active", None) @@ -596,7 +592,6 @@ def add_client(): return: The response from control. rtype: json """ - json_data = request.get_json() remote_addr = request.remote_addr try: @@ -617,7 +612,6 @@ def list_combiners_data(): return: The response from control. rtype: json """ - json_data = request.get_json() # expects a list of combiner names (strings) in an array @@ -640,7 +634,6 @@ def get_plot_data(): """Get plot data from the statestore. rtype: json """ - try: feature = request.args.get("feature", None) response = api.get_plot_data(feature=feature) diff --git a/fedn/network/clients/client.py b/fedn/network/clients/client.py index c8a5afc4f..70fe005ff 100644 --- a/fedn/network/clients/client.py +++ b/fedn/network/clients/client.py @@ -114,7 +114,6 @@ def assign(self): :return: A configuration dictionary containing connection information for combiner. :rtype: dict """ - logger.info("Initiating assignment request.") while True: status, response = self.connector.assign() @@ -179,10 +178,9 @@ def connect(self, combiner_config): :param combiner_config: connection information for the combiner. :type combiner_config: dict """ - if self._connected: logger.info("Client is already attached. ") - return None + return # TODO use the combiner_config['certificate'] for setting up secure comms' host = combiner_config["host"] @@ -257,7 +255,6 @@ def _initialize_helper(self, combiner_config): :type combiner_config: dict :return: """ - if "helper_type" in combiner_config.keys(): self.helper = get_helper(combiner_config["helper_type"]) @@ -268,7 +265,6 @@ def _subscribe_to_combiner(self, config): | the discovery service (controller) and settings governing e.g. | client-combiner assignment behavior. """ - # Start sending heartbeats to the combiner. threading.Thread(target=self._send_heartbeat, kwargs={"update_frequency": config["heartbeat_interval"]}, daemon=True).start() @@ -420,7 +416,6 @@ def _listen_to_task_stream(self): :return: None :rtype: None """ - r = fedn.ClientAvailableMessage() r.sender.name = self.name r.sender.role = fedn.WORKER @@ -489,7 +484,6 @@ def _process_training_request(self, model_id: str, session_id: str = None): :return: The model id of the updated model, or None if the update failed. And a dict with metadata. :rtype: tuple """ - self.send_status("\t Starting processing of training request for model_id {}".format(model_id), sesssion_id=session_id) self.state = ClientState.training @@ -740,7 +734,6 @@ def send_status(self, msg, log_level=fedn.Status.INFO, type=None, request=None, :param request: The request message. :type request: fedn.Request """ - if not self._connected: logger.info("SendStatus: Client disconnected.") return diff --git a/fedn/network/clients/connect.py b/fedn/network/clients/connect.py index efeb3d1e9..09450c5ab 100644 --- a/fedn/network/clients/connect.py +++ b/fedn/network/clients/connect.py @@ -67,8 +67,7 @@ def __init__(self, host, port, token, name, remote_package, force_ssl=False, ver logger.info("Setting connection string to {}.".format(self.connect_string)) def assign(self): - """ - Connect client to FEDn network discovery service, ask for combiner assignment. + """Connect client to FEDn network discovery service, ask for combiner assignment. :return: Tuple with assingment status, combiner connection information if sucessful, else None. :rtype: tuple(:class:`fedn.network.clients.connect.Status`, str) @@ -127,8 +126,7 @@ def assign(self): return Status.Unassigned, None def refresh_token(self): - """ - Refresh client token. + """Refresh client token. :return: Tuple with assingment status, combiner connection information if sucessful, else None. :rtype: tuple(:class:`fedn.network.clients.connect.Status`, str) diff --git a/fedn/network/clients/package.py b/fedn/network/clients/package.py index 54f45b883..f99d12d49 100644 --- a/fedn/network/clients/package.py +++ b/fedn/network/clients/package.py @@ -153,7 +153,6 @@ def dispatcher(self, run_path): :return: Dispatcher object :rtype: :class:`fedn.utils.dispatcher.Dispatcher` """ - self.dispatch_config = _read_yaml_file(os.path.join(run_path, "fedn.yaml")) dispatcher = Dispatcher(self.dispatch_config, run_path) diff --git a/fedn/network/combiner/aggregators/aggregatorbase.py b/fedn/network/combiner/aggregators/aggregatorbase.py index e0053cb6e..0a9c33f43 100644 --- a/fedn/network/combiner/aggregators/aggregatorbase.py +++ b/fedn/network/combiner/aggregators/aggregatorbase.py @@ -86,8 +86,8 @@ def _validate_model_update(self, model_update): :return: True if the model update is valid, False otherwise. :rtype: bool """ - data = json.loads(model_update.meta)['training_metadata'] - if 'num_examples' not in data.keys(): + data = json.loads(model_update.meta)["training_metadata"] + if "num_examples" not in data.keys(): logger.error("AGGREGATOR({}): Model validation failed, num_examples missing in metadata.".format(self.name)) return False return True @@ -120,21 +120,21 @@ def load_model_update(self, model_update, helper): model = self.round_handler.load_model_update(helper, model_id) # Get relevant metadata metadata = json.loads(model_update.meta) - if 'config' in metadata.keys(): + if "config" in metadata.keys(): # Used in Python client - config = json.loads(metadata['config']) + config = json.loads(metadata["config"]) else: # Used in C++ client config = json.loads(model_update.config) - training_metadata = metadata['training_metadata'] - training_metadata['round_id'] = config['round_id'] + training_metadata = metadata["training_metadata"] + training_metadata["round_id"] = config["round_id"] return model, training_metadata def get_state(self): """ Get the state of the aggregator's queue, including the number of model updates.""" state = { - 'queue_len': self.model_updates.qsize() + "queue_len": self.model_updates.qsize() } return state diff --git a/fedn/network/combiner/aggregators/fedavg.py b/fedn/network/combiner/aggregators/fedavg.py index 19ce84803..9ed0adf3c 100644 --- a/fedn/network/combiner/aggregators/fedavg.py +++ b/fedn/network/combiner/aggregators/fedavg.py @@ -21,7 +21,6 @@ class Aggregator(AggregatorBase): def __init__(self, storage, server, modelservice, round_handler): """Constructor method""" - super().__init__(storage, server, modelservice, round_handler) self.name = "fedavg" @@ -41,10 +40,9 @@ def combine_models(self, helper=None, delete_models=True, parameters=None): :return: The global model and metadata :rtype: tuple """ - data = {} - data['time_model_load'] = 0.0 - data['time_model_aggregation'] = 0.0 + data["time_model_load"] = 0.0 + data["time_model_aggregation"] = 0.0 model = None nr_aggregated_models = 0 @@ -67,13 +65,13 @@ def combine_models(self, helper=None, delete_models=True, parameters=None): "AGGREGATOR({}): Processing model update {}, metadata: {} ".format(self.name, model_update.model_update_id, metadata)) # Increment total number of examples - total_examples += metadata['num_examples'] + total_examples += metadata["num_examples"] if nr_aggregated_models == 0: model = model_next else: model = helper.increment_average( - model, model_next, metadata['num_examples'], total_examples) + model, model_next, metadata["num_examples"], total_examples) nr_aggregated_models += 1 # Delete model from storage @@ -87,7 +85,7 @@ def combine_models(self, helper=None, delete_models=True, parameters=None): "AGGREGATOR({}): Error encoutered while processing model update {}, skipping this update.".format(self.name, e)) self.model_updates.task_done() - data['nr_aggregated_models'] = nr_aggregated_models + data["nr_aggregated_models"] = nr_aggregated_models logger.info("AGGREGATOR({}): Aggregation completed, aggregated {} models.".format(self.name, nr_aggregated_models)) return model, data diff --git a/fedn/network/combiner/aggregators/fedopt.py b/fedn/network/combiner/aggregators/fedopt.py index 305340f10..5041e097f 100644 --- a/fedn/network/combiner/aggregators/fedopt.py +++ b/fedn/network/combiner/aggregators/fedopt.py @@ -55,18 +55,17 @@ def combine_models(self, helper=None, delete_models=True, parameters=None): :return: The global model and metadata :rtype: tuple """ - data = {} - data['time_model_load'] = 0.0 - data['time_model_aggregation'] = 0.0 + data["time_model_load"] = 0.0 + data["time_model_aggregation"] = 0.0 # Define parameter schema parameter_schema = { - 'serveropt': str, - 'learning_rate': float, - 'beta1': float, - 'beta2': float, - 'tau': float, + "serveropt": str, + "learning_rate": float, + "beta1": float, + "beta2": float, + "tau": float, } try: @@ -77,11 +76,11 @@ def combine_models(self, helper=None, delete_models=True, parameters=None): # Default hyperparameters. Note that these may need fine tuning. default_parameters = { - 'serveropt': 'adam', - 'learning_rate': 1e-3, - 'beta1': 0.9, - 'beta2': 0.99, - 'tau': 1e-4, + "serveropt": "adam", + "learning_rate": 1e-3, + "beta1": 0.9, + "beta2": 0.99, + "tau": 1e-4, } # Validate parameters @@ -119,7 +118,7 @@ def combine_models(self, helper=None, delete_models=True, parameters=None): "AGGREGATOR({}): Processing model update {}".format(self.name, model_update.model_update_id)) # Increment total number of examples - total_examples += metadata['num_examples'] + total_examples += metadata["num_examples"] if nr_aggregated_models == 0: model_old = self.round_handler.load_model_update(helper, model_update.model_id) @@ -127,7 +126,7 @@ def combine_models(self, helper=None, delete_models=True, parameters=None): else: pseudo_gradient_next = helper.subtract(model_next, model_old) pseudo_gradient = helper.increment_average( - pseudo_gradient, pseudo_gradient_next, metadata['num_examples'], total_examples) + pseudo_gradient, pseudo_gradient_next, metadata["num_examples"], total_examples) nr_aggregated_models += 1 # Delete model from storage @@ -141,17 +140,17 @@ def combine_models(self, helper=None, delete_models=True, parameters=None): "AGGREGATOR({}): Error encoutered while processing model update {}, skipping this update.".format(self.name, e)) self.model_updates.task_done() - if parameters['serveropt'] == 'adam': + if parameters["serveropt"] == "adam": model = self.serveropt_adam(helper, pseudo_gradient, model_old, parameters) - elif parameters['serveropt'] == 'yogi': + elif parameters["serveropt"] == "yogi": model = self.serveropt_yogi(helper, pseudo_gradient, model_old, parameters) - elif parameters['serveropt'] == 'adagrad': + elif parameters["serveropt"] == "adagrad": model = self.serveropt_adagrad(helper, pseudo_gradient, model_old, parameters) else: logger.error("Unsupported server optimizer passed to FedOpt.") return None, data - data['nr_aggregated_models'] = nr_aggregated_models + data["nr_aggregated_models"] = nr_aggregated_models logger.info("AGGREGATOR({}): Aggregation completed, aggregated {} models.".format(self.name, nr_aggregated_models)) return model, data @@ -170,10 +169,10 @@ def serveropt_adam(self, helper, pseudo_gradient, model_old, parameters): :return: new model weights. :rtype: as defined by helper. """ - beta1 = parameters['beta1'] - beta2 = parameters['beta2'] - learning_rate = parameters['learning_rate'] - tau = parameters['tau'] + beta1 = parameters["beta1"] + beta2 = parameters["beta2"] + learning_rate = parameters["learning_rate"] + tau = parameters["tau"] if not self.v: self.v = helper.ones(pseudo_gradient, math.pow(tau, 2)) @@ -206,11 +205,10 @@ def serveropt_yogi(self, helper, pseudo_gradient, model_old, parameters): :return: new model weights. :rtype: as defined by helper. """ - - beta1 = parameters['beta1'] - beta2 = parameters['beta2'] - learning_rate = parameters['learning_rate'] - tau = parameters['tau'] + beta1 = parameters["beta1"] + beta2 = parameters["beta2"] + learning_rate = parameters["learning_rate"] + tau = parameters["tau"] if not self.v: self.v = helper.ones(pseudo_gradient, math.pow(tau, 2)) @@ -245,10 +243,9 @@ def serveropt_adagrad(self, helper, pseudo_gradient, model_old, parameters): :return: new model weights. :rtype: as defined by helper. """ - - beta1 = parameters['beta1'] - learning_rate = parameters['learning_rate'] - tau = parameters['tau'] + beta1 = parameters["beta1"] + learning_rate = parameters["learning_rate"] + tau = parameters["tau"] if not self.v: self.v = helper.ones(pseudo_gradient, math.pow(tau, 2)) diff --git a/fedn/network/combiner/combiner.py b/fedn/network/combiner/combiner.py index f19674b73..450b8b689 100644 --- a/fedn/network/combiner/combiner.py +++ b/fedn/network/combiner/combiner.py @@ -59,7 +59,6 @@ class Combiner(rpc.CombinerServicer, rpc.ReducerServicer, rpc.ConnectorServicer, def __init__(self, config): """Initialize Combiner server.""" - set_log_level_from_string(config.get("verbosity", "INFO")) set_log_stream(config.get("logfile", None)) @@ -327,11 +326,9 @@ def _list_active_clients(self, channel): if status != "online": self.clients[client]["status"] = "online" clients["update_active_clients"].append(client) - else: - # If client has changed status, update statestore - if status == "online": - self.clients[client]["status"] = "offline" - clients["update_offline_clients"].append(client) + elif status == "online": + self.clients[client]["status"] = "offline" + clients["update_offline_clients"].append(client) # Update statestore with client status if len(clients["update_active_clients"]) > 0: self.statestore.update_client_status(clients["update_active_clients"], "online") @@ -369,7 +366,6 @@ def _send_status(self, status): :param status: the status to report :type status: :class:`fedn.network.grpc.fedn_pb2.Status` """ - self.statestore.report_status(status) def _flush_model_update_queue(self): @@ -377,7 +373,6 @@ def _flush_model_update_queue(self): :return: True if successful, else False """ - q = self.round_handler.aggregator.model_updates try: with q.mutex: @@ -588,7 +583,6 @@ def TaskStream(self, response, context): :param context: the context :type context: :class:`grpc._server._Context` """ - client = response.sender metadata = context.invocation_metadata() if metadata: @@ -643,7 +637,6 @@ def register_model_validation(self, validation): :param validation: the model validation :type validation: :class:`fedn.network.grpc.fedn_pb2.ModelValidation` """ - self.statestore.report_validation(validation) def SendModelValidation(self, request, context): @@ -668,7 +661,6 @@ def SendModelValidation(self, request, context): def run(self): """Start the server.""" - logger.info("COMBINER: {} started, ready for gRPC requests.".format(self.id)) try: while True: diff --git a/fedn/network/combiner/connect.py b/fedn/network/combiner/connect.py index e144baa94..854c8e103 100644 --- a/fedn/network/combiner/connect.py +++ b/fedn/network/combiner/connect.py @@ -67,7 +67,6 @@ def __init__(self, host, port, myhost, fqdn, myport, token, name, secure=False, :param verify: Verify the connection to the discovery service. :type verify: bool """ - self.host = host self.fqdn = fqdn self.port = port @@ -92,8 +91,7 @@ def __init__(self, host, port, myhost, fqdn, myport, token, name, secure=False, logger.info("Setting connection string to {}".format(self.connect_string)) def announce(self): - """ - Announce combiner to FEDn network via discovery service (REST-API). + """Announce combiner to FEDn network via discovery service (REST-API). :return: Tuple with announcement Status, FEDn network configuration if sucessful, else None. :rtype: :class:`fedn.network.combiner.connect.Status`, str diff --git a/fedn/network/combiner/interfaces.py b/fedn/network/combiner/interfaces.py index f247c2bc1..bf10a00f1 100644 --- a/fedn/network/combiner/interfaces.py +++ b/fedn/network/combiner/interfaces.py @@ -113,7 +113,6 @@ def to_dict(self): :return: A dictionary with the combiner configuration. :rtype: dict """ - data = { "parent": self.parent, "name": self.name, @@ -168,7 +167,6 @@ def get_key(self): def flush_model_update_queue(self): """Reset the model update queue on the combiner.""" - channel = Channel(self.address, self.port, self.certificate).get_channel() control = rpc.ControlStub(channel) @@ -188,7 +186,6 @@ def set_aggregator(self, aggregator): :param aggregator: The name of the aggregator module. :type config: str """ - channel = Channel(self.address, self.port, self.certificate).get_channel() control = rpc.ControlStub(channel) @@ -240,7 +237,6 @@ def get_model(self, id, timeout=10): :return: A file-like object containing the model. :rtype: :class:`io.BytesIO`, None if the model is not available. """ - channel = Channel(self.address, self.port, self.certificate).get_channel() modelservice = rpc.ModelServiceStub(channel) diff --git a/fedn/network/combiner/modelservice.py b/fedn/network/combiner/modelservice.py index 0b50edbc7..b5e7bff73 100644 --- a/fedn/network/combiner/modelservice.py +++ b/fedn/network/combiner/modelservice.py @@ -112,7 +112,6 @@ def get_model(self, id): :return: A BytesIO object containing the model. :rtype: :class:`io.BytesIO`, None if model does not exist. """ - data = BytesIO() data.seek(0, 0) diff --git a/fedn/network/combiner/roundhandler.py b/fedn/network/combiner/roundhandler.py index 2a8436e01..4edc04b6e 100644 --- a/fedn/network/combiner/roundhandler.py +++ b/fedn/network/combiner/roundhandler.py @@ -34,7 +34,6 @@ class RoundHandler: def __init__(self, storage, server, modelservice): """Initialize the RoundHandler.""" - self.round_configs = queue.Queue() self.storage = storage self.server = server @@ -67,7 +66,6 @@ def load_model_update(self, helper, model_id): :param model_id: The ID of the model update, UUID in str format :type model_id: str """ - model_str = self.load_model_update_str(model_id) if model_str: try: @@ -119,7 +117,6 @@ def waitforit(self, config, buffer_size=100, polling_interval=0.1): :param polling_interval: The polling interval, defaults to 0.1 :type polling_interval: float, optional """ - time_window = float(config["round_timeout"]) tt = 0.0 @@ -140,7 +137,6 @@ def _training_round(self, config, clients): :return: an aggregated model and associated metadata :rtype: model, dict """ - logger.info("ROUNDHANDLER: Initiating training round, participating clients: {}".format(clients)) meta = {} @@ -208,7 +204,6 @@ def stage_model(self, model_id, timeout_retry=3, retry=2): :param retry: Number of retries, defaults to 2 :type retry: int, optional """ - # If the model is already in memory at the server we do not need to do anything. if self.modelservice.temp_model_storage.exist(model_id): logger.info("Model already exists in memory, skipping model staging.") @@ -241,7 +236,6 @@ def _assign_round_clients(self, n, type="trainers"): :return: Set of clients :rtype: list """ - if type == "validators": clients = self.server.get_active_validators() elif type == "trainers": @@ -269,7 +263,6 @@ def _check_nr_round_clients(self, config): :return: True if the required number of clients are available, False otherwise. :rtype: bool """ - active = self.server.nr_active_trainers() if active >= int(config["clients_required"]): logger.info("Number of clients required ({0}) to start round met {1}.".format(config["clients_required"], active)) @@ -298,7 +291,6 @@ def execute_training_round(self, config): :return: metadata about the training round. :rtype: dict """ - logger.info("Processing training round, job_id {}".format(config["_job_id"])) data = {} diff --git a/fedn/network/controller/control.py b/fedn/network/controller/control.py index 99a59469c..a422383f0 100644 --- a/fedn/network/controller/control.py +++ b/fedn/network/controller/control.py @@ -75,7 +75,6 @@ class Control(ControlBase): def __init__(self, statestore): """Constructor method.""" - super().__init__(statestore) self.name = "DefaultControl" @@ -88,7 +87,6 @@ def session(self, config): :type config: dict """ - if self._state == ReducerState.instructing: logger.info("Controller already in INSTRUCTING state. A session is in progress.") return @@ -140,7 +138,6 @@ def round(self, session_config, round_id): : type round_id: str """ - self.create_round({"round_id": round_id, "status": "Pending"}) if len(self.network.get_combiners()) < 1: @@ -275,7 +272,6 @@ def reduce(self, combiners): : param combiners: dict of combiner names(key) and model IDs(value) to reduce : type combiners: dict """ - meta = {} meta["time_fetch_model"] = 0.0 meta["time_load_model"] = 0.0 @@ -322,7 +318,6 @@ def infer_instruct(self, config): : param config: configuration for the inference round """ - # Check/set instucting state if self.__state == ReducerState.instructing: logger.info("Already set in INSTRUCTING state") @@ -350,7 +345,6 @@ def inference_round(self, config): : param config: configuration for the inference round """ - # Init meta round_data = {} diff --git a/fedn/network/controller/controlbase.py b/fedn/network/controller/controlbase.py index d99bae40a..d667e01c4 100644 --- a/fedn/network/controller/controlbase.py +++ b/fedn/network/controller/controlbase.py @@ -113,17 +113,13 @@ def idle(self): return False def get_model_info(self): - """ - - :return: + """:return: """ return self.statestore.get_model_trail() # TODO: remove use statestore.get_events() instead def get_events(self): - """ - - :return: + """:return: """ return self.statestore.get_events() @@ -139,9 +135,7 @@ def get_latest_round(self): return round def get_compute_package_name(self): - """ - - :return: + """:return: """ definition = self.statestore.get_compute_package() if definition: @@ -159,9 +153,7 @@ def set_compute_package(self, filename, path): self.model_repository.set_compute_package(filename, path) def get_compute_package(self, compute_package=""): - """ - - :param compute_package: + """:param compute_package: :return: """ if compute_package == "": @@ -173,7 +165,6 @@ def get_compute_package(self, compute_package=""): def create_session(self, config, status="Initialized"): """Initialize a new session in backend db.""" - if "session_id" not in config.keys(): session_id = uuid.uuid4() config["session_id"] = str(session_id) @@ -196,7 +187,6 @@ def set_session_status(self, session_id, status): def create_round(self, round_data): """Initialize a new round in backend db.""" - self.statestore.create_round(round_data) def set_round_data(self, round_id, round_data): @@ -251,7 +241,6 @@ def commit(self, model_id, model=None, session_id=None): :param session_id: Unique identifier for the session :type session_id: str """ - helper = self.get_helper() if model is not None: logger.info("Saving model file temporarily to disk...") diff --git a/fedn/network/grpc/__init__.py b/fedn/network/grpc/__init__.py index ad5e023ab..19daa5e47 100644 --- a/fedn/network/grpc/__init__.py +++ b/fedn/network/grpc/__init__.py @@ -1 +1 @@ -__all__ = ['fedn_pb2', 'fedn_pb2_grpc'] +__all__ = ["fedn_pb2", "fedn_pb2_grpc"] diff --git a/fedn/network/grpc/server.py b/fedn/network/grpc/server.py index f953bf96a..4354a7aa5 100644 --- a/fedn/network/grpc/server.py +++ b/fedn/network/grpc/server.py @@ -4,8 +4,7 @@ from grpc_health.v1 import health, health_pb2_grpc import fedn.network.grpc.fedn_pb2_grpc as rpc -from fedn.common.log_config import (logger, set_log_level_from_string, - set_log_stream) +from fedn.common.log_config import logger, set_log_level_from_string, set_log_stream from fedn.network.grpc.auth import JWTInterceptor @@ -14,8 +13,8 @@ class Server: def __init__(self, servicer, modelservicer, config): - set_log_level_from_string(config.get('verbosity', "INFO")) - set_log_stream(config.get('logfile', None)) + set_log_level_from_string(config.get("verbosity", "INFO")) + set_log_stream(config.get("logfile", None)) self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=350), interceptors=[JWTInterceptor()]) self.certificate = None @@ -34,15 +33,15 @@ def __init__(self, servicer, modelservicer, config): health_pb2_grpc.add_HealthServicer_to_server(self.health_servicer, self.server) - if config['secure']: + if config["secure"]: logger.info(f'Creating secure gRPCS server using certificate: {config["certificate"]}') server_credentials = grpc.ssl_server_credentials( - ((config['key'], config['certificate'],),)) + ((config["key"], config["certificate"],),)) self.server.add_secure_port( - '[::]:' + str(config['port']), server_credentials) + "[::]:" + str(config["port"]), server_credentials) else: logger.info("Creating gRPC server") - self.server.add_insecure_port('[::]:' + str(config['port'])) + self.server.add_insecure_port("[::]:" + str(config["port"])) def start(self): """ Start the gRPC server.""" diff --git a/fedn/network/loadbalancer/firstavailable.py b/fedn/network/loadbalancer/firstavailable.py index 13dc766b2..5de8be881 100644 --- a/fedn/network/loadbalancer/firstavailable.py +++ b/fedn/network/loadbalancer/firstavailable.py @@ -13,7 +13,6 @@ def __init__(self, network): def find_combiner(self): """Find the first available combiner.""" - for combiner in self.network.get_combiners(): if combiner.allowing_clients(): return combiner diff --git a/fedn/network/loadbalancer/leastpacked.py b/fedn/network/loadbalancer/leastpacked.py index a762701b0..786dd8de0 100644 --- a/fedn/network/loadbalancer/leastpacked.py +++ b/fedn/network/loadbalancer/leastpacked.py @@ -13,8 +13,7 @@ def __init__(self, network): super().__init__(network) def find_combiner(self): - """ - Find the combiner with the least number of attached clients. + """Find the combiner with the least number of attached clients. """ min_clients = None @@ -25,10 +24,7 @@ def find_combiner(self): if combiner.allowing_clients(): # Using default default Channel = 1, MODEL_UPDATE_REQUESTS nr_active_clients = len(combiner.list_active_clients()) - if not min_clients: - min_clients = nr_active_clients - selected_combiner = combiner - elif nr_active_clients < min_clients: + if not min_clients or nr_active_clients < min_clients: min_clients = nr_active_clients selected_combiner = combiner except CombinerUnavailableError: diff --git a/fedn/network/storage/models/__init__.py b/fedn/network/storage/models/__init__.py index fdfba1986..38135d697 100644 --- a/fedn/network/storage/models/__init__.py +++ b/fedn/network/storage/models/__init__.py @@ -1,4 +1,5 @@ """ The models package handles storing of model updates durign the federated training process. The functionality is used by the combiner service during aggregation of model updates from clients. By implementing the interface in the base class modelstorage.py, a developer may customize the -behaviour of the framework. """ +behaviour of the framework. +""" diff --git a/fedn/network/storage/models/memorymodelstorage.py b/fedn/network/storage/models/memorymodelstorage.py index 6a40a7ae0..54599fb28 100644 --- a/fedn/network/storage/models/memorymodelstorage.py +++ b/fedn/network/storage/models/memorymodelstorage.py @@ -31,9 +31,7 @@ def get(self, model_id): return obj def get_ptr(self, model_id): - """ - - :param model_id: + """:param model_id: :return: """ return self.models[model_id] diff --git a/fedn/network/storage/models/tempmodelstorage.py b/fedn/network/storage/models/tempmodelstorage.py index 214fac4d7..891f6ea07 100644 --- a/fedn/network/storage/models/tempmodelstorage.py +++ b/fedn/network/storage/models/tempmodelstorage.py @@ -41,9 +41,7 @@ def get(self, model_id): return obj def get_ptr(self, model_id): - """ - - :param model_id: + """:param model_id: :return: """ try: diff --git a/fedn/network/storage/s3/__init__.py b/fedn/network/storage/s3/__init__.py index 0befb7819..2e9f7d361 100644 --- a/fedn/network/storage/s3/__init__.py +++ b/fedn/network/storage/s3/__init__.py @@ -1,3 +1,4 @@ """ Module handling storage of objects in S3-compatible object storage. This functionality is used by the controller to store global models in the model trail in persistent storage. Currently implemented for MinIO, but a ' -developer can extend the framwork by implemeting the interface in base.py. """ +developer can extend the framwork by implemeting the interface in base.py. +""" diff --git a/fedn/network/storage/s3/miniorepository.py b/fedn/network/storage/s3/miniorepository.py index 9c86b8997..ff329856a 100644 --- a/fedn/network/storage/s3/miniorepository.py +++ b/fedn/network/storage/s3/miniorepository.py @@ -19,7 +19,6 @@ def __init__(self, config): :param config: Dictionary containing configuration for credentials and bucket names. :type config: dict """ - super().__init__() self.name = "MINIORepository" @@ -91,7 +90,6 @@ def delete_artifact(self, instance_name, bucket): :param bucket: Buckets to delete from :type bucket: str """ - try: self.client.remove_object(bucket, instance_name) except InvalidResponseError as err: diff --git a/fedn/network/storage/s3/repository.py b/fedn/network/storage/s3/repository.py index 18d36cdbb..c1704e5ca 100644 --- a/fedn/network/storage/s3/repository.py +++ b/fedn/network/storage/s3/repository.py @@ -73,7 +73,6 @@ def set_compute_package(self, name, compute_package, is_file=True): :type compute_pacakge: BytesIO or str file name. :param is_file: True if model is a file name, else False """ - try: self.client.set_artifact(str(name), compute_package, bucket=self.context_bucket, is_file=is_file) except Exception: @@ -100,7 +99,6 @@ def delete_compute_package(self, compute_package): :param compute_package: The name of the compute_package :type compute_package: str """ - try: self.client.delete_artifact(compute_package, bucket=[self.context_bucket]) except Exception: diff --git a/fedn/network/storage/statestore/mongostatestore.py b/fedn/network/storage/statestore/mongostatestore.py index a53a6e4d5..6bf3be4ff 100644 --- a/fedn/network/storage/statestore/mongostatestore.py +++ b/fedn/network/storage/statestore/mongostatestore.py @@ -140,7 +140,6 @@ def get_sessions(self, limit=None, skip=None, sort_key="_id", sort_order=pymongo :type sort_order: pymongo.ASCENDING or pymongo.DESCENDING :return: Dictionary of sessions in result (array of session objects) and count. """ - result = None if limit is not None and skip is not None: @@ -175,7 +174,6 @@ def set_latest_model(self, model_id, session_id=None): :type model_id: str :return: """ - committed_at = datetime.now() current_model = self.model.find_one({"key": "current_model"}) parent_model = None @@ -214,7 +212,6 @@ def get_initial_model(self): :return: The initial model id. None if no model is found. :rtype: str """ - result = self.model.find_one({"key": "model_trail"}, sort=[("committed_at", pymongo.ASCENDING)]) if result is None: return None @@ -252,7 +249,6 @@ def set_current_model(self, model_id: str): :type model_id: str :return: """ - try: committed_at = datetime.now() @@ -273,7 +269,6 @@ def get_latest_round(self): :return: The id of the most recent round. :rtype: ObjectId """ - return self.rounds.find_one(sort=[("_id", pymongo.DESCENDING)]) def get_round(self, id): @@ -284,7 +279,6 @@ def get_round(self, id): :return: round with id, reducer and combiners :rtype: ObjectId """ - return self.rounds.find_one({"round_id": str(id)}) def get_rounds(self): @@ -293,7 +287,6 @@ def get_rounds(self): :return: All rounds. :rtype: ObjectId """ - return self.rounds.find() def get_validations(self, **kwargs): @@ -304,7 +297,6 @@ def get_validations(self, **kwargs): :return: validations matching query :rtype: ObjectId """ - result = self.control.validations.find(kwargs) return result @@ -316,7 +308,6 @@ def set_active_compute_package(self, id: str): :return: True if successful. :rtype: bool """ - try: find = {"id": id} projection = {"_id": False, "key": False} @@ -344,7 +335,6 @@ def set_compute_package(self, file_name: str, storage_file_name: str, helper_typ :return: True if successful. :rtype: bool """ - obj = { "file_name": file_name, "storage_file_name": storage_file_name, @@ -396,7 +386,6 @@ def list_compute_packages(self, limit: int = None, skip: int = None, sort_key="c :return: Dictionary of compute packages in result and count. :rtype: dict """ - result = None count = None @@ -544,7 +533,6 @@ def get_model_descendants(self, model_id: str, limit: int): :return: List of model descendants. :rtype: list """ - model: object = self.model.find_one({"key": "models", "model": model_id}) current_model_id: str = model["model"] if model is not None else None result: list = [] @@ -684,7 +672,6 @@ def get_combiners(self, limit=None, skip=None, sort_key="updated_at", sort_order :return: Dictionary of combiners in result and count. :rtype: dict """ - result = None count = None @@ -713,7 +700,6 @@ def set_combiner(self, combiner_data): :type combiner_data: dict :return: """ - combiner_data["updated_at"] = str(datetime.now()) self.combiners.update_one({"name": combiner_data["name"]}, {"$set": combiner_data}, True) @@ -769,7 +755,6 @@ def list_clients(self, limit=None, skip=None, status=None, sort_key="last_seen", :type status: str :param sort_key: The key to sort by. """ - result = None count = None @@ -806,7 +791,6 @@ def list_combiners_data(self, combiners, sort_key="count", sort_order=pymongo.DE :return: list of combiner data. :rtype: list(ObjectId) """ - result = None try: diff --git a/fedn/network/storage/statestore/stores/client_store.py b/fedn/network/storage/statestore/stores/client_store.py index 05c03643a..5797fab7d 100644 --- a/fedn/network/storage/statestore/stores/client_store.py +++ b/fedn/network/storage/statestore/stores/client_store.py @@ -18,16 +18,16 @@ def __init__(self, id: str, name: str, combiner: str, combiner_preferred: str, i self.updated_at = updated_at self.last_seen = last_seen - def from_dict(data: dict) -> 'Client': + def from_dict(data: dict) -> "Client": return Client( - id=str(data['_id']), - name=data['name'] if 'name' in data else None, - combiner=data['combiner'] if 'combiner' in data else None, - combiner_preferred=data['combiner_preferred'] if 'combiner_preferred' in data else None, - ip=data['ip'] if 'ip' in data else None, - status=data['status'] if 'status' in data else None, - updated_at=data['updated_at'] if 'updated_at' in data else None, - last_seen=data['last_seen'] if 'last_seen' in data else None + id=str(data["_id"]), + name=data["name"] if "name" in data else None, + combiner=data["combiner"] if "combiner" in data else None, + combiner_preferred=data["combiner_preferred"] if "combiner_preferred" in data else None, + ip=data["ip"] if "ip" in data else None, + status=data["status"] if "status" in data else None, + updated_at=data["updated_at"] if "updated_at" in data else None, + last_seen=data["last_seen"] if "last_seen" in data else None ) @@ -74,7 +74,7 @@ def list(self, limit: int, skip: int, sort_key: str, sort_order=pymongo.DESCENDI """ response = super().list(limit, skip, sort_key or "last_seen", sort_order, use_typing=use_typing, **kwargs) - result = [Client.from_dict(item) for item in response['result']] if use_typing else response['result'] + result = [Client.from_dict(item) for item in response["result"]] if use_typing else response["result"] return { "count": response["count"], diff --git a/fedn/network/storage/statestore/stores/combiner_store.py b/fedn/network/storage/statestore/stores/combiner_store.py index d47386e8a..02495b66f 100644 --- a/fedn/network/storage/statestore/stores/combiner_store.py +++ b/fedn/network/storage/statestore/stores/combiner_store.py @@ -38,20 +38,20 @@ def __init__( self.status = status self.updated_at = updated_at - def from_dict(data: dict) -> 'Combiner': + def from_dict(data: dict) -> "Combiner": return Combiner( - id=str(data['_id']), - name=data['name'] if 'name' in data else None, - address=data['address'] if 'address' in data else None, - certificate=data['certificate'] if 'certificate' in data else None, - config=data['config'] if 'config' in data else None, - fqdn=data['fqdn'] if 'fqdn' in data else None, - ip=data['ip'] if 'ip' in data else None, - key=data['key'] if 'key' in data else None, - parent=data['parent'] if 'parent' in data else None, - port=data['port'] if 'port' in data else None, - status=data['status'] if 'status' in data else None, - updated_at=data['updated_at'] if 'updated_at' in data else None + id=str(data["_id"]), + name=data["name"] if "name" in data else None, + address=data["address"] if "address" in data else None, + certificate=data["certificate"] if "certificate" in data else None, + config=data["config"] if "config" in data else None, + fqdn=data["fqdn"] if "fqdn" in data else None, + ip=data["ip"] if "ip" in data else None, + key=data["key"] if "key" in data else None, + parent=data["parent"] if "parent" in data else None, + port=data["port"] if "port" in data else None, + status=data["status"] if "status" in data else None, + updated_at=data["updated_at"] if "updated_at" in data else None ) @@ -70,9 +70,9 @@ def get(self, id: str, use_typing: bool = False) -> Combiner: """ if ObjectId.is_valid(id): id_obj = ObjectId(id) - document = self.database[self.collection].find_one({'_id': id_obj}) + document = self.database[self.collection].find_one({"_id": id_obj}) else: - document = self.database[self.collection].find_one({'name': id}) + document = self.database[self.collection].find_one({"name": id}) if document is None: raise EntityNotFound(f"Entity with (id | name) {id} not found") @@ -107,7 +107,7 @@ def list(self, limit: int, skip: int, sort_key: str, sort_order=pymongo.DESCENDI """ response = super().list(limit, skip, sort_key or "updated_at", sort_order, use_typing=use_typing, **kwargs) - result = [Combiner.from_dict(item) for item in response['result']] if use_typing else response['result'] + result = [Combiner.from_dict(item) for item in response["result"]] if use_typing else response["result"] return { "count": response["count"], diff --git a/fedn/network/storage/statestore/stores/model_store.py b/fedn/network/storage/statestore/stores/model_store.py index f72beefa7..172603405 100644 --- a/fedn/network/storage/statestore/stores/model_store.py +++ b/fedn/network/storage/statestore/stores/model_store.py @@ -19,14 +19,14 @@ def __init__(self, id: str, key: str, model: str, parent_model: str, session_id: self.session_id = session_id self.committed_at = committed_at - def from_dict(data: dict) -> 'Model': + def from_dict(data: dict) -> "Model": return Model( - id=str(data['_id']), - key=data['key'] if 'key' in data else None, - model=data['model'] if 'model' in data else None, - parent_model=data['parent_model'] if 'parent_model' in data else None, - session_id=data['session_id'] if 'session_id' in data else None, - committed_at=data['committed_at'] if 'committed_at' in data else None + id=str(data["_id"]), + key=data["key"] if "key" in data else None, + model=data["model"] if "model" in data else None, + parent_model=data["parent_model"] if "parent_model" in data else None, + session_id=data["session_id"] if "session_id" in data else None, + committed_at=data["committed_at"] if "committed_at" in data else None ) @@ -46,9 +46,9 @@ def get(self, id: str, use_typing: bool = False) -> Model: kwargs = {"key": "models"} if ObjectId.is_valid(id): id_obj = ObjectId(id) - kwargs['_id'] = id_obj + kwargs["_id"] = id_obj else: - kwargs['model'] = id + kwargs["model"] = id document = self.database[self.collection].find_one(kwargs) @@ -83,13 +83,13 @@ def list(self, limit: int, skip: int, sort_key: str, sort_order=pymongo.DESCENDI example: {"key": "models"} return: A dictionary with the count and the result """ - kwargs['key'] = "models" + kwargs["key"] = "models" response = super().list(limit, skip, sort_key or "committed_at", sort_order, use_typing=use_typing, **kwargs) - result = [Model.from_dict(item) for item in response['result']] if use_typing else response['result'] + result = [Model.from_dict(item) for item in response["result"]] if use_typing else response["result"] return { - "count": response['count'], + "count": response["count"], "result": result } @@ -107,9 +107,9 @@ def list_descendants(self, id: str, limit: int, use_typing: bool = False) -> Lis kwargs = {"key": "models"} if ObjectId.is_valid(id): id_obj = ObjectId(id) - kwargs['_id'] = id_obj + kwargs["_id"] = id_obj else: - kwargs['model'] = id + kwargs["model"] = id model: object = self.database[self.collection].find_one(kwargs) @@ -150,9 +150,9 @@ def list_ancestors(self, id: str, limit: int, include_self: bool = False, revers kwargs = {"key": "models"} if ObjectId.is_valid(id): id_obj = ObjectId(id) - kwargs['_id'] = id_obj + kwargs["_id"] = id_obj else: - kwargs['model'] = id + kwargs["model"] = id model: object = self.database[self.collection].find_one(kwargs) @@ -191,5 +191,5 @@ def count(self, **kwargs) -> int: example: {"key": "models"} return: The count (int) """ - kwargs['key'] = "models" + kwargs["key"] = "models" return super().count(**kwargs) diff --git a/fedn/network/storage/statestore/stores/package_store.py b/fedn/network/storage/statestore/stores/package_store.py index 423b8716a..eb7154af2 100644 --- a/fedn/network/storage/statestore/stores/package_store.py +++ b/fedn/network/storage/statestore/stores/package_store.py @@ -32,21 +32,21 @@ def __init__( self.storage_file_name = storage_file_name self.active = active - def from_dict(data: dict, active_package: dict) -> 'Package': + def from_dict(data: dict, active_package: dict) -> "Package": active = False if active_package: if "id" in active_package and "id" in data: active = active_package["id"] == data["id"] return Package( - id=data['id'] if 'id' in data else None, - key=data['key'] if 'key' in data else None, - committed_at=data['committed_at'] if 'committed_at' in data else None, - description=data['description'] if 'description' in data else None, - file_name=data['file_name'] if 'file_name' in data else None, - helper=data['helper'] if 'helper' in data else None, - name=data['name'] if 'name' in data else None, - storage_file_name=data['storage_file_name'] if 'storage_file_name' in data else None, + id=data["id"] if "id" in data else None, + key=data["key"] if "key" in data else None, + committed_at=data["committed_at"] if "committed_at" in data else None, + description=data["description"] if "description" in data else None, + file_name=data["file_name"] if "file_name" in data else None, + helper=data["helper"] if "helper" in data else None, + name=data["name"] if "name" in data else None, + storage_file_name=data["storage_file_name"] if "storage_file_name" in data else None, active=active ) @@ -66,7 +66,7 @@ def get(self, id: str, use_typing: bool = False) -> Package: If True, and active property will be set based on the active package. return: The entity """ - document = self.database[self.collection].find_one({'id': id}) + document = self.database[self.collection].find_one({"id": id}) if document is None: raise EntityNotFound(f"Entity with id {id} not found") @@ -74,7 +74,7 @@ def get(self, id: str, use_typing: bool = False) -> Package: if not use_typing: return from_document(document) - response_active = self.database[self.collection].find_one({'key': 'active'}) + response_active = self.database[self.collection].find_one({"key": "active"}) return Package.from_dict(document, response_active) @@ -84,7 +84,7 @@ def get_active(self, use_typing: bool = False) -> Package: type: bool return: The entity """ - response = self.database[self.collection].find_one({'key': 'active'}) + response = self.database[self.collection].find_one({"key": "active"}) if response is None: raise EntityNotFound(f"Entity with id {id} not found") @@ -123,9 +123,9 @@ def list(self, limit: int, skip: int, sort_key: str, sort_order=pymongo.DESCENDI response = super().list(limit, skip, sort_key or "committed_at", sort_order, use_typing=True, **kwargs) - response_active = self.database[self.collection].find_one({'key': 'active'}) + response_active = self.database[self.collection].find_one({"key": "active"}) - result = [Package.from_dict(item, response_active) for item in response['result']] + result = [Package.from_dict(item, response_active) for item in response["result"]] return { "count": response["count"], diff --git a/fedn/network/storage/statestore/stores/round_store.py b/fedn/network/storage/statestore/stores/round_store.py index 5afde0d7e..03af044c3 100644 --- a/fedn/network/storage/statestore/stores/round_store.py +++ b/fedn/network/storage/statestore/stores/round_store.py @@ -15,14 +15,14 @@ def __init__(self, id: str, round_id: str, status: str, round_config: dict, comb self.combiners = combiners self.round_data = round_data - def from_dict(data: dict) -> 'Round': + def from_dict(data: dict) -> "Round": return Round( - id=str(data['_id']), - round_id=data['round_id'] if 'round_id' in data else None, - status=data['status'] if 'status' in data else None, - round_config=data['round_config'] if 'round_config' in data else None, - combiners=data['combiners'] if 'combiners' in data else None, - round_data=data['round_data'] if 'round_data' in data else None + id=str(data["_id"]), + round_id=data["round_id"] if "round_id" in data else None, + status=data["status"] if "status" in data else None, + round_config=data["round_config"] if "round_config" in data else None, + combiners=data["combiners"] if "combiners" in data else None, + round_data=data["round_data"] if "round_data" in data else None ) @@ -70,7 +70,7 @@ def list(self, limit: int, skip: int, sort_key: str, sort_order=pymongo.DESCENDI """ response = super().list(limit, skip, sort_key or "round_id", sort_order, use_typing=use_typing, **kwargs) - result = [Round.from_dict(item) for item in response['result']] if use_typing else response['result'] + result = [Round.from_dict(item) for item in response["result"]] if use_typing else response["result"] return { "count": response["count"], diff --git a/fedn/network/storage/statestore/stores/session_store.py b/fedn/network/storage/statestore/stores/session_store.py index c0a6b7da8..31c5e25b4 100644 --- a/fedn/network/storage/statestore/stores/session_store.py +++ b/fedn/network/storage/statestore/stores/session_store.py @@ -16,12 +16,12 @@ def __init__(self, id: str, session_id: str, status: str, session_config: dict = self.status = status self.session_config = session_config - def from_dict(data: dict) -> 'Session': + def from_dict(data: dict) -> "Session": return Session( - id=str(data['_id']), - session_id=data['session_id'] if 'session_id' in data else None, - status=data['status'] if 'status' in data else None, - session_config=data['session_config'] if 'session_config' in data else None + id=str(data["_id"]), + session_id=data["session_id"] if "session_id" in data else None, + status=data["status"] if "status" in data else None, + session_config=data["session_config"] if "session_config" in data else None ) @@ -40,9 +40,9 @@ def get(self, id: str, use_typing: bool = False) -> Session: """ if ObjectId.is_valid(id): id_obj = ObjectId(id) - document = self.database[self.collection].find_one({'_id': id_obj}) + document = self.database[self.collection].find_one({"_id": id_obj}) else: - document = self.database[self.collection].find_one({'session_id': id}) + document = self.database[self.collection].find_one({"session_id": id}) if document is None: raise EntityNotFound(f"Entity with (id | session_id) {id} not found") @@ -82,7 +82,7 @@ def list(self, limit: int, skip: int, sort_key: str, sort_order=pymongo.DESCENDI """ response = super().list(limit, skip, sort_key or "session_id", sort_order, use_typing=use_typing, **kwargs) - result = [Session.from_dict(item) for item in response['result']] if use_typing else response['result'] + result = [Session.from_dict(item) for item in response["result"]] if use_typing else response["result"] return { "count": response["count"], diff --git a/fedn/network/storage/statestore/stores/shared.py b/fedn/network/storage/statestore/stores/shared.py index bf74296af..1ccce636e 100644 --- a/fedn/network/storage/statestore/stores/shared.py +++ b/fedn/network/storage/statestore/stores/shared.py @@ -1,7 +1,7 @@ def from_document(document: dict) -> dict: - document['id'] = str(document['_id']) - del document['_id'] + document["id"] = str(document["_id"]) + del document["_id"] return document diff --git a/fedn/network/storage/statestore/stores/status_store.py b/fedn/network/storage/statestore/stores/status_store.py index 4a62fc9bf..73fa8e588 100644 --- a/fedn/network/storage/statestore/stores/status_store.py +++ b/fedn/network/storage/statestore/stores/status_store.py @@ -31,18 +31,18 @@ def __init__( self.session_id = session_id self.sender = sender - def from_dict(data: dict) -> 'Status': + def from_dict(data: dict) -> "Status": return Status( - id=str(data['_id']), - status=data['status'] if 'status' in data else None, - timestamp=data['timestamp'] if 'timestamp' in data else None, - log_level=data['logLevel'] if 'logLevel' in data else None, - data=data['data'] if 'data' in data else None, - correlation_id=data['correlationId'] if 'correlationId' in data else None, - type=data['type'] if 'type' in data else None, - extra=data['extra'] if 'extra' in data else None, - session_id=data['sessionId'] if 'sessionId' in data else None, - sender=data['sender'] if 'sender' in data else None + id=str(data["_id"]), + status=data["status"] if "status" in data else None, + timestamp=data["timestamp"] if "timestamp" in data else None, + log_level=data["logLevel"] if "logLevel" in data else None, + data=data["data"] if "data" in data else None, + correlation_id=data["correlationId"] if "correlationId" in data else None, + type=data["type"] if "type" in data else None, + extra=data["extra"] if "extra" in data else None, + session_id=data["sessionId"] if "sessionId" in data else None, + sender=data["sender"] if "sender" in data else None ) @@ -91,6 +91,6 @@ def list(self, limit: int, skip: int, sort_key: str, sort_order=pymongo.DESCENDI """ response = super().list(limit, skip, sort_key or "timestamp", sort_order, use_typing=use_typing, **kwargs) - result = [Status.from_dict(item) for item in response['result']] if use_typing else response['result'] + result = [Status.from_dict(item) for item in response["result"]] if use_typing else response["result"] - return {'count': response['count'], 'result': result} + return {"count": response["count"], "result": result} diff --git a/fedn/network/storage/statestore/stores/store.py b/fedn/network/storage/statestore/stores/store.py index 72a7de6e4..8334ac6b8 100644 --- a/fedn/network/storage/statestore/stores/store.py +++ b/fedn/network/storage/statestore/stores/store.py @@ -6,7 +6,7 @@ from .shared import EntityNotFound, from_document -T = TypeVar('T') +T = TypeVar("T") class Store(Generic[T]): @@ -23,7 +23,7 @@ def get(self, id: str, use_typing: bool = False) -> T: return: The entity """ id_obj = ObjectId(id) - document = self.database[self.collection].find_one({'_id': id_obj}) + document = self.database[self.collection].find_one({"_id": id_obj}) if document is None: raise EntityNotFound(f"Entity with id {id} not found") diff --git a/fedn/network/storage/statestore/stores/validation_store.py b/fedn/network/storage/statestore/stores/validation_store.py index 4e09072b1..a64d1a41e 100644 --- a/fedn/network/storage/statestore/stores/validation_store.py +++ b/fedn/network/storage/statestore/stores/validation_store.py @@ -29,17 +29,17 @@ def __init__( self.sender = sender self.receiver = receiver - def from_dict(data: dict) -> 'Validation': + def from_dict(data: dict) -> "Validation": return Validation( - id=str(data['_id']), - model_id=data['modelId'] if 'modelId' in data else None, - data=data['data'] if 'data' in data else None, - correlation_id=data['correlationId'] if 'correlationId' in data else None, - timestamp=data['timestamp'] if 'timestamp' in data else None, - session_id=data['sessionId'] if 'sessionId' in data else None, - meta=data['meta'] if 'meta' in data else None, - sender=data['sender'] if 'sender' in data else None, - receiver=data['receiver'] if 'receiver' in data else None + id=str(data["_id"]), + model_id=data["modelId"] if "modelId" in data else None, + data=data["data"] if "data" in data else None, + correlation_id=data["correlationId"] if "correlationId" in data else None, + timestamp=data["timestamp"] if "timestamp" in data else None, + session_id=data["sessionId"] if "sessionId" in data else None, + meta=data["meta"] if "meta" in data else None, + sender=data["sender"] if "sender" in data else None, + receiver=data["receiver"] if "receiver" in data else None ) @@ -89,8 +89,8 @@ def list(self, limit: int, skip: int, sort_key: str, sort_order=pymongo.DESCENDI """ response = super().list(limit, skip, sort_key or "timestamp", sort_order, use_typing=use_typing, **kwargs) - result = [Validation.from_dict(item) for item in response['result']] if use_typing else response['result'] + result = [Validation.from_dict(item) for item in response["result"]] if use_typing else response["result"] return { - "count": response['count'], + "count": response["count"], "result": result } diff --git a/fedn/utils/dispatcher.py b/fedn/utils/dispatcher.py index 5d00021b1..d551b8053 100644 --- a/fedn/utils/dispatcher.py +++ b/fedn/utils/dispatcher.py @@ -1,5 +1,4 @@ -""" -Portions of this code are derived from the Apache 2.0 licensed project mlflow (https://mlflow.org/)., +"""Portions of this code are derived from the Apache 2.0 licensed project mlflow (https://mlflow.org/)., with modifications made by Scaleout Systems AB. Copyright (c) 2018 Databricks, Inc. @@ -60,15 +59,13 @@ def _install_python(version, pyenv_root=None, capture_output=False): def _is_virtualenv_available(): - """ - Returns True if virtualenv is available, otherwise False. + """Returns True if virtualenv is available, otherwise False. """ return shutil.which("virtualenv") is not None def _validate_virtualenv_is_available(): - """ - Validates virtualenv is available. If not, throws an `Exception` with a brief instruction + """Validates virtualenv is available. If not, throws an `Exception` with a brief instruction on how to install virtualenv. """ if not _is_virtualenv_available(): @@ -85,8 +82,7 @@ def _get_virtualenv_extra_env_vars(env_root_dir=None): def _get_python_env(python_env_file): - """ - Parses a python environment file and returns a dictionary with the parsed content. + """Parses a python environment file and returns a dictionary with the parsed content. """ if os.path.exists(python_env_file): return _PythonEnv.from_yaml(python_env_file) diff --git a/fedn/utils/environment.py b/fedn/utils/environment.py index 754cb5312..03d93eae7 100644 --- a/fedn/utils/environment.py +++ b/fedn/utils/environment.py @@ -1,5 +1,4 @@ -""" -Portions of this code are derived from the Apache 2.0 licensed project mlflow (https://mlflow.org/)., +"""Portions of this code are derived from the Apache 2.0 licensed project mlflow (https://mlflow.org/)., with modifications made by Scaleout Systems AB. Copyright (c) 2018 Databricks, Inc. @@ -28,8 +27,7 @@ class _PythonEnv: BUILD_PACKAGES = ("pip", "setuptools", "wheel") def __init__(self, name=None, python=None, build_dependencies=None, dependencies=None): - """ - Represents environment information for FEDn compute packages. + """Represents environment information for FEDn compute packages. Args: ---- diff --git a/fedn/utils/helpers/helperbase.py b/fedn/utils/helpers/helperbase.py index 3377d0336..109ab2a47 100644 --- a/fedn/utils/helpers/helperbase.py +++ b/fedn/utils/helpers/helperbase.py @@ -8,7 +8,6 @@ class HelperBase(ABC): def __init__(self): """Initialize helper.""" - self.name = self.__class__.__name__ @abstractmethod diff --git a/fedn/utils/helpers/plugins/numpyhelper.py b/fedn/utils/helpers/plugins/numpyhelper.py index ce6c29420..822ce929e 100644 --- a/fedn/utils/helpers/plugins/numpyhelper.py +++ b/fedn/utils/helpers/plugins/numpyhelper.py @@ -25,7 +25,6 @@ def increment_average(self, m1, m2, n, N): :return: Updated incremental weighted average. :rtype: list of numpy ndarray """ - return [np.add(x, n * (y - x) / N) for x, y in zip(m1, m2)] def add(self, m1, m2, a=1.0, b=1.0): @@ -38,7 +37,6 @@ def add(self, m1, m2, a=1.0, b=1.0): :return: Incremental weighted average of model weights. :rtype: list of ndarrays """ - return [x * a + y * b for x, y in zip(m1, m2)] def subtract(self, m1, m2, a=1.0, b=1.0): @@ -63,7 +61,6 @@ def divide(self, m1, m2): :return: m1/m2. :rtype: list of ndarrays """ - return [np.divide(x, y) for x, y in zip(m1, m2)] def multiply(self, m1, m2): @@ -76,7 +73,6 @@ def multiply(self, m1, m2): :return: m1.*m2 :rtype: list of ndarrays """ - return [np.multiply(x, y) for (x, y) in zip(m1, m2)] def sqrt(self, m1): @@ -89,7 +85,6 @@ def sqrt(self, m1): :return: sqrt(m1) :rtype: list of ndarrays """ - return [np.sqrt(x) for x in m1] def power(self, m1, a): @@ -102,7 +97,6 @@ def power(self, m1, a): :return: m1.^m2 :rtype: list of ndarrays """ - return [np.power(x, a) for x in m1] def norm(self, m): @@ -126,7 +120,6 @@ def sign(self, m): :return: sign(m) :rtype: list of ndarrays """ - return [np.sign(x) for x in m] def ones(self, m1, a): @@ -139,7 +132,6 @@ def ones(self, m1, a): :return: list of numpy arrays of the same shape as m1, filled with ones. :rtype: list of ndarrays """ - res = [] for x in m1: res.append(np.ones(np.shape(x)) * a) diff --git a/fedn/utils/plots.py b/fedn/utils/plots.py index 7901e2374..d04fffc4e 100644 --- a/fedn/utils/plots.py +++ b/fedn/utils/plots.py @@ -32,7 +32,6 @@ def __init__(self, statestore): # plot metrics from DB def _scalar_metrics(self, metrics): """Extract all scalar valued metrics from a MODEL_VALIDATON.""" - data = json.loads(metrics["data"]) data = json.loads(data["data"]) @@ -48,9 +47,7 @@ def _scalar_metrics(self, metrics): return valid_metrics def create_table_plot(self): - """ - - :return: + """:return: """ metrics = self.status.find_one({"type": "MODEL_VALIDATION"}) if metrics is None: @@ -111,9 +108,7 @@ def create_table_plot(self): return table def create_timeline_plot(self): - """ - - :return: + """:return: """ trace_data = [] x = [] @@ -184,9 +179,7 @@ def create_timeline_plot(self): return timeline def create_client_training_distribution(self): - """ - - :return: + """:return: """ training = [] for p in self.status.find({"type": "MODEL_UPDATE"}): @@ -202,9 +195,7 @@ def create_client_training_distribution(self): return histogram def create_client_histogram_plot(self): - """ - - :return: + """:return: """ training = [] for p in self.status.find({"type": "MODEL_UPDATE"}): @@ -230,9 +221,7 @@ def create_client_histogram_plot(self): return histogram_plot def create_client_plot(self): - """ - - :return: + """:return: """ processing = [] upload = [] @@ -258,9 +247,7 @@ def create_client_plot(self): return client_plot def create_combiner_plot(self): - """ - - :return: + """:return: """ waiting = [] aggregation = [] @@ -292,18 +279,14 @@ def create_combiner_plot(self): return combiner_plot def fetch_valid_metrics(self): - """ - - :return: + """:return: """ metrics = self.status.find_one({"type": "MODEL_VALIDATION"}) valid_metrics = self._scalar_metrics(metrics) return valid_metrics def create_box_plot(self, metric): - """ - - :param metric: + """:param metric: :return: """ metrics = self.status.find_one({"type": "MODEL_VALIDATION"}) @@ -361,9 +344,7 @@ def create_box_plot(self, metric): return box def create_round_plot(self): - """ - - :return: + """:return: """ trace_data = [] metrics = self.round_time.find_one({"key": "round_time"}) @@ -391,9 +372,7 @@ def create_round_plot(self): return round_t def create_cpu_plot(self): - """ - - :return: + """:return: """ metrics = self.psutil_usage.find_one({"key": "cpu_mem_usage"}) if metrics is None: diff --git a/fedn/utils/process.py b/fedn/utils/process.py index 1a30fca2c..c2574a760 100644 --- a/fedn/utils/process.py +++ b/fedn/utils/process.py @@ -1,5 +1,4 @@ -""" -Portions of this code are derived from the Apache 2.0 licensed project mlflow (https://mlflow.org/)., +"""Portions of this code are derived from the Apache 2.0 licensed project mlflow (https://mlflow.org/)., with modifications made by Scaleout Systems AB. Copyright (c) 2018 Databricks, Inc. diff --git a/pyproject.toml b/pyproject.toml index 24233b9f0..7eb4d0a11 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -177,21 +177,7 @@ lint.ignore = [ "PLW1508", # Invalid type for environment variable default; expected `str` or `None` "B007", # Loop control variable `v` not used within loop body "N806", # Variable `X_test` in function should be lowercase - - # solved with --fix - "Q000", # [*] Single quotes found but double quotes preferred - "D212", # [*] Multi-line docstring summary should start at the first line - "D213", # [*] Multi-line docstring summary should start at the second line - "D202", # [*] No blank lines allowed after function docstring (found 1) - "D209", # [*] Multi-line docstring closing quotes should be on a separate line - "D204", # [*] 1 blank line required after class docstring - "SIM114", # [*] Combine `if` branches using logical `or` operator - "D208", # [*] Docstring is over-indented - "I001", # [*] Import block is un-sorted or un-formatted "SIM103", # Return the condition directly - "PLR5501", # [*] Use `elif` instead of `else` then `if`, to reduce indentation - "RET501", # [*] Do not explicitly `return None` in function if it is the only possible return value - "PLW0120", # [*] `else` clause on loop without a `break` statement; remove the `else` and dedent its contents # unsafe? "S104", # Possible binding to all interfaces From 8ebe64d419b85f7255483e400925565488fdbdc9 Mon Sep 17 00:00:00 2001 From: viktorvaladi Date: Tue, 7 May 2024 17:33:38 +0200 Subject: [PATCH 2/3] fix imports --- examples/mnist-pytorch/client/train.py | 3 ++- examples/mnist-pytorch/client/validate.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/examples/mnist-pytorch/client/train.py b/examples/mnist-pytorch/client/train.py index 9ac9cce61..d8297de0d 100644 --- a/examples/mnist-pytorch/client/train.py +++ b/examples/mnist-pytorch/client/train.py @@ -5,9 +5,10 @@ import torch from model import load_parameters, save_parameters -from data import load_data from fedn.utils.helpers.helpers import save_metadata +from .data import load_data + dir_path = os.path.dirname(os.path.realpath(__file__)) sys.path.append(os.path.abspath(dir_path)) diff --git a/examples/mnist-pytorch/client/validate.py b/examples/mnist-pytorch/client/validate.py index 09328181f..e2fb097eb 100644 --- a/examples/mnist-pytorch/client/validate.py +++ b/examples/mnist-pytorch/client/validate.py @@ -4,9 +4,10 @@ import torch from model import load_parameters -from data import load_data from fedn.utils.helpers.helpers import save_metrics +from .data import load_data + dir_path = os.path.dirname(os.path.realpath(__file__)) sys.path.append(os.path.abspath(dir_path)) From 425c352de907d6e821f35314959cfd434fa5066d Mon Sep 17 00:00:00 2001 From: viktorvaladi Date: Wed, 8 May 2024 11:12:57 +0200 Subject: [PATCH 3/3] add exclude --- examples/mnist-pytorch/client/train.py | 3 +-- examples/mnist-pytorch/client/validate.py | 3 +-- pyproject.toml | 1 + 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/examples/mnist-pytorch/client/train.py b/examples/mnist-pytorch/client/train.py index d8297de0d..9ac9cce61 100644 --- a/examples/mnist-pytorch/client/train.py +++ b/examples/mnist-pytorch/client/train.py @@ -5,10 +5,9 @@ import torch from model import load_parameters, save_parameters +from data import load_data from fedn.utils.helpers.helpers import save_metadata -from .data import load_data - dir_path = os.path.dirname(os.path.realpath(__file__)) sys.path.append(os.path.abspath(dir_path)) diff --git a/examples/mnist-pytorch/client/validate.py b/examples/mnist-pytorch/client/validate.py index e2fb097eb..09328181f 100644 --- a/examples/mnist-pytorch/client/validate.py +++ b/examples/mnist-pytorch/client/validate.py @@ -4,10 +4,9 @@ import torch from model import load_parameters +from data import load_data from fedn.utils.helpers.helpers import save_metrics -from .data import load_data - dir_path = os.path.dirname(os.path.realpath(__file__)) sys.path.append(os.path.abspath(dir_path)) diff --git a/pyproject.toml b/pyproject.toml index 7eb4d0a11..7e3484a8f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -178,6 +178,7 @@ lint.ignore = [ "B007", # Loop control variable `v` not used within loop body "N806", # Variable `X_test` in function should be lowercase "SIM103", # Return the condition directly + "I001", # [*] Import block is un-sorted or un-formatted # unsafe? "S104", # Possible binding to all interfaces