当前位置: 首页 > article >正文

使用FFmpeg实现简单的拉流、推流、视频解码Demo

使用FFmpeg实现一个RTSP拉流、RTMP推流及视频解码的Demo可以分为几个主要步骤。首先,利用avformat_open_input函数打开RTSP流并使用avformat_find_stream_info解析流信息。接着,选择视频流并使用avcodec_find_decoder找到合适的解码器,使用avcodec_open2打开解码器。

然后,通过av_read_frame循环读取每一帧数据,判断是否为视频帧;如果是,则依次调用avcodec_send_packet和avcodec_receive_frame进行解码,得到原始的YUV数据。

同时,为了实现RTMP推流,需要通过avformat_alloc_output_context2创建RTMP(FLV)输出上下文,并配置输出流参数。本Demo不对视频重新编码,而是将读取到的视频包重整时间戳后,通过av_interleaved_write_frame直接推送到RTMP服务器。

该Demo展示了如何通过FFmpeg处理多媒体流,实现从RTSP拉流到RTMP推流的功能。Demo中附带了视频解码,但没有编码操作;编码流程与解码流程相反,可参照实现。

首先是头文件--------Streamer.h

#ifndef NEW_STREAMER_H
#define NEW_STREAMER_H

extern "C" {
#include <libavcodec/avcodec.h>
#include <libavfilter/buffersink.h>
#include <libavfilter/buffersrc.h>
#include <libavformat/avformat.h>
#include <libavutil/dict.h>
#include <libavutil/time.h>
}
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <ctime>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>

class Streamer {
public:
    typedef struct {
        int nCallBackNum = 0;
        int nExit = 0;
        int nOpenTimeOut = 0;
        time_t m_tStart;
        bool beFirst = true;

    } avformat_open_input_Runner;

    Streamer();
    ~Streamer();

    bool Start(const std::string& pull_url, const std::string& push_url);

private:

    bool openInput();

    bool openOutPut();

    bool initVideoCodecInfo();

    void decodePacket();

    bool pushVideo();

    bool readFrame();

    void threadFunc();

    void pushPacket(std::unique_ptr<AVPacket> pkt);

    std::unique_ptr<AVPacket> popPacket();

    static int interrupt_callback(void* ctx);

private:
    std::unique_ptr<AVFormatContext> pInputCtx_;
    std::unique_ptr<AVCodecContext> pCodecCtx_;
    std::unique_ptr<AVFormatContext> pOutCtx_;
    std::unique_ptr<AVPacket> pCurPacket_;
    AVCodec* codec_;
    avformat_open_input_Runner m_Runner_;
    bool bRun_;
    bool bRunDecode_;
    bool bRunRead_;
    bool bOpenPush_;
    std::int32_t timebase_num_;
    std::int32_t timebase_den_;
    std::thread streamThread_;
    std::thread decodeThread_;
    std::string strInputUrl_;
    std::string strOutUrl_;
    std::int32_t vIndex_;
    std::mutex mutex_;
    std::queue<std::unique_ptr<AVPacket>> qPacket_;
    std::queue<AVFrame*> qFrame_;
    std::condition_variable cond_;
    std::int64_t last_dts_;
};


#endif

源文件Streamer.cpp

#include <iostream>
#include "Streamer.h"

/**
 * Puts every scalar member into a defined state.
 *
 * The codec pointer, the video stream index and the time base are initialised
 * here as well, so nothing holds an indeterminate value before openInput() and
 * initVideoCodecInfo() have run. The initialiser order follows the declaration
 * order in the class.
 */
Streamer::Streamer()
    : codec_(nullptr),
      bRun_(false),
      bRunDecode_(false),
      bRunRead_(false),
      bOpenPush_(false),
      timebase_num_(0),
      timebase_den_(1),
      vIndex_(-1),  // -1: no video stream selected yet
      last_dts_(0) {
}

/**
 * Stops both worker threads, then releases every FFmpeg object through the
 * matching FFmpeg call.
 */
Streamer::~Streamer() {
    // Ask the workers to stop. bOpenPush_ is deliberately NOT cleared yet:
    // readFrame() would react to a cleared flag by opening the output again.
    bRun_ = false;
    bRunDecode_ = false;
    bRunRead_ = false;
    // interrupt_callback() returns nExit, so a non-zero value makes a blocking
    // FFmpeg I/O call on the input return instead of waiting for data.
    m_Runner_.nExit = 1;

    if (streamThread_.joinable()) {
        streamThread_.join();
    }

    if (decodeThread_.joinable()) {
        // popPacket() waits until the queue is non-empty. A blank packet
        // (data == nullptr) wakes the decode thread so that it re-checks its
        // run flag; without it join() would block forever on an idle queue.
        pushPacket(std::make_unique<AVPacket>());
        decodeThread_.join();
    }

    // Both threads are gone; nothing below needs locking.
    while (!qPacket_.empty()) {
        if (AVPacket* pending = qPacket_.front().get()) {
            av_packet_unref(pending);  // drop the buffer reference the packet holds
        }
        qPacket_.pop();
    }

    // release() hands the raw pointer over so the FFmpeg free function is used
    // for each object.
    if (AVCodecContext* decoder = pCodecCtx_.release()) {
        avcodec_free_context(&decoder);
    }

    if (AVFormatContext* output = pOutCtx_.release()) {
        if (bOpenPush_) {
            // bOpenPush_ is only set after openOutPut() succeeded, i.e. after
            // the header was written, so a trailer is expected.
            av_write_trailer(output);
        }
        if (output->pb != nullptr && output->oformat != nullptr &&
            !(output->oformat->flags & AVFMT_NOFILE)) {
            avio_closep(&output->pb);
        }
        avformat_free_context(output);
    }

    if (AVFormatContext* input = pInputCtx_.release()) {
        avformat_close_input(&input);
    }

    if (AVPacket* current = pCurPacket_.release()) {
        av_packet_free(&current);
    }

    bOpenPush_ = false;
    last_dts_ = 0;
}


bool Streamer::Start(const std::string& pull_url, const std::string& push_url) {
    av_register_all();
    avformat_network_init();
    strInputUrl_ = pull_url;
    strOutUrl_ = push_url;

    bRun_ = true;
    streamThread_ = std::thread(&Streamer::threadFunc, this);
    decodeThread_ = std::thread(&Streamer::decodePacket, this);

    return true;
}

int Streamer::interrupt_callback(void* ctx) {
    avformat_open_input_Runner* input = (avformat_open_input_Runner*) ctx;
    int nTimeOut = input->nOpenTimeOut;
    if (input->beFirst) {

        if (time(nullptr) - input->m_tStart >= nTimeOut) {
            // 等待超过10s则中断
            input->nExit = 1;
        }
    }

    return input->nExit;
}

bool Streamer::openInput() {
    AVDictionary* pTmp = nullptr;
    //av_dict_set(&pTmp, "rtsp_transport", "tcp", 0);
    //av_dict_set(&pTmp, "stimeout", "5000000", 0); // 设置超时15秒
    //av_dict_set(&pTmp, "maxdelay", "5000000", 0); // 设置最大时延
    //av_dict_set(&pTmp, "flvflags", "no_duration_filesize", 0);
    //av_dict_set(&pTmp, "buffer_size", "10240000", 0); // 设置缓存大小,1080p可将值调大
    //av_dict_set(&pTmp, "tcp_nodelay", "true", 0);

    AVFormatContext* Ctx{};
    AVPacket* packet = nullptr;
    Ctx = avformat_alloc_context();
    packet = av_packet_alloc();

    if (Ctx == nullptr || packet == nullptr) {
        std::cout << "avformat_alloc_context failed" << std::endl;
        return false;
    }

    pCurPacket_.reset(packet);

    Ctx->interrupt_callback.callback = interrupt_callback;
    Ctx->interrupt_callback.opaque = &m_Runner_;
    m_Runner_.nCallBackNum = 0;
    m_Runner_.nExit = 0;
    m_Runner_.nOpenTimeOut = 10;
    m_Runner_.m_tStart = time(NULL);
    m_Runner_.beFirst = true;

    if (avformat_open_input(&Ctx, strInputUrl_.c_str(), nullptr, &pTmp) < 0) {
        std::cout << "avformat_open_input failed, Input Url:" << strInputUrl_ << std::endl;
        return false;
    }

    Ctx->max_analyze_duration = 5 * AV_TIME_BASE;
    Ctx->probesize = 100 * 1024;
    std::cout << "start avformat_find_stream_info" << std::endl;
    if (avformat_find_stream_info(Ctx, nullptr) < 0) {
         std::cout << "avformat_find_stream_info failed" << std::endl;
        return false;
    }
    m_Runner_.beFirst = false;
    std::cout << "end avformat_find_stream_info" << std::endl;

    AVCodec* codec{};
    auto VIndex = av_find_best_stream(Ctx, AVMEDIA_TYPE_VIDEO, -1, -1, &codec, 0);
    if (VIndex < 0) {
        std::cout << "av_find_best_stream failed" << std::endl;
        return false;
    }

    vIndex_ = VIndex;

    auto&& stream = Ctx->streams[vIndex_];

    timebase_num_ = stream->time_base.num;
    timebase_den_ = stream->time_base.den;

    pInputCtx_.reset(Ctx);
    return true;
}

bool Streamer::openOutPut() {
    AVFormatContext* OutCtx{};
    avformat_alloc_output_context2(&OutCtx, nullptr, "flv", strOutUrl_.c_str());

    if (avio_open2(&OutCtx->pb, strOutUrl_.c_str(), AVIO_FLAG_WRITE, nullptr, nullptr) < 0) {
        std::cout<<"avio_open2 Failed, Push Url: " << strOutUrl_<< std::endl;
        return false;
    }

    pOutCtx_.reset(OutCtx);

    if (!initVideoCodecInfo()) {
        std::cout << "initVideoCodecInfo Failed" << std::endl;
        return false;
    }

    auto nRet = avformat_write_header(pOutCtx_.get(), nullptr);

    if (nRet != AVSTREAM_INIT_IN_WRITE_HEADER && nRet != AVSTREAM_INIT_IN_INIT_OUTPUT) {
        std::cout << "avformat_write_header Failed" << std::endl;
        return false;
    }

    return true;
}

bool Streamer::initVideoCodecInfo() {
    codec_ = (AVCodec*) avcodec_find_decoder(pInputCtx_->streams[vIndex_]->codecpar->codec_id);

    if (!codec_) {
        std::cout << "avcodec_find_decoder Failed" << std::endl;
        return false;
    }

    AVStream* pVideoStream = avformat_new_stream(pOutCtx_.get(), codec_);
    if (!pVideoStream) {
        std::cout << "avformat_new_stream Failed" << std::endl;
        return false;
    }

    AVCodecContext* CodecCtx{};
    CodecCtx = avcodec_alloc_context3(codec_);
    if (!CodecCtx) {
        std::cout << "avcodec_alloc_context3 Failed" << std::endl;
        return false;
    }

    if (avcodec_parameters_to_context(CodecCtx, pInputCtx_->streams[vIndex_]->codecpar) < 0) {
        std::cout << "avcodec_parameters_to_context Failed" << std::endl;
        return false;
    }

    if (pOutCtx_->oformat->flags & AVFMT_GLOBALHEADER) {
        CodecCtx->flags |= AV_CODEC_FLAG_GLOBAL_HEADER;
    }

    if (avcodec_open2(CodecCtx, codec_, nullptr) < 0) {
        std::cout << "avcodec_open2 Failed" << std::endl;
        return false;
    }

    if (avcodec_parameters_from_context(pVideoStream->codecpar, CodecCtx) < 0) {
        std::cout << "avcodec_parameters_from_context Failed" << std::endl;
        return false;
    }

    pCodecCtx_.reset(CodecCtx);
    return true;
}

void Streamer::decodePacket() {
    std::cout << "Start decode thread" << std::endl;
    bRunDecode_ = true;
    while (bRunDecode_) {
        auto pkt = popPacket();
        if (pkt->data) {
            AVPacket* clonePacket = av_packet_alloc();        
            av_packet_ref(clonePacket, pkt.get());
            if (!clonePacket)
            {
                std::cout << "clonePacket is nullptr" << std::endl;
            }
            if (avcodec_send_packet(pCodecCtx_.get(), clonePacket) < 0) {
                std::cout << "avcodec_send_packet Failed" << std::endl;
                return;
            }

            for (;;) {
                AVFrame* frame = av_frame_alloc();
                auto nRet = avcodec_receive_frame(pCodecCtx_.get(), frame); // 从解码器中获取被解码后的帧数据AVFrame
                if (nRet == AVERROR(EAGAIN) || nRet == AVERROR_EOF) {
                    break;
                } else if (nRet < 0) {
                    std::cout << "avcodec_receive_frame Failed" << std::endl;
                    return;
                }

                av_frame_unref(frame);
            }
            
            av_packet_unref(clonePacket);
        }

        av_packet_unref(pkt.get());
        
    }
}

bool Streamer::pushVideo() {
    if (pCurPacket_->dts <= 0) {
        /*std::cout << "Packet PTS: " << pCurPacket_->pts << "Packet DTS: " << pCurPacket_->dts
                  << "Packet Duration: " << pCurPacket_->duration << std::endl;*/
        return true;
    }
    auto&& input_timebase = pInputCtx_->streams[vIndex_]->time_base; 
    auto&& output_timebase = pOutCtx_->streams[vIndex_]->time_base;

    // 修复首帧错误时间戳。
    if (pCurPacket_->dts < last_dts_) {
        pCurPacket_->dts = last_dts_;
    }

    last_dts_ = pCurPacket_->dts;

    if (pCurPacket_->pts < pCurPacket_->dts) {
        pCurPacket_->pts = pCurPacket_->dts;
    }

    // 重整时间戳。
    pCurPacket_->pts = av_rescale_q_rnd(pCurPacket_->pts, input_timebase, output_timebase,
        static_cast<AVRounding>(AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX));
    pCurPacket_->dts = av_rescale_q_rnd(pCurPacket_->dts, input_timebase, output_timebase,
        static_cast<AVRounding>(AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX));
    pCurPacket_->duration = av_rescale_q(pCurPacket_->duration, input_timebase, output_timebase);
    pCurPacket_->pos = -1;

    // 提供合理的推流延时。
    auto pts_time = av_rescale_q(pCurPacket_->dts, output_timebase, AVRational{1, 1000});
    auto now_time = av_gettime() / 1000;

    if (pts_time > now_time) {
        av_usleep(static_cast<std::uint32_t>(pts_time - now_time));
    }

    auto nRet = av_interleaved_write_frame(pOutCtx_.get(), pCurPacket_.get());

    if (nRet < 0) {
        std::cout << "av_interleaved_write_frame Failed, Error Num:" << nRet << " Url: " << strOutUrl_ << std::endl;
        return false;
    }
    return true;
}

bool Streamer::readFrame() {
    std::cout << "start readFrame" << std::endl;
    while (bRunRead_) {
        av_packet_unref(pCurPacket_.get());
        auto nRet = av_read_frame(pInputCtx_.get(), pCurPacket_.get());
        
        if (nRet == AVERROR_EOF || nRet == AVERROR_EXIT) {
            return false;
        }

        if (nRet < 0 || pCurPacket_->size < 0) {
            std::cout << "av_read_frame Failed" << std::endl;
            return false;
        }

        if (pCurPacket_->stream_index != vIndex_) {
            continue;
        }

        //if (pCurPacket_->stream_index == vIndex_ && pCurPacket_->flags == AV_PKT_FLAG_KEY && bOpenPush_) {
        //    auto clonePacket = std::make_unique<AVPacket>();
        //    av_packet_ref(clonePacket.get(), pCurPacket_.get());
        //    pushPacket(std::move(clonePacket));
        //    //decodePacket();
        //}

        if (pCurPacket_->stream_index == vIndex_ && bOpenPush_) {
            auto clonePacket = std::make_unique<AVPacket>();
            av_packet_ref(clonePacket.get(), pCurPacket_.get());
            pushPacket(std::move(clonePacket));
        }

        if (!bOpenPush_) {
            if (!openOutPut()) {
                std::cout<<"OpenInput Failed, Push Url:" << strOutUrl_<< std::endl;
                return false;
            }

            bOpenPush_ = true;
        }

        if (pCurPacket_->stream_index == vIndex_) {
            if (!pushVideo()) {
                std::cout << "PushVideo Failed" << std::endl;
                return false;
            }
        }
    }
    return true;
}

/**
 * Stream-thread entry point: opens the input and runs the read loop,
 * reconnecting when either of them fails.
 *
 * The thread gives up when the attempt counter reaches 10. Every attempt after
 * the first is preceded by a short pause so that a failing source is not
 * hammered in a tight loop.
 *
 * NOTE(review): the counter is never reset after a successful session, so the
 * total number of (re)connects over the lifetime of the object is bounded.
 */
void Streamer::threadFunc() {
    int nErrorCount = 0;
    while (bRun_) {
        nErrorCount++;
        if (nErrorCount == 10) {
            return;
        }

        if (nErrorCount > 1) {
            // Back off before reconnecting, then honour a stop request that
            // arrived in the meantime.
            std::this_thread::sleep_for(std::chrono::seconds(1));
            if (!bRun_) {
                return;
            }
        }

        if (!openInput()) {
            std::cout << "OpenInput Failed, RTSP:" << strInputUrl_ << std::endl;
            continue;
        }

        bRunRead_ = true;

        // Whatever the outcome, the loop condition decides whether to reconnect.
        static_cast<void>(readFrame());
    }
}

/**
 * Appends a packet to the decode queue and wakes one waiting consumer.
 *
 * @param pkt packet whose ownership moves into the queue.
 */
void Streamer::pushPacket(std::unique_ptr<AVPacket> pkt) {
    const std::lock_guard<std::mutex> guard(mutex_);
    qPacket_.emplace(std::move(pkt));
    cond_.notify_one();
}

std::unique_ptr<AVPacket> Streamer::popPacket() {
    std::unique_lock<std::mutex> lock(mutex_);
    cond_.wait(lock, [this]() { return !qPacket_.empty(); });
    auto pkt = std::move(qPacket_.front());
    qPacket_.pop();
    return pkt;
}


http://www.kler.cn/a/291340.html

相关文章:

  • MATLAB常见数学运算函数
  • AntFlow 0.11.0版发布,增加springboot starter模块,一款设计上借鉴钉钉工作流的免费企业级审批流平台
  • 手摸手5-springboot开启打印sql完整语句
  • Kotlin的data class
  • 【AtCoder】Beginner Contest 380-F.Exchange Game
  • JVM垃圾回收详解(重点)
  • 微服务中的服务降级与熔断机制
  • 搜维尔科技:使用Geomagic Touch X 对机械臂进行远程遥操作
  • mysql 使用 general 开启SQL跟踪功能
  • Knife4j:为Spring Boot API赋能的文档生成器
  • SwaggerAPI未授权访问漏洞
  • 代码随想录Day 32|leetcode题目:501.斐波那契数、70.爬楼梯、746.使用最小花费爬楼梯
  • 【软件工程】软件与软件危机
  • List 的介绍
  • OPenCV结构分析与形状描述符(3)计算一个点集的最小外接矩形的函数boundingRect()的使用
  • react购物车Redux
  • 交叉编译概念
  • 秒杀商品实时热点发现及如何进行测试
  • sqlite3 db.configure方法详解:设置项与默认值
  • [STM32]从零开始的STM32标准库环境搭建(小白向)
  • Java项目服务器CPU飙升问题排查
  • 1998-2023年上市公司金融/信贷/资本资源错配程度数据(含原始数据+计算代码+结果)
  • 每日OJ_牛客_Emacs计算器(逆波兰表达式)
  • 图论(1)
  • Day11_0.1基础学习MATLAB学习小技巧总结(11)——程序流程控制2
  • 50ETF期权和股指期权有什么区别?ETF期权应该怎么做?