Форматы сериализации данных: JSON, Avro, Protobuf. Работа с Schema Registry, версионирование схем.
Сериализация преобразует объекты Python в байты для отправки в Kafka. В этой теме вы изучите форматы сериализации, Schema Registry и эволюцию схем.
Kafka хранит сообщения в виде байтов. Producer должны сериализовать объекты в байты, consumer — десериализовать байты обратно.
# Producer сериализует
obj = {'user_id': 123, 'event': 'login'}
bytes_data = serialize(obj) # → b'{"user_id": 123, "event": "login"}'
# Consumer десериализует
obj = deserialize(bytes_data) # → {'user_id': 123, 'event': 'login'}import json
from confluent_kafka import Producer, Consumer
# Producer
p = Producer({'bootstrap.servers': 'localhost:9092'})
order = {
'order_id': 123,
'user_id': 456,
'amount': 99.99,
'items': ['item1', 'item2']
}
p.produce(
topic='orders',
key=f'order_{order["order_id"]}'.encode(),
value=json.dumps(order).encode('utf-8')
)
# Consumer
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processors'
})
c.subscribe(['orders'])
while True:
msg = c.poll(timeout=1.0)
if msg and not msg.error():
order = json.loads(msg.value().decode('utf-8'))
print(f'Order: {order}')from datetime import datetime
from decimal import Decimal
import json
class CustomEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime):
return obj.isoformat()
if isinstance(obj, Decimal):
return float(obj)
if isinstance(obj, bytes):
return obj.decode('utf-8')
return super().default(obj)
def serialize(obj):
return json.dumps(obj, cls=CustomEncoder).encode('utf-8')
def deserialize(data):
return json.loads(data.decode('utf-8'))
# Использование
order = {
'order_id': 123,
'created_at': datetime.now(),
'amount': Decimal('99.99')
}
bytes_data = serialize(order)
restored = deserialize(bytes_data)from jsonschema import validate, ValidationError
ORDER_SCHEMA = {
'type': 'object',
'required': ['order_id', 'user_id', 'amount'],
'properties': {
'order_id': {'type': 'integer'},
'user_id': {'type': 'integer'},
'amount': {'type': 'number', 'minimum': 0},
'items': {'type': 'array', 'items': {'type': 'string'}}
}
}
def validate_order(order):
try:
validate(instance=order, schema=ORDER_SCHEMA)
return True
except ValidationError as e:
print(f'Validation error: {e.message}')
return False
# Producer с валидацией
def send_order(order):
if not validate_order(order):
raise ValueError('Invalid order schema')
p.produce(
topic='orders',
value=json.dumps(order).encode('utf-8')
)Avro — бинарный формат сериализации с использованием схем. Преимущества:
pip install confluent-kafka[avro]
# или
pip install fastavrofrom confluent_kafka.avro import AvroProducer, AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
# Схема Avro
value_schema_str = """
{
"namespace": "com.example.orders",
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "int"},
{"name": "user_id", "type": "int"},
{"name": "amount", "type": "double"},
{"name": "items", "type": {"type": "array", "items": "string"}}
]
}
"""
key_schema_str = """
{
"namespace": "com.example.orders",
"type": "record",
"name": "OrderKey",
"fields": [
{"name": "order_id", "type": "int"}
]
}
"""
# Producer с Avro
producer = AvroProducer({
'bootstrap.servers': 'localhost:9092',
'schema.registry.url': 'http://localhost:8081'
}, default_key_schema=key_schema_str, default_value_schema=value_schema_str)
order = {
'order_id': 123,
'user_id': 456,
'amount': 99.99,
'items': ['item1', 'item2']
}
producer.produce(
topic='orders',
key={'order_id': 123},
value=order
)
producer.flush()
# Consumer с Avro
consumer = AvroConsumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processors',
'schema.registry.url': 'http://localhost:8081',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['orders'])
while True:
try:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
raise msg.error()
# Avro автоматически десериализует в dict
order = msg.value()
print(f'Order: {order}')
print(f'Order ID: {order["order_id"]}')
consumer.commit(msg)
except SerializerError as e:
print(f'Avro serialization error: {e}')
except Exception as e:
print(f'Error: {e}')
break
consumer.close()import fastavro
from io import BytesIO
# Схема Avro
schema = {
'namespace': 'com.example.orders',
'type': 'record',
'name': 'Order',
'fields': [
{'name': 'order_id', 'type': 'int'},
{'name': 'user_id', 'type': 'int'},
{'name': 'amount', 'type': 'double'},
{'name': 'items', 'type': {'type': 'array', 'items': 'string'}}
]
}
# Сериализация
def serialize_avro(record, schema):
buffer = BytesIO()
fastavro.schemaless_writer(buffer, schema, record)
return buffer.getvalue()
# Десериализация
def deserialize_avro(data, schema):
buffer = BytesIO(data)
return fastavro.schemaless_reader(buffer, schema)
# Использование
order = {
'order_id': 123,
'user_id': 456,
'amount': 99.99,
'items': ['item1', 'item2']
}
bytes_data = serialize_avro(order, schema)
restored = deserialize_avro(bytes_data, schema)
print(f'Original: {order}')
print(f'Restored: {restored}')Schema Registry — сервис для хранения и управления схемами Avro. Обеспечивает:
# docker-compose.yml
version: '3.8'
services:
schema-registry:
image: confluentinc/cp-schema-registry:7.5.0
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: FULLimport requests
SR_URL = 'http://localhost:8081'
# Регистрация схемы
def register_schema(subject, schema):
response = requests.post(
f'{SR_URL}/subjects/{subject}/versions',
headers={'Content-Type': 'application/vnd.schemaregistry.v1+json'},
json={'schema': schema}
)
response.raise_for_status()
return response.json()
# Получение схемы по subject и version
def get_schema(subject, version='latest'):
response = requests.get(f'{SR_URL}/subjects/{subject}/versions/{version}')
response.raise_for_status()
return response.json()
# Проверка совместимости
def check_compatibility(subject, schema, version='latest'):
response = requests.post(
f'{SR_URL}/compatibility/subjects/{subject}/versions/{version}',
headers={'Content-Type': 'application/vnd.schemaregistry.v1+json'},
json={'schema': schema}
)
response.raise_for_status()
return response.json()
# Пример использования
order_schema = """
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "int"},
{"name": "user_id", "type": "int"},
{"name": "amount", "type": "double"}
]
}
"""
# Регистрация
result = register_schema('orders-value', order_schema)
print(f'Schema ID: {result["id"]}')
# Получение
schema_info = get_schema('orders-value', 'latest')
print(f'Schema: {schema_info["schema"]}')from confluent_kafka.schema_registry import SchemaRegistryClient, Schema, RecordMetadata
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
from confluent_kafka import Producer, Consumer
# Клиент Schema Registry
sr_client = SchemaRegistryClient({'url': 'http://localhost:8081'})
# Схема
order_schema_str = """
{
"namespace": "com.example.orders",
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "int"},
{"name": "user_id", "type": "int"},
{"name": "amount", "type": "double"}
]
}
"""
order_schema = Schema(order_schema_str, schema_type='AVRO')
# Сериализатор
order_serializer = AvroSerializer(
sr_client,
order_schema_str,
lambda ctx, obj: obj # Преобразование объекта (если нужно)
)
# Десериализатор
order_deserializer = AvroDeserializer(
sr_client,
order_schema_str,
lambda ctx, obj: obj # Преобразование объекта (если нужно)
)
# Producer с сериализацией
p = Producer({'bootstrap.servers': 'localhost:9092'})
order = {'order_id': 123, 'user_id': 456, 'amount': 99.99}
p.produce(
topic='orders',
value=order_serializer(order, ctx={'topic': 'orders'})
)
p.flush()
# Consumer с десериализацией
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processors',
'auto.offset.reset': 'earliest'
})
c.subscribe(['orders'])
while True:
msg = c.poll(timeout=1.0)
if msg and not msg.error():
order = order_deserializer(msg.value(), ctx={'topic': 'orders'})
print(f'Order: {order}')| Тип | Описание |
|---|---|
| BACKWARD | Новая схема читает старые данные (рекомендуется) |
| FORWARD | Старая схема читает новые данные |
| FULL | И backward, и forward совместимость |
| NONE | Без проверки совместимости |
# Глобальная настройка
curl -X PUT http://localhost:8081/config \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "BACKWARD"}'
# Настройка для конкретного subject
curl -X PUT http://localhost:8081/config/orders-value \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "BACKWARD"}'# Версия 1
schema_v1 = """
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "int"},
{"name": "user_id", "type": "int"},
{"name": "amount", "type": "double"}
]
}
"""
# Версия 2 - добавление поля с default (BACKWARD совместимо)
schema_v2 = """
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "int"},
{"name": "user_id", "type": "int"},
{"name": "amount", "type": "double"},
{"name": "discount", "type": "double", "default": 0.0}
]
}
"""
# Версия 3 - удаление поля (НЕ BACKWARD совместимо!)
schema_v3 = """
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "int"},
{"name": "user_id", "type": "int"}
]
}
"""Правила Backward совместимости:
default[null, "string"]defaultProtobuf — альтернатива Avro от Google.
pip install protobuf// order.proto
syntax = "proto3";
package com.example.orders;
message Order {
int32 order_id = 1;
int32 user_id = 2;
double amount = 3;
repeated string items = 4;
}protoc --python_out=. order.protoimport order_pb2
from confluent_kafka import Producer, Consumer
# Producer
p = Producer({'bootstrap.servers': 'localhost:9092'})
order = order_pb2.Order(
order_id=123,
user_id=456,
amount=99.99,
items=['item1', 'item2']
)
p.produce(
topic='orders',
key=f'order_{order.order_id}'.encode(),
value=order.SerializeToString()
)
p.flush()
# Consumer
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processors'
})
c.subscribe(['orders'])
while True:
msg = c.poll(timeout=1.0)
if msg and not msg.error():
order = order_pb2.Order()
order.ParseFromString(msg.value())
print(f'Order: {order.order_id}, Amount: {order.amount}')| Формат | Размер | Скорость | Схема | Schema Registry |
|---|---|---|---|---|
| JSON | Большой | Медленно | Нет | Нет |
| Avro | Малый | Быстро | Да | Да |
| Protobuf | Малый | Очень быстро | Да | Да |
| Pickle | Средний | Быстро | Нет | Нет |
from confluent_kafka import Producer, Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
# Schema Registry клиент
sr_client = SchemaRegistryClient({
'url': 'http://schema-registry:8081',
'basic.auth.user.info': 'username:password', # Если нужна аутентификация
'ssl.ca.location': '/path/to/ca.pem' # Если нужен SSL
})
# Producer конфигурация
producer_config = {
'bootstrap.servers': 'broker1:9092,broker2:9092,broker3:9092',
'acks': 'all',
'enable.idempotence': True
}
# Consumer конфигурация
consumer_config = {
'bootstrap.servers': 'broker1:9092,broker2:9092,broker3:9092',
'group.id': 'order-processors',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False
}
# Сериализаторы
value_schema_str = """...""" # Avro схема
key_serializer = ... # Сериализатор для ключа
value_serializer = AvroSerializer(sr_client, value_schema_str)
value_deserializer = AvroDeserializer(sr_client, value_schema_str)from confluent_kafka.avro.serializer import SerializerError
from confluent_kafka.schema_registry import SchemaRegistryError
class SafeAvroConsumer:
def __init__(self):
self.consumer = Consumer({...})
self.dlq_producer = Producer({...})
def consume(self):
while True:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
continue
try:
# Десериализация может выбросить SerializerError
order = self.deserialize(msg.value())
self.process(order)
self.consumer.commit(msg)
except SerializerError as e:
# Ошибка десериализации (несоответствие схемы)
print(f'Serialization error: {e}')
self.send_to_dlq(msg, 'serialization_error')
self.consumer.commit(msg)
except SchemaRegistryError as e:
# Ошибка подключения к Schema Registry
print(f'Schema Registry error: {e}')
# Не commit — retry при следующем poll
except Exception as e:
# Другие ошибки
print(f'Unexpected error: {e}')
# Не commit для retryВ следующей теме вы изучите Consumer Groups:
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.