PostgreSQL через asyncpg: пул соединений, CRUD, транзакции, prepared statements, миграции через Alembic.
Научитесь подключаться к PostgreSQL асинхронно, использовать пул соединений и транзакции
asyncpg — асинхронный драйвер PostgreSQL для Python. Преимущества:
pip install asyncpgimport asyncpg
async def main():
conn = await asyncpg.connect(
host='localhost',
port=5432,
user='postgres',
password='secret',
database='task_manager'
)
# Выполнение запроса
row = await conn.fetchrow('SELECT NOW()')
print(row[0])
await conn.close()
asyncio.run(main())conn = await asyncpg.connect(
'postgresql://user:password@localhost:5432/database'
)В продакшене используйте пул соединений, а не отдельные подключения:
import asyncpg
async def init_db(app):
"""Создание пула при старте приложения"""
app['db'] = await asyncpg.create_pool(
'postgresql://user:pass@localhost/task_manager',
min_size=5, # Минимум соединений в пуле
max_size=20, # Максимум соединений
command_timeout=60 # Таймаут запросов
)
async def close_db(app):
"""Закрытие пула при остановке"""
await app['db'].close()
# Использование в aiohttp
app = web.Application()
app.on_startup.append(init_db)
app.on_cleanup.append(close_db)async def create_task(conn, title: str, description: str = '') -> dict:
row = await conn.fetchrow(
'''
INSERT INTO tasks (title, description, completed)
VALUES ($1, $2, $3)
RETURNING id, title, description, completed, created_at
''',
title, description, False
)
return dict(row)async def get_task(conn, task_id: int) -> dict | None:
row = await conn.fetchrow(
'SELECT * FROM tasks WHERE id = $1',
task_id
)
return dict(row) if row else None
async def list_tasks(
conn,
page: int = 1,
page_size: int = 20,
status: str | None = None
) -> list[dict]:
offset = (page - 1) * page_size
if status == 'completed':
rows = await conn.fetch(
'''
SELECT * FROM tasks
WHERE completed = true
ORDER BY created_at DESC
LIMIT $1 OFFSET $2
''',
page_size, offset
)
elif status == 'pending':
rows = await conn.fetch(
'''
SELECT * FROM tasks
WHERE completed = false
ORDER BY created_at DESC
LIMIT $1 OFFSET $2
''',
page_size, offset
)
else:
rows = await conn.fetch(
'''
SELECT * FROM tasks
ORDER BY created_at DESC
LIMIT $1 OFFSET $2
''',
page_size, offset
)
return [dict(row) for row in rows]async def update_task(
conn,
task_id: int,
title: str | None = None,
description: str | None = None,
completed: bool | None = None
) -> dict | None:
# Динамическое обновление только указанных полей
updates = []
values = []
if title is not None:
updates.append(f'title = ${len(values) + 1}')
values.append(title)
if description is not None:
updates.append(f'description = ${len(values) + 1}')
values.append(description)
if completed is not None:
updates.append(f'completed = ${len(values) + 1}')
values.append(completed)
if not updates:
return await get_task(conn, task_id)
values.append(task_id)
query = f'''
UPDATE tasks
SET {', '.join(updates)}
WHERE id = ${len(values)}
RETURNING *
'''
row = await conn.fetchrow(query, *values)
return dict(row) if row else Noneasync def delete_task(conn, task_id: int) -> bool:
result = await conn.execute(
'DELETE FROM tasks WHERE id = $1',
task_id
)
# Возвращает 'DELETE 1' если удалена строка
return result == 'DELETE 1'async def transfer_tasks(conn, from_user: int, to_user: int):
async with conn.transaction():
# Все запросы внутри блока выполняются в одной транзакции
await conn.execute(
'UPDATE tasks SET user_id = $1 WHERE user_id = $2',
to_user, from_user
)
await conn.execute(
'INSERT INTO audit_log (action, from_user, to_user) VALUES ($1, $2, $3)',
'transfer_tasks', from_user, to_user
)
# Если будет исключение — транзакция откатитсяasync with conn.transaction(isolation='repeatable_read'):
# Защита от phantom reads
rows = await conn.fetch('SELECT * FROM tasks WHERE user_id = $1', user_id)
# ... обработкаУровни изоляции:
read_committed (по умолчанию)repeatable_readserializableasync def get_user_tasks(app, user_id: int):
pool = app['db']
# Получение соединения из пула
async with pool.acquire() as conn:
tasks = await conn.fetch(
'SELECT * FROM tasks WHERE user_id = $1',
user_id
)
return [dict(t) for t in tasks]
# Соединение автоматически возвращается в пулasync def get_user_with_tasks(app, user_id: int):
pool = app['db']
# Параллельное выполнение независимых запросов
async with pool.acquire() as conn:
user, tasks, stats = await asyncio.gather(
conn.fetchrow('SELECT * FROM users WHERE id = $1', user_id),
conn.fetch('SELECT * FROM tasks WHERE user_id = $1', user_id),
conn.fetchrow(
'SELECT COUNT(*) as total, COUNT(*) FILTER (WHERE completed) as done FROM tasks WHERE user_id = $1',
user_id
)
)
return {
'user': dict(user),
'tasks': [dict(t) for t in tasks],
'stats': dict(stats)
}Используйте Alembic для управления миграциями:
pip install alembic
alembic init alembic# alembic/versions/001_create_tasks_table.py
def upgrade():
op.execute('''
CREATE TABLE tasks (
id SERIAL PRIMARY KEY,
title VARCHAR(200) NOT NULL,
description TEXT,
completed BOOLEAN DEFAULT FALSE,
user_id INTEGER REFERENCES users(id),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
op.execute('CREATE INDEX idx_tasks_user_id ON tasks(user_id)')
op.execute('CREATE INDEX idx_tasks_completed ON tasks(completed)')
def downgrade():
op.drop_table('tasks')from alembic import command
from alembic.config import Config
def run_migrations():
alembic_cfg = Config('alembic.ini')
command.upgrade(alembic_cfg, 'head')
async def init_db(app):
run_migrations()
app['db'] = await asyncpg.create_pool(...)from pydantic import BaseModel, Field
from datetime import datetime
class TaskCreate(BaseModel):
title: str = Field(..., min_length=1, max_length=200)
description: str = Field(default='', max_length=1000)
class Task(BaseModel):
id: int
title: str
description: str
completed: bool
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
# В обработчике aiohttp
async def create_task(request):
data = await request.json()
task_create = TaskCreate(**data)
pool = request.app['db']
async with pool.acquire() as conn:
row = await conn.fetchrow(
'''
INSERT INTO tasks (title, description)
VALUES ($1, $2)
RETURNING *
''',
task_create.title,
task_create.description
)
task = Task.model_validate(dict(row))
return web.json_response(task.model_dump())import asyncpg
async def safe_create_task(conn, title: str):
try:
row = await conn.fetchrow(
'INSERT INTO tasks (title) VALUES ($1) RETURNING *',
title
)
return dict(row)
except asyncpg.UniqueViolationError:
# Нарушение уникальности (дубликат)
raise web.HTTPConflict(text='Task already exists')
except asyncpg.ForeignKeyViolationError:
# Несуществующий внешний ключ
raise web.HTTPBadRequest(text='Invalid reference')
except asyncpg.CheckViolationError:
# Нарушение CHECK-ограничения
raise web.HTTPBadRequest(text='Validation failed')
except asyncpg.PostgresError as e:
# Другие ошибки БД
logging.exception(f"Database error: {e}")
raise web.HTTPInternalServerError(text='Database error')# app/repositories/tasks.py
import asyncpg
from typing import Optional
class TaskRepository:
def __init__(self, pool: asyncpg.Pool):
self.pool = pool
async def list(
self,
page: int = 1,
page_size: int = 20,
user_id: Optional[int] = None
) -> list[dict]:
offset = (page - 1) * page_size
async with self.pool.acquire() as conn:
if user_id:
rows = await conn.fetch(
'''
SELECT * FROM tasks
WHERE user_id = $1
ORDER BY created_at DESC
LIMIT $2 OFFSET $3
''',
user_id, page_size, offset
)
else:
rows = await conn.fetch(
'''
SELECT * FROM tasks
ORDER BY created_at DESC
LIMIT $1 OFFSET $2
''',
page_size, offset
)
return [dict(r) for r in rows]
async def get(self, task_id: int) -> Optional[dict]:
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
'SELECT * FROM tasks WHERE id = $1',
task_id
)
return dict(row) if row else None
async def create(
self,
title: str,
description: str,
user_id: int
) -> dict:
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
'''
INSERT INTO tasks (title, description, user_id)
VALUES ($1, $2, $3)
RETURNING *
''',
title, description, user_id
)
return dict(row)
async def delete(self, task_id: int) -> bool:
async with self.pool.acquire() as conn:
result = await conn.execute(
'DELETE FROM tasks WHERE id = $1',
task_id
)
return result == 'DELETE 1'
# Инициализация в app/main.py
def create_app():
app = web.Application()
async def init_repo(app):
app['db'] = await asyncpg.create_pool(DSN)
app['task_repo'] = TaskRepository(app['db'])
async def close_repo(app):
await app['db'].close()
app.on_startup.append(init_repo)
app.on_cleanup.append(close_repo)
return appУбедитесь, что вы понимаете:
Переходите к вопросам для закрепления.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.