prefetch_count, fair dispatch, backpressure — управление нагрузкой консьюмера
Quality of Service (QoS) в RabbitMQ управляет потоком сообщений от брокера к консьюмеру. Правильная настройка QoS предотвращает перегрузку консьюмеров и обеспечивает справедливое распределение нагрузки.
QoS (Quality of Service) — механизм контроля количества сообщений, которые брокер выдаёт консьюмеру до получения подтверждения (ack).
Основной параметр QoS — prefetch_count. Он ограничивает количество не подтверждённых сообщений:
# Не более 5 сообщений одновременно в обработке
await channel.set_qos(prefetch_count=5)prefetch_count ограничивает количество не подтверждённых сообщений у каждого консьюмера:
┌─────────────┐
│ RabbitMQ │
│ Broker │
└──────┬──────┘
│ prefetch_count=1
├─────────────────┐
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Consumer A │ │ Consumer B │
│ 1 message │ │ 1 message │
└─────────────┘ └─────────────┘
При prefetch_count=1:
Без QoS брокер распределяет сообщения round-robin, что может привести к дисбалансу нагрузки:
# ❌ Без QoS — неравномерная нагрузка
# Consumer A (медленный): 50 сообщений в обработке
# Consumer B (быстрый): 0 сообщений, простаиваетС prefetch_count=1 работает fair dispatch — нагрузка распределяется пропорционально производительности:
# ✅ С QoS prefetch_count=1 — fair dispatch
# Consumer A: 1 сообщение в обработке
# Consumer B: 1 сообщение в обработке
# Быстрый консьюмер обработает больше, медленный — меньшеИспользуйте prefetch_count=1 для fair dispatch в worker queue:
import aio_pika
from aio_pika.abc import AbstractIncomingMessage
async def worker():
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
# Fair dispatch — по одному сообщению на воркера
await channel.set_qos(prefetch_count=1)
queue = await channel.declare_queue("tasks", durable=True)
async def process(message: AbstractIncomingMessage):
async with message.process():
# Имитация длительной обработки
await asyncio.sleep(1)
print(f"Processed: {message.body}")
await queue.consume(process)
print("Worker started with QoS prefetch_count=1")
await asyncio.Future()Альтернативный параметр — ограничение по размеру сообщений в байтах:
# Не более 1MB сообщений одновременно в обработке
await channel.set_qos(prefetch_size=1048576)Важно: prefetch_count и prefetch_size взаимоисключающие — используйте один из них.
QoS может применяться к конкретному консьюмеру или ко всем на канале:
При global_=False QoS применяется только к этому консьюмеру:
# QoS применяется к этому channel
await channel.set_qos(prefetch_count=10)При global_=True QoS применяется ко всем консьюмерам на канале:
# QoS применяется ко всем консьюмерам на канале
await channel.set_qos(prefetch_count=10, global_=True)| Параметр | Область применения | Use Case |
|---|---|---|
global_=False (default) | К конкретному консьюмеру | Fair dispatch между воркерами |
global_=True | Ко всем консьюмерам на канале | Ограничение общего потребления |
Backpressure — механизм защиты от перегрузки когда консьюмер не успевает обрабатывать сообщения.
QoS физически ограничивает количество сообщений у консьюмера:
# Консьюмер физически не получит больше N сообщений
await channel.set_qos(prefetch_count=10)Semaphore даёт дополнительный контроль параллелизма на уровне приложения:
class BackpressureConsumer:
def __init__(self, max_concurrent: int = 10):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.pending_tasks = []
async def process(self, message: AbstractIncomingMessage):
async with self.semaphore:
async with message.process():
await self._handle(message.body)
async def _handle(self, body: bytes):
# Длительная обработка
await asyncio.sleep(1)Очередь задач с ограниченным размером создаёт backpressure:
class BoundedConsumer:
def __init__(self, max_queue_size: int = 100):
self.queue = asyncio.Queue(maxsize=max_queue_size)
self.workers = []
async def on_message(self, message: AbstractIncomingMessage):
# Блокирующая запись — если очередь полна, ждём
await self.queue.put(message)
async def worker(self, worker_id: int):
while True:
message = await self.queue.get()
try:
async with message.process():
await self._process(message)
finally:
self.queue.task_done()
async def start(self, num_workers: int = 5):
self.workers = [
asyncio.create_task(self.worker(i))
for i in range(num_workers)
]Используйте Prometheus для сбора метрик backpressure:
from prometheus_client import Gauge, Counter
unacked_gauge = Gauge('rabbitmq_unacknowledged', 'Unacknowledged messages')
queue_length_gauge = Gauge('rabbitmq_queue_length', 'Queue length')
processing_time_histogram = Histogram('rabbitmq_processing_seconds', 'Processing time')
class MonitoredConsumer:
def __init__(self):
self.processing_count = 0
async def process(self, message: AbstractIncomingMessage):
start_time = asyncio.get_event_loop().time()
self.processing_count += 1
try:
async with message.process():
await self._handle(message)
finally:
elapsed = asyncio.get_event_loop().time() - start_time
processing_time_histogram.observe(elapsed)
self.processing_count -= 1
def get_health(self) -> dict:
return {
"processing_count": self.processing_count,
"status": "healthy" if self.processing_count < 100 else "overloaded"
}Мониторьте метрики из Management API для алертинга:
async def check_backpressure():
"""Проверка признаков backpressure."""
# Получаем метрики из RabbitMQ Management API
async with aiohttp.ClientSession() as session:
async with session.get(
"http://localhost:15672/api/queues/%2F/my_queue",
auth=aiohttp.BasicAuth("guest", "guest")
) as response:
data = await response.json()
queue_length = data["messages"]
consumers = data["consumers"]
unacked = data["messages_unacknowledged"]
# Алерт если очередь растёт
if queue_length > 1000:
send_alert(f"Queue length: {queue_length}")
# Алерт если много unacknowledged
if unacked > consumers * 10:
send_alert(f"High unacknowledged: {unacked}")Rate limiting ограничивает количество сообщений обрабатываемых в секунду:
import time
from collections import deque
class RateLimitedConsumer:
def __init__(self, max_per_second: int = 100):
self.max_per_second = max_per_second
self.timestamps = deque()
async def process(self, message: AbstractIncomingMessage):
async with message.process():
await self._wait_for_slot()
await self._handle(message)
async def _wait_for_slot(self):
"""Ждём пока не освободится слот rate limiter."""
now = time.time()
# Удаляем старые timestamps (старше 1 секунды)
while self.timestamps and self.timestamps[0] < now - 1:
self.timestamps.popleft()
# Если достигли лимита — ждём
if len(self.timestamps) >= self.max_per_second:
sleep_time = 1 - (now - self.timestamps[0])
if sleep_time > 0:
await asyncio.sleep(sleep_time)
self.timestamps.append(time.time())Batch processing обрабатывает сообщения пачками для уменьшения накладных расходов:
class BatchConsumer:
def __init__(self, batch_size: int = 10, timeout: float = 1.0):
self.batch_size = batch_size
self.timeout = timeout
self.buffer = []
self.lock = asyncio.Lock()
async def on_message(self, message: AbstractIncomingMessage):
async with self.lock:
self.buffer.append(message)
if len(self.buffer) >= self.batch_size:
await self._process_batch()
async def _process_batch(self):
batch = self.buffer
self.buffer = []
try:
# Обработка пачки
await self._handle_batch(batch)
# Ack всех сообщений
for message in batch:
await message.ack()
except Exception as e:
# Nack всех при ошибке
for message in batch:
await message.nack(requeue=True)
raise
async def start(self):
"""Периодическая обработка неполных пачек."""
while True:
await asyncio.sleep(self.timeout)
async with self.lock:
if self.buffer:
await self._process_batch()Priority consumer обрабатывает сообщения с высоким приоритетом в первую очередь:
class PriorityConsumer:
def __init__(self):
self.high_priority_queue = asyncio.PriorityQueue()
self.low_priority_queue = asyncio.PriorityQueue()
async def on_message(self, message: AbstractIncomingMessage):
priority = message.headers.get('priority', 5) # 1-10
if priority <= 3:
await self.high_priority_queue.put((priority, message))
else:
await self.low_priority_queue.put((priority, message))
async def worker(self):
while True:
# Сначала обрабатываем high priority
try:
priority, message = self.high_priority_queue.get_nowait()
async with message.process():
await self._handle(message)
continue
except asyncio.QueueEmpty:
pass
# Если high priority пуста — low priority
try:
priority, message = self.low_priority_queue.get_nowait()
async with message.process():
await self._handle(message)
except asyncio.QueueEmpty:
await asyncio.sleep(0.1)| Сценарий | prefetch_count | Обоснование |
|---|---|---|
| Длительная обработка (>1с) | 1-5 | Fair dispatch, избежание перегрузки |
| Быстрая обработка (<100мс) | 10-100 | Уменьшение round-trip |
| Однородная нагрузка | 10-50 | Баланс между throughput и latency |
| Неоднородная нагрузка | 1-5 | Fair dispatch критичен |
| Ограниченная память | 1-10 | Защита от OOM |
Подбирайте оптимальный prefetch_count экспериментально для вашего workload:
async def tune_qos():
"""Экспериментальный подбор оптимального prefetch_count."""
for prefetch in [1, 5, 10, 20, 50]:
print(f"\nTesting prefetch_count={prefetch}")
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
channel = await connection.channel()
await channel.set_qos(prefetch_count=prefetch)
queue = await channel.declare_queue("benchmark_queue")
processed = 0
start_time = time.time()
async def process(message):
nonlocal processed
async with message.process():
await asyncio.sleep(0.01) # Имитация работы
processed += 1
await queue.consume(process)
await asyncio.sleep(10) # Бенчмарк 10 секунд
elapsed = time.time() - start_time
throughput = processed / elapsed
print(f"Throughput: {throughput:.2f} msg/sec")
await connection.close()Избегайте следующих типичных ошибок при настройке QoS:
Большой prefetch_count приводит к перегрузке консьюмера:
# ❌ Плохо — консьюмер перегружен
await channel.set_qos(prefetch_count=1000)
# Consumer получает 1000 сообщений сразу
# При длительной обработке — таймауты и OOMБез QoS брокер выдаёт сообщения без ограничений:
# ❌ Плохо — нет контроля потока
# Брокер будет выдавать сообщения без ограничений
queue = await channel.declare_queue("my_queue")
await queue.consume(process) # Нет set_qos!Разные консьюмеры с разными требованиями не должны делить один QoS:
# ❌ Плохо — разные консьюмеры с разными требованиями
await channel.set_qos(prefetch_count=1)
# Лёгкие уведомления и тяжёлые отчёты в одном канале
# будут иметь одинаковый QoSВ следующей теме изучим долговечность очередей и персистентность сообщений.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.