Работа с WebSocket-соединениями, broadcast-рассылка, Server-Sent Events, realtime-приложения
Realtime-коммуникация с клиентами через WebSocket и SSE
WebSocket — протокол двусторонней связи поверх TCP, позволяющий серверу и клиенту обмениваться данными в реальном времени.
from starlite import WebSocket, websocket
@websocket("/ws")
async def websocket_handler(socket: WebSocket) -> None:
await socket.accept()
while True:
data = await socket.receive_text()
# Обработка данных
await socket.send_text(f"Echo: {data}")from starlite import WebSocket
@websocket("/ws")
async def websocket_handler(socket: WebSocket) -> None:
# Принять соединение
await socket.accept()
# Принять с подпротоколом
await socket.accept(subprotocol="graphql-ws")
# Принять с заголовками
await socket.accept(headers={"X-Custom": "value"})WebSocket поддерживает различные типы данных:
from starlite import WebSocket
@websocket("/ws")
async def websocket_handler(socket: WebSocket) -> None:
await socket.accept()
# Текст
text_data = await socket.receive_text()
await socket.send_text("Hello")
# Бинарные данные
bytes_data = await socket.receive_bytes()
await socket.send_bytes(b"\x00\x01\x02")
# JSON
json_data = await socket.receive_json()
await socket.send_json({"status": "ok"})from starlite import WebSocket
@websocket("/ws")
async def websocket_handler(socket: WebSocket) -> None:
await socket.accept()
try:
# Работа с соединением
...
except Exception:
# Закрыть с кодом ошибки
await socket.close(code=1011, reason="Internal error")
finally:
# Нормальное закрытие
await socket.close(code=1000, reason="Goodbye")Коды закрытия:
1000 — нормальное закрытие1001 — клиент уходит1006 — обрыв соединения1011 — ошибка сервераДля рассылки сообщений множеству клиентов используйте хранилище подключений:
from starlite import WebSocket, websocket
from typing import Set
# Глобальное хранилище подключений
active_connections: Set[WebSocket] = set()
@websocket("/ws")
async def websocket_handler(socket: WebSocket) -> None:
await socket.accept()
active_connections.add(socket)
try:
while True:
data = await socket.receive_text()
# Broadcast всем подключенным клиентам
for conn in active_connections:
await conn.send_text(f"Broadcast: {data}")
except Exception:
active_connections.remove(socket)
await socket.close()Создайте класс для управления подключениями:
from starlite import WebSocket
from typing import Set, Dict
import asyncio
class ConnectionManager:
def __init__(self):
self.active_connections: Set[WebSocket] = set()
self.lock = asyncio.Lock()
async def connect(self, socket: WebSocket) -> None:
await socket.accept()
async with self.lock:
self.active_connections.add(socket)
async def disconnect(self, socket: WebSocket) -> None:
async with self.lock:
self.active_connections.remove(socket)
await socket.close()
async def broadcast(self, message: str) -> None:
async with self.lock:
disconnected = set()
for conn in self.active_connections:
try:
await conn.send_text(message)
except Exception:
disconnected.add(conn)
# Удалить отключившихся
self.active_connections -= disconnected
async def send_personal(self, socket: WebSocket, message: str) -> None:
await socket.send_text(message)
manager = ConnectionManager()
@websocket("/ws")
async def websocket_handler(socket: WebSocket) -> None:
await manager.connect(socket)
try:
while True:
data = await socket.receive_text()
await manager.broadcast(f"User: {data}")
except Exception:
await manager.disconnect(socket)Группировка клиентов по комнатам:
from starlite import WebSocket, websocket
from typing import Dict, Set
import asyncio
class RoomManager:
def __init__(self):
self.rooms: Dict[str, Set[WebSocket]] = {}
self.lock = asyncio.Lock()
async def join_room(
self,
socket: WebSocket,
room: str
) -> None:
async with self.lock:
if room not in self.rooms:
self.rooms[room] = set()
self.rooms[room].add(socket)
async def leave_room(
self,
socket: WebSocket,
room: str
) -> None:
async with self.lock:
if room in self.rooms:
self.rooms[room].discard(socket)
if not self.rooms[room]:
del self.rooms[room]
async def broadcast_to_room(
self,
room: str,
message: str
) -> None:
async with self.lock:
if room in self.rooms:
disconnected = set()
for conn in self.rooms[room]:
try:
await conn.send_text(message)
except Exception:
disconnected.add(conn)
self.rooms[room] -= disconnected
room_manager = RoomManager()
@websocket("/ws/{room:str}")
async def websocket_handler(
socket: WebSocket,
room: str
) -> None:
await room_manager.join_room(socket, room)
await room_manager.broadcast_to_room(
room,
f"User joined {room}"
)
try:
while True:
data = await socket.receive_text()
await room_manager.broadcast_to_room(
room,
f"User in {room}: {data}"
)
except Exception:
await room_manager.leave_room(socket, room)
await socket.close()SSE — технология односторонней рассылки событий от сервера клиенту поверх HTTP.
from starlite import ServerSentEvent, get
from typing import AsyncGenerator
@get("/events")
async def sse_handler() -> AsyncGenerator[ServerSentEvent, None]:
for i in range(5):
yield ServerSentEvent(data=f"Message {i}")from starlite import ServerSentEvent, get
from typing import AsyncGenerator
import asyncio
@get("/events")
async def sse_handler() -> AsyncGenerator[ServerSentEvent, None]:
# Событие с типом
yield ServerSentEvent(
data={"status": "started"},
event="status",
)
# Событие с ID
yield ServerSentEvent(
data={"message": "Processing"},
event="progress",
id="1",
)
# Событие с retry
yield ServerSentEvent(
data={"error": "Retry in 5s"},
event="error",
retry=5000, # миллисекунды
)
await asyncio.sleep(1)from starlite import ServerSentEvent, get
from typing import AsyncGenerator
import asyncio
async def data_stream() -> AsyncGenerator[dict, None]:
"""Генератор данных"""
for i in range(100):
yield {"count": i, "timestamp": asyncio.get_event_loop().time()}
await asyncio.sleep(0.1)
@get("/stream")
async def stream_handler() -> AsyncGenerator[ServerSentEvent, None]:
async for data in data_stream():
yield ServerSentEvent(
data=data,
event="data",
)from starlite import ServerSentEvent, get
from typing import AsyncGenerator
@get("/events")
async def sse_handler() -> AsyncGenerator[ServerSentEvent, None]:
# Комментарий (keep-alive)
yield ServerSentEvent(comment="keep-alive")
yield ServerSentEvent(
data={"message": "Hello"},
event="message",
)| Характеристика | WebSocket | SSE |
|---|---|---|
| Направление | Двустороннее | Одностороннее (сервер → клиент) |
| Протокол | ws://, wss:// | http://, https:// |
| Формат данных | Текст, бинарные, JSON | Текст (обычно JSON) |
| Переподключение | Вручную | Автоматически браузером |
| Поддержка | Все браузеры | Все современные браузеры |
| Use case | Чат, игры, collaboration | Уведомления, ленты, мониторинг |
# ✅ Хорошо
@websocket("/ws")
async def websocket_handler(socket: WebSocket) -> None:
await socket.accept()
try:
while True:
data = await socket.receive_text()
...
except Exception:
# Клиент отключился
await socket.close()
# ❌ Плохо — бесконечный цикл без обработки ошибок
@websocket("/ws")
async def websocket_handler(socket: WebSocket) -> None:
await socket.accept()
while True:
data = await socket.receive_text()
...import asyncio
from starlite import WebSocket, websocket
@websocket("/ws")
async def websocket_handler(socket: WebSocket) -> None:
await socket.accept()
async def heartbeat():
while True:
await asyncio.sleep(30)
try:
await socket.send_text("ping")
except Exception:
break
# Запуск heartbeat в фоне
asyncio.create_task(heartbeat())
try:
while True:
data = await socket.receive_text()
...
except Exception:
await socket.close()MAX_MESSAGE_SIZE = 1024 * 1024 # 1MB
@websocket("/ws")
async def websocket_handler(socket: WebSocket) -> None:
await socket.accept()
while True:
data = await socket.receive()
if len(data) > MAX_MESSAGE_SIZE:
await socket.close(code=1009, reason="Message too large")
break
...from pydantic import BaseModel, ValidationError
class ChatMessage(BaseModel):
text: str
room_id: str
@websocket("/ws")
async def websocket_handler(socket: WebSocket) -> None:
await socket.accept()
try:
while True:
data = await socket.receive_json()
try:
message = ChatMessage(**data)
# Обработка валидного сообщения
...
except ValidationError as e:
await socket.send_json({
"error": "Invalid message",
"details": e.errors()
})
except Exception:
await socket.close()# ✅ Хорошо — SSE для уведомлений
@get("/notifications")
async def notifications_handler(
user: User
) -> AsyncGenerator[ServerSentEvent, None]:
async for event in user_notification_stream(user):
yield ServerSentEvent(
data=event,
event="notification",
)
# ❌ Избыточно — WebSocket для простых уведомлений
@websocket("/notifications")
async def notifications_handler(socket: WebSocket) -> None:
...Realtime-возможности Starlite включают:
В следующей теме мы изучим OpenAPI и документацию.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.