Разделение чтения и записи. События как источник истины. Проекции и материализованные представления.
CQRS разделяет чтение и запись. Event Sourcing хранит события как истину.
CQRS (Command Query Responsibility Segregation) — архитектурный паттерн разделения операций чтения (Query) и записи (Command) на разные модели.
# ❌ ПЛОХО: Одна модель для чтения и записи
class User:
def __init__(self, id, name, email):
self.id = id
self.name = name
self.email = email
# Методы записи
def update_email(self, new_email):
# Валидация, бизнес-логика
if not self._validate_email(new_email):
raise ValueError("Invalid email")
self.email = new_email
# Сохранение в БД
db.execute("UPDATE users SET email = ? WHERE id = ?",
(new_email, self.id))
# Методы чтения
def get_dashboard_data(self):
# Сложный запрос с JOIN и агрегацией
return db.execute("""
SELECT u.name, COUNT(o.id) as orders, SUM(o.total) as spent
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.id = ?
""", (self.id,))
# Проблема:
# 1. Модель раздувается методами
# 2. Оптимизация чтения вредит записи
# 3. Сложная бизнес-логика смешана с запросами# ✅ ХОРОШО: Разделение моделей
# Command модель (запись)
class UserCommand:
def __init__(self, id, name, email):
self.id = id
self.name = name
self.email = email
self._events = []
def update_email(self, new_email: str):
# Бизнес-логика
if not self._validate_email(new_email):
raise ValueError("Invalid email")
# Генерация события
self._events.append(UserEmailChanged(
user_id=self.id,
old_email=self.email,
new_email=new_email
))
self.email = new_email
def get_events(self):
events = self._events.copy()
self._events.clear()
return events
def _validate_email(self, email: str) -> bool:
return "@" in email and "." in email
# Query модель (чтение)
@dataclass
class UserDashboard:
user_id: UUID
name: str
order_count: int
total_spent: float
class UserDashboardQuery:
def __init__(self, connection):
self._conn = connection
def get_dashboard(self, user_id: UUID) -> UserDashboard:
# Оптимизированный запрос
cursor = self._conn.execute("""
SELECT u.name,
COUNT(o.id) as order_count,
COALESCE(SUM(o.total), 0) as total_spent
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.id = ?
GROUP BY u.id
""", (str(user_id),))
row = cursor.fetchone()
return UserDashboard(
user_id=user_id,
name=row[0],
order_count=row[1],
total_spent=row[2]
)
# Использование
# Запись через Command
user = UserCommand(uuid, "Alice", "alice@example.com")
user.update_email("new@example.com")
events = user.get_events()
# События сохраняются в Event Store
# Чтение через Query
query = UserDashboardQuery(conn)
dashboard = query.get_dashboard(uuid)
print(f"{dashboard.name}: {dashboard.order_count} orders, ${dashboard.total_spent}")Event Sourcing — паттерн, где состояние системы определяется как последовательность событий; текущее состояние вычисляется применением событий.
# ✅ Event Sourcing
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Any
from uuid import UUID, uuid4
@dataclass
class Event:
event_id: UUID
aggregate_id: UUID
event_type: str
data: dict
timestamp: float
class EventStore:
def __init__(self):
self._events: List[Event] = []
def append(self, events: List[Event]):
self._events.extend(events)
def get_events(self, aggregate_id: UUID) -> List[Event]:
return [e for e in self._events if e.aggregate_id == aggregate_id]
def get_all_events(self) -> List[Event]:
return self._events.copy()
class Aggregate(ABC):
def __init__(self, id: UUID = None):
self._id = id or uuid4()
self._events: List[Event] = []
@property
def id(self) -> UUID:
return self._id
@property
def events(self) -> List[Event]:
return self._events.copy()
def clear_events(self):
self._events.clear()
def _raise_event(self, event_type: str, data: dict):
event = Event(
event_id=uuid4(),
aggregate_id=self._id,
event_type=event_type,
data=data,
timestamp=time.time()
)
self._events.append(event)
self._apply(event)
@abstractmethod
def _apply(self, event: Event):
"""Применяет событие к состоянию"""
pass
@classmethod
@abstractmethod
def from_history(cls, events: List[Event]) -> "Aggregate":
"""Восстанавливает состояние из истории событий"""
pass
class BankAccount(Aggregate):
def __init__(self, id: UUID = None):
super().__init__(id)
self._balance = 0
self._owner = ""
@property
def balance(self) -> float:
return self._balance
@property
def owner(self) -> str:
return self._owner
def open(self, owner: str, initial_balance: float = 0):
if self._owner:
raise ValueError("Account already opened")
self._raise_event("AccountOpened", {
"owner": owner,
"initial_balance": initial_balance
})
def deposit(self, amount: float):
if amount <= 0:
raise ValueError("Amount must be positive")
self._raise_event("MoneyDeposited", {"amount": amount})
def withdraw(self, amount: float):
if amount <= 0:
raise ValueError("Amount must be positive")
if amount > self._balance:
raise ValueError("Insufficient funds")
self._raise_event("MoneyWithdrawn", {"amount": amount})
def _apply(self, event: Event):
if event.event_type == "AccountOpened":
self._owner = event.data["owner"]
self._balance = event.data["initial_balance"]
elif event.event_type == "MoneyDeposited":
self._balance += event.data["amount"]
elif event.event_type == "MoneyWithdrawn":
self._balance -= event.data["amount"]
@classmethod
def from_history(cls, events: List[Event]) -> "BankAccount":
account = cls()
for event in events:
account._apply(event)
return account
# Использование
event_store = EventStore()
# Создание счёта
account = BankAccount()
account.open("Alice", 100)
event_store.append(account.events)
account.clear_events()
# Операции
account.deposit(50)
account.withdraw(30)
event_store.append(account.events)
account.clear_events()
# Восстановление из истории
events = event_store.get_events(account.id)
restored = BankAccount.from_history(events)
print(f"Balance: ${restored.balance}") # $120# ✅ Order Event Sourcing
from enum import Enum
class OrderStatus(Enum):
CREATED = "created"
PAID = "paid"
SHIPPED = "shipped"
CANCELLED = "cancelled"
class Order(Aggregate):
def __init__(self, id: UUID = None):
super().__init__(id)
self._status = OrderStatus.CREATED
self._items = []
self._total = 0
@property
def status(self) -> OrderStatus:
return self._status
@property
def total(self) -> float:
return self._total
def create(self, items: List[dict]):
if self._status != OrderStatus.CREATED:
raise ValueError("Order already created")
total = sum(item["price"] * item["qty"] for item in items)
self._raise_event("OrderCreated", {
"items": items,
"total": total
})
def pay(self):
if self._status != OrderStatus.CREATED:
raise ValueError("Invalid status")
self._raise_event("OrderPaid", {})
def ship(self):
if self._status != OrderStatus.PAID:
raise ValueError("Cannot ship unpaid order")
self._raise_event("OrderShipped", {})
def cancel(self):
if self._status not in (OrderStatus.CREATED, OrderStatus.PAID):
raise ValueError("Cannot cancel")
self._raise_event("OrderCancelled", {})
def _apply(self, event: Event):
if event.event_type == "OrderCreated":
self._items = event.data["items"]
self._total = event.data["total"]
elif event.event_type == "OrderPaid":
self._status = OrderStatus.PAID
elif event.event_type == "OrderShipped":
self._status = OrderStatus.SHIPPED
elif event.event_type == "OrderCancelled":
self._status = OrderStatus.CANCELLED
@classmethod
def from_history(cls, events: List[Event]) -> "Order":
order = cls()
for event in events:
order._apply(event)
return order
# Проекции (Projections)
class OrderStatistics:
"""Проекция для статистики заказов"""
def __init__(self):
self.total_orders = 0
self.total_revenue = 0
self.orders_by_status = {}
def apply(self, event: Event):
if event.event_type == "OrderCreated":
self.total_orders += 1
elif event.event_type == "OrderPaid":
self.total_revenue += event.data.get("total", 0)
status = event.data.get("status", "unknown")
self.orders_by_status[status] = self.orders_by_status.get(status, 0) + 1
# Построение проекции из всех событий
def build_statistics(event_store: EventStore) -> OrderStatistics:
stats = OrderStatistics()
for event in event_store.get_all_events():
stats.apply(event)
return stats# ✅ CQRS с Event Sourcing
class CommandHandler:
def __init__(self, event_store: EventStore):
self._event_store = event_store
def create_order(self, order_id: UUID, items: List[dict]):
order = Order(order_id)
order.create(items)
self._event_store.append(order.events)
order.clear_events()
def pay_order(self, order_id: UUID):
events = self._event_store.get_events(order_id)
order = Order.from_history(events)
order.pay()
self._event_store.append(order.events)
order.clear_events()
class QueryHandler:
def __init__(self, event_store: EventStore, read_db):
self._event_store = event_store
self._read_db = read_db
def get_order(self, order_id: UUID) -> dict:
events = self._event_store.get_events(order_id)
order = Order.from_history(events)
return {
"id": str(order.id),
"status": order.status.value,
"total": order.total
}
def get_order_statistics(self) -> dict:
# Чтение из оптимизированной read-модели
return self._read_db.get_statistics()
# Использование
event_store = EventStore()
commands = CommandHandler(event_store)
queries = QueryHandler(event_store, read_db)
# Запись
order_id = uuid4()
commands.create_order(order_id, [{"price": 10, "qty": 2}])
commands.pay_order(order_id)
# Чтение
order = queries.get_order(order_id)
print(f"Order {order['id']}: ${order['total']}")# ✅ Snapshots для больших историй
class SnapshotStore:
def __init__(self):
self._snapshots: Dict[UUID, dict] = {}
def save(self, aggregate_id: UUID, state: dict, version: int):
self._snapshots[aggregate_id] = {
"state": state,
"version": version
}
def get(self, aggregate_id: UUID) -> Optional[dict]:
return self._snapshots.get(aggregate_id)
class AggregateWithSnapshot(Aggregate):
def __init__(self, id: UUID = None):
super().__init__(id)
self._version = 0
def _raise_event(self, event_type: str, data: dict):
super()._raise_event(event_type, data)
self._version += 1
def get_snapshot(self) -> dict:
"""Возвращает снимок состояния"""
return {
"version": self._version,
"state": self._to_state()
}
@abstractmethod
def _to_state(self) -> dict:
pass
@classmethod
def from_snapshot(cls, snapshot: dict, events: List[Event]) -> "AggregateWithSnapshot":
"""Восстанавливает из snapshot + события после"""
instance = cls._from_state(snapshot["state"])
instance._version = snapshot["version"]
# Применяем только события после snapshot
for event in events:
if event.data.get("version", 0) > snapshot["version"]:
instance._apply(event)
return instance
# Использование каждые N событий
SNAPSHOT_FREQUENCY = 100
if aggregate._version % SNAPSHOT_FREQUENCY == 0:
snapshot_store.save(aggregate.id, aggregate.get_snapshot(), aggregate._version)| Паттерн | Когда использовать | Pythonic-реализация |
|---|---|---|
| CQRS | Разная оптимизация чтения/записи, сложные дашборды | Command/Query модели |
| Event Sourcing | Аудит, временные запросы, отмена операций | Aggregate + Events |
| Projections | Быстрое чтение из событий | Обработчики событий |
| Snapshots | Большие истории событий | Периодические снимки |
Главный принцип: CQRS разделяет чтение/запись, Event Sourcing хранит события вместо состояния.
Изучите тему Asyncio паттерны для асинхронного программирования.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.