Инъекции, десериализация, валидация, секреты, HTTPS, CVE
Безопасность — не опция, а требование. Безопасная программа — это та, которая сломалась корректно, а не предсказуемо.
Самая частая уязвимость веб-приложений.
# ❌ ОПАСНО: имя пользователя вставляется напрямую
name = "'; DROP TABLE users; --"
query = f"SELECT * FROM users WHERE name = '{name}'"
# Результат: DELETE всей таблицы!
# ✅ БЕЗОПАСНО: параметризованные запросы
cursor.execute("SELECT * FROM users WHERE name = %s", (name,))
# SQLAlchemy ORM — автоматически защищает
user = session.query(User).filter(User.name == name).first()
# SQLAlchemy Core с bindparams
from sqlalchemy import text
result = conn.execute(
text("SELECT * FROM users WHERE name = :name"),
{"name": name}
)Правило: никогда не формируйте SQL через f-string или % форматирование с пользовательскими данными.
# ❌ ОПАСНО: os.system и shell=True с пользовательским вводом
filename = "file.txt; rm -rf /"
os.system(f"cat {filename}") # выполнит rm -rf /!
# ❌ ОПАСНО: shell=True с конкатенацией
subprocess.run(f"cat {filename}", shell=True)
# ✅ БЕЗОПАСНО: список аргументов без shell=True
subprocess.run(["cat", filename], check=True)
# ✅ БЕЗОПАСНО: явная проверка пути
import pathlib
safe_path = pathlib.Path(filename).resolve()
if not safe_path.is_relative_to(BASE_DIR):
raise ValueError("Path traversal attempt")
subprocess.run(["cat", str(safe_path)], check=True)# ❌ ОПАСНО: пользователь контролирует путь
def serve_file(filename):
path = f"/var/www/files/{filename}" # filename = "../../../etc/passwd"
return open(path).read()
# ✅ БЕЗОПАСНО: canonicalize и проверяйте базовый путь
import os
BASE_DIR = "/var/www/files"
def serve_file_safe(filename):
# realpath разрешает symlinks и ..
safe_path = os.path.realpath(os.path.join(BASE_DIR, filename))
if not safe_path.startswith(BASE_DIR + os.sep):
raise PermissionError(f"Access denied: {filename}")
return open(safe_path).read()# ❌ ОПАСНО: запрос к URL, контролируемому пользователем
def fetch_avatar(url):
return requests.get(url).content # может быть http://169.254.169.254/
# ✅ БЕЗОПАСНО: валидация URL
import ipaddress
from urllib.parse import urlparse
ALLOWED_SCHEMES = {'https'}
BLOCKED_NETWORKS = [
ipaddress.ip_network("169.254.0.0/16"), # link-local (AWS metadata)
ipaddress.ip_network("10.0.0.0/8"), # private
ipaddress.ip_network("172.16.0.0/12"), # private
ipaddress.ip_network("192.168.0.0/16"), # private
ipaddress.ip_network("127.0.0.0/8"), # loopback
]
def safe_fetch(url: str) -> bytes:
parsed = urlparse(url)
if parsed.scheme not in ALLOWED_SCHEMES:
raise ValueError(f"Scheme {parsed.scheme} not allowed")
import socket
ip = socket.gethostbyname(parsed.hostname)
addr = ipaddress.ip_address(ip)
for network in BLOCKED_NETWORKS:
if addr in network:
raise ValueError(f"Access to {ip} is blocked")
return requests.get(url, timeout=5).contentpickle — произвольное выполнение кода# ❌ КРИТИЧЕСКИ ОПАСНО: pickle может выполнить любой код
import pickle
# Злонамеренный pickle-пейлоад
import os
class Exploit:
def __reduce__(self):
return (os.system, ("rm -rf /",))
payload = pickle.dumps(Exploit())
pickle.loads(payload) # ВЫПОЛНИТ rm -rf /!
# ✅ БЕЗОПАСНО: никогда не загружайте pickle из недоверенных источников
# Используйте: json, msgpack, cbor2, protobuf, marshmallow
data = json.loads(untrusted_json) # только встроенные типыПравило:
pickle,marshal,shelve— только для доверенных данных между известными системами.
yaml.load() — тот же вектор# ❌ ОПАСНО: yaml.load() с полным загрузчиком
import yaml
payload = "!!python/object/apply:os.system ['ls']"
yaml.load(payload, Loader=yaml.FullLoader) # выполнит os.system!
# ✅ БЕЗОПАСНО: safe_load разрешает только базовые типы
yaml.safe_load(payload) # ValueError — нет python-тегов# ❌ ОПАСНО: стандартный xml.etree.ElementTree уязвим к XXE
from xml.etree.ElementTree import parse
tree = parse(untrusted_xml_file) # может читать /etc/passwd
# ✅ БЕЗОПАСНО: defusedxml блокирует XXE
import defusedxml.ElementTree as ET
tree = ET.parse(untrusted_xml_file) # безопасно
# ✅ ИЛИ: lxml с resolve_entities=False
from lxml import etree
parser = etree.XMLParser(resolve_entities=False, no_network=True)
tree = etree.parse(untrusted_xml_file, parser)eval() и exec()# ❌ КАТАСТРОФИЧЕСКИ ОПАСНО
user_input = "__import__('os').system('rm -rf /')"
eval(user_input) # выполнит команду!
# ❌ Ограниченный eval НЕ работает как sandbox
eval(user_input, {"__builtins__": {}})
# Атакующий всё равно может добраться до os через объекты
# ✅ Для вычисления математических выражений используйте специализированные библиотеки
import ast
def safe_eval(expr: str) -> float:
"""Вычисляет только простые математические выражения."""
tree = ast.parse(expr, mode='eval')
# Проверяем что в дереве только допустимые узлы
allowed = (ast.Expression, ast.BinOp, ast.UnaryOp, ast.Constant,
ast.Add, ast.Sub, ast.Mult, ast.Div, ast.USub)
for node in ast.walk(tree):
if not isinstance(node, allowed):
raise ValueError(f"Недопустимое выражение: {type(node).__name__}")
return eval(compile(tree, '<string>', 'eval'))
safe_eval("2 + 3 * 4") # 14.0
safe_eval("__import__('os')") # ValueErrorsecrets — криптографически безопасные случайные числаimport secrets
# ❌ random — НЕ подходит для криптографии!
import random
token = random.token_hex(32) # предсказуем!
# ✅ secrets — CSPRNG (Cryptographically Secure Pseudo-Random Number Generator)
token = secrets.token_hex(32) # 64-символьный hex
token = secrets.token_urlsafe(32) # URL-safe base64
token = secrets.token_bytes(32) # 32 случайных байта
otp = secrets.randbelow(1_000_000) # случайное 6-значное число
# Генерация паролей
import string
alphabet = string.ascii_letters + string.digits + string.punctuation
password = ''.join(secrets.choice(alphabet) for _ in range(16))# ❌ Хардкод в коде
DB_PASSWORD = "super_secret_123"
# ❌ В логах
logger.info(f"Connecting with password={password}")
# ❌ В git-истории (даже если удалили — история сохраняется)
# ✅ Переменные окружения
import os
DB_PASSWORD = os.environ.get("DB_PASSWORD")
# ✅ .env файлы (только локально, в .gitignore)
from dotenv import load_dotenv
load_dotenv()
DB_PASSWORD = os.environ["DB_PASSWORD"]
# ✅ Для production: Vault, AWS Secrets Manager, K8s Secrets
import boto3
def get_secret(name):
client = boto3.client('secretsmanager')
return client.get_secret_value(SecretId=name)['SecretString']# ❌ НИКОГДА не храните пароли в открытом виде или в MD5/SHA1
import hashlib
hashed = hashlib.md5(password.encode()).hexdigest() # взламывается за секунды!
hashed = hashlib.sha256(password.encode()).hexdigest() # тоже плохо — rainbow tables
# ❌ SHA-256 без соли — уязвим к атакам по словарю
# Если два пользователя имеют одинаковый пароль — одинаковый хеш!
# ✅ bcrypt — медленный адаптивный хэш
import bcrypt
def hash_password(password: str) -> bytes:
salt = bcrypt.gensalt(rounds=12) # rounds: стоимость работы
return bcrypt.hashpw(password.encode(), salt)
def verify_password(password: str, hashed: bytes) -> bool:
return bcrypt.checkpw(password.encode(), hashed)
# ✅ argon2 — победитель Password Hashing Competition 2015
from argon2 import PasswordHasher
ph = PasswordHasher(time_cost=3, memory_cost=65536, parallelism=4)
hashed = ph.hash(password)
ph.verify(hashed, password) # True или argon2.exceptions.VerifyMismatchError
# ✅ Встроенный hashlib с PBKDF2 (если нет bcrypt/argon2)
import hashlib, os
salt = os.urandom(32)
key = hashlib.pbkdf2_hmac('sha256', password.encode(), salt, 100_000)# ❌ УЯЗВИМО к timing attack — Python прерывает сравнение при первом несовпадении
if user_token == expected_token: # атакующий может угадать байт за байтом!
authenticate()
# ✅ Constant-time сравнение — hmac.compare_digest
import hmac
if hmac.compare_digest(user_token.encode(), expected_token.encode()):
authenticate()
# Принцип: функция всегда занимает одинаковое время независимо от совпадения
# Разница в 1 нс между "нет" и "почти да" позволяет угадать секрет за O(n) попытокimport ssl
import requests
# ❌ ОПАСНО: отключение проверки сертификата
requests.get(url, verify=False) # уязвимо к MITM-атакам
# ✅ БЕЗОПАСНО: проверка включена по умолчанию
requests.get(url)
# ✅ Для самоподписанных сертификатов
requests.get(url, verify="/path/to/ca-cert.pem")
# Настройка SSL-контекста вручную
ctx = ssl.create_default_context()
ctx.minimum_version = ssl.TLSVersion.TLSv1_2 # запрет устаревших версий
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.check_hostname = True
conn = ctx.wrap_socket(socket.socket(), server_hostname=hostname)import re
# ❌ ОПАСНО: уязвимый паттерн — exponential backtracking
pattern = re.compile(r"(a+)+$")
pattern.match("a" * 30 + "!") # зависает! (экспоненциальное время)
# Проблема: (a+)+ — вложенные квантификаторы создают катастрофический backtracking
# ✅ БЕЗОПАСНО: используйте атомарные группы и ограничения
import regex # сторонняя библиотека с possessive quantifiers
pattern = regex.compile(r"(?>a+)+$") # атомарная группа — нет backtracking
# ✅ Ограничивайте длину ввода перед проверкой
def validate_email(email: str) -> bool:
if len(email) > 254: # RFC 5321 limit
return False
return bool(re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email))
# Используйте timeout (Python 3.11+):
import re
try:
re.match(pattern, user_input, timeout=0.5)
except TimeoutError:
passbandit — статический анализ# Установка
# pip install bandit
# Запуск
# bandit -r my_project/ -ll
# Найдёт: hardcoded passwords, использование assert для security,
# subprocess shell=True, eval/exec, md5/sha1 хэширование, и др.pip-audit и safety — уязвимости в зависимостях# pip-audit — проверяет PyPI Advisory Database
pip install pip-audit
pip-audit
# safety — проверяет safety-db
pip install safety
safety checksemgrep — гибкий SAST# Поиск опасных паттернов с готовыми правилами
semgrep --config=p/python-security .from pydantic import BaseModel, EmailStr, Field, validator
class UserCreate(BaseModel):
email: EmailStr # встроенная валидация email
name: str = Field(min_length=1, max_length=100)
age: int = Field(ge=0, le=150)
bio: str = Field(default="", max_length=1000)
@validator('name')
def name_no_special(cls, v):
if not v.replace(' ', '').isalnum():
raise ValueError('Имя должно содержать только буквы и цифры')
return v.strip()
# Pydantic автоматически вызывает ValueError/ValidationError при нарушении
try:
user = UserCreate(email="invalid", name="", age=-1)
except ValidationError as e:
print(e.json())import html
import bleach
# Экранирование HTML — для вставки пользовательских данных в HTML
safe_text = html.escape("<script>alert('xss')</script>")
# "<script>alert('xss')</script>"
# bleach — разрешает только безопасные теги
clean = bleach.clean(
user_content,
tags=['p', 'b', 'i', 'a'],
attributes={'a': ['href']},
strip=True
)| Уязвимость | Python-контекст | Защита |
|---|---|---|
| SQL Injection | string format в запросах | Параметризованные запросы |
| Broken Authentication | слабые токены (random), небезопасное хранение паролей | secrets, bcrypt/argon2 |
| XSS | вставка user-data в HTML | html.escape(), bleach |
| Insecure Deserialization | pickle.loads(untrusted) | json, отказ от pickle |
| SSRF | requests.get(user_url) | Валидация URL, whitelist |
| Path Traversal | open(user_path) | os.path.realpath() + проверка базового пути |
| Command Injection | os.system(user_input) | subprocess.run([list]) |
| ReDoS | вложенные квантификаторы в regex | Atomic groups, ограничение длины |
| Hardcoded Secrets | пароли в коде | os.environ, Vault |
| Outdated Dependencies | CVE в старых версиях | pip-audit, safety |
import jwt
from datetime import datetime, timedelta, timezone
SECRET_KEY = os.environ["JWT_SECRET"] # минимум 256 бит
ALGORITHM = "HS256"
# Создание токена
def create_access_token(user_id: int) -> str:
payload = {
"sub": str(user_id),
"iat": datetime.now(timezone.utc),
"exp": datetime.now(timezone.utc) + timedelta(hours=1),
"type": "access",
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
# Валидация токена
def decode_token(token: str) -> dict:
try:
payload = jwt.decode(
token,
SECRET_KEY,
algorithms=[ALGORITHM],
options={"verify_exp": True},
)
return payload
except jwt.ExpiredSignatureError:
raise AuthenticationError("Token expired")
except jwt.InvalidTokenError:
raise AuthenticationError("Invalid token")
# ❌ НЕ ДЕЛАЙТЕ: decode без верификации
jwt.decode(token, options={"verify_signature": False}) # уязвимо!
# ❌ Не храните чувствительные данные в payload (payload не зашифрован, только подписан!)import time
from collections import defaultdict
from threading import Lock
class InMemoryRateLimiter:
"""Sliding window rate limiter (только для одного процесса)."""
def __init__(self, max_requests: int, window_seconds: int):
self._max = max_requests
self._window = window_seconds
self._requests: dict[str, list[float]] = defaultdict(list)
self._lock = Lock()
def is_allowed(self, key: str) -> bool:
now = time.time()
cutoff = now - self._window
with self._lock:
# Удаляем старые запросы
self._requests[key] = [t for t in self._requests[key] if t > cutoff]
if len(self._requests[key]) >= self._max:
return False
self._requests[key].append(now)
return True
# Для production используйте Redis:
def redis_rate_limit(redis_client, key: str, limit: int, window: int) -> bool:
"""Token bucket через Redis INCR + EXPIRE."""
current = redis_client.incr(key)
if current == 1:
redis_client.expire(key, window)
return current <= limitfrom pydantic_settings import BaseSettings
from pydantic import SecretStr, validator
class Settings(BaseSettings):
"""Настройки приложения с валидацией безопасности."""
database_url: SecretStr # SecretStr скрывает значение в логах
jwt_secret: SecretStr
allowed_hosts: list[str] = ["localhost"]
debug: bool = False
@validator("jwt_secret")
def validate_jwt_secret(cls, v):
secret = v.get_secret_value()
if len(secret) < 32:
raise ValueError("JWT secret must be at least 32 characters")
return v
class Config:
env_file = ".env"
settings = Settings()
# SecretStr предотвращает случайный вывод:
print(settings.jwt_secret) # '**********'
print(settings.jwt_secret.get_secret_value()) # только явноsafe_eval(expr), разрешающий только числовые выражения через AST-проверку.secrets.bandit -r . и исправьте найденные проблемы.hmac.compare_digest() и == при сравнении токенов.safe_open(filename, base_dir) защищённую от path traversal.Settings которая валидирует длину SECRET_KEY и запрещает DEBUG=True в production.Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.