Тестирование логов, отладка конфигурации, common pitfalls
Тестирование логов обеспечивает уверенность, что приложение логирует правильную информацию. Отладка помогает решать проблемы с конфигурацией.
Проблема: Нужно проверить, что код логирует ожидаемые сообщения.
Решение: pytest caplog фикстура для перехвата логов.
# app.py
import logging
logger = logging.getLogger(__name__)
def process_data(data):
if not data:
logger.warning("Empty data received")
return None
logger.info(f"Processing {len(data)} items")
# ... обработка
return True# test_app.py
import pytest
import logging
def test_process_data_empty(caplog):
"""Тест: пустые данные вызывают warning."""
from app import process_data
caplog.set_level(logging.WARNING)
result = process_data([])
assert result is None
assert "Empty data received" in caplog.text
assert caplog.record_tuples == [
('app', logging.WARNING, 'Empty data received')
]
def test_process_data_with_items(caplog):
"""Тест: данные логируют info."""
from app import process_data
caplog.set_level(logging.INFO)
result = process_data([1, 2, 3])
assert result is True
assert "Processing 3 items" in caplog.text# app.py
logger.info("User logged in", extra={"user_id": 123, "ip": "192.168.1.1"})# test_app.py
def test_user_login_logs_extra(caplog):
"""Тест: лог содержит extra поля."""
from app import login_user
caplog.set_level(logging.INFO)
login_user(user_id=123, ip="192.168.1.1")
# Проверка через records
assert len(caplog.records) == 1
record = caplog.records[0]
assert record.user_id == 123
assert record.ip == "192.168.1.1"
assert record.message == "User logged in"def test_process_multiple_items(caplog):
"""Тест: проверка количества логов."""
from app import process_batch
caplog.set_level(logging.DEBUG)
process_batch([1, 2, 3, 4, 5])
# Проверка количества warning логов
warnings = [r for r in caplog.records if r.levelname == 'WARNING']
assert len(warnings) == 0
# Проверка количества info логов
infos = [r for r in caplog.records if r.levelname == 'INFO']
assert len(infos) >= 1@pytest.mark.parametrize("level,expected_count", [
(logging.DEBUG, 4),
(logging.INFO, 3),
(logging.WARNING, 1),
(logging.ERROR, 0),
])
def test_log_levels(caplog, level, expected_count):
"""Тест: проверка логов на разных уровнях."""
from app import run_process
caplog.set_level(level)
run_process()
assert len(caplog.records) == expected_countПроблема: caplog не подходит для сложных сценариев.
Решение: MagicMock handler для перехвата логов.
import logging
from unittest.mock import MagicMock
def test_with_mock_handler():
"""Тест с MagicMock handler."""
from app import logger
# Создание mock handler
mock_handler = MagicMock()
logger.addHandler(mock_handler)
try:
# Вызов кода
logger.info("Test message")
# Проверка вызова
assert mock_handler.handle.called
assert mock_handler.handle.call_count == 1
# Проверка LogRecord
log_record = mock_handler.handle.call_args[0][0]
assert log_record.getMessage() == "Test message"
finally:
logger.removeHandler(mock_handler)import logging
from unittest.mock import MagicMock
from io import StringIO
def test_formatter_output():
"""Тест: проверка вывода formatter."""
# StringIO для перехвата вывода
stream = StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
logger = logging.getLogger('test')
logger.addHandler(handler)
logger.setLevel(logging.INFO)
# Логирование
logger.info("Test message")
# Проверка вывода
output = stream.getvalue()
assert "INFO: Test message" in outputПроблема: QueueHandler требует обработки очереди.
Решение: QueueListener для обработки сообщений.
import logging
import logging.handlers
import queue
import pytest
@pytest.fixture
def async_logging_setup():
"""Фикстура для асинхронного логирования."""
log_queue = queue.Queue()
queue_handler = logging.handlers.QueueHandler(log_queue)
# StringIO handler для перехвата
stream = StringIO()
stream_handler = logging.StreamHandler(stream)
listener = logging.handlers.QueueListener(log_queue, stream_handler)
listener.start()
logger = logging.getLogger('test')
logger.addHandler(queue_handler)
logger.setLevel(logging.INFO)
yield stream, logger
# Остановка и очистка
listener.stop()
logger.removeHandler(queue_handler)
def test_async_logging(async_logging_setup):
"""Тест асинхронного логирования."""
stream, logger = async_logging_setup
logger.info("Async message")
# Ожидание обработки
import time
time.sleep(0.1)
output = stream.getvalue()
assert "Async message" in outputПроблема: Логи не выводятся или дублируются.
Решение: Диагностика конфигурации.
import logging
def debug_logging_config():
"""Диагностика конфигурации logging."""
logger = logging.getLogger(__name__)
print(f"Logger name: {logger.name}")
print(f"Logger level: {logger.level} ({logging.getLevelName(logger.level)})")
print(f"Logger handlers: {logger.handlers}")
print(f"Logger propagate: {logger.propagate}")
print()
# Root logger
root = logging.getLogger()
print(f"Root level: {root.level} ({logging.getLevelName(root.level)})")
print(f"Root handlers: {root.handlers}")
for i, handler in enumerate(root.handlers):
print(f" Handler {i}: {type(handler).__name__} "
f"level={handler.level} "
f"formatter={handler.formatter}")
print()
# Проверка иерархии
print("Logger hierarchy:")
current = logger
while current:
print(f" {current.name} (level={current.level})")
current = current.parent
# Использование
debug_logging_config()logger = logging.getLogger(__name__)
print(f"Effective level: {logger.getEffectiveLevel()}")
# Показывает уровень, который будет использоваться
# (свой или ближайшего родителя с установленным уровнем)import logging
# Включение внутреннего логирования модуля logging
logging.basicConfig(level=logging.DEBUG)
internal_logger = logging.getLogger('logging')
internal_logger.setLevel(logging.DEBUG)
# Теперь видно, что происходит внутри logging модуляПроблема: Каждое сообщение выводится дважды.
Причина: Handler в logger + handler в root logger.
# ❌ Плохо: дублирование
logging.basicConfig(level=logging.INFO) # Handler в root
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler()) # Ещё один handler
logger.info("Test") # Выводится дважды!Решение:
# ✅ Хорошо: один handler
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Не добавлять handler, использовать root
# Или: отключить propagation
logger.addHandler(logging.StreamHandler())
logger.propagate = FalseПроблема: logger.info() ничего не выводит.
Причины:
Диагностика:
logger = logging.getLogger(__name__)
print(f"Level: {logger.level}") # 0 = NOTSET
print(f"Handlers: {logger.handlers}") # Пусто?
print(f"Effective level: {logger.getEffectiveLevel()}")Решение:
# Добавить handler
if not logger.handlers:
handler = logging.StreamHandler()
handler.setLevel(logging.INFO)
handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)Проблема: basicConfig() вызван, но ничего не происходит.
Причина: basicConfig работает только один раз.
# ❌ Плохо: второй вызов игнорируется
logging.basicConfig(level=logging.INFO)
# ... какой-то код ...
logging.basicConfig(level=logging.DEBUG) # Игнорируется!Решение:
# ✅ Хорошо: один вызов
if not logging.getLogger().handlers:
logging.basicConfig(level=logging.INFO)
# Или: dictConfig для полной конфигурации
import logging.config
logging.config.dictConfig({...})Проблема: extra поля не появляются в выводе.
Причина: Formatter не включает поля.
# ❌ Плохо: formatter не знает о extra полях
formatter = logging.Formatter('%(levelname)s: %(message)s')
logger.info("Test", extra={"user_id": 123})
# user_id не выводитсяРешение:
# ✅ Хорошо: включить extra поля в formatter
formatter = logging.Formatter(
'%(levelname)s: %(message)s - user_id=%(user_id)s'
)
# Или: использовать JsonFormatter
from pythonjsonlogger import jsonlogger
formatter = jsonlogger.JsonFormatter()Проблема: Нужно проверить конфигурацию перед deploy.
Решение: Интеграционные тесты конфигурации.
import logging.config
import yaml
import pytest
@pytest.fixture
def logging_config():
"""Загрузка production конфигурации."""
with open('logging.production.yaml') as f:
return yaml.safe_load(f)
def test_logging_config_valid(logging_config):
"""Тест: конфигурация валидна."""
# Проверка структуры
assert 'version' in logging_config
assert logging_config['version'] == 1
assert 'handlers' in logging_config
assert 'loggers' in logging_config
def test_logging_config_applies(logging_config):
"""Тест: конфигурация применяется без ошибок."""
import logging.config
logging.config.dictConfig(logging_config)
logger = logging.getLogger('app')
assert logger is not None
assert len(logger.handlers) > 0
def test_logging_output(logging_config, tmp_path):
"""Тест: логи записываются в файл."""
# Модификация конфига для теста
logging_config['handlers']['file']['filename'] = str(tmp_path / 'test.log')
logging.config.dictConfig(logging_config)
logger = logging.getLogger('app')
logger.info("Test message")
# Ожидание записи
import time
time.sleep(0.1)
# Проверка файла
log_file = tmp_path / 'test.log'
assert log_file.exists()
content = log_file.read_text()
assert "Test message" in contentПроблема: JSON логи требуют парсинга для тестирования.
Решение: Специализированные фикстуры и утилиты.
# conftest.py
import json
import logging
import pytest
from io import StringIO
from pythonjsonlogger import jsonlogger
@pytest.fixture
def json_logger():
"""
Фикстура для тестирования JSON логов.
Usage:
def test_json_logger(json_logger):
json_logger.info("User created", extra={"user_id": 123})
logs = json_logger.get_logs()
assert logs[0]["user_id"] == 123
"""
stream = StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(jsonlogger.JsonFormatter(
'%(asctime)s %(name)s %(levelname)s %(message)s'
))
logger = logging.getLogger('json_test')
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
class JsonLogCapture:
def __init__(self, stream, logger):
self.stream = stream
self.logger = logger
def get_logs(self):
"""Парсит JSON логи из StringIO."""
self.stream.seek(0)
lines = self.stream.getvalue().strip().split('\n')
return [json.loads(line) for line in lines if line]
def clear(self):
self.stream.truncate(0)
self.stream.seek(0)
capture = JsonLogCapture(stream, logger)
yield capture
logger.removeHandler(handler)
handler.close()
def test_json_logger_with_extra(json_logger):
"""Тест: JSON лог с extra полями."""
json_logger.logger.info(
"User action",
extra={
"user_id": 123,
"action": "login",
"ip_address": "192.168.1.1"
}
)
logs = json_logger.get_logs()
assert len(logs) == 1
assert logs[0]["message"] == "User action"
assert logs[0]["user_id"] == 123
assert logs[0]["action"] == "login"
assert logs[0]["ip_address"] == "192.168.1.1"
assert "asctime" in logs[0]
assert "levelname" in logs[0]# conftest.py
import pytest
import structlog
from structlog.testing import capture_logs
@pytest.fixture
def structlog_capture():
"""
Фикстура для перехвата structlog событий.
Usage:
def test_structlog(structlog_capture):
logger = structlog.get_logger()
logger.info("user_created", user_id=123)
assert structlog_capture[0]["event"] == "user_created"
"""
with capture_logs() as cap_logs:
yield cap_logs
def test_structlog_events(structlog_capture):
"""Тест: structlog события корректны."""
# Конфигурация structlog для теста
structlog.configure(
processors=[
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer()
],
wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=False,
)
logger = structlog.get_logger()
logger.info("user_created", user_id=123, username="john")
logger.warning("low_balance", account_id=456, balance=-100)
assert len(structlog_capture) == 2
assert structlog_capture[0]["event"] == "user_created"
assert structlog_capture[0]["user_id"] == 123
assert structlog_capture[1]["event"] == "low_balance"# conftest.py
import json
import logging
import pytest
from io import StringIO
from pydantic import BaseModel, ValidationError
from datetime import datetime
class LogRecordSchema(BaseModel):
"""Схема для валидации JSON логов."""
level: str
message: str
timestamp: str
logger: str
class Config:
extra = 'allow' # Разрешить extra поля
@pytest.fixture
def validated_json_logger():
"""
Фикстура для тестирования JSON логов с валидацией схемы.
Usage:
def test_schema(validated_json_logger):
validated_json_logger.info("Test", extra={"user_id": 123})
logs = validated_json_logger.get_validated_logs()
assert logs[0].user_id == 123
"""
stream = StringIO()
handler = logging.StreamHandler(stream)
class JsonFormatter(logging.Formatter):
def format(self, record):
log_data = {
"level": record.levelname,
"message": record.getMessage(),
"timestamp": datetime.utcnow().isoformat(),
"logger": record.name,
}
# Добавление extra полей
for key, value in record.__dict__.items():
if key not in ['name', 'msg', 'args', 'levelname',
'levelno', 'pathname', 'filename',
'module', 'lineno', 'funcName',
'created', 'msecs', 'relativeCreated',
'thread', 'threadName', 'processName',
'process', 'message', 'asctime']:
log_data[key] = value
return json.dumps(log_data)
handler.setFormatter(JsonFormatter())
logger = logging.getLogger('validated_test')
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
class ValidatedLogCapture:
def __init__(self, stream, logger):
self.stream = stream
self.logger = logger
def get_validated_logs(self):
"""Возвращает валидированные Pydantic модели."""
self.stream.seek(0)
lines = self.stream.getvalue().strip().split('\n')
validated = []
for line in lines:
if line:
data = json.loads(line)
validated.append(LogRecordSchema(**data))
return validated
def clear(self):
self.stream.truncate(0)
self.stream.seek(0)
capture = ValidatedLogCapture(stream, logger)
yield capture
logger.removeHandler(handler)
handler.close()
def test_json_schema_validation(validated_json_logger):
"""Тест: валидация схемы JSON логов."""
validated_json_logger.logger.info(
"Payment processed",
extra={
"payment_id": "pay_123",
"amount": 99.99,
"currency": "USD"
}
)
logs = validated_json_logger.get_validated_logs()
assert len(logs) == 1
log = logs[0]
assert log.level == "INFO"
assert log.message == "Payment processed"
assert log.payment_id == "pay_123"
assert log.amount == 99.99
assert log.currency == "USD"
# Валидация timestamp
datetime.fromisoformat(log.timestamp) # Не должно вызвать ошибку# conftest.py
import pytest
import logging
from io import StringIO
import json
@pytest.fixture(params=[
'json',
'text',
'structlog'
])
def log_format(request):
"""Фикстура для тестирования разных форматов логов."""
return request.param
def test_application_with_formats(log_format, tmp_path):
"""Тест: приложение работает с разными форматами логов."""
from app import setup_logging, run_process
log_file = tmp_path / f"test_{log_format}.log"
setup_logging(format=log_format, filename=str(log_file))
run_process()
# Проверка что файл создан и не пуст
assert log_file.exists()
content = log_file.read_text()
assert len(content) > 0
# Парсинг в зависимости от формата
if log_format == 'json':
for line in content.strip().split('\n'):
json.loads(line) # Валидация JSON# conftest.py
import logging
import pytest
@pytest.fixture(autouse=True)
def isolate_logging():
"""
Фикстура изолирует logging между тестами.
Сохраняет состояние, очищает после теста, восстанавливает.
"""
# Сохранение состояния root logger
root = logging.getLogger()
old_level = root.level
old_handlers = root.handlers[:]
old_propagate = logging.getLogger('app').propagate
yield
# Восстановление состояния
root.handlers.clear()
for handler in old_handlers:
handler.close()
root.handlers.extend(old_handlers)
root.setLevel(old_level)
logging.getLogger('app').propagate = old_propagate
@pytest.fixture
def log_capture():
"""
Фикстура для перехвата логов с кастомным уровнем.
Usage:
def test_something(log_capture):
log_capture.set_level(logging.DEBUG)
function_under_test()
assert log_capture.contains("message")
"""
class LogCapture:
def __init__(self):
self.handler = logging.MemoryHandler(capacity=100)
self.handler.setFormatter(
logging.Formatter('%(name)s - %(levelname)s - %(message)s')
)
logging.getLogger().addHandler(self.handler)
def set_level(self, level):
self.handler.setLevel(level)
@property
def records(self):
return self.handler.buffer
def contains(self, message):
return any(message in r.getMessage() for r in self.records)
def clear(self):
self.handler.acquire()
self.handler.buffer.clear()
self.handler.release()
def teardown(self):
logging.getLogger().removeHandler(self.handler)
self.handler.close()
capture = LogCapture()
yield capture
capture.teardown()# conftest.py
import json
import logging
import pytest
from io import StringIO
@pytest.fixture
def json_log_capture():
"""
Фикстура для перехвата JSON логов.
Usage:
def test_json_log(json_log_capture):
logger.info("User created", extra={"user_id": 123})
logs = json_log_capture.get_logs()
assert logs[0]["event"] == "User created"
assert logs[0]["user_id"] == 123
"""
stream = StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter('%(message)s'))
logger = logging.getLogger('test_app')
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
yield stream, logger
logger.removeHandler(handler)
handler.close()
def test_json_output(json_log_capture):
"""Тест: проверка JSON формата логов."""
stream, logger = json_log_capture
# Эмуляция JSON логирования
import json
log_record = {
"level": "INFO",
"event": "User created",
"user_id": 123,
"timestamp": "2024-01-01T00:00:00Z"
}
logger.info(json.dumps(log_record))
output = stream.getvalue().strip()
parsed = json.loads(output)
assert parsed["event"] == "User created"
assert parsed["user_id"] == 123# conftest.py
import logging
import pytest
@pytest.fixture(params=[
logging.DEBUG,
logging.INFO,
logging.WARNING,
logging.ERROR,
])
def log_level(request):
"""Фикстура для тестирования на разных уровнях логирования."""
return request.param
def test_application_with_different_levels(log_level, caplog):
"""Тест: приложение работает на всех уровнях логирования."""
from app import run_application
caplog.set_level(log_level)
run_application()
# Проверка что логи не падают на любом уровне
assert caplog.records is not None# conftest.py
import logging
import pytest
from pathlib import Path
@pytest.fixture
def temp_log_file(tmp_path):
"""
Фикстура создаёт временный файл для логов.
Usage:
def test_file_logging(temp_log_file, caplog):
setup_file_logging(temp_log_file)
logger.info("Test")
assert "Test" in temp_log_file.read_text()
"""
log_file = tmp_path / "test.log"
yield log_file
# Очистка после теста
if log_file.exists():
log_file.unlink()
def test_handler_logging(temp_log_file):
"""Тест: запись в файл работает корректно."""
handler = logging.FileHandler(temp_log_file)
handler.setFormatter(logging.Formatter('%(levelname)s - %(message)s'))
logger = logging.getLogger('test')
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info("Test message")
handler.close()
logger.removeHandler(handler)
content = temp_log_file.read_text()
assert "INFO - Test message" in contentПроблема: Логирование влияет на производительность приложения.
Решение: Бенчмарки для оценки overhead логирования.
# test_logging_performance.py
import logging
import time
import pytest
from io import StringIO
@pytest.fixture(params=[
'no_handler',
'stream_handler',
'file_handler',
'json_handler',
'async_handler',
])
def logging_config(request, tmp_path):
"""Разные конфигурации для бенчмарка."""
config_type = request.param
logger = logging.getLogger('perf_test')
logger.handlers.clear()
logger.setLevel(logging.DEBUG)
if config_type == 'no_handler':
pass # Без handlers
elif config_type == 'stream_handler':
handler = logging.StreamHandler(StringIO())
handler.setFormatter(logging.Formatter('%(levelname)s - %(message)s'))
logger.addHandler(handler)
elif config_type == 'file_handler':
log_file = tmp_path / 'perf.log'
handler = logging.FileHandler(log_file)
handler.setFormatter(logging.Formatter('%(levelname)s - %(message)s'))
logger.addHandler(handler)
elif config_type == 'json_handler':
from pythonjsonlogger import jsonlogger
handler = logging.StreamHandler(StringIO())
handler.setFormatter(jsonlogger.JsonFormatter(
'%(asctime)s %(name)s %(levelname)s %(message)s'
))
logger.addHandler(handler)
elif config_type == 'async_handler':
import logging.handlers
import queue
log_queue = queue.Queue()
queue_handler = logging.handlers.QueueHandler(log_queue)
stream_handler = logging.StreamHandler(StringIO())
listener = logging.handlers.QueueListener(
log_queue, stream_handler
)
listener.start()
logger.addHandler(queue_handler)
# Сохраняем listener для остановки
logger._listener = listener
yield logger, config_type
# Очистка
for handler in logger.handlers[:]:
handler.close()
logger.removeHandler(handler)
if hasattr(logger, '_listener'):
logger._listener.stop()
def test_logging_throughput(logging_config, benchmark):
"""Бенчмарк: throughput логирования."""
logger, config_type = logging_config
def log_many():
for i in range(1000):
logger.info(f"Message {i}")
# Для async handler
if hasattr(logger, '_listener'):
time.sleep(0.1)
benchmark(log_many)pip install pytest-benchmark# test_logging_benchmark.py
import logging
import pytest
from io import StringIO
@pytest.fixture
def fast_logger():
"""Быстрый logger без formatter."""
logger = logging.getLogger('fast_test')
logger.handlers.clear()
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(StringIO())
# Минимальный formatter
handler.setFormatter(logging.Formatter('%(levelname)s'))
logger.addHandler(handler)
yield logger
logger.removeHandler(handler)
handler.close()
@pytest.fixture
def slow_logger():
"""Медленный logger с complex formatter."""
logger = logging.getLogger('slow_test')
logger.handlers.clear()
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(StringIO())
# Сложный formatter
handler.setFormatter(logging.Formatter(
'%(asctime)s %(name)s %(levelname)s %(message)s '
'%(pathname)s:%(lineno)d %(funcName)s'
))
logger.addHandler(handler)
yield logger
logger.removeHandler(handler)
handler.close()
def test_fast_logger_benchmark(benchmark, fast_logger):
"""Бенчмарк быстрого logger."""
def log():
for i in range(100):
fast_logger.info(f"Message {i}")
result = benchmark(log)
print(f"Fast logger: {result}")
def test_slow_logger_benchmark(benchmark, slow_logger):
"""Бенчмарк медленного logger."""
def log():
for i in range(100):
slow_logger.info(f"Message {i}")
result = benchmark(log)
print(f"Slow logger: {result}")Запуск:
pytest test_logging_benchmark.py --benchmark-onlyimport logging
import logging.handlers
import queue
import time
from contextlib import contextmanager
@contextmanager
def measure_time(label):
"""Контекстный менеджер для замера времени."""
start = time.perf_counter()
yield
end = time.perf_counter()
print(f"{label}: {end - start:.4f} seconds")
def compare_sync_async_logging():
"""Сравнение sync и async логирования."""
iterations = 10000
# Синхронное логирование
sync_logger = logging.getLogger('sync_test')
sync_logger.handlers.clear()
sync_handler = logging.StreamHandler(open('/dev/null', 'w'))
sync_logger.addHandler(sync_handler)
sync_logger.setLevel(logging.DEBUG)
with measure_time(f"Sync ({iterations} logs)"):
for i in range(iterations):
sync_logger.info(f"Sync message {i}")
sync_handler.close()
# Асинхронное логирование
async_logger = logging.getLogger('async_test')
async_logger.handlers.clear()
log_queue = queue.Queue()
queue_handler = logging.handlers.QueueHandler(log_queue)
async_handler = logging.StreamHandler(open('/dev/null', 'w'))
listener = logging.handlers.QueueListener(log_queue, async_handler)
listener.start()
async_logger.addHandler(queue_handler)
async_logger.setLevel(logging.DEBUG)
with measure_time(f"Async ({iterations} logs)"):
for i in range(iterations):
async_logger.info(f"Async message {i}")
# Ожидание обработки очереди
time.sleep(0.1)
listener.stop()
async_handler.close()
# Запуск сравнения
if __name__ == '__main__':
compare_sync_async_logging()import logging
import tracemalloc
import pytest
@pytest.fixture
def memory_logger():
"""Logger для тестирования памяти."""
logger = logging.getLogger('memory_test')
logger.handlers.clear()
logger.setLevel(logging.DEBUG)
yield logger
logger.handlers.clear()
def test_logging_memory_usage(memory_logger):
"""Тест: потребление памяти при логировании."""
tracemalloc.start()
# Логирование 10000 сообщений
for i in range(10000):
memory_logger.info(f"Message {i}")
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"Current memory: {current / 1024 / 1024:.2f} MB")
print(f"Peak memory: {peak / 1024 / 1024:.2f} MB")
# Проверка что память не утекła
assert peak < 50 * 1024 * 1024 # Менее 50 MB
def test_handler_memory_leak():
"""Тест: утечка памяти в handlers."""
tracemalloc.start()
logger = logging.getLogger('leak_test')
logger.handlers.clear()
# Многократное добавление/удаление handlers
for _ in range(100):
handler = logging.StreamHandler()
logger.addHandler(handler)
logger.info("Test message")
logger.removeHandler(handler)
handler.close()
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
# Проверка что память освобождается
assert current < 10 * 1024 * 1024 # Менее 10 MBimport logging
import logging.handlers
import threading
import time
import pytest
def stress_test_logging(num_threads=10, logs_per_thread=1000):
"""Нагрузочный тест: многопоточное логирование."""
logger = logging.getLogger('stress_test')
logger.handlers.clear()
logger.setLevel(logging.DEBUG)
# Файловый handler
handler = logging.FileHandler('/tmp/stress_test.log')
handler.setFormatter(logging.Formatter(
'%(asctime)s %(threadName)s %(levelname)s %(message)s'
))
logger.addHandler(handler)
def log_from_thread(thread_id):
for i in range(logs_per_thread):
logger.info(f"Thread {thread_id}: Message {i}")
threads = []
start_time = time.time()
# Запуск потоков
for i in range(num_threads):
t = threading.Thread(target=log_from_thread, args=(i,))
threads.append(t)
t.start()
# Ожидание завершения
for t in threads:
t.join()
end_time = time.time()
handler.close()
total_logs = num_threads * logs_per_thread
duration = end_time - start_time
print(f"Total logs: {total_logs}")
print(f"Duration: {duration:.2f}s")
print(f"Logs/sec: {total_logs / duration:.2f}")
return {
'total_logs': total_logs,
'duration': duration,
'logs_per_second': total_logs / duration
}
def test_stress_logging():
"""Тест: стабильность при высокой нагрузке."""
result = stress_test_logging(num_threads=10, logs_per_thread=1000)
# Проверка что производительность приемлемая
assert result['logs_per_second'] > 1000 # Более 1000 логов/секПроблема: Логи в контейнерах и Kubernetes требуют особого подхода.
Решение: Специализированные техники отладки для containerized сред.
# docker_logging.py
import logging
import sys
import os
def setup_docker_logging():
"""
Настройка логирования для Docker контейнеров.
Best practices:
- Логирование в stdout/stderr (собирается docker logs)
- JSON формат для парсинга
- Без файловых handlers
"""
# Определение уровня из переменной окружения
log_level = os.getenv('LOG_LEVEL', 'INFO').upper()
logger = logging.getLogger()
logger.setLevel(getattr(logging, log_level))
# Очистка существующих handlers
logger.handlers.clear()
# Stream handler для stdout
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setLevel(getattr(logging, log_level))
# JSON формат для production
if os.getenv('ENVIRONMENT') == 'production':
from pythonjsonlogger import jsonlogger
formatter = jsonlogger.JsonFormatter(
'%(asctime)s %(name)s %(levelname)s %(message)s '
'%(pathname)s %(lineno)d %(funcName)s'
)
else:
# Цветной вывод для development
formatter = logging.Formatter(
'%(asctime)s %(name)s %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
stdout_handler.setFormatter(formatter)
logger.addHandler(stdout_handler)
# Ошибки в stderr
stderr_handler = logging.StreamHandler(sys.stderr)
stderr_handler.setLevel(logging.ERROR)
stderr_handler.setFormatter(formatter)
logger.addHandler(stderr_handler)
return logger
# Использование
logger = setup_docker_logging()Dockerfile:
FROM python:3.11-slim
ENV LOG_LEVEL=INFO
ENV ENVIRONMENT=production
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Не запускать как root
RUN useradd -m appuser
USER appuser
CMD ["python", "main.py"]docker-compose.yml:
version: '3.8'
services:
app:
build: .
environment:
- LOG_LEVEL=DEBUG
- ENVIRONMENT=development
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
# Для отладки можно увеличить verbosity
command: ["python", "-u", "main.py"] # -u для unbuffered output# k8s_logging.py
import logging
import os
import socket
from pythonjsonlogger import jsonlogger
def setup_kubernetes_logging():
"""
Настройка логирования для Kubernetes.
Учитывает:
- Pod name, namespace, node name
- Container name
- Labels для фильтрации
"""
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.handlers.clear()
# Kubernetes metadata из environment variables
pod_name = os.getenv('POD_NAME', 'unknown')
namespace = os.getenv('POD_NAMESPACE', 'default')
node_name = os.getenv('NODE_NAME', 'unknown')
container_name = os.getenv('CONTAINER_NAME', 'app')
class KubernetesJsonFormatter(jsonlogger.JsonFormatter):
"""JSON formatter с Kubernetes metadata."""
def add_fields(self, log_record, record, message_dict):
super().add_fields(log_record, record, message_dict)
# Добавление Kubernetes metadata
log_record['pod_name'] = pod_name
log_record['namespace'] = namespace
log_record['node_name'] = node_name
log_record['container_name'] = container_name
log_record['hostname'] = socket.gethostname()
# Kubernetes recommended поля
log_record['severity'] = record.levelname
log_record['logging.googleapis.com/sourceLocation'] = {
'file': record.pathname,
'line': record.lineno,
'function': record.funcName,
}
handler = logging.StreamHandler()
handler.setFormatter(KubernetesJsonFormatter())
logger.addHandler(handler)
return logger
logger = setup_kubernetes_logging()Kubernetes Deployment с логированием:
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-app
spec:
replicas: 3
selector:
matchLabels:
app: my-app
template:
metadata:
labels:
app: my-app
version: v1
annotations:
# Для сбора логов Fluentd/Fluent Bit
fluentbit.io/parser: "json"
spec:
containers:
- name: app
image: my-app:latest
env:
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: CONTAINER_NAME
value: "app"
- name: LOG_LEVEL
value: "INFO"
- name: ENVIRONMENT
value: "production"
resources:
requests:
memory: "256Mi"
cpu: "100m"
limits:
memory: "512Mi"
cpu: "500m"Просмотр логов:
# Логи пода
kubectl logs my-app-5d4f6c7b8-xvz9p
# Логи с follow
kubectl logs -f my-app-5d4f6c7b8-xvz9p
# Логи предыдущего контейнера (после crash)
kubectl logs my-app-5d4f6c7b8-xvz9p --previous
# Логи с timestamp
kubectl logs my-app-5d4f6c7b8-xvz9p -f --timestamps
# Поиск по логам
kubectl logs my-app-5d4f6c7b8-xvz9p | grep "ERROR"
# Последние N строк
kubectl logs my-app-5d4f6c7b8-xvz9p --tail=100
# Логи всех контейнеров в поде
kubectl logs my-app-5d4f6c7b8-xvz9p --all-containers
# Логи конкретного контейнера
kubectl logs my-app-5d4f6c7b8-xvz9p -c sidecar
# Логи всех подов с label
kubectl logs -l app=my-app --all-containersОтладка с kubectl debug:
# Запуск ephemeral контейнера для отладки
kubectl debug -it my-app-5d4f6c7b8-xvz9p --image=busybox
# Внутри debug контейнера можно посмотреть логи
cat /var/log/containers/*.log
# Или использовать journalctl
journalctl -u containerd# elk_logging.py
import logging
from logging.handlers import SocketHandler
import json
import os
def setup_elk_logging():
"""
Настройка логирования для ELK Stack (Elasticsearch, Logstash, Kibana).
Архитектура:
App -> Logstash -> Elasticsearch -> Kibana
"""
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.handlers.clear()
# Local stdout для сбора Filebeat/Fluentd
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(message)s'))
logger.addHandler(console_handler)
# Опционально: прямая отправка в Logstash
logstash_host = os.getenv('LOGSTASH_HOST')
logstash_port = os.getenv('LOGSTASH_PORT', 5000)
if logstash_host:
class LogstashHandler(SocketHandler):
"""Handler для отправки в Logstash."""
def __init__(self, host, port):
super().__init__(host, port)
self.app_name = os.getenv('APP_NAME', 'app')
def makePickle(self, record):
log_data = {
'@timestamp': self.formatTime(record),
'level': record.levelname,
'logger': record.name,
'message': record.getMessage(),
'app': self.app_name,
'host': record.hostname if hasattr(record, 'hostname') else '',
'pathname': record.pathname,
'lineno': record.lineno,
}
# Добавление extra полей
for key, value in record.__dict__.items():
if key not in ['name', 'msg', 'args', 'levelname',
'levelno', 'pathname', 'filename',
'module', 'lineno', 'funcName',
'created', 'msecs', 'relativeCreated',
'thread', 'threadName', 'processName',
'process', 'message', 'asctime', 'hostname']:
log_data[key] = value
return json.dumps(log_data).encode('utf-8') + b'\n'
logstash_handler = LogstashHandler(logstash_host, logstash_port)
logstash_handler.setFormatter(logging.Formatter())
logger.addHandler(logstash_handler)
return logger
logger = setup_elk_logging()Logstash конфигурация:
# logstash.conf
input {
tcp {
port => 5000
codec => json_lines
}
}
filter {
# Парсинг timestamp
date {
match => ["@timestamp", "ISO8601"]
}
# Добавление metadata
mutate {
add_field => {
"environment" => "production"
"team" => "backend"
}
}
# Парсинг Python traceback
if [message] =~ "Traceback" {
grok {
match => { "message" => "(?<traceback>.*)" }
}
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "logs-%{+YYYY.MM.dd}"
}
}# loki_logging.py
import logging
import os
from pythonjsonlogger import jsonlogger
def setup_loki_logging():
"""
Настройка логирования для Grafana Loki.
Loki использует labels для индексации,
поэтому важно добавлять структурированные поля.
"""
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.handlers.clear()
handler = logging.StreamHandler()
# JSON формат с labels для Loki
class LokiJsonFormatter(jsonlogger.JsonFormatter):
def add_fields(self, log_record, record, message_dict):
super().add_fields(log_record, record, message_dict)
# Loki labels (должны быть строками)
log_record['level'] = record.levelname
log_record['service'] = os.getenv('SERVICE_NAME', 'app')
log_record['environment'] = os.getenv('ENVIRONMENT', 'dev')
log_record['version'] = os.getenv('APP_VERSION', 'unknown')
handler.setFormatter(LokiJsonFormatter())
logger.addHandler(handler)
return logger
logger = setup_loki_logging()Promtail конфигурация:
# promtail.yaml
server:
http_listen_port: 9080
positions:
filename: /tmp/positions.yaml
clients:
- url: http://loki:3100/loki/api/v1/push
scrape_configs:
- job_name: containers
static_configs:
- targets:
- localhost
labels:
job: containerlogs
__path__: /var/log/containers/*pod-name*.log
pipeline_stages:
- json:
expressions:
level: level
service: service
message: message
- labels:
- level
- service
- environment
- output:
source: messageLogQL запросы в Grafana:
# Все ERROR логи
{service="my-app", environment="production"} |= "ERROR"
# Логи по user_id
{service="my-app"} | json | user_id="123"
# Агрегация по уровню
sum by (level) (count_over_time({service="my-app"}[1h]))
# Поиск traceback
{service="my-app"} |= "Traceback"
# Rate логов
rate({service="my-app"}[5m])# tracing_logging.py
import logging
import os
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
def setup_tracing_with_logging():
"""
Настройка distributed tracing с корреляцией логов.
Интеграция с Jaeger/Zipkin для tracing,
с добавлением trace_id в логи.
"""
# Настройка tracing
trace.set_tracer_provider(TracerProvider())
jaeger_exporter = JaegerExporter(
agent_host_name=os.getenv('JAEGER_HOST', 'localhost'),
agent_port=int(os.getenv('JAEGER_PORT', 6831)),
)
span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
tracer = trace.get_tracer(__name__)
# Настройка логирования с trace_id
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.handlers.clear()
class TraceIdFormatter(logging.Formatter):
"""Formatter с добавлением trace_id."""
def format(self, record):
# Получение текущего trace_id из context
span = trace.get_current_span()
if span.is_recording():
trace_id = span.get_span_context().trace_id
# Форматирование trace_id в hex
record.trace_id = f"{trace_id:032x}"
record.span_id = f"{span.get_span_context().span_id:016x}"
else:
record.trace_id = "0" * 32
record.span_id = "0" * 16
return super().format(record)
handler = logging.StreamHandler()
handler.setFormatter(TraceIdFormatter(
'%(asctime)s [%(trace_id)s:%(span_id)s] %(levelname)s %(message)s'
))
logger.addHandler(handler)
return logger, tracer
# Использование
logger, tracer = setup_tracing_with_logging()
def process_request(request_id):
with tracer.start_as_current_span("process_request") as span:
span.set_attribute("request_id", request_id)
logger.info(f"Processing request {request_id}")
# ... обработка□ 1. Проверить уровень логирования
- logger.level != 0 (NOTSET)?
- logger.getEffectiveLevel() показывает что?
- Handler.level установлен?
□ 2. Проверить наличие handlers
- logger.handlers не пуст?
- Root logger имеет handlers?
□ 3. Проверить иерархию
- logger.propagate = True?
- Родительский logger имеет handlers?
□ 4. Проверить фильтр
- Нет ли filter на handler?
- Нет ли filter на logger?
Команды для диагностики:
import logging
logger = logging.getLogger('my_logger')
# Быстрая диагностика
print(f"Level: {logger.level} ({logging.getLevelName(logger.level)})")
print(f"Effective: {logger.getEffectiveLevel()}")
print(f"Handlers: {len(logger.handlers)}")
print(f"Propagate: {logger.propagate}")
print(f"Disabled: {logger.disabled}")
# Проверка root
root = logging.getLogger()
print(f"Root level: {root.level}")
print(f"Root handlers: {len(root.handlers)}")□ 1. Проверить количество handlers
- len(logger.handlers) > 1?
- len(root.handlers) > 0?
□ 2. Проверить propagate
- logger.propagate = False если есть свои handlers?
□ 3. Проверить иерархию
- Родительский logger имеет те же handlers?
Решение:
# Вариант 1: Использовать только root logger
logger = logging.getLogger(__name__)
# Не добавлять handlers, использовать root
# Вариант 2: Отключить propagate
logger = logging.getLogger(__name__)
logger.addHandler(my_handler)
logger.propagate = False
# Вариант 3: Очистить унаследованные handlers
logger.handlers.clear()
logger.addHandler(my_handler)□ 1. Проверить количество вызовов
- basicConfig вызывался раньше?
- Есть ли другие handlers в root?
□ 2. Проверить force параметр (Python 3.8+)
- logging.basicConfig(..., force=True)
□ 3. Проверить наличие handlers в root
- len(logging.getLogger().handlers) > 0?
Решение:
# Вариант 1: Использовать force (Python 3.8+)
logging.basicConfig(level=logging.DEBUG, force=True)
# Вариант 2: Проверка перед вызовом
if not logging.getLogger().handlers:
logging.basicConfig(level=logging.INFO)
# Вариант 3: Очистка root handlers
root = logging.getLogger()
root.handlers.clear()
logging.basicConfig(level=logging.INFO)
# Вариант 4: Использовать dictConfig
import logging.config
logging.config.dictConfig({...})□ 1. Проверить formatter
- Formatter использует %(field_name)s?
- JsonFormatter используется?
□ 2. Проверить передачу extra
- logger.info("msg", extra={"field": value})?
- Поля не конфликтуют с зарезервированными?
□ 3. Проверить запись в LogRecord
- record.__dict__ содержит extra поля?
Решение:
# Вариант 1: Добавить поля в formatter
formatter = logging.Formatter(
'%(levelname)s - %(message)s - user_id=%(user_id)s'
)
# Вариант 2: Использовать JsonFormatter
from pythonjsonlogger import jsonlogger
formatter = jsonlogger.JsonFormatter()
# Вариант 3: Кастомный formatter
class ExtraFormatter(logging.Formatter):
def format(self, record):
# Добавление extra полей автоматически
for key, value in record.__dict__.items():
if key not in ['name', 'msg', 'args', 'levelname',
'levelno', 'pathname', 'filename',
'module', 'lineno', 'funcName',
'created', 'msecs', 'relativeCreated',
'thread', 'threadName', 'processName',
'process', 'message', 'asctime']:
record.__dict__[key] = value
return super().format(record)□ 1. Проверить тип handler
- FileHandler может быть медленным
- Сетевые handlers (SocketHandler) блокируют?
□ 2. Проверить formatter
- Сложный formatter замедляет?
- Много полей в formatter?
□ 3. Проверить уровень логирования
- DEBUG в production?
- Много лишних логов?
□ 4. Проверить I/O
- Диск медленный?
- Сеть лагает?
Решение:
# Вариант 1: Использовать QueueHandler (асинхронно)
import logging.handlers
import queue
log_queue = queue.Queue()
queue_handler = logging.handlers.QueueHandler(log_queue)
# Быстрый handler для записи
file_handler = logging.FileHandler('app.log')
listener = logging.handlers.QueueListener(log_queue, file_handler)
listener.start()
logger.addHandler(queue_handler)
# Вариант 2: Увеличить уровень логирования
logger.setLevel(logging.WARNING)
# Вариант 3: Упростить formatter
formatter = logging.Formatter('%(levelname)s: %(message)s')| Проблема | Причина | Решение |
|---|---|---|
| Логи не выводятся | Нет handlers | logger.addHandler(StreamHandler()) |
| Логи не выводятся | Уровень слишком высокий | logger.setLevel(DEBUG) |
| Дублирование | Handler в logger + root | logger.propagate = False |
| Дублирование | Несколько одинаковых handlers | logger.handlers.clear() |
| basicConfig игнорируется | Вызывался раньше | basicConfig(..., force=True) |
| Extra поля не видны | Formatter не включает | Добавить %(field)s в format |
| JSON не парсится | Нет JsonFormatter | pythonjsonlogger.jsonlogger.JsonFormatter() |
| Тормозит логирование | Блокирующий I/O | QueueHandler + QueueListener |
| Утечка памяти | Handlers не закрываются | handler.close() после removeHandler() |
| Нет trace_id | Нет интеграции | OpenTelemetry + кастомный formatter |
# Плохо
for item in large_list:
logger.debug(f"Processing {item}") # Тысячи логов
# Хорошо
if len(large_list) > 100:
logger.debug(f"Processing batch of {len(large_list)} items")
else:
for item in large_list:
logger.debug(f"Processing {item}")# Плохо
logger.info(f"User login: {username}, password: {password}")
logger.debug(f"Request headers: {request.headers}") # Может содержать токены
# Хорошо
logger.info(f"User login attempt: {username}")
logger.debug(f"Request headers: {sanitize_headers(request.headers)}")
def sanitize_headers(headers):
safe = dict(headers)
safe.pop('authorization', None)
safe.pop('cookie', None)
return safe# Плохо
try:
process()
except Exception as e:
logger.error(f"Error: {e}") # Нет traceback!
# Хорошо
try:
process()
except Exception:
logger.exception("Error processing") # Автоматически добавляет traceback
# Или
try:
process()
except Exception as e:
logger.error("Error processing", exc_info=True)# Плохо
def process():
logger = logging.getLogger(__name__) # Создаётся каждый раз
logger.info("Processing")
# Хорошо
logger = logging.getLogger(__name__) # На уровне модуля
def process():
logger.info("Processing")# Плохо
print("Debug: processing...")
print(f"Error: {error}")
# Хорошо
logger.debug("Processing...")
logger.error("Error: %s", error)# Плохо
logger.info("User " + username + " logged in with id " + str(user_id))
# Хорошо
logger.info("User %s logged in with id %d", username, user_id)
# Или с f-string (Python 3.6+)
logger.info(f"User {username} logged in with id {user_id}")# Плохо
logger.error("Failed to process request")
# Хорошо
logger.error(
"Failed to process request",
extra={
"request_id": request_id,
"user_id": user_id,
"endpoint": endpoint,
"method": request.method,
}
)project/
├── logging_config/
│ ├── __init__.py
│ ├── formatters.py # Кастомные formatter
│ ├── handlers.py # Кастомные handlers
│ ├── filters.py # Кастомные filters
│ └── conf.py # Конфигурация
├── tests/
│ ├── conftest.py # Fixtures для тестирования
│ └── test_logging.py # Тесты логирования
└── main.py
# logging_config/__init__.py
import logging
import logging.config
from pathlib import Path
import yaml
def setup_logging(
env: str = 'development',
config_file: str = 'logging.yaml',
):
"""Централизованная настройка логирования."""
config_path = Path(__file__).parent / config_file
if config_path.exists():
with open(config_path) as f:
config = yaml.safe_load(f)
# Переопределение для разных сред
if env == 'production':
config['loggers']['']['level'] = 'WARNING'
config['handlers']['console']['level'] = 'WARNING'
logging.config.dictConfig(config)
else:
# Fallback конфигурация
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(name)s %(levelname)s %(message)s'
)import logging
import random
class SamplingFilter(logging.Filter):
"""Фильтр для sampling логов."""
def __init__(self, rate=0.1):
self.rate = rate
def filter(self, record):
return random.random() < self.rate
# Использование
logger = logging.getLogger('high_volume')
handler = logging.StreamHandler()
handler.addFilter(SamplingFilter(rate=0.01)) # 1% логов
logger.addHandler(handler)import logging
from logging.handlers import RateLimiter
class RateLimitedFilter(logging.Filter):
"""Фильтр для rate limiting логов."""
def __init__(self, max_records=10, interval=60):
self.limiter = RateLimiter(max_records, interval)
def filter(self, record):
return self.limiter.try_acquire()
# Использование
logger = logging.getLogger('api')
handler = logging.StreamHandler()
handler.addFilter(RateLimitedFilter(max_records=100, interval=60))
logger.addHandler(handler)import logging
import time
from functools import wraps
logger = logging.getLogger('health')
def log_execution_time(level=logging.INFO):
"""Decorator для логирования времени выполнения."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
try:
result = func(*args, **kwargs)
logger.log(
level,
f"{func.__name__} completed in {time.perf_counter() - start:.4f}s"
)
return result
except Exception:
logger.exception(
f"{func.__name__} failed after {time.perf_counter() - start:.4f}s"
)
raise
return wrapper
return decorator
# Использование
@log_execution_time()
def process_data():
# ... обработка
pass# Установка
brew install lnav # macOS
apt install lnav # Ubuntu
# Использование
lnav /var/log/*.log
lnav app.log
# Функции:
# - Подсветка синтаксиса
# - Автоматическое определение формата
# - SQL запросы к логам
# - Объединение нескольких файлов# Фильтрация
cat app.log | jq 'select(.level == "ERROR")'
# Поиск по полю
cat app.log | jq 'select(.user_id == 123)'
# Агрегация
cat app.log | jq -s 'group_by(.level) | map({level: .[0].level, count: length})'
# Извлечение полей
cat app.log | jq '{timestamp, level, message}'# Поиск ошибок с контекстом
grep -A 5 -B 5 "ERROR" app.log
# Подсчёт ошибок по времени
grep "ERROR" app.log | cut -d' ' -f1 | sort | uniq -c
# Извлечение JSON полей через awk
awk -F'"' '/ERROR/ {print $4}' app.logfrom loguru import logger
# Автоматическая настройка
logger.info("No configuration needed!")
# Встроенная ротация
logger.add("file_{time}.log", rotation="500 MB")
# Встроенный traceback
logger.opt(exception=True).info("Error occurred")
# Контекстные поля
logger.bind(user_id=123).info("User action")from rich.logging import RichHandler
import logging
logging.basicConfig(
level=logging.INFO,
handlers=[RichHandler(rich_tracebacks=True, markup=True)]
)
logger = logging.getLogger(__name__)
logger.info("Красивые логи с подсветкой!")import colorlog
handler = colorlog.ColoredHandler(
fmt='%(log_color)s%(levelname)-8s%(reset)s %(message)s',
log_colors={
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'bold_red',
}
)from prometheus_client import Counter, Histogram
# Метрики для логов
log_errors = Counter('log_errors_total', 'Total log errors', ['level'])
log_duration = Histogram('log_duration_seconds', 'Log duration')
# Подсчёт ошибок
log_errors.labels(level='ERROR').inc()
# В PrometheusQL
# rate(log_errors_total[5m]) > 10 # Алерт при > 10 ошибок/секimport sentry_sdk
import logging
sentry_sdk.init(
dsn="your-sentry-dsn",
integrations=[
sentry_sdk.integrations.logging.LoggingIntegration(
level=logging.INFO, # Capture info and above as breadcrumbs
event_level=logging.ERROR # Send errors as events
)
]
)pip install pytest-logging# conftest.py
import pytest
@pytest.fixture
def assert_logs():
"""Утилита для assertions на логи."""
class LogAssert:
def __init__(self, caplog):
self.caplog = caplog
def contains(self, message, level=None):
for record in self.caplog.records:
if message in record.getMessage():
if level is None or record.levelno == level:
return True
return False
def count(self, message=None, level=None):
count = 0
for record in self.caplog.records:
if message and message not in record.getMessage():
continue
if level and record.levelno != level:
continue
count += 1
return count
return lambda caplog: LogAssert(caplog)
# Тест
def test_logging(assert_logs, caplog):
assert_logs = assert_logs(caplog)
logger.info("Test message")
assert assert_logs.contains("Test message")
assert assert_logs.count(level=logging.INFO) == 1| Инструмент | Назначение | Когда использовать |
|---|---|---|
| caplog | Перехват логов в тестах | Простые unit тесты |
| MagicMock | Mock handlers | Сложные сценарии |
| StringIO | Перехват stdout/stderr | Тестирование formatter |
| QueueListener | Async логирование | Performance тесты |
| pytest-structlog | Тестирование structlog | При использовании structlog |
| pydantic | Валидация JSON схемы | Структурированные логи |
| Проблема | Быстрое решение |
|---|---|
| Логи не выводятся | Проверить getEffectiveLevel() и handlers |
| Дублирование | logger.propagate = False |
| basicConfig игнорируется | basicConfig(..., force=True) |
| Extra поля не видны | Добавить в formatter |
| Тормозит | Использовать QueueHandler |
logger.exception() для ошибокВопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.