Аудит, триггеры, кастомная логика через events
Event listeners SQLAlchemy позволяют внедрять кастомную логику в жизненный цикл ORM: аудит изменений, автоматические timestamp, валидация, кэширование и многое другое.
from sqlalchemy import event
# События ORM (на уровне модели)
@event.listens_for(User, 'before_insert')
def before_insert(mapper, connection, target):
"""Перед вставкой объекта."""
pass
@event.listens_for(User, 'after_update')
def after_update(mapper, connection, target):
"""После обновления объекта."""
pass
# События Session
@event.listens_for(Session, 'before_commit')
def before_commit(session):
"""Перед commit сессии."""
pass
# События Engine
@event.listens_for(Engine, 'before_cursor_execute')
def before_execute(conn, cursor, statement, parameters, context, executemany):
"""Перед выполнением SQL."""
passfrom sqlalchemy import event
from datetime import datetime
@event.listens_for(User, 'before_insert')
def receive_before_insert(mapper, connection, target):
"""
Вызывается перед INSERT.
:param mapper: SQLAlchemy mapper
:param connection: DBAPI connection
:param target: ORM объект который вставляется
"""
# Автоматическая установка timestamp
target.created_at = datetime.utcnow()
target.updated_at = datetime.utcnow()
# Нормализация данных
if target.email:
target.email = target.email.lower().strip()
# Аудит
target.created_by = get_current_user_id() # Из контекста
@event.listens_for(User, 'after_insert')
def receive_after_insert(mapper, connection, target):
"""
Вызывается после INSERT.
Объект уже имеет id и server default значения.
"""
# Отправка уведомления
send_welcome_email(target.email)
# Логирование
print(f"User {target.id} created")from sqlalchemy import event
from datetime import datetime
@event.listens_for(User, 'before_update')
def receive_before_update(mapper, connection, target):
"""Вызывается перед UPDATE."""
target.updated_at = datetime.utcnow()
target.updated_by = get_current_user_id()
@event.listens_for(User, 'after_update')
def receive_after_update(mapper, connection, target):
"""Вызывается после UPDATE."""
# Логирование изменений
print(f"User {target.id} updated")@event.listens_for(User, 'before_delete')
def receive_before_delete(mapper, connection, target):
"""
Вызывается перед DELETE.
Можно предотвратить удаление выбросив исключение.
"""
# Проверка что у пользователя нет активных заказов
if target.has_active_orders():
from sqlalchemy.exc import IntegrityError
raise IntegrityError("Cannot delete user with active orders", None, None)
# Архивирование данных
archive_user_data(target.id)
@event.listens_for(User, 'after_delete')
def receive_after_delete(mapper, connection, target):
"""Вызывается после DELETE."""
# Очистка кэша
clear_user_cache(target.id)
# Логирование
print(f"User {target.id} deleted")@event.listens_for(User, 'init')
def receive_init(target, args, kwargs):
"""
Вызывается при инициализации объекта.
target — новый экземпляр, args/kwargs — аргументы конструктора.
"""
# Установка значений по умолчанию
target.preferences = {'theme': 'light', 'language': 'en'}
# Логирование создания
print(f"New User instance created with args: {args}")@event.listens_for(User, 'load')
def receive_load(target, context):
"""
Вызывается когда объект загружается из БД.
context — QueryContext или None для get()
"""
# Инициализация кэша на объекте
target._cache = {}
# Логирование доступа
print(f"User {target.id} loaded from database")from sqlalchemy import event
@event.listens_for(Session, 'before_commit')
def before_commit(session):
"""
Вызывается перед commit любой транзакции.
Полезно для финальной валидации.
"""
# Проверка что все обязательные поля заполнены
for obj in session.new:
if hasattr(obj, 'email') and not obj.email:
from sqlalchemy.exc import IntegrityError
raise IntegrityError("Email is required", None, None)
# Логирование транзакции
print(f"Committing transaction with {len(session.new)} new objects")
@event.listens_for(Session, 'after_commit')
def after_commit(session):
"""
Вызывается после успешного commit.
"""
# Очистка кэша после изменений
clear_all_caches()
# Отправка событий в очередь
publish_events(session.info.get('pending_events', []))@event.listens_for(Session, 'before_flush')
def before_flush(session, flush_context, instances):
"""
Вызывается перед flush().
Можно модифицировать объекты перед записью.
"""
# Аудит изменений
changes = []
for obj in session.new:
changes.append({
'action': 'create',
'type': type(obj).__name__,
'id': None, # Ещё нет id
})
for obj in session.deleted:
changes.append({
'action': 'delete',
'type': type(obj).__name__,
'id': obj.id,
})
for obj in session.dirty:
changes.append({
'action': 'update',
'type': type(obj).__name__,
'id': obj.id,
'changes': get_changed_fields(obj),
})
session.info['pending_audit'] = changes
@event.listens_for(Session, 'after_flush')
def after_flush(session, flush_context):
"""
Вызывается после flush().
Объекты имеют id (для новых).
"""
# Теперь можно получить id новых объектов
audit_log = session.info.get('pending_audit', [])
for change in audit_log:
if change['action'] == 'create' and change['id'] is None:
# id теперь доступен
pass@event.listens_for(Session, 'after_rollback')
def after_rollback(session):
"""
Вызывается после rollback.
Полезно для очистки ресурсов.
"""
# Очистка временных данных
session.info.clear()
# Логирование
print("Transaction rolled back")from sqlalchemy import event
import time
@event.listens_for(Engine, 'before_cursor_execute')
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
"""Перед выполнением SQL."""
# Сохранение времени начала
context._query_start_time = time.time()
# Логирование медленных запросов
conn.info.setdefault('query_count', [0])
conn.info['query_count'][0] += 1
@event.listens_for(Engine, 'after_cursor_execute')
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
"""После выполнения SQL."""
# Логирование медленных запросов
total = time.time() - context._query_start_time
if total > 1.0: # Больше 1 секунды
print(f"Slow query ({total:.3f}s): {statement}")from sqlalchemy import event
from sqlalchemy.exc import DBAPIError
@event.listens_for(Engine, 'handle_error')
def handle_error(exception_context):
"""
Вызывается при ошибке БД.
exception_context.origin — исходное исключение
"""
# Логирование ошибок
error = exception_context.original_exception
print(f"Database error: {error}")
# Метрики
increment_error_counter(str(type(error)))from sqlalchemy import event, inspect
from datetime import datetime
import json
class AuditLog(Base):
__tablename__ = 'audit_logs'
id: Mapped[int] = mapped_column(primary_key=True)
table_name: Mapped[str] = mapped_column(String(100))
record_id: Mapped[int] = mapped_column(Integer)
action: Mapped[str] = mapped_column(String(10)) # INSERT, UPDATE, DELETE
old_values: Mapped[dict | None] = mapped_column(JSON)
new_values: Mapped[dict | None] = mapped_column(JSON)
user_id: Mapped[int | None] = mapped_column(Integer)
timestamp: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
def get_changed_fields(obj) -> dict:
"""Получить изменённые поля объекта."""
history = get_history(obj)
changes = {}
for attr in inspect(obj).mapper.column_attrs:
key = attr.key
hist = getattr(history, key)
if hist.has_changes():
old_value = hist.deleted[0] if hist.deleted else None
new_value = hist.added[0] if hist.added else None
changes[key] = {'old': old_value, 'new': new_value}
return changes
@event.listens_for(Session, 'before_flush')
def audit_before_flush(session, flush_context, instances):
"""Сбор информации об изменениях перед flush."""
audit_entries = []
# Новые объекты
for obj in session.new:
audit_entries.append({
'table_name': obj.__tablename__,
'record_id': None, # Будет после flush
'action': 'INSERT',
'old_values': None,
'new_values': {c.key: getattr(obj, c.key) for c in inspect(obj).mapper.column_attrs},
'user_id': get_current_user_id(),
})
# Обновлённые объекты
for obj in session.dirty:
changes = get_changed_fields(obj)
if changes: # Только если есть реальные изменения
audit_entries.append({
'table_name': obj.__tablename__,
'record_id': obj.id,
'action': 'UPDATE',
'old_values': {k: v['old'] for k, v in changes.items()},
'new_values': {k: v['new'] for k, v in changes.items()},
'user_id': get_current_user_id(),
})
# Удалённые объекты
for obj in session.deleted:
audit_entries.append({
'table_name': obj.__tablename__,
'record_id': obj.id,
'action': 'DELETE',
'old_values': {c.key: getattr(obj, c.key) for c in inspect(obj).mapper.column_attrs},
'new_values': None,
'user_id': get_current_user_id(),
})
session.info['audit_entries'] = audit_entries
@event.listens_for(Session, 'after_flush')
def audit_after_flush(session, flush_context):
"""Сохранение аудита после flush когда id новых объектов известны."""
audit_entries = session.info.get('audit_entries', [])
if not audit_entries:
return
# Обновить id для новых записей
for entry in audit_entries:
if entry['action'] == 'INSERT' and entry['record_id'] is None:
# Найти объект и получить его id
for obj in session.new:
if obj.__tablename__ == entry['table_name']:
entry['record_id'] = obj.id
break
# Сохранить аудит
for entry_data in audit_entries:
audit_log = AuditLog(**entry_data)
session.add(audit_log)from sqlalchemy import event
from datetime import datetime
class SoftDeleteMixin:
"""Mixin для soft delete."""
deleted_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
deleted_by: Mapped[int | None] = mapped_column(Integer, nullable=True)
@property
def is_deleted(self) -> bool:
return self.deleted_at is not None
class Article(Base, SoftDeleteMixin):
__tablename__ = 'articles'
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
content: Mapped[str] = mapped_column(Text)
@event.listens_for(Article, 'before_delete')
def soft_delete_article(mapper, connection, target):
"""
Перехватывает delete и превращает в soft delete.
"""
# Отменяем реальное удаление
from sqlalchemy import exc
raise exc.StopValidation("Soft delete in effect")
@event.listens_for(Session, 'before_flush')
def apply_soft_delete(session, flush_context, instances):
"""
Применяет soft delete вместо hard delete.
"""
for obj in session.deleted:
if isinstance(obj, SoftDeleteMixin):
# Отменяем удаление
session.expunge(obj)
# Устанавливаем soft delete флаги
obj.deleted_at = datetime.utcnow()
obj.deleted_by = get_current_user_id()
# Добавляем обратно в сессию для update
session.add(obj)from sqlalchemy import event
from functools import wraps
import redis
redis_client = redis.Redis()
def cache_key(model_name: str, obj_id: int) -> str:
return f"{model_name}:{obj_id}"
@event.listens_for(Session, 'after_commit')
def invalidate_cache_after_commit(session):
"""Инвалидация кэша после изменений."""
keys_to_invalidate = []
# Новые объекты
for obj in session.new:
if hasattr(obj, 'id'):
keys_to_invalidate.append(cache_key(obj.__tablename__, obj.id))
# Обновлённые объекты
for obj in session.dirty:
if hasattr(obj, 'id'):
keys_to_invalidate.append(cache_key(obj.__tablename__, obj.id))
# Удалённые объекты (нужно сохранить id до удаления)
for obj in session.deleted:
if hasattr(obj, 'id'):
keys_to_invalidate.append(cache_key(obj.__tablename__, obj.id))
# Инвалидация
if keys_to_invalidate:
redis_client.delete(*keys_to_invalidate)# Используйте события для кросс-катting concerns (аудит, timestamp)
@event.listens_for(User, 'before_insert')
def set_timestamps(mapper, connection, target):
target.created_at = datetime.utcnow()
# Храните логику простой валидации в @validates
# Сложную валидацию — в before_flush
# Используйте session.info для передачи данных между событиями
session.info['current_user'] = user
# Обрабатывайте ошибки в handle_error
@event.listens_for(Engine, 'handle_error')
def log_error(context):
logger.error(f"DB error: {context.original_exception}")# Не используйте события для бизнес-логики
# Бизнес-логика должна быть явной в сервисах
# Не делайте тяжёлые операции в событиях
@event.listens_for(User, 'after_insert')
def send_email(mapper, connection, target):
send_welcome_email(target.email) # ПЛОХО: блокирует транзакцию
# ХОРОШО: отправить в очередь задач
# Не создавайте циклические зависимости в событиях
# Событие → изменение объекта → flush → событие → ...
# Не полагайтесь на события для критичной валидации
# Дублируйте валидацию на уровне БД (constraints)В следующей теме вы изучите Custom Types — TypeDecorator, кастомные типы данных, JSON с валидацией, шифрование и специализированные типы.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.