Module security.auth

Dependencies and helper functions related to security

Expand source code
"""Dependencies and helper functions related to security"""
from datetime import datetime, timedelta
from fastapi import HTTPException, status, Depends
from fastapi.security import OAuth2PasswordBearer
from models.user import Admin, User, UserRole
from repositories.user import UserRepository
from jose import jwt, JWTError
from settings import AuthSettings
from functools import wraps

INVALID_CREDENTIAL_ERROR = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=f'Invalid username or password')
INVALID_TOKEN_ERROR = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, 
    detail=f'Invalid token, log in again', headers={"WWW-Authenticate": "Bearer"})
INSUFFICIENT_PERMISSIONS_ERROR = HTTPException(status_code=status.HTTP_403_FORBIDDEN, 
    detail=f'Your permissions are not sufficient')

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

async def auth_user(email: str, password: str)->User:
    """Checks if the given user exists and the password matches with the database entry

    Args:
        email (str): provided email
        password (str): provided password

    Raises:
        INVALID_CREDENTIAL_ERROR: when the user was not found or the password was wrong

    Returns:
        User: The user in the database
    """
    user = await UserRepository().get_user_by_email(email)
    if user is None:
        raise INVALID_CREDENTIAL_ERROR
    if not user.verify_password(password):
        raise INVALID_CREDENTIAL_ERROR
    return user

def create_access_token(data: dict, expires_delta: timedelta | None = None)->str:
    """Creates a jwt token string

    Args:
        data (dict): The data to encode in the jwt
        expires_delta (timedelta | None, optional): timedelta when the token should expire. Defaults to None.

    Returns:
        str: The encoded jwt token string
    """
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, AuthSettings().secret_key, algorithm=AuthSettings().jwt_algorithm)
    return encoded_jwt

class AuthRules:
    """Class holding all auth rule dependencies"""
    async def require_user(token: str = Depends(oauth2_scheme))->User:
        """Gets the user from the jwt token from the request

        Args:
            token (str, optional): The token. Defaults to Depends(oauth2_scheme).

        Raises:
            INVALID_TOKEN_ERROR: When the token is expired, invalid or the user from the token does not exist

        Returns:
            User: the owner of the token
        """
        try:
            payload = jwt.decode(token, AuthSettings().secret_key, algorithms=[AuthSettings().jwt_algorithm])
            email = payload.get('sub')
            if email is None:
                raise INVALID_TOKEN_ERROR
        except JWTError as e:
            raise INVALID_TOKEN_ERROR
        user = await UserRepository().get_user_by_email(email)
        if user is None:
            raise INVALID_TOKEN_ERROR
        return user

    async def is_admin(user: User = Depends(require_user))->bool:
        """checks if the current user has the admin role

        Args:
            user (User, optional): the user object to check. Defaults to Depends(require_user).

        Returns:
            bool: true if the user has the admin role
        """
        return user.role == UserRole.admin

    async def require_admin(user: User = Depends(require_user), is_admin: bool = Depends(is_admin))->Admin:
        """dependency returning the current user if it has the admin role. 

        Args:
            user (User, optional): the user object to check. Defaults to Depends(require_user).
            is_admin (bool, optional): a boolean indicating wheter the user is admin or not. Defaults to Depends(is_admin).

        Raises:
            INSUFFICIENT_PERMISSIONS_ERROR: when the user is not an admin

        Returns:
            Admin: the admin object
        """
        if not is_admin:
            raise INSUFFICIENT_PERMISSIONS_ERROR
        return user

Functions

async def auth_user(email: str, password: str) ‑> User

Checks if the given user exists and the password matches with the database entry

Args

email : str
provided email
password : str
provided password

Raises

INVALID_CREDENTIAL_ERROR
when the user was not found or the password was wrong

Returns

User
The user in the database
Expand source code
async def auth_user(email: str, password: str)->User:
    """Checks if the given user exists and the password matches with the database entry

    Args:
        email (str): provided email
        password (str): provided password

    Raises:
        INVALID_CREDENTIAL_ERROR: when the user was not found or the password was wrong

    Returns:
        User: The user in the database
    """
    user = await UserRepository().get_user_by_email(email)
    if user is None:
        raise INVALID_CREDENTIAL_ERROR
    if not user.verify_password(password):
        raise INVALID_CREDENTIAL_ERROR
    return user
def create_access_token(data: dict, expires_delta: datetime.timedelta | None = None) ‑> str

Creates a jwt token string

Args

data : dict
The data to encode in the jwt

expires_delta (timedelta | None, optional): timedelta when the token should expire. Defaults to None.

Returns

str
The encoded jwt token string
Expand source code
def create_access_token(data: dict, expires_delta: timedelta | None = None)->str:
    """Creates a jwt token string

    Args:
        data (dict): The data to encode in the jwt
        expires_delta (timedelta | None, optional): timedelta when the token should expire. Defaults to None.

    Returns:
        str: The encoded jwt token string
    """
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, AuthSettings().secret_key, algorithm=AuthSettings().jwt_algorithm)
    return encoded_jwt

Classes

class AuthRules

Class holding all auth rule dependencies

Expand source code
class AuthRules:
    """Class holding all auth rule dependencies"""
    async def require_user(token: str = Depends(oauth2_scheme))->User:
        """Gets the user from the jwt token from the request

        Args:
            token (str, optional): The token. Defaults to Depends(oauth2_scheme).

        Raises:
            INVALID_TOKEN_ERROR: When the token is expired, invalid or the user from the token does not exist

        Returns:
            User: the owner of the token
        """
        try:
            payload = jwt.decode(token, AuthSettings().secret_key, algorithms=[AuthSettings().jwt_algorithm])
            email = payload.get('sub')
            if email is None:
                raise INVALID_TOKEN_ERROR
        except JWTError as e:
            raise INVALID_TOKEN_ERROR
        user = await UserRepository().get_user_by_email(email)
        if user is None:
            raise INVALID_TOKEN_ERROR
        return user

    async def is_admin(user: User = Depends(require_user))->bool:
        """checks if the current user has the admin role

        Args:
            user (User, optional): the user object to check. Defaults to Depends(require_user).

        Returns:
            bool: true if the user has the admin role
        """
        return user.role == UserRole.admin

    async def require_admin(user: User = Depends(require_user), is_admin: bool = Depends(is_admin))->Admin:
        """dependency returning the current user if it has the admin role. 

        Args:
            user (User, optional): the user object to check. Defaults to Depends(require_user).
            is_admin (bool, optional): a boolean indicating wheter the user is admin or not. Defaults to Depends(is_admin).

        Raises:
            INSUFFICIENT_PERMISSIONS_ERROR: when the user is not an admin

        Returns:
            Admin: the admin object
        """
        if not is_admin:
            raise INSUFFICIENT_PERMISSIONS_ERROR
        return user

Methods

async def is_admin(user: User = Depends(require_user)) ‑> bool

checks if the current user has the admin role

Args

user : User, optional
the user object to check. Defaults to Depends(require_user).

Returns

bool
true if the user has the admin role
Expand source code
async def is_admin(user: User = Depends(require_user))->bool:
    """checks if the current user has the admin role

    Args:
        user (User, optional): the user object to check. Defaults to Depends(require_user).

    Returns:
        bool: true if the user has the admin role
    """
    return user.role == UserRole.admin
async def require_admin(user: User = Depends(require_user), is_admin: bool = Depends(is_admin)) ‑> Admin

dependency returning the current user if it has the admin role.

Args

user : User, optional
the user object to check. Defaults to Depends(require_user).
is_admin : bool, optional
a boolean indicating wheter the user is admin or not. Defaults to Depends(is_admin).

Raises

INSUFFICIENT_PERMISSIONS_ERROR
when the user is not an admin

Returns

Admin
the admin object
Expand source code
async def require_admin(user: User = Depends(require_user), is_admin: bool = Depends(is_admin))->Admin:
    """dependency returning the current user if it has the admin role. 

    Args:
        user (User, optional): the user object to check. Defaults to Depends(require_user).
        is_admin (bool, optional): a boolean indicating wheter the user is admin or not. Defaults to Depends(is_admin).

    Raises:
        INSUFFICIENT_PERMISSIONS_ERROR: when the user is not an admin

    Returns:
        Admin: the admin object
    """
    if not is_admin:
        raise INSUFFICIENT_PERMISSIONS_ERROR
    return user
async def require_user(token: str = Depends(OAuth2PasswordBearer)) ‑> User

Gets the user from the jwt token from the request

Args

token : str, optional
The token. Defaults to Depends(oauth2_scheme).

Raises

INVALID_TOKEN_ERROR
When the token is expired, invalid or the user from the token does not exist

Returns

User
the owner of the token
Expand source code
async def require_user(token: str = Depends(oauth2_scheme))->User:
    """Gets the user from the jwt token from the request

    Args:
        token (str, optional): The token. Defaults to Depends(oauth2_scheme).

    Raises:
        INVALID_TOKEN_ERROR: When the token is expired, invalid or the user from the token does not exist

    Returns:
        User: the owner of the token
    """
    try:
        payload = jwt.decode(token, AuthSettings().secret_key, algorithms=[AuthSettings().jwt_algorithm])
        email = payload.get('sub')
        if email is None:
            raise INVALID_TOKEN_ERROR
    except JWTError as e:
        raise INVALID_TOKEN_ERROR
    user = await UserRepository().get_user_by_email(email)
    if user is None:
        raise INVALID_TOKEN_ERROR
    return user