Лёгкий messaging: каналы, publish/subscribe, паттерны. Когда Redis подходит для очередей, а когда нет.
Лёгкий messaging: каналы, publish/subscribe, паттерны. Когда Redis подходит для очередей, а когда нет.
Redis — это не только key-value хранилище, но и простой брокер сообщений с механизмом Pub/Sub.
Особенности Redis Pub/Sub:
Когда использовать:
┌─────────────┐
│ Publisher │
└──────┬──────┘
│ PUBLISH notifications "Hello!"
▼
┌─────────────────┐
│ Redis │
│ (Message Hub) │
└──────┬──────────┘
│
├──────────────┐
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Subscriber │ │ Subscriber │
│ (Email) │ │ (Push) │
└─────────────┘ └─────────────┘
Модель:
from faststream import FastStream
from faststream.redis import RedisBroker
broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)@broker.publisher("notifications")
async def send_notification(user_id: int, message: str):
await broker.publish(
{"user_id": user_id, "message": message},
"notifications"
)from pydantic import BaseModel
class Notification(BaseModel):
user_id: int
message: str
@broker.subscriber("notifications")
async def handle_notification(notif: Notification):
print(f"User {notif.user_id}: {notif.message}")Базовая публикация-подписка:
# Publisher
await broker.publish({"event": "user.login"}, "events")
# Subscriber
@broker.subscriber("events")
async def handle_event(event: dict):
...Подписка на несколько каналов по паттерну:
# Публикация в разные каналы
await broker.publish({"msg": "error"}, "logs.error")
await broker.publish({"msg": "warning"}, "logs.warning")
await broker.publish({"msg": "info"}, "logs.info")
# Подписка на все логи
@broker.subscriber("logs.*") # Паттерн
async def handle_all_logs(msg: dict):
...
# Подписка только на ошибки
@broker.subscriber("logs.error")
async def handle_errors(msg: dict):
...Паттерны:
logs.* — logs.error, logs.warning (один уровень)logs.** — logs.error, logs.error.db, logs.error.db.sql (любая вложенность)user.*.events — user.123.events, user.456.eventsСинхронный вызов через Redis:
# Server
@broker.subscriber("calculate", reply_to="results")
async def handle_calc(data: dict):
result = data["a"] + data["b"]
return {"result": result}
# Client
response = await broker.request(
{"a": 2, "b": 3},
"calculate",
timeout=5.0
)
print(response) # {"result": 5}Redis Streams — персистентная альтернатива Pub/Sub:
# Публикация в stream
await broker.publish(
{"order_id": 123},
"orders",
stream=True # Использовать Redis Streams
)
# Потребление из stream
@broker.subscriber(
"orders",
stream=True,
group="order-processors", # Consumer group
consumer="worker-1"
)
async def process_order(order: dict):
...Отличия Streams от Pub/Sub:
Публикация нескольких сообщений в одном запросе:
from faststream.redis import RedisBatch
async with RedisBatch(broker) as batch:
batch.publish({"id": 1}, "orders")
batch.publish({"id": 2}, "orders")
batch.publish({"id": 3}, "orders")
# Один RTT вместо трёхОчереди на основе списков Redis (альтернатива Pub/Sub):
# Producer: LPUSH
await broker.connection.lpush("tasks", json.dumps(task))
# Consumer: BRPOP (blocking)
@broker.subscriber("tasks", list=True)
async def process_task(task: dict):
...Преимущества перед Pub/Sub:
Передача метаданных через заголовки:
await broker.publish(
{"data": "..."},
"channel",
headers={"priority": "high", "trace_id": "abc123"}
)
@broker.subscriber("channel")
async def handle(msg: dict, message: RedisMessage):
print(f"Priority: {message.headers.get('priority')}")from faststream import FastStream
from faststream.redis import RedisBroker
from pydantic import BaseModel
from enum import Enum
class NotificationType(str, Enum):
EMAIL = "email"
PUSH = "push"
SMS = "sms"
class Notification(BaseModel):
user_id: int
type: NotificationType
message: str
broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)
# Publisher
@broker.publisher("notifications")
async def notify_user(notif: Notification):
await broker.publish(notif.model_dump(), "notifications")
# Email subscriber
@broker.subscriber("notifications")
async def send_email(notif: Notification):
if notif.type == NotificationType.EMAIL:
print(f"Email to {notif.user_id}: {notif.message}")
# Push subscriber
@broker.subscriber("notifications")
async def send_push(notif: Notification):
if notif.type == NotificationType.PUSH:
print(f"Push to {notif.user_id}: {notif.message}")
# Логирование всех уведомлений (pattern sub)
@broker.subscriber("notifications.*")
async def log_notification(notif: Notification):
print(f"Logged: {notif}")# Просмотр активных подписчиков
redis-cli PUBSUB NUMSUB notifications
# notifications: 5 (5 подписчиков)
# Просмотр подписок по паттерну
redis-cli PUBSUB NUMPAT logs.*
# Мониторинг в реальном времени
redis-cli MONITORRedis экспортирует метрики через INFO:
pubsub_channels — активные каналыpubsub_patterns — активные паттерныconnected_clients — подключённые клиенты| Характеристика | Pub/Sub | Streams | Lists |
|---|---|---|---|
| Персистентность | Нет | Да | Да |
| Доставка | Всем подписчикам | Consumer groups | Один потребитель |
| Ack | Нет | Да (XACK) | Нет (но можно эмулировать) |
| Replay | Нет | Да (XREAD) | Да (LRANGE) |
| Задержка | <1ms | ~1ms | ~1ms |
| Сценарий | Real-time уведомления | Event sourcing, лог | Очереди задач |
Следующая тема — NATS: высокопроизводительная коммуникация с простой моделью subjects и поддержкой JetStream.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.