Эволюция форматов сообщений: версионирование, совместимость, миграции без downtime, schema registry.
Эволюция форматов сообщений: версионирование, совместимость, миграции без downtime, schema registry.
Когда вы изменяете формат сообщений, возникают проблемы:
# Версия 1
class OrderV1(BaseModel):
id: int
amount: float
# Версия 2 (добавили поле)
class OrderV2(BaseModel):
id: int
amount: float
discount: float # Новое поле
# Что произойдёт?
# - Старые потребители не поймут новые сообщения
# - Новые потребители не обработают старые сообщения
# - Сообщения в очереди могут быть разных версийНовые потребители читают старые сообщения.
# Новое поле optional с default
class OrderV2(BaseModel):
id: int
amount: float
discount: float = 0.0 # Default для старых сообщенийПравила:
Старые потребители читают новые сообщения.
# Игнорирование неизвестных полей
class OrderV1(BaseModel):
id: int
amount: float
class Config:
extra = "ignore" # Игнорировать новые поляПравила:
И backward, и forward одновременно. Требуется для zero-downtime деплоя.
# Публикация
await broker.publish(order_v2, "orders.v2")
# Потребление
@broker.subscriber("orders.v1")
async def handle_v1(order: OrderV1):
...
@broker.subscriber("orders.v2")
async def handle_v2(order: OrderV2):
...Преимущества:
Недостатки:
class BaseMessage(BaseModel):
version: str = "1.0"
class OrderV1(BaseMessage):
version: str = "1.0"
id: int
amount: float
class OrderV2(BaseMessage):
version: str = "2.0"
id: int
amount: float
discount: float
# Потребитель с маршрутизацией
@broker.subscriber("orders")
async def handle_order(message: dict):
version = message.get("version", "1.0")
if version == "1.0":
order = OrderV1(**message)
elif version == "2.0":
order = OrderV2(**message)
else:
raise ValueError(f"Unknown version: {version}")# Публикация
await broker.publish(
order,
"orders",
headers={"schema_version": "2.0"}
)
# Потребитель
@broker.subscriber("orders")
async def handle_order(message: dict, msg: RabbitMessage):
version = msg.headers.get("schema_version", "1.0")
if version == "2.0":
order = OrderV2(**message)
else:
order = OrderV1(**message)async def publish_order(order: dict):
# Публикация в обе версии
await broker.publish(order, "orders.v1") # Старые потребители
await broker.publish({**order, "version": "2.0"}, "orders.v2") # НовыеЭтапы:
Фаза 1: Expand (расширение)
# Добавляем новое поле как optional
class Order(BaseModel):
id: int
amount: float
discount: float = 0.0 # Новое полеФаза 2: Migration (миграция)
Фаза 3: Contract (сжатие)
# Удаляем старое поле (если нужно)
class OrderV3(BaseModel):
id: int
discount: float # amount удалёнТопик: orders
┌─────────────┐
│ Publisher │
│ (v1+v2) │
└──────┬──────┘
│
├──> orders.v1 ──> Consumer v1 (старый)
│
└──> orders.v2 ──> Consumer v2 (новый)
Этапы:
from schema_registry.client import SchemaRegistryClient, Schema
client = SchemaRegistryClient(url="http://schema-registry:8081")
# Регистрация схемы
schema = Schema({
"type": "record",
"name": "Order",
"fields": [
{"name": "id", "type": "int"},
{"name": "amount", "type": "float"},
{"name": "discount", "type": "float", "default": 0.0}
]
}, schema_type="AVRO")
subject = "orders-value"
schema_id = client.register(subject, schema)
# Проверка совместимости
compatibility = client.test_compatibility(subject, schema)
# True если backward compatiblefrom schema_registry.client import Compatibility
# Установить уровень совместимости
client.set_compatibility(subject, Compatibility.BACKWARD)
# Перед публикацией проверить
if not client.test_compatibility(subject, new_schema):
raise ValueError("Schema not compatible!")
# Опубликовать с валидацией
await broker.publish(
order,
"orders",
headers={"schema_id": schema_id}
)from pydantic import BaseModel
from typing import Union, Literal
# === Версия 1 (старая) ===
class OrderV1(BaseModel):
version: Literal["1.0"] = "1.0"
id: int
amount: float
user_id: int
# === Версия 2 (новая) ===
class OrderV2(BaseModel):
version: Literal["2.0"] = "2.0"
id: int
amount: float
discount: float = 0.0
user_id: int
currency: str = "USD"
# === Union для потребителей ===
Order = Union[OrderV1, OrderV2]
# === Publisher с миграцией ===
async def publish_order(order_data: dict):
# Dual write: публикация в обе версии
await broker.publish(order_data, "orders.v1")
await broker.publish({**order_data, "version": "2.0"}, "orders.v2")
# === Потребитель с маршрутизацией ===
@broker.subscriber("orders.v1")
async def handle_order_v1(order: OrderV1):
logger.info(f"V1 order: {order.id}")
# Обработка v1
@broker.subscriber("orders.v2")
async def handle_order_v2(order: OrderV2):
logger.info(f"V2 order: {order.id}, discount: {order.discount}")
# Обработка v2
# === Миграция: после обновления всех потребителей ===
@broker.subscriber("orders")
async def handle_order_unified(order: Order):
if order.version == "1.0":
# Конвертация v1 → v2
order = OrderV2(
version="2.0",
id=order.id,
amount=order.amount,
discount=0.0,
user_id=order.user_id
)
# Единая обработка
await process_order(order)class BaseMessage(BaseModel):
version: str
timestamp: str
correlation_id: str# ✅ Хорошо
discount: float = 0.0
# ❌ Плохо
discount: float # Breaks backward compatibilityclass Config:
extra = "ignore" # Для forward compatibility## Changelog
### v2.0 (2026-03-30)
- Добавлено поле `discount` (optional, default=0.0)
- Добавлено поле `currency` (optional, default="USD")
- Backward compatible с v1.0
### v1.0 (2026-01-01)
- Initial schemadef test_backward_compatibility():
# Старые данные должны валидироваться новой схемой
old_data = {"id": 1, "amount": 100, "user_id": 123}
order = OrderV2(**old_data) # Не должно выбросить
assert order.discount == 0.0
def test_forward_compatibility():
# Новые данные должны игнорироваться старой схемой
new_data = {"id": 1, "amount": 100, "user_id": 123, "discount": 10.0}
order = OrderV1(**new_data) # discount игнорируется
assert not hasattr(order, 'discount')Поздравляем! Вы прошли полный курс по FastStream:
Теперь вы готовы строить надёжные, масштабируемые микросервисные приложения с помощью FastStream!
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.