Изоляция пулов ресурсов для предотвращения каскадных сбоев
«Не клади все яйца в одну корзину». Bulkheading изолирует ресурсы, чтобы сбой в одной части системы не затронул остальные.
Термин bulkheading (переборка) заимствован из кораблестроения. Корабли разделяются на водонепроницаемые отсеки переборками. Если корпус повреждён, вода заполняет только один отсек, а корабль остаётся на плаву.
Корабль без переборок: Корабль с переборками:
┌─────────────────┐ ┌─────┬─────┬─────┐
│ │ │ │ │ │
│ Вода │ │ Вода│ │ │
│ заполняет │ │ │ │ │
│ весь отсек │ │ │ │ │
│ │ │ │ │ │
└─────────────────┘ └─────┴─────┴─────┘
Корабль тонет Корабль плывёт
В распределённых системах bulkheading изолирует ресурсы (thread pools, connection pools, memory) для разных типов операций или клиентов.
Рассмотрим типичный сервис без изоляции:
# ❌ Плохо: один thread pool для всех операций
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=50)
def handle_request(request):
return executor.submit(process, request)
def process(request):
if request.type == 'report':
# Тяжёлая операция, занимает 30 секунд
return generate_report(request)
elif request.type == 'api':
# Быстрая API операция, 50 мс
return handle_api(request)
elif request.type == 'background':
# Фоновая задача, 5 секунд
return run_background_task(request)Сценарий сбоя:
Проблема: тяжёлые операции монополизируют общий ресурс, блокируя лёгкие операции.
# ✅ Хорошо: отдельные thread pools для разных типов операций
from concurrent.futures import ThreadPoolExecutor
# Изолированные пулы
api_executor = ThreadPoolExecutor(max_workers=40) # Критичные API
report_executor = ThreadPoolExecutor(max_workers=8) # Тяжёлые отчёты
background_executor = ThreadPoolExecutor(max_workers=2) # Фоновые задачи
def handle_request(request):
if request.type == 'report':
return report_executor.submit(generate_report, request)
elif request.type == 'api':
return api_executor.submit(handle_api, request)
elif request.type == 'background':
return background_executor.submit(run_background_task, request)Преимущества:
report_executor, API всё ещё имеет 40 workersИзоляция на уровне потоков выполнения.
import asyncio
from concurrent.futures import ThreadPoolExecutor
class IsolatedExecutors:
"""Набор изолированных executor для разных типов задач."""
def __init__(self):
# Критичные HTTP запросы — больше всего workers
self.http_executor = ThreadPoolExecutor(
max_workers=100,
thread_name_prefix='http_worker'
)
# Database операции — отдельный пул
self.db_executor = ThreadPoolExecutor(
max_workers=50,
thread_name_prefix='db_worker'
)
# CPU-bound задачи — ограниченный пул
self.cpu_executor = ThreadPoolExecutor(
max_workers=4, # Обычно = количеству CPU cores
thread_name_prefix='cpu_worker'
)
# External API calls — отдельный пул с таймаутами
self.external_executor = ThreadPoolExecutor(
max_workers=20,
thread_name_prefix='external_worker'
)
# Использование
executors = IsolatedExecutors()
async def process_user_request(user_id):
loop = asyncio.get_event_loop()
# HTTP запросы не блокируют DB операции
http_result = await loop.run_in_executor(
executors.http_executor,
fetch_user_data,
user_id
)
# DB операции изолированы
db_result = await loop.run_in_executor(
executors.db_executor,
save_to_database,
http_result
)
return db_resultИзоляция соединений с базами данных и внешними сервисами.
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
# ❌ Плохо: один connection pool для всех операций
shared_engine = create_engine(
'postgresql://localhost/mydb',
poolclass=QueuePool,
pool_size=50, # 50 соединений на всё
max_overflow=10
)
# ✅ Хорошо: отдельные pools для разных типов операций
read_engine = create_engine(
'postgresql://localhost/mydb',
poolclass=QueuePool,
pool_size=30, # 30 соединений для read
max_overflow=10,
echo=False
)
write_engine = create_engine(
'postgresql://localhost/mydb',
poolclass=QueuePool,
pool_size=15, # 15 соединений для write
max_overflow=5,
echo=False
)
analytics_engine = create_engine(
'postgresql://localhost/mydb',
poolclass=QueuePool,
pool_size=5, # 5 соединений для аналитики
max_overflow=0, # Без overflow для тяжёлых query
echo=False
)
# Использование
def get_user(user_id):
# Использует read pool
with read_engine.connect() as conn:
return conn.execute('SELECT * FROM users WHERE id = ?', user_id)
def create_user(data):
# Использует write pool
with write_engine.connect() as conn:
return conn.execute('INSERT INTO users VALUES (?)', data)
def generate_analytics():
# Использует analytics pool — не затронет read/write
with analytics_engine.connect() as conn:
return conn.execute('SELECT complex_aggregation(...)')Изоляция памяти для разных компонентов.
import tracemalloc
class MemoryBulkhead:
"""
Ограничивает память для операции.
Если операция превышает лимит — принудительно завершается.
"""
def __init__(self, max_memory_mb=100):
self.max_memory_bytes = max_memory_mb * 1024 * 1024
def execute_with_limit(self, func, *args, **kwargs):
tracemalloc.start()
try:
result = func(*args, **kwargs)
current, peak = tracemalloc.get_traced_memory()
if peak > self.max_memory_bytes:
raise MemoryError(
f"Operation exceeded memory limit: "
f"{peak / 1024 / 1024:.2f}MB > {self.max_memory_bytes / 1024 / 1024}MB"
)
return result
finally:
tracemalloc.stop()
# Использование
memory_bulkhead = MemoryBulkhead(max_memory_mb=50)
def process_large_file(filepath):
# Если функция использует >50MB, будет MemoryError
return memory_bulkhead.execute_with_limit(
_process_file_impl,
filepath
)Изоляция ресурсов по клиентам или тенантам.
from collections import defaultdict
from threading import Semaphore
class PerClientBulkhead:
"""
Изолирует ресурсы для каждого клиента.
Предотвращает ситуацию, когда один клиент монополизирует все ресурсы.
"""
def __init__(self, max_clients=1000, max_requests_per_client=10):
self.max_clients = max_clients
self.max_requests_per_client = max_requests_per_client
# Семафоры для каждого клиента
self._client_semaphores = defaultdict(
lambda: Semaphore(max_requests_per_client)
)
# Ограничение на общее количество клиентов
self._active_clients = set()
self._clients_lock = threading.Lock()
def acquire(self, client_id):
"""Получить разрешение на выполнение запроса для клиента."""
with self._clients_lock:
if len(self._active_clients) >= self.max_clients:
# Слишком много активных клиентов
# Можно вытеснить наименее активного
pass
self._active_clients.add(client_id)
semaphore = self._client_semaphores[client_id]
acquired = semaphore.acquire(blocking=False)
if not acquired:
raise RateLimitExceeded(
f"Client {client_id} exceeded limit of {self.max_requests_per_client} concurrent requests"
)
return BulkheadHandle(self, client_id)
def release(self, client_id):
"""Освободить ресурс после завершения запроса."""
semaphore = self._client_semaphores[client_id]
semaphore.release()
class BulkheadHandle:
def __init__(self, bulkhead, client_id):
self.bulkhead = bulkhead
self.client_id = client_id
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.bulkhead.release(self.client_id)
# Использование
client_bulkhead = PerClientBulkhead(
max_clients=1000,
max_requests_per_client=5
)
def handle_request(client_id, request):
with client_bulkhead.acquire(client_id):
# Запрос выполняется в рамках лимита клиента
return process(request)Kubernetes предоставляет изоляцию на уровне инфраструктуры.
# Resource quota для namespace
apiVersion: v1
kind: ResourceQuota
metadata:
name: critical-service-quota
namespace: critical
spec:
hard:
requests.cpu: "10"
requests.memory: 20Gi
limits.cpu: "20"
limits.memory: 40Gi
pods: "50"# Ограничения для контейнеров
apiVersion: v1
kind: LimitRange
metadata:
name: container-limits
namespace: critical
spec:
limits:
- type: Container
default:
cpu: "500m"
memory: "512Mi"
defaultRequest:
cpu: "100m"
memory: "128Mi"
max:
cpu: "2"
memory: "4Gi"
min:
cpu: "50m"
memory: "64Mi"# Гарантирует минимальное количество доступных подов
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: critical-pdb
namespace: critical
spec:
minAvailable: 3
selector:
matchLabels:
app: critical-service# Приоритет для критичных подов
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
name: critical-priority
value: 1000000
globalDefault: false
description: "Высокий приоритет для критичных сервисов"# Под с высоким приоритетом
apiVersion: v1
kind: Pod
metadata:
name: critical-pod
spec:
priorityClassName: critical-priority
containers:
- name: app
image: myapp:latest
resources:
requests:
cpu: "500m"
memory: "512Mi"
limits:
cpu: "1"
memory: "1Gi"Комбинация bulkhead + circuit breaker даёт максимальную защиту.
from pybreaker import CircuitBreaker
class BulkheadedService:
"""
Сервис с bulkhead и circuit breaker.
Bulkhead изолирует ресурсы, circuit breaker защищает от cascade failure.
"""
def __init__(self, name, max_workers=10, failure_threshold=5):
self.name = name
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.circuit_breaker = CircuitBreaker(
fail_max=failure_threshold,
reset_timeout=30
)
def execute(self, func, *args, **kwargs):
"""Выполняет функцию с защитой bulkhead и circuit breaker."""
# Circuit breaker проверяет состояние
@self.circuit_breaker
def protected_call():
# Bulkhead выполняет в изолированном пуле
future = self.executor.submit(func, *args, **kwargs)
return future.result(timeout=30)
return protected_call()
# Использование
payment_service = BulkheadedService(
name='payment',
max_workers=20,
failure_threshold=5
)
notification_service = BulkheadedService(
name='notification',
max_workers=10,
failure_threshold=3
)
# Сбой в notification не затронет payment
# благодаря изоляции bulkhead# Критичные операции — больше ресурсов, выше приоритет
CRITICAL_POOL = ThreadPoolExecutor(max_workers=50) # API, платежи
# Обычные операции — стандартные ресурсы
DEFAULT_POOL = ThreadPoolExecutor(max_workers=20) # Обычные запросы
# Фоновые задачи — минимум ресурсов
BACKGROUND_POOL = ThreadPoolExecutor(max_workers=5) # Отчёты, уведомленияfrom concurrent.futures import TimeoutError as FuturesTimeoutError
def execute_with_timeout(executor, func, timeout, *args, **kwargs):
"""Выполняет функцию с таймаутом в рамках bulkhead."""
future = executor.submit(func, *args, **kwargs)
try:
return future.result(timeout=timeout)
except FuturesTimeoutError:
future.cancel()
raise TimeoutError(f"Operation timed out after {timeout}s")from prometheus_client import Gauge
THREAD_POOL_SIZE = Gauge('thread_pool_size', 'Current pool size', ['pool_name'])
THREAD_POOL_ACTIVE = Gauge('thread_pool_active', 'Active threads', ['pool_name'])
THREAD_POOL_QUEUE = Gauge('thread_pool_queue', 'Queued tasks', ['pool_name'])
class MonitoredExecutor(ThreadPoolExecutor):
def __init__(self, name, *args, **kwargs):
super().__init__(*args, **kwargs)
self.name = name
def submit(self, fn, *args, **kwargs):
THREAD_POOL_SIZE.labels(pool_name=self.name).set(self._max_workers)
THREAD_POOL_ACTIVE.labels(pool_name=self.name).set(len(self._threads))
return super().submit(fn, *args, **kwargs)# Алерт: thread pool заполнен более 5 минут
# expr: thread_pool_active / thread_pool_size > 0.9 for 5m
# Алерт: очередь пулов растёт
# expr: rate(thread_pool_queue[5m]) > 10
# Алерт: connection pool исчерпан
# expr: db_pool_connections_used / db_pool_connections_max > 0.95В следующей теме рассмотрим стратегии кэширования для снижения latency и нагрузки на backend.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.