ElasticSearch-Ingest Pipeline Painless Script
- Ingest Node & Pipeline & Processor
- Ingest Node
- Pipeline & Processor
- 内置的 Processors
- 创建 pipeline
- 使用 pipeline 更新数据
- 借助 update_by_query 更新已存在的文档
- Ingest Node VS Logstash
- Painless
Ingest Node & Pipeline & Processor
-
应用场景: 修复与增强写入数据
-
Ingest Node
- 默认配置下,每个节点都是 Ingest Node(Elasticsearch 5.0后)
- 具有预处理数据的能力,可拦截lndex或 Bulk API的请求
- 对数据进行转换,并重新返回给Index或 Bulk APl
- 无需Logstash,就可以进行数据的预处理
- 为某个字段设置默认值
- 重命名某个字段的字段名
- 对字段值进行Split 操作
- 支持设置Painless脚本,对数据进行更加复杂的加工
-
Pipeline & Processor
- Pipeline:管道会对通过的数据(文档),按照顺序进行加工
- Processor:Elasticsearch 对一些加工的行为进行了抽象包装
- Elasticsearch 有很多内置的Processors,也支持通过插件的方式,实现自己的Processor
-
内置的 Processors
- Split Processor : 将给定字段值分成一个数组
- Remove / Rename Processor :移除一个重命名字段
- Append : 为商品增加一个新的标签
- Convert:将商品价格,从字符串转换成float 类型
- Date / JSON:日期格式转换,字符串转JSON对象
- Date lndex Name Processor︰将通过该处理器的文档,分配到指定时间格式的索引中
- Fail Processor︰一旦出现异常,该Pipeline 指定的错误信息能返回给用户
- Foreach Process︰数组字段,数组的每个元素都会使用到一个相同的处理器
- Grok Processor︰日志的日期格式切割
- Gsub / Join / Split︰字符串替换│数组转字符串/字符串转数组
- Lowercase / upcase︰大小写转换
# 测试 split tags
POST _ingest/pipeline/_simulate
{"pipeline":{"description":"to split blog tags","processors": [
{"split":{"field":"tags","separator":","}}]},"docs": [
{"_index": "index","_id": "id","_source": {
"title": "Introducing big data......",
"tags": "hadoop,elasticsearch,spark",
"content": "You konw, for big data"}},
{"_index":"index","_id":"idxx","_source":{
"title":"Introducing cloud computering",
"tags":"openstack,k8s",
"content":"You konw, for cloud"}}]}
# 同时为文档增加一个字段
POST _ingest/pipeline/_simulate
{"pipeline":{"description":"to split blog tags","processors":[
{"split":{"field":"tags","separator":","}},
{"set":{"field":"views","value":0}}]},
"docs": [
{"_index":"index","_id":"id","_source":{
"title":"Introducing big data......",
"tags":"hadoop,elasticsearch,spark",
"content":"You konw, for big data"}},
{"_index":"index","_id":"idxx","_source":{
"title":"Introducing cloud computering",
"tags":"openstack,k8s",
"content":"You konw, for cloud"}}]}
- 创建 pipeline
# 为 ES 添加一个 Pipeline
PUT _ingest/pipeline/blog_pipeline
{"description":"a blog pipeline","processors":[
{"split":{"field":"tags","separator":","}},
{"set":{"field":"views","value":0}}]}
- 使用 pipeline 更新数据
# 使用 pipeline 更新数据
PUT tech_blogs/_doc/2?pipeline=blog_pipeline
{"title":"Introducing cloud computering",
"tags":"openstack,k8s",
"content":"You konw, for cloud"}
- 借助 update_by_query 更新已存在的文档
#增加update_by_query的条件
POST tech_blogs/_update_by_query?pipeline=blog_pipeline
{"query":{"bool":{"must_not":{"exists":{"field":"views"}}}}}
- Ingest Node VS Logstash
Logstash | Ingest Node | |
---|---|---|
数据输入与输出 | 支持从不同的数据源读取,并写 入不同的数据源 | 支持从 ES REST API 获取数据, 并且写入 Elasticsearch |
数据缓冲 | 实现了简单的数据队列,支持重写 | 不支持缓冲 |
数据处理 | 支持大量的插件,也支持定制开发 | 内置的插件,可以开发Plugin进 行扩展 (Plugin更新需要重启) |
配置和使用 | 增加了一定的架构复杂度 | 无需额外部署 |
Painless
- Painless 支持所有 Java 的数据类型及 Java API 子集
- Painless Script 具备以下特性
- 高性能/安全
- 支持显示类型或者动态定义类型
- Painless 的用途
- 可以对文档字段进行加工处理
- 更新或删除字段,处理数据聚合操作
- Script Field: 对返回的字段提前进行计算
- Function Score: 对文档的算分进行处理
- 在lngest Pipeline中执行脚本
- 在Reindex APl,Update By Query时,对数据进行处理
- 可以对文档字段进行加工处理
- 通过Painless脚本访问字段
- Ingestion:
ctx.field_name
- Update:
ctx._source.field_name
- Search & Aggregation:
doc["field_name"]
- Ingestion:
# 增加一个 Script Prcessor
POST _ingest/pipeline/_simulate
{"pipeline":{"description":"to split blog tags","processors":[
{"split":{"field":"tags","separator":","}},
{"script":{"source":"""
if(ctx.containsKey("content")){
ctx.content_length = ctx.content.length();
}else{
ctx.content_length=0;
}"""}},
{"set":{"field":"views","value":0}}]},
"docs": [{"_index":"index","_id":"id","_source":{
"title":"Introducing big data......",
"tags":"hadoop,elasticsearch,spark",
"content":"You konw, for big data"}},
{"_index":"index","_id":"idxx","_source":{
"title":"Introducing cloud computering",
"tags":"openstack,k8s",
"content":"You konw, for cloud"}}]}
PUT tech_blogs/_doc/1
{"title":"Introducing big data......",
"tags":"hadoop,elasticsearch,spark",
"content":"You konw, for big data",
"views":0}
POST tech_blogs/_update/1
{"script":{"params":{"new_views":100},
"source":"ctx._source.views += params.new_views"}}
# 查看 views 计数
POST tech_blogs/_search
# 保存脚本在 Cluster State
POST _scripts/update_views
{"script":{"lang":"painless","source":"ctx._source.views += params.new_views"}}
GET tech_blogs/_search
{"script_fields":{"rnd_views":{"script":{"lang":"painless","source":"""
java.util.Random rnd = new Random();
doc['views'].value+rnd.nextInt(1000);"""}}},
"query":{"match_all":{}}}
-
脚本缓存:脚本编译的开销较大,Elasticsearch会将脚本编译后缓存在Cache 中
- Inline scripts和 Stored Scripts都会被缓存
- 默认缓存100个脚本
-
script.cache.max_size
设置最大缓存数 -
script.cache.expire
设置缓存超时 -
script.max_compilations_rate
默认5分钟最多75次编译 (75/5m)