Отправка сообщений во все брокеры: базовая публикация, кастомизация заголовков, delivery modes, подтверждение отправки.
Отправка сообщений во все брокеры: базовая публикация, кастомизация заголовков, delivery modes, подтверждение отправки.
FastStream абстрагирует различия между брокерами сообщений, предоставляя единый API для публикации. Это позволяет:
Почему это важно: В production вы можете начать с RabbitMQ для простоты, но позже мигрировать на Kafka для throughput. FastStream делает это безболезненным.
from faststream import FastStream
from faststream.rabbit import RabbitBroker
broker = RabbitBroker("amqp://localhost")
app = FastStream(broker)
# Публикация словаря
await broker.publish({"order_id": 123}, "orders")
# Публикация строки
await broker.publish("Hello, World!", "greetings")
# Публикация Pydantic-модели
from pydantic import BaseModel
class Order(BaseModel):
id: int
amount: float
order = Order(id=123, amount=99.99)
await broker.publish(order, "orders") # Автоматическая сериализация в JSONПочему Pydantic модели предпочтительнее словарей:
Когда использовать строки: Только для интеграции с legacy системами, ожидающими raw текст.
@broker.subscriber("orders.new")
async def process_order(order: dict):
# Обработать заказ
await db.save(order)
# Опубликовать событие
await broker.publish(
{"order_id": order["id"], "status": "processed"},
"orders.processed"
)Почему это анти-pattern: Если db.save() выполнится, а publish() упадёт — данные в БД есть, а событие о том не сообщает. Подписчики orders.processed не узнают о заказе.
Правильный подход — транзакционная публикация:
@broker.subscriber("orders.new")
async def process_order(order: dict):
# Сначала готовим событие
event = {"order_id": order["id"], "status": "processed"}
# Сохраняем в БД вместе с событием (outbox pattern)
await db.save_with_event(order, event)
# Публикуем после успешного коммита
await broker.publish(event, "orders.processed")Или используйте транзакционные outbox паттерн для гарантии доставки.
Декоратор @publisher объявляет издатель на уровне приложения:
@broker.publisher("orders.processed")
@broker.subscriber("orders.new")
async def process_order(order: dict):
await db.save(order)
# publisher автоматически доступен
await process_order.publish(
{"order_id": order["id"], "status": "processed"}
)Преимущества перед broker.publish():
process_order.publish в unit тестахКогда использовать broker.publish() напрямую:
Пример response routing:
@broker.subscriber("orders.new")
@broker.publisher("orders.processed")
async def process_order(order: dict) -> dict:
# Возвращаемое значение автоматически публикуется
return {"order_id": order["id"], "status": "processed"}await broker.publish(
{"order_id": 123},
"orders",
headers={
"priority": "high",
"trace_id": "abc-123",
"content-type": "application/json"
}
)Зачем нужны заголовки:
Заголовки — это метаданные, которые не являются частью бизнес-логики сообщения. Они решают несколько критичных задач:
trace_id и span_id позволяют отследить путь запроса через все микросервисыschema_version: "1.0" помогает обработчикам поддерживать обратную совместимостьtenant_id для изоляции данных разных клиентов в одной очередиProduction пример с tracing:
import uuid
from contextvars import ContextVar
# Context variable для хранения trace_id в рамках обработки
trace_id_ctx: ContextVar[str] = ContextVar("trace_id", default="")
@broker.subscriber("orders.new")
async def process_order(order: dict, message: Message):
# Получаем trace_id из входящего сообщения или генерируем новый
trace_id = message.headers.get("trace_id", str(uuid.uuid4()))
trace_id_ctx.set(trace_id)
# Логируем с trace_id — теперь все логи этого запроса связаны
logger.info("Processing order", extra={"trace_id": trace_id})
# Публикуем с тем же trace_id
await broker.publish(
{"order_id": order["id"], "status": "processed"},
"orders.processed",
headers={"trace_id": trace_id} # Сохраняем trace chain
)Когда не использовать заголовки:
Для трассировки и RPC:
import uuid
correlation_id = str(uuid.uuid4())
await broker.publish(
{"request": "data"},
"requests",
correlation_id=correlation_id
)
# В обработчике
@broker.subscriber("requests")
async def handle_request(data: dict, message: Message):
trace_id = message.correlation_id
# Логировать с trace_id
logger.info(f"Processing {data}", extra={"trace_id": trace_id})Correlation ID vs Trace ID — в чем разница:
Когда использовать correlation_id:
Production пример с дедупликацией:
import uuid
from redis import AsyncRedis
dedup_cache = AsyncRedis("redis://localhost")
@broker.subscriber("payments.process")
async def process_payment(payment: dict, message: Message):
corr_id = message.correlation_id
# Проверяем, не обрабатывали ли уже это сообщение
if await dedup_cache.exists(f"processed:{corr_id}"):
logger.warning(f"Duplicate message {corr_id}, skipping")
return
# Обрабатываем платеж
await process(payment)
# Запоминаем correlation_id (TTL 24 часа)
await dedup_cache.set(f"processed:{corr_id}", "1", ex=86400)
# Публикуем результат
await broker.publish(
{"payment_id": payment["id"], "status": "done"},
"payments.done",
correlation_id=corr_id # Сохраняем correlation
)Для RPC указать очередь ответа:
await broker.publish(
{"query": "select *"},
"queries",
reply_to="query-results"
)Почему RPC через брокер — это не always good idea:
RPC (request-response) через брокер сообщений увеличивает latency по сравнению с HTTP/gRPC. Используйте когда:
Для low-latency сценариев используйте HTTP/gRPC напрямую.
# Персистентное сообщение (сохраняется на диск)
await broker.publish(
{"order_id": 123},
"orders",
persistent=True # delivery_mode=2
)
# Неперсистентное (только в памяти)
await broker.publish(
{"event": "click"},
"events",
persistent=False # delivery_mode=1
)Что происходит под капотом:
persistent=True — RabbitMQ записывает сообщение на диск перед подтверждением отправки. При перезапуске брокера сообщение сохраняется.persistent=False — сообщение только в RAM. Быстрее, но при креше брокера теряется.Критичный нюанс: Персистентность сообщения работает ТОЛЬКО если очередь и exchange тоже персистентные. Иначе сообщение сохраняется, но queue/exchange исчезают — сообщение теряется.
Trade-off: Throughput vs Durability
| Mode | Throughput | Durability | Use Case |
|---|---|---|---|
persistent=False | ~50K msg/s | При креше теряется | Метрики, клики, логи |
persistent=True | ~5-10K msg/s | Переживает креш | Заказы, платежи, регистрации |
Production правило:
persistent=True: критичные данные (заказы, платежи, пользовательские данные)persistent=False: временные события (клики, метрики, health checks), которые можно regenerate# Высокий приоритет
await broker.publish(
{"order_id": 123, "vip": True},
"orders",
priority=10 # 0-10, чем выше — тем приоритетнее
)
# Низкий приоритет
await broker.publish(
{"order_id": 124},
"orders",
priority=1
)Как это работает:
RabbitMQ хранит сообщения в queue. При priority брокер сортирует очередь — сообщения с высоким приоритетом доставляются ПЕРВЫМИ, даже если они пришли позже.
Критичное требование: Очередь ДОЛЖНА быть объявлена с max_priority, иначе приоритет игнорируется:
# Без max_priority приоритет НЕ работает
broker = RabbitBroker("amqp://localhost")
@broker.subscriber(
"orders",
queue_options={
"x-max-priority": 10 # Обязательно!
}
)
async def process_order(order: dict):
...Когда использовать приоритеты:
Когда НЕ использовать:
Production паттерн: Dynamic priority based on SLA
from datetime import datetime, timedelta
@broker.subscriber("orders.new")
async def process_order_with_priority(order: dict):
created_at = datetime.fromisoformat(order["created_at"])
age_minutes = (datetime.utcnow() - created_at).total_seconds() / 60
# Чем старше сообщение, тем выше приоритет
if age_minutes > 50: # Почти timeout (60 мин)
priority = 10
elif age_minutes > 30:
priority = 5
else:
priority = 1
await broker.publish(
order,
"orders.processing",
priority=priority
)# Сообщение истекает через 1 час
await broker.publish(
{"cache_key": "user:123"},
"cache-invalidation",
expiration=3600000 # миллисекунды
)Зачем нужен TTL:
TTL гарантируетет, что сообщение будет удалено из очереди после указанного времени, даже если не было обработано.
Use cases:
TTL на уровне queue vs message:
# TTL на очереди (все сообщения живут 1 час)
@broker.subscriber(
"notifications",
queue_options={
"x-message-ttl": 3600000 # 1 час для всех
}
)
async def handle_notification(msg: dict):
...
# TTL на сообщение (гибче)
await broker.publish(
{"type": "urgent", "data": "..."},
"notifications",
expiration=60000 # 1 минута для критичных
)
await broker.publish(
{"type": "normal", "data": "..."},
"notifications",
expiration=3600000 # 1 час для обычных
)Production нюанс: Сообщение с истекшим TTL не удаляется сразу. Оно удаляется когда:
Это значит, что при большом backlog вы можете получить сообщения, которые "expired" но еще не удалены.
| Характеристика | RabbitMQ | Kafka | Redis | NATS |
|---|---|---|---|---|
| Гарантия доставки | At-least-once | Exactly-once (с idempotence) | At-most-once (pub/sub) | At-most-once (core) |
| Throughput | ~10K msg/s | ~1M msg/s | ~50K msg/s | ~100K msg/s |
| Latency | <10ms | <5ms | <1ms | <1ms |
| Message ordering | Queue level | Partition level | No guarantee | No guarantee (core) |
| Message replay | ❌ | ✅ | ✅ (streams) | ✅ (JetStream) |
| Best for | Task queues, RPC | Event sourcing, streams | Caching, simple pub/sub | IoT, real-time |
from faststream.rabbit import RabbitBroker
broker = RabbitBroker("amqp://localhost")
await broker.publish(
{"order_id": 123},
"orders.new", # routing key
exchange="orders", # exchange name
exchange_type="direct", # тип exchange
persistent=True, # персистентность
priority=5, # приоритет
headers={"tenant": "acme"} # заголовки
)Почему RabbitMQ — good default choice:
Когда НЕ RabbitMQ:
from faststream.kafka import KafkaBroker
broker = KafkaBroker("localhost:9092")
await broker.publish(
{"order_id": 123},
"orders", # topic
key=b"order-123", # partition key (для порядка)
headers={"trace_id": "abc"}, # заголовки
partition=0 # конкретная партиция (редко)
)Partition key — это критично:
Partition key определяет, в какую партицию попадет сообщение. Все сообщения с одинаковым key идут в одну партицию, что гарантирует:
Production примеры partition key:
# Все события одного пользователя в одной партиции (ordering guaranteed)
await broker.publish(
{"event": "user.login"},
"user-events",
key=b"user-123" # user_id как partition key
)
# Все заказы одного ресторана в одной партиции
await broker.publish(
{"order_id": 456},
"orders",
key=b"restaurant-789" # restaurant_id как partition key
)Когда НЕ указывать key:
Kafka vs RabbitMQ trade-offs:
| Aspect | Kafka | RabbitMQ |
|---|---|---|
| Ordering | Per-partition | Per-queue |
| Replay | ✅ Хранит историю | ❌ Нет replay |
| Throughput | ~1M msg/s | ~10K msg/s |
| Routing | Simple (topics) | Flexible (exchanges) |
| Latency | <5ms | <10ms |
from faststream.redis import RedisBroker
broker = RedisBroker("redis://localhost:6379")
# Pub/Sub
await broker.publish(
{"event": "user.login"},
"events"
)
# Stream (персистентность)
await broker.publish(
{"event": "user.login"},
"events",
stream=True
)
# List (очередь задач)
await broker.publish(
{"task": "send_email"},
"tasks",
list=True
)Redis — это НЕ типичный брокер сообщений:
Redis — in-memory datastore, который "из коробки" поддерживает pub/sub, streams и lists. Это одновременно и сила, и слабость.
Три паттерна в Redis:
| Pattern | Durability | Ordering | Use Case |
|---|---|---|---|
| Pub/Sub | ❌ At-most-once | ❌ | Real-time notifications, live updates |
| Stream | ✅ Persistent | ✅ Per-stream | Event logging, simple event sourcing |
| List | ✅ Persistent | ✅ FIFO | Task queues, job processing |
Когда Redis — правильный выбор:
Когда НЕ Redis:
Production пример: когда использовать stream vs pub/sub
# Pub/Sub — для real-time уведомлений (не критично если потеряется)
await broker.publish(
{"user_id": 123, "notification": "New message!"},
"notifications", # Pub/Sub channel
# Без stream=True — получатели offline = сообщение потеряно
)
# Stream — для event logging (нужна персистентность)
await broker.publish(
{"user_id": 123, "event": "message.sent", "message_id": 456},
"user-events", # Stream name
stream=True, # Теперь это Redis Stream — сохраняется
)from faststream.nats import NatsBroker
broker = NatsBroker("nats://localhost:4222")
# Core (без персистентности)
await broker.publish(
{"event": "user.login"},
"events.user.login"
)
# JetStream (персистентность)
await broker.publish(
{"event": "user.login"},
"events.user.login",
stream="events"
)NATS — lightweight alternative:
NATS создан для high-performance IoT и real-time систем. Два режима:
| Mode | Durability | Ordering | Use Case |
|---|---|---|---|
| Core NATS | ❌ At-most-once | ❌ | IoT telemetry, real-time sync |
| JetStream | ✅ Persistent | ✅ Per-stream | Event sourcing, task queues |
Почему NATS:
Когда NATS — правильный выбор:
Когда НЕ NATS:
Production пример: Core vs JetStream
# Core NATS — для real-time telemetry (не критично если потеряется)
await broker.publish(
{"device_id": "sensor-123", "temperature": 23.5},
"iot.telemetry"
# Без stream — device offline = данные потеряны
)
# JetStream — для критичных событий
await broker.publish(
{"device_id": "sensor-123", "alert": "temperature > threshold"},
"iot.alerts",
stream="iot-events" # Теперь персистентно
)Отправка нескольких сообщений в одном запросе:
Network overhead: Каждое сообщение = отдельный network round-trip + сериализация + подтверждение. При 1000 msg/s это создает значительный overhead.
Batch решает: Отправляем N сообщений за один network call → меньше latency, выше throughput.
from faststream.rabbit import RabbitBatch
async with RabbitBatch(broker) as batch:
batch.publish({"id": 1}, "orders")
batch.publish({"id": 2}, "orders")
batch.publish({"id": 3}, "orders")
# Один network round-trip вместо трёхКак работает RabbitBatch:
Когда использовать batch:
Batch trade-offs:
| Aspect | Single Publish | Batch Publish |
|---|---|---|
| Latency per msg | <10ms | <10ms (same) |
| Throughput | ~10K msg/s | ~50-100K msg/s |
| Memory | Low | Higher (буферизация) |
| Failure handling | Single msg retry | Весь batch retry |
Production пример: bulk import
async def import_products(products: list[dict]):
"""Импорт тысяч товаров в систему"""
async with RabbitBatch(broker, batch_size=100) as batch:
for product in products:
batch.publish(
product,
"products.imported",
exchange="products",
persistent=True
)
# Каждые 100 сообщений отправляются за разKafka по умолчанию batch'ит сообщения на уровне producer'а:
from faststream.kafka import KafkaBroker
broker = KafkaBroker(
"localhost:9092",
# Kafka batch настройки
batch_size=16384, # 16KB max batch size
linger_ms=5, # Ждать 5ms для заполнения batch
)
# Каждое publish вызов может попасть в batch
await broker.publish({"id": 1}, "orders")
await broker.publish({"id": 2}, "orders")
await broker.publish({"id": 3}, "orders")
# Kafka автоматически объединит в batchKafka batch vs RabbitMQ batch:
| Aspect | Kafka | RabbitMQ |
|---|---|---|
| Batch automatic | ✅ Автоматически | ❌ Явный batch context |
| Configuration | linger_ms, batch_size | Context manager |
| Throughput | ~1M msg/s | ~50-100K msg/s |
# По умолчанию broker подтверждает публикацию
result = await broker.publish(
{"order_id": 123},
"orders",
raise_timeout=True # Выбросить exception при таймауте
)
if result:
print("Message published successfully")# acks="all" — подтверждение от всех реплик
broker = KafkaBroker(
"localhost:9092",
acks="all", # Надёжность
enable_idempotence=True # Без дубликатов
)
await broker.publish({"order_id": 123}, "orders")
# Exception при ошибке публикацииKafka acks — trade-off между reliability и latency:
| acks value | Что подтверждает | Latency | Durability | Use Case |
|---|---|---|---|---|
acks=0 | Нет подтверждения | Lowest | ❌ Может потерять | Метрики, логи |
acks=1 | Leader только | Low | ⚠️ Если leader crash | Не критичные данные |
acks=all | Leader + все ISR | Highest | ✅ Переживает crash | Заказы, платежи |
Idempotence producer — почему это важно:
Без idempotence при network timeout producer может отправить сообщение дважды (не уверен, дошло ли первое):
Producer Kafka
| |
|-- Publish message ----------->|
| | (сохраняет)
| <-- Network timeout ---------| (ACK потерялся)
| |
|-- Publish message ----------->| (retry — дубликат!)
С enable_idempotence=True:
Когда НЕ включать idempotence:
Этот пример показывает production-ready паттерн для event-driven архитектуры с domain events:
from faststream import FastStream
from faststream.rabbit import RabbitBroker
from pydantic import BaseModel
from datetime import datetime
import uuid
class DomainEvent(BaseModel):
id: str
type: str
aggregate_id: str
timestamp: str
payload: dict
broker = RabbitBroker("amqp://localhost")
app = FastStream(broker)
# Декларация publishers
@broker.publisher("orders.events")
@broker.publisher("users.events")
@broker.publisher("payments.events")
class EventPublisher:
pass
async def publish_event(event: DomainEvent, exchange: str):
"""Универсальная функция публикации событий"""
await broker.publish(
event.model_dump(),
f"{event.type}",
exchange=exchange,
exchange_type="topic",
persistent=True,
headers={
"trace_id": str(uuid.uuid4()),
"event_type": event.type,
"schema_version": "1.0"
},
correlation_id=event.id
)
# Использование
async def create_order(user_id: int, items: list):
order_id = str(uuid.uuid4())
# Сохранить заказ
await db.orders.insert({"id": order_id, "user_id": user_id})
# Опубликовать событие
event = DomainEvent(
id=str(uuid.uuid4()),
type="order.created",
aggregate_id=order_id,
timestamp=datetime.utcnow().isoformat(),
payload={"user_id": user_id, "items": items}
)
await publish_event(event, "orders.events")Разбор архитектурных решений:
class DomainEvent(BaseModel):
id: str # Уникальный ID события (для дедупликации)
type: str # Тип события (order.created, user.signup)
aggregate_id: str # ID сущности, которую изменили
timestamp: str # Когда произошло событие (не когда опубликовано!)
payload: dict # Бизнес-данные событияПочему не просто dict:
exchange_type="topic" # Позволяет routing по паттернамПочему topic exchange:
order.*, *.created, order.shippeddirect (точный match) и fanout (broadcast)Production routing примеры:
# Подписчик всех событий заказов
@broker.subscriber("order.*", exchange="orders.events", exchange_type="topic")
async def handle_any_order_event(event: dict): ...
# Подписчик только на creation
@broker.subscriber("order.created", exchange="orders.events", exchange_type="topic")
async def handle_order_created(event: dict): ...
# Подписчик всех creation событий
@broker.subscriber("*.created", exchange="orders.events", exchange_type="topic")
async def handle_any_created(event: dict): ...Текущий код имеет anti-pattern:
async def create_order(user_id: int, items: list):
order_id = str(uuid.uuid4())
# 1. Сохранить заказ
await db.orders.insert({"id": order_id, "user_id": user_id})
# 2. Опубликовать событие
event = DomainEvent(...)
await publish_event(event, "orders.events") # ⚠️ Может упасть!Если step 2 упадет: заказ в БД есть, но событие не опубликовано. Подписчики не узнают о заказе.
Решение 1: Outbox Pattern (production-ready)
async def create_order_with_outbox(user_id: int, items: list):
order_id = str(uuid.uuid4())
event = DomainEvent(
id=str(uuid.uuid4()),
type="order.created",
aggregate_id=order_id,
timestamp=datetime.utcnow().isoformat(),
payload={"user_id": user_id, "items": items}
)
# Атомарно сохраняем заказ И событие в БД
await db.transactional_save_order_and_event(order, event)
# Отдельный процесс (CDC poller) прочитает outbox и опубликует
# Это гарантирует доставку даже если broker downРешение 2: Transactional outbox с Debezium
Application → DB (order + event in outbox table)
↓
Debezium CDC
↓
Kafka topic
Debezium читает DB binlog и публикует изменения в Kafka — атомарность на уровне БД.
class OrderCreated(BaseModel):
order_id: str
user_id: int
total: float
items: list[str]
await broker.publish(
OrderCreated(order_id="123", user_id=1, total=99.9, items=["item1"]),
"orders.created"
)Почему: Валидация на отправке, автодокументирование, type safety.
import uuid
trace_id = str(uuid.uuid4())
await broker.publish(
data,
"channel",
correlation_id=trace_id,
headers={"trace_id": trace_id}
)Почему: Без tracing невозможно debug distributed systems.
async def publish_with_retry(data, channel):
for attempt in range(3):
try:
await broker.publish(data, channel, raise_timeout=True)
return
except PublishError:
await asyncio.sleep(2 ** attempt) # 1s, 2s, 4s
raise PublishFailed("All retries exhausted")Почему: Сеть ненадежна, transient failures случаются часто.
import time
start = time.monotonic()
await broker.publish(data, channel)
latency = time.monotonic() - start
metrics.histogram("publish_latency_ms", latency * 1000)Почему: Рост latency — ранний сигнал о проблемах брокера.
async with RabbitBatch(broker, batch_size=100) as batch:
for item in items:
batch.publish(item, "channel")Почему: 10x больше throughput при тех же ресурсах.
# ПЛОХО — ошибка публикации теряется
await broker.publish(data, "channel")Проблема: Если broker down, сообщение теряется тихо.
Хорошо:
try:
await broker.publish(data, "channel", raise_timeout=True)
except PublishError as e:
logger.error(f"Publish failed: {e}")
# Retry или dead letter queue# ПЛОХО — сначала publish, потом DB
await broker.publish({"order_id": 123}, "orders.created")
await db.orders.insert(order) # Может упасть!Проблема: Событие опубликовано, но заказа нет — инконсистентность.
Хорошо: Сначала БД, потом publish (или outbox pattern).
# ПЛОХО — 10MB payload
await broker.publish({
"order_id": 123,
"huge_payload": giant_json_blob # 10MB
}, "orders.created")Проблема:
Хорошо: Публикуйте ID, данные загруйте отдельно:
await broker.publish({
"order_id": 123,
"data_url": f"https://storage/orders/123.json" # Ссылка на данные
}, "orders.created")# ПЛОХО — все события в один exchange
await broker.publish(order_event, "events", exchange="all-events")
await broker.publish(user_event, "events", exchange="all-events")
await broker.publish(payment_event, "events", exchange="all-events")Проблема:
Хорошо: Разделите по domain:
await broker.publish(order_event, "orders", exchange="orders.events")
await broker.publish(user_event, "users", exchange="users.events")# ПЛОХО — заказы без persistent
await broker.publish({"order_id": 123}, "orders.created") # persistent=False by defaultПроблема: При restart RabbitMQ заказы потеряны.
Хорошо:
await broker.publish(
{"order_id": 123},
"orders.created",
persistent=True # Явно указываем
)Нужно опубликовать событие
|
├── Критичные данные (заказы, платежи)?
│ ├── Да → persistent=True + retry + outbox pattern
│ └── Нет → persistent=False для скорости
|
├── Высокий throughput (>10K msg/s)?
│ ├── Да → Kafka или RabbitMQ batch
│ └── Нет → RabbitMQ проще
|
├── Нужен message replay?
│ ├── Да → Kafka (хранит историю)
│ └── Нет → RabbitMQ
|
├── Ultra-low latency (<1ms)?
│ ├── Да → NATS или Redis
│ └── Нет → RabbitMQ/Kafka
|
└── Simple pub/sub без routing?
├── Да → Redis pub/sub
└── Нет → RabbitMQ exchanges
broker.publish(data, channel) — просто, но без гарантийСледующая тема — Подписка и обработка сообщений (Subscribers): декораторы, фильтры, batching, acknowledgment, ручное управление подтверждением.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.