Подписка на очереди, обработка сообщений, базовые паттерны потребления
Консьюмер (consumer) — компонент, который подписывается на очередь и обрабатывает сообщения. Правильная реализация консьюмера критически важна для надёжности системы.
В aio-pika есть два способа получения сообщений:
queue.get() — синхронное получение одного сообщения (polling)queue.consume() — асинхронная подписка с callback (рекомендуется)Метод queue.get() позволяет получить одно сообщение из очереди синхронно. Это простой способ но не масштабируется для production:
import aio_pika
async def main():
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
queue = await channel.declare_queue("my_queue")
# Получение одного сообщения
message = await queue.get(timeout=5)
if message:
async with message.process(): # Автоматический ack
print(f"Получено: {message.body}")
else:
print("Очередь пуста")Важно:
queue.get()не рекомендуется для production — это polling, который не масштабируется. Используйтеqueue.consume().
Метод queue.consume() подписывает callback-функцию на получение сообщений. Это предпочтительный способ для production:
import aio_pika
from aio_pika.abc import AbstractIncomingMessage
async def process_message(message: AbstractIncomingMessage):
"""Callback для обработки сообщений."""
async with message.process(): # Автоматический ack после выхода из блока
print(f"Получено: {message.body}")
# Обработка сообщения...
async def main():
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
queue = await channel.declare_queue("my_queue", durable=True)
# Подписка консьюмера
await queue.consume(process_message)
print("Консьюмер запущен. Нажмите Ctrl+C для остановки.")
# Держим приложение запущенным
try:
await asyncio.Future() # Бесконечное ожидание
except asyncio.CancelledError:
passRabbitMQ предоставляет три режима подтверждения обработки сообщений:
При no_ack=True сообщение удаляется из очереди сразу после отправки консьюмеру, до обработки:
# ❌ Опасно — сообщение удаляется сразу после отправки
await queue.consume(callback, no_ack=True)Проблема: Если консьюмер упадёт во время обработки, сообщение будет потеряно — оно уже удалено из очереди.
При ручном ack сообщение удаляется только после подтверждения от консьюмера:
# ✅ Безопасно — ack после успешной обработки
async def process_message(message: AbstractIncomingMessage):
async with message.process(): # ack при выходе из блока без исключений
# Длительная обработка
await process_data(message.body)Контекстный менеджер process() автоматически:
Для большего контроля используйте явные методы ack/nack:
async def process_message(message: AbstractIncomingMessage):
try:
await process_data(message.body)
await message.ack() # Явное подтверждение
except TemporaryError:
# Временная ошибка — вернуть в очередь
await message.nack(requeue=True)
except PermanentError:
# Постоянная ошибка — не возвращать
await message.nack(requeue=False)При обработке с ошибкой есть два пути — вернуть сообщение в очередь или отправить в Dead Letter:
async def process_message(message: AbstractIncomingMessage):
try:
await process_data(message.body)
await message.ack()
except TemporaryError as e:
# Временная ошибка (БД недоступна) — вернуть в очередь
# Сообщение будет обработано другим консьюмером
await message.nack(requeue=True)
except PermanentError as e:
# Постоянная ошибка (невалидные данные) — не возвращать
# Сообщение отправится в Dead Letter Exchange если настроен
await message.nack(requeue=False)Для автоматизации retry используйте декоратор который обрабатывает временные и постоянные ошибки:
import functools
import logging
from aio_pika.abc import AbstractIncomingMessage
logger = logging.getLogger(__name__)
def retry_on_failure(max_attempts: int = 3):
"""Декоратор для автоматических повторных попыток."""
def decorator(func):
@functools.wraps(func)
async def wrapper(message: AbstractIncomingMessage, *args, **kwargs):
attempt = message.headers.get('x-retry-count', 0)
try:
await func(message, *args, **kwargs)
await message.ack()
except TemporaryError as e:
if attempt < max_attempts:
# Инкремент счетчика попыток
await message.update_headers({
'x-retry-count': attempt + 1
})
logger.warning(f"Попытка {attempt + 1} failed: {e}")
await message.nack(requeue=True)
else:
logger.error(f"Max attempts reached for message {message.message_id}")
await message.nack(requeue=False)
except PermanentError as e:
logger.error(f"Permanent error: {e}")
await message.nack(requeue=False)
return wrapper
return decorator
@retry_on_failure(max_attempts=3)
async def process_email(message: AbstractIncomingMessage):
"""Обработка сообщения с автоматическими retry."""
data = json.loads(message.body)
await send_email(data['to'], data['subject'], data['body'])Корректное завершение работы консьюмера критически важно для предотвращения потери сообщений. Реализуйте graceful shutdown через обработку сигналов SIGTERM/SIGINT:
import asyncio
import signal
from aio_pika.abc import AbstractConsumer
class GracefulConsumer:
def __init__(self, connection_url: str):
self.connection_url = connection_url
self.connection = None
self.channel = None
self.consumer_tag = None
self._shutdown_event = asyncio.Event()
async def connect(self):
self.connection = await aio_pika.connect_robust(self.connection_url)
self.channel = await self.connection.channel()
async def start_consuming(self, queue_name: str, callback):
queue = await self.channel.declare_queue(queue_name, durable=True)
# Сохраняем consumer_tag для отмены
consumer = await queue.consume(callback)
self.consumer_tag = consumer.consumer_tag
print(f"Консьюмер запущен с тегом {self.consumer_tag}")
async def stop(self):
"""Graceful shutdown — завершение текущих обработок."""
print("Начало graceful shutdown...")
if self.consumer_tag:
# Отмена подписки — новые сообщения не будут поступать
await self.channel.cancel(self.consumer_tag)
print("Подписка отменена, завершение текущих обработок...")
# Ожидание завершения текущих обработок (до 30 секунд)
try:
await asyncio.wait_for(self._shutdown_event.wait(), timeout=30)
except asyncio.TimeoutError:
print("Таймаут завершения обработок")
# Закрытие подключения
if self.connection:
await self.connection.close()
print("Консьюмер остановлен")
async def run(self, queue_name: str, callback):
"""Запуск консьюмера с обработкой сигналов."""
await self.connect()
await self.start_consuming(queue_name, callback)
# Обработка сигналов завершения
loop = asyncio.get_event_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(
sig,
lambda: asyncio.create_task(self._shutdown())
)
# Ожидание сигнала завершения
await self._shutdown_event.wait()
await self.stop()
async def _shutdown(self):
self._shutdown_event.set()
# Использование
async def process_message(message: AbstractIncomingMessage):
async with message.process():
await asyncio.sleep(1) # Имитация обработки
print(f"Обработано: {message.body}")
async def main():
consumer = GracefulConsumer("amqp://guest:guest@localhost/")
await consumer.run("my_queue", process_message)Управляйте параллелизмом обработки сообщений через QoS и semaphore:
QoS prefetch_count ограничивает количество не подтверждённых сообщений у консьюмера:
# Не более 10 сообщений одновременно в обработке
await channel.set_qos(prefetch_count=10)
queue = await channel.declare_queue("my_queue")
await queue.consume(process_message)Semaphore даёт дополнительный контроль параллелизма на уровне приложения:
class ParallelConsumer:
def __init__(self, max_concurrent: int = 10):
self.semaphore = asyncio.Semaphore(max_concurrent)
async def process_message(self, message: AbstractIncomingMessage):
async with self.semaphore: # Ограничение параллелизма
async with message.process():
await self._do_work(message.body)
async def _do_work(self, body: bytes):
# Длительная обработка
await asyncio.sleep(1)Рассмотрим три основных паттерна использования консьюмеров:
Паттерн Worker Queue распределяет сообщения между воркерами — одно сообщение обрабатывает один воркер:
async def worker_queue_consumer():
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
# Fair dispatch — по одному сообщению на воркера
await channel.set_qos(prefetch_count=1)
queue = await channel.declare_queue("tasks", durable=True)
async def process(message: AbstractIncomingMessage):
async with message.process():
task = json.loads(message.body)
await execute_task(task)
print(f"Task completed: {task['id']}")
await queue.consume(process)
print("Worker started")
await asyncio.Future()Fan-out консьюмер получает все сообщения — каждый подписчик обрабатывает каждое сообщение:
async def broadcast_consumer():
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
# Fanout exchange
exchange = await channel.declare_exchange(
"broadcast",
aio_pika.ExchangeType.FANOUT
)
# Временная exclusive очередь для этого консьюмера
queue = await channel.declare_queue(
"", # Авто-имя
exclusive=True,
auto_delete=True
)
await queue.bind(exchange)
async def process(message: AbstractIncomingMessage):
async with message.process():
print(f"Broadcast: {message.body}")
await queue.consume(process)
await asyncio.Future()Несколько консьюмеров на одну очередь автоматически балансируют нагрузку:
# Запустите этот код в нескольких процессах/контейнерах
async def distributed_consumer():
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
await channel.set_qos(prefetch_count=5)
queue = await channel.declare_queue("shared_queue", durable=True)
async def process(message: AbstractIncomingMessage):
async with message.process():
await handle_message(message)
await queue.consume(process)
await asyncio.Future()RabbitMQ автоматически балансирует нагрузку между консьюмерами.
Для наблюдаемости системы реализуйте логирование и метрики:
Используйте Prometheus для сбора метрик и логирование с контекстом:
import time
from prometheus_client import Counter, Histogram
messages_processed = Counter('rabbitmq_messages_processed_total', 'Total processed messages')
message_processing_time = Histogram('rabbitmq_message_processing_seconds', 'Message processing time')
async def monitored_process(message: AbstractIncomingMessage):
start_time = time.time()
try:
async with message.process():
await process_message(message)
messages_processed.inc()
except Exception as e:
logger.error(f"Error processing message: {e}", exc_info=True)
raise
finally:
message_processing_time.observe(time.time() - start_time)Health check endpoint позволяет проверять состояние консьюмера:
class HealthCheckedConsumer:
def __init__(self):
self.last_message_time = None
self.processed_count = 0
async def process(self, message: AbstractIncomingMessage):
async with message.process():
await self._handle(message)
self.last_message_time = time.time()
self.processed_count += 1
def get_health(self) -> dict:
return {
"status": "healthy" if self.last_message_time else "starting",
"last_message": self.last_message_time,
"processed_total": self.processed_count
}Избегайте следующих типичных ошибок при работе с консьюмерами:
Отсутствие ack приводит к зависанию сообщений в unacknowledged:
# ❌ Сообщение никогда не будет удалено
async def process(message: AbstractIncomingMessage):
await handle(message)
# Нет ack/nack — сообщение зависнет в unacknowledged
# ✅ Используйте контекстный менеджер
async def process(message: AbstractIncomingMessage):
async with message.process():
await handle(message)Синхронный код блокирует event loop и останавливает обработку:
# ❌ Блокирует event loop
async def process(message: AbstractIncomingMessage):
async with message.process():
time.sleep(5) # Блокировка!
# ✅ Асинхронная версия
async def process(message: AbstractIncomingMessage):
async with message.process():
await asyncio.sleep(5)
# ✅ Или в executor
async def process(message: AbstractIncomingMessage):
async with message.process():
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, blocking_function)Автоматический ack приводит к потере сообщений при crash консьюмера:
# ❌ no_ack=True — сообщения теряются при crash
await queue.consume(callback, no_ack=True)
# ✅ Ручной ack после обработки
await queue.consume(callback, no_ack=False)queue.consume() предпочтительнее queue.get() для productionasync with message.process() — безопасный паттернВ следующей теме изучим типы обменников и маршрутизацию сообщений.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.