当前位置: 首页 > article >正文

Elasticsearch Java API Client [8.17] 使用

es8出了个新的JavaClient,相较于HighLevelClient少了对es源代码的引用,更加的轻便了,这里记录下使用方式。

文档地址:Introduction | Elasticsearch Java API Client [8.17] | Elastic

本地运行的是es 8.6.2版本

JavaClient使用的是8.17

1、首先创建个java springboot项目

源码地址:https://github.com/a66245753/es-8-java-client.git

pom依赖文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.4.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.david</groupId>
    <artifactId>es8-java-client</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>es8-java-client</name>
    <description>es8-java-client</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
            <version>8.17.3</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.17.0</version>
        </dependency>
        <!-- Lombok 依赖 -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-spring-boot3-starter</artifactId>
            <version>3.5.8</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
            <version>8</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

application.yml

数据库需要改为自己的

spring:
  application:
    name: es8-java-client


  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/david?charset=utf8mb4&parseTime=True&loc=Local
    username: david
    password: david


mybatis-plus:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

按照官网上声明EsClient 为spring bean

@Configuration
public class EsClientConfig {

    @Bean
    public ElasticsearchClient elasticsearchClient() {
        // URL and API key
        String serverUrl = "http://localhost:9200";
        //        String apiKey = "VnVhQ2ZHY0JDZGJrU...";

        // Create the low-level client
        RestClient restClient = RestClient
                .builder(HttpHost.create(serverUrl))
//                .setDefaultHeaders(new Header[]{
//                        new BasicHeader("Authorization", "ApiKey " + apiKey)
//                })
                .build();

        // Create the transport with a Jackson mapper
        ElasticsearchTransport transport = new RestClientTransport(
                restClient, new JacksonJsonpMapper());

        // And create the API client
        return new ElasticsearchClient(transport);
    }
}

2、es client使用

class:com.david.coltroller.EsDocController

2.1 新增数据

有单个新增和批量新增,使用批量时最好把索引的更新频率设置一下

/**
     * https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/indexing.html
     *
     * @param add
     * @return
     * @throws IOException
     */
    @PostMapping("/add")
    public ResponseEntity<EsResponseBody> add(@RequestBody EsDocAdd add) throws IOException {
        try {
            IndexResponse response = elasticsearchClient.index(i -> i
                    .index(add.getIndex())
                    // 不设置id,默认使用es生成的
                    // .id(product.getSku())
                    .document(add.getDoc())
            );
            EsResponseBody responseBody = new EsResponseBody(0, response.id(), null);
            return new ResponseEntity<>(responseBody, HttpStatus.OK);
        } catch (ElasticsearchException e) {
            EsResponseBody responseBody = new EsResponseBody(-1, null, e.error().toString());
            return new ResponseEntity<>(responseBody, HttpStatus.OK);
        }
    }

    /**
     * https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/indexing-bulk.html
     *
     * @param add
     * @return
     * @throws IOException
     */
    @PostMapping("/addBatch")
    public ResponseEntity<EsResponseBody> addBatch(@RequestBody EsDocAddBatch add) throws IOException {

        BulkRequest.Builder br = new BulkRequest.Builder();
        for (ObjectNode doc : add.getDocs()) {
            br.operations(op -> op
                    .index(idx -> idx
                            .index(add.getIndex())
                            .document(doc)
                    )
            );
        }
        try {
            BulkResponse result = elasticsearchClient.bulk(br.build());
            if (result.errors()) {
                List<EsDocBucketError> list = new ArrayList<>();
                int i = 0;
                for (BulkResponseItem item : result.items()) {
                    if (item.error() != null) {
                        list.add(new EsDocBucketError(item.error().reason(), add.getDocs().get(i)));
                    }
                    i++;
                }
                EsResponseBody responseBody = new EsResponseBody(-1, list, "失败");
                return new ResponseEntity<>(responseBody, HttpStatus.OK);
            }
            EsResponseBody responseBody = new EsResponseBody(0, null, null);
            return new ResponseEntity<>(responseBody, HttpStatus.OK);
        } catch (ElasticsearchException e) {
            EsResponseBody responseBody = new EsResponseBody(-1, null, e.error().toString());
            return new ResponseEntity<>(responseBody, HttpStatus.OK);
        }
    }

建议使用模版的方式创建索引,比如我下面的dsl语句规定了索引内字段的类型。


PUT _index_template/ssp_ad_union_log_template
{
  // 匹配的索引模式
  "index_patterns": ["ssp_ad_union_log_*"], 
  "template": {
    "settings": {
      "number_of_shards": 1,       
      "number_of_replicas": 1,
      "refresh_interval":"1s"      
    },
    "mappings": {
      "dynamic": "strict",
      "properties": {
        "id": { "type": "long" },   
        "reqId": { "type": "keyword" },   
        "device": { "type": "keyword" },
        "platform": { "type": "byte" },
        "platformName":{"type":"keyword"},
        "clientType": { "type": "byte" },
        "myAppId":{"type":"keyword"},
        "deviceId":{"type":"keyword"},
        "adSiteGroupId": { "type": "long" },
        "adSiteId":{"type":"keyword"},
        "packagePath":{"type":"keyword"},
        "ecpm": { "type": "long" },
        "location":{"type":"geo_point"},
        "ip":{"type":"ip"},
        "cityId":{"type":"integer"},
        "cityName":{"type":"keyword"},
        "areaId":{"type":"integer"},
        "areaName":{"type":"keyword"},
        "provinceId":{"type":"integer"},
        "provinceName":{"type":"keyword"},
        "phoneBrand":{"type":"keyword"},
        "phoneBrandName":{"type":"keyword"},
        "phoneModel":{"type":"keyword"},
        "idfa":{"type":"keyword"},
        "bizType":{"type":"byte"},
        "sdkVersion":{"type":"keyword"},
        "network":{"type":"keyword"},
        "logTime":{"type":"date","format":"yyyy-MM-dd HH:mm:ss"},
        "createdAt":{"type":"date","format":"yyyy-MM-dd HH:mm:ss"}
      }
    },
    "aliases":{
      "ssp_ad_union_log":{}
    }
  }
}

2.2 根据es的objectId获取数据


    /**
     * 根据es的objectId获取数据
     * @param index
     * @param id
     * @return
     * @throws IOException
     */
    @GetMapping("/get")
    public ResponseEntity<EsResponseBody> get(@RequestParam("index") String index, @RequestParam("id") String id) throws IOException {

        GetResponse<ObjectNode> response = elasticsearchClient.get(g -> g
                        .index(index)
                        .id(id),
                ObjectNode.class
        );
        if (response.found()) {
            EsResponseBody responseBody = new EsResponseBody(0, response.source(), null);
            return new ResponseEntity<>(responseBody, HttpStatus.OK);
        }
        EsResponseBody responseBody = new EsResponseBody(-1, null, "not found");
        return new ResponseEntity<>(responseBody, HttpStatus.OK);
    }

    /**
     * 根据es的多个objectId获取数据
     * @param params
     * @return
     * @throws IOException
     */
    @PostMapping("/getBatch")
    public ResponseEntity<EsResponseBody> getBatch(@RequestBody EsGetBatchParams params) throws IOException {

        MgetResponse<ObjectNode> response = elasticsearchClient.mget(g -> g
                        .index(params.getIndex())
                        .ids(params.getIdList()),
                ObjectNode.class
        );
        List<ObjectNode> list = new ArrayList<>();
        for (MultiGetResponseItem<ObjectNode> doc : response.docs()) {
            if (doc.isResult()) {
                list.add(doc.result().source());
            }
            if (doc.isFailure()) {
                System.out.println("isFailure doc" + doc.toString());
            }
        }
        EsResponseBody responseBody = new EsResponseBody(0, list, null);
        return new ResponseEntity<>(responseBody, HttpStatus.OK);
    }

2.3 查询分页

这里写了个通用查询

/**
     * 查询分页
     * @param request
     * @return
     * @throws IOException
     */
    @PostMapping("/search")
    public ResponseEntity<EsResponseBody> search(@RequestBody EsSearchRequest request) throws IOException {
        // 构造查询过滤条件
        List<Query> list = buildQueryList(request);
        Query query = null;
        if (request.getLikeParams() != null && !request.getLikeParams().isEmpty()) {
            // 使用分值查询,分高的排前面
            query = new BoolQuery.Builder().must(list).build()._toQuery();
        } else {
            // 使用filter不算分查询
            query = new BoolQuery.Builder().filter(list).build()._toQuery();
        }
        // 创建 SearchRequest 并指定索引名称
        SearchRequest searchRequest = new SearchRequest.Builder()
                .index(request.getIndexList())
                .query(query)
                .trackTotalHits(tth -> tth.enabled(true))
                .from((request.getPageIndex() - 1) * request.getPageSize())
                .size(request.getPageSize())
                .build();

        // 打印查询语句,方便排查问题
        System.out.println("search query: " + searchRequest.toString());

        // 执行搜索请求, 第二个参数可以换成自己的返回值对象
        SearchResponse<ObjectNode> response = elasticsearchClient.search(searchRequest, ObjectNode.class);
        List<ObjectNode> collect = response.hits().hits().stream().map(Hit::source).toList();
        EsSearchResult result = new EsSearchResult();
        result.setPageIndex(request.getPageIndex());
        result.setPageSize(request.getPageSize());
        result.setTotal(response.hits().total().value());
        result.setList(collect);
        EsResponseBody responseBody = new EsResponseBody(0, result, null);

        return new ResponseEntity<>(responseBody, HttpStatus.OK);
    }

/**
     * 组装请求参数
     *
     * @param request
     * @return
     */
    private List<Query> buildQueryList(EsSearchRequest request) {
        List<Query> list = new ArrayList<>();
        if (request.getEqualsParams() != null && !request.getEqualsParams().isEmpty()) {
            for (Map.Entry<String, Object> entry : request.getEqualsParams().entrySet()) {
                // 构建 TermQuery 对象
                Query query = new TermQuery.Builder()
                        .field(entry.getKey())
                        .value(entry.getValue().toString())
                        .build()._toQuery();
                list.add(query);
            }
        }
        if (request.getInParams() != null && !request.getInParams().isEmpty()) {
            for (EsInParams params : request.getInParams()) {
                // 构建 TermsQuery 对象
                Query query = new TermsQuery.Builder()
                        .field(params.getField())
                        .terms(t -> t.value(params.getValueList().stream().map(FieldValue::of).toList()))
                        .build()._toQuery();
                list.add(query);
            }
        }
        if (request.getLikeParams() != null && !request.getLikeParams().isEmpty()) {
            for (Map.Entry<String, String> entry : request.getLikeParams().entrySet()) {
                // 构建 MatchQuery 对象
                Query query = new MatchQuery.Builder()
                        .field(entry.getKey())
                        .query(entry.getValue())
                        .build()._toQuery();
                list.add(query);
            }
        }
        if (request.getRangeParams() != null && !request.getRangeParams().isEmpty()) {
            for (EsRangeParams rangeParam : request.getRangeParams()) {
                if (rangeParam.getFieldType() == 1) {
                    Query query = new NumberRangeQuery.Builder()
                            .field(rangeParam.getField())
                            .gte(Double.valueOf(rangeParam.getGte()))
                            .lte(Double.valueOf(rangeParam.getLte()))
                            .build()._toRangeQuery()._toQuery();
                    list.add(query);
                } else {
                    Query query = new DateRangeQuery.Builder()
                            .field(rangeParam.getField())
                            .gte(rangeParam.getGte())
                            .lte(rangeParam.getLte())
                            .build()._toRangeQuery()._toQuery();
                    list.add(query);
                }
            }
        }
        return list;
    }

2.4 聚合数据(分组统计)

聚合没有太多的通用性,按照自己业务的需求写就可以。

下面是按照了城市维度分组,统计了每个城市下面业务类型数量、网络类型数量、sdk版本数量、平台数量、手机型号数量,以及最后一个是带条件过滤在统计的数据平均值。


    @PostMapping("/aggs")
    public ResponseEntity<EsResponseBody> aggs(@RequestBody EsSearchRequest request) throws IOException {

        // 构造查询过滤条件
        List<Query> list = buildQueryList(request);

        Query query = new BoolQuery.Builder().filter(list).build()._toQuery();

        // 创建 SearchRequest 并指定索引名称和其他参数
        SearchRequest searchRequest = new SearchRequest.Builder()
                .index("ssp_ad_union_log")
                .trackTotalHits(tth -> tth.enabled(true))
                .from(0)
                .size(1)
                .query(query)
                .aggregations("cityName_bucket",
                        // 一级聚合
                        a -> a.terms(t -> t.field("cityName").size(10))
                                // 二级聚合
                                .aggregations("bizType_bucket", aa -> aa.terms(ts -> ts.field("bizType").size(10)))
                                .aggregations("network_bucket", aa -> aa.terms(ts -> ts.field("network").size(10)))
                                .aggregations("sdkVersion_bucket", aa -> aa.terms(t3 -> t3.field("sdkVersion").size(10)))
                                .aggregations("platformName_bucket", aa -> aa.terms(t4 -> t4.field("platformName").size(10)))
                                .aggregations("phoneBrandName_bucket", aa -> aa.terms(t5 -> t5.field("phoneBrandName").size(10)))
                                // 带条件二级聚合
                                .aggregations("ecpmAvg_bucket", aa -> aa.filter(f6 -> f6.term(c -> c.field("bizType").value(2))).aggregations("ecpmAvg_bucket", ss -> ss.avg(avg -> avg.field("ecpm"))))
                )
                .build();

        // 打印查询语句,方便排查问题
        System.out.println("search query: " + searchRequest.toString());

        // 执行搜索请求
        SearchResponse<ObjectNode> response = elasticsearchClient.search(searchRequest, ObjectNode.class);
        // 处理结果
        System.out.println(response.aggregations().toString());
        List<StringTermsBucket> buckets = response.aggregations()
                .get("cityName_bucket")
                .sterms()
                .buckets()
                .array();
        EsResponseBody responseBody = new EsResponseBody(0, response.aggregations().toString(), null);
        return new ResponseEntity<>(responseBody, HttpStatus.OK);
    }


http://www.kler.cn/a/580059.html

相关文章:

  • RoboVQA:机器人多模态长范围推理
  • protobuf, rpc, 火焰图
  • Kubernetes(K8s)集群中使用 GPU
  • C++零基础LeetCode热题100- 49.字母异位词分组
  • Linux提权-04 capabilities
  • 《 C++ 点滴漫谈: 三十 》高手写 C++,参数这样传才高效!你真的用对了吗?
  • UI自动化:Selenium常规的页面元素定位方法
  • 【漫话机器学习系列】123.感知机学习(Perceptron Learning)
  • 执行npm install 时,是如何将依赖包下载下来的。
  • 服务器磁盘占用率过高解决方案
  • 修复ubuntu下找不到音频设备的问题
  • docker修改daemon.json文件后无法启动
  • Zemax 中的 CAD 文件性能比较
  • 隧道定向号角喇叭为隧道安全保驾护航
  • 腾讯元宝:AI 时代的快速论文阅读助手
  • Windows 图形显示驱动开发-WDDM 3.2-本机 GPU 围栏对象(五)
  • 深度学习与大模型基础-向量
  • LeetCode 解题思路 14(Hot 100)
  • 探讨消息队列系统:AWS SQS vs. Apache Kafka
  • 华为hcia——Datacom实验指南——三层交换和ARP的工作原理