Flink 实现超速监控:从 Kafka 读取卡口数据写入 MySQL
目录
1. 背景
2. 实现需求
2.1 数据格式
2.2 超速判断规则
3. 实现步骤
3.1 创建 Kafka Topic
3.2 准备数据发送工具
3.3 Flink 实现代码
4. 代码说明
5. 项目运行验证
6. 总结
1. 背景
在智慧交通项目中,监控车辆是否超速是一个常见的需求。通过 Flink 处理流数据,可以实时监控车辆通过卡口时的速度,并将超速车辆信息写入数据库供后续分析。
本文将展示如何从 Kafka 的 topic-car
中读取车辆卡口数据,筛选出超速车辆,并将其信息写入 MySQL 数据库。
2. 实现需求
2.1 数据格式
- Kafka 数据格式(JSON 示例):
{"action_time": 1682219447, "monitor_id": "0001", "camera_id": "1", "car": "豫A12345", "speed": 34.5, "road_id": "01", "area_id": "20"}
- MySQL 表结构:
CREATE TABLE t_speeding_info (
id INT AUTO_INCREMENT PRIMARY KEY,
car VARCHAR(255) NOT NULL,
monitor_id VARCHAR(255),
road_id VARCHAR(255),
real_speed DOUBLE,
limit_speed INT,
action_time BIGINT
);
DROP TABLE IF EXISTS `t_monitor_info`;
CREATE TABLE `t_monitor_info` (
`monitor_id` varchar(255) NOT NULL,
`road_id` varchar(255) NOT NULL,
`speed_limit` int(11) DEFAULT NULL,
`area_id` varchar(255) DEFAULT NULL,
PRIMARY KEY (`monitor_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
--导入数据
INSERT INTO `t_monitor_info` VALUES ('0000', '02', 60, '01');
INSERT INTO `t_monitor_info` VALUES ('0001', '02', 60, '02');
INSERT INTO `t_monitor_info` VALUES ('0002', '03', 80, '01');
INSERT INTO `t_monitor_info` VALUES ('0004', '05', 100, '03');
INSERT INTO `t_monitor_info` VALUES ('0005', '04', 0, NULL);
INSERT INTO `t_monitor_info` VALUES ('0021', '04', 0, NULL);
INSERT INTO `t_monitor_info` VALUES ('0023', '05', 0, NULL);
2.2 超速判断规则
当车辆通过卡口时的 车速(speed)超过限速(limitSpeed)的 1.2 倍,即视为超速,将数据写入 t_speeding_info
表。
3. 实现步骤
3.1 创建 Kafka Topic
在 Kafka 中创建一个名为 topic-car
的主题:
kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic topic-car
3.2 准备数据发送工具
通过 Kafka Producer 向 topic-car
发送数据:
kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic topic-car
示例数据:
{"action_time":1682219447,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":34.5,"road_id":"01","area_id":"20"}
3.3 Flink 实现代码
3.3.1 定义数据模型
package com.bigdata.day05;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CarInfo {
private long actionTime;
private String monitorId;
private String cameraId;
private String car;
private double speed;
private String roadId;
private String areaId;
// 限速字段,用于超速判断
private double limitSpeed;
}
3.3.2 Flink 处理逻辑
package com.bigdata.windows;
import com.bigdata.day05.CarInfo;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.jdbc.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import com.alibaba.fastjson.JSON;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;
public class SpeedingMonitor {
public static void main(String[] args) throws Exception {
// 1. 创建 Flink 执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 配置 Kafka 消费者
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "bigdata01:9092");
kafkaProps.setProperty("group.id", "car-monitor-group");
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
"topic-car",
new SimpleStringSchema(),
kafkaProps
);
// 3. 从 Kafka 中读取数据流
DataStream<String> kafkaStream = env.addSource(kafkaConsumer);
// 4. 数据处理:解析 JSON,判断超速
DataStream<CarInfo> filteredStream = kafkaStream
.map(line -> {
// 将 JSON 数据转换为 CarInfo 对象
CarInfo carInfo = JSON.parseObject(line, CarInfo.class);
carInfo.setLimitSpeed(120); // 设置卡口限速(假定限速为 120)
return carInfo;
})
.filter(carInfo -> carInfo.getSpeed() > carInfo.getLimitSpeed() * 1.2); // 超速判断
// 打印超速车辆信息
filteredStream.print();
// 5. 数据写入 MySQL
filteredStream.addSink(JdbcSink.sink(
"INSERT INTO t_speeding_info (car, monitor_id, road_id, real_speed, limit_speed, action_time) VALUES (?, ?, ?, ?, ?, ?)",
(PreparedStatement ps, CarInfo carInfo) -> {
ps.setString(1, carInfo.getCar());
ps.setString(2, carInfo.getMonitorId());
ps.setString(3, carInfo.getRoadId());
ps.setDouble(4, carInfo.getSpeed());
ps.setInt(5, (int) carInfo.getLimitSpeed());
ps.setLong(6, carInfo.getActionTime());
},
JdbcExecutionOptions.builder()
.withBatchSize(1) // 每次写入一条数据
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUrl("jdbc:mysql://localhost:3306/smart_traffic")
.withUsername("root")
.withPassword("123456")
.build()
));
// 6. 启动任务
env.execute("Speeding Monitor");
}
}
4. 代码说明
-
Kafka 数据流处理:
- 使用
FlinkKafkaConsumer
从 Kafka 读取实时流数据。 - 通过 FastJSON 将 JSON 数据解析为 Java 对象。
- 使用
-
超速判断逻辑:
- 使用
.filter()
对流数据进行过滤,筛选超速车辆。
- 使用
-
MySQL Sink:
- 使用 Flink 的
JdbcSink
将超速数据写入 MySQL 表t_speeding_info
。
- 使用 Flink 的
5. 项目运行验证
5.1 启动 Flink 程序
在 IDEA 中运行 SpeedingMonitor
程序,确保 MySQL 服务正常运行。
5.2 发送测试数据
通过 Kafka Producer 向 topic-car
发送数据:
{"action_time":1682219447,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":145.0,"road_id":"01","area_id":"20"}
5.3 验证 MySQL 数据
查询 MySQL 表 t_speeding_info
:
SELECT * FROM t_speeding_info;
结果示例:
id | car | monitor_id | road_id | real_speed | limit_speed | action_time |
---|---|---|---|---|---|---|
1 | 豫A12345 | 0001 | 01 | 145.0 | 120 | 1682219447 |
6. 总结
通过本文,完整实现了从 Kafka 读取车辆卡口数据,筛选出超速车辆并写入 MySQL 的流程。使用 Flink 和 Kafka 的实时处理能力,可以轻松构建高效的智慧交通系统。
如果本文对你有帮助,请点赞、收藏,并分享给更多人! 😊