Legacy-системы, микросервисы, event-driven архитектура, очереди сообщений.
MCP — клей для ваших систем. В этой теме научимся интегрировать MCP серверы с legacy, микросервисами, очередями сообщений и event-driven архитектурой.
from mcp.server.fastmcp import FastMCP
from zeep import Client
from functools import lru_cache
mcp = FastMCP("Legacy SOAP Integration")
# SOAP клиент
SOAP_WSDL = "https://legacy.example.com/service?wsdl"
soap_client = Client(SOAP_WSDL)
@mcp.tool()
def get_legacy_customer(customer_id: str) -> dict:
"""
Получение данных клиента из legacy SOAP системы.
Args:
customer_id: ID клиента
"""
try:
result = soap_client.service.GetCustomer(CustomerID=customer_id)
return {
"id": result.CustomerID,
"name": result.Name,
"email": result.Email,
"status": result.Status
}
except Exception as e:
raise ValueError(f"Legacy system error: {str(e)}")
@mcp.tool()
def create_legacy_order(
customer_id: str,
items: list,
total: float
) -> dict:
"""
Создание заказа в legacy системе.
Args:
customer_id: ID клиента
items: Список товаров
total: Общая сумма
"""
# Конвертация в SOAP формат
soap_items = [
{"ProductID": item["id"], "Quantity": item["qty"]}
for item in items
]
result = soap_client.service.CreateOrder(
CustomerID=customer_id,
OrderItems=soap_items,
TotalAmount=total
)
return {
"order_id": result.OrderID,
"status": result.Status,
"created_at": result.CreatedDate
}import aiohttp
class MainframeAdapter:
"""Адаптер для mainframe системы через REST шлюз."""
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url
self.api_key = api_key
self._session = None
async def get_session(self):
if not self._session:
self._session = aiohttp.ClientSession(
headers={"X-API-Key": self.api_key}
)
return self._session
async def close(self):
if self._session:
await self._session.close()
async def get_account(self, account_id: str) -> dict:
"""Получение данных счёта."""
session = await self.get_session()
async with session.get(
f"{self.base_url}/accounts/{account_id}"
) as response:
response.raise_for_status()
return await response.json()
async def post_transaction(
self,
account_id: str,
amount: float,
description: str
) -> dict:
"""Проведение транзакции."""
session = await self.get_session()
async with session.post(
f"{self.base_url}/transactions",
json={
"accountId": account_id,
"amount": amount,
"description": description
}
) as response:
response.raise_for_status()
return await response.json()
mainframe = MainframeAdapter(
base_url="https://mainframe-gateway.example.com/api",
api_key="your-api-key"
)
@mcp.tool()
async def get_account_balance(account_id: str) -> dict:
"""Получение баланса счёта из mainframe."""
account = await mainframe.get_account(account_id)
return {
"account_id": account_id,
"balance": account["balance"],
"currency": account["currency"],
"last_updated": account["lastUpdated"]
}
@mcp.tool()
async def transfer_funds(
from_account: str,
to_account: str,
amount: float,
description: str
) -> dict:
"""Перевод средств между счетами."""
# Дебетование
debit = await mainframe.post_transaction(
from_account, -amount, f"Transfer to {to_account}: {description}"
)
# Кредитование
credit = await mainframe.post_transaction(
to_account, amount, f"Transfer from {from_account}: {description}"
)
return {
"debit_transaction": debit["transactionId"],
"credit_transaction": credit["transactionId"],
"status": "completed"
}from mcp.server.fastmcp import FastMCP
import aiohttp
from typing import Dict, List
mcp = FastMCP("Microservices Integration")
class ServiceRegistry:
"""Реестр микросервисов."""
def __init__(self, consul_url: str):
self.consul_url = consul_url
self._cache: Dict[str, List[dict]] = {}
self._cache_ttl = 60 # секунд
async def get_service_instances(self, service_name: str) -> List[dict]:
"""Получение инстансов сервиса из Consul."""
import time
# Проверка кэша
if service_name in self._cache:
instances, timestamp = self._cache[service_name]
if time.time() - timestamp < self._cache_ttl:
return instances
# Запрос к Consul
async with aiohttp.ClientSession() as session:
async with session.get(
f"{self.consul_url}/v1/catalog/service/{service_name}"
) as response:
response.raise_for_status()
instances = await response.json()
# Кэширование
self._cache[service_name] = (instances, time.time())
return instances
async def get_healthy_instance(self, service_name: str) -> dict:
"""Получение здорового инстанса."""
instances = await self.get_service_instances(service_name)
healthy = [i for i in instances if i.get("HealthStatus") == "passing"]
if not healthy:
raise ValueError(f"No healthy instances for {service_name}")
# Load balancing: random selection
import random
return random.choice(healthy)
registry = ServiceRegistry(consul_url="http://consul:8500")
@mcp.tool()
async def call_user_service(endpoint: str, user_id: str) -> dict:
"""Вызов user-service микросервиса."""
instance = await registry.get_healthy_instance("user-service")
async with aiohttp.ClientSession() as session:
async with session.get(
f"http://{instance['Address']}:{instance['ServicePort']}/{endpoint}/{user_id}"
) as response:
response.raise_for_status()
return await response.json()
@mcp.tool()
async def call_order_service(
endpoint: str,
order_data: dict
) -> dict:
"""Вызов order-service микросервиса."""
instance = await registry.get_healthy_instance("order-service")
async with aiohttp.ClientSession() as session:
async with session.post(
f"http://{instance['Address']}:{instance['ServicePort']}/{endpoint}",
json=order_data
) as response:
response.raise_for_status()
return await response.json()class APIGateway:
"""API Gateway для агрегации микросервисов."""
def __init__(self, registry: ServiceRegistry):
self.registry = registry
async def get_user_profile(self, user_id: str) -> dict:
"""Агрегация профиля пользователя из нескольких сервисов."""
import asyncio
# Параллельные запросы к сервисам
user_task = self._call_user_service("users", user_id)
orders_task = self._call_order_service("orders", {"userId": user_id, "limit": 5})
notifications_task = self._call_notification_service("unread", user_id)
user_info, recent_orders, unread_count = await asyncio.gather(
user_task, orders_task, notifications_task,
return_exceptions=True
)
# Агрегация результатов
profile = {
"user_id": user_id,
"user_info": user_info if not isinstance(user_info, Exception) else None,
"recent_orders": recent_orders if not isinstance(recent_orders, Exception) else [],
"unread_notifications": unread_count if not isinstance(unread_count, Exception) else 0
}
return profile
async def _call_user_service(self, endpoint: str, *args):
instance = await self.registry.get_healthy_instance("user-service")
# ... реализация вызова
async def _call_order_service(self, endpoint: str, *args):
instance = await self.registry.get_healthy_instance("order-service")
# ... реализация вызова
async def _call_notification_service(self, endpoint: str, *args):
instance = await self.registry.get_healthy_instance("notification-service")
# ... реализация вызова
gateway = APIGateway(registry)
@mcp.tool()
async def get_full_user_profile(user_id: str) -> dict:
"""Получение полного профиля пользователя (агрегация из микросервисов)."""
return await gateway.get_user_profile(user_id)import aio_pika
from mcp.server.fastmcp import FastMCP
import asyncio
import json
mcp = FastMCP("RabbitMQ Integration")
class RabbitMQClient:
"""Клиент для RabbitMQ."""
def __init__(self, url: str):
self.url = url
self._connection = None
self._channel = None
async def connect(self):
"""Подключение к RabbitMQ."""
self._connection = await aio_pika.connect_robust(self.url)
self._channel = await self._connection.channel()
async def close(self):
"""Закрытие подключения."""
if self._connection:
await self._connection.close()
async def publish_message(
self,
exchange: str,
routing_key: str,
message: dict
) -> str:
"""Публикация сообщения."""
if not self._channel:
await self.connect()
body = json.dumps(message).encode()
message_obj = aio_pika.Message(body=body)
exchange_obj = await self._channel.get_exchange(exchange)
await exchange_obj.publish(message_obj, routing_key=routing_key)
return message_obj.message_id
async def subscribe_queue(
self,
queue_name: str,
callback
):
"""Подписка на очередь."""
if not self._channel:
await self.connect()
queue = await self._channel.get_queue(queue_name)
await queue.consume(callback)
rabbitmq = RabbitMQClient(url="amqp://guest:guest@rabbitmq/")
@mcp.tool()
async def publish_event(
event_type: str,
payload: dict
) -> str:
"""
Публикация события в очередь.
Args:
event_type: Тип события (order.created, user.updated)
payload: Данные события
"""
message = {
"event_type": event_type,
"payload": payload,
"timestamp": asyncio.get_event_loop().time()
}
message_id = await rabbitmq.publish_message(
exchange="events",
routing_key=event_type,
message=message
)
return f"Event published with id: {message_id}"
@mcp.tool()
async def get_queue_length(queue_name: str) -> int:
"""Получение длины очереди."""
if not rabbitmq._channel:
await rabbitmq.connect()
queue = await rabbitmq._channel.get_queue(queue_name)
return queue.declaration_result.message_countfrom aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json
class KafkaClient:
"""Клиент для Apache Kafka."""
def __init__(self, brokers: list):
self.brokers = brokers
self._producer = None
async def get_producer(self) -> AIOKafkaProducer:
if not self._producer:
self._producer = AIOKafkaProducer(
bootstrap_servers=','.join(self.brokers),
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
await self._producer.start()
return self._producer
async def close(self):
if self._producer:
await self._producer.stop()
async def send_message(
self,
topic: str,
key: str,
value: dict
):
"""Отправка сообщения в Kafka."""
producer = await self.get_producer()
await producer.send_and_wait(
topic,
key=key.encode('utf-8'),
value=value
)
kafka = KafkaClient(brokers=["kafka1:9092", "kafka2:9092"])
@mcp.tool()
async def send_kafka_message(
topic: str,
key: str,
message: dict
) -> str:
"""
Отправка сообщения в Kafka топик.
Args:
topic: Название топика
key: Ключ сообщения
message: Данные сообщения
"""
await kafka.send_message(topic, key, message)
return f"Message sent to {topic} with key {key}"from dataclasses import dataclass, asdict
from datetime import datetime
from typing import List
import json
@dataclass
class Event:
"""Базовое событие."""
event_id: str
event_type: str
aggregate_id: str
timestamp: str
payload: dict
class EventStore:
"""Хранилище событий."""
def __init__(self, db_pool):
self.db_pool = db_pool
async def append_event(self, event: Event):
"""Добавление события."""
async with self.db_pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO events
(event_id, event_type, aggregate_id, timestamp, payload)
VALUES ($1, $2, $3, $4, $5)
""",
event.event_id,
event.event_type,
event.aggregate_id,
event.timestamp,
json.dumps(event.payload)
)
async def get_events(
self,
aggregate_id: str,
from_version: int = 0
) -> List[Event]:
"""Получение событий агрегата."""
async with self.db_pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT * FROM events
WHERE aggregate_id = $1
ORDER BY timestamp
OFFSET $2
""",
aggregate_id,
from_version
)
return [
Event(
event_id=row["event_id"],
event_type=row["event_type"],
aggregate_id=row["aggregate_id"],
timestamp=row["timestamp"],
payload=json.loads(row["payload"])
)
for row in rows
]
event_store = EventStore(db_pool)
@mcp.tool()
async def create_order_event(
order_id: str,
customer_id: str,
items: list
) -> str:
"""Создание события OrderCreated."""
import uuid
event = Event(
event_id=str(uuid.uuid4()),
event_type="OrderCreated",
aggregate_id=order_id,
timestamp=datetime.utcnow().isoformat(),
payload={
"customer_id": customer_id,
"items": items
}
)
await event_store.append_event(event)
return f"Event appended for order {order_id}"
@mcp.tool()
async def get_order_history(order_id: str) -> list:
"""Получение истории событий заказа."""
events = await event_store.get_events(order_id)
return [asdict(e) for e in events]class CommandHandler:
"""Обработчик команд (запись)."""
async def handle_create_order(self, command: dict) -> str:
"""Обработка команды создания заказа."""
# Валидация
# Бизнес-логика
# События
return order_id
async def handle_update_order(self, command: dict) -> str:
"""Обработка команды обновления заказа."""
pass
class QueryHandler:
"""Обработчик запросов (чтение)."""
async def get_order(self, order_id: str) -> dict:
"""Получение заказа из read модели."""
async with db_pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT * FROM order_read_model WHERE order_id = $1",
order_id
)
return dict(row) if row else None
async def search_orders(
self,
customer_id: str = None,
status: str = None
) -> list:
"""Поиск заказов в read модели."""
query = "SELECT * FROM order_read_model WHERE 1=1"
params = []
if customer_id:
query += " AND customer_id = $" + str(len(params) + 1)
params.append(customer_id)
if status:
query += " AND status = $" + str(len(params) + 1)
params.append(status)
async with db_pool.acquire() as conn:
rows = await conn.fetch(query, *params)
return [dict(r) for r in rows]
command_handler = CommandHandler()
query_handler = QueryHandler()
@mcp.tool()
async def create_order_command(
customer_id: str,
items: list
) -> str:
"""Команда: Создать заказ."""
return await command_handler.handle_create_order({
"customer_id": customer_id,
"items": items
})
@mcp.tool()
async def get_order_query(order_id: str) -> dict:
"""Запрос: Получить заказ."""
return await query_handler.get_order(order_id)
@mcp.tool()
async def search_orders_query(
customer_id: str = None,
status: str = None
) -> list:
"""Запрос: Поиск заказов."""
return await query_handler.search_orders(customer_id, status)import aioboto3
class AWSClient:
"""Клиент для AWS сервисов."""
def __init__(self, aws_access_key: str, aws_secret_key: str, region: str):
self.session = aioboto3.Session(
aws_access_key_id=aws_access_key,
aws_secret_access_key=aws_secret_key,
region_name=region
)
async def invoke_lambda(
self,
function_name: str,
payload: dict
) -> dict:
"""Вызов AWS Lambda функции."""
async with self.session.client('lambda') as client:
response = await client.invoke(
FunctionName=function_name,
Payload=json.dumps(payload)
)
result = await response['Payload'].read()
return json.loads(result)
async def get_s3_object(
self,
bucket: str,
key: str
) -> str:
"""Получение объекта из S3."""
async with self.session.client('s3') as client:
response = await client.get_object(Bucket=bucket, Key=key)
return (await response['Body'].read()).decode('utf-8')
aws = AWSClient(
aws_access_key=os.getenv("AWS_ACCESS_KEY"),
aws_secret_key=os.getenv("AWS_SECRET_KEY"),
region="us-east-1"
)
@mcp.tool()
async def invoke_aws_lambda(
function_name: str,
payload: dict
) -> dict:
"""Вызов AWS Lambda функции."""
return await aws.invoke_lambda(function_name, payload)
@mcp.tool()
async def get_s3_file(bucket: str, key: str) -> str:
"""Получение файла из S3."""
return await aws.get_s3_object(bucket, key)| Интеграция | Паттерн | Инструменты |
|---|---|---|
| Legacy | Adapter | zeep (SOAP), REST шлюзы |
| Микросервисы | Service Discovery, API Gateway | Consul, aiohttp |
| Очереди | Message Queue | RabbitMQ (aio_pika), Kafka (aiokafka) |
| Event-Driven | Event Sourcing, CQRS | PostgreSQL, Redis |
| Облака | Cloud Services | aioboto3 (AWS), azure-sdk |
Следующая тема: Production Best Practices — масштабирование, отказоустойчивость, CI/CD, документирование.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.