asyncpg, aiomysql, AsyncSession, паттерны асинхронного кода
Асинхронная работа с базой данных позволяет обрабатывать тысячи одновременных запросов без блокировки event loop. В этой теме вы изучите asyncpg, aiomysql и паттерны асинхронного кода.
# Синхронный код (блокирующий)
def get_users():
# Блокировка на время запроса к БД (~10-100ms)
users = session.query(User).all()
return users
# Асинхронный код (неблокирующий)
async def get_users():
# Event loop может обрабатывать другие запросы
users = await session.execute(select(User))
return users.scalars().all()✅ Используйте async когда:
❌ Не используйте async когда:
# PostgreSQL (рекомендуется)
poetry add sqlalchemy asyncpg
# MySQL
poetry add sqlalchemy aiomysql
# SQLite (ограниченная поддержка)
poetry add aiosqlitefrom sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine
# PostgreSQL с asyncpg
engine: AsyncEngine = create_async_engine(
'postgresql+asyncpg://user:pass@localhost:5432/dbname',
echo=True, # Логирование SQL
pool_size=10, # Размер пула
max_overflow=20, # Дополнительные соединения
pool_recycle=3600, # Пересоздание через час
pool_pre_ping=True, # Проверка перед использованием
)
# MySQL с aiomysql
engine = create_async_engine(
'mysql+aiomysql://user:pass@localhost:3306/dbname',
echo=True,
)
# SQLite (для тестов)
engine = create_async_engine(
'sqlite+aiosqlite:///test.db',
echo=True,
)| База данных | URL |
|---|---|
| PostgreSQL (asyncpg) | postgresql+asyncpg://user:pass@localhost/db |
| MySQL (aiomysql) | mysql+aiomysql://user:pass@localhost/db |
| SQLite (aiosqlite) | sqlite+aiosqlite:///test.db |
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
# Фабрика сессий
async_session_maker = async_sessionmaker(
engine,
class_=AsyncSession,
autoflush=False,
autocommit=False,
expire_on_commit=False, # Важно для async
)
# Создание сессии
async with AsyncSession(engine) as session:
# Работа с сессией
await session.commit()# database.py
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
engine = create_async_engine(DATABASE_URL)
async_session_maker = async_sessionmaker(engine, class_=AsyncSession)
async def get_db() -> AsyncSession:
async with async_session_maker() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
# main.py (FastAPI)
from fastapi import Depends
@app.get("/users/{user_id}")
async def get_user(
user_id: int,
db: AsyncSession = Depends(get_db)
):
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
return userasync with AsyncSession(engine) as session:
# Создание одного объекта
user = User(email='test@example.com', name='Test')
session.add(user)
await session.commit()
await session.refresh(user) # Загрузить id и default значения
# Массовая вставка
users = [
User(email='a@example.com', name='A'),
User(email='b@example.com', name='B'),
]
session.add_all(users)
await session.commit()async with AsyncSession(engine) as session:
# Получить по ID
user = await session.get(User, 1)
# Получить все
result = await session.execute(select(User))
users = result.scalars().all()
# С фильтрацией
result = await session.execute(
select(User).where(User.email == 'test@example.com')
)
user = result.scalar_one_or_none()
# Сортировка и пагинация
result = await session.execute(
select(User)
.order_by(User.created_at.desc())
.limit(10)
.offset(20)
)
users = result.scalars().all()async with AsyncSession(engine) as session:
# Обновление объекта
user = await session.get(User, 1)
if user:
user.name = 'Updated'
await session.commit()
# Массовое обновление
await session.execute(
update(User)
.where(User.active == False)
.values(active=True)
)
await session.commit()async with AsyncSession(engine) as session:
# Удаление объекта
user = await session.get(User, 1)
if user:
await session.delete(user)
await session.commit()
# Массовое удаление
await session.execute(
delete(User).where(User.created_at < datetime(2023, 1, 1))
)
await session.commit()from sqlalchemy.orm import joinedload, selectinload
async with AsyncSession(engine) as session:
# Joinedload (JOIN)
result = await session.execute(
select(User).options(joinedload(User.posts))
)
users = result.unique().scalars().all()
# Selectinload (отдельный SELECT с IN)
result = await session.execute(
select(User).options(selectinload(User.posts))
)
users = result.scalars().all()
# Multiple relationships
result = await session.execute(
select(User)
.options(joinedload(User.profile))
.options(selectinload(User.posts))
.options(selectinload(User.comments))
)
users = result.unique().scalars().all()# ПЛОХО: ленивая загрузка не работает в async
user = await session.get(User, 1)
posts = user.posts # Ошибка! Requires sync I/O
# ХОРОШО: eager loading
result = await session.execute(
select(User).options(selectinload(User.posts))
)
user = result.scalar()
posts = user.posts # Уже загруженоasync with AsyncSession(engine) as session:
async with session.begin():
# Транзакция начинается автоматически
user = User(email='test@example.com', name='Test')
session.add(user)
post = Post(title='Test', user_id=user.id)
session.add(post)
# Commit при успешном выходе
# Rollback при исключенииasync with AsyncSession(engine) as session:
try:
user = User(email='test@example.com', name='Test')
session.add(user)
await session.flush() # Получить id без commit
post = Post(title='Test', user_id=user.id)
session.add(post)
await session.commit()
except Exception:
await session.rollback()
raiseasync with AsyncSession(engine) as session:
async with session.begin():
# Внешняя транзакция
user = User(email='test@example.com', name='Test')
session.add(user)
try:
async with session.begin_nested():
# Внутренняя транзакция (savepoint)
post = Post(title='Test', user_id=user.id)
session.add(post)
# Rollback только этой части при ошибке
except Exception:
pass # Внешняя транзакция продолжается
await session.commit()from sqlalchemy import text
async with AsyncSession(engine) as session:
# Простой запрос
result = await session.execute(
text('SELECT * FROM users WHERE id = :id'),
{'id': 1}
)
row = result.first()
# INSERT
await session.execute(
text('INSERT INTO users (email, name) VALUES (:email, :name)'),
{'email': 'test@example.com', 'name': 'Test'}
)
await session.commit()from sqlalchemy import select, insert, update, delete
async with AsyncSession(engine) as session:
# SELECT
result = await session.execute(
select(User.id, User.email).where(User.active == True)
)
rows = result.all()
# INSERT
await session.execute(
insert(User).values(email='test@example.com', name='Test')
)
await session.commit()
# UPDATE
await session.execute(
update(User).where(User.id == 1).values(name='Updated')
)
await session.commit()engine = create_async_engine(
DATABASE_URL,
pool_size=10, # Количество постоянных соединений
max_overflow=20, # Дополнительные при пике
pool_recycle=3600, # Пересоздание через час
pool_pre_ping=True, # Проверка перед использованием
pool_timeout=30, # Таймаут ожидания соединения
)pool = engine.pool
print(f"Size: {pool.size()}")
print(f"Checked in: {pool.checkedin()}")
print(f"Checked out: {pool.checkedout()}")
print(f"Overflow: {pool.overflow()}")# Используйте context manager для сессий
async with AsyncSession(engine) as session:
await session.commit()
# Используйте expire_on_commit=False
async_session_maker = async_sessionmaker(
engine,
expire_on_commit=False, # Избегает проблем с async
)
# Используйте eager loading для relationships
result = await session.execute(
select(User).options(selectinload(User.posts))
)
# Обрабатывайте ошибки
try:
async with session.begin():
# работа
pass
except Exception:
await session.rollback()
raise
# Закрывайте engine при shutdown
await engine.dispose()# Не используйте ленивую загрузку
user = await session.get(User, 1)
posts = user.posts # Ошибка в async!
# Не создавайте сессию без context manager
session = AsyncSession(engine) # ПЛОХО: может не закрыться
# Не используйте sync драйверы в async коде
engine = create_engine('postgresql+psycopg://...') # ПЛОХО в async
# Не забывайте await
session.execute(select(User)) # ПЛОХО: нет await
await session.execute(select(User)) # ХОРОШОAlembic работает с async через sync подключение:
# alembic/env.py
import asyncio
from sqlalchemy.ext.asyncio import async_engine_from_config
from sqlalchemy import pool
def run_migrations_online():
"""Run migrations in 'online' mode with async engine."""
# Создаём sync engine для миграций
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix='sqlalchemy.',
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
)
with context.begin_transaction():
context.run_migrations()В следующей теме вы изучите Query Optimization — execution plans, индексы, решение N+1 проблемы и продвинутые техники оптимизации производительности.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.