elasticsearch 使用enrich processor填充数据
文章目录
- 使用 POST 请求手动插入用户数据
- 1. 创建 Enrich Policy
- 步骤 1.1: 创建 Enrich Policy
- 步骤 1.2: 执行 Enrich Policy
- 2. 创建 Ingest Pipeline
- 步骤 2.1: 创建 Ingest Pipeline
- 步骤 2.2: 配置 Enrich Processor 参数
- 3. 使用 Ingest Pipeline
- 步骤 3.1: 使用 Pipeline 进行文档索引
- 步骤 3.2: 查询和查看结果
- 4. 更新 Enrich Policy(如果需要)
- 步骤 4.1: 更新 Enrich Policy
- 步骤 4.2: 重新执行 Enrich Policy
- 5. 清理 Enrich Policy(如果不再使用)
- 总结
在 Elasticsearch 中使用 enrich
处理器的流程包括几个主要步骤。首先,您需要定义一个 enrich policy,然后将该策略应用到 Ingest Pipeline 中,最后在数据索引过程中使用该管道来实现数据的丰富操作。下面是一个详细的流程步骤。
使用 POST 请求手动插入用户数据
POST /users/_doc/1
{
"user_id": "1234",
"user_name": "John Doe",
"email": "johndoe@example.com",
"age": 30,
"address": "123 Elm Street"
}
1. 创建 Enrich Policy
enrich policy
定义了如何从某个源索引中提取数据,并根据特定字段将其与目标文档进行匹配。这是使用 enrich
处理器的基础。
步骤 1.1: 创建 Enrich Policy
假设我们有一个 users
索引,存储了用户的详细信息(如 user_id
、user_name
、email
等),并且我们想将这些信息基于 user_id
字段丰富到其他索引中的文档。
PUT _enrich/policy/user_enrich_policy
{
"match": {
"indices": ["users"], // 数据来源索引
"match_field": "user_id", // 用于匹配的字段
"enrich_fields": ["user_name", "email", "age"] // 要从源索引中获取的字段
}
}
在这里:
indices
: 源数据所在的索引(这里是users
索引)。match_field
: 用于匹配的字段(这里是user_id
字段)。enrich_fields
: 要从源索引中获取并丰富目标文档的字段(比如user_name
、email
、age
等)。
步骤 1.2: 执行 Enrich Policy
创建了 enrich policy 后,你需要执行它,这样 Elasticsearch 就会根据 match
查询从源索引中提取数据。
POST _enrich/policy/user_enrich_policy/_execute
执行此请求后,Elasticsearch 将根据政策从 users
索引中提取数据并创建一个内存中的匹配索引。
注意:Enrich Policy 是一个异步操作,创建并执行之后,它会定期更新和同步相关的数据。
2. 创建 Ingest Pipeline
enrich
处理器是通过 Ingest Pipeline 来使用的。在这个管道中,你可以配置 enrich
处理器,它会在文档被索引时将 enrich 数据合并到目标文档中。
步骤 2.1: 创建 Ingest Pipeline
你可以创建一个使用 enrich
处理器的 Ingest Pipeline。该管道会基于文档中的 user_id
字段从 user_enrich_policy
中查找并合并信息。
PUT _ingest/pipeline/user_enrich_pipeline
{
"description": "Enrich user data from users index",
"processors": [
{
"enrich": {
"policy_name": "user_enrich_policy", // 使用的 enrich policy 名称
"field": "user_id", // 用于匹配的字段名
"target_field": "user_info" // 合并到目标文档的字段名
}
}
]
}
在这里:
policy_name
: 指定我们创建的user_enrich_policy
。field
: 目标文档中用来匹配的字段(在这里是user_id
)。target_field
: 合并后的数据将被存储在目标文档中的字段(比如user_info
)。
步骤 2.2: 配置 Enrich Processor 参数
在 enrich
处理器中,你还可以使用以下几个可选参数:
ignore_missing
: 是否忽略缺失的匹配项。默认为false
,即当没有找到匹配时,会抛出错误。如果设置为true
,则会忽略没有匹配的情况,目标字段将保持为空。max_matches
: 如果有多个匹配项,限制合并数据的数量。默认为 1。
3. 使用 Ingest Pipeline
当 enrich
处理器和管道准备好后,你可以在索引文档时使用它。
步骤 3.1: 使用 Pipeline 进行文档索引
当你将文档索引到 Elasticsearch 时,可以指定要使用的 ingest pipeline
,在该管道中,enrich
处理器会根据 user_id
字段从 users
索引获取并合并相关数据。
POST /my_index/_doc/1?pipeline=user_enrich_pipeline
{
"user_id": "1234" // 当前文档中包含 user_id,enrich 处理器会根据此字段进行匹配
}
步骤 3.2: 查询和查看结果
一旦文档被成功索引,你可以查询目标文档并查看 user_info
字段,里面会包含 users
索引中对应 user_id
的信息。
GET /my_index/_doc/1
假设 user_id
为 1234
,那么返回的文档可能如下:
{
"_id": "1",
"_source": {
"user_id": "1234",
"user_info": {
"user_name": "John Doe",
"email": "johndoe@example.com",
"age": 30
}
}
}
在这里,user_info
字段包含了来自 users
索引的用户数据(如 user_name
、email
和 age
)。
4. 更新 Enrich Policy(如果需要)
如果你需要更新 enrich policy
,可以进行相应的调整,比如添加或删除字段,修改匹配规则等。然后,重新执行 enrich policy
。
步骤 4.1: 更新 Enrich Policy
PUT _enrich/policy/user_enrich_policy
{
"match": {
"indices": ["users"],
"match_field": "user_id",
"enrich_fields": ["user_name", "email", "age", "address"] // 新增加了 address 字段
}
}
步骤 4.2: 重新执行 Enrich Policy
POST _enrich/policy/user_enrich_policy/_execute
更新后,新的
user_info
字段将包括新的数据(例如address
)。
5. 清理 Enrich Policy(如果不再使用)
当不再需要某个 enrich policy
时,可以删除它:
DELETE _enrich/policy/user_enrich_policy
这会删除 enrich policy
和关联的匹配数据。
总结
使用 enrich
处理器的完整流程包括以下几个步骤:
- 创建 Enrich Policy:定义从源索引中提取数据的规则。
- 执行 Enrich Policy:执行政策并创建匹配数据。
- 创建 Ingest Pipeline:在管道中使用
enrich
处理器,将匹配的数据合并到目标文档中。 - 使用 Pipeline 索引数据:使用指定的管道进行文档索引。
- 查询数据:查询文档,查看合并后的数据。
- 更新和清理:根据需求更新或删除 enrich policy。
通过 enrich
处理器,您可以在索引文档时将来自其他索引的相关数据动态合并,从而减少实时查询的复杂度和延迟。