Настройка Celery и ARQ с Redis как broker и backend
Celery и ARQ — две популярные библиотеки для фоновых задач в Python. Обе используют Redis как broker (очередь задач) и backend (хранение результатов). Celery — зрелая, мощная, но сложная. ARQ — легковесная, async-native, проще в настройке.
pip install celery[redis]
# или
poetry add celery[redis]# celery_config.py
from celery import Celery
import os
REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
celery_app = Celery(
'tasks',
broker=REDIS_URL,
backend=f'{REDIS_URL}', # Для хранения результатов задач
include=['tasks'], # Модули с задачами
)
# Конфигурация
celery_app.conf.update(
# Сериализация
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='UTC',
enable_utc=True,
# Надёжность
task_acks_late=True, # Подтверждение после выполнения
task_reject_on_worker_or_lost=True, # Requeue при сбое воркера
task_track_started=True,
# Retry
task_default_retry_delay=30, # Задержка перед retry
task_max_retry_delay=600, # Максимальная задержка
task_default_retries=3,
# Очереди
task_default_queue='default',
task_queues={
'default': {'exchange': 'default', 'routing_key': 'default'},
'high_priority': {'exchange': 'high_priority', 'routing_key': 'high'},
'low_priority': {'exchange': 'low_priority', 'routing_key': 'low'},
},
task_routes={
'tasks.send_email': {'queue': 'high_priority'},
'tasks.process_bulk': {'queue': 'low_priority'},
},
# Rate limiting
worker_prefetch_multiplier=1, # Не брать задачи вперёд
task_default_rate_limit='100/m', # 100 задач в минуту по умолчанию
)# tasks.py
from celery_config import celery_app
from celery import Task
import logging
logger = logging.getLogger(__name__)
@celery_app.task(bind=True, max_retries=3)
def send_email_task(self, to: str, subject: str, body: str):
"""Отправка email"""
try:
logger.info(f"Sending email to {to}: {subject}")
# Логика отправки...
return {'status': 'sent', 'to': to}
except Exception as exc:
# Retry с экспоненциальной задержкой
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
@celery_app.task(bind=True)
def process_payment(self, user_id: int, amount: float):
"""Обработка платежа"""
try:
# Логика обработки...
return {'status': 'processed', 'user_id': user_id}
except Exception as exc:
# Retry с кастомной задержкой
raise self.retry(exc=exc, countdown=60)
@celery_app.task
def heavy_computation(data: list):
"""Тяжёлая вычислительная задача"""
result = sum(x ** 2 for x in data)
return result
# Задачи с разными очередями
@celery_app.task(queue='high_priority')
def send_push_notification(user_id: int, message: str):
"""Срочное уведомление"""
pass
@celery_app.task(queue='low_priority')
def generate_report(report_id: int):
"""Генерация отчёта (не срочно)"""
pass# main.py
from fastapi import FastAPI, BackgroundTasks, HTTPException
from tasks import send_email_task, process_payment, heavy_computation
app = FastAPI()
@app.post("/send-email")
async def send_email_endpoint(to: str, subject: str, body: str):
"""Отправка email через Celery"""
task = send_email_task.delay(to, subject, body)
return {'task_id': task.id, 'status': 'queued'}
@app.post("/process-payment")
async def process_payment_endpoint(user_id: int, amount: float):
"""Обработка платежа"""
task = process_payment.delay(user_id, amount)
return {'task_id': task.id, 'status': 'queued'}
@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
"""Проверка статуса задачи"""
from celery_config import celery_app
task = celery_app.AsyncResult(task_id)
return {
'task_id': task_id,
'status': task.status,
'result': task.result if task.ready() else None,
}
@app.post("/heavy-computation")
async def heavy_computation_endpoint(data: list):
"""Тяжёлая вычислительная задача"""
task = heavy_computation.delay(data)
return {'task_id': task.id}# Основной воркер (обрабатывает default очередь)
celery -A celery_config worker --loglevel=info
# Воркер для high_priority очереди
celery -A celery_config worker --loglevel=info -Q high_priority
# Воркер для всех очередей с приоритетами
celery -A celery_config worker --loglevel=info -Q default,high_priority,low_priority
# Запуск в production (с eventlet/gevent для concurrency)
celery -A celery_config worker --loglevel=info --pool=gevent --concurrency=100
# Beat для периодических задач
celery -A celery_config beat --loglevel=info# celery_config.py
from celery.schedules import crontab
celery_app.conf.beat_schedule = {
'send-daily-report': {
'task': 'tasks.send_daily_report',
'schedule': crontab(hour=9, minute=0), # Каждый день в 9:00
},
'cleanup-old-data': {
'task': 'tasks.cleanup_old_data',
'schedule': crontab(hour=3, minute=0, day_of_week='sunday'), # Каждое воскресенье в 3:00
},
'process-pending-tasks': {
'task': 'tasks.process_pending',
'schedule': 60.0, # Каждую минуту
},
}# tasks.py
@celery_app.task
def send_daily_report():
"""Ежедневный отчёт"""
# Логика отчёта...
pass
@celery_app.task
def cleanup_old_data():
"""Очистка старых данных"""
passARQ — легковесная async queue для Python, нативно поддерживает asyncio.
pip install arq
# или
poetry add arq# worker_settings.py
from arq import cron
import redis.asyncio as redis
import os
REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
class Settings:
redis_settings = {
'address': ('localhost', 6379),
'password': os.getenv('REDIS_PASSWORD'),
'db': 0,
}
# Очереди
queue_name = 'arq:queue'
# Воркеры
max_jobs = 10 # Максимум одновременных задач
job_timeout = 300 # Таймаут задачи (5 минут)
burst = False # False = постоянный режим
# Retry
max_tries = 3
job_retry_delay = 1 # Задержка перед retry
# Функции
functions = [
'tasks.send_email',
'tasks.process_payment',
'tasks.heavy_computation',
]
# Периодические задачи
cron_jobs = [
cron(send_daily_report, hour=9, minute=0),
cron(cleanup_old_data, hour=3, minute=0, day_of_week=6), # Суббота
]
settings = Settings()# tasks.py
from arq.worker import Worker
import logging
import asyncio
logger = logging.getLogger(__name__)
async def send_email(ctx, to: str, subject: str, body: str):
"""Отправка email"""
logger.info(f"Sending email to {to}: {subject}")
# Имитация отправки
await asyncio.sleep(0.1)
return {'status': 'sent', 'to': to}
async def process_payment(ctx, user_id: int, amount: float):
"""Обработка платежа"""
logger.info(f"Processing payment: user={user_id}, amount={amount}")
# Логика обработки...
await asyncio.sleep(0.5)
return {'status': 'processed', 'user_id': user_id}
async def heavy_computation(ctx, data: list):
"""Тяжёлая вычислительная задача"""
# Имитация вычислений
await asyncio.sleep(1)
result = sum(x ** 2 for x in data)
return result
async def task_with_retry(ctx, item_id: str):
"""Задача с retry"""
attempt = ctx['job_try']
if attempt < 3:
# Принудительный retry
raise ValueError(f"Attempt {attempt} failed")
return {'status': 'success', 'item_id': item_id}
# Периодическая задача
async def send_daily_report(ctx):
"""Ежедневный отчёт"""
logger.info("Sending daily report...")
# Логика отчёта...
async def cleanup_old_data(ctx):
"""Очистка старых данных"""
logger.info("Cleaning up old data...")
# Логика очистки...# main.py
from fastapi import FastAPI
from arq import create_pool
from arq.connections import RedisSettings
import os
app = FastAPI()
# Redis settings для ARQ
redis_settings = RedisSettings(
host='localhost',
port=6379,
password=os.getenv('REDIS_PASSWORD'),
)
@app.on_event("startup")
async def startup():
app.state.arq_pool = await create_pool(redis_settings)
@app.on_event("shutdown")
async def shutdown():
await app.state.arq_pool.close()
@app.post("/send-email")
async def send_email_endpoint(to: str, subject: str, body: str):
"""Отправка email через ARQ"""
job = await app.state.arq_pool.enqueue_job(
'send_email',
to=to,
subject=subject,
body=body,
)
return {'job_id': job.job_id, 'status': 'queued'}
@app.post("/process-payment")
async def process_payment_endpoint(user_id: int, amount: float):
"""Обработка платежа"""
job = await app.state.arq_pool.enqueue_job(
'process_payment',
user_id=user_id,
amount=amount,
)
return {'job_id': job.job_id}
@app.post("/heavy-computation")
async def heavy_computation_endpoint(data: list):
"""Тяжёлая вычислительная задача"""
job = await app.state.arq_pool.enqueue_job(
'heavy_computation',
data=data,
)
return {'job_id': job.job_id}
@app.get("/job-status/{job_id}")
async def get_job_status(job_id: str):
"""Проверка статуса задачи"""
from arq.jobs import Job
job = Job(job_id, pool=app.state.arq_pool)
info = await job.info()
return {
'job_id': job_id,
'status': info.state, # pending, running, complete, failed
'result': info.result if info.state == 'complete' else None,
}
@app.post("/delayed-task")
async def delayed_task_endpoint(delay_seconds: int):
"""Отложенная задача"""
job = await app.state.arq_pool.enqueue_job(
'task_with_retry',
item_id='test',
_defer_until=__import__('datetime').datetime.utcnow() + __import__('datetime').timedelta(seconds=delay_seconds),
)
return {'job_id': job.job_id}# Запуск воркера
arq worker_settings.Settings
# Запуск с логированием
arq worker_settings.Settings --log-level=info
# Burst mode (обработать все задачи и выйти)
arq worker_settings.Settings --burst| Аспект | Celery | ARQ |
|---|---|---|
| Async поддержка | Через eventlet/gevent (костыли) | Нативная asyncio |
| Сложность | Высокая, много настроек | Низкая, простая конфигурация |
| Зрелость | Очень зрелая (с 2009) | Относительно новая |
| Функции | Много: chains, groups, chords, canvas | Базовые: очереди, cron |
| Мониторинг | Flower, celery events | Встроенный dashboard (arq-dashboard) |
| Broker | Redis, RabbitMQ, SQS, др. | Только Redis |
| Размер | Тяжёлая зависимость | Легковесная |
# tasks.py
from celery import Task
from celery.exceptions import Retry
class BaseTask(Task):
autoretry_for = (Exception,)
retry_kwargs = {'max_retries': 3, 'countdown': 60}
retry_backoff = True # Экспоненциальная задержка
retry_backoff_max = 600
retry_jitter = True # Случайная вариация задержки
@celery_app.task(base=BaseTask, bind=True)
def reliable_task(self, data: dict):
"""Надёжная задача с автоматическим retry"""
try:
# Логика...
pass
except TemporaryError as e:
# Явный retry для временных ошибок
raise self.retry(exc=e)
except PermanentError as e:
# Не retry для постоянных ошибок
logger.error(f"Permanent error: {e}")
raise# worker.py
import signal
import sys
from celery.signals import worker_shutting_down
@worker_shutting_down.connect
def graceful_shutdown(**kwargs):
"""Обработка graceful shutdown"""
logger.info("Worker shutting down gracefully...")
# Завершение текущих задач, очистка ресурсов...# monitoring.py
from celery.events.state import State
from celery import Celery
celery_app = Celery(broker='redis://localhost:6379/0')
def get_queue_stats():
"""Статистика очередей"""
inspect = celery_app.control.inspect()
# Активные задачи
active = inspect.active()
# Зарезервированные задачи
reserved = inspect.reserved()
# Статистика воркеров
stats = inspect.stats()
return {
'active': active,
'reserved': reserved,
'stats': stats,
}
def get_task_stats(task_name: str):
"""Статистика конкретной задачи"""
# Через Flower API или celery events
pass# tasks.py
@celery_app.task(rate_limit='10/m')
def rate_limited_task():
"""Задача с rate limit 10 выполнений в минуту"""
pass
# Динамический rate limit
celery_app.control.rate_limit('tasks.send_email', '100/m')# celery_config.py
task_queues = {
'critical': {'exchange': 'critical', 'routing_key': 'critical'},
'default': {'exchange': 'default', 'routing_key': 'default'},
'low': {'exchange': 'low', 'routing_key': 'low'},
}
task_routes = {
'tasks.send_sms': {'queue': 'critical'},
'tasks.send_email': {'queue': 'default'},
'tasks.generate_report': {'queue': 'low'},
}# Запуск воркеров с приоритетами
celery -A celery_config worker -Q critical -n critical@%h
celery -A celery_config worker -Q default -n default@%h
celery -A celery_config worker -Q low -n low@%hПроверьте понимание → ответьте на вопросы в celery_arq.json
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.