Rate limiting, load shedding, adaptive concurrency control
«Лучше отклонить часть запросов, чем потерять всё». Адаптивные системы жертвуют частью нагрузки ради стабильности.
Нормальная нагрузка:
Запросы: 1000 RPS → Система: 1000 RPS, latency: 50 мс
Перегрузка:
Запросы: 5000 RPS → Система: 1000 RPS, latency: 2000 мс, errors: 50%
Система работает на пределе, но пытается принять всё → деградация для всех
Решение: адаптивные системы отклоняют избыточные запросы, сохраняя latency для остальных.
Rate limiting ограничивает количество запросов от клиента или к сервису.
import time
from collections import defaultdict
class FixedWindowRateLimiter:
"""
Rate limiter с фиксированными окнами.
Просто реализуется, но имеет проблему на границах окон.
"""
def __init__(self, max_requests=100, window_seconds=60):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.counters = defaultdict(int) # {window_key: count}
self.windows = defaultdict(int) # {client_id: window_key}
def _get_window_key(self):
"""Получает ключ текущего окна."""
return int(time.time() // self.window_seconds)
def allow_request(self, client_id):
"""Проверяет, разрешён ли запрос."""
current_window = self._get_window_key()
# Сброс, если новое окно
if self.windows.get(client_id) != current_window:
self.counters[(client_id, current_window)] = 0
self.windows[client_id] = current_window
# Проверка лимита
key = (client_id, current_window)
if self.counters[key] >= self.max_requests:
return False
self.counters[key] += 1
return True
# Использование
limiter = FixedWindowRateLimiter(max_requests=100, window_seconds=60)
def handle_request(client_id):
if limiter.allow_request(client_id):
return process(client_id)
else:
return {'status': 429, 'message': 'Rate limit exceeded'}Проблема на границах:
Окно 1: [0-60с] Окно 2: [60-120с]
T=59с: 100 запросов (лимит исчерпан)
T=61с: 100 запросов (новое окно)
Итого: 200 запросов за 2 секунды!
from collections import deque
import time
class SlidingWindowLogRateLimiter:
"""
Rate limiter со скользящим окном (лог запросов).
Точный, но требует больше памяти.
"""
def __init__(self, max_requests=100, window_seconds=60):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.logs = defaultdict(deque) # {client_id: [timestamps]}
def allow_request(self, client_id):
now = time.time()
window_start = now - self.window_seconds
# Удаляем старые записи за окном
log = self.logs[client_id]
while log and log[0] < window_start:
log.popleft()
# Проверка лимита
if len(log) >= self.max_requests:
return False
log.append(now)
return True
# Использование
limiter = SlidingWindowLogRateLimiter(max_requests=100, window_seconds=60)
# Точный подсчёт за любые 60 секунд
# Нет проблемы на границах оконПреимущество: точный подсчёт за любое скользящее окно. Недостаток: память O(N) для логов.
class SlidingWindowCounterRateLimiter:
"""
Rate limiter со скользящим окном (аппроксимация).
Компромисс между точностью и памятью.
"""
def __init__(self, max_requests=100, window_seconds=60):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.current_window = defaultdict(int)
self.previous_window = defaultdict(int)
self.current_window_key = defaultdict(int)
def _get_window_key(self):
return int(time.time() // self.window_seconds)
def allow_request(self, client_id):
now = time.time()
current_key = self._get_window_key()
# Сброс окон
if self.current_window_key[client_id] != current_key:
self.previous_window[client_id] = self.current_window[client_id]
self.current_window[client_id] = 0
self.current_window_key[client_id] = current_key
# Вычисляем вес предыдущего окна
window_progress = (now % self.window_seconds) / self.window_seconds
previous_weight = 1 - window_progress
# Аппроксимация: current + previous * weight
estimated_count = (
self.current_window[client_id] +
self.previous_window[client_id] * previous_weight
)
if estimated_count >= self.max_requests:
return False
self.current_window[client_id] += 1
return True
# Использование
# T=59с: 100 запросов
# T=61с: window_progress=0.017, previous_weight=0.983
# estimated = 0 + 100 * 0.983 = 98.3 ≈ 98
# Можно ещё 2 запросаimport time
class TokenBucketRateLimiter:
"""
Token bucket rate limiter.
Позволяет кратковременные всплески (burst), но ограничивает среднюю скорость.
"""
def __init__(self, capacity=100, refill_rate=10):
"""
Args:
capacity: Максимальное количество токенов (burst size)
refill_rate: Токенов в секунду (sustainable rate)
"""
self.capacity = capacity
self.refill_rate = refill_rate
self.buckets = {} # {client_id: (tokens, last_update)}
def _refill(self, client_id):
"""Пополняет bucket токенами."""
now = time.time()
if client_id not in self.buckets:
self.buckets[client_id] = (self.capacity, now)
return
tokens, last_update = self.buckets[client_id]
elapsed = now - last_update
# Добавляем токены за прошедшее время
new_tokens = min(self.capacity, tokens + elapsed * self.refill_rate)
self.buckets[client_id] = (new_tokens, now)
def allow_request(self, client_id):
self._refill(client_id)
tokens, last_update = self.buckets[client_id]
if tokens < 1:
return False
self.buckets[client_id] = (tokens - 1, last_update)
return True
def get_tokens(self, client_id):
"""Возвращает текущее количество токенов."""
self._refill(client_id)
return self.buckets.get(client_id, (self.capacity, time.time()))[0]
# Использование
limiter = TokenBucketRateLimiter(capacity=100, refill_rate=10)
# Burst: можно 100 запросов сразу (если bucket полный)
# Затем: максимум 10 запросов в секунду (refill rate)
# Пример:
# T=0: bucket = 100 токенов
# T=0: 100 запросов → bucket = 0
# T=1: bucket = 10 (refill 10 токенов/сек)
# T=1: 10 запросов → bucket = 0Преимущества:
import time
from collections import deque
class LeakyBucketRateLimiter:
"""
Leaky bucket rate limiter.
Сглаживает всплески, выдавая запросы с фиксированной скоростью.
"""
def __init__(self, capacity=100, leak_rate=10):
"""
Args:
capacity: Максимальный размер очереди (bucket size)
leak_rate: Запросов в секунду (выходная скорость)
"""
self.capacity = capacity
self.leak_rate = leak_rate
self.buckets = defaultdict(deque) # {client_id: [request_times]}
def _leak(self, client_id):
"""Удаляет обработанные запросы."""
now = time.time()
bucket = self.buckets[client_id]
# Вычисляем, сколько запросов «вытекло»
while bucket and bucket[0] <= now:
bucket.popleft()
def allow_request(self, client_id):
now = time.time()
bucket = self.buckets[client_id]
# Вычисляем время следующего «вытекания»
if bucket:
next_leak_time = bucket[0] + 1 / self.leak_rate
if now < next_leak_time:
# Bucket ещё не «вытек»
if len(bucket) >= self.capacity:
return False
bucket.append(now + 1 / self.leak_rate)
return True
# Использование
limiter = LeakyBucketRateLimiter(capacity=100, leak_rate=10)
# В отличие от token bucket, leaky bucket не позволяет burst
# Запросы обрабатываются равномерно: 10 запросов в секундуRate limiting ограничивает клиентов. Load shedding отклоняет запросы при перегрузке системы.
import os
import time
class CPUBasedLoadShedder:
"""
Load shedder на основе загрузки CPU.
При высокой CPU отклоняет часть запросов.
"""
def __init__(self, threshold=0.8, shed_rate=0.5):
"""
Args:
threshold: Порог CPU (0.8 = 80%)
shed_rate: Процент отклоняемых запросов при превышении
"""
self.threshold = threshold
self.shed_rate = shed_rate
def get_cpu_usage(self):
"""Получает загрузку CPU."""
return os.getloadavg()[0] / os.cpu_count()
def should_accept(self):
"""Решает, принять ли запрос."""
cpu_usage = self.get_cpu_usage()
if cpu_usage < self.threshold:
return True
# Прогрессивное отклонение
excess = (cpu_usage - self.threshold) / (1 - self.threshold)
actual_shed_rate = min(1.0, self.shed_rate * excess)
import random
return random.random() > actual_shed_rate
# Использование
shedder = CPUBasedLoadShedder(threshold=0.8, shed_rate=0.5)
def handle_request(request):
if not shedder.should_accept():
return {'status': 503, 'message': 'Service overloaded'}
return process(request)import time
from collections import deque
class LatencyBasedLoadShedder:
"""
Load shedder на основе latency.
При росте latency отклоняет часть запросов.
"""
def __init__(self, p95_threshold=200, shed_rate=0.5, window_size=100):
"""
Args:
p95_threshold: Порог p95 latency (мс)
shed_rate: Процент отклоняемых запросов при превышении
window_size: Размер окна для расчёта p95
"""
self.p95_threshold = p95_threshold
self.shed_rate = shed_rate
self.window_size = window_size
self.latencies = deque(maxlen=window_size)
def record_latency(self, latency_ms):
"""Записывает latency."""
self.latencies.append(latency_ms)
def _calculate_p95(self):
"""Вычисляет p95 latency."""
if len(self.latencies) < 10:
return 0
sorted_latencies = sorted(self.latencies)
p95_idx = int(len(sorted_latencies) * 0.95)
return sorted_latencies[p95_idx]
def should_accept(self):
"""Решает, принять ли запрос."""
p95 = self._calculate_p95()
if p95 < self.p95_threshold:
return True
# Прогрессивное отклонение
excess = (p95 - self.p95_threshold) / self.p95_threshold
actual_shed_rate = min(1.0, self.shed_rate * excess)
import random
return random.random() > actual_shed_rate
# Использование
shedder = LatencyBasedLoadShedder(p95_threshold=200, shed_rate=0.5)
def handle_request(request):
start = time.time()
if not shedder.should_accept():
return {'status': 503, 'message': 'Service overloaded'}
try:
result = process(request)
return result
finally:
latency_ms = (time.time() - start) * 1000
shedder.record_latency(latency_ms)import asyncio
from enum import Enum
class Priority(Enum):
HIGH = 1
MEDIUM = 2
LOW = 3
class PriorityLoadShedder:
"""
Load shedder с приоритетами.
При перегрузке отклоняет запросы с низким приоритетом.
"""
def __init__(self, max_queue_size=1000):
self.max_queue_size = max_queue_size
self.queue_size = 0
def should_accept(self, priority: Priority):
"""Решает, принять ли запрос на основе приоритета."""
queue_ratio = self.queue_size / self.max_queue_size
if queue_ratio < 0.5:
# Очередь менее чем наполовину — принимаем всё
return True
if queue_ratio < 0.7:
# 50-70% — отклоняем LOW
return priority != Priority.LOW
if queue_ratio < 0.9:
# 70-90% — отклоняем LOW и MEDIUM
return priority == Priority.HIGH
# >90% — отклоняем всё
return False
def enter_queue(self):
self.queue_size += 1
def leave_queue(self):
self.queue_size -= 1
# Использование
shedder = PriorityLoadShedder(max_queue_size=1000)
async def handle_request(request, priority: Priority):
if not shedder.should_accept(priority):
return {'status': 503, 'message': f'Service overloaded, {priority.name} rejected'}
shedder.enter_queue()
try:
return await process(request)
finally:
shedder.leave_queue()Автоматическая регулировка количества параллельных запросов.
import asyncio
import time
from collections import deque
class AdaptiveConcurrencyLimiter:
"""
Адаптивный ограничитель конкурентности.
Основан на алгоритме Netflix Concurrency Limits.
Принцип:
- Увеличивает concurrency limit, пока latency растёт медленно
- Уменьшает при резком скачке latency (признак насыщения)
"""
def __init__(self, initial_limit=10, min_limit=5, max_limit=100):
self.concurrency_limit = initial_limit
self.min_limit = min_limit
self.max_limit = max_limit
self.active_requests = 0
self.lock = asyncio.Lock()
# История для адаптации
self.latency_history = deque(maxlen=100)
self.min_rtt = float('inf') # Минимальная latency
self.smoothed_rtt = None
async def acquire(self):
"""Получает разрешение на выполнение запроса."""
async with self.lock:
while self.active_requests >= self.concurrency_limit:
# Ждём, пока освободится слот
await asyncio.sleep(0.01)
self.active_requests += 1
def release(self, latency_ms: float):
"""
Освобождает слот и обновляет limit на основе latency.
Args:
latency_ms: Время выполнения запроса
"""
self.active_requests -= 1
self._update_limit(latency_ms)
def _update_limit(self, latency_ms: float):
"""Обновляет concurrency limit на основе latency."""
self.latency_history.append(latency_ms)
# Обновляем min_rtt (скользящий минимум)
if latency_ms < self.min_rtt:
self.min_rtt = latency_ms
else:
# Минимальная latency постепенно «забывается»
self.min_rtt = self.min_rtt * 0.999 + latency_ms * 0.001
# Вычисляем smoothed RTT
if self.smoothed_rtt is None:
self.smoothed_rtt = latency_ms
else:
self.smoothed_rtt = self.smoothed_rtt * 0.9 + latency_ms * 0.1
# Оцениваем нагрузку
if len(self.latency_history) < 10:
return
# Если latency близка к min_rtt — можно увеличить limit
# Если latency сильно выросла — уменьшаем limit
ratio = self.smoothed_rtt / max(self.min_rtt, 1)
if ratio < 1.5:
# Latency стабильна — увеличиваем limit
self.concurrency_limit = min(
self.max_limit,
self.concurrency_limit + 1
)
elif ratio > 3:
# Latency выросла в 3+ раза — насыщение, уменьшаем limit
self.concurrency_limit = max(
self.min_limit,
int(self.concurrency_limit * 0.8)
)
# Использование
limiter = AdaptiveConcurrencyLimiter(initial_limit=10)
async def handle_request(request):
await limiter.acquire()
start = time.time()
try:
return await process(request)
finally:
latency_ms = (time.time() - start) * 1000
limiter.release(latency_ms)
# Автоматическая адаптация:
# - При низкой нагрузке limit растёт до max_limit
# - При перегрузке limit падает, предотвращая collapsefrom prometheus_client import Counter, Gauge, Histogram
RATE_LIMIT_HITS = Counter(
'rate_limit_hits_total',
'Total rate limit rejections',
['client_id', 'limiter_type']
)
LOAD_SHED_TOTAL = Counter(
'load_shed_total',
'Total load shed rejections',
['reason']
)
CONCURRENCY_LIMIT = Gauge(
'concurrency_limit',
'Current concurrency limit'
)
CPU_USAGE = Gauge(
'cpu_usage_ratio',
'Current CPU usage ratio'
)
P95_LATENCY = Gauge(
'request_latency_p95_seconds',
'Current p95 request latency'
)
# Алерты
# rate(rate_limit_hits_total[5m]) > 100
# rate(load_shed_total[5m]) > 10
# cpu_usage_ratio > 0.8 for 5m
# concurrency_limit < min_limit for 5m# Для клиентских API
limiter = TokenBucketRateLimiter(capacity=100, refill_rate=10)
# Позволяет burst, но ограничивает среднюю скорость# При росте latency отклоняйте запросы
shedder = LatencyBasedLoadShedder(p95_threshold=200, shed_rate=0.5)# Критичные запросы (платежи) не отклоняются
# Фоновые задачи (отчёты) отклоняются первыми
shedder = PriorityLoadShedder(max_queue_size=1000)# Автоматическая настройка под нагрузку
limiter = AdaptiveConcurrencyLimiter(initial_limit=10, min_limit=5, max_limit=100)Курс завершён! Запустите валидатор для проверки структуры курса.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.