Кэширование, очереди, rate limiting, горизонтальное масштабирование
Масштабируемость — это способность системы справляться с ростом нагрузки без деградации. Проверяйте её до того, как продакшен упадёт.
# ✅ Cache-aside (Lazy Loading)
from redis import Redis
import json
redis = Redis()
def get_user(user_id):
key = f"user:{user_id}"
# Проверка кэша
cached = redis.get(key)
if cached:
return json.loads(cached)
# Загрузка из БД
user = db.query("SELECT * FROM users WHERE id = ?", user_id)
# Запись в кэш с TTL
redis.setex(key, 300, json.dumps(user)) # 5 минут
return user
# ✅ Write-through (запись в кэш и БД одновременно)
def update_user(user_id, data):
db.execute("UPDATE users SET ... WHERE id = ?", data)
# Обновить кэш
key = f"user:{user_id}"
redis.setex(key, 300, json.dumps(data))
# ✅ Invalidate (удаление из кэша)
def delete_user(user_id):
db.execute("DELETE FROM users WHERE id = ?", user_id)
redis.delete(f"user:{user_id}") # ИнвалидацияЧто проверять:
# ❌ Проблема: много запросов к БД при истечении кэша
# 1000 запросов одновременно видят miss → 1000 запросов к БД
# ✅ Решение: блокировка
import redis.lock
def get_user(user_id):
key = f"user:{user_id}"
lock_key = f"lock:{key}"
cached = redis.get(key)
if cached:
return json.loads(cached)
# Только один поток загружает данные
with redis.lock(lock_key, timeout=10):
# Двойная проверка после захвата замка
cached = redis.get(key)
if cached:
return json.loads(cached)
user = db.query(...)
redis.setex(key, 300, json.dumps(user))
return user# ❌ Синхронно: пользователь ждёт
def send_email(user_id, template):
user = get_user(user_id)
email_service.send(user.email, template) # 2 секунды
return "OK" # Пользователь ждал 2 секунды!
# ✅ Асинхронно: очередь задач
from celery import Celery
app = Celery('tasks', broker='redis://localhost')
@app.task
def send_email_task(user_id, template):
user = get_user(user_id)
email_service.send(user.email, template)
def send_email(user_id, template):
send_email_task.delay(user_id, template) # Мгновенный ответ
return "Email queued"Что проверять:
# ✅ Ограничение частоты задач
@app.task(rate_limit='10/m') # 10 задач в минуту
def send_sms(phone, message):
sms_service.send(phone, message)
# ✅ Prioritization
@app.task(priority='high')
def process_payment(order_id):
...
@app.task(priority='low')
def generate_report(user_id):
...# ✅ Token Bucket алгоритм
from redis import Redis
import time
redis = Redis()
def is_rate_limited(user_id, limit=100, window=60):
key = f"rate:{user_id}"
now = time.time()
# Удалить старые токены
redis.zremrangebyscore(key, 0, now - window)
# Посчитать текущие запросы
count = redis.zcard(key)
if count >= limit:
return True # Превышен лимит
# Добавить текущий запрос
redis.zadd(key, {str(now): now})
redis.expire(key, window)
return False
# Использование
@app.before_request
def check_rate_limit():
user_id = get_current_user_id()
if is_rate_limited(user_id):
abort(429, "Too many requests")Что проверять:
# ❌ Stateful: сессия в памяти
sessions = {} # Не работает при нескольких серверах!
@app.route('/login')
def login():
session_id = generate_id()
sessions[session_id] = user # Только на этом сервере!
return session_id
# ✅ Stateless: сессия во внешнем хранилище
from redis import Redis
redis = Redis()
@app.route('/login')
def login():
session_id = generate_id()
redis.setex(f"session:{session_id}", 3600, user_json)
return session_idЧто проверять:
Ключевая мысль: Масштабируемость требует планирования. Кэшируйте, используйте очереди, ограничивайте rate, проектируйте stateless.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.