Skip to content

Commit

Permalink
merge services in combiner
Browse files Browse the repository at this point in the history
  • Loading branch information
Wrede committed Feb 5, 2024
1 parent e3c880c commit defd062
Show file tree
Hide file tree
Showing 6 changed files with 321 additions and 425 deletions.
5 changes: 2 additions & 3 deletions fedn/fedn/network/clients/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ def _connect(self, client_config):

self.channel = channel

self.connectorStub = rpc.ConnectorStub(channel)
self.combinerStub = rpc.CombinerStub(channel)
self.modelStub = rpc.ModelServiceStub(channel)

Expand Down Expand Up @@ -693,7 +692,7 @@ def _send_heartbeat(self, update_frequency=2.0):
heartbeat = fedn.Heartbeat(sender=fedn.Client(
name=self.name, role=fedn.WORKER))
try:
self.connectorStub.SendHeartbeat(heartbeat, metadata=self.metadata)
self.combinerStub.SendHeartbeat(heartbeat, metadata=self.metadata)
self._missed_heartbeat = 0
except grpc.RpcError as e:
status_code = e.code()
Expand Down Expand Up @@ -733,7 +732,7 @@ def _send_status(self, msg, log_level=fedn.Status.INFO, type=None, request=None)
self.logs.append(
"{} {} LOG LEVEL {} MESSAGE {}".format(str(datetime.now()), status.sender.name, status.log_level,
status.status))
_ = self.connectorStub.SendStatus(status, metadata=self.metadata)
_ = self.combinerStub.SendStatus(status, metadata=self.metadata)

def run(self):
""" Run the client. """
Expand Down
6 changes: 3 additions & 3 deletions fedn/fedn/network/combiner/combiner.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def role_to_proto_role(role):
return fedn.OTHER


class Combiner(rpc.CombinerServicer, rpc.ReducerServicer, rpc.ConnectorServicer, rpc.ControlServicer):
class Combiner(rpc.CombinerServicer):
""" Combiner gRPC server.
:param config: configuration for the combiner
Expand Down Expand Up @@ -400,7 +400,7 @@ def _flush_model_update_queue(self):

# Controller Service

def Start(self, control: fedn.ControlRequest, context):
def StartRound(self, control: fedn.ControlRequest, context):
""" Start a round of federated learning"
:param control: the control request
Expand Down Expand Up @@ -475,7 +475,7 @@ def FlushAggregationQueue(self, control: fedn.ControlRequest, context):

##############################################################################

def Stop(self, control: fedn.ControlRequest, context):
def StopRound(self, control: fedn.ControlRequest, context):
""" TODO: Not yet implemented.
:param control: the control request
Expand Down
12 changes: 6 additions & 6 deletions fedn/fedn/network/combiner/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def flush_model_update_queue(self):

channel = Channel(self.address, self.port,
self.certificate).get_channel()
control = rpc.ControlStub(channel)
control = rpc.CombinerStub(channel)

request = fedn.ControlRequest()

Expand All @@ -196,7 +196,7 @@ def set_aggregator(self, aggregator):

channel = Channel(self.address, self.port,
self.certificate).get_channel()
control = rpc.ControlStub(channel)
control = rpc.CombinerStub(channel)

request = fedn.ControlRequest()
p = request.parameter.add()
Expand All @@ -221,7 +221,7 @@ def submit(self, config):
"""
channel = Channel(self.address, self.port,
self.certificate).get_channel()
control = rpc.ControlStub(channel)
control = rpc.CombinerStub(channel)
request = fedn.ControlRequest()
request.command = fedn.Command.START
for k, v in config.items():
Expand All @@ -230,7 +230,7 @@ def submit(self, config):
p.value = str(v)

try:
response = control.Start(request)
response = control.StartRound(request)
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.UNAVAILABLE:
raise CombinerUnavailableError
Expand Down Expand Up @@ -272,7 +272,7 @@ def allowing_clients(self):
"""
channel = Channel(self.address, self.port,
self.certificate).get_channel()
connector = rpc.ConnectorStub(channel)
connector = rpc.CombinerStub(channel)
request = fedn.ConnectionRequest()

try:
Expand Down Expand Up @@ -302,7 +302,7 @@ def list_active_clients(self, queue=1):
"""
channel = Channel(self.address, self.port,
self.certificate).get_channel()
control = rpc.ConnectorStub(channel)
control = rpc.CombinerStub(channel)
request = fedn.ListClientsRequest()
request.channel = queue
try:
Expand Down
15 changes: 3 additions & 12 deletions fedn/fedn/network/grpc/fedn_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit defd062

Please sign in to comment.