OAuth2, JWT, сессии, rate limiting, защита от OWASP Top 10, безопасное хранение секретов.
Безопасность — это не функция, а фундамент. Правильная архитектура аутентификации и защиты от атак определяет надёжность всей системы.
Аутентификация (Authentication) — процесс проверки личности пользователя. Ответ на вопрос: «Кто ты?»
Авторизация (Authorization) — процесс проверки прав доступа. Ответ на вопрос: «Что тебе разрешено делать?»
# FastAPI пример разделения
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
"""Аутентификация: проверяем токен и возвращаем пользователя"""
user = validate_jwt_token(token) # Кто ты?
if not user:
raise HTTPException(status_code=401, detail="Invalid token")
return user
async def require_admin(user: dict = Depends(get_current_user)) -> dict:
"""Авторизация: проверяем роль пользователя"""
if user.get("role") != "admin": # Что разрешено?
raise HTTPException(status_code=403, detail="Not enough permissions")
return user
@app.get("/admin/users")
async def list_users(admin: dict = Depends(require_admin)):
# Только админы могут получить этот список
return {"users": [...]}Ключевое различие:
OAuth 2.0 — стандарт делегированного доступа. Позволяет приложению получить ограниченный доступ к ресурсам пользователя без передачи пароля.
Самый безопасный flow для серверных приложений.
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ Client │ │ User │ │ Auth │ │ Resource│
│ (App) │ │(Browser)│ │ Server │ │ Server │
└────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘
│ │ │ │
│ 1. Redirect │ │ │
│──────────────>│ │ │
│ │ 2. Login │ │
│ │──────────────>│ │
│ │ 3. Auth Code │ │
│ │<──────────────│ │
│ 4. Code │ │ │
│<──────────────│ │ │
│ │ │ │
│ 5. Code + Secret │ │
│──────────────────────────────>│ │
│ 6. Access Token │ │
│<──────────────────────────────│ │
│ │ │ │
│ 7. Request + Access Token │ │
│──────────────────────────────────────────────>│
│ 8. Protected Resource │ │
│<──────────────────────────────────────────────│
# FastAPI OAuth2 Authorization Code
from fastapi import FastAPI, Request, HTTPException
from fastapi.security import OAuth2AuthorizationCodeBearer
import httpx
app = FastAPI()
oauth2_scheme = OAuth2AuthorizationCodeBearer(
authorizationUrl="https://auth.example.com/authorize",
tokenUrl="https://auth.example.com/token"
)
@app.get("/callback")
async def auth_callback(request: Request, code: str):
"""Получение токена после авторизации"""
async with httpx.AsyncClient() as client:
response = await client.post(
"https://auth.example.com/token",
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": "https://myapp.com/callback",
"client_id": "your_client_id",
"client_secret": "your_client_secret" # Только на сервере!
}
)
tokens = response.json()
return {"access_token": tokens["access_token"]}Используется для взаимодействия между сервисами (machine-to-machine).
# Получение токена для сервис-сервис взаимодействия
import httpx
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
async def get_service_token() -> str:
"""Получение токена для доступа к другому сервису"""
async with httpx.AsyncClient() as client:
response = await client.post(
"https://auth.example.com/token",
data={
"grant_type": "client_credentials",
"client_id": "service_a",
"client_secret": "service_a_secret",
"scope": "read:users write:logs"
}
)
return response.json()["access_token"]
# Использование токена
async def call_service_b():
token = await get_service_token()
async with httpx.AsyncClient() as client:
response = await client.get(
"https://service-b.example.com/api/data",
headers={"Authorization": f"Bearer {token}"}
)
return response.json()Расширение Authorization Code Flow для мобильных и SPA приложений, где нельзя безопасно хранить client_secret.
# Генерация PKCE параметров
import hashlib
import base64
import secrets
def generate_pkce_pair() -> tuple[str, str]:
"""Генерация code_verifier и code_challenge"""
# code_verifier — случайная строка 43-128 символов
code_verifier = secrets.token_urlsafe(32)
# code_challenge = BASE64URL(SHA256(code_verifier))
code_challenge = base64.urlsafe_b64encode(
hashlib.sha256(code_verifier.encode()).digest()
).rstrip(b'=').decode()
return code_verifier, code_challenge
# Шаг 1: Авторизация с code_challenge
@app.get("/login")
async def login():
code_verifier, code_challenge = generate_pkce_pair()
# Сохраняем code_verifier в сессии
auth_url = (
"https://auth.example.com/authorize?"
f"response_type=code&client_id=myapp&"
f"redirect_uri=https://myapp.com/callback&"
f"code_challenge={code_challenge}&"
f"code_challenge_method=S256"
)
return {"auth_url": auth_url, "code_verifier": code_verifier}
# Шаг 2: Обмен кода на токен с code_verifier
@app.get("/callback")
async def callback(code: str, code_verifier: str):
async with httpx.AsyncClient() as client:
response = await client.post(
"https://auth.example.com/token",
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": "https://myapp.com/callback",
"client_id": "myapp",
"code_verifier": code_verifier # Доказываем, что мы тот же клиент
}
)
return response.json()Почему PKCE важен: Без PKCE злоумышленник может перехватить authorization code и обменять его на токен. С PKCE нужен code_verifier, который хранится только у легитимного клиента.
JSON Web Token — стандарт RFC 7519 для безопасной передачи информации между сторонами.
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.
eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.
SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
Три части, разделённые точкой:
# Декодирование JWT (без валидации)
import base64
import json
def decode_jwt_unsafe(token: str) -> dict:
"""Только для демонстрации структуры!"""
parts = token.split('.')
header = json.loads(base64.urlsafe_b64decode(parts[0] + '=='))
payload = json.loads(base64.urlsafe_b64decode(parts[1] + '=='))
return {"header": header, "payload": payload}
# Пример структуры
{
"header": {"alg": "HS256", "typ": "JWT"},
"payload": {
"sub": "user_123",
"name": "John Doe",
"iat": 1516239022, # Issued At
"exp": 1516242622, # Expiration Time
"role": "admin"
}
}from datetime import datetime, timedelta
from typing import Optional
from jose import jwt, JWTError
SECRET_KEY = "your-secret-key" # Хранить в env!
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Создание подписанного JWT токена"""
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire, "iat": datetime.utcnow()})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def validate_jwt_token(token: str) -> Optional[dict]:
"""Валидация и декодирование JWT токена"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# Проверяем обязательные claims
if "sub" not in payload:
return None
return payload
except JWTError:
return None
# FastAPI интеграция
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
credentials_exception = HTTPException(
status_code=401,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
payload = validate_jwt_token(token)
if payload is None:
raise credentials_exception
return payloadAccess tokens живут недолго (15-30 минут). Refresh tokens живут дольше и используются для получения новых access tokens.
REFRESH_TOKEN_EXPIRE_DAYS = 7
def create_refresh_token(data: dict) -> str:
"""Создание refresh токена с долгим сроком жизни"""
return create_access_token(data, expires_delta=timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS))
@app.post("/refresh")
async def refresh_access_token(refresh_token: str):
"""Получение нового access token по refresh tokenу"""
payload = validate_jwt_token(refresh_token)
if payload is None or payload.get("type") != "refresh":
raise HTTPException(status_code=401, detail="Invalid refresh token")
# Создаём новый access token
new_access = create_access_token({"sub": payload["sub"], "type": "access"})
return {"access_token": new_access}
# Хранение refresh токенов
# Вариант 1: В БД с возможностью отзыва
# Вариант 2: В Redis с TTL
# Вариант 3: В http-only cookieBest Practices для JWT:
exp claimКлассический подход с хранением сессии на сервере.
from fastapi import Request, Response
from starlette.middleware.sessions import SessionMiddleware
import secrets
app.add_middleware(SessionMiddleware, secret_key=SECRET_KEY)
@app.post("/login")
async def login(request: Request, username: str, password: str):
"""Логин с созданием сессии"""
user = authenticate_user(username, password)
if not user:
raise HTTPException(status_code=401, detail="Invalid credentials")
# Создаём сессию на сервере
session_id = secrets.token_urlsafe(32)
request.session["user_id"] = user.id
request.session["session_id"] = session_id
# Опционально: сохраняем сессию в БД/Redis для отзыва
await store_session(session_id, user.id, expires_in=3600)
return {"message": "Logged in"}
@app.get("/protected")
async def protected_route(request: Request):
"""Маршрут, требующий аутентификации"""
user_id = request.session.get("user_id")
if not user_id:
raise HTTPException(status_code=401, detail="Not authenticated")
return {"user_id": user_id}
@app.post("/logout")
async def logout(request: Request):
"""Логаут с очисткой сессии"""
session_id = request.session.get("session_id")
if session_id:
await delete_session(session_id) # Отзываем сессию на сервере
request.session.clear()
return {"message": "Logged out"}JWT vs Sessions:
| Критерий | JWT | Sessions |
|---|---|---|
| Хранение состояния | Stateless (токен содержит данные) | Stateful (сессия на сервере) |
| Масштабирование | Легко (нет состояния) | Требует shared storage (Redis) |
| Отзыв токена | Сложно (нужен blacklist) | Легко (удалить сессию) |
| Размер | Может быть большим | Маленький session ID |
| Использование | API, микросервисы | Веб-приложения |
Атака: Внедрение malicious SQL через пользовательский ввод.
# ❌ УЯЗВИМОСТЬ: Конкатенация строк
async def get_user(username: str):
query = f"SELECT * FROM users WHERE username = '{username}'"
# Атака: username = "' OR '1'='1" → вернёт всех пользователей
# ✅ ЗАЩИТА: Параметризированные запросы
async def get_user_safe(username: str):
query = "SELECT * FROM users WHERE username = :username"
result = await db.execute(query, {"username": username})
# Даже при вводе "' OR '1'='1" будет искаться literal строка# SQLAlchemy ORM защита (автоматическая параметризация)
from sqlalchemy import select
async def get_user_orm(username: str):
stmt = select(User).where(User.username == username)
result = await db.execute(stmt)
return result.scalar()Атака: Внедрение JavaScript в страницы, которые видят другие пользователи.
# ❌ УЯЗВИМОСТЬ: Рендеринг пользовательского ввода без экранирования
@app.get("/comment")
async def show_comment(text: str):
return f"<div>{text}</div>" # Атака: text = "<script>steal()</script>"
# ✅ ЗАЩИТА: Экранирование HTML
from fastapi.responses import HTMLResponse
import html
@app.get("/comment")
async def show_comment_safe(text: str):
safe_text = html.escape(text) # < → <, > → >
return HTMLResponse(f"<div>{safe_text}</div>")
# FastAPI по умолчанию экранирует JSON ответы
# Для React/Vue фронтендов XSS менее критичен (DOM экранирование)Атака: Злоумышленник заставляет пользователя выполнить действие без его ведома.
# ✅ ЗАЩИТА: CSRF токены
from fastapi_csrf_protect import CsrfProtect
@app.post("/transfer")
@CsrfProtect.validate_csrf
async def transfer_money(request: Request, amount: float):
# Требует CSRF токен в заголовке или форме
# Злоумышленник не может подделать запрос с другого домена
...
# Для API с JWT CSRF менее критичен:
# - Токены в Authorization header (не отправляются автоматически)
# - SameSite cookie атрибут
# - CORS политикаЗащита:
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)Rate limiting защищает от brute force, DDoS и abuse.
from slowapi import SlowAPIClient, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
app.state.limiter = SlowAPIClient()
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# Лимит на логин (защита от brute force)
@app.post("/login")
@rate_limit("5/minute") # 5 попыток в минуту
async def login(credentials: Credentials):
...
# Лимит на API запросы
@app.get("/api/data")
@rate_limit("100/hour") # 100 запросов в час
async def get_data():
...
# Кастомный лимит по user_id
async def get_user_id(request: Request) -> str:
user = await get_current_user(request)
return user.get("sub", get_remote_address(request))
@rate_limit("1000/hour", key_func=get_user_id)
@app.get("/api/premium")
async def premium_endpoint():
...Алгоритмы Rate Limiting:
# Token Bucket реализация
import time
from collections import defaultdict
class TokenBucket:
def __init__(self, capacity: int, refill_rate: float):
self.capacity = capacity # Максимум токенов
self.refill_rate = refill_rate # Токенов в секунду
self.buckets = defaultdict(lambda: {"tokens": capacity, "last_update": time.time()})
def consume(self, key: str, tokens: int = 1) -> bool:
bucket = self.buckets[key]
now = time.time()
# Пополняем бакет
elapsed = now - bucket["last_update"]
bucket["tokens"] = min(self.capacity, bucket["tokens"] + elapsed * self.refill_rate)
bucket["last_update"] = now
# Проверяем наличие токенов
if bucket["tokens"] >= tokens:
bucket["tokens"] -= tokens
return True
return False
limiter = TokenBucket(capacity=10, refill_rate=1) # 10 токенов, 1/сек пополнение# .env файл (не коммитить в git!)
DATABASE_URL=postgresql://user:pass@localhost/db
SECRET_KEY=super-secret-key-change-in-prod
JWT_ALGORITHM=HS256
OAUTH_CLIENT_SECRET=oauth-secret
# Загрузка в приложении
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
database_url: str
secret_key: str
jwt_algorithm: str = "HS256"
oauth_client_secret: str
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
settings = Settings()Для production используйте специализированные хранилища:
# HashiCorp Vault пример
import hvac
def get_secret_from_vault(path: str) -> str:
client = hvac.Client(
url="https://vault.example.com",
token=os.getenv("VAULT_TOKEN")
)
secret = client.secrets.kv.v2.read_secret_version(path=path)
return secret["data"]["data"]["value"]
# AWS Secrets Manager
import boto3
def get_secret_from_aws(secret_name: str) -> str:
client = boto3.client("secretsmanager")
response = client.get_secret_value(SecretId=secret_name)
return response["SecretString"]Best Practices:
TLS (Transport Layer Security) шифрует трафик между клиентом и сервером.
┌─────────┐ ┌─────────┐
│ Client │ │ Server │
└────┬────┘ └────┬────┘
│ │
│ 1. ClientHello │
│ (supported ciphers) │
│─────────────────────────────>│
│ │
│ 2. ServerHello + Certificate │
│<─────────────────────────────│
│ │
│ 3. Verify Certificate │
│ (CA chain, domain, expiry) │
│ │
│ 4. Key Exchange │
│<────────────────────────────>│
│ │
│ 5. Encrypted Communication │
│<────────────────────────────>│
# FastAPI с HTTPS (через uvicorn)
# uvicorn main:app --host 0.0.0.0 --port 443 \
# --ssl-keyfile=./key.pem --ssl-certfile=./cert.pem
# HSTS заголовок (принудительный HTTPS)
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
app.add_middleware(HTTPSRedirectMiddleware)
# Security headers
@app.middleware("http")
async def add_security_headers(request: Request, call_next):
response = await call_next(request)
response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Content-Security-Policy"] = "default-src 'self'"
return responseTLS Best Practices:
from datetime import datetime, timedelta
from typing import Optional
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import jwt, JWTError
from passlib.context import CryptContext
from pydantic import BaseModel
# Конфигурация
SECRET_KEY = "change-this-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
app = FastAPI()
# Модели
class Token(BaseModel):
access_token: str
refresh_token: str
token_type: str = "bearer"
class UserCreate(BaseModel):
username: str
password: str
email: str
class User(BaseModel):
id: int
username: str
email: str
role: str = "user"
# Хранилище (в production — БД)
fake_users_db = {}
refresh_tokens_db = set()
# Утилиты
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
def create_token(data: dict, expires_delta: timedelta) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + expires_delta
to_encode.update({"exp": expire, "iat": datetime.utcnow()})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def decode_token(token: str) -> Optional[dict]:
try:
return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
except JWTError:
return None
# Endpoints
@app.post("/register")
async def register(user: UserCreate):
if user.username in fake_users_db:
raise HTTPException(status_code=400, detail="Username already registered")
fake_users_db[user.username] = {
"id": len(fake_users_db) + 1,
"username": user.username,
"email": user.email,
"hashed_password": hash_password(user.password),
"role": "user"
}
return {"message": "User created"}
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = fake_users_db.get(form_data.username)
if not user or not verify_password(form_data.password, user["hashed_password"]):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# Access token
access_token = create_token(
data={"sub": user["username"], "type": "access", "role": user["role"]},
expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
# Refresh token
refresh_token = create_token(
data={"sub": user["username"], "type": "refresh"},
expires_delta=timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
)
refresh_tokens_db.add(refresh_token)
return Token(access_token=access_token, refresh_token=refresh_token)
@app.post("/refresh", response_model=Token)
async def refresh_token(refresh_token: str):
payload = decode_token(refresh_token)
if not payload or payload.get("type") != "refresh":
raise HTTPException(status_code=401, detail="Invalid refresh token")
if refresh_token not in refresh_tokens_db:
raise HTTPException(status_code=401, detail="Token revoked")
username = payload["sub"]
user = fake_users_db.get(username)
if not user:
raise HTTPException(status_code=401, detail="User not found")
# Новый access token
new_access = create_token(
data={"sub": username, "type": "access", "role": user["role"]},
expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
# Опционально: ротация refresh токена
refresh_tokens_db.remove(refresh_token)
new_refresh = create_token(
data={"sub": username, "type": "refresh"},
expires_delta=timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
)
refresh_tokens_db.add(new_refresh)
return Token(access_token=new_access, refresh_token=new_refresh)
@app.post("/logout")
async def logout(refresh_token: str):
"""Отзыв refresh токена"""
if refresh_token in refresh_tokens_db:
refresh_tokens_db.remove(refresh_token)
return {"message": "Logged out"}
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
payload = decode_token(token)
if payload is None or payload.get("type") != "access":
raise credentials_exception
username = payload.get("sub")
user = fake_users_db.get(username)
if user is None:
raise credentials_exception
return User(**user)
@app.get("/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
return current_user
async def require_admin(current_user: User = Depends(get_current_user)) -> User:
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="Not enough permissions")
return current_user
@app.get("/admin/users")
async def list_all_users(admin: User = Depends(require_admin)):
return {"users": list(fake_users_db.values())}Этот пример демонстрирует:
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.