当前位置: 首页 > article >正文

如何使用 DataX 连接 Easysearch

DataX

DataX 是阿里开源的一款离线数据同步工具,致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。

本篇主要介绍 DataX 如何将数据写入到 Easysearch,对于各种数据源的连接不会做深入的探讨,感兴趣的小伙伴可以访问 DataX 的 Github 仓库查看详情。

下载与安装

DataX 无需安装,下载后解压即可使用。

系统需求:

  1. JDK 1.8 及以上
  2. Python2 或 3

创建任务配置文件

每个数据同步的操作可称为一个任务,任务的配置文件定义了数据源(reader)、数据目的(writer) ,以及任务的设置信息,如并发数、速度控制等。DataX 集成了如此多的数据源,如果靠纯手工编写任务配置显然不现实。官方也出了个命令可以根据指定的数据源和数据目的帮助大家生成任务配置。

python datax.py -r {YOUR_READER} -w {YOUR_WRITER}

测试配置文件

此次演示使用 streamreader 和 elasticsearchwriter 作为数据源和数据目的,任务配置如下:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "sliceRecordCount": 10000,
            "column": [
              {
                "type": "long",
                "value": "10"
              },
              {
                "type": "string",
                "value": "hello,你好,世界-DataX"
              },
              {
                "type": "string",
                "value": "hello,你好,Easysearch"
              }
            ]
          }
        },
        "writer": {
          "name": "elasticsearchwriter",
          "parameter": {
            "endpoint": "http://localhost:9200",
            "accessId": "admin",
            "accessKey": "1ef0c661d8562aaa06be",
            "index": "yf-test",
            "column": [
              { "name": "no", "type": "long" },
              { "name": "content", "type": "keyword" },
              { "name": "content2", "type": "keyword" }
            ]
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 50
      }
    }
  }
}

streamreader 是一个从内存读取数据的插件, 它主要用来快速生成期望的数据并对写入插件进行测试。

我们用 streamreader 构造了 10000 个文档,文档含三个字段,任务启动了 50 个 channel 进行数据发送,结果就是共计发送 50w 个文档。

elasticssearchwriter 指定了 Easysearch 的连接信息:

  • endpoint: Easysearch 的地址和端口
  • accessId: 用户名
  • accessKey: 密码
  • index: 写入索引名
  • column: 对 reader 发来数据的 schema 定义
  • batchsize: 默认 1000

这次我们 Easysearch 开启的 http 服务,因为 DataX 的 elasticsearchwriter 无法跳过证书验证。对于必须使用 https 的场景,可使用 INFINI Gateway 代理 ES 服务,提供 http 通道给离线数据同步专用。

⚠️ 注意:

不同的 reader、writer 对 sliceRecordCount 和 channel 会有不同的行为。

Easysearch

本次测试使用的 Easysearch 版本是 1.9.0,需要注意是 Easysearch 要开启兼容性参数:

elasticsearch.api_compatibility: true

否则创建索引报错退出。(实际索引创建成功了但是 mapping 信息是空的)

运行任务

编辑好任务配置文件后,下一步就是执行任务。

python3 datax.py yf-test.json

写入数据时索引不存在,Datax 根据 schema 定义创建了索引。

OK 任务执行完毕,写入 50w 个文档耗时 10 秒。


http://www.kler.cn/a/541545.html

相关文章:

  • SpringBoot 统一功能处理之拦截器、数据返回格式、异常处理
  • 深度学习之神经网络框架搭建及模型优化
  • 机器学习怎么学习,还有算法基本的源代码
  • iOS AES/CBC/CTR加解密以及AES-CMAC
  • 【前端开发】HTML+CSS+JavaScript前端三剑客的基础知识体系了解
  • 【C语言标准库函数】标准输入输出函数详解[4]:二进制文件读写函数
  • 鸿蒙NEXT开发-鸿蒙三方库
  • html文件怎么转换成pdf文件,2025最新教程
  • electron.vite 项目创建以及better-sqlite3数据库使用
  • 基于SpringBoot的公益社团管理系统
  • Windows逆向工程入门之汇编数据存储\宽度,内存地址及边界,数据截断处理
  • 003 Linux驱动开发——第一个简单开发实验
  • python动物识别深度学习分析系统
  • 2.1 JUnit 5 测试发现机制详解
  • Dify 框架连接 PGSQL 数据库与 Sandbox 环境下的 Linux 系统调用权限问题
  • 什么是动态路由和嵌套路由?
  • Unity快速入门2 - 3D渲染
  • 【Python深入浅出】Python3邂逅MySQL:开启数据交互之旅
  • Python+wxauto:实现电脑端微信程序自动化
  • JDBC数据库连接池及相关练习(学习自用)
  • 云原生周刊:DeepSeek 颠覆人工智能
  • 基于springboot+vue的校园招聘网站的设计与实现
  • 《手札·数转篇》中小制造企业的信息化升级:MES系统的价值与应用
  • mysql 不是内部或外部命令,也不是可运行的程序或批处理文件
  • LeetCode-169多数元素
  • html+canvas地图画布实现快速拖动出现瓦片空白问题优化