Citus Internals (1): Distributed Table Behavior Tests
- I've recently become quite curious about how Citus is implemented, so this post runs some behavioral tests.
- This post only tests behavior; there is no source analysis here. A follow-up series of articles will dig into the Citus source code.
Environment: a 3-node PG17 cluster with Citus.
SELECT citus_set_coordinator_host('127.0.0.1', 3001);
SELECT citus_add_node('127.0.0.1', 3002);
SELECT citus_add_node('127.0.0.1', 3003);
SELECT rebalance_table_shards();
postgres=# SELECT * from pg_dist_node;
+--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------+
| nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards |
+--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------+
| 1 | 0 | 10.0.0.1 | 3002 | default | t | t | primary | default | t | f |
| 7 | 6 | 127.0.0.1 | 3002 | default | t | t | primary | default | t | t |
| 8 | 7 | 127.0.0.1 | 3003 | default | t | t | primary | default | t | t |
+--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------+
(3 rows)
Case 1: distributed table
- Note: the table must be created on the CN (coordinator); create_distributed_table cannot be run on a DN (worker).
- A table with the same name can be created on a DN first, but create_distributed_table on the CN will then fail, because the DN already has a table with that name.
Conclusion: shards are exposed on the DNs and can be queried there directly, with the same results as querying through the CN.
CREATE TABLE events (
device_id bigint,
event_id bigserial,
event_time timestamptz default now(),
data jsonb not null,
PRIMARY KEY (device_id, event_id)
);
SELECT create_distributed_table('events', 'device_id');
INSERT INTO events (device_id, data) SELECT s % 100, ('{"measurement":'||random()||'}')::jsonb FROM generate_series(1,1000000) s;
- device_id = 1 lands on 3003 and device_id = 2 lands on 3002, so the data really is stored in shards. The exact sharding strategy isn't clear yet; we'll look at the source code later.
- No matter which node you run on, querying the distributed table produces a distributed plan. Presumably queries are uniformly routed through the CN; how the routing works also needs a look at the source.
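The placement claim above can be checked directly from the metadata. get_shard_id_for_distribution_column and the citus_shards view are standard Citus helpers; a sketch (shard IDs will differ per cluster):

```sql
-- Which shard does device_id = 1 hash into?
SELECT get_shard_id_for_distribution_column('events', 1);

-- Where does each shard of events live? (one row per shard placement)
SELECT shardid, nodename, nodeport
FROM citus_shards
WHERE table_name = 'events'::regclass
ORDER BY shardid;
```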
postgres=# EXPLAIN ANALYZE SELECT * FROM events WHERE device_id = 1 ORDER BY event_time DESC, event_id DESC LIMIT 3;
+------------------------------------------------------------------------------------------------------------------------------------------------------------+
| QUERY PLAN |
+------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) (actual time=13.756..13.758 rows=3 loops=1) |
| Task Count: 1 |
| Tuple data received from nodes: 181 bytes |
| Tasks Shown: All |
| -> Task |
| Tuple data received from node: 181 bytes |
| Node: host=127.0.0.1 port=3003 dbname=postgres |
| -> Limit (cost=1071.55..1071.56 rows=3 width=63) (actual time=11.164..11.166 rows=3 loops=1) |
| -> Sort (cost=1071.55..1096.36 rows=9925 width=63) (actual time=11.162..11.163 rows=3 loops=1) |
| Sort Key: event_time DESC, event_id DESC |
| Sort Method: top-N heapsort Memory: 26kB |
| -> Bitmap Heap Scan on events_102204 events (cost=237.21..943.27 rows=9925 width=63) (actual time=1.900..6.872 rows=10000 loops=1) |
| Recheck Cond: (device_id = 1) |
| Heap Blocks: exact=582 |
| -> Bitmap Index Scan on events_pkey_102204 (cost=0.00..234.73 rows=9925 width=0) (actual time=1.729..1.730 rows=10000 loops=1) |
| Index Cond: (device_id = 1) |
| Planning Time: 0.121 ms |
| Execution Time: 11.236 ms |
| Planning Time: 0.129 ms |
| Execution Time: 13.973 ms |
+------------------------------------------------------------------------------------------------------------------------------------------------------------+
(20 rows)
postgres=# EXPLAIN ANALYZE SELECT * FROM events WHERE device_id = 2 ORDER BY event_time DESC, event_id DESC LIMIT 3;
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
| QUERY PLAN |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) (actual time=12.512..12.514 rows=3 loops=1) |
| Task Count: 1 |
| Tuple data received from nodes: 181 bytes |
| Tasks Shown: All |
| -> Task |
| Tuple data received from node: 181 bytes |
| Node: host=127.0.0.1 port=3002 dbname=postgres |
| -> Limit (cost=842.04..842.04 rows=3 width=63) (actual time=9.843..9.846 rows=3 loops=1) |
| -> Sort (cost=842.04..867.04 rows=10000 width=63) (actual time=9.841..9.843 rows=3 loops=1) |
| Sort Key: event_time DESC, event_id DESC |
| Sort Method: top-N heapsort Memory: 26kB |
| -> Bitmap Heap Scan on events_102227 events (cost=237.79..712.79 rows=10000 width=63) (actual time=1.744..5.660 rows=10000 loops=1) |
| Recheck Cond: (device_id = 2) |
| Heap Blocks: exact=350 |
| -> Bitmap Index Scan on events_pkey_102227 (cost=0.00..235.29 rows=10000 width=0) (actual time=1.647..1.647 rows=10000 loops=1) |
| Index Cond: (device_id = 2) |
| Planning Time: 0.113 ms |
| Execution Time: 9.908 ms |
| Planning Time: 0.126 ms |
| Execution Time: 12.710 ms |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
(20 rows)
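On the sharding strategy: Citus hash-distributes a table by splitting the int32 hash space into fixed ranges, one per shard (visible in pg_dist_shard as shardminvalue/shardmaxvalue). A minimal Python sketch of that range mapping, assuming the default of 32 shards; the hash itself is PostgreSQL's per-type hash function (e.g. hashint8) and is not reproduced here:

```python
# Sketch: how a hash value maps to one of N shards via fixed int32 ranges.
# Assumption: citus.shard_count = 32 (the default); the real per-type hash
# function is a stand-in, only the range-to-shard mapping is modeled.
SHARD_COUNT = 32
INT32_MIN, INT32_MAX = -2**31, 2**31 - 1

def shard_ranges(shard_count=SHARD_COUNT):
    """Split [INT32_MIN, INT32_MAX] into shard_count contiguous ranges."""
    step = 2**32 // shard_count
    ranges, lo = [], INT32_MIN
    for i in range(shard_count):
        hi = lo + step - 1 if i < shard_count - 1 else INT32_MAX
        ranges.append((lo, hi))
        lo = hi + 1
    return ranges

def shard_for_hash(h, ranges):
    """Return the index of the shard whose range contains hash value h."""
    for i, (lo, hi) in enumerate(ranges):
        if lo <= h <= hi:
            return i
    raise ValueError("hash outside int32 range")

ranges = shard_ranges()
print(shard_for_hash(0, ranges))  # hash 0 starts shard 16's range
```

A router query like `WHERE device_id = 1` only needs to hash one value, find its range, and send a single task to that shard's node, which matches the Task Count: 1 plans above.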
Case 2: distributed table JOIN distributed table
CREATE TABLE events1 (
device_id bigint,
event_id bigserial,
event_time timestamptz default now(),
data jsonb not null,
PRIMARY KEY (device_id, event_id)
);
SELECT create_distributed_table('events1', 'device_id');
INSERT INTO events1 (device_id, data) SELECT s % 100, ('{"measurement":'||random()||'}')::jsonb FROM generate_series(1,1000000) s;
Join on the distribution key
- Parallelism is aggressive: the IN list is fanned out completely, one task per value, so 5 values in the IN list yield 5 parallel tasks.
- The plan looks fine, but the execution time makes clear the join is not running shard-local on each worker; there is a lot of shuffling. This suggests Citus's default distribution placement differs across tables: even when two tables both use device_id as the shard key, the same device_id does not necessarily land on the same node.
- Co-located tables are meant to solve exactly this problem. A co-located table is the equivalent of PGXC's DISTRIBUTE BY HASH table.
postgres=# explain select * from events e,events1 e1 where e.device_id=e1.device_id and e.device_id in (1,2,3,4,5);
+-------------------------------------------------------------------------------------------------------------------+
| QUERY PLAN |
+-------------------------------------------------------------------------------------------------------------------+
| Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=112) |
| Task Count: 5 |
| Tasks Shown: One of 5 |
| -> Task |
| Node: host=127.0.0.1 port=3003 dbname=postgres |
| -> Hash Join (cost=1707.29..1126727.63 rows=99645950 width=126) |
| Hash Cond: (e.device_id = e1.device_id) |
| -> Index Scan using events_pkey_102332 on events_102332 e (cost=0.29..2603.53 rows=9965 width=63) |
| Index Cond: (device_id = ANY ('{1,2,3,4,5}'::bigint[])) |
| -> Hash (cost=1082.00..1082.00 rows=50000 width=63) |
| -> Seq Scan on events1_102364 e1 (cost=0.00..1082.00 rows=50000 width=63) |
+-------------------------------------------------------------------------------------------------------------------+
Join on a non-distribution key: shuffle (repartitioning) is not supported by default.
postgres=# select * from events e,events1 e1 where e.device_id=e1.event_id and e.device_id in (1,2,3,4,5);
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
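As the HINT says, repartition joins can be enabled per session via the citus.enable_repartition_joins GUC; expect real shuffle cost, since both sides are redistributed across workers:

```sql
SET citus.enable_repartition_joins TO on;
-- The non-distribution-key join now plans as a repartition join.
SELECT * FROM events e, events1 e1
WHERE e.device_id = e1.event_id AND e.device_id IN (1,2,3,4,5);
```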
Case 3: distributed table JOIN local table
- The planner produces a hybrid plan: half of it is a local plan, half a distributed plan.
- The distributed result is materialized as a
Function Scan on read_intermediate_result intermediate_result
node and fed to the local plan.
- The query can run on either the CN or a DN; the distributed part of the plan is always routed to the CN for execution, and on whichever node you run, the distributed result is turned into a Function Scan before further processing.
Executing on the CN:
postgres=# drop table localtbl;
DROP TABLE
postgres=# create table localtbl(a int, b int);
CREATE TABLE
postgres=# insert into localtbl select t.i,t.i%10 from generate_series(0, 99) t(i);
INSERT 0 100
postgres=# select * from events e, localtbl l where e.device_id = l.a and e.device_id = 1 and e.event_id=86201;
+-----------+----------+-------------------------------+--------------------------------------+---+---+
| device_id | event_id | event_time | data | a | b |
+-----------+----------+-------------------------------+--------------------------------------+---+---+
| 1 | 86201 | 2025-03-25 17:02:52.776599+08 | {"measurement": 0.39845320795492434} | 1 | 1 |
+-----------+----------+-------------------------------+--------------------------------------+---+---+
(1 row)
postgres=# explain select * from events e, localtbl l where e.device_id = l.a and e.device_id = 1 and e.event_id=86201;
+---------------------------------------------------------------------------------------------------------------------+
| QUERY PLAN |
+---------------------------------------------------------------------------------------------------------------------+
| Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) |
| -> Distributed Subplan 89_1 |
| -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) |
| Task Count: 1 |
| Tasks Shown: All |
| -> Task |
| Node: host=127.0.0.1 port=3003 dbname=postgres |
| -> Index Scan using events_pkey_102332 on events_102332 e (cost=0.29..8.31 rows=1 width=63) |
| Index Cond: ((device_id = 1) AND (event_id = 86201)) |
| Task Count: 1 |
| Tasks Shown: All |
| -> Task |
| Node: host=localhost port=3001 dbname=postgres |
| -> Nested Loop (cost=0.00..53.36 rows=11 width=64) |
| -> Function Scan on read_intermediate_result intermediate_result (cost=0.00..15.00 rows=1 width=56) |
| Filter: ((device_id = 1) AND (event_id = 86201)) |
| -> Seq Scan on localtbl l (cost=0.00..38.25 rows=11 width=8) |
| Filter: (a = 1) |
+---------------------------------------------------------------------------------------------------------------------+
(18 rows)
Executing on a DN:
postgres=# create table localtbl123(a int, b int);
CREATE TABLE
postgres=# insert into localtbl123 select t.i,t.i%10 from generate_series(0, 99) t(i);
INSERT 0 100
postgres=# select * from events e, localtbl123 l where e.device_id = l.a and e.device_id = 1 and e.event_id=86201;
+-----------+----------+-------------------------------+--------------------------------------+---+---+
| device_id | event_id | event_time | data | a | b |
+-----------+----------+-------------------------------+--------------------------------------+---+---+
| 1 | 86201 | 2025-03-25 17:02:52.776599+08 | {"measurement": 0.39845320795492434} | 1 | 1 |
+-----------+----------+-------------------------------+--------------------------------------+---+---+
(1 row)
postgres=# explain select * from events e, localtbl123 l where e.device_id = l.a and e.device_id = 1 and e.event_id=86201;
+---------------------------------------------------------------------------------------------------------------------+
| QUERY PLAN |
+---------------------------------------------------------------------------------------------------------------------+
| Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) |
| -> Distributed Subplan 45_1 |
| -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) |
| Task Count: 1 |
| Tasks Shown: All |
| -> Task |
| Node: host=127.0.0.1 port=3003 dbname=postgres |
| -> Index Scan using events_pkey_102332 on events_102332 e (cost=0.29..8.31 rows=1 width=63) |
| Index Cond: ((device_id = 1) AND (event_id = 86201)) |
| Task Count: 1 |
| Tasks Shown: All |
| -> Task |
| Node: host=localhost port=3002 dbname=postgres |
| -> Nested Loop (cost=0.00..53.36 rows=11 width=64) |
| -> Function Scan on read_intermediate_result intermediate_result (cost=0.00..15.00 rows=1 width=56) |
| Filter: ((device_id = 1) AND (event_id = 86201)) |
| -> Seq Scan on localtbl123 l (cost=0.00..38.25 rows=11 width=8) |
| Filter: (a = 1) |
+---------------------------------------------------------------------------------------------------------------------+
(18 rows)
Case 4: distributed tables with colocate_with
CREATE TABLE devices (
device_id bigint primary key,
device_name text,
device_type_id int
);
SELECT create_distributed_table('devices', 'device_id', colocate_with := 'events');
INSERT INTO devices (device_id, device_name, device_type_id)
SELECT s, 'device-'||s, 55 FROM generate_series(0, 99) s;
CREATE INDEX ON devices (device_type_id);
ALTER TABLE events ADD CONSTRAINT device_id_fk
FOREIGN KEY (device_id) REFERENCES devices (device_id);
SELECT avg((data->>'measurement')::double precision)
FROM events JOIN devices USING (device_id)
WHERE device_type_id = 55;
Result: the join on the distribution key executes very fast; the join runs locally on each shard, with no shuffle.
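Co-location can also be confirmed from the metadata: tables in the same co-location group carry the same colocationid in pg_dist_partition (equivalently, colocation_id in the citus_tables view):

```sql
-- events and devices should report the same colocationid,
-- which is what allows the shard-local join below.
SELECT logicalrelid, colocationid
FROM pg_dist_partition
WHERE logicalrelid IN ('events'::regclass, 'devices'::regclass);
```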
postgres=# explain SELECT avg((data->>'measurement')::double precision)
FROM events e, devices d
WHERE e.device_id=d.device_id and d.device_type_id = 55;
+---------------------------------------------------------------------------------------------------------------------+
| QUERY PLAN |
+---------------------------------------------------------------------------------------------------------------------+
| Aggregate (cost=500.00..500.02 rows=1 width=8) |
| -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=16) |
| Task Count: 32 |
| Tasks Shown: One of 32 |
| -> Task |
| Node: host=127.0.0.1 port=3002 dbname=postgres |
| -> Aggregate (cost=1013.83..1013.84 rows=1 width=16) |
| -> Nested Loop (cost=237.79..813.83 rows=10000 width=39) |
| -> Seq Scan on devices_102395 d (cost=0.00..1.04 rows=1 width=8) |
| Filter: (device_type_id = 55) |
| -> Bitmap Heap Scan on events_102331 e (cost=237.79..712.79 rows=10000 width=47) |
| Recheck Cond: (device_id = d.device_id) |
| -> Bitmap Index Scan on events_pkey_102331 (cost=0.00..235.29 rows=10000 width=0) |
| Index Cond: (device_id = d.device_id) |
+---------------------------------------------------------------------------------------------------------------------+
(14 rows)
Time: 8.212 ms
postgres=# SELECT avg((data->>'measurement')::double precision)
FROM events e, devices d
WHERE e.device_id=d.device_id and d.device_type_id = 55;
+-------------------+
| avg |
+-------------------+
| 0.500442721594347 |
+-------------------+
(1 row)
Time: 354.369 ms