REST, GraphQL, gRPC — выбор инструмента. Контрактный подход, версионирование, документирование API.
Хороший API — как хорошая шутка: не требует объяснений
Представьте: вы разрабатываете микросервис, который используют пять других команд. Через полгода появляется требование добавить новую функциональность, но существующий API не позволяет сделать это без breaking changes. Клиенты ломаются, релиз откладывается, все недовольны.
Правильное проектирование API с самого начала экономит сотни часов на поддержку и миграции.
В этой теме разберём:
REST — это архитектурный стиль, а не протокол. Он основан на ресурсах и стандартных HTTP-методах.
Ресурс — это любая сущность, к которой нужен доступ: пользователь, заказ, товар.
Правила именования:
# Плохо
GET /getUsers
POST /createUser
GET /api/v1/users/123/orders/456/items
# Хорошо
GET /users
POST /users
GET /users/123/orders
GET /orders/456/items| Метод | Описание | Идемпотентность | Безопасность |
|---|---|---|---|
GET | Получение ресурса | Да | Да |
POST | Создание ресурса | Нет | Нет |
PUT | Полное обновление | Да | Нет |
PATCH | Частичное обновление | Нет* | Нет |
DELETE | Удаление ресурса | Да | Нет |
*PATCH не идемпотентен по спецификации, но на практике часто делается идемпотентным.
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI()
class UserCreate(BaseModel):
name: str
email: str
class UserUpdate(BaseModel):
name: str | None = None
email: str | None = None
class User(BaseModel):
id: int
name: str
email: str
# Имитация базы данных
users_db = {
1: User(id=1, name="Alice", email="alice@example.com"),
}
next_id = 2
@app.get("/users", response_model=list[User])
def list_users():
"""GET — получение коллекции"""
return list(users_db.values())
@app.get("/users/{user_id}", response_model=User)
def get_user(user_id: int):
"""GET — получение конкретного ресурса"""
if user_id not in users_db:
raise HTTPException(status_code=404, detail="User not found")
return users_db[user_id]
@app.post("/users", response_model=User, status_code=201)
def create_user(user: UserCreate):
"""POST — создание нового ресурса"""
global next_id
new_user = User(id=next_id, **user.model_dump())
users_db[next_id] = new_user
next_id += 1
return new_user
@app.put("/users/{user_id}", response_model=User)
def update_user(user_id: int, user: UserCreate):
"""PUT — полное обновление (заменяет весь ресурс)"""
if user_id not in users_db:
raise HTTPException(status_code=404, detail="User not found")
updated = User(id=user_id, **user.model_dump())
users_db[user_id] = updated
return updated
@app.patch("/users/{user_id}", response_model=User)
def patch_user(user_id: int, user: UserUpdate):
"""PATCH — частичное обновление (только указанные поля)"""
if user_id not in users_db:
raise HTTPException(status_code=404, detail="User not found")
existing = users_db[user_id]
update_data = user.model_dump(exclude_unset=True)
updated = existing.model_copy(update=update_data)
users_db[user_id] = updated
return updated
@app.delete("/users/{user_id}", status_code=204)
def delete_user(user_id: int):
"""DELETE — удаление ресурса"""
if user_id not in users_db:
raise HTTPException(status_code=404, detail="User not found")
del users_db[user_id]
return None # 204 No Content| Код | Значение | Когда использовать |
|---|---|---|
200 OK | Успех | GET, PUT, PATCH успешно выполнены |
201 Created | Ресурс создан | POST успешно создал ресурс |
204 No Content | Успех без тела ответа | DELETE успешно выполнен |
400 Bad Request | Ошибка клиента | Неверный формат данных |
401 Unauthorized | Не аутентифицирован | Нет токена или токен невалиден |
403 Forbidden | Нет доступа | Аутентифицирован, но нет прав |
404 Not Found | Ресурс не найден | ID не существует |
409 Conflict | Конфликт | Дубликат email, версия устарела |
422 Unprocessable Entity | Ошибка валидации | Данные валидны синтаксически, но не семантически |
429 Too Many Requests | Rate limit | Превышен лимит запросов |
500 Internal Server Error | Ошибка сервера | Unexpected exception |
from fastapi import status
@app.post("/users", response_model=User, status_code=status.HTTP_201_CREATED)
def create_user(user: UserCreate):
# Проверка на дубликат
for existing in users_db.values():
if existing.email == user.email:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="User with this email already exists"
)
# ... создание пользователяHATEOAS — это уровень зрелости REST, где ответы содержат ссылки на связанные ресурсы.
from pydantic import BaseModel
class UserWithLinks(BaseModel):
id: int
name: str
email: str
links: dict[str, str] # Ссылки на связанные действия
@app.get("/users/{user_id}", response_model=UserWithLinks)
def get_user_hateoas(user_id: int):
user = users_db.get(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return UserWithLinks(
id=user.id,
name=user.name,
email=user.email,
links={
"self": f"/users/{user_id}",
"update": f"/users/{user_id}",
"delete": f"/users/{user_id}",
"orders": f"/users/{user_id}/orders",
}
)Преимущества HATEOAS:
Недостатки:
GraphQL — это язык запросов для API, разработанный Facebook. Клиент запрашивает только нужные поля.
| Критерий | REST | GraphQL |
|---|---|---|
| Простая структура данных | ✅ | ⚠️ Избыточен |
| Сложные вложенные данные | ⚠️ Over/under-fetching | ✅ Точно нужные поля |
| Мобильные клиенты (трафик) | ⚠️ Фиксированный ответ | ✅ Минимальный ответ |
| Частые изменения фронтенда | ⚠️ Нужно менять бэкенд | ✅ Клиент сам выбирает поля |
| Кэширование | ✅ HTTP-кэш | ⚠️ Нужен слой кэширования |
| Файлы | ✅ Загрузка/скачивание | ⚠️ Требует расширений |
import graphene
from graphene import ObjectType, String, Int, List, Field, InputObjectType, Mutation
class User(ObjectType):
"""Тип пользователя в GraphQL"""
id = Int(required=True)
name = String(required=True)
email = String(required=True)
posts = List(lambda: Post) # Связь с постами
class Post(ObjectType):
"""Тип поста"""
id = Int(required=True)
title = String(required=True)
content = String()
author = Field(User)
class Query(ObjectType):
"""Корневой тип для запросов"""
# Получить одного пользователя
user = Field(User, id=Int(required=True))
# Получить список пользователей
users = List(User, limit=Int(default_value=10), offset=Int(default_value=0))
def resolve_user(root, info, id):
# Аналог SELECT * FROM users WHERE id = ?
return users_db.get(id)
def resolve_users(root, info, limit, offset):
# Аналог SELECT * FROM users LIMIT ? OFFSET ?
all_users = list(users_db.values())
return all_users[offset:offset + limit]
class UserInput(InputObjectType):
"""Входные данные для создания/обновления"""
name = String(required=True)
email = String(required=True)
class CreateUser(Mutation):
"""Мутация для создания пользователя"""
user = Field(User)
class Arguments:
name = String(required=True)
email = String(required=True)
def mutate(root, info, name, email):
global next_id
new_user = User(id=next_id, name=name, email=email)
users_db[next_id] = new_user
next_id += 1
return CreateUser(user=new_user)
class UpdateUser(Mutation):
"""Мутация для обновления"""
user = Field(User)
class Arguments:
id = Int(required=True)
name = String()
email = String()
def mutate(root, info, id, name=None, email=None):
if id not in users_db:
raise Exception("User not found")
user = users_db[id]
if name:
user.name = name
if email:
user.email = email
return UpdateUser(user=user)
class Schema(graphene.Schema):
query = Query
mutation = graphene.Mutation
schema = Schema()# Запрос только нужных полей — нет over-fetching
query {
user(id: 1) {
id
name
# email не запрашиваем — экономим трафик
}
}
# Вложенные данные в одном запросе — нет N+1 проблемы (при правильном resolver)
query {
user(id: 1) {
id
name
posts {
id
title
}
}
}
# Мутация
mutation {
createUser(name: "Bob", email: "bob@example.com") {
user {
id
name
}
}
}from fastapi import FastAPI
from starlette.graphql import GraphQLApp
app = FastAPI()
# GraphQL endpoint
app.add_route("/graphql", GraphQLApp(schema=schema))
# REST endpoints остаются
app.include_router(user_router, prefix="/api/v1/users")N+1 запросов:
def resolve_users(root, info, limit, offset):
users = list(users_db.values())[offset:offset + limit]
# Для каждого пользователя будет отдельный запрос к БД при получении posts
return users
def resolve_posts(user, info):
# SELECT * FROM posts WHERE author_id = ? — выполнится N раз
return get_posts_by_author(user.id)Решение — DataLoader (batch loading):
from aiodataloader import DataLoader
class PostLoader(DataLoader):
async def batch_load_fn(self, user_ids):
# Один запрос: SELECT * FROM posts WHERE author_id IN (?, ?, ...)
posts = await db.fetch_all(
"SELECT * FROM posts WHERE author_id = ANY($1)",
user_ids
)
# Группируем по author_id
posts_by_user = {}
for post in posts:
posts_by_user.setdefault(post.author_id, []).append(post)
return [posts_by_user.get(uid, []) for uid in user_ids]
# В resolver
async def resolve_posts(user, info):
loader = info.context.post_loader
return await loader.load(user.id)gRPC — это высокопроизводительный RPC-фреймворк от Google. Использует HTTP/2 и Protocol Buffers (protobuf) для сериализации.
| Критерий | REST/GraphQL | gRPC |
|---|---|---|
| Публичное API | ✅ | ⚠️ Требует grpc-web |
| Внутренние микросервисы | ⚠️ JSON медленнее | ✅ Бинарный протокол, HTTP/2 |
| Стриминг | ⚠️ SSE, WebSockets | ✅ Встроенная поддержка |
| Кодогенерация | ⚠️ OpenAPI генераторы | ✅ Официальные генераторы |
| Браузерный клиент | ✅ | ⚠️ grpc-web прокси |
| Интерактивная отладка | ✅ curl, Postman | ⚠️ BloomRPC, grpcurl |
// user.proto
syntax = "proto3";
package user;
// Сервис определяет методы
service UserService {
// Unary RPC — один запрос, один ответ
rpc GetUser(GetUserRequest) returns (User);
// Server streaming — один запрос, поток ответов
rpc ListUsers(ListUsersRequest) returns (stream User);
// Client streaming — поток запросов, один ответ
rpc CreateUserStream(stream User) returns (User);
// Bidirectional streaming — поток в обе стороны
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}
// Сообщения
message User {
int32 id = 1;
string name = 2;
string email = 3;
repeated string roles = 4; // Список строк
}
message GetUserRequest {
int32 id = 1;
}
message ListUsersRequest {
int32 limit = 1;
int32 offset = 2;
}
message ChatMessage {
string user_id = 1;
string message = 2;
int64 timestamp = 3;
}# Установка
pip install grpcio grpcio-tools
# Генерация
python -m grpc_tools.protoc \
-I. \
--python_out=. \
--grpc_python_out=. \
user.protoimport grpc
from concurrent import futures
import user_pb2
import user_pb2_grpc
class UserServiceServicer(user_pb2_grpc.UserServiceServicer):
"""Реализация сервиса"""
def GetUser(self, request, context):
user = users_db.get(request.id)
if not user:
context.set_code(grpc.StatusCode.NOT_FOUND)
context.set_details("User not found")
return user_pb2.User()
return user_pb2.User(id=user.id, name=user.name, email=user.email)
def ListUsers(self, request, context):
all_users = list(users_db.values())
for user in all_users[request.offset:request.offset + request.limit]:
yield user_pb2.User(id=user.id, name=user.name, email=user.email)
def CreateUserStream(self, request_iterator, context):
# Клиент отправляет поток пользователей, сервер возвращает последнего
last_user = None
for user_request in request_iterator:
last_user = user_request
return last_user
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
user_pb2_grpc.add_UserServiceServicer_to_server(
UserServiceServicer(), server
)
server.add_insecure_port("[::]:50051")
server.start()
server.wait_for_termination()
if __name__ == "__main__":
serve()import grpc
import user_pb2
import user_pb2_grpc
def get_user():
channel = grpc.insecure_channel("localhost:50051")
stub = user_pb2_grpc.UserServiceStub(channel)
response = stub.GetUser(user_pb2.GetUserRequest(id=1))
print(f"User: {response.name} ({response.email})")
def list_users():
channel = grpc.insecure_channel("localhost:50051")
stub = user_pb2_grpc.UserServiceStub(channel)
for user in stub.ListUsers(user_pb2.ListUsersRequest(limit=10, offset=0)):
print(f"User: {user.name}")
def create_user_stream():
channel = grpc.insecure_channel("localhost:50051")
stub = user_pb2_grpc.UserServiceStub(channel)
def request_generator():
yield user_pb2.User(id=1, name="Alice", email="alice@example.com")
yield user_pb2.User(id=2, name="Bob", email="bob@example.com")
response = stub.CreateUserStream(request_generator())
print(f"Last user: {response.name}")from fastapi import FastAPI
import grpc
import user_pb2
import user_pb2_grpc
app = FastAPI()
@app.get("/api/users/{user_id}")
def get_user_via_grpc(user_id: int):
"""FastAPI endpoint, который вызывает gRPC сервис"""
channel = grpc.insecure_channel("user-service:50051")
stub = user_pb2_grpc.UserServiceStub(channel)
try:
response = stub.GetUser(user_pb2.GetUserRequest(id=user_id))
return {"id": response.id, "name": response.name, "email": response.email}
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.NOT_FOUND:
raise HTTPException(status_code=404, detail="User not found")
raise HTTPException(status_code=500, detail="gRPC error")API будет меняться. Вопрос не в том, изменится ли API, а в том, как вы обработаете изменения.
GET /api/v1/users
GET /api/v2/users
Плюсы:
Минусы:
from fastapi import APIRouter
v1_router = APIRouter(prefix="/api/v1")
v2_router = APIRouter(prefix="/api/v2")
@v1_router.get("/users")
def list_users_v1():
# Старая версия: email в верхнем регистре
return [{"id": 1, "name": "Alice", "email": "ALICE@EXAMPLE.COM"}]
@v2_router.get("/users")
def list_users_v2():
# Новая версия: email в нижнем регистре
return [{"id": 1, "name": "Alice", "email": "alice@example.com"}]
app.include_router(v1_router)
app.include_router(v2_router)GET /users
Accept-Version: v2
# или
X-API-Version: 2
Плюсы:
Минусы:
from fastapi import Header
@app.get("/users")
def list_users(version: str = Header(default="v1")):
if version == "v2":
return list_users_v2()
return list_users_v1()GET /users
Accept: application/vnd.myapi.v1+json
# или
Accept: application/vnd.myapi.v2+json
Плюсы:
Минусы:
from fastapi import Request
@app.get("/users")
async def list_users(request: Request):
accept = request.headers.get("Accept", "")
if "v2" in accept:
return list_users_v2()
return list_users_v1()GET /users?version=2
Плюсы:
Минусы:
from fastapi import HTTPException
from datetime import datetime, timedelta
DEPRECATION_SCHEDULE = {
"/api/v1/users": {
"deprecated_since": datetime(2025, 1, 1),
"sunset": datetime(2025, 7, 1),
"replacement": "/api/v2/users",
}
}
@app.get("/api/v1/users")
def list_users_v1():
route = "/api/v1/users"
schedule = DEPRECATION_SCHEDULE.get(route)
if schedule and datetime.now() > schedule["sunset"]:
raise HTTPException(
status_code=410, # Gone
detail=f"This API version is deprecated. Use {schedule['replacement']}"
)
response = list_users_v1_impl()
if schedule:
# Добавить заголовки deprecation
response.headers["Deprecation"] = schedule["deprecated_since"].isoformat()
response.headers["Sunset"] = schedule["sunset"].isoformat()
response.headers["Link"] = f'<{schedule["replacement"]}>; rel="successor-version"'
return responseСначала определяем контракт (спецификацию), потом пишем код.
OpenAPI — это спецификация для описания REST API.
Преимущества:
from fastapi import FastAPI
from pydantic import BaseModel, Field
app = FastAPI(
title="User API",
description="API для управления пользователями",
version="2.0.0",
docs_url="/docs", # Swagger UI
redoc_url="/redoc", # ReDoc
openapi_url="/openapi.json",
)
class UserCreate(BaseModel):
name: str = Field(..., min_length=1, max_length=100, description="Имя пользователя")
email: str = Field(..., pattern=r"^[^@]+@[^@]+\.[^@]+$", description="Email")
class Config:
json_schema_extra = {
"example": {
"name": "Alice",
"email": "alice@example.com"
}
}
class User(UserCreate):
id: int = Field(..., description="Уникальный ID")
class Config:
json_schema_extra = {
"example": {
"id": 1,
"name": "Alice",
"email": "alice@example.com"
}
}
class Error(BaseModel):
detail: str
code: str | None = None
@app.post(
"/users",
response_model=User,
status_code=201,
responses={
400: {"model": Error, "description": "Неверные данные"},
409: {"model": Error, "description": "Пользователь существует"},
},
summary="Создать пользователя",
description="Создаёт нового пользователя с указанными именем и email",
tags=["Users"],
)
def create_user(user: UserCreate):
"""Создание пользователя с валидацией контракта"""
# FastAPI автоматически валидирует запрос по схеме
# ...from pydantic import BaseModel, validator, Field
from typing import Optional
from enum import Enum
class UserRole(str, Enum):
ADMIN = "admin"
USER = "user"
GUEST = "guest"
class UserCreate(BaseModel):
name: str = Field(..., min_length=2, max_length=50)
email: str
role: UserRole = UserRole.USER
age: Optional[int] = Field(None, ge=0, le=150)
@validator("email")
def validate_email(cls, v):
if "@" not in v or "." not in v:
raise ValueError("Invalid email format")
return v.lower()
@validator("name")
def validate_name(cls, v):
if not v.strip():
raise ValueError("Name cannot be empty or whitespace")
return v.strip().title()
# Автоматическая валидация
try:
user = UserCreate(name=" alice ", email="ALICE@EXAMPLE.COM", age=200)
except ValidationError as e:
print(e) # age: ensure this value is less than or equal to 150# Генерация Python клиента из OpenAPI спецификации
pip install openapi-generator
openapi-generator generate \
-i http://localhost:8000/openapi.json \
-g python \
-o ./generated-clientСтандартизированная обработка ошибок упрощает жизнь клиентам.
from pydantic import BaseModel
from typing import Optional, Any
class ErrorDetail(BaseModel):
"""Детали ошибки"""
field: str # Поле, вызвашее ошибку
message: str # Сообщение
code: Optional[str] = None # Код ошибки для программного обработки
class ErrorResponse(BaseModel):
"""Стандартный формат ответа об ошибке"""
error: str # Тип ошибки
message: str # Человекочитаемое сообщение
details: Optional[list[ErrorDetail]] = None # Детали
request_id: Optional[str] = None # ID запроса для трейсинга
timestamp: str # Время ошибки
# Пример ответа
{
"error": "validation_error",
"message": "Request validation failed",
"details": [
{"field": "email", "message": "Invalid email format", "code": "INVALID_FORMAT"},
{"field": "age", "message": "Must be at least 18", "code": "MIN_VALUE"}
],
"request_id": "req_123456",
"timestamp": "2025-03-03T10:00:00Z"
}from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from pydantic import ValidationError
import uuid
from datetime import datetime
app = FastAPI()
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
"""Обработка ошибок валидации"""
details = []
for error in exc.errors():
details.append(ErrorDetail(
field=".".join(str(x) for x in error["loc"]),
message=error["msg"],
code=error["type"].upper()
))
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content=ErrorResponse(
error="validation_error",
message="Request validation failed",
details=details,
request_id=str(uuid.uuid4()),
timestamp=datetime.utcnow().isoformat()
).model_dump()
)
@app.exception_handler(ValidationError)
async def pydantic_exception_handler(request: Request, exc: ValidationError):
"""Обработка ошибок Pydantic"""
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content=ErrorResponse(
error="internal_error",
message="Response validation failed",
request_id=str(uuid.uuid4()),
timestamp=datetime.utcnow().isoformat()
).model_dump()
)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
"""Обработка всех остальных ошибок"""
# Логирование ошибки
app.logger.exception(f"Unhandled exception: {exc}")
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content=ErrorResponse(
error="internal_error",
message="An unexpected error occurred",
request_id=str(uuid.uuid4()),
timestamp=datetime.utcnow().isoformat()
).model_dump()
)class APIException(Exception):
"""Базовое исключение для API"""
def __init__(self, status_code: int, error: str, message: str, details: list = None):
self.status_code = status_code
self.error = error
self.message = message
self.details = details
class NotFoundException(APIException):
def __init__(self, resource: str, id: Any):
super().__init__(
status_code=404,
error="not_found",
message=f"{resource} with id {id} not found"
)
class ConflictException(APIException):
def __init__(self, message: str, field: str = None):
details = [ErrorDetail(field=field, message=message)] if field else None
super().__init__(
status_code=409,
error="conflict",
message=message,
details=details
)
@app.exception_handler(APIException)
async def api_exception_handler(request: Request, exc: APIException):
return JSONResponse(
status_code=exc.status_code,
content=ErrorResponse(
error=exc.error,
message=exc.message,
details=exc.details,
request_id=str(uuid.uuid4()),
timestamp=datetime.utcnow().isoformat()
).model_dump()
)
# Использование
@app.get("/users/{user_id}")
def get_user(user_id: int):
if user_id not in users_db:
raise NotFoundException("User", user_id)
return users_db[user_id]Ограничение количества запросов для защиты от злоупотреблений.
| Алгоритм | Описание | Плюсы | Минусы |
|---|---|---|---|
| Fixed Window | Счётчик сбрасывается каждые N секунд | Просто реализовать | Spike на границе окна |
| Sliding Window | Скользящее окно | Равномерное распределение | Сложнее |
| Token Bucket | Токены добавляются с фиксированной скоростью | Burst allowed | Нужно хранить состояние |
| Leaky Bucket | Запросы обрабатываются с фиксированной скоростью | Равномерный выход | Нет burst |
import redis
from fastapi import Request, HTTPException, status
from functools import wraps
import time
class RateLimiter:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
def is_allowed(self, key: str, limit: int, window: int) -> bool:
"""
Проверка, разрешён ли запрос.
Args:
key: Уникальный ключ (например, user_id или IP)
limit: Максимальное количество запросов
window: Окно в секундах
Returns:
True если запрос разрешён
"""
current_time = int(time.time())
window_key = f"ratelimit:{key}:{current_time // window}"
# Атомарное увеличение счётчика
current = self.redis.incr(window_key)
if current == 1:
# Первый запрос в окне — устанавливаем TTL
self.redis.expire(window_key, window)
return current <= limit
def get_remaining(self, key: str, limit: int, window: int) -> int:
"""Получить оставшееся количество запросов"""
current_time = int(time.time())
window_key = f"ratelimit:{key}:{current_time // window}"
current = self.redis.get(window_key)
return max(0, limit - (int(current) if current else 0))
# Middleware для rate limiting
from fastapi import Response
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
client_ip = request.client.host
limiter = RateLimiter(redis_client)
# Глобальный лимит: 100 запросов в минуту
if not limiter.is_allowed(f"global:{client_ip}", limit=100, window=60):
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Rate limit exceeded"
)
response = await call_next(request)
# Добавить заголовки с информацией о лимитах
remaining = limiter.get_remaining(f"global:{client_ip}", limit=100, window=60)
response.headers["X-RateLimit-Limit"] = "100"
response.headers["X-RateLimit-Remaining"] = str(remaining)
response.headers["X-RateLimit-Reset"] = str(int(time.time()) + 60)
return responsefrom slowapi import SlowRateLimiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
# Установка: pip install slowapi
slowapi_limiter = SlowRateLimiter(time_period=60, max_requests=10)
app = FastAPI()
app.state.limiter = slowapi_limiter
app.add_middleware(SlowAPIMiddleware)
@app.get("/users")
@slowapi_limiter.limit("5/minute") # 5 запросов в минуту для этого endpoint
def list_users():
return list(users_db.values())
@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
return JSONResponse(
status_code=429,
content={"error": "rate_limit_exceeded", "message": str(exc)}
)Постраничная выдача больших наборов данных.
GET /users?limit=20&offset=40
Плюсы:
Минусы:
@app.get("/users")
def list_users(limit: int = 20, offset: int = 0):
# Ограничиваем максимальный лимит
limit = min(limit, 100)
all_users = list(users_db.values())
return all_users[offset:offset + limit]GET /users?limit=20&cursor=eyJpZCI6MTAwfQ==
Плюсы:
Минусы:
import base64
import json
def encode_cursor(user_id: int) -> str:
"""Кодировать cursor в base64"""
return base64.b64encode(json.dumps({"id": user_id}).encode()).decode()
def decode_cursor(cursor: str) -> dict:
"""Декодировать cursor"""
return json.loads(base64.b64decode(cursor).decode())
@app.get("/users")
def list_users_cursor(limit: int = 20, cursor: str = None):
all_users = sorted(users_db.values(), key=lambda u: u.id)
limit = min(limit, 100)
if cursor:
cursor_data = decode_cursor(cursor)
# Найти позицию после курсора
start_idx = next(
(i for i, u in enumerate(all_users) if u.id > cursor_data["id"]),
0
)
else:
start_idx = 0
users = all_users[start_idx:start_idx + limit]
# Следующий курсор
next_cursor = None
if users and len(users) == limit:
next_cursor = encode_cursor(users[-1].id)
return {
"data": users,
"pagination": {
"limit": limit,
"next_cursor": next_cursor,
"has_more": next_cursor is not None
}
}from pydantic import BaseModel
from typing import Generic, TypeVar
T = TypeVar("T")
class PaginatedResponse(BaseModel, Generic[T]):
"""Стандартный формат пагинированного ответа"""
data: list[T]
pagination: dict
class Config:
json_schema_extra = {
"example": {
"data": [{"id": 1, "name": "Alice"}],
"pagination": {
"total": 100,
"page": 1,
"per_page": 20,
"total_pages": 5,
"has_next": True,
"has_prev": False
}
}
}
@app.get("/users", response_model=PaginatedResponse[User])
def list_users_paginated(page: int = 1, per_page: int = 20):
all_users = list(users_db.values())
total = len(all_users)
per_page = min(per_page, 100)
total_pages = (total + per_page - 1) // per_page
offset = (page - 1) * per_page
users = all_users[offset:offset + per_page]
return PaginatedResponse(
data=users,
pagination={
"total": total,
"page": page,
"per_page": per_page,
"total_pages": total_pages,
"has_next": page < total_pages,
"has_prev": page > 1
}
)| Критерий | REST | GraphQL | gRPC |
|---|---|---|---|
| Простота | ✅ Простой, понятный | ⚠️ Нужно изучать язык запросов | ⚠️ Требует protobuf |
| Гибкость запросов | ⚠️ Фиксированные ответы | ✅ Клиент выбирает поля | ❌ Фиксированные сообщения |
| Производительность | ⚠️ JSON, HTTP/1.1 | ⚠️ JSON, HTTP/1.1 | ✅ Бинарный, HTTP/2 |
| Кэширование | ✅ HTTP-кэш | ⚠️ Нужен слой кэширования | ❌ Сложно кэшировать |
| Стриминг | ⚠️ SSE, WebSockets | ⚠️ Подписки | ✅ Встроенный |
| Браузер | ✅ Нативная поддержка | ✅ Нативная поддержка | ⚠️ Требуется grpc-web |
| Codegen | ⚠️ OpenAPI генераторы | ⚠️ Есть генераторы | ✅ Официальные генераторы |
| Публичное API | ✅ Отлично | ✅ Хорошо | ❌ Не рекомендуется |
| Микросервисы | ⚠️ Приемлемо | ⚠️ Приемлемо | ✅ Отлично |
Задача: Спроектировать API для системы управления задачами (Task Manager)
Требования:
Дополнительно:
| Аспект | Рекомендация |
|---|---|
| Публичное API | REST с OpenAPI |
| Мобильные клиенты | GraphQL для гибкости |
| Внутренние сервисы | gRPC для производительности |
| Версионирование | URL versioning для простоты |
| Ошибки | Стандартизированный формат с request_id |
| Rate limiting | Token bucket с Redis |
| Пагинация | Cursor-based для больших данных |
Выбор инструмента зависит от контекста. REST — безопасный выбор по умолчанию. GraphQL — когда нужна гибкость. gRPC — когда важна производительность.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.