Transaction management, isolation levels, session patterns
Session — это центральный паттерн SQLAlchemy, реализующий Unit of Work. Глубокое понимание сессий критически важно для правильной работы с ORM.
Unit of Work — это паттерн который:
with Session(engine) as session:
# Создание
user = User(email='test@example.com', name='Test')
session.add(user)
# Обновление
user.name = 'Updated' # Отслеживается
# Удаление
session.delete(some_old_user)
# При commit все изменения синхронизируются
session.commit()
# 1. INSERT для новых объектов
# 2. UPDATE для изменённых
# 3. DELETE для удалённых# Автоматическое определение порядка
user = User(email='test@example.com')
post = Post(title='Test', author=user) # Foreign key на user
session.add(post) # SQLAlchemy знает что нужно сначала INSERT user
session.commit()
# 1. INSERT INTO users ... RETURNING id
# 2. INSERT INTO posts (user_id=1) ...from sqlalchemy import inspect
user = User(email='test@example.com')
inspector = inspect(user)
# transient — объект не в сессии, нет id
print(inspector.transient) # True
session.add(user)
# pending — объект в сессии, ещё не закоммичен
print(inspector.pending) # True
session.commit()
# persistent — объект в БД и в сессии
print(inspector.persistent) # True
session.expunge(user)
# detached — объект был в БД, но сессия закрыта
print(inspector.detached) # Truetransient → add() → pending → commit() → persistent
↓
expunge()/close() → detached
detached → add() → persistent (reattach)
from sqlalchemy.orm import sessionmaker, Session
# Фабрика сессий
SessionLocal = sessionmaker(
bind=engine,
autoflush=False, # Не делать flush перед query
autocommit=False, # Не коммитить автоматически
expire_on_commit=True, # Истекать атрибуты после commit
)
# Использование
session = SessionLocal()
try:
# работа
session.commit()
except:
session.rollback()
raise
finally:
session.close()with Session(engine) as session:
try:
user = User(email='test@example.com')
session.add(user)
session.commit()
except Exception:
session.rollback()
raise
# session.close() автоматическиfrom sqlalchemy.orm import scoped_session
# Один session на поток
Session = scoped_session(sessionmaker(bind=engine))
# В любом месте кода
session = Session()
user = session.query(User).first()
# В конце запроса (например, в middleware)
Session.remove() # Закрыть сессию# Явная транзакция
with Session(engine) as session:
with session.begin():
user = User(email='test@example.com')
session.add(user)
# commit автоматически при выходе
# rollback при исключенииwith Session(engine) as session:
with session.begin():
# Внешняя транзакция
user = User(email='test@example.com')
session.add(user)
try:
with session.begin_nested():
# Внутренняя транзакция (savepoint)
post = Post(title='Test', user_id=user.id)
session.add(post)
# rollback только этой части при ошибке
except Exception:
pass # Внешняя транзакция продолжается
# commit внешней транзакции# На уровне engine
engine = create_engine(
'postgresql://...',
isolation_level='READ COMMITTED', # По умолчанию в PostgreSQL
# isolation_level='REPEATABLE READ',
# isolation_level='SERIALIZABLE',
)
# На уровне сессии
with session.begin(isolation_level='SERIALIZABLE'):
# Строгая изоляция для критичных операций
pass| Уровень | Dirty Read | Non-repeatable Read | Phantom Read |
|---|---|---|---|
| READ UNCOMMITTED | Возможно | Возможно | Возможно |
| READ COMMITTED | Нет | Возможно | Возможно |
| REPEATABLE READ | Нет | Нет | Возможно |
| SERIALIZABLE | Нет | Нет | Нет |
with Session(engine) as session:
user = User(email='test@example.com')
session.add(user)
# Flush отправляет изменения в БД но не коммитит
session.flush()
# id теперь доступен (PostgreSQL RETURNING)
print(user.id) # 1
# Можно использовать id для других объектов
post = Post(title='Test', user_id=user.id)
session.add(post)
session.commit() # Фиксирует транзакцию# Когда нужен id перед commit
user = User(email='test@example.com')
session.add(user)
session.flush() # Получить id
# Создать связанные объекты
for i in range(10):
post = Post(title=f'Post {i}', user_id=user.id)
session.add(post)
session.commit()# По умолчанию expire_on_commit=True
SessionLocal = sessionmaker(engine, expire_on_commit=True)
with SessionLocal() as session:
user = session.get(User, 1)
print(user.name) # 'Alice'
session.commit()
# Атрибуты 'истекли' после commit
print(user.name) # Загружается из БД при доступе# Принудительно истереть атрибуты
session.expire(user)
# Доступ вызовет загрузку из БД
print(user.name) # SELECT из БД
# Истереть конкретные атрибуты
session.expire(user, ['name', 'email'])# Перезагрузить объект из БД
session.refresh(user)
# Все атрибуты загружаются немедленно
print(user.name) # Уже загружено
# Refresh с конкретными колонками
session.refresh(user, attribute_names=['name', 'email'])# Detached объект
user = User(id=1, email='new@example.com')
# Merge возвращает persistent копию
persistent_user = session.merge(user)
# Изменения будут закоммичены
session.commit()# Удалить объект из сессии (сделать detached)
user = session.get(User, 1)
session.expunge(user)
# Объект больше не отслеживается
user.name = 'Updated' # Не сохранится в БД# Закрыть сессию (все объекты становятся detached)
session.close()
# Но session можно использовать снова
session = SessionLocal() # Resetclass User(Base):
__tablename__ = 'users'
id: Mapped[int] = mapped_column(primary_key=True)
posts: Mapped[list["Post"]] = relationship(
back_populates="author",
cascade="all, delete-orphan", # Каскад
)| Каскад | Описание |
|---|---|
save-update | add() каскадируется на связанные объекты |
merge | merge() каскадируется |
refresh-expire | refresh/expire каскадируются |
expunge | expunge() каскадируется |
delete | delete() каскадируется |
all | Все вышеперечисленные |
delete-orphan | Удаление осиротевших объектов |
user = session.get(User, 1)
# Удалить все посты из коллекции
user.posts = []
session.commit() # Посты удалены из БД (orphan)
# Или удалить пользователя
session.delete(user)
session.commit() # Посты тоже удалены (cascade delete)from sqlalchemy import event
@event.listens_for(Session, 'before_commit')
def before_commit(session):
"""Перед commit."""
print("Before commit")
@event.listens_for(Session, 'after_commit')
def after_commit(session):
"""После commit."""
print("After commit")
@event.listens_for(Session, 'after_rollback')
def after_rollback(session):
"""После rollback."""
print("After rollback")
@event.listens_for(Session, 'transient_to_pending')
def on_transient_to_pending(target, context):
"""Объект добавлен в сессию."""
print(f"Added: {target}")
@event.listens_for(Session, 'pending_to_persistent')
def on_pending_to_persistent(target, context):
"""Объект сохранён в БД."""
print(f"Saved: {target}")# Используйте context manager
with Session(engine) as session:
session.commit()
# Обрабатывайте ошибки
try:
with session.begin():
# работа
pass
except Exception:
await session.rollback()
raise
# Используйте expire_on_commit=False для async
async_session_maker = async_sessionmaker(
engine,
expire_on_commit=False,
)
# Закрывайте сессии
session.close()
# Используйте flush() когда нужен id перед commit
session.add(obj)
session.flush() # Получить id
session.commit()# Не используйте сессию без context manager
session = Session(engine) # ПЛОХО: может не закрыться
# Не забывайте rollback при ошибке
try:
session.commit() # ПЛОХО: нет rollback
except:
pass
# Не используйте detached объекты
session.close()
user.name = 'Updated' # ПЛОХО: не сохранится
session.add(user) # Нужно для reattach
# Не создавайте сессию на каждый запрос
def get_user(user_id):
session = Session(engine) # ПЛОХО: нет пула
return session.get(User, user_id)В следующей теме вы изучите Advanced Alembic — онлайн-миграции без downtime, кастомные операции, тестирование миграций и production deployment стратегии.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.