OWASP Top 10, валидация, injection, XSS, CSRF
Безопасность критична для защиты данных пользователей. В этой теме вы изучите OWASP Top 10, валидацию, injection, XSS, CSRF и лучшие практики.
OWASP (Open Web Application Security Project) — список наиболее критичных уязвимостей веб-приложений.
Уязвимость:
# ОПАСНО! Никогда не делайте так!
@app.get('/users')
def get_users(email: str):
query = f"SELECT * FROM users WHERE email = '{email}'"
user = db.execute(query) # SQL injection!
return userАтака:
GET /users?email=' OR '1'='1
# query = "SELECT * FROM users WHERE email = '' OR '1'='1'"
# Вернёт всех пользователей!
Защита (параметризованные запросы):
# Безопасно
@app.get('/users')
def get_users(email: str):
user = db.query(User).filter(User.email == email).first()
# SQLAlchemy использует параметризованные запросы
return user
# Или raw SQL с параметрами
@app.get('/users')
def get_users(email: str):
result = db.execute(
text("SELECT * FROM users WHERE email = :email"),
{"email": email}
)
return result.fetchall()Уязвимость:
# ОПАСНО!
@app.get('/users')
def get_users(username: str):
user = db.users.find_one({"username": username})
return userАтака:
GET /users?username[$ne]=
# {"username": {"$ne": null}}
# Вернёт первого пользователя с непустым usernameЗащита:
from bson.regex import Regex
@app.get('/users')
def get_users(username: str):
# Явное приведение к строке
user = db.users.find_one({"username": {"$eq": str(username)}})
return userУязвимость:
# ОПАСНО!
import subprocess
@app.post('/process')
def process_file(filename: str):
result = subprocess.run(f"cat {filename}", shell=True) # Command injection!
return result.stdoutАтака:
filename = "test.txt; rm -rf /"
# Выполнит: cat test.txt; rm -rf /
Защита:
# Безопасно
import subprocess
@app.post('/process')
def process_file(filename: str):
# Без shell=True, список аргументов
result = subprocess.run(
["cat", filename],
capture_output=True,
check=False
)
return result.stdout.decode()Уязвимость:
# ОПАСНО! Возврат HTML с пользовательскими данными
@app.get('/search')
def search(q: str):
return HTMLResponse(f"<h1>Results for: {q}</h1>")Атака:
GET /search?q=<script>alert('XSS')</script>
# Выполнит JavaScript в браузере жертвы
Защита:
# Безопасно: экранирование HTML
from markupsafe import escape
@app.get('/search')
def search(q: str):
return HTMLResponse(f"<h1>Results for: {escape(q)}</h1>")
# Или используйте шаблонизаторы с авто-экранированием
from fastapi.templating import Jinja2Templates
templates = Jinja2Templates(directory="templates")
@app.get('/search')
def search(request: Request, q: str):
return templates.TemplateResponse(
"search.html",
{"request": request, "query": q} # Jinja2 экранирует автоматически
)Уязвимость:
# ОПАСНО! Сохранение и отображение без экранирования
@app.post('/comments')
def add_comment(text: str):
db.save_comment(text) # Сохраняем как есть
return {"status": "ok"}
@app.get('/comments')
def get_comments():
comments = db.get_comments()
return HTMLResponse("".join(f"<p>{c.text}</p>" for c in comments))Защита:
# Безопасно: экранирование при выводе
from markupsafe import escape
@app.get('/comments')
def get_comments():
comments = db.get_comments()
return HTMLResponse("".join(f"<p>{escape(c.text)}</p>" for c in comments))Злоумышленник заставляет жертву выполнить действие на доверенном сайте.
<!-- Злоумышленник размещает на своём сайте -->
<img src="https://bank.com/transfer?to=attacker&amount=1000" />
<!-- Если жертва авторизована в bank.com, перевод выполнится -->from fastapi_csrf_protect import CsrfProtect
app = FastAPI()
@app.post('/transfer')
@CsrfProtect.validate_csrf
def transfer(to: str, amount: float):
# CSRF токен проверен
...from fastapi import Request
from fastapi.responses import HTMLResponse
@app.get('/form')
def get_form(request: Request):
csrf_token = CsrfProtect.generate_csrf()
return HTMLResponse(f"""
<form method="POST" action="/transfer">
<input type="hidden" name="csrf_token" value="{csrf_token}">
<input type="text" name="to">
<input type="number" name="amount">
<button type="submit">Transfer</button>
</form>
""")from fastapi import FastAPI, Request
app = FastAPI()
@app.middleware("http")
async def security_headers(request: Request, call_next):
response = await call_next(request)
# Защита от MIME sniffing
response.headers['X-Content-Type-Options'] = 'nosniff'
# Защита от clickjacking
response.headers['X-Frame-Options'] = 'DENY'
# XSS фильтр
response.headers['X-XSS-Protection'] = '1; mode=block'
# Referrer policy
response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'
# Content Security Policy
response.headers['Content-Security-Policy'] = (
"default-src 'self'; "
"script-src 'self'; "
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data: https:; "
"font-src 'self'"
)
# HSTS (только для HTTPS)
response.headers['Strict-Transport-Security'] = (
'max-age=31536000; includeSubDomains'
)
return responsefrom pydantic import BaseModel, EmailStr, Field, validator
import re
class UserCreate(BaseModel):
username: str = Field(..., min_length=3, max_length=50)
email: EmailStr
password: str = Field(..., min_length=8)
age: int = Field(..., ge=18, le=120)
@validator('username')
def validate_username(cls, v):
if not re.match(r'^[a-zA-Z0-9_]+$', v):
raise ValueError('Username must be alphanumeric')
return v
@validator('password')
def validate_password(cls, v):
if not any(c.isupper() for c in v):
raise ValueError('Password must contain uppercase letter')
if not any(c.isdigit() for c in v):
raise ValueError('Password must contain digit')
return vfrom fastapi import FastAPI, Request, HTTPException
import json
app = FastAPI()
MAX_BODY_SIZE = 1024 * 1024 # 1 MB
@app.middleware("http")
async def limit_body_size(request: Request, call_next):
content_length = request.headers.get('Content-Length')
if content_length and int(content_length) > MAX_BODY_SIZE:
raise HTTPException(
status_code=413,
detail="Request body too large"
)
return await call_next(request)from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
# Использование
@app.post('/register')
def register(user: UserCreate):
hashed = hash_password(user.password)
# Сохраняем hashed, не plain password!
...from pydantic import BaseModel, Field, validator
class UserCreate(BaseModel):
password: str = Field(..., min_length=12)
@validator('password')
def password_strength(cls, v):
if len(v) < 12:
raise ValueError('Password must be at least 12 characters')
if not any(c.isupper() for c in v):
raise ValueError('Password must contain uppercase letter')
if not any(c.islower() for c in v):
raise ValueError('Password must contain lowercase letter')
if not any(c.isdigit() for c in v):
raise ValueError('Password must contain digit')
if not any(c in '!@#$%^&*()_+-=[]{}|;:,.<>?' for c in v):
raise ValueError('Password must contain special character')
return vfrom slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.post('/login')
@limiter.limit("5/minute") # Максимум 5 попыток в минуту
def login(request: Request, credentials: OAuth2PasswordRequestForm):
# Защита от перебора паролей
...from fastapi import FastAPI, UploadFile, File, HTTPException
from pathlib import Path
import magic
import secrets
app = FastAPI()
ALLOWED_TYPES = {'image/jpeg', 'image/png', 'image/gif'}
MAX_SIZE = 10 * 1024 * 1024 # 10 MB
@app.post('/upload')
async def upload_file(file: UploadFile = File(...)):
# Проверка MIME-типа
if file.content_type not in ALLOWED_TYPES:
raise HTTPException(400, "Invalid file type")
# Чтение и проверка размера
contents = await file.read()
if len(contents) > MAX_SIZE:
raise HTTPException(400, "File too large")
# Проверка по magic bytes
mime = magic.Magic(mime=True)
real_type = mime.from_buffer(contents[:1024])
if real_type not in ALLOWED_TYPES:
raise HTTPException(400, "File content does not match declared type")
# Безопасное имя файла
ext = Path(file.filename).suffix
safe_filename = f"{secrets.token_hex(16)}{ext}"
# Сохранение
file_path = Path('uploads') / safe_filename
with open(file_path, 'wb') as f:
f.write(contents)
return {'filename': safe_filename}import logging
from datetime import datetime
security_logger = logging.getLogger('security')
def log_security_event(event_type: str, details: dict, user_id: int = None):
"""Логирование событий безопасности"""
security_logger.warning(
f"SECURITY_EVENT: {event_type}",
extra={
'event_type': event_type,
'details': details,
'user_id': user_id,
'timestamp': datetime.utcnow().isoformat()
}
)
# Примеры использования
@app.post('/login')
def login(credentials: OAuth2PasswordRequestForm):
user = authenticate(credentials.username, credentials.password)
if not user:
log_security_event(
'FAILED_LOGIN',
{'username': credentials.username, 'ip': request.client.host}
)
raise HTTPException(401, "Invalid credentials")
log_security_event(
'SUCCESSFUL_LOGIN',
{'username': credentials.username},
user.id
)
return {'token': create_token(user)}В следующей теме вы изучите версионирование API — стратегии, обратная совместимость, управление изменениями.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.