♻️ Refactor email logic to allow re-using util functions for testing and development (#663)

This commit is contained in:
Sebastián Ramírez
2024-03-10 00:02:36 +01:00
committed by GitHub
parent c048a61d81
commit d4e35a0dfd
8 changed files with 102 additions and 70 deletions

View File

@@ -3,7 +3,7 @@ from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt
from jose import JWTError, jwt
from pydantic import ValidationError
from sqlmodel import Session
@@ -32,7 +32,7 @@ def get_current_user(session: SessionDep, token: TokenDep) -> User:
token, settings.SECRET_KEY, algorithms=[security.ALGORITHM]
)
token_data = TokenPayload(**payload)
except (jwt.JWTError, ValidationError):
except (JWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate credentials",

View File

@@ -12,7 +12,8 @@ from app.core.security import get_password_hash
from app.models import Message, NewPassword, Token, User, UserOut
from app.utils import (
generate_password_reset_token,
send_reset_password_email,
generate_reset_password_email,
send_email,
verify_password_reset_token,
)
@@ -62,9 +63,14 @@ def recover_password(email: str, session: SessionDep) -> Message:
detail="The user with this username does not exist in the system.",
)
password_reset_token = generate_password_reset_token(email=email)
send_reset_password_email(
email_data = generate_reset_password_email(
email_to=user.email, email=email, token=password_reset_token
)
send_email(
email_to=user.email,
subject=email_data.subject,
html_content=email_data.html_content,
)
return Message(message="Password recovery email sent")

View File

@@ -23,7 +23,7 @@ from app.models import (
UserUpdate,
UserUpdateMe,
)
from app.utils import send_new_account_email
from app.utils import generate_new_account_email, send_email
router = APIRouter()
@@ -61,9 +61,14 @@ def create_user(*, session: SessionDep, user_in: UserCreate) -> Any:
user = crud.create_user(session=session, user_create=user_in)
if settings.EMAILS_ENABLED and user_in.email:
send_new_account_email(
email_data = generate_new_account_email(
email_to=user_in.email, username=user_in.email, password=user_in.password
)
send_email(
email_to=user_in.email,
subject=email_data.subject,
html_content=email_data.html_content,
)
return user
@@ -185,7 +190,9 @@ def delete_user(
if not user:
raise HTTPException(status_code=404, detail="User not found")
if (user == current_user and not current_user.is_superuser) or (user != current_user and current_user.is_superuser):
if (user == current_user and not current_user.is_superuser) or (
user != current_user and current_user.is_superuser
):
statement = delete(Item).where(Item.owner_id == user_id)
session.exec(statement)
session.delete(user)

View File

@@ -4,7 +4,7 @@ from pydantic.networks import EmailStr
from app.api.deps import get_current_active_superuser
from app.core.celery_app import celery_app
from app.models import Message
from app.utils import send_test_email
from app.utils import generate_test_email, send_email
router = APIRouter()
@@ -31,5 +31,10 @@ def test_email(email_to: EmailStr) -> Message:
"""
Test emails.
"""
send_test_email(email_to=email_to)
email_data = generate_test_email(email_to=email_to)
send_email(
email_to=email_to,
subject=email_data.subject,
html_content=email_data.html_content,
)
return Message(message="Test email sent")

View File

@@ -39,8 +39,8 @@ def test_use_access_token(
def test_recovery_password(
client: TestClient, normal_user_token_headers: dict[str, str], mocker
) -> None:
mocker.patch("app.utils.send_reset_password_email", return_value=None)
mocker.patch("app.utils.send_email", return_value=None)
mocker.patch("app.core.config.settings.EMAILS_ENABLED", True)
email = "test@example.com"
r = client.post(
f"{settings.API_V1_STR}/password-recovery/{email}",
@@ -75,7 +75,7 @@ def test_reset_password(
r = client.post(
f"{settings.API_V1_STR}/reset-password/",
headers=superuser_token_headers,
json=data
json=data,
)
assert r.status_code == 200
assert r.json() == {"message": "Password updated successfully"}
@@ -88,7 +88,7 @@ def test_reset_password_invalid_token(
r = client.post(
f"{settings.API_V1_STR}/reset-password/",
headers=superuser_token_headers,
json=data
json=data,
)
response = r.json()

View File

@@ -32,7 +32,7 @@ def test_get_users_normal_user_me(
def test_create_user_new_email(
client: TestClient, superuser_token_headers: dict, db: Session, mocker
) -> None:
mocker.patch("app.utils.send_new_account_email")
mocker.patch("app.utils.send_email")
mocker.patch("app.core.config.settings.EMAILS_ENABLED", True)
username = random_email()
password = random_lower_string()
@@ -68,9 +68,7 @@ def test_get_existing_user(
assert existing_user.email == api_user["email"]
def test_get_existing_user_current_user(
client: TestClient, db: Session
) -> None:
def test_get_existing_user_current_user(client: TestClient, db: Session) -> None:
username = random_email()
password = random_lower_string()
user_in = UserCreate(email=username, password=password)
@@ -184,7 +182,10 @@ def test_update_password_me(
client: TestClient, superuser_token_headers: dict, db: Session
) -> None:
new_password = random_lower_string()
data = {"current_password": settings.FIRST_SUPERUSER_PASSWORD, "new_password": new_password}
data = {
"current_password": settings.FIRST_SUPERUSER_PASSWORD,
"new_password": new_password,
}
r = client.patch(
f"{settings.API_V1_STR}/users/me/password",
headers=superuser_token_headers,
@@ -195,7 +196,10 @@ def test_update_password_me(
assert updated_user["message"] == "Password updated successfully"
# Revert to the old password to keep consistency in test
old_data = {"current_password": new_password, "new_password": settings.FIRST_SUPERUSER_PASSWORD}
old_data = {
"current_password": new_password,
"new_password": settings.FIRST_SUPERUSER_PASSWORD,
}
r = client.patch(
f"{settings.API_V1_STR}/users/me/password",
headers=superuser_token_headers,
@@ -222,7 +226,10 @@ def test_update_password_me_incorrect_password(
def test_update_password_me_same_password_error(
client: TestClient, superuser_token_headers: dict, db: Session
) -> None:
data = {"current_password": settings.FIRST_SUPERUSER_PASSWORD, "new_password": settings.FIRST_SUPERUSER_PASSWORD}
data = {
"current_password": settings.FIRST_SUPERUSER_PASSWORD,
"new_password": settings.FIRST_SUPERUSER_PASSWORD,
}
r = client.patch(
f"{settings.API_V1_STR}/users/me/password",
headers=superuser_token_headers,
@@ -230,12 +237,12 @@ def test_update_password_me_same_password_error(
)
assert r.status_code == 400
updated_user = r.json()
assert updated_user["detail"] == "New password cannot be the same as the current one"
assert (
updated_user["detail"] == "New password cannot be the same as the current one"
)
def test_create_user_open(
client: TestClient, mocker
) -> None:
def test_create_user_open(client: TestClient, mocker) -> None:
mocker.patch("app.core.config.settings.USERS_OPEN_REGISTRATION", True)
username = random_email()
password = random_lower_string()
@@ -251,9 +258,7 @@ def test_create_user_open(
assert created_user["full_name"] == full_name
def test_create_user_open_forbidden_error(
client: TestClient, mocker
) -> None:
def test_create_user_open_forbidden_error(client: TestClient, mocker) -> None:
mocker.patch("app.core.config.settings.USERS_OPEN_REGISTRATION", False)
username = random_email()
password = random_lower_string()
@@ -267,19 +272,23 @@ def test_create_user_open_forbidden_error(
assert r.json()["detail"] == "Open user registration is forbidden on this server"
def test_create_user_open_already_exists_error(
client: TestClient, mocker
) -> None:
def test_create_user_open_already_exists_error(client: TestClient, mocker) -> None:
mocker.patch("app.core.config.settings.USERS_OPEN_REGISTRATION", True)
password = random_lower_string()
full_name = random_lower_string()
data = {"email": settings.FIRST_SUPERUSER, "password": password, "full_name": full_name}
data = {
"email": settings.FIRST_SUPERUSER,
"password": password,
"full_name": full_name,
}
r = client.post(
f"{settings.API_V1_STR}/users/open",
json=data,
)
assert r.status_code == 400
assert r.json()["detail"] == "The user with this username already exists in the system"
assert (
r.json()["detail"] == "The user with this username already exists in the system"
)
def test_update_user(
@@ -311,7 +320,9 @@ def test_update_user_not_exists(
json=data,
)
assert r.status_code == 404
assert r.json()["detail"] == "The user with this username does not exist in the system"
assert (
r.json()["detail"] == "The user with this username does not exist in the system"
)
def test_delete_user_super_user(
@@ -331,9 +342,7 @@ def test_delete_user_super_user(
assert deleted_user["message"] == "User deleted successfully"
def test_delete_user_current_user(
client: TestClient, db: Session
) -> None:
def test_delete_user_current_user(client: TestClient, db: Session) -> None:
username = random_email()
password = random_lower_string()
user_in = UserCreate(email=username, password=password)

View File

@@ -1,26 +1,38 @@
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
import emails
from emails.template import JinjaTemplate
from jose import jwt
from jinja2 import Template
from jose import JWTError, jwt
from app.core.config import settings
@dataclass
class EmailData:
html_content: str
subject: str
def render_email_template(*, template_name: str, context: dict[str, Any]):
template_str = (Path(settings.EMAIL_TEMPLATES_DIR) / template_name).read_text()
html_content = Template(template_str).render(context)
return html_content
def send_email(
*,
email_to: str,
subject_template: str = "",
html_template: str = "",
environment: dict[str, Any] | None = None,
subject: str = "",
html_content: str = "",
) -> None:
current_environment = environment or {}
assert settings.EMAILS_ENABLED, "no provided configuration for email variables"
message = emails.Message(
subject=JinjaTemplate(subject_template),
html=JinjaTemplate(html_template),
subject=subject,
html=html_content,
mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL),
)
smtp_options = {"host": settings.SMTP_HOST, "port": settings.SMTP_PORT}
@@ -30,35 +42,28 @@ def send_email(
smtp_options["user"] = settings.SMTP_USER
if settings.SMTP_PASSWORD:
smtp_options["password"] = settings.SMTP_PASSWORD
response = message.send(to=email_to, render=current_environment, smtp=smtp_options)
response = message.send(to=email_to, smtp=smtp_options)
logging.info(f"send email result: {response}")
def send_test_email(email_to: str) -> None:
def generate_test_email(email_to: str) -> EmailData:
project_name = settings.PROJECT_NAME
subject = f"{project_name} - Test email"
with open(Path(settings.EMAIL_TEMPLATES_DIR) / "test_email.html") as f:
template_str = f.read()
send_email(
email_to=email_to,
subject_template=subject,
html_template=template_str,
environment={"project_name": settings.PROJECT_NAME, "email": email_to},
html_content = render_email_template(
template_name="test_email.html",
context={"project_name": settings.PROJECT_NAME, "email": email_to},
)
return EmailData(html_content=html_content, subject=subject)
def send_reset_password_email(email_to: str, email: str, token: str) -> None:
def generate_reset_password_email(email_to: str, email: str, token: str) -> EmailData:
project_name = settings.PROJECT_NAME
subject = f"{project_name} - Password recovery for user {email}"
with open(Path(settings.EMAIL_TEMPLATES_DIR) / "reset_password.html") as f:
template_str = f.read()
server_host = settings.SERVER_HOST
link = f"{server_host}/reset-password?token={token}"
send_email(
email_to=email_to,
subject_template=subject,
html_template=template_str,
environment={
html_content = render_email_template(
template_name="reset_password.html",
context={
"project_name": settings.PROJECT_NAME,
"username": email,
"email": email_to,
@@ -66,19 +71,18 @@ def send_reset_password_email(email_to: str, email: str, token: str) -> None:
"link": link,
},
)
return EmailData(html_content=html_content, subject=subject)
def send_new_account_email(email_to: str, username: str, password: str) -> None:
def generate_new_account_email(
email_to: str, username: str, password: str
) -> EmailData:
project_name = settings.PROJECT_NAME
subject = f"{project_name} - New account for user {username}"
with open(Path(settings.EMAIL_TEMPLATES_DIR) / "new_account.html") as f:
template_str = f.read()
link = settings.SERVER_HOST
send_email(
email_to=email_to,
subject_template=subject,
html_template=template_str,
environment={
html_content = render_email_template(
template_name="new_account.html",
context={
"project_name": settings.PROJECT_NAME,
"username": username,
"password": password,
@@ -86,6 +90,7 @@ def send_new_account_email(email_to: str, username: str, password: str) -> None:
"link": link,
},
)
return EmailData(html_content=html_content, subject=subject)
def generate_password_reset_token(email: str) -> str:
@@ -105,5 +110,5 @@ def verify_password_reset_token(token: str) -> str | None:
try:
decoded_token = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
return decoded_token["sub"]
except jwt.JWTError:
except JWTError:
return None