Паттерн Publish/Subscribe, WebSocket интеграция, real-time фичи
Redis Pub/Sub — это паттерн обмена сообщениями: отправители (publishers) отправляют сообщения в каналы, получатели (subscribers) получают все сообщения из каналов, на которые подписаны. Идеально для real-time уведомлений, чатов, live-обновлений.
import redis.asyncio as redis
class PubSubPublisher:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
async def publish(self, channel: str, message: str) -> int:
"""
Публикация сообщения в канал.
Возвращает количество подписчиков, получивших сообщение.
"""
return await self.redis.publish(channel, message)
async def publish_json(self, channel: str, data: dict) -> int:
"""Публикация JSON сообщения"""
import json
return await self.redis.publish(channel, json.dumps(data))
# Использование
publisher = PubSubPublisher(redis_client)
# Отправка сообщения
subscribers_count = await publisher.publish(
'notifications',
'New order #123!',
)
# Отправка JSON
await publisher.publish_json('notifications', {
'type': 'order_created',
'order_id': 123,
'user_id': 456,
})import redis
class PubSubSubscriber:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self.pubsub = self.redis.pubsub()
async def subscribe(self, *channels: str):
"""Подписка на каналы"""
await self.pubsub.subscribe(*channels)
async def subscribe_pattern(self, pattern: str):
"""Подписка по паттерну"""
await self.pubsub.psubscribe(pattern)
async def listen(self):
"""Слушание сообщений"""
async for message in self.pubsub.listen():
if message['type'] == 'message':
yield {
'channel': message['channel'],
'data': message['data'],
}
async def unsubscribe(self, *channels: str):
"""Отписка от каналов"""
await self.pubsub.unsubscribe(*channels)
async def close(self):
"""Закрытие соединения"""
await self.pubsub.close()
# Использование
subscriber = PubSubSubscriber(redis_client)
await subscriber.subscribe('notifications', 'chat:room:1')
async for message in subscriber.listen():
print(f"Received on {message['channel']}: {message['data']}")import redis.asyncio as redis
class AsyncPubSubSubscriber:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self.pubsub = self.redis.pubsub()
async def subscribe(self, *channels: str):
await self.pubsub.subscribe(*channels)
async def listen(self):
async for message in self.pubsub.listen():
if message['type'] == 'message':
yield message
async def close(self):
await self.pubsub.close()
await self.redis.close()from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends
from fastapi.responses import HTMLResponse
import redis.asyncio as redis
import json
from typing import Dict, Set
import asyncio
app = FastAPI()
class ConnectionManager:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
# room_id -> set of WebSocket connections
self.active_connections: Dict[str, Set[WebSocket]] = {}
# websocket -> room_id mapping
self.websocket_rooms: Dict[WebSocket, str] = {}
async def connect(self, websocket: WebSocket, room_id: str):
"""Подключение клиента к комнате"""
await websocket.accept()
if room_id not in self.active_connections:
self.active_connections[room_id] = set()
self.active_connections[room_id].add(websocket)
self.websocket_rooms[websocket] = room_id
# Подписка на Redis канал комнаты
pubsub = self.redis.pubsub()
await pubsub.subscribe(f'room:{room_id}')
# Запуск задачи для получения сообщений из Redis
asyncio.create_task(self._redis_listener(pubsub, room_id))
def disconnect(self, websocket: WebSocket):
"""Отключение клиента"""
room_id = self.websocket_rooms.get(websocket)
if room_id:
self.active_connections[room_id].discard(websocket)
del self.websocket_rooms[websocket]
if not self.active_connections[room_id]:
del self.active_connections[room_id]
async def _redis_listener(self, pubsub, room_id: str):
"""Слушание сообщений из Redis и отправка клиентам"""
try:
async for message in pubsub.listen():
if message['type'] == 'message':
# Отправка всем клиентам в комнате
await self.broadcast(room_id, message['data'])
except asyncio.CancelledError:
pass
finally:
await pubsub.unsubscribe(f'room:{room_id}')
await pubsub.close()
async def broadcast(self, room_id: str, message: str):
"""Отправка сообщения всем клиентам в комнате"""
if room_id not in self.active_connections:
return
disconnected = set()
for connection in self.active_connections[room_id]:
try:
await connection.send_text(message)
except Exception:
disconnected.add(connection)
# Удаление отключившихся клиентов
for connection in disconnected:
self.disconnect(connection)
async def send_to_room(self, room_id: str, message: dict):
"""Публикация сообщения в Redis для комнаты"""
await self.redis.publish(
f'room:{room_id}',
json.dumps(message),
)
# Глобальный менеджер
manager: ConnectionManager | None = None
@app.on_event("startup")
async def startup():
global manager
redis_client = redis.from_url('redis://localhost:6379/0')
manager = ConnectionManager(redis_client)
@app.websocket("/ws/{room_id}")
async def websocket_endpoint(websocket: WebSocket, room_id: str):
await manager.connect(websocket, room_id)
try:
while True:
# Получение сообщения от клиента
data = await websocket.receive_text()
message = json.loads(data)
# Публикация в Redis
await manager.send_to_room(room_id, {
'type': 'message',
'user': message.get('user'),
'text': message.get('text'),
'timestamp': asyncio.get_event_loop().time(),
})
except WebSocketDisconnect:
manager.disconnect(websocket)
# Уведомление других участников
await manager.send_to_room(room_id, {
'type': 'user_left',
'user': 'anonymous',
})
# HTTP endpoint для отправки сообщений в комнату
@app.post("/room/{room_id}/message")
async def send_room_message(room_id: str, message: dict):
await manager.send_to_room(room_id, {
'type': 'message',
'user': 'system',
'text': message.get('text'),
})
return {'status': 'sent'}from fastapi import FastAPI, WebSocket, Depends, HTTPException
from fastapi.security import HTTPBearer
import redis.asyncio as redis
import json
app = FastAPI()
security = HTTPBearer()
class NotificationService:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self.user_connections: Dict[int, Set[WebSocket]] = {}
async def connect(self, websocket: WebSocket, user_id: int):
"""Подключение пользователя к уведомлениям"""
await websocket.accept()
if user_id not in self.user_connections:
self.user_connections[user_id] = set()
self.user_connections[user_id].add(websocket)
# Подписка на персональный канал пользователя
pubsub = self.redis.pubsub()
await pubsub.subscribe(f'user:{user_id}:notifications')
# Запуск listener
asyncio.create_task(self._listen_user(pubsub, user_id))
def disconnect(self, websocket: WebSocket, user_id: int):
"""Отключение пользователя"""
if user_id in self.user_connections:
self.user_connections[user_id].discard(websocket)
if not self.user_connections[user_id]:
del self.user_connections[user_id]
async def _listen_user(self, pubsub, user_id: int):
"""Слушание уведомлений пользователя"""
try:
async for message in pubsub.listen():
if message['type'] == 'message':
await self._send_to_user(user_id, message['data'])
except asyncio.CancelledError:
pass
finally:
await pubsub.unsubscribe(f'user:{user_id}:notifications')
await pubsub.close()
async def _send_to_user(self, user_id: int, message: str):
"""Отправка сообщения всем подключениям пользователя"""
if user_id not in self.user_connections:
return
disconnected = set()
for connection in self.user_connections[user_id]:
try:
await connection.send_text(message)
except Exception:
disconnected.add(connection)
for connection in disconnected:
self.disconnect(connection, user_id)
async def notify_user(self, user_id: int, notification: dict):
"""Отправка уведомления пользователю через Redis"""
await self.redis.publish(
f'user:{user_id}:notifications',
json.dumps(notification),
)
async def notify_users(self, user_ids: list[int], notification: dict):
"""Отправка уведомления нескольким пользователям"""
for user_id in user_ids:
await self.notify_user(user_id, notification)
async def broadcast(self, notification: dict):
"""Широковещательное уведомление всем"""
await self.redis.publish(
'notifications:global',
json.dumps(notification),
)
notification_service: NotificationService | None = None
@app.on_event("startup")
async def startup():
global notification_service
redis_client = redis.from_url('redis://localhost:6379/0')
notification_service = NotificationService(redis_client)
@app.websocket("/ws/notifications")
async def notifications_websocket(
websocket: WebSocket,
):
# Здесь должна быть аутентификация
# credentials = await security(websocket)
# user_id = get_user_from_token(credentials.credentials)
# Для примера берём user_id из query params
user_id = int(websocket.query_params.get('user_id', 0))
if not user_id:
await websocket.close(code=4000, reason='user_id required')
return
await notification_service.connect(websocket, user_id)
try:
# Держим соединение открытым
while True:
await websocket.receive_text() # heartbeat
except WebSocketDisconnect:
notification_service.disconnect(websocket, user_id)
# Endpoint для отправки уведомления
@app.post("/notify/{user_id}")
async def send_notification(user_id: int, notification: dict):
await notification_service.notify_user(user_id, notification)
return {'status': 'sent'}Подписка по паттерну позволяет получать сообщения из нескольких каналов сразу.
class PatternSubscriber:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self.pubsub = self.redis.pubsub()
async def subscribe_pattern(self, pattern: str):
"""
Подписка по паттерну.
pattern: 'chat:*' — все каналы, начинающиеся с 'chat:'
"""
await self.pubsub.psubscribe(pattern)
async def listen(self):
async for message in self.pubsub.listen():
if message['type'] == 'pmessage':
yield {
'pattern': message['pattern'],
'channel': message['channel'],
'data': message['data'],
}
# Использование
subscriber = PatternSubscriber(redis_client)
await subscriber.subscribe_pattern('chat:room:*')
async for message in subscriber.listen():
print(f"Pattern {message['pattern']}, Channel {message['channel']}: {message['data']}")class EventBus:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
async def publish_event(self, event_type: str, payload: dict):
"""Публикация события"""
event = {
'type': event_type,
'payload': payload,
'timestamp': datetime.utcnow().isoformat(),
}
await self.redis.publish(f'events:{event_type}', json.dumps(event))
async def subscribe_events(self, *event_types: str):
"""Подписка на события"""
pubsub = self.redis.pubsub()
# Подписка на конкретные типы событий
channels = [f'events:{et}' for et in event_types]
await pubsub.subscribe(*channels)
return pubsub
async def subscribe_all_events(self):
"""Подписка на все события"""
pubsub = self.redis.pubsub()
await pubsub.psubscribe('events:*')
return pubsub
# Пример: сервис заказов
class OrderService:
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus
async def create_order(self, user_id: int, items: list) -> dict:
# Создание заказа в БД
order = await db.create_order(user_id, items)
# Публикация события
await self.event_bus.publish_event('order.created', {
'order_id': order['id'],
'user_id': user_id,
'items': items,
})
return order
# Пример: сервис уведомлений (подписчик)
class NotificationService:
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus
async def start_listening(self):
pubsub = await self.event_bus.subscribe_events('order.created', 'order.shipped')
async for message in pubsub.listen():
if message['type'] == 'message':
event = json.loads(message['data'])
await self.handle_event(event)
async def handle_event(self, event: dict):
if event['type'] == 'order.created':
await self.send_order_confirmation(event['payload'])
elif event['type'] == 'order.shipped':
await self.send_shipping_notification(event['payload'])| Аспект | Pub/Sub | Streams |
|---|---|---|
| Доставка | Fire-and-forget, нет гарантии | С подтверждением (ack) |
| История | Нет истории, только активные подписчики | Сохраняет историю сообщений |
| Consumer groups | Нет | Да, с load balancing |
| Persistance | Нет | Да, в Redis |
| Use case | Real-time уведомления, чаты | Очереди задач, event sourcing |
Pub/Sub: сообщения не сохраняются. Если нет подписчиков в момент публикации, сообщение теряется. Подходит для real-time обновлений, где важна актуальность, а не доставка.
Streams: сообщения сохраняются в потоке. Подписчики могут читать историю. Consumer groups обеспечивают надёжную обработку с ack/nack.
class ResilientPubSubSubscriber:
def __init__(self, redis_client: redis.Redis, channels: list[str]):
self.redis = redis_client
self.channels = channels
self.pubsub = None
async def subscribe_with_retry(self, max_retries: int = 5):
"""Подписка с повторными попытками при обрыве"""
retry_count = 0
while retry_count < max_retries:
try:
self.pubsub = self.redis.pubsub()
await self.pubsub.subscribe(*self.channels)
retry_count = 0 # Сброс при успешной подписке
async for message in self.pubsub.listen():
if message['type'] == 'message':
yield message
except (redis.ConnectionError, redis.TimeoutError) as e:
retry_count += 1
wait_time = min(2 ** retry_count, 30) # Экспоненциальная задержка
if retry_count >= max_retries:
raise
await asyncio.sleep(wait_time)
async def close(self):
if self.pubsub:
await self.pubsub.close()class ManagedPubSub:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self.pubsub = None
self._listen_task = None
async def subscribe(self, *channels: str):
self.pubsub = self.redis.pubsub()
await self.pubsub.subscribe(*channels)
async def start_listening(self, handler):
"""Запуск слушателя с возможностью остановки"""
async def listener():
try:
async for message in self.pubsub.listen():
if message['type'] == 'message':
await handler(message)
except asyncio.CancelledError:
pass
self._listen_task = asyncio.create_task(listener())
async def stop(self):
"""Остановка и очистка"""
if self._listen_task:
self._listen_task.cancel()
try:
await self._listen_task
except asyncio.CancelledError:
pass
if self.pubsub:
await self.pubsub.unsubscribe()
await self.pubsub.close()class PubSubMonitor:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
async def get_subscriber_count(self, channel: str) -> int:
"""Получение количества подписчиков канала"""
info = await self.redis.pubsub_numsub(channel)
# Возвращает [(channel_name, subscriber_count), ...]
return info[0][1] if info else 0
async def get_channels(self, pattern: str = '*') -> list[str]:
"""Получение каналов по паттерну"""
return await self.redis.pubsub_numpat()Проверьте понимание → ответьте на вопросы в pub_sub.json
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.