Event sourcing, CQRS, event storming, реактивные системы, паттерны event-driven архитектуры.
Event-Driven Architecture (EDA) — это архитектурный паттерн, где компоненты системы общаются через события. Событие — это факт, что что-то произошло в прошлом. EDA обеспечивает слабую связанность, масштабируемость и возможность эволюции системы без breaking changes.
Традиционная синхронная архитектура (REST, gRPC) создаёт жёсткую связанность между сервисами:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Client │ ────►│ Service A │ ────►│ Service B │
│ │ ◄────│ │ ◄────│ │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
└──────────────────────┴──────────────────────┘
Синхронные вызовы, блокировки
Проблемы синхронной архитектуры:
Event-Driven Architecture решает эти проблемы через асинхронную коммуникацию:
┌─────────────┐ ┌─────────────────────────┐ ┌─────────────┐
│ Service A │ ────►│ Event Broker │ ◄────│ Service B │
│ (Producer) │ │ (Kafka/RabbitMQ) │ │ (Consumer) │
└─────────────┘ │ │ └─────────────┘
│ ┌─────────────────┐ │
│ │ Event Store │ │ ┌─────────────┐
│ │ │ │ ◄────│ Service C │
│ └─────────────────┘ │ │ (Consumer) │
└─────────────────────────┘ └─────────────┘
Преимущества EDA:
Event Sourcing — это паттерн, где состояние системы определяется как последовательность событий. Вместо хранения текущего состояния, мы сохраняем все изменения как события.
Традиционный подход (CRUD):
# Традиционное хранение состояния
class Order:
def __init__(self, id: int, status: str, total: float):
self.id = id
self.status = status # Текущее состояние
self.total = total
# В БД сохраняется только текущее состояние
# UPDATE orders SET status = 'shipped' WHERE id = 123
# История изменений теряетсяEvent Sourcing подход:
# События вместо состояния
class OrderCreated:
def __init__(self, order_id: int, user_id: int, items: list, total: float):
self.order_id = order_id
self.user_id = user_id
self.items = items
self.total = total
self.timestamp = datetime.utcnow()
class OrderStatusChanged:
def __init__(self, order_id: int, old_status: str, new_status: str):
self.order_id = order_id
self.old_status = old_status
self.new_status = new_status
self.timestamp = datetime.utcnow()
class OrderShipped:
def __init__(self, order_id: int, tracking_number: str):
self.order_id = order_id
self.tracking_number = tracking_number
self.timestamp = datetime.utcnow()
# В Event Store сохраняются ВСЕ события
# SELECT * FROM events WHERE aggregate_id = 123 ORDER BY version
# Состояние вычисляется применением всех событийEvent Store — это специализированная база данных для хранения событий.
# event_store/store.py
from dataclasses import dataclass, field
from typing import Any, Optional, List
from datetime import datetime
import json
import asyncpg
import uuid
@dataclass
class StoredEvent:
"""Событие, сохранённое в Event Store"""
event_id: str
event_type: str
aggregate_id: str
aggregate_type: str
version: int
data: dict
metadata: dict
timestamp: datetime
def to_json(self) -> str:
return json.dumps({
"event_id": self.event_id,
"event_type": self.event_type,
"aggregate_id": self.aggregate_id,
"aggregate_type": self.aggregate_type,
"version": self.version,
"data": self.data,
"metadata": self.metadata,
"timestamp": self.timestamp.isoformat()
})
@classmethod
def from_row(cls, row: asyncpg.Record) -> "StoredEvent":
return cls(
event_id=row["event_id"],
event_type=row["event_type"],
aggregate_id=row["aggregate_id"],
aggregate_type=row["aggregate_type"],
version=row["version"],
data=row["data"],
metadata=row["metadata"],
timestamp=row["timestamp"]
)
class EventStore:
"""
Event Store для хранения и чтения событий.
Гарантирует атомарность записи событий для одного агрегата.
"""
def __init__(self, dsn: str):
self.dsn = dsn
self._pool: Optional[asyncpg.Pool] = None
async def connect(self):
"""Подключение к БД"""
self._pool = await asyncpg.create_pool(self.dsn)
await self._init_schema()
async def close(self):
"""Закрытие подключения"""
if self._pool:
await self._pool.close()
async def _init_schema(self):
"""Инициализация схемы Event Store"""
async with self._pool.acquire() as conn:
await conn.execute("""
CREATE TABLE IF NOT EXISTS events (
event_id UUID PRIMARY KEY,
event_type TEXT NOT NULL,
aggregate_id TEXT NOT NULL,
aggregate_type TEXT NOT NULL,
version INTEGER NOT NULL,
data JSONB NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}',
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Уникальность версии для агрегата (optimistic locking)
UNIQUE (aggregate_id, version)
)
-- Индексы для быстрого поиска
CREATE INDEX IF NOT EXISTS idx_events_aggregate
ON events (aggregate_id, version);
CREATE INDEX IF NOT EXISTS idx_events_type
ON events (event_type);
CREATE INDEX IF NOT EXISTS idx_events_timestamp
ON events (timestamp);
""")
async def append_events(
self,
aggregate_id: str,
aggregate_type: str,
expected_version: int,
events: List[Any]
) -> List[StoredEvent]:
"""
Добавление событий к агрегату.
Args:
aggregate_id: ID агрегата
aggregate_type: Тип агрегата
expected_version: Ожидаемая версия (для optimistic locking)
events: Список событий для добавления
Returns:
Список сохранённых событий
Raises:
ConcurrencyError: Если версия не совпадает (конфликт)
"""
class ConcurrencyError(Exception):
pass
async with self._pool.acquire() as conn:
async with conn.transaction():
# Проверка ожидаемой версии (optimistic locking)
if expected_version != -1: # -1 означает новый агрегат
current_version = await conn.fetchval(
"""
SELECT MAX(version) FROM events
WHERE aggregate_id = $1
""",
aggregate_id
)
if current_version != expected_version:
raise ConcurrencyError(
f"Expected version {expected_version}, "
f"got {current_version}"
)
# Сохранение событий
stored_events = []
for i, event in enumerate(events):
version = expected_version + i + 1
stored_event = StoredEvent(
event_id=str(uuid.uuid4()),
event_type=event.__class__.__name__,
aggregate_id=aggregate_id,
aggregate_type=aggregate_type,
version=version,
data=event.__dict__,
metadata={"causation_id": str(uuid.uuid4())},
timestamp=datetime.utcnow()
)
await conn.execute(
"""
INSERT INTO events
(event_id, event_type, aggregate_id, aggregate_type,
version, data, metadata, timestamp)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
""",
stored_event.event_id,
stored_event.event_type,
stored_event.aggregate_id,
stored_event.aggregate_type,
stored_event.version,
stored_event.data,
stored_event.metadata,
stored_event.timestamp
)
stored_events.append(stored_event)
return stored_events
async def get_events(
self,
aggregate_id: str,
from_version: int = 0
) -> List[StoredEvent]:
"""
Получение событий агрегата.
Args:
aggregate_id: ID агрегата
from_version: Начальная версия (для инкрементальной загрузки)
Returns:
Список событий, отсортированных по версии
"""
async with self._pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT event_id, event_type, aggregate_id, aggregate_type,
version, data, metadata, timestamp
FROM events
WHERE aggregate_id = $1 AND version > $2
ORDER BY version ASC
""",
aggregate_id,
from_version
)
return [StoredEvent.from_row(row) for row in rows]
async def get_all_streams(self) -> List[str]:
"""Получение всех потоков (агрегатов)"""
async with self._pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT DISTINCT aggregate_id
FROM events
ORDER BY aggregate_id
"""
)
return [row["aggregate_id"] for row in rows]Агрегат — это кластер объектов, которые обрабатываются как единое целое. Агрегат имеет идентичность и инкапсулирует бизнес-логику.
# aggregates/order.py
from dataclasses import dataclass, field
from typing import List, Optional
from enum import Enum
class OrderStatus(Enum):
PENDING = "pending"
CONFIRMED = "confirmed"
PAID = "paid"
SHIPPED = "shipped"
DELIVERED = "delivered"
CANCELLED = "cancelled"
@dataclass
class OrderCreated:
order_id: str
user_id: int
items: List[dict]
total_amount: float
shipping_address: str
@dataclass
class OrderConfirmed:
order_id: str
confirmed_at: str
@dataclass
class OrderPaid:
order_id: str
payment_id: str
paid_at: str
@dataclass
class OrderShipped:
order_id: str
tracking_number: str
shipped_at: str
@dataclass
class OrderCancelled:
order_id: str
reason: str
cancelled_at: str
class OrderAggregate:
"""
Агрегат заказа.
Инкапсулирует бизнес-логику и состояние заказа.
"""
def __init__(self, order_id: str):
self.order_id = order_id
self.user_id: Optional[int] = None
self.items: List[dict] = []
self.total_amount: float = 0.0
self.shipping_address: str = ""
self.status: OrderStatus = OrderStatus.PENDING
self.payment_id: Optional[str] = None
self.tracking_number: Optional[str] = None
self.version: int = 0
self._pending_events: List[Any] = []
@classmethod
def create(
cls,
order_id: str,
user_id: int,
items: List[dict],
shipping_address: str
) -> "OrderAggregate":
"""
Фабричный метод создания нового заказа.
"""
order = cls(order_id)
total_amount = sum(item["price"] * item["quantity"] for item in items)
# Создание события
event = OrderCreated(
order_id=order_id,
user_id=user_id,
items=items,
total_amount=total_amount,
shipping_address=shipping_address
)
order._apply_event(event)
order._pending_events.append(event)
return order
def confirm(self) -> None:
"""Подтверждение заказа"""
if self.status != OrderStatus.PENDING:
raise ValueError(f"Cannot confirm order in status {self.status}")
event = OrderConfirmed(
order_id=self.order_id,
confirmed_at=datetime.utcnow().isoformat()
)
self._apply_event(event)
self._pending_events.append(event)
def mark_as_paid(self, payment_id: str) -> None:
"""Отметка заказа как оплаченного"""
if self.status != OrderStatus.CONFIRMED:
raise ValueError(f"Cannot pay order in status {self.status}")
event = OrderPaid(
order_id=self.order_id,
payment_id=payment_id,
paid_at=datetime.utcnow().isoformat()
)
self._apply_event(event)
self._pending_events.append(event)
def ship(self, tracking_number: str) -> None:
"""Отгрузка заказа"""
if self.status != OrderStatus.PAID:
raise ValueError(f"Cannot ship order in status {self.status}")
event = OrderShipped(
order_id=self.order_id,
tracking_number=tracking_number,
shipped_at=datetime.utcnow().isoformat()
)
self._apply_event(event)
self._pending_events.append(event)
def cancel(self, reason: str) -> None:
"""Отмена заказа"""
if self.status in [OrderStatus.SHIPPED, OrderStatus.DELIVERED]:
raise ValueError(f"Cannot cancel order in status {self.status}")
event = OrderCancelled(
order_id=self.order_id,
reason=reason,
cancelled_at=datetime.utcnow().isoformat()
)
self._apply_event(event)
self._pending_events.append(event)
def _apply_event(self, event: Any) -> None:
"""
Применение события к состоянию агрегата.
Это ключевой метод Event Sourcing — состояние вычисляется из событий.
"""
if isinstance(event, OrderCreated):
self.user_id = event.user_id
self.items = event.items
self.total_amount = event.total_amount
self.shipping_address = event.shipping_address
self.status = OrderStatus.PENDING
elif isinstance(event, OrderConfirmed):
self.status = OrderStatus.CONFIRMED
elif isinstance(event, OrderPaid):
self.status = OrderStatus.PAID
self.payment_id = event.payment_id
elif isinstance(event, OrderShipped):
self.status = OrderStatus.SHIPPED
self.tracking_number = event.tracking_number
elif isinstance(event, OrderCancelled):
self.status = OrderStatus.CANCELLED
self.version += 1
def load_from_history(self, events: List[Any]) -> None:
"""
Загрузка состояния из истории событий.
Вызывается при восстановлении агрегата из Event Store.
"""
for event in events:
self._apply_event(event)
def get_pending_events(self) -> List[Any]:
"""Получение новых событий для сохранения"""
return self._pending_events.copy()
def clear_pending_events(self) -> None:
"""Очистка сохранённых событий"""
self._pending_events.clear()Проекции — это read-модели, которые строятся из событий для удобного чтения и отображения.
# projections/order_projection.py
from dataclasses import dataclass
from typing import Optional, List
import asyncpg
@dataclass
class OrderReadModel:
"""Read-модель заказа для отображения"""
order_id: str
user_id: int
status: str
total_amount: float
items: List[dict]
shipping_address: str
payment_id: Optional[str]
tracking_number: Optional[str]
created_at: str
updated_at: str
class OrderProjection:
"""
Проекция для построения read-модели заказов.
Слушает события и обновляет read-модель.
"""
def __init__(self, dsn: str):
self.dsn = dsn
self._pool: Optional[asyncpg.Pool] = None
async def connect(self):
self._pool = await asyncpg.create_pool(self.dsn)
await self._init_schema()
async def close(self):
if self._pool:
await self._pool.close()
async def _init_schema(self):
"""Инициализация read-модели"""
async with self._pool.acquire() as conn:
await conn.execute("""
CREATE TABLE IF NOT EXISTS orders_read (
order_id TEXT PRIMARY KEY,
user_id INTEGER NOT NULL,
status TEXT NOT NULL,
total_amount NUMERIC NOT NULL,
items JSONB NOT NULL DEFAULT '[]',
shipping_address TEXT NOT NULL,
payment_id TEXT,
tracking_number TEXT,
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL
)
""")
-- Индексы для поиска
CREATE INDEX IF NOT EXISTS idx_orders_read_user
ON orders_read (user_id);
CREATE INDEX IF NOT EXISTS idx_orders_read_status
ON orders_read (status);
""")
async def apply_event(self, event_type: str, event_data: dict) -> None:
"""
Применение события к read-модели.
Вызывается обработчиком событий.
"""
async with self._pool.acquire() as conn:
if event_type == "OrderCreated":
await conn.execute(
"""
INSERT INTO orders_read
(order_id, user_id, status, total_amount, items,
shipping_address, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, NOW(), NOW())
ON CONFLICT (order_id) DO NOTHING
""",
event_data["order_id"],
event_data["user_id"],
"pending",
event_data["total_amount"],
json.dumps(event_data["items"]),
event_data["shipping_address"]
)
elif event_type == "OrderConfirmed":
await conn.execute(
"""
UPDATE orders_read
SET status = 'confirmed', updated_at = NOW()
WHERE order_id = $1
""",
event_data["order_id"]
)
elif event_type == "OrderPaid":
await conn.execute(
"""
UPDATE orders_read
SET status = 'paid', payment_id = $2, updated_at = NOW()
WHERE order_id = $1
""",
event_data["order_id"],
event_data["payment_id"]
)
elif event_type == "OrderShipped":
await conn.execute(
"""
UPDATE orders_read
SET status = 'shipped', tracking_number = $2, updated_at = NOW()
WHERE order_id = $1
""",
event_data["order_id"],
event_data["tracking_number"]
)
elif event_type == "OrderCancelled":
await conn.execute(
"""
UPDATE orders_read
SET status = 'cancelled', updated_at = NOW()
WHERE order_id = $1
""",
event_data["order_id"]
)
async def get_order(self, order_id: str) -> Optional[OrderReadModel]:
"""Получение заказа из read-модели"""
async with self._pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT * FROM orders_read WHERE order_id = $1",
order_id
)
if not row:
return None
return OrderReadModel(
order_id=row["order_id"],
user_id=row["user_id"],
status=row["status"],
total_amount=float(row["total_amount"]),
items=row["items"],
shipping_address=row["shipping_address"],
payment_id=row["payment_id"],
tracking_number=row["tracking_number"],
created_at=row["created_at"].isoformat(),
updated_at=row["updated_at"].isoformat()
)
async def get_user_orders(self, user_id: int) -> List[OrderReadModel]:
"""Получение всех заказов пользователя"""
async with self._pool.acquire() as conn:
rows = await conn.fetch(
"SELECT * FROM orders_read WHERE user_id = $1 ORDER BY created_at DESC",
user_id
)
return [
OrderReadModel(
order_id=row["order_id"],
user_id=row["user_id"],
status=row["status"],
total_amount=float(row["total_amount"]),
items=row["items"],
shipping_address=row["shipping_address"],
payment_id=row["payment_id"],
tracking_number=row["tracking_number"],
created_at=row["created_at"].isoformat(),
updated_at=row["updated_at"].isoformat()
)
for row in rows
]CQRS — это паттерн разделения операций записи (Commands) и чтения (Queries). Запись и чтение используют разные модели данных.
┌─────────────────────────────────────────────────────────────┐
│ CQRS Architecture │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Commands │ │ Queries │ │
│ │ (Write) │ │ (Read) │ │
│ │ │ │ │ │
│ │ - Create │ │ - Get by ID │ │
│ │ - Update │ │ - List │ │
│ │ - Delete │ │ - Search │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Write Model │ │ Read Model │ │
│ │ (Domain) │ │ (Projection) │ │
│ │ │ │ │ │
│ │ - Нормализ. │ │ - Денормализ.│ │
│ │ - Валидация │ │ - Оптимиз. │ │
│ │ - Бизнес- │ │ - Быстрый │ │
│ │ логика │ │ поиск │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Write DB │ │ Read DB │ │
│ │ (Master) │ │ (Replica) │ │
│ │ │ │ │ │
│ │ - PostgreSQL │ │ - PostgreSQL │ │
│ │ - События │ │ - Read Model │ │
│ └──────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
# cqrs/commands.py - Команды (Write Side)
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel, Field
from typing import List, Optional
import uuid
from aggregates.order import OrderAggregate, OrderCreated
from event_store.store import EventStore
app = FastAPI(title="Order Commands API")
# Зависимости
def get_event_store() -> EventStore:
return EventStore("postgresql://user:pass@localhost/order_db")
class CreateOrderCommand(BaseModel):
user_id: int
items: List[dict]
shipping_address: str
class ConfirmOrderCommand(BaseModel):
order_id: str
class CancelOrderCommand(BaseModel):
order_id: str
reason: str = Field(..., min_length=10)
@app.post("/orders", status_code=201)
async def create_order(
command: CreateOrderCommand,
event_store: EventStore = Depends(get_event_store)
):
"""
Команда создания заказа.
Только запись, валидация бизнес-правил.
"""
order_id = str(uuid.uuid4())
# Создание агрегата через фабричный метод
order = OrderAggregate.create(
order_id=order_id,
user_id=command.user_id,
items=command.items,
shipping_address=command.shipping_address
)
# Сохранение событий
try:
await event_store.append_events(
aggregate_id=order_id,
aggregate_type="Order",
expected_version=-1, # Новый агрегат
events=order.get_pending_events()
)
except EventStore.ConcurrencyError as e:
raise HTTPException(status_code=409, detail=str(e))
return {"order_id": order_id, "status": "pending"}
@app.post("/orders/{order_id}/confirm")
async def confirm_order(
order_id: str,
command: ConfirmOrderCommand,
event_store: EventStore = Depends(get_event_store)
):
"""Команда подтверждения заказа"""
# Восстановление агрегата из событий
order = OrderAggregate(order_id)
events = await event_store.get_events(order_id)
if not events:
raise HTTPException(status_code=404, detail="Order not found")
# Применение истории событий
stored_events = [e.data for e in events]
order.load_from_history(stored_events)
# Выполнение команды
order.confirm()
# Сохранение новых событий
await event_store.append_events(
aggregate_id=order_id,
aggregate_type="Order",
expected_version=order.version - 1,
events=order.get_pending_events()
)
return {"order_id": order_id, "status": "confirmed"}
@app.post("/orders/{order_id}/cancel")
async def cancel_order(
order_id: str,
command: CancelOrderCommand,
event_store: EventStore = Depends(get_event_store)
):
"""Команда отмены заказа"""
order = OrderAggregate(order_id)
events = await event_store.get_events(order_id)
if not events:
raise HTTPException(status_code=404, detail="Order not found")
order.load_from_history([e.data for e in events])
order.cancel(reason=command.reason)
await event_store.append_events(
aggregate_id=order_id,
aggregate_type="Order",
expected_version=order.version - 1,
events=order.get_pending_events()
)
return {"order_id": order_id, "status": "cancelled"}# cqrs/queries.py - Запросы (Read Side)
from fastapi import FastAPI, HTTPException, Query, Depends
from typing import List, Optional
from projections.order_projection import OrderProjection, OrderReadModel
app = FastAPI(title="Order Queries API")
def get_order_projection() -> OrderProjection:
return OrderProjection("postgresql://user:pass@localhost/order_read_db")
@app.get("/orders/{order_id}", response_model=OrderReadModel)
async def get_order(
order_id: str,
projection: OrderProjection = Depends(get_order_projection)
):
"""
Запрос получения заказа.
Только чтение из оптимизированной read-модели.
"""
order = await projection.get_order(order_id)
if not order:
raise HTTPException(status_code=404, detail="Order not found")
return order
@app.get("/users/{user_id}/orders", response_model=List[OrderReadModel])
async def get_user_orders(
user_id: int,
status: Optional[str] = Query(None),
projection: OrderProjection = Depends(get_order_projection)
):
"""
Запрос заказов пользователя с фильтрацией.
Read-модель оптимизирована для таких запросов.
"""
if status:
# Фильтрация по статусу
orders = await projection.get_user_orders(user_id)
return [o for o in orders if o.status == status]
else:
return await projection.get_user_orders(user_id)
@app.get("/orders", response_model=List[OrderReadModel])
async def search_orders(
status: Optional[str] = Query(None),
min_amount: Optional[float] = Query(None),
max_amount: Optional[float] = Query(None),
projection: OrderProjection = Depends(get_order_projection)
):
"""
Поиск заказов с фильтрами.
Read-модель может иметь специализированные индексы.
"""
# В реальной реализации — SQL запрос с фильтрами
all_orders = await projection.get_all_orders()
filtered = all_orders
if status:
filtered = [o for o in filtered if o.status == status]
if min_amount is not None:
filtered = [o for o in filtered if o.total_amount >= min_amount]
if max_amount is not None:
filtered = [o for o in filtered if o.total_amount <= max_amount]
return filtered# cqrs/sync.py - Синхронизатор моделей
from event_store.store import EventStore
from projections.order_projection import OrderProjection
import asyncio
class ModelSynchronizer:
"""
Синхронизатор Write и Read моделей.
Слушает новые события и обновляет read-модель.
"""
def __init__(
self,
event_store: EventStore,
projection: OrderProjection
):
self.event_store = event_store
self.projection = projection
self.last_processed_version = 0
async def sync_from_event(self, stored_event) -> None:
"""Синхронизация read-модели из одного события"""
await self.projection.apply_event(
event_type=stored_event.event_type,
event_data=stored_event.data
)
self.last_processed_version = stored_event.version
async def catch_up(self) -> None:
"""
Догоняющая синхронизация.
Применяет все пропущенные события к read-модели.
"""
all_streams = await self.event_store.get_all_streams()
for aggregate_id in all_streams:
events = await self.event_store.get_events(
aggregate_id,
from_version=self.last_processed_version
)
for event in events:
await self.sync_from_event(event)
async def start_listening(self) -> None:
"""
Запуск прослушивания новых событий.
В production используйте Kafka/RabbitMQ для push-уведомлений.
"""
while True:
await self.catch_up()
await asyncio.sleep(1) # Polling intervalEvent Storming — это collaborative техника моделирования, где команда вместе исследует домен через события.
Event Storming был создан Alberto Brandolini. Это воркшоп, где участники используют стикеры для визуализации потока событий в системе.
Типы стикеров:
┌─────────────────────────────────────────────────────────────┐
│ Event Storming Legend │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ Domain Event (Оранжевый) │
│ │ Order │ Факт, что что-то произошло в прошлом │
│ │ Created │ (Past tense, business language) │
│ └─────────────┘ │
│ │
│ ┌─────────────┐ Command (Синий) │
│ │ Create │ Действие, которое инициирует событие │
│ │ Order │ (Imperative form) │
│ └─────────────┘ │
│ │
│ ┌─────────────┐ Aggregate (Жёлтый) │
│ │ Order │ Объект, который обрабатывает команду │
│ └─────────────┘ │
│ │
│ ┌─────────────┐ Policy (Фиолетовый) │
│ │ When Order │ Бизнес-правило: "Когда X, тогда Y" │
│ │ Paid, then │ │
│ │ Ship │ │
│ └─────────────┘ │
│ │
│ ┌─────────────┐ Read Model (Зелёный) │
│ │ Order List │ Данные для отображения │
│ └─────────────┘ │
│ │
│ ┌─────────────┐ External System (Розовый) │
│ │ Payment │ Система вне нашего контроля │
│ │ Gateway │ │
│ └─────────────┘ │
│ │
│ ┌─────────────┐ Hot Spot (Красный) │
│ │ ??? │ Неясность, требует обсуждения │
│ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Шаг 1: Big Picture (30-60 минут)
Timeline: Order Lifecycle
[Order Created] → [Order Confirmed] → [Order Paid] → [Order Shipped] → [Order Delivered]
↑ ↑ ↑ ↑
Customer Admin Payment Shipping
places order confirms processes delivers
Шаг 2: Process Modeling (1-2 часа)
┌─────────────────────────────────────────────────────────────────┐
│ Order Fulfillment Process │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌────────┐ ┌─────────────┐ ┌──────────┐ │
│ │ Create │───►│ Order │───►│ Order │ │
│ │ Order │ │ Created │ │ Confirmed│ │
│ └────────┘ └─────────────┘ └──────────┘ │
│ ↑ │ │ │
│ │ │ Policy │ Policy │
│ │ │ "Validate │ "Reserve │
│ │ │ inventory" │ stock" │
│ │ ▼ ▼ │
│ │ ┌─────────────┐ ┌──────────┐ │
│ │ │ Order │ │ Order │ │
│ │ │ Aggregate │ │ Aggregate│ │
│ │ └─────────────┘ └──────────┘ │
│ │ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ External: Payment Gateway │ │
│ └────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Шаг 3: Design Level (2-4 часа)
# Результат Event Storming для E-commerce
# Bounded Context: Order Management
# Aggregates: Order, OrderItem
# Commands: CreateOrder, ConfirmOrder, CancelOrder
# Events: OrderCreated, OrderConfirmed, OrderCancelled
# Policies:
# - When OrderCreated, then validate inventory
# - When OrderConfirmed, then reserve stock
# - When OrderCancelled, then release stock
# Read Models: OrderList, OrderDetails, UserOrders
# Bounded Context: Payment
# Aggregates: Payment
# Commands: InitiatePayment, ConfirmPayment, RefundPayment
# Events: PaymentInitiated, PaymentConfirmed, PaymentRefunded
# Policies:
# - When OrderConfirmed, then initiate payment
# - When PaymentConfirmed, then mark order as paid
# External Systems: Stripe, PayPal
# Bounded Context: Shipping
# Aggregates: Shipment
# Commands: CreateShipment, UpdateTracking, MarkDelivered
# Events: ShipmentCreated, TrackingUpdated, ShipmentDelivered
# Policies:
# - When OrderPaid, then create shipment
# - When ShipmentDelivered, then mark order as delivered
# External Systems: FedEx API, UPS APIDomain Events — это события, которые происходят внутри домена и имеют бизнес-значение.
Domain Event — это запись о факте, что что-то важное произошло в домене.
Характеристики Domain Events:
# domain/events.py
from dataclasses import dataclass
from datetime import datetime
from typing import Any
import uuid
@dataclass(frozen=True) # frozen = immutable
class DomainEvent:
"""Базовый класс для всех доменных событий"""
event_id: str = None
occurred_on: datetime = None
aggregate_id: str = None
aggregate_type: str = None
def __post_init__(self):
# Установка значений по умолчанию для immutable объектов
if object.__getattribute__(self, 'event_id') is None:
object.__setattr__(self, 'event_id', str(uuid.uuid4()))
if object.__getattribute__(self, 'occurred_on') is None:
object.__setattr__(self, 'occurred_on', datetime.utcnow())
@dataclass(frozen=True)
class OrderCreated(DomainEvent):
"""Событие создания заказа"""
user_id: int = None
items: tuple = None # tuple для immutability
total_amount: float = None
shipping_address: str = None
@dataclass(frozen=True)
class PaymentProcessed(DomainEvent):
"""Событие обработки платежа"""
order_id: str = None
payment_id: str = None
amount: float = None
payment_method: str = None
@dataclass(frozen=True)
class InventoryReserved(DomainEvent):
"""Событие резервирования товара"""
order_id: str = None
product_id: str = None
quantity: int = None
warehouse_id: str = None# domain/event_dispatcher.py
from typing import Callable, Dict, List, Any
from collections import defaultdict
import asyncio
class EventDispatcher:
"""
Диспетчер доменных событий.
Позволяет подписываться на события и обрабатывать их асинхронно.
"""
def __init__(self):
self._handlers: Dict[str, List[Callable]] = defaultdict(list)
def subscribe(
self,
event_type: str,
handler: Callable
) -> None:
"""
Подписка обработчика на событие.
Args:
event_type: Имя типа события (например, "OrderCreated")
handler: Асинхронная функция-обработчик
"""
self._handlers[event_type].append(handler)
def unsubscribe(
self,
event_type: str,
handler: Callable
) -> None:
"""Отписка обработчика от события"""
if handler in self._handlers[event_type]:
self._handlers[event_type].remove(handler)
async def dispatch(self, event: Any) -> None:
"""
Отправка события всем подписчикам.
Args:
event: Экземпляр доменного события
"""
event_type = event.__class__.__name__
handlers = self._handlers.get(event_type, [])
# Параллельное выполнение всех обработчиков
tasks = [handler(event) for handler in handlers]
await asyncio.gather(*tasks, return_exceptions=True)
# Пример использования
dispatcher = EventDispatcher()
async def send_confirmation_email(event: OrderCreated):
"""Отправка email подтверждения"""
print(f"Sending email to user {event.user_id} for order {event.aggregate_id}")
async def reserve_inventory(event: OrderCreated):
"""Резервирование товаров на складе"""
for item in event.items:
print(f"Reserving {item['quantity']} of product {item['product_id']}")
async def update_analytics(event: OrderCreated):
"""Обновление аналитики"""
print(f"Recording order of {event.total_amount} in analytics")
# Подписка обработчиков
dispatcher.subscribe("OrderCreated", send_confirmation_email)
dispatcher.subscribe("OrderCreated", reserve_inventory)
dispatcher.subscribe("OrderCreated", update_analytics)
# Отправка события
event = OrderCreated(
aggregate_id="order-123",
user_id=42,
items=({"product_id": "p1", "quantity": 2},),
total_amount=99.99,
shipping_address="123 Main St"
)
await dispatcher.dispatch(event)# domain/order_aggregate_with_events.py
from typing import List
from domain.events import OrderCreated, OrderConfirmed, OrderCancelled
class OrderAggregate:
"""Агрегат заказа с поддержкой Domain Events"""
def __init__(self, order_id: str):
self.order_id = order_id
self.user_id: int = None
self.status: str = "pending"
self._domain_events: List[DomainEvent] = []
@classmethod
def create(
cls,
order_id: str,
user_id: int,
items: List[dict],
shipping_address: str
) -> "OrderAggregate":
"""Создание нового заказа"""
order = cls(order_id)
order.user_id = user_id
# Создание и запись доменного события
event = OrderCreated(
aggregate_id=order_id,
aggregate_type="Order",
user_id=user_id,
items=tuple(items),
total_amount=sum(i["price"] * i["quantity"] for i in items),
shipping_address=shipping_address
)
order._record_event(event)
return order
def confirm(self) -> None:
"""Подтверждение заказа"""
if self.status != "pending":
raise ValueError(f"Cannot confirm order in status {self.status}")
self.status = "confirmed"
event = OrderConfirmed(
aggregate_id=self.order_id,
aggregate_type="Order"
)
self._record_event(event)
def cancel(self, reason: str) -> None:
"""Отмена заказа"""
if self.status in ["shipped", "delivered"]:
raise ValueError(f"Cannot cancel order in status {self.status}")
self.status = "cancelled"
event = OrderCancelled(
aggregate_id=self.order_id,
aggregate_type="Order",
reason=reason
)
self._record_event(event)
def _record_event(self, event: DomainEvent) -> None:
"""Запись доменного события"""
self._domain_events.append(event)
def get_domain_events(self) -> List[DomainEvent]:
"""Получение новых событий"""
return self._domain_events.copy()
def clear_domain_events(self) -> None:
"""Очистка записанных событий после сохранения"""
self._domain_events.clear()События живут долго. Схемы должны эволюционировать без breaking changes.
CloudEvents — это стандарт формата событий от CNCF.
# events/cloudevents.py
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Any
import json
@dataclass
class CloudEvent:
"""
CloudEvent v1.0 specification.
https://cloudevents.io/
"""
# Required attributes
id: str # Unique event ID
source: str # Context about producer (URI)
specversion: str = "1.0" # CloudEvents version
type: str = "" # Event type (e.g., "com.example.order.created")
# Optional attributes
datacontenttype: str = "application/json"
dataschema: Optional[str] = None # URI to schema
subject: Optional[str] = None # Subject of event
time: datetime = None
data: Optional[Any] = None
def __post_init__(self):
if self.time is None:
self.time = datetime.utcnow()
def to_json(self) -> str:
"""Сериализация в JSON"""
return json.dumps({
"specversion": self.specversion,
"id": self.id,
"source": self.source,
"type": self.type,
"datacontenttype": self.datacontenttype,
"dataschema": self.dataschema,
"subject": self.subject,
"time": self.time.isoformat(),
"data": self.data
})
@classmethod
def from_json(cls, json_str: str) -> "CloudEvent":
"""Десериализация из JSON"""
data = json.loads(json_str)
return cls(
id=data["id"],
source=data["source"],
specversion=data["specversion"],
type=data["type"],
datacontenttype=data.get("datacontenttype", "application/json"),
dataschema=data.get("dataschema"),
subject=data.get("subject"),
time=datetime.fromisoformat(data["time"]),
data=data.get("data")
)
# Пример использования
event = CloudEvent(
id="order-123-created",
source="order-service",
type="com.example.order.created",
dataschema="https://schema.example.com/order/v1",
subject="order-123",
data={
"order_id": "order-123",
"user_id": 42,
"total_amount": 99.99
}
)
json_str = event.to_json()
# {
# "specversion": "1.0",
# "id": "order-123-created",
# "source": "order-service",
# "type": "com.example.order.created",
# "datacontenttype": "application/json",
# "dataschema": "https://schema.example.com/order/v1",
# "subject": "order-123",
# "time": "2026-03-03T10:00:00",
# "data": {"order_id": "order-123", "user_id": 42, "total_amount": 99.99}
# }Apache Avro обеспечивает schema evolution с обратной совместимостью.
# events/avro_schemas.py
# Order.avsc - Avro schema для события OrderCreated
ORDER_CREATED_SCHEMA_V1 = """
{
"type": "record",
"name": "OrderCreated",
"namespace": "com.example.events",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "user_id", "type": "int"},
{"name": "total_amount", "type": "double"},
{"name": "shipping_address", "type": "string"}
]
}
"""
# OrderCreated V2 - добавлено новое поле с default (BACKWARD compatible)
ORDER_CREATED_SCHEMA_V2 = """
{
"type": "record",
"name": "OrderCreated",
"namespace": "com.example.events",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "user_id", "type": "int"},
{"name": "total_amount", "type": "double"},
{"name": "shipping_address", "type": "string"},
{"name": "items", "type": {
"type": "array",
"items": {
"type": "record",
"name": "OrderItem",
"fields": [
{"name": "product_id", "type": "string"},
{"name": "quantity", "type": "int"},
{"name": "price", "type": "double"}
]
}
}, "default": []}
]
}
"""
# Правила совместимости Avro:
# BACKWARD compatible (новые consumers читают старые данные):
# - Добавление поля с default value
# - Удаление поля без default
#
# FORWARD compatible (старые consumers читают новые данные):
# - Добавление поля без default
# - Удаление поля с default
#
# FULL compatible (и backward, и forward):
# - Добавление поля с default valueProtocol Buffers от Google.
// events/order.proto
syntax = "proto3";
package events;
// OrderCreated V1
message OrderCreatedV1 {
string order_id = 1;
int32 user_id = 2;
double total_amount = 3;
string shipping_address = 4;
}
// OrderCreated V2 - добавлены новые поля (compatible)
// Правила совместимости Protobuf:
// - Можно добавлять новые поля с новыми номерами
// - Нельзя менять номера существующих полей
// - Нельзя удалять поля (лучше пометить как reserved)
message OrderCreatedV2 {
string order_id = 1;
int32 user_id = 2;
double total_amount = 3;
string shipping_address = 4;
// Новые поля в V2
repeated OrderItem items = 5;
string currency = 6;
int64 created_at = 7; // Unix timestamp
}
message OrderItem {
string product_id = 1;
int32 quantity = 2;
double price = 3;
}
// Зарезервированные номера (удалённые поля)
message OrderCreatedV3 {
string order_id = 1;
int32 user_id = 2;
double total_amount = 3;
string shipping_address = 4;
repeated OrderItem items = 5;
string currency = 6;
int64 created_at = 7;
// Поле 8 было удалено, резервируем номер
reserved 8;
reserved "old_field";
}# events/protobuf_usage.py
# После компиляции .proto файлов:
# python -m grpc_tools.protoc -I. --python_out=. order.proto
from order_pb2 import OrderCreatedV2, OrderItem
# Создание события
event = OrderCreatedV2(
order_id="order-123",
user_id=42,
total_amount=99.99,
shipping_address="123 Main St",
currency="USD",
items=[
OrderItem(product_id="p1", quantity=2, price=49.99)
]
)
# Сериализация (бинарный формат)
serialized = event.SerializeToString()
# Десериализация
new_event = OrderCreatedV2()
new_event.ParseFromString(serialized)Schema Registry хранит версии схем и проверяет совместимость.
# events/schema_registry_client.py
import httpx
from typing import Optional
class SchemaRegistryClient:
"""
Клиент для Confluent Schema Registry.
Управляет версиями схем и проверяет совместимость.
"""
def __init__(self, base_url: str):
self.base_url = base_url
self.client = httpx.AsyncClient()
async def register_schema(
self,
subject: str,
schema: str,
compatibility: str = "BACKWARD"
) -> int:
"""
Регистрация новой версии схемы.
Returns:
version: Номер версии схемы
"""
response = await self.client.post(
f"{self.base_url}/subjects/{subject}/versions",
json={"schema": schema}
)
response.raise_for_status()
return response.json()["id"]
async def get_schema(
self,
subject: str,
version: Optional[int] = None
) -> str:
"""Получение схемы по subject и версии"""
if version is None:
url = f"{self.base_url}/subjects/{subject}/schema"
else:
url = f"{self.base_url}/subjects/{subject}/versions/{version}/schema"
response = await self.client.get(url)
response.raise_for_status()
return response.json()["schema"]
async def check_compatibility(
self,
subject: str,
schema: str,
version: Optional[int] = None
) -> dict:
"""
Проверка совместимости схемы.
Returns:
{"is_compatible": bool, "messages": [...]}
"""
response = await self.client.post(
f"{self.base_url}/compatibility/subjects/{subject}/versions/{version or 'latest'}",
json={"schema": schema}
)
response.raise_for_status()
return response.json()
async def set_compatibility(self, subject: str, level: str) -> None:
"""
Установка уровня совместимости.
Levels: NONE, BACKWARD, BACKWARD_TRANSITIVE, FORWARD,
FORWARD_TRANSITIVE, FULL, FULL_TRANSITIVE
"""
await self.client.put(
f"{self.base_url}/config/{subject}",
json={"compatibility": level}
)
# Использование
registry = SchemaRegistryClient("http://schema-registry:8081")
# Регистрация схемы
version = await registry.register_schema(
subject="order-created-value",
schema=ORDER_CREATED_SCHEMA_V2,
compatibility="BACKWARD"
)
# Проверка совместимости перед регистрацией
compat = await registry.check_compatibility(
subject="order-created-value",
schema=ORDER_CREATED_SCHEMA_V3
)
if not compat["is_compatible"]:
print(f"Schema incompatible: {compat['messages']}")Reactive Systems — это архитектурный стиль для создания отзывчивых, устойчивых, эластичных и message-driven систем.
Согласно reactivemanifesto.org, реактивная система должна быть:
┌─────────────────────────────────────────────────────────────┐
│ Reactive Systems Manifesto │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. Responsive (Отзывчивая) │
│ - Система отвечает вовремя │
│ - Предсказуемое время отклика │
│ - Даже при ошибках — понятный ответ │
│ │
│ 2. Resilient (Устойчивая) │
│ - Система остаётся отзывчивой при сбоях │
│ - Изоляция компонентов (bulkhead) │
│ - Делегирование (circuit breaker) │
│ - Самовосстановление │
│ │
│ 3. Elastic (Эластичная) │
│ - Система остаётся отзывчивой при изменении нагрузки │
│ - Масштабирование по требованию │
│ - Шардирование, репликация │
│ │
│ 4. Message-Driven (На сообщениях) │
│ - Асинхронная коммуникация через сообщения │
│ - Слабая связанность компонентов │
│ - Изоляция времени и пространства │
│ │
└─────────────────────────────────────────────────────────────┘
# reactive/responsive.py - Отзывчивость
import asyncio
from datetime import timedelta
from typing import Any, Optional
class TimeoutError(Exception):
pass
async def with_timeout(
coro,
timeout: timedelta,
fallback: Any = None
) -> Any:
"""
Выполнение операции с таймаутом.
Гарантирует отзывчивость даже при медленных зависимостях.
"""
try:
return await asyncio.wait_for(coro, timeout=timeout.total_seconds())
except asyncio.TimeoutError:
if fallback is not None:
return fallback
raise TimeoutError(f"Operation timed out after {timeout}")
# Пример: отзывчивый API endpoint
from fastapi import FastAPI, HTTPException
app = FastAPI()
@app.get("/orders/{order_id}")
async def get_order(order_id: str):
"""
Отзывчивый endpoint с таймаутом.
"""
order = await with_timeout(
order_repository.get_by_id(order_id),
timeout=timedelta(seconds=2),
fallback=None
)
if order is None:
# Fallback: возвращаем кэшированные данные или ошибку
cached = await cache.get(f"order:{order_id}")
if cached:
return {**cached, "stale": True} # Помечаем как устаревшие
raise HTTPException(status_code=408, detail="Request timeout")
return order# reactive/resilient.py - Устойчивость
from typing import Callable, Any
import asyncio
from functools import wraps
class CircuitBreaker:
"""
Circuit Breaker pattern для устойчивости.
"""
CLOSED = "closed" # Нормальная работа
OPEN = "open" # Блокировка вызовов
HALF_OPEN = "half_open" # Проверка восстановления
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 30.0,
half_open_max_calls: int = 1
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.half_open_max_calls = half_open_max_calls
self.state = self.CLOSED
self.failure_count = 0
self.last_failure_time: Optional[float] = None
self.half_open_calls = 0
async def call(self, func: Callable, *args, **kwargs) -> Any:
"""Вызов функции через circuit breaker"""
if self.state == self.OPEN:
if self._should_attempt_reset():
self.state = self.HALF_OPEN
self.half_open_calls = 0
else:
raise Exception("Circuit breaker is OPEN")
if self.state == self.HALF_OPEN:
if self.half_open_calls >= self.half_open_max_calls:
raise Exception("Circuit breaker HALF_OPEN limit reached")
self.half_open_calls += 1
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
self.failure_count = 0
self.state = self.CLOSED
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = asyncio.get_event_loop().time()
if self.failure_count >= self.failure_threshold:
self.state = self.OPEN
def _should_attempt_reset(self) -> bool:
if self.last_failure_time is None:
return True
elapsed = asyncio.get_event_loop().time() - self.last_failure_time
return elapsed >= self.recovery_timeout
# Декоратор для circuit breaker
def circuit_breaker(cb: CircuitBreaker):
def decorator(func: Callable):
@wraps(func)
async def wrapper(*args, **kwargs):
return await cb.call(func, *args, **kwargs)
return wrapper
return decorator
# Использование
payment_cb = CircuitBreaker(failure_threshold=3, recovery_timeout=60)
@circuit_breaker(payment_cb)
async def call_payment_gateway(payment_data: dict):
"""Вызов платежного шлюза с circuit breaker"""
async with httpx.AsyncClient() as client:
response = await client.post(
"https://payment.gateway/charge",
json=payment_data
)
response.raise_for_status()
return response.json()# reactive/elastic.py - Эластичность
import asyncio
from typing import List, Callable
from dataclasses import dataclass
@dataclass
class LoadMetrics:
"""Метрики нагрузки"""
requests_per_second: float
avg_latency_ms: float
error_rate: float
queue_size: int
class ElasticScaler:
"""
Автоматическое масштабирование на основе нагрузки.
"""
def __init__(
self,
min_instances: int = 1,
max_instances: int = 10,
scale_up_threshold: float = 0.8,
scale_down_threshold: float = 0.3,
cooldown_seconds: float = 60.0
):
self.min_instances = min_instances
self.max_instances = max_instances
self.scale_up_threshold = scale_up_threshold
self.scale_down_threshold = scale_down_threshold
self.cooldown_seconds = cooldown_seconds
self.current_instances = min_instances
self.last_scale_time: float = 0
async def evaluate_and_scale(
self,
metrics: LoadMetrics,
scale_up: Callable,
scale_down: Callable
) -> None:
"""
Оценка метрик и масштабирование.
"""
import time
now = time.time()
# Проверка cooldown
if now - self.last_scale_time < self.cooldown_seconds:
return
# Вычисление нагрузки (0.0 - 1.0)
load = self._calculate_load(metrics)
if load > self.scale_up_threshold:
await self._scale_up(scale_up)
elif load < self.scale_down_threshold:
await self._scale_down(scale_down)
def _calculate_load(self, metrics: LoadMetrics) -> float:
"""Вычисление общей нагрузки"""
# Комбинированный показатель нагрузки
latency_factor = min(metrics.avg_latency_ms / 1000, 1.0)
error_factor = metrics.error_rate
queue_factor = min(metrics.queue_size / 1000, 1.0)
return (latency_factor + error_factor + queue_factor) / 3
async def _scale_up(self, scale_up: Callable) -> None:
if self.current_instances < self.max_instances:
await scale_up()
self.current_instances += 1
import time
self.last_scale_time = time.time()
async def _scale_down(self, scale_down: Callable) -> None:
if self.current_instances > self.min_instances:
await scale_down()
self.current_instances -= 1
import time
self.last_scale_time = time.time()# reactive/message_driven.py - Message-Driven архитектура
import asyncio
from typing import Callable, Any
from dataclasses import dataclass
import json
@dataclass
class Message:
"""Сообщение для асинхронной коммуникации"""
id: str
topic: str
key: str
value: Any
headers: dict
timestamp: float
class MessageBus:
"""
Message Bus для асинхронной коммуникации.
Реализует publish/subscribe паттерн.
"""
def __init__(self):
self._subscribers: dict[str, list[Callable]] = {}
self._message_queue: asyncio.Queue = asyncio.Queue()
async def publish(self, message: Message) -> None:
"""Публикация сообщения"""
await self._message_queue.put(message)
asyncio.create_task(self._dispatch(message))
def subscribe(self, topic: str, handler: Callable) -> None:
"""Подписка на топик"""
if topic not in self._subscribers:
self._subscribers[topic] = []
self._subscribers[topic].append(handler)
async def _dispatch(self, message: Message) -> None:
"""Диспетчеризация сообщения подписчикам"""
handlers = self._subscribers.get(message.topic, [])
await asyncio.gather(
*[handler(message) for handler in handlers],
return_exceptions=True
)
# Пример: Event-Driven микросервисы
message_bus = MessageBus()
async def order_service_handler(message: Message):
"""Order Service публикует события"""
if message.value["event_type"] == "order.created":
# Публикация для других сервисов
await message_bus.publish(Message(
id=str(uuid.uuid4()),
topic="inventory.reserve",
key=message.value["order_id"],
value={"order_id": message.value["order_id"], "items": message.value["items"]},
headers={},
timestamp=time.time()
))
async def inventory_service_handler(message: Message):
"""Inventory Service потребляет и реагирует"""
# Резервирование товаров
await reserve_inventory(message.value["order_id"], message.value["items"])
# Публикация события
await message_bus.publish(Message(
id=str(uuid.uuid4()),
topic="inventory.reserved",
key=message.value["order_id"],
value={"order_id": message.value["order_id"], "status": "reserved"},
headers={},
timestamp=time.time()
))
# Подписка сервисов
message_bus.subscribe("order.created", order_service_handler)
message_bus.subscribe("inventory.reserve", inventory_service_handler)# main.py - Полная интеграция
from fastapi import FastAPI
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json
import asyncio
from event_store.store import EventStore
from aggregates.order import OrderAggregate
from projections.order_projection import OrderProjection
from cqrs.commands import CreateOrderCommand
from domain.event_dispatcher import EventDispatcher
app = FastAPI(title="Event-Driven Order Service")
# Инициализация компонентов
event_store = EventStore("postgresql://user:pass@localhost/events_db")
projection = OrderProjection("postgresql://user:pass@localhost/read_db")
event_dispatcher = EventDispatcher()
kafka_producer = AIOKafkaProducer(
bootstrap_servers='kafka:9092',
value_serializer=lambda v: json.dumps(v, default=str).encode()
)
kafka_consumer = AIOKafkaConsumer(
'order-events',
bootstrap_servers='kafka:9092',
group_id='order-service-group',
auto_offset_reset='earliest'
)
@app.on_event("startup")
async def startup():
await event_store.connect()
await projection.connect()
await kafka_producer.start()
await kafka_consumer.start()
# Запуск обработчика событий из Kafka
asyncio.create_task(process_kafka_events())
@app.on_event("shutdown")
async def shutdown():
await event_store.close()
await projection.close()
await kafka_producer.stop()
await kafka_consumer.stop()
@app.post("/orders", status_code=201)
async def create_order(command: CreateOrderCommand):
"""Создание заказа с публикацией события"""
import uuid
order_id = str(uuid.uuid4())
# Создание агрегата
order = OrderAggregate.create(
order_id=order_id,
user_id=command.user_id,
items=command.items,
shipping_address=command.shipping_address
)
# Сохранение событий
await event_store.append_events(
aggregate_id=order_id,
aggregate_type="Order",
expected_version=-1,
events=order.get_pending_events()
)
# Публикация в Kafka
for event in order.get_pending_events():
await kafka_producer.send_and_wait(
topic="order-events",
key=order_id.encode(),
value={
"event_type": event.__class__.__name__,
"aggregate_id": order_id,
"data": event.__dict__
}
)
return {"order_id": order_id, "status": "pending"}
async def process_kafka_events():
"""Обработчик событий из Kafka для обновления read-модели"""
async for msg in kafka_consumer:
event = json.loads(msg.value)
# Обновление read-модели
await projection.apply_event(
event_type=event["event_type"],
event_data=event["data"]
)
# Диспетчеризация доменных событий
# (для триггеров в других сервисах)Event-Driven Architecture предоставляет мощные паттерны для создания масштабируемых, устойчивых и слабосвязанных систем:
Ключевые принципы успешной EDA:
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.