JWT access + refresh токены, bcrypt, auth middleware, role-based доступ, logout и отзыв токенов.
Научитесь защищать API с помощью JWT-токенов, хешировать пароли и реализовывать систему прав доступа
Никогда не храните пароли в открытом виде! Используйте bcrypt или argon2.
pip install passlib[bcrypt]# app/utils/password.py
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=['bcrypt'], deprecated='auto')
def hash_password(password: str) -> str:
"""Хеширование пароля"""
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Проверка пароля"""
return pwd_context.verify(plain_password, hashed_password)# При регистрации
hashed = hash_password('user_password')
# Сохранить hashed в БД
# При логине
if verify_password('entered_password', stored_hash):
# Пароль верный
else:
# Неверный парольJWT (JSON Web Token) — стандарт для передачи claims (утверждений) между сторонами.
Структура: header.payload.signature
pip install PyJWT# app/utils/jwt.py
import jwt
from datetime import datetime, timedelta
from ..config import settings
def create_access_token(data: dict, expires_delta: timedelta = None) -> str:
"""Создание JWT-токена"""
to_encode = data.copy()
# Время истечения
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({'exp': expire})
# Создание токена
encoded_jwt = jwt.encode(
to_encode,
settings.jwt_secret,
algorithm=settings.jwt_algorithm
)
return encoded_jwt
def decode_access_token(token: str) -> dict | None:
"""Декодирование и валидация токена"""
try:
payload = jwt.decode(
token,
settings.jwt_secret,
algorithms=[settings.jwt_algorithm]
)
return payload
except jwt.ExpiredSignatureError:
# Токен истёк
return None
except jwt.InvalidTokenError:
# Неверный токен
return None# app/repositories/users.py
import asyncpg
from typing import Optional
class UserRepository:
def __init__(self, pool: asyncpg.Pool):
self.pool = pool
async def get_by_email(self, email: str) -> Optional[dict]:
"""Получить пользователя по email"""
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
'SELECT * FROM users WHERE email = $1',
email
)
return dict(row) if row else None
async def create(
self,
email: str,
password_hash: str,
name: str
) -> dict:
"""Создать пользователя"""
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
'''
INSERT INTO users (email, password_hash, name)
VALUES ($1, $2, $3)
RETURNING id, email, name, role, created_at
''',
email, password_hash, name
)
return dict(row)# app/handlers/auth.py
from aiohttp import web
from datetime import timedelta
from ..utils.password import hash_password, verify_password
from ..utils.jwt import create_access_token
from ..config import settings
async def register(request: web.Request) -> web.Response:
"""POST /api/v1/auth/register — регистрация"""
try:
data = await request.json()
email = data['email']
password = data['password']
name = data.get('name', '')
# Валидация
if not email or '@' not in email:
raise web.HTTPBadRequest(text='Invalid email')
if len(password) < 8:
raise web.HTTPBadRequest(text='Password too short')
except (KeyError, ValueError):
raise web.HTTPBadRequest(text='Invalid data')
repo = request.app['user_repo']
# Проверка на дубликат
existing = await repo.get_by_email(email)
if existing:
raise web.HTTPConflict(text='Email already registered')
# Хеширование и создание
password_hash = hash_password(password)
user = await repo.create(email, password_hash, name)
return web.json_response({
'id': user['id'],
'email': user['email'],
'name': user['name']
}, status=201)
async def login(request: web.Request) -> web.Response:
"""POST /api/v1/auth/login — логин"""
try:
data = await request.json()
email = data['email']
password = data['password']
except KeyError:
raise web.HTTPBadRequest(text='email and password required')
repo = request.app['user_repo']
user = await repo.get_by_email(email)
if not user:
# Не раскрываем, существует ли пользователь
raise web.HTTPUnauthorized(text='Invalid credentials')
if not verify_password(password, user['password_hash']):
raise web.HTTPUnauthorized(text='Invalid credentials')
# Создание токена
access_token = create_access_token(
data={'sub': str(user['id']), 'email': user['email']},
expires_delta=timedelta(days=7)
)
return web.json_response({
'access_token': access_token,
'token_type': 'bearer',
'user': {
'id': user['id'],
'email': user['email'],
'name': user['name']
}
})# app/middleware/auth.py
from aiohttp import web
from functools import wraps
from ..utils.jwt import decode_access_token
# Пути, не требующие аутентификации
PUBLIC_PATHS = {
'/api/v1/auth/login',
'/api/v1/auth/register',
'/health',
'/ws'
}
@web.middleware
async def auth_middleware(request: web.Request, handler):
"""Проверка JWT-токена"""
# Пропуск публичных роутов
if request.path in PUBLIC_PATHS:
return await handler(request)
# Проверка заголовка Authorization
auth_header = request.headers.get('Authorization')
if not auth_header:
raise web.HTTPUnauthorized(text='Authorization header required')
# Проверка формата Bearer
parts = auth_header.split()
if len(parts) != 2 or parts[0] != 'Bearer':
raise web.HTTPUnauthorized(text='Invalid authorization header')
token = parts[1]
# Декодирование токена
payload = decode_access_token(token)
if not payload:
raise web.HTTPUnauthorized(text='Invalid or expired token')
# Добавление пользователя в request
request['user'] = {
'id': int(payload['sub']),
'email': payload['email']
}
return await handler(request)# app/models/user.py
from enum import Enum
class UserRole(str, Enum):
USER = 'user'
ADMIN = 'admin'
MODERATOR = 'moderator'# app/middleware/authorization.py
from aiohttp import web
from functools import wraps
from enum import Enum
def require_role(*allowed_roles):
"""Декоратор для проверки роли пользователя"""
def decorator(handler):
@wraps(handler)
async def wrapper(request):
user = request.get('user')
if not user:
raise web.HTTPUnauthorized(text='Authentication required')
user_role = user.get('role', 'user')
if user_role not in allowed_roles:
raise web.HTTPForbidden(
text=f'Required role: {", ".join(allowed_roles)}'
)
return await handler(request)
return wrapper
return decorator
# Использование в handlers
@require_role('admin', 'moderator')
async def delete_any_task(request):
"""Удалить любую задачу (только админ/модератор)"""
task_id = int(request.match_info['id'])
# ...async def update_task(request):
"""Обновление задачи"""
user = request['user']
task_id = int(request.match_info['id'])
repo = request.app['task_repo']
task = await repo.get(task_id)
if not task:
raise web.HTTPNotFound(text='Task not found')
# Проверка: владелец или админ
if task['user_id'] != user['id'] and user.get('role') != 'admin':
raise web.HTTPForbidden(text='Not enough permissions')
# Обновление
updated = await repo.update(task_id, **data)
return web.json_response(updated)Для долгой сессии используйте пару access + refresh токенов:
from datetime import timedelta
async def login(request):
# ... проверка credentials ...
# Короткоживущий access token (15 минут)
access_token = create_access_token(
data={'sub': str(user['id'])},
expires_delta=timedelta(minutes=15)
)
# Долгоживущий refresh token (30 дней)
refresh_token = create_access_token(
data={'sub': str(user['id']), 'type': 'refresh'},
expires_delta=timedelta(days=30)
)
# Сохранение refresh token в БД для возможности отзыва
await save_refresh_token(user['id'], refresh_token)
return web.json_response({
'access_token': access_token,
'refresh_token': refresh_token
})
async def refresh_token(request):
"""POST /api/v1/auth/refresh — обновление access токена"""
try:
data = await request.json()
refresh_token = data['refresh_token']
except KeyError:
raise web.HTTPBadRequest(text='refresh_token required')
payload = decode_access_token(refresh_token)
if not payload or payload.get('type') != 'refresh':
raise web.HTTPUnauthorized(text='Invalid refresh token')
# Проверка в БД (не отозван ли)
if not await is_valid_refresh_token(refresh_token):
raise web.HTTPUnauthorized(text='Token revoked')
# Новый access token
new_access_token = create_access_token(
data={'sub': payload['sub']},
expires_delta=timedelta(minutes=15)
)
return web.json_response({'access_token': new_access_token})async def logout(request):
"""POST /api/v1/auth/logout — выход"""
user = request['user']
# Отзыв refresh токена
refresh_token = request.query.get('refresh_token')
if refresh_token:
await revoke_refresh_token(refresh_token)
return web.json_response({'message': 'Logged out'})Всегда используйте HTTPS! Токены передаются в заголовках и могут быть перехвачены.
# Защита от brute force
@rate_limit(requests=5, window_seconds=60)
async def login(request):
...from aiohttp import web
response = web.json_response({'token': access_token})
response.set_cookie(
'access_token',
access_token,
httponly=True, # Не доступно через JavaScript
secure=True, # Только HTTPS
samesite='Strict', # Защита от CSRF
max_age=900 # 15 минут
)
return responseУбедитесь, что вы понимаете:
Переходите к вопросам для закрепления.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.