Паттерны кэширования, кэш ответов API, инвалидация, cache stampede
Кэширование — самый распространённый use case Redis. Правильное кэширование может уменьшить нагрузку на БД на 90%+ и ускорить ответы API в разы. Но неправильное — приведёт к устаревшим данным и трудноотлавливаемым багам.
Самый распространённый паттерн: приложение сначала проверяет кэш, при промахе — читает из БД и записывает в кэш.
import redis.asyncio as redis
from typing import Optional, Dict, Any
class CacheAside:
def __init__(self, redis_client: redis.Redis, default_ttl: int = 300):
self.redis = redis_client
self.default_ttl = default_ttl
async def get_or_set(
self,
key: str,
fetch_func, # Async функция для получения данных
ttl: int | None = None,
) -> Any:
"""
Получение данных с кэшированием.
fetch_func вызывается только при cache miss.
"""
# Попытка получить из кэша
cached = await self.redis.get(key)
if cached:
return json.loads(cached)
# Cache miss — получаем из источника
data = await fetch_func()
# Записываем в кэш
await self.redis.setex(
key,
ttl or self.default_ttl,
json.dumps(data),
)
return data
# Использование в FastAPI
from fastapi import FastAPI, Depends
app = FastAPI()
cache = CacheAside(app.state.redis, default_ttl=300)
async def get_user_from_db(user_id: int) -> dict:
"""Имитация запроса к БД"""
await asyncio.sleep(0.1) # Имитация задержки БД
return {'id': user_id, 'name': f'User {user_id}'}
@app.get("/users/{user_id}")
async def get_user(user_id: int):
user = await cache.get_or_set(
f'user:{user_id}',
lambda: get_user_from_db(user_id),
ttl=600,
)
return userДанные записываются одновременно в кэш и БД. Гарантирует консистентность.
class WriteThroughCache:
def __init__(self, redis_client: redis.Redis, db_client, ttl: int = 300):
self.redis = redis_client
self.db = db_client
self.ttl = ttl
async def get(self, key: str) -> Optional[Dict]:
"""Чтение из кэша или БД"""
cached = await self.redis.get(key)
if cached:
return json.loads(cached)
# Чтение из БД
data = await self.db.get(key)
if data:
await self.redis.setex(key, self.ttl, json.dumps(data))
return data
async def set(self, key: str, data: Dict):
"""Атомарная запись в кэш и БД"""
# Транзакция: записываем в оба хранилища
async with self.redis.pipeline(transaction=True) as pipe:
await pipe.setex(key, self.ttl, json.dumps(data))
await pipe.execute()
# Запись в БД (должна быть в той же транзакции на уровне БД)
await self.db.set(key, data)
async def delete(self, key: str):
"""Удаление из кэша и БД"""
async with self.redis.pipeline(transaction=True) as pipe:
await pipe.delete(key)
await pipe.execute()
await self.db.delete(key)Запись сначала в кэш, асинхронно — в БД. Высокая производительность записи, но риск потери данных.
import asyncio
from collections import OrderedDict
class WriteBehindCache:
def __init__(self, redis_client: redis.Redis, db_client, ttl: int = 300):
self.redis = redis_client
self.db = db_client
self.ttl = ttl
self._pending_writes: OrderedDict = OrderedDict()
self._write_task = None
async def start_background_writer(self, interval: float = 1.0):
"""Фоновая задача для сброса в БД"""
async def writer():
while True:
await asyncio.sleep(interval)
await self._flush_to_db()
self._write_task = asyncio.create_task(writer())
async def _flush_to_db(self):
"""Сброс накопленных записей в БД"""
if not self._pending_writes:
return
# Копируем и очищаем
to_write = dict(self._pending_writes)
self._pending_writes.clear()
# Пакетная запись в БД
for key, data in to_write.items():
await self.db.set(key, data)
async def set(self, key: str, data: Dict):
"""Запись в кэш и очередь на сброс в БД"""
# Быстрая запись в кэш
await self.redis.setex(key, self.ttl, json.dumps(data))
# Добавляем в очередь на сброс
self._pending_writes[key] = data
async def stop(self):
"""Остановка и финальный сброс"""
if self._write_task:
self._write_task.cancel()
try:
await self._write_task
except asyncio.CancelledError:
pass
# Финальный сброс
await self._flush_to_db()from functools import wraps
import hashlib
import json
def cache_response(ttl: int = 300):
"""Декоратор для кэширования ответов endpoint"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# Получаем redis из args (обычно self или request)
redis_client = kwargs.get('redis') or args[0].redis
# Создаём ключ кэша из аргументов
cache_key_data = {
'func': func.__name__,
'args': args[1:], # Пропускаем self
'kwargs': kwargs,
}
key_string = json.dumps(cache_key_data, sort_keys=True, default=str)
cache_key = f'cache:{func.__name__}:{hashlib.md5(key_string.encode()).hexdigest()}'
# Проверка кэша
cached = await redis_client.get(cache_key)
if cached:
return json.loads(cached)
# Вызов функции
result = await func(*args, **kwargs)
# Кэширование результата
await redis_client.setex(
cache_key,
ttl,
json.dumps(result, default=str),
)
return result
return wrapper
return decorator
# Использование
class UserService:
def __init__(self, redis: redis.Redis):
self.redis = redis
@cache_response(ttl=600)
async def get_user(self, user_id: int) -> dict:
# Тяжёлый запрос к БД
user = await db.query('SELECT * FROM users WHERE id = $1', user_id)
return userfrom fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
import hashlib
import json
class CacheMiddleware(BaseHTTPMiddleware):
def __init__(
self,
app,
redis_client,
default_ttl: int = 300,
exclude_paths: set | None = None,
):
super().__init__(app)
self.redis = redis_client
self.default_ttl = default_ttl
self.exclude_paths = exclude_paths or {'/health', '/docs', '/openapi.json'}
def _create_cache_key(self, request: Request) -> str:
"""Создание уникального ключа кэша для запроса"""
# Включаем path, query params, и заголовки (если нужно)
key_data = {
'path': request.url.path,
'query': request.url.query,
# 'headers': dict(request.headers), # Если нужно учитывать заголовки
}
key_string = json.dumps(key_data, sort_keys=True)
key_hash = hashlib.md5(key_string.encode()).hexdigest()
return f'cache:http:{key_hash}'
async def dispatch(self, request: Request, call_next) -> Response:
# Только для GET запросов
if request.method != 'GET':
return await call_next(request)
# Исключаем определённые пути
if request.url.path in self.exclude_paths:
return await call_next(request)
cache_key = self._create_cache_key(request)
# Проверка кэша
cached_data = await self.redis.get(cache_key)
if cached_data:
cached = json.loads(cached_data)
response = Response(
content=json.dumps(cached['body']),
status_code=cached['status_code'],
media_type='application/json',
)
response.headers['X-Cache'] = 'HIT'
response.headers['X-Cache-Key'] = cache_key
return response
# Выполнение запроса
response = await call_next(request)
# Кэширование успешных ответов
if response.status_code == 200:
# Чтение тела ответа
body = b''
async for chunk in response.body_iterator:
body += chunk
try:
# Проверяем, что это JSON
body_data = json.loads(body)
# Кэшируем
await self.redis.setex(
cache_key,
self.default_ttl,
json.dumps({
'body': body_data,
'status_code': response.status_code,
}),
)
# Возвращаем ответ с телом
response = Response(
content=body,
status_code=response.status_code,
media_type=response.media_type,
)
response.headers['X-Cache'] = 'MISS'
except json.JSONDecodeError:
# Не JSON ответ — не кэшируем
pass
return response
# Использование в FastAPI
app = FastAPI()
@app.on_event("startup")
async def startup():
app.state.redis = redis.asyncio.from_url('redis://localhost:6379/0')
app.add_middleware(
CacheMiddleware,
redis_client=app.state.redis,
default_ttl=300,
)class CacheManager:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
async def invalidate_user_cache(self, user_id: int):
"""Инвалидация всего кэша пользователя"""
pattern = f'cache:user:{user_id}:*'
cursor = 0
while True:
cursor, keys = await self.redis.scan(cursor, match=pattern, count=100)
if keys:
await self.redis.delete(*keys)
if cursor == 0:
break
async def invalidate_pattern(self, pattern: str):
"""Инвалидация по паттерну"""
cursor = 0
while True:
cursor, keys = await self.redis.scan(cursor, match=pattern, count=100)
if keys:
await self.redis.delete(*keys)
if cursor == 0:
breakclass VersionedCache:
def __init__(self, redis_client: redis.Redis, version_key: str):
self.redis = redis_client
self.version_key = version_key
async def _get_version(self) -> str:
"""Получение текущей версии кэша"""
version = await self.redis.get(self.version_key)
return version or 'v1'
async def _make_key(self, key: str) -> str:
"""Создание ключа с версией"""
version = await self._get_version()
return f'{self.version_key}:{version}:{key}'
async def get(self, key: str) -> Optional[Any]:
"""Получение данных"""
full_key = await self._make_key(key)
data = await self.redis.get(full_key)
return json.loads(data) if data else None
async def set(self, key: str, data: Any, ttl: int = 300):
"""Запись данных"""
full_key = await self._make_key(key)
await self.redis.setex(full_key, ttl, json.dumps(data))
async def invalidate_all(self):
"""
Инвалидация всего кэша через инкремент версии.
Старые ключи останутся, но перестанут использоваться.
"""
await self.redis.incr(self.version_key)class CachedRepository:
def __init__(self, redis_client: redis.Redis, db_client, ttl: int = 300):
self.redis = redis_client
self.db = db_client
self.ttl = ttl
async def get_user(self, user_id: int) -> Optional[dict]:
"""Получение пользователя с кэшированием"""
cache_key = f'user:{user_id}'
cached = await self.redis.get(cache_key)
if cached:
return json.loads(cached)
# Cache miss
user = await self.db.get_user(user_id)
if user:
await self.redis.setex(cache_key, self.ttl, json.dumps(user))
return user
async def update_user(self, user_id: int, data: dict):
"""Обновление пользователя с инвалидацией кэша"""
# Обновление в БД
await self.db.update_user(user_id, data)
# Инвалидация кэша
await self.redis.delete(f'user:{user_id}')
# Опционально: обновить кэш сразу (write-through)
# updated_user = await self.db.get_user(user_id)
# await self.redis.setex(f'user:{user_id}', self.ttl, json.dumps(updated_user))
async def delete_user(self, user_id: int):
"""Удаление пользователя с инвалидацией кэша"""
await self.db.delete_user(user_id)
await self.redis.delete(f'user:{user_id}')Cache Stampede (Dog Piling) — проблема, когда множество запросов одновременно обнаруживают истечение кэша и все идут в БД.
import asyncio
class StampedeCache:
def __init__(self, redis_client: redis.Redis, ttl: int = 300):
self.redis = redis_client
self.ttl = ttl
self._locks: dict[str, asyncio.Lock] = {}
async def get_or_set(
self,
key: str,
fetch_func,
lock_ttl: int = 10,
) -> Any:
"""
Получение данных с защитой от stampede через mutex.
"""
# Попытка получить из кэша
cached = await self.redis.get(key)
if cached:
return json.loads(cached)
# Кэша нет — пытаемся получить lock
lock_key = f'lock:{key}'
lock_value = str(asyncio.current_task())
# SETNX для получения lock
acquired = await self.redis.set(
lock_key,
lock_value,
nx=True,
ex=lock_ttl,
)
if acquired:
try:
# Мы получили lock — идём в БД
data = await fetch_func()
# Записываем в кэш
await self.redis.setex(key, self.ttl, json.dumps(data))
return data
finally:
# Освобождаем lock (только если он наш)
current_value = await self.redis.get(lock_key)
if current_value == lock_value:
await self.redis.delete(lock_key)
else:
# Lock у другого — ждём и пробуем снова
await asyncio.sleep(0.1)
return await self.get_or_set(key, fetch_func, lock_ttl)import random
class EarlyExpirationCache:
def __init__(self, redis_client: redis.Redis, base_ttl: int = 300, early_range: int = 60):
self.redis = redis_client
self.base_ttl = base_ttl
self.early_range = early_range
async def get_or_set(self, key: str, fetch_func) -> Any:
"""
Кэширование с вероятностным ранним обновлением.
Кэш обновляется до истечения TTL с некоторой вероятностью.
"""
cached = await self.redis.get(key)
if cached:
data = json.loads(cached)
# Проверяем, не пора ли обновлять заранее
ttl = await self.redis.ttl(key)
# Если осталось меньше early_range секунд, с вероятностью обновляем
if ttl > 0 and ttl < self.early_range:
# Вероятность обновления растёт с уменьшением TTL
update_probability = 1 - (ttl / self.early_range)
if random.random() < update_probability:
# Асинхронное обновление (fire and forget)
asyncio.create_task(self._refresh(key, fetch_func))
return data
# Cache miss
data = await fetch_func()
await self.redis.setex(key, self.base_ttl, json.dumps(data))
return data
async def _refresh(self, key: str, fetch_func):
"""Фоновое обновление кэша"""
try:
data = await fetch_func()
await self.redis.setex(key, self.base_ttl, json.dumps(data))
except Exception as e:
# Логирование ошибки, но не пробрасывание
print(f"Failed to refresh cache {key}: {e}")import uuid
class DistributedLockCache:
def __init__(self, redis_client: redis.Redis, ttl: int = 300):
self.redis = redis_client
self.ttl = ttl
async def _acquire_lock(self, lock_key: str, lock_value: str, ttl: int) -> bool:
"""Попытка получить распределённый lock"""
return await self.redis.set(
lock_key,
lock_value,
nx=True,
ex=ttl,
)
async def _release_lock(self, lock_key: str, lock_value: str):
"""Освобождение lock (только если он наш)"""
# Lua скрипт для атомарной проверки и удаления
lua_script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
unlock = self.redis.register_script(lua_script)
await unlock(keys=[lock_key], args=[lock_value])
async def get_or_set(self, key: str, fetch_func) -> Any:
"""Получение данных с распределённым lock"""
cached = await self.redis.get(key)
if cached:
return json.loads(cached)
lock_key = f'lock:{key}'
lock_value = str(uuid.uuid4())
# Попытка получить lock
acquired = await self._acquire_lock(lock_key, lock_value, ttl=10)
if acquired:
try:
# Двойная проверка кэша (другой воркер мог уже обновить)
cached = await self.redis.get(key)
if cached:
return json.loads(cached)
# Идём в БД
data = await fetch_func()
await self.redis.setex(key, self.ttl, json.dumps(data))
return data
finally:
await self._release_lock(lock_key, lock_value)
else:
# Ждём пока другой воркер обновит кэш
await asyncio.sleep(0.1)
return await self.get_or_set(key, fetch_func)from fastapi import Request
class AuthAwareCache:
def __init__(self, redis_client: redis.Redis, ttl: int = 300):
self.redis = redis_client
self.ttl = ttl
def _create_key(self, request: Request, resource: str, resource_id: int) -> str:
"""Создание ключа кэша с учётом прав"""
# Получаем user_id из запроса (если есть аутентификация)
user_id = getattr(request.state, 'user_id', 'anonymous')
return f'cache:{resource}:{resource_id}:user:{user_id}'
async def get(self, request: Request, resource: str, resource_id: int) -> Optional[Any]:
"""Получение кэшированных данных"""
key = self._create_key(request, resource, resource_id)
cached = await self.redis.get(key)
return json.loads(cached) if cached else None
async def set(self, request: Request, resource: str, resource_id: int, data: Any):
"""Кэширование данных"""
key = self._create_key(request, resource, resource_id)
await self.redis.setex(key, self.ttl, json.dumps(data))
async def invalidate(self, resource: str, resource_id: int, user_id: str | None = None):
"""
Инвалидация кэша.
Если user_id не указан — инвалидируем для всех пользователей.
"""
if user_id:
key = f'cache:{resource}:{resource_id}:user:{user_id}'
await self.redis.delete(key)
else:
# Инвалидация для всех
pattern = f'cache:{resource}:{resource_id}:user:*'
cursor = 0
while True:
cursor, keys = await self.redis.scan(cursor, match=pattern, count=100)
if keys:
await self.redis.delete(*keys)
if cursor == 0:
breakclass CacheMetrics:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self._hits = 0
self._misses = 0
async def record_hit(self):
self._hits += 1
await self.redis.incr('cache:metrics:hits')
async def record_miss(self):
self._misses += 1
await self.redis.incr('cache:metrics:misses')
async def get_hit_rate(self) -> float:
"""Получение hit rate кэша"""
hits = int(await self.redis.get('cache:metrics:hits') or 0)
misses = int(await self.redis.get('cache:metrics:misses') or 0)
total = hits + misses
return hits / total if total > 0 else 0.0
async def reset_metrics(self):
"""Сброс метрик"""
await self.redis.delete('cache:metrics:hits', 'cache:metrics:misses')
async def get_stats(self) -> dict:
"""Статистика кэша"""
info = await self.redis.info('memory')
hits = int(await self.redis.get('cache:metrics:hits') or 0)
misses = int(await self.redis.get('cache:metrics:misses') or 0)
return {
'hits': hits,
'misses': misses,
'hit_rate': hits / (hits + misses) if (hits + misses) > 0 else 0,
'used_memory': info.get('used_memory_human', 'N/A'),
'used_memory_peak': info.get('used_memory_peak_human', 'N/A'),
}| Аспект | Рекомендация |
|---|---|
| TTL | Всегда устанавливайте TTL, даже большой (1-24 часа) |
| Именование ключей | Используйте namespace: cache:entity:id:field |
| Инвалидация | План инвалидации важнее плана кэширования |
| Stampede | Используйте mutex или early expiration для популярных ключей |
| Мониторинг | Отслеживайте hit rate, memory usage, eviction rate |
| Сериализация | JSON для простых данных, pickle для сложных объектов |
| Компрессия | Для больших объектов (>1KB) используйте сжатие |
Проверьте понимание → ответьте на вопросы в caching.json
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.