Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/SK-721 | Add JWT verification (decoding) #543

Merged
merged 16 commits into from
Mar 19, 2024
74 changes: 74 additions & 0 deletions fedn/fedn/network/api/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import os
from functools import wraps

import jwt
from flask import jsonify, request

# Define your secret key for JWT
SECRET_KEY = os.environ.get('FEDN_JWT_SECRET_KEY', False)
FEDN_JWT_CUSTOM_CLAIM_KEY = os.environ.get('FEDN_JWT_CUSTOM_CLAIM_KEY', False)
FEDN_JWT_CUSTOM_CLAIM_VALUE = os.environ.get('FEDN_JWT_CUSTOM_CLAIM_VALUE', False)
FEDN_AUTH_SCHEME = os.environ.get('FEDN_AUTH_SCHEME', 'Bearer')
FEDN_AUTH_WHITELIST_URL_PREFIX = os.environ.get('FEDN_AUTH_WHITELIST_URL_PREFIX', False)
FEDN_JWT_ALGORITHM = os.environ.get('FEDN_JWT_ALGORITHM', 'HS256')


def check_role_claims(payload, role):
if 'role' not in payload:
return False
if payload['role'] != role:
return False

return True


def check_custom_claims(payload):
if FEDN_JWT_CUSTOM_CLAIM_KEY and FEDN_JWT_CUSTOM_CLAIM_VALUE:
if payload[FEDN_JWT_CUSTOM_CLAIM_KEY] != FEDN_JWT_CUSTOM_CLAIM_VALUE:
return False
return True


def if_whitelisted_url_prefix(path):
if FEDN_AUTH_WHITELIST_URL_PREFIX and path.startswith(FEDN_AUTH_WHITELIST_URL_PREFIX):
return True
else:
return False


def jwt_auth_required(role=None):
def actual_decorator(func):
if not SECRET_KEY:
return func

@wraps(func)
def decorated(*args, **kwargs):
if if_whitelisted_url_prefix(request.path):
return func(*args, **kwargs)
token = request.headers.get('Authorization')
if not token:
return jsonify({'message': 'Missing token'}), 401
# Get token from the header Bearer
if token.startswith(FEDN_AUTH_SCHEME):
token = token.split(' ')[1]
else:
return jsonify({'message':
f'Invalid token scheme, expected {FEDN_AUTH_SCHEME}'
}), 401
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[FEDN_JWT_ALGORITHM])
if not check_role_claims(payload, role):
return jsonify({'message': 'Invalid token'}), 401
if not check_custom_claims(payload):
return jsonify({'message': 'Invalid token'}), 401

except jwt.ExpiredSignatureError:
return jsonify({'message': 'Token expired'}), 401

except jwt.InvalidTokenError:
return jsonify({'message': 'Invalid token'}), 401

return func(*args, **kwargs)

return decorated
return actual_decorator
2 changes: 1 addition & 1 deletion fedn/fedn/network/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(self, host, port=None, secure=False, verify=False, token=None, auth
# Auth scheme passed as argument overrides environment variable.
# "Token" is the default auth scheme.
if not auth_scheme:
auth_scheme = os.environ.get("FEDN_AUTH_SCHEME", "Token")
auth_scheme = os.environ.get("FEDN_AUTH_SCHEME", "Bearer")
# Override potential env variable if token is passed as argument.
if not token:
token = os.environ.get("FEDN_AUTH_TOKEN", False)
Expand Down
Loading
Loading