Обработчики сообщений: декораторы, фильтры, batching, acknowledgment, ручное управление подтверждением.
Обработчики сообщений: декораторы, фильтры, batching, acknowledgment, ручное управление подтверждением.
Декоратор @subscriber регистрирует обработчик сообщений:
from faststream import FastStream
from faststream.rabbit import RabbitBroker
from pydantic import BaseModel
broker = RabbitBroker("amqp://localhost")
app = FastStream(broker)
class Order(BaseModel):
id: int
amount: float
@broker.subscriber("orders")
async def handle_order(order: Order):
print(f"Received order {order.id}: ${order.amount}")Что происходит:
orders (если не существует)@broker.subscriber(
"orders.new", # routing key
"orders.new.queue", # queue name
exchange="orders", # exchange name
exchange_type="direct", # тип exchange
durable=True, # персистентная очередь
ack=True, # подтверждение обработки
retry_queue="orders.retry", # очередь для retry
dead_letter_queue="orders.dlq", # DLQ для ошибок
max_retry_count=3, # максимум попыток
max_workers=10, # параллельных обработчиков
prefetch_count=1 # сообщений на воркер
)
async def handle_order(order: Order):
...@broker.subscriber(
"orders", # topic
group_id="order-processors", # consumer group
auto_commit=True, # авто-коммит offset
auto_offset_reset="earliest", # с чего начать
batch=True, # batch processing
max_batch_size=100, # сообщений в batch
batch_timeout_ms=1000, # таймаут batch
partitions=[0, 1, 2], # конкретные партиции
consumer_timeout_ms=10000 # таймаут простоя
)
async def handle_order(order: Order):
...@broker.subscriber(
"orders", # channel
batch=False, # без batch
max_workers=5, # параллельных воркеров
stream=False # Pub/Sub (не Stream)
)
async def handle_order(order: Order):
...
# Redis Streams
@broker.subscriber(
"orders",
stream=True, # Stream mode
group="order-processors", # consumer group
consumer="worker-1", # consumer name
ack=True # acknowledgment
)
async def handle_order(order: Order):
...@broker.subscriber(
"orders.new", # subject
queue="order-workers", # queue group
stream="orders", # JetStream stream
durable="order-worker", # durable consumer
ack=True, # acknowledgment
manual_ack=True, # ручное подтверждение
deliver_policy="all" # deliver policy
)
async def handle_order(order: Order):
...@broker.subscriber("orders", ack=True)
async def handle_order(order: Order):
await db.save(order)
# ACK отправляется автоматически после успешного выполненияfrom faststream.rabbit import RabbitMessage
@broker.subscriber("orders", ack=True, manual_ack=True)
async def handle_order(order: Order, message: RabbitMessage):
try:
await db.save(order)
await message.ack() # Явное подтверждение
except Exception as e:
logger.error(f"Failed: {e}")
await message.nack() # Отрицательное подтверждение@broker.subscriber("orders", manual_ack=True)
async def handle_order(order: Order, message: RabbitMessage):
if not validate(order):
# Не возвращать в очередь — сразу в DLQ
await message.nack(requeue=False)
return
try:
await db.save(order)
await message.ack()
except TemporaryError:
# Вернуть в очередь для retry
await message.nack(requeue=True)Обработка сообщений пачками:
@broker.subscriber(
"orders",
batch=True,
max_batch_size=100,
batch_timeout_ms=1000
)
async def handle_orders(orders: list[Order]):
# orders — список из максимум 100 сообщений
await db.bulk_save(orders)@broker.subscriber(
"orders",
batch=True,
max_batch_size=500,
batch_timeout_ms=5000
)
async def handle_orders(orders: list[Order], message: KafkaMessage):
# message содержит метаданные batch
for order in orders:
await process(order)Преимущества batch:
Недостатки:
from faststream.utils import apply_types
@apply_types
@broker.subscriber("events")
async def handle_event(event: dict, filter: str = None):
if filter and event.get("type") != filter:
return # Игнорировать
# Обработать@broker.subscriber("orders")
async def handle_order(order: Order, message: RabbitMessage):
priority = message.headers.get("priority", "normal")
if priority == "high":
await process_high_priority(order)
else:
await process_normal(order)# Подписка на паттерн
@broker.subscriber("orders.*")
async def handle_order_event(event: dict):
# orders.new, orders.paid, orders.shipped
...
# Подписка на все вложенные
@broker.subscriber("orders.>")
async def handle_all_orders(event: dict):
...@broker.subscriber(
"orders",
retry_queue="orders.retry",
dead_letter_queue="orders.dlq",
max_retry_count=3
)
async def handle_order(order: Order):
if not is_valid(order):
raise ValueError("Invalid order") # После 3 попыток → DLQfrom circuitbreaker import circuit
@broker.subscriber("orders")
@circuit(failure_threshold=5, recovery_timeout=30)
async def handle_order(order: Order):
# При 5 ошибках подряд — разрыв цепи на 30 секунд
await external_service.call(order)@broker.subscriber(
"orders",
ack=True,
dead_letter_queue="orders.dlq",
dead_letter_exchange="orders.dlx"
)
async def handle_order(order: Order):
# При ошибке → retry_queue → DLQ
...
# Обработчик DLQ — анализ проблемных сообщений
@broker.subscriber("orders.dlq")
async def handle_dlq(message: RabbitMessage):
logger.error(f"DLQ message: {message.body}")
# Отправить алерт, сохранить для анализа@broker.subscriber(
"orders",
max_workers=10 # 10 параллельных обработчиков
)
async def handle_order(order: Order):
...@broker.subscriber(
"orders",
prefetch_count=1 # Одно сообщение за раз
)
async def handle_order(order: Order):
# Брокер не отправит следующее, пока не получен ACK
...Prefetch count:
1 — строго последовательная обработка (без перегрузки)10 — до 10 сообщений в обработке одновременно0 — без ограничений (может перегрузить память)from faststream import FastStream
from faststream.rabbit import RabbitBroker
from pydantic import BaseModel
from enum import Enum
class OrderStatus(str, Enum):
NEW = "new"
PAID = "paid"
SHIPPED = "shipped"
class Order(BaseModel):
id: int
status: OrderStatus
amount: float
broker = RabbitBroker("amqp://localhost")
app = FastStream(broker)
@broker.subscriber(
"orders.new",
"orders.new.queue",
exchange="orders",
exchange_type="direct",
durable=True,
ack=True,
retry_queue="orders.retry",
dead_letter_queue="orders.dlq",
max_retry_count=3,
max_workers=5
)
async def process_new_order(order: Order):
logger.info(f"Processing order {order.id}")
try:
# Валидация
if order.amount <= 0:
raise ValueError("Invalid amount")
# Сохранение
await db.orders.insert(order)
# Публикация следующего события
await broker.publish(
{"order_id": order.id},
"orders.paid",
exchange="orders"
)
except ValueError as e:
# Ошибка валидации — не retry, сразу в DLQ
logger.error(f"Validation failed: {e}")
raise # FastStream отправит в DLQ после max_retry_count
except Exception as e:
# Временная ошибка — retry
logger.error(f"Temporary error: {e}")
raiseСледующая тема — Валидация данных с Pydantic: типизация сообщений, кастомные валидаторы, обработка ошибок валидации.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.