一、ElasticSearchUtils
package com.wssnail.elasticsearch.util;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.FieldValue;
import co.elastic.clients.elasticsearch._types.Refresh;
import co.elastic.clients.elasticsearch._types.aggregations.*;
import co.elastic.clients.elasticsearch._types.query_dsl.MatchAllQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.MatchQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch._types.query_dsl.TermsQuery;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse;
import co.elastic.clients.elasticsearch.indices.ElasticsearchIndicesClient;
import co.elastic.clients.elasticsearch.indices.ExistsRequest;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.wssnail.elasticsearch.constants.MetricConstant;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.*;
/**
* @Author: 熟透的蜗牛
* @CreateTime: 2023-01-05 16:47
* @Description: TODO
* @Version: 1.0
*/
@Slf4j
public class ElasticSearchUtils {
private ElasticsearchClient elasticsearchClient;
private static final Integer MAX_PAGE_SIZE = 1000;
public ElasticSearchUtils(ElasticsearchClient elasticsearchClient) {
this.elasticsearchClient = elasticsearchClient;
}
/*
* @description:创建索引
* @author: 熟透的蜗牛
* @date: 2023/1/5 16:58
* @param index
* @return: boolean
**/
public boolean createIndex(String index) {
try {
if (isIndexExist(index)) {
log.error("Index is exits!");
return false;
}
//1.创建索引请求
CreateIndexResponse indexResponse = elasticsearchClient.indices().create(c -> c.index(index));
return indexResponse.acknowledged();
} catch (Exception e) {
log.info(">>>>>>>>>>>>>>>createIndex>>>id>>>>>>{}", e);
}
return false;
}
/*
* @description:判断索引是否存在
* @author: 熟透的蜗牛
* @date: 2023/1/5 16:59
* @param index
* @return: boolean
**/
public boolean isIndexExist(String index) {
try {
ElasticsearchIndicesClient indices = elasticsearchClient.indices();
BooleanResponse response = indices.exists(ExistsRequest.of(x -> x.index(index)));
return response.value();
} catch (Exception e) {
log.info(">>>>>>>>>>>>>>>isIndexExist>>>id>>>>>>{}", e);
}
return false;
}
/*
* @description:删除索引
* @author: 熟透的蜗牛
* @date: 2023/1/5 17:00
* @param null
* @return: null
**/
public boolean deleteIndex(String index) {
try {
if (!isIndexExist(index)) {
log.error("Index is not exits!");
return false;
}
DeleteIndexResponse response = elasticsearchClient.indices().delete(x -> x.index(index));
return response.acknowledged();
} catch (Exception e) {
log.info(">>>>>>>>>>>>>>>deleteIndex>>>id>>>>>>{}", e);
}
return false;
}
/*
* @description:更新数据,如果没有数据则新增
* @author: 熟透的蜗牛
* @date: 2023/1/5 17:01
* @param object 对象
* @param index 索引名称
* @param id 数据id
* @return: java.lang.String
**/
public String submitDocument(Object object, String index, String id) {
if (null == id) {
return addDocument(object, index);
}
if (this.existsById(index, id)) {
return this.updateDocumentByIdNoRealTime(object, index, id);
} else {
return addDocument(object, index, id);
}
}
/*
* @description:新增数据
* @author: 熟透的蜗牛
* @date: 2023/1/5 17:03
* @param object
* @param index
* @param id 数据id,为空时自动生成
* @return: java.lang.String
**/
public String addDocument(Object object, String index, String id) {
try {
if (null == id) {
return addDocument(object, index);
}
if (this.existsById(index, id)) {
return this.updateDocumentByIdNoRealTime(object, index, id);
}
IndexResponse indexResponse = elasticsearchClient.index(IndexRequest.of(x -> x.id(id).document(object).index(index)));
log.info("添加数据成功{}", indexResponse.result().jsonValue());
return id;
} catch (Exception e) {
log.info(">>>>>>>>>>>>>>>addDocument>>>id>>>>>>{}", e);
}
return null;
}
/*
* @description:新增数据,id用UUID生成
* @author: 熟透的蜗牛
* @date: 2023/1/5 17:05
* @param object
* @param index
* @return: java.lang.String
**/
public String addDocument(Object object, String index) {
try {
return addDocument(object, index, UUID.randomUUID().toString().replaceAll("-", "").toUpperCase());
} catch (Exception e) {
log.info(">>>>>>>>>>>>>>>addDocument>>>>>>{}", e);
}
return null;
}
/*
* @description:根据id判断文档是否存在
* @author: 熟透的蜗牛
* @date: 2023/1/5 17:07
* @param index
* @param id
* @return: boolean
**/
public boolean existsById(String index, String id) {
try {
BooleanResponse response = elasticsearchClient.existsSource(ExistsSourceRequest.of(builder -> builder.index(index).id(id)));
return response.value();
} catch (Exception e) {
log.info(">>>>>>>>>>>>>>>existsById>>>>>>{}", e);
}
return false;
}
/*
* @description:通过id删除数据
* @author: 熟透的蜗牛
* @date: 2023/1/5 17:08
* @param index
* @param id
* @return: java.lang.String
**/
public String deleteDocumentById(String index, String id) {
try {
DeleteResponse delete = elasticsearchClient.delete(DeleteRequest.of(builder -> builder.id(id).index(index)));
log.info("删除数据成功 >>>>>>>>id为{}", delete.id());
return delete.id();
} catch (Exception e) {
log.info(">>>>>>>>>>>>>>>deleteDocumentById>>>>>>{}", e);
}
return null;
}
/*
* @description:通过id修改数据
* @author: 熟透的蜗牛
* @date: 2023/1/5 17:09
* @param null
* @return: null
**/
public String updateDocumentById(Object object, String index, String id) {
try {
UpdateRequest<Object, Object> updateRequest = new UpdateRequest.Builder<>().doc(object).index(index).id(id).build();
UpdateResponse<Object> updateResponse = elasticsearchClient.update(updateRequest, object.getClass());
log.info("索引为: {}, id为: {},updateResponseID:{}, 实时更新数据成功", index, id, updateResponse.id());
} catch (Exception e) {
log.info(">>>>>>>>>>>>>>>updateDocumentById>>>>>>{}", e);
}
return null;
}
/*
* @description:通过id实时更新数据
* @author: 熟透的蜗牛
* @date: 2023/1/5 17:11
* @param object
* @param index
* @param id
* @return: java.lang.String
**/
public String updateDocumentByIdNoRealTime(Object object, String index, String id) {
try {
UpdateResponse<?> updateResponse = elasticsearchClient.update(x -> x.id(id).index(index).doc(object)
.refresh(Refresh.WaitFor).timeout(t -> t.time("1s")), object.getClass());
//执行更新请求
log.info("索引为: {}, id为: {},updateResponseID:{}, 实时更新数据成功", index, id, updateResponse.id());
return updateResponse.id();
} catch (Exception e) {
log.info(">>>>>>>>>>>>>>>updateDocumentByIdNoRealTime>>>>>>{}", e);
}
return null;
}
/**
* @param index
* @param id
* @param includeFields 指定返回字段
* @param excludeFields 指定排除的字段
* @description: 根据id查询指定数据
* @return: java.util.Map
* @author 熟透的蜗牛
* @date: 2025/1/19 17:39
*/
public Map searchDocumentById(String index, String id, List<String> includeFields, List<String> excludeFields) {
try {
GetResponse<Map> response = elasticsearchClient.get(builder -> builder.index(index).id(id).sourceIncludes(includeFields).sourceExcludes(excludeFields), Map.class);
return response.source();
} catch (Exception e) {
log.info(">>>>>>>>>>>>>>>查询失败searchDocumentById>>>>>>{}", e);
}
return new HashMap();
}
/**
* @param index
* @param objects
* @description:批量插入,单词不能超20w
* @return: boolean
* @author 熟透的蜗牛
* @date: 2025/1/19 18:07
*/
public boolean bulkPost(String index, List<Map> objects) {
try {
BulkRequest.Builder br = new BulkRequest.Builder();
objects.forEach(list -> {
br.operations(op -> op
.index(idx -> idx.index(index).document(list.get("data")).id((String) list.get("id"))));
});
BulkResponse result = elasticsearchClient.bulk(br.build());
if (result.errors()) {
log.info(">>>>>>>>>>>>>>>批量插入有误");
for (BulkResponseItem item : result.items()) {
if (item.error() != null) {
log.error(item.error().reason());
}
}
return result.errors();
}
return true;
} catch (Exception e) {
log.error(">>>>>>异常信息{}", e);
return false;
}
}
/**
* @param index
* @param highlightField 需要高亮的字段
* @param start
* @param pageSize
* @param query
* @description: 高亮查询
* @return: java.util.List<java.lang.Object>
* @author 熟透的蜗牛
* @date: 2025/1/19 20:26
*/
public List<Hit<Map>> searchHighLightFieldDocument(String index,
String[] highlightField, Integer start, Integer pageSize, Query... query) {
List<Hit<Map>> hits = new ArrayList<>();
try {
SearchRequest.Builder searchBuilder = new SearchRequest.Builder().index(index);
for (int i = 0; i < highlightField.length; i++) {
int finalI = i;
searchBuilder.highlight(builder -> builder.preTags("<span style='color:red'>")
.postTags("</span>").fields(highlightField[finalI], h -> h));
}
for (Query q : query) {
if (q._kind().equals(Query.Kind.Bool)) {
searchBuilder.query(bb -> bb.bool(b -> b.must(q)));
}
if (q._kind().equals(Query.Kind.Range)) {
q.range();
}
if (q._kind().equals(Query.Kind.Term)) {
q.term();
}
if (q._kind().equals(Query.Kind.MultiMatch)) {
q.multiMatch();
}
searchBuilder.query(q);
}
searchBuilder.size(pageSize > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : pageSize);
searchBuilder.from((start - 1) * pageSize);
SearchResponse searchResponse = elasticsearchClient.search(searchBuilder.build(), Map.class);
log.info(">>>>>>>>>>>>>searchResponse{}", searchResponse);
if (searchResponse.hits().total().value() > 0) {
hits = searchResponse.hits().hits();
}
} catch (IOException e) {
log.info(">>>>>>>>>>>>>>>searchListDocument>>>>>>{}", e);
}
return hits;
}
/**
* @param index
* @param field 需要查询的字段
* @param value 查询字段的值
* @param beanClass
* @param start
* @param pageSize
* @description:按条件精确查询
* @return: java.util.List<T>
* @author 熟透的蜗牛
* @date: 2025/1/21 1:11
*/
public <T> List<T> termQuery(String index, String field, String value, Class<T> beanClass, Integer start, Integer pageSize) {
List<T> list = new ArrayList<>();
try {
Query query = MatchQuery.of(m -> m
.field(field)
.query(value)
)._toQuery();
SearchResponse<T> response = elasticsearchClient.search(s -> s
.index(index)
.query(q -> q
.bool(b -> b
.must(query)
)
).from((start - 1) * pageSize).size(pageSize),
beanClass
);
this.queryEsData(beanClass, list, response);
} catch (Exception e) {
log.info(">>>>>>>>>>>>>>>termQuery>>>>>>{}", e);
}
return list;
}
/*
* @description:自定义查询
* @author: 熟透的蜗牛
* @date: 2023/1/9 15:49
* @param index
* @param beanClass
* @param query 查询条件
* @return: java.util.List<T>
**/
public <T> List<T> selfDefine(String index, Class<T> beanClass, Query query) {
List<T> list = new ArrayList<>();
try {
SearchResponse<T> response = elasticsearchClient.search(q -> q.index(index).query(query), beanClass);
this.queryEsData(beanClass, list, response);
} catch (Exception e) {
log.info(">>>>>>>>>>>>>>>termQuery>>>>>>{}", e);
}
return list;
}
/**
* @param indexName
* @param query
* @description: 查询文档数量
* @return: long
* @author 熟透的蜗牛
* @date: 2025/1/21 2:11
*/
public <T> long count(String indexName, Query query) {
try {
CountResponse response = elasticsearchClient.count(q -> q.index(indexName).query(query));
return response.count();
} catch (Exception e) {
log.info(">>>>>>>>>>>>>>>count>>>>>>{}", e);
}
return 0;
}
/*
* @description:多值匹配
* @author: 熟透的蜗牛
* @date: 2023/1/9 14:39
* @param index
* @param field 字段
* @param dataArgs 数值
* @param beanClass 转化实体类
* @param start
* @param pageSize
* @return: java.util.List<T>
**/
public <T> List<T> termsQuery(String index, String field, List<FieldValue> fieldValues, Class<T> beanClass, Integer start, Integer pageSize) {
// 查询的数据列表
List<T> list = new ArrayList<>();
try {
TermsQuery termsQuery = TermsQuery.of(terms -> terms.field(field).terms(value -> value.value(fieldValues)));
SearchResponse<T> response = elasticsearchClient.search(builder ->
builder.index(index).query(terms -> terms.terms(termsQuery))
.from((start - 1) * pageSize)
.size(pageSize)
.trackTotalHits(f -> f.enabled(true))
, beanClass);
queryEsData(beanClass, list, response);
} catch (Exception e) {
log.info(">>>>>>>>>>>>>>>termsQuery>>>>>>{}", e);
}
return list;
}
/**
* @param index
* @param beanClass
* @param start
* @param pageSize
* @description: match_all查询
* @return: java.util.List<T>
* @author 熟透的蜗牛
* @date: 2025/1/22 0:10
*/
public <T> List<T> matchAllQuery(String index, Class<T> beanClass, int start, int pageSize) {
// 查询的数据列表
List<T> list = new ArrayList<>();
try {
SearchResponse<T> searchResponse = elasticsearchClient.search(
q -> q.index(index)
.query(matchAll -> matchAll
.matchAll(all -> all))
.from((start - 1) * pageSize)
.size(pageSize > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : pageSize)
.trackTotalHits(f -> f.enabled(true)),
beanClass);
queryEsData(beanClass, list, searchResponse);
} catch (IOException e) {
log.info(">>>>>>>>>>>>>>>matchAllQuery>>>>>>{}", e);
}
return list;
}
/**
* @param index
* @param beanClass
* @param field
* @param value
* @description:词组查询
* @return: java.util.List<T>
* @author 熟透的蜗牛
* @date: 2025/1/22 0:08
*/
public <T> List<T> matchPhraseQuery(String index, Class<T> beanClass, String field, Object value, int start, int pageSize) {
List<T> list = new ArrayList<>();
try {
SearchResponse<T> response = elasticsearchClient.search(q ->
q.index(index).query(
pp -> pp.matchPhrase(
p -> p.field(field)
.query(String.valueOf(value))))
.from((start - 1) * pageSize)
.size(pageSize > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : pageSize)
.trackTotalHits(f -> f.enabled(true)),
beanClass);
queryEsData(beanClass, list, response);
} catch (IOException e) {
log.info(">>>>>>>>>>>>>>>matchPhraseQuery>>>>>>{}", e);
}
return list;
}
/*
* @description:内容在多字段查询
* @author: 熟透的蜗牛
* @date: 2023/1/9 15:27
* @param index
* @param beanClass
* @param fields
* @param text
* @param start 起始位置
* @param pageSize 页容量
* @return: java.util.List<T>
**/
public <T> List<T> matchMultiQuery(String index, Class<T> beanClass, String[] fields, Object text, Integer start, Integer pageSize) {
// 查询的数据列表
List<T> list = new ArrayList<>();
try {
SearchResponse<T> response = elasticsearchClient.search(q -> q.index(index)
.query(multiMatch ->
multiMatch.multiMatch(m ->
m.fields(Arrays.asList(fields))
.query(text.toString())))
.from((start - 1) * pageSize)
.size(pageSize > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : pageSize)
.trackTotalHits(f -> f.enabled(true)),
beanClass);
queryEsData(beanClass, list, response);
} catch (Exception e) {
log.info(">>>>>>>>>>>>>>>matchMultiQuery>>>>>>{}", e);
}
return list;
}
/*
* @description: 通配符查询(wildcard):会对查询条件进行分词。还可以使用通配符 ?(任意单个字符) 和 * (0个或多个字符)
* *:表示多个字符(0个或多个字符)
* ?:表示单个字符
* @author: 熟透的蜗牛
* @date: 2023/1/6 15:02
* @param index
* @param beanClass
* @param field
* @param text
* @return: java.util.List<T>
**/
public <T> List<T> wildcardQuery(String index, Class<T> beanClass, String field, String text, Integer start, Integer pageSize) {
// 查询的数据列表
List<T> list = new ArrayList<>();
try {
SearchResponse<T> response = elasticsearchClient.search(q -> q.index(index)
.query(wildcard -> wildcard.wildcard(w ->
w.field(field)
.value(text.toString())))
.from((start - 1) * pageSize)
.size(pageSize > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : pageSize)
.trackTotalHits(f -> f.enabled(true)),
beanClass);
queryEsData(beanClass, list, response);
} catch (Exception e) {
log.info(">>>>>>>>>>>>>>>wildcardQuery>>>>>>{}", e);
}
return list;
}
/**
* @param index
* @param beanClass
* @param field
* @param text
* @param start
* @param pageSize
* @param prefixLength 指定匹配的前缀长度,前面多少个字符需要完全匹配
* @param maxExpansion 限制查询扩展的最大数量。用于控制在查询中允许的最大变体数量
* @description: 模糊匹配
* @return: java.util.List<T>
* @author 熟透的蜗牛
* @date: 2025/1/22 1:02
*/
public <T> List<T> fuzzyQuery(String index, Class<T> beanClass, String field, String text, Integer start,
Integer pageSize, Integer prefixLength, Integer maxExpansion) {
// 查询的数据列表
List<T> list = new ArrayList<>();
try {
SearchResponse<T> response = elasticsearchClient.search(q -> q.index(index)
.query(fuzzy ->
fuzzy.fuzzy(f ->
f.field(field)
.value(text.toString())
.fuzziness("auto")
.prefixLength(prefixLength)
.maxExpansions(maxExpansion)))
.from((start - 1) * pageSize)
.size(pageSize > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : pageSize)
.trackTotalHits(f -> f.enabled(true)),
beanClass);
queryEsData(beanClass, list, response);
} catch (IOException e) {
log.info(">>>>>>>>>>>>>>>fuzzyQuery>>>>>>{}", e);
}
return list;
}
/*
* @description:聚合查询
* @author: 熟透的蜗牛
* @date: 2023/1/6 15:34
* @param index
* @param type 常用的操作有:avg:求平均、max:最大值、min:最小值、sum:求和
* @param field
**/
public Double metricQuery(String index, String type, String field) {
try {
Query query = MatchAllQuery.of(q -> q)._toQuery();
SearchRequest.Builder searchBuilder = new SearchRequest.Builder().index(index).query(query);
if (MetricConstant.MAX.equals(type)) {
Aggregation max = AggregationBuilders.max(m -> m.field(field));
searchBuilder.aggregations("max", max);
}
if (MetricConstant.MIN.equals(type)) {
Aggregation min = AggregationBuilders.min(m -> m.field(field));
searchBuilder.aggregations("min", min);
}
if (MetricConstant.AVG.equals(type)) {
Aggregation avg = AggregationBuilders.avg(a -> a.field(field));
searchBuilder.aggregations("avg", avg);
}
if (MetricConstant.SUM.equals(type)) {
Aggregation sum = AggregationBuilders.sum(s -> s.field(field));
searchBuilder.aggregations("sum", sum);
}
SearchResponse searchResponse = elasticsearchClient.search(searchBuilder.build(), Map.class);
Map aggregations = searchResponse.aggregations();
if (MetricConstant.MAX.equals(type)) {
Aggregate maxAggregate = (Aggregate) aggregations.get(MetricConstant.MAX);
MaxAggregate max = (MaxAggregate) maxAggregate._get();
return max.value();
}
if (MetricConstant.MIN.equals(type)) {
Aggregate minAggregate = (Aggregate) aggregations.get(MetricConstant.MIN);
MinAggregate min = (MinAggregate) minAggregate._get();
return min.value();
}
if (MetricConstant.AVG.equals(type)) {
Aggregate avgAggregate = (Aggregate) aggregations.get(MetricConstant.AVG);
AvgAggregate avg = (AvgAggregate) avgAggregate._get();
return avg.value();
}
if (MetricConstant.SUM.equals(type)) {
Aggregate sumAggregate = (Aggregate) aggregations.get(MetricConstant.SUM);
SumAggregate sum = (SumAggregate) sumAggregate._get();
return sum.value();
}
} catch (Exception e) {
log.info(">>>>>>>>>>>>>>>metricQuery>>>>>>{}", e);
}
return null;
}
/**
* @param index
* @param bucketField 分组的字段,字段类型为keyword,bucketField分组别名
* @description: 分组聚合查询,返回map集合
* @return: java.util.List<java.util.HashMap < java.lang.String, java.lang.Object>>
* @author 熟透的蜗牛
* @date: 2025/1/22 14:42
*/
public List<Map<String, Object>> bucketQuery(String index, String bucketField, String bucketFieldAlias) {
List<Map<String, Object>> resultMapList = new ArrayList<>();
try {
SearchResponse<Void> response = elasticsearchClient.search(q -> q.index(index).query(query -> query.matchAll(all -> all))
.aggregations(bucketFieldAlias, agg -> agg.terms(t -> t.field(bucketField))),
Void.class);
Map<String, Aggregate> aggregationsMap = response.aggregations();
Aggregate aggregate = aggregationsMap.get(bucketFieldAlias);
StringTermsAggregate stringTermsAggregate = (StringTermsAggregate) aggregate._get();
Buckets<StringTermsBucket> buckets = stringTermsAggregate.buckets();
List<StringTermsBucket> list = (List) buckets._get();
HashMap<String, Object> resulMap = new HashMap<>();
int i = 0;
for (StringTermsBucket st : list) {
i++;
JSONObject jsonObject = new JSONObject();
jsonObject.put(st.key()._toJsonString(), st.docCount());
resulMap.put(bucketFieldAlias + i, jsonObject);
resultMapList.add(resulMap);
}
} catch (Exception e) {
log.info(">>>>>>>>>>>>>>>bucketQuery>>>>>>{}", e);
}
return resultMapList;
}
/**
* @param index
* @param bucketField
* @param bucketFieldAlias 分组的别名
* @param avgFiled
* @param avgFiledAlias 平均值的别名
* @description: 分组后的聚合
* @author 熟透的蜗牛
* @date: 2025/1/22 15:20
*/
public List<Map<String, Object>> subBucketQuery(String index, String bucketField, String bucketFieldAlias, String avgFiled, String avgFiledAlias) {
List<Map<String, Object>> resultMapList = new ArrayList<>();
try {
SearchResponse<Void> response = elasticsearchClient.search(q -> q.index(index).query(query -> query.matchAll(all -> all))
.aggregations(bucketFieldAlias, agg -> agg.terms(t -> t.field(bucketField))
.aggregations(avgFiledAlias, avg -> avg.avg(a -> a.field(avgFiled))))
, Void.class);
Map<String, Aggregate> aggregations = response.aggregations();
log.info(">>>>>>>>>>>>aggregations{}", aggregations);
Aggregate aggregate = aggregations.get(bucketFieldAlias);
StringTermsAggregate stringTermsAggregate = (StringTermsAggregate) aggregate._get();
List<StringTermsBucket> list = (List<StringTermsBucket>) stringTermsAggregate.buckets()._get();
int i = 0;
for (StringTermsBucket st : list) {
HashMap<String, Object> resultMap = new HashMap<>();
JSONObject jsonObject = new JSONObject();
Aggregate avgAggregate = st.aggregations().get(avgFiledAlias); //从聚合中取平均值
AvgAggregate avg = (AvgAggregate) avgAggregate._get();
jsonObject.put(st.key()._toJsonString(), st.docCount());
jsonObject.put(avgFiledAlias, avg.value());
resultMap.put(bucketFieldAlias + i, jsonObject);
resultMapList.add(resultMap);
}
} catch (Exception e) {
log.info(">>>>>>>>>>>>>>>subBucketQuery>>>>>>{}", e);
}
return resultMapList;
}
/**
* @param beanClass
* @param list
* @param searchResponse
* @description: 封装返回数据
* @return: void
* @author 熟透的蜗牛
* @date: 2025/1/15 23:58
*/
private <T> void queryEsData(Class<T> beanClass, List<T> list, SearchResponse<T> searchResponse)
throws IOException {
if (searchResponse.hits().total().value() > 0) {
List<Hit<T>> hitList = searchResponse.hits().hits();
for (Hit hit : hitList) {
// 将 JSON 转换成对象
T bean = JSON.parseObject(JSON.toJSONString(hit.source()), beanClass);
list.add(bean);
}
}
}
}
二、测试代码
package com.snail.system.controller;
import co.elastic.clients.elasticsearch._types.FieldValue;
import co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.MultiMatchQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch._types.query_dsl.TermQuery;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.json.JsonData;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.snail.manager.common.utils.ValidateCodeUtils;
import com.snail.system.service.user.User;
import com.wssnail.elasticsearch.util.ElasticSearchUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.*;
/**
* @author 熟透的蜗牛
* @version 1.0
* @description: TODO
* @date 2025/1/16 20:20
*/
@RestController
@Slf4j
public class DemoController {
@Autowired
private ElasticSearchUtils elasticSearchUtils;
@GetMapping("/demo")
public String test(int type) {
switch (type) {
case 1:
log.info("create......{}", elasticSearchUtils.createIndex("user_index"));
break;
case 2:
log.info("delete......{}", elasticSearchUtils.deleteIndex("user_index"));
break;
case 3:
log.info("isIndexExist......{}", elasticSearchUtils.isIndexExist("user_index"));
break;
case 4:
for (int i = 0; i < 1000; i++) {
User user = new User();
user.setId("10000" + i);
user.setName("tom" + i + ValidateCodeUtils.generateLetter(6));
user.setAge((int) (Math.random() * 100));
if (i < 50) {
user.setAddress("北京市东城区1号路");
user.setBirthday(new Date("1990/03/01 00:00:00"));
}
if (i > 50 && i < 100) {
user.setAddress("北京市西城区88号路");
user.setBirthday(new Date("1998/03/01 00:00:00"));
}
if (i > 100 && i < 200) {
user.setAddress("北京市朝阳区青年路100号");
user.setBirthday(new Date("2000/03/01 00:00:00"));
}
if (i > 200 && i < 500) {
user.setAddress("天津市和平区青年路100号");
user.setBirthday(new Date("2005/08/01 00:00:00"));
}
if (i > 500 && i < 800) {
user.setAddress("天津市河西区青年路88号");
}
if (i > 800) {
user.setAddress("石家庄市桥西区青年路100号");
}
String id = elasticSearchUtils.submitDocument(user, "user_index", user.getId());
log.info("id:{}", id);
}
break;
case 5:
log.info("是否存在数据>>>>{}", elasticSearchUtils.existsById("user_index", "1000035"));
break;
case 6:
log.info("根据地删除数据>>>>{}", elasticSearchUtils.deleteDocumentById("user_index", "1000036"));
break;
case 7:
User user = new User("tom356666666", 20, "上海市闸北区霞飞路66号");
log.info("根据id修改数据>>>>>{}", elasticSearchUtils.updateDocumentById(user, "user_index", "1000035"));
break;
case 8:
log.info("时实修改数据>>>>>>>{}", elasticSearchUtils.updateDocumentByIdNoRealTime(User.builder().name("66666666").build(), "user_index", "1000037"));
break;
case 9:
log.info("根据id查询数据>>>>>{}", JSON.to(User.class, elasticSearchUtils.searchDocumentById("user_index", "1000035", new ArrayList<>(), new ArrayList<>())));
break;
case 10:
log.info("根据id查询数据返回指定字段>>>>>{}", JSON.to(User.class, elasticSearchUtils.searchDocumentById("user_index", "1000035", Arrays.asList("name", "address"), new ArrayList<>())));
break;
case 11:
//批量插入
ArrayList<Map> mapList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
HashMap<Object, Object> userMap = new HashMap<>();
User user1 = new User();
user1.setId("1000000" + i);
user1.setName(ValidateCodeUtils.generateLetter(10));
user1.setAge((int) (Math.random() * 100));
user1.setAddress("南京市淮河区夫子庙35号");
user1.setBirthday(new Date("2010/10/01 00:00:00"));
userMap.put("data", user1);
userMap.put("id", user1.getId());
mapList.add(userMap);
}
log.info(">>>>>>>>>>>>>>>批量插入结果{}", elasticSearchUtils.bulkPost("user_index", mapList));
break;
case 12:
// Query termQuery = TermQuery.of(t -> t.field("address").value("北京"))._toQuery();
// Query rangeQuery = RangeQuery.of(age -> age.field("age").gte(JsonData.of(60)))._toQuery();
Query multiQuery = MultiMatchQuery.of(m -> m.fields("address").query("南京"))._toQuery();
String[] highlightField = {"address"};
List<Hit<Map>> hits = elasticSearchUtils.searchHighLightFieldDocument("user_index", highlightField, 1, 20, multiQuery);
List<User> result = new ArrayList<>();
for (Hit<Map> hit : hits) {
Map source = hit.source();
User userResult = JSONObject.parseObject(JSONObject.toJSONString(source), User.class);
Map<String, List<String>> highlight = hit.highlight();
userResult.setAddress(CollectionUtils.isEmpty(highlight.get("address")) ? userResult.getAddress() : highlight.get("address").get(0));
userResult.setAge(CollectionUtils.isEmpty(highlight.get("age")) ? userResult.getAge() : Integer.valueOf(highlight.get("age").get(0)));
result.add(userResult);
}
log.info(">>>>>>>>>>>>>>>>>数据结果{}", result);
break;
case 13:
log.info(">>>>>>>>精确查询结果{}", elasticSearchUtils.termQuery("user_index", "address", "北京市东城区1号路", User.class, 1, 30));
break;
case 14:
//自定义查询
Query boolQuery = BoolQuery.of(b -> b.must(name -> name.term(n -> n.field("name.keyword").value("tom38ictwvq")))
.must(m -> m.range(age -> age.field("age").gte(JsonData.of(60)).lte(JsonData.of(80))))
.should(s -> s.term(address -> address.field("address").value("南京市淮河区夫子庙35号"))))._toQuery();
log.info(">>>>>>>>>>自定义查询{}", elasticSearchUtils.selfDefine("user_index", User.class, boolQuery));
break;
case 15:
//查询文档数量
Query ageQuery = TermQuery.of(t -> t.field("age").value("63"))._toQuery();
log.info(">>>>>>>>文档数量{}", elasticSearchUtils.count("user_index", ageQuery));
break;
case 16:
List<FieldValue> list = new ArrayList<>();
list.add(FieldValue.of("tom58dhmceA"));
list.add(FieldValue.of("tom38ictwvq"));
log.info(">>>>>>>多值查询{}", elasticSearchUtils.termsQuery("user_index", "name.keyword", list, User.class, 1, 20));
break;
case 17:
log.info(">>>>>>>>>匹配所有{}", elasticSearchUtils.matchAllQuery("user_index", User.class, 1, 20));
break;
case 18:
log.info(">>>>>>>>>phrase匹配{}", elasticSearchUtils.matchPhraseQuery("user_index", User.class, "address", "南京", 1, 20));
break;
case 19:
log.info(">>>>>>>>>多字段匹配{}", elasticSearchUtils.matchMultiQuery("user_index", User.class, new String[]{"address", "age"}, "88", 1, 20));
break;
case 20:
log.info("wildcardQuery>>>>>>{}", elasticSearchUtils.wildcardQuery("user_index", User.class, "name", "tom*", 1, 20));
break;
case 21:
log.info("fuzzyQuery>>>>>>{}", elasticSearchUtils.fuzzyQuery("user_index", User.class, "name", "tom38ictvq", 1, 20, 5, 5));
break;
case 22:
log.info("avg>>>>>>{}", elasticSearchUtils.metricQuery("user_index", "avg", "age"));
log.info("max>>>>>>{}", elasticSearchUtils.metricQuery("user_index", "max", "age"));
log.info("min>>>>>>{}", elasticSearchUtils.metricQuery("user_index", "min", "age"));
log.info("sum>>>>>>{}", elasticSearchUtils.metricQuery("user_index", "sum", "age"));
break;
case 23:
log.info("聚合分组查询>>>>>>>{}", elasticSearchUtils.bucketQuery("user_index", "address.keyword", "address"));
break;
case 24:
log.info(">>>>>>分组后的聚合查询>>{}", elasticSearchUtils.subBucketQuery("user_index", "address.keyword", "address", "age", "age_avg"));
break;
default:
break;
}
return "success";
}
}