Кэширование, пулы соединений, асинхронность, оптимизация производительности.
От работающего кода к профессиональному решению. В этой теме изучим продвинутые паттерны для создания масштабируемых и производительных MCP серверов.
from mcp.server.fastmcp import FastMCP
from functools import lru_cache
import time
mcp = FastMCP("Caching Demo")
@lru_cache(maxsize=100)
def get_expensive_data(query: str) -> str:
"""Кэширование дорогих запросов."""
time.sleep(2) # Имитация дорогого запроса
return f"Data for: {query}"
@mcp.resource("data://{query}")
def get_cached_resource(query: str) -> str:
"""Ресурс с LRU кэшированием."""
return get_expensive_data(query)
# Сброс кэша при необходимости
@mcp.tool()
def clear_cache():
"""Очистить кэш."""
get_expensive_data.cache_clear()
return "Cache cleared"from functools import wraps
import time
from typing import Any, Dict
class TTLCache:
"""Кэш с временем жизни записей."""
def __init__(self, ttl_seconds: int = 300):
self._cache: Dict[str, tuple] = {}
self._ttl = ttl_seconds
def get(self, key: str) -> Any:
"""Получить значение из кэша."""
if key in self._cache:
value, timestamp = self._cache[key]
if time.time() - timestamp < self._ttl:
return value
else:
del self._cache[key]
return None
def set(self, key: str, value: Any):
"""Сохранить значение в кэш."""
self._cache[key] = (value, time.time())
def clear(self):
"""Очистить кэш."""
self._cache.clear()
# Глобальный кэш
response_cache = TTLCache(ttl_seconds=60)
def cached(ttl: int = 300):
"""Декоратор для кэширования с TTL."""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# Создаём ключ кэша из аргументов
cache_key = f"{func.__name__}:{args}:{kwargs}"
# Проверяем кэш
cached_value = response_cache.get(cache_key)
if cached_value is not None:
return cached_value
# Вызываем функцию
result = await func(*args, **kwargs) if asyncio.iscoroutinefunction(func) else func(*args, **kwargs)
# Сохраняем в кэш
response_cache.set(cache_key, result)
return result
return wrapper
return decorator
@mcp.tool()
@cached(ttl=60)
async def fetch_external_data(url: str) -> str:
"""Получение данных с внешним кэшированием."""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()from mcp.server.fastmcp import FastMCP
mcp = FastMCP("Cache Invalidation Demo")
cache = TTLCache(ttl_seconds=300)
@mcp.tool()
def get_user_data(user_id: str) -> dict:
"""Получение данных пользователя с кэшированием."""
cache_key = f"user:{user_id}"
cached = cache.get(cache_key)
if cached:
return cached
# Получаем данные из БД
data = database.get_user(user_id)
cache.set(cache_key, data)
return data
@mcp.tool()
def update_user_data(user_id: str, **kwargs) -> str:
"""Обновление данных пользователя с инвалидацией кэша."""
# Обновляем в БД
database.update_user(user_id, **kwargs)
# Инвалидируем кэш
cache.set(f"user:{user_id}", None)
return f"User {user_id} updated"
@mcp.tool()
def invalidate_user_cache(user_id: str) -> str:
"""Явная инвалидация кэша пользователя."""
cache.set(f"user:{user_id}", None)
return f"Cache invalidated for user {user_id}"import asyncpg
from contextlib import asynccontextmanager
from typing import AsyncIterator
class DatabasePool:
"""Пул соединений с PostgreSQL."""
def __init__(self, dsn: str, min_size: int = 5, max_size: int = 20):
self._dsn = dsn
self._min_size = min_size
self._max_size = max_size
self._pool: asyncpg.Pool | None = None
async def create_pool(self):
"""Создание пула соединений."""
self._pool = await asyncpg.create_pool(
dsn=self._dsn,
min_size=self._min_size,
max_size=self._max_size,
command_timeout=60
)
async def close_pool(self):
"""Закрытие пула соединений."""
if self._pool:
await self._pool.close()
@asynccontextmanager
async def acquire(self) -> AsyncIterator[asyncpg.Connection]:
"""Получение соединения из пула."""
if not self._pool:
raise RuntimeError("Pool not initialized")
async with self._pool.acquire() as connection:
yield connection
# Глобальный пул
db_pool = DatabasePool(
dsn="postgresql://user:pass@localhost:5432/mydb",
min_size=5,
max_size=20
)
@mcp.tool()
async def query_users(limit: int = 100) -> list:
"""Запрос к БД с использованием пула соединений."""
async with db_pool.acquire() as conn:
rows = await conn.fetch("SELECT * FROM users LIMIT $1", limit)
return [dict(row) for row in rows]
@mcp.tool()
async def get_user_by_id(user_id: int) -> dict:
"""Получение пользователя по ID."""
async with db_pool.acquire() as conn:
row = await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
return dict(row) if row else None
# Инициализация при старте сервера
async def on_startup():
await db_pool.create_pool()
async def on_shutdown():
await db_pool.close_pool()import aiohttp
from contextlib import asynccontextmanager
class HTTPClientPool:
"""Пул HTTP клиентов."""
def __init__(self, max_connections: int = 100):
self._max_connections = max_connections
self._session: aiohttp.ClientSession | None = None
async def create_session(self):
"""Создание HTTP сессии."""
timeout = aiohttp.ClientTimeout(total=30)
connector = aiohttp.TCPConnector(
limit=self._max_connections,
ttl_dns_cache=300,
use_dns_cache=True
)
self._session = aiohttp.ClientSession(
timeout=timeout,
connector=connector
)
async def close_session(self):
"""Закрытие HTTP сессии."""
if self._session:
await self._session.close()
@asynccontextmanager
async def get_session(self) -> AsyncIterator[aiohttp.ClientSession]:
"""Получение HTTP сессии."""
if not self._session:
await self.create_session()
yield self._session
http_pool = HTTPClientPool(max_connections=100)
@mcp.tool()
async def fetch_url(url: str) -> str:
"""Получение URL с использованием пула соединений."""
async with http_pool.get_session() as session:
async with session.get(url) as response:
response.raise_for_status()
return await response.text()
@mcp.tool()
async def post_json(url: str, data: dict) -> dict:
"""POST запрос с JSON данными."""
async with http_pool.get_session() as session:
async with session.post(url, json=data) as response:
response.raise_for_status()
return await response.json()import asyncio
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("Async Patterns Demo")
@mcp.tool()
async def fetch_multiple_sources(urls: list[str]) -> dict:
"""
Параллельное получение данных из нескольких источников.
Args:
urls: Список URL для запроса
"""
async def fetch(session, url):
try:
async with session.get(url, timeout=10) as response:
return {"url": url, "status": response.status, "data": await response.text()}
except Exception as e:
return {"url": url, "error": str(e)}
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return {"results": results}
@mcp.tool()
async def parallel_database_queries(queries: list[str]) -> list:
"""
Параллельное выполнение нескольких SQL запросов.
Args:
queries: Список SQL запросов
"""
async def execute_query(conn, query):
try:
rows = await conn.fetch(query)
return {"query": query, "rows": len(rows), "data": [dict(r) for r in rows]}
except Exception as e:
return {"query": query, "error": str(e)}
async with db_pool.acquire() as conn:
tasks = [execute_query(conn, query) for query in queries]
results = await asyncio.gather(*tasks, return_exceptions=True)
return resultsimport asyncio
from dataclasses import dataclass
from enum import Enum
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class Task:
id: str
name: str
status: TaskStatus = TaskStatus.PENDING
result: str = ""
error: str = ""
class TaskQueue:
"""Очередь задач с асинхронным выполнением."""
def __init__(self, max_concurrent: int = 5):
self._queue: asyncio.Queue = asyncio.Queue()
self._tasks: dict[str, Task] = {}
self._max_concurrent = max_concurrent
self._workers: list[asyncio.Task] = []
async def start_workers(self):
"""Запуск рабочих для обработки очереди."""
for i in range(self._max_concurrent):
worker = asyncio.create_task(self._worker(f"worker-{i}"))
self._workers.append(worker)
async def _worker(self, name: str):
"""Рабочий для обработки задач."""
while True:
task = await self._queue.get()
try:
task.status = TaskStatus.RUNNING
# Выполнение задачи (в реальности здесь был бы вызов функции)
await asyncio.sleep(1) # Имитация работы
task.status = TaskStatus.COMPLETED
task.result = f"Completed by {name}"
except Exception as e:
task.status = TaskStatus.FAILED
task.error = str(e)
finally:
self._queue.task_done()
async def submit_task(self, task_id: str, name: str) -> str:
"""Отправка задачи в очередь."""
task = Task(id=task_id, name=name)
self._tasks[task_id] = task
await self._queue.put(task)
return task_id
def get_task_status(self, task_id: str) -> Task:
"""Получение статуса задачи."""
return self._tasks.get(task_id)
# Глобальная очередь задач
task_queue = TaskQueue(max_concurrent=5)
@mcp.tool()
async def submit_long_task(name: str) -> str:
"""Отправка долгой задачи в очередь."""
import uuid
task_id = str(uuid.uuid4())
await task_queue.submit_task(task_id, name)
return task_id
@mcp.tool()
def get_task_status(task_id: str) -> dict:
"""Получение статуса задачи."""
task = task_queue.get_task_status(task_id)
if not task:
raise ValueError(f"Task {task_id} not found")
return {
"id": task.id,
"name": task.name,
"status": task.status.value,
"result": task.result,
"error": task.error
}import asyncio
from functools import wraps
from typing import TypeVar, Callable
T = TypeVar('T')
def with_timeout(seconds: float):
"""Декоратор для установки таймаута на функцию."""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
async def wrapper(*args, **kwargs):
try:
return await asyncio.wait_for(func(*args, **kwargs), timeout=seconds)
except asyncio.TimeoutError:
raise TimeoutError(f"Operation timed out after {seconds}s")
return wrapper
return decorator
def with_retry(max_attempts: int = 3, delay: float = 1.0):
"""Декоратор для повторных попыток при ошибке."""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
async def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_attempts):
try:
return await func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt < max_attempts - 1:
await asyncio.sleep(delay * (attempt + 1)) # Exponential backoff
raise last_exception
return wrapper
return decorator
@mcp.tool()
@with_timeout(seconds=30)
@with_retry(max_attempts=3, delay=1.0)
async def unreliable_external_api(endpoint: str) -> dict:
"""
Вызов ненадёжного внешнего API с таймаутом и retry.
Args:
endpoint: Endpoint API
"""
async with aiohttp.ClientSession() as session:
async with session.get(endpoint, timeout=10) as response:
response.raise_for_status()
return await response.json()import asyncio
import time
from enum import Enum
from functools import wraps
class CircuitState(Enum):
CLOSED = "closed" # Нормальная работа
OPEN = "open" # Цепь разомкнута, ошибки
HALF_OPEN = "half_open" # Проверка восстановления
class CircuitBreaker:
"""Circuit Breaker паттерн для защиты от каскадных ошибок."""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 30.0,
half_open_max_calls: int = 3
):
self._failure_threshold = failure_threshold
self._recovery_timeout = recovery_timeout
self._half_open_max_calls = half_open_max_calls
self._state = CircuitState.CLOSED
self._failure_count = 0
self._success_count = 0
self._last_failure_time: float | None = None
self._half_open_calls = 0
@property
def state(self) -> CircuitState:
"""Текущее состояние."""
if self._state == CircuitState.OPEN:
if time.time() - self._last_failure_time >= self._recovery_timeout:
self._state = CircuitState.HALF_OPEN
self._half_open_calls = 0
return self._state
async def call(self, func, *args, **kwargs):
"""Вызов функции через Circuit Breaker."""
state = self.state
if state == CircuitState.OPEN:
raise Exception("Circuit breaker is OPEN")
if state == CircuitState.HALF_OPEN:
if self._half_open_calls >= self._half_open_max_calls:
raise Exception("Circuit breaker half-open limit reached")
self._half_open_calls += 1
try:
result = await func(*args, **kwargs) if asyncio.iscoroutinefunction(func) else func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
"""Обработка успешного вызова."""
self._failure_count = 0
if self._state == CircuitState.HALF_OPEN:
self._success_count += 1
if self._success_count >= self._half_open_max_calls:
self._state = CircuitState.CLOSED
self._success_count = 0
def _on_failure(self):
"""Обработка неудачного вызова."""
self._failure_count += 1
self._last_failure_time = time.time()
if self._failure_count >= self._failure_threshold:
self._state = CircuitState.OPEN
# Глобальные Circuit Breaker для внешних сервисов
external_api_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)
@mcp.tool()
async def call_external_api(endpoint: str) -> dict:
"""Вызов внешнего API через Circuit Breaker."""
async def make_request():
async with aiohttp.ClientSession() as session:
async with session.get(endpoint, timeout=10) as response:
response.raise_for_status()
return await response.json()
try:
return await external_api_breaker.call(make_request)
except Exception as e:
if "Circuit breaker is OPEN" in str(e):
raise ValueError("External service temporarily unavailable (circuit open)")
raise
@mcp.tool()
def get_circuit_status() -> dict:
"""Получение статуса Circuit Breaker."""
return {
"state": external_api_breaker.state.value,
"failure_count": external_api_breaker._failure_count
}import asyncio
import time
from typing import Dict
class TokenBucket:
"""Rate limiting через Token Bucket алгоритм."""
def __init__(self, rate: float, capacity: int):
"""
Инициализация бакета.
Args:
rate: Количество токенов в секунду
capacity: Максимальная ёмкость бакета
"""
self._rate = rate
self._capacity = capacity
self._tokens = capacity
self._last_update = time.time()
self._lock = asyncio.Lock()
async def acquire(self, tokens: int = 1) -> bool:
"""
Попытка получить токены.
Returns:
True если токены получены, False если недостаточно
"""
async with self._lock:
now = time.time()
elapsed = now - self._last_update
self._last_update = now
# Добавляем токены за прошедшее время
self._tokens = min(self._capacity, self._tokens + elapsed * self._rate)
if self._tokens >= tokens:
self._tokens -= tokens
return True
return False
async def wait_for_token(self, tokens: int = 1):
"""Ожидание доступности токенов."""
while not await self.acquire(tokens):
await asyncio.sleep(0.1)
# Rate limiter для разных типов операций
request_limiter = TokenBucket(rate=10, capacity=20) # 10 req/s, burst 20
write_limiter = TokenBucket(rate=2, capacity=5) # 2 write/s, burst 5
def rate_limited(limiter: TokenBucket):
"""Декоратор для rate limiting."""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
await limiter.wait_for_token()
return await func(*args, **kwargs) if asyncio.iscoroutinefunction(func) else func(*args, **kwargs)
return wrapper
return decorator
@mcp.tool()
@rate_limited(request_limiter)
async def search(query: str) -> list:
"""Поиск с rate limiting."""
return await database.search(query)
@mcp.tool()
@rate_limited(write_limiter)
async def create_record(data: dict) -> dict:
"""Создание записи с более строгим rate limiting."""
return await database.insert(data)import asyncio
import time
import uuid
from contextvars import ContextVar
from typing import Optional
# Контекстная переменная для trace_id
trace_id_var: ContextVar[Optional[str]] = ContextVar('trace_id', default=None)
class Span:
"""Спан для трассировки."""
def __init__(self, name: str, trace_id: str):
self.name = name
self.trace_id = trace_id
self.span_id = str(uuid.uuid4())
self.start_time: float = 0
self.end_time: float = 0
self.attributes: dict = {}
async def __aenter__(self):
self.start_time = time.time()
logger.info(f"Span started: {self.name} (trace={self.trace_id}, span={self.span_id})")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.end_time = time.time()
duration_ms = (self.end_time - self.start_time) * 1000
logger.info(
f"Span completed: {self.name}",
extra={
"trace_id": self.trace_id,
"span_id": self.span_id,
"duration_ms": duration_ms,
"attributes": self.attributes
}
)
def trace(func):
"""Декоратор для трассировки функции."""
@wraps(func)
async def wrapper(*args, **kwargs):
trace_id = trace_id_var.get() or str(uuid.uuid4())
token = trace_id_var.set(trace_id)
try:
async with Span(func.__name__, trace_id) as span:
span.attributes["args"] = str(args)[:100]
result = await func(*args, **kwargs)
span.attributes["result_size"] = len(str(result)) if result else 0
return result
finally:
trace_id_var.reset(token)
return wrapper
@trace
@mcp.tool()
async def traced_tool(param: str) -> str:
"""Инструмент с трассировкой."""
# Вложенный спан
async with Span("database_query", trace_id_var.get()):
await asyncio.sleep(0.1) # Имитация запроса
return f"Processed: {param}"from prometheus_client import Counter, Histogram, Gauge
import time
# Метрики
request_counter = Counter('mcp_requests_total', 'Total MCP requests', ['tool', 'status'])
request_duration = Histogram('mcp_request_duration_seconds', 'Request duration', ['tool'])
active_connections = Gauge('mcp_active_connections', 'Active connections')
def metrics(func):
"""Декоратор для сбора метрик."""
@wraps(func)
async def wrapper(*args, **kwargs):
tool_name = func.__name__
start_time = time.time()
active_connections.inc()
try:
result = await func(*args, **kwargs)
request_counter.labels(tool=tool_name, status='success').inc()
return result
except Exception as e:
request_counter.labels(tool=tool_name, status='error').inc()
raise
finally:
duration = time.time() - start_time
request_duration.labels(tool=tool_name).observe(duration)
active_connections.dec()
return wrapper
@metrics
@mcp.tool()
async def measured_tool(query: str) -> str:
"""Инструмент со сбором метрик."""
await asyncio.sleep(0.1)
return f"Result for: {query}"| Паттерн | Назначение |
|---|---|
| Кэширование | Снижение нагрузки, ускорение ответов |
| Пул соединений | Эффективное использование ресурсов |
| Асинхронность | Параллельное выполнение I/O операций |
| Circuit Breaker | Защита от каскадных ошибок |
| Token Bucket | Rate limiting для защиты от перегрузки |
| Трассировка | Отладка распределённых систем |
| Метрики | Мониторинг производительности |
Следующая тема: Безопасность — аутентификация, авторизация, валидация, защита от атак.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.