Создание и публикация сообщений, обменники типа direct, базовая структура приложения
Продюсер (producer) — это компонент, который создаёт и публикует сообщения в RabbitMQ. Понимание правильного способа публикации — основа надёжной системы.
Сообщение в AMQP состоит из двух частей: тела (body) и свойств (properties). Тело содержит сами данные, а свойства метаданные для маршрутизации и обработки.
Класс Message в aio-pika предоставляет единый интерфейс для создания сообщений со всеми необходимыми атрибутами:
from aio_pika import Message
message = Message(
body=b"Hello, RabbitMQ!", # Тело сообщения (bytes)
content_type="application/json", # MIME-тип содержимого
delivery_mode=2, # 1=transient, 2=persistent
correlation_id="abc-123", # Для RPC
reply_to="callback_queue", # Для RPC
expiration="3600000", # TTL в миллисекундах
headers={"priority": "high"} # Пользовательские заголовки
)Тело сообщения — это байты. Вы можете передать любую сериализованную данные. Чаще всего используют JSON для текстовых данных или бинарные форматы вроде Protobuf для производительности:
import json
# JSON
message = Message(
body=json.dumps({"user_id": 123, "action": "created"}).encode()
)
# Protobuf, Pickle, или любой другой формат
# message = Message(body=protobuf_data)| Значение | Название | Поведение |
|---|---|---|
1 | Transient | Сообщение хранится в памяти, теряется при перезапуске |
2 | Persistent | Сообщение сохраняется на диск, переживает перезапуск |
Важно: delivery_mode=2 гарантирует сохранность только если очередь и обменник также объявлены как durable.
Указывайте content_type для правильной десериализации на стороне консьюмера. Это помогает получателю понять как парсить тело сообщения:
# JSON
Message(body=json.dumps(data).encode(), content_type="application/json")
# Protobuf
Message(body=protobuf_bytes, content_type="application/x-protobuf")
# Plain text
Message(body=b"Hello", content_type="text/plain")Рассмотрим полный пример продюсера который подключается к RabbitMQ, объявляет обменник и очередь, и публикует сообщение:
import aio_pika
from aio_pika import Message, ExchangeType
import asyncio
async def main():
# Подключение
connection = await aio_pika.connect_robust(
"amqp://guest:guest@localhost/"
)
async with connection:
channel = await connection.channel()
# Объявление обменника
exchange = await channel.declare_exchange(
"notifications_exchange",
ExchangeType.DIRECT,
durable=True
)
# Объявление очереди
queue = await channel.declare_queue(
"email_notifications",
durable=True
)
# Привязка
await queue.bind(exchange, routing_key="email")
# Публикация сообщения
message = Message(
body=b"Send welcome email to user@example.com",
delivery_mode=2 # Персистентное
)
await exchange.publish(message, routing_key="email")
print("Сообщение опубликовано!")
if __name__ == "__main__":
asyncio.run(main())Direct exchange маршрутизирует сообщения по точному совпадению routing_key. Используйте для разделения сообщений по типу события:
exchange = await channel.declare_exchange(
"orders_exchange",
ExchangeType.DIRECT,
durable=True
)
# Разные routing_key для разных типов событий
await exchange.publish(
Message(body=b"Order created", delivery_mode=2),
routing_key="order.created"
)
await exchange.publish(
Message(body=b"Order paid", delivery_mode=2),
routing_key="order.paid"
)Fanout exchange рассылает сообщение всем подписчикам независимо от routing_key. Используйте для broadcast уведомлений:
exchange = await channel.declare_exchange(
"broadcast_exchange",
ExchangeType.FANOUT,
durable=True
)
# routing_key игнорируется — сообщение получат все подписчики
await exchange.publish(
Message(body=b"System maintenance in 1 hour"),
routing_key="" # Пустой, не используется
)Topic exchange позволяет маршрутизировать по паттернам с wildcard. Используйте для иерархической маршрутизации логов:
exchange = await channel.declare_exchange(
"logs_exchange",
ExchangeType.TOPIC,
durable=True
)
# routing_key с точечной нотацией
await exchange.publish(
Message(body=b"Error occurred"),
routing_key="logs.app.error"
)
await exchange.publish(
Message(body=b"Warning"),
routing_key="logs.api.warning"
)По умолчанию публикация асинхронна — вы не знаете, доставлено ли сообщение брокеру. Publisher Confirms позволяют получить подтверждение:
import aio_pika
from aio_pika import Message
async def publish_with_confirm():
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
# Включаем подтверждения
await channel.set_qos(prefetch_count=1)
exchange = await channel.declare_exchange(
"confirmed_exchange",
aio_pika.ExchangeType.DIRECT,
durable=True
)
message = Message(b"Important message", delivery_mode=2)
# Публикация с подтверждением
confirm = await exchange.publish(message, routing_key="test")
if confirm:
print("Брокер подтвердил получение сообщения")
else:
print("Сообщение не было доставлено")Для высоконагруженных систем используйте callback вместо ожидания подтверждения каждого сообщения. Это позволяет публиковать сообщения асинхронно:
async def on_publish_confirm(message: aio_pika.abc.AbstractMessage):
if message.delivery_tag:
print(f"Сообщение {message.delivery_tag} подтверждено")
else:
print(f"Сообщение {message.delivery_tag} отклонено")
async def main():
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
# Подписка на подтверждения
channel.on("publish_confirmed", on_publish_confirm)
exchange = await channel.declare_exchange(
"async_confirm_exchange",
aio_pika.ExchangeType.DIRECT
)
# Публикация без ожидания (callback вызовется позже)
await exchange.publish(Message(b"Async message"), routing_key="test")AMQP поддерживает транзакции, но они значительно медленнее Publisher Confirms:
# ❌ Медленно — синхронный режим
await channel.tx_select() # Включить транзакции
try:
await exchange.publish(message1, routing_key="test")
await exchange.publish(message2, routing_key="test")
await channel.tx_commit() # Подтвердить транзакцию
except Exception:
await channel.tx_rollback() # ОткатитьBest Practice: Используйте Publisher Confirms вместо транзакций. Они асинхронны и быстрее.
Для высокой пропускной способности публикуйте сообщения пачками используя asyncio.gather. Это уменьшает накладные расходы на сетевые вызовы:
async def publish_batch(messages: list):
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
exchange = await channel.declare_exchange(
"batch_exchange",
aio_pika.ExchangeType.DIRECT
)
# Публикация пачки
publish_tasks = [
exchange.publish(
Message(body=msg, delivery_mode=2),
routing_key="batch"
)
for msg in messages
]
results = await asyncio.gather(*publish_tasks, return_exceptions=True)
success_count = sum(1 for r in results if r is True)
print(f"Опубликовано {success_count} из {len(messages)}")При публикации могут возникать различные ошибки. Рассмотрим основные сценарии обработки:
Ограничьте время подключения чтобы избежать долгого ожидания:
import asyncio
try:
connection = await asyncio.wait_for(
aio_pika.connect_robust("amqp://guest:guest@localhost/"),
timeout=5.0
)
except asyncio.TimeoutError:
print("Таймаут подключения к RabbitMQ")Обработка ошибок доступа к обменнику:
from aio_pika.exceptions import ProbableAuthenticationError, ChannelClosed
try:
await exchange.publish(message, routing_key="test")
except ChannelClosed as e:
if e.args[0] == 404:
print("Обменник не существует")
elif e.args[0] == 403:
print("Нет прав на публикацию")connect_robust автоматически переподключается при разрыве соединения:
# Автоматическое переподключение с экспоненциальной задержкой
connection = await aio_pika.connect_robust(
"amqp://guest:guest@localhost/",
reconnect_interval=5 # Секунды между попытками
)Следующие рекомендации помогут создать надёжный продюсер для production:
Создание подключения — дорогая операция. Переиспользуйте connection и channel вместо создания на каждый запрос:
# ❌ Плохо — создание на каждый запрос
async def send_message():
conn = await aio_pika.connect("amqp://guest:guest@localhost/")
# ...
await conn.close()
# ✅ Хорошо — создание при старте приложения
class RabbitMQProducer:
def __init__(self):
self.connection = None
self.channel = None
self.exchange = None
async def connect(self):
self.connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
self.channel = await self.connection.channel()
self.exchange = await self.channel.declare_exchange(
"app_exchange",
aio_pika.ExchangeType.TOPIC,
durable=True
)
async def publish(self, routing_key: str, body: bytes):
await self.exchange.publish(
Message(body=body, delivery_mode=2),
routing_key=routing_key
)
async def close(self):
await self.connection.close()Для распределённой трассировки добавляйте уникальные ID в сообщения:
import uuid
correlation_id = str(uuid.uuid4())
message = Message(
body=json.dumps(event_data).encode(),
correlation_id=correlation_id,
headers={
"trace_id": correlation_id,
"span_id": uuid.uuid4().hex[:16]
}
)Ограничьте время жизни сообщения чтобы избежать обработки устаревших данных:
# Сообщение актуально только 1 час
message = Message(
body=b"Process this",
expiration="3600000" # 1 час в миллисекундах
)RabbitMQ не предназначен для передачи больших файлов — это снижает производительность. Для больших данных используйте внешние хранилища:
# ❌ Плохо — сообщение 10MB
large_message = Message(body=large_binary_data)
# ✅ Хорошо — отправить ссылку на S3
message = Message(
body=json.dumps({"file_url": "s3://bucket/file.zip"}).encode()
)Рекомендация: Размер сообщения < 1MB. Для больших данных передавайте ссылку на внешнее хранилище.
Полный пример продюсера для публикации событий домена в микросервисной архитектуре:
import aio_pika
from aio_pika import Message, ExchangeType
from typing import Optional
import json
import uuid
from datetime import datetime
class EventProducer:
"""Продюсер для публикации событий домена."""
def __init__(self, rabbitmq_url: str):
self.rabbitmq_url = rabbitmq_url
self.connection: Optional[aio_pika.Connection] = None
self.channel: Optional[aio_pika.Channel] = None
self.exchange: Optional[aio_pika.Exchange] = None
async def connect(self):
"""Подключение к RabbitMQ и объявление обменника."""
self.connection = await aio_pika.connect_robust(
self.rabbitmq_url,
client_properties={"connection_name": "event_producer"}
)
self.channel = await self.connection.channel()
await self.channel.set_qos(prefetch_count=1)
# Topic exchange для событий домена
self.exchange = await self.channel.declare_exchange(
"domain_events",
ExchangeType.TOPIC,
durable=True
)
async def publish_event(self, event_type: str, payload: dict):
"""
Публикация события домена.
Args:
event_type: Тип события в формате 'entity.action'
Например: 'user.created', 'order.paid'
payload: Данные события (сериализуемые в JSON)
"""
if not self.exchange:
raise RuntimeError("Not connected to RabbitMQ")
correlation_id = str(uuid.uuid4())
event = {
"event_id": correlation_id,
"event_type": event_type,
"timestamp": datetime.utcnow().isoformat(),
"payload": payload
}
message = Message(
body=json.dumps(event).encode(),
content_type="application/json",
delivery_mode=2,
correlation_id=correlation_id,
headers={
"event_type": event_type,
"trace_id": correlation_id
}
)
await self.exchange.publish(
message,
routing_key=event_type
)
async def disconnect(self):
"""Закрытие подключения."""
if self.connection:
await self.connection.close()
# Использование
async def main():
producer = EventProducer("amqp://guest:guest@localhost/")
await producer.connect()
# Публикация события
await producer.publish_event(
"user.created",
{"user_id": 123, "email": "user@example.com"}
)
await producer.disconnect()В следующей теме изучим консьюмеров — как получать и обрабатывать сообщения.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.