当前位置: 首页 > article >正文

分布式搜索引擎Elasticsearch(三)

一、数据聚合

1. 概述

聚合(aggregations)可以实现对文档数据的统计、分析、运算。聚合常见有三类:

桶(Bcket)聚合:用来对文档做分组

  • TermAggregation: 按照文档字段值分组
  • Date Histogram: 按照日期阶梯分组,例如一周一组,或者一月为一组

度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等

  • Avg:求平均值
  • Max:求最大值
  • Min:求最小值
  • Stats:同时求max、min、avg、sum 等

管道(pipeline)聚合:结果为基础做聚合

注意:参与聚合的字段必须是:keyword、数值、日期、布尔

 2. Bucket聚合

2.1 DSL 实现 Bucket聚合

统计所有数据中的酒店品牌有几种,此时可以根据酒店的名称做聚合。类型为 term类型。DSL实例如下。

GET  /hotel/_search

{

  "size"0,   // 设置size0,结果中不包含文档,只包含聚合结果

  "aggs": { // 定义聚合

    "brandAgg": { //给聚合起个名字

      "terms": { // 聚合的类型,按照品牌值聚合,所以选择term

        "field""brand"// 参与聚合的字段

        "size"20 // 希望获取的聚合结果数量

      }

    }

  }

}

2.2 Bucket聚合-聚合结果排序

默认情况下,Bucket聚合会统计Bucket内的文档数量,记为_count,并且按照_count降序排序。我们可以修改结果排序方式:

GET /hotel/_search

{

  "size"0

  "aggs": {

    "brandAgg": {

      "terms": {

        "field""brand",

        "order": {

          "_count""asc" // 按照_count升序排列

        },

        "size"20

      }

    }

  }

}

2.3 Bucket聚合-限定聚合范围

默认情况下,Bucket聚合是对索引库的所有文档做聚合,我们可以限定要聚合的文档范围,只要添加query条件即可

GET /hotel/_search

{

  "query": {

    "range": {

      "price": {

        "lte"200 // 只对200元以下的文档聚合

      }

    }

  }, 

  "size"0

  "aggs": {

    "brandAgg": {

      "terms": {

        "field""brand",

        "size"20

      }

    }

  }

}

2.4 小结

aggs代表聚合,与query同级,此时query的作用是 限定聚合的的文档范围

聚合必须的三要素:聚合名称聚合类型聚合字段

聚合可配置属性有:

  • size指定聚合结果数量
  • order指定聚合结果排序方式
  • field指定聚合字段

3. Metrics 聚合

3.1 DSL实现 Metrics 聚合

获取每个品牌的用户评分的min、max、avg等值。我们可以利用 stats 聚合:

GET /hotel/_search

{

  "size"0

  "aggs": {

    "brandAgg": { 

      "terms": { 

        "field""brand", 

        "size"20

      },

      "aggs": { // brands聚合的子聚合,也就是分组后对每组分别计算

        "score_stats": { // 聚合名称

          "stats": { // 聚合类型,这里stats可以计算minmaxavg

            "field""score" // 聚合字段,这里是score

          }

        }

      }

    }

  }

}

 4. RestAPI 实现聚合

以品牌聚合为例,演示下Java的RestClient使用,下图是DSL 语句和 java代码的对应关系。

请求组装

java 代码如下 

@Test
    void testAggregation() throws IOException {
        // 1. 准备Request
        SearchRequest request = new SearchRequest("hotel");
        // 2. 准备 DSL
        // 2.1 设置 size
        request.source().size(0);
        //2.2 聚合
        request.source().aggregation(AggregationBuilders
                .terms("brandAgg")
                .field("brand")
                .size(10)
        );
        // 3 发出请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 4. 解析结果
        System.out.println(response);
    }

得到的结果是个 JSON 格式的数据,下面就要对聚合结果解析

聚合结果解析

java 代码块

// 4. 解析结果
        Aggregations aggregations = response.getAggregations();
        // 4.1 根据集合名称获取聚合结果
        Terms brandTerms = aggregations.get("brandAgg");
        // 4.2 获取buckets 
        List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
        // 4.3 遍历
        for (Terms.Bucket bucket : buckets) {
            // 获取 key
            String key = bucket.getKeyAsString();
            System.out.println(key);
        }

整体代码

public class HotelSearchTest {
    private RestHighLevelClient client;

    // 客户端初始化
    @BeforeEach
    void setUp() {
        this.client = new RestHighLevelClient(RestClient.builder(
                HttpHost.create("http://192.168.30.130:9200")  // 服务器IP + ES 端口
        ));
    }

    @Test
    void testAggregation() throws IOException {
        // 1. 准备Request
        SearchRequest request = new SearchRequest("hotel");
        // 2. 准备 DSL
        // 2.1 设置 size
        request.source().size(0);
        //2.2 聚合
        request.source().aggregation(AggregationBuilders
                .terms("brandAgg")
                .field("brand")
                .size(10)
        );
        // 3 发出请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 4. 解析结果
        Aggregations aggregations = response.getAggregations();
        // 4.1 根据集合名称获取聚合结果
        Terms brandTerms = aggregations.get("brandAgg");
        // 4.2 获取buckets 
        List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
        // 4.3 遍历
        for (Terms.Bucket bucket : buckets) {
            // 获取 key
            String key = bucket.getKeyAsString();
            System.out.println(key);
        }
    }


    @AfterEach
    void tearDown() throws IOException {
        this.client.close();
    }

}

结果

二、自动补全

在搜索框输入字符时,我们应该提示出与该字符有关的搜索项,如电商网站搜索

1 安装拼音分词插件 

要实现根据字母做补全,就必须对文档按照拼音分词。在GitHub上恰好有elasticsearch的拼音分词插件。地址 https://github.com/medcl/elasticsearch-analysis-pinyin

安装方式与IK分词器一样,分三步:

  1. 解压
  2. 上传到虚拟机中,elasticsearch的plugin目录
  3. 重启elasticsearch
  4. 测试 

 POST /_analyze

{

  "text""如家酒店",     // 分词内容

  "analyzer""pinyin"    // 哪种分词器

}

如果是用的汉字,则使用的是ik分词器;拼音分词器使用的是pinyin分词器。

2 自定义分词器 

elasticsearch中分词器(analyzer)的组成包含三部分:

  1. character filters(字符过滤器):分词之前对词条进行处理。例如删除字符、替换字符。 
  2. tokenizer(分词器):指定分词器。
  3. tokenizer filter(过滤器):对分词后的词条进行处理。例如大小写转换、同义词处理、拼音处理等。

 在创建索引库时,通过settings来配置自定义的analyzer(分词器)自定义分词器只能在当前索引下使用

PUT /test    // 请求路径,创建一个名为 test 的索引

{

  "settings": {    // 设置索引的参数,用于配置索引的分析器和过滤器

    "analysis": {   // 分析器和过滤器的设置

      "analyzer": { // 分析器的设置,可以自定义分词器名称

        "my_analyzer": {  // 自定义分词器名称

          "tokenizer""ik_max_word",  // 分析器使用的分词器,这里使用IK分词器,可以将中文切分成单个单词。

          "filter""py"      // 定义过滤器,这里定义一个名为py的过滤器

        }

      },

      "filter": { // 过滤器的配置

        "py": { // 过滤器名称,可自定义

          "type""pinyin",   // 过滤器类型,这里是pinyin,表示把中文转换成拼音

          "keep_full_pinyin"false,    // 拼音转换后是否保留完整拼音之间的设置

          "keep_joined_full_pinyin"true,   // 拼音转换后是否保留完整拼音之间连字符的设置

          "keep_original"true,   // 是否保留原始文本的设置

          "limit_first_letter_length"16,      // 拼音转换后首字母的长度限制

          "remove_duplicated_term"true,  // 是否去除重复结果的设置

          "none_chinese_pinyin_tokenize"false  // 是否对非中文文本进行拼音切分的设置

        }

      }

    }

  }

}

如果想在其他地方使用自定义分词器,需要指定自定义索引名。

POST /test/_analyze    // test是自定义的索引名   

{

    "text": ["如家酒店还不错"],

    "analyzer": "my_analyzer"     // 自定义分词器

}

拼音分词器适合在创建倒排索引的时候使用,但不能在搜索的时候使用。因为当查询的时候,会把所有同音字的数据都返回。

解决方案:创建索引时,用自定义分词器,搜索时用ik分词器。

PUT /test
{
  "settings": {
    "analysis": {
      "analyzer": { 
        "my_analyzer": { 
          "tokenizer": "ik_max_word",
          "filter": "py"
        }
      },
      "filter": {
        "py": { 
          "type": "pinyin",
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "name":{
        "type": "text",
        "analyzer": "my_analyzer",
        "search_analyzer": "ik_smart"
      }
    }
  }
}

3. 自动补全

elasticsearch提供了Completion Suggester查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高补全查询的效率,对于文档中字段的类型必须是 completion 类型。字段里内容就是补全的多个词条

3.1 completion suggest 查询补全数据

// 创建索引库
PUT 索引库名

{

  "mappings": {

    "properties": {

      "title":{

        "type""completion"

      }

    }

  }

}

示例数据

// 示例数据

POST test/_doc

{

  "title": ["Sony""WH-1000XM3"]

}

POST test/_doc

{

  "title": ["SK-II""PITERA"]

}

POST test/_doc

{

  "title": ["Nintendo""switch"]

}

自动补全查询语法

// 自动补全查询

GET /索引库名/_search

{

  "suggest": {   // 实现搜索建议功能

    "title_suggest": {    // 建议查询的名称,可以自定义

      "text""s",   // 用户输入的搜素内容,这里表示用户输入的内容是 “s"

      "completion": {   // 建议查询的类型,表示 通过补全的方式提供搜索建议

        "field""title"// 对哪个字段名称进行补全建议,表示 对 title 字段进行建议

        "skip_duplicates"true// 可选参数,是否跳过重复的结果

        "size"10 // 可选参数,获取前10条结果

      }

    }

  }

}

3.2 RestAPI 自动补全

来看一下 java 代码 与 DSL 对应关系。

发送请求代码片段

@Test
    void testSuggest() throws IOException {
        // 1. 准备Request
        SearchRequest request = new SearchRequest("hotel");
        // 2. 准备 DSL 
        request.source().suggest(new SuggestBuilder().addSuggestion(
                "suggestions",
                SuggestBuilders.completionSuggestion("suggestion")
                        .prefix("h")
                        .skipDuplicates(true)
                        .size(10)
        ));
        // 3. 发送请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 4. 解析结果
    }

结果解析代码片段

结果解析代码片段

// 4. 解析结果
        Suggest suggest = response.getSuggest();
        // 4.1 根据名称获取补全结果, suggest.getSuggestion 的name 属性是 DSL 语句定义的名称
        CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
        // 4.2 获取 options
        List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
        // 4.3 遍历
        for (CompletionSuggestion.Entry.Option option : options) {
            String text = option.getText().toString();
            System.out.println(text);
        }

完整代码

package cn.itcast.hotel;

import cn.itcast.hotel.pojo.HotelDoc;
import com.alibaba.fastjson.JSON;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.completion.CompletionSuggester;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.util.CollectionUtils;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;


public class HotelSearchTest {
    private RestHighLevelClient client;

    // 客户端初始化
    @BeforeEach
    void setUp() {
        this.client = new RestHighLevelClient(RestClient.builder(
                HttpHost.create("http://192.168.30.130:9200")  // 服务器IP + ES 端口
        ));
    }

    @Test
    void testSuggest() throws IOException {
        // 1. 准备Request
        SearchRequest request = new SearchRequest("hotel");
        // 2. 准备 DSL
        request.source().suggest(new SuggestBuilder().addSuggestion(
                "suggestions",
                SuggestBuilders.completionSuggestion("suggestion")
                        .prefix("h")
                        .skipDuplicates(true)
                        .size(10)
        ));
        // 3. 发送请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 4. 解析结果
        Suggest suggest = response.getSuggest();
        // 4.1 根据名称获取补全结果, suggest.getSuggestion 的name 属性是 DSL 语句定义的名称
        CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
        // 4.2 获取 options
        List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
        // 4.3 遍历
        for (CompletionSuggestion.Entry.Option option : options) {
            String text = option.getText().toString();
            System.out.println(text);
        }

    }

    @AfterEach
    void tearDown() throws IOException {
        this.client.close();
    }

}

三、数据同步

1 数据同步的三种方式

elasticsearch中的数据来自数据库,当数据库中数据发生变化时,elasticsearch 也必须跟着改变,这个就是es 和数据库之间的数据同步

在微服务中,负责操作数据库的业务和操作es的业务可能在两个不同的微服务上,如何实现数据同步呢?

方案一同步调用

操作完数据库,在调用搜索的微服务模块,由搜索的微服务模块更新es。

优点:实现简单

缺点:耦合度高,两个微服务模块之间需要调用;性能低,需要依次执行各步骤。

方式二异步通知

数据更新到数据库后,发送一条消息,由搜索的微服务监听发送的消息,最后更新es。

优点:耦合度低

缺点:依赖于消息中间件,考验中间件的可靠性,如MQ。

方式三监听binlog

每当操作数据库时,都会把对应的操作记录在binlog中,通过中间件canal监听binlog,当数据发生变化时,就会通知搜索模块的微服务更新es。

优点:完全解除微服务耦合

缺点:开启binlog增加数据库负担,实现复杂度高。

四、elasticsearch集群

1 概述

单机的es做数据存储,面临海量数据存储和单点故障问题。

  • 海量数据存储问题:将索引库从逻辑上拆分为N个分片(shard),存储到多个节点.
  • 单点故障问题:将分片数据在不同节点备份

2. 搭建集群

2.1 创建es集群

在单机上测试可以利用docker容器运行多个es实例来模拟es集群。但是在生产环境推荐每台服务器仅部署一个es的实例。部署es集群可以直接使用docker-compose来完成。但要求Linux虚拟机至少有4G的内存空间。

2.1 创建es集群,这里利用3个docker容器搭建3个es集群,编写一个docker-compose.yml文件。作用是一个文件部署多个es实例,达到一键启动的效果。

version: '2.2'
services:
  es01:
    image: elasticsearch:7.12.1           #镜像
    container_name: es01                #容器名称
    environment:                #环境变量
      - node.name=es01       #节点名称,每个容器都有独一无二的名称
      - cluster.name=es-docker-cluster    # 集群名称,es会把所有集群名称一致的节点自动组装成同一个集群中
      - discovery.seed_hosts=es02,es03    # 集群中另外两个节点的ip地址。这里用的docker容器,容器互联可以直接用集群名称
      - cluster.initial_master_nodes=es01,es02,es03       # 初始化的主节点。主节点是需要选举的,这里表示有哪些节点可以是候选选举的
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"     #配置JVM堆内存的内存大小,最小内存和最大内存配置为512兆
    volumes:       #数据卷挂载
      - data01:/usr/share/elasticsearch/data
    ports:       # 端口映射
      - 9200:9200
    networks:       # 加入一个名为 elastic 的网络中
      - elastic
  es02:
    image: elasticsearch:7.12.1
    container_name: es02
    environment:
      - node.name=es02
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data02:/usr/share/elasticsearch/data
    ports:
      - 9201:9200
    networks:
      - elastic
  es03:
    image: elasticsearch:7.12.1
    container_name: es03
    environment:
      - node.name=es03
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data03:/usr/share/elasticsearch/data
    networks:
      - elastic
    ports:
      - 9202:9200
volumes:
  data01:
    driver: local
  data02:
    driver: local
  data03:
    driver: local

networks:
  elastic:
    driver: bridge

2.2 修改 /etc/sysctl.conf 文件。es运行需要修改一些linux系统权限

vim /etc/sysctl.conf

2.3 添加下面的内容

vm.max_map_count=262144

2.4 然后执行命令,让配置生效:

sysctl -p

2.5 通过docker-compose启动集群:

docker-compose up -d

注:如果显示 docker-compose: 未找到命令...,需要安装docker-compose

下载安装docker-compose 

curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
curl -L "https://mirror.ghproxy.com/https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

修改目录权限 

chmod  +x /usr/local/bin/docker-compose

创建软链接

ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose

查看版本

docker-compose --version

2.3 集群监控状态

kibana可以监控es集群,不过新版本需要依赖es的x-pack 功能,配置比较复杂。

这里推荐使用 cerebro 来监控es集群状态,官方网址:https://github.com/lmenezes/cerebro

解压好的目录如下:

进入bin目录,双击启动 cerbro.bat 脚本文件 

输入服务器ip+cerebro端口,即可进入管理界面:如 http://192.168.30.130:9000/ 

在Node address 输入框输入 http://服务器ip+节点端口,点击Connect即可。如http://192.168.30.130:9201 

绿色的条,代表集群处于健康状态

2.3 创建索引库

2.3.1 利用kibana的DevTools创建索引库

在DevTools中输入指令:

PUT /itcast
{
  "settings": {
    "number_of_shards": 3, // 分片数量,默认是1,不分片。这里分3片
    "number_of_replicas": 1 // 副本数量,给每个分片创建一个副本
  },
  "mappings": {
    "properties": {
      // mapping映射定义 ...
    }
  }
}

 

3. 集群职责及脑裂

elasticsearch中集群节点有不同的职责划分

master eligible节点的作用是什么?

  • 1. 参与集群选主
  • 2. 主节点可以管理集群状态、管理分片信息、处理创建和删除索引库的请求

data节点的作用是什么?

1. 数据的CRUD

coordinator节点的作用是什么?

1. 路由请求到其它节点

2. 合并查询到的结果,返回给用户

 

默认情况下,每个节点都是master eligible节点,因此一旦master节点宕机,其它候选节点会选举一个成为主节点。当主节点与其他节点网络故障时,可能发生脑裂问题。 

脑裂:一个集群出现了两个节点。

选取主节点:floor(N / 2) + 1 , N为节点数量,即节点数量除二加一的值向下取整。

discovery.zen.minimum_master_nodes:这是一个非常重要的配置参数,用于防止脑裂问题。它指定了在选举过程中需要的最少主节点候选者数量。推荐设置为 (master_eligible_nodes / 2) + 1,其中 master_eligible_nodes 是候选主节点的数量。

node.master:此设置决定了一个节点是否有资格参与主节点选举。设置为 true 的节点可以成为主节点;设置为 false 的节点不能成为主节点。

3. 分布式新增和查询流程

当新增文档时,应该保存到不同分片,保证数据均衡,那么coordinating node如何确定数据该存储到哪个分片呢?

elasticsearch会通过hash算法来计算文档应该存储到哪个分片:

// 确定分片算法

shard  =  hash(_routing)  %  number_of_shards

说明

_routing 默认是文档的 id

number_of_shards 是分片数量。算法与分片数量有关,因此索引库一旦创建,分片数量不能修改!

 新增文档流程

查询流程 

elasticsearch的查询分成两个阶段:

• scatter phase:分散阶段,coordinating node会把请求分发到每一个分片

• gather phase:聚集阶段,将查询结果汇总到coordinating node ,整理并返回给用户

 4. 故障转移

集群中的主节点会监控集群中其他的节点,如果有节点发生了宕机,会把宕机的分片数据迁移到其他的节点上。如果是主节点发生宕机,会重新选取主节点,然后由新的主节点把宕机的分片迁移到其他节点上,确保数据安全,这就是故障转移。

如下图,假设node1是主节点宕机了,重新选取主节点为node2,。此时,node2会去查看宕机节点的分片,即 p-0 和 R - 1两个分片,把宕机节点的分片迁移到 node 2 或 node 3上。

in


http://www.kler.cn/a/429645.html

相关文章:

  • Windows service运行Django项目
  • LabVIEW智能水肥一体灌溉控制系统
  • Go语言之路————func
  • 基于springboot+vue的 嗨玩-旅游网站
  • 【MySQL】SQL菜鸟教程(一)
  • rk3568 , buildroot , qt ,使用sqlite, 动态库, 静态库
  • 告别充电焦虑:移动充电机器人的革命性解决方案
  • 3D 目标检测:从萌芽到前沿的技术演进之路
  • linux间隔记录服务器的CPU占用率TOP10的进程
  • 使用Nexus3搭建Maven私有镜像仓库
  • Burp(8)-验证码爆破插件
  • 基于FPGA的PI环调节电压
  • Xcode编译的时候运行python脚本
  • 计算机网络——三大交换技术
  • 准确率99.9%的离线IP地址定位库 ip2region - python 示例
  • GauHuman阅读笔记【3D Human Modelling】
  • 知从科技闪耀汽车智能底盘大会:共探软件安全新篇章
  • ElasticSearch常见面试题汇总
  • 《maven 常用知识详解 3:聚合与继承》
  • Blender均匀放缩模型
  • 第8章:CSS预处理器 --[CSS零基础入门]
  • vue3 实现音频转文字组件
  • 深入理解 React 状态管理:useState 和 useReducer
  • java工具包介绍
  • FCOS: Fully Convolutional One-Stage Object Detection——全卷积一阶段目标检测
  • 【CMD、PowerShell和Bash设置代理】