Strings, Hashes, Lists, Sets, Sorted Sets, Bitmaps, HyperLogLog, Streams, Geo
Redis — это не просто key-value хранилище. Это 11+ структур данных, каждая из которых решает конкретные задачи эффективнее, чем универсальные решения.
String — это не только текст. В Redis String может содержать:
import redis
client = redis.Redis(decode_responses=True)
# Установка и получение
client.set('user:1001:name', 'Alice')
name = client.get('user:1001:name') # 'Alice'
# Установка с TTL (атомарно)
client.setex('session:abc123', 3600, '{"user_id": 1001}')
# Получение TTL
ttl = client.ttl('session:abc123') # остаток секунд
# Атомарный инкремент
client.set('counter:visits', 0)
client.incr('counter:visits') # 1
client.incrby('counter:visits', 5) # 6
client.decr('counter:visits') # 5class MetricsCollector:
def __init__(self, redis_client):
self.redis = redis_client
def record_page_view(self, page_id: int):
"""Атомарно увеличиваем счётчик просмотров"""
self.redis.incr(f'metrics:page_views:{page_id}')
def record_unique_visitor(self, page_id: int, user_id: int):
"""Считаем уникальных посетителей через Bitmap"""
self.redis.setbit(f'metrics:unique:{page_id}', user_id, 1)
def get_page_views(self, page_id: int) -> int:
return int(self.redis.get(f'metrics:page_views:{page_id}') or 0)
def get_unique_visitors(self, page_id: int) -> int:
return self.redis.bitcount(f'metrics:unique:{page_id}')Hash — это словарь полей и значений. Идеален для хранения объектов.
| Критерий | Hash | JSON в String |
|---|---|---|
| Доступ к полю | HGET user:1 name — без загрузки всего объекта | Нужно получить весь JSON, распарсить |
| Изменение поля | HSET user:1 age 26 — атомарно | Read-modify-write с race condition |
| Инкремент поля | HINCRBY user:1 logins 1 | Ручная реализация с блокировками |
| Память | Эффективнее для мелких объектов | Накладные расходы на JSON |
# Создание/обновление полей
client.hset('user:1001', mapping={
'name': 'Alice',
'age': '25',
'email': 'alice@example.com',
'login_count': '0'
})
# Получение одного поля
name = client.hget('user:1001', 'name') # 'Alice'
# Получение всех полей (dict)
user = client.hgetall('user:1001')
# {'name': 'Alice', 'age': '25', 'email': 'alice@example.com', ...}
# Получение нескольких полей
name, email = client.hmget('user:1001', 'name', 'email')
# Инкремент поля
client.hincrby('user:1001', 'login_count', 1) # +1
# Удаление поля
client.hdel('user:1001', 'age')
# Проверка существования поля
exists = client.hexists('user:1001', 'name') # Truefrom typing import Optional, Dict, Any
import json
class UserProfile:
def __init__(self, redis_client):
self.redis = redis_client
self.key_prefix = 'user:profile:'
def create(self, user_id: int, data: Dict[str, Any]):
"""Создание профиля"""
key = f'{self.key_prefix}{user_id}'
# Hash требует строковых значений
str_data = {k: str(v) for k, v in data.items()}
self.redis.hset(key, mapping=str_data)
return key
def get(self, user_id: int) -> Optional[Dict[str, str]]:
"""Получение профиля"""
key = f'{self.key_prefix}{user_id}'
return self.redis.hgetall(key) or None
def update_field(self, user_id: int, field: str, value: Any):
"""Обновление одного поля (атомарно)"""
key = f'{self.key_prefix}{user_id}'
self.redis.hset(key, field, str(value))
def increment_login_count(self, user_id: int) -> int:
"""Атомарный инкремент счётчика входов"""
key = f'{self.key_prefix}{user_id}'
return self.redis.hincrby(key, 'login_count', 1)
def get_fields(self, user_id: int, *fields: str) -> Dict[str, Optional[str]]:
"""Получение конкретных полей"""
key = f'{self.key_prefix}{user_id}'
values = self.redis.hmget(key, *fields)
return dict(zip(fields, values))
def delete(self, user_id: int):
"""Удаление профиля"""
key = f'{self.key_prefix}{user_id}'
self.redis.delete(key)List — упорядоченный список строк. Реализован как linked list.
# LPUSH — push слева (в начало)
client.lpush('tasks', 'task3', 'task2', 'task1')
# ['task1', 'task2', 'task3']
# RPUSH — push справа (в конец)
client.rpush('queue:emails', 'email1@example.com')
# LPOP — pop слева
task = client.lpop('tasks') # 'task1'
# RPOP — pop справа
email = client.rpop('queue:emails')
# Длина списка
length = client.llen('tasks')
# Диапазон (0-based, -1 = последний)
all_tasks = client.lrange('tasks', 0, -1)
# Обрезать список (оставить элементы с 0 по 99)
client.ltrim('logs:recent', 0, 99)from typing import Optional, List
import json
class TaskQueue:
def __init__(self, redis_client, queue_name: str = 'tasks:default'):
self.redis = redis_client
self.queue_name = queue_name
def enqueue(self, task_data: dict, priority: str = 'normal'):
"""
Добавление задачи в очередь.
priority: 'high' (левый push), 'normal' (правый push)
"""
task_json = json.dumps(task_data)
if priority == 'high':
self.redis.lpush(self.queue_name, task_json)
else:
self.redis.rpush(self.queue_name, task_json)
def dequeue(self, timeout: int = 0) -> Optional[dict]:
"""
Получение задачи из очереди.
timeout=0 — блокировать пока не появится задача.
"""
# BLPOP возвращает (queue_name, value) или None
result = self.redis.blpop(self.queue_name, timeout=timeout)
if result:
_, task_json = result
return json.loads(task_json)
return None
def peek(self, count: int = 10) -> List[dict]:
"""Посмотреть первые N задач без удаления"""
tasks_json = self.redis.lrange(self.queue_name, 0, count - 1)
return [json.loads(t) for t in tasks_json]
def size(self) -> int:
"""Размер очереди"""
return self.redis.llen(self.queue_name)
def clear(self):
"""Очистить очередь"""
self.redis.delete(self.queue_name)
# Использование в FastAPI
class EmailService:
def __init__(self, redis_client):
self.queue = TaskQueue(redis_client, 'queue:emails')
async def send_email(self, to: str, subject: str, body: str):
"""Поставить email в очередь на отправку"""
self.queue.enqueue({
'to': to,
'subject': subject,
'body': body,
'created_at': datetime.utcnow().isoformat()
})# Воркер, обрабатывающий задачи
def worker():
redis_client = redis.Redis()
queue = TaskQueue(redis_client, 'tasks:default')
while True:
task = queue.dequeue(timeout=5) # Ждать 5 секунд
if task:
process_task(task)
else:
# Таймаут — можно сделать что-то ещё
continueSet — неупорядоченная коллекция уникальных элементов.
# Добавление элементов (уникальность гарантирована)
client.sadd('tags:article:123', 'python', 'fastapi', 'redis', 'python')
# Вернёт 3 (добавлено 3 уникальных, 'python' дубликат не добавлен)
# Все элементы
tags = client.smembers('tags:article:123') # {'python', 'fastapi', 'redis'}
# Проверка принадлежности
is_member = client.sismember('tags:article:123', 'python') # True
# Удаление элемента
client.srem('tags:article:123', 'redis')
# Количество элементов
count = client.scard('tags:article:123') # 2
# Случайный элемент
random_tag = client.srandmember('tags:article:123')
# Извлечь случайный элемент (удалить из множества)
random_tag = client.spop('tags:article:123')# Добавим ещё множества
client.sadd('tags:article:456', 'python', 'django', 'postgresql')
# Объединение
all_tags = client.sunion('tags:article:123', 'tags:article:456')
# {'python', 'fastapi', 'redis', 'django', 'postgresql'}
# Пересечение (общие элементы)
common = client.sinter('tags:article:123', 'tags:article:456')
# {'python'}
# Разность (в первом, но не во втором)
diff = client.sdiff('tags:article:123', 'tags:article:456')
# {'fastapi', 'redis'}
# Сохранение результата в новое множество
client.sunionstore('tags:all', 'tags:article:123', 'tags:article:456')class TagSystem:
def __init__(self, redis_client):
self.redis = redis_client
def add_tags(self, entity_type: str, entity_id: int, tags: List[str]):
"""Добавить теги к сущности"""
key = f'tags:{entity_type}:{entity_id}'
if tags:
self.redis.sadd(key, *tags)
def get_tags(self, entity_type: str, entity_id: int) -> set:
"""Получить все теги сущности"""
key = f'tags:{entity_type}:{entity_id}'
return self.redis.smembers(key) or set()
def remove_tag(self, entity_type: str, entity_id: int, tag: str):
"""Удалить тег"""
key = f'tags:{entity_type}:{entity_id}'
self.redis.srem(key, tag)
def find_by_tag(self, entity_type: str, tag: str) -> set:
"""Найти все сущности с данным тегом"""
# Обратный индекс: тег → сущности
key = f'tag_index:{tag}'
return self.redis.smembers(key) or set()
def get_common_tags(self, entity_type: str, *entity_ids: int) -> set:
"""Найти общие теги для нескольких сущностей"""
keys = [f'tags:{entity_type}:{eid}' for eid in entity_ids]
if len(keys) == 1:
return self.redis.smembers(keys[0]) or set()
return self.redis.sinter(*keys)Sorted Set (ZSet) — как Set, но каждый элемент имеет числовой score для сортировки.
# Добавление элементов с score
client.zadd('leaderboard:global', {'player:1': 1500, 'player:2': 2000, 'player:3': 1800})
# Обновление score
client.zincrby('leaderboard:global', 100, 'player:1') # +100 к score
# Ранг элемента (0-based, от меньшего к большему)
rank = client.zrank('leaderboard:global', 'player:1') # 0
# Обратный ранг (от большего к меньшему)
rank = client.zrevrank('leaderboard:global', 'player:2') # 0 (первый)
# Диапазон по рангам (топ-10)
top10 = client.zrevrange('leaderboard:global', 0, 9, withscores=True)
# [('player:2', 2000.0), ('player:3', 1800.0), ...]
# Диапазон по score
high_scorers = client.zrangebyscore('leaderboard:global', 1500, 2000)
# Количество элементов в диапазоне
count = client.zcount('leaderboard:global', 1500, 2000)
# Удаление элемента
client.zrem('leaderboard:global', 'player:1')
# Размер множества
size = client.zcard('leaderboard:global')from typing import List, Tuple
class Leaderboard:
def __init__(self, redis_client, name: str):
self.redis = redis_client
self.name = name
def add_player(self, player_id: str, score: int):
"""Добавить/обновить игрока в лидерборде"""
self.redis.zadd(self.name, {player_id: score})
def add_score(self, player_id: str, increment: int) -> int:
"""Добавить очки к текущему счёту (атомарно)"""
return int(self.redis.zincrby(self.name, increment, player_id))
def get_top(self, n: int = 10) -> List[Tuple[str, int]]:
"""Получить топ-N игроков"""
results = self.redis.zrevrange(self.name, 0, n - 1, withscores=True)
return [(player, int(score)) for player, score in results]
def get_player_rank(self, player_id: str) -> int:
"""Получить ранг игрока (1-based)"""
rank = self.redis.zrevrank(self.name, player_id)
return rank + 1 if rank is not None else None
def get_player_score(self, player_id: str) -> int:
"""Получить счёт игрока"""
score = self.redis.zscore(self.name, player_id)
return int(score) if score is not None else None
def get_players_around(self, player_id: str, count: int = 5) -> List[Tuple[str, int, int]]:
"""
Получить игроков вокруг указанного (для отображения в UI).
Возвращает [(player_id, score, rank), ...]
"""
rank = self.redis.zrevrank(self.name, player_id)
if rank is None:
return []
start = max(0, rank - count // 2)
end = start + count - 1
results = self.redis.zrevrange(self.name, start, end, withscores=True)
return [(p, int(s), start + i + 1) for i, (p, s) in enumerate(results)]
def remove_player(self, player_id: str):
"""Удалить игрока из лидерборда"""
self.redis.zrem(self.name, player_id)import time
from typing import Optional
class PriorityQueue:
def __init__(self, redis_client, queue_name: str):
self.redis = redis_client
self.queue_name = queue_name
def enqueue(self, task_id: str, priority: int, task_data: dict):
"""
Добавить задачу с приоритетом.
priority: чем меньше число, тем выше приоритет.
"""
task_json = json.dumps(task_data)
self.redis.zadd(self.queue_name, {f'{task_id}:{task_json}': priority})
def dequeue(self) -> Optional[dict]:
"""
Получить задачу с наивысшим приоритетом (наименьший score).
Атомарно: получаем и удаляем первый элемент.
"""
# ZPOPMIN возвращает [(member, score)] или пустой список
result = self.redis.zpopmin(self.queue_name, count=1)
if result:
member, _ = result[0]
# Извлекаем JSON из member (формат: "task_id:{json}")
task_json = member.split(':', 1)[1]
return json.loads(task_json)
return None
def enqueue_delayed(self, task_id: str, delay_seconds: int, task_data: dict):
"""
Добавить задачу с задержкой.
Score = timestamp выполнения.
"""
execute_at = int(time.time()) + delay_seconds
task_json = json.dumps(task_data)
self.redis.zadd(f'{self.queue_name}:delayed', {f'{task_id}:{task_json}': execute_at})
def process_delayed(self) -> int:
"""
Переместить готовые задачи из delayed очереди в основную.
Возвращает количество перемещённых задач.
"""
now = int(time.time())
# Задачи, которые должны выполниться сейчас
ready = self.redis.zrangebyscore(
f'{self.queue_name}:delayed',
'-inf',
now
)
if not ready:
return 0
# Перемещаем в основную очередь с приоритетом 0
pipe = self.redis.pipeline()
for task in ready:
pipe.zadd(self.queue_name, {task: 0})
pipe.zrem(f'{self.queue_name}:delayed', task)
pipe.execute()
return len(ready)Bitmap — это не отдельный тип, а использование String как битового массива.
# Установить бит в позицию 1001 в 1
client.setbit('users:online:2026-03-02', 1001, 1)
# Получить бит
is_online = client.getbit('users:online:2026-03-02', 1001) # 1 или 0
# Посчитать установленные биты
count = client.bitcount('users:online:2026-03-02')
# Побитовые операции
client.bitop('AND', 'result:key', 'key1', 'key2') # И
client.bitop('OR', 'result:key', 'key1', 'key2') # ИЛИ
client.bitop('XOR', 'result:key', 'key1', 'key2') # Исключающее ИЛИ
client.bitop('NOT', 'result:key', 'key1') # НЕfrom datetime import datetime, timedelta
class ActivityTracker:
def __init__(self, redis_client):
self.redis = redis_client
def _date_key(self, date: datetime) -> str:
return f'activity:{date.strftime("%Y-%m-%d")}'
def mark_active(self, user_id: int, date: datetime = None):
"""Отметить пользователя активным сегодня"""
if date is None:
date = datetime.utcnow()
key = self._date_key(date)
self.redis.setbit(key, user_id, 1)
def is_active(self, user_id: int, date: datetime) -> bool:
"""Проверить, был ли пользователь активен в дату"""
key = self._date_key(date)
return bool(self.redis.getbit(key, user_id))
def count_active(self, date: datetime) -> int:
"""Количество активных пользователей в дату"""
key = self._date_key(date)
return self.redis.bitcount(key)
def count_active_range(self, start: datetime, end: datetime) -> int:
"""Уникальные активные пользователи за период (OR битов)"""
keys = [self._date_key(start + timedelta(days=i))
for i in range((end - start).days + 1)]
if not keys:
return 0
# Временный ключ для результата
result_key = 'activity:temp:result'
self.redis.bitop('OR', result_key, *keys)
count = self.redis.bitcount(result_key)
self.redis.delete(result_key) # Очистка
return count
def get_dau_mau_ratio(self, date: datetime) -> float:
"""
DAU/MAU ratio — метрика вовлечённости.
DAU: Daily Active Users
MAU: Monthly Active Users
"""
dau = self.count_active(date)
# MAU: OR всех дней за месяц
month_keys = [self._date_key(date - timedelta(days=i)) for i in range(30)]
result_key = 'activity:temp:mau'
self.redis.bitop('OR', result_key, *month_keys)
mau = self.redis.bitcount(result_key)
self.redis.delete(result_key)
return dau / mau if mau > 0 else 0.0HyperLogLog — вероятностная структура для подсчёта cardinality (уникальных элементов).
| Метод | Память | Точность |
|---|---|---|
| Set (SMEMBERS) | O(N) | 100% |
| HyperLogLog | ~12 KB | ~99.19% (погрешность 0.81%) |
# Добавление элементов
client.pfadd('visitors:2026-03-02', 'user1', 'user2', 'user3')
# Подсчёт уникальных
count = client.pfcount('visitors:2026-03-02') # ~3
# Объединение нескольких HLL
client.pfmerge('visitors:2026-03', 'visitors:2026-03-01', 'visitors:2026-03-02')
total = client.pfcount('visitors:2026-03')class UniqueVisitorCounter:
def __init__(self, redis_client):
self.redis = redis_client
def record_visit(self, page_id: str, user_id: str):
"""Записать посещение страницы пользователем"""
key = f'uv:{page_id}:{datetime.utcnow().strftime("%Y-%m-%d")}'
self.redis.pfadd(key, user_id)
# TTL 90 дней
self.redis.expire(key, 90 * 24 * 3600)
def get_unique_count(self, page_id: str, date: datetime = None) -> int:
"""Получить количество уникальных посетителей"""
if date is None:
date = datetime.utcnow()
key = f'uv:{page_id}:{date.strftime("%Y-%m-%d")}'
return self.redis.pfcount(key)
def get_unique_count_range(self, page_id: str, start: datetime, end: datetime) -> int:
"""Уникальные посетители за период (объединение HLL)"""
keys = []
current = start
while current <= end:
keys.append(f'uv:{page_id}:{current.strftime("%Y-%m-%d")}')
current += timedelta(days=1)
if not keys:
return 0
# Проверяем существование ключей
existing_keys = [k for k in keys if self.redis.exists(k)]
if not existing_keys:
return 0
return self.redis.pfcount(*existing_keys)Stream — структура для хранения последовательности записей (как лог).
# Добавление записи (ID генерируется автоматически)
message_id = client.xadd('tasks:email', {'task_type': 'welcome', 'user_id': '1001'})
# '1709395200000-0'
# Добавление с явным ID
client.xadd('tasks:email', {'data': '...'}, id='1709395200000-0')
# Чтение последних N записей
messages = client.xrevrange('tasks:email', max='+', min='-', count=10)
# Чтение с указанного ID
messages = client.xrange('tasks:email', min='1709395200000-0', max='+')
# Длина потока
length = client.xlen('tasks:email')
# Удаление записей по ID
client.xdel('tasks:email', '1709395200000-0', '1709395200000-1')
# Обрезка потока (оставить последние 1000 записей)
client.xtrim('tasks:email', maxlen=1000)# Создание группы потребителей
client.xgroup_create('tasks:email', 'email-workers', id='0', mkstream=True)
# Чтение сообщений группой
messages = client.xreadgroup(
groupname='email-workers',
consumername='worker-1',
streams={'tasks:email': '>'}, # '>' = только новые сообщения
count=10,
block=5000 # Ждать 5 секунд
)
# Подтверждение обработки
client.xack('tasks:email', 'email-workers', '1709395200000-0')
# Проверка pending сообщений
pending = client.xpending('tasks:email', 'email-workers')
# Получение деталей pending
pending_details = client.xpending_range(
'tasks:email',
'email-workers',
min='-',
max='+',
count=10
)Подробно о Streams и consumer groups — в теме 9.
Geo — команды для работы с географическими координатами.
# Добавление местоположений
client.geoadd('cities', (55.7558, 37.6173, 'Moscow')) # lon, lat, name
client.geoadd('cities', (59.9343, 30.3351, 'Saint Petersburg'))
# Расстояние между точками (в метрах)
dist = client.geodist('cities', 'Moscow', 'Saint Petersburg') # ~635000 м
# Поиск рядом
nearby = client.georadius('cities', 55.7558, 37.6173, radius=100, unit='km')
# ['Moscow']
# Поиск рядом с местом из базы
nearby = client.georadiusbymember('cities', 'Moscow', radius=500, unit='km')
# Получение координат
coords = client.geopos('cities', 'Moscow', 'Saint Petersburg')
# [(37.6173, 55.7558), (30.3351, 59.9343)]
# GeoHash
hashes = client.geohash('cities', 'Moscow')
# ['9q16p0b5j']from typing import List, Tuple
class CourierLocator:
def __init__(self, redis_client):
self.redis = redis_client
self.key = 'couriers:locations'
def update_location(self, courier_id: str, lon: float, lat: float):
"""Обновить местоположение курьера"""
self.redis.geoadd(self.key, (lon, lat, courier_id))
# TTL 1 час (если курьер не обновился — считаем офлайн)
self.redis.expire(self.key, 3600)
def find_nearby(self, lon: float, lat: float, radius_km: float = 5) -> List[Tuple[str, float]]:
"""
Найти ближайших курьеров.
Возвращает [(courier_id, distance_meters), ...]
"""
results = self.redis.georadius(
self.key,
lon,
lat,
radius=radius_km,
unit='km',
withdist=True,
count=10
)
return [(courier_id, dist) for courier_id, dist in results]
def get_courier_location(self, courier_id: str) -> Tuple[float, float]:
"""Получить координаты курьера"""
coords = self.redis.geopos(self.key, courier_id)
if coords and coords[0]:
return coords[0] # (lon, lat)
return None
def remove_courier(self, courier_id: str):
"""Удалить курьера (завершил смену)"""
self.redis.zrem(self.key, courier_id)| Структура | Use case | Сложность операций |
|---|---|---|
| String | Кэш, счётчики, сессии | O(1) |
| Hash | Объекты, профили | O(1) на поле |
| List | Очереди, ленты | O(1) push/pop |
| Set | Теги, уникальные элементы | O(1) |
| Sorted Set | Рейтинги, приоритеты | O(log N) |
| Bitmap | Активность, флаги | O(1) |
| HyperLogLog | Уникальные посетители | O(1), погрешность 0.81% |
| Stream | Логи, очереди задач | O(1) добавление |
| Geo | Геолокация | O(log N) |
Проверьте понимание → ответьте на вопросы в data_structures.json
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.