
Citus Source Code (1): Behavior Tests for Distributed Tables

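  1. I have recently become very curious about how Citus is implemented, so this post runs some tests on its behavior.
  2. This post only tests behavior and does not analyze source code. A series of follow-up posts will analyze the Citus source.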

Environment: 3 nodes, PG17 with Citus.

SELECT citus_set_coordinator_host('127.0.0.1', 3001);
SELECT citus_add_node('127.0.0.1', 3002);
SELECT citus_add_node('127.0.0.1', 3003);
SELECT rebalance_table_shards();
SELECT * from pg_dist_node;

postgres=# SELECT * from pg_dist_node;
+--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------+
| nodeid | groupid | nodename  | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards |
+--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------+
|      1 |       0 | 10.0.0.1  |     3002 | default  | t           | t        | primary  | default     | t              | f                |
|      7 |       6 | 127.0.0.1 |     3002 | default  | t           | t        | primary  | default     | t              | t                |
|      8 |       7 | 127.0.0.1 |     3003 | default  | t           | t        | primary  | default     | t              | t                |
+--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------+
(3 rows)

case1: distributed table

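  • Note that the table must be created on the CN (coordinator); create_distributed_table cannot be run on a DN (worker) for a table created there.
  • A table can be created on a DN, but create_distributed_table on the CN then fails, reporting that a table with the same name already exists on the DN.

Conclusion: shards are exposed on the DNs and can be queried there directly, with the same result as querying on the CN.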

CREATE TABLE events (
  device_id bigint,
  event_id bigserial,
  event_time timestamptz default now(),
  data jsonb not null,
  PRIMARY KEY (device_id, event_id)
);

SELECT create_distributed_table('events', 'device_id');
INSERT INTO events (device_id, data) SELECT s % 100, ('{"measurement":'||random()||'}')::jsonb FROM generate_series(1,1000000) s;
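  • device_id = 1 is on 3003 and device_id = 2 is on 3002, so the data really is stored in shards. The sharding strategy is not clear yet; I will look at the source later.
  • On any node, a query against a distributed table produces a distributed plan. My guess is that it is routed through the CN, although hasmetadata = t on every node in pg_dist_node suggests the DNs may hold the metadata to plan it themselves. How the routing actually works needs the source.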
postgres=# EXPLAIN ANALYZE SELECT * FROM events WHERE device_id = 1 ORDER BY event_time DESC, event_id DESC LIMIT 3;
+------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                         QUERY PLAN                                                                         |
+------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0) (actual time=13.756..13.758 rows=3 loops=1)                                                 |
|   Task Count: 1                                                                                                                                            |
|   Tuple data received from nodes: 181 bytes                                                                                                                |
|   Tasks Shown: All                                                                                                                                         |
|   ->  Task                                                                                                                                                 |
|         Tuple data received from node: 181 bytes                                                                                                           |
|         Node: host=127.0.0.1 port=3003 dbname=postgres                                                                                                     |
|         ->  Limit  (cost=1071.55..1071.56 rows=3 width=63) (actual time=11.164..11.166 rows=3 loops=1)                                                     |
|               ->  Sort  (cost=1071.55..1096.36 rows=9925 width=63) (actual time=11.162..11.163 rows=3 loops=1)                                             |
|                     Sort Key: event_time DESC, event_id DESC                                                                                               |
|                     Sort Method: top-N heapsort  Memory: 26kB                                                                                              |
|                     ->  Bitmap Heap Scan on events_102204 events  (cost=237.21..943.27 rows=9925 width=63) (actual time=1.900..6.872 rows=10000 loops=1)   |
|                           Recheck Cond: (device_id = 1)                                                                                                    |
|                           Heap Blocks: exact=582                                                                                                           |
|                           ->  Bitmap Index Scan on events_pkey_102204  (cost=0.00..234.73 rows=9925 width=0) (actual time=1.729..1.730 rows=10000 loops=1) |
|                                 Index Cond: (device_id = 1)                                                                                                |
|             Planning Time: 0.121 ms                                                                                                                        |
|             Execution Time: 11.236 ms                                                                                                                      |
| Planning Time: 0.129 ms                                                                                                                                    |
| Execution Time: 13.973 ms                                                                                                                                  |
+------------------------------------------------------------------------------------------------------------------------------------------------------------+
(20 rows)


postgres=# EXPLAIN ANALYZE SELECT * FROM events WHERE device_id = 2 ORDER BY event_time DESC, event_id DESC LIMIT 3;
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                         QUERY PLAN                                                                          |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0) (actual time=12.512..12.514 rows=3 loops=1)                                                  |
|   Task Count: 1                                                                                                                                             |
|   Tuple data received from nodes: 181 bytes                                                                                                                 |
|   Tasks Shown: All                                                                                                                                          |
|   ->  Task                                                                                                                                                  |
|         Tuple data received from node: 181 bytes                                                                                                            |
|         Node: host=127.0.0.1 port=3002 dbname=postgres                                                                                                      |
|         ->  Limit  (cost=842.04..842.04 rows=3 width=63) (actual time=9.843..9.846 rows=3 loops=1)                                                          |
|               ->  Sort  (cost=842.04..867.04 rows=10000 width=63) (actual time=9.841..9.843 rows=3 loops=1)                                                 |
|                     Sort Key: event_time DESC, event_id DESC                                                                                                |
|                     Sort Method: top-N heapsort  Memory: 26kB                                                                                               |
|                     ->  Bitmap Heap Scan on events_102227 events  (cost=237.79..712.79 rows=10000 width=63) (actual time=1.744..5.660 rows=10000 loops=1)   |
|                           Recheck Cond: (device_id = 2)                                                                                                     |
|                           Heap Blocks: exact=350                                                                                                            |
|                           ->  Bitmap Index Scan on events_pkey_102227  (cost=0.00..235.29 rows=10000 width=0) (actual time=1.647..1.647 rows=10000 loops=1) |
|                                 Index Cond: (device_id = 2)                                                                                                 |
|             Planning Time: 0.113 ms                                                                                                                         |
|             Execution Time: 9.908 ms                                                                                                                        |
| Planning Time: 0.126 ms                                                                                                                                     |
| Execution Time: 12.710 ms                                                                                                                                   |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
(20 rows)
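
The sharding strategy is left open above. Before reading the source, the placement of a given key can at least be inspected from SQL. The following is a minimal sketch, assuming the events table from this case and the Citus metadata objects get_shard_id_for_distribution_column, citus_shards (a view available since Citus 10) and pg_dist_shard. Shard IDs and ports will differ from cluster to cluster.

```sql
-- Which shard does a given distribution-key value map to?
SELECT get_shard_id_for_distribution_column('events', 1) AS shard_id;

-- On which node is that shard placed?
SELECT shardid, shard_name, nodename, nodeport
FROM citus_shards
WHERE table_name = 'events'::regclass
  AND shardid = get_shard_id_for_distribution_column('events', 1);

-- The hash range covered by each shard. For hash-distributed tables the
-- bounds are int32 values stored as text, hence the cast for ordering.
SELECT shardid, shardminvalue, shardmaxvalue
FROM pg_dist_shard
WHERE logicalrelid = 'events'::regclass
ORDER BY shardminvalue::int;
```

Running the first two statements for device_id 1 and 2 should point at the same shards and ports that the EXPLAIN ANALYZE output above reports.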

case2: distributed table join distributed table

CREATE TABLE events1 (
  device_id bigint,
  event_id bigserial,
  event_time timestamptz default now(),
  data jsonb not null,
  PRIMARY KEY (device_id, event_id)
);

SELECT create_distributed_table('events1', 'device_id');
INSERT INTO events1 (device_id, data) SELECT s % 100, ('{"measurement":'||random()||'}')::jsonb FROM generate_series(1,1000000) s;

Join on the distribution key

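  • Parallelism is aggressive: the IN list is pruned to the shards that hold those values, and each of those shards becomes one task. Here the 5 values give 5 tasks.
  • The plan looks fine: the task shown is a plain Hash Join between events_102332 and events1_102364 on one worker, with no repartition step. Execution is nevertheless slow, which at first looks like heavy shuffling between workers, as if the same device_id of the two tables did not live on the same node. The plan does not show that, though. A more likely cause is the size of the result: each device_id has about 10,000 rows in each table, so the join emits about 10^8 rows per device_id (the plan estimates rows=99645950). Also note that create_distributed_table defaults to colocate_with := 'default', which places tables with the same distribution column type, shard count and replication factor in one co-location group, so events and events1 should already be co-located.
  • Co-location is what guarantees a local join in this kind of case. Co-located tables = PGXC's distribute by hash tables.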
postgres=# explain select * from events e,events1 e1 where e.device_id=e1.device_id and e.device_id in (1,2,3,4,5);
+-------------------------------------------------------------------------------------------------------------------+
|                                                    QUERY PLAN                                                     |
+-------------------------------------------------------------------------------------------------------------------+
| Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=112)                                             |
|   Task Count: 5                                                                                                   |
|   Tasks Shown: One of 5                                                                                           |
|   ->  Task                                                                                                        |
|         Node: host=127.0.0.1 port=3003 dbname=postgres                                                            |
|         ->  Hash Join  (cost=1707.29..1126727.63 rows=99645950 width=126)                                         |
|               Hash Cond: (e.device_id = e1.device_id)                                                             |
|               ->  Index Scan using events_pkey_102332 on events_102332 e  (cost=0.29..2603.53 rows=9965 width=63) |
|                     Index Cond: (device_id = ANY ('{1,2,3,4,5}'::bigint[]))                                       |
|               ->  Hash  (cost=1082.00..1082.00 rows=50000 width=63)                                               |
|                     ->  Seq Scan on events1_102364 e1  (cost=0.00..1082.00 rows=50000 width=63)                   |
+-------------------------------------------------------------------------------------------------------------------+
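
Whether events and events1 ended up in the same co-location group, and how large the join result actually is, can be checked directly. A minimal sketch, assuming the citus_tables view (Citus 10 and later) and the two tables created above; the second query only multiplies per-key row counts, so it avoids materializing the join.

```sql
-- Tables in the same co-location group share a colocation_id.
SELECT table_name, distribution_column, colocation_id, shard_count
FROM citus_tables
WHERE table_name IN ('events'::regclass, 'events1'::regclass);

-- Estimate the join output size without running the join itself:
-- rows per device_id in events times rows per device_id in events1.
SELECT device_id, e.n * e1.n AS join_rows
FROM (SELECT device_id, count(*) AS n
      FROM events
      WHERE device_id IN (1,2,3,4,5)
      GROUP BY device_id) e
JOIN (SELECT device_id, count(*) AS n
      FROM events1
      WHERE device_id IN (1,2,3,4,5)
      GROUP BY device_id) e1
USING (device_id)
ORDER BY device_id;
```

If both tables report the same colocation_id, the join on device_id can run shard by shard on the workers, and any slowness is more likely down to the number of rows returned than to data movement.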

Join on a non-distribution key: repartitioning (shuffle) is not enabled by default

postgres=# select * from events e,events1 e1 where e.device_id=e1.event_id and e.device_id in (1,2,3,4,5);
ERROR:  the query contains a join that requires repartitioning
HINT:  Set citus.enable_repartition_joins to on to enable repartitioning
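
Following the HINT, the same query can be retried with repartition joins enabled for the session. This is only a sketch of the next step; the resulting plan is not shown here because it was not captured in this test.

```sql
-- Allow Citus to repartition (shuffle) data for joins that are not on
-- the distribution key. Session-level setting only.
SET citus.enable_repartition_joins TO on;

-- EXPLAIN first, to see the repartition plan without moving any data.
EXPLAIN
SELECT *
FROM events e, events1 e1
WHERE e.device_id = e1.event_id
  AND e.device_id IN (1,2,3,4,5);

-- Restore the default afterwards.
RESET citus.enable_repartition_joins;
```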

case3: distributed table join local table

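  • The resulting plan is half local plan, half distributed plan.
  • The distributed result becomes a Function Scan on read_intermediate_result intermediate_result node, which feeds the local plan.
  • The query runs on either the CN or a DN. On whichever node it runs, the distributed part is executed as a distributed subplan (in the plans below its task goes to the node holding the shard, port 3003), and its result is turned into a Function Scan for the next step.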

Run on the CN:

postgres=# drop table localtbl;
DROP TABLE
postgres=# create table localtbl(a int, b int);
CREATE TABLE
postgres=# insert into localtbl select t.i,t.i%10 from generate_series(0, 99) t(i);
INSERT 0 100
postgres=# select * from events e, localtbl l where e.device_id = l.a and e.device_id = 1 and e.event_id=86201;
+-----------+----------+-------------------------------+--------------------------------------+---+---+
| device_id | event_id |          event_time           |                 data                 | a | b |
+-----------+----------+-------------------------------+--------------------------------------+---+---+
|         1 |    86201 | 2025-03-25 17:02:52.776599+08 | {"measurement": 0.39845320795492434} | 1 | 1 |
+-----------+----------+-------------------------------+--------------------------------------+---+---+
(1 row)

postgres=# explain select * from events e, localtbl l where e.device_id = l.a and e.device_id = 1 and e.event_id=86201;
+---------------------------------------------------------------------------------------------------------------------+
|                                                     QUERY PLAN                                                      |
+---------------------------------------------------------------------------------------------------------------------+
| Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)                                                      |
|   ->  Distributed Subplan 89_1                                                                                      |
|         ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)                                          |
|               Task Count: 1                                                                                         |
|               Tasks Shown: All                                                                                      |
|               ->  Task                                                                                              |
|                     Node: host=127.0.0.1 port=3003 dbname=postgres                                                  |
|                     ->  Index Scan using events_pkey_102332 on events_102332 e  (cost=0.29..8.31 rows=1 width=63)   |
|                           Index Cond: ((device_id = 1) AND (event_id = 86201))                                      |
|   Task Count: 1                                                                                                     |
|   Tasks Shown: All                                                                                                  |
|   ->  Task                                                                                                          |
|         Node: host=localhost port=3001 dbname=postgres                                                              |
|         ->  Nested Loop  (cost=0.00..53.36 rows=11 width=64)                                                        |
|               ->  Function Scan on read_intermediate_result intermediate_result  (cost=0.00..15.00 rows=1 width=56) |
|                     Filter: ((device_id = 1) AND (event_id = 86201))                                                |
|               ->  Seq Scan on localtbl l  (cost=0.00..38.25 rows=11 width=8)                                        |
|                     Filter: (a = 1)                                                                                 |
+---------------------------------------------------------------------------------------------------------------------+
(18 rows)

Run on a DN:

postgres=# create table localtbl123(a int, b int);
CREATE TABLE
postgres=# insert into localtbl123 select t.i,t.i%10 from generate_series(0, 99) t(i);
INSERT 0 100
postgres=# select * from events e, localtbl123 l where e.device_id = l.a and e.device_id = 1 and e.event_id=86201;
+-----------+----------+-------------------------------+--------------------------------------+---+---+
| device_id | event_id |          event_time           |                 data                 | a | b |
+-----------+----------+-------------------------------+--------------------------------------+---+---+
|         1 |    86201 | 2025-03-25 17:02:52.776599+08 | {"measurement": 0.39845320795492434} | 1 | 1 |
+-----------+----------+-------------------------------+--------------------------------------+---+---+
(1 row)

postgres=# explain select * from events e, localtbl123 l where e.device_id = l.a and e.device_id = 1 and e.event_id=86201;
+---------------------------------------------------------------------------------------------------------------------+
|                                                     QUERY PLAN                                                      |
+---------------------------------------------------------------------------------------------------------------------+
| Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)                                                      |
|   ->  Distributed Subplan 45_1                                                                                      |
|         ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)                                          |
|               Task Count: 1                                                                                         |
|               Tasks Shown: All                                                                                      |
|               ->  Task                                                                                              |
|                     Node: host=127.0.0.1 port=3003 dbname=postgres                                                  |
|                     ->  Index Scan using events_pkey_102332 on events_102332 e  (cost=0.29..8.31 rows=1 width=63)   |
|                           Index Cond: ((device_id = 1) AND (event_id = 86201))                                      |
|   Task Count: 1                                                                                                     |
|   Tasks Shown: All                                                                                                  |
|   ->  Task                                                                                                          |
|         Node: host=localhost port=3002 dbname=postgres                                                              |
|         ->  Nested Loop  (cost=0.00..53.36 rows=11 width=64)                                                        |
|               ->  Function Scan on read_intermediate_result intermediate_result  (cost=0.00..15.00 rows=1 width=56) |
|                     Filter: ((device_id = 1) AND (event_id = 86201))                                                |
|               ->  Seq Scan on localtbl123 l  (cost=0.00..38.25 rows=11 width=8)                                     |
|                     Filter: (a = 1)                                                                                 |
+---------------------------------------------------------------------------------------------------------------------+
(18 rows)
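
To see how Citus arrives at the "Distributed Subplan" in these plans, its planning decisions can be logged for the session. The sketch below assumes the localtbl table created on the CN above. It also assumes the citus.local_table_join_policy setting (documented values: auto, never, prefer-local, prefer-distributed), which is not used anywhere in this test, so treat the second half as something to try rather than a verified result.

```sql
-- Log Citus' recursive-planning decisions (which relation gets wrapped
-- into a subplan) as DEBUG messages for this session.
SET client_min_messages TO debug1;

SELECT *
FROM events e, localtbl l
WHERE e.device_id = l.a AND e.device_id = 1 AND e.event_id = 86201;

RESET client_min_messages;

-- Assumption: this setting influences which side of a local/distributed
-- join is materialized as the intermediate result.
SET citus.local_table_join_policy TO 'prefer-local';

EXPLAIN
SELECT *
FROM events e, localtbl l
WHERE e.device_id = l.a AND e.device_id = 1 AND e.event_id = 86201;

RESET citus.local_table_join_policy;
```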

case4: distributed table with colocate_with

CREATE TABLE devices (
  device_id bigint primary key,
  device_name text,
  device_type_id int
);

SELECT create_distributed_table('devices', 'device_id', colocate_with := 'events');

INSERT INTO devices (device_id, device_name, device_type_id)
SELECT s, 'device-'||s, 55 FROM generate_series(0, 99) s;
CREATE INDEX ON devices (device_type_id);

ALTER TABLE events ADD CONSTRAINT device_id_fk
FOREIGN KEY (device_id) REFERENCES devices (device_id);

SELECT avg((data->>'measurement')::double precision)
FROM events JOIN devices USING (device_id)
WHERE device_type_id = 55;

Result: the join on the distribution key runs very fast. It is a local join with no shuffle.

postgres=# explain SELECT avg((data->>'measurement')::double precision)
FROM events e, devices d
WHERE e.device_id=d.device_id and d.device_type_id = 55;
+---------------------------------------------------------------------------------------------------------------------+
|                                                     QUERY PLAN                                                      |
+---------------------------------------------------------------------------------------------------------------------+
| Aggregate  (cost=500.00..500.02 rows=1 width=8)                                                                     |
|   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=16)                                          |
|         Task Count: 32                                                                                              |
|         Tasks Shown: One of 32                                                                                      |
|         ->  Task                                                                                                    |
|               Node: host=127.0.0.1 port=3002 dbname=postgres                                                        |
|               ->  Aggregate  (cost=1013.83..1013.84 rows=1 width=16)                                                |
|                     ->  Nested Loop  (cost=237.79..813.83 rows=10000 width=39)                                      |
|                           ->  Seq Scan on devices_102395 d  (cost=0.00..1.04 rows=1 width=8)                        |
|                                 Filter: (device_type_id = 55)                                                       |
|                           ->  Bitmap Heap Scan on events_102331 e  (cost=237.79..712.79 rows=10000 width=47)        |
|                                 Recheck Cond: (device_id = d.device_id)                                             |
|                                 ->  Bitmap Index Scan on events_pkey_102331  (cost=0.00..235.29 rows=10000 width=0) |
|                                       Index Cond: (device_id = d.device_id)                                         |
+---------------------------------------------------------------------------------------------------------------------+
(14 rows)

Time: 8.212 ms
postgres=# SELECT avg((data->>'measurement')::double precision)
FROM events e, devices d
WHERE e.device_id=d.device_id and d.device_type_id = 55;
+-------------------+
|        avg        |
+-------------------+
| 0.500442721594347 |
+-------------------+
(1 row)

Time: 354.369 ms
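
That the join is local can be cross-checked from the metadata: for any device_id, the shard of events and the shard of devices that hold it should sit on the same node. A minimal sketch, assuming the citus_shards view and get_shard_id_for_distribution_column; the key value 1 is just an example.

```sql
-- For one distribution-key value, list the shard of each table that
-- holds it, together with the node the shard is placed on.
SELECT table_name, shardid, shard_name, colocation_id, nodename, nodeport
FROM citus_shards
WHERE shardid IN (
        get_shard_id_for_distribution_column('events', 1),
        get_shard_id_for_distribution_column('devices', 1)
      )
ORDER BY table_name;
```

With co-located tables both rows should show the same colocation_id, nodename and nodeport.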
