Создание и настройка producer, отправка сообщений, уровни гарантий доставки, acks, retry.
Producer — это клиент Kafka, который публикует сообщения в топики. В этой теме вы изучите детальную настройку producer, гарантии доставки и оптимизацию производительности.
from confluent_kafka import Producer
# Минимальная конфигурация
p = Producer({
'bootstrap.servers': 'localhost:9092'
})
# Production-конфигурация
p = Producer({
'bootstrap.servers': 'broker1:9092,broker2:9092,broker3:9092',
'acks': 'all',
'retries': 5,
'retry.backoff.ms': 100,
'queue.buffering.max.ms': 5,
'compression.type': 'lz4'
})from kafka import KafkaProducer
# Минимальная конфигурация
p = KafkaProducer(bootstrap_servers='localhost:9092')
# Production-конфигурация
p = KafkaProducer(
bootstrap_servers=['broker1:9092', 'broker2:9092', 'broker3:9092'],
acks='all',
retries=5,
retry_backoff_ms=100,
compression_type='lz4',
batch_size=16384,
linger_ms=5
)from confluent_kafka import Producer
p = Producer({'bootstrap.servers': 'localhost:9092'})
# Отправка без ключа (round-robin)
p.produce(
topic='orders',
value=b'{"order_id": 123}'
)
# Отправка с ключом (hash-based partitioning)
p.produce(
topic='orders',
key=b'user_456',
value=b'{"order_id": 123}'
)
# Отправка с указанием партиции
p.produce(
topic='orders',
partition=0,
value=b'{"order_id": 123}'
)
# Ожидание отправки всех сообщений
p.flush()def delivery_report(err, msg):
"""Callback вызывается при доставке сообщения или ошибке"""
if err is not None:
print(f'Delivery failed: {err}')
# Логирование ошибки, retry, отправка в DLQ
else:
print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')
p = Producer({'bootstrap.servers': 'localhost:9092'})
p.produce(
topic='orders',
key=b'user_456',
value=b'{"order_id": 123}',
callback=delivery_report
)
p.flush()import json
from confluent_kafka import Producer
p = Producer({
'bootstrap.servers': 'localhost:9092',
'queue.buffering.max.ms': 10, # Batch window
'batch.num.messages': 10000 # Максимум сообщений в batch
})
def send_order(order_id, user_id, amount):
value = json.dumps({
'order_id': order_id,
'user_id': user_id,
'amount': amount
}).encode('utf-8')
p.produce(
topic='orders',
key=f'user_{user_id}'.encode(),
value=value,
callback=lambda err, msg: print(f'Order {order_id}: {err or "OK"}')
)
# Отправка множества сообщений
for i in range(1000):
send_order(i, i % 100, 100.0 + i)
# Flush ожидает отправки всех queued сообщений
p.flush(timeout=10)Kafka поддерживает три уровня гарантий доставки:
| Уровень | Описание | Когда использовать |
|---|---|---|
| At most once | Сообщение может быть потеряно, но не дублируется | Метрики, логи (потеря отдельных записей допустима) |
| At least once | Сообщение гарантированно доставлено, но может дублироваться | Финансовые транзакции, заказы (потеря недопустима) |
| Exactly once | Сообщение доставлено ровно один раз | Критичные транзакции с идемпотентной обработкой |
Параметр acks определяет сколько реплик должны подтвердить запись:
# acks=0 — нет подтверждений (at most once)
p = Producer({
'bootstrap.servers': 'localhost:9092',
'acks': 0 # или '0'
})
# Быстро, но сообщение может быть потеряно при отказе брокера
# acks=1 — лидер подтверждает (at least once)
p = Producer({
'bootstrap.servers': 'localhost:9092',
'acks': 1 # или '1'
})
# Сообщение сохранено у лидера, но может быть потеряно при failover до репликации
# acks=all — все ISR подтверждают (at least once, надёжнее)
p = Producer({
'bootstrap.servers': 'localhost:9092',
'acks': 'all' # или -1
})
# Сообщение сохранено во всех ISR-репликахИдемпотентный producer гарантирует, что каждое сообщение будет записано ровно один раз, даже при retry:
p = Producer({
'bootstrap.servers': 'localhost:9092',
'acks': 'all',
'enable.idempotence': True, # Включить идемпотентность
'max.in.flight.requests.per.connection': 5 # Должно быть <= 5
})Как это работает:
Ограничения:
max.in.flight.requests.per.connection ≤ 5p = Producer({
'bootstrap.servers': 'localhost:9092',
'retries': 5, # Количество попыток
'retry.backoff.ms': 100, # Задержка между retry
'delivery.timeout.ms': 120000, # Общий таймаут доставки
'message.timeout.ms': 30000 # Таймаут одного сообщения
})from confluent_kafka import KafkaError, Producer
class OrderProducer:
def __init__(self):
self.p = Producer({
'bootstrap.servers': 'localhost:9092',
'acks': 'all',
'retries': 3,
'retry.backoff.ms': 100
})
self.failed_messages = []
def delivery_callback(self, err, msg):
if err is not None:
# Классификация ошибок
if err.code() == KafkaError.MSG_SIZE_TOO_LARGE:
print(f'Message too large, sending to DLQ')
self.send_to_dlq(msg)
elif err.code() == KafkaError.NOT_ENOUGH_REPLICAS:
print(f'Not enough replicas, retrying')
self.retry_message(msg)
else:
print(f'Delivery failed: {err}')
self.failed_messages.append(msg)
else:
print(f'Delivered: {msg.topic()} [{msg.partition()}] @{msg.offset()}')
def send_order(self, order_data):
self.p.produce(
topic='orders',
key=order_data['user_id'].encode(),
value=json.dumps(order_data).encode(),
callback=self.delivery_callback
)
def send_to_dlq(self, msg):
# Отправка в dead letter queue
self.p.produce('orders-dlq', value=msg.value())
def retry_message(self, msg):
# Повторная отправка
self.p.produce(msg.topic(), key=msg.key(), value=msg.value())
def flush(self):
self.p.flush()from confluent_kafka import KafkaError
# Ошибки доставки
KafkaError.MSG_SIZE_TOO_LARGE # Сообщение превышает max.message.bytes
KafkaError.NOT_ENOUGH_REPLICAS # ISR < min.insync.replicas
KafkaError.NOT_LEADER_FOR_PARTITION # Брокер не лидер для партиции
KafkaError.UNKNOWN_TOPIC_OR_PART # Топик или партиция не существуют
# Ошибки сети
KafkaError._MSG_TIMED_OUT # Таймаут отправки
KafkaError._PARTITION_EOF # Достигнут конец партиции (не ошибка)
KafkaError._TRANSPORT # Ошибка соединенияProducer batching группирует сообщения в один request для уменьшения overhead:
p = Producer({
'bootstrap.servers': 'localhost:9092',
# Batch настройки
'queue.buffering.max.ms': 10, # Время ожидания batch (0 = немедленно)
'batch.num.messages': 10000, # Макс сообщений в batch
'batch.size': 1048576, # Макс размер batch в байтах (1MB)
# Linger — задержка для накопления batch
'linger.ms': 5, # Ждать 5ms для накопления сообщений
})Компромисс:
Сжатие уменьшает размер сообщений и network I/O:
p = Producer({
'bootstrap.servers': 'localhost:9092',
'compression.type': 'lz4' # или 'gzip', 'snappy', 'zstd'
})Сравнение алгоритмов:
| Алгоритм | Compression ratio | Speed | CPU usage |
|---|---|---|---|
| none | 1x | Быстро | Низкое |
| gzip | Высокое | Медленно | Высокое |
| snappy | Среднее | Очень быстро | Низкое |
| lz4 | Среднее | Быстро | Низкое |
| zstd | Высокое | Быстро | Среднее |
Рекомендация: lz4 или zstd для большинства use cases.
p = Producer({
'bootstrap.servers': 'localhost:9092',
# Buffer настройки
'queue.buffering.max.messages': 100000, # Макс сообщений в очереди
'queue.buffering.max.kbytes': 1048576, # Макс размер очереди (1GB)
'message.max.bytes': 1000012, # Макс размер сообщения (1MB)
})Для максимального throughput используйте несколько producer instances:
from concurrent.futures import ThreadPoolExecutor
from confluent_kafka import Producer
def create_producer():
return Producer({
'bootstrap.servers': 'localhost:9092',
'compression.type': 'lz4',
'batch.size': 1048576
})
class ParallelProducer:
def __init__(self, num_producers=4):
self.producers = [create_producer() for _ in range(num_producers)]
self.executor = ThreadPoolExecutor(max_workers=num_producers)
def send(self, topic, key, value):
# Round-robin между producer
producer = self.producers[hash(key) % len(self.producers)]
producer.produce(topic, key=key, value=value)
def flush_all(self):
for p in self.producers:
p.flush()# С ключом — hash(key) % num_partitions
p.produce('orders', key=b'user_123', value=b'...')
# Без ключа — round-robin
p.produce('orders', value=b'...')from confluent_kafka import Producer
def custom_partitioner(key, all_partitions, available_partitions):
"""Кастомная логика выбора партиции"""
if key is None:
return available_partitions[0] # Default
# Парсинг ключа
key_str = key.decode('utf-8')
# VIP пользователи — в отдельную партицию
if key_str.startswith('vip_'):
return 0 # Партиция 0 для VIP
# Остальные — hash-based
return hash(key) % len(available_partitions)
p = Producer({
'bootstrap.servers': 'localhost:9092',
'partitioner': 'consistent_random' # Встроенные: random, consistent, consistent_random
})
# Custom partitioner требует ручного выбора
def send_with_custom_partition(topic, key, value):
partition = custom_partitioner(key, range(6), range(6))
p.produce(topic, partition=partition, key=key, value=value)from kafka import KafkaProducer
from kafka.partitioner.roundrobin import RoundRobinPartitioner
from kafka.partitioner.hashed import HashedPartitioner
# Встроенные partitioner
p1 = KafkaProducer(
bootstrap_servers='localhost:9092',
partitioner=RoundRobinPartitioner()
)
p2 = KafkaProducer(
bootstrap_servers='localhost:9092',
partitioner=HashedPartitioner()
)
# Кастомный partitioner
class VIPPartitioner:
def __call__(self, key, all_partitions, available_partitions):
if key and key.decode().startswith('vip_'):
return 0
return hash(key) % len(available_partitions) if key else available_partitions[0]
p3 = KafkaProducer(
bootstrap_servers='localhost:9092',
partitioner=VIPPartitioner()
)Транзакционный producer для exactly-once semantics:
from confluent_kafka import Producer
p = Producer({
'bootstrap.servers': 'localhost:9092',
'transactional.id': 'order-producer-1', # Уникальный ID
'acks': 'all',
'enable.idempotence': True
})
# Инициализация транзакций
p.init_transactions()
try:
# Начало транзакции
p.begin_transaction()
# Отправка сообщений в несколько топиков
p.produce('orders', key=b'order_1', value=b'{"id": 1}')
p.produce('order-events', key=b'order_1', value=b'{"event": "created"}')
p.produce('notifications', key=b'user_1', value=b'{"type": "order_created"}')
# Commit транзакции
p.commit_transaction()
except KafkaError as e:
# Abort при ошибке
p.abort_transaction()
print(f'Transaction aborted: {e}')Важно: Транзакции требуют transactional.id и enable.idempotence=true.
from confluent_kafka import Producer
p = Producer({'bootstrap.servers': 'localhost:9092'})
# Получение метрик
metrics = p.metrics()
# Пример метрик
print(metrics['brokers']['localhost:9092']['outbuf_msg_cnt']) # Сообщения в очереди
print(metrics['topics']['orders']['txmsgs']) # Отправлено сообщений
print(metrics['topics']['orders']['txbytes']) # Отправлено байтimport logging
from confluent_kafka import Producer
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('kafka.producer')
class LoggingProducer:
def __init__(self):
self.p = Producer({
'bootstrap.servers': 'localhost:9092',
'debug': 'broker,topic,msg', # Debug категории
'log_level': 6 # INFO
})
def send(self, topic, key, value):
logger.info(f'Sending to {topic}: key={key}, value={value}')
def callback(err, msg):
if err:
logger.error(f'Delivery failed: {err}')
else:
logger.info(f'Delivered: {msg.topic()} [{msg.partition()}] @{msg.offset()}')
self.p.produce(topic, key=key, value=value, callback=callback)PRODUCTION_CONFIG = {
# Брокеры
'bootstrap.servers': 'broker1:9092,broker2:9092,broker3:9092',
# Гарантии доставки
'acks': 'all',
'enable.idempotence': True,
'max.in.flight.requests.per.connection': 5,
# Retry
'retries': 5,
'retry.backoff.ms': 100,
'delivery.timeout.ms': 120000,
# Performance
'compression.type': 'lz4',
'batch.size': 1048576,
'linger.ms': 5,
'queue.buffering.max.ms': 10,
# Buffer
'queue.buffering.max.messages': 100000,
'message.max.bytes': 1000012,
}
p = Producer(PRODUCTION_CONFIG)import signal
import sys
from confluent_kafka import Producer
class GracefulProducer:
def __init__(self):
self.p = Producer(PRODUCTION_CONFIG)
self.running = True
signal.signal(signal.SIGINT, self.shutdown)
signal.signal(signal.SIGTERM, self.shutdown)
def shutdown(self, signum, frame):
print('Shutting down producer...')
self.running = False
self.p.flush(timeout=10)
sys.exit(0)
def run(self):
while self.running:
# Отправка сообщений
self.p.produce('orders', key=b'key', value=b'value')
self.p.poll(0) # Обработка callbacks
self.p.flush()В следующей теме вы изучите Consumer API:
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.