五.海量数据实时分析-FlinkCDC+DorisConnector实现数据的全量增量同步
前言
前面四篇文字都在学习Doris的理论知识,也是比较枯燥,当然Doris的理论知识还很多,我们后面慢慢学,本篇文章我们尝试使用SpringBoot来整合Doris完成基本的CRUD。
由于 Doris 高度兼容 Mysql 协议,两者在 SQL 语法方面有着比较强的一致性,另外 Mysql 客户端也是 Doris 官方选择的客户端。因此,如需对 Mysql 进行数据分析,使用 Doris 的迁移成本较低。但是对于数据规模特别大的情况下Mysql的方式还是不太建议的,所以这里还会介绍一种 方式就是通过CDC+DorisConnector
一.整合Mybatis操作Doris
1.准备数据库
CREATE TABLE `doris_test` (
`id` int NULL COMMENT "id",
`price` decimal(10,2) NULL COMMENT "价格",
`title` varchar(20)NULL COMMENT "标题"
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
Query OK, 0 rows affected (0.06 sec)
2.搭建SpringBoot项目
第一步:创建SpringBoot项目,导入依赖,主要是Mybatis相关的依赖
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.1</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--如果要用传统的xml或properties配置,则需要添加此依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<!--mybatisplus持久层依赖-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!-- fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.70</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.23</version>
</dependency>
</dependencies>
3.整合Mybatis
创建好启动类,实体类,服务层,持久层,SQL映射文件等,这里和我们操作Mysql没有任何区别.
启动类如下,通过 @MapperScan 来扫描Mapper接口
@SpringBootApplication
@MapperScan(basePackages = "cn.whale.mapper")
public class DorisApplication {
public static void main(String[] args) {
SpringApplication.run(DorisApplication.class,args);
}
}
服务层和持久层代码如下 , 省略了部分代码
@Service
public class DorisServiceImpl implements DorisService {
@Autowired
DorisMapper dorisMapper;
@Override
public List<Order> listDoris() {
return dorisMapper.listDoris();
}
@Override
public int add(Order order) {
return dorisMapper.add(order);
}
}
public interface DorisMapper {
List<Order> listDoris();
int add(Order order);
}
下面是sql映射文件,和操作数据库没有任何区别
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.whale.mapper.DorisMapper">
<select id="listDoris" resultType="cn.whale.domain.Order">
select id,price,title from orders
</select>
<insert id="add" parameterType="cn.whale.domain.Order">
INSERT INTO orders(id,price,title) VALUES(#{id},#{price},#{title})
</insert>
</mapper>
4.编写配置文件
server:
port: 8080
spring:
#Doris数据库连接配置
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.220.253:9030/app_db?characterEncoding=utf-8&useSSL=false
username: root
password: 123456
type: com.alibaba.druid.pool.DruidDataSource
initial-size: 500
min-idle: 500
max-active: 500
#mybatis的相关配置
mybatis:
mapper-locations: classpath:mapper/*.xml
type-aliases-package: com.wudl.doris.domain.DorisTest
5.编写测试类
注入Service,完成查询和添加的测试
package cn.whale.service;
import cn.whale.DorisApplication;
import cn.whale.domain.Order;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.math.BigDecimal;
import java.util.List;
@RunWith(SpringRunner.class)
@SpringBootTest(classes= DorisApplication.class)
public class DorisServiceTest {
@Autowired
private DorisService dorisService;
@Test
public void listDoris() {
List<Order> orders = dorisService.listDoris();
orders.stream().forEach(System.out::println);
}
@Test
public void addDoris() throws InterruptedException {
for (int i = 100 ; i>0 ; i--){
Order order = new Order();
order.setId(Long.valueOf(i));
order.setPrice(new BigDecimal(i));
order.setTitle(i+"元");
dorisService.add(order);
}
Thread.sleep(100000);
}
}
二.SpringBoot整合FlinkCDC+doris-connector实时同步
上面通过Mysql客户端的方式来进行数据的同步方式虽然使用起来比较简单,但是不适合大量数据的实时同步,性能不是很好。Doris官方提供了doris-connector进行Doris读写,那么我们可以通过FlinkCDC监听Mysql数据变更,然后同步到SpringBoot项目中,对数据进行处理后,然后通过doris-connector同步到Doris。虽然我们之前也介绍过直接通过FlinkCDC同步Mysql到Doris,那种方式我们没办法对数据进行处理。
1.搭建项目导入依赖
版本兼容
- flink-connector-mysql-cdc :flinkCDC,实时监听Mysql增量或者全量同步
- flink-doris-connector :用来读写Doris的驱动
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.13.6</flink.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.13</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!--mysql -cdc-->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.18</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.13.6</version>
<scope>provided</scope>
</dependency>
<!-- flink doris connector -->
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.13_2.12</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.13.6</version>
<scope>provided</scope>
</dependency>
</dependencies>
2.定义消息序列化器
定义序列化器,当拿到Mysql的数据后我们可以通过虚拟化器对数据进行格式化
/**
* @desc mysql消息读取自定义序列化
**/
public class MysqlDeserialization implements DebeziumDeserializationSchema<String> {
public static final String TS_MS = "ts_ms";
public static final String BIN_FILE = "file";
public static final String POS = "pos";
public static final String CREATE = "CREATE";
public static final String BEFORE = "before";
public static final String AFTER = "after";
public static final String SOURCE = "source";
public static final String UPDATE = "UPDATE";
/**
*
* 反序列化数据,转为变更JSON对象
*/
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) {
//拿到数据
Struct struct = (Struct) sourceRecord.value();
//输出数据
collector.collect(getJsonObject(struct, AFTER).toJSONString());
}
/**
*
* 从原数据获取出变更之前或之后的数据
*/
private JSONObject getJsonObject(Struct value, String fieldElement) {
Struct element = value.getStruct(fieldElement);
JSONObject jsonObject = new JSONObject();
if (element != null) {
Schema afterSchema = element.schema();
List<Field> fieldList = afterSchema.fields();
for (Field field : fieldList) {
Object afterValue = element.get(field);
jsonObject.put(field.name(), afterValue);
}
}
return jsonObject;
}
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
}
3.Mysql监听器
监听器的作用主要是监听Mysql的数据变更后,通过序列化器把数据进行处理后,然后同步到Doris中
/**
* @desc mysql变更监听
**/
@Component
public class MysqlEventListener implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//设置Mysql数据源
DebeziumSourceFunction<String> dataChangeInfoMySqlSource = buildDataChangeSource();
DataStream<String> streamSource = env
.addSource(dataChangeInfoMySqlSource, "mysql-source")
.setParallelism(1);
//streamSource.addSink(dataChangeSink);
Properties pro = new Properties();
pro.setProperty("format", "json");
pro.setProperty("strip_outer_array", "true");
//配置Doris信息
streamSource.addSink(DorisSink.sink(DorisExecutionOptions.builder()
.setBatchSize(3)
.setBatchIntervalMs(10L)
.setMaxRetries(3)
.setStreamLoadProp(pro)
.setEnableDelete(true)
.build(),
DorisOptions.builder()
.setFenodes("192.168.220.253:8030")
.setTableIdentifier("app_db.orders")
.setUsername("root")
.setPassword("123456").build()));
env.execute("mysql-stream-cdc");
}
/**
* 构造变更数据源
*/
private DebeziumSourceFunction<String> buildDataChangeSource() {
return MySqlSource.<String>builder()
.hostname("192.168.220.253")
.port(3307)
.databaseList("app_db")
.tableList("app_db.orders")
.username("root")
.password("123456")
/**initial初始化快照,即全量导入后增量导入(检测更新数据写入)
* latest:只进行增量导入(不读取历史变化)
* timestamp:指定时间戳进行数据导入(大于等于指定时间读取数据)
*/
.startupOptions(StartupOptions.latest())
//序列化
.deserializer(new MysqlDeserialization())
.serverTimeZone("GMT+8")
.build();
}
}
4.最后编写一个启动类
@SpringBootApplication
public class FlinkCdcApplication {
public static void main(String[] args) {
SpringApplication.run(FlinkCdcApplication.class, args);
}
}
5.准备好数据库
我的Mysql使用的是 :app_db.orders 对应的是Doris中的 app_db.orders ,字段类型,字段名需要一致哦。然后启动项目,修改Mysql的数据,Doris会自动同步过去
三.其他同步方式
Doris还支持其他的数据读写方式,具体的可以根据官网案例去尝试地址:https://doris.apache.org/zh-CN/docs/1.2/ecosystem/spark-doris-connector