Транзакции в Kafka, idempotent producer, exactly-once semantics, read-process-write.
Транзакции в Kafka обеспечивают атомарную запись сообщений и exactly-once семантику обработки. В этой теме вы изучите idempotent producer, транзакции и read-process-write паттерн.
Сообщение может быть потеряно, но не дублируется.
from confluent_kafka import Producer
p = Producer({
'bootstrap.servers': 'localhost:9092',
'acks': 0 # Нет подтверждений
})
p.produce('orders', value=b'message')
# Сообщение отправлено без подтверждения — может быть потеряноUse cases: метрики, логи (потеря отдельных записей допустима).
Сообщение гарантированно доставлено, но может дублироваться.
from confluent_kafka import Producer
p = Producer({
'bootstrap.servers': 'localhost:9092',
'acks': 'all', # Ждём подтверждения от всех ISR
'retries': 5 # Retry при ошибках
})
p.produce('orders', value=b'message')
# Сообщение подтверждено, но при retry может быть дубликатUse cases: финансовые транзакции, заказы (потеря недопустима).
Сообщение доставлено ровно один раз.
from confluent_kafka import Producer
p = Producer({
'bootstrap.servers': 'localhost:9092',
'acks': 'all',
'enable.idempotence': True # Идемпотентность
})
# Или транзакции для атомарной записи в несколько топиков
p.init_transactions()
p.begin_transaction()
p.produce('orders', value=b'message')
p.produce('order-events', value=b'event')
p.commit_transaction()Use cases: критичные транзакции, агрегации, join.
Идемпотентный producer гарантирует, что каждое сообщение будет записано в партицию ровно один раз, даже при retry.
1. Producer получает PID (Producer ID) от coordinator
2. Каждое сообщение получает sequence number
3. Broker отслеживает PID + sequence number
4. Дубликаты (тот же PID + sequence) отклоняются
Producer Broker
│ │
│──PID request──▶│
│◀─PID=42────────│
│ │
│──msg seq=1────▶│ OK
│──msg seq=2────▶│ OK
│──msg seq=2────▶│ DUPLICATE (отклонено)
│──msg seq=3────▶│ OK
from confluent_kafka import Producer
p = Producer({
'bootstrap.servers': 'localhost:9092',
'acks': 'all', # Обязательно для idempotence
'enable.idempotence': True,
'max.in.flight.requests.per.connection': 5, # Должно быть <= 5
'transactional.id': None # Не транзакционный, только idempotent
})Важно: max.in.flight.requests.per.connection должно быть ≤ 5 для сохранения порядка сообщений при retry.
from confluent_kafka import Producer, KafkaError
class IdempotentOrderProducer:
def __init__(self):
self.p = Producer({
'bootstrap.servers': 'broker1:9092,broker2:9092,broker3:9092',
'acks': 'all',
'enable.idempotence': True,
'max.in.flight.requests.per.connection': 5,
'retries': 5,
'retry.backoff.ms': 100
})
def send_order(self, order_id, data):
def callback(err, msg):
if err:
print(f'Order {order_id} failed: {err}')
else:
print(f'Order {order_id} delivered: {msg.topic()} [{msg.partition()}] @{msg.offset()}')
self.p.produce(
topic='orders',
key=f'order_{order_id}'.encode(),
value=data.encode(),
callback=callback
)
def flush(self):
self.p.flush()
# Использование
producer = IdempotentOrderProducer()
for i in range(100):
producer.send_order(i, f'{{"order_id": {i}}}')
producer.flush()Транзакции обеспечивают атомарную запись сообщений в несколько топиков и партиций.
from confluent_kafka import Producer
p = Producer({
'bootstrap.servers': 'localhost:9092',
'transactional.id': 'order-producer-1', # Уникальный ID
'acks': 'all',
'enable.idempotence': True # Обязательно для транзакций
})
# Инициализация (получение PID, регистрация)
p.init_transactions()from confluent_kafka import Producer, KafkaError
p = Producer({
'bootstrap.servers': 'localhost:9092',
'transactional.id': 'order-producer-1'
})
p.init_transactions()
try:
# Начало транзакции
p.begin_transaction()
# Отправка сообщений в несколько топиков
p.produce('orders', key=b'order_1', value=b'{"id": 1}')
p.produce('order-events', key=b'order_1', value=b'{"event": "created"}')
p.produce('notifications', key=b'user_1', value=b'{"type": "order_created"}')
# Commit транзакции
p.commit_transaction()
print('Transaction committed')
except KafkaError as e:
# Abort при ошибке
p.abort_transaction()
print(f'Transaction aborted: {e}')from confluent_kafka import Consumer, Producer, KafkaError, TopicPartition
# Consumer конфигурация
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processor',
'enable.auto.commit': False,
'isolation.level': 'read_committed' # Читать только committed сообщения
})
# Producer конфигурация
producer = Producer({
'bootstrap.servers': 'localhost:9092',
'transactional.id': 'order-processor-1',
'enable.idempotence': True
})
producer.init_transactions()
consumer.subscribe(['orders'])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
raise msg.error()
try:
# Начало транзакции
producer.begin_transaction()
# Обработка сообщения
order_data = process_order(msg.value())
# Отправка результатов в другие топики
producer.produce('processed-orders', value=order_data)
producer.produce('order-events', value=b'{"event": "processed"}')
# Commit offset в транзакции
# offsets_to_send включает consumer offset
offsets_to_send = consumer.position(consumer.assignment())
offsets_to_send.append(TopicPartition(msg.topic(), msg.partition(), msg.offset() + 1))
producer.send_offsets_to_transaction(
offsets_to_send,
consumer.consumer_group_metadata()
)
# Commit транзакции
producer.commit_transaction()
print(f'Processed order: {msg.value()}')
except KafkaError as e:
# Abort транзакции при ошибке
producer.abort_transaction()
print(f'Transaction aborted: {e}')
# Сообщение будет прочитано снова (не было commit offset)
except KeyboardInterrupt:
pass
finally:
consumer.close()Паттерн read-process-write обеспечивает атомарную обработку: сообщение читается, обрабатывается, результат записывается — всё в одной транзакции.
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ Orders │────▶│ Processor │────▶│ Processed │
│ (input) │ │ (consumer) │ │ (output) │
└─────────────┘ └──────────────┘ └─────────────┘
│
▼
┌──────────────┐
│ Events │
│ (output) │
└──────────────┘
from confluent_kafka import Consumer, Producer, KafkaError, TopicPartition
import json
class OrderProcessor:
def __init__(self):
# Consumer
self.consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processor',
'enable.auto.commit': False,
'isolation.level': 'read_committed',
'auto.offset.reset': 'earliest'
})
# Transactional Producer
self.producer = Producer({
'bootstrap.servers': 'localhost:9092',
'transactional.id': 'order-processor-1',
'enable.idempotence': True,
'acks': 'all'
})
self.producer.init_transactions()
self.consumer.subscribe(['orders'])
def process_order(self, order_data):
"""Бизнес-логика обработки заказа"""
order = json.loads(order_data.decode('utf-8'))
# Валидация
if order['amount'] <= 0:
raise ValueError('Invalid amount')
# Трансформация
processed = {
'order_id': order['order_id'],
'status': 'processed',
'amount': order['amount'],
'tax': order['amount'] * 0.1
}
return json.dumps(processed).encode('utf-8')
def run(self):
try:
while True:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
raise msg.error()
self.process_with_transaction(msg)
except KeyboardInterrupt:
print('Shutting down...')
finally:
self.consumer.close()
def process_with_transaction(self, msg):
"""Обработка сообщения в транзакции"""
try:
self.producer.begin_transaction()
# Обработка
processed_data = self.process_order(msg.value())
# Запись результатов
self.producer.produce(
'processed-orders',
key=msg.key(),
value=processed_data
)
self.producer.produce(
'order-events',
key=msg.key(),
value=json.dumps({
'event': 'order_processed',
'order_id': json.loads(msg.value())['order_id']
}).encode()
)
# Commit offset в транзакции
self.commit_offset_in_transaction(msg)
# Commit транзакции
self.producer.commit_transaction()
except Exception as e:
# Abort при любой ошибке
self.producer.abort_transaction()
print(f'Transaction aborted: {e}')
# Сообщение будет прочитано снова
def commit_offset_in_transaction(self, msg):
"""Commit consumer offset в транзакции"""
# Получение текущих position
offsets = self.consumer.position(self.consumer.assignment())
# Добавление offset текущего сообщения
current_offset = msg.offset() + 1
offsets.append(TopicPartition(msg.topic(), msg.partition(), current_offset))
# Отправка offset в транзакции
self.producer.send_offsets_to_transaction(
offsets,
self.consumer.consumer_group_metadata()
)
# Запуск
processor = OrderProcessor()
processor.run()EXACTLY_ONCE_PRODUCER_CONFIG = {
'bootstrap.servers': 'broker1:9092,broker2:9092,broker3:9092',
# Транзакции
'transactional.id': 'my-producer-1', # Уникальный ID
'enable.idempotence': True, # Обязательно
# Гарантии доставки
'acks': 'all',
'min.insync.replicas': 2,
# Retry
'retries': 5,
'retry.backoff.ms': 100,
# Performance
'compression.type': 'lz4',
'batch.size': 1048576,
'linger.ms': 5
}EXACTLY_ONCE_CONSUMER_CONFIG = {
'bootstrap.servers': 'broker1:9092,broker2:9092,broker3:9092',
'group.id': 'my-consumer-group',
# Читать только committed сообщения
'isolation.level': 'read_committed',
# Offset management
'enable.auto.commit': False,
'auto.offset.reset': 'earliest',
# Session management
'session.timeout.ms': 30000,
'heartbeat.interval.ms': 10000
}from confluent_kafka import KafkaError
# Ошибки транзакций
KafkaError._INVALID_TRANSACTION # Невалидное состояние транзакции
KafkaError._TRANSACTIONAL_ID_AUTHORIZATION_FAILED # Auth ошибка
KafkaError._TRANSACTION_TIMEOUT # Таймаут транзакции
KafkaError._CONCURRENT_TRANSACTIONS # Конкурирующие транзакцииfrom confluent_kafka import Producer, KafkaError
import time
class TransactionalProducerWithRetry:
def __init__(self, transactional_id):
self.producer = Producer({
'bootstrap.servers': 'localhost:9092',
'transactional.id': transactional_id,
'enable.idempotence': True,
'transaction.timeout.ms': 60000
})
self.producer.init_transactions()
def send_with_retry(self, topic, key, value, max_retries=3):
for attempt in range(max_retries):
try:
self.producer.begin_transaction()
self.producer.produce(topic, key=key, value=value)
self.producer.commit_transaction()
return True
except KafkaError as e:
if e.code() == KafkaError._CONCURRENT_TRANSACTIONS:
# Другая транзакция с тем же transactional.id
wait_time = 2 ** attempt # Exponential backoff
print(f'Concurrent transaction, retrying in {wait_time}s')
time.sleep(wait_time)
# Abort текущей транзакции
self.producer.abort_transaction()
else:
# Другие ошибки
self.producer.abort_transaction()
raise
return Falsep = Producer({
'bootstrap.servers': 'localhost:9092',
'transactional.id': 'my-producer',
'enable.idempotence': True,
'transaction.timeout.ms': 60000, # 60 секунд
'message.timeout.ms': 30000 # Должно быть < transaction.timeout.ms
})
p.init_transactions()
try:
p.begin_transaction()
# Долгая обработка может вызвать timeout
result = long_running_operation()
p.produce('output', value=result)
p.commit_transaction()
except KafkaError as e:
if e.code() == KafkaError._TRANSACTION_TIMEOUT:
# Транзакция aborted брокером из-за timeout
print('Transaction timed out')
# Не нужно вызывать abort_transaction() — уже abortedfrom confluent_kafka import Producer
p = Producer({
'bootstrap.servers': 'localhost:9092',
'transactional.id': 'my-producer'
})
# Получение метрик
metrics = p.metrics()
# Метрики транзакций
print(metrics['producer']['transactional-id'])
print(metrics['producer']['transactional-state']) # current state
print(metrics['topics']['orders']['txmsgs']) # committed messagesimport logging
from confluent_kafka import Producer
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('kafka.transaction')
class LoggedTransactionalProducer:
def __init__(self, transactional_id):
self.producer = Producer({
'bootstrap.servers': 'localhost:9092',
'transactional.id': transactional_id,
'enable.idempotence': True,
'debug': 'transaction' # Debug категория
})
self.producer.init_transactions()
def process(self, topic, key, value):
try:
logger.info(f'Beginning transaction {transactional_id}')
self.producer.begin_transaction()
self.producer.produce(topic, key=key, value=value)
self.producer.commit_transaction()
logger.info(f'Transaction {transactional_id} committed')
except KafkaError as e:
self.producer.abort_transaction()
logger.error(f'Transaction {transactional_id} aborted: {e}')
raisePRODUCTION_CONFIG = {
# Брокеры
'bootstrap.servers': 'broker1:9092,broker2:9092,broker3:9092',
# Транзакции
'transactional.id': 'my-producer-1',
'enable.idempotence': True,
'transaction.timeout.ms': 60000,
# Гарантии
'acks': 'all',
'min.insync.replicas': 2,
# Retry
'retries': 5,
'retry.backoff.ms': 100,
'delivery.timeout.ms': 120000,
# Performance
'compression.type': 'lz4',
'batch.size': 1048576,
'linger.ms': 5,
# Ограничения
'max.in.flight.requests.per.connection': 5,
'message.max.bytes': 1000012
}import signal
import sys
from confluent_kafka import Producer
class GracefulTransactionalProducer:
def __init__(self, transactional_id):
self.producer = Producer({
**PRODUCTION_CONFIG,
'transactional.id': transactional_id
})
self.producer.init_transactions()
self.running = True
self.in_transaction = False
signal.signal(signal.SIGINT, self.shutdown)
signal.signal(signal.SIGTERM, self.shutdown)
def shutdown(self, signum, frame):
print('Shutting down...')
self.running = False
if self.in_transaction:
print('Aborting in-flight transaction')
self.producer.abort_transaction()
def send(self, topic, key, value):
if not self.running:
return
try:
self.producer.begin_transaction()
self.in_transaction = True
self.producer.produce(topic, key=key, value=value)
self.producer.commit_transaction()
self.in_transaction = False
except Exception as e:
self.producer.abort_transaction()
self.in_transaction = False
raiseВ следующей теме вы изучите Production-паттерны:
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.