当前位置: 首页 > article >正文

Elasticsearch检索之三:官方推荐方案search_after检索实现(golang)

Elasticsearch8.17.0在mac上的安装

Kibana8.17.0在mac上的安装

Elasticsearch检索方案之一:使用from+size实现分页

快速掌握Elasticsearch检索之二:滚动查询(scrool)获取全量数据(golang)

1、search_after检索

在前面的文章介绍了from+size的普通分页查询,以及scroll滚动查询获取全量数据,其弥补了from+size只能检索1W条以内的数据的缺憾,但是滚动查询本身也存在缺陷,当es滚动上下文大于500,则无法再次进行检索,此时search_after应运而生,它是带着使命来的。

2、使用Kibana了解search_after使用方法

说明:本地es中的数据共11000条,doc_id字段从1-11000,方便说明问题。

首先构造一个排序查询:

GET /new_tag_202411/_search
{
  "size": 10,
  "sort": [
    {
      "doc_id": {
        "order": "asc"
      }
    }
  ]
}

这个检索请求发出后,返回的数据【doc_id】从1开始,每次10条,也就是返回doc_id从1-10的数据,这里着重列出返回的第10条数据:

为什么列出第10条数据,因此search_after需要第10条(检索出的最后一条)数据的sort字段的值作为输入:

GET /new_tag_202411/_search
{
  "size": 10,
  "sort": [
    {
      "doc_id": {
        "order": "asc"
      }
    }
  ],
  "search_after": [10] // 将第一个检索请求返回的sort字段的值放入此字段
}

这时,检索返回将从doc_id为11的数据开始返回:

 

之后再将本次返回的最后一条数据sort字段放入下一次的检索条件中,继续下一次的检索,从此周而复始,直到检索完全部数据,这个逻辑和scroll滚动查询替换scroll_id的道理是一样的。

注意,使用search_after进行查询时,from必须设置为0或者-1,否则会报错:

3、esbuilder自研dsl库支持search_after字段

我自己开发的esbuilder库之前没有支持search_after字段,因为之前不知道这个功能😄😄,库地址:

github.com/liupengh3c/esbuilder

search_after字段为一个数组,数组类型都为常规的整形、字符串,相对来说比较简单,因此在dsl结构体中直接增加该字段:

type dsl struct {
    QueryDsl    query    `json:"query"`
    Source      []string `json:"_source,omitempty"`
    Size        int64    `json:"size,omitempty"`
    From        int64    `json:"from,omitempty"`
    OrderItems  []query  `json:"sort,omitempty"`
    TrackTotal  bool     `json:"track_total_hits,omitempty"`
    SearchAfter []any    `json:"search_after,omitempty"`
}

之后实现reciver method,支持对该字段进行赋值:

func (dsl *dsl) SetSearchAfter(searchAfter []any) *dsl {
    dsl.SearchAfter = searchAfter
    return dsl
}

本小节的内容与search_after的使用关系不大,看不明白也没关系,可以跳过,如果想了解的话可以留言,看到后我一定会第一时间回复。

4、利用search_after全量检索的代码实现(golang)

第一步构造一个带有排序的检索请求,排序的字段最好是每个文档的值唯一:

第二步,设计死循环进行查询,同时获取每次检索结果最后一条sort字段的值赋值给search_after字段,直接检索出所有数据:

for {
	fmt.Println(dslQuery.BuildJson())
	search := esapi.SearchRequest{
		Index: []string{"new_tag_202411"},
		Body:  strings.NewReader(dslQuery.BuildJson()),
	}
	resp, err = search.Do(context.Background(), client)
	if err != nil {
		fmt.Println("search err:", err.Error())
		return
	}
	err = json.NewDecoder(resp.Body).Decode(&docs)
	if err != nil {
		fmt.Println("decode err:", err.Error())
		return
	}
	if len(docs.Hits.Hits) == 0 {
		fmt.Println("no more data")
		break
	}

	fmt.Println("检索数据数量:", len(docs.Hits.Hits), "doc_id:", docs.Hits.Hits[len(docs.Hits.Hits)-1].Source["doc_id"])
	dslQuery.SetSearchAfter(docs.Hits.Hits[len(docs.Hits.Hits)-1].Sort)
}

其中,最重要的一行代码:

dslQuery.SetSearchAfter(docs.Hits.Hits[len(docs.Hits.Hits)-1].Sort)

这行代码是在每轮次中更新search_after参数,否则无法实现滚动查询的效果,无法检索全量数据。

为了方便观察,我们将size设置为1000,每次检索1000条,增加打印dsl语句以及检索到数据量、doc_id值:

从上面的打印可以看到,search_after的值一直在更新,这样才能达到检索全量的目标,doc_id值的变化也可以从侧面看出整个全量检索的过程,目标达成啦~~~~~。

5、全部实例代码

github地址:

https://github.com/liupengh3c/career/blob/main/elastic/search_after/main.go

代码:

package main
import (
    "context"
    "fmt"
    "os"
    "strings"
    "time"
    "github.com/elastic/go-elasticsearch/v7/esapi"
    "github.com/elastic/go-elasticsearch/v8"
    jsoniter "github.com/json-iterator/go"
    "github.com/liupengh3c/esbuilder"
)
// 最外层数据结构
type Documents struct {
    Shards   Shards      `json:"_shards"`
    Hits     HitOutLayer `json:"hits"`
    TimedOut bool        `json:"timed_out"`
    Took     int         `json:"took"`
}
type Shards struct {
    Failed     int `json:"failed"`
    Skipped    int `json:"skipped"`
    Successful int `json:"successful"`
    Total      int `json:"total"`
}
type HitOutLayer struct {
    Hits     []Hits  `json:"hits"`
    MaxScore float64 `json:"max_score"`
    Total    Total   `json:"total"`
}
type Hits struct {
    ID     string         `json:"_id"`
    Index  string         `json:"_index"`
    Score  float64        `json:"_score"`
    Source map[string]any `json:"_source"`
    Type   string         `json:"_type"`
    Sort   []any          `json:"sort"`
}
type Total struct {
    Relation string `json:"relation"`
    Value    int    `json:"value"`
}
func main() {
    SearchFromSize()
}
func SearchFromSize() {
    st := time.Now()
    defer func() {
        fmt.Println("cost:", time.Since(st).Milliseconds(), "ms")
    }()
    var json = jsoniter.ConfigCompatibleWithStandardLibrary
    docs := Documents{}
    cert, _ := os.ReadFile("/Users/liupeng/Documents/study/elasticsearch-8.17.0/config/certs/http_ca.crt")
    client, err := elasticsearch.NewClient(elasticsearch.Config{
        Username:  "elastic",
        Password:  "xpE4DQGWE9bCkoj7WXYE",
        Addresses: []string{"https://127.0.0.1:9200"},
        CACert:    cert,
    })
    if err != nil {
        fmt.Println("create client err:", err.Error())
        return
    }
    dslQuery := esbuilder.NewDsl()
    boolQuery := esbuilder.NewBoolQuery()
    boolQuery.Filter(esbuilder.NewRangeQuery("doc_id").Gte(1))
    dslQuery.SetQuery(boolQuery)
    dslQuery.SetFrom(0)
    dslQuery.SetSize(1000)
    dslQuery.SetOrder(esbuilder.NewSortQuery("doc_id", "asc"))
    dsl := dslQuery.BuildJson()
    search := esapi.SearchRequest{
        Index: []string{"new_tag_202411"},
        Body:  strings.NewReader(dsl),
    }
    resp, err := search.Do(context.Background(), client)
    if err != nil {
        fmt.Println("search err:", err.Error())
        return
    }
    err = json.NewDecoder(resp.Body).Decode(&docs)
    if err != nil {
        fmt.Println("decode err:", err.Error())
        return
    }
    fmt.Println(docs.Hits.Hits[len(docs.Hits.Hits)-1].Sort)
    dslQuery.SetSearchAfter(docs.Hits.Hits[len(docs.Hits.Hits)-1].Sort)
    for {
        fmt.Println(dslQuery.BuildJson())
        search := esapi.SearchRequest{
            Index: []string{"new_tag_202411"},
            Body:  strings.NewReader(dslQuery.BuildJson()),
        }
        resp, err = search.Do(context.Background(), client)
        if err != nil {
            fmt.Println("search err:", err.Error())
            return
        }
        err = json.NewDecoder(resp.Body).Decode(&docs)
        if err != nil {
            fmt.Println("decode err:", err.Error())
            return
        }
        if len(docs.Hits.Hits) == 0 {
            fmt.Println("no more data")
            break
        }
        fmt.Println("检索数据数量:", len(docs.Hits.Hits), "doc_id:", docs.Hits.Hits[len(docs.Hits.Hits)-1].Source["doc_id"])
        dslQuery.SetSearchAfter(docs.Hits.Hits[len(docs.Hits.Hits)-1].Sort)
    }
}

 


http://www.kler.cn/a/465378.html

相关文章:

  • python.exe无法找到程序入口 无法定位程序输入点(Anaconda Prompt报错)
  • 《学校一卡通管理系统》数据库MySQL的设计与实现
  • 深入理解 pytest Fixture 方法及其应用
  • CSS——2.书写格式一
  • 期末速成C++【大题汇总完】
  • WPS表格技巧01-项目管理中的基本功能-计划和每日记录的对应
  • 【SpringBoot教程】IDEA快速搭建正确的SpringBoot版本和Java版本的项目
  • PCA降维算法详细推导
  • UE5材质节点Camera Vector/Reflection Vector
  • 2024-12-29-sklearn学习(26)模型选择与评估-交叉验证:评估估算器的表现 今夜偏知春气暖,虫声新透绿窗纱。
  • 【MySQL】通过 Binlog 恢复MySQL数据
  • 解决Docker国内网络问题
  • Yeelight易来与Control4达成战略合作,开启智能家居全球战略新篇
  • Servlet中配置和使用过滤器
  • 《Vue3实战教程》40:Vue3安全
  • k8s启动报错
  • 华为仓颉编程语言与医疗领域的深度融合:技术与实践
  • android studio android sdk下载地址
  • matlab 实现了一个基于阵列信号处理的空间角和极化参数估计系统
  • 【2024年-8月-29日-开源社区openEuler实践记录】A - Ops:智能运维新时代的开源利器
  • Linux centos7 docker安装 (yum快速安装)
  • 深入剖析 Java HashMap
  • 跟着逻辑先生学习FPGA-实战篇第二课 6-2 LED灯流水灯实验
  • 为什么最好吧css的link标签放在head之间?
  • java进阶:seata分布式事务未生效问题排查纪实|主事务回滚成功,分支事务未回滚
  • C# 设计模式(创建型模式):建造者模式