Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/SK-845 | As a user I want to continue an existing session #609

Merged
merged 9 commits into from
May 17, 2024
13 changes: 3 additions & 10 deletions fedn/network/api/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,12 @@

from flask import Flask, jsonify, request

from fedn.common.config import get_controller_config, get_modelstorage_config, get_network_config, get_statestore_config
from fedn.common.config import get_controller_config
from fedn.network.api.auth import jwt_auth_required
from fedn.network.api.interface import API
from fedn.network.api.v1 import _routes
from fedn.network.controller.control import Control
from fedn.network.storage.statestore.mongostatestore import MongoStateStore

statestore_config = get_statestore_config()
network_id = get_network_config()
modelstorage_config = get_modelstorage_config()
statestore = MongoStateStore(network_id, statestore_config["mongo_config"])
statestore.set_storage_backend(modelstorage_config)
control = Control(statestore=statestore)
from fedn.network.api.shared import statestore, control


custom_url_prefix = os.environ.get("FEDN_CUSTOM_URL_PREFIX", False)
api = API(statestore, control)
Expand Down
11 changes: 11 additions & 0 deletions fedn/network/api/shared.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from fedn.common.config import get_modelstorage_config, get_network_config, get_statestore_config
from fedn.network.controller.control import Control
from fedn.network.storage.statestore.mongostatestore import MongoStateStore

statestore_config = get_statestore_config()
modelstorage_config = get_modelstorage_config()
network_id = get_network_config()

statestore = MongoStateStore(network_id, statestore_config["mongo_config"])
statestore.set_storage_backend(modelstorage_config)
control = Control(statestore=statestore)
3 changes: 2 additions & 1 deletion fedn/network/api/v1/model_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from flask import Blueprint, jsonify, request, send_file

from fedn.network.api.auth import jwt_auth_required
from fedn.network.api.v1.shared import api_version, get_limit, get_post_data_to_kwargs, get_reverse, get_typed_list_headers, mdb, modelstorage_config
from fedn.network.api.v1.shared import api_version, get_limit, get_post_data_to_kwargs, get_reverse, get_typed_list_headers, mdb
from fedn.network.api.shared import modelstorage_config
from fedn.network.storage.s3.base import RepositoryBase
from fedn.network.storage.s3.miniorepository import MINIORepository
from fedn.network.storage.statestore.stores.model_store import ModelStore
Expand Down
84 changes: 84 additions & 0 deletions fedn/network/api/v1/session_routes.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import threading

from flask import Blueprint, jsonify, request

from fedn.network.api.auth import jwt_auth_required
from fedn.network.api.v1.shared import api_version, get_post_data_to_kwargs, get_typed_list_headers, mdb
from fedn.network.storage.statestore.stores.session_store import SessionStore
from fedn.network.storage.statestore.stores.shared import EntityNotFound
from .model_routes import model_store
from fedn.network.api.shared import control

bp = Blueprint("session", __name__, url_prefix=f"/api/{api_version}/sessions")

Expand Down Expand Up @@ -304,3 +308,83 @@ def get_session(id: str):
return jsonify({"message": str(e)}), 404
except Exception as e:
return jsonify({"message": str(e)}), 500


@bp.route("/", methods=["POST"])
@jwt_auth_required(role="admin")
def post():
"""Create session
Creates a new session based on the provided data.
---
tags:
- Sessions
parameters:
- name: session
in: body
required: true
schema:
type: object
properties:
session_id:
type: string
session_config:
type: object
responses:
201:
description: The created session
schema:
$ref: '#/definitions/Session'
500:
description: An error occurred
schema:
type: object
properties:
message:
type: string
"""
try:
data = request.json if request.headers["Content-Type"] == "application/json" else request.form.to_dict()
successful, result = session_store.add(data)
response = result
status_code: int = 201 if successful else 400

return jsonify(response), status_code
except Exception as e:
return jsonify({"message": str(e)}), 500


@bp.route("/start", methods=["POST"])
@jwt_auth_required(role="admin")
def start_session():
"""Start a new session.
param: session_id: The session id to start.
type: session_id: str
param: rounds: The number of rounds to run.
type: rounds: int
"""
try:
data = request.json if request.headers["Content-Type"] == "application/json" else request.form.to_dict()
session_id: str = data.get("session_id")
rounds: int = data.get("rounds", "")

if not session_id or session_id == "":
return jsonify({"message": "Session ID is required"}), 400

if not rounds or rounds == "":
return jsonify({"message": "Rounds is required"}), 400

if not isinstance(rounds, int):
return jsonify({"message": "Rounds must be an integer"}), 400

session = session_store.get(session_id, use_typing=False)

session_config = session["session_config"]
model_id = session_config["model_id"]

_ = model_store.get(model_id, use_typing=False)

threading.Thread(target=control.start_session, args=(session_id, rounds)).start()

return jsonify({"message": "Session started"}), 200
except Exception as e:
return jsonify({"message": str(e)}), 500
6 changes: 1 addition & 5 deletions fedn/network/api/v1/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,10 @@
import pymongo
from pymongo.database import Database

from fedn.common.config import get_modelstorage_config, get_network_config, get_statestore_config
from fedn.network.api.shared import statestore_config, network_id

api_version = "v1"

statestore_config = get_statestore_config()
modelstorage_config = get_modelstorage_config()
network_id = get_network_config()

mc = pymongo.MongoClient(**statestore_config["mongo_config"])
mc.server_info()
mdb: Database = mc[network_id]
Expand Down
54 changes: 54 additions & 0 deletions fedn/network/controller/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,60 @@ def __init__(self, statestore):
super().__init__(statestore)
self.name = "DefaultControl"

def start_session(self, session_id: str, rounds: int):
if self._state == ReducerState.instructing:
logger.info("Controller already in INSTRUCTING state. A session is in progress.")
return

if not self.statestore.get_latest_model():
logger.warning("No model in model chain, please provide a seed model!")
return

self._state = ReducerState.instructing

session = self.statestore.get_session(session_id)

if not session:
logger.error("Session not found.")
return

session_config = session["session_config"]

if not session_config or not isinstance(session_config, dict):
logger.error("Session not properly configured.")
return

self._state = ReducerState.monitoring

last_round = int(self.get_latest_round_id())

aggregator = session_config["aggregator"]

session_config["session_id"] = session_id

for combiner in self.network.get_combiners():
combiner.set_aggregator(aggregator)

self.set_session_status(session_id, "Started")

for round in range(1, rounds + 1):
if last_round:
current_round = last_round + round
else:
current_round = round

try:
_, round_data = self.round(session_config, str(current_round))
except TypeError as e:
logger.error("Failed to execute round: {0}".format(e))

logger.info("Round completed with status {}".format(round_data["status"]))

session_config["model_id"] = self.statestore.get_latest_model()

self.set_session_status(session_id, "Finished")
self._state = ReducerState.idle

def session(self, config):
"""Execute a new training session. A session consists of one
or several global rounds. All rounds in the same session
Expand Down
4 changes: 2 additions & 2 deletions fedn/network/storage/statestore/stores/client_store.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Dict, List
from typing import Any, Dict, List, Tuple

import pymongo
from pymongo.database import Database
Expand Down Expand Up @@ -49,7 +49,7 @@ def get(self, id: str, use_typing: bool = False) -> Client:
def update(self, id: str, item: Client) -> bool:
raise NotImplementedError("Update not implemented for ClientStore")

def add(self, item: Client) -> bool:
def add(self, item: Client)-> Tuple[bool, Any]:
raise NotImplementedError("Add not implemented for ClientStore")

def delete(self, id: str) -> bool:
Expand Down
4 changes: 2 additions & 2 deletions fedn/network/storage/statestore/stores/combiner_store.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, List
from typing import Any, Dict, List, Tuple

import pymongo
from bson import ObjectId
Expand Down Expand Up @@ -82,7 +82,7 @@ def get(self, id: str, use_typing: bool = False) -> Combiner:
def update(self, id: str, item: Combiner) -> bool:
raise NotImplementedError("Update not implemented for CombinerStore")

def add(self, item: Combiner) -> bool:
def add(self, item: Combiner)-> Tuple[bool, Any]:
raise NotImplementedError("Add not implemented for CombinerStore")

def delete(self, id: str) -> bool:
Expand Down
4 changes: 2 additions & 2 deletions fedn/network/storage/statestore/stores/model_store.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Dict, List
from typing import Any, Dict, List, Tuple

import pymongo
from bson import ObjectId
Expand Down Expand Up @@ -60,7 +60,7 @@ def get(self, id: str, use_typing: bool = False) -> Model:
def update(self, id: str, item: Model) -> bool:
raise NotImplementedError("Update not implemented for ModelStore")

def add(self, item: Model) -> bool:
def add(self, item: Model)-> Tuple[bool, Any]:
raise NotImplementedError("Add not implemented for ModelStore")

def delete(self, id: str) -> bool:
Expand Down
4 changes: 2 additions & 2 deletions fedn/network/storage/statestore/stores/package_store.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Dict, List
from typing import Any, Dict, List, Tuple

import pymongo
from pymongo.database import Database
Expand Down Expand Up @@ -94,7 +94,7 @@ def get_active(self, use_typing: bool = False) -> Package:
def update(self, id: str, item: Package) -> bool:
raise NotImplementedError("Update not implemented for PackageStore")

def add(self, item: Package) -> bool:
def add(self, item: Package)-> Tuple[bool, Any]:
raise NotImplementedError("Add not implemented for PackageStore")

def delete(self, id: str) -> bool:
Expand Down
4 changes: 2 additions & 2 deletions fedn/network/storage/statestore/stores/round_store.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, List
from typing import Any, Dict, List, Tuple

import pymongo
from pymongo.database import Database
Expand Down Expand Up @@ -44,7 +44,7 @@ def get(self, id: str, use_typing: bool = False) -> Round:
def update(self, id: str, item: Round) -> bool:
raise NotImplementedError("Update not implemented for RoundStore")

def add(self, item: Round) -> bool:
def add(self, item: Round)-> Tuple[bool, Any]:
raise NotImplementedError("Add not implemented for RoundStore")

def delete(self, id: str) -> bool:
Expand Down
Loading
Loading