Горизонтальное масштабирование: конкурентность, worker pools, оптимизация потребления, backpressure.
Горизонтальное масштабирование: конкурентность, worker pools, оптимизация потребления, backpressure.
Запуск нескольких инстансов приложения:
# Docker Compose
services:
order-processor:
build: .
deploy:
replicas: 5 # 5 инстансов
environment:
- BROKER_URL=amqp://rabbitmqКак работает:
@broker.subscriber(
"orders",
group_id="order-processors", # Одна группа на все инстансы
max_workers=10
)
async def process_order(order: dict):
...Важно:
@broker.subscriber(
"orders",
max_workers=10 # 10 параллельных обработчиков
)
async def process_order(order: dict):
# До 10 сообщений обрабатываются параллельно
await db.save(order)Выбор max_workers:
1 — строго последовательно (нет race conditions)10-50 — баланс между производительностью и памятью100+ — высокая конкурентность, но больше потребление памяти@broker.subscriber(
"orders",
max_workers=10,
prefetch_count=1 # Одно сообщение на воркера до ACK
)
async def process_order(order: dict):
...prefetch_count=1:
prefetch_count=10:
# Тяжёлые задачи
@broker.subscriber(
"orders.heavy",
"orders.heavy.queue",
max_workers=2 # Мало воркеров, тяжёлая обработка
)
async def process_heavy_order(order: dict):
await complex_calculation(order)
# Лёгкие задачи
@broker.subscriber(
"orders.light",
"orders.light.queue",
max_workers=20 # Много воркеров, лёгкая обработка
)
async def process_light_order(order: dict):
await quick_save(order)@broker.subscriber(
"orders",
"orders.priority",
max_priority=10,
max_workers=5
)
async def process_priority_order(order: dict):
...
# Публикация с приоритетом
await broker.publish(order, "orders", priority=10) # Высокий приоритет@broker.subscriber(
"orders",
batch=True,
max_batch_size=100,
batch_timeout_ms=1000
)
async def process_orders(orders: list[dict]):
# Один bulk insert вместо 100 отдельных
await db.orders.bulk_insert(orders)Преимущества:
Недостатки:
import asyncio
@broker.subscriber(
"orders",
max_workers=50 # Высокая конкурентность
)
async def process_order(order: dict):
# Параллельные I/O операции
await asyncio.gather(
db.save(order),
cache.set(f"order:{order['id']}", order),
notifications.send(order)
)@broker.subscriber(
"orders",
max_workers=10,
prefetch_count=1
)
async def process_order(order: dict):
# Имитация медленной обработки
await asyncio.sleep(1.0)Что происходит:
# RabbitMQ: max length
await channel.queue_declare(
"orders",
arguments={
"x-max-length": 10000, # Максимум 10K сообщений
"x-overflow": "reject-publish" # Отклонять новые при переполнении
}
)
# Или удалять старые
arguments={"x-max-length": 10000, "x-overflow": "drop-head"}from aiolimiter import AsyncLimiter
limiter = AsyncLimiter(100, 1) # 100 запросов в секунду
@broker.subscriber("orders")
async def process_order(order: dict):
async with limiter:
await external_api.call(order)# Медленно: JSON сериализация больших объектов
class LargeMessage(BaseModel):
data: list[dict] # 10K элементов
# Быстро: только необходимые поля
class CompactMessage(BaseModel):
id: int
action: str
@broker.subscriber("orders")
async def process_order(order: CompactMessage):
# Загрузить полные данные из БД при необходимости
full_data = await db.get(order.id)from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
# Один pool на приложение
engine = create_async_engine(
"postgresql+asyncpg://localhost/db",
pool_size=20, # 20 подключений в пуле
max_overflow=10, # +10 временных при пике
pool_pre_ping=True # Проверка перед использованием
)
async def get_db_session():
async with async_session(engine) as session:
yield sessionfrom functools import lru_cache
import aioredis
@lru_cache
def get_settings():
return Settings(...) # Кэширование настроек
async def get_redis():
redis = await aioredis.from_url("redis://localhost")
yield redis
@broker.subscriber("products")
async def process_product(product: dict, redis=Depends(get_redis)):
# Проверка кэша
cached = await redis.get(f"product:{product['id']}")
if cached:
return
# Обработка
result = await heavy_computation(product)
# Сохранение в кэш
await redis.setex(f"product:{product['id']}", 3600, result)from prometheus_client import Histogram, Counter
processing_time = Histogram(
'processing_duration_seconds',
'Processing duration',
['queue']
)
messages_per_second = Counter(
'messages_processed_total',
'Messages processed',
['queue']
)
@broker.middleware
async def metrics_middleware(message, next_middleware):
start = time.time()
try:
result = await next_middleware(message)
messages_per_second.labels(queue=message.routing_key).inc()
return result
finally:
duration = time.time() - start
processing_time.labels(queue=message.routing_key).observe(duration)# Проверка отставания
kafka-consumer-groups --bootstrap-server localhost:9092 \
--describe --group order-processors
# Вывод:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
order-processors orders 0 1000 1500 500
order-processors orders 1 2000 2000 0LAG > 0 — потребители не успевают.
Решения:
from faststream import FastStream
from faststream.rabbit import RabbitBroker
from sqlalchemy.ext.asyncio import create_async_engine
import aioredis
from aiolimiter import AsyncLimiter
broker = RabbitBroker("amqp://localhost")
app = FastStream(broker)
# Connection pools
engine = create_async_engine(
"postgresql+asyncpg://localhost/db",
pool_size=20,
max_overflow=10
)
redis = None
async def init_redis():
global redis
redis = await aioredis.from_url("redis://localhost")
# Rate limiter для внешнего API
api_limiter = AsyncLimiter(100, 1) # 100 req/s
# Лёгкие задачи — высокая конкурентность
@broker.subscriber(
"orders.light",
"orders.light.queue",
max_workers=50,
prefetch_count=5
)
async def process_light_order(order: dict):
await db.orders.insert(order)
await redis.setex(f"order:{order['id']}", 3600, order)
# Тяжёлые задачи — низкая конкурентность
@broker.subscriber(
"orders.heavy",
"orders.heavy.queue",
max_workers=5,
prefetch_count=1
)
async def process_heavy_order(order: dict):
async with api_limiter:
result = await external_api.call(order)
await db.orders.update(order['id'], result)
# Batch обработка для массовых операций
@broker.subscriber(
"analytics.events",
batch=True,
max_batch_size=500,
batch_timeout_ms=2000
)
async def process_events(events: list[dict]):
await db.analytics.bulk_insert(events)Следующая тема — Production: мониторинг, логирование, деплой: observability, Prometheus, Grafana, Kubernetes.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.