diff --git a/.vscode/settings.json b/.vscode/settings.json index 07cfc57ae..d4c2ea8ad 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,7 +1,7 @@ { "editor.formatOnSave": true, "editor.codeActionsOnSave": { - "source.organizeImports": true + "source.organizeImports": "explicit" }, "python.linting.enabled": true, "python.linting.flake8Enabled": true, diff --git a/Dockerfile b/Dockerfile index 5a7259859..47e3201c7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,13 +7,6 @@ ARG GRPC_HEALTH_PROBE_VERSION="" # Requirements (use MNIST Keras as default) ARG REQUIREMENTS="" -# Add FEDn and default configs -COPY fedn /app/fedn -COPY config/settings-client.yaml.template /app/config/settings-client.yaml -COPY config/settings-combiner.yaml.template /app/config/settings-combiner.yaml -COPY config/settings-reducer.yaml.template /app/config/settings-reducer.yaml -COPY $REQUIREMENTS /app/config/requirements.txt - # Install developer tools (needed for psutil) RUN apt-get update && apt-get install -y python3-dev gcc @@ -27,6 +20,12 @@ RUN if [ ! -z "$GRPC_HEALTH_PROBE_VERSION" ]; then \ echo "No grpc_health_probe version specified, skipping installation"; \ fi +# Add FEDn and default configs + +COPY config/settings-client.yaml.template /app/config/settings-client.yaml +COPY config/settings-combiner.yaml.template /app/config/settings-combiner.yaml +COPY config/settings-reducer.yaml.template /app/config/settings-reducer.yaml +COPY $REQUIREMENTS /app/config/requirements.txt # Create FEDn app directory SHELL ["/bin/bash", "-c"] @@ -34,19 +33,24 @@ RUN mkdir -p /app \ && mkdir -p /app/client \ && mkdir -p /app/certs \ && mkdir -p /app/client/package \ - && mkdir -p /app/certs \ + && mkdir -p /app/certs # # Install FEDn and requirements - && python -m venv /venv \ - && /venv/bin/pip install --upgrade pip \ - && /venv/bin/pip install --no-cache-dir -e /app/fedn \ - && if [[ ! -z "$REQUIREMENTS" ]]; then \ +RUN python -m venv /venv \ + && /venv/bin/pip install --upgrade pip + +RUN if [[ ! -z "$REQUIREMENTS" ]]; then \ /venv/bin/pip install --no-cache-dir -r /app/config/requirements.txt; \ fi \ # # Clean up && rm -r /app/config/requirements.txt +COPY fedn /app/fedn + +RUN /venv/bin/pip install --no-cache-dir -e /app/fedn + + # Setup working directory WORKDIR /app ENTRYPOINT [ "/venv/bin/fedn" ] \ No newline at end of file diff --git a/fedn/fedn/common/log_config.py b/fedn/fedn/common/log_config.py index fbc3541cd..d6a04ec46 100644 --- a/fedn/fedn/common/log_config.py +++ b/fedn/fedn/common/log_config.py @@ -1,15 +1,19 @@ +import json import logging import logging.config +import os +import threading from functools import wraps +import psutil +import requests import urllib3 try: - import os + import platform import socket - import psutil from opentelemetry import trace from opentelemetry.exporter.jaeger.thrift import JaegerExporter from opentelemetry.sdk.resources import Resource @@ -21,36 +25,37 @@ except ImportError: telemetry_enabled = False -def get_system_info(): - system_info = [ - ["os.name", os.name], - ["platform.system", platform.system()], - ["platform.release", platform.release()], - ["hostname", socket.gethostname()], - ["ip_address", socket.gethostbyname(socket.gethostname())], - ["cpu_count", psutil.cpu_count(logical=True)], - ["total_memory", psutil.virtual_memory().total], - ["total_disk", psutil.disk_usage('/').total], - ] - return system_info +# def get_system_info(): +# system_info = [ +# ["os.name", os.name], +# ["platform.system", platform.system()], +# ["platform.release", platform.release()], +# ["hostname", socket.gethostname()], +# ["ip_address", socket.gethostbyname(socket.gethostname())], +# ["cpu_count", psutil.cpu_count(logical=True)], +# ["total_memory", psutil.virtual_memory().total], +# ["total_disk", psutil.disk_usage('/').total], +# ] +# return system_info -# Configure the tracer to export traces to Jaeger -resource = Resource.create({ResourceAttributes.SERVICE_NAME: "FEDn Client"}) -tracer_provider = TracerProvider(resource=resource) -trace.set_tracer_provider(tracer_provider) -# Create a JaegerExporter -jaeger_exporter = JaegerExporter( - agent_host_name='localhost', - agent_port=6831, -) +# # Configure the tracer to export traces to Jaeger +# resource = Resource.create({ResourceAttributes.SERVICE_NAME: "FEDn Client"}) +# tracer_provider = TracerProvider(resource=resource) +# trace.set_tracer_provider(tracer_provider) -# Add the Jaeger exporter to the tracer provider -tracer_provider.add_span_processor( - BatchSpanProcessor(jaeger_exporter) -) +# # Create a JaegerExporter +# jaeger_exporter = JaegerExporter( +# agent_host_name='localhost', +# agent_port=6831, +# ) -tracer = None +# # Add the Jaeger exporter to the tracer provider +# tracer_provider.add_span_processor( +# BatchSpanProcessor(jaeger_exporter) +# ) + +# tracer = None @@ -58,48 +63,150 @@ def get_system_info(): logging.getLogger("urllib3").setLevel(logging.ERROR) handler = logging.StreamHandler() -logger = logging.getLogger() +logger = logging.getLogger("fedn") logger.addHandler(handler) logger.setLevel(logging.DEBUG) formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') handler.setFormatter(formatter) -def add_trace(name=""): - def decorator(func): - @wraps(func) - def wrapper(*args, **kwargs): - self = args[0] - name = func.__name__ - if tracer: +# def add_trace(name=""): +# def decorator(func): +# @wraps(func) +# def wrapper(*args, **kwargs): +# self = args[0] +# name = func.__name__ +# if tracer: - with tracer.start_as_current_span(name) as span: - # print("name={}....{}".format(name, attributes)) - if self.trace_attribs: - for attrib in self.trace_attribs: - span.set_attribute(attrib[0], attrib[1]) - # system_attribs = get_system_info() - # print(system_attribs) - # for attrib in system_attribs: - # span.set_attribute(attrib[0], attrib[1]) - return func(*args, **kwargs) - else: - return func(*args, **kwargs) - return wrapper - return decorator +# with tracer.start_as_current_span(name) as span: +# # print("name={}....{}".format(name, attributes)) +# if self.trace_attribs: +# for attrib in self.trace_attribs: +# span.set_attribute(attrib[0], attrib[1]) +# # system_attribs = get_system_info() +# # print(system_attribs) +# # for attrib in system_attribs: +# # span.set_attribute(attrib[0], attrib[1]) +# return func(*args, **kwargs) +# else: +# return func(*args, **kwargs) +# return wrapper +# return decorator + def get_tracer(): global tracer return tracer + def enable_tracing(): global tracer tracer = trace.get_tracer(__name__) -def log_remote(server='localhost:8000', path='/log'): - http_handler = logging.handlers.HTTPHandler(server, '/log', method='POST') - http_handler.setLevel(logging.WARNING) +def get_disk_usage(path='/'): + disk_usage = psutil.disk_usage(path) + + total_gb = disk_usage.total / (1024**3) + used_gb = disk_usage.used / (1024**3) + free_gb = disk_usage.free / (1024**3) + percent_used = disk_usage.percent + + return total_gb, used_gb, free_gb, percent_used + + +def get_network_io_in_gb(): + net_io = psutil.net_io_counters() + bytes_sent = net_io.bytes_sent / (1024**3) # Convert from bytes to GB + bytes_recv = net_io.bytes_recv / (1024**3) # Convert from bytes to GB + + return bytes_sent, bytes_recv + + +def periodic_function(): + print("This function is called every 5 seconds.") + # Get the current process ID + pid = os.getpid() + + # Get the process instance for the current process + p = psutil.Process(pid) + + # CPU usage of current process + cpu_usage = p.cpu_percent(interval=0.01) + + # Memory usage of current process + memory_usage = p.memory_info().rss / (1024 * 1024 * 1024) # Convert bytes to GB + + print(f"CPU Usage: {cpu_usage}%") + print(f"Memory Usage: {memory_usage} GB") + total_gb, used_gb, free_gb, percent_used = get_disk_usage('/') + + print(f"Total: {total_gb:.2f} GB") + print(f"Used: {used_gb:.2f} GB") + print(f"Free: {free_gb:.2f} GB") + print(f"Percent Used: {percent_used}%") + + bytes_sent, bytes_recv = get_network_io_in_gb() + + print(f"GB Sent: {bytes_sent:.2f} GB") + print(f"GB Received: {bytes_recv:.2f} GB") + # Schedule this function to be called again after 5 seconds + threading.Timer(5, periodic_function).start() + + +class CustomHTTPHandler(logging.handlers.HTTPHandler): + def __init__(self, host, url, method='POST', credentials=None, projectname='', apptype=''): + super().__init__(host, url, method) + self.credentials = credentials # Basic auth (username, password) + self.projectname = projectname + self.apptype = apptype + + def emit(self, record): + # Customize the log record, for example, by adding metadata + # record.projectname = self.projectname + # record.apptype = self.apptype + + # Convert log record to json format + + log_entry = self.mapLogRecord(record) + + log_entry = { + "msg": log_entry['msg'], + "levelname": log_entry['levelname'], + "project": os.environ.get("PROJECT_ID"), + "appinstance": os.environ.get("APP_ID") + + } + # Setup headers + headers = { + 'Content-type': 'application/json', + } + if self.credentials: + import base64 + auth = base64.b64encode(f"{self.credentials[0]}:{self.credentials[1]}".encode('utf-8')).decode('utf-8') + headers['Authorization'] = f'Basic {auth}' + + # Use http.client or requests to send the log data + if self.method.lower() == 'post': + requests.post(self.host+self.url, json=log_entry, headers=headers) + else: + # Implement other methods if needed, e.g., GET + pass + + +def log_remote(server='http://studio-studio:8080', path='/api/applog/'): + # http_handler = logging.handlers.HTTPHandler(server, '/log', method='POST') + http_handler = CustomHTTPHandler( + host=server, + url=path, + method='POST', + credentials=None, # Basic Auth + projectname='test-project', + apptype='client' + ) + http_handler.setLevel(logging.DEBUG) logger.addHandler(http_handler) +log_remote() + def set_log_level_from_string(level_str): """ Set the log level based on a string input. diff --git a/fedn/fedn/network/clients/client.py b/fedn/fedn/network/clients/client.py index c845f2b01..ec2d68cf8 100644 --- a/fedn/fedn/network/clients/client.py +++ b/fedn/fedn/network/clients/client.py @@ -19,11 +19,10 @@ from google.protobuf.json_format import MessageToJson from OpenSSL import SSL -import fedn.common.net.grpc.fedn_pb2 as fedn -import fedn.common.net.grpc.fedn_pb2_grpc as rpc -from fedn.common.log_config import (add_trace, enable_tracing, get_tracer, - log_remote, logger, - set_log_level_from_string, set_log_stream) +import fedn.network.grpc.fedn_pb2 as fedn +import fedn.network.grpc.fedn_pb2_grpc as rpc +from fedn.common.log_config import ( # add_trace, enable_tracing, get_tracer, + log_remote, logger, set_log_level_from_string, set_log_stream) from fedn.network.clients.connect import ConnectorClient, Status from fedn.network.clients.package import PackageRuntime from fedn.network.clients.state import ClientState, ClientStateToString @@ -35,29 +34,28 @@ VALID_NAME_REGEX = '^[a-zA-Z0-9_-]*$' import os -import platform +# import platform import socket -import GPUtil -import psutil - - -def get_system_info(): - gpus = GPUtil.getGPUs() - gpu_info = [["GPU ID: {}".format(gpu.id), gpu.name] for gpu in gpus] - - system_info = { - "os.name": os.name, - "platform.system": platform.system(), - "platform.release": platform.release(), - "hostname": socket.gethostname(), - "ip_address": socket.gethostbyname(socket.gethostname()), - "cpu_count": psutil.cpu_count(logical=True), - "total_memory": psutil.virtual_memory().total, - "total_disk": psutil.disk_usage('/').total, - # Add more details as needed - } - return system_info, gpu_info +# import GPUtil +# import psutil + +# def get_system_info(): +# # gpus = GPUtil.getGPUs() +# # gpu_info = [["GPU ID: {}".format(gpu.id), gpu.name] for gpu in gpus] +# gpu_info = [] +# system_info = { +# "os.name": os.name, +# "platform.system": platform.system(), +# "platform.release": platform.release(), +# "hostname": socket.gethostname(), +# "ip_address": socket.gethostbyname(socket.gethostname()), +# "cpu_count": psutil.cpu_count(logical=True), +# "total_memory": psutil.virtual_memory().total, +# "total_disk": psutil.disk_usage('/').total, +# # Add more details as needed +# } +# return system_info, gpu_info class GrpcAuth(grpc.AuthMetadataPlugin): def __init__(self, key): @@ -86,21 +84,22 @@ def __init__(self, config): set_log_level_from_string(config.get('verbosity', "INFO")) set_log_stream(config.get('logfile', None)) - if config.get('telemetry', False): - log_remote() - enable_tracing() - proj = config['discover_host'].split('/')[1] - self.trace_attribs = [["project", proj], ["client_name", config["name"]]] - system_info, gpu_info = get_system_info() - print(system_info) - with get_tracer().start_as_current_span("TelemetryInit") as span: - for key, value in system_info.items(): - span.set_attribute(key, value) - print(gpu_info) - for attrib in gpu_info: - span.set_attribute(attrib[0], attrib[1]) - for attrib in self.trace_attribs: - span.set_attribute(attrib[0], attrib[1]) + # if config.get('telemetry', False): + # logger.debug("Telemetry enabled.") + # log_remote() + # enable_tracing() + # proj = config['discover_host'].split('/')[1] + # self.trace_attribs = [["project", proj], ["client_name", config["name"]]] + # system_info, gpu_info = get_system_info() + # print(system_info) + # with get_tracer().start_as_current_span("TelemetryInit") as span: + # for key, value in system_info.items(): + # span.set_attribute(key, value) + # print(gpu_info) + # for attrib in gpu_info: + # span.set_attribute(attrib[0], attrib[1]) + # for attrib in self.trace_attribs: + # span.set_attribute(attrib[0], attrib[1]) self.connector = ConnectorClient(host=config['discover_host'], port=config['discover_port'], @@ -141,7 +140,7 @@ def __init__(self, config): self.state = ClientState.idle - @add_trace() + # @add_trace() def _assign(self): """Contacts the controller and asks for combiner assignment. @@ -171,7 +170,7 @@ def _assign(self): logger.info("Received combiner configuration: {}".format(client_config)) return client_config - @add_trace() + # @add_trace() def _add_grpc_metadata(self, key, value): """Add metadata for gRPC calls. @@ -194,7 +193,7 @@ def _add_grpc_metadata(self, key, value): # Set metadata using tuple concatenation self.metadata += ((key, value),) - @add_trace() + # @add_trace() def _get_ssl_certificate(self, domain, port=443): context = SSL.Context(SSL.SSLv23_METHOD) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -209,7 +208,7 @@ def _get_ssl_certificate(self, domain, port=443): cert = cert.to_cryptography().public_bytes(Encoding.PEM).decode() return cert - @add_trace() + # @add_trace() def _connect(self, client_config): """Connect to assigned combiner. @@ -277,12 +276,12 @@ def _connect(self, client_config): logger.info("Using {} compute package.".format( client_config["package"])) - @add_trace() + # @add_trace() def _disconnect(self): """Disconnect from the combiner.""" self.channel.close() - @add_trace() + # @add_trace() def _detach(self): """Detach from the FEDn network (disconnect from combiner)""" # Setting _attached to False will make all processing threads return @@ -293,7 +292,7 @@ def _detach(self): # Close gRPC connection to combiner self._disconnect() - @add_trace() + # @add_trace() def _attach(self): """Attach to the FEDn network (connect to combiner)""" # Ask controller for a combiner and connect to that combiner. @@ -308,7 +307,7 @@ def _attach(self): self._attached = True return client_config - @add_trace() + # @add_trace() def _initialize_helper(self, client_config): """Initialize the helper class for the client. @@ -322,7 +321,7 @@ def _initialize_helper(self, client_config): if 'helper_type' in client_config.keys(): self.helper = get_helper(client_config['helper_type']) - @add_trace() + # @add_trace() def _subscribe_to_combiner(self, config): """Listen to combiner message stream and start all processing threads. @@ -343,7 +342,7 @@ def _subscribe_to_combiner(self, config): # Start processing the client message inbox threading.Thread(target=self.process_request, daemon=True).start() - @add_trace() + # @add_trace() def _initialize_dispatcher(self, config): """ Initialize the dispatcher for the client. @@ -626,7 +625,7 @@ def _process_validation_request(self, model_id: str, is_inference: bool, session self.state = ClientState.idle return validation - @add_trace() + # @add_trace() def process_request(self): """Process training and validation tasks. """ while True: @@ -706,14 +705,14 @@ def process_request(self): except queue.Empty: pass - @add_trace() + # @add_trace() def _handle_combiner_failure(self): """ Register failed combiner connection.""" self._missed_heartbeat += 1 if self._missed_heartbeat > self.config['reconnect_after_missed_heartbeat']: self.detach()() - @add_trace() + # @add_trace() def _send_heartbeat(self, update_frequency=2.0): """Send a heartbeat to the combiner. @@ -769,7 +768,7 @@ def _send_status(self, msg, log_level=fedn.Status.INFO, type=None, request=None, status.status)) _ = self.connectorStub.SendStatus(status, metadata=self.metadata) - @add_trace() + # @add_trace() def run(self): """ Run the client. """ try: diff --git a/fedn/fedn/network/combiner/combiner.py b/fedn/fedn/network/combiner/combiner.py index 5adf2d412..a2b31b72d 100644 --- a/fedn/fedn/network/combiner/combiner.py +++ b/fedn/fedn/network/combiner/combiner.py @@ -12,8 +12,8 @@ import fedn.network.grpc.fedn_pb2 as fedn import fedn.network.grpc.fedn_pb2_grpc as rpc -from fedn.common.log_config import (logger, set_log_level_from_string, - set_log_stream) +from fedn.common.log_config import (logger, periodic_function, + set_log_level_from_string, set_log_stream) from fedn.network.combiner.connect import ConnectorCombiner, Status from fedn.network.combiner.modelservice import ModelService from fedn.network.combiner.roundhandler import RoundHandler @@ -62,6 +62,7 @@ def __init__(self, config): set_log_level_from_string(config.get('verbosity', "INFO")) set_log_stream(config.get('logfile', None)) + periodic_function() # Client queues self.clients = {}