Metrics, логирование, tracing, RabbitMQ Management API, Prometheus интеграция
Наблюдаемость (observability) — способность понимать внутреннее состояние системы по её внешним выходам. В этой теме изучим мониторинг RabbitMQ, метрики, логирование и tracing.
┌─────────────────────────────────────────────────────────┐
│ Observability │
├─────────────────┬─────────────────┬─────────────────────┤
│ Metrics │ Logs │ Traces │
│ (метрики) │ (логи) │ (трассировка) │
├─────────────────┼─────────────────┼─────────────────────┤
│ - Что происходит│ - Почему │ - Где проблема │
│ - Тренды │ - Детали │ - Путь запроса │
│ - Алерты │ - Отладка │ - Задержки │
└─────────────────┴─────────────────┴─────────────────────┘
| Метрика | Описание | Тревожное значение |
|---|---|---|
messages | Количество сообщений в очереди | > 10 000 |
messages_ready | Сообщения готовые к доставке | Растёт постоянно |
messages_unacknowledged | Сообщения в обработке | > consumers × 10 |
consumers | Количество активных консьюмеров | Внезапное падение |
connections | Количество подключений | > 1000 |
channels | Количество каналов | > 10 000 |
message_stats.publish | Скорость публикации | Резкое падение |
message_stats.deliver | Скорость доставки | Резкое падение |
memory | Использование памяти | > 80% |
disk_free | Свободное место на диске | < 2 GB |
Используйте Management API для сбора метрик и экспорта в Prometheus:
import aiohttp
from prometheus_client import Gauge, Counter, start_http_server
import asyncio
from datetime import datetime
class RabbitMQMetricsCollector:
"""Сбор метрик из RabbitMQ Management API."""
def __init__(self, rabbitmq_url: str, username: str, password: str):
self.base_url = rabbitmq_url.rstrip('/')
self.username = username
self.password = password
self.auth = aiohttp.BasicAuth(username, password)
# Prometheus метрики
self.queue_messages = Gauge(
'rabbitmq_queue_messages',
'Messages in queue',
['queue', 'vhost']
)
self.queue_consumers = Gauge(
'rabbitmq_queue_consumers',
'Consumers for queue',
['queue', 'vhost']
)
self.queue_unacked = Gauge(
'rabbitmq_queue_unacknowledged',
'Unacknowledged messages',
['queue', 'vhost']
)
self.connection_count = Gauge(
'rabbitmq_connections',
'Total connections'
)
self.memory_usage = Gauge(
'rabbitmq_memory_bytes',
'Memory usage in bytes'
)
self.disk_free = Gauge(
'rabbitmq_disk_free_bytes',
'Free disk space in bytes'
)
async def collect_queue_metrics(self):
"""Сбор метрик очередей."""
async with aiohttp.ClientSession() as session:
async with session.get(
f"{self.base_url}/api/queues",
auth=self.auth
) as response:
queues = await response.json()
for queue in queues:
vhost = queue['vhost']
name = queue['name']
self.queue_messages.labels(
queue=name,
vhost=vhost
).set(queue.get('messages', 0))
self.queue_consumers.labels(
queue=name,
vhost=vhost
).set(queue.get('consumers', 0))
self.queue_unacked.labels(
queue=name,
vhost=vhost
).set(queue.get('messages_unacknowledged', 0))
async def collect_broker_metrics(self):
"""Сбор метрик брокера."""
async with aiohttp.ClientSession() as session:
# Статус узла
async with session.get(
f"{self.base_url}/api/nodes",
auth=self.auth
) as response:
nodes = await response.json()
for node in nodes:
if node['type'] == 'disc':
self.memory_usage.set(node.get('mem_used', 0))
self.disk_free.set(node.get('disk_free', 0))
# Подключения
async with session.get(
f"{self.base_url}/api/connections",
auth=self.auth
) as response:
connections = await response.json()
self.connection_count.set(len(connections))
async def collect_all(self):
"""Сбор всех метрик."""
await asyncio.gather(
self.collect_queue_metrics(),
self.collect_broker_metrics(),
return_exceptions=True
)
async def start(self, port: int = 9090, interval: int = 15):
"""Запуск сбора метрик."""
# Prometheus HTTP server
start_http_server(port)
print(f"Metrics server started on port {port}")
while True:
try:
await self.collect_all()
except Exception as e:
print(f"Error collecting metrics: {e}")
await asyncio.sleep(interval)Настройте алерты для своевременного обнаружения проблем:
from prometheus_alertmanager import Alertmanager
class RabbitMQAlerts:
"""Алертинг на метрики RabbitMQ."""
def __init__(self, alertmanager_url: str):
self.am = Alertmanager(alertmanager_url)
async def check_alerts(self, collector: RabbitMQMetricsCollector):
"""Проверка алертов."""
# Алерт: растущая очередь
queue_size = collector.queue_messages._metrics.get(('main_queue', '/'))
if queue_size and queue_size > 10000:
await self.am.send_alert(
alertname='RabbitMQQueueGrowing',
severity='warning',
description=f"Queue 'main_queue' has {queue_size} messages",
labels={'queue': 'main_queue'}
)
# Алерт: много unacknowledged
unacked = collector.queue_unacked._metrics.get(('main_queue', '/'))
consumers = collector.queue_consumers._metrics.get(('main_queue', '/'))
if unacked and consumers and unacked > consumers * 10:
await self.am.send_alert(
alertname='RabbitMQHighUnacknowledged',
severity='critical',
description=f"High unacknowledged: {unacked} messages, {consumers} consumers",
labels={'queue': 'main_queue'}
)
# Алерт: мало места на диске
disk_free = collector.disk_free._metrics.get(())
if disk_free and disk_free < 2 * 1024 * 1024 * 1024: # 2GB
await self.am.send_alert(
alertname='RabbitMQDiskSpaceLow',
severity='critical',
description=f"Low disk space: {disk_free} bytes",
labels={'node': 'rabbitmq-1'}
)Используйте структурированное логирование для машинной обработки логов:
import logging
import json
from pythonjsonlogger import jsonlogger
def setup_logging(service_name: str):
"""Настройка структурированного логирования."""
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# JSON formatter для машинной обработки
formatter = jsonlogger.JsonFormatter(
fmt='%(asctime)s %(name)s %(levelname)s %(message)s',
datefmt='%Y-%m-%dT%H:%M:%SZ'
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
return logging.getLogger(service_name)
logger = setup_logging('rabbitmq_consumer')
class LoggedConsumer:
"""Консьюмер с подробным логированием."""
async def process(self, message: aio_pika.abc.AbstractIncomingMessage):
"""Обработка с логированием."""
# Логирование начала обработки
logger.info(
"Processing message",
extra={
"message_id": message.message_id,
"correlation_id": message.correlation_id,
"routing_key": message.routing_key,
"body_size": len(message.body),
"headers": dict(message.headers),
"timestamp": message.timestamp.isoformat() if message.timestamp else None
}
)
try:
async with message.process():
await self._handle(message)
logger.info(
"Message processed successfully",
extra={
"message_id": message.message_id,
"processing_time_ms": self.processing_time_ms
}
)
except Exception as e:
logger.error(
"Message processing failed",
extra={
"message_id": message.message_id,
"error": str(e),
"error_type": type(e).__name__
},
exc_info=True
)
raiseЛогируйте события подключения для мониторинга состояния соединения:
import aio_pika.abc
class EventLogger:
"""Логирование событий подключения RabbitMQ."""
def __init__(self, logger: logging.Logger):
self.logger = logger
def setup_connection_logging(self, connection: aio_pika.abc.AbstractConnection):
"""Подписка на события подключения."""
connection.on('connected', self._on_connected)
connection.on('disconnected', self._on_disconnected)
connection.on('error', self._on_error)
def _on_connected(self, connection: aio_pika.abc.AbstractConnection):
self.logger.info(
"RabbitMQ connected",
extra={
"url": str(connection.url),
"client_properties": connection.client_properties
}
)
def _on_disconnected(self, connection: aio_pika.abc.AbstractConnection,
reason: Exception = None):
self.logger.warning(
"RabbitMQ disconnected",
extra={
"reason": str(reason) if reason else "unknown"
}
)
def _on_error(self, connection: aio_pika.abc.AbstractConnection,
error: Exception):
self.logger.error(
"RabbitMQ connection error",
extra={
"error": str(error),
"error_type": type(error).__name__
},
exc_info=True
)OpenTelemetry обеспечивает end-to-end трассировку через микросервисы:
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
from opentelemetry.context import attach, detach, set_value
from opentelemetry.propagate import inject, extract
tracer = trace.get_tracer(__name__)
class TracedConsumer:
"""Консьюмер с distributed tracing."""
async def process(self, message: aio_pika.abc.AbstractIncomingMessage):
"""Обработка с трассировкой."""
# Извлечение context из заголовков
carrier = {"headers": dict(message.headers)}
ctx = extract(carrier)
token = attach(set_value("rabbitmq_message", message, context=ctx))
try:
with tracer.start_as_current_span(
"rabbitmq_process",
kind=trace.SpanKind.CONSUMER,
attributes={
"messaging.system": "rabbitmq",
"messaging.destination": message.exchange,
"messaging.message.id": message.message_id,
"messaging.rabbitmq.routing_key": message.routing_key
}
) as span:
async with message.process():
await self._handle(message)
span.set_status(Status(StatusCode.OK))
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
raise
finally:
detach(token)
async def _handle(self, message: aio_pika.abc.AbstractIncomingMessage):
"""Обработка сообщения."""
# Длительная операция
await asyncio.sleep(0.1)
class TracedPublisher:
"""Продюсер с distributed tracing."""
def __init__(self, channel):
self.channel = channel
async def publish(self, exchange, message, routing_key):
"""Публикация с трассировкой."""
with tracer.start_as_current_span(
"rabbitmq_publish",
kind=trace.SpanKind.PRODUCER,
attributes={
"messaging.system": "rabbitmq",
"messaging.destination": exchange.name,
"messaging.rabbitmq.routing_key": routing_key
}
) as span:
# Инъекция context в заголовки
carrier = {}
inject(carrier)
# Добавление tracing заголовков к сообщению
for key, value in carrier.items():
message.headers[key] = value
return await exchange.publish(message, routing_key=routing_key)Health check endpoint позволяет проверять состояние подключения к RabbitMQ:
from aiohttp import web
from enum import Enum
class HealthStatus(str, Enum):
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
class HealthChecker:
"""Проверка здоровья RabbitMQ подключения."""
def __init__(self, connection_url: str):
self.connection_url = connection_url
self.connection = None
self.last_check_time = None
self.last_check_status = None
async def connect(self):
self.connection = await aio_pika.connect_robust(self.connection_url)
async def check(self) -> dict:
"""Проверка здоровья."""
checks = {}
overall_status = HealthStatus.HEALTHY
try:
# Проверка подключения
if self.connection and not self.connection.is_closed:
checks["connection"] = {"status": "ok"}
else:
checks["connection"] = {"status": "error", "message": "Not connected"}
overall_status = HealthStatus.UNHEALTHY
# Проверка канала
channel = await self.connection.channel()
await channel.close()
checks["channel"] = {"status": "ok"}
# Проверка возможности публикации
test_exchange = await channel.declare_exchange(
"health_check_exchange",
aio_pika.ExchangeType.DIRECT,
passive=True # Проверка существования
)
checks["exchange"] = {"status": "ok"}
except Exception as e:
checks["error"] = {"status": "error", "message": str(e)}
overall_status = HealthStatus.UNHEALTHY
self.last_check_time = datetime.utcnow()
self.last_check_status = overall_status
return {
"status": overall_status.value,
"timestamp": self.last_check_time.isoformat(),
"checks": checks
}
# FastAPI integration
app = web.Application()
health_checker = HealthChecker("amqp://guest:guest@localhost/")
async def health_handler(request):
health = await health_checker.check()
status_code = 200 if health["status"] == "healthy" else 503
return web.json_response(health, status=status_code)
app.router.add_get("/health", health_handler)Создайте дашборд в Grafana для визуализации метрик:
{
"dashboard": {
"title": "RabbitMQ Monitoring",
"panels": [
{
"title": "Messages in Queue",
"targets": [
{
"expr": "rabbitmq_queue_messages{queue=\"main_queue\"}",
"legendFormat": "Messages"
}
]
},
{
"title": "Publish vs Deliver Rate",
"targets": [
{
"expr": "rate(rabbitmq_message_stats_publish[5m])",
"legendFormat": "Publish"
},
{
"expr": "rate(rabbitmq_message_stats_deliver[5m])",
"legendFormat": "Deliver"
}
]
},
{
"title": "Consumer Count",
"targets": [
{
"expr": "rabbitmq_queue_consumers{queue=\"main_queue\"}",
"legendFormat": "Consumers"
}
]
},
{
"title": "Memory Usage",
"targets": [
{
"expr": "rabbitmq_memory_bytes",
"legendFormat": "Memory"
}
]
}
]
}
}Следующие рекомендации обеспечат надёжную наблюдаемость системы:
Добавляйте контекст к логам для упрощения отладки:
logger.info(
"Message processed",
extra={
"message_id": message.message_id,
"correlation_id": message.correlation_id,
"queue": queue_name,
"processing_time_ms": elapsed_ms
}
)Используйте лейблы для фильтрации и агрегации метрик:
message_counter = Counter(
'rabbitmq_messages_processed_total',
'Total processed messages',
['queue', 'status', 'error_type'] # Лейблы для фильтрации
)Настройте многоуровневый алертинг с эскалацией:
# Warning → Critical → Page
if dlq_size > 10:
send_alert("DLQ growing", severity="warning")
elif dlq_size > 50:
send_alert("DLQ critical", severity="critical")
elif dlq_size > 100:
page_oncall("DLQ emergency")Используйте sampling для уменьшения накладных расходов tracing:
# 10% трассировка для экономии ресурсов
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio
sampler = ParentBasedTraceIdRatio(0.1) # 10%# Проверка здоровья каждые 30 секунд
async def health_check_loop():
while True:
health = await health_checker.check()
if health["status"] != "healthy":
send_alert(f"Health check failed: {health}")
await asyncio.sleep(30)В следующей теме изучим production-паттерны и лучшие практики.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.