Connection pooling, query optimization, read replicas, partitioning
«База данных никогда не врет о задержках». Оптимизация БД — ключ к предсказуемой производительности.
Базы данных часто становятся причиной long tail latency:
| Проблема | Влияние на latency |
|---|---|
| Отсутствие индексов | 10 мс → 1000 мс (full table scan) |
| Lock contention | 5 мс → 500 мс (ожидание блокировки) |
| Connection pool exhaustion | 5 мс → 5000 мс (ожидание соединения) |
| Slow queries (отсутствие LIMIT) | 10 мс → 10000 мс |
| Vacuum/автовакуум в PostgreSQL | 10 мс → 200 мс |
| Checkpoint в PostgreSQL | 10 мс → 500 мс |
# ❌ Плохо: новое соединение на каждый запрос
import psycopg2
def get_user(user_id):
conn = psycopg2.connect(
host='localhost',
database='mydb',
user='user',
password='pass'
)
cursor = conn.cursor()
cursor.execute('SELECT * FROM users WHERE id = %s', (user_id,))
result = cursor.fetchone()
conn.close()
return result
# Время: TCP handshake (5 мс) + auth (10 мс) + query (5 мс) = 20 мс
# При 100 RPS: 100 новых соединений в секунду → перегрузка БД# ✅ Хорошо: пул соединений
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
'postgresql://user:pass@localhost/mydb',
poolclass=QueuePool,
pool_size=20, # Количество постоянных соединений
max_overflow=10, # Дополнительные соединения при пике
pool_timeout=5, # Таймаут ожидания соединения
pool_recycle=3600, # Пересоздавать соединения через 1 час
pool_pre_ping=True # Проверка соединения перед использованием
)
def get_user(user_id):
with engine.connect() as conn:
result = conn.execute(
'SELECT * FROM users WHERE id = %s',
(user_id,)
)
return result.fetchone()
# Время: query (5 мс) — соединение из пула бесплатно# Для read-heavy нагрузки
read_engine = create_engine(
'postgresql://user:pass@localhost/mydb',
pool_size=50, # Больше read соединений
max_overflow=20,
pool_timeout=5
)
# Для write-heavy нагрузки
write_engine = create_engine(
'postgresql://user:pass@localhost/mydb',
pool_size=10, # Меньше write соединений (избегаем lock contention)
max_overflow=5,
pool_timeout=10 # Дольше ждём при пике
)
# Для аналитики (тяжёлые query)
analytics_engine = create_engine(
'postgresql://user:pass@localhost/mydb',
pool_size=5, # Ограниченное количество тяжёлых query
max_overflow=0, # Без overflow
pool_timeout=30 # Долгое ожидание допустимо
)import asyncpg
import asyncio
class AsyncPool:
"""Асинхронный пул соединений."""
def __init__(self, dsn, min_size=5, max_size=20):
self.dsn = dsn
self.min_size = min_size
self.max_size = max_size
self.pool = None
async def create_pool(self):
self.pool = await asyncpg.create_pool(
self.dsn,
min_size=self.min_size,
max_size=self.max_size,
command_timeout=60
)
async def close_pool(self):
if self.pool:
await self.pool.close()
async def execute(self, query, *args):
async with self.pool.acquire() as conn:
return await conn.fetch(query, *args)
# Использование
pool = AsyncPool('postgresql://user:pass@localhost/mydb')
async def main():
await pool.create_pool()
# Параллельные запросы через пул
results = await asyncio.gather(
pool.execute('SELECT * FROM users WHERE id = $1', 1),
pool.execute('SELECT * FROM users WHERE id = $1', 2),
pool.execute('SELECT * FROM users WHERE id = $1', 3),
)
await pool.close_pool()
# asyncio.run(main())# ❌ Плохо: full table scan
# SELECT * FROM orders WHERE user_id = 123 AND status = 'pending'
# Время: 500 мс (1M записей)
# ✅ Хорошо: составной индекс
# CREATE INDEX idx_orders_user_status ON orders (user_id, status)
# Время: 5 мс (index scan)
# Проверка использования индекса
# EXPLAIN ANALYZE SELECT * FROM orders
# WHERE user_id = 123 AND status = 'pending'# ❌ Плохо: N+1 запрос
def get_users_with_orders():
users = db.execute('SELECT * FROM users') # 1 запрос
for user in users:
# N запросов к БД
orders = db.execute(
'SELECT * FROM orders WHERE user_id = %s',
user.id
)
user.orders = orders
return users
# Время: 10 мс + 100 × 5 мс = 510 мс
# ✅ Хорошо: JOIN или IN
def get_users_with_orders_optimized():
users = db.execute('SELECT * FROM users')
user_ids = [user.id for user in users]
# 1 запрос с IN
orders = db.execute(
'SELECT * FROM orders WHERE user_id = ANY(%s)',
(user_ids,)
)
# Группируем по user_id
orders_by_user = {}
for order in orders:
orders_by_user.setdefault(order.user_id, []).append(order)
for user in users:
user.orders = orders_by_user.get(user.id, [])
return users
# Время: 10 мс + 10 мс = 20 мс (в 25 раз быстрее!)# ❌ Плохо: может вернуть миллионы строк
def search_products(query):
return db.execute(
'''SELECT * FROM products
WHERE name ILIKE %s''',
(f'%{query}%',)
)
# ✅ Хорошо: ограничение количества
def search_products_limited(query, limit=100):
return db.execute(
'''SELECT * FROM products
WHERE name ILIKE %s
LIMIT %s''',
(f'%{query}%', limit)
)
# ✅ Отлично: с pagination
def search_products_paginated(query, offset=0, limit=50):
return db.execute(
'''SELECT * FROM products
WHERE name ILIKE %s
ORDER BY name
LIMIT %s OFFSET %s''',
(f'%{query}%', limit, offset)
)# ❌ Плохо: много отдельных INSERT
def insert_users(users):
for user in users:
db.execute(
'INSERT INTO users (name, email) VALUES (%s, %s)',
(user.name, user.email)
)
# 1000 INSERT = 1000 × 5 мс = 5000 мс
# ✅ Хорошо: batch INSERT
def insert_users_batch(users):
values = [(user.name, user.email) for user in users]
db.execute(
'INSERT INTO users (name, email) VALUES %s',
(values,) # psycopg2 execute_values
)
# 1 INSERT = 50 мс (в 100 раз быстрее!)
# ✅ Отлично: copy для больших объёмов
def insert_users_copy(users):
from io import StringIO
buffer = StringIO()
for user in users:
buffer.write(f'{user.name}\t{user.email}\n')
buffer.seek(0)
with db.cursor() as cursor:
cursor.copy_from(
buffer,
'users',
columns=('name', 'email')
)
# COPY = 10 мс для 1000 записей (в 500 раз быстрее!)from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# Master для write
master_engine = create_engine(
'postgresql://user:pass@master/mydb',
pool_size=10,
max_overflow=5
)
# Replicas для read
replica_engines = [
create_engine(
f'postgresql://user:pass@replica{i}/mydb',
pool_size=20,
max_overflow=10
)
for i in range(1, 4) # 3 реплики
]
# Балансировка между репликами
import random
def get_replica_engine():
return random.choice(replica_engines)
# Роутинг запросов
class ReadWriteRouter:
def route_query(self, query_type, *args, **kwargs):
if query_type == 'write':
return self._execute_on_master(*args, **kwargs)
else:
return self._execute_on_replica(*args, **kwargs)
def _execute_on_master(self, query, params):
with master_engine.connect() as conn:
return conn.execute(query, params)
def _execute_on_replica(self, query, params):
engine = get_replica_engine()
with engine.connect() as conn:
return conn.execute(query, params)
# Использование
router = ReadWriteRouter()
# Write → master
router.route_query('write', 'INSERT INTO users VALUES (%s)', (data,))
# Read → replica
users = router.route_query('read', 'SELECT * FROM users', ())Master: INSERT user:123 (T=0)
↓ репликация (50-500 мс)
Replica: получает изменения (T=50-500 мс)
Клиент: INSERT user:123 → SELECT user:123
↑ ↑
master replica (ещё не получил!)
Результат: user не найден!
Решение 1: Read-your-writes consistency
import time
from threading import local
_thread_local = local()
def write_to_master(query, params):
"""Запись на master с запоминанием времени."""
with master_engine.connect() as conn:
result = conn.execute(query, params)
conn.commit()
# Запоминаем время записи
_thread_local.last_write_time = time.time()
return result
def read_from_replica(query, params, replication_lag_threshold=1.0):
"""Чтение с replica с проверкой актуальности."""
last_write = getattr(_thread_local, 'last_write_time', 0)
# Если была недавняя запись, читаем с master
if time.time() - last_write < replication_lag_threshold:
return read_from_master(query, params)
# Иначе читаем с replica
return read_from_replica_impl(query, params)Решение 2: Session-level routing
class SessionRouter:
"""Роутинг на уровне сессии."""
def __init__(self):
self.sessions = {}
def get_session(self, session_id, is_write=False):
if session_id not in self.sessions:
# Новая сессия
if is_write:
engine = master_engine
else:
engine = get_replica_engine()
self.sessions[session_id] = {
'engine': engine,
'is_write': is_write,
'created_at': time.time()
}
else:
session = self.sessions[session_id]
# Если сессия начиналась с write, используем master
if session['is_write']:
session['engine'] = master_engine
return self.sessions[session_id]['engine']
# Использование
router = SessionRouter()
# Запрос 1: read → replica
session1_engine = router.get_session('session1', is_write=False)
# Запрос 2: write → master
session1_engine = router.get_session('session1', is_write=True) # master
# Запрос 3: read → master (сессия была write)
session1_engine = router.get_session('session1', is_write=False) # master!-- Секционирование по диапазону дат
CREATE TABLE orders (
id SERIAL,
user_id INTEGER,
amount DECIMAL,
created_at TIMESTAMP
) PARTITION BY RANGE (created_at);
-- Секции по месяцам
CREATE TABLE orders_2026_01 PARTITION OF orders
FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');
CREATE TABLE orders_2026_02 PARTITION OF orders
FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');
-- Query автоматически использует нужную секцию
SELECT * FROM orders
WHERE created_at >= '2026-01-15'
AND created_at < '2026-01-20';
-- Scan только orders_2026_01, не все данныеclass ShardedDatabase:
"""Горизонтальное шардирование по user_id."""
def __init__(self, num_shards=4):
self.num_shards = num_shards
self.shards = [
create_engine(f'postgresql://user:pass@shard{i}/mydb')
for i in range(num_shards)
]
def get_shard(self, user_id):
"""Определяет шард по user_id."""
shard_id = user_id % self.num_shards
return self.shards[shard_id]
def get_user(self, user_id):
shard = self.get_shard(user_id)
with shard.connect() as conn:
return conn.execute(
'SELECT * FROM users WHERE id = %s',
(user_id,)
).fetchone()
def get_users(self, user_ids):
"""Получение пользователей с разных шардов."""
# Группируем по шардам
by_shard = {}
for user_id in user_ids:
shard = self.get_shard(user_id)
by_shard.setdefault(shard, []).append(user_id)
# Параллельные запросы к шардам
results = {}
for shard, ids in by_shard.items():
with shard.connect() as conn:
rows = conn.execute(
'SELECT * FROM users WHERE id = ANY(%s)',
(ids,)
)
for row in rows:
results[row.id] = row
return results
# Использование
sharded_db = ShardedDatabase(num_shards=4)
# Запрос идёт только на один шард
user = sharded_db.get_user(123) # shard 3 (123 % 4 = 3)
# Массовый запрос распределяется по шардам
users = sharded_db.get_users([1, 2, 3, 4, 5]) # все 4 шардаВыбор ключа шардирования — критичное архитектурное решение, которое сложно изменить потом. Неправильный ключ приведёт к:
| Критерий | Описание | Почему важен |
|---|---|---|
| High cardinality | Много уникальных значений (тысячи/миллионы) | Равномерное распределение данных |
| Even distribution | Значения распределяются равномерно | Избегание hot shards |
| Query alignment | Ключ используется в WHERE запросов | Запросы идут на один шард, не на все |
| Stable | Значение не меняется со временем | Избегаем миграции данных между шардами |
| Scalable | Позволяет добавлять новые шарды | Горизонтальное масштабирование |
# ✅ Хорошо для multi-tenant систем
shard_id = user_id % num_shards
# Преимущества:
# - Запросы к данным пользователя идут на один шард
# - Изоляция тенантов (данные не пересекаются)
# - Простая логика
# Недостатки:
# - "Шумные" пользователи могут создать hot shard
# - Неравномерное распределение, если пользователи имеют разный объём данныхКогда использовать: SaaS платформы, социальные сети, e-commerce (данные пользователя локализованы).
# ✅ Хорошо для гео-распределённых систем
region_map = {
'us-east': 0,
'us-west': 1,
'eu-west': 2,
'asia-east': 3
}
shard_id = region_map[user_region]
# Преимущества:
# - Данные близко к пользователям (низкая latency)
# - Соответствие регуляторным требованиям (GDPR)
# - Изоляция по регионам
# Недостатки:
# - Неравномерное распределение (us-east может быть больше asia-east)
# - Сложные кросс-региональные запросыКогда использовать: Глобальные приложения с требованиями к data residency.
import hashlib
def get_shard_by_hash(key, num_shards):
"""
Равномерное распределение через хэш-функцию.
"""
hash_value = int(hashlib.md5(key.encode()).hexdigest(), 16)
return hash_value % num_shards
# Использование
shard_id = get_shard_by_hash(f"user:{user_id}:{order_id}", num_shards=8)
# Преимущества:
# - Очень равномерное распределение
# - Можно шардировать по составному ключу
# - Детерминировано (тот же ключ → тот же шард)
# Недостатки:
# - Запросы без полного ключа требуют scatter-gather
# - Сложная отладка (неочевидно, на каком шарде данные)Когда использовать: Когда нужно равномерное распределение и есть составной ключ доступа.
# ✅ Хорошо для временных данных
def get_shard_by_date(created_at, shard_ranges):
"""
Шардирование по диапазонам дат.
"""
for shard_id, (start_date, end_date) in enumerate(shard_ranges):
if start_date <= created_at < end_date:
return shard_id
return len(shard_ranges) - 1 # Последний шард для новых данных
shard_ranges = [
(datetime(2024, 1, 1), datetime(2024, 4, 1)), # Q1 2024 → shard 0
(datetime(2024, 4, 1), datetime(2024, 7, 1)), # Q2 2024 → shard 1
(datetime(2024, 7, 1), datetime(2024, 10, 1)), # Q3 2024 → shard 2
]
# Преимущества:
# - Эффективные диапазонные запросы (SELECT WHERE created_at BETWEEN)
# - Легко архивировать старые шарды
# - Естественное старение данных
# Недостатки:
# - Hot shard для текущих данных (все пишут в последний шард)
# - Неравномерная нагрузкаКогда использовать: Временные ряды, логи, события с TTL.
def get_shard_composite(user_id, order_id, num_shards):
"""
Составной ключ для балансировки между локальностью и распределением.
"""
# Комбинируем user_id (локальность) и order_id (распределение)
combined = f"{user_id}:{order_id // 1000}" # Группируем заказы по 1000
return get_shard_by_hash(combined, num_shards)
# Преимущества:
# - Баланс между локальностью данных и равномерным распределением
# - Снижает риск hot shards
# Недостатки:
# - Сложнее логика
# - Требует понимания паттернов доступаКогда использовать: Когда один ключ приводит к hot shards.
# ❌ Плохо: неравномерное распределение
shard_id = int(uuid.uuid4().hex[:8], 16) % num_shards
# Проблема: случайные UUID не гарантируют равномерность при малом num_shards
# Решение: используйте hash(UUID) % num_shards# ❌ Плохо: всего 3 значения → 3 шарда, независимо от объёма данных
shard_id = user_type # 'free', 'premium', 'enterprise'
# Проблема:
# - free: 90% пользователей → перегруженный шард
# - enterprise: 1% пользователей → простой шарда
# Решение: комбинируйте с user_id: hash(f"{user_type}:{user_id}") % num_shards# ❌ Плохо: email может измениться
shard_id = hash(user_email) % num_shards
# Проблема: при смене email данные нужно мигрировать на другой шард
# Решение: используйте неизменяемый user_id# ❌ Плохо: запрос на все шарды
def search_all_shards(query):
results = []
for shard in all_shards: # Запрос на каждый шард
results.extend(shard.query(query))
return results
# Проблема: latency = сумма latency всех шардов
# Решение: выбирайте ключ, по которому можно фильтровать в WHEREclass ConsistentHashSharding:
"""
Шардирование с минимальной миграцией при добавлении шардов.
"""
def __init__(self, virtual_nodes=150):
self.virtual_nodes = virtual_nodes
self.ring = {} # hash → shard_id
self.sorted_keys = []
def add_shard(self, shard_id):
"""Добавляет шард на consistent hash ring."""
for i in range(self.virtual_nodes):
key = f"shard:{shard_id}:node:{i}"
hash_value = int(hashlib.md5(key.encode()).hexdigest(), 16)
self.ring[hash_value] = shard_id
self.sorted_keys.append(hash_value)
self.sorted_keys.sort()
def get_shard(self, key):
"""Находит шард для ключа."""
if not self.ring:
return None
hash_value = int(hashlib.md5(key.encode()).hexdigest(), 16)
# Бинарный поиск на кольце
from bisect import bisect_right
pos = bisect_right(self.sorted_keys, hash_value)
if pos == len(self.sorted_keys):
pos = 0
return self.ring[self.sorted_keys[pos]]
# При добавлении шарда мигрирует только 1/N данных
# vs N/(N+1) при обычном modulo шардированииПеред выбором ключа ответьте на вопросы:
# PostgreSQL: логирование медленных запросов
# postgresql.conf:
# log_min_duration_statement = 100 # Логирует query > 100 мс
# log_checkpoints = on
# log_lock_waits = on
# Анализ логов
from collections import defaultdict
def analyze_slow_queries(log_file):
query_times = defaultdict(list)
with open(log_file) as f:
for line in f:
if 'duration:' in line:
# duration: 150.234 ms statement: SELECT * FROM users...
parts = line.split('duration:')[1].strip()
duration_ms = float(parts.split('ms')[0])
statement = parts.split('statement:')[1].strip()
# Нормализуем query (убираем значения)
normalized = normalize_query(statement)
query_times[normalized].append(duration_ms)
# Агрегируем
for query, times in query_times.items():
print(f"Query: {query[:50]}...")
print(f" Count: {len(times)}")
print(f" Avg: {sum(times)/len(times):.1f} мс")
print(f" Max: {max(times):.1f} мс")
print()from prometheus_client import Gauge
DB_CONNECTIONS_USED = Gauge(
'db_connections_used',
'Number of active database connections',
['pool_name']
)
DB_CONNECTIONS_MAX = Gauge(
'db_connections_max',
'Maximum database connections',
['pool_name']
)
def monitor_connection_pool(engine, pool_name):
pool = engine.pool
DB_CONNECTIONS_USED.labels(pool_name=pool_name).set(pool.checkedout())
DB_CONNECTIONS_MAX.labels(pool_name=pool_name).set(pool.size())
# Алерт: pool exhaustion
if pool.checkedout() >= pool.size() * 0.9:
logger.warning(f"Connection pool {pool_name} nearly exhausted")def detect_locks():
"""Выявляет блокировки в PostgreSQL."""
query = '''
SELECT
blocked_locks.pid AS blocked_pid,
blocked_activity.usename AS blocked_user,
blocking_locks.pid AS blocking_pid,
blocking_activity.usename AS blocking_user,
blocked_activity.query AS blocked_statement,
blocking_activity.query AS current_statement_in_blocking_process
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks ON blocking_locks.locktype = blocked_locks.locktype
AND blocking_locks.database IS NOT DISTINCT FROM blocked_locks.database
AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page
AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple
AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid
AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid
AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid
AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid
AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid
AND blocking_locks.pid != blocked_locks.pid
JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.granted
'''
locks = db.execute(query)
for lock in locks:
logger.warning(f"Lock detected: {lock.blocked_pid} blocked by {lock.blocking_pid}")# Всегда с пулом
engine = create_engine(
DATABASE_URL,
pool_size=20,
max_overflow=10,
pool_timeout=5,
pool_recycle=3600
)read_pool = create_engine(DATABASE_URL, pool_size=50)
write_pool = create_engine(DATABASE_URL, pool_size=10)
analytics_pool = create_engine(DATABASE_URL, pool_size=5)-- Анализируйте slow query
EXPLAIN ANALYZE SELECT * FROM orders WHERE user_id = 123;
-- Создавайте индексы
CREATE INDEX idx_orders_user_id ON orders (user_id);# Используйте JOIN или batch запросы
users_with_orders = db.execute('''
SELECT u.*, o.*
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
''')В следующей теме рассмотрим очереди и backpressure для управления потоком данных.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.