Celery, Redis queues, rate limiting, горизонтальное масштабирование
Проблема: Ваш бот стал популярным. 1000 пользователей одновременно нажали /start. Бот "ложится" — не отвечает, теряет сообщения, падает по памяти.
Сценарий из практики: Бот отправляет PDF-отчёты по запросу. Генерация занимает 30 секунд. Пока бот генерирует отчёт для одного пользователя, остальные 100 ждут — бот "завис". При 10 одновременных запросах сервер падает от нагрузки.
Решение:
- Очереди задач (Celery/RQ) — тяжёлые операции выполняются фоном
- Горизонтальное масштабирование — несколько инстансов бота
- Rate limiting — защита от спама и злоупотреблений
- Кэширование — снижение нагрузки на БД
Когда это нужно:
- Бот отвечает дольше 2-3 секунд
- Пользователи жалуются на "зависания"
- Сервер использует 80%+ CPU/памяти
- Планируете 1000+ одновременных пользователей
Одиночный инстанс бота сталкивается с проблемами:
| Проблема | Симптом | Решение |
|---|---|---|
| Блокировка на тяжёлых операциях | Бот не отвечает 10+ секунд | Очереди задач |
| Ограниченная пропускная способность | 100+ пользователей = падение | Горизонтальное масштабирование |
| Нет отказоустойчивости | Упал сервер = упал бот | Несколько инстансов |
| Сложность обновлений | Деплой = даунтайм | Rolling updates с балансировщиком |
Проблема: Генерация отчёта, отправка email, вызов внешнего API — операции занимают секунды или минуты. Бот не может ждать — нужно отвечать пользователю сразу.
Решение: Celery принимает задачу, кладёт в очередь, бот сразу отвечает "Готовим отчёт". Воркер забирает задачу из очереди и выполняет фоновом.
pip install celery[redis] flower💡 Зачем flower: Веб-интерфейс для мониторинга задач Celery.
# bot/celery_app.py
from celery import Celery
from bot.config import settings
celery_app = Celery(
'telegram_bot',
broker=settings.celery_broker_url, # redis://localhost:6379/1
backend=settings.celery_result_url, # redis://localhost:6379/1
include=['bot.tasks']
)
# Конфигурация
celery_app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='UTC',
enable_utc=True,
# Retry — повтор при ошибке
# 💡 Зачем: Внешние API могут временно не отвечать
task_acks_late=True, # Подтверждение после выполнения
task_reject_on_worker_or_memory_loss=True,
# Rate limiting на уровне воркера
# 💡 Зачем: Один воркер обрабатывает одну задачу за раз
worker_prefetch_multiplier=1,
# Результаты хранятся 1 час
result_expires=3600,
# Маршрутизация задач по очередям
# 💡 Зачем: Приоритизация — уведомления важнее экспорта
task_routes={
'bot.tasks.send_notification': {'queue': 'notifications'},
'bot.tasks.process_payment': {'queue': 'payments'},
'bot.tasks.export_data': {'queue': 'heavy_tasks'}
}
)⚠️ Антипаттерн: Не используйте одну очередь для всех задач. Уведомления должны обрабатываться быстрее, чем генерация отчётов.
# bot/tasks.py
import asyncio
import logging
from celery import Task
from aiogram import Bot
from bot.celery_app import celery_app
from bot.config import settings
logger = logging.getLogger(__name__)
class BotTask(Task):
"""
Базовый класс для задач с ботом.
💡 Зачем:
- Общий бот для всех задач
- Ленивая инициализация (только при первом вызове)
"""
_bot = None
@property
def bot(self):
if self._bot is None:
self._bot = Bot(token=settings.bot_token.get_secret_value())
return self._bot
@celery_app.task(base=BotTask, bind=True)
def send_notification(self, user_id: int, text: str):
"""
Отправить уведомление пользователю.
💡 Зачем в Celery:
- Не блокировать бота при отправке
- Retry при временных ошибках Telegram
⚠️ Важно: asyncio.run() для вызова асинхронного кода
"""
try:
asyncio.run(self.bot.send_message(user_id, text))
logger.info(f'Notification sent to {user_id}')
return {'status': 'success', 'user_id': user_id}
except Exception as e:
logger.error(f'Failed to send notification: {e}')
# 💡 Зачем retry: Telegram может временно не отвечать
raise self.retry(exc=e, countdown=60, max_retries=3)
@celery_app.task(base=BotTask, bind=True)
def send_bulk_notification(self, user_ids: list, text: str):
"""
Массовая рассылка с rate limiting.
⚠️ Важно: Telegram лимитирует — ~30 сообщений в секунду.
Нарушение = бан бота.
"""
success_count = 0
fail_count = 0
for i, user_id in enumerate(user_ids):
try:
asyncio.run(self.bot.send_message(user_id, text))
success_count += 1
except Exception as e:
logger.error(f'Failed to send to {user_id}: {e}')
fail_count += 1
# Rate limiting: 30 сообщений в секунду
# 💡 Зачем: Соблюдение лимитов Telegram
if (i + 1) % 30 == 0:
asyncio.run(asyncio.sleep(1))
return {
'status': 'completed',
'success': success_count,
'failed': fail_count
}
@celery_app.task
def process_payment(order_id: int, payment_data: dict):
"""
Обработка платежа (тяжёлая операция).
💡 Зачем в Celery:
- Вызов внешнего API (30+ секунд)
- Не блокировать бота
- Retry при ошибке API
"""
# Имитация обработки
import time
time.sleep(5)
# Логика оплаты
...
return {'order_id': order_id, 'status': 'paid'}
@celery_app.task
def export_data(user_id: int, format: str = 'csv'):
"""
Экспорт данных пользователя (тяжёлая задача).
💡 Зачем:
- Генерация файла (CPU-intensive)
- Загрузка в S3
- Отправка пользователю
"""
# Генерация отчёта
...
# Отправка файла
...
return {'user_id': user_id, 'file_path': '/path/to/export.csv'}
@celery_app.task(bind=True, max_retries=3)
def call_external_api(self, url: str, data: dict):
"""
Вызов внешнего API с retry.
💡 Зачем exponential backoff:
- 1-я попытка: сразу
- 2-я попытка: через 2 секунды
- 3-я попытка: через 4 секунды
"""
import requests
try:
response = requests.post(url, json=data, timeout=10)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
logger.warning(f'API call failed: {e}')
raise self.retry(exc=e, countdown=2 ** self.request.retries)⚠️ Антипаттерн: Не передавайте в задачи сложные объекты (SQLAlchemy модели, сессии). Только примитивные типы (int, str, dict).
# Воркер для всех очередей
celery -A bot.celery_app worker --loglevel=info
# Воркер только для notifications (высокий приоритет)
celery -A bot.celery_app worker -Q notifications --loglevel=info
# Воркер для heavy_tasks с concurrency=1
# 💡 Зачем: Тяжёлые задачи не должны выполняться параллельно
celery -A bot.celery_app worker -Q heavy_tasks --concurrency=1 --loglevel=info
# Flower для мониторинга
celery -A bot.celery_app flower --port=5555💡 Совет: Запускайте разные воркеры на разных серверах. Heavy tasks на отдельном сервере с большим CPU.
# Отправить через 5 минут
# 💡 Зачем: Напоминания, отложенные уведомления
send_notification.apply_async(
args=[user_id, 'Hello!'],
countdown=300 # 5 минут
)
# Отправить в определённое время
from datetime import datetime, timedelta
eta_time = datetime.utcnow() + timedelta(hours=1)
send_notification.apply_async(
args=[user_id, 'Reminder!'],
eta=eta_time
)
# Периодическая задача (см. beat)# bot/celery_app.py
from celery.schedules import crontab
celery_app.conf.beat_schedule = {
'cleanup-old-data': {
'task': 'bot.tasks.cleanup_old_data',
'schedule': crontab(hour=3, minute=0), # Каждый день в 3:00
# 💡 Зачем: Ночью меньше пользователей
},
'send-daily-summary': {
'task': 'bot.tasks.send_daily_summary',
'schedule': crontab(hour=9, minute=0), # Каждый день в 9:00
},
'check-pending-orders': {
'task': 'bot.tasks.check_pending_orders',
'schedule': 300.0, # Каждые 5 минут
# 💡 Зачем: Проверка "зависших" заказов
},
}# bot/tasks.py
@celery_app.task
def cleanup_old_data():
"""
Удаление старых данных.
💡 Зачем:
- Очистка места в БД
- GDPR (удаление данных по истечении срока)
"""
from bot.repositories.order import OrderRepository
from bot.database import async_session_factory
from datetime import datetime, timedelta
cutoff = datetime.utcnow() - timedelta(days=90)
# Очистка старых заказов
...
return {'status': 'cleaned', 'cutoff': cutoff.isoformat()}
@celery_app.task
def send_daily_summary():
"""
Ежедневная сводка админам.
💡 Зачем: Автоматический отчёт без участия человека.
"""
from bot.config import settings
# Сбор статистики
stats = get_daily_stats()
text = f"""
📊 Daily Summary
New users: {stats['new_users']}
Orders: {stats['orders']}
Revenue: {stats['revenue']} ₽
"""
# Отправка админам
for admin_id in settings.admin_ids:
send_notification.delay(admin_id, text)celery -A bot.celery_app beat --loglevel=info⚠️ Важно: Запускайте только один экземпляр beat. Иначе задачи будут выполняться несколько раз.
Проблема: Celery сложный, с кучей настроек. Нужно что-то проще для базовых очередей.
Решение: RQ (Redis Queue) — минималистичная очередь на Redis.
pip install rq redis# bot/rq_queue.py
import redis
from rq import Queue
redis_conn = redis.Redis.from_url('redis://localhost:6379/2')
# Очереди
# 💡 Зачем: Разделение по приоритетам
notification_queue = Queue('notifications', connection=redis_conn)
payment_queue = Queue('payments', connection=redis_conn)
heavy_queue = Queue('heavy_tasks', connection=redis_conn)# bot/rq_tasks.py
import asyncio
from aiogram import Bot
from bot.config import settings
def send_notification(user_id: int, text: str):
"""
Отправить уведомление.
⚠️ Важно: RQ не поддерживает async функции напрямую.
Нужно обёртывать через asyncio.run().
"""
bot = Bot(token=settings.bot_token.get_secret_value())
try:
asyncio.run(bot.send_message(user_id, text))
finally:
asyncio.run(bot.session.close())
def process_payment(order_id: int, payment_data: dict):
"""Обработка платежа."""
# Логика оплаты
...from bot.rq_queue import notification_queue, payment_queue
# Обычная очередь
# 💡 Зачем: Задача выполняется асинхронно
job = notification_queue.enqueue(
send_notification,
user_id=123,
text='Hello!'
)
# С приоритетом
job = payment_queue.enqueue(
process_payment,
order_id=456,
payment_data=data,
job_timeout=300 # 5 минут
)
# Отложенная задача
job = notification_queue.enqueue_in(
timedelta(minutes=5),
send_notification,
user_id=123,
text='Reminder!'
)
# Периодическая задача (через rq-scheduler)
from rq_scheduler import Scheduler
scheduler = Scheduler(connection=redis_conn)
scheduler.cron(
'0 9 * * *', # Каждый день в 9:00
send_daily_summary
)# Воркер для всех очередей
rq worker
# Воркер для конкретных очередей
rq worker notifications payments
# С указанием имени (для мониторинга)
rq worker --name bot-worker-1💡 Совет: RQ проще Celery, но меньше функций. Выбирайте Celery для сложных сценариев.
Проблема: Пользователь отправляет 100 сообщений в минуту. Бот тратит все ресурсы на обработку спама, другие пользователи не могут получить ответ.
Решение: Rate limiting ограничивает частоту запросов от одного пользователя.
from aiogram.dispatcher.middlewares.rate_limit import RateLimitMiddleware
# Регистрация
# ⚠️ Важно: Простой rate limiter, не для продакшена
dp.message.middleware(RateLimitMiddleware(limit=5, interval=1))⚠️ Антипаттерн: Не используйте встроенный rate limiter для продакшена. Данные теряются при перезапуске.
# bot/middleware/rate_limit.py
import asyncio
from typing import Callable, Dict, Any
from redis.asyncio import Redis
from aiogram import BaseMiddleware
from aiogram.types import Message, CallbackQuery
class RedisRateLimitMiddleware(BaseMiddleware):
"""
Rate limiting с использованием Redis.
💡 Зачем Redis:
- Данные сохраняются при перезапуске
- Работает с несколькими инстансами бота
🔗 Связь с другими темами: Архитектура (middleware)
"""
def __init__(
self,
redis: Redis,
limit: int = 10,
interval: int = 1,
key_prefix: str = 'ratelimit'
):
self.redis = redis
self.limit = limit
self.interval = interval
self.key_prefix = key_prefix
async def __call__(
self,
handler: Callable,
event: Message | CallbackQuery,
data: Dict[str, Any]
) -> Any:
user_id = event.from_user.id
key = f'{self.key_prefix}:{user_id}'
# Получаем текущее количество запросов
current = await self.redis.get(key)
if current and int(current) >= self.limit:
# Превышен лимит
# 💡 Зачем: Пользователь видит понятное сообщение
await event.answer(
'Слишком много запросов. Подождите немного.',
show_alert=True
)
return None
# Увеличиваем счётчик
pipe = self.redis.pipeline()
pipe.incr(key)
pipe.expire(key, self.interval) # TTL
await pipe.execute()
return await handler(event, data)
# Использование
from redis.asyncio import Redis
redis = Redis.from_url(settings.redis_url)
dp.message.middleware(RedisRateLimitMiddleware(redis, limit=10, interval=1))# bot/middleware/callback_rate_limit.py
from aiogram.types import CallbackQuery
class CallbackRateLimitMiddleware(BaseMiddleware):
"""
Rate limiting только для callback query.
💡 Зачем:
- Отдельный лимит для callback
- Пользователи часто кликают по кнопкам
"""
def __init__(self, redis: Redis, limit: int = 5, interval: int = 1):
self.redis = redis
self.limit = limit
self.interval = interval
async def __call__(
self,
handler: Callable,
event: CallbackQuery,
data: Dict[str, Any]
) -> Any:
user_id = event.from_user.id
key = f'callback_limit:{user_id}'
current = await self.redis.get(key)
if current and int(current) >= self.limit:
await event.answer('Подождите немного', show_alert=True)
return None
pipe = self.redis.pipeline()
pipe.incr(key)
pipe.expire(key, self.interval)
await pipe.execute()
return await handler(event, data)
# Регистрация только для callback
dp.callback_query.middleware(CallbackRateLimitMiddleware(redis))Проблема: Простой rate limiting (счётчик за интервал) имеет дыру. Пользователь может отправить 10 запросов в конце интервала и 10 в начале следующего — 20 запросов за 1 секунду.
Решение: Sliding window учитывает запросы в скользящем окне.
# bot/middleware/sliding_window.py
import time
class SlidingWindowRateLimit(BaseMiddleware):
"""
Sliding window rate limiting.
💡 Зачем:
- Более точное ограничение
- Нет "дыр" на границах интервалов
⚠️ Антипаттерн: Не используйте для high-load (медленнее простого счётчика).
"""
def __init__(self, redis: Redis, limit: int, window: int):
self.redis = redis
self.limit = limit
self.window = window # Окно в секундах
async def __call__(
self,
handler: Callable,
event: Message,
data: Dict[str, Any]
) -> Any:
user_id = event.from_user.id
key = f'sliding:{user_id}'
now = time.time()
# Удаляем старые записи за пределами окна
await self.redis.zremrangebyscore(key, 0, now - self.window)
# Считаем запросы в окне
count = await self.redis.zcard(key)
if count >= self.limit:
await event.answer('Rate limit exceeded', show_alert=True)
return None
# Добавляем текущий запрос
await self.redis.zadd(key, {str(now): now})
await self.redis.expire(key, self.window)
return await handler(event, data)Проблема: Один сервер не справляется с нагрузкой. CPU 100%, память заполнена, бот отвечает медленно.
Решение: Несколько инстансов бота за балансировщиком нагрузки.
# docker-compose.scale.yml
version: '3.8'
services:
bot-1:
build: .
environment:
- BOT_TOKEN=${BOT_TOKEN}
- INSTANCE_ID=bot-1
depends_on: [db, redis]
networks: [bot-network]
# 💡 Зачем: Одинаковая конфигурация для всех инстансов
bot-2:
build: .
environment:
- BOT_TOKEN=${BOT_TOKEN}
- INSTANCE_ID=bot-2
depends_on: [db, redis]
networks: [bot-network]
bot-3:
build: .
environment:
- BOT_TOKEN=${BOT_TOKEN}
- INSTANCE_ID=bot-3
depends_on: [db, redis]
networks: [bot-network]
# Celery воркеры
celery-worker:
build: .
command: celery -A bot.celery_app worker -Q notifications,payments
environment:
- BOT_TOKEN=${BOT_TOKEN}
depends_on: [redis, db]
networks: [bot-network]
celery-beat:
build: .
command: celery -A bot.celery_app beat
environment:
- BOT_TOKEN=${BOT_TOKEN}
depends_on: [redis]
networks: [bot-network]⚠️ Важно: Все инстансы должны использовать общий Redis для FSM и общий БД.
# nginx.conf для нескольких инстансов
upstream bot_backend {
least_conn; # 💡 Зачем: Балансировка по наименьшей загрузке
server bot-1:8000;
server bot-2:8000;
server bot-3:8000;
}
server {
listen 443 ssl;
location /webhook {
proxy_pass http://bot_backend;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
}💡 Зачем least_conn: Запросы идут на наименее загруженный сервер.
Проблема: Несколько инстансов бота должны координировать действия. Например, только один инстанс должен обрабатывать заказ.
Решение: Распределённые lock через Redis.
# bot/services/shared_state.py
from redis.asyncio import Redis
import json
class SharedState:
"""
Общее состояние между инстансами.
💡 Зачем:
- Координация между инстансами
- Избежание дублирования действий
"""
def __init__(self, redis: Redis):
self.redis = redis
async def set(self, key: str, value: any, ttl: int = 3600):
"""Установить значение."""
await self.redis.setex(
f'state:{key}',
ttl,
json.dumps(value)
)
async def get(self, key: str) -> any:
"""Получить значение."""
data = await self.redis.get(f'state:{key}')
return json.loads(data) if data else None
async def increment(self, key: str, by: int = 1) -> int:
"""Инкремент счётчика."""
return await self.redis.incrby(f'counter:{key}', by)
async def acquire_lock(self, lock_name: str, timeout: int = 10) -> bool:
"""
Получить распределённый lock.
💡 Зачем: Только один инстанс выполняет задачу.
Пример: Обработка заказа
"""
return await self.redis.set(
f'lock:{lock_name}',
'1',
nx=True, # Только если не существует
ex=timeout
)
async def release_lock(self, lock_name: str):
"""Освободить lock."""
await self.redis.delete(f'lock:{lock_name}')
# Использование
async with redis_lock.acquire_lock('process_order:123'):
# Только один инстанс обрабатывает заказ #123
await process_order(order_id)⚠️ Важно: Всегда устанавливайте timeout на lock. Иначе при падении инстанса lock останется навсегда.
Проблема: Частые запросы к БД для одних и тех же данных (профиль пользователя, настройки). БД становится узким местом.
Решение: Кэширование в Redis снижает нагрузку на БД.
# bot/services/cache.py
from redis.asyncio import Redis
import json
from functools import wraps
class CacheService:
"""
Сервис кэширования.
💡 Зачем:
- Снижение нагрузки на БД
- Ускорение ответов (Redis быстрее БД)
🔗 Связь с другими темами: Мониторинг (метрики кэша)
"""
def __init__(self, redis: Redis, prefix: str = 'cache'):
self.redis = redis
self.prefix = prefix
async def get(self, key: str) -> any:
"""Получить из кэша."""
data = await self.redis.get(f'{self.prefix}:{key}')
return json.loads(data) if data else None
async def set(self, key: str, value: any, ttl: int = 300):
"""
Сохранить в кэш.
💡 Зачем ttl: Данные не хранятся вечно.
"""
await self.redis.setex(
f'{self.prefix}:{key}',
ttl,
json.dumps(value)
)
async def delete(self, key: str):
"""Удалить из кэша."""
await self.redis.delete(f'{self.prefix}:{key}')
def cached(self, ttl: int = 300):
"""
Декоратор для кэширования результатов функции.
💡 Зачем:
- Автоматическое кэширование
- Прозрачно для вызывающего кода
"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# Генерируем ключ из аргументов
cache_key = f'{func.__name__}:{args}:{kwargs}'
# Пробуем кэш
cached = await self.get(cache_key)
if cached is not None:
return cached
# Вызываем функцию
result = await func(*args, **kwargs)
# Кэшируем результат
await self.set(cache_key, result, ttl)
return result
return wrapper
return decorator
# Использование
cache = CacheService(redis)
@cache.cached(ttl=600)
async def get_user_profile(user_id: int):
"""
Получение профиля с кэшированием.
💡 Зачем: Профиль меняется редко, кэш экономит запросы к БД.
"""
return await user_repo.get_by_id(user_id)⚠️ Антипаттерн: Не кэшируйте данные, которые часто меняются (баланс, статус заказа).
| Тема | Как связана |
|---|---|
| Архитектура | Celery задачи вынесены в отдельный модуль |
| Базы данных | Connection pooling критичен при масштабировании |
| Мониторинг | Метрики очереди, кэша, rate limiting |
| Безопасность | Rate limiting защищает от DDoS |
→ Мониторинг — метрики производительности, алерты
→ Безопасность — защита от злоупотреблений
→ Тестирование — тесты производительности
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.