Мониторинг Kafka: метрики, логи, Prometheus, Grafana. Локальная разработка с Docker Compose.
Мониторинг критически важен для production Kafka-приложений. В этой теме вы изучите метрики Kafka, Prometheus, Grafana, логирование и локальную разработку с Docker Compose.
Kafka экспортирует множество метрик через JMX (Java Management Extensions).
from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost:9092'})

# Fetch client-side metrics.
# NOTE(review): the confluent-kafka Python client normally exposes librdkafka
# statistics via the 'statistics.interval.ms' config plus a 'stats_cb'
# callback, not a metrics() method — confirm against the installed version.
metrics = p.metrics()

# Per-broker metrics
broker_metrics = metrics['brokers']['localhost:9092']
print(f'Outbuf messages: {broker_metrics["outbuf_msg_cnt"]}')  # messages waiting in the send queue
print(f'Outbuf bytes: {broker_metrics["outbuf_byte_cnt"]}')    # bytes waiting in the send queue

# Per-topic metrics
topic_metrics = metrics['topics']['orders']
print(f'Transmitted messages: {topic_metrics["txmsgs"]}')
print(f'Transmitted bytes: {topic_metrics["txbytes"]}')
print(f'Batch count: {topic_metrics["tx_batches"]}')

from confluent_kafka import Consumer
c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group'
})

# NOTE(review): as with the producer, statistics are normally delivered via
# 'stats_cb'; verify that metrics() exists in the client version in use.
metrics = c.metrics()

# Consumer-group level metrics
consumer_metrics = metrics['consumer']
print(f'Rebalance count: {consumer_metrics["rebalance_cnt"]}')
print(f'Fetch queue size: {consumer_metrics["fetchq_size"]}')

# Per-topic metrics
topic_metrics = metrics['topics']['orders']
print(f'Received messages: {topic_metrics["rxmsgs"]}')
print(f'Received bytes: {topic_metrics["rxbytes"]}')

from confluent_kafka import Consumer, TopicPartition
c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group'
})
c.subscribe(['orders'])

def get_lag_metrics():
    """Return low/high watermarks, current position and lag per assigned partition.

    Returns a dict keyed by partition number; values hold 'low', 'high',
    'position' (current offset, may be negative if unset) and 'lag'.
    """
    lag_data = {}
    for tp in c.assignment():
        # Watermark offsets: earliest and latest offsets available on the broker
        low, high = c.get_watermark_offsets(tp)
        # Current consumer position for this partition
        position = c.position([tp])[0]
        # A negative offset means no position has been established yet;
        # this snippet reports 0 lag in that case.
        # NOTE(review): "high - low" may better reflect lag before the first
        # commit — confirm the desired semantics.
        lag = high - position.offset if position.offset >= 0 else 0
        lag_data[tp.partition] = {
            'low': low,
            'high': high,
            'position': position.offset,
            'lag': lag,
        }
    return lag_data

# Usage
lag = get_lag_metrics()
total_lag = sum(p['lag'] for p in lag.values())
print(f'Total lag: {total_lag}')

# kafka-jmx-exporter.yml
version: '3.8'
services:
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - "9092:9092"
      - "9999:9999"  # JMX port
    environment:
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: localhost
      # Folded block scalar keeps the JVM flags as one logical line
      KAFKA_OPTS: >-
        -Dcom.sun.management.jmxremote
        -Dcom.sun.management.jmxremote.authenticate=false
        -Dcom.sun.management.jmxremote.ssl=false
        -Djava.rmi.server.hostname=localhost

  jmx-exporter:
    image: bitnami/jmx-exporter:latest
    ports:
      - "5556:5556"
    environment:
      JMX_EXPORTER_PORT: 5556
      JMX_EXPORTER_HOSTNAME: localhost
      JMX_EXPORTER_START_DELAY_SECONDS: 30
    volumes:
      - ./jmx-kafka-prometheus.yml:/opt/bitnami/jmx-exporter/config.yml

# jmx-kafka-prometheus.yml
lowercaseOutputName: true
rules:
  # Per-topic/partition server metrics
  - pattern: 'kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value'
    name: kafka_server_$1_$2
    type: GAUGE
    labels:
      clientId: "$3"
      topic: "$4"
      partition: "$5"
  # Per-broker server metrics
  - pattern: 'kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>Value'
    name: kafka_server_$1_$2
    type: GAUGE
    labels:
      clientId: "$3"
      broker: "$4:$5"
  # Consumer client metrics
  - pattern: 'kafka.consumer<type=(.+), client-id=(.+), topic=(.+), partition=(.*)><>Value'
    name: kafka_consumer_$1
    type: GAUGE
    labels:
      clientId: "$2"
      topic: "$3"
      partition: "$4"

# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'kafka'
    static_configs:
      - targets: ['jmx-exporter:5556']

  - job_name: 'kafka-consumer'
    static_configs:
      - targets: ['app:8000']  # your application exposing metrics
    metrics_path: '/metrics'

# docker-compose.monitoring.yml
version: '3.8'
services:
  prometheus:
    image: prom/prometheus:v2.47.0
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
      - '--storage.tsdb.retention.time=200h'
      - '--web.enable-lifecycle'

  grafana:
    image: grafana/grafana:10.1.0
    ports:
      - "3000:3000"
    environment:
      GF_SECURITY_ADMIN_USER: admin
      # Dev-only credentials — never ship admin/admin to production
      GF_SECURITY_ADMIN_PASSWORD: admin
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/provisioning:/etc/grafana/provisioning
    depends_on:
      - prometheus

volumes:
  prometheus_data:
  grafana_data:

# grafana/provisioning/datasources/prometheus.yml
apiVersion: 1
datasources:
  - name: Prometheus
    type: prometheus
    access: proxy
    url: http://prometheus:9090
    isDefault: true
    editable: true

# grafana/provisioning/dashboards/kafka.yml
apiVersion: 1
providers:
  - name: 'Kafka Dashboards'
    orgId: 1
    folder: 'Kafka'
    folderUid: ''
    type: file
    disableDeletion: false
    updateIntervalSeconds: 10
    allowUiUpdates: true
    options:
      path: /etc/grafana/provisioning/dashboards

{
"dashboard": {
"title": "Kafka Consumer Lag",
"panels": [
{
"title": "Consumer Lag by Partition",
"type": "graph",
"targets": [
{
"expr": "kafka_consumer_lag{group_id=\"order-processors\"}",
"legendFormat": "Partition {{partition}}"
}
]
},
{
"title": "Total Consumer Lag",
"type": "stat",
"targets": [
{
"expr": "sum(kafka_consumer_lag{group_id=\"order-processors\"})",
"legendFormat": "Total Lag"
}
]
}
]
}
}import logging
import json
from confluent_kafka import Producer, Consumer

# Basic logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('kafka.app')


class JsonFormatter(logging.Formatter):
    """JSON formatter for structured logging."""

    def format(self, record):
        # Fields emitted for every record
        log_record = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno,
        }
        # Optional Kafka-specific fields supplied via `extra=`
        if hasattr(record, 'topic'):
            log_record['topic'] = record.topic
        if hasattr(record, 'partition'):
            log_record['partition'] = record.partition
        if hasattr(record, 'offset'):
            log_record['offset'] = record.offset
        return json.dumps(log_record)


# Usage
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)


class LoggedProducer:
    """Producer wrapper that logs every produced message."""

    def __init__(self):
        self.producer = Producer({'bootstrap.servers': 'localhost:9092'})
        logger.info('Producer initialized', extra={'bootstrap': 'localhost:9092'})

    def produce(self, topic, key, value):
        logger.info(
            f'Producing message to {topic}',
            extra={'topic': topic, 'key': key}
        )
        self.producer.produce(topic, key=key, value=value)
        # Serve delivery-report callbacks without blocking
        self.producer.poll(0)

import uuid
from contextvars import ContextVar

# Context variable holding the current correlation ID for tracing
correlation_id_var: ContextVar[str] = ContextVar('correlation_id', default='')


def set_correlation_id(correlation_id: str = None) -> str:
    """Set the correlation ID for tracing (generating one if None); return it."""
    correlation_id = correlation_id or str(uuid.uuid4())
    correlation_id_var.set(correlation_id)
    return correlation_id


class CorrelationJsonFormatter(JsonFormatter):
    """JsonFormatter variant that also injects the current correlation ID."""

    def format(self, record):
        # Re-parse the base formatter's JSON to append the correlation ID.
        # NOTE(review): building the dict once would avoid the loads/dumps
        # round-trip, at the cost of duplicating the base logic.
        log_record = super().format(record)
        log_dict = json.loads(log_record)
        log_dict['correlation_id'] = correlation_id_var.get()
        return json.dumps(log_dict)


# Usage in a consumer
class TracedConsumer:
    def __init__(self):
        self.consumer = Consumer({...})  # placeholder config
        self.consumer.subscribe(['orders'])

    def consume(self):
        while True:
            msg = self.consumer.poll(timeout=1.0)
            if msg and not msg.error():
                # Take the correlation ID from headers, or generate a new one
                correlation_id = self.extract_correlation_id(msg)
                set_correlation_id(correlation_id)
                logger.info(
                    'Processing message',
                    extra={
                        'topic': msg.topic(),
                        'partition': msg.partition(),
                        'offset': msg.offset(),
                    }
                )
                self.process_message(msg)

    def extract_correlation_id(self, msg):
        """Return the correlation ID from message headers, or a fresh UUID."""
        headers = msg.headers() or []
        for key, value in headers:
            if key == 'correlation_id':
                return value.decode('utf-8')
        return str(uuid.uuid4())

# docker-compose.dev.yml
version: '3.8'
services:
  # ZooKeeper
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - zookeeper_data:/var/lib/zookeeper/data

  # Kafka
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9999:9999"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Dual listeners: containers on the compose network reach the broker
      # at kafka:29092; host clients keep using localhost:9092. A single
      # advertised localhost:9092 listener would break in-network clients
      # such as schema-registry and kafka-ui.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    volumes:
      - kafka_data:/var/lib/kafka/data

  # Schema Registry
  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
      SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: BACKWARD

  # Kafka UI
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081

  # Prometheus
  prometheus:
    image: prom/prometheus:v2.47.0
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus

  # Grafana
  grafana:
    image: grafana/grafana:10.1.0
    ports:
      - "3000:3000"
    environment:
      GF_SECURITY_ADMIN_USER: admin
      # Dev-only credentials
      GF_SECURITY_ADMIN_PASSWORD: admin
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/provisioning:/etc/grafana/provisioning
    depends_on:
      - prometheus

volumes:
  zookeeper_data:
  kafka_data:
  prometheus_data:
  grafana_data:

# Start the whole stack
docker-compose -f docker-compose.dev.yml up -d

# Check service status
docker-compose -f docker-compose.dev.yml ps

# Tail Kafka logs
docker-compose -f docker-compose.dev.yml logs -f kafka

# Stop
docker-compose -f docker-compose.dev.yml down

# Stop and remove volumes
docker-compose -f docker-compose.dev.yml down -v

from confluent_kafka import Producer, Consumer
import logging

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)

p = Producer({
    'bootstrap.servers': 'localhost:9092',
    'debug': 'broker,topic,msg'  # debug categories
})

# Available debug categories:
# generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp,
# security, fetch, interceptor, plugin, consumer, admin, eos, mock,
# assignor, conf, all

# List consumer groups
kafka-consumer-groups --bootstrap-server localhost:9092 --list

# Describe a consumer group
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group order-processors

# Reset offsets (dry run — the default without --execute)
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --group order-processors --reset-offsets --to-earliest \
  --topic orders

# Reset offsets (execute)
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --group order-processors --reset-offsets --to-earliest \
  --topic orders --execute

Kafka UI (http://localhost:8080) предоставляет:
# prometheus-alerts.yml
groups:
  - name: kafka
    rules:
      - alert: HighConsumerLag
        expr: sum(kafka_consumer_lag{group_id="order-processors"}) > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High consumer lag detected"
          description: "Consumer lag is {{ $value }} for group order-processors"

      - alert: ConsumerGroupDown
        expr: count(kafka_consumer_group_members{group_id="order-processors"}) == 0
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Consumer group is down"
          description: "No active members in consumer group order-processors"

      - alert: KafkaBrokerDown
        expr: up{job="kafka"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Kafka broker is down"
          description: "Broker {{ $labels.instance }} is not responding"

# alertmanager.yml
global:
  smtp_smarthost: 'localhost:587'
  smtp_from: 'alertmanager@example.com'

route:
  group_by: ['alertname', 'severity']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'email-notifications'
  routes:
    # Critical alerts page on-call directly
    - match:
        severity: critical
      receiver: 'pagerduty'

receivers:
  - name: 'email-notifications'
    email_configs:
      - to: 'team@example.com'
  - name: 'pagerduty'
    pagerduty_configs:
      - service_key: '<pagerduty-service-key>'

# docker-compose.observability.yml
version: '3.8'
services:
  # Jaeger for distributed tracing
  jaeger:
    image: jaegertracing/all-in-one:1.49
    ports:
      - "16686:16686"  # UI
      - "14268:14268"  # Collector
    environment:
      - COLLECTOR_OTLP_ENABLED=true

  # Loki for log aggregation
  loki:
    image: grafana/loki:2.9.0
    ports:
      - "3100:3100"
    command: -config.file=/etc/loki/local-config.yaml

  # Promtail for log collection
  promtail:
    image: grafana/promtail:2.9.0
    volumes:
      - /var/log:/var/log
      - ./promtail-config.yml:/etc/promtail/config.yml
    command: -config.file=/etc/promtail/config.yml

В следующей теме вы изучите Интеграцию с веб-приложениями:
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.