forked from Image-X-Institute/The-Real-Time-Imaging-Database
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAuthManagement.py
68 lines (54 loc) · 2.32 KB
/
AuthManagement.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import functools
import random
import secrets
import string
from datetime import datetime, timedelta, timezone
from typing import NamedTuple, List, Optional, Tuple

from flask import request, redirect

from DatabaseAdapter import DatabaseAdapter
class SessionInfo(NamedTuple):
    """Immutable record describing one login session held by the auth manager."""
    # Session identifier; matched against the value of the "session" cookie.
    id: str
    # Point in time after which the session is discarded as expired.
    expires: datetime
    # Peer address supplied when the session was created (empty string if none).
    peer_addr: str
    # Identifier of the user the session was created for.
    userId: str
def autheticated_access(func):
    """Protect a route so that it only runs for requests with a live session.

    The wrapped view is invoked unchanged when the request carries a
    "session" cookie that the module-level auth manager accepts; every other
    request receives a redirect to '/auth'.
    """
    @functools.wraps(func)
    def guarded_view(*args, **kwargs):
        token = request.cookies.get("session")
        # Guard clause: a missing/empty cookie short-circuits before the
        # session lookup, exactly like the combined `and` test would.
        if not token or not authManagerInstance.checkSessionValidity(token):
            return redirect('/auth')
        return func(*args, **kwargs)
    return guarded_view
class _AuthManager:
    """In-memory registry of login sessions.

    Sessions live in a plain list on the instance.  Every timestamp used for
    expiry is a naive UTC ``datetime`` obtained from ``_utcNow`` so that the
    moment a session is stamped and the moment it is checked share one clock.
    """

    # Alphabet from which session ids are drawn.
    _SESSION_ID_CHARS = string.ascii_uppercase + string.digits
    # How long a freshly created session remains valid.
    _SESSION_LIFETIME = timedelta(hours=1)

    def __init__(self) -> None:
        self.currentSessions: List[SessionInfo] = []

    @staticmethod
    def _utcNow() -> datetime:
        """Return the current UTC time as a naive ``datetime``."""
        # Naive (tzinfo-free) on purpose: SessionInfo.expires has always been
        # a naive datetime, and naive/aware values cannot be compared.
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def addNewSession(self, userId: str, peer_addr: str = "",
                      size: int = 12) -> str:
        """Create a session for ``userId`` and return its identifier.

        Args:
            userId: identifier of the user the session belongs to.
            peer_addr: peer address to record with the session.
            size: number of characters in the generated session id.

        Returns:
            The newly generated session id.

        Raises:
            ValueError: if ``size`` is smaller than 1, which would otherwise
                yield an empty session id.
        """
        if size < 1:
            raise ValueError("size must be at least 1")
        # The id acts as a bearer credential, so draw it from the OS CSPRNG
        # (`secrets`) rather than the predictable `random` generator.
        newSessionId = ''.join(
            secrets.choice(self._SESSION_ID_CHARS) for _ in range(size))
        sessionInfo = SessionInfo(
            id=newSessionId,
            expires=self._utcNow() + self._SESSION_LIFETIME,
            peer_addr=peer_addr,
            userId=userId)
        self.currentSessions.append(sessionInfo)
        return newSessionId

    def getValidSessions(self) -> List[SessionInfo]:
        """Drop expired sessions and return the remaining session list."""
        self.removeExpiredSessions()
        return self.currentSessions

    def checkSessionValidity(self, sessionId: str, peer_addr: str = '') -> bool:
        """Return True when ``sessionId`` names a session that has not expired.

        NOTE(review): ``peer_addr`` is accepted for interface compatibility
        but is not compared against the address stored with the session.
        """
        return any(session.id == sessionId
                   for session in self.getValidSessions())

    def getUserIdForSession(self, sessionId: str) -> Optional[str]:
        """Return the user id bound to ``sessionId``.

        Only sessions that have not expired are considered; ``None`` is
        returned when no such session exists.
        """
        # Go through getValidSessions() so an expired session can no longer
        # be resolved to a user.
        for session in self.getValidSessions():
            if session.id == sessionId:
                return session.userId
        return None

    def removeExpiredSessions(self) -> None:
        """Discard every session whose expiry time has been reached."""
        now = self._utcNow()
        self.currentSessions = [s for s in self.currentSessions
                                if s.expires > now]

    def validateAuthRequest(self, email: str, password: str) -> Tuple[bool, str]:
        """Check the credentials by delegating to ``DatabaseAdapter``.

        A new ``DatabaseAdapter`` is created for each call and the result of
        its ``authenticateUser(email, password)`` is returned unchanged.
        """
        dbAdapter = DatabaseAdapter()
        return dbAdapter.authenticateUser(email, password)
# Module-level auth manager instance, created at import time; the
# autheticated_access decorator uses it to validate session cookies.
authManagerInstance = _AuthManager()