Redis для rate limiting и кэша, оптимизация запросов, фоновые задачи, ujson, gzip, мониторинг.
Научитесь защищать API от злоупотреблений, ускорять ответы через кэширование и оптимизировать производительность приложения
Rate limiting ограничивает количество запросов от клиента за период времени.
pip install aioredis# app/middleware/rate_limit.py
import time
from aiohttp import web
from typing import Dict
class RateLimiter:
def __init__(self, redis):
self.redis = redis
async def is_rate_limited(
self,
key: str,
max_requests: int,
window_seconds: int
) -> bool:
"""Проверка лимита через sliding window"""
now = time.time()
window_key = f"rate_limit:{key}:{int(now // window_seconds)}"
# Инкремент счётчика
current = await self.redis.incr(window_key)
# Установка TTL при первом запросе
if current == 1:
await self.redis.expire(window_key, window_seconds * 2)
return current > max_requests
@web.middleware
async def rate_limit_middleware(request, handler):
"""Middleware для rate limiting"""
# Получаем клиентский IP
client_ip = request.remote
# Исключаем некоторые роуты
exempt_paths = {'/health', '/metrics'}
if request.path in exempt_paths:
return await handler(request)
# Проверяем лимит
limiter: RateLimiter = request.app.get('rate_limiter')
if limiter:
# 100 запросов в минуту
limited = await limiter.is_rate_limited(
key=f"ip:{client_ip}",
max_requests=100,
window_seconds=60
)
if limited:
raise web.HTTPTooManyRequests(
text='Rate limit exceeded. Try again later.'
)
return await handler(request)Более плавное ограничение:
class TokenBucketLimiter:
def __init__(self, redis):
self.redis = redis
async def acquire(
self,
key: str,
capacity: int,
refill_rate: float
) -> bool:
"""
capacity: максимум токенов
refill_rate: токенов в секунду
"""
now = time.time()
bucket_key = f"bucket:{key}"
# Получаем текущее состояние
bucket = await self.redis.hgetall(bucket_key)
if not bucket:
# Новый бакет
tokens = capacity
last_update = now
else:
tokens = float(bucket[b'tokens'])
last_update = float(bucket[b'last_update'])
# Добавляем токены за прошедшее время
elapsed = now - last_update
tokens = min(capacity, tokens + elapsed * refill_rate)
# Пытаемся взять токен
if tokens >= 1:
tokens -= 1
await self.redis.hset(bucket_key, mapping={
'tokens': tokens,
'last_update': now
})
await self.redis.expire(bucket_key, 3600) # TTL 1 час
return True
else:
# Нет токенов — лимит превышен
await self.redis.hset(bucket_key, mapping={
'tokens': tokens,
'last_update': now
})
return False@web.middleware
async def user_rate_limit_middleware(request, handler):
"""Разные лимиты для разных тарифов"""
user = request.get('user')
if user:
# Получаем тариф пользователя
tariff = user.get('tariff', 'free')
limits = {
'free': {'requests': 100, 'window': 60},
'pro': {'requests': 1000, 'window': 60},
'enterprise': {'requests': 10000, 'window': 60}
}
limit = limits.get(tariff, limits['free'])
limiter = request.app['rate_limiter']
is_limited = await limiter.is_rate_limited(
key=f"user:{user['id']}",
max_requests=limit['requests'],
window_seconds=limit['window']
)
if is_limited:
raise web.HTTPTooManyRequests(
text=f'Rate limit exceeded for {tariff} plan'
)
return await handler(request)# app/services/cache.py
import json
import aioredis
from typing import Optional, Any
class CacheService:
def __init__(self, redis: aioredis.Redis):
self.redis = redis
self.default_ttl = 300 # 5 минут
def _make_key(self, prefix: str, *args) -> str:
"""Создание ключа кэша"""
key_parts = ':'.join(str(a) for a in args)
return f"{prefix}:{key_parts}"
async def get(self, key: str) -> Optional[Any]:
"""Получение из кэша"""
data = await self.redis.get(key)
if data:
return json.loads(data)
return None
async def set(
self,
key: str,
value: Any,
ttl: int = None
) -> None:
"""Запись в кэш"""
ttl = ttl or self.default_ttl
await self.redis.setex(
key,
ttl,
json.dumps(value)
)
async def delete(self, key: str) -> None:
"""Удаление из кэша"""
await self.redis.delete(key)
async def invalidate_pattern(self, pattern: str) -> None:
"""Инвалидация по паттерну"""
keys = await self.redis.keys(pattern)
if keys:
await self.redis.delete(*keys)
# Декоратор для кэширования
from functools import wraps
def cache(ttl: int = 300, prefix: str = 'cache'):
"""Декоратор для кэширования результатов функции"""
def decorator(func):
@wraps(func)
async def wrapper(self, *args, **kwargs):
cache_service: CacheService = self.cache
# Создание ключа
key = cache_service._make_key(
prefix,
func.__name__,
*args,
str(sorted(kwargs.items()))
)
# Попытка получить из кэша
cached = await cache_service.get(key)
if cached is not None:
return cached
# Вызов функции
result = await func(self, *args, **kwargs)
# Запись в кэш
await cache_service.set(key, result, ttl)
return result
return wrapper
return decorator# app/repositories/tasks.py
from ..services.cache import cache
class TaskRepository:
def __init__(self, pool, cache: CacheService):
self.pool = pool
self.cache = cache
@cache(ttl=60, prefix='tasks')
async def get(self, task_id: int, user_id: int) -> Optional[dict]:
"""Получение задачи с кэшированием"""
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
'SELECT * FROM tasks WHERE id = $1 AND user_id = $2',
task_id, user_id
)
return dict(row) if row else None
async def create(self, **kwargs) -> dict:
"""Создание задачи с инвалидацией кэша"""
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
'INSERT INTO tasks (...) VALUES (...) RETURNING *',
...
)
task = dict(row)
# Инвалидация кэша списка задач
await self.cache.invalidate_pattern('tasks:list:*')
return task
async def update(self, task_id: int, **kwargs) -> Optional[dict]:
"""Обновление с инвалидацией"""
updated = await self._update_db(task_id, **kwargs)
if updated:
# Инвалидация конкретной задачи
await self.cache.delete(f'tasks:get:{task_id}:*')
await self.cache.invalidate_pattern('tasks:list:*')
return updated@web.middleware
async def cache_middleware(request, handler):
"""Кэширование GET-запросов"""
# Только GET запросы
if request.method != 'GET':
return await handler(request)
# Проверка кэша
cache_key = f"http_cache:{request.path}:{request.query_string}"
cache_service = request.app.get('cache')
if cache_service:
cached = await cache_service.get(cache_key)
if cached:
response = web.json_response(cached)
response.headers['X-Cache'] = 'HIT'
return response
# Выполнение запроса
response = await handler(request)
# Кэширование ответа
if response.status == 200 and cache_service:
try:
data = await response.json()
await cache_service.set(cache_key, data, ttl=60)
response.headers['X-Cache'] = 'MISS'
except:
pass
return response# Оптимальные настройки пула
app['db'] = await asyncpg.create_pool(
dsn,
min_size=10, # Минимум соединений
max_size=50, # Максимум соединений
max_queries=50000,# Пересоздание после N запросов
max_inactive_connection_lifetime=300.0, # 5 минут
command_timeout=60
)# Использование prepared statements для частых запросов
async def init_prepared_statements(pool):
async with pool.acquire() as conn:
await conn.execute('''
PREPARE get_user_tasks(int, int, int) AS
SELECT * FROM tasks
WHERE user_id = $1
ORDER BY created_at DESC
LIMIT $2 OFFSET $3
''')
async def get_user_tasks(conn, user_id, limit, offset):
# Выполнение prepared statement
rows = await conn.fetch('get_user_tasks', user_id, limit, offset)
return [dict(r) for r in rows]-- Индексы для ускорения запросов
CREATE INDEX idx_tasks_user_id ON tasks(user_id);
CREATE INDEX idx_tasks_user_created ON tasks(user_id, created_at DESC);
CREATE INDEX idx_tasks_completed ON tasks(completed) WHERE completed = false;
CREATE INDEX idx_users_email ON users(email);
-- Composite индекс для частых запросов
CREATE INDEX idx_tasks_filter ON tasks(user_id, completed, created_at DESC);# app/middleware/query_logger.py
import time
import logging
from aiohttp import web
logger = logging.getLogger('queries')
@web.middleware
async def query_logger_middleware(request, handler):
"""Логирование времени выполнения запросов"""
start = time.perf_counter()
try:
response = await handler(request)
return response
finally:
duration = (time.perf_counter() - start) * 1000
if duration > 1000: # > 1 секунды
logger.warning(
f"Slow request: {request.method} {request.path} "
f"took {duration:.2f}ms"
)
else:
logger.debug(
f"Request: {request.method} {request.path} "
f"took {duration:.2f}ms"
)# Отправка email в фоне
async def send_email_task(app, user_id: int, template: str):
"""Фоновая задача отправки email"""
try:
# Получение данных пользователя
user = await app['user_repo'].get(user_id)
# Отправка email (внешний сервис)
await app['email_service'].send(user['email'], template)
except Exception as e:
logging.exception(f"Failed to send email: {e}")
async def create_user(request):
"""Создание пользователя с фоновой отправкой email"""
user = await create_user_in_db(request)
# Запуск фоновой задачи
asyncio.create_task(
send_email_task(request.app, user['id'], 'welcome')
)
return web.json_response(user, status=201)# Использование ujson для быстрой сериализации
# pip install ujson
from aiohttp import web
import ujson
async def fast_json_response(data, **kwargs):
"""Быстрый JSON response с ujson"""
return web.Response(
body=ujson.dumps(data),
content_type='application/json',
**kwargs
)# app/middleware/compression.py
from aiohttp import web
import gzip
@web.middleware
async def compression_middleware(request, handler):
"""Gzip сжатие ответов"""
response = await handler(request)
# Проверка поддержки клиентом
accept_encoding = request.headers.get('Accept-Encoding', '')
if 'gzip' in accept_encoding and response.status == 200:
body = response.body
if isinstance(body, bytes) and len(body) > 1024: # > 1KB
compressed = gzip.compress(body)
# Если сжатие выгодно
if len(compressed) < len(body):
response = web.Response(
body=compressed,
status=response.status,
headers={
**response.headers,
'Content-Encoding': 'gzip',
'Vary': 'Accept-Encoding'
}
)
return responsefrom aiohttp_prometheus import setup_metrics
from prometheus_client import Counter, Histogram, Gauge
# Счётчики
REQUEST_COUNT = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status']
)
# Гистограмма времени ответа
REQUEST_LATENCY = Histogram(
'http_request_duration_seconds',
'HTTP request latency',
['method', 'endpoint']
)
# Gauge для активных соединений
ACTIVE_CONNECTIONS = Gauge(
'db_active_connections',
'Active database connections'
)
# Middleware для сбора метрик
@web.middleware
async def metrics_middleware(request, handler):
with REQUEST_LATENCY.labels(
method=request.method,
endpoint=request.path
).time():
response = await handler(request)
REQUEST_COUNT.labels(
method=request.method,
endpoint=request.path,
status=response.status
).inc()
return responseУбедитесь, что вы понимаете:
Переходите к вопросам для закрепления.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.