Паттерны для production: обработка ошибок, retry с backoff, dead letter queue, circuit breaker.
Production-паттерны обеспечивают надёжность и отказоустойчивость Kafka-приложений. В этой теме вы изучите обработку ошибок, retry с backoff, dead letter queue и circuit breaker.
В production приложения сталкиваются с различными ошибками:
from confluent_kafka import KafkaError
import json
class OrderProcessingError(Exception):
    """Base exception for order-processing failures."""


class ValidationError(OrderProcessingError):
    """Invalid data — retrying will not help."""


class TemporaryError(OrderProcessingError):
    """Transient failure — should be retried with backoff."""


class FatalError(OrderProcessingError):
    """Critical failure — requires operator intervention."""
def classify_error(error):
    """Map an exception to an error-handling strategy.

    Returns one of:
      'poison_pill' — unprocessable data, drop or DLQ;
      'validation'  — business-rule violation, no retry;
      'temporary'   — transient failure, retry with backoff;
      'unknown'     — anything else.
    """
    if isinstance(error, json.JSONDecodeError):
        return 'poison_pill'  # malformed payload
    if isinstance(error, ValidationError):
        return 'validation'  # retrying cannot fix bad data
    if isinstance(error, (ConnectionError, TimeoutError)):
        return 'temporary'  # transient infrastructure issue
    if isinstance(error, KafkaError):
        code = error.code()
        if code in (KafkaError._MSG_TIMED_OUT, KafkaError._TRANSPORT):
            return 'temporary'
        if code == KafkaError.MSG_SIZE_TOO_LARGE:
            return 'poison_pill'
    return 'unknown'

from confluent_kafka import Producer, Consumer, KafkaError
import time
class RetryProducer:
    """Producer with an application-level retry loop on top of librdkafka's
    own internal retries ('retries' / 'retry.backoff.ms')."""

    def __init__(self):
        self.producer = Producer({
            'bootstrap.servers': 'localhost:9092',
            'retries': 3,
            'retry.backoff.ms': 100
        })

    def send_with_retry(self, topic, key, value, max_retries=3):
        """Send one message, retrying with exponential backoff.

        Returns True on success; re-raises the last error after the final
        attempt. Returns False only if the loop exits without success or raise.
        """
        for attempt in range(max_retries):
            try:
                self.producer.produce(topic, key=key, value=value)
                self.producer.flush(timeout=10)
                return True
            except KafkaError as e:
                # NOTE(review): confluent_kafka's produce() raises
                # KafkaException / BufferError rather than KafkaError —
                # confirm this except clause can actually fire.
                if attempt == max_retries - 1:
                    raise  # last retry failed
                wait_time = 0.1 * (2 ** attempt)  # exponential backoff
                print(f'Retry {attempt + 1}/{max_retries} after {wait_time}s: {e}')
                time.sleep(wait_time)
        return False

import random
import time
from functools import wraps
def retry_with_backoff(max_retries=5, base_delay=1.0, max_delay=60.0, jitter=True):
    """Decorator: retry ConnectionError/TimeoutError with exponential backoff.

    The delay doubles on every attempt and is capped at ``max_delay``;
    optional jitter spreads retries out to avoid a thundering herd.
    Once all attempts are exhausted the last exception is re-raised.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except (ConnectionError, TimeoutError) as exc:
                    last_exception = exc
                if attempt == max_retries - 1:
                    break  # out of attempts, fall through to raise
                delay = min(base_delay * (2 ** attempt), max_delay)
                if jitter:
                    # Randomize within [0.5x, 1.5x) to desynchronize clients.
                    delay *= 0.5 + random.random()
                print(f'Retry {attempt + 1}/{max_retries} after {delay:.2f}s')
                time.sleep(delay)
            raise last_exception
        return wrapper
    return decorator
# Usage example
@retry_with_backoff(max_retries=5, base_delay=1.0, max_delay=60.0)
def send_message(producer, topic, key, value):
    """Produce a single message and block until it is flushed."""
    producer.produce(topic, key=key, value=value)
    producer.flush()

from confluent_kafka import Consumer, KafkaError
import time
class RetryConsumer:
    """Consumer that retries message processing with exponential backoff.

    Offsets are committed manually, so a message is acknowledged only after
    it has been processed successfully or deliberately routed to the DLQ.
    """

    def __init__(self):
        self.consumer = Consumer({
            'bootstrap.servers': 'localhost:9092',
            'group.id': 'retry-consumer',
            'enable.auto.commit': False  # commit only after explicit handling
        })
        self.consumer.subscribe(['orders'])

    def consume_with_retry(self, max_retries=3):
        """Poll forever; retry TemporaryError, DLQ anything unrecoverable."""
        while True:
            msg = self.consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                # NOTE(review): broker/partition errors are silently skipped
                # here — consider at least logging them.
                continue
            for attempt in range(max_retries):
                try:
                    self.process_message(msg)
                    self.consumer.commit(msg)
                    break
                except TemporaryError as e:
                    if attempt == max_retries - 1:
                        # All retries failed — send to DLQ and move on.
                        self.send_to_dlq(msg, f'All retries failed: {e}')
                        self.consumer.commit(msg)
                    else:
                        wait_time = 2 ** attempt  # backoff: 1s, 2s, 4s, ...
                        print(f'Retry {attempt + 1}/{max_retries} after {wait_time}s')
                        time.sleep(wait_time)
                except ValidationError as e:
                    # Validation errors are permanent — straight to DLQ, no retry.
                    self.send_to_dlq(msg, f'Validation error: {e}')
                    self.consumer.commit(msg)
                    break
                except Exception as e:
                    # Unexpected error — log and retry; DLQ after the last attempt.
                    # NOTE(review): this branch retries immediately, without backoff.
                    print(f'Unexpected error: {e}')
                    if attempt == max_retries - 1:
                        self.send_to_dlq(msg, f'Unexpected error: {e}')
                        self.consumer.commit(msg)

    def process_message(self, msg):
        # Business-logic placeholder.
        pass

    def send_to_dlq(self, msg, reason):
        # Dead-letter-queue publishing placeholder.
        pass

Dead Letter Queue — топик для хранения сообщений, которые не удалось обработать.
from confluent_kafka.admin import AdminClient, NewTopic
# Admin client used to create the dead-letter-queue topic.
admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

# DLQ topic with a long retention so failed messages can be analysed later.
# NOTE(review): in confluent-kafka-python, NewTopic's first parameter is
# named 'topic', not 'name' — verify this keyword does not raise TypeError.
dlq_topic = NewTopic(
    name='orders-dlq',
    num_partitions=3,
    replication_factor=3,
    config={
        'retention.ms': '2592000000',  # 30 days, for post-mortem analysis
        'cleanup.policy': 'delete'
    }
)
admin.create_topics([dlq_topic])

from confluent_kafka import Producer
import json
from datetime import datetime
class DLQProducer:
    """Publishes failed messages to the 'orders-dlq' topic with metadata."""

    def __init__(self):
        self.producer = Producer({
            'bootstrap.servers': 'localhost:9092'
        })

    def send_to_dlq(self, original_msg, reason, retry_count=0):
        """Send a message to the DLQ, wrapped with diagnostic metadata.

        The DLQ record carries the original topic/partition/offset/key/value,
        the failure reason, the retry count and a UTC timestamp, so the
        message can be analysed and replayed later.
        """
        from datetime import timezone  # local import; file-level imports untouched

        dlq_message = {
            'original_topic': original_msg.topic(),
            'original_partition': original_msg.partition(),
            'original_offset': original_msg.offset(),
            'original_key': original_msg.key().decode() if original_msg.key() else None,
            # NOTE(review): assumes value is never None — a tombstone record
            # would crash here; confirm against the producers of this topic.
            'original_value': original_msg.value().decode(),
            'error_reason': str(reason),
            'retry_count': retry_count,
            # Timezone-aware UTC; datetime.utcnow() is naive and deprecated
            # since Python 3.12.
            'timestamp': datetime.now(timezone.utc).isoformat()
        }
        self.producer.produce(
            'orders-dlq',
            key=original_msg.key(),
            value=json.dumps(dlq_message).encode('utf-8'),
            headers={
                'dlq-reason': reason.encode('utf-8'),
                'original-topic': original_msg.topic().encode('utf-8')
            }
        )
        self.producer.flush()
        print(f'Sent to DLQ: {original_msg.topic()} [{original_msg.offset()}] - {reason}')

from confluent_kafka import Consumer
import json
class DLQProcessor:
    """Consumes the DLQ topic, triages failures and replays transient ones."""

    def __init__(self):
        self.consumer = Consumer({
            'bootstrap.servers': 'localhost:9092',
            'group.id': 'dlq-processor',
            'auto.offset.reset': 'earliest'
        })
        self.consumer.subscribe(['orders-dlq'])
        self.producer = Producer({
            'bootstrap.servers': 'localhost:9092'
        })

    def process_dlq(self):
        """Poll the DLQ forever and dispatch by the recorded error reason."""
        while True:
            msg = self.consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                continue
            dlq_data = json.loads(msg.value().decode('utf-8'))
            # Triage on the reason string written by the DLQ producer.
            reason = dlq_data.get('error_reason', '')
            if 'Validation error' in reason:
                # Permanent data problem — needs a human / upstream data fix.
                print(f'Validation error, manual review needed: {dlq_data}')
            elif 'Connection' in reason or 'Timeout' in reason:
                # Transient infrastructure failure — safe to replay.
                print(f'Temporary error, retrying: {dlq_data}')
                self.retry_message(dlq_data)
            else:
                # Unknown cause — just log it.
                print(f'Unknown error: {dlq_data}')

    def retry_message(self, dlq_data):
        """Re-publish the original payload to its source topic."""
        original_topic = dlq_data.get('original_topic')
        original_key = dlq_data.get('original_key')
        original_value = dlq_data.get('original_value')
        if original_value:
            self.producer.produce(
                original_topic,
                key=original_key.encode() if original_key else None,
                value=original_value.encode()
            )
            self.producer.flush()
            print(f'Retried message from {original_topic}')

Circuit Breaker предотвращает каскадные сбои, временно блокируя вызовы при множественных ошибках.
import time
from enum import Enum
from threading import Lock
class CircuitState(Enum):
    """States of the circuit breaker."""
    CLOSED = 'closed'        # normal operation, calls pass through
    OPEN = 'open'            # circuit tripped, calls are rejected
    HALF_OPEN = 'half_open'  # probing whether the dependency recovered


class CircuitBreaker:
    """Thread-safe circuit breaker.

    After ``failure_threshold`` *consecutive* failures the circuit OPENs and
    rejects calls. Once ``recovery_timeout`` seconds have elapsed it moves to
    HALF_OPEN and lets up to ``half_open_max_calls`` probe calls through; a
    success closes the circuit again.
    """

    def __init__(
        self,
        failure_threshold=5,
        recovery_timeout=30,
        half_open_max_calls=3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None
        self.half_open_calls = 0
        self._lock = Lock()

    def call(self, func, *args, **kwargs):
        """Invoke ``func`` through the breaker.

        Raises CircuitBreakerOpenError when the circuit rejects the call;
        otherwise returns func's result and re-raises its exceptions.
        """
        with self._lock:
            if self.state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitState.HALF_OPEN
                    self.half_open_calls = 0
                else:
                    raise CircuitBreakerOpenError('Circuit breaker is OPEN')
            if self.state == CircuitState.HALF_OPEN:
                if self.half_open_calls >= self.half_open_max_calls:
                    raise CircuitBreakerOpenError('Circuit breaker HALF_OPEN limit reached')
                self.half_open_calls += 1
        # Run the call outside the lock: _on_success/_on_failure re-acquire it
        # (Lock is not reentrant), and holding it here would also serialize
        # every protected call.
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _should_attempt_reset(self):
        """True when recovery_timeout has elapsed since the last failure."""
        if self.last_failure_time is None:
            return True
        return (time.time() - self.last_failure_time) >= self.recovery_timeout

    def _on_success(self):
        """Record a successful call."""
        with self._lock:
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
            # Fix: any success resets the failure counter, so the threshold
            # counts consecutive failures. (Originally the counter was reset
            # only when recovering from HALF_OPEN, so occasional failures
            # accumulated forever and eventually tripped a healthy circuit.)
            self.failure_count = 0

    def _on_failure(self):
        """Record a failed call; trip the circuit at the threshold."""
        with self._lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit breaker is open."""


from confluent_kafka import Producer, Consumer, KafkaError
class ResilientKafkaProducer:
    """Producer guarded by a circuit breaker; rejected sends are queued locally."""

    def __init__(self):
        self.producer = Producer({
            'bootstrap.servers': 'localhost:9092'
        })
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=5,
            recovery_timeout=30,
            half_open_max_calls=3
        )

    def produce(self, topic, key, value):
        """Send one message through the circuit breaker.

        Returns True on success; False when the breaker is open (the message
        is queued for a later retry) or when the send fails.
        """
        def _produce():
            self.producer.produce(topic, key=key, value=value)
            self.producer.flush(timeout=10)
        try:
            self.circuit_breaker.call(_produce)
            return True
        except CircuitBreakerOpenError:
            # Breaker open — buffer the message instead of dropping it.
            print('Circuit breaker OPEN, message queued')
            self.queue_for_retry(topic, key, value)
            return False
        except KafkaError as e:
            # NOTE(review): confluent_kafka raises KafkaException/BufferError,
            # not KafkaError — verify this clause can actually catch anything.
            print(f'Kafka error: {e}')
            return False

    def queue_for_retry(self, topic, key, value):
        """Persist the message to a local queue or database for later (stub)."""
        pass

# Usage
producer = ResilientKafkaProducer()
for i in range(100):
    producer.produce('orders', key=f'order_{i}', value=f'{{"id": {i}}}')

from confluent_kafka import Consumer, KafkaError
class ResilientConsumer:
    """Consumer with separate circuit breakers for processing and external calls."""

    def __init__(self):
        self.consumer = Consumer({
            'bootstrap.servers': 'localhost:9092',
            'group.id': 'resilient-consumer',
            'enable.auto.commit': False
        })
        self.consumer.subscribe(['orders'])
        # Circuit breaker around message processing as a whole.
        self.processing_circuit_breaker = CircuitBreaker(
            failure_threshold=5,
            recovery_timeout=60
        )
        # Circuit breaker around calls to the external service.
        self.external_service_cb = CircuitBreaker(
            failure_threshold=3,
            recovery_timeout=30
        )

    def consume(self):
        """Poll forever; commit only after the message has been handled."""
        while True:
            msg = self.consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                continue
            try:
                # Process under the circuit breaker.
                self.processing_circuit_breaker.call(
                    self.process_message,
                    msg
                )
                self.consumer.commit(msg)
            except CircuitBreakerOpenError:
                # Breaker open — divert to the DLQ and acknowledge the offset.
                print(f'Circuit breaker OPEN, skipping message')
                self.send_to_dlq(msg, 'Circuit breaker open')
                self.consumer.commit(msg)
            except Exception as e:
                print(f'Processing error: {e}')
                # No commit — the message will be redelivered and retried.

    def process_message(self, msg):
        """Handle one message, calling the external service through its breaker."""
        data = json.loads(msg.value().decode('utf-8'))
        # External call guarded by its own circuit breaker.
        result = self.external_service_cb.call(
            self.call_external_service,
            data
        )
        return result

    def call_external_service(self, data):
        # External API call placeholder.
        pass

    def send_to_dlq(self, msg, reason):
        # DLQ publishing placeholder.
        pass

Rate Limiting ограничивает количество операций в единицу времени.
import time
from threading import Lock
class TokenBucket:
    """Thread-safe token-bucket rate limiter.

    Args:
        rate: tokens added per second.
        capacity: maximum number of tokens the bucket can hold.
    """

    def __init__(self, rate=100, capacity=100):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity  # the bucket starts full
        self.last_update = time.time()
        self._lock = Lock()

    def acquire(self, tokens=1):
        """Try to take ``tokens`` tokens; return True on success, else False."""
        with self._lock:
            now = time.time()
            elapsed = now - self.last_update
            self.last_update = now
            # Refill proportionally to elapsed time, capped at capacity.
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def wait_for_token(self, tokens=1, timeout=None):
        """Block until ``tokens`` are available or ``timeout`` seconds pass.

        Returns True when acquired, False on timeout. ``timeout=None`` waits
        forever. Fix: the timeout is now checked with ``is not None`` — the
        original's truthiness test treated ``timeout=0`` as "no timeout" and
        spun forever instead of giving up immediately.
        """
        start = time.time()
        while True:
            if self.acquire(tokens):
                return True
            if timeout is not None and (time.time() - start) >= timeout:
                return False
            time.sleep(0.01)  # short pause to avoid a hot spin loop
# Usage with a Kafka producer
class RateLimitedProducer:
    """Producer wrapper that throttles sends with a token bucket."""

    def __init__(self, rate=1000):
        self.producer = Producer({'bootstrap.servers': 'localhost:9092'})
        self.rate_limiter = TokenBucket(rate=rate, capacity=rate)

    def produce(self, topic, key, value):
        """Block until the rate limiter grants a token, then send."""
        # Wait for a token (no timeout: blocks until one is available).
        self.rate_limiter.wait_for_token()
        self.producer.produce(topic, key=key, value=value)
        self.producer.poll(0)  # service delivery callbacks without blocking

    def flush(self):
        """Block until all buffered messages are delivered."""
        self.producer.flush()

Bulkhead изолирует ресурсы для предотвращения каскадных сбоев.
from concurrent.futures import ThreadPoolExecutor
import threading
class BulkheadProcessor:
    """Processes messages on a bounded thread pool (bulkhead isolation).

    A semaphore caps the number of messages being processed at once; when the
    limit is reached new messages are shed to the DLQ instead of queueing
    without bound.
    """

    def __init__(self, max_concurrent=10):
        self.executor = ThreadPoolExecutor(max_workers=max_concurrent)
        self.semaphore = threading.Semaphore(max_concurrent)

    def process(self, msg):
        """Submit msg for processing; returns a Future, or None when rejected."""
        if not self.semaphore.acquire(blocking=False):
            # Bulkhead full — shed load to the DLQ.
            self.send_to_dlq(msg, 'Bulkhead limit reached')
            return None
        try:
            future = self.executor.submit(self._process_message, msg)
        except Exception:
            self.semaphore.release()
            raise
        # Fix: release the slot when processing *finishes*, not immediately
        # after submit — the original released in `finally`, so the semaphore
        # never actually limited concurrent work.
        future.add_done_callback(lambda _f: self.semaphore.release())
        return future

    def _process_message(self, msg):
        # Business-logic placeholder.
        pass

    def send_to_dlq(self, msg, reason):
        # DLQ publishing placeholder.
        pass

# Reference production configuration for the patterns in this lesson.
PRODUCTION_CONFIG = {
    # Producer
    'producer': {
        'bootstrap.servers': 'broker1:9092,broker2:9092,broker3:9092',
        'acks': 'all',
        'retries': 5,
        'retry.backoff.ms': 100,
        'enable.idempotence': True,
        'compression.type': 'lz4'
    },
    # Consumer
    'consumer': {
        'bootstrap.servers': 'broker1:9092,broker2:9092,broker3:9092',
        'group.id': 'production-consumer',
        'enable.auto.commit': False,
        'auto.offset.reset': 'earliest',
        'session.timeout.ms': 30000,
        'max.poll.interval.ms': 300000
    },
    # Retry
    'retry': {
        'max_retries': 5,
        'base_delay': 1.0,
        'max_delay': 60.0,
        'jitter': True
    },
    # Circuit Breaker
    'circuit_breaker': {
        'failure_threshold': 5,
        'recovery_timeout': 30,
        'half_open_max_calls': 3
    },
    # Rate Limiting
    'rate_limit': {
        'rate': 1000,  # messages per second
        'capacity': 1000
    }
}

from prometheus_client import Counter, Histogram, Gauge
# Prometheus metrics
messages_processed = Counter('kafka_messages_processed_total', 'Total messages processed')
messages_failed = Counter('kafka_messages_failed_total', 'Total messages failed')
messages_dlq = Counter('kafka_messages_dlq_total', 'Total messages sent to DLQ')
retry_count = Histogram('kafka_retry_count', 'Retry count per message')
circuit_breaker_state = Gauge('kafka_circuit_breaker_state', 'Circuit breaker state', ['name'])


class MonitoredProcessor:
    """Message processor instrumented with Prometheus counters."""

    def __init__(self):
        self.circuit_breaker = CircuitBreaker()

    def process(self, msg):
        """Process msg through the circuit breaker, counting outcomes."""
        try:
            self.circuit_breaker.call(self._process, msg)
            messages_processed.inc()
        except Exception as e:
            # NOTE(review): the exception is swallowed (not re-raised), and
            # neither self._process nor self.send_to_dlq is defined on this
            # class — presumably elided in the lesson; confirm against the
            # full source.
            messages_failed.inc()
            self.send_to_dlq(msg, str(e))
            messages_dlq.inc()

В следующей теме вы изучите Мониторинг и отладку:
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.