Базовые концепции messaging-систем: брокеры, очереди, pub/sub, RPC. Зачем нужен FastStream и как он упрощает работу с очередями.
Почему современные микросервисы не могут жить без очередей сообщений и как FastStream делает работу с ними такой же простой, как создание FastAPI-приложений.
Представьте типичную микросервисную архитектуру:
┌─────────────┐ HTTP ┌─────────────┐
│ Frontend │ ────────────> │ API Gateway │
└─────────────┘ └─────────────┘
│
┌─────────────────┼─────────────────┐
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌───────────────┐
│ User Service│ │ Order Service│ │Payment Service│
└──────────────┘ └──────────────┘ └───────────────┘
Проблема: сервисы вызывают друг друга синхронно по HTTP. Что происходит, если Payment Service упал?
Решение: асинхронная коммуникация через брокер сообщений:
┌──────────────┐ ┌─────────────┐ ┌───────────────┐
│ Order Service│ ──────> │ Broker │ ──────> │Payment Service│
│ (publisher) │ queue │ (RabbitMQ) │ queue │ (subscriber) │
└──────────────┘ └─────────────┘ └───────────────┘
Теперь:
order.created и сразу отвечает пользователюБрокер сообщений (Message Broker) — промежуточное ПО, которое управляет передачей сообщений между приложениями.
Publisher ──> [Broker] ──> Subscriber
Задачи брокера:
Главное правило: сообщение получает ровно один потребитель.
┌─────────────┐
Publisher ──> │ Queue │ ──> Consumer A ✅ (получил)
│ (orders) │ ──> Consumer B ❌ (ждёт следующее)
└─────────────┘
Как работает:
Сценарий: распределение нагрузки между воркерами. Например, 1000 заказов и 5 воркеров — каждый воркер берёт заказы из общей кучи, ни один заказ не обработается дважды.
Пример:
# Publisher: отправка заказа в очередь
await broker.publish({"order_id": 123}, "orders")
# Consumer A ИЛИ Consumer B получит сообщение (только один!)
@broker.subscriber("orders")
async def process_order(order: dict):
print(f"Processing order {order['order_id']}")
# Этот код выполнится ровно один раз для каждого заказаАналогия из жизни: очередь в банке — один клиент идёт к следующему свободному оператору. Два оператора не обслуживают одного клиента одновременно.
Главное правило: сообщение получают все подписчики.
┌───────────────────────┐
Publisher ──> │ Topic/Exchange │ ──> Subscriber A (email) ✅
│ (order.created) │ ──> Subscriber B (analytics) ✅
└───────────────────────┘ └─> Subscriber C (inventory) ✅
Как работает:
Сценарий: событие «заказ создан» интересует разные сервисы — email-сервис хочет отправить подтверждение, аналитика хочет посчитать метрики, склад хочет зарезервировать товар. Все три сервиса должны получить событие.
Пример:
# Publisher: событие "заказ создан"
await broker.publish({"order_id": 123}, "order.created")
# ВСЕ подписчики получат копию сообщения
@broker.subscriber("order.created", "email-service")
async def send_email(event: dict):
print("Отправляю email...")
@broker.subscriber("order.created", "analytics-service")
async def track_event(event: dict):
print("Записываю в аналитику...")
@broker.subscriber("order.created", "inventory-service")
async def reserve_item(event: dict):
print("Резервирую товар на складе...")Аналогия из жизни: YouTube-канал — все подписчики получают уведомление о новом видео. Одно видео — миллионы просмотров, каждый смотрит сам.
| Критерий | Point-to-Point | Publish-Subscribe |
|---|---|---|
| Кто получает? | Один потребитель | Все подписчики |
| Очереди | Одна общая | У каждого своя |
| Сообщений на 1 публикацию | 1 обработка | N обработок (по числу подписчиков) |
| Цель | Распараллеливание нагрузки | Уведомление всех заинтересованных систем |
| Что если потребителей 2? | Каждый получит ~50% сообщений | Каждый получит 100% сообщений |
| Пример из жизни | Очередь задач | Подписка на рассылку |
Синхронный вызов через очередь.
Client ──> [Request Queue] ──> Server
▲ │
│ [Reply Queue] <──────┘
└──────────────────────────────┘
Сценарий: удалённые вычисления, запрос данных у другого сервиса.
Пример:
# RPC-вызов
result = await broker.request(
"calculate.total",
{"items": [...]},
timeout=5.0
)# «Сырой» RabbitMQ с aio-pika
import aio_pika
async def setup_rabbitmq():
connection = await aio_pika.connect_robust("amqp://localhost")
channel = await connection.channel()
queue = await channel.declare_queue("orders", durable=True)
exchange = await channel.declare_exchange("orders.x", aio_pika.ExchangeType.DIRECT)
await queue.bind(exchange, routing_key="new")
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
await handle(message.body)Много бойлерплейта, нет типизации, сложно тестировать.
from faststream import FastStream
from faststream.rabbit import RabbitBroker
from pydantic import BaseModel
class Order(BaseModel):
id: int
amount: float
broker = RabbitBroker("amqp://localhost")
app = FastStream(broker)
@broker.subscriber("orders")
async def handle_order(order: Order): # Автоматическая валидация!
print(f"Order {order.id}: ${order.amount}")
await broker.publish({"status": "processed"}, "orders.processed")Преимущества FastStream:
┌─────────────────────────────────────────────────────┐
│ FastStream App │
├─────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Rabbit │ │ Kafka │ │ Redis │ │
│ │ Broker │ │ Broker │ │ Broker │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │
│ ┌─────────┴─────────┐ │
│ │ Core Engine │ │
│ │ (subscribers, │ │
│ │ publishers, │ │
│ │ validation) │ │
│ └───────────────────┘ │
└─────────────────────────────────────────────────────┘
Ключевые компоненты:
Depends| Критерий | RabbitMQ | Kafka | Redis Pub/Sub | NATS |
|---|---|---|---|---|
| Модель | Очереди + Exchanges | Лог событий (topics) | Каналы | Subjects |
| Персистентность | Да (очереди) | Да (лог) | Нет | Опционально (JetStream) |
| Throughput | ~10K msg/s | ~1M msg/s | ~50K msg/s | ~500K msg/s |
| Latency | ~1ms | ~10ms | <1ms | <1ms |
| Сложная маршрутизация | Да (exchanges) | Нет | Нет | Wildcards |
| Replay событий | Нет | Да | Нет | С JetStream |
| Идеально для | Task queues, RPC | Event sourcing, стримы | Real-time уведомления | High-performance сервисы |
RabbitMQ:
Kafka:
Redis Pub/Sub:
NATS:
В следующих темах:
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.