Syncing a DM Database to Elasticsearch with Spring Boot and Flink
Preface
What I'm sharing today is really an on-site interview exercise: watch a DM (Dameng) database for data changes and sync them to Elasticsearch, using Flink. It's a standard recipe, so there isn't much to say, except perhaps this: the "national team" databases still don't get much ecosystem support. Even with Alibaba behind Flink, DM never got a connector. Maybe DM wouldn't cooperate; then again, if some internet giant really did ship one, government users probably wouldn't dare use it anyway.
I. Connecting to the DM Database
I connect to DM with DBeaver, using a driver I registered myself; I've shared before how to set that up, so I won't repeat it here.
To inspect the results I also use DBeaver, connected to ES. ES normally wants certificates and the like, so there too I connect through a self-registered driver, the same way as with DM.
- Connecting to DM and ES with self-registered drivers
- Download for the certificate-free ES connection driver
- Configuring the driver for the DM database connection
II. Implementation Plan
- Change detection is handled inside the DM database itself, via a trigger
- Flink watches the change-log table written in step 1; Flink has no binlog-style CDC connector for DM
- Flink polls that change-log table on a fixed interval
- Insert, update, and delete events are then applied to ES accordingly
III. The Code
1. Create the trigger
CREATE OR REPLACE TRIGGER SCHEMA_NAME.RC_PASS_RECORD_CHANGE_LOG_TRIGGER
AFTER INSERT OR UPDATE OR DELETE ON SCHEMA_NAME.RC_PASS_RECORD
FOR EACH ROW
BEGIN
    IF INSERTING THEN
        -- handle inserts
        INSERT INTO RC_PASS_RECORD_CHANGE_LOG (CHANGE_TYPE, CHANGE_ID) VALUES ('INSERT', :new.ID);
    ELSIF UPDATING THEN
        -- handle updates
        INSERT INTO RC_PASS_RECORD_CHANGE_LOG (CHANGE_TYPE, CHANGE_ID) VALUES ('UPDATE', :new.ID);
    ELSIF DELETING THEN
        -- handle deletes
        INSERT INTO RC_PASS_RECORD_CHANGE_LOG (CHANGE_TYPE, CHANGE_ID) VALUES ('DELETE', :old.ID);
    END IF;
END;
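The trigger writes into a change-log table that isn't shown above. Here is a minimal sketch of that table, consistent with the columns the trigger and the Flink source below rely on (an auto-increment ID the source can poll past, plus CHANGE_TYPE, CHANGE_ID, and CREATE_TIME); the exact types are my assumptions, and the IDENTITY syntax relies on DM8 accepting SQL Server-style identity columns:

```sql
-- Sketch: the change-log table the trigger writes into (column types are assumptions).
-- ID auto-increments so the Flink source can poll WHERE ID > <last seen id>.
CREATE TABLE SCHEMA_NAME.RC_PASS_RECORD_CHANGE_LOG (
    ID          BIGINT IDENTITY(1, 1) PRIMARY KEY,
    CHANGE_TYPE VARCHAR(16) NOT NULL,
    CHANGE_ID   VARCHAR(64) NOT NULL,
    CREATE_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```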
2. Flink code
Since this integrates with Spring Boot, start by creating a fresh Spring Boot project and adding the Flink dependency plus the DM and ES client dependencies.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.4.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.zw</groupId>
<artifactId>olap-zw</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>olap-zw</name>
<description>olap-zw</description>
<url/>
<licenses>
<license/>
</licenses>
<developers>
<developer/>
</developers>
<scm>
<connection/>
<developerConnection/>
<tag/>
<url/>
</scm>
<properties>
<java.version>17</java.version>
<skipTests>true</skipTests>
<flink.version>1.20.1</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<!--<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jdbc</artifactId>
</dependency>-->
<!-- <dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.6</version>
</dependency>-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.18</version>
</dependency>-->
<dependency>
<groupId>com.dameng</groupId>
<artifactId>DmJdbcDriver18</artifactId>
<version>8.1.3.62</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7</artifactId>
<version>3.0.1-1.17</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.25</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
<!-- Flink packaging, option 1 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<archive>
<manifest>
<mainClass>com.zw.olapzw.OlapZwApplication</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Package structure and files
Configuration file (application.properties)
spring.application.name=olap-zw
spring.datasource.url=jdbc:dm://192.168.1.22:5236?schema=SCHEMA_NAME&useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=Asia/Shanghai&useSSL=false
spring.datasource.username=your_username
spring.datasource.password=your_password
spring.datasource.driver-class-name=dm.jdbc.driver.DmDriver
#spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
#spring.datasource.druid.initial-size=5
#spring.datasource.druid.max-active=10
#spring.datasource.druid.min-idle=5
#spring.datasource.druid.max-wait=60000
#spring.datasource.druid.validation-query=SELECT 1
#spring.datasource.druid.test-while-idle=true
#spring.datasource.druid.time-between-eviction-runs-millis=30000
#spring.datasource.druid.min-evictable-idle-time-millis=60000
es.host=192.168.1.21
es.port=9200
logging.level.root=error
logging.level.com.zw.olapzw.sink=error
logging.level.com.zw.olapzw.source=error
entity -> SourceDataEntity
package com.zw.olapzw.entity;
import lombok.Data;
import java.util.Date;
/**
* @author zwmac
*/
@Data
public class SourceDataEntity {
private String id;
private Long deviceId;
private Long gateId;
private String authObjCode;
    /**
     * Change type: the flag the trigger writes (INSERT / UPDATE / DELETE)
     */
    private String changeType;
    /**
     * Time the record was synced
     */
    private Date storageTime;
}
sink -> ResultSinkDataEntitySink
package com.zw.olapzw.sink;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.zw.olapzw.entity.SourceDataEntity;
import com.zw.olapzw.util.ConnUtil;
import com.zw.olapzw.util.ESClientUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* @author zwmac
*/
@Slf4j
public class ResultSinkDataEntitySink extends RichSinkFunction<SourceDataEntity> {
    @Override
    public void invoke(SourceDataEntity record, Context context) throws Exception {
        super.invoke(record, context);
        // Incoming record
        log.info("Start sink data entity");
        log.info("Sink data: {}", record);
        // Stamp the sync time
        record.setStorageTime(DateUtil.date());
        // Serialize the record to JSON (JSONUtil does this via reflection)
        JSONObject dataJson = JSONUtil.parseObj(record);
        log.info("-- dataJson to sync: {}", dataJson);
        // Connect to Elasticsearch. Flink serializes this sink, so Spring can't inject
        // fields into it; fetch ConnUtil from the application context instead.
        ConnUtil connUtil = SpringUtil.getBean(ConnUtil.class);
        String esHost = connUtil.getEsHost();
        String esPort = connUtil.getEsPort();
        RestHighLevelClient restHighLevelClient = ESClientUtil.getClient(esHost, Integer.parseInt(esPort));
        // Dispatch on the change type the trigger recorded
        String changeType = record.getChangeType();
        switch (changeType) {
            case "INSERT":
                sinkInsert(record, dataJson, restHighLevelClient);
                break;
            case "UPDATE":
                sinkUpdate(record, dataJson, restHighLevelClient);
                break;
            case "DELETE":
                sinkDel(record, dataJson, restHighLevelClient);
                break;
        }
        // Close the connection (a per-task client would be cheaper; see the sketch below)
        restHighLevelClient.close();
        log.info("end sink data entity");
    }
    /**
     * Delete a document from ES
     * @param record
     * @param dataJson
     * @param restHighLevelClient
     * @throws IOException
     */
    private void sinkDel(SourceDataEntity record, JSONObject dataJson, RestHighLevelClient restHighLevelClient) throws IOException {
        // Delete by document id; logged at error level so it shows with logging.level.root=error
        DeleteRequest deleteRequest = new DeleteRequest("test_index", record.getId());
        DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
        log.error("--- ES DeleteRequest response: {}", deleteResponse);
    }
    /**
     * Update a document in ES
     * @param record
     * @param dataJson
     * @param restHighLevelClient
     * @throws IOException
     */
    private void sinkUpdate(SourceDataEntity record, JSONObject dataJson, RestHighLevelClient restHighLevelClient) throws IOException {
        // Partial update by document id
        UpdateRequest updateRequest = new UpdateRequest("test_index", record.getId());
        Map<String, Object> updateFields = new HashMap<>();
        updateFields.put("deviceId", record.getDeviceId());
        updateFields.put("id", record.getId());
        updateFields.put("authObjCode", record.getAuthObjCode());
        updateFields.put("gateId", record.getGateId());
        if (MapUtil.isNotEmpty(updateFields)) {
            // Pass the field map as the partial document (not the request object itself)
            updateRequest.doc(updateFields);
            UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
            log.error("---- ES UpdateRequest response: {}", updateResponse);
        } else {
            log.error("------- updateFields is empty, skipping ES update");
        }
    }
    /**
     * Insert a document into ES
     * @param record
     * @param dataJson
     * @param restHighLevelClient
     * @throws IOException
     */
    private void sinkInsert(SourceDataEntity record, JSONObject dataJson, RestHighLevelClient restHighLevelClient) throws IOException {
        // Index a new document under the record's id
        IndexRequest indexRequest = new IndexRequest("test_index");
        indexRequest.id(record.getId());
        Map<String, Object> insertFields = new HashMap<>();
        insertFields.put("deviceId", record.getDeviceId());
        insertFields.put("id", record.getId());
        insertFields.put("authObjCode", record.getAuthObjCode());
        insertFields.put("gateId", record.getGateId());
        indexRequest.source(insertFields);
        IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        log.error("------ ES IndexRequest response: {}", indexResponse);
    }
}
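Worth flagging: invoke() opens and closes a RestHighLevelClient for every single record, which works but gets expensive under load. RichSinkFunction exposes open()/close() hooks that run once per parallel task, so the client can be created up front and reused. Here is a sketch of that variant (the class name is mine; the INSERT/UPDATE/DELETE dispatch would stay exactly as above):

```java
package com.zw.olapzw.sink;

import cn.hutool.extra.spring.SpringUtil;
import com.zw.olapzw.entity.SourceDataEntity;
import com.zw.olapzw.util.ConnUtil;
import com.zw.olapzw.util.ESClientUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.elasticsearch.client.RestHighLevelClient;

public class PooledResultSink extends RichSinkFunction<SourceDataEntity> {
    // transient: created per task in open(), never serialized with the job graph
    private transient RestHighLevelClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ConnUtil connUtil = SpringUtil.getBean(ConnUtil.class);
        client = ESClientUtil.getClient(connUtil.getEsHost(), Integer.parseInt(connUtil.getEsPort()));
    }

    @Override
    public void invoke(SourceDataEntity record, Context context) throws Exception {
        // ... same INSERT/UPDATE/DELETE dispatch as in ResultSinkDataEntitySink,
        // but reusing this.client instead of opening a new connection per record ...
    }

    @Override
    public void close() throws Exception {
        if (client != null) {
            client.close();
        }
        super.close();
    }
}
```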
source -> ResultSourceDataEntitySource
package com.zw.olapzw.source;
import cn.hutool.extra.spring.SpringUtil;
import com.zw.olapzw.entity.SourceDataEntity;
import com.zw.olapzw.util.ConnUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.*;
/**
* @author zwmac
*/
@Slf4j
public class ResultSourceDataEntitySource extends RichSourceFunction<SourceDataEntity> {
    String changeLogSql = "SELECT * FROM LAMP_ZF.RC_PASS_RECORD_CHANGE_LOG RPRCL";
    String recordSql = "SELECT * FROM LAMP_ZF.RC_PASS_RECORD RPR";
    /**
     * Highest change-log ID processed so far. Kept in memory only, so a restart
     * re-reads the whole change log; persist it (e.g. in Flink state) if that matters.
     */
    Long startId = 0L;
    private volatile boolean isRunning = true;
    @Override
    public void run(SourceContext<SourceDataEntity> sourceContext) throws Exception {
        // Plain JDBC here; once this lives inside Spring Boot you could swap in an ORM
        // or whatever data-access layer the project already uses.
        log.info("Start source data entity");
        try {
            // Same story as the sink: Flink serializes this function outside the Spring
            // context, so fetch the connection settings through SpringUtil.
            ConnUtil connUtil = SpringUtil.getBean(ConnUtil.class);
            String jdbcUrl = connUtil.getJdbcUrl();
            String jdbcUsername = connUtil.getJdbcUsername();
            String jdbcPassword = connUtil.getJdbcPassword();
            Connection connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword);
            Statement statement = connection.createStatement();
            // Poll the change log
            while (isRunning) {
                StringBuffer sbf = new StringBuffer();
                sbf.append(changeLogSql);
                sbf.append(" WHERE RPRCL.ID > ");
                sbf.append(startId);
                sbf.append(" ORDER BY RPRCL.ID ASC");
                String querySql = sbf.toString();
                log.error("--- change-log query: " + querySql);
                ResultSet resultSet = statement.executeQuery(querySql);
                // Walk the new change-log rows
                while (resultSet.next()) {
                    Long id = resultSet.getLong("ID");
                    startId = id;
                    String changeType = resultSet.getString("CHANGE_TYPE");
                    String changeId = resultSet.getString("CHANGE_ID");
                    // getTimestamp keeps the time-of-day part (getDate would drop it)
                    Timestamp changeDate = resultSet.getTimestamp("CREATE_TIME");
                    SourceDataEntity sourceDataEntity = new SourceDataEntity();
                    sourceDataEntity.setChangeType(changeType);
                    if ("DELETE".equals(changeType)) {
                        // The row is already gone, so there is nothing to look up
                        sourceDataEntity.setId(changeId);
                    } else {
                        // For INSERT/UPDATE, fetch the current row
                        StringBuffer recordSbf = new StringBuffer();
                        recordSbf.append(recordSql);
                        recordSbf.append(" WHERE RPR.ID = '" + changeId + "'");
                        String queryRecordSql = recordSbf.toString();
                        log.error("-- record query: " + queryRecordSql);
                        Statement recordSm = connection.createStatement();
                        ResultSet recordRs = recordSm.executeQuery(queryRecordSql);
                        while (recordRs.next()) {
                            sourceDataEntity.setId(recordRs.getString("ID"));
                            sourceDataEntity.setDeviceId(recordRs.getLong("DEVICE_ID"));
                            sourceDataEntity.setGateId(recordRs.getLong("GATE_ID"));
                            sourceDataEntity.setAuthObjCode(recordRs.getString("AUTH_OBJ_CODE"));
                        }
                        // Close the per-iteration statement so it doesn't leak
                        recordRs.close();
                        recordSm.close();
                    }
                    if (sourceDataEntity.getId() != null) {
                        // Guards against rows physically deleted between the change-log
                        // write and this lookup
                        sourceDataEntity.setStorageTime(changeDate);
                        sourceContext.collect(sourceDataEntity);
                    }
                }
                resultSet.close();
                // Poll interval: fine for debugging, tune (or make configurable) for production
                Thread.sleep(10000L);
            }
            // Clean up once cancel() flips isRunning
            statement.close();
            connection.close();
        } catch (SQLException e) {
            // Log the full stack trace, not just the message
            log.error("DM change-log polling failed", e);
        }
}
@Override
public void cancel() {
isRunning = false;
}
}
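One more note on the source: the record lookup splices changeId straight into the SQL string. The value comes from our own trigger, so the exposure is limited, but a PreparedStatement is still the safer habit and handles quoting for free. Here is a sketch of the inner lookup pulled into a helper (the method name is hypothetical; the table and columns match the code above):

```java
/**
 * Sketch: parameterized replacement for the inner record lookup in run().
 * Relies only on the java.sql.* imports the class already has.
 */
private void fillFromRecordTable(Connection connection, String changeId,
                                 SourceDataEntity entity) throws SQLException {
    String sql = "SELECT * FROM LAMP_ZF.RC_PASS_RECORD RPR WHERE RPR.ID = ?";
    try (PreparedStatement ps = connection.prepareStatement(sql)) {
        ps.setString(1, changeId); // bound parameter: no manual quoting, no injection
        try (ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
                entity.setId(rs.getString("ID"));
                entity.setDeviceId(rs.getLong("DEVICE_ID"));
                entity.setGateId(rs.getLong("GATE_ID"));
                entity.setAuthObjCode(rs.getString("AUTH_OBJ_CODE"));
            }
        }
    } // try-with-resources closes the statement and result set
}
```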
util -> ConnUtil
package com.zw.olapzw.util;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
@Data
public class ConnUtil {
@Value("${spring.datasource.url}")
private String jdbcUrl;
@Value("${spring.datasource.username}")
private String jdbcUsername;
@Value("${spring.datasource.password}")
private String jdbcPassword;
@Value("${es.host}")
private String esHost;
@Value("${es.port}")
private String esPort;
}
util -> ESClientUtil
package com.zw.olapzw.util;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.apache.http.HttpHost;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import java.io.IOException;
/**
* @author zwmac
*/
public class ESClientUtil {
private static RestHighLevelClient restHighLevelClient;
public static RestHighLevelClient getClient(String host, int port) {
RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, "http"))
.setHttpClientConfigCallback(new HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder;
}
});
restHighLevelClient = new RestHighLevelClient(builder);
return restHighLevelClient;
}
public static void closeClient() {
try {
if (restHighLevelClient != null) {
restHighLevelClient.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
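getClient builds a plain HTTP client with no credentials, which matches the certificate-free setup from the DBeaver section. If your cluster does have security enabled, the HttpClientConfigCallback hook is where authentication would go. Here is a sketch with basic auth (the class name and credentials are placeholders, not part of the project):

```java
package com.zw.olapzw.util;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;

public class SecureESClientUtil {
    /**
     * Sketch: same idea as ESClientUtil.getClient, but with basic auth wired
     * into the HttpClientConfigCallback. Credentials are placeholders.
     */
    public static RestHighLevelClient getClient(String host, int port, String user, String password) {
        CredentialsProvider credentials = new BasicCredentialsProvider();
        credentials.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
        RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, "http"))
                .setHttpClientConfigCallback(
                        httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentials));
        return new RestHighLevelClient(builder);
    }
}
```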
OlapZwApplication
package com.zw.olapzw;
import com.zw.olapzw.entity.SourceDataEntity;
import com.zw.olapzw.sink.ResultSinkDataEntitySink;
import com.zw.olapzw.source.ResultSourceDataEntitySource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
/**
* @author zwmac
*/
@SpringBootApplication
public class OlapZwApplication {
public static void main(String[] args) {
SpringApplication.run(OlapZwApplication.class, args);
System.out.println("OlapZwApplication started");
}
@Bean
public CommandLineRunner commandLineRunner(ApplicationContext ctx) {
        return args -> {
            // Flink execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // One parallel task is plenty for a polling source
            env.setParallelism(1);
            // Checkpoint every 5 seconds
            env.enableCheckpointing(5000L);
            // Exactly-once checkpointing semantics
            CheckpointConfig checkpointConfig = env.getCheckpointConfig();
            checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            // Keep the last checkpoint when the job is cancelled
            checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            // Restart strategy: 3 attempts, 10 ms apart
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10));
            // Source
            SourceFunction<SourceDataEntity> source = new ResultSourceDataEntitySource();
            // Register the source with the environment
            DataStreamSource<SourceDataEntity> streamSource = env.addSource(source, "ResultSourceDataEntitySource");
            // Downstream processing
            streamSource.addSink(new ResultSinkDataEntitySink());
            try {
                // Blocks for the lifetime of the streaming job
                env.execute("DM-to-ES change data sync");
            } catch (Exception e) {
                System.out.println("DM-to-ES change data sync failed: " + e.getMessage());
                throw new RuntimeException(e);
            }
            System.out.println("flink CDC started");
        };
}
}
Summary
- There really isn't much more to add; I'll share the complete code on CSDN's GitCode later.