后端服务集成ElasticSearch搜索功能技术方案
文章目录
- 一、为什么选用ElasticSearch
- 二、ElasticSearch基本概念
- 1、文档和字段
- 2、索引和映射
- 3、倒排索引、文档和词条
- 4、分词器
- 三、ElasticSearch工作原理
- 1、Term Dictionary、Term index
- 2、Stored Fields
- 3、Docs Values
- 4、Segment
- 5、Lucene
- 6、高性能、高扩展性、高可用
- ①高性能
- ②高扩展性
- ③高可用
- 7、ES架构
- 8、ES写入流程
- 9、ES搜索流程
- 四、如何使用ElasticSearch
- 1、安装部署
- 2、CRUD操作
- ① index索引操作
- ② 文档操作
- 五、MySQL数据导入到ElasticSearch的4种方案
- 1、同步双写
- 2、异步双写
- 3、基于MySQL表定时同步
- 4、基于Binlog日志实时同步
- 总结:上述四种同步方案总结
- 六、Canal工作原理
- 1、MySQL主备复制原理
- 2、canal 工作原理
- 3、canal架构
- 七、Canal实时同步Mysql数据到ElasticSearch
- 1、MySQL安装部署
- 2、ElasticSearch安装部署
- 3、安装kibana
- 4、Canal安装部署
- ① canal.admin 安装启动
- ② canal.deployer安装启动
- ③ canal.adapter安装启动
- 5、整体验证
- 6、启动脚本整理
- 八、Go-Zero项目集成ElasticSearch
- 1、数据同步
- 2、多表连表查询
- 3、Go-Zero框架集成ES
相关链接:
ElasticSearch官网地址:https://www.elastic.co/cn/
Lucene官网地址:https://lucene.apache.org/
IK分词器官网地址:https://github.com/infinilabs/analysis-ik
Canal官网地址:https://github.com/alibaba/canal
一、为什么选用ElasticSearch
ElasticSearch是一款非常强大的开源的分布式搜索引擎,具备从海量数据中快速找到需要内容的功能,可以用来实现搜索、日志统计、分析、系统监控等功能。
在实际项目开发中,我们经常将Mysql作为业务数据库,ES作为查询数据库,一是可以用来实现读写分离,使项目的架构有更好的扩展性。二是可以缓解Mysql数据库的查询压力,应对海量数据的复杂查询。
ES 几个显著的特点,能够有效补足 MySQL 在企业级数据操作场景的缺陷,而这也是我们将其选择作为下游数据源重要原因。
核心特点:支持分词检索,多维筛选性能好,支持海量数据查询。
-
文本搜索能力:ES 是基于倒排索引实现的搜索系统,配合多样的分词器,在文本模糊匹配搜索上表现得比较好,业务场景广泛。
-
多维筛选性能好:亿级规模数据使用宽表预构建(消除 join),配合全字段索引,使 ES 在多维筛选能力上具备压倒性优势,而这个能力是诸如 CRM, BOSS, MIS 等企业运营系统核心诉求,加上文本搜索能力,独此一家。
-
开源和商业并行:ES 开源生态非常活跃,具备大量的用户群体,同时其背后也有独立的商业公司支撑,而这让用户根据自身特点有了更加多样、渐进的选择。
ElasticSearch的发展历史:
2004年Shay Banon基于Lucene开发了Compass。
2010年Shay Banon重写了Compass,并取名为Elasticsearch。
后端服务系统集成ElasticSearch的好处:
- 强大的搜索功能,支持大数据量的模糊搜索功能。比如:设备、测点等。
- 一定程度上做到了读写分离,减轻MySQL数据库的读写压力。
- 也方便后续的ELK搭建,为服务日志提供搜索功能。
二、ElasticSearch基本概念
ElasticSearch官网地址:https://www.elastic.co/cn/
1、文档和字段
ES是面向文档(Document)存储的,可以是数据库中的一条商品数据,一个订单信息。文档数据会被序列化为json格式后存储在elasticsearch中。
ES中的文档和字段对应MySQL中行记录和列属性。
2、索引和映射
索引(Index),就是相同类型的文档的集合。
数据库中的表会有约束信息,用来定义表的结构、字段的名称、类型等信息。因此,索引库中就有映射(mapping),是索引中文档的字段约束信息,类似表的结构约束。
ES中的索引和映射对应MySQL中的表和约束。
为避免大家对这两个概念比较陌生,这里给出ES的索引和映射的更具象化的列子:在ES中获取索引和映射。
- 方式1:基于HTTP请求获取
# 获取索引的全部信息
curl -X GET 10.0.0.101:9200/mybook?pretty=true
# 获取索引的映射信息
curl -X GET 10.0.0.101:9200/mybook/_mapping?pretty=true
- 方式2:在kibana中基于API获取
# 获取索引的全部信息
GET /mybook
# 获取索引的映射信息
GET /mybook/_mapping
无法是哪种获取方式,都是在组装DSL并通过Restful接口进行调用【这也是ES可被任何语言调用的原因】。
这里获取结果如下:
# mybook索引的全部信息
{
"mybook" : {
"aliases" : { },
"mappings" : {
"dynamic" : "false",
"properties" : {
"author" : {
"type" : "text"
},
"id" : {
"type" : "long"
},
"isbn" : {
"type" : "text"
},
"publisherName" : {
"type" : "text"
},
"title" : {
"type" : "text"
}
}
},
"settings" : {
"index" : {
"routing" : {
"allocation" : {
"include" : {
"_tier_preference" : "data_content"
}
}
},
"number_of_shards" : "1",
"provided_name" : "mybook",
"creation_date" : "1735287860250",
"number_of_replicas" : "1",
"uuid" : "eAT8Z7V9ThiPyRu5rnVQLQ",
"version" : {
"created" : "7130499"
}
}
}
}
}
# mybook索引的映射信息
{
"mybook" : {
"mappings" : {
"dynamic" : "false",
"properties" : {
"author" : {
"type" : "text"
},
"id" : {
"type" : "long"
},
"isbn" : {
"type" : "text"
},
"publisherName" : {
"type" : "text"
},
"title" : {
"type" : "text"
}
}
}
}
}
- MySQL基本概念和ES基本概念对比
Elasticsearch | 说明 | MySQL |
---|---|---|
Index | 索引(index),就是文档的集合,类似数据库的表(table) | Table |
Document | 文档(Document),就是一条条的数据,类似数据库中的行(Row),文档都是JSON格式 | Row |
Field | 字段(Field),就是JSON文档中的字段,类似数据库中的列(Column) | Column |
Mapping | Mapping(映射)是索引中文档的约束,例如字段类型约束。类似数据库的表结构(Schema) | Schema |
DSL | DSL是elasticsearch提供的JSON风格的请求语句,用来操作elasticsearch,实现CRUD | SQL |
3、倒排索引、文档和词条
倒排索引的概念比较奇特,它是根据MySQL的索引(也即正向索引)概念而命名的。
理解倒排索引需要先理解两个基本概念:文档和词条。
文档就是用来搜索的数据,每一条数据就是一个文档。
词条,即term,也称词项。就是对文档数据或用户搜索数据,利用某种算法分词,得到的具备含义的词语就是词条。
还是来举一个具体的例子来说明:“华为小米充电器” 就是文档,它可以分为:华为、小米、充电器等这样的几个词条。
倒排索引就是以词条为核心,记录哪些文档包含了该词条。
实际上,倒排索引除了记录哪些文档包含该词条,还记录了词条的词频,词条在文本里的偏移量等信息。
正向索引 V.S 倒排索引
正向索引是最传统的,根据id索引的方式。所以当需要根据词条查询时,必须先逐条获取每个文档,然后判断文档中是否包含所需要的词条,是根据文档找词条的过程。
而倒排索引则相反,是先找到用户要搜索的词条,根据词条得到保护词条的文档的id,然后根据id获取文档。是根据词条找文档的过程。
正向索引—优点:
可以给多个字段创建索引;根据索引字段搜索、排序速度非常快。
正向索引—缺点:
根据非索引字段,或者索引字段中的部分词条查找时,只能全表扫描。
倒排索引—优点:
根据词条搜索、模糊搜索时,速度非常快。
倒排索引—缺点:
只能给词条创建索引,而不是字段;无法根据字段做排序。
4、分词器
分词器有两个作用:一是创建倒排索引时对文档分词,二是用户搜索时,对输入的内容进行分词。
IK 分词器(IK Analyzer)是专门为 ElasticSearch 设计开发的一款中文分词插件。主要用于对中文文本进行准确且合理的分词处理,以便更好地实现搜索、索引等功能。
IK 分词原理:IK 分词器基于词典和一些特定的规则来对文本进行切分。它内部有一个较为丰富的中文词典,里面包含了大量常见的中文词汇、短语等内容。在对文本进行分词时,会根据词典中的词条以及一些预设的规则去识别和划分词语。
IK分词有两种分词模式:
- ik_max_word:这是一种细粒度的分词模式。例如对于文本 “中华人民共和国”,它会被切分成 “中华”“中华人民”“中华人民共和国”“华人”“人民”“人民共和国”“共和”“共和国” 等多个词语,尽可能多地拆分出不同组合的词语,这种模式适用于需要更全面捕捉文本语义的场景,比如搜索引擎的索引构建,能让更多的词语组合参与到搜索匹配中。
- ik_smart:属于粗粒度的分词模式。同样对于 “中华人民共和国”,可能就直接切分成 “中华人民共和国” 这样一个完整的词语,相对而言拆分得较为 “简洁”,比较适合在一些对文本理解不需要特别细致拆分的场景中使用,例如对文档做简单分类等情况。
下载:可以从官方 GitHub 仓库或者相关的软件下载站点获取对应的 IK 分词器版本,要确保其版本和所使用的 ElasticSearch 版本兼容。
安装到 ElasticSearch:将下载好的 IK 分词器压缩包解压后,把整个目录复制到 ElasticSearch 安装目录下的 plugins
文件夹中(如果没有 plugins
文件夹就新建一个)。然后重启 ElasticSearch,它就会自动加载 IK 分词器插件。
三、ElasticSearch工作原理
相关链接:https://mp.weixin.qq.com/s/RUQXIyN95hvi2wM3CyPI9w
1、Term Dictionary、Term index
文档可以通过ik分词器分为多个词条(又称词项,即Term)。词条term会按字典排好序形成Term Dictionary(用于二分查找)。将 Term Dictionary 的部分词条的前缀信息提取出来构建出一个精简的目录树。目录树的节点中存放这些词条在磁盘中的偏移量,也就是指向磁盘中的位置。这个目录树结构,体积小,适合放内存中,它就是所谓的 Term Index。用它可以加速搜索。
这样当需要查找某个词项的时候,只需要搜索 Term Index,就能快速获得词项在 Term Dictionary 中的大概位置。再跳转到 Term Dictionary,通过少量的检索,定位到词条内容。
2、Stored Fields
倒排索引,搜索到的是文档 id,我们还需要拿着这个 id 找到文档内容本身,才能返回给用户。因此还需要有个地方,存放完整的文档内容,它就是 Stored Fields(行式存储)。
3、Docs Values
后端的业务经常需要根据某个字段排序文档,比如按时间排序或商品价格排序。但问题就来了,这些字段散落在文档里。也就是说,我们需要先获取 Stored Fields 里的文档,再提取出内部字段进行排序。也不是说不行,但其实有更高效的做法。我们可以用空间换时间的思路,再构造一个列式存储结构,将散落在各个文档的某个字段,集中存放,当我们想对某个字段排序的时候,就只需要将这些集中存放的字段一次性读取出来,就能做到针对性地进行排序。这个列式存储结构,就是所谓的 Doc Values。
4、Segment
倒排索引用于搜索,Term Index 用于加速搜索,Stored Fields 用于存放文档的原始信息,以及 Doc Values 用于排序和聚合。这些结构共同组成了一个复合文件,也就是所谓的"segment", 它是一个具备完整搜索功能的最小单元。
5、Lucene
多个文档记录可以用来生成一份 segment,如果新增文档时,还是写入到这份 segment,那就得同时更新 segment 内部的多个数据结构,这样并发读写时性能肯定会受影响。所以规定:segment 一旦生成,则不能再被修改。如果还有新的文档要写入,老的segment已经写满,那就生成新的 segment。这样老的 segment 只需要负责读,写则生成新的 segment。同时保证了读和写的性能。
随着数据量增大,segment文件数会变多,这是就可以并发同时读多个 segment。当然segment文件数也不能无限制的变多,程序会不定期的合并多个小的 segment, 也就是段合并(segment merging) 。这就是有名的Lucene,一个单机文本检索库。
6、高性能、高扩展性、高可用
ElasticSearch就是基于单机检索库Lucene构建了一个高性能、高扩展性、高可用的强大的搜索引擎。
①高性能
当多个调用方同时读写同一个 lucene 必然导致争抢计算资源。 所以ES首先将不同类型的数据写入到了不同的Lucene中,这样在读取数据时,根据需要搜索不同的 Index Name,这就大大降低了单个 lucene 的压力。其次,ES还将某些Index Name内数据可能过多的单个lucene 拆成好几份,每份都是一个 shard 分片,每个 shard 分片本质上就是一个独立的 lucene 库。这样就可以将读写操作分摊到多个 分片 中去,大大降低了争抢,提升了系统性能。
②高扩展性
随着 分片 变多,如果 分片 都在同一台机器上的话,就会导致单机 cpu 和内存过高,影响整体系统性能。
在ES中,可以使用更多的机器,将 分片 分散部署在多台机器上,这每一台机器,就是一个 Node。通过增加 Node 缓解机器 cpu 过高带来的性能问题。
③高可用
高可用问题基本都是通过副本解决。ES中也是一样, 通过给 分片 多加几个副本。将 分片 分为 Primary shard 和 Replica shard,也就是主分片和副本分片 。主分片会将数据同步给副本分片,副本分片既可以同时提供读操作,还能在主分片挂了的时候,升级成新的主分片让系统保持正常运行,提高性能的同时,还保证了系统的高可用。
7、ES架构
从架构角度来看,ES给了一套方案,让一个单机系统 lucene 变成一个高性能、高扩展、高可用的分布式系统。
在ES集群中,分为三类node角色。
- 主节点(Master Node), 负责管理集群。
- 协调节点(Coordinate Node),负责存储管理数据。
- 数据节点(Data Node),负责接受客户端请求。
集群规模小的时候,一个 Node 可以同时充当多个角色,随着集群规模变大,可以让一个 Node 一个角色。
ES集群中的节点之间基于类似一致性算法 Raft 的方式,在节点间互相同步数据,让所有 Node 看到的集群数据状态都是一致的。这样,集群内的 Node 就能参与选主过程,还能了解到集群内某个 Node 是不是挂了等信息。
- ES架构 V.S Kafka架构 V.S RocketMQ架构
确实:很多优秀的开源项目架构都是相似的。
架构 | ES | Kafka | RocketMQ |
---|---|---|---|
消息分类 | index name | topic | topic |
数据分片 | Shard | Partition | msgqueue |
节点 | node | broker | broker |
高可用 | 多副本 | 多副本 | master-slave |
数据一致性 | 类似Raft协议 | ISR和ack机制 | Raft协议 |
元数据 | coordinate node | zookeeper | nameserver |
8、ES写入流程
ES 对外提供 http
接口,任何语言的客户端都可以通过 HTTP 接口接入 es,实现对数据的增删改查。
- 当客户端应用发起数据写入请求,请求会先发到集群中协调节点。
- 协调节点根据 hash 路由,判断数据该写入到哪个数据节点里的哪个分片(Shard),找到主分片并写入。分片底层是 lucene,所以最终是将数据写入到 lucene 库里的 segment 内,将数据固化为倒排索引和 Stored Fields 以及 Doc Values 等多种结构。
- 主分片 写入成功后会将数据同步给 副本分片。
- 副本分片 写入完成后,主分片会响应协调节点一个 ACK,意思是写入完成。
- 最后,协调节点响应客户端应用写入完成。
9、ES搜索流程
ES 的搜索流程分为两个阶段:分别是查询阶段(Query Phase)和获取阶段(Fetch Phase)
Query Phase:
- 当客户端应用发起搜索请求,请求会先发到集群中的协调节点。
- 协调节点根据 index name 的信息,可以了解到 index name 被分为了几个 分片,以及这些分片 分散哪个数据节点上,将请求转发到这些数据节点的 分片 上面。
- 搜索请求到达分片后,分片 底层的 lucene 库会并发搜索多个 segment,利用每个 segment 内部的倒排索引获取到对应文档 id,并结合 doc values 获得排序信息。分片将结果聚合返回给协调节点。
- 协调节点对多个分片中拿到的数据进行一次排序聚合,舍弃大部分不需要的数据。
Fetch Phase:
- 协调节点再次拿着文档 id 请求数据节点里的 分片,分片 底层的 lucene 库会从 segment 内的 Stored Fields 中取出完整文档内容,并返回给协调节点。
- 协调节点最终将数据结果返回给客户端。完成整个搜索过程。
四、如何使用ElasticSearch
1、安装部署
通过docker-compose一键部署一套es+kibana容器服务。【第七章节有详细部署步骤】
vim docker-compose-es.yml
version: '3'
services:
#elasticsearch服务
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.13.4
container_name: elasticsearch
user: root
environment:
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- TZ=Asia/Shanghai
volumes:
- ./data/elasticsearch/data:/usr/share/elasticsearch/data
restart: always
ports:
- 9200:9200
- 9300:9300
networks:
- looklook_net
#查看elasticsearch数据
kibana:
image: docker.elastic.co/kibana/kibana:7.13.4
container_name: kibana
environment:
- elasticsearch.hosts=http://elasticsearch:9200
- TZ=Asia/Shanghai
restart: always
networks:
- looklook_net
ports:
- "5601:5601"
depends_on:
- elasticsearch
networks:
looklook_net:
driver: bridge
ipam:
config:
- subnet: 172.16.0.0/16
- 一键启动
docker-compose -f docker-compose-es.yml up -d
2、CRUD操作
有多种方式可以对ES进行CRUD操作,比如:shell命令行操作、kibana界面操作、基于各种开发语言的客户端调用操作等等。这些操作的本质都一样:即通过编写组装DSL语句,基于Restful请求通过http接口调用发送给ES服务端。
① index索引操作
-
查看当前节点的所有 Index
# 如何是shell命令操作 curl -X GET localhost:9200/_cat/indices?v curl -X GET http://localhost:9200/_cat/indices?v curl -X GET "http://localhost:9200/_cat/indices?v" # 等同于在kibana界面操作 GET /_cat/indices?v
可以看出shell命令不大方便,所有基本都推荐使用 Kibana。后面统一使用Kibana编写DSL的方式来演示。
-
创建Index
PUT /mybook { "mappings": { "dynamic": false, "properties": { "id": { "type": "keyword" //使用 keyword 类型,适合精确匹配 }, "title": { "type": "text", // 使用 text 类型,支持全文搜索 "analyzer": "standard" // 使用标准分析器 }, "isbn": { "type": "text" }, "author": { "type": "text" }, "created_at": { "type": "date", // 使用 date 类型,处理日期和时间 "format": "yyyy-MM-dd'T'HH:mm:ss" // 日期格式 }, "publisherName": { "type": "text" } } } }
常见的mapping属性包括:
-
type:字段数据类型,常见的简单类型有:
-
- 字符串:text(可分词的文本)、keyword(精确值,例如:唯一id、品牌、国家、ip地址)
- 数值:long、integer、short、byte、double、float
- 布尔:boolean
- 日期:date
- 对象:object
-
index:是否创建索引,默认为true
-
analyzer:使用哪种分词器
-
properties:该字段的子字段
-
-
删除Index
DELETE /mybook
-
修改Index
索引库一旦创建,无法修改mapping。这是因为虽然倒排索引结构并不复杂,但是一旦索引数据结构有改变(比如改变了分词器),就需要重新创建倒排索引。 因此索引修改只允许添加新的字段到mapping中,因为不会对倒排索引产生影响。
PUT /mybook/_mapping { "properties" : { "description" : { "type" : "text" } } }
-
查询Index
GET /mybook # 等同于 curl -X GET localhost:9200/mybook curl -X GET localhost:9200/mybook?pretty=true
② 文档操作
索引创建完毕之后,可以开始文档的操作。
## 创建索引
PUT /books
## 查看索引
GET /books
## 查看索引映射【这时索引映射是空的】
GET /books/_mapping
## 创建文档
POST /books/_doc
{
"name": "Snow Crash liuwen",
"author": "Neal Stephenson",
"release_date": "1992-06-01",
"page_count": 470
}
## 查看索引映射【索引映射是可以根据文档自动生成的】
GET /books/_mapping
POST /books/_doc
{
"name": "The Great Gatsby",
"author": "F. Scott Fitzgerald",
"release_date": "1925-04-10",
"page_count": 180,
"language": "EN"
}
GET /books/_mapping
POST /_bulk
{ "index" : { "_index" : "books" } }
{"name": "Revelation Space", "author": "Alastair Reynolds", "release_date": "2000-03-15", "page_count": 585}
{ "index" : { "_index" : "books" } }
{"name": "1984", "author": "George Orwell", "release_date": "1985-06-01", "page_count": 328}
{ "index" : { "_index" : "books" } }
{"name": "Fahrenheit 451", "author": "Ray Bradbury", "release_date": "1953-10-15", "page_count": 227}
{ "index" : { "_index" : "books" } }
{"name": "Brave New World", "author": "Aldous Huxley", "release_date": "1932-06-01", "page_count": 268}
{ "index" : { "_index" : "books" } }
{"name": "The Handmaids Tale", "author": "Margaret Atwood", "release_date": "1985-06-01", "page_count": 311}
## 全部查询
GET /books/_search
## 模糊查询
GET books/_search
{
"query": {
"match": {
"name": "Brave"
}
}
}
## 支持排序、limit查询
GET /books/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"page_count": {
"order": "desc"
}
}
],
"size": 3
}
五、MySQL数据导入到ElasticSearch的4种方案
1、同步双写
这是一种最为简单的方式,在将数据写到mysql时,同时将数据写到ES。
-
优点:
1、业务逻辑简单;
2、实时性高。 -
缺点:
1、 硬编码,有需要写入mysql的地方都需要添加写入ES的代码;
2、 业务强耦合;
3、 存在双写失败丢数据风险;
4、 性能较差:本来mysql的性能不是很高,再加一个ES,系统的性能必然会下降。 -
附:
上面说的双写失败风险,包括以下几种:
1) ES系统不可用;
2) 程序和ES之间的网络故障;
3) 程序重启,导致系统来不及写入ES等。
针对这种情况,有数据强一致性要求的,就必须双写放到事务中来处理,而一旦用上事物,则性能下降更加明显。
2、异步双写
针对多数据源写入的场景,可以借助MQ实现异步的多源写入,这种情况下各个源的写入逻辑互不干扰,不会由于单个数据源写入异常或缓慢影响其他数据源的写入,虽然整体写入的吞吐量增大了,但是由于MQ消费是异步消费,所以不适合实时业务场景。
-
优点:
1、性能高;
2、不易出现数据丢失问题,主要基于MQ消息的消费保障机制,比如ES宕机或者写入失败,还能重新消费MQ消息;
3、多源写入之间相互隔离,便于扩展更多的数据源写入。 -
缺点:
1、硬编码问题,接入新的数据源需要实现新的消费者代码;
3、系统复杂度增加:引入了消息中间件;
4、可能出现延时问题:MQ是异步消费模型,用户写入的数据不一定可以马上看到,造成延时。
3、基于MySQL表定时同步
上面两种方案中都存在硬编码问题,也就是有任何对mysq进行增删改查的地方要么植入ES代码,要么替换为MQ代码,代码的侵入性太强。
如果对实时性要求不高的情况下,可以考虑用定时器来处理,具体步骤如下:
1、数据库的相关表中增加一个字段为timestamp的字段,任何crud操作都会导致该字段的时间发生变化;
2、原来程序中的CURD操作不做任何变化;
3、增加一个定时器程序,让该程序按一定的时间周期扫描指定的表,把该时间段内发生变化的数据提取出来;
4、逐条写入到ES中。
典型实现案例——logstash
实现数据同步,其底层实现原理就是根据配置定期使用sql查询新增的数据写入ES中,实现数据的增量同步。
- 优点:
1、不改变原来代码,没有侵入性、没有硬编码;
2、没有业务强耦合,不改变原来程序的性能;
3、Worker代码编写简单不需要考虑增删改查。 - 缺点:
1、时效性较差,由于是采用定时器根据固定频率查询表来同步数据,尽管将同步周期设置到秒级,也还是会存在一定时间的延迟;
2、对数据库有一定的轮询压力,一种改进方法是将轮询放到压力不大的从库上。
4、基于Binlog日志实时同步
上面三种方案要么有代码侵入,要么有硬编码,要么有延迟,那么有没有一种方案既能保证数据同步的实时性又没有代入侵入呢?
当然有,可以利用mysql的binlog来进行同步。其实现原理如下:
具体步骤如下:
1) 读取mysql的binlog日志,获取指定表的日志信息;
2) 将读取的信息转为MQ;
3) 编写一个MQ消费程序;
4) 不断消费MQ,每消费完一条消息,将消息写入到ES中。
- 优点:
1、没有代码侵入、没有硬编码;
2、原有系统不需要任何变化,没有感知;
3、性能高;
4、业务解耦,不需要关注原来系统的业务逻辑。 - 缺点:
1、构建Binlog系统复杂;
2、如果采用MQ消费解析的binlog信息,也会像方案二一样存在MQ延时的风险。
业界目前较为流行的实现方案——使用Canal实时同步MySQL数据到ElasticSearch。【见第六节】
总结:上述四种同步方案总结
- 1、同步双写是最简单的同步方式,能最大程度保证数据同步写入的实时性,最大的问题是代码侵入性太强。
- 2、异步双写引入了消息中间件,由于MQ都是异步消费模型,所以可能出现数据同步延迟的问题。好处是在大规模消息同步时吞吐量更、高性能更好,便于接入更多的数据源,且各个数据源数据消费写入相互隔离互不影响。
- 3、基于Mysql表定时扫描同步 ,原理是通过定时器定时扫描表中的增量数据进行数据同步,不会产生代码侵入,但由于是定时扫描同步,所以也会存在数据同步延迟问题,典型实现是采用 Logstash 实现增量同步。
- 4、基于Binlog实时同步 ,原理是通过监听Mysql的binlog日志进行增量同步数据。不会产生代码侵入,数据同步的实时也能得到保障,弊端是Binlog系统都较为复杂。典型实现是采用 canal 实现数据同步。
六、Canal工作原理
Canal官网地址: https://github.com/alibaba/canal
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括:
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
1、MySQL主备复制原理
- MySQL master 将数据变更写入二进制日志( binary log,其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)。
- MySQL slave 将 master 的 binary log events 通过IO线程拷贝到它的中继日志(relay log)。
- MySQL slave 通过SQL线程重放 relay log 中事件,将数据变更反映它自己的数据。
2、canal 工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave,向 MySQL master 发送dump 协议。
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal)。
- Canal 解析 binary log 对象(原始为 byte 流)。
- Canal 作为
binlog
的监听者,能够实时捕捉到这些数据变动,然后推送到下游系统。
3、canal架构
canal.deployer
作为基础,负责从数据库获取原始的变更数据,是整个流程的数据源头。它获取到的数据变更事件会传递给后续环节。canal.admin
对canal.deployer
以及整个Canal
体系进行管理和配置,通过它可以确保canal.deployer
能正确地连接数据源、按照合适的参数运行等;同时也可以对canal.adapter
相关配置进行管理,比如指定canal.adapter
要将数据同步到哪些目标端等。canal.adapter
依赖canal.deployer
解析出来的数据,然后按照既定的规则和配置,将这些数据同步到目标存储或应用中,完成数据的最终流向和落地,而它的配置和运行状态又受到canal.admin
的管控。
- “侦察兵”
canal.deployer
是Canal
项目的核心部署包,它主要负责与数据库的连接以及数据变更日志(比如 MySQL 的 binlog)的读取等基础操作。它内部包含了处理数据抓取、解析等关键逻辑的模块,是整个数据同步链路启动的源头部分。 - “控制中心”
canal.admin
是Canal
的管理端组件包。它提供了一个可视化的管理界面以及对应的管理接口,方便运维人员和开发人员对Canal
服务进行配置管理、监控等操作。比如,通过canal.admin
的界面,可以轻松地配置要连接的数据源信息(像数据库的连接地址、账号、密码等),管理不同的Canal
实例,查看各个实例的运行状态(是正常运行、还是出现了错误等),还能进行一些动态的参数调整,像是调整数据抓取的频率等。 - “数据搬运工”
canal.adapter
主要承担着将canal.deployer
抓取并解析出来的数据变更内容,适配并发送到其他目标存储或应用系统中的任务。例如,它可以把从 MySQL 数据库解析出来的表数据变更,按照特定的格式和规则,同步到 Elasticsearch 搜索引擎中,使得 Elasticsearch 中的数据能够及时更新,保持和MySQL 数据源的一致性;或者将数据同步到 Kafka 消息队列,以便后续其他系统可以从 Kafka 中消费这些变更数据做进一步处理。
七、Canal实时同步Mysql数据到ElasticSearch
要将 MySQL 数据库中表的增、删、改操作同步到 Elasticsearch 的索引,需要正确安装配置MySQL、Canal 、Canal Adapter 、 Elasticsearch 和 kibana。
1、MySQL安装部署
- 安装MySQL
参考如下链接,当前安装MySQL 5.7版本。
MySQL最全安装教程:https://blog.csdn.net/qq_41822345/article/details/117779815
- 开启binlog日志。
vim /etc/my.cnf
## 在[mysqld]模块添加如下内容
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
同步MySQL数据到ES依赖与binlog,如果MySQL没有开启binlog,或者binlog被删除。那么那些没有binlog的旧数据就没法同步到ES了,怎么办?——后面会提到:es提供了全量导入的方法。
- 重启MySQL,通过show variables like ‘%XXX%’;查看上述配置是否生效。
- 准备好同步用户。
出于数据安全考虑,需要为canal组件单独创建一个账号【给予该账号从库权限】。
#创建用户,密码自己填写,由于创建用户时默认的密码加密方式为caching_sha2_password,所以修改为mysql_native_password,否则服务端启动时可能会报错
## mysql 5.7
create user 'canal'@'%' identified with mysql_native_password by '123456';
## mysql 5.6
create user 'canal'@'%' identified by '123456';
# 给新创建账户赋予从库权限
grant select, replication slave, replication client on *.* to 'canal'@'%';
# 刷新权限
flush privileges;
- 准备好同步数据【用于Demo演示】。
## 这里准备好一个数据库canal,两张表notice和result,每张表插入5条数据。
CREATE DATABASE canal;
use canal;
DROP TABLE IF EXISTS `notice`;
CREATE TABLE `notice` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`content` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL,
`created_at` datetime NULL DEFAULT CURRENT_TIMESTAMP,
`updated_at` datetime NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 24 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
INSERT INTO `notice` VALUES (1, 'Title 1', 'Content for notice 1', '2024-12-27 14:55:01', '2024-12-27 14:55:01');
INSERT INTO `notice` VALUES (2, 'Title 2', 'Content for notice 2', '2024-12-27 14:55:01', '2024-12-27 14:55:01');
INSERT INTO `notice` VALUES (3, 'Title 3', 'Content for notice 3', '2024-12-27 14:55:01', '2024-12-27 14:55:01');
INSERT INTO `notice` VALUES (4, 'Title 4', 'Content for notice 4', '2024-12-27 14:55:01', '2024-12-27 14:55:01');
INSERT INTO `notice` VALUES (5, 'Title 5', 'Content for notice 5', '2024-12-27 14:55:01', '2024-12-27 14:55:01');
DROP TABLE IF EXISTS `result`;
CREATE TABLE `result` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` int(11) NOT NULL,
`score` decimal(5, 2) NOT NULL,
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 21 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
INSERT INTO `result` VALUES (1, 1, 95.75, '2024-08-01 10:00:00', '2024-08-01 10:00:00');
INSERT INTO `result` VALUES (2, 2, 88.50, '2024-08-01 10:05:00', '2024-08-01 10:05:00');
INSERT INTO `result` VALUES (3, 3, 76.20, '2024-08-01 10:10:00', '2024-08-01 10:10:00');
INSERT INTO `result` VALUES (4, 4, 82.00, '2024-08-01 10:15:00', '2024-08-01 10:15:00');
INSERT INTO `result` VALUES (5, 5, 91.30, '2024-08-01 10:20:00', '2024-08-01 10:20:00');
2、ElasticSearch安装部署
官网:https://www.elastic.co/cn/
ElasticSearch源码包下载官网:https://github.com/elastic/elasticsearch
ElasticSearch安装包下载官网:https://www.elastic.co/downloads/elasticsearch
- step0:先对操作系统做一些参数配置优化。
#1 设置打开的文件句柄数和线程数
vim /etc/security/limits.conf
# 添加
# soft:软限制;hard:硬限制
# nproc:单个用户可打开的进程最大数
# nofile:单个进程打开文件最大数
# as:地址空间限制(unlimited:无限)
# fsize:最大文件大小
# memlock:最大锁定内存地址空间
* soft nproc 65536
* hard nproc 65536
* soft nofile 65536
* hard nofile 65536
* - as unlimited
* - fsize unlimited
* - memlock unlimited
#2 关闭 swap 交换空间
swapoff -a && sed -i '/swap/s/^.*$/#&/' /etc/fstab
#3 设置虚拟内存大小和 TCP 超时重传次数
vim /etc/sysctl.conf
# 添加
vm.max_map_count=262144
net.ipv4.tcp_retries2=5
net.core.somaxconn = 1024
vm.overcommit_memory = 1
# 默认情况下 TCP keepalive 时间为 60 秒,超时重传 15 次。
# 使上述配置生效
sysctl -p
以版本8.17为例(当前[2025年1月2日]最新版本就是8.17)。
- step1:官网下载安装包上传到Linux服务器
# Step1:官网下载安装包上传到Linux服务器
tar -xf elasticsearch-8.17.0-linux-x86_64.tar.gz -C /usr/local/
useradd -u 9200 esuser
mkdir -p /data/elasticsearch/{data,logs,temp}
chown -R esuser:esuser /data/elasticsearch/ /usr/local/elasticsearch-8.17.0/
cd /usr/local/elasticsearch-8.17.0/
- step2:修改ES的配置文件
vim config/elasticsearch.yml
## Step2:修改配置文件,添加如下内容
cluster.name: es-dev #集群名称
node.name: es #节点名称
#######----------这个配置只需要在kibana节点安装 ,且需要使用堆栈监测功能---------#######
node.roles: [master,data,ingest, ml, transform, remote_cluster_client]
#######----------如果使用堆栈功能,需要把安全认证关闭------------------#######
path.data: /data/elasticsearch/data # 数据存储位置
path.logs: /data/elasticsearch/logs #日志存储位置
network.host: 0.0.0.0 #允许连接IP
# 允许跨域
http.port: 9200 # 网页访问端口
transport.profiles.default.port: 9300
http.cors.enabled: true
http.cors.allow-origin: "*"
http.cors.allow-headers: "*"
#http.cors.allow-methods: "GET"
cluster.initial_master_nodes: ["es"]
action.destructive_requires_name: false
discovery.seed_hosts: ["10.0.0.101:9300"] # 集群成员
#关闭安全认证配置
xpack.security.enabled: false
xpack.security.transport.ssl.enabled: false
#关闭geoip配置
ingest.geoip.downloader.enabled: false
xpack.monitoring.collection.enabled: true
- step3:修改jvm配置
vim config/jvm.options
将注释删除,改为当前自己需要的内存大小,比如当前内存是16G,所以改为4G大小 1/4即可
-Xms4g
-Xmx4g
- step4:下载中文分词器
./bin/elasticsearch-plugin install https://get.infini.cloud/elasticsearch/analysis-ik/8.17.0
#中途输入 y
#下载完成会在 plugins 目录生成analysis-ik 目录
chown -R esuser.esuser /usr/local/elasticsearch-8.17.0/
- step5:启动ES
runuser -l esuser -c "/usr/local/elasticsearch-8.17.0/bin/elasticsearch -d"
# 查看日志
tail -f /data/elasticsearch/logs/es-dev.log
- step6:验证
## 查看9200端口
curl http://127.0.0.1:9200
## 输出如下,说明启动成功
{
"name" : "es",
"cluster_name" : "es-dev",
"cluster_uuid" : "Dg6aV1E8QzetPH9vgek-zg",
"version" : {
"number" : "8.17.0",
"build_flavor" : "default",
"build_type" : "tar",
"build_hash" : "2b6a7fed44faa321997703718f07ee0420804b41",
"build_date" : "2024-12-11T12:08:05.663969764Z",
"build_snapshot" : false,
"lucene_version" : "9.12.0",
"minimum_wire_compatibility_version" : "7.17.0",
"minimum_index_compatibility_version" : "7.0.0"
},
"tagline" : "You Know, for Search"
}
3、安装kibana
kibana源码包下载官网:https://github.com/elastic/kibana
kibana安装包下载官网:https://www.elastic.co/downloads/kibana
- step1:官网下载安装包上传到Linux服务器
tar -xf kibana-8.17.0-linux-x86_64.tar.gz -C /usr/local/
cd /usr/local/kibana-8.17.0/
- step2:修改kibana配置文件
vim config/kibana.yml
server.port: 5601
server.host: "0.0.0.0"
##填本机IP或者 0.0.0.0 都可以,最好写本机IP
server.name: "kibana-dev"
## name 名称可以随便指定
### es集群配置
elasticsearch.hosts: ["http://127.0.0.1:9200"]
pid.file: /usr/local/kibana-8.17.0/kibana.pid
elasticsearch.requestTimeout: 99999
i18n.locale: "zh-CN"
#---------------------#####-------------------------
#如果高版本需要配置如下两个参数 用户名密码为 elasticsearch 安全证书用户密码
#如果没有生成证书认证,可不加如下两个参数
elasticsearch.username: "user"
elasticsearch.password: "password"
- step3:启动kibana
useradd -u 5601 kibana
chown -R kibana:kibana /usr/local/kibana-8.17.0/
nohup /usr/local/kibana-8.17.0/bin/kibana --allow-root > /var/log/kibana.log &
#查看日志
tail -f /var/log/kibana.log
- step4:验证
## 查看5601端口 或者浏览器上访问kibana页面
curl http://127.0.0.1:5601/app/home
- 准备好同步数据。
这里创建两个索引:notice和result。
GET /notice/_mapping
PUT /notice
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"id": {
"type": "keyword" // 使用 keyword 类型,适合精确匹配
},
"title": {
"type": "text", // 使用 text 类型,支持全文搜索
"analyzer": "standard" // 使用标准分析器
},
"content": {
"type": "text", // 使用 text 类型,支持全文搜索
"analyzer": "standard" // 使用标准分析器
},
"created_at": {
"type": "date", // 使用 date 类型,处理日期和时间
"format": "yyyy-MM-dd'T'HH:mm:ss" // 日期格式
},
"updated_at": {
"type": "date", // 使用 date 类型,处理日期和时间
"format": "yyyy-MM-dd'T'HH:mm:ss" // 日期格式
}
}
}
}
GET /result/_mapping
PUT /result
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"user_id": {
"type": "integer"
},
"score": {
"type": "float"
},
"created_at": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
},
"updated_at": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
}
}
}
}
POST /notice/_doc/1
{
"id": "1",
"title": "Sample Notice Title",
"content": "This is the content of the notice.",
"created_at": "2024-08-29T08:00:00",
"updated_at": "2024-08-29T08:00:00"
}
GET /notice/_search
4、Canal安装部署
Canal安装包下载官网:https://github.com/alibaba/canal
- step1:官网下载安装包上传到Linux服务器
这里选择版本1.1.7【当前[2025年1月2日]最新版本1.1.8属于 α 版本,还处于内侧阶段】
一共三个包canal.adapter-1.1.7.tar.gz、canal.admin-1.1.7.tar.gz、canal.deployer-1.1.7.tar.gz
启动顺序:canal.admin、canal.deployer、canal.adapter
mkdir -p /usr/local/canal/canal-adapter
mkdir -p /usr/local/canal/canal-admin
mkdir -p /usr/local/canal/canal-deployer
tar -xf canal.adapter-1.1.7.tar.gz -C /usr/local/canal/canal-adapter
tar -xf canal.admin-1.1.7.tar.gz -C /usr/local/canal/canal-admin
tar -xf canal.deployer-1.1.7.tar.gz -C /usr/local/canal/canal-deployer
① canal.admin 安装启动
- step2:事先准备好canal_manager库表。【sql文件在/usr/local/canal/canal-admin/conf/canal_manager.sql】
CREATE DATABASE /*!32312 IF NOT EXISTS*/ `canal_manager` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */;
USE `canal_manager`;
SET NAMES utf8;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for canal_adapter_config
-- ----------------------------
DROP TABLE IF EXISTS `canal_adapter_config`;
CREATE TABLE `canal_adapter_config` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`category` varchar(45) NOT NULL,
`name` varchar(45) NOT NULL,
`status` varchar(45) DEFAULT NULL,
`content` text NOT NULL,
`modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for canal_cluster
-- ----------------------------
DROP TABLE IF EXISTS `canal_cluster`;
CREATE TABLE `canal_cluster` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(63) NOT NULL,
`zk_hosts` varchar(255) NOT NULL,
`modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for canal_config
-- ----------------------------
DROP TABLE IF EXISTS `canal_config`;
CREATE TABLE `canal_config` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`cluster_id` bigint(20) DEFAULT NULL,
`server_id` bigint(20) DEFAULT NULL,
`name` varchar(45) NOT NULL,
`status` varchar(45) DEFAULT NULL,
`content` text NOT NULL,
`content_md5` varchar(128) NOT NULL,
`modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `sid_UNIQUE` (`server_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for canal_instance_config
-- ----------------------------
DROP TABLE IF EXISTS `canal_instance_config`;
CREATE TABLE `canal_instance_config` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`cluster_id` bigint(20) DEFAULT NULL,
`server_id` bigint(20) DEFAULT NULL,
`name` varchar(45) NOT NULL,
`status` varchar(45) DEFAULT NULL,
`content` text NOT NULL,
`content_md5` varchar(128) DEFAULT NULL,
`modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `name_UNIQUE` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for canal_node_server
-- ----------------------------
DROP TABLE IF EXISTS `canal_node_server`;
CREATE TABLE `canal_node_server` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`cluster_id` bigint(20) DEFAULT NULL,
`name` varchar(63) NOT NULL,
`ip` varchar(63) NOT NULL,
`admin_port` int(11) DEFAULT NULL,
`tcp_port` int(11) DEFAULT NULL,
`metric_port` int(11) DEFAULT NULL,
`status` varchar(45) DEFAULT NULL,
`modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for canal_user
-- ----------------------------
DROP TABLE IF EXISTS `canal_user`;
CREATE TABLE `canal_user` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`username` varchar(31) NOT NULL,
`password` varchar(128) NOT NULL,
`name` varchar(31) NOT NULL,
`roles` varchar(31) NOT NULL,
`introduction` varchar(255) DEFAULT NULL,
`avatar` varchar(255) DEFAULT NULL,
`creation_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
SET FOREIGN_KEY_CHECKS = 1;
-- ----------------------------
-- Records of canal_user
-- ----------------------------
BEGIN;
INSERT INTO `canal_user` VALUES (1, 'admin', '6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9', 'Canal Manager', 'admin', NULL, NULL, '2019-07-14 00:05:28');
COMMIT;
SET FOREIGN_KEY_CHECKS = 1;
- step3:进入到canal.admin目录。
cd /usr/local/canal/canal-admin
修改conf/application.yml内容如下:
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: 127.0.0.1:3306
database: canal_manager
## 这里需要给canal.admin服务至少写权限,为了简单,直接赋予root用户
username: root
password: 123456
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: admin
- step4:启动并验证admin服务。
## 启动
./bin/startup.sh
## 观察日志
tail -f logs/admin.log
## 访问页面【浏览器访问】
curl http://127.0.0.1:8089
# 登录:admin/123456
② canal.deployer安装启动
- ste5:进入到canal.deployer目录。
cd /usr/local/canal/canal-deployer
## 先将canal.properties备份【这里面配置太多了,实际上不需要动,放一边去】
mv conf/canal.properties conf/canal.properties.bak
## 再修改canal_local文件【这里面配置就少了很多】
mv conf/canal_local.properties conf/canal.properties
再修改conf/canal.properties内容如下:
# register ip
canal.register.ip = 127.0.0.1
# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
canal.admin.register.name =
- step6:修改conf/example/instance.properties配置
#被同步的mysql地址
canal.instance.master.address=127.0.0.1:3306
#数据库从库权限账号
canal.instance.dbUsername=canal
#数据库从库权限账号的密码
canal.instance.dbPassword=123456
#数据库连接编码
canal.instance.connectionCharset = UTF-8
#需要订阅binlog的表过滤正则表达式
canal.instance.filter.regex=.*\\..*
#这里与当前文件夹名保持一致,后面会用到
canal.mq.topic=example
- step7:上述修改不会生效,需要在canal-admin上的instance管理处进行instance的配置。如下:
- step8:启动并验证
## 启动
./bin/startup.sh
## 观察日志
tail -f logs/canal/canal.log
tail -f logs/example/example.log
## 访问页面【浏览器访问】
curl http://127.0.0.1:8089
# 登录:admin/123456
# 可以看见server管理出于启动状态
③ canal.adapter安装启动
- step9:进入到canal.adapter目录。
cd /usr/local/canal/canal-adapter
修改conf/application.yml文件内容如下:
## 源端配置
srcDataSources:
defaultDS:
url: jdbc:mysql://127.0.0.1:3306/canal?useUnicode=true
username: canal
password: 123456
## 目标端配置
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
## ...
- name: es8 ## 对应es8目录
## 注意,要加上http://
hosts: http://127.0.0.1:9200 # or http://127.0.0.1:9300
properties:
mode: rest # transport for 9300, rest for 9200
# security.auth: test:123456 # only used for rest mode
cluster.name: es-dev # 部署es时定义es集群名称
修改conf/bootstrap.yml文件内容如下:
canal:
manager:
jdbc:
url: jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
username: root
password: 123456
- step10:增加mysql表到es索引的映射配置。
conf目录下有es6,es7,es8三个目录,根据es版本选择一个目录。一张MySQL表对应一个配置文件【也可以多张表对应一个配置文件,后面会提到】。
## 先删除es8目标下的默认配置文件。
rm -rf conf/es8/*
## 新建notice表对应的配置文件
vim conf/es8/esMappingNotice.yml
## 文件内容如下:
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
_index: notice
_id: _id
_type: _doc
upsert: true
sql: "
SELECT
c.id AS _id,
c.title AS title,
c.content AS content,
DATE_FORMAT (c.created_at, '%Y-%m-%dT%H:%i:%s') AS created,
DATE_FORMAT (c.updated_at, '%Y-%m-%dT%H:%i:%s') AS updated
FROM
notice AS c
"
commitBatch: 3000
## 新建result表对应的配置文件
vim conf/es8/esMappingResult.yml
## 文件内容如下:
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
_index: result
_id: _id
_type: _doc
upsert: true
sql: "
SELECT
c.id AS _id,
c.user_id AS userid,
c.score AS score,
DATE_FORMAT (c.created_at, '%Y-%m-%dT%H:%i:%s') AS created,
DATE_FORMAT (c.updated_at, '%Y-%m-%dT%H:%i:%s') AS updated
FROM
result AS c
"
commitBatch: 3000
时间类型的表结构想要存到es中必须自定义转换器或格式化程序,将 Timestamp 转换为 Elasticsearch 支持的日期格式 , 否则导入时会报错。
- step11:启动并验证。
## 启动执行
cd /usr/local/canal/canal-adapter
chmod 777 -R conf/es8
## 启动
./bin/startup.sh
## 观察日志
tail -f logs/adapter/adapter.log
5、整体验证
- step12:全量导入验证
## 执行如下命令会进行全量导入【该操作是幂等的】
curl "localhost:8081/etl/es8/esMappingNotice.yml" -X POST
curl "localhost:8081/etl/es8/esMappingResult.yml" -X POST
## 输出:{"succeeded":true,"resultMessage":"导入ES 数据:9 条"}
## 输出:{"succeeded":true,"resultMessage":"导入ES 数据:5 条"}
- step13:增量导入验证
## 在MySQL中执行
INSERT INTO notice (id, title, content, created_at, updated_at) VALUES (30, 'New Notice', 'This is a new notice', NOW(), NOW());
## 查看日志
tail -f logs/adapter/adapter.log
## 日志输出:2025-01-03 16:58:36.800 [pool-3-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":30,"title":"New Notice","content":"This is a new notice","created_at":1735894716000,"updated_at":1735894716000}],"database":"canal","destination":"example","es":1735894716000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"notice","ts":1735894716798,"type":"INSERT"}
## 2025-01-03 16:58:36.815 [pool-3-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":30,"title":"New Notice","content":"This is a new notice","created_at":1735894716000,"updated_at":1735894716000}],"database":"canal","destination":"example","es":1735894716000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"notice","ts":1735894716798,"type":"INSERT"}
## Affected indexes: notice
- step14:查看ES中的结果
curl -X GET localhost:9200/result/_search?pretty
curl -X GET localhost:9200/notice/_search?pretty
6、启动脚本整理
# 启动ES
cd /usr/local/elasticsearch-8.17.0/
runuser -l esuser -c "/usr/local/elasticsearch-8.17.0/bin/elasticsearch -d"
# 启动kibana
cd /usr/local/kibana-8.17.0/
nohup /usr/local/kibana-8.17.0/bin/kibana --allow-root > /var/log/kibana.log &
# 启动canal-admin
cd /usr/local/canal/canal-admin
./bin/stop.sh
./bin/startup.sh
# 启动canal-deployer
cd /usr/local/canal/canal-deployer
./bin/stop.sh
./bin/startup.sh
# 启动canal-adapter
cd /usr/local/canal/canal-adapter
./bin/stop.sh
./bin/startup.sh
## 验证
curl "localhost:8081/etl/es8/esMappingNotice.yml" -X POST
curl "localhost:8081/etl/es8/esMappingResult.yml" -X POST
curl "localhost:8081/etl/es8/esMappingPatrolPlan.yml" -X POST
八、Go-Zero项目集成ElasticSearch
1、数据同步
- canal开启前的数据如何同步
canal-adapter提供一个REST接口可全量同步数据到ES,调用Client-Adapter服务的方法触发同步任务。此时,canal会先中止增量数据传输,然后同步全量数据。待全量数据同步完成后,canal会自动进行增量数据同步。
注意:如果数据是binlog开启前存在,则不可以使用此种方式
## eg:执行如下命令会进行全量导入【该操作是幂等的】
curl "localhost:8081/etl/es8/esMappingNotice.yml" -X POST
- binlog未开启前的历史数据如何同步?
因为canal是基于binlog实现全量同步的,那么未开启binlog之前的历史数据就无法被同步,这时需要将数据库中的数据导出再重新导入一遍,这样就可以生成binlog 。
2、多表连表查询
- 多张表数据同步到一个索引中
在MySQL导入ES的过程中,一张MySQL表对应ES的一个索引index,只需要配置一个对应的yml文件。多张表如何对应到一个index索引里?在我们的业务代码中,一个查询接口经常需要进行多张表的连表查询才能获取最终的结果,这种MySQL表数据如何导入到ES中?举个例子如下:
## 这种SQL语句查出来的数据如何导入到ES中?
SELECT `dcom_fo_patrol_plan`.*,`dcom_fo_patrol_template`.`template_name` FROM `dcom_fo_patrol_plan` LEFT JOIN `dcom_fo_patrol_template` ON `dcom_fo_patrol_template`.`id` = `dcom_fo_patrol_plan`.`template_id` WHERE `dcom_fo_patrol_plan`.`module_gid` = '26' AND `dcom_fo_patrol_plan`.`deleted_at` IS NULL ORDER BY `dcom_fo_patrol_plan`.`created_time` DESC LIMIT 10;
注意:在yml映射文件中,主表一定要在最左侧,从表的数据改变也会自动同步到es中!
示例:dcom_fo_patrol_template 表中的数据改变,也会自动同步到dcom_fo_patrol_plan表对应的es索引patrolplan中。
## 先在ES中创建好对应的索引patrolplan
PUT /patrolplan
{
"mappings": {
"properties": {
"planName": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"templateId": {
"type": "keyword"
},
"moduleGid": {
"type": "keyword"
},
"areaGid": {
"type": "keyword"
},
"period": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"firstTime": {
"type": "date"
},
"lastTime": {
"type": "date"
},
"execTime": {
"type": "date"
},
"createdBy": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"executor": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"executorGroup": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"status": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"executorType": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"realExecutor": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"realExecutorIds": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"uuid": {
"type": "keyword"
},
"reportReceiver": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"deletedAt": {
"type": "date"
},
"updatedTime": {
"type": "date"
},
"createdTime": {
"type": "date"
},
"templateName": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
}
}
}
}
## 增加mysql表到es索引的映射配置
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
_index: patrolplan
_id: _id
_type: _doc
upsert: true
sql: "
SELECT
pp.id AS _id,
pp.plan_name AS planName,
pp.template_id AS templateId,
pp.module_gid AS moduleGid,
pp.area_gid AS areaGid,
pp.period AS period,
pp.first_time AS firstTime,
pp.last_time AS lastTime,
pp.exec_time AS execTime,
pp.created_by AS createdBy,
pp.executor AS executor,
pp.executor_group AS executorGroup,
pp.status AS status,
pp.executor_type AS executorType,
pp.real_executor AS realExecutor,
pp.real_executor_ids AS realExecutorIds,
pp.uuid AS uuid,
pp.report_receiver AS reportReceiver,
pp.deleted_at AS deletedAt,
pp.updated_time AS updatedTime,
pp.created_time AS createdTime,
pt.template_name AS templateName
FROM
dcom_fo_patrol_plan pp
LEFT JOIN
dcom_fo_patrol_template pt
ON
pt.id=pp.template_id
"
commitBatch: 3000
# 在MySQL里修改从表
update dcom_fo_patrol_template set template_name = "test_template" where id = 10;
# 查看canal的adapter日志
tail -f logs/adapter/adapter.log
# 日志内容如下:
... ...
2025-01-08 14:46:12.325 [pool-3-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":10,"template_name":"test_template","bui_name":"","created_by":"dczhiwei","module_gid":"26","area_gid":"1171","created_u_id":"240","duration":1.0,"patrol_type":3,"uuid":0,"report_mail":"","group_id":0,"is_delete":0,"deleted_at":null,"update_time":1733451716000,"created_time":1736318771000}],"database":"canal","destination":"example","es":1736318771000,"groupId":"g1","isDdl":false,"old":[{"template_name":"123456","created_time":1736316855000}],"pkNames":["id"],"sql":"","table":"dcom_fo_patrol_template","ts":1736318772316,"type":"UPDATE"}
2025-01-08 14:46:12.343 [pool-3-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":10,"template_name":"test_template","bui_name":"","created_by":"dczhiwei","module_gid":"26","area_gid":"1171","created_u_id":"240","duration":1.0,"patrol_type":3,"uuid":0,"report_mail":"","group_id":0,"is_delete":0,"deleted_at":null,"update_time":1733451716000,"created_time":1736318771000}],"database":"canal","destination":"example","es":1736318771000,"groupId":"g1","isDdl":false,"old":[{"template_name":"123456","created_time":1736316855000}],"pkNames":["id"],"sql":"","table":"dcom_fo_patrol_template","ts":1736318772316,"type":"UPDATE"}
Affected indexes: patrolplan
... ...
# 查看ES中的搜索结果
GET /patrolplan/_search
{
"query": {
"match": {
"templateName": "test_template"
}
}
}
# 对比MySQL搜索的结果
SELECT `dcom_fo_patrol_plan`.*,`dcom_fo_patrol_template`.`template_name` FROM `dcom_fo_patrol_plan` LEFT JOIN `dcom_fo_patrol_template` ON `dcom_fo_patrol_template`.`id` = `dcom_fo_patrol_plan`.`template_id` WHERE
template_name="test_template";
3、Go-Zero框架集成ES
go-elasticsearch官网:https://github.com/elastic/go-elasticsearch
- 基于go-elasticsearch构建es-client客户端。
package es
import (
"net"
"net/http"
"strings"
"time"
es7 "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/estransport"
"github.com/zeromicro/go-zero/core/trace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
oteltrace "go.opentelemetry.io/otel/trace"
)
type (
Config struct {
Addresses []string
Username string
Password string
MaxRetries int
MaxIdleConns int // 全局最大空闲连接数
MaxConnsPerHost int // 每主机最大连接数
IdleConnTimeout time.Duration // 空闲连接超时时间
}
Es struct {
*es7.Client
}
// esTransport is a transport for elasticsearch client
esTransport struct {
baseTransport *http.Transport
}
)
func (t *esTransport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
var (
ctx = req.Context()
span oteltrace.Span
//startTime = time.Now()
propagator = otel.GetTextMapPropagator()
indexName = strings.Split(req.URL.RequestURI(), "/")[1]
tracer = trace.TracerFromContext(ctx)
)
ctx, span = tracer.Start(ctx,
req.URL.Path,
oteltrace.WithSpanKind(oteltrace.SpanKindClient),
oteltrace.WithAttributes(semconv.HTTPClientAttributesFromHTTPRequest(req)...),
)
defer func() {
//metric
//metricClientReqDur.Observe(time.Since(startTime).Milliseconds(), indexName)
//metricClientReqErrTotal.Inc(indexName, strconv.FormatBool(err != nil))
span.End()
}()
req = req.WithContext(ctx)
propagator.Inject(ctx, propagation.HeaderCarrier(req.Header))
// 调用基础 Transport 执行请求
resp, err = t.baseTransport.RoundTrip(req)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return
}
span.SetAttributes(semconv.DBSQLTableKey.String(indexName))
span.SetAttributes(semconv.HTTPAttributesFromHTTPStatusCode(resp.StatusCode)...)
span.SetStatus(semconv.SpanStatusFromHTTPStatusCodeAndSpanKind(resp.StatusCode, oteltrace.SpanKindClient))
return
}
func NewEs(conf *Config) (*Es, error) {
transport := &http.Transport{
MaxIdleConns: conf.MaxIdleConns, // 全局最大空闲连接数
MaxIdleConnsPerHost: conf.MaxConnsPerHost, // 每主机最大空闲连接数
MaxConnsPerHost: conf.MaxConnsPerHost, // 每主机最大连接数
IdleConnTimeout: conf.IdleConnTimeout, // 空闲连接超时时间
DialContext: (&net.Dialer{
Timeout: 3 * time.Second, // 建立连接超时时间
KeepAlive: time.Hour, // 保持活动连接的时间
}).DialContext,
TLSHandshakeTimeout: 10 * time.Second, // TLS 握手超时时间
}
// 自定义连接池函数
// 作用
// 1. 多节点请求分发
// 2. 负载均衡
// 3. 故障节点管理
customConnectionPoolFunc := func(addrs []*estransport.Connection, selector estransport.Selector) estransport.ConnectionPool {
// 使用 RoundRobinConnectionPool(轮询连接池)
cp, err := estransport.NewConnectionPool(addrs, selector)
if err != nil {
panic(err)
}
return cp
}
c := es7.Config{
Addresses: conf.Addresses,
Username: conf.Username,
Password: conf.Password,
MaxRetries: conf.MaxRetries,
Transport: &esTransport{baseTransport: transport},
ConnectionPoolFunc: customConnectionPoolFunc,
}
client, err := es7.NewClient(c)
if err != nil {
return nil, err
}
return &Es{
Client: client,
}, nil
}
func MustNewEs(conf *Config) *Es {
es, err := NewEs(conf)
if err != nil {
panic(err)
}
return es
}
- Demo
package es
import (
"context"
"strings"
"testing"
)
func TestElastic(t *testing.T) {
esClient := MustNewEs(&Config{
Addresses: []string{"http://10.0.0.101:9200"},
Username: "es",
Password: "123456",
})
searchResult, err := esClient.Search(
esClient.Search.WithContext(context.Background()),
esClient.Search.WithIndex("mybook"),
esClient.Search.WithBody(strings.NewReader(`{"query":{"match":{"title":"三体"}}}`)),
esClient.Search.WithPretty(),
)
if err != nil {
t.Fatal(err)
}
t.Log("searchResult: ", searchResult)
}