Распределённый лог событий: topics, partitions, consumer groups, offset management. Отличия Kafka от традиционных брокеров.
Распределённый лог событий: topics, partitions, consumer groups, offset management. Отличия Kafka от традиционных брокеров.
Kafka — это не просто брокер сообщений, а распределённая платформа потоковой обработки событий.
| Характеристика | RabbitMQ | Apache Kafka |
|---|---|---|
| Модель | Очередь с маршрутизацией | Распределённый лог событий |
| Хранение | Сообщение удаляется после ACK | Сообщения хранятся в логе (retention до лет) |
| Потребление | Один потребитель получает сообщение | Multiple consumers читают независимо |
| Масштабирование | Очереди масштабируются вертикально | Партиции масштабируются горизонтально |
| Throughput | ~10K msg/s | ~1M msg/s |
| Latency | ~1ms | ~10ms |
| Replay | Нет (сообщение удаляется) | Да (можно читать старые сообщения) |
| Идеально для | Task queues, RPC | Event sourcing, стримы, CQRS |
┌─────────────┐ ┌──────────────────────────────────────┐
│ Producer │ ──> │ Kafka Cluster │
└─────────────┘ │ ┌────────────────────────────────┐ │
│ │ Topic: orders │ │
│ │ ┌──────────┬──────────┬──────┐│ │
│ │ │Partition │Partition │ ... ││ │
│ │ │ 0 │ 1 │ ││ │
│ │ │ [log] │ [log] │ ││ │
│ │ └──────────┴──────────┴──────┘│ │
│ └────────────────────────────────┘ │
└──────────────────────────────────────┘
│
┌─────────────────┼─────────────────┐
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Consumer │ │ Consumer │ │ Consumer │
│ Group A │ │ Group B │ │ Group C │
└──────────────┘ └──────────────┘ └──────────────┘
Логическая категория сообщений. Аналог таблицы в БД или очереди в RabbitMQ.
orders
├─ Partition 0: [msg0, msg3, msg6, ...]
├─ Partition 1: [msg1, msg4, msg7, ...]
└─ Partition 2: [msg2, msg5, msg8, ...]
Особенности:
Физический сегмент топика — упорядоченный лог сообщений.
Partition 0:
Offset 0: {"order_id": 1, "ts": "10:00:00"}
Offset 1: {"order_id": 4, "ts": "10:00:05"}
Offset 2: {"order_id": 7, "ts": "10:00:10"}
...
Важно:
Набор потребителей, совместно обрабатывающих топик.
Consumer Group: order-processors
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Consumer 1 │ │ Consumer 2 │ │ Consumer 3 │
│ Partition 0 │ │ Partition 1 │ │ Partition 2 │
└─────────────┘ └─────────────┘ └─────────────┘
Правила:
Позиция потребителя в партиции — номер последнего прочитанного сообщения.
Consumer читает Partition 0:
Offset 0: ✓ (обработано)
Offset 1: ✓ (обработано)
Offset 2: → (следующее для чтения)
Offset 3: (ещё не читал)
Commit offset:
from faststream import FastStream
from faststream.kafka import KafkaBroker
broker = KafkaBroker(
"localhost:9092",
bootstrap_servers=["localhost:9092", "localhost:9093"], # Для кластера
security_protocol="SASL_SSL", # Для production
sasl_mechanism="PLAIN",
sasl_plain_username="user",
sasl_plain_password="password"
)
app = FastStream(broker)from pydantic import BaseModel
class Order(BaseModel):
id: int
amount: float
@broker.publisher("orders")
async def create_order(order: Order):
await broker.publish(order, "orders")@broker.subscriber(
"orders",
group_id="order-processors", # Consumer group
auto_commit=True, # Автоматический коммит offset
auto_offset_reset="earliest" # С чего начать, если offset не найден
)
async def process_order(order: Order):
print(f"Processing order {order.id}")По умолчанию Kafka автоматически распределяет партиции между потребителями.
Стратегии:
range — партиции распределяются по диапазонуroundrobin — равномерное распределениеsticky — минимизация перераспределений при ребалансе@broker.subscriber(
"orders",
group_id="order-processors",
partition_assignment_strategy="roundrobin"
)
async def process_order(order: Order):
...from faststream.kafka import KafkaMessage
@broker.subscriber("orders", group_id="order-processors", auto_commit=False)
async def process_order(order: Order, message: KafkaMessage):
try:
await db.save(order)
await message.commit() # Ручной коммит после обработки
except Exception:
await message.nack() # Отрицательное подтверждениеОбработка сообщений пачками для производительности:
@broker.subscriber(
"orders",
group_id="order-processors",
batch=True,
max_batch_size=100, # Максимум сообщений в batch
batch_timeout_ms=1000 # Ждать batch не дольше 1 секунды
)
async def process_orders(orders: list[Order]):
# Обработать сразу 100 заказов
await db.bulk_save(orders)Kafka не поддерживает отложенные сообщения нативно, но можно использовать:
# Через timestamp в сообщении
await broker.publish(
{"order_id": 123, "process_at": "2026-03-30T15:00:00"},
"orders.delayed"
)
# Потребитель фильтрует по времени
@broker.subscriber("orders.delayed", group_id="delayed-processor")
async def process_delayed(data: dict, message: KafkaMessage):
if datetime.fromisoformat(data["process_at"]) > datetime.now():
await message.nack() # Вернуть в очередь
return
# ОбработатьТопики с compaction хранят только последнее значение по ключу:
# Топик с compaction (настраивается на брокере)
# cleanup.policy=compact
# Сообщения:
{"user_id": 1, "status": "active"}
{"user_id": 1, "status": "inactive"}
{"user_id": 1, "status": "active"} # Останется только это
# После compaction в топе останется только последнее значение для user_id=1Сценарий: хранение состояния (user status, config changes).
Гарантия отсутствия дубликатов при retry:
broker = KafkaBroker(
"localhost:9092",
enable_idempotence=True, # Идемпотентная публикация
acks="all" # Подтверждение от всех реплик
)Атомарная публикация в несколько топиков:
from faststream.kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers="localhost:9092",
transactional_id="order-tx"
)
async with producer.transaction():
await producer.publish(order, "orders")
await producer.publish(event, "order-events")
# Или всё откатится при exceptionKafka идеально подходит для event sourcing — хранения истории событий.
from pydantic import BaseModel
from enum import Enum
class OrderEventType(str, Enum):
CREATED = "order.created"
PAID = "order.paid"
SHIPPED = "order.shipped"
CANCELLED = "order.cancelled"
class OrderEvent(BaseModel):
order_id: int
event_type: OrderEventType
payload: dict
timestamp: str
# Публикация событий
@broker.publisher("order-events")
async def emit_event(event: OrderEvent):
await broker.publish(event, "order-events")
# Потребитель для проекции (читает с начала)
@broker.subscriber(
"order-events",
group_id="order-projection",
auto_offset_reset="earliest" # Читать с самого начала
)
async def build_projection(event: OrderEvent):
# Строим текущее состояние из истории событий
if event.event_type == OrderEventType.CREATED:
await db.orders.insert(event.order_id, event.payload)
elif event.event_type == OrderEventType.PAID:
await db.orders.update(event.order_id, {"status": "paid"})
# ...
# Потребитель для уведомлений (только новые)
@broker.subscriber(
"order-events",
group_id="notification-service",
auto_offset_reset="latest" # Только новые события
)
async def send_notification(event: OrderEvent):
if event.event_type == OrderEventType.SHIPPED:
await send_email(event.order_id, "Ваш заказ отправлен")Отставание потребителя — разница между последним сообщением в партиции и закоммиченным offset.
# Через kafka-consumer-groups
kafka-consumer-groups --bootstrap-server localhost:9092 \
--describe --group order-processors
# Вывод:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
order-processors orders 0 1000 1500 500
order-processors orders 1 2000 2000 0LAG > 0 — потребитель не успевает обрабатывать.
Kafka экспортирует метрики через JMX Exporter:
kafka_consumer_lag — отставание по партициямkafka_consumer_records_consumed_rate — скорость потребленияkafka_topic_partition_current_offset — текущий offsetauto_offset_reset="earliest"Следующая тема — Redis Pub/Sub: лёгкий messaging для real-time уведомлений и простых сценариев.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.