Observability: Prometheus, Grafana, структурированное логирование, трассировка, деплой в Kubernetes.
import logging
import json

from pythonjsonlogger import jsonlogger

# Configure the root logger to emit one structured JSON record per line,
# ready to be shipped to a log aggregator (ELK / Loki / etc.).
logger = logging.getLogger()
log_handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter(
    # Fields listed in fmt become top-level JSON keys on every record.
    fmt="%(asctime)s %(levelname)s %(name)s %(message)s %(trace_id)s %(span_id)s",
    datefmt="%Y-%m-%dT%H:%M:%S"
)
log_handler.setFormatter(formatter)
logger.addHandler(log_handler)
logger.setLevel(logging.INFO)

# Usage: keys passed via `extra` appear as top-level JSON fields.
logger.info("Order processed", extra={
    "order_id": 123,
    "trace_id": "abc-123",
    "duration_ms": 45.2
})
# Output:
{
"asctime": "2026-03-30T12:00:00",
"levelname": "INFO",
"name": "app.orders",
"message": "Order processed",
"order_id": 123,
"trace_id": "abc-123",
"duration_ms": 45.2
}from contextvars import ContextVar
from faststream.rabbit import RabbitMessage
trace_id_var: ContextVar[str] = ContextVar("trace_id", default="")
@broker.middleware
async def logging_middleware(message: RabbitMessage, next_middleware):
trace_id = message.headers.get("trace_id", str(uuid.uuid4()))
token = trace_id_var.set(trace_id)
logger.info(f"Processing {message.routing_key}", extra={
"trace_id": trace_id,
"correlation_id": message.correlation_id
})
try:
result = await next_middleware(message)
logger.info("Success", extra={"trace_id": trace_id})
return result
except Exception as e:
logger.error(f"Error: {e}", exc_info=True, extra={"trace_id": trace_id})
raise
finally:
trace_id_var.reset(token)# Production: INFO
logger.info("Order created")        # business events
logger.warning("Retry attempt 2")   # warnings
logger.error("DB connection lost")  # errors

# Development: DEBUG — verbose payload dumps, local troubleshooting only.
logger.debug(f"Message body: {message.body}")
logger.debug(f"Headers: {message.headers}")

# Critical failures — the component cannot continue working.
logger.critical("Queue unavailable", exc_info=True)
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time

# Expose a /metrics endpoint for Prometheus to scrape.
start_http_server(8000)

# Pipeline metrics: throughput (by outcome), latency distribution, queue depth.
messages_processed = Counter(
    'faststream_messages_processed_total',
    'Total processed messages',
    ['queue', 'status', 'broker']
)
processing_duration = Histogram(
    'faststream_processing_duration_seconds',
    'Message processing duration',
    ['queue'],
    buckets=(0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0)
)
queue_size = Gauge(
    'faststream_queue_size',
    'Current queue size',
    ['queue']
)


@broker.middleware
async def metrics_middleware(message, next_middleware):
    """Count every message by outcome and observe its processing duration."""
    # perf_counter is monotonic: immune to NTP/clock adjustments, unlike time.time().
    start = time.perf_counter()
    queue = message.routing_key
    try:
        result = await next_middleware(message)
        messages_processed.labels(queue=queue, status='success', broker='rabbitmq').inc()
        return result
    except Exception:
        messages_processed.labels(queue=queue, status='error', broker='rabbitmq').inc()
        raise
    finally:
        # Duration is recorded for both success and error paths.
        duration = time.perf_counter() - start
        processing_duration.labels(queue=queue).observe(duration)

# prometheus.yml
# Scrape the app's /metrics endpoint every 15s.
scrape_configs:
  - job_name: 'faststream'
    static_configs:
      - targets: ['app:8000']
    scrape_interval: 15s
    metrics_path: /metrics
{
"dashboard": {
"title": "FastStream Overview",
"panels": [
{
"title": "Messages per Second",
"targets": [
{
"expr": "rate(faststream_messages_processed_total[5m])",
"legendFormat": "{{queue}} - {{status}}"
}
]
},
{
"title": "Processing Duration (p95)",
"targets": [
{
"expr": "histogram_quantile(0.95, rate(faststream_processing_duration_seconds_bucket[5m]))",
"legendFormat": "{{queue}}"
}
]
},
{
"title": "Queue Size",
"targets": [
{
"expr": "rabbitmq_queue_messages",
"legendFormat": "{{queue}}"
}
]
}
]
}
}
# alerting.yml
# Prometheus alerting rules for the messaging pipeline.
groups:
  - name: faststream
    rules:
      - alert: HighErrorRate
        expr: rate(faststream_messages_processed_total{status="error"}[5m]) > 0.1
        for: 5m
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value }} errors/sec"
      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(faststream_processing_duration_seconds_bucket[5m])) > 1
        for: 10m
        annotations:
          summary: "High processing latency"
          description: "p95 latency is {{ $value }}s"
      - alert: ConsumerLag
        expr: kafka_consumer_lag > 10000
        for: 5m
        annotations:
          summary: "Consumer lag detected"
          description: "Lag is {{ $value }} messages"
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.propagate import extract
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Export spans to the Jaeger agent (thrift over UDP).
jaeger_exporter = JaegerExporter(
    agent_host_name="jaeger",
    agent_port=6831,
)

# Register the provider once per process; BatchSpanProcessor ships spans
# asynchronously in the background.
provider = TracerProvider()
processor = BatchSpanProcessor(jaeger_exporter)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)


@broker.middleware
async def tracing_middleware(message, next_middleware):
    """Wrap message processing in a CONSUMER span linked to the publisher's trace."""
    # Continue the trace propagated by the publisher via message headers
    # (W3C traceparent/tracestate); without this the consumer span is orphaned.
    parent_context = extract(message.headers)
    with tracer.start_as_current_span(
        f"process_{message.routing_key}",
        context=parent_context,
        kind=trace.SpanKind.CONSUMER
    ) as span:
        span.set_attribute("messaging.destination", message.routing_key)
        span.set_attribute("messaging.message_id", message.correlation_id)
        result = await next_middleware(message)
        return result
from opentelemetry.propagate import extract, inject
# Publisher: inject context
headers = {}
inject(headers) # Добавляет traceparent, tracestate
await broker.publish(
{"order_id": 123},
"orders",
headers=headers
)
# Subscriber: extract context
@broker.subscriber("orders")
async def handle_order(order: dict, message: RabbitMessage):
context = extract(message.headers)
with tracer.start_as_current_span("handle_order", context=context):
# Span связан с trace publisher'а
...# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: order-processor
spec:
  replicas: 3
  selector:
    matchLabels:
      app: order-processor
  template:
    metadata:
      labels:
        app: order-processor
    spec:
      containers:
        - name: app
          image: myapp:latest
          env:
            - name: BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: broker-secret
                  key: url
            - name: LOG_LEVEL
              value: "INFO"
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          # Liveness/readiness reuse `faststream check` to verify the app imports
          # and the broker connection is configured.
          livenessProbe:
            exec:
              command:
                - faststream
                - check
                - app.main:app
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            exec:
              command:
                - faststream
                - check
                - app.main:app
            initialDelaySeconds: 5
            periodSeconds: 5
# hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: order-processor-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: order-processor
  minReplicas: 3
  maxReplicas: 10
  metrics:
    # Scale on consumer lag exported as an external metric.
    - type: External
      external:
        metric:
          name: kafka_consumer_lag
        target:
          type: AverageValue
          averageValue: "1000"
# configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: app-config
data:
  LOG_LEVEL: "INFO"
  METRICS_ENABLED: "true"
  TRACING_ENABLED: "true"
  JAEGER_HOST: "jaeger.observability.svc"
---
# secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: broker-secret
type: Opaque
stringData:
  url: "amqp://user:password@rabbitmq:5672"
from fastapi import FastAPI
from fastapi.responses import JSONResponse

health_app = FastAPI()


@health_app.get("/health")
async def health_check():
    """Liveness: the process is up and able to serve requests."""
    return {"status": "healthy"}


@health_app.get("/ready")
async def readiness_check():
    """Readiness: broker and database are reachable; HTTP 503 otherwise."""
    try:
        # Check broker connectivity
        await broker.ping()
        # Check database connectivity
        await db.execute("SELECT 1")
        return {"status": "ready"}
    except Exception as e:
        # BUG FIX: `return {...}, 503` is Flask-style — FastAPI would serialize
        # the tuple and still answer 200. JSONResponse actually sets the status.
        return JSONResponse(
            status_code=503,
            content={"status": "not ready", "error": str(e)},
        )
# main.py
from faststream import FastStream
from faststream.rabbit import RabbitBroker
from prometheus_client import start_http_server
from opentelemetry import trace
import logging
from pythonjsonlogger import jsonlogger
# Настройка логирования
logger = logging.getLogger()
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)
logger.setLevel(logging.INFO)
# Настройка метрик
start_http_server(8000)
# Настройка трассировки
# ... (Jaeger exporter)
# Брокер
broker = RabbitBroker(
os.getenv("BROKER_URL"),
max_retries=5,
retry_delay=1.0
)
app = FastStream(broker)
# Middleware: логирование + метрики + трассировка
@broker.middleware
async def observability_middleware(message, next_middleware):
# Логирование
trace_id = message.headers.get("trace_id")
logger.info(f"Processing {message.routing_key}", extra={"trace_id": trace_id})
# Метрики
start = time.time()
try:
result = await next_middleware(message)
messages_processed.labels(queue=message.routing_key, status='success').inc()
return result
except Exception as e:
logger.error(f"Error: {e}", extra={"trace_id": trace_id})
messages_processed.labels(queue=message.routing_key, status='error').inc()
raise
finally:
duration = time.time() - start
processing_duration.labels(queue=message.routing_key).observe(duration)
@broker.subscriber("orders")
async def process_order(order: dict):
await db.orders.save(order)Следующая тема — Миграции и версионирование схем: эволюция форматов сообщений, совместимость, schema registry.
Вопросы для этой подтемы ещё не добавлены.