Аутентификация, авторизация, валидация входных данных, защита от инъекций.
Безопасность — не опция, а необходимость. В этой теме изучим аутентификацию, авторизацию, валидацию и защиту от распространённых атак.
| Категория | Примеры | Риск |
|---|---|---|
| Инъекции | SQL, NoSQL, Command Injection | Критический |
| Неавторизованный доступ | Обход аутентификации, эскалация прав | Высокий |
| Утечка данных | Чтение чувствительных файлов, данных БД | Высокий |
| DoS | Перегрузка сервера запросами | Средний |
| Path Traversal | Доступ к файлам вне разрешённой зоны | Средний |
from mcp.server.fastmcp import FastMCP
import os
from functools import wraps
mcp = FastMCP("Auth Demo")
# Хранилище API ключей (в реальности — БД)
API_KEYS = {
"sk-abc123": {"user_id": "user1", "role": "admin"},
"sk-def456": {"user_id": "user2", "role": "user"}
}
def require_auth(func):
"""Декоратор для требования аутентификации."""
@wraps(func)
async def wrapper(*args, api_key: str = None, **kwargs):
if not api_key:
raise PermissionError("API key required")
if api_key not in API_KEYS:
raise PermissionError("Invalid API key")
# Добавляем информацию о пользователе в контекст
kwargs['auth_context'] = API_KEYS[api_key]
return await func(*args, **kwargs)
return wrapper
@mcp.tool()
@require_auth
async def get_user_data(api_key: str, auth_context: dict = None) -> dict:
"""Получение данных пользователя (требует аутентификации)."""
user_id = auth_context['user_id']
return {"user_id": user_id, "data": "sensitive data"}import jwt
from datetime import datetime, timedelta
from functools import wraps
JWT_SECRET = os.getenv("JWT_SECRET", "your-secret-key")
JWT_ALGORITHM = "HS256"
def create_jwt_token(user_id: str, role: str = "user") -> str:
"""Создание JWT токена."""
payload = {
"user_id": user_id,
"role": role,
"exp": datetime.utcnow() + timedelta(hours=24),
"iat": datetime.utcnow()
}
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
def verify_jwt_token(token: str) -> dict:
"""Проверка JWT токена."""
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
return payload
except jwt.ExpiredSignatureError:
raise PermissionError("Token expired")
except jwt.InvalidTokenError:
raise PermissionError("Invalid token")
def require_jwt(func):
"""Декоратор для требования JWT аутентификации."""
@wraps(func)
async def wrapper(*args, token: str = None, **kwargs):
if not token:
raise PermissionError("Authentication token required")
payload = verify_jwt_token(token)
kwargs['auth_context'] = payload
return await func(*args, **kwargs)
return wrapper
@mcp.tool()
@require_jwt
async def protected_action(token: str, auth_context: dict = None) -> str:
"""Защищённое действие (требует JWT)."""
user_id = auth_context['user_id']
return f"Action performed for user {user_id}"from authlib.integrations.starlette_client import OAuth
class OAuthProvider:
"""OAuth провайдер для аутентификации."""
def __init__(self):
self.oauth = OAuth()
self.oauth.register(
name='google',
client_id=os.getenv("GOOGLE_CLIENT_ID"),
client_secret=os.getenv("GOOGLE_CLIENT_SECRET"),
server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
client_kwargs={'scope': 'openid email profile'}
)
async def get_user_info(self, token: str) -> dict:
"""Получение информации о пользователе."""
session = await self.oauth.google.parse_id_token(token, nonce=None)
return {
"user_id": session['sub'],
"email": session['email'],
"name": session['name']
}
oauth = OAuthProvider()
@mcp.tool()
async def oauth_action(id_token: str) -> str:
"""Действие с OAuth аутентификацией."""
user_info = await oauth.get_user_info(id_token)
return f"Authenticated user: {user_info['email']}"from enum import Enum
from functools import wraps
class Role(Enum):
ADMIN = "admin"
EDITOR = "editor"
VIEWER = "viewer"
# Матрица прав доступа
PERMISSIONS = {
Role.ADMIN: {"read", "write", "delete", "admin"},
Role.EDITOR: {"read", "write"},
Role.VIEWER: {"read"}
}
def require_permission(permission: str):
"""Декоратор для требования конкретного права."""
def decorator(func):
@wraps(func)
async def wrapper(*args, auth_context: dict = None, **kwargs):
if not auth_context:
raise PermissionError("Authentication required")
user_role = Role(auth_context.get('role', 'viewer'))
user_permissions = PERMISSIONS.get(user_role, set())
if permission not in user_permissions:
raise PermissionError(
f"Insufficient permissions. Required: {permission}, "
f"has: {user_permissions}"
)
return await func(*args, **kwargs)
return wrapper
return decorator
@mcp.tool()
@require_permission("read")
async def read_data(auth_context: dict = None) -> list:
"""Чтение данных (требуется read permission)."""
return database.get_all()
@mcp.tool()
@require_permission("write")
async def write_data(data: dict, auth_context: dict = None) -> str:
"""Запись данных (требуется write permission)."""
database.insert(data)
return "Data written"
@mcp.tool()
@require_permission("delete")
async def delete_record(record_id: str, auth_context: dict = None) -> str:
"""Удаление записи (требуется delete permission)."""
database.delete(record_id)
return f"Record {record_id} deleted"
@mcp.tool()
@require_permission("admin")
async def admin_action(auth_context: dict = None) -> str:
"""Административное действие (требуется admin permission)."""
return "Admin action performed"@mcp.tool()
@require_jwt
async def get_document(
document_id: str,
token: str = None,
auth_context: dict = None
) -> dict:
"""Получение документа с проверкой прав доступа."""
user_id = auth_context['user_id']
user_role = Role(auth_context.get('role', 'viewer'))
# Получаем документ
doc = database.get_document(document_id)
if not doc:
raise ValueError(f"Document {document_id} not found")
# Проверка прав доступа
if doc['owner_id'] != user_id and user_role != Role.ADMIN:
# Проверяем, есть ли доступ по sharing rules
sharing = database.get_sharing(document_id)
if not sharing or user_id not in sharing.get('users', []):
raise PermissionError(f"Access denied to document {document_id}")
return docfrom pydantic import BaseModel, EmailStr, Field, validator, conint
class CreateUserInput(BaseModel):
"""Схема валидации для создания пользователя."""
username: str = Field(..., min_length=3, max_length=50)
email: EmailStr
age: conint(ge=0, le=150)
password: str = Field(..., min_length=8)
@validator('username')
def validate_username(cls, v):
if not v.isalnum() and '_' not in v:
raise ValueError('Username must be alphanumeric with underscores')
return v
@validator('password')
def validate_password(cls, v):
if not any(c.isupper() for c in v):
raise ValueError('Password must contain uppercase letter')
if not any(c.isdigit() for c in v):
raise ValueError('Password must contain digit')
return v
@mcp.tool()
async def create_user(user_data: dict) -> dict:
"""Создание пользователя с валидацией."""
try:
validated = CreateUserInput(**user_data)
except ValueError as e:
raise ValueError(f"Validation error: {e}")
# Создаём пользователя (пароль уже хэширован)
user = database.create_user(
username=validated.username,
email=validated.email,
age=validated.age,
password_hash=hash_password(validated.password)
)
return {"id": user.id, "username": user.username}import re
from enum import Enum
class SQLOperation(Enum):
SELECT = "SELECT"
INSERT = "INSERT"
UPDATE = "UPDATE"
DELETE = "DELETE"
# Whitelist разрешённых операций
ALLOWED_OPERATIONS = {SQLOperation.SELECT}
# Whitelist разрешённых таблиц
ALLOWED_TABLES = {"users", "posts", "comments"}
def validate_sql_query(query: str, allowed_ops: set = None) -> SQLOperation:
"""
Валидация SQL запроса.
Returns:
Тип операции если запрос валиден
"""
allowed_ops = allowed_ops or ALLOWED_OPERATIONS
# Нормализация запроса
query_upper = query.strip().upper()
# Определение типа операции
operation = None
for op in SQLOperation:
if query_upper.startswith(op.value):
operation = op
break
if not operation:
raise ValueError("Unknown SQL operation")
if operation not in allowed_ops:
raise ValueError(f"Operation {operation.value} not allowed")
# Проверка имени таблицы
table_match = re.search(r'FROM\s+(\w+)', query_upper)
if table_match:
table_name = table_match.group(1)
if table_name not in ALLOWED_TABLES:
raise ValueError(f"Table {table_name} not allowed")
# Проверка на опасные паттерны
dangerous_patterns = [
r'DROP\s+TABLE',
r'TRUNCATE\s+',
r'ALTER\s+TABLE',
r'CREATE\s+USER',
r'GRANT\s+',
r';\s*--', # Multiple statements
]
for pattern in dangerous_patterns:
if re.search(pattern, query_upper):
raise ValueError("Dangerous SQL pattern detected")
return operation
@mcp.tool()
async def execute_query(sql: str, params: dict = None) -> list:
"""Выполнение SQL запроса с валидацией."""
# Валидация
validate_sql_query(sql)
# Параметризированный запрос
async with db_pool.acquire() as conn:
rows = await conn.fetch(sql, **(params or {}))
return [dict(row) for row in rows]from pathlib import Path
class SecureFileAccess:
"""Безопасный доступ к файлам."""
def __init__(self, base_dir: str):
self.base_dir = Path(base_dir).resolve()
def validate_path(self, requested_path: str) -> Path:
"""
Валидация пути для предотвращения path traversal.
Returns:
Разрешённый абсолютный путь
"""
# Конструируем полный путь
full_path = (self.base_dir / requested_path).resolve()
# Проверяем, что путь внутри base_dir
try:
full_path.relative_to(self.base_dir)
except ValueError:
raise PermissionError(
f"Access denied: path traversal detected ({requested_path})"
)
return full_path
def read_file(self, filename: str) -> str:
"""Чтение файла с валидацией пути."""
safe_path = self.validate_path(filename)
if not safe_path.exists():
raise FileNotFoundError(f"File not found: {filename}")
if not safe_path.is_file():
raise ValueError(f"Not a file: {filename}")
return safe_path.read_text()
def list_directory(self, subdir: str = "") -> list:
"""Список файлов в директории."""
if subdir:
safe_path = self.validate_path(subdir)
else:
safe_path = self.base_dir
if not safe_path.is_dir():
raise ValueError(f"Not a directory: {subdir}")
return [f.name for f in safe_path.iterdir() if f.is_file()]
# Использование
file_access = SecureFileAccess(base_dir="/var/mcp/data")
@mcp.resource("file://{filename}")
def read_secure_file(filename: str) -> str:
"""Чтение файла из безопасной директории."""
return file_access.read_file(filename)
@mcp.tool()
def list_files(subdir: str = "") -> list:
"""Список файлов в директории."""
return file_access.list_directory(subdir)# ❌ ПЛОХО: Уязвимость SQL-инъекции
@mcp.tool()
async def get_user_insecure(username: str) -> dict:
query = f"SELECT * FROM users WHERE username = '{username}'"
return await database.execute(query)
# ✅ ХОРОШО: Параметризированный запрос
@mcp.tool()
async def get_user_secure(username: str) -> dict:
async with db_pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT * FROM users WHERE username = $1",
username
)
return dict(row) if row else Noneimport subprocess
import shlex
# ❌ ПЛОХО: Уязвимость command injection
@mcp.tool()
def run_command_insecure(command: str) -> str:
result = subprocess.run(command, shell=True, capture_output=True, text=True)
return result.stdout
# ✅ ХОРОШО: Безопасное выполнение
@mcp.tool()
def run_command_secure(command: str, args: list) -> str:
# Whitelist разрешённых команд
allowed_commands = {"ls", "cat", "grep"}
if command not in allowed_commands:
raise ValueError(f"Command {command} not allowed")
# Выполнение без shell
full_command = [command] + args
result = subprocess.run(full_command, capture_output=True, text=True)
return result.stdoutfrom pymongo import MongoClient
# ❌ ПЛОХО: Уязвимость NoSQL-инъекции
@mcp.tool()
def find_user_insecure(username: str) -> dict:
# Если username = {"$ne": null}, найдёт всех пользователей
return db.users.find_one({"username": username})
# ✅ ХОРОШО: Валидация типа
@mcp.tool()
def find_user_secure(username: str) -> dict:
# Явная валидация типа
if not isinstance(username, str):
raise ValueError("Username must be a string")
return db.users.find_one({"username": username})import logging
logger = logging.getLogger(__name__)
@mcp.tool()
async def process_payment(amount: float, card_number: str) -> str:
"""Обработка платежа с безопасной обработкой ошибок."""
try:
# Валидация
if amount <= 0:
raise ValueError("Amount must be positive")
# Обработка платежа
result = await payment_gateway.charge(card_number, amount)
return f"Payment of ${amount} successful"
except ValueError as e:
# Пользовательские ошибки показываем как есть
raise ValueError(str(e))
except PaymentGatewayError as e:
# Логируем полную ошибку
logger.exception(f"Payment gateway error: {e}")
# Пользователю показываем упрощённую версию
raise ValueError("Payment service temporarily unavailable")
except Exception as e:
# Неожиданные ошибки
logger.exception(f"Unexpected error: {e}")
raise ValueError("Internal error occurred")import logging
from typing import Any
logger = logging.getLogger(__name__)
def sanitize_for_logging(value: Any) -> str:
"""Санитизация данных для логирования."""
if isinstance(value, str):
# Скрываем потенциальные секреты
sensitive_patterns = [
r'password[=:]\S+',
r'token[=:]\S+',
r'api_key[=:]\S+',
r'secret[=:]\S+'
]
result = value
for pattern in sensitive_patterns:
result = re.sub(pattern, '[REDACTED]', result, flags=re.IGNORECASE)
return result
return str(value)
async def logged_tool_call(tool_name: str, params: dict):
"""Вызов инструмента с безопасным логированием."""
logger.info(
f"Tool call: {tool_name}",
extra={
"params": sanitize_for_logging(str(params))
}
)
try:
result = await call_tool(tool_name, params)
logger.info(f"Tool completed: {tool_name}")
return result
except Exception as e:
logger.error(
f"Tool error: {tool_name}",
extra={
"error": sanitize_for_logging(str(e))
}
)
raisefrom datetime import datetime
import json
class AuditLogger:
"""Логгер для аудита действий."""
def __init__(self, log_file: str):
self.log_file = Path(log_file)
def log_action(
self,
user_id: str,
action: str,
resource: str,
status: str,
details: dict = None
):
"""Логирование действия."""
entry = {
"timestamp": datetime.utcnow().isoformat(),
"user_id": user_id,
"action": action,
"resource": resource,
"status": status,
"details": details or {}
}
with open(self.log_file, 'a') as f:
f.write(json.dumps(entry) + '\n')
audit_logger = AuditLogger("/var/log/mcp/audit.log")
def audit_log(func):
"""Декоратор для аудита действий."""
@wraps(func)
async def wrapper(*args, auth_context: dict = None, **kwargs):
user_id = auth_context.get('user_id', 'anonymous') if auth_context else 'anonymous'
action = func.__name__
try:
result = await func(*args, **kwargs)
audit_logger.log_action(
user_id=user_id,
action=action,
resource=str(kwargs.get('id', 'N/A')),
status='success'
)
return result
except Exception as e:
audit_logger.log_action(
user_id=user_id,
action=action,
resource=str(kwargs.get('id', 'N/A')),
status='failure',
details={"error": str(e)}
)
raise
return wrapper
@mcp.tool()
@audit_log
@require_jwt
async def delete_user(
user_id: str,
token: str = None,
auth_context: dict = None
) -> str:
"""Удаление пользователя с аудитом."""
database.delete_user(user_id)
return f"User {user_id} deleted"| Механизм | Назначение |
|---|---|
| Аутентификация | Проверка личности (API Keys, JWT, OAuth) |
| Авторизация | Проверка прав (RBAC, permissions) |
| Валидация | Проверка входных данных (Pydantic, whitelist) |
| Параметризация | Защита от инъекций (SQL, NoSQL, Command) |
| Аудит | Логирование действий для расследования |
Следующая тема: Тестирование MCP серверов — юнит-тесты, моки, интеграционное тестирование.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.