Паттерн автоматического отключения неисправных сервисов
«Лучшая защита — нападение». Circuit breaker автоматически отключает неисправные сервисы, предотвращая каскадные сбои.
В микросервисной архитектуре сервисы зависят друг от друга:
Frontend → API Gateway → Order Service → Payment Service → Database
↓
Inventory Service
Если Payment Service замедляется или падает:
Без circuit breaker система продолжает отправлять запросы к мёртвому сервису, усугубляя проблему.
Circuit Breaker (автоматический выключатель) — паттерн, который автоматически прекращает запросы к неисправному сервису на определённое время.
Заимствован из электротехники: автоматический выключатель размыкает цепь при перегрузке, предотвращая пожар.
[CLOSED]
│
│ failure_rate > threshold
▼
[OPEN] ────┐
│ │
│ │ sleep_time elapsed
│ ▼
[HALF_OPEN]
│
│ success → CLOSED
│ failure → OPEN
▼
# CLOSED: все запросы выполняются
response = payment_service.charge(amount) # ✅ Успех
response = payment_service.charge(amount) # ✅ Успех
response = payment_service.charge(amount) # ❌ Ошибка
# ... ещё ошибки ...
# failure_rate достиг 50% → переход в OPEN# OPEN: запросы сразу отклоняются
try:
response = payment_service.charge(amount)
except CircuitBreakerOpenError:
# Используем fallback
response = use_fallback_payment()# HALF_OPEN: один тестовый запрос
response = payment_service.charge(amount) # ✅ Успех
# → переход в CLOSED, нормальная работа возобновляется
response = payment_service.charge(amount) # ❌ Ошибка
# → возврат в OPEN, ждём ещёimport time
from enum import Enum
from threading import Lock
from collections import deque
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreakerError(Exception):
"""Базовое исключение circuit breaker."""
pass
class CircuitBreakerOpenError(CircuitBreakerError):
"""Запрос отклонён, так как circuit breaker в состоянии OPEN."""
pass
class CircuitBreaker:
"""
Circuit breaker с настраиваемыми параметрами.
Args:
failure_threshold: Количество ошибок для перехода в OPEN
success_threshold: Количество успехов для перехода в CLOSED из HALF_OPEN
timeout: Время в секундах до перехода из OPEN в HALF_OPEN
expected_exceptions: Исключения, которые считаются ошибками
"""
def __init__(
self,
failure_threshold=5,
success_threshold=2,
timeout=30,
expected_exceptions=(Exception,)
):
self.failure_threshold = failure_threshold
self.success_threshold = success_threshold
self.timeout = timeout
self.expected_exceptions = expected_exceptions
self._state = CircuitState.CLOSED
self._failure_count = 0
self._success_count = 0
self._last_failure_time = None
self._lock = Lock()
@property
def state(self):
with self._lock:
self._check_state_transition()
return self._state
def _check_state_transition(self):
"""Проверяет, нужно ли перейти из OPEN в HALF_OPEN."""
if self._state == CircuitState.OPEN:
if self._last_failure_time is None:
return
elapsed = time.time() - self._last_failure_time
if elapsed >= self.timeout:
self._state = CircuitState.HALF_OPEN
self._success_count = 0
self._failure_count = 0
def call(self, func, *args, **kwargs):
"""
Выполняет функцию с защитой circuit breaker.
Raises:
CircuitBreakerOpenError: Если circuit breaker в состоянии OPEN
"""
with self._lock:
self._check_state_transition()
if self._state == CircuitState.OPEN:
raise CircuitBreakerOpenError(
f"Circuit breaker is OPEN, try again after {self.timeout}s"
)
try:
result = func(*args, **kwargs)
self._on_success()
return result
except self.expected_exceptions as e:
self._on_failure()
raise
def _on_success(self):
"""Обработка успешного запроса."""
with self._lock:
if self._state == CircuitState.HALF_OPEN:
self._success_count += 1
if self._success_count >= self.success_threshold:
self._state = CircuitState.CLOSED
self._failure_count = 0
elif self._state == CircuitState.CLOSED:
# Сбрасываем счётчик ошибок при успехе
self._failure_count = 0
def _on_failure(self):
"""Обработка ошибки."""
with self._lock:
self._failure_count += 1
self._last_failure_time = time.time()
if self._state == CircuitState.HALF_OPEN:
# Любая ошибка в HALF_OPEN возвращает в OPEN
self._state = CircuitState.OPEN
elif self._state == CircuitState.CLOSED:
if self._failure_count >= self.failure_threshold:
self._state = CircuitState.OPEN
def reset(self):
"""Сбрасывает circuit breaker в CLOSED."""
with self._lock:
self._state = CircuitState.CLOSED
self._failure_count = 0
self._success_count = 0
self._last_failure_time = None
# Декоратор для удобного использования
def circuit_breaker(cb: CircuitBreaker):
def decorator(func):
def wrapper(*args, **kwargs):
return cb.call(func, *args, **kwargs)
return wrapper
return decorator
# Пример использования
payment_cb = CircuitBreaker(
failure_threshold=5, # 5 ошибок → OPEN
success_threshold=2, # 2 успеха → CLOSED
timeout=30, # 30 секунд в OPEN
expected_exceptions=(TimeoutError, ConnectionError)
)
@circuit_breaker(payment_cb)
def charge_payment(amount):
response = requests.post(
'http://payment-service/charge',
json={'amount': amount},
timeout=5.0
)
response.raise_for_status()
return response.json()
# Вызов с fallback
def charge_with_fallback(amount):
try:
return charge_payment(amount)
except CircuitBreakerOpenError:
# Circuit breaker открыт — используем fallback
logger.warning("Payment service unavailable, using fallback")
return queue_payment_for_later(amount)
except (TimeoutError, ConnectionError) as e:
# Ошибка соединения — retry или fallback
logger.warning(f"Payment error: {e}")
return queue_payment_for_later(amount)Базовая реализация считает все ошибки подряд. Более продвинутая версия использует скользящее окно (sliding window).
from collections import deque
import time
class SlidingWindowCircuitBreaker:
"""
Circuit breaker со скользящим окном для подсчёта ошибок.
Считает процент ошибок за последние N запросов или за временное окно.
"""
def __init__(
self,
failure_rate_threshold=0.5, # 50% ошибок
window_size=10, # Последние 10 запросов
timeout=30,
min_requests=5 # Минимум запросов для оценки
):
self.failure_rate_threshold = failure_rate_threshold
self.window_size = window_size
self.timeout = timeout
self.min_requests = min_requests
self._state = CircuitState.CLOSED
self._results = deque() # (timestamp, success: bool)
self._last_failure_time = None
self._lock = Lock()
def _cleanup_window(self):
"""Удаляет старые записи за окном."""
cutoff = time.time() - 60 # Окно 60 секунд
while self._results and self._results[0][0] < cutoff:
self._results.popleft()
def _get_failure_rate(self):
"""Вычисляет процент ошибок за окном."""
self._cleanup_window()
if len(self._results) < self.min_requests:
return 0.0
failures = sum(1 for _, success in self._results if not success)
return failures / len(self._results)
def _record_result(self, success):
"""Записывает результат запроса."""
self._results.append((time.time(), success))
# Ограничиваем размер окна
while len(self._results) > self.window_size:
self._results.popleft()
def call(self, func, *args, **kwargs):
with self._lock:
# Проверка перехода из OPEN в HALF_OPEN
if self._state == CircuitState.OPEN:
if self._last_failure_time:
elapsed = time.time() - self._last_failure_time
if elapsed >= self.timeout:
self._state = CircuitState.HALF_OPEN
else:
raise CircuitBreakerOpenError("Circuit breaker is OPEN")
if self._state == CircuitState.OPEN:
raise CircuitBreakerOpenError("Circuit breaker is OPEN")
try:
result = func(*args, **kwargs)
self._on_success()
return result
except (TimeoutError, ConnectionError) as e:
self._on_failure()
raise
def _on_success(self):
with self._lock:
self._record_result(True)
if self._state == CircuitState.HALF_OPEN:
self._state = CircuitState.CLOSED
def _on_failure(self):
with self._lock:
self._record_result(False)
self._last_failure_time = time.time()
if self._state == CircuitState.HALF_OPEN:
self._state = CircuitState.OPEN
elif self._state == CircuitState.CLOSED:
failure_rate = self._get_failure_rate()
if failure_rate >= self.failure_rate_threshold:
self._state = CircuitState.OPENCircuit breaker отклоняет запросы в состоянии OPEN. Что делать вместо этого?
def get_product_with_fallback(product_id):
try:
return product_service.get(product_id)
except CircuitBreakerOpenError:
# Возвращаем данные из кэша
cached = redis.get(f"product:{product_id}")
if cached:
return json.loads(cached)
raise ProductUnavailableError("Product service unavailable")def get_recommendations_with_fallback(user_id):
try:
return recommendation_service.get(user_id)
except CircuitBreakerOpenError:
# Возвращаем популярные товары вместо персональных
return get_popular_items(limit=10)def charge_payment_with_fallback(amount):
try:
return payment_service.charge(amount)
except CircuitBreakerOpenError:
# Сохраняем в очередь для обработки позже
payment_queue.enqueue({
'amount': amount,
'timestamp': time.time()
})
return {'status': 'queued', 'message': 'Payment will be processed later'}def process_payment_with_fallback(amount):
try:
return primary_payment_service.charge(amount)
except CircuitBreakerOpenError:
# Переключаемся на резервный сервис
return backup_payment_service.charge(amount)def get_checkout_page_with_fallback(cart_id):
"""Собирает страницу checkout с деградацией."""
cart = cart_service.get(cart_id)
# Пытаемся получить все данные, с fallback при недоступности
try:
shipping_options = shipping_service.get_options(cart.address)
except CircuitBreakerOpenError:
shipping_options = DEFAULT_SHIPPING_OPTIONS
try:
tax_amount = tax_service.calculate(cart.total)
except CircuitBreakerOpenError:
tax_amount = estimate_tax(cart.total) # Приблизительный расчёт
try:
promotions = promotion_service.get_active(cart.user_id)
except CircuitBreakerOpenError:
promotions = [] # Без промокодов
return render_checkout(
cart=cart,
shipping=shipping_options,
tax=tax_amount,
promotions=promotions
)| Сценарий | Рекомендуемое значение | Обоснование |
|---|---|---|
| Критичный сервис (платежи) | 3-5 | Быстрая реакция на сбои |
| Стабильный внутренний сервис | 5-10 | Допускаем временные сбои |
| Внешний API (нестабильный) | 3-5 | Быстрая защита от внешних проблем |
| Высокий трафик (>1000 RPS) | Используйте % ошибок | Абсолютное число не подходит |
| Сценарий | Рекомендуемое значение |
|---|---|
| Быстровосстанавливаемый сервис | 10-30 секунд |
| Сервис с долгим рестартом | 1-5 минут |
| Внешний API | 30-60 секунд |
| База данных | 30-120 секунд |
from prometheus_client import Counter, Gauge
# Метрики для мониторинга
CIRCUIT_BREAKER_STATE = Gauge(
'circuit_breaker_state',
'Current state of circuit breaker (0=closed, 1=open, 2=half_open)',
['service']
)
CIRCUIT_BREAKER_FAILURES = Counter(
'circuit_breaker_failures_total',
'Total number of failures recorded',
['service']
)
CIRCUIT_BREAKER_TRIPS = Counter(
'circuit_breaker_trips_total',
'Total number of times circuit breaker opened',
['service']
)
class MonitoredCircuitBreaker(CircuitBreaker):
def __init__(self, service_name, *args, **kwargs):
super().__init__(*args, **kwargs)
self.service_name = service_name
def _on_success(self):
super()._on_success()
CIRCUIT_BREAKER_STATE.labels(service=self.service_name).set(
list(CircuitState).index(self._state)
)
def _on_failure(self):
super()._on_failure()
CIRCUIT_BREAKER_FAILURES.labels(service=self.service_name).inc()
CIRCUIT_BREAKER_STATE.labels(service=self.service_name).set(
list(CircuitState).index(self._state)
)
if self._state == CircuitState.OPEN:
CIRCUIT_BREAKER_TRIPS.labels(service=self.service_name).inc()
# Использование
payment_cb = MonitoredCircuitBreaker(
service_name='payment_service',
failure_threshold=5,
timeout=30
)Circuit breaker добавляет сложность. Используйте для:
Не используйте для:
# ❌ Плохо: circuit breaker без fallback
try:
return payment_service.charge(amount)
except CircuitBreakerOpenError:
raise # Клиент получает ошибку
# ✅ Хорошо: с fallback
try:
return payment_service.charge(amount)
except CircuitBreakerOpenError:
return queue_payment_for_later(amount)def test_circuit_breaker_opens_on_failures():
cb = CircuitBreaker(failure_threshold=3, timeout=10)
# Симулируем 3 ошибки
for _ in range(3):
with pytest.raises(TimeoutError):
cb.call(lambda: (_ for _ in ()).throw(TimeoutError()))
assert cb.state == CircuitState.OPEN
# Запрос должен сразу отклоняться
with pytest.raises(CircuitBreakerOpenError):
cb.call(lambda: "success")# Алерт: circuit breaker открыт более 5 минут
# Severity: warning
# Алерт: circuit breaker постоянно открывается/закрывается (flapping)
# Severity: critical
# Алерт: все circuit breaker к сервису X открыты
# Severity: critical — сервис X полностью недоступенВ следующей теме рассмотрим Bulkheading — паттерн изоляции ресурсов для локализации сбоев.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.