Real-time коммуникация, чаты, уведомления, broadcast
WebSocket обеспечивает двустороннюю связь в реальном времени. В этой теме вы научитесь создавать чаты, уведомления, live-обновления и broadcast рассылки.
WebSocket — протокол двусторонней связи поверх TCP, позволяющий серверу и клиенту обмениваться данными в реальном времени.
| Характеристика | HTTP | WebSocket |
|---|---|---|
| Связь | Односторонняя (запрос-ответ) | Двусторонняя |
| Соединение | Закрывается после ответа | Постоянное |
| Инициатива | Только клиент | Клиент и сервер |
| Накладные расходы | Заголовки каждый запрос | Минимальные после handshake |
| Use case | REST API, CRUD | Чаты, уведомления, игры, трейдинг |
1. Клиент → HTTP Upgrade запрос
GET /ws HTTP/1.1
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
2. Сервер → HTTP 101 Switching Protocols
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
3. Двусторонняя связь установлена
Клиент ↔ Сервер (данные в обоих направлениях)
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket('/ws')
async def websocket_endpoint(websocket: WebSocket):
"""
Простой WebSocket endpoint.
"""
# Принятие соединения
await websocket.accept()
while True:
# Получение данных от клиента
data = await websocket.receive_text()
# Отправка ответа
await websocket.send_text(f"Message received: {data}")const ws = new WebSocket('ws://localhost:8000/ws');
ws.onopen = () => {
console.log('Connected');
ws.send('Hello Server!');
};
ws.onmessage = (event) => {
console.log('Received:', event.data);
};
ws.onclose = () => {
console.log('Disconnected');
};
ws.onerror = (error) => {
console.error('Error:', error);
};@app.websocket('/ws')
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
# Текст
text_data = await websocket.receive_text()
await websocket.send_text(f"Text: {text_data}")
# Байты
bytes_data = await websocket.receive_bytes()
await websocket.send_bytes(b"Received bytes")
# JSON
json_data = await websocket.receive_json()
await websocket.send_json({"echo": json_data})import asyncio
@app.websocket('/ws/stream')
async def websocket_stream(websocket: WebSocket):
await websocket.accept()
for i in range(10):
await websocket.send_json({"count": i})
await asyncio.sleep(1)
await websocket.close()@app.websocket('/ws/{client_id}')
async def websocket_with_params(websocket: WebSocket, client_id: int):
await websocket.accept()
await websocket.send_text(f"Hello client {client_id}")
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Client {client_id}: {data}")@app.websocket('/ws')
async def websocket_with_query(
websocket: WebSocket,
token: str | None = None
):
await websocket.accept()
if token:
await websocket.send_text(f"Authenticated as {token}")
else:
await websocket.send_text("No token provided")from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List, Dict
import asyncio
class ConnectionManager:
"""
Управляет WebSocket подключениями.
"""
def __init__(self):
# Активные подключения: {client_id: websocket}
self.active_connections: Dict[int, WebSocket] = {}
async def connect(self, websocket: WebSocket, client_id: int):
"""Принять соединение и сохранить"""
await websocket.accept()
self.active_connections[client_id] = websocket
def disconnect(self, client_id: int):
"""Удалить подключение"""
if client_id in self.active_connections:
del self.active_connections[client_id]
async def send_personal_message(self, message: str, client_id: int):
"""Отправить сообщение одному клиенту"""
if client_id in self.active_connections:
await self.active_connections[client_id].send_text(message)
async def broadcast(self, message: str):
"""Отправить сообщение всем клиентам"""
for websocket in self.active_connections.values():
await websocket.send_text(message)
async def broadcast_json(self, data: dict):
"""Отправить JSON всем клиентам"""
for websocket in self.active_connections.values():
await websocket.send_json(data)
manager = ConnectionManager()
app = FastAPI()
@app.websocket('/ws/{client_id}')
async def websocket_chat(websocket: WebSocket, client_id: int):
await manager.connect(websocket, client_id)
try:
await manager.broadcast(f"Client {client_id} joined")
while True:
data = await websocket.receive_text()
# Отправить всем, включая отправителя
await manager.broadcast(f"Client {client_id}: {data}")
except WebSocketDisconnect:
manager.disconnect(client_id)
await manager.broadcast(f"Client {client_id} left")from typing import Dict, Set, List
class RoomManager:
"""
Управляет комнатами для чата.
"""
def __init__(self):
# Комнаты: {room_id: {client_id: websocket}}
self.rooms: Dict[str, Dict[int, WebSocket]] = {}
# Клиенты: {client_id: room_id}
self.client_rooms: Dict[int, str] = {}
def create_room(self, room_id: str):
if room_id not in self.rooms:
self.rooms[room_id] = {}
async def join_room(self, websocket: WebSocket, client_id: int, room_id: str):
await websocket.accept()
if room_id not in self.rooms:
self.create_room(room_id)
self.rooms[room_id][client_id] = websocket
self.client_rooms[client_id] = room_id
# Уведомить участников комнаты
await self.broadcast_to_room(
room_id,
f"Client {client_id} joined the room"
)
def leave_room(self, client_id: int):
if client_id in self.client_rooms:
room_id = self.client_rooms[client_id]
del self.rooms[room_id][client_id]
del self.client_rooms[client_id]
return room_id
return None
async def send_to_room(self, room_id: str, message: str, sender_id: int):
"""Отправить сообщение в комнату"""
if room_id in self.rooms:
for client_id, websocket in self.rooms[room_id].items():
if client_id != sender_id: # Не отправлять отправителю
await websocket.send_text(message)
async def broadcast_to_room(self, room_id: str, message: str):
"""Отправить сообщение всем в комнате (включая отправителя)"""
if room_id in self.rooms:
for websocket in self.rooms[room_id].values():
await websocket.send_text(message)
room_manager = RoomManager()
@app.websocket('/ws/room/{room_id}/{client_id}')
async def websocket_room(
websocket: WebSocket,
room_id: str,
client_id: int
):
await room_manager.join_room(websocket, client_id, room_id)
try:
while True:
data = await websocket.receive_text()
# Отправить в комнату
await room_manager.send_to_room(
room_id,
f"Client {client_id}: {data}",
client_id
)
except WebSocketDisconnect:
room_id = room_manager.leave_room(client_id)
if room_id:
await room_manager.broadcast_to_room(
room_id,
f"Client {client_id} left the room"
)from jose import JWTError, jwt
SECRET_KEY = "secret"
async def get_user_from_token(token: str):
"""Проверка токена и получение пользователя"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
username: str = payload.get("sub")
if username is None:
return None
return username
except JWTError:
return None
@app.websocket('/ws/auth')
async def websocket_auth(
websocket: WebSocket,
token: str | None = None
):
# Проверка токена
if not token:
await websocket.close(code=4001, reason="No token")
return
username = await get_user_from_token(token)
if not username:
await websocket.close(code=4002, reason="Invalid token")
return
# Принять соединение
await websocket.accept()
await websocket.send_text(f"Welcome, {username}!")
while True:
data = await websocket.receive_text()
await websocket.send_text(f"{username}: {data}")const token = localStorage.getItem('token');
const ws = new WebSocket(`ws://localhost:8000/ws/auth?token=${token}`);import asyncio
from contextlib import asynccontextmanager
class BackgroundBroadcaster:
"""
Фоновая рассылка данных всем подключенным клиентам.
"""
def __init__(self):
self.connections: List[WebSocket] = []
self.task = None
self.running = False
async def add_connection(self, websocket: WebSocket):
self.connections.append(websocket)
def remove_connection(self, websocket: WebSocket):
if websocket in self.connections:
self.connections.remove(websocket)
async def broadcast(self, data: dict):
"""Отправить данные всем"""
if self.connections:
await asyncio.gather(
*[ws.send_json(data) for ws in self.connections],
return_exceptions=True
)
async def background_task(self):
"""Фоновая задача для периодической рассылки"""
counter = 0
while self.running:
await asyncio.sleep(5)
counter += 1
message = {
"type": "heartbeat",
"count": counter,
"connections": len(self.connections)
}
# Отправить всем, обработать ошибки
for ws in self.connections[:]:
try:
await ws.send_json(message)
except:
self.connections.remove(ws)
broadcaster = BackgroundBroadcaster()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Запуск фоновой задачи при старте"""
broadcaster.running = True
broadcaster.task = asyncio.create_task(broadcaster.background_task())
yield
broadcaster.running = False
if broadcaster.task:
broadcaster.task.cancel()
app = FastAPI(lifespan=lifespan)
@app.websocket('/ws/live')
async def websocket_live(websocket: WebSocket):
await websocket.accept()
await broadcaster.add_connection(websocket)
try:
while True:
data = await websocket.receive_text()
await broadcaster.broadcast({
"type": "message",
"data": data
})
except WebSocketDisconnect:
broadcaster.remove_connection(websocket)from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from typing import List, Dict
from datetime import datetime
import asyncio
app = FastAPI()
# === Менеджер подключений ===
class ChatManager:
def __init__(self):
self.connections: Dict[str, WebSocket] = {}
self.message_history: List[dict] = []
async def connect(self, websocket: WebSocket, username: str):
await websocket.accept()
self.connections[username] = websocket
# Отправить историю новому пользователю
for msg in self.message_history[-50:]:
await websocket.send_json(msg)
# Уведомить остальных
await self.broadcast_system(f"{username} joined the chat")
def disconnect(self, username: str):
if username in self.connections:
del self.connections[username]
async def broadcast_system(self, message: str):
"""Системное сообщение всем"""
msg = {
"type": "system",
"message": message,
"timestamp": datetime.utcnow().isoformat()
}
self.message_history.append(msg)
for ws in self.connections.values():
try:
await ws.send_json(msg)
except:
pass
async def send_message(self, username: str, text: str):
"""Сообщение от пользователя"""
msg = {
"type": "message",
"username": username,
"text": text,
"timestamp": datetime.utcnow().isoformat()
}
self.message_history.append(msg)
for ws in self.connections.values():
try:
await ws.send_json(msg)
except:
pass
chat_manager = ChatManager()
# === HTML интерфейс ===
@app.get('/')
async def get():
return HTMLResponse('''
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
<style>
body { font-family: Arial; max-width: 800px; margin: 0 auto; }
#messages { height: 400px; overflow-y: auto; border: 1px solid #ccc; padding: 10px; }
.message { margin: 5px 0; }
.system { color: #888; font-style: italic; }
.username { font-weight: bold; color: #0066cc; }
#input { width: 70%; padding: 10px; }
#send { width: 25%; padding: 10px; }
</style>
</head>
<body>
<h1>WebSocket Chat</h1>
<div id="messages"></div>
<input id="username" placeholder="Your name" style="width: 25%; padding: 10px;">
<input id="input" placeholder="Message...">
<button id="send">Send</button>
<script>
const usernameInput = document.getElementById('username');
const messageInput = document.getElementById('input');
const sendButton = document.getElementById('send');
const messagesDiv = document.getElementById('messages');
let ws;
function connect() {
const username = usernameInput.value.trim();
if (!username) {
alert('Please enter your name');
return;
}
ws = new WebSocket(`ws://localhost:8000/ws/chat/${encodeURIComponent(username)}`);
ws.onopen = () => {
console.log('Connected');
};
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
const div = document.createElement('div');
div.className = 'message ' + data.type;
if (data.type === 'system') {
div.textContent = data.message;
} else {
div.innerHTML = `<span class="username">${data.username}:</span> ${data.text}`;
}
messagesDiv.appendChild(div);
messagesDiv.scrollTop = messagesDiv.scrollHeight;
};
ws.onclose = () => {
console.log('Disconnected');
setTimeout(connect, 3000);
};
}
sendButton.onclick = () => {
const text = messageInput.value.trim();
if (text && ws && ws.readyState === WebSocket.OPEN) {
ws.send(text);
messageInput.value = '';
}
};
messageInput.onkeypress = (e) => {
if (e.key === 'Enter') sendButton.onclick();
};
connect();
</script>
</body>
</html>
''')
# === WebSocket ===
@app.websocket('/ws/chat/{username}')
async def websocket_chat(websocket: WebSocket, username: str):
await chat_manager.connect(websocket, username)
try:
while True:
text = await websocket.receive_text()
await chat_manager.send_message(username, text)
except WebSocketDisconnect:
chat_manager.disconnect(username)
await chat_manager.broadcast_system(f"{username} left the chat")from fastapi.testclient import TestClient
client = TestClient(app)
def test_websocket():
with client.websocket_connect('/ws') as websocket:
websocket.send_text('Hello')
data = websocket.receive_text()
assert data == 'Message received: Hello'
def test_websocket_with_params():
with client.websocket_connect('/ws/123') as websocket:
data = websocket.receive_text()
assert 'Hello client 123' in data@app.websocket('/ws')
async def ws(websocket: WebSocket):
# Забыли accept()!
data = await websocket.receive_text()Проблема: Соединение не принято, клиент не сможет отправить данные.
Решение: Всегда вызывайте await websocket.accept() перед receive().
while True:
data = await websocket.receive_text()
# При отключении — ошибка!Решение:
try:
while True:
data = await websocket.receive_text()
except WebSocketDisconnect:
# Корректная обработка
passwhile True:
data = await websocket.receive_text()
time.sleep(10) # Блокирует все подключения!Решение: Используйте await asyncio.sleep(10).
websocket_connect()В следующей теме вы изучите фоновые задачи (BackgroundTasks) — отложенное выполнение после отправки ответа.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.