Онлайн-миграции без downtime, кастомные операции, тестирование миграций
Продвинутые техники Alembic для production: онлайн-миграции без downtime, кастомные операции, тестирование миграций и deployment стратегии.
Обычные миграции могут вызвать:
Разделите миграцию на две фазы:
┌─────────────────────────────────────────────────────────┐
│ Фаза 1: EXPAND (добавляем новое) │
│ - Код пишет в старое и новое │
│ - Миграция применяется без downtime │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Фаза 2: CONTRACT (удаляем старое) │
│ - Код переключился на новое │
│ - Удаляем старое после обновления кода │
└─────────────────────────────────────────────────────────┘
# Миграция 1: Expand (revision: expand_001)
"""Add new column name (expand phase)
Revision ID: expand_001
Revises: prev_001
"""
from alembic import op
import sqlalchemy as sa
revision = 'expand_001'
down_revision = 'prev_001'
def upgrade():
# 1. Добавить новую колонку
op.add_column('users', sa.Column('full_name', sa.String(255), nullable=True))
# 2. Скопировать данные
op.execute("""
UPDATE users
SET full_name = CONCAT(first_name, ' ', last_name)
WHERE full_name IS NULL
""")
# 3. Создать триггер для синхронизации
op.execute("""
CREATE OR REPLACE FUNCTION sync_user_names()
RETURNS TRIGGER AS $$
BEGIN
NEW.full_name := CONCAT(NEW.first_name, ' ', NEW.last_name);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER sync_names_trigger
BEFORE INSERT OR UPDATE ON users
FOR EACH ROW EXECUTE FUNCTION sync_user_names();
""")
def downgrade():
op.execute("DROP TRIGGER IF EXISTS sync_names_trigger ON users")
op.execute("DROP FUNCTION IF EXISTS sync_user_names()")
op.drop_column('users', 'full_name')
# Миграция 2: Contract (revision: contract_001)
"""Remove old columns (contract phase)
Revision ID: contract_001
Revises: expand_001
"""
revision = 'contract_001'
down_revision = 'expand_001'
def upgrade():
# 1. Удалить триггер
op.execute("DROP TRIGGER IF EXISTS sync_names_trigger ON users")
op.execute("DROP FUNCTION IF EXISTS sync_user_names()")
# 2. Удалить старые колонки
op.drop_column('users', 'first_name')
op.drop_column('users', 'last_name')
# 3. Сделать новую колонку NOT NULL
op.alter_column('users', 'full_name', nullable=False)
def downgrade():
# Восстановить старые колонки
op.add_column('users', sa.Column('first_name', sa.String(100), nullable=True))
op.add_column('users', sa.Column('last_name', sa.String(100), nullable=True))
# Заполнить данными
op.execute("""
UPDATE users
SET first_name = SPLIT_PART(full_name, ' ', 1),
last_name = SPLIT_PART(full_name, ' ', 2)
""")
op.alter_column('users', 'full_name', new_column_name='temp_name')
op.drop_column('users', 'first_name')
op.drop_column('users', 'last_name')"""Add not null column safely
Revision ID: safe_notnull_001
Revises: prev_001
"""
from alembic import op
import sqlalchemy as sa
revision = 'safe_notnull_001'
down_revision = 'prev_001'
BATCH_SIZE = 10000
def upgrade():
# 1. Добавить как nullable с default
op.add_column('users', sa.Column(
'age', sa.Integer(), nullable=True, server_default='0'
))
# 2. Заполнить существующие записи чанками
connection = op.get_bind()
while True:
result = connection.execute(sa.text(f"""
UPDATE users
SET age = 18
WHERE age = 0
AND id IN (
SELECT id FROM users
WHERE age = 0
ORDER BY id
LIMIT {BATCH_SIZE}
)
"""))
if result.rowcount < BATCH_SIZE:
break
# 3. Убрать default
op.alter_column('users', 'age', server_default=None)
# 4. Сделать NOT NULL
op.alter_column('users', 'age', nullable=False)
def downgrade():
op.drop_column('users', 'age')# alembic/custom_ops.py
from alembic.operations import Operations, MigrateOperation
from typing import List
@Operations.register_operation("create_enum", "invoke_create_enum")
class CreateEnumOp(MigrateOperation):
"""Операция для создания ENUM типа в PostgreSQL."""
def __init__(self, name: str, values: List[str]):
self.name = name
self.values = values
@classmethod
def invoke_create_enum(cls, operations: Operations, name: str, values: List[str]):
op = CreateEnumOp(name, values)
return operations.invoke(op)
@Operations.implementation_for(CreateEnumOp)
def create_enum_impl(operations: Operations, operation: CreateEnumOp):
"""Реализация создания ENUM."""
values_str = ', '.join(f"'{v}'" for v in operation.values)
operations.execute(
f"CREATE TYPE {operation.name} AS ENUM ({values_str})"
)
@Operations.register_operation("drop_enum", "invoke_drop_enum")
class DropEnumOp(MigrateOperation):
"""Операция для удаления ENUM типа."""
def __init__(self, name: str):
self.name = name
@classmethod
def invoke_drop_enum(cls, operations: Operations, name: str):
op = DropEnumOp(name)
return operations.invoke(op)
@Operations.implementation_for(DropEnumOp)
def drop_enum_impl(operations: Operations, operation: DropEnumOp):
"""Реализация удаления ENUM."""
operations.execute(f"DROP TYPE IF EXISTS {operation.name}")
@Operations.register_operation("create_index_concurrently", "invoke_create_index_concurrently")
class CreateIndexConcurrentlyOp(MigrateOperation):
"""Создание индекса CONCURRENTLY (без блокировки таблицы)."""
def __init__(self, index_name: str, table_name: str, columns: List[str]):
self.index_name = index_name
self.table_name = table_name
self.columns = columns
@classmethod
def invoke_create_index_concurrently(
cls, operations: Operations,
index_name: str, table_name: str, columns: List[str]
):
op = CreateIndexConcurrentlyOp(index_name, table_name, columns)
return operations.invoke(op)
@Operations.implementation_for(CreateIndexConcurrentlyOp)
def create_index_concurrently_impl(
operations: Operations,
operation: CreateIndexConcurrentlyOp
):
"""Реализация CONCURRENTLY индекса."""
columns_str = ', '.join(operation.columns)
operations.execute(
f"CREATE INDEX CONCURRENTLY {operation.index_name} "
f"ON {operation.table_name} ({columns_str})"
)"""Add status enum and index
Revision ID: custom_ops_001
Revises: prev_001
"""
from alembic import op
import sqlalchemy as sa
# Импортируем кастомные операции (регистрация происходит при импорте)
import alembic.custom_ops # noqa
revision = 'custom_ops_001'
down_revision = 'prev_001'
def upgrade():
# Кастомная операция для ENUM
op.create_enum('user_status', ['active', 'inactive', 'banned', 'pending'])
op.create_table(
'users',
sa.Column('id', sa.Integer(), primary_key=True),
sa.Column('email', sa.String(255), nullable=False),
sa.Column('status', sa.Enum('active', 'inactive', 'banned', 'pending', name='user_status')),
)
# Кастомная операция для CONCURRENTLY индекса
op.create_index_concurrently('ix_users_email', 'users', ['email'])
def downgrade():
op.drop_index('ix_users_email', table_name='users')
op.drop_table('users')
op.drop_enum('user_status')# 1. Создать свежую тестовую БД
createdb test_migrations
# 2. Применить все миграции
alembic upgrade head
# 3. Проверить структуру
psql test_migrations -c "\d users"
# 4. Откатить и применить снова
alembic downgrade -1
alembic upgrade +1
# 5. Полный цикл
alembic downgrade base
alembic upgrade head# tests/test_migrations.py
import pytest
from alembic import command
from alembic.config import Config
from sqlalchemy import inspect, text
@pytest.fixture
def alembic_config(db_engine):
"""Конфигурация Alembic для тестовой БД."""
config = Config("alembic.ini")
config.set_main_option("sqlalchemy.url", str(db_engine.url))
return config
@pytest.fixture
def alembic_upgraded(alembic_config):
"""Применить все миграции перед тестом."""
command.upgrade(alembic_config, "head")
yield alembic_config
# Откатить после теста
command.downgrade(alembic_config, "base")
def test_all_migrations_apply(alembic_upgraded):
"""Тест что все миграции применяются без ошибок."""
inspector = inspect(alembic_upgraded.get_section('sqlalchemy.url'))
tables = inspector.get_table_names()
assert 'users' in tables
assert 'posts' in tables
def test_migration_downgrade(alembic_config, db_engine):
"""Тест что downgrade работает."""
# Применить все
command.upgrade(alembic_config, "head")
# Откатить на одну
command.downgrade(alembic_config, "-1")
# Проверить что таблица удалена
inspector = inspect(db_engine)
tables = inspector.get_table_names()
assert 'last_migration_table' not in tables
def test_data_migration(alembic_upgraded, db_engine):
"""Тест data migration."""
with db_engine.connect() as conn:
# Вставить тестовые данные
conn.execute(text("""
INSERT INTO users (email, name)
VALUES ('test@example.com', 'Test User')
"""))
conn.commit()
# Применить миграцию
command.upgrade(alembic_config, "next_revision")
# Проверить результат
result = conn.execute(text("""
SELECT full_name FROM users WHERE email = 'test@example.com'
"""))
full_name = result.scalar()
assert full_name == 'Test User'# .gitlab-ci.yml
test_migrations:
stage: test
services:
- postgres:15
variables:
DATABASE_URL: postgresql://postgres:postgres@postgres/test_db
script:
# Установить зависимости
- pip install -r requirements.txt
# Запустить тесты миграций
- pytest tests/test_migrations.py -v
# Проверить что все миграции применяются
- alembic upgrade head
# Проверить downgrade
- alembic downgrade base
- alembic upgrade head┌─────────────────────────────────────────────────────────┐
│ Blue (active) │ Green (standby) │
│ - Код версии 1.0 │ - Код версии 2.0 │
│ - БД версия N │ - БД версия N+1 │
└─────────────────────────────────────────────────────────┘
↓ Переключение
┌─────────────────────────────────────────────────────────┐
│ Blue (standby) │ Green (active) │
│ - Код версии 1.0 │ - Код версии 2.0 │
│ - БД версия N │ - БД версия N+1 │
└─────────────────────────────────────────────────────────┘
# 1. Применить миграции (expand phase)
alembic upgrade head
# 2. Обновлять поды по одному
for pod in pods:
kubectl rollout restart deployment/app
kubectl rollout status deployment/app
# 3. Применить contract phase (отдельная миграция)
alembic upgrade +1## Pre-deployment
- [ ] Миграции протестированы на staging
- [ ] Downgrade протестирован
- [ ] Резервная копия БД создана
- [ ] Миграции разбиты на expand/contract если нужно
- [ ] Индексы создаются CONCURRENTLY
## Во время deployment
- [ ] Метрики БД мониторятся (CPU, connections, locks)
- [ ] Есть план отката
- [ ] Команда на связи
## Post-deployment
- [ ] Миграции применены успешно
- [ ] Приложение работает корректно
- [ ] Метрики в норме
- [ ] Резервная копия после миграции# scripts/check_migration_health.py
from sqlalchemy import create_engine, text
def check_migration_health(database_url: str):
engine = create_engine(database_url)
with engine.connect() as conn:
# Проверка блокировок
result = conn.execute(text("""
SELECT
pg_locks.locktype,
pg_locks.mode,
pg_stat_activity.query,
pg_stat_activity.state
FROM pg_locks
JOIN pg_stat_activity ON pg_locks.pid = pg_stat_activity.pid
WHERE pg_locks.granted = false
"""))
blocking = result.fetchall()
if blocking:
print(f"WARNING: {len(blocking)} блокировок ожидают")
for row in blocking:
print(f" {row}")
# Проверка размера БД
result = conn.execute(text("""
SELECT pg_size_pretty(pg_database_size(current_database()))
"""))
size = result.scalar()
print(f"Размер БД: {size}")
# Проверка alembic_version
result = conn.execute(text("SELECT version_num FROM alembic_version"))
version = result.scalar()
print(f"Версия миграции: {version}")# Используйте expand/contract для production
# Фаза 1: добавить новое
# Фаза 2: удалить старое (отдельная миграция)
# Создавайте индексы CONCURRENTLY в production
op.execute("CREATE INDEX CONCURRENTLY ...")
# Разбивайте data migrations на чанки
BATCH_SIZE = 10000
for offset in range(0, total, BATCH_SIZE):
...
# Тестируйте downgrade
alembic upgrade head
alembic downgrade -1
alembic upgrade +1
# Делайте резервную копию перед production миграцией
pg_dump mydb > backup.sql# Не блокируйте таблицу надолго в production
def upgrade():
# ПЛОХО: блокирует таблицу
op.execute("UPDATE huge_table SET column = value")
# ХОРОШО: чанками
for offset in range(0, total, BATCH_SIZE):
op.execute(f"UPDATE ... LIMIT {BATCH_SIZE} OFFSET {offset}")
# Не применяйте миграции без тестирования на staging
# Не удаляйте данные без возможности восстановления
op.execute("DELETE FROM users WHERE inactive = true") # ПЛОХО!
# Не используйте ORM модели в миграциях
from myapp.models import User # ПЛОХО
# Используйте Core API
users = sa.sql.table('users', ...)В следующей теме вы изучите Testing Strategies — transactional тесты, фикстуры, изоляция тестов и лучшие практики тестирования кода с SQLAlchemy.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.