DLX, TTL, delayed messages — обработка проблемных сообщений и отложенное выполнение
Dead Letter Exchange (DLX) — механизм обработки проблемных сообщений. В этой теме изучим как настроить DLX, анализировать мёртвые письма и реализовать отложенные сообщения через TTL.
Dead Letter Exchange (DLX) — специальный обменник куда RabbitMQ направляет сообщения которые не могут быть обработаны:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Main Queue │ ───► │ DLX Exchange │ ───► │ DLQ (Queue) │
│ (основная) │ nack │ (dead_letter) │ │ (dead_letter) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
Настройка DLX требует создания обменника, очереди и привязки:
import aio_pika
from aio_pika import ExchangeType, Message
async def setup_dlx(channel):
"""Настройка Dead Letter Exchange."""
# 1. Dead Letter Exchange
dlx = await channel.declare_exchange(
"dlx_exchange",
ExchangeType.DIRECT,
durable=True
)
# 2. Dead Letter Queue
dlq = await channel.declare_queue(
"dead_letter_queue",
durable=True,
arguments={
# Не направлять сообщения из DLQ обратно в DLX
"x-dead-letter-exchange": ""
}
)
# 3. Binding DLQ к DLX
await dlq.bind(dlx, routing_key="dead")
# 4. Основная очередь с DLX
main_queue = await channel.declare_queue(
"main_queue",
durable=True,
arguments={
"x-dead-letter-exchange": "dlx_exchange",
"x-dead-letter-routing-key": "dead"
}
)
return main_queue, dlqАнализируйте заголовки мёртвых писем для понимания причины смерти:
from datetime import datetime
async def process_dead_letter(message: aio_pika.abc.AbstractIncomingMessage):
"""
Обработка сообщений из Dead Letter Queue.
Показывает информацию о причине смерти сообщения.
"""
async with message.process():
# Информация из заголовков
death_headers = {
'x-first-death-exchange': message.headers.get('x-first-death-exchange'),
'x-first-death-queue': message.headers.get('x-first-death-queue'),
'x-first-death-reason': message.headers.get('x-first-death-reason'),
'x-first-death-time': message.headers.get('x-first-death-time'),
'x-death': message.headers.get('x-death', []),
}
logger.error(
f"Dead letter received: {message.body}",
extra={
**death_headers,
'original_routing_key': message.routing_key,
'original_headers': message.headers
}
)
# Сохранение для анализа
await save_for_analysis({
'body': message.body.decode(),
'headers': message.headers,
'death_info': death_headers,
'timestamp': datetime.utcnow().isoformat()
})RabbitMQ добавляет заголовки к сообщениям в DLQ для отладки:
# Пример заголовков dead letter
{
'x-first-death-exchange': 'main_exchange',
'x-first-death-queue': 'main_queue',
'x-first-death-reason': 'rejected', # rejected, expired, maxlen
'x-first-death-time': '2026-03-28T10:00:00.000Z',
# История всех 'смертей' (если сообщение retry'илось)
'x-death': [
{
'count': 3,
'reason': 'rejected',
'queue': 'main_queue',
'time': '2026-03-28T10:00:00.000Z',
'exchange': 'main_exchange',
'original-expiration': '3600000',
'routing-keys': ['main']
}
]
}| Причина | Описание |
|---|---|
rejected | nack(requeue=False) от консьюмера |
expired | TTL expired — истекло время жизни |
maxlen | Queue full — переполнение очереди |
TTL ограничивает время жизни сообщения в очереди:
Укажите expiration при публикации сообщения:
async def publish_with_ttl(exchange):
"""Публикация сообщения с TTL."""
message = Message(
body=b"Process within 1 hour",
delivery_mode=2,
expiration="3600000" # 1 час в миллисекундах
)
await exchange.publish(message, routing_key="task")Настройте TTL для очереди чтобы все сообщения имели одинаковое время жизни:
async def create_queue_with_ttl(channel):
"""Очередь где все сообщения имеют одинаковый TTL."""
queue = await channel.declare_queue(
"expiring_queue",
durable=True,
arguments={
"x-message-ttl": 3600000, # 1 час для всех сообщений
"x-dead-letter-exchange": "dlx_exchange",
"x-dead-letter-routing-key": "expired"
}
)
return queueTTL полезен для автоматической очистки неактуальных данных:
# Сообщения о верификации email актуальны 24 часа
verification_message = Message(
body=json.dumps({
"user_id": 123,
"token": "abc123"
}).encode(),
expiration="86400000" # 24 часа
)
# Если пользователь не подтвердил за 24 часа — сообщение expires
# и попадает в DLQ для очистки токенаRabbitMQ не поддерживает native delayed messages, но можно эмулировать через TTL + DLX:
Сообщение отправляется в delay queue с TTL, после expiration попадает в main queue:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Producer │ ───► │ Delay Queue │ ───► │ Main Queue │
│ │ │ (TTL=5min) │ │ │
└─────────────┘ └──────┬──────┘ └─────────────┘
│
│ TTL expired
▼
┌─────────────┐
│ DLX Exchange│
└─────────────┘
Настройка delay queue с TTL для отложенной доставки:
async def setup_delay_queue(channel):
"""Настройка очереди для отложенных сообщений."""
# Delay exchange
delay_exchange = await channel.declare_exchange(
"delay_exchange",
ExchangeType.DIRECT,
durable=True
)
# Delay queue с TTL
delay_queue = await channel.declare_queue(
"delay_queue_5min",
durable=True,
arguments={
"x-message-ttl": 300000, # 5 минут
"x-dead-letter-exchange": "main_exchange",
"x-dead-letter-routing-key": "main"
}
)
await delay_queue.bind(delay_exchange, routing_key="delay")
return delay_exchange
async def send_delayed_message(
delay_exchange,
body: bytes,
delay_minutes: int = 5
):
"""
Отправка отложенного сообщения.
Сообщение будет доставлено через delay_minutes минут.
"""
message = Message(
body=body,
delivery_mode=2,
headers={
"x-delay-target": "main_queue",
"x-scheduled-time": datetime.utcnow().timestamp() + delay_minutes * 60
}
)
await delay_exchange.publish(
message,
routing_key="delay"
)Используйте delayed messages для отложенных уведомлений:
async def schedule_reminder(user_id: int, reminder_text: str):
"""Отправка напоминания через 1 час."""
delay_exchange = get_delay_exchange()
await send_delayed_message(
delay_exchange,
body=json.dumps({
"user_id": user_id,
"type": "reminder",
"text": reminder_text
}).encode(),
delay_minutes=60
)
# Через 1 час сообщение попадёт в main_queue
# и консьюмер отправит напоминание пользователюСоздайте несколько delay queue для разных задержек:
async def setup_multiple_delay_queues(channel):
"""Создание очередей для разных задержек."""
delays = {
"1min": 60000,
"5min": 300000,
"15min": 900000,
"1hour": 3600000,
"1day": 86400000
}
delay_queues = {}
for name, ttl in delays.items():
queue = await channel.declare_queue(
f"delay_queue_{name}",
durable=True,
arguments={
"x-message-ttl": ttl,
"x-dead-letter-exchange": "main_exchange",
"x-dead-letter-routing-key": "main"
}
)
await queue.bind(
await channel.declare_exchange(
f"delay_exchange_{name}",
ExchangeType.DIRECT,
durable=True
),
routing_key="delay"
)
delay_queues[name] = queue
return delay_queuesАльтернатива — официальный plugin для отложенных сообщений с задержкой в заголовке:
# Включение плагина
# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
async def setup_delayed_exchange(channel):
"""Настройка с plugin delayed message."""
# Exchange с x-delayed-type
delayed_exchange = await channel.declare_exchange(
"delayed_exchange",
aio_pika.ExchangeType.X_DELAYED, # Специальный тип
arguments={"x-delayed-type": "direct"},
durable=True
)
# Публикация с задержкой в заголовке
message = Message(
body=b"Delayed message",
delivery_mode=2,
headers={"x-delay": 60000} # 60 секунд
)
await delayed_exchange.publish(
message,
routing_key="main"
)Мониторьте размер DLQ через Prometheus:
from prometheus_client import Gauge
dlq_size_gauge = Gauge('rabbitmq_dlq_size', 'DLQ size')
dlq_rate_gauge = Gauge('rabbitmq_dlq_rate', 'DLQ messages per minute')
async def monitor_dlq():
"""Мониторинг Dead Letter Queue."""
async with aiohttp.ClientSession() as session:
async with session.get(
"http://localhost:15672/api/queues/%2F/dead_letter_queue",
auth=aiohttp.BasicAuth("guest", "guest")
) as response:
data = await response.json()
dlq_size_gauge.set(data["messages"])
# Алерт если DLQ растёт
if data["messages"] > 100:
send_alert(f"DLQ size: {data['messages']}")Автоматизируйте обработку DLQ с retry для временных ошибок:
class DLQProcessor:
"""Автоматическая обработка Dead Letter Queue."""
def __init__(self, channel):
self.channel = channel
self.retryable_errors = {
'rejected', # Можно retry'ить
}
self.permanent_errors = {
'expired', # Истёк TTL — не retry'ить
'maxlen' # Переполнение — не retry'ить
}
async def process_dlq(self):
"""Обработка сообщений из DLQ."""
dlq = await self.channel.declare_queue("dead_letter_queue")
async def handler(message):
async with message.process():
death_reason = message.headers.get('x-first-death-reason')
retry_count = len(message.headers.get('x-death', []))
if death_reason in self.retryable_errors and retry_count < 3:
# Retry — отправить обратно в main queue
await self._retry_message(message)
else:
# Сохранить для ручного анализа
await self._save_for_manual_review(message)
await dlq.consume(handler)
async def _retry_message(self, message):
"""Повторная попытка обработки."""
main_exchange = await self.channel.declare_exchange(
"main_exchange",
ExchangeType.DIRECT
)
retry_message = Message(
body=message.body,
delivery_mode=2,
headers={
**message.headers,
'x-retry-from-dlq': True
}
)
await main_exchange.publish(
retry_message,
routing_key="main"
)
async def _save_for_manual_review(self, message):
"""Сохранение для ручного анализа."""
await save_to_database({
'body': message.body.decode(),
'headers': dict(message.headers),
'death_reason': message.headers.get('x-first-death-reason'),
'timestamp': datetime.utcnow().isoformat()
})Следующие рекомендации обеспечат надёжную обработку проблемных сообщений:
Настройте DLX для всех production очередей чтобы не терять сообщения:
# ✅ Production очередь с DLX
queue = await channel.declare_queue(
"production_queue",
durable=True,
arguments={
"x-dead-letter-exchange": "dlx_exchange",
"x-dead-letter-routing-key": "dead"
}
)
# ❌ Опасно — нет DLX для проблемных сообщений
queue = await channel.declare_queue("production_queue", durable=True)Настройте многоуровневый алертинг для DLQ:
# Алерт если в DLQ больше 10 сообщений
if dlq_size > 10:
send_alert(f"DLQ size: {dlq_size}", severity="warning")
# Critical алерт если DLQ растёт быстро
if dlq_growth_rate > 5: # 5 сообщений в минуту
send_alert(f"DLQ growing fast: {dlq_growth_rate}/min", severity="critical")Логирование с контекстом помогает отлаживать проблемы:
logger.error(
f"Message died: {message.headers.get('x-first-death-reason')}",
extra={
'death_exchange': message.headers.get('x-first-death-exchange'),
'death_queue': message.headers.get('x-first-death-queue'),
'death_time': message.headers.get('x-first-death-time'),
'body_preview': message.body[:100]
}
)Выбирайте TTL в зависимости от бизнес-требований:
# ✅ Разумный TTL — 1 час для задач
message = Message(body=b"Task", expiration="3600000")
# ❌ Слишком короткий TTL — 1 секунда
message = Message(body=b"Task", expiration="1000")
# ❌ Слишком длинный TTL — 30 дней
message = Message(body=b"Task", expiration="2592000000")Автоматизируйте очистку старых сообщений из DLQ:
# Скрипт очистки старых DLQ сообщений
async def cleanup_dlq(max_age_hours: int = 24):
dlq = await channel.declare_queue("dead_letter_queue")
async def process(message):
death_time = message.headers.get('x-first-death-time')
if death_time:
age = datetime.utcnow() - datetime.fromisoformat(death_time)
if age.total_seconds() > max_age_hours * 3600:
await message.ack() # Удалить старое
return
await message.nack(requeue=False)
await dlq.consume(process)В следующей теме изучим RPC паттерн для организации запрос-ответ через RabbitMQ.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.