BackgroundTasks, отложенное выполнение, уведомления
Background Tasks позволяют выполнять код после отправки ответа клиенту. В этой теме вы научитесь отправлять email, обрабатывать файлы и выполнять отложенные задачи.
Background Tasks — задачи, выполняемые после отправки ответа клиенту. Полезны для операций, не требующих немедленного результата.
| Операция | Background Task | Почему |
|---|---|---|
| Отправка email | ✅ | Клиент не должен ждать SMTP |
| Обработка изображения | ✅ | Долгая CPU-операция |
| Генерация отчёта | ✅ | Может занимать минуты |
| Уведомления | ✅ | Не блокирует ответ |
| Запись в БД | ❌ | Должна быть до ответа |
| Валидация данных | ❌ | Должна быть до ответа |
from fastapi import FastAPI, BackgroundTasks
import time
app = FastAPI()
def write_log(message: str):
"""
Синхронная функция для логирования.
Выполняется в фоне.
"""
with open('log.txt', 'a') as f:
f.write(f"{message}\n")
@app.post('/send-email')
async def send_email(
email: str,
background_tasks: BackgroundTasks
):
"""
Отправляет email и возвращает ответ сразу.
"""
# Добавляем задачу в фон
background_tasks.add_task(write_log, f"Email sent to {email}")
# Возвращаем ответ сразу
return {'message': f'Email queued for {email}'}1. Клиент → POST /send-email
↓
2. Сервер → Добавляет задачу в background_tasks
↓
3. Сервер → Возвращает ответ клиенту
↓
4. Клиент получает ответ (200 OK)
↓
5. Сервер → Выполняет фоновую задачу (write_log)
pip install fastapi-mailfrom fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel, EmailStr
from fastapi_mail import FastMail, MessageSchema, ConnectionConfig
from typing import List
import os
app = FastAPI()
# Конфигурация
conf = ConnectionConfig(
MAIL_USERNAME=os.environ.get('MAIL_USERNAME'),
MAIL_PASSWORD=os.environ.get('MAIL_PASSWORD'),
MAIL_FROM='noreply@example.com',
MAIL_PORT=587,
MAIL_SERVER='smtp.gmail.com',
MAIL_FROM_NAME='My App',
MAIL_STARTTLS=True,
MAIL_SSL_TLS=False
)
class EmailRequest(BaseModel):
email: EmailStr
subject: str
body: str
async def send_email_task(
email: str,
subject: str,
body: str
):
"""
Фоновая задача для отправки email.
"""
message = MessageSchema(
subject=subject,
recipients=[email],
body=body,
subtype='plain'
)
fm = FastMail(conf)
await fm.send_message(message)
@app.post('/send-email')
async def send_email(
request: EmailRequest,
background_tasks: BackgroundTasks
):
"""
Отправляет email через фоновую задачу.
"""
# Добавляем задачу
background_tasks.add_task(
send_email_task,
request.email,
request.subject,
request.body
)
return {'message': f'Email queued for {request.email}'}from fastapi import FastAPI, UploadFile, File, BackgroundTasks
from pathlib import Path
import aiofiles
from PIL import Image
import io
app = FastAPI()
UPLOAD_DIR = Path('uploads')
UPLOAD_DIR.mkdir(exist_ok=True)
def process_image(file_path: Path, output_path: Path):
"""
Обрабатывает изображение (синхронно).
"""
with open(file_path, 'rb') as f:
image = Image.open(io.BytesIO(f.read()))
# Изменение размера
image.thumbnail((800, 600))
# Сохранение
image.save(output_path, 'JPEG', quality=85)
@app.post('/upload-and-process')
async def upload_and_process(
file: UploadFile = File(...),
background_tasks: BackgroundTasks = None
):
"""
Сохраняет файл и обрабатывает в фоне.
"""
# Сохраняем оригинал
original_path = UPLOAD_DIR / f"original_{file.filename}"
contents = await file.read()
async with aiofiles.open(original_path, 'wb') as f:
await f.write(contents)
# Планируем обработку
output_path = UPLOAD_DIR / f"processed_{file.filename}"
background_tasks.add_task(
process_image,
original_path,
output_path
)
return {
'message': 'File uploaded, processing in background',
'original': str(original_path),
'processed': str(output_path)
}from fastapi import FastAPI, BackgroundTasks, Depends
from datetime import datetime
from typing import Dict, List
app = FastAPI()
# Хранилище уведомлений (в реальности — БД)
notifications_db: Dict[int, List[dict]] = {}
def save_notification(user_id: int, message: str):
"""
Сохраняет уведомление в БД.
"""
if user_id not in notifications_db:
notifications_db[user_id] = []
notifications_db[user_id].append({
'message': message,
'timestamp': datetime.utcnow().isoformat(),
'read': False
})
def send_push_notification(user_id: int, message: str):
"""
Отправляет push-уведомление.
"""
# Интеграция с Firebase, OneSignal и т.д.
print(f"Push to {user_id}: {message}")
@app.post('/users/{user_id}/notify')
async def notify_user(
user_id: int,
message: str,
background_tasks: BackgroundTasks
):
"""
Создаёт уведомление и отправляет push.
"""
# Сохраняем в БД (фон)
background_tasks.add_task(save_notification, user_id, message)
# Отправляем push (фон)
background_tasks.add_task(send_push_notification, user_id, message)
return {'message': 'Notification queued'}
@app.get('/users/{user_id}/notifications')
async def get_notifications(user_id: int):
"""
Получает уведомления пользователя.
"""
return {
'user_id': user_id,
'notifications': notifications_db.get(user_id, [])
}Для сложных задач (повторные попытки, расписание, прогресс) используйте Celery.
pip install celery rediscelery_app.py:
from celery import Celery
import time
celery_app = Celery(
'tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0'
)
celery_app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='UTC',
enable_utc=True,
)
@celery_app.task(bind=True, max_retries=3)
def send_email_task(self, email: str, subject: str, body: str):
"""
Celery задача для отправки email.
"""
try:
# Имитация отправки
time.sleep(2)
print(f"Email sent to {email}")
return {'status': 'success', 'email': email}
except Exception as exc:
# Повторная попытка
raise self.retry(exc=exc, countdown=60)
@celery_app.task
def process_large_file_task(file_path: str):
"""
Обработка большого файла.
"""
time.sleep(10) # Имитация долгой обработки
return {'status': 'completed', 'file': file_path}
@celery_app.task
def heavy_computation_task(data: list):
"""
Тяжёлые вычисления.
"""
result = sum(x ** 2 for x in data)
return {'result': result}from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel, EmailStr
from celery_app import send_email_task, process_large_file_task
app = FastAPI()
class EmailRequest(BaseModel):
email: EmailStr
subject: str
body: str
@app.post('/send-email-celery')
async def send_email_celery(request: EmailRequest):
"""
Отправляет email через Celery.
"""
# Отправляем задачу в Celery
task = send_email_task.delay(
request.email,
request.subject,
request.body
)
return {
'message': 'Email task queued',
'task_id': task.id
}
@app.get('/task-status/{task_id}')
async def get_task_status(task_id: str):
"""
Проверяет статус задачи.
"""
from celery.result import AsyncResult
task = AsyncResult(task_id, app=send_email_task.app)
if task.state == 'PENDING':
return {'status': 'pending', 'task_id': task_id}
elif task.state == 'STARTED':
return {'status': 'started', 'task_id': task_id}
elif task.state == 'SUCCESS':
return {'status': 'success', 'result': task.result}
elif task.state == 'FAILURE':
return {'status': 'failed', 'error': str(task.info)}
else:
return {'status': task.state}
@app.post('/process-file-celery')
async def process_file_celery(file_path: str):
"""
Обрабатывает файл через Celery.
"""
task = process_large_file_task.delay(file_path)
return {
'message': 'File processing started',
'task_id': task.id
}# В отдельном терминале
celery -A celery_app worker --loglevel=info# celery_app.py
from celery.schedules import crontab
celery_app.conf.beat_schedule = {
'send-daily-report': {
'task': 'celery_app.send_daily_report_task',
'schedule': crontab(hour=9, minute=0), # Каждый день в 9:00
},
'cleanup-old-files': {
'task': 'celery_app.cleanup_old_files_task',
'schedule': crontab(hour=2, minute=0, day_of_week=0), # Каждое воскресенье в 2:00
},
}
@celery_app.task
def send_daily_report_task():
"""
Ежедневный отчёт.
"""
# Генерация и отправка отчёта
pass
@celery_app.task
def cleanup_old_files_task():
"""
Очистка старых файлов.
"""
# Удаление файлов старше 30 дней
passcelery -A celery_app beat --loglevel=infofrom fastapi import FastAPI, BackgroundTasks, Depends, HTTPException
from sqlalchemy.orm import Session
from pydantic import BaseModel, EmailStr
from datetime import datetime
from typing import List, Optional
import smtplib
from email.mime.text import MIMEText
from database import get_db
from models import User, Notification
app = FastAPI()
# === Модели ===
class NotificationCreate(BaseModel):
user_id: int
title: str
message: str
send_email: bool = False
class NotificationResponse(BaseModel):
id: int
title: str
message: str
created_at: datetime
read: bool
class Config:
from_attributes = True
# === Фоновые задачи ===
def save_notification_db(
user_id: int,
title: str,
message: str,
db: Session
):
"""Сохраняет уведомление в БД"""
notification = Notification(
user_id=user_id,
title=title,
message=message,
created_at=datetime.utcnow()
)
db.add(notification)
db.commit()
def send_email_notification(
user_email: str,
title: str,
message: str
):
"""Отправляет email уведомление"""
msg = MIMEText(message)
msg['Subject'] = title
msg['From'] = 'noreply@example.com'
msg['To'] = user_email
with smtplib.SMTP('smtp.example.com', 587) as server:
server.starttls()
server.login('user', 'password')
server.send_message(msg)
def send_push_notification(user_id: int, title: str, message: str):
"""Отправляет push уведомление"""
# Интеграция с Firebase Cloud Messaging
print(f"Push to {user_id}: {title} - {message}")
# === Endpoints ===
@app.post('/notifications', response_model=NotificationResponse)
async def create_notification(
notification: NotificationCreate,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db)
):
"""
Создаёт уведомление и отправляет через email/push.
"""
# Проверяем пользователя
user = db.query(User).filter(User.id == notification.user_id).first()
if not user:
raise HTTPException(404, "User not found")
# Сохраняем в БД (фон)
background_tasks.add_task(
save_notification_db,
notification.user_id,
notification.title,
notification.message,
db
)
# Отправляем email если нужно (фон)
if notification.send_email:
background_tasks.add_task(
send_email_notification,
user.email,
notification.title,
notification.message
)
# Отправляем push (фон)
background_tasks.add_task(
send_push_notification,
notification.user_id,
notification.title,
notification.message
)
# Возвращаем ответ сразу
return NotificationResponse(
id=0, # Будет создан в фоне
title=notification.title,
message=notification.message,
created_at=datetime.utcnow(),
read=False
)
@app.get('/users/{user_id}/notifications', response_model=List[NotificationResponse])
async def get_user_notifications(
user_id: int,
skip: int = 0,
limit: int = 50,
db: Session = Depends(get_db)
):
"""
Получает уведомления пользователя.
"""
notifications = db.query(Notification).filter(
Notification.user_id == user_id
).order_by(Notification.created_at.desc()).offset(skip).limit(limit).all()
return notifications
@app.patch('/notifications/{notification_id}/read')
async def mark_notification_read(
notification_id: int,
db: Session = Depends(get_db)
):
"""
Отмечает уведомление как прочитанное.
"""
notification = db.query(Notification).filter(
Notification.id == notification_id
).first()
if not notification:
raise HTTPException(404, "Notification not found")
notification.read = True
db.commit()
return {'status': 'marked as read'}background_tasks.add_task(long_task)
# Ждём завершения — неправильно!
time.sleep(10)
return {'result': 'done'}Проблема: Блокирует ответ, теряется смысл background tasks.
Решение: Возвращайте ответ сразу после add_task().
background_tasks.add_task(save_payment_to_db)
return {'status': 'paid'}Проблема: Если задача упадёт, оплата не сохранится, но клиент получит успех.
Решение: Критичные операции выполняйте до возврата ответа.
def send_email(email: str):
smtp.send(email) # Может выбросить исключение!Решение:
def send_email(email: str):
try:
smtp.send(email)
except Exception as e:
logger.error(f"Failed to send email: {e}")В следующей теме вы изучите кэширование — Redis, memoization, стратегии инвалидации.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.