当前位置: 首页 > article >正文

Flink 实现超速监控:从 Kafka 读取卡口数据写入 MySQL

目录

1. 背景

2. 实现需求

2.1 数据格式

2.2 超速判断规则

3. 实现步骤

3.1 创建 Kafka Topic

3.2 准备数据发送工具

3.3 Flink 实现代码

4. 代码说明

5. 项目运行验证

6. 总结


1. 背景

在智慧交通项目中,监控车辆是否超速是一个常见的需求。通过 Flink 处理流数据,可以实时监控车辆通过卡口时的速度,并将超速车辆信息写入数据库供后续分析。

本文将展示如何从 Kafka 的 topic-car 中读取车辆卡口数据,筛选出超速车辆,并将其信息写入 MySQL 数据库。


2. 实现需求

2.1 数据格式

  • Kafka 数据格式(JSON 示例):
{"action_time": 1682219447, "monitor_id": "0001", "camera_id": "1", "car": "豫A12345", "speed": 34.5, "road_id": "01", "area_id": "20"}
  • MySQL 表结构:
CREATE TABLE t_speeding_info (
    id INT AUTO_INCREMENT PRIMARY KEY,
    car VARCHAR(255) NOT NULL,
    monitor_id VARCHAR(255),
    road_id VARCHAR(255),
    real_speed DOUBLE,
    limit_speed INT,
    action_time BIGINT
);
DROP TABLE IF EXISTS `t_monitor_info`;
CREATE TABLE `t_monitor_info` (
  `monitor_id` varchar(255) NOT NULL,  
  `road_id` varchar(255) NOT NULL,
  `speed_limit` int(11) DEFAULT NULL,
  `area_id` varchar(255) DEFAULT NULL,
   PRIMARY KEY (`monitor_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
--导入数据
INSERT INTO `t_monitor_info` VALUES ('0000', '02', 60, '01');
INSERT INTO `t_monitor_info` VALUES ('0001', '02', 60, '02');
INSERT INTO `t_monitor_info` VALUES ('0002', '03', 80, '01');
INSERT INTO `t_monitor_info` VALUES ('0004', '05', 100, '03');
INSERT INTO `t_monitor_info` VALUES ('0005', '04', 0, NULL);
INSERT INTO `t_monitor_info` VALUES ('0021', '04', 0, NULL);
INSERT INTO `t_monitor_info` VALUES ('0023', '05', 0, NULL);

2.2 超速判断规则

当车辆通过卡口时的 车速(speed)超过限速(limitSpeed)的 1.2 倍,即视为超速,将数据写入 t_speeding_info 表。


3. 实现步骤

3.1 创建 Kafka Topic

在 Kafka 中创建一个名为 topic-car 的主题:

kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic topic-car

3.2 准备数据发送工具

通过 Kafka Producer 向 topic-car 发送数据:

kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic topic-car

示例数据:

{"action_time":1682219447,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":34.5,"road_id":"01","area_id":"20"}

3.3 Flink 实现代码

3.3.1 定义数据模型
package com.bigdata.day05;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class CarInfo {
    private long actionTime;
    private String monitorId;
    private String cameraId;
    private String car;
    private double speed;
    private String roadId;
    private String areaId;

    // 限速字段,用于超速判断
    private double limitSpeed;
}
3.3.2 Flink 处理逻辑
package com.bigdata.windows;

import com.bigdata.day05.CarInfo;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.jdbc.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import com.alibaba.fastjson.JSON;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;

public class SpeedingMonitor {
    public static void main(String[] args) throws Exception {
        // 1. 创建 Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 配置 Kafka 消费者
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "bigdata01:9092");
        kafkaProps.setProperty("group.id", "car-monitor-group");
        
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "topic-car",
                new SimpleStringSchema(),
                kafkaProps
        );

        // 3. 从 Kafka 中读取数据流
        DataStream<String> kafkaStream = env.addSource(kafkaConsumer);

        // 4. 数据处理:解析 JSON,判断超速
        DataStream<CarInfo> filteredStream = kafkaStream
                .map(line -> {
                    // 将 JSON 数据转换为 CarInfo 对象
                    CarInfo carInfo = JSON.parseObject(line, CarInfo.class);
                    carInfo.setLimitSpeed(120); // 设置卡口限速(假定限速为 120)
                    return carInfo;
                })
                .filter(carInfo -> carInfo.getSpeed() > carInfo.getLimitSpeed() * 1.2); // 超速判断

        // 打印超速车辆信息
        filteredStream.print();

        // 5. 数据写入 MySQL
        filteredStream.addSink(JdbcSink.sink(
                "INSERT INTO t_speeding_info (car, monitor_id, road_id, real_speed, limit_speed, action_time) VALUES (?, ?, ?, ?, ?, ?)",
                (PreparedStatement ps, CarInfo carInfo) -> {
                    ps.setString(1, carInfo.getCar());
                    ps.setString(2, carInfo.getMonitorId());
                    ps.setString(3, carInfo.getRoadId());
                    ps.setDouble(4, carInfo.getSpeed());
                    ps.setInt(5, (int) carInfo.getLimitSpeed());
                    ps.setLong(6, carInfo.getActionTime());
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1) // 每次写入一条数据
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUrl("jdbc:mysql://localhost:3306/smart_traffic")
                        .withUsername("root")
                        .withPassword("123456")
                        .build()
        ));

        // 6. 启动任务
        env.execute("Speeding Monitor");
    }
}

4. 代码说明

  1. Kafka 数据流处理

    • 使用 FlinkKafkaConsumer 从 Kafka 读取实时流数据。
    • 通过 FastJSON 将 JSON 数据解析为 Java 对象。
  2. 超速判断逻辑

    • 使用 .filter() 对流数据进行过滤,筛选超速车辆。
  3. MySQL Sink

    • 使用 Flink 的 JdbcSink 将超速数据写入 MySQL 表 t_speeding_info

5. 项目运行验证

5.1 启动 Flink 程序

在 IDEA 中运行 SpeedingMonitor 程序,确保 MySQL 服务正常运行。

5.2 发送测试数据

通过 Kafka Producer 向 topic-car 发送数据:

{"action_time":1682219447,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":145.0,"road_id":"01","area_id":"20"}

5.3 验证 MySQL 数据

查询 MySQL 表 t_speeding_info

SELECT * FROM t_speeding_info;

结果示例:

idcarmonitor_idroad_idreal_speedlimit_speedaction_time
1豫A12345000101145.01201682219447

6. 总结

通过本文,完整实现了从 Kafka 读取车辆卡口数据,筛选出超速车辆并写入 MySQL 的流程。使用 Flink 和 Kafka 的实时处理能力,可以轻松构建高效的智慧交通系统。

如果本文对你有帮助,请点赞、收藏,并分享给更多人! 😊


http://www.kler.cn/a/412350.html

相关文章:

  • 基础入门-Web应用架构类别源码类别镜像容器建站模版编译封装前后端分离
  • 【TQ2440】02 串口连接进入u-boot
  • uniapp+vue2+uview2.0导航栏组件二次封装
  • el-row el-col显示失效问题修复
  • 微信小程序2-地图显示和地图标记
  • 海康面阵、线阵、读码器及3D相机接线说明
  • 浏览器开发工具
  • java——SpringBoot中常用注解及其底层原理
  • SSM之AOP与事务
  • 缓存雪崩、击穿、穿透深度解析与实战应对
  • 使用OpenCV实现视频背景减除与目标检测
  • 【QT】背景,安装和介绍
  • 【云计算网络安全】解析 Amazon 安全服务:构建纵深防御设计最佳实践
  • docker-compose文件的简介及使用
  • Git 使用技巧
  • 鸿蒙开发异步与线程
  • 使用Cmake导入OpenCV库的大坑记录
  • 如何将 GitHub 私有仓库(private)转换为公共仓库(public)
  • 反爬虫机制
  • 【大数据学习 | Spark-SQL】SparkSession对象
  • 从ETL到DataOps:WhaleStudio替代Informatica,实现信创化升级
  • 计算机网络 实验八 应用层相关协议分析
  • 【NOIP普及组】表达式求值
  • 学习threejs,设置envMap环境贴图创建反光效果
  • Qt程序发布及打包成exe安装包
  • 微信小程序首页搜索框的实现教程