Upload файлов, обработка изображений, интеграция с S3
Загрузка файлов — частая задача в веб-приложениях. В этой теме вы научитесь принимать файлы, обрабатывать изображения, загружать в S3 и обеспечивать безопасность.
FastAPI использует UploadFile для приёма файлов:
from fastapi import FastAPI, UploadFile, File
from fastapi.responses import JSONResponse
app = FastAPI()
@app.post('/upload')
async def upload_file(file: UploadFile = File(...)):
"""
Принимает один файл.
"""
# Чтение содержимого
contents = await file.read()
return {
'filename': file.filename,
'content_type': file.content_type,
'size': len(contents)
}| Атрибут | Описание |
|---|---|
filename | Имя файла от клиента |
content_type | MIME-тип (image/jpeg, application/pdf) |
file | SpooledTemporaryFile (файл на диске при большом размере) |
headers | Заголовки |
curl -X POST 'http://localhost:8000/upload' \
-F 'file=@/path/to/file.pdf'/docs/uploadfrom typing import List
@app.post('/upload-multiple')
async def upload_multiple(files: List[UploadFile] = File(...)):
"""
Принимает несколько файлов.
"""
uploaded = []
for file in files:
contents = await file.read()
uploaded.append({
'filename': file.filename,
'size': len(contents)
})
return {'uploaded': uploaded}curl -X POST 'http://localhost:8000/upload-multiple' \
-F 'files=@file1.pdf' \
-F 'files=@file2.pdf' \
-F 'files=@file3.pdf'import aiofiles
from pathlib import Path
UPLOAD_DIR = Path('uploads')
UPLOAD_DIR.mkdir(exist_ok=True)
@app.post('/upload/save')
async def save_file(file: UploadFile = File(...)):
"""
Сохраняет файл на диск.
"""
file_path = UPLOAD_DIR / file.filename
# Асинхронная запись
async with aiofiles.open(file_path, 'wb') as out_file:
contents = await file.read()
await out_file.write(contents)
return {'path': str(file_path)}from fastapi import BackgroundTasks
import shutil
def save_file_sync(file_path: Path, contents: bytes):
"""Синхронное сохранение (для CPU-bound операций)"""
with open(file_path, 'wb') as f:
f.write(contents)
@app.post('/upload/save-sync')
async def save_file_sync_endpoint(
file: UploadFile = File(...),
background_tasks: BackgroundTasks = None
):
contents = await file.read()
file_path = UPLOAD_DIR / file.filename
# Сохранение в background (не блокирует ответ)
if background_tasks:
background_tasks.add_task(save_file_sync, file_path, contents)
return {'path': str(file_path), 'status': 'saving'}from fastapi import HTTPException, status
ALLOWED_TYPES = ['image/jpeg', 'image/png', 'image/gif']
@app.post('/upload/image')
async def upload_image(file: UploadFile = File(...)):
if file.content_type not in ALLOWED_TYPES:
raise HTTPException(
status_code=400,
detail=f"Invalid file type. Allowed: {ALLOWED_TYPES}"
)
return {'filename': file.filename}from pathlib import Path
ALLOWED_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.gif'}
def validate_extension(filename: str):
ext = Path(filename).suffix.lower()
if ext not in ALLOWED_EXTENSIONS:
raise HTTPException(
status_code=400,
detail=f"Invalid extension. Allowed: {ALLOWED_EXTENSIONS}"
)
return ext
@app.post('/upload/validated')
async def upload_validated(file: UploadFile = File(...)):
# Проверка расширения
ext = validate_extension(file.filename)
# Проверка MIME-типа
if not file.content_type.startswith('image/'):
raise HTTPException(
status_code=400,
detail="File must be an image"
)
# Проверка размера (10 MB)
contents = await file.read()
if len(contents) > 10 * 1024 * 1024:
raise HTTPException(
status_code=400,
detail="File too large (max 10MB)"
)
return {'filename': file.filename, 'extension': ext, 'size': len(contents)}from fastapi import File
@app.post('/upload/size-limited')
async def upload_size_limited(
file: UploadFile = File(
...,
description="Image file (max 10MB)"
)
):
contents = await file.read()
if len(contents) > 10 * 1024 * 1024:
raise HTTPException(400, "File too large")
return {'size': len(contents)}pip install pillow python-magicfrom PIL import Image
import io
@app.post('/upload/resize')
async def resize_image(
file: UploadFile = File(...),
width: int = 800,
height: int = 600
):
# Проверка типа
if not file.content_type.startswith('image/'):
raise HTTPException(400, "File must be an image")
# Чтение и обработка
contents = await file.read()
image = Image.open(io.BytesIO(contents))
# Изменение размера с сохранением пропорций
image.thumbnail((width, height), Image.Resampling.LANCZOS)
# Сохранение в буфер
output = io.BytesIO()
# Определяем формат
format_name = image.format or 'JPEG'
image.save(output, format=format_name, quality=85)
output.seek(0)
# Сохранение файла
output_path = UPLOAD_DIR / f"resized_{file.filename}"
with open(output_path, 'wb') as f:
f.write(output.getvalue())
return {
'original_size': len(contents),
'resized_size': output.tell(),
'dimensions': image.size,
'path': str(output_path)
}@app.post('/upload/thumbnail')
async def create_thumbnail(file: UploadFile = File(...)):
if not file.content_type.startswith('image/'):
raise HTTPException(400, "File must be an image")
contents = await file.read()
image = Image.open(io.BytesIO(contents))
# Создание квадратной миниатюры
size = (200, 200)
image.thumbnail(size, Image.Resampling.LANCZOS)
# Crop по центру для квадрата
width, height = image.size
left = (width - 200) / 2
top = (height - 200) / 2
right = (width + 200) / 2
bottom = (height + 200) / 2
image = image.crop((left, top, right, bottom))
# Сохранение
output = io.BytesIO()
image.save(output, format='JPEG', quality=85)
output.seek(0)
return {
'thumbnail_size': output.tell(),
'dimensions': (200, 200)
}import magic
@app.post('/upload/analyze')
async def analyze_file(file: UploadFile = File(...)):
contents = await file.read()
# Определение реального типа файла (по содержимому)
mime = magic.Magic(mime=True)
real_type = mime.from_buffer(contents[:1024])
# Проверка на соответствие declared типа
if file.content_type != real_type:
# Предупреждение о возможной подделке
print(f"Warning: declared {file.content_type}, actual {real_type}")
result = {
'filename': file.filename,
'declared_type': file.content_type,
'actual_type': real_type,
'size': len(contents)
}
# Для изображений — дополнительные данные
if real_type.startswith('image/'):
image = Image.open(io.BytesIO(contents))
result.update({
'format': image.format,
'dimensions': image.size,
'mode': image.mode
})
return resultpip install boto3import boto3
from botocore.exceptions import NoCredentialsError
S3_BUCKET = 'your-bucket-name'
S3_REGION = 'us-east-1'
AWS_ACCESS_KEY = os.environ.get('AWS_ACCESS_KEY')
AWS_SECRET_KEY = os.environ.get('AWS_SECRET_KEY')
s3_client = boto3.client(
's3',
region_name=S3_REGION,
aws_access_key_id=AWS_ACCESS_KEY,
aws_secret_access_key=AWS_SECRET_KEY
)import uuid
from datetime import timedelta
@app.post('/upload/s3')
async def upload_to_s3(file: UploadFile = File(...)):
# Генерация уникального имени
ext = Path(file.filename).suffix
object_name = f"uploads/{uuid.uuid4()}{ext}"
try:
# Чтение содержимого
contents = await file.read()
# Загрузка в S3
s3_client.put_object(
Bucket=S3_BUCKET,
Key=object_name,
Body=contents,
ContentType=file.content_type,
ACL='public-read' # Или 'private'
)
# Генерация URL
file_url = f"https://{S3_BUCKET}.s3.{S3_REGION}.amazonaws.com/{object_name}"
return {
'url': file_url,
'object_name': object_name,
'bucket': S3_BUCKET
}
except NoCredentialsError:
raise HTTPException(500, "AWS credentials not found")
except Exception as e:
raise HTTPException(500, f"S3 upload failed: {str(e)}")@app.get('/download/{object_name}')
async def get_presigned_url(object_name: str):
"""
Генерирует временную ссылку для скачивания.
"""
try:
url = s3_client.generate_presigned_url(
'get_object',
Params={
'Bucket': S3_BUCKET,
'Key': object_name
},
ExpiresIn=3600 # 1 час
)
return {'download_url': url, 'expires_in': 3600}
except Exception as e:
raise HTTPException(500, f"Failed to generate URL: {str(e)}")@app.post('/upload/s3/presigned')
async def get_upload_presigned_url(filename: str, content_type: str):
"""
Возвращает presigned URL для прямой загрузки клиента в S3.
Сервер не обрабатывает файл.
"""
ext = Path(filename).suffix
object_name = f"uploads/{uuid.uuid4()}{ext}"
try:
url = s3_client.generate_presigned_url(
'put_object',
Params={
'Bucket': S3_BUCKET,
'Key': object_name,
'ContentType': content_type
},
ExpiresIn=3600
)
return {
'upload_url': url,
'object_name': object_name,
'expires_in': 3600
}
except Exception as e:
raise HTTPException(500, f"Failed to generate URL: {str(e)}")| Уязвимость | Описание | Защита |
|---|---|---|
| Malware | Загрузка вредоносных файлов | Антивирусная проверка, sandbox |
| Path traversal | ../../../etc/passwd | Валидация имени, sanitization |
| Oversized files | DoS через большие файлы | Лимит размера, streaming |
| Wrong extension | .php вместо .jpg | Проверка расширения и MIME |
| Magic bytes | Подделка типа файла | Проверка по содержимому (magic) |
import re
import secrets
def sanitize_filename(filename: str) -> str:
"""
Очищает имя файла от опасных символов.
"""
# Удаляем путь
filename = Path(filename).name
# Разрешаем только буквы, цифры, дефис, подчёркивание, точку
filename = re.sub(r'[^a-zA-Z0-9._-]', '_', filename)
# Ограничиваем длину
if len(filename) > 255:
name, ext = Path(filename).stem, Path(filename).suffix
filename = f"{name[:255-len(ext)]}{ext}"
return filename
@app.post('/upload/secure')
async def upload_secure(file: UploadFile = File(...)):
# 1. Проверка расширения
allowed_ext = {'.jpg', '.jpeg', '.png', '.gif', '.pdf'}
ext = Path(file.filename).suffix.lower()
if ext not in allowed_ext:
raise HTTPException(400, f"Invalid extension. Allowed: {allowed_ext}")
# 2. Проверка MIME-типа
allowed_types = ['image/jpeg', 'image/png', 'image/gif', 'application/pdf']
if file.content_type not in allowed_types:
raise HTTPException(400, f"Invalid content type")
# 3. Чтение и проверка размера
contents = await file.read()
if len(contents) > 10 * 1024 * 1024: # 10 MB
raise HTTPException(400, "File too large")
# 4. Проверка по magic bytes
mime = magic.Magic(mime=True)
real_type = mime.from_buffer(contents[:1024])
if real_type not in allowed_types:
raise HTTPException(400, "File content does not match declared type")
# 5. Генерация безопасного имени
safe_filename = f"{secrets.token_hex(16)}{ext}"
file_path = UPLOAD_DIR / safe_filename
# 6. Сохранение
with open(file_path, 'wb') as f:
f.write(contents)
return {
'filename': safe_filename,
'size': len(contents),
'content_type': real_type
}from fastapi import FastAPI, UploadFile, File, HTTPException, Depends
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
from pathlib import Path
from datetime import datetime
from typing import Optional
import uuid
import aiofiles
app = FastAPI()
# === Конфигурация ===
UPLOAD_DIR = Path('uploads')
UPLOAD_DIR.mkdir(exist_ok=True)
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10 MB
ALLOWED_TYPES = {
'image': ['image/jpeg', 'image/png', 'image/gif', 'image/webp'],
'document': ['application/pdf', 'application/msword', 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'],
'video': ['video/mp4', 'video/quicktime']
}
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# === Модели ===
class FileUploadResponse(BaseModel):
id: str
filename: str
original_filename: str
content_type: str
size: int
uploaded_at: datetime
url: str
class FileListResponse(BaseModel):
files: list[FileUploadResponse]
total: int
# === Имитация БД ===
files_db = {}
# === Зависимости ===
async def get_current_user(token: str = Depends(oauth2_scheme)):
# Проверка токена
return {'id': 1, 'username': 'testuser'}
# === Endpoints ===
@app.post('/upload', response_model=FileUploadResponse)
async def upload_file(
file: UploadFile = File(...),
category: str = 'general',
current_user: dict = Depends(get_current_user)
):
"""
Загрузка файла с аутентификацией.
"""
# Проверка категории
if category not in ALLOWED_TYPES:
raise HTTPException(400, f"Invalid category. Allowed: {list(ALLOWED_TYPES.keys())}")
# Проверка типа файла
allowed = ALLOWED_TYPES.get(category, [])
if file.content_type not in allowed:
raise HTTPException(
400,
f"Invalid file type for category '{category}'. Allowed: {allowed}"
)
# Чтение и проверка размера
contents = await file.read()
if len(contents) > MAX_FILE_SIZE:
raise HTTPException(400, f"File too large (max {MAX_FILE_SIZE} bytes)")
# Генерация ID и имени
file_id = str(uuid.uuid4())
ext = Path(file.filename).suffix
safe_filename = f"{file_id}{ext}"
file_path = UPLOAD_DIR / safe_filename
# Сохранение
async with aiofiles.open(file_path, 'wb') as out_file:
await out_file.write(contents)
# Сохранение метаданных
file_data = {
'id': file_id,
'filename': safe_filename,
'original_filename': file.filename,
'content_type': file.content_type,
'size': len(contents),
'uploaded_at': datetime.utcnow(),
'uploaded_by': current_user['id'],
'category': category
}
files_db[file_id] = file_data
return FileUploadResponse(
id=file_id,
filename=safe_filename,
original_filename=file.filename,
content_type=file.content_type,
size=len(contents),
uploaded_at=file_data['uploaded_at'],
url=f"/files/{file_id}"
)
@app.get('/files/{file_id}', response_model=FileUploadResponse)
async def get_file(file_id: str):
"""
Получение информации о файле.
"""
if file_id not in files_db:
raise HTTPException(404, "File not found")
file_data = files_db[file_id]
return FileUploadResponse(
id=file_data['id'],
filename=file_data['filename'],
original_filename=file_data['original_filename'],
content_type=file_data['content_type'],
size=file_data['size'],
uploaded_at=file_data['uploaded_at'],
url=f"/files/{file_id}/download"
)
@app.get('/files/{file_id}/download')
async def download_file(file_id: str):
"""
Скачивание файла.
"""
from fastapi.responses import FileResponse
if file_id not in files_db:
raise HTTPException(404, "File not found")
file_data = files_db[file_id]
file_path = UPLOAD_DIR / file_data['filename']
if not file_path.exists():
raise HTTPException(404, "File not found on disk")
return FileResponse(
path=file_path,
filename=file_data['original_filename'],
media_type=file_data['content_type']
)
@app.delete('/files/{file_id}')
async def delete_file(
file_id: str,
current_user: dict = Depends(get_current_user)
):
"""
Удаление файла.
"""
if file_id not in files_db:
raise HTTPException(404, "File not found")
file_data = files_db[file_id]
# Проверка прав (владелец или админ)
if file_data['uploaded_by'] != current_user['id']:
raise HTTPException(403, "Not enough permissions")
# Удаление с диска
file_path = UPLOAD_DIR / file_data['filename']
if file_path.exists():
file_path.unlink()
# Удаление из БД
del files_db[file_id]
return {'deleted': True}
@app.get('/files', response_model=FileListResponse)
async def list_files(
skip: int = 0,
limit: int = 100,
current_user: dict = Depends(get_current_user)
):
"""
Список файлов пользователя.
"""
user_files = [
f for f in files_db.values()
if f['uploaded_by'] == current_user['id']
]
paginated = user_files[skip:skip + limit]
return FileListResponse(
files=[
FileUploadResponse(
id=f['id'],
filename=f['filename'],
original_filename=f['original_filename'],
content_type=f['content_type'],
size=f['size'],
uploaded_at=f['uploaded_at'],
url=f"/files/{f['id']}"
)
for f in paginated
],
total=len(user_files)
)contents = await file.read() # Может быть огромным!
# Валидация размера после чтенияРешение: Читайте частями или используйте streaming для больших файлов.
file_path = UPLOAD_DIR / file.filename # Опасно!Решение: Генерируйте уникальное имя: f"{uuid.uuid4()}{ext}".
if file.content_type == 'image/jpeg': # Можно подделать!
...Решение: Проверяйте magic bytes через python-magic.
В следующей теме вы изучите фоновые задачи (BackgroundTasks) — отложенное выполнение, уведомления, email.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.