Собираем Task Manager API: структура проекта, репозитории, CRUD-хендлеры, пагинация, фильтрация.
Построим полноценный Task Manager API с CRUD-операциями, пагинацией, фильтрацией и документацией
task_manager/
├── app/
│ ├── __init__.py
│ ├── main.py # Точка входа, создание приложения
│ ├── config.py # Конфигурация
│ ├── routes.py # Настройка роутов
│ ├── middleware/
│ │ ├── __init__.py
│ │ ├── auth.py # Аутентификация
│ │ ├── logging.py # Логирование
│ │ └── error.py # Обработка ошибок
│ ├── handlers/
│ │ ├── __init__.py
│ │ ├── tasks.py # Обработчики задач
│ │ ├── users.py # Обработчики пользователей
│ │ └── auth.py # Логин/регистрация
│ ├── repositories/
│ │ ├── __init__.py
│ │ ├── tasks.py # Доступ к БД для задач
│ │ └── users.py # Доступ к БД для пользователей
│ ├── models/
│ │ ├── __init__.py
│ │ ├── task.py # Pydantic-модели
│ │ └── user.py
│ └── services/
│ ├── __init__.py
│ ├── notifications.py # WebSocket уведомления
│ └── email.py # Email-рассылки
├── migrations/ # Alembic миграции
├── tests/
├── requirements.txt
└── run.py
# app/models/task.py
from pydantic import BaseModel, Field, ConfigDict
from datetime import datetime
from typing import Optional
class TaskBase(BaseModel):
title: str = Field(..., min_length=1, max_length=200)
description: str = Field(default='', max_length=1000)
class TaskCreate(TaskBase):
pass
class TaskUpdate(BaseModel):
title: Optional[str] = Field(None, min_length=1, max_length=200)
description: Optional[str] = Field(None, max_length=1000)
completed: Optional[bool] = None
class Task(TaskBase):
model_config = ConfigDict(from_attributes=True)
id: int
completed: bool = False
user_id: int
created_at: datetime
updated_at: datetime
class TaskList(BaseModel):
tasks: list[Task]
total: int
page: int
page_size: int# app/repositories/tasks.py
import asyncpg
from typing import Optional, List
from datetime import datetime
class TaskRepository:
def __init__(self, pool: asyncpg.Pool):
self.pool = pool
async def list(
self,
user_id: int,
page: int = 1,
page_size: int = 20,
completed: Optional[bool] = None
) -> tuple[List[dict], int]:
"""Список задач с пагинацией и фильтрацией"""
offset = (page - 1) * page_size
async with self.pool.acquire() as conn:
# Построение запроса с условиями
where_clause = 'WHERE user_id = $1'
params = [user_id]
if completed is not None:
where_clause += f' AND completed = ${len(params) + 1}'
params.append(completed)
# Получение задач
query = f'''
SELECT id, title, description, completed, user_id, created_at, updated_at
FROM tasks
{where_clause}
ORDER BY created_at DESC
LIMIT ${len(params) + 1} OFFSET ${len(params) + 2}
'''
params.extend([page_size, offset])
rows = await conn.fetch(query, *params)
# Получение общего количества
count_query = f'''
SELECT COUNT(*) FROM tasks {where_clause}
'''
count_row = await conn.fetchrow(count_query, *params[:-2])
total = count_row['count']
return [dict(r) for r in rows], total
async def get(self, task_id: int, user_id: int) -> Optional[dict]:
"""Получить задачу по ID"""
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
'''
SELECT * FROM tasks
WHERE id = $1 AND user_id = $2
''',
task_id, user_id
)
return dict(row) if row else None
async def create(
self,
title: str,
description: str,
user_id: int
) -> dict:
"""Создать задачу"""
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
'''
INSERT INTO tasks (title, description, user_id)
VALUES ($1, $2, $3)
RETURNING *
''',
title, description, user_id
)
return dict(row)
async def update(
self,
task_id: int,
user_id: int,
title: Optional[str] = None,
description: Optional[str] = None,
completed: Optional[bool] = None
) -> Optional[dict]:
"""Обновить задачу"""
updates = []
values = []
if title is not None:
updates.append(f'title = ${len(values) + 1}')
values.append(title)
if description is not None:
updates.append(f'description = ${len(values) + 1}')
values.append(description)
if completed is not None:
updates.append(f'completed = ${len(values) + 1}')
values.append(completed)
if not updates:
return await self.get(task_id, user_id)
values.extend([task_id, user_id])
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
f'''
UPDATE tasks
SET {', '.join(updates)}, updated_at = CURRENT_TIMESTAMP
WHERE id = ${len(values) - 1} AND user_id = ${len(values)}
RETURNING *
''',
*values
)
return dict(row) if row else None
async def delete(self, task_id: int, user_id: int) -> bool:
"""Удалить задачу"""
async with self.pool.acquire() as conn:
result = await conn.execute(
'''
DELETE FROM tasks
WHERE id = $1 AND user_id = $2
''',
task_id, user_id
)
return result == 'DELETE 1'# app/handlers/tasks.py
from aiohttp import web
from ..models.task import TaskCreate, TaskUpdate, Task, TaskList
async def list_tasks(request: web.Request) -> web.Response:
"""GET /api/v1/tasks — список задач"""
# Пагинация и фильтрация
try:
page = int(request.query.get('page', 1))
page_size = int(request.query.get('page_size', 20))
except ValueError:
raise web.HTTPBadRequest(text='Invalid page or page_size')
# Фильтр по статусу
status = request.query.get('status')
completed = None
if status == 'completed':
completed = True
elif status == 'pending':
completed = False
# Получение пользователя из middleware
user = request['user']
# Запрос к репозиторию
repo = request.app['task_repo']
tasks, total = await repo.list(
user_id=user['id'],
page=page,
page_size=page_size,
completed=completed
)
# Формирование ответа
response = TaskList(
tasks=[Task.model_validate(t) for t in tasks],
total=total,
page=page,
page_size=page_size
)
return web.json_response(response.model_dump())
async def get_task(request: web.Request) -> web.Response:
"""GET /api/v1/tasks/{id} — получить задачу"""
try:
task_id = int(request.match_info['id'])
except ValueError:
raise web.HTTPBadRequest(text='Invalid task ID')
user = request['user']
repo = request.app['task_repo']
task = await repo.get(task_id, user['id'])
if not task:
raise web.HTTPNotFound(text='Task not found')
return web.json_response(Task.model_validate(task).model_dump())
async def create_task(request: web.Request) -> web.Response:
"""POST /api/v1/tasks — создать задачу"""
try:
data = await request.json()
task_create = TaskCreate(**data)
except ValueError as e:
raise web.HTTPBadRequest(text=str(e))
user = request['user']
repo = request.app['task_repo']
task = await repo.create(
title=task_create.title,
description=task_create.description,
user_id=user['id']
)
# Уведомление через WebSocket (если есть)
if 'ws_manager' in request.app:
await request.app['ws_manager'].send_to_user(
user['id'],
web.json_response({'type': 'task_created', 'task': task}).text
)
return web.json_response(
Task.model_validate(task).model_dump(),
status=201
)
async def update_task(request: web.Request) -> web.Response:
"""PUT /api/v1/tasks/{id} — обновить задачу"""
try:
task_id = int(request.match_info['id'])
data = await request.json()
task_update = TaskUpdate(**data)
except (ValueError, Exception) as e:
raise web.HTTPBadRequest(text=str(e))
user = request['user']
repo = request.app['task_repo']
task = await repo.update(
task_id=task_id,
user_id=user['id'],
title=task_update.title,
description=task_update.description,
completed=task_update.completed
)
if not task:
raise web.HTTPNotFound(text='Task not found')
return web.json_response(Task.model_validate(task).model_dump())
async def delete_task(request: web.Request) -> web.Response:
"""DELETE /api/v1/tasks/{id} — удалить задачу"""
try:
task_id = int(request.match_info['id'])
except ValueError:
raise web.HTTPBadRequest(text='Invalid task ID')
user = request['user']
repo = request.app['task_repo']
deleted = await repo.delete(task_id, user['id'])
if not deleted:
raise web.HTTPNotFound(text='Task not found')
return web.Response(status=204)
def setup_routes(app: web.Application):
"""Настройка роутов для задач"""
app.router.add_get('/api/v1/tasks', list_tasks)
app.router.add_get('/api/v1/tasks/{id}', get_task)
app.router.add_post('/api/v1/tasks', create_task)
app.router.add_put('/api/v1/tasks/{id}', update_task)
app.router.add_delete('/api/v1/tasks/{id}', delete_task)# app/main.py
import asyncio
import asyncpg
import aiohttp
from aiohttp import web
from .config import settings
from .middleware.auth import auth_middleware
from .middleware.logging import logging_middleware
from .middleware.error import error_handler_middleware
from .repositories.tasks import TaskRepository
from .handlers import tasks, users, auth, websocket
async def init_db(app: web.Application):
"""Инициализация пула соединений с БД"""
app['db'] = await asyncpg.create_pool(
settings.database_url,
min_size=settings.db_min_size,
max_size=settings.db_max_size
)
app['task_repo'] = TaskRepository(app['db'])
print("Database connected")
async def close_db(app: web.Application):
"""Закрытие соединений"""
await app['db'].close()
print("Database closed")
async def init_services(app: web.Application):
"""Инициализация внешних сервисов"""
# HTTP-клиент для внешних API
app['http_session'] = aiohttp.ClientSession()
async def close_services(app: web.Application):
"""Закрытие сервисов"""
await app['http_session'].close()
def create_app() -> web.Application:
"""Создание и настройка приложения"""
app = web.Application()
# Middleware
app.middlewares.append(logging_middleware)
app.middlewares.append(error_handler_middleware)
app.middlewares.append(auth_middleware)
# Хуки жизненного цикла
app.on_startup.append(init_db)
app.on_startup.append(init_services)
app.on_cleanup.append(close_db)
app.on_cleanup.append(close_services)
# Роуты
app.router.add_get('/health', lambda r: web.json_response({'status': 'ok'}))
tasks.setup_routes(app)
users.setup_routes(app)
auth.setup_routes(app)
app.router.add_get('/ws', websocket.websocket_handler)
return app
def main():
"""Точка входа"""
app = create_app()
web.run_app(
app,
host=settings.host,
port=settings.port
)
if __name__ == '__main__':
main()# app/config.py
import os
from dataclasses import dataclass
@dataclass
class Settings:
host: str = os.getenv('HOST', '0.0.0.0')
port: int = int(os.getenv('PORT', 8080))
database_url: str = os.getenv(
'DATABASE_URL',
'postgresql://postgres:postgres@localhost:5432/task_manager'
)
db_min_size: int = int(os.getenv('DB_MIN_SIZE', '5'))
db_max_size: int = int(os.getenv('DB_MAX_SIZE', '20'))
jwt_secret: str = os.getenv('JWT_SECRET', 'dev-secret-key')
jwt_algorithm: str = os.getenv('JWT_ALGORITHM', 'HS256')
debug: bool = os.getenv('DEBUG', 'false').lower() == 'true'
settings = Settings()# run.py
from app.main import main
if __name__ == '__main__':
main()# Запуск
python run.py
# Или через Gunicorn для продакшена
gunicorn -w 4 -k aiohttp.GunicornWebWorker app.main:create_app# Health check
curl http://localhost:8080/health
# Регистрация
curl -X POST http://localhost:8080/api/v1/auth/register \
-H "Content-Type: application/json" \
-d '{"email": "user@example.com", "password": "secret"}'
# Логин
curl -X POST http://localhost:8080/api/v1/auth/login \
-H "Content-Type: application/json" \
-d '{"email": "user@example.com", "password": "secret"}'
# Создать задачу (с токеном)
curl -X POST http://localhost:8080/api/v1/tasks \
-H "Content-Type: application/json" \
-H "Authorization: Bearer YOUR_TOKEN" \
-d '{"title": "Learn aiohttp", "description": "Build REST API"}'
# Получить список задач
curl http://localhost:8080/api/v1/tasks \
-H "Authorization: Bearer YOUR_TOKEN"
# С пагинацией
curl "http://localhost:8080/api/v1/tasks?page=1&page_size=10" \
-H "Authorization: Bearer YOUR_TOKEN"
# Фильтр по статусу
curl "http://localhost:8080/api/v1/tasks?status=completed" \
-H "Authorization: Bearer YOUR_TOKEN"Убедитесь, что вы понимаете:
Переходите к вопросам для закрепления.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.