
flink sink doris

Continuing from the previous post: a complete walkthrough of Flink from coding to deployment.
There are plenty of Flink-sink-to-Doris examples online, but most are incomplete, so this post spells out the details and provides complete code.

Flink/Doris connector version compatibility table
(screenshot omitted: version compatibility table; see the official docs linked below)

1. Add the dependency

<!--doris cdc-->
        <!-- See the version compatibility table at "https://doris.apache.org/zh-CN/docs/ecosystem/flink-doris-connector".
        Download the matching jar from "https://repo.maven.apache.org/maven2/org/apache/doris/" -->
        <!--mvn install:install-file -Dfile=D:/maven/flink-doris-connector-1.14_2.11-1.1.1.jar -DgroupId=com.flink.doris -DartifactId=flink-doris-connector-1.14_2.11 -Dversion=1.1.1 -Dpackaging=jar-->
        <dependency>
            <groupId>com.flink.doris</groupId>
            <artifactId>flink-doris-connector-1.14_2.11</artifactId>
            <version>1.1.1</version>
        </dependency>

2. Execution environment utility class

The EnvUtil implementation:

package com.zl.utils;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

/**
 * EnvUtil: builds a preconfigured Flink StreamExecutionEnvironment.
 */
public class EnvUtil {
    /**
     * Configure the Flink execution environment.
     * @param parallelism job parallelism (values <= 0 fall back to 1)
     */
    public static StreamExecutionEnvironment setFlinkEnv(int parallelism) {
        // System.setProperty("HADOOP_USER_NAME", "<user>") maps to the HDFS path /user/<user>; here we use root
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        if (parallelism > 0) {
            // set the requested parallelism
            env.setParallelism(parallelism);
        } else {
            env.setParallelism(1);// default 1
        }

        // Restart strategy (optional)
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, Time.minutes(6)));
        // Without checkpointing, the job "runs without errors but never writes data to Doris".
        // Enable checkpointing every 60 * 1000 ms = 1 minute, exactly-once mode (the default)
        env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
        // RocksDB state backend with incremental checkpoints enabled
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        // Checkpoint configuration (the storage path is set per job)
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        // Only one concurrent checkpoint (the default)
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        // Minimum pause between checkpoints: 60 * 1000 ms = 1 minute
        checkpointConfig.setMinPauseBetweenCheckpoints(60000);
        // Retain externalized checkpoints after the job is cancelled
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Number of checkpoint failures to tolerate
        checkpointConfig.setTolerableCheckpointFailureNumber(5);
        // Checkpoint timeout (the default is 10 minutes)
        checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
        // Disable operator chaining (makes backpressure easier to diagnose)
        env.disableOperatorChaining();
        return env;
    }
}

3. CDC implementation

The related SQL scripts live under "resources/doris" in the full code linked at the end.

3.1 Doris database setup script

CREATE DATABASE IF NOT EXISTS `flink_test`;
USE `flink_test`;

DROP TABLE IF EXISTS `rv_table`;
CREATE TABLE `rv_table` (`dt` date NULL COMMENT '分区时间',
                         `uuid` varchar(30) NULL COMMENT '',
                         `report_time` bigint(20) NULL COMMENT '过车时间'
) ENGINE=OLAP
DUPLICATE KEY(`dt`, `uuid`)
COMMENT 'RV 数据'
PARTITION BY RANGE(`dt`)
()
DISTRIBUTED BY HASH(`uuid`) BUCKETS 10
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"is_being_synced" = "false",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-2147483648",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p_rv_table",
"dynamic_partition.replication_allocation" = "tag.location.default: 1",
"dynamic_partition.buckets" = "32",
"dynamic_partition.create_history_partition" = "false",
"dynamic_partition.history_partition_num" = "-1",
"dynamic_partition.hot_partition_num" = "0",
"dynamic_partition.reserved_history_periods" = "NULL",
"dynamic_partition.storage_policy" = "",
"storage_format" = "V2",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false"
);
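With these properties, Doris creates DAY-granularity partitions named with the configured prefix, and since `create_history_partition` is false it only keeps partitions from today through today + 3 days; rows whose `dt` falls outside that window are rejected. A minimal sketch of both rules (the helper names are mine, not a Doris API; the naming pattern `prefix + yyyyMMdd` follows the Doris dynamic partition docs for DAY granularity):

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class PartitionNameSketch {
    // Doris names DAY-granularity dynamic partitions as prefix + yyyyMMdd
    static String partitionName(String prefix, LocalDate day) {
        return prefix + day.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
    }

    // With dynamic_partition.end = 3 and create_history_partition = false,
    // only partitions for [today, today + 3] exist, so only those dates are writable.
    static boolean inWindow(LocalDate dt, LocalDate today) {
        return !dt.isBefore(today) && !dt.isAfter(today.plusDays(3));
    }

    public static void main(String[] args) {
        LocalDate today = LocalDate.of(2024, 12, 20);
        System.out.println(partitionName("p_rv_table", today)); // p_rv_table20241220
        System.out.println(inWindow(LocalDate.of(2024, 12, 10), today)); // false: too old, write fails
    }
}
```

This is exactly the trap behind problems 1 and 4 in the troubleshooting section below: a `dt` outside the window has no partition to land in.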

3.2 MySQL database setup script

-- NOTE: the database name must match databaseList(...)/tableList(...) in the CDC job below.
CREATE DATABASE IF NOT EXISTS `flink_test`;
USE `flink_test`;

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for rv_table
-- ----------------------------
DROP TABLE IF EXISTS `rv_table`;
CREATE TABLE `rv_table` (
  `dt` varchar(10) NOT NULL,
  `uuid` varchar(30) DEFAULT NULL,
  `report_time` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`dt`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
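To drive the CDC pipeline, a test row matching this schema can be inserted on the MySQL side (the values are illustrative; `dt` must be a current date so the corresponding Doris dynamic partition exists):

```sql
INSERT INTO `rv_table` (`dt`, `uuid`, `report_time`)
VALUES ('2024-12-20', 'cdc-1', 1733881971621);
```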

Note: `dt` is VARCHAR here. An initial attempt with a DATE column ran without errors, but "2024-12-20" arrived in Doris as "2020-07-09"; since that date falls outside the dynamic partition window, the write failed. The root cause is still being investigated.
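One plausible explanation (not verified against this exact setup): Debezium's default temporal handling serializes a MySQL DATE column as an integer count of days since 1970-01-01 (`io.debezium.time.Date`), not as a "yyyy-MM-dd" string, so the sink sees a number like 20077 instead of "2024-12-20", and any downstream component that parses that number as something else will mangle the date. A quick check of the epoch-day encoding:

```java
import java.time.LocalDate;

public class EpochDayCheck {
    // Debezium's default temporal mode encodes a MySQL DATE as days since 1970-01-01
    static long toEpochDays(LocalDate d) {
        return d.toEpochDay();
    }

    static LocalDate fromEpochDays(long days) {
        return LocalDate.ofEpochDay(days);
    }

    public static void main(String[] args) {
        long days = toEpochDays(LocalDate.of(2024, 12, 20));
        System.out.println(days);                // 20077, not "2024-12-20"
        System.out.println(fromEpochDays(days)); // 2024-12-20
    }
}
```

Using VARCHAR sidesteps the issue because the value passes through as a plain string.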

3.3 Core implementation

package com.zl.doris;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.zl.utils.EnvUtil;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class DorisExampleCDC {
    public static void main(String[] args) throws Exception {

        // Configure the execution environment, parallelism 1
        StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);
        // Per-job checkpoint path to keep jobs isolated
        env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/DorisExampleCDC");
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// unlike the DataStream/RowData examples, CDC runs in STREAMING mode


        DorisOptions.Builder dorisOptions = DorisOptions.builder();
        dorisOptions.setFenodes("10.86.97.191:8130")//FE_IP:HTTP_PORT
                .setTableIdentifier("flink_test.rv_table")// db.table
                .setUsername("root")// root
                .setPassword("pwd");// password


        Properties properties = new Properties();
        // Required when the upstream records are JSON
        properties.setProperty("format", "json");
        properties.setProperty("read_json_by_line", "true");
        DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
        executionBuilder.setLabelPrefix("doris_example_cdc3") // stream load label prefix
                .setDeletable(true)// unlike the DataStream/RowData examples, deletes must be enabled for CDC
                .setStreamLoadProp(properties);

        DorisSink.Builder<String> builder = DorisSink.builder();
        builder.setDorisReadOptions(DorisReadOptions.builder().build())
                .setDorisExecutionOptions(executionBuilder.build())
                .setDorisOptions(dorisOptions.build())
                .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions.build()).build());

        /*
        // This variant ran end to end without errors but never landed data in Doris;
        // the official Doris examples don't write it this way either. Cause still to be found.
        JSONObject rvJsonObject = new JSONObject();
        rvJsonObject.put("dt","2024-12-21");
        rvJsonObject.put("uuid","cdc-2");
        rvJsonObject.put("report_time",1733881971621L);
        env.fromElements(JSONObject.toJSONString(rvJsonObject)).sinkTo(builder.build()).name("doris_sink").uid("doris_sink");*/

        // MySQL source
        List<String> SYNC_TABLES = Arrays.asList("flink_test.rv_table");
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("10.86.37.169")
                .port(3306)
                .databaseList("flink_test")
                .tableList(String.join(",", SYNC_TABLES))
                .username("root")
                .password("pwd")// change to your actual password
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").sinkTo(builder.build()).name("doris_sink").uid("doris_sink");
        env.execute("dorisSinkJob");
    }// main

}
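Because the execution options set `format=json` and `read_json_by_line=true`, each record ultimately reaches Doris Stream Load as one JSON object per line. A stdlib-only sketch of that wire shape for the `rv_table` fields (no Flink or fastjson involved; the helper names are mine):

```java
import java.util.List;
import java.util.StringJoiner;

public class JsonLinesSketch {
    // One JSON object per record, matching the rv_table columns
    static String toJsonLine(String dt, String uuid, long reportTime) {
        return String.format("{\"dt\":\"%s\",\"uuid\":\"%s\",\"report_time\":%d}",
                dt, uuid, reportTime);
    }

    // Stream Load with read_json_by_line=true expects records separated by newlines
    static String payload(List<String> lines) {
        StringJoiner sj = new StringJoiner("\n");
        lines.forEach(sj::add);
        return sj.toString();
    }

    public static void main(String[] args) {
        System.out.println(toJsonLine("2024-12-20", "cdc-1", 1733881971621L));
    }
}
```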

3.4 Running the job

Console output:
(screenshot omitted)
Flink web UI:
(screenshot omitted)

After the CDC job succeeds, MySQL and Doris hold identical data:
(screenshot omitted)

4. DataStream implementation

Note: for the database scripts, see the CDC section.

package com.zl.doris;

import com.alibaba.fastjson.JSONObject;
import com.zl.utils.EnvUtil;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class DorisExampleDataSteam {
    public static void main(String[] args) throws Exception {

        // Configure the execution environment, parallelism 1
        StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);
        // Per-job checkpoint path to keep jobs isolated
        env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/DorisExampleDataSteam");
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);// without this, the job "runs without errors but never writes data to Doris"

        DorisOptions.Builder dorisOptions = DorisOptions.builder();
        dorisOptions.setFenodes("10.86.97.191:8130")//FE_IP:HTTP_PORT
                .setTableIdentifier("flink_test.rv_table")// db.table
                .setUsername("root")// root
                .setPassword("pwd");// password


        Properties properties = new Properties();
        // Required when the upstream records are JSON
        properties.setProperty("format", "json");
        properties.setProperty("read_json_by_line", "true");
        DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
        executionBuilder.setLabelPrefix("doris_example_datasteam") // stream load label prefix
                .setDeletable(false)
                .setStreamLoadProp(properties);

        DorisSink.Builder<String> builder = DorisSink.builder();
        builder.setDorisReadOptions(DorisReadOptions.builder().build())
                .setDorisExecutionOptions(executionBuilder.build())
                .setSerializer(new SimpleStringSerializer()) //serialize according to string
                .setDorisOptions(dorisOptions.build());

        JSONObject rvJsonObject = new JSONObject();
        rvJsonObject.put("dt","2024-12-20");// use the current date so the row lands in an existing dynamic partition
        rvJsonObject.put("uuid","data-stream-1");
        rvJsonObject.put("report_time",1733881971621L);

        env.fromElements(JSONObject.toJSONString(rvJsonObject)).sinkTo(builder.build()).name("doris_sink").uid("doris_sink");

        env.execute("dorisSinkJob");
    }// main

}

5. RowData implementation

Note: for the database scripts, see the CDC section.

package com.zl.doris;

import com.zl.utils.EnvUtil;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.RowDataSerializer;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;

import java.time.LocalDate;
import java.util.Properties;

public class DorisExampleRowData {
    public static void main(String[] args) throws Exception {

        // Configure the execution environment, parallelism 1
        StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);
        // Per-job checkpoint path to keep jobs isolated
        env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/DorisExampleRowData");
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);// without this, the job "runs without errors but never writes data to Doris"

        DorisOptions.Builder dorisOptions = DorisOptions.builder();
        dorisOptions.setFenodes("10.86.97.191:8130")//FE_IP:HTTP_PORT
                .setTableIdentifier("flink_test.rv_table")// db.table
                .setUsername("root")// root
                .setPassword("pwd");// password


        Properties properties = new Properties();
        // Required when the upstream records are JSON
        properties.setProperty("format", "json");
        properties.setProperty("read_json_by_line", "true");
        DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
        executionBuilder.setLabelPrefix("doris_example_rowdata") // stream load label prefix
                .setDeletable(false)
                .setStreamLoadProp(properties);

        String[] fields = {"dt", "uuid", "report_time"};
        DataType[] types = {DataTypes.DATE(),DataTypes.VARCHAR(30), DataTypes.BIGINT() };
        DorisSink.Builder<RowData> builder = DorisSink.builder();
        builder.setDorisReadOptions(DorisReadOptions.builder().build())
                .setDorisExecutionOptions(executionBuilder.build())
                .setSerializer(RowDataSerializer.builder()    //serialize according to rowdata
                        .setFieldNames(fields)
                        .setType("json")           //json format
                        .setFieldType(types).build())
                .setDorisOptions(dorisOptions.build());

        DataStream<RowData> source = env.fromElements("")
                .map(new MapFunction<String, RowData>() {
                    @Override
                    public RowData map(String value) throws Exception {
                        GenericRowData genericRowData = new GenericRowData(3);
                        Long myDate = LocalDate.of(2024,12,21).toEpochDay();// use the current date
                        genericRowData.setField(0, myDate.intValue());// days between the given date and 1970-01-01
                        genericRowData.setField(1, StringData.fromString("row-data-1"));
                        genericRowData.setField(2, 1733881971621L);
                        return genericRowData;
                    }
                });

        source.sinkTo(builder.build()).setParallelism(1).name("doris_sink").uid("doris_sink");

        env.execute("dorisSinkJob");
    }// main

}

6. Full code

Full code: complete code (linked in the original post)
Official documentation: flink-doris-connector
Start from the official documentation, then experiment against it. Documentation quality and code style vary across products and chapters, so focus on the core ideas.

7. Problems encountered

7.1 Problem 1

Symptom: the job runs without errors, but the table ends up empty.
Cause: the inserted rows do not belong to any partition.
Fix: this example partitions by date (dt), so dt must be a concrete date inside the dynamic partition window, e.g. "2024-12-20".

7.2 Problem 2

Symptom: the job runs without errors, but the table ends up empty.

From the official docs: before connector 1.1.0, writes were batched and purely data-driven, so you had to check whether upstream data was actually arriving. Since 1.1.0, writes depend on checkpoints: checkpointing must be enabled for data to be written.
In addition, the batch examples also need env.setRuntimeMode(RuntimeExecutionMode.BATCH).

7.3 Problem 3

Symptom: DorisRuntimeException: Failed to get backend via
Fix: caused by wrong Doris connection info (IP, port, username, password).

7.4 Problem 4

Symptom: DorisRuntimeException: stream load error: [INTERNAL_ERROR]too many filtered rows
Cause: the target dynamic partition does not exist.

