Алгоритмы rate limiting, реализация middleware для FastAPI
Rate limiting защищает API от злоупотреблений, DDoS-атак, ошибок клиентов и обеспечивает fair usage. Redis с его атомарными операциями и TTL — идеальное хранилище для реализации rate limiting.
| Причина | Описание |
|---|---|
| Защита от DDoS | Ограничение запросов от одного IP/клиента |
| Предотвращение abuse | Защита от брутфорса, скрапинга, спама |
| Fair usage | Гарантия, что один клиент не монополизирует ресурсы |
| Cost control | Ограничение дорогих операций (поиск, экспорт) |
| API tiers | Разные лимиты для free/premium/enterprise планов |
Простейший алгоритм: подсчёт запросов в фиксированном окне времени.
import redis.asyncio as redis
class FixedWindowRateLimiter:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
async def is_allowed(
self,
key: str,
max_requests: int,
window_seconds: int,
) -> tuple[bool, dict]:
"""
Проверка, разрешён ли запрос.
Returns:
(allowed, info) где info содержит детали
"""
# Ключ для текущего окна
window = int(asyncio.get_event_loop().time()) // window_seconds
redis_key = f'rate_limit:fixed:{key}:{window}'
# Атомарный инкремент
current = await self.redis.incr(redis_key)
# Устанавливаем TTL только для первого запроса в окне
if current == 1:
await self.redis.expire(redis_key, window_seconds + 1)
# Проверка лимита
allowed = current <= max_requests
return allowed, {
'current': current,
'limit': max_requests,
'reset': window_seconds - (int(asyncio.get_event_loop().time()) % window_seconds),
'retry_after': 0 if allowed else (window_seconds - (int(asyncio.get_event_loop().time()) % window_seconds)),
}
# Использование в middleware
from fastapi import FastAPI, Request, HTTPException, status
app = FastAPI()
limiter = FixedWindowRateLimiter(app.state.redis)
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
client_ip = request.client.host
key = f'ip:{client_ip}'
allowed, info = await limiter.is_allowed(
key,
max_requests=100,
window_seconds=60,
)
if not allowed:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail='Rate limit exceeded',
headers={
'X-RateLimit-Limit': str(info['limit']),
'X-RateLimit-Remaining': '0',
'X-RateLimit-Reset': str(info['reset']),
'Retry-After': str(info['retry_after']),
},
)
response = await call_next(request)
# Добавляем заголовки с информацией о лимитах
response.headers['X-RateLimit-Limit'] = str(info['limit'])
response.headers['X-RateLimit-Remaining'] = str(max(0, info['limit'] - info['current']))
response.headers['X-RateLimit-Reset'] = str(info['reset'])
return responseПроблема Fixed Window: на границе окон возможны всплески. Например, при лимите 100 запросов в минуту, клиент может сделать 100 запросов в 59-й секунде и ещё 100 в 0-й секунде следующей минуты — 200 запросов за 2 секунды.
Хранит timestamp каждого запроса. Точно, но требует больше памяти.
import time
class SlidingWindowLogRateLimiter:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
async def is_allowed(
self,
key: str,
max_requests: int,
window_seconds: int,
) -> tuple[bool, dict]:
now = time.time()
redis_key = f'rate_limit:sliding_log:{key}'
window_start = now - window_seconds
# Удаляем старые записи за окном
await self.redis.zremrangebyscore(redis_key, '-inf', window_start)
# Считаем текущее количество запросов в окне
current = await self.redis.zcard(redis_key)
if current >= max_requests:
# Лимит превышен
# Получаем timestamp oldest запроса для расчёта retry_after
oldest = await self.redis.zrange(redis_key, 0, 0, withscores=True)
retry_after = int(oldest[0][1] + window_seconds - now) if oldest else 0
return False, {
'current': current,
'limit': max_requests,
'reset': window_seconds,
'retry_after': max(0, retry_after),
}
# Добавляем текущий запрос
await self.redis.zadd(redis_key, {str(now): now})
await self.redis.expire(redis_key, window_seconds)
return True, {
'current': current + 1,
'limit': max_requests,
'reset': window_seconds,
'retry_after': 0,
}Преимущества: точный подсчёт, нет проблемы границ окон. Недостатки: больше памяти (O(N) где N — количество запросов в окне).
Компромисс между Fixed Window и Sliding Window Log. Использует взвешенное среднее двух окон.
class SlidingWindowCounterRateLimiter:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
async def is_allowed(
self,
key: str,
max_requests: int,
window_seconds: int,
) -> tuple[bool, dict]:
now = time.time()
current_window = int(now // window_seconds)
previous_window = current_window - 1
current_key = f'rate_limit:swc:{key}:{current_window}'
previous_key = f'rate_limit:swc:{key}:{previous_window}'
# Получаем счётчики окон
current_count = int(await self.redis.get(current_key) or 0)
previous_count = int(await self.redis.get(previous_key) or 0)
# Вес предыдущего окна (какая часть окна ещё актуальна)
window_progress = (now % window_seconds) / window_seconds
previous_weight = 1 - window_progress
# Взвешенное среднее
weighted_count = current_count + (previous_count * previous_weight)
if weighted_count >= max_requests:
return False, {
'current': int(weighted_count),
'limit': max_requests,
'reset': int(window_seconds - (now % window_seconds)),
'retry_after': int(window_seconds - (now % window_seconds)),
}
# Инкремент текущего окна
pipe = self.redis.pipeline()
pipe.incr(current_key)
pipe.expire(current_key, window_seconds * 2) # TTL для текущего и предыдущего
await pipe.execute()
return True, {
'current': int(weighted_count) + 1,
'limit': max_requests,
'reset': int(window_seconds - (now % window_seconds)),
'retry_after': 0,
}Преимущества: точность близка к Sliding Window Log, но использует O(1) памяти. Недостатки: приближённый алгоритм, возможна небольшая погрешность.
Классический алгоритм: бакет наполняется токенами со скоростью R токенов в секунду. Каждый запрос потребляет токен.
class TokenBucketRateLimiter:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
async def is_allowed(
self,
key: str,
max_tokens: int,
refill_rate: float, # токенов в секунду
) -> tuple[bool, dict]:
now = time.time()
redis_key = f'rate_limit:token_bucket:{key}'
# Lua скрипт для атомарной операции
lua_script = """
local key = KEYS[1]
local max_tokens = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
-- Получаем текущее состояние
local bucket = redis.call('HMGET', key, 'tokens', 'last_update')
local tokens = tonumber(bucket[1]) or max_tokens
local last_update = tonumber(bucket[2]) or now
-- Вычисляем количество добавленных токенов
local elapsed = now - last_update
local new_tokens = math.min(max_tokens, tokens + (elapsed * refill_rate))
-- Проверяем, есть ли токен
local allowed = 0
if new_tokens >= 1 then
new_tokens = new_tokens - 1
allowed = 1
end
-- Обновляем состояние
redis.call('HMSET', key, 'tokens', new_tokens, 'last_update', now)
redis.call('EXPIRE', key, math.ceil(max_tokens / refill_rate) + 1)
return {allowed, math.floor(new_tokens)}
"""
if not hasattr(self, '_token_bucket_script'):
self._token_bucket_script = self.redis.register_script(lua_script)
result = await self._token_bucket_script(
keys=[redis_key],
args=[max_tokens, refill_rate, now],
)
allowed = bool(result[0])
remaining_tokens = int(result[1])
# Расчёт retry_after
if not allowed:
retry_after = int((1 - remaining_tokens) / refill_rate)
else:
retry_after = 0
return allowed, {
'current': max_tokens - remaining_tokens,
'limit': max_tokens,
'reset': int(max_tokens / refill_rate),
'retry_after': retry_after,
'remaining_tokens': remaining_tokens,
}Преимущества:
Недостатки: сложнее в реализации, требует Lua для атомарности.
Обратная идея: запросы добавляются в очередь (бакет), которая "протекает" с постоянной скоростью.
class LeakyBucketRateLimiter:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
async def is_allowed(
self,
key: str,
max_capacity: int,
leak_rate: float, # запросов в секунду
) -> tuple[bool, dict]:
now = time.time()
redis_key = f'rate_limit:leaky_bucket:{key}'
# Lua скрипт
lua_script = """
local key = KEYS[1]
local max_capacity = tonumber(ARGV[1])
local leak_rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
-- Получаем текущее состояние
local bucket = redis.call('HMGET', key, 'water_level', 'last_leak')
local water_level = tonumber(bucket[1]) or 0
local last_leak = tonumber(bucket[2]) or now
-- Вычисляем, сколько утекло
local elapsed = now - last_leak
local leaked = elapsed * leak_rate
water_level = math.max(0, water_level - leaked)
-- Проверяем, есть ли место
local allowed = 0
if water_level < max_capacity then
water_level = water_level + 1
allowed = 1
end
-- Обновляем состояние
redis.call('HMSET', key, 'water_level', water_level, 'last_leak', now)
redis.call('EXPIRE', key, math.ceil(max_capacity / leak_rate) + 1)
return {allowed, math.floor(water_level)}
"""
if not hasattr(self, '_leaky_bucket_script'):
self._leaky_bucket_script = self.redis.register_script(lua_script)
result = await self._leaky_bucket_script(
keys=[redis_key],
args=[max_capacity, leak_rate, now],
)
allowed = bool(result[0])
current_level = int(result[1])
return allowed, {
'current': current_level,
'limit': max_capacity,
'reset': int(max_capacity / leak_rate),
'retry_after': 0 if allowed else int((current_level + 1 - max_capacity) / leak_rate),
}| Алгоритм | Точность | Память | Burst | Сложность |
|---|---|---|---|---|
| Fixed Window | Низкая | O(1) | Нет | Простой |
| Sliding Window Log | Высокая | O(N) | Нет | Средний |
| Sliding Window Counter | Средняя | O(1) | Нет | Средний |
| Token Bucket | Высокая | O(1) | Да | Сложный |
| Leaky Bucket | Высокая | O(1) | Сглаживает | Сложный |
from enum import Enum
from typing import Callable
from fastapi import FastAPI, Request, HTTPException, status
from fastapi.responses import JSONResponse
class RateLimitAlgorithm(str, Enum):
FIXED_WINDOW = "fixed_window"
SLIDING_WINDOW_LOG = "sliding_window_log"
SLIDING_WINDOW_COUNTER = "sliding_window_counter"
TOKEN_BUCKET = "token_bucket"
class RateLimiter:
def __init__(self, redis_client, algorithm: RateLimitAlgorithm = RateLimitAlgorithm.TOKEN_BUCKET):
self.redis = redis_client
self.algorithm = algorithm
if algorithm == RateLimitAlgorithm.FIXED_WINDOW:
self._limiter = FixedWindowRateLimiter(redis_client)
elif algorithm == RateLimitAlgorithm.SLIDING_WINDOW_LOG:
self._limiter = SlidingWindowLogRateLimiter(redis_client)
elif algorithm == RateLimitAlgorithm.SLIDING_WINDOW_COUNTER:
self._limiter = SlidingWindowCounterRateLimiter(redis_client)
elif algorithm == RateLimitAlgorithm.TOKEN_BUCKET:
self._limiter = TokenBucketRateLimiter(redis_client)
async def check(
self,
key: str,
max_requests: int,
window_seconds: int,
) -> tuple[bool, dict]:
return await self._limiter.is_allowed(key, max_requests, window_seconds)
# Middleware с гибкой конфигурацией
def create_rate_limit_middleware(
limiter: RateLimiter,
default_limit: int = 100,
default_window: int = 60,
key_func: Callable[[Request], str] = lambda r: f'ip:{r.client.host}',
exclude_paths: set | None = None,
):
exclude_paths = exclude_paths or {'/health', '/docs', '/openapi.json'}
async def middleware(request: Request, call_next):
# Исключения
if request.url.path in exclude_paths:
return await call_next(request)
# Получаем ключ
key = key_func(request)
# Проверка лимита
allowed, info = await limiter.check(key, default_limit, default_window)
if not allowed:
return JSONResponse(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
content={
'detail': 'Rate limit exceeded',
'limit': info['limit'],
'retry_after': info['retry_after'],
},
headers={
'X-RateLimit-Limit': str(info['limit']),
'X-RateLimit-Remaining': '0',
'X-RateLimit-Reset': str(info['reset']),
'Retry-After': str(info['retry_after']),
},
)
response = await call_next(request)
# Заголовки
response.headers['X-RateLimit-Limit'] = str(info['limit'])
response.headers['X-RateLimit-Remaining'] = str(max(0, info['limit'] - info['current']))
response.headers['X-RateLimit-Reset'] = str(info['reset'])
return response
return middleware
# Использование
app = FastAPI()
limiter = RateLimiter(app.state.redis, algorithm=RateLimitAlgorithm.TOKEN_BUCKET)
app.add_middleware(
create_rate_limit_middleware,
limiter=limiter,
default_limit=100,
default_window=60,
)from fastapi import Depends
class AuthenticatedRateLimiter:
def __init__(self, redis_client):
self.redis = redis_client
async def get_user_key(self, request: Request) -> str:
"""Получение ключа для аутентифицированного пользователя"""
# Предполагается, что user_id установлен в request.state
user_id = getattr(request.state, 'user_id', None)
if user_id:
return f'user:{user_id}'
else:
# Для анонимных — по IP
return f'ip:{request.client.host}'
async def check_by_endpoint(
self,
request: Request,
endpoint_limits: dict[str, tuple[int, int]],
) -> tuple[bool, dict]:
"""
Разные лимиты для разных эндпоинтов.
endpoint_limits: {
'/api/search': (10, 60), # 10 запросов в минуту
'/api/export': (5, 3600), # 5 экспортов в час
}
"""
path = request.url.path
key = await self.get_user_key(request)
# Находим подходящий лимит
for endpoint_pattern, (limit, window) in endpoint_limits.items():
if path.startswith(endpoint_pattern):
limiter = TokenBucketRateLimiter(self.redis)
return await limiter.is_allowed(f'{key}:{endpoint_pattern}', limit, window)
# Дефолтный лимит
limiter = TokenBucketRateLimiter(self.redis)
return await limiter.is_allowed(f'{key}:default', 100, 60)
# Dependency для endpoint-specific rate limiting
async def rate_limit_dependency(
request: Request,
limiter: AuthenticatedRateLimiter = Depends(lambda: AuthenticatedRateLimiter(app.state.redis)),
):
endpoint_limits = {
'/api/search': (10, 60),
'/api/export': (5, 3600),
'/api/upload': (20, 60),
}
allowed, info = await limiter.check_by_endpoint(request, endpoint_limits)
if not allowed:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail='Rate limit exceeded',
headers={'Retry-After': str(info['retry_after'])},
)
return info
# Использование в endpoint
@app.get("/api/search")
async def search(
query: str,
rate_limit_info: dict = Depends(rate_limit_dependency),
):
# Поиск...
return {'results': [], 'rate_limit': rate_limit_info}class RateLimitTier:
def __init__(self, name: str, requests_per_minute: int, requests_per_hour: int):
self.name = name
self.rpm = requests_per_minute
self.rph = requests_per_hour
TIERS = {
'free': RateLimitTier('free', 10, 100),
'premium': RateLimitTier('premium', 100, 1000),
'enterprise': RateLimitTier('enterprise', 1000, 10000),
}
class TieredRateLimiter:
def __init__(self, redis_client):
self.redis = redis_client
async def check(
self,
user_id: str,
tier: RateLimitTier,
) -> tuple[bool, dict]:
# Проверяем оба лимита: per-minute и per-hour
minute_limiter = TokenBucketRateLimiter(self.redis)
hour_limiter = TokenBucketRateLimiter(self.redis)
allowed_minute, minute_info = await minute_limiter.is_allowed(
f'{user_id}:rpm',
tier.rpm,
1.0, # refill_rate = rpm токенов в секунду
)
if not allowed_minute:
return False, {'limit': tier.rpm, 'window': 'minute', **minute_info}
allowed_hour, hour_info = await hour_limiter.is_allowed(
f'{user_id}:rph',
tier.rph,
tier.rph / 3600, # refill_rate = rph токенов в час
)
if not allowed_hour:
return False, {'limit': tier.rph, 'window': 'hour', **hour_info}
return True, {
'tier': tier.name,
'rpm_remaining': minute_info['remaining_tokens'],
'rph_remaining': hour_info['remaining_tokens'],
}
# Использование
async def get_user_tier(user_id: str) -> RateLimitTier:
"""Получение тира пользователя из БД"""
# В реальности — запрос к БД
return TIERS.get('free')
async def tiered_rate_limit(
request: Request,
limiter: TieredRateLimiter = Depends(lambda: TieredRateLimiter(app.state.redis)),
):
user_id = getattr(request.state, 'user_id', f'anon:{request.client.host}')
tier = await get_user_tier(user_id)
allowed, info = await limiter.check(f'user:{user_id}', tier)
if not allowed:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail=f'Rate limit exceeded for {info["window"]} window',
headers={'Retry-After': str(info.get('retry_after', 60))},
)
return infoclass RateLimitMetrics:
def __init__(self, redis_client):
self.redis = redis_client
async def record_rejection(self, key: str, reason: str):
"""Запись отклонённого запроса"""
await self.redis.incr(f'rate_limit:rejections:{reason}')
await self.redis.incr(f'rate_limit:rejections:{key}')
async def get_stats(self) -> dict:
"""Статистика rate limiting"""
rejections_total = int(await self.redis.get('rate_limit:rejections:total') or 0)
return {
'rejections_total': rejections_total,
'top_rejected_keys': await self._get_top_rejected(),
}
async def _get_top_rejected(self) -> list:
"""Топ ключей с наибольшим количеством отклонений"""
# Используем Sorted Set для хранения счёта отклонений
keys = await self.redis.zrevrange(
'rate_limit:rejections:leaderboard',
0,
9,
withscores=True,
)
return [{'key': k, 'rejections': int(s)} for k, s in keys]Проверьте понимание → ответьте на вопросы в rate_limiting.json
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.