From defd0624f491af5f033a4c901732d14902b9ab67 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Mon, 5 Feb 2024 13:47:45 +0000 Subject: [PATCH] merge services in combiner --- fedn/fedn/network/clients/client.py | 5 +- fedn/fedn/network/combiner/combiner.py | 6 +- fedn/fedn/network/combiner/interfaces.py | 12 +- fedn/fedn/network/grpc/fedn_pb2.py | 15 +- fedn/fedn/network/grpc/fedn_pb2_grpc.py | 695 ++++++++++------------- fedn/fedn/network/grpc/server.py | 13 +- 6 files changed, 321 insertions(+), 425 deletions(-) diff --git a/fedn/fedn/network/clients/client.py b/fedn/fedn/network/clients/client.py index 68eae0845..ca3a572fd 100644 --- a/fedn/fedn/network/clients/client.py +++ b/fedn/fedn/network/clients/client.py @@ -222,7 +222,6 @@ def _connect(self, client_config): self.channel = channel - self.connectorStub = rpc.ConnectorStub(channel) self.combinerStub = rpc.CombinerStub(channel) self.modelStub = rpc.ModelServiceStub(channel) @@ -693,7 +692,7 @@ def _send_heartbeat(self, update_frequency=2.0): heartbeat = fedn.Heartbeat(sender=fedn.Client( name=self.name, role=fedn.WORKER)) try: - self.connectorStub.SendHeartbeat(heartbeat, metadata=self.metadata) + self.combinerStub.SendHeartbeat(heartbeat, metadata=self.metadata) self._missed_heartbeat = 0 except grpc.RpcError as e: status_code = e.code() @@ -733,7 +732,7 @@ def _send_status(self, msg, log_level=fedn.Status.INFO, type=None, request=None) self.logs.append( "{} {} LOG LEVEL {} MESSAGE {}".format(str(datetime.now()), status.sender.name, status.log_level, status.status)) - _ = self.connectorStub.SendStatus(status, metadata=self.metadata) + _ = self.combinerStub.SendStatus(status, metadata=self.metadata) def run(self): """ Run the client. """ diff --git a/fedn/fedn/network/combiner/combiner.py b/fedn/fedn/network/combiner/combiner.py index 207786e1f..684d4a67b 100644 --- a/fedn/fedn/network/combiner/combiner.py +++ b/fedn/fedn/network/combiner/combiner.py @@ -50,7 +50,7 @@ def role_to_proto_role(role): return fedn.OTHER -class Combiner(rpc.CombinerServicer, rpc.ReducerServicer, rpc.ConnectorServicer, rpc.ControlServicer): +class Combiner(rpc.CombinerServicer): """ Combiner gRPC server. :param config: configuration for the combiner @@ -400,7 +400,7 @@ def _flush_model_update_queue(self): # Controller Service - def Start(self, control: fedn.ControlRequest, context): + def StartRound(self, control: fedn.ControlRequest, context): """ Start a round of federated learning" :param control: the control request @@ -475,7 +475,7 @@ def FlushAggregationQueue(self, control: fedn.ControlRequest, context): ############################################################################## - def Stop(self, control: fedn.ControlRequest, context): + def StopRound(self, control: fedn.ControlRequest, context): """ TODO: Not yet implemented. :param control: the control request diff --git a/fedn/fedn/network/combiner/interfaces.py b/fedn/fedn/network/combiner/interfaces.py index 1bb93168b..cb9431b73 100644 --- a/fedn/fedn/network/combiner/interfaces.py +++ b/fedn/fedn/network/combiner/interfaces.py @@ -175,7 +175,7 @@ def flush_model_update_queue(self): channel = Channel(self.address, self.port, self.certificate).get_channel() - control = rpc.ControlStub(channel) + control = rpc.CombinerStub(channel) request = fedn.ControlRequest() @@ -196,7 +196,7 @@ def set_aggregator(self, aggregator): channel = Channel(self.address, self.port, self.certificate).get_channel() - control = rpc.ControlStub(channel) + control = rpc.CombinerStub(channel) request = fedn.ControlRequest() p = request.parameter.add() @@ -221,7 +221,7 @@ def submit(self, config): """ channel = Channel(self.address, self.port, self.certificate).get_channel() - control = rpc.ControlStub(channel) + control = rpc.CombinerStub(channel) request = fedn.ControlRequest() request.command = fedn.Command.START for k, v in config.items(): @@ -230,7 +230,7 @@ def submit(self, config): p.value = str(v) try: - response = control.Start(request) + response = control.StartRound(request) except grpc.RpcError as e: if e.code() == grpc.StatusCode.UNAVAILABLE: raise CombinerUnavailableError @@ -272,7 +272,7 @@ def allowing_clients(self): """ channel = Channel(self.address, self.port, self.certificate).get_channel() - connector = rpc.ConnectorStub(channel) + connector = rpc.CombinerStub(channel) request = fedn.ConnectionRequest() try: @@ -302,7 +302,7 @@ def list_active_clients(self, queue=1): """ channel = Channel(self.address, self.port, self.certificate).get_channel() - control = rpc.ConnectorStub(channel) + control = rpc.CombinerStub(channel) request = fedn.ListClientsRequest() request.channel = queue try: diff --git a/fedn/fedn/network/grpc/fedn_pb2.py b/fedn/fedn/network/grpc/fedn_pb2.py index 752bb20c3..18883a88f 100644 --- a/fedn/fedn/network/grpc/fedn_pb2.py +++ b/fedn/fedn/network/grpc/fedn_pb2.py @@ -15,7 +15,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1c\x66\x65\x64n/network/grpc/fedn.proto\x12\x04grpc\":\n\x08Response\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08response\x18\x02 \x01(\t\"\x8c\x02\n\x06Status\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x0e\n\x06status\x18\x02 \x01(\t\x12(\n\tlog_level\x18\x03 \x01(\x0e\x32\x15.grpc.Status.LogLevel\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x1e\n\x04type\x18\x07 \x01(\x0e\x32\x10.grpc.StatusType\x12\r\n\x05\x65xtra\x18\x08 \x01(\t\"B\n\x08LogLevel\x12\x08\n\x04INFO\x10\x00\x12\t\n\x05\x44\x45\x42UG\x10\x01\x12\x0b\n\x07WARNING\x10\x02\x12\t\n\x05\x45RROR\x10\x03\x12\t\n\x05\x41UDIT\x10\x04\"\xab\x01\n\x12ModelUpdateRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\"\xaf\x01\n\x0bModelUpdate\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x17\n\x0fmodel_update_id\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\"\xc5\x01\n\x16ModelValidationRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\x12\x14\n\x0cis_inference\x18\x08 \x01(\x08\"\xa8\x01\n\x0fModelValidation\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\"\x89\x01\n\x0cModelRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\x12\n\n\x02id\x18\x04 \x01(\t\x12!\n\x06status\x18\x05 \x01(\x0e\x32\x11.grpc.ModelStatus\"]\n\rModelResponse\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x12\n\n\x02id\x18\x02 \x01(\t\x12!\n\x06status\x18\x03 \x01(\x0e\x32\x11.grpc.ModelStatus\x12\x0f\n\x07message\x18\x04 \x01(\t\"U\n\x15GetGlobalModelRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\"h\n\x16GetGlobalModelResponse\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\")\n\tHeartbeat\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\"W\n\x16\x43lientAvailableMessage\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\t\x12\x11\n\ttimestamp\x18\x03 \x01(\t\"R\n\x12ListClientsRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x07\x63hannel\x18\x02 \x01(\x0e\x32\r.grpc.Channel\"*\n\nClientList\x12\x1c\n\x06\x63lient\x18\x01 \x03(\x0b\x32\x0c.grpc.Client\"0\n\x06\x43lient\x12\x18\n\x04role\x18\x01 \x01(\x0e\x32\n.grpc.Role\x12\x0c\n\x04name\x18\x02 \x01(\t\"m\n\x0fReassignRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x0e\n\x06server\x18\x03 \x01(\t\x12\x0c\n\x04port\x18\x04 \x01(\r\"c\n\x10ReconnectRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x11\n\treconnect\x18\x03 \x01(\r\"\'\n\tParameter\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"T\n\x0e\x43ontrolRequest\x12\x1e\n\x07\x63ommand\x18\x01 \x01(\x0e\x32\r.grpc.Command\x12\"\n\tparameter\x18\x02 \x03(\x0b\x32\x0f.grpc.Parameter\"F\n\x0f\x43ontrolResponse\x12\x0f\n\x07message\x18\x01 \x01(\t\x12\"\n\tparameter\x18\x02 \x03(\x0b\x32\x0f.grpc.Parameter\"\x13\n\x11\x43onnectionRequest\"<\n\x12\x43onnectionResponse\x12&\n\x06status\x18\x01 \x01(\x0e\x32\x16.grpc.ConnectionStatus*\x84\x01\n\nStatusType\x12\x07\n\x03LOG\x10\x00\x12\x18\n\x14MODEL_UPDATE_REQUEST\x10\x01\x12\x10\n\x0cMODEL_UPDATE\x10\x02\x12\x1c\n\x18MODEL_VALIDATION_REQUEST\x10\x03\x12\x14\n\x10MODEL_VALIDATION\x10\x04\x12\r\n\tINFERENCE\x10\x05*\x86\x01\n\x07\x43hannel\x12\x0b\n\x07\x44\x45\x46\x41ULT\x10\x00\x12\x19\n\x15MODEL_UPDATE_REQUESTS\x10\x01\x12\x11\n\rMODEL_UPDATES\x10\x02\x12\x1d\n\x19MODEL_VALIDATION_REQUESTS\x10\x03\x12\x15\n\x11MODEL_VALIDATIONS\x10\x04\x12\n\n\x06STATUS\x10\x05*F\n\x0bModelStatus\x12\x06\n\x02OK\x10\x00\x12\x0f\n\x0bIN_PROGRESS\x10\x01\x12\x12\n\x0eIN_PROGRESS_OK\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03*8\n\x04Role\x12\n\n\x06WORKER\x10\x00\x12\x0c\n\x08\x43OMBINER\x10\x01\x12\x0b\n\x07REDUCER\x10\x02\x12\t\n\x05OTHER\x10\x03*J\n\x07\x43ommand\x12\x08\n\x04IDLE\x10\x00\x12\t\n\x05START\x10\x01\x12\t\n\x05PAUSE\x10\x02\x12\x08\n\x04STOP\x10\x03\x12\t\n\x05RESET\x10\x04\x12\n\n\x06REPORT\x10\x05*I\n\x10\x43onnectionStatus\x12\x11\n\rNOT_ACCEPTING\x10\x00\x12\r\n\tACCEPTING\x10\x01\x12\x13\n\x0fTRY_AGAIN_LATER\x10\x02\x32z\n\x0cModelService\x12\x33\n\x06Upload\x12\x12.grpc.ModelRequest\x1a\x13.grpc.ModelResponse(\x01\x12\x35\n\x08\x44ownload\x12\x12.grpc.ModelRequest\x1a\x13.grpc.ModelResponse0\x01\x32\xf8\x01\n\x07\x43ontrol\x12\x34\n\x05Start\x12\x14.grpc.ControlRequest\x1a\x15.grpc.ControlResponse\x12\x33\n\x04Stop\x12\x14.grpc.ControlRequest\x1a\x15.grpc.ControlResponse\x12\x44\n\x15\x46lushAggregationQueue\x12\x14.grpc.ControlRequest\x1a\x15.grpc.ControlResponse\x12<\n\rSetAggregator\x12\x14.grpc.ControlRequest\x1a\x15.grpc.ControlResponse2V\n\x07Reducer\x12K\n\x0eGetGlobalModel\x12\x1b.grpc.GetGlobalModelRequest\x1a\x1c.grpc.GetGlobalModelResponse2\xab\x03\n\tConnector\x12\x44\n\x14\x41llianceStatusStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x0c.grpc.Status0\x01\x12*\n\nSendStatus\x12\x0c.grpc.Status\x1a\x0e.grpc.Response\x12?\n\x11ListActiveClients\x12\x18.grpc.ListClientsRequest\x1a\x10.grpc.ClientList\x12\x45\n\x10\x41\x63\x63\x65ptingClients\x12\x17.grpc.ConnectionRequest\x1a\x18.grpc.ConnectionResponse\x12\x30\n\rSendHeartbeat\x12\x0f.grpc.Heartbeat\x1a\x0e.grpc.Response\x12\x37\n\x0eReassignClient\x12\x15.grpc.ReassignRequest\x1a\x0e.grpc.Response\x12\x39\n\x0fReconnectClient\x12\x16.grpc.ReconnectRequest\x1a\x0e.grpc.Response2\xca\x03\n\x08\x43ombiner\x12T\n\x18ModelUpdateRequestStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x18.grpc.ModelUpdateRequest0\x01\x12\x46\n\x11ModelUpdateStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x11.grpc.ModelUpdate0\x01\x12\\\n\x1cModelValidationRequestStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x1c.grpc.ModelValidationRequest0\x01\x12N\n\x15ModelValidationStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x15.grpc.ModelValidation0\x01\x12\x34\n\x0fSendModelUpdate\x12\x11.grpc.ModelUpdate\x1a\x0e.grpc.Response\x12<\n\x13SendModelValidation\x12\x15.grpc.ModelValidation\x1a\x0e.grpc.Responseb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1c\x66\x65\x64n/network/grpc/fedn.proto\x12\x04grpc\":\n\x08Response\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08response\x18\x02 \x01(\t\"\x8c\x02\n\x06Status\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x0e\n\x06status\x18\x02 \x01(\t\x12(\n\tlog_level\x18\x03 \x01(\x0e\x32\x15.grpc.Status.LogLevel\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x1e\n\x04type\x18\x07 \x01(\x0e\x32\x10.grpc.StatusType\x12\r\n\x05\x65xtra\x18\x08 \x01(\t\"B\n\x08LogLevel\x12\x08\n\x04INFO\x10\x00\x12\t\n\x05\x44\x45\x42UG\x10\x01\x12\x0b\n\x07WARNING\x10\x02\x12\t\n\x05\x45RROR\x10\x03\x12\t\n\x05\x41UDIT\x10\x04\"\xab\x01\n\x12ModelUpdateRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\"\xaf\x01\n\x0bModelUpdate\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x17\n\x0fmodel_update_id\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\"\xc5\x01\n\x16ModelValidationRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\x12\x14\n\x0cis_inference\x18\x08 \x01(\x08\"\xa8\x01\n\x0fModelValidation\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\t\x12\x16\n\x0e\x63orrelation_id\x18\x05 \x01(\t\x12\x11\n\ttimestamp\x18\x06 \x01(\t\x12\x0c\n\x04meta\x18\x07 \x01(\t\"\x89\x01\n\x0cModelRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\x12\n\n\x02id\x18\x04 \x01(\t\x12!\n\x06status\x18\x05 \x01(\x0e\x32\x11.grpc.ModelStatus\"]\n\rModelResponse\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x12\n\n\x02id\x18\x02 \x01(\t\x12!\n\x06status\x18\x03 \x01(\x0e\x32\x11.grpc.ModelStatus\x12\x0f\n\x07message\x18\x04 \x01(\t\"U\n\x15GetGlobalModelRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\"h\n\x16GetGlobalModelResponse\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x10\n\x08model_id\x18\x03 \x01(\t\")\n\tHeartbeat\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\"W\n\x16\x43lientAvailableMessage\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\t\x12\x11\n\ttimestamp\x18\x03 \x01(\t\"R\n\x12ListClientsRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x07\x63hannel\x18\x02 \x01(\x0e\x32\r.grpc.Channel\"*\n\nClientList\x12\x1c\n\x06\x63lient\x18\x01 \x03(\x0b\x32\x0c.grpc.Client\"0\n\x06\x43lient\x12\x18\n\x04role\x18\x01 \x01(\x0e\x32\n.grpc.Role\x12\x0c\n\x04name\x18\x02 \x01(\t\"m\n\x0fReassignRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x0e\n\x06server\x18\x03 \x01(\t\x12\x0c\n\x04port\x18\x04 \x01(\r\"c\n\x10ReconnectRequest\x12\x1c\n\x06sender\x18\x01 \x01(\x0b\x32\x0c.grpc.Client\x12\x1e\n\x08receiver\x18\x02 \x01(\x0b\x32\x0c.grpc.Client\x12\x11\n\treconnect\x18\x03 \x01(\r\"\'\n\tParameter\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"T\n\x0e\x43ontrolRequest\x12\x1e\n\x07\x63ommand\x18\x01 \x01(\x0e\x32\r.grpc.Command\x12\"\n\tparameter\x18\x02 \x03(\x0b\x32\x0f.grpc.Parameter\"F\n\x0f\x43ontrolResponse\x12\x0f\n\x07message\x18\x01 \x01(\t\x12\"\n\tparameter\x18\x02 \x03(\x0b\x32\x0f.grpc.Parameter\"\x13\n\x11\x43onnectionRequest\"<\n\x12\x43onnectionResponse\x12&\n\x06status\x18\x01 \x01(\x0e\x32\x16.grpc.ConnectionStatus*\x84\x01\n\nStatusType\x12\x07\n\x03LOG\x10\x00\x12\x18\n\x14MODEL_UPDATE_REQUEST\x10\x01\x12\x10\n\x0cMODEL_UPDATE\x10\x02\x12\x1c\n\x18MODEL_VALIDATION_REQUEST\x10\x03\x12\x14\n\x10MODEL_VALIDATION\x10\x04\x12\r\n\tINFERENCE\x10\x05*\x86\x01\n\x07\x43hannel\x12\x0b\n\x07\x44\x45\x46\x41ULT\x10\x00\x12\x19\n\x15MODEL_UPDATE_REQUESTS\x10\x01\x12\x11\n\rMODEL_UPDATES\x10\x02\x12\x1d\n\x19MODEL_VALIDATION_REQUESTS\x10\x03\x12\x15\n\x11MODEL_VALIDATIONS\x10\x04\x12\n\n\x06STATUS\x10\x05*F\n\x0bModelStatus\x12\x06\n\x02OK\x10\x00\x12\x0f\n\x0bIN_PROGRESS\x10\x01\x12\x12\n\x0eIN_PROGRESS_OK\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03*8\n\x04Role\x12\n\n\x06WORKER\x10\x00\x12\x0c\n\x08\x43OMBINER\x10\x01\x12\x0b\n\x07REDUCER\x10\x02\x12\t\n\x05OTHER\x10\x03*J\n\x07\x43ommand\x12\x08\n\x04IDLE\x10\x00\x12\t\n\x05START\x10\x01\x12\t\n\x05PAUSE\x10\x02\x12\x08\n\x04STOP\x10\x03\x12\t\n\x05RESET\x10\x04\x12\n\n\x06REPORT\x10\x05*I\n\x10\x43onnectionStatus\x12\x11\n\rNOT_ACCEPTING\x10\x00\x12\r\n\tACCEPTING\x10\x01\x12\x13\n\x0fTRY_AGAIN_LATER\x10\x02\x32z\n\x0cModelService\x12\x33\n\x06Upload\x12\x12.grpc.ModelRequest\x1a\x13.grpc.ModelResponse(\x01\x12\x35\n\x08\x44ownload\x12\x12.grpc.ModelRequest\x1a\x13.grpc.ModelResponse0\x01\x32\xb0\t\n\x08\x43ombiner\x12T\n\x18ModelUpdateRequestStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x18.grpc.ModelUpdateRequest0\x01\x12\x46\n\x11ModelUpdateStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x11.grpc.ModelUpdate0\x01\x12\\\n\x1cModelValidationRequestStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x1c.grpc.ModelValidationRequest0\x01\x12N\n\x15ModelValidationStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x15.grpc.ModelValidation0\x01\x12\x34\n\x0fSendModelUpdate\x12\x11.grpc.ModelUpdate\x1a\x0e.grpc.Response\x12<\n\x13SendModelValidation\x12\x15.grpc.ModelValidation\x1a\x0e.grpc.Response\x12\x39\n\nStartRound\x12\x14.grpc.ControlRequest\x1a\x15.grpc.ControlResponse\x12\x38\n\tStopRound\x12\x14.grpc.ControlRequest\x1a\x15.grpc.ControlResponse\x12\x44\n\x15\x46lushAggregationQueue\x12\x14.grpc.ControlRequest\x1a\x15.grpc.ControlResponse\x12<\n\rSetAggregator\x12\x14.grpc.ControlRequest\x1a\x15.grpc.ControlResponse\x12K\n\x0eGetGlobalModel\x12\x1b.grpc.GetGlobalModelRequest\x1a\x1c.grpc.GetGlobalModelResponse\x12\x44\n\x14\x41llianceStatusStream\x12\x1c.grpc.ClientAvailableMessage\x1a\x0c.grpc.Status0\x01\x12*\n\nSendStatus\x12\x0c.grpc.Status\x1a\x0e.grpc.Response\x12?\n\x11ListActiveClients\x12\x18.grpc.ListClientsRequest\x1a\x10.grpc.ClientList\x12\x45\n\x10\x41\x63\x63\x65ptingClients\x12\x17.grpc.ConnectionRequest\x1a\x18.grpc.ConnectionResponse\x12\x30\n\rSendHeartbeat\x12\x0f.grpc.Heartbeat\x1a\x0e.grpc.Response\x12\x37\n\x0eReassignClient\x12\x15.grpc.ReassignRequest\x1a\x0e.grpc.Response\x12\x39\n\x0fReconnectClient\x12\x16.grpc.ReconnectRequest\x1a\x0e.grpc.Responseb\x06proto3') _STATUSTYPE = DESCRIPTOR.enum_types_by_name['StatusType'] StatusType = enum_type_wrapper.EnumTypeWrapper(_STATUSTYPE) @@ -238,9 +238,6 @@ _sym_db.RegisterMessage(ConnectionResponse) _MODELSERVICE = DESCRIPTOR.services_by_name['ModelService'] -_CONTROL = DESCRIPTOR.services_by_name['Control'] -_REDUCER = DESCRIPTOR.services_by_name['Reducer'] -_CONNECTOR = DESCRIPTOR.services_by_name['Connector'] _COMBINER = DESCRIPTOR.services_by_name['Combiner'] if _descriptor._USE_C_DESCRIPTORS == False: @@ -305,12 +302,6 @@ _CONNECTIONRESPONSE._serialized_end=2322 _MODELSERVICE._serialized_start=2877 _MODELSERVICE._serialized_end=2999 - _CONTROL._serialized_start=3002 - _CONTROL._serialized_end=3250 - _REDUCER._serialized_start=3252 - _REDUCER._serialized_end=3338 - _CONNECTOR._serialized_start=3341 - _CONNECTOR._serialized_end=3768 - _COMBINER._serialized_start=3771 - _COMBINER._serialized_end=4229 + _COMBINER._serialized_start=3002 + _COMBINER._serialized_end=4202 # @@protoc_insertion_point(module_scope) diff --git a/fedn/fedn/network/grpc/fedn_pb2_grpc.py b/fedn/fedn/network/grpc/fedn_pb2_grpc.py index 1a03fe970..8934f5932 100644 --- a/fedn/fedn/network/grpc/fedn_pb2_grpc.py +++ b/fedn/fedn/network/grpc/fedn_pb2_grpc.py @@ -99,7 +99,7 @@ def Download(request, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) -class ControlStub(object): +class CombinerStub(object): """Missing associated documentation comment in .proto file.""" def __init__(self, channel): @@ -108,267 +108,168 @@ def __init__(self, channel): Args: channel: A grpc.Channel. """ - self.Start = channel.unary_unary( - '/grpc.Control/Start', + self.ModelUpdateRequestStream = channel.unary_stream( + '/grpc.Combiner/ModelUpdateRequestStream', + request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, + response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdateRequest.FromString, + ) + self.ModelUpdateStream = channel.unary_stream( + '/grpc.Combiner/ModelUpdateStream', + request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, + response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdate.FromString, + ) + self.ModelValidationRequestStream = channel.unary_stream( + '/grpc.Combiner/ModelValidationRequestStream', + request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, + response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidationRequest.FromString, + ) + self.ModelValidationStream = channel.unary_stream( + '/grpc.Combiner/ModelValidationStream', + request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, + response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidation.FromString, + ) + self.SendModelUpdate = channel.unary_unary( + '/grpc.Combiner/SendModelUpdate', + request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdate.SerializeToString, + response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, + ) + self.SendModelValidation = channel.unary_unary( + '/grpc.Combiner/SendModelValidation', + request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidation.SerializeToString, + response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, + ) + self.StartRound = channel.unary_unary( + '/grpc.Combiner/StartRound', request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, ) - self.Stop = channel.unary_unary( - '/grpc.Control/Stop', + self.StopRound = channel.unary_unary( + '/grpc.Combiner/StopRound', request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, ) self.FlushAggregationQueue = channel.unary_unary( - '/grpc.Control/FlushAggregationQueue', + '/grpc.Combiner/FlushAggregationQueue', request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, ) self.SetAggregator = channel.unary_unary( - '/grpc.Control/SetAggregator', + '/grpc.Combiner/SetAggregator', request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, ) - - -class ControlServicer(object): - """Missing associated documentation comment in .proto file.""" - - def Start(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def Stop(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def FlushAggregationQueue(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def SetAggregator(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - -def add_ControlServicer_to_server(servicer, server): - rpc_method_handlers = { - 'Start': grpc.unary_unary_rpc_method_handler( - servicer.Start, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.SerializeToString, - ), - 'Stop': grpc.unary_unary_rpc_method_handler( - servicer.Stop, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.SerializeToString, - ), - 'FlushAggregationQueue': grpc.unary_unary_rpc_method_handler( - servicer.FlushAggregationQueue, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.SerializeToString, - ), - 'SetAggregator': grpc.unary_unary_rpc_method_handler( - servicer.SetAggregator, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.SerializeToString, - ), - } - generic_handler = grpc.method_handlers_generic_handler( - 'grpc.Control', rpc_method_handlers) - server.add_generic_rpc_handlers((generic_handler,)) - - - # This class is part of an EXPERIMENTAL API. -class Control(object): - """Missing associated documentation comment in .proto file.""" - - @staticmethod - def Start(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/grpc.Control/Start', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) - - @staticmethod - def Stop(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/grpc.Control/Stop', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) - - @staticmethod - def FlushAggregationQueue(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/grpc.Control/FlushAggregationQueue', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) - - @staticmethod - def SetAggregator(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/grpc.Control/SetAggregator', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) - - -class ReducerStub(object): - """Missing associated documentation comment in .proto file.""" - - def __init__(self, channel): - """Constructor. - - Args: - channel: A grpc.Channel. - """ self.GetGlobalModel = channel.unary_unary( - '/grpc.Reducer/GetGlobalModel', + '/grpc.Combiner/GetGlobalModel', request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.GetGlobalModelRequest.SerializeToString, response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.GetGlobalModelResponse.FromString, ) - - -class ReducerServicer(object): - """Missing associated documentation comment in .proto file.""" - - def GetGlobalModel(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - -def add_ReducerServicer_to_server(servicer, server): - rpc_method_handlers = { - 'GetGlobalModel': grpc.unary_unary_rpc_method_handler( - servicer.GetGlobalModel, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.GetGlobalModelRequest.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.GetGlobalModelResponse.SerializeToString, - ), - } - generic_handler = grpc.method_handlers_generic_handler( - 'grpc.Reducer', rpc_method_handlers) - server.add_generic_rpc_handlers((generic_handler,)) - - - # This class is part of an EXPERIMENTAL API. -class Reducer(object): - """Missing associated documentation comment in .proto file.""" - - @staticmethod - def GetGlobalModel(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/grpc.Reducer/GetGlobalModel', - fedn_dot_network_dot_grpc_dot_fedn__pb2.GetGlobalModelRequest.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.GetGlobalModelResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) - - -class ConnectorStub(object): - """Missing associated documentation comment in .proto file.""" - - def __init__(self, channel): - """Constructor. - - Args: - channel: A grpc.Channel. - """ self.AllianceStatusStream = channel.unary_stream( - '/grpc.Connector/AllianceStatusStream', + '/grpc.Combiner/AllianceStatusStream', request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Status.FromString, ) self.SendStatus = channel.unary_unary( - '/grpc.Connector/SendStatus', + '/grpc.Combiner/SendStatus', request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Status.SerializeToString, response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, ) self.ListActiveClients = channel.unary_unary( - '/grpc.Connector/ListActiveClients', + '/grpc.Combiner/ListActiveClients', request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ListClientsRequest.SerializeToString, response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientList.FromString, ) self.AcceptingClients = channel.unary_unary( - '/grpc.Connector/AcceptingClients', + '/grpc.Combiner/AcceptingClients', request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ConnectionRequest.SerializeToString, response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ConnectionResponse.FromString, ) self.SendHeartbeat = channel.unary_unary( - '/grpc.Connector/SendHeartbeat', + '/grpc.Combiner/SendHeartbeat', request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Heartbeat.SerializeToString, response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, ) self.ReassignClient = channel.unary_unary( - '/grpc.Connector/ReassignClient', + '/grpc.Combiner/ReassignClient', request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ReassignRequest.SerializeToString, response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, ) self.ReconnectClient = channel.unary_unary( - '/grpc.Connector/ReconnectClient', + '/grpc.Combiner/ReconnectClient', request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ReconnectRequest.SerializeToString, response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, ) -class ConnectorServicer(object): +class CombinerServicer(object): """Missing associated documentation comment in .proto file.""" + def ModelUpdateRequestStream(self, request, context): + """Stream endpoints for training/validation pub/sub + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def ModelUpdateStream(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def ModelValidationRequestStream(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def ModelValidationStream(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def SendModelUpdate(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def SendModelValidation(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def StartRound(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def StopRound(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def FlushAggregationQueue(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def SetAggregator(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def GetGlobalModel(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + def AllianceStatusStream(self, request, context): """Stream endpoint for status updates """ @@ -384,8 +285,7 @@ def SendStatus(self, request, context): raise NotImplementedError('Method not implemented!') def ListActiveClients(self, request, context): - """rpc RegisterClient (ClientAvailableMessage) returns (Response); - List active clients endpoint + """List active clients endpoint """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') @@ -417,8 +317,63 @@ def ReconnectClient(self, request, context): raise NotImplementedError('Method not implemented!') -def add_ConnectorServicer_to_server(servicer, server): +def add_CombinerServicer_to_server(servicer, server): rpc_method_handlers = { + 'ModelUpdateRequestStream': grpc.unary_stream_rpc_method_handler( + servicer.ModelUpdateRequestStream, + request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.FromString, + response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdateRequest.SerializeToString, + ), + 'ModelUpdateStream': grpc.unary_stream_rpc_method_handler( + servicer.ModelUpdateStream, + request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.FromString, + response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdate.SerializeToString, + ), + 'ModelValidationRequestStream': grpc.unary_stream_rpc_method_handler( + servicer.ModelValidationRequestStream, + request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.FromString, + response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidationRequest.SerializeToString, + ), + 'ModelValidationStream': grpc.unary_stream_rpc_method_handler( + servicer.ModelValidationStream, + request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.FromString, + response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidation.SerializeToString, + ), + 'SendModelUpdate': grpc.unary_unary_rpc_method_handler( + servicer.SendModelUpdate, + request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdate.FromString, + response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.SerializeToString, + ), + 'SendModelValidation': grpc.unary_unary_rpc_method_handler( + servicer.SendModelValidation, + request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidation.FromString, + response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.SerializeToString, + ), + 'StartRound': grpc.unary_unary_rpc_method_handler( + servicer.StartRound, + request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.FromString, + response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.SerializeToString, + ), + 'StopRound': grpc.unary_unary_rpc_method_handler( + servicer.StopRound, + request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.FromString, + response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.SerializeToString, + ), + 'FlushAggregationQueue': grpc.unary_unary_rpc_method_handler( + servicer.FlushAggregationQueue, + request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.FromString, + response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.SerializeToString, + ), + 'SetAggregator': grpc.unary_unary_rpc_method_handler( + servicer.SetAggregator, + request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.FromString, + response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.SerializeToString, + ), + 'GetGlobalModel': grpc.unary_unary_rpc_method_handler( + servicer.GetGlobalModel, + request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.GetGlobalModelRequest.FromString, + response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.GetGlobalModelResponse.SerializeToString, + ), 'AllianceStatusStream': grpc.unary_stream_rpc_method_handler( servicer.AllianceStatusStream, request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.FromString, @@ -456,16 +411,67 @@ def add_ConnectorServicer_to_server(servicer, server): ), } generic_handler = grpc.method_handlers_generic_handler( - 'grpc.Connector', rpc_method_handlers) + 'grpc.Combiner', rpc_method_handlers) server.add_generic_rpc_handlers((generic_handler,)) - # This class is part of an EXPERIMENTAL API. -class Connector(object): - """Missing associated documentation comment in .proto file.""" + # This class is part of an EXPERIMENTAL API. +class Combiner(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def ModelUpdateRequestStream(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_stream(request, target, '/grpc.Combiner/ModelUpdateRequestStream', + fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, + fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdateRequest.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def ModelUpdateStream(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_stream(request, target, '/grpc.Combiner/ModelUpdateStream', + fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, + fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdate.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def ModelValidationRequestStream(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_stream(request, target, '/grpc.Combiner/ModelValidationRequestStream', + fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, + fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidationRequest.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod - def AllianceStatusStream(request, + def ModelValidationStream(request, target, options=(), channel_credentials=None, @@ -475,14 +481,14 @@ def AllianceStatusStream(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_stream(request, target, '/grpc.Connector/AllianceStatusStream', + return grpc.experimental.unary_stream(request, target, '/grpc.Combiner/ModelValidationStream', fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.Status.FromString, + fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidation.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod - def SendStatus(request, + def SendModelUpdate(request, target, options=(), channel_credentials=None, @@ -492,14 +498,14 @@ def SendStatus(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_unary(request, target, '/grpc.Connector/SendStatus', - fedn_dot_network_dot_grpc_dot_fedn__pb2.Status.SerializeToString, + return grpc.experimental.unary_unary(request, target, '/grpc.Combiner/SendModelUpdate', + fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdate.SerializeToString, fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod - def ListActiveClients(request, + def SendModelValidation(request, target, options=(), channel_credentials=None, @@ -509,14 +515,14 @@ def ListActiveClients(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_unary(request, target, '/grpc.Connector/ListActiveClients', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ListClientsRequest.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientList.FromString, + return grpc.experimental.unary_unary(request, target, '/grpc.Combiner/SendModelValidation', + fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidation.SerializeToString, + fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod - def AcceptingClients(request, + def StartRound(request, target, options=(), channel_credentials=None, @@ -526,14 +532,14 @@ def AcceptingClients(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_unary(request, target, '/grpc.Connector/AcceptingClients', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ConnectionRequest.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.ConnectionResponse.FromString, + return grpc.experimental.unary_unary(request, target, '/grpc.Combiner/StartRound', + fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, + fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod - def SendHeartbeat(request, + def StopRound(request, target, options=(), channel_credentials=None, @@ -543,14 +549,14 @@ def SendHeartbeat(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_unary(request, target, '/grpc.Connector/SendHeartbeat', - fedn_dot_network_dot_grpc_dot_fedn__pb2.Heartbeat.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, + return grpc.experimental.unary_unary(request, target, '/grpc.Combiner/StopRound', + fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, + fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod - def ReassignClient(request, + def FlushAggregationQueue(request, target, options=(), channel_credentials=None, @@ -560,14 +566,14 @@ def ReassignClient(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_unary(request, target, '/grpc.Connector/ReassignClient', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ReassignRequest.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, + return grpc.experimental.unary_unary(request, target, '/grpc.Combiner/FlushAggregationQueue', + fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, + fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod - def ReconnectClient(request, + def SetAggregator(request, target, options=(), channel_credentials=None, @@ -577,139 +583,31 @@ def ReconnectClient(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_unary(request, target, '/grpc.Connector/ReconnectClient', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ReconnectRequest.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, + return grpc.experimental.unary_unary(request, target, '/grpc.Combiner/SetAggregator', + fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlRequest.SerializeToString, + fedn_dot_network_dot_grpc_dot_fedn__pb2.ControlResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) - -class CombinerStub(object): - """Missing associated documentation comment in .proto file.""" - - def __init__(self, channel): - """Constructor. - - Args: - channel: A grpc.Channel. - """ - self.ModelUpdateRequestStream = channel.unary_stream( - '/grpc.Combiner/ModelUpdateRequestStream', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdateRequest.FromString, - ) - self.ModelUpdateStream = channel.unary_stream( - '/grpc.Combiner/ModelUpdateStream', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdate.FromString, - ) - self.ModelValidationRequestStream = channel.unary_stream( - '/grpc.Combiner/ModelValidationRequestStream', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidationRequest.FromString, - ) - self.ModelValidationStream = channel.unary_stream( - '/grpc.Combiner/ModelValidationStream', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidation.FromString, - ) - self.SendModelUpdate = channel.unary_unary( - '/grpc.Combiner/SendModelUpdate', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdate.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, - ) - self.SendModelValidation = channel.unary_unary( - '/grpc.Combiner/SendModelValidation', - request_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidation.SerializeToString, - response_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, - ) - - -class CombinerServicer(object): - """Missing associated documentation comment in .proto file.""" - - def ModelUpdateRequestStream(self, request, context): - """Stream endpoints for training/validation pub/sub - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def ModelUpdateStream(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def ModelValidationRequestStream(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def ModelValidationStream(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def SendModelUpdate(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def SendModelValidation(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - -def add_CombinerServicer_to_server(servicer, server): - rpc_method_handlers = { - 'ModelUpdateRequestStream': grpc.unary_stream_rpc_method_handler( - servicer.ModelUpdateRequestStream, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdateRequest.SerializeToString, - ), - 'ModelUpdateStream': grpc.unary_stream_rpc_method_handler( - servicer.ModelUpdateStream, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdate.SerializeToString, - ), - 'ModelValidationRequestStream': grpc.unary_stream_rpc_method_handler( - servicer.ModelValidationRequestStream, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidationRequest.SerializeToString, - ), - 'ModelValidationStream': grpc.unary_stream_rpc_method_handler( - servicer.ModelValidationStream, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidation.SerializeToString, - ), - 'SendModelUpdate': grpc.unary_unary_rpc_method_handler( - servicer.SendModelUpdate, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdate.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.SerializeToString, - ), - 'SendModelValidation': grpc.unary_unary_rpc_method_handler( - servicer.SendModelValidation, - request_deserializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidation.FromString, - response_serializer=fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.SerializeToString, - ), - } - generic_handler = grpc.method_handlers_generic_handler( - 'grpc.Combiner', rpc_method_handlers) - server.add_generic_rpc_handlers((generic_handler,)) - - - # This class is part of an EXPERIMENTAL API. -class Combiner(object): - """Missing associated documentation comment in .proto file.""" + @staticmethod + def GetGlobalModel(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/grpc.Combiner/GetGlobalModel', + fedn_dot_network_dot_grpc_dot_fedn__pb2.GetGlobalModelRequest.SerializeToString, + fedn_dot_network_dot_grpc_dot_fedn__pb2.GetGlobalModelResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod - def ModelUpdateRequestStream(request, + def AllianceStatusStream(request, target, options=(), channel_credentials=None, @@ -719,14 +617,14 @@ def ModelUpdateRequestStream(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_stream(request, target, '/grpc.Combiner/ModelUpdateRequestStream', + return grpc.experimental.unary_stream(request, target, '/grpc.Combiner/AllianceStatusStream', fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdateRequest.FromString, + fedn_dot_network_dot_grpc_dot_fedn__pb2.Status.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod - def ModelUpdateStream(request, + def SendStatus(request, target, options=(), channel_credentials=None, @@ -736,14 +634,14 @@ def ModelUpdateStream(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_stream(request, target, '/grpc.Combiner/ModelUpdateStream', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdate.FromString, + return grpc.experimental.unary_unary(request, target, '/grpc.Combiner/SendStatus', + fedn_dot_network_dot_grpc_dot_fedn__pb2.Status.SerializeToString, + fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod - def ModelValidationRequestStream(request, + def ListActiveClients(request, target, options=(), channel_credentials=None, @@ -753,14 +651,14 @@ def ModelValidationRequestStream(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_stream(request, target, '/grpc.Combiner/ModelValidationRequestStream', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidationRequest.FromString, + return grpc.experimental.unary_unary(request, target, '/grpc.Combiner/ListActiveClients', + fedn_dot_network_dot_grpc_dot_fedn__pb2.ListClientsRequest.SerializeToString, + fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientList.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod - def ModelValidationStream(request, + def AcceptingClients(request, target, options=(), channel_credentials=None, @@ -770,14 +668,14 @@ def ModelValidationStream(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_stream(request, target, '/grpc.Combiner/ModelValidationStream', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ClientAvailableMessage.SerializeToString, - fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidation.FromString, + return grpc.experimental.unary_unary(request, target, '/grpc.Combiner/AcceptingClients', + fedn_dot_network_dot_grpc_dot_fedn__pb2.ConnectionRequest.SerializeToString, + fedn_dot_network_dot_grpc_dot_fedn__pb2.ConnectionResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod - def SendModelUpdate(request, + def SendHeartbeat(request, target, options=(), channel_credentials=None, @@ -787,14 +685,14 @@ def SendModelUpdate(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_unary(request, target, '/grpc.Combiner/SendModelUpdate', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelUpdate.SerializeToString, + return grpc.experimental.unary_unary(request, target, '/grpc.Combiner/SendHeartbeat', + fedn_dot_network_dot_grpc_dot_fedn__pb2.Heartbeat.SerializeToString, fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod - def SendModelValidation(request, + def ReassignClient(request, target, options=(), channel_credentials=None, @@ -804,8 +702,25 @@ def SendModelValidation(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_unary(request, target, '/grpc.Combiner/SendModelValidation', - fedn_dot_network_dot_grpc_dot_fedn__pb2.ModelValidation.SerializeToString, + return grpc.experimental.unary_unary(request, target, '/grpc.Combiner/ReassignClient', + fedn_dot_network_dot_grpc_dot_fedn__pb2.ReassignRequest.SerializeToString, + fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def ReconnectClient(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/grpc.Combiner/ReconnectClient', + fedn_dot_network_dot_grpc_dot_fedn__pb2.ReconnectRequest.SerializeToString, fedn_dot_network_dot_grpc_dot_fedn__pb2.Response.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/fedn/fedn/network/grpc/server.py b/fedn/fedn/network/grpc/server.py index 59ed6b1ba..a2b0ad4da 100644 --- a/fedn/fedn/network/grpc/server.py +++ b/fedn/fedn/network/grpc/server.py @@ -20,17 +20,8 @@ def __init__(self, servicer, modelservicer, config): self.certificate = None self.health_servicer = health.HealthServicer() - if isinstance(servicer, rpc.CombinerServicer): - rpc.add_CombinerServicer_to_server(servicer, self.server) - if isinstance(servicer, rpc.ConnectorServicer): - rpc.add_ConnectorServicer_to_server(servicer, self.server) - if isinstance(servicer, rpc.ReducerServicer): - rpc.add_ReducerServicer_to_server(servicer, self.server) - if isinstance(modelservicer, rpc.ModelServiceServicer): - rpc.add_ModelServiceServicer_to_server(modelservicer, self.server) - if isinstance(servicer, rpc.CombinerServicer): - rpc.add_ControlServicer_to_server(servicer, self.server) - + rpc.add_CombinerServicer_to_server(servicer, self.server) + rpc.add_ModelServiceServicer_to_server(modelservicer, self.server) health_pb2_grpc.add_HealthServicer_to_server(self.health_servicer, self.server) if config['secure']: