Масштабирование, отказоустойчивость, CI/CD, документирование, версионирование.
Продакшен — это где начинается настоящая работа. В этой теме изучим best practices для надёжных, масштабируемых и поддерживаемых MCP серверов.
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: mcp-server-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: mcp-server
minReplicas: 3
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
behavior:
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 50
periodSeconds: 60
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Percent
value: 100
periodSeconds: 60# ❌ ПЛОХО: Состояние в памяти сервера
class MCPServer:
def __init__(self):
self.sessions = {} # Потеряется при рестарте
self.cache = {} # Потеряется при рестарте
# ✅ ХОРОШО: Внешнее хранилище состояния
import redis.asyncio as redis
class MCPServer:
def __init__(self):
self.redis = redis.Redis(host='redis', port=6379)
async def get_session(self, session_id: str):
# Состояние во внешнем хранилище
data = await self.redis.get(f"session:{session_id}")
return json.loads(data) if data else None
async def save_session(self, session_id: str, data: dict):
await self.redis.setex(
f"session:{session_id}",
3600, # TTL 1 час
json.dumps(data)
)import hashlib
from typing import List
class ShardManager:
"""Менеджер шардирования по user_id."""
def __init__(self, num_shards: int = 16):
self.num_shards = num_shards
def get_shard(self, user_id: str) -> int:
"""Определение шарда по user_id."""
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
return hash_value % self.num_shards
def get_shard_dsn(self, shard: int, base_dsn: str) -> str:
"""Получение DSN для шарда."""
# postgresql://user:pass@host:5432/db_0
return base_dsn.replace("/db", f"/db_{shard}")
shard_manager = ShardManager(num_shards=16)
async def get_user_data(user_id: str) -> dict:
"""Получение данных пользователя с шардированием."""
shard = shard_manager.get_shard(user_id)
shard_dsn = shard_manager.get_shard_dsn(shard, DATABASE_URL)
async with create_db_pool(shard_dsn) as conn:
return await conn.fetchrow(
"SELECT * FROM users WHERE id = $1",
user_id
)import asyncio
from functools import wraps
from typing import Type, Tuple
def retry(
exceptions: Tuple[Type[Exception], ...] = (Exception,),
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0
):
"""
Декоратор для retry с exponential backoff.
Args:
exceptions: Типы исключений для retry
max_retries: Максимальное количество попыток
base_delay: Базовая задержка в секундах
max_delay: Максимальная задержка
exponential_base: База экспоненты
"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries + 1):
try:
return await func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt == max_retries:
break
# Exponential backoff с jitter
delay = min(
base_delay * (exponential_base ** attempt),
max_delay
)
jitter = delay * 0.1 * asyncio.get_event_loop().time() % 1
await asyncio.sleep(delay + jitter)
raise last_exception
return wrapper
return decorator
@mcp.tool()
@retry(
exceptions=(aiohttp.ClientError, asyncio.TimeoutError),
max_retries=3,
base_delay=1.0
)
async def call_external_api(endpoint: str) -> dict:
"""Вызов внешнего API с retry."""
async with aiohttp.ClientSession() as session:
async with session.get(endpoint, timeout=10) as response:
response.raise_for_status()
return await response.json()import time
from enum import Enum
from dataclasses import dataclass
from typing import Dict, Optional
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
@dataclass
class CircuitStats:
"""Статистика Circuit Breaker."""
success_count: int = 0
failure_count: int = 0
last_failure_time: Optional[float] = None
last_state_change: float = 0
class ProductionCircuitBreaker:
"""Production-ready Circuit Breaker."""
def __init__(
self,
failure_threshold: int = 5,
success_threshold: int = 3,
recovery_timeout: float = 30.0,
timeout: float = 10.0
):
self.failure_threshold = failure_threshold
self.success_threshold = success_threshold
self.recovery_timeout = recovery_timeout
self.timeout = timeout
self._circuits: Dict[str, CircuitStats] = {}
def _get_circuit(self, name: str) -> CircuitStats:
if name not in self._circuits:
self._circuits[name] = CircuitStats(
last_state_change=time.time()
)
return self._circuits[name]
def get_state(self, name: str) -> CircuitState:
"""Получение состояния цепи."""
circuit = self._get_circuit(name)
if circuit.failure_count >= self.failure_threshold:
if time.time() - circuit.last_failure_time >= self.recovery_timeout:
return CircuitState.HALF_OPEN
return CircuitState.OPEN
return CircuitState.CLOSED
async def call(self, name: str, func, *args, **kwargs):
"""Вызов функции через Circuit Breaker."""
state = self.get_state(name)
circuit = self._get_circuit(name)
if state == CircuitState.OPEN:
raise Exception(f"Circuit {name} is OPEN")
if state == CircuitState.HALF_OPEN:
if circuit.success_count >= self.success_threshold:
circuit.success_count = 0
circuit.failure_count = 0
circuit.last_state_change = time.time()
try:
result = await asyncio.wait_for(
func(*args, **kwargs),
timeout=self.timeout
)
# Успех
if state == CircuitState.HALF_OPEN:
circuit.success_count += 1
else:
circuit.success_count = min(circuit.success_count + 1, 100)
return result
except Exception as e:
# Ошибка
circuit.failure_count += 1
circuit.last_failure_time = time.time()
raise
def get_stats(self, name: str) -> dict:
"""Получение статистики."""
circuit = self._get_circuit(name)
return {
"state": self.get_state(name).value,
"success_count": circuit.success_count,
"failure_count": circuit.failure_count,
"last_failure_time": circuit.last_failure_time
}
# Глобальные Circuit Breaker
circuit_breakers = {
"database": ProductionCircuitBreaker(),
"external_api": ProductionCircuitBreaker(),
"cache": ProductionCircuitBreaker()
}
@mcp.tool()
async def resilient_database_call(query: str) -> list:
"""Вызов БД с Circuit Breaker."""
cb = circuit_breakers["database"]
return await cb.call("database", execute_query, query)
@mcp.tool()
def get_circuit_stats() -> dict:
"""Статус всех Circuit Breaker."""
return {
name: cb.get_stats(name)
for name, cb in circuit_breakers.items()
}from pydantic import BaseSettings, Field
from typing import Optional, List
class Settings(BaseSettings):
"""Настройки приложения из переменных окружения."""
# Приложение
app_name: str = Field(default="MCP Server", env="APP_NAME")
version: str = Field(default="1.0.0", env="VERSION")
debug: bool = Field(default=False, env="DEBUG")
# Сервер
host: str = Field(default="0.0.0.0", env="HOST")
port: int = Field(default=8000, env="PORT")
workers: int = Field(default=4, env="WORKERS")
# База данных
database_url: str = Field(env="DATABASE_URL")
db_pool_size: int = Field(default=10, env="DB_POOL_SIZE")
db_max_overflow: int = Field(default=20, env="DB_MAX_OVERFLOW")
# Redis
redis_url: str = Field(default="redis://localhost:6379", env="REDIS_URL")
# Логирование
log_level: str = Field(default="INFO", env="LOG_LEVEL")
log_format: str = Field(default="json", env="LOG_FORMAT")
# Безопасность
jwt_secret: str = Field(env="JWT_SECRET")
api_keys: List[str] = Field(default_factory=list, env="API_KEYS")
# Rate limiting
rate_limit: int = Field(default=100, env="RATE_LIMIT")
rate_limit_period: int = Field(default=60, env="RATE_LIMIT_PERIOD")
# Внешние сервисы
external_api_url: Optional[str] = Field(default=None, env="EXTERNAL_API_URL")
external_api_key: Optional[str] = Field(default=None, env="EXTERNAL_API_KEY")
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
# Глобальные настройки
settings = Settings()from typing import Dict, Any
import redis.asyncio as redis
class FeatureFlagManager:
"""Менеджер feature flags."""
def __init__(self, redis_url: str):
self.redis = redis.Redis.from_url(redis_url)
self._cache: Dict[str, Any] = {}
self._cache_ttl = 60
async def is_enabled(
self,
flag_name: str,
default: bool = False
) -> bool:
"""Проверка, включен ли флаг."""
# Проверка кэша
if flag_name in self._cache:
value, timestamp = self._cache[flag_name]
if time.time() - timestamp < self._cache_ttl:
return value
# Получение из Redis
value = await self.redis.get(f"flag:{flag_name}")
enabled = value == "true" if value else default
# Кэширование
self._cache[flag_name] = (enabled, time.time())
return enabled
async def get_value(
self,
flag_name: str,
default: Any = None
) -> Any:
"""Получение значения флага."""
value = await self.redis.get(f"flag:{flag_name}")
if value is None:
return default
try:
return json.loads(value)
except (json.JSONDecodeError, TypeError):
return value
feature_flags = FeatureFlagManager(settings.redis_url)
@mcp.tool()
async def new_feature(data: str) -> str:
"""Новая функция за feature flag."""
if await feature_flags.is_enabled("new_feature", default=False):
return await new_implementation(data)
else:
return await old_implementation(data)from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
app = FastAPI(
title="MCP Server API",
description="MCP сервер для интеграции с внешними системами",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc"
)
def custom_openapi():
if app.openapi_schema:
return app.openapi_schema
openapi_schema = get_openapi(
title="MCP Server API",
version="1.0.0",
description="""
## MCP Server API
Этот API предоставляет доступ к MCP инструментам и ресурсам.
### Аутентификация
Используйте API ключ в заголовке:Authorization: Bearer YOUR_API_KEY
### Rate Limiting
- 100 запросов в минуту для обычных пользователей
- 1000 запросов в минуту для premium пользователей
""",
routes=app.routes,
)
# Добавляем примеры
openapi_schema["components"]["securitySchemes"] = {
"BearerAuth": {
"type": "http",
"scheme": "bearer",
"bearerFormat": "JWT"
}
}
app.openapi_schema = openapi_schema
return app.openapi_schema
app.openapi = custom_openapi
# MCP Server
## Быстрый старт
### Локальный запуск
```bash
# Клонирование
git clone https://github.com/company/mcp-server.git
cd mcp-server
# Установка зависимостей
poetry install
# Настройка окружения
cp .env.example .env
# Отредактируйте .env
# Запуск
poetry run python -m uvicorn src.server:app --reloaddocker-compose up -d# Юнит-тесты
poetry run pytest tests/unit
# Интеграционные тесты
poetry run pytest tests/integration
# Покрытие
poetry run pytest --cov=src --cov-report=htmlСм. deployment
---
## 5. Версионирование
### API версионирование
```python
from fastapi import APIRouter, FastAPI
app = FastAPI()
# v1 роуты
v1_router = APIRouter(prefix="/api/v1")
@v1_router.get("/tools")
async def list_tools_v1():
return {"version": "v1", "tools": [...]}
# v2 роуты
v2_router = APIRouter(prefix="/api/v2")
@v2_router.get("/tools")
async def list_tools_v2():
return {
"version": "v2",
"tools": [...],
"metadata": {...} # Новые поля
}
app.include_router(v1_router)
app.include_router(v2_router)
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("Versioned MCP")
@mcp.tool(name="search_v1")
async def search_v1(query: str) -> list:
"""Поиск (версия 1, устаревшая)."""
# Старая реализация
pass
@mcp.tool(name="search_v2")
async def search_v2(query: str, limit: int = 10, filters: dict = None) -> list:
"""
Поиск (версия 2, текущая).
Args:
query: Поисковый запрос
limit: Максимум результатов (1-100)
filters: Фильтры (category, date_range)
"""
# Новая реализация с фильтрами
pass
@mcp.tool(name="search")
async def search(query: str, **kwargs) -> list:
"""
Поиск (алиас на последнюю версию).
Перенаправляет на search_v2.
"""
return await search_v2(query, **kwargs)# .gitlab-ci.yml
stages:
- lint
- test
- security
- build
- deploy
variables:
DOCKER_IMAGE: $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
lint:
stage: lint
script:
- poetry run ruff check src/ tests/
- poetry run black --check src/ tests/
- poetry run mypy src/
test:
stage: test
script:
- poetry run pytest tests/ --cov=src --cov-report=xml
- poetry run coverage report --fail-under=80
security:
stage: security
script:
- poetry run bandit -r src/
- poetry run safety check
- docker run --rm -v $PWD:/app aquasec/trivy fs /app
build:
stage: build
script:
- docker build -t $DOCKER_IMAGE .
- docker push $DOCKER_IMAGE
only:
- main
deploy-staging:
stage: deploy
script:
- kubectl set image deployment/mcp-server mcp-server=$DOCKER_IMAGE -n staging
environment:
name: staging
only:
- main
deploy-production:
stage: deploy
script:
- kubectl rollout status deployment/mcp-server -n production
environment:
name: production
when: manual
only:
- main| Практика | Назначение |
|---|---|
| Stateless | Горизонтальное масштабирование |
| Circuit Breaker | Изоляция сбоев |
| Retry с backoff | Обработка временных ошибок |
| Feature Flags | Безопасный деплой новых функций |
| API версионирование | Обратная совместимость |
| CI/CD | Автоматизация деплоя |
Курс завершён! 🎉
Вы освоили создание MCP серверов от основ до production best practices. Теперь вы готовы создавать надёжные, масштабируемые интеграции для ИИ-агентов.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.