WebSocket-рукопожатие, отправка/получение сообщений, менеджер подключений, broadcast, обработка отключений.
Научитесь организовывать двустороннюю связь в реальном времени между сервером и клиентами
WebSocket — протокол полнодуплексной связи поверх TCP. В отличие от HTTP:
| HTTP | WebSocket |
|---|---|
| Запрос-ответ (клиент инициирует) | Двусторонняя связь |
| Соединение закрывается после ответа | Постоянное соединение |
| Нет push-уведомлений от сервера | Сервер может отправлять данные когда угодно |
Use cases:
from aiohttp import web
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
# Отправка приветствия
await ws.send_str('Connected!')
# Чтение сообщений от клиента
async for msg in ws:
if msg.type == web.WSMsgType.TEXT:
# Эхо-ответ
await ws.send_str(f'Echo: {msg.data}')
elif msg.type == web.WSMsgType.BINARY:
await ws.send_bytes(msg.data)
elif msg.type == web.WSMsgType.ERROR:
print(f'WebSocket error: {ws.exception()}')
break
return ws
app = web.Application()
app.router.add_get('/ws', websocket_handler)| Тип | Описание |
|---|---|
WSMsgType.TEXT | Текстовое сообщение (UTF-8) |
WSMsgType.BINARY | Бинарное сообщение (байты) |
WSMsgType.CLOSE | Клиент закрыл соединение |
WSMsgType.PING | Ping для проверки соединения |
WSMsgType.PONG | Ответ на ping |
# Текст
await ws.send_str('Hello, client!')
# JSON
import json
await ws.send_str(json.dumps({'type': 'notification', 'data': 'Task updated'}))
# Бинарные данные
await ws.send_bytes(b'\x00\x01\x02\x03')
# Закрытие соединения
await ws.close(code=1000, message='Goodbye')# Цикл для чтения
async for msg in ws:
if msg.type == web.WSMsgType.TEXT:
data = msg.data # Строка
elif msg.type == web.WSMsgType.BINARY:
data = msg.data # Bytes
elif msg.type == web.WSMsgType.CLOSE:
print(f'Client closed: {msg.extra}')
break
elif msg.type == web.WSMsgType.ERROR:
print(f'Error: {ws.exception()}')
breakimport asyncio
try:
msg = await asyncio.wait_for(ws.receive(), timeout=30.0)
except asyncio.TimeoutError:
# Отправить ping для проверки соединения
await ws.ping()
msg = await ws.receive()from typing import Set, Dict
from aiohttp import web
class WebSocketManager:
def __init__(self):
self.connections: Set[web.WebSocketResponse] = set()
self.user_connections: Dict[int, web.WebSocketResponse] = {}
async def connect(self, ws: web.WebSocketResponse, user_id: int = None):
"""Регистрация подключения"""
self.connections.add(ws)
if user_id:
self.user_connections[user_id] = ws
def disconnect(self, ws: web.WebSocketResponse):
"""Удаление подключения"""
self.connections.discard(ws)
# Найти и удалить по user_id
for uid, conn in list(self.user_connections.items()):
if conn is ws:
del self.user_connections[uid]
break
async def broadcast(self, message: str):
"""Отправка сообщения всем подключенным клиентам"""
disconnected = set()
for ws in self.connections:
try:
await ws.send_str(message)
except ConnectionResetError:
disconnected.add(ws)
# Удалить отключившихся
for ws in disconnected:
self.disconnect(ws)
async def send_to_user(self, user_id: int, message: str):
"""Отправка сообщения конкретному пользователю"""
ws = self.user_connections.get(user_id)
if ws and not ws.closed:
try:
await ws.send_str(message)
except ConnectionResetError:
self.disconnect(ws)
# Глобальный менеджер
ws_manager = WebSocketManager()import json
from aiohttp import web
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
# Получить user_id из токена (если есть аутентификация)
user_id = request.get('user', {}).get('id')
# Регистрация
await ws_manager.connect(ws, user_id)
# Уведомление о подключении
await ws.send_str(json.dumps({
'type': 'connected',
'user_id': user_id
}))
try:
async for msg in ws:
if msg.type == web.WSMsgType.TEXT:
# Обработка сообщений от клиента
data = json.loads(msg.data)
if data.get('type') == 'subscribe':
# Подписка на обновления
room_id = data.get('room_id')
await ws.send_str(json.dumps({
'type': 'subscribed',
'room_id': room_id
}))
elif data.get('type') == 'message':
# Отправка сообщения в чат
await ws_manager.broadcast(json.dumps({
'type': 'chat_message',
'user_id': user_id,
'text': data.get('text')
}))
elif msg.type == web.WSMsgType.ERROR:
break
finally:
# Отключение
ws_manager.disconnect(ws)
return ws# app/services/task_notifications.py
import json
from aiohttp import web
async def notify_task_update(ws_manager, task_id: int, action: str, task_data: dict):
"""Отправить уведомление об изменении задачи"""
message = json.dumps({
'type': 'task_update',
'action': action, # 'created', 'updated', 'deleted'
'task_id': task_id,
'data': task_data
})
await ws_manager.broadcast(message)
# В обработчике CRUD
async def create_task(request):
data = await request.json()
task = await create_task_in_db(data)
# Уведомление через WebSocket
await notify_task_update(
request.app['ws_manager'],
task['id'],
'created',
task
)
return web.json_response(task)Альтернатива WebSocket для push-уведомлений:
from aiohttp import web
import asyncio
import json
async def sse_handler(request):
response = web.StreamResponse(
status=200,
headers={
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
}
)
await response.prepare(request)
while True:
try:
# Отправка события
data = json.dumps({'time': asyncio.get_event_loop().time()})
await response.write(f'data: {data}\n\n'.encode())
await asyncio.sleep(1)
except ConnectionResetError:
break
return response
app.router.add_get('/events', sse_handler)// Подключение
const ws = new WebSocket('ws://localhost:8080/ws');
// Обработка подключения
ws.onopen = () => {
console.log('Connected to WebSocket');
// Отправка сообщения
ws.send(JSON.stringify({
type: 'subscribe',
room_id: 123
}));
};
// Получение сообщений
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
switch(data.type) {
case 'connected':
console.log('Connected, user_id:', data.user_id);
break;
case 'task_update':
console.log('Task updated:', data);
updateUI(data);
break;
case 'chat_message':
addMessage(data.text);
break;
}
};
// Обработка ошибок
ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
// Обработка отключения
ws.onclose = (event) => {
console.log('Disconnected:', event.code, event.reason);
// Автоматическое переподключение
setTimeout(() => connectWebSocket(), 5000);
};from aiohttp import web
@web.middleware
async def websocket_cors_middleware(request, handler):
if request.headers.get('Upgrade') == 'websocket':
# WebSocket не требует CORS preflight
pass
response = await handler(request)
# Добавить CORS заголовки
response.headers['Access-Control-Allow-Origin'] = '*'
return responseУбедитесь, что вы понимаете:
Переходите к вопросам для закрепления.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.