Skip to content

Commit

Permalink
Telemetry in client
Browse files Browse the repository at this point in the history
  • Loading branch information
stefanhellander committed Apr 25, 2024
1 parent 2b91787 commit f0646a1
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 21 deletions.
3 changes: 2 additions & 1 deletion fedn/cli/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import click

from fedn.common.telemetry import tracer, get_context

CONTEXT_SETTINGS = dict(
# Support -h as a shortcut for --help
help_option_names=['-h', '--help'],
Expand All @@ -10,7 +12,6 @@
@click.pass_context
def main(ctx):
"""
:param ctx:
"""
ctx.obj = dict()
48 changes: 34 additions & 14 deletions fedn/cli/run_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from fedn.common.exceptions import InvalidClientConfig
from fedn.common.log_config import logger
from fedn.common.telemetry import tracer, get_context
from fedn.network.clients.client import Client
from fedn.network.combiner.combiner import Combiner
from fedn.utils.dispatcher import Dispatcher, _read_yaml_file
Expand Down Expand Up @@ -73,6 +74,22 @@ def validate_client_config(config):
raise InvalidClientConfig("Could not load config from file. Check config")


def sanitize_config(config):
# List of keys to sanitize (remove or mask)
sensitive_keys = ["discover_host",
"discover_port",
"name",
"token",
"client_id",
"preshared_cert"
]

# Create a sanitized copy of the dictionary
sanitized_config = {key: (config[key] if key not in sensitive_keys else "***") for key in config}

return sanitized_config


@main.group('run')
@click.pass_context
def run_cmd(ctx):
Expand Down Expand Up @@ -130,20 +147,23 @@ def client_cmd(ctx, discoverhost, discoverport, token, name, client_id, local_pa
:param verbosity
:return:
"""
remote = False if local_package else True
config = {'discover_host': discoverhost, 'discover_port': discoverport, 'token': token, 'name': name,
'client_id': client_id, 'remote_compute_context': remote, 'force_ssl': force_ssl, 'dry_run': dry_run, 'secure': secure,
'preshared_cert': preshared_cert, 'verify': verify, 'preferred_combiner': preferred_combiner,
'validator': validator, 'trainer': trainer, 'init': init, 'logfile': logfile, 'heartbeat_interval': heartbeat_interval,
'reconnect_after_missed_heartbeat': reconnect_after_missed_heartbeat, 'verbosity': verbosity}

if init:
apply_config(config)

validate_client_config(config)

client = Client(config)
client.run()
with tracer.start_as_current_span("client_cmd") as span:
remote = False if local_package else True
config = {'discover_host': discoverhost, 'discover_port': discoverport, 'token': token, 'name': name,
'client_id': client_id, 'remote_compute_context': remote, 'force_ssl': force_ssl, 'dry_run': dry_run, 'secure': secure,
'preshared_cert': preshared_cert, 'verify': verify, 'preferred_combiner': preferred_combiner,
'validator': validator, 'trainer': trainer, 'init': init, 'logfile': logfile, 'heartbeat_interval': heartbeat_interval,
'reconnect_after_missed_heartbeat': reconnect_after_missed_heartbeat, 'verbosity': verbosity}
span.set_attribute("client_config", str(sanitize_config(config)))
context = get_context()
span.set_attribute("context", str(context))
if init:
apply_config(config)

validate_client_config(config)

client = Client(config)
client.run()


@run_cmd.command('combiner')
Expand Down
139 changes: 139 additions & 0 deletions fedn/fedn/common/telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
import os
import psutil
import platform

from fedn.common.log_config import logger


class NoopTracer:
def start_as_current_span(self, name, context=None, kind=None):
return NoopSpan()


class NoopSpan:
def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
pass

def set_attribute(self, key, value):
pass

def add_event(self, name, attributes=None):
pass

def end(self, end_time=None):
pass

def record_exception(self, exception, attributes=None):
pass

def is_recording(self):
return False

def __call__(self, func):
# Makes NoopSpan callable, and it returns the function itself, hence preserving its original functionality
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper


def initialize_tracer():
if os.getenv("FEDN_TELEMETRY", 'true').lower() in ('false', '0'):
return NoopTracer()
else:
logger.info("Telemetry enabled. Disable by setting FEDN_TELEMETRY=false")

telemetry_server = os.getenv("FEDN_TELEMETRY_SERVER", 'jaeger.akkelis.com')
telemetry_port = os.getenv("FEDN_TELEMETRY_PORT", 6831)

# Configure the tracer to report data to Jaeger
trace.set_tracer_provider(
TracerProvider(
resource=Resource.create({SERVICE_NAME: "FEDn"})
)
)

# Set up the Jaeger exporter
jaeger_exporter = JaegerExporter(
agent_host_name=telemetry_server,
agent_port=telemetry_port,
)

# Attach the exporter to the tracer provider
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(jaeger_exporter)
)

return trace.get_tracer(__name__)


def get_context():
battery = psutil.sensors_battery()
context = {
"fedn": {
"version": "0.9.1",
},
"hw": {
"cpu_count": os.cpu_count(),
"total_memory": psutil.virtual_memory().total,
"available_memory": psutil.virtual_memory().available,
"total_disk_space": psutil.disk_usage('/').total,
"available_disk_space": psutil.disk_usage('/').free,
"battery": {
"percent": battery.percent if battery else None,
"plugged_in": battery.power_plugged if battery else None,
}
},
"platform": {
"system": platform.system(),
"release": platform.release(),
"platform": platform.platform(),
"python_implementation": platform.python_implementation(),
"python_version": platform.python_version(),
"machine": platform.machine(),
"architecture": platform.architecture(),
"version": platform.uname().version,
},
}
return context


# Initialize tracer
tracer = initialize_tracer()
# try:
# with tracer.start_as_current_span("initialize_tracer") as span:
# context = get_context()
# span.set_attribute("context", str(context))
# except Exception as e:
# logger.error("Failed to initialize tracer: {}".format(e))


def trace_all_methods(cls):
def traced_method(method):
"""Wrap the method so that it executes within a traced span."""
def wrapper(*args, **kwargs):
with tracer.start_as_current_span(method.__name__) as span:
# Set the class name attribute on the span
span.set_attribute("class_name", cls.__name__)
return method(*args, **kwargs)
return wrapper

# Apply the traced_method decorator to each callable method of the class
for key, method in cls.__dict__.items():
if callable(method) and not key.startswith('__'):
# Set the decorated method back on the class
setattr(cls, key, traced_method(method))
return cls

def trace_module_functions(module):
for attribute_name in dir(module):
attribute = getattr(module, attribute_name)
if callable(attribute):
setattr(module, attribute_name, tracer.start_as_current_span(attribute.__name__)(attribute))
11 changes: 8 additions & 3 deletions fedn/fedn/network/clients/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from fedn.common.config import FEDN_AUTH_SCHEME, FEDN_PACKAGE_EXTRACT_DIR
from fedn.common.log_config import (logger, set_log_level_from_string,
set_log_stream)
from fedn.common.telemetry import tracer, trace_all_methods
from fedn.network.clients.connect import ConnectorClient, Status
from fedn.network.clients.package import PackageRuntime
from fedn.network.clients.state import ClientState, ClientStateToString
Expand All @@ -36,6 +37,7 @@
VALID_NAME_REGEX = '^[a-zA-Z0-9_-]*$'


@trace_all_methods
class GrpcAuth(grpc.AuthMetadataPlugin):
def __init__(self, key):
self._key = key
Expand All @@ -44,6 +46,7 @@ def __call__(self, context, callback):
callback((('authorization', f'{FEDN_AUTH_SCHEME} {self._key}'),), None)


@trace_all_methods
class Client:
"""FEDn Client. Service running on client/datanodes in a federation,
recieving and handling model update and model validation requests.
Expand All @@ -52,7 +55,6 @@ class Client:
and settings governing e.g. client-combiner assignment behavior.
:type config: dict
"""

def __init__(self, config):
"""Initialize the client."""
self.state = None
Expand Down Expand Up @@ -793,7 +795,10 @@ def run(self):
self._subscribe_to_combiner(self.config)
cnt = 0
if self.error_state:
logger.error("Client in error state. Terminiating.")
sys.exit("Client in error state. Terminiating.")
logger.error("Client in error state. Terminating.")
sys.exit("Client in error state. Terminating.")

except KeyboardInterrupt:
with tracer.start_as_current_span("Shutting down."):
pass
logger.info("Shutting down.")
2 changes: 2 additions & 0 deletions fedn/fedn/network/clients/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
FEDN_AUTH_REFRESH_TOKEN_URI, FEDN_AUTH_SCHEME,
FEDN_CUSTOM_URL_PREFIX)
from fedn.common.log_config import logger
from fedn.common.telemetry import trace_all_methods


class Status(enum.Enum):
Expand All @@ -23,6 +24,7 @@ class Status(enum.Enum):
UnMatchedConfig = 4


@trace_all_methods
class ConnectorClient:
""" Connector for assigning client to a combiner in the FEDn network.
Expand Down
2 changes: 2 additions & 0 deletions fedn/fedn/network/clients/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@

from fedn.common.config import FEDN_AUTH_SCHEME, FEDN_CUSTOM_URL_PREFIX
from fedn.common.log_config import logger
from fedn.common.telemetry import trace_all_methods
from fedn.utils.checksum import sha
from fedn.utils.dispatcher import Dispatcher, _read_yaml_file


@trace_all_methods
class PackageRuntime:
""" PackageRuntime is used to download, validate and unpack compute packages.
Expand Down
3 changes: 2 additions & 1 deletion fedn/fedn/network/clients/state.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from enum import Enum

from fedn.common.telemetry import tracer

class ClientState(Enum):
""" Enum for representing the state of a client."""
Expand All @@ -8,6 +8,7 @@ class ClientState(Enum):
validating = 3


@tracer.start_as_current_span(name="ClientStateToString")
def ClientStateToString(state):
""" Convert a ClientState to a string representation.
Expand Down
7 changes: 5 additions & 2 deletions fedn/fedn/utils/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import yaml

from fedn.common.log_config import logger
from fedn.common.telemetry import tracer, trace_all_methods, trace_module_functions
from fedn.utils import PYTHON_VERSION
from fedn.utils.environment import _PythonEnv
from fedn.utils.process import _exec_cmd, _join_commands
Expand Down Expand Up @@ -150,6 +151,7 @@ def _read_yaml_file(file_path):
return cfg


@trace_all_methods
class Dispatcher:
""" Dispatcher class for compute packages.
Expand Down Expand Up @@ -237,9 +239,7 @@ def run_cmd(self,
"""
try:
cmdsandargs = cmd_type.split(' ')

entry_point = self.config['entry_points'][cmdsandargs[0]]['command']

# remove the first element, that is not a file but a command
args = cmdsandargs[1:]

Expand Down Expand Up @@ -267,3 +267,6 @@ def run_cmd(self,
except IndexError:
message = "No such argument or configuration to run."
logger.error(message)


trace_module_functions(sys.modules[__name__])

0 comments on commit f0646a1

Please sign in to comment.