Skip to content

Commit

Permalink
Chore/SK-1468 | Add new CI tests with integration with studio (#833)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wrede authored Mar 4, 2025
1 parent 2a80776 commit cabd5d4
Show file tree
Hide file tree
Showing 7 changed files with 349 additions and 17 deletions.
80 changes: 80 additions & 0 deletions .ci/tests/studio/no_pytest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from fedn import APIClient
from fedn.cli.shared import get_token, get_project_url

import os
import time

FEDN_NR_ROUNDS = int(os.environ.get("FEDN_NR_ROUNDS", 5))
FEDN_ROUND_TIMEOUT = int(os.environ.get("FEDN_ROUND_TIMEOUT", 180))
FEDN_BUFFER_SIZE = int(os.environ.get("FEDN_BUFFER_SIZE", -1))
FEDN_NR_CLIENTS = int(os.environ.get("FEDN_NR_CLIENTS", 2))
FEDN_CLIENT_TIMEOUT = int(os.environ.get("FEDN_CLIENT_TIMEOUT", 60))
FEDN_FL_ALG = os.environ.get("FEDN_FL_ALG", "fedavg")
FEDN_NR_EXPECTED_AGG = int(os.environ.get("FEDN_NR_EXPECTED_AGG", 2)) # Number of expected aggregated models per combiner
FEDN_SESSION_TIMEOUT = int(os.environ.get("FEDN_SESSION_TIMEOUT", 300)) # Session timeout in seconds, all rounds must be finished within this time
FEDN_SESSION_NAME = os.environ.get("FEDN_SESSION_NAME", "test")


token = get_token(token=None, usr_token=False)
host = get_project_url("", "", None, False)
print(f"Connecting to {host}")
client = APIClient(host=host, token=token, secure=True, verify=False)

start_time = time.time()
while time.time() - start_time < FEDN_CLIENT_TIMEOUT:
client_obj = client.get_clients()
if client_obj["count"] == FEDN_NR_CLIENTS and all(c["status"] in ["available", "online"] for c in client_obj["result"]):
break
time.sleep(5) # Wait for 5 seconds before checking again
else:
raise TimeoutError(f"Not all clients are online within {FEDN_CLIENT_TIMEOUT} seconds")

# Start a new session
result = client.start_session(name=FEDN_SESSION_NAME,
aggregator=FEDN_FL_ALG,
round_timeout=FEDN_ROUND_TIMEOUT,
rounds=FEDN_NR_ROUNDS,
round_buffer_size=FEDN_BUFFER_SIZE,
min_clients=FEDN_NR_CLIENTS,
requested_clients=FEDN_NR_CLIENTS)


assert result["message"] == "Session started", f"Expected status 'Session started', got {result['message']}"

session_obj = client.get_sessions()
assert session_obj["count"] == 1, f"Expected 1 session, got {session_obj['count']}"

start_time = time.time()
while time.time() - start_time < FEDN_SESSION_TIMEOUT:
session_obj = client.get_sessions()
session_result = session_obj["result"][0]
if session_result["status"] == "Finished":
break
time.sleep(5) # Wait for 5 seconds before checking again
else:
raise TimeoutError(f"Session did not finish within {FEDN_SESSION_TIMEOUT} seconds")

assert session_result["status"] == "Finished", "Expected session status 'Finished', got {}".format(session_result["status"])
session_config = session_result["session_config"]
assert session_config["buffer_size"] == FEDN_BUFFER_SIZE, f"Expected buffer size {FEDN_BUFFER_SIZE}, got {session_config['buffer_size']}"
assert session_config["round_timeout"] == FEDN_ROUND_TIMEOUT, f"Expected round timeout {FEDN_ROUND_TIMEOUT}, got {session_config['round_timeout']}"

rounds_obj = client.get_rounds()
assert rounds_obj["count"] == FEDN_NR_ROUNDS, f"Expected {FEDN_NR_ROUNDS} rounds, got {rounds_obj['count']}"
rounds_result = rounds_obj["result"]
for round in rounds_result:
assert round["status"] == "Finished", f"Expected round status 'Finished', got {round['status']}"
for combiner in round["combiners"]:
assert combiner["status"] == "Success", f"Expected combiner status 'Finished', got {combiner['status']}"
data = combiner["data"]
assert data["aggregation_time"]["nr_aggregated_models"] == FEDN_NR_EXPECTED_AGG, f"Expected {FEDN_NR_EXPECTED_AGG} aggregated models, got {data['aggregation_time']['nr_aggregated_models']}"


validation_obj = client.get_validations()
assert validation_obj["count"] == FEDN_NR_ROUNDS*FEDN_NR_CLIENTS, f"Expected {FEDN_NR_ROUNDS*FEDN_NR_CLIENTS} validations, got {validation_obj['count']}"
# We could assert or test model convergence here

print("All tests passed!", flush=True)



2 changes: 2 additions & 0 deletions .ci/tests/studio/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pytest
pytest-order
44 changes: 44 additions & 0 deletions .ci/tests/studio/studio.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
set -e

# Define a cleanup function to run on exit
cleanup() {
echo "Running cleanup..."
# Add any cleanup commands here
# For example, killing background processes
for i in $(seq 0 $(($FEDN_NR_CLIENTS - 1))); do
eval "kill \$PID${i}" || true
done
echo "Cleanup completed."
}

# Register the cleanup function to be called on the EXIT signal
trap cleanup EXIT

# Load environment variables from .env file
if [ -f "$(dirname "$0")/.env" ]; then
echo "Loading environment variables from $(dirname "$0")/.env"
export $(cat "$(dirname "$0")/.env" | xargs)
# Echo each variable
while IFS= read -r line; do
if [[ ! "$line" =~ ^# && "$line" =~ = ]]; then
varname=$(echo "$line" | cut -d '=' -f 1)
echo "$varname=${!varname}"
fi
done < "$(dirname "$0")/.env"
fi

fedn studio login -u $STUDIO_USER -P $STUDIO_PASSWORD -H $STUDIO_HOST
fedn project create -n citest -H $STUDIO_HOST --no-interactive
sleep 5
FEDN_PROJECT=$(fedn project list -H $STUDIO_HOST | awk 'NR>=1 {print $1; exit}')
fedn project set-context -id $FEDN_PROJECT -H $STUDIO_HOST
pushd examples/$FEDN_EXAMPLE
fedn client get-config -n test -g $FEDN_NR_CLIENTS -H $STUDIO_HOST
fedn run build --path client --keep-venv
fedn model set-active -f seed.npz -H $STUDIO_HOST
for i in $(seq 0 $(($FEDN_NR_CLIENTS - 1))); do
fedn client start --init test_${i}.yaml --local-package > test_${i}.log 2>&1 & eval "PID${i}=$!"
done
popd
sleep 5
pytest .ci/tests/studio/tests.py
95 changes: 95 additions & 0 deletions .ci/tests/studio/tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import os
import time
import pytest
from fedn import APIClient
from fedn.cli.shared import get_token, get_project_url

@pytest.fixture(scope="module")
def fedn_client():
token = get_token(token=None, usr_token=False)
host = get_project_url("", "", None, False)
print(f"Connecting to {host}")
client = APIClient(host=host, token=token, secure=True, verify=True)
return client

@pytest.fixture(scope="module")
def fedn_env():
return {
"FEDN_NR_ROUNDS": int(os.environ.get("FEDN_NR_ROUNDS", 5)),
"FEDN_ROUND_TIMEOUT": int(os.environ.get("FEDN_ROUND_TIMEOUT", 180)),
"FEDN_BUFFER_SIZE": int(os.environ.get("FEDN_BUFFER_SIZE", -1)),
"FEDN_NR_CLIENTS": int(os.environ.get("FEDN_NR_CLIENTS", 2)),
"FEDN_CLIENT_TIMEOUT": int(os.environ.get("FEDN_CLIENT_TIMEOUT", 60)),
"FEDN_FL_ALG": os.environ.get("FEDN_FL_ALG", "fedavg"),
"FEDN_NR_EXPECTED_AGG": int(os.environ.get("FEDN_NR_EXPECTED_AGG", 2)), # Number of expected aggregated models per combiner
"FEDN_SESSION_TIMEOUT": int(os.environ.get("FEDN_SESSION_TIMEOUT", 300)), # Session timeout in seconds, all rounds must be finished within this time
"FEDN_SESSION_NAME": os.environ.get("FEDN_SESSION_NAME", "test")
}

@pytest.mark.order(1)
class TestFednStudio:

@pytest.mark.order(1)
def test_clients_online(self, fedn_client, fedn_env):
start_time = time.time()
while time.time() - start_time < fedn_env["FEDN_CLIENT_TIMEOUT"]:
client_obj = fedn_client.get_clients()
if client_obj["count"] == fedn_env["FEDN_NR_CLIENTS"] and all(c["status"] in ["available", "online"] for c in client_obj["result"]):
break
time.sleep(5) # Wait for 5 seconds before checking again
else:
raise TimeoutError(f"Not all clients are online within {fedn_env['FEDN_CLIENT_TIMEOUT']} seconds")

@pytest.mark.order(2)
def test_start_session(self, fedn_client, fedn_env):
result = fedn_client.start_session(
name=fedn_env["FEDN_SESSION_NAME"],
aggregator=fedn_env["FEDN_FL_ALG"],
round_timeout=fedn_env["FEDN_ROUND_TIMEOUT"],
rounds=fedn_env["FEDN_NR_ROUNDS"],
round_buffer_size=fedn_env["FEDN_BUFFER_SIZE"],
min_clients=fedn_env["FEDN_NR_CLIENTS"],
requested_clients=fedn_env["FEDN_NR_CLIENTS"]
)
assert result["message"] == "Session started", f"Expected status 'Session started', got {result['message']}"

@pytest.mark.order(3)
def test_session_completion(self, fedn_client, fedn_env):
session_obj = fedn_client.get_sessions()
assert session_obj["count"] == 1, f"Expected 1 session, got {session_obj['count']}"
session_result = session_obj["result"][0]

start_time = time.time()
while time.time() - start_time < fedn_env["FEDN_SESSION_TIMEOUT"]:
session_obj = fedn_client.get_sessions()
session_result = session_obj["result"][0]
if session_result["status"] == "Finished":
break
time.sleep(5) # Wait for 5 seconds before checking again
else:
raise TimeoutError(f"Session did not finish within {fedn_env['FEDN_SESSION_TIMEOUT']} seconds")

assert session_result["status"] == "Finished", "Expected session status 'Finished', got {}".format(session_result["status"])
session_config = session_result["session_config"]
assert session_config["buffer_size"] == fedn_env["FEDN_BUFFER_SIZE"], f"Expected buffer size {fedn_env['FEDN_BUFFER_SIZE']}, got {session_config['buffer_size']}"
assert session_config["round_timeout"] == fedn_env["FEDN_ROUND_TIMEOUT"], f"Expected round timeout {fedn_env['FEDN_ROUND_TIMEOUT']}, got {session_config['round_timeout']}"

@pytest.mark.order(4)
def test_rounds_completion(self, fedn_client, fedn_env):
rounds_obj = fedn_client.get_rounds()
assert rounds_obj["count"] == fedn_env["FEDN_NR_ROUNDS"], f"Expected {fedn_env['FEDN_NR_ROUNDS']} rounds, got {rounds_obj['count']}"
rounds_result = rounds_obj["result"]
for round in rounds_result:
assert round["status"] == "Finished", f"Expected round status 'Finished', got {round['status']}"
for combiner in round["combiners"]:
assert combiner["status"] == "Success", f"Expected combiner status 'Finished', got {combiner['status']}"
data = combiner["data"]
assert data["aggregation_time"]["nr_aggregated_models"] == fedn_env["FEDN_NR_EXPECTED_AGG"], f"Expected {fedn_env['FEDN_NR_EXPECTED_AGG']} aggregated models, got {data['aggregation_time']['nr_aggregated_models']}"

@pytest.mark.order(5)
def test_validations(self, fedn_client, fedn_env):
validation_obj = fedn_client.get_validations()
assert validation_obj["count"] == fedn_env["FEDN_NR_ROUNDS"] * fedn_env["FEDN_NR_CLIENTS"], f"Expected {fedn_env['FEDN_NR_ROUNDS'] * fedn_env['FEDN_NR_CLIENTS']} validations, got {validation_obj['count']}"
# We could assert or test model convergence here

print("All tests passed!", flush=True)
103 changes: 103 additions & 0 deletions .github/workflows/integration-test-studio.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
name: Integration Test Studio

on:
schedule:
- cron: '0 0 * * *' # This cron expression runs the workflow every day at midnight UTC
workflow_dispatch: # Allows manual triggering of the workflow
inputs:
STUDIO_HOST:
description: 'Studio Host'
required: false
default: 'api.studio.scaleoutplatform.com'
STUDIO_USER:
description: 'Studio User'
required: false
default: 'github@scaleoutsystems.com'
FEDN_EXAMPLE:
description: 'FEDN Example'
required: false
default: 'mnist-pytorch'
FEDN_NR_CLIENTS:
description: 'Number of Clients'
required: false
default: '2'
FEDN_NR_ROUNDS:
description: 'Number of Rounds'
required: false
default: '5'
FEDN_ROUND_TIMEOUT:
description: 'Round Timeout'
required: false
default: '180'
FEDN_BUFFER_SIZE:
description: 'Buffer Size'
required: false
default: '-1'
FEDN_FL_ALG:
description: 'FL Algorithm'
required: false
default: 'fedavg'
FEDN_NR_EXPECTED_AGG:
description: 'Number of Expected Aggregations Per Round'
required: false
default: '2'
FEDN_SESSION_TIMEOUT:
description: 'Session Timeout'
required: false
default: '300'
FEDN_SESSION_NAME:
description: 'Session Name'
required: false
default: 'test'
FEDN_CLIENT_TIMEOUT:
description: 'Client Connection Timeout (OBS - not related to round timeout)'
required: false
default: '60'

jobs:
integration-test:
runs-on: self-hosted

steps:
- name: Checkout repository
uses: actions/checkout@v2

- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.12'

- name: Create and activate virtual environment
run: |
python -m venv venv
source venv/bin/activate
- name: Install dependencies
run: |
source venv/bin/activate
python -m pip install --upgrade pip
pip install .
pip install -r .ci/tests/studio/requirements.txt
- name: Generate .env file
run: |
echo "STUDIO_HOST=${{ github.event.inputs.STUDIO_HOST || 'api.studio.scaleoutplatform.com' }}" > .ci/tests/studio/.env
echo "STUDIO_USER=${{ github.event.inputs.STUDIO_USER || 'github@scaleoutsystems.com' }}" >> .ci/tests/studio/.env
echo "FEDN_EXAMPLE=${{ github.event.inputs.FEDN_EXAMPLE || 'mnist-pytorch' }}" >> .ci/tests/studio/.env
echo "FEDN_NR_CLIENTS=${{ github.event.inputs.FEDN_NR_CLIENTS || '2' }}" >> .ci/tests/studio/.env
echo "FEDN_NR_ROUNDS=${{ github.event.inputs.FEDN_NR_ROUNDS || '5' }}" >> .ci/tests/studio/.env
echo "FEDN_ROUND_TIMEOUT=${{ github.event.inputs.FEDN_ROUND_TIMEOUT || '180' }}" >> .ci/tests/studio/.env
echo "FEDN_BUFFER_SIZE=${{ github.event.inputs.FEDN_BUFFER_SIZE || '-1' }}" >> .ci/tests/studio/.env
echo "FEDN_FL_ALG=${{ github.event.inputs.FEDN_FL_ALG || 'fedavg' }}" >> .ci/tests/studio/.env
echo "FEDN_NR_EXPECTED_AGG=${{ github.event.inputs.FEDN_NR_EXPECTED_AGG || '2' }}" >> .ci/tests/studio/.env
echo "FEDN_SESSION_TIMEOUT=${{ github.event.inputs.FEDN_SESSION_TIMEOUT || '300' }}" >> .ci/tests/studio/.env
echo "FEDN_SESSION_NAME=${{ github.event.inputs.FEDN_SESSION_NAME || 'test' }}" >> .ci/tests/studio/.env
echo "FEDN_CLIENT_TIMEOUT=${{ github.event.inputs.FEDN_CLIENT_TIMEOUT || '60' }}" >> .ci/tests/studio/.env
- name: Run integration tests
env:
STUDIO_PASSWORD: ${{ secrets.STUDIO_PASSWORD }}
run: |
source venv/bin/activate
chmod +x .ci/tests/studio/studio.sh
.ci/tests/studio/studio.sh
35 changes: 19 additions & 16 deletions fedn/cli/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,27 @@ def get_api_url(protocol: str, host: str, port: str, endpoint: str, usr_api: boo
_host = host or os.environ.get("FEDN_STUDIO_HOST") or STUDIO_DEFAULTS["host"]

if _url is None:
return f"{_protocol}://{_host}/api/{API_VERSION}/{endpoint}"

return f"{_url}/api/{API_VERSION}/{endpoint}"
_url = f"{_protocol}://{_host}/api/{API_VERSION}/{endpoint}"
else:
_url = os.environ.get("FEDN_CONTROLLER_URL")
_protocol = protocol or os.environ.get("FEDN_CONTROLLER_PROTOCOL") or CONTROLLER_DEFAULTS["protocol"]
_host = host or os.environ.get("FEDN_CONTROLLER_HOST") or CONTROLLER_DEFAULTS["host"]
_port = port or os.environ.get("FEDN_CONTROLLER_PORT") or CONTROLLER_DEFAULTS["port"]
_url = get_project_url(protocol, host, port, endpoint)
_url = f"{_url}/api/{API_VERSION}/{endpoint}"
return _url

if _url is None:
context_path = os.path.join(home_dir, ".fedn")
try:
context_data = get_context(context_path)
_url = context_data.get("Active project url")
except Exception as e:
click.echo(f"Encountered error {e}. Make sure you are logged in and have activated a project. Using controller defaults instead.", fg="red")
_url = f"{_protocol}://{_host}:{_port}"
return f"{_url}/api/{API_VERSION}/{endpoint}"

def get_project_url(protocol: str, host: str, port: str, endpoint: str) -> str:
_url = os.environ.get("FEDN_CONTROLLER_URL")
if _url is None:
context_path = os.path.join(home_dir, ".fedn")
try:
context_data = get_context(context_path)
_url = context_data.get("Active project url")
except Exception as e:
click.echo(f"Encountered error {e}. Make sure you are logged in and have activated a project. Using controller defaults instead.", fg="red")
_protocol = protocol or os.environ.get("FEDN_CONTROLLER_PROTOCOL") or CONTROLLER_DEFAULTS["protocol"]
_host = host or os.environ.get("FEDN_CONTROLLER_HOST") or CONTROLLER_DEFAULTS["host"]
_port = port or os.environ.get("FEDN_CONTROLLER_PORT") or CONTROLLER_DEFAULTS["port"]
_url = f"{_protocol}://{_host}:{_port}"
return _url


def get_token(token: str, usr_token: bool) -> str:
Expand Down
7 changes: 6 additions & 1 deletion fedn/network/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ class APIClient:
:type verify: bool
"""

def __init__(self, host, port=None, secure=False, verify=False, token=None, auth_scheme=None):
def __init__(self, host: str, port: int = None, secure: bool = False, verify: bool = False, token: str = None, auth_scheme: str = None):
if "://" in host:
host = host.split("://")[1]
self.host = host
self.port = port
self.secure = secure
Expand All @@ -37,6 +39,9 @@ def __init__(self, host, port=None, secure=False, verify=False, token=None, auth
token = os.environ.get("FEDN_AUTH_TOKEN", False)

if token:
# Split the token if it contains a space (scheme + token).
if " " in token:
token = token.split()[1]
self.headers = {"Authorization": f"{auth_scheme} {token}"}

def _get_url(self, endpoint):
Expand Down

0 comments on commit cabd5d4

Please sign in to comment.