Durable queues, persistent messages, recovery после перезапуска брокера
Для критичных данных (платежи, заказы, транзакции) потеря сообщений недопустима. В этой теме изучим как обеспечить сохранность сообщений при перезапуске RabbitMQ и сбоях оборудования.
Для гарантированной сохранности сообщений нужны все три компонента:
┌─────────────────────────────────────────────────────────┐
│ 1. Durable Exchange — обменник сохраняется │
│ после перезапуска RabbitMQ │
├─────────────────────────────────────────────────────────┤
│ 2. Durable Queue — очередь сохраняется после │
│ перезапуска RabbitMQ │
├─────────────────────────────────────────────────────────┤
│ 3. Persistent Message — сообщение сохраняется │
│ на диск (delivery_mode=2) │
└─────────────────────────────────────────────────────────┘
Если хотя бы один компонент не персистентный — сообщения будут потеряны при перезапуске.
Объявление персистентного обменника сохраняет его после перезапуска RabbitMQ:
import aio_pika
from aio_pika import ExchangeType
async def create_durable_exchange():
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
# durable=True — обменник сохраняется после перезапуска
exchange = await channel.declare_exchange(
"persistent_exchange",
ExchangeType.DIRECT,
durable=True # ← Ключевой параметр
)Для временных данных можно использовать обменник без durable:
# durable=False (по умолчанию) — обменник исчезнет при перезапуске
exchange = await channel.declare_exchange(
"temporary_exchange",
ExchangeType.DIRECT
# durable=False по умолчанию
)Объявление персистентной очереди сохраняет её после перезапуска RabbitMQ:
async def create_durable_queue():
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
# durable=True — очередь сохраняется после перезапуска
queue = await channel.declare_queue(
"persistent_queue",
durable=True, # ← Ключевой параметр
exclusive=False, # Не exclusive для персистентности
auto_delete=False # Не auto_delete для персистентности
)| Параметр | Значение для персистентности | Описание |
|---|---|---|
durable | True | Очередь сохраняется после перезапуска |
exclusive | False | Exclusive очереди удаляются при отключении владельца |
auto_delete | False | Auto-delete очереди удаляются когда нет консьюмеров |
Для временных данных используйте очередь без durable:
# Временная очередь — для тестов или кэша
queue = await channel.declare_queue(
"temporary_queue",
durable=False, # Исчезнет при перезапуске
exclusive=True, # Только для этого connection
auto_delete=True # Удалится когда не будет консьюмеров
)Публикация персистентного сообщения сохраняет его на диск брокера:
from aio_pika import Message
async def publish_persistent_message(exchange):
message = Message(
body=b"Important payment data",
delivery_mode=2, # ← Ключевой параметр: 2=persistent
content_type="application/json",
headers={"priority": "high"}
)
await exchange.publish(message, routing_key="payment")| Значение | Название | Поведение |
|---|---|---|
1 | Transient | Сообщение в памяти, теряется при перезапуске |
2 | Persistent | Сообщение на диске, переживает перезапуск |
Важно: delivery_mode=2 гарантирует сохранность только если очередь и обменник также durable.
Полный пример создания полностью персистентной инфраструктуры для критичных данных:
import aio_pika
from aio_pika import Message, ExchangeType
async def setup_persistent_infrastructure():
"""Создание полностью персистентной инфраструктуры."""
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
# 1. Durable Exchange
exchange = await channel.declare_exchange(
"payments_exchange",
ExchangeType.DIRECT,
durable=True
)
# 2. Durable Queue с DLX
queue = await channel.declare_queue(
"payments_queue",
durable=True,
arguments={
"x-dead-letter-exchange": "payments_dlx",
"x-message-ttl": 86400000 # 24 часа
}
)
# 3. Binding
await queue.bind(exchange, routing_key="payment.process")
# 4. Публикация персистентного сообщения
message = Message(
body=json.dumps({
"payment_id": "pay_123",
"amount": 1000,
"currency": "USD"
}).encode(),
delivery_mode=2, # Persistent
content_type="application/json",
headers={
"trace_id": "abc-123"
}
)
await exchange.publish(message, routing_key="payment.process")
print("Персистентное сообщение опубликовано")Регулярно тестируйте сохранность сообщений при перезапуске RabbitMQ:
async def test_persistence():
"""Тест: сообщения сохраняются при перезапуске RabbitMQ."""
# Этап 1: Публикация сообщений
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
exchange = await channel.declare_exchange(
"test_persistent_exchange",
ExchangeType.DIRECT,
durable=True
)
queue = await channel.declare_queue(
"test_persistent_queue",
durable=True
)
await queue.bind(exchange, routing_key="test")
# Публикация 10 персистентных сообщений
for i in range(10):
message = Message(
body=f"Message {i}".encode(),
delivery_mode=2
)
await exchange.publish(message, routing_key="test")
print("Опубликовано 10 персистентных сообщений")
print("\nПерезапустите RabbitMQ и нажмите Enter для продолжения...")
input()
# Этап 2: Проверка сохранности
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
# Объявляем те же очередь и обменник
exchange = await channel.declare_exchange(
"test_persistent_exchange",
ExchangeType.DIRECT,
durable=True
)
queue = await channel.declare_queue(
"test_persistent_queue",
durable=True
)
received = []
async def collect(message):
async with message.process():
received.append(message.body)
# Подписка на очередь
await queue.consume(collect)
# Ждём получения сообщений
await asyncio.sleep(2)
print(f"\nПолучено {len(received)} из 10 сообщений")
if len(received) == 10:
print("✓ Тест пройден: все сообщения сохранены")
else:
print(f"✗ Тест провален: потеряно {10 - len(received)} сообщений")Персистентность снижает производительность из-за записи на диск:
| Режим | Пропускная способность |
|---|---|
| Transient | ~10 000 msg/sec |
| Persistent | ~1 000 msg/sec |
Пакетная публикация уменьшает накладные расходы на персистентность:
async def publish_batch_persistent(exchange, messages):
"""Пакетная публикация персистентных сообщений."""
publish_tasks = []
for body in messages:
message = Message(
body=body.encode(),
delivery_mode=2
)
publish_tasks.append(
exchange.publish(message, routing_key="batch")
)
# Публикация пачки параллельно
results = await asyncio.gather(*publish_tasks)
return all(results)Асинхронные подтверждения не блокируют публикацию:
class OptimizedPersistentPublisher:
def __init__(self, channel):
self.channel = channel
self.pending_confirms = asyncio.Queue()
async def publish(self, exchange, message, routing_key):
"""Публикация без блокирующего ожидания подтверждения."""
await exchange.publish(message, routing_key=routing_key)
# Подтверждение обработается в callbackРазделяйте персистентные и transient сообщения по важности:
# Критичные данные — персистентные
payment_message = Message(
body=b"Payment data",
delivery_mode=2 # Persistent
)
# Некритичные данные — transient
log_message = Message(
body=b"Log entry",
delivery_mode=1 # Transient
)Для максимальной надёжности в кластере используйте кворумные очереди с репликацией через Raft consensus:
async def create_quorum_queue():
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
# Кворумная очередь — репликация через Raft consensus
queue = await channel.declare_queue(
"quorum_payments_queue",
durable=True,
arguments={
"x-queue-type": "quorum" # ← Кворумная очередь
}
)| Характеристика | Классическая | Кворумная |
|---|---|---|
| Репликация | Master-mirror | Raft consensus |
| Потеря данных | Возможна при failover | Не возможна |
| Производительность | Выше | Ниже (консенсус) |
| Use Case | Кэши, временные данные | Платежи, транзакции |
RabbitMQ автоматически создаёт персистентные объекты для системных нужд:
# Internal exchange — нельзя публиковать напрямую
# Используется для DLX
dlx = await channel.declare_exchange(
"dlx_exchange",
ExchangeType.DIRECT,
durable=True,
auto_delete=False
)
dlq = await channel.declare_queue(
"dead_letter_queue",
durable=True,
arguments={
"x-dead-letter-exchange": "" # Нет DLX для DLQ
}
)Следующие рекомендации обеспечат сохранность ваших данных:
Для критичных данных всегда объявляйте очереди как durable:
# ✅ Для критичных данных
queue = await channel.declare_queue(
"orders_queue",
durable=True
)
# ❌ Не для критичных данных
queue = await channel.declare_queue(
"cache_queue",
durable=False # Исчезнет при перезапуске
)Комментируйте требования к персистентности в коде:
# Персистентность для платежей — требование бизнеса
# delivery_mode=2, durable=True для exchange и queue
# Кворумная очередь для кластераСледите за свободным местом на диске:
# Алерт если диск заполнен > 80%
# RabbitMQ блокирует публикации при disk_free_limitРегулярно тестируйте восстановление после перезапуска:
# Регулярно тестируйте:
# 1. Опубликовать персистентные сообщения
# 2. Перезапустить RabbitMQ
# 3. Проверить сохранностьОграничьте минимальное свободное место на диске:
# В rabbitmq.conf
disk_free_limit.absolute = 2GB
# Блокирует публикации если свободно < 2GBВ следующей теме изучим обработку ошибок и стратегии повторных попыток.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.