es 3期 第25节-运用Rollup减少数据存储
#### 1.Elasticsearch是数据库,不是普通的Java应用程序,传统数据库需要的硬件资源同样需要,提升性能最有效的就是升级硬件。
#### 2.Elasticsearch是文档型数据库,不是关系型数据库,不具备严格的ACID事务特性,任何企图直接替代严格事务性场景的应用项目都会失败!!!
#### 3.Elasticsearch原则上适合一切非事务性应用场景或能够容许一定的延迟的事务性场景;能最大限度的替代mongodb与传统关系型数据库
##### 索引字段与属性都属于静态设置,若后期变更历史数据需要重建索引才可生效
##### 对历史数据无效!!!!
##### 一定要重建索引!!!!
#### 1、Materialized view 概念介绍
### 概念介绍
## 物化视图名词
# Materialized view
## 物化视图概念介绍
# 将原始数据按照一定规则提前计算好,并存储起来,供其它应用使用,达到高效查询分析的需求
# join/aggs
## 物化数据需求背景
# 原始数据量过大,转换压缩后,节约存储空间,时序数据;
# 聚合压缩后,数据统计检索更快:
## 行业数据产品
# MMP产品: Clickhosue、Greenplum、dorisdb等
### 物化视图技术便利
## 技术便利
# 原有数据压缩,需要自己开发三方应用,或借助三方产品如Spark/Hive,将数据明细聚合之后,存储到ES中,便于查询,需要很多开发工作;
# 2.ES提供的Rollup/Transforms,可以直接将原始数据进行实时的转换,存储到新的索引空间,查询检索更加高效便捷。
### ES物化视图能力
## 自有能力
# 1.Rollup,数据上卷
# 2.Transformer,数据聚合
# 3.自有能力仅限能处理单索引,不能跨多个索引关联
## 外部能力
# 1.基于Spark/Hive/Prestodb查询聚合能力,将多个索引数据关联合并到单个大宽表索引
#### 2、Rollup应用实战
### Rollup概念
# 1.数据上卷,基于时间维度,将原有细粒度时间的数据按照粗一点的粒度汇总,并移除原有的数据;节约存储空间,应用在时序数据领域;
# 2.大数据广告应用产品:Apache Druid
### Rollup应用场景
# 应用场景
# 1.基于时间维度提取预聚合数据,典型的时序数据领域。
# 2.业务数据具备时序特征,都可以运用。
# 3.物联网IOT
# 4.监控系统
# 5.其它
### Rollup接口API
# 1.创建API
# 2.管理维护API3.查询API
## /job/
# PUT /_rollup/job/<job_id>: Create a rollup job
# GET /_rollup/job: List rollup jobs
# GET /_rollup/job/<job_id>: Get rollup job details
# POST /_rollup/job/<job_id>/_start: Start a rollup job
# POST /_rollup/job/<job_id>/_stop: Stop a rollup job
# DELETE /_rollup/job/<job_id>: Delete a rollup job
## /data/
# GET /_rollup/data/<index_pattern>/_rollup_caps: Get Rollup CapabilitiesGET/<index_name>/_rollup/data/: Get Rollup Index Capabilities
## /<index_name>/
# GET/<index_name>/_rollup_search: Search rollup data
# aggs聚合,天维度统计每个洲的销售额
GET kibana_sample_data_ecommerce/_search
{
"track_total_hits": true,
"size": 0,
"aggs": {
"date_histogram_order_date": {
"date_histogram": {
"field": "order_date",
"calendar_interval": "day"
},
"aggs": {
"terms_continent_name": {
"terms": {
"field": "geoip.continent_name",
"size": 100
},
"aggs": {
"stats_taxful_total_price": {
"stats": {
"field": "taxful_total_price"
}
}
}
}
}
}
}
}
### 创建rollup
## 通用参数说明
# 创建有 2种方式,一种直接基于 Api,一种基于 Kibana 可视化
# _rollup,创建 Rollup的API接口
# cron,定时任务,cron 表达式
# groups,分组逻辑,支持多个分组分桶函数
# index_pattern,指定索引数据,可以指定多个索引,支持通配符
# metrics,指标数据,分组分桶之后聚合的数据,支持常规的指标聚合
# page_size,分页大小,数据分组时,单页大小,默认1000
# rollup_index,新创建的索引名称,聚合之后的数据从这个所有查询
# timeout,超时时间,任务执行超时时间,默认20s
## groups 参数
# 设定滚动分桶聚合函数,建议采用 date_histogram
# groups,分组逻辑,支持多个分组分桶函数
# 创建
PUT _rollup/job/job_ecommerce_001
{
// 数据来源索引
"index_pattern": "kibana_sample_data_ecommerce",
// 转入的索引
"rollup_index": "index_ecommerce_001",
// 定时
"cron": "*/5 * * * * ?",
// 每次统计多少条
"page_size": 1000,
// 分组条件
"groups": {
"date_histogram": {
"field": "order_date",
"calendar_interval": "day"
},
"terms": {
"fields": "geoip.continent_name"
}
},
// 需要统计的值
"metrics": {
// 统计字段
"field": "taxful_total_price",
// 计算方式
"metrics": [
"max",
"min",
"avg",
"sum"
]
}
}
// 查询
GET _rollup/job/job_ecommerce_001
// 启动,创建完后需要启动
POST _rollup/job/job_ecommerce_001/_start
// 暂停
POST _rollup/job/job_ecommerce_001/_stop
// 删除,需要先暂停了才能删除
DELETE _rollup/job/job_ecommerce_001
# 普通查询数据
GET index_ecommerce_001/_search
{}
# rollup 查询语法,一级分桶
GET index_ecommerce_001/_rollup_search
{
"size":0,
// 再次根据continent_name聚合
"aggs":{
"terms_continent_name":{
"terms": {
"field": "geoip.continent_name"
}
}
}
}
# rollup对比aggs聚合,结果是一样的,但是过程不一样
GET kibana_sample_data_ecommerce/_search
{
"track_total_hits": true,
"size": 0,
"aggs": {
"terms_continent_name": {
"terms": {
"field": "geoip.continent_name",
"size": 100
},
"aggs": {
"stats_taxful_total_price": {
"stats": {
"field": "taxful_total_price"
}
}
}
}
}
}
# rollup 查询语法,二级分桶
GET index_ecommerce_001/_rollup_search
{
"size":0,
"aggs":{
"terms_continent_name":{
"terms": {
"field": "geoip.continent_name"
},
"aggs": {
"max_taxful_total_price": {
"max": {
"field": "taxful_total_price"
}
},
"min_taxful_total_price": {
"min": {
"field": "taxful_total_price"
}
}
}
}
}
}
# 基于数据再次统计
GET index_ecommerce_001/_search
{
"size":1,
"aggs":{
"terms_continent_name":{
"terms": {
"field": "geoip.continent_name.terms.value",
"size": 100
},
"aggs": {
"NAME": {
"sum": {
"field": "geoip.continent_name.terms._count"
}
}
}
}
}
}
# 查看有多少个rollup任务
GET _rollup/data/_all
# 查看rollup绑定了哪个索引
GET index_ecommerce_001/_rollup/data
# 通过索引找rollup
GET _rollup/data/kibana_sample_data_ecommerce
### Rollup分桶支持函数
# 分桶聚合函数支持
# 1.分组分桶Group支持的聚合函数
### 支撑的分桶分组聚合函数:date_histogram、histogram、terms
## Rollup数值聚合计算
# 数值聚合支持函数
# 1.Metric数值聚合函数支持
## 支撑的数值聚合函数:min、max、sum、avg、value_count
### Rollup数据查询限制
# 1.支持的查询表达式与类型
# 支持查询的类型语法 说明
# Term Query 精确词项查询
# Terms Query 精确词项查询
# Range Query 范围查询
# MatchAll Query 默认查询
# Any compound query (Boolean, Boosting, ConstantScore, etc)
### Rollup其它限制
# 其它限制
# 1.一个索引同时仅一个Rollup任务支持
# 2.数据必须有时间字段,建议时支持到毫秒的
### Kibana创建示例
# Kibana创建Kibana提供了可视化,非常方便创建维护
## path:Stack Management -> Rollup Jobs
#### 3、Transforms应用实战
## Transform概念
# 概念解释
# 1.与Rollup类似,将数据转换变化,基于聚合的思维,区别在于,可以不受时间限制,主要是在已有的聚合上做一次预聚合;
# 2.自带Checkpoint机制,可实时的刷新数据。
## Transform应用场景
# 应用场景海量明细数据提前预聚合统计;
# 需要进行二次聚合的数据统计
## Transform操作API
# 操作API
# 创建Transforms
# 维护Transforms
# Create transforms
# Delete transforms
# Get transforms
# Get transforms statistics
# Preview transforms
# Start transforms
# Stop transforms
# Update transforms
### 创建Transform
## 通用参数说明
# transform,创建 Transforms接口,后面参数为任务名称
# description,任务描述,定义转换任务描述
# dest,目标索引,按照规则新生成的索引,只能设定一个索引名称
# frequency,检查原始索引数据是否变化的间隔,最小 1s,建议按照业务需求设定,实际就是定时器,与 Rollup-cron表达式有点类似
# latest,标识数据字段是否最新,
# pivot,数据转换入口参数,包括分组group_by与aggregations聚合统计2部分
# retention_policy,数据保留策略,设置 time,setting
# settings,数据转换性能限制
# source,设置需要转换的索引信息
# sync,数据同步设定
## defer_validation 参数
# 依赖索引有效性验证,取值范围 true/false,默认false
# 部分索引是动态创建,不一定立刻就有
## dest 参数
# 设定目标索引信息
# 目标索引,按照规则新生成的索引,只能设定一个索引名称dest,
# pipeline,设定数据加工管道处理函数名称
## latest 参数
# latest,标识数据字段是否最新,用于判断哪些数据是否变化,便于计算挑选
# unique_key,指定唯一标识数据字段
# sort,设定排序字段
## pivot 参数
# pivot,数据转换入口参数,包括分组group_by与aggregations聚合统计2部分
# group_by,分组分桶字段选择,内部支持多个分组分桶,函数也支持多种,与Rollup不一样
# aggs/aggregations,聚合统计的数据值,一般建议是数值统计,支持多种指标聚合函数
## retention_policy 参数
# retention_policy,设定转换后的数据有效期
# field,指定数据字段
# max_age,设定有效期,可以设置多种维度
## settings 参数
# 设定数据抽取的频率限制
# 限制数据转换对于性能影响
# settings,数据转换性能限制
# docs_per_second,控制查询数据源,单秒阀值,默认无限制,集群任务过多,考虑资源建议限制,若从原数据-次查询过度,会交叉影响原始索引别的业务应用。
# max_page_search _size,默认500,范围【10,10000】,限制组合分桶时的输入数据量,到达这个阀值就开始执行聚合逻辑。防止单次聚合原始数据过多。白话就是每次聚合时,单次数据量尽量别太多。
# dates_as_epoch_milis,设置转换的时间字段是否符合iso 标准,取值范围 true/false、默认 false
## source 参数
# 设置需要转换的索引信息
# source,设置需要转换的索引信息
# index,指定原有索引名称,可以指定多个,支持通配符,最好是同一类索引数据
# query,限定原有数据集合,满足查询过滤条件,详细参考 DSL-query部分
# runtime_mappings,设定运行时字段
## sync 参数
# 设置数据写入到新索引策略
# sync,数据同步设定
# time,时间策略
# delay,间隔写入新索引的时间,默认60s,建议设置稍微大点
# field,设定前后间隔时间对比时参考的字段数据,一般建议设定时间
### 查询案例:按照大洲区域,按照客户性别,统计订单金额数据
GET kibana_sample_data_ecommerce/_search
{
"track_total_hits": true,
"size": 1,
"aggs": {
"terms_continent_name": {
"terms": {
"field": "geoip.continent_name",
"size": 7,
"order": {
"_key": "asc"
}
},
"aggs": {
"terms_customer_gender": {
"terms": {
"field": "customer_gender",
"size": 10
},
"aggs": {
"stats_taxful_total_price": {
"stats": {
"field": "taxful_total_price"
}
}
}
}
}
}
}
}
# 创建Transform
PUT _transform/my-trans-001
{
"description": "我的转换-001",
"source": {
"index": "kibana_sample_data_ecommerce",
"query": {
"match_all": {}
}
},
"pivot": {
"group_by": {
"terms_continent_name": {
"terms": {
"field": "geoip.continent_name"
}
},
"terms_customer_gender": {
"terms": {
"field": "customer_gender"
}
}
},
"aggregations": {
"max_taxful_total_price": {
"max": {
"field": "@timestamp"
}
},
"min_taxful_total_price": {
"min": {
"field": "@timestamp"
}
},
"avg_taxful_total_price": {
"avg": {
"field": "@timestamp"
}
},
"sum_taxful_total_price": {
"sum": {
"field": "@timestamp"
}
}
}
},
"dest":{
"index":"index-trans-001"
},
"frequency":"1m",
"sync":{
"time":{
"field":"order_date",
"delay":"60s"
}
},
"settings":{
"docs_per_second":500,
"max_page_search_size":500
}
}
## 启动暂停
# transfroms与 rollup 一样,基于定时运行规则,也需要人为的启动/暂停
# start
POST _transform/my-trans-001/_start
# stop
POST _transform/my-trans-001/_stop
## 查询数据
# 查询数据,直接从新索引查询
# _search 查询,与 rollup不一样
GET index-trans-001/_search
{
}
## 管理维护
# preview
# 在线预览创建后的 transfroms生成的数据
# preview,在线预览
POST _transform/_preview
{
#此处与创建内容一样
}
## update
# 更新 transform 设置
# _update,在线更新
POST _transform/trans01/_update
{
#此处与创建内容一样
}
## DELETE
DELETE _transform/<transform_id>
DELETE _transform/my-trans-001
## 查询GET
# transforms创建后,就会存储在 ES 之中
# 运行中会产生很多统计信息
# GET transform/<transform id>
# GET transform/<transform_id>,<transform_id>
# GET transform/
# # GET transform/_all
# GET transform/*
## 状态stats
# GET transform/<transform id>/ stats
# GET transform/<transform_id>,<transform_id>/ stats
# GET transform/stats
# GET transform/_all/_stats
# GET transform/*/stats
### Transform 限制
## aggregations聚合函数支持
# 注意高级的聚合函数支持,并非所有的都支持
# 支持计算聚合函数:avg、max、min、sum、bucket_script 管道聚合、cardinality、filter、geo_bounds、geo_centroid、median_absolute_deviation、missing、percentiles、rare_terms、scripted_metric、terms、value_count、weighted_avg
## group_by分桶支持函数
# 限制条件
# 函数限制,并非所有的聚合函数都支持,支持常用的大部分;
# 支持分桶聚合函数:date_histogram、geotile_grid、histogram、terms
## 其它限制条件
# 限制条件
# 1.当前限制单集群1000个数据转换任务iob
### 转换节点设置
# ES为了大规模应用转换,启用了独立的节点角色,需要在节点启动之前配置好
# 参数设置
# 配置文件:{ES_HOME}/config/elasticsearch.yml
# node.roles:[transform]
#### 4、数据转换建议以及经验分享
## 任务数量限制
## 数据转换角色
## 可大规模运用ES分析能力,可有条件的替代市面上部分数据产品
# rollup & transform
# https://www.elastic.co/guide/en/elasticsearch/reference/8.6/data-rollup-transform.html
# rollup
# https://www.elastic.co/guide/en/elasticsearch/reference/8.6/xpack-rollup.html
# rollup-apis
# https://www.elastic.co/guide/en/elasticsearch/reference/8.6/rollup-apis.html
# transforms
# https://www.elastic.co/guide/en/elasticsearch/reference/8.6/transforms.html
# transform-checkpoints 局部更新机制
# https://www.elastic.co/guide/en/elasticsearch/reference/8.6/transform-checkpoints.html
# transform-apis
# https://www.elastic.co/guide/en/elasticsearch/reference/8.6/transform-apis.html
# transforms 节点角色
# https://www.elastic.co/guide/en/elasticsearch/reference/8.6/modules-node.html
# Apache Druid 产品
# https://druid.apache.org