Add Items (crud, models, endpoints), utils, refactor (#14)

* Update CRUD utils to use types better.
* Simplify Pydantic model names, from `UserInCreate` to `UserCreate`, etc.
* Upgrade packages.
* Add new generic "Items" models, crud utils, endpoints, and tests. To facilitate re-using them to create new functionality. As they are simple and generic (not like Users), it's easier to copy-paste and adapt them to each use case.
* Update endpoints/*path operations* to simplify code and use new utilities, prefix and tags in `include_router`.
* Update testing utils.
* Update linting rules, relax vulture to reduce false positives.
* Update migrations to include new Items.
* Update project README.md with tips about how to start with backend.
This commit is contained in:
Sebastián Ramírez
2019-04-19 09:45:23 +04:00
committed by GitHub
parent 1fe4908b0a
commit ecd634e497
32 changed files with 426 additions and 1091 deletions

View File

@@ -53,7 +53,9 @@ If your Docker is not running in `localhost` (the URLs above wouldn't work) chec
### General workflow
Add and modify SQLAlchemy models in `./backend/app/app/db_models/`, Pydantic models in `./backend/app/app/models` and API endpoints in `./backend/app/app/api/`.
Open your editor at `./backend/app/` (instead of the project root: `./`), so that you see an `./app/` directory with your code inside. That way, your editor will be able to find all the imports, etc.
Modify or add SQLAlchemy models in `./backend/app/app/db_models/`, Pydantic models in `./backend/app/app/models/`, API endpoints in `./backend/app/app/api/`, CRUD (Create, Read, Update, Delete) utils in `./backend/app/app/crud/`. The easiest might be to copy the ones for Items (models, endpoints, and CRUD utils) and update them to your needs.
Add and modify tasks to the Celery worker in `./backend/app/app/worker.py`.

View File

@@ -20,7 +20,7 @@ pyjwt = "*"
python-multipart = "*"
email-validator = "*"
requests = "*"
celery = "~=4.3"
celery = "*"
passlib = {extras = ["bcrypt"],version = "*"}
tenacity = "*"
pydantic = "*"

File diff suppressed because it is too large Load Diff

View File

@@ -1,8 +1,8 @@
"""First revision
Revision ID: e6ae69e9dcb9
Revision ID: d4867f3a4c0a
Revises:
Create Date: 2019-02-13 14:27:57.038583
Create Date: 2019-04-17 13:53:32.978401
"""
from alembic import op
@@ -10,7 +10,7 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'e6ae69e9dcb9'
revision = 'd4867f3a4c0a'
down_revision = None
branch_labels = None
depends_on = None
@@ -30,11 +30,26 @@ def upgrade():
op.create_index(op.f('ix_user_email'), 'user', ['email'], unique=True)
op.create_index(op.f('ix_user_full_name'), 'user', ['full_name'], unique=False)
op.create_index(op.f('ix_user_id'), 'user', ['id'], unique=False)
op.create_table('item',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('title', sa.String(), nullable=True),
sa.Column('description', sa.String(), nullable=True),
sa.Column('owner_id', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['owner_id'], ['user.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_item_description'), 'item', ['description'], unique=False)
op.create_index(op.f('ix_item_id'), 'item', ['id'], unique=False)
op.create_index(op.f('ix_item_title'), 'item', ['title'], unique=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_item_title'), table_name='item')
op.drop_index(op.f('ix_item_id'), table_name='item')
op.drop_index(op.f('ix_item_description'), table_name='item')
op.drop_table('item')
op.drop_index(op.f('ix_user_id'), table_name='user')
op.drop_index(op.f('ix_user_full_name'), table_name='user')
op.drop_index(op.f('ix_user_email'), table_name='user')

View File

@@ -1,8 +1,9 @@
from fastapi import APIRouter
from app.api.api_v1.endpoints import token, user, utils
from app.api.api_v1.endpoints import items, login, users, utils
api_router = APIRouter()
api_router.include_router(token.router)
api_router.include_router(user.router)
api_router.include_router(utils.router)
api_router.include_router(login.router, tags=["login"])
api_router.include_router(users.router, prefix="/users", tags=["users"])
api_router.include_router(utils.router, prefix="/utils", tags=["utils"])
api_router.include_router(items.router, prefix="/items", tags=["items"])

View File

@@ -0,0 +1,102 @@
from typing import List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app import crud
from app.api.utils.db import get_db
from app.api.utils.security import get_current_active_user
from app.db_models.user import User as DBUser
from app.models.item import Item, ItemCreate, ItemUpdate
router = APIRouter()
@router.get("/", response_model=List[Item])
def read_items(
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
current_user: DBUser = Depends(get_current_active_user),
):
"""
Retrieve items.
"""
if crud.user.is_superuser(current_user):
items = crud.item.get_multi(db, skip=skip, limit=limit)
else:
items = crud.item.get_multi_by_owner(
db_session=db, owner_id=current_user.id, skip=skip, limit=limit
)
return items
@router.post("/", response_model=Item)
def create_item(
*,
db: Session = Depends(get_db),
item_in: ItemCreate,
current_user: DBUser = Depends(get_current_active_user),
):
"""
Create new item.
"""
item = crud.item.create(db_session=db, item_in=item_in, owner_id=current_user.id)
return item
@router.put("/{id}", response_model=Item)
def update_item(
*,
db: Session = Depends(get_db),
id: int,
item_in: ItemUpdate,
current_user: DBUser = Depends(get_current_active_user),
):
"""
Update an item.
"""
item = crud.item.get(db_session=db, id=id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
if not crud.user.is_superuser(current_user) and (item.owner_id != current_user.id):
raise HTTPException(status_code=400, detail="Not enough permissions")
item = crud.item.update(db_session=db, item=item, item_in=item_in)
return item
@router.get("/{id}", response_model=Item)
def read_user_me(
*,
db: Session = Depends(get_db),
id: int,
current_user: DBUser = Depends(get_current_active_user),
):
"""
Get item by ID.
"""
item = crud.item.get(db_session=db, id=id)
if not item:
raise HTTPException(status_code=400, detail="Item not found")
if not crud.user.is_superuser(current_user) and (item.owner_id != current_user.id):
raise HTTPException(status_code=400, detail="Not enough permissions")
return item
@router.delete("/{id}", response_model=Item)
def delete_item(
*,
db: Session = Depends(get_db),
id: int,
current_user: DBUser = Depends(get_current_active_user),
):
"""
Delete an item.
"""
item = crud.item.get(db_session=db, id=id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
if not crud.user.is_superuser(current_user) and (item.owner_id != current_user.id):
raise HTTPException(status_code=400, detail="Not enough permissions")
item = crud.item.remove(db_session=db, id=id)
return item

View File

@@ -10,13 +10,13 @@ from app.api.utils.db import get_db
from app.api.utils.security import get_current_active_superuser, get_current_active_user
from app.core import config
from app.db_models.user import User as DBUser
from app.models.user import User, UserInCreate, UserInDB, UserInUpdate
from app.models.user import User, UserCreate, UserInDB, UserUpdate
from app.utils import send_new_account_email
router = APIRouter()
@router.get("/users/", tags=["users"], response_model=List[User])
@router.get("/", response_model=List[User])
def read_users(
db: Session = Depends(get_db),
skip: int = 0,
@@ -24,21 +24,21 @@ def read_users(
current_user: DBUser = Depends(get_current_active_superuser),
):
"""
Retrieve users
Retrieve users.
"""
users = crud.user.get_multi(db, skip=skip, limit=limit)
return users
@router.post("/users/", tags=["users"], response_model=User)
@router.post("/", response_model=User)
def create_user(
*,
db: Session = Depends(get_db),
user_in: UserInCreate,
user_in: UserCreate,
current_user: DBUser = Depends(get_current_active_superuser),
):
"""
Create new user
Create new user.
"""
user = crud.user.get_by_email(db, email=user_in.email)
if user:
@@ -54,7 +54,7 @@ def create_user(
return user
@router.put("/users/me", tags=["users"], response_model=User)
@router.put("/me", response_model=User)
def update_user_me(
*,
db: Session = Depends(get_db),
@@ -64,10 +64,10 @@ def update_user_me(
current_user: DBUser = Depends(get_current_active_user),
):
"""
Update own user
Update own user.
"""
current_user_data = jsonable_encoder(current_user)
user_in = UserInUpdate(**current_user_data)
user_in = UserUpdate(**current_user_data)
if password is not None:
user_in.password = password
if full_name is not None:
@@ -78,18 +78,18 @@ def update_user_me(
return user
@router.get("/users/me", tags=["users"], response_model=User)
@router.get("/me", response_model=User)
def read_user_me(
db: Session = Depends(get_db),
current_user: DBUser = Depends(get_current_active_user),
):
"""
Get current user
Get current user.
"""
return current_user
@router.post("/users/open", tags=["users"], response_model=User)
@router.post("/open", response_model=User)
def create_user_open(
*,
db: Session = Depends(get_db),
@@ -98,7 +98,7 @@ def create_user_open(
full_name: str = Body(None),
):
"""
Create new user without the need to be logged in
Create new user without the need to be logged in.
"""
if not config.USERS_OPEN_REGISTRATION:
raise HTTPException(
@@ -111,19 +111,19 @@ def create_user_open(
status_code=400,
detail="The user with this username already exists in the system",
)
user_in = UserInCreate(password=password, email=email, full_name=full_name)
user_in = UserCreate(password=password, email=email, full_name=full_name)
user = crud.user.create(db, user_in=user_in)
return user
@router.get("/users/{user_id}", tags=["users"], response_model=User)
@router.get("/{user_id}", response_model=User)
def read_user_by_id(
user_id: int,
current_user: DBUser = Depends(get_current_active_user),
db: Session = Depends(get_db),
):
"""
Get a specific user by id
Get a specific user by id.
"""
user = crud.user.get(db, user_id=user_id)
if user == current_user:
@@ -135,19 +135,18 @@ def read_user_by_id(
return user
@router.put("/users/{user_id}", tags=["users"], response_model=User)
@router.put("/{user_id}", response_model=User)
def update_user(
*,
db: Session = Depends(get_db),
user_id: int,
user_in: UserInUpdate,
user_in: UserUpdate,
current_user: UserInDB = Depends(get_current_active_superuser),
):
"""
Update a user
Update a user.
"""
user = crud.user.get(db, user_id=user_id)
if not user:
raise HTTPException(
status_code=404,

View File

@@ -10,23 +10,23 @@ from app.utils import send_test_email
router = APIRouter()
@router.post("/test-celery/", tags=["utils"], response_model=Msg, status_code=201)
@router.post("/test-celery/", response_model=Msg, status_code=201)
def test_celery(
msg: Msg, current_user: UserInDB = Depends(get_current_active_superuser)
):
"""
Test Celery worker
Test Celery worker.
"""
celery_app.send_task("app.worker.test_celery", args=[msg.msg])
return {"msg": "Word received"}
@router.post("/test-email/", tags=["utils"], response_model=Msg, status_code=201)
@router.post("/test-email/", response_model=Msg, status_code=201)
def test_email(
email_to: EmailStr, current_user: UserInDB = Depends(get_current_active_superuser)
):
"""
Test emails
Test emails.
"""
send_test_email(email_to=email_to)
return {"msg": "Test email sent"}

View File

@@ -1 +1 @@
from . import user
from . import item, user

View File

@@ -0,0 +1,54 @@
from typing import List, Optional
from fastapi.encoders import jsonable_encoder
from sqlalchemy.orm import Session
from app.db_models.item import Item
from app.models.item import ItemCreate, ItemUpdate
def get(db_session: Session, *, id: int) -> Optional[Item]:
return db_session.query(Item).filter(Item.id == id).first()
def get_multi(db_session: Session, *, skip=0, limit=100) -> List[Optional[Item]]:
return db_session.query(Item).offset(skip).limit(limit).all()
def get_multi_by_owner(
db_session: Session, *, owner_id: int, skip=0, limit=100
) -> List[Optional[Item]]:
return (
db_session.query(Item)
.filter(Item.owner_id == owner_id)
.offset(skip)
.limit(limit)
.all()
)
def create(db_session: Session, *, item_in: ItemCreate, owner_id: int) -> Item:
item = Item(title=item_in.title, description=item_in.description, owner_id=owner_id)
db_session.add(item)
db_session.commit()
db_session.refresh(item)
return item
def update(db_session: Session, *, item: Item, item_in: ItemUpdate) -> Item:
item_data = jsonable_encoder(item)
update_data = item_in.dict(skip_defaults=True)
for field in item_data:
if field in update_data:
setattr(item, field, update_data[field])
db_session.add(item)
db_session.commit()
db_session.refresh(item)
return item
def remove(db_session: Session, *, id: int):
item = db_session.query(Item).filter(Item.id == id).first()
db_session.delete(item)
db_session.commit()
return item

View File

@@ -1,21 +1,22 @@
from typing import List, Optional
from fastapi.encoders import jsonable_encoder
from sqlalchemy.orm import Session
from app.core.security import get_password_hash, verify_password
from app.db_models.user import User
from app.models.user import UserInCreate, UserInUpdate
from app.models.user import UserCreate, UserUpdate
def get(db_session, *, user_id: int) -> Optional[User]:
def get(db_session: Session, *, user_id: int) -> Optional[User]:
return db_session.query(User).filter(User.id == user_id).first()
def get_by_email(db_session, *, email: str) -> Optional[User]:
def get_by_email(db_session: Session, *, email: str) -> Optional[User]:
return db_session.query(User).filter(User.email == email).first()
def authenticate(db_session, *, email: str, password: str) -> Optional[User]:
def authenticate(db_session: Session, *, email: str, password: str) -> Optional[User]:
user = get_by_email(db_session, email=email)
if not user:
return None
@@ -32,11 +33,11 @@ def is_superuser(user) -> bool:
return user.is_superuser
def get_multi(db_session, *, skip=0, limit=100) -> List[Optional[User]]:
def get_multi(db_session: Session, *, skip=0, limit=100) -> List[Optional[User]]:
return db_session.query(User).offset(skip).limit(limit).all()
def create(db_session, *, user_in: UserInCreate) -> User:
def create(db_session: Session, *, user_in: UserCreate) -> User:
user = User(
email=user_in.email,
hashed_password=get_password_hash(user_in.password),
@@ -49,13 +50,12 @@ def create(db_session, *, user_in: UserInCreate) -> User:
return user
def update(db_session, *, user: User, user_in: UserInUpdate) -> User:
def update(db_session: Session, *, user: User, user_in: UserUpdate) -> User:
user_data = jsonable_encoder(user)
update_data = user_in.dict(skip_defaults=True)
for field in user_data:
if field in user_in.fields:
value_in = getattr(user_in, field)
if value_in is not None:
setattr(user, field, value_in)
if field in update_data:
setattr(user, field, update_data[field])
if user_in.password:
passwordhash = get_password_hash(user_in.password)
user.hashed_password = passwordhash

View File

@@ -2,3 +2,4 @@
# imported by Alembic
from app.db.base_class import Base # noqa
from app.db_models.user import User # noqa
from app.db_models.item import Item # noqa

View File

@@ -1,6 +1,6 @@
from app import crud
from app.core import config
from app.models.user import UserInCreate
from app.models.user import UserCreate
def init_db(db_session):
@@ -11,7 +11,7 @@ def init_db(db_session):
user = crud.user.get_by_email(db_session, email=config.FIRST_SUPERUSER)
if not user:
user_in = UserInCreate(
user_in = UserCreate(
email=config.FIRST_SUPERUSER,
password=config.FIRST_SUPERUSER_PASSWORD,
is_superuser=True,

View File

@@ -0,0 +1,12 @@
from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from app.db.base_class import Base
class Item(Base):
id = Column(Integer, primary_key=True, index=True)
title = Column(String, index=True)
description = Column(String, index=True)
owner_id = Column(Integer, ForeignKey("user.id"))
owner = relationship("User", back_populates="items")

View File

@@ -1,4 +1,5 @@
from sqlalchemy import Boolean, Column, Integer, String
from sqlalchemy.orm import relationship
from app.db.base_class import Base
@@ -10,3 +11,4 @@ class User(Base):
hashed_password = Column(String)
is_active = Column(Boolean(), default=True)
is_superuser = Column(Boolean(), default=False)
items = relationship("Item", back_populates="owner")

View File

@@ -0,0 +1,34 @@
from pydantic import BaseModel
# Shared properties
class ItemBase(BaseModel):
title: str = None
description: str = None
# Properties to receive on item creation
class ItemCreate(ItemBase):
title: str
# Properties to receive on item update
class ItemUpdate(ItemBase):
pass
# Properties shared by models stored in DB
class ItemInDBBase(ItemBase):
id: int
title: str
owner_id: int
# Properties to return to client
class Item(ItemInDBBase):
pass
# Properties properties stored in DB
class ItemInDB(ItemInDBBase):
pass

View File

@@ -16,13 +16,13 @@ class UserBaseInDB(UserBase):
# Properties to receive via API on creation
class UserInCreate(UserBaseInDB):
class UserCreate(UserBaseInDB):
email: str
password: str
# Properties to receive via API on update
class UserInUpdate(UserBaseInDB):
class UserUpdate(UserBaseInDB):
password: Optional[str] = None

View File

@@ -8,7 +8,7 @@ def test_celery_worker_test(superuser_token_headers):
server_api = get_server_api()
data = {"msg": "test"}
r = requests.post(
f"{server_api}{config.API_V1_STR}/test-celery/",
f"{server_api}{config.API_V1_STR}/utils/test-celery/",
json=data,
headers=superuser_token_headers,
)

View File

@@ -0,0 +1,34 @@
import requests
from app.core import config
from app.tests.utils.item import create_random_item
from app.tests.utils.utils import get_server_api
def test_create_item(superuser_token_headers):
server_api = get_server_api()
data = {"title": "Foo", "description": "Fighters"}
response = requests.post(
f"{server_api}{config.API_V1_STR}/items/",
headers=superuser_token_headers,
json=data,
)
content = response.json()
assert content["title"] == data["title"]
assert content["description"] == data["description"]
assert "id" in content
assert "owner_id" in content
def test_read_item(superuser_token_headers):
item = create_random_item()
server_api = get_server_api()
response = requests.get(
f"{server_api}{config.API_V1_STR}/items/{item.id}",
headers=superuser_token_headers,
)
content = response.json()
assert content["title"] == item.title
assert content["description"] == item.description
assert content["id"] == item.id
assert content["owner_id"] == item.owner_id

View File

@@ -3,7 +3,7 @@ import requests
from app import crud
from app.core import config
from app.db.session import db_session
from app.models.user import UserInCreate
from app.models.user import UserCreate
from app.tests.utils.user import user_authentication_headers
from app.tests.utils.utils import get_server_api, random_lower_string
@@ -40,7 +40,7 @@ def test_get_existing_user(superuser_token_headers):
server_api = get_server_api()
username = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(email=username, password=password)
user_in = UserCreate(email=username, password=password)
user = crud.user.create(db_session, user_in=user_in)
user_id = user.id
r = requests.get(
@@ -58,7 +58,7 @@ def test_create_user_existing_username(superuser_token_headers):
username = random_lower_string()
# username = email
password = random_lower_string()
user_in = UserInCreate(email=username, password=password)
user_in = UserCreate(email=username, password=password)
user = crud.user.create(db_session, user_in=user_in)
data = {"email": username, "password": password}
r = requests.post(
@@ -75,7 +75,7 @@ def test_create_user_by_normal_user():
server_api = get_server_api()
username = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(email=username, password=password)
user_in = UserCreate(email=username, password=password)
user = crud.user.create(db_session, user_in=user_in)
user_token_headers = user_authentication_headers(server_api, username, password)
data = {"email": username, "password": password}
@@ -89,12 +89,12 @@ def test_retrieve_users(superuser_token_headers):
server_api = get_server_api()
username = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(email=username, password=password)
user_in = UserCreate(email=username, password=password)
user = crud.user.create(db_session, user_in=user_in)
username2 = random_lower_string()
password2 = random_lower_string()
user_in2 = UserInCreate(email=username2, password=password2)
user_in2 = UserCreate(email=username2, password=password2)
user2 = crud.user.create(db_session, user_in=user_in2)
r = requests.get(

View File

@@ -0,0 +1,61 @@
from app import crud
from app.models.item import ItemCreate, ItemUpdate
from app.tests.utils.user import create_random_user
from app.tests.utils.utils import random_lower_string
from app.db.session import db_session
def test_create_item():
title = random_lower_string()
description = random_lower_string()
item_in = ItemCreate(title=title, description=description)
user = create_random_user()
item = crud.item.create(db_session=db_session, item_in=item_in, owner_id=user.id)
assert item.title == title
assert item.description == description
assert item.owner_id == user.id
def test_get_item():
title = random_lower_string()
description = random_lower_string()
item_in = ItemCreate(title=title, description=description)
user = create_random_user()
item = crud.item.create(db_session=db_session, item_in=item_in, owner_id=user.id)
stored_item = crud.item.get(db_session=db_session, id=item.id)
assert item.id == stored_item.id
assert item.title == stored_item.title
assert item.description == stored_item.description
assert item.owner_id == stored_item.owner_id
def test_update_item():
title = random_lower_string()
description = random_lower_string()
item_in = ItemCreate(title=title, description=description)
user = create_random_user()
item = crud.item.create(db_session=db_session, item_in=item_in, owner_id=user.id)
description2 = random_lower_string()
item_update = ItemUpdate(description=description2)
item2 = crud.item.update(
db_session=db_session, item=item, item_in=item_update
)
assert item.id == item2.id
assert item.title == item2.title
assert item2.description == description2
assert item.owner_id == item2.owner_id
def test_delete_item():
title = random_lower_string()
description = random_lower_string()
item_in = ItemCreate(title=title, description=description)
user = create_random_user()
item = crud.item.create(db_session=db_session, item_in=item_in, owner_id=user.id)
item2 = crud.item.remove(db_session=db_session, id=item.id)
item3 = crud.item.get(db_session=db_session, id=item.id)
assert item3 is None
assert item2.id == item.id
assert item2.title == title
assert item2.description == description
assert item2.owner_id == user.id

View File

@@ -2,14 +2,14 @@ from fastapi.encoders import jsonable_encoder
from app import crud
from app.db.session import db_session
from app.models.user import UserInCreate
from app.models.user import UserCreate
from app.tests.utils.utils import random_lower_string
def test_create_user():
email = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(email=email, password=password)
user_in = UserCreate(email=email, password=password)
user = crud.user.create(db_session, user_in=user_in)
assert user.email == email
assert hasattr(user, "hashed_password")
@@ -18,7 +18,7 @@ def test_create_user():
def test_authenticate_user():
email = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(email=email, password=password)
user_in = UserCreate(email=email, password=password)
user = crud.user.create(db_session, user_in=user_in)
authenticated_user = crud.user.authenticate(
db_session, email=email, password=password
@@ -37,7 +37,7 @@ def test_not_authenticate_user():
def test_check_if_user_is_active():
email = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(email=email, password=password)
user_in = UserCreate(email=email, password=password)
user = crud.user.create(db_session, user_in=user_in)
is_active = crud.user.is_active(user)
assert is_active is True
@@ -46,7 +46,7 @@ def test_check_if_user_is_active():
def test_check_if_user_is_active_inactive():
email = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(email=email, password=password, disabled=True)
user_in = UserCreate(email=email, password=password, disabled=True)
print(user_in)
user = crud.user.create(db_session, user_in=user_in)
print(user)
@@ -58,7 +58,7 @@ def test_check_if_user_is_active_inactive():
def test_check_if_user_is_superuser():
email = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(email=email, password=password, is_superuser=True)
user_in = UserCreate(email=email, password=password, is_superuser=True)
user = crud.user.create(db_session, user_in=user_in)
is_superuser = crud.user.is_superuser(user)
assert is_superuser is True
@@ -67,7 +67,7 @@ def test_check_if_user_is_superuser():
def test_check_if_user_is_superuser_normal_user():
username = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(email=username, password=password)
user_in = UserCreate(email=username, password=password)
user = crud.user.create(db_session, user_in=user_in)
is_superuser = crud.user.is_superuser(user)
assert is_superuser is False
@@ -76,7 +76,7 @@ def test_check_if_user_is_superuser_normal_user():
def test_get_user():
password = random_lower_string()
username = random_lower_string()
user_in = UserInCreate(email=username, password=password, is_superuser=True)
user_in = UserCreate(email=username, password=password, is_superuser=True)
user = crud.user.create(db_session, user_in=user_in)
user_2 = crud.user.get(db_session, user_id=user.id)
assert user.email == user_2.email

View File

@@ -0,0 +1,17 @@
from app import crud
from app.db.session import db_session
from app.models.item import ItemCreate
from app.tests.utils.user import create_random_user
from app.tests.utils.utils import random_lower_string
def create_random_item(owner_id: int = None):
if owner_id is None:
user = create_random_user()
owner_id = user.id
title = random_lower_string()
description = random_lower_string()
item_in = ItemCreate(title=title, description=description, id=id)
return crud.item.create(
db_session=db_session, item_in=item_in, owner_id=owner_id
)

View File

@@ -1,6 +1,10 @@
import requests
from app import crud
from app.core import config
from app.db.session import db_session
from app.models.user import UserCreate
from app.tests.utils.utils import random_lower_string
def user_authentication_headers(server_api, email, password):
@@ -11,3 +15,11 @@ def user_authentication_headers(server_api, email, password):
auth_token = response["access_token"]
headers = {"Authorization": f"Bearer {auth_token}"}
return headers
def create_random_user():
email = random_lower_string()
password = random_lower_string()
user_in = UserCreate(username=email, email=email, password=password)
user = crud.user.create(db_session=db_session, user_in=user_in)
return user

View File

@@ -3,7 +3,7 @@ import logging
from tenacity import after_log, before_log, retry, stop_after_attempt, wait_fixed
from app.db.session import db_session
from app.tests.api.api_v1.test_token import test_get_access_token
from app.tests.api.api_v1.test_login import test_get_access_token
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

View File

@@ -5,4 +5,4 @@ set -x
autoflake --remove-all-unused-imports --recursive --remove-unused-variables --in-place app --exclude=__init__.py
isort --multi-line=3 --trailing-comma --force-grid-wrap=0 --combine-as --line-width 88 --recursive --apply app
black app
vulture app
vulture app --min-confidence 70

View File

@@ -1,6 +1,6 @@
FROM tiangolo/uvicorn-gunicorn-fastapi:python3.7
RUN pip install celery~=4.3 passlib[bcrypt] tenacity requests emails "fastapi>=0.7.1" uvicorn gunicorn pyjwt python-multipart email_validator jinja2 psycopg2-binary alembic SQLAlchemy
RUN pip install celery~=4.3 passlib[bcrypt] tenacity requests emails "fastapi>=0.16.0" uvicorn gunicorn pyjwt python-multipart email_validator jinja2 psycopg2-binary alembic SQLAlchemy
# For development, Jupyter remote kernel, Hydrogen
# Using inside the container:

View File

@@ -1,6 +1,6 @@
FROM python:3.7
RUN pip install raven celery~=4.3 passlib[bcrypt] tenacity requests "fastapi>=0.7.1" emails pyjwt email_validator jinja2 psycopg2-binary alembic SQLAlchemy
RUN pip install raven celery~=4.3 passlib[bcrypt] tenacity requests "fastapi>=0.16.0" emails pyjwt email_validator jinja2 psycopg2-binary alembic SQLAlchemy
# For development, Jupyter remote kernel, Hydrogen
# Using inside the container:

View File

@@ -1,6 +1,6 @@
FROM python:3.7
RUN pip install requests pytest tenacity passlib[bcrypt] "fastapi>=0.7.1" psycopg2-binary SQLAlchemy
RUN pip install requests pytest tenacity passlib[bcrypt] "fastapi>=0.16.0" psycopg2-binary SQLAlchemy
# For development, Jupyter remote kernel, Hydrogen
# Using inside the container: