diff --git a/fedn/fedn/network/clients/client.py b/fedn/fedn/network/clients/client.py index 4dd67eaad..c5f93d315 100644 --- a/fedn/fedn/network/clients/client.py +++ b/fedn/fedn/network/clients/client.py @@ -370,8 +370,11 @@ def get_model_from_combiner(self, id, timeout=20): """ data = BytesIO() time_start = time.time() + request = fedn.ModelRequest(id=id) + request.sender.name = self.name + request.sender.role = fedn.WORKER - for part in self.modelStub.Download(fedn.ModelRequest(id=id), metadata=self.metadata): + for part in self.modelStub.Download(request, metadata=self.metadata): if part.status == fedn.ModelStatus.IN_PROGRESS: data.write(part.data) @@ -515,6 +518,9 @@ def _process_training_request(self, model_id): meta = {} tic = time.time() mdl = self.get_model_from_combiner(str(model_id)) + if mdl is None: + logger.error("Could not retrieve model from combiner. Aborting training request.") + return None, None meta['fetch_model'] = time.time() - tic inpath = self.helper.get_tmp_path() @@ -579,6 +585,9 @@ def _process_validation_request(self, model_id, is_inference): self.state = ClientState.validating try: model = self.get_model_from_combiner(str(model_id)) + if model is None: + logger.error("Could not retrieve model from combiner. Aborting validation request.") + return None inpath = self.helper.get_tmp_path() with open(inpath, "wb") as fh: diff --git a/fedn/fedn/network/combiner/interfaces.py b/fedn/fedn/network/combiner/interfaces.py index 1bb93168b..69b987be0 100644 --- a/fedn/fedn/network/combiner/interfaces.py +++ b/fedn/fedn/network/combiner/interfaces.py @@ -1,6 +1,7 @@ import base64 import copy import json +import time from io import BytesIO import grpc @@ -239,7 +240,7 @@ def submit(self, config): return response - def get_model(self, id): + def get_model(self, id, timeout=10): """ Download a model from the combiner server. :param id: The model id. @@ -255,7 +256,13 @@ def get_model(self, id): data = BytesIO() data.seek(0, 0) - parts = modelservice.Download(fedn.ModelRequest(id=id)) + time_start = time.time() + + request = fedn.ModelRequest(id=id) + request.sender.name = self.name + request.sender.role = fedn.WORKER + + parts = modelservice.Download(request) for part in parts: if part.status == fedn.ModelStatus.IN_PROGRESS: data.write(part.data) @@ -263,6 +270,10 @@ def get_model(self, id): return data if part.status == fedn.ModelStatus.FAILED: return None + if part.status == fedn.ModelStatus.UNKNOWN: + if time.time() - time_start > timeout: + return None + continue def allowing_clients(self): """ Check if the combiner is allowing additional client connections. diff --git a/fedn/fedn/network/combiner/modelservice.py b/fedn/fedn/network/combiner/modelservice.py index 9f0646a06..59c63108b 100644 --- a/fedn/fedn/network/combiner/modelservice.py +++ b/fedn/fedn/network/combiner/modelservice.py @@ -179,7 +179,7 @@ def Download(self, request, context): :return: A model response iterator. :rtype: :class:`fedn.network.grpc.fedn_pb2.ModelResponse` """ - logger.debug("grpc.ModelService.Download: Called") + logger.info(f'grpc.ModelService.Download: {request.sender.role}:{request.sender.name} requested model {request.id}') try: status = self.temp_model_storage.get_model_metadata(request.id) if status != fedn.ModelStatus.OK: