Sentry, Datadog, New Relic, CloudWatch, Loki
Production-приложения интегрируются с внешними системами для мониторинга ошибок, хранения логов и алертинга.
┌─────────────────────────────────────────────────────────────────────────┐
│ Production логирование │
└─────────────────────────────────────────────────────────────────────────┘
Приложение Агрегаторы Хранилища
│
│ logger.error()
▼
┌─────────────────┐
│ Python App │
│ ┌───────────┐ │
│ │ Logger │ │
│ └─────┬─────┘ │
│ │ │
│ ┌─────▼─────┐ │
│ │ Handlers │ │
│ └─────┬─────┘ │
└────────┼────────┘
│
┌────┴────┬─────────────┬──────────────┬────────────┐
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
┌────────┐ ┌────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ Sentry │ │CloudWatch│ │ Datadog │ │ Loki │ │ File │
│(ошибки)│ │ (AWS) │ │(метрики) │ │ (Grafana)│ │(backup) │
└────────┘ └────────┘ └──────────┘ └──────────┘ └──────────┘
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ Dashboards & Alerts │
└─────────────────────────────────────────────────────────────────┘
Проблема: Нужно узнавать об ошибках в реальном времени и иметь контекст для отладки.
Решение: Sentry автоматически собирает ошибки с traceback и контекстом.
pip install sentry-sdkimport sentry_sdk
from sentry_sdk.integrations.logging import LoggingIntegration
# Инициализация
sentry_sdk.init(
dsn="https://key@o0.ingest.sentry.io/0",
integrations=[
LoggingIntegration(
level=logging.INFO, # Какие логи отправлять как breadcrumbs
event_level=logging.ERROR # Какие логи отправлять как события
)
],
traces_sample_rate=1.0, # 100% трассировка
environment="production"
)
logger = logging.getLogger(__name__)
# Ошибка автоматически отправится в Sentry
logger.error("Database connection failed", exc_info=True)┌─────────────────────────────────────────────────────────────┐
│ Sentry Integration │
└─────────────────────────────────────────────────────────────┘
logger.error("DB failed", exc_info=True)
│
▼
┌───────────────────┐
│ LoggingIntegration │
│ (перехватывает) │
└─────────┬─────────┘
│
┌─────┴─────┐
│ │
▼ ▼
┌─────────┐ ┌─────────────┐
│Breadcrumb│ │ Event │
│ (info) │ │ (error) │
└────┬─────┘ └──────┬──────┘
│ │
└──────┬───────┘
│
▼
┌──────────────┐
│ Sentry SDK │
│ (отправка) │
└──────┬───────┘
│
▼
┌──────────────┐
│ Sentry │
│ Server │
└──────────────┘
Уровни логирования:
| Уровень logger | Что делает Sentry |
|---|---|
DEBUG | Игнорируется (по умолчанию) |
INFO | Отправляется как Breadcrumb (след) |
WARNING | Отправляется как Breadcrumb |
ERROR | Отправляется как Event (создаёт issue) |
CRITICAL | Отправляется как Event (создаёт issue) |
import sentry_sdk
try:
risky_operation()
except Exception as e:
# Автоматическая отправка с traceback
sentry_sdk.capture_exception(e)
# Или через logger
logger.exception("Operation failed")from sentry_sdk import configure_scope, set_user, set_tag, set_context
# Контекст для всех последующих событий
set_user({"id": 123, "email": "user@example.com"})
set_tag("request_id", "abc-123")
set_context("custom", {"key": "value"})
# Или через configure_scope для детальной настройки
with configure_scope() as scope:
scope.set_extra("batch_id", batch_id)
scope.set_tag("operation", "import")
scope.user = {"id": user_id}
# Лог с контекстом
logger.error("Import failed")Проблема: Нужно видеть путь к ошибке.
Решение: Sentry Breadcrumbs записывают шаги перед ошибкой.
from sentry_sdk import add_breadcrumb
add_breadcrumb(
category="auth",
message="User authenticated",
level="info",
data={"user_id": 123}
)
add_breadcrumb(
category="database",
message="Query executed",
level="debug",
data={"query": "SELECT * FROM users"}
)
# При ошибке Sentry покажет все breadcrumbs
logger.error("Unexpected error")В Sentry:
Breadcrumbs:
[10:00:00] auth: User authenticated (user_id=123)
[10:00:01] database: Query executed
[10:00:02] error: Unexpected error
Автоматические breadcrumbs:
Sentry автоматически собирает:
requests, httpx, aiohttp)sqlalchemy, psycopg2)loggingПроблема: Не все ошибки нужно отправлять в Sentry.
Решение: before_send для фильтрации.
def before_send(event, hint):
# Игнорировать определённые исключения
if 'exc_info' in hint:
exc_type, exc_value, tb = hint['exc_info']
# Игнорировать 404 ошибки
if hasattr(exc_value, 'status_code'):
if exc_value.status_code == 404:
return None
# Игнорировать ошибки валидации
if exc_type.__name__ == 'ValidationError':
return None
# Игнорировать ошибки в health check
if 'health' in str(exc_value).lower():
return None
return event
sentry_sdk.init(
dsn="...",
before_send=before_send
)Типы хуков фильтрации:
| Хук | Когда вызывается | Что возвращает |
|---|---|---|
before_send | Перед отправкой error/event | event или None |
before_breadcrumb | Перед добавлением breadcrumb | breadcrumb или None |
before_send_transaction | Перед отправкой транзакции | transaction или None |
from sentry_sdk import start_transaction
with start_transaction(op="task", name="process_order"):
# Транзакция автоматически измерит время
process_payment()
create_order()
send_email()
# Или декоратор
@sentry_sdk.tracing.trace
def process_order():
...Трассировка запроса:
┌─────────────────────────────────────────────────────────────┐
│ Sentry Transaction │
└─────────────────────────────────────────────────────────────┘
process_order (1.2s)
│
├─ process_payment (0.5s)
│ └─ HTTP POST /payment (0.4s)
│
├─ create_order (0.3s)
│ └─ SQL INSERT (0.2s)
│
└─ send_email (0.4s)
└─ SMTP send (0.3s)
sentry_sdk.init(
dsn="...",
# 100% для development
traces_sample_rate=1.0,
# 10% для production (экономия денег)
traces_sample_rate=0.1,
# Динамический sample rate
traces_sampler=sampling_context,
)
def sampling_context(sampling_context):
"""Динамический выбор sample rate."""
# Всегда трассировать ошибки
if sampling_context.get("parent_sampled") is True:
return 1.0
# Трассировать 10% обычных запросов
return 0.1sentry_sdk.init(
dsn="...",
# Версия приложения для отслеживания релизов
release="my-app@1.2.3",
# Или автоматически из git
# release=sentry_sdk.get_git_revision(),
)В Sentry:
Проблема: Нужно централизованное хранение логов в AWS.
Решение: watchtower библиотека для CloudWatch Logs.
pip install watchtowerimport logging
import watchtower
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# CloudWatch Handler
cloudwatch_handler = watchtower.CloudWatchLogHandler(
log_group='my-app',
stream_name='production',
boto3_client=None, # Использует default credentials
create_log_group=True,
send_interval=60, # Отправка каждые 60 секунд
max_batch_size=1024*1024, # 1MB
max_batch_count=10000
)
logger.addHandler(cloudwatch_handler)
logger.info("Application started")┌─────────────────────────────────────────────────────────────┐
│ CloudWatch Logs Architecture │
└─────────────────────────────────────────────────────────────┘
Приложение
│
│ logger.info()
▼
┌─────────────────┐
│ CloudWatchLogHandler │
│ (буфер в памяти) │
└─────────┬─────────┘
│
│ send_interval=60s
│ max_batch_size=1MB
▼
┌─────────────────┐
│ CloudWatch API │
│ PutLogEvents │
└─────────┬─────────┘
│
▼
┌─────────────────────────────────────────┐
│ Log Group: my-app │
│ ┌─────────────────────────────────┐ │
│ │ Stream: production │ │
│ │ [10:00:00] App started │ │
│ │ [10:00:01] Request processed │ │
│ └─────────────────────────────────┘ │
│ ┌─────────────────────────────────┐ │
│ │ Stream: production-2024-01-01 │ │
│ └─────────────────────────────────┘ │
└─────────────────────────────────────────┘
import watchtower
from datetime import datetime
# Stream по дате
stream_name = f"production-{datetime.now().strftime('%Y-%m-%d')}"
handler = watchtower.CloudWatchLogHandler(
log_group='my-app',
stream_name=stream_name
)Стратегии именования stream'ов:
| Стратегия | Пример | Когда использовать |
|---|---|---|
| По окружению | production, staging | Микросервисы |
| По дате | 2024-01-15 | Daily rotation |
| По хосту | i-0abc123def | EC2 instances |
| По контейнеру | container-id | Kubernetes |
from pythonjsonlogger import jsonlogger
import watchtower
json_formatter = jsonlogger.JsonFormatter()
cloudwatch_handler = watchtower.CloudWatchLogHandler(
log_group='my-app',
stream_name='production'
)
cloudwatch_handler.setFormatter(json_formatter)
logger.addHandler(cloudwatch_handler)Вывод в CloudWatch:
{
"asctime": "2024-01-15 10:00:00,000",
"name": "app.api",
"levelname": "INFO",
"message": "Request processed",
"user_id": 123,
"request_id": "abc-123"
}Проблема: Нужны права для записи в CloudWatch.
Решение: IAM policy для EC2/ECS role.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
"logs:DescribeLogStreams"
],
"Resource": "arn:aws:logs:*:*:*"
}
]
}Проблема: Нужно искать и анализировать логи.
Решение: CloudWatch Insights Query Language.
-- Найти все ошибки за последний час
fields @timestamp, @message
| filter @message like /ERROR/
| sort @timestamp desc
| limit 100
-- Агрегация по уровню логирования
stats count(*) as count by level
| sort count desc
-- Найти медленные запросы
fields @timestamp, duration
| filter duration > 1000
| sort duration desc| Параметр | Стоимость | Пример |
|---|---|---|
| Ingestion | $0.50 / GB | 10 GB/день = $150/мес |
| Storage | $0.03 / GB/мес | 100 GB = $3/мес |
| Insights | $0.005 / GB сканирования | 100 GB = $0.50 |
Оптимизация costs:
WARNINGПроблема: Нужно единое решение для логов, метрик и tracing.
Решение: Datadog Agent + Python библиотека.
pip install datadogfrom datadog import initialize, api
initialize(
api_key="your_api_key",
app_key="your_app_key"
)
# Отправка лога
api.Event.create(
title="Application Error",
text="Database connection failed",
alert_type="error",
tags=["env:production", "service:users-api"]
)import logging
from datadog import initialize
initialize(api_key="your_api_key")
# Форматирование для Datadog
class DatadogFormatter(logging.Formatter):
def format(self, record):
record.ddtags = "env:production,service:users-api"
return super().format(record)
handler = logging.StreamHandler()
handler.setFormatter(DatadogFormatter())
logger = logging.getLogger(__name__)
logger.addHandler(handler)Проблема: Нужна отправка логов через Datadog Agent.
Решение: Agent читает файлы логов или принимает по сети.
# datadog.yaml
logs:
- type: file
path: /var/log/app/*.log
service: users-api
source: python
log_processing_rules:
- type: multi_line
name: log_start_with_date
pattern: \d{4}-\d{2}-\d{2}┌─────────────────────────────────────────────────────────────┐
│ Datadog Architecture │
└─────────────────────────────────────────────────────────────┘
Приложение
│
│ 1. logger.info()
▼
┌─────────────────┐
│ StreamHandler │
│ (stdout) │
└─────────┬───────┘
│
│ 2. stdout
▼
┌─────────────────┐ ┌─────────────────┐
│ Docker/K8s │────▶│ Datadog Agent │
│ stdout │ │ (sidecar) │
└─────────────────┘ └─────────┬───────┘
│
│ 3. HTTPS
▼
┌─────────────────┐
│ Datadog │
│ Cloud │
└─────────────────┘
from ddtrace import tracer
@tracer.wrap(service="users-api", resource="get_user")
def get_user(user_id):
# Span автоматически создаётся
user = db.query(User).filter_by(id=user_id).first()
return user
# Ручное создание span
with tracer.trace("external_api.call") as span:
span.set_tag("api", "payment")
response = requests.post("https://payment.api/charge")
span.set_metric("response_time", response.elapsed.total_seconds())Проблема: Нужно дешёвое хранение логов с интеграцией в Grafana.
Решение: Loki — легковесная система логирования.
pip install loki-logger-handlerimport logging
from loki_logger_handler import LokiLoggerHandler
logger = logging.getLogger(__name__)
# Loki Handler
loki_handler = LokiLoggerHandler(
host="http://localhost:3100",
labels={
"app": "users-api",
"env": "production"
}
)
logger.addHandler(loki_handler)
logger.setLevel(logging.INFO)
logger.info("Request processed", extra={"user_id": 123})Проблема: Loki индексирует только labels.
Решение: Правильный выбор labels.
# ✅ Хорошо: селективные labels
loki_handler = LokiLoggerHandler(
host="http://loki:3100",
labels={
"app": "users-api",
"level": "%(levelname)s",
"env": "production"
}
)
# ❌ Плохо: слишком много labels (дорого)
loki_handler = LokiLoggerHandler(
host="http://loki:3100",
labels={
"app": "users-api",
"user_id": "%(user_id)s", # Высокая кардинальность!
"request_id": "%(request_id)s" # Высокая кардинальность!
}
)Правило: Labels должны иметь низкую кардинальность (не меняться для каждого запроса).
┌─────────────────────────────────────────────────────────────┐
│ Loki Cardinality │
└─────────────────────────────────────────────────────────────┘
Низкая кардинальность (✅ Хорошо):
app: users-api, orders-api, payments-api (3 значения)
env: production, staging (2 значения)
level: INFO, WARNING, ERROR (3 значения)
Total: 3 × 2 × 3 = 18 unique label combinations
Высокая кардинальность (❌ Плохо):
user_id: 1, 2, 3, ... 1000000 (1M значений)
request_id: uuid для каждого запроса (∞ значений)
Total: 1M+ unique label combinations = дорого!
-- Найти все ошибки
{app="users-api", env="production"} |= "ERROR"
-- Поиск по содержимому
{app="users-api"} |= "timeout" |~ "database"
-- Агрегация
sum(rate({app="users-api"} |= "ERROR" [5m])) by (level)
-- Pattern matching
{app="users-api"} | pattern "<_> Request <request_id> took <duration>ms"| Параметр | Loki | Elasticsearch |
|---|---|---|
| Индексация | Только labels | Полнотекстовая |
| Стоимость хранения | Дёшево (S3, GCS) | Дорого (SSD) |
| Поиск | Медленнее | Быстрее |
| Использование | Debug, audit | Security, compliance |
| Ресурсы | Низкие | Высокие |
Проблема: Традиционные логи сложно парсить и анализировать.
Решение: JSON форматирование для всех handlers.
pip install python-json-loggerimport logging
from pythonjsonlogger import jsonlogger
logger = logging.getLogger(__name__)
# JSON Formatter
json_formatter = jsonlogger.JsonFormatter(
fmt='%(asctime)s %(name)s %(levelname)s %(message)s',
datefmt='%Y-%m-%dT%H:%M:%S'
)
handler = logging.StreamHandler()
handler.setFormatter(json_formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info("User logged in", extra={
"user_id": 123,
"ip_address": "192.168.1.1"
})Вывод:
{
"asctime": "2024-01-15T10:00:00",
"name": "app.auth",
"levelname": "INFO",
"message": "User logged in",
"user_id": 123,
"ip_address": "192.168.1.1"
}import logging
import json
from datetime import datetime
class CustomJsonFormatter(logging.Formatter):
"""JSON formatter с дополнительными полями."""
def __init__(self):
super().__init__()
self.default_fields = {
"service": "users-api",
"version": "1.2.3"
}
def format(self, record):
log_data = {
**self.default_fields,
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
}
# Добавление exc_info
if record.exc_info:
log_data["exception"] = self.formatException(record.exc_info)
# Добавление extra полей
for key, value in record.__dict__.items():
if key not in ['name', 'msg', 'args', 'created', 'filename',
'funcName', 'levelname', 'levelno', 'lineno',
'module', 'msecs', 'pathname', 'process',
'processName', 'relativeCreated', 'stack_info',
'exc_info', 'exc_text', 'thread', 'threadName']:
try:
json.dumps(value) # Проверка на сериализуемость
log_data[key] = value
except (TypeError, ValueError):
log_data[key] = str(value)
return json.dumps(log_data, ensure_ascii=False)
# Использование
handler = logging.StreamHandler()
handler.setFormatter(CustomJsonFormatter())from dataclasses import dataclass, asdict
import logging
@dataclass
class UserEvent:
user_id: int
action: str
metadata: dict
logger = logging.getLogger(__name__)
event = UserEvent(
user_id=123,
action="login",
metadata={"ip": "192.168.1.1"}
)
# ✅ Хорошо: сериализация объекта
logger.info("User event", extra=asdict(event))
# ❌ Плохо: str() объекта
logger.info(f"User event: {event}")Проблема: Нужно комплексное APM решение с логами.
Решение: New Relic Python agent.
pip install newrelicimport newrelic.agent
# Инициализация
newrelic.agent.initialize('newrelic.ini', 'production')
# Обёртывание приложения
@newrelic.agent.background_task()
def process_data():
...
# Логирование ошибок
import logging
logger = logging.getLogger(__name__)
try:
risky_operation()
except Exception:
newrelic.agent.notice_error()
logger.exception("Operation failed")[newrelic]
license_key = your_license_key
app_name = users-api
environment = production
log_level = info
# Transaction tracing
transaction_tracer.enabled = true
transaction_tracer.transaction_threshold = apdex_f
# Error collector
error_collector.enabled = true
error_collector.ignore_errors = exceptions.ValidationError| Система | Назначение | Стоимость | Поиск | Когда использовать |
|---|---|---|---|---|
| Sentry | Ошибки и исключения | Freemium ($26/мес) | По ошибкам | Отладка багов |
| CloudWatch | Хранение логов в AWS | $0.50/GB | CloudWatch Insights | AWS-native приложения |
| Datadog | Логи + метрики + tracing | Дорого ($15/хост/мес) | Полнотекстовый | Enterprise monitoring |
| Loki | Дешёвое хранение логов | Дёшево (S3) | По labels + содержимое | Grafana stack |
| Elasticsearch | Полнотекстовый поиск | Средне/дорого | Full-text | Security, compliance |
| Splunk | Enterprise логирование | Очень дорого | Full-text + ML | Крупный enterprise |
┌─────────────────────────────────────────────────────────────┐
│ Decision Tree │
└─────────────────────────────────────────────────────────────┘
Нужно отслеживать ошибки?
│
├─ Да → Sentry (лучший DX)
│
└─ Нет → Нужно хранить все логи?
│
├─ Да → В AWS?
│ │
│ ├─ Да → CloudWatch
│ │
│ └─ Нет → Loki (дёшево) или Datadog (enterprise)
│
└─ Нет → Только метрики?
│
├─ Да → Prometheus + Grafana
│
└─ Нет → Возможно, логи не нужны
Проблема: Нужно использовать несколько систем одновременно.
Решение: Несколько handlers для разных целей.
import logging
from logging.handlers import RotatingFileHandler
import watchtower
from pythonjsonlogger import jsonlogger
import sentry_sdk
from sentry_sdk.integrations.logging import LoggingIntegration
def setup_production_logging(
app_name: str = "my-app",
environment: str = "production",
sentry_dsn: str = None,
cloudwatch_log_group: str = None,
):
"""Настройка production логирования."""
logger = logging.getLogger(app_name)
logger.setLevel(logging.DEBUG)
# JSON Formatter для всех handlers
json_formatter = jsonlogger.JsonFormatter()
# 1. Файл с ротацией (локальный backup)
file_handler = RotatingFileHandler(
f'/var/log/{app_name}/app.log',
maxBytes=100*1024*1024,
backupCount=7
)
file_handler.setFormatter(json_formatter)
file_handler.setLevel(logging.DEBUG)
# 2. Console (для k8s stdout)
console_handler = logging.StreamHandler()
console_handler.setFormatter(json_formatter)
console_handler.setLevel(logging.INFO)
# 3. CloudWatch (централизованное хранение)
if cloudwatch_log_group:
cloudwatch_handler = watchtower.CloudWatchLogHandler(
log_group=cloudwatch_log_group,
stream_name=f"{environment}-{app_name}"
)
cloudwatch_handler.setFormatter(json_formatter)
cloudwatch_handler.setLevel(logging.INFO)
logger.addHandler(cloudwatch_handler)
# 4. Sentry (ошибки)
if sentry_dsn:
sentry_integration = LoggingIntegration(
level=logging.INFO,
event_level=logging.ERROR
)
sentry_sdk.init(
dsn=sentry_dsn,
integrations=[sentry_integration],
traces_sample_rate=0.1,
environment=environment,
release=f"{app_name}@1.0.0"
)
# Добавление handlers
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger
# Использование
logger = setup_production_logging(
app_name="users-api",
environment="production",
sentry_dsn="https://...",
cloudwatch_log_group="my-app"
)
logger.info("Application started")┌─────────────────────────────────────────────────────────────┐
│ Combined Logging Architecture │
└─────────────────────────────────────────────────────────────┘
logger.error("DB failed")
│
▼
┌─────────────────┐
│ Root Logger │
│ (DEBUG) │
└────────┬────────┘
│
┌────────────────────┼────────────────────┐
│ │ │
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ File Handler │ │ Console │ │ Sentry │
│ (DEBUG) │ │ Handler │ │ Integration │
│ │ │ (INFO) │ │ (ERROR) │
└───────┬───────┘ └───────┬───────┘ └───────┬───────┘
│ │ │
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ /var/log/ │ │ stdout │ │ Sentry │
│ app.log │ │ (k8s logs) │ │ Dashboard │
│ (backup) │ │ │ │ │
└───────────────┘ └───────────────┘ └───────────────┘
# ❌ Плохо: пароль в логах
logger.error("Login failed", extra={"password": password})
# ✅ Хорошо: только username
logger.error("Login failed", extra={"username": username})
# ❌ Плохо: полный запрос
logger.debug(f"SQL: SELECT * FROM users WHERE password='{password}'")
# ✅ Хорошо: хеширование/маскирование
logger.debug("SQL: SELECT * FROM users WHERE password='***'")Что нельзя логировать:
# ✅ Хорошо: 10% трассировка для production
sentry_sdk.init(
dsn="...",
traces_sample_rate=0.1 # 10%
)
# ✅ Лучше: динамический sample rate
def traces_sampler(sampling_context):
# Всегда трассировать ошибки
if sampling_context.get("parent_sampled") is True:
return 1.0
# Трассировать 1% обычных запросов
return 0.01
sentry_sdk.init(
dsn="...",
traces_sampler=traces_sampler
)
# ❌ Плохо: 100% трассировка дорого
sentry_sdk.init(
dsn="...",
traces_sample_rate=1.0
)def before_send(event, hint):
# Игнорировать 4xx ошибки
if 'exc_info' in hint:
exc = hint['exc_info'][1]
if hasattr(exc, 'status_code') and exc.status_code < 500:
return None
# Игнорировать известные ошибки
if 'ConnectionResetError' in str(event.get('exception', '')):
return None
return event# ✅ Хорошо: низкая кардинальность
labels={
"app": "users-api",
"env": "production",
"level": "%(levelname)s"
}
# ❌ Плохо: высокая кардинальность
labels={
"user_id": user_id, # 1M+ значений
"request_id": request_id # ∞ значений
}import atexit
import logging
# Регистрация shutdown handler
@atexit.register
def cleanup():
"""Корректная остановка logging handlers."""
for handler in logging.getLogger().handlers:
if hasattr(handler, 'close'):
handler.close()
if hasattr(handler, 'flush'):
handler.flush()
# Или через context manager
class LoggingContext:
def __enter__(self):
self.logger = setup_logging()
return self.logger
def __exit__(self, exc_type, exc_val, exc_tb):
for handler in self.logger.handlers:
handler.close()
handler.flush()
with LoggingContext() as logger:
logger.info("Application running")# ❌ Плохо: health check спамит логи
@app.get("/health")
def health():
logger.info("Health check") # Тысячи запросов в час
return {"status": "ok"}
# ✅ Хорошо: debug уровень или без логов
@app.get("/health")
def health():
logger.debug("Health check") # Только при отладке
return {"status": "ok"}
# ✅ Лучше: отдельный logger для health
health_logger = logging.getLogger('health')
health_logger.setLevel(logging.WARNING) # Игнорировать INFO
@app.get("/health")
def health():
health_logger.info("Health check") # Не попадёт в логи
return {"status": "ok"}Проверка:
import sentry_sdk
# Проверка DSN
print(sentry_sdk.Hub.current.client.dsn)
# Тестовое событие
sentry_sdk.capture_message("Test message")
# Включение debug режима
sentry_sdk.init(
dsn="...",
debug=True # Покажет ошибки отправки
)Проверка IAM:
aws sts get-caller-identity
aws logs describe-log-groups --log-group-name-prefix my-appПроверка credentials:
import boto3
# Проверка credentials
session = boto3.Session()
credentials = session.get_credentials()
print(f"Access key: {credentials.access_key}")Проблема: Логи дублируются в stdout.
Решение: Отключить propagate.
logger = logging.getLogger('myapp')
logger.propagate = False # Не передавать в root loggerВопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.