Управление сессиями, JWT blacklist, хранение токенов в Redis
Redis идеально подходит для управления сессиями и токенами аутентификации: быстрый доступ, TTL для авто-очистки, атомарные операции для безопасного обновления.
import redis.asyncio as redis
import uuid
import json
from typing import Optional
from datetime import datetime, timedelta
class SessionStore:
def __init__(
self,
redis_client: redis.Redis,
session_ttl: int = 3600, # 1 час
extend_on_access: bool = True,
):
self.redis = redis_client
self.session_ttl = session_ttl
self.extend_on_access = extend_on_access
self.key_prefix = 'session:'
def _key(self, session_id: str) -> str:
return f'{self.key_prefix}{session_id}'
async def create(self, data: dict) -> str:
"""Создание новой сессии"""
session_id = str(uuid.uuid4())
key = self._key(session_id)
session_data = {
'id': session_id,
'created_at': datetime.utcnow().isoformat(),
'last_accessed': datetime.utcnow().isoformat(),
**data
}
await self.redis.setex(
key,
self.session_ttl,
json.dumps(session_data),
)
return session_id
async def get(self, session_id: str) -> Optional[dict]:
"""Получение сессии"""
key = self._key(session_id)
data = await self.redis.get(key)
if not data:
return None
session = json.loads(data)
# Продление TTL при доступе
if self.extend_on_access:
await self.redis.expire(key, self.session_ttl)
session['last_accessed'] = datetime.utcnow().isoformat()
return session
async def update(self, session_id: str, data: dict) -> bool:
"""Обновление сессии"""
key = self._key(session_id)
existing = await self.redis.get(key)
if not existing:
return False
session = json.loads(existing)
session.update(data)
session['updated_at'] = datetime.utcnow().isoformat()
await self.redis.setex(key, self.session_ttl, json.dumps(session))
return True
async def delete(self, session_id: str) -> bool:
"""Удаление сессии (logout)"""
key = self._key(session_id)
result = await self.redis.delete(key)
return result > 0
async def extend(self, session_id: str, ttl: int | None = None) -> bool:
"""Продление сессии"""
key = self._key(session_id)
exists = await self.redis.exists(key)
if not exists:
return False
await self.redis.expire(key, ttl or self.session_ttl)
return True
# Использование в FastAPI
from fastapi import FastAPI, Depends, HTTPException, status, Request
from fastapi.responses import JSONResponse
app = FastAPI()
session_store = SessionStore(app.state.redis, session_ttl=3600)
async def get_current_session(request: Request) -> Optional[dict]:
"""Dependency для получения текущей сессии"""
session_id = request.cookies.get('session_id')
if not session_id:
return None
session = await session_store.get(session_id)
if session:
request.state.session = session
request.state.session_id = session_id
return session
@app.post("/login")
async def login(username: str, password: str):
# Проверка credentials (в реальности — запрос к БД)
user = await authenticate_user(username, password)
if not user:
raise HTTPException(status_code=401, detail='Invalid credentials')
# Создание сессии
session_id = await session_store.create({
'user_id': user['id'],
'username': user['username'],
'role': user['role'],
})
response = JSONResponse({'status': 'logged in'})
response.set_cookie(
key='session_id',
value=session_id,
httponly=True,
secure=True, # Только HTTPS
samesite='lax',
max_age=3600,
)
return response
@app.get("/me")
async def get_me(session: dict = Depends(get_current_session)):
if not session:
raise HTTPException(status_code=401, detail='Not authenticated')
return {'user_id': session['user_id'], 'username': session['username']}
@app.post("/logout")
async def logout(session: dict = Depends(get_current_session)):
if session:
await session_store.delete(session['id'])
response = JSONResponse({'status': 'logged out'})
response.delete_cookie('session_id')
return responseclass UserSession:
def __init__(self, redis_client: redis.Redis, ttl: int = 3600):
self.redis = redis_client
self.ttl = ttl
self.key_prefix = 'user:session:'
async def create_for_user(self, user_id: int, user_data: dict) -> str:
"""Создание сессии для пользователя"""
session_id = str(uuid.uuid4())
key = self._key(session_id)
# Используем Hash для эффективного хранения
session_data = {
'user_id': str(user_id),
'created_at': datetime.utcnow().isoformat(),
'last_accessed': datetime.utcnow().isoformat(),
**{k: str(v) for k, v in user_data.items()}
}
await self.redis.hset(key, mapping=session_data)
await self.redis.expire(key, self.ttl)
# Индекс: user_id -> session_id (для поиска сессий пользователя)
await self.redis.sadd(f'user:sessions:{user_id}', session_id)
return session_id
def _key(self, session_id: str) -> str:
return f'{self.key_prefix}{session_id}'
async def get(self, session_id: str) -> Optional[dict]:
"""Получение сессии"""
key = self._key(session_id)
data = await self.redis.hgetall(key)
if not data:
return None
# Продление TTL
await self.redis.expire(key, self.ttl)
# Обновление last_accessed
await self.redis.hset(key, 'last_accessed', datetime.utcnow().isoformat())
return data
async def get_user_sessions(self, user_id: int) -> list[str]:
"""Получение всех активных сессий пользователя"""
session_ids = await self.redis.smembers(f'user:sessions:{user_id}')
return list(session_ids)
async def revoke_all_user_sessions(self, user_id: int):
"""Отзыв всех сессий пользователя (например, при смене пароля)"""
session_ids = await self.get_user_sessions(user_id)
if session_ids:
keys = [self._key(sid) for sid in session_ids]
await self.redis.delete(*keys)
await self.redis.delete(f'user:sessions:{user_id}')
async def update_field(self, session_id: str, field: str, value: str) -> bool:
"""Обновление одного поля сессии"""
key = self._key(session_id)
exists = await self.redis.exists(key)
if not exists:
return False
await self.redis.hset(key, field, value)
return TrueПри использовании JWT токенов часто требуется механизм отзыва токенов до истечения их срока действия (logout, смена пароля, блокировка).
import redis.asyncio as redis
from datetime import datetime, timedelta
from typing import Optional
class JWTBlacklist:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self.key_prefix = 'jwt:blacklist:'
async def add(self, jti: str, exp: datetime):
"""
Добавление токена в blacklist.
jti: уникальный ID токена (JWT ID claim)
exp: время истечения токена
"""
key = f'{self.key_prefix}{jti}'
ttl = int((exp - datetime.utcnow()).total_seconds())
if ttl > 0:
await self.redis.setex(key, ttl, 'blacklisted')
async def is_blacklisted(self, jti: str) -> bool:
"""Проверка, находится ли токен в blacklist"""
key = f'{self.key_prefix}{jti}'
return await self.redis.exists(key) > 0
async def remove(self, jti: str) -> bool:
"""Удаление токена из blacklist (редко используется)"""
key = f'{self.key_prefix}{jti}'
result = await self.redis.delete(key)
return result > 0
# Middleware для проверки blacklist
from fastapi import FastAPI, Request, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
app = FastAPI()
security = HTTPBearer()
blacklist = JWTBlacklist(app.state.redis)
async def verify_token_not_blacklisted(
request: Request,
credentials: HTTPAuthorizationCredentials = Depends(security),
):
"""Проверка, что токен не в blacklist"""
# Извлекаем jti из токена (требуется декодирование без проверки подписи)
import jwt
try:
# Декодируем без проверки для получения jti
payload = jwt.decode(credentials.credentials, options={'verify_signature': False})
jti = payload.get('jti')
if not jti:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='Token missing jti claim',
)
# Проверка blacklist
if await blacklist.is_blacklisted(jti):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='Token has been revoked',
headers={'WWW-Authenticate': 'Bearer'},
)
# Сохраняем payload для дальнейшего использования
request.state.token_payload = payload
except jwt.InvalidTokenError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='Invalid token',
headers={'WWW-Authenticate': 'Bearer'},
)
@app.post("/logout")
async def logout(
request: Request,
credentials: HTTPAuthorizationCredentials = Depends(security),
):
"""Logout — добавление токена в blacklist"""
import jwt
# Декодируем токен для получения exp и jti
payload = jwt.decode(
credentials.credentials,
SECRET_KEY,
algorithms=['HS256'],
)
jti = payload['jti']
exp = datetime.fromtimestamp(payload['exp'])
# Добавляем в blacklist до истечения токена
await blacklist.add(jti, exp)
return {'status': 'logged out'}class RefreshTokenStore:
def __init__(self, redis_client: redis.Redis, ttl_days: int = 30):
self.redis = redis_client
self.ttl = ttl_days * 24 * 3600 # дни в секунды
self.key_prefix = 'refresh_token:'
self.user_key_prefix = 'user:refresh_tokens:'
async def store(
self,
user_id: int,
refresh_token: str,
access_token_jti: str,
):
"""Сохранение refresh токена"""
key = f'{self.key_prefix}{refresh_token}'
# Храним данные токена
await self.redis.setex(
key,
self.ttl,
json.dumps({
'user_id': user_id,
'access_token_jti': access_token_jti,
'created_at': datetime.utcnow().isoformat(),
}),
)
# Индекс: пользователь -> refresh токены
await self.redis.sadd(f'{self.user_key_prefix}{user_id}', refresh_token)
async def get(self, refresh_token: str) -> Optional[dict]:
"""Получение данных refresh токена"""
key = f'{self.key_prefix}{refresh_token}'
data = await self.redis.get(key)
return json.loads(data) if data else None
async def revoke(self, refresh_token: str) -> bool:
"""Отзыв refresh токена"""
key = f'{self.key_prefix}{refresh_token}'
# Получаем user_id для удаления из индекса
data = await self.get(refresh_token)
if data:
user_id = data['user_id']
await self.redis.srem(f'{self.user_key_prefix}{user_id}', refresh_token)
return await self.redis.delete(key) > 0
async def revoke_all_user_tokens(self, user_id: int):
"""Отзыв всех refresh токенов пользователя"""
tokens = await self.redis.smembers(f'{self.user_key_prefix}{user_id}')
if tokens:
keys = [f'{self.key_prefix}{t}' for t in tokens]
await self.redis.delete(*keys)
await self.redis.delete(f'{self.user_key_prefix}{user_id}')
async def rotate(
self,
old_refresh_token: str,
new_refresh_token: str,
new_access_token_jti: str,
) -> Optional[dict]:
"""
Rotation refresh токена.
Старый токен отзывается, новый создаётся.
"""
# Получаем данные старого токена
old_data = await self.get(old_refresh_token)
if not old_data:
return None
user_id = old_data['user_id']
# Отзываем старый токен
await self.revoke(old_refresh_token)
# Создаём новый
await self.store(user_id, new_refresh_token, new_access_token_jti)
return {'user_id': user_id}
# Endpoint для refresh токена
@app.post("/refresh")
async def refresh_token(refresh_token: str):
token_store = RefreshTokenStore(app.state.redis)
# Rotation
result = await token_store.rotate(
old_refresh_token=refresh_token,
new_refresh_token=str(uuid.uuid4()),
new_access_token_jti=str(uuid.uuid4()),
)
if not result:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='Invalid or expired refresh token',
)
# Генерируем новые access и refresh токены
new_access_token = create_access_token(result['user_id'])
new_refresh_token_value = result['new_refresh_token']
return {
'access_token': new_access_token,
'refresh_token': new_refresh_token_value,
}class AuthRateLimiter:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
async def check_login_attempt(self, identifier: str) -> tuple[bool, int]:
"""
Проверка попытки входа.
identifier: username или email
Returns:
(allowed, retry_after_seconds)
"""
key = f'auth:login_attempts:{identifier}'
# Получаем текущее количество попыток
attempts = int(await self.redis.get(key) or 0)
# Лимит: 5 попыток в час
if attempts >= 5:
ttl = await self.redis.ttl(key)
return False, ttl if ttl > 0 else 3600
# Инкремент попыток
pipe = self.redis.pipeline()
pipe.incr(key)
pipe.expire(key, 3600) # 1 час
await pipe.execute()
return True, 0
async def record_failed_login(self, identifier: str, ip_address: str):
"""Запись неудачной попытки входа"""
# Инкремент попыток для identifier
key = f'auth:login_attempts:{identifier}'
await self.redis.incr(key)
await self.redis.expire(key, 3600)
# Логирование по IP
ip_key = f'auth:failed_logins:ip:{ip_address}'
await self.redis.zadd(ip_key, {identifier: time.time()})
await self.redis.expire(ip_key, 3600)
async def is_ip_blocked(self, ip_address: str) -> bool:
"""Проверка, заблокирован ли IP (слишком много неудачных попыток)"""
ip_key = f'auth:failed_logins:ip:{ip_address}'
count = await self.redis.zcard(ip_key)
return count >= 10 # Блокировка после 10 неудачных попыток с IP@app.post("/login")
async def login(
username: str,
password: str,
request: Request,
):
auth_limiter = AuthRateLimiter(app.state.redis)
# Проверка IP блокировки
ip_address = request.client.host
if await auth_limiter.is_ip_blocked(ip_address):
raise HTTPException(
status_code=429,
detail='Too many failed login attempts. Please try again later.',
headers={'Retry-After': '3600'},
)
# Проверка лимита попыток для пользователя
allowed, retry_after = await auth_limiter.check_login_attempt(username)
if not allowed:
raise HTTPException(
status_code=429,
detail='Too many login attempts. Please try again later.',
headers={'Retry-After': str(retry_after)},
)
# Проверка credentials
user = await authenticate_user(username, password)
if not user:
# Запись неудачной попытки
await auth_limiter.record_failed_login(username, ip_address)
raise HTTPException(status_code=401, detail='Invalid credentials')
# Успешный вход — очистка счётчика
await app.state.redis.delete(f'auth:login_attempts:{username}')
# Создание сессии
session_id = await session_store.create({
'user_id': user['id'],
'username': user['username'],
})
response = JSONResponse({'status': 'logged in'})
response.set_cookie('session_id', session_id, httponly=True, secure=True)
return responseclass VerificationCodeStore:
def __init__(self, redis_client: redis.Redis, ttl_minutes: int = 10):
self.redis = redis_client
self.ttl = ttl_minutes * 60
self.key_prefix = 'verification:'
async def create(
self,
user_id: int,
code_type: str, # 'email_verify', 'password_reset', '2fa'
code: str,
):
"""Создание verification кода"""
key = f'{self.key_prefix}{code_type}:{user_id}'
await self.redis.setex(key, self.ttl, code)
return code
async def verify(self, user_id: int, code_type: str, code: str) -> bool:
"""Проверка кода"""
key = f'{self.key_prefix}{code_type}:{user_id}'
stored_code = await self.redis.get(key)
if stored_code == code:
await self.redis.delete(key) # Одноразовый код
return True
return False
async def delete(self, user_id: int, code_type: str):
"""Удаление кода (например, после успешной верификации)"""
key = f'{self.key_prefix}{code_type}:{user_id}'
await self.redis.delete(key)
# Пример использования для password reset
@app.post("/password-reset/request")
async def request_password_reset(email: str):
user = await get_user_by_email(email)
if not user:
# Не раскрываем, существует ли пользователь
return {'status': 'If the email exists, a reset code has been sent'}
# Генерация кода
code = str(random.randint(100000, 999999))
code_store = VerificationCodeStore(app.state.redis)
await code_store.create(user['id'], 'password_reset', code)
# Отправка email (в реальности — через очередь задач)
await send_reset_email(email, code)
return {'status': 'If the email exists, a reset code has been sent'}
@app.post("/password-reset/confirm")
async def confirm_password_reset(
email: str,
code: str,
new_password: str,
):
user = await get_user_by_email(email)
if not user:
raise HTTPException(status_code=400, detail='Invalid code')
code_store = VerificationCodeStore(app.state.redis)
valid = await code_store.verify(user['id'], 'password_reset', code)
if not valid:
raise HTTPException(status_code=400, detail='Invalid or expired code')
# Сброс пароля
await update_user_password(user['id'], new_password)
# Отзыв всех сессий пользователя
await session_store.revoke_all_user_sessions(user['id'])
return {'status': 'Password has been reset'}| Аспект | Рекомендация |
|---|---|
| TTL сессий | 1-24 часа для веб, 7-30 дней для мобильных |
| Extend on access | Продлевать TTL при каждом обращении для активных пользователей |
| JWT Blacklist | Обязательно для logout при использовании JWT |
| Refresh token rotation | Отзыв старого токена при использовании нового |
| Rate limiting | 5-10 попыток входа в час на пользователя/IP |
| Secure cookies | httponly=True, secure=True, samesite='lax' или 'strict' |
| Индексация | Хранить mapping user_id → session_ids для быстрого отзыва |
Проверьте понимание → ответьте на вопросы в sessions_auth.json
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.