二百六十四、Java——Java采集Kafka主题A的JSON数据,解析成一条条数据,然后写入Kafka主题B中
一、目的
由于Hive是单机环境,因此庞大的原始JSON数据在Hive中解析的话就太慢了,必须放在Hive之前解析成一个个字段、一条条CSV数据
二、IDEA创建SpringBoot项目
三、项目中各个文件
3.1 pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.hurys</groupId> <artifactId>hurrys-jw-kafka</artifactId> <version>1.0.0</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <spring-boot.version>2.6.13</spring-boot.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring-boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j2</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>${spring-boot.version}</version> <executions> <execution> <id>repackage</id> <goals> <goal>repackage</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
3.2 application.yml
kafka: servers: 192.168.10.12:9092 server: port: 9830 spring: application: name: jw-kafka kafka: bootstrap-servers: ${kafka.servers} consumer: group-id: jw-kafka key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer auto-offset-reset: earliest producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
3.3 log4j2.xml
<?xml version="1.0" encoding="UTF-8"?> <!--Configuration后面的status,这个用于设置log4j2自身内部的信息输出,可以不设置,当设置成trace时,你会看到log4j2内部各种详细输出--> <!--monitorInterval:Log4j能够自动检测修改配置 文件和重新配置本身,设置间隔秒数--> <Configuration status="OFF" monitorInterval="600"> <!--日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL 如果查看DEBUG级别日志,需要修改<RollingFile name="RollingFileInfo"> <ThresholdFilter level="INFO">和<root level="DEBUG">--> <!--变量配置--> <Properties> <!-- 格式化输出:%date表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度 %msg:日志消息,%n是换行符--> <!-- %logger{36} 表示 Logger 名字最长36个字符 --> <property name="LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %class{36} %M() @%L - %msg%n"/> <!-- 定义日志存储的路径 --> <property name="FILE_PATH" value="/home/hurys-log/jw-kafka"/> <property name="FILE_DAY_PATH" value="/home/hurys-log/jw-kafka/%d{yyyy-MM}/%d{yyyy-MM-dd}"/> </Properties> <Appenders> <!-- 这个输出到控制台的配置--> <Console name="Console" target="SYSTEM_OUT"> <!--控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch) --> <ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY"/> <!--输出日志的格式--> <PatternLayout pattern="${LOG_PATTERN}"/> </Console> <!-- 这个会打印出所有的info及以下级别的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档--> <RollingFile name="RollingFileInfo" fileName="${FILE_PATH}/info.log" filePattern="${FILE_DAY_PATH}/INFO-%d{yyyy-MM-dd}_%i.log.gz"> <!--控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch) --> <ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY"/> <!-- 输出格式 --> <PatternLayout pattern="${LOG_PATTERN}"/> <Policies> <!--interval属性用来指定多久滚动一次,默认是1 hour--> <TimeBasedTriggeringPolicy modulate="true" interval="1"/> <!-- 此处为每个文件大小策略限制,使用它一般会在文件中filePattern采用%i模式 --> <SizeBasedTriggeringPolicy size="100MB"/> </Policies> <!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件开始覆盖--> <DefaultRolloverStrategy max="20"> <!-- 从basePath到日志文件路径%d{yyyy-MM}/%d{yyyy-MM-dd}/INFO-%d{yyyy-MM-dd}_%i.log.gz的maxDepth是3--> <Delete basePath="${FILE_PATH}" maxDepth="3"> <!-- 这里的age必须和filePattern协调, 后者是精确到dd, 这里就要写成xd, xD就不起作用 另外, 数字最好>2, 否则可能造成删除的时候, 最近的文件还处于被占用状态,导致删除不成功!--> <IfLastModified age="30d"/> </Delete> </DefaultRolloverStrategy> </RollingFile> <!-- 这个会打印出所有的warn及以下级别的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档--> <RollingFile name="RollingFileWarn" fileName="${FILE_PATH}/warn.log" filePattern="${FILE_DAY_PATH}/WARN-%d{yyyy-MM-dd}_%i.log.gz"> <!--控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch)--> <ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY"/> <PatternLayout pattern="${LOG_PATTERN}"/> <Policies> <!--interval属性用来指定多久滚动一次,默认是1 hour--> <TimeBasedTriggeringPolicy modulate="true" interval="1"/> <SizeBasedTriggeringPolicy size="100MB"/> </Policies> <!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件开始覆盖--> <DefaultRolloverStrategy max="15"/> </RollingFile> <!-- 这个会打印出所有的error及以下级别的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档--> <RollingFile name="RollingFileError" fileName="${FILE_PATH}/error.log" filePattern="${FILE_DAY_PATH}/ERROR-%d{yyyy-MM-dd}_%i.log.gz"> <!--控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch)--> <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/> <PatternLayout pattern="${LOG_PATTERN}"/> <Policies> <!--interval属性用来指定多久滚动一次,默认是1 hour--> <TimeBasedTriggeringPolicy interval="1"/> <SizeBasedTriggeringPolicy size="100MB"/> </Policies> <!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件开始覆盖--> <DefaultRolloverStrategy max="15"/> </RollingFile> </Appenders> <!--Logger节点用来单独指定日志的形式,比如要为指定包下的class指定不同的日志级别等。--> <!--然后定义loggers,只有定义了logger并引入的appender,appender才会生效--> <loggers> <!--过滤掉spring和mybatis的一些无用的DEBUG信息--> <logger name="org.mybatis" level="error" additivity="false"> <AppenderRef ref="Console"/> </logger> <!--监控系统信息--> <!--若是additivity设为false,则 子Logger 只会在自己的appender里输出,而不会在 父Logger 的appender里输出。--> <Logger name="org.springframework" level="error" additivity="false"> <AppenderRef ref="Console"/> </Logger> <root level="INFO"> <appender-ref ref="Console"/> <appender-ref ref="RollingFileInfo"/> <appender-ref ref="RollingFileWarn"/> <appender-ref ref="RollingFileError"/> </root> </loggers> </Configuration>
3.4 KafkaConstants
package com.hurys.kafka.constant; public interface KafkaConstants { /** * 静态排队数据 */ String TOPIC_INTERNAL_DATA_STATIC_QUEUE = "topic_internal_data_static_queue"; /** * 动态排队数据 */ String TOPIC_INTERNAL_DATA_DYNAMIC_QUEUE = "topic_internal_data_dynamic_queue"; }
3.5 JsonUtil
package com.hurys.kafka.util; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; public class JsonUtil { /** * 将对象转换为 JSON 字符串,不忽略空值字段。 * * @param object 要序列化的对象 * @return 转换后的 JSON 字符串 */ public static String objectToJson(Object object) { return JSON.toJSONString(object, SerializerFeature.WriteMapNullValue); } }
3.6 KafkaApplication
package com.hurys.kafka; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class KafkaApplication { public static void main(String[] args) { SpringApplication.run(KafkaApplication.class, args); } }
3.7 KafkaServiceListener
package com.hurys.kafka.listener; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.hurys.kafka.constant.KafkaConstants; import com.hurys.kafka.util.JsonUtil; import lombok.extern.log4j.Log4j2; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.List; /** * kafka消费service * * @author wangjing * @Date 2024/09/09 */ @Service @Log4j2 public class KafkaServiceListener { @Resource private KafkaTemplate kafkaTemplate; // 1、转向比数据 @KafkaListener(topics = KafkaConstants.TOPIC_INTERNAL_DATA_TURN_RATIO) public void processData(String message) { try { JSONObject jsonObject = JSON.parseObject(message); // System.out.println("原始数据"+JsonUtil.objectToJson(jsonObject)); //获取雷达数据 String device_no = jsonObject.getString("deviceNo"); String source_device_type = jsonObject.getString("sourceDeviceType"); String sn = jsonObject.getString("sn"); String model = jsonObject.getString("model"); String createTime = jsonObject.getString("createTime"); String create_time = createTime.substring(0,19); JSONObject data = jsonObject.getJSONObject("data"); String cycle = data.getString("cycle"); String volume_sum = data.getString("volumeSum"); String speed_avg = data.getString("speedAvg"); String volume_left = data.getString("volumeLeft"); String speed_left = data.getString("speedLeft"); String volume_straight = data.getString("volumeStraight"); String speed_straight = data.getString("speedStraight"); String volume_right = data.getString("volumeRight"); String speed_right = data.getString("speedRight"); String volume_turn = data.getString("volumeTurn"); String speed_turn = data.getString("speedTurn"); String outputLine = (device_no +","+source_device_type+","+sn+","+model+","+create_time+","+cycle+","+volume_sum+","+speed_avg+","+ volume_left+","+speed_left+","+volume_straight+","+speed_straight+","+volume_right+","+speed_right+","+volume_turn+","+speed_turn); // System.out.println("outputLine数据1"+outputLine); kafkaTemplate.send("topic_db_data_turn_ratio", outputLine); } catch (Exception e) { log.error("process turn_ratio error", e); } } // 2、静态排队数据 @KafkaListener(topics = KafkaConstants.TOPIC_INTERNAL_DATA_STATIC_QUEUE) public void processData2(String message) { try { JSONObject jsonObject = JSON.parseObject(message); //获取雷达数据 String device_no = jsonObject.getString("deviceNo"); String source_device_type = jsonObject.getString("sourceDeviceType"); String sn = jsonObject.getString("sn"); String model = jsonObject.getString("model"); String createTime = jsonObject.getString("createTime"); String create_time = createTime.substring(0,19); JSONObject data = jsonObject.getJSONObject("data"); List<JSONObject> queueList = data.getJSONArray("queueList").toJavaList(JSONObject.class); for (JSONObject queueItem:queueList) { String lane_no = queueItem.getString("laneNo"); String lane_type = queueItem.getString("laneType"); String queue_count = queueItem.getString("queueCount"); String queue_len = queueItem.getString("queueLen"); String queue_head = queueItem.getString("queueHead"); String queue_tail = queueItem.getString("queueTail"); String outputLine = ( device_no+","+ source_device_type+","+ sn+","+ model+","+ create_time+","+ lane_no+","+ lane_type+","+ queue_count+","+ queue_len+","+ queue_head+","+queue_tail); System.out.println("outputLine数据2"+outputLine); kafkaTemplate.send("topic_db_data_static_queue", outputLine); } } catch (Exception e) { log.error("process static_queue error", e); } } // 7、区域数据 @KafkaListener(topics = KafkaConstants.TOPIC_INTERNAL_DATA_AREA) public void processData7(String message) { try { JSONObject jsonObject = JSON.parseObject(message); //获取雷达数据 String device_no = jsonObject.getString("deviceNo"); String source_device_type = jsonObject.getString("sourceDeviceType"); String sn = jsonObject.getString("sn"); String model = jsonObject.getString("model"); String createTime = jsonObject.getString("createTime"); String create_time = createTime.substring(0,19); JSONObject data = jsonObject.getJSONObject("data"); List<JSONObject> areaStatusList = data.getJSONArray("areaStatusList").toJavaList(JSONObject.class); for (JSONObject areaStatus:areaStatusList) { String area_no = areaStatus.getString("areaNo"); List<JSONObject> laneStatusList = areaStatus.getJSONArray("laneStatusList").toJavaList(JSONObject.class); for (JSONObject laneItem : laneStatusList) { String lane_no = laneItem.getString("laneNo"); String lane_type = laneItem.getString("laneType"); String target_count = laneItem.getString("targetCount"); String space_occupancy = laneItem.getString("spaceOccupancy"); String pareto = laneItem.getString("pareto"); String speed_avg = laneItem.getString("speedAvg"); String speed_head = laneItem.getString("speedHead"); String speed_tail = laneItem.getString("speedTail"); String pos_head = laneItem.getString("posHead"); String pos_tail = laneItem.getString("posTail"); String average_arrival_time = laneItem.getString("averageArrivalTime"); String head_position = laneItem.getString("headPosition"); String tail_position = laneItem.getString("tailPosition"); String outputLine = (device_no + "," + source_device_type + "," + sn+ "," +model+ "," +create_time+ "," + lane_no+ "," + lane_type+ "," + target_count+ "," + space_occupancy+ "," + pareto+ "," + speed_avg+ "," + speed_head+ "," + speed_tail+ "," + pos_head+ "," + pos_tail+ "," + area_no+ "," + average_arrival_time+ "," + head_position+ "," + tail_position); // System.out.println("outputLine数据7" + outputLine); kafkaTemplate.send("topic_db_data_area", outputLine); } } } catch (Exception e) { log.error("process area error", e); } } // 8、统计数据 @KafkaListener(topics = KafkaConstants.TOPIC_INTERNAL_DATA_STATISTICS) public void processData8(String message) { try { JSONObject jsonObject = JSON.parseObject(message); //获取雷达数据 String device_no = jsonObject.getString("deviceNo"); String source_device_type = jsonObject.getString("sourceDeviceType"); String sn = jsonObject.getString("sn"); String model = jsonObject.getString("model"); String createTime = jsonObject.getString("createTime"); String create_time = createTime.substring(0,19); JSONObject data = jsonObject.getJSONObject("data"); String cycle = data.getString("cycle"); List<JSONObject> sectionList = data.getJSONArray("sectionList").toJavaList(JSONObject.class); for (JSONObject sectionStatus:sectionList) { String section_no = sectionStatus.getString("sectionNo"); List<JSONObject> coilList = sectionStatus.getJSONArray("coilList").toJavaList(JSONObject.class); for (JSONObject coilItem : coilList) { String lane_no = coilItem.getString("laneNo"); String lane_type = coilItem.getString("laneType"); String coil_no = coilItem.getString("coilNo"); String volume_sum = coilItem.getString("volumeSum"); String volume_person = coilItem.getString("volumePerson"); String volume_car_non = coilItem.getString("volumeCarNon"); String volume_car_small = coilItem.getString("volumeCarSmall"); String volume_car_middle = coilItem.getString("volumeCarMiddle"); String volume_car_big = coilItem.getString("volumeCarBig"); String speed_avg = coilItem.getString("speedAvg"); String speed_85 = coilItem.getString("speed85"); String time_occupancy = coilItem.getString("timeOccupancy"); String average_headway = coilItem.getString("averageHeadway"); String average_gap = coilItem.getString("averageGap"); String outputLine = (device_no + "," + source_device_type + "," + sn+ "," +model+ "," +create_time+ "," + cycle+ "," + lane_no+ "," + lane_type+ "," + section_no+ "," + coil_no+ "," + volume_sum+ "," + volume_person+ "," + volume_car_non+ "," + volume_car_small+ "," + volume_car_middle+ "," + volume_car_big+ "," + speed_avg+ "," + speed_85+ "," + time_occupancy+ "," + average_headway+ "," + average_gap); // System.out.println("outputLine数据8" + outputLine); kafkaTemplate.send("topic_db_data_statistics", outputLine); } } } catch (Exception e) { log.error("process statistics error", e); } } // 9、事件资源 @KafkaListener(topics = KafkaConstants.TOPIC_INTERNAL_DATA_EVENT_RESOURCE) public void processData9(String message) { try { JSONObject jsonObject = JSON.parseObject(message); //获取雷达数据 String device_no = jsonObject.getString("deviceNo"); String source_device_type = jsonObject.getString("sourceDeviceType"); String sn = jsonObject.getString("sn"); String model = jsonObject.getString("model"); String createTime = jsonObject.getString("createTime"); String create_time = createTime.substring(0,19); JSONObject data = jsonObject.getJSONObject("data"); String event_id = data.getString("eventId"); // 获取数组中的第一个元素,并转换为文本 JSONArray pictureArray = data.getJSONArray("picture"); String picture = pictureArray.getString(0);// 获取数组中的第一个元素 // 获取数组中的第一个元素,并转换为文本 JSONArray videoArray = data.getJSONArray("video"); String video = videoArray.getString(0);// 获取数组中的第一个元素 String outputLine = (device_no +","+source_device_type+","+sn+","+model+","+create_time+","+event_id+","+picture+","+ video); // System.out.println("outputLine数据9" + outputLine); kafkaTemplate.send("topic_db_data_event_resource", outputLine); } catch (Exception e) { log.error("process event_resource error", e); } } // 10、事件数据 @KafkaListener(topics = KafkaConstants.TOPIC_INTERNAL_DATA_EVENT) public void processData10(String message) { try { JSONObject jsonObject = JSON.parseObject(message); // System.out.println("原始数据"+JsonUtil.objectToJson(jsonObject)); //获取雷达数据 String device_no = jsonObject.getString("deviceNo"); String source_device_type = jsonObject.getString("sourceDeviceType"); String sn = jsonObject.getString("sn"); String model = jsonObject.getString("model"); String createTime = jsonObject.getString("createTime"); String create_time = createTime.substring(0,19); String event_id = jsonObject.getString("eventId"); String event_type = jsonObject.getString("eventType"); String state = jsonObject.getString("state"); switch (event_type) { case "QueueOverrun": // 1、处理QueueOverrun事件 JSONObject data = jsonObject.getJSONObject("data"); String station = data.getString("station"); String flow = data.getString("flow"); List<JSONObject> queueList = data.getJSONArray("queueList").toJavaList(JSONObject.class); for (JSONObject queueItem:queueList) { String lane_no = queueItem.getString("laneNo"); String queue_len = queueItem.getString("queueLen"); String geography_head = queueItem.getString("geographyHead"); String geography_tail = queueItem.getString("geographyTail"); String queue_count = queueItem.getString("queueCount"); String speed_avg = queueItem.getString("speedAvg"); String event_type_detail = null; String area_no = null; String lane_no_original = null; String target_id = null; String target_type = null; String speed = null; String limit_speed = null; String pos_x = null; String pos_y = null; String pos_z = null; String longitude = null; String latitude = null; String altitude = null; String area_num = null; String space_occupancy = null; String congestion_grade = null; String congestion_length = null; String length = null; String width = null; String height = null; String vehicle_type = null; String vehicle_color = null; String plate_type = null; String plate_color = null; String plate_number = null; String outputLine = (device_no+","+ source_device_type+","+sn+","+model+","+ create_time+","+ event_id+","+event_type+","+event_type_detail+","+ state+","+area_no+","+station+","+ flow+","+ lane_no+","+ lane_no_original+","+target_id+","+target_type+","+ queue_len+","+ queue_count+","+ speed+","+ speed_avg+","+limit_speed+","+ pos_x+","+ pos_y+","+ pos_z+","+ geography_head+","+ geography_tail+","+longitude+","+latitude+","+altitude+","+area_num+","+ space_occupancy+","+ congestion_grade+","+ congestion_length+","+length+","+width+","+height+","+ vehicle_type+","+vehicle_color+","+plate_type+","+plate_color+","+ plate_number); // System.out.println("outputLine数据10"+outputLine); kafkaTemplate.send("topic_db_data_event", outputLine); } break; case "Debris": // 12、处理Debris事件 JSONObject data12 = jsonObject.getJSONObject("data"); String event_type_detail12 = null; String area_no12 = data12.getString("areaNo"); String station12 = data12.getString("station"); String flow12 = null; String lane_no12 = null; String lane_no_original12 = null; String target_id12 = null; String target_type12 =null; String queue_len12 = null; String queue_count12 = null; String speed12 = null; String speed_avg12 =null; String limit_speed12 = null; String pos_x12 = data12.getString("posX"); String pos_y12 = data12.getString("posY"); String pos_z12 = data12.getString("posZ"); String geography_head12 = null; String geography_tail12 = null; String longitude12 = data12.getString("longitude"); String latitude12 = data12.getString("latitude"); String altitude12 = data12.getString("altitude"); String area_num12 = null; String space_occupancy12 = null; String congestion_grade12 = null; String congestion_length12 =null; String length12 = data12.getString("length"); String width12 = data12.getString("width"); String height12 = data12.getString("height"); String vehicle_type12 = null; String vehicle_color12 = null; String plate_type12 = null; String plate_color12 = null; String plate_number12 = null; String outputLine12 = (device_no+","+ source_device_type+","+sn+","+model+","+ create_time+","+ event_id+","+event_type+","+event_type_detail12+"," + state+","+area_no12+"," +station12+","+ flow12+","+ lane_no12+","+ lane_no_original12+","+target_id12+","+target_type12+","+ queue_len12+"," + queue_count12+","+ speed12+","+ speed_avg12+"," +limit_speed12+","+ pos_x12+","+ pos_y12+","+ pos_z12+","+ geography_head12+","+ geography_tail12+"," +longitude12+","+latitude12+","+altitude12+","+area_num12+"," + space_occupancy12+","+ congestion_grade12+","+ congestion_length12+","+length12+"," +width12+","+height12+","+ vehicle_type12+","+vehicle_color12+"," +plate_type12+","+plate_color12+","+ plate_number12); // System.out.println("outputLine数据22"+outputLine12); kafkaTemplate.send("topic_db_data_event", outputLine12); break; default: // 默认处理 break; } } catch (Exception e) { log.error("process event error", e); } } }
四、启动KafkaApplication任务,可以打开Kafka主题B的消费窗口进行查看
4.1 启动KafkaApplication任务
4.2 打开Kafka主题B的消费窗口
搞定!!!