【Elasticsearch】使用游标遍历所有数据
使用 Elasticsearch 内置的游标(Scroll API)遍历索引中的所有数据
部分代码需要参考本专题的前面几个章节内容
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.Result;
import co.elastic.clients.elasticsearch._types.query_dsl.MatchAllQuery;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.json.JsonData;
// Elasticsearch Java API client used by scrollAllData for the search, scroll and clearScroll calls.
// NOTE(review): no assignment is visible in this snippet — presumably injected or initialised elsewhere; confirm.
private ElasticsearchClient esClient;
/**
 * Traverses every document of the index named by {@code req.getIndexLib()} with the
 * scroll API (match-all query, 100 documents per batch) and hands each batch to
 * {@code processHits}.
 *
 * <p>The scroll context is released in a {@code finally} block, so it is also cleared
 * when the traversal is aborted by an {@link IOException} or by an exception thrown
 * from the callback.
 *
 * @param req         request object; {@code getIndexLib()} supplies the index name and,
 *                    when {@code getColumns()} is non-null, those fields are the only
 *                    ones requested in {@code _source}
 * @param processHits callback invoked once per batch. Its return value is ignored.
 *                    It is also invoked with the final, empty batch that ends the
 *                    traversal, so it must tolerate an empty list.
 */
public void scrollAllData(EsDocReq req, Function<List<? extends Hit<? extends Map>>, Boolean> processHits) {
    String indexName = req.getIndexLib();
    log.info("遍历数据开始" + indexName);
    // Keep-alive of the scroll context; it is renewed by every scroll request below.
    final String timeout = "5m";
    // Documents are deserialized as HashMap; no throwaway instance is needed to obtain the class.
    final Class<? extends Map> docType = HashMap.class;
    String scrollId = null;
    boolean completed = false;
    try {
        SearchRequest searchRequest = SearchRequest.of(i -> {
            i.index(indexName)
                    .scroll(s -> s.time(timeout))
                    .query(MatchAllQuery.of(q -> q)._toQuery())
                    .size(100);
            if (null != req.getColumns()) {
                i.source(src -> src.filter(f -> f.includes(req.getColumns())));
            }
            return i;
        });
        SearchResponse<? extends Map> searchResponse = esClient.search(searchRequest, docType);
        scrollId = searchResponse.scrollId();
        List<? extends Hit<? extends Map>> hits = searchResponse.hits().hits();
        processHits.apply(hits);
        while (!hits.isEmpty()) {
            // Lambdas may only capture effectively-final locals, hence the per-iteration copy.
            final String currentScrollId = scrollId;
            ScrollRequest scrollRequest = ScrollRequest.of(i -> i
                    .scrollId(currentScrollId)
                    .scroll(s -> s.time(timeout)));
            ScrollResponse<? extends Map> scrollResponse = esClient.scroll(scrollRequest, docType);
            scrollId = scrollResponse.scrollId();
            hits = scrollResponse.hits().hits();
            processHits.apply(hits);
        }
        completed = true;
    } catch (IOException e) {
        log.error("遍历数据失败" + indexName, e);
    } finally {
        // Always release the server-side scroll context, even after a failure.
        boolean cleared = clearScrollQuietly(scrollId);
        if (completed && cleared) {
            log.info("遍历数据结束");
        }
    }
}

/**
 * Releases the given scroll context; failures are logged, never thrown, so that a
 * cleanup problem cannot mask an exception already propagating from the traversal.
 *
 * @param scrollId id of the scroll context, or {@code null} when none was opened
 * @return {@code true} only if the clear-scroll call reported success
 */
private boolean clearScrollQuietly(String scrollId) {
    if (scrollId == null) {
        return false;
    }
    try {
        ClearScrollRequest clearScrollRequest = ClearScrollRequest.of(i -> i.scrollId(scrollId));
        ClearScrollResponse clearScrollResponse = esClient.clearScroll(clearScrollRequest);
        return clearScrollResponse.succeeded();
    } catch (IOException | RuntimeException e) {
        log.error("清理游标失败", e);
        return false;
    }
}
1、EsDocReq 为自定义的请求对象
2、Function<List<? extends Hit<? extends Map>>, Boolean> processHits 用于实际遍历中处理读取的数据。【根据需要自定义函数,并以参数传递】
3、timeout = "5m"; 表示游标上下文的保活时间(keep-alive)为5分钟:每次 scroll 请求都会重新计时,因此只需覆盖处理单批数据所需的时间,而不是整个遍历的总耗时
4、 i.source 表示需要返回的指定字段
5、结尾需要清理游标 ClearScrollRequest
以下是 调用函数说明示例:
// Example call: build the request, define the per-batch callback, then start the traversal.
// NOTE(review): the request is passed as constructed here; presumably the index name
// (and optional columns) must be set on it first — confirm against EsDocReq.
EsDocReq req = new EsDocReq();
Function<List<? extends Hit<? extends Map>>, Boolean> batchHandler = batch -> {
    for (Hit<? extends Map> doc : batch) {
        try {
            // Document id of the current hit.
            String docId = doc.id();
            // TODO actual business logic
        } catch (Exception ex) {
            // A failing document is logged and skipped; the rest of the batch continues.
            log.error(ex.getMessage(), ex);
        }
    }
    return true;
};
scrollAllData(req, batchHandler);