当前位置: 首页 > article >正文

Java技术栈 —— Spark入门(三)之实时视频流

Java技术栈 —— Spark入门(三)之实时视频流转灰度图像

  • 一、将摄像头数据发送至kafka
  • 二、Kafka准备topic
  • 三、spark读取kafka图像数据并处理
  • 四、本地显示灰度图像(存在卡顿现象,待优化)

项目整体结构图如下

在这里插入图片描述

参考文章或视频链接
[1] Architecture-for-real-time-video-streaming-analytics

一、将摄像头数据发送至kafka

这个代码将运行在你有摄像头的机器上,缺依赖就装依赖

import cv2
import kafka
import numpy as np

# 设置 Kafka Producer
# 注意修改你的kafka地址
producer = kafka.KafkaProducer(bootstrap_servers='localhost:9092')

# 打开摄像头(0 为默认摄像头)
cap = cv2.VideoCapture(0)

while True:
    # 从摄像头捕获帧
    ret, frame = cap.read()
    if not ret:
        break
    
    # 将图像编码为 JPEG 格式
    _, buffer = cv2.imencode('.jpg', frame)

    # 将图像作为字节数组发送到 Kafka
    producer.send('camera-images', buffer.tobytes())

    # 显示当前捕获的帧
    cv2.imshow('Video', frame)

    # 按 'q' 键退出
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# 释放资源
cap.release()
cv2.destroyAllWindows()
producer.close()

二、Kafka准备topic

在准备topic之前,要先配置kafka中的config/server.properties文件,否则其它机器无法联通kafka,配置好后重启kafka。

# 找到这两个选项并修改成如下内容
listeners=PLAINTEXT://0.0.0.0:9092
# 改成你的kafka所在服务器ip
advertised.listeners=PLAINTEXT://{your_ip}:9092

如果你之前创建过topic,那就清空这些topic中的数据

# 设置保留时间为0,相当于立即清空数据
#bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name {your_topic_name} --add-config retention.ms=0
# 恢复原始保留设置,立即清空数据后,将数据的保留时间恢复至原有状态
#bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name {your_topic_name} --add-config retention.ms=604800000


开始正式创建topic

# 创建输入图片所在topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic camera-images --partitions 1 --replication-factor 1
# 创建输出的gray灰度图片所在topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic result-gray-images --partitions 1 --replication-factor 1

# 准备好后查看下topic list进行验证
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
# 查看某topic中的数据
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic {your_topic_name} --from-beginning

三、spark读取kafka图像数据并处理

首先给你的spark脚本所运行的python环境(这个环境一般可以为conda等虚拟环境),安装必要的依赖库

pip install opencv-python-headless

准备脚本文件

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import BinaryType
import cv2
import numpy as np

bootstrapServers = "localhost:9092"

# 创建 SparkSession
spark = SparkSession.builder \
    .appName("Kafka-Spark-OpenCV") \
    .getOrCreate()

# 初始化 Kafka Producer,用于发送处理后的图像
# 如果不这样做,会出现PicklingError,因为如果UDF中,包含了无法被序列化的对象,例如线程锁(_thread.RLock)或 Kafka 的 KafkaProducer 实例,序列化就会失败。
# 因此,在每个执行器内部,创建 KafkaProducer 实例
producer = None

# 从 Kafka 读取数据流
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "camera-images") \
  .load()

# UDF 用于将图像转换为灰度
def convert_to_gray(image_bytes):
    global producer

    # 创建 KafkaProducer 实例(在每个执行器上只初始化一次)
    if producer is None:
        producer = KafkaProducer(bootstrap_servers = bootstrapServers)

    # 将字节数组转换为 numpy 数组
    nparr = np.frombuffer(image_bytes, np.uint8)
    # 将 numpy 数组解码为图像
    img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    # 将图像转换为灰度
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # 将灰度图像编码为 JPEG
    _, buffer = cv2.imencode('.jpg', gray)
    
    # 将处理后的图像发送到 Kafka 'result-gray-images' 主题
    producer.send('result-gray-images', buffer.tobytes())
    
    return buffer.tobytes()

# 注册 UDF
convert_to_gray_udf = udf(convert_to_gray, BinaryType())

# 应用 UDF 对数据进行灰度化处理
gray_df = df.withColumn("gray_image", convert_to_gray_udf("value"))

# 将处理后的数据写入文件或其他输出
query = gray_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
    
# query = gray_df\
#     .writeStream \
#     .format('kafka') \
#     .outputMode('update') \
#     .option("kafka.bootstrap.servers", bootstrapServers) \
#     .option('checkpointLocation', '/spark/job-checkpoint') \
#     .option("topic", "result-gray-images") \
#     .start()

query.awaitTermination()

spark-submit提交脚本文件:

# 1.提高内存
# 2.调整 Kafka 批次大小,减少单个批次的数据量,从而降低内存使用(这个步骤存疑)
/opt/spark-3.5.2-bin-hadoop3/bin/spark-submit \
--executor-memory 4g \
--driver-memory 4g \
--conf "spark.kafka.maxOffsetsPerTrigger=1000" \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,org.apache.kafka:kafka-clients:3.5.2 \
/opt/spark-3.5.2-bin-hadoop3/jobs/pyjobs/kafka_to_spark.py

四、本地显示灰度图像(存在卡顿现象,待优化)

import cv2
import numpy as np
from kafka import KafkaConsumer

# 设置 Kafka Consumer
consumer = KafkaConsumer(
    'result-gray-images',
    bootstrap_servers='{your_kafka_ip}:9092',
    auto_offset_reset='latest',
    enable_auto_commit=True,
    # group_id='image-display-group'
)

# 从 Kafka 主题读取灰度图像并显示
for message in consumer:
    # print("reading gray image.... ")
    # 将消息转换为 numpy 数组
    nparr = np.frombuffer(message.value, np.uint8)
    # 解码为图像
    gray_img = cv2.imdecode(nparr, cv2.IMREAD_GRAYSCALE)
    # 显示灰度图像
    cv2.imshow('Gray Video', gray_img)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# 释放资源
cv2.destroyAllWindows()
consumer.close()

http://www.kler.cn/news/283782.html

相关文章:

  • Dubbo如何传递链路追踪id?
  • 小琳AI课堂:使用ChatGPT API搭建系统(二)
  • innovus:如何让部分sink长到target insertion delay的长度
  • 关于OBI 在unity URP环境下使用的正确步骤
  • 网络编程(学习)2024.8.27
  • jQuery基础——选择器的补充方法——过滤方法、查找方法
  • python使用multiprocessing多进程通讯
  • 各种各样的正则表达式
  • 92. UE5 RPG 使用C++创建GE实现灼烧的负面效果
  • 达梦数据库-DM8 企业版安装指南
  • [java][代码] java中date格式化输出时间字符串
  • 《征服数据结构》LFU缓存
  • Vatee万腾平台:打造企业智能化转型的坚实后盾
  • 【Android】UIMode
  • fpga图像处理实战-双三次插值算法
  • Jmeter提取token并设置为全局变量
  • 聊聊STM32 MCU的BOOT0和BOOT1引脚
  • 浅谈Vue3和React18
  • 六个方面探讨企业为何迫切需要替换FTP
  • PyQt 迁移到 PySide
  • WPF ToolkitMVVM RelayCommand
  • 探究:Elasticsearch 文档的 _id 是 Lucene 的 docid 吗?
  • DNN学习平台(GoogleNet、SSD、FastRCNN、Yolov3)
  • C# 自动化抢购脚本:基于商品链接的实现方案
  • 【杂谈】新能源和智能车
  • 在docker中安装skywalking + es
  • 一起搭WPF架构之浅写View界面按钮的进阶设计
  • 人工智能领域面试基础问题整理(二):什么是人工智能?
  • OpenCV小练习:人脸检测
  • LVS之net模式实验