Высокопроизводительная коммуникация: subjects, queue groups, wildcards. JetStream для персистентности.
Высокопроизводительная коммуникация: subjects, queue groups, wildcards. JetStream для персистентности.
NATS — это высокопроизводительный брокер сообщений с простой моделью subjects, разработанный для:
Ключевые особенности:
*, >)| Характеристика | NATS Core | JetStream |
|---|---|---|
| Персистентность | Нет (at-most-once) | Да (at-least-once) |
| Хранение | Сообщения не хранятся | Лог сообщений на диске |
| Delivery | Fire-and-forget | Гарантированная доставка |
| Replay | Нет | Да (чтение с любого offset) |
| Сценарий | Real-time, service mesh | Event sourcing, очереди |
┌─────────────┐
│ Publisher │
└──────┬──────┘
│ PUBLISH orders.new {...}
▼
┌─────────────────┐
│ NATS │
│ Server │
└──────┬──────────┘
│
├────────────────────┐
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Subscriber │ │ Subscriber │
│ Queue Group │ │ Queue Group │
│ (workers) │ │ (analytics) │
└─────────────┘ └─────────────┘
Тема сообщения в формате dot-separated:
orders.new
orders.paid
orders.shipped
user.created
user.deleted
Иерархия:
orders.new — новый заказorders.paid — оплаченный заказorders.* — все события заказов (wildcard)* — ровно один токен> — ноль или более токенов (должен быть в конце)# Подписка на все события заказов
orders.* → orders.new, orders.paid, orders.shipped
orders.> → orders.new, orders.paid, orders.shipped, orders.shipped.international
# Подписка на все события
> → все сообщения
# Подписка на все user-события
user.* → user.created, user.deleted
user.> → user.created, user.deleted, user.profile.updated
Балансировка нагрузки между потребителями:
Queue Group: "order-workers"
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Worker 1 │ │ Worker 2 │ │ Worker 3 │
│ orders.new │ │ orders.new │ │ orders.new │
└─────────────┘ └─────────────┘ └─────────────┘
▲ ▲ ▲
└───────────────────┼───────────────────┘
│
NATS Server
(round-robin балансировка)
Правила:
from faststream import FastStream
from faststream.nats import NatsBroker
broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)@broker.publisher("orders.new")
async def create_order(order: dict):
await broker.publish(order, "orders.new")from pydantic import BaseModel
class Order(BaseModel):
id: int
amount: float
@broker.subscriber("orders.new")
async def handle_order(order: Order):
print(f"New order: {order.id}")@broker.subscriber(
"orders.new",
queue="order-workers" # Queue group для балансировки
)
async def handle_order(order: Order):
# Только один воркер из группы получит сообщение
...# Подписка на все события заказов
@broker.subscriber("orders.*")
async def handle_order_event(event: dict):
...
# Подписка на все вложенные события
@broker.subscriber("orders.>")
async def handle_all_orders(event: dict):
...NATS имеет встроенную поддержку RPC:
# Server
@broker.subscriber("calculate")
async def handle_calc(data: dict) -> dict:
return {"result": data["a"] + data["b"]}
# Client
response = await broker.request(
{"a": 2, "b": 3},
"calculate",
timeout=5.0
)
print(response) # {"result": 5}Преимущества перед ручным RPC:
JetStream добавляет персистентность к NATS:
from faststream.nats import NatsBroker
broker = NatsBroker(
"nats://localhost:4222",
jetstream=True # Включить JetStream
)Stream — это лог хранения сообщений:
from faststream.nats import JetStreamConfig
# Создание stream
await broker.add_stream(
name="orders",
subjects=["orders.*"], # Subjects для stream
storage="file" # file или memory
)@broker.subscriber(
"orders.new",
stream="orders", # JetStream stream
durable="order-worker", # Имя потребителя (персистентное)
ack=True, # Подтверждение обработки
manual_ack=True # Ручной ack после обработки
)
async def handle_order(order: Order, message):
try:
await db.save(order)
await message.ack() # Подтвердить
except Exception:
await message.nack() # Вернуть в streamПолитика доставки для новых потребителей:
@broker.subscriber(
"orders.new",
stream="orders",
deliver_policy="all" # all, last, new, start_time, start_seq
)
async def handle_order(order: Order):
...Опции:
all — читать с начала streamlast — только последнее сообщениеnew — только новые сообщения (после подписки)start_time — с указанного времениstart_seq — с указанной последовательностиПотребитель сам запрашивает сообщения (вместо push):
@broker.subscriber(
"orders.new",
stream="orders",
pull=True, # Pull consumer
batch_size=10, # Запрашивать по 10 сообщений
batch_timeout_ms=1000
)
async def handle_orders(orders: list[Order]):
# Обработать batch
...Преимущества:
NATS JetStream KV — распределённое key-value хранилище:
# Создание KV
await broker.create_kv_bucket(
name="config",
history=10 # Хранить 10 версий
)
# Запись
await broker.put("config", "app.version", "1.2.3")
# Чтение
value = await broker.get("config", "app.version")
# Подписка на изменения
@broker.subscriber("config.>", stream="KV_config")
async def handle_config_change(key: str, value: str):
...Сценарии:
Хранение бинарных объектов:
# Создание Object Store
await broker.create_object_store(
name="uploads",
storage="file"
)
# Загрузка
await broker.put_object(
"uploads",
"file.pdf",
data=binary_data
)
# Скачивание
data = await broker.get_object("uploads", "file.pdf")NATS идеален для service mesh благодаря производительности и простоте:
from faststream import FastStream
from faststream.nats import NatsBroker
from pydantic import BaseModel
class ServiceRequest(BaseModel):
service: str
method: str
payload: dict
class ServiceResponse(BaseModel):
status: int
data: dict
broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)
# Service Discovery
@broker.subscriber("service.discover")
async def discover_service(request: ServiceRequest) -> ServiceResponse:
# Вернуть информацию о сервисе
return ServiceResponse(status=200, data={"host": "user-service:8080"})
# RPC вызов сервиса
@broker.subscriber("service.call", queue="rpc-workers")
async def call_service(request: ServiceRequest) -> ServiceResponse:
# Проксировать вызов к целевому сервису
response = await http.post(f"{request.service}/{request.method}", request.payload)
return ServiceResponse(status=response.status, data=response.json())
# События сервисов
@broker.subscriber("events.>")
async def handle_event(event: dict):
# Логирование всех событий
print(f"Event: {event}")# Информация о сервере
nats server info
# Статистика соединений
nats connection ls
# Мониторинг subjects
nats sub ">" --countNATS экспортирует метрики на порту 8222:
curl http://localhost:8222/connz # Соединения
curl http://localhost:8222/subsz # Подписки
curl http://localhost:8222/routez # Маршруты (в кластере)
curl http://localhost:8222/jz # JetStream статусNATS экспортирует метрики для Prometheus:
nats_connection_count — активные соединенияnats_sent_msgs — отправленные сообщенияnats_received_msgs — полученные сообщенияnats_jetstream_storage_used — использованное место JetStream| Характеристика | NATS Core | NATS JetStream | RabbitMQ | Kafka |
|---|---|---|---|---|
| Производительность | ~500K msg/s | ~100K msg/s | ~10K msg/s | ~1M msg/s |
| Персистентность | Нет | Да | Да | Да |
| Модель | Subjects | Streams | Exchanges/Queues | Topics/Partitions |
| Сложность | Низкая | Средняя | Средняя | Высокая |
| Идеально для | Service mesh, real-time | Event sourcing, очереди | Task queues, RPC | Event streaming |
*, >) — гибкая подписка на паттерныСледующая тема — Публикация сообщений (Publishing): отправка сообщений во все брокеры, кастомизация заголовков, delivery modes.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.