Connection pooling, batching, monitoring, production deployment
Production-готовность SQLAlchemy требует понимания connection pooling, batching, мониторинга и стратегий развёртывания. В этой теме вы изучите best practices для высоконагруженных систем.
from sqlalchemy import create_engine, event
engine = create_engine(
'postgresql+psycopg://user:pass@localhost:5432/dbname',
# Размер пула
pool_size=20, # Постоянные соединения (по умолчанию 5)
max_overflow=40, # Дополнительные при пике (по умолчанию 10)
# Таймауты
pool_timeout=30, # Ждать соединения 30 сек (по умолчанию 30)
pool_recycle=1800, # Пересоздавать через 30 мин (по умолчанию -1)
# Надёжность
pool_pre_ping=True, # Проверка SELECT 1 перед использованием
# Логирование
echo=False, # Выключить в production
)# Формула для web-приложений
# pool_size = (num_cores * 2) + num_disks
# Но лучше эмпирически
# Для 4 CPU, 100 concurrent requests:
pool_size = 20 # Начальное значение
max_overflow = 40 # 2x pool_size для пиков
# Мониторинг использования
@event.listens_for(engine, "checkout")
def on_checkout(dbapi_conn, connection_record, connection_proxy):
stats = {
'size': engine.pool.size(),
'checked_out': engine.pool.checkedout(),
'overflow': engine.pool.overflow(),
}
print(f"Pool stats: {stats}")# Для AWS Lambda / serverless (нет постоянных соединений)
from sqlalchemy.pool import NullPool
engine = create_engine(
'postgresql+psycopg://...',
poolclass=NullPool, # Новое соединение для каждого запроса
)from sqlalchemy.pool import QueuePool
engine = create_engine(
'postgresql+psycopg://...',
poolclass=QueuePool, # По умолчанию
pool_size=20,
max_overflow=40,
)from sqlalchemy import insert
# Обычная вставка (медленно)
for i in range(10000):
user = User(email=f'user{i}@example.com', name=f'User {i}')
session.add(user)
session.commit()
# ~10000 INSERT запросов, ~10-30 секунд
# Bulk insert mappings (быстро)
session.bulk_insert_mappings(User, [
{'email': f'user{i}@example.com', 'name': f'User {i}'}
for i in range(10000)
])
session.commit()
# 1 INSERT запрос, ~0.5 секунды
# Bulk save objects
users = [User(email=f'user{i}@example.com', name=f'User {i}') for i in range(10000)]
session.bulk_save_objects(users)
session.commit()from sqlalchemy import select
def process_large_table(session, batch_size=1000):
"""Обработка большой таблицы чанками."""
offset = 0
while True:
# Загрузка чанка
stmt = select(User).limit(batch_size).offset(offset)
users = session.execute(stmt).scalars().all()
if not users:
break
# Обработка
for user in users:
process_user(user)
# Коммит каждого чанка
session.commit()
offset += batch_size
print(f"Processed {offset} records")
# Использование
with Session(engine) as session:
process_large_table(session, batch_size=1000)# Streaming больших результатов
stmt = select(User).execution_options(yield_per=1000)
with Session(engine) as session:
# Загружает по 1000 записей за раз
for user in session.scalars(stmt):
process_user(user)from prometheus_client import Counter, Histogram, Gauge
from sqlalchemy import event
import time
# Метрики
DB_QUERY_TOTAL = Counter(
'db_query_total',
'Total database queries',
['database', 'operation']
)
DB_QUERY_DURATION = Histogram(
'db_query_duration_seconds',
'Database query duration',
['database', 'operation'],
buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
)
DB_POOL_SIZE = Gauge(
'db_pool_size',
'Database pool size',
['database']
)
DB_POOL_CHECKED_OUT = Gauge(
'db_pool_checked_out',
'Database pool checked out connections',
['database']
)
@event.listens_for(engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
context._query_start_time = time.time()
# Определение операции
operation = statement.split()[0].upper() if statement else 'UNKNOWN'
DB_QUERY_TOTAL.labels(database='main', operation=operation).inc()
@event.listens_for(engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
duration = time.time() - context._query_start_time
operation = statement.split()[0].upper() if statement else 'UNKNOWN'
DB_QUERY_DURATION.labels(database='main', operation=operation).observe(duration)
# Логирование медленных запросов
if duration > 1.0:
print(f"Slow query ({duration:.3f}s): {statement[:200]}")
# Сбор метрик пула
def collect_pool_metrics():
DB_POOL_SIZE.labels(database='main').set(engine.pool.size())
DB_POOL_CHECKED_OUT.labels(database='main').set(engine.pool.checkedout())from sqlalchemy import event
import logging
import time
logger = logging.getLogger('sqlalchemy.slow')
SLOW_QUERY_THRESHOLD = 1.0 # секунды
@event.listens_for(engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
context._query_start_time = time.time()
@event.listens_for(engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
duration = time.time() - context._query_start_time
if duration > SLOW_QUERY_THRESHOLD:
logger.warning(
f"Slow query ({duration:.3f}s):\n"
f"SQL: {statement}\n"
f"Params: {parameters}"
)from sqlalchemy import text
from contextlib import contextmanager
@contextmanager
def db_health_check(engine, timeout=5):
"""Проверка доступности БД."""
try:
with engine.connect() as conn:
# Проверка подключения
conn.execute(text("SELECT 1"), execution_options={'isolation_level': 'AUTOCOMMIT'})
# Проверка пула
if engine.pool.checkedout() >= engine.pool.size() + engine.pool.max_overflow:
raise Exception("Pool exhausted")
yield True
except Exception as e:
yield False
logger.error(f"DB health check failed: {e}")
# Использование в health endpoint
@app.get('/health')
def health_check():
with db_health_check(engine) as healthy:
if healthy:
return {"status": "healthy"}
else:
return {"status": "unhealthy"}, 503# .gitlab-ci.yml
stages:
- test
- migrate
- deploy
test_migrations:
stage: test
script:
- alembic upgrade head
- alembic downgrade base
- alembic upgrade head
rules:
- changes:
- alembic/versions/*
apply_migrations:
stage: migrate
script:
- python scripts/apply_migrations.py
environment:
name: production
when: manual # Ручное подтверждение
rules:
- if: $CI_COMMIT_BRANCH == "main"
deploy:
stage: deploy
script:
- kubectl rollout restart deployment/app
needs:
- apply_migrations#!/bin/bash
# scripts/deploy.sh
set -e
echo "Starting rolling deployment..."
# 1. Применить миграции
echo "Applying migrations..."
python scripts/apply_migrations.py
# 2. Проверить health
echo "Running health checks..."
python scripts/health_check.py
# 3. Rolling update
echo "Starting rolling update..."
kubectl rollout restart deployment/app
# 4. Ждать готовности
kubectl rollout status deployment/app --timeout=300s
# 5. Финальная проверка
python scripts/smoke_test.py
echo "Deployment complete!"# scripts/blue_green_deploy.py
import subprocess
import sys
def deploy():
# Текущий цвет
current_color = get_current_color() # 'blue' или 'green'
new_color = 'green' if current_color == 'blue' else 'blue'
print(f"Deploying to {new_color} environment...")
# 1. Применить миграции (expand phase)
run(f"kubectl apply -f migrations-{new_color}.yaml")
# 2. Deploy new version
run(f"kubectl set image deployment/app-{new_color} app=myapp:v2")
# 3. Wait for ready
run(f"kubectl rollout status deployment/app-{new_color}")
# 4. Health checks
run(f"python scripts/smoke_test.py --env {new_color}")
# 5. Switch traffic
run(f"kubectl patch service/app -p '{{\"spec\":{{\"selector\":{{\"version\":\"{new_color}\"}}}}}}'")
# 6. Contract phase (отдельная миграция)
run("alembic upgrade +1")
print(f"Deployment to {new_color} complete!")
if __name__ == '__main__':
deploy()from sqlalchemy.orm import Query
from functools import lru_cache
import redis
redis_client = redis.Redis()
@lru_cache(maxsize=1000)
def get_cached_query(query_key: str, params: tuple):
"""Кэширование результатов запросов."""
cached = redis_client.get(query_key)
if cached:
return json.loads(cached)
return None
def set_cached_query(query_key: str, result, ttl=300):
"""Сохранение в кэш."""
redis_client.setex(query_key, ttl, json.dumps(result))
# Использование
def get_user_stats(user_id: int):
query_key = f"user_stats:{user_id}"
cached = get_cached_query(query_key, ())
if cached:
return cached
# Запрос к БД
stats = session.query(func.count(Post.id)).filter(Post.user_id == user_id).scalar()
set_cached_query(query_key, stats, ttl=300)
return statsfrom sqlalchemy import create_engine
from dogpile.cache import make_region
# Конфигурация кэша
region = make_region().configure(
'dogpile.cache.redis',
arguments={
'host': 'localhost',
'port': 6379,
'db': 1,
'redis_expiration_time': 3600,
'distributed_lock': True,
}
)
# Engine с кэшем
engine = create_engine(
'postgresql+psycopg://...',
execution_options={
'cache_region': region,
}
)# Настраивайте pool_size под нагрузку
engine = create_engine(..., pool_size=20, max_overflow=40)
# Используйте pool_pre_ping в production
engine = create_engine(..., pool_pre_ping=True)
# Логируйте медленные запросы
@event.listens_for(engine, "after_cursor_execute")
def log_slow(...):
if duration > 1.0:
logger.warning(f"Slow query: {statement}")
# Используйте bulk операции для массовых вставок
session.bulk_insert_mappings(User, data)
# Делайте health checks
def health_check():
with engine.connect() as conn:
conn.execute(text("SELECT 1"))# Не используйте echo=True в production
engine = create_engine(..., echo=True) # ПЛОХО
# Не создавайте engine для каждого запроса
def get_user(user_id):
engine = create_engine(...) # ПЛОХО: нет пула
...
# Не забывайте закрывать сессии
session = Session()
# ПЛОХО: нет session.close()
# Не делайте N+1 запросы
for user in users:
user.posts # ПЛОХО: N+1В следующей теме вы изучите Final Project — полноценное приложение с тестами, миграциями и всеми best practices которые вы изучили в курсе.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.