Direct, fanout, topic и header обменники — когда и какой использовать
Обменник (exchange) — сердце маршрутизации в RabbitMQ. Понимание типов обменников и их правильного применения — ключ к гибкой архитектуре.
RabbitMQ поддерживает четыре типа обменников, каждый со своей логикой маршрутизации:
| Тип | Логика | Use Case |
|---|---|---|
| Direct | Точное совпадение routing_key и binding_key | Маршрутизация по типу события |
| Fanout | Игнорирует routing_key, рассылает всем | Pub/Sub, broadcast уведомления |
| Topic | Паттерны с wildcard (*, #) | Сложная маршрутизация по категориям |
| Headers | Маршрутизация по заголовкам | Специфичные сценарии, когда ключи недостаточны |
Логика: Сообщение попадает в очередь только если routing_key точно совпадает с binding_key.
Пример использования direct exchange для маршрутизации уведомлений по типам:
import aio_pika
from aio_pika import ExchangeType, Message
async def direct_example():
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
# Объявление direct exchange
exchange = await channel.declare_exchange(
"direct_exchange",
ExchangeType.DIRECT,
durable=True
)
# Очереди для разных типов уведомлений
email_queue = await channel.declare_queue("email_queue", durable=True)
sms_queue = await channel.declare_queue("sms_queue", durable=True)
push_queue = await channel.declare_queue("push_queue", durable=True)
# Привязка с разными ключами
await email_queue.bind(exchange, routing_key="email")
await sms_queue.bind(exchange, routing_key="sms")
await push_queue.bind(exchange, routing_key="push")
# Сообщение с routing_key="email" попадёт только в email_queue
await exchange.publish(
Message(body=b"Send email"),
routing_key="email"
) ┌─────────────┐
│ Producer │
└──────┬──────┘
│ routing_key="email"
▼
┌─────────────┐
┌───────│ Direct │───────┐
│ │ Exchange │ │
│ └──────┬──────┘ │
│ │ │
binding_key="email" binding_key="sms" binding_key="push"
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────┐ ┌──────────┐
│ email_queue │ │ sms_queue│ │ push_queue│
└──────────────┘ └──────────┘ └──────────┘
Одна очередь может быть привязана к нескольким ключам:
# Очередь получает сообщения с двумя ключами
await email_queue.bind(exchange, routing_key="email")
await email_queue.bind(exchange, routing_key="newsletter")Теперь email_queue получит сообщения с routing_key="email" И routing_key="newsletter".
Логика: Игнорирует routing_key и рассылает сообщение во все привязанные очереди. Используйте для broadcast уведомлений:
async def fanout_example():
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
# Fanout exchange для broadcast
exchange = await channel.declare_exchange(
"broadcast_exchange",
ExchangeType.FANOUT,
durable=True
)
# Все очереди получат все сообщения
queue1 = await channel.declare_queue("service_a_events", durable=True)
queue2 = await channel.declare_queue("service_b_events", durable=True)
queue3 = await channel.declare_queue("analytics_events", durable=True)
await queue1.bind(exchange)
await queue2.bind(exchange)
await queue3.bind(exchange)
# routing_key игнорируется — может быть пустым
await exchange.publish(
Message(body=b"System maintenance in 1 hour"),
routing_key="" # Не используется
) ┌─────────────┐
│ Producer │
└──────┬──────┘
│
▼
┌─────────────┐
│ Fanout │
│ Exchange │
└──────┬──────┘
│
┌──────────────┼──────────────┐
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────┐ ┌──────────┐
│ service_a │ │ service_b│ │ analytics│
└──────────────┘ └──────────┘ └──────────┘
# Сервис уведомлений рассылает события всем подписчикам
async def broadcast_user_created(user_data: dict):
exchange = get_exchange() # fanout exchange
await exchange.publish(
Message(
body=json.dumps({
"event": "user.created",
"user_id": user_data["id"],
"timestamp": datetime.utcnow().isoformat()
}).encode()
),
routing_key=""
)
# Это сообщение получат:
# - Email сервис (отправить welcome email)
# - CRM сервис (создать запись)
# - Analytics сервис (зафиксировать событие)
# - Anti-fraud сервис (проверить на мошенничество)Логика: Маршрутизация по паттернам с wildcard:
* (звёздочка) — заменяет ровно одно слово# (решетка) — заменяет ноль или более словСлова разделяются точками: logs.app.error, orders.us.created. Используйте для иерархической маршрутизации:
async def topic_example():
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
# Topic exchange для логов
exchange = await channel.declare_exchange(
"logs_topic",
ExchangeType.TOPIC,
durable=True
)
# Очереди для разных уровней логов
errors_queue = await channel.declare_queue("errors_queue", durable=True)
warnings_queue = await channel.declare_queue("warnings_queue", durable=True)
all_logs_queue = await channel.declare_queue("all_logs", durable=True)
# Паттерны маршрутизации
await errors_queue.bind(exchange, routing_key="*.error") # logs.app.error, api.error
await warnings_queue.bind(exchange, routing_key="*.warning") # logs.app.warning
await all_logs_queue.bind(exchange, routing_key="logs.#") # logs.*.* — все логи
# Примеры публикаций
await exchange.publish(Message(b"App error"), routing_key="logs.app.error")
await exchange.publish(Message(b"API warning"), routing_key="logs.api.warning")
await exchange.publish(Message(b"DB info"), routing_key="logs.db.info")| Паттерн | Совпадения | Не совпадает с |
|---|---|---|
*.error | logs.error, api.error | logs.app.error, error |
logs.* | logs.error, logs.warning | logs.app.error, logs |
logs.# | logs.error, logs.app.error, logs.app.db.error | — (совпадает со всем) |
# | Всё | — |
user.*.created | user.admin.created, user.guest.created | user.created, user.admin.guest.created |
┌─────────────┐
│ Producer │
└──────┬──────┘
│ routing_key="logs.app.error"
▼
┌─────────────┐
│ Topic │
│ Exchange │
└──────┬──────┘
│
┌──────────────┼──────────────┐
│ │ │
binding="*.error" binding="logs.#" binding="api.*"
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────┐ ┌──────────┐
│ errors_queue │ │all_logs │ │ api_logs │
└──────────────┘ └──────────┘ └──────────┘
Message с routing_key="logs.app.error" попадёт в:
errors_queue (совпадает с *.error — нет, потому что два слова перед error)all_logs (совпадает с logs.# — да)Message с routing_key="api.error" попадёт в:
errors_queue (совпадает с *.error — да)all_logs (не совпадает с logs.# — нет)Логика: Маршрутизация по заголовкам сообщения, игнорирует routing_key. Используйте для специфичных сценариев когда ключи недостаточны:
async def headers_example():
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
# Headers exchange
exchange = await channel.declare_exchange(
"headers_exchange",
ExchangeType.HEADERS,
durable=True
)
queue = await channel.declare_queue("priority_queue", durable=True)
# Привязка по заголовкам
await queue.bind(exchange, arguments={
"x-match": "all", # Все заголовки должны совпасть
"priority": "high",
"type": "urgent"
})
# Сообщение с matching headers
message = Message(
body=b"Urgent high priority message",
headers={
"priority": "high",
"type": "urgent"
}
)
await exchange.publish(message, routing_key="") # routing_key игнорируется| Значение | Логика |
|---|---|
all | Все заголовки из binding должны совпасть |
any | Хотя бы один заголовок должен совпасть |
Нужно ли отправлять сообщение всем подписчикам?
├─ Да → Fanout Exchange
└─ Нет → Нужна фильтрация по ключу?
├─ Нет (только по заголовкам) → Headers Exchange
└─ Да → Точное совпадение или паттерны?
├─ Точное совпадение → Direct Exchange
└─ Паттерны с wildcard → Topic Exchange
| Сценарий | Рекомендуемый тип |
|---|---|
| Уведомления всем сервисам | Fanout |
| Маршрутизация по типу события (email, sms) | Direct |
| Логирование с уровнями и компонентами | Topic |
| Сложная бизнес-логика маршрутизации | Topic или Headers |
| Мульти-тенант изоляция | Direct (tenant_id как key) |
Используйте иерархическую нотацию с точками для читаемости и предсказуемости:
# ✅ Хорошо — читаемо и предсказуемо
"user.created"
"user.deleted"
"order.paid"
"order.shipped"
"logs.app.error"
"logs.db.warning"
# ❌ Плохо — нет структуры
"email"
"sms_notification"
"log_error_app"Дублирование binding приводит к получению сообщений несколько раз:
# ❌ Плохо — одна очередь привязана несколько раз с одним ключом
await queue.bind(exchange, routing_key="email")
await queue.bind(exchange, routing_key="email") # Дубликат!
# ✅ Хорошо — уникальные binding
await queue.bind(exchange, routing_key="email")
await queue.bind(exchange, routing_key="newsletter")Создайте документацию по маршрутизации для команды:
# Routing Rules
## Exchange: notifications
| Routing Key | Queue | Description |
|-------------|-------|-------------|
| email | email_queue | Email уведомления |
| sms | sms_queue | SMS уведомления |
| push.* | push_queue | Все push уведомления |Настройте Dead Letter для обработки непомаршрутизированных сообщений:
# Очередь для неподтверждённых событий
dlx = await channel.declare_exchange("dlx_exchange", ExchangeType.DIRECT)
dlq = await channel.declare_queue(
"unrouted_queue",
durable=True,
arguments={"x-dead-letter-exchange": "dlx_exchange"}
)Полный пример роутера событий для микросервисной архитектуры с использованием topic и fanout обменников:
import aio_pika
from aio_pika import ExchangeType, Message
from enum import Enum
class EventType(str, Enum):
USER_CREATED = "user.created"
USER_UPDATED = "user.updated"
ORDER_PLACED = "order.placed"
ORDER_PAID = "order.paid"
PAYMENT_FAILED = "payment.failed"
class EventRouter:
"""Роутер событий для микросервисной архитектуры."""
def __init__(self, connection: aio_pika.Connection):
self.channel = await connection.channel()
# Topic exchange для всех событий домена
self.events_exchange = await self.channel.declare_exchange(
"domain_events",
ExchangeType.TOPIC,
durable=True
)
# Fanout для критичных событий (broadcast)
self.critical_exchange = await self.channel.declare_exchange(
"critical_events",
ExchangeType.FANOUT,
durable=True
)
async def publish(self, event_type: EventType, payload: dict):
"""Публикация события с автоматической маршрутизацией."""
message = Message(
body=json.dumps(payload).encode(),
content_type="application/json",
delivery_mode=2,
headers={"event_type": event_type.value}
)
# Критичные события дублируются в fanout
if event_type in (EventType.PAYMENT_FAILED,):
await self.critical_exchange.publish(
message,
routing_key=""
)
# Все события в topic exchange
await self.events_exchange.publish(
message,
routing_key=event_type.value
)
async def subscribe(self, pattern: str, callback):
"""
Подписка на события по паттерну.
Args:
pattern: Паттерн например "user.*" или "order.#"
callback: Асинхронная функция для обработки
"""
queue = await self.channel.declare_queue(
f"service_{pattern.replace('.', '_')}",
durable=True
)
await queue.bind(self.events_exchange, routing_key=pattern)
await queue.consume(callback)
# Использование
async def main():
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
router = await EventRouter(connection)
# Публикация события
await router.publish(
EventType.USER_CREATED,
{"user_id": 123, "email": "user@example.com"}
)
# Подписка на события пользователя
async def handle_user_event(message):
async with message.process():
print(f"User event: {message.body}")
await router.subscribe("user.*", handle_user_event)
await asyncio.Future()* и #, для сложной иерархической маршрутизацииdomain.entity.action для читаемостиВ следующей теме изучим подтверждения, надёжность доставки и обработку ошибок.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.