Execution plans, индексы, решение N+1 проблемы
Производительность запросов — критический навык для работы с базами данных. В этой теме вы изучите execution plans, индексы, решение N+1 проблемы и продвинутые техники оптимизации.
PostgreSQL предоставляет детальную информацию о выполнении запросов:
from sqlalchemy import text
# Получить план выполнения
result = session.execute(
text("EXPLAIN ANALYZE SELECT * FROM users WHERE email = 'test@example.com'")
)
plan = result.all()
for row in plan:
print(row[0])Seq Scan on users (cost=0.00..18.50 rows=1 width=100) (actual time=0.123..0.456 ms rows=1 loops=1)
Filter: (email = 'test@example.com'::text)
Rows Removed by Filter: 9999
Planning Time: 0.234 ms
Execution Time: 0.512 ms
| Параметр | Описание |
|---|---|
Seq Scan | Последовательное сканирование (чтение всей таблицы) |
Index Scan | Сканирование по индексу |
Index Only Scan | Только индекс (данные не читаются) |
cost=0.00..18.50 | Стоимость (startup..total) |
rows=1 | Ожидаемое количество строк |
actual time=0.123..0.456 | Реальное время (startup..total) |
loops=1 | Сколько раз выполнен узел |
# Seq Scan — полное сканирование таблицы (медленно для больших таблиц)
# SELECT * FROM users WHERE email = 'test@example.com' # Без индекса
# Index Scan — сканирование по индексу с чтением таблицы
# SELECT * FROM users WHERE email = 'test@example.com' # С индексом
# Index Only Scan — только индекс (быстрее всего)
# SELECT email FROM users WHERE email = 'test@example.com' # Покрывающий индексfrom sqlalchemy import create_engine, event
engine = create_engine('postgresql+psycopg://...', echo=True)
@event.listens_for(engine, "before_cursor_execute")
def receive_before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
# Включить EXPLAIN ANALYZE для отладки
if os.getenv('DEBUG_QUERIES'):
cursor.execute(f"EXPLAIN ANALYZE {statement}")
print(cursor.fetchall())from sqlalchemy import Index
# В модели
class User(Base):
__tablename__ = 'users'
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(String(255), index=True) # Простой индекс
name: Mapped[str] = mapped_column(String(100))
# Составной индекс
__table_args__ = (
Index('ix_users_name_email', 'name', 'email'),
)
# Через Alembic
def upgrade():
op.create_index('ix_users_email', 'users', ['email'], unique=False)
# Составной индекс
op.create_index('ix_users_name_email', 'users', ['name', 'email'])
# Уникальный индекс
op.create_index('ix_users_email', 'users', ['email'], unique=True)# B-tree (по умолчанию) — для =, <, >, BETWEEN, ORDER BY
op.create_index('ix_users_email', 'users', ['email'])
# GIN — для JSONB, массивов, полнотекстового поиска
op.create_index('ix_users_data', 'users', ['data'], postgresql_using='gin')
# GiST — для геоданных, диапазонов
op.create_index('ix_users_location', 'users', ['location'], postgresql_using='gist')
# Hash — только для = (редко используется)
op.create_index('ix_users_hash', 'users', ['email'], postgresql_using='hash')
# BRIN — для очень больших таблиц с естественным порядком
op.create_index('ix_users_created_at', 'users', ['created_at'], postgresql_using='brin')# INCLUDE (PostgreSQL 11+) — данные в индексе для Index Only Scan
op.create_index(
'ix_users_email_include',
'users',
['email'],
postgresql_include=['name', 'id'] # Колонки только для возврата
)
# Запрос использует Index Only Scan:
# SELECT email, name, id FROM users WHERE email = 'test@example.com'# Индекс только для части строк
op.create_index(
'ix_users_active',
'users',
['email'],
postgresql_where=sa.text('active = true')
)
# Полезно когда:
# - Таблица большая, но выбирается малая часть
# - Есть много NULL значений# Индекс по выражению
op.create_index(
'ix_users_email_lower',
'users',
[sa.func.lower('email')] # Index on lower(email)
)
# Запрос использует индекс:
# SELECT * FROM users WHERE lower(email) = 'test@example.com'# ПЛОХО: N+1 запросов
users = session.query(User).all() # 1 запрос
for user in users:
print(user.posts) # N запросов (по одному на пользователя)
# Итого: 1 + N запросов# joinedload — через JOIN
from sqlalchemy.orm import joinedload
users = session.execute(
select(User).options(joinedload(User.posts))
).unique().scalars().all()
# 1 запрос с JOIN
# selectinload — отдельный SELECT с IN
from sqlalchemy.orm import selectinload
users = session.execute(
select(User).options(selectinload(User.posts))
).scalars().all()
# 2 запроса: SELECT users + SELECT posts WHERE user_id IN (...)
# subqueryload — через подзапрос
from sqlalchemy.orm import subqueryload
users = session.execute(
select(User).options(subqueryload(User.posts))
).scalars().all()
# 2 запроса: SELECT users + SELECT posts с подзапросом| Стратегия | Запросов | Когда использовать |
|---|---|---|
joinedload | 1 (JOIN) | One-to-one, мало дочерних объектов |
selectinload | 2 (SELECT + IN) | One-to-many, много дочерних |
subqueryload | 2 (SELECT + подзапрос) | Сложные фильтры, вложенные связи |
# ПЛОХО: N+1 с фильтрацией
users = session.query(User).all()
for user in users:
recent_posts = [p for p in user.posts if p.created_at > last_week]
# ХОРОШО: загрузить с фильтрацией
from sqlalchemy.orm import selectinload
users = session.execute(
select(User).options(
selectinload(User.posts).where(Post.created_at > last_week)
)
).scalars().all()# Загрузка вложенных relationships
from sqlalchemy.orm import selectinload
stmt = select(User).options(
selectinload(User.posts).selectinload(Post.comments)
)
users = session.execute(stmt).scalars().all()
# 3 запроса: users + posts + commentsfrom sqlalchemy import create_engine
engine = create_engine(
'postgresql+psycopg://...',
pool_size=10, # Постоянные соединения
max_overflow=20, # Дополнительные при пике
pool_timeout=30, # Ждать соединения 30 сек
pool_recycle=3600, # Пересоздавать через час
pool_pre_ping=True, # Проверять перед использованием
)pool = engine.pool
print(f"Size: {pool.size()}") # Общее количество
print(f"Checked in: {pool.checkedin()}") # Доступные
print(f"Checked out: {pool.checkedout()}") # Используется
print(f"Overflow: {pool.overflow()}") # Сверх pool_size
print(f"Invalid: {pool.invalid()}") # Нерабочиеfrom sqlalchemy import event
@event.listens_for(engine, "connect")
def on_connect(dbapi_conn, connection_record):
print("New connection created")
@event.listens_for(engine, "checkout")
def on_checkout(dbapi_conn, connection_record, connection_proxy):
print("Connection checked out from pool")
@event.listens_for(engine, "checkin")
def on_checkin(dbapi_conn, connection_record, connection_proxy):
print("Connection returned to pool")# Обычная вставка (медленно для больших объёмов)
for i in range(10000):
user = User(email=f'user{i}@example.com', name=f'User {i}')
session.add(user)
session.commit()
# ~10000 INSERT запросов
# Bulk insert mappings (быстро)
session.bulk_insert_mappings(User, [
{'email': f'user{i}@example.com', 'name': f'User {i}'}
for i in range(10000)
])
session.commit()
# 1 INSERT запрос с VALUES
# Bulk save objects
users = [User(email=f'user{i}@example.com', name=f'User {i}') for i in range(10000)]
session.bulk_save_objects(users)
session.commit()# Обычное обновление
users = session.query(User).filter(User.active == False).all()
for user in users:
user.active = True
session.commit()
# N UPDATE запросов + 1 SELECT
# Bulk update mappings
session.bulk_update_mappings(User, [
{'id': id, 'active': True}
for id in inactive_user_ids
])
session.commit()
# 1 UPDATE запрос
# Core update
session.execute(
update(User).where(User.active == False).values(active=True)
)
session.commit()
# 1 UPDATE запрос# Обработка чанками
from sqlalchemy import select
def process_large_table(session, batch_size=1000):
offset = 0
while True:
stmt = select(User).limit(batch_size).offset(offset)
users = session.execute(stmt).scalars().all()
if not users:
break
for user in users:
process_user(user)
session.commit() # Коммитить каждый чанк
offset += batch_size
# Использование
with Session(engine) as session:
process_large_table(session)# ПЛОХО: загружает все колонки
users = session.query(User).all()
# ХОРОШО: только нужные колонки
from sqlalchemy.orm import load_only
users = session.execute(
select(User).options(load_only(User.id, User.email))
).scalars().all()
# Или через Core
result = session.execute(
select(User.id, User.email)
).all()# ПЛОХО: медленно для больших подзапросов
from sqlalchemy import select
subq = select(Post.user_id).where(Post.published == True)
stmt = select(User).where(User.id.in_(subq))
# ХОРОШО: EXISTS эффективнее
from sqlalchemy import exists
subq = exists().where(Post.user_id == User.id, Post.published == True)
stmt = select(User).where(subq)# ПЛОХО: медленный COUNT для больших таблиц
count = session.query(User).count()
# ХОРОШО: приблизительный count из системных таблиц (PostgreSQL)
result = session.execute(
text("SELECT reltuples::bigint FROM pg_class WHERE relname = 'users'")
)
approx_count = result.scalar()
# ХОРОШО: count по индексу
count = session.execute(
select(func.count(User.id)) # id — primary key, индекс
).scalar()# Сложный запрос с подзапросами
from sqlalchemy import select
# ПЛОХО: сложно читать и оптимизировать
stmt = select(User).where(
User.id.in_(
select(Post.user_id).where(
Post.id.in_(
select(Comment.post_id).where(Comment.created_at > last_week)
)
)
)
)
# ХОРОШО: CTE для читаемости
from sqlalchemy import select
recent_comments = select(Comment.post_id).where(
Comment.created_at > last_week
).cte('recent_comments')
recent_posts = select(Post.user_id).where(
Post.id.in_(select(recent_comments.c.post_id))
).cte('recent_posts')
stmt = select(User).where(User.id.in_(select(recent_posts.c.user_id)))# Включить логирование всех запросов
engine = create_engine('postgresql://...', echo=True)
# Логирование с уровнем
import logging
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
# Только медленные запросы
from sqlalchemy import event
import time
@event.listens_for(engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
context._query_start_time = time.time()
@event.listens_for(engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
total = time.time() - context._query_start_time
if total > 1.0: # Логируем запросы > 1 секунды
print(f"Slow query ({total:.3f}s): {statement}")# Включить pg_stat_statements
# shared_preload_libraries = 'pg_stat_statements'
# Топ медленных запросов
result = session.execute(text("""
SELECT query, calls, total_exec_time, mean_exec_time
FROM pg_stat_statements
ORDER BY mean_exec_time DESC
LIMIT 10
"""))
for row in result:
print(row)# Используйте EXPLAIN ANALYZE для отладки
result = session.execute(text("EXPLAIN ANALYZE " + str(stmt)))
# Создавайте индексы для часто используемых WHERE и JOIN
op.create_index('ix_users_email', 'users', ['email'])
# Используйте eager loading для relationships
selectinload(User.posts)
# Обрабатывайте большие данные чанками
for batch in range(0, total, BATCH_SIZE):
...
# Используйте bulk операции для массовых вставок
session.bulk_insert_mappings(User, data)# Не используйте N+1
for user in users:
user.posts # ПЛОХО
# Не создавайте индексы без анализа
op.create_index('ix_users_name', 'users', ['name']) # Без анализа запросов
# Не используйте SELECT *
session.query(User).all() # Загружает все колонки
# Не забывайте про транзакции при bulk операциях
session.bulk_insert_mappings(...) # Без session.commit()В следующей теме вы изучите Sessions и Unit of Work — глубокое понимание transaction management, isolation levels, session patterns и продвинутые техники работы с сессиями.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.