Skip to content

Commit

Permalink
Made remote logging configurable via env variables, changed print sta…
Browse files Browse the repository at this point in the history
…tements to logging statements.
  • Loading branch information
stefanhellander committed Mar 7, 2024
1 parent 81f3373 commit 8952d8b
Show file tree
Hide file tree
Showing 14 changed files with 69 additions and 206 deletions.
6 changes: 2 additions & 4 deletions fedn/cli/run_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,10 @@ def run_cmd(ctx):
@click.option('--heartbeat-interval', required=False, default=2)
@click.option('--reconnect-after-missed-heartbeat', required=False, default=30)
@click.option('--verbosity', required=False, default='INFO', type=click.Choice(['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'], case_sensitive=False))
@click.option('--telemetry', required=False, default=False)
@click.pass_context
def client_cmd(ctx, discoverhost, discoverport, token, name, client_id, local_package, force_ssl, dry_run, secure, preshared_cert,
verify, preferred_combiner, validator, trainer, init, logfile, heartbeat_interval, reconnect_after_missed_heartbeat,
verbosity, telemetry):
verbosity):
"""
:param ctx:
Expand All @@ -125,15 +124,14 @@ def client_cmd(ctx, discoverhost, discoverport, token, name, client_id, local_pa
:param hearbeat_interval
:param reconnect_after_missed_heartbeat
:param verbosity
:param telemetry
:return:
"""
remote = False if local_package else True
config = {'discover_host': discoverhost, 'discover_port': discoverport, 'token': token, 'name': name,
'client_id': client_id, 'remote_compute_context': remote, 'force_ssl': force_ssl, 'dry_run': dry_run, 'secure': secure,
'preshared_cert': preshared_cert, 'verify': verify, 'preferred_combiner': preferred_combiner,
'validator': validator, 'trainer': trainer, 'init': init, 'logfile': logfile, 'heartbeat_interval': heartbeat_interval,
'reconnect_after_missed_heartbeat': reconnect_after_missed_heartbeat, 'verbosity': verbosity, 'telemetry': telemetry}
'reconnect_after_missed_heartbeat': reconnect_after_missed_heartbeat, 'verbosity': verbosity}

if init:
apply_config(config)
Expand Down
6 changes: 4 additions & 2 deletions fedn/fedn/common/certificate/certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from OpenSSL import crypto

from fedn.common.log_config import logger


class Certificate:
"""
Expand All @@ -20,9 +22,9 @@ def __init__(self, cwd, name=None, key_name="key.pem", cert_name="cert.pem", cre
try:
os.makedirs(cwd)
except OSError:
print("Directory exists, will store all cert and keys here.")
logger.info("Directory exists, will store all cert and keys here.")
else:
print(
logger.info(
"Successfully created the directory to store cert and keys in {}".format(cwd))

self.key_path = os.path.join(cwd, key_name)
Expand Down
156 changes: 17 additions & 139 deletions fedn/fedn/common/log_config.py
Original file line number Diff line number Diff line change
@@ -1,62 +1,18 @@
import json
import logging
import logging.config
import os
import threading
from functools import wraps

import psutil
import requests
import urllib3

try:

import platform
import socket

from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.semconv.resource import ResourceAttributes

telemetry_enabled = True
except ImportError:
telemetry_enabled = False

# def get_system_info():
# system_info = [
# ["os.name", os.name],
# ["platform.system", platform.system()],
# ["platform.release", platform.release()],
# ["hostname", socket.gethostname()],
# ["ip_address", socket.gethostbyname(socket.gethostname())],
# ["cpu_count", psutil.cpu_count(logical=True)],
# ["total_memory", psutil.virtual_memory().total],
# ["total_disk", psutil.disk_usage('/').total],
# ]
# return system_info


# # Configure the tracer to export traces to Jaeger
# resource = Resource.create({ResourceAttributes.SERVICE_NAME: "FEDn Client"})
# tracer_provider = TracerProvider(resource=resource)
# trace.set_tracer_provider(tracer_provider)

# # Create a JaegerExporter
# jaeger_exporter = JaegerExporter(
# agent_host_name='localhost',
# agent_port=6831,
# )

# # Add the Jaeger exporter to the tracer provider
# tracer_provider.add_span_processor(
# BatchSpanProcessor(jaeger_exporter)
# )

# tracer = None

# Map verbosity names (as passed on the CLI --verbosity option or the
# FEDN_REMOTE_LOG_LEVEL env variable) to stdlib logging levels; used when
# configuring the remote HTTP log handler via log_levels.get(...).
log_levels = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
    "CRITICAL": logging.CRITICAL
}


urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
Expand All @@ -69,88 +25,6 @@
formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
handler.setFormatter(formatter)

# def add_trace(name=""):
# def decorator(func):
# @wraps(func)
# def wrapper(*args, **kwargs):
# self = args[0]
# name = func.__name__
# if tracer:

# with tracer.start_as_current_span(name) as span:
# # print("name={}....{}".format(name, attributes))
# if self.trace_attribs:
# for attrib in self.trace_attribs:
# span.set_attribute(attrib[0], attrib[1])
# # system_attribs = get_system_info()
# # print(system_attribs)
# # for attrib in system_attribs:
# # span.set_attribute(attrib[0], attrib[1])
# return func(*args, **kwargs)
# else:
# return func(*args, **kwargs)
# return wrapper
# return decorator


def get_tracer():
    """Return the module-level OpenTelemetry tracer, or None if tracing
    has not been enabled.

    :return: the tracer set by :func:`enable_tracing`, or ``None``.
    """
    # `tracer` is only bound as a module global once enable_tracing() has
    # run (the module-level `tracer = None` initializer is commented out),
    # so a plain read would raise NameError before that. Look it up
    # defensively instead. `global` is also unnecessary for reads.
    return globals().get('tracer')


def enable_tracing():
    """Bind the module-level ``tracer`` to an OpenTelemetry tracer for this module.

    NOTE(review): relies on ``trace`` from the optional opentelemetry import
    at the top of the file; calling this when that import failed
    (``telemetry_enabled`` is False) raises NameError — confirm callers guard
    on ``telemetry_enabled`` first.
    """
    global tracer
    tracer = trace.get_tracer(__name__)

def get_disk_usage(path='/'):
    """Report disk usage for *path*.

    :param path: filesystem path to inspect (defaults to the root volume).
    :return: tuple ``(total_gb, used_gb, free_gb, percent_used)`` where the
        sizes are in GiB and ``percent_used`` is psutil's usage percentage.
    """
    usage = psutil.disk_usage(path)
    gib = 1024 ** 3  # bytes per GiB

    return (
        usage.total / gib,
        usage.used / gib,
        usage.free / gib,
        usage.percent,
    )


def get_network_io_in_gb():
    """Report system-wide network I/O counters.

    :return: tuple ``(bytes_sent, bytes_recv)`` converted from bytes to GiB.
    """
    counters = psutil.net_io_counters()
    gib = 1024 ** 3  # bytes per GiB
    return counters.bytes_sent / gib, counters.bytes_recv / gib


def periodic_function():
    """Log resource usage of the current process every 5 seconds.

    Emits CPU, memory, disk and network usage through the module logger
    (this module configures logging, so using ``print`` here was
    inconsistent with the file's own purpose), then reschedules itself.

    :return: None
    """
    logger.debug("This function is called every 5 seconds.")

    # Resource usage of the current process.
    process = psutil.Process(os.getpid())

    cpu_usage = process.cpu_percent(interval=0.01)
    memory_usage = process.memory_info().rss / (1024 * 1024 * 1024)  # bytes -> GB

    # Lazy %-style args so formatting is skipped when DEBUG is disabled.
    logger.debug("CPU Usage: %s%%", cpu_usage)
    logger.debug("Memory Usage: %s GB", memory_usage)

    total_gb, used_gb, free_gb, percent_used = get_disk_usage('/')

    logger.debug("Total: %.2f GB", total_gb)
    logger.debug("Used: %.2f GB", used_gb)
    logger.debug("Free: %.2f GB", free_gb)
    logger.debug("Percent Used: %s%%", percent_used)

    bytes_sent, bytes_recv = get_network_io_in_gb()

    logger.debug("GB Sent: %.2f GB", bytes_sent)
    logger.debug("GB Received: %.2f GB", bytes_recv)

    # Schedule the next run. daemon=True so a pending timer thread cannot
    # keep the interpreter alive at shutdown (the original non-daemon
    # Timer blocked clean process exit).
    timer = threading.Timer(5, periodic_function)
    timer.daemon = True
    timer.start()


class CustomHTTPHandler(logging.handlers.HTTPHandler):
def __init__(self, host, url, method='POST', credentials=None, projectname='', apptype=''):
Expand Down Expand Up @@ -192,20 +66,24 @@ def emit(self, record):
pass


def log_remote(server='http://studio-studio:8080', path='/api/applog/'):
# http_handler = logging.handlers.HTTPHandler(server, '/log', method='POST')
# Remote logging can only be configured via environment variables for now.
REMOTE_LOG_SERVER = os.environ.get('FEDN_REMOTE_LOG_URL', False)
REMOTE_LOG_PATH = os.environ.get('FEDN_REMOTE_LOG_PATH', False)
REMOTE_LOG_LEVEL = os.environ.get('FEDN_REMOTE_LOG_LEVEL', 'INFO')

if REMOTE_LOG_SERVER:
rloglevel = log_levels.get(REMOTE_LOG_LEVEL, logging.INFO)
http_handler = CustomHTTPHandler(
host=server,
url=path,
host=REMOTE_LOG_SERVER,
url=REMOTE_LOG_PATH,
method='POST',
credentials=None, # Basic Auth
projectname='test-project',
apptype='client'
)
http_handler.setLevel(logging.DEBUG)
http_handler.setLevel(rloglevel)
logger.addHandler(http_handler)

log_remote()

def set_log_level_from_string(level_str):
"""
Expand Down
11 changes: 6 additions & 5 deletions fedn/fedn/network/api/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from werkzeug.utils import secure_filename

from fedn.common.config import get_controller_config, get_network_config
from fedn.common.log_config import logger
from fedn.network.combiner.interfaces import (CombinerInterface,
CombinerUnavailableError)
from fedn.network.state import ReducerState, ReducerStateToString
Expand Down Expand Up @@ -274,7 +275,7 @@ def _get_compute_package_name(self):
name = package_objects["storage_file_name"]
except KeyError as e:
message = "No compute package found. Key error."
print(e)
logger.debug(e)
return None, message
return name, "success"

Expand Down Expand Up @@ -655,7 +656,7 @@ def add_client(self, client_id, preferred_combiner, remote_addr):
"certificate": cert,
"helper_type": self.control.statestore.get_helper(),
}
print("Seding payload: ", payload, flush=True)
logger.info(f"Seding payload: {payload}")

return jsonify(payload)

Expand Down Expand Up @@ -687,7 +688,7 @@ def set_initial_model(self, file):
model = helper.load(object)
self.control.commit(file.filename, model)
except Exception as e:
print(e, flush=True)
logger.debug(e)
return jsonify({"success": False, "message": e})

return jsonify(
Expand Down Expand Up @@ -967,7 +968,7 @@ def get_plot_data(self, feature=None):
except Exception as e:
valid_metrics = None
box_plot = None
print(e, flush=True)
logger.debug(e)

result = {
"valid_metrics": valid_metrics,
Expand Down Expand Up @@ -1081,7 +1082,7 @@ def start_session(
clients_available = clients_available + int(nr_active_clients)
except CombinerUnavailableError as e:
# TODO: Handle unavailable combiner, stop session or continue?
print("COMBINER UNAVAILABLE: {}".format(e), flush=True)
logger.error("COMBINER UNAVAILABLE: {}".format(e))
continue

if clients_available < min_clients:
Expand Down
13 changes: 7 additions & 6 deletions fedn/fedn/network/api/network.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import base64

from fedn.common.log_config import logger
from fedn.network.combiner.interfaces import CombinerInterface
from fedn.network.loadbalancer.leastpacked import LeastPacked

Expand Down Expand Up @@ -67,13 +68,13 @@ def add_combiner(self, combiner):
:return: None
"""
if not self.control.idle():
print("Reducer is not idle, cannot add additional combiner.")
logger.warning("Reducer is not idle, cannot add additional combiner.")
return

if self.get_combiner(combiner.name):
return

print("adding combiner {}".format(combiner.name), flush=True)
logger.info("adding combiner {}".format(combiner.name))
self.statestore.set_combiner(combiner.to_dict())

def remove_combiner(self, combiner):
Expand All @@ -84,7 +85,7 @@ def remove_combiner(self, combiner):
:return: None
"""
if not self.control.idle():
print("Reducer is not idle, cannot remove combiner.")
logger.warning("Reducer is not idle, cannot remove combiner.")
return
self.statestore.delete_combiner(combiner.name)

Expand All @@ -105,8 +106,8 @@ def handle_unavailable_combiner(self, combiner):
:return: None
"""
# TODO: Implement strategy to handle an unavailable combiner.
print("REDUCER CONTROL: Combiner {} unavailable.".format(
combiner.name), flush=True)
logger.warning("REDUCER CONTROL: Combiner {} unavailable.".format(
combiner.name))

def add_client(self, client):
""" Add a new client to the network.
Expand All @@ -119,7 +120,7 @@ def add_client(self, client):
if self.get_client(client['name']):
return

print("adding client {}".format(client['name']), flush=True)
logger.info("adding client {}".format(client['name']))
self.statestore.set_client(client)

def get_client(self, name):
Expand Down
33 changes: 8 additions & 25 deletions fedn/fedn/network/clients/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,32 +84,15 @@ def __init__(self, config):
set_log_level_from_string(config.get('verbosity', "INFO"))
set_log_stream(config.get('logfile', None))

# if config.get('telemetry', False):
# logger.debug("Telemetry enabled.")
# log_remote()
# enable_tracing()
# proj = config['discover_host'].split('/')[1]
# self.trace_attribs = [["project", proj], ["client_name", config["name"]]]
# system_info, gpu_info = get_system_info()
# print(system_info)
# with get_tracer().start_as_current_span("TelemetryInit") as span:
# for key, value in system_info.items():
# span.set_attribute(key, value)
# print(gpu_info)
# for attrib in gpu_info:
# span.set_attribute(attrib[0], attrib[1])
# for attrib in self.trace_attribs:
# span.set_attribute(attrib[0], attrib[1])

self.connector = ConnectorClient(host=config['discover_host'],
port=config['discover_port'],
token=config['token'],
name=config['name'],
remote_package=config['remote_compute_context'],
force_ssl=config['force_ssl'],
verify=config['verify'],
combiner=config['preferred_combiner'],
id=config['client_id'])
port=config['discover_port'],
token=config['token'],
name=config['name'],
remote_package=config['remote_compute_context'],
force_ssl=config['force_ssl'],
verify=config['verify'],
combiner=config['preferred_combiner'],
id=config['client_id'])

# Validate client name
match = re.search(VALID_NAME_REGEX, config['name'])
Expand Down
2 changes: 1 addition & 1 deletion fedn/fedn/network/clients/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def assign(self):
allow_redirects=True,
headers={'Authorization': 'Token {}'.format(self.token)})
except Exception as e:
print('***** {}'.format(e), flush=True)
logger.debug('***** {}'.format(e), flush=True)
return Status.Unassigned, {}

if retval.status_code == 400:
Expand Down
Loading

0 comments on commit 8952d8b

Please sign in to comment.