Токены доступа, refresh, blacklist, хранение паролей
JWT (JSON Web Tokens) — стандарт для безопасной аутентификации в API. В этой теме вы научитесь реализовывать вход по токену, refresh-токены, blacklist и защиту эндпоинтов.
JWT (JSON Web Token) — компактный способ безопасной передачи информации между сторонами в виде JSON-объекта.
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.
eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiZXhwIjoxNTE2MjM5MDIyfQ.
SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
Токен состоит из трёх частей:
pip install python-jose[cryptography] passlib[bcrypt]python-jose — работа с JWTpasslib[bcrypt] — хеширование паролейfrom datetime import datetime, timedelta
from typing import Optional
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
app = FastAPI()
# === Конфигурация ===
SECRET_KEY = "your-secret-key-change-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# === Хеш паролей ===
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
# === Модели ===
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None = None
class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None
hashed_password: str
# === Имитация БД ===
fake_users_db = {
"john": {
"username": "john",
"full_name": "John Doe",
"email": "john@example.com",
"hashed_password": get_password_hash("secret"),
"disabled": False,
}
}
def get_user(db, username: str):
if username in db:
user_data = db[username]
return User(**user_data)
def authenticate_user(db, username: str, password: str):
user = get_user(db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
# === JWT функции ===
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# === OAuth2 схема ===
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: User = Depends(get_current_user)
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
# === Endpoints ===
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username},
expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
@app.get("/users/me/items")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
return [{"item_id": 1, "owner": current_user.username}]/docs# Получить токен
curl -X POST 'http://localhost:8000/token' \
-H 'Content-Type: application/x-www-form-urlencoded' \
-d 'username=john&password=secret'
# Использовать токен
curl 'http://localhost:8000/users/me' \
-H 'Authorization: Bearer <token>'import requests
# Получить токен
response = requests.post(
'http://localhost:8000/token',
data={'username': 'john', 'password': 'secret'}
)
token = response.json()['access_token']
# Использовать токен
response = requests.get(
'http://localhost:8000/users/me',
headers={'Authorization': f'Bearer {token}'}
)
print(response.json())Access токены живут недолго (15-30 минут). Refresh токены живут дольше (дни/недели) и используются для получения новых access токенов.
from datetime import datetime, timedelta
from jose import jwt, JWTError
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
from typing import Optional
import uuid
app = FastAPI()
SECRET_KEY = "secret"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7
# Хранилище refresh токенов (в реальности — Redis/БД)
refresh_tokens_db = {}
class TokenPair(BaseModel):
access_token: str
refresh_token: str
token_type: str
def create_access_token(username: str):
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
return jwt.encode(
{"sub": username, "exp": expire, "type": "access"},
SECRET_KEY,
algorithm=ALGORITHM
)
def create_refresh_token(username: str):
expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
token = str(uuid.uuid4())
# Сохраняем токен в БД
refresh_tokens_db[token] = {
"username": username,
"exp": expire
}
return token
def verify_refresh_token(token: str) -> Optional[str]:
if token not in refresh_tokens_db:
return None
token_data = refresh_tokens_db[token]
if datetime.utcnow() > token_data["exp"]:
del refresh_tokens_db[token]
return None
return token_data["username"]
def revoke_refresh_token(token: str):
if token in refresh_tokens_db:
del refresh_tokens_db[token]
@app.post("/auth/login", response_model=TokenPair)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=401,
detail="Incorrect username or password"
)
access_token = create_access_token(user.username)
refresh_token = create_refresh_token(user.username)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer"
}
@app.post("/auth/refresh")
async def refresh_token(refresh_token: str):
username = verify_refresh_token(refresh_token)
if not username:
raise HTTPException(
status_code=401,
detail="Invalid or expired refresh token"
)
# Аннулируем старый refresh токен (rotation)
revoke_refresh_token(refresh_token)
# Создаём новую пару
access_token = create_access_token(username)
new_refresh_token = create_refresh_token(username)
return {
"access_token": access_token,
"refresh_token": new_refresh_token,
"token_type": "bearer"
}
@app.post("/auth/logout")
async def logout(refresh_token: str):
revoke_refresh_token(refresh_token)
return {"message": "Logged out"}При каждом использовании refresh токена:
Если украденный токен используется злоумышленником, легитимный пользователь не сможет получить новый токен — это сигнал о компрометации.
Для принудительного отзыва access токенов используйте blacklist:
from datetime import datetime
from typing import Set
# Хранилище заблокированных токенов (в реальности — Redis с TTL)
token_blacklist: Set[str] = set()
def is_token_blacklisted(token: str) -> bool:
return token in token_blacklist
def blacklist_token(token: str):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
exp = payload.get("exp")
# В реальности: Redis SETEX token exp ""
token_blacklist.add(token)
except JWTError:
pass
async def get_current_user(token: str = Depends(oauth2_scheme)):
if is_token_blacklisted(token):
raise HTTPException(
status_code=401,
detail="Token has been revoked"
)
# Остальная логика проверки...| Способ | Безопасность | Когда использовать |
|---|---|---|
| httpOnly cookie | Высокая | Веб-приложения (защита от XSS) |
| Secure storage | Средняя | Mobile apps (Keychain/Keystore) |
| Memory | Средняя | SPA (токен теряется при обновлении) |
| localStorage | Низкая | Не рекомендуется (уязвимо для XSS) |
from fastapi.responses import JSONResponse
@app.post("/auth/login")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=401, detail="Invalid credentials")
access_token = create_access_token(user.username)
refresh_token = create_refresh_token(user.username)
response = JSONResponse(content={
"access_token": access_token,
"token_type": "bearer"
})
# Refresh токен в httpOnly cookie
response.set_cookie(
key="refresh_token",
value=refresh_token,
httponly=True, # Не доступно через JavaScript
secure=True, # Только HTTPS
samesite="lax",
max_age=REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60
)
return responsefrom datetime import datetime, timedelta
from fastapi import FastAPI, Depends, HTTPException, status, Request
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, EmailStr
from sqlalchemy.orm import Session
from typing import Optional
import uuid
from database import get_db
from models import User as DBUser
# === Конфигурация ===
SECRET_KEY = "change-this-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7
# === Модели ===
class UserCreate(BaseModel):
username: str
email: EmailStr
password: str
class UserResponse(BaseModel):
id: int
username: str
email: str
class Config:
from_attributes = True
class Token(BaseModel):
access_token: str
refresh_token: str
token_type: str
class TokenRefresh(BaseModel):
refresh_token: str
# === JWT функции ===
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(username: str, expires_delta: timedelta):
expire = datetime.utcnow() + expires_delta
return jwt.encode(
{"sub": username, "exp": expire, "type": "access"},
SECRET_KEY,
algorithm=ALGORITHM
)
def create_refresh_token(username: str, db: Session):
expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
token = str(uuid.uuid4())
# Сохраняем в БД
db_token = DBRefreshToken(
token=token,
username=username,
expires_at=expire
)
db.add(db_token)
db.commit()
return token
# === Зависимости ===
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db)
):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
token_type: str = payload.get("type")
if username is None or token_type != "access":
raise credentials_exception
except JWTError:
raise credentials_exception
user = db.query(DBUser).filter(DBUser.username == username).first()
if user is None:
raise credentials_exception
return user
# === Endpoints ===
app = FastAPI()
@app.post("/auth/register", response_model=UserResponse, status_code=201)
def register(user: UserCreate, db: Session = Depends(get_db)):
# Проверка существующего пользователя
existing = db.query(DBUser).filter(
(DBUser.username == user.username) |
(DBUser.email == user.email)
).first()
if existing:
raise HTTPException(
status_code=409,
detail="Username or email already registered"
)
# Создание пользователя
hashed_password = pwd_context.hash(user.password)
db_user = DBUser(
username=user.username,
email=user.email,
hashed_password=hashed_password
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@app.post("/auth/login", response_model=Token)
def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
user = db.query(DBUser).filter(DBUser.username == form_data.username).first()
if not user or not pwd_context.verify(form_data.password, user.hashed_password):
raise HTTPException(
status_code=401,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = create_access_token(
user.username,
timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
refresh_token = create_refresh_token(user.username, db)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer"
}
@app.post("/auth/refresh", response_model=Token)
def refresh(token_data: TokenRefresh, db: Session = Depends(get_db)):
# Проверка refresh токена
db_token = db.query(DBRefreshToken).filter(
DBRefreshToken.token == token_data.refresh_token
).first()
if not db_token or db_token.expires_at < datetime.utcnow():
raise HTTPException(
status_code=401,
detail="Invalid or expired refresh token"
)
# Rotation: удаляем старый, создаём новый
db.delete(db_token)
access_token = create_access_token(
db_token.username,
timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
new_refresh_token = create_refresh_token(db_token.username, db)
return {
"access_token": access_token,
"refresh_token": new_refresh_token,
"token_type": "bearer"
}
@app.post("/auth/logout")
def logout(
request: Request,
current_user: DBUser = Depends(get_current_user),
db: Session = Depends(get_db)
):
# Удаляем все refresh токены пользователя
db.query(DBRefreshToken).filter(
DBRefreshToken.username == current_user.username
).delete()
db.commit()
return {"message": "Logged out successfully"}
@app.get("/auth/me", response_model=UserResponse)
def get_me(current_user: DBUser = Depends(get_current_user)):
return current_userSECRET_KEY = "secret" # Плохо!Решение: Используйте криптографически стойкий ключ:
import secrets
SECRET_KEY = secrets.token_urlsafe(32)ACCESS_TOKEN_EXPIRE_MINUTES = 525600 # 1 год - плохо!Решение: Access токены должны жить 15-30 минут. Для долгой сессии используйте refresh токены.
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# Забыли проверить exp!Решение: jwt.decode() автоматически проверяет exp. Если токен истёк, выбросит ExpiredSignatureError.
jwt.encode(), jwt.decode()В следующей теме вы изучите OAuth2 — интеграцию с провайдерами (Google, GitHub, Facebook).
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.