Multi-level caching, cache invalidation, warming, stale-while-revalidate
«Самый быстрый запрос — тот, который не был выполнен». Правильное кэширование снижает latency на порядки.
Кэширование решает две проблемы:
Latency сравнение:
├─ L1 кэш (CPU): 0.5 мкс
├─ L2 кэш (CPU): 7 мкс
├─ RAM: 100 мкс (0.1 мс)
├─ Redis (локальный): 1 мс
├─ Redis (сеть): 5 мс
├─ PostgreSQL: 50-500 мс
└─ Внешний API: 100-2000 мс
from functools import lru_cache
import time
# ❌ Плохо: кэширование без TTL
@lru_cache(maxsize=128)
def get_user(user_id):
return db.query('SELECT * FROM users WHERE id = ?', user_id)
# ✅ Хорошо: кэш с TTL
from cachetools import TTLCache
user_cache = TTLCache(maxsize=1000, ttl=300) # 5 минут
def get_user_cached(user_id):
if user_id in user_cache:
return user_cache[user_id]
user = db.query('SELECT * FROM users WHERE id = ?', user_id)
user_cache[user_id] = user
return userПлюсы:
Минусы:
Когда использовать:
import redis
import json
from typing import Optional
redis_client = redis.Redis(
host='redis-cluster',
port=6379,
decode_responses=True,
socket_connect_timeout=5,
socket_timeout=5,
retry_on_timeout=True
)
class RedisCache:
"""Распределённый кэш с Redis."""
def __init__(self, default_ttl=300):
self.default_ttl = default_ttl
self.redis = redis_client
def get(self, key: str) -> Optional[dict]:
try:
data = self.redis.get(key)
if data:
return json.loads(data)
except redis.RedisError as e:
# Кэш недоступен — логируем, но не ломаем запрос
logger.warning(f"Redis get error: {e}")
return None
def set(self, key: str, value: dict, ttl: int = None):
try:
ttl = ttl or self.default_ttl
self.redis.setex(key, ttl, json.dumps(value))
except redis.RedisError as e:
logger.warning(f"Redis set error: {e}")
def delete(self, key: str):
try:
self.redis.delete(key)
except redis.RedisError as e:
logger.warning(f"Redis delete error: {e}")
def get_many(self, keys: list) -> dict:
"""Пакетное получение (снижает latency)."""
try:
values = self.redis.mget(keys)
return {
key: json.loads(value) if value else None
for key, value in zip(keys, values)
}
except redis.RedisError as e:
logger.warning(f"Redis mget error: {e}")
return {}
# Использование
cache = RedisCache(default_ttl=300)
def get_user_with_cache(user_id):
key = f"user:{user_id}"
# L1: локальный кэш
user = local_cache.get(user_id)
if user:
return user
# L2: Redis кэш
user = cache.get(key)
if user:
local_cache[user_id] = user # Заполняем L1
return user
# Cache miss — идём в БД
user = db.query('SELECT * FROM users WHERE id = ?', user_id)
# Заполняем кэши
cache.set(key, user)
local_cache[user_id] = user
return userПлюсы:
Минусы:
# PostgreSQL query cache через материализованные представления
MATERIALIZED VIEW user_stats_mv AS
SELECT
user_id,
COUNT(*) as order_count,
SUM(amount) as total_spent
FROM orders
GROUP BY user_id;
-- Обновляем периодически
REFRESH MATERIALIZED VIEW CONCURRENTLY user_stats_mv;Плюсы:
Минусы:
Самый распространённый паттерн.
def get_product_cache_aside(product_id):
key = f"product:{product_id}"
# 1. Проверяем кэш
product = cache.get(key)
if product:
return product
# 2. Cache miss — загружаем из БД
product = db.query(
'SELECT * FROM products WHERE id = ?',
product_id
)
# 3. Заполняем кэш
cache.set(key, product, ttl=600)
return productПлюсы:
Минусы:
Кэш сам загружает данные при miss.
class ReadThroughCache:
def __init__(self, loader_func, ttl=300):
self.loader_func = loader_func
self.ttl = ttl
self.cache = RedisCache(default_ttl=ttl)
def get(self, key):
value = self.cache.get(key)
if value is None:
# Кэш сам загружает данные
value = self.loader_func(key)
self.cache.set(key, value, self.ttl)
return value
# Использование
def load_product(key):
product_id = key.split(':')[1]
return db.query('SELECT * FROM products WHERE id = ?', product_id)
product_cache = ReadThroughCache(load_product, ttl=600)
# Прозрачное кэширование
product = product_cache.get('product:123')Данные записываются в кэш и БД одновременно.
def update_product_write_through(product_id, data):
key = f"product:{product_id}"
# 1. Обновляем БД
db.execute(
'UPDATE products SET name=?, price=? WHERE id = ?',
data['name'], data['price'], product_id
)
# 2. Обновляем кэш
cache.set(key, data, ttl=600)Плюсы:
Минусы:
Данные записываются в кэш сразу, в БД — асинхронно.
import asyncio
from collections import deque
class WriteBehindCache:
"""Кэш с асинхронной записью в БД."""
def __init__(self, batch_size=100, flush_interval=5):
self.cache = RedisCache()
self.write_queue = deque()
self.batch_size = batch_size
self.flush_interval = flush_interval
def write(self, key, value):
# Сразу пишем в кэш
self.cache.set(key, value)
# Ставим в очередь для БД
self.write_queue.append((key, value))
# Периодическая запись в БД
if len(self.write_queue) >= self.batch_size:
asyncio.create_task(self._flush_to_db())
async def _flush_to_db(self):
"""Асинхронная запись пакета в БД."""
batch = []
while len(batch) < self.batch_size and self.write_queue:
batch.append(self.write_queue.popleft())
if batch:
await self._db_batch_update(batch)
async def _db_batch_update(self, batch):
"""Пакетное обновление БД."""
# Реализация зависит от вашей БД
pass
# Использование
wb_cache = WriteBehindCache(batch_size=100, flush_interval=5)
# Быстрая запись (только в кэш)
wb_cache.write('product:123', {'name': 'New', 'price': 99})Плюсы:
Минусы:
Проблема: при истечении TTL множество запросов одновременно идут в БД.
Время T: TTL кэша истёк
Запрос 1: cache miss → БД
Запрос 2: cache miss → БД
...
Запрос 1000: cache miss → БД
Результат: 1000 одновременных запросов к БД → перегрузка
Решение 1: Probabilistic Early Expiration
import random
def get_product_early_expiration(product_id):
key = f"product:{product_id}"
base_ttl = 300 # 5 минут
product = cache.get(key)
if product:
return product
# Cache miss
product = db.query('SELECT * FROM products WHERE id = ?', product_id)
# Добавляем jitter к TTL: 300 ± 30 секунд
jitter = random.randint(-30, 30)
actual_ttl = base_ttl + jitter
cache.set(key, product, ttl=actual_ttl)
return productРешение 2: Stale-While-Revalidate
import time
def get_product_stale_while_revalidate(product_id):
key = f"product:{product_id}"
cache_ttl = 300 # 5 минут
stale_ttl = 600 # 10 минут (время на revalidate)
cached = cache.get_with_metadata(key)
if cached:
value, timestamp = cached['value'], cached['timestamp']
age = time.time() - timestamp
if age < cache_ttl:
# Свежий кэш
return value
if age < stale_ttl:
# Устаревший, но ещё допустимый кэш
# Асинхронно обновляем
asyncio.create_task(_refresh_cache(key, product_id))
return value
# Кэш отсутствует или слишком старый
return _refresh_cache_sync(key, product_id)
async def _refresh_cache(key, product_id):
"""Асинхронное обновление кэша."""
product = db.query('SELECT * FROM products WHERE id = ?', product_id)
cache.set(key, product, ttl=300)
def _refresh_cache_sync(key, product_id):
"""Синхронное обновление (для первого запроса)."""
product = db.query('SELECT * FROM products WHERE id = ?', product_id)
cache.set(key, product, ttl=300)
return productРешение 3: Mutex (Locking)
import redis
redis_client = redis.Redis()
def get_product_with_mutex(product_id):
key = f"product:{product_id}"
lock_key = f"lock:{key}"
product = cache.get(key)
if product:
return product
# Пытаемся получить lock
lock_acquired = redis_client.set(
lock_key,
'1',
nx=True, # Только если не существует
ex=10 # TTL lock
)
if lock_acquired:
try:
# Double-check после получения lock
product = cache.get(key)
if product:
return product
# Загружаем из БД
product = db.query(
'SELECT * FROM products WHERE id = ?',
product_id
)
cache.set(key, product, ttl=300)
return product
finally:
redis_client.delete(lock_key)
else:
# Ждём, пока другой процесс обновит кэш
time.sleep(0.1)
return get_product_with_mutex(product_id) # RetryПроблема: как вовремя удалять устаревшие данные?
Решение 1: Time-based (TTL)
# Простой TTL
cache.set('user:123', user_data, ttl=300) # 5 минутРешение 2: Explicit Invalidation
def update_user(user_id, data):
key = f"user:{user_id}"
# Обновляем БД
db.execute('UPDATE users SET ... WHERE id = ?', user_id)
# Инвалидируем кэш
cache.delete(key)
# Или обновляем кэш сразу (write-through)
# cache.set(key, new_data, ttl=300)Решение 3: Version-based
def get_user_versioned(user_id):
# Получаем версию пользователя
version = db.query(
'SELECT cache_version FROM users WHERE id = ?',
user_id
)
key = f"user:{user_id}:v{version}"
user = cache.get(key)
if user:
return user
user = db.query('SELECT * FROM users WHERE id = ?', user_id)
cache.set(key, user, ttl=3600) # Долгий TTL
return user
def update_user(user_id, data):
# Обновляем БД и увеличиваем версию
db.execute(
'UPDATE users SET ..., cache_version = cache_version + 1 WHERE id = ?',
user_id
)
# Старый кэш становится неактуальным автоматическиРешение 4: Dependency-based
class DependencyCache:
"""Кэш с зависимостями."""
def __init__(self):
self.cache = RedisCache()
self.deps = {} # key -> [dependent_keys]
def set_with_deps(self, key, value, depends_on=None, ttl=300):
self.cache.set(key, value, ttl)
if depends_on:
for dep_key in depends_on:
if dep_key not in self.deps:
self.deps[dep_key] = []
self.deps[dep_key].append(key)
def invalidate_with_deps(self, key):
"""Инвалидирует ключ и все зависимые."""
self.cache.delete(key)
if key in self.deps:
for dep_key in self.deps[key]:
self.cache.delete(dep_key)
del self.deps[key]
# Использование
dep_cache = DependencyCache()
# Кэш заказа зависит от кэша пользователя
dep_cache.set_with_deps(
'order:123',
order_data,
depends_on=['user:456'],
ttl=600
)
# При обновлении пользователя инвалидируем все зависимые кэши
dep_cache.invalidate_with_deps('user:456')Проблема: после деплоя или сброса кэша все запросы идут в БД.
Решение 1: Preload популярных данных
def warm_cache_on_startup():
"""Прогрев кэша при старте приложения."""
# Загружаем топ-100 популярных товаров
popular_products = db.query(
'''SELECT * FROM products
ORDER BY views DESC
LIMIT 100'''
)
for product in popular_products:
cache.set(f"product:{product['id']}", product, ttl=600)
# Загружаем конфигурацию
config = db.query('SELECT * FROM app_config')
cache.set('app_config', config, ttl=3600)
# Вызов при старте
warm_cache_on_startup()Решение 2: Фоновый прогрев
import asyncio
from datetime import datetime, timedelta
class CacheWarmer:
"""Фоновый прогрев кэша по расписанию."""
def __init__(self):
self.cache = RedisCache()
async def warm_popular_products(self):
"""Прогрев популярных товаров каждые 5 минут."""
while True:
popular = db.query(
'''SELECT * FROM products
WHERE updated_at > NOW() - INTERVAL '5 minutes'
ORDER BY views DESC
LIMIT 100'''
)
for product in popular:
self.cache.set(
f"product:{product['id']}",
product,
ttl=600
)
await asyncio.sleep(300) # 5 минут
async def warm_user_profiles(self, user_ids):
"""Прогрев профилей пользователей."""
users = db.query(
'SELECT * FROM users WHERE id = ANY(?)',
user_ids
)
for user in users:
self.cache.set(f"user:{user['id']}", user, ttl=1800)
# Запуск фонового процесса
warmer = CacheWarmer()
asyncio.create_task(warmer.warm_popular_products())from prometheus_client import Counter, Gauge, Histogram
CACHE_HITS = Counter('cache_hits_total', 'Total cache hits', ['cache_type'])
CACHE_MISSES = Counter('cache_misses_total', 'Total cache misses', ['cache_type'])
CACHE_HIT_RATIO = Gauge('cache_hit_ratio', 'Cache hit ratio', ['cache_type'])
CACHE_LATENCY = Histogram('cache_latency_seconds', 'Cache operation latency', ['cache_type', 'operation'])
class MonitoredCache:
def __init__(self, name, cache_impl):
self.name = name
self.cache = cache_impl
self._hits = 0
self._misses = 0
def get(self, key):
start = time.time()
try:
value = self.cache.get(key)
if value:
self._hits += 1
CACHE_HITS.labels(cache_type=self.name).inc()
else:
self._misses += 1
CACHE_MISSES.labels(cache_type=self.name).inc()
ratio = self._hits / (self._hits + self._misses) if (self._hits + self._misses) > 0 else 0
CACHE_HIT_RATIO.labels(cache_type=self.name).set(ratio)
return value
finally:
CACHE_LATENCY.labels(cache_type=self.name, operation='get').observe(time.time() - start)
def set(self, key, value, ttl=None):
start = time.time()
try:
return self.cache.set(key, value, ttl)
finally:
CACHE_LATENCY.labels(cache_type=self.name, operation='set').observe(time.time() - start)
# Использование
monitored_redis = MonitoredCache('redis', RedisCache())
monitored_local = MonitoredCache('local', TTLCache(maxsize=1000, ttl=300))Запрос → L1 (local) → L2 (Redis) → L3 (БД)
TTL_CONFIG = {
'config': 3600, # 1 час — редко меняется
'product': 600, # 10 минут — иногда меняется
'user_profile': 300, # 5 минут — часто меняется
'stock': 60, # 1 минута — очень часто
}# ❌ Плохо: кэш без TTL
cache.set('user:123', user) # Навсегда?
# ✅ Хорошо: с TTL
cache.set('user:123', user, ttl=300)def get_user_resilient(user_id):
try:
user = cache.get(f"user:{user_id}")
if user:
return user
except redis.RedisError:
logger.warning("Redis unavailable, falling back to DB")
# Fallback к БД
return db.query('SELECT * FROM users WHERE id = ?', user_id)В следующей теме рассмотрим асинхронность и конкурентность для скрытия задержек.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.