Декомпозиция монолита, сервис-дискавери, circuit breaker, saga, распределённые транзакции.
Микросервисная архитектура — это не просто технический выбор, это организационная структура, воплощённая в коде. Правильно спроектированные микросервисы позволяют командам работать независимо, масштабироваться горизонтально и отказоустойчиво. Но цена ошибки — распределённый монолит, который сложнее поддерживать, чем обычный монолит.
Выбор между монолитом и микросервисами — это компромисс между простотой и масштабируемостью.
┌─────────────────────────────────────────┐
│ Monolith Application │
│ ┌─────────┬─────────┬─────────────┐ │
│ │ Users │ Orders │ Payments │ │
│ │ Module │ Module │ Module │ │
│ └─────────┴─────────┴─────────────┘ │
│ Single Database │
└─────────────────────────────────────────┘
Преимущества монолита:
Недостатки монолита:
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ User Service │ │Order Service │ │Payment Svc │
│ FastAPI │ │ FastAPI │ │ FastAPI │
│ PostgreSQL │ │ PostgreSQL │ │ PostgreSQL │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
│ │ │
└───────────────────┼───────────────────┘
┌──────▼───────┐
│ API Gateway │
│ Kong/Nginx │
└──────────────┘
Преимущества микросервисов:
Недостатки микросервисов:
# Чеклист для принятия решения
MICROSERVICES_CHECKLIST = {
"team_size": ">= 5 разработчиков (Conway's Law)",
"scale_requirements": "Нужно масштабировать части системы независимо",
"availability": "Требования к uptime > 99.9%",
"deployment_frequency": "Несколько развёртываний в день",
"complexity": "Система слишком сложна для одного кода",
"organizational": "Независимые команды с разными целями"
}
def should_use_microservices(context: dict) -> bool:
"""
Решение о переходе на микросервисы.
Начинайте с монолита, разделяйте при росте.
"""
score = 0
if context.get("team_size", 1) >= 5:
score += 1
if context.get("need_independent_scaling"):
score += 1
if context.get("high_availability_required"):
score += 1
if context.get("deployment_frequency", 0) >= 5: # в день
score += 1
if context.get("system_complexity") == "high":
score += 1
# Микросервисы оправданы при 4+ баллах
return score >= 4
# Правило: Start with monolith, split when it hurtsПравильная декомпозиция — ключ к успешной микросервисной архитектуре. Ошибка здесь ведёт к распределённому монолиту.
Принцип: Каждый сервис отвечает за одну бизнес-сущность или ограниченную область (Bounded Context).
┌─────────────────────────────────────────────────────┐
│ E-commerce System │
├─────────────┬─────────────┬─────────────┬───────────┤
│ User │ Order │ Product │ Payment │
│ Service │ Service │ Service │ Service │
│ │ │ │ │
│ - Register │ - Create │ - List │ - Charge │
│ - Login │ - Cancel │ - Search │ - Refund │
│ - Profile │ - Track │ - Details │ - History │
│ - Settings │ - History │ - Inventory │ - Methods │
└─────────────┴─────────────┴─────────────┴───────────┘
Пример реализации:
# user_service/app/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, EmailStr
from typing import Optional
import asyncpg
app = FastAPI(title="User Service")
class UserCreate(BaseModel):
email: EmailStr
password: str
name: str
class UserResponse(BaseModel):
id: int
email: str
name: str
is_active: bool
# Изолированная БД только для User Service
# Другие сервисы не имеют прямого доступа к этой БД
@app.post("/users", response_model=UserResponse)
async def create_user(user: UserCreate):
"""Создание пользователя — ответственность только User Service"""
async with asyncpg.create_pool("postgresql://user_svc_db") as pool:
async with pool.acquire() as conn:
# Проверка на дубликат email
existing = await conn.fetchrow(
"SELECT id FROM users WHERE email = $1", user.email
)
if existing:
raise HTTPException(status_code=400, detail="Email already exists")
# Создание пользователя
user_row = await conn.fetchrow(
"""
INSERT INTO users (email, password_hash, name, is_active)
VALUES ($1, $2, $3, true)
RETURNING id, email, name, is_active
""",
user.email, hash_password(user.password), user.name
)
return dict(user_row)
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
"""Получение пользователя"""
async with asyncpg.create_pool("postgresql://user_svc_db") as pool:
async with pool.acquire() as conn:
user = await conn.fetchrow(
"SELECT id, email, name, is_active FROM users WHERE id = $1",
user_id
)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return dict(user)Преимущества:
Недостатки:
Принцип: Сервисы выделяются по типу операции или функции.
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Write │ │ Read │ │ Analytics │
│ Service │ │ Service │ │ Service │
│ (Commands) │ │ (Queries) │ │ (Reports) │
└──────────────┘ └──────────────┘ └──────────────┘
PostgreSQL PostgreSQL ClickHouse
(Master) (Replica) (OLAP)
Пример: CQRS (Command Query Responsibility Segregation)
# commands_service/app/main.py - Запись (Write Side)
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import asyncpg
import asyncio
app = FastAPI(title="Order Command Service")
class CreateOrderCommand(BaseModel):
user_id: int
items: list[dict]
total_amount: float
@app.post("/orders", status_code=201)
async def create_order(command: CreateOrderCommand):
"""
Команда создания заказа.
Только запись, валидация бизнес-правил.
"""
async with asyncpg.create_pool("postgresql://order_write_db") as pool:
async with pool.acquire() as conn:
async with conn.transaction():
# Валидация бизнес-правил
user_exists = await conn.fetchval(
"SELECT EXISTS(SELECT 1 FROM users WHERE id = $1)",
command.user_id
)
if not user_exists:
raise HTTPException(status_code=400, detail="User not found")
# Создание заказа
order = await conn.fetchrow(
"""
INSERT INTO orders (user_id, status, total_amount, created_at)
VALUES ($1, 'pending', $2, NOW())
RETURNING id, user_id, status, total_amount
""",
command.user_id, command.total_amount
)
# Создание позиций заказа
for item in command.items:
await conn.execute(
"""
INSERT INTO order_items (order_id, product_id, quantity, price)
VALUES ($1, $2, $3, $4)
""",
order["id"], item["product_id"], item["quantity"], item["price"]
)
# Публикация события для обновления read модели
await publish_event("order.created", {"order_id": order["id"]})
return {"order_id": order["id"], "status": "pending"}
# queries_service/app/main.py - Чтение (Read Side)
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
import asyncpg
from redis.asyncio import Redis
app = FastAPI(title="Order Query Service")
# Read-optimized денормализованная схема
# orders_read таблица содержит все данные для отображения
@app.get("/orders/{order_id}")
async def get_order(order_id: int):
"""
Запрос получения заказа.
Только чтение, оптимизировано для скорости.
"""
# Сначала проверяем кэш
redis = await Redis.from_url("redis://localhost")
cached = await redis.get(f"order:{order_id}")
if cached:
return JSONResponse(content=cached, headers={"X-Cache": "HIT"})
# Чтение из оптимизированной read БД
async with asyncpg.create_pool("postgresql://order_read_db") as pool:
async with pool.acquire() as conn:
order = await conn.fetchrow(
"""
SELECT o.id, o.user_id, o.status, o.total_amount,
o.created_at, u.email as user_email,
array_agg(oi.product_id) as product_ids,
array_agg(oi.quantity) as quantities
FROM orders_read o
JOIN users_read u ON o.user_id = u.id
LEFT JOIN order_items_read oi ON o.id = oi.order_id
WHERE o.id = $1
GROUP BY o.id, o.user_id, u.email
""",
order_id
)
if not order:
raise HTTPException(status_code=404, detail="Order not found")
result = dict(order)
await redis.setex(f"order:{order_id}", 300, str(result))
return JSONResponse(content=result, headers={"X-Cache": "MISS"})Преимущества:
Недостатки:
Принцип: Разные версии API работают как отдельные сервисы.
┌─────────────────┐ ┌─────────────────┐
│ API v1 Service │ │ API v2 Service │
│ (Legacy) │ │ (New) │
│ Python 3.8 │ │ Python 3.11 │
│ Old Schema │ │ New Schema │
└─────────────────┘ └─────────────────┘
Выбор способа коммуникации влияет на coupling, производительность и надёжность системы.
Синхронная коммуникация через HTTP. Самый распространённый вариант.
# order_service calling user_service via REST
import httpx
from typing import Optional
class UserServiceClient:
def __init__(self, base_url: str):
self.base_url = base_url
self.client = httpx.AsyncClient(timeout=5.0)
async def get_user(self, user_id: int) -> Optional[dict]:
"""Получение пользователя из User Service"""
try:
response = await self.client.get(
f"{self.base_url}/users/{user_id}"
)
if response.status_code == 404:
return None
response.raise_for_status()
return response.json()
except httpx.ConnectError:
# User Service недоступен
return None
except httpx.TimeoutException:
# Превышено время ожидания
return None
# Использование в Order Service
user_client = UserServiceClient("http://user-service:8000")
@app.post("/orders")
async def create_order(order_data: OrderCreate):
# Проверка существования пользователя через REST
user = await user_client.get_user(order_data.user_id)
if not user:
raise HTTPException(status_code=400, detail="User not found")
# Создание заказа...Преимущества REST:
Недостатки REST:
Высокопроизводительная RPC коммуникация. Использует бинарный формат Protobuf.
// user_service/proto/user.proto
syntax = "proto3";
package user;
service UserService {
rpc GetUser(GetUserRequest) returns (GetUserResponse);
rpc ListUsers(ListUsersRequest) returns (stream User);
rpc CreateUser(CreateUserRequest) returns (User);
}
message User {
int32 id = 1;
string email = 2;
string name = 3;
bool is_active = 4;
google.protobuf.Timestamp created_at = 5;
}
message GetUserRequest {
int32 user_id = 1;
}
message GetUserResponse {
User user = 1;
}
message ListUsersRequest {
int32 page = 1;
int32 page_size = 2;
}
message CreateUserRequest {
string email = 1;
string password = 2;
string name = 3;
}# user_service/app/grpc_server.py
from grpc import aio
import user_pb2
import user_pb2_grpc
import asyncpg
class UserServiceServicer(user_pb2_grpc.UserServiceServicer):
def __init__(self, db_pool):
self.db_pool = db_pool
async def GetUser(self, request, context):
"""gRPC метод получения пользователя"""
async with self.db_pool.acquire() as conn:
user = await conn.fetchrow(
"SELECT id, email, name, is_active, created_at FROM users WHERE id = $1",
request.user_id
)
if not user:
context.abort(aio.NotFound, "User not found")
return user_pb2.GetUserResponse(
user=user_pb2.User(
id=user["id"],
email=user["email"],
name=user["name"],
is_active=user["is_active"],
created_at=user["created_at"]
)
)
async def ListUsers(self, request, context):
"""gRPC streaming метод для пагинации"""
async with self.db_pool.acquire() as conn:
offset = (request.page - 1) * request.page_size
rows = await conn.fetch(
"SELECT id, email, name, is_active, created_at FROM users LIMIT $1 OFFSET $2",
request.page_size, offset
)
for row in rows:
yield user_pb2.User(
id=row["id"],
email=row["email"],
name=row["name"],
is_active=row["is_active"],
created_at=row["created_at"]
)
# order_service/app/grpc_client.py
import grpc
import user_pb2
import user_pb2_grpc
class UserServiceGrpcClient:
def __init__(self, channel: grpc.aio.Channel):
self.stub = user_pb2_grpc.UserServiceStub(channel)
async def get_user(self, user_id: int) -> user_pb2.User:
"""Вызов gRPC метода"""
try:
response = await self.stub.GetUser(
user_pb2.GetUserRequest(user_id=user_id),
timeout=2.0 # Таймаут критичен для gRPC
)
return response.user
except grpc.aio.AioRpcError as e:
if e.code() == grpc.StatusCode.NOT_FOUND:
return None
elif e.code() == grpc.StatusCode.UNAVAILABLE:
# Сервис недоступен — используем fallback
return None
raise
# Создание канала
channel = grpc.aio.insecure_channel("user-service:50051")
user_client = UserServiceGrpcClient(channel)Преимущества gRPC:
Недостатки gRPC:
Событийная архитектура через брокеры сообщений (Kafka, RabbitMQ, Redis).
# order_service/app/events.py
from aiokafka import AIOKafkaProducer
import json
from datetime import datetime
kafka_producer = AIOKafkaProducer(
bootstrap_servers='kafka:9092',
value_serializer=lambda v: json.dumps(v, default=str).encode()
)
async def publish_order_event(event_type: str, order_data: dict):
"""Публикация события о заказе"""
event = {
"event_type": event_type,
"event_id": str(uuid.uuid4()),
"timestamp": datetime.utcnow().isoformat(),
"data": order_data
}
await kafka_producer.send_and_wait(
topic="order-events",
key=str(order_data["order_id"]).encode(),
value=event
)
# payment_service/app/consumer.py
from aiokafka import AIOKafkaConsumer
import json
async def consume_order_events():
"""Потребление событий о заказах"""
consumer = AIOKafkaConsumer(
"order-events",
bootstrap_servers='kafka:9092',
group_id="payment-service-group",
auto_offset_reset="earliest",
enable_auto_commit=False
)
await consumer.start()
try:
async for msg in consumer:
event = json.loads(msg.value)
if event["event_type"] == "order.created":
await handle_order_created(event["data"])
elif event["event_type"] == "order.cancelled":
await handle_order_cancelled(event["data"])
# Подтверждение обработки
await consumer.commit()
finally:
await consumer.stop()
async def handle_order_created(order_data: dict):
"""Обработка события создания заказа"""
# Инициализация платежа
payment = await create_payment(
order_id=order_data["order_id"],
amount=order_data["total_amount"],
user_id=order_data["user_id"]
)
# Публикация события оплаты
await publish_payment_event("payment.initiated", payment)Преимущества messaging:
Недостатки messaging:
| Критерий | REST | gRPC | Messaging |
|---|---|---|---|
| Производительность | Средняя | Высокая | Высокая |
| Сложность | Низкая | Средняя | Высокая |
| Связанность | Средняя | Высокая | Низкая |
| Консистентность | Синхронная | Синхронная | Асинхронная |
| Идемпотентность | Нужно реализовывать | Нужно реализовывать | Обязательно |
| Использование | Внешние API, простые вызовы | Внутренние high-perf вызовы | События, фоновые задачи |
В динамической среде (Kubernetes, Docker Swarm) IP-адреса сервисов постоянно меняются. Service Discovery решает проблему поиска сервисов.
Клиент сам узнаёт адреса сервисов через Service Registry.
┌─────────────┐ ┌──────────────────┐
│ Client │ │ Service Registry │
│ │◄─────│ (Consul/etcd) │
│ 1. Query │ │ │
│ 2. Get IPs │ │ - svc: 10.0.1.1 │
│ 3. Call │──────► - svc: 10.0.1.2 │
└─────────────┘ │ - svc: 10.0.1.3 │
└──────────────────┘
Реализация с Consul:
import consul
import httpx
import random
from typing import Optional, List
class ConsulServiceDiscovery:
"""Client-side discovery через Consul"""
def __init__(self, consul_host: str = "localhost", consul_port: int = 8500):
self.consul = consul.aio.Consul(host=consul_host, port=consul_port)
async def get_service_instances(self, service_name: str) -> List[dict]:
"""Получение всех инстансов сервиса"""
_, services = await self.consul.catalog.service(service_name)
return [
{
"address": s["ServiceAddress"] or s["Address"],
"port": s["ServicePort"],
"id": s["ServiceID"]
}
for s in services
]
async def get_healthy_instance(self, service_name: str) -> Optional[dict]:
"""Получение здорового инстанса (random для балансировки)"""
instances = await self.get_service_instances(service_name)
if not instances:
return None
return random.choice(instances)
# Использование
discovery = ConsulServiceDiscovery()
async def call_user_service(endpoint: str, data: dict):
"""Вызов User Service через service discovery"""
instance = await discovery.get_healthy_instance("user-service")
if not instance:
raise Exception("No healthy instances available")
url = f"http://{instance['address']}:{instance['port']}/{endpoint}"
async with httpx.AsyncClient() as client:
response = await client.post(url, json=data)
return response.json()Преимущества:
Недостатки:
Load Balancer узнаёт адреса сервисов.
┌─────────────┐ ┌─────────────┐ ┌──────────────────┐
│ Client │───►│ Load │ │ Service Registry │
│ │ │ Balancer │◄───│ (Kubernetes) │
└─────────────┘ │ (Nginx/ │ └──────────────────┘
│ Kong) │
└──────┬──────┘
│
┌───────────────┼───────────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ svc :1 │ │ svc :2 │ │ svc :3 │
└─────────┘ └─────────┘ └─────────┘
Пример с Nginx + Kubernetes:
# Kubernetes Service (автоматический service discovery)
apiVersion: v1
kind: Service
metadata:
name: user-service
spec:
selector:
app: user-service
ports:
- port: 80
targetPort: 8000
type: ClusterIP
---
# Nginx Ingress конфигурация
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: api-ingress
annotations:
nginx.ingress.kubernetes.io/load-balance: "round-robin"
nginx.ingress.kubernetes.io/upstream-hash-by: "$remote_addr" # sticky sessions
spec:
rules:
- host: api.example.com
http:
paths:
- path: /users
pathType: Prefix
backend:
service:
name: user-service
port:
number: 80Преимущества:
Недостатки:
Выделенный слой инфраструктуры для сервисной коммуникации.
┌─────────────────────────────────────────────────────┐
│ Service Mesh (Istio) │
├─────────────┬─────────────┬─────────────────────────┤
│ Service │ Service │ Control Plane │
│ Pod │ Pod │ (Pilot, Citadel) │
│ ┌───────┐ │ ┌───────┐ │ │
│ │ App │ │ │ App │ │ - Service Discovery │
│ └───┬───┘ │ └───┬───┘ │ - Traffic Management │
│ │ │ │ │ - Security (mTLS) │
│ ┌───▼───┐ │ ┌───▼───┐ │ - Observability │
│ │ Sidecar│ │ │ Sidecar│ │ │
│ │ Envoy │ │ │ Envoy │ │ │
│ └───────┘ │ └───────┘ │ │
└─────────────┴─────────────┴─────────────────────────┘
Пример Istio VirtualService для traffic management:
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: user-service
spec:
hosts:
- user-service
http:
- match:
- headers:
x-canary:
exact: "true"
route:
- destination:
host: user-service
subset: canary
weight: 100
- route:
- destination:
host: user-service
subset: stable
weight: 95
- destination:
host: user-service
subset: canary
weight: 5
---
# DestinationRule для circuit breaker
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
name: user-service
spec:
host: user-service
trafficPolicy:
connectionPool:
tcp:
maxConnections: 100
http:
h2UpgradePolicy: UPGRADE
http1MaxPendingRequests: 100
http2MaxRequests: 1000
outlierDetection:
consecutive5xxErrors: 5
interval: 30s
baseEjectionTime: 30s
maxEjectionPercent: 50Преимущества Service Mesh:
Недостатки:
Circuit Breaker (автоматический выключатель) — паттерн для предотвращения каскадных отказов в распределённых системах.
┌─────────────┐
│ CLOSED │──────┐
│ (нормально) │ │ Failures >= threshold
└──────┬──────┘ │
│ Success ▼
│ ┌─────────────┐
│ │ OPEN │──────┐
│ │ (не ходим) │ │ Timeout elapsed
│ └──────┬──────┘ │
│ │ ▼
│ │ Request ┌─────────────┐
└───────────────┴────────────►│ HALF-OPEN │
Success │ (проверка) │
└──────┬──────┘
│
┌────────────────────────────┘
│ Success
▼
┌─────────────┐
│ CLOSED │
└─────────────┘
Состояния:
# circuit_breaker.py
import asyncio
import time
from enum import Enum
from typing import Callable, Any, Optional
from dataclasses import dataclass
from functools import wraps
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
@dataclass
class CircuitBreakerConfig:
failure_threshold: int = 5 # Количество ошибок до открытия
success_threshold: int = 2 # Количество успехов до закрытия
timeout: float = 30.0 # Время в открытом состоянии
expected_exceptions: tuple = (Exception,)
class CircuitBreakerError(Exception):
"""Исключение, когда circuit breaker открыт"""
pass
class CircuitBreaker:
"""
Circuit Breaker реализация.
Защищает от каскадных отказов.
"""
def __init__(self, config: CircuitBreakerConfig = None):
self.config = config or CircuitBreakerConfig()
self._state = CircuitState.CLOSED
self._failure_count = 0
self._success_count = 0
self._last_failure_time: Optional[float] = None
self._lock = asyncio.Lock()
@property
def state(self) -> CircuitState:
return self._state
@property
def is_closed(self) -> bool:
return self._state == CircuitState.CLOSED
@property
def is_open(self) -> bool:
return self._state == CircuitState.OPEN
@property
def is_half_open(self) -> bool:
return self._state == CircuitState.HALF_OPEN
async def call(self, func: Callable, *args, **kwargs) -> Any:
"""
Вызов функции через circuit breaker.
"""
async with self._lock:
# Проверка состояния
if self._state == CircuitState.OPEN:
# Проверка timeout для перехода в half-open
if time.time() - self._last_failure_time >= self.config.timeout:
self._state = CircuitState.HALF_OPEN
self._success_count = 0
print(f"Circuit breaker: OPEN -> HALF_OPEN")
else:
raise CircuitBreakerError(
f"Circuit breaker is OPEN. Retry after {self.config.timeout}s"
)
try:
# Вызов функции
result = await func(*args, **kwargs)
# Обработка успеха
async with self._lock:
if self._state == CircuitState.HALF_OPEN:
self._success_count += 1
if self._success_count >= self.config.success_threshold:
self._state = CircuitState.CLOSED
self._failure_count = 0
print(f"Circuit breaker: HALF_OPEN -> CLOSED")
elif self._state == CircuitState.CLOSED:
# Сброс счётчика при успехе (опционально)
self._failure_count = 0
return result
except self.config.expected_exceptions as e:
# Обработка ошибки
async with self._lock:
self._failure_count += 1
self._last_failure_time = time.time()
if self._state == CircuitState.HALF_OPEN:
# Любая ошибка в half-open возвращает в open
self._state = CircuitState.OPEN
print(f"Circuit breaker: HALF_OPEN -> OPEN")
elif self._state == CircuitState.CLOSED:
if self._failure_count >= self.config.failure_threshold:
self._state = CircuitState.OPEN
print(f"Circuit breaker: CLOSED -> OPEN")
raise
async def reset(self):
"""Сброс circuit breaker в закрытое состояние"""
async with self._lock:
self._state = CircuitState.CLOSED
self._failure_count = 0
self._success_count = 0
self._last_failure_time = None
# Декоратор для удобного использования
def circuit_breaker(config: CircuitBreakerConfig = None):
"""Декоратор circuit breaker"""
cb = CircuitBreaker(config)
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs):
return await cb.call(func, *args, **kwargs)
wrapper.circuit_breaker = cb # Доступ к состоянию для тестов
return wrapper
return decorator# order_service/app/external_services.py
from fastapi import FastAPI, HTTPException
import httpx
from circuit_breaker import CircuitBreaker, CircuitBreakerConfig, CircuitBreakerError
app = FastAPI()
# Конфигурация circuit breaker для User Service
user_service_cb = CircuitBreaker(CircuitBreakerConfig(
failure_threshold=5, # 5 ошибок подряд
success_threshold=2, # 2 успеха для восстановления
timeout=30.0, # 30 секунд в открытом состоянии
expected_exceptions=(httpx.ConnectError, httpx.TimeoutException, httpx.HTTPStatusError)
))
async def call_user_service_with_cb(user_id: int) -> dict:
"""Вызов User Service через circuit breaker"""
async def _call():
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.get(f"http://user-service:8000/users/{user_id}")
response.raise_for_status()
return response.json()
try:
return await user_service_cb.call(_call)
except CircuitBreakerError:
# Circuit breaker открыт — используем fallback
return get_user_fallback(user_id)
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
raise HTTPException(status_code=404, detail="User not found")
raise
def get_user_fallback(user_id: int) -> dict:
"""
Fallback при недоступности User Service.
Возвращает данные из кэша или дефолтные значения.
"""
# Попытка получить из кэша
# cached = redis.get(f"user:{user_id}")
# if cached:
# return cached
# Дефолтное значение
return {
"id": user_id,
"email": f"user{user_id}@unknown.local",
"name": "Unknown User",
"is_fallback": True
}
@app.get("/orders/{order_id}")
async def get_order(order_id: int):
"""Получение заказа с защитой circuit breaker"""
order = await get_order_from_db(order_id)
# Получение пользователя с circuit breaker
user = await call_user_service_with_cb(order.user_id)
return {
"order": order,
"user": user
}
@app.get("/circuit-breaker/status")
async def get_circuit_breaker_status():
"""Эндпоинт для мониторинга circuit breaker"""
return {
"user_service": {
"state": user_service_cb.state.value,
"failure_count": user_service_cb._failure_count,
"is_healthy": user_service_cb.is_closed
}
}# Симуляция отказа и восстановления
import asyncio
async def simulate_failure_scenario():
"""Демонстрация работы circuit breaker при отказах"""
call_count = 0
async def unstable_service():
nonlocal call_count
call_count += 1
if call_count <= 7: # Первые 7 вызовов падают
raise httpx.ConnectError("Service unavailable")
return {"status": "ok"}
cb = CircuitBreaker(CircuitBreakerConfig(
failure_threshold=5,
success_threshold=2,
timeout=2.0 # 2 секунды для демо
))
# Вызов 1-5: ошибки, circuit breaker закроется
for i in range(5):
try:
await cb.call(unstable_service)
except Exception as e:
print(f"Call {i+1}: {type(e).__name__}")
print(f"After 5 failures: state = {cb.state.value}") # OPEN
# Вызов 6: CircuitBreakerError (сразу, без вызова сервиса)
try:
await cb.call(unstable_service)
except CircuitBreakerError as e:
print(f"Call 6: {e}") # Fail fast
# Ждём timeout
await asyncio.sleep(2.5)
# Вызов 7: HALF_OPEN, но сервис ещё падает
try:
await cb.call(unstable_service)
except Exception as e:
print(f"Call 7: {type(e).__name__}, state = {cb.state.value}") # Снова OPEN
# Ждём timeout
await asyncio.sleep(2.5)
# Вызов 8-9: HALF_OPEN, сервис работает
result1 = await cb.call(unstable_service)
print(f"Call 8: success, state = {cb.state.value}") # HALF_OPEN
result2 = await cb.call(unstable_service)
print(f"Call 9: success, state = {cb.state.value}") # CLOSED!Проблема: В микросервисах нет распределённых ACID-транзакций. Как обеспечить консистентность данных между сервисами?
Saga — последовательность локальных транзакций, где каждая транзакция обновляет данные в одном сервисе и публикует событие для следующей транзакции.
Создание заказа (успешная Saga):
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Order │ │ User │ │ Payment │ │ Inventory │
│ Service │ │ Service │ │ Service │ │ Service │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │ │
│ 1. Create Order │ │ │
│ (PENDING) │ │ │
│─────────────────►│ │ │
│ │ 2. Reserve Funds │ │
│ │─────────────────►│ │
│ │ │ 3. Reserve Stock │
│ │ │─────────────────►│
│ │ │ │
│◄─────────────────┴──────────────────┴──────────────────│
│ 4. Confirm Order (CONFIRMED) │
Откат Saga (компенсирующие транзакции):
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Order │ │ User │ │ Payment │ │ Inventory │
│ Service │ │ Service │ │ Service │ │ Service │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │ │
│ 1. Create Order │ │ │
│ (PENDING) │ │ │
│─────────────────►│ │ │
│ │ 2. Reserve Funds │ │
│ │─────────────────►│ │
│ │ │ 3. Reserve Stock │
│ │ │─────────────────►│
│ │ │ │
│ │ │ X FAIL! │
│ │ │ │
│ │ 4. Cancel Funds │ │
│ │◄─────────────────│ │
│ │ │ │
│ 5. Cancel Order │ │ │
│◄─────────────────│ │ │
Каждый сервис слушает события и реагирует. Нет центрального координатора.
# order_service/app/saga_choreography.py
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
import json
from enum import Enum
class OrderStatus(Enum):
PENDING = "pending"
PAYMENT_RESERVED = "payment_reserved"
CONFIRMED = "confirmed"
CANCELLED = "cancelled"
class OrderSagaChoreography:
"""
Saga через хореографию (event-driven).
Нет центрального координатора.
"""
def __init__(self):
self.producer = AIOKafkaProducer(
bootstrap_servers='kafka:9092',
value_serializer=lambda v: json.dumps(v, default=str).encode()
)
self.consumer = AIOKafkaConsumer(
"order-events", "payment-events", "inventory-events",
bootstrap_servers='kafka:9092',
group_id="order-saga-group"
)
async def start(self):
await self.producer.start()
await self.consumer.start()
async def stop(self):
await self.producer.stop()
await self.consumer.stop()
async def create_order(self, order_data: dict):
"""Шаг 1: Создание заказа в состоянии PENDING"""
order = await db.execute(
"""
INSERT INTO orders (user_id, total_amount, status, created_at)
VALUES ($1, $2, $3, NOW())
RETURNING id, user_id, total_amount, status
""",
order_data["user_id"], order_data["total_amount"], OrderStatus.PENDING.value
)
# Публикация события для Payment Service
await self.producer.send_and_wait(
topic="order-events",
key=str(order["id"]).encode(),
value={
"event_type": "order.created",
"order_id": order["id"],
"user_id": order["user_id"],
"amount": order["total_amount"]
}
)
return order
async def handle_payment_reserved(self, event: dict):
"""Шаг 3: Payment зарезервирован — резервируем inventory"""
order_id = event["order_id"]
# Публикация события для Inventory Service
await self.producer.send_and_wait(
topic="order-events",
key=str(order_id).encode(),
value={
"event_type": "order.payment_reserved",
"order_id": order_id,
"items": event["items"]
}
)
async def handle_inventory_reserved(self, event: dict):
"""Шаг 4: Inventory зарезервирован — подтверждаем заказ"""
order_id = event["order_id"]
await db.execute(
"UPDATE orders SET status = $1 WHERE id = $2",
OrderStatus.CONFIRMED.value, order_id
)
await self.producer.send_and_wait(
topic="order-events",
key=str(order_id).encode(),
value={
"event_type": "order.confirmed",
"order_id": order_id
}
)
async def handle_inventory_failed(self, event: dict):
"""Шаг 4 (ошибка): Inventory не зарезервирован — откат"""
order_id = event["order_id"]
# Публикация события для отката Payment
await self.producer.send_and_wait(
topic="order-events",
key=str(order_id).encode(),
value={
"event_type": "order.inventory_failed",
"order_id": order_id
}
)
async def handle_payment_cancelled(self, event: dict):
"""Шаг 5 (откат): Payment отменён — отменяем заказ"""
order_id = event["order_id"]
await db.execute(
"UPDATE orders SET status = $1 WHERE id = $2",
OrderStatus.CANCELLED.value, order_id
)
# payment_service/app/saga_participant.py
class PaymentSagaParticipant:
"""Payment Service — участник Saga"""
async def handle_order_created(self, event: dict):
"""Обработка события order.created"""
try:
# Резервирование средств
await self.reserve_funds(
user_id=event["user_id"],
amount=event["amount"]
)
# Публикация успеха
await self.producer.send_and_wait(
topic="payment-events",
value={
"event_type": "payment.reserved",
"order_id": event["order_id"]
}
)
except InsufficientFundsError:
# Публикация ошибки
await self.producer.send_and_wait(
topic="payment-events",
value={
"event_type": "payment.failed",
"order_id": event["order_id"],
"reason": "insufficient_funds"
}
)
async def handle_order_inventory_failed(self, event: dict):
"""Компенсирующая транзакция: отмена резерва"""
await self.cancel_reservation(event["order_id"])
await self.producer.send_and_wait(
topic="payment-events",
value={
"event_type": "payment.cancelled",
"order_id": event["order_id"]
}
)Преимущества хореографии:
Недостатки хореографии:
Центральный оркестратор управляет потоком Saga.
# saga_orchestrator/app/orchestrator.py
from enum import Enum
from typing import Callable, Dict, Any
import asyncio
class SagaStatus(Enum):
RUNNING = "running"
COMPLETED = "completed"
COMPENSATING = "compensating"
FAILED = "failed"
class SagaStep:
"""Шаг Saga с действием и компенсацией"""
def __init__(self, name: str, action: Callable, compensate: Callable):
self.name = name
self.action = action
self.compensate = compensate
class SagaOrchestrator:
"""
Оркестратор Saga.
Управляет выполнением шагов и откатом при ошибке.
"""
def __init__(self, saga_id: str):
self.saga_id = saga_id
self.steps: list[SagaStep] = []
self.executed_steps: list[SagaStep] = []
self.status = SagaStatus.RUNNING
self.context: Dict[str, Any] = {}
def add_step(self, name: str, action: Callable, compensate: Callable):
"""Добавление шага в Saga"""
self.steps.append(SagaStep(name, action, compensate))
return self # Fluent interface
async def execute(self) -> bool:
"""
Выполнение Saga.
Возвращает True при успехе, False при неудаче.
"""
for step in self.steps:
if self.status != SagaStatus.RUNNING:
break
try:
print(f"Saga {self.saga_id}: executing step '{step.name}'")
result = await step.action(self.context)
self.context.update(result or {})
self.executed_steps.append(step)
print(f"Saga {self.saga_id}: step '{step.name}' completed")
except Exception as e:
print(f"Saga {self.saga_id}: step '{step.name}' failed: {e}")
await self._compensate()
self.status = SagaStatus.FAILED
return False
self.status = SagaStatus.COMPLETED
print(f"Saga {self.saga_id}: completed successfully")
return True
async def _compensate(self):
"""Выполнение компенсирующих транзакций в обратном порядке"""
self.status = SagaStatus.COMPENSATING
print(f"Saga {self.saga_id}: starting compensation")
for step in reversed(self.executed_steps):
try:
print(f"Saga {self.saga_id}: compensating step '{step.name}'")
await step.compensate(self.context)
print(f"Saga {self.saga_id}: step '{step.name}' compensated")
except Exception as e:
# Логирование критической ошибки компенсации
print(f"Saga {self.saga_id}: compensation failed for '{step.name}': {e}")
# В production — отправка алерта, ручное вмешательство
# Пример использования: Create Order Saga
async def create_order_saga(user_id: int, items: list, amount: float):
"""Создание Saga для заказа"""
saga = SagaOrchestrator(saga_id=f"order-{uuid.uuid4()}")
# Шаг 1: Создать заказ (PENDING)
async def create_order(ctx):
order = await db.fetchrow(
"""
INSERT INTO orders (user_id, total_amount, status)
VALUES ($1, $2, 'pending')
RETURNING id
""",
user_id, amount
)
ctx["order_id"] = order["id"]
return {"order_id": order["id"]}
async def cancel_order(ctx):
await db.execute(
"UPDATE orders SET status = 'cancelled' WHERE id = $1",
ctx["order_id"]
)
saga.add_step("create_order", create_order, cancel_order)
# Шаг 2: Резервировать оплату
async def reserve_payment(ctx):
payment = await payment_client.reserve(
user_id=user_id,
amount=amount,
order_id=ctx["order_id"]
)
ctx["payment_id"] = payment["id"]
return {"payment_id": payment["id"]}
async def cancel_payment(ctx):
await payment_client.cancel(ctx["payment_id"])
saga.add_step("reserve_payment", reserve_payment, cancel_payment)
# Шаг 3: Резервировать inventory
async def reserve_inventory(ctx):
for item in items:
await inventory_client.reserve(
product_id=item["product_id"],
quantity=item["quantity"]
)
ctx["inventory_reserved"] = True
async def cancel_inventory(ctx):
for item in items:
await inventory_client.cancel_reservation(
product_id=item["product_id"],
quantity=item["quantity"]
)
saga.add_step("reserve_inventory", reserve_inventory, cancel_inventory)
# Шаг 4: Подтвердить заказ
async def confirm_order(ctx):
await db.execute(
"UPDATE orders SET status = 'confirmed' WHERE id = $1",
ctx["order_id"]
)
# confirm_order не требует компенсации (заказ уже подтверждён)
saga.add_step("confirm_order", confirm_order, lambda ctx: None)
# Выполнение Saga
success = await saga.execute()
return success, saga.contextПреимущества оркестрации:
Недостатки оркестрации:
| Критерий | Choreography | Orchestration |
|---|---|---|
| Сложность | Низкая (2-3 шага) | Средняя/Высокая |
| Количество сервисов | 2-3 | 3+ |
| Связанность | Низкая | Средняя (оркестратор знает всё) |
| Отладка | Сложная | Проще (центральный лог) |
| Масштабируемость | Высокая | Оркестратор может стать bottleneck |
| Гибкость | Сложно изменить поток | Легко добавить шаги |
API Gateway — единая точка входа для всех клиентов, которая маршрутизирует запросы к соответствующим сервисам.
┌─────────────────────────────────────────────────────────┐
│ API Gateway │
│ (Kong / AWS API Gateway / Nginx / Spring Cloud Gateway)│
├─────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────┐ │
│ │ Auth │ │ Rate │ │ Request/ │ │
│ │ (JWT) │ │ Limiting │ │ Response │ │
│ │ │ │ │ │ Transform │ │
│ └─────────────┘ └─────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│User Service │ │Order Service│ │Product Svc │
└─────────────┘ └─────────────┘ └─────────────┘
# api_gateway/app/main.py
from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.responses import JSONResponse
import httpx
import jwt
from typing import Optional
app = FastAPI(title="API Gateway")
# Сервисы для маршрутизации
SERVICES = {
"users": "http://user-service:8000",
"orders": "http://order-service:8000",
"products": "http://product-service:8000",
"payments": "http://payment-service:8000",
}
JWT_SECRET = "your-secret-key"
JWT_ALGORITHM = "HS256"
async def verify_token(request: Request) -> Optional[dict]:
"""Верификация JWT токена"""
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return None
token = auth_header.split(" ")[1]
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
return payload
except jwt.InvalidTokenError:
return None
@app.api_route("/{service}/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def proxy_request(
service: str,
path: str,
request: Request,
user: dict = Depends(verify_token)
):
"""
Универсальный прокси для всех сервисов.
"""
# Проверка сервиса
if service not in SERVICES:
raise HTTPException(status_code=404, detail="Service not found")
# Проверка авторизации для защищённых эндпоинтов
protected_services = ["users", "orders", "payments"]
if service in protected_services and not user:
raise HTTPException(status_code=401, detail="Unauthorized")
# Построение URL
target_url = f"{SERVICES[service]}/{path}"
# Получение тела запроса
body = None
if request.method in ["POST", "PUT", "PATCH"]:
body = await request.json()
# Проксирование запроса
async with httpx.AsyncClient() as client:
try:
response = await client.request(
method=request.method,
url=target_url,
json=body,
headers={k: v for k, v in request.headers.items() if k != "host"},
timeout=30.0
)
# Возврат ответа
return JSONResponse(
status_code=response.status_code,
content=response.json(),
headers=dict(response.headers)
)
except httpx.ConnectError:
raise HTTPException(
status_code=503,
detail=f"Service {service} unavailable"
)
except httpx.TimeoutException:
raise HTTPException(
status_code=504,
detail=f"Service {service} timeout"
)
# Health check endpoint
@app.get("/health")
async def health_check():
"""Проверка здоровья gateway"""
return {"status": "healthy"}# kong.yml — декларативная конфигурация Kong
_format_version: "2.1"
services:
- name: user-service
url: http://user-service:8000
routes:
- name: users-route
paths:
- /api/v1/users
plugins:
- name: jwt
config:
key_claim_name: sub
secret_is_base64: false
- name: rate-limiting
config:
minute: 100
policy: redis
redis_host: redis
- name: cors
config:
origins:
- https://app.example.com
methods:
- GET
- POST
- PUT
- DELETE
credentials: true
- name: order-service
url: http://order-service:8000
routes:
- name: orders-route
paths:
- /api/v1/orders
plugins:
- name: jwt
- name: rate-limiting
config:
minute: 50
- name: product-service
url: http://product-service:8000
routes:
- name: products-route
paths:
- /api/v1/products
plugins:
- name: rate-limiting
config:
minute: 200 # Публичный API, выше лимит
consumers:
- username: mobile-app
jwt_secrets:
- key: mobile-app-key
secret: mobile-secret
- key: web-app-key
secret: web-secret| Функция | Описание | Пример |
|---|---|---|
| Маршрутизация | Направление запроса к сервису | /api/users → User Service |
| Аутентификация | Проверка токенов | JWT, OAuth2, API Keys |
| Rate Limiting | Ограничение запросов | 100 запросов/минуту |
| Трансформация | Изменение запроса/ответа | Добавление заголовков |
| Кэширование | Кэш ответов | Кэш GET запросов |
| Логирование | Централизованные логи | Access logs, audit |
| Мониторинг | Метрики и трассировка | Prometheus, Jaeger |
| SSL Termination | Расшифровка HTTPS | HTTPS → HTTP внутри |
Распределённые системы должны быть устойчивы к отказам. Circuit Breaker — только один из паттернов.
Автоматический повтор запроса при временных ошибках.
# retry_pattern.py
import asyncio
import random
from typing import Callable, Any, Type, Tuple
from functools import wraps
class RetryError(Exception):
"""Исключение после исчерпания попыток"""
pass
async def retry_with_backoff(
func: Callable,
*args,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
jitter: bool = True,
retryable_exceptions: Tuple[Type[Exception], ...] = (Exception,),
**kwargs
) -> Any:
"""
Повторные попытки с экспоненциальной задержкой.
Args:
func: Асинхронная функция для вызова
max_retries: Максимальное количество попыток
base_delay: Базовая задержка в секундах
max_delay: Максимальная задержка
exponential_base: База экспоненты
jitter: Добавить случайность к задержке
retryable_exceptions: Исключения для повторной попытки
"""
last_exception = None
for attempt in range(max_retries + 1):
try:
return await func(*args, **kwargs)
except retryable_exceptions as e:
last_exception = e
if attempt == max_retries:
break
# Расчёт задержки
delay = min(base_delay * (exponential_base ** attempt), max_delay)
if jitter:
# Добавление случайности ±25%
delay = delay * (0.75 + random.random() * 0.5)
print(f"Retry {attempt + 1}/{max_retries} after {delay:.2f}s: {e}")
await asyncio.sleep(delay)
raise RetryError(f"Failed after {max_retries + 1} attempts") from last_exception
# Декоратор для удобного использования
def retry(
max_retries: int = 3,
base_delay: float = 1.0,
retryable_exceptions: Tuple[Type[Exception], ...] = (Exception,)
):
"""Декоратор retry"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs):
return await retry_with_backoff(
func, *args,
max_retries=max_retries,
base_delay=base_delay,
retryable_exceptions=retryable_exceptions,
**kwargs
)
return wrapper
return decorator
# Пример использования
@retry(
max_retries=3,
base_delay=1.0,
retryable_exceptions=(httpx.ConnectError, httpx.TimeoutException)
)
async def call_external_api():
"""Вызов внешнего API с retry"""
async with httpx.AsyncClient() as client:
response = await client.get("https://api.external.com/data")
response.raise_for_status()
return response.json()Ограничение времени выполнения операции.
# timeout_pattern.py
import asyncio
from functools import wraps
class TimeoutError(Exception):
"""Исключение при превышении таймаута"""
pass
async def with_timeout(
func: Callable,
timeout: float,
*args,
timeout_message: str = None,
**kwargs
) -> Any:
"""
Выполнение функции с таймаутом.
Args:
func: Асинхронная функция
timeout: Таймаут в секундах
timeout_message: Сообщение при таймауте
"""
try:
return await asyncio.wait_for(
func(*args, **kwargs),
timeout=timeout
)
except asyncio.TimeoutError:
raise TimeoutError(timeout_message or f"Operation timed out after {timeout}s")
# Декоратор timeout
def timeout(seconds: float, message: str = None):
"""Декоратор таймаута"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs):
return await with_timeout(func, seconds, *args, timeout_message=message, **kwargs)
return wrapper
return decorator
# Пример: комбинация retry + timeout
@retry(
max_retries=3,
base_delay=1.0,
retryable_exceptions=(TimeoutError, httpx.ConnectError)
)
@timeout(seconds=5.0, message="External API timeout")
async def call_with_resilience():
"""Вызов с retry и timeout"""
async with httpx.AsyncClient() as client:
response = await client.get("https://api.external.com/data")
return response.json()Изоляция ресурсов для предотвращения каскадных отказов.
# bulkhead_pattern.py
import asyncio
from typing import Callable, Any
from dataclasses import dataclass
@dataclass
class BulkheadConfig:
max_concurrent: int = 10 # Максимум одновременных вызовов
max_queue_size: int = 100 # Размер очереди
class BulkheadError(Exception):
"""Исключение при переполнении bulkhead"""
pass
class Bulkhead:
"""
Bulkhead паттерн.
Ограничивает количество одновременных вызовов.
"""
def __init__(self, config: BulkheadConfig = None):
self.config = config or BulkheadConfig()
self._semaphore = asyncio.Semaphore(self.config.max_concurrent)
self._queue = asyncio.Queue(maxsize=self.config.max_queue_size)
self._current_count = 0
@property
def current_count(self) -> int:
return self._current_count
@property
def available_capacity(self) -> int:
return self.config.max_concurrent - self._current_count
async def execute(self, func: Callable, *args, **kwargs) -> Any:
"""
Выполнение функции через bulkhead.
"""
# Попытка добавить в очередь
try:
self._queue.put_nowait(None)
except asyncio.QueueFull:
raise BulkheadError(
f"Bulkhead queue full ({self.config.max_queue_size} limit)"
)
try:
# Ожидание доступного слота
async with self._semaphore:
self._current_count += 1
try:
return await func(*args, **kwargs)
finally:
self._current_count -= 1
finally:
# Удаление из очереди
self._queue.get_nowait()
# Пример: отдельные bulkhead для разных сервисов
user_service_bulkhead = Bulkhead(BulkheadConfig(max_concurrent=20, max_queue_size=50))
payment_service_bulkhead = Bulkhead(BulkheadConfig(max_concurrent=5, max_queue_size=20))
async def call_user_service(user_id: int):
"""Вызов User Service с ограничением параллелизма"""
async def _call():
async with httpx.AsyncClient() as client:
response = await client.get(f"http://user-service/users/{user_id}")
return response.json()
return await user_service_bulkhead.execute(_call)
async def call_payment_service(amount: float):
"""Вызов Payment Service с более строгим ограничением"""
async def _call():
async with httpx.AsyncClient() as client:
response = await client.post(
"http://payment-service/charge",
json={"amount": amount}
)
return response.json()
return await payment_service_bulkhead.execute(_call)Зачем Bulkhead:
Альтернативное действие при неудаче основного.
# fallback_pattern.py
from typing import Callable, Any, Optional, Type, Tuple
async def with_fallback(
primary: Callable,
fallback: Callable,
*args,
fallback_exceptions: Tuple[Type[Exception], ...] = (Exception,),
**kwargs
) -> Any:
"""
Выполнение primary, при ошибке — fallback.
Args:
primary: Основная функция
fallback: Резервная функция
fallback_exceptions: Исключения для fallback
"""
try:
return await primary(*args, **kwargs)
except fallback_exceptions as e:
print(f"Primary failed ({e}), using fallback")
return await fallback(*args, **kwargs)
# Примеры fallback стратегий
async def get_user_with_fallback(user_id: int) -> dict:
"""Получение пользователя с fallback"""
# Primary: вызов User Service
async def primary():
async with httpx.AsyncClient() as client:
response = await client.get(f"http://user-service/users/{user_id}")
response.raise_for_status()
return response.json()
# Fallback 1: Кэш
async def fallback_cache():
cached = await redis.get(f"user:{user_id}")
if cached:
return json.loads(cached)
raise Exception("Cache miss")
# Fallback 2: Дефолтное значение
async def fallback_default():
return {
"id": user_id,
"name": "Unknown",
"email": f"user{user_id}@default.local",
"is_fallback": True
}
# Цепочка fallback
try:
return await primary()
except Exception:
try:
return await fallback_cache()
except Exception:
return await fallback_default()# resilience_patterns_combined.py
from circuit_breaker import CircuitBreaker, CircuitBreakerConfig
from retry_pattern import retry_with_backoff
from timeout_pattern import with_timeout
from bulkhead_pattern import Bulkhead, BulkheadConfig
from fallback_pattern import with_fallback
class ResilientServiceClient:
"""
Клиент сервиса со всеми паттернами устойчивости.
"""
def __init__(self, service_name: str, base_url: str):
self.service_name = service_name
self.base_url = base_url
# Circuit Breaker
self.circuit_breaker = CircuitBreaker(CircuitBreakerConfig(
failure_threshold=5,
success_threshold=2,
timeout=30.0
))
# Bulkhead
self.bulkhead = Bulkhead(BulkheadConfig(
max_concurrent=20,
max_queue_size=50
))
async def call(self, endpoint: str, method: str = "GET", **kwargs) -> Any:
"""
Вызов сервиса со всеми паттернами.
Порядок: Bulkhead → Circuit Breaker → Retry → Timeout → Fallback
"""
async def primary_call():
async def _request():
async with httpx.AsyncClient() as client:
response = await client.request(
method=method,
url=f"{self.base_url}/{endpoint}",
**kwargs
)
response.raise_for_status()
return response.json()
# Retry с экспоненциальной задержкой
return await retry_with_backoff(
_request,
max_retries=3,
base_delay=1.0,
retryable_exceptions=(httpx.ConnectError, httpx.TimeoutException)
)
async def fallback():
# Fallback: возвращаем дефолтное значение или кэш
return {"error": "service_unavailable", "fallback": True}
async def execute_with_resilience():
# Timeout
result = await with_timeout(
lambda: self.circuit_breaker.call(primary_call),
timeout=10.0
)
return result
try:
# Bulkhead ограничивает параллелизм
return await self.bulkhead.execute(execute_with_resilience)
except Exception:
# Fallback при всех неудачах
return await fallback()
# Использование
user_client = ResilientServiceClient("user-service", "http://user-service:8000")
async def get_user(user_id: int):
return await user_client.call(f"users/{user_id}")┌─────────────────────────────────────────────────────────────────┐
│ API Gateway │
│ (Kong + JWT + Rate Limit) │
└─────────────────────────────────────────────────────────────────┘
│
┌───────────────────────┼───────────────────────┐
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ User Service │ │ Order Service │ │Product Service│
│ FastAPI │ │ FastAPI │ │ FastAPI │
│ PostgreSQL │ │ PostgreSQL │ │ PostgreSQL │
│ Redis │ │ Redis │ │ │
└───────┬───────┘ └───────┬───────┘ └───────┬───────┘
│ │ │
└──────────────────────┼──────────────────────┘
│
┌──────────▼──────────┐
│ Kafka Cluster │
│ (order-events, │
│ payment-events) │
└──────────┬──────────┘
│
┌──────────────────────┼──────────────────────┐
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│Payment Service│ │Inventory Svc │ │ Notify Service│
│ FastAPI │ │ FastAPI │ │ FastAPI │
│ PostgreSQL │ │ PostgreSQL │ │ SendGrid │
└───────────────┘ └───────────────┘ └───────────────┘
# order_service/app/orders.py
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import List
import httpx
from saga_orchestrator import SagaOrchestrator
from circuit_breaker import CircuitBreaker, CircuitBreakerConfig
from retry_pattern import retry_with_backoff
app = FastAPI()
# Circuit breakers для внешних сервисов
payment_cb = CircuitBreaker(CircuitBreakerConfig(failure_threshold=5, timeout=30))
inventory_cb = CircuitBreaker(CircuitBreakerConfig(failure_threshold=5, timeout=30))
class OrderItem(BaseModel):
product_id: int
quantity: int
class CreateOrderRequest(BaseModel):
user_id: int
items: List[OrderItem]
@app.post("/orders")
async def create_order(request: CreateOrderRequest):
"""Создание заказа с Saga"""
# Расчёт общей суммы
total_amount = await calculate_total(request.items)
# Создание Saga
saga = SagaOrchestrator(saga_id=f"order-{uuid.uuid4()}")
# Шаг 1: Создать заказ
async def create_order_step(ctx):
order = await db.fetchrow(
"""
INSERT INTO orders (user_id, total_amount, status)
VALUES ($1, $2, 'pending')
RETURNING id
""",
request.user_id, total_amount
)
ctx["order_id"] = order["id"]
async def cancel_order_step(ctx):
await db.execute(
"UPDATE orders SET status = 'cancelled' WHERE id = $1",
ctx["order_id"]
)
saga.add_step("create_order", create_order_step, cancel_order_step)
# Шаг 2: Резервировать оплату
async def reserve_payment_step(ctx):
async def _call():
async with httpx.AsyncClient() as client:
response = await client.post(
"http://payment-service/reserve",
json={"user_id": request.user_id, "amount": total_amount}
)
response.raise_for_status()
return response.json()
# С circuit breaker и retry
result = await retry_with_backoff(
lambda: payment_cb.call(_call),
max_retries=3,
retryable_exceptions=(httpx.ConnectError, httpx.TimeoutException)
)
ctx["payment_id"] = result["payment_id"]
async def cancel_payment_step(ctx):
async with httpx.AsyncClient() as client:
await client.post(
f"http://payment-service/cancel/{ctx['payment_id']}"
)
saga.add_step("reserve_payment", reserve_payment_step, cancel_payment_step)
# Шаг 3: Резервировать inventory
async def reserve_inventory_step(ctx):
for item in request.items:
async def _reserve(item):
async with httpx.AsyncClient() as client:
response = await client.post(
"http://inventory-service/reserve",
json={"product_id": item.product_id, "quantity": item.quantity}
)
response.raise_for_status()
await retry_with_backoff(
lambda: inventory_cb.call(lambda: _reserve(item)),
max_retries=3
)
async def cancel_inventory_step(ctx):
for item in request.items:
async with httpx.AsyncClient() as client:
await client.post(
"http://inventory-service/release",
json={"product_id": item.product_id, "quantity": item.quantity}
)
saga.add_step("reserve_inventory", reserve_inventory_step, cancel_inventory_step)
# Шаг 4: Подтвердить заказ
async def confirm_order_step(ctx):
await db.execute(
"UPDATE orders SET status = 'confirmed' WHERE id = $1",
ctx["order_id"]
)
# Отправка уведомления
await publish_event("order.confirmed", {"order_id": ctx["order_id"]})
saga.add_step("confirm_order", confirm_order_step, lambda ctx: None)
# Выполнение Saga
success = await saga.execute()
if not success:
raise HTTPException(status_code=400, detail="Order creation failed")
return {"order_id": saga.context["order_id"], "status": "confirmed"}Микросервисная архитектура — мощный инструмент, но требует зрелости команды и инфраструктуры. Ключевые принципы:
Помните: микросервисы не решают проблемы плохой архитектуры, они их масштабируют.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.