SQLAlchemy, асинхронные ORM, миграции Alembic
Интеграция с базой данных — ключевая часть большинства API. В этой теме вы научитесь использовать SQLAlchemy, асинхронные ORM и миграции Alembic.
FastAPI не требует конкретную ORM. Популярные варианты:
| ORM | Тип | Когда использовать |
|---|---|---|
| SQLAlchemy | Синхронная/асинхронная | Универсальный выбор, много документации |
| SQLModel | Синхронная/асинхронная | От создателя FastAPI, объединяет Pydantic + SQLAlchemy |
| Tortoise ORM | Асинхронная | Похожа на Django ORM, для async |
| Peewee | Синхронная | Лёгкая, для простых проектов |
В этой теме используем SQLAlchemy 2.0+ — наиболее зрелое решение.
pip install sqlalchemy psycopg2-binarysqlalchemy — ORMpsycopg2-binary — драйвер PostgreSQL (для SQLite не нужен)Для SQLite:
pip install sqlalchemyproject/
├── main.py
├── database.py # Настройки БД, сессии
├── models.py # SQLAlchemy модели
├── schemas.py # Pydantic схемы
└── crud.py # Операции CRUD
database.py:
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# SQLite для разработки
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
# PostgreSQL для production
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@localhost/dbname"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False} # Только для SQLite
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()models.py:
from sqlalchemy import Column, Integer, String, Boolean, DateTime
from sqlalchemy.orm import relationship
from datetime import datetime
from database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, index=True, nullable=False)
email = Column(String(100), unique=True, index=True, nullable=False)
hashed_password = Column(String(255), nullable=False)
is_active = Column(Boolean, default=True)
is_admin = Column(Boolean, default=False)
created_at = Column(DateTime, default=datetime.utcnow)
# Связь с постами
posts = relationship("Post", back_populates="author")
class Post(Base):
__tablename__ = "posts"
id = Column(Integer, primary_key=True, index=True)
title = Column(String(200), nullable=False)
content = Column(String(10000), nullable=False)
published_at = Column(DateTime, nullable=True)
author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
# Связь с пользователем
author = relationship("User", back_populates="posts")schemas.py:
from pydantic import BaseModel, EmailStr, Field
from datetime import datetime
from typing import Optional
# === User ===
class UserBase(BaseModel):
username: str = Field(..., min_length=3, max_length=50)
email: EmailStr
class UserCreate(UserBase):
password: str = Field(..., min_length=8)
class UserUpdate(BaseModel):
username: str | None = Field(None, min_length=3, max_length=50)
email: EmailStr | None = None
password: str | None = Field(None, min_length=8)
class UserResponse(UserBase):
id: int
is_active: bool
created_at: datetime
class Config:
from_attributes = True
# === Post ===
class PostBase(BaseModel):
title: str = Field(..., min_length=5, max_length=200)
content: str = Field(..., min_length=10)
class PostCreate(PostBase):
pass
class PostResponse(PostBase):
id: int
author_id: int
published_at: datetime | None = None
class Config:
from_attributes = Truedatabase.py (дополнение):
from fastapi import Depends
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()crud.py:
from sqlalchemy.orm import Session
from models import User, Post
from schemas import UserCreate, UserUpdate, PostCreate
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
# === User CRUD ===
def get_user(db: Session, user_id: int):
return db.query(User).filter(User.id == user_id).first()
def get_user_by_email(db: Session, email: str):
return db.query(User).filter(User.email == email).first()
def get_user_by_username(db: Session, username: str):
return db.query(User).filter(User.username == username).first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
return db.query(User).offset(skip).limit(limit).all()
def create_user(db: Session, user: UserCreate) -> User:
hashed_password = hash_password(user.password)
db_user = User(
username=user.username,
email=user.email,
hashed_password=hashed_password
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def update_user(db: Session, user_id: int, user_update: UserUpdate) -> User | None:
user = get_user(db, user_id)
if not user:
return None
update_data = user_update.model_dump(exclude_unset=True)
if 'password' in update_data and update_data['password']:
update_data['hashed_password'] = hash_password(update_data.pop('password'))
for field, value in update_data.items():
setattr(user, field, value)
db.commit()
db.refresh(user)
return user
def delete_user(db: Session, user_id: int) -> bool:
user = get_user(db, user_id)
if not user:
return False
db.delete(user)
db.commit()
return True
# === Post CRUD ===
def get_post(db: Session, post_id: int):
return db.query(Post).filter(Post.id == post_id).first()
def get_posts(db: Session, skip: int = 0, limit: int = 100):
return db.query(Post).offset(skip).limit(limit).all()
def create_post(db: Session, post: PostCreate, author_id: int) -> Post:
db_post = Post(**post.model_dump(), author_id=author_id)
db.add(db_post)
db.commit()
db.refresh(db_post)
return db_post
def update_post(db: Session, post_id: int, post_update: dict) -> Post | None:
post = get_post(db, post_id)
if not post:
return None
for field, value in post_update.items():
setattr(post, field, value)
db.commit()
db.refresh(post)
return post
def delete_post(db: Session, post_id: int) -> bool:
post = get_post(db, post_id)
if not post:
return False
db.delete(post)
db.commit()
return Truemain.py:
from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import Annotated
from database import engine, get_db, Base
from models import User, Post
from schemas import UserCreate, UserResponse, UserUpdate, PostCreate, PostResponse
import crud
# Создаём таблицы (для разработки)
Base.metadata.create_all(bind=engine)
app = FastAPI()
DbSession = Annotated[Session, Depends(get_db)]
# === User Endpoints ===
@app.post('/users', response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def register_user(user: UserCreate, db: DbSession):
# Проверка уникальности email
if crud.get_user_by_email(db, user.email):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Email already registered"
)
# Проверка уникальности username
if crud.get_user_by_username(db, user.username):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Username already taken"
)
return crud.create_user(db, user)
@app.get('/users', response_model=list[UserResponse])
def list_users(skip: int = 0, limit: int = 100, db: DbSession = None):
return crud.get_users(db, skip, limit)
@app.get('/users/{user_id}', response_model=UserResponse)
def get_user(user_id: int, db: DbSession):
user = crud.get_user(db, user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return user
@app.put('/users/{user_id}', response_model=UserResponse)
def update_user(user_id: int, user_update: UserUpdate, db: DbSession):
user = crud.update_user(db, user_id, user_update)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return user
@app.delete('/users/{user_id}', status_code=status.HTTP_204_NO_CONTENT)
def delete_user(user_id: int, db: DbSession):
if not crud.delete_user(db, user_id):
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
# === Post Endpoints ===
@app.post('/posts', response_model=PostResponse, status_code=status.HTTP_201_CREATED)
def create_post(post: PostCreate, author_id: int, db: DbSession):
author = crud.get_user(db, author_id)
if not author:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Author not found"
)
return crud.create_post(db, post, author_id)
@app.get('/posts', response_model=list[PostResponse])
def list_posts(skip: int = 0, limit: int = 100, db: DbSession = None):
return crud.get_posts(db, skip, limit)
@app.get('/posts/{post_id}', response_model=PostResponse)
def get_post(post_id: int, db: DbSession):
post = crud.get_post(db, post_id)
if not post:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Post not found"
)
return postДля production используйте Alembic для управления миграциями.
pip install alembic
alembic init alembicalembic.ini:
[alembic]
sqlalchemy.url = postgresql://user:password@localhost/dbnamealembic/env.py:
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
import sys
sys.path.insert(0, '..')
from database import Base
from models import User, Post # Импортируем модели
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def run_migrations_offline() -> None:
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()# Создать миграцию
alembic revision --autogenerate -m "Create users and posts tables"
# Применить миграции
alembic upgrade head
# Откатить миграцию
alembic downgrade -1Для высокой нагрузки используйте асинхронную SQLAlchemy:
pip install sqlalchemy[asyncio] asyncpgdatabase.py (async):
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False
)
Base = declarative_base()
async def get_db():
async with AsyncSessionLocal() as session:
yield sessioncrud.py (async):
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
async def get_user(db: AsyncSession, user_id: int):
result = await db.execute(select(User).filter(User.id == user_id))
return result.scalar_one_or_none()
async def get_users(db: AsyncSession, skip: int = 0, limit: int = 100):
result = await db.execute(select(User).offset(skip).limit(limit))
return result.scalars().all()
async def create_user(db: AsyncSession, user: UserCreate):
db_user = User(**user.model_dump())
db.add(db_user)
await db.commit()
await db.refresh(db_user)
return db_userdef create_user(db: Session, user: UserCreate):
db_user = User(**user.model_dump())
db.add(db_user)
# Забыли db.commit()!
return db_userПроблема: Данные не сохранятся в БД.
Решение: Всегда вызывайте db.commit() после db.add().
def get_db():
db = SessionLocal()
yield db
# Забыли db.close()!Проблема: Утечка соединений с БД.
Решение: Используйте try/finally:
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()posts = db.query(Post).all()
for post in posts:
print(post.author.username) # Запрос к БД для каждого поста!Проблема: 1 запрос для постов + N запросов для авторов.
Решение: Используйте joinedload:
from sqlalchemy.orm import joinedload
posts = db.query(Post).options(joinedload(Post.author)).all()
for post in posts:
print(post.author.username) # Без дополнительных запросовВ следующей теме вы изучите продвинутый Pydantic — валидаторы, сериализация, сложные типы данных.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.