Группы потребителей, балансировка нагрузки, протоколы rebalance, координатор групп.
Consumer Groups позволяют масштабировать потребление сообщений между несколькими consumer. В этой теме вы изучите балансировку нагрузки, протоколы rebalance и стратегии assignment партиций.
Consumer Group — группа consumer, которые совместно потребляют сообщения из топиков. Каждая партиция обслуживается только одним consumer в группе.
Топик: orders (6 партиций)
Consumer Group: order-processors (3 consumer)
┌─────────────────────────────────────────────────────┐
│ Consumer Group │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ │ Consumer 1 │ │ Consumer 2 │ │ Consumer 3 │
│ │ │ │ │ │ │
│ │ Partition 0 │ │ Partition 2 │ │ Partition 4 │
│ │ Partition 1 │ │ Partition 3 │ │ Partition 5 │
│ └──────────────┘ └──────────────┘ └──────────────┘
└─────────────────────────────────────────────────────┘
from confluent_kafka import Consumer
# Все consumer с одинаковым group.id входят в одну группу
c1 = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processors' # Одинаковый для всех consumer группы
})
c2 = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processors' # Тот же group.id
})
c3 = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processors' # Тот же group.id
})from confluent_kafka import Consumer
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processors'
})
# Подписка на один топик
c.subscribe(['orders'])
# Подписка на несколько топиков
c.subscribe(['orders', 'payments', 'shipments'])
# Подписка с regex
c.subscribe(['^events-.*'])# Пример: топик с 6 партициями, 3 consumer
# Consumer 1: партиции 0, 1
# Consumer 2: партиции 2, 3
# Consumer 3: партиции 4, 5
# Добавляем Consumer 4:
# Consumer 1: партиции 0, 1
# Consumer 2: партиции 2, 3
# Consumer 3: партиции 4
# Consumer 4: партиции 5# Список всех consumer groups
kafka-consumer-groups --bootstrap-server localhost:9092 --list
# Описание consumer group
kafka-consumer-groups --bootstrap-server localhost:9092 \
--describe --group order-processors
# Вывод:
# GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# order-processors orders 0 100 150 50
# order-processors orders 1 200 200 0
# order-processors orders 2 150 180 30При eager rebalance все consumer останавливаются на время rebalance:
from confluent_kafka import Consumer
def on_assign(consumer, partitions):
print(f'Assigned: {[p.partition for p in partitions]}')
def on_revoke(consumer, partitions):
print(f'Revoked: {[p.partition for p in partitions]}')
# Важно: commit offset перед rebalance!
consumer.commit(asynchronous=False)
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processors',
'partition.assignment.strategy': 'range' # или 'roundrobin'
})
c.subscribe(['orders'], on_assign=on_assign, on_revoke=on_revoke)Проблема eager rebalance: "Stop-the-world" — все consumer останавливаются на время rebalance.
Cooperative rebalance позволяет consumer продолжать работу во время rebalance:
from confluent_kafka import Consumer, CooperativeStickyAssignor
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processors',
'partition.assignment.strategy': 'cooperative-sticky'
})
def on_assign(consumer, partitions):
print(f'Assigned (incremental): {[p.partition for p in partitions]}')
# Не нужно останавливать обработку
def on_revoke(consumer, partitions):
print(f'Revoked (incremental): {[p.partition for p in partitions]}')
# Отдаём только указанные партиции, не все
c.subscribe(['orders'], on_assign=on_assign, on_revoke=on_revoke)Преимущества cooperative rebalance:
Распределяет партиции по диапазону:
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group',
'partition.assignment.strategy': 'range'
})Топик orders (6 партиций), 2 consumer:
- Consumer 1: партиции 0, 1, 2
- Consumer 2: партиции 3, 4, 5
Топики orders + payments (по 3 партиции), 2 consumer:
- Consumer 1: orders-0, orders-1, orders-2, payments-0
- Consumer 2: payments-1, payments-2
Проблема: может возникнуть дисбаланс при нескольких топиках.
Распределяет партиции по кругу:
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group',
'partition.assignment.strategy': 'roundrobin'
})Топики orders (3 партиции) + payments (3 партиции), 2 consumer:
- Consumer 1: orders-0, payments-1, orders-2
- Consumer 2: payments-0, orders-1, payments-2
Преимущество: более равномерное распределение.
Инкрементальное распределение с минимальными изменениями:
from confluent_kafka import Consumer, CooperativeStickyAssignor
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group',
'partition.assignment.strategy': 'cooperative-sticky'
})Было (3 consumer, 6 партиций):
- C1: 0, 1
- C2: 2, 3
- C3: 4, 5
Добавили C4:
- C1: 0, 1 (без изменений)
- C2: 2, 3 (без изменений)
- C3: 4 (отдал 5)
- C4: 5 (получил 5)
Group Coordinator — брокер Kafka, который управляет consumer group:
1. Consumer отправляет JoinGroup request координатору
2. Координатор выбирает leader группы
3. Leader запрашивает metadata топиков
4. Leader выполняет assignment партиций
5. Leader отправляет SyncGroup request координатору
6. Координатор рассылает assignment всем consumer
7. Consumer начинают обработку
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group',
'session.timeout.ms': 30000, # Таймаут сессии
'heartbeat.interval.ms': 10000 # Интервал heartbeat
})
# Heartbeat отправляется автоматически при вызове poll()
# Если heartbeat не поступает session.timeout.ms — consumer считается мёртвымПравило: heartbeat.interval.ms должен быть меньше session.timeout.ms.
from confluent_kafka import Consumer
class RebalanceHandler:
def __init__(self, consumer):
self.consumer = consumer
self.pending_commits = []
def on_assign(self, consumer, partitions):
"""Вызывается при назначении партиций"""
print(f'Assigned partitions: {[p.partition for p in partitions]}')
# Сброс offset при необходимости
for p in partitions:
if p.partition == 0:
p.offset = 'earliest' # Начать с начала
consumer.assign(partitions)
def on_revoke(self, consumer, partitions):
"""Вызывается при отзыве партиций"""
print(f'Revoked partitions: {[p.partition for p in partitions]}')
# Commit offset перед rebalance
consumer.commit(asynchronous=False)
# Сохранение состояния для обработки
self.save_state()
def on_lost(self, consumer, partitions):
"""Вызывается при потере партиций (cooperative rebalance)"""
print(f'Lost partitions: {[p.partition for p in partitions]}')
# Не commit — партиции потеряны неожиданно
def save_state(self):
"""Сохранение состояния обработки"""
# Сохранить в базу данных или кэш
pass
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group',
'enable.auto.commit': False
})
handler = RebalanceHandler(c)
c.subscribe(
['orders'],
on_assign=handler.on_assign,
on_revoke=handler.on_revoke,
on_lost=handler.on_lost
)Static membership уменьшает частоту rebalance при временных отключениях:
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group',
'group.instance.id': 'consumer-1', # Статический ID
'session.timeout.ms': 30000
})Преимущество: при restart consumer сохраняет свои партиции (в течение session.timeout.ms).
from confluent_kafka import Consumer, TopicPartition
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group'
})
c.subscribe(['orders'])
def get_lag_info():
"""Получение информации о lag для всех партиций"""
lag_info = {}
assignment = c.assignment()
for tp in assignment:
low, high = c.get_watermark_offsets(tp)
position = c.position([tp])[0]
lag = high - position.offset if position.offset >= 0 else 0
lag_info[tp.partition] = {
'low': low,
'high': high,
'position': position.offset,
'lag': lag
}
return lag_info
# Проверка lag
lag_info = get_lag_info()
total_lag = sum(info['lag'] for info in lag_info.values())
print(f'Total lag: {total_lag}')
for partition, info in lag_info.items():
print(f'Partition {partition}: lag={info["lag"]}, position={info["position"]}')from prometheus_client import Counter, Histogram, Gauge
from confluent_kafka import Consumer
# Метрики
messages_consumed = Counter('kafka_consumer_messages_total', 'Total messages consumed')
consumer_lag = Gauge('kafka_consumer_lag', 'Consumer lag by partition', ['partition'])
rebalance_count = Counter('kafka_rebalance_total', 'Total rebalance events')
class MonitoredConsumer:
def __init__(self):
self.consumer = Consumer({...})
self.consumer.subscribe(['orders'], on_assign=self.on_assign)
def on_assign(self, consumer, partitions):
rebalance_count.inc()
def consume(self):
while True:
msg = self.consumer.poll(timeout=1.0)
if msg and not msg.error():
messages_consumed.inc()
# Обновление метрик lag
lag_info = self.get_lag_info()
for partition, lag in lag_info.items():
consumer_lag.labels(partition=partition).set(lag)import signal
import sys
from confluent_kafka import Consumer
class GracefulConsumer:
def __init__(self):
self.consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processors',
'enable.auto.commit': False,
'session.timeout.ms': 30000,
'heartbeat.interval.ms': 10000
})
self.running = True
self.pending_messages = []
signal.signal(signal.SIGINT, self.shutdown)
signal.signal(signal.SIGTERM, self.shutdown)
def shutdown(self, signum, frame):
print('Initiating graceful shutdown...')
self.running = False
def on_revoke(self, consumer, partitions):
print(f'Revoking partitions: {[p.partition for p in partitions]}')
# Обработка pending сообщений
self.process_pending()
# Commit offset
consumer.commit(asynchronous=False)
def consume(self):
self.consumer.subscribe(['orders'], on_revoke=self.on_revoke)
try:
while self.running:
msg = self.consumer.poll(timeout=1.0)
if msg and not msg.error():
self.pending_messages.append(msg)
# Batch обработка
if len(self.pending_messages) >= 100:
self.process_pending()
self.consumer.commit(self.pending_messages[-1])
self.pending_messages = []
finally:
# Финальная обработка
if self.pending_messages:
self.process_pending()
self.consumer.commit(self.pending_messages[-1])
self.consumer.close()
def process_pending(self):
for msg in self.pending_messages:
self.process_message(msg)
def process_message(self, msg):
# Бизнес-логика
passRebalance storm — частые rebalance из-за нестабильных consumer:
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group',
# Увеличенные таймауты для стабильности
'session.timeout.ms': 45000, # Увеличено с 30s
'heartbeat.interval.ms': 15000, # Увеличено с 10s
'max.poll.interval.ms': 600000, # 10 минут на обработку
# Static membership
'group.instance.id': 'consumer-1',
# Cooperative rebalance
'partition.assignment.strategy': 'cooperative-sticky'
})import logging
from confluent_kafka import Consumer
logging.basicConfig(level=logging.DEBUG)
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group',
'debug': 'cgrp,topic,fetch' # Debug категории
})
c.subscribe(['orders'])
while True:
msg = c.poll(timeout=1.0)
if msg and not msg.error():
logging.info(f'Received message from partition {msg.partition()}')# Описание consumer group
kafka-consumer-groups --bootstrap-server localhost:9092 \
--describe --group order-processors
# Reset offset
kafka-consumer-groups --bootstrap-server localhost:9092 \
--group order-processors --reset-offsets --to-earliest \
--topic orders --execute
# Dry run (без применения)
kafka-consumer-groups --bootstrap-server localhost:9092 \
--group order-processors --reset-offsets --to-latest \
--topic ordersВ следующей теме вы изучите Kafka Streams и Faust:
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.