select(), фильтрация, агрегации, оконные функции
В продакшене 90% проблем с производительностью — это не «сложные запросы», а неправильный выбор инструмента для простой задачи. Здесь вы научитесь не тому, как писать запросы, а тому, какой запрос выбрать и почему.
SQLAlchemy 2.0 унифицировала Core и ORM. Раньше session.query(Model) и select(Model) жили параллельно — теперь query() — это legacy-обёртка над select().
Почему это важно:
select() — это объект Core, который понимает ORM. Его можно композить, передавать между функциями, сериализовать логироватьquery() — это ORM-специфичный API, который несовместим с Core-паттернамиselect() работает одинаково в sync и async сессиях# ❌ Legacy: нельзя передать в функцию логирования как SQL
users = session.query(User).filter_by(active=True).all()
# ✅ Можно логировать, композить, переиспользовать
stmt = select(User).where(User.active == True)
logger.info("Executing: %s", stmt.compile(compile_kwargs={"literal_binds": True}))
users = session.execute(stmt).scalars().all()Когда использовать query(): только в legacy-коде. В новом — всегда select().
В реальном API фильтры часто приходят из query-параметров. Наивная реализация:
# ❌ DYNAMIC FILTERS ANTI-PATTERN
def search_users(request: Request):
stmt = select(User)
if name := request.query_params.get("name"):
# f-string = SQL injection risk + no plan caching
stmt = stmt.where(User.name == f"{name}")
if min_age := request.query_params.get("min_age"):
stmt = stmt.where(User.age >= int(min_age))
return session.execute(stmt).scalars().all()Почему это плохо:
name не эскейпится# ✅ DYNAMIC FILTERS: правильное решение
def search_users(request: Request):
stmt = select(User)
conditions = []
params = {}
if name := request.query_params.get("name"):
conditions.append(User.name == bindparam("name"))
params["name"] = name
if min_age := request.query_params.get("min_age"):
conditions.append(User.age >= bindparam("min_age"))
params["min_age"] = int(min_age)
if conditions:
stmt = stmt.where(and_(*conditions))
return session.execute(stmt, params).scalars().all()Что изменилось:
params из тестов без моков HTTPlike() vs ilike() vs contains()# PostgreSQL: ilike использует GIN-индекс, like — b-tree
# Если на email есть GIN-индекс для поиска:
# CREATE INDEX idx_users_email_gin ON users USING gin(email gin_trgm_ops)
# ✅ Использует GIN-индекс в PostgreSQL
stmt = select(User).where(User.email.ilike("%@gmail.com"))
# ❌ Не использует GIN-индекс (зависит от collation)
stmt = select(User).where(User.email.like("%@gmail.com"))
# ❌ То же что like(), но менее читаемо
stmt = select(User).where(User.email.contains("@gmail"))Правило: для case-insensitive поиска используйте ilike() + GIN-индекс. Для case-sensitive — like() + b-tree.
IS NULL через == None# ❌ Работает, но создаёт предупреждение в SQLAlchemy 2.0
stmt = select(User).where(User.deleted_at == None)
# ✅ Явно и понятно
stmt = select(User).where(User.deleted_at.is_(None))
# ✅ Для soft-delete паттерна
def active_users_only():
return User.deleted_at.is_(None)
stmt = select(User).where(active_users_only())# ❌ При 1M записей — filesort в памяти/на диске
stmt = select(User).order_by(User.created_at.desc()).limit(20)EXPLAIN покажет: Using filesort — это сортировка всех записей перед LIMIT.
-- Решение: индекс по сортируемой колонке
CREATE INDEX idx_users_created_at ON users(created_at DESC);# ✅ С индексом — берёт первые 20 записей из индекса без сортировки
stmt = select(User).order_by(User.created_at.desc()).limit(20)# ❌ Нужен композитный индекс для обоих колонок
stmt = select(User).order_by(User.country.asc(), User.created_at.desc())
-- Создаём композитный индекс с правильным порядком сортировки
CREATE INDEX idx_users_country_created
ON users(country ASC, created_at DESC);Правило: порядок колонок в order_by() должен совпадать с порядком в индексе.
# У одного пользователя 5 постов
# ❌ Вернёт 5 строк с одинаковым User
stmt = select(User).join(Post, User.id == Post.user_id)
# В результате: User(id=1), User(id=1), User(id=1), User(id=1), User(id=1)
# session.execute(stmt).scalars().all() → 5 одинаковых объектовРешения:
# Решение 1: unique() — убирает дубликаты на уровне SQLAlchemy
stmt = select(User).join(Post, User.id == Post.user_id)
users = session.execute(stmt).unique().scalars().all()
# Решение 2: DISTINCT — убирает дубликаты на уровне БД
from sqlalchemy import distinct
stmt = select(distinct(User.id), User).join(Post, User.id == Post.user_id)
# Но это сложнее в работе с ORM-объектами
# Решение 3: EXISTS — если нужны только пользователи с постами
from sqlalchemy import exists
stmt = select(User).where(
exists().where(Post.user_id == User.id)
)
# Нет дубликатов, потому что нет JOINКогда что использовать:
JOIN + unique() — когда нужны данные из связанной таблицыEXISTS — когда нужна только фильтрация (пользователи, у которых есть посты)IN с подзапросом — когда фильтр по агрегации (пользователи с >10 постами)# ✅ Через relationship (рекомендуется)
stmt = select(User).join(User.posts)
# ❌ Явное условие — дублирует логику из relationship
stmt = select(User).join(Post, User.id == Post.user_id)Почему через relationship лучше:
primaryjoin в relationship, все запросы обновятся автоматическиКогда нужно явное условие:
# JOIN к users как к "автору" и "редактору"
AuthorAlias = aliased(User)
stmt = (
select(Post, AuthorAlias)
.join(AuthorAlias, Post.editor_id == AuthorAlias.id)
)# ❌ N+1: 1 запрос на пользователей + N запросов на посты
users = session.execute(select(User)).scalars().all()
for user in users:
print(user.posts) # Каждый доступ — отдельный запрос к БДРешения:
# Решение 1: joinedload — LEFT OUTER JOIN в одном запросе
stmt = select(User).options(joinedload(User.posts))
users = session.execute(stmt).unique().scalars().all()
# Когда использовать: связь один-ко-многим, нужно загрузить всё сразу
# Минус: если у пользователя 1000 постов — огромный результат
# Решение 2: selectinload — отдельный запрос с IN
stmt = select(User).options(selectinload(User.posts))
users = session.execute(stmt).scalars().all()
# Когда использовать: коллекция может быть большой
# Плюс: два эффективных запроса вместо одного огромного JOIN
# Решение 3: subqueryload — подзапрос для ID
stmt = select(User).options(subqueryload(User.posts))
users = session.execute(stmt).scalars().all()
# Когда использовать: сложная фильтрация в связанной таблицеПравило выбора:
joinedload — для many-to-one и one-to-one (не дублирует данные)selectinload — для one-to-many (оптимально для больших коллекций)subqueryload — для сложных подзапросов в связанных таблицах# ❌ При OFFSET 100000 база данных сканирует и отбрасывает 100000 строк
stmt = select(User).offset(100000).limit(20)Почему медленно: БД должна прочитать 100020 строк, отбросить первые 100000, вернуть 20.
Решение: курсорная пагинация (keyset pagination)
# ✅ Курсорная пагинация — использует индекс вместо OFFSET
def get_page(last_seen_id: int | None, per_page: int = 20):
stmt = select(User).order_by(User.id)
if last_seen_id is not None:
stmt = stmt.where(User.id > last_seen_id)
return session.execute(stmt.limit(per_page + 1)).scalars().all()
# Страница 1
page1 = get_page(last_seen_id=None)
# Страница 2 (продолжаем с последнего ID)
page2 = get_page(last_seen_id=page1[-1].id)Сравнение:
| Метод | Страница 1 | Страница 1000 | Страница 100000 |
|---|---|---|---|
| OFFSET | 1ms | 50ms | 2000ms |
| Cursor | 1ms | 1ms | 1ms |
Когда использовать курсорную пагинацию:
Когда OFFSET допустим:
# ❌ PostgreSQL считает все строки каждый раз
stmt = select(func.count(User.id))
count = session.execute(stmt).scalar()
# На 10M строк — 2-5 секундПочему медленно: PostgreSQL должен просканировать индекс или таблицу для точного подсчёта.
Решения:
# Решение 1: Приблизительный счёт из pg_class (только PostgreSQL)
from sqlalchemy import text
stmt = text("""
SELECT reltuples::bigint as estimate
FROM pg_class
WHERE relname = 'users'
""")
estimate = session.execute(stmt).scalar() # Мгновенно, точность ~90%
# Решение 2: Кеширование счётчика
class Counter(Base):
__tablename__ = "counters"
entity_type = Column(String, primary_key=True)
count = Column(BigInteger, default=0)
def get_user_count():
counter = session.get(Counter, "user")
if not counter or counter.stale():
counter = Counter(entity_type="user", count=calculate_exact_count())
session.add(counter)
return counter.count# ❌ Несколько запросов для разной фильтрации
active_count = session.execute(
select(func.count(User.id)).where(User.active == True)
).scalar()
inactive_count = session.execute(
select(func.count(User.id)).where(User.active == False)
).scalar()Решение: условная агрегация в одном запросе
from sqlalchemy import case
# ✅ Один запрос вместо двух
stmt = select(
func.count(case((User.active == True, 1))).label("active"),
func.count(case((User.active == False, 1))).label("inactive"),
func.count(User.id).label("total")
)
result = session.execute(stmt).one()
# result.active, result.inactive, result.totalКогда использовать: дашборды, отчёты, метрики — когда нужно несколько агрегатов с разными условиями.
Задача: показать топ-3 поста из каждой категории.
# ❌ Без оконных функций: N запросов или сложный self-JOIN
for category in categories:
top_posts = session.execute(
select(Post).where(Post.category_id == category.id)
.order_by(Post.rating.desc()).limit(3)
).scalars().all()# ✅ Оконная функция — один запрос
from sqlalchemy import func, select
ranked_posts = (
select(
Post.id,
Post.title,
Post.category_id,
Post.rating,
func.rank().over(
partition_by=Post.category_id,
order_by=Post.rating.desc()
).label("rank")
)
.subquery()
)
stmt = select(ranked_posts).where(ranked_posts.c.rank <= 3)
top_posts_per_category = session.execute(stmt).all()Почему оконная функция лучше:
Задача: график роста пользователей по дням.
# ✅ SUM() OVER для накопительной суммы
daily_signups = (
select(
func.date(User.created_at).label("signup_date"),
func.count(User.id).label("new_users"),
func.sum(func.count(User.id)).over(
order_by=func.date(User.created_at)
).label("total_users")
)
.group_by(func.date(User.created_at))
.order_by(func.date(User.created_at))
)
# Результат:
# date | new_users | total_users
# 2024-01-01 | 10 | 10
# 2024-01-02 | 15 | 25 ← 10 + 15
# 2024-01-03 | 8 | 33 ← 25 + 8Когда использовать: финансовые отчёты, метрики роста, когортный анализ.
Задача: найти дни с аномальным скачком регистраций.
# ✅ LAG для сравнения с предыдущим днём
from sqlalchemy import func, select, literal
daily = (
select(
func.date(User.created_at).label("signup_date"),
func.count(User.id).label("new_users"),
func.lag(func.count(User.id)).over(
order_by=func.date(User.created_at)
).label("prev_day_users")
)
.group_by(func.date(User.created_at))
.subquery()
)
# Найти дни где рост > 50%
stmt = select(daily).where(
(daily.c.new_users - daily.c.prev_day_users) * 100 / daily.c.prev_day_users > 50
)
anomalies = session.execute(stmt).all()CTT — это не «оптимизация», это инструмент читаемости. PostgreSQL до версии 12 материализовал CTE всегда (создавал временную таблицу), что могло замедлять запрос.
# ❌ CTE здесь не нужен — простой подзапрос эффективнее
cte = select(Post.user_id).where(Post.published == True).cte("active_authors")
stmt = select(User).where(User.id.in_(select(cte.c.user_id)))
# ✅ Эквивалентный подзапрос — оптимизатор может его развернуть
subq = select(Post.user_id).where(Post.published == True)
stmt = select(User).where(User.id.in_(subq))# ✅ CTE когда: 1) используется несколько раз, 2) сложная логика
active_authors = (
select(Post.user_id, func.count(Post.id).label("post_count"))
.where(Post.published == True)
.group_by(Post.user_id)
.having(func.count(Post.id) > 5)
.cte("active_authors")
)
# Используем CTE дважды в одном запросе
stmt = select(User).join(
active_authors, User.id == active_authors.c.user_id
).where(
active_authors.c.post_count > 10
)Правила:
# ❌ IN — подзапрос выполняется целиком, даже если нашёл совпадение
subq = select(Post.user_id).where(Post.published == True)
stmt = select(User).where(User.id.in_(subq))
# ✅ EXISTS — останавливается на первом совпадении (быстрее для больших подзапросов)
from sqlalchemy import exists
subq = exists().where(Post.user_id == User.id, Post.published == True)
stmt = select(User).where(subq)Когда что использовать:
EXISTS — когда подзапрос может вернуть много строк (останавливается рано)IN — когда подзапрос маленький и конкретный (IN [1, 2, 3])# При joinedload SQLAlchemy делает OUTER JOIN
# У пользователя 3 поста → 3 строки в результате → 3 копии User
stmt = select(User).options(joinedload(User.posts))
# ❌ Без unique() — дубликаты объектов
users = session.execute(stmt).scalars().all()
# [User(1), User(1), User(1), User(2), User(2)]
# ✅ С unique() — дедупликация
users = session.execute(stmt).unique().scalars().all()
# [User(1), User(2)]Почему SQLAlchemy не делает unique() автоматически:
unique() — операция в памяти, которая может быть дорогой для больших результатов# scalar() — одно значение из одной строки
stmt = select(func.count(User.id))
count = session.execute(stmt).scalar() # 42
# scalars() — колонка из каждой строки
stmt = select(User.name)
names = session.execute(stmt).scalars().all() # ["Alice", "Bob", ...]
# one() — ровно одна строка, ошибка если 0 или >1
stmt = select(User).where(User.id == 1)
user = session.execute(stmt).scalars().one()
# NoResultFound если нет, MultipleResultsFound если >1
# one_or_none() — одна строка или None
user = session.execute(stmt).scalars().one_or_none()
# first() — первая строка или None (не проверяет уникальность)
user = session.execute(stmt).scalars().first()Когда что использовать:
scalar() — для COUNT, SUM, single valuescalars().all() — для списка объектов/значенийone() — когда ожидаете ровно один результат (get by ID)one_or_none() — когда результат может не существоватьfirst() — когда нужна первая запись без гарантий уникальности# Обычная вставка — Unit of Work отслеживает каждый объект
for user_data in users_data:
user = User(**user_data)
session.add(user)
# session.flush() — INSERT для каждого + отслеживание состояния
session.commit()
# Bulk вставка — один INSERT ALL VALUES
session.bulk_insert_mappings(User, users_data)
# Один запрос: INSERT INTO users (email, name) VALUES (...), (...), (...)Компромиссы bulk операций:
| Аспект | Обычная вставка | Bulk вставка |
|---|---|---|
| Скорость (1000 записей) | ~5 сек | ~0.1 сек |
| ID после вставки | ✅ Установлены | ❌ Не установлены |
| Events (before_insert) | ✅ Срабатывают | ❌ Не срабатывают |
| Отслеживание изменений | ✅ Работает | ❌ Отключено |
| Memory usage | Высокое | Низкое |
# ✅ Импорт данных из CSV/external API
session.bulk_insert_mappings(User, csv_rows)
# ✅ Миграция данных между таблицами
session.bulk_insert_mappings(ArchiveTable, old_records)
# ❌ НЕ использовать когда нужны events или ID
for data in important_data:
obj = Model(**data) # Нужен ID и events
session.add(obj)
session.flush() # ID теперь установлен
process_after_save(obj) # Зависит от IDfrom sqlalchemy import insert
# ✅ Быстрая вставка с возвратом ID (PostgreSQL RETURNING)
stmt = insert(User).returning(User.id)
result = session.execute(stmt, users_data)
ids = result.scalars().all() # [1, 2, 3, ...]
# ❌ Без returning ID не будут доступны
session.bulk_insert_mappings(User, users_data)
# ID = None у всех объектовПравило: если нужны ID — используйте insert().returning(). Если нет — bulk_insert_mappings() быстрее.
# ❌ Каждый раз новый SQL — нет кеширования плана
for user_id in user_ids:
stmt = select(User).where(User.id == user_id)
session.execute(stmt)
# ✅ Один план, много выполнений
stmt = select(User).where(User.id == bindparam("uid"))
for user_id in user_ids:
session.execute(stmt, {"uid": user_id})Почему важно: база данных компилирует план выполнения один раз и переиспользует. Для сложных запросов с JOIN — экономия 10-50ms на каждом вызове.
from sqlalchemy import insert, update
# Массовая вставка
stmt = insert(User)
session.execute(stmt, [
{"email": "a@test.com", "name": "Alice"},
{"email": "b@test.com", "name": "Bob"},
])
# Массовое обновление
stmt = update(User).where(User.id == bindparam("id")).values(
name=bindparam("name")
)
session.execute(stmt, [
{"id": 1, "name": "Alice Updated"},
{"id": 2, "name": "Bob Updated"},
])# ✅ Сложный аналитический запрос, который невозможно выразить через ORM
from sqlalchemy import text
stmt = text("""
SELECT
date_trunc('month', created_at) as month,
count(*) as signups,
count(*) FILTER (WHERE activated = true) as activated
FROM users
GROUP BY 1
ORDER BY 1
""")
report = session.execute(stmt).mappings().all()
# ✅ PostgreSQL-специфичные функции (JSONB, arrays)
stmt = text("""
SELECT id, data->>'email' as email
FROM users
WHERE data @> '{"role": "admin"}'::jsonb
""")
admins = session.execute(stmt).mappings().all()# ❌ Простой запрос через text() — теряете типизацию и безопасность
stmt = text("SELECT * FROM users WHERE id = :id")
user = session.execute(stmt, {"id": 1}).first()
# ✅ Тот же запрос через ORM — типобезопасно
stmt = select(User).where(User.id == 1)
user = session.execute(stmt).scalars().one_or_none()Правило: text() только для того, что невозможно выразить через ORM/Core.
Задача: API endpoint с 10+ фильтрами, сортировкой и пагинацией.
from typing import Optional
from sqlalchemy import select, and_, bindparam
def search_users(
name: Optional[str] = None,
min_age: Optional[int] = None,
country: Optional[str] = None,
has_posts: bool = False,
sort_by: str = "created_at",
sort_order: str = "desc",
page: int = 1,
per_page: int = 20,
):
"""Универсальный поиск с динамическими фильтрами."""
# Базовый запрос
stmt = select(User)
conditions = []
params = {}
# Динамические фильтры
if name:
conditions.append(User.name.ilike(bindparam("name")))
params["name"] = f"%{name}%"
if min_age is not None:
conditions.append(User.age >= bindparam("min_age"))
params["min_age"] = min_age
if country:
conditions.append(User.country == bindparam("country"))
params["country"] = country
# Фильтр по связанной таблице через EXISTS
if has_posts:
from sqlalchemy import exists
conditions.append(
exists().where(Post.user_id == User.id)
)
# Применяем фильтры
if conditions:
stmt = stmt.where(and_(*conditions))
# Сортировка (белый список колонок!)
sort_columns = {
"created_at": User.created_at,
"name": User.name,
"age": User.age,
}
sort_col = sort_columns.get(sort_by, User.created_at)
stmt = stmt.order_by(
sort_col.desc() if sort_order == "desc" else sort_col.asc()
)
# Курсорная пагинация (эффективнее OFFSET)
stmt = stmt.limit(per_page + 1)
result = session.execute(stmt, params)
users = result.scalars().all()
has_next = len(users) > per_page
if has_next:
users = users[:per_page]
next_cursor = users[-1].id if users else None
return {
"users": users,
"has_next": has_next,
"next_cursor": next_cursor,
}Почему это хорошо:
Задача: один запрос для всех метрик дашборда.
from sqlalchemy import func, case
def get_dashboard_metrics():
"""Все метрики дашборда одним запросом."""
stmt = select(
# Базовые счётчики
func.count(User.id).label("total_users"),
func.count(case((User.active == True, 1))).label("active_users"),
# Возрастная статистика
func.avg(User.age).label("avg_age"),
func.percentile_cont(0.5).within_group(User.age).label("median_age"),
# География
func.count(func.distinct(User.country)).label("countries_count"),
# Активность за 30 дней
func.count(
case((
User.last_login >= func.now() - text("interval '30 days'"),
1
))
).label("users_30d")
)
result = session.execute(stmt).one()
return {
"total_users": result.total_users,
"active_users": result.active_users,
"avg_age": float(result.avg_age),
"median_age": result.median_age,
"countries_count": result.countries_count,
"users_30d": result.users_30d,
}Задача: найти позицию конкретного пользователя в рейтинге по очкам.
from sqlalchemy import func, select
def get_user_rank(user_id: int):
"""Позиция пользователя в глобальном рейтинге."""
# Подзапрос с рангами всех пользователей
ranked = (
select(
User.id,
User.score,
func.rank().over(order_by=User.score.desc()).label("rank")
)
.subquery()
)
# Находим конкретный рейтинг
stmt = select(ranked.c.rank).where(ranked.c.id == user_id)
rank = session.execute(stmt).scalar()
return rank# 1. Всегда unique() с joinedload
stmt = select(User).options(joinedload(User.posts))
users = session.execute(stmt).unique().scalars().all()
# 2. bindparam для динамических значений
stmt = select(User).where(User.id == bindparam("uid"))
session.execute(stmt, {"uid": user_id})
# 3. EXISTS вместо IN для больших подзапросов
from sqlalchemy import exists
stmt = select(User).where(
exists().where(Post.user_id == User.id, Post.published == True)
)
# 4. selectinload для больших коллекций
stmt = select(User).options(selectinload(User.posts))
# 5. Курсорная пагинация вместо OFFSET
stmt = select(User).where(User.id > last_id).limit(per_page + 1)# 1. N+1 запросы
for user in users:
print(user.posts) # N дополнительных запросов
# 2. SQL-инъекции через f-strings
stmt = select(User).where(User.email == f"{email}")
# 3. OFFSET на больших таблицах
stmt = select(User).offset(100000).limit(20)
# 4. Без unique() при joinedload
users = session.execute(
select(User).options(joinedload(User.posts))
).scalars().all() # Дубликаты!
# 5. query() вместо select()
users = session.query(User).filter_by(active=True).all() # Legacy APIПеред мержем проверьте:
select() а не query()bindparam() (нет f-strings в WHERE)unique() вызван при joinedloadselectinload а не joinedloadВ следующей теме вы изучите Alembic — инструмент миграции базы данных. Вы научитесь создавать, ревьюить и применять миграции, а также работать с data migrations.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.