From d03b3c0ff955dd21da6dafdaab788dc374909f80 Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 13 Feb 2025 10:25:27 +0100 Subject: [PATCH] Feature/SK-1405 | Add analytics routes and store (#811) --- fedn/network/api/shared.py | 2 + fedn/network/api/v1/__init__.py | 3 +- fedn/network/api/v1/analytic_routes.py | 39 +++++++++++ fedn/network/storage/dbconnection.py | 20 +++++- .../statestore/stores/analytic_store.py | 64 +++++++++++++++++++ 5 files changed, 126 insertions(+), 2 deletions(-) create mode 100644 fedn/network/api/v1/analytic_routes.py create mode 100644 fedn/network/storage/statestore/stores/analytic_store.py diff --git a/fedn/network/api/shared.py b/fedn/network/api/shared.py index b1969bd6f..6ec3bba48 100644 --- a/fedn/network/api/shared.py +++ b/fedn/network/api/shared.py @@ -8,6 +8,7 @@ from fedn.network.storage.s3.base import RepositoryBase from fedn.network.storage.s3.miniorepository import MINIORepository from fedn.network.storage.s3.repository import Repository +from fedn.network.storage.statestore.stores.analytic_store import AnalyticStore from fedn.network.storage.statestore.stores.client_store import ClientStore from fedn.network.storage.statestore.stores.combiner_store import CombinerStore from fedn.network.storage.statestore.stores.model_store import ModelStore @@ -34,6 +35,7 @@ status_store: StatusStore = stores.status_store validation_store: ValidationStore = stores.validation_store prediction_store: PredictionStore = stores.prediction_store +analytic_store: AnalyticStore = stores.analytic_store repository = Repository(modelstorage_config["storage_config"]) diff --git a/fedn/network/api/v1/__init__.py b/fedn/network/api/v1/__init__.py index 83af60aad..0a7cedc01 100644 --- a/fedn/network/api/v1/__init__.py +++ b/fedn/network/api/v1/__init__.py @@ -1,3 +1,4 @@ +from fedn.network.api.v1.analytic_routes import bp as analytic_bp from fedn.network.api.v1.client_routes import bp as client_bp from fedn.network.api.v1.combiner_routes import bp as combiner_bp from fedn.network.api.v1.helper_routes import bp as helper_bp @@ -9,4 +10,4 @@ from fedn.network.api.v1.status_routes import bp as status_bp from fedn.network.api.v1.validation_routes import bp as validation_bp -_routes = [client_bp, combiner_bp, model_bp, package_bp, round_bp, session_bp, status_bp, validation_bp, prediction_bp, helper_bp] +_routes = [client_bp, combiner_bp, model_bp, package_bp, round_bp, session_bp, status_bp, validation_bp, prediction_bp, helper_bp, analytic_bp] diff --git a/fedn/network/api/v1/analytic_routes.py b/fedn/network/api/v1/analytic_routes.py new file mode 100644 index 000000000..212d912c3 --- /dev/null +++ b/fedn/network/api/v1/analytic_routes.py @@ -0,0 +1,39 @@ +from flask import Blueprint, jsonify, request + +from fedn.common.log_config import logger +from fedn.network.api.auth import jwt_auth_required +from fedn.network.api.shared import analytic_store +from fedn.network.api.v1.shared import api_version, get_typed_list_headers + +bp = Blueprint("analytic", __name__, url_prefix=f"/api/{api_version}/analytics") + + +@bp.route("/", methods=["GET"]) +@jwt_auth_required(role="admin") +def get_analytics(): + try: + limit, skip, sort_key, sort_order = get_typed_list_headers(request.headers) + kwargs = request.args.to_dict() + + response = analytic_store.list(limit, skip, sort_key, sort_order, **kwargs) + + return jsonify(response), 200 + except Exception as e: + logger.error(f"An unexpected error occurred: {e}") + return jsonify({"message": "An unexpected error occurred"}), 500 + + +@bp.route("/", methods=["POST"]) +@jwt_auth_required(role="admin") +def add_analytics(): + try: + data = request.json if request.headers["Content-Type"] == "application/json" else request.form.to_dict() + + successful, result = analytic_store.add(data) + response = result + status_code: int = 201 if successful else 400 + + return jsonify(response), status_code + except Exception as e: + logger.error(f"An unexpected error occurred: {e}") + return jsonify({"message": "An unexpected error occurred"}), 500 diff --git a/fedn/network/storage/dbconnection.py b/fedn/network/storage/dbconnection.py index 52e56abd1..f580e7714 100644 --- a/fedn/network/storage/dbconnection.py +++ b/fedn/network/storage/dbconnection.py @@ -11,6 +11,7 @@ from sqlalchemy.orm import sessionmaker from fedn.common.config import get_network_config, get_statestore_config +from fedn.network.storage.statestore.stores.analytic_store import AnalyticStore, MongoDBAnalyticStore from fedn.network.storage.statestore.stores.client_store import ClientStore, MongoDBClientStore, SQLClientStore from fedn.network.storage.statestore.stores.combiner_store import CombinerStore, MongoDBCombinerStore, SQLCombinerStore from fedn.network.storage.statestore.stores.model_store import ModelStore, MongoDBModelStore, SQLModelStore @@ -37,6 +38,7 @@ def __init__( # noqa: PLR0913 package_store: PackageStore, model_store: ModelStore, session_store: SessionStore, + analytic_store: AnalyticStore, ) -> None: """Initialize the StoreContainer with various store instances.""" self.client_store = client_store @@ -48,6 +50,7 @@ def __init__( # noqa: PLR0913 self.package_store = package_store self.model_store = model_store self.session_store = session_store + self.analytic_store = analytic_store class DatabaseConnection: @@ -88,6 +91,7 @@ def _init_connection(self) -> None: package_store = MongoDBPackageStore(mdb, "control.packages") model_store = MongoDBModelStore(mdb, "control.models") session_store = MongoDBSessionStore(mdb, "control.sessions") + analytic_store = MongoDBAnalyticStore(mdb, "control.analytics") elif statestore_config["type"] in ["SQLite", "PostgreSQL"]: Session = self._setup_sql(statestore_config) # noqa: N806 @@ -101,11 +105,21 @@ def _init_connection(self) -> None: package_store = SQLPackageStore(Session) model_store = SQLModelStore(Session) session_store = SQLSessionStore(Session) + analytic_store = None else: raise ValueError("Unknown statestore type") self.sc = StoreContainer( - client_store, validation_store, combiner_store, status_store, prediction_store, round_store, package_store, model_store, session_store + client_store, + validation_store, + combiner_store, + status_store, + prediction_store, + round_store, + package_store, + model_store, + session_store, + analytic_store, ) def close(self) -> None: @@ -178,3 +192,7 @@ def model_store(self) -> ModelStore: @property def session_store(self) -> SessionStore: return self.sc.session_store + + @property + def analytic_store(self) -> AnalyticStore: + return self.sc.analytic_store diff --git a/fedn/network/storage/statestore/stores/analytic_store.py b/fedn/network/storage/statestore/stores/analytic_store.py new file mode 100644 index 000000000..4b11e8e92 --- /dev/null +++ b/fedn/network/storage/statestore/stores/analytic_store.py @@ -0,0 +1,64 @@ +from datetime import datetime +from typing import Any, Dict, List, Tuple + +import pymongo +from pymongo.database import Database + +from fedn.network.storage.statestore.stores.store import MongoDBStore, Store + + +class Analytic: + def __init__(self, id: str, client_id: str, type: str, execution_duration: int, model_id: str, committed_at: datetime): + self.id = id + self.client_id = client_id + self.type = type + self.execution_duration = execution_duration + self.model_id = model_id + self.committed_at = committed_at + + +class AnalyticStore(Store[Analytic]): + pass + + +def _validate_analytic(analytic: dict) -> Tuple[bool, str]: + if "client_id" not in analytic: + return False, "client_id is required" + if "type" not in analytic or analytic["type"] not in ["training", "inference"]: + return False, "type must be either 'training' or 'inference'" + return analytic, "" + + +def _complete_analytic(analytic: dict) -> dict: + if "committed_at" not in analytic: + analytic["committed_at"] = datetime.now() + + +class MongoDBAnalyticStore(AnalyticStore, MongoDBStore[Analytic]): + def __init__(self, database: Database, collection: str): + super().__init__(database, collection) + self.database[self.collection].create_index([("client_id", pymongo.DESCENDING)]) + + def get(self, id: str) -> Analytic: + return super().get(id) + + def update(self, id: str, item: Analytic) -> Tuple[bool, Any]: + pass + + def add(self, item: Analytic) -> Tuple[bool, Any]: + valid, msg = _validate_analytic(item) + if not valid: + return False, msg + + _complete_analytic(item) + + return super().add(item) + + def delete(self, id: str) -> bool: + pass + + def list(self, limit: int, skip: int, sort_key: str, sort_order=pymongo.DESCENDING, **kwargs) -> Dict[int, List[Analytic]]: + return super().list(limit, skip, sort_key or "committed_at", sort_order, **kwargs) + + def count(self, **kwargs) -> int: + return super().count(**kwargs)