Стратегии retry, exponential backoff, circuit breaker в асинхронных консьюмерах
В production-системах ошибки неизбежны: внешние сервисы недоступны, данные невалидны, таймауты. Правильная стратегия обработки ошибок определяет надёжность всей системы.
Первое правило — разделять ошибки на временные и постоянные. Это определяет стратегию обработки:
class TemporaryError(Exception):
"""
Временная ошибка — обработка может succeed позже.
Примеры:
- БД временно недоступна
- Таймаут внешнего API
- Блокировка ресурса (lock timeout)
"""
class PermanentError(Exception):
"""
Постоянная ошибка — retry не поможет.
Примеры:
- Невалидный JSON
- Несуществующий пользователь
- Ошибка бизнес-логики
"""Рассмотрим три основных стратегии обработки ошибок:
Простой retry возвращает сообщение в очередь для немедленной повторной обработки:
async def process_message(message: aio_pika.abc.AbstractIncomingMessage):
try:
await handle(message)
await message.ack()
except TemporaryError as e:
# Временная ошибка — вернуть в очередь для немедленной retry
await message.nack(requeue=True)
except PermanentError as e:
# Постоянная ошибка — не возвращать
await message.nack(requeue=False)Проблема: При постоянных временных ошибках сообщение будет бесконечно retry'иться.
Счётчик попыток предотвращает бесконечный retry:
MAX_RETRIES = 3
async def process_message(message: aio_pika.abc.AbstractIncomingMessage):
try:
async with message.process():
# Получаем счётчик попыток из заголовков
retry_count = message.headers.get('x-retry-count', 0)
if retry_count >= MAX_RETRIES:
logger.error(f"Max retries reached for message {message.message_id}")
await message.nack(requeue=False) # В DLX
return
await handle(message)
except TemporaryError as e:
retry_count = message.headers.get('x-retry-count', 0)
await message.update_headers({'x-retry-count': retry_count + 1})
await message.nack(requeue=True)
except PermanentError as e:
logger.error(f"Permanent error: {e}")
await message.nack(requeue=False)Exponential backoff откладывает retry с увеличивающейся задержкой для предотвращения перегрузки:
import random
from datetime import datetime, timedelta
def calculate_backoff(retry_count: int, base_delay: int = 1) -> int:
"""
Расчёт задержки с exponential backoff и jitter.
Формула: delay = base_delay * 2^retry_count + random_jitter
"""
delay = base_delay * (2 ** retry_count)
# Jitter для предотвращения thundering herd
jitter = random.uniform(0, delay * 0.1)
return int(delay + jitter)
async def process_with_backoff(message: aio_pika.abc.AbstractIncomingMessage):
try:
async with message.process():
retry_count = message.headers.get('x-retry-count', 0)
if retry_count >= MAX_RETRIES:
await message.nack(requeue=False)
return
await handle(message)
except TemporaryError as e:
retry_count = message.headers.get('x-retry-count', 0)
# Расчёт задержки
delay_seconds = calculate_backoff(retry_count)
# Отправка в delay queue (см. тему DLX)
await send_to_delay_queue(message, delay_seconds)
await message.ack() # Ack чтобы удалить из текущей очереди
except PermanentError as e:
await message.nack(requeue=False)Circuit Breaker предотвращает перегрузку недоступного сервиса, размыкая цепь при множественных ошибках:
from enum import Enum
from datetime import datetime, timedelta
from typing import Optional
class CircuitState(Enum):
CLOSED = "closed" # Нормальная работа
OPEN = "open" # Сервис недоступен, requests блокируются
HALF_OPEN = "half_open" # Тестовый запрос
class CircuitBreaker:
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 30,
half_open_max_calls: int = 1
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.half_open_max_calls = half_open_max_calls
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time: Optional[datetime] = None
self.half_open_calls = 0
async def call(self, func, *args, **kwargs):
"""Вызов функции с circuit breaker."""
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
self.half_open_calls = 0
else:
raise CircuitBreakerOpenError("Circuit breaker open")
if self.state == CircuitState.HALF_OPEN:
if self.half_open_calls >= self.half_open_max_calls:
raise CircuitBreakerOpenError("Half-open limit reached")
self.half_open_calls += 1
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
"""Успешный вызов."""
self.failure_count = 0
self.state = CircuitState.CLOSED
def _on_failure(self):
"""Неудачный вызов."""
self.failure_count += 1
self.last_failure_time = datetime.utcnow()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
def _should_attempt_reset(self) -> bool:
"""Проверка возможности перехода в half-open."""
if self.last_failure_time is None:
return True
elapsed = datetime.utcnow() - self.last_failure_time
return elapsed.total_seconds() >= self.recovery_timeout
class CircuitBreakerOpenError(Exception):
"""Circuit breaker в состоянии open."""
# Использование
circuit_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)
async def process_with_circuit_breaker(message):
try:
async with message.process():
# Вызов внешнего сервиса через circuit breaker
await circuit_breaker.call(
external_api.process,
message.body
)
except CircuitBreakerOpenError:
# Сервис недоступен — retry позже
logger.warning("Circuit breaker open, retrying later")
await message.nack(requeue=True)
except TemporaryError:
await message.nack(requeue=True)
except PermanentError:
await message.nack(requeue=False)Создаём очередь для отложенных retry с помощью TTL:
async def setup_retry_queues(channel):
"""Настройка очередей для retry с задержкой."""
# Основная очередь
main_queue = await channel.declare_queue(
"main_queue",
durable=True,
arguments={
"x-dead-letter-exchange": "retry_exchange",
"x-dead-letter-routing-key": "retry"
}
)
# Retry очередь с TTL
retry_queue = await channel.declare_queue(
"retry_queue",
durable=True,
arguments={
"x-message-ttl": 60000, # 1 минута задержка
"x-dead-letter-exchange": "", # Back to main
"x-dead-letter-routing-key": "main"
}
)
# Обменники
main_exchange = await channel.declare_exchange(
"main_exchange",
aio_pika.ExchangeType.DIRECT,
durable=True
)
retry_exchange = await channel.declare_exchange(
"retry_exchange",
aio_pika.ExchangeType.DIRECT,
durable=True
)
# Binding
await main_queue.bind(main_exchange, routing_key="main")
await retry_queue.bind(retry_exchange, routing_key="retry")
return main_queue, retry_queue
async def send_to_retry_queue(message, retry_exchange):
"""Отправка сообщения в retry queue."""
retry_message = Message(
body=message.body,
headers=message.headers,
delivery_mode=2
)
await retry_exchange.publish(
retry_message,
routing_key="retry"
)Объединим все паттерны в единую стратегию robust consumer:
import asyncio
from datetime import datetime
from typing import Callable, Awaitable
class RobustConsumer:
def __init__(
self,
max_retries: int = 3,
base_delay: int = 1,
max_delay: int = 300
):
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
self.circuit_breakers: dict = {}
def get_circuit_breaker(self, service: str) -> CircuitBreaker:
"""Получение circuit breaker для сервиса."""
if service not in self.circuit_breakers:
self.circuit_breakers[service] = CircuitBreaker()
return self.circuit_breakers[service]
async def process(
self,
message: aio_pika.abc.AbstractIncomingMessage,
handler: Callable[[aio_pika.abc.AbstractIncomingMessage], Awaitable]
):
"""
Обработка сообщения с полной стратегией retry.
"""
try:
async with message.process():
retry_count = message.headers.get('x-retry-count', 0)
if retry_count >= self.max_retries:
logger.error(f"Max retries reached: {message.message_id}")
await self._send_to_dlq(message)
return
await handler(message)
except TemporaryError as e:
await self._handle_temporary_error(message, e)
except PermanentError as e:
logger.error(f"Permanent error: {e}")
await message.nack(requeue=False)
except CircuitBreakerOpenError as e:
logger.warning(f"Circuit breaker open: {e}")
await message.nack(requeue=True)
except Exception as e:
logger.exception(f"Unexpected error: {e}")
await message.nack(requeue=False)
async def _handle_temporary_error(
self,
message: aio_pika.abc.AbstractIncomingMessage,
error: Exception
):
"""Обработка временной ошибки с exponential backoff."""
retry_count = message.headers.get('x-retry-count', 0)
# Расчёт задержки
delay = min(
self.base_delay * (2 ** retry_count),
self.max_delay
)
# Jitter
jitter = random.uniform(0, delay * 0.1)
delay += jitter
logger.warning(
f"Temporary error: {error}. Retry {retry_count + 1} in {delay:.1f}s"
)
# Обновление заголовков
await message.update_headers({
'x-retry-count': retry_count + 1,
'x-retry-delay': int(delay * 1000), # ms
'x-last-error': str(error),
'x-last-error-time': datetime.utcnow().isoformat()
})
# Отправка в delay queue или requeue
if hasattr(self, 'delay_queue'):
await self._send_to_delay_queue(message, int(delay * 1000))
await message.ack()
else:
await message.nack(requeue=True)
async def _send_to_dlq(self, message):
"""Отправка в Dead Letter Queue."""
await message.update_headers({
'x-death-time': datetime.utcnow().isoformat(),
'x-death-reason': 'max_retries_exceeded'
})
await message.nack(requeue=False)
async def _send_to_delay_queue(self, message, delay_ms: int):
"""Отправка в очередь с задержкой."""
# Реализация зависит от настройки delay queue
passИспользуйте Prometheus для сбора метрик об ошибках:
from prometheus_client import Counter, Histogram
error_counter = Counter(
'rabbitmq_errors_total',
'Total errors',
['error_type', 'queue']
)
retry_counter = Counter(
'rabbitmq_retries_total',
'Total retries',
['queue']
)
dlq_counter = Counter(
'rabbitmq_dlq_total',
'Total messages sent to DLQ',
['queue']
)
processing_error_histogram = Histogram(
'rabbitmq_error_processing_seconds',
'Error processing time',
['error_type']
)
class MonitoredConsumer(RobustConsumer):
async def process(self, message, handler):
start_time = asyncio.get_event_loop().time()
queue_name = message.exchange.name
try:
await super().process(message, handler)
except Exception as e:
error_counter.labels(
error_type=type(e).__name__,
queue=queue_name
).inc()
raise
finally:
elapsed = asyncio.get_event_loop().time() - start_time
processing_error_histogram.observe(elapsed)Следующие рекомендации обеспечат надёжную обработку ошибок:
Логирование с полным контекстом помогает отлаживать проблемы:
logger.error(
f"Failed to process message {message.message_id}",
extra={
"routing_key": message.routing_key,
"headers": message.headers,
"body_preview": message.body[:100],
"retry_count": message.headers.get('x-retry-count', 0),
"error": str(e)
},
exc_info=True
)Мониторьте размер DLQ для своевременного обнаружения проблем:
async def check_dlq():
"""Проверка DLQ и алертинг."""
dlq_size = await get_queue_size("dead_letter_queue")
if dlq_size > 10:
send_alert(
f"DLQ size: {dlq_size}",
severity="warning" if dlq_size < 50 else "critical"
)Создайте дашборд для визуализации метрик DLQ:
# Дашборд с метриками:
# - DLQ size over time
# - Error rate by type
# - Retry success rate
# - Circuit breaker stateТестируйте обработку ошибок в CI/CD:
async def test_error_handling():
"""Тест обработки ошибок."""
# Тест временной ошибки
await publish_test_message(headers={'x-test-error': 'temporary'})
# Тест постоянной ошибки
await publish_test_message(headers={'x-test-error': 'permanent'})
# Тест max retries
await publish_test_message(headers={'x-retry-count': 3})
# Проверка что сообщения в DLQ
dlq_size = await get_queue_size("dead_letter_queue")
assert dlq_size == 1 # Только permanent errorВ следующей теме изучим Dead Letter Exchanges и отложенные сообщения.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.