Библиотека redis-py, синхронные и асинхронные клиенты, пулы соединений
redis-py— официальная библиотека для работы с Redis. В версии 5.x она предоставляет как синхронный, так и полностью асинхронный интерфейс, что критически важно для FastAPI.
pip install redis
# или
poetry add redisДля работы с URL-подключениями и кластером:
pip install redis[hiredis]
# hiredis — C-библиотека для ускорения парсинга протокола Redis (до 10x быстрее)import redis
# Простое подключение
client = redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True, # Возвращать str вместо bytes
)
# Проверка
client.ping() # Trueimport redis
client = redis.Redis(
host='redis.internal',
port=6379,
db=0,
password='SuperSecretPassword123!', # Никогда не хардкодить! Использовать env
ssl=True, # TLS для production
ssl_cert_reqs=None, # Для самоподписанных сертификатов
# Таймауты
socket_timeout=5.0, # Таймаут операций
socket_connect_timeout=5.0, # Таймаут подключения
socket_keepalive=True, # TCP keepalive
# Пул соединений
max_connections=50, # Максимум соединений в пуле
# Декодирование
decode_responses=True,
encoding='utf-8',
# Retry logic
retry_on_timeout=True,
health_check_interval=30, # Проверка соединения каждые 30 сек
)import redis
from redis.exceptions import (
RedisError,
ConnectionError,
TimeoutError,
ResponseError,
BusyLoadingError,
)
def safe_redis_operation():
try:
client.set('key', 'value')
except ConnectionError as e:
# Потеряно соединение с Redis
print(f"Connection error: {e}")
raise
except TimeoutError as e:
# Таймаут операции
print(f"Timeout: {e}")
raise
except ResponseError as e:
# Ошибка на стороне Redis (например, неверная команда)
print(f"Response error: {e}")
raise
except BusyLoadingError as e:
# Redis загружает данные после рестарта
print(f"Redis is loading: {e}")
raise
except RedisError as e:
# Базовый класс для всех ошибок Redis
print(f"Redis error: {e}")
raiseimport redis.asyncio as redis
# Создание клиента
redis_client = redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True,
socket_timeout=5.0,
socket_connect_timeout=5.0,
)
# Использование в async контексте
async def example():
await redis_client.set('user:1001:name', 'Alice')
name = await redis_client.get('user:1001:name')
print(name) # 'Alice'
# Закрытие соединения
await redis_client.close()import redis.asyncio as redis
async def with_redis():
async with redis.Redis(
host='localhost',
port=6379,
decode_responses=True,
) as client:
await client.set('key', 'value')
value = await client.get('key')
print(value)
# Соединение автоматически закрываетсяСоздание нового TCP-соединения — дорогая операция. Пул соединений переиспользует существующие подключения, что значительно ускоряет работу.
import redis
# redis-py автоматически создаёт пул при первом подключении
# Но можно настроить явно:
pool = redis.ConnectionPool(
host='localhost',
port=6379,
db=0,
max_connections=50, # Максимум соединений в пуле
decode_responses=True,
)
# Несколько клиентов могут использовать один пул
client1 = redis.Redis(connection_pool=pool)
client2 = redis.Redis(connection_pool=pool)import redis.asyncio as redis
pool = redis.asyncio.ConnectionPool(
host='localhost',
port=6379,
db=0,
max_connections=50,
decode_responses=True,
)
async_client = redis.Redis(connection_pool=pool)Pipeline позволяет отправить несколько команд за один сетевой вызов, что значительно уменьшает latency.
import redis
client = redis.Redis(decode_responses=True)
# Без pipeline: 3 сетевых вызова
client.set('user:1:name', 'Alice')
client.set('user:1:email', 'alice@example.com')
client.set('user:1:age', '25')
# С pipeline: 1 сетевой вызов
pipe = client.pipeline()
pipe.set('user:1:name', 'Alice')
pipe.set('user:1:email', 'alice@example.com')
pipe.set('user:1:age', '25')
results = pipe.execute() # [True, True, True]# MULTI/EXEC обеспечивают атомарность
pipe = client.pipeline(transaction=True)
pipe.set('counter', 0)
pipe.incr('counter')
pipe.incr('counter')
results = pipe.execute() # [True, 1, 2] — атомарноimport redis.asyncio as redis
async def batch_update():
async with redis.Redis(decode_responses=True) as client:
async with client.pipeline(transaction=True) as pipe:
await pipe.set('user:1:name', 'Alice')
await pipe.set('user:1:email', 'alice@example.com')
await pipe.incrby('user:1:login_count', 1)
results = await pipe.execute()
return resultsimport redis
client = redis.Redis(decode_responses=True)
# Публикация сообщения
client.publish('notifications', 'New order #123!')
client.publish('chat:room:1', 'Hello from Alice!')import redis
client = redis.Redis(decode_responses=True)
pubsub = client.pubsub()
# Подписка на каналы
pubsub.subscribe('notifications')
pubsub.subscribe('chat:room:1')
# Подписка по паттерну
pubsub.psubscribe('chat:*') # Все каналы, начинающиеся с 'chat:'
# Получение сообщений
for message in pubsub.listen():
if message['type'] == 'message':
print(f"Channel: {message['channel']}, Data: {message['data']}")import redis
class MessageHandler:
def __init__(self, redis_client):
self.client = redis_client
self.pubsub = self.client.pubsub()
def subscribe(self, *channels):
self.pubsub.subscribe(*channels)
def handle_message(self, message):
if message['type'] == 'message':
print(f"Received: {message['data']}")
def listen(self):
for message in self.pubsub.listen():
if message['type'] == 'message':
self.handle_message(message)
elif message['type'] == 'subscribe':
print(f"Subscribed to {message['channel']}")import redis.asyncio as redis
async def async_subscriber():
client = redis.Redis(decode_responses=True)
pubsub = client.pubsub()
await pubsub.subscribe('notifications')
async for message in pubsub.listen():
if message['type'] == 'message':
print(f"Received: {message['data']}")
await client.close()Lua скрипты выполняются атомарно на стороне Redis.
import redis
client = redis.Redis(decode_responses=True)
# Lua скрипт для атомарного получения и удаления
lua_script = """
local value = redis.call('GET', KEYS[1])
if value then
redis.call('DEL', KEYS[1])
end
return value
"""
# Регистрация скрипта
get_and_delete = client.register_script(lua_script)
# Использование
result = get_and_delete(keys=['temp:key'])import redis.asyncio as redis
async def async_lua():
client = redis.Redis(decode_responses=True)
lua_script = """
local current = redis.call('GET', KEYS[1])
if current then
return tonumber(current) + ARGV[1]
end
return ARGV[1]
"""
increment = client.register_script(lua_script)
result = await increment(keys=['counter'], args=[5])
print(result)
await client.close()import redis
# Формат: redis://[:password]@host:port/db
client = redis.from_url(
'redis://localhost:6379/0',
decode_responses=True,
)
# С паролем
client = redis.from_url(
'redis://:mypassword@localhost:6379/0',
decode_responses=True,
)
# TLS
client = redis.from_url(
'rediss://:mypassword@redis.internal:6379/0',
decode_responses=True,
)
# Async
import redis.asyncio as redis
client = redis.from_url(
'redis://localhost:6379/0',
decode_responses=True,
)import os
import redis
REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
client = redis.from_url(REDIS_URL, decode_responses=True)import redis
import pickle
client = redis.Redis(decode_responses=True)
# Сериализация
data = {'name': 'Alice', 'age': 25}
client.set('user:1', pickle.dumps(data))
# Десериализация
data = pickle.loads(client.get('user:1'))import redis
import json
client = redis.Redis(decode_responses=True)
# Сериализация
data = {'name': 'Alice', 'age': 25}
client.set('user:1', json.dumps(data))
# Десериализация
data = json.loads(client.get('user:1'))import redis
import json
from datetime import datetime
class DateTimeEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime):
return obj.isoformat()
return super().default(obj)
class JSONEncoder:
def encode(self, value):
return json.dumps(value, cls=DateTimeEncoder)
def decode(self, value):
if value is None:
return None
return json.loads(value)
client = redis.Redis(
host='localhost',
port=6379,
encoding='utf-8',
)
encoder = JSONEncoder()
# Использование
data = {'created_at': datetime.utcnow()}
client.set('event:1', encoder.encode(data))
data = encoder.decode(client.get('event:1'))# ✅ Хорошо: переиспользование пула
pool = redis.ConnectionPool(
host='redis.internal',
port=6379,
max_connections=50,
decode_responses=True,
)
def get_redis():
return redis.Redis(connection_pool=pool)
# ❌ Плохо: создание нового подключения каждый раз
def get_redis_bad():
return redis.Redis(host='localhost', port=6379)# ✅ Хорошо: таймауты защищены от зависаний
client = redis.Redis(
socket_timeout=5.0,
socket_connect_timeout=5.0,
socket_keepalive=True,
)
# ❌ Плохо: нет таймаутов, может зависнуть навсегда
client = redis.Redis()from redis.retry import Retry
from redis.backoff import ExponentialBackoff
from redis.exceptions import ConnectionError, TimeoutError
retry = Retry(
ExponentialBackoff(cap=10, base=1), # Экспоненциальная задержка
3 # 3 попытки
)
client = redis.Redis(
host='redis.internal',
retry=retry,
retry_on_error=[ConnectionError, TimeoutError],
)# Регулярная проверка здоровья соединений
client = redis.Redis(
health_check_interval=30, # Проверка каждые 30 секунд
)# ✅ Для FastAPI всегда используем async клиент
import redis.asyncio as redis
redis_client = redis.Redis(
host='localhost',
port=6379,
decode_responses=True,
socket_timeout=5.0,
)
# В dependency FastAPI
async def get_redis():
try:
yield redis_client
finally:
pass # Не закрываем, пул переиспользуетсяimport asyncio
import redis.asyncio as redis
class RedisManager:
def __init__(self, url: str):
self.url = url
self.client = None
async def connect(self):
self.client = redis.from_url(
self.url,
decode_responses=True,
socket_timeout=5.0,
)
await self.client.ping()
async def disconnect(self):
if self.client:
await self.client.close()
async def __aenter__(self):
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.disconnect()
# Использование
async def main():
async with RedisManager('redis://localhost:6379/0') as redis_mgr:
await redis_mgr.client.set('key', 'value')| Аспект | Синхронный | Асинхронный |
|---|---|---|
| Модуль | redis | redis.asyncio |
| Блокировка | Блокирует event loop | Не блокирует |
| Использование | Синхронный код, воркеры | FastAPI, async приложения |
| Pipeline | client.pipeline() | async with client.pipeline() |
| Pub/Sub | pubsub.listen() | async for message in pubsub.listen() |
| Методы | client.get() | await client.get() |
Проверьте понимание → ответьте на вопросы ниже
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.