Ветвление, data migrations, кастомные операции
В этой теме вы изучите продвинутые возможности Alembic: ветвление миграций, data migrations, кастомные операции и стратегии онлайн-миграций без downtime.
Когда два разработчика создают миграции одновременно, возникает ветвление:
rev1 → rev2 → rev3 (feature-a)
↘ rev4 (feature-b) # CONFLICT!
# Показать все heads (может быть несколько при ветвлении)
alembic heads
# Показать историю с ветвлением
alembic history --treeВывод при ветвлении:
rev1 → rev2 (head)
↘ rev3 (head) # Multiple heads detected!
# Alembic автоматически предложит создать merge
alembic revision -m "merge feature-a and feature-b"
# Или явно указать слияние
alembic revision --head=rev2 --head=rev3 -m "merge""""merge feature-a and feature-b
Revision ID: merge123
Revises: rev2, rev3
Create Date: 2024-01-15 10:00:00
"""
from alembic import op
revision = 'merge123'
down_revision = ('rev2', 'rev3') # Кортеж предыдущих ревизий
def upgrade():
pass # Merge не содержит изменений
def downgrade():
passalembic upgrade head перед созданием миграцииМиграции данных изменяют данные, а не схему.
"""Add full_name column and migrate data
Revision ID: data123
Revises: prev123
Create Date: 2024-01-15 10:00:00
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column
revision = 'data123'
down_revision = 'prev123'
def upgrade():
# 1. Добавить колонку
op.add_column('users', sa.Column('full_name', sa.String(255), nullable=True))
# 2. Заполнить данные
conn = op.get_bind()
conn.execute(
sa.text("""
UPDATE users
SET full_name = CONCAT(first_name, ' ', last_name)
WHERE full_name IS NULL
""")
)
# 3. Сделать колонку NOT NULL (опционально)
op.alter_column('users', 'full_name', nullable=False)
def downgrade():
op.drop_column('users', 'full_name')⚠️ Предупреждение: Используйте ORM в миграциях осторожно. Модели меняются, а миграции должны работать вечно.
"""Migrate user roles
Revision ID: role123
Revises: prev123
"""
from alembic import op
import sqlalchemy as sa
revision = 'role123'
down_revision = 'prev123'
def upgrade():
# Создаём временную таблицу для работы с ORM
connection = op.get_bind()
# Используем SQLAlchemy Core для безопасности
users = sa.sql.table('users',
sa.sql.column('id', sa.Integer),
sa.sql.column('email', sa.String),
sa.sql.column('role', sa.String),
)
# Массовое обновление
connection.execute(
users.update()
.where(users.c.email.like('%@admin.com'))
.values(role='admin')
)
connection.execute(
users.update()
.where(users.c.email.notlike('%@admin.com'))
.values(role='user')
)
def downgrade():
connection = op.get_bind()
users = sa.sql.table('users',
sa.sql.column('id', sa.Integer),
sa.sql.column('role', sa.String),
)
connection.execute(
users.update().values(role=None)
)"""Migrate large table in chunks
Revision ID: chunk123
Revises: prev123
"""
from alembic import op
import sqlalchemy as sa
revision = 'chunk123'
down_revision = 'prev123'
BATCH_SIZE = 10000
def upgrade():
connection = op.get_bind()
# Получаем количество записей
result = connection.execute(
sa.text("SELECT COUNT(*) FROM users WHERE processed = false")
)
total = result.scalar()
# Обрабатываем чанками
offset = 0
while offset < total:
connection.execute(
sa.text("""
UPDATE users
SET processed = true,
updated_at = NOW()
WHERE id IN (
SELECT id FROM users
WHERE processed = false
ORDER BY id
LIMIT :limit OFFSET :offset
)
"""),
{"limit": BATCH_SIZE, "offset": offset}
)
offset += BATCH_SIZE
print(f"Processed {offset} records")
def downgrade():
connection = op.get_bind()
connection.execute(
sa.text("UPDATE users SET processed = false")
)# alembic/custom_ops.py
from alembic.operations import Operations, MigrateOperation
@Operations.register_operation("create_enum", "invoke_create_enum")
class CreateEnumOp(MigrateOperation):
def __init__(self, name, values):
self.name = name
self.values = values
@classmethod
def invoke_create_enum(cls, operations, name, values):
op = CreateEnumOp(name, values)
return operations.invoke(op)
@Operations.implementation_for(CreateEnumOp)
def create_enum(operations, operation):
# PostgreSQL specific
values_str = ', '.join(f"'{v}'" for v in operation.values)
operations.execute(
f"CREATE TYPE {operation.name} AS ENUM ({values_str})"
)
@Operations.register_operation("drop_enum", "invoke_drop_enum")
class DropEnumOp(MigrateOperation):
def __init__(self, name):
self.name = name
@classmethod
def invoke_drop_enum(cls, operations, name):
op = DropEnumOp(name)
return operations.invoke(op)
@Operations.implementation_for(DropEnumOp)
def drop_enum(operations, operation):
operations.execute(f"DROP TYPE {operation.name}")"""Add status enum
Revision ID: enum123
Revises: prev123
"""
from alembic import op
# Импортируем кастомные операции
import alembic_custom_ops # noqa
revision = 'enum123'
down_revision = 'prev123'
def upgrade():
# Используем кастомную операцию
op.create_enum('user_status', ['active', 'inactive', 'banned'])
op.create_table(
'users',
sa.Column('id', sa.Integer(), primary_key=True),
sa.Column('status', sa.Enum('active', 'inactive', 'banned', name='user_status')),
)
def downgrade():
op.drop_table('users')
op.drop_enum('user_status')Для production без downtime используйте двухфазные миграции:
# Фаза 1: Expand (добавляем новое, не удаляя старое)
"""Add new_column (expand phase)
Revision ID: expand123
Revises: prev123
"""
def upgrade():
# 1. Добавить новую колонку как nullable
op.add_column('users', sa.Column('new_email', sa.String(255), nullable=True))
# 2. Заполнить данные (фоновый процесс)
# Можно вынести в отдельную миграцию
# 3. Создать триггер для синхронизации (опционально)
op.execute("""
CREATE TRIGGER sync_emails
BEFORE INSERT OR UPDATE ON users
FOR EACH ROW EXECUTE FUNCTION sync_email_columns()
""")
def downgrade():
op.drop_column('users', 'new_email')
# Фаза 2: Contract (удаляем старое)
"""Remove old_column (contract phase)
Revision ID: contract123
Revises: expand123
"""
def upgrade():
# 1. Удалить триггер
op.execute("DROP TRIGGER IF EXISTS sync_emails ON users")
# 2. Удалить старую колонку
op.drop_column('users', 'old_email')
# 3. Переименовать новую (если нужно)
op.alter_column('users', 'new_email', new_column_name='email')
def downgrade():
# Восстановить старую структуру
op.add_column('users', sa.Column('old_email', sa.String(255)))
op.alter_column('users', 'email', new_column_name='new_email')"""Add not null column safely
Revision ID: notnull123
Revises: prev123
"""
def upgrade():
# 1. Добавить как nullable с default
op.add_column('users', sa.Column('age', sa.Integer(), nullable=True, server_default='0'))
# 2. Заполнить существующие записи
op.execute("UPDATE users SET age = 18 WHERE age IS NULL")
# 3. Убрать default
op.alter_column('users', 'age', server_default=None)
# 4. Сделать NOT NULL
op.alter_column('users', 'age', nullable=False)
def downgrade():
op.drop_column('users', 'age')"""Rename column safely
Revision ID: rename123
Revises: prev123
"""
def upgrade():
# 1. Добавить новую колонку
op.add_column('users', sa.Column('full_name', sa.String(255), nullable=True))
# 2. Скопировать данные
op.execute("UPDATE users SET full_name = CONCAT(first_name, ' ', last_name)")
# 3. Создать триггер для синхронизации (на время перехода)
op.execute("""
CREATE TRIGGER sync_names
BEFORE INSERT OR UPDATE ON users
FOR EACH ROW EXECUTE FUNCTION sync_name_columns()
""")
# 4. В коде приложения: писать в обе колонки
# 5. (Отдельная миграция) Удалить старую колонку
# op.execute("DROP TRIGGER sync_names ON users")
# op.drop_column('users', 'first_name')
# op.drop_column('users', 'last_name')
def downgrade():
op.drop_column('users', 'full_name')# 1. Создать свежую БД
createdb test_migration
# 2. Применить все миграции
alembic upgrade head
# 3. Проверить структуру
psql test_migration -c "\d users"
# 4. Откатить и применить снова
alembic downgrade -1
alembic upgrade +1
# 5. Откатить всё и применить
alembic downgrade base
alembic upgrade head# tests/test_migrations.py
import pytest
from alembic import command
from alembic.config import Config
from sqlalchemy import inspect
@pytest.fixture
def alembic_config(db_engine):
config = Config("alembic.ini")
config.set_main_option("sqlalchemy.url", str(db_engine.url))
return config
@pytest.fixture
def alembic_upgraded(alembic_config):
"""Применить все миграции перед тестом."""
command.upgrade(alembic_config, "head")
yield alembic_config
# Откатить после теста
command.downgrade(alembic_config, "base")
def test_migration_upgrade(alembic_upgraded):
"""Тест что миграции применяются без ошибок."""
inspector = inspect(alembic_upgraded.get_section('sqlalchemy.url'))
tables = inspector.get_table_names()
assert 'users' in tables
assert 'posts' in tables
def test_migration_downgrade(alembic_config, db_engine):
"""Тест что downgrade работает."""
# Применить
command.upgrade(alembic_config, "head")
# Откатить на одну
command.downgrade(alembic_config, "-1")
# Проверить что таблица удалена
inspector = inspect(db_engine)
tables = inspector.get_table_names()
assert 'last_migration_table' not in tables# Разделяйте schema и data migrations
def upgrade():
# Schema change
op.add_column('users', sa.Column('age', sa.Integer()))
# Data migration в отдельной ревизии или чанками
def upgrade():
# Process in batches
for offset in range(0, total, BATCH_SIZE):
connection.execute(...)
# Используйте expand/contract для production
# Фаза 1: добавить новое
# Фаза 2: удалить старое (отдельная миграция)
# Тестируйте downgrade
alembic upgrade head
alembic downgrade -1
alembic upgrade +1# Не удаляйте данные без возможности восстановления
def upgrade():
op.execute("DELETE FROM users WHERE inactive = true") # ПЛОХО!
# Не блокируйте таблицу надолго в production
def upgrade():
# ПЛОХО: блокирует таблицу на время обновления
op.execute("UPDATE huge_table SET column = value")
# ХОРОШО: чанками
for offset in range(0, total, BATCH_SIZE):
op.execute(f"UPDATE ... LIMIT {BATCH_SIZE} OFFSET {offset}")
# Не используйте ORM модели напрямую в миграциях
from myapp.models import User # ПЛОХО: модель может измениться
# Используйте Core API
users = sa.sql.table('users', ...)В следующей теме вы изучите Async SQLAlchemy — асинхронную работу с базой данных через asyncpg и aiomysql, паттерны async-кода и transaction management.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.