物流实时数仓ODS层——Mysql到Kafka
目录
1.采集流程
2.项目架构
3.resources目录下的log4j.properties文件
4.依赖
5.ODS层——OdsApp
6.环境入口类——CreateEnvUtil
7.kafka工具类——KafkaUtil
8.启动集群项目
这一层要从MySQL读取数据,数据分为事实数据和维度数据两类;对不同类型的数据进行相应的ETL处理后,发送到Kafka中。
代码
1.采集流程
2.项目架构
3.resources目录下的log4j.properties文件
# Root logger: level "error", output routed to the appender named "stdout".
log4j.rootLogger=error,stdout
# "stdout" is a console appender that writes to System.out.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
# Line layout: date, level, [logger name], message, newline.
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
4.依赖
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the tms-realtime module (Flink streaming jobs). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu.tms.realtime</groupId>
<artifactId>tms-realtime</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- Shared settings; the *.version properties are referenced by the dependencies below. -->
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<flink.version>1.17.0</flink.version>
<hadoop.version>3.3.4</hadoop.version>
<flink-cdc.version>2.3.0</flink-cdc.version>
</properties>
<dependencies>
<!-- Flink core and DataStream APIs. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Kafka connector (provides KafkaSink / KafkaSource used by KafkaUtil). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- JSON parsing (JSON / JSONObject used in OdsApp). -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<!-- Hadoop client; the slf4j-reload4j binding is excluded, presumably to avoid clashing SLF4J bindings. -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Flink client classes needed to run or submit a job. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Logging stack; "provided" scope keeps these jars out of the packaged artifact. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
<scope>provided</scope>
</dependency>
<!-- Flink CDC MySQL connector (MySqlSource used by CreateEnvUtil). -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flink-cdc.version}</version>
</dependency>
<!-- Flink Table runtime and planner loader, provided scope. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- The dependencies below are not referenced by the ODS classes; presumably used by other layers of the project. -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.4.11</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.1.0-1.17</version>
</dependency>
<!-- ClickHouse JDBC driver with its Jackson dependencies excluded. -->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.2</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Builds a shaded (fat) jar during the package phase. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<!-- Artifacts left out of the shaded jar: jsr305, all SLF4J, log4j and Hadoop artifacts. -->
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
<exclude>org.apache.hadoop:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. -->
<!-- Signature files under META-INF are skipped when packaging to avoid invalid-signature SecurityExceptions. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers combine.children="append">
<!-- The service transformer is needed to merge META-INF/services files -->
<!-- Factory service files of connectors and formats overwrite each other when shaded; ServicesResourceTransformer merges them instead. -->
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
5.ODS层——OdsApp
package com.atguigu.tms.realtime.app.ods;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import com.esotericsoftware.minlog.Log;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
/**
* ODS数据的采集
*/
public class OdsApp {
public static void main(String[] args) throws Exception {
// TODO 1.获取流处理环境并指定检查点
StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
env.setParallelism(4);
// TODO 2.使用FlinkCDC从Mysql中读取数据-事实数据-保存到kafka
String dwdOption = "dwd";
String dwdServerId = "6030";
String dwdSourceName = "ods_app_dwd_source";
mysqlToKafka(dwdOption, dwdServerId, dwdSourceName, env, args);
// TODO 3.使用FlinkCDC从Mysql中读取数据-维度数据-保存到kafka
String realtimeOption = "realtime_dim";
String realtimeServerId = "6040";
String realtimeSourceName = "ods_app_realtimeDim_source";
mysqlToKafka(realtimeOption, realtimeServerId, realtimeSourceName, env, args);
env.execute();
}
public static void mysqlToKafka(String option, String serverId, String sourceName, StreamExecutionEnvironment env, String[] args) {
// TODO 1.使用FlinkCDC从Mysql中读取数据
MySqlSource<String> mysqlSource = CreateEnvUtil.getMysqlSource(option, serverId, args);
SingleOutputStreamOperator<String> strDS = env.fromSource(mysqlSource, WatermarkStrategy.noWatermarks(), sourceName)
.setParallelism(1)// 并行度设置为1的原因是防止乱序
.uid(option + sourceName);
// TODO 2.简单的ETL
SingleOutputStreamOperator<String> processDS = strDS.process(new ProcessFunction<String, String>() {
@Override
public void processElement(String jsonStr, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception {
try {
// 将json字符串转为json对象
JSONObject jsonObj = JSON.parseObject(jsonStr);
// after属性不为空,并且不是删除
if (jsonObj.getJSONObject("after") != null && !"d".equals(jsonObj.getString("op"))) {
// 为了防止歧义,将ts_ms字段改为ts
Long tsMs = jsonObj.getLong("ts_ms");
jsonObj.put("ts", tsMs);
jsonObj.remove("ts_ms");// 移除原来的ts_ms字段
// 符合条件以后,向下传递之前先将json对象转为json字符串
out.collect(jsonObj.toJSONString());
}
} catch (Exception e) {
e.printStackTrace();
Log.error("从Flink-CDC得到的数据不是一个标准的json格式");
}
}
}).setParallelism(1);// 防止乱序
// TODO 3.按照主键进行分许,避免出现乱序,主键就是after下的id字段
KeyedStream<String, String> keyedDS = processDS.keyBy(new KeySelector<String, String>() {
@Override
public String getKey(String jsonStr) throws Exception {
// 获取当前的key
// 流中的字符串转为json对象
JSONObject jsonObj = JSON.parseObject(jsonStr);
return jsonObj.getJSONObject("after").getString("id");
}
});
// TODO 4.将数据写到kafka主题中
keyedDS.sinkTo(KafkaUtil.getKafkaSink("tms_ods", sourceName + "_transPre", args))
.uid(option + "_ods_app_sink");
}
}
6.环境入口类——CreateEnvUtil
package com.atguigu.tms.realtime.utils;
import com.esotericsoftware.minlog.Log;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;
import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.kafka.connect.json.JsonConverterConfig;
import java.util.HashMap;
/**
 * Factory for the Flink execution environment and Flink CDC MySQL sources.
 *
 * <p>Original author's note: Flink CDC can read from MySQL because it presents itself
 * to the server as a replica.
 */
public class CreateEnvUtil {

    /**
     * Creates a stream environment with checkpointing, restart strategy and state backend configured.
     *
     * <p>Recognised command-line key: {@code hadoop-user-name} (default {@code atguigu}),
     * which is written to the {@code HADOOP_USER_NAME} system property.
     *
     * @param args command-line arguments
     * @return the configured environment
     */
    public static StreamExecutionEnvironment getStreamEnv(String[] args) {
        // 1. Basic environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Checkpoint settings.
        // 2.1 Checkpoint every 60 s in exactly-once mode.
        env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
        // 2.2 Checkpoint timeout.
        env.getCheckpointConfig().setCheckpointTimeout(120000L);
        // 2.3 Keep externalized checkpoints when the job is cancelled.
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 2.4 Minimum pause between two checkpoints.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000L);
        // 2.5 Restart strategy: at most 3 failures per day, 3 s delay between restarts.
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.seconds(3)));
        // 2.6 State backend and checkpoint storage.
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/tms/ck");

        // 2.7 User name used for HDFS access, overridable from the command line.
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String hdfsUserName = parameterTool.get("hadoop-user-name", "atguigu");
        System.setProperty("HADOOP_USER_NAME", hdfsUserName);
        return env;
    }

    /**
     * Builds a MySQL CDC source for one of the supported table groups.
     *
     * <p>Recognised command-line keys (each falls back to the shown default):
     * {@code mysql-hostname} (hadoop102), {@code mysql-port} (3306), {@code mysql-username} (root),
     * {@code mysql-passwd} (root), {@code start-up-options} (the {@code option} argument) and
     * {@code server-id} (the {@code serverId} argument).
     *
     * <p>NOTE(review): the {@code start-up-options} and {@code server-id} overrides apply to every
     * call made with the same {@code args}; a job that builds several sources would then give them
     * the same table group and server id. Confirm this is intended before passing them.
     *
     * @param option   table group: {@code "dwd"} (fact tables), {@code "realtime_dim"}
     *                 (dimension tables) or {@code "config_dim"} (dimension config table)
     * @param serverId server id handed to the CDC source
     * @param args     command-line arguments
     * @return the configured source
     * @throws IllegalArgumentException if the effective option is not one of the supported values
     */
    public static MySqlSource<String> getMysqlSource(String option, String serverId, String[] args) {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String mysqlHostname = parameterTool.get("mysql-hostname", "hadoop102");
        int mysqlPort = Integer.parseInt(parameterTool.get("mysql-port", "3306"));
        String mysqlUsername = parameterTool.get("mysql-username", "root");
        String mysqlPasswd = parameterTool.get("mysql-passwd", "root");
        option = parameterTool.get("start-up-options", option);
        // serverId identifies this client to the MySQL server.
        serverId = parameterTool.get("server-id", serverId);
        if (option == null) {
            // switch on a null String would throw a bare NullPointerException.
            throw new IllegalArgumentException("不支持的操作类型!option=null");
        }

        // Converter config: emit DECIMAL columns in numeric form.
        HashMap<String, Object> config = new HashMap<>();
        config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
        // Deserializer that renders each change record as a JSON string without schema.
        JsonDebeziumDeserializationSchema jsonDebeziumDeserializationSchema =
                new JsonDebeziumDeserializationSchema(false, config);

        MySqlSourceBuilder<String> builder = MySqlSource.<String>builder()
                .hostname(mysqlHostname)
                .port(mysqlPort)
                .username(mysqlUsername)
                .password(mysqlPasswd)
                .deserializer(jsonDebeziumDeserializationSchema);

        // The option decides which tables are captured and where reading starts.
        switch (option) {
            // Fact data: only these 4 tables, starting from the latest binlog position.
            case "dwd":
                String[] dwdTables = new String[]{
                        "tms.order_info",
                        "tms.order_cargo",
                        "tms.transport_task",
                        "tms.order_org_bound"};
                return builder
                        .databaseList("tms")
                        .tableList(dwdTables)
                        .startupOptions(StartupOptions.latest())
                        .serverId(serverId)
                        .build();
            // Dimension data: 15 tables, initial snapshot first and then the binlog.
            case "realtime_dim":
                String[] realtimeDimTables = new String[]{
                        "tms.user_info",
                        "tms.user_address",
                        "tms.base_complex",
                        "tms.base_dic",
                        "tms.base_region_info",
                        "tms.base_organ",
                        "tms.express_courier",
                        "tms.express_courier_complex",
                        "tms.employee_info",
                        "tms.line_base_shift",
                        "tms.line_base_info",
                        "tms.truck_driver",
                        "tms.truck_info",
                        "tms.truck_model",
                        "tms.truck_team"};
                return builder
                        .databaseList("tms")
                        .tableList(realtimeDimTables)
                        .startupOptions(StartupOptions.initial())
                        .serverId(serverId)
                        .build();
            // Dimension configuration table.
            case "config_dim":
                return builder
                        .databaseList("tms_config")
                        .tableList("tms_config.tms_config_dim")
                        .startupOptions(StartupOptions.initial())
                        .serverId(serverId)
                        .build();
            default:
                // Fail fast: returning null only postpones the failure to the caller.
                throw new IllegalArgumentException("不支持的操作类型!option=" + option);
        }
    }
}
7.kafka工具类——KafkaUtil
package com.atguigu.tms.realtime.utils;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.io.IOException;
/**
 * Helper for building Kafka connectors.
 */
public class KafkaUtil {
    private static final String KAFKA_SERVER = "hadoop102:9092,hadoop103:9092,hadoop104:9092";

    /**
     * Builds an exactly-once {@code KafkaSink} for string records.
     *
     * <p>Recognised command-line keys:
     * <ul>
     *   <li>{@code topic}: overrides the {@code topic} argument</li>
     *   <li>{@code bootstrap-servers}: broker list; the legacy misspelling
     *       {@code bootstrap-severs} is still honoured as a fallback</li>
     *   <li>{@code transaction-timeout}: producer transaction timeout in ms (default 15 minutes)</li>
     * </ul>
     *
     * @param topic         default topic used when the command line does not supply one
     * @param transIdPrefix transactional id prefix for the sink
     * @param args          command-line arguments
     * @return the configured sink
     * @throws IllegalArgumentException if neither the command line nor {@code topic} provides a topic
     */
    public static KafkaSink<String> getKafkaSink(String topic, String transIdPrefix, String[] args) {
        // args lets deployment-specific values be supplied from the command line.
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        // Command-line "topic" wins; the method argument is the default.
        topic = parameterTool.get("topic", topic);
        if (topic == null) {
            throw new IllegalArgumentException("主题名不可为空:命令行传参为空且没有默认值!");
        }

        // Read the correctly spelled key first; previously only the misspelled
        // "bootstrap-severs" was read, so "bootstrap-servers" was silently ignored.
        String bootstrapServers = parameterTool.get(
                "bootstrap-servers",
                parameterTool.get("bootstrap-severs", KAFKA_SERVER));

        String transactionTimeout = parameterTool.get("transaction-timeout", String.valueOf(15 * 60 * 1000));

        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix(transIdPrefix)
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeout)
                .build();
        return kafkaSink;
    }

    /**
     * Convenience overload that derives the transactional id prefix as {@code topic + "_trans"}.
     *
     * @param topic default topic
     * @param args  command-line arguments
     * @return the configured sink
     */
    public static KafkaSink<String> getKafkaSink(String topic, String[] args) {
        return getKafkaSink(topic, topic + "_trans", args);
    }
}
8.启动集群项目
先启动Kafka消费者(订阅tms_ods主题),然后启动Java项目(运行OdsApp)即可。