Files
full-stack-fastapi-template/backend/app/tests/api/routes/test_users.py

511 lines
17 KiB
Python
Raw Normal View History

from unittest.mock import patch
from fastapi.testclient import TestClient
from sqlmodel import Session, select
from app import crud
from app.core.config import settings
from app.core.security import verify_password
from app.models import User, UserCreate
from app.tests.utils.utils import random_email, random_lower_string
def test_get_users_superuser_me(
client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
r = client.get(f"{settings.API_V1_STR}/users/me", headers=superuser_token_headers)
current_user = r.json()
assert current_user
assert current_user["is_active"] is True
assert current_user["is_superuser"]
assert current_user["email"] == settings.FIRST_SUPERUSER
def test_get_users_normal_user_me(
client: TestClient, normal_user_token_headers: dict[str, str]
) -> None:
r = client.get(f"{settings.API_V1_STR}/users/me", headers=normal_user_token_headers)
current_user = r.json()
assert current_user
assert current_user["is_active"] is True
assert current_user["is_superuser"] is False
assert current_user["email"] == settings.EMAIL_TEST_USER
def test_create_user_new_email(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
with (
patch("app.utils.send_email", return_value=None),
patch("app.core.config.settings.SMTP_HOST", "smtp.example.com"),
patch("app.core.config.settings.SMTP_USER", "admin@example.com"),
):
username = random_email()
password = random_lower_string()
data = {"email": username, "password": password}
r = client.post(
f"{settings.API_V1_STR}/users/",
headers=superuser_token_headers,
json=data,
)
assert 200 <= r.status_code < 300
created_user = r.json()
user = crud.get_user_by_email(session=db, email=username)
assert user
assert user.email == created_user["email"]
def test_get_existing_user(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
username = random_email()
password = random_lower_string()
user_in = UserCreate(email=username, password=password)
user = crud.create_user(session=db, user_create=user_in)
user_id = user.id
r = client.get(
f"{settings.API_V1_STR}/users/{user_id}",
headers=superuser_token_headers,
)
assert 200 <= r.status_code < 300
api_user = r.json()
existing_user = crud.get_user_by_email(session=db, email=username)
assert existing_user
assert existing_user.email == api_user["email"]
def test_get_existing_user_current_user(client: TestClient, db: Session) -> None:
username = random_email()
password = random_lower_string()
user_in = UserCreate(email=username, password=password)
user = crud.create_user(session=db, user_create=user_in)
user_id = user.id
login_data = {
"username": username,
"password": password,
}
r = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data)
tokens = r.json()
a_token = tokens["access_token"]
headers = {"Authorization": f"Bearer {a_token}"}
r = client.get(
f"{settings.API_V1_STR}/users/{user_id}",
headers=headers,
)
assert 200 <= r.status_code < 300
api_user = r.json()
existing_user = crud.get_user_by_email(session=db, email=username)
assert existing_user
assert existing_user.email == api_user["email"]
def test_get_existing_user_permissions_error(
client: TestClient, normal_user_token_headers: dict[str, str]
) -> None:
r = client.get(
f"{settings.API_V1_STR}/users/999999",
headers=normal_user_token_headers,
)
assert r.status_code == 403
assert r.json() == {"detail": "The user doesn't have enough privileges"}
def test_create_user_existing_username(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
username = random_email()
# username = email
password = random_lower_string()
user_in = UserCreate(email=username, password=password)
crud.create_user(session=db, user_create=user_in)
data = {"email": username, "password": password}
r = client.post(
f"{settings.API_V1_STR}/users/",
headers=superuser_token_headers,
json=data,
)
created_user = r.json()
assert r.status_code == 400
assert "_id" not in created_user
def test_create_user_by_normal_user(
client: TestClient, normal_user_token_headers: dict[str, str]
) -> None:
username = random_email()
password = random_lower_string()
data = {"email": username, "password": password}
r = client.post(
f"{settings.API_V1_STR}/users/",
headers=normal_user_token_headers,
json=data,
)
assert r.status_code == 403
def test_retrieve_users(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
username = random_email()
password = random_lower_string()
user_in = UserCreate(email=username, password=password)
crud.create_user(session=db, user_create=user_in)
username2 = random_email()
password2 = random_lower_string()
user_in2 = UserCreate(email=username2, password=password2)
crud.create_user(session=db, user_create=user_in2)
r = client.get(f"{settings.API_V1_STR}/users/", headers=superuser_token_headers)
all_users = r.json()
assert len(all_users["data"]) > 1
assert "count" in all_users
for item in all_users["data"]:
assert "email" in item
def test_update_user_me(
client: TestClient, normal_user_token_headers: dict[str, str], db: Session
) -> None:
full_name = "Updated Name"
2024-03-12 17:29:49 +01:00
email = random_email()
data = {"full_name": full_name, "email": email}
r = client.patch(
f"{settings.API_V1_STR}/users/me",
headers=normal_user_token_headers,
json=data,
)
assert r.status_code == 200
updated_user = r.json()
assert updated_user["email"] == email
assert updated_user["full_name"] == full_name
user_query = select(User).where(User.email == email)
user_db = db.exec(user_query).first()
assert user_db
assert user_db.email == email
assert user_db.full_name == full_name
def test_update_password_me(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
new_password = random_lower_string()
data = {
"current_password": settings.FIRST_SUPERUSER_PASSWORD,
"new_password": new_password,
}
r = client.patch(
f"{settings.API_V1_STR}/users/me/password",
headers=superuser_token_headers,
json=data,
)
assert r.status_code == 200
updated_user = r.json()
assert updated_user["message"] == "Password updated successfully"
user_query = select(User).where(User.email == settings.FIRST_SUPERUSER)
user_db = db.exec(user_query).first()
assert user_db
assert user_db.email == settings.FIRST_SUPERUSER
assert verify_password(new_password, user_db.hashed_password)
# Revert to the old password to keep consistency in test
old_data = {
"current_password": new_password,
"new_password": settings.FIRST_SUPERUSER_PASSWORD,
}
r = client.patch(
f"{settings.API_V1_STR}/users/me/password",
headers=superuser_token_headers,
json=old_data,
)
db.refresh(user_db)
assert r.status_code == 200
assert verify_password(settings.FIRST_SUPERUSER_PASSWORD, user_db.hashed_password)
def test_update_password_me_incorrect_password(
client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
new_password = random_lower_string()
data = {"current_password": new_password, "new_password": new_password}
r = client.patch(
f"{settings.API_V1_STR}/users/me/password",
headers=superuser_token_headers,
json=data,
)
assert r.status_code == 400
updated_user = r.json()
assert updated_user["detail"] == "Incorrect password"
2024-03-12 17:29:49 +01:00
def test_update_user_me_email_exists(
client: TestClient, normal_user_token_headers: dict[str, str], db: Session
) -> None:
username = random_email()
password = random_lower_string()
user_in = UserCreate(email=username, password=password)
user = crud.create_user(session=db, user_create=user_in)
data = {"email": user.email}
r = client.patch(
f"{settings.API_V1_STR}/users/me",
headers=normal_user_token_headers,
json=data,
)
assert r.status_code == 409
assert r.json()["detail"] == "User with this email already exists"
def test_update_password_me_same_password_error(
client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
data = {
"current_password": settings.FIRST_SUPERUSER_PASSWORD,
"new_password": settings.FIRST_SUPERUSER_PASSWORD,
}
r = client.patch(
f"{settings.API_V1_STR}/users/me/password",
headers=superuser_token_headers,
json=data,
)
assert r.status_code == 400
updated_user = r.json()
assert (
updated_user["detail"] == "New password cannot be the same as the current one"
)
def test_register_user(client: TestClient, db: Session) -> None:
with patch("app.core.config.settings.USERS_OPEN_REGISTRATION", True):
username = random_email()
password = random_lower_string()
full_name = random_lower_string()
data = {"email": username, "password": password, "full_name": full_name}
r = client.post(
f"{settings.API_V1_STR}/users/signup",
json=data,
)
assert r.status_code == 200
created_user = r.json()
assert created_user["email"] == username
assert created_user["full_name"] == full_name
user_query = select(User).where(User.email == username)
user_db = db.exec(user_query).first()
assert user_db
assert user_db.email == username
assert user_db.full_name == full_name
assert verify_password(password, user_db.hashed_password)
def test_register_user_forbidden_error(client: TestClient) -> None:
with patch("app.core.config.settings.USERS_OPEN_REGISTRATION", False):
username = random_email()
password = random_lower_string()
full_name = random_lower_string()
data = {"email": username, "password": password, "full_name": full_name}
r = client.post(
f"{settings.API_V1_STR}/users/signup",
json=data,
)
assert r.status_code == 403
assert (
r.json()["detail"] == "Open user registration is forbidden on this server"
)
def test_register_user_already_exists_error(client: TestClient) -> None:
with patch("app.core.config.settings.USERS_OPEN_REGISTRATION", True):
password = random_lower_string()
full_name = random_lower_string()
data = {
"email": settings.FIRST_SUPERUSER,
"password": password,
"full_name": full_name,
}
r = client.post(
f"{settings.API_V1_STR}/users/signup",
json=data,
)
assert r.status_code == 400
assert (
r.json()["detail"]
== "The user with this email already exists in the system"
)
def test_update_user(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
username = random_email()
password = random_lower_string()
user_in = UserCreate(email=username, password=password)
user = crud.create_user(session=db, user_create=user_in)
data = {"full_name": "Updated_full_name"}
r = client.patch(
f"{settings.API_V1_STR}/users/{user.id}",
headers=superuser_token_headers,
json=data,
)
assert r.status_code == 200
updated_user = r.json()
assert updated_user["full_name"] == "Updated_full_name"
user_query = select(User).where(User.email == username)
user_db = db.exec(user_query).first()
db.refresh(user_db)
assert user_db
assert user_db.full_name == "Updated_full_name"
def test_update_user_not_exists(
client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
data = {"full_name": "Updated_full_name"}
r = client.patch(
f"{settings.API_V1_STR}/users/99999999",
headers=superuser_token_headers,
json=data,
)
assert r.status_code == 404
assert r.json()["detail"] == "The user with this id does not exist in the system"
2024-03-12 17:29:49 +01:00
def test_update_user_email_exists(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
username = random_email()
password = random_lower_string()
user_in = UserCreate(email=username, password=password)
user = crud.create_user(session=db, user_create=user_in)
username2 = random_email()
password2 = random_lower_string()
user_in2 = UserCreate(email=username2, password=password2)
user2 = crud.create_user(session=db, user_create=user_in2)
data = {"email": user2.email}
r = client.patch(
f"{settings.API_V1_STR}/users/{user.id}",
headers=superuser_token_headers,
json=data,
)
assert r.status_code == 409
assert r.json()["detail"] == "User with this email already exists"
def test_delete_user_me(client: TestClient, db: Session) -> None:
username = random_email()
password = random_lower_string()
user_in = UserCreate(email=username, password=password)
user = crud.create_user(session=db, user_create=user_in)
user_id = user.id
login_data = {
"username": username,
"password": password,
}
r = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data)
tokens = r.json()
a_token = tokens["access_token"]
headers = {"Authorization": f"Bearer {a_token}"}
r = client.delete(
f"{settings.API_V1_STR}/users/me",
headers=headers,
)
assert r.status_code == 200
deleted_user = r.json()
assert deleted_user["message"] == "User deleted successfully"
result = db.exec(select(User).where(User.id == user_id)).first()
assert result is None
user_query = select(User).where(User.id == user_id)
user_db = db.execute(user_query).first()
assert user_db is None
def test_delete_user_me_as_superuser(
client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
r = client.delete(
f"{settings.API_V1_STR}/users/me",
headers=superuser_token_headers,
)
assert r.status_code == 403
response = r.json()
assert response["detail"] == "Super users are not allowed to delete themselves"
def test_delete_user_super_user(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
username = random_email()
password = random_lower_string()
user_in = UserCreate(email=username, password=password)
user = crud.create_user(session=db, user_create=user_in)
user_id = user.id
r = client.delete(
f"{settings.API_V1_STR}/users/{user_id}",
headers=superuser_token_headers,
)
assert r.status_code == 200
deleted_user = r.json()
assert deleted_user["message"] == "User deleted successfully"
result = db.exec(select(User).where(User.id == user_id)).first()
assert result is None
user_query = select(User).where(User.id == user_id)
user_db = db.execute(user_query).first()
assert user_db is None
def test_delete_user_not_found(
client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
r = client.delete(
f"{settings.API_V1_STR}/users/99999999",
headers=superuser_token_headers,
)
assert r.status_code == 404
assert r.json()["detail"] == "User not found"
def test_delete_user_current_super_user_error(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
super_user = crud.get_user_by_email(session=db, email=settings.FIRST_SUPERUSER)
assert super_user
user_id = super_user.id
r = client.delete(
f"{settings.API_V1_STR}/users/{user_id}",
headers=superuser_token_headers,
)
assert r.status_code == 403
assert r.json()["detail"] == "Super users are not allowed to delete themselves"
def test_delete_user_without_privileges(
client: TestClient, normal_user_token_headers: dict[str, str], db: Session
) -> None:
username = random_email()
password = random_lower_string()
user_in = UserCreate(email=username, password=password)
user = crud.create_user(session=db, user_create=user_in)
r = client.delete(
f"{settings.API_V1_STR}/users/{user.id}",
headers=normal_user_token_headers,
)
assert r.status_code == 403
assert r.json()["detail"] == "The user doesn't have enough privileges"