Архитектура RabbitMQ, основные компоненты AMQP, установка и первое подключение через aio-pika
RabbitMQ — самый популярный open-source брокер сообщений, реализующий протокол AMQP 0.9.1. Понимание архитектуры RabbitMQ — фундамент для построения надёжных распределённых систем.
В современных распределённых системах сервисы не должны общаться напрямую. Жёсткая связанность создаёт проблемы:
RabbitMQ решает эти проблемы через асинхронную передачу сообщений:
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ Producer │ ───► │ RabbitMQ │ ───► │ Consumer │
│ (сервис А) │ │ (брокер) │ │ (сервис Б) │
└─────────────┘ └──────────────┘ └─────────────┘
Продюсер и консьюмер не знают друг о друге — они общаются только через брокер. Это называется слабая связанность (loose coupling).
AMQP (Advanced Message Queuing Protocol) — открытый стандарт протокола для обмена сообщениями на уровне приложений. RabbitMQ реализует версию AMQP 0.9.1.
Ключевые концепции AMQP:
| Концепция | Описание |
|---|---|
| Producer | Создаёт и публикует сообщения |
| Consumer | Подписывается на очередь и обрабатывает сообщения |
| Queue | Буфер, хранящий сообщения до обработки |
| Exchange | Принимает сообщения от продюсеров и маршрутизирует в очереди |
| Binding | Правило, связывающее обменник с очередью |
| Routing Key | Ключ, используемый для маршрутизации сообщения |
RabbitMQ состоит из следующих компонентов:
import aio_pika
# Connection — TCP-соединение с брокером
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
# Channel — виртуальное соединение внутри connection
# Одно connection может содержать множество channel
channel = await connection.channel()Connection — физическое TCP-соединение с брокером. Создание соединения — дорогая операция.
Channel — лёгкое виртуальное соединение, мультиплексированное поверх connection. Каждый channel имеет свой набор очередей, обменников и контекст выполнения.
Best Practice: Создавайте одно connection на приложение и несколько channel для разных задач (публикация, потребление, RPC).
Обменник принимает сообщения от продюсеров и маршрутизирует их в очереди. Продюсер всегда публикует сообщения в обменник, никогда напрямую в очередь.
Типы обменников определяют логику маршрутизации:
| Тип | Логика маршрутизации |
|---|---|
| direct | Точное совпадение routing_key и binding_key |
| fanout | Рассылка во все привязанные очереди (игнорирует routing_key) |
| topic | Маршрутизация по паттернам с wildcard (*, #) |
| headers | Маршрутизация по заголовкам сообщения |
Очередь — буфер, хранящий сообщения. Сообщения обрабатываются в порядке FIFO (First In, First Out).
Параметры очереди:
await channel.declare_queue(
"my_queue",
durable=True, # Сохраняется после перезапуска RabbitMQ
exclusive=False, # Не доступна для других connection
auto_delete=False, # Не удаляется после отписки последнего консьюмера
)Binding связывает обменник с очередью и определяет, какие сообщения попадут в очередь:
# Привязка очереди к обменнику с ключом маршрутизации
await queue.bind(exchange, routing_key="user.created")Запустите RabbitMQ с Management plugin через Docker:
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=guest \
rabbitmq:3-managementОткройте http://localhost:15672 и войдите как guest/guest.
Установите библиотеку aio-pika для работы с RabbitMQ:
pip install aio-pikaСоздадим базовое подключение к RabbitMQ с объявлением обменника и очереди:
import aio_pika
from aio_pika import ExchangeType
async def main():
# Подключение к RabbitMQ
connection = await aio_pika.connect_robust(
"amqp://guest:guest@localhost/",
client_properties={"connection_name": "my_app"}
)
# Создание канала
channel = await connection.channel()
# Объявление обменника
exchange = await channel.declare_exchange(
"hello_exchange",
ExchangeType.DIRECT,
durable=True
)
# Объявление очереди
queue = await channel.declare_queue(
"hello_queue",
durable=True
)
# Привязка очереди к обменнику
await queue.bind(exchange, routing_key="hello")
print("Подключение успешно!")
# Закрытие подключения
await connection.close()
if __name__ == "__main__":
import asyncio
asyncio.run(main())aio-pika поддерживает различные форматы URI для подключения:
# Базовый формат
"amqp://user:password@host:port/vhost"
# Примеры
"amqp://guest:guest@localhost/"
"amqp://admin:secret@rabbitmq.example.com:5672/production"
"amqps://guest:guest@localhost/" # SSL-соединение
# С параметрами
"amqp://guest:guest@localhost/?heartbeat=60&connection_timeout=30"| Параметр | Описание |
|---|---|
heartbeat | Интервал проверки соединения (секунды) |
connection_timeout | Таймаут подключения (секунды) |
ssl | Включить SSL (или использовать amqps://) |
RabbitMQ Management Plugin предоставляет:
Пример запроса к Management API для получения информации об очереди:
import aiohttp
async def get_queue_info():
async with aiohttp.ClientSession() as session:
async with session.get(
"http://localhost:15672/api/queues/%2F/hello_queue",
auth=aiohttp.BasicAuth("guest", "guest")
) as response:
data = await response.json()
print(f"Messages: {data['messages']}")
print(f"Consumers: {data['consumers']}")Понимание пути сообщения критически важно:
1. Producer публикует сообщение в Exchange
│
▼
2. Exchange маршрутизирует в Queue (согласно binding)
│
▼
3. Queue хранит сообщение до появления Consumer
│
▼
4. Consumer получает сообщение
│
▼
5. Consumer обрабатывает и отправляет ACK
│
▼
6. RabbitMQ удаляет сообщение из очереди
Если Consumer не отправил ACK (упал, таймаут), сообщение возвращается в очередь или отправляется в Dead Letter Exchange.
Избегайте следующих типичных ошибок при работе с RabbitMQ:
Всегда объявляйте обменник и очередь явно перед публикацией:
# ❌ Неправильно — очередь не существует до публикации
await channel.default_exchange.publish(message, routing_key="my_queue")
# ✅ Правильно — явное объявление обменника и очереди
exchange = await channel.declare_exchange("my_exchange", ExchangeType.DIRECT)
queue = await channel.declare_queue("my_queue")
await queue.bind(exchange)Используйте durable=True для критичных данных:
# ❌ Сообщения пропадут после перезапуска RabbitMQ
queue = await channel.declare_queue("my_queue") # durable=False по умолчанию
# ✅ Для критичных данных
queue = await channel.declare_queue("my_queue", durable=True)Переиспользуйте connection для уменьшения накладных расходов:
# ❌ Очень медленно — TCP handshake каждый раз
async def publish_message():
connection = await aio_pika.connect("amqp://guest:guest@localhost/")
# ... публикация ...
await connection.close()
# ✅ Переиспользование connection
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
# ... публикация многих сообщений ...В следующей теме создадим первого продюсера и опубликуем сообщение.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.