Kafka, RabbitMQ, событийная архитектура, паттерны
Событийная архитектура для масштабируемых приложений.
Event-driven архитектура — сервисы общаются через события (events). Производитель публикует событие, потребители реагируют.
Order Service → [OrderCreated] → Kafka → Payment Service
→ Email Service
→ Notification Service
docker run -d --name kafka -p 9092:9092 confluentinc/cp-kafkafrom aiokafka import AIOKafkaProducer
import json
producer = AIOKafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode()
)
await producer.start()
await producer.send(
'orders',
{'order_id': 123, 'user_id': 1, 'total': 99.99}
)
await producer.stop()from aiokafka import AIOKafkaConsumer
import json
consumer = AIOKafkaConsumer(
'orders',
bootstrap_servers='localhost:9092',
value_deserializer=lambda m: json.loads(m.decode()),
group_id='payment-service'
)
await consumer.start()
async for message in consumer:
order = message.value
await process_order(order)
await consumer.stop()from fastapi import FastAPI
from aiokafka import AIOKafkaProducer
app = FastAPI()
producer = AIOKafkaProducer(bootstrap_servers='kafka:9092')
@app.on_event("startup")
async def startup():
await producer.start()
@app.on_event("shutdown")
async def shutdown():
await producer.stop()
@app.post('/orders')
async def create_order(order: OrderCreate):
# Создание заказа
order = await orders_service.create(order)
# Публикация события
await producer.send('orders', {
'event': 'OrderCreated',
'order_id': order.id,
'user_id': order.user_id
})
return orderfrom fastapi import FastAPI
from aiokafka import AIOKafkaConsumer
import asyncio
app = FastAPI()
consumer = AIOKafkaConsumer(
'orders',
bootstrap_servers='kafka:9092',
group_id='email-service'
)
@app.on_event("startup")
async def startup():
await consumer.start()
asyncio.create_task(consume_events())
@app.on_event("shutdown")
async def shutdown():
await consumer.stop()
async def consume_events():
async for message in consumer:
event = message.value
if event['event'] == 'OrderCreated':
await send_order_email(event)docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:managementimport aio_pika
async def publish_order(order: dict):
connection = await aio_pika.connect_robust("amqp://guest:guest@rabbitmq/")
async with connection:
channel = await connection.channel()
exchange = await channel.declare_exchange('orders', aio_pika.ExchangeType.DIRECT)
queue = await channel.declare_queue('orders_queue')
await exchange.publish(
aio_pika.Message(body=json.dumps(order).encode()),
routing_key='order.created'
)async def consume():
connection = await aio_pika.connect_robust("amqp://guest:guest@rabbitmq/")
async with connection:
channel = await connection.channel()
exchange = await channel.declare_exchange('orders', aio_pika.ExchangeType.DIRECT)
queue = await channel.declare_queue('orders_queue')
await queue.bind(exchange, routing_key='order.created')
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
order = json.loads(message.body)
await process_order(order)class EventStore:
def __init__(self):
self.events = []
def append(self, event: dict):
self.events.append({
'id': len(self.events),
'timestamp': datetime.utcnow(),
**event
})
def get_events(self, aggregate_id: str):
return [e for e in self.events if e.get('aggregate_id') == aggregate_id]
# Восстановление состояния из событий
def rebuild_state(events: list):
state = {}
for event in events:
if event['type'] == 'OrderCreated':
state = {'status': 'created', **event['data']}
elif event['type'] == 'OrderPaid':
state['status'] = 'paid'
elif event['type'] == 'OrderShipped':
state['status'] = 'shipped'
return state# Write model (commands)
class OrderCommand:
def create(self, data: dict):
# Валидация, бизнес-логика
event = {'type': 'OrderCreated', 'data': data}
event_store.append(event)
# Read model (queries)
class OrderQuery:
def get_order(self, order_id: str):
# Оптимизированный read model
return read_db.query(f"SELECT * FROM orders WHERE id = {order_id}")# Событие содержит все необходимые данные
{
'event': 'OrderCreated',
'order_id': 123,
'user_id': 1,
'total': 99.99,
'items': [...],
'shipping_address': {...}
}consumer = AIOKafkaConsumer(
'orders',
bootstrap_servers='kafka:9092',
group_id='payment-service'
)
async for message in consumer:
try:
await process_order(message.value)
await consumer.commit()
except Exception as e:
# Отправка в DLQ
await producer.send('orders-dlq', {
'error': str(e),
'original': message.value
})Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.