Перехватчики сообщений: логирование, метрики, retry-логика, dead letter queues, circuit breaker, alerting.
Перехватчики сообщений: логирование, метрики, retry-логика, dead letter queues, circuit breaker, alerting.
Middleware позволяет перехватывать сообщения до и после обработки:
from faststream import FastStream
from faststream.rabbit import RabbitBroker
broker = RabbitBroker("amqp://localhost")
app = FastStream(broker)
# Middleware для логирования
@broker.middleware
async def log_middleware(message, next_middleware):
print(f"Received: {message}")
result = await next_middleware(message)
print(f"Processed: {result}")
return resultЦепочка вызовов:
Message → Middleware 1 → Middleware 2 → Handler → Middleware 2 → Middleware 1
import logging
from faststream.rabbit import RabbitMessage
logger = logging.getLogger(__name__)
@broker.middleware
async def logging_middleware(message: RabbitMessage, next_middleware):
# Извлечь контекст
trace_id = message.headers.get("trace_id", "unknown")
correlation_id = message.correlation_id
# Логировать вход
logger.info(
f"Processing message",
extra={
"trace_id": trace_id,
"correlation_id": correlation_id,
"routing_key": message.routing_key
}
)
try:
result = await next_middleware(message)
logger.info("Message processed successfully")
return result
except Exception as e:
logger.error(
f"Message processing failed: {e}",
exc_info=True,
extra={"trace_id": trace_id}
)
raisefrom contextvars import ContextVar
trace_id_var: ContextVar[str] = ContextVar("trace_id", default="")
@broker.middleware
async def context_middleware(message: RabbitMessage, next_middleware):
# Установить контекст
trace_id = message.headers.get("trace_id")
token = trace_id_var.set(trace_id)
try:
return await next_middleware(message)
finally:
# Восстановить контекст
trace_id_var.reset(token)
# В обработчике
@broker.subscriber("orders")
async def handle_order(order: dict):
trace_id = trace_id_var.get()
logger.info(f"Processing order {order['id']}", extra={"trace_id": trace_id})from prometheus_client import Counter, Histogram, generate_latest
import time
# Метрики
messages_processed = Counter(
'faststream_messages_processed_total',
'Total processed messages',
['queue', 'status']
)
processing_duration = Histogram(
'faststream_processing_duration_seconds',
'Message processing duration',
['queue']
)
@broker.middleware
async def metrics_middleware(message: RabbitMessage, next_middleware):
queue = message.routing_key
start_time = time.time()
try:
result = await next_middleware(message)
messages_processed.labels(queue=queue, status='success').inc()
return result
except Exception as e:
messages_processed.labels(queue=queue, status='error').inc()
raise
finally:
duration = time.time() - start_time
processing_duration.labels(queue=queue).observe(duration)
# Endpoint для Prometheus
from fastapi import FastAPI
app_metrics = FastAPI()
@app_metrics.get("/metrics")
async def get_metrics():
return Response(generate_latest(), media_type="text/plain")from statsd import StatsClient
statsd = StatsClient(host='localhost', port=8125)
@broker.middleware
async def statsd_middleware(message: RabbitMessage, next_middleware):
start = time.time()
try:
result = await next_middleware(message)
statsd.incr('messages.processed')
return result
except Exception:
statsd.incr('messages.error')
raise
finally:
statsd.timing('messages.duration', (time.time() - start) * 1000)import asyncio
from functools import wraps
def retry_with_backoff(max_retries=3, base_delay=1.0, exceptions=(Exception,)):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries + 1):
try:
return await func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt == max_retries:
break
delay = base_delay * (2 ** attempt) # Экспоненциальная задержка
logger.warning(
f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s"
)
await asyncio.sleep(delay)
raise last_exception
return wrapper
return decorator
# Использование
@broker.subscriber("orders")
@retry_with_backoff(max_retries=3, base_delay=1.0)
async def process_order(order: dict):
# Временная ошибка (таймаут БД)
await db.execute("INSERT INTO orders ...")@broker.subscriber(
"orders",
retry_queue="orders.retry",
dead_letter_queue="orders.dlq",
max_retry_count=3
)
async def process_order(order: dict):
if is_temporary_error():
raise Exception("Temporary error") # Вернётся в retry_queue
if is_fatal_error():
raise ValueError("Fatal error") # После 3 попыток → DLQ@broker.subscriber(
"orders",
"orders.main",
exchange="orders",
durable=True,
ack=True,
retry_queue="orders.retry",
dead_letter_queue="orders.dlq",
dead_letter_exchange="orders.dlx",
max_retry_count=3
)
async def process_order(order: dict):
...
# Обработчик DLQ
@broker.subscriber("orders.dlq")
async def handle_dlq(message: RabbitMessage):
logger.error(f"DLQ message: {message.body}")
# Сохранить для анализа
await db.dlq.insert({
"body": message.body,
"headers": message.headers,
"error_time": datetime.utcnow()
})
# Отправить алерт
await send_alert(f"Message in DLQ: {message.correlation_id}")from pydantic import BaseModel
from datetime import datetime
class DLQEntry(BaseModel):
id: int
body: dict
error_time: datetime
error_type: str
retry_count: int
@broker.subscriber("orders.dlq")
async def analyze_dlq(entry: DLQEntry):
# Классификация ошибок
if "validation" in entry.error_type:
# Ошибка валидации — не retry'нуть
logger.warning(f"Validation error: {entry.body}")
elif "timeout" in entry.error_type:
# Временная ошибка — можно retry'нуть вручную
logger.info(f"Timeout error, may retry: {entry.body}")
else:
# Неизвестная ошибка — требует анализа
logger.error(f"Unknown error: {entry.body}")from circuitbreaker import circuit
from datetime import datetime, timedelta
class CircuitBreakerState:
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
@circuit(
failure_threshold=5, # 5 ошибок подряд
recovery_timeout=30, # 30 секунд до повторной попытки
expected_exception=Exception
)
async def external_api_call(data):
# Вызов внешнего сервиса
async with aiohttp.ClientSession() as session:
async with session.post("https://api.example.com", json=data) as resp:
return await resp.json()
@broker.subscriber("orders")
async def process_order(order: dict):
try:
# Circuit breaker защитит от cascade failure
result = await external_api_call(order)
return result
except CircuitBreakerOpen:
# Circuit открыт — использовать fallback
logger.warning("Circuit breaker open, using fallback")
return await fallback_processing(order)async def fallback_processing(order: dict):
# Сохранить в очередь для последующей обработки
await broker.publish(order, "orders.pending")
logger.info(f"Order {order['id']} saved for later processing")
# Вернуть пользователю
return {"status": "pending", "order_id": order["id"]}import aiohttp
async def send_alert(message: str, severity: str = "error"):
webhook_url = "https://hooks.slack.com/services/XXX/YYY/ZZZ"
async with aiohttp.ClientSession() as session:
await session.post(webhook_url, json={
"text": f"[{severity.upper()}] {message}",
"channel": "#alerts",
"username": "FastStream Bot"
})
@broker.middleware
async def alerting_middleware(message: RabbitMessage, next_middleware):
try:
return await next_middleware(message)
except Exception as e:
# Отправить алерт при ошибке
await send_alert(
f"Error processing message: {e}\n"
f"Queue: {message.routing_key}\n"
f"Correlation ID: {message.correlation_id}",
severity="critical"
)
raisefrom collections import defaultdict
from datetime import datetime, timedelta
alert_cooldowns = defaultdict(datetime)
async def send_alert_if_needed(error_type: str, message: str):
now = datetime.utcnow()
# Не чаще раза в 5 минут для каждого типа ошибки
if now - alert_cooldowns[error_type] < timedelta(minutes=5):
return
await send_alert(message)
alert_cooldowns[error_type] = now
@broker.middleware
async def smart_alerting_middleware(message: RabbitMessage, next_middleware):
try:
return await next_middleware(message)
except Exception as e:
error_type = type(e).__name__
await send_alert_if_needed(
error_type,
f"{error_type}: {e} in {message.routing_key}"
)
raisefrom faststream import FastStream
from faststream.rabbit import RabbitBroker
import logging
import time
from prometheus_client import Counter
logger = logging.getLogger(__name__)
broker = RabbitBroker("amqp://localhost")
app = FastStream(broker)
# Middleware 1: Логирование
@broker.middleware
async def logging_middleware(message: RabbitMessage, next_middleware):
trace_id = message.headers.get("trace_id")
logger.info(f"Processing {message.routing_key}", extra={"trace_id": trace_id})
start = time.time()
try:
result = await next_middleware(message)
logger.info(f"Success", extra={"trace_id": trace_id})
return result
except Exception as e:
logger.error(f"Error: {e}", exc_info=True, extra={"trace_id": trace_id})
raise
finally:
duration = time.time() - start
logger.debug(f"Duration: {duration:.3f}s")
# Middleware 2: Метрики
messages_total = Counter('messages_total', 'Total messages', ['queue', 'status'])
@broker.middleware
async def metrics_middleware(message: RabbitMessage, next_middleware):
try:
result = await next_middleware(message)
messages_total.labels(queue=message.routing_key, status='success').inc()
return result
except Exception:
messages_total.labels(queue=message.routing_key, status='error').inc()
raise
# Middleware 3: Circuit breaker для внешних вызовов
@broker.middleware
async def circuit_breaker_middleware(message: RabbitMessage, next_middleware):
try:
return await next_middleware(message)
except ExternalServiceError as e:
# Circuit breaker сработает внутри обработчика
raise
# Обработчик со всеми middleware
@broker.subscriber(
"orders",
retry_queue="orders.retry",
dead_letter_queue="orders.dlq",
max_retry_count=3
)
async def process_order(order: dict):
# Логирование и метрики автоматически применяются
result = await call_external_service(order)
return resultСледующая тема — Продвинутые паттерны: RPC, Request-Reply: синхронные вызовы через очереди, таймауты, обработка ошибок.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.