Skip to content

Commit

Permalink
Collect logs, start looking at collecting resource usage data.
Browse files Browse the repository at this point in the history
  • Loading branch information
stefanhellander committed Mar 7, 2024
1 parent fc0d5af commit 81f3373
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 123 deletions.
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": true
"source.organizeImports": "explicit"
},
"python.linting.enabled": true,
"python.linting.flake8Enabled": true,
Expand Down
28 changes: 16 additions & 12 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,6 @@ ARG GRPC_HEALTH_PROBE_VERSION=""
# Requirements (use MNIST Keras as default)
ARG REQUIREMENTS=""

# Add FEDn and default configs
COPY fedn /app/fedn
COPY config/settings-client.yaml.template /app/config/settings-client.yaml
COPY config/settings-combiner.yaml.template /app/config/settings-combiner.yaml
COPY config/settings-reducer.yaml.template /app/config/settings-reducer.yaml
COPY $REQUIREMENTS /app/config/requirements.txt

# Install developer tools (needed for psutil)
RUN apt-get update && apt-get install -y python3-dev gcc

Expand All @@ -27,26 +20,37 @@ RUN if [ ! -z "$GRPC_HEALTH_PROBE_VERSION" ]; then \
echo "No grpc_health_probe version specified, skipping installation"; \
fi

# Add FEDn and default configs

COPY config/settings-client.yaml.template /app/config/settings-client.yaml
COPY config/settings-combiner.yaml.template /app/config/settings-combiner.yaml
COPY config/settings-reducer.yaml.template /app/config/settings-reducer.yaml
COPY $REQUIREMENTS /app/config/requirements.txt

# Create FEDn app directory
SHELL ["/bin/bash", "-c"]
RUN mkdir -p /app \
&& mkdir -p /app/client \
&& mkdir -p /app/certs \
&& mkdir -p /app/client/package \
&& mkdir -p /app/certs \
&& mkdir -p /app/certs
#
# Install FEDn and requirements
&& python -m venv /venv \
&& /venv/bin/pip install --upgrade pip \
&& /venv/bin/pip install --no-cache-dir -e /app/fedn \
&& if [[ ! -z "$REQUIREMENTS" ]]; then \
RUN python -m venv /venv \
&& /venv/bin/pip install --upgrade pip

RUN if [[ ! -z "$REQUIREMENTS" ]]; then \
/venv/bin/pip install --no-cache-dir -r /app/config/requirements.txt; \
fi \
#
# Clean up
&& rm -r /app/config/requirements.txt

COPY fedn /app/fedn

RUN /venv/bin/pip install --no-cache-dir -e /app/fedn


# Setup working directory
WORKDIR /app
ENTRYPOINT [ "/venv/bin/fedn" ]
213 changes: 160 additions & 53 deletions fedn/fedn/common/log_config.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import json
import logging
import logging.config
import os
import threading
from functools import wraps

import psutil
import requests
import urllib3

try:
import os

import platform
import socket

import psutil
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.resources import Resource
Expand All @@ -21,85 +25,188 @@
except ImportError:
telemetry_enabled = False

def get_system_info():
system_info = [
["os.name", os.name],
["platform.system", platform.system()],
["platform.release", platform.release()],
["hostname", socket.gethostname()],
["ip_address", socket.gethostbyname(socket.gethostname())],
["cpu_count", psutil.cpu_count(logical=True)],
["total_memory", psutil.virtual_memory().total],
["total_disk", psutil.disk_usage('/').total],
]
return system_info
# def get_system_info():
# system_info = [
# ["os.name", os.name],
# ["platform.system", platform.system()],
# ["platform.release", platform.release()],
# ["hostname", socket.gethostname()],
# ["ip_address", socket.gethostbyname(socket.gethostname())],
# ["cpu_count", psutil.cpu_count(logical=True)],
# ["total_memory", psutil.virtual_memory().total],
# ["total_disk", psutil.disk_usage('/').total],
# ]
# return system_info

# Configure the tracer to export traces to Jaeger
resource = Resource.create({ResourceAttributes.SERVICE_NAME: "FEDn Client"})
tracer_provider = TracerProvider(resource=resource)
trace.set_tracer_provider(tracer_provider)

# Create a JaegerExporter
jaeger_exporter = JaegerExporter(
agent_host_name='localhost',
agent_port=6831,
)
# # Configure the tracer to export traces to Jaeger
# resource = Resource.create({ResourceAttributes.SERVICE_NAME: "FEDn Client"})
# tracer_provider = TracerProvider(resource=resource)
# trace.set_tracer_provider(tracer_provider)

# Add the Jaeger exporter to the tracer provider
tracer_provider.add_span_processor(
BatchSpanProcessor(jaeger_exporter)
)
# # Create a JaegerExporter
# jaeger_exporter = JaegerExporter(
# agent_host_name='localhost',
# agent_port=6831,
# )

tracer = None
# # Add the Jaeger exporter to the tracer provider
# tracer_provider.add_span_processor(
# BatchSpanProcessor(jaeger_exporter)
# )

# tracer = None



urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logging.getLogger("urllib3").setLevel(logging.ERROR)

handler = logging.StreamHandler()
logger = logging.getLogger()
logger = logging.getLogger("fedn")
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
handler.setFormatter(formatter)

def add_trace(name=""):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
self = args[0]
name = func.__name__
if tracer:
# def add_trace(name=""):
# def decorator(func):
# @wraps(func)
# def wrapper(*args, **kwargs):
# self = args[0]
# name = func.__name__
# if tracer:

with tracer.start_as_current_span(name) as span:
# print("name={}....{}".format(name, attributes))
if self.trace_attribs:
for attrib in self.trace_attribs:
span.set_attribute(attrib[0], attrib[1])
# system_attribs = get_system_info()
# print(system_attribs)
# for attrib in system_attribs:
# span.set_attribute(attrib[0], attrib[1])
return func(*args, **kwargs)
else:
return func(*args, **kwargs)
return wrapper
return decorator
# with tracer.start_as_current_span(name) as span:
# # print("name={}....{}".format(name, attributes))
# if self.trace_attribs:
# for attrib in self.trace_attribs:
# span.set_attribute(attrib[0], attrib[1])
# # system_attribs = get_system_info()
# # print(system_attribs)
# # for attrib in system_attribs:
# # span.set_attribute(attrib[0], attrib[1])
# return func(*args, **kwargs)
# else:
# return func(*args, **kwargs)
# return wrapper
# return decorator


def get_tracer():
    """Return the module-level tracer, or None when tracing was never enabled.

    The module-level ``tracer = None`` initialisation is commented out in this
    revision, so reading the bare global would raise NameError whenever
    ``enable_tracing()`` has not been called; look it up defensively instead.
    """
    return globals().get('tracer')


def enable_tracing():
    """Initialise the module-level tracer.

    Leaves ``tracer`` as None when the optional opentelemetry import at module
    load failed (the ``trace`` name is only bound inside the try block above),
    instead of raising NameError.
    """
    global tracer
    try:
        tracer = trace.get_tracer(__name__)
    except NameError:
        # opentelemetry is not installed; tracing stays disabled.
        tracer = None

def log_remote(server='localhost:8000', path='/log'):
http_handler = logging.handlers.HTTPHandler(server, '/log', method='POST')
http_handler.setLevel(logging.WARNING)
def get_disk_usage(path='/'):
    """Return disk usage for the filesystem at *path*.

    :param path: mount point or directory to inspect (default root).
    :return: tuple ``(total_gb, used_gb, free_gb, percent_used)`` where the
        first three values are in gigabytes and the last is a percentage.
    """
    usage = psutil.disk_usage(path)
    gb = 1024 ** 3
    return (
        usage.total / gb,
        usage.used / gb,
        usage.free / gb,
        usage.percent,
    )


def get_network_io_in_gb():
    """Return cumulative network I/O since boot as ``(sent_gb, received_gb)``.

    Values come from :func:`psutil.net_io_counters` and are converted from
    bytes to gigabytes.
    """
    gb = 1024 ** 3
    counters = psutil.net_io_counters()
    return counters.bytes_sent / gb, counters.bytes_recv / gb


def periodic_function():
    """Print resource usage of the current process, then reschedule itself.

    Reports CPU and resident memory for this process, plus disk usage of the
    root filesystem and cumulative network I/O, every 5 seconds.
    """
    print("This function is called every 5 seconds.")

    # Process handle for the currently running interpreter.
    proc = psutil.Process(os.getpid())

    # CPU usage of the current process (short sampling interval).
    cpu_usage = proc.cpu_percent(interval=0.01)

    # Resident set size, converted from bytes to GB.
    memory_usage = proc.memory_info().rss / (1024 * 1024 * 1024)

    print(f"CPU Usage: {cpu_usage}%")
    print(f"Memory Usage: {memory_usage} GB")

    total_gb, used_gb, free_gb, percent_used = get_disk_usage('/')
    print(f"Total: {total_gb:.2f} GB")
    print(f"Used: {used_gb:.2f} GB")
    print(f"Free: {free_gb:.2f} GB")
    print(f"Percent Used: {percent_used}%")

    bytes_sent, bytes_recv = get_network_io_in_gb()
    print(f"GB Sent: {bytes_sent:.2f} GB")
    print(f"GB Received: {bytes_recv:.2f} GB")

    # Reschedule in 5 seconds. The timer MUST be a daemon thread: a
    # non-daemon Timer chain keeps the interpreter alive forever and
    # prevents the process from ever exiting cleanly.
    timer = threading.Timer(5, periodic_function)
    timer.daemon = True
    timer.start()


class CustomHTTPHandler(logging.handlers.HTTPHandler):
    """HTTP logging handler that POSTs each record as a JSON payload.

    Adds optional Basic-auth credentials and project/app metadata on top of
    the stdlib :class:`logging.handlers.HTTPHandler`.
    """

    def __init__(self, host, url, method='POST', credentials=None, projectname='', apptype=''):
        """
        :param host: base URL of the log server (e.g. ``http://studio-studio:8080``).
        :param url: request path appended to *host*.
        :param method: HTTP method; only ``POST`` is actually implemented in emit().
        :param credentials: optional ``(username, password)`` tuple for Basic auth.
        :param projectname: project label attached to this handler (currently unused in emit).
        :param apptype: application type label (currently unused in emit).
        """
        super().__init__(host, url, method)
        self.credentials = credentials  # Basic auth (username, password)
        self.projectname = projectname
        self.apptype = apptype

    def emit(self, record):
        """Send *record* to the remote endpoint as JSON.

        A logging handler must never raise into the application, so any
        failure (network error, timeout, encoding problem) is routed to
        ``self.handleError`` instead of propagating.
        """
        try:
            mapped = self.mapLogRecord(record)

            # Only a subset of the record is forwarded; project/app identity
            # comes from the environment of the running container.
            log_entry = {
                "msg": mapped['msg'],
                "levelname": mapped['levelname'],
                "project": os.environ.get("PROJECT_ID"),
                "appinstance": os.environ.get("APP_ID")
            }

            headers = {
                'Content-type': 'application/json',
            }
            if self.credentials:
                import base64
                auth = base64.b64encode(
                    f"{self.credentials[0]}:{self.credentials[1]}".encode('utf-8')
                ).decode('utf-8')
                headers['Authorization'] = f'Basic {auth}'

            if self.method.lower() == 'post':
                # Timeout is essential: without it a slow or unreachable log
                # server blocks the application on every single log record.
                requests.post(self.host + self.url, json=log_entry,
                              headers=headers, timeout=5)
            else:
                # Other methods (e.g. GET) intentionally not implemented.
                pass
        except Exception:
            # Delegate to the stdlib error hook; never raise from logging.
            self.handleError(record)


def log_remote(server='http://studio-studio:8080', path='/api/applog/'):
    """Attach a :class:`CustomHTTPHandler` to the module logger.

    Idempotent: calling this more than once would otherwise stack duplicate
    handlers and send every log record multiple times, so an existing
    CustomHTTPHandler short-circuits the call.

    :param server: base URL of the remote log collector.
    :param path: request path for the log endpoint.
    """
    for existing in logger.handlers:
        if isinstance(existing, CustomHTTPHandler):
            return

    http_handler = CustomHTTPHandler(
        host=server,
        url=path,
        method='POST',
        credentials=None,  # Basic Auth (username, password) when required
        projectname='test-project',
        apptype='client'
    )
    http_handler.setLevel(logging.DEBUG)
    logger.addHandler(http_handler)


# NOTE(review): remote logging is enabled unconditionally at import time and
# points at a hard-coded in-cluster host — consider gating this on an
# environment variable so library imports outside the cluster stay silent.
log_remote()

def set_log_level_from_string(level_str):
"""
Set the log level based on a string input.
Expand Down
Loading

0 comments on commit 81f3373

Please sign in to comment.