当前位置: 首页 > article >正文

ClickHouse创建分布式表

ClickHouse创建分布式表

当数据量剧增的时候,clickhouse是采用分片的方式进行数据的存储的,类似于redis集群的实现方式。然后想进行统一的查询的时候,因为涉及到多个本地表,可以通过分布式表的方式来提供统一的入口。由于是涉及到分布式存储,保证高可用就必须有数据冗余—即副本(replica)。Clickhouse依靠ReplicatedMergeTree引擎族与Zookeeper实现了复制表机制,成为其高可用的基础。该引擎和 MergeTree 的不同之处在于它会删除排序键值相同的重复项。

同时,Clickhouse具有数据分片(shard)的概念,这也是分布式存储的特点之一,即通过并行读写提高效率。ClickhouseHouse依靠Distributed引擎实现了分布式表机制,在所有分片(本地表)上建立视图进行分布式查询。

本文使用ReplicatedMergeTree和Distributed引擎来构建Clickhouse的分布式表。分布式表包括了逻辑表和物理表,逻辑表主要用于查询,物理表是实际存储数据的。

官方给出的创建复制表示例

CREATE TABLE table_name
(
    EventDate DateTime,
    CounterID UInt32,
    UserID UInt32
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/table_name', '{replica}')
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID)

贴上本文本地表的创建代码:

-- 建本地表
CREATE TABLE test.countly_device_local on cluster ch_cluster
(
    `appKey` String COMMENT 'appKey',
    `deviceId` String COMMENT 'deviceId',
    `nginxTime` DateTime COMMENT 'nginxTime',
    `rooted` String COMMENT 'rooted',
    `charging` String COMMENT 'charging',
    `idfa` String COMMENT 'idfa'
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/test/countly_device_local', '{replica}')
PARTITION BY toYYYYMMDD(nginxTime)
ORDER BY (nginxTime);

其中,on cluster ch_cluster语法标识分布式DDL,即执行一次就可以在集群所有实例上创建同样的本地表。

ReplicatedMergeTree

ReplicatedMergeTree引擎族接受两个参数:

  • zoo_path—Zookeeper中该表的路径
  • replica_name—Zookeeper中的该表的副本名称

如上例所示,这些参数可以包含宏替换的占位符,即大括号的部分。它们会被替换为配置文件里macros那部分配置的值。

示例:

<macros>
    <layer>05</layer>
    <shard>02</shard>
    <replica>example05-02-1.yandex.ru</replica>
</macros>

«ZooKeeper 中该表的路径»对每个可复制表都要是唯一的。不同分片上的表要有不同的路径。

这种情况下,路径应包含下面这些部分:

/clickhouse/tables/ 是公共前缀,官方推荐使用这个。

{layer}-{shard} 是分片标识部分。在此示例中,由于 Yandex.Metrica 集群使用了两级分片,所以它是由两部分组成的。但对于大多数情况来说,你只需保留 {shard} 占位符即可,它会替换展开为分片标识。

table_name 是该表在 ZooKeeper 中的名称。使其与 ClickHouse 中的表名相同比较好。 这里它被明确定义,跟 ClickHouse 表名不一样,它并不会被 RENAME 语句修改。

HINT:你可以在前面添加一个数据库名称 table_name 也是 例如。 db_name.table_name

副本名称用于标识同一个表分片的不同副本。你可以使用服务器名称,如上例所示。同个分片中不同副本的副本名称要唯一。

你也可以显式指定这些参数,而不是使用宏替换。对于测试和配置小型集群这可能会很方便。但是,这种情况下,则不能使用分布式 DDL 语句(ON CLUSTER)。使用大型集群时,官方建议使用宏替换,因为它可以降低出错的可能性。

要删除副本,使用 DROP TABLE。但它只删除那个 – 位于运行该语句的服务器上的副本。

想要删除集群上的所有副本,可以使用drop table [if exists] table_name on cluster cluster_name

Distributed Table & Distributed Engine

ClickHouse分布式表的本质并不是一张表,而是一些本地物理表(分片)的分布式视图,分布式引擎本身不存储数据, 但可以在多个服务器上进行分布式查询。读是自动并行的。读取时,远程服务器表的索引(如果有的话)会被使用。

支持分布式表的引擎是Distributed,建表DDL语句示例如下

Distributed(logs, default, hits[, sharding_key])

分布式引擎参数:服务器配置文件中的集群名,远程数据库名,远程表名,数据分片键(可选)。

将会从位于«logs»集群中 default.hits 表所有服务器上读取数据。

建立上面的本地表的分布式的语句为:

-- 建分布式表
create TABLE test.countly_device_dist on cluster ch_cluster as test.countly_device_local
ENGINE = Distributed("ch_cluster", "test", "countly_device_local", rand());

远程服务器不仅用于读取数据,还会对尽可能数据做部分处理。 例如,对于使用 GROUP BY 的查询,数据首先在远程服务器聚合,之后返回聚合函数的中间状态给查询请求的服务器。再在请求的服务器上进一步汇总数据。

要查看集群的详细信息,可以查看system.clusters表:

select * from system.clusters
r-clickhouse-m220-210.hd163.internal :) select * from system.clusters;

SELECT *
FROM system.clusters

Query id: bbf4a9fa-5a94-48b4-b485-e50bc9a2e891

┌─cluster─────────┬─shard_num─┬─shard_weight─┬─replica_num─┬─host_name──────────────┬─host_address───┬─port─┬─is_local─┬─user────┬─default_database─┬─errors_count─┬─estimated_recovery_time─┐
│ ch_base_cluster │         1 │        10000 │           1 │ clickhouse_node1_1_1   │ 10.105.220.210 │ 9000 │        1 │ default │                  │            0 │                       0 │
│ ch_base_cluster │         1 │        10000 │           2 │ clickhouse_node2_1_2   │ 10.105.220.211 │ 9001 │        0 │ default │                  │            0 │                       0 │
│ ch_base_cluster │         2 │        10000 │           1 │ clickhouse_node2_2_1   │ 10.105.220.211 │ 9000 │        0 │ default │ 
...

本文所使用的环境有一个名为ch_cluster的集群,它由两个分片组成,每个分片包含两个副本。

要注意区分分片和副本:

  • 分片是指包含数据不同部分的服务器(要读取所有数据,必须访问所有分片)。
  • 副本是存储复制数据的服务器(要读取所有数据,访问任一副本上的数据即可)。

向分布式表中写数据

向集群写数据的方法有两种:

  1. 自已指定要将哪些数据写入哪些服务器,并直接在每个分片上执行写入。换句话说,在分布式表上«查询»,在数据表上 INSERT。 这是最灵活的解决方案 – 你可以使用任何分片方案,对于复杂业务特性的需求,这可能是非常重要的。 这也是最佳解决方案,因为数据可以完全独立地写入不同的分片。
  2. 在分布式表上执行 INSERT。在这种情况下,分布式表会跨服务器分发插入数据。 为了写入分布式表,必须要配置分片键(最后一个参数)。当然,如果只有一个分片,则写操作在没有分片键的情况下也能工作,因为这种情况下分片键没有意义。

每个分片都可以在配置文件中定义权重。默认情况下,权重等于1。数据依据分片权重按比例分发到分片上。例如,如果有两个分片,第一个分片的权重是9,而第二个分片的权重是10,则发送 9 / 19 的行到第一个分片, 10 / 19 的行到第二个分片。

选择将一行数据发送到哪个分片的方法是,首先计算分片表达式,然后将这个计算结果除以所有分片的权重总和得到余数。该行会发送到那个包含该余数的从prev_weightprev_weights + weight的半闭半开区间对应的分片上,其中 prev_weights 是该分片前面的所有分片的权重和,weight 是该分片的权重。例如,如果有两个分片,第一个分片权重为9,而第二个分片权重为10,则余数在 [0,9) 中的行发给第一个分片,余数在 [9,19) 中的行发给第二个分片。

分片表达式可以是由常量和表列组成的任何返回整数表达式。例如,您可以使用表达式 ‘rand()’ 来随机分配数据,或者使用 ‘UserID’ 来按用户 ID 的余数分布

总结

Clickhouse采用了特殊的引擎设计结构和各种方案保证其查询和存储的高效。分布式的创建和使用还有很多的细节等待去深挖。


http://www.kler.cn/a/388460.html

相关文章:

  • JAVA题目笔记(十五)经典算法题
  • JSON-RPC-CXX深度解析:C++中的远程调用利器
  • 前端--> nginx-->gateway产生的跨域问题分析
  • 什么是数字图像?
  • GISBox VS ArcGIS:分别适用于大型和小型项目的两款GIS软件
  • FFmpeg 4.3 音视频-多路H265监控录放C++开发十三:将AVFrame转换成AVPacket。视频编码原理.编码相关api
  • 用Java实现samza转换成flink
  • linprog函数在octave中的使用
  • WPF中ImageBrush和Image的区别
  • 斐波那契数的第n个数代码分享(c基础)
  • 【如何使用 ADB 脚本批量停止 Android 设备上的所有应用】
  • 基于WebService的面向服务架构研究
  • 浅谈“通感一体”
  • el-table 表格索引不展示问题
  • Golang | Leetcode Golang题解之第556题下一个更大元素III
  • Facebook定位不准是什么原因?
  • 零基础入门进程间通信:task 1(匿名管道与vscode使用)
  • JS如何读取JSON数据并且格式化解析?
  • 京准同步:GPS北斗卫星授时服务器发展趋势介绍
  • javascript中的 fetch API和 $.ajax API
  • 24年11月架构考试题里的两道小学数学题
  • ⭐SmartControl: Enhancing ControlNet for Handling Rough Visual Conditions
  • 使用Go语言编写一个简单的NTP服务器
  • 《重学Java设计模式》之 建造者模式
  • 第三十八章 章节练习之面经页面
  • (一)<江科大STM32>——软件环境搭建+新建工程步骤