Создание consumer, подписка на топологи, poll, commit offset, ручное и автоматическое управление.
Consumer — это клиент Kafka, который читает сообщения из топиков. В этой теме вы изучите создание consumer, управление offset, poll loop и обработку ошибок.
from confluent_kafka import Consumer, KafkaError
# Минимальная конфигурация
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-consumer-group'
})
# Production-конфигурация
c = Consumer({
'bootstrap.servers': 'broker1:9092,broker2:9092,broker3:9092',
'group.id': 'order-processors',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
'session.timeout.ms': 30000,
'heartbeat.interval.ms': 10000,
'max.poll.interval.ms': 300000
})from kafka import KafkaConsumer
# Минимальная конфигурация
c = KafkaConsumer(
'orders',
bootstrap_servers='localhost:9092',
group_id='my-consumer-group'
)
# Production-конфигурация
c = KafkaConsumer(
'orders',
bootstrap_servers=['broker1:9092', 'broker2:9092', 'broker3:9092'],
group_id='order-processors',
auto_offset_reset='earliest',
enable_auto_commit=False,
session_timeout_ms=30000,
heartbeat_interval_ms=10000,
max_poll_interval_ms=300000
)from confluent_kafka import Consumer
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group'
})
# Подписка на один топик
c.subscribe(['orders'])
# Подписка на несколько топиков
c.subscribe(['orders', 'payments', 'shipments'])
# Подписка с regex (все топики, начинающиеся с 'events-')
c.subscribe(['^events-.*'])
# Callback при назначении партиций
def on_assign(consumer, partitions):
print(f'Assigned partitions: {[p.partition for p in partitions]}')
# Callback при отзыве партиций
def on_revoke(consumer, partitions):
print(f'Revoked partitions: {[p.partition for p in partitions]}')
# Commit offset перед rebalance
consumer.commit(asynchronous=False)
c.subscribe(['orders'], on_assign=on_assign, on_revoke=on_revoke)from confluent_kafka import TopicPartition
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group'
})
# Ручное назначение партиций (без consumer group)
c.assign([
TopicPartition('orders', partition=0, offset=0),
TopicPartition('orders', partition=1, offset=100),
TopicPartition('orders', partition=2, offset='earliest')
])from confluent_kafka import Consumer, KafkaError
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group',
'enable.auto.commit': False
})
c.subscribe(['orders'])
try:
while True:
msg = c.poll(timeout=1.0)
if msg is None:
# Нет сообщений за timeout
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# Достигнут конец партиции
continue
elif msg.error():
raise msg.error()
# Обработка сообщения
print(f'Received: {msg.value().decode()}')
print(f'Topic: {msg.topic()}, Partition: {msg.partition()}, Offset: {msg.offset()}')
# Commit offset после обработки
c.commit(msg)
except KeyboardInterrupt:
pass
finally:
c.close()from confluent_kafka import Consumer
import time
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'batch-processor',
'enable.auto.commit': False,
'max.poll.interval.ms': 300000
})
c.subscribe(['orders'])
batch = []
BATCH_SIZE = 100
BATCH_TIMEOUT = 5.0 # секунды
last_batch_time = time.time()
try:
while True:
msg = c.poll(timeout=0.1)
if msg is not None and not msg.error():
batch.append(msg)
current_time = time.time()
# Commit batch по размеру или таймауту
if len(batch) >= BATCH_SIZE or (batch and current_time - last_batch_time >= BATCH_TIMEOUT):
process_batch(batch)
# Commit offset последнего сообщения в batch
c.commit(batch[-1])
batch = []
last_batch_time = current_time
except KeyboardInterrupt:
# Обработка оставшихся сообщений
if batch:
process_batch(batch)
c.commit(batch[-1])
finally:
c.close()
def process_batch(messages):
"""Обработка batch сообщений"""
orders = [json.loads(msg.value().decode()) for msg in messages]
# Bulk обработка
db.bulk_insert(orders)import asyncio
from confluent_kafka import Consumer
from concurrent.futures import ThreadPoolExecutor
class AsyncConsumer:
def __init__(self, config, topics):
self.consumer = Consumer(config)
self.consumer.subscribe(topics)
self.executor = ThreadPoolExecutor(max_workers=4)
self.running = True
async def consume(self):
loop = asyncio.get_event_loop()
while self.running:
# Poll в executor для неблокирующей работы
msg = await loop.run_in_executor(
self.executor,
self.consumer.poll,
0.1
)
if msg and not msg.error():
# Обработка в background
asyncio.create_task(self.process_message(msg))
self.consumer.close()
async def process_message(self, msg):
"""Асинхронная обработка сообщения"""
data = json.loads(msg.value().decode())
await self.handle_order(data)
# Commit в main thread
await asyncio.get_event_loop().run_in_executor(
self.executor,
self.consumer.commit,
msg,
False
)
async def handle_order(self, order):
# Бизнес-логика
await asyncio.sleep(0.1)c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group',
'enable.auto.commit': True, # Включить auto commit
'auto.commit.interval.ms': 5000 # Commit каждые 5 секунд
})
c.subscribe(['orders'])
try:
while True:
msg = c.poll(timeout=1.0)
if msg and not msg.error():
process(msg)
# Commit произойдёт автоматически в фоновом режиме
except KeyboardInterrupt:
pass
finally:
c.close()Проблемы auto commit:
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group',
'enable.auto.commit': False # Ручной commit
})
c.subscribe(['orders'])
try:
while True:
msg = c.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
continue
try:
# Обработка сообщения
process(msg)
# Commit ПОСЛЕ успешной обработки
c.commit(msg, asynchronous=False)
except Exception as e:
# Логирование ошибки, retry, DLQ
print(f'Processing failed: {e}')
# Не commit — сообщение будет прочитано снова
except KeyboardInterrupt:
pass
finally:
c.close()from confluent_kafka import TopicPartition
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group',
'enable.auto.commit': False
})
c.subscribe(['orders'])
# Commit конкретного offset
tp = TopicPartition('orders', partition=0, offset=100)
c.commit(offsets=[tp], asynchronous=False)
# Получение текущих offset
offsets = c.position([TopicPartition('orders', 0)])
print(f'Current offset: {offsets[0].offset}')
# Получение committed offset
committed = c.committed([TopicPartition('orders', 0)], timeout=10)
print(f'Committed offset: {committed[0].offset}')import psycopg2
from confluent_kafka import Consumer
class DatabaseOffsetConsumer:
def __init__(self):
self.consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'db-offset-consumer',
'enable.auto.commit': False
})
self.consumer.subscribe(['orders'])
self.conn = psycopg2.connect('dbname=mydb')
self.cursor = self.conn.cursor()
def get_stored_offset(self, partition):
"""Получение offset из базы данных"""
self.cursor.execute(
'SELECT offset FROM consumer_offsets WHERE topic=%s AND partition=%s',
('orders', partition)
)
result = self.cursor.fetchone()
return result[0] if result else 0
def store_offset(self, partition, offset):
"""Сохранение offset в базу данных"""
self.cursor.execute(
'''INSERT INTO consumer_offsets (topic, partition, offset)
VALUES (%s, %s, %s)
ON CONFLICT (topic, partition) DO UPDATE SET offset=%s''',
('orders', partition, offset, offset)
)
self.conn.commit()
def consume(self):
for partition in range(6):
offset = self.get_stored_offset(partition)
print(f'Starting from offset {offset} for partition {partition}')
try:
while True:
msg = self.consumer.poll(timeout=1.0)
if msg and not msg.error():
process(msg.value())
self.store_offset(msg.partition(), msg.offset() + 1)
finally:
self.consumer.close()
self.conn.close()Настройка auto.offset.reset определяет поведение при отсутствии committed offset:
# earliest — начать с самого начала (oldest сообщение)
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group',
'auto.offset.reset': 'earliest'
})
# latest — начать с текущего момента (новые сообщения)
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group',
'auto.offset.reset': 'latest'
})
# error — выбросить ошибку если нет committed offset
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group',
'auto.offset.reset': 'error'
})Когда использовать:
earliest: новый consumer должен обработать все исторические данныеlatest: consumer должен обрабатывать только новые событияerror: для отладки и явного контроля offsetfrom confluent_kafka import KafkaError
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group'
})
c.subscribe(['orders'])
try:
while True:
msg = c.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# Конец партиции — не ошибка, просто нет новых сообщений
continue
elif msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
# Топик не существует
print(f'Topic not found: {msg.topic()}')
elif msg.error().code() == KafkaError._AUTHENTICATION:
# Ошибка аутентификации
print('Authentication failed')
break
elif msg.error().code() == KafkaError.OFFSET_OUT_OF_RANGE:
# Offset вне диапазона (например, после retention)
print('Offset out of range, resetting')
c.seek(msg) # Сброс к valid offset
else:
# Другие ошибки
raise msg.error()
process(msg)
except KeyboardInterrupt:
pass
finally:
c.close()import json
from confluent_kafka import Consumer, KafkaError
class SafeConsumer:
def __init__(self):
self.consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'safe-consumer',
'enable.auto.commit': False
})
self.consumer.subscribe(['orders'])
self.dlq_producer = Producer({'bootstrap.servers': 'localhost:9092'})
def consume(self):
while True:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
continue
try:
# Попытка десериализации
data = json.loads(msg.value().decode('utf-8'))
# Валидация данных
self.validate_order(data)
# Обработка
self.process_order(data)
# Commit после успешной обработки
self.consumer.commit(msg)
except json.JSONDecodeError as e:
# Невалидный JSON — poison pill
print(f'Invalid JSON: {e}')
self.send_to_dlq(msg, 'invalid_json')
self.consumer.commit(msg) # Commit чтобы не читать снова
except ValidationError as e:
# Ошибка валидации
print(f'Validation failed: {e}')
self.send_to_dlq(msg, 'validation_error')
self.consumer.commit(msg)
except Exception as e:
# Неожиданная ошибка — не commit для retry
print(f'Unexpected error: {e}')
# Не commit — сообщение будет прочитано снова
def send_to_dlq(self, msg, reason):
"""Отправка problematic сообщения в DLQ"""
self.dlq_producer.produce(
'orders-dlq',
key=msg.key(),
value=msg.value(),
headers={'dlq-reason': reason.encode(), 'original-topic': msg.topic().encode()}
)
self.dlq_producer.flush()import signal
import sys
from confluent_kafka import Consumer
class GracefulConsumer:
def __init__(self):
self.consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'graceful-consumer',
'enable.auto.commit': False,
'session.timeout.ms': 30000
})
self.consumer.subscribe(['orders'])
self.running = True
# Регистрация signal handlers
signal.signal(signal.SIGINT, self.shutdown)
signal.signal(signal.SIGTERM, self.shutdown)
def shutdown(self, signum, frame):
print('Shutting down consumer...')
self.running = False
def consume(self):
try:
while self.running:
msg = self.consumer.poll(timeout=1.0)
if msg and not msg.error():
self.process(msg)
self.consumer.commit(msg)
finally:
# Commit перед закрытием
self.consumer.commit(asynchronous=False)
self.consumer.close()
print('Consumer closed')
def process(self, msg):
# Обработка сообщения
passfrom confluent_kafka import Consumer
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group'
})
# Получение метрик
metrics = c.metrics()
# Пример метрик
print(metrics['brokers']['localhost:9092']['fetchq_size']) # Размер очереди fetch
print(metrics['topics']['orders']['rxmsgs']) # Получено сообщений
print(metrics['topics']['orders']['rxbytes']) # Получено байт
print(metrics['consumer']['rebalance_cnt']) # Количество rebalancefrom confluent_kafka import Consumer, TopicPartition, KafkaError
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group'
})
c.subscribe(['orders'])
def get_consumer_lag():
"""Получение lag для всех партиций"""
lag_info = {}
# Получение assignment
assignment = c.assignment()
for tp in assignment:
# Low watermark (earliest offset)
low, high = c.get_watermark_offsets(tp)
# Текущий position
position = c.position([tp])[0]
# Lag = high - position
lag = high - position.offset if position.offset >= 0 else 0
lag_info[f'partition_{tp.partition}'] = {
'low': low,
'high': high,
'position': position.offset,
'lag': lag
}
return lag_info
# Проверка lag
lag = get_consumer_lag()
for partition, info in lag.items():
print(f'{partition}: lag={info["lag"]}, position={info["position"]}')PRODUCTION_CONFIG = {
# Брокеры
'bootstrap.servers': 'broker1:9092,broker2:9092,broker3:9092',
# Consumer group
'group.id': 'order-processors',
'group.instance.id': 'consumer-1', # Статический ID для stable rebalance
# Offset management
'enable.auto.commit': False,
'auto.offset.reset': 'earliest',
# Session management
'session.timeout.ms': 30000,
'heartbeat.interval.ms': 10000,
'max.poll.interval.ms': 300000,
# Fetch настройки
'fetch.min.bytes': 1024, # Минимальный размер fetch
'fetch.max.wait.ms': 500, # Максимальное ожидание fetch
'fetch.max.bytes': 52428800, # Максимальный размер fetch (50MB)
# Обработка ошибок
'socket.timeout.ms': 30000,
'socket.keepalive.enable': True
}
c = Consumer(PRODUCTION_CONFIG)# Static membership уменьшает частоту rebalance
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group',
'group.instance.id': 'consumer-1', # Уникальный статический ID
'session.timeout.ms': 30000
})При static membership consumer сохраняет свои партиции при временном отключении (в течение session.timeout.ms).
В следующей теме вы изучите Consumer Groups:
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.