当前位置: 首页 > article >正文

aclStream流处理多路并发Pipeline框架中 视频解码 代码调用流程整理、类的层次关系整理、回调函数赋值和调用流程整理

目录

1 类的整体层次关系

2 送解码数据时候从上到下的整体调用关系

3 获取解码数据之后回调的从下到上的调用层次关系


这个就不画详细的流程图了,只把自己代码阅读过程中的笔记复制到这里备忘。

1 类的整体层次关系

类的从上到下关系是:先是DecodeService类,然后DecoderAcl类,然后是AclLiteVideoProc类,然后是VideoDecoder类,然后再往下是
VdecHelper类,VdecHelper中做具体的处理

2 送解码数据时候从上到下的整体调用关系

VdecHelper这个类只有你发送了任务之后才会创建,整个调用关系是这样的。

void VideoTaskAdd(std::string const& param, std::string& reply)

try {

            int ret = session->start(g_source_name);

            if (ret != 0) {

                return reply_error(reply, ERROR_UNKOWN, get_error_msg(ERROR_UNKOWN));

            }

        }

int TaskSession::start(std::string data_source) {

ret = AddSourceForFile(source, stream_id, filename, FLAGS_src_frame_rate);

int AddSourceForFile(cnstream::DataSource *source, const std::string &stream_id, const std::string &filename,

    const int &frame_rate = 25) {

    cnstream::FileSourceParam param;

    param.filename = filename;

    param.framerate = frame_rate;

    param.loop = FLAGS_loop;

    auto handler = cnstream::CreateSource(source, stream_id, param);    

            -->std::make_shared<FileHandler>(module, stream_id, param);

                    -->FileHandler::FileHandler(DataSource *module, const std::string &stream_id, const FileSourceParam &param)

                        : SourceHandler(module, stream_id) {

                         impl_ = new (std::nothrow) FileHandlerImpl(module, param, this);

                                    

                                explicit FileHandlerImpl(DataSource *module, const FileSourceParam &param, FileHandler *handler)

                                    : SourceRender(handler),

                                     module_(module),

                                     handle_param_(param),

                                    stream_id_(handler->GetStreamId()),

                                     parser_(stream_id_) {}


 

    return source->AddSource(handler);

    return 0;

}

SourceModule::AddSource(std::shared_ptr<SourceHandler> handler) {

  if (handler->Open() != true) {

    LOGE(CORE) << "[" << stream_id << "]: stream Open failed";

    return -1;

  }

FileHandler::Open()

return impl_->Open();

bool FileHandlerImpl::Open() {

thread_ = std::thread(&FileHandlerImpl::Loop, this);

    void FileHandlerImpl::Loop() {

        if (!SetCurrentDevice(param_.device_id)) return;

        if (!PrepareResources()) {//这个Loop是个线程函数,然后这里的prepare是准备,往下调用了一系列的open,

            ClearResources();

            if (nullptr != module_) {

                Event e;

                e.type = EventType::EVENT_STREAM_ERROR;

                e.module_name = module_->GetName();

                e.message = "Prepare codec resources failed.";

                e.stream_id = stream_id_;

                e.thread_id = std::this_thread::get_id();

                module_->PostEvent(e);

            }

            LOGE(SOURCE) << "[FileHandlerImpl] Loop(): [" << stream_id_ << "]: PrepareResources failed.";

            return;

        }

        set_thread_name("demux_decode");

        FrController controller(handle_param_.framerate);

        if (handle_param_.framerate > 0) controller.Start();

        VLOG1(SOURCE) << "[FileHandlerImpl] Loop(): [" << stream_id_ << "]: DecoderLoop";

        while (running_.load()) {

            if (!Process()) {//这里的process里面是处理函数。

                break;

            }

            if (handle_param_.framerate > 0) controller.Control();

        }

        VLOG1(SOURCE) << "[FileHandlerImpl] Loop(): [" << stream_id_ << "]: DecoderLoop Exit.";

        ClearResources();

    }

bool FileHandlerImpl::PrepareResources(bool demux_only) {

        VLOG1(SOURCE) << "[FileHandlerImpl] PrepareResources(): [" << stream_id_ << "]: Begin preprare resources";

        int ret = parser_.Open(handle_param_.filename, this, handle_param_.only_key_frame);

        VLOG1(SOURCE) << "[FileHandlerImpl] PrepareResources(): [" << stream_id_ << "]: Finish preprare resources";

        if (ret < 0 || dec_create_failed_) {

            return false;

        }

        return true;

    }

int FFParser::Open(const std::string& url, IParserResult* result, bool only_key_frame) {

        if (impl_) {

            return impl_->Open(url, result, only_key_frame);

        }

        return -1;

    }

int Open(const std::string& url, IParserResult* result, bool only_key_frame = false) {

            std::unique_lock<std::mutex> guard(mutex_);

if (result_) {

                result_->OnParserInfo(info);

            }

class FileHandlerImpl : public IParserResult, public IDecodeResult, public SourceRender, public IUserPool //FileHandlerImpl 继承的IParserResult。

void FileHandlerImpl::OnParserInfo(VideoInfo *info) {

bool ret = decoder_->Create(info, &extra);

bool DeviceDecoder::Create(VideoInfo *info, ExtraDecoderInfo *extra) {

int ret = VdecCreate(&vdec_, &create_params);

int VdecCreate(void **vdec, VdecCreateParams *params) {

  return infer_server::DecodeService::Instance().Create(vdec, params);

}

int Create(void **vdec, VdecCreateParams *params) {

if (decoder_->Create(params) < 0) {

      LOG(ERROR) << "[InferServer] [DecodeService] Create(): Create decoder failed";

      delete decoder_;

      return -1;

    }

int DecoderAcl::Create(VdecCreateParams *params) {

if (vdec_ == nullptr) {

            vdec_ = std::unique_ptr<acllite::AclLiteVideoProc>(new acllite::AclLiteVideoProc(vdec_config_, create_params_.device_id, create_params_.channel_id));

        }

AclLiteVideoProc::AclLiteVideoProc(VideoDecodeConfig& vdecConfig, int32_t deviceId, uint32_t channelId) {

        cap_ = new VideoDecoder(vdecConfig, deviceId, channelId);

        Open();

    }

AclLiteError AclLiteVideoProc::Open() {

return cap_->Open();

VideoDecoder::Open()

AclLiteError VideoDecoder::Open()//这个函数里面先是设置device,context哪一些,然后创建了类,然后又调用了VdecHelper的init函数。

        dvppVdec_ = new VdecHelper(deviceId_, channelId_, config_.width, config_.height, config_.resizedWidth,          

         aclRet = aclrtGetCurrentContext(&context_); 

     

然后创建完类,接着调用了init函数,dvppVdec_->Init();

    AclLiteError VdecHelper::Init() {

        ACLLITE_LOG_INFO("Vdec process init start...");

        aclError aclRet = aclrtCreateStream(&stream_);

        if (aclRet != ACL_SUCCESS) {

            ACLLITE_LOG_ERROR("Vdec create stream failed, errorno:%d", aclRet);

            return ACLLITE_ERROR_CREATE_STREAM;

        }

        ACLLITE_LOG_INFO("Vdec create stream ok");

        int ret = pthread_create(&subscribeThreadId_, nullptr,

            SubscribeReportThreadFunc, (void *)this);

        if (ret) {

            ACLLITE_LOG_ERROR("Start vdec subscribe thread failed, return:%d", ret);

            return ACLLITE_ERROR_CREATE_THREAD;

        }

        aclRet = aclrtSubscribeReport(subscribeThreadId_, stream_);

        if (aclRet != ACL_SUCCESS) {

            ACLLITE_LOG_ERROR("Vdec subscrible report failed, error %d", aclRet);

            return ACLLITE_ERROR_SUBSCRIBE_REPORT;

        }

        ret = CreateVdecChannelDesc();

        if (ret != ACLLITE_OK) {

            ACLLITE_LOG_ERROR("Create vdec channel failed");

            return ret;

        }

        return ACLLITE_OK;

    }

上面基本上是void FileHandlerImpl::Loop()里面的if (!PrepareResources()) {//这个Loop是个线程函数,然后这里的prepare是准备,往下调用了一系列的open,分支,这个Loop函数里面其实还有if (!Process()) {分支。

这个Process往下调用的流程是

bool FileHandlerImpl::Process() {

parser_.Parse();

int FFParser::Parse() {

        if (impl_) {

            impl_->Parse();

        }

        return -1;

    }

int Parse() {

result_->OnParserFrame(&frame);


 

void FileHandlerImpl::OnParserFrame(VideoEsFrame *frame) {

if (decoder_ && decoder_->Process(&pkt) == true) {

bool DeviceDecoder::Process(VideoEsPacket *pkt) {

int ret = VdecSendStream(vdec_, &stream, 1000);

int VdecSendStream(void *vdec, const VdecStream *stream, int timeout_ms) {

    return infer_server::DecodeService::Instance().SendStream(vdec, stream, timeout_ms);

}

  int SendStream(void *vdec, const VdecStream *stream, int timeout_ms) {

    if (!vdec || !stream) {

      LOG(ERROR) << "[InferServer] [DecodeService] SendStream(): Decoder or stream pointer is invalid";

      return -1;

    }

    IDecoder *decoder_ = static_cast<IDecoder *>(vdec);

    return decoder_->SendStream(stream, timeout_ms);

  }

int DecoderAcl::SendStream(const VdecStream *stream, int timeout_ms) {

acllite::AclLiteError codec_ret = vdec_->Decode(data_ptr, data_size, frame_id, this);

AclLiteError AclLiteVideoProc::Decode(std::shared_ptr<uint8_t> data, uint32_t dataSize, uint64_t frameId, void* userData) {

        if (cap_ != nullptr) {

            return cap_->Decode(data, dataSize, frameId, userData);

        }

        else {

            return ACLLITE_ERROR_UNSURPPORT_VIDEO_CAPTURE;

        }

    }

AclLiteError VideoDecoder::Decode(const std::shared_ptr<uint8_t> data, uint32_t dataSize, uint64_t frameId, void* userData) {

        void* buffer = CopyDataToDevice(data.get(), dataSize,

        UserData* tempUserData = new UserData;

        tempUserData->decoderSelf = (void*)this;

        tempUserData->userData = userData;    

        ret = dvppVdec_->Process(videoFrame, tempUserData);

    AclLiteError VdecHelper::Process(shared_ptr<FrameData> frameData, void* userData) {

        // create input desc

        AclLiteError atlRet = CreateInputStreamDesc(frameData);

        if (atlRet != ACLLITE_OK) {

            ACLLITE_LOG_ERROR("Create stream desc failed");

            return atlRet;

        }

        if (!frameData->isFinished) {

            // create out desc

            atlRet = CreateOutputPicDesc(outputPicSize_.load());

            if (atlRet != ACLLITE_OK) {

                ACLLITE_LOG_ERROR("Create pic desc failed");

                return atlRet;

            }

        }

        else {

            outputPicDesc_ = acldvppCreatePicDesc();

            if (outputPicDesc_ == nullptr) {

                ACLLITE_LOG_ERROR("Create vdec output pic desc failed");

                return ACLLITE_ERROR_CREATE_PIC_DESC;

            }

        }

        // send data to dvpp vdec to decode

        ret = aclvdecSendFrame(vdecChannelDesc_, inputStreamDesc_,   

            outputPicDesc_, nullptr, userData);//最终就到了这里的aclvdecSendFrame了。然后这不就到了那个回调函数那里去了。

        if (ret != ACL_SUCCESS) {

            ACLLITE_LOG_ERROR("Send frame to vdec failed, errorno:%d", ret);

            return ACLLITE_ERROR_VDEC_SEND_FRAME;

        }

        return ACLLITE_OK;

    }

所以最终的其实都是在VdecHelper,

3 获取解码数据之后回调的从下到上的调用层次关系

再往下,这里sendframe之后又去哪里了,

首先去了回调函数

void VideoDecoder::DvppVdecCallbackV2(hi_video_frame_info *frame, void *userdata) {

然后进一步去了

    int VideoDecoder::DecodeCallback(const std::shared_ptr<ImageData> decodedImage, uint32_t frameId, void* userData) {

        return config_.callbackFunc(decodedImage, channelId_, frameId, userData);

    }

然后又去了

config_.callbackFunc(decodedImage, channelId_, frameId, userData);

这里其实就是

    static int CallBackVdec(const std::shared_ptr<acllite::ImageData> decoded_image, uint32_t channel_id, uint32_t frame_id, void* user_data) {

        auto decoder = reinterpret_cast<DecoderAcl *>(user_data);

        decoder->OnFrame(decoded_image, channel_id, frame_id);

        return 0;

    }

然后就是

    // IVDecResult

    void DecoderAcl::OnFrame(const std::shared_ptr<acllite::ImageData> codec_image, uint32_t channel_id, uint32_t frame_id) {

        BufSurface *surf = nullptr;

        if (create_params_.GetBufSurf(&surf, codec_image->width, codec_image->height, CastColorFmt(codec_image->format),

            create_params_.surf_timeout_ms, create_params_.userdata) < 0) {

            LOG(ERROR) << "[InferServer] [DecoderAcl] OnFrame(): Get BufSurface failed";

            OnError(-1);

            return;

        }

        if (surf->mem_type != BUF_MEMORY_DVPP) {

            LOG(ERROR) << "[InferServer] [DecoderAcl] OnFrame(): BufSurface memory type must be BUF_MEMORY_DVPP";

            return;

        }

        switch (codec_image->format) {

        case acllite::ImageFormat::YUV_SP_420:

        case acllite::ImageFormat::YVU_SP_420:

            if (surf->surface_list[0].width != codec_image->width || surf->surface_list[0].height != codec_image->height) {

                BufSurface transform_src;

                BufSurfaceParams src_param;

                memset(&transform_src, 0, sizeof(BufSurface));

                memset(&src_param, 0, sizeof(BufSurfaceParams));

                src_param.color_format = CastColorFmt(codec_image->format);

                src_param.data_size = codec_image->size;

                src_param.data_ptr = reinterpret_cast<void *>(codec_image->data.get());

                VLOG(5) << "[InferServer] [DecoderAcl] OnFrame(): codec_frame: "

                    << " width = " << codec_image->width

                    << ", height = " << codec_image->height

                    << ", width stride = " << codec_image->alignWidth

                    << ", height stride = " << codec_image->alignHeight;

                VLOG(5) << "[InferServer] [DecoderAcl] OnFrame(): surf->surface_list[0]: "

                    << " width = " << surf->surface_list[0].width

                    << ", height = " << surf->surface_list[0].height

                    << ", width stride = " << surf->surface_list[0].width_stride

                    << ", height stride = " << surf->surface_list[0].height_stride;

                src_param.width = codec_image->width;

                src_param.height = codec_image->height;

                src_param.width_stride = codec_image->alignWidth;

                src_param.height_stride = codec_image->alignHeight;

                transform_src.batch_size = 1;

                transform_src.num_filled = 1;

                transform_src.device_id = create_params_.device_id;

                transform_src.mem_type = BUF_MEMORY_DVPP;

                transform_src.surface_list = &src_param;

                TransformParams trans_params;

                memset(&trans_params, 0, sizeof(trans_params));

                trans_params.transform_flag = TRANSFORM_RESIZE_SRC;

                if (Transform(transformer_, &transform_src, surf, &trans_params) < 0) {

                    LOG(ERROR) << "[InferServer] [DecoderAcl] OnFrame(): Transfrom failed";

                    break;

                }

            }

            else {

                CALL_ACL_FUNC(acllite::CopyDataToHostEx(surf->surface_list[0].data_ptr, codec_image->size, codec_image->data.get(), codec_image->size, codec_image->deviceId)

                   , "[DecoderAcl] OnFrame(): copy codec buffer data to surf failed");

            }

            break;

        default:

            break;

        }

        surf->pts = frame_id;

        create_params_.OnFrame(surf, create_params_.userdata);

    }

然后到了class DeviceDecoder类的

        static int OnFrame_(BufSurface *surf, void *userdata) {

            DeviceDecoder *thiz = reinterpret_cast<DeviceDecoder *>(userdata);

            return thiz->OnFrame(surf);

        }

然后到了

    int DeviceDecoder::OnFrame(BufSurface *surf) {

        surf->surface_list[0].width -= surf->surface_list[0].width & 1;

        surf->surface_list[0].height -= surf->surface_list[0].height & 1;

        BufSurfWrapperPtr wrapper = std::make_shared<BufSurfaceWrapper>(surf);

        if (result_) {

            result_->OnDecodeFrame(wrapper);

            return 0;

        }

        return -1;

    }

然后到了

    void FileHandlerImpl::OnDecodeFrame(BufSurfWrapperPtr wrapper) {

        if (frame_count_++ % param_.interval != 0) {

            // LOGI(SOURCE) << "frames are discarded" << frame_count_;

            return;  // discard frames

        }

        std::shared_ptr<CNFrameInfo> data = this->CreateFrameInfo();

        if (!data) {

            LOGW(SOURCE) << "[FileHandlerImpl] OnDecodeFrame(): failed to create FrameInfo.";

            return;

        }

        data->timestamp = wrapper->GetPts();

        if (!wrapper->GetBufSurface()) {

            data->flags = static_cast<size_t>(CNFrameFlag::CN_FRAME_FLAG_INVALID);

            this->SendFrameInfo(data);

            return;

        }

#if 0

        BufSurface* src_buf = wrapper->GetBufSurface();

        std::vector<uint8_t> indata(src_buf->surface_list[0].data_size);

        MemcpyHD(indata.data(), BUF_MEMORY_NORMAL, src_buf->surface_list[0].data_ptr, BUF_MEMORY_NORMAL, src_buf->surface_list[0].data_size);

        cv::Mat yuv_mat(src_buf->surface_list[0].height * 3 / 2, src_buf->surface_list[0].width, CV_8UC1, indata.data());

        cv::Mat bgr_mat;

        cv::cvtColor(yuv_mat, bgr_mat, cv::COLOR_YUV2BGR_NV12);

        cv::imwrite("decoded.jpg", bgr_mat);

#endif

        int ret = SourceRender::Process(data, std::move(wrapper), frame_id_++, param_);

        if (ret < 0) {

            LOGE(SOURCE) << "[FileHandlerImpl] OnDecodeFrame(): [" << stream_id_ << "]: Render frame failed";

            return;

        }

        this->SendFrameInfo(data);

    }

然后是

  bool SendFrameInfo(std::shared_ptr<CNFrameInfo> data) { return handler_->SendData(data); }

  bool SendData(std::shared_ptr<CNFrameInfo> data) {

    if (this->module_) {

      return this->module_->SendData(data);

    }

    return false;

  }

bool Module::TransmitData(std::shared_ptr<CNFrameInfo> data) {

  if (!HasTransmit()) {

    return true;

  }

  if (!DoTransmitData(data)) {

    return true;

  }

  return false;

}


http://www.kler.cn/a/280768.html

相关文章:

  • 2024.8.24 Python,链表异常断裂问题,双链表的建立问题,全排列中的引用机制与copy的使用,最大子数组和
  • 定制开发AI智能名片商城小程序:重塑品牌曝光的创新推手
  • Android 退出app方式(回忆录)
  • 【C++ STL哈希容器】unordered_set 无序集合
  • react 中的useState useEffect
  • Vue:组件化开发
  • K8S 无状态应用有状态应用
  • 【大模型】llama系列模型基础
  • 【Python机器学习】NLP概述——深度处理
  • VBA之正则表达式(47)-- 快速将公式转换为静态值计算
  • 免杀笔记 ---> CS特性角度看Veh免杀
  • 大数据技术之Flume应用案例(2)
  • Java笔试面试题AI答之线程(21)
  • 资料分析笔记
  • GNN的理解难点:一种不同于传统神经网络的复杂性
  • NoSql数据库 Redis集群详解
  • 设计模式 9 装饰器模式
  • AI模型:追求全能还是专精?
  • 数据结构——链表学习
  • 39-nacos eureka zookeeper区别