Skip to content

Commit

Permalink
add sender to download
Browse files Browse the repository at this point in the history
  • Loading branch information
Wrede committed Feb 6, 2024
1 parent 693eca3 commit 7ee1232
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 4 deletions.
11 changes: 10 additions & 1 deletion fedn/fedn/network/clients/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,11 @@ def get_model_from_combiner(self, id, timeout=20):
"""
data = BytesIO()
time_start = time.time()
request = fedn.ModelRequest(id=id)
request.sender.name = self.name
request.sender.role = fedn.WORKER

for part in self.modelStub.Download(fedn.ModelRequest(id=id), metadata=self.metadata):
for part in self.modelStub.Download(request, metadata=self.metadata):

if part.status == fedn.ModelStatus.IN_PROGRESS:
data.write(part.data)
Expand Down Expand Up @@ -515,6 +518,9 @@ def _process_training_request(self, model_id):
meta = {}
tic = time.time()
mdl = self.get_model_from_combiner(str(model_id))
if mdl is None:
logger.error("Could not retrieve model from combiner. Aborting training request.")
return None, None
meta['fetch_model'] = time.time() - tic

inpath = self.helper.get_tmp_path()
Expand Down Expand Up @@ -579,6 +585,9 @@ def _process_validation_request(self, model_id, is_inference):
self.state = ClientState.validating
try:
model = self.get_model_from_combiner(str(model_id))
if model is None:
logger.error("Could not retrieve model from combiner. Aborting validation request.")
return None
inpath = self.helper.get_tmp_path()

with open(inpath, "wb") as fh:
Expand Down
15 changes: 13 additions & 2 deletions fedn/fedn/network/combiner/interfaces.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import base64
import copy
import json
import time
from io import BytesIO

import grpc
Expand Down Expand Up @@ -239,7 +240,7 @@ def submit(self, config):

return response

def get_model(self, id):
def get_model(self, id, timeout=10):
""" Download a model from the combiner server.
:param id: The model id.
Expand All @@ -255,14 +256,24 @@ def get_model(self, id):
data = BytesIO()
data.seek(0, 0)

parts = modelservice.Download(fedn.ModelRequest(id=id))
time_start = time.time()

request = fedn.ModelRequest(id=id)
request.sender.name = self.name
request.sender.role = fedn.WORKER

parts = modelservice.Download(request)
for part in parts:
if part.status == fedn.ModelStatus.IN_PROGRESS:
data.write(part.data)
if part.status == fedn.ModelStatus.OK:
return data
if part.status == fedn.ModelStatus.FAILED:
return None
if part.status == fedn.ModelStatus.UNKNOWN:
if time.time() - time_start > timeout:
return None
continue

def allowing_clients(self):
""" Check if the combiner is allowing additional client connections.
Expand Down
2 changes: 1 addition & 1 deletion fedn/fedn/network/combiner/modelservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def Download(self, request, context):
:return: A model response iterator.
:rtype: :class:`fedn.network.grpc.fedn_pb2.ModelResponse`
"""
logger.debug("grpc.ModelService.Download: Called")
logger.info(f'grpc.ModelService.Download: {request.sender.role}:{request.sender.name} requested model {request.id}')
try:
status = self.temp_model_storage.get_model_metadata(request.id)
if status != fedn.ModelStatus.OK:
Expand Down

0 comments on commit 7ee1232

Please sign in to comment.