Валидация webhook, защита от XSS/injection, секреты, rate limiting
Проблема: Ваш бот обрабатывает персональные данные, платежи, команды администраторов. Одна уязвимость — и злоумышленник получает доступ к данным пользователей, рассылает спам от имени бота, или бот падает под нагрузкой атаки.
Реальные угрозы:
- Webhook spoofing — злоумышленник отправляет поддельные обновления от имени Telegram
- SQL injection — через пользовательский ввод в БД
- XSS в сообщениях — вредоносный HTML/Markdown в ответах
- Токен бота в логах — утечка через лог-файлы, скриншоты
- Rate limiting bypass — спам и DDoS атаки
- Injection атак — через callback_data, файлы, команды
Решение: Многоуровневая защита — валидация webhook, параметризованные SQL-запросы, rate limiting, безопасное хранение секретов, аудит действий.
Зачем это нужно: Безопасность — не опция, а обязательное требование для production-бота. Утечка данных = потеря доверия + возможные юридические последствия.
Проблема: Злоумышленник может отправить POST-запрос на ваш webhook с поддельными данными. Как отличить запрос от Telegram?
Решение: Telegram отправляет secret token в заголовке X-Telegram-Bot-Api-Secret-Token.
# bot/middleware/webhook_auth.py
from typing import Callable, Dict, Any
from aiogram import BaseMiddleware
from aiogram.types import Update
from bot.config import settings
class WebhookAuthMiddleware(BaseMiddleware):
"""
Проверяет secret token от Telegram.
💡 Зачем:
- Гарантия, что запрос от Telegram
- Защита от webhook spoofing
🔗 Связь с другими темами: Архитектура (middleware)
"""
async def __call__(
self,
handler: Callable,
event: Update,
data: Dict[str, Any]
) -> Any:
# Получаем заголовок из запроса
request = data.get('request')
if request:
secret_token = request.headers.get('X-Telegram-Bot-Api-Secret-Token')
if secret_token != settings.webhook_secret:
# Отклоняем запрос
from aiohttp import web
raise web.HTTPForbidden(reason='Invalid secret token')
return await handler(event, data)
# Регистрация
dp.update.outer_middleware(WebhookAuthMiddleware())⚠️ Важно: Secret token должен быть уникальным для каждого бота. Не используйте одинаковый токен для dev/prod окружений.
from aiogram.webhook.aiohttp_server import SimpleRequestHandler
# Secret token проверяется автоматически
# 💡 Зачем: Меньше кода, меньше шансов ошибиться
handler = SimpleRequestHandler(
dispatcher=dp,
bot=bot,
secret_token=settings.webhook_secret # ✅ Автоматическая проверка
)Проблема: Secret token может быть скомпрометирован. Дополнительная проверка по IP снижает риски.
# bot/middleware/ip_whitelist.py
import ipaddress
from typing import Callable, Dict, Any, Set
from aiogram import BaseMiddleware
from aiogram.types import Update
# Диапазоны IP Telegram
# 💡 Зачем: Дополнительный уровень защиты
TELEGRAM_IP_RANGES = [
ipaddress.ip_network('149.154.160.0/20'),
ipaddress.ip_network('91.108.4.0/22'),
]
class IPWhitelistMiddleware(BaseMiddleware):
"""Проверяет IP-адрес запроса."""
def __init__(self, enabled: bool = False):
self.enabled = enabled
# ⚠️ Важно: Включайте только для критичных ботов
async def __call__(
self,
handler: Callable,
event: Update,
data: Dict[str, Any]
) -> Any:
if not self.enabled:
return await handler(event, data)
request = data.get('request')
if request:
remote_ip = ipaddress.ip_address(request.remote)
if not any(remote_ip in network for network in TELEGRAM_IP_RANGES):
from aiohttp import web
raise web.HTTPForbidden(reason='IP not allowed')
return await handler(event, data)⚠️ Антипаттерн: Не полагайтесь только на IP whitelist. Telegram может менять диапазоны. Используйте вместе с secret token.
Проблема: Пользователь вводит ' OR '1'='1 в поле поиска. Без защиты это может дать доступ к чужим данным.
Решение: Параметризованные запросы — данные передаются отдельно от SQL-кода.
# ❌ ПЛОХО: Никогда не используйте f-strings в SQL
async def get_user(username: str):
query = f"SELECT * FROM users WHERE username = '{username}'"
await session.execute(query)
# ⚠️ Уязвимость: username = "admin' --" отключит проверку пароля
# ✅ ХОРОШО: Используйте параметризованные запросы
from sqlalchemy import select
async def get_user(username: str):
query = select(User).where(User.username == username)
result = await session.execute(query)
return result.scalar_one_or_none()
# 💡 Зачем: SQLAlchemy экранирует значения автоматически💡 Правило: Никогда не вставляйте пользовательские данные напрямую в SQL-запрос через f-strings или конкатенацию.
Проблема: Пользователь отправляет сообщение с <script>alert('xss')</script>. Бот пересылает это другому пользователю с parse_mode='HTML' — скрипт выполняется.
Решение: Санитизация пользовательского ввода.
# bot/utils/sanitize.py
import html
import re
def sanitize_text(text: str) -> str:
"""
Экранирует HTML-символы.
💡 Зачем:
- < превращается в <
- > превращается в >
- Скрипты не выполняются
"""
return html.escape(text)
def sanitize_markdown(text: str) -> str:
"""
Экранирует Markdown-символы.
💡 Зачем: Пользователь не сможет сломать форматирование.
"""
# Экранируем специальные символы Markdown
special_chars = ['_', '*', '[', ']', '(', ')', '~', '`', '>', '#', '+', '-', '=', '|', '{', '}', '.', '!']
for char in special_chars:
text = text.replace(char, f'\\{char}')
return text
# Использование
@router.message(Command('echo'))
async def cmd_echo(msg: Message):
user_text = msg.text.replace('/echo ', '')
# Безопасная отправка
# ⚠️ Важно: parse_mode=None отключает интерпретацию
await msg.answer(
sanitize_text(user_text),
parse_mode=None
)⚠️ Антипаттерн: Не используйте parse_mode='HTML' или 'Markdown' с пользовательским вводом без санитизации.
Проблема: Пользователь вводит email "не email", возраст "-5", имя из 1000 символов. Без валидации бот падает или сохраняет мусор.
Решение: Pydantic для валидации всех входных данных.
from pydantic import BaseModel, Field, EmailStr, validator
from datetime import datetime
import re
class RegistrationData(BaseModel):
"""
Валидация данных регистрации.
💡 Зачем:
- Автоматическая проверка типов
- Понятные ошибки валидации
- Документация схемы данных
"""
name: str = Field(min_length=2, max_length=50)
email: EmailStr
age: int = Field(ge=18, le=120)
username: str | None = Field(default=None, max_length=32)
@validator('name')
def name_alphanumeric(cls, v):
"""Проверка: имя содержит только буквы и цифры."""
if not v.replace(' ', '').isalnum():
raise ValueError('Имя должно содержать только буквы и цифры')
return v
@validator('username')
def username_valid(cls, v):
"""Проверка: корректный формат username."""
if v and not re.match(r'^[a-zA-Z0-9_]+$', v):
raise ValueError('Неверный формат username')
return v
# Использование
@router.message(Registration.email)
async def process_email(msg: Message, state: FSMContext):
try:
data = await state.get_data()
# Валидация всех данных
registration = RegistrationData(
name=data['name'],
email=msg.text,
age=data['age']
)
# Сохраняем валидированные данные
await user_repo.create(**registration.dict())
except ValidationError as e:
# 💡 Зачем: Пользователь видит понятную ошибку
await msg.answer(f'Ошибка валидации: {e.errors()[0]["msg"]}')
return💡 Совет: Валидируйте данные как можно раньше — на этапе получения от пользователя.
from pydantic import BaseModel, Field, validator
class ProductCallback(BaseModel):
"""
Валидация callback данных.
💡 Зачем:
- Защита от поддельных callback
- Проверка допустимых действий
"""
product_id: int = Field(gt=0)
action: str
quantity: int = Field(default=1, ge=1, le=100)
@validator('action')
def action_valid(cls, v):
"""Проверка: действие должно быть разрешённым."""
allowed = ['view', 'buy', 'cart']
if v not in allowed:
raise ValueError(f'Действие должно быть одним из: {allowed}')
return v
@router.callback_query(F.data.startswith('product:'))
async def product_callback(cb: CallbackQuery):
try:
# Парсим callback
data = parse_callback(cb.data)
callback = ProductCallback(**data)
# Используем валидированные данные
await process_product_action(callback)
except ValidationError as e:
# ⚠️ Важно: Не раскрывайте детали валидации пользователю
await cb.answer('Неверные данные', show_alert=True)Проблема: Токен бота, пароль БД, API ключи — всё это нельзя хранить в коде. Утечка = компрометация бота.
Решение: Переменные окружения + Docker Secrets для продакшена.
# bot/config.py
from pydantic_settings import BaseSettings
from pydantic import SecretStr, Field
class Settings(BaseSettings):
"""
Настройки с защитой секретов.
💡 Зачем SecretStr:
- Не отображается в логах
- Не сериализуется в JSON
- Явное получение через get_secret_value()
"""
# Секреты как SecretStr
bot_token: SecretStr
database_password: SecretStr
api_key: SecretStr
webhook_secret: SecretStr
# Получение значения
@property
def bot_token_value(self) -> str:
return self.bot_token.get_secret_value()
class Config:
env_file = '.env'
# Запрет на вывод секретов
json_encoders = {SecretStr: lambda v: '**********'}
settings = Settings()⚠️ Важно: Никогда не логируйте значения SecretStr. Даже в debug-режиме.
# .gitignore
# ⚠️ Критично: Никогда не коммитьте секреты в git
.env
.env.local
.env.*.local
# Docker secrets
/secrets/
*.secret# .env.example (можно коммитить)
BOT_TOKEN=your_bot_token_here
DATABASE_URL=postgresql://user:password@localhost/dbname
API_KEY=your_api_key_here# docker-compose.yml
services:
bot:
secrets:
- bot_token
- database_password
secrets:
bot_token:
file: ./secrets/bot_token.txt
database_password:
file: ./secrets/db_password.txt# Чтение secrets
def read_secret(path: str) -> str:
with open(path, 'r') as f:
return f.read().strip()
class Settings(BaseSettings):
bot_token: str = Field(
default_factory=lambda: read_secret('/run/secrets/bot_token')
)💡 Зачем Docker Secrets: Файлы секретов монтируются в контейнер, не передаются через переменные окружения (которые видны в
docker inspect).
Проблема: Злоумышленник отправляет 1000 запросов в минуту. Бот тратит все ресурсы на обработку спама.
Решение: Rate limiting + блокировка при множественных нарушениях.
# bot/middleware/anti_flood.py
from collections import defaultdict
from aiogram import BaseMiddleware
from aiogram.types import Message
import time
class AntiFloodMiddleware(BaseMiddleware):
"""
Защита от флуда сообщениями.
💡 Зачем:
- Игнорирование спама
- Сохранение ресурсов для легитимных пользователей
"""
def __init__(self, interval: int = 1, limit: int = 5):
self.interval = interval
self.limit = limit
self.flood_cache: dict[int, list[float]] = defaultdict(list)
async def __call__(
self,
handler: Callable,
event: Message,
data: Dict[str, Any]
) -> Any:
user_id = event.from_user.id
now = time.time()
# Очищаем старые записи
self.flood_cache[user_id] = [
t for t in self.flood_cache[user_id]
if now - t < self.interval
]
if len(self.flood_cache[user_id]) >= self.limit:
# Игнорируем сообщение
# 💡 Зачем: Не тратим ресурсы на спам
return None
self.flood_cache[user_id].append(now)
return await handler(event, data)# bot/services/ban_service.py
from collections import defaultdict
from bot.repositories.user import UserRepository
import logging
logger = logging.getLogger(__name__)
class BanService:
"""
Сервис блокировок.
💡 Зачем:
- Автоматическая блокировка при нарушениях
- Прогрессивное наказание (3 нарушения = бан)
🔗 Связь с другими темами: Мониторинг (алерты на блокировки)
"""
def __init__(self, user_repo: UserRepository):
self.user_repo = user_repo
self.violations: dict[int, int] = defaultdict(int)
async def report_violation(self, user_id: int, reason: str):
"""
Зафиксировать нарушение.
💡 Зачем:
- Накопительная система
- 3 нарушения = автоматический бан
"""
self.violations[user_id] += 1
if self.violations[user_id] >= 3:
await self.ban_user(user_id, reason=f'Multiple violations: {reason}')
self.violations[user_id] = 0
logger.warning(f'User {user_id} auto-banned: {reason}')
async def ban_user(self, user_id: int, reason: str):
"""Заблокировать пользователя."""
user = await self.user_repo.get_by_telegram_id(user_id)
if user:
await self.user_repo.update(user, is_banned=True)
logger.warning(f'User {user_id} banned: {reason}')⚠️ Важно: Ведите журнал блокировок для аудита.
Проблема: Пользователь загружает файл с именем ../../../etc/passwd или вирус. Без проверки это опасно.
Решение: Валидация типа, размера, имени файла.
# bot/utils/file_security.py
import os
from pathlib import Path
from aiogram.types import Document, PhotoSize
ALLOWED_MIME_TYPES = {
'image/jpeg',
'image/png',
'image/gif',
'application/pdf',
}
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10 MB
def is_safe_file(file: Document | PhotoSize) -> bool:
"""
Проверяет безопасность файла.
💡 Зачем:
- Проверка MIME типа (не доверяйте расширению)
- Ограничение размера (защита от DoS)
"""
# Проверка размера
if hasattr(file, 'file_size') and file.file_size > MAX_FILE_SIZE:
return False
# Проверка MIME типа
if hasattr(file, 'mime_type') and file.mime_type not in ALLOWED_MIME_TYPES:
return False
return True
def safe_filename(filename: str) -> str:
"""
Очищает имя файла от опасных символов.
💡 Зачем:
- Защита от path traversal (../)
- Удаление специальных символов
"""
# Удаляем путь и опасные символы
name = Path(filename).name
name = ''.join(c for c in name if c.isalnum() or c in '._-')
return name
# Использование
@router.message(F.document)
async def handle_document(msg: Message):
doc = msg.document
if not is_safe_file(doc):
await msg.answer('Недопустимый тип файла')
return
# Скачиваем с безопасным именем
# ⚠️ Важно: Сохраняем в отдельной директории, не в корне проекта
file_path = f'uploads/{safe_filename(doc.file_name)}'
await bot.download(doc, destination=file_path)⚠️ Антипаттерн: Не сохраняйте файлы в публично доступной директории без проверки.
Проблема: Произошла подозрительная активность. Кто это сделал? Когда? Какие данные затронуты?
Решение: Аудит-логирование всех критичных действий.
# bot/services/audit_logger.py
import logging
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
audit_logger = logging.getLogger('audit')
class AuditLogger:
"""
Логирование действий пользователей.
💡 Зачем:
- Расследование инцидентов
- Compliance (GDPR, PCI DSS)
- Отслеживание подозрительной активности
🔗 Связь с другими темами: Мониторинг (Sentry, алерты)
"""
def __init__(self, session: AsyncSession):
self.session = session
async def log_action(
self,
user_id: int,
action: str,
details: dict = None,
ip_address: str = None
):
"""
Записать действие в аудит-лог.
💡 Зачем:
- Кто (user_id)
- Что сделал (action)
- Когда (timestamp)
- Детали (details)
"""
audit_logger.info(
f'ACTION: {action}',
extra={
'user_id': user_id,
'details': details,
'ip_address': ip_address,
'timestamp': datetime.utcnow().isoformat()
}
)
# Сохранение в БД для долгосрочного хранения
await self._save_to_db(user_id, action, details)
async def _save_to_db(self, user_id: int, action: str, details: dict):
# Сохранение в таблицу audit_logs
...
# Использование
@router.callback_query(F.data.startswith('payment:'))
async def process_payment(cb: CallbackQuery, audit_logger: AuditLogger):
# 💡 Зачем: Аудит финансовых операций обязателен
await audit_logger.log_action(
user_id=cb.from_user.id,
action='payment_initiated',
details={'amount': 1000}
).env в .gitignoreSecretStr для чувствительных данных.env.example с примерами (без реальных значений)| Тема | Как связана |
|---|---|
| Архитектура | Middleware для сквозной безопасности |
| Мониторинг | Алерты на подозрительную активность |
| Масштабирование | Rate limiting с Redis для нескольких инстансов |
| Тестирование | Тесты на уязвимости |
→ Мониторинг — обнаружение атак в реальном времени
→ Тестирование — security-тесты, penetration testing
→ Масштабирование — распределённый rate limiting
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.