es 3期 第23节-运用Pipeline实现二转聚合统计
#### 1.Elasticsearch是数据库,不是普通的Java应用程序,传统数据库需要的硬件资源同样需要,提升性能最有效的就是升级硬件。
#### 2.Elasticsearch是文档型数据库,不是关系型数据库,不具备严格的ACID事务特性,任何企图直接替代严格事务性场景的应用项目都会失败!!!
#### 3.Elasticsearch原则上适合一切非事务性应用场景或能够容许一定的延迟的事务性场景;能最大限度的替代mongodb与传统关系型数据库
##### 索引字段与属性都属于静态设置,若后期变更历史数据需要重建索引才可生效
##### 对历史数据无效!!!!
##### 一定要重建索引!!!!
#### 1、Pipeline聚合介绍
### Pipeline聚合
## 概念解释
# 每个数据聚合阶段是一个环节,多个环节串联一起来,形成一个管道模型,上一个环节的聚合结果是下一个环节的输入数据
# Pipeline执行过程
# 1.每个数据聚合阶段完成之后,将本次的聚合结果传入下一个聚合阶段,直到所有聚合完成;2.基于前置的分桶聚合,之后进行管道聚合
# ES聚合本质
# 1.ES聚合本质是属于查询性的聚合,不是内存式的计算,不支持多次迭代。
# 2.只能处理数值类型
### Pipeline聚合关系
## 聚合关系类型
# Parent,基于上下级,有父子关系
# Sibling,基于相同级,有兄弟关系
### Gap间隔异常
## Gaps概念
# 1.分桶数据会产生脏数据或者缺失,遇到此种情况需要设定一种模式;
# 2.类同与bucket聚合时字段确实missing属性
# 间隔处理方式skip,跳过,默认
# insert_zeros,基于0补足
### Pipeline聚合便利
## Pipeline聚合便利
# 一次聚合,可以得到不同维度的聚合结果,相比传统统计,节约至少一次统计
#### 2、Pipeline常用聚合
### 数值pipeline聚合
## avg/sum/max/min
# 常用数值Pipeline聚合
# 1.max_bucket
# 2.avg_bucket
# 3.Sum_bucket
# 4.min_bucket
# 5.buckets_path,设定聚合直接的路径关系指向
## Pipeline聚合关系
# sibling 兄弟平等关系
## max_bucket使用
# 统计出发地国家的飞行时间stats
GET kibana_sample_data_flights/_search
{
"track_total_hits": true,
"size": 0,
"aggs": {
// 注意这个自定义名字,下面会用到
"trems_OriginCountry": {
"terms": {
"field": "OriginCountry"
},
"aggs": {
// 注意这个自定义名字,下面会用到
"stats_FlightTimeMin": {
// 注意,这里是stats
"stats": {
"field": "FlightTimeMin"
}
}
}
},
"pipeline-max-FlightTimeMin": {
"max_bucket": {
// 外层的自定义名字 加上符号'>' 内层的自定义名字和统计名
// 注意pipeline是属于那个aggs下的,从这一层开始取自定义名
// stats的max值
"buckets_path": "trems_OriginCountry>stats_FlightTimeMin.max"
}
}
}
}
# 统计max值的话,下面就不用加max了
GET kibana_sample_data_flights/_search
{
"track_total_hits": true,
"size": 0,
"aggs": {
// 注意这个自定义名字,下面会用到
"trems_OriginCountry": {
"terms": {
"field": "OriginCountry"
},
"aggs": {
// 注意这个自定义名字,下面会用到
"stats_FlightTimeMin": {
// 注意,这里换了max
"max": {
"field": "FlightTimeMin"
}
}
}
},
"pipeline-max-FlightTimeMin": {
"max_bucket": {
// 上面换了max后,结尾就不需要max了
"buckets_path": "trems_OriginCountry>stats_FlightTimeMin"
}
}
}
}
# 统计出发地国家+出发地城市的飞行时间stats,最大值
GET kibana_sample_data_flights/_search
{
"track_total_hits": true,
"size": 0,
"aggs": {
"trems_OriginCountry": {
"terms": {
"field": "OriginCountry",
"size": 10
},
"aggs": {
"trems_OriginCityName": {
"terms": {
"field": "OriginCityName",
"size": 10
},
"aggs": {
"stats_FlightTimeMin": {
"stats": {
"field": "FlightTimeMin"
}
}
}
},
"pipeline-OriginCityName-max": {
"max_bucket": {
// 注意pipeline是属于那个aggs下的,从这一层开始取自定义名
"buckets_path": "trems_OriginCityName>stats_FlightTimeMin.max"
}
}
}
}
}
}
# 统计出发地国家+出发地城市的飞行时间stats,多种
GET kibana_sample_data_flights/_search
{
"track_total_hits": true,
"size": 0,
"aggs": {
"trems_OriginCountry": {
"terms": {
"field": "OriginCountry",
"size": 10
},
"aggs": {
"trems_OriginCityName": {
"terms": {
"field": "OriginCityName",
"size": 10
},
"aggs": {
"stats_FlightTimeMin": {
"stats": {
"field": "FlightTimeMin"
}
}
}
},
// 统计出发地国家+出发地城市的飞行时间 其中的max值
"pipeline-OriginCityName-max": {
"max_bucket": {
// 注意pipeline是属于那个aggs下的,从这一层开始取自定义名
"buckets_path": "trems_OriginCityName>stats_FlightTimeMin.max"
}
},
// 统计出发地国家+出发地城市的飞行时间 其中的min值
"pipeline-OriginCityName-min": {
"min_bucket": {
// 注意pipeline是属于那个aggs下的,从这一层开始取自定义名
"buckets_path": "trems_OriginCityName>stats_FlightTimeMin.min"
}
}
}
},
// 在上面每个分组的max结果集在统计出其中的最大值
"pipeline-OriginCountry-max-FlightTimeMin": {
"max_bucket": {
"buckets_path": "trems_OriginCountry>pipeline-OriginCityName-max"
}
},
// 在上面每个分组的min结果集在统计出其中的最小值
"pipeline-OriginCountry-min-FlightTimeMin": {
"max_bucket": {
"buckets_path": "trems_OriginCountry>pipeline-OriginCityName-min"
}
}
}
}
### Stats/extended status
## 常用数值Pipeline聚合
# 1.stats bucket
# 2.extended stats bucket
## Pipeline聚合关系
# sibling 兄弟平等关系
# 扩展计算
GET kibana_sample_data_flights/_search
{
"track_total_hits": true,
"size": 0,
"aggs": {
"trems_OriginCountry": {
"terms": {
"field": "OriginCountry",
"size": 10
},
"aggs": {
"trems_OriginCityName": {
"terms": {
"field": "OriginCityName",
"size": 10
},
"aggs": {
"stats_FlightTimeMin": {
// 扩展
"extended_stats": {
"field": "FlightTimeMin"
}
}
}
},
"pipeline-OriginCityName-max": {
// 扩展
"extended_stats_bucket": {
// 注意pipeline是属于那个aggs下的,从这一层开始取自定义名
"buckets_path": "trems_OriginCityName>stats_FlightTimeMin.max"
}
}
}
}
}
}
#### 3、Pipeline高级聚合
### Cumulative sum
## 概念解释
#累计汇总,基于分桶累计值汇总
## Pipeline聚合关系
# parent 父子上下级关系
# 统计电商购物数据,统计用户月维度消费金额,并算出每个月的累加值
GET kibana_sample_data_ecommerce/_search
{
"size": 0,
"track_total_hits": true,
"aggs": {
"terms_custom_id": {
"terms": {
"field": "customer_id",
"size": 10
},
"aggs": {
"date_histogram_order_date": {
// 根据订单时间,月维度
"date_histogram": {
"field": "order_date",
"calendar_interval": "month",
"format": "yyyy-MM"
},
"aggs": {
// 自定义key
"stats_taxful_total_price": {
// 统计订单价格
"stats": {
"field": "taxful_total_price"
}
},
// 自定义key
"cumulative_sum_taxful_total_price": {
// 计算累加值
"cumulative_sum": {
"buckets_path": "stats_taxful_total_price.sum"
}
}
}
}
}
}
}
}
### 滑动窗口概念介绍
# 滑动窗口概念解释基于一定的窗口函数,计算窗口内的数据值
# 在大数据分析场景中,经常出现如Flink/Spark中所谓的实时计算其实就是基于窗口理念
# Pipeline聚合关系
# parent 父子上下级关系
## 按照5个分桶跨度,计算平均值
## 分桶序号 分桶 value 窗口数为5,平均值
# 序号-1 10 10=10/1
# 序号-2 20 15=(10+20)/2
# 序号-3 30 20=(10+20+30)/3
# 序号-4 40 25=(10+20+30+40)/4
# 序号-5 50 30=(10+20+30+40+50)/5
# # 序号-6 60 40=(20+30+40+50+60)/5
# 序号-7 70 50=(30+40+50+60+70)/5
# 序号-8 80 60=(40+50+60+70+80)/5
# 序号-9 90 70=(50+60+70+80+90)/5
# 序号-10 100 80=(60+70+80+90+100)/5
## 参数
# moving_fn,函数窗口表达式
# window:窗口大小,表示要包含多少个桶(buckets)进行计算。
# script:自定义脚本,用于定义如何计算窗口内的值。可以是简单的求和、平均值等操作。
# shift(可选):偏移量,表示窗口相对于当前桶的位置。默认为 0,即窗口从当前桶开始
# 平均值,根据订单时间月维度分组统计价格,计算2个窗口的平均值
GET kibana_sample_data_ecommerce/_search
{
"size": 0,
"track_total_hits": true,
"aggs": {
"date_histogram_order_date": {
// 根据订单时间,月维度
"date_histogram": {
"field": "order_date",
"calendar_interval": "day",
"format": "yyyy-MM-dd"
},
"aggs": {
// 自定义key
"sum_taxful_total_price": {
// 统计订单价格,
"sum": {
"field": "taxful_total_price"
}
},
// 自定义key
"moving_fn_avg_taxful_total_price": {
// 计算平均值,
"moving_fn": {
"buckets_path": "sum_taxful_total_price",
// 窗口数2
"window": 2,
// 使用脚本,使用内置的无权重平均函数
"script": "MovingFunctions.unweightedAvg(values)"
}
}
}
}
}
}
# 累加值,根据订单时间月维度分组统计价格,计算2个窗口的累加值
GET kibana_sample_data_ecommerce/_search
{
"size": 0,
"track_total_hits": true,
"aggs": {
"date_histogram_order_date": {
// 根据订单时间,月维度
"date_histogram": {
"field": "order_date",
"calendar_interval": "day",
"format": "yyyy-MM-dd"
},
"aggs": {
// 自定义key
"sum_taxful_total_price": {
// 统计订单价格,
"sum": {
"field": "taxful_total_price"
}
},
// 自定义key
"moving_fn_sum_taxful_total_price": {
// 计算平均值,
"moving_fn": {
"buckets_path": "sum_taxful_total_price",
// 窗口数2
"window": 2,
// 使用脚本,使用内置的累加函数
"script": "MovingFunctions.sum(values)"
}
}
}
}
}
}
# 自定义脚本计算方式
GET kibana_sample_data_ecommerce/_search
{
"size": 0,
"track_total_hits": true,
"aggs": {
"date_histogram_order_date": {
// 根据订单时间,月维度
"date_histogram": {
"field": "order_date",
"calendar_interval": "day",
"format": "yyyy-MM-dd"
},
"aggs": {
// 自定义key
"sum_taxful_total_price": {
// 统计订单价格,
"sum": {
"field": "taxful_total_price"
}
},
// 自定义key
"moving_fn_avg_taxful_total_price": {
// 计算累加值,,
"moving_fn": {
"buckets_path": "sum_taxful_total_price",
// 窗口数2
"window": 2,
// 使用脚本,自定义计算方式
"script": """
double sum = 0;
double weightSum = 0;
for (int i = 0; i < values.length; ++i) {
// 权重从 1 开始递增
double weight = i + 1;
sum += values[i] * weight;
weightSum += weight;
}
return sum / weightSum;
"""
}
}
}
}
}
}
### moving_percentiles
# 基于移动窗口,获取数据的百分位占比
# percentiles bucket
# 需要基于已有的分桶数据结果之后,在其中分析统计分桶的百分位占比
# Pipeline 关系
# sibling 兄弟平等关系
## 查询参数
# percentiles_bucket,百分位分桶占比管道聚合函数,擅长父子结构的分桶
# buckets_path,分桶字段指定,注意这里的路径指向
# percents,百分位段落划分。
# 每组最大值,计算数据所在的百分位
GET kibana_sample_data_ecommerce/_search
{
"size": 0,
"track_total_hits": true,
"aggs": {
"date_histogram_order_date": {
// 根据订单时间,月维度
"date_histogram": {
"field": "order_date",
"calendar_interval": "day",
"format": "yyyy-MM-dd"
},
"aggs": {
// 自定义key
"stats_taxful_total_price": {
// 统计订单价格,,
"stats": {
"field": "taxful_total_price"
}
}
}
},
// 自定义key
"percentiles_bucket_taxful_total_price": {
// 计算stats_taxful_total_price.max百分位
"percentiles_bucket": {
"buckets_path": "date_histogram_order_date>stats_taxful_total_price.max",
"percents": [
1,5,10,15,25,35,45,55,65,75,85,95,99
]
}
}
}
}
### derivative
# 计算前后桶的差值,延伸值
# 此管道函数计算,必须基于 histogram 直方图分桶
# Pipeline 关系
# parent 父子先后上下级关系
## 查询参数
# derivative,管道查询函数
# buckets_path,指定统计字段
GET kibana_sample_data_ecommerce/_search
{
"size": 0,
"track_total_hits": true,
"aggs": {
"date_histogram_order_date": {
// 根据订单时间,月维度
"date_histogram": {
"field": "order_date",
"calendar_interval": "day",
"format": "yyyy-MM-dd"
},
"aggs": {
// 自定义key
"sum_taxful_total_price": {
// 统计订单价格,
"sum": {
"field": "taxful_total_price"
}
},
// 自定义key
"derivative_taxful_total_price": {
// 计算平均值,,
"derivative": {
"buckets_path": "sum_taxful_total_price"
}
}
}
}
}
}
### Serial differencing
# 概念解释
# 基于一定的窗口,计算前后分桶数据差值,如:redis监控中,总命令数差值,默认是递增的
# Pipeline聚合关系
# sibling 兄弟平等关系
# 查询参数
# serial_diff,基于2个分桶值之间,计算前后的差值,一般用户直方图分桶聚合应用中
# buckets_path,指定字段路径
# lag,差值起始位置,默认1,表示从第2个分桶开始计算
# 计算一周的,lag=7
GET kibana_sample_data_ecommerce/_search
{
"size": 0,
"track_total_hits": true,
"aggs": {
"date_histogram_order_date": {
// 根据订单时间,月维度
"date_histogram": {
"field": "order_date",
"calendar_interval": "day",
"format": "yyyy-MM-dd"
},
"aggs": {
// 自定义key
"sum_taxful_total_price": {
// 统计订单价格,
"sum": {
"field": "taxful_total_price"
}
},
// 自定义key
"serial_diff_taxful_total_price": {
// 计算差值
"serial_diff": {
"buckets_path": "sum_taxful_total_price",
// 每周增长情况,而不是每日增长,可以通过设置 lag 参数为7来实现
"lag": 7
}
}
}
}
}
}
## derivative 与 serial_diff 区别
# derivative 主要关注相邻桶之间值的变化率,适合用来观察短期内的变化趋势。
# serial_diff 则更加专注于去除数据中的趋势成分,通过计算与过去若干个桶值的差异来进行平稳化处理,适用于需要去除长期趋势或周期性影响的情况。
### selector分桶筛选
# 概念解释
# 有些应用场景,分桶聚合之后的数据需要进行一些过滤,基于管道聚合很容易实现,有点类似与数据库中的
# group-having组合
## Pipeline 关系
# parent 父子先后上下级关系
# 查询参数
# bucket_selector,分桶数据过滤表达式入口
# buckets_path,选择字段值,可以指定多个字段,采用键值方式{key:value(字段)}
# script,脚本计算表达式,返回的必须是 bool=true/false
# 按照日期统计每日的订单销售金额,并筛选出总金额大于13000小于13500的日期
GET kibana_sample_data_ecommerce/_search
{
"size": 0,
"track_total_hits": true,
"aggs": {
"date_histogram_order_date": {
// 根据订单时间,月维度
"date_histogram": {
"field": "order_date",
"calendar_interval": "day",
"format": "yyyy-MM-dd"
},
"aggs": {
// 自定义key
"sum_taxful_total_price": {
// 统计订单价格,
"sum": {
"field": "taxful_total_price"
}
},
// 自定义key
"custom-pipeline-fliter": {
"bucket_selector": {
"buckets_path": {
"sum":"sum_taxful_total_price"
},
"script": """
params.sum>13000 && 13500>params.sum;
"""
}
}
}
}
}
}
### bucket_sort
# 分桶之后的数据结构,往往需要按照一定的统计值排序,甚至仅仅需要截取其中部分分桶结果
# Pipeline 关系
# parent 父子先后上下级关系
## 查询参数
# bucket sort,分桶结果排序管道
# sort,设定一个或多个排序值,分桶结果按照此规则排序
# from,分页起始分桶位置
# size,分页大小,单页返回分桶多少,类似与之前学习的composite 组合分桶聚合
# 返回前3
GET kibana_sample_data_ecommerce/_search
{
"size": 0,
"track_total_hits": true,
"aggs": {
"date_histogram_order_date": {
// 根据订单时间,月维度
"date_histogram": {
"field": "order_date",
"calendar_interval": "day",
"format": "yyyy-MM-dd"
},
"aggs": {
// 自定义key
"stats_taxful_total_price": {
// 统计订单价格,,
"stats": {
"field": "taxful_total_price"
}
},
// 自定义key
"custom-pipeline-sort": {
"bucket_sort": {
"from": 0,
"size": 3,
"sort": [
{
"stats_taxful_total_price.sum":{
"order":"desc"
}
}
]
}
}
}
}
}
}
#### 4、Pipeline聚合建议以及经验分享
## 是否需要 流式计算Flink/Spark?(简单的数据可以直接使用es)
# Pipeline 聚合
# https://www.elastic.co/guide/en/elasticsearch/reference/8.6/search-aggregations-pipeline.html
# movavg 聚合
# https://www.elastic.co/guide/en/elasticsearch/reference/8.6/search-aggregations-pipeline-movavg-aggregation.html
# movfn 聚合
# https://www.elastic.co/guide/en/elasticsearch/reference/8.6/search-aggregations-pipeline-movfn-aggregation.html
# bucket_script 聚合
# https://www.elastic.co/guide/en/elasticsearch/reference/8.6/search-aggregations-pipeline-bucket-script-aggregation.html