Распределённые таблицы, шардирование, кластеры, distributed-движок, глобальные JOIN, балансировка
Кластеры, Distributed-движок, балансировка нагрузки и глобальные JOIN
Шардирование — горизонтальное разделение данных между несколькими серверами (шардами).
Клиент
↓
┌────────────────────────────────────────────┐
│ Distributed-таблица │
│ (логическое представление) │
└────────────────────────────────────────────┘
↓ ↓ ↓
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Шард 1 │ │ Шард 2 │ │ Шард 3 │
│ [1/3] │ │ [2/3] │ │ [3/3] │
└─────────┘ └─────────┘ └─────────┘
Преимущества:
Недостатки:
<clickhouse>
<remote_servers>
<cluster_default>
<!-- Шард 1 с 2 репликами -->
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>clickhouse-01-1</host>
<port>9000</port>
</replica>
<replica>
<host>clickhouse-01-2</host>
<port>9000</port>
</replica>
</shard>
<!-- Шард 2 с 2 репликами -->
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>clickhouse-02-1</host>
<port>9000</port>
</replica>
<replica>
<host>clickhouse-02-2</host>
<port>9000</port>
</replica>
</shard>
<!-- Шард 3 с 2 репликами -->
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>clickhouse-03-1</host>
<port>9000</port>
</replica>
<replica>
<host>clickhouse-03-2</host>
<port>9000</port>
</replica>
</shard>
</cluster_default>
</remote_servers>
</clickhouse>Параметры:
internal_replication — true для репликации внутри ClickHouseshard — группа серверов с одинаковыми даннымиreplica — отдельный сервер в шарде<clickhouse>
<macros>
<!-- Уникально для каждого узла -->
<cluster>cluster_default</cluster>
<shard>01</shard>
<replica>01</replica>
</macros>
</clickhouse>-- Локальная таблица на каждом шарде
CREATE TABLE events_local ON CLUSTER cluster_default
(
event_time DateTime,
user_id UInt64,
event_type String,
value Decimal(10, 2)
)
ENGINE = ReplicatedMergeTree(
'/clickhouse/tables/{shard}/events',
'{replica}'
)
ORDER BY (event_time, user_id);
-- Distributed-таблица (логическое представление)
CREATE TABLE events_all ON CLUSTER cluster_default
(
event_time DateTime,
user_id UInt64,
event_type String,
value Decimal(10, 2)
)
ENGINE = Distributed(
cluster_default, -- Имя кластера
default, -- База данных
events_local, -- Локальная таблица
rand() -- Шардирующее выражение
);Параметры Distributed:
Варианты:
-- Случайное распределение
ENGINE = Distributed(cluster_default, default, events_local, rand());
-- По user_id (равномерное распределение)
ENGINE = Distributed(cluster_default, default, events_local, user_id);
-- По хэшу (более равномерное)
ENGINE = Distributed(cluster_default, default, events_local, sipHash64(user_id));
-- Без ключа (все данные на всех шардах — для репликации)
ENGINE = Distributed(cluster_default, default, events_local);Рекомендации:
rand() — для равномерного распределения без логикиuser_id — для локальности данных пользователяsipHash64() — для равномерного хэширования-- Вставка распределяется по шардам автоматически
INSERT INTO events_all (event_time, user_id, event_type, value)
VALUES
(now(), 1, 'click', 1.0),
(now(), 2, 'view', 2.0),
(now(), 3, 'purchase', 100.0);
-- Каждая строка попадает на свой шард на основе sharding_keyКак работает:
-- Подключение к конкретному шарду
INSERT INTO events_local ON CLUSTER cluster_default
SELECT * FROM source_data;
-- Или напрямую через клиент
clickhouse-client --host clickhouse-01-1
INSERT INTO events_local VALUES (...);Преимущества:
Недостатки:
-- Запрос выполняется на всех шардах параллельно
SELECT
user_id,
count() AS events
FROM events_all
WHERE event_time >= '2026-03-01'
GROUP BY user_id;План выполнения:
-- Запрос с фильтром по sharding_key
SELECT * FROM events_all
WHERE user_id = 123;
-- ClickHouse может оптимизировать и отправить
-- запрос только на один шард (если sharding_key = user_id)-- Ограничение с балансировкой
SELECT * FROM events_all
ORDER BY event_time DESC
LIMIT 100 WITH TIES;-- JOIN между Distributed-таблицами
SELECT
e.user_id,
e.event_type,
u.country
FROM events_all AS e
JOIN users_all AS u ON e.user_id = u.id;Проблема: Данные могут быть на разных шардах.
1. GLOBAL JOIN (данные рассылаются по шардам):
SELECT
e.user_id,
e.event_type,
u.country
FROM events_all AS e
GLOBAL JOIN users_all AS u ON e.user_id = u.id;Как работает:
2. JOIN с локальными таблицами:
-- users_local на каждом шарде
SELECT
e.user_id,
e.event_type,
u.country
FROM events_all AS e
JOIN users_local AS u ON e.user_id = u.id;Преимущества:
Требование: Таблица users должна быть продублирована на всех шардах.
3. Словари вместо JOIN:
-- Создание словаря
CREATE DICTIONARY users_dict ON CLUSTER cluster_default
(
user_id UInt64,
country String
)
PRIMARY KEY user_id
SOURCE(CLICKHOUSE(
HOST 'localhost'
PORT 9000
DB 'default'
TABLE 'users_local'
))
LAYOUT(HASHED())
LIFETIME(MIN 300 MAX 360);
-- Использование в запросе
SELECT
user_id,
event_type,
dictGet('users_dict', 'country', user_id) AS country
FROM events_all;-- Вставка через Distributed с rand()
-- Данные равномерно распределяются по шардам
INSERT INTO events_all
SELECT now(), user_id, 'event', value
FROM source;1. round-robin балансировка:
<!-- users.xml -->
<profiles>
<default>
<load_balancing>round_robin</load_balancing>
</default>
</profiles>2. nearest_hostname:
<profiles>
<default>
<load_balancing>nearest_hostname</load_balancing>
</default>
</profiles>3. in_order:
<profiles>
<default>
<load_balancing>in_order</load_balancing>
</default>
</profiles>-- Для сессии
SET load_balancing = 'round_robin';
SET prefer_localhost_replica = 1;
-- Проверка настроек
SELECT name, value
FROM system.settings
WHERE name LIKE 'load_balancing%';SELECT
cluster,
shard_num,
replica_num,
host_name,
host_address,
port,
is_local
FROM system.clusters
WHERE cluster = 'cluster_default';-- Статус выполнения DDL на кластере
SELECT
query,
host,
shard_num,
replica_num,
status,
exception_code,
query_finish_time
FROM system.distributed_ddl_queue
ORDER BY entry DESC
LIMIT 20;-- Количество строк по шардам
SELECT
_shard_num,
count() AS rows
FROM events_all
GROUP BY _shard_num
ORDER BY _shard_num;
-- Неравномерное распределение (skew)
SELECT
_shard_num,
count() AS rows,
count() * 100.0 / sum(count()) OVER () AS pct
FROM events_all
GROUP BY _shard_num;Хорошо:
-- Равномерное распределение
ENGINE = Distributed(cluster, default, events_local, sipHash64(user_id));
-- Локальность данных пользователя
ENGINE = Distributed(cluster, default, events_local, user_id);Плохо:
-- Все данные на одном шарде
ENGINE = Distributed(cluster, default, events_local, 1);
-- Неравномерное распределение
ENGINE = Distributed(cluster, default, events_local, toYYYYMM(event_time));-- users_local на каждом шарде
CREATE TABLE users_local ON CLUSTER cluster_default
(
user_id UInt64,
country String
)
ENGINE = ReplicatedMergeTree(...)
ORDER BY user_id;
-- GLOBAL JOIN не нужен
SELECT e.*, u.country
FROM events_all e
JOIN users_local u ON e.user_id = u.id;-- Вместо GLOBAL JOIN используйте словари
SELECT
user_id,
dictGet('users_dict', 'country', user_id) AS country
FROM events_all;-- Проверка равномерности распределения
SELECT
_shard_num,
count() AS rows,
round(count() * 100.0 / sum(count()) OVER (), 2) AS pct
FROM events_all
GROUP BY _shard_num
ORDER BY _shard_num;
-- Если pct сильно отличается → пересмотреть shard key-- Плохо: все данные на координаторе
SELECT * FROM events_all WHERE ...;
-- Хорошо: агрегация на шардах
SELECT
shard_id,
sum(amount) AS shard_total
FROM events_all
GROUP BY shard_id;Изучим оптимизацию запросов: EXPLAIN, анализ планов выполнения, устранение узких мест.
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.