当前位置: 首页 > article >正文

Elasticsearch实战使用

没有搭建好ES的,请查看另外一篇文章:

链接

1.es和redis的依赖

<!--es查询包 start-->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.6.0</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--core end-->

        <!--redisson-->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.14.1</version>
        </dependency>

2.es集群配置类

@Slf4j
@Configuration
public class EsConfiguration {
    private static final int ADDRESS_LENGTH = 2;

    private static final String HTTP_SCHEME = "http";

    @Value("${es.hosts:ip:9200}")
    private String[] hosts;

    @Value("${es.username:elastic}")
    private String username;

    @Value("${es.password:elastic}")
    private String password;

    @Value("${es.connectTimeout:20000}")
    private Integer connectTimeout;

    @Value("${es.socketTimeout:20000}")
    private Integer socketTimeout;

    @Value("${es.connectionRequestTimeout:20000}")
    private Integer connectionRequestTimeout;

    public CredentialsProvider basicCredentialsProvider() {
        if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
            CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                    new UsernamePasswordCredentials(username, password));
            return credentialsProvider;
        }
        return null;
    }

    @Bean
    public RestClientBuilder restClientBuilder() {
        HttpHost[] httpHosts = Arrays.stream(hosts)
                .map(this::getHttpHost)
                .filter(Objects::nonNull)
                .toArray(HttpHost[]::new);
        RestClientBuilder builder = RestClient.builder(httpHosts);
        CredentialsProvider credentialsProvider = basicCredentialsProvider();
        if (credentialsProvider != null) {
            builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
            builder.setRequestConfigCallback(requestConfigBuilder -> {
                //设置连接超时时间
                requestConfigBuilder.setConnectTimeout(connectTimeout);
                //设置socket超时时间s
                requestConfigBuilder.setSocketTimeout(socketTimeout);
                //设置连接请求超时时间
                requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeout);
                return requestConfigBuilder;
            });
            builder.setFailureListener(new RestClient.FailureListener() {
                /*3.设置每次节点发生故障时收到通知的侦听器。内部嗅探到故障时被启用。*/
                @Override
                public void onFailure(Node node) {
                    log.error("某个节点发生故障,故障地址是" + node.getHost());
                }
            });
        }
        return builder;
    }

    @Bean(name = "highLevelClient")
    public RestHighLevelClient highLevelClient(@Autowired RestClientBuilder restClientBuilder) {
        return new RestHighLevelClient(restClientBuilder);
    }


    private HttpHost getHttpHost(String host) {
        assert StringUtils.isNotEmpty(host);
        String[] address = host.split(":");
        if (address.length == ADDRESS_LENGTH) {
            String ip = address[0];
            int port = Integer.parseInt(address[1]);
            return new HttpHost(ip, port, HTTP_SCHEME);
        } else {
            return null;
        }
    }

}

3.ES客户端连接类EsOperationClient

package com.mita.web.core.config.es;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * @author sunpeiyang
 * @date 2024/11/7 16:52
 */
@Component
@Slf4j
public class EsOperationClient {

    @Resource
    private RestHighLevelClient client;

    private final RequestOptions options = RequestOptions.DEFAULT;

    @Resource
    private RedissonClient redissonClient;

    ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
        @Override
        public void onResponse(BulkResponse bulkResponse) {
        }

        @Override
        public void onFailure(Exception e) {
            log.error("es异步插入失败=", e);
        }
    };

    /**
     * 验证索引是否存在
     */
    public boolean indexExists(String index) {
        try {
            GetIndexRequest request = new GetIndexRequest(index);
            request.local(false);
            request.humanReadable(true);
            return client.indices().exists(request, options);
        } catch (Exception e) {
            log.error("es检查索引失败=", e);
        }
        return false;
    }

    /**
     * 创建索引
     */
    public boolean createIndex(String indexName) {
        //使用redis避免并发创建报错
        RLock lock = null;
        try {
            //加锁
            lock = redissonClient.getLock("mita:createIndex:lock:" + indexName);
            boolean isOrderLock = lock.tryLock(1, TimeUnit.SECONDS);
            if (!isOrderLock) {
                //索引创建中
                return false;
            }
            if (!indexExists(indexName)) {
                CreateIndexRequest request = new CreateIndexRequest(indexName);
                CreateIndexResponse createIndexResponse = client.indices().create(request, options);
                return createIndexResponse.isAcknowledged();
            }
        } catch (Exception e) {
            log.error("es创建索引失败=", e);
        } finally {
            if (lock != null && lock.isLocked() && lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
        return false;
    }

    /**
     * 删除索引
     */
    public boolean deleteIndex(String indexName) {
        try {
            if (indexExists(indexName)) {
                DeleteIndexRequest request = new DeleteIndexRequest(indexName);
                AcknowledgedResponse response = client.indices().delete(request, options);
                return response.isAcknowledged();
            }
        } catch (Exception e) {
            log.error("es删除索引失败=", e);
        }
        return Boolean.FALSE;
    }

    /**
     * 批量插入数据异步
     */
    public void bulkDataAsync(String indexName, Map<String, Object> valueMap) {
        try {
            Set<String> keySet = valueMap.keySet();
            //分批插入,每批最多500条
            List<List<String>> batches = Lists.partition(new ArrayList<>(keySet), 500);
            for (List<String> keyList : batches) {
                BulkRequest bulkRequest = new BulkRequest();
                for (String id : keyList) {
                    IndexRequest request = new IndexRequest(indexName);
                    request.index(indexName).id(id)
                            .source(JSON.toJSONString(valueMap.get(id)), XContentType.JSON);
                    bulkRequest.add(request);
                }
                client.bulkAsync(bulkRequest, options, listener);
            }
        } catch (Exception e) {
            log.error("es批量插入失败=", e);
        }
    }

    /**
     * 过滤无效索引
     *
     * @param indexName 索引名,逗号分隔
     * @return 结果
     */
    public String getIndexNameAndFilter(String indexName) {
        return Arrays.stream(indexName.split(","))
                .filter(this::indexExists)
                .collect(Collectors.joining(","));
    }

    /**
     * 分页查询数据
     *
     * @param indexName 索引名
     * @param nvMap
     * @param orNvMap
     * @param rangMap
     * @param page
     * @param pageSize
     * @return
     */
    public List<JSONObject> pageOr(String indexName,
                                   Map<String, Object> nvMap,
                                   Map<String, Object> orNvMap,
                                   Map<String, JSONObject> rangMap,
                                   Integer page, Integer pageSize) {
        try {
            indexName = getIndexNameAndFilter(indexName);
            if (StringUtils.isBlank(indexName)) {
                return null;
            }
            SearchRequest searchRequest = new SearchRequest();
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
            if (!CollectionUtils.isEmpty(nvMap)) {
                for (String name : nvMap.keySet()) {
                    TermsQueryBuilder termsQueryBuilder;
                    Object o = nvMap.get(name);
                    if (o instanceof Collection) {
                        Collection value = (Collection) o;
                        termsQueryBuilder = QueryBuilders.termsQuery(name, value);
                    } else {
                        termsQueryBuilder = QueryBuilders.termsQuery(name, nvMap.get(name));
                    }
                    boolQuery.filter(termsQueryBuilder);
                }
            }
            if (!CollectionUtils.isEmpty(orNvMap)) {
                for (String name : orNvMap.keySet()) {
                    TermsQueryBuilder termsQueryBuilder;
                    Object o = orNvMap.get(name);
                    if (o instanceof Collection) {
                        Collection value = (Collection) o;
                        termsQueryBuilder = QueryBuilders.termsQuery(name, value);
                    } else {
                        termsQueryBuilder = QueryBuilders.termsQuery(name, orNvMap.get(name));
                    }
                    boolQuery.should(termsQueryBuilder);
                }
            }

            if (!CollectionUtils.isEmpty(rangMap)) {
                for (String range : rangMap.keySet()) {
                    JSONObject timeObj = rangMap.get(range);
                    RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(range);
                    rangeQueryBuilder.gte(timeObj.get("startTime"));
                    rangeQueryBuilder.lte(timeObj.get("endTime"));
                    boolQuery.filter(rangeQueryBuilder);
                }
                sourceBuilder.sort("Time");
            }
            sourceBuilder.query(boolQuery);
            sourceBuilder.from(page);
            sourceBuilder.size(pageSize);
            searchRequest.indices(indexName);
            searchRequest.source(sourceBuilder);
            SearchResponse search = client.search(searchRequest, options);
            SearchHit[] hits = search.getHits().getHits();
            List<JSONObject> resultList = Lists.newArrayList();
            for (SearchHit hit : hits) {
                JSONObject jsonObject = JSON.parseObject(hit.getSourceAsString());
                resultList.add(jsonObject);
            }
            return resultList;
        } catch (Exception e) {
            log.error("es批量查询失败=", e);
        }
        return null;
    }

    public List<JSONObject> selectEsList(String indexName, Map<String, Object> map, int page, int pageSize) {
        List<JSONObject> resultList = Lists.newArrayList();
        try {
            SearchRequest searchRequest = new SearchRequest();
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
//            TermsQueryBuilder termsQueryBuilder;
//            for (Map.Entry<String, Object> entry : map.entrySet()) {
//                String key = entry.getKey();
//                Object value = entry.getValue();
//                termsQueryBuilder = QueryBuilders.termsQuery(key, value);
//                boolQuery.filter(termsQueryBuilder);
//            }
            sourceBuilder.query(boolQuery);
            sourceBuilder.from(page);
            sourceBuilder.size(pageSize);
            searchRequest.indices(indexName);
            searchRequest.source(sourceBuilder);
            SearchResponse search = client.search(searchRequest, options);
            SearchHit[] hits = search.getHits().getHits();
            int count = 1;
            for (SearchHit hit : hits) {
                JSONObject jsonObject = JSON.parseObject(hit.getSourceAsString());
                jsonObject.put("number", count++);
                resultList.add(jsonObject);
            }
        } catch (Exception e) {
            log.error("获取es数据异常=", e);
        }
        return resultList;
    }

}

4.检查ES是否有索引

在这里插入图片描述
在这里插入图片描述

5.索引创建

@Slf4j
@RestController
@RequestMapping("/openapi/customer/info")
public class OmsCustomerOrderInfoController extends BaseController {
@Resource
    private EsOperationClient esOperationClient;
/**
     * 索引创建
     *
     * @param indexName 索引名称
     * @return 结果
     */
    @GetMapping("/createIndex")
    public AjaxResult createIndex(String indexName) {
        return AjaxResult.success(esOperationClient.createIndex(indexName));
    }
}

127.0.0.1:10086/openapi/customer/info/createIndex?indexName=20241107test1

在这里插入图片描述

6.验证索引是否存在

/**
     * 验证索引是否存在
     *
     * @param indexName 索引名称
     * @return 结果
     */
    @GetMapping("/indexExists")
    public AjaxResult indexExists(String indexName) {
        return AjaxResult.success(esOperationClient.indexExists(indexName));
    }

在这里插入图片描述

7.批量插入数据异步

/**
     * 批量插入数据异步
     */
    @GetMapping("/bulkDataAsync")
    public AjaxResult bulkDataAsync(String indexName) {
        Map<String, Object> valueMap = new HashMap<>();
        for (int i = 0; i <= 100000; i++) {
            OmsCustomerOrderInfo info = new OmsCustomerOrderInfo();
            info.setCustomerCode(String.valueOf(i));
            info.setCustomerName("我是最可爱的=" + i);
            valueMap.put(String.valueOf(i), info);
        }
        esOperationClient.bulkDataAsync(indexName, valueMap);
        return AjaxResult.success();
    }

在这里插入图片描述

8.查询es数据

@GetMapping("/selectEsList")
    public AjaxResult selectEsList(String indexName) {
        Map<String, Object> rangMap = Maps.newHashMap();
//        rangMap.put("customerCode", "0");
        return AjaxResult.success(esOperationClient.selectEsList(indexName, rangMap, 1, 50));
    }
/**
     *
     * @param indexName 索引名称
     * @param map 参数
     * @param page 页码
     * @param pageSize 数据量
     * @return 结果
     */
    public List<JSONObject> selectEsList(String indexName, Map<String, Object> map, int page, int pageSize) {
        List<JSONObject> resultList = Lists.newArrayList();
        try {
            SearchRequest searchRequest = new SearchRequest();
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
            TermsQueryBuilder termsQueryBuilder;
            for (Map.Entry<String, Object> entry : map.entrySet()) {
                String key = entry.getKey();
                Object value = entry.getValue();
                termsQueryBuilder = QueryBuilders.termsQuery(key, value);
                boolQuery.filter(termsQueryBuilder);
            }
            sourceBuilder.query(boolQuery);
            sourceBuilder.from(page);
            sourceBuilder.size(pageSize);
            searchRequest.indices(indexName);
            searchRequest.source(sourceBuilder);
            SearchResponse search = client.search(searchRequest, options);
            SearchHit[] hits = search.getHits().getHits();
            for (SearchHit hit : hits) {
                JSONObject jsonObject = JSON.parseObject(hit.getSourceAsString());
                resultList.add(jsonObject);
            }
        } catch (Exception e) {
            log.error("获取es数据异常=", e);
            throw new RuntimeException(e);
        }
        return resultList;
    }

在这里插入图片描述

9.查询ES指定的数据

在这里插入图片描述

{
    "indexName": "20241108test1",
    "page": 0,
    "pageSize": 5,
    "customerCode":"2"
}
/**
     * 查询es数据
     *
     * @param map 参数
     * @return 结果
     */
    @PostMapping("/selectEsList")
    public AjaxResult selectEsList(@RequestBody Map<String,Object> map) {
        return AjaxResult.success(esOperationClient.selectEsList(map));
    }
/**
     * @param map 参数
     * @return 结果
     */
    public List<JSONObject> selectEsList(Map<String, Object> map) {
        List<JSONObject> resultList = Lists.newArrayList();
        try {
            SearchRequest searchRequest = new SearchRequest();
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
            for (Map.Entry<String, Object> entry : map.entrySet()) {
                String key = entry.getKey();
                Object value = entry.getValue();
                if ("customerName".equals(key) || "customerCode".equals(key)) {
                    if (value instanceof Collection) {
                        TermsQueryBuilder termsQueryBuilder = QueryBuilders.termsQuery(key, (Collection<?>) value);
                        boolQuery.filter(termsQueryBuilder);
                    } else {
                        TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(key, value);
                        boolQuery.filter(termQueryBuilder);
                    }
                }
            }
            sourceBuilder.query(boolQuery);
            sourceBuilder.from(MapUtils.getInteger(map, "page"));
            sourceBuilder.size(MapUtils.getInteger(map, "pageSize"));
            searchRequest.indices(MapUtils.getString(map, "indexName"));
            searchRequest.source(sourceBuilder);
            // 输出查询条件
            log.info("Query=" + boolQuery.toString());
            // 输出搜索请求
            log.info("SearchRequest=" + searchRequest.toString());
            SearchResponse search = client.search(searchRequest, options);
            SearchHit[] hits = search.getHits().getHits();
            // 输出搜索响应
            log.info("SearchResponse=" + search.toString());
            for (SearchHit hit : hits) {
                JSONObject jsonObject = JSON.parseObject(hit.getSourceAsString());
                resultList.add(jsonObject);
            }
        } catch (Exception e) {
            log.error("获取es数据异常=", e);
            throw new RuntimeException(e);
        }
        return resultList;
    }

在这里插入图片描述

10.使用kibana来查询指定索引数据

GET /20241108test1/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "customerCode": "2"
          }
        }
      ]
    }
  }
}

在这里插入图片描述

11.使用分页查询

插入多一点的数据

/**
     * 批量插入数据异步
     */
    @GetMapping("/bulkDataAsync")
    public AjaxResult bulkDataAsync(String indexName) {
        Map<String, Object> valueMap = new HashMap<>();
        for (int i = 0; i <= 1000000; i++) {
            OmsCustomerOrderInfo info = new OmsCustomerOrderInfo();
            info.setCustomerCode(String.valueOf(i));
            info.setCustomerName("这个异常通常是由于请求体格式不符合 Elasticsearch 期待的格式。检查你发送的数据是否为有效的 JSON 或压缩数据,并确保请求头和请求体的格式匹配。如果问题仍然存在,检查日志和客户端代码,确保数据的压缩和格式是正确的=" + new Date());
            valueMap.put(String.valueOf(i), info);
        }
        return esOperationClient.bulkDataAsync(indexName, valueMap);
    }

在这里插入图片描述

一页查询50条

在这里插入图片描述

查询第四页

在这里插入图片描述

以上就是ES的简单使用,里面还有很多细节需要讲解,放在下一次吧


http://www.kler.cn/a/388062.html

相关文章:

  • 【大数据技术基础 | 实验十】Hive实验:部署Hive
  • 大数据技术之Hadoop :我是恁爹
  • C++20 概念与约束(1)—— SFINAE
  • 组件间通信(组件间传递数据)
  • SQL HAVING子句
  • 鸿蒙多线程开发——并发模型对比(Actor与内存共享)
  • u盘怎么重装电脑系统_u盘重装电脑系统步骤和详细教程【新手宝典】
  • Hive中查看字段中是否包含某些字符串的函数
  • Git 入门篇(三)
  • 发布 VectorTraits v3.0(支持 X86架构的Avx512系列指令集,支持 Wasm架构及PackedSimd指令集等)
  • 从0开始深度学习(24)——填充和步幅
  • 通过 SSH 连接远程 Ubuntu 服务器
  • 24下半年教资面试资源(幼儿+小学+初中+高中+各科)逐字稿
  • Redis集群——针对实习面试
  • JDK8主要特性
  • React 中 `key` 属性的警告及其解决方案
  • C++设计模式精选面试题及参考答案
  • 如何找到系统中bert-base-uncased默认安装位置
  • 数据结构和算法-贪心算法01- 认识贪心
  • 如何利用 Python 的爬虫技术获取淘宝天猫商品的价格信息?
  • 手写线程池c
  • 前端基础面试题·第四篇——Vue(其三)
  • 【flask web】补充内容,session、全局对象g、装饰器、钩子函数。
  • RandomWords随机生成单词
  • 「QT」几何数据类 之 QLineF 浮点型直线类
  • 前端入门一之DOM、获取元素、DOM核心、事件高级、操作元素、事件基础、节点操作