Feature/SK-1257 | Bugfixes and improvements of combiner, and new load test #774

Open
wants to merge 52 commits into master
Changes from all commits (52 commits)
9678042
Added load test based on shuffling numpy arrays
Mar 11, 2024
52e4b88
Added load test based on shuffling numpy arrays
Mar 11, 2024
58a5e11
complete example
Mar 11, 2024
ab21e13
Merge master
Mar 25, 2024
4e9dd7c
Code checks
Mar 25, 2024
8905724
Script to save seed model to file
benjaminastrand Dec 4, 2024
74034f1
Script to split dataset across clients
benjaminastrand Dec 4, 2024
2f2f4c5
Add gitignore
benjaminastrand Dec 4, 2024
5483a93
Function to get data loader for a given subset
benjaminastrand Dec 4, 2024
3d4b38b
Script to upload seed model
benjaminastrand Dec 4, 2024
b7348ec
File to store configuration settings
benjaminastrand Dec 4, 2024
6f8adf8
Add training config to config.py
benjaminastrand Dec 5, 2024
f3b1701
Default values for data split config
benjaminastrand Dec 5, 2024
ca28544
Add function to load params from npz file
benjaminastrand Dec 5, 2024
ab77567
Script to connect client, train and validate
benjaminastrand Dec 5, 2024
89ec85b
Add npz files to gitignore
benjaminastrand Dec 5, 2024
cbd86ba
Ruff (bypass pickle warning since data comes from trusted source)
benjaminastrand Dec 5, 2024
fdbb46b
Added requirements file
benjaminastrand Dec 6, 2024
abe3028
Merge branch 'master' into feature/load-test
Dec 9, 2024
e1ce6f3
Refactored entrypoint
Dec 9, 2024
30d7bc5
Refactored entrypoint
Dec 9, 2024
9e096b0
clean up
Dec 9, 2024
a76b150
Clean
Dec 9, 2024
9292e52
clean up
Dec 9, 2024
d4a6058
Fixed bug where client would fail if no env in fedn.yaml
Dec 10, 2024
a85fc13
Train callback compatible with 0.19.0
benjaminastrand Dec 11, 2024
8ef8e78
Fixed error in apidocs
Dec 13, 2024
7ef5d51
Updated dev instructions
Dec 13, 2024
a1862f2
Experiment with chunk_size
Dec 13, 2024
de7a577
Experiment with chunk_size
Dec 13, 2024
9d7b137
Reverting chunk_size, no significant difference
Dec 13, 2024
376698f
test
Dec 13, 2024
05f29fe
test
Dec 13, 2024
9bd1283
test
Dec 13, 2024
ce258e6
test
Dec 13, 2024
e772691
Latest
Dec 16, 2024
6087890
Added back time_model_load/aggregation
Dec 16, 2024
10a8ca4
Bugfix timing of aggregation
Dec 16, 2024
5d188d1
Fix bug in fedopt that causes model delete to fail in fedopt
Dec 16, 2024
c68c1af
Improve error handling in fedopt
Dec 16, 2024
500563c
Bugfix
Dec 16, 2024
dd5db2f
ruff linting
Dec 16, 2024
f8f9732
Train callback compatible with 0.20.0
benjaminastrand Jan 8, 2025
0150ab9
wip
Jan 9, 2025
ef184fa
Defined pseudo_gradient
Jan 9, 2025
e2a7180
Merge branch 'master' into feature/SK-1257
Jan 10, 2025
aa11dc7
Refactor FedOpt
Jan 10, 2025
755051d
Merge branch 'feature/SK-1183' into feature/SK-1257
Jan 10, 2025
24e60c5
wip
Jan 10, 2025
e1cc4a1
Fixed review comments
Jan 11, 2025
99608b6
fix
Jan 11, 2025
94c6a53
Ruff
benjaminastrand Jan 15, 2025
2 changes: 1 addition & 1 deletion docs/apiclient.rst
@@ -60,7 +60,7 @@ using the default aggregator (FedAvg):
.. code:: python

>>> ...
>>> client.start_session(id="test-session", rounds=3)
>>> client.start_session(id="test-session", helper="numpyhelper", rounds=3)
# Wait for training to complete, when controller is idle:
>>> client.get_controller_status()
# Show model trail:
28 changes: 15 additions & 13 deletions docs/developer.rst
@@ -19,30 +19,32 @@ We provide Dockerfiles and a docker-compose template for an all-in-one local sandbox

.. code-block::

docker compose \
-f ../../docker-compose.yaml \
-f docker-compose.override.yaml \
up
docker compose up

This starts up local services for MongoDB, Minio, the API Server, one Combiner and two clients.
This starts up local services for MongoDB, Minio, the API Server, and one Combiner.
You can verify the deployment on localhost using these URLs:

- API Server: http://localhost:8092/get_controller_status
- Minio: http://localhost:9000
- Mongo Express: http://localhost:8081

This setup does not include any of the security and authentication features available in a Studio Project,
so we will not require authentication of clients (insecure mode) when using the APIClient:
To connect a native FEDn client to the sandbox deployment, first edit '/etc/hosts' and map the service names to localhost by adding the line '127.0.0.1 api-server combiner'. Then
create a file `client.yaml` with the following content:

.. code-block::

from fedn import APIClient
client = APIClient(host="localhost", port=8092)
client.set_active_package("package.tgz", helper="numpyhelper")
client.set_active_model("seed.npz")
network_id: fedn-network
discover_host: api-server
discover_port: 8092
name: myclient

To connect a native FEDn client to the sandbox deployment, you need to make sure that the combiner service can be resolved by the client using the name "combiner".
One way to achieve this is to edit your '/etc/hosts' and add a line '127.0.0.1 combiner'.
Now you can start a client:

.. code-block::

fedn client start -in client.yaml --api-url=http://localhost --api-port=8090

If you are running the server on a remote machine or VM, replace 'localhost' with the IP address or hostname of that machine in the instructions above, and remember to open ports 8081, 8090, and 12080 on the server host.

Access message logs and validation data from MongoDB
------------------------------------------------------
3 changes: 3 additions & 0 deletions examples/cifar100/.gitignore
@@ -0,0 +1,3 @@
data/*
results/*
*.npz
13 changes: 13 additions & 0 deletions examples/cifar100/config.py
@@ -0,0 +1,13 @@
settings = {
"N_CLIENTS": 5,
"DISCOVER_HOST": "localhost",
"DISCOVER_PORT": 8092,
"SECURE": False,
"VERIFY": False,
"ADMIN_TOKEN": None,
"CLIENT_TOKEN": None,
"BATCH_SIZE": 128,
"EPOCHS": 1,
"BALANCED": True,
"IID": True,
}
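
For illustration (not part of the diff): the other scripts in this example consume these values by importing the module directly, mirroring the init_fedn.py usage further down. A minimal sketch:

from config import settings

print(f"Running with {settings['N_CLIENTS']} clients, batch size {settings['BATCH_SIZE']}, epochs {settings['EPOCHS']}")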
312 changes: 312 additions & 0 deletions examples/cifar100/data.py
@@ -0,0 +1,312 @@
import os
import pickle
from typing import List

import numpy as np
from scipy.stats import dirichlet
from torch.utils.data import DataLoader, Dataset, Subset
from torchvision import datasets, transforms

# Set a fixed random seed for reproducibility
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
# testloader = DataLoader(testset, batch_size=100, shuffle=False, num_workers=2)


def fine_to_coarse_labels(fine_labels: np.ndarray) -> np.ndarray:
coarse = np.array(
[
4, 1, 14, 8, 0, 6, 7, 7, 18, 3,
3, 14, 9, 18, 7, 11, 3, 9, 7, 11,
6, 11, 5, 10, 7, 6, 13, 15, 3, 15,
0, 11, 1, 10, 12, 14, 16, 9, 11, 5,
5, 19, 8, 8, 15, 13, 14, 17, 18, 10,
16, 4, 17, 4, 2, 0, 17, 4, 18, 17,
10, 3, 2, 12, 12, 16, 12, 1, 9, 19,
2, 10, 0, 1, 16, 12, 9, 13, 15, 13,
16, 19, 2, 4, 6, 19, 5, 5, 8, 19,
18, 1, 2, 15, 6, 0, 17, 8, 14, 13,
]
)
return coarse[fine_labels]


class CIFAR100Federated:
def __init__(self, root_dir: str = "./data/splits"):
"""Initialize the splitter
:param root_dir: Directory to save the split datasets
"""
self.root_dir = root_dir
self.splits = {}
os.makedirs(root_dir, exist_ok=True)

# Load the full dataset
self.transform_train = transforms.Compose(
[
transforms.RandomCrop(24),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize((0.5071, 0.4867, 0.4408), (0.2675, 0.2565, 0.2761)),
]
)
self.trainset = datasets.CIFAR100(root="./data", train=True, download=True, transform=self.transform_train)

self.transform_test = transforms.Compose(
[transforms.CenterCrop(24), transforms.ToTensor(), transforms.Normalize((0.5071, 0.4867, 0.4408), (0.2675, 0.2565, 0.2761))]
)
self.testset = datasets.CIFAR100(root="./data", train=False, download=True, transform=self.transform_test)

def create_splits(self, num_splits: int, balanced: bool, iid: bool) -> None:
"""Create dataset splits based on specified parameters
:param num_splits: Number of splits to create
:param balanced: Whether splits should have equal size
:param iid: Whether splits should be IID
"""
config_key = f"splits_{num_splits}_bal_{balanced}_iid_{iid}"

if iid:
indices = self._create_iid_splits(num_splits, balanced)
else:
indices = self._create_non_iid_splits(num_splits, balanced)

# Save splits
for i, split_indices in enumerate(indices):
split_path = os.path.join(self.root_dir, f"{config_key}_split_{i}.pkl")
with open(split_path, "wb") as f:
pickle.dump(split_indices, f)

self.splits[config_key] = indices

def _create_iid_splits(self, num_splits: int, balanced: bool) -> List[np.ndarray]:
"""Create IID splits of the dataset"""
indices = np.arange(len(self.trainset))
np.random.shuffle(indices)

if balanced:
# Equal size splits
split_size = len(indices) // num_splits
return [indices[i * split_size : (i + 1) * split_size] for i in range(num_splits)]
else:
# Random size splits
split_points = sorted(np.random.choice(len(indices) - 1, num_splits - 1, replace=False))
return np.split(indices, split_points)

def _create_non_iid_splits(self, num_splits: int, balanced: bool) -> List[np.ndarray]:
"""Create non-IID splits using Pachinko Allocation Method (PAM)"""
# Initialize parameters
alpha = 0.1 # Root Dirichlet parameter
beta = 10.0 # Coarse-to-fine Dirichlet parameter
total_examples = len(self.trainset)

# Calculate examples per split
if balanced:
examples_per_split = [total_examples // num_splits] * num_splits
else:
# Use Dirichlet to create unbalanced split sizes
split_ratios = np.random.dirichlet([0.5] * num_splits) # Lower alpha = more unbalanced
examples_per_split = np.round(split_ratios * total_examples).astype(int)
# Ensure we use exactly total_examples
examples_per_split[-1] = total_examples - examples_per_split[:-1].sum()

# Get fine labels and map them to coarse labels
fine_labels = np.array(self.trainset.targets)
coarse_labels = fine_to_coarse_labels(fine_labels)

# Initialize DAG structure (track available labels)
available_coarse = list(range(20)) # 20 coarse labels as list instead of set
available_fine = {c: set(np.where(coarse_labels == c)[0]) for c in available_coarse}

indices_per_split = []
for split_idx in range(num_splits):
split_indices = []
N = examples_per_split[split_idx] # Use the pre-calculated split size

# Sample root distribution over coarse labels
coarse_probs = dirichlet.rvs(alpha=[alpha] * len(available_coarse), size=1, random_state=RANDOM_SEED + split_idx)[0]

# Sample fine label distributions for each available coarse label
fine_distributions = {}
for c in available_coarse:
if len(available_fine[c]) > 0:
fine_probs = dirichlet.rvs(alpha=[beta] * len(available_fine[c]), size=1, random_state=RANDOM_SEED + split_idx + c)[0]
fine_distributions[c] = fine_probs

# Sample N examples for this split
for _ in range(N):
if len(available_coarse) == 0:
break

# Sample coarse label
coarse_idx = np.random.choice(available_coarse, p=coarse_probs)

if len(available_fine[coarse_idx]) == 0:
# Remove empty coarse label and renormalize
idx_to_remove = available_coarse.index(coarse_idx)
available_coarse.remove(coarse_idx)
coarse_probs = self._renormalize(coarse_probs, idx_to_remove)
continue

# Sample fine label
fine_probs = fine_distributions[coarse_idx]
available_fine_indices = list(available_fine[coarse_idx])
fine_probs = fine_probs[: len(available_fine_indices)]
fine_probs = fine_probs / fine_probs.sum() # Renormalize
fine_idx = np.random.choice(available_fine_indices, p=fine_probs)

# Add example to split
split_indices.append(fine_idx)

# Remove selected example
available_fine[coarse_idx].remove(fine_idx)

# Renormalize if necessary
if len(available_fine[coarse_idx]) == 0:
idx_to_remove = available_coarse.index(coarse_idx)
available_coarse.remove(coarse_idx)
coarse_probs = self._renormalize(coarse_probs, idx_to_remove)

indices_per_split.append(np.array(split_indices))

return indices_per_split

def _renormalize(self, probs: np.ndarray, removed_idx: int) -> np.ndarray:
"""Implementation of Algorithm 8 from the paper"""
# Create a list of valid indices (excluding the removed index)
valid_indices = [i for i in range(len(probs)) if i != removed_idx]

# Select only the probabilities for valid indices
valid_probs = probs[valid_indices]

# Normalize the remaining probabilities
return valid_probs / valid_probs.sum()

def get_split(self, split_id: int, num_splits: int, balanced: bool, iid: bool) -> Dataset:
"""Get a specific split of the dataset
:param split_id: ID of the split to retrieve
:param num_splits: Total number of splits
:param balanced: Whether splits are balanced
:param iid: Whether splits are IID
:return: Dataset split
"""
config_key = f"splits_{num_splits}_bal_{balanced}_iid_{iid}"
split_path = os.path.join(self.root_dir, f"{config_key}_split_{split_id}.pkl")

if not os.path.exists(split_path):
self.create_splits(num_splits, balanced, iid)

with open(split_path, "rb") as f:
indices = pickle.load(f) # noqa: S301

return Subset(self.trainset, indices)


def get_data_loader(num_splits: int = 5, balanced: bool = True, iid: bool = True, batch_size: int = 100, is_train: bool = True):
"""Get a data loader for the CIFAR-100 dataset
:param num_splits: Number of splits to create
:param balanced: Whether splits are balanced
:param iid: Whether splits are IID
:param batch_size: Batch size
:param is_train: Whether to get the training or test data loader
:return: Data loader
"""
cifar_data = CIFAR100Federated()

if is_train:
split_id = int(os.environ.get("FEDN_DATA_SPLIT_ID", 0))  # env var values are strings; cast to int
dataset = cifar_data.get_split(split_id=split_id, num_splits=num_splits, balanced=balanced, iid=iid)
print(f"Getting data loader for split {split_id} of trainset (size: {len(dataset)})")
else:
dataset = cifar_data.testset
print(f"Getting data loader for testset (size: {len(dataset)})")

return DataLoader(dataset, batch_size=batch_size, shuffle=is_train)
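
For illustration (not part of the diff), a minimal local smoke test of the loader above. It assumes data.py is importable from the examples/cifar100 directory and that CIFAR-100 can be downloaded; FEDN_DATA_SPLIT_ID selects the client's shard exactly as in get_data_loader above, and the first call persists the split indices under data/splits as pickle files:

import os

os.environ["FEDN_DATA_SPLIT_ID"] = "1"  # pretend to be client 1

from data import get_data_loader

# e.g. non-IID, unbalanced shards for 5 clients
train_loader = get_data_loader(num_splits=5, balanced=False, iid=False, batch_size=128, is_train=True)
test_loader = get_data_loader(batch_size=128, is_train=False)

images, labels = next(iter(train_loader))
print(images.shape, labels.shape)  # typically torch.Size([128, 3, 24, 24]) torch.Size([128])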
13 changes: 13 additions & 0 deletions examples/cifar100/init_fedn.py
@@ -0,0 +1,13 @@
from config import settings
from fedn import APIClient

client = APIClient(
host=settings["DISCOVER_HOST"],
port=settings["DISCOVER_PORT"],
secure=settings["SECURE"],
verify=settings["VERIFY"],
token=settings["ADMIN_TOKEN"],
)

result = client.set_active_model("seed.npz")
print(result["message"])