Use Pydantic BaseSettings for config settings (#87)

* Use Pydantic BaseSettings for config settings

* Update fastapi dep to >=0.47.0 and email_validator to email-validator

* Fix deprecation warning for Pydantic >=1.0

* Properly support old-format comma separated strings for BACKEND_CORS_ORIGINS

Co-authored-by: Sebastián Ramírez <tiangolo@gmail.com>
This commit is contained in:
Stephen Brown II
2020-04-16 23:56:10 -06:00
committed by GitHub
parent cd875e5bef
commit 79631c7619
24 changed files with 163 additions and 130 deletions

View File

@@ -10,7 +10,7 @@
"secret_key": "changethis", "secret_key": "changethis",
"first_superuser": "admin@{{cookiecutter.domain_main}}", "first_superuser": "admin@{{cookiecutter.domain_main}}",
"first_superuser_password": "changethis", "first_superuser_password": "changethis",
"backend_cors_origins": "http://localhost, http://localhost:4200, http://localhost:3000, http://localhost:8080, https://localhost, https://localhost:4200, https://localhost:3000, https://localhost:8080, http://dev.{{cookiecutter.domain_main}}, https://{{cookiecutter.domain_staging}}, https://{{cookiecutter.domain_main}}, http://local.dockertoolbox.tiangolo.com, http://localhost.tiangolo.com", "backend_cors_origins": "[\"http://localhost\", \"http://localhost:4200\", \"http://localhost:3000\", \"http://localhost:8080\", \"https://localhost\", \"https://localhost:4200\", \"https://localhost:3000\", \"https://localhost:8080\", \"http://dev.{{cookiecutter.domain_main}}\", \"https://{{cookiecutter.domain_staging}}\", \"https://{{cookiecutter.domain_main}}\", \"http://local.dockertoolbox.tiangolo.com\", \"http://localhost.tiangolo.com\"]",
"smtp_port": "587", "smtp_port": "587",
"smtp_host": "", "smtp_host": "",
"smtp_user": "", "smtp_user": "",

View File

@@ -7,7 +7,7 @@ from sqlalchemy.orm import Session
from app import crud from app import crud
from app.api.utils.db import get_db from app.api.utils.db import get_db
from app.api.utils.security import get_current_user from app.api.utils.security import get_current_user
from app.core import config from app.core.config import settings
from app.core.jwt import create_access_token from app.core.jwt import create_access_token
from app.core.security import get_password_hash from app.core.security import get_password_hash
from app.models.user import User as DBUser from app.models.user import User as DBUser
@@ -37,7 +37,7 @@ def login_access_token(
raise HTTPException(status_code=400, detail="Incorrect email or password") raise HTTPException(status_code=400, detail="Incorrect email or password")
elif not crud.user.is_active(user): elif not crud.user.is_active(user):
raise HTTPException(status_code=400, detail="Inactive user") raise HTTPException(status_code=400, detail="Inactive user")
access_token_expires = timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES) access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return { return {
"access_token": create_access_token( "access_token": create_access_token(
data={"user_id": user.id}, expires_delta=access_token_expires data={"user_id": user.id}, expires_delta=access_token_expires

View File

@@ -8,7 +8,7 @@ from sqlalchemy.orm import Session
from app import crud from app import crud
from app.api.utils.db import get_db from app.api.utils.db import get_db
from app.api.utils.security import get_current_active_superuser, get_current_active_user from app.api.utils.security import get_current_active_superuser, get_current_active_user
from app.core import config from app.core.config import settings
from app.models.user import User as DBUser from app.models.user import User as DBUser
from app.schemas.user import User, UserCreate, UserUpdate from app.schemas.user import User, UserCreate, UserUpdate
from app.utils import send_new_account_email from app.utils import send_new_account_email
@@ -47,7 +47,7 @@ def create_user(
detail="The user with this username already exists in the system.", detail="The user with this username already exists in the system.",
) )
user = crud.user.create(db, obj_in=user_in) user = crud.user.create(db, obj_in=user_in)
if config.EMAILS_ENABLED and user_in.email: if settings.EMAILS_ENABLED and user_in.email:
send_new_account_email( send_new_account_email(
email_to=user_in.email, username=user_in.email, password=user_in.password email_to=user_in.email, username=user_in.email, password=user_in.password
) )
@@ -100,7 +100,7 @@ def create_user_open(
""" """
Create new user without the need to be logged in. Create new user without the need to be logged in.
""" """
if not config.USERS_OPEN_REGISTRATION: if not settings.USERS_OPEN_REGISTRATION:
raise HTTPException( raise HTTPException(
status_code=403, status_code=403,
detail="Open user registration is forbidden on this server", detail="Open user registration is forbidden on this server",

View File

@@ -7,19 +7,19 @@ from starlette.status import HTTP_403_FORBIDDEN
from app import crud from app import crud
from app.api.utils.db import get_db from app.api.utils.db import get_db
from app.core import config from app.core.config import settings
from app.core.jwt import ALGORITHM from app.core.jwt import ALGORITHM
from app.models.user import User from app.models.user import User
from app.schemas.token import TokenPayload from app.schemas.token import TokenPayload
reusable_oauth2 = OAuth2PasswordBearer(tokenUrl="/api/v1/login/access-token") reusable_oauth2 = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/login/access-token")
def get_current_user( def get_current_user(
db: Session = Depends(get_db), token: str = Security(reusable_oauth2) db: Session = Depends(get_db), token: str = Security(reusable_oauth2)
): ):
try: try:
payload = jwt.decode(token, config.SECRET_KEY, algorithms=[ALGORITHM]) payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
token_data = TokenPayload(**payload) token_data = TokenPayload(**payload)
except PyJWTError: except PyJWTError:
raise HTTPException( raise HTTPException(

View File

@@ -1,55 +1,92 @@
import os import os
import secrets
from typing import List
from pydantic import AnyHttpUrl, BaseSettings, EmailStr, HttpUrl, PostgresDsn, validator
def getenv_boolean(var_name, default_value=False): class Settings(BaseSettings):
result = default_value
env_value = os.getenv(var_name)
if env_value is not None:
result = env_value.upper() in ("TRUE", "1")
return result
API_V1_STR: str = "/api/v1"
API_V1_STR = "/api/v1" SECRET_KEY: str = secrets.token_urlsafe(32)
SECRET_KEY = os.getenvb(b"SECRET_KEY") ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 8 # 60 minutes * 24 hours * 8 days = 8 days
if not SECRET_KEY:
SECRET_KEY = os.urandom(32)
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 8 # 60 minutes * 24 hours * 8 days = 8 days SERVER_NAME: str
SERVER_HOST: AnyHttpUrl
# BACKEND_CORS_ORIGINS is a JSON-formatted list of origins
# e.g: '["http://localhost", "http://localhost:4200", "http://localhost:3000", \
# "http://localhost:8080", "http://local.dockertoolbox.tiangolo.com"]'
BACKEND_CORS_ORIGINS: List[AnyHttpUrl] = []
SERVER_NAME = os.getenv("SERVER_NAME") @validator("BACKEND_CORS_ORIGINS", pre=True)
SERVER_HOST = os.getenv("SERVER_HOST") def assemble_cors_origins(cls, v):
BACKEND_CORS_ORIGINS = os.getenv( if isinstance(v, str) and not v.startswith("["):
"BACKEND_CORS_ORIGINS" return [i.strip() for i in v.split(",")]
) # a string of origins separated by commas, e.g: "http://localhost, http://localhost:4200, http://localhost:3000, http://localhost:8080, http://local.dockertoolbox.tiangolo.com" return v
PROJECT_NAME = os.getenv("PROJECT_NAME")
SENTRY_DSN = os.getenv("SENTRY_DSN")
POSTGRES_SERVER = os.getenv("POSTGRES_SERVER") PROJECT_NAME: str
POSTGRES_USER = os.getenv("POSTGRES_USER") SENTRY_DSN: HttpUrl = None
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD")
POSTGRES_DB = os.getenv("POSTGRES_DB")
SQLALCHEMY_DATABASE_URI = (
f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_SERVER}/{POSTGRES_DB}"
)
SMTP_TLS = getenv_boolean("SMTP_TLS", True) @validator("SENTRY_DSN", pre=True)
SMTP_PORT = None def sentry_dsn_can_be_blank(cls, v):
_SMTP_PORT = os.getenv("SMTP_PORT") if len(v) == 0:
if _SMTP_PORT is not None: return None
SMTP_PORT = int(_SMTP_PORT) return v
SMTP_HOST = os.getenv("SMTP_HOST")
SMTP_USER = os.getenv("SMTP_USER")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD")
EMAILS_FROM_EMAIL = os.getenv("EMAILS_FROM_EMAIL")
EMAILS_FROM_NAME = PROJECT_NAME
EMAIL_RESET_TOKEN_EXPIRE_HOURS = 48
EMAIL_TEMPLATES_DIR = "/app/app/email-templates/build"
EMAILS_ENABLED = SMTP_HOST and SMTP_PORT and EMAILS_FROM_EMAIL
FIRST_SUPERUSER = os.getenv("FIRST_SUPERUSER") POSTGRES_SERVER: str
FIRST_SUPERUSER_PASSWORD = os.getenv("FIRST_SUPERUSER_PASSWORD") POSTGRES_USER: str
POSTGRES_PASSWORD: str
POSTGRES_DB: str
SQLALCHEMY_DATABASE_URI: PostgresDsn = None
USERS_OPEN_REGISTRATION = getenv_boolean("USERS_OPEN_REGISTRATION") @validator("SQLALCHEMY_DATABASE_URI", pre=True)
def assemble_db_connection(cls, v, values):
if isinstance(v, str):
return v
return PostgresDsn.build(
scheme="postgresql",
user=values.get("POSTGRES_USER"),
password=values.get("POSTGRES_PASSWORD"),
host=values.get("POSTGRES_SERVER"),
path=f"/{values.get('POSTGRES_DB') or ''}",
)
EMAIL_TEST_USER = "test@example.com" SMTP_TLS: bool = True
SMTP_PORT: int = None
SMTP_HOST: str = None
SMTP_USER: str = None
SMTP_PASSWORD: str = None
EMAILS_FROM_EMAIL: EmailStr = None
EMAILS_FROM_NAME: str = None
@validator("EMAILS_FROM_NAME")
def get_project_name(cls, v, values):
if not v:
return values["PROJECT_NAME"]
return v
EMAIL_RESET_TOKEN_EXPIRE_HOURS: int = 48
EMAIL_TEMPLATES_DIR: str = "/app/app/email-templates/build"
EMAILS_ENABLED: bool = False
@validator("EMAILS_ENABLED", pre=True)
def get_emails_enabled(cls, v, values):
return bool(
values.get("SMTP_HOST")
and values.get("SMTP_PORT")
and values.get("EMAILS_FROM_EMAIL")
)
EMAIL_TEST_USER: EmailStr = "test@example.com"
FIRST_SUPERUSER: EmailStr
FIRST_SUPERUSER_PASSWORD: str
USERS_OPEN_REGISTRATION: bool = False
class Config:
case_sensitive = True
settings = Settings()

View File

@@ -2,7 +2,7 @@ from datetime import datetime, timedelta
import jwt import jwt
from app.core import config from app.core.config import settings
ALGORITHM = "HS256" ALGORITHM = "HS256"
access_token_jwt_subject = "access" access_token_jwt_subject = "access"
@@ -15,5 +15,5 @@ def create_access_token(*, data: dict, expires_delta: timedelta = None):
else: else:
expire = datetime.utcnow() + timedelta(minutes=15) expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire, "sub": access_token_jwt_subject}) to_encode.update({"exp": expire, "sub": access_token_jwt_subject})
encoded_jwt = jwt.encode(to_encode, config.SECRET_KEY, algorithm=ALGORITHM) encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt return encoded_jwt

View File

@@ -41,7 +41,7 @@ class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
self, db_session: Session, *, db_obj: ModelType, obj_in: UpdateSchemaType self, db_session: Session, *, db_obj: ModelType, obj_in: UpdateSchemaType
) -> ModelType: ) -> ModelType:
obj_data = jsonable_encoder(db_obj) obj_data = jsonable_encoder(db_obj)
update_data = obj_in.dict(skip_defaults=True) update_data = obj_in.dict(exclude_unset=True)
for field in obj_data: for field in obj_data:
if field in update_data: if field in update_data:
setattr(db_obj, field, update_data[field]) setattr(db_obj, field, update_data[field])

View File

@@ -1,5 +1,5 @@
from app import crud from app import crud
from app.core import config from app.core.config import settings
from app.schemas.user import UserCreate from app.schemas.user import UserCreate
# make sure all SQL Alchemy models are imported before initializing DB # make sure all SQL Alchemy models are imported before initializing DB
@@ -14,11 +14,11 @@ def init_db(db_session):
# the tables un-commenting the next line # the tables un-commenting the next line
# Base.metadata.create_all(bind=engine) # Base.metadata.create_all(bind=engine)
user = crud.user.get_by_email(db_session, email=config.FIRST_SUPERUSER) user = crud.user.get_by_email(db_session, email=settings.FIRST_SUPERUSER)
if not user: if not user:
user_in = UserCreate( user_in = UserCreate(
email=config.FIRST_SUPERUSER, email=settings.FIRST_SUPERUSER,
password=config.FIRST_SUPERUSER_PASSWORD, password=settings.FIRST_SUPERUSER_PASSWORD,
is_superuser=True, is_superuser=True,
) )
user = crud.user.create(db_session, obj_in=user_in) user = crud.user.create(db_session, obj_in=user_in)

View File

@@ -1,9 +1,9 @@
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker from sqlalchemy.orm import scoped_session, sessionmaker
from app.core import config from app.core.config import settings
engine = create_engine(config.SQLALCHEMY_DATABASE_URI, pool_pre_ping=True) engine = create_engine(settings.SQLALCHEMY_DATABASE_URI, pool_pre_ping=True)
db_session = scoped_session( db_session = scoped_session(
sessionmaker(autocommit=False, autoflush=False, bind=engine) sessionmaker(autocommit=False, autoflush=False, bind=engine)
) )

View File

@@ -3,29 +3,22 @@ from starlette.middleware.cors import CORSMiddleware
from starlette.requests import Request from starlette.requests import Request
from app.api.api_v1.api import api_router from app.api.api_v1.api import api_router
from app.core import config from app.core.config import settings
from app.db.session import Session from app.db.session import Session
app = FastAPI(title=config.PROJECT_NAME, openapi_url="/api/v1/openapi.json") app = FastAPI(title=settings.PROJECT_NAME, openapi_url=f"{settings.API_V1_STR}/openapi.json")
# CORS
origins = []
# Set all CORS enabled origins # Set all CORS enabled origins
if config.BACKEND_CORS_ORIGINS: if settings.BACKEND_CORS_ORIGINS:
origins_raw = config.BACKEND_CORS_ORIGINS.split(",")
for origin in origins_raw:
use_origin = origin.strip()
origins.append(use_origin)
app.add_middleware( app.add_middleware(
CORSMiddleware, CORSMiddleware,
allow_origins=origins, allow_origins=[str(origin) for origin in settings.BACKEND_CORS_ORIGINS],
allow_credentials=True, allow_credentials=True,
allow_methods=["*"], allow_methods=["*"],
allow_headers=["*"], allow_headers=["*"],
), ),
app.include_router(api_router, prefix=config.API_V1_STR) app.include_router(api_router, prefix=settings.API_V1_STR)
@app.middleware("http") @app.middleware("http")

View File

@@ -1,6 +1,6 @@
import requests import requests
from app.core import config from app.core.config import settings
from app.tests.utils.utils import get_server_api from app.tests.utils.utils import get_server_api
@@ -8,7 +8,7 @@ def test_celery_worker_test(superuser_token_headers):
server_api = get_server_api() server_api = get_server_api()
data = {"msg": "test"} data = {"msg": "test"}
r = requests.post( r = requests.post(
f"{server_api}{config.API_V1_STR}/utils/test-celery/", f"{server_api}{settings.API_V1_STR}/utils/test-celery/",
json=data, json=data,
headers=superuser_token_headers, headers=superuser_token_headers,
) )

View File

@@ -1,6 +1,6 @@
import requests import requests
from app.core import config from app.core.config import settings
from app.tests.utils.item import create_random_item from app.tests.utils.item import create_random_item
from app.tests.utils.utils import get_server_api from app.tests.utils.utils import get_server_api
from app.tests.utils.user import create_random_user from app.tests.utils.user import create_random_user
@@ -10,7 +10,7 @@ def test_create_item(superuser_token_headers):
server_api = get_server_api() server_api = get_server_api()
data = {"title": "Foo", "description": "Fighters"} data = {"title": "Foo", "description": "Fighters"}
response = requests.post( response = requests.post(
f"{server_api}{config.API_V1_STR}/items/", f"{server_api}{settings.API_V1_STR}/items/",
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
@@ -26,7 +26,7 @@ def test_read_item(superuser_token_headers):
item = create_random_item() item = create_random_item()
server_api = get_server_api() server_api = get_server_api()
response = requests.get( response = requests.get(
f"{server_api}{config.API_V1_STR}/items/{item.id}", f"{server_api}{settings.API_V1_STR}/items/{item.id}",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert response.status_code == 200 assert response.status_code == 200

View File

@@ -1,17 +1,17 @@
import requests import requests
from app.core import config from app.core.config import settings
from app.tests.utils.utils import get_server_api from app.tests.utils.utils import get_server_api
def test_get_access_token(): def test_get_access_token():
server_api = get_server_api() server_api = get_server_api()
login_data = { login_data = {
"username": config.FIRST_SUPERUSER, "username": settings.FIRST_SUPERUSER,
"password": config.FIRST_SUPERUSER_PASSWORD, "password": settings.FIRST_SUPERUSER_PASSWORD,
} }
r = requests.post( r = requests.post(
f"{server_api}{config.API_V1_STR}/login/access-token", data=login_data f"{server_api}{settings.API_V1_STR}/login/access-token", data=login_data
) )
tokens = r.json() tokens = r.json()
assert r.status_code == 200 assert r.status_code == 200
@@ -22,7 +22,7 @@ def test_get_access_token():
def test_use_access_token(superuser_token_headers): def test_use_access_token(superuser_token_headers):
server_api = get_server_api() server_api = get_server_api()
r = requests.post( r = requests.post(
f"{server_api}{config.API_V1_STR}/login/test-token", f"{server_api}{settings.API_V1_STR}/login/test-token",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
result = r.json() result = r.json()

View File

@@ -1,7 +1,7 @@
import requests import requests
from app import crud from app import crud
from app.core import config from app.core.config import settings
from app.db.session import db_session from app.db.session import db_session
from app.schemas.user import UserCreate from app.schemas.user import UserCreate
from app.tests.utils.utils import get_server_api, random_lower_string, random_email from app.tests.utils.utils import get_server_api, random_lower_string, random_email
@@ -10,25 +10,25 @@ from app.tests.utils.utils import get_server_api, random_lower_string, random_em
def test_get_users_superuser_me(superuser_token_headers): def test_get_users_superuser_me(superuser_token_headers):
server_api = get_server_api() server_api = get_server_api()
r = requests.get( r = requests.get(
f"{server_api}{config.API_V1_STR}/users/me", headers=superuser_token_headers f"{server_api}{settings.API_V1_STR}/users/me", headers=superuser_token_headers
) )
current_user = r.json() current_user = r.json()
assert current_user assert current_user
assert current_user["is_active"] is True assert current_user["is_active"] is True
assert current_user["is_superuser"] assert current_user["is_superuser"]
assert current_user["email"] == config.FIRST_SUPERUSER assert current_user["email"] == settings.FIRST_SUPERUSER
def test_get_users_normal_user_me(normal_user_token_headers): def test_get_users_normal_user_me(normal_user_token_headers):
server_api = get_server_api() server_api = get_server_api()
r = requests.get( r = requests.get(
f"{server_api}{config.API_V1_STR}/users/me", headers=normal_user_token_headers f"{server_api}{settings.API_V1_STR}/users/me", headers=normal_user_token_headers
) )
current_user = r.json() current_user = r.json()
assert current_user assert current_user
assert current_user["is_active"] is True assert current_user["is_active"] is True
assert current_user["is_superuser"] is False assert current_user["is_superuser"] is False
assert current_user["email"] == config.EMAIL_TEST_USER assert current_user["email"] == settings.EMAIL_TEST_USER
def test_create_user_new_email(superuser_token_headers): def test_create_user_new_email(superuser_token_headers):
@@ -37,7 +37,7 @@ def test_create_user_new_email(superuser_token_headers):
password = random_lower_string() password = random_lower_string()
data = {"email": username, "password": password} data = {"email": username, "password": password}
r = requests.post( r = requests.post(
f"{server_api}{config.API_V1_STR}/users/", f"{server_api}{settings.API_V1_STR}/users/",
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
@@ -55,7 +55,7 @@ def test_get_existing_user(superuser_token_headers):
user = crud.user.create(db_session, obj_in=user_in) user = crud.user.create(db_session, obj_in=user_in)
user_id = user.id user_id = user.id
r = requests.get( r = requests.get(
f"{server_api}{config.API_V1_STR}/users/{user_id}", f"{server_api}{settings.API_V1_STR}/users/{user_id}",
headers=superuser_token_headers, headers=superuser_token_headers,
) )
assert 200 <= r.status_code < 300 assert 200 <= r.status_code < 300
@@ -73,7 +73,7 @@ def test_create_user_existing_username(superuser_token_headers):
crud.user.create(db_session, obj_in=user_in) crud.user.create(db_session, obj_in=user_in)
data = {"email": username, "password": password} data = {"email": username, "password": password}
r = requests.post( r = requests.post(
f"{server_api}{config.API_V1_STR}/users/", f"{server_api}{settings.API_V1_STR}/users/",
headers=superuser_token_headers, headers=superuser_token_headers,
json=data, json=data,
) )
@@ -88,7 +88,7 @@ def test_create_user_by_normal_user(normal_user_token_headers):
password = random_lower_string() password = random_lower_string()
data = {"email": username, "password": password} data = {"email": username, "password": password}
r = requests.post( r = requests.post(
f"{server_api}{config.API_V1_STR}/users/", f"{server_api}{settings.API_V1_STR}/users/",
headers=normal_user_token_headers, headers=normal_user_token_headers,
json=data, json=data,
) )
@@ -108,7 +108,7 @@ def test_retrieve_users(superuser_token_headers):
crud.user.create(db_session, obj_in=user_in2) crud.user.create(db_session, obj_in=user_in2)
r = requests.get( r = requests.get(
f"{server_api}{config.API_V1_STR}/users/", headers=superuser_token_headers f"{server_api}{settings.API_V1_STR}/users/", headers=superuser_token_headers
) )
all_users = r.json() all_users = r.json()

View File

@@ -1,6 +1,6 @@
import pytest import pytest
from app.core import config from app.core.config import settings
from app.tests.utils.utils import get_server_api, get_superuser_token_headers from app.tests.utils.utils import get_server_api, get_superuser_token_headers
from app.tests.utils.user import authentication_token_from_email from app.tests.utils.user import authentication_token_from_email
@@ -17,4 +17,4 @@ def superuser_token_headers():
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def normal_user_token_headers(): def normal_user_token_headers():
return authentication_token_from_email(config.EMAIL_TEST_USER) return authentication_token_from_email(settings.EMAIL_TEST_USER)

View File

@@ -1,7 +1,7 @@
import requests import requests
from app import crud from app import crud
from app.core import config from app.core.config import settings
from app.db.session import db_session from app.db.session import db_session
from app.schemas.user import UserCreate, UserUpdate from app.schemas.user import UserCreate, UserUpdate
from app.tests.utils.utils import get_server_api, random_lower_string, random_email from app.tests.utils.utils import get_server_api, random_lower_string, random_email
@@ -10,7 +10,7 @@ from app.tests.utils.utils import get_server_api, random_lower_string, random_em
def user_authentication_headers(server_api, email, password): def user_authentication_headers(server_api, email, password):
data = {"username": email, "password": password} data = {"username": email, "password": password}
r = requests.post(f"{server_api}{config.API_V1_STR}/login/access-token", data=data) r = requests.post(f"{server_api}{settings.API_V1_STR}/login/access-token", data=data)
response = r.json() response = r.json()
auth_token = response["access_token"] auth_token = response["access_token"]
headers = {"Authorization": f"Bearer {auth_token}"} headers = {"Authorization": f"Bearer {auth_token}"}

View File

@@ -2,7 +2,8 @@ import random
import string import string
import requests import requests
from app.core import config
from app.core.config import settings
def random_lower_string(): def random_lower_string():
@@ -14,18 +15,18 @@ def random_email():
def get_server_api(): def get_server_api():
server_name = f"http://{config.SERVER_NAME}" server_name = f"http://{settings.SERVER_NAME}"
return server_name return server_name
def get_superuser_token_headers(): def get_superuser_token_headers():
server_api = get_server_api() server_api = get_server_api()
login_data = { login_data = {
"username": config.FIRST_SUPERUSER, "username": settings.FIRST_SUPERUSER,
"password": config.FIRST_SUPERUSER_PASSWORD, "password": settings.FIRST_SUPERUSER_PASSWORD,
} }
r = requests.post( r = requests.post(
f"{server_api}{config.API_V1_STR}/login/access-token", data=login_data f"{server_api}{settings.API_V1_STR}/login/access-token", data=login_data
) )
tokens = r.json() tokens = r.json()
a_token = tokens["access_token"] a_token = tokens["access_token"]

View File

@@ -8,79 +8,79 @@ import jwt
from emails.template import JinjaTemplate from emails.template import JinjaTemplate
from jwt.exceptions import InvalidTokenError from jwt.exceptions import InvalidTokenError
from app.core import config from app.core.config import settings
password_reset_jwt_subject = "preset" password_reset_jwt_subject = "preset"
def send_email(email_to: str, subject_template="", html_template="", environment={}): def send_email(email_to: str, subject_template="", html_template="", environment={}):
assert config.EMAILS_ENABLED, "no provided configuration for email variables" assert settings.EMAILS_ENABLED, "no provided configuration for email variables"
message = emails.Message( message = emails.Message(
subject=JinjaTemplate(subject_template), subject=JinjaTemplate(subject_template),
html=JinjaTemplate(html_template), html=JinjaTemplate(html_template),
mail_from=(config.EMAILS_FROM_NAME, config.EMAILS_FROM_EMAIL), mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL),
) )
smtp_options = {"host": config.SMTP_HOST, "port": config.SMTP_PORT} smtp_options = {"host": settings.SMTP_HOST, "port": settings.SMTP_PORT}
if config.SMTP_TLS: if settings.SMTP_TLS:
smtp_options["tls"] = True smtp_options["tls"] = True
if config.SMTP_USER: if settings.SMTP_USER:
smtp_options["user"] = config.SMTP_USER smtp_options["user"] = settings.SMTP_USER
if config.SMTP_PASSWORD: if settings.SMTP_PASSWORD:
smtp_options["password"] = config.SMTP_PASSWORD smtp_options["password"] = settings.SMTP_PASSWORD
response = message.send(to=email_to, render=environment, smtp=smtp_options) response = message.send(to=email_to, render=environment, smtp=smtp_options)
logging.info(f"send email result: {response}") logging.info(f"send email result: {response}")
def send_test_email(email_to: str): def send_test_email(email_to: str):
project_name = config.PROJECT_NAME project_name = settings.PROJECT_NAME
subject = f"{project_name} - Test email" subject = f"{project_name} - Test email"
with open(Path(config.EMAIL_TEMPLATES_DIR) / "test_email.html") as f: with open(Path(settings.EMAIL_TEMPLATES_DIR) / "test_email.html") as f:
template_str = f.read() template_str = f.read()
send_email( send_email(
email_to=email_to, email_to=email_to,
subject_template=subject, subject_template=subject,
html_template=template_str, html_template=template_str,
environment={"project_name": config.PROJECT_NAME, "email": email_to}, environment={"project_name": settings.PROJECT_NAME, "email": email_to},
) )
def send_reset_password_email(email_to: str, email: str, token: str): def send_reset_password_email(email_to: str, email: str, token: str):
project_name = config.PROJECT_NAME project_name = settings.PROJECT_NAME
subject = f"{project_name} - Password recovery for user {email}" subject = f"{project_name} - Password recovery for user {email}"
with open(Path(config.EMAIL_TEMPLATES_DIR) / "reset_password.html") as f: with open(Path(settings.EMAIL_TEMPLATES_DIR) / "reset_password.html") as f:
template_str = f.read() template_str = f.read()
if hasattr(token, "decode"): if hasattr(token, "decode"):
use_token = token.decode() use_token = token.decode()
else: else:
use_token = token use_token = token
server_host = config.SERVER_HOST server_host = settings.SERVER_HOST
link = f"{server_host}/reset-password?token={use_token}" link = f"{server_host}/reset-password?token={use_token}"
send_email( send_email(
email_to=email_to, email_to=email_to,
subject_template=subject, subject_template=subject,
html_template=template_str, html_template=template_str,
environment={ environment={
"project_name": config.PROJECT_NAME, "project_name": settings.PROJECT_NAME,
"username": email, "username": email,
"email": email_to, "email": email_to,
"valid_hours": config.EMAIL_RESET_TOKEN_EXPIRE_HOURS, "valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS,
"link": link, "link": link,
}, },
) )
def send_new_account_email(email_to: str, username: str, password: str): def send_new_account_email(email_to: str, username: str, password: str):
project_name = config.PROJECT_NAME project_name = settings.PROJECT_NAME
subject = f"{project_name} - New account for user {username}" subject = f"{project_name} - New account for user {username}"
with open(Path(config.EMAIL_TEMPLATES_DIR) / "new_account.html") as f: with open(Path(settings.EMAIL_TEMPLATES_DIR) / "new_account.html") as f:
template_str = f.read() template_str = f.read()
link = config.SERVER_HOST link = settings.SERVER_HOST
send_email( send_email(
email_to=email_to, email_to=email_to,
subject_template=subject, subject_template=subject,
html_template=template_str, html_template=template_str,
environment={ environment={
"project_name": config.PROJECT_NAME, "project_name": settings.PROJECT_NAME,
"username": username, "username": username,
"password": password, "password": password,
"email": email_to, "email": email_to,
@@ -90,13 +90,13 @@ def send_new_account_email(email_to: str, username: str, password: str):
def generate_password_reset_token(email): def generate_password_reset_token(email):
delta = timedelta(hours=config.EMAIL_RESET_TOKEN_EXPIRE_HOURS) delta = timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS)
now = datetime.utcnow() now = datetime.utcnow()
expires = now + delta expires = now + delta
exp = expires.timestamp() exp = expires.timestamp()
encoded_jwt = jwt.encode( encoded_jwt = jwt.encode(
{"exp": exp, "nbf": now, "sub": password_reset_jwt_subject, "email": email}, {"exp": exp, "nbf": now, "sub": password_reset_jwt_subject, "email": email},
config.SECRET_KEY, settings.SECRET_KEY,
algorithm="HS256", algorithm="HS256",
) )
return encoded_jwt return encoded_jwt
@@ -104,7 +104,7 @@ def generate_password_reset_token(email):
def verify_password_reset_token(token) -> Optional[str]: def verify_password_reset_token(token) -> Optional[str]:
try: try:
decoded_token = jwt.decode(token, config.SECRET_KEY, algorithms=["HS256"]) decoded_token = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
assert decoded_token["sub"] == password_reset_jwt_subject assert decoded_token["sub"] == password_reset_jwt_subject
return decoded_token["email"] return decoded_token["email"]
except InvalidTokenError: except InvalidTokenError:

View File

@@ -1,9 +1,9 @@
from raven import Client from raven import Client
from app.core import config from app.core.config import settings
from app.core.celery_app import celery_app from app.core.celery_app import celery_app
client_sentry = Client(config.SENTRY_DSN) client_sentry = Client(settings.SENTRY_DSN)
@celery_app.task(acks_late=True) @celery_app.task(acks_late=True)

View File

@@ -1,6 +1,6 @@
FROM tiangolo/uvicorn-gunicorn-fastapi:python3.7 FROM tiangolo/uvicorn-gunicorn-fastapi:python3.7
RUN pip install celery~=4.3 passlib[bcrypt] tenacity requests emails "fastapi>=0.47.0" "uvicorn>=0.11.1" gunicorn pyjwt python-multipart email_validator jinja2 psycopg2-binary alembic SQLAlchemy email_validator RUN pip install celery~=4.3 passlib[bcrypt] tenacity requests emails "fastapi>=0.47.0" "uvicorn>=0.11.1" gunicorn pyjwt python-multipart email-validator jinja2 psycopg2-binary alembic SQLAlchemy
# For development, Jupyter remote kernel, Hydrogen # For development, Jupyter remote kernel, Hydrogen
# Using inside the container: # Using inside the container:

View File

@@ -1,6 +1,6 @@
FROM python:3.7 FROM python:3.7
RUN pip install raven celery~=4.3 passlib[bcrypt] tenacity requests "fastapi>=0.16.0" emails pyjwt email_validator jinja2 psycopg2-binary alembic SQLAlchemy email_validator RUN pip install raven celery~=4.3 passlib[bcrypt] tenacity requests "fastapi>=0.47.0" emails pyjwt email-validator jinja2 psycopg2-binary alembic SQLAlchemy
# For development, Jupyter remote kernel, Hydrogen # For development, Jupyter remote kernel, Hydrogen
# Using inside the container: # Using inside the container:

View File

@@ -1,6 +1,6 @@
FROM python:3.7 FROM python:3.7
RUN pip install requests pytest tenacity passlib[bcrypt] "fastapi>=0.16.0" psycopg2-binary SQLAlchemy email_validator RUN pip install requests pytest tenacity passlib[bcrypt] "fastapi>=0.47.0" email-validator psycopg2-binary SQLAlchemy
# For development, Jupyter remote kernel, Hydrogen # For development, Jupyter remote kernel, Hydrogen
# Using inside the container: # Using inside the container:

View File

@@ -12,3 +12,4 @@ services:
backend-tests: backend-tests:
environment: environment:
- JUPYTER=jupyter lab --ip=0.0.0.0 --allow-root --NotebookApp.custom_display_url=http://127.0.0.1:8888 - JUPYTER=jupyter lab --ip=0.0.0.0 --allow-root --NotebookApp.custom_display_url=http://127.0.0.1:8888
- SERVER_HOST=http://${DOMAIN}

View File

@@ -10,6 +10,7 @@ services:
- env-postgres.env - env-postgres.env
environment: environment:
- SERVER_NAME=backend - SERVER_NAME=backend
- SERVER_HOST=http://${DOMAIN}
backend: backend:
environment: environment:
# Don't send emails during testing # Don't send emails during testing