AMQP-модель: exchanges, queues, bindings, routing keys. Типы exchanges и сценарии использования.
AMQP-модель: exchanges, queues, bindings, routing keys. Типы exchanges и сценарии использования в FastStream.
RabbitMQ реализует протокол AMQP 0.9.1 с моделью маршрутизации через exchanges:
Producer ──> Exchange ──> Queue ──> Consumer
│
Binding
(routing key)
Ключевые компоненты:
В отличие от прямой отправки в очередь, AMQP использует двухступенчатую модель:
# Producer отправляет сообщение в exchange
await broker.publish(
{"order_id": 123},
exchange="orders", # Куда отправить
routing_key="new" # Ключ маршрутизации
)
# Exchange маршрутизирует в queue по binding
# orders (exchange) + "new" (routing key) → orders.new (queue)Преимущество: гибкая маршрутизация без знания очередей производителем.
Маршрутизация по точному совпадению routing key:
Direct Exchange "orders"
┌──────────────────────┐
│ │
routing key │ │ binding key
"new" ─────────>│ ├──────────> "new" ──> orders.new queue
"paid" ────────>│ ├──────────> "paid" ──> orders.paid queue
"cancelled" ───>│ ├──────────> "cancelled" ──> orders.cancelled queue
│ │
└──────────────────────┘
Сценарий: разделение сообщений по типам (новый заказ, оплата, отмена).
FastStream:
from faststream.rabbit import RabbitBroker
broker = RabbitBroker("amqp://localhost")
@broker.subscriber(
"orders", # routing key
"orders.new", # queue name
exchange="orders", # exchange name
exchange_type="direct"
)
async def handle_new_order(order: dict):
...Рассылка во все привязанные очереди (игнорирует routing key):
Fanout Exchange "notifications"
┌──────────────────────────────┐
│ │
Message ───────>│ ├────> email-queue
│ ├────> sms-queue
│ ├────> push-queue
└──────────────────────────────┘
Сценарий: broadcast-уведомления, события домена (order.created).
FastStream:
@broker.subscriber(
"", # routing key не важен
"email-queue",
exchange="notifications",
exchange_type="fanout"
)
async def send_email(event: dict):
...
@broker.subscriber(
"",
"sms-queue",
exchange="notifications",
exchange_type="fanout"
)
async def send_sms(event: dict):
...Маршрутизация по паттерну (wildcards * и #):
* — ровно одно слово# — ноль или более слов Topic Exchange "logs"
┌──────────────────────┐
│ │
"app.error" ───>│ ├──────────> "app.*" ──> app-queue
"app.warning" ─>│ ├──────────> "*.error" ──> errors-queue
"db.error.sql"─>│ ├──────────> "db.#" ────> db-queue
│ │
└──────────────────────┘
Сценарий: логирование, события с иерархической структурой.
FastStream:
# Получать все события приложения
@broker.subscriber(
"app.*", # app.error, app.warning, app.info
"app-queue",
exchange="logs",
exchange_type="topic"
)
async def handle_app_event(event: dict):
...
# Получать только ошибки
@broker.subscriber(
"*.error", # app.error, db.error, api.error
"errors-queue",
exchange="logs",
exchange_type="topic"
)
async def handle_error(event: dict):
...Маршрутизация по заголовкам сообщения (игнорирует routing key):
await broker.publish(
{"data": "..."},
exchange="headers",
headers={"priority": "high", "type": "order"}
)Сценарий: сложная маршрутизация по метаданным.
FastStream:
@broker.subscriber(
"",
"high-priority-queue",
exchange="headers",
exchange_type="headers",
bind_arguments={"priority": "high"}
)
async def handle_high_priority(msg: dict):
...from faststream import FastStream
from faststream.rabbit import RabbitBroker
broker = RabbitBroker(
"amqp://guest:guest@localhost:5672/",
max_retries=5,
retry_delay=1.0
)
app = FastStream(broker)from pydantic import BaseModel
class Order(BaseModel):
id: int
amount: float
@broker.publisher("orders.new")
async def create_order(order: Order):
await broker.publish(order, "orders.new")@broker.subscriber(
"orders.new",
"orders.new.queue",
exchange="orders",
exchange_type="direct",
durable=True # Персистентная очередь
)
async def process_order(order: Order):
print(f"Processing order {order.id}")FastStream автоматически создаёт очереди и exchanges при старте:
@broker.subscriber(
"orders.new",
"orders.new.queue",
exchange="orders",
exchange_type="direct",
durable=True
)
async def process_order(order: Order):
...При старте приложения:
orders (type: direct, durable)orders.new.queue (durable)orders + orders.new → orders.new.queueRabbitMQ требует подтверждения обработки:
@broker.subscriber(
"orders",
"orders.queue",
ack=True # Подтверждение после успешной обработки
)
async def process_order(order: Order):
# Если exception — сообщение вернётся в очередь
await db.save(order)Режимы:
ack=True — ACK после успешного выполнения обработчикаack=False — нет подтверждения (fire-and-forget)Message объектМаршрутизация неудачных сообщений в DLQ:
@broker.subscriber(
"orders",
"orders.queue",
ack=True,
retry_queue="orders.retry",
dead_letter_queue="orders.dlq",
max_retry_count=3
)
async def process_order(order: Order):
if not validate(order):
raise ValueError("Invalid order") # После 3 попыток → DLQЛогика:
retry_queue с задержкойmax_retry_count попыток → dead_letter_queue@broker.subscriber(
"orders",
"orders.priority",
max_priority=10 # Приоритет 0-10
)
async def process_order(order: Order):
...Публикация с приоритетом:
await broker.publish(
order,
"orders",
priority=10 # Высокий приоритет
)Автоматическое удаление старых сообщений:
@broker.subscriber(
"orders",
"orders.queue",
message_ttl=3600000, # 1 час в миллисекундах
expires=3600000 # Время жизни очереди
)
async def process_order(order: Order):
...Для надёжности в кластере:
@broker.subscriber(
"orders",
"orders.quorum",
quorum=True # Quorum queue с репликацией
)
async def process_order(order: Order):
...from faststream import FastStream
from faststream.rabbit import RabbitBroker
from pydantic import BaseModel
from enum import Enum
class OrderStatus(str, Enum):
NEW = "new"
PAID = "paid"
CANCELLED = "cancelled"
class Order(BaseModel):
id: int
amount: float
status: OrderStatus
broker = RabbitBroker("amqp://localhost")
app = FastStream(broker)
# Exchange для заказов
ORDERS_EXCHANGE = "orders"
@broker.subscriber(
OrderStatus.NEW.value,
"orders.new.queue",
exchange=ORDERS_EXCHANGE,
exchange_type="direct",
durable=True,
ack=True,
retry_queue="orders.retry",
dead_letter_queue="orders.dlq"
)
async def handle_new_order(order: Order):
print(f"New order: {order.id}")
# Обработка...
await broker.publish(order, ORDERS_EXCHANGE, routing_key=OrderStatus.PAID.value)
@broker.subscriber(
OrderStatus.PAID.value,
"orders.paid.queue",
exchange=ORDERS_EXCHANGE,
exchange_type="direct",
durable=True,
ack=True
)
async def handle_paid_order(order: Order):
print(f"Paid order: {order.id}")
# Отправка на склад...
@broker.subscriber(
OrderStatus.CANCELLED.value,
"orders.cancelled.queue",
exchange=ORDERS_EXCHANGE,
exchange_type="direct",
durable=True,
ack=True
)
async def handle_cancelled_order(order: Order):
print(f"Cancelled order: {order.id}")
# Возврат средств...# Получить информацию о очереди
curl -u guest:guest http://localhost:15672/api/queues/%2F/orders.new.queue
# Получить метрики
curl -u guest:guest http://localhost:15672/api/overviewRabbitMQ экспортирует метрики:
rabbitmq_queue_messages — количество сообщенийrabbitmq_queue_consumers — количество потребителейrabbitmq_queue_messages_ready — готовые к доставкеrabbitmq_queue_messages_unacknowledged — в обработкеСледующая тема — Apache Kafka: распределённый лог событий, topics, partitions, consumer groups.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.