Manual ack, reject, requeue, DLX — гарантированная обработка сообщений
Надёжность доставки сообщений — критическое требование для production-систем. В этой теме изучим acknowledgments, publisher confirms и стратегии гарантированной обработки.
Для гарантированной доставки нужно обеспечить надёжность на трёх уровнях:
┌─────────────────┐
│ Producer │ ──► Publisher Confirms (подтверждение от брокера)
├─────────────────┤
│ RabbitMQ │ ──► Perсистентность (очередь + сообщения на диск)
├─────────────────┤
│ Consumer │ ──► Manual Acknowledgments (подтверждение обработки)
└─────────────────┘
По умолчанию публикация асинхронна — продюсер не знает, получило ли брокер сообщение. Publisher Confirms включают подтверждения:
Publisher Confirms позволяют получить подтверждение от брокера о получении сообщения:
import aio_pika
from aio_pika import Message
async def publish_with_confirms():
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
exchange = await channel.declare_exchange(
"reliable_exchange",
aio_pika.ExchangeType.DIRECT,
durable=True
)
message = Message(
body=b"Important message",
delivery_mode=2 # Персистентное
)
# Публикация с подтверждением
confirmed = await exchange.publish(message, routing_key="test")
if confirmed:
print("✓ Брокер подтвердил получение сообщения")
else:
print("✗ Сообщение не доставлено")Для высокой пропускной способности используйте callback:
import asyncio
from collections import defaultdict
class ConfirmedPublisher:
def __init__(self, channel):
self.channel = channel
self.pending = asyncio.Future()
self.confirmed = set()
self.unconfirmed = set()
# Подписка на подтверждения
self.channel.on("publish_confirmed", self.on_confirmed)
self.channel.on("publish_returned", self.on_returned)
def on_confirmed(self, message: aio_pika.abc.AbstractMessage):
"""Вызывается когда брокер подтвердил сообщение."""
if message.delivery_tag:
self.confirmed.add(message.delivery_tag)
def on_returned(self, message: aio_pika.abc.AbstractMessage):
"""Вызывается когда сообщение не смогло быть маршрутизировано."""
self.unconfirmed.add(message.delivery_tag)
print(f"Message returned: {message.body}")
async def publish(self, exchange, message, routing_key):
"""Публикация с ожиданием подтверждения."""
await exchange.publish(message, routing_key=routing_key)
# Ждём подтверждения (опционально)
try:
await asyncio.wait_for(self.pending, timeout=5.0)
return True
except asyncio.TimeoutError:
return FalseКонсьюмер должен подтвердить обработку сообщения. Существует три режима ack:
При no_ack=True сообщение удаляется сразу после отправки, до обработки:
# ❌ Опасно — сообщение удаляется до обработки
async def process(message: aio_pika.abc.AbstractIncomingMessage):
# Сообщение уже удалено из очереди!
await handle(message) # Если здесь ошибка — сообщение потеряно
await queue.consume(process, no_ack=True)Контекстный менеджер process() автоматически отправляет ack при успешном завершении:
# ✅ Безопасно — ack после успешной обработки
async def process(message: aio_pika.abc.AbstractIncomingMessage):
async with message.process(): # ack при выходе без исключений
await handle(message)
await queue.consume(process, no_ack=False)Для большего контроля используйте явные методы подтверждения:
async def process(message: aio_pika.abc.AbstractIncomingMessage):
try:
await handle(message)
await message.ack() # Подтверждение
except TemporaryError as e:
# Временная ошибка — вернуть в очередь
await message.nack(requeue=True)
except PermanentError as e:
# Постоянная ошибка — не возвращать
await message.nack(requeue=False)
except Exception as e:
# Неожиданная ошибка — логировать и reject
logger.error(f"Unexpected error: {e}", exc_info=True)
await message.reject(requeue=False)Правильный выбор requeue зависит от типа ошибки:
Используйте для временных ошибок которые могут разрешиться со временем:
# Временные ошибки — можно retry
class TemporaryError(Exception):
"""База для временных ошибок."""
async def process_payment(message):
try:
await charge_card(message.data)
await message.ack()
except DatabaseConnectionError:
# БД временно недоступна — retry позже
await message.nack(requeue=True)
except PaymentGatewayTimeout:
# Шлюз не отвечает — retry позже
await message.nack(requeue=True)Используйте для постоянных ошибок где retry не поможет:
# Постоянные ошибки — не имеет смысла retry
class PermanentError(Exception):
"""База для постоянных ошибок."""
async def process_email(message):
try:
await send_email(message.data)
await message.ack()
except InvalidEmailAddress as e:
# Адрес невалиден — retry не поможет
logger.error(f"Invalid email: {e}")
await message.nack(requeue=False) # В DLX
except EmailAlreadySent as e:
# Дубликат — игнорируем
logger.warning(f"Duplicate email: {e}")
await message.nack(requeue=False)Если сообщение постоянно вызывает ошибку и requeue=True, оно будет бесконечно возвращаться в очередь:
# ❌ Опасно — бесконечный цикл
async def process(message):
async with message.process(): # При ошибке автоматически nack(requeue=True)
data = json.loads(message.body) # JSONError!
await handle(data)Решение: Счётчик попыток в заголовках:
MAX_RETRIES = 3
async def process(message):
try:
async with message.process():
retry_count = message.headers.get('x-retry-count', 0)
if retry_count >= MAX_RETRIES:
logger.error(f"Max retries reached for message {message.message_id}")
await message.nack(requeue=False) # В DLX
return
data = json.loads(message.body)
await handle(data)
except json.JSONDecodeError as e:
# Постоянная ошибка — не возвращать
logger.error(f"Invalid JSON: {e}")
await message.nack(requeue=False)
except TemporaryError as e:
# Временная ошибка — инкремент счётчика и retry
retry_count = message.headers.get('x-retry-count', 0)
await message.update_headers({'x-retry-count': retry_count + 1})
await message.nack(requeue=True)DLX — специальный обменник для проблемных сообщений которые не могут быть обработаны:
Настройте DLX для всех production очередей чтобы не терять сообщения:
async def setup_dlx(channel):
# Dead Letter Exchange
dlx = await channel.declare_exchange(
"dlx_exchange",
aio_pika.ExchangeType.DIRECT,
durable=True
)
# Dead Letter Queue
dlq = await channel.declare_queue(
"dead_letter_queue",
durable=True,
arguments={
"x-dead-letter-exchange": "dlx_exchange",
"x-dead-letter-routing-key": "dead"
}
)
# Основная очередь с DLX
main_queue = await channel.declare_queue(
"main_queue",
durable=True,
arguments={
"x-dead-letter-exchange": "dlx_exchange",
"x-dead-letter-routing-key": "dead"
}
)
return main_queue, dlqАнализируйте сообщения из DLQ для понимания причины смерти:
async def process_dead_letter(message):
"""Обработка сообщений из Dead Letter Queue."""
async with message.process():
original_exchange = message.headers.get('x-first-death-exchange')
original_reason = message.headers.get('x-first-death-reason')
original_time = message.headers.get('x-first-death-time')
logger.error(
f"Dead letter from {original_exchange}: {original_reason}",
extra={
"body": message.body,
"headers": message.headers
}
)
# Сохраняем для последующего анализа
await save_for_analysis(message)Полный паттерн надёжной доставки объединяет publisher confirms и manual ack:
import aio_pika
from aio_pika import Message
from contextlib import asynccontextmanager
class ReliablePublisher:
def __init__(self, connection_url: str):
self.connection_url = connection_url
self.connection = None
self.channel = None
self.exchange = None
async def connect(self):
self.connection = await aio_pika.connect_robust(self.connection_url)
self.channel = await self.connection.channel()
self.exchange = await self.channel.declare_exchange(
"reliable_exchange",
aio_pika.ExchangeType.DIRECT,
durable=True
)
@asynccontextmanager
async def publish_reliable(self, routing_key: str, body: bytes):
"""
Контекстный менеджер для надёжной публикации.
Гарантирует:
- Сообщение сохранено на диске брокера
- Подтверждение от брокера получено
"""
message = Message(
body=body,
delivery_mode=2, # Персистентное
content_type="application/json"
)
try:
confirmed = await self.exchange.publish(message, routing_key=routing_key)
if not confirmed:
raise RuntimeError("Publisher confirm failed")
yield message
except Exception as e:
# Логирование, retry, алертинг
logger.error(f"Publish failed: {e}", exc_info=True)
raise
async def close(self):
await self.connection.close()
class ReliableConsumer:
def __init__(self, connection_url: str, queue_name: str):
self.connection_url = connection_url
self.queue_name = queue_name
self.connection = None
self.channel = None
self.queue = None
async def connect(self):
self.connection = await aio_pika.connect_robust(self.connection_url)
self.channel = await self.connection.channel()
# QoS для fair dispatch
await self.channel.set_qos(prefetch_count=1)
self.queue = await self.channel.declare_queue(
self.queue_name,
durable=True,
arguments={
"x-dead-letter-exchange": "dlx_exchange",
"x-dead-letter-routing-key": "dead"
}
)
async def consume(self, callback):
"""Запуск консьюмера с ручным ack."""
await self.queue.consume(callback)
async def close(self):
await self.connection.close()Регулярно тестируйте сохранность сообщений при перезапуске RabbitMQ:
async def test_reliability():
"""Тест: сообщения не теряются при перезапуске."""
# 1. Опубликовать сообщения
publisher = ReliablePublisher("amqp://guest:guest@localhost/")
await publisher.connect()
for i in range(100):
async with publisher.publish_reliable("test", f"Message {i}".encode()):
pass
# 2. Перезапустить RabbitMQ (вручную или через тест)
input("Перезапустите RabbitMQ и нажмите Enter...")
# 3. Проверить что все сообщения доставлены
received = []
async def collect(message):
async with message.process():
received.append(message.body)
consumer = ReliableConsumer("amqp://guest:guest@localhost/", "test_queue")
await consumer.connect()
await consumer.consume(collect)
await asyncio.sleep(5)
print(f"Получено {len(received)} из 100 сообщений")
assert len(received) == 100, "Сообщения потеряны!"Следующие рекомендации обеспечат надёжную доставку сообщений:
Ручной ack гарантирует что сообщения не будут потеряны при crash консьюмера:
# ✅ Всегда no_ack=False (по умолчанию)
await queue.consume(callback) # no_ack=False по умолчанию
# ❌ Никогда no_ack=True в production
await queue.consume(callback, no_ack=True)Разделяйте временные и постоянные ошибки для правильной обработки:
async def process(message):
try:
await handle(message)
await message.ack()
except TemporaryError:
await message.nack(requeue=True)
except PermanentError:
await message.nack(requeue=False)
except Exception:
logger.exception("Unexpected error")
await message.nack(requeue=False)DLX позволяет сохранить сообщения которые не могут быть обработаны:
queue = await channel.declare_queue(
"main_queue",
durable=True,
arguments={"x-dead-letter-exchange": "dlx"}
)Алертинг на рост DLQ помогает вовремя обнаружить проблемы:
# Алерт если в DLQ больше 10 сообщений
dlq_size = await get_queue_size("dead_letter_queue")
if dlq_size > 10:
send_alert(f"DLQ size: {dlq_size}")Логирование с контекстом помогает отлаживать проблемы обработки:
async def process(message):
logger.info(
f"Processing message {message.message_id}",
extra={
"routing_key": message.routing_key,
"headers": message.headers,
"body_size": len(message.body)
}
)В следующей теме изучим QoS и контроль потока для управления нагрузкой.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.