Полноценное приложение с тестами, миграциями и best practices
В этой теме вы примените все знания курса для создания полноценного production-ready приложения с SQLAlchemy: блог-платформа с пользователями, постами, комментариями, тестами, миграциями и best practices.
blog_app/
├── alembic/
│ ├── env.py
│ ├── script.py.mako
│ └── versions/
├── app/
│ ├── __init__.py
│ ├── config.py # Настройки
│ ├── database.py # SQLAlchemy setup
│ ├── models/ # ORM модели
│ │ ├── __init__.py
│ │ ├── user.py
│ │ ├── post.py
│ │ └── comment.py
│ ├── schemas/ # Pydantic схемы
│ │ ├── __init__.py
│ │ ├── user.py
│ │ ├── post.py
│ │ └── comment.py
│ ├── services/ # Бизнес-логика
│ │ ├── __init__.py
│ │ ├── user.py
│ │ ├── post.py
│ │ └── comment.py
│ ├── api/ # FastAPI endpoints
│ │ ├── __init__.py
│ │ ├── users.py
│ │ ├── posts.py
│ │ └── comments.py
│ └── events.py # SQLAlchemy events
├── tests/
│ ├── __init__.py
│ ├── conftest.py # Фикстуры
│ ├── test_users.py
│ ├── test_posts.py
│ └── test_migrations.py
├── alembic.ini
├── pyproject.toml
└── .env
# app/config.py
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
    """Application settings.

    Values are populated by pydantic-settings; the defaults below apply when
    nothing overrides them. The ``.env`` file named in ``model_config`` is
    used as an additional source.
    """

    # pydantic v2 style configuration. The inner ``class Config`` form is
    # deprecated in pydantic v2 and emits a deprecation warning.
    model_config = {"env_file": ".env"}

    # Database
    database_url: str = "postgresql+psycopg://postgres:postgres@localhost:5432/blog"
    database_pool_size: int = 20
    database_max_overflow: int = 40
    database_pool_recycle: int = 1800
    database_pool_pre_ping: bool = True

    # Application
    app_name: str = "Blog API"
    debug: bool = False
    # NOTE(review): placeholder default; must be overridden in production.
    secret_key: str = "change-me-in-production"

    # Pagination
    default_page_size: int = 10
    max_page_size: int = 100
@lru_cache
def get_settings() -> Settings:
    """Return the cached ``Settings`` instance, building it on first call."""
    return Settings()


# Module-level instance imported by the rest of the application.
settings = get_settings()

# app/database.py
import logging
import time
from collections.abc import AsyncGenerator

from sqlalchemy import create_engine, event
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    AsyncSession,
    async_sessionmaker,
    AsyncEngine,
)
from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker

from app.config import settings
logger = logging.getLogger(__name__)

# Synchronous engine, intended for Alembic. A "+asyncpg" driver suffix, if
# present in the configured URL, is removed before the engine is created.
sync_engine = create_engine(
    settings.database_url.replace("+asyncpg", ""),
    pool_pre_ping=settings.database_pool_pre_ping,
    pool_recycle=settings.database_pool_recycle,
    pool_size=settings.database_pool_size,
    max_overflow=settings.database_max_overflow,
)
# Asynchronous engine used by the application itself.
async_engine: AsyncEngine = create_async_engine(
    settings.database_url,
    echo=settings.debug,  # log emitted SQL when debug mode is on
    pool_pre_ping=settings.database_pool_pre_ping,
    pool_recycle=settings.database_pool_recycle,
    pool_size=settings.database_pool_size,
    max_overflow=settings.database_max_overflow,
)
# Session factory bound to the async engine. Objects stay usable after commit
# because ``expire_on_commit`` is disabled.
async_session_maker = async_sessionmaker(
    bind=async_engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autocommit=False,
    autoflush=False,
)
class Base(DeclarativeBase):
    """Declarative base class that all ORM models inherit from."""
# FastAPI dependency
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield a database session for the duration of one request.

    The session is committed after the consumer finishes without error.
    On any ``Exception`` the session is rolled back and the error re-raised.

    Yields:
        An ``AsyncSession`` created by ``async_session_maker``.
    """
    # ``async with`` closes the session on exit, so no explicit close() is
    # needed in a ``finally`` clause.
    async with async_session_maker() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
# Query-timing listeners.
# Cursor events are registered on synchronous Engine objects; the application's
# AsyncEngine is instrumented through its ``sync_engine`` attribute so that the
# queries issued by the running app are timed as well, not only Alembic's.
_SLOW_QUERY_SECONDS = 1.0


@event.listens_for(sync_engine, "before_cursor_execute")
@event.listens_for(async_engine.sync_engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    """Record the start time of the statement on its execution context."""
    context._query_start_time = time.perf_counter()


@event.listens_for(sync_engine, "after_cursor_execute")
@event.listens_for(async_engine.sync_engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    """Log a warning when the statement took longer than the threshold."""
    duration = time.perf_counter() - context._query_start_time
    if duration > _SLOW_QUERY_SECONDS:
        # Lazy %-formatting; only the first 200 characters of the SQL are logged.
        logger.warning("Slow query (%.3fs): %s", duration, statement[:200])

# app/models/user.py
from sqlalchemy import String, Boolean, DateTime
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from datetime import datetime
from typing import TYPE_CHECKING
from app.database import Base
if TYPE_CHECKING:
from app.models.post import Post
class User(Base):
    """ORM model for an application user (table ``users``)."""

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    email: Mapped[str] = mapped_column(
        String(255), unique=True, index=True, nullable=False
    )
    username: Mapped[str] = mapped_column(
        String(50), unique=True, index=True, nullable=False
    )
    password_hash: Mapped[str] = mapped_column(String(255), nullable=False)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    is_superuser: Mapped[bool] = mapped_column(Boolean, default=False)

    # Timestamps: creation time is filled by the database; ``updated_at`` is
    # additionally refreshed on UPDATE statements.
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
    )

    # Posts written by this user; removed together with the user.
    posts: Mapped[list["Post"]] = relationship(
        back_populates="author",
        cascade="all, delete-orphan",
        lazy="selectin",
    )

    def __repr__(self) -> str:
        """Return a short debug representation with id and username."""
        return f"<User(id={self.id}, username='{self.username}')>"

# app/models/post.py
from sqlalchemy import String, Text, DateTime, ForeignKey, Index
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from datetime import datetime
from typing import TYPE_CHECKING
from app.database import Base
if TYPE_CHECKING:
from app.models.user import User
from app.models.comment import Comment
class Post(Base):
    """ORM model for a blog post (table ``posts``)."""

    __tablename__ = 'posts'

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    title: Mapped[str] = mapped_column(String(200), nullable=False)
    content: Mapped[str] = mapped_column(Text, nullable=False)
    # The column type is derived from the ``Mapped[bool]`` annotation; this
    # module does not import ``Boolean``, so naming it here raised NameError.
    is_published: Mapped[bool] = mapped_column(default=False)
    author_id: Mapped[int] = mapped_column(
        ForeignKey('users.id'), nullable=False, index=True
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
    )

    # Relationships
    author: Mapped["User"] = relationship(back_populates="posts")
    comments: Mapped[list["Comment"]] = relationship(
        back_populates="post",
        cascade="all, delete-orphan",
        lazy="selectin",
    )

    # Composite index on (author_id, created_at).
    __table_args__ = (
        Index('ix_posts_author_created', 'author_id', 'created_at'),
    )

    def __repr__(self) -> str:
        """Return a short debug representation with id and title."""
        return f"<Post(id={self.id}, title='{self.title}')>"

# app/models/comment.py
from sqlalchemy import String, Text, DateTime, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from datetime import datetime
from typing import TYPE_CHECKING
from app.database import Base
if TYPE_CHECKING:
from app.models.user import User
from app.models.post import Post
class Comment(Base):
    """ORM model for a comment on a post (table ``comments``).

    A comment may reference another comment through ``parent_id``, which
    forms a parent/replies tree.
    """

    __tablename__ = "comments"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    content: Mapped[str] = mapped_column(Text, nullable=False)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False)
    post_id: Mapped[int] = mapped_column(ForeignKey("posts.id"), nullable=False)
    parent_id: Mapped[int | None] = mapped_column(ForeignKey("comments.id"))
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
    )

    # Relationships
    author: Mapped["User"] = relationship(back_populates="comments")
    post: Mapped["Post"] = relationship(back_populates="comments")
    # Self-referential pair: ``remote_side`` marks ``id`` as the parent side.
    parent: Mapped["Comment | None"] = relationship(
        back_populates="replies",
        remote_side=[id],
    )
    replies: Mapped[list["Comment"]] = relationship(
        back_populates="parent",
        cascade="all, delete-orphan",
    )

    def __repr__(self) -> str:
        """Return a short debug representation with id and post id."""
        return f"<Comment(id={self.id}, post_id={self.post_id})>"
# Attach the reverse ``comments`` relationship to User.
# ``User`` is imported above only under TYPE_CHECKING, so it does not exist at
# runtime; import it for real here. app.models.user imports its own model
# dependencies only under TYPE_CHECKING, so this does not create an import cycle.
from app.models.user import User  # noqa: E402

User.comments = relationship(
    "Comment",
    back_populates="author",
    cascade="all, delete-orphan",
)

# app/schemas/user.py
from pydantic import BaseModel, EmailStr, Field, ConfigDict
from datetime import datetime
from typing import Optional
class UserBase(BaseModel):
    """Fields shared by the user schemas."""

    model_config = ConfigDict(
        str_strip_whitespace=True,
        from_attributes=True,
    )

    email: EmailStr
    username: str = Field(min_length=3, max_length=50)
class UserCreate(UserBase):
    """Payload for creating a user; adds the plain-text password."""

    password: str = Field(min_length=8, max_length=128)
class UserResponse(UserBase):
    """User representation returned by the API."""

    model_config = ConfigDict(from_attributes=True)

    id: int
    is_active: bool
    created_at: datetime

# app/schemas/post.py
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Optional
class PostBase(BaseModel):
    """Fields shared by the post schemas."""

    title: str = Field(min_length=1, max_length=200)
    content: str = Field(min_length=1)
class PostCreate(PostBase):
    """Payload for creating a post; unpublished unless stated otherwise."""

    is_published: bool = False
class PostResponse(PostBase):
    """Post representation returned by the API."""

    id: int
    author_id: int
    is_published: bool
    created_at: datetime
    updated_at: datetime

    # ``ConfigDict`` is not imported in this module, so calling it raised
    # NameError. ConfigDict is a TypedDict, so a plain dict is equivalent.
    model_config = {"from_attributes": True}

# app/services/user.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from passlib.context import CryptContext
from app.models.user import User
from app.schemas.user import UserCreate
# Shared passlib context used to hash passwords (bcrypt scheme).
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class UserService:
    """Service layer for reading and modifying users.

    Methods only flush; committing is left to the owner of the session.
    """

    def __init__(self, session: AsyncSession):
        """Store the session that all operations of this service use."""
        self.session = session

    async def get_by_id(self, user_id: int) -> User | None:
        """Return the user with the given primary key, or ``None``."""
        return await self.session.get(User, user_id)

    async def get_by_email(self, email: str) -> User | None:
        """Return the user with the given email, or ``None``."""
        result = await self.session.execute(
            select(User).where(User.email == email)
        )
        return result.scalar_one_or_none()

    async def create(self, user_data: UserCreate) -> User:
        """Create a user from ``user_data``, storing a hash of the password.

        The new row is flushed so that database-generated values are
        available on the returned object.
        """
        user = User(
            email=user_data.email,
            username=user_data.username,
            password_hash=pwd_context.hash(user_data.password),
        )
        self.session.add(user)
        await self.session.flush()
        return user

    async def update(self, user: User, **kwargs) -> User:
        """Set the given attributes on ``user`` and flush the session.

        Keyword arguments that do not name an existing attribute of ``user``
        are ignored.
        """
        for key, value in kwargs.items():
            if hasattr(user, key):
                # setattr needs the attribute name; it was missing before,
                # which raised TypeError on every update.
                setattr(user, key, value)
        await self.session.flush()
        return user

# app/api/users.py
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
from app.database import get_db
from app.schemas.user import UserCreate, UserResponse
from app.services.user import UserService
# Router for all user endpoints, mounted under the /users prefix.
router = APIRouter(prefix='/users', tags=['users'])
@router.post('', response_model=UserResponse, status_code=201)
async def create_user(
    user_data: UserCreate,
    db: AsyncSession = Depends(get_db),
):
    """Create a new user.

    Raises:
        HTTPException: 400 when the email is already registered, or when the
            insert violates a database constraint (for example a duplicate
            username, or an email registered concurrently).
    """
    from sqlalchemy.exc import IntegrityError

    service = UserService(db)

    # Fast path: reject a known duplicate email before attempting the insert.
    if await service.get_by_email(user_data.email) is not None:
        raise HTTPException(status_code=400, detail='Email already registered')

    try:
        return await service.create(user_data)
    except IntegrityError as exc:
        # The pre-check does not cover usernames and cannot rule out a race;
        # turn the constraint violation into a client error instead of a 500.
        await db.rollback()
        raise HTTPException(
            status_code=400,
            detail='Email or username already registered',
        ) from exc
@router.get('/{user_id}', response_model=UserResponse)
async def get_user(
    user_id: int,
    db: AsyncSession = Depends(get_db),
):
    """Return the user with the given id, or respond with 404."""
    found = await UserService(db).get_by_id(user_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail='User not found')
@router.get('', response_model=List[UserResponse])
async def list_users(
    page: int = Query(1, ge=1),
    per_page: int = Query(10, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
):
    """Return one page of users.

    Args:
        page: 1-based page number.
        per_page: Number of users per page (1-100).
    """
    from app.models.user import User
    from sqlalchemy import select

    offset = (page - 1) * per_page
    # OFFSET/LIMIT without ORDER BY has no guaranteed row order, so pages
    # could overlap or skip rows; order by primary key to make them stable.
    result = await db.execute(
        select(User).order_by(User.id).offset(offset).limit(per_page)
    )
    return result.scalars().all()

# tests/conftest.py
import pytest
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
from typing import AsyncGenerator
from app.database import get_db
from app.models import Base
@pytest.fixture(scope='session')
def test_engine():
    """Create the async engine used by the test suite.

    The engine is session-scoped while async tests may each run on their own
    event loop. Pooled asyncpg connections cannot be reused across loops, so
    pooling is disabled with ``NullPool``.
    """
    from sqlalchemy.pool import NullPool

    return create_async_engine(
        'postgresql+asyncpg://postgres:postgres@localhost:5432/test_blog',
        echo=False,
        poolclass=NullPool,
    )
@pytest.fixture
async def db_session(test_engine) -> AsyncGenerator[AsyncSession, None]:
    """Yield a session whose work is rolled back after the test.

    One connection-level transaction wraps table creation and the whole test.
    The session joins it in ``create_savepoint`` mode, so ``commit()`` and
    ``rollback()`` inside the test or application code only affect a
    SAVEPOINT. The outer transaction is always rolled back at the end.
    """
    async with test_engine.connect() as conn:
        outer_tx = await conn.begin()
        try:
            # Create the tables inside the outer transaction.
            await conn.run_sync(Base.metadata.create_all)

            session_factory = async_sessionmaker(
                bind=conn,
                class_=AsyncSession,
                expire_on_commit=False,
                join_transaction_mode="create_savepoint",
            )
            async with session_factory() as session:
                yield session
        finally:
            # Undo everything, including the schema created above.
            await outer_tx.rollback()
@pytest.fixture
def override_get_db(db_session):
    """Return a replacement for ``get_db`` that yields the test session."""

    async def _yield_test_session():
        yield db_session

    return _yield_test_session

# tests/test_users.py
import pytest
from httpx import AsyncClient
from sqlalchemy import select
from app.main import app
from app.models.user import User
@pytest.mark.asyncio
async def test_create_user(db_session, override_get_db):
    """POST /users creates a user and returns its public fields."""
    from httpx import ASGITransport

    from app.database import get_db

    app.dependency_overrides[get_db] = override_get_db
    try:
        # ``AsyncClient(app=...)`` was removed from httpx; the ASGI app is
        # passed through an explicit transport instead.
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url='http://test'
        ) as client:
            response = await client.post(
                '/users',
                json={
                    'email': 'test@example.com',
                    'username': 'testuser',
                    'password': 'password123',
                },
            )
    finally:
        # Do not leak the override into other tests.
        app.dependency_overrides.pop(get_db, None)

    assert response.status_code == 201
    data = response.json()
    assert data['email'] == 'test@example.com'
    assert data['username'] == 'testuser'

    # Verify the row through the same session the endpoint used.
    result = await db_session.execute(select(User))
    users = result.scalars().all()
    assert len(users) == 1
@pytest.mark.asyncio
async def test_duplicate_email(db_session, override_get_db):
    """POST /users with an already registered email responds with 400."""
    from httpx import ASGITransport

    from app.database import get_db
    from app.schemas.user import UserCreate
    from app.services.user import UserService

    # Arrange: store the first user directly through the service.
    service = UserService(db_session)
    await service.create(UserCreate(
        email='test@example.com',
        username='testuser',
        password='password123',
    ))
    await db_session.commit()

    app.dependency_overrides[get_db] = override_get_db
    try:
        # Act: try to register a second user with the same email.
        # ``AsyncClient(app=...)`` was removed from httpx; use a transport.
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url='http://test'
        ) as client:
            response = await client.post(
                '/users',
                json={
                    'email': 'test@example.com',
                    'username': 'anotheruser',
                    'password': 'password123',
                },
            )
    finally:
        # Do not leak the override into other tests.
        app.dependency_overrides.pop(get_db, None)

    assert response.status_code == 400

# alembic/versions/001_initial.py
"""Initial migration
Revision ID: 001
Revises:
Create Date: 2024-01-01 12:00:00
"""
from alembic import op
import sqlalchemy as sa
# Alembic revision identifiers.
revision = "001"
down_revision = None  # no parent revision
def upgrade():
    """Create the ``users``, ``posts`` and ``comments`` tables.

    The DDL mirrors the ORM models: columns annotated as non-optional are
    NOT NULL, and ``email`` / ``username`` get unique indexes, which is what
    ``unique=True, index=True`` on a column produces. Boolean defaults are
    server-side, because a client-side ``default=`` has no effect in DDL.
    """
    # Users table
    op.create_table(
        'users',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('email', sa.String(255), nullable=False),
        sa.Column('username', sa.String(50), nullable=False),
        sa.Column('password_hash', sa.String(255), nullable=False),
        sa.Column('is_active', sa.Boolean(), nullable=False, server_default=sa.true()),
        sa.Column('is_superuser', sa.Boolean(), nullable=False, server_default=sa.false()),
        sa.Column('created_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
        sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
        sa.PrimaryKeyConstraint('id'),
    )
    op.create_index('ix_users_email', 'users', ['email'], unique=True)
    op.create_index('ix_users_username', 'users', ['username'], unique=True)

    # Posts table
    op.create_table(
        'posts',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('title', sa.String(200), nullable=False),
        sa.Column('content', sa.Text(), nullable=False),
        sa.Column('is_published', sa.Boolean(), nullable=False, server_default=sa.false()),
        sa.Column('author_id', sa.Integer(), nullable=False),
        sa.Column('created_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
        sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
        sa.ForeignKeyConstraint(['author_id'], ['users.id']),
        sa.PrimaryKeyConstraint('id'),
    )
    op.create_index('ix_posts_author_id', 'posts', ['author_id'])
    op.create_index('ix_posts_author_created', 'posts', ['author_id', 'created_at'])

    # Comments table
    op.create_table(
        'comments',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('content', sa.Text(), nullable=False),
        sa.Column('author_id', sa.Integer(), nullable=False),
        sa.Column('post_id', sa.Integer(), nullable=False),
        sa.Column('parent_id', sa.Integer(), nullable=True),
        sa.Column('created_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
        sa.ForeignKeyConstraint(['author_id'], ['users.id']),
        sa.ForeignKeyConstraint(['post_id'], ['posts.id']),
        sa.ForeignKeyConstraint(['parent_id'], ['comments.id']),
        sa.PrimaryKeyConstraint('id'),
    )
def downgrade():
    """Drop the tables in reverse dependency order."""
    for table_name in ('comments', 'posts', 'users'):
        op.drop_table(table_name)

# app/main.py
from fastapi import FastAPI
from contextlib import asynccontextmanager
from app.database import async_engine
from app.api import users, posts, comments
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run startup code before the app serves and cleanup after it stops."""
    print("Starting up...")  # startup phase

    yield

    # Shutdown phase: announce it, then dispose of the async engine.
    print("Shutting down...")
    await async_engine.dispose()
# Application object; startup and shutdown are handled by ``lifespan``.
app = FastAPI(
    lifespan=lifespan,
    title="Blog API",
    description="Blog platform API with SQLAlchemy",
    version="1.0.0",
)

# Register the API routers.
app.include_router(users.router)
app.include_router(posts.router)
app.include_router(comments.router)
@app.get('/health')
async def health_check():
    """Return a static payload signalling that the service is up."""
    payload = {"status": "healthy"}
    return payload

# Run the application
uvicorn app.main:app --reload
# Тесты
pytest tests/ -v --asyncio-mode=auto
# Миграции
alembic upgrade head
Вы создали полноценное production-ready приложение с тестами, миграциями и best practices.
Поздравляем с завершением курса!
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.