Три столпа observability, метрики, алертинг, distributed tracing, structured logging.
Observability — это способность понять внутреннее состояние системы по её внешним выходам. В распределённых системах это не роскошь, а необходимость: без observability вы летите вслепую.
Представьте ситуацию: пользователи жалуются на медленные запросы, но все метрики в норме. CPU в порядке, память в порядке, база данных отвечает быстро. Что происходит?
Без proper observability вы будете:
print() в код и деплоить в productionС proper observability вы:
┌─────────────────────────────────────────────────────────────┐
│ Observability Stack │
├─────────────────┬─────────────────┬─────────────────────────┤
│ Metrics │ Logging │ Tracing │
│ (Prometheus) │ (ELK) │ (Jaeger/Zipkin) │
│ │ │ │
│ - Что сломалось │ - Почему │ - Где │
│ - Когда │ - Контекст │ - Какой путь │
│ - Насколько │ - Детали │ - Задержки по шагам │
└─────────────────┴─────────────────┴─────────────────────────┘
Логи — это записи о событиях, произошедших в системе.
2026-03-03T10:15:32.451Z INFO [order-service] Order created: order_id=12345 user_id=67890
2026-03-03T10:15:33.102Z ERROR [payment-service] Payment failed: order_id=12345 error="insufficient_funds"
Хорошие логи:
Плохие логи:
# НЕ ДЕЛАЙТЕ ТАК
print(f"Order {order_id} created for user {user_id}") # Plain text, нет уровня
logger.error("Error occurred") # Какой error? Где? Почему?
logger.debug(data) # Логирование сырых данных без контекста
Метрики — это числовые измерения состояния системы во времени.
http_requests_total{method="POST", endpoint="/orders", status="200"} 15234
http_request_duration_seconds{endpoint="/orders", quantile="0.95"} 0.245
payment_processing_errors_total{reason="insufficient_funds"} 12
active_connections{service="database"} 45
Типы метрик:
Трейсы — это запись пути запроса через распределённую систему.
Request: POST /orders
Trace ID: abc123def456
┌─────────────────────────────────────────────────────────────┐
│ API Gateway (15ms) │
│ └─┬─ Auth Service (8ms) │
│ └─┬─ Order Service (45ms) │
│ ├─┬─ User Service (12ms) — проверка пользователя │
│ ├─┬─ Inventory Service (23ms) — резервирование │
│ └─┬─ Payment Service (89ms) ⚠️ SLOW! │
│ └─┬─ External Payment Gateway (85ms) │
└─────────────────────────────────────────────────────────────┘
Total latency: 152ms
Bottleneck: Payment Service → External Gateway (85ms)
Span — единица трейса, представляет одну операцию. Trace ID — уникальный идентификатор всего запроса. Parent Span ID — связь между спанами (дерево вызовов).
# app/logging_config.py
import logging
import sys
import json
from datetime import datetime
from typing import Any, Dict
class JSONFormatter(logging.Formatter):
"""
Форматтер для структурированных JSON-логов.
Каждый лог — это JSON-объект с метаданными.
"""
def format(self, record: logging.LogRecord) -> str:
log_entry: Dict[str, Any] = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno,
}
# Добавляем exception info если есть
if record.exc_info:
log_entry["exception"] = self.formatException(record.exc_info)
# Добавляем extra поля (correlation_id, user_id, etc.)
for key, value in record.__dict__.items():
if key not in {
"name", "msg", "args", "created", "filename", "funcName",
"levelname", "levelno", "lineno", "module", "msecs",
"pathname", "process", "processName", "relativeCreated",
"stack_info", "exc_info", "exc_text", "thread", "threadName"
}:
log_entry[key] = value
return json.dumps(log_entry, default=str)
def setup_logging(
service_name: str,
log_level: str = "INFO",
include_correlation_id: bool = True
) -> logging.Logger:
"""
Настройка структурированного логирования для сервиса.
Args:
service_name: Имя сервиса для идентификации в логах
log_level: Уровень логирования (DEBUG, INFO, WARNING, ERROR)
include_correlation_id: Добавлять ли correlation_id в логи
Returns:
Настроенный logger
"""
logger = logging.getLogger(service_name)
logger.setLevel(getattr(logging, log_level.upper()))
# Console handler с JSON форматом
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
# Добавляем service_name во все логи
extra = {"service": service_name}
if include_correlation_id:
extra["correlation_id"] = "not-set" # Будет переопределён в request
# Создаём адаптер для автоматического добавления extra полей
return logging.LoggerAdapter(logger, extra)
# Использование
logger = setup_logging("order-service")# app/middleware/correlation.py
import uuid
import logging
from contextvars import ContextVar
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
# ContextVar хранит correlation_id в контексте asyncio задачи
# Это позволяет получать ID в любом месте кода без передачи параметра
correlation_id_var: ContextVar[str] = ContextVar("correlation_id", default="")
class CorrelationIDMiddleware(BaseHTTPMiddleware):
"""
Middleware добавляет correlation_id к каждому запросу.
ID используется для трассировки запроса через все сервисы.
"""
async def dispatch(self, request: Request, call_next) -> Response:
# Берём correlation_id из заголовка (если пришёл от другого сервиса)
# или генерируем новый
correlation_id = request.headers.get(
"X-Correlation-ID",
str(uuid.uuid4())
)
# Устанавливаем в context var
correlation_id_var.set(correlation_id)
# Логгируем начало запроса
logger = logging.getLogger(request.app.title)
logger.info(
f"Incoming request: {request.method} {request.url.path}",
extra={
"correlation_id": correlation_id,
"method": request.method,
"path": request.url.path,
"client_ip": request.client.host if request.client else "unknown"
}
)
# Вызываем следующий middleware / handler
response = await call_next(request)
# Добавляем correlation_id в ответ (для клиента)
response.headers["X-Correlation-ID"] = correlation_id
# Логгируем завершение запроса
logger.info(
f"Outgoing response: {response.status_code}",
extra={
"correlation_id": correlation_id,
"status_code": response.status_code
}
)
return response
def get_correlation_id() -> str:
"""Получить текущий correlation_id из контекста."""
return correlation_id_var.get()
def inject_correlation_id(record: logging.LogRecord) -> None:
"""
Фильтр для автоматического добавления correlation_id в логи.
Используется с logging.Filter.
"""
record.correlation_id = correlation_id_var.get()
return True# app/services/order_service.py
import logging
from typing import List, Optional
from datetime import datetime
from pydantic import BaseModel
from app.logging_config import setup_logging
from app.middleware.correlation import get_correlation_id
logger = setup_logging("order-service")
class OrderItem(BaseModel):
product_id: int
quantity: int
price: float
class OrderCreate(BaseModel):
user_id: int
items: List[OrderItem]
total_amount: float
class OrderService:
def __init__(self, db_connection, inventory_client, payment_client):
self.db = db_connection
self.inventory = inventory_client
self.payment = payment_client
async def create_order(self, order_data: OrderCreate) -> dict:
"""
Создание заказа с полным логированием каждого шага.
"""
correlation_id = get_correlation_id()
logger.info(
"Starting order creation",
extra={
"correlation_id": correlation_id,
"user_id": order_data.user_id,
"total_amount": order_data.total_amount,
"items_count": len(order_data.items)
}
)
try:
# Шаг 1: Проверка пользователя
user = await self._get_user(order_data.user_id)
if not user:
logger.warning(
"User not found for order",
extra={
"correlation_id": correlation_id,
"user_id": order_data.user_id
}
)
return {"error": "user_not_found", "status_code": 400}
# Шаг 2: Резервирование товаров
logger.debug(
"Reserving inventory",
extra={
"correlation_id": correlation_id,
"products": [item.product_id for item in order_data.items]
}
)
inventory_reserved = await self.inventory.reserve_items(
order_data.items,
correlation_id=correlation_id
)
if not inventory_reserved:
logger.warning(
"Inventory reservation failed",
extra={
"correlation_id": correlation_id,
"user_id": order_data.user_id
}
)
return {"error": "inventory_unavailable", "status_code": 409}
# Шаг 3: Обработка платежа
logger.info(
"Processing payment",
extra={
"correlation_id": correlation_id,
"amount": order_data.total_amount
}
)
payment_result = await self.payment.charge(
user_id=order_data.user_id,
amount=order_data.total_amount,
correlation_id=correlation_id
)
if not payment_result.success:
logger.error(
"Payment failed",
extra={
"correlation_id": correlation_id,
"user_id": order_data.user_id,
"error": payment_result.error_message,
"error_code": payment_result.error_code
}
)
# Компенсирующая транзакция: отмена резерва
await self.inventory.release_reservation(
order_data.items,
correlation_id=correlation_id
)
return {
"error": "payment_failed",
"details": payment_result.error_message,
"status_code": 402
}
# Шаг 4: Сохранение заказа в БД
order = await self._save_order(order_data, user.id)
logger.info(
"Order created successfully",
extra={
"correlation_id": correlation_id,
"order_id": order.id,
"user_id": order_data.user_id,
"total_amount": order_data.total_amount
}
)
# Шаг 5: Публикация события
await self._publish_order_created_event(order)
return {"order_id": order.id, "status": "created"}
except Exception as e:
logger.exception(
"Unexpected error during order creation",
extra={
"correlation_id": correlation_id,
"user_id": order_data.user_id,
"error_type": type(e).__name__
}
)
raise
async def _get_user(self, user_id: int) -> Optional[dict]:
# ... реализация ...
pass
async def _save_order(self, order_data: OrderCreate, user_id: int) -> dict:
# ... реализация ...
pass
async def _publish_order_created_event(self, order: dict):
# ... реализация ...
pass# logging_config.yaml
version: 1
disable_existing_loggers: false
formatters:
json:
class: pythonjsonlogger.jsonlogger.JsonFormatter
format: >
%(timestamp)s %(level)s %(service)s %(correlation_id)s
%(name)s %(funcName)s %(lineno)d %(message)s
handlers:
console:
class: logging.StreamHandler
formatter: json
stream: ext://sys.stdout
level: INFO
file:
class: logging.handlers.RotatingFileHandler
formatter: json
filename: /var/log/order-service/app.log
maxBytes: 104857600 # 100MB
backupCount: 10
level: DEBUG
error_file:
class: logging.handlers.RotatingFileHandler
formatter: json
filename: /var/log/order-service/error.log
maxBytes: 104857600
backupCount: 10
level: ERROR
loggers:
order-service:
level: INFO
handlers: [console, file, error_file]
propagate: false
# Тихие логи для noisy библиотек
uvicorn.access:
level: WARNING
handlers: [console]
propagate: false
sqlalchemy.engine:
level: WARNING
handlers: [console]
propagate: false
root:
level: INFO
handlers: [console]# Применение конфигурации
import logging.config
import yaml
with open("logging_config.yaml") as f:
config = yaml.safe_load(f)
logging.config.dictConfig(config)
logger = logging.getLogger("order-service")Prometheus — это time-series база данных и система мониторинга.
Архитектура:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Application │ │ Prometheus │ │ Grafana │
│ (instrumented)│───►│ Server │───►│ Dashboard │
│ │ │ │ │ │
│ /metrics │ │ - Pull metrics │ │ - Visualization │
│ Prometheus │ │ - Storage │ │ - Alerting UI │
│ Client │ │ - Query (PromQL)│ │ │
└─────────────────┘ └─────────────────┘ └─────────────────┘
Модель данных:
# app/metrics.py
from prometheus_client import (
Counter,
Gauge,
Histogram,
Summary,
generate_latest,
CONTENT_TYPE_LATEST,
CollectorRegistry,
multiprocess,
CollectorRegistry
)
from prometheus_client import start_http_server as start_metrics_server
import time
from functools import wraps
from typing import Callable, Any
import logging
logger = logging.getLogger(__name__)
# === COUNTER: только растёт ===
# Общее количество HTTP запросов
HTTP_REQUESTS_TOTAL = Counter(
"http_requests_total",
"Total number of HTTP requests",
labelnames=["method", "endpoint", "status"]
)
# Количество ошибок по типам
ERRORS_TOTAL = Counter(
"errors_total",
"Total number of errors",
labelnames=["error_type", "service"]
)
# Количество созданных заказов
ORDERS_CREATED_TOTAL = Counter(
"orders_created_total",
"Total number of orders created",
labelnames=["status"] # success, payment_failed, inventory_unavailable
)
# === GAUGE: может расти и падать ===
# Количество активных соединений к БД
DB_ACTIVE_CONNECTIONS = Gauge(
"database_active_connections",
"Number of active database connections",
labelnames=["pool_name"]
)
# Количество элементов в очереди
QUEUE_SIZE = Gauge(
"queue_size",
"Number of items in queue",
labelnames=["queue_name"]
)
# Количество активных пользователей
ACTIVE_USERS = Gauge(
"active_users",
"Number of currently active users"
)
# === HISTOGRAM: распределение значений ===
# Время выполнения HTTP запросов
HTTP_REQUEST_DURATION = Histogram(
"http_request_duration_seconds",
"HTTP request duration in seconds",
labelnames=["method", "endpoint"],
buckets=[
0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75,
1.0, 2.5, 5.0, 7.5, 10.0, float("inf")
]
)
# Время обработки платежа
PAYMENT_PROCESSING_TIME = Histogram(
"payment_processing_time_seconds",
"Time spent processing payments",
labelnames=["payment_provider"],
buckets=[0.1, 0.5, 1.0, 2.5, 5.0, 10.0, float("inf")]
)
# === SUMMARY: перцентили ===
REQUEST_LATENCY = Summary(
"request_latency_seconds",
"Request latency in seconds",
labelnames=["service", "operation"]
)
def track_request_duration(
method: str,
endpoint: str
) -> Callable:
"""
Декоратор для автоматического трекинга длительности запросов.
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs) -> Any:
start_time = time.time()
status = "200"
try:
result = await func(*args, **kwargs)
if hasattr(result, "status_code"):
status = str(result.status_code)
return result
except Exception as e:
status = "500"
ERRORS_TOTAL.labels(error_type=type(e).__name__, service="order-service").inc()
raise
finally:
duration = time.time() - start_time
HTTP_REQUEST_DURATION.labels(method=method, endpoint=endpoint).observe(duration)
HTTP_REQUESTS_TOTAL.labels(method=method, endpoint=endpoint, status=status).inc()
return wrapper
return decorator
def setup_metrics_endpoint(app, port: int = 8000):
"""
Настройка endpoint для Prometheus scraping.
"""
from fastapi import Response
@app.get("/metrics")
async def metrics():
"""Endpoint для Prometheus scraper."""
return Response(
content=generate_latest(),
media_type=CONTENT_TYPE_LATEST
)
# Запуск отдельного сервера для метрик (опционально)
# start_metrics_server(port)
logger.info(f"Metrics endpoint available at /metrics")# app/services/order_service_with_metrics.py
import logging
from prometheus_client import Counter, Histogram
from typing import List, Optional
logger = logging.getLogger(__name__)
# Бизнес-метрики
ORDER_VALUE_HISTOGRAM = Histogram(
"order_value_usd",
"Distribution of order values in USD",
buckets=[10, 50, 100, 250, 500, 1000, 2500, 5000, float("inf")]
)
PAYMENT_RETRIES_COUNTER = Counter(
"payment_retries_total",
"Total number of payment retry attempts",
labelnames=["payment_provider", "final_status"] # success, failed
)
INVENTORY_CHECK_DURATION = Histogram(
"inventory_check_duration_seconds",
"Time spent checking inventory availability",
labelnames=["items_count"]
)
class OrderServiceWithMetrics:
def __init__(self, db, inventory_client, payment_client):
self.db = db
self.inventory = inventory_client
self.payment = payment_client
async def create_order(self, order_data) -> dict:
"""Создание заказа с бизнес-метриками."""
# Метрика: размер заказа
ORDER_VALUE_HISTOGRAM.observe(order_data.total_amount)
# Метрика: время проверки инвентаря
start_time = time.time()
items_available = await self.inventory.check_availability(order_data.items)
INVENTORY_CHECK_DURATION.labels(
items_count=len(order_data.items)
).observe(time.time() - start_time)
if not items_available:
# Метрика: причина неудачи
ERRORS_TOTAL.labels(
error_type="inventory_unavailable",
service="order-service"
).inc()
return {"error": "inventory_unavailable"}
# Метрика: retries при оплате
max_retries = 3
for attempt in range(max_retries):
try:
payment_result = await self.payment.charge(
user_id=order_data.user_id,
amount=order_data.total_amount
)
if payment_result.success:
if attempt > 0:
PAYMENT_RETRIES_COUNTER.labels(
payment_provider=self.payment.provider,
final_status="success"
).inc(attempt)
break
except PaymentGatewayTimeout:
if attempt == max_retries - 1:
PAYMENT_RETRIES_COUNTER.labels(
payment_provider=self.payment.provider,
final_status="failed"
).inc(attempt)
raise
await asyncio.sleep(2 ** attempt) # Exponential backoff
return {"order_id": order.id, "status": "created"}# prometheus.yml
global:
scrape_interval: 15s # Как часто опрашивать targets
evaluation_interval: 15s # Как часто оценивать правила алертинга
scrape_configs:
# Prometheus сам себя мониторит
- job_name: 'prometheus'
static_configs:
- targets: ['localhost:9090']
# Order Service
- job_name: 'order-service'
static_configs:
- targets: ['order-service:8000']
metrics_path: /metrics
scrape_interval: 10s # Более частый scrape для критичных сервисов
# User Service
- job_name: 'user-service'
static_configs:
- targets: ['user-service:8000']
# Payment Service
- job_name: 'payment-service'
static_configs:
- targets: ['payment-service:8000']
# Node Exporter для метрик хоста (CPU, память, диск)
- job_name: 'node-exporter'
static_configs:
- targets: ['node-exporter:9100']
# Правила алертинга
rule_files:
- "alerts.yml"
# Alertmanager для отправки уведомлений
alerting:
alertmanagers:
- static_configs:
- targets: ['alertmanager:9093']# alerts.yml
groups:
- name: order-service-alerts
rules:
# Высокий уровень ошибок (>5% запросов возвращают 5xx)
- alert: HighErrorRate
expr: |
sum(rate(http_requests_total{status=~"5.."}[5m]))
/
sum(rate(http_requests_total[5m]))
> 0.05
for: 5m
labels:
severity: critical
annotations:
summary: "High error rate detected"
description: "Error rate is {{ $value | humanizePercentage }} over the last 5 minutes"
# Высокая латентность (p95 > 1s)
- alert: HighLatency
expr: |
histogram_quantile(0.95,
sum(rate(http_request_duration_seconds_bucket[5m])) by (le, endpoint)
) > 1
for: 10m
labels:
severity: warning
annotations:
summary: "High latency detected"
description: "95th percentile latency is {{ $value }}s for endpoint {{ $labels.endpoint }}"
# Сервис недоступен
- alert: ServiceDown
expr: up{job="order-service"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Service {{ $labels.job }} is down"
description: "{{ $labels.job }} has been down for more than 1 minute"
# Мало свободной памяти
- alert: LowMemory
expr: |
(node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) * 100 < 10
for: 5m
labels:
severity: warning
annotations:
summary: "Low memory on {{ $labels.instance }}"
description: "Memory usage is above 90%"
# Очередь заказов растёт
- alert: QueueGrowing
expr: |
delta(queue_size{queue_name="orders"}[10m]) > 100
for: 5m
labels:
severity: warning
annotations:
summary: "Order queue is growing"
description: "Queue size increased by {{ $value }} in 10 minutes"
- name: database-alerts
rules:
# Много активных соединений
- alert: DatabaseConnectionPoolExhausted
expr: |
database_active_connections / database_max_connections > 0.9
for: 5m
labels:
severity: critical
annotations:
summary: "Database connection pool nearly exhausted"
description: "{{ $value | humanizePercentage }} of connections in use"
# Медленные запросы
- alert: SlowQueries
expr: |
rate(database_query_duration_seconds_bucket{le="1.0"}[5m])
/
rate(database_query_duration_seconds_count[5m])
< 0.95
for: 10m
labels:
severity: warning
annotations:
summary: "Many slow database queries"
description: "More than 5% of queries take longer than 1 second"Проблема: Слишком много алертов → команда игнорирует все алерты.
Решения:
# Стратегии борьбы с alert fatigue
# 1. Используйте for: для устойчивости к кратковременным спайкам
- alert: HighErrorRate
expr: error_rate > 0.05
for: 5m # Ждём 5 минут перед алертом
# 2. Разделяйте severity levels
labels:
severity: critical # Звонит PagerDuty, будит ночью
severity: warning # Создаёт тикет, не будит
severity: info # Только логгируется
# 3. Используйте inhibition rules (подавление алертов)
# Если ServiceDown, не алертить о HighLatency для того же сервиса
inhibit_rules:
- source_match:
alertname: ServiceDown
target_match:
alertname: HighLatency
equal: ['service']
# 4. Группировка алертов
# group_wait: 30s — ждать 30s перед отправкой первой группы
# group_interval: 5m — ждать 5m перед отправкой новых алертов в группу
# repeat_interval: 4h — повторять алерт каждые 4 часа если не resolved
group:
group_wait: 30s
group_interval: 5m
repeat_interval: 4h
receiver: 'slack-notifications'# alertmanager.yml
global:
smtp_smarthost: 'smtp.company.com:587'
smtp_from: 'alertmanager@company.com'
slack_api_url: 'https://hooks.slack.com/services/XXX/YYY/ZZZ'
route:
# Маршрутизация по severity
receiver: 'slack-info'
group_by: ['alertname', 'service']
group_wait: 30s
group_interval: 5m
repeat_interval: 4h
routes:
- match:
severity: critical
receiver: 'pagerduty-critical'
continue: true # Продолжить проверку следующих routes
- match:
severity: warning
receiver: 'slack-warnings'
- match:
severity: info
receiver: 'slack-info'
receivers:
- name: 'pagerduty-critical'
pagerduty_configs:
- service_key: 'YOUR_PAGERDUTY_KEY'
severity: critical
- name: 'slack-warnings'
slack_configs:
- channel: '#alerts-warnings'
send_resolved: true
- name: 'slack-info'
slack_configs:
- channel: '#alerts-info'
send_resolved: false # Не спамить о resolved info алертахПроблема: В микросервисной архитектуре запрос проходит через 5-10 сервисов. Где именно произошла задержка?
Решение: Distributed tracing добавляет уникальный trace_id к запросу и отслеживает каждый шаг.
Запрос: POST /orders
Trace ID: 4bf92f3577b34da6a3ce929d0e0e4736
Span 1: API Gateway
├─ Span ID: 00f067aa0ba902b7
├─ Duration: 15ms
└─ Children: [Span 2]
Span 2: Order Service
├─ Span ID: 00f067aa0ba902b8
├─ Parent: 00f067aa0ba902b7
├─ Duration: 145ms
└─ Children: [Span 3, Span 4, Span 5]
Span 3: User Service (проверка пользователя)
├─ Span ID: 00f067aa0ba902b9
├─ Parent: 00f067aa0ba902b8
└─ Duration: 12ms
Span 4: Inventory Service (резервирование)
├─ Span ID: 00f067aa0ba902ba
├─ Parent: 00f067aa0ba902b8
└─ Duration: 45ms
Span 5: Payment Service (обработка платежа) ⚠️ SLOW
├─ Span ID: 00f067aa0ba902bb
├─ Parent: 00f067aa0ba902b8
├─ Duration: 89ms
└─ Children: [Span 6]
Span 6: External Payment Gateway
├─ Span ID: 00f067aa0ba902bc
├─ Parent: 00f067aa0ba902bb
└─ Duration: 85ms ← Bottleneck найден!
# app/telemetry.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
ConsoleSpanExporter,
)
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.exporter.zipkin.json import ZipkinExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.propagate import set_global_textmap
from opentelemetry.propagators.b3 import B3MultiFormat
import logging
logger = logging.getLogger(__name__)
def setup_opentelemetry(
service_name: str,
jaeger_endpoint: str = "http://jaeger:14268/api/traces",
sample_rate: float = 1.0
) -> trace.Tracer:
"""
Настройка OpenTelemetry для распределённой трассировки.
Args:
service_name: Имя сервиса для идентификации в трейсах
jaeger_endpoint: URL Jaeger collector
sample_rate: Процент запросов для трассировки (1.0 = 100%)
"""
# Настройка провайдера трейсов
provider = TracerProvider()
# Настройка сэмплирования (чтобы не трассировать 100% запросов в production)
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio
provider.sampler = ParentBasedTraceIdRatio(sample_rate)
# Экспорт в Jaeger
jaeger_exporter = JaegerExporter(
endpoint=jaeger_endpoint,
username=None,
password=None,
)
# Batch processor для эффективности
span_processor = BatchSpanProcessor(jaeger_exporter)
provider.add_span_processor(span_processor)
# Для отладки можно добавить console exporter
# provider.add_span_processor(
# BatchSpanProcessor(ConsoleSpanExporter())
# )
# Устанавливаем провайдер глобально
trace.set_tracer_provider(provider)
# Настройка propagation (для передачи trace context между сервисами)
# B3 format используется Zipkin и Jaeger
set_global_textmap(B3MultiFormat())
# Инструментация фреймворков
FastAPIInstrumentor.instrument_app()
HTTPXClientInstrumentor().instrument()
SQLAlchemyInstrumentor().instrument()
RedisInstrumentor().instrument()
logger.info(f"OpenTelemetry configured for {service_name}")
return trace.get_tracer(service_name)
# Глобальный tracer для использования в приложении
tracer = None
def init_telemetry(service_name: str, **kwargs):
global tracer
tracer = setup_opentelemetry(service_name, **kwargs)
return tracer# app/services/order_service_traced.py
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
from opentelemetry.semconv.trace import SpanAttributes
import logging
logger = logging.getLogger(__name__)
tracer = trace.get_tracer("order-service")
class OrderServiceTraced:
def __init__(self, db, inventory_client, payment_client):
self.db = db
self.inventory = inventory_client
self.payment = payment_client
async def create_order(self, order_data) -> dict:
"""
Создание заказа с ручной трассировкой.
"""
# Получаем текущий контекст (trace_id, span_id из входящего запроса)
current_span = trace.get_current_span()
with tracer.start_as_current_span(
"create_order",
kind=trace.SpanKind.INTERNAL,
attributes={
SpanAttributes.DB_SYSTEM: "postgresql",
"order.user_id": order_data.user_id,
"order.total_amount": order_data.total_amount,
"order.items_count": len(order_data.items)
}
) as span:
try:
# Шаг 1: Проверка пользователя
user = await self._check_user_with_trace(order_data.user_id)
if not user:
span.set_attribute("order.user_found", False)
span.set_status(Status(StatusCode.ERROR, "User not found"))
return {"error": "user_not_found", "status_code": 400}
span.set_attribute("order.user_found", True)
# Шаг 2: Резервирование инвентаря
with tracer.start_as_current_span("reserve_inventory") as inventory_span:
inventory_span.set_attribute("inventory.items_count", len(order_data.items))
reserved = await self.inventory.reserve_items(
order_data.items
)
inventory_span.set_attribute(
"inventory.reserved",
reserved
)
if not reserved:
inventory_span.set_status(
Status(StatusCode.ERROR, "Inventory unavailable")
)
return {"error": "inventory_unavailable", "status_code": 409}
# Шаг 3: Обработка платежа
with tracer.start_as_current_span("process_payment") as payment_span:
payment_span.set_attribute(
"payment.amount",
order_data.total_amount
)
payment_result = await self.payment.charge(
user_id=order_data.user_id,
amount=order_data.total_amount
)
payment_span.set_attribute(
"payment.success",
payment_result.success
)
if not payment_result.success:
payment_span.set_status(
Status(StatusCode.ERROR, payment_result.error_message)
)
payment_span.set_attribute(
"payment.error_code",
payment_result.error_code
)
# Компенсирующая транзакция
await self._release_inventory_with_trace(order_data.items)
return {
"error": "payment_failed",
"details": payment_result.error_message,
"status_code": 402
}
# Шаг 4: Сохранение заказа
with tracer.start_as_current_span("save_order") as save_span:
order = await self._save_order(order_data, user.id)
save_span.set_attribute("order.id", order.id)
span.set_attribute("order.id", order.id)
span.set_status(Status(StatusCode.OK))
logger.info(f"Order created: {order.id}")
return {"order_id": order.id, "status": "created"}
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
logger.exception("Error creating order")
raise
async def _check_user_with_trace(self, user_id: int):
with tracer.start_as_current_span("check_user") as span:
span.set_attribute("user.id", user_id)
# ... реализация ...
pass
async def _release_inventory_with_trace(self, items):
with tracer.start_as_current_span("release_inventory") as span:
span.set_attribute("inventory.items_count", len(items))
await self.inventory.release_reservation(items)# app/middleware/tracing.py
from opentelemetry import trace, context
from opentelemetry.propagate import extract, inject
from opentelemetry.trace import Link
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
class OpenTelemetryMiddleware(BaseHTTPMiddleware):
"""
Middleware для извлечения и внедрения trace context.
Автоматически передаёт trace_id между сервисами.
"""
async def dispatch(self, request: Request, call_next):
# Извлекаем trace context из заголовков входящего запроса
# (если запрос пришёл от другого сервиса)
ctx = extract(request.headers)
# Активируем контекст для этого запроса
token = context.attach(ctx)
try:
# Получаем текущий span (созданный FastAPIInstrumentor)
current_span = trace.get_current_span()
# Добавляем дополнительную информацию в span
current_span.set_attribute("http.method", request.method)
current_span.set_attribute("http.url", str(request.url))
current_span.set_attribute("http.client_ip", request.client.host if request.client else "")
# Вызываем следующий middleware / handler
response = await call_next(request)
# Добавляем статус код в span
current_span.set_attribute("http.status_code", response.status_code)
# Внедряем trace context в заголовки ответа
# (для downstream сервисов)
headers = {}
inject(headers)
for key, value in headers.items():
response.headers[key] = value
return response
finally:
context.detach(token)
# Пример propagation при вызове другого сервиса
# app/clients/http_client.py
import httpx
from opentelemetry.propagate import inject
async def call_user_service(user_id: int, correlation_id: str) -> dict:
"""
Вызов User Service с propagation trace context.
"""
headers = {
"X-Correlation-ID": correlation_id
}
# Внедряем trace context в заголовки
inject(headers)
async with httpx.AsyncClient() as client:
response = await client.get(
f"http://user-service:8000/users/{user_id}",
headers=headers
)
response.raise_for_status()
return response.json()# docker-compose.yml для Jaeger
version: '3.8'
services:
jaeger:
image: jaegertracing/all-in-one:1.45
ports:
- "16686:16686" # UI
- "14268:14268" # Thrift collector
- "4317:4317" # OTLP gRPC
- "4318:4318" # OTLP HTTP
environment:
- COLLECTOR_OTLP_ENABLED=true
- LOG_LEVEL=debug
networks:
- app-network
# Альтернатива: Zipkin
zipkin:
image: openzipkin/zipkin:2.24
ports:
- "9411:9411" # UI
environment:
- STORAGE_TYPE=mem # В production использовать Elasticsearch
networks:
- app-networkJaeger UI: http://localhost:16686
Zipkin UI: http://localhost:9411
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Application │ │ Filebeat │ │ Logstash │
│ (JSON logs) │───►│ (shipper) │───►│ (processing) │
└─────────────────┘ └─────────────────┘ └────────┬────────┘
│
┌──────────────────────────────────┘
│
▼
┌─────────────────┐
│ Elasticsearch │
│ (storage & │
│ search) │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Kibana │
│ (visualization) │
└─────────────────┘
Компоненты:
# filebeat.yml
filebeat.inputs:
- type: container
enabled: true
paths:
- /var/lib/docker/containers/*/*.log
processors:
- add_kubernetes_metadata:
host: ${NODE_NAME}
matchers:
- logs_path:
logs_path: "/var/lib/docker/containers/"
# Альтернатива: чтение из файлов
- type: log
enabled: true
paths:
- /var/log/order-service/*.log
json.keys_under_root: true
json.add_error_key: true
json.message_key: message
fields:
service: order-service
field_under_root: true
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
output.elasticsearch:
hosts: ["elasticsearch:9200"]
indices:
- index: "logs-order-service-%{+yyyy.MM.dd}"
when.equals:
service: "order-service"
- index: "logs-user-service-%{+yyyy.MM.dd}"
when.equals:
service: "user-service"
- index: "logs-payment-service-%{+yyyy.MM.dd}"
when.equals:
service: "payment-service"
# Логгирование самого Filebeat
logging.level: info
logging.to_files: true
logging.files:
path: /var/log/filebeat
name: filebeat.log# logstash/pipeline.conf
input {
beats {
port => 5044
}
}
filter {
# Парсинг JSON логов
json {
source => "message"
target => "parsed"
}
# Копируем поля на верхний уровень
mutate {
rename => {
"[parsed][timestamp]" => "@timestamp"
"[parsed][level]" => "log_level"
"[parsed][message]" => "message"
"[parsed][correlation_id]" => "correlation_id"
"[parsed][service]" => "service"
}
}
# Добавляем теги для ошибок
if [log_level] == "ERROR" {
mutate {
add_tag => ["error"]
}
}
# Парсинг trace_id из correlation_id если есть
if [correlation_id] {
mutate {
add_field => {
"trace_id" => "%{correlation_id}"
}
}
}
# Удаляем временные поля
mutate {
remove_field => ["parsed", "message", "ecs", "agent", "host", "input"]
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "logs-%{service}-%{+yyyy.MM.dd}"
}
# Для отладки
# stdout { codec => rubydebug }
}# docker-compose.elk.yml
version: '3.8'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
environment:
- discovery.type=single-node
- xpack.security.enabled=false
- "ES_JAVA_OPTS=-Xms1g -Xmx1g"
ports:
- "9200:9200"
volumes:
- elasticsearch_data:/usr/share/elasticsearch/data
networks:
- app-network
logstash:
image: docker.elastic.co/logstash/logstash:8.11.0
volumes:
- ./logstash/pipeline.conf:/usr/share/logstash/pipeline/logstash.conf
ports:
- "5044:5044"
environment:
- LS_JAVA_OPTS=-Xmx512m -Xms512m
networks:
- app-network
depends_on:
- elasticsearch
kibana:
image: docker.elastic.co/kibana/kibana:8.11.0
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
- xpack.security.enabled=false
ports:
- "5601:5601"
networks:
- app-network
depends_on:
- elasticsearch
filebeat:
image: docker.elastic.co/beats/filebeat:8.11.0
user: root
volumes:
- ./filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
- /var/lib/docker/containers:/var/lib/docker/containers:ro
- /var/run/docker.sock:/var/run/docker.sock:ro
networks:
- app-network
depends_on:
- logstash
volumes:
elasticsearch_data:
networks:
app-network:Полезные запросы Kibana Discover:
# Все ошибки за последний час
log_level: ERROR AND @timestamp >= now-1h
# Ошибки конкретного сервиса
service: order-service AND log_level: ERROR
# Запросы с конкретным correlation_id
correlation_id: abc123def456
# Медленные запросы (по длительности)
duration_ms > 1000
# Ошибки платежей
service: payment-service AND message: "Payment failed"
Создание визуализаций:
Симптом: Пользователи жалуются на медленное создание заказов.
Шаг 1: Проверка метрик (Grafana)
# Проверяем latency по endpoint
histogram_quantile(0.95, rate(http_request_duration_seconds_bucket{endpoint="/orders"}[5m]))
# Результат: p95 = 2.5s (было 200ms)Шаг 2: Поиск аномалий
# Сравниваем с baseline
histogram_quantile(0.95, rate(http_request_duration_seconds_bucket{endpoint="/orders"}[5m]))
/
histogram_quantile(0.95, rate(http_request_duration_seconds_bucket{endpoint="/orders"}[1h] offset 1h))
# Результат: 12.5x медленнее чем час назадШаг 3: Анализ трейсов (Jaeger)
Trace: 4bf92f3577b34da6a3ce929d0e0e4736
Duration: 2.8s
┌─────────────────────────────────────────────────────────┐
│ API Gateway (25ms) │
│ └─ Order Service (2.7s) │
│ ├─ User Service (15ms) │
│ ├─ Inventory Service (35ms) │
│ └─ Payment Service (2.6s) ⚠️ │
│ └─ External Payment Gateway (2.5s) ⚠️⚠️ │
└─────────────────────────────────────────────────────────┘
Вывод: Проблема во внешнем payment gateway.
Шаг 4: Проверка логов (Kibana)
service: payment-service AND correlation_id: 4bf92f3577b34da6a3ce929d0e0e4736
Находим логи:
{
"timestamp": "2026-03-03T10:15:32.451Z",
"level": "WARNING",
"service": "payment-service",
"correlation_id": "4bf92f3577b34da6a3ce929d0e0e4736",
"message": "Payment gateway response slow",
"duration_ms": 2500,
"gateway_response_code": "DELAYED_PROCESSING"
}Решение:
Симптом: Alertmanager прислал алерт HighErrorRate.
Шаг 1: Проверка алерта
Alert: HighErrorRate
Severity: critical
Error rate: 8.5% (threshold: 5%)
Duration: 7m
Шаг 2: Анализ по сервисам (Grafana)
# Error rate по сервисам
sum(rate(http_requests_total{status=~"5.."}[5m])) by (service)
/
sum(rate(http_requests_total[5m])) by (service)Результат:
Шаг 3: Анализ ошибок (Kibana)
service: payment-service AND log_level: ERROR AND @timestamp >= now-15m
Top error messages:
Шаг 4: Проверка метрик БД
# Активные соединения
database_active_connections{service="payment-service"}
# Результат: 98/100 соединений использованоШаг 5: Проверка трейсов В Jaeger видим много трейсов со span "database_query" длительностью > 30s (таймаут).
Диагноз: Connection pool exhaustion в payment-service.
Решение:
Симптом: Все сервисы начали возвращать ошибки.
Шаг 1: Проверка зависимостей
# Up статус всех сервисов
up{job=~".*-service"}Результат: database: down
Шаг 2: Анализ трейсов В Jaeger видим что все трейсы failing на одном и том же span: "database_query".
Шаг 3: Проверка логов БД
service: postgresql AND log_level: ERROR
FATAL: too many connections for role "app_user"
Диагноз: БД упала из-за слишком большого количества соединений.
Решение:
✅ Используйте structured logging (JSON)
# Good
logger.info("Order created", extra={"order_id": 123, "user_id": 456})
# Bad
logger.info(f"Order {123} created for user {456}")✅ Добавляйте correlation_id ко всем логам
# Все логи запроса имеют один correlation_id
# Легко трассировать через все сервисы✅ Инструментируйте всё автоматически
# FastAPI, httpx, SQLAlchemy, Redis — всё должно быть инструментировано
FastAPIInstrumentor.instrument_app()
HTTPXClientInstrumentor().instrument()✅ Настройте meaningful alerts
# Алерт должен быть actionable
# Если алерт сработал — должно быть понятно что делать✅ Используйте sampling в production
# 100% tracing слишком дорого
# Используйте 1-10% sampling для production
sample_rate = 0.01 # 1%❌ Не логируйте чувствительные данные
# Bad
logger.info(f"User password: {password}")
logger.info(f"Credit card: {card_number}")
# Good
logger.info("User authenticated", extra={"user_id": user_id})❌ Не создавайте too many metrics
# Bad: метрика с high cardinality
REQUESTS_TOTAL.labels(user_id=user_id).inc() # Миллионы уникальных user_id
# Good: метрика с low cardinality
REQUESTS_TOTAL.labels(endpoint=endpoint).inc() # Десятки endpoints❌ Не игнорируйте alert fatigue
# Bad: алерты на всё подряд
# Good: только actionable алерты с правильным severity❌ Не трассируйте 100% запросов в production
# Bad: sample_rate = 1.0 # Слишком дорого
# Good: sample_rate = 0.01 # 1% достаточно для анализаObservability — это не опция, а необходимость для production систем. Инвестиции в proper logging, metrics и tracing окупаются многократно при debugging production issues.
Ключевые принципы:
Помните: вы не можете улучшить то, что не можете измерить.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.