ElasticSearch分页查询性能及封装实现
Es的分页方式
from+size
最基本的分页方式,类似于SQL中的Limit语法:
//查询年龄在12到32之间的前15条数据
{
"query":{
"bool":{
"must":{
"range":{
"user_age":{
"gte":12,
"lte":32
}
}
}
}
},
"sort":{
"user_age":{
"order":"desc"
}
}
"from":0,
"size":15
}
与Limit一样,from+size
分页是通过设置from
参数来指定返回结果的起始位置,而size
参数来指定返回结果的数量。这里的页码在程序中需要额外进行处理一下,因为from是从0开始的,而size代表返回的条数,使用时需要对页码参数进行 :
原理
Es的查询过程如上图,即一个查询请求,在集群环境下是会被协调到各个节点中,最终落到对应索引的分片上,由每个分片进行查询,最终将数据回给主节点进行汇总。而使用from+size的分页,每个分片的处理则是这样的:
- 搜索请求通常跨越多个分片,每个分片必须将其请求的命中内容以及任何先前页面的命中内容加载到内存中。
- 对于翻页较深的页面或大量结果,这些操作会显著增加内存和 CPU 使用率,从而导致性能下降或节点故障。
例如,from=10000,size=10,需要将10010 条数据加载到内存,这通常意味着需要从多个分片中收集数据,然后在协调节点上进行合并和排序,然后经过后台处理后返回了最后 10条我们想要的数据。这个过程随着数据量的增加而变得更加复杂和资源密集,那也就意味着,越往后翻页(也就是深度翻页)需要加载的数据量越大,势必会越耗费 CPU + 内存资源,响应也会越慢。
性能
默认情况下,
from+size
的限制是 10000,这意味着from
参数加上size
参数的值不能超过 10000,这是为了避免大数据量的召回导致性能低下。如果你尝试进行深度分页,超过了这个限制,Elasticsearch 会抛出错误,提示结果窗口太大。为了解决这个问题,可以通过调整
index.max_result_window
的值来增加这个限制,但这通常不推荐,因为它会增加内存和 CPU 使用率,可能导致性能下降或节点故障
超出from+size限制报错:
"root_cause": [
{
"type": "illegal_argument_exception",
"reason": "Result window is too large, from + size must be less than or equal to: [10000] but was [10001]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting."
}
],
"type": "search_phase_execution_exception",
scroll分页
原理
当发起一个带有 scroll
参数的搜索请求时,Elasticsearch 的分片会为这次搜索创建一个上下文,然后各分片基于快照数据进行相应的查询,每轮查询结束后,会记录一个scrollId,将这批结果以及对应的scrollId返回给客户端。
此时客户端根据返回的scrollId再次发起查询,此时Es服务端会根据该scrollId,找到所属的上下文,并基于上次查询的结果的尾段进行继续查询,相比from+size的每次查询都需要重复大数据量的召回,scroll查询有效的避免了召回操作。减少了CPU、IO的消耗。
性能
相比from+size的重复大批量数据召回消耗CPU和IO,Scroll是更友好且适合大数据量的深度查询(不受制于max_result_window),但是Scroll提高性能的代价是牺牲实时性;当开始一个 scroll session 时,Elasticsearch 会创建一个索引的快照(上下文中),这个快照代表了初始化搜索请求时的索引状态。在 scroll session 的生命周期内,即使索引发生了变化(如新增、删除或更新文档),这些变化也不会反映在后续的滚动查询中。
同时,快照、上下文维护的存在必然导致需要更多的内存来支撑。
官方文档强调:不再建议使用scroll API进行深度分页。如果要分页检索超过 Top 10,000+ 结果时,推荐使用:PIT + search_after。
search_after分页
原理
search_after
是 Elasticsearch 5.0 以上版本提供的一种分页查询机制,用于解决深度分页的性能问题。它通过维护一个实时游标来避免传统 from+size
分页方式在处理大量数据时的性能损耗,也不需要像 scroll
API 那样创建和维护一个历史快照,从而减少了资源的占用。
search_after进行查询时,必须指定排序字段,它使用上一次查询的最后一个文档的排序值来获取下一页数据。不同于scroll基于快照的查询,search_after是基于实时的数据,不需要维护一个很大的快照。
性能
search_after
是一种无状态的分页方式,它不需要维护搜索上下文,因此不会占用额外的资源。每次请求都会根据上一次请求的最后一个文档的排序值来获取下一页数据,这样可以避免大量的内存消耗。search_after
提供了更好的实时性,因为它每次请求都会反映索引的最新状态。这意味着在查询过程中如果有数据的更新,这些变化会反映在分页结果中
场景及优缺点汇总
基于RestHighLevelClient的实现
RestHighLevelClient 是 Elasticsearch 的高级 Java 客户端,它提供了一套简单易用的 API 来与 Elasticsearch 服务器进行交互。
RestHighLevelClient位于org.elasticsearch.client包下,常用功能包括:
方法名称 | 入参 | 使用样例 | 备注 |
---|---|---|---|
createIndex | client : RestHighLevelClient 实例, indexName : 索引名称 | createIndex(client, "my_index"); | 创建索引 |
deleteIndex | client : RestHighLevelClient 实例, indexName : 索引名称 | deleteIndex(client, "my_index"); | 删除索引 |
index | request : IndexRequest 对象 | client.index(request, RequestOptions.DEFAULT); | 插入数据 |
get | request : GetRequest 对象 | client.get(request, RequestOptions.DEFAULT); | 根据ID获取数据 |
update | request : UpdateRequest 对象 | client.update(request, RequestOptions.DEFAULT); | 更新数据 |
delete | request : DeleteRequest 对象 | client.delete(request, RequestOptions.DEFAULT); | 根据ID删除数据 |
search | request : SearchRequest 对象 | client.search(request, RequestOptions.DEFAULT); | 搜索数据 |
scroll | request : SearchScrollRequest 对象 | client.scroll(request, RequestOptions.DEFAULT); | 滚动搜索 |
clearScroll | request : ClearScrollRequest 对象 | client.clearScroll(request, RequestOptions.DEFAULT); | 清除滚动ID |
bulk | request : BulkRequest 对象 | client.bulk(request, RequestOptions.DEFAULT); | 批量操作 |
count | request : CountRequest 对象 | client.count(request, RequestOptions.DEFAULT); | 计数查询 |
exists | request : GetRequest 对象 | client.exists(request, RequestOptions.DEFAULT); | 检查文档是否存在 |
updateByQuery | request : UpdateByQueryRequest 对象 | client.updateByQuery(request, RequestOptions.DEFAULT); | 根据查询更新数据 |
deleteByQuery | request : DeleteByQueryRequest 对象 | client.deleteByQuery(request, RequestOptions.DEFAULT); | 根据查询删除数据 |
其中 BulkRequest
、GetRequest
等参数,均为ActionRequest的子类,具体使用方式可以参考下文。
其中DSL语法可配合org.elasticsearch.search.builder包中的Builder来进行构建,eg:
public void test(){
SearchSourceBuilder searchBody = new SearchSourceBuilder()
.from(0)
.size(10)
.query(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery("user_name","张三"))
.must(QueryBuilders.termQuery("age",12)))
.sort("id", SortOrder.DESC)
.fetchSource(Arrays.asList("id","name","age").toArray(new String[0]),new String[0])
.aggregation(AggregationBuilders.terms("Test")
.field("className")
.size(15));
}
form+size分页
定义es数据实体类 DocBaseEntity<T>类:
@Data
public class DocBaseEntity<T> implements Serializable {
private String _index;
private String _type;
private String _id;
private T datas;
public DocBaseEntity(SearchHit data) {
this._index = data.getIndex();
this._type = data.getType();
this._id = data.getId();
}
public DocBaseEntity(JSONObject jsonHits){
this._index = jsonHits.getStr("_index");
this._type = jsonHits.getStr("_type");
this._id = jsonHits.getStr("_id");
}
public T getDatas(){
return datas;
}
}
查询返回实体类SearchResult<T>
@Data
public class SearchResult<T> implements Serializable {
private int total;
private List<DocBaseEntity<T>> source = new ArrayList<>();
private JSONObject aggregations;
public void addData(DocBaseEntity<T> obj){
source.add(obj);
}
public List<T> getDatas(){
return source.stream().map(DocBaseEntity::getDatas).collect(Collectors.toList());
}
public void addDatas(List<DocBaseEntity<T>> objs){
source.addAll(objs);
}
public void setTotal(Object total){
this.total = Integer.parseInt(String.valueOf(total));
}
public JSONObject toJSONObject(){
return JSONUtil.parseObj(this,true);
}
}
定义查询接口ElasticSearchActuator
public interface ElasticSearchActuator {
/**
* from+size 分页查询
* @param indexName 索引名称
* @param searchSourceBuilder 查询条件
* @param pageNo 页码
* @param pageSize 每页数量
* @param resultObj 具体目标对象
* @return SearchResult
*/
<T> SearchResult<T> fromSizeSearchElasticSearchDatas(String indexName,SearchSourceBuilder searchSourceBuilder,int pageNo,int pageSize,Class<T> resultObj);
}
from+size分页实现
@Component
@Slf4j
public class ElasticSearchActuatorImpl implements ElasticSearchActuator {
//restHighLevelClient客户端Configure相关单独编写,这里不再复述
@Autowird
private RestHighLevelClient restHighLevelClient;
private final static Integer MAX_RESULT_WINDOW = 10000;
@Override
public <T> SearchResult<T> fromSizeSearchElasticSearchDatas(String indexName,SearchSourceBuilder searchSourceBuilder,int pageNo,int pageSize,Class<T> resultObj){
SearchResult<T> resultMap = new SearchResult<T>();
//提前规避超出长度的情况
if( from+size >= MAX_RESULT_WINDOW){
log.error("XXXXXXX")
//其他操作
return null;
}
//分页参数处理
int from = (pageNo - 1) * pageSize;
searchSourceBuilder.from(from).size(pageSize);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(SearchSourceBuilder );
SearchResponse response = executSearch(searchRequest);
if(null != response){
return createSearchResult(searchResp,resultObj);
}
return resultMap;
}
/**
* 执行查询
*/
/**
* 执行查询
*/
private SearchResponse executSearch(SearchRequest searchRequest) {
SearchResponse searchResponse = null;
try{
searchResponse = restHighLevelClient.search(searchRequest,RequestOptions.DEFAULT);
}catch(Exception e){
//异常处理
}
return searchResponse;
}
/**
* 构建目标结果
* @param response 返回参数
* @param resultObj 类对象
* @param <T>
* @return
*/
private <T> SearchResult<T> createSearchResult(SearchResponse response,Class<T> resultObj){
SearchResult<T> resultMap = new SearchResult<>();
SearchHit[] datas = response.getHits().getHits();
for(SearchHit data:datas){
DocBaseEntity<T> temp = new DocBaseEntity<>(data);
temp.setDatas(JSONUtil.toBean(JSONUtil.parseObj(data.getSourceAsMap()),resultObj));
resultMap.addData(temp);
}
resultMap.setTotal(response.getHits().getTotalHits().value);
return resultMap;
}
}
scroll分页
对SearchResult<T>补充scrollId值:
@Data
public class SearchResult<T> implements Serializable {
private int total;
//scrollId
private String scrollId;
private List<DocBaseEntity<T>> source = new ArrayList<>();
private JSONObject aggregations;
public void addData(DocBaseEntity<T> obj){
source.add(obj);
}
public List<T> getDatas(){
return source.stream().map(DocBaseEntity::getDatas).collect(Collectors.toList());
}
public void addDatas(List<DocBaseEntity<T>> objs){
source.addAll(objs);
}
public void setTotal(Object total){
this.total = Integer.parseInt(String.valueOf(total));
}
public JSONObject toJSONObject(){
return JSONUtil.parseObj(this,true);
}
}
继续定义查询接口ElasticSearchActuator
public interface ElasticSearchActuator {
/**
* from+size 分页查询
* @param indexName 索引名称
* @param searchSourceBuilder 查询条件
* @param pageNo 页码
* @param pageSize 每页数量
* @param resultObj 具体目标对象
* @return SearchResult
*/
<T> SearchResult<T> fromSizeSearchElasticSearchDatas(String indexName,SearchSourceBuilder searchSourceBuilder,int pageNo,int pageSize,Class<T> resultObj);
/**
* 滚动分页查询
* @param indexName 索引
* @param searchSourceBuilder 查询体
* @param pageNo 页码
* @param pageSize 每页条数
* @param scrollId 滚动ID
* @param resultObj 目标对象
* @return SearchResult
* @param <T> T
*/
<T> SearchResult<T> scrollSearchElasticSearchDatas(String indexName,SearchSourceBuilder searchSourceBuilder,int pageNo,int pageSize,String scrollId,Class<T> resultObj);
}
实现类:
@Component
@Slf4j
public class ElasticSearchActuatorImpl implements ElasticSearchActuator {
//restHighLevelClient客户端Configure相关单独编写,这里不再复述
@Autowird
private RestHighLevelClient restHighLevelClient;
private final static Integer MAX_RESULT_WINDOW = 10000;
@Override
public <T> SearchResult<T> fromSizeSearchElasticSearchDatas(String indexName,SearchSourceBuilder searchSourceBuilder,int pageNo,int pageSize,Class<T> resultObj){
//……省略from+size查询
}
@Override
public <T> SearchResult<T> scrollSearchElasticSearchDatas(String indexName, SearchSourceBuilder searchSourceBuilder, int pageNo, int pageSize, String scrollId, Class<T> resultObj) throws IOException {
SearchRequest searchRequest = new SearchRequest(indexName);
searchSourceBuilder.size(pageSize);
//设定scroll失效时长
Scroll scroll = new Scroll(TimeValue.timeValueMinutes(3));
searchRequest.scroll(scroll);
SearchResponse searchResponse = null;
if(StringUtils.isEmpty(scrollId)){
searchResponse = executSearch(searchRequest);
String tempscrollId = searchResponse.getScrollId();
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(tempscrollId);
searchScrollRequest.scroll(scroll);
for (int i = 0; i < (pageNo -1); i++) {
searchResponse = scrollSearch(searchScrollRequest);
}
scrollId = tempscrollId;
}else {
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
searchResponse = scrollSearch(searchScrollRequest);
}
//构建结果
SearchResult<T> result = createSearchResult(searchResponse,resultObj);
result.setSrcollId(scrollId);
clearScrollSession(scrollId);
return result;
}
/**
* 滚动查询执行
* @param searchScrollRequest
* @return
*/
private SearchResponse scrollSearch(SearchScrollRequest searchScrollRequest){
SearchResponse searchResponse = null;
try{
searchResponse = restHighLevelClient.scroll(searchScrollRequest,RequestOptions.DEFAULT);
}catch(Exception e){
//异常处理
}
return searchResponse;
}
/**
* 关闭scroll
* @param scrollId
* @throws IOException
*/
private void clearScrollSession(String scrollId) throws IOException {
if (scrollId != null) {
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
ClearScrollResponse clearScrollResponse = restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
clearScrollResponse.isSucceeded();
}
}
}
注意:
使用scroll查询,如果设置的scroll超时,scroll ID会在指定的超时时间内保持活跃,这个超时时间可以通过scroll参数设置。一旦超出这个时间限制,scroll ID将失效,但是不会自动清理。为了避免资源泄露,建议在scroll使用完毕后,显式地清理scroll上下文。
这里最好建立一层缓存记录,即每次客户发来请求后,记录当次查询的scrollId序列,然后定时的释放掉缓存中不用的序列。
search_after分页
对SearchResult<T>补充sortId值:
@Data
public class SearchResult<T> implements Serializable {
private int total;
//scrollId
private String scrollId;
//sortId
private List<Object> sortId;
private List<DocBaseEntity<T>> source = new ArrayList<>();
private JSONObject aggregations;
public void addData(DocBaseEntity<T> obj){
source.add(obj);
}
public List<T> getDatas(){
return source.stream().map(DocBaseEntity::getDatas).collect(Collectors.toList());
}
public void addDatas(List<DocBaseEntity<T>> objs){
source.addAll(objs);
}
public void setTotal(Object total){
this.total = Integer.parseInt(String.valueOf(total));
}
public JSONObject toJSONObject(){
return JSONUtil.parseObj(this,true);
}
}
继续定义查询接口ElasticSearchActuator
public interface ElasticSearchActuator {
/**
* from+size 分页查询
* @param indexName 索引名称
* @param searchSourceBuilder 查询条件
* @param pageNo 页码
* @param pageSize 每页数量
* @param resultObj 具体目标对象
* @return SearchResult
*/
<T> SearchResult<T> fromSizeSearchElasticSearchDatas(String indexName,SearchSourceBuilder searchSourceBuilder,int pageNo,int pageSize,Class<T> resultObj);
/**
* 滚动分页查询
* @param indexName 索引
* @param searchSourceBuilder 查询体
* @param pageNo 页码
* @param pageSize 每页条数
* @param scrollId 滚动ID
* @param resultObj 目标对象
* @return SearchResult
* @param <T> T
*/
<T> SearchResult<T> scrollSearchElasticSearchDatas(String indexName,SearchSourceBuilder searchSourceBuilder,int pageNo,int pageSize,String scrollId,Class<T> resultObj);
/**
* aftersearch分页查询
* @param indexName 索引
* @param searchSourceBuilder 查询体dsl
* @param pageNo 页码
* @param pageSize 每页条数
* @param sortId 排序游标
* @param resultObj 目标对象
* @return SearchResult
* @param <T> T
*/
<T> SearchResult<T> afterSearchElasticSearchData(String indexName,SearchSourceBuilder searchSourceBuilder,int pageNo,int pageSize,List<Object> sortId,Class<T> resultObj);
}
实现类:
@Component
@Slf4j
public class ElasticSearchActuatorImpl implements ElasticSearchActuator {
//……其他逻辑
@Override
public <T> SearchResult<T> afterSearchElasticSearchData(String indexName, SearchSourceBuilder searchSourceBuilder, int pageNo, int pageSize, List<Object> sortId, Class<T> resultObj) {
SearchRequest searchRequest = new SearchRequest(indexName);
searchSourceBuilder.size(pageSize);
if(!CollectionUtils.isEmpty(sortId)){
searchSourceBuilder.searchAfter(sortId.toArray());
}else {
if(pageNo > 1){
//如果不携带上次排序标识,且非首页,递归查询
SearchResult<T> previousPage = afterSearchElasticSearchData(indexName,searchSourceBuilder,pageNo-1,pageSize,null,resultObj);
searchSourceBuilder.searchAfter(previousPage.getSortId().toArray());
}
searchRequest.source(searchSourceBuilder);
}
try{
SearchResponse response = executSearch(searchRequest);
SearchResult<T> rest = createSearchResult(response,resultObj);
SearchHit[] hits = response.getHits().getHits();
if(hits.length > 0){
rest.setSortId(Arrays.asList(hits[hits.length-1].getSortValues()));
}
return rest;
}catch (Exception e){
//异常处理
log.error("XXXXXX");
}
return null;
}
}