Layered Architecture, Hexagonal, Clean Architecture, Event-Driven — когда и зачем применять каждый паттерн.
Правильная архитектура — это когда код легко менять, а система растёт без боли
Представьте: вы начинаете проект с одного файла main.py. Через месяц — 10 файлов. Через год — 10 000 строк кода, которые никто не понимает. Добавление новой фичи ломает три старые. Знакомо?
Архитектурные паттерны — это проверенные решения типовых проблем организации кода. Они не серебряная пуля, но дают:
В этом туториале разберём четыре ключевых паттерна:
Layered Architecture делит систему на горизонтальные слои, где каждый слой зависит только от слоя ниже.
┌─────────────────────────────────┐
│ Presentation Layer │ ← HTTP, CLI, GUI
│ (Controllers, Views) │
├─────────────────────────────────┤
│ Business Logic Layer │ ← Правила, валидация
│ (Services, Domain) │
├─────────────────────────────────┤
│ Data Access Layer │ ← БД, кэш, файлы
│ (Repositories, DAO) │
├─────────────────────────────────┤
│ Database Layer │ ← PostgreSQL, Redis
└─────────────────────────────────┘
Правило: вызовы идут только сверху вниз. Presentation → Business → Data → Database.
Классическая веб-архитектура:
┌──────────────┐
│ Client │ ← Браузер, мобильное приложение
├──────────────┤
│ Server │ ← Бизнес-логика, API
├──────────────┤
│ Database │ ← Хранение данных
└──────────────┘
Добавляет слой балансировки нагрузки:
┌──────────────┐
│ Client │
├──────────────┤
│ Load │ ← Nginx, HAProxy
│ Balancer │
├──────────────┤
│ Application│ ← Несколько инстансов
│ Servers │
├──────────────┤
│ Database │
└──────────────┘
# data_access_layer.py
# Слой доступа к данным — только CRUD
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker, declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String)
email = Column(String, unique=True)
class UserRepository:
"""Data Access Layer — инкапсулирует работу с БД"""
def __init__(self, session_factory):
self._session_factory = session_factory
def get_by_id(self, user_id: int) -> User | None:
with self._session_factory() as session:
return session.get(User, user_id)
def get_by_email(self, email: str) -> User | None:
with self._session_factory() as session:
return session.query(User).filter(User.email == email).first()
def create(self, name: str, email: str) -> User:
with self._session_factory() as session:
user = User(name=name, email=email)
session.add(user)
session.commit()
session.refresh(user)
return user
def delete(self, user_id: int) -> bool:
with self._session_factory() as session:
user = session.get(User, user_id)
if user:
session.delete(user)
session.commit()
return True
return False# business_logic_layer.py
# Бизнес-логика — правила, валидация, оркестрация
from dataclasses import dataclass
from typing import Optional
from data_access_layer import UserRepository, User
@dataclass
class CreateUserCommand:
name: str
email: str
@dataclass
class UserDTO:
id: int
name: str
email: str
class UserService:
"""Business Logic Layer — содержит правила предметной области"""
def __init__(self, user_repo: UserRepository):
self._user_repo = user_repo
def create_user(self, command: CreateUserCommand) -> UserDTO:
# Валидация — часть бизнес-логики
if not command.name.strip():
raise ValueError("Name cannot be empty")
if "@" not in command.email:
raise ValueError("Invalid email format")
# Проверка на дубликат
existing = self._user_repo.get_by_email(command.email)
if existing:
raise ValueError(f"Email {command.email} already exists")
# Создание через репозиторий
user = self._user_repo.create(command.name, command.email)
return UserDTO(id=user.id, name=user.name, email=user.email)
def get_user(self, user_id: int) -> UserDTO:
user = self._user_repo.get_by_id(user_id)
if not user:
raise ValueError(f"User {user_id} not found")
return UserDTO(id=user.id, name=user.name, email=user.email)
def delete_user(self, user_id: int) -> bool:
# Бизнес-правило: нельзя удалить последнего пользователя
# (упрощённо — проверяем existence)
return self._user_repo.delete(user_id)# presentation_layer.py
# Presentation Layer — HTTP endpoints
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel
from business_logic_layer import UserService, CreateUserCommand, UserDTO
app = FastAPI()
class CreateUserRequest(BaseModel):
name: str
email: str
class UserResponse(BaseModel):
id: int
name: str
email: str
class Config:
from_attributes = True
# Dependency injection для сервиса
def get_user_service() -> UserService:
# В реальном приложении — из контейнера зависимостей
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from data_access_layer import UserRepository, Base
engine = create_engine("sqlite:///./test.db")
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine)
user_repo = UserRepository(SessionLocal)
return UserService(user_repo)
@app.post("/users", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(request: CreateUserRequest, service: UserService = None):
if service is None:
service = get_user_service()
try:
command = CreateUserCommand(name=request.name, email=request.email)
user = service.create_user(command)
return user
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/users/{user_id}", response_model=UserResponse)
def get_user(user_id: int, service: UserService = None):
if service is None:
service = get_user_service()
try:
return service.get_user(user_id)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(user_id: int, service: UserService = None):
if service is None:
service = get_user_service()
if not service.delete_user(user_id):
raise HTTPException(status_code=404, detail="User not found")| Ситуация | Подходит? |
|---|---|
| Типичное CRUD-приложение | ✅ Да |
| Веб-сервис с REST API | ✅ Да |
| Быстрый старт проекта | ✅ Да |
| Высокие требования к тестируемости бизнес-логики | ⚠️ Частично |
| Микросервисы с независимым деплоем | ❌ Нет |
| Event-driven система | ❌ Нет |
Плюсы:
Минусы:
Hexagonal Architecture (также известная как Ports & Adapters) ставит бизнес-логику в центр, а внешние зависимости (БД, HTTP, CLI) — на периферию.
┌─────────────────┐
│ HTTP Adapter │
│ (Controller) │
└────────┬────────┘
│
┌──────────────┐ ┌────────▼────────┐ ┌──────────────┐
│ DB Adapter │◄───│ INPUT PORT │───►│ CLI Adapter │
│ (Repository)│ │ │ │ │
└──────────────┘ │ Business Logic │ └──────────────┘
│ (Domain) │
┌──────────────┐ │ │ ┌──────────────┐
│ Email │◄───│ OUTPUT PORT │───►│ Message │
│ Adapter │ │ (Interface) │ │ Queue │
└──────────────┘ └─────────────────┘ └──────────────┘
Ключевые понятия:
# domain/models.py
# Доменная модель — чистая бизнес-логика, без зависимостей
from dataclasses import dataclass
from typing import Optional
from uuid import UUID, uuid4
@dataclass
class User:
"""Доменная сущность — не знает о БД или HTTP"""
id: UUID
name: str
email: str
is_active: bool = True
@classmethod
def create(cls, name: str, email: str) -> "User":
"""Factory method для создания нового пользователя"""
return cls(id=uuid4(), name=name, email=email)
def deactivate(self):
"""Бизнес-правило: деактивация пользователя"""
if not self.is_active:
raise ValueError("User already inactive")
self.is_active = False
@dataclass
class Order:
"""Другая доменная сущность"""
id: UUID
user_id: UUID
total_amount: float
status: str = "pending"
def complete(self):
if self.status != "pending":
raise ValueError(f"Cannot complete order in status {self.status}")
self.status = "completed"# domain/ports.py
# Порты — интерфейсы, которые определяет домен
from abc import ABC, abstractmethod
from typing import Optional
from uuid import UUID
from domain.models import User, Order
class UserRepositoryPort(ABC):
"""Output Port для работы с пользователями"""
@abstractmethod
def get_by_id(self, user_id: UUID) -> Optional[User]:
pass
@abstractmethod
def get_by_email(self, email: str) -> Optional[User]:
pass
@abstractmethod
def save(self, user: User) -> None:
pass
@abstractmethod
def delete(self, user_id: UUID) -> bool:
pass
class OrderRepositoryPort(ABC):
"""Output Port для работы с заказами"""
@abstractmethod
def get_by_id(self, order_id: UUID) -> Optional[Order]:
pass
@abstractmethod
def save(self, order: Order) -> None:
pass
class EmailSenderPort(ABC):
"""Output Port для отправки уведомлений"""
@abstractmethod
def send_welcome(self, user: User) -> None:
pass
@abstractmethod
def send_order_confirmation(self, user: User, order: Order) -> None:
pass# domain/services.py
# Доменные сервисы — бизнес-логика, зависит только от портов
from domain.models import User, Order
from domain.ports import UserRepositoryPort, OrderRepositoryPort, EmailSenderPort
from uuid import UUID
class UserService:
"""
Input Port — внешний мир вызывает методы этого сервиса.
Зависит от абстракций (портов), не от реализаций.
"""
def __init__(
self,
user_repo: UserRepositoryPort,
email_sender: EmailSenderPort,
):
self._user_repo = user_repo
self._email_sender = email_sender
def register_user(self, name: str, email: str) -> User:
# Бизнес-валидация
existing = self._user_repo.get_by_email(email)
if existing:
raise ValueError(f"Email {email} already registered")
# Создание через factory method доменной модели
user = User.create(name=name, email=email)
# Сохранение через порт
self._user_repo.save(user)
# Отправка уведомления через порт
self._email_sender.send_welcome(user)
return user
def deactivate_user(self, user_id: UUID) -> None:
user = self._user_repo.get_by_id(user_id)
if not user:
raise ValueError(f"User {user_id} not found")
user.deactivate() # Вызов бизнес-метода доменной модели
self._user_repo.save(user)
class OrderService:
def __init__(
self,
order_repo: OrderRepositoryPort,
user_repo: UserRepositoryPort,
email_sender: EmailSenderPort,
):
self._order_repo = order_repo
self._user_repo = user_repo
self._email_sender = email_sender
def create_order(self, user_id: UUID, total_amount: float) -> Order:
# Проверка существования пользователя
user = self._user_repo.get_by_id(user_id)
if not user:
raise ValueError(f"User {user_id} not found")
if not user.is_active:
raise ValueError(f"User {user_id} is deactivated")
order = Order(id=UUID(int=0), user_id=user_id, total_amount=total_amount)
self._order_repo.save(order)
self._email_sender.send_order_confirmation(user, order)
return order
def complete_order(self, order_id: UUID) -> None:
order = self._order_repo.get_by_id(order_id)
if not order:
raise ValueError(f"Order {order_id} not found")
order.complete()
self._order_repo.save(order)# adapters/repositories.py
# Адаптеры — реализации портов для конкретных технологий
from typing import Optional
from uuid import UUID
from domain.models import User
from domain.ports import UserRepositoryPort
class SQLAlchemyUserRepository(UserRepositoryPort):
"""Адаптер для SQLAlchemy"""
def __init__(self, session_factory):
self._session_factory = session_factory
def get_by_id(self, user_id: UUID) -> Optional[User]:
with self._session_factory() as session:
# Конвертация из ORM в доменную модель
orm_user = session.get(UserORM, user_id)
if not orm_user:
return None
return User(
id=orm_user.id,
name=orm_user.name,
email=orm_user.email,
is_active=orm_user.is_active,
)
def get_by_email(self, email: str) -> Optional[User]:
with self._session_factory() as session:
orm_user = session.query(UserORM).filter(UserORM.email == email).first()
if not orm_user:
return None
return User(
id=orm_user.id,
name=orm_user.name,
email=orm_user.email,
is_active=orm_user.is_active,
)
def save(self, user: User) -> None:
with self._session_factory() as session:
orm_user = UserORM(
id=user.id,
name=user.name,
email=user.email,
is_active=user.is_active,
)
session.merge(orm_user)
session.commit()
def delete(self, user_id: UUID) -> bool:
with self._session_factory() as session:
orm_user = session.get(UserORM, user_id)
if not orm_user:
return False
session.delete(orm_user)
session.commit()
return True
class InMemoryUserRepository(UserRepositoryPort):
"""Адаптер для тестов — in-memory хранилище"""
def __init__(self):
self._users: dict[UUID, User] = {}
def get_by_id(self, user_id: UUID) -> Optional[User]:
return self._users.get(user_id)
def get_by_email(self, email: str) -> Optional[User]:
for user in self._users.values():
if user.email == email:
return user
return None
def save(self, user: User) -> None:
self._users[user.id] = user
def delete(self, user_id: UUID) -> bool:
return self._users.pop(user_id, None) is not None# adapters/email.py
# Адаптеры для внешних сервисов
from domain.models import User, Order
from domain.ports import EmailSenderPort
class SMTPEmailSender(EmailSenderPort):
"""Реальный SMTP-отправитель"""
def __init__(self, smtp_host: str, smtp_port: int):
self._smtp_host = smtp_host
self._smtp_port = smtp_port
def send_welcome(self, user: User) -> None:
# Реальная отправка через SMTP
print(f"Sending welcome email to {user.email} via {self._smtp_host}")
def send_order_confirmation(self, user: User, order: Order) -> None:
print(f"Sending order confirmation to {user.email}: ${order.total_amount}")
class ConsoleEmailSender(EmailSenderPort):
"""Адаптер для разработки/тестов — пишет в консоль"""
def send_welcome(self, user: User) -> None:
print(f"[EMAIL] Welcome, {user.name}!")
def send_order_confirmation(self, user: User, order: Order) -> None:
print(f"[EMAIL] Order ${order.total_amount} confirmed for {user.name}")
class NullEmailSender(EmailSenderPort):
"""Null Object pattern — для тестов, где email не нужен"""
def send_welcome(self, user: User) -> None:
pass
def send_order_confirmation(self, user: User, order: Order) -> None:
pass# adapters/controllers.py
# HTTP-адаптер — входной порт
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel, EmailStr
from uuid import UUID
from domain.services import UserService, OrderService
from domain.models import User, Order
app = FastAPI()
class CreateUserRequest(BaseModel):
name: str
email: EmailStr
class UserResponse(BaseModel):
id: UUID
name: str
email: str
is_active: bool
class Config:
from_attributes = True
class CreateOrderRequest(BaseModel):
user_id: UUID
total_amount: float
# Адаптер преобразует HTTP-запрос в вызов доменного сервиса
@app.post("/users", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(request: CreateUserRequest, user_service: UserService):
try:
user = user_service.register_user(name=request.name, email=request.email)
return user
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/users/{user_id}", response_model=UserResponse)
def get_user(user_id: UUID, user_service: UserService):
user = user_service._user_repo.get_by_id(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@app.post("/orders", status_code=status.HTTP_201_CREATED)
def create_order(request: CreateOrderRequest, order_service: OrderService):
try:
order = order_service.create_order(
user_id=request.user_id,
total_amount=request.total_amount,
)
return {"id": order.id, "status": order.status}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))# main.py
# Сборка приложения — wiring
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from adapters.repositories import SQLAlchemyUserRepository, InMemoryUserRepository
from adapters.email import SMTPEmailSender, ConsoleEmailSender
from domain.services import UserService
# Для production
engine = create_engine("postgresql://user:pass@localhost/dbname")
SessionLocal = sessionmaker(bind=engine)
user_repo = SQLAlchemyUserRepository(SessionLocal)
email_sender = SMTPEmailSender("smtp.example.com", 587)
user_service = UserService(user_repo, email_sender)
# Для тестов
# user_repo = InMemoryUserRepository()
# email_sender = NullEmailSender()
# user_service = UserService(user_repo, email_sender)| Ситуация | Подходит? |
|---|---|
| Требуется высокая тестируемость | ✅ Да |
| Частая смена внешних зависимостей (БД, API) | ✅ Да |
| Сложная бизнес-логика | ✅ Да |
| Простое CRUD-приложение | ⚠️ Overkill |
| Быстрый прототип / MVP | ⚠️ Может замедлить |
| Микросервис с чёткими границами | ✅ Да |
Плюсы:
Минусы:
Clean Architecture (Роберт Мартин) развивает идеи Hexagonal, добавляя строгие правила зависимостей и разделение на слои внутри ядра.
┌─────────────────────────────────────────────────────────────┐
│ Frameworks & Drivers │
│ (FastAPI, SQLAlchemy, Django, HTTP) │
├─────────────────────────────────────────────────────────────┤
│ Interface Adapters │
│ (Controllers, Presenters, Gateways, Repositories) │
├─────────────────────────────────────────────────────────────┤
│ Use Cases │
│ (Application-specific business rules, оркестрация) │
├─────────────────────────────────────────────────────────────┤
│ Enterprise Business Rules │
│ (Domain Models, Entities, чистая бизнес-логика) │
└─────────────────────────────────────────────────────────────┘
↓ Зависимости направлены внутрь ↓
Правило зависимостей: код может зависеть только от слоёв внутри. Внешние слои зависят от внутренних через интерфейсы.
# entities/user.py
# Enterprise Business Rules — самый внутренний слой
from dataclasses import dataclass
from uuid import UUID, uuid4
from typing import Optional
@dataclass
class User:
"""
Entity — бизнес-объект с идентичностью.
Не зависит НИ от чего внешнего.
"""
id: UUID
name: str
email: str
is_active: bool = True
@classmethod
def create(cls, name: str, email: str) -> "User":
return cls(id=uuid4(), name=name, email=email)
def deactivate(self) -> None:
"""Бизнес-правило: деактивация"""
if not self.is_active:
raise ValueError("User already inactive")
self.is_active = False
def update_email(self, new_email: str) -> None:
"""Бизнес-правило: смена email"""
if "@" not in new_email:
raise ValueError("Invalid email")
self.email = new_email# entities/order.py
from dataclasses import dataclass
from uuid import UUID
from enum import Enum
class OrderStatus(str, Enum):
PENDING = "pending"
PAID = "paid"
SHIPPED = "shipped"
CANCELLED = "cancelled"
@dataclass
class Order:
"""Order Entity"""
id: UUID
user_id: UUID
items: list[dict]
total_amount: float
status: OrderStatus = OrderStatus.PENDING
def can_cancel(self) -> bool:
"""Бизнес-правило: отмена заказа"""
return self.status == OrderStatus.PENDING
def cancel(self) -> None:
if not self.can_cancel():
raise ValueError(f"Cannot cancel order in status {self.status}")
self.status = OrderStatus.CANCELLED
def mark_as_paid(self) -> None:
if self.status != OrderStatus.PENDING:
raise ValueError(f"Cannot pay order in status {self.status}")
self.status = OrderStatus.PAID# use_cases/user_use_cases.py
# Use Cases — application business rules
from abc import ABC, abstractmethod
from typing import Optional
from uuid import UUID
from entities.user import User
class UserRepositoryInterface(ABC):
"""Interface для репозитория — определяется в слое Use Cases"""
@abstractmethod
def get_by_id(self, user_id: UUID) -> Optional[User]:
pass
@abstractmethod
def get_by_email(self, email: str) -> Optional[User]:
pass
@abstractmethod
def save(self, user: User) -> None:
pass
class UserNotifierInterface(ABC):
"""Interface для уведомлений"""
@abstractmethod
def send_welcome(self, user: User) -> None:
pass
class RegisterUserUseCase:
"""
Use Case — описывает конкретное действие пользователя в системе.
Зависит только от интерфейсов, определённых здесь же.
"""
def __init__(
self,
user_repo: UserRepositoryInterface,
notifier: UserNotifierInterface,
):
self._user_repo = user_repo
self._notifier = notifier
def execute(self, name: str, email: str) -> User:
# Проверка на дубликат
existing = self._user_repo.get_by_email(email)
if existing:
raise ValueError(f"Email {email} already registered")
# Создание entity через factory
user = User.create(name=name, email=email)
# Сохранение
self._user_repo.save(user)
# Уведомление
self._notifier.send_welcome(user)
return user
class DeactivateUserUseCase:
def __init__(self, user_repo: UserRepositoryInterface):
self._user_repo = user_repo
def execute(self, user_id: UUID) -> None:
user = self._user_repo.get_by_id(user_id)
if not user:
raise ValueError(f"User {user_id} not found")
user.deactivate()
self._user_repo.save(user)
class GetUserUseCase:
def __init__(self, user_repo: UserRepositoryInterface):
self._user_repo = user_repo
def execute(self, user_id: UUID) -> User:
user = self._user_repo.get_by_id(user_id)
if not user:
raise ValueError(f"User {user_id} not found")
return user# use_cases/order_use_cases.py
from abc import ABC, abstractmethod
from typing import Optional
from uuid import UUID
from entities.order import Order, OrderStatus
from entities.user import User
class OrderRepositoryInterface(ABC):
@abstractmethod
def get_by_id(self, order_id: UUID) -> Optional[Order]:
pass
@abstractmethod
def save(self, order: Order) -> None:
pass
class UserRepositoryInterface(ABC):
@abstractmethod
def get_by_id(self, user_id: UUID) -> Optional[User]:
pass
class CreateOrderUseCase:
def __init__(
self,
order_repo: OrderRepositoryInterface,
user_repo: UserRepositoryInterface,
):
self._order_repo = order_repo
self._user_repo = user_repo
def execute(self, user_id: UUID, items: list[dict]) -> Order:
# Проверка пользователя
user = self._user_repo.get_by_id(user_id)
if not user:
raise ValueError(f"User {user_id} not found")
if not user.is_active:
raise ValueError(f"User {user_id} is deactivated")
# Расчёт суммы
total_amount = sum(item["price"] * item["quantity"] for item in items)
order = Order(
id=UUID(int=0),
user_id=user_id,
items=items,
total_amount=total_amount,
)
self._order_repo.save(order)
return order
class CancelOrderUseCase:
def __init__(self, order_repo: OrderRepositoryInterface):
self._order_repo = order_repo
def execute(self, order_id: UUID) -> None:
order = self._order_repo.get_by_id(order_id)
if not order:
raise ValueError(f"Order {order_id} not found")
order.cancel()
self._order_repo.save(order)# adapters/repositories.py
# Interface Adapters — реализация интерфейсов
from typing import Optional
from uuid import UUID
from sqlalchemy.orm import sessionmaker, declarative_base
from entities.user import User
from use_cases.user_use_cases import UserRepositoryInterface
Base = declarative_base()
class UserORM(Base):
__tablename__ = "users"
id = Column(UUID, primary_key=True)
name = Column(String)
email = Column(String, unique=True)
is_active = Column(Boolean, default=True)
class SQLAlchemyUserRepository(UserRepositoryInterface):
"""Адаптер SQLAlchemy"""
def __init__(self, session_factory: sessionmaker):
self._session_factory = session_factory
def get_by_id(self, user_id: UUID) -> Optional[User]:
with self._session_factory() as session:
orm = session.get(UserORM, user_id)
if not orm:
return None
return User(id=orm.id, name=orm.name, email=orm.email, is_active=orm.is_active)
def get_by_email(self, email: str) -> Optional[User]:
with self._session_factory() as session:
orm = session.query(UserORM).filter(UserORM.email == email).first()
if not orm:
return None
return User(id=orm.id, name=orm.name, email=orm.email, is_active=orm.is_active)
def save(self, user: User) -> None:
with self._session_factory() as session:
orm = UserORM(id=user.id, name=user.name, email=user.email, is_active=user.is_active)
session.merge(orm)
session.commit()
class InMemoryUserRepository(UserRepositoryInterface):
"""Адаптер для тестов"""
def __init__(self):
self._storage: dict[UUID, User] = {}
def get_by_id(self, user_id: UUID) -> Optional[User]:
return self._storage.get(user_id)
def get_by_email(self, email: str) -> Optional[User]:
return next((u for u in self._storage.values() if u.email == email), None)
def save(self, user: User) -> None:
self._storage[user.id] = user# adapters/controllers.py
# Interface Adapters — HTTP контроллеры
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel
from uuid import UUID
from use_cases.user_use_cases import RegisterUserUseCase, DeactivateUserUseCase, GetUserUseCase
from use_cases.order_use_cases import CreateOrderUseCase, CancelOrderUseCase
app = FastAPI()
class RegisterUserRequest(BaseModel):
name: str
email: str
class UserResponse(BaseModel):
id: UUID
name: str
email: str
is_active: bool
class Config:
from_attributes = True
class CreateOrderRequest(BaseModel):
user_id: UUID
items: list[dict]
# Контроллер использует Use Case
@app.post("/users", response_model=UserResponse, status_code=201)
def register_user(request: RegisterUserRequest, use_case: RegisterUserUseCase):
try:
user = use_case.execute(name=request.name, email=request.email)
return user
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/users/{user_id}", response_model=UserResponse)
def get_user(user_id: UUID, use_case: GetUserUseCase):
try:
return use_case.execute(user_id)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
@app.delete("/users/{user_id}", status_code=204)
def deactivate_user(user_id: UUID, use_case: DeactivateUserUseCase):
try:
use_case.execute(user_id)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
@app.post("/orders", status_code=201)
def create_order(request: CreateOrderRequest, use_case: CreateOrderUseCase):
try:
order = use_case.execute(user_id=request.user_id, items=request.items)
return {"id": order.id, "status": order.status, "total": order.total_amount}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))# adapters/notifiers.py
from entities.user import User
from use_cases.user_use_cases import UserNotifierInterface
class SMTPEmailNotifier(UserNotifierInterface):
def __init__(self, smtp_config: dict):
self._smtp_config = smtp_config
def send_welcome(self, user: User) -> None:
# Реальная отправка SMTP
print(f"Sending welcome to {user.email}")
class ConsoleNotifier(UserNotifierInterface):
def send_welcome(self, user: User) -> None:
print(f"[NOTIFIER] Welcome, {user.name}!")
class NullNotifier(UserNotifierInterface):
def send_welcome(self, user: User) -> None:
pass# dependency_injection.py
# Сборка приложения — wiring
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from adapters.repositories import SQLAlchemyUserRepository, InMemoryUserRepository
from adapters.notifiers import SMTPEmailNotifier, ConsoleNotifier, NullNotifier
from use_cases.user_use_cases import RegisterUserUseCase, DeactivateUserUseCase, GetUserUseCase
from use_cases.order_use_cases import CreateOrderUseCase, CancelOrderUseCase
def create_production_dependencies():
"""Factory для production зависимостей"""
engine = create_engine("postgresql://user:pass@localhost/dbname")
SessionLocal = sessionmaker(bind=engine)
user_repo = SQLAlchemyUserRepository(SessionLocal)
notifier = SMTPEmailNotifier({"host": "smtp.example.com", "port": 587})
return {
"register_user": RegisterUserUseCase(user_repo, notifier),
"deactivate_user": DeactivateUserUseCase(user_repo),
"get_user": GetUserUseCase(user_repo),
}
def create_test_dependencies():
"""Factory для тестовых зависимостей"""
user_repo = InMemoryUserRepository()
notifier = NullNotifier()
return {
"register_user": RegisterUserUseCase(user_repo, notifier),
"deactivate_user": DeactivateUserUseCase(user_repo),
"get_user": GetUserUseCase(user_repo),
}| Ситуация | Подходит? |
|---|---|
| Крупный проект с долгой поддержкой | ✅ Да |
| Сложная бизнес-логика | ✅ Да |
| Команда 5+ разработчиков | ✅ Да |
| Требуется максимальная тестируемость | ✅ Да |
| Быстрый стартап / MVP | ⚠️ Overkill |
| Простое CRUD-приложение | ❌ Нет |
| Проект < 6 месяцев жизни | ⚠️ Возможно излишне |
Плюсы:
Минусы:
Event-Driven Architecture (EDA) строится вокруг событий — фактов, которые произошли в системе. Компоненты общаются асинхронно через события, не зная друг о друге.
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Service A │─────►│ Event Bus │─────►│ Service B │
│ (Publisher) │ │ (Kafka, │ │ (Subscriber)│
└──────────────┘ │ RabbitMQ) │ └──────────────┘
└──────────────┘
│
┌──────▼──────┐
│ Service C │
│ (Subscriber)│
└──────────────┘
Ключевые понятия:
# events/domain_events.py
# Определение событий
from dataclasses import dataclass, field
from datetime import datetime
from uuid import UUID, uuid4
from enum import Enum
class EventType(str, Enum):
USER_REGISTERED = "user.registered"
USER_DEACTIVATED = "user.deactivated"
ORDER_CREATED = "order.created"
ORDER_PAID = "order.paid"
ORDER_CANCELLED = "order.cancelled"
PAYMENT_FAILED = "payment.failed"
@dataclass
class DomainEvent:
"""Базовый класс события"""
event_id: UUID = field(default_factory=uuid4)
event_type: str = ""
timestamp: datetime = field(default_factory=datetime.utcnow)
aggregate_id: UUID = None # ID сущности, к которой относится событие
@dataclass
class UserRegisteredEvent(DomainEvent):
event_type: str = EventType.USER_REGISTERED
user_id: UUID = None
email: str = ""
name: str = ""
@dataclass
class UserDeactivatedEvent(DomainEvent):
event_type: str = EventType.USER_DEACTIVATED
user_id: UUID = None
@dataclass
class OrderCreatedEvent(DomainEvent):
event_type: str = EventType.ORDER_CREATED
order_id: UUID = None
user_id: UUID = None
total_amount: float = 0.0
@dataclass
class OrderPaidEvent(DomainEvent):
event_type: str = EventType.ORDER_PAID
order_id: UUID = None
payment_id: str = ""
@dataclass
class OrderCancelledEvent(DomainEvent):
event_type: str = EventType.ORDER_CANCELLED
order_id: UUID = None
reason: str = ""# events/event_bus.py
# Шина событий — абстракция
from abc import ABC, abstractmethod
from typing import Callable, Awaitable
from events.domain_events import DomainEvent
class EventBusInterface(ABC):
"""Интерфейс шины событий"""
@abstractmethod
async def publish(self, event: DomainEvent) -> None:
"""Опубликовать событие"""
pass
@abstractmethod
def subscribe(
self,
event_type: str,
handler: Callable[[DomainEvent], Awaitable[None]],
) -> None:
"""Подписаться на тип событий"""
pass
class InMemoryEventBus(EventBusInterface):
"""In-memory реализация для разработки и тестов"""
def __init__(self):
self._subscribers: dict[str, list[Callable]] = {}
self._published_events: list[DomainEvent] = []
async def publish(self, event: DomainEvent) -> None:
self._published_events.append(event)
# Асинхронный вызов всех подписчиков
handlers = self._subscribers.get(event.event_type, [])
for handler in handlers:
await handler(event)
def subscribe(
self,
event_type: str,
handler: Callable[[DomainEvent], Awaitable[None]],
) -> None:
if event_type not in self._subscribers:
self._subscribers[event_type] = []
self._subscribers[event_type].append(handler)
def get_published_events(self) -> list[DomainEvent]:
return self._published_events.copy()# services/user_service.py
# Сервис с публикацией событий
from uuid import UUID
from events.domain_events import UserRegisteredEvent, UserDeactivatedEvent
from events.event_bus import EventBusInterface
class UserService:
def __init__(self, user_repo, event_bus: EventBusInterface):
self._user_repo = user_repo
self._event_bus = event_bus
async def register_user(self, name: str, email: str) -> UUID:
# Проверка на дубликат
existing = self._user_repo.get_by_email(email)
if existing:
raise ValueError(f"Email {email} already registered")
# Создание пользователя
user = self._user_repo.create(name=name, email=email)
# Публикация события
event = UserRegisteredEvent(
user_id=user.id,
email=user.email,
name=user.name,
)
await self._event_bus.publish(event)
return user.id
async def deactivate_user(self, user_id: UUID) -> None:
user = self._user_repo.get_by_id(user_id)
if not user:
raise ValueError(f"User {user_id} not found")
user.deactivate()
self._user_repo.save(user)
# Публикация события
event = UserDeactivatedEvent(user_id=user_id)
await self._event_bus.publish(event)# services/email_service.py
# Подписчик на события — отправка email
from events.domain_events import UserRegisteredEvent, DomainEvent
from events.event_bus import EventBusInterface
class EmailService:
"""Сервис, который подписывается на события"""
def __init__(self, event_bus: EventBusInterface, smtp_client):
self._smtp_client = smtp_client
# Подписка на события
event_bus.subscribe("user.registered", self._on_user_registered)
async def _on_user_registered(self, event: DomainEvent) -> None:
"""Обработчик события регистрации пользователя"""
if not isinstance(event, UserRegisteredEvent):
return
# Отправка welcome email
await self._smtp_client.send(
to=event.email,
subject="Welcome!",
body=f"Hello, {event.name}! Welcome to our platform.",
)
print(f"Welcome email sent to {event.email}")# services/analytics_service.py
# Другой подписчик — аналитика
from events.domain_events import UserRegisteredEvent, OrderCreatedEvent, DomainEvent
from events.event_bus import EventBusInterface
class AnalyticsService:
"""Сервис аналитики — подписывается на разные события"""
def __init__(self, event_bus: EventBusInterface, analytics_db):
self._analytics_db = analytics_db
# Подписка на несколько типов событий
event_bus.subscribe("user.registered", self._on_user_registered)
event_bus.subscribe("order.created", self._on_order_created)
async def _on_user_registered(self, event: DomainEvent) -> None:
if not isinstance(event, UserRegisteredEvent):
return
# Логирование в аналитику
await self._analytics_db.record_event(
event_type="user_signup",
user_id=event.user_id,
timestamp=event.timestamp,
)
async def _on_order_created(self, event: DomainEvent) -> None:
if not isinstance(event, OrderCreatedEvent):
return
# Логирование заказа
await self._analytics_db.record_event(
event_type="order_created",
order_id=event.order_id,
amount=event.total_amount,
timestamp=event.timestamp,
)# services/order_service.py
# Сервис заказов с событиями
from uuid import UUID
from events.domain_events import OrderCreatedEvent, OrderPaidEvent, OrderCancelledEvent
from events.event_bus import EventBusInterface
class OrderService:
def __init__(
self,
order_repo,
user_repo,
event_bus: EventBusInterface,
):
self._order_repo = order_repo
self._user_repo = user_repo
self._event_bus = event_bus
async def create_order(self, user_id: UUID, items: list[dict]) -> UUID:
# Проверка пользователя
user = self._user_repo.get_by_id(user_id)
if not user or not user.is_active:
raise ValueError(f"Invalid user {user_id}")
total = sum(item["price"] * item["quantity"] for item in items)
order = self._order_repo.create(user_id=user_id, items=items, total=total)
# Публикация события
event = OrderCreatedEvent(
order_id=order.id,
user_id=user_id,
total_amount=total,
)
await self._event_bus.publish(event)
return order.id
async def mark_order_paid(self, order_id: UUID, payment_id: str) -> None:
order = self._order_repo.get_by_id(order_id)
if not order:
raise ValueError(f"Order {order_id} not found")
order.mark_as_paid()
self._order_repo.save(order)
# Публикация события оплаты
event = OrderPaidEvent(order_id=order_id, payment_id=payment_id)
await self._event_bus.publish(event)
async def cancel_order(self, order_id: UUID, reason: str) -> None:
order = self._order_repo.get_by_id(order_id)
if not order:
raise ValueError(f"Order {order_id} not found")
order.cancel()
self._order_repo.save(order)
# Публикация события отмены
event = OrderCancelledEvent(order_id=order_id, reason=reason)
await self._event_bus.publish(event)# services/payment_service.py
# Сервис оплаты с обработкой событий
from events.domain_events import OrderCreatedEvent, DomainEvent
from events.event_bus import EventBusInterface
class PaymentService:
"""Сервис оплаты — автоматически обрабатывает создание заказов"""
def __init__(self, order_service, event_bus: EventBusInterface, payment_gateway):
self._order_service = order_service
self._payment_gateway = payment_gateway
# Подписка на создание заказов
event_bus.subscribe("order.created", self._on_order_created)
async def _on_order_created(self, event: DomainEvent) -> None:
"""Автоматическая обработка оплаты при создании заказа"""
from events.domain_events import OrderCreatedEvent
if not isinstance(event, OrderCreatedEvent):
return
# Инициация оплаты через payment gateway
payment_result = await self._payment_gateway.charge(
amount=event.total_amount,
order_id=event.order_id,
)
if payment_result.success:
await self._order_service.mark_order_paid(
order_id=event.order_id,
payment_id=payment_result.id,
)
else:
# Публикация события неудачной оплаты
from events.domain_events import PaymentFailedEvent
fail_event = PaymentFailedEvent(
order_id=event.order_id,
reason=payment_result.error,
)
await self._event_bus.publish(fail_event)# main.py
# Сборка приложения
from events.event_bus import InMemoryEventBus
from services.user_service import UserService
from services.order_service import OrderService
from services.email_service import EmailService
from services.analytics_service import AnalyticsService
from services.payment_service import PaymentService
# Создание шины событий
event_bus = InMemoryEventBus()
# Создание сервисов
user_service = UserService(user_repo, event_bus)
order_service = OrderService(order_repo, user_repo, event_bus)
# Подписчики автоматически регистрируются при создании
email_service = EmailService(event_bus, smtp_client)
analytics_service = AnalyticsService(event_bus, analytics_db)
payment_service = PaymentService(order_service, event_bus, payment_gateway)
# Теперь при регистрации пользователя:
# 1. UserService публикует UserRegisteredEvent
# 2. EmailService отправляет welcome email
# 3. AnalyticsService логирует событие
# Все сервисы не знают друг о друге!# events/event_store.py
# Event Store для Event Sourcing
from typing import list
from events.domain_events import DomainEvent
class EventStore:
"""Хранилище событий — источник истины"""
def __init__(self):
self._events: list[DomainEvent] = []
def append(self, event: DomainEvent) -> None:
"""Добавить событие в поток"""
self._events.append(event)
def get_events_for_aggregate(
self,
aggregate_id: UUID,
aggregate_type: str,
) -> list[DomainEvent]:
"""Получить все события для агрегата"""
return [
e for e in self._events
if e.aggregate_id == aggregate_id and e.event_type.startswith(aggregate_type)
]
def get_all_events(self, after_version: int = 0) -> list[DomainEvent]:
"""Получить все события после версии (для проекций)"""
return self._events[after_version:]| Ситуация | Подходит? |
|---|---|
| Асинхронная обработка | ✅ Да |
| Микросервисы | ✅ Да |
| Высокая масштабируемость | ✅ Да |
| Реактивные системы | ✅ Да |
| Простое CRUD-приложение | ❌ Нет |
| Строгая консистентность | ⚠️ Сложно |
| Маленькая команда | ⚠️ Может быть overkill |
Плюсы:
Минусы:
| Критерий | Layered | Hexagonal | Clean | Event-Driven |
|---|---|---|---|---|
| Сложность | Низкая | Средняя | Высокая | Высокая |
| Тестируемость | Средняя | Высокая | Очень высокая | Высокая |
| Гибкость | Низкая | Высокая | Очень высокая | Очень высокая |
| Производительность | Высокая | Высокая | Высокая | Средняя |
| Время разработки | Быстро | Средне | Медленно | Медленно |
| Для CRUD | ✅ Отлично | ⚠️ OK | ❌ Overkill | ❌ Нет |
| Для микросервисов | ⚠️ OK | ✅ Отлично | ✅ Отлично | ✅ Отлично |
Layered Architecture:
Hexagonal Architecture:
Clean Architecture:
Event-Driven Architecture:
На практике паттерны часто комбинируют:
┌─────────────────────────────────────────────┐
│ Layered Architecture │ ← Общая структура
│ ┌─────────────────────────────────────┐ │
│ │ Hexagonal в Business Layer │ │ ← Изоляция домена
│ │ ┌─────────────────────────────┐ │ │
│ │ │ Clean Architecture │ │ │ ← Строгие правила
│ │ │ внутри домена │ │ │
│ │ └─────────────────────────────┘ │ │
│ └─────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────┐ │
│ │ Event-Driven между сервисами │ │ ← Асинхронность
│ └─────────────────────────────────────┘ │
└─────────────────────────────────────────────┘
| Паттерн | Когда использовать | Главная выгода |
|---|---|---|
| Layered | CRUD, быстрый старт | Простота |
| Hexagonal | Сложная логика, тестируемость | Изоляция домена |
| Clean | Enterprise, долгая поддержка | Строгость зависимостей |
| Event-Driven | Микросервисы, асинхронность | Loose coupling |
Архитектура — это компромиссы. Выбирайте паттерн под задачу, а не под моду.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.