Skip to content

Commit

Permalink
Merge branch 'master' into feature/SK-823
Browse files Browse the repository at this point in the history
  • Loading branch information
FrankJonasmoelle committed May 17, 2024
2 parents 53b3d4f + 186a1bf commit 7e6b025
Showing 1 changed file with 17 additions and 3 deletions.
20 changes: 17 additions & 3 deletions fedn/network/combiner/combiner.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ def __init__(self, config):
# Client queues
self.clients = {}

self.modelservice = ModelService()

# Validate combiner name
match = re.search(VALID_NAME_REGEX, config["name"])
Expand Down Expand Up @@ -122,6 +121,17 @@ def __init__(self, config):
self.repository = Repository(announce_config["storage"]["storage_config"])

self.statestore = MongoStateStore(announce_config["statestore"]["network_id"], announce_config["statestore"]["mongo_config"])

# Fetch all clients previously connected to the combiner
# If a client and a combiner goes down at the same time,
# the client will be stuck listed as "online" in the statestore.
# Set the status to offline for previous clients.
previous_clients = self.statestore.clients.find({"combiner": config["name"]})
for client in previous_clients:
self.statestore.set_client({"name": client["name"], "status": "offline"})

self.modelservice = ModelService()

# Create gRPC server
self.server = Server(self, self.modelservice, grpc_config)

Expand Down Expand Up @@ -326,7 +336,7 @@ def _list_active_clients(self, channel):
if status != "online":
self.clients[client]["status"] = "online"
clients["update_active_clients"].append(client)
elif status == "online":
elif status != "offline":
self.clients[client]["status"] = "offline"
clients["update_offline_clients"].append(client)
# Update statestore with client status
Expand All @@ -337,7 +347,7 @@ def _list_active_clients(self, channel):

return clients["active_clients"]

def _deamon_thread_client_status(self, timeout=10):
def _deamon_thread_client_status(self, timeout=5):
"""Deamon thread that checks for inactive clients and updates statestore."""
while True:
time.sleep(timeout)
Expand Down Expand Up @@ -600,6 +610,10 @@ def TaskStream(self, response, context):

self._send_status(status)

# Set client status to online
self.clients[client.name]["status"] = "online"
self.statestore.set_client({"name": client.name, "status": "online"})

# Keep track of the time context has been active
start_time = time.time()
while context.is_active():
Expand Down

0 comments on commit 7e6b025

Please sign in to comment.