当前位置: 首页 > article >正文

ElasticsearchJavaClient工具类分享

最近把Elasticsearch从7.x升级到了8.x,变化还是比较大的:7.x使用的是RestHighLevelClient,8.x弃用了RestHighLevelClient,转而支持ElasticsearchClient;同时API的调用方式经过建造者模式的改造,变成了链式调用。

因此,为了更方便地通过ElasticsearchClient的API操作Elasticsearch,我封装了一个工具类,包含一些常用的数据操作方法。废话不多说,直接上代码。

1、pom依赖

        <dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
            <version>8.15.2</version>
        </dependency>
        <dependency>
            <artifactId>elasticsearch-rest-client</artifactId>
            <groupId>org.elasticsearch.client</groupId>
            <version>8.15.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.30</version>
        </dependency>

2、工具类代码

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.Result;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.indices.AnalyzeRequest;
import co.elastic.clients.elasticsearch.indices.AnalyzeResponse;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
import co.elastic.clients.elasticsearch.indices.analyze.AnalyzeToken;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.alibaba.fastjson.JSONObject;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Convenience wrapper around {@link ElasticsearchClient} offering common index,
 * document, bulk, search and scroll operations.
 *
 * <p>Targets the elasticsearch-java 8.15.2 client declared in the accompanying pom.
 *
 * <p>The underlying client is created once in the constructor and reused by every
 * method. Call {@link #close()} exactly once when the instance is no longer needed.
 *
 * <p>Error handling: methods catch {@link IOException}, print the stack trace and
 * return a failure value ({@code false}, {@code null} or an empty list). Any other
 * exception thrown by the client is not caught here and propagates to the caller.
 *
 * <p>Several methods keep raw generic types ({@code SearchResponse}, {@code IndexRequest},
 * ...) in their signatures so that existing callers keep compiling; search hits are
 * deserialized as {@link JSONObject}.
 */
public class ElasticsearchJavaClient implements AutoCloseable {
    private final ElasticsearchClient client;

    /**
     * Creates a client for a cluster without authentication.
     *
     * @param httpUrls node addresses in a form accepted by {@link HttpHost#create(String)}
     */
    public ElasticsearchJavaClient(String[] httpUrls) {
        this.client = newClient(RestClient.builder(toHosts(httpUrls)));
    }

    /**
     * Creates a client that authenticates with a user name and password.
     *
     * @param httpUrls node addresses in a form accepted by {@link HttpHost#create(String)}
     * @param username user name
     * @param password password
     */
    public ElasticsearchJavaClient(String[] httpUrls, String username, String password) {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        RestClientBuilder builder = RestClient.builder(toHosts(httpUrls))
                .setHttpClientConfigCallback(http -> http.setDefaultCredentialsProvider(credentialsProvider));
        this.client = newClient(builder);
    }

    /** Converts URL strings into the {@link HttpHost} array expected by the REST client builder. */
    private static HttpHost[] toHosts(String[] httpUrls) {
        return Arrays.stream(httpUrls).map(HttpHost::create).toArray(HttpHost[]::new);
    }

    /** Builds the API client on top of a REST transport that uses a Jackson JSON mapper. */
    private static ElasticsearchClient newClient(RestClientBuilder builder) {
        return new ElasticsearchClient(new RestClientTransport(builder.build(), new JacksonJsonpMapper()));
    }

    /**
     * Closes the transport used by the client. The instance must not be used afterwards.
     *
     * @throws IOException if closing the transport fails
     */
    @Override
    public void close() throws IOException {
        client._transport().close();
    }

    /**
     * Creates an index.
     *
     * @param indexName        index name
     * @param numberOfShards   number of primary shards; {@code null} leaves the setting unset
     * @param numberOfReplicas number of replicas; {@code null} leaves the setting unset
     * @param mapping          mapping definition as a JSON string; {@code null} or blank
     *                         creates the index without an explicit mapping
     * @return {@code true} if the request was acknowledged; {@code false} if it was not
     *         acknowledged or an {@link IOException} occurred
     */
    public boolean createIndex(String indexName, Integer numberOfShards,
                               Integer numberOfReplicas, String mapping) {
        try {
            CreateIndexResponse response = client.indices().create(builder -> {
                builder.index(indexName).settings(settings -> {
                    if (numberOfShards != null) {
                        settings.numberOfShards(numberOfShards.toString());
                    }
                    if (numberOfReplicas != null) {
                        settings.numberOfReplicas(numberOfReplicas.toString());
                    }
                    return settings;
                });
                if (mapping != null && !mapping.trim().isEmpty()) {
                    builder.mappings(m -> m.withJson(new StringReader(mapping)));
                }
                return builder;
            });
            return response.acknowledged();
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Deletes an index.
     *
     * @param indexName index name
     * @return {@code true} if the request was acknowledged; {@code false} otherwise or on
     *         {@link IOException}
     */
    public boolean deleteIndex(String indexName) {
        try {
            return client.indices().delete(a -> a.index(indexName)).acknowledged();
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Checks whether an index exists.
     *
     * @param indexName index name
     * @return {@code true} if the index exists; {@code false} otherwise or on {@link IOException}
     */
    public boolean indexExists(String indexName) {
        try {
            return client.indices().exists(req -> req.index(indexName)).value();
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Checks whether an index exists. Misspelled legacy name kept for existing callers;
     * behaves exactly like {@link #indexExists(String)}.
     *
     * @param indexName index name
     * @return {@code true} if the index exists; {@code false} otherwise or on {@link IOException}
     */
    public boolean indexExisit(String indexName) {
        return indexExists(indexName);
    }

    /**
     * Issues a refresh request for the index so that recently written documents become
     * searchable without waiting for the periodic refresh.
     *
     * @param indexString index name
     */
    public void refresh(String indexString) {
        try {
            client.indices().refresh(req -> req.index(indexString));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Indexes a single document and lets Elasticsearch assign the id.
     *
     * @param indexName index name
     * @param data      document body
     * @return the id reported in the index response, or {@code null} on {@link IOException}
     */
    public String insertData(String indexName, JSONObject data) {
        try {
            IndexResponse response = client.index(a -> a.index(indexName).document(data));
            return response.id();
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Fetches a document by id as a plain map.
     *
     * @param indexName index name
     * @param id        document id
     * @return the document source, or {@code null} if it was not found or an
     *         {@link IOException} occurred
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // Map.class can only yield a raw GetResponse<Map>
    public Map<String, Object> getDocById(String indexName, String id) {
        GetRequest request = GetRequest.of(g -> g.index(indexName).id(id));
        try {
            GetResponse<Map> response = client.get(request, Map.class);
            if (response.found()) {
                return response.source();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Fetches a document by id.
     *
     * @param indexName index name
     * @param id        document id
     * @return the document source, or {@code null} if it was not found or an
     *         {@link IOException} occurred
     */
    public JSONObject getDocInfoById(String indexName, String id) {
        return fetchSource(GetRequest.of(g -> g.index(indexName).id(id)));
    }

    /**
     * Fetches a document by id, leaving out the given source fields.
     *
     * @param indexName index name
     * @param id        document id
     * @param excludes  source fields to omit; {@code null} means no filtering
     * @return the document source, or {@code null} if it was not found or an
     *         {@link IOException} occurred
     */
    public JSONObject getDocInfoById(String indexName, String id, String[] excludes) {
        if (excludes == null) {
            return getDocInfoById(indexName, id);
        }
        return fetchSource(GetRequest.of(g -> g.index(indexName).id(id)
                .sourceExcludes(Arrays.asList(excludes))));
    }

    /**
     * Fetches a document by id, returning only the given source fields.
     *
     * @param indexName index name
     * @param id        document id
     * @param includes  source fields to return; {@code null} means no filtering
     * @return the document source, or {@code null} if it was not found or an
     *         {@link IOException} occurred
     */
    public JSONObject getDocInfoByIdWithIncludes(String indexName, String id, String[] includes) {
        if (includes == null) {
            return getDocInfoById(indexName, id);
        }
        return fetchSource(GetRequest.of(g -> g.index(indexName).id(id)
                .sourceIncludes(Arrays.asList(includes))));
    }

    /** Executes a get request and returns the source if the document was found, else {@code null}. */
    private JSONObject fetchSource(GetRequest request) {
        try {
            GetResponse<JSONObject> response = client.get(request, JSONObject.class);
            if (response.found()) {
                return response.source();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Checks whether a document exists, using the exists API so that the document body
     * is not transferred.
     *
     * @param indexName index name
     * @param id        document id
     * @return {@code true} if the document exists; {@code false} otherwise or on {@link IOException}
     */
    public boolean exists(String indexName, String id) {
        try {
            return client.exists(e -> e.index(indexName).id(id)).value();
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Deletes a document by id.
     *
     * @param indexName index name
     * @param id        document id
     * @return {@code true} only if the response result is {@link Result#Deleted}
     */
    public boolean deleteDocById(String indexName, String id) {
        DeleteRequest request = DeleteRequest.of(a -> a.index(indexName).id(id));
        try {
            DeleteResponse response = client.delete(request);
            return response != null && response.result() == Result.Deleted;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Partially updates a document by id. No refresh policy is set on the request; use
     * {@link #refresh(String)} when the change must be searchable immediately.
     *
     * @param indexName index name
     * @param id        document id
     * @param newDoc    partial document merged into the existing one
     * @return {@code true} only if the response result is {@link Result#Updated}
     */
    public boolean updateDocById(String indexName, String id, JSONObject newDoc) {
        UpdateRequest<JSONObject, JSONObject> request =
                UpdateRequest.of(r -> r.index(indexName).id(id).doc(newDoc));
        try {
            UpdateResponse<JSONObject> response = client.update(request, JSONObject.class);
            return response != null && response.result() == Result.Updated;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Runs the named analyzer over the text and returns the produced tokens.
     *
     * @param analyzerName analyzer name
     * @param text         text to analyze
     * @return the tokens from the response, or an empty list on {@link IOException}
     */
    public List<AnalyzeToken> analyze(String analyzerName, String text) {
        AnalyzeRequest analyzeRequest = new AnalyzeRequest.Builder().analyzer(analyzerName).text(text).build();
        try {
            AnalyzeResponse response = client.indices().analyze(analyzeRequest);
            return response.tokens();
        } catch (IOException e) {
            e.printStackTrace();
            return java.util.Collections.<AnalyzeToken>emptyList();
        }
    }

    /**
     * Deletes several documents in one bulk request.
     *
     * @param requestList delete requests; only their index and id are used
     * @return {@code true} if the list was null/empty or the bulk response reported no
     *         item errors; {@code false} otherwise or on {@link IOException}
     */
    public boolean bulkDelete(List<DeleteRequest> requestList) {
        if (requestList == null || requestList.isEmpty()) {
            return true;
        }
        List<BulkOperation> ops = requestList
                .stream()
                .map(req -> BulkOperation.of(op -> op
                        .delete(d -> d.id(req.id()).index(req.index()))))
                .collect(Collectors.toList());
        return runBulk(ops);
    }

    /**
     * Partially updates several documents in one bulk request.
     *
     * @param requestList update requests; only their index, id and partial doc are used
     * @return {@code true} if the list was null/empty or the bulk response reported no
     *         item errors; {@code false} otherwise or on {@link IOException}
     */
    public boolean bulkUpdate(List<UpdateRequest> requestList) {
        if (requestList == null || requestList.isEmpty()) {
            return true;
        }
        List<BulkOperation> ops = requestList
                .stream()
                .map(req -> BulkOperation.of(op -> op
                        .update(d -> d.id(req.id())
                                .index(req.index())
                                .action(a -> a.doc(req.doc())))))
                .collect(Collectors.toList());
        return runBulk(ops);
    }

    /**
     * Indexes several documents in one bulk request. The id of each request is passed
     * through; requests without an id get one assigned by Elasticsearch.
     *
     * @param requestList index requests; their index, id and document are used
     * @return {@code true} if the list was null/empty or the bulk response reported no
     *         item errors; {@code false} otherwise or on {@link IOException}
     */
    public boolean bulkInsert(List<IndexRequest> requestList) {
        if (requestList == null || requestList.isEmpty()) {
            return true;
        }
        List<BulkOperation> ops = requestList
                .stream()
                .map(req -> BulkOperation.of(op -> op
                        .index(i -> i.document(req.document()).index(req.index()).id(req.id()))))
                .collect(Collectors.toList());
        return runBulk(ops);
    }

    /** Sends the operations as one bulk request; succeeds only if no item reported an error. */
    private boolean runBulk(List<BulkOperation> operations) {
        try {
            BulkResponse response = client.bulk(r -> r.operations(operations));
            // A bulk call can succeed as a whole while individual items fail.
            return response != null && !response.errors();
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Updates all documents matching a query by running a script on them.
     *
     * @param index  index name
     * @param query  bool query selecting the documents
     * @param script script source
     * @return {@code true} if the response lists no failures; {@code false} otherwise or on
     *         {@link IOException}
     */
    public boolean updateByquery(String index, BoolQuery query, String script) {
        try {
            UpdateByQueryResponse response = client.updateByQuery(q -> q.index(index)
                    .query(query._toQuery())
                    .script(s -> s.source(script)));
            return response != null && response.failures().isEmpty();
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Paged search sorted by one field.
     *
     * @param indexName index name
     * @param pageNo    1-based page number
     * @param pageSize  page size
     * @param sortField field to sort by; {@code null} applies no sort
     * @param sortOrder sort direction
     * @param boolQuery query
     * @return the search response, or {@code null} on {@link IOException}
     */
    public SearchResponse search(String indexName, Integer pageNo, Integer pageSize,
                                 String sortField, SortOrder sortOrder, BoolQuery boolQuery) {
        SearchRequest.Builder request = pagedSearch(indexName, boolQuery, pageNo, pageSize);
        sortBy(request, sortField, sortOrder);
        return runSearch(request.build());
    }

    /**
     * Paged search sorted by one field, omitting the given source fields.
     *
     * @param indexName index name
     * @param pageNo    1-based page number
     * @param pageSize  page size
     * @param sortField field to sort by; {@code null} applies no sort
     * @param sortOrder sort direction
     * @param boolQuery query
     * @param excludes  source fields to omit; {@code null} means no filtering
     * @return the search response, or {@code null} on {@link IOException}
     */
    public SearchResponse search(String indexName, Integer pageNo, Integer pageSize,
                                 String sortField, SortOrder sortOrder, BoolQuery boolQuery, String[] excludes) {
        SearchRequest.Builder request = pagedSearch(indexName, boolQuery, pageNo, pageSize);
        filterSource(request, null, excludes);
        sortBy(request, sortField, sortOrder);
        return runSearch(request.build());
    }

    /**
     * Paged search sorted by two fields; {@code sortField} is the primary key and
     * {@code sortField2} the secondary key.
     *
     * @param indexName  index name
     * @param pageNo     1-based page number
     * @param pageSize   page size
     * @param boolQuery  query
     * @param sortField  primary sort field; {@code null} skips it
     * @param sortOrder  primary sort direction
     * @param sortField2 secondary sort field; {@code null} skips it
     * @param sortOrder2 secondary sort direction
     * @return the search response, or {@code null} on {@link IOException}
     */
    public SearchResponse search(String indexName, Integer pageNo, Integer pageSize, BoolQuery boolQuery,
                                 String sortField, SortOrder sortOrder, String sortField2, SortOrder sortOrder2) {
        SearchRequest.Builder request = pagedSearch(indexName, boolQuery, pageNo, pageSize);
        // Each sort key needs its own sort entry; setting two fields on one entry keeps only the last.
        sortBy(request, sortField, sortOrder);
        sortBy(request, sortField2, sortOrder2);
        return runSearch(request.build());
    }

    /**
     * Paged search sorted by two fields, omitting the given source fields.
     *
     * @param indexName  index name
     * @param pageNo     1-based page number
     * @param pageSize   page size
     * @param boolQuery  query
     * @param sortField  primary sort field; {@code null} skips it
     * @param sortOrder  primary sort direction
     * @param sortField2 secondary sort field; {@code null} skips it
     * @param sortOrder2 secondary sort direction
     * @param excludes   source fields to omit; {@code null} means no filtering
     * @return the search response, or {@code null} on {@link IOException}
     */
    public SearchResponse search(String indexName, Integer pageNo, Integer pageSize, BoolQuery boolQuery,
                                 String sortField, SortOrder sortOrder, String sortField2, SortOrder sortOrder2,
                                 String[] excludes) {
        SearchRequest.Builder request = pagedSearch(indexName, boolQuery, pageNo, pageSize);
        filterSource(request, null, excludes);
        // Each sort key needs its own sort entry; setting two fields on one entry keeps only the last.
        sortBy(request, sortField, sortOrder);
        sortBy(request, sortField2, sortOrder2);
        return runSearch(request.build());
    }

    /**
     * Paged search sorted by one field with both include and exclude source filters.
     *
     * @param indexName index name
     * @param pageNo    1-based page number
     * @param pageSize  page size
     * @param boolQuery query
     * @param sortField field to sort by; {@code null} applies no sort
     * @param sortOrder sort direction
     * @param includes  source fields to return; {@code null} means no include filter
     * @param excludes  source fields to omit; {@code null} means no exclude filter
     * @return the search response, or {@code null} on {@link IOException}
     */
    public SearchResponse search(String indexName, Integer pageNo, Integer pageSize, BoolQuery boolQuery,
                                 String sortField, SortOrder sortOrder, String[] includes, String[] excludes) {
        SearchRequest.Builder request = pagedSearch(indexName, boolQuery, pageNo, pageSize);
        filterSource(request, includes, excludes);
        sortBy(request, sortField, sortOrder);
        return runSearch(request.build());
    }

    /**
     * Paged, sorted search that also opens a scroll context.
     *
     * <p>NOTE(review): Elasticsearch may reject a non-zero {@code from} in a scroll
     * context, i.e. {@code pageNo > 1} - confirm before relying on it.
     *
     * @param indexName index name
     * @param pageNo    1-based page number
     * @param pageSize  page size
     * @param boolQuery query
     * @param sortField field to sort by; {@code null} applies no sort
     * @param sortOrder sort direction
     * @param time      scroll keep-alive, e.g. {@code "1m"}
     * @return the search response, or {@code null} on {@link IOException}
     */
    public SearchResponse search(String indexName, Integer pageNo, Integer pageSize, BoolQuery boolQuery,
                                 String sortField, SortOrder sortOrder, String time) {
        SearchRequest.Builder request = pagedSearch(indexName, boolQuery, pageNo, pageSize)
                .scroll(new Time.Builder().time(time).build());
        sortBy(request, sortField, sortOrder);
        return runSearch(request.build());
    }

    /**
     * Counts the documents matching a query.
     *
     * @param indexName index name
     * @param boolQuery query
     * @return the count response, or {@code null} on {@link IOException}
     */
    public CountResponse count(String indexName, BoolQuery boolQuery) {
        try {
            return client.count(c -> c.index(indexName).query(q -> q.bool(boolQuery)));
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Searches with the given query, without setting paging or sorting on the request.
     *
     * @param indexName index name
     * @param boolQuery query
     * @return the search response, or {@code null} on {@link IOException}
     */
    public SearchResponse search(String indexName, BoolQuery boolQuery) {
        return runSearch(baseSearch(indexName, boolQuery).build());
    }

    /**
     * Searches with the given query, returning at most {@code size} hits.
     *
     * @param indexName index name
     * @param boolQuery query
     * @param size      maximum number of hits
     * @return the search response, or {@code null} on {@link IOException}
     */
    public SearchResponse search(String indexName, BoolQuery boolQuery, int size) {
        return runSearch(baseSearch(indexName, boolQuery).size(size).build());
    }

    /**
     * Starts a scroll: runs the query and opens a scroll context kept alive for {@code time}.
     *
     * @param indexName index name
     * @param pageSize  number of hits per scroll batch
     * @param boolQuery query
     * @param time      scroll keep-alive, e.g. {@code "1m"}
     * @return the search response, or {@code null} on {@link IOException}
     */
    public SearchResponse search(String indexName, Integer pageSize, BoolQuery boolQuery, String time) {
        return runSearch(baseSearch(indexName, boolQuery)
                .size(pageSize)
                .scroll(new Time.Builder().time(time).build())
                .build());
    }

    /**
     * Fetches the next batch of a scroll.
     *
     * @param scrollId scroll id from the previous response
     * @param time     keep-alive for the scroll context
     * @return the scroll response, or {@code null} on {@link IOException}
     */
    public ScrollResponse searchScroll(String scrollId, String time) {
        ScrollRequest request = new ScrollRequest.Builder().scrollId(scrollId).scroll(s -> s.time(time)).build();
        try {
            return client.scroll(request, JSONObject.class);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Releases a scroll context.
     *
     * @param scrollId scroll id to clear
     * @return the {@code succeeded} flag of the response, or {@code false} on {@link IOException}
     */
    public boolean clearScroll(String scrollId) {
        ClearScrollRequest request = new ClearScrollRequest.Builder().scrollId(scrollId).build();
        try {
            ClearScrollResponse response = client.clearScroll(request);
            return response.succeeded();
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /** Starts a search request builder for one index and one bool query. */
    private static SearchRequest.Builder baseSearch(String indexName, BoolQuery boolQuery) {
        return new SearchRequest.Builder().index(indexName).query(q -> q.bool(boolQuery));
    }

    /** Starts a search request builder with from/size derived from a 1-based page number. */
    private static SearchRequest.Builder pagedSearch(String indexName, BoolQuery boolQuery,
                                                     Integer pageNo, Integer pageSize) {
        return baseSearch(indexName, boolQuery)
                .from((pageNo - 1) * pageSize)
                .size(pageSize);
    }

    /** Appends one field sort entry to the request; does nothing when the field is {@code null}. */
    private static SearchRequest.Builder sortBy(SearchRequest.Builder request, String field, SortOrder order) {
        if (field == null) {
            return request;
        }
        return request.sort(s -> s.field(f -> f.field(field).order(order)));
    }

    /** Applies include/exclude source filters; {@code null} arrays are ignored. */
    private static SearchRequest.Builder filterSource(SearchRequest.Builder request,
                                                      String[] includes, String[] excludes) {
        if (includes == null && excludes == null) {
            return request;
        }
        return request.source(src -> src.filter(f -> {
            if (excludes != null) {
                f.excludes(Arrays.asList(excludes));
            }
            if (includes != null) {
                f.includes(Arrays.asList(includes));
            }
            return f;
        }));
    }

    /** Executes a search with hits mapped to {@link JSONObject}; returns {@code null} on {@link IOException}. */
    private SearchResponse runSearch(SearchRequest request) {
        try {
            return client.search(request, JSONObject.class);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }
}

如果本文对你有帮助,请点赞、收藏 + 关注,谢谢!!(本文将持续更新)


http://www.kler.cn/a/500663.html

相关文章:

  • 【数据结构高阶】B-树
  • 通过ESP32和INMP441麦克风模块实现音频数据传递
  • Ubuntu 磁盘修复
  • 图像处理中实现 C++ 和 Python 的高效通信——Boost.Interprocess mmap
  • 【Uniapp-Vue3】插槽Slots及具名插槽实现组件高度定制化
  • [SAP ABAP] APPEND INITIAL LINE 追加空行
  • 苍穹外卖08——(涉及接收日期格式数据、ApachePOI导出报表、sql获取top10菜品数据)
  • doris:模型注意事项
  • npm 与 pnpm:JavaScript 包管理工具的对比与选择
  • 泛目录和泛站有什么差别
  • 跳表和Mysql联合索引的最左原则和索引下推的优化
  • 禅道使用实践(2)-产品篇
  • Golang笔记——rune和byte
  • Linux 容器漏洞
  • Spark Streaming专题
  • MySQL面试题--最最最详细MySQL面试题和解答
  • React中透过render函数学习(一)——workInProgress与双缓存机制
  • 现代 CPU 的高性能架构与并发安全问题
  • 【ESP32】ESP-IDF开发 | WiFi开发 | 基站模式 + AP扫描和AP连接例程
  • iOS实际开发中使用Alamofire实现多文件上传(以个人相册为例)
  • Kafka 主题管理
  • Java面试核心知识4