Elasticsearch实战使用
没有搭建好ES的,请查看另外一篇文章:
链接
1.es和redis的依赖
<!--es查询包 start-->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.6.0</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--core end-->
<!--redisson-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.14.1</version>
</dependency>
2.es集群配置类
@Slf4j
@Configuration
public class EsConfiguration {
private static final int ADDRESS_LENGTH = 2;
private static final String HTTP_SCHEME = "http";
@Value("${es.hosts:ip:9200}")
private String[] hosts;
@Value("${es.username:elastic}")
private String username;
@Value("${es.password:elastic}")
private String password;
@Value("${es.connectTimeout:20000}")
private Integer connectTimeout;
@Value("${es.socketTimeout:20000}")
private Integer socketTimeout;
@Value("${es.connectionRequestTimeout:20000}")
private Integer connectionRequestTimeout;
public CredentialsProvider basicCredentialsProvider() {
if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(username, password));
return credentialsProvider;
}
return null;
}
@Bean
public RestClientBuilder restClientBuilder() {
HttpHost[] httpHosts = Arrays.stream(hosts)
.map(this::getHttpHost)
.filter(Objects::nonNull)
.toArray(HttpHost[]::new);
RestClientBuilder builder = RestClient.builder(httpHosts);
CredentialsProvider credentialsProvider = basicCredentialsProvider();
if (credentialsProvider != null) {
builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
builder.setRequestConfigCallback(requestConfigBuilder -> {
//设置连接超时时间
requestConfigBuilder.setConnectTimeout(connectTimeout);
//设置socket超时时间s
requestConfigBuilder.setSocketTimeout(socketTimeout);
//设置连接请求超时时间
requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeout);
return requestConfigBuilder;
});
builder.setFailureListener(new RestClient.FailureListener() {
/*3.设置每次节点发生故障时收到通知的侦听器。内部嗅探到故障时被启用。*/
@Override
public void onFailure(Node node) {
log.error("某个节点发生故障,故障地址是" + node.getHost());
}
});
}
return builder;
}
@Bean(name = "highLevelClient")
public RestHighLevelClient highLevelClient(@Autowired RestClientBuilder restClientBuilder) {
return new RestHighLevelClient(restClientBuilder);
}
private HttpHost getHttpHost(String host) {
assert StringUtils.isNotEmpty(host);
String[] address = host.split(":");
if (address.length == ADDRESS_LENGTH) {
String ip = address[0];
int port = Integer.parseInt(address[1]);
return new HttpHost(ip, port, HTTP_SCHEME);
} else {
return null;
}
}
}
3.ES客户端连接类EsOperationClient
package com.mita.web.core.config.es;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @author sunpeiyang
* @date 2024/11/7 16:52
*/
@Component
@Slf4j
public class EsOperationClient {
@Resource
private RestHighLevelClient client;
private final RequestOptions options = RequestOptions.DEFAULT;
@Resource
private RedissonClient redissonClient;
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
}
@Override
public void onFailure(Exception e) {
log.error("es异步插入失败=", e);
}
};
/**
* 验证索引是否存在
*/
public boolean indexExists(String index) {
try {
GetIndexRequest request = new GetIndexRequest(index);
request.local(false);
request.humanReadable(true);
return client.indices().exists(request, options);
} catch (Exception e) {
log.error("es检查索引失败=", e);
}
return false;
}
/**
* 创建索引
*/
public boolean createIndex(String indexName) {
//使用redis避免并发创建报错
RLock lock = null;
try {
//加锁
lock = redissonClient.getLock("mita:createIndex:lock:" + indexName);
boolean isOrderLock = lock.tryLock(1, TimeUnit.SECONDS);
if (!isOrderLock) {
//索引创建中
return false;
}
if (!indexExists(indexName)) {
CreateIndexRequest request = new CreateIndexRequest(indexName);
CreateIndexResponse createIndexResponse = client.indices().create(request, options);
return createIndexResponse.isAcknowledged();
}
} catch (Exception e) {
log.error("es创建索引失败=", e);
} finally {
if (lock != null && lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
return false;
}
/**
* 删除索引
*/
public boolean deleteIndex(String indexName) {
try {
if (indexExists(indexName)) {
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
AcknowledgedResponse response = client.indices().delete(request, options);
return response.isAcknowledged();
}
} catch (Exception e) {
log.error("es删除索引失败=", e);
}
return Boolean.FALSE;
}
/**
* 批量插入数据异步
*/
public void bulkDataAsync(String indexName, Map<String, Object> valueMap) {
try {
Set<String> keySet = valueMap.keySet();
//分批插入,每批最多500条
List<List<String>> batches = Lists.partition(new ArrayList<>(keySet), 500);
for (List<String> keyList : batches) {
BulkRequest bulkRequest = new BulkRequest();
for (String id : keyList) {
IndexRequest request = new IndexRequest(indexName);
request.index(indexName).id(id)
.source(JSON.toJSONString(valueMap.get(id)), XContentType.JSON);
bulkRequest.add(request);
}
client.bulkAsync(bulkRequest, options, listener);
}
} catch (Exception e) {
log.error("es批量插入失败=", e);
}
}
/**
* 过滤无效索引
*
* @param indexName 索引名,逗号分隔
* @return 结果
*/
public String getIndexNameAndFilter(String indexName) {
return Arrays.stream(indexName.split(","))
.filter(this::indexExists)
.collect(Collectors.joining(","));
}
/**
* 分页查询数据
*
* @param indexName 索引名
* @param nvMap
* @param orNvMap
* @param rangMap
* @param page
* @param pageSize
* @return
*/
public List<JSONObject> pageOr(String indexName,
Map<String, Object> nvMap,
Map<String, Object> orNvMap,
Map<String, JSONObject> rangMap,
Integer page, Integer pageSize) {
try {
indexName = getIndexNameAndFilter(indexName);
if (StringUtils.isBlank(indexName)) {
return null;
}
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
if (!CollectionUtils.isEmpty(nvMap)) {
for (String name : nvMap.keySet()) {
TermsQueryBuilder termsQueryBuilder;
Object o = nvMap.get(name);
if (o instanceof Collection) {
Collection value = (Collection) o;
termsQueryBuilder = QueryBuilders.termsQuery(name, value);
} else {
termsQueryBuilder = QueryBuilders.termsQuery(name, nvMap.get(name));
}
boolQuery.filter(termsQueryBuilder);
}
}
if (!CollectionUtils.isEmpty(orNvMap)) {
for (String name : orNvMap.keySet()) {
TermsQueryBuilder termsQueryBuilder;
Object o = orNvMap.get(name);
if (o instanceof Collection) {
Collection value = (Collection) o;
termsQueryBuilder = QueryBuilders.termsQuery(name, value);
} else {
termsQueryBuilder = QueryBuilders.termsQuery(name, orNvMap.get(name));
}
boolQuery.should(termsQueryBuilder);
}
}
if (!CollectionUtils.isEmpty(rangMap)) {
for (String range : rangMap.keySet()) {
JSONObject timeObj = rangMap.get(range);
RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(range);
rangeQueryBuilder.gte(timeObj.get("startTime"));
rangeQueryBuilder.lte(timeObj.get("endTime"));
boolQuery.filter(rangeQueryBuilder);
}
sourceBuilder.sort("Time");
}
sourceBuilder.query(boolQuery);
sourceBuilder.from(page);
sourceBuilder.size(pageSize);
searchRequest.indices(indexName);
searchRequest.source(sourceBuilder);
SearchResponse search = client.search(searchRequest, options);
SearchHit[] hits = search.getHits().getHits();
List<JSONObject> resultList = Lists.newArrayList();
for (SearchHit hit : hits) {
JSONObject jsonObject = JSON.parseObject(hit.getSourceAsString());
resultList.add(jsonObject);
}
return resultList;
} catch (Exception e) {
log.error("es批量查询失败=", e);
}
return null;
}
public List<JSONObject> selectEsList(String indexName, Map<String, Object> map, int page, int pageSize) {
List<JSONObject> resultList = Lists.newArrayList();
try {
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
// TermsQueryBuilder termsQueryBuilder;
// for (Map.Entry<String, Object> entry : map.entrySet()) {
// String key = entry.getKey();
// Object value = entry.getValue();
// termsQueryBuilder = QueryBuilders.termsQuery(key, value);
// boolQuery.filter(termsQueryBuilder);
// }
sourceBuilder.query(boolQuery);
sourceBuilder.from(page);
sourceBuilder.size(pageSize);
searchRequest.indices(indexName);
searchRequest.source(sourceBuilder);
SearchResponse search = client.search(searchRequest, options);
SearchHit[] hits = search.getHits().getHits();
int count = 1;
for (SearchHit hit : hits) {
JSONObject jsonObject = JSON.parseObject(hit.getSourceAsString());
jsonObject.put("number", count++);
resultList.add(jsonObject);
}
} catch (Exception e) {
log.error("获取es数据异常=", e);
}
return resultList;
}
}
4.检查ES是否有索引
5.索引创建
@Slf4j
@RestController
@RequestMapping("/openapi/customer/info")
public class OmsCustomerOrderInfoController extends BaseController {
@Resource
private EsOperationClient esOperationClient;
/**
* 索引创建
*
* @param indexName 索引名称
* @return 结果
*/
@GetMapping("/createIndex")
public AjaxResult createIndex(String indexName) {
return AjaxResult.success(esOperationClient.createIndex(indexName));
}
}
127.0.0.1:10086/openapi/customer/info/createIndex?indexName=20241107test1
6.验证索引是否存在
/**
* 验证索引是否存在
*
* @param indexName 索引名称
* @return 结果
*/
@GetMapping("/indexExists")
public AjaxResult indexExists(String indexName) {
return AjaxResult.success(esOperationClient.indexExists(indexName));
}
7.批量插入数据异步
/**
* 批量插入数据异步
*/
@GetMapping("/bulkDataAsync")
public AjaxResult bulkDataAsync(String indexName) {
Map<String, Object> valueMap = new HashMap<>();
for (int i = 0; i <= 100000; i++) {
OmsCustomerOrderInfo info = new OmsCustomerOrderInfo();
info.setCustomerCode(String.valueOf(i));
info.setCustomerName("我是最可爱的=" + i);
valueMap.put(String.valueOf(i), info);
}
esOperationClient.bulkDataAsync(indexName, valueMap);
return AjaxResult.success();
}
8.查询es数据
@GetMapping("/selectEsList")
public AjaxResult selectEsList(String indexName) {
Map<String, Object> rangMap = Maps.newHashMap();
// rangMap.put("customerCode", "0");
return AjaxResult.success(esOperationClient.selectEsList(indexName, rangMap, 1, 50));
}
/**
*
* @param indexName 索引名称
* @param map 参数
* @param page 页码
* @param pageSize 数据量
* @return 结果
*/
public List<JSONObject> selectEsList(String indexName, Map<String, Object> map, int page, int pageSize) {
List<JSONObject> resultList = Lists.newArrayList();
try {
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
TermsQueryBuilder termsQueryBuilder;
for (Map.Entry<String, Object> entry : map.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
termsQueryBuilder = QueryBuilders.termsQuery(key, value);
boolQuery.filter(termsQueryBuilder);
}
sourceBuilder.query(boolQuery);
sourceBuilder.from(page);
sourceBuilder.size(pageSize);
searchRequest.indices(indexName);
searchRequest.source(sourceBuilder);
SearchResponse search = client.search(searchRequest, options);
SearchHit[] hits = search.getHits().getHits();
for (SearchHit hit : hits) {
JSONObject jsonObject = JSON.parseObject(hit.getSourceAsString());
resultList.add(jsonObject);
}
} catch (Exception e) {
log.error("获取es数据异常=", e);
throw new RuntimeException(e);
}
return resultList;
}
9.查询ES指定的数据
{
"indexName": "20241108test1",
"page": 0,
"pageSize": 5,
"customerCode":"2"
}
/**
* 查询es数据
*
* @param map 参数
* @return 结果
*/
@PostMapping("/selectEsList")
public AjaxResult selectEsList(@RequestBody Map<String,Object> map) {
return AjaxResult.success(esOperationClient.selectEsList(map));
}
/**
* @param map 参数
* @return 结果
*/
public List<JSONObject> selectEsList(Map<String, Object> map) {
List<JSONObject> resultList = Lists.newArrayList();
try {
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
for (Map.Entry<String, Object> entry : map.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
if ("customerName".equals(key) || "customerCode".equals(key)) {
if (value instanceof Collection) {
TermsQueryBuilder termsQueryBuilder = QueryBuilders.termsQuery(key, (Collection<?>) value);
boolQuery.filter(termsQueryBuilder);
} else {
TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(key, value);
boolQuery.filter(termQueryBuilder);
}
}
}
sourceBuilder.query(boolQuery);
sourceBuilder.from(MapUtils.getInteger(map, "page"));
sourceBuilder.size(MapUtils.getInteger(map, "pageSize"));
searchRequest.indices(MapUtils.getString(map, "indexName"));
searchRequest.source(sourceBuilder);
// 输出查询条件
log.info("Query=" + boolQuery.toString());
// 输出搜索请求
log.info("SearchRequest=" + searchRequest.toString());
SearchResponse search = client.search(searchRequest, options);
SearchHit[] hits = search.getHits().getHits();
// 输出搜索响应
log.info("SearchResponse=" + search.toString());
for (SearchHit hit : hits) {
JSONObject jsonObject = JSON.parseObject(hit.getSourceAsString());
resultList.add(jsonObject);
}
} catch (Exception e) {
log.error("获取es数据异常=", e);
throw new RuntimeException(e);
}
return resultList;
}
10.使用kibana来查询指定索引数据
GET /20241108test1/_search
{
"query": {
"bool": {
"filter": [
{
"term": {
"customerCode": "2"
}
}
]
}
}
}
11.使用分页查询
插入多一点的数据
/**
* 批量插入数据异步
*/
@GetMapping("/bulkDataAsync")
public AjaxResult bulkDataAsync(String indexName) {
Map<String, Object> valueMap = new HashMap<>();
for (int i = 0; i <= 1000000; i++) {
OmsCustomerOrderInfo info = new OmsCustomerOrderInfo();
info.setCustomerCode(String.valueOf(i));
info.setCustomerName("这个异常通常是由于请求体格式不符合 Elasticsearch 期待的格式。检查你发送的数据是否为有效的 JSON 或压缩数据,并确保请求头和请求体的格式匹配。如果问题仍然存在,检查日志和客户端代码,确保数据的压缩和格式是正确的=" + new Date());
valueMap.put(String.valueOf(i), info);
}
return esOperationClient.bulkDataAsync(indexName, valueMap);
}