🎨 Format files with pre-commit and Ruff (#611)
This commit is contained in:
committed by
GitHub
parent
2802a4df9e
commit
0cc802eec8
@@ -1,10 +1,8 @@
|
|||||||
from __future__ import with_statement
|
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
from logging.config import fileConfig
|
||||||
|
|
||||||
from alembic import context
|
from alembic import context
|
||||||
from sqlalchemy import engine_from_config, pool
|
from sqlalchemy import engine_from_config, pool
|
||||||
from logging.config import fileConfig
|
|
||||||
|
|
||||||
# this is the Alembic Config object, which provides
|
# this is the Alembic Config object, which provides
|
||||||
# access to the values within the .ini file in use.
|
# access to the values within the .ini file in use.
|
||||||
@@ -69,7 +67,9 @@ def run_migrations_online():
|
|||||||
configuration = config.get_section(config.config_ini_section)
|
configuration = config.get_section(config.config_ini_section)
|
||||||
configuration["sqlalchemy.url"] = get_url()
|
configuration["sqlalchemy.url"] = get_url()
|
||||||
connectable = engine_from_config(
|
connectable = engine_from_config(
|
||||||
configuration, prefix="sqlalchemy.", poolclass=pool.NullPool,
|
configuration,
|
||||||
|
prefix="sqlalchemy.",
|
||||||
|
poolclass=pool.NullPool,
|
||||||
)
|
)
|
||||||
|
|
||||||
with connectable.connect() as connection:
|
with connectable.connect() as connection:
|
||||||
|
|||||||
@@ -5,13 +5,12 @@ Revises:
|
|||||||
Create Date: 2023-11-24 22:55:43.195942
|
Create Date: 2023-11-24 22:55:43.195942
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from alembic import op
|
|
||||||
import sqlalchemy as sa
|
import sqlalchemy as sa
|
||||||
import sqlmodel.sql.sqltypes
|
import sqlmodel.sql.sqltypes
|
||||||
|
from alembic import op
|
||||||
|
|
||||||
# revision identifiers, used by Alembic.
|
# revision identifiers, used by Alembic.
|
||||||
revision = 'e2412789c190'
|
revision = "e2412789c190"
|
||||||
down_revision = None
|
down_revision = None
|
||||||
branch_labels = None
|
branch_labels = None
|
||||||
depends_on = None
|
depends_on = None
|
||||||
@@ -19,30 +18,37 @@ depends_on = None
|
|||||||
|
|
||||||
def upgrade():
|
def upgrade():
|
||||||
# ### commands auto generated by Alembic - please adjust! ###
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
op.create_table('user',
|
op.create_table(
|
||||||
sa.Column('email', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
|
"user",
|
||||||
sa.Column('is_active', sa.Boolean(), nullable=False),
|
sa.Column("email", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
|
||||||
sa.Column('is_superuser', sa.Boolean(), nullable=False),
|
sa.Column("is_active", sa.Boolean(), nullable=False),
|
||||||
sa.Column('full_name', sqlmodel.sql.sqltypes.AutoString(), nullable=True),
|
sa.Column("is_superuser", sa.Boolean(), nullable=False),
|
||||||
sa.Column('id', sa.Integer(), nullable=False),
|
sa.Column("full_name", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
|
||||||
sa.Column('hashed_password', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
|
sa.Column("id", sa.Integer(), nullable=False),
|
||||||
sa.PrimaryKeyConstraint('id')
|
sa.Column(
|
||||||
|
"hashed_password", sqlmodel.sql.sqltypes.AutoString(), nullable=False
|
||||||
|
),
|
||||||
|
sa.PrimaryKeyConstraint("id"),
|
||||||
)
|
)
|
||||||
op.create_index(op.f('ix_user_email'), 'user', ['email'], unique=True)
|
op.create_index(op.f("ix_user_email"), "user", ["email"], unique=True)
|
||||||
op.create_table('item',
|
op.create_table(
|
||||||
sa.Column('description', sqlmodel.sql.sqltypes.AutoString(), nullable=True),
|
"item",
|
||||||
sa.Column('id', sa.Integer(), nullable=False),
|
sa.Column("description", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
|
||||||
sa.Column('title', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
|
sa.Column("id", sa.Integer(), nullable=False),
|
||||||
sa.Column('owner_id', sa.Integer(), nullable=False),
|
sa.Column("title", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
|
||||||
sa.ForeignKeyConstraint(['owner_id'], ['user.id'], ),
|
sa.Column("owner_id", sa.Integer(), nullable=False),
|
||||||
sa.PrimaryKeyConstraint('id')
|
sa.ForeignKeyConstraint(
|
||||||
|
["owner_id"],
|
||||||
|
["user.id"],
|
||||||
|
),
|
||||||
|
sa.PrimaryKeyConstraint("id"),
|
||||||
)
|
)
|
||||||
# ### end Alembic commands ###
|
# ### end Alembic commands ###
|
||||||
|
|
||||||
|
|
||||||
def downgrade():
|
def downgrade():
|
||||||
# ### commands auto generated by Alembic - please adjust! ###
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
op.drop_table('item')
|
op.drop_table("item")
|
||||||
op.drop_index(op.f('ix_user_email'), table_name='user')
|
op.drop_index(op.f("ix_user_email"), table_name="user")
|
||||||
op.drop_table('user')
|
op.drop_table("user")
|
||||||
# ### end Alembic commands ###
|
# ### end Alembic commands ###
|
||||||
|
|||||||
@@ -1,10 +1,10 @@
|
|||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from fastapi import APIRouter, HTTPException
|
from fastapi import APIRouter, HTTPException
|
||||||
from sqlmodel import select, func
|
from sqlmodel import func, select
|
||||||
|
|
||||||
from app.api.deps import CurrentUser, SessionDep
|
from app.api.deps import CurrentUser, SessionDep
|
||||||
from app.models import Item, ItemCreate, ItemOut, ItemUpdate, Message, ItemsOut
|
from app.models import Item, ItemCreate, ItemOut, ItemsOut, ItemUpdate, Message
|
||||||
|
|
||||||
router = APIRouter()
|
router = APIRouter()
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
from typing import Any, List
|
from typing import Any
|
||||||
|
|
||||||
from fastapi import APIRouter, Depends, HTTPException
|
from fastapi import APIRouter, Depends, HTTPException
|
||||||
from sqlmodel import select, func
|
from sqlmodel import func, select
|
||||||
|
|
||||||
from app import crud
|
from app import crud
|
||||||
from app.api.deps import (
|
from app.api.deps import (
|
||||||
@@ -28,9 +28,7 @@ router = APIRouter()
|
|||||||
|
|
||||||
|
|
||||||
@router.get(
|
@router.get(
|
||||||
"/",
|
"/", dependencies=[Depends(get_current_active_superuser)], response_model=UsersOut
|
||||||
dependencies=[Depends(get_current_active_superuser)],
|
|
||||||
response_model=UsersOut
|
|
||||||
)
|
)
|
||||||
def read_users(session: SessionDep, skip: int = 0, limit: int = 100) -> Any:
|
def read_users(session: SessionDep, skip: int = 0, limit: int = 100) -> Any:
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
from typing import Annotated, Generator
|
from collections.abc import Generator
|
||||||
|
from typing import Annotated
|
||||||
|
|
||||||
from fastapi import Depends, HTTPException, status
|
from fastapi import Depends, HTTPException, status
|
||||||
from fastapi.security import OAuth2PasswordBearer
|
from fastapi.security import OAuth2PasswordBearer
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
import secrets
|
import secrets
|
||||||
from typing import Any, Dict, List, Optional, Union
|
from typing import Any
|
||||||
|
|
||||||
from pydantic import AnyHttpUrl, BaseSettings, EmailStr, HttpUrl, PostgresDsn, validator
|
from pydantic import AnyHttpUrl, BaseSettings, EmailStr, HttpUrl, PostgresDsn, validator
|
||||||
|
|
||||||
@@ -14,21 +14,21 @@ class Settings(BaseSettings):
|
|||||||
# BACKEND_CORS_ORIGINS is a JSON-formatted list of origins
|
# BACKEND_CORS_ORIGINS is a JSON-formatted list of origins
|
||||||
# e.g: '["http://localhost", "http://localhost:4200", "http://localhost:3000", \
|
# e.g: '["http://localhost", "http://localhost:4200", "http://localhost:3000", \
|
||||||
# "http://localhost:8080", "http://local.dockertoolbox.tiangolo.com"]'
|
# "http://localhost:8080", "http://local.dockertoolbox.tiangolo.com"]'
|
||||||
BACKEND_CORS_ORIGINS: List[AnyHttpUrl] = []
|
BACKEND_CORS_ORIGINS: list[AnyHttpUrl] = []
|
||||||
|
|
||||||
@validator("BACKEND_CORS_ORIGINS", pre=True)
|
@validator("BACKEND_CORS_ORIGINS", pre=True)
|
||||||
def assemble_cors_origins(cls, v: Union[str, List[str]]) -> Union[List[str], str]:
|
def assemble_cors_origins(cls, v: str | list[str]) -> list[str] | str:
|
||||||
if isinstance(v, str) and not v.startswith("["):
|
if isinstance(v, str) and not v.startswith("["):
|
||||||
return [i.strip() for i in v.split(",")]
|
return [i.strip() for i in v.split(",")]
|
||||||
elif isinstance(v, (list, str)):
|
elif isinstance(v, list | str):
|
||||||
return v
|
return v
|
||||||
raise ValueError(v)
|
raise ValueError(v)
|
||||||
|
|
||||||
PROJECT_NAME: str
|
PROJECT_NAME: str
|
||||||
SENTRY_DSN: Optional[HttpUrl] = None
|
SENTRY_DSN: HttpUrl | None = None
|
||||||
|
|
||||||
@validator("SENTRY_DSN", pre=True)
|
@validator("SENTRY_DSN", pre=True)
|
||||||
def sentry_dsn_can_be_blank(cls, v: str) -> Optional[str]:
|
def sentry_dsn_can_be_blank(cls, v: str) -> str | None:
|
||||||
if len(v) == 0:
|
if len(v) == 0:
|
||||||
return None
|
return None
|
||||||
return v
|
return v
|
||||||
@@ -37,10 +37,10 @@ class Settings(BaseSettings):
|
|||||||
POSTGRES_USER: str
|
POSTGRES_USER: str
|
||||||
POSTGRES_PASSWORD: str
|
POSTGRES_PASSWORD: str
|
||||||
POSTGRES_DB: str
|
POSTGRES_DB: str
|
||||||
SQLALCHEMY_DATABASE_URI: Optional[PostgresDsn] = None
|
SQLALCHEMY_DATABASE_URI: PostgresDsn | None = None
|
||||||
|
|
||||||
@validator("SQLALCHEMY_DATABASE_URI", pre=True)
|
@validator("SQLALCHEMY_DATABASE_URI", pre=True)
|
||||||
def assemble_db_connection(cls, v: Optional[str], values: Dict[str, Any]) -> Any:
|
def assemble_db_connection(cls, v: str | None, values: dict[str, Any]) -> Any:
|
||||||
if isinstance(v, str):
|
if isinstance(v, str):
|
||||||
return v
|
return v
|
||||||
return PostgresDsn.build(
|
return PostgresDsn.build(
|
||||||
@@ -52,15 +52,15 @@ class Settings(BaseSettings):
|
|||||||
)
|
)
|
||||||
|
|
||||||
SMTP_TLS: bool = True
|
SMTP_TLS: bool = True
|
||||||
SMTP_PORT: Optional[int] = None
|
SMTP_PORT: int | None = None
|
||||||
SMTP_HOST: Optional[str] = None
|
SMTP_HOST: str | None = None
|
||||||
SMTP_USER: Optional[str] = None
|
SMTP_USER: str | None = None
|
||||||
SMTP_PASSWORD: Optional[str] = None
|
SMTP_PASSWORD: str | None = None
|
||||||
EMAILS_FROM_EMAIL: Optional[EmailStr] = None
|
EMAILS_FROM_EMAIL: EmailStr | None = None
|
||||||
EMAILS_FROM_NAME: Optional[str] = None
|
EMAILS_FROM_NAME: str | None = None
|
||||||
|
|
||||||
@validator("EMAILS_FROM_NAME")
|
@validator("EMAILS_FROM_NAME")
|
||||||
def get_project_name(cls, v: Optional[str], values: Dict[str, Any]) -> str:
|
def get_project_name(cls, v: str | None, values: dict[str, Any]) -> str:
|
||||||
if not v:
|
if not v:
|
||||||
return values["PROJECT_NAME"]
|
return values["PROJECT_NAME"]
|
||||||
return v
|
return v
|
||||||
@@ -70,7 +70,7 @@ class Settings(BaseSettings):
|
|||||||
EMAILS_ENABLED: bool = False
|
EMAILS_ENABLED: bool = False
|
||||||
|
|
||||||
@validator("EMAILS_ENABLED", pre=True)
|
@validator("EMAILS_ENABLED", pre=True)
|
||||||
def get_emails_enabled(cls, v: bool, values: Dict[str, Any]) -> bool:
|
def get_emails_enabled(cls, v: bool, values: dict[str, Any]) -> bool:
|
||||||
return bool(
|
return bool(
|
||||||
values.get("SMTP_HOST")
|
values.get("SMTP_HOST")
|
||||||
and values.get("SMTP_PORT")
|
and values.get("SMTP_PORT")
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from typing import Any, Union
|
from typing import Any
|
||||||
|
|
||||||
from jose import jwt
|
from jose import jwt
|
||||||
from passlib.context import CryptContext
|
from passlib.context import CryptContext
|
||||||
@@ -12,9 +12,7 @@ pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
|||||||
ALGORITHM = "HS256"
|
ALGORITHM = "HS256"
|
||||||
|
|
||||||
|
|
||||||
def create_access_token(
|
def create_access_token(subject: str | Any, expires_delta: timedelta = None) -> str:
|
||||||
subject: Union[str, Any], expires_delta: timedelta = None
|
|
||||||
) -> str:
|
|
||||||
if expires_delta:
|
if expires_delta:
|
||||||
expire = datetime.utcnow() + expires_delta
|
expire = datetime.utcnow() + expires_delta
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -1,16 +1,15 @@
|
|||||||
from .crud_item import item
|
|
||||||
from .crud_user import user
|
|
||||||
|
|
||||||
# For a new basic set of CRUD operations you could just do
|
# For a new basic set of CRUD operations you could just do
|
||||||
|
|
||||||
# from .base import CRUDBase
|
# from .base import CRUDBase
|
||||||
# from app.models.item import Item
|
# from app.models.item import Item
|
||||||
# from app.schemas.item import ItemCreate, ItemUpdate
|
# from app.schemas.item import ItemCreate, ItemUpdate
|
||||||
|
|
||||||
# item = CRUDBase[Item, ItemCreate, ItemUpdate](Item)
|
# item = CRUDBase[Item, ItemCreate, ItemUpdate](Item)
|
||||||
from sqlmodel import Session, select
|
from sqlmodel import Session, select
|
||||||
|
|
||||||
from app.core.security import get_password_hash, verify_password
|
from app.core.security import get_password_hash, verify_password
|
||||||
from app.models import UserCreate, User
|
from app.models import User, UserCreate
|
||||||
|
|
||||||
|
from .crud_item import item as item
|
||||||
|
from .crud_user import user as user
|
||||||
|
|
||||||
|
|
||||||
def create_user(*, session: Session, user_create: UserCreate) -> User:
|
def create_user(*, session: Session, user_create: UserCreate) -> User:
|
||||||
@@ -30,9 +29,9 @@ def get_user_by_email(*, session: Session, email: str) -> User | None:
|
|||||||
|
|
||||||
|
|
||||||
def authenticate(*, session: Session, email: str, password: str) -> User | None:
|
def authenticate(*, session: Session, email: str, password: str) -> User | None:
|
||||||
user = get_user_by_email(session=session, email=email)
|
db_user = get_user_by_email(session=session, email=email)
|
||||||
if not user:
|
if not db_user:
|
||||||
return None
|
return None
|
||||||
if not verify_password(password, user.hashed_password):
|
if not verify_password(password, db_user.hashed_password):
|
||||||
return None
|
return None
|
||||||
return user
|
return db_user
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
|
from typing import Any, Generic, TypeVar
|
||||||
|
|
||||||
from fastapi.encoders import jsonable_encoder
|
from fastapi.encoders import jsonable_encoder
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
@@ -10,7 +10,7 @@ UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
|
|||||||
|
|
||||||
|
|
||||||
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
|
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
|
||||||
def __init__(self, model: Type[ModelType]):
|
def __init__(self, model: type[ModelType]):
|
||||||
"""
|
"""
|
||||||
CRUD object with default methods to Create, Read, Update, Delete (CRUD).
|
CRUD object with default methods to Create, Read, Update, Delete (CRUD).
|
||||||
|
|
||||||
@@ -21,7 +21,7 @@ class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
|
|||||||
"""
|
"""
|
||||||
self.model = model
|
self.model = model
|
||||||
|
|
||||||
def get(self, db: Session, id: Any) -> Optional[ModelType]:
|
def get(self, db: Session, id: Any) -> ModelType | None:
|
||||||
return db.query(self.model).filter(self.model.id == id).first()
|
return db.query(self.model).filter(self.model.id == id).first()
|
||||||
|
|
||||||
def create(self, db: Session, *, obj_in: CreateSchemaType) -> ModelType:
|
def create(self, db: Session, *, obj_in: CreateSchemaType) -> ModelType:
|
||||||
@@ -37,7 +37,7 @@ class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
|
|||||||
db: Session,
|
db: Session,
|
||||||
*,
|
*,
|
||||||
db_obj: ModelType,
|
db_obj: ModelType,
|
||||||
obj_in: Union[UpdateSchemaType, Dict[str, Any]]
|
obj_in: UpdateSchemaType | dict[str, Any],
|
||||||
) -> ModelType:
|
) -> ModelType:
|
||||||
obj_data = jsonable_encoder(db_obj)
|
obj_data = jsonable_encoder(db_obj)
|
||||||
if isinstance(obj_in, dict):
|
if isinstance(obj_in, dict):
|
||||||
|
|||||||
@@ -1,5 +1,3 @@
|
|||||||
from typing import List
|
|
||||||
|
|
||||||
from fastapi.encoders import jsonable_encoder
|
from fastapi.encoders import jsonable_encoder
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
@@ -21,7 +19,7 @@ class CRUDItem(CRUDBase[Item, ItemCreate, ItemUpdate]):
|
|||||||
|
|
||||||
def get_multi_by_owner(
|
def get_multi_by_owner(
|
||||||
self, db: Session, *, owner_id: int, skip: int = 0, limit: int = 100
|
self, db: Session, *, owner_id: int, skip: int = 0, limit: int = 100
|
||||||
) -> List[Item]:
|
) -> list[Item]:
|
||||||
return (
|
return (
|
||||||
db.query(self.model)
|
db.query(self.model)
|
||||||
.filter(Item.owner_id == owner_id)
|
.filter(Item.owner_id == owner_id)
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
from typing import Any, Dict, Optional, Union
|
from typing import Any
|
||||||
|
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
@@ -9,7 +9,7 @@ from app.schemas.user import UserCreate, UserUpdate
|
|||||||
|
|
||||||
|
|
||||||
class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
|
class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
|
||||||
def get_by_email(self, db: Session, *, email: str) -> Optional[User]:
|
def get_by_email(self, db: Session, *, email: str) -> User | None:
|
||||||
return db.query(User).filter(User.email == email).first()
|
return db.query(User).filter(User.email == email).first()
|
||||||
|
|
||||||
def create(self, db: Session, *, obj_in: UserCreate) -> User:
|
def create(self, db: Session, *, obj_in: UserCreate) -> User:
|
||||||
@@ -25,7 +25,7 @@ class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
|
|||||||
return db_obj
|
return db_obj
|
||||||
|
|
||||||
def update(
|
def update(
|
||||||
self, db: Session, *, db_obj: User, obj_in: Union[UserUpdate, Dict[str, Any]]
|
self, db: Session, *, db_obj: User, obj_in: UserUpdate | dict[str, Any]
|
||||||
) -> User:
|
) -> User:
|
||||||
if isinstance(obj_in, dict):
|
if isinstance(obj_in, dict):
|
||||||
update_data = obj_in
|
update_data = obj_in
|
||||||
@@ -37,7 +37,7 @@ class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
|
|||||||
update_data["hashed_password"] = hashed_password
|
update_data["hashed_password"] = hashed_password
|
||||||
return super().update(db, db_obj=db_obj, obj_in=update_data)
|
return super().update(db, db_obj=db_obj, obj_in=update_data)
|
||||||
|
|
||||||
def authenticate(self, db: Session, *, email: str, password: str) -> Optional[User]:
|
def authenticate(self, db: Session, *, email: str, password: str) -> User | None:
|
||||||
user = self.get_by_email(db, email=email)
|
user = self.get_by_email(db, email=email)
|
||||||
if not user:
|
if not user:
|
||||||
return None
|
return None
|
||||||
|
|||||||
@@ -1,5 +1,3 @@
|
|||||||
from typing import Union
|
|
||||||
|
|
||||||
from pydantic import EmailStr
|
from pydantic import EmailStr
|
||||||
from sqlmodel import Field, Relationship, SQLModel
|
from sqlmodel import Field, Relationship, SQLModel
|
||||||
|
|
||||||
@@ -9,7 +7,7 @@ class UserBase(SQLModel):
|
|||||||
email: EmailStr = Field(unique=True, index=True)
|
email: EmailStr = Field(unique=True, index=True)
|
||||||
is_active: bool = True
|
is_active: bool = True
|
||||||
is_superuser: bool = False
|
is_superuser: bool = False
|
||||||
full_name: Union[str, None] = None
|
full_name: str | None = None
|
||||||
|
|
||||||
|
|
||||||
# Properties to receive via API on creation
|
# Properties to receive via API on creation
|
||||||
@@ -20,18 +18,18 @@ class UserCreate(UserBase):
|
|||||||
class UserCreateOpen(SQLModel):
|
class UserCreateOpen(SQLModel):
|
||||||
email: EmailStr
|
email: EmailStr
|
||||||
password: str
|
password: str
|
||||||
full_name: Union[str, None] = None
|
full_name: str | None = None
|
||||||
|
|
||||||
|
|
||||||
# Properties to receive via API on update, all are optional
|
# Properties to receive via API on update, all are optional
|
||||||
class UserUpdate(UserBase):
|
class UserUpdate(UserBase):
|
||||||
email: Union[EmailStr, None] = None
|
email: EmailStr | None = None
|
||||||
password: Union[str, None] = None
|
password: str | None = None
|
||||||
|
|
||||||
|
|
||||||
class UserUpdateMe(SQLModel):
|
class UserUpdateMe(SQLModel):
|
||||||
full_name: Union[str, None] = None
|
full_name: str | None = None
|
||||||
email: Union[EmailStr, None] = None
|
email: EmailStr | None = None
|
||||||
|
|
||||||
|
|
||||||
class UpdatePassword(SQLModel):
|
class UpdatePassword(SQLModel):
|
||||||
@@ -41,7 +39,7 @@ class UpdatePassword(SQLModel):
|
|||||||
|
|
||||||
# Database model, database table inferred from class name
|
# Database model, database table inferred from class name
|
||||||
class User(UserBase, table=True):
|
class User(UserBase, table=True):
|
||||||
id: Union[int, None] = Field(default=None, primary_key=True)
|
id: int | None = Field(default=None, primary_key=True)
|
||||||
hashed_password: str
|
hashed_password: str
|
||||||
items: list["Item"] = Relationship(back_populates="owner")
|
items: list["Item"] = Relationship(back_populates="owner")
|
||||||
|
|
||||||
@@ -59,7 +57,7 @@ class UsersOut(SQLModel):
|
|||||||
# Shared properties
|
# Shared properties
|
||||||
class ItemBase(SQLModel):
|
class ItemBase(SQLModel):
|
||||||
title: str
|
title: str
|
||||||
description: Union[str, None] = None
|
description: str | None = None
|
||||||
|
|
||||||
|
|
||||||
# Properties to receive on item creation
|
# Properties to receive on item creation
|
||||||
@@ -69,17 +67,15 @@ class ItemCreate(ItemBase):
|
|||||||
|
|
||||||
# Properties to receive on item update
|
# Properties to receive on item update
|
||||||
class ItemUpdate(ItemBase):
|
class ItemUpdate(ItemBase):
|
||||||
title: Union[str, None] = None
|
title: str | None = None
|
||||||
|
|
||||||
|
|
||||||
# Database model, database table inferred from class name
|
# Database model, database table inferred from class name
|
||||||
class Item(ItemBase, table=True):
|
class Item(ItemBase, table=True):
|
||||||
id: Union[int, None] = Field(default=None, primary_key=True)
|
id: int | None = Field(default=None, primary_key=True)
|
||||||
title: str
|
title: str
|
||||||
owner_id: Union[int, None] = Field(
|
owner_id: int | None = Field(default=None, foreign_key="user.id", nullable=False)
|
||||||
default=None, foreign_key="user.id", nullable=False
|
owner: User | None = Relationship(back_populates="items")
|
||||||
)
|
|
||||||
owner: Union[User, None] = Relationship(back_populates="items")
|
|
||||||
|
|
||||||
|
|
||||||
# Properties to return via API, id is always required
|
# Properties to return via API, id is always required
|
||||||
@@ -106,7 +102,7 @@ class Token(SQLModel):
|
|||||||
|
|
||||||
# Contents of JWT token
|
# Contents of JWT token
|
||||||
class TokenPayload(SQLModel):
|
class TokenPayload(SQLModel):
|
||||||
sub: Union[int, None] = None
|
sub: int | None = None
|
||||||
|
|
||||||
|
|
||||||
class NewPassword(SQLModel):
|
class NewPassword(SQLModel):
|
||||||
|
|||||||
@@ -1,12 +1,10 @@
|
|||||||
from typing import Optional
|
|
||||||
|
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
|
||||||
# Shared properties
|
# Shared properties
|
||||||
class ItemBase(BaseModel):
|
class ItemBase(BaseModel):
|
||||||
title: Optional[str] = None
|
title: str | None = None
|
||||||
description: Optional[str] = None
|
description: str | None = None
|
||||||
|
|
||||||
|
|
||||||
# Properties to receive on item creation
|
# Properties to receive on item creation
|
||||||
|
|||||||
@@ -1,5 +1,3 @@
|
|||||||
from typing import Optional
|
|
||||||
|
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
|
||||||
@@ -9,4 +7,4 @@ class Token(BaseModel):
|
|||||||
|
|
||||||
|
|
||||||
class TokenPayload(BaseModel):
|
class TokenPayload(BaseModel):
|
||||||
sub: Optional[int] = None
|
sub: int | None = None
|
||||||
|
|||||||
@@ -1,14 +1,12 @@
|
|||||||
from typing import Optional
|
|
||||||
|
|
||||||
from pydantic import BaseModel, EmailStr
|
from pydantic import BaseModel, EmailStr
|
||||||
|
|
||||||
|
|
||||||
# Shared properties
|
# Shared properties
|
||||||
class UserBase(BaseModel):
|
class UserBase(BaseModel):
|
||||||
email: Optional[EmailStr] = None
|
email: EmailStr | None = None
|
||||||
is_active: Optional[bool] = True
|
is_active: bool | None = True
|
||||||
is_superuser: bool = False
|
is_superuser: bool = False
|
||||||
full_name: Optional[str] = None
|
full_name: str | None = None
|
||||||
|
|
||||||
|
|
||||||
# Properties to receive via API on creation
|
# Properties to receive via API on creation
|
||||||
@@ -19,11 +17,11 @@ class UserCreate(UserBase):
|
|||||||
|
|
||||||
# Properties to receive via API on update
|
# Properties to receive via API on update
|
||||||
class UserUpdate(UserBase):
|
class UserUpdate(UserBase):
|
||||||
password: Optional[str] = None
|
password: str | None = None
|
||||||
|
|
||||||
|
|
||||||
class UserInDBBase(UserBase):
|
class UserInDBBase(UserBase):
|
||||||
id: Optional[int] = None
|
id: int | None = None
|
||||||
|
|
||||||
class Config:
|
class Config:
|
||||||
orm_mode = True
|
orm_mode = True
|
||||||
|
|||||||
@@ -1,12 +1,10 @@
|
|||||||
from typing import Dict
|
|
||||||
|
|
||||||
from fastapi.testclient import TestClient
|
from fastapi.testclient import TestClient
|
||||||
|
|
||||||
from app.core.config import settings
|
from app.core.config import settings
|
||||||
|
|
||||||
|
|
||||||
def test_celery_worker_test(
|
def test_celery_worker_test(
|
||||||
client: TestClient, superuser_token_headers: Dict[str, str]
|
client: TestClient, superuser_token_headers: dict[str, str]
|
||||||
) -> None:
|
) -> None:
|
||||||
data = {"message": "test"}
|
data = {"message": "test"}
|
||||||
r = client.post(
|
r = client.post(
|
||||||
|
|||||||
@@ -10,7 +10,9 @@ def test_create_item(
|
|||||||
) -> None:
|
) -> None:
|
||||||
data = {"title": "Foo", "description": "Fighters"}
|
data = {"title": "Foo", "description": "Fighters"}
|
||||||
response = client.post(
|
response = client.post(
|
||||||
f"{settings.API_V1_STR}/items/", headers=superuser_token_headers, json=data,
|
f"{settings.API_V1_STR}/items/",
|
||||||
|
headers=superuser_token_headers,
|
||||||
|
json=data,
|
||||||
)
|
)
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
content = response.json()
|
content = response.json()
|
||||||
@@ -25,7 +27,8 @@ def test_read_item(
|
|||||||
) -> None:
|
) -> None:
|
||||||
item = create_random_item(db)
|
item = create_random_item(db)
|
||||||
response = client.get(
|
response = client.get(
|
||||||
f"{settings.API_V1_STR}/items/{item.id}", headers=superuser_token_headers,
|
f"{settings.API_V1_STR}/items/{item.id}",
|
||||||
|
headers=superuser_token_headers,
|
||||||
)
|
)
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
content = response.json()
|
content = response.json()
|
||||||
|
|||||||
@@ -1,5 +1,3 @@
|
|||||||
from typing import Dict
|
|
||||||
|
|
||||||
from fastapi.testclient import TestClient
|
from fastapi.testclient import TestClient
|
||||||
|
|
||||||
from app.core.config import settings
|
from app.core.config import settings
|
||||||
@@ -18,10 +16,11 @@ def test_get_access_token(client: TestClient) -> None:
|
|||||||
|
|
||||||
|
|
||||||
def test_use_access_token(
|
def test_use_access_token(
|
||||||
client: TestClient, superuser_token_headers: Dict[str, str]
|
client: TestClient, superuser_token_headers: dict[str, str]
|
||||||
) -> None:
|
) -> None:
|
||||||
r = client.post(
|
r = client.post(
|
||||||
f"{settings.API_V1_STR}/login/test-token", headers=superuser_token_headers,
|
f"{settings.API_V1_STR}/login/test-token",
|
||||||
|
headers=superuser_token_headers,
|
||||||
)
|
)
|
||||||
result = r.json()
|
result = r.json()
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
|||||||
@@ -1,5 +1,3 @@
|
|||||||
from typing import Dict
|
|
||||||
|
|
||||||
from fastapi.testclient import TestClient
|
from fastapi.testclient import TestClient
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
@@ -10,7 +8,7 @@ from app.tests.utils.utils import random_email, random_lower_string
|
|||||||
|
|
||||||
|
|
||||||
def test_get_users_superuser_me(
|
def test_get_users_superuser_me(
|
||||||
client: TestClient, superuser_token_headers: Dict[str, str]
|
client: TestClient, superuser_token_headers: dict[str, str]
|
||||||
) -> None:
|
) -> None:
|
||||||
r = client.get(f"{settings.API_V1_STR}/users/me", headers=superuser_token_headers)
|
r = client.get(f"{settings.API_V1_STR}/users/me", headers=superuser_token_headers)
|
||||||
current_user = r.json()
|
current_user = r.json()
|
||||||
@@ -21,7 +19,7 @@ def test_get_users_superuser_me(
|
|||||||
|
|
||||||
|
|
||||||
def test_get_users_normal_user_me(
|
def test_get_users_normal_user_me(
|
||||||
client: TestClient, normal_user_token_headers: Dict[str, str]
|
client: TestClient, normal_user_token_headers: dict[str, str]
|
||||||
) -> None:
|
) -> None:
|
||||||
r = client.get(f"{settings.API_V1_STR}/users/me", headers=normal_user_token_headers)
|
r = client.get(f"{settings.API_V1_STR}/users/me", headers=normal_user_token_headers)
|
||||||
current_user = r.json()
|
current_user = r.json()
|
||||||
@@ -38,7 +36,9 @@ def test_create_user_new_email(
|
|||||||
password = random_lower_string()
|
password = random_lower_string()
|
||||||
data = {"email": username, "password": password}
|
data = {"email": username, "password": password}
|
||||||
r = client.post(
|
r = client.post(
|
||||||
f"{settings.API_V1_STR}/users/", headers=superuser_token_headers, json=data,
|
f"{settings.API_V1_STR}/users/",
|
||||||
|
headers=superuser_token_headers,
|
||||||
|
json=data,
|
||||||
)
|
)
|
||||||
assert 200 <= r.status_code < 300
|
assert 200 <= r.status_code < 300
|
||||||
created_user = r.json()
|
created_user = r.json()
|
||||||
@@ -56,7 +56,8 @@ def test_get_existing_user(
|
|||||||
user = crud.user.create(db, obj_in=user_in)
|
user = crud.user.create(db, obj_in=user_in)
|
||||||
user_id = user.id
|
user_id = user.id
|
||||||
r = client.get(
|
r = client.get(
|
||||||
f"{settings.API_V1_STR}/users/{user_id}", headers=superuser_token_headers,
|
f"{settings.API_V1_STR}/users/{user_id}",
|
||||||
|
headers=superuser_token_headers,
|
||||||
)
|
)
|
||||||
assert 200 <= r.status_code < 300
|
assert 200 <= r.status_code < 300
|
||||||
api_user = r.json()
|
api_user = r.json()
|
||||||
@@ -75,7 +76,9 @@ def test_create_user_existing_username(
|
|||||||
crud.user.create(db, obj_in=user_in)
|
crud.user.create(db, obj_in=user_in)
|
||||||
data = {"email": username, "password": password}
|
data = {"email": username, "password": password}
|
||||||
r = client.post(
|
r = client.post(
|
||||||
f"{settings.API_V1_STR}/users/", headers=superuser_token_headers, json=data,
|
f"{settings.API_V1_STR}/users/",
|
||||||
|
headers=superuser_token_headers,
|
||||||
|
json=data,
|
||||||
)
|
)
|
||||||
created_user = r.json()
|
created_user = r.json()
|
||||||
assert r.status_code == 400
|
assert r.status_code == 400
|
||||||
@@ -83,13 +86,15 @@ def test_create_user_existing_username(
|
|||||||
|
|
||||||
|
|
||||||
def test_create_user_by_normal_user(
|
def test_create_user_by_normal_user(
|
||||||
client: TestClient, normal_user_token_headers: Dict[str, str]
|
client: TestClient, normal_user_token_headers: dict[str, str]
|
||||||
) -> None:
|
) -> None:
|
||||||
username = random_email()
|
username = random_email()
|
||||||
password = random_lower_string()
|
password = random_lower_string()
|
||||||
data = {"email": username, "password": password}
|
data = {"email": username, "password": password}
|
||||||
r = client.post(
|
r = client.post(
|
||||||
f"{settings.API_V1_STR}/users/", headers=normal_user_token_headers, json=data,
|
f"{settings.API_V1_STR}/users/",
|
||||||
|
headers=normal_user_token_headers,
|
||||||
|
json=data,
|
||||||
)
|
)
|
||||||
assert r.status_code == 400
|
assert r.status_code == 400
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
from typing import Dict, Generator
|
from collections.abc import Generator
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from fastapi.testclient import TestClient
|
from fastapi.testclient import TestClient
|
||||||
@@ -24,12 +24,12 @@ def client() -> Generator:
|
|||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="module")
|
@pytest.fixture(scope="module")
|
||||||
def superuser_token_headers(client: TestClient) -> Dict[str, str]:
|
def superuser_token_headers(client: TestClient) -> dict[str, str]:
|
||||||
return get_superuser_token_headers(client)
|
return get_superuser_token_headers(client)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="module")
|
@pytest.fixture(scope="module")
|
||||||
def normal_user_token_headers(client: TestClient, db: Session) -> Dict[str, str]:
|
def normal_user_token_headers(client: TestClient, db: Session) -> dict[str, str]:
|
||||||
return authentication_token_from_email(
|
return authentication_token_from_email(
|
||||||
client=client, email=settings.EMAIL_TEST_USER, db=db
|
client=client, email=settings.EMAIL_TEST_USER, db=db
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -1,5 +1,3 @@
|
|||||||
from typing import Optional
|
|
||||||
|
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
from app import crud, models
|
from app import crud, models
|
||||||
@@ -8,7 +6,7 @@ from app.tests.utils.user import create_random_user
|
|||||||
from app.tests.utils.utils import random_lower_string
|
from app.tests.utils.utils import random_lower_string
|
||||||
|
|
||||||
|
|
||||||
def create_random_item(db: Session, *, owner_id: Optional[int] = None) -> models.Item:
|
def create_random_item(db: Session, *, owner_id: int | None = None) -> models.Item:
|
||||||
if owner_id is None:
|
if owner_id is None:
|
||||||
user = create_random_user(db)
|
user = create_random_user(db)
|
||||||
owner_id = user.id
|
owner_id = user.id
|
||||||
|
|||||||
@@ -1,5 +1,3 @@
|
|||||||
from typing import Dict
|
|
||||||
|
|
||||||
from fastapi.testclient import TestClient
|
from fastapi.testclient import TestClient
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
@@ -12,7 +10,7 @@ from app.tests.utils.utils import random_email, random_lower_string
|
|||||||
|
|
||||||
def user_authentication_headers(
|
def user_authentication_headers(
|
||||||
*, client: TestClient, email: str, password: str
|
*, client: TestClient, email: str, password: str
|
||||||
) -> Dict[str, str]:
|
) -> dict[str, str]:
|
||||||
data = {"username": email, "password": password}
|
data = {"username": email, "password": password}
|
||||||
|
|
||||||
r = client.post(f"{settings.API_V1_STR}/login/access-token", data=data)
|
r = client.post(f"{settings.API_V1_STR}/login/access-token", data=data)
|
||||||
@@ -32,7 +30,7 @@ def create_random_user(db: Session) -> User:
|
|||||||
|
|
||||||
def authentication_token_from_email(
|
def authentication_token_from_email(
|
||||||
*, client: TestClient, email: str, db: Session
|
*, client: TestClient, email: str, db: Session
|
||||||
) -> Dict[str, str]:
|
) -> dict[str, str]:
|
||||||
"""
|
"""
|
||||||
Return a valid token for the user with given email.
|
Return a valid token for the user with given email.
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
import random
|
import random
|
||||||
import string
|
import string
|
||||||
from typing import Dict
|
|
||||||
|
|
||||||
from fastapi.testclient import TestClient
|
from fastapi.testclient import TestClient
|
||||||
|
|
||||||
@@ -15,7 +14,7 @@ def random_email() -> str:
|
|||||||
return f"{random_lower_string()}@{random_lower_string()}.com"
|
return f"{random_lower_string()}@{random_lower_string()}.com"
|
||||||
|
|
||||||
|
|
||||||
def get_superuser_token_headers(client: TestClient) -> Dict[str, str]:
|
def get_superuser_token_headers(client: TestClient) -> dict[str, str]:
|
||||||
login_data = {
|
login_data = {
|
||||||
"username": settings.FIRST_SUPERUSER,
|
"username": settings.FIRST_SUPERUSER,
|
||||||
"password": settings.FIRST_SUPERUSER_PASSWORD,
|
"password": settings.FIRST_SUPERUSER_PASSWORD,
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import logging
|
import logging
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any, Dict, Optional
|
from typing import Any
|
||||||
|
|
||||||
import emails
|
import emails
|
||||||
from emails.template import JinjaTemplate
|
from emails.template import JinjaTemplate
|
||||||
@@ -14,8 +14,9 @@ def send_email(
|
|||||||
email_to: str,
|
email_to: str,
|
||||||
subject_template: str = "",
|
subject_template: str = "",
|
||||||
html_template: str = "",
|
html_template: str = "",
|
||||||
environment: Dict[str, Any] = {},
|
environment: dict[str, Any] | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
current_environment = environment or {}
|
||||||
assert settings.EMAILS_ENABLED, "no provided configuration for email variables"
|
assert settings.EMAILS_ENABLED, "no provided configuration for email variables"
|
||||||
message = emails.Message(
|
message = emails.Message(
|
||||||
subject=JinjaTemplate(subject_template),
|
subject=JinjaTemplate(subject_template),
|
||||||
@@ -29,7 +30,7 @@ def send_email(
|
|||||||
smtp_options["user"] = settings.SMTP_USER
|
smtp_options["user"] = settings.SMTP_USER
|
||||||
if settings.SMTP_PASSWORD:
|
if settings.SMTP_PASSWORD:
|
||||||
smtp_options["password"] = settings.SMTP_PASSWORD
|
smtp_options["password"] = settings.SMTP_PASSWORD
|
||||||
response = message.send(to=email_to, render=environment, smtp=smtp_options)
|
response = message.send(to=email_to, render=current_environment, smtp=smtp_options)
|
||||||
logging.info(f"send email result: {response}")
|
logging.info(f"send email result: {response}")
|
||||||
|
|
||||||
|
|
||||||
@@ -42,7 +43,7 @@ def send_test_email(email_to: str) -> None:
|
|||||||
email_to=email_to,
|
email_to=email_to,
|
||||||
subject_template=subject,
|
subject_template=subject,
|
||||||
html_template=template_str,
|
html_template=template_str,
|
||||||
environment={"project_name": settings.PROJECT_NAME, "email": email_to},
|
current_environment={"project_name": settings.PROJECT_NAME, "email": email_to},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -57,7 +58,7 @@ def send_reset_password_email(email_to: str, email: str, token: str) -> None:
|
|||||||
email_to=email_to,
|
email_to=email_to,
|
||||||
subject_template=subject,
|
subject_template=subject,
|
||||||
html_template=template_str,
|
html_template=template_str,
|
||||||
environment={
|
current_environment={
|
||||||
"project_name": settings.PROJECT_NAME,
|
"project_name": settings.PROJECT_NAME,
|
||||||
"username": email,
|
"username": email,
|
||||||
"email": email_to,
|
"email": email_to,
|
||||||
@@ -77,7 +78,7 @@ def send_new_account_email(email_to: str, username: str, password: str) -> None:
|
|||||||
email_to=email_to,
|
email_to=email_to,
|
||||||
subject_template=subject,
|
subject_template=subject,
|
||||||
html_template=template_str,
|
html_template=template_str,
|
||||||
environment={
|
current_environment={
|
||||||
"project_name": settings.PROJECT_NAME,
|
"project_name": settings.PROJECT_NAME,
|
||||||
"username": username,
|
"username": username,
|
||||||
"password": password,
|
"password": password,
|
||||||
@@ -93,12 +94,14 @@ def generate_password_reset_token(email: str) -> str:
|
|||||||
expires = now + delta
|
expires = now + delta
|
||||||
exp = expires.timestamp()
|
exp = expires.timestamp()
|
||||||
encoded_jwt = jwt.encode(
|
encoded_jwt = jwt.encode(
|
||||||
{"exp": exp, "nbf": now, "sub": email}, settings.SECRET_KEY, algorithm="HS256",
|
{"exp": exp, "nbf": now, "sub": email},
|
||||||
|
settings.SECRET_KEY,
|
||||||
|
algorithm="HS256",
|
||||||
)
|
)
|
||||||
return encoded_jwt
|
return encoded_jwt
|
||||||
|
|
||||||
|
|
||||||
def verify_password_reset_token(token: str) -> Optional[str]:
|
def verify_password_reset_token(token: str) -> str | None:
|
||||||
try:
|
try:
|
||||||
decoded_token = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
|
decoded_token = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
|
||||||
return decoded_token["email"]
|
return decoded_token["email"]
|
||||||
|
|||||||
Reference in New Issue
Block a user