Celery, Redis, очереди задач, планирование
Celery — распределённая очередь задач для Python. В этой теме вы научитесь отправлять email через очереди, планировать задачи и отслеживать прогресс выполнения.
Представьте, что пользователь регистрируется на вашем сайте. Без Celery процесс выглядит так:
Пользователь нажимает "Регистрация"
↓
FastAPI создаёт пользователя в БД
↓
FastAPI отправляет welcome-email (2-3 секунды через SMTP)
↓
FastAPI возвращает ответ пользователю
Проблема: Пользователь ждёт 2-3 секунды. Если SMTP-сервер тормозит — 10 секунд. Если упал — пользователь получает ошибку.
С Celery процесс меняется:
Пользователь нажимает "Регистрация"
↓
FastAPI создаёт пользователя в БД
↓
FastAPI кладёт задачу "отправить email" в очередь (5 миллисекунд)
↓
FastAPI сразу возвращает ответ ← Пользователь счастлив!
↓
... а Celery Worker в фоне отправляет email
Выгода:
| Сценарий | Celery? | Почему |
|---|---|---|
| Отправка email | ✅ Да | Медленная внешняя операция |
| Обработка загруженного файла | ✅ Да | Долго, нужно прогресс |
| Генерация PDF-отчётов | ✅ Да | CPU-ёмко |
| Уведомления в Telegram/Slack | ✅ Да | Внешний API может тормозить |
| Парсинг сайтов | ✅ Да | Сетевые задержки |
| Простое вычисление в запросе | ❌ Нет | Быстро, не стоит усложнять |
| Чтение из БД | ❌ Нет | Обычно достаточно быстро |
BackgroundTasks из FastAPI?FastAPI уже имеет встроенный механизм фоновых задач. Вот сравнение:
| Критерий | BackgroundTasks | Celery |
|---|---|---|
| Простота | Простой — просто добавьте функцию | Требует Redis/RabbitMQ и Worker |
| Надёжность | ❌ Задача пропадёт при перезапуске | ✅ Задачи сохраняются в брокере |
| Повторные попытки | ❌ Нет встроенных retry | ✅ Встроенные retry с задержкой |
| Планирование | ❌ Нет | ✅ Celery Beat (cron-расписание) |
| Мониторинг | ❌ Нет | ✅ Flower — веб-интерфейс |
| Масштабирование | ❌ Работает только на этом сервере | ✅ Множество Worker на разных машинах |
| Отложенный запуск | ❌ Нет | ✅ ETA (выполнить в 9:00 завтра) |
Правило выбора:
BackgroundTasks для простых сценариев (отправить одно письмо, записать лог)Celery когда нужна надёжность, повторные попытки, расписание или масштабированиеПредставьте почтовую службу:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ FastAPI │─────►│ Broker │◄─────│ Worker │
│ (Секретарь) │ │ (Почтовый │ │ (Курьер) │
│ │ │ ящик) │ │ │
└─────────────┘ └─────────────┘ └─────────────┘
│
▼
┌─────────────┐
│ Backend │
│ (Архив) │
└─────────────┘
Разберём по ролям:
FastAPI (Секретарь) — принимает запросы от пользователей и кладёт задачи в очередь. Сам ничего не обрабатывает, только делегирует.
Broker (Почтовый ящик) — Redis или RabbitMQ. Хранит задачи в очереди, пока Worker не заберёт их. Без брокера Celery не работает!
Worker (Курьер) — отдельный процесс, который забирает задачи из очереди и выполняет их. Может быть запущен на другом сервере.
Backend (Архив) — опционально. Хранит результаты выполнения задач. Нужен если хотите узнать статус задачи позже.
Ключевая идея: FastAPI и Worker — это разные процессы. Они не делят память, переменные или соединения с БД. Worker должен сам создавать соединение с БД внутри задачи.
Нам нужны три компонента:
pip install celery[redis] rediscelery[redis] — Celery с поддержкой Redis как брокераredis — Python-клиент для Redis (нужен для подключения)Примечание: В production используйте конкретные версии:
celery==5.3.4 redis==5.0.1
Вариант A — Docker (рекомендуется):
docker run -d -p 6379:6379 redis:7-alpineВариант B — локальная установка:
# macOS
brew install redis
redis-server
# Linux
sudo apt install redis-server
sudo systemctl start redisЧто мы сейчас сделаем: Создадим файл celery_app.py — это точка входа для Celery. Его будут импортировать и FastAPI, и Worker.
# celery_app.py
from celery import Celery
# Создаём экземпляр Celery
# broker — где хранить очередь задач (Redis)
# backend — где хранить результаты задач (тоже Redis, но другая БД)
celery_app = Celery(
'my_app', # Имя приложения (произвольное)
broker='redis://localhost:6379/0', # Брокер — БД 0
backend='redis://localhost:6379/1', # Бэкэнд — БД 1
)
# Конфигурация сериализации
celery_app.conf.update(
# Формат данных для передачи задач
# JSON — универсальный, виден в брокере, легко отлаживать
task_serializer='json',
accept_content=['json'],
result_serializer='json',
# Часовой пояс для планирования
timezone='Europe/Moscow',
enable_utc=True,
# --- Надёжность ---
# Задача подтверждается ПОСЛЕ выполнения, а не ДО
# Если Worker упадёт — задача вернётся в очередь
task_acks_late=True,
# Если Worker потерял соединение — вернуть задачу в очередь
task_reject_on_worker_lost=True,
# --- Таймауты ---
# Максимальное время выполнения (5 минут)
task_time_limit=300,
# Мягкий лимит — можно перехватить исключение
task_soft_time_limit=240,
)Как это работает:
| Параметр | Зачем | Что будет если не настроить |
|---|---|---|
broker | Где хранить очередь | Без него Celery не запустится |
backend | Где хранить результаты | Не сможете узнать статус задачи |
task_serializer='json' | Формат данных | По умолчанию pickle — медленнее и небезопасно |
task_acks_late=True | Подтверждение ПОСЛЕ выполнения | При падении Worker задача потеряется |
task_time_limit | Лимит времени | Зависшая задача будет блокировать Worker вечно |
Почему разные БД (0 и 1)? Redis поддерживает несколько "баз данных" в одном инстансе. Разделяем очереди задач и результаты, чтобы они не мешали друг другу.
Что мы сейчас сделаем: Напишем простую задачу отправки email с обработкой ошибок и повторными попытками.
# tasks.py
from celery_app import celery_app
import smtplib
from email.mime.text import MIMEText
import logging
logger = logging.getLogger(__name__)
@celery_app.task(
bind=True, # Передаёт экземпляр задачи как self (нужен для retry)
max_retries=3, # Максимум 3 попытки
default_retry_delay=60 # Ждать 60 секунд между попытками
)
def send_email_task(
self, # Экземпляр задачи (появляется благодаря bind=True)
to_email: str, # Кому
subject: str, # Тема
body: str # Текст письма
):
"""
Отправка email через Celery с повторными попытками.
Почему bind=True?
- Без bind: функция получает только аргументы
- С bind: первым аргументом идёт self — экземпляр задачи
- self нужен для вызова self.retry() при ошибке
"""
try:
logger.info(f"Sending email to {to_email}")
# Создаём письмо
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = 'noreply@example.com'
msg['To'] = to_email
# Отправляем через SMTP
with smtplib.SMTP('smtp.example.com', 587) as server:
server.starttls() # Шифрование
server.login('user', 'password')
server.send_message(msg)
logger.info(f"Email sent to {to_email}")
return {'status': 'success', 'email': to_email}
except Exception as exc:
# Произошла ошибка (SMTP недоступен, таймаут и т.д.)
logger.error(f"Failed to send email to {to_email}: {exc}")
# self.retry() выбрасывает специальную ошибку
# Celery поймёт её и запланирует повторный запуск
raise self.retry(exc=exc)Как работает retry:
Попытка 1: Ошибка! → Ждём 60 сек
Попытка 2: Ошибка! → Ждём 60 сек
Попытка 3: Ошибка! → Задача переходит в состояние FAILURE
Важно: После
max_retriesисчерпания задача не удаляется из backend — её статус становитсяFAILUREи вы можете обработать это в коде.
Зачем это нужно: Если задача выполняется долго (генерация отчёта, обработка файла), пользователь хочет знать, сколько ещё осталось.
Что мы сейчас сделаем: Создадим задачу, которая сообщает о своём прогрессе в процентах.
# tasks.py (продолжение)
import time
@celery_app.task(bind=True)
def process_data_task(self, data: list[int]):
"""
Обрабатывает список данных и сообщает о прогрессе.
Ключевое отличие: self.update_state() — сообщает Celery
о текущем состоянии задачи.
"""
total = len(data)
results = []
for i, item in enumerate(data):
# Имитация долгой обработки (в реальности — запрос к API, парсинг и т.д.)
time.sleep(0.1)
results.append(item * 2)
# === КЛЮЧЕВОЙ МОМЕНТ ===
# update_state() обновляет статус задачи в backend
state='PROGRESS' — специальное состояние для "в процессе"
self.update_state(
state='PROGRESS', # Состояние задачи
meta={ # Произвольные данные (сохранятся в backend)
'current': i + 1,
'total': total,
'percent': round((i + 1) / total * 100, 2)
}
)
return {'status': 'completed', 'results': results}Как это выглядит для пользователя:
Запрос → "Processing started, task_id: abc-123"
GET /task-status/abc-123 → {"progress": 25}
GET /task-status/abc-123 → {"progress": 50}
GET /task-status/abc-123 → {"progress": 75}
GET /task-status/abc-123 → {"progress": 100, "status": "Completed"}
Важно: Для работы
update_state()обязательно нуженbackendв конфигурации Celery. Без него прогресс не сохранится.
Что это: Worker — это отдельный процесс, который выполняет задачи. Без него задачи будут копиться в очереди, но не выполняться.
# В отдельном терминале запустите:
celery -A celery_app worker --loglevel=info --concurrency=4Разбор команды:
| Флаг | Что делает |
|---|---|
-A celery_app | Путь к модулю с экземпляром Celery (celery_app.py) |
worker | Запустить воркер (исполнитель задач) |
--loglevel=info | Уровень логирования (видно что происходит) |
--concurrency=4 | Сколько задач обрабатывать параллельно (по умолчанию = кол-во CPU) |
Вы должны увидеть:
-------------- celery@your-machine v5.3.4
---- **** -----
--- * *** * -- Darwin-21.0.0-x86_64-i386-64bit
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: my_app:0x10c682e50
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost:6379/1
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.send_email_task
. tasks.process_data_task
Запомните: Worker должен быть запущен всегда, когда ваше приложение использует Celery. В production используйте supervisor или systemd для автоматического перезапуска.
Что мы сейчас сделаем: Создадим FastAPI-приложение с тремя эндпоинтами:
# main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, EmailStr
from celery.result import AsyncResult
from tasks import send_email_task, process_data_task
app = FastAPI(title="Celery + FastAPI Demo")
# === Pydantic-модели для запросов ===
class EmailRequest(BaseModel):
to_email: EmailStr
subject: str
body: str
class Config:
json_schema_extra = {
"examples": [{
"to_email": "user@example.com",
"subject": "Привет!",
"body": "Добро пожаловать в наше приложение!"
}]
}
class ProcessDataRequest(BaseModel):
data: list[int]
# === Эндпоинты ===
@app.post('/send-email')
async def send_email(request: EmailRequest):
"""
Отправляет email через Celery.
Как это работает:
1. Создаём задачу вызовом .delay() — это аналог вызова функции
2. .delay() мгновенно возвращает объект задачи с task_id
3. Задача уходит в очередь Redis
4. Worker заберёт её когда будет свободен
"""
# .delay() — асинхронный вызов задачи
# Возвращает AsyncResult с task_id
task = send_email_task.delay(
request.to_email,
request.subject,
request.body
)
# Возвращаем task_id чтобы клиент мог проверить статус
return {
'message': 'Email task queued',
'task_id': task.id
}
@app.get('/task-status/{task_id}')
async def get_task_status(task_id: str):
"""
Проверяет статус задачи по task_id.
AsyncResult — объект для работы с задачей:
- task.state — текущее состояние (PENDING, STARTED, PROGRESS, SUCCESS, FAILURE)
- task.info — данные из update_state() или результат выполнения
"""
# Создаём объект результата из task_id
# Важно: используем celery_app из tasks, а не send_email_task.app
# (они ссылаются на одно приложение, но так понятнее)
task = AsyncResult(task_id, app=send_email_task.app)
response = {
'task_id': task_id,
'state': task.state,
}
# Разные состояния — разная логика ответа
if task.state == 'PENDING':
# Задача ещё не началась (или task_id не существует)
response['status'] = 'Task is pending'
elif task.state == 'STARTED':
# Worker начал выполнять, но ещё не обновил прогресс
response['status'] = 'Task started'
elif task.state == 'PROGRESS':
# Задача сообщила о прогрессе через update_state()
response['status'] = 'In progress'
response['progress'] = task.info # {'current': 5, 'total': 10, 'percent': 50}
elif task.state == 'SUCCESS':
# Задача успешно завершена
response['status'] = 'Completed'
response['result'] = task.result # {'status': 'success', 'email': '...'}
elif task.state == 'FAILURE':
# Задача завершилась с ошибкой (после всех retry)
response['status'] = 'Failed'
response['error'] = str(task.info) # Текст исключения
else:
# Другие состояния (RETRY, REVOKED и т.д.)
response['status'] = task.state
return response
@app.post('/process-data')
async def process_data(request: ProcessDataRequest):
"""
Обрабатывает данные с отслеживанием прогресса.
Проверка на размер данных — хорошая практика:
Celery сериализует аргументы в JSON и отправляет через Redis.
Огромные данные = медленная сериализация + нагрузка на Redis.
"""
if len(request.data) > 10000:
raise HTTPException(400, "Data too large")
task = process_data_task.delay(request.data)
return {
'message': 'Processing started',
'task_id': task.id
}Как тестировать:
# 1. Запустите Redis
docker run -d -p 6379:6379 redis:7-alpine
# 2. Запустите Worker
celery -A celery_app worker --loglevel=info
# 3. Запустите FastAPI
uvicorn main:app --reload
# 4. Отправьте запрос
curl -X POST http://localhost:8000/send-email \
-H "Content-Type: application/json" \
-d '{"to_email": "test@example.com", "subject": "Hi", "body": "Hello!"}'
# Ответ: {"message": "Email task queued", "task_id": "abc-123-..."}
# 5. Проверьте статус
curl http://localhost:8000/task-status/abc-123-...
# Ответ: {"task_id": "...", "state": "SUCCESS", "status": "Completed", ...}Что такое Celery Beat: Это планировщик, который автоматически запускает задачи по расписанию — как cron в Linux.
Зачем это нужно:
Что мы сейчас сделаем: Добавим расписание задач в celery_app.py.
# celery_app.py (добавьте в конец файла)
from celery.schedules import crontab
celery_app.conf.beat_schedule = {
# Имя расписания (произвольное, для мониторинга)
'send-daily-report': {
'task': 'tasks.send_daily_report_task', # Имя задачи (модуль.функция)
'schedule': crontab(hour=9, minute=0), # Каждый день в 9:00 UTC
},
'cleanup-old-data': {
'task': 'tasks.cleanup_old_data_task',
'schedule': crontab(hour=2, minute=0, day_of_week=0), # Воскресенье 2:00
},
'health-check': {
'task': 'tasks.health_check_task',
'schedule': crontab(minute='*/5'), # Каждые 5 минут
},
'monthly-report': {
'task': 'tasks.monthly_report_task',
'schedule': crontab(hour=0, minute=0, day_of_month=1), # 1-е число месяца
},
}crontab() работает как cron в Linux. Вот все параметры:
crontab() # Каждую минуту
crontab(minute='*/5') # Каждые 5 минут
crontab(hour='*/2') # Каждые 2 часа
crontab(hour=9, minute=0) # Каждый день в 9:00
crontab(hour=9, minute=0, day_of_week=1) # Каждый понедельник в 9:00
crontab(hour=0, minute=0, day_of_month=1) # 1-е число каждого месяца
crontab(minute=0, hour='*/3') # Каждые 3 часа в :00# tasks.py (добавьте в конец)
@celery_app.task
def send_daily_report_task():
"""
Ежедневный отчёт — отправляет email с статистикой.
Эта задача будет вызываться автоматически Celery Beat
по расписанию из beat_schedule.
"""
# Здесь ваша логика:
# 1. Собрать статистику из БД
# 2. Сформировать отчёт
# 3. Отправить email администраторам
logger.info("Daily report sent")
return {'status': 'daily report sent'}
@celery_app.task
def cleanup_old_data_task():
"""Очистка старых данных — удаляет записи старше 30 дней."""
# Здесь ваша логика:
# db.query(Log).filter(Log.created_at < cutoff).delete()
logger.info("Cleanup completed")
return {'status': 'cleanup completed'}
@celery_app.task
def health_check_task():
"""Проверка здоровья — пингует БД и внешние сервисы."""
# Здесь ваша логика:
# 1. Проверить соединение с БД
# 2. Проверить внешний API
# 3. Отправить алерт если что-то не так
logger.info("Health check passed")
return {'status': 'healthy'}
@celery_app.task
def monthly_report_task():
"""Ежемесячный отчёт — генерирует PDF и отправляет руководству."""
logger.info("Monthly report generated")
return {'status': 'monthly report generated'}Важно: Beat и Worker — это два разных процесса. Запускайте их в отдельных терминалах.
# Терминал 1 — Worker (выполняет задачи)
celery -A celery_app worker --loglevel=info
# Терминал 2 — Beat (запускает задачи по расписанию)
celery -A celery_app beat --loglevel=infoЧто вы увидите в логах Beat:
celery beat v5.3.4 is starting.
LocalTime -> 2024-01-15 09:00:00
...
Scheduler: Sending due task send-daily-report (tasks.send_daily_report_task)
Важно: Запускайте только один экземпляр Beat! Иначе одна и та же задача будет запущена несколько раз. В production используйте флаг
-S redisдля координации:celery -A celery_app beat -S redis --loglevel=info
Зачем это нужно: Иногда нужно выполнить задачу не сейчас, а в определённое время.
Два способа отложить задачу:
countdown — отсчёт от текущего моментаfrom datetime import timedelta
# Выполнить через 10 минут (600 секунд)
task = send_email_task.apply_async(
args=[to_email, subject, body],
countdown=600 # Секунды от сейчас
)
# Проще: через 1 час
task = send_email_task.apply_async(
args=[to_email, subject, body],
countdown=3600
)eta — конкретное время выполненияfrom datetime import datetime, timezone
# Выполнить завтра в 9:00 UTC
send_at = datetime(2024, 1, 16, 9, 0, 0, tzinfo=timezone.utc)
task = send_email_task.apply_async(
args=[to_email, subject, body],
eta=send_at # Exact time — точное время
)Разница:
countdown— "через X секунд",eta— "в такое-то время".etaполезен когда пользователь сам выбирает время отправки.
# main.py (добавьте)
from datetime import datetime, timezone
class ScheduledEmailRequest(BaseModel):
to_email: EmailStr
subject: str
body: str
send_at: datetime # Когда отправить
@app.post('/send-email-scheduled')
async def send_email_scheduled(request: ScheduledEmailRequest):
"""
Отправляет email в указанное время.
Пример запроса:
{
"to_email": "user@example.com",
"subject": "Напоминание",
"body": "Не забудьте про встречу!",
"send_at": "2024-01-16T09:00:00Z"
}
"""
# Проверяем что время в будущем
now = datetime.now(timezone.utc)
if request.send_at <= now:
raise HTTPException(400, "send_at must be in the future")
# Ставим задачу с eta
task = send_email_task.apply_async(
args=[request.to_email, request.subject, request.body],
eta=request.send_at
)
return {
'message': f'Email scheduled for {request.send_at}',
'task_id': task.id
}Как Celery хранит отложенные задачи: Задача лежит в брокере (Redis) и не выполняется пока не наступит время
eta. Можно безопасно перезапускать Worker — задача не потеряется.
Зачем это нужно: Иногда задачи зависят друг от друга или нужно выполнить несколько задач параллельно.
Сценарий: Сначала вычислить результат, потом отправить его по email.
from celery import chain
# Цепочка: задача1 → задача2 → задача3
# Результат каждой передаётся следующей
workflow = chain(
heavy_computation.s(1, 2), # 1 + 2 = 3
heavy_computation.s(3), # 3 + 3 = 6 (результат предыдущей + 3)
send_email_task.s('admin@ex.com', 'Result', 'Done') # Отправляем
)
result = workflow.apply_async()Как это работает:
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ heavy_computation│────►│ heavy_computation│────►│ send_email_task │
│ (1, 2) │ 3 │ (3) │ 6 │ admin@ex.com │
└──────────────────┘ └──────────────────┘ └──────────────────┘
Важно:
.s()— это "signature" (сигнатура задачи). Она позволяет передать задачу как объект. Результат предыдущей задачи становится первым аргументом следующей.
Сценарий: Отправить письма 100 пользователям одновременно, не дожидаясь завершения каждого.
from celery import group
# Группа задач — выполняются параллельно
group_workflow = group(
send_email_task.s('user1@example.com', 'Subject', 'Body'),
send_email_task.s('user2@example.com', 'Subject', 'Body'),
send_email_task.s('user3@example.com', 'Subject', 'Body'),
)
group_result = group_workflow.apply_async()
# Получить результаты всех задач
results = group_result.get() # [{'status': 'success', ...}, ...]Как это работает:
┌──────────────────────┐
│ GROUP │
└──────────────────────┘
↓ ↓ ↓
┌──────────┐ ┌──────────┐ ┌──────────┐
│ user1 │ │ user2 │ │ user3 │
└──────────┘ └──────────┘ └──────────┘
Сценарий: Обработать 10 файлов, потом отправить сводный отчёт.
from celery import chord
# Хорда: группа задач → callback когда все завершены
chord_workflow = chord(
# Группа: 10 задач параллельно
group(heavy_computation.s(i, i) for i in range(10))
)(
# Callback: вызывается когда ВСЕ задачи из группы завершены
send_email_task.s('admin@ex.com', 'All Done', 'Summary')
)
chord_result = chord_workflow.apply_async()Как это работает:
┌─────┐ ┌─────┐ ┌─────┐ ... ┌─────┐
│ 0 │ │ 1 │ │ 2 │ │ 9 │
└─────┘ └─────┘ └─────┘ └─────┘
↓ ↓ ↓ ↓
[Ждём завершения ВСЕХ задач]
↓
┌───────────────────┐
│ send_email_task │ ← Вызывается один раз
└───────────────────┘
Когда использовать:
- Chain — когда задачи зависят друг от друга
- Group — когда задачи независимы и можно выполнить параллельно
- Chord — когда нужно дождаться завершения группы и сделать что-то после
from fastapi import FastAPI, Depends, HTTPException, BackgroundTasks
from sqlalchemy.orm import Session
from pydantic import BaseModel, EmailStr
from celery.result import AsyncResult
from typing import List, Optional
from datetime import datetime
from database import get_db
from models import User, EmailCampaign, EmailLog
from tasks import send_email_task
app = FastAPI()
# === Модели ===
class CampaignCreate(BaseModel):
name: str
subject: str
body: str
user_ids: List[int]
class CampaignResponse(BaseModel):
id: int
name: str
subject: str
status: str
total_emails: int
sent_count: int
created_at: datetime
class Config:
from_attributes = True
# === Endpoints ===
@app.post('/campaigns', response_model=CampaignResponse)
async def create_campaign(
campaign: CampaignCreate,
db: Session = Depends(get_db)
):
"""
Создаёт email-кампанию и запускает рассылку.
"""
# Создаём кампанию
db_campaign = EmailCampaign(
name=campaign.name,
subject=campaign.subject,
body=campaign.body,
status='pending',
total_emails=len(campaign.user_ids),
sent_count=0
)
db.add(db_campaign)
db.commit()
db.refresh(db_campaign)
# Запускаем задачи для каждого пользователя
task_ids = []
for user_id in campaign.user_ids:
user = db.query(User).filter(User.id == user_id).first()
if user and user.email:
task = send_email_task.delay(
user.email,
campaign.subject,
campaign.body
)
task_ids.append(task.id)
# Логируем отправку
log = EmailLog(
campaign_id=db_campaign.id,
user_id=user_id,
task_id=task.id,
status='queued'
)
db.add(log)
db_campaign.status = 'sending'
db_campaign.task_ids = task_ids
db.commit()
return db_campaign
@app.get('/campaigns/{campaign_id}')
async def get_campaign(
campaign_id: int,
db: Session = Depends(get_db)
):
"""
Получает статус кампании.
"""
campaign = db.query(EmailCampaign).filter(
EmailCampaign.id == campaign_id
).first()
if not campaign:
raise HTTPException(404, "Campaign not found")
# Подсчитываем прогресс
sent = db.query(EmailLog).filter(
EmailLog.campaign_id == campaign_id,
EmailLog.status == 'sent'
).count()
failed = db.query(EmailLog).filter(
EmailLog.campaign_id == campaign_id,
EmailLog.status == 'failed'
).count()
return {
**campaign.__dict__,
'sent_count': sent,
'failed_count': failed,
'progress': round(sent / campaign.total_emails * 100, 2) if campaign.total_emails > 0 else 0
}
@app.get('/campaigns/{campaign_id}/logs')
async def get_campaign_logs(
campaign_id: int,
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db)
):
"""
Получает логи рассылки.
"""
logs = db.query(EmailLog).filter(
EmailLog.campaign_id == campaign_id
).offset(skip).limit(limit).all()
return logs
@app.post('/campaigns/{campaign_id}/retry-failed')
async def retry_failed_emails(
campaign_id: int,
db: Session = Depends(get_db)
):
"""
Повторная отправка неудачных писем.
"""
campaign = db.query(EmailCampaign).filter(
EmailCampaign.id == campaign_id
).first()
if not campaign:
raise HTTPException(404, "Campaign not found")
# Находим неудачные логи
failed_logs = db.query(EmailLog).filter(
EmailLog.campaign_id == campaign_id,
EmailLog.status == 'failed'
).all()
retried_count = 0
for log in failed_logs:
user = db.query(User).filter(User.id == log.user_id).first()
if user and user.email:
task = send_email_task.delay(
user.email,
campaign.subject,
campaign.body
)
log.task_id = task.id
log.status = 'queued'
retried_count += 1
db.commit()
return {
'message': f'Retried {retried_count} failed emails',
'retried_count': retried_count
}Симптом:
kombu.exceptions.EncodeError: Object of type User is not JSON serializable
Проблема:
@celery_app.task
def send_welcome_email(user: User): # ❌ ОШИБКА!
send_email(user.email, "Welcome!")Celery сериализует аргументы в JSON для передачи через Redis. Объекты SQLAlchemy не сериализуются.
Решение: Передавайте ID, загружайте объект внутри задачи:
@celery_app.task
def send_welcome_email(user_id: int): # ✅ OK
from database import SessionLocal
from models import User
with SessionLocal() as db:
user = db.query(User).filter(User.id == user_id).first()
if user:
send_email(user.email, "Welcome!")Почему нужно создавать Session внутри задачи? FastAPI и Worker — разные процессы. Они не делят соединения с БД. Worker должен создать своё собственное соединение.
Симптом: Задача висит в состоянии PENDING бесконечно.
Причины и решения:
| Причина | Как проверить | Решение |
|---|---|---|
| Worker не запущен | Нет логов в терминале Worker | Запустите celery -A celery_app worker |
| Worker не видит задачу | В логах нет вашей задачи в [tasks] | Проверьте include=['tasks'] в Celery конфига |
| Redis не запущен | redis-cli ping не отвечает PONG | Запустите Redis |
| Неправильное имя задачи | В логах Worker нет tasks.my_task | Проверьте декоратор @celery_app.task |
Проблема:
@celery_app.task
def send_email(email: str):
smtp.send(email) # Может упасть! Задача потерется.Если SMTP недоступен, задача упадёт без повторных попыток.
Решение:
@celery_app.task(bind=True, max_retries=3)
def send_email(self, email: str):
try:
smtp.send(email)
except Exception as exc:
# retry() автоматически запланирует повторный запуск
raise self.retry(exc=exc, countdown=60)Проблема:
@celery_app.task
def heavy_task():
time.sleep(60) # Блокирует worker на минуту!По умолчанию Worker обрабатывает 4 задачи параллельно. Если все 4 займутся sleep(), остальные задачи будут ждать.
Решение: Увеличьте --concurrency или используйте gevent пул:
# 10 задач параллельно
celery -A celery_app worker --concurrency=10 --loglevel=info
# Или с gevent (нужен pip install gevent)
celery -A celery_app worker --pool=gevent --concurrency=100Проблема: Вы вызываете update_state() для прогресса, но клиент всегда видит PENDING.
Причина: Без backend Celery не сохраняет статус задач.
Решение: Убедитесь что backend указан в конфигурации:
celery_app = Celery(
'my_app',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1', # ← Обязательно!
)Цель: Реализовать отправку приветственного письма при регистрации пользователя.
# tasks.py
from celery_app import celery_app
import smtplib
from email.mime.text import MIMEText
import logging
logger = logging.getLogger(__name__)
@celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
def send_welcome_email_task(self, user_email: str, user_name: str):
"""
Отправляет welcome-email новому пользователю.
Задание:
1. Создайте MIME письмо с персонализацией
2. Отправьте через SMTP
3. Обработайте ошибки с retry
"""
try:
# TODO: Создайте письмо с темой "Добро пожаловать, {user_name}!"
# TODO: В теле письма: "Привет, {user_name}! Рады видеть вас."
# TODO: Отправьте на user_email
# Подсказка:
msg = MIMEText(f"Привет, {user_name}! Рады видеть вас.")
msg['Subject'] = f"Добро пожаловать, {user_name}!"
msg['From'] = 'noreply@example.com'
msg['To'] = user_email
with smtplib.SMTP('smtp.example.com', 587) as server:
server.starttls()
server.login('user', 'password')
server.send_message(msg)
logger.info(f"Welcome email sent to {user_email}")
return {'status': 'success', 'email': user_email}
except Exception as exc:
logger.error(f"Failed to send welcome email: {exc}")
raise self.retry(exc=exc)# main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, EmailStr
from tasks import send_welcome_email_task
app = FastAPI()
class UserRegisterRequest(BaseModel):
name: str
email: EmailStr
@app.post('/register')
async def register_user(request: UserRegisterRequest):
"""
Регистрирует пользователя и отправляет welcome-email.
TODO:
1. Создайте пользователя в БД (или имитируйте)
2. Запустите задачу send_welcome_email_task
3. Верните task_id для проверки статуса
"""
# TODO: Здесь создайте пользователя в БД
# db_user = User(name=request.name, email=request.email)
# db.add(db_user)
# db.commit()
# Запустите задачу отправки email
task = send_welcome_email_task.delay(request.email, request.name)
return {
'message': f'User {request.name} registered',
'email_task_id': task.id
}# 1. Запустите Redis
docker run -d -p 6379:6379 redis:7-alpine
# 2. Запустите Worker
celery -A celery_app worker --loglevel=info
# 3. Запустите FastAPI
uvicorn main:app --reload
# 4. Зарегистрируйте пользователя
curl -X POST http://localhost:8000/register \
-H "Content-Type: application/json" \
-d '{"name": "Иван", "email": "ivan@example.com"}'
# 5. Проверьте что Worker получил задачу
# В логах Worker должны увидеть:
# "Sending email to ivan@example.com"
# "Welcome email sent to ivan@example.com"send_welcome_email_task появляется в списке [tasks]/register задача ставится в очередьtask.status меняется от PENDING до SUCCESSGET /task-status/{task_id} для проверки статуса отправкиeta чтобы отправлять welcome-email через 1 час после регистрацииFlower — веб-интерфейс для мониторинга Celery. Показывает задачи в реальном времени, статистику и позволяет управлять задачами.
pip install flowercelery -A celery_app flower --port=5555Откройте http://localhost:5555. Вы увидите:
| Вкладка | Что показывает |
|---|---|
| Dashboard | Общая статистика: задач/сек, успехи, ошибки |
| Tasks | Все задачи: активные, выполненные, проваленные |
| Worker | Статус Worker процессов, загрузка CPU |
| Broker | Статистика Redis: длина очереди, потребители |
Что можно делать через Flower:
task_time_limit)| Концепция | Что научились делать |
|---|---|
| Celery архитектура | Понимать роли: Broker, Worker, Backend |
| Создание задач | Декорировать функции @celery_app.task |
| Повторные попытки | Использовать max_retries и self.retry() |
| Прогресс задач | Сообщать прогресс через update_state() |
| Планирование | Настраивать Celery Beat с crontab |
| Отложенные задачи | Использовать eta и countdown |
| Цепочки | Комбинировать chain, group, chord |
| Мониторинг | Запускать Flower для наблюдения |
| Интеграция с FastAPI | Вызывать .delay() из эндпоинтов |
В следующей теме вы изучите кэширование — Redis, memoization, стратегии инвалидации кэша.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.