当前位置: 首页 > article >正文

ElasticSearch分页查询性能及封装实现

Es的分页方式

from+size

最基本的分页方式,类似于SQL中的Limit语法:

//查询年龄在12到32之间的前15条数据
{
  "query":{
    "bool":{
      "must":{
          "range":{
                "user_age":{
                    "gte":12,
                    "lte":32
               }
         }
       }
    }
  },
  "sort":{
     "user_age":{
        "order":"desc"
     }
  }
  "from":0,
  "size":15
}

 与Limit一样,from+size分页是通过设置from参数来指定返回结果的起始位置,而size参数来指定返回结果的数量。这里的页码在程序中需要额外进行处理一下,因为from是从0开始的,而size代表返回的条数,使用时需要对页码参数进行 :

from = (pageNo - 1) * pageSize 

 原理

 Es的查询过程如上图,即一个查询请求,在集群环境下是会被协调到各个节点中,最终落到对应索引的分片上,由每个分片进行查询,最终将数据回给主节点进行汇总。而使用from+size的分页,每个分片的处理则是这样的:

  • 搜索请求通常跨越多个分片,每个分片必须将其请求的命中内容以及任何先前页面的命中内容加载到内存中。
  • 对于翻页较深的页面或大量结果,这些操作会显著增加内存和 CPU 使用率,从而导致性能下降或节点故障。

例如,from=10000,size=10,需要将10010 条数据加载到内存,这通常意味着需要从多个分片中收集数据,然后在协调节点上进行合并和排序,然后经过后台处理后返回了最后 10条我们想要的数据。这个过程随着数据量的增加而变得更加复杂和资源密集,那也就意味着,越往后翻页(也就是深度翻页)需要加载的数据量越大,势必会越耗费 CPU + 内存资源,响应也会越慢。

性能

默认情况下,from+size 的限制是 10000,这意味着 from 参数加上 size 参数的值不能超过 10000,这是为了避免大数据量的召回导致性能低下。如果你尝试进行深度分页,超过了这个限制,Elasticsearch 会抛出错误,提示结果窗口太大。

为了解决这个问题,可以通过调整 index.max_result_window 的值来增加这个限制,但这通常不推荐,因为它会增加内存和 CPU 使用率,可能导致性能下降或节点故障

超出from+size限制报错:

"root_cause": [
      {
        "type": "illegal_argument_exception",
        "reason": "Result window is too large, from + size must be less than or equal to: [10000] but was [10001]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting."
      }
    ],
    "type": "search_phase_execution_exception",

scroll分页

原理

当发起一个带有 scroll 参数的搜索请求时,Elasticsearch 的分片会为这次搜索创建一个上下文,然后各分片基于快照数据进行相应的查询,每轮查询结束后,会记录一个scrollId,将这批结果以及对应的scrollId返回给客户端。

此时客户端根据返回的scrollId再次发起查询,此时Es服务端会根据该scrollId,找到所属的上下文,并基于上次查询的结果的尾段进行继续查询,相比from+size的每次查询都需要重复大数据量的召回,scroll查询有效的避免了召回操作。减少了CPU、IO的消耗。

性能

相比from+size的重复大批量数据召回消耗CPU和IO,Scroll是更友好且适合大数据量的深度查询(不受制于max_result_window),但是Scroll提高性能的代价是牺牲实时性;当开始一个 scroll session 时,Elasticsearch 会创建一个索引的快照(上下文中),这个快照代表了初始化搜索请求时的索引状态。在 scroll session 的生命周期内,即使索引发生了变化(如新增、删除或更新文档),这些变化也不会反映在后续的滚动查询中。

同时,快照、上下文维护的存在必然导致需要更多的内存来支撑

官方文档强调:不再建议使用scroll API进行深度分页。如果要分页检索超过 Top 10,000+ 结果时,推荐使用:PIT + search_after。

search_after分页

原理

search_after 是 Elasticsearch 5.0 以上版本提供的一种分页查询机制,用于解决深度分页的性能问题。它通过维护一个实时游标来避免传统 from+size 分页方式在处理大量数据时的性能损耗,也不需要像 scroll API 那样创建和维护一个历史快照,从而减少了资源的占用。

search_after进行查询时,必须指定排序字段,它使用上一次查询的最后一个文档的排序值来获取下一页数据。不同于scroll基于快照的查询,search_after是基于实时的数据,不需要维护一个很大的快照。 

性能

search_after 是一种无状态的分页方式,它不需要维护搜索上下文,因此不会占用额外的资源。每次请求都会根据上一次请求的最后一个文档的排序值来获取下一页数据,这样可以避免大量的内存消耗。search_after 提供了更好的实时性,因为它每次请求都会反映索引的最新状态。这意味着在查询过程中如果有数据的更新,这些变化会反映在分页结果中

场景及优缺点汇总

基于RestHighLevelClient的实现

RestHighLevelClient 是 Elasticsearch 的高级 Java 客户端,它提供了一套简单易用的 API 来与 Elasticsearch 服务器进行交互。

RestHighLevelClient位于org.elasticsearch.client包下,常用功能包括:

方法名称入参使用样例备注
createIndexclientRestHighLevelClient 实例, indexName: 索引名称createIndex(client, "my_index");创建索引 
deleteIndexclientRestHighLevelClient 实例, indexName: 索引名称deleteIndex(client, "my_index");删除索引 
indexrequestIndexRequest 对象client.index(request, RequestOptions.DEFAULT);插入数据 
getrequestGetRequest 对象client.get(request, RequestOptions.DEFAULT);根据ID获取数据 
updaterequestUpdateRequest 对象client.update(request, RequestOptions.DEFAULT);更新数据 
deleterequestDeleteRequest 对象client.delete(request, RequestOptions.DEFAULT);根据ID删除数据 
searchrequestSearchRequest 对象client.search(request, RequestOptions.DEFAULT);搜索数据 
scrollrequestSearchScrollRequest 对象client.scroll(request, RequestOptions.DEFAULT);滚动搜索 
clearScrollrequestClearScrollRequest 对象client.clearScroll(request, RequestOptions.DEFAULT);清除滚动ID 
bulkrequestBulkRequest 对象client.bulk(request, RequestOptions.DEFAULT);批量操作 
countrequestCountRequest 对象client.count(request, RequestOptions.DEFAULT);计数查询 
existsrequestGetRequest 对象client.exists(request, RequestOptions.DEFAULT);检查文档是否存在 
updateByQueryrequestUpdateByQueryRequest 对象client.updateByQuery(request, RequestOptions.DEFAULT);根据查询更新数据 
deleteByQueryrequestDeleteByQueryRequest 对象client.deleteByQuery(request, RequestOptions.DEFAULT);根据查询删除数据 

其中 BulkRequest 、GetRequest 等参数,均为ActionRequest的子类,具体使用方式可以参考下文。

其中DSL语法可配合org.elasticsearch.search.builder包中的Builder来进行构建,eg:

 public void test(){

        SearchSourceBuilder searchBody = new SearchSourceBuilder()
                .from(0)
                .size(10)
                .query(QueryBuilders.boolQuery()
                        .filter(QueryBuilders.termQuery("user_name","张三"))
                        .must(QueryBuilders.termQuery("age",12)))
                .sort("id", SortOrder.DESC)
                .fetchSource(Arrays.asList("id","name","age").toArray(new String[0]),new String[0])
                .aggregation(AggregationBuilders.terms("Test")
                        .field("className")
                        .size(15));
    }
    

form+size分页

定义es数据实体类 DocBaseEntity<T>类:

@Data
public class DocBaseEntity<T> implements Serializable {

    private String _index;

    private String _type;

    private String _id;

    private T datas;

    public DocBaseEntity(SearchHit data) {
        this._index = data.getIndex();
        this._type = data.getType();
        this._id = data.getId();
    }

    public DocBaseEntity(JSONObject jsonHits){
        this._index = jsonHits.getStr("_index");
        this._type = jsonHits.getStr("_type");
        this._id = jsonHits.getStr("_id");
    }

    public T getDatas(){
        return datas;
    }

}

查询返回实体类SearchResult<T>

@Data
public class SearchResult<T> implements Serializable {


    private int total;


    private List<DocBaseEntity<T>> source = new ArrayList<>();


    private JSONObject aggregations;


    public void addData(DocBaseEntity<T> obj){
        source.add(obj);
    }

    public List<T> getDatas(){
        return source.stream().map(DocBaseEntity::getDatas).collect(Collectors.toList());
    }

    public void addDatas(List<DocBaseEntity<T>> objs){
        source.addAll(objs);
    }


    public void setTotal(Object total){
        this.total = Integer.parseInt(String.valueOf(total));
    }


    public JSONObject toJSONObject(){
        return JSONUtil.parseObj(this,true);
    }


}

定义查询接口ElasticSearchActuator


public interface ElasticSearchActuator {

  /**
     * from+size 分页查询
     * @param indexName 索引名称
     * @param searchSourceBuilder 查询条件
     * @param pageNo 页码
     * @param pageSize 每页数量
     * @param resultObj 具体目标对象
     * @return SearchResult
     */
    <T> SearchResult<T> fromSizeSearchElasticSearchDatas(String indexName,SearchSourceBuilder searchSourceBuilder,int pageNo,int pageSize,Class<T> resultObj);

    
    

}

 from+size分页实现

@Component
@Slf4j
public class ElasticSearchActuatorImpl implements ElasticSearchActuator {

   //restHighLevelClient客户端Configure相关单独编写,这里不再复述
   @Autowird
   private RestHighLevelClient restHighLevelClient;


   private final static Integer MAX_RESULT_WINDOW = 10000;  
   
   @Override
   public <T> SearchResult<T> fromSizeSearchElasticSearchDatas(String indexName,SearchSourceBuilder searchSourceBuilder,int pageNo,int pageSize,Class<T> resultObj){
       SearchResult<T> resultMap = new SearchResult<T>();
       //提前规避超出长度的情况
       if( from+size >= MAX_RESULT_WINDOW){
           log.error("XXXXXXX")
           //其他操作
           return null;
       }
       //分页参数处理
       int from = (pageNo - 1) * pageSize;
       searchSourceBuilder.from(from).size(pageSize);
       SearchRequest searchRequest = new SearchRequest(indexName);
       searchRequest.source(SearchSourceBuilder );
       SearchResponse response = executSearch(searchRequest);
       if(null != response){
          return  createSearchResult(searchResp,resultObj);
       }
       return resultMap;
    }

    /**
    * 执行查询
    */
    /**
     * 执行查询
     */
    private SearchResponse executSearch(SearchRequest searchRequest)     {
        SearchResponse searchResponse = null;
        try{
            searchResponse = restHighLevelClient.search(searchRequest,RequestOptions.DEFAULT);
        }catch(Exception e){
            //异常处理
        }
        return searchResponse;
    }

    /**
     * 构建目标结果
     * @param response 返回参数
     * @param resultObj 类对象
     * @param <T>
     * @return
     */
    private <T> SearchResult<T> createSearchResult(SearchResponse response,Class<T> resultObj){
        SearchResult<T> resultMap = new SearchResult<>();
        SearchHit[] datas = response.getHits().getHits();
        for(SearchHit data:datas){
            DocBaseEntity<T> temp = new DocBaseEntity<>(data);
            temp.setDatas(JSONUtil.toBean(JSONUtil.parseObj(data.getSourceAsMap()),resultObj));
            resultMap.addData(temp);
        }
        resultMap.setTotal(response.getHits().getTotalHits().value);
        return resultMap;
    }
   

}

scroll分页

SearchResult<T>补充scrollId值:

@Data
public class SearchResult<T> implements Serializable {
    
    private int total;
    //scrollId
    private String scrollId;

    private List<DocBaseEntity<T>> source = new ArrayList<>();
    
    private JSONObject aggregations;
    
    public void addData(DocBaseEntity<T> obj){
        source.add(obj);
    }

    public List<T> getDatas(){
        return source.stream().map(DocBaseEntity::getDatas).collect(Collectors.toList());
    }
    
    public void addDatas(List<DocBaseEntity<T>> objs){
        source.addAll(objs);
    }
    
    public void setTotal(Object total){
        this.total = Integer.parseInt(String.valueOf(total));
    }
    
    public JSONObject toJSONObject(){
        return JSONUtil.parseObj(this,true);
    }
    
}

继续定义查询接口ElasticSearchActuator


public interface ElasticSearchActuator {

  /**
     * from+size 分页查询
     * @param indexName 索引名称
     * @param searchSourceBuilder 查询条件
     * @param pageNo 页码
     * @param pageSize 每页数量
     * @param resultObj 具体目标对象
     * @return SearchResult
     */
    <T> SearchResult<T> fromSizeSearchElasticSearchDatas(String indexName,SearchSourceBuilder searchSourceBuilder,int pageNo,int pageSize,Class<T> resultObj);

    /**
     * 滚动分页查询
     * @param indexName 索引
     * @param searchSourceBuilder 查询体
     * @param pageNo 页码
     * @param pageSize 每页条数
     * @param scrollId 滚动ID
     * @param resultObj 目标对象
     * @return SearchResult
     * @param <T> T
     */
    <T> SearchResult<T> scrollSearchElasticSearchDatas(String indexName,SearchSourceBuilder searchSourceBuilder,int pageNo,int pageSize,String scrollId,Class<T> resultObj);

    

}

实现类:

@Component
@Slf4j
public class ElasticSearchActuatorImpl implements ElasticSearchActuator {

   //restHighLevelClient客户端Configure相关单独编写,这里不再复述
   @Autowird
   private RestHighLevelClient restHighLevelClient;


   private final static Integer MAX_RESULT_WINDOW = 10000;  
   
   @Override
   public <T> SearchResult<T> fromSizeSearchElasticSearchDatas(String indexName,SearchSourceBuilder searchSourceBuilder,int pageNo,int pageSize,Class<T> resultObj){
       //……省略from+size查询
    }

    @Override
    public <T> SearchResult<T> scrollSearchElasticSearchDatas(String indexName, SearchSourceBuilder searchSourceBuilder, int pageNo, int pageSize, String scrollId, Class<T> resultObj) throws IOException {
        SearchRequest searchRequest = new SearchRequest(indexName);
        searchSourceBuilder.size(pageSize);
        //设定scroll失效时长
        Scroll scroll = new Scroll(TimeValue.timeValueMinutes(3));
        searchRequest.scroll(scroll);
        SearchResponse searchResponse = null;
        if(StringUtils.isEmpty(scrollId)){
                searchResponse = executSearch(searchRequest);
                String tempscrollId = searchResponse.getScrollId();
                SearchScrollRequest searchScrollRequest = new SearchScrollRequest(tempscrollId);
                searchScrollRequest.scroll(scroll);
                for (int i = 0; i < (pageNo -1); i++) {
                    searchResponse = scrollSearch(searchScrollRequest);
                }
                scrollId = tempscrollId;
        }else {
            SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
            searchResponse = scrollSearch(searchScrollRequest);
        }
        //构建结果
        SearchResult<T> result = createSearchResult(searchResponse,resultObj);
        result.setSrcollId(scrollId);
        clearScrollSession(scrollId);
        return result;
    }



   
    /**
     * 滚动查询执行
     * @param searchScrollRequest
     * @return
     */
   private  SearchResponse scrollSearch(SearchScrollRequest searchScrollRequest){
       SearchResponse searchResponse = null;
       try{
           searchResponse = restHighLevelClient.scroll(searchScrollRequest,RequestOptions.DEFAULT);
       }catch(Exception e){
           //异常处理
       }
       return searchResponse;
   }

    /**
     * 关闭scroll
     * @param scrollId
     * @throws IOException
     */
   private void clearScrollSession(String scrollId) throws IOException {
       if (scrollId != null) {
           ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
           clearScrollRequest.addScrollId(scrollId);
           ClearScrollResponse clearScrollResponse = restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
           clearScrollResponse.isSucceeded();
       }
   }
   

}

注意:

使用scroll查询,如果设置的scroll超时,scroll ID会在指定的超时时间内保持活跃,这个超时时间可以通过scroll参数设置。一旦超出这个时间限制,scroll ID将失效,但是不会自动清理。为了避免资源泄露,建议在scroll使用完毕后,显式地清理scroll上下文。

这里最好建立一层缓存记录,即每次客户发来请求后,记录当次查询的scrollId序列,然后定时的释放掉缓存中不用的序列。

search_after分页

 对SearchResult<T>补充sortId值:

@Data
public class SearchResult<T> implements Serializable {

    private int total;
    //scrollId
    private String scrollId;
    //sortId
    private List<Object> sortId;

    private List<DocBaseEntity<T>> source = new ArrayList<>();

    private JSONObject aggregations;

    public void addData(DocBaseEntity<T> obj){
        source.add(obj);
    }

    public List<T> getDatas(){
        return source.stream().map(DocBaseEntity::getDatas).collect(Collectors.toList());
    }

    public void addDatas(List<DocBaseEntity<T>> objs){
        source.addAll(objs);
    }

    public void setTotal(Object total){
        this.total = Integer.parseInt(String.valueOf(total));
    }

    public JSONObject toJSONObject(){
        return JSONUtil.parseObj(this,true);
    }

}

继续定义查询接口ElasticSearchActuator

public interface ElasticSearchActuator {

  /**
     * from+size 分页查询
     * @param indexName 索引名称
     * @param searchSourceBuilder 查询条件
     * @param pageNo 页码
     * @param pageSize 每页数量
     * @param resultObj 具体目标对象
     * @return SearchResult
     */
    <T> SearchResult<T> fromSizeSearchElasticSearchDatas(String indexName,SearchSourceBuilder searchSourceBuilder,int pageNo,int pageSize,Class<T> resultObj);

    /**
     * 滚动分页查询
     * @param indexName 索引
     * @param searchSourceBuilder 查询体
     * @param pageNo 页码
     * @param pageSize 每页条数
     * @param scrollId 滚动ID
     * @param resultObj 目标对象
     * @return SearchResult
     * @param <T> T
     */
    <T> SearchResult<T> scrollSearchElasticSearchDatas(String indexName,SearchSourceBuilder searchSourceBuilder,int pageNo,int pageSize,String scrollId,Class<T> resultObj);

     /**
     * aftersearch分页查询
     * @param indexName 索引
     * @param searchSourceBuilder 查询体dsl
     * @param pageNo 页码
     * @param pageSize 每页条数
     * @param sortId 排序游标
     * @param resultObj 目标对象
     * @return SearchResult
     * @param <T> T
     */
    <T> SearchResult<T> afterSearchElasticSearchData(String indexName,SearchSourceBuilder searchSourceBuilder,int pageNo,int pageSize,List<Object> sortId,Class<T> resultObj);


}

实现类:

@Component
@Slf4j
public class ElasticSearchActuatorImpl implements ElasticSearchActuator {


   //……其他逻辑


    @Override
    public <T> SearchResult<T> afterSearchElasticSearchData(String indexName, SearchSourceBuilder searchSourceBuilder, int pageNo, int pageSize, List<Object> sortId, Class<T> resultObj) {
        SearchRequest searchRequest = new SearchRequest(indexName);
        searchSourceBuilder.size(pageSize);
        if(!CollectionUtils.isEmpty(sortId)){
            searchSourceBuilder.searchAfter(sortId.toArray());
        }else {
            if(pageNo > 1){
                //如果不携带上次排序标识,且非首页,递归查询
                SearchResult<T> previousPage = afterSearchElasticSearchData(indexName,searchSourceBuilder,pageNo-1,pageSize,null,resultObj);
                searchSourceBuilder.searchAfter(previousPage.getSortId().toArray());
            }
            searchRequest.source(searchSourceBuilder);
        }
        try{
            SearchResponse response = executSearch(searchRequest);
            SearchResult<T> rest = createSearchResult(response,resultObj);
            SearchHit[] hits = response.getHits().getHits();
            if(hits.length > 0){
                rest.setSortId(Arrays.asList(hits[hits.length-1].getSortValues()));
            }
            return rest;
        }catch (Exception e){
            //异常处理
            log.error("XXXXXX");
        }
        return null;
    }

}

http://www.kler.cn/a/317514.html

相关文章:

  • 安装SQL server中python和R
  • 1.两数之和-力扣(LeetCode)
  • 响应式网页设计--html
  • neo4j desktop基本入门
  • 2024版本IDEA创建Sprintboot项目下载依赖缓慢
  • Android Framework AMS(16)进程管理
  • 为什么推荐使用英文版LabVIEW
  • linux-网络管理-网络配置
  • 直播美颜工具的开发详解:基于视频美颜SDK的解决方案
  • java项目之基于springboot的医院资源管理系统源码
  • 【Linux 从基础到进阶】 Xen 虚拟化技术应用
  • 【实践功能7】前端实现截屏
  • Leetcode算法基础篇-递归算法
  • Spring事务类型及传播行为实战指南
  • JEDEC DDR4 SRAM standard
  • go 读取excel数据存储到mysql
  • 案例研究丨国控星鲨利用DataEase释放数据潜能,重塑业务视野
  • 从底层原理上解释 ClickHouse 的索引
  • leetcode 205.同构字符串
  • 如何快速上手一个Github的开源项目
  • C++ 9.24
  • 如何使用ssm实现疫苗预约系统+vue
  • 使用synchronized锁住字符串
  • Shire 智能体市场:IDE 一键安装多智能体,协同打造集体智慧 Copilot
  • 迎国庆-为祖国庆生python、Java、C各显神通
  • 【Python】数据可视化之分布图