Кластеры RabbitMQ, кворумные очереди, federation, sharding для масштабирования
Для production-систем важна отказоустойчивость и масштабирование. В этой теме изучим кластеризацию RabbitMQ, кворумные очереди, federation и sharding.
┌─────────────────────────────────────────────────────────┐
│ Load Balancer │
│ (HAProxy/Nginx) │
└────────────┬─────────────────────────────────┬──────────┘
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ RabbitMQ-1 │ ◄────────────►│ RabbitMQ-2 │
│ (master) │ clustering │ (mirror) │
└─────────────────┘ └─────────────────┘
│ │
└─────────────────────────────────┘
│
▼
┌─────────────────┐
│ RabbitMQ-3 │
│ (mirror) │
└─────────────────┘
Пример настройки кластера RabbitMQ с тремя узлами через Docker Compose:
# docker-compose.cluster.yml
version: '3.8'
services:
rabbitmq-1:
image: rabbitmq:3-management
hostname: rabbitmq-1
environment:
- RABBITMQ_ERLANG_COOKIE=secret_cookie
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=secret
ports:
- "5672:5672"
- "15672:15672"
command: >
bash -c "
rabbitmq-server -detached &&
sleep 5 &&
rabbitmqctl stop_app &&
rabbitmqctl join_cluster rabbit@rabbitmq-1 &&
rabbitmqctl start_app
"
rabbitmq-2:
image: rabbitmq:3-management
hostname: rabbitmq-2
environment:
- RABBITMQ_ERLANG_COOKIE=secret_cookie
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=secret
depends_on:
- rabbitmq-1
command: >
bash -c "
rabbitmq-server -detached &&
sleep 10 &&
rabbitmqctl stop_app &&
rabbitmqctl join_cluster rabbit@rabbitmq-1 &&
rabbitmqctl start_app
"
rabbitmq-3:
image: rabbitmq:3-management
hostname: rabbitmq-3
environment:
- RABBITMQ_ERLANG_COOKIE=secret_cookie
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=secret
depends_on:
- rabbitmq-1
command: >
bash -c "
rabbitmq-server -detached &&
sleep 15 &&
rabbitmqctl stop_app &&
rabbitmqctl join_cluster rabbit@rabbitmq-1 &&
rabbitmqctl start_app
"Кворумные очереди обеспечивают максимальную надёжность через Raft consensus:
import aio_pika
from aio_pika import ExchangeType
async def create_quorum_queue():
"""Создание кворумной очереди."""
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
# Кворумная очередь
queue = await channel.declare_queue(
"quorum_payments_queue",
durable=True,
arguments={
"x-queue-type": "quorum" # ← Кворумная очередь
}
)
return queue| Характеристика | Классическая | Кворумная |
|---|---|---|
| Репликация | Master-mirror | Raft consensus |
| Потеря данных | Возможна при failover | Не возможна |
| Производительность | Выше (~10K msg/s) | Ниже (~5K msg/s) |
| Latency | Ниже | Выше (консенсус) |
| Use Case | Кэши, временные данные | Платежи, транзакции |
Да:
Нет:
Stream queues для логов и event sourcing — сообщения хранятся в append-only логе:
async def create_stream_queue():
"""Создание stream очереди для логов."""
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
# Stream queue
queue = await channel.declare_queue(
"logs_stream_queue",
durable=True,
arguments={
"x-queue-type": "stream", # ← Stream очередь
"x-max-length-bytes": 1073741824, # 1GB
"x-stream-max-segment-size-bytes": 52428800 # 50MB сегменты
}
)
return queueFederation для соединения брокеров в разных локациях через federation plugin:
┌─────────────────┐ ┌─────────────────┐
│ RabbitMQ-EU │ ──────► │ RabbitMQ-US │
│ (upstream) │ feder │ (downstream) │
└─────────────────┘ └─────────────────┘
Настройте federation через Management API для соединения брокеров:
async def setup_federation():
"""Настройка federation между брокерами."""
import aiohttp
# Включение federation плагина
# rabbitmq-plugins enable rabbitmq_federation_management
# Создание federation upstream через Management API
async with aiohttp.ClientSession() as session:
# Upstream определение
await session.put(
"http://localhost:15672/api/parameters/federation-upstream/default/us-upstream",
json={
"value": {
"uri": "amqp://admin:secret@rabbitmq-eu:5672/",
"prefetch-count": 1000,
"reconnect-delay": 5,
"ack-mode": "on-confirm",
"trust-user-id": False
}
},
auth=aiohttp.BasicAuth("admin", "secret")
)
# Federation policy
await session.put(
"http://localhost:15672/api/policies/default/federate-all",
json={
"pattern": ".*",
"definition": {
"federation-upstream": "us-upstream"
},
"priority": 0,
"apply-to": "all"
},
auth=aiohttp.BasicAuth("admin", "secret")
)Sharding для горизонтального масштабирования — распределение сообщений по разным брокерам:
class ShardedProducer:
"""
Продюсер с sharding по ключу.
Распределяет сообщения по разным брокерам
на основе hash от shard_key.
"""
def __init__(self, broker_urls: list[str]):
self.brokers = []
for url in broker_urls:
connection = await aio_pika.connect_robust(url)
self.brokers.append(connection)
def _get_broker_index(self, shard_key: str) -> int:
"""Определение брокера по ключу."""
return hash(shard_key) % len(self.brokers)
async def publish(self, shard_key: str, routing_key: str, body: bytes):
"""Публикация в правильный шард."""
index = self._get_broker_index(shard_key)
connection = self.brokers[index]
channel = await connection.channel()
exchange = await channel.declare_exchange(
"app_exchange",
aio_pika.ExchangeType.DIRECT
)
await exchange.publish(
aio_pika.Message(body=body, delivery_mode=2),
routing_key=routing_key
)
# Использование
producer = ShardedProducer([
"amqp://broker1:5672/",
"amqp://broker2:5672/",
"amqp://broker3:5672/"
])
# Сообщения одного пользователя идут в один брокер
await producer.publish(
shard_key="user_123", # Hash определяет брокер
routing_key="user.event",
body=b"..."
)Используйте HAProxy для балансировки нагрузки между узлами кластера:
# HAProxy конфигурация
# /etc/haproxy/haproxy.cfg
defaults
mode tcp
timeout connect 5s
timeout client 30s
timeout server 30s
frontend rabbitmq_front
bind *:5672
default_backend rabbitmq_back
backend rabbitmq_back
balance roundrobin
option tcp-check
server rabbit1 rabbitmq-1:5672 check
server rabbit2 rabbitmq-2:5672 check
server rabbit3 rabbitmq-3:5672 checkКлиенты подключаются к HAProxy который распределяет нагрузку:
# Клиент подключается к HAProxy
connection = await aio_pika.connect_robust(
"amqp://guest:guest@haproxy:5672/"
)Health check endpoint позволяет HAProxy определять доступность узлов:
from aiohttp import web
async def rabbitmq_health(request):
"""Health check для HAProxy."""
try:
connection = await aio_pika.connect_robust(
"amqp://guest:guest@localhost/",
timeout=5
)
await connection.close()
return web.Response(status=200)
except Exception:
return web.Response(status=503)
app = web.Application()
app.router.add_get("/health", rabbitmq_health)Собирайте метрики кластера для мониторинга состояния:
from prometheus_client import Gauge
cluster_nodes = Gauge(
'rabbitmq_cluster_nodes',
'Number of nodes in cluster'
)
cluster_partitions = Gauge(
'rabbitmq_cluster_partitions',
'Network partitions detected'
)
queue_sync_status = Gauge(
'rabbitmq_queue_sync_status',
'Queue synchronization status',
['queue', 'status'] # synced, unsynced
)
async def collect_cluster_metrics():
"""Сбор метрик кластера."""
async with aiohttp.ClientSession() as session:
# Статус узлов
async with session.get(
"http://localhost:15672/api/nodes",
auth=aiohttp.BasicAuth("admin", "secret")
) as response:
nodes = await response.json()
cluster_nodes.set(len(nodes))
# Проверка на partitions
partitions = sum(1 for n in nodes if n.get('partitions', []))
cluster_partitions.set(partitions)
# Статус синхронизации очередей
async with session.get(
"http://localhost:15672/api/queues",
auth=aiohttp.BasicAuth("admin", "secret")
) as response:
queues = await response.json()
for queue in queues:
status = "synced" if queue.get('synchronised_slave_nodes') else "unsynced"
queue_sync_status.labels(
queue=queue['name'],
status=status
).set(1)Настройте алерты для обнаружения проблем кластера:
CLUSTER_ALERTS = [
{
"name": "RabbitMQNodeDown",
"condition": lambda metrics: metrics['cluster_nodes'] < 3,
"severity": "critical",
"message": "Cluster node down - less than 3 nodes"
},
{
"name": "RabbitMQPartition",
"condition": lambda metrics: metrics['cluster_partitions'] > 0,
"severity": "critical",
"message": "Network partition detected in cluster"
},
{
"name": "RabbitMQQueueUnsynced",
"condition": lambda metrics: metrics['unsynced_queues'] > 0,
"severity": "warning",
"message": "Queues not synchronised"
}
]Следующие рекомендации обеспечат надёжную работу кластера:
Используйте минимум 3 узла для обеспечения кворума:
# ✅ Хорошо — 3 узла для кворума
services:
rabbitmq-1: ...
rabbitmq-2: ...
rabbitmq-3: ...
# ❌ Плохо — 2 узла не обеспечивают кворум
services:
rabbitmq-1: ...
rabbitmq-2: ...Одинаковый cookie на всех узлах необходим для кластеризации:
# Одинаковый cookie на всех узлах
echo "secret_cookie" > /var/lib/rabbitmq/.erlang.cookie
chmod 400 /var/lib/rabbitmq/.erlang.cookieКворумные очереди обеспечивают максимальную надёжность:
# ✅ Для критичных данных
queue = await channel.declare_queue(
"payments_queue",
arguments={"x-queue-type": "quorum"}
)
# ❌ Не для кэша
queue = await channel.declare_queue(
"cache_queue",
arguments={"x-queue-type": "quorum"} # Избыточно
)Алертинг на несинхронизированные очереди:
# Алерт если очереди не синхронизированы
if unsynced_queues > 0:
send_alert(f"Unsynced queues: {unsynced_queues}")Регулярно тестируйте отказоустойчивость кластера:
async def test_failover():
"""Тест отказоустойчивости."""
# 1. Опубликовать сообщения
await publish_messages(100)
# 2. Остановить master узел
await stop_node("rabbitmq-1")
# 3. Проверить что сообщения доступны
received = await consume_messages()
assert len(received) == 100
# 4. Запустить узел обратно
await start_node("rabbitmq-1")Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.