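Syncing MySQL Data with Flink CDC
Contents
- 1. How Flink CDC, Flink, and CDC relate
- 1.1 Overview
- 1.2 Comparison with the JDBC connector
- 2. Usage
- 2.1 Enabling the binlog in MySQL
- 2.2 Creating the databases and tables in MySQL
- 2.3 Pitfalls
- 2.4 Testing
- 3. Aside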
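1. How Flink CDC, Flink, and CDC relate
Flink: a stream-processing framework. It does not include Flink CDC and is independent of it.
CDC: short for Change Data Capture. It is an idea rather than one specific technology: capture the incremental changes of a source database (Source) and sync them to one or more destinations (Sink). The data can also be processed along the way, for example filtered, joined, grouped, or aggregated. The established middleware for receiving and parsing database change events is Debezium; for MySQL specifically there is also Canal.
Flink CDC: one implementation of CDC, not a submodule of Flink. It was developed at Alibaba to enrich the Flink ecosystem.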
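To make the CDC idea concrete, here is a minimal sketch in plain Java with no Flink involved. It replays a list of change events against an in-memory "sink" so that the sink ends up matching the source. The `Change` class and the `INSERT`/`UPDATE`/`DELETE` labels are hypothetical names chosen for illustration; they are not the format any particular CDC tool emits.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ChangeReplaySketch {
    /** Hypothetical change event: operation, primary key, and new row value (null for deletes). */
    public static final class Change {
        final String op;
        final String key;
        final String row;

        public Change(String op, String key, String row) {
            this.op = op;
            this.key = key;
            this.row = row;
        }
    }

    /** Applies each change in order, so the sink converges to the source's current state. */
    public static Map<String, String> replay(List<Change> changes) {
        Map<String, String> sink = new LinkedHashMap<>();
        for (Change c : changes) {
            switch (c.op) {
                case "INSERT":
                case "UPDATE":
                    sink.put(c.key, c.row);
                    break;
                case "DELETE":
                    sink.remove(c.key);
                    break;
                default:
                    throw new IllegalArgumentException("unknown op: " + c.op);
            }
        }
        return sink;
    }

    public static void main(String[] args) {
        Map<String, String> sink = replay(Arrays.asList(
                new Change("INSERT", "1", "amount=100"),
                new Change("INSERT", "2", "amount=50"),
                new Change("UPDATE", "1", "amount=120"),
                new Change("DELETE", "2", null)));
        System.out.println(sink); // prints {1=amount=120}
    }
}
```

A real CDC pipeline does the same thing at scale: the source database's log supplies the ordered events, and the sink applies them.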
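1.1 Overview
Flink CDC builds on log-based Change Data Capture to read a full snapshot and the subsequent incremental changes as one stream. Relying on Flink's pipeline capabilities and its rich upstream and downstream ecosystem, it can capture changes from many databases and sync them to downstream storage in real time.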
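1.2 Comparison with the JDBC connector
The JDBC connector can read external databases such as MySQL, Oracle, and SQL Server. A JDBC read is a one-off query, though; it cannot keep listening for changes in the database.
The Flink CDC connectors capture database changes and continuously sync them to downstream systems.
Official overview: https://ververica.github.io/flink-cdc-connectors/
GitHub: https://github.com/ververica/flink-cdc-connectors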
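The difference between one-off reads and change capture can be sketched in plain Java. In the hypothetical scenario below, the source inserts row 2, updates row 1 twice, and deletes row 2 between two polls. Comparing the two polled snapshots reveals only the net change, while a change log keeps every step. The class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PollingVsChangeLog {
    /** Diffs two polled snapshots (key -> row); only the net change per key is visible. */
    public static List<String> diffSnapshots(Map<String, String> before, Map<String, String> after) {
        List<String> changes = new ArrayList<>();
        for (Map.Entry<String, String> e : after.entrySet()) {
            String old = before.get(e.getKey());
            if (old == null) {
                changes.add("INSERT " + e.getKey());
            } else if (!old.equals(e.getValue())) {
                changes.add("UPDATE " + e.getKey());
            }
        }
        for (String key : before.keySet()) {
            if (!after.containsKey(key)) {
                changes.add("DELETE " + key);
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Map<String, String> before = new HashMap<>();
        before.put("1", "amount=100");
        Map<String, String> after = new HashMap<>();
        after.put("1", "amount=120");

        // What actually happened between the two polls, as a log would record it.
        List<String> changeLog = Arrays.asList("INSERT 2", "UPDATE 1", "UPDATE 1", "DELETE 2");

        System.out.println("polling saw:    " + diffSnapshots(before, after)); // [UPDATE 1]
        System.out.println("change log had: " + changeLog);
    }
}
```

Row 2 never appears in either snapshot, so polling cannot know it existed; the log-based approach sees all four events.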
2. Usage
Flink CDC can sync data in two ways: with Flink SQL, or with the Flink DataStream and Table API.
Here I use the DataStream approach and run it directly from IDEA.
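For comparison, the Flink SQL approach declares the source as a table whose connector is `mysql-cdc`. The sketch below only builds such a DDL string in plain Java; in a real job the string would be passed to a Flink `TableEnvironment` via `executeSql`. The helper name `sourceDdl`, the `_source` suffix, and the two-column schema are assumptions made for illustration, so check the option names against the connector documentation for your version.

```java
class MysqlCdcDdlSketch {
    /** Builds a Flink SQL DDL string for a mysql-cdc source table (only two columns declared). */
    public static String sourceDdl(String host, int port, String user, String password,
                                   String database, String table) {
        return "CREATE TABLE " + table + "_source (\n"
                + "  id STRING,\n"
                + "  amount INT,\n"
                + "  PRIMARY KEY (id) NOT ENFORCED\n"
                + ") WITH (\n"
                + "  'connector' = 'mysql-cdc',\n"
                + "  'hostname' = '" + host + "',\n"
                + "  'port' = '" + port + "',\n"
                + "  'username' = '" + user + "',\n"
                + "  'password' = '" + password + "',\n"
                + "  'database-name' = '" + database + "',\n"
                + "  'table-name' = '" + table + "'\n"
                + ")";
    }

    public static void main(String[] args) {
        // Connection values mirror the placeholders used in the DataStream example.
        System.out.println(sourceDdl("x.x.x.x", 3306, "root", "xiaoqiang",
                "league_test", "oc_settle_profit"));
    }
}
```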
Code source: https://github.com/yclxiao/flink-cdc-demo/tree/main/src/main/java/com/yclxiao/flinkcdcdemo
CloudAcctProfit2DwsHdjProfitRecordAPI.java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.xiaoqiang.utils.JdbcUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.*;
public class CloudAcctProfit2DwsHdjProfitRecordAPI {
private static final Logger LOG = LoggerFactory.getLogger(CloudAcctProfit2DwsHdjProfitRecordAPI.class);
private static String MYSQL_HOST = "x.x.x.x";
private static int MYSQL_PORT = 3306;
private static String MYSQL_USER = "root";
private static String MYSQL_PASSWD = "xiaoqiang";
private static String SYNC_DB = "league_test";
private static List<String> SYNC_TABLES = Arrays.asList("league_test.oc_settle_profit");
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname(MYSQL_HOST)
.port(MYSQL_PORT)
.databaseList(SYNC_DB) // set captured database
.tableList(String.join(",", SYNC_TABLES)) // set captured table
.username(MYSQL_USER)
.password(MYSQL_PASSWD)
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(5000);
DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "CDC Source" + "xiaoqiang-flink");
List<String> tableList = getTableList();
System.out.println("tableList--->"+tableList);
for (String tbl : tableList) {
SingleOutputStreamOperator<String> filterStream = filterTableData(cdcSource, tbl);
// Unwrap the Debezium envelope so the sink receives the row's "after" image rather than the whole change record
SingleOutputStreamOperator<String> cleanStream = clean(filterStream);
// Send the stream to the sink
cleanStream.addSink(new CustomDealDataSink())
.name("sink " + tbl);
}
env.execute("xiaoqiang-flink");
}
/**
* Custom sink that writes each row to MySQL over JDBC
*/
private static class CustomDealDataSink extends RichSinkFunction<String> {
private transient Connection coalitiondbConnection;
private transient Statement coalitiondbStatement;
private transient Connection cloudConnection;
private transient Statement cloudStatement;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Initialize the JDBC connections here
coalitiondbConnection = DriverManager.getConnection("jdbc:mysql://x.x.x.x:3306/league_test", "root", "");
coalitiondbStatement = coalitiondbConnection.createStatement();
cloudConnection = DriverManager.getConnection("jdbc:mysql://x.x.x.x:3306/cloud_test", "root", "");
cloudStatement = cloudConnection.createStatement();
}
@Override
public void invoke(String value, Context context) throws Exception {
// Parse the CDC JSON record
JSONObject rowJson = JSON.parseObject(value);
String outNo = rowJson.getString("out_no");
Integer userType = rowJson.getInteger("user_type");
String id = rowJson.getString("id");
String payOrderNo = rowJson.getString("pay_order_no");
String title = rowJson.getString("title");
String fromUserId = rowJson.getString("from_user_id");
String fromAccountId = rowJson.getString("from_account_id");
String userId = rowJson.getString("user_id");
String accountId = rowJson.getString("account_id");
Integer amount = rowJson.getInteger("amount");
Integer profitState = rowJson.getInteger("profit_state");
Date profitTime = rowJson.getTimestamp("profit_time");
Integer refundState = rowJson.getInteger("refund_state");
Date refundTime = rowJson.getTimestamp("refund_time");
Date addTime = rowJson.getTimestamp("add_time");
String remark = rowJson.getString("remark");
String acctCircle = rowJson.getString("acct_circle");
Integer fromUserType = rowJson.getInteger("from_user_type");
String companyId = rowJson.getString("company_id");
String bizCompanyId = rowJson.getString("biz_company_id");
// if (1 != profitState || !"PG11111".equals(acctCircle)) {
// return;
// }
//
// // Read related tables (join with other tables)
// Integer bizType = null;
// String contributeUserId = null;
// String relationBrandOwnerId = null;
// ResultSet virtualOrderResultSet = coalitiondbStatement.executeQuery("select * from tc_virtual_order where order_type != 2 and id = '" + outNo + "'");
// // tc_virtual_order orders (onboarding card, peace-of-mind card, course)
// if (virtualOrderResultSet.next()) {
// // Processing logic
// Integer virtualOrder4OrderType = virtualOrderResultSet.getInt("order_type");
// String virtualOrder4CompanyId = virtualOrderResultSet.getString("company_id");
// String virtualOrder4BrandId = virtualOrderResultSet.getString("brand_id");
// // Skip onboarding-card orders; another job already handles them
// if (virtualOrder4OrderType == 2) {
// return;
// }
// // Map orderType to bizType
// if (virtualOrder4OrderType == 6) {
// bizType = 10;
// } else if (virtualOrder4OrderType == 1) {
// bizType = 11;
// } else if (virtualOrder4OrderType == 5) {
// bizType = 12;
// }
// // Derive contributeUserId from userType
// if (virtualOrder4OrderType == 6 && userType == 92) {
// contributeUserId = virtualOrder4CompanyId;
// } else if (virtualOrder4OrderType == 1 && userType == 92) {
// contributeUserId = virtualOrder4CompanyId;
// } else if (virtualOrder4OrderType == 5 && userType == 92) {
// contributeUserId = virtualOrder4CompanyId;
// }
// // Derive relationBrandOwnerId
// if (virtualOrder4OrderType == 6 && userType == 90) {
// relationBrandOwnerId = virtualOrder4BrandId;
// } else if (virtualOrder4OrderType == 1 && userType == 90) {
// relationBrandOwnerId = virtualOrder4BrandId;
// } else if (virtualOrder4OrderType == 5 && userType == 90) {
// relationBrandOwnerId = virtualOrder4BrandId;
// }
// // Derive remark
// if (virtualOrder4OrderType == 1 || virtualOrder4OrderType == 5) {
// remark = title;
// }
// } else {
// // Not a tc_virtual_order row, so it may be other data; keep only Haodaojia physical-goods orders
// if (StringUtils.isBlank(payOrderNo)) {
// return;
// }
// ResultSet acctPayOrderResultSet = cloudStatement.executeQuery("select * from acct_pay_order t where t.id = '" + payOrderNo + "'");
// if (!acctPayOrderResultSet.next()) {
// return;
// }
// Integer payCate = acctPayOrderResultSet.getInt("pay_cate");
// if (200100 != payCate) { // Haodaojia physical-goods category
// return;
// }
//
// bizType = 20;
// if (userType == 92 && StringUtils.isNotBlank(bizCompanyId)) {
// contributeUserId = bizCompanyId;
// } else if (userType == 90 && StringUtils.isNotBlank(bizCompanyId)) {
// ResultSet brandOwnerIdResultSet = cloudStatement.executeQuery("select * from uc_brand_partner t where t.company_id = '" + bizCompanyId + "'");
// if (brandOwnerIdResultSet.next()) {
// relationBrandOwnerId = brandOwnerIdResultSet.getString("brand_owner_id");
// }
// }
// }
// if (StringUtils.isBlank(remark)) {
// remark = title;
// }
// Write the row to MySQL
String insertSql = "INSERT INTO dws_profit_record_hdj_flink_api (id, show_profit_id, order_no, from_user_id, from_user_type, user_id,\n" +
" user_type, amount, profit_time, state, acct_circle, biz_type,\n" +
" contribute_user_id, relation_brand_owner_id, remark, add_time)\n" +
"VALUES ('" + id + "', '" + "JSD" + id + "', '" + outNo + "', '" + fromUserId + "', " + fromUserType + ", '" + userId + "', " + userType + ",\n" +
" " + amount + ", '" + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss", TimeZone.getTimeZone("GMT")) + "', " + profitState + ", '" + acctCircle + "', " + 1 + ", " + (StringUtils.isBlank("123") ? null : "'" + "contributeUserId" + "'") + ", " + (StringUtils.isBlank("relationBrandOwnerId") ? null : "'" + "relationBrandOwnerId" + "'") + ", '" + remark + "',\n" +
" '" + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss", TimeZone.getTimeZone("GMT")) + "');";
cloudStatement.execute("delete from dws_profit_record_hdj_flink_api where id = '" + id + "'");
System.out.println("insertSql--->"+insertSql);
cloudStatement.execute(insertSql);
}
@Override
public void close() throws Exception {
super.close();
// Close the JDBC connections here
coalitiondbStatement.close();
coalitiondbConnection.close();
cloudStatement.close();
cloudConnection.close();
}
}
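/**
* Cleans the stream: keeps snapshot reads, inserts, and updates, and emits only the row's "after" image.
*
* @param source stream of Debezium JSON change records
* @return stream of "after" rows as JSON strings
*/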
private static SingleOutputStreamOperator<String> clean(SingleOutputStreamOperator<String> source) {
return source.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String row, Collector<String> out) throws Exception {
try {
LOG.info("============================row:{}", row);
JSONObject rowJson = JSON.parseObject(row);
String op = rowJson.getString("op");
// r = snapshot read (history), c = insert, u = update
if (Arrays.asList("r", "c", "u").contains(op)) {
out.collect(rowJson.getJSONObject("after").toJSONString());
} else {
LOG.info("filter other op:{}", op);
}
} catch (Exception ex) {
LOG.warn("filter other format binlog:{}", row);
}
}
});
}
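/**
* Keeps only the change records that belong to the given table.
*
* @param source stream of Debezium JSON change records
* @param table table name to keep (without the database prefix)
* @return the filtered stream
*/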
private static SingleOutputStreamOperator<String> filterTableData(DataStreamSource<String> source, String table) {
return source.filter(new FilterFunction<String>() {
@Override
public boolean filter(String row) throws Exception {
try {
JSONObject rowJson = JSON.parseObject(row);
JSONObject source = rowJson.getJSONObject("source");
String tbl = source.getString("table");
return table.equals(tbl);
} catch (Exception ex) {
ex.printStackTrace();
return false;
}
}
});
}
private static List<String> getTableList() {
List<String> tables = new ArrayList<>();
String sql = "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE TABLE_SCHEMA = '" + SYNC_DB + "'";
List<JSONObject> tableList = JdbcUtil.executeQuery(MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWD, sql);
for (JSONObject jsob : tableList) {
String schemaName = jsob.getString("TABLE_SCHEMA");
String tblName = jsob.getString("TABLE_NAME");
String schemaTbl = schemaName + "." + tblName;
if (SYNC_TABLES.contains(schemaTbl)) {
tables.add(tblName);
}
}
return tables;
}
}
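The sink above builds its SQL by string concatenation and emulates an upsert with a delete followed by an insert. A possible alternative, sketched here in plain Java, is to generate a parameterized statement per Debezium op code and bind the values through a `PreparedStatement`. The class and method names are hypothetical, the `id` key column is taken from the tables in this article, and the sketch only builds the SQL text; it does not connect to a database.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class SinkSqlSketch {
    /** Builds "INSERT ... ON DUPLICATE KEY UPDATE" with one "?" placeholder per column. */
    public static String upsertSql(String table, List<String> columns) {
        String cols = String.join(", ", columns);
        String marks = String.join(", ", Collections.nCopies(columns.size(), "?"));
        StringBuilder updates = new StringBuilder();
        for (String c : columns) {
            if (updates.length() > 0) {
                updates.append(", ");
            }
            updates.append(c).append(" = VALUES(").append(c).append(")");
        }
        return "INSERT INTO " + table + " (" + cols + ") VALUES (" + marks + ")"
                + " ON DUPLICATE KEY UPDATE " + updates;
    }

    /** Picks the statement for a Debezium op code: r/c/u become an upsert, d a delete by id. */
    public static String sqlFor(String op, String table, List<String> columns) {
        switch (op) {
            case "r":
            case "c":
            case "u":
                return upsertSql(table, columns);
            case "d":
                return "DELETE FROM " + table + " WHERE id = ?";
            default:
                throw new IllegalArgumentException("unsupported op: " + op);
        }
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("id", "amount");
        System.out.println(sqlFor("c", "dws_profit_record_hdj_flink_api", columns));
        System.out.println(sqlFor("d", "dws_profit_record_hdj_flink_api", columns));
    }
}
```

Binding values with placeholders avoids quoting problems when a field such as `remark` contains a single quote, and a single upsert statement keeps the write idempotent if a record is replayed after a checkpoint restore.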
JdbcUtil.java
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
public class JdbcUtil {
static {
try {
// Class.forName("com.mysql.cj.jdbc.Driver");
Class.forName("com.mysql.jdbc.Driver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
private static final Logger LOG = LoggerFactory.getLogger(JdbcUtil.class);
public static void main(String[] args) throws SQLException {
}
public static List<JSONObject> executeQuery(String hostUrl, int port, String user, String password, String sql) {
List<JSONObject> beJson = new ArrayList<>();
String connectionUrl = String.format("jdbc:mysql://%s:%s/league_test?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai", hostUrl, port);
Connection con = null;
try {
con = DriverManager.getConnection(connectionUrl, user, password);
PreparedStatement ps = con.prepareStatement(sql);
ResultSet rs = ps.executeQuery();
beJson = resultSetToJson(rs);
} catch (SQLException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
con.close();
} catch (Exception e) {
}
}
return beJson;
}
private static List<JSONObject> resultSetToJson(ResultSet rs) throws SQLException {
List<JSONObject> list = new ArrayList<>();
ResultSetMetaData metaData = rs.getMetaData();
int columnCount = metaData.getColumnCount();
while (rs.next()) {
JSONObject jsonObj = new JSONObject();
for (int i = 1; i <= columnCount; i++) {
String columnName = metaData.getColumnLabel(i);
String value = rs.getString(columnName);
jsonObj.put(columnName, value);
}
list.add(jsonObj);
}
return list;
}
}
pom.xml:
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.4.0</version>
</dependency>
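2.1 Enabling the binlog in MySQL
A log_bin value of ON means the binlog is enabled and OFF means it is disabled. MySQL 8.0 enables it by default:
# Check whether the binlog is enabled
mysql> SHOW VARIABLES LIKE '%log_bin%';
In the output:
- log_bin: ON means MySQL is writing the binlog
- log_bin_basename: the path and file-name prefix of the binlog files
- log_bin_index: the path of the binlog index file
If it is disabled, enable it as follows:
# The MySQL config file is often under /etc/mysql or at /etc/my.cnf, depending on the distribution; if unsure, locate it with: find / -name "my.cnf"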
vi /etc/mysql/my.cnf
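# Server ID
server-id=1
# Binlog: setting a log_bin path is enough to enable it
log_bin = /var/lib/mysql/mysql_bin
# Days to keep binlog files; 0 means keep them forever
# If the database is archived regularly, set a retention period instead of keeping binlogs forever; in theory only the logs written since the last archive are needed
expire_logs_days = 30
# Maximum size of a single binlog file
max_binlog_size = 1024M
# Binlog format: STATEMENT, ROW, or MIXED. The default depends on the MySQL version (older versions default to STATEMENT); use ROW, which Flink CDC relies on
binlog_format = ROW
# Flush the binlog to disk after every n transaction commits; 0 leaves flushing to the file system. Use 1 for online transactions and accounting data; 0 is acceptable for other data
sync_binlog = 1
# Restart the MySQL service to apply the configuration (use whichever command matches your system)
systemctl restart mysqld
service mysql restart
# List the binlog files
SHOW MASTER LOGS;
See also: MySQL 开启配置binlog以及通过binlog恢复数据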
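Before starting a Flink CDC job it can help to verify these settings programmatically. The sketch below is plain Java and checks only `log_bin` and `binlog_format`, the two variables discussed above. The map stands in for the result of `SHOW VARIABLES`; loading it over JDBC is left out, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BinlogPreflight {
    /** Returns human-readable problems; an empty list means both checked settings look right. */
    public static List<String> check(Map<String, String> vars) {
        List<String> problems = new ArrayList<>();
        if (!"ON".equalsIgnoreCase(vars.get("log_bin"))) {
            problems.add("log_bin is not ON");
        }
        if (!"ROW".equalsIgnoreCase(vars.get("binlog_format"))) {
            problems.add("binlog_format is not ROW");
        }
        return problems;
    }

    public static void main(String[] args) {
        // Stand-in for the rows returned by SHOW VARIABLES on a misconfigured server.
        Map<String, String> vars = new HashMap<>();
        vars.put("log_bin", "ON");
        vars.put("binlog_format", "STATEMENT");
        System.out.println(check(vars)); // prints [binlog_format is not ROW]
    }
}
```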
2.2 Creating the databases and tables in MySQL
CREATE DATABASE IF NOT EXISTS cloud_test;
CREATE DATABASE IF NOT EXISTS league_test;
CREATE TABLE league_test.oc_settle_profit (
id varchar(32),
show_profit_id varchar(32),
order_no varchar(32),
from_user_id varchar(32),
from_user_type int(11),
user_id varchar(32),
user_type int(11),
rate int(11),
amount int(11),
type int(11),
add_time datetime,
state int(11),
expect_profit_time datetime,
profit_time datetime,
profit_mode int(11),
opt_code varchar(32),
opt_name varchar(32),
acct_circle varchar(32),
process_state int(11),
parent_id varchar(32),
keep_account_from_user_id varchar(32),
keep_account_from_bm_user_id varchar(32),
keep_account_user_id varchar(32),
keep_account_bm_user_id varchar(32),
biz_type int(11),
remark varchar(32),
contribute_user_id varchar(32),
relation_brand_owner_id varchar(32),
PRIMARY KEY (id) USING BTREE
);
CREATE TABLE cloud_test.dws_profit_record_hdj_flink_api (
id varchar(32),
show_profit_id varchar(32),
order_no varchar(32),
from_user_id varchar(32),
from_user_type int(11),
user_id varchar(32),
user_type int(11),
amount int(11),
profit_time datetime,
state int(11),
acct_circle varchar(32),
biz_type int(11),
contribute_user_id varchar(32),
relation_brand_owner_id varchar(32),
remark varchar(32),
add_time datetime,
PRIMARY KEY (id) USING BTREE
);
2.3 Pitfalls
Connecting to MySQL over JDBC failed with: The MySQL server has a timezone offset (0 seconds ahead of UTC)
Cause: as the message says, this is a time zone mismatch. Check the server's time zone:
show variables like '%time_zone%';
Variable_name |Value |
----------------+------+
time_zone |SYSTEM|
-- or use this statement instead
SELECT @@global.time_zone;
Fix: log in to MySQL as root and run set global time_zone='+8:00'.
Note: I first tried SET GLOBAL time_zone = 'Asia/Shanghai', but that did not work.
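The error boils down to comparing the server's UTC offset with the offset of the time zone the client assumes. The plain-Java sketch below reproduces that comparison with `java.time`; the class and method names are hypothetical and the fixed instant is only there to make the result reproducible. A likely reason the named zone was rejected is that MySQL accepts names such as 'Asia/Shanghai' only when its time zone tables have been loaded, whereas a numeric offset always works; this is an assumption about the author's server, not something the article confirms. The `MySqlSource` builder also offers a `serverTimeZone` option, which may be an alternative to changing the server setting.

```java
import java.time.Instant;
import java.time.ZoneId;

class TimeZoneOffsetCheck {
    /** Offset from UTC, in seconds, that the given zone has at the given instant. */
    public static int offsetSeconds(String zoneId, Instant at) {
        return ZoneId.of(zoneId).getRules().getOffset(at).getTotalSeconds();
    }

    /** True when the server's offset equals the offset of the zone the client assumes. */
    public static boolean matches(int serverOffsetSeconds, String clientZone, Instant at) {
        return serverOffsetSeconds == offsetSeconds(clientZone, at);
    }

    public static void main(String[] args) {
        Instant at = Instant.parse("2024-01-01T00:00:00Z");
        // Server at UTC (0 seconds ahead), client assuming Asia/Shanghai: mismatch.
        System.out.println(matches(0, "Asia/Shanghai", at));      // prints false
        // After the server is moved to +08:00 (28800 seconds), the offsets agree.
        System.out.println(matches(28800, "Asia/Shanghai", at));  // prints true
        System.out.println(offsetSeconds("+08:00", at));          // prints 28800
    }
}
```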
2.4 Testing
After starting the program in IDEA, inserting a row into oc_settle_profit causes the corresponding row to be inserted into dws_profit_record_hdj_flink_api as well.
References:
【博学谷学习记录】超强总结,用心分享|大数据之flinkCDC
一次打通FlinkCDC同步Mysql数据
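3. Aside
Flink CDC can monitor MySQL but not StarRocks. I asked the official maintainers: StarRocks currently has no binlog-like mechanism that lets external systems observe its DDL operations the way MySQL's binlog does, so for now Flink CDC cannot monitor StarRocks.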