Fanout обменники, временные очереди, паттерн event broadcasting
Pub/Sub (Publish-Subscribe) — паттерн рассылки сообщений всем подписчикам. В этой теме изучим реализацию Pub/Sub через fanout exchange и сценарии использования.
┌─────────────┐
│ Publisher │
│ (сервис А) │
└──────┬──────┘
│
▼
┌─────────────┐
│ Fanout │
│ Exchange │
└──────┬──────┘
│
┌──────────────┼──────────────┐
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────┐ ┌──────────┐
│ Service B │ │ Service C│ │ Service D│
│ (email) │ │ (analytics)│ │ (CRM) │
└──────────────┘ └──────────┘ └──────────┘
Одно сообщение → все подписчики.
Publisher создаёт fanout exchange и публикует события для всех подписчиков:
import aio_pika
from aio_pika import Message, ExchangeType
import json
import uuid
from datetime import datetime
class PubSubPublisher:
"""Publisher для широковещательной рассылки событий."""
def __init__(self, connection_url: str, exchange_name: str):
self.connection_url = connection_url
self.exchange_name = exchange_name
self.connection = None
self.exchange = None
async def connect(self):
"""Подключение и создание fanout exchange."""
self.connection = await aio_pika.connect_robust(
self.connection_url,
client_properties={"connection_name": "pubsub_publisher"}
)
channel = await self.connection.channel()
# Fanout exchange — рассылает всем подписчикам
self.exchange = await channel.declare_exchange(
self.exchange_name,
ExchangeType.FANOUT,
durable=True
)
async def publish(self, event_type: str, payload: dict):
"""
Публикация события для всех подписчиков.
Args:
event_type: Тип события (например, 'user.created')
payload: Данные события
"""
event = {
"event_id": str(uuid.uuid4()),
"event_type": event_type,
"timestamp": datetime.utcnow().isoformat(),
"payload": payload
}
message = Message(
body=json.dumps(event).encode(),
content_type="application/json",
delivery_mode=2 # Персистентное
)
# routing_key игнорируется для fanout
await self.exchange.publish(message, routing_key="")
async def close(self):
await self.connection.close()Subscriber создаёт уникальную очередь и подписывается на все сообщения из exchange:
from aio_pika.abc import AbstractIncomingMessage
class PubSubSubscriber:
"""
Subscriber для получения событий.
Каждый subscriber создаёт свою уникальную очередь
и получает все сообщения из exchange.
"""
def __init__(
self,
connection_url: str,
exchange_name: str,
subscriber_name: str
):
self.connection_url = connection_url
self.exchange_name = exchange_name
self.subscriber_name = subscriber_name
self.connection = None
self.exchange = None
self.queue = None
async def connect(self):
"""Подключение и подписка."""
self.connection = await aio_pika.connect_robust(
self.connection_url,
client_properties={"connection_name": f"pubsub_{self.subscriber_name}"}
)
channel = await self.connection.channel()
# Fanout exchange
self.exchange = await channel.declare_exchange(
self.exchange_name,
ExchangeType.FANOUT,
durable=True
)
# Уникальная очередь для этого подписчика
# Имя включает subscriber_name для читаемости
self.queue = await channel.declare_queue(
f"{self.subscriber_name}_queue",
durable=True
)
# Привязка к exchange (routing_key игнорируется)
await self.queue.bind(self.exchange)
async def subscribe(self, handler):
"""
Подписка на события.
Args:
handler: Асинхронная функция для обработки событий
"""
async def process_message(message: AbstractIncomingMessage):
async with message.process():
event = json.loads(message.body)
await handler(event)
await self.queue.consume(process_message)
print(f"Subscriber '{self.subscriber_name}' started")
async def close(self):
await self.connection.close()Пример использования Pub/Sub для рассылки событий пользователя нескольким сервисам:
# Publisher: Сервис пользователей
async def publish_user_events():
publisher = PubSubPublisher(
"amqp://guest:guest@localhost/",
"user_events"
)
await publisher.connect()
# Событие создания пользователя
await publisher.publish(
"user.created",
{
"user_id": 123,
"email": "user@example.com",
"name": "John Doe"
}
)
await publisher.close()
# Subscriber 1: Email сервис
async def email_service_handler(event: dict):
"""Отправка email при создании пользователя."""
if event["event_type"] == "user.created":
await send_welcome_email(
event["payload"]["email"],
event["payload"]["name"]
)
print(f"Welcome email sent to {event['payload']['email']}")
async def run_email_service():
subscriber = PubSubSubscriber(
"amqp://guest:guest@localhost/",
"user_events",
"email_service"
)
await subscriber.connect()
await subscriber.subscribe(email_service_handler)
await asyncio.Future() # Держим сервис запущенным
# Subscriber 2: Analytics сервис
async def analytics_service_handler(event: dict):
"""Логирование событий для аналитики."""
await log_event_to_analytics(event)
print(f"Event logged: {event['event_type']}")
async def run_analytics_service():
subscriber = PubSubSubscriber(
"amqp://guest:guest@localhost/",
"user_events",
"analytics_service"
)
await subscriber.connect()
await subscriber.subscribe(analytics_service_handler)
await asyncio.Future()
# Subscriber 3: CRM сервис
async def crm_service_handler(event: dict):
"""Создание записи в CRM."""
if event["event_type"] == "user.created":
await create_crm_contact(event["payload"])
print(f"CRM contact created for {event['payload']['email']}")
async def run_crm_service():
subscriber = PubSubSubscriber(
"amqp://guest:guest@localhost/",
"user_events",
"crm_service"
)
await subscriber.connect()
await subscriber.subscribe(crm_service_handler)
await asyncio.Future()Для временных подписчиков используйте exclusive очереди которые удаляются при отключении:
class EphemeralSubscriber(PubSubSubscriber):
"""
Временный подписчик.
Очередь создаётся с exclusive=True и auto_delete=True,
что означает:
- Видна только этому connection
- Удаляется при отключении
"""
async def connect(self):
self.connection = await aio_pika.connect_robust(
self.connection_url,
client_properties={"connection_name": f"ephemeral_{self.subscriber_name}"}
)
channel = await self.connection.channel()
self.exchange = await channel.declare_exchange(
self.exchange_name,
ExchangeType.FANOUT,
durable=True
)
# Временная очередь — авто-имя, exclusive, auto_delete
self.queue = await channel.declare_queue(
"", # Авто-имя (amq.gen-...)
exclusive=True,
auto_delete=True
)
await self.queue.bind(self.exchange)
print(f"Ephemeral subscriber started with queue '{self.queue.name}'")Используйте временные очереди для live обновлений через WebSocket:
class LiveUpdatesSubscriber(EphemeralSubscriber):
"""Подписчик на live обновления в веб-приложении."""
def __init__(self, connection_url, exchange_name, websocket):
super().__init__(connection_url, exchange_name, "live_updates")
self.websocket = websocket # WebSocket клиента
async def subscribe(self):
"""Отправка обновлений через WebSocket."""
async def process_message(message: AbstractIncomingMessage):
async with message.process():
event = json.loads(message.body)
# Отправка через WebSocket
await self.websocket.send_json(event)
await self.queue.consume(process_message)
# В веб-приложении (FastAPI)
@app.websocket("/ws/updates")
async def websocket_updates(websocket: WebSocket):
await websocket.accept()
subscriber = LiveUpdatesSubscriber(
"amqp://guest:guest@localhost/",
"live_updates",
websocket
)
await subscriber.connect()
await subscriber.subscribe()
try:
await websocket.receive() # Держим соединение
finally:
await subscriber.close()Fanout exchange не поддерживает фильтрацию — все подписчики получают всё. Для фильтрации на стороне подписчика:
class FilteredSubscriber(PubSubSubscriber):
"""Подписчик с фильтрацией событий."""
def __init__(self, connection_url, exchange_name, subscriber_name, event_types):
super().__init__(connection_url, exchange_name, subscriber_name)
self.event_types = set(event_types) # Интересующие типы событий
async def subscribe(self, handler):
"""Подписка с фильтрацией по типам событий."""
async def process_message(message: AbstractIncomingMessage):
async with message.process():
event = json.loads(message.body)
# Фильтрация
if event.get("event_type") in self.event_types:
await handler(event)
else:
# Игнорируем неподходящие события
pass
await self.queue.consume(process_message)
# Использование — только user.created и user.updated
subscriber = FilteredSubscriber(
"amqp://guest:guest@localhost/",
"user_events",
"audit_service",
event_types=["user.created", "user.updated", "user.deleted"]
)
await subscriber.connect()
await subscriber.subscribe(audit_handler)Для сложной фильтрации используйте topic exchange вместо fanout — фильтрация на уровне брокера:
class TopicSubscriber:
"""Подписчик с фильтрацией через topic exchange."""
def __init__(self, connection_url, exchange_name, subscriber_name, routing_pattern):
self.connection_url = connection_url
self.exchange_name = exchange_name
self.subscriber_name = subscriber_name
self.routing_pattern = routing_pattern # Например, "user.*"
self.connection = None
self.exchange = None
self.queue = None
async def connect(self):
self.connection = await aio_pika.connect_robust(self.connection_url)
channel = await self.connection.channel()
# Topic exchange вместо fanout
self.exchange = await channel.declare_exchange(
self.exchange_name,
ExchangeType.TOPIC,
durable=True
)
self.queue = await channel.declare_queue(
f"{self.subscriber_name}_queue",
durable=True
)
# Привязка с паттерном фильтрации
await self.queue.bind(self.exchange, routing_key=self.routing_pattern)
async def subscribe(self, handler):
async def process_message(message: AbstractIncomingMessage):
async with message.process():
event = json.loads(message.body)
await handler(event)
await self.queue.consume(process_message)
# Подписка только на события пользователя
subscriber = TopicSubscriber(
"amqp://guest:guest@localhost/",
"domain_events",
"user_service",
routing_pattern="user.*" # Все события пользователя
)Метрики Prometheus для отслеживания получения событий и lag:
from prometheus_client import Counter, Histogram
pubsub_events_received = Counter(
'pubsub_events_received_total',
'Total events received',
['subscriber', 'event_type']
)
pubsub_event_lag = Histogram(
'pubsub_event_lag_seconds',
'Time between event publish and consume',
['subscriber']
)
class MonitoredSubscriber(PubSubSubscriber):
"""Подписчик с метриками."""
async def subscribe(self, handler):
async def process_message(message: AbstractIncomingMessage):
async with message.process():
event = json.loads(message.body)
# Метрики
pubsub_events_received.labels(
subscriber=self.subscriber_name,
event_type=event.get("event_type", "unknown")
).inc()
# Lag (время от публикации до обработки)
timestamp = event.get("timestamp")
if timestamp:
publish_time = datetime.fromisoformat(timestamp)
lag = (datetime.utcnow() - publish_time).total_seconds()
pubsub_event_lag.labels(
subscriber=self.subscriber_name
).observe(lag)
await handler(event)
await self.queue.consume(process_message)Следующие рекомендации обеспечат надёжную работу Pub/Sub:
Используйте понятные и предсказуемые имена для обменников и очередей:
# ✅ Хорошо — понятно и предсказуемо
exchange_name = "user_events"
queue_name = "email_service_queue"
# ❌ Плохо — нет контекста
exchange_name = "events"
queue_name = "queue1"Для критичных событий используйте персистентные сообщения:
# ✅ Для критичных событий (аудит, платежи)
message = Message(
body=json.dumps(event).encode(),
delivery_mode=2 # Persistent
)
# ❌ Для временных уведомлений можно transient
message = Message(
body=json.dumps(event).encode(),
delivery_mode=1 # Transient
)Добавляйте версию схемы события для совместимости:
event = {
"event_id": str(uuid.uuid4()),
"event_type": "user.created",
"event_version": "2.0", # Версия схемы
"timestamp": datetime.utcnow().isoformat(),
"payload": payload
}Реализуйте идемпотентность для защиты от дубликатов событий:
class IdempotentHandler:
"""Обработчик с защитой от дубликатов."""
def __init__(self):
self.processed_events = set()
async def handle(self, event: dict):
event_id = event.get("event_id")
# Проверка дубликата
if event_id in self.processed_events:
logger.warning(f"Duplicate event: {event_id}")
return
# Обработка
await self._process(event)
# Сохранение ID обработанного события
self.processed_events.add(event_id)Реализуйте graceful shutdown для корректной остановки подписчика:
class GracefulSubscriber(PubSubSubscriber):
async def close(self):
"""Корректная остановка подписчика."""
print(f"Stopping subscriber '{self.subscriber_name}'...")
# Отмена подписки
if self.queue:
await self.queue.cancel("consumer")
# Ожидание завершения текущих обработок
await asyncio.sleep(1)
# Закрытие подключения
if self.connection:
await self.connection.close()
print(f"Subscriber '{self.subscriber_name}' stopped")В следующей теме изучим мониторинг и наблюдаемость RabbitMQ систем.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.