当前位置: 首页 > article >正文

Elasticsearch 8.17.1 JAVA工具类

 一、ElasticSearchUtils 

package com.wssnail.elasticsearch.util;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.FieldValue;
import co.elastic.clients.elasticsearch._types.Refresh;
import co.elastic.clients.elasticsearch._types.aggregations.*;
import co.elastic.clients.elasticsearch._types.query_dsl.MatchAllQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.MatchQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch._types.query_dsl.TermsQuery;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse;
import co.elastic.clients.elasticsearch.indices.ElasticsearchIndicesClient;
import co.elastic.clients.elasticsearch.indices.ExistsRequest;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.wssnail.elasticsearch.constants.MetricConstant;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.*;

/**
 * @Author: 熟透的蜗牛
 * @CreateTime: 2023-01-05 16:47
 * @Description: TODO
 * @Version: 1.0
 */
@Slf4j
public class ElasticSearchUtils {

    private ElasticsearchClient elasticsearchClient;

    private static final Integer MAX_PAGE_SIZE = 1000;

    public ElasticSearchUtils(ElasticsearchClient elasticsearchClient) {
        this.elasticsearchClient = elasticsearchClient;
    }

    /*
     * @description:创建索引
     * @author:  熟透的蜗牛
     * @date: 2023/1/5 16:58
     * @param index
     * @return: boolean
     **/
    public boolean createIndex(String index) {
        try {
            if (isIndexExist(index)) {
                log.error("Index is  exits!");
                return false;
            }
            //1.创建索引请求
            CreateIndexResponse indexResponse = elasticsearchClient.indices().create(c -> c.index(index));
            return indexResponse.acknowledged();
        } catch (Exception e) {
            log.info(">>>>>>>>>>>>>>>createIndex>>>id>>>>>>{}", e);
        }
        return false;
    }

    /*
     * @description:判断索引是否存在
     * @author:  熟透的蜗牛
     * @date: 2023/1/5 16:59
     * @param index
     * @return: boolean
     **/
    public boolean isIndexExist(String index) {
        try {
            ElasticsearchIndicesClient indices = elasticsearchClient.indices();
            BooleanResponse response = indices.exists(ExistsRequest.of(x -> x.index(index)));
            return response.value();
        } catch (Exception e) {
            log.info(">>>>>>>>>>>>>>>isIndexExist>>>id>>>>>>{}", e);
        }
        return false;
    }

    /*
     * @description:删除索引
     * @author:  熟透的蜗牛
     * @date: 2023/1/5 17:00
     * @param null
     * @return: null
     **/
    public boolean deleteIndex(String index) {
        try {
            if (!isIndexExist(index)) {
                log.error("Index is not exits!");
                return false;
            }
            DeleteIndexResponse response = elasticsearchClient.indices().delete(x -> x.index(index));
            return response.acknowledged();
        } catch (Exception e) {
            log.info(">>>>>>>>>>>>>>>deleteIndex>>>id>>>>>>{}", e);
        }
        return false;
    }

    /*
     * @description:更新数据,如果没有数据则新增
     * @author:  熟透的蜗牛
     * @date: 2023/1/5 17:01
     * @param object 对象
     * @param index 索引名称
     * @param id 数据id
     * @return: java.lang.String
     **/
    public String submitDocument(Object object, String index, String id) {
        if (null == id) {
            return addDocument(object, index);
        }
        if (this.existsById(index, id)) {
            return this.updateDocumentByIdNoRealTime(object, index, id);
        } else {
            return addDocument(object, index, id);
        }
    }

    /*
     * @description:新增数据
     * @author:  熟透的蜗牛
     * @date: 2023/1/5 17:03
     * @param object
     * @param index
     * @param id 数据id,为空时自动生成
     * @return: java.lang.String
     **/
    public String addDocument(Object object, String index, String id) {
        try {
            if (null == id) {
                return addDocument(object, index);
            }
            if (this.existsById(index, id)) {
                return this.updateDocumentByIdNoRealTime(object, index, id);
            }

            IndexResponse indexResponse = elasticsearchClient.index(IndexRequest.of(x -> x.id(id).document(object).index(index)));
            log.info("添加数据成功{}", indexResponse.result().jsonValue());
            return id;
        } catch (Exception e) {
            log.info(">>>>>>>>>>>>>>>addDocument>>>id>>>>>>{}", e);
        }
        return null;
    }

    /*
     * @description:新增数据,id用UUID生成
     * @author:  熟透的蜗牛
     * @date: 2023/1/5 17:05
     * @param object
     * @param index
     * @return: java.lang.String
     **/
    public String addDocument(Object object, String index) {
        try {
            return addDocument(object, index, UUID.randomUUID().toString().replaceAll("-", "").toUpperCase());
        } catch (Exception e) {
            log.info(">>>>>>>>>>>>>>>addDocument>>>>>>{}", e);
        }
        return null;
    }

    /*
     * @description:根据id判断文档是否存在
     * @author:  熟透的蜗牛
     * @date: 2023/1/5 17:07
     * @param index
     * @param id
     * @return: boolean
     **/
    public boolean existsById(String index, String id) {
        try {
            BooleanResponse response = elasticsearchClient.existsSource(ExistsSourceRequest.of(builder -> builder.index(index).id(id)));
            return response.value();
        } catch (Exception e) {
            log.info(">>>>>>>>>>>>>>>existsById>>>>>>{}", e);
        }
        return false;
    }

    /*
     * @description:通过id删除数据
     * @author:  熟透的蜗牛
     * @date: 2023/1/5 17:08
     * @param index
     * @param id
     * @return: java.lang.String
     **/
    public String deleteDocumentById(String index, String id) {
        try {
            DeleteResponse delete = elasticsearchClient.delete(DeleteRequest.of(builder -> builder.id(id).index(index)));
            log.info("删除数据成功 >>>>>>>>id为{}", delete.id());
            return delete.id();
        } catch (Exception e) {
            log.info(">>>>>>>>>>>>>>>deleteDocumentById>>>>>>{}", e);
        }
        return null;
    }

    /*
     * @description:通过id修改数据
     * @author:  熟透的蜗牛
     * @date: 2023/1/5 17:09
     * @param null
     * @return: null
     **/
    public String updateDocumentById(Object object, String index, String id) {
        try {
            UpdateRequest<Object, Object> updateRequest = new UpdateRequest.Builder<>().doc(object).index(index).id(id).build();
            UpdateResponse<Object> updateResponse = elasticsearchClient.update(updateRequest, object.getClass());

            log.info("索引为: {}, id为: {},updateResponseID:{}, 实时更新数据成功", index, id, updateResponse.id());
        } catch (Exception e) {
            log.info(">>>>>>>>>>>>>>>updateDocumentById>>>>>>{}", e);
        }
        return null;
    }

    /*
     * @description:通过id实时更新数据
     * @author:  熟透的蜗牛
     * @date: 2023/1/5 17:11
     * @param object
     * @param index
     * @param id
     * @return: java.lang.String
     **/
    public String updateDocumentByIdNoRealTime(Object object, String index, String id) {
        try {
            UpdateResponse<?> updateResponse = elasticsearchClient.update(x -> x.id(id).index(index).doc(object)
                    .refresh(Refresh.WaitFor).timeout(t -> t.time("1s")), object.getClass());
            //执行更新请求
            log.info("索引为: {}, id为: {},updateResponseID:{}, 实时更新数据成功", index, id, updateResponse.id());
            return updateResponse.id();
        } catch (Exception e) {
            log.info(">>>>>>>>>>>>>>>updateDocumentByIdNoRealTime>>>>>>{}", e);
        }
        return null;
    }

    /**
     * @param index
     * @param id
     * @param includeFields 指定返回字段
     * @param excludeFields 指定排除的字段
     * @description: 根据id查询指定数据
     * @return: java.util.Map
     * @author 熟透的蜗牛
     * @date: 2025/1/19 17:39
     */
    public Map searchDocumentById(String index, String id, List<String> includeFields, List<String> excludeFields) {
        try {
            GetResponse<Map> response = elasticsearchClient.get(builder -> builder.index(index).id(id).sourceIncludes(includeFields).sourceExcludes(excludeFields), Map.class);
            return response.source();
        } catch (Exception e) {
            log.info(">>>>>>>>>>>>>>>查询失败searchDocumentById>>>>>>{}", e);
        }
        return new HashMap();
    }


    /**
     * @param index
     * @param objects
     * @description:批量插入,单词不能超20w
     * @return: boolean
     * @author 熟透的蜗牛
     * @date: 2025/1/19 18:07
     */
    public boolean bulkPost(String index, List<Map> objects) {
        try {
            BulkRequest.Builder br = new BulkRequest.Builder();
            objects.forEach(list -> {
                br.operations(op -> op
                        .index(idx -> idx.index(index).document(list.get("data")).id((String) list.get("id"))));
            });
            BulkResponse result = elasticsearchClient.bulk(br.build());
            if (result.errors()) {
                log.info(">>>>>>>>>>>>>>>批量插入有误");
                for (BulkResponseItem item : result.items()) {
                    if (item.error() != null) {
                        log.error(item.error().reason());
                    }
                }
                return result.errors();
            }
            return true;
        } catch (Exception e) {
            log.error(">>>>>>异常信息{}", e);
            return false;
        }
    }


    /**
     * @param index
     * @param highlightField 需要高亮的字段
     * @param start
     * @param pageSize
     * @param query
     * @description: 高亮查询
     * @return: java.util.List<java.lang.Object>
     * @author 熟透的蜗牛
     * @date: 2025/1/19 20:26
     */
    public List<Hit<Map>> searchHighLightFieldDocument(String index,
                                                       String[] highlightField, Integer start, Integer pageSize, Query... query) {
        List<Hit<Map>> hits = new ArrayList<>();
        try {
            SearchRequest.Builder searchBuilder = new SearchRequest.Builder().index(index);
            for (int i = 0; i < highlightField.length; i++) {
                int finalI = i;
                searchBuilder.highlight(builder -> builder.preTags("<span style='color:red'>")
                        .postTags("</span>").fields(highlightField[finalI], h -> h));
            }
            for (Query q : query) {
                if (q._kind().equals(Query.Kind.Bool)) {
                    searchBuilder.query(bb -> bb.bool(b -> b.must(q)));
                }
                if (q._kind().equals(Query.Kind.Range)) {
                    q.range();
                }
                if (q._kind().equals(Query.Kind.Term)) {
                    q.term();
                }
                if (q._kind().equals(Query.Kind.MultiMatch)) {
                    q.multiMatch();
                }
                searchBuilder.query(q);
            }
            searchBuilder.size(pageSize > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : pageSize);
            searchBuilder.from((start - 1) * pageSize);
            SearchResponse searchResponse = elasticsearchClient.search(searchBuilder.build(), Map.class);
            log.info(">>>>>>>>>>>>>searchResponse{}", searchResponse);
            if (searchResponse.hits().total().value() > 0) {
                hits = searchResponse.hits().hits();
            }
        } catch (IOException e) {
            log.info(">>>>>>>>>>>>>>>searchListDocument>>>>>>{}", e);
        }
        return hits;
    }


    /**
     * @param index
     * @param field     需要查询的字段
     * @param value     查询字段的值
     * @param beanClass
     * @param start
     * @param pageSize
     * @description:按条件精确查询
     * @return: java.util.List<T>
     * @author 熟透的蜗牛
     * @date: 2025/1/21 1:11
     */
    public <T> List<T> termQuery(String index, String field, String value, Class<T> beanClass, Integer start, Integer pageSize) {
        List<T> list = new ArrayList<>();
        try {
            Query query = MatchQuery.of(m -> m
                    .field(field)
                    .query(value)
            )._toQuery();
            SearchResponse<T> response = elasticsearchClient.search(s -> s
                            .index(index)
                            .query(q -> q
                                    .bool(b -> b
                                            .must(query)
                                    )
                            ).from((start - 1) * pageSize).size(pageSize),
                    beanClass
            );
            this.queryEsData(beanClass, list, response);
        } catch (Exception e) {
            log.info(">>>>>>>>>>>>>>>termQuery>>>>>>{}", e);
        }
        return list;
    }

    /*
     * @description:自定义查询
     * @author:  熟透的蜗牛
     * @date: 2023/1/9 15:49
     * @param index
     * @param beanClass
     * @param query 查询条件
     * @return: java.util.List<T>
     **/
    public <T> List<T> selfDefine(String index, Class<T> beanClass, Query query) {
        List<T> list = new ArrayList<>();
        try {
            SearchResponse<T> response = elasticsearchClient.search(q -> q.index(index).query(query), beanClass);
            this.queryEsData(beanClass, list, response);
        } catch (Exception e) {
            log.info(">>>>>>>>>>>>>>>termQuery>>>>>>{}", e);
        }
        return list;
    }

    /**
     * @param indexName
     * @param query
     * @description: 查询文档数量
     * @return: long
     * @author 熟透的蜗牛
     * @date: 2025/1/21 2:11
     */
    public <T> long count(String indexName, Query query) {
        try {
            CountResponse response = elasticsearchClient.count(q -> q.index(indexName).query(query));
            return response.count();
        } catch (Exception e) {
            log.info(">>>>>>>>>>>>>>>count>>>>>>{}", e);
        }
        return 0;
    }

    /*
     * @description:多值匹配
     * @author:  熟透的蜗牛
     * @date: 2023/1/9 14:39
     * @param index
     * @param field 字段
     * @param dataArgs 数值
     * @param beanClass 转化实体类
     * @param start
     * @param pageSize
     * @return: java.util.List<T>
     **/
    public <T> List<T> termsQuery(String index, String field, List<FieldValue> fieldValues, Class<T> beanClass, Integer start, Integer pageSize) {
        // 查询的数据列表
        List<T> list = new ArrayList<>();
        try {
            TermsQuery termsQuery = TermsQuery.of(terms -> terms.field(field).terms(value -> value.value(fieldValues)));
            SearchResponse<T> response = elasticsearchClient.search(builder ->
                            builder.index(index).query(terms -> terms.terms(termsQuery))
                                    .from((start - 1) * pageSize)
                                    .size(pageSize)
                                    .trackTotalHits(f -> f.enabled(true))
                    , beanClass);
            queryEsData(beanClass, list, response);
        } catch (Exception e) {
            log.info(">>>>>>>>>>>>>>>termsQuery>>>>>>{}", e);
        }
        return list;
    }

    /**
     * @param index
     * @param beanClass
     * @param start
     * @param pageSize
     * @description: match_all查询
     * @return: java.util.List<T>
     * @author 熟透的蜗牛
     * @date: 2025/1/22 0:10
     */
    public <T> List<T> matchAllQuery(String index, Class<T> beanClass, int start, int pageSize) {
        // 查询的数据列表
        List<T> list = new ArrayList<>();
        try {
            SearchResponse<T> searchResponse = elasticsearchClient.search(
                    q -> q.index(index)
                            .query(matchAll -> matchAll
                                    .matchAll(all -> all))
                            .from((start - 1) * pageSize)
                            .size(pageSize > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : pageSize)
                            .trackTotalHits(f -> f.enabled(true)),
                    beanClass);
            queryEsData(beanClass, list, searchResponse);
        } catch (IOException e) {
            log.info(">>>>>>>>>>>>>>>matchAllQuery>>>>>>{}", e);
        }
        return list;
    }

    /**
     * @param index
     * @param beanClass
     * @param field
     * @param value
     * @description:词组查询
     * @return: java.util.List<T>
     * @author 熟透的蜗牛
     * @date: 2025/1/22 0:08
     */
    public <T> List<T> matchPhraseQuery(String index, Class<T> beanClass, String field, Object value, int start, int pageSize) {
        List<T> list = new ArrayList<>();
        try {
            SearchResponse<T> response = elasticsearchClient.search(q ->
                            q.index(index).query(
                                            pp -> pp.matchPhrase(
                                                    p -> p.field(field)
                                                            .query(String.valueOf(value))))
                                    .from((start - 1) * pageSize)
                                    .size(pageSize > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : pageSize)
                                    .trackTotalHits(f -> f.enabled(true)),
                    beanClass);
            queryEsData(beanClass, list, response);
        } catch (IOException e) {
            log.info(">>>>>>>>>>>>>>>matchPhraseQuery>>>>>>{}", e);
        }
        return list;
    }

    /*
     * @description:内容在多字段查询
     * @author:  熟透的蜗牛
     * @date: 2023/1/9 15:27
     * @param index
     * @param beanClass
     * @param fields
     * @param text
     * @param start 起始位置
     * @param pageSize 页容量
     * @return: java.util.List<T>
     **/
    public <T> List<T> matchMultiQuery(String index, Class<T> beanClass, String[] fields, Object text, Integer start, Integer pageSize) {
        // 查询的数据列表
        List<T> list = new ArrayList<>();
        try {
            SearchResponse<T> response = elasticsearchClient.search(q -> q.index(index)
                            .query(multiMatch ->
                                    multiMatch.multiMatch(m ->
                                            m.fields(Arrays.asList(fields))
                                                    .query(text.toString())))
                            .from((start - 1) * pageSize)
                            .size(pageSize > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : pageSize)
                            .trackTotalHits(f -> f.enabled(true)),
                    beanClass);
            queryEsData(beanClass, list, response);
        } catch (Exception e) {
            log.info(">>>>>>>>>>>>>>>matchMultiQuery>>>>>>{}", e);
        }
        return list;
    }

    /*
     * @description: 通配符查询(wildcard):会对查询条件进行分词。还可以使用通配符 ?(任意单个字符) 和 * (0个或多个字符)
     * *:表示多个字符(0个或多个字符)
     * ?:表示单个字符
     * @author:  熟透的蜗牛
     * @date: 2023/1/6 15:02
     * @param index
     * @param beanClass
     * @param field
     * @param text
     * @return: java.util.List<T>
     **/
    public <T> List<T> wildcardQuery(String index, Class<T> beanClass, String field, String text, Integer start, Integer pageSize) {
        // 查询的数据列表
        List<T> list = new ArrayList<>();
        try {
            SearchResponse<T> response = elasticsearchClient.search(q -> q.index(index)
                            .query(wildcard -> wildcard.wildcard(w ->
                                    w.field(field)
                                            .value(text.toString())))
                            .from((start - 1) * pageSize)
                            .size(pageSize > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : pageSize)
                            .trackTotalHits(f -> f.enabled(true)),
                    beanClass);
            queryEsData(beanClass, list, response);
        } catch (Exception e) {
            log.info(">>>>>>>>>>>>>>>wildcardQuery>>>>>>{}", e);
        }
        return list;
    }


    /**
     * @param index
     * @param beanClass
     * @param field
     * @param text
     * @param start
     * @param pageSize
     * @param prefixLength 指定匹配的前缀长度,前面多少个字符需要完全匹配
     * @param maxExpansion 限制查询扩展的最大数量。用于控制在查询中允许的最大变体数量
     * @description: 模糊匹配
     * @return: java.util.List<T>
     * @author 熟透的蜗牛
     * @date: 2025/1/22 1:02
     */
    public <T> List<T> fuzzyQuery(String index, Class<T> beanClass, String field, String text, Integer start,
                                  Integer pageSize, Integer prefixLength, Integer maxExpansion) {
        // 查询的数据列表
        List<T> list = new ArrayList<>();
        try {
            SearchResponse<T> response = elasticsearchClient.search(q -> q.index(index)
                            .query(fuzzy ->
                                    fuzzy.fuzzy(f ->
                                            f.field(field)
                                                    .value(text.toString())
                                                    .fuzziness("auto")
                                                    .prefixLength(prefixLength)
                                                    .maxExpansions(maxExpansion)))
                            .from((start - 1) * pageSize)
                            .size(pageSize > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : pageSize)
                            .trackTotalHits(f -> f.enabled(true)),
                    beanClass);
            queryEsData(beanClass, list, response);
        } catch (IOException e) {
            log.info(">>>>>>>>>>>>>>>fuzzyQuery>>>>>>{}", e);
        }
        return list;
    }

    /*
     * @description:聚合查询
     * @author:  熟透的蜗牛
     * @date: 2023/1/6 15:34
     * @param index
     * @param type 常用的操作有:avg:求平均、max:最大值、min:最小值、sum:求和
     * @param field
     **/
    public Double metricQuery(String index, String type, String field) {
        try {
            Query query = MatchAllQuery.of(q -> q)._toQuery();
            SearchRequest.Builder searchBuilder = new SearchRequest.Builder().index(index).query(query);

            if (MetricConstant.MAX.equals(type)) {
                Aggregation max = AggregationBuilders.max(m -> m.field(field));
                searchBuilder.aggregations("max", max);
            }
            if (MetricConstant.MIN.equals(type)) {
                Aggregation min = AggregationBuilders.min(m -> m.field(field));
                searchBuilder.aggregations("min", min);
            }
            if (MetricConstant.AVG.equals(type)) {
                Aggregation avg = AggregationBuilders.avg(a -> a.field(field));
                searchBuilder.aggregations("avg", avg);
            }
            if (MetricConstant.SUM.equals(type)) {
                Aggregation sum = AggregationBuilders.sum(s -> s.field(field));
                searchBuilder.aggregations("sum", sum);
            }
            SearchResponse searchResponse = elasticsearchClient.search(searchBuilder.build(), Map.class);
            Map aggregations = searchResponse.aggregations();
            if (MetricConstant.MAX.equals(type)) {
                Aggregate maxAggregate = (Aggregate) aggregations.get(MetricConstant.MAX);
                MaxAggregate max = (MaxAggregate) maxAggregate._get();
                return max.value();
            }
            if (MetricConstant.MIN.equals(type)) {
                Aggregate minAggregate = (Aggregate) aggregations.get(MetricConstant.MIN);
                MinAggregate min = (MinAggregate) minAggregate._get();
                return min.value();
            }
            if (MetricConstant.AVG.equals(type)) {
                Aggregate avgAggregate = (Aggregate) aggregations.get(MetricConstant.AVG);
                AvgAggregate avg = (AvgAggregate) avgAggregate._get();
                return avg.value();
            }
            if (MetricConstant.SUM.equals(type)) {
                Aggregate sumAggregate = (Aggregate) aggregations.get(MetricConstant.SUM);
                SumAggregate sum = (SumAggregate) sumAggregate._get();
                return sum.value();
            }
        } catch (Exception e) {
            log.info(">>>>>>>>>>>>>>>metricQuery>>>>>>{}", e);
        }
        return null;
    }

    /**
     * @param index
     * @param bucketField 分组的字段,字段类型为keyword,bucketField分组别名
     * @description: 分组聚合查询,返回map集合
     * @return: java.util.List<java.util.HashMap < java.lang.String, java.lang.Object>>
     * @author 熟透的蜗牛
     * @date: 2025/1/22 14:42
     */
    public List<Map<String, Object>> bucketQuery(String index, String bucketField, String bucketFieldAlias) {
        List<Map<String, Object>> resultMapList = new ArrayList<>();
        try {
            SearchResponse<Void> response = elasticsearchClient.search(q -> q.index(index).query(query -> query.matchAll(all -> all))
                            .aggregations(bucketFieldAlias, agg -> agg.terms(t -> t.field(bucketField))),
                    Void.class);
            Map<String, Aggregate> aggregationsMap = response.aggregations();
            Aggregate aggregate = aggregationsMap.get(bucketFieldAlias);
            StringTermsAggregate stringTermsAggregate = (StringTermsAggregate) aggregate._get();
            Buckets<StringTermsBucket> buckets = stringTermsAggregate.buckets();
            List<StringTermsBucket> list = (List) buckets._get();
            HashMap<String, Object> resulMap = new HashMap<>();
            int i = 0;
            for (StringTermsBucket st : list) {
                i++;
                JSONObject jsonObject = new JSONObject();
                jsonObject.put(st.key()._toJsonString(), st.docCount());
                resulMap.put(bucketFieldAlias + i, jsonObject);
                resultMapList.add(resulMap);
            }
        } catch (Exception e) {
            log.info(">>>>>>>>>>>>>>>bucketQuery>>>>>>{}", e);
        }
        return resultMapList;
    }

    /**
     * @param index
     * @param bucketField
     * @param bucketFieldAlias 分组的别名
     * @param avgFiled
     * @param avgFiledAlias    平均值的别名
     * @description: 分组后的聚合
     * @author 熟透的蜗牛
     * @date: 2025/1/22 15:20
     */
    public List<Map<String, Object>> subBucketQuery(String index, String bucketField, String bucketFieldAlias, String avgFiled, String avgFiledAlias) {
        List<Map<String, Object>> resultMapList = new ArrayList<>();
        try {
            SearchResponse<Void> response = elasticsearchClient.search(q -> q.index(index).query(query -> query.matchAll(all -> all))
                            .aggregations(bucketFieldAlias, agg -> agg.terms(t -> t.field(bucketField))
                                    .aggregations(avgFiledAlias, avg -> avg.avg(a -> a.field(avgFiled))))
                    , Void.class);
            Map<String, Aggregate> aggregations = response.aggregations();
            log.info(">>>>>>>>>>>>aggregations{}", aggregations);
            Aggregate aggregate = aggregations.get(bucketFieldAlias);
            StringTermsAggregate stringTermsAggregate = (StringTermsAggregate) aggregate._get();
            List<StringTermsBucket> list = (List<StringTermsBucket>) stringTermsAggregate.buckets()._get();
            int i = 0;
            for (StringTermsBucket st : list) {
                HashMap<String, Object> resultMap = new HashMap<>();
                JSONObject jsonObject = new JSONObject();
                Aggregate avgAggregate = st.aggregations().get(avgFiledAlias); //从聚合中取平均值
                AvgAggregate avg = (AvgAggregate) avgAggregate._get();
                jsonObject.put(st.key()._toJsonString(), st.docCount());
                jsonObject.put(avgFiledAlias, avg.value());
                resultMap.put(bucketFieldAlias + i, jsonObject);
                resultMapList.add(resultMap);
            }
        } catch (Exception e) {
            log.info(">>>>>>>>>>>>>>>subBucketQuery>>>>>>{}", e);
        }
        return resultMapList;
    }

    /**
     * @param beanClass
     * @param list
     * @param searchResponse
     * @description: 封装返回数据
     * @return: void
     * @author 熟透的蜗牛
     * @date: 2025/1/15 23:58
     */
    private <T> void queryEsData(Class<T> beanClass, List<T> list, SearchResponse<T> searchResponse)
            throws IOException {
        if (searchResponse.hits().total().value() > 0) {
            List<Hit<T>> hitList = searchResponse.hits().hits();
            for (Hit hit : hitList) {
                // 将 JSON 转换成对象
                T bean = JSON.parseObject(JSON.toJSONString(hit.source()), beanClass);
                list.add(bean);
            }
        }
    }
}

二、测试代码

package com.snail.system.controller;

import co.elastic.clients.elasticsearch._types.FieldValue;
import co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.MultiMatchQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch._types.query_dsl.TermQuery;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.json.JsonData;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.snail.manager.common.utils.ValidateCodeUtils;
import com.snail.system.service.user.User;
import com.wssnail.elasticsearch.util.ElasticSearchUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.*;

/**
 * @author 熟透的蜗牛
 * @version 1.0
 * @description: TODO
 * @date 2025/1/16 20:20
 */
@RestController
@Slf4j
public class DemoController {
    @Autowired
    private ElasticSearchUtils elasticSearchUtils;

    @GetMapping("/demo")
    public String test(int type) {
        switch (type) {
            case 1:
                log.info("create......{}", elasticSearchUtils.createIndex("user_index"));
                break;
            case 2:
                log.info("delete......{}", elasticSearchUtils.deleteIndex("user_index"));
                break;
            case 3:
                log.info("isIndexExist......{}", elasticSearchUtils.isIndexExist("user_index"));
                break;
            case 4:
                for (int i = 0; i < 1000; i++) {
                    User user = new User();
                    user.setId("10000" + i);
                    user.setName("tom" + i + ValidateCodeUtils.generateLetter(6));
                    user.setAge((int) (Math.random() * 100));
                    if (i < 50) {
                        user.setAddress("北京市东城区1号路");
                        user.setBirthday(new Date("1990/03/01 00:00:00"));
                    }
                    if (i > 50 && i < 100) {
                        user.setAddress("北京市西城区88号路");
                        user.setBirthday(new Date("1998/03/01 00:00:00"));
                    }
                    if (i > 100 && i < 200) {
                        user.setAddress("北京市朝阳区青年路100号");
                        user.setBirthday(new Date("2000/03/01 00:00:00"));
                    }
                    if (i > 200 && i < 500) {
                        user.setAddress("天津市和平区青年路100号");
                        user.setBirthday(new Date("2005/08/01 00:00:00"));
                    }
                    if (i > 500 && i < 800) {
                        user.setAddress("天津市河西区青年路88号");
                    }
                    if (i > 800) {
                        user.setAddress("石家庄市桥西区青年路100号");
                    }
                    String id = elasticSearchUtils.submitDocument(user, "user_index", user.getId());
                    log.info("id:{}", id);
                }
                break;
            case 5:
                log.info("是否存在数据>>>>{}", elasticSearchUtils.existsById("user_index", "1000035"));
                break;
            case 6:
                log.info("根据地删除数据>>>>{}", elasticSearchUtils.deleteDocumentById("user_index", "1000036"));
                break;
            case 7:
                User user = new User("tom356666666", 20, "上海市闸北区霞飞路66号");
                log.info("根据id修改数据>>>>>{}", elasticSearchUtils.updateDocumentById(user, "user_index", "1000035"));
                break;
            case 8:
                log.info("时实修改数据>>>>>>>{}", elasticSearchUtils.updateDocumentByIdNoRealTime(User.builder().name("66666666").build(), "user_index", "1000037"));
                break;
            case 9:
                log.info("根据id查询数据>>>>>{}", JSON.to(User.class, elasticSearchUtils.searchDocumentById("user_index", "1000035", new ArrayList<>(), new ArrayList<>())));
                break;
            case 10:
                log.info("根据id查询数据返回指定字段>>>>>{}", JSON.to(User.class, elasticSearchUtils.searchDocumentById("user_index", "1000035", Arrays.asList("name", "address"), new ArrayList<>())));
                break;
            case 11:
                //批量插入
                ArrayList<Map> mapList = new ArrayList<>();
                for (int i = 0; i < 100; i++) {
                    HashMap<Object, Object> userMap = new HashMap<>();
                    User user1 = new User();
                    user1.setId("1000000" + i);
                    user1.setName(ValidateCodeUtils.generateLetter(10));
                    user1.setAge((int) (Math.random() * 100));
                    user1.setAddress("南京市淮河区夫子庙35号");
                    user1.setBirthday(new Date("2010/10/01 00:00:00"));
                    userMap.put("data", user1);
                    userMap.put("id", user1.getId());
                    mapList.add(userMap);
                }
                log.info(">>>>>>>>>>>>>>>批量插入结果{}", elasticSearchUtils.bulkPost("user_index", mapList));
                break;
            case 12:
//                Query termQuery = TermQuery.of(t -> t.field("address").value("北京"))._toQuery();
//                Query rangeQuery = RangeQuery.of(age -> age.field("age").gte(JsonData.of(60)))._toQuery();
                Query multiQuery = MultiMatchQuery.of(m -> m.fields("address").query("南京"))._toQuery();
                String[] highlightField = {"address"};
                List<Hit<Map>> hits = elasticSearchUtils.searchHighLightFieldDocument("user_index", highlightField, 1, 20, multiQuery);
                List<User> result = new ArrayList<>();
                for (Hit<Map> hit : hits) {
                    Map source = hit.source();
                    User userResult = JSONObject.parseObject(JSONObject.toJSONString(source), User.class);
                    Map<String, List<String>> highlight = hit.highlight();
                    userResult.setAddress(CollectionUtils.isEmpty(highlight.get("address")) ? userResult.getAddress() : highlight.get("address").get(0));
                    userResult.setAge(CollectionUtils.isEmpty(highlight.get("age")) ? userResult.getAge() : Integer.valueOf(highlight.get("age").get(0)));
                    result.add(userResult);
                }
                log.info(">>>>>>>>>>>>>>>>>数据结果{}", result);
                break;
            case 13:
                log.info(">>>>>>>>精确查询结果{}", elasticSearchUtils.termQuery("user_index", "address", "北京市东城区1号路", User.class, 1, 30));
                break;
            case 14:
                //自定义查询
                Query boolQuery = BoolQuery.of(b -> b.must(name -> name.term(n -> n.field("name.keyword").value("tom38ictwvq")))
                        .must(m -> m.range(age -> age.field("age").gte(JsonData.of(60)).lte(JsonData.of(80))))
                        .should(s -> s.term(address -> address.field("address").value("南京市淮河区夫子庙35号"))))._toQuery();
                log.info(">>>>>>>>>>自定义查询{}", elasticSearchUtils.selfDefine("user_index", User.class, boolQuery));
                break;
            case 15:
                //查询文档数量
                Query ageQuery = TermQuery.of(t -> t.field("age").value("63"))._toQuery();
                log.info(">>>>>>>>文档数量{}", elasticSearchUtils.count("user_index", ageQuery));
                break;
            case 16:
                List<FieldValue> list = new ArrayList<>();
                list.add(FieldValue.of("tom58dhmceA"));
                list.add(FieldValue.of("tom38ictwvq"));
                log.info(">>>>>>>多值查询{}", elasticSearchUtils.termsQuery("user_index", "name.keyword", list, User.class, 1, 20));
                break;
            case 17:
                log.info(">>>>>>>>>匹配所有{}", elasticSearchUtils.matchAllQuery("user_index", User.class, 1, 20));
                break;
            case 18:
                log.info(">>>>>>>>>phrase匹配{}", elasticSearchUtils.matchPhraseQuery("user_index", User.class, "address", "南京", 1, 20));
                break;
            case 19:
                log.info(">>>>>>>>>多字段匹配{}", elasticSearchUtils.matchMultiQuery("user_index", User.class, new String[]{"address", "age"}, "88", 1, 20));
                break;
            case 20:
                log.info("wildcardQuery>>>>>>{}", elasticSearchUtils.wildcardQuery("user_index", User.class, "name", "tom*", 1, 20));
                break;
            case 21:
                log.info("fuzzyQuery>>>>>>{}", elasticSearchUtils.fuzzyQuery("user_index", User.class, "name", "tom38ictvq", 1, 20, 5, 5));
                break;
            case 22:
                log.info("avg>>>>>>{}", elasticSearchUtils.metricQuery("user_index", "avg", "age"));
                log.info("max>>>>>>{}", elasticSearchUtils.metricQuery("user_index", "max", "age"));
                log.info("min>>>>>>{}", elasticSearchUtils.metricQuery("user_index", "min", "age"));
                log.info("sum>>>>>>{}", elasticSearchUtils.metricQuery("user_index", "sum", "age"));
                break;
            case 23:
                log.info("聚合分组查询>>>>>>>{}", elasticSearchUtils.bucketQuery("user_index", "address.keyword", "address"));
                break;
            case 24:
                log.info(">>>>>>分组后的聚合查询>>{}", elasticSearchUtils.subBucketQuery("user_index", "address.keyword", "address", "age", "age_avg"));
                break;
            default:
                break;
        }
        return "success";
    }

}


http://www.kler.cn/a/514491.html

相关文章:

  • nslookup在内网渗透的使用
  • SpringCloud微服务Gateway网关简单集成Sentinel
  • DDD - 整洁架构_解决技术设计困局
  • PyQt5 超详细入门级教程上篇
  • STM32 FreeROTS Tickless低功耗模式
  • 高频词汇背诵 Day1
  • 【深度学习基础】多层感知机 | 权重衰减
  • C++11左值引用和右值引用
  • 神经网络基础详解
  • 【Maui】注销用户,采用“手势”点击label弹窗选择
  • 打造你的第一个AI Agent:从需求分析到架构设计
  • 【AI编辑器】字节跳动推出AI IDE——Trae,专为中文开发者深度定制
  • Powershell语言的云计算
  • (五)Mapbox GL 中图层`paint`的使用方法和使用场景初识
  • Llama 3:开源大模型的里程碑式突破
  • MDX语言的嵌入式系统
  • C++ initializer_list 列表初始化(八股总结)
  • 深度探索 Java 的字节码增强
  • 如何在Anaconda中顺利安装PyTorch:解决依赖问题的攻略
  • Day 15 卡玛笔记
  • 30天开发操作系统 第 17 天 -- 命令行窗口
  • Linux下 date时间应该与系统的 RTC(硬件时钟)同步
  • 什么是 Flask 的蓝图(Blueprint)
  • Windows远程连接Docker服务
  • openssl 生成证书 windows导入证书
  • 大数据Hadoop中MapReduce的介绍包括编程模型、工作原理(MapReduce、MapTask、ReduceTask、Shuffle工作原理)