如何通过 Apache Camel 将数据导入 Elasticsearch
作者:来自 Elastic Andre Luiz
使用 Apache Camel 将数据提取到 Elasticsearch 的过程将搜索引擎的稳健性与集成框架的灵活性相结合。在本文中,我们将探讨 Apache Camel 如何简化和优化将数据提取到 Elasticsearch。为了说明此功能,我们将实现一个入门应用程序,逐步演示如何配置和使用 Apache Camel 将数据发送到 Elasticsearch。
什么是 Apache Camel?
Apache Camel 是一个开源集成框架,可简化不同系统的连接,使开发人员可以专注于业务逻辑,而不必担心系统通信的复杂性。Camel 的核心概念是 “routes - 路由”,它定义了消息从源到目的地所遵循的路径,可能包括转换、验证和过滤等中间步骤。
Apache Camel 架构
Camel 使用 “components- 组件” 连接不同的系统和协议,例如数据库和消息传递服务,并使用 “endpoints- 端点” 表示消息的入口点和出口点。这些概念提供了模块化和灵活的设计,使配置和管理复杂集成变得更加容易,高效且可扩展。
使用 Elasticsearch 和 Apache Camel
我们将演示如何配置一个简单的 Java 应用程序,该应用程序使用 Apache Camel 将数据导入 Elasticsearch 集群。还将介绍使用 Apache Camel 中定义的路由在 Elasticsearch 中创建、更新和删除数据的过程。
1. 添加依赖项
配置此集成的第一步是将必要的依赖项添加到项目的 pom.xml 文件中。这将包括 Apache Camel 和 Elasticsearch 库。我们将使用新的 Java API 客户端库,因此我们必须导入 camel-elasticsearch 组件,并且版本必须与 camel-core 库相同。
如果你想使用 Java 低级 Rest 客户端,则必须使用 Elasticsearch 低级 Rest 客户端组件。
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>4.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-elasticsearch</artifactId>
<version>4.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-jackson</artifactId>
<version>4.7.0</version>
</dependency>
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>8.14.3</version>
</dependency>
2. 配置和运行 Camel 上下文
配置首先使用 DefaultCamelContext 类创建一个新的 Camel 上下文,该类是定义和执行路由的基础。接下来,我们配置 Elasticsearch 组件,这将允许 Apache Camel 与 Elasticsearch 集群交互。ESlasticsearchComponent 实例配置为连接到地址 localhost:9200,这是本地 Elasticsearch 集群的默认地址。对于需要身份验证的环境设置,你应该阅读有关如何配置组件和启用基本身份验证的文档,称为 “Configure the component and enable basic authentication - 配置组件和启用基本身份验证”。
public class ESComponent {
public static ElasticsearchComponent getInstance() {
var elasticsearch = new ElasticsearchComponent();
elasticsearch.setHostAddresses("localhost:9200");
return elasticsearch;
}
public static String getName() {
return "elasticsearch";
}
}
然后将该组件添加到 Camel 上下文中,使得定义的路由能够使用该组件在 Elasticsearch 中执行操作。
try (var context = new DefaultCamelContext()) {
context.addComponent(ESComponent.getName(), ESComponent.getInstance());
context.addRoutes(new OperationBulkRoute());
context.start();
}
随后,将路由添加到上下文中。我们将创建用于批量索引、更新和删除文档的路由。
3. 配置 Camel 路由
数据索引
我们将配置的第一个路由用于数据索引。我们将使用包含电影目录的 JSON 文件。路由将配置为读取位于 src/main/resources/movies.json 的文件,将 JSON 内容反序列化为 Java 对象,然后应用聚合策略将多条消息合并为一条,从而允许在 Elasticsearch 中进行批量操作。配置了每条消息 500 个项目的大小,即批量将一次索引 500 部电影。
路由 Elasticsearch 操作 bulk:
String URI_BULK_OPERATION = String
.format("elasticsearch://elasticsearch?operation=%s&indexName=%s",
IndexOperationConfig.BULK_OPERATION,
INDEX_NAME);
public class OperationBulkRoute extends RouteBuilder {
private static final Log log = LogFactory.getLog(OperationBulkRoute.class);
private static final int BULK_SIZE = 500;
@Override
public void configure() {
from("file:src/main/resources?fileName=movies.json&noop=true")
.routeId("route-bulk-ingest")
.unmarshal().json()
.split(body())
.aggregate(constant(true), new BulkAggregationStrategy())
.completionSize(BULK_SIZE)
.to(URI_BULK_OPERATION)
.process(exchange -> {
var body = exchange.getIn().getBody(String.class);
log.info(String.format("Response: %s", body));
})
.end();
}
}
这批文档将被发送到 Elasticsearch 的批量操作端点。这种方法可确保处理大量数据时的效率和速度。
数据更新
下一个路由是更新文档。我们在上一步中索引了一些电影,现在我们将创建新的路由,通过参考代码搜索文档,然后更新评级字段。
我们设置了一个 Camel 上下文 (DefaultCamelContext),其中注册了一个 Elasticsearch 组件,并添加了一个自定义路由 IngestionRoute。操作首先通过 ProducerTemplate 发送文档代码,然后从 direct:update-ingestion 端点启动路由。
try (var context = new DefaultCamelContext()) {
context.addComponent(ESComponent.getName(), ESComponent.getInstance());
context.addRoutes(new IngestionRoute());
context.start();
ProducerTemplate producerTemplate = context.createProducerTemplate();
producerTemplate.sendBody("direct:update-ingestion", documentCode);
Thread.sleep(5000);
}
接下来,我们有 IngestionRoute,它是此流程的输入端点。该路由执行几个流水线操作。首先,在 Elasticsearch 中进行搜索以按代码 (direct:search-by-id) 定位文档,其中 SearchByCodeProcessor 根据代码组装查询。然后,检索到的文档由 UpdateRatingProcessor 处理,它将结果转换为 Movie 对象,将电影评级(movie rating)更新为特定值,并准备将更新后的文档发送回 Elasticsearch 进行更新。
public class IngestionRoute extends RouteBuilder {
private static final Log log = LogFactory.getLog(IngestionRoute.class);
@Override
public void configure() throws Exception {
from("direct:update-ingestion")
.pipeline()
.to("direct:search-by-id")
.to(URI_SEARCH_OPERATION)
.to("direct:update-rating")
.to(URI_UPDATE_OPERATION)
.process(exchange -> {
var body = exchange.getIn().getBody(String.class);
log.info(String.format("Response: %s", body));
})
.end();
from("direct:search-by-id")
.process(new SearchByCodeProcessor());
from("direct:update-rating")
.process(new UpdateRatingProcessor());
}
}
SearchByCodeProcessor 处理器仅配置为执行搜索查询:
public class SearchByCodeProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
var code = exchange.getIn().getBody();
String query = "{\n" +
" \"query\": {\n" +
" \"term\": {\n" +
" \"code\": {\n" +
" \"value\":" + code + "\n" +
" }\n" +
" }\n" +
" }\n" +
"}";
exchange.setProperty("document_code", code);
exchange.getIn().setBody(query);
}
}
UpdateRatingProcessor 处理器负责更新评级字段。
public class UpdateRatingProcessor implements Processor {
private final ObjectMapper objectMapper;
public UpdateRatingProcessor() {
this.objectMapper = new ObjectMapper();
this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}
@Override
public void process(Exchange exchange) throws Exception {
HitsMetadata response = exchange.getIn().getBody(HitsMetadata.class);
var code = Long.parseLong(exchange.getProperty("document_code").toString());
if (response != null && response.hits() != null) {
var documents = parseToMovies(response);
var optionalMovie = documents.stream()
.filter(document -> code == (document.getSource().getCode())).findAny();
optionalMovie.ifPresent(document -> {
document.getSource().setRating(13.0);
Map<String, Object> updateMap = new HashMap<>();
updateMap.put("doc", document.getSource());
exchange.getIn().setHeader("indexId", document.getId());
exchange.getIn().setBody(updateMap);
});
}
}
数据删除
最后,配置删除文档的路由。在这里,我们将使用文档的 ID 删除文档。在 Elasticsearch 中,要删除文档,我们需要知道文档标识符、存储文档的索引并执行删除请求。在 Apache Camel 中,我们将通过创建新路由来执行此操作,如下所示。
路由从 direct:op-delete 端点开始,该端点作为入口点。当需要删除文档时,将在消息正文中收到其标识符 (_id)。然后,路由使用 simple("${body}") 将 indexId 标头设置为该标识符的值,这会从消息正文中提取 _id。
public class OperationDeleteRoute extends RouteBuilder {
private static final Log log = LogFactory.getLog(OperationDeleteRoute.class);
@Override
public void configure() {
from("direct:op-delete")
.routeId("route-delete")
.setHeader("indexId", simple("${body}"))
.to(URI_DELETE_OPERATION)
.process(exchange -> {
var body = exchange.getIn().getBody(String.class);
log.info(String.format("Response: %s", body));
})
.end();
;
}
}
String URI_DELETE_OPERATION = String
.format("elasticsearch://elasticsearch?operation=%s&indexName=%s",
IndexOperationConfig.DELETE_OPERATION,
INDEX_NAME);
最后,消息被定向到URI_DELETE_OPERATION指定的端点,该端点连接到 Elasticsearch 以执行相应索引中的文档删除操作。
现在我们已经创建了路由,我们可以创建一个 Camel 上下文(DefaultCamelContext),它被配置为包含 Elasticsearch 组件。
try (var context = new DefaultCamelContext()) {
context.addComponent(ESComponent.getName(), ESComponent.getInstance());
context.addRoutes(new OperationDeleteRoute());
context.start();
ProducerTemplate producerTemplate = context.createProducerTemplate();
producerTemplate.sendBody("direct:op-delete", documentId);
}
接下来,将 OperationDeleteRoute 类定义的删除路由(delete route)添加到上下文中。初始化上下文后,使用 ProducerTemplate 将应删除的文档的标识符传递给 direct:op-delete 端点,从而触发删除路由。
结论
Apache Camel 与 Elasticsearch 之间的集成允许实现强大而高效的数据提取,利用 Camel 的灵活性来定义可以处理不同数据操作场景(例如索引、更新和删除)的路由。通过此设置,你可以以可扩展的方式编排和自动化复杂流程,确保你的数据在 Elasticsearch 中得到有效管理。此示例演示了如何将这些工具一起使用来创建高效且适应性强的数据提取解决方案。
参考资料
- Apache Camel
- Apache Camel 架构
- 聚合 Apache Camel
- 文件组件
- Elasticsearch 组件
准备好自己尝试一下了吗?开始免费试用。
想要获得 Elastic 认证吗?了解下一期 Elasticsearch 工程师培训何时开始!
原文:https://www.elastic.co/search-labs/blog/elasticsearch-apache-camel-ingest-data