Integrating Kafka with web applications: FastAPI, Django, Celery, background tasks, and event-driven architecture.

Integrating Kafka with web applications enables event-driven architecture. In this topic you will learn how to integrate Kafka with FastAPI, Django, and Celery, and study patterns for event-driven applications. We start with a basic FastAPI producer that publishes each incoming order to Kafka:
from fastapi import FastAPI, BackgroundTasks
from confluent_kafka import Producer
from pydantic import BaseModel
import json
app = FastAPI()
# Kafka Producer
producer = Producer({
'bootstrap.servers': 'localhost:9092',
'acks': 'all'
})
class OrderCreate(BaseModel):
user_id: int
amount: float
items: list[str]
@app.post("/orders")
async def create_order(order: OrderCreate):
"""Создание заказа с отправкой в Kafka"""
order_data = {
'user_id': order.user_id,
'amount': order.amount,
'items': order.items
}
    # Publish to Kafka
producer.produce(
topic='orders',
key=f'order_{order.user_id}'.encode(),
value=json.dumps(order_data).encode('utf-8')
)
    producer.flush()  # blocks until delivery completes; simple, but adds latency to every request
return {'status': 'created', 'order': order_data}
@app.get("/health")
async def health_check():
"""Проверка здоровья"""
    return {'status': 'healthy'}
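As a quick smoke test, the endpoint can be exercised with FastAPI's TestClient; this is an illustrative sketch and assumes a reachable Kafka broker, since the handler flushes synchronously:

from fastapi.testclient import TestClient

client = TestClient(app)
resp = client.post('/orders', json={'user_id': 1, 'amount': 99.9, 'items': ['book']})
assert resp.status_code == 200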
Flushing on every request blocks the handler; a better fit for FastAPI is to hand the send to a background task and serve delivery callbacks asynchronously. OrderCreate is the same Pydantic model as in the previous snippet.

from fastapi import FastAPI, BackgroundTasks
from confluent_kafka import Producer
import json
import logging
logger = logging.getLogger(__name__)
app = FastAPI()
producer = Producer({
'bootstrap.servers': 'localhost:9092',
'acks': 'all',
'retries': 3
})
def delivery_callback(err, msg):
if err:
logger.error(f'Message delivery failed: {err}')
else:
logger.info(f'Message delivered to {msg.topic()} [{msg.partition()}]')
def send_to_kafka(topic: str, key: str, value: dict):
"""Фоновая отправка в Kafka"""
producer.produce(
topic=topic,
key=key.encode(),
value=json.dumps(value).encode('utf-8'),
callback=delivery_callback
)
    producer.poll(0)  # serve pending delivery callbacks without blocking
@app.post("/orders", status_code=202)
async def create_order_async(
order: OrderCreate,
background_tasks: BackgroundTasks
):
"""Асинхронное создание заказа"""
order_data = order.dict()
# Отправка в background
background_tasks.add_task(
send_to_kafka,
'orders',
f'order_{order.user_id}',
order_data
)
    return {'status': 'accepted', 'order_id': order.user_id}
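One caveat: the background task only calls producer.poll(0), so records may still sit in the producer's internal buffer when the process exits. A minimal sketch of a shutdown hook that drains the buffer:

@app.on_event("shutdown")
async def flush_producer():
    # Wait up to 10 seconds for outstanding messages to be delivered
    producer.flush(10)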
A consumer can also run inside the application itself, as a background asyncio task tied to the application lifecycle:

import asyncio
import json
import logging

from confluent_kafka import Consumer, KafkaError
from fastapi import FastAPI

logger = logging.getLogger(__name__)

app = FastAPI()
consumer = None
consumer_task = None
async def consume_messages():
"""Фоновая задача для потребления сообщений"""
global consumer
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'fastapi-consumer',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False
})
consumer.subscribe(['orders'])
while True:
try:
            msg = await asyncio.to_thread(consumer.poll, 1.0)  # run the blocking poll in a thread to keep the event loop free
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
raise msg.error()
            # Process the message
await process_order(msg.value())
# Commit offset
consumer.commit(msg)
except Exception as e:
logger.error(f'Error consuming message: {e}')
async def process_order(data: bytes):
"""Обработка заказа"""
order = json.loads(data.decode('utf-8'))
    # Business logic goes here
logger.info(f'Processed order: {order}')
@app.on_event("startup")
async def startup_event():
"""Запуск consumer при старте приложения"""
global consumer_task
consumer_task = asyncio.create_task(consume_messages())
@app.on_event("shutdown")
async def shutdown_event():
"""Остановка consumer при завершении"""
global consumer, consumer_task
if consumer:
consumer.close()
if consumer_task:
consumer_task.cancel()
try:
await consumer_task
except asyncio.CancelledError:
pass# myapp/management/commands/consume_kafka.py
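The on_event hooks used above are deprecated in recent FastAPI releases; the same start/stop logic can be expressed with a lifespan context manager. A minimal sketch under that assumption:

from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: launch the consumer loop
    task = asyncio.create_task(consume_messages())
    yield
    # Shutdown: cancel the loop, then close the consumer
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    if consumer:
        consumer.close()

# app = FastAPI(lifespan=lifespan)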
In Django, a long-running consumer is conventionally packaged as a management command:

# myapp/management/commands/consume_kafka.py
from django.core.management.base import BaseCommand
from confluent_kafka import Consumer, KafkaError
import json
import logging
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = 'Consume messages from Kafka'
def add_arguments(self, parser):
parser.add_argument(
'--topic',
type=str,
default='orders',
help='Kafka topic to consume'
)
def handle(self, *args, **options):
topic = options['topic']
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'django-consumer',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False
})
consumer.subscribe([topic])
self.stdout.write(f'Starting consumer for topic: {topic}')
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
raise msg.error()
                # Process the message
self.process_message(msg)
# Commit offset
consumer.commit(msg)
except KeyboardInterrupt:
self.stdout.write('Shutting down consumer...')
finally:
consumer.close()
def process_message(self, msg):
"""Обработка сообщения"""
data = json.loads(msg.value().decode('utf-8'))
# Django ORM operations
from myapp.models import Order
Order.objects.create(
user_id=data['user_id'],
amount=data['amount'],
status='pending'
)
        self.stdout.write(f'Created order: {data["user_id"]}')
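With this file in place, the consumer starts like any other management command, e.g. python manage.py consume_kafka --topic orders; in production it would typically run under a process supervisor such as systemd or supervisord.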
In the other direction, Django signals can publish domain events to Kafka whenever a model instance is created:

# myapp/signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver
from confluent_kafka import Producer
from myapp.models import Order
import json
producer = Producer({'bootstrap.servers': 'localhost:9092'})
@receiver(post_save, sender=Order)
def order_created_signal(sender, instance, created, **kwargs):
"""Отправка события в Kafka при создании заказа"""
if created:
event = {
'event_type': 'order_created',
'order_id': instance.id,
'user_id': instance.user_id,
'amount': float(instance.amount)
}
producer.produce(
topic='order-events',
key=f'order_{instance.id}'.encode(),
value=json.dumps(event).encode('utf-8')
)
producer.poll(0)
# myapp/apps.py
from django.apps import AppConfig

class MyAppConfig(AppConfig):
    name = 'myapp'

    def ready(self):
        import myapp.signals  # noqa: F401 -- registers the signal handlers
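For ready() to run, the application must be listed in INSTALLED_APPS; on modern Django listing 'myapp' is enough, since the AppConfig class is discovered automatically.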
For request tracing across services, middleware can attach a correlation ID to every request and propagate it in Kafka message headers:

# myapp/middleware.py
import json
import uuid
from contextvars import ContextVar
from confluent_kafka import Producer
correlation_id_var = ContextVar('correlation_id', default='')

# Module-level producer, shared by the middleware and send_event() below
producer = Producer({'bootstrap.servers': 'localhost:9092'})

class KafkaTracingMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response
def __call__(self, request):
        # Extract the correlation ID from the request, or generate a new one
correlation_id = request.headers.get(
'X-Correlation-ID',
str(uuid.uuid4())
)
correlation_id_var.set(correlation_id)
        # Propagate it back in the response headers
response = self.get_response(request)
response['X-Correlation-ID'] = correlation_id
return response
def send_event(topic: str, event: dict):
"""Отправка события с correlation ID"""
correlation_id = correlation_id_var.get()
producer.produce(
topic=topic,
key=event.get('id', '').encode(),
value=json.dumps(event).encode('utf-8'),
headers={
'correlation_id': correlation_id.encode('utf-8')
}
)
    producer.poll(0)
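For the middleware to take effect it must be registered in settings.py; the dotted path below assumes the myapp/middleware.py layout shown above:

MIDDLEWARE = [
    # ... Django's default middleware ...
    'myapp.middleware.KafkaTracingMiddleware',
]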
Kafka also combines well with Celery: a thin consumer pulls messages from Kafka and dispatches the actual work to Celery tasks:

# proj/celery.py
from celery import Celery
app = Celery('proj', broker='redis://localhost:6379/0')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
# proj/tasks.py
from celery import shared_task
from confluent_kafka import Producer
import json
kafka_producer = Producer({'bootstrap.servers': 'localhost:9092'})
@shared_task
def process_order_task(order_data: dict):
"""Celery task для обработки заказа"""
# Бизнес-логика
print(f'Processing order: {order_data}')
    # Publish the result to Kafka
kafka_producer.produce(
topic='processed-orders',
key=f'order_{order_data["order_id"]}'.encode(),
value=json.dumps({
'status': 'processed',
'order_id': order_data['order_id']
}).encode('utf-8')
)
kafka_producer.flush()
return {'status': 'success'}
# proj/kafka_consumer.py
from confluent_kafka import Consumer, KafkaError
from proj.tasks import process_order_task
import json
def consume_and_dispatch():
"""Потребление из Kafka и dispatch в Celery"""
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'celery-kafka-consumer',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False
})
consumer.subscribe(['orders'])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
raise msg.error()
            # Dispatch to Celery
order_data = json.loads(msg.value().decode('utf-8'))
process_order_task.delay(order_data)
# Commit offset
consumer.commit(msg)
except KeyboardInterrupt:
pass
finally:
        consumer.close()
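The consumer loop and the workers run as separate processes: start the workers with celery -A proj worker and run consume_and_dispatch() in its own long-lived process. The consumer stays thin, while the heavy processing happens in Celery tasks that scale independently.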
Celery Beat adds scheduled producers, for example periodic reports published to Kafka:

# proj/settings.py
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
'send-daily-report': {
'task': 'proj.tasks.send_daily_report',
'schedule': crontab(hour=23, minute=0),
},
'cleanup-old-orders': {
'task': 'proj.tasks.cleanup_old_orders',
'schedule': crontab(hour=3, minute=0),
},
}
# proj/tasks.py
from celery import shared_task
from confluent_kafka import Producer
import json
from datetime import datetime
kafka_producer = Producer({'bootstrap.servers': 'localhost:9092'})
@shared_task
def send_daily_report():
"""Отправка ежедневного отчета в Kafka"""
report = {
'report_type': 'daily_summary',
'date': datetime.utcnow().isoformat(),
'total_orders': 100,
'total_revenue': 10000.00
}
kafka_producer.produce(
topic='reports',
key=f'daily_{datetime.utcnow().date()}'.encode(),
value=json.dumps(report).encode('utf-8')
)
    kafka_producer.flush()
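The schedule is driven by a separate beat process (celery -A proj beat), which enqueues the tasks at the configured times; the workers then execute them and publish to Kafka as usual.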
Beyond the plumbing, Kafka is a natural backbone for event-driven patterns. Event sourcing stores every state change as an event, and aggregates are rebuilt by replaying the log:

from dataclasses import dataclass
from datetime import datetime
from confluent_kafka import Producer, Consumer
import json
@dataclass
class Event:
event_id: str
event_type: str
aggregate_id: str
timestamp: str
data: dict
class EventStore:
def __init__(self):
self.producer = Producer({'bootstrap.servers': 'localhost:9092'})
self.consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'event-store',
'auto.offset.reset': 'earliest'
})
def append(self, event: Event):
"""Добавление события в event store"""
self.producer.produce(
topic='events',
key=event.aggregate_id.encode(),
value=json.dumps({
'event_id': event.event_id,
'event_type': event.event_type,
'aggregate_id': event.aggregate_id,
'timestamp': event.timestamp,
'data': event.data
}).encode('utf-8')
)
self.producer.flush()
def get_events(self, aggregate_id: str) -> list[Event]:
"""Получение всех событий для aggregate"""
events = []
self.consumer.subscribe(['events'])
        for _ in range(1000):  # bounded, for the sake of the example
msg = self.consumer.poll(timeout=0.1)
if msg is None:
break
if msg.error():
continue
data = json.loads(msg.value().decode('utf-8'))
if data['aggregate_id'] == aggregate_id:
events.append(Event(**data))
return events
# Usage
class OrderAggregate:
def __init__(self, order_id: str):
self.order_id = order_id
self.status = 'created'
self.amount = 0.0
def apply_event(self, event: Event):
"""Применение события к агрегату"""
if event.event_type == 'OrderCreated':
self.status = 'created'
self.amount = event.data['amount']
elif event.event_type == 'OrderPaid':
self.status = 'paid'
elif event.event_type == 'OrderShipped':
self.status = 'shipped'
@classmethod
def load(cls, order_id: str, event_store: EventStore):
"""Загрузка агрегата из событий"""
aggregate = cls(order_id)
events = event_store.get_events(order_id)
for event in events:
aggregate.apply_event(event)
        return aggregate
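A hypothetical end-to-end run of this sketch: append two events for one aggregate, then rebuild its state by replaying the log.

store = EventStore()
store.append(Event(
    event_id='e1',
    event_type='OrderCreated',
    aggregate_id='order-42',
    timestamp=datetime.utcnow().isoformat(),
    data={'amount': 99.9}
))
store.append(Event(
    event_id='e2',
    event_type='OrderPaid',
    aggregate_id='order-42',
    timestamp=datetime.utcnow().isoformat(),
    data={}
))

order = OrderAggregate.load('order-42', store)
print(order.status, order.amount)  # expected: paid 99.9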
CQRS splits the write side (commands that produce events) from the read side (a read model built by consuming those events):

from dataclasses import dataclass
from confluent_kafka import Producer, Consumer, KafkaError
import json
# Command side
@dataclass
class CreateOrderCommand:
user_id: int
amount: float
class CommandHandler:
def __init__(self):
self.producer = Producer({'bootstrap.servers': 'localhost:9092'})
def handle(self, command: CreateOrderCommand):
        # Validation and business logic go here
event = {
'event_type': 'OrderCreated',
'user_id': command.user_id,
'amount': command.amount
}
self.producer.produce(
topic='order-events',
key=f'order_{command.user_id}'.encode(),
value=json.dumps(event).encode('utf-8')
)
self.producer.flush()
# Query side
class QueryHandler:
def __init__(self):
        self.read_model = {}  # use a real database in production
self.consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'query-handler',
'auto.offset.reset': 'earliest'
})
self.consumer.subscribe(['order-events'])
# Build read model
self._build_read_model()
def _build_read_model(self):
"""Построение read модели из событий"""
while True:
msg = self.consumer.poll(timeout=0.1)
if msg is None:
break
if msg.error():
continue
event = json.loads(msg.value().decode('utf-8'))
if event['event_type'] == 'OrderCreated':
user_id = event['user_id']
if user_id not in self.read_model:
self.read_model[user_id] = []
self.read_model[user_id].append(event)
def get_orders_by_user(self, user_id: int) -> list:
"""Получение заказов пользователя"""
        return self.read_model.get(user_id, [])
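Illustrative wiring of the two sides; in a real system they would run as separate services and the read model would be persisted:

handler = CommandHandler()
handler.handle(CreateOrderCommand(user_id=1, amount=99.9))

queries = QueryHandler()  # replays order-events into its in-memory read model
print(queries.get_orders_by_user(1))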
The saga pattern coordinates a multi-step distributed transaction through events, running compensating actions when a step fails:

from enum import Enum
from confluent_kafka import Producer
import json
class SagaStep(Enum):
CREATE_ORDER = 'create_order'
RESERVE_INVENTORY = 'reserve_inventory'
PROCESS_PAYMENT = 'process_payment'
SHIP_ORDER = 'ship_order'
class OrderSaga:
def __init__(self):
self.producer = Producer({'bootstrap.servers': 'localhost:9092'})
self.current_step = None
self.compensating_transactions = []
def execute(self, order_data: dict):
"""Выполнение саги"""
try:
# Step 1: Create Order
self._create_order(order_data)
# Step 2: Reserve Inventory
self._reserve_inventory(order_data)
# Step 3: Process Payment
self._process_payment(order_data)
# Step 4: Ship Order
self._ship_order(order_data)
# Saga completed
self._publish_event('OrderSagaCompleted', order_data)
except Exception as e:
# Compensation
self._compensate(order_data)
self._publish_event('OrderSagaFailed', {
**order_data,
'error': str(e)
})
def _create_order(self, order_data: dict):
self.current_step = SagaStep.CREATE_ORDER
self.producer.produce(
topic='orders',
value=json.dumps(order_data).encode()
)
self.compensating_transactions.append(
lambda: self._cancel_order(order_data)
)
self.producer.flush()
def _reserve_inventory(self, order_data: dict):
self.current_step = SagaStep.RESERVE_INVENTORY
self.producer.produce(
topic='inventory-reservations',
value=json.dumps(order_data).encode()
)
self.compensating_transactions.append(
lambda: self._release_inventory(order_data)
)
self.producer.flush()
def _process_payment(self, order_data: dict):
self.current_step = SagaStep.PROCESS_PAYMENT
self.producer.produce(
topic='payments',
value=json.dumps(order_data).encode()
)
self.compensating_transactions.append(
lambda: self._refund_payment(order_data)
)
self.producer.flush()
def _ship_order(self, order_data: dict):
self.current_step = SagaStep.SHIP_ORDER
self.producer.produce(
topic='shipments',
value=json.dumps(order_data).encode()
)
self.producer.flush()
def _compensate(self, order_data: dict):
"""Выполнение компенсирующих транзакций в обратном порядке"""
for compensate in reversed(self.compensating_transactions):
try:
compensate()
except Exception as e:
print(f'Compensation failed: {e}')
def _cancel_order(self, order_data: dict):
self.producer.produce(
topic='order-cancellations',
value=json.dumps(order_data).encode()
)
self.producer.flush()
def _release_inventory(self, order_data: dict):
self.producer.produce(
topic='inventory-releases',
value=json.dumps(order_data).encode()
)
self.producer.flush()
def _refund_payment(self, order_data: dict):
self.producer.produce(
topic='refunds',
value=json.dumps(order_data).encode()
)
self.producer.flush()
def _publish_event(self, event_type: str, data: dict):
self.producer.produce(
topic='saga-events',
value=json.dumps({
'event_type': event_type,
'data': data
}).encode()
)
        self.producer.flush()
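A hypothetical run of the saga with a minimal order payload; an exception in any step triggers the compensations registered so far, in reverse order:

saga = OrderSaga()
saga.execute({
    'order_id': '42',
    'user_id': 1,
    'amount': 99.9
})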
On the operational side, a small producer pool bounds the number of client instances an application creates:

from confluent_kafka import Producer
from contextlib import contextmanager
from queue import Queue
import threading
class KafkaConnectionPool:
def __init__(self, config: dict, pool_size: int = 5):
self.config = config
self.pool_size = pool_size
self._pool = Queue(maxsize=pool_size)
self._lock = threading.Lock()
# Initialize pool
for _ in range(pool_size):
self._pool.put(self._create_producer())
def _create_producer(self):
return Producer(self.config)
@contextmanager
def get_producer(self):
producer = self._pool.get()
try:
yield producer
finally:
self._pool.put(producer)
def close_all(self):
while not self._pool.empty():
producer = self._pool.get()
producer.flush()
# Usage
pool = KafkaConnectionPool({
'bootstrap.servers': 'localhost:9092'
})
with pool.get_producer() as producer:
producer.produce('orders', value=b'data')
    producer.flush()
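Worth noting: confluent-kafka's Producer is thread-safe, so a single shared instance is usually sufficient; a pool like this mainly helps when you need separate configurations or want to bound concurrent flush() calls.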
Finally, a health-check endpoint verifies round-trip connectivity to the cluster:

from fastapi import FastAPI, HTTPException
from confluent_kafka import Producer, Consumer, KafkaError
app = FastAPI()
kafka_config = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'health-check'
}
@app.get("/health/kafka")
async def kafka_health_check():
"""Проверка подключения к Kafka"""
try:
# Producer check
producer = Producer({'bootstrap.servers': kafka_config['bootstrap.servers']})
producer.produce('health-check', value=b'ping')
producer.flush(timeout=5)
# Consumer check
consumer = Consumer({
**kafka_config,
'auto.offset.reset': 'latest'
})
consumer.subscribe(['health-check'])
msg = consumer.poll(timeout=5)
consumer.close()
if msg and not msg.error():
return {'status': 'healthy', 'kafka': 'connected'}
raise HTTPException(status_code=503, detail='Kafka health check failed')
except Exception as e:
        raise HTTPException(status_code=503, detail=str(e))

Congratulations! You have completed the course on Kafka in Python, covering integration with FastAPI, Django, and Celery, background processing, and event-driven patterns such as event sourcing, CQRS, and sagas. You are now ready to build reliable event-driven systems on Kafka!