
Commit acd7e06
send cpu and memory utilisation via send heart beat
niklastheman committed Mar 5, 2025
1 parent 8ab6f9b commit acd7e06
Showing 7 changed files with 122 additions and 97 deletions.
14 changes: 10 additions & 4 deletions fedn/network/clients/grpc_handler.py
@@ -8,6 +8,7 @@
 from typing import Any, Callable, Optional, Union
 
 import grpc
+import psutil
 from google.protobuf.json_format import MessageToJson
 
 import fedn.network.grpc.fedn_pb2 as fedn
@@ -107,13 +108,17 @@ def _init_insecure_channel(self, host: str, port: int) -> None:
             options=GRPC_OPTIONS,
         )
 
-    def heartbeat(self, client_name: str, client_id: str) -> fedn.Response:
+    def heartbeat(self, client_name: str, client_id: str, memory_utilisation: float, cpu_utilisation: float) -> fedn.Response:
         """Send a heartbeat to the combiner.
 
         :return: Response from the combiner.
         :rtype: fedn.Response
         """
-        heartbeat = fedn.Heartbeat(sender=fedn.Client(name=client_name, role=fedn.CLIENT, client_id=client_id))
+        heartbeat = fedn.Heartbeat(
+            sender=fedn.Client(name=client_name, role=fedn.CLIENT, client_id=client_id),
+            memory_utilisation=memory_utilisation,
+            cpu_utilisation=cpu_utilisation,
+        )
 
         try:
             logger.info("Sending heartbeat to combiner")
@@ -131,7 +136,9 @@ def send_heartbeats(self, client_name: str, client_id: str, update_frequency: fl
         send_heartbeat = True
         while send_heartbeat:
             try:
-                response = self.heartbeat(client_name, client_id)
+                memory_usage = psutil.virtual_memory().percent
+                cpu_usage = psutil.cpu_percent(interval=update_frequency)
+                response = self.heartbeat(client_name, client_id, memory_usage, cpu_usage)
             except grpc.RpcError as e:
                 self._handle_grpc_error(e, "SendHeartbeat", lambda: self.send_heartbeats(client_name, client_id, update_frequency))
                 return
@@ -143,7 +150,6 @@ def send_heartbeats(self, client_name: str, client_id: str, update_frequency: fl
             else:
                 logger.error("Heartbeat failed.")
                 send_heartbeat = False
-            time.sleep(update_frequency)
 
     def listen_to_task_stream(self, client_name: str, client_id: str, callback: Callable[[Any], None]) -> None:
         """Subscribe to the model update request stream."""
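
Note: psutil.cpu_percent(interval=update_frequency) blocks for the full interval while it samples CPU usage, which appears to take over the pacing of the heartbeat loop from the removed time.sleep(update_frequency) call. A minimal standalone sketch of the sampling (assuming psutil is installed; the interval value is illustrative):

    # Sketch: how the new client-side metrics are sampled (illustrative, standalone).
    import psutil

    update_frequency = 5.0  # seconds; stands in for the client's heartbeat interval

    memory_usage = psutil.virtual_memory().percent             # system memory utilisation, percent
    cpu_usage = psutil.cpu_percent(interval=update_frequency)  # blocks for the interval, then returns CPU percent over that window

    print(f"memory={memory_usage}%, cpu={cpu_usage}%")
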
18 changes: 16 additions & 2 deletions fedn/network/combiner/combiner.py
@@ -17,7 +17,7 @@
 from fedn.common.certificate.certificate import Certificate
 from fedn.common.log_config import logger, set_log_level_from_string, set_log_stream
 from fedn.network.combiner.roundhandler import RoundConfig, RoundHandler
-from fedn.network.combiner.shared import client_store, combiner_store, prediction_store, repository, round_store, status_store, validation_store
+from fedn.network.combiner.shared import analytic_store, client_store, combiner_store, prediction_store, repository, round_store, status_store, validation_store
 from fedn.network.grpc.server import Server, ServerConfig
 
 VALID_NAME_REGEX = "^[a-zA-Z0-9_-]*$"
@@ -641,12 +641,26 @@ def SendHeartbeat(self, heartbeat: fedn.Heartbeat, context):
         :return: the response
         :rtype: :class:`fedn.network.grpc.fedn_pb2.Response`
         """
-        logger.debug("GRPC: Received heartbeat from {}".format(heartbeat.sender.name))
+        logger.info("GRPC: Received heartbeat from {}".format(heartbeat.sender.name))
         # Update the clients dict with the last seen timestamp.
         client = heartbeat.sender
        self.__join_client(client)
         self.clients[client.client_id]["last_seen"] = datetime.now()
 
+        success, msg = analytic_store.add(
+            {
+                "id": str(uuid.uuid4()),
+                "sender_id": client.client_id,
+                "sender_role": "client",
+                "cpu_utilisation": heartbeat.cpu_utilisation,
+                "memory_utilisation": heartbeat.memory_utilisation,
+                "committed_at": datetime.now(),
+            }
+        )
+
+        if not success:
+            logger.error(f"GRPC: SendHeartbeat error: {msg}")
+
         response = fedn.Response()
         response.sender.name = heartbeat.sender.name
         response.sender.role = heartbeat.sender.role
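
Note: every received heartbeat now also writes one utilisation record to the analytics store, keyed by a fresh UUID rather than the client id, and a failed write is only logged, never propagated to the client. A sketch of the record and of the add() contract used above (illustrative values; analytic_store comes from fedn.network.combiner.shared):

    # Sketch: the per-heartbeat analytics record (illustrative values).
    import uuid
    from datetime import datetime

    record = {
        "id": str(uuid.uuid4()),         # fresh UUID per heartbeat, not the client id
        "sender_id": "client-1234",      # hypothetical client id
        "sender_role": "client",
        "cpu_utilisation": 12.5,         # percent, as reported by the client
        "memory_utilisation": 63.0,      # percent
        "committed_at": datetime.now(),  # server-side timestamp
    }

    # add() returns a (success, message) pair; validation requires sender_id and a
    # sender_role of "client" or "combiner" (see analytic_store.py below).
    # success, msg = analytic_store.add(record)
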
2 changes: 2 additions & 0 deletions fedn/network/combiner/shared.py
@@ -2,6 +2,7 @@
 from fedn.network.combiner.modelservice import ModelService
 from fedn.network.storage.dbconnection import DatabaseConnection
 from fedn.network.storage.s3.repository import Repository
+from fedn.network.storage.statestore.stores.analytic_store import AnalyticStore
 from fedn.network.storage.statestore.stores.client_store import ClientStore
 from fedn.network.storage.statestore.stores.combiner_store import CombinerStore
 from fedn.network.storage.statestore.stores.model_store import ModelStore
@@ -25,6 +26,7 @@
 status_store: StatusStore = stores.status_store
 validation_store: ValidationStore = stores.validation_store
 prediction_store: PredictionStore = stores.prediction_store
+analytic_store: AnalyticStore = stores.analytic_store
 
 
 repository = Repository(modelstorage_config["storage_config"], init_buckets=False)

2 changes: 2 additions & 0 deletions fedn/network/grpc/fedn.proto
@@ -134,6 +134,8 @@ message GetGlobalModelResponse {
 
 message Heartbeat {
   Client sender = 1;
+  float memory_utilisation = 2;
+  float cpu_utilisation = 3;
 }
 
 message ClientAvailableMessage {
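
Both new fields are proto3 floats, so a sender that does not set them (for example an older client) simply reports the default value 0.0. A minimal sketch of building the extended message with the generated Python classes, mirroring the grpc_handler.py change above (values are illustrative):

    # Sketch: constructing the extended Heartbeat message (illustrative values).
    import fedn.network.grpc.fedn_pb2 as fedn

    heartbeat = fedn.Heartbeat(
        sender=fedn.Client(name="client-a", role=fedn.CLIENT, client_id="1234"),
        memory_utilisation=63.0,
        cpu_utilisation=12.5,
    )
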
158 changes: 79 additions & 79 deletions fedn/network/grpc/fedn_pb2.py

Large diff not rendered (regenerated protobuf code).

5 changes: 3 additions & 2 deletions fedn/network/grpc/fedn_pb2_grpc.py
@@ -1,11 +1,12 @@
 # Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
 """Client and server classes corresponding to protobuf-defined services."""
-import grpc
 import warnings
 
+import grpc
+
 from fedn.network.grpc import fedn_pb2 as network_dot_grpc_dot_fedn__pb2
 
-GRPC_GENERATED_VERSION = '1.66.2'
+GRPC_GENERATED_VERSION = '1.68.1'
 GRPC_VERSION = grpc.__version__
 _version_not_supported = False
 
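
The two fedn_pb2 modules are regenerated output rather than hand-written code; the GRPC_GENERATED_VERSION bump suggests grpcio-tools 1.68.1 was used. A sketch of a typical regeneration step after editing fedn.proto, run from the repository root (the invocation and include paths are assumptions; the project may provide its own build script for this):

    # Sketch: regenerating the gRPC stubs with grpcio-tools (assumed invocation and paths).
    from grpc_tools import protoc

    protoc.main([
        "grpc_tools.protoc",
        "-I.",
        "--python_out=.",
        "--grpc_python_out=.",
        "fedn/network/grpc/fedn.proto",
    ])
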
20 changes: 10 additions & 10 deletions fedn/network/storage/statestore/stores/analytic_store.py
@@ -8,12 +8,12 @@
 
 
 class Analytic:
-    def __init__(self, id: str, client_id: str, type: str, execution_duration: int, model_id: str, committed_at: datetime):
+    def __init__(self, id: str, sender_id: str, sender_role: str, memory_utilisation: float, cpu_utilisation: float, committed_at: datetime):
         self.id = id
-        self.client_id = client_id
-        self.type = type
-        self.execution_duration = execution_duration
-        self.model_id = model_id
+        self.sender_id = sender_id
+        self.sender_role = sender_role
+        self.memory_utilisation = memory_utilisation
+        self.cpu_utilisation = cpu_utilisation
         self.committed_at = committed_at
 
 
@@ -22,10 +22,10 @@ class AnalyticStore(Store[Analytic]):
 
 
 def _validate_analytic(analytic: dict) -> Tuple[bool, str]:
-    if "client_id" not in analytic:
-        return False, "client_id is required"
-    if "type" not in analytic or analytic["type"] not in ["training", "inference"]:
-        return False, "type must be either 'training' or 'inference'"
+    if "sender_id" not in analytic:
+        return False, "sender_id is required"
+    if "sender_role" not in analytic or analytic["sender_role"] not in ["combiner", "client"]:
+        return False, "sender_role must be either 'combiner' or 'client'"
     return analytic, ""
 
 
@@ -37,7 +37,7 @@ def _complete_analytic(analytic: dict) -> dict:
 class MongoDBAnalyticStore(AnalyticStore, MongoDBStore[Analytic]):
     def __init__(self, database: Database, collection: str):
         super().__init__(database, collection)
-        self.database[self.collection].create_index([("client_id", pymongo.DESCENDING)])
+        self.database[self.collection].create_index([("sender_id", pymongo.DESCENDING)])
 
     def get(self, id: str) -> Analytic:
         return super().get(id)
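
For reference, a standalone copy of the updated validator showing what it returns for a few inputs; note that on success it returns the analytic dict itself (which is truthy) rather than True, matching the unchanged return line above:

    # Sketch: behaviour of the updated validation (standalone copy for illustration).
    from typing import Tuple


    def _validate_analytic(analytic: dict) -> Tuple[bool, str]:
        if "sender_id" not in analytic:
            return False, "sender_id is required"
        if "sender_role" not in analytic or analytic["sender_role"] not in ["combiner", "client"]:
            return False, "sender_role must be either 'combiner' or 'client'"
        return analytic, ""


    print(_validate_analytic({"sender_role": "client"}))
    # -> (False, 'sender_id is required')
    print(_validate_analytic({"sender_id": "abc", "sender_role": "worker"}))
    # -> (False, "sender_role must be either 'combiner' or 'client'")
    print(_validate_analytic({"sender_id": "abc", "sender_role": "client"}))
    # -> ({'sender_id': 'abc', 'sender_role': 'client'}, '')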
