Горизонтальное масштабирование, routing, sharding стратегии
Горизонтальное масштабирование с SQLAlchemy: подключение к нескольким базам данных, routing запросов, sharding стратегии и управление распределёнными данными.
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
# Основная БД
engine_primary = create_engine(
'postgresql+psycopg://user:pass@primary-host:5432/main_db',
pool_size=20,
)
# Реплика для чтения
engine_replica = create_engine(
'postgresql+psycopg://user:pass@replica-host:5432/main_db',
pool_size=20,
)
# Аналитическая БД
engine_analytics = create_engine(
'postgresql+psycopg://user:pass@analytics-host:5432/analytics_db',
pool_size=10,
)
# Фабрики сессий
SessionPrimary = sessionmaker(bind=engine_primary, class_=Session)
SessionReplica = sessionmaker(bind=engine_replica, class_=Session)
SessionAnalytics = sessionmaker(bind=engine_analytics, class_=Session)from sqlalchemy.orm import sessionmaker
# Единая сессия с привязкой моделей к БД
Session = sessionmaker(bind=engine_primary)
# Конфигурация binds
Session.configure(binds={
User: engine_primary, # User в primary
Post: engine_primary, # Post в primary
AuditLog: engine_analytics, # AuditLog в analytics
})
# Использование
session = Session()
# Запросы автоматически идут в правильную БД
users = session.query(User).all() # primary
logs = session.query(AuditLog).all() # analyticsfrom sqlalchemy.orm import Session, sessionmaker
from typing import Type, Optional
class DatabaseRouter:
"""Роутер запросов к разным базам данных."""
def __init__(self):
self.engines = {
'primary': engine_primary,
'replica': engine_replica,
'analytics': engine_analytics,
}
# Маппинг моделей к БД
self.model_binds = {
User: 'primary',
Post: 'primary',
Comment: 'primary',
AuditLog: 'analytics',
Report: 'analytics',
}
def get_engine_for_model(self, model: Type) -> Optional[str]:
"""Получить имя БД для модели."""
# Прямое соответствие
if model in self.model_binds:
return self.model_binds[model]
# Проверка базовых классов
for base_class, db_name in self.model_binds.items():
if issubclass(model, base_class):
return db_name
# По умолчанию primary
return 'primary'
def get_engine_for_query(self, mapper) -> Optional[str]:
"""Получить имя БД для запроса."""
if hasattr(mapper, 'class_'):
return self.get_engine_for_model(mapper.class_)
return None
# Глобальный роутер
router = DatabaseRouter()from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy import event
class RoutingSession(Session):
"""Сессия с автоматическим роутингом."""
def get_bind(self, mapper=None, clause=None, **kwargs):
"""Определить engine для запроса."""
# Для записи всегда primary
if self._flushing or (clause is None and mapper is None):
return engine_primary
# Для чтения проверяем роутер
if mapper is not None:
db_name = router.get_engine_for_query(mapper)
if db_name:
return router.engines[db_name]
# По умолчанию primary
return engine_primary
# Фабрика сессий
Session = sessionmaker(class_=RoutingSession)
Session.configure(bind=engine_primary)# Чтение автоматически идёт на реплику
session = Session()
users = session.query(User).all() # primary (по умолчанию)
# Явное указание БД для запроса
from sqlalchemy.orm import with_loader_criteria
def use_replica(query):
return query.with_session(
Session(bind=engine_replica)
)
reports = use_replica(session.query(Report)).all() # analyticsfrom sqlalchemy.orm import sessionmaker, Session
from sqlalchemy import event
import hashlib
class ShardedSession(Session):
"""Сессия с шардированием по user_id."""
SHARDS = {
0: engine_shard_0,
1: engine_shard_1,
2: engine_shard_2,
3: engine_shard_3,
}
def get_bind(self, mapper=None, clause=None, **kwargs):
"""Определить шард на основе user_id."""
# Получить user_id из контекста запроса
user_id = self.info.get('user_id')
if user_id is None:
# Если нет user_id, используем шард 0
return self.SHARDS[0]
# Вычислить шард через хеш
shard_id = self.get_shard_for_user(user_id)
return self.SHARDS[shard_id]
@staticmethod
def get_shard_for_user(user_id: int) -> int:
"""Вычислить ID шарда для user_id."""
return user_id % len(ShardedSession.SHARDS)
# Фабрика сессий
ShardedSessionMaker = sessionmaker(class_=ShardedSession)
ShardedSessionMaker.configure(bind=engine_shard_0)
# Использование
def get_user_session(user_id: int) -> Session:
"""Создать сессию для конкретного пользователя."""
session = ShardedSessionMaker()
session.info['user_id'] = user_id
return session
# Пример
session = get_user_session(user_id=12345)
user = session.get(User, 12345) # Запрос на правильный шардclass RangeShardedSession(Session):
"""Шардирование по диапазонам ID."""
SHARD_RANGES = [
(0, 1000000, engine_shard_0), # ID 0-999999
(1000000, 2000000, engine_shard_1), # ID 1000000-1999999
(2000000, None, engine_shard_2), # ID 2000000+
]
def get_bind(self, mapper=None, clause=None, **kwargs):
"""Определить шард по диапазону ID."""
# Попытаться получить ID из clause
if clause is not None:
shard_id = self._get_shard_from_clause(clause)
if shard_id is not None:
return self.SHARD_RANGES[shard_id][2]
# По умолчанию первый шард
return self.SHARD_RANGES[0][2]
def _get_shard_from_clause(self, clause) -> Optional[int]:
"""Извлечь ID из WHERE clause."""
# Парсинг условия WHERE id = X
# Упрощённая реализация
return None
# Фабрика
RangeShardedSessionMaker = sessionmaker(class_=RangeShardedSession)from typing import List, Type
from sqlalchemy.orm import Session
class CrossShardQuery:
"""Выполнение запросов на всех шардах."""
def __init__(self, model: Type, session_factory):
self.model = model
self.session_factory = session_factory
self.shards = ShardedSession.SHARDS
def all(self) -> List:
"""Получить все объекты со всех шардов."""
results = []
for shard_id, engine in self.shards.items():
session = Session(bind=engine)
try:
shard_results = session.query(self.model).all()
results.extend(shard_results)
finally:
session.close()
return results
def filter(self, **kwargs) -> List:
"""Фильтрация на всех шардах."""
results = []
for shard_id, engine in self.shards.items():
session = Session(bind=engine)
try:
shard_results = session.query(self.model).filter_by(**kwargs).all()
results.extend(shard_results)
finally:
session.close()
return results
def count(self) -> int:
"""Общий count по всем шардам."""
total = 0
for shard_id, engine in self.shards.items():
session = Session(bind=engine)
try:
total += session.query(self.model).count()
finally:
session.close()
return total
# Использование
query = CrossShardQuery(User, ShardedSessionMaker)
all_users = query.all()
active_count = query.filter(active=True).count()from sqlalchemy import func
class ShardAggregator:
"""Агрегация данных с нескольких шардов."""
def __init__(self, session_factory):
self.session_factory = session_factory
self.shards = ShardedSession.SHARDS
def sum_across_shards(self, model, column) -> float:
"""SUM по всем шардам."""
total = 0
for shard_id, engine in self.shards.items():
session = Session(bind=engine)
try:
result = session.query(func.sum(column)).scalar()
if result:
total += result
finally:
session.close()
return total
def avg_across_shards(self, model, column) -> float:
"""AVG по всем шардам."""
total_sum = 0
total_count = 0
for shard_id, engine in self.shards.items():
session = Session(bind=engine)
try:
result = session.query(
func.sum(column),
func.count(column)
).first()
if result[0]:
total_sum += result[0]
total_count += result[1]
finally:
session.close()
return total_sum / total_count if total_count > 0 else 0
# Использование
aggregator = ShardAggregator(ShardedSessionMaker)
total_revenue = aggregator.sum_across_shards(Order, Order.amount)
avg_order_value = aggregator.avg_across_shards(Order, Order.amount)# alembic/env.py
from alembic import context
from sqlalchemy import engine_from_config, pool
# Конфигурация шардов
SHARD_ENGINES = {
'shard_0': 'postgresql+psycopg://user:pass@shard0/db',
'shard_1': 'postgresql+psycopg://user:pass@shard1/db',
'shard_2': 'postgresql+psycopg://user:pass@shard2/db',
}
def run_migrations_online():
"""Применить миграции ко всем шардам."""
for shard_name, url in SHARD_ENGINES.items():
print(f"Applying migrations to {shard_name}...")
config = context.config
config.set_main_option('sqlalchemy.url', url)
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix='sqlalchemy.',
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
)
with context.begin_transaction():
context.run_migrations()
print(f"Migrations applied to {shard_name}")
if context.is_offline_mode():
# Offline mode не поддерживается для шардов
raise Exception("Offline mode not supported for sharded databases")
else:
run_migrations_online()# scripts/apply_migrations.py
from alembic.config import Config
from alembic import command
import sys
def apply_to_all_shards():
"""Применить миграции ко всем шардам."""
shards = ['shard_0', 'shard_1', 'shard_2']
for shard in shards:
print(f"\n{'='*50}")
print(f"Migrating {shard}...")
print('='*50)
alembic_cfg = Config('alembic.ini')
alembic_cfg.set_main_option(
'sqlalchemy.url',
get_shard_url(shard)
)
try:
command.upgrade(alembic_cfg, 'head')
print(f"✓ {shard} migrated successfully")
except Exception as e:
print(f"✗ Migration failed for {shard}: {e}")
sys.exit(1)
print("\n✓ All shards migrated successfully!")
if __name__ == '__main__':
apply_to_all_shards()# Используйте роутинг для разделения чтения/записи
# Write → primary, Read → replica
# Шардируйте по равномерному ключу (user_id, hash)
# Избегайте hotspot шардов
# Реализуйте cross-shard queries для агрегаций
# Но минимизируйте их использование
# Мониторьте размер и нагрузку на каждый шард
# Балансируйте при необходимости
# Используйте consistent hashing для добавления шардов# Не делайте JOIN между шардами
# Это не поддерживается напрямую
# Не полагайтесь на cross-shard запросы для hot paths
# Кэшируйте результаты или денормализуйте
# Не шардируйте без реальной необходимости
# Начните с read replicas
# Не забывайте про миграции на всех шардах
# Автоматизируйте deploymentВ следующей теме вы изучите Production Performance — connection pooling, batching, monitoring, production deployment стратегии и best practices для высоконагруженных систем.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.