Типизация сообщений: Pydantic-модели, валидация входящих данных, кастомные валидаторы, обработка ошибок валидации.
Типизация сообщений: Pydantic-модели, валидация входящих данных, кастомные валидаторы, обработка ошибок валидации.
FastStream автоматически валидирует входящие сообщения через Pydantic:
from pydantic import BaseModel
from faststream import FastStream
from faststream.rabbit import RabbitBroker
broker = RabbitBroker("amqp://localhost")
app = FastStream(broker)
class Order(BaseModel):
id: int
amount: float
status: str
@broker.subscriber("orders")
async def handle_order(order: Order):
# order — уже валидированная Pydantic-модель
print(f"Order {order.id}: ${order.amount}")Что происходит:
{"id": 123, "amount": 99.99, "status": "new"}OrderPydantic проверяет типы данных автоматически:
from pydantic import BaseModel, EmailStr
from datetime import datetime
from decimal import Decimal
class User(BaseModel):
id: int # Целое число
name: str # Строка
email: EmailStr # Email (валидация формата)
balance: Decimal # Десятичное число (для денег)
created_at: datetime # Дата-время
is_active: bool = True # Булево со значением по умолчаниюПримеры сообщений:
✅ Валидно:
{
"id": 123,
"name": "John",
"email": "john@example.com",
"balance": "99.99",
"created_at": "2026-03-30T12:00:00Z",
"is_active": true
}❌ Не валидно (ошибка типов):
{
"id": "abc", // Ожидается int, не str
"name": "John",
"email": "invalid", // Неверный формат email
"balance": "not-a-number",
"created_at": "yesterday",
"is_active": "yes"
}from pydantic import BaseModel, Field
class Order(BaseModel):
id: int = Field(gt=0) # Больше 0
amount: float = Field(ge=0, le=1000000) # 0 <= amount <= 1000000
status: str = Field(min_length=1, max_length=50)
description: str = Field(default="", max_length=500)
tags: list[str] = Field(default_factory=list, max_length=10)Ограничения:
gt, ge, lt, le — сравнение чиселmin_length, max_length — длина строки/спискаregex — проверка по регулярному выражениюpattern — алиас для regex в Pydantic v2from pydantic import BaseModel, Field
class User(BaseModel):
phone: str = Field(pattern=r'^\+7\d{10}$') # +7XXXXXXXXXX
postal_code: str = Field(pattern=r'^\d{6}$') # 6 цифрfrom pydantic import BaseModel, field_validator
class Order(BaseModel):
id: int
amount: float
discount: float = 0.0
@field_validator('discount')
@classmethod
def validate_discount(cls, v):
if v < 0 or v > 100:
raise ValueError('Discount must be between 0 and 100')
return vfrom pydantic import BaseModel, field_validator, model_validator
class Order(BaseModel):
amount: float
discount: float
final_amount: float = None
@field_validator('discount')
@classmethod
def validate_discount(cls, v):
if v < 0 or v > 100:
raise ValueError('Discount must be 0-100')
return v
@model_validator(mode='after')
def calculate_final(self):
if self.final_amount is None:
self.final_amount = self.amount * (1 - self.discount / 100)
return selffrom pydantic import BaseModel, field_validator
class Order(BaseModel):
items: list[dict]
@field_validator('items')
@classmethod
def validate_items(cls, items):
if not items:
raise ValueError('Order must have at least one item')
# Проверка уникальности SKU
skus = [item.get('sku') for item in items]
if len(skus) != len(set(skus)):
raise ValueError('Duplicate items in order')
return itemsfrom pydantic import BaseModel
from enum import Enum
class OrderStatus(str, Enum):
NEW = "new"
PAID = "paid"
SHIPPED = "shipped"
CANCELLED = "cancelled"
class Order(BaseModel):
id: int
status: OrderStatus # Только допустимые значения
# Валидно: {"id": 123, "status": "new"}
# Не валидно: {"id": 123, "status": "unknown"} → ValueErrorfrom pydantic import BaseModel
from typing import Union
class EmailNotification(BaseModel):
type: str = "email"
email: str
subject: str
class SMSNotification(BaseModel):
type: str = "sms"
phone: str
message: str
Notification = Union[EmailNotification, SMSNotification]
@broker.subscriber("notifications")
async def handle_notification(notif: Notification):
if notif.type == "email":
print(f"Email to {notif.email}")
else:
print(f"SMS to {notif.phone}")Более эффективный Union с дискриминатором:
from pydantic import BaseModel, Field
from typing import Union, Literal
class EmailNotification(BaseModel):
type: Literal["email"]
email: str
subject: str
class SMSNotification(BaseModel):
type: Literal["sms"]
phone: str
message: str
class PushNotification(BaseModel):
type: Literal["push"]
user_id: int
title: str
Notification = Annotated[
Union[EmailNotification, SMSNotification, PushNotification],
Field(discriminator='type')
]
# Pydantic быстро определяет тип по полю 'type'from faststream import FastStream
from faststream.exceptions import ValidationError
from pydantic import ValidationError as PydanticValidationError
broker = RabbitBroker("amqp://localhost")
@broker.subscriber("orders")
async def handle_order(order: Order):
...
# Глобальный обработчик ошибок
@broker.subscriber("orders")
async def handle_order_with_validation(order: Order):
try:
# Обработка
...
except PydanticValidationError as e:
logger.error(f"Validation failed: {e}")
# Сообщение автоматически вернётся в очередь или DLQ
raisefrom faststream.rabbit import RabbitMessage
@broker.subscriber("orders", ack=True, manual_ack=True)
async def handle_order(order: Order, message: RabbitMessage):
try:
await process_order(order)
await message.ack()
except PydanticValidationError as e:
logger.error(f"Invalid message: {e}")
# Не retry'нуть — ошибка в данных
await message.nack(requeue=False)
except Exception as e:
logger.error(f"Processing error: {e}")
# Временная ошибка — retry
await message.nack(requeue=True)from pydantic import BaseModel
class Address(BaseModel):
street: str
city: str
postal_code: str
class Customer(BaseModel):
id: int
name: str
email: str
address: Address # Вложенная модель
class Order(BaseModel):
id: int
customer: Customer
items: list[dict]
total: float
# Валидируется вся иерархия
@broker.subscriber("orders")
async def handle_order(order: Order):
print(f"Order for {order.customer.name}")
print(f"Ship to {order.customer.address.city}")from pydantic import BaseModel
class Order(BaseModel):
id: int
amount: float
status: str
class Config:
json_schema_extra = {
"example": {
"id": 123,
"amount": 99.99,
"status": "new"
}
}
# Сериализация
order = Order(id=123, amount=99.99, status="new")
# В dict
data = order.model_dump()
# В JSON
json_str = order.model_dump_json()
# Только определённые поля
data = order.model_dump(include={"id", "status"})
data = order.model_dump(exclude={"amount"})from pydantic import BaseModel, Field, field_validator, model_validator
from datetime import datetime
from decimal import Decimal
from enum import Enum
class OrderStatus(str, Enum):
NEW = "new"
PAID = "paid"
CANCELLED = "cancelled"
class OrderItem(BaseModel):
sku: str = Field(min_length=1, max_length=50)
quantity: int = Field(gt=0, le=1000)
price: Decimal = Field(ge=0)
@field_validator('sku')
@classmethod
def validate_sku(cls, v):
if not v.replace('-', '').isalnum():
raise ValueError('SKU must be alphanumeric with dashes')
return v
class Order(BaseModel):
id: int = Field(gt=0)
customer_id: int = Field(gt=0)
items: list[OrderItem]
status: OrderStatus = OrderStatus.NEW
created_at: datetime = Field(default_factory=datetime.utcnow)
@field_validator('items')
@classmethod
def validate_items(cls, items):
if not items:
raise ValueError('Order must have at least one item')
return items
@model_validator(mode='after')
def validate_total(self):
total = sum(item.price * item.quantity for item in self.items)
if total > 1000000:
raise ValueError('Order total exceeds limit')
return self
@broker.subscriber("orders")
async def handle_order(order: Order):
logger.info(f"Valid order {order.id} for customer {order.customer_id}")Следующая тема — Dependency Injection в FastStream: внедрение зависимостей через Depends, контекст, базы данных, кэш, внешние сервисы.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.