Celery, Kafka, RabbitMQ — фоновые задачи, event streaming, паттерны обработки очередей.
Когда синхронная обработка становится узким местом, очереди и асинхронность превращают монолитные блокировки в масштабируемые потоки данных.
Очереди сообщений (Message Queues) — это механизм асинхронной коммуникации между компонентами системы. Они позволяют:
Типичные сценарии использования:
Producer → Queue → Consumer
# Концептуальный пример point-to-point
# Несколько воркеров читают из одной очереди, но каждое сообщение обрабатывает только один
# Worker 1: получает сообщение A
# Worker 2: получает сообщение B
# Worker 1: получает сообщение CProducer → Topic/Exchange → Consumer 1
→ Consumer 2
→ Consumer 3
# Концептуальный пример pub/sub
# Событие "user.created" получают все подписчики:
# EmailService: отправляет welcome email
# AnalyticsService: записывает событие в аналитику
# SearchService: индексирует пользователя| Гарантия | Описание | Когда использовать |
|---|---|---|
| At most once | Сообщение может быть потеряно, но не продублировано | Метрики, логи (потеря отдельных сообщений допустима) |
| At least once | Сообщение будет доставлено, но возможны дубли | Финансовые операции, заказы (дубли обрабатываются идемпотентно) |
| Exactly once | Сообщение доставляется ровно один раз | Критичные транзакции (сложно реализовать, требует идемпотентности на стороне потребителя) |
Важно: "Exactly once" в распределённых системах достигается через идемпотентность потребителя, а не через гарантии брокера.
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ Producer │────▶│ Broker │────▶│ Worker │
│ (ваш код) │ │ (RabbitMQ/ │ │ (процесс, │
│ │ │ Redis) │ │ обрабатывающий│
└─────────────┘ └──────────────┘ │ задачи) │
└─────────────┘
│
▼
┌──────────────┐
│ Backend │
│ (результаты) │
└──────────────┘
Компоненты:
# celery_app.py
from celery import Celery
app = Celery(
'tasks',
broker='amqp://guest:guest@localhost:5672/', # RabbitMQ
backend='redis://localhost:6379/0', # Redis для результатов
include=['tasks.user_tasks', 'tasks.email_tasks']
)
# Конфигурация
app.conf.update(
# Сериализация
task_serializer='json',
accept_content=['json'],
result_serializer='json',
# Таймауты
task_ack_late=True,
task_acks_late=True, # Подтверждение после выполнения
task_reject_on_worker_lost=True, # Возврат задачи в очередь при падении воркера
# Retry
task_default_retry_delay=60, # Задержка перед первым retry
task_max_retries=3,
# Rate limiting
worker_prefetch_multiplier=1, # Не брать задачи вперёд
)# tasks/user_tasks.py
from celery_app import app
from datetime import timedelta
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def create_user_profile(self, user_id: int):
"""
Задача создания профиля пользователя.
bind=True даёт доступ к self для retry.
"""
try:
# Имитация работы с БД
user = get_user_from_db(user_id)
if not user:
raise ValueError(f"User {user_id} not found")
# Создание профиля
profile = create_profile_in_db(user_id)
return {"status": "success", "profile_id": profile.id}
except Exception as exc:
# Логирование ошибки
logger.error(f"Error creating profile for user {user_id}: {exc}")
# Retry с экспоненциальной задержкой
raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
@app.task
def send_welcome_email(user_id: int, email: str):
"""Задача отправки email — без retry, так как не критично."""
email_service.send(
to=email,
subject="Welcome!",
body=f"Hello, user {user_id}!"
)
return {"status": "email_sent"}
@app.task
def process_user_upload(user_id: int, file_path: str):
"""Обработка загруженного файла с цепочкой задач."""
# Загрузка файла
file_data = load_file(file_path)
# Валидация
if not validate_file(file_data):
raise ValueError("Invalid file format")
# Обработка
result = transform_file(file_data)
# Сохранение результата
save_result(user_id, result)
return {"status": "processed", "result_path": f"/results/{user_id}"}# views.py
from tasks.user_tasks import create_user_profile, send_welcome_email
def register_user(request):
# Синхронная часть — создание пользователя в БД
user = create_user_in_db(request.data)
# Асинхронные задачи — не блокируем ответ
create_user_profile.delay(user.id) # .delay() = .apply_async()
send_welcome_email.delay(user.id, user.email)
# Можно получить результат позже
# task_result = create_user_profile.apply_async(args=[user.id])
# result = task_result.get(timeout=10) # Блокирующий вызов
return {"user_id": user.id, "status": "registered"}# celery_app.py
from celery.schedules import crontab
app.conf.beat_schedule = {
# Каждую полночь
'cleanup-old-sessions': {
'task': 'tasks.cleanup.cleanup_old_sessions',
'schedule': crontab(hour=0, minute=0),
'args': (days_old=30,)
},
# Каждые 5 минут
'send-digest-emails': {
'task': 'tasks.email_tasks.send_digest_emails',
'schedule': timedelta(minutes=5),
},
# Каждый понедельник в 9:00
'generate-weekly-report': {
'task': 'tasks.reports.generate_weekly_report',
'schedule': crontab(hour=9, minute=0, day_of_week=1),
},
}# tasks/cleanup.py
from celery_app import app
@app.task
def cleanup_old_sessions(days_old: int):
"""Удаление старых сессий."""
from datetime import datetime, timedelta
cutoff_date = datetime.now() - timedelta(days=days_old)
deleted_count = Session.objects.filter(
created_at__lt=cutoff_date
).delete()[0]
return {"deleted_sessions": deleted_count}# tasks/computation.py
from celery_app import app
import time
@app.task
def heavy_computation(data_size: int):
"""Тяжёлая задача с прогрессом."""
total_steps = data_size // 100
for i in range(total_steps):
# Обновление прогресса
heavy_computation.update_state(
state='PROGRESS',
meta={'current': i, 'total': total_steps, 'percent': i / total_steps * 100}
)
time.sleep(0.1) # Имитация работы
return {"result": "computed", "data_size": data_size}
# views.py
from tasks.computation import heavy_computation
from celery.result import AsyncResult
def start_computation(request):
task = heavy_computation.delay(data_size=10000)
return {"task_id": task.id}
def get_computation_status(request, task_id: str):
result = AsyncResult(task_id, app=heavy_computation.app)
if result.state == 'PENDING':
return {"status": "pending"}
elif result.state == 'PROGRESS':
return {
"status": "in_progress",
"current": result.info.get('current'),
"total": result.info.get('total'),
"percent": result.info.get('percent')
}
elif result.state == 'SUCCESS':
return {"status": "success", "result": result.result}
elif result.state == 'FAILURE':
return {"status": "failed", "error": str(result.info)}from celery import chain, group, chord
# Цепочка: задача1 → задача2 → задача3
pipeline = chain(
create_user_profile.s(user_id),
send_welcome_email.s(user_id, email),
log_registration.s(user_id)
)
result = pipeline.apply_async()
# Группа: параллельное выполнение
parallel_tasks = group(
send_email.s(user_id, email),
send_sms.s(user_id, phone),
push_notification.s(user_id, device_token)
)
results = parallel_tasks.apply_async()
# Хорда: группа + callback после завершения всех
chord_task = chord(
group(validate_file.s(file_id), scan_virus.s(file_id)),
process_approved_file.s(file_id)
)
result = chord_task.apply_async()# celery_app.py
app.conf.task_routes = {
# Тяжёлые задачи — на отдельные воркеры
'tasks.computation.*': {'queue': 'heavy'},
'tasks.reports.*': {'queue': 'reports'},
# Email задачи — на свои воркеры
'tasks.email_tasks.*': {'queue': 'emails'},
# Остальные — в очередь по умолчанию
}
# Запуск воркеров для разных очередей:
# celery -A celery_app worker -Q default -n worker-default@%h
# celery -A celery_app worker -Q heavy -n worker-heavy@%h
# celery -A celery_app worker -Q emails -n worker-emails@%h┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ Producer │────▶│ Exchange │────▶│ Queue │
│ │ │ (маршрутиза- │ │ (хранение) │
└─────────────┘ │ ция) │ └─────────────┘
└──────────────┘ │
▼
┌─────────────┐
│ Consumer │
└─────────────┘
Ключевые концепции:
Producer → Exchange (direct) → Queue (routing_key = "order.created")
→ Queue (routing_key = "order.cancelled")
import pika
# Producer
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Объявление exchange
channel.exchange_declare(exchange='orders', exchange_type='direct')
# Отправка сообщения
channel.basic_publish(
exchange='orders',
routing_key='order.created', # Точное совпадение
body='{"order_id": 123}'
)Producer → Exchange (fanout) → Queue 1
→ Queue 2
→ Queue 3
# Producer
channel.exchange_declare(exchange='notifications', exchange_type='fanout')
channel.basic_publish(
exchange='notifications',
routing_key='', # Игнорируется
body='{"event": "system.maintenance"}'
)
# Consumer 1 (Email Service)
channel.queue_declare(queue='email_notifications')
channel.queue_bind(queue='email_notifications', exchange='notifications')
# Consumer 2 (SMS Service)
channel.queue_declare(queue='sms_notifications')
channel.queue_bind(queue='sms_notifications', exchange='notifications')
# Consumer 3 (Push Service)
channel.queue_declare(queue='push_notifications')
channel.queue_bind(queue='push_notifications', exchange='notifications')Producer → Exchange (topic) → Queue (binding_key = "order.*")
→ Queue (binding_key = "*.created")
→ Queue (binding_key = "order.#")
* — ровно одно слово# — ноль или более слов# Producer
channel.exchange_declare(exchange='events', exchange_type='topic')
# Разные типы событий
channel.basic_publish(exchange='events', routing_key='order.created', body='...')
channel.basic_publish(exchange='events', routing_key='order.cancelled', body='...')
channel.basic_publish(exchange='events', routing_key='user.registered', body='...')
channel.basic_publish(exchange='events', routing_key='payment.completed', body='...')
# Consumer: подписка на все события заказов
channel.queue_declare(queue='order_events')
channel.queue_bind(queue='order_events', exchange='events', routing_key='order.*')
# Consumer: подписка на все события создания
channel.queue_declare(queue='created_events')
channel.queue_bind(queue='created_events', exchange='events', routing_key='*.created')
# Consumer: подписка на все события
channel.queue_declare(queue='all_events')
channel.queue_bind(queue='all_events', exchange='events', routing_key='#')# Объявление durable очереди (переживает перезапуск брокера)
channel.queue_declare(queue='orders', durable=True)
# Отправка persistent сообщения (сохраняется на диск)
channel.basic_publish(
exchange='orders',
routing_key='order.created',
body='{"order_id": 123}',
properties=pika.BasicProperties(
delivery_mode=2, # Persistent
)
)# Consumer с ручным ack
def callback(ch, method, properties, body):
try:
# Обработка сообщения
process_order(body)
# Подтверждение после успешной обработки
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
logger.error(f"Error processing message: {e}")
# Отрицательное подтверждение — сообщение вернётся в очередь
# requeue=True — вернуть в ту же очередь
# requeue=False — отправить в dead letter queue
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
# Настройка consumer
channel.basic_consume(
queue='orders',
on_message_callback=callback,
auto_ack=False # Ручное подтверждение!
)DLQ — очередь для сообщений, которые не удалось обработать.
# Настройка DLQ
channel.queue_declare(queue='orders.dlq', durable=True)
# Основная очередь с DLQ
channel.queue_declare(queue='orders', durable=True, arguments={
'x-dead-letter-exchange': '', # Пустой = default exchange
'x-dead-letter-routing-key': 'orders.dlq',
'x-message-ttl': 60000, # 60 секунд TTL
})
# Альтернатива: DLX (Dead Letter Exchange) для централизованной обработки
channel.exchange_declare(exchange='dlx', exchange_type='direct', durable=True)
channel.queue_declare(queue='orders.dlq', durable=True)
channel.queue_bind(queue='orders.dlq', exchange='dlx', routing_key='orders')
channel.queue_declare(queue='orders', durable=True, arguments={
'x-dead-letter-exchange': 'dlx',
'x-dead-letter-routing-key': 'orders',
})Когда сообщение попадает в DLQ:
requeue=Falsetask_reject_on_worker_lost=True┌─────────────┐ ┌──────────────────────────────────────┐
│ Producer │────▶│ Kafka Cluster │
│ │ │ ┌─────────┐ ┌─────────┐ │
└─────────────┘ │ │ Broker 1│ │ Broker 2│ │
│ │ ┌─────┐ │ │ ┌─────┐ │ │
│ │ │Part0│ │ │ │Part1│ │ Topic │
┌─────────────┐ │ │ ├─────┤ │ │ ├─────┤ │ "orders" │
│ Consumer │◀────│ │ │Part2│ │ │ │Part3│ │ │
│ Group │ │ └───────┘ └───────┘ │ │
└─────────────┘ └──────────────────────────────────────┘
Ключевые концепции:
Topic "orders"
├── Partition 0: [msg0, msg3, msg6, msg9] ← Consumer A
├── Partition 1: [msg1, msg4, msg7, msg10] ← Consumer B
├── Partition 2: [msg2, msg5, msg8, msg11] ← Consumer C
└── Partition 3: [msg12, msg13, msg14] ← (без потребителя)
Важные свойства:
from kafka import KafkaProducer
import json
# Настройка producer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None,
# Надёжность
acks='all', # Ждать подтверждения от всех реплик
retries=5,
retry_backoff_ms=100,
# Производительность
batch_size=16384, # 16KB
linger_ms=5, # Ждать 5ms для заполнения батча
compression_type='gzip',
)
# Отправка сообщения с ключом (для партиционирования)
producer.send(
topic='orders',
key='user_123', # Сообщения с одинаковым ключом → одна партиция
value={
'order_id': 456,
'user_id': 123,
'amount': 99.99,
'timestamp': '2026-03-03T10:00:00Z'
}
)
# Отправка с callback
future = producer.send('orders', value={'event': 'test'})
def on_send_success(record_metadata):
print(f"Sent to {record_metadata.topic}-{record_metadata.partition}@{record_metadata.offset}")
def on_send_error(exc):
print(f"Error: {exc}")
future.add_callback(on_send_success)
future.add_errback(on_send_error)
# Flush и close
producer.flush()
producer.close()from kafka import KafkaConsumer
import json
# Настройка consumer
consumer = KafkaConsumer(
'orders',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda v: json.loads(v.decode('utf-8')),
key_deserializer=lambda k: k.decode('utf-8') if k else None,
# Consumer group
group_id='order-processor',
# Управление offset
auto_offset_reset='earliest', # или 'latest'
enable_auto_commit=False, # Ручной commit!
# Производительность
max_poll_records=500,
fetch_max_wait_ms=500,
)
# Подписка на топик
consumer.subscribe(['orders'])
try:
for message in consumer:
try:
# Обработка сообщения
process_order(message.value)
# Ручной commit после успешной обработки
consumer.commit()
except Exception as e:
logger.error(f"Error processing message {message.offset}: {e}")
# Не commit — сообщение будет прочитано снова
finally:
consumer.close()# Consumer Group 1: order-processor (основная обработка)
consumer1 = KafkaConsumer(
'orders',
group_id='order-processor',
bootstrap_servers=['localhost:9092']
)
# Consumer Group 2: analytics (аналитика — получает те же сообщения)
consumer2 = KafkaConsumer(
'orders',
group_id='analytics',
bootstrap_servers=['localhost:9092']
)
# Каждая группа получает полную копию сообщений
# Внутри группы сообщения распределяются между потребителямиRebalancing: При добавлении/удалении потребителя происходит ребалансировка партиций.
from kafka import KafkaConsumer
from kafka.consumer.subscription_state import SubscriptionState
consumer = KafkaConsumer(
'orders',
group_id='order-processor',
bootstrap_servers=['localhost:9092'],
# Настройка rebalancing
session_timeout_ms=30000, # Таймаут сессии
heartbeat_interval_ms=10000, # Интервал heartbeat
max_poll_interval_ms=300000, # Макс время между poll()
# Callback при rebalancing
)
class OrderProcessor:
def __init__(self, consumer):
self.consumer = consumer
self.consumer.subscribe(
['orders'],
on_assign=self.on_partitions_assigned,
on_revoke=self.on_partitions_revoked
)
def on_partitions_assigned(self, consumer, partitions):
print(f"Assigned partitions: {partitions}")
# Можно загрузить состояние для новых партиций
def on_partitions_revoked(self, consumer, partitions):
print(f"Revoked partitions: {partitions}")
# Commit offset, очистка ресурсов
consumer.commit()
processor = OrderProcessor(consumer)| Критерий | RabbitMQ | Kafka |
|---|---|---|
| Модель | Message Queue | Event Streaming |
| Доставка | At-least-once (с ack) | At-least-once (с commit) |
| Хранение | До потребления | До retention period (дни/недели) |
| Пропускная способность | ~10K msg/s | ~100K+ msg/s |
| Задержка | Низкая (ms) | Низкая (ms) |
| Партиции | Нет (очереди) | Да (масштабирование) |
| Replay событий | Нет | Да (чтение с любого offset) |
| Use case | Task queues, RPC | Event sourcing, streaming analytics |
import time
from functools import wraps
def retry_with_backoff(max_retries=3, base_delay=1, exponential_base=2):
"""Декоратор для retry с экспоненциальной задержкой."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt == max_retries:
break
# Экспоненциальная задержка
delay = base_delay * (exponential_base ** attempt)
logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s")
time.sleep(delay)
raise last_exception
return wrapper
return decorator
# Пример использования
@retry_with_backoff(max_retries=3, base_delay=1, exponential_base=2)
def call_external_api(data):
response = requests.post('https://api.example.com', json=data)
response.raise_for_status()
return response.json()
# Задержки: 1s, 2s, 4simport random
def retry_with_jitter(max_retries=3, base_delay=1, jitter_range=0.5):
"""Retry с jitter для предотвращения thundering herd."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt == max_retries:
break
# Экспоненциальная задержка + jitter
delay = base_delay * (2 ** attempt)
jitter = random.uniform(-jitter_range * delay, jitter_range * delay)
total_delay = max(0, delay + jitter)
logger.warning(f"Attempt {attempt + 1} failed. Retrying in {total_delay:.2f}s")
time.sleep(total_delay)
raise last_exception
return wrapper
return decoratorfrom celery import Celery
from celery.exceptions import Retry
app = Celery('tasks', broker='amqp://guest@localhost//')
@app.task(bind=True, max_retries=5)
def process_payment(self, payment_id: int):
"""Обработка платежа с retry."""
try:
payment = get_payment(payment_id)
# Вызов внешнего платёжного шлюза
result = payment_gateway.charge(payment)
return {"status": "success", "transaction_id": result.id}
except PaymentGatewayTimeoutError as e:
# Временная ошибка — retry
raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries))
except PaymentGatewayUnavailableError as e:
# Сервис недоступен — retry с большей задержкой
raise self.retry(exc=e, countdown=300)
except InvalidPaymentError as e:
# Постоянная ошибка — не retry, сразу в DLQ
logger.error(f"Invalid payment {payment_id}: {e}")
raise # Без retry# tasks/processors.py
from celery import Celery
from celery.exceptions import MaxRetriesExceededError
app = Celery('tasks', broker='amqp://guest@localhost//')
@app.task(bind=True, max_retries=3)
def process_order(self, order_id: int):
"""Обработка заказа с отправкой в DLQ при неудаче."""
try:
order = get_order(order_id)
validate_order(order)
charge_payment(order)
update_inventory(order)
return {"status": "processed"}
except MaxRetriesExceededError:
# Достигнуто максимальное число retry — в DLQ
send_to_dlq('orders', order_id, str(self.request.exception))
logger.error(f"Order {order_id} sent to DLQ after max retries")
return {"status": "dlq"}
except ValidationError as e:
# Ошибка валидации — не retry, сразу в DLQ
send_to_dlq('orders', order_id, f"Validation error: {e}")
return {"status": "dlq", "reason": "validation"}
except Exception as e:
# Другие ошибки — retry
raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries))
def send_to_dlq(queue_name: str, message_id: int, error: str):
"""Отправка сообщения в Dead Letter Queue."""
from tasks.dlq import handle_dlq_message
handle_dlq_message.delay({
'original_queue': queue_name,
'message_id': message_id,
'error': error,
'timestamp': datetime.now().isoformat()
})
# tasks/dlq.py
@app.task
def handle_dlq_message(message: dict):
"""Обработка сообщений из DLQ."""
# Логирование для анализа
logger.critical(f"DLQ Message: {message}")
# Уведомление разработчиков
send_alert(f"Message in DLQ: {message['message_id']}")
# Сохранение для последующего анализа
save_dlq_record(message)
# Можно реализовать механику повторной обработки вручную
# если ошибка была временнойИдемпотентность — свойство операции возвращать одинаковый результат при многократном выполнении с теми же входными данными.
Операция f идемпотентна, если: f(f(x)) = f(x)
Почему это важно: При гарантии доставки "at-least-once" сообщения могут дублироваться. Идемпотентный потребитель обрабатывает дубли корректно.
# consumers/order_consumer.py
import hashlib
from redis import Redis
redis = Redis()
def process_order_message(message: dict):
"""Идемпотентная обработка заказа."""
order_id = message['order_id']
# Уникальный ключ идемпотентности
idempotency_key = f"order:processed:{order_id}"
# Проверка: уже обработано?
if redis.exists(idempotency_key):
logger.info(f"Order {order_id} already processed (duplicate)")
return {"status": "duplicate", "order_id": order_id}
# Обработка с атомарной установкой ключа
with redis.lock(f"lock:{idempotency_key}", timeout=30):
# Двойная проверка после получения блокировки
if redis.exists(idempotency_key):
return {"status": "duplicate", "order_id": order_id}
# Основная обработка
result = process_order_internal(order_id)
# Установка ключа идемпотентности с TTL
redis.setex(idempotency_key, 86400 * 7, result) # 7 дней
return result
def process_order_internal(order_id: int):
"""Внутренняя логика обработки заказа."""
order = get_order(order_id)
charge_payment(order)
update_inventory(order)
return {"status": "processed", "order_id": order_id}# consumers/payment_consumer.py
from django.db import transaction, IntegrityError
@transaction.atomic
def process_payment_message(message: dict):
"""Идемпотентная обработка платежа через БД."""
payment_id = message['payment_id']
idempotency_key = message.get('idempotency_key', f"payment:{payment_id}")
try:
# Попытка создать запись идемпотентности
IdempotencyRecord.objects.create(
key=idempotency_key,
status='processing',
created_at=timezone.now()
)
# Основная обработка
payment = get_payment(payment_id)
if payment.status == 'completed':
# Уже обработано
IdempotencyRecord.objects.filter(key=idempotency_key).update(
status='duplicate',
result={'payment_id': payment_id, 'status': 'already_completed'}
)
return {"status": "duplicate"}
# Обработка платежа
result = charge_payment_internal(payment)
# Обновление записи идемпотентности
IdempotencyRecord.objects.filter(key=idempotency_key).update(
status='completed',
result=result
)
return result
except IntegrityError:
# Запись уже существует — дубликат
record = IdempotencyRecord.objects.get(key=idempotency_key)
if record.status == 'completed':
return {"status": "duplicate", "result": record.result}
elif record.status == 'processing':
# Другой воркер обрабатывает — ждём или возвращаем ошибку
raise RetryException("Another worker is processing")
else:
return {"status": "duplicate"}# consumers/inventory_consumer.py
from django.db.models import F
def update_inventory_message(message: dict):
"""Идемпотентное обновление инвентаря через условное обновление."""
product_id = message['product_id']
quantity = message['quantity']
operation_id = message['operation_id'] # Уникальный ID операции
# Условное обновление: только если операция ещё не выполнена
updated = Inventory.objects.filter(
id=product_id,
last_operation_id__lt=operation_id # Или проверка через отдельную таблицу
).update(
quantity=F('quantity') - quantity,
last_operation_id=operation_id
)
if updated == 0:
# Уже обработано или операция устарела
return {"status": "duplicate_or_stale"}
return {"status": "updated"}# tasks/idempotent_tasks.py
from celery import Celery
from celery.exceptions import Ignore
import hashlib
app = Celery('tasks', broker='amqp://guest@localhost//')
@app.task(bind=True)
def idempotent_send_email(self, user_id: int, template: str):
"""Идемпотентная задача отправки email."""
# Генерация ключа идемпотентности из аргументов
args_hash = hashlib.md5(
f"{user_id}:{template}".encode()
).hexdigest()
idempotency_key = f"task:send_email:{args_hash}"
# Проверка в Redis
if redis.exists(idempotency_key):
logger.info(f"Duplicate task detected: {idempotency_key}")
raise Ignore() # Не сохранять результат, не retry
# Блокировка для атомарности
with redis.lock(f"lock:{idempotency_key}", timeout=60):
if redis.exists(idempotency_key):
raise Ignore()
# Выполнение задачи
result = send_email_internal(user_id, template)
# Сохранение ключа идемпотентности
redis.setex(idempotency_key, 3600, 'done')
return resultAsync/await — для I/O-bound задач, где код большую часть времени ждёт:
Не для CPU-bound задач — используйте multiprocessing.
import asyncio
import aiohttp
# Async функция (coroutine)
async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
"""Асинхронный HTTP запрос."""
async with session.get(url) as response:
return {
'url': url,
'status': response.status,
'body': await response.text()
}
async def fetch_all_urls(urls: list[str]) -> list[dict]:
"""Параллельная загрузка нескольких URL."""
async with aiohttp.ClientSession() as session:
# Создание задач
tasks = [fetch_url(session, url) for url in urls]
# Параллельное выполнение
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
# Запуск event loop
if __name__ == '__main__':
urls = [
'https://api.example.com/users/1',
'https://api.example.com/users/2',
'https://api.example.com/users/3',
]
results = asyncio.run(fetch_all_urls(urls))
for result in results:
if isinstance(result, Exception):
print(f"Error: {result}")
else:
print(f"Got {result['status']} from {result['url']}")import asyncio
import asyncpg # Асинхронный PostgreSQL драйвер
async def get_user_orders(user_id: int) -> list[dict]:
"""Асинхронный запрос к PostgreSQL."""
conn = await asyncpg.connect(
host='localhost',
database='mydb',
user='user',
password='pass'
)
try:
rows = await conn.fetch(
'SELECT * FROM orders WHERE user_id = $1',
user_id
)
return [dict(row) for row in rows]
finally:
await conn.close()
async def process_users_batch(user_ids: list[int]) -> list[dict]:
"""Параллельная обработка нескольких пользователей."""
tasks = [get_user_orders(uid) for uid in user_ids]
results = await asyncio.gather(*tasks, return_exceptions=True)
return resultsCelery не поддерживает async задачи напрямую. Паттерны интеграции:
# Pattern 1: Sync wrapper для async кода
@app.task
def process_with_async(user_id: int):
"""Celery задача, вызывающая async код."""
import asyncio
async def async_work(uid):
# Async логика
await fetch_data(uid)
await process_data(uid)
return {"status": "done"}
# Запуск event loop в sync контексте
return asyncio.run(async_work(user_id))
# Pattern 2: Отдельный async worker
# FastAPI endpoint запускает задачу, async worker обрабатываетimport asyncio
async def robust_fetch(session, url: str, retries: int = 3):
"""Асинхронный запрос с retry."""
last_exception = None
for attempt in range(retries):
try:
async with session.get(url, timeout=10) as response:
response.raise_for_status()
return await response.text()
except asyncio.TimeoutError as e:
last_exception = e
await asyncio.sleep(2 ** attempt) # Exponential backoff
except aiohttp.ClientError as e:
last_exception = e
await asyncio.sleep(2 ** attempt)
raise last_exception
async def fetch_with_timeout(session, url: str, timeout: float = 5.0):
"""Запрос с таймаутом."""
try:
async with asyncio.timeout(timeout):
return await robust_fetch(session, url)
except asyncio.TimeoutError:
logger.error(f"Timeout fetching {url}")
return None# celery_app.py
from celery import Celery
app = Celery(
'ecommerce',
broker='amqp://guest:guest@rabbitmq:5672/',
backend='redis://redis:6379/0',
)
app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='UTC',
enable_utc=True,
task_acks_late=True,
task_reject_on_worker_lost=True,
worker_prefetch_multiplier=1,
)
# tasks/orders.py
from celery_app import app
from celery.exceptions import MaxRetriesExceededError
import hashlib
from redis import Redis
redis = Redis()
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def create_order(self, user_id: int, items: list[dict], total: float):
"""Создание заказа с идемпотентностью."""
# Ключ идемпотентности
idempotency_key = hashlib.md5(
f"{user_id}:{items}:{total}".encode()
).hexdigest()
cache_key = f"order:create:{idempotency_key}"
# Проверка дубликата
if redis.exists(cache_key):
order_id = redis.get(cache_key)
return {"status": "duplicate", "order_id": int(order_id)}
try:
# Создание заказа
order = Order.objects.create(
user_id=user_id,
total=total,
status='pending'
)
for item in items:
OrderItem.objects.create(
order=order,
product_id=item['product_id'],
quantity=item['quantity'],
price=item['price']
)
# Сохранение ключа идемпотентности
redis.setex(cache_key, 3600, order.id)
# Запуск цепочки задач
process_order_chain.delay(order.id)
return {"status": "created", "order_id": order.id}
except Exception as e:
raise self.retry(exc=e)
@app.task
def process_order_chain(order_id: int):
"""Цепочка обработки заказа."""
from celery import chain
workflow = chain(
reserve_inventory.s(order_id),
charge_payment.s(order_id),
send_confirmation_email.s(order_id),
notify_warehouse.s(order_id)
)
workflow.apply_async()
@app.task(bind=True, max_retries=5)
def reserve_inventory(self, order_id: int):
"""Резервирование инвентаря."""
try:
order = Order.objects.get(id=order_id)
for item in order.items.all():
product = Product.objects.select_for_update().get(id=item.product_id)
if product.stock < item.quantity:
raise InventoryError(f"Insufficient stock for {product.name}")
product.stock -= item.quantity
product.save()
order.status = 'inventory_reserved'
order.save()
return {"status": "inventory_reserved"}
except InventoryError as e:
# Критическая ошибка — не retry
order.status = 'failed'
order.save()
raise
except Exception as e:
raise self.retry(exc=e)
@app.task(bind=True, max_retries=3)
def charge_payment(self, order_id: int):
"""Списание платежа."""
try:
order = Order.objects.get(id=order_id)
# Вызов платёжного шлюза
result = payment_gateway.charge(
amount=order.total,
card_token=order.card_token
)
order.payment_id = result.transaction_id
order.status = 'paid'
order.save()
return {"status": "paid", "transaction_id": result.transaction_id}
except PaymentTimeoutError as e:
raise self.retry(exc=e, countdown=120)
except PaymentError as e:
order.status = 'payment_failed'
order.save()
raise
@app.task
def send_confirmation_email(order_id: int):
"""Отправка confirmation email."""
order = Order.objects.get(id=order_id)
user = order.user
email_service.send(
to=user.email,
subject=f"Order #{order.id} confirmed",
template='order_confirmation',
context={'order': order}
)
return {"status": "email_sent"}
@app.task
def notify_warehouse(order_id: int):
"""Уведомление склада."""
order = Order.objects.get(id=order_id)
warehouse_service.notify_shipment(
order_id=order.id,
items=[
{"product_id": i.product_id, "quantity": i.quantity}
for i in order.items.all()
],
shipping_address=order.shipping_address
)
order.status = 'notified'
order.save()
return {"status": "warehouse_notified"}# consumers/analytics_consumer.py
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
import logging
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AnalyticsConsumer:
def __init__(self):
self.consumer = KafkaConsumer(
'orders',
'payments',
'user_events',
bootstrap_servers=['kafka:9092'],
group_id='analytics-processor',
auto_offset_reset='earliest',
enable_auto_commit=False,
value_deserializer=lambda v: json.loads(v.decode('utf-8')),
max_poll_records=500,
)
self.consumer.subscribe(
['orders', 'payments', 'user_events'],
on_assign=self.on_partitions_assigned,
on_revoke=self.on_partitions_revoked
)
def on_partitions_assigned(self, consumer, partitions):
logger.info(f"Assigned partitions: {partitions}")
def on_partitions_revoked(self, consumer, partitions):
logger.info(f"Revoked partitions: {partitions}")
# Commit перед ребалансировкой
consumer.commit()
def process_message(self, message):
topic = message.topic
data = message.value
try:
if topic == 'orders':
self.process_order_event(data)
elif topic == 'payments':
self.process_payment_event(data)
elif topic == 'user_events':
self.process_user_event(data)
# Commit после успешной обработки
self.consumer.commit()
except Exception as e:
logger.error(f"Error processing message from {topic}: {e}")
# Не commit — сообщение будет обработано снова
def process_order_event(self, data: dict):
"""Обработка события заказа."""
# Сохранение в аналитическую БД
analytics_db.insert('orders', {
'order_id': data['order_id'],
'user_id': data['user_id'],
'total': data['total'],
'timestamp': data['timestamp'],
'processed_at': datetime.now().isoformat()
})
# Обновление real-time дашборда
dashboard.update_metric('total_orders', 1)
def process_payment_event(self, data: dict):
"""Обработка события платежа."""
analytics_db.insert('payments', {
'payment_id': data['payment_id'],
'order_id': data['order_id'],
'amount': data['amount'],
'status': data['status'],
'timestamp': data['timestamp']
})
def process_user_event(self, data: dict):
"""Обработка пользовательского события."""
analytics_db.insert('user_events', data)
def run(self):
"""Запуск consumer loop."""
logger.info("Starting analytics consumer...")
try:
for message in self.consumer:
self.process_message(message)
except KeyboardInterrupt:
logger.info("Shutting down...")
finally:
self.consumer.close()
if __name__ == '__main__':
consumer = AnalyticsConsumer()
consumer.run()| Сценарий | Рекомендация |
|---|---|
| Фоновые задачи в Django/FastAPI | Celery + RabbitMQ/Redis |
| Микросервисная коммуникация | RabbitMQ (RPC) или Kafka (events) |
| Event sourcing, audit log | Kafka |
| Real-time аналитика | Kafka + Kafka Streams |
| Отложенные задачи, cron | Celery Beat |
| Высокая пропускная способность | Kafka |
| Сложная маршрутизация | RabbitMQ (topic exchange) |
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.