Module routers.user
Endpoints related to user management
Expand source code
"""Endpoints related to user management"""
from fastapi import APIRouter, HTTPException, Depends, status
from typing import List
from models.user import User, Admin, UserCreate, UserRead, UserRole, UserUpdate
from repositories.user import UserRepository
from security.auth import INSUFFICIENT_PERMISSIONS_ERROR, AuthRules
router = APIRouter()
async def get_user_by_email(email: str)->User:
    """Look up a user by email address, failing with a 404 when absent.

    Args:
        email (str): the email address to look up

    Raises:
        HTTPException: with status 404 when the repository returns None

    Returns:
        User: the user returned by the repository
    """
    repository = UserRepository()
    found = await repository.get_user_by_email(email)
    if found is not None:
        return found
    # No match: surface the miss to the client as a 404.
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f'The user with email {email} does not exist',
    )
async def get_edit_user(
    is_admin: bool = Depends(AuthRules.is_admin),
    current_user: User = Depends(AuthRules.require_user),
    user: User = Depends(get_user_by_email),
):
    """Dependency that checks whether the current user may edit the target user.

    Args:
        is_admin (bool, optional): whether the current user is an admin. Defaults to Depends(AuthRules.is_admin).
        current_user (User, optional): the current user. Defaults to Depends(AuthRules.require_user).
        user (User, optional): the user to edit. Defaults to Depends(get_user_by_email).

    Raises:
        INSUFFICIENT_PERMISSIONS_ERROR: when the current user is neither an admin nor the user to edit

    Returns:
        User: the user to edit
    """
    # Admins may edit anyone; everyone else only the account with their own email.
    if is_admin or user.email == current_user.email:
        return user
    raise INSUFFICIENT_PERMISSIONS_ERROR
@router.post('', response_model=UserRead)
async def create_user(user: UserCreate)->User:
    """Create a user from the submitted payload via UserRepository.create_user."""
    repository = UserRepository()
    created = await repository.create_user(user)
    return created
@router.get('', response_model=List[UserRead])
async def list_users():
    """Return the users provided by UserRepository.list_users."""
    repository = UserRepository()
    users = await repository.list_users()
    return users
@router.patch('/{email}', response_model=UserRead)
async def patch_user(user_update: UserUpdate, user: User = Depends(get_edit_user)):
    """Apply user_update to the user resolved by the get_edit_user dependency."""
    repository = UserRepository()
    updated = await repository.update_user(user, user_update)
    return updated
# NOTE(review): response_model is User here while the other user routes use
# UserRead -- confirm that exposing the full User model is intentional.
@router.patch(
    '/{email}/role',
    response_model=User,
    dependencies=[Depends(AuthRules.require_admin)],
)
async def patch_role(new_role: UserRole, user: User = Depends(get_edit_user)):
    """Set new_role on the user resolved by the get_edit_user dependency."""
    repository = UserRepository()
    updated = await repository.update_role(user, new_role)
    return updated
@router.post(
    '/admins',
    response_model=Admin,
    dependencies=[Depends(AuthRules.require_admin)],
)
async def create_admin(user: UserCreate):
    """Create an admin from the submitted payload via UserRepository.create_admin."""
    repository = UserRepository()
    created = await repository.create_admin(user)
    return created
@router.get('/admins', response_model=List[UserRead])
async def list_admins():
    """Return the admins provided by UserRepository.list_admins."""
    repository = UserRepository()
    admins = await repository.list_admins()
    return admins
Functions
async def create_admin(user: UserCreate)
-
Expand source code
@router.post(
    '/admins',
    response_model=Admin,
    dependencies=[Depends(AuthRules.require_admin)],
)
async def create_admin(user: UserCreate):
    """Create an admin from the submitted payload via UserRepository.create_admin."""
    repository = UserRepository()
    created = await repository.create_admin(user)
    return created
async def create_user(user: UserCreate) ‑> User
-
Expand source code
@router.post('', response_model=UserRead)
async def create_user(user: UserCreate)->User:
    """Create a user from the submitted payload via UserRepository.create_user."""
    repository = UserRepository()
    created = await repository.create_user(user)
    return created
async def get_edit_user(is_admin: bool = Depends(is_admin), current_user: User = Depends(require_user), user: User = Depends(get_user_by_email))
-
Dependency to check whether the current user can edit the target user
Args
is_admin
:bool
, optional- boolean indicating if the current user is an admin. Defaults to Depends(AuthRules.is_admin).
current_user
:User
, optional- the current user. Defaults to Depends(AuthRules.require_user).
user
:User
, optional- the user to edit. Defaults to Depends(get_user_by_email).
Raises
INSUFFICIENT_PERMISSIONS_ERROR
- when the current user is neither an admin nor the user to edit
Returns
User
- the user to edit
Expand source code
async def get_edit_user(
    is_admin: bool = Depends(AuthRules.is_admin),
    current_user: User = Depends(AuthRules.require_user),
    user: User = Depends(get_user_by_email),
):
    """Dependency that checks whether the current user may edit the target user.

    Args:
        is_admin (bool, optional): whether the current user is an admin. Defaults to Depends(AuthRules.is_admin).
        current_user (User, optional): the current user. Defaults to Depends(AuthRules.require_user).
        user (User, optional): the user to edit. Defaults to Depends(get_user_by_email).

    Raises:
        INSUFFICIENT_PERMISSIONS_ERROR: when the current user is neither an admin nor the user to edit

    Returns:
        User: the user to edit
    """
    # Admins may edit anyone; everyone else only the account with their own email.
    if is_admin or user.email == current_user.email:
        return user
    raise INSUFFICIENT_PERMISSIONS_ERROR
async def get_user_by_email(email: str) ‑> User
-
Gets the user from the database
Args
email
:str
- the email of the user
Raises
HTTPException
- with status 404 if not found
Returns
User
- the user in the database
Expand source code
async def get_user_by_email(email: str)->User:
    """Look up a user by email address, failing with a 404 when absent.

    Args:
        email (str): the email address to look up

    Raises:
        HTTPException: with status 404 when the repository returns None

    Returns:
        User: the user returned by the repository
    """
    repository = UserRepository()
    found = await repository.get_user_by_email(email)
    if found is not None:
        return found
    # No match: surface the miss to the client as a 404.
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f'The user with email {email} does not exist',
    )
async def list_admins()
-
Expand source code
@router.get('/admins', response_model=List[UserRead])
async def list_admins():
    """Return the admins provided by UserRepository.list_admins."""
    repository = UserRepository()
    admins = await repository.list_admins()
    return admins
async def list_users()
-
Expand source code
@router.get('', response_model=List[UserRead])
async def list_users():
    """Return the users provided by UserRepository.list_users."""
    repository = UserRepository()
    users = await repository.list_users()
    return users
async def patch_role(new_role: UserRole, user: User = Depends(get_edit_user))
-
Expand source code
# NOTE(review): response_model is User here while the other user routes use
# UserRead -- confirm that exposing the full User model is intentional.
@router.patch(
    '/{email}/role',
    response_model=User,
    dependencies=[Depends(AuthRules.require_admin)],
)
async def patch_role(new_role: UserRole, user: User = Depends(get_edit_user)):
    """Set new_role on the user resolved by the get_edit_user dependency."""
    repository = UserRepository()
    updated = await repository.update_role(user, new_role)
    return updated
async def patch_user(user_update: UserUpdate, user: User = Depends(get_edit_user))
-
Expand source code
@router.patch('/{email}', response_model=UserRead)
async def patch_user(user_update: UserUpdate, user: User = Depends(get_edit_user)):
    """Apply user_update to the user resolved by the get_edit_user dependency."""
    repository = UserRepository()
    updated = await repository.update_user(user, user_update)
    return updated