Стратегии кэширования, Redis, CDN, инвалидация кэша, cache stampede, cache penetration, cache avalanche.
Кэширование — это искусство балансировать между актуальностью данных и скоростью доступа. Правильная стратегия кэширования может ускорить приложение в 100 раз, неправильная — привести к потере данных и нестабильности.
Кэширование решает фундаментальную проблему: разница в скорости доступа к данным. Представьте иерархию хранения:
CPU Registers ~0.5 ns
L1 Cache ~1 ns
L2 Cache ~4 ns
RAM ~100 ns
SSD ~100 μs (1000x медленнее RAM)
HDD ~10 ms (100x медленнее SSD)
Network (LAN) ~1 ms
Network (WAN) ~100 ms
Database Query ~10-100 ms
External API ~100-1000 ms
Почему это важно на практике:
Представьте интернет-магазин с 10 000 посетителей в минуту. Каждый посетитель загружает главную страницу, которая требует:
Без кэширования: 10 000 × 7 = 70 000 запросов в минуту к базе данных и внешним сервисам. База данных начнёт тормозить при 1000-5000 запросов в секунду, время ответа вырастет с 50 мс до 5-10 секунд.
С кэшированием: Главная страница кэшируется на 1 минуту. Вместо 70 000 запросов — ~200 запросов (только для пользователей с устаревшим кэшем). Ускорение в 350 раз.
Кэш помещает часто используемые данные в более быстрое хранилище, сокращая время доступа.
| Компания | Что кэшируют | Эффект |
|---|---|---|
| Лента новостей, граф друзей | Снижение нагрузки на MySQL с 500K до 50K RPS | |
| Твиты звёзд, таймлайны | Ускорение загрузки ленты с 5с до 200мс | |
| Amazon | Цены товаров, наличие | 1 мс задержки = $1.6 млрд потерь в год |
| Фотографии, лайки, комментарии | 99% запросов обслуживаются из кэша |
Представьте библиотеку:
Кэширование — это когда вы кладёте часто используемые книги на стол, чтобы не ходить в хранилище каждый раз.
✅ Применяйте кэширование:
❌ Не применяйте кэширование:
Задайте себе вопросы:
1. Как часто данные читаются?
→ Редко (<1 раза в минуту) = НЕ кэшировать
2. Как часто данные меняются?
→ Чаще чем читаются = НЕ кэшировать
3. Насколько дорого получение данных?
→ Быстрый запрос (<10 мс) = возможно НЕ кэшировать
4. Насколько критична актуальность?
→ Требуется real-time = кэшировать с осторожностью
5. Какой объём данных?
→ Больше доступной памяти = нужна стратегия вытеснения
Золотое правило кэширования: Кэшируйте только то, что прошло нагрузочное тестирование и показало узкое место. Преждевременная оптимизация — корень всех зол.
Кэширование работает на разных уровнях архитектуры. Понимание каждого уровня помогает выбрать правильную стратегию.
Важная концепция: Кэширование работает как матрёшка — каждый уровень вложен в предыдущий. Запрос проходит путь от клиента к базе данных, останавливаясь на первом уровне, где найдены данные.
Клиент → CDN → Reverse Proxy → Application Cache → Database Cache → БД
Время прохождения:
Клиент (0 мс) → CDN (10 мс) → Proxy (20 мс) → App Cache (1 мс) → DB Cache (50 мс) → БД (100 мс)
| Уровень | Latency | Пропускная способность | Стоимость | Контроль |
|---|---|---|---|---|
| Browser | 0 мс | Зависит от клиента | Бесплатно | Низкий |
| CDN | 10-50 мс | Очень высокая | $0.01-0.1/GB | Средний |
| Reverse Proxy | 1-5 мс | Высокая | $/мес за сервер | Средний |
| Application (Redis) | 0.5-1 мс | Очень высокая | $/мес за инстанс | Полный |
| Database | 0.1-10 мс | Средняя | Включено в БД | Ограниченный |
Где: Браузер пользователя, мобильное приложение
Что кэширует: Статические ресурсы (HTML, CSS, JS, изображения), API-ответы
Как это работает в браузере:
Когда вы впервые заходите на сайт, браузер загружает все ресурсы (картинки, CSS, JS) и сохраняет их локально. При повторном посещении:
If-Modified-Since или If-None-Match304 Not Modified (без тела) или 200 с новыми даннымиДирективы Cache-Control:
| Директива | Значение | Пример использования |
|---|---|---|
public | Может кэшироваться браузером и CDN | Статика, публичные страницы |
private | Только браузер пользователя | Персональные данные |
no-store | Не кэшировать вообще | Платежи, пароли |
no-cache | Кэшировать, но проверять актуальность | Новости, блоги |
max-age=N | Кэшировать N секунд | Статика |
must-revalidate | Всегда проверять после истечения | Финансовые данные |
Пример HTTP-заголовков:
Cache-Control: public, max-age=3600
ETag: "33a64df551425fcc55e4d42a148795d9f25f89d4"
Last-Modified: Wed, 21 Oct 2025 07:28:00 GMT# FastAPI пример настройки кэш-заголовков
from fastapi import FastAPI, Response
from fastapi.responses import JSONResponse
app = FastAPI()
@app.get("/api/users/{user_id}")
async def get_user(user_id: int, response: Response):
# Кэшируем на 1 час, публично (может кэшироваться CDN)
response.headers["Cache-Control"] = "public, max-age=3600"
return {"user_id": user_id, "name": "John"}
@app.get("/api/account/balance")
async def get_balance(response: Response):
# Не кэшируем чувствительные данные
response.headers["Cache-Control"] = "no-store, no-cache"
return {"balance": 1000}Преимущества:
Недостатки:
Где: Пограничные серверы CDN (Cloudflare, AWS CloudFront, Akamai)
Что кэширует: Статический контент, медиафайлы, HTML-страницы
Как работает CDN:
CDN — это географически распределённая сеть серверов. Когда пользователь запрашивает ресурс:
Пример географии:
Пользователь в Москве → CDN сервер Moscow Edge → 5 мс
Пользователь в Лондоне → CDN сервер London Edge → 10 мс
Пользователь в Нью-Йорке → CDN сервер NYC Edge → 15 мс
Без CDN: все запросы идут на origin в Москве → 100-300 мс
# Пример: Cloudflare Cache Rules через API
import requests
def set_cdn_cache_rule():
"""Настройка правил кэширования Cloudflare"""
url = "https://api.cloudflare.com/client/v4/zones/{ZONE_ID}/rules/cache_rules"
headers = {"Authorization": "Bearer {API_TOKEN}"}
data = {
"id": "cache_static_assets",
"action": "cache",
"expression": "(http.request.uri.path matches \"^/static/.*\")",
"cache_ttl": 86400, # 24 часа
}
requests.put(url, json=data, headers=headers)Преимущества:
Недостатки:
Где: Между клиентом и приложением (на уровне load balancer или отдельного сервера)
Что кэширует: HTTP-ответы приложения (полные страницы, API-ответы)
Архитектура запроса:
Клиент → Nginx (проверяет кэш) → [кэш miss] → Backend App → БД
→ [кэш hit] → Возвращает из кэша
Почему Reverse Proxy эффективен:
Пример Nginx конфигурации:
# nginx.conf
proxy_cache_path /var/cache/nginx levels=1:2 keys_zone=app_cache:10m
max_size=1g inactive=60m use_temp_path=off;
server {
location /api/ {
proxy_pass http://backend:8000;
proxy_cache app_cache;
proxy_cache_valid 200 10m; # Кэшируем 200 ответы 10 минут
proxy_cache_valid 404 1m; # 404 кэшируем 1 минуту
proxy_cache_key $scheme$request_method$host$request_uri;
add_header X-Cache-Status $upstream_cache_status;
}
# Endpoint для принудительной инвалидации
location /purge/ {
allow 127.0.0.1;
deny all;
proxy_cache_purge app_cache $scheme$request_method$host$request_uri;
}
}Преимущества:
Недостатки:
Где: Внутри приложения, как слой между бизнес-логикой и БД
Что кэширует: Результаты запросов к БД, сессии, вычисления
Почему Redis так быстр:
Redis хранит данные в оперативной памяти (RAM), что даёт доступ за ~1 мс. Для сравнения:
Redis vs Memcached:
| Характеристика | Redis | Memcached |
|---|---|---|
| Структуры данных | Строки, хэши, списки, сеты, sorted sets | Только строки |
| Персистентность | Есть (RDB/AOF) | Нет |
| Репликация | Есть | Нет |
| Кластеризация | Встроенная | Через клиент |
| Pub/Sub | Есть | Нет |
| Использование | Кэш + очередь + БД | Только кэш |
Когда что выбирать:
# Пример с Redis (подробнее ниже)
import redis
import json
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
async def get_user_profile(user_id: int):
# Проверяем кэш
cached = redis_client.get(f"user:{user_id}:profile")
if cached:
return json.loads(cached)
# Кэш-мисс — читаем из БД
user = await db.query("SELECT * FROM users WHERE id = $1", user_id)
# Сохраняем в кэш на 1 час
redis_client.setex(
f"user:{user_id}:profile",
3600,
json.dumps(user)
)
return userПреимущества:
Недостатки:
Где: Внутри СУБД
Что кэширует: Результаты запросов, индексы, планы выполнения
Уровни кэширования в PostgreSQL:
┌─────────────────────────────────────────┐
│ Shared Buffers (4 GB) │ ← Кэш страниц данных
├─────────────────────────────────────────┤
│ OS Page Cache (~8 GB) │ ← Кэш операционной системы
├─────────────────────────────────────────┤
│ Query Plan Cache │ ← Кэш планов выполнения
├─────────────────────────────────────────┤
│ WAL Buffers │ ← Кэш журнала транзакций
└─────────────────────────────────────────┘
Как это работает:
Примеры:
-- PostgreSQL: настройка shared_buffers (кэш страниц)
-- postgresql.conf
shared_buffers = 4GB # Кэш данных в памяти
effective_cache_size = 12GB # Оценка общего кэша ОС + БД
work_mem = 256MB # Память для сортировок, хэшей
-- Materialized Views для кэширования сложных запросов
CREATE MATERIALIZED VIEW user_stats AS
SELECT
user_id,
COUNT(*) as order_count,
SUM(amount) as total_spent
FROM orders
GROUP BY user_id;
-- Обновление по расписанию
REFRESH MATERIALIZED VIEW CONCURRENTLY user_stats;
-- Query Cache (MySQL, удалён в 8.0, но есть аналоги)
-- Результат одинаковых SELECT кэшируетсяПреимущества:
Недостатки:
Выбор паттерна определяет, как данные попадают в кэш и как синхронизируются с источником истины.
Важно: Не существует «лучшего» паттерна. Выбор зависит от соотношения чтения/записи и требований к актуальности.
| Паттерн | Чтение | Запись | Актуальность | Сложность |
|---|---|---|---|---|
| Cache-Aside | Быстрое | Быстрая | Возможна задержка | Низкая |
| Write-Through | Быстрое | Медленная | Всегда актуально | Средняя |
| Write-Behind | Быстрое | Очень быстрая | Задержка до flush | Высокая |
| Write-Around | Медленнее* | Быстрая | Актуально после чтения | Низкая |
*Первое чтение после записи — cache miss
Вопрос 1: Какое соотношение чтения и записи?
├─ Чтение >> Запись (90/10) → Cache-Aside
├─ Чтение ≈ Запись (50/50) → Write-Through
└─ Запись >> Чтение → Write-Behind
Вопрос 2: Насколько критична актуальность?
├─ Критична (финансы) → Write-Through
├─ Допустима задержка (соцсеть) → Cache-Aside
└─ Задержка ОК (аналитика) → Write-Behind
Вопрос 3: Есть ли редкие чтения после записи?
├─ Да → Write-Around (не засорять кэш)
└─ Нет → Write-Through или Write-Behind
Самый распространённый паттерн. Приложение сначала проверяет кэш, при промахе — читает из БД и заполняет кэш.
Поток запроса:
Запрос: "Получить профиль пользователя #123"
↓
┌─────────────────┐
│ Проверка кэша │
└────────┬────────┘
│
┌────┴────┐
│ │
HIT MISS
│ │
↓ ↓
┌────────┐ ┌──────────┐
│ Вернуть│ │ Чтение │
│ из кэша│ │ из БД │
│ (1 мс) │ │ (50 мс) │
└────────┘ └────┬─────┘
│
↓
┌──────────┐
│ Записать │
│ в кэш │
└────┬─────┘
│
↓
┌──────────┐
│ Вернуть │
│ клиенту │
└──────────┘
Почему называется "Lazy" (ленивый): Кэш заполняется только тогда, когда данные действительно запрашивают. Не тратится память на данные, которые никто не читает.
import redis
import json
from typing import Optional, Dict, Any
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
async def get_user_cache_aside(user_id: int) -> Optional[Dict[str, Any]]:
"""
Cache-Aside паттерн:
1. Проверяем кэш
2. При cache miss — читаем из БД
3. Записываем в кэш
"""
cache_key = f"user:{user_id}"
# Шаг 1: Проверка кэша
cached_data = redis_client.get(cache_key)
if cached_data:
print(f"Cache HIT for user {user_id}")
return json.loads(cached_data)
# Шаг 2: Cache MISS — чтение из БД
print(f"Cache MISS for user {user_id}, reading from DB")
user_data = await db.fetch_user(user_id)
if user_data is None:
# Кэшируем отсутствие данных (защита от cache penetration)
redis_client.setex(f"{cache_key}:null", 60, "null")
return None
# Шаг 3: Запись в кэш с TTL
redis_client.setex(cache_key, 3600, json.dumps(user_data))
return user_dataПреимущества:
Недостатки:
Когда использовать:
Данные записываются одновременно в кэш и БД. Кэш всегда актуален.
Поток записи:
Запрос: "Обновить email пользователя #123"
↓
┌─────────────────┐
│ Запись в БД │ ← Транзакция
│ (50 мс) │
└────────┬────────┘
│
│ Успех
↓
┌─────────────────┐
│ Запись в кэш │ ← Синхронно
│ (1 мс) │
└────────┬────────┘
│
↓
┌─────────────────┐
│ Вернуть OK │
└─────────────────┘
Общее время записи: ~51 мс
Гарантия консистентности: Если запись в БД успешна, но запись в кэш провалилась — вся операция откатывается. Кэш никогда не содержит данных, которых нет в БД.
async def update_user_write_through(user_id: int, data: Dict[str, Any]) -> None:
"""
Write-Through паттерн:
1. Обновляем БД
2. Обновляем кэш
3. Оба источника синхронизированы
"""
cache_key = f"user:{user_id}"
# Транзакция: БД + кэш
async with db.transaction():
# Шаг 1: Запись в БД
await db.execute(
"UPDATE users SET name = $1, email = $2 WHERE id = $3",
data["name"], data["email"], user_id
)
# Шаг 2: Запись в кэш (после успешной БД)
user_data = {**data, "id": user_id}
redis_client.setex(cache_key, 3600, json.dumps(user_data))Преимущества:
Недостатки:
Когда использовать:
Запись сначала в кэш, асинхронно — в БД. Максимальная производительность записи.
Поток записи:
Запрос: "Обновить email пользователя #123"
↓
┌─────────────────┐
│ Запись в кэш │ ← Быстро (1 мс)
│ (1 мс) │
└────────┬────────┘
│
│ Возвращаем OK сразу!
↓
┌─────────────────┐
│ Вернуть OK │ ← Клиент свободен
└─────────────────┘
│
│ Параллельно (асинхронно)
↓
┌─────────────────┐
│ Очередь на │ ← Пакетное накопление
│ запись в БД │
└────────┬────────┘
│
│ Раз в 1 секунду ИЛИ 100 записей
↓
┌─────────────────┐
│ Пакетная │ ← 100 записей за 50 мс
│ запись в БД │ (вместо 100 × 50 мс)
└─────────────────┘
Почему это быстрее:
Риск потери данных:
Запись в кэш: ✅ Успех
Возврат клиенту: ✅ OK
Сбой питания: ❌ Данные в кэше потеряны, в БД не записаны
Решение: Redis с персистентностью (AOF — append-only file)
import asyncio
from collections import deque
import json
class WriteBehindCache:
"""
Write-Behind паттерн:
1. Запись только в кэш
2. Асинхронная очередь на запись в БД
3. Пакетная запись для эффективности
"""
def __init__(self, batch_size: int = 100, flush_interval: float = 1.0):
self.redis = redis.Redis(host='localhost', port=6379)
self.write_queue = deque()
self.batch_size = batch_size
self.flush_interval = flush_interval
async def update_user(self, user_id: int, data: Dict[str, Any]) -> None:
cache_key = f"user:{user_id}"
# Шаг 1: Быстрая запись в кэш
self.redis.setex(cache_key, 3600, json.dumps(data))
# Шаг 2: Добавляем в очередь на запись в БД
self.write_queue.append((user_id, data))
# Шаг 3: Пакетная запись при заполнении
if len(self.write_queue) >= self.batch_size:
await self._flush_to_db()
async def _flush_to_db(self):
"""Асинхронная пакетная запись в БД"""
if not self.write_queue:
return
batch = list(self.write_queue)[:self.batch_size]
del self.write_queue[:self.batch_size]
# Пакетный UPDATE
await db.execute_many(
"UPDATE users SET name = $1, email = $2 WHERE id = $3",
[(d["name"], d["email"], uid) for uid, d in batch]
)
async def background_flush(self):
"""Фоновая задача для периодической записи"""
while True:
await asyncio.sleep(self.flush_interval)
await self._flush_to_db()Преимущества:
Недостатки:
Когда использовать:
Запись только в БД, кэш инвалидируется. Следующее чтение заполнит кэш актуальными данными.
Поток записи:
Запрос: "Обновить email пользователя #123"
↓
┌─────────────────┐
│ Запись в БД │
│ (50 мс) │
└────────┬────────┘
│
↓
┌─────────────────┐
│ Удалить из кэша│ ← Инвалидация
│ (1 мс) │
└────────┬────────┘
│
↓
┌─────────────────┐
│ Вернуть OK │
└─────────────────┘
Что происходит при следующем чтении:
Запрос: "Получить профиль #123"
↓
┌─────────────────┐
│ Проверка кэша │ ← Ключ удалён!
└────────┬────────┘
│
MISS
│
↓
┌─────────────────┐
│ Чтение из БД │ ← Актуальные данные
│ (50 мс) │
└────────┬────────┘
│
↓
┌─────────────────┐
│ Записать в кэш │ ← Заполняем заново
└─────────────────┘
Почему это полезно:
async def update_user_write_around(user_id: int, data: Dict[str, Any]) -> None:
"""
Write-Around паттерн:
1. Запись в БД
2. Инвалидация кэша
3. Следующее чтение заполнит кэш актуальными данными
"""
cache_key = f"user:{user_id}"
# Шаг 1: Запись в БД
await db.execute(
"UPDATE users SET name = $1, email = $2 WHERE id = $3",
data["name"], data["email"], user_id
)
# Шаг 2: Инвалидация кэша (удаление)
redis_client.delete(cache_key)
# Или пометка как устаревший
# redis_client.expire(cache_key, 0)Преимущества:
Недостатки:
Когда использовать:
Redis — самый популярный in-memory кэш для Python-приложений.
# redis_client.py
import redis
from redis.asyncio import Redis as AsyncRedis
from typing import Optional
import json
# Синхронный клиент
redis_sync = redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True,
socket_connect_timeout=5,
socket_timeout=5,
retry_on_timeout=True,
health_check_interval=30,
)
# Асинхронный клиент (для FastAPI, asyncio)
redis_async: Optional[AsyncRedis] = None
async def get_redis_async() -> AsyncRedis:
global redis_async
if redis_async is None:
redis_async = AsyncRedis(
host='localhost',
port=6379,
db=0,
decode_responses=True,
socket_connect_timeout=5,
socket_timeout=5,
)
return redis_async# cache_service.py
from datetime import timedelta
from typing import Any, Optional, Callable, TypeVar
import functools
import hashlib
T = TypeVar('T')
class CacheService:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self.default_ttl = timedelta(hours=1)
def _serialize(self, value: Any) -> str:
"""Сериализация значения для хранения в Redis"""
return json.dumps(value, default=str)
def _deserialize(self, value: Optional[str]) -> Any:
"""Десериализация значения из Redis"""
if value is None:
return None
return json.loads(value)
def get(self, key: str) -> Optional[Any]:
"""Получение значения из кэша"""
return self._deserialize(self.redis.get(key))
def set(
self,
key: str,
value: Any,
ttl: Optional[timedelta] = None
) -> None:
"""Запись значения в кэш с TTL"""
ttl_seconds = int(ttl.total_seconds()) if ttl else int(self.default_ttl.total_seconds())
self.redis.setex(key, ttl_seconds, self._serialize(value))
def delete(self, key: str) -> None:
"""Удаление значения из кэша"""
self.redis.delete(key)
def exists(self, key: str) -> bool:
"""Проверка существования ключа"""
return self.redis.exists(key) > 0
def invalidate_pattern(self, pattern: str) -> None:
"""Инвалидация по паттерну (осторожно в production!)"""
keys = self.redis.keys(pattern)
if keys:
self.redis.delete(*keys)# async_cache.py
import functools
import hashlib
import json
from typing import Any, Callable, Optional
from datetime import timedelta
from redis.asyncio import Redis
def async_cache(
prefix: str = "",
ttl: timedelta = timedelta(hours=1),
key_builder: Optional[Callable[..., str]] = None,
skip_conditions: Optional[Callable[..., bool]] = None,
):
"""
Асинхронный декоратор для кэширования результатов функций.
Args:
prefix: Префикс для ключа кэша
ttl: Время жизни кэша
key_builder: Функция для построения ключа (по умолчанию — хэш аргументов)
skip_conditions: Функция, возвращающая True для пропуска кэширования
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
async def wrapper(*args, **kwargs):
# Проверка условия пропуска
if skip_conditions and skip_conditions(*args, **kwargs):
return await func(*args, **kwargs)
# Построение ключа
if key_builder:
cache_key = f"{prefix}:{key_builder(*args, **kwargs)}"
else:
# Хэш от аргументов
key_data = json.dumps({"args": args, "kwargs": kwargs}, default=str, sort_keys=True)
key_hash = hashlib.md5(key_data.encode()).hexdigest()
cache_key = f"{prefix}:{func.__name__}:{key_hash}"
redis_client = await get_redis_async()
# Проверка кэша
cached = await redis_client.get(cache_key)
if cached:
return json.loads(cached)
# Вызов функции
result = await func(*args, **kwargs)
# Запись в кэш
if result is not None:
await redis_client.setex(
cache_key,
int(ttl.total_seconds()),
json.dumps(result, default=str)
)
return result
return wrapper
return decorator
# Пример использования
@async_cache(
prefix="user",
ttl=timedelta(minutes=30),
key_builder=lambda user_id: f"profile:{user_id}"
)
async def get_user_profile(user_id: int) -> dict:
"""Получение профиля пользователя с кэшированием"""
# Эмуляция запроса к БД
await asyncio.sleep(0.1)
return {"user_id": user_id, "name": f"User {user_id}"}async def get_paginated_items(
page: int = 1,
page_size: int = 20,
filters: Optional[dict] = None
) -> dict:
"""
Кэширование пагинированных результатов.
Важно: кэшировать каждую страницу отдельно.
"""
filters = filters or {}
# Ключ включает параметры пагинации
cache_key = f"items:page:{page}:size:{page_size}:filters:{hashlib.md5(json.dumps(filters, sort_keys=True).encode()).hexdigest()}"
redis_client = await get_redis_async()
# Проверка кэша
cached = await redis_client.get(cache_key)
if cached:
return json.loads(cached)
# Запрос к БД
offset = (page - 1) * page_size
items = await db.fetch_all(
"SELECT * FROM items WHERE ... LIMIT $1 OFFSET $2",
page_size, offset
)
total = await db.fetch_val("SELECT COUNT(*) FROM items WHERE ...")
result = {
"items": items,
"page": page,
"page_size": page_size,
"total": total,
"pages": (total + page_size - 1) // page_size
}
# Кэширование на 5 минут (страницы могут меняться)
await redis_client.setex(cache_key, 300, json.dumps(result, default=str))
return resultИнвалидация — самая сложная часть кэширования. Неправильная стратегия приводит к stale data или избыточной нагрузке на БД.
Автоматическая инвалидация по времени. Простейшая стратегия.
# Разные TTL для разных типов данных
TTL_CONFIG = {
"user_profile": timedelta(hours=1), # Профиль меняется редко
"user_session": timedelta(days=7), # Сессия живёт неделю
"product_price": timedelta(minutes=5), # Цены могут меняться
"api_response": timedelta(minutes=1), # Динамические данные
"null_result": timedelta(minutes=1), # Кэш отсутствующих данных
}
async def cache_with_ttl(key: str, fetch_func: Callable, data_type: str):
"""Кэширование с TTL в зависимости от типа данных"""
redis_client = await get_redis_async()
cached = await redis_client.get(key)
if cached:
return json.loads(cached)
result = await fetch_func()
ttl = TTL_CONFIG.get(data_type, timedelta(hours=1))
await redis_client.setex(
key,
int(ttl.total_seconds()),
json.dumps(result, default=str)
)
return resultПреимущества:
Недостатки:
Приложение явно удаляет кэш при изменении данных.
class UserService:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
async def update_user(self, user_id: int, data: dict) -> None:
"""Обновление пользователя с инвалидацией кэша"""
# Запись в БД
await db.execute("UPDATE users SET ... WHERE id = $1", user_id)
# Явная инвалидация
await self.redis.delete(f"user:{user_id}:profile")
await self.redis.delete(f"user:{user_id}:settings")
# Инвалидация связанных списков
await self.redis.delete("users:list:recent")
async def delete_user(self, user_id: int) -> None:
"""Удаление пользователя"""
await db.execute("DELETE FROM users WHERE id = $1", user_id)
# Полная очистка всех ключей пользователя
pattern = f"user:{user_id}:*"
keys = await self.redis.keys(pattern)
if keys:
await self.redis.delete(*keys)Преимущества:
Недостатки:
Инвалидация через события/сообщения. Подходит для микросервисов.
# producer.py — сервис, изменяющий данные
async def update_user_and_publish(user_id: int, data: dict):
"""Обновление и публикация события"""
await db.execute("UPDATE users SET ... WHERE id = $1", user_id)
# Публикация события в Redis Pub/Sub или Kafka
await redis_client.publish(
"user.updated",
json.dumps({"user_id": user_id, "action": "invalidate_cache"})
)
# Или через Kafka
# await kafka_producer.send("cache-invalidation", {
# "entity": "user",
# "id": user_id,
# "action": "invalidate"
# })
# consumer.py — сервис, потребляющий события
async def cache_invalidation_listener():
"""Слушатель событий инвалидации"""
pubsub = redis_client.pubsub()
await pubsub.subscribe("user.updated", "product.updated", "order.updated")
async for message in pubsub.listen():
if message["type"] == "message":
event = json.loads(message["data"])
await handle_invalidation_event(event)
async def handle_invalidation_event(event: dict):
"""Обработка события инвалидации"""
entity = event.get("entity")
entity_id = event.get("id")
if entity == "user":
await redis_client.delete(f"user:{entity_id}:profile")
await redis_client.delete(f"user:{entity_id}:settings")
elif entity == "product":
await redis_client.delete(f"product:{entity_id}")
await redis_client.delete(f"product:{entity_id}:stock")Преимущества:
Недостатки:
Изменение версии ключа при изменении схемы данных.
class VersionedCache:
"""Кэширование с версионированием ключей"""
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self.versions = {
"user_profile": 1,
"product_list": 3, # Версия увеличена после изменения схемы
}
def _make_key(self, entity: str, identifier: str) -> str:
version = self.versions.get(entity, 1)
return f"{entity}:v{version}:{identifier}"
async def get_user(self, user_id: int) -> Optional[dict]:
key = self._make_key("user_profile", str(user_id))
cached = await self.redis.get(key)
return json.loads(cached) if cached else None
async def invalidate_all_users(self):
"""Инвалидация всех пользователей (увеличение версии)"""
self.versions["user_profile"] += 1
# Старые ключи удалятся автоматически по TTL
# Или можно явно удалить: await self.redis.delete("user_profile:v1:*")Преимущества:
Недостатки:
Проблема: Когда кэш истекает, множество одновременных запросов обнаруживают cache miss и одновременно идут в БД, вызывая перегрузку.
Наглядная аналогия:
Представьте популярный кофе-шоп у метро:
Время 0: Кэш для user:123 истекает
Время 0: Запрос 1 → cache miss → БД
Время 0: Запрос 2 → cache miss → БД
Время 0: Запрос 3 → cache miss → БД
...
Время 0: Запрос 100 → cache miss → БД
Результат: 100 одновременных запросов к БД
Почему это происходит в продакшене:
Популярный товар на маркетплейсе (iPhone 15)
- Кэш установлен на 1 час после обновления цены
- 13:00 — кэш истекает
- 13:00:00 — 10 000 пользователей одновременно открывают страницу
- 10 000 запросов → cache miss
- 10 000 запросов к БД за 100 мс
- БД не выдерживает (рассчитана на 1000 RPS)
- Время ответа: 50 мс → 5000 мс (в 100 раз медленнее)
- Пользователи видят таймауты, уходят к конкурентам
import asyncio
from redis.asyncio.lock import Lock
async def get_user_with_lock(user_id: int) -> dict:
"""Cache-aside с блокировкой для предотвращения stampede"""
redis_client = await get_redis_async()
cache_key = f"user:{user_id}"
lock_key = f"lock:{cache_key}"
# Проверка кэша
cached = await redis_client.get(cache_key)
if cached:
return json.loads(cached)
# Попытка захватить блокировку
async with Lock(redis_client, lock_key, timeout=10, blocking_timeout=5):
# Двойная проверка после получения блокировки
cached = await redis_client.get(cache_key)
if cached:
return json.loads(cached)
# Только один запрос идёт в БД
user_data = await db.fetch_user(user_id)
# Запись в кэш
await redis_client.setex(cache_key, 3600, json.dumps(user_data, default=str))
return user_dataИдея: Заранее обновлять кэш с некоторой вероятностью, чтобы не допустить одновременного истечения.
Как это работает:
Кэш установлен на 3600 секунд (1 час)
Последние 10% TTL (последние 6 минут) — «опасная зона»
Когда запрос попадает в опасную зону:
- С вероятностью 20% запускаем фоновое обновление
- Возвращаем старые данные (но кэш уже обновляется)
Пример:
13:00 — кэш создан (TTL = 3600 сек)
13:54 — осталось 6 минут (10% от 3600)
13:55 — запрос попал в опасную зону
→ random() = 0.15 < 0.20 → запускаем обновление
→ возвращаем кэш (ещё действителен)
→ через 1 сек кэш обновлён на 1 час вперёд
13:56 — кэш уже с TTL до 14:56, опасности нет
Визуализация:
TTL: |━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━|
0 мин 60 мин
↑
«опасная зона»
(последние 10%)
Запрос в опасной зоне:
→ 20% запросов запускают обновление
→ 80% запросов просто используют кэш
import random
from datetime import timedelta
async def get_user_early_expire(user_id: int) -> dict:
"""
Раннее обновление кэша с вероятностью.
20% запросов в последние 10% TTL инициируют фоновое обновление.
"""
redis_client = await get_redis_async()
cache_key = f"user:{user_id}"
cached = await redis_client.get(cache_key)
if cached:
data = json.loads(cached)
ttl = await redis_client.ttl(cache_key)
# Если осталось меньше 10% TTL и случайное срабатывание
if ttl > 0 and ttl < 360 and random.random() < 0.2:
# Фоновое обновление (не блокируя ответ)
asyncio.create_task(refresh_cache_background(cache_key, user_id))
return data
# Cache miss
user_data = await db.fetch_user(user_id)
await redis_client.setex(cache_key, 3600, json.dumps(user_data, default=str))
return user_data
async def refresh_cache_background(cache_key: str, user_id: int):
"""Фоновое обновление кэша"""
try:
user_data = await db.fetch_user(user_id)
redis_client = await get_redis_async()
await redis_client.setex(cache_key, 3600, json.dumps(user_data, default=str))
except Exception as e:
# Логирование ошибки, но не прерывание основного потока
logger.error(f"Background cache refresh failed: {e}")Идея из RFC 5861: Возвращать устаревшие данные, пока фоновая задача обновляет кэш.
Как это работает:
Этот паттерн заимствован из HTTP-кэширования (RFC 5861). Название переводится как «Устаревший, пока обновляется».
Два уровня кэша:
1. Основной кэш (свежий) — TTL 1 час
2. Stale кэш (устаревший) — TTL 24 часа
Поток запроса:
┌──────────────────────────────────────────┐
│ Запрос: получить профиль #123 │
└────────────────────┬─────────────────────┘
│
↓
┌──────────────────────┐
│ Основной кэш пуст? │
└──────────┬───────────┘
│
┌──────┴──────┐
│ │
НЕТ ДА
│ │
↓ ↓
┌──────────────┐ ┌──────────────────┐
│ Вернуть из │ │ Stale кэш пуст? │
│ основного │ └────────┬─────────┘
└──────────────┘ │
┌───────┴────────┐
│ │
НЕТ ДА
│ │
↓ ↓
┌─────────────────┐ ┌──────────┐
│ Вернуть stale │ │ Чтение │
│ + обновить │ │ из БД │
│ фонем │ └──────────┘
└─────────────────┘
Пример из жизни:
Представьте ресторан:
Если пришло много гостей и основное меню не готово:
async def get_user_stale_while_revalidate(user_id: int) -> dict:
"""
Возвращаем устаревший кэш, пока фоновая задача обновляет данные.
Паттерн из RFC 5861.
"""
redis_client = await get_redis_async()
cache_key = f"user:{user_id}"
stale_key = f"{cache_key}:stale"
# Проверяем основной кэш
cached = await redis_client.get(cache_key)
if cached:
return json.loads(cached)
# Проверяем устаревший кэш
stale = await redis_client.get(stale_key)
if stale:
# Возвращаем stale, запускаем обновление
asyncio.create_task(refresh_cache_background(cache_key, user_id))
return json.loads(stale)
# Полностью нет данных — идём в БД
user_data = await db.fetch_user(user_id)
await redis_client.setex(cache_key, 3600, json.dumps(user_data, default=str))
return user_dataПроблема: Злоумышленник или сбой запрашивают несуществующие ключи, которые не кэшируются. Все запросы идут в БД.
Запрос user:999999999 → cache miss → БД (нет такого пользователя)
Запрос user:888888888 → cache miss → БД (нет такого пользователя)
...
10000 запросов к несуществующим ключам = DDoS на БД
async def get_user_prevent_penetration(user_id: int) -> Optional[dict]:
"""Кэширование отсутствующих данных для защиты от penetration"""
redis_client = await get_redis_async()
cache_key = f"user:{user_id}"
cached = await redis_client.get(cache_key)
if cached:
if cached == "NULL_MARKER":
return None # Кэшированное отсутствие
return json.loads(cached)
# Запрос к БД
user_data = await db.fetch_user(user_id)
if user_data is None:
# Кэшируем отсутствие на короткое время
await redis_client.setex(f"{cache_key}", 60, "NULL_MARKER")
return None
await redis_client.setex(cache_key, 3600, json.dumps(user_data, default=str))
return user_dataЧто такое Bloom Filter?
Bloom Filter (фильтр Блума) — это вероятностная структура данных, которая позволяет быстро проверить, точно не существует ли элемент.
Как это работает:
Представьте битовый массив из 1 миллиона бит (все изначально 0) и 3 хэш-функции:
Инициализация:
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...] ← 1 000 000 бит
Добавляем user_id = 123:
hash1(123) = позиция 5 → устанавливаем бит 5 в 1
hash2(123) = позиция 42 → устанавливаем бит 42 в 1
hash3(123) = позиция 100 → устанавливаем бит 100 в 1
[0, 0, 0, 0, 0, 1, 0, 0, ..., 0, 1, ..., 0, 1, ...]
↑ ↑ ↑
позиция 5 позиция 42 позиция 100
Добавляем user_id = 456:
hash1(456) = позиция 7 → устанавливаем бит 7 в 1
hash2(456) = позиция 42 ← УЖЕ 1! (коллизия)
hash3(456) = позиция 200 → устанавливаем бит 200 в 1
Проверяем user_id = 999:
hash1(999) = позиция 15 → бит 15 = 0
hash2(999) = позиция 50 → бит 50 = 0
hash3(999) = позиция 300 → бит 300 = 0
Хотя бы один бит = 0? → ТОЧНО НЕ СУЩЕСТВУЕТ ✅
Проверяем user_id = 789:
hash1(789) = позиция 5 → бит 5 = 1 ✅
hash2(789) = позиция 42 → бит 42 = 1 ✅
hash3(789) = позиция 200 → бит 200 = 1 ✅
Все биты = 1? → ВОЗМОЖНО СУЩЕСТВУЕТ (но может быть ложное срабатывание)
Ключевые свойства Bloom Filter:
| Свойство | Значение |
|---|---|
| False Positive (ложное срабатывание) | Возможно (элемент не существует, но фильтр говорит что существует) |
| False Negative (ложное отсутствие) | Невозможно (если фильтр говорит что элемента нет — его точно нет) |
| Вероятность ошибки | Настраивается (0.1% - 1% типично) |
| Память | ~10 бит на элемент для 1% ошибки |
| Скорость | O(1) — константное время |
Почему это полезно для защиты от Cache Penetration:
Без Bloom Filter:
Злоумышленник запрашивает 1 000 000 несуществующих user_id
→ 1 000 000 cache miss
→ 1 000 000 запросов к БД
→ БД перегружена и падает
С Bloom Filter:
Злоумышленник запрашивает 1 000 000 несуществующих user_id
→ Bloom Filter проверяет: "точно не существует"
→ 0 запросов к БД (отклоняем сразу)
→ БД в безопасности
Когда использовать Bloom Filter:
✅ Применять:
❌ Не применять:
Расчёт размера Bloom Filter:
# Формула для оптимального размера
m = -(n * ln(p)) / (ln(2)^2)
# где:
# n = ожидаемое количество элементов
# p = допустимая вероятность ошибки
# m = размер битового массива
# Пример: 1 млн элементов, ошибка 1%
n = 1_000_000
p = 0.01
m = -(1_000_000 * ln(0.01)) / (ln(2)^2)
m ≈ 9 600 000 бит ≈ 1.2 МБ
# Количество хэш-функций:
k = (m/n) * ln(2) ≈ 7 хэш-функцийfrom redis.commands.bf import BFBloom
async def setup_bloom_filter():
"""Настройка Bloom Filter в Redis"""
bf = BFBloom(redis_client)
# Создание фильтра (ожидаем 1 млн элементов, ошибка 0.1%)
await bf.create("user_ids", capacity=1000000, error_rate=0.001)
# Добавление существующих user_id
all_user_ids = await db.fetch_all("SELECT id FROM users")
for (user_id,) in all_user_ids:
await bf.add("user_ids", str(user_id))
async def get_user_with_bloom(user_id: int) -> Optional[dict]:
"""Проверка через Bloom Filter перед запросом к БД"""
redis_client = await get_redis_async()
cache_key = f"user:{user_id}"
# Проверка кэша
cached = await redis_client.get(cache_key)
if cached:
return json.loads(cached) if cached != "NULL" else None
# Проверка Bloom Filter
exists = await redis_client.bf().exists("user_ids", str(user_id))
if not exists:
# Точно не существует — не идём в БД
return None
# Возможно существует — проверяем БД
user_data = await db.fetch_user(user_id)
if user_data is None:
await redis_client.setex(cache_key, 60, "NULL")
return None
await redis_client.setex(cache_key, 3600, json.dumps(user_data, default=str))
return user_dataПроблема: Множество ключей истекают одновременно, вызывая массовый cache miss и перегрузку БД.
Отличие от Cache Stampede:
Наглядная аналогия:
Представьте стадион на 50 000 зрителей:
Сценарий запуска нового функционала:
00:00 — Деплой новой версии
00:01 — Кэш очищен полностью (reset)
00:01 — Все 100 000 пользователей заходят на сайт
00:01 — 100% cache miss
00:01 — 500 000 запросов к БД (5 запросов на пользователя)
00:02 — БД падает от перегрузки
00:02 — Сайт недоступен
Реальный кейс:
Компания X запустила новую функцию в 12:00:
import random
def get_randomized_ttl(base_ttl: int, jitter_percent: int = 20) -> int:
"""Добавление случайности к TTL для предотвращения avalanche"""
jitter = base_ttl * jitter_percent // 100
return base_ttl + random.randint(-jitter, jitter)
async def cache_with_jitter(key: str, value: Any, base_ttl: int = 3600):
"""Кэширование с рандомизированным TTL"""
redis_client = await get_redis_async()
ttl = get_randomized_ttl(base_ttl, jitter_percent=20)
await redis_client.setex(key, ttl, json.dumps(value, default=str))async def warmup_cache_gradually():
"""
Постепенное заполнение кэша после деплоя.
Не кэшировать всё сразу, а постепенно.
"""
# Получаем топ-100 популярных элементов
popular_items = await db.fetch_all(
"SELECT id FROM products ORDER BY views DESC LIMIT 100"
)
# Кэшируем с задержкой
for i, (item_id,) in enumerate(popular_items):
item_data = await db.fetch_product(item_id)
await cache_with_jitter(f"product:{item_id}", item_data)
# Задержка между элементами
if i % 10 == 0:
await asyncio.sleep(0.1)В микросервисной архитектуре кэш должен быть доступен всем инстансам приложения.
from redis.cluster import RedisCluster
# Подключение к Redis Cluster
cluster = RedisCluster(
host='redis-node-1',
port=7000,
startup_nodes=[
{"host": "redis-node-1", "port": 7000},
{"host": "redis-node-2", "port": 7000},
{"host": "redis-node-3", "port": 7000},
],
decode_responses=True,
)
# Данные автоматически шардируются по нодам
cluster.set("user:123", "data") # Может попасть на node-1
cluster.set("user:456", "data") # Может попасть на node-2Проблема обычного хэширования:
Обычное хэширование (key % n_nodes) создаёт проблему при изменении количества нод:
3 ноды: Redis-1, Redis-2, Redis-3
key = "user:123"
hash("user:123") % 3 = 1 → попадает на Redis-2
Добавляем Redis-4:
hash("user:123") % 4 = 0 → попадает на Redis-1 ❌
Результат: При добавлении 1 ноды 75% ключей перемещаются!
Все cache miss, БД перегружена.
Consistent Hashing решает эту проблему:
Представьте круг (кольцо), на котором расположены:
┌─────────────────────────────────┐
│ Кольцо хэшей │
│ │
│ Redis-1 │
│ ● │
│ │
│ │
user:456 ● │
│ │
│ ● Redis-2 │
│ │
│ │
│ ● user:123 │
│ │
│ Redis-3 ● │
│ │
└─────────────────────────────────┘
Правило: ключ принадлежит первой ноде по часовой стрелке
user:123 → движемся по часовой → Redis-3
user:456 → движемся по часовой → Redis-1
Что происходит при добавлении ноды:
┌─────────────────────────────────┐
│ Кольцо хэшей │
│ │
│ Redis-1 │
│ ● │
│ │
│ Redis-NEW ● │ ← Новая нода
│ │
user:456 ● │
│ │
│ ● Redis-2 │
│ │
│ │
│ ● user:123 │
│ │
│ Redis-3 ● │
│ │
└─────────────────────────────────┘
До: user:456 → Redis-1
После: user:456 → Redis-NEW (переместился)
До: user:123 → Redis-3
После: user:123 → Redis-3 (не переместился) ✅
Переместились только ключи между Redis-1 и Redis-NEW!
import hashlib
from typing import List, Any
class ConsistentHashCache:
"""
Реализация consistent hashing для распределённого кэша.
Ключи распределяются по нодам так, что при добавлении/удалении
ноды перемещается минимальное количество ключей.
"""
def __init__(self, nodes: List[str], virtual_nodes: int = 150):
self.nodes = nodes
self.virtual_nodes = virtual_nodes
self.ring = {}
self.sorted_keys = []
# Построение кольца
for node in nodes:
for i in range(virtual_nodes):
key = f"{node}:{i}"
hash_key = self._hash(key)
self.ring[hash_key] = node
self.sorted_keys = sorted(self.ring.keys())
def _hash(self, key: str) -> int:
return int(hashlib.md5(key.encode()).hexdigest(), 16)
def get_node(self, cache_key: str) -> str:
"""Получение ноды для ключа"""
hash_key = self._hash(cache_key)
# Бинарный поиск ближайшей ноды по часовой стрелке
import bisect
idx = bisect.bisect(self.sorted_keys, hash_key)
if idx >= len(self.sorted_keys):
idx = 0
return self.ring[self.sorted_keys[idx]]
def get(self, cache_key: str) -> Optional[Any]:
"""Получение значения с правильной ноды"""
node = self.get_node(cache_key)
# Подключение к конкретной ноде
redis_client = self.get_redis_for_node(node)
return redis_client.get(cache_key)
# Что такое виртуальные ноды (virtual nodes)?
"""
Проблема без виртуальных нод:
3 физические ноды на кольце:
┌─────────────────────────────────┐
│ Redis-1 │
│ ● │
│ │
│ │
│ │
│ │
│ ● Redis-2 │
│ │
│ │
│ │
│ │
│ Redis-3 ● │
└─────────────────────────────────┘
Ключи распределяются неравномерно!
- Сектор Redis-1: 40% ключей
- Сектор Redis-2: 35% ключей
- Сектор Redis-3: 25% ключей
Решение: виртуальные ноды
Каждая физическая нода представляется как 150 виртуальных:
- Redis-1: v1, v2, v3, ..., v150
- Redis-2: v1, v2, v3, ..., v150
- Redis-3: v1, v2, v3, ..., v150
┌─────────────────────────────────┐
│ ●● ● ●● ●●● ● ●●●● │ ← 450 точек
│ ● ●●● ●● ●● ●●●● ●●● ● │ равномерно
│●● ●●● ●●●● ●● ● ●● ●●● │ распределены
│ ●●● ●●●● ●●●●● ●●●●● ●● │
│ ● ●●●●●●●●●●●●●●●●●●●● │
└─────────────────────────────────┘
Результат:
- Ключи распределяются равномерно (~33% на каждую физическую ноду)
- При добавлении новой ноды перемещается только 1/N ключей
"""Что это: Один ключ запрашивается чаще других, создавая нагрузку на одну конкретную ноду.
Пример:
Популярный товар на маркетплейсе:
- product:iphone-15-pro-max → 10 000 запросов в секунду
- product:обычный-чехол → 10 запросов в секунду
Все запросы на product:iphone-15-pro-max идут на одну ноду Redis
(например, Redis-2 по хэшу ключа)
Результат:
- Redis-2: 10 000 RPS (перегружен, latency растёт)
- Redis-1: 100 RPS (простой)
- Redis-3: 100 RPS (простой)
Неравномерная нагрузка = проблемы производительности
Как обнаружить:
hot_key_cache = {} # Локальный кэш для горячих ключей
async def get_with_local_hot_cache(key: str): if key in HOT_KEYS: # Список известных горячих ключей if key in hot_key_cache: return hot_key_cache[key]
# Обычное получение из Redis
value = await redis.get(key)
if key in HOT_KEYS:
hot_key_cache[key] = value
return value
from redis.asyncio.lock import Lock
async def distributed_cache_with_lock(key: str, fetch_func): redis_client = await get_redis_async()
cached = await redis_client.get(key)
if cached:
return cached
# Распределённая блокировка
async with Lock(redis_client, f"lock:{key}", timeout=10):
# Двойная проверка
cached = await redis_client.get(key)
if cached:
return cached
result = await fetch_func()
await redis_client.setex(key, 3600, result)
return result
#### Проблема 2: Large/Big Keys (большие ключи)
**Что это:** Ключи с очень большим объёмом данных (>100 КБ).
**Почему это проблема:**
Большой ключ: user:123:all-history → 5 МБ (вся история действий)
Проблемы:
**Решение:**
- Разбивайте большие ключи на части (sharding by field)
- Храните в БД, кэшируйте только части
- Используйте Redis Hash вместо JSON строки
---
## 9. Практические примеры
### 9.1. Кэширование API-ответов в FastAPI
```python
from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse
import hashlib
import json
app = FastAPI()
@app.middleware("http")
async def cache_response_middleware(request: Request, call_next):
"""Middleware для кэширования GET-запросов"""
# Кэшируем только GET запросы
if request.method != "GET":
return await call_next(request)
# Генерация ключа кэша
cache_key = f"api:{request.url.path}:{request.url.query}"
cache_key_hash = hashlib.md5(cache_key.encode()).hexdigest()
redis_key = f"http_cache:{cache_key_hash}"
redis_client = await get_redis_async()
# Проверка кэша
cached = await redis_client.get(redis_key)
if cached:
cached_data = json.loads(cached)
response = JSONResponse(content=cached_data["body"])
response.headers["X-Cache"] = "HIT"
return response
# Выполнение запроса
response = await call_next(request)
# Кэширование успешных ответов
if response.status_code == 200:
body = b""
async for chunk in response.body_iterator:
body += chunk
await redis_client.setex(
redis_key,
300, # 5 минут
json.dumps({"body": json.loads(body)})
)
response.headers["X-Cache"] = "MISS"
return response
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import uuid
app = FastAPI()
security = HTTPBearer()
async def create_session(user_id: int) -> str:
"""Создание сессии в Redis"""
session_id = str(uuid.uuid4())
session_data = {
"user_id": user_id,
"created_at": datetime.utcnow().isoformat(),
}
redis_client = await get_redis_async()
await redis_client.setex(
f"session:{session_id}",
604800, # 7 дней
json.dumps(session_data)
)
return session_id
async def get_current_session(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> dict:
"""Получение текущей сессии"""
session_id = credentials.credentials
redis_client = await get_redis_async()
session_data = await redis_client.get(f"session:{session_id}")
if not session_data:
raise HTTPException(status_code=401, detail="Session expired")
return json.loads(session_data)
@app.get("/api/profile")
async def get_profile(session: dict = Depends(get_current_session)):
"""Эндпоинт, требующий сессию"""
user_id = session["user_id"]
# ... получение профиляfrom fastapi import Request, HTTPException
from datetime import datetime
async def rate_limit(
request: Request,
max_requests: int = 100,
window_seconds: int = 60
) -> bool:
"""
Rate limiting с скользящим окном в Redis.
"""
redis_client = await get_redis_async()
client_ip = request.client.host
key = f"rate_limit:{client_ip}"
current_time = datetime.utcnow().timestamp()
window_start = current_time - window_seconds
# Удаление старых записей
await redis_client.zremrangebyscore(key, 0, window_start)
# Подсчёт запросов в окне
request_count = await redis_client.zcard(key)
if request_count >= max_requests:
return False # Лимит превышен
# Добавление текущего запроса
await redis_client.zadd(key, {str(current_time): current_time})
await redis_client.expire(key, window_seconds)
return True
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
if not await rate_limit(request, max_requests=100, window_seconds=60):
raise HTTPException(status_code=429, detail="Rate limit exceeded")
return await call_next(request)Кэширование — мощный инструмент, требующий тщательного проектирования. Это не просто «добавить Redis и забыть», а постоянный баланс между производительностью, актуальностью данных и сложностью системы.
1. Стратегия:
2. Инвалидация:
3. Защита от проблем:
4. Распределённый кэш:
5. Мониторинг:
| Сценарий | Рекомендуемая стратегия | Почему |
|---|---|---|
| Профиль пользователя | Cache-Aside + TTL 1 час | Чтение >> записи, допустима задержка |
| Баланс счёта | Write-Through | Критична актуальность, частое чтение после записи |
| Счётчики лайков | Write-Behind + пакетная запись | Высокая нагрузка на запись, допустима задержка |
| Новости / статьи | Cache-Aside + явная инвалидация | Публикация редкая, чтение частое |
| Сессии пользователей | Cache-Aside + TTL 7 дней | Персональные данные, фиксированный срок жизни |
| Курсы валют | Cache-Aside + TTL 1-5 мин | Частое обновление, много читателей |
| Рекомендации товаров | Cache-Aside + TTL 24 часа | Дорогие вычисления (ML), редкое обновление |
| Корзина покупок | Write-Through | Частые изменения, важно для конверсии |
| Поиск / фильтры | Cache-Aside + версионирование | Много комбинаций параметров, инвалидация по версии |
| Статика (CSS, JS, изображения) | CDN + Browser Cache | Глобальная доставка, нулевая нагрузка на бэкенд |
| Ошибка | Последствия | Решение |
|---|---|---|
| Кэширование всего подряд | Переполнение памяти, stale data | Кэшируйте только hot data, настройте LRU eviction |
| Слишком долгий TTL | Устаревшие данные часами | Используйте события инвалидации, shorter TTL + refresh |
| Слишком короткий TTL | Низкий hit rate, нагрузка на БД | Увеличьте TTL, используйте stale-while-revalidate |
| Отсутствие защиты от stampede | Периодические падения БД | Mutex locks или probabilistic early expiration |
| Игнорирование hot keys | Перегрузка одной ноды Redis | Локальный кэш, репликация горячих ключей |
| Большие ключи (>1 МБ) | Блокировка Redis, latency | Декомпозиция, Redis Hash, хранение в БД |
| Нет мониторинга | Проблемы обнаруживаются постфактум | Дашборды с hit rate, latency, memory, eviction |
| Кэширование персонализированных данных | Переполнение памяти | Кэшируйте только общие данные, персонализацию в БД |
Помните два правила кэширования:
Первое правило кэширования: Не кэшируйте без измерений. Профилируйте приложение, находите узкие места, кэшируйте только то, что действительно нужно.
Второе правило кэширования: Всегда инвалидируйте кэш. Неправильная инвалидация — источник 90% багов, связанных с кэшированием.
Золотая середина:
❌ Без кэширования: Медленно, БД перегружена
❌ Слишком много кэша: Устаревшие данные, переполнение памяти
✅ Правильный баланс: 80%+ hit rate, данные актуальны, БД в порядке
Кэширование — это не серебряная пуля, а инструмент в арсенале инженера. Используйте его с умом, постоянно мониторьте и настраивайте под нагрузку вашего приложения.
Дополнительные ресурсы:
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.