Введение в Apache Kafka: основные концепции, архитектура, топологи, партиции, репликация.
Apache Kafka — это распределённая потоковая платформа для построения event-driven архитектур. В этой теме вы изучите фундаментальные концепции Kafka.
Apache Kafka — это распределённая система обмена сообщениями (message broker), разработанная LinkedIn и ставшая open-source проектом Apache в 2011 году.
Kafka отличается от традиционных message broker (RabbitMQ, ActiveMQ) несколькими ключевыми особенностями:
Kafka используется для:
Broker (брокер) — это сервер Kafka, который хранит данные и обслуживает клиентов. Один брокер может обрабатывать тысячи партиций и обслуживать сотни клиентов.
Cluster (кластер) — группа брокеров, работающих вместе. Кластер обеспечивает:
┌─────────────────────────────────────────────────┐
│ Kafka Cluster │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ │
│ │ :9092 │ │ :9093 │ │ :9094 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────┘
Традиционно Kafka использует ZooKeeper для хранения метаданных кластера и координации брокеров:
Важно: Начиная с Kafka 3.0, появляется режим KRaft (Kafka Raft), который устраняет зависимость от ZooKeeper. KRaft использует встроенный консенсус-алгоритм Raft.
# Docker Compose для локальной разработки с ZooKeeper
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1Topic — это логическая категория сообщений. Producer отправляют сообщения в топики, а consumer читают из топиков.
Примеры топов:
user-events — события пользователей (регистрация, логин, покупка)orders — заказы интернет-магазинаlogs — логи приложенийnotifications — уведомления для пользователейКаждый топик разделяется на партиции — физические сегменты данных. Партиции позволяют:
Topic: orders
┌────────────────────────────────────────────────────┐
│ Partition 0 Partition 1 Partition 2
│ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ │ msg: 0 │ │ msg: 0 │ │ msg: 0 │
│ │ msg: 1 │ │ msg: 1 │ │ msg: 1 │
│ │ msg: 2 │ │ msg: 2 │ │ msg: 2 │
│ │ msg: 3 │ │ msg: 3 │ │ msg: 3 │
│ │ msg: 4 │ │ │ │ msg: 4 │
│ └─────────┘ └─────────┘ └─────────┘
│ Broker 1 Broker 2 Broker 3
└────────────────────────────────────────────────────┘
Каждое сообщение в партиции имеет уникальный offset — порядковый номер, начиная с 0. Offset позволяет:
Важно: Порядок сообщений гарантируется только внутри одной партиции. Между партициями порядок не гарантируется.
При отправке сообщения producer может указать ключ:
from confluent_kafka import Producer
p = Producer({'bootstrap.servers': 'localhost:9092'})
# Сообщение с ключом — всегда попадает в одну партицию
p.produce(
topic='orders',
key=b'user_123', # Ключ
value=b'{"order_id": 456}',
partition=0 # Можно указать явно или оставить выбор Kafka
)
# Сообщение без ключа — round-robin между партициями
p.produce(
topic='orders',
value=b'{"order_id": 789}'
)Правило выбора партиции:
hash(key) % num_partitionsКаждая партиция реплицируется на несколько брокеров. Одна реплика — leader, остальные — follower.
Partition 0 (replication.factor=3)
┌──────────────────────────────────────┐
│ Broker 1 Broker 2 Broker 3│
│ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ │ LEADER │───▶│ FOLLOWER│ │ FOLLOWER│
│ │ │ │ │ │ │
│ └─────────┘ └─────────┘ └─────────┘
└──────────────────────────────────────┘
ISR — множество реплик, которые полностью синхронизированы с лидером. Только ISR-реплики могут стать новым лидером при failover.
Если follower отстаёт от лидера больше чем на replica.lag.time.max.ms, он исключается из ISR.
Настройка min.insync.replicas определяет минимальное количество ISR-реплик для подтверждения записи:
# Producer с гарантией репликации
p = Producer({
'bootstrap.servers': 'localhost:9092',
'acks': 'all', # Ждём подтверждения от всех ISR
'min.insync.replicas': 2 # Минимум 2 реплики должны быть в ISR
})Kafka хранит сообщения определённое время, даже если они уже прочитаны. Настройки:
retention.ms — время хранения в миллисекундахretention.bytes — максимальный размер лога на партицию# Создание топика с настройками хранения
from confluent_kafka.admin import AdminClient, NewTopic
admin = AdminClient({'bootstrap.servers': 'localhost:9092'})
topics = [
NewTopic(
name='user-events',
num_partitions=6,
replication_factor=3,
config={
'retention.ms': '604800000', # 7 дней
'retention.bytes': '1073741824' # 1 GB
}
)
]
admin.create_topics(topics)Для некоторых use cases полезно хранить только последнее значение для каждого ключа. Например:
Для этого используется cleanup.policy=compact:
topics = [
NewTopic(
name='user-profiles',
num_partitions=3,
replication_factor=3,
config={
'cleanup.policy': 'compact', # Уплотнение вместо удаления по времени
'min.compaction.lag.ms': '60000' # Минимальная задержка перед compaction
}
)
]Compaction гарантирует, что для каждого ключа останется хотя бы последнее сообщение.
Producer — клиент, который публикует сообщения в топики Kafka.
from confluent_kafka import Producer
def delivery_report(err, msg):
"""Callback для подтверждения доставки"""
if err is not None:
print(f'Message delivery failed: {err}')
else:
print(f'Message delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')
p = Producer({
'bootstrap.servers': 'localhost:9092',
'acks': 'all',
'retries': 5,
'retry.backoff.ms': 100
})
# Отправка сообщения
p.produce(
topic='orders',
key=b'user_123',
value=b'{"order_id": 456, "amount": 99.99}',
callback=delivery_report
)
# Flush ожидает отправки всех сообщений
p.flush()Consumer — клиент, который читает сообщения из топиков Kafka.
from confluent_kafka import Consumer, KafkaError
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processors',
'auto.offset.reset': 'earliest', # С какого offset начать: earliest или latest
'enable.auto.commit': False # Ручной commit для надёжности
})
c.subscribe(['orders'])
try:
while True:
msg = c.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
raise msg.error()
print(f'Received: {msg.value().decode()}')
# Ручной commit после обработки
c.commit(msg)
except KeyboardInterrupt:
pass
finally:
c.close()Официальный клиент от Confluent (обёртка над librdkafka на C).
# Установка
# pip install confluent-kafka
from confluent_kafka import Producer, Consumer
p = Producer({'bootstrap.servers': 'localhost:9092'})
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group'
})Преимущества:
Недостатки:
Чистый Python без внешних зависимостей.
# Установка
# pip install kafka-python
from kafka import KafkaProducer, KafkaConsumer
p = KafkaProducer(bootstrap_servers='localhost:9092')
c = KafkaConsumer('orders', bootstrap_servers='localhost:9092')Преимущества:
Недостатки:
| Метрика | confluent-kafka | kafka-python |
|---|---|---|
| Producer throughput | ~500K msg/s | ~50K msg/s |
| Consumer throughput | ~400K msg/s | ~40K msg/s |
| Latency (p99) | ~5ms | ~20ms |
| CPU usage | Низкое | Высокое |
# docker-compose.kafka.yml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
# Schema Registry для Avro
schema-registry:
image: confluentinc/cp-schema-registry:7.5.0
depends_on:
- kafka
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
# UI для просмотра топиков
kafka-ui:
image: provectuslabs/kafka-ui:latest
depends_on:
- kafka
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181Запуск:
docker-compose -f docker-compose.kafka.yml up -d# Подключиться к контейнеру
docker exec -it <kafka-container-id> bash
# Создать топик
kafka-topics --create \
--bootstrap-server localhost:9092 \
--topic orders \
--partitions 3 \
--replication-factor 1
# Описать топик
kafka-topics --describe \
--bootstrap-server localhost:9092 \
--topic orders
# Отправить сообщение
kafka-console-producer --bootstrap-server localhost:9092 --topic orders
> {"order_id": 1, "amount": 99.99}
# Прочитать сообщения
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic orders --from-beginningВ следующей теме вы изучите Producer API:
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.