Redis, memoization, стратегии инвалидации кэша
Кэширование ускоряет API, сохраняя результаты дорогих операций. В этой теме вы научитесь использовать Redis, memoization и стратегии инвалидации кэша.
Кэширование — сохранение результатов вычислений для быстрого доступа без повторного выполнения.
| Сценарий | Кэшировать | Почему |
|---|---|---|
| Частые GET запросы | ✅ | Избегаем повторных запросов к БД |
| Дорогие вычисления | ✅ | Сохраняем результат |
| Статические данные | ✅ | Редко меняются |
| Персональные данные | ❌ | У каждого свои данные |
| Частые изменения | ❌ | Кэш устаревает быстрее, чем используется |
| POST/PUT/DELETE | ❌ | Изменяют данные |
Без кэша:
Запрос → БД (50ms) → Ответ
С кэшем:
Запрос → Кэш (1ms) → Ответ (98% быстрее!)
docker run -d -p 6379:6379 --name redis redis:latest# macOS
brew install redis
brew services start redis
# Ubuntu
sudo apt-get install redis-server
sudo systemctl start redisredis-cli ping
# PONGpip install redisfrom fastapi import FastAPI
import redis
import json
import time
app = FastAPI()
# Подключение к Redis
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
def get_expensive_data():
"""Имитация дорогой операции"""
time.sleep(2) # 2 секунды
return {'data': 'result', 'timestamp': time.time()}
@app.get('/data')
def get_data():
cache_key = 'expensive_data'
# Проверка кэша
cached = redis_client.get(cache_key)
if cached:
return {'data': json.loads(cached), 'from_cache': True}
# Получение данных
data = get_expensive_data()
# Сохранение в кэш (1 час)
redis_client.setex(
cache_key,
3600, # TTL в секундах
json.dumps(data)
)
return {'data': data, 'from_cache': False}from fastapi import FastAPI, Depends
import redis
import json
from datetime import timedelta
app = FastAPI()
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
def get_db():
"""Имитация подключения к БД"""
# В реальности: SQLAlchemy session
return {'db': 'connection'}
def cache_result(expire: int = 300):
"""
Декоратор для кэширования результатов.
"""
def decorator(func):
def wrapper(*args, **kwargs):
# Генерация ключа кэша
cache_key = f"{func.__name__}:{str(kwargs)}"
# Проверка кэша
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
# Выполнение функции
result = func(*args, **kwargs)
# Сохранение в кэш
redis_client.setex(
cache_key,
expire,
json.dumps(result, default=str)
)
return result
return wrapper
return decorator
@app.get('/users/{user_id}')
@cache_result(expire=600)
def get_user(user_id: int, db = Depends(get_db)):
"""Получение пользователя с кэшированием"""
# Имитация запроса к БД
time.sleep(0.5)
return {
'id': user_id,
'name': f'User {user_id}',
'fetched_at': time.time()
}# Сохранение
redis_client.set('user:1:name', 'John')
# Получение
name = redis_client.get('user:1:name')
# Сохранение с TTL
redis_client.setex('user:1:name', 3600, 'John')# Инкремент
redis_client.incr('counter') # +1
redis_client.incrby('counter', 5) # +5
# Декремент
redis_client.decr('counter')
# Получение
count = int(redis_client.get('counter'))# Добавление в начало
redis_client.lpush('queue', 'item1', 'item2')
# Добавление в конец
redis_client.rpush('queue', 'item3')
# Получение элемента
item = redis_client.lpop('queue') # Из начала
item = redis_client.rpop('queue') # Из конца
# Длина списка
length = redis_client.llen('queue')# Добавление
redis_client.sadd('tags', 'python', 'fastapi', 'redis')
# Проверка наличия
is_member = redis_client.sismember('tags', 'python') # True
# Получение всех
tags = redis_client.smembers('tags')
# Удаление
redis_client.srem('tags', 'python')# Сохранение объекта
redis_client.hset('user:1', mapping={
'name': 'John',
'email': 'john@example.com',
'age': '30'
})
# Получение поля
name = redis_client.hget('user:1', 'name')
# Получение всех полей
user = redis_client.hgetall('user:1')
# Удаление поля
redis_client.hdel('user:1', 'age')# Кэш истекает через 1 час
redis_client.setex('data', 3600, json.dumps(data))@app.put('/users/{user_id}')
def update_user(user_id: int, user_data: dict, db = Depends(get_db)):
# Обновление в БД
# ...
# Инвалидация кэша
redis_client.delete(f'user:{user_id}')
# Или удаление всех ключей пользователя
keys = redis_client.keys(f'user:{user_id}:*')
if keys:
redis_client.delete(*keys)
return {'status': 'updated'}def get_user(user_id: int):
cache_key = f'user:{user_id}'
# Проверка кэша
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
# Получение из БД
user = db.query(User).get(user_id)
# Сохранение в кэш
redis_client.setex(
cache_key,
3600,
json.dumps(user, default=str)
)
return user
def update_user(user_id: int, user_data: dict):
# Обновление в БД
user = db.query(User).get(user_id)
# ...
# Обновление кэша сразу
redis_client.setex(
f'user:{user_id}',
3600,
json.dumps(user, default=str)
)
return userdef get_data(key: str):
# Сначала кэш
cached = redis_client.get(key)
if cached:
return json.loads(cached)
# Потом БД
data = db.query(...).all()
# Сохранение в кэш
redis_client.setex(key, 300, json.dumps(data, default=str))
return datafrom fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse
import hashlib
import redis
import json
app = FastAPI()
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
def cache_response(expire: int = 300):
"""
Декоратор для кэширования ответов endpoint.
"""
def decorator(func):
async def wrapper(request: Request, *args, **kwargs):
# Генерация ключа из URL и параметров
cache_key = hashlib.md5(
f"{request.url.path}:{request.url.query}".encode()
).hexdigest()
# Проверка кэша
cached = redis_client.get(cache_key)
if cached:
cached_data = json.loads(cached)
return JSONResponse(
content=cached_data['body'],
headers=cached_data.get('headers', {}),
status_code=cached_data.get('status_code', 200)
)
# Выполнение endpoint
response = await func(request, *args, **kwargs)
# Сохранение ответа в кэш
if isinstance(response, JSONResponse):
cache_data = {
'body': response.body.decode(),
'status_code': response.status_code,
'headers': dict(response.headers)
}
redis_client.setex(
cache_key,
expire,
json.dumps(cache_data)
)
return response
return wrapper
return decorator
@app.get('/api/items')
@cache_response(expire=600)
async def get_items(request: Request, skip: int = 0, limit: int = 100):
"""Кэшированный endpoint"""
items = [{'id': i, 'name': f'Item {i}'} for i in range(skip, skip + limit)]
return JSONResponse(content={'items': items})pip install fastapi-cache2[redis]from fastapi import FastAPI
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
import redis
app = FastAPI()
@app.on_event("startup")
async def startup():
redis_client = redis.Redis(host='localhost', port=6379)
FastAPICache.init(RedisBackend(redis_client), prefix='fastapi-cache')
@app.get('/items/{item_id}')
@cache(expire=600)
async def get_item(item_id: int):
await asyncio.sleep(1) # Имитация
return {'id': item_id, 'name': f'Item {item_id}'}
@app.get('/users')
@cache(expire=300)
async def get_users(skip: int = 0, limit: int = 100):
return {'users': [{'id': i} for i in range(skip, skip + limit)]}from fastapi_cache import FastAPICache
@app.put('/items/{item_id}')
async def update_item(item_id: int, item_data: dict):
# Обновление
# ...
# Инвалидация кэша
await FastAPICache.clear(namespace='default')
return {'status': 'updated'}from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from pydantic import BaseModel
from typing import List, Optional
import redis
import json
import hashlib
from datetime import datetime
from database import get_db
from models import Product, Category
app = FastAPI()
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
class ProductResponse(BaseModel):
id: int
name: str
price: float
category_id: int
class Config:
from_attributes = True
def get_cache_key(prefix: str, **kwargs) -> str:
"""Генерация ключа кэша"""
key_string = f"{prefix}:{':'.join(str(v) for v in kwargs.values())}"
return hashlib.md5(key_string.encode()).hexdigest()
def cache_get(key: str):
"""Получение из кэша"""
cached = redis_client.get(key)
return json.loads(cached) if cached else None
def cache_set(key: str, data, expire: int = 300):
"""Сохранение в кэш"""
redis_client.setex(key, expire, json.dumps(data, default=str))
def cache_delete(key: str):
"""Удаление из кэша"""
redis_client.delete(key)
def invalidate_pattern(pattern: str):
"""Инвалидация по паттерну"""
keys = redis_client.keys(pattern)
if keys:
redis_client.delete(*keys)
# === Endpoints ===
@app.get('/products', response_model=List[ProductResponse])
def list_products(
skip: int = 0,
limit: int = 100,
category_id: Optional[int] = None,
db: Session = Depends(get_db)
):
"""Список продуктов с кэшированием"""
cache_key = get_cache_key(
'products:list',
skip=skip,
limit=limit,
category_id=category_id or 'all'
)
# Проверка кэша
cached = cache_get(cache_key)
if cached:
return cached
# Запрос к БД
query = db.query(Product)
if category_id:
query = query.filter(Product.category_id == category_id)
products = query.offset(skip).limit(limit).all()
result = [ProductResponse.from_orm(p).dict() for p in products]
# Сохранение в кэш
cache_set(cache_key, result, expire=600)
return result
@app.get('/products/{product_id}', response_model=ProductResponse)
def get_product(product_id: int, db: Session = Depends(get_db)):
"""Продукт по ID с кэшированием"""
cache_key = get_cache_key('products:detail', id=product_id)
# Проверка кэша
cached = cache_get(cache_key)
if cached:
return cached
# Запрос к БД
product = db.query(Product).filter(Product.id == product_id).first()
if not product:
raise HTTPException(404, "Product not found")
result = ProductResponse.from_orm(product).dict()
# Сохранение в кэш
cache_set(cache_key, result, expire=3600)
return result
@app.post('/products', response_model=ProductResponse)
def create_product(product_data: dict, db: Session = Depends(get_db)):
"""Создание продукта с инвалидацией кэша"""
product = Product(**product_data)
db.add(product)
db.commit()
db.refresh(product)
# Инвалидация кэша списка
invalidate_pattern('products:list:*')
return ProductResponse.from_orm(product)
@app.put('/products/{product_id}', response_model=ProductResponse)
def update_product(
product_id: int,
product_data: dict,
db: Session = Depends(get_db)
):
"""Обновление продукта с инвалидацией кэша"""
product = db.query(Product).filter(Product.id == product_id).first()
if not product:
raise HTTPException(404, "Product not found")
for key, value in product_data.items():
setattr(product, key, value)
db.commit()
db.refresh(product)
# Инвалидация кэша
cache_delete(get_cache_key('products:detail', id=product_id))
invalidate_pattern('products:list:*')
return ProductResponse.from_orm(product)
@app.delete('/products/{product_id}')
def delete_product(product_id: int, db: Session = Depends(get_db)):
"""Удаление продукта с инвалидацией кэша"""
product = db.query(Product).filter(Product.id == product_id).first()
if not product:
raise HTTPException(404, "Product not found")
db.delete(product)
db.commit()
# Инвалидация кэша
cache_delete(get_cache_key('products:detail', id=product_id))
invalidate_pattern('products:list:*')
return {'status': 'deleted'}
@app.get('/stats')
def get_stats(db: Session = Depends(get_db)):
"""Статистика с длительным кэшированием"""
cache_key = 'stats:overview'
cached = cache_get(cache_key)
if cached:
return cached
# Дорогие запросы
total_products = db.query(Product).count()
total_categories = db.query(Category).count()
result = {
'total_products': total_products,
'total_categories': total_categories,
'generated_at': datetime.utcnow().isoformat()
}
# Длительный TTL
cache_set(cache_key, result, expire=3600)
return result@app.get('/cache/stats')
def get_cache_stats():
"""Статистика Redis"""
info = redis_client.info('stats')
return {
'keyspace_hits': info.get('keyspace_hits', 0),
'keyspace_misses': info.get('keyspace_misses', 0),
'hit_rate': round(
info.get('keyspace_hits', 0) /
max(1, info.get('keyspace_hits', 0) + info.get('keyspace_misses', 0)) * 100,
2
)
}
@app.get('/cache/keys')
def list_cache_keys(pattern: str = '*'):
"""Список ключей кэша"""
keys = redis_client.keys(pattern)
return {'keys': keys, 'count': len(keys)}
@app.delete('/cache/clear')
def clear_cache(pattern: str = '*'):
"""Очистка кэша"""
keys = redis_client.keys(pattern)
if keys:
redis_client.delete(*keys)
return {'deleted': len(keys)}@cache(expire=300)
def get_user_data(user_id: int): # Кэш общий для всех!
...Проблема: Все пользователи получат данные первого запросившего.
Решение: Включайте user_id в ключ кэша: f'user:{user_id}:data'.
cache_set('products', products, expire=86400) # 24 часа
# При обновлении кэш не инвалидируетсяРешение: Инвалидируйте кэш при изменении данных.
try:
data = get_data()
except Exception:
# Ошибка не кэшируется правильно
cache_set('data', {'error': True})Решение: Не кэшируйте ошибки или используйте короткий TTL.
В следующей теме вы изучите Middleware — логирование, CORS, rate limiting, кастомные middleware.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.