当前位置: 首页 > article >正文

使用 Python结合ffmpeg 实现单线程和多线程推流

一、引言

在本文中,我们将详细介绍如何使用 Python 进行视频的推流操作。我们将通过两个不同的实现方式,即单线程推流和多线程推流,来展示如何利用 cv2(OpenCV)和 subprocess 等库将视频帧推送到指定的 RTMP 地址。这两种方式都涉及到从摄像头读取视频帧,以及使用 ffmpeg 命令行工具将视频帧进行编码和推流的过程。

二、单线程推流

以下是单线程推流的代码:

import cv2 as cv
import subprocess as sp


def push_stream():
    # 视频读取对象
    cap = cv.VideoCapture(0) 
    fps = int(cap.get(cv.CAP_PROP_FPS))
    w = int(cap.get(cv.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT))
    ret, frame = cap.read()
    # 推流地址
    rtmpUrl = "rtmp://192.168.3.33:1935/live/"
    # 推流参数
    command = ['ffmpeg',
              '-y',
              '-f', 'rawvideo',
              '-vcodec','rawvideo',
              '-pix_fmt', 'bgr24',
              '-s', "{}x{}".format(w, h),
              '-r', str(fps),
              '-i', '-',
              '-c:v', 'libx264',
              '-pix_fmt', 'yuv420p',
              '-preset', 'ultrafast',
              '-f', 'flv', 
              rtmpUrl]
    # 创建、管理子进程
    pipe = sp.Popen(command, stdin=sp.PIPE, bufsize=10 ** 8)
    # 循环读取
    while cap.isOpened():
        # 读取一帧
        ret, frame = cap.read()
        if frame is None:
            print('read frame err!')
            continue
        # 显示一帧
        cv.imshow("frame", frame)
        # 按键退出
        if cv.waitKey(1) & 0xFF == ord('q'):
            break
        # 读取尺寸、推流
        # img=cv.resize(frame,size)
        pipe.stdin.write(frame) 
    # 关闭窗口
    cv.destroyAllWindows()
    # 停止读取
    cap.release()

在这个单线程的实现中,我们执行以下步骤:

  1. 初始化视频读取对象
    • 使用 cv2.VideoCapture(0) 来打开默认的摄像头设备。
    • 获取摄像头的帧率 fps、宽度 w 和高度 h 等参数。
  2. 设置推流地址和参数
    • 定义 rtmpUrl 作为推流的目标地址。
    • 构造 ffmpeg 的命令列表 command,该列表包含了一系列的参数,如 -y 表示覆盖输出文件、-f rawvideo 表示输入格式为原始视频等。
    • 使用 sp.Popen 创建一个子进程,将 ffmpeg 命令作为子进程运行,并且将其输入管道 stdin 连接到我们的程序。
  3. 循环读取和推流
    • 在一个 while 循环中,不断读取摄像头的帧。
    • 若读取失败,打印错误信息并继续。
    • 使用 cv2.imshow 显示当前帧,同时监听 q 键,按下 q 键时退出程序。
    • 将读取到的帧通过管道发送给 ffmpeg 进行推流。

三、多线程推流

以下是多线程推流的代码:

import queue
import threading
import cv2 as cv
import subprocess as sp


class Live(object):
    def __init__(self):
        self.frame_queue = queue.Queue()
        self.command = ""
        # 自行设置
        self.rtmpUrl = ""
        self.camera_path = ""

    def read_frame(self):
        print("开启推流")
        cap = cv.VideoCapture(self.camera_path)

        # Get video information
        fps = int(cap.get(cv.CAP_PROP_FPS))
        width = int(cap.get(cv.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT))

        # ffmpeg command
        self.command = ['ffmpeg',
                       '-y',
                       '-f', 'rawvideo',
                       '-vcodec','rawvideo',
                       '-pix_fmt', 'bgr24',
                       '-s', "{}x{}".format(width, height),
                       '-r', str(fps),
                       '-i', '-',
                       '-c:v', 'libx264',
                       '-pix_fmt', 'yuv420p',
                       '-preset', 'ultrafast',
                       '-f', 'flv', 
                       self.rtmpUrl]

        # read webcamera
        while(cap.isOpened()):
            ret, frame = cap.read()
            if not ret:
                print("Opening camera is failed")
                break

            # put frame into queue
            self.frame_queue.put(frame)

    def push_frame(self):
        # 防止多线程时 command 未被设置
        while True:
            if len(self.command) > 0:
                # 管道配置
                p = sp.Popen(self.command, stdin=sp.PIPE)
                break

        while True:
            if self.frame_queue.empty()!= True:
                frame = self.frame_queue.get()
                # process frame
                # 你处理图片的代码
                # write to pipe
                p.stdin.write(frame.tostring())

    def run(self):
        threads = [
            threading.Thread(target=Live.read_frame, args=(self,)),
            threading.Thread(target=Live.push_frame, args=(self,))
        ]
        [thread.setDaemon(True) for thread in threads]
        [thread.start() for thread in threads]

在这个多线程的实现中,我们使用了 threadingqueue 库:

  1. 类的初始化
    • 创建一个 Live 类,在 __init__ 方法中初始化帧队列 frame_queuecommandrtmpUrlcamera_path 等变量。
  2. 读取帧的线程方法
    • read_frame 方法中,使用 cv2.VideoCapture(self.camera_path) 打开摄像头。
    • 获取摄像头的参数,并构造 ffmpeg 命令。
    • 不断从摄像头读取帧,并将帧放入队列 frame_queue 中。
  3. 推流的线程方法
    • push_frame 方法中,等待 command 被设置,然后使用 sp.Popen 启动 ffmpeg 子进程。
    • 从帧队列中取出帧,并将其写入 ffmpeg 的输入管道进行推流。
  4. 启动线程
    • run 方法创建并启动两个线程,一个用于读取帧,一个用于推流,并且将它们设置为守护线程。

四、代码解释和注意事项

单线程推流

  • 这种方式相对简单,适合初学者理解。但由于是单线程操作,在处理复杂任务时可能会导致性能瓶颈,特别是在同时进行视频显示、读取和推流的情况下,可能会出现卡顿现象。

多线程推流

  • 利用多线程可以将不同的任务分配给不同的线程,提高性能。
  • frame_queue 是一个线程安全的队列,用于在两个线程之间传递帧数据,避免了数据竞争问题。
  • setDaemon(True) 使得线程在主线程结束时自动终止,防止程序无法正常退出。

五、总结

通过上述代码和解释,我们可以看到如何使用 Python 进行单线程和多线程的视频推流操作。单线程代码简单明了,但性能可能受限;多线程代码可以更好地处理高负载,但也需要注意线程安全和资源管理等问题。在实际应用中,我们可以根据具体的需求和硬件性能来选择合适的推流方式。同时,我们可以进一步优化代码,例如添加异常处理、优化帧处理逻辑等,以提高程序的稳定性和性能。


http://www.kler.cn/a/464389.html

相关文章:

  • 【RTD MCAL 篇3】 K312 MCU时钟系统配置
  • springboot3 redis 批量删除特定的 key 或带有特定前缀的 key
  • Deepseek v3 的笔记
  • C++编程库与框架实战——ZeroMQ消息队列
  • Redis(二)value 的五种常见数据类型简述
  • 探索 AIGC 的基础知识:人工智能生成内容的全景视图
  • 婚庆摄影小程序ssm+论文源码调试讲解
  • UE5.3 虚幻引擎 Windows插件开发打包(带源码插件打包、无源码插件打包)
  • 神经网络入门实战:(二十三)使用本地数据集进行训练和验证
  • Qt使用CMake编译项目时报错:#undefined reference to `vtable for MainView‘
  • 网络安全 | 量子计算与网络安全:未来的威胁与机遇
  • 量子计算:定义、使用方法和示例
  • UE4.27 Android环境下获取手机电量
  • Tushare提示本接口即将停止更新,请尽快使用Pro版接口:https://tushare.pro/document/2
  • 从0入门自主空中机器人-4-【PX4与Gazebo入门】
  • 【大模型系列 02】LLM大模型基础知识
  • 第12关:博客系统之删除评论
  • 汇编语言:从键盘输入数字字符,(计算阶乘),以无符号十进制形式输出(分支、循环程序)
  • Wend看源码-Java.util 工具类学习(上)
  • CertiK《Hack3d:2024年度安全报告》(附报告全文链接)
  • 【Java 学习】Comparable接口 和 Comparator接口,掌控排序逻辑解析,深入 Comparable 和 Comparator 的优雅切换
  • linux进阶
  • Kafka优势剖析-分布式架构
  • 迅为RK3568开发板编译Android12源码包-设置屏幕配置
  • [人工智能] 结合最新技术:Transformer、CLIP与边缘计算在提高人脸识别准确率中的应用
  • halcon中的BLOB与灰度直方图的分析与理解