Потоковая обработка данных: Kafka Streams, Faust, топологии, state stores, windowing.
Потоковая обработка позволяет обрабатывать данные из Kafka в реальном времени. В этой теме вы изучите Kafka Streams, Faust, топологии, state stores и windowing.
Потоковая обработка (stream processing) — обработка данных в реальном времени по мере их поступления, в отличие от пакетной обработки (batch).
Kafka Streams — библиотека для потоковой обработки от создателей Kafka.
<!-- Maven -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.6.0</version>
</dependency>import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Properties;
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// Чтение из топика
KStream<String, String> orders = builder.stream("orders");
// Фильтрация
KStream<String, String> largeOrders = orders
.filter((key, value) -> {
Order order = parseOrder(value);
return order.getAmount() > 100;
});
// Запись в другой топик
largeOrders.to("large-orders", Produced.with(Serdes.String(), Serdes.String()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();Faust — библиотека потоковой обработки для Python, вдохновлённая Kafka Streams.
pip install faust
# или
pip install faust-streaming # Актуальный форк# worker --запуск consumer/processor
faust -A app worker -l info
# С несколькими workers для параллелизма
faust -A app worker -l info --workers=4# app.py
import faust
from faust import Stream, Record
# Определение модели
class Order(Record):
order_id: int
user_id: int
amount: float
status: str
# Создание приложения
app = faust.App(
'order-processor',
broker='kafka://localhost:9092',
value_serializer='json'
)
# Топик
orders_topic = app.topic('orders', value_type=Order)
# Agent — обработчик сообщений
@app.agent(orders_topic)
async def process_order(orders: Stream[Order]) -> None:
async for order in orders:
print(f'Processing order: {order.order_id}')
if order.amount > 1000:
await high_value_orders.send(value=order)
# Другой топик для больших заказов
high_value_orders = app.topic('high-value-orders', value_type=Order)
@app.agent(high_value_orders)
async def process_high_value(orders: Stream[Order]) -> None:
async for order in orders:
print(f'High value order: {order.order_id}, Amount: {order.amount}')Топология — граф обработки данных: источники (source topics), процессоры (operators), стоки (sink topics).
Source Topic → Filter → Transform → Aggregate → Sink Topic
import faust
from faust import Stream
app = faust.App('my-app', broker='kafka://localhost:9092')
orders_topic = app.topic('orders')
@app.agent(orders_topic)
async def process(orders: Stream) -> None:
# Filter — фильтрация
large_orders = orders.filter(lambda order: order['amount'] > 100)
# Map — трансформация
order_ids = large_orders.map(lambda order: order['order_id'])
# Take — ограничение количества
recent = orders.take(10)
# Group by — группировка по ключу
by_user = orders.group_by(lambda order: order['user_id'])
# Join — соединение двух потоков
# payments_topic = app.topic('payments')
# joined = orders.join(payments_topic, lambda o: o['user_id'])
# Windowed aggregation — агрегация по окнам
# См. раздел Windowing@app.agent(orders_topic)
async def process_chain(orders: Stream) -> None:
async for order in (
orders
.filter(lambda o: o['amount'] > 100)
.map(lambda o: {'user_id': o['user_id'], 'total': o['amount']})
.take(100)
):
print(order)State Store — локальное хранилище состояния для stateful операций (агрегации, join).
import faust
app = faust.App('order-tracker', broker='kafka://localhost:9092')
# Table — хранилище состояния с ключом
order_totals = app.Table('order_totals', default=int)
orders_topic = app.topic('orders')
@app.agent(orders_topic)
async def track_orders(orders: Stream) -> None:
async for order in orders:
user_id = order['user_id']
amount = order['amount']
# Обновление состояния
order_totals[user_id] += amount
# Чтение состояния
total = order_totals[user_id]
print(f'User {user_id} total: {total}')Faust автоматически создаёт changelog topic для персистентности:
# order_totals-changelog — топик для восстановления состояния
# При restart Faust читает changelog и восстанавливает table# Global Table — копия данных на каждом instance
user_profiles = app.GlobalTable('user-profiles')
@app.agent(orders_topic)
async def enrich_orders(orders: Stream) -> None:
async for order in orders:
# Доступ к данным любого ключа (не только локального)
profile = user_profiles[order['user_id']]
enriched = {**order, 'profile': profile}
await enriched_orders.send(value=enriched)Windowing — группировка событий по временным интервалам для агрегации.
| Тип | Описание | Пример |
|---|---|---|
| Tumbling | Неперекрывающиеся окна фиксированного размера | 5-минутные интервалы: [0-5), [5-10), [10-15) |
| Hopping | Перекрывающиеся окна | 10-минутные окна каждые 5 минут |
| Session | Окна по сессиям активности | Периоды активности пользователя |
from faust import Windowed
from datetime import timedelta
app = faust.App('page-views', broker='kafka://localhost:9092')
# Tumbling window: 1 минута
page_views = app.Table('page_views', default=int).tumbling(timedelta(minutes=1))
views_topic = app.topic('page-views')
@app.agent(views_topic)
async def count_views(views: Stream) -> None:
async for view in views:
page_id = view['page_id']
# Инкремент счётчика в текущем окне
page_views[page_id] += 1
# Получение значения для конкретного окна
window = page_views[page_id].now()
print(f'Page {page_id} views in current window: {window}')from datetime import timedelta
# Hopping window: 10 минут, шаг 5 минут
page_views = app.Table('page_views', default=int).hopping(
timedelta(minutes=10),
timedelta(minutes=5)
)from datetime import timedelta
# Session window: сессия заканчивается после 30 минут неактивности
user_sessions = app.Table('user_sessions', default=int).session(
timedelta(minutes=30)
)
@app.agent(views_topic)
async def track_sessions(views: Stream) -> None:
async for view in views:
user_id = view['user_id']
user_sessions[user_id] += 1from faust import TableT
@app.timer(interval=60.0)
async def print_window_stats():
"""Вывод статистики каждые 60 секунд"""
for page_id, window_value in page_views.items():
# window_value содержит значение для текущего окна
print(f'Page {page_id}: {window_value} views')orders_topic = app.topic('orders')
# Подсчёт количества заказов по пользователям
order_count = app.Table('order_count', default=int)
@app.agent(orders_topic)
async def count_orders(orders: Stream) -> None:
async for order in orders:
user_id = order['user_id']
order_count[user_id] += 1# Сумма заказов по пользователям
order_total = app.Table('order_total', default=float)
@app.agent(orders_topic)
async def sum_orders(orders: Stream) -> None:
async for order in orders:
user_id = order['user_id']
amount = order['amount']
order_total[user_id] += amountfrom faust import Stream
@app.agent(orders_topic)
async def reduce_orders(orders: Stream) -> None:
# Группировка и редюс
async for user_id, total in (
orders
.group_by(lambda order: order['user_id'])
.reduce(lambda acc, order: acc + order['amount'], default=0.0)
):
print(f'User {user_id} total: {total}')from dataclasses import dataclass
@dataclass
class OrderStats:
count: int = 0
total: float = 0.0
avg: float = 0.0
order_stats = app.Table('order_stats', default=OrderStats)
@app.agent(orders_topic)
async def aggregate_orders(orders: Stream) -> None:
async for order in orders:
user_id = order['user_id']
stats = order_stats[user_id]
stats.count += 1
stats.total += order['amount']
stats.avg = stats.total / stats.count
order_stats[user_id] = statsorders_topic = app.topic('orders')
payments_topic = app.topic('payments')
@app.agent(orders_topic)
async def join_orders_payments(orders: Stream) -> None:
# Join с payments по order_id
async for order in orders:
async for payment in orders.join(
payments_topic,
lambda o: o['order_id'],
timeout=30.0 # Ждать payment 30 секунд
):
if order['order_id'] == payment['order_id']:
print(f'Order {order["order_id"]} paid!')orders_topic = app.topic('orders')
users_table = app.Table('users') # User profile data
@app.agent(orders_topic)
async def enrich_orders_with_users(orders: Stream) -> None:
async for order in orders:
# Join с таблицей пользователей
user = users_table.get(order['user_id'])
enriched_order = {
**order,
'user_name': user.get('name') if user else 'Unknown'
}
await enriched_orders.send(value=enriched_order)import faust
from faust import Stream
app = faust.App('order-processor', broker='kafka://localhost:9092')
orders_topic = app.topic('orders')
dlq_topic = app.topic('orders-dlq')
@app.agent(orders_topic)
async def process_with_error_handling(orders: Stream) -> None:
async for order in orders:
try:
await process_order(order)
except ValidationError as e:
# Валидационная ошибка — отправляем в DLQ
await dlq_topic.send(
key=str(order.get('order_id')),
value={
'error': 'validation_error',
'message': str(e),
'original': order
}
)
except Exception as e:
# Неожиданная ошибка — retry через exception
raise e
async def process_order(order):
# Бизнес-логика
passapp = faust.App(
'order-processor',
broker='kafka://localhost:9092',
transactional_id='order-processor-1', # Включает транзакции
processing_guarantee='exactly_once'
)# Запуск нескольких workers для параллелизма
faust -A app worker -l info --workers=4
# Каждый worker обрабатывает свою часть партиций
# Для N партиций рекомендуется N workersfrom faust import Monitor
class CustomMonitor(Monitor):
def on_agent_process_start(self, agent):
print(f'Agent {agent.name} started processing')
def on_agent_process_end(self, agent):
print(f'Agent {agent.name} finished processing')
app = faust.App(
'my-app',
broker='kafka://localhost:9092',
monitor=CustomMonitor()
)| Характеристика | Kafka Streams | Faust |
|---|---|---|
| Язык | Java/Scala | Python |
| Производительность | Высокая | Средняя |
| State Store | RocksDB | SQLite / Memory |
| Транзакции | Да | Да (exactly_once) |
| Windowing | Все типы | Tumbling, Hopping, Session |
| Экосистема | Confluent Platform | Python ecosystem |
В следующей теме вы изучите Транзакции в Kafka:
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.