SQLAlchemy, asyncpg, миграции Alembic, паттерн Repository, connection pooling
Проблема: Production-ready бот не может работать без базы данных. Пользователи должны регистрироваться, заказы — сохраняться, история — храниться. Но как правильно подключить БД к асинхронному боту?
Типичные ошибки:
- Синхронные вызовы БД блокируют event loop — бот "зависает"
- Соединения не закрываются — утечка ресурсов
- SQL-запросы в обработчиках — невозможно тестировать
- Нет миграций — схема БД рассинхронизирована с кодом
Решение: SQLAlchemy 2.0+ с asyncio, миграции Alembic, паттерн Repository для абстракции доступа к данным.
Зачем это нужно: Правильная интеграция с БД — основа надёжности. Потеря данных = потеря доверия пользователей.
Почему PostgreSQL, а не SQLite?
| Критерий | SQLite | PostgreSQL |
|---|---|---|
| Конкурентность | Блокировка при записи | Параллельные запросы |
| Надёжность | Риск повреждения при сбое | ACID-транзакции, WAL |
| Масштабирование | Один файл | Репликация, шардинг |
| JSON | Ограниченная поддержка | JSONB с индексами |
| Инструменты | Минимум | Alembic, pgAdmin, мониторинг |
💡 Когда SQLite: Прототип, локальная разработка, бот с <100 пользователей в день.
⚠️ Антипаттерн: Использовать SQLite в продакшене при нагрузке >10 запросов в секунду.
Проблема: Обычная SQLAlchemy блокирует event loop при каждом запросе к БД. Бот перестаёт обрабатывать сообщения, пока выполняется запрос.
Решение: sqlalchemy.ext.asyncio — асинхронная обёртка, которая не блокирует event loop.
pip install sqlalchemy[asyncio] asyncpg💡 Зачем asyncpg: Это самый быстрый асинхронный драйвер для PostgreSQL. На 2-3 раза быстрее
asyncpg+ SQLAlchemy чем синхронные аналоги.
# bot/database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
class Base(DeclarativeBase):
"""
Базовый класс для моделей.
Зачем:
- Общая конфигурация для всех моделей
- Метаданные для Alembic миграций
"""
pass
engine = create_async_engine(
DATABASE_URL,
echo=False, # Логирование SQL (True для отладки, False для продакшена)
pool_size=10, # Размер пула соединений
max_overflow=20, # Дополнительные соединения при пике
pool_pre_ping=True, # Проверка живости соединений перед использованием
)
async_session_factory = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False # ⚠️ Важно: объекты не становятся "detached" после commit
)⚠️ Антипаттерн: Не создавайте
engineиsession_factoryв каждом файле. Вынесите в отдельный модуль — это синглтоны.
# bot/models/user.py
from datetime import datetime
from sqlalchemy import Column, Integer, String, Boolean, DateTime, Text
from bot.database import Base
class User(Base):
"""
Модель пользователя.
Зачем:
- telegram_id — уникальный идентификатор в Telegram
- created_at/updated_at — аудит действий
- is_premium — для разделения функционала
"""
__tablename__ = 'users'
id = Column(Integer, primary_key=True, autoincrement=True)
telegram_id = Column(Integer, unique=True, nullable=False, index=True)
username = Column(String(255), nullable=True)
first_name = Column(String(255), nullable=True)
last_name = Column(String(255), nullable=True)
language_code = Column(String(10), default='ru')
is_premium = Column(Boolean, default=False)
is_banned = Column(Boolean, default=False)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def __repr__(self):
return f"<User {self.telegram_id} ({self.username})>"
# bot/models/order.py
from enum import Enum as PyEnum
from sqlalchemy import Enum, ForeignKey, Numeric, Text
from sqlalchemy.orm import relationship
class OrderStatus(str, PyEnum):
"""
Статусы заказа.
Зачем: Enum предотвращает невалидные значения.
Нельзя создать заказ со статусом 'shippedd' (опечатка).
"""
PENDING = 'pending'
PAID = 'paid'
SHIPPED = 'shipped'
DELIVERED = 'delivered'
CANCELLED = 'cancelled'
class Order(Base):
__tablename__ = 'orders'
id = Column(Integer, primary_key=True, autoincrement=True)
user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
status = Column(Enum(OrderStatus), default=OrderStatus.PENDING)
total = Column(Numeric(10, 2), default=0)
items = Column(Text) # JSON с товарами
shipping_address = Column(Text, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow)
# Связь с пользователем
# 💡 Зачем: order.user даёт доступ к пользователю без дополнительного запроса (при eager loading)
user = relationship('User', backref='orders')⚠️ Антипаттерн: Не храните сложные структуры в Text-поле без валидации JSON. Используйте
JSONBдля PostgreSQL.
Проблема: SQL-запросы разбросаны по обработчикам. Изменили схему — правим десятки файлов. Невозможно протестировать обработчик без реальной БД.
Решение: Repository инкапсулирует доступ к данным. Обработчик работает с абстракцией, не зная о SQL.
# bot/repositories/base.py
from typing import TypeVar, Generic, Type, Optional, List
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete, func, Select
from sqlalchemy.orm import joinedload
ModelType = TypeVar('ModelType', bound='Base')
class BaseRepository(Generic[ModelType]):
"""
Базовый CRUD репозиторий.
Зачем:
- Переиспользование CRUD-операций
- Единый стиль запросов
- Упрощение тестирования (можно замокать)
🔗 Связь с другими темами: Архитектурные паттерны
"""
def __init__(self, model: Type[ModelType], session: AsyncSession):
self.model = model
self.session = session
async def get_by_id(self, id: int, options: list = None) -> Optional[ModelType]:
"""
Получить объект по ID с опциями (eager loading).
💡 Зачем options: При загрузке связанных объектов (order.user)
можно избежать N+1 запросов через joinedload/selectinload.
"""
query = select(self.model).where(self.model.id == id)
if options:
query = query.options(*options)
result = await self.session.execute(query)
return result.scalar_one_or_none()
async def get_all(
self,
limit: int = 100,
offset: int = 0,
order_by: str = 'id',
desc: bool = False
) -> List[ModelType]:
"""
Получить все объекты с пагинацией.
⚠️ Антипаттерн: Не загружайте все записи без limit.
При 100k записей это упадёт по памяти.
"""
order_column = getattr(self.model, order_by)
if desc:
order_column = order_column.desc()
query = select(self.model).order_by(order_column).limit(limit).offset(offset)
result = await self.session.execute(query)
return list(result.scalars().all())
async def count(self) -> int:
"""Получить количество записей."""
query = select(func.count()).select_from(self.model)
result = await self.session.execute(query)
return result.scalar()
async def create(self, **kwargs) -> ModelType:
"""Создать новый объект."""
obj = self.model(**kwargs)
self.session.add(obj)
await self.session.commit()
await self.session.refresh(obj)
return obj
async def update(self, obj: ModelType, **kwargs) -> ModelType:
"""Обновить объект."""
for key, value in kwargs.items():
setattr(obj, key, value)
await self.session.commit()
await self.session.refresh(obj)
return obj
async def delete(self, obj: ModelType) -> None:
"""Удалить объект."""
await self.session.delete(obj)
await self.session.commit()
async def delete_by_id(self, id: int) -> bool:
"""
Удалить по ID. Возвращает True если удалено.
💡 Зачем: Удобно для обработчиков — получил True/False и ответил пользователю.
"""
obj = await self.get_by_id(id)
if obj:
await self.delete(obj)
return True
return False# bot/repositories/user.py
from typing import Optional, List
from sqlalchemy import select, and_
from sqlalchemy.orm import selectinload
from bot.repositories.base import BaseRepository
from bot.models.user import User
class UserRepository(BaseRepository[User]):
"""
Репозиторий для пользователей.
Зачем:
- Специфичные запросы (по telegram_id, username)
- get_or_create — частый паттерн для ботов
"""
def __init__(self, session: AsyncSession):
super().__init__(User, session)
async def get_by_telegram_id(self, telegram_id: int) -> Optional[User]:
"""
Найти пользователя по Telegram ID.
💡 Зачем: Telegram ID — основной идентификатор в боте.
Это самый частый запрос.
"""
query = select(User).where(User.telegram_id == telegram_id)
result = await self.session.execute(query)
return result.scalar_one_or_none()
async def get_by_username(self, username: str) -> Optional[User]:
"""Найти пользователя по username."""
query = select(User).where(User.username == username)
result = await self.session.execute(query)
return result.scalar_one_or_none()
async def get_or_create(self, telegram_id: int, **defaults) -> User:
"""
Получить или создать пользователя.
💡 Зачем: Частый паттерн — пользователь написал /start,
нужно проверить, есть ли он в БД, и создать если нет.
⚠️ Антипаттерн: В продакшене с высокой конкурентностью
возможна race condition. Используйте UPSERT или блокировки.
"""
user = await self.get_by_telegram_id(telegram_id)
if not user:
user = await self.create(telegram_id=telegram_id, **defaults)
return user
async def get_admins(self) -> List[User]:
"""Получить всех администраторов."""
query = select(User).where(User.is_premium == True)
result = await self.session.execute(query)
return list(result.scalars().all())
async def get_premium_users(self) -> List[User]:
"""Получить премиум-пользователей."""
query = select(User).where(User.is_premium == True)
result = await self.session.execute(query)
return list(result.scalars().all())
async def update_premium(self, telegram_id: int, is_premium: bool) -> Optional[User]:
"""Обновить статус премиума."""
user = await self.get_by_telegram_id(telegram_id)
if user:
return await self.update(user, is_premium=is_premium)
return None
async def get_users_with_orders(self, limit: int = 10) -> List[User]:
"""
Получить пользователей с заказами (eager loading).
💡 Зачем selectinload: Без него при доступе к user.orders
будет выполнен дополнительный запрос для каждого пользователя (N+1).
"""
query = (
select(User)
.options(selectinload(User.orders))
.limit(limit)
)
result = await self.session.execute(query)
return list(result.scalars().unique().all())# bot/repositories/order.py
from typing import Optional, List
from datetime import datetime
from sqlalchemy import select, and_, func
from sqlalchemy.orm import selectinload
from bot.repositories.base import BaseRepository
from bot.models.order import Order, OrderStatus
class OrderRepository(BaseRepository[Order]):
"""
Репозиторий для заказов.
Зачем:
- Специфичные запросы (по пользователю, статусу, дате)
- Статистика и агрегации
"""
def __init__(self, session: AsyncSession):
super().__init__(Order, session)
async def get_by_user_id(self, user_id: int) -> List[Order]:
"""
Получить заказы пользователя.
💡 Зачем order_by desc: Последние заказы первыми — удобно для UI.
"""
query = (
select(Order)
.where(Order.user_id == user_id)
.order_by(Order.created_at.desc())
.options(selectinload(Order.user))
)
result = await self.session.execute(query)
return list(result.scalars().unique().all())
async def get_by_status(self, status: OrderStatus) -> List[Order]:
"""Получить заказы по статусу."""
query = select(Order).where(Order.status == status)
result = await self.session.execute(query)
return list(result.scalars().all())
async def get_pending_orders(self) -> List[Order]:
"""
Получить ожидающие заказы.
💡 Зачем: Для обработки фоновым воркером (см. "Масштабирование").
"""
return await self.get_by_status(OrderStatus.PENDING)
async def get_order_with_user(self, order_id: int) -> Optional[Order]:
"""Получить заказ с пользователем."""
query = (
select(Order)
.where(Order.id == order_id)
.options(selectinload(Order.user))
)
result = await self.session.execute(query)
return result.scalar_one_or_none()
async def get_user_total_spent(self, user_id: int) -> float:
"""
Получить общую сумму потраченных средств.
💡 Зачем: Для статистики в профиле пользователя.
"""
query = (
select(func.sum(Order.total))
.where(Order.user_id == user_id)
.where(Order.status == OrderStatus.DELIVERED)
)
result = await self.session.execute(query)
return float(result.scalar() or 0)
async def get_orders_by_date_range(
self,
start_date: datetime,
end_date: datetime
) -> List[Order]:
"""Получить заказы за период."""
query = select(Order).where(
and_(
Order.created_at >= start_date,
Order.created_at <= end_date
)
)
result = await self.session.execute(query)
return list(result.scalars().all())
async def get_statistics_by_status(self) -> dict:
"""
Получить статистику по статусам.
💡 Зачем: Для админ-дашборда.
"""
query = (
select(Order.status, func.count(Order.id))
.group_by(Order.status)
)
result = await self.session.execute(query)
return {status.value: count for status, count in result.all()}Проблема: Создавать сессию БД в каждом обработчике? Это дублирование кода и риск утечки соединений.
Решение: Middleware создаёт сессию и передаёт в обработчики через data.
# bot/middleware/db.py
from typing import Callable, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from aiogram import BaseMiddleware
from aiogram.types import Update
from bot.repositories.user import UserRepository
from bot.repositories.order import OrderRepository
class DatabaseMiddleware(BaseMiddleware):
"""
Middleware для injection сессии БД и репозиториев.
Зачем:
- Автоматическое создание сессии для каждого запроса
- Гарантированное закрытие сессии (даже при ошибке)
- Передача репозиториев в обработчики
🔗 Связь с другими темами: Архитектурные паттерны (DI)
"""
def __init__(self, session_factory: async_sessionmaker):
self.session_factory = session_factory
async def __call__(
self,
handler: Callable,
event: Update,
data: Dict[str, Any]
) -> Any:
# ⚠️ Важно: async with гарантирует закрытие сессии
async with self.session_factory() as session:
# Inject сессии
data['session'] = session
# Inject репозиториев
data['user_repo'] = UserRepository(session)
data['order_repo'] = OrderRepository(session)
return await handler(event, data)
# Регистрация в dispatcher.py
from bot.middleware.db import DatabaseMiddleware
dp.update.middleware(DatabaseMiddleware(async_session_factory))⚠️ Антипаттерн: Не сохраняйте сессию между запросами. Каждая сессия — для одного обновления.
# bot/handlers/user.py
from aiogram import Router, F
from aiogram.types import Message, CallbackQuery
from aiogram.filters import Command
from sqlalchemy.ext.asyncio import AsyncSession
from bot.repositories.user import UserRepository
from bot.repositories.order import OrderRepository
router = Router()
@router.message(Command('profile'))
async def cmd_profile(
msg: Message,
session: AsyncSession,
user_repo: UserRepository,
order_repo: OrderRepository
):
"""
Показать профиль пользователя.
💡 Зачем так много параметров:
- session — для транзакций (если нужно)
- user_repo — операции с пользователями
- order_repo — операции с заказами
"""
telegram_id = msg.from_user.id
# Получаем или создаём пользователя
# 💡 get_or_create — частый паттерн для ботов
user = await user_repo.get_or_create(
telegram_id=telegram_id,
username=msg.from_user.username,
first_name=msg.from_user.first_name
)
# Получаем статистику
total_spent = await order_repo.get_user_total_spent(user.id)
orders_count = len(await order_repo.get_by_user_id(user.id))
text = f"""
👤 Профиль
ID: {user.telegram_id}
Имя: {user.first_name or 'Не указано'}
Статус: {'💎 Premium' if user.is_premium else 'Standard'}
📊 Статистика:
Заказы: {orders_count}
Потрачено: {total_spent} ₽
"""
await msg.answer(text)
@router.message(Command('orders'))
async def cmd_orders(msg: Message, order_repo: OrderRepository):
"""Показать последние заказы."""
user_id = msg.from_user.id
orders = await order_repo.get_by_user_id(user_id)
if not orders:
await msg.answer('У вас пока нет заказов.')
return
text = '📦 Ваши заказы:\n\n'
for order in orders[:5]: # Последние 5
text += f"#{order.id} — {order.status.value}: {order.total} ₽\n"
await msg.answer(text)Проблема: Изменили модель (добавили поле) — как обновить схему БД на продакшене без потери данных?
Решение: Alembic — система версионирования и миграции схемы БД.
pip install alembic
alembic init alembic# alembic.ini
[alembic]
script_location = alembic
sqlalchemy.url = postgresql+asyncpg://user:password@localhost/dbname
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S⚠️ Важно: Стандартный env.py не работает с async. Нужна специальная настройка.
# alembic/env.py
import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
from bot.database import Base
from bot.models.user import User
from bot.models.order import Order
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def run_migrations_offline() -> None:
"""Запуск миграций в offline-режиме (без подключения к БД)."""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection: Connection) -> None:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
"""Запуск миграций в async-режиме."""
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online() -> None:
"""Запуск миграций в online-режиме."""
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()# Автосоздание миграции на основе моделей
alembic revision --autogenerate -m "Initial migration: users and orders"
# Применение миграций
alembic upgrade head
# Откат миграции
alembic downgrade -1
# Просмотр истории
alembic history
# Проверка статуса
alembic current💡 Совет: Всегда проверяйте сгенерированную миграцию перед
upgrade head. Alembic может пропустить изменения.
# alembic/versions/001_initial.py
"""Initial migration: users and orders
Revision ID: 001
Revises:
Create Date: 2026-03-05 10:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
revision: str = '001'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Таблица пользователей
op.create_table(
'users',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('telegram_id', sa.Integer(), nullable=False),
sa.Column('username', sa.String(length=255), nullable=True),
sa.Column('first_name', sa.String(length=255), nullable=True),
sa.Column('last_name', sa.String(length=255), nullable=True),
sa.Column('language_code', sa.String(length=10), nullable=True),
sa.Column('is_premium', sa.Boolean(), nullable=True),
sa.Column('is_banned', sa.Boolean(), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('telegram_id')
)
op.create_index(op.f('ix_users_telegram_id'), 'users', ['telegram_id'], unique=False)
# Таблица заказов
op.create_table(
'orders',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('status', sa.Enum('pending', 'paid', 'shipped', 'delivered', 'cancelled', name='orderstatus'), nullable=True),
sa.Column('total', sa.Numeric(precision=10, scale=2), nullable=True),
sa.Column('items', sa.Text(), nullable=True),
sa.Column('shipping_address', sa.Text(), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
sa.PrimaryKeyConstraint('id')
)
def downgrade() -> None:
"""
⚠️ Важно: downgrade должен быть безопасным.
Не удаляйте данные в downgrade для продакшена.
"""
op.drop_table('orders')
op.drop_index(op.f('ix_users_telegram_id'), table_name='users')
op.drop_table('users')Проблема: Создание нового соединения с БД для каждого запроса — медленно (рукопожатие, аутентификация).
Решение: Пул соединений — готовые соединения переиспользуются.
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(
DATABASE_URL,
pool_size=20, # Базовое количество соединений в пуле
max_overflow=40, # Дополнительные соединения при пике
pool_timeout=30, # Таймаут ожидания свободного соединения
pool_recycle=3600, # Пересоздание соединений через час (защита от stale connections)
pool_pre_ping=True, # Проверка перед использованием (защита от разорванных соединений)
)💡 Как подобрать pool_size:
- Начните с
pool_size = 10- Мониторьте
pool.checkedout()(см. "Мониторинг")- Увеличивайте, если видите таймауты
import logging
logger = logging.getLogger(__name__)
async def check_pool_status():
"""
Проверка статуса пула соединений.
💡 Зачем: Для отладки проблем с производительностью.
"""
pool = engine.pool
logger.info(f"Pool size: {pool.size()}")
logger.info(f"Checked in: {pool.checkedin()}") # Свободные
logger.info(f"Checked out: {pool.checkedout()}") # Используются
logger.info(f"Overflow: {pool.overflow()}") # Дополнительные🔗 Связь с другими темами: Интегрируйте с Prometheus для графика в Grafana (см. "Мониторинг").
Проблема: Нужно создать заказ и элементы заказа атомарно. Если элементы не создались — заказ тоже не нужен.
Решение: Транзакции обеспечивают атомарность операций.
from sqlalchemy.ext.asyncio import AsyncSession
async def create_order_with_items(
session: AsyncSession,
user_id: int,
items: list,
total: float
):
"""
Создание заказа с элементами в транзакции.
💡 Зачем:
- session.begin() автоматически commit при успехе
- rollback при ошибке
- не нужно явно вызывать commit/rollback
"""
async with session.begin():
# Создаём заказ
order = Order(
user_id=user_id,
total=total,
status=OrderStatus.PENDING
)
session.add(order)
await session.flush() # Получаем ID заказа
# Создаём элементы заказа
for item in items:
order_item = OrderItem(
order_id=order.id,
**item
)
session.add(order_item)
# При ошибке всё откатится автоматически
return orderasync def manual_transaction(session: AsyncSession):
"""
Ручное управление транзакцией.
💡 Зачем: Когда нужна сложная логика обработки ошибок.
"""
async with session.begin():
try:
# Операции
...
await session.commit()
except Exception:
await session.rollback()
raise⚠️ Антипаттерн: Не оставляйте транзакции открытыми. Всегда используйте
async withдля гарантии закрытия.
| Тема | Как связана |
|---|---|
| Архитектура | Repository Pattern — часть сервисного слоя |
| Middleware | DatabaseMiddleware inject сессии и репозитории |
| Масштабирование | Connection pooling критичен при высокой нагрузке |
| Мониторинг | Метрики пула соединений, медленные запросы |
| Тестирование | SQLite in-memory для быстрых unit-тестов |
→ Масштабирование — очереди задач для фоновой обработки БД
→ Мониторинг — метрики запросов, алерты на медленные операции
→ Тестирование — изолированные тесты репозиториев
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.