Структура Streams, consumer groups, обработка очередей задач
Redis Streams — это структура данных для хранения последовательности записей (как лог событий). В отличие от Pub/Sub, Streams сохраняет историю сообщений и поддерживает consumer groups с подтверждением обработки (ack/nack). Идеально для очередей задач, event sourcing, аудита.
import redis.asyncio as redis
from datetime import datetime
class StreamProducer:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
async def add(self, stream_name: str, data: dict, id: str = '*') -> str:
"""
Добавление записи в поток.
id='*' — Redis сгенерирует уникальный ID (timestamp-sequence).
Возвращает ID добавленной записи.
"""
# Streams требуют строковых значений
str_data = {k: str(v) for k, v in data.items()}
message_id = await self.redis.xadd(stream_name, str_data, id=id)
return message_id
async def add_with_timestamp(self, stream_name: str, data: dict) -> str:
"""Добавление с явным timestamp"""
# ID формата timestamp-sequence (например, '1709395200000-0')
timestamp = int(datetime.utcnow().timestamp() * 1000) # миллисекунды
message_id = await self.redis.xadd(stream_name, data, id=f'{timestamp}-*')
return message_id
# Использование
producer = StreamProducer(redis_client)
message_id = await producer.add('tasks:email', {
'task_type': 'welcome',
'user_id': '1001',
'email': 'user@example.com',
})
print(f"Added message with ID: {message_id}")class StreamConsumer:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
async def read_latest(self, stream_name: str, count: int = 10) -> list:
"""Чтение последних N записей"""
messages = await self.redis.xrevrange(stream_name, max='+', min='-', count=count)
return messages
async def read_from_id(self, stream_name: str, start_id: str, end_id: str = '+') -> list:
"""Чтение записей с указанного ID"""
messages = await self.redis.xrange(stream_name, min=start_id, max=end_id)
return messages
async def read_blocking(self, stream_name: str, timeout: int = 5000) -> list:
"""
Блокирующее чтение (ждёт новых записей).
timeout в миллисекундах.
"""
messages = await self.redis.xread(
streams={stream_name: '$'}, # '$' = только новые записи
block=timeout,
count=10,
)
return messages
# Использование
consumer = StreamConsumer(redis_client)
# Чтение последних 10 записей
messages = await consumer.read_latest('tasks:email', count=10)
for message_id, fields in messages:
print(f"ID: {message_id}, Data: {fields}")
# Блокирующее чтение (ждать новые записи до 5 секунд)
messages = await consumer.read_blocking('tasks:email', timeout=5000)class StreamManager:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
async def get_length(self, stream_name: str) -> int:
"""Длина потока (количество записей)"""
return await self.redis.xlen(stream_name)
async def trim(self, stream_name: str, max_length: int):
"""
Обрезка потока (удаление старых записей).
Оставляет последние max_length записей.
"""
await self.redis.xtrim(stream_name, maxlen=max_length)
async def delete(self, stream_name: str, *message_ids: str):
"""Удаление записей по ID"""
await self.redis.xdel(stream_name, *message_ids)
async def get_info(self, stream_name: str) -> dict:
"""Информация о потоке"""
info = await self.redis.xinfo_stream(stream_name)
return info
# Использование
manager = StreamManager(redis_client)
# Обрезка: оставить последние 1000 записей
await manager.trim('tasks:email', max_length=1000)
# Удаление конкретных записей
await manager.delete('tasks:email', '1709395200000-0', '1709395200000-1')
# Информация о потоке
info = await manager.get_info('tasks:email')
print(f"Length: {info['length']}, First entry: {info['first-entry']}, Last entry: {info['last-entry']}")Consumer groups позволяют нескольким потребителям обрабатывать сообщения из одного потока с load balancing и гарантией доставки.
class StreamGroupManager:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
async def create_group(
self,
stream_name: str,
group_name: str,
id: str = '0',
mkstream: bool = True,
):
"""
Создание consumer group.
id='0' — читать все сообщения с начала.
id='$' — читать только новые сообщения.
mkstream=True — создать поток, если не существует.
"""
try:
await self.redis.xgroup_create(
stream_name,
group_name,
id=id,
mkstream=mkstream,
)
except redis.ResponseError as e:
if 'BUSYGROUP' in str(e):
# Группа уже существует
pass
else:
raise
async def delete_group(self, stream_name: str, group_name: str):
"""Удаление consumer group"""
await self.redis.xgroup_destroy(stream_name, group_name)
async def set_last_delivered_id(
self,
stream_name: str,
group_name: str,
id: str,
):
"""Установка ID последнего доставленного сообщения"""
await self.redis.xgroup_setid(stream_name, group_name, id)class StreamWorker:
def __init__(self, redis_client: redis.Redis, stream_name: str, group_name: str, consumer_name: str):
self.redis = redis_client
self.stream_name = stream_name
self.group_name = group_name
self.consumer_name = consumer_name
async def read_messages(self, count: int = 10, block: int = 5000) -> list:
"""
Чтение сообщений из consumer group.
block=0 — блокировать бесконечно.
Возвращает [(message_id, {fields}), ...]
"""
messages = await self.redis.xreadgroup(
groupname=self.group_name,
consumername=self.consumer_name,
streams={self.stream_name: '>'}, # '>' = только новые сообщения
count=count,
block=block,
)
if messages:
# xreadgroup возвращает {stream_name: [(id, fields), ...]}
return messages[0][1]
return []
async def acknowledge(self, *message_ids: str) -> int:
"""
Подтверждение обработки сообщений.
Возвращает количество подтверждённых сообщений.
"""
return await self.redis.xack(self.stream_name, self.group_name, *message_ids)
async def claim_pending(
self,
min_idle_time: int,
*message_ids: str,
) -> list:
"""
Claim зависших сообщений от других потребителей.
min_idle_time: время бездействия в мс.
"""
return await self.redis.xclaim(
self.stream_name,
self.group_name,
self.consumer_name,
min_idle_time,
*message_ids,
)
async def get_pending(self) -> dict:
"""Получение информации о pending сообщениях"""
return await self.redis.xpending(self.stream_name, self.group_name)
# Использование воркера
async def run_worker():
redis_client = redis.from_url('redis://localhost:6379/0')
# Создание группы (один раз при деплое)
group_manager = StreamGroupManager(redis_client)
await group_manager.create_group('tasks:email', 'email-workers', id='0')
# Воркер
worker = StreamWorker(redis_client, 'tasks:email', 'email-workers', 'worker-1')
while True:
# Чтение сообщений
messages = await worker.read_messages(count=10, block=5000)
for message_id, fields in messages:
try:
# Обработка задачи
await process_email_task(fields)
# Подтверждение обработки
await worker.acknowledge(message_id)
except Exception as e:
# Ошибка обработки — сообщение останется pending
# и может быть claim'нуто другим воркером
print(f"Error processing {message_id}: {e}")class PendingMessageHandler:
def __init__(self, redis_client: redis.Redis, stream_name: str, group_name: str, consumer_name: str):
self.redis = redis_client
self.stream_name = stream_name
self.group_name = group_name
self.consumer_name = consumer_name
async def get_pending_details(
self,
min_idle_time: int = 0,
count: int = 10,
) -> list:
"""
Получение деталей pending сообщений.
min_idle_time: минимальное время бездействия в мс.
"""
pending = await self.redis.xpending_range(
self.stream_name,
self.group_name,
min='-',
max='+',
count=count,
consumername=self.consumer_name,
)
return pending
async def claim_idle_messages(
self,
min_idle_time_ms: int = 60000, # 1 минута
count: int = 10,
) -> list:
"""
Claim сообщений, которые зависли у других потребителей.
"""
# Сначала получаем список pending сообщений
pending = await self.redis.xpending_range(
self.stream_name,
self.group_name,
min='-',
max='+',
count=count,
)
# Фильтруем по времени бездействия
idle_messages = [p['message_id'] for p in pending if p['time_since_delivered'] >= min_idle_time_ms]
if not idle_messages:
return []
# Claim сообщений
claimed = await self.redis.xclaim(
self.stream_name,
self.group_name,
self.consumer_name,
min_idle_time_ms,
*idle_messages,
)
return claimed
# Воркер с обработкой зависших сообщений
async def run_resilient_worker():
redis_client = redis.from_url('redis://localhost:6379/0')
worker = StreamWorker(redis_client, 'tasks:email', 'email-workers', 'worker-1')
pending_handler = PendingMessageHandler(redis_client, 'tasks:email', 'email-workers', 'worker-1')
while True:
# Сначала обрабатываем зависшие сообщения
claimed = await pending_handler.claim_idle_messages(min_idle_time_ms=60000, count=10)
for message_id, fields in claimed:
try:
await process_email_task(fields)
await worker.acknowledge(message_id)
except Exception as e:
print(f"Error processing claimed message {message_id}: {e}")
# Затем читаем новые сообщения
messages = await worker.read_messages(count=10, block=5000)
for message_id, fields in messages:
try:
await process_email_task(fields)
await worker.acknowledge(message_id)
except Exception as e:
print(f"Error processing new message {message_id}: {e}")import json
from enum import Enum
from typing import Any, Optional
from datetime import datetime, timedelta
class TaskPriority(str, Enum):
LOW = 'low'
NORMAL = 'normal'
HIGH = 'high'
URGENT = 'urgent'
class TaskProducer:
def __init__(self, redis_client: redis.Redis, stream_name: str = 'tasks:main'):
self.redis = redis_client
self.stream_name = stream_name
async def enqueue(
self,
task_type: str,
payload: dict,
priority: TaskPriority = TaskPriority.NORMAL,
delay_seconds: int = 0,
max_retries: int = 3,
) -> str:
"""
Добавление задачи в очередь.
priority: влияет на порядок обработки (через score в delayed очереди).
delay_seconds: задержка перед выполнением.
max_retries: максимальное количество попыток.
"""
task_id = str(uuid.uuid4())
now = datetime.utcnow()
task_data = {
'task_id': task_id,
'task_type': task_type,
'payload': json.dumps(payload),
'priority': priority.value,
'max_retries': str(max_retries),
'retry_count': '0',
'created_at': now.isoformat(),
'status': 'pending',
}
if delay_seconds > 0:
# Отложенная задача
execute_at = now + timedelta(seconds=delay_seconds)
task_data['execute_at'] = execute_at.isoformat()
task_data['status'] = 'delayed'
# Добавляем в delayed очередь (sorted set)
execute_timestamp = int(execute_at.timestamp() * 1000)
await self.redis.zadd(
f'{self.stream_name}:delayed',
{json.dumps(task_data): execute_timestamp},
)
else:
# Немедленное выполнение
await self.redis.xadd(self.stream_name, task_data)
return task_id
async def enqueue_batch(self, tasks: list[dict]) -> list[str]:
"""Пакетное добавление задач"""
pipeline = self.redis.pipeline()
task_ids = []
for task in tasks:
task_id = str(uuid.uuid4())
task_data = {
'task_id': task_id,
'task_type': task['type'],
'payload': json.dumps(task.get('payload', {})),
'priority': task.get('priority', TaskPriority.NORMAL.value),
'retry_count': '0',
'created_at': datetime.utcnow().isoformat(),
}
pipeline.xadd(self.stream_name, task_data)
task_ids.append(task_id)
await pipeline.execute()
return task_idsclass TaskWorker:
def __init__(
self,
redis_client: redis.Redis,
stream_name: str = 'tasks:main',
group_name: str = 'task-workers',
consumer_name: str = 'worker-1',
poll_interval: float = 1.0,
):
self.redis = redis_client
self.stream_name = stream_name
self.group_name = group_name
self.consumer_name = consumer_name
self.poll_interval = poll_interval
self._running = False
self._task_handlers: dict[str, callable] = {}
def register_task(self, task_type: str, handler: callable):
"""Регистрация обработчика для типа задачи"""
self._task_handlers[task_type] = handler
async def start(self):
"""Запуск воркера"""
# Создание consumer group
try:
await self.redis.xgroup_create(
self.stream_name,
self.group_name,
id='0',
mkstream=True,
)
except redis.ResponseError as e:
if 'BUSYGROUP' not in str(e):
raise
self._running = True
# Запуск процесса переноса отложенных задач
asyncio.create_task(self._process_delayed_tasks())
# Основной цикл обработки
await self._process_messages()
async def stop(self):
"""Остановка воркера"""
self._running = False
async def _process_messages(self):
"""Основной цикл обработки сообщений"""
while self._running:
try:
messages = await self.redis.xreadgroup(
groupname=self.group_name,
consumername=self.consumer_name,
streams={self.stream_name: '>'},
count=10,
block=5000,
)
if messages:
for stream, msgs in messages:
for message_id, fields in msgs:
await self._process_task(message_id, fields)
except asyncio.CancelledError:
break
except Exception as e:
print(f"Error in worker loop: {e}")
await asyncio.sleep(1)
async def _process_task(self, message_id: str, fields: dict):
"""Обработка одной задачи"""
task_type = fields.get('task_type')
task_id = fields.get('task_id')
retry_count = int(fields.get('retry_count', 0))
max_retries = int(fields.get('max_retries', 3))
handler = self._task_handlers.get(task_type)
if not handler:
print(f"No handler for task type: {task_type}")
await self.redis.xack(self.stream_name, self.group_name, message_id)
return
try:
payload = json.loads(fields.get('payload', '{}'))
await handler(payload)
# Успешная обработка — ack
await self.redis.xack(self.stream_name, self.group_name, message_id)
except Exception as e:
print(f"Task {task_id} failed: {e}")
if retry_count < max_retries:
# Retry: увеличиваем retry_count и добавляем обратно
await self.redis.xadd(self.stream_name, {
**fields,
'retry_count': str(retry_count + 1),
'last_error': str(e),
'retried_at': datetime.utcnow().isoformat(),
})
await self.redis.xack(self.stream_name, self.group_name, message_id)
else:
# Максимум попыток исчерпан — перемещаем в dead letter
await self._move_to_dead_letter(message_id, fields, str(e))
await self.redis.xack(self.stream_name, self.group_name, message_id)
async def _move_to_dead_letter(self, message_id: str, fields: dict, error: str):
"""Перемещение неудачной задачи в dead letter queue"""
dead_letter_data = {
**fields,
'original_message_id': message_id,
'error': error,
'moved_to_dlq_at': datetime.utcnow().isoformat(),
}
await self.redis.xadd(f'{self.stream_name}:dead', dead_letter_data)
async def _process_delayed_tasks(self):
"""Перенос готовых отложенных задач в основную очередь"""
while self._running:
try:
now = int(datetime.utcnow().timestamp() * 1000)
# Получаем задачи, которые должны выполниться сейчас
ready_tasks = await self.redis.zrangebyscore(
f'{self.stream_name}:delayed',
'-inf',
now,
)
if ready_tasks:
pipeline = self.redis.pipeline()
for task_json in ready_tasks:
task_data = json.loads(task_json)
pipeline.xadd(self.stream_name, task_data)
pipeline.zrem(f'{self.stream_name}:delayed', task_json)
await pipeline.execute()
await asyncio.sleep(self.poll_interval)
except Exception as e:
print(f"Error processing delayed tasks: {e}")
await asyncio.sleep(1)
# Использование
async def send_email_handler(payload: dict):
print(f"Sending email to {payload['email']}: {payload['subject']}")
# Логика отправки email...
async def main():
redis_client = redis.from_url('redis://localhost:6379/0')
# Producer
producer = TaskProducer(redis_client)
# Добавление задачи
task_id = await producer.enqueue(
task_type='send_email',
payload={'email': 'user@example.com', 'subject': 'Welcome!'},
priority=TaskPriority.NORMAL,
)
print(f"Task queued: {task_id}")
# Worker
worker = TaskWorker(redis_client, consumer_name='worker-1')
worker.register_task('send_email', send_email_handler)
# Запуск воркера
await worker.start()class StreamMonitor:
def __init__(self, redis_client: redis.Redis, stream_name: str):
self.redis = redis_client
self.stream_name = stream_name
async def get_stream_info(self) -> dict:
"""Информация о потоке"""
info = await self.redis.xinfo_stream(self.stream_name)
return {
'length': info['length'],
'first_entry': info['first-entry'],
'last_entry': info['last-entry'],
'groups': info['groups'],
}
async def get_group_info(self, group_name: str) -> dict:
"""Информация о consumer group"""
groups = await self.redis.xinfo_groups(self.stream_name)
for group in groups:
if group['name'] == group_name:
return {
'consumers': group['consumers'],
'pending': group['pending'],
'last_delivered_id': group['last-delivered-id'],
}
return None
async def get_pending_summary(self, group_name: str) -> dict:
"""Сводка по pending сообщениям"""
pending = await self.redis.xpending(self.stream_name, group_name)
return {
'total_pending': pending['pending'],
'min_id': pending['min'],
'max_id': pending['max'],
'consumers': pending['consumers'], # {consumer_name: count}
}
async def get_queue_stats(self) -> dict:
"""Статистика очереди задач"""
main_length = await self.redis.xlen(self.stream_name)
delayed_count = await self.redis.zcard(f'{self.stream_name}:delayed')
dead_letter_length = await self.redis.xlen(f'{self.stream_name}:dead')
return {
'pending_tasks': main_length,
'delayed_tasks': delayed_count,
'dead_letter_tasks': dead_letter_length,
}Проверьте понимание → ответьте на вопросы в streams.json
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.