Files
full-stack-fastapi-template/backend/app/api/routes/users.py

220 lines
6.4 KiB
Python
Raw Normal View History

from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from sqlmodel import col, delete, func, select
from app import crud
from app.api.deps import (
CurrentUser,
SessionDep,
get_current_active_superuser,
)
from app.core.config import settings
from app.core.security import get_password_hash, verify_password
from app.models import (
Item,
Message,
UpdatePassword,
User,
UserCreate,
UserCreateOpen,
UserOut,
UsersOut,
UserUpdate,
UserUpdateMe,
)
from app.utils import generate_new_account_email, send_email
router = APIRouter()
@router.get(
"/", dependencies=[Depends(get_current_active_superuser)], response_model=UsersOut
)
def read_users(session: SessionDep, skip: int = 0, limit: int = 100) -> Any:
"""
Retrieve users.
"""
count_statement = select(func.count()).select_from(User)
count = session.exec(count_statement).one()
statement = select(User).offset(skip).limit(limit)
users = session.exec(statement).all()
return UsersOut(data=users, count=count)
@router.post(
"/", dependencies=[Depends(get_current_active_superuser)], response_model=UserOut
)
def create_user(*, session: SessionDep, user_in: UserCreate) -> Any:
"""
Create new user.
"""
user = crud.get_user_by_email(session=session, email=user_in.email)
if user:
raise HTTPException(
status_code=400,
detail="The user with this email already exists in the system.",
)
user = crud.create_user(session=session, user_create=user_in)
if settings.emails_enabled and user_in.email:
email_data = generate_new_account_email(
email_to=user_in.email, username=user_in.email, password=user_in.password
)
send_email(
email_to=user_in.email,
subject=email_data.subject,
html_content=email_data.html_content,
)
return user
@router.patch("/me", response_model=UserOut)
def update_user_me(
*, session: SessionDep, user_in: UserUpdateMe, current_user: CurrentUser
) -> Any:
"""
Update own user.
"""
2024-03-12 17:29:49 +01:00
if user_in.email:
existing_user = crud.get_user_by_email(session=session, email=user_in.email)
if existing_user and existing_user.id != current_user.id:
2024-03-12 17:29:49 +01:00
raise HTTPException(
status_code=409, detail="User with this email already exists"
)
user_data = user_in.model_dump(exclude_unset=True)
current_user.sqlmodel_update(user_data)
session.add(current_user)
session.commit()
session.refresh(current_user)
return current_user
@router.patch("/me/password", response_model=Message)
def update_password_me(
*, session: SessionDep, body: UpdatePassword, current_user: CurrentUser
) -> Any:
"""
Update own password.
"""
if not verify_password(body.current_password, current_user.hashed_password):
raise HTTPException(status_code=400, detail="Incorrect password")
if body.current_password == body.new_password:
raise HTTPException(
status_code=400, detail="New password cannot be the same as the current one"
)
hashed_password = get_password_hash(body.new_password)
current_user.hashed_password = hashed_password
session.add(current_user)
session.commit()
return Message(message="Password updated successfully")
@router.get("/me", response_model=UserOut)
def read_user_me(session: SessionDep, current_user: CurrentUser) -> Any:
"""
Get current user.
"""
return current_user
@router.post("/open", response_model=UserOut)
def create_user_open(session: SessionDep, user_in: UserCreateOpen) -> Any:
"""
Create new user without the need to be logged in.
"""
if not settings.USERS_OPEN_REGISTRATION:
raise HTTPException(
status_code=403,
detail="Open user registration is forbidden on this server",
)
user = crud.get_user_by_email(session=session, email=user_in.email)
if user:
raise HTTPException(
status_code=400,
detail="The user with this email already exists in the system",
)
user_create = UserCreate.from_orm(user_in)
user = crud.create_user(session=session, user_create=user_create)
return user
@router.get("/{user_id}", response_model=UserOut)
def read_user_by_id(
user_id: int, session: SessionDep, current_user: CurrentUser
) -> Any:
"""
Get a specific user by id.
"""
user = session.get(User, user_id)
if user == current_user:
return user
if not current_user.is_superuser:
raise HTTPException(
status_code=403,
detail="The user doesn't have enough privileges",
)
return user
@router.patch(
"/{user_id}",
dependencies=[Depends(get_current_active_superuser)],
response_model=UserOut,
)
def update_user(
*,
session: SessionDep,
user_id: int,
user_in: UserUpdate,
) -> Any:
"""
Update a user.
"""
2024-03-12 17:29:49 +01:00
db_user = session.get(User, user_id)
if not db_user:
raise HTTPException(
status_code=404,
detail="The user with this id does not exist in the system",
)
2024-03-12 17:29:49 +01:00
if user_in.email:
existing_user = crud.get_user_by_email(session=session, email=user_in.email)
if existing_user and existing_user.id != user_id:
2024-03-12 17:29:49 +01:00
raise HTTPException(
status_code=409, detail="User with this email already exists"
)
db_user = crud.update_user(session=session, db_user=db_user, user_in=user_in)
return db_user
@router.delete("/{user_id}")
def delete_user(
session: SessionDep, current_user: CurrentUser, user_id: int
) -> Message:
"""
Delete a user.
"""
user = session.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
elif user != current_user and not current_user.is_superuser:
raise HTTPException(
status_code=403, detail="The user doesn't have enough privileges"
)
elif user == current_user and current_user.is_superuser:
raise HTTPException(
status_code=403, detail="Super users are not allowed to delete themselves"
)
statement = delete(Item).where(col(Item.owner_id) == user_id)
session.exec(statement) # type: ignore
session.delete(user)
session.commit()
return Message(message="User deleted successfully")