Add base class to simplify CRUD (#23)

This commit is contained in:
Manu
2020-01-19 22:40:50 +01:00
committed by Sebastián Ramírez
parent 1c975c7f2d
commit ab46165387
33 changed files with 322 additions and 283 deletions

View File

@@ -1 +1,10 @@
from . import item, user
from .crud_user import user
from .crud_item import item
# For a new basic set of CRUD operations you could just do
# from .base import CRUDBase
# from app.models.item import Item
# from app.schemas.item import ItemCreate, ItemUpdate
# item = CRUDBase[Item, ItemCreate, ItemUpdate](Item)

View File

@@ -0,0 +1,57 @@
from typing import List, Optional, Generic, TypeVar, Type
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.db.base_class import Base
ModelType = TypeVar("ModelType", bound=Base)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
def __init__(self, model: Type[ModelType]):
"""
CRUD object with default methods to Create, Read, Update, Delete (CRUD).
**Parameters**
* `model`: A SQLAlchemy model class
* `schema`: A Pydantic model (schema) class
"""
self.model = model
def get(self, db_session: Session, id: int) -> Optional[ModelType]:
return db_session.query(self.model).filter(self.model.id == id).first()
def get_multi(self, db_session: Session, *, skip=0, limit=100) -> List[ModelType]:
return db_session.query(self.model).offset(skip).limit(limit).all()
def create(self, db_session: Session, *, obj_in: CreateSchemaType) -> ModelType:
obj_in_data = jsonable_encoder(obj_in)
db_obj = self.model(**obj_in_data)
db_session.add(db_obj)
db_session.commit()
db_session.refresh(db_obj)
return db_obj
def update(
self, db_session: Session, *, db_obj: ModelType, obj_in: UpdateSchemaType
) -> ModelType:
obj_data = jsonable_encoder(db_obj)
update_data = obj_in.dict(skip_defaults=True)
for field in obj_data:
if field in update_data:
setattr(db_obj, field, update_data[field])
db_session.add(db_obj)
db_session.commit()
db_session.refresh(db_obj)
return db_obj
def remove(self, db_session: Session, *, id: int) -> ModelType:
obj = db_session.query(self.model).get(id)
db_session.delete(obj)
db_session.commit()
return obj

View File

@@ -0,0 +1,34 @@
from typing import List
from fastapi.encoders import jsonable_encoder
from sqlalchemy.orm import Session
from app.models.item import Item
from app.schemas.item import ItemCreate, ItemUpdate
from app.crud.base import CRUDBase
class CRUDItem(CRUDBase[Item, ItemCreate, ItemUpdate]):
def create_with_owner(
self, db_session: Session, *, obj_in: ItemCreate, owner_id: int
) -> Item:
obj_in_data = jsonable_encoder(obj_in)
db_obj = self.model(**obj_in_data, owner_id=owner_id)
db_session.add(db_obj)
db_session.commit()
db_session.refresh(db_obj)
return db_obj
def get_multi_by_owner(
self, db_session: Session, *, owner_id: int, skip=0, limit=100
) -> List[Item]:
return (
db_session.query(self.model)
.filter(Item.owner_id == owner_id)
.offset(skip)
.limit(limit)
.all()
)
item = CRUDItem(Item)

View File

@@ -0,0 +1,44 @@
from typing import Optional
from sqlalchemy.orm import Session
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
from app.core.security import verify_password, get_password_hash
from app.crud.base import CRUDBase
class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
def get_by_email(self, db_session: Session, *, email: str) -> Optional[User]:
return db_session.query(User).filter(User.email == email).first()
def create(self, db_session: Session, *, obj_in: UserCreate) -> User:
db_obj = User(
email=obj_in.email,
hashed_password=get_password_hash(obj_in.password),
full_name=obj_in.full_name,
is_superuser=obj_in.is_superuser,
)
db_session.add(db_obj)
db_session.commit()
db_session.refresh(db_obj)
return db_obj
def authenticate(
self, db_session: Session, *, email: str, password: str
) -> Optional[User]:
user = self.get_by_email(db_session, email=email)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def is_active(self, user: User) -> bool:
return user.is_active
def is_superuser(self, user: User) -> bool:
return user.is_superuser
user = CRUDUser(User)

View File

@@ -1,55 +0,0 @@
from typing import List, Optional
from fastapi.encoders import jsonable_encoder
from sqlalchemy.orm import Session
from app.db_models.item import Item
from app.models.item import ItemCreate, ItemUpdate
def get(db_session: Session, *, id: int) -> Optional[Item]:
return db_session.query(Item).filter(Item.id == id).first()
def get_multi(db_session: Session, *, skip=0, limit=100) -> List[Optional[Item]]:
return db_session.query(Item).offset(skip).limit(limit).all()
def get_multi_by_owner(
db_session: Session, *, owner_id: int, skip=0, limit=100
) -> List[Optional[Item]]:
return (
db_session.query(Item)
.filter(Item.owner_id == owner_id)
.offset(skip)
.limit(limit)
.all()
)
def create(db_session: Session, *, item_in: ItemCreate, owner_id: int) -> Item:
item_in_data = jsonable_encoder(item_in)
item = Item(**item_in_data, owner_id=owner_id)
db_session.add(item)
db_session.commit()
db_session.refresh(item)
return item
def update(db_session: Session, *, item: Item, item_in: ItemUpdate) -> Item:
item_data = jsonable_encoder(item)
update_data = item_in.dict(skip_defaults=True)
for field in item_data:
if field in update_data:
setattr(item, field, update_data[field])
db_session.add(item)
db_session.commit()
db_session.refresh(item)
return item
def remove(db_session: Session, *, id: int):
item = db_session.query(Item).filter(Item.id == id).first()
db_session.delete(item)
db_session.commit()
return item

View File

@@ -1,65 +0,0 @@
from typing import List, Optional
from fastapi.encoders import jsonable_encoder
from sqlalchemy.orm import Session
from app.core.security import get_password_hash, verify_password
from app.db_models.user import User
from app.models.user import UserCreate, UserUpdate
def get(db_session: Session, *, user_id: int) -> Optional[User]:
return db_session.query(User).filter(User.id == user_id).first()
def get_by_email(db_session: Session, *, email: str) -> Optional[User]:
return db_session.query(User).filter(User.email == email).first()
def authenticate(db_session: Session, *, email: str, password: str) -> Optional[User]:
user = get_by_email(db_session, email=email)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def is_active(user) -> bool:
return user.is_active
def is_superuser(user) -> bool:
return user.is_superuser
def get_multi(db_session: Session, *, skip=0, limit=100) -> List[Optional[User]]:
return db_session.query(User).offset(skip).limit(limit).all()
def create(db_session: Session, *, user_in: UserCreate) -> User:
user = User(
email=user_in.email,
hashed_password=get_password_hash(user_in.password),
full_name=user_in.full_name,
is_superuser=user_in.is_superuser,
)
db_session.add(user)
db_session.commit()
db_session.refresh(user)
return user
def update(db_session: Session, *, user: User, user_in: UserUpdate) -> User:
user_data = jsonable_encoder(user)
update_data = user_in.dict(skip_defaults=True)
for field in user_data:
if field in update_data:
setattr(user, field, update_data[field])
if user_in.password:
passwordhash = get_password_hash(user_in.password)
user.hashed_password = passwordhash
db_session.add(user)
db_session.commit()
db_session.refresh(user)
return user