♻️ Refactor project generation, discard cookiecutter, use plain git/clone/fork (#553)

This commit is contained in:
Sebastián Ramírez
2023-11-15 21:20:29 +01:00
committed by GitHub
parent 30c722339b
commit 455de4d9a9
166 changed files with 49 additions and 49 deletions

3
src/backend/app/.flake8 Normal file
View File

@@ -0,0 +1,3 @@
[flake8]
max-line-length = 88
exclude = .git,__pycache__,__init__.py,.mypy_cache,.pytest_cache

3
src/backend/app/.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
.mypy_cache
.coverage
htmlcov

71
src/backend/app/alembic.ini Executable file
View File

@@ -0,0 +1,71 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = alembic
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
#truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to alembic/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat alembic/versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

1
src/backend/app/alembic/README Executable file
View File

@@ -0,0 +1 @@
Generic single-database configuration.

87
src/backend/app/alembic/env.py Executable file
View File

@@ -0,0 +1,87 @@
from __future__ import with_statement
import os
from alembic import context
from sqlalchemy import engine_from_config, pool
from logging.config import fileConfig
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
# target_metadata = None
from app.db.base import Base # noqa
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def get_url():
user = os.getenv("POSTGRES_USER", "postgres")
password = os.getenv("POSTGRES_PASSWORD", "")
server = os.getenv("POSTGRES_SERVER", "db")
db = os.getenv("POSTGRES_DB", "app")
return f"postgresql://{user}:{password}@{server}/{db}"
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = get_url()
context.configure(
url=url, target_metadata=target_metadata, literal_binds=True, compare_type=True
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
configuration = config.get_section(config.config_ini_section)
configuration["sqlalchemy.url"] = get_url()
connectable = engine_from_config(
configuration, prefix="sqlalchemy.", poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata, compare_type=True
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

View File

@@ -0,0 +1,59 @@
"""First revision
Revision ID: d4867f3a4c0a
Revises:
Create Date: 2019-04-17 13:53:32.978401
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "d4867f3a4c0a"
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"user",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("full_name", sa.String(), nullable=True),
sa.Column("email", sa.String(), nullable=True),
sa.Column("hashed_password", sa.String(), nullable=True),
sa.Column("is_active", sa.Boolean(), nullable=True),
sa.Column("is_superuser", sa.Boolean(), nullable=True),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(op.f("ix_user_email"), "user", ["email"], unique=True)
op.create_index(op.f("ix_user_full_name"), "user", ["full_name"], unique=False)
op.create_index(op.f("ix_user_id"), "user", ["id"], unique=False)
op.create_table(
"item",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("title", sa.String(), nullable=True),
sa.Column("description", sa.String(), nullable=True),
sa.Column("owner_id", sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(["owner_id"], ["user.id"],),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(op.f("ix_item_description"), "item", ["description"], unique=False)
op.create_index(op.f("ix_item_id"), "item", ["id"], unique=False)
op.create_index(op.f("ix_item_title"), "item", ["title"], unique=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f("ix_item_title"), table_name="item")
op.drop_index(op.f("ix_item_id"), table_name="item")
op.drop_index(op.f("ix_item_description"), table_name="item")
op.drop_table("item")
op.drop_index(op.f("ix_user_id"), table_name="user")
op.drop_index(op.f("ix_user_full_name"), table_name="user")
op.drop_index(op.f("ix_user_email"), table_name="user")
op.drop_table("user")
# ### end Alembic commands ###

View File

View File

View File

@@ -0,0 +1,9 @@
from fastapi import APIRouter
from app.api.api_v1.endpoints import items, login, users, utils
api_router = APIRouter()
api_router.include_router(login.router, tags=["login"])
api_router.include_router(users.router, prefix="/users", tags=["users"])
api_router.include_router(utils.router, prefix="/utils", tags=["utils"])
api_router.include_router(items.router, prefix="/items", tags=["items"])

View File

@@ -0,0 +1,99 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/", response_model=List[schemas.Item])
def read_items(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Retrieve items.
"""
if crud.user.is_superuser(current_user):
items = crud.item.get_multi(db, skip=skip, limit=limit)
else:
items = crud.item.get_multi_by_owner(
db=db, owner_id=current_user.id, skip=skip, limit=limit
)
return items
@router.post("/", response_model=schemas.Item)
def create_item(
*,
db: Session = Depends(deps.get_db),
item_in: schemas.ItemCreate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Create new item.
"""
item = crud.item.create_with_owner(db=db, obj_in=item_in, owner_id=current_user.id)
return item
@router.put("/{id}", response_model=schemas.Item)
def update_item(
*,
db: Session = Depends(deps.get_db),
id: int,
item_in: schemas.ItemUpdate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update an item.
"""
item = crud.item.get(db=db, id=id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
if not crud.user.is_superuser(current_user) and (item.owner_id != current_user.id):
raise HTTPException(status_code=400, detail="Not enough permissions")
item = crud.item.update(db=db, db_obj=item, obj_in=item_in)
return item
@router.get("/{id}", response_model=schemas.Item)
def read_item(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get item by ID.
"""
item = crud.item.get(db=db, id=id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
if not crud.user.is_superuser(current_user) and (item.owner_id != current_user.id):
raise HTTPException(status_code=400, detail="Not enough permissions")
return item
@router.delete("/{id}", response_model=schemas.Item)
def delete_item(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Delete an item.
"""
item = crud.item.get(db=db, id=id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
if not crud.user.is_superuser(current_user) and (item.owner_id != current_user.id):
raise HTTPException(status_code=400, detail="Not enough permissions")
item = crud.item.remove(db=db, id=id)
return item

View File

@@ -0,0 +1,96 @@
from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Body, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
from app.core import security
from app.core.config import settings
from app.core.security import get_password_hash
from app.utils import (
generate_password_reset_token,
send_reset_password_email,
verify_password_reset_token,
)
router = APIRouter()
@router.post("/login/access-token", response_model=schemas.Token)
def login_access_token(
db: Session = Depends(deps.get_db), form_data: OAuth2PasswordRequestForm = Depends()
) -> Any:
"""
OAuth2 compatible token login, get an access token for future requests
"""
user = crud.user.authenticate(
db, email=form_data.username, password=form_data.password
)
if not user:
raise HTTPException(status_code=400, detail="Incorrect email or password")
elif not crud.user.is_active(user):
raise HTTPException(status_code=400, detail="Inactive user")
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return {
"access_token": security.create_access_token(
user.id, expires_delta=access_token_expires
),
"token_type": "bearer",
}
@router.post("/login/test-token", response_model=schemas.User)
def test_token(current_user: models.User = Depends(deps.get_current_user)) -> Any:
"""
Test access token
"""
return current_user
@router.post("/password-recovery/{email}", response_model=schemas.Msg)
def recover_password(email: str, db: Session = Depends(deps.get_db)) -> Any:
"""
Password Recovery
"""
user = crud.user.get_by_email(db, email=email)
if not user:
raise HTTPException(
status_code=404,
detail="The user with this username does not exist in the system.",
)
password_reset_token = generate_password_reset_token(email=email)
send_reset_password_email(
email_to=user.email, email=email, token=password_reset_token
)
return {"msg": "Password recovery email sent"}
@router.post("/reset-password/", response_model=schemas.Msg)
def reset_password(
token: str = Body(...),
new_password: str = Body(...),
db: Session = Depends(deps.get_db),
) -> Any:
"""
Reset password
"""
email = verify_password_reset_token(token)
if not email:
raise HTTPException(status_code=400, detail="Invalid token")
user = crud.user.get_by_email(db, email=email)
if not user:
raise HTTPException(
status_code=404,
detail="The user with this username does not exist in the system.",
)
elif not crud.user.is_active(user):
raise HTTPException(status_code=400, detail="Inactive user")
hashed_password = get_password_hash(new_password)
user.hashed_password = hashed_password
db.add(user)
db.commit()
return {"msg": "Password updated successfully"}

View File

@@ -0,0 +1,153 @@
from typing import Any, List
from fastapi import APIRouter, Body, Depends, HTTPException
from fastapi.encoders import jsonable_encoder
from pydantic.networks import EmailStr
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
from app.core.config import settings
from app.utils import send_new_account_email
router = APIRouter()
@router.get("/", response_model=List[schemas.User])
def read_users(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Retrieve users.
"""
users = crud.user.get_multi(db, skip=skip, limit=limit)
return users
@router.post("/", response_model=schemas.User)
def create_user(
*,
db: Session = Depends(deps.get_db),
user_in: schemas.UserCreate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Create new user.
"""
user = crud.user.get_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=400,
detail="The user with this username already exists in the system.",
)
user = crud.user.create(db, obj_in=user_in)
if settings.EMAILS_ENABLED and user_in.email:
send_new_account_email(
email_to=user_in.email, username=user_in.email, password=user_in.password
)
return user
@router.put("/me", response_model=schemas.User)
def update_user_me(
*,
db: Session = Depends(deps.get_db),
password: str = Body(None),
full_name: str = Body(None),
email: EmailStr = Body(None),
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update own user.
"""
current_user_data = jsonable_encoder(current_user)
user_in = schemas.UserUpdate(**current_user_data)
if password is not None:
user_in.password = password
if full_name is not None:
user_in.full_name = full_name
if email is not None:
user_in.email = email
user = crud.user.update(db, db_obj=current_user, obj_in=user_in)
return user
@router.get("/me", response_model=schemas.User)
def read_user_me(
db: Session = Depends(deps.get_db),
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get current user.
"""
return current_user
@router.post("/open", response_model=schemas.User)
def create_user_open(
*,
db: Session = Depends(deps.get_db),
password: str = Body(...),
email: EmailStr = Body(...),
full_name: str = Body(None),
) -> Any:
"""
Create new user without the need to be logged in.
"""
if not settings.USERS_OPEN_REGISTRATION:
raise HTTPException(
status_code=403,
detail="Open user registration is forbidden on this server",
)
user = crud.user.get_by_email(db, email=email)
if user:
raise HTTPException(
status_code=400,
detail="The user with this username already exists in the system",
)
user_in = schemas.UserCreate(password=password, email=email, full_name=full_name)
user = crud.user.create(db, obj_in=user_in)
return user
@router.get("/{user_id}", response_model=schemas.User)
def read_user_by_id(
user_id: int,
current_user: models.User = Depends(deps.get_current_active_user),
db: Session = Depends(deps.get_db),
) -> Any:
"""
Get a specific user by id.
"""
user = crud.user.get(db, id=user_id)
if user == current_user:
return user
if not crud.user.is_superuser(current_user):
raise HTTPException(
status_code=400, detail="The user doesn't have enough privileges"
)
return user
@router.put("/{user_id}", response_model=schemas.User)
def update_user(
*,
db: Session = Depends(deps.get_db),
user_id: int,
user_in: schemas.UserUpdate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Update a user.
"""
user = crud.user.get(db, id=user_id)
if not user:
raise HTTPException(
status_code=404,
detail="The user with this username does not exist in the system",
)
user = crud.user.update(db, db_obj=user, obj_in=user_in)
return user

View File

@@ -0,0 +1,35 @@
from typing import Any
from fastapi import APIRouter, Depends
from pydantic.networks import EmailStr
from app import models, schemas
from app.api import deps
from app.core.celery_app import celery_app
from app.utils import send_test_email
router = APIRouter()
@router.post("/test-celery/", response_model=schemas.Msg, status_code=201)
def test_celery(
msg: schemas.Msg,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Test Celery worker.
"""
celery_app.send_task("app.worker.test_celery", args=[msg.msg])
return {"msg": "Word received"}
@router.post("/test-email/", response_model=schemas.Msg, status_code=201)
def test_email(
email_to: EmailStr,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Test emails.
"""
send_test_email(email_to=email_to)
return {"msg": "Test email sent"}

View File

@@ -0,0 +1,61 @@
from typing import Generator
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt
from pydantic import ValidationError
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.core import security
from app.core.config import settings
from app.db.session import SessionLocal
reusable_oauth2 = OAuth2PasswordBearer(
tokenUrl=f"{settings.API_V1_STR}/login/access-token"
)
def get_db() -> Generator:
try:
db = SessionLocal()
yield db
finally:
db.close()
def get_current_user(
db: Session = Depends(get_db), token: str = Depends(reusable_oauth2)
) -> models.User:
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[security.ALGORITHM]
)
token_data = schemas.TokenPayload(**payload)
except (jwt.JWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate credentials",
)
user = crud.user.get(db, id=token_data.sub)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
def get_current_active_user(
current_user: models.User = Depends(get_current_user),
) -> models.User:
if not crud.user.is_active(current_user):
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
def get_current_active_superuser(
current_user: models.User = Depends(get_current_user),
) -> models.User:
if not crud.user.is_superuser(current_user):
raise HTTPException(
status_code=400, detail="The user doesn't have enough privileges"
)
return current_user

View File

@@ -0,0 +1,37 @@
import logging
from tenacity import after_log, before_log, retry, stop_after_attempt, wait_fixed
from app.db.session import SessionLocal
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
max_tries = 60 * 5 # 5 minutes
wait_seconds = 1
@retry(
stop=stop_after_attempt(max_tries),
wait=wait_fixed(wait_seconds),
before=before_log(logger, logging.INFO),
after=after_log(logger, logging.WARN),
)
def init() -> None:
try:
db = SessionLocal()
# Try to create session to check if DB is awake
db.execute("SELECT 1")
except Exception as e:
logger.error(e)
raise e
def main() -> None:
logger.info("Initializing service")
init()
logger.info("Service finished initializing")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,37 @@
import logging
from tenacity import after_log, before_log, retry, stop_after_attempt, wait_fixed
from app.db.session import SessionLocal
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
max_tries = 60 * 5 # 5 minutes
wait_seconds = 1
@retry(
stop=stop_after_attempt(max_tries),
wait=wait_fixed(wait_seconds),
before=before_log(logger, logging.INFO),
after=after_log(logger, logging.WARN),
)
def init() -> None:
try:
# Try to create session to check if DB is awake
db = SessionLocal()
db.execute("SELECT 1")
except Exception as e:
logger.error(e)
raise e
def main() -> None:
logger.info("Initializing service")
init()
logger.info("Service finished initializing")
if __name__ == "__main__":
main()

View File

View File

@@ -0,0 +1,5 @@
from celery import Celery
celery_app = Celery("worker", broker="amqp://guest@queue//")
celery_app.conf.task_routes = {"app.worker.test_celery": "main-queue"}

View File

@@ -0,0 +1,89 @@
import secrets
from typing import Any, Dict, List, Optional, Union
from pydantic import AnyHttpUrl, BaseSettings, EmailStr, HttpUrl, PostgresDsn, validator
class Settings(BaseSettings):
API_V1_STR: str = "/api/v1"
SECRET_KEY: str = secrets.token_urlsafe(32)
# 60 minutes * 24 hours * 8 days = 8 days
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 8
SERVER_NAME: str
SERVER_HOST: AnyHttpUrl
# BACKEND_CORS_ORIGINS is a JSON-formatted list of origins
# e.g: '["http://localhost", "http://localhost:4200", "http://localhost:3000", \
# "http://localhost:8080", "http://local.dockertoolbox.tiangolo.com"]'
BACKEND_CORS_ORIGINS: List[AnyHttpUrl] = []
@validator("BACKEND_CORS_ORIGINS", pre=True)
def assemble_cors_origins(cls, v: Union[str, List[str]]) -> Union[List[str], str]:
if isinstance(v, str) and not v.startswith("["):
return [i.strip() for i in v.split(",")]
elif isinstance(v, (list, str)):
return v
raise ValueError(v)
PROJECT_NAME: str
SENTRY_DSN: Optional[HttpUrl] = None
@validator("SENTRY_DSN", pre=True)
def sentry_dsn_can_be_blank(cls, v: str) -> Optional[str]:
if len(v) == 0:
return None
return v
POSTGRES_SERVER: str
POSTGRES_USER: str
POSTGRES_PASSWORD: str
POSTGRES_DB: str
SQLALCHEMY_DATABASE_URI: Optional[PostgresDsn] = None
@validator("SQLALCHEMY_DATABASE_URI", pre=True)
def assemble_db_connection(cls, v: Optional[str], values: Dict[str, Any]) -> Any:
if isinstance(v, str):
return v
return PostgresDsn.build(
scheme="postgresql",
user=values.get("POSTGRES_USER"),
password=values.get("POSTGRES_PASSWORD"),
host=values.get("POSTGRES_SERVER"),
path=f"/{values.get('POSTGRES_DB') or ''}",
)
SMTP_TLS: bool = True
SMTP_PORT: Optional[int] = None
SMTP_HOST: Optional[str] = None
SMTP_USER: Optional[str] = None
SMTP_PASSWORD: Optional[str] = None
EMAILS_FROM_EMAIL: Optional[EmailStr] = None
EMAILS_FROM_NAME: Optional[str] = None
@validator("EMAILS_FROM_NAME")
def get_project_name(cls, v: Optional[str], values: Dict[str, Any]) -> str:
if not v:
return values["PROJECT_NAME"]
return v
EMAIL_RESET_TOKEN_EXPIRE_HOURS: int = 48
EMAIL_TEMPLATES_DIR: str = "/app/app/email-templates/build"
EMAILS_ENABLED: bool = False
@validator("EMAILS_ENABLED", pre=True)
def get_emails_enabled(cls, v: bool, values: Dict[str, Any]) -> bool:
return bool(
values.get("SMTP_HOST")
and values.get("SMTP_PORT")
and values.get("EMAILS_FROM_EMAIL")
)
EMAIL_TEST_USER: EmailStr = "test@example.com" # type: ignore
FIRST_SUPERUSER: EmailStr
FIRST_SUPERUSER_PASSWORD: str
USERS_OPEN_REGISTRATION: bool = False
class Config:
case_sensitive = True
settings = Settings()

View File

@@ -0,0 +1,34 @@
from datetime import datetime, timedelta
from typing import Any, Union
from jose import jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
ALGORITHM = "HS256"
def create_access_token(
subject: Union[str, Any], expires_delta: timedelta = None
) -> str:
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)

View File

@@ -0,0 +1,10 @@
from .crud_item import item
from .crud_user import user
# For a new basic set of CRUD operations you could just do
# from .base import CRUDBase
# from app.models.item import Item
# from app.schemas.item import ItemCreate, ItemUpdate
# item = CRUDBase[Item, ItemCreate, ItemUpdate](Item)

View File

@@ -0,0 +1,66 @@
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.db.base_class import Base
ModelType = TypeVar("ModelType", bound=Base)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
def __init__(self, model: Type[ModelType]):
"""
CRUD object with default methods to Create, Read, Update, Delete (CRUD).
**Parameters**
* `model`: A SQLAlchemy model class
* `schema`: A Pydantic model (schema) class
"""
self.model = model
def get(self, db: Session, id: Any) -> Optional[ModelType]:
return db.query(self.model).filter(self.model.id == id).first()
def get_multi(
self, db: Session, *, skip: int = 0, limit: int = 100
) -> List[ModelType]:
return db.query(self.model).offset(skip).limit(limit).all()
def create(self, db: Session, *, obj_in: CreateSchemaType) -> ModelType:
obj_in_data = jsonable_encoder(obj_in)
db_obj = self.model(**obj_in_data) # type: ignore
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self,
db: Session,
*,
db_obj: ModelType,
obj_in: Union[UpdateSchemaType, Dict[str, Any]]
) -> ModelType:
obj_data = jsonable_encoder(db_obj)
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.dict(exclude_unset=True)
for field in obj_data:
if field in update_data:
setattr(db_obj, field, update_data[field])
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def remove(self, db: Session, *, id: int) -> ModelType:
obj = db.query(self.model).get(id)
db.delete(obj)
db.commit()
return obj

View File

@@ -0,0 +1,34 @@
from typing import List
from fastapi.encoders import jsonable_encoder
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.item import Item
from app.schemas.item import ItemCreate, ItemUpdate
class CRUDItem(CRUDBase[Item, ItemCreate, ItemUpdate]):
def create_with_owner(
self, db: Session, *, obj_in: ItemCreate, owner_id: int
) -> Item:
obj_in_data = jsonable_encoder(obj_in)
db_obj = self.model(**obj_in_data, owner_id=owner_id)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def get_multi_by_owner(
self, db: Session, *, owner_id: int, skip: int = 0, limit: int = 100
) -> List[Item]:
return (
db.query(self.model)
.filter(Item.owner_id == owner_id)
.offset(skip)
.limit(limit)
.all()
)
item = CRUDItem(Item)

View File

@@ -0,0 +1,55 @@
from typing import Any, Dict, Optional, Union
from sqlalchemy.orm import Session
from app.core.security import get_password_hash, verify_password
from app.crud.base import CRUDBase
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
def get_by_email(self, db: Session, *, email: str) -> Optional[User]:
return db.query(User).filter(User.email == email).first()
def create(self, db: Session, *, obj_in: UserCreate) -> User:
db_obj = User(
email=obj_in.email,
hashed_password=get_password_hash(obj_in.password),
full_name=obj_in.full_name,
is_superuser=obj_in.is_superuser,
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self, db: Session, *, db_obj: User, obj_in: Union[UserUpdate, Dict[str, Any]]
) -> User:
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.dict(exclude_unset=True)
if update_data["password"]:
hashed_password = get_password_hash(update_data["password"])
del update_data["password"]
update_data["hashed_password"] = hashed_password
return super().update(db, db_obj=db_obj, obj_in=update_data)
def authenticate(self, db: Session, *, email: str, password: str) -> Optional[User]:
user = self.get_by_email(db, email=email)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def is_active(self, user: User) -> bool:
return user.is_active
def is_superuser(self, user: User) -> bool:
return user.is_superuser
user = CRUDUser(User)

View File

View File

@@ -0,0 +1,5 @@
# Import all the models, so that Base has them before being
# imported by Alembic
from app.db.base_class import Base # noqa
from app.models.item import Item # noqa
from app.models.user import User # noqa

View File

@@ -0,0 +1,13 @@
from typing import Any
from sqlalchemy.ext.declarative import as_declarative, declared_attr
@as_declarative()
class Base:
id: Any
__name__: str
# Generate __tablename__ automatically
@declared_attr
def __tablename__(cls) -> str:
return cls.__name__.lower()

View File

@@ -0,0 +1,25 @@
from sqlalchemy.orm import Session
from app import crud, schemas
from app.core.config import settings
from app.db import base # noqa: F401
# make sure all SQL Alchemy models are imported (app.db.base) before initializing DB
# otherwise, SQL Alchemy might fail to initialize relationships properly
# for more details: https://github.com/tiangolo/full-stack-fastapi-postgresql/issues/28
def init_db(db: Session) -> None:
# Tables should be created with Alembic migrations
# But if you don't want to use migrations, create
# the tables un-commenting the next line
# Base.metadata.create_all(bind=engine)
user = crud.user.get_by_email(db, email=settings.FIRST_SUPERUSER)
if not user:
user_in = schemas.UserCreate(
email=settings.FIRST_SUPERUSER,
password=settings.FIRST_SUPERUSER_PASSWORD,
is_superuser=True,
)
user = crud.user.create(db, obj_in=user_in) # noqa: F841

View File

@@ -0,0 +1,7 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.core.config import settings
engine = create_engine(settings.SQLALCHEMY_DATABASE_URI, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

View File

@@ -0,0 +1,26 @@
<!doctype html><html xmlns="http://www.w3.org/1999/xhtml" xmlns:v="urn:schemas-microsoft-com:vml" xmlns:o="urn:schemas-microsoft-com:office:office"><head><title></title><!--[if !mso]><!-- --><meta http-equiv="X-UA-Compatible" content="IE=edge"><!--<![endif]--><meta http-equiv="Content-Type" content="text/html; charset=UTF-8"><meta name="viewport" content="width=device-width,initial-scale=1"><style type="text/css">#outlook a { padding:0; }
.ReadMsgBody { width:100%; }
.ExternalClass { width:100%; }
.ExternalClass * { line-height:100%; }
body { margin:0;padding:0;-webkit-text-size-adjust:100%;-ms-text-size-adjust:100%; }
table, td { border-collapse:collapse;mso-table-lspace:0pt;mso-table-rspace:0pt; }
img { border:0;height:auto;line-height:100%; outline:none;text-decoration:none;-ms-interpolation-mode:bicubic; }
p { display:block;margin:13px 0; }</style><!--[if !mso]><!--><style type="text/css">@media only screen and (max-width:480px) {
@-ms-viewport { width:320px; }
@viewport { width:320px; }
}</style><!--<![endif]--><!--[if mso]>
<xml>
<o:OfficeDocumentSettings>
<o:AllowPNG/>
<o:PixelsPerInch>96</o:PixelsPerInch>
</o:OfficeDocumentSettings>
</xml>
<![endif]--><!--[if lte mso 11]>
<style type="text/css">
.outlook-group-fix { width:100% !important; }
</style>
<![endif]--><!--[if !mso]><!--><link href="https://fonts.googleapis.com/css?family=Ubuntu:300,400,500,700" rel="stylesheet" type="text/css"><style type="text/css">@import url(https://fonts.googleapis.com/css?family=Ubuntu:300,400,500,700);</style><!--<![endif]--><style type="text/css">@media only screen and (min-width:480px) {
.mj-column-per-100 { width:100% !important; max-width: 100%; }
}</style><style type="text/css"></style></head><body style="background-color:#ffffff;"><div style="background-color:#ffffff;"><!--[if mso | IE]><table align="center" border="0" cellpadding="0" cellspacing="0" class="" style="width:600px;" width="600" ><tr><td style="line-height:0px;font-size:0px;mso-line-height-rule:exactly;"><![endif]--><div style="Margin:0px auto;max-width:600px;"><table align="center" border="0" cellpadding="0" cellspacing="0" role="presentation" style="width:100%;"><tbody><tr><td style="direction:ltr;font-size:0px;padding:20px 0;text-align:center;vertical-align:top;"><!--[if mso | IE]><table role="presentation" border="0" cellpadding="0" cellspacing="0"><tr><td class="" style="vertical-align:top;width:600px;" ><![endif]--><div class="mj-column-per-100 outlook-group-fix" style="font-size:13px;text-align:left;direction:ltr;display:inline-block;vertical-align:top;width:100%;"><table border="0" cellpadding="0" cellspacing="0" role="presentation" style="vertical-align:top;" width="100%"><tr><td style="font-size:0px;padding:10px 25px;word-break:break-word;"><p style="border-top:solid 4px #555555;font-size:1;margin:0px auto;width:100%;"></p><!--[if mso | IE]><table align="center" border="0" cellpadding="0" cellspacing="0" style="border-top:solid 4px #555555;font-size:1;margin:0px auto;width:550px;" role="presentation" width="550px" ><tr><td style="height:0;line-height:0;"> &nbsp;
</td></tr></table><![endif]--></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:helvetica;font-size:20px;line-height:1;text-align:left;color:#555555;">{{ project_name }} - New Account</div></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:Ubuntu, Helvetica, Arial, sans-serif;font-size:16px;line-height:1;text-align:left;color:#555555;">You have a new account:</div></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:Ubuntu, Helvetica, Arial, sans-serif;font-size:16px;line-height:1;text-align:left;color:#555555;">Username: {{ username }}</div></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:Ubuntu, Helvetica, Arial, sans-serif;font-size:16px;line-height:1;text-align:left;color:#555555;">Password: {{ password }}</div></td></tr><tr><td align="center" vertical-align="middle" style="font-size:0px;padding:50px 0px;word-break:break-word;"><table border="0" cellpadding="0" cellspacing="0" role="presentation" style="border-collapse:separate;line-height:100%;"><tr><td align="center" bgcolor="#414141" role="presentation" style="border:none;border-radius:3px;cursor:auto;padding:10px 25px;background:#414141;" valign="middle"><a href="{{ link }}" style="background:#414141;color:#ffffff;font-family:Ubuntu, Helvetica, Arial, sans-serif;font-size:13px;font-weight:normal;line-height:120%;Margin:0;text-decoration:none;text-transform:none;" target="_blank">Go to Dashboard</a></td></tr></table></td></tr><tr><td style="font-size:0px;padding:10px 25px;word-break:break-word;"><p style="border-top:solid 2px #555555;font-size:1;margin:0px auto;width:100%;"></p><!--[if mso | IE]><table align="center" border="0" cellpadding="0" cellspacing="0" style="border-top:solid 2px #555555;font-size:1;margin:0px auto;width:550px;" role="presentation" width="550px" ><tr><td style="height:0;line-height:0;"> &nbsp;
</td></tr></table><![endif]--></td></tr></table></div><!--[if mso | IE]></td></tr></table><![endif]--></td></tr></tbody></table></div><!--[if mso | IE]></td></tr></table><![endif]--></div></body></html>

View File

@@ -0,0 +1,26 @@
<!doctype html><html xmlns="http://www.w3.org/1999/xhtml" xmlns:v="urn:schemas-microsoft-com:vml" xmlns:o="urn:schemas-microsoft-com:office:office"><head><title></title><!--[if !mso]><!-- --><meta http-equiv="X-UA-Compatible" content="IE=edge"><!--<![endif]--><meta http-equiv="Content-Type" content="text/html; charset=UTF-8"><meta name="viewport" content="width=device-width,initial-scale=1"><style type="text/css">#outlook a { padding:0; }
.ReadMsgBody { width:100%; }
.ExternalClass { width:100%; }
.ExternalClass * { line-height:100%; }
body { margin:0;padding:0;-webkit-text-size-adjust:100%;-ms-text-size-adjust:100%; }
table, td { border-collapse:collapse;mso-table-lspace:0pt;mso-table-rspace:0pt; }
img { border:0;height:auto;line-height:100%; outline:none;text-decoration:none;-ms-interpolation-mode:bicubic; }
p { display:block;margin:13px 0; }</style><!--[if !mso]><!--><style type="text/css">@media only screen and (max-width:480px) {
@-ms-viewport { width:320px; }
@viewport { width:320px; }
}</style><!--<![endif]--><!--[if mso]>
<xml>
<o:OfficeDocumentSettings>
<o:AllowPNG/>
<o:PixelsPerInch>96</o:PixelsPerInch>
</o:OfficeDocumentSettings>
</xml>
<![endif]--><!--[if lte mso 11]>
<style type="text/css">
.outlook-group-fix { width:100% !important; }
</style>
<![endif]--><!--[if !mso]><!--><link href="https://fonts.googleapis.com/css?family=Ubuntu:300,400,500,700" rel="stylesheet" type="text/css"><style type="text/css">@import url(https://fonts.googleapis.com/css?family=Ubuntu:300,400,500,700);</style><!--<![endif]--><style type="text/css">@media only screen and (min-width:480px) {
.mj-column-per-100 { width:100% !important; max-width: 100%; }
}</style><style type="text/css"></style></head><body style="background-color:#ffffff;"><div style="background-color:#ffffff;"><!--[if mso | IE]><table align="center" border="0" cellpadding="0" cellspacing="0" class="" style="width:600px;" width="600" ><tr><td style="line-height:0px;font-size:0px;mso-line-height-rule:exactly;"><![endif]--><div style="Margin:0px auto;max-width:600px;"><table align="center" border="0" cellpadding="0" cellspacing="0" role="presentation" style="width:100%;"><tbody><tr><td style="direction:ltr;font-size:0px;padding:20px 0;text-align:center;vertical-align:top;"><!--[if mso | IE]><table role="presentation" border="0" cellpadding="0" cellspacing="0"><tr><td class="" style="vertical-align:top;width:600px;" ><![endif]--><div class="mj-column-per-100 outlook-group-fix" style="font-size:13px;text-align:left;direction:ltr;display:inline-block;vertical-align:top;width:100%;"><table border="0" cellpadding="0" cellspacing="0" role="presentation" style="vertical-align:top;" width="100%"><tr><td style="font-size:0px;padding:10px 25px;word-break:break-word;"><p style="border-top:solid 4px #555555;font-size:1;margin:0px auto;width:100%;"></p><!--[if mso | IE]><table align="center" border="0" cellpadding="0" cellspacing="0" style="border-top:solid 4px #555555;font-size:1;margin:0px auto;width:550px;" role="presentation" width="550px" ><tr><td style="height:0;line-height:0;"> &nbsp;
</td></tr></table><![endif]--></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:helvetica;font-size:20px;line-height:1;text-align:left;color:#555555;">{{ project_name }} - Password Recovery</div></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:Ubuntu, Helvetica, Arial, sans-serif;font-size:16px;line-height:1;text-align:left;color:#555555;">We received a request to recover the password for user {{ username }} with email {{ email }}</div></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:Ubuntu, Helvetica, Arial, sans-serif;font-size:16px;line-height:1;text-align:left;color:#555555;">Reset your password by clicking the button below:</div></td></tr><tr><td align="center" vertical-align="middle" style="font-size:0px;padding:50px 0px;word-break:break-word;"><table border="0" cellpadding="0" cellspacing="0" role="presentation" style="border-collapse:separate;line-height:100%;"><tr><td align="center" bgcolor="#414141" role="presentation" style="border:none;border-radius:3px;cursor:auto;padding:10px 25px;background:#414141;" valign="middle"><a href="{{ link }}" style="background:#414141;color:#ffffff;font-family:Ubuntu, Helvetica, Arial, sans-serif;font-size:13px;font-weight:normal;line-height:120%;Margin:0;text-decoration:none;text-transform:none;" target="_blank">Reset Password</a></td></tr></table></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:Ubuntu, Helvetica, Arial, sans-serif;font-size:16px;line-height:1;text-align:left;color:#555555;">Or open the following link:</div></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:Ubuntu, Helvetica, Arial, sans-serif;font-size:16px;line-height:1;text-align:left;color:#555555;"><a href="{{ link }}">{{ link }}</a></div></td></tr><tr><td style="font-size:0px;padding:10px 25px;word-break:break-word;"><p style="border-top:solid 2px #555555;font-size:1;margin:0px auto;width:100%;"></p><!--[if mso | IE]><table align="center" border="0" cellpadding="0" cellspacing="0" style="border-top:solid 2px #555555;font-size:1;margin:0px auto;width:550px;" role="presentation" width="550px" ><tr><td style="height:0;line-height:0;"> &nbsp;
</td></tr></table><![endif]--></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:Ubuntu, Helvetica, Arial, sans-serif;font-size:14px;line-height:1;text-align:left;color:#555555;">The reset password link / button will expire in {{ valid_hours }} hours.</div></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:Ubuntu, Helvetica, Arial, sans-serif;font-size:14px;line-height:1;text-align:left;color:#555555;">If you didn't request a password recovery you can disregard this email.</div></td></tr></table></div><!--[if mso | IE]></td></tr></table><![endif]--></td></tr></tbody></table></div><!--[if mso | IE]></td></tr></table><![endif]--></div></body></html>

View File

@@ -0,0 +1,25 @@
<!doctype html><html xmlns="http://www.w3.org/1999/xhtml" xmlns:v="urn:schemas-microsoft-com:vml" xmlns:o="urn:schemas-microsoft-com:office:office"><head><title></title><!--[if !mso]><!-- --><meta http-equiv="X-UA-Compatible" content="IE=edge"><!--<![endif]--><meta http-equiv="Content-Type" content="text/html; charset=UTF-8"><meta name="viewport" content="width=device-width,initial-scale=1"><style type="text/css">#outlook a { padding:0; }
.ReadMsgBody { width:100%; }
.ExternalClass { width:100%; }
.ExternalClass * { line-height:100%; }
body { margin:0;padding:0;-webkit-text-size-adjust:100%;-ms-text-size-adjust:100%; }
table, td { border-collapse:collapse;mso-table-lspace:0pt;mso-table-rspace:0pt; }
img { border:0;height:auto;line-height:100%; outline:none;text-decoration:none;-ms-interpolation-mode:bicubic; }
p { display:block;margin:13px 0; }</style><!--[if !mso]><!--><style type="text/css">@media only screen and (max-width:480px) {
@-ms-viewport { width:320px; }
@viewport { width:320px; }
}</style><!--<![endif]--><!--[if mso]>
<xml>
<o:OfficeDocumentSettings>
<o:AllowPNG/>
<o:PixelsPerInch>96</o:PixelsPerInch>
</o:OfficeDocumentSettings>
</xml>
<![endif]--><!--[if lte mso 11]>
<style type="text/css">
.outlook-group-fix { width:100% !important; }
</style>
<![endif]--><!--[if !mso]><!--><link href="https://fonts.googleapis.com/css?family=Ubuntu:300,400,500,700" rel="stylesheet" type="text/css"><style type="text/css">@import url(https://fonts.googleapis.com/css?family=Ubuntu:300,400,500,700);</style><!--<![endif]--><style type="text/css">@media only screen and (min-width:480px) {
.mj-column-per-100 { width:100% !important; max-width: 100%; }
}</style><style type="text/css"></style></head><body style="background-color:#ffffff;"><div style="background-color:#ffffff;"><!--[if mso | IE]><table align="center" border="0" cellpadding="0" cellspacing="0" class="" style="width:600px;" width="600" ><tr><td style="line-height:0px;font-size:0px;mso-line-height-rule:exactly;"><![endif]--><div style="Margin:0px auto;max-width:600px;"><table align="center" border="0" cellpadding="0" cellspacing="0" role="presentation" style="width:100%;"><tbody><tr><td style="direction:ltr;font-size:0px;padding:20px 0;text-align:center;vertical-align:top;"><!--[if mso | IE]><table role="presentation" border="0" cellpadding="0" cellspacing="0"><tr><td class="" style="vertical-align:top;width:600px;" ><![endif]--><div class="mj-column-per-100 outlook-group-fix" style="font-size:13px;text-align:left;direction:ltr;display:inline-block;vertical-align:top;width:100%;"><table border="0" cellpadding="0" cellspacing="0" role="presentation" style="vertical-align:top;" width="100%"><tr><td style="font-size:0px;padding:10px 25px;word-break:break-word;"><p style="border-top:solid 4px #555555;font-size:1;margin:0px auto;width:100%;"></p><!--[if mso | IE]><table align="center" border="0" cellpadding="0" cellspacing="0" style="border-top:solid 4px #555555;font-size:1;margin:0px auto;width:550px;" role="presentation" width="550px" ><tr><td style="height:0;line-height:0;"> &nbsp;
</td></tr></table><![endif]--></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:helvetica;font-size:20px;line-height:1;text-align:left;color:#555555;">{{ project_name }}</div></td></tr><tr><td align="left" style="font-size:0px;padding:10px 25px;word-break:break-word;"><div style="font-family:Ubuntu, Helvetica, Arial, sans-serif;font-size:16px;line-height:1;text-align:left;color:#555555;">Test email for: {{ email }}</div></td></tr></table></div><!--[if mso | IE]></td></tr></table><![endif]--></td></tr></tbody></table></div><!--[if mso | IE]></td></tr></table><![endif]--></div></body></html>

View File

@@ -0,0 +1,15 @@
<mjml>
<mj-body background-color="#fff">
<mj-section>
<mj-column>
<mj-divider border-color="#555"></mj-divider>
<mj-text font-size="20px" color="#555" font-family="helvetica">{{ project_name }} - New Account</mj-text>
<mj-text font-size="16px" color="#555">You have a new account:</mj-text>
<mj-text font-size="16px" color="#555">Username: {{ username }}</mj-text>
<mj-text font-size="16px" color="#555">Password: {{ password }}</mj-text>
<mj-button padding="50px 0px" href="{{ link }}">Go to Dashboard</mj-button>
<mj-divider border-color="#555" border-width="2px" />
</mj-column>
</mj-section>
</mj-body>
</mjml>

View File

@@ -0,0 +1,19 @@
<mjml>
<mj-body background-color="#fff">
<mj-section>
<mj-column>
<mj-divider border-color="#555"></mj-divider>
<mj-text font-size="20px" color="#555" font-family="helvetica">{{ project_name }} - Password Recovery</mj-text>
<mj-text font-size="16px" color="#555">We received a request to recover the password for user {{ username }}
with email {{ email }}</mj-text>
<mj-text font-size="16px" color="#555">Reset your password by clicking the button below:</mj-text>
<mj-button padding="50px 0px" href="{{ link }}">Reset Password</mj-button>
<mj-text font-size="16px" color="#555">Or open the following link:</mj-text>
<mj-text font-size="16px" color="#555"><a href="{{ link }}">{{ link }}</a></mj-text>
<mj-divider border-color="#555" border-width="2px" />
<mj-text font-size="14px" color="#555">The reset password link / button will expire in {{ valid_hours }} hours.</mj-text>
<mj-text font-size="14px" color="#555">If you didn't request a password recovery you can disregard this email.</mj-text>
</mj-column>
</mj-section>
</mj-body>
</mjml>

View File

@@ -0,0 +1,11 @@
<mjml>
<mj-body background-color="#fff">
<mj-section>
<mj-column>
<mj-divider border-color="#555"></mj-divider>
<mj-text font-size="20px" color="#555" font-family="helvetica">{{ project_name }}</mj-text>
<mj-text font-size="16px" color="#555">Test email for: {{ email }}</mj-text>
</mj-column>
</mj-section>
</mj-body>
</mjml>

View File

@@ -0,0 +1,22 @@
import logging
from app.db.init_db import init_db
from app.db.session import SessionLocal
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def init() -> None:
db = SessionLocal()
init_db(db)
def main() -> None:
logger.info("Creating initial data")
init()
logger.info("Initial data created")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,21 @@
from fastapi import FastAPI
from starlette.middleware.cors import CORSMiddleware
from app.api.api_v1.api import api_router
from app.core.config import settings
app = FastAPI(
title=settings.PROJECT_NAME, openapi_url=f"{settings.API_V1_STR}/openapi.json"
)
# Set all CORS enabled origins
if settings.BACKEND_CORS_ORIGINS:
app.add_middleware(
CORSMiddleware,
allow_origins=[str(origin) for origin in settings.BACKEND_CORS_ORIGINS],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(api_router, prefix=settings.API_V1_STR)

View File

@@ -0,0 +1,2 @@
from .item import Item
from .user import User

View File

@@ -0,0 +1,17 @@
from typing import TYPE_CHECKING
from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from app.db.base_class import Base
if TYPE_CHECKING:
from .user import User # noqa: F401
class Item(Base):
id = Column(Integer, primary_key=True, index=True)
title = Column(String, index=True)
description = Column(String, index=True)
owner_id = Column(Integer, ForeignKey("user.id"))
owner = relationship("User", back_populates="items")

View File

@@ -0,0 +1,19 @@
from typing import TYPE_CHECKING
from sqlalchemy import Boolean, Column, Integer, String
from sqlalchemy.orm import relationship
from app.db.base_class import Base
if TYPE_CHECKING:
from .item import Item # noqa: F401
class User(Base):
id = Column(Integer, primary_key=True, index=True)
full_name = Column(String, index=True)
email = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
is_active = Column(Boolean(), default=True)
is_superuser = Column(Boolean(), default=False)
items = relationship("Item", back_populates="owner")

View File

@@ -0,0 +1,4 @@
from .item import Item, ItemCreate, ItemInDB, ItemUpdate
from .msg import Msg
from .token import Token, TokenPayload
from .user import User, UserCreate, UserInDB, UserUpdate

View File

@@ -0,0 +1,39 @@
from typing import Optional
from pydantic import BaseModel
# Shared properties
class ItemBase(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
# Properties to receive on item creation
class ItemCreate(ItemBase):
title: str
# Properties to receive on item update
class ItemUpdate(ItemBase):
pass
# Properties shared by models stored in DB
class ItemInDBBase(ItemBase):
id: int
title: str
owner_id: int
class Config:
orm_mode = True
# Properties to return to client
class Item(ItemInDBBase):
pass
# Properties properties stored in DB
class ItemInDB(ItemInDBBase):
pass

View File

@@ -0,0 +1,5 @@
from pydantic import BaseModel
class Msg(BaseModel):
msg: str

View File

@@ -0,0 +1,12 @@
from typing import Optional
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenPayload(BaseModel):
sub: Optional[int] = None

View File

@@ -0,0 +1,39 @@
from typing import Optional
from pydantic import BaseModel, EmailStr
# Shared properties
class UserBase(BaseModel):
email: Optional[EmailStr] = None
is_active: Optional[bool] = True
is_superuser: bool = False
full_name: Optional[str] = None
# Properties to receive via API on creation
class UserCreate(UserBase):
email: EmailStr
password: str
# Properties to receive via API on update
class UserUpdate(UserBase):
password: Optional[str] = None
class UserInDBBase(UserBase):
id: Optional[int] = None
class Config:
orm_mode = True
# Additional properties to return via API
class User(UserInDBBase):
pass
# Additional properties stored in DB
class UserInDB(UserInDBBase):
hashed_password: str

1
src/backend/app/app/tests/.gitignore vendored Executable file
View File

@@ -0,0 +1 @@
.cache

View File

View File

@@ -0,0 +1,18 @@
from typing import Dict
from fastapi.testclient import TestClient
from app.core.config import settings
def test_celery_worker_test(
client: TestClient, superuser_token_headers: Dict[str, str]
) -> None:
data = {"msg": "test"}
r = client.post(
f"{settings.API_V1_STR}/utils/test-celery/",
json=data,
headers=superuser_token_headers,
)
response = r.json()
assert response["msg"] == "Word received"

View File

@@ -0,0 +1,35 @@
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from app.core.config import settings
from app.tests.utils.item import create_random_item
def test_create_item(
client: TestClient, superuser_token_headers: dict, db: Session
) -> None:
data = {"title": "Foo", "description": "Fighters"}
response = client.post(
f"{settings.API_V1_STR}/items/", headers=superuser_token_headers, json=data,
)
assert response.status_code == 200
content = response.json()
assert content["title"] == data["title"]
assert content["description"] == data["description"]
assert "id" in content
assert "owner_id" in content
def test_read_item(
client: TestClient, superuser_token_headers: dict, db: Session
) -> None:
item = create_random_item(db)
response = client.get(
f"{settings.API_V1_STR}/items/{item.id}", headers=superuser_token_headers,
)
assert response.status_code == 200
content = response.json()
assert content["title"] == item.title
assert content["description"] == item.description
assert content["id"] == item.id
assert content["owner_id"] == item.owner_id

View File

@@ -0,0 +1,28 @@
from typing import Dict
from fastapi.testclient import TestClient
from app.core.config import settings
def test_get_access_token(client: TestClient) -> None:
login_data = {
"username": settings.FIRST_SUPERUSER,
"password": settings.FIRST_SUPERUSER_PASSWORD,
}
r = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data)
tokens = r.json()
assert r.status_code == 200
assert "access_token" in tokens
assert tokens["access_token"]
def test_use_access_token(
client: TestClient, superuser_token_headers: Dict[str, str]
) -> None:
r = client.post(
f"{settings.API_V1_STR}/login/test-token", headers=superuser_token_headers,
)
result = r.json()
assert r.status_code == 200
assert "email" in result

View File

@@ -0,0 +1,115 @@
from typing import Dict
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from app import crud
from app.core.config import settings
from app.schemas.user import UserCreate
from app.tests.utils.utils import random_email, random_lower_string
def test_get_users_superuser_me(
client: TestClient, superuser_token_headers: Dict[str, str]
) -> None:
r = client.get(f"{settings.API_V1_STR}/users/me", headers=superuser_token_headers)
current_user = r.json()
assert current_user
assert current_user["is_active"] is True
assert current_user["is_superuser"]
assert current_user["email"] == settings.FIRST_SUPERUSER
def test_get_users_normal_user_me(
client: TestClient, normal_user_token_headers: Dict[str, str]
) -> None:
r = client.get(f"{settings.API_V1_STR}/users/me", headers=normal_user_token_headers)
current_user = r.json()
assert current_user
assert current_user["is_active"] is True
assert current_user["is_superuser"] is False
assert current_user["email"] == settings.EMAIL_TEST_USER
def test_create_user_new_email(
client: TestClient, superuser_token_headers: dict, db: Session
) -> None:
username = random_email()
password = random_lower_string()
data = {"email": username, "password": password}
r = client.post(
f"{settings.API_V1_STR}/users/", headers=superuser_token_headers, json=data,
)
assert 200 <= r.status_code < 300
created_user = r.json()
user = crud.user.get_by_email(db, email=username)
assert user
assert user.email == created_user["email"]
def test_get_existing_user(
client: TestClient, superuser_token_headers: dict, db: Session
) -> None:
username = random_email()
password = random_lower_string()
user_in = UserCreate(email=username, password=password)
user = crud.user.create(db, obj_in=user_in)
user_id = user.id
r = client.get(
f"{settings.API_V1_STR}/users/{user_id}", headers=superuser_token_headers,
)
assert 200 <= r.status_code < 300
api_user = r.json()
existing_user = crud.user.get_by_email(db, email=username)
assert existing_user
assert existing_user.email == api_user["email"]
def test_create_user_existing_username(
client: TestClient, superuser_token_headers: dict, db: Session
) -> None:
username = random_email()
# username = email
password = random_lower_string()
user_in = UserCreate(email=username, password=password)
crud.user.create(db, obj_in=user_in)
data = {"email": username, "password": password}
r = client.post(
f"{settings.API_V1_STR}/users/", headers=superuser_token_headers, json=data,
)
created_user = r.json()
assert r.status_code == 400
assert "_id" not in created_user
def test_create_user_by_normal_user(
client: TestClient, normal_user_token_headers: Dict[str, str]
) -> None:
username = random_email()
password = random_lower_string()
data = {"email": username, "password": password}
r = client.post(
f"{settings.API_V1_STR}/users/", headers=normal_user_token_headers, json=data,
)
assert r.status_code == 400
def test_retrieve_users(
client: TestClient, superuser_token_headers: dict, db: Session
) -> None:
username = random_email()
password = random_lower_string()
user_in = UserCreate(email=username, password=password)
crud.user.create(db, obj_in=user_in)
username2 = random_email()
password2 = random_lower_string()
user_in2 = UserCreate(email=username2, password=password2)
crud.user.create(db, obj_in=user_in2)
r = client.get(f"{settings.API_V1_STR}/users/", headers=superuser_token_headers)
all_users = r.json()
assert len(all_users) > 1
for item in all_users:
assert "email" in item

View File

@@ -0,0 +1,34 @@
from typing import Dict, Generator
import pytest
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from app.core.config import settings
from app.db.session import SessionLocal
from app.main import app
from app.tests.utils.user import authentication_token_from_email
from app.tests.utils.utils import get_superuser_token_headers
@pytest.fixture(scope="session")
def db() -> Generator:
yield SessionLocal()
@pytest.fixture(scope="module")
def client() -> Generator:
with TestClient(app) as c:
yield c
@pytest.fixture(scope="module")
def superuser_token_headers(client: TestClient) -> Dict[str, str]:
return get_superuser_token_headers(client)
@pytest.fixture(scope="module")
def normal_user_token_headers(client: TestClient, db: Session) -> Dict[str, str]:
return authentication_token_from_email(
client=client, email=settings.EMAIL_TEST_USER, db=db
)

View File

@@ -0,0 +1,61 @@
from sqlalchemy.orm import Session
from app import crud
from app.schemas.item import ItemCreate, ItemUpdate
from app.tests.utils.user import create_random_user
from app.tests.utils.utils import random_lower_string
def test_create_item(db: Session) -> None:
title = random_lower_string()
description = random_lower_string()
item_in = ItemCreate(title=title, description=description)
user = create_random_user(db)
item = crud.item.create_with_owner(db=db, obj_in=item_in, owner_id=user.id)
assert item.title == title
assert item.description == description
assert item.owner_id == user.id
def test_get_item(db: Session) -> None:
title = random_lower_string()
description = random_lower_string()
item_in = ItemCreate(title=title, description=description)
user = create_random_user(db)
item = crud.item.create_with_owner(db=db, obj_in=item_in, owner_id=user.id)
stored_item = crud.item.get(db=db, id=item.id)
assert stored_item
assert item.id == stored_item.id
assert item.title == stored_item.title
assert item.description == stored_item.description
assert item.owner_id == stored_item.owner_id
def test_update_item(db: Session) -> None:
title = random_lower_string()
description = random_lower_string()
item_in = ItemCreate(title=title, description=description)
user = create_random_user(db)
item = crud.item.create_with_owner(db=db, obj_in=item_in, owner_id=user.id)
description2 = random_lower_string()
item_update = ItemUpdate(description=description2)
item2 = crud.item.update(db=db, db_obj=item, obj_in=item_update)
assert item.id == item2.id
assert item.title == item2.title
assert item2.description == description2
assert item.owner_id == item2.owner_id
def test_delete_item(db: Session) -> None:
title = random_lower_string()
description = random_lower_string()
item_in = ItemCreate(title=title, description=description)
user = create_random_user(db)
item = crud.item.create_with_owner(db=db, obj_in=item_in, owner_id=user.id)
item2 = crud.item.remove(db=db, id=item.id)
item3 = crud.item.get(db=db, id=item.id)
assert item3 is None
assert item2.id == item.id
assert item2.title == title
assert item2.description == description
assert item2.owner_id == user.id

View File

@@ -0,0 +1,94 @@
from fastapi.encoders import jsonable_encoder
from sqlalchemy.orm import Session
from app import crud
from app.core.security import verify_password
from app.schemas.user import UserCreate, UserUpdate
from app.tests.utils.utils import random_email, random_lower_string
def test_create_user(db: Session) -> None:
email = random_email()
password = random_lower_string()
user_in = UserCreate(email=email, password=password)
user = crud.user.create(db, obj_in=user_in)
assert user.email == email
assert hasattr(user, "hashed_password")
def test_authenticate_user(db: Session) -> None:
email = random_email()
password = random_lower_string()
user_in = UserCreate(email=email, password=password)
user = crud.user.create(db, obj_in=user_in)
authenticated_user = crud.user.authenticate(db, email=email, password=password)
assert authenticated_user
assert user.email == authenticated_user.email
def test_not_authenticate_user(db: Session) -> None:
email = random_email()
password = random_lower_string()
user = crud.user.authenticate(db, email=email, password=password)
assert user is None
def test_check_if_user_is_active(db: Session) -> None:
email = random_email()
password = random_lower_string()
user_in = UserCreate(email=email, password=password)
user = crud.user.create(db, obj_in=user_in)
is_active = crud.user.is_active(user)
assert is_active is True
def test_check_if_user_is_active_inactive(db: Session) -> None:
email = random_email()
password = random_lower_string()
user_in = UserCreate(email=email, password=password, disabled=True)
user = crud.user.create(db, obj_in=user_in)
is_active = crud.user.is_active(user)
assert is_active
def test_check_if_user_is_superuser(db: Session) -> None:
email = random_email()
password = random_lower_string()
user_in = UserCreate(email=email, password=password, is_superuser=True)
user = crud.user.create(db, obj_in=user_in)
is_superuser = crud.user.is_superuser(user)
assert is_superuser is True
def test_check_if_user_is_superuser_normal_user(db: Session) -> None:
username = random_email()
password = random_lower_string()
user_in = UserCreate(email=username, password=password)
user = crud.user.create(db, obj_in=user_in)
is_superuser = crud.user.is_superuser(user)
assert is_superuser is False
def test_get_user(db: Session) -> None:
password = random_lower_string()
username = random_email()
user_in = UserCreate(email=username, password=password, is_superuser=True)
user = crud.user.create(db, obj_in=user_in)
user_2 = crud.user.get(db, id=user.id)
assert user_2
assert user.email == user_2.email
assert jsonable_encoder(user) == jsonable_encoder(user_2)
def test_update_user(db: Session) -> None:
password = random_lower_string()
email = random_email()
user_in = UserCreate(email=email, password=password, is_superuser=True)
user = crud.user.create(db, obj_in=user_in)
new_password = random_lower_string()
user_in_update = UserUpdate(password=new_password, is_superuser=True)
crud.user.update(db, db_obj=user, obj_in=user_in_update)
user_2 = crud.user.get(db, id=user.id)
assert user_2
assert user.email == user_2.email
assert verify_password(new_password, user_2.hashed_password)

View File

@@ -0,0 +1,18 @@
from typing import Optional
from sqlalchemy.orm import Session
from app import crud, models
from app.schemas.item import ItemCreate
from app.tests.utils.user import create_random_user
from app.tests.utils.utils import random_lower_string
def create_random_item(db: Session, *, owner_id: Optional[int] = None) -> models.Item:
if owner_id is None:
user = create_random_user(db)
owner_id = user.id
title = random_lower_string()
description = random_lower_string()
item_in = ItemCreate(title=title, description=description, id=id)
return crud.item.create_with_owner(db=db, obj_in=item_in, owner_id=owner_id)

View File

@@ -0,0 +1,50 @@
from typing import Dict
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from app import crud
from app.core.config import settings
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
from app.tests.utils.utils import random_email, random_lower_string
def user_authentication_headers(
*, client: TestClient, email: str, password: str
) -> Dict[str, str]:
data = {"username": email, "password": password}
r = client.post(f"{settings.API_V1_STR}/login/access-token", data=data)
response = r.json()
auth_token = response["access_token"]
headers = {"Authorization": f"Bearer {auth_token}"}
return headers
def create_random_user(db: Session) -> User:
email = random_email()
password = random_lower_string()
user_in = UserCreate(username=email, email=email, password=password)
user = crud.user.create(db=db, obj_in=user_in)
return user
def authentication_token_from_email(
*, client: TestClient, email: str, db: Session
) -> Dict[str, str]:
"""
Return a valid token for the user with given email.
If the user doesn't exist it is created first.
"""
password = random_lower_string()
user = crud.user.get_by_email(db, email=email)
if not user:
user_in_create = UserCreate(username=email, email=email, password=password)
user = crud.user.create(db, obj_in=user_in_create)
else:
user_in_update = UserUpdate(password=password)
user = crud.user.update(db, db_obj=user, obj_in=user_in_update)
return user_authentication_headers(client=client, email=email, password=password)

View File

@@ -0,0 +1,27 @@
import random
import string
from typing import Dict
from fastapi.testclient import TestClient
from app.core.config import settings
def random_lower_string() -> str:
return "".join(random.choices(string.ascii_lowercase, k=32))
def random_email() -> str:
return f"{random_lower_string()}@{random_lower_string()}.com"
def get_superuser_token_headers(client: TestClient) -> Dict[str, str]:
login_data = {
"username": settings.FIRST_SUPERUSER,
"password": settings.FIRST_SUPERUSER_PASSWORD,
}
r = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data)
tokens = r.json()
a_token = tokens["access_token"]
headers = {"Authorization": f"Bearer {a_token}"}
return headers

View File

@@ -0,0 +1,37 @@
import logging
from tenacity import after_log, before_log, retry, stop_after_attempt, wait_fixed
from app.db.session import SessionLocal
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
max_tries = 60 * 5 # 5 minutes
wait_seconds = 1
@retry(
stop=stop_after_attempt(max_tries),
wait=wait_fixed(wait_seconds),
before=before_log(logger, logging.INFO),
after=after_log(logger, logging.WARN),
)
def init() -> None:
try:
# Try to create session to check if DB is awake
db = SessionLocal()
db.execute("SELECT 1")
except Exception as e:
logger.error(e)
raise e
def main() -> None:
logger.info("Initializing service")
init()
logger.info("Service finished initializing")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,106 @@
import logging
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Optional
import emails
from emails.template import JinjaTemplate
from jose import jwt
from app.core.config import settings
def send_email(
email_to: str,
subject_template: str = "",
html_template: str = "",
environment: Dict[str, Any] = {},
) -> None:
assert settings.EMAILS_ENABLED, "no provided configuration for email variables"
message = emails.Message(
subject=JinjaTemplate(subject_template),
html=JinjaTemplate(html_template),
mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL),
)
smtp_options = {"host": settings.SMTP_HOST, "port": settings.SMTP_PORT}
if settings.SMTP_TLS:
smtp_options["tls"] = True
if settings.SMTP_USER:
smtp_options["user"] = settings.SMTP_USER
if settings.SMTP_PASSWORD:
smtp_options["password"] = settings.SMTP_PASSWORD
response = message.send(to=email_to, render=environment, smtp=smtp_options)
logging.info(f"send email result: {response}")
def send_test_email(email_to: str) -> None:
project_name = settings.PROJECT_NAME
subject = f"{project_name} - Test email"
with open(Path(settings.EMAIL_TEMPLATES_DIR) / "test_email.html") as f:
template_str = f.read()
send_email(
email_to=email_to,
subject_template=subject,
html_template=template_str,
environment={"project_name": settings.PROJECT_NAME, "email": email_to},
)
def send_reset_password_email(email_to: str, email: str, token: str) -> None:
project_name = settings.PROJECT_NAME
subject = f"{project_name} - Password recovery for user {email}"
with open(Path(settings.EMAIL_TEMPLATES_DIR) / "reset_password.html") as f:
template_str = f.read()
server_host = settings.SERVER_HOST
link = f"{server_host}/reset-password?token={token}"
send_email(
email_to=email_to,
subject_template=subject,
html_template=template_str,
environment={
"project_name": settings.PROJECT_NAME,
"username": email,
"email": email_to,
"valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS,
"link": link,
},
)
def send_new_account_email(email_to: str, username: str, password: str) -> None:
project_name = settings.PROJECT_NAME
subject = f"{project_name} - New account for user {username}"
with open(Path(settings.EMAIL_TEMPLATES_DIR) / "new_account.html") as f:
template_str = f.read()
link = settings.SERVER_HOST
send_email(
email_to=email_to,
subject_template=subject,
html_template=template_str,
environment={
"project_name": settings.PROJECT_NAME,
"username": username,
"password": password,
"email": email_to,
"link": link,
},
)
def generate_password_reset_token(email: str) -> str:
delta = timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS)
now = datetime.utcnow()
expires = now + delta
exp = expires.timestamp()
encoded_jwt = jwt.encode(
{"exp": exp, "nbf": now, "sub": email}, settings.SECRET_KEY, algorithm="HS256",
)
return encoded_jwt
def verify_password_reset_token(token: str) -> Optional[str]:
try:
decoded_token = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
return decoded_token["email"]
except jwt.JWTError:
return None

View File

@@ -0,0 +1,11 @@
from raven import Client
from app.core.celery_app import celery_app
from app.core.config import settings
client_sentry = Client(settings.SENTRY_DSN)
@celery_app.task(acks_late=True)
def test_celery(word: str) -> str:
return f"test task return {word}"

4
src/backend/app/mypy.ini Normal file
View File

@@ -0,0 +1,4 @@
[mypy]
plugins = pydantic.mypy, sqlmypy
ignore_missing_imports = True
disallow_untyped_defs = True

View File

@@ -0,0 +1,10 @@
#! /usr/bin/env bash
# Let the DB start
python /app/app/backend_pre_start.py
# Run migrations
alembic upgrade head
# Create initial data in DB
python /app/app/initial_data.py

View File

@@ -0,0 +1,46 @@
[tool.poetry]
name = "app"
version = "0.1.0"
description = ""
authors = ["Admin <admin@example.com>"]
[tool.poetry.dependencies]
python = "^3.8"
uvicorn = ">=0.24.0.post1"
fastapi = "^0.54.1"
python-multipart = "^0.0.5"
email-validator = "^1.0.5"
requests = "^2.23.0"
celery = "^4.4.2"
passlib = {extras = ["bcrypt"], version = "^1.7.2"}
tenacity = "^6.1.0"
pydantic = "^1.4"
emails = "^0.5.15"
raven = "^6.10.0"
gunicorn = "^20.0.4"
jinja2 = "^2.11.2"
psycopg2-binary = "^2.8.5"
alembic = "^1.4.2"
sqlalchemy = "^1.3.16"
pytest = "^5.4.1"
python-jose = {extras = ["cryptography"], version = "^3.1.0"}
[tool.poetry.dev-dependencies]
mypy = ">=1.7.0"
black = ">=23.11.0"
isort = "^4.3.21"
autoflake = "^1.3.1"
flake8 = "^3.7.9"
pytest = "^5.4.1"
sqlalchemy-stubs = "^0.3"
pytest-cov = "^2.8.1"
[tool.isort]
multi_line_output = 3
include_trailing_comma = true
force_grid_wrap = 0
line_length = 88
[build-system]
requires = ["poetry>=0.12"]
build-backend = "poetry.masonry.api"

View File

@@ -0,0 +1,6 @@
#!/bin/sh -e
set -x
# Sort imports one per line, so autoflake can remove unused imports
isort --recursive --force-single-line-imports --apply app
sh ./scripts/format.sh

View File

@@ -0,0 +1,6 @@
#!/bin/sh -e
set -x
autoflake --remove-all-unused-imports --recursive --remove-unused-variables --in-place app --exclude=__init__.py
black app
isort --recursive --apply app

View File

@@ -0,0 +1,8 @@
#!/usr/bin/env bash
set -x
mypy app
black app --check
isort --recursive --check-only app
flake8

View File

@@ -0,0 +1,6 @@
#!/usr/bin/env bash
set -e
set -x
bash scripts/test.sh --cov-report=html "${@}"

View File

@@ -0,0 +1,6 @@
#!/usr/bin/env bash
set -e
set -x
pytest --cov=app --cov-report=term-missing app/tests "${@}"

View File

@@ -0,0 +1,6 @@
#! /usr/bin/env bash
set -e
python /app/app/tests_pre_start.py
bash ./scripts/test.sh "$@"

View File

@@ -0,0 +1,6 @@
#! /usr/bin/env bash
set -e
python /app/app/celeryworker_pre_start.py
celery worker -A app.worker -l info -Q main-queue -c 1