当前位置: 首页 > article >正文

折腾 Quickwit,Rust 编写的分布式搜索引擎 - 从不同的来源摄取数据

image

摄取 API

在这节教程中,我们将介绍如何使用 Ingest API 向 Quickwit 发送数据。

要跟随这节教程,您需要有一个本地的 Quickwit 实例正在运行。

  • https://quickwit.io/docs/get-started/installation

要启动它,请在终端中运行 ./quickwit run

创建索引

首先,我们创建一个无模式的索引。

# Create the index config file.
cat << EOF > stackoverflow-schemaless-config.yaml
version: 0.7
index_id: stackoverflow-schemaless
doc_mapping:
  mode: dynamic
indexing_settings:
  commit_timeout_secs: 30
EOF
# Use the CLI to create the index...
./quickwit index create --index-config stackoverflow-schemaless-config.yaml
# Or with cURL.
curl -XPOST -H 'Content-Type: application/yaml' 'http://localhost:7280/api/v1/indexes' --data-binary @stackoverflow-schemaless-config.yaml

摄取数据

让我们先下载 StackOverflow 数据集的一个样本。

  • https://www.kaggle.com/stackoverflow/stacksample
# Download the first 10_000 Stackoverflow posts articles.
curl -O https://quickwit-datasets-public.s3.amazonaws.com/stackoverflow.posts.transformed-10000.json

您可以使用命令行界面或 cURL 来发送数据。命令行界面对于发送几 GB 的数据更为方便,因为当 Ingest 队列已满时,Quickwit 可能会返回 429 响应。在这种情况下,Quickwit 命令行界面将自动重试发送。

# Ingest the first 10_000 Stackoverflow posts articles with the CLI...
./quickwit index ingest --index stackoverflow-schemaless --input-path stackoverflow.posts.transformed-10000.json --force

# OR with cURL.
curl -XPOST -H 'Content-Type: application/json' 'http://localhost:7280/api/v1/stackoverflow-schemaless/ingest?commit=force' --data-binary @stackoverflow.posts.transformed-10000.json

执行搜索查询

现在您可以对索引进行搜索了。

curl 'http://localhost:7280/api/v1/stackoverflow-schemaless/search?query=body:python'

清理源(可选)

curl -XDELETE 'http://localhost:7280/api/v1/indexes/stackoverflow-schemaless'

至此完成了教程。现在您可以继续阅读下一教程。

本地文件

在这节教程中,我们将介绍如何使用 Quickwit 命令行界面来索引本地文件。

要跟随这节教程,您需要有Quickwit 二进制文件。

  • https://quickwit.io/docs/main-branch/get-started/installation

创建索引

首先,我们创建一个无模式的索引。我们需要仅为了创建索引而启动 Quickwit 服务器,因此我们将启动它并在之后关闭它。

启动 Quickwit server。

./quickwit run

在另一个终端中创建索引。

# Create the index config file.
cat << EOF > stackoverflow-schemaless-config.yaml
version: 0.7
index_id: stackoverflow-schemaless
doc_mapping:
  mode: dynamic
indexing_settings:
  commit_timeout_secs: 30
EOF

./quickwit index create --index-config stackoverflow-schemaless-config.yaml

现在您可以通过在第一个终端中按下 Ctrl+C 来关闭服务器。

摄取文件

要发送文件,只需执行以下命令:

./quickwit tool local-ingest --index stackoverflow-schemaless --input-path stackoverflow.posts.transformed-10000.json

几秒钟后,您应该能看到以下输出:

❯ Ingesting documents locally...

---------------------------------------------------
 Connectivity checklist
 ✔ metastore
 ✔ storage
 ✔ _ingest-cli-source

 Num docs   10000 Parse errs     0 PublSplits   1 Input size     6MB Thrghput  3.34MB/s Time 00:00:02
 Num docs   10000 Parse errs     0 PublSplits   1 Input size     6MB Thrghput  2.23MB/s Time 00:00:03
 Num docs   10000 Parse errs     0 PublSplits   1 Input size     6MB Thrghput  1.67MB/s Time 00:00:04

Indexed 10,000 documents in 4s.
Now, you can query the index with the following command:
quickwit index search --index stackoverflow-schemaless --config ./config/quickwit.yaml --query "my query"
Clearing local cache directory...
✔ Local cache directory cleared.
✔ Documents successfully indexed.

支持像 s3://mybucket/mykey.json 这样的对象存储 URI 作为 --input-path,前提是您的环境配置了适当的权限。

清理源(可选)

就这样!现在您可以清理创建的源。您可以通过运行以下命令来完成:

./quickwit run

在另一个终端中:

./quickwit index delete --index-id stackoverflow-schemaless

至此完成了教程。现在您可以继续阅读下一教程。

Kafka

在这节教程中,我们将介绍如何在几分钟内设置 Quickwit 以从 Kafka 摄取数据。首先,我们将创建一个索引并配置 Kafka 源。然后,我们将创建一个 Kafka 主题并将一些事件从 GH Archive 加载到其中。最后,我们将执行一些搜索和聚合查询来探索新发送的数据。

  • https://www.gharchive.org/

前提条件

要完成这篇教程,您需要以下条件:

  • 正在运行的 Kafka 集群(参见 Kafka 快速入门)
    • https://kafka.apache.org/quickstart
  • 本地安装的 Quickwit 指南
    • https://quickwit.io/docs/main-branch/get-started/installation

创建索引

首先,我们创建一个新的索引。以下是与 GH Archive 事件模式对应的索引配置和文档映射:

#
# Index config file for gh-archive dataset.
#
version: 0.7

index_id: gh-archive

doc_mapping:
  field_mappings:
    - name: id
      type: text
      tokenizer: raw
    - name: type
      type: text
      fast: true
      tokenizer: raw
    - name: public
      type: bool
      fast: true
    - name: payload
      type: json
      tokenizer: default
    - name: org
      type: json
      tokenizer: default
    - name: repo
      type: json
      tokenizer: default
    - name: actor
      type: json
      tokenizer: default
    - name: other
      type: json
      tokenizer: default
    - name: created_at
      type: datetime
      fast: true
      input_formats:
        - rfc3339
      fast_precision: seconds
  timestamp_field: created_at

indexing_settings:
  commit_timeout_secs: 10

执行这些 Bash 命令来下载索引配置并创建 gh-archive 索引:

# Download GH Archive index config.
wget -O gh-archive.yaml https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/gh-archive/index-config.yaml

# Create index.
./quickwit index create --index-config gh-archive.yaml

创建并填充 Kafka topic

现在,我们创建一个 Kafka 主题并将一些事件加载到其中。

# Create a topic named `gh-archive` with 3 partitions.
bin/kafka-topics.sh --create --topic gh-archive --partitions 3 --bootstrap-server localhost:9092

# Download a few GH Archive files.
wget https://data.gharchive.org/2022-05-12-{10..15}.json.gz

# Load the events into Kafka topic.
gunzip -c 2022-05-12*.json.gz | \
bin/kafka-console-producer.sh --topic gh-archive --bootstrap-server localhost:9092

创建 Kafka 源

这篇教程假设 Kafka 集群在默认端口(9092)上本地可用。
如果情况并非如此,请相应地更新 bootstrap.servers 参数。

#
# Kafka source config file.
#
version: 0.8
source_id: kafka-source
source_type: kafka
num_pipelines: 2
params:
  topic: gh-archive
  client_params:
    bootstrap.servers: localhost:9092

运行这些命令来下载源配置文件并创建源。

# Download Kafka source config.
wget https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/gh-archive/kafka-source.yaml

# Create source.
./quickwit source create --index gh-archive --source-config kafka-source.yaml

如果您遇到以下错误:

Command failed: Topic `gh-archive` has no partitions.

这意味着 Kafka 主题 gh-archive 在前一步骤中未正确创建。

启动索引和搜索服务

最后,执行此命令以服务器模式启动 Quickwit。

# Launch Quickwit services.
./quickwit run

在幕后,这个命令会启动一个索引器和一个搜索器。启动时,索引器将连接到由源指定的 Kafka 主题,并开始从组成主题的分区流式处理和索引事件。使用默认的提交超时值(参见 索引设置),索引器应在大约 60 秒后发布第一个分片。

  • https://quickwit.io/docs/configuration/index-config#indexing-settings

您可以在另一个 shell 中运行此命令来检查索引的属性并查看当前发布的分片数量:

# Display some general information about the index.
./quickwit index describe --index gh-archive

一旦发布了第一个分片,您就可以开始运行搜索查询。例如,我们可以找到所有关于 Kubernetes 仓库的事件:

  • https://github.com/kubernetes/kubernetes
curl 'http://localhost:7280/api/v1/gh-archive/search?query=org.login:kubernetes%20AND%20repo.name:kubernetes'

也可以通过 Quickwit 用户界面 访问这些结果。

  • http://localhost:7280/ui/search?query=org.login%3Akubernetes+AND+repo.name%3Akubernetes&index_id=gh-archive&max_hits=10

我们还可以按类型对这些事件进行分组并计数它们:

curl -XPOST -H 'Content-Type: application/json' 'http://localhost:7280/api/v1/gh-archive/search' -d '
{
  "query":"org.login:kubernetes AND repo.name:kubernetes",
  "max_hits":0,
  "aggs":{
    "count_by_event_type":{
      "terms":{
        "field":"type"
      }
    }
  }
}'

安全的 Kafka 连接(可选)

Quickwit 的 Kafka 源支持 SSL 和 SASL 身份验证。这对于从外部 Kafka 服务消费数据特别有用。

证书和密钥文件必须存在于所有 Quickwit 节点上,以便创建 Kafka 源并成功运行索引管道。

SSL 配置
version: 0.8
source_id: kafka-source-ssl
source_type: kafka
num_pipelines: 2
params:
  topic: gh-archive
  client_params:
    bootstrap.servers: your-kafka-broker.com
    security.protocol: SSL
    ssl.ca.location: /path/to/ca.pem
    ssl.certificate.location: /path/to/service.cert
    ssl.key.location: /path/to/service.key
SASL 配置
version: 0.8
source_id: kafka-source-sasl
source_type: kafka
num_pipelines: 2
params:
  topic: gh-archive
  client_params:
    bootstrap.servers: your-kafka-broker.com
    ssl.ca.location: /path/to/ca.pem
    security.protocol: SASL_SSL
    sasl.mechanisms: SCRAM-SHA-256
    sasl.username: your_sasl_username
    sasl.password: your_sasl_password
如果您遇到以下错误:

Client creation error: ssl.ca.location failed: error:05880002:x509 certificate routines::system lib

通常意味着 CA 证书的路径不正确。请相应地更新 ssl.ca.location 参数。

清理源(可选)

让我们删除为这篇教程创建的文件和源。

# Delete Kafka topic.
bin/kafka-topics.sh --delete --topic gh-archive --bootstrap-server localhost:9092

# Delete index.
./quickwit index delete --index gh-archive

# Delete source config.
rm kafka-source.yaml

至此完成了教程。如果您有关于 Quickwit 的任何问题或遇到任何问题,请不要犹豫,在 GitHub 上提出 问题 或打开 问题报告,或者直接在 Discord 上联系我们。

  • https://github.com/quickwit-oss/quickwit
  • https://github.com/quickwit-oss/quickwit/discussions
  • https://github.com/quickwit-oss/quickwit/issues
  • https://discord.com/invite/MT27AG5EVE

Pulsar

在这节教程中,我们将介绍如何在几分钟内设置 Quickwit 以从 Pulsar 摄取数据。首先,我们将创建一个索引并配置 Pulsar 源。然后,我们将创建一个 Pulsar topic 并将一些事件从 Stack Overflow 数据集 加载到其中。最后,我们将执行一些搜索。

  • https://www.kaggle.com/stackoverflow/stacksample

前提条件

要完成这篇教程,您需要以下条件:

  • 本地运行的 Quickwit 实例
    • https://quickwit.io/docs/main-branch/get-started/installation
  • 本地运行的 Pulsar 实例
    • https://pulsar.apache.org/docs/next/getting-started-standalone/
Quickwit 设置

下载 Quickwit 并启动一个服务器。然后打开一个新的终端,使用同一个二进制文件执行 CLI 命令。

  • https://quickwit.io/docs/main-branch/get-started/installation
./quickwit run

测试集群是否正在运行:

./quickwit index list
Pulsar 设置

Local

wget https://archive.apache.org/dist/pulsar/pulsar-2.11.0/apache-pulsar-2.11.0-bin.tar.gz
tar xvfz apache-pulsar-2.11.0-bin.tar.gz
cd apache-pulsar-2.11.0
bin/pulsar standalone

Docker

docker run -it -p 6650:6650 -p 8080:8080 apachepulsar/pulsar:2.11.0 bin/pulsar standalone

请参阅 官方文档 的详细信息。

  • https://pulsar.apache.org/docs/next/getting-started-docker/

准备 Quickwit

首先,我们创建一个新的索引。以下是与 Stack Overflow 帖子模式对应的索引配置和文档映射:

#
# Index config file for Stack Overflow dataset.
#
version: 0.7

index_id: stackoverflow

doc_mapping:
  field_mappings:
    - name: user
      type: text
      fast: true
      tokenizer: raw
    - name: tags
      type: array<text>
      fast: true
      tokenizer: raw
    - name: type
      type: text
      fast: true
      tokenizer: raw
    - name: title
      type: text
      tokenizer: default
      record: position
      stored: true
    - name: body
      type: text
      tokenizer: default
      record: position
      stored: true
    - name: questionId
      type: u64
    - name: answerId
      type: u64
    - name: acceptedAnswerId
      type: u64
    - name: creationDate
      type: datetime
      fast: true
      input_formats:
        - rfc3339
      fast_precision: seconds
  timestamp_field: creationDate

search_settings:
  default_search_fields: [title, body]

indexing_settings:
  commit_timeout_secs: 10

执行这些 Bash 命令来下载索引配置并创建 stackoverflow 索引。

# Download stackoverflow index config.
wget -O stackoverflow.yaml https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/stackoverflow/index-config.yaml

# Create index.
./quickwit index create --index-config stackoverflow.yaml

创建 Pulsar 源

Pulsar 源只需要定义主题列表和实例地址。

#
# Pulsar source config file.
#
version: 0.7
source_id: pulsar-source
source_type: pulsar
params:
  topics:
    - stackoverflow
  address: pulsar://localhost:6650

运行这些命令来下载源配置文件并创建源。

# Download Pulsar source config.
wget -O stackoverflow-pulsar-source.yaml https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/stackoverflow/pulsar-source.yaml

# Create source.
./quickwit source create --index stackoverflow --source-config stackoverflow-pulsar-source.yaml

一旦创建了 Pulsar 源,Quickwit 控制平面将请求索引器启动一个新的索引管道。您可以在索引器上看到类似下面的日志:

INFO spawn_pipeline{index=stackoverflow gen=0}:pulsar-consumer{subscription_name="quickwit-stackoverflow-pulsar-source" params=PulsarSourceParams { topics: ["stackoverflow"], address: "pulsar://localhost:6650", consumer_name: "quickwit", authentication: None } current_positions={}}: quickwit_indexing::source::pulsar_source: Seeking to last checkpoint positions. positions={}

创建并填充 Pulsar topic

我们将使用 Pulsar 的默认租户/命名空间 public/default。为了填充主题,我们将使用一个 Python 脚本:

import json
import pulsar

client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('public/default/stackoverflow')

with open('stackoverflow.posts.transformed-10000.json', encoding='utf8') as file:
   for i, line in enumerate(file):
       producer.send(line.encode('utf-8'))
       if i % 100 == 0:
           print(f"{i}/10000 messages sent.", i)

client.close()

安装本地 Python 客户端,更多详情请参阅 文档页面:

  • https://pulsar.apache.org/docs/2.11.x/client-libraries-python/
# Download the first 10_000 Stackoverflow posts articles.
curl -O https://quickwit-datasets-public.s3.amazonaws.com/stackoverflow.posts.transformed-10000.json

# Install pulsar python client.
# Requires a python version < 3.11
pip3 install 'pulsar-client==2.10.1'
wget https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/stackoverflow/send_messages_to_pulsar.py
python3 send_messages_to_pulsar.py

开始搜索!

您可以运行此命令来检查索引的属性并查看当前发布的分片和文档数量:

# Display some general information about the index.
./quickwit index describe --index stackoverflow

您将特别注意到发布的文档数量。

现在您可以执行一些查询了。

curl 'http://localhost:7280/api/v1/stackoverflow/search?query=search+AND+engine'

如果您的 Quickwit 服务器是本地的,您可以通过 Quickwit UI 访问结果,网址为 localhost:7280。

  • http://localhost:7280/ui/search?query=&index_id=stackoverflow&max_hits=10

清理源(可选)

让我们删除为这篇教程创建的文件和源。

# Delete quickwit index.
./quickwit index delete --index stackoverflow --yes
# Delete Pulsar topic.
bin/pulsar-admin topics delete stackoverflow

至此完成了教程。如果您有关于 Quickwit 的任何问题或遇到任何问题,请不要犹豫,在 GitHub 上提出 问题 或打开 问题报告,或者直接在 Discord 上联系我们。

  • https://github.com/quickwit-oss/quickwit
  • https://github.com/quickwit-oss/quickwit/discussions
  • https://github.com/quickwit-oss/quickwit/issues
  • https://discord.com/invite/MT27AG5EVE

Kinesis

在这节教程中,我们将介绍如何在几分钟内设置 Quickwit 以从 Kinesis 摄取数据。首先,我们将创建一个索引并配置 Kinesis 源。然后,我们将创建一个 Kinesis 流并将一些事件从 GH Archive 加载到其中。最后,我们将执行一些搜索和聚合查询来探索新发送的数据。

  • https://www.gharchive.org/

在这篇教程中使用 Amazon Kinesis 服务会产生一些费用。

前提条件

要完成这篇教程,您需要以下条件:

  • AWS CLI 版本 2(参见 开始使用 AWS CLI 了解先决条件和安装)
    • https://docs.aws.amazon.com/cli/latest/userguide/getting-started-prereqs.html
  • 本地安装的 Quickwit 指南
    • https://quickwit.io/docs/main-branch/get-started/installation
  • jq
    • https://stedolan.github.io/jq/download/
  • GNU parallel
    • https://www.gnu.org/software/parallel/

jq 用于重塑事件成为可通过 Amazon Kinesis API 发送的记录。

创建索引

首先,我们创建一个新的索引。以下是与 GH Archive 事件模式对应的索引配置和文档映射:

#
# Index config file for gh-archive dataset.
#
version: 0.7

index_id: gh-archive

doc_mapping:
  field_mappings:
    - name: id
      type: text
      tokenizer: raw
    - name: type
      type: text
      fast: true
      tokenizer: raw
    - name: public
      type: bool
      fast: true
    - name: payload
      type: json
      tokenizer: default
    - name: org
      type: json
      tokenizer: default
    - name: repo
      type: json
      tokenizer: default
    - name: actor
      type: json
      tokenizer: default
    - name: other
      type: json
      tokenizer: default
    - name: created_at
      type: datetime
      fast: true
      input_formats:
        - rfc3339
      fast_precision: seconds
  timestamp_field: created_at

indexing_settings:
  commit_timeout_secs: 10

执行这些 Bash 命令来下载索引配置并创建 gh-archive 索引。

# Download GH Archive index config.
wget -O gh-archive.yaml https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/gh-archive/index-config.yaml

# Create index.
./quickwit index create --index-config gh-archive.yaml

创建并填充 Kinesis 流

现在,我们创建一个 Kinesis 流并将一些事件加载到其中。

这一步可能相当慢,具体取决于可用带宽。当前命令通过仅取从 GH Archive 下载的每个文件的前 10,000 行来限制要发送的数据量。如果您有足够的带宽,可以移除它来发送整套文件。您也可以通过增加分片的数量和/或 parallel 启动的任务数量 (-j 选项) 来加快速度。

# Create a stream named `gh-archive` with 3 shards.
aws kinesis create-stream --stream-name gh-archive --shard-count 8

# Download a few GH Archive files.
wget https://data.gharchive.org/2022-05-12-{10..12}.json.gz

# Load the events into Kinesis stream
gunzip -c 2022-05-12*.json.gz | \
head -n 10000 | \
parallel --gnu -j8 -N 500 --pipe \
'jq --slurp -c "{\"Records\": [.[] | {\"Data\": (. | tostring), \"PartitionKey\": .id }], \"StreamName\": \"gh-archive\"}" > records-{%}.json && \
aws kinesis put-records --cli-input-json file://records-{%}.json --cli-binary-format raw-in-base64-out >> out.log'

创建 Kinesis 源

#
# Kinesis source config file.
#
version: 0.7
source_id: kinesis-source
source_type: kinesis
params:
  stream_name: gh-archive

运行这些命令来下载源配置文件并创建源。

# Download Kinesis source config.
wget https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/gh-archive/kinesis-source.yaml

# Create source.
./quickwit source create --index gh-archive --source-config kinesis-source.yaml

如果此命令出现以下错误消息而失败:

Command failed: Stream gh-archive under account XXXXXXXXX not found.

Caused by:
    0: Stream gh-archive under account XXXXXXXX not found.
    1: Stream gh-archive under account XXXXXXXX not found.

这意味着 Kinesis 流在前一步骤中未正确创建。

启动索引和搜索服务

最后,执行此命令以服务器模式启动 Quickwit。

# Launch Quickwit services.
./quickwit run

在幕后,这个命令会启动一个索引器和一个搜索器。启动时,索引器将连接到由源指定的 Kinesis 流,并开始从组成流的分片流式处理和索引事件。使用默认的提交超时值(参见 索引设置),索引器应在大约 60 秒后发布第一个分片。

  • https://quickwit.io/docs/configuration/index-config#indexing-settings

您可以在另一个 shell 中运行此命令来检查索引的属性并查看当前发布的分片数量:

# Display some general information about the index.
./quickwit index describe --index gh-archive

也可以通过 Quickwit 用户界面 获取索引信息。

  • http://localhost:7280/ui/indexes/gh-archive

一旦发布了第一个分片,您就可以开始运行搜索查询。例如,我们可以找到所有关于 Kubernetes 仓库的事件:

  • https://github.com/kubernetes/kubernetes
curl 'http://localhost:7280/api/v1/gh-archive/search?query=org.login:kubernetes%20AND%20repo.name:kubernetes'

也可以通过 用户界面 访问这些结果。

  • http://localhost:7280/ui/search?query=org.login%3Akubernetes+AND+repo.name%3Akubernetes&index_id=gh-archive&max_hits=10

我们还可以按类型对这些事件进行分组并计数它们:

curl -XPOST -H 'Content-Type: application/json' 'http://localhost:7280/api/v1/gh-archive/search' -d '
{
  "query":"org.login:kubernetes AND repo.name:kubernetes",
  "max_hits":0,
  "aggs":{
    "count_by_event_type":{
      "terms":{
        "field":"type"
      }
    }
  }
}'

清理源(可选)

让我们删除为这篇教程创建的文件和源。

# Delete Kinesis stream.
aws kinesis delete-stream --stream-name gh-archive

# Delete index.
./quickwit index delete --index gh-archive

# Delete source config.
rm kinesis-source.yaml

至此完成了教程。如果您有关于 Quickwit 的任何问题或遇到任何问题,请不要犹豫,在 GitHub 上提出 问题 或打开 问题报告,或者直接在 Discord 上联系我们。

  • https://github.com/quickwit-oss/quickwit
  • https://github.com/quickwit-oss/quickwit/discussions
  • https://github.com/quickwit-oss/quickwit/issues
  • https://discord.com/invite/MT27AG5EVE

具有 SQS 通知的 S3

在这篇教程中,我们介绍如何设置 Quickwit 以从 S3 摄取数据,其中桶通知事件通过 SQS 流式传输。我们首先使用 Terraform 创建 AWS 源(S3 桶、SQS 队列、通知)。然后配置 Quickwit 索引和文件源。最后,我们将一些数据发送到源桶并验证其是否被正确索引。

AWS 源

完整的 Terraform 脚本可以从 这里 下载。

  • https://quickwit.io/docs/assets/sqs-file-source.tf

首先,创建接收源数据文件(NDJSON 格式)的桶:

resource "aws_s3_bucket" "file_source" {
  bucket_prefix = "qw-tuto-source-bucket"
}

然后设置 SQS 队列,当文件添加到桶时,队列将承载通知。队列配置了一个策略,允许源桶向其写入 S3 通知消息。同时创建一个死信队列 (DLQ),用于接收文件源无法处理的消息(例如损坏的文件)。消息在经过 5 次索引尝试后会被移动到 DLQ。

locals {
  sqs_notification_queue_name = "qw-tuto-s3-event-notifications"
}

data "aws_iam_policy_document" "sqs_notification" {
  statement {
    effect = "Allow"

    principals {
      type        = "*"
      identifiers = ["*"]
    }

    actions   = ["sqs:SendMessage"]
    resources = ["arn:aws:sqs:*:*:${local.sqs_notification_queue_name}"]

    condition {
      test     = "ArnEquals"
      variable = "aws:SourceArn"
      values   = [aws_s3_bucket.file_source.arn]
    }
  }
}

resource "aws_sqs_queue" "s3_events_deadletter" {
  name = "${locals.sqs_notification_queue_name}-deadletter"
}

resource "aws_sqs_queue" "s3_events" {
  name   = local.sqs_notification_queue_name
  policy = data.aws_iam_policy_document.sqs_notification.json

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.s3_events_deadletter.arn
    maxReceiveCount     = 5
  })
}

resource "aws_sqs_queue_redrive_allow_policy" "s3_events_deadletter" {
  queue_url = aws_sqs_queue.s3_events_deadletter.id

  redrive_allow_policy = jsonencode({
    redrivePermission = "byQueue",
    sourceQueueArns   = [aws_sqs_queue.s3_events.arn]
  })
}

配置桶通知,每当源桶中创建新文件时,都会向 SQS 写入消息:

resource "aws_s3_bucket_notification" "bucket_notification" {
  bucket = aws_s3_bucket.file_source.id

  queue {
    queue_arn = aws_sqs_queue.s3_events.arn
    events    = ["s3:ObjectCreated:*"]
  }
}

只支持 s3:ObjectCreated:* 类型的事件。
其他类型(例如 ObjectRemoved)会被确认,并且会记录警告日志。

源需要能够访问通知队列和源桶。以下策略文档包含了源所需的最小权限:

data "aws_iam_policy_document" "quickwit_node" {
  statement {
    effect = "Allow"
    actions = [
      "sqs:ReceiveMessage",
      "sqs:DeleteMessage",
      "sqs:ChangeMessageVisibility",
      "sqs:GetQueueAttributes",
    ]
    resources = [aws_sqs_queue.s3_events.arn]
  }
  statement {
    effect    = "Allow"
    actions   = ["s3:GetObject"]
    resources = ["${aws_s3_bucket.file_source.arn}/*"]
  }
}

创建 IAM 用户和凭证,以便将其与本地 Quickwit 实例关联:

resource "aws_iam_user" "quickwit_node" {
  name = "quickwit-filesource-tutorial"
  path = "/system/"
}

resource "aws_iam_user_policy" "quickwit_node" {
  name   = "quickwit-filesource-tutorial"
  user   = aws_iam_user.quickwit_node.name
  policy = data.aws_iam_policy_document.quickwit_node.json
}

resource "aws_iam_access_key" "quickwit_node" {
  user = aws_iam_user.quickwit_node.name
}

我们不建议在生产环境中使用 IAM 用户凭证运行 Quickwit 节点。
这只是为了简化教程设置。在 EC2/ECS 上运行时,应将策略文档附加到 IAM 角色上。

下载 完整的 Terraform 脚本,并使用 terraform initterraform apply 部署它。成功执行后,将列出配置 Quickwit 所需的输出。您可以使用以下命令显示敏感输出(密钥 ID 和密钥)的值:

terraform output quickwit_node_access_key_id
terraform output quickwit_node_secret_access_key

运行 Quickwit

本地安装 Quickwit,然后在安装目录中,使用必要的访问权限运行 Quickwit,将 <quickwit_node_access_key_id><quickwit_node_secret_access_key> 替换为匹配的 Terraform 输出值:

  • https://quickwit.io/docs/get-started/installation
AWS_ACCESS_KEY_ID=<quickwit_node_access_key_id> \
AWS_SECRET_ACCESS_KEY=<quickwit_node_secret_access_key> \
AWS_REGION=us-east-1 \
./quickwit run

配置索引和源

在另一个终端中,在 Quickwit 安装目录中,创建一个索引:

cat << EOF > tutorial-sqs-file-index.yaml
version: 0.7
index_id: tutorial-sqs-file
doc_mapping:
  mode: dynamic
indexing_settings:
  commit_timeout_secs: 30
EOF

./quickwit index create --index-config tutorial-sqs-file-index.yaml

<notification_queue_url> 替换为相应的 Terraform 输出值,为该索引创建一个文件源:

cat << EOF > tutorial-sqs-file-source.yaml
version: 0.8
source_id: sqs-filesource
source_type: file
num_pipelines: 2
params:
  notifications:
    - type: sqs
      queue_url: <notification_queue_url>
      message_type: s3_notification
EOF

./quickwit source create --index tutorial-sqs-file --source-config tutorial-sqs-file-source.yaml

num_pipeline 配置控制了多少个消费者将并行地从队列中轮询。根据您想要为此源分配的索引器计算资源选择数字。一般而言,每 2 个核心配置 1 个管道。

摄取数据

我们现在可以通过将文件上传到 S3 来向 Quickwit 发送数据。如果您已安装 AWS CLI,运行以下命令,将 <source_bucket_name> 替换为关联的 Terraform 输出:

curl https://quickwit-datasets-public.s3.amazonaws.com/hdfs-logs-multitenants-10000.json | \
    aws s3 cp - s3://<source_bucket_name>/hdfs-logs-multitenants-10000.json

如果您不想使用 AWS CLI,您也可以下载文件并通过 AWS 控制台手动将其上传到源桶。

等待大约 1 分钟,数据应该会出现在索引中:

./quickwit index describe --index tutorial-sqs-file

清理源

这节教程中实例化的 AWS 源不会产生固定成本,但我们仍然建议您完成后删除它们。在包含 Terraform 脚本的目录中,运行 terraform destroy

更多

1. Binance 如何使用 Quickwit 构建 100PB 日志服务(Quickwit 博客)


http://www.kler.cn/a/286788.html

相关文章:

  • 迅为RK3568开发板篇OpenHarmony实操HDF驱动控制LED-添加内核编译
  • cookie 与 session -- 会话管理
  • 高并发压力测试
  • 鸿蒙仓颉环境配置(仓颉SDK下载,仓颉VsCode开发环境配置,仓颉DevEco开发环境配置)
  • ChatGPT 摘要,以 ESS 作为你的私有数据存储
  • 下载ovkml文件的数据
  • Django+Vue农产品销售系统的设计与实现
  • 理解大模型中的Cursor技术:优化长文本推理的前沿方案
  • 微服务集成 Seata
  • 【 html+css 绚丽Loading 】000030 灵文闪烁符
  • 【Selenium】UI自动化实践——输入验证码登录
  • Mysql基础练习题 1084.销售分析3 (力扣)
  • 数据结构--初步了解(抽象分级)
  • 【专题】2024年中国AI人工智能基础数据服务研究报告合集PDF分享(附原数据表)
  • 架构设计(13)安全架构设计理论
  • QT +ffmpeg-4.2.2-win64-shared 拉取 RTMP/http-flv 流播放
  • 模型 冯/诺依曼思维模型
  • 实习的一点回顾单元测试
  • 网络爬虫调研报告
  • Force Yc 第九引导公告页HTML源码
  • Codeforces Round 969 (Div. 2)
  • ffplay源码分析(五)包缓存队列和帧缓存队列
  • 【微服务】springboot 自定义注解+反射+aop实现动态修改请求参数
  • WebAssembly技术实践
  • 通义说【线性代数】什么是线性
  • Git 基础使用--权限管理--用户和用户组授权