Video decoding in the aclStream multi-stream concurrent pipeline framework: notes on the call flow for feeding the decoder, the class hierarchy, and how callbacks are assigned and invoked
Contents
1 Overall class hierarchy
2 Top-down call chain when feeding data to the decoder
3 Bottom-up callback chain after decoded data is produced
I won't draw a detailed flow diagram here; these are just my notes from reading the code, copied here for future reference.
1 Overall class hierarchy
The top-to-bottom class relationship is: DecodeService first, then DecoderAcl, then AclLiteVideoProc, then VideoDecoder, and below that VdecHelper, which is where the concrete work happens.
2 Top-down call chain when feeding data to the decoder
The VdecHelper instance is only created after you submit a task. The overall call chain looks like this:
void VideoTaskAdd(std::string const& param, std::string& reply)
try {
int ret = session->start(g_source_name);
if (ret != 0) {
return reply_error(reply, ERROR_UNKOWN, get_error_msg(ERROR_UNKOWN));
}
}
int TaskSession::start(std::string data_source) {
ret = AddSourceForFile(source, stream_id, filename, FLAGS_src_frame_rate);
int AddSourceForFile(cnstream::DataSource *source, const std::string &stream_id, const std::string &filename,
const int &frame_rate = 25) {
cnstream::FileSourceParam param;
param.filename = filename;
param.framerate = frame_rate;
param.loop = FLAGS_loop;
auto handler = cnstream::CreateSource(source, stream_id, param);
-->std::make_shared<FileHandler>(module, stream_id, param);
-->FileHandler::FileHandler(DataSource *module, const std::string &stream_id, const FileSourceParam &param)
: SourceHandler(module, stream_id) {
impl_ = new (std::nothrow) FileHandlerImpl(module, param, this);
explicit FileHandlerImpl(DataSource *module, const FileSourceParam &param, FileHandler *handler)
: SourceRender(handler),
module_(module),
handle_param_(param),
stream_id_(handler->GetStreamId()),
parser_(stream_id_) {}
return source->AddSource(handler);
return 0;
}
int SourceModule::AddSource(std::shared_ptr<SourceHandler> handler) {
if (handler->Open() != true) {
LOGE(CORE) << "[" << stream_id << "]: stream Open failed";
return -1;
}
FileHandler::Open()
return impl_->Open();
bool FileHandlerImpl::Open() {
thread_ = std::thread(&FileHandlerImpl::Loop, this);
void FileHandlerImpl::Loop() {
if (!SetCurrentDevice(param_.device_id)) return;
if (!PrepareResources()) {  // Loop() is a thread function; PrepareResources() walks down through a series of Open() calls.
ClearResources();
if (nullptr != module_) {
Event e;
e.type = EventType::EVENT_STREAM_ERROR;
e.module_name = module_->GetName();
e.message = "Prepare codec resources failed.";
e.stream_id = stream_id_;
e.thread_id = std::this_thread::get_id();
module_->PostEvent(e);
}
LOGE(SOURCE) << "[FileHandlerImpl] Loop(): [" << stream_id_ << "]: PrepareResources failed.";
return;
}
set_thread_name("demux_decode");
FrController controller(handle_param_.framerate);
if (handle_param_.framerate > 0) controller.Start();
VLOG1(SOURCE) << "[FileHandlerImpl] Loop(): [" << stream_id_ << "]: DecoderLoop";
while (running_.load()) {
if (!Process()) {  // Process() does the actual per-iteration work.
break;
}
if (handle_param_.framerate > 0) controller.Control();
}
VLOG1(SOURCE) << "[FileHandlerImpl] Loop(): [" << stream_id_ << "]: DecoderLoop Exit.";
ClearResources();
}
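The skeleton of Loop() above, stripped of the framework types, can be sketched as follows. LoopSketch and its member names are hypothetical stand-ins; the real function also posts error events and drives the frame-rate controller.

```cpp
#include <atomic>
#include <thread>

// Minimal sketch of the Loop() structure: prepare resources, run
// Process() until it reports completion or the handler is stopped,
// then clean up. All names are hypothetical.
class LoopSketch {
 public:
  void Start() {
    running_.store(true);
    thread_ = std::thread(&LoopSketch::Loop, this);
  }
  void Wait() {                     // join after the loop exits on its own
    if (thread_.joinable()) thread_.join();
  }
  int processed() const { return processed_; }

 private:
  bool PrepareResources() { return true; }      // stands in for parser_.Open()
  bool Process() { return ++processed_ < 5; }   // stands in for parser_.Parse(); stops after 5
  void ClearResources() {}
  void Loop() {
    if (!PrepareResources()) { ClearResources(); return; }
    while (running_.load()) {
      if (!Process()) break;        // demux/decode one unit of work
    }
    ClearResources();
  }
  std::thread thread_;
  std::atomic<bool> running_{false};
  int processed_ = 0;
};
```

The key design point is the same as in the source: the loop body is a member function launched on its own thread, with an atomic flag so another thread can request shutdown.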
bool FileHandlerImpl::PrepareResources(bool demux_only) {
VLOG1(SOURCE) << "[FileHandlerImpl] PrepareResources(): [" << stream_id_ << "]: Begin prepare resources";
int ret = parser_.Open(handle_param_.filename, this, handle_param_.only_key_frame);
VLOG1(SOURCE) << "[FileHandlerImpl] PrepareResources(): [" << stream_id_ << "]: Finish prepare resources";
if (ret < 0 || dec_create_failed_) {
return false;
}
return true;
}
int FFParser::Open(const std::string& url, IParserResult* result, bool only_key_frame) {
if (impl_) {
return impl_->Open(url, result, only_key_frame);
}
return -1;
}
int Open(const std::string& url, IParserResult* result, bool only_key_frame = false) {
std::unique_lock<std::mutex> guard(mutex_);
if (result_) {
result_->OnParserInfo(info);
}
class FileHandlerImpl : public IParserResult, public IDecodeResult, public SourceRender, public IUserPool  // FileHandlerImpl inherits IParserResult, so it receives the parser callbacks below.
void FileHandlerImpl::OnParserInfo(VideoInfo *info) {
bool ret = decoder_->Create(info, &extra);
bool DeviceDecoder::Create(VideoInfo *info, ExtraDecoderInfo *extra) {
int ret = VdecCreate(&vdec_, &create_params);
int VdecCreate(void **vdec, VdecCreateParams *params) {
return infer_server::DecodeService::Instance().Create(vdec, params);
}
int Create(void **vdec, VdecCreateParams *params) {
if (decoder_->Create(params) < 0) {
LOG(ERROR) << "[InferServer] [DecodeService] Create(): Create decoder failed";
delete decoder_;
return -1;
}
int DecoderAcl::Create(VdecCreateParams *params) {
if (vdec_ == nullptr) {
vdec_ = std::unique_ptr<acllite::AclLiteVideoProc>(new acllite::AclLiteVideoProc(vdec_config_, create_params_.device_id, create_params_.channel_id));
}
AclLiteVideoProc::AclLiteVideoProc(VideoDecodeConfig& vdecConfig, int32_t deviceId, uint32_t channelId) {
cap_ = new VideoDecoder(vdecConfig, deviceId, channelId);
Open();
}
AclLiteError AclLiteVideoProc::Open() {
return cap_->Open();
VideoDecoder::Open()
AclLiteError VideoDecoder::Open()  // This function first sets up the device and context, then creates the VdecHelper and calls its Init().
dvppVdec_ = new VdecHelper(deviceId_, channelId_, config_.width, config_.height, config_.resizedWidth,
aclRet = aclrtGetCurrentContext(&context_);
After the helper is constructed, its init function is called: dvppVdec_->Init();
AclLiteError VdecHelper::Init() {
ACLLITE_LOG_INFO("Vdec process init start...");
aclError aclRet = aclrtCreateStream(&stream_);
if (aclRet != ACL_SUCCESS) {
ACLLITE_LOG_ERROR("Vdec create stream failed, errorno:%d", aclRet);
return ACLLITE_ERROR_CREATE_STREAM;
}
ACLLITE_LOG_INFO("Vdec create stream ok");
int ret = pthread_create(&subscribeThreadId_, nullptr,
SubscribeReportThreadFunc, (void *)this);
if (ret) {
ACLLITE_LOG_ERROR("Start vdec subscribe thread failed, return:%d", ret);
return ACLLITE_ERROR_CREATE_THREAD;
}
aclRet = aclrtSubscribeReport(subscribeThreadId_, stream_);
if (aclRet != ACL_SUCCESS) {
ACLLITE_LOG_ERROR("Vdec subscribe report failed, error %d", aclRet);
return ACLLITE_ERROR_SUBSCRIBE_REPORT;
}
ret = CreateVdecChannelDesc();
if (ret != ACLLITE_OK) {
ACLLITE_LOG_ERROR("Create vdec channel failed");
return ret;
}
return ACLLITE_OK;
}
Everything above is the if (!PrepareResources()) branch inside void FileHandlerImpl::Loop() (Loop() is a thread function, and PrepareResources() walks down through a series of Open() calls). The same Loop() also contains the if (!Process()) branch.
The downward call chain from Process() is:
bool FileHandlerImpl::Process() {
parser_.Parse();
int FFParser::Parse() {
if (impl_) {
return impl_->Parse();
}
return -1;
}
int Parse() {
result_->OnParserFrame(&frame);
void FileHandlerImpl::OnParserFrame(VideoEsFrame *frame) {
if (decoder_ && decoder_->Process(&pkt) == true) {
bool DeviceDecoder::Process(VideoEsPacket *pkt) {
int ret = VdecSendStream(vdec_, &stream, 1000);
int VdecSendStream(void *vdec, const VdecStream *stream, int timeout_ms) {
return infer_server::DecodeService::Instance().SendStream(vdec, stream, timeout_ms);
}
int SendStream(void *vdec, const VdecStream *stream, int timeout_ms) {
if (!vdec || !stream) {
LOG(ERROR) << "[InferServer] [DecodeService] SendStream(): Decoder or stream pointer is invalid";
return -1;
}
IDecoder *decoder_ = static_cast<IDecoder *>(vdec);
return decoder_->SendStream(stream, timeout_ms);
}
int DecoderAcl::SendStream(const VdecStream *stream, int timeout_ms) {
acllite::AclLiteError codec_ret = vdec_->Decode(data_ptr, data_size, frame_id, this);
AclLiteError AclLiteVideoProc::Decode(std::shared_ptr<uint8_t> data, uint32_t dataSize, uint64_t frameId, void* userData) {
if (cap_ != nullptr) {
return cap_->Decode(data, dataSize, frameId, userData);
}
else {
return ACLLITE_ERROR_UNSURPPORT_VIDEO_CAPTURE;
}
}
AclLiteError VideoDecoder::Decode(const std::shared_ptr<uint8_t> data, uint32_t dataSize, uint64_t frameId, void* userData) {
void* buffer = CopyDataToDevice(data.get(), dataSize,
UserData* tempUserData = new UserData;
tempUserData->decoderSelf = (void*)this;
tempUserData->userData = userData;
ret = dvppVdec_->Process(videoFrame, tempUserData);
AclLiteError VdecHelper::Process(shared_ptr<FrameData> frameData, void* userData) {
// create input desc
AclLiteError atlRet = CreateInputStreamDesc(frameData);
if (atlRet != ACLLITE_OK) {
ACLLITE_LOG_ERROR("Create stream desc failed");
return atlRet;
}
if (!frameData->isFinished) {
// create out desc
atlRet = CreateOutputPicDesc(outputPicSize_.load());
if (atlRet != ACLLITE_OK) {
ACLLITE_LOG_ERROR("Create pic desc failed");
return atlRet;
}
}
else {
outputPicDesc_ = acldvppCreatePicDesc();
if (outputPicDesc_ == nullptr) {
ACLLITE_LOG_ERROR("Create vdec output pic desc failed");
return ACLLITE_ERROR_CREATE_PIC_DESC;
}
}
// send data to dvpp vdec to decode
ret = aclvdecSendFrame(vdecChannelDesc_, inputStreamDesc_,
outputPicDesc_, nullptr, userData);  // This is the final aclvdecSendFrame; from here control passes to the registered callback.
if (ret != ACL_SUCCESS) {
ACLLITE_LOG_ERROR("Send frame to vdec failed, errorno:%d", ret);
return ACLLITE_ERROR_VDEC_SEND_FRAME;
}
return ACLLITE_OK;
}
So in the end, all the real work lands in VdecHelper.
3 Bottom-up callback chain after decoded data is produced
Going further: where does control go after the SendFrame?
First to the callback function:
void VideoDecoder::DvppVdecCallbackV2(hi_video_frame_info *frame, void *userdata) {
which then calls
int VideoDecoder::DecodeCallback(const std::shared_ptr<ImageData> decodedImage, uint32_t frameId, void* userData) {
return config_.callbackFunc(decodedImage, channelId_, frameId, userData);
}
which then invokes
config_.callbackFunc(decodedImage, channelId_, frameId, userData);
This callbackFunc is in fact:
static int CallBackVdec(const std::shared_ptr<acllite::ImageData> decoded_image, uint32_t channel_id, uint32_t frame_id, void* user_data) {
auto decoder = reinterpret_cast<DecoderAcl *>(user_data);
decoder->OnFrame(decoded_image, channel_id, frame_id);
return 0;
}
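CallBackVdec is the usual C-callback trampoline: `this` travels through the opaque void* user_data slot and is cast back so a static function can re-enter a member function. A minimal, self-contained version of the pattern (all names here are hypothetical):

```cpp
#include <cstdint>

// The C-style callback signature the lower layer knows about: a plain
// function pointer plus an opaque void* handed back untouched.
using FrameCallback = int (*)(uint32_t frame_id, void* user_data);

struct Receiver {
    uint32_t last_frame = 0;
    void OnFrame(uint32_t frame_id) { last_frame = frame_id; }

    // Static trampoline: recover `this` from user_data, then dispatch
    // to the member function, exactly as CallBackVdec does.
    static int Trampoline(uint32_t frame_id, void* user_data) {
        auto* self = reinterpret_cast<Receiver*>(user_data);
        self->OnFrame(frame_id);
        return 0;
    }
};

// Stand-in for the decoder core invoking the registered callback.
inline int EmitFrame(FrameCallback cb, void* user_data, uint32_t id) {
    return cb(id, user_data);
}
```

This is also why VideoDecoder::Decode wraps things in a UserData struct earlier: it needs to carry both its own `this` and the caller's pointer through a single void*.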
Next comes:
// IVDecResult
void DecoderAcl::OnFrame(const std::shared_ptr<acllite::ImageData> codec_image, uint32_t channel_id, uint32_t frame_id) {
BufSurface *surf = nullptr;
if (create_params_.GetBufSurf(&surf, codec_image->width, codec_image->height, CastColorFmt(codec_image->format),
create_params_.surf_timeout_ms, create_params_.userdata) < 0) {
LOG(ERROR) << "[InferServer] [DecoderAcl] OnFrame(): Get BufSurface failed";
OnError(-1);
return;
}
if (surf->mem_type != BUF_MEMORY_DVPP) {
LOG(ERROR) << "[InferServer] [DecoderAcl] OnFrame(): BufSurface memory type must be BUF_MEMORY_DVPP";
return;
}
switch (codec_image->format) {
case acllite::ImageFormat::YUV_SP_420:
case acllite::ImageFormat::YVU_SP_420:
if (surf->surface_list[0].width != codec_image->width || surf->surface_list[0].height != codec_image->height) {
BufSurface transform_src;
BufSurfaceParams src_param;
memset(&transform_src, 0, sizeof(BufSurface));
memset(&src_param, 0, sizeof(BufSurfaceParams));
src_param.color_format = CastColorFmt(codec_image->format);
src_param.data_size = codec_image->size;
src_param.data_ptr = reinterpret_cast<void *>(codec_image->data.get());
VLOG(5) << "[InferServer] [DecoderAcl] OnFrame(): codec_frame: "
<< " width = " << codec_image->width
<< ", height = " << codec_image->height
<< ", width stride = " << codec_image->alignWidth
<< ", height stride = " << codec_image->alignHeight;
VLOG(5) << "[InferServer] [DecoderAcl] OnFrame(): surf->surface_list[0]: "
<< " width = " << surf->surface_list[0].width
<< ", height = " << surf->surface_list[0].height
<< ", width stride = " << surf->surface_list[0].width_stride
<< ", height stride = " << surf->surface_list[0].height_stride;
src_param.width = codec_image->width;
src_param.height = codec_image->height;
src_param.width_stride = codec_image->alignWidth;
src_param.height_stride = codec_image->alignHeight;
transform_src.batch_size = 1;
transform_src.num_filled = 1;
transform_src.device_id = create_params_.device_id;
transform_src.mem_type = BUF_MEMORY_DVPP;
transform_src.surface_list = &src_param;
TransformParams trans_params;
memset(&trans_params, 0, sizeof(trans_params));
trans_params.transform_flag = TRANSFORM_RESIZE_SRC;
if (Transform(transformer_, &transform_src, surf, &trans_params) < 0) {
LOG(ERROR) << "[InferServer] [DecoderAcl] OnFrame(): Transform failed";
break;
}
}
else {
CALL_ACL_FUNC(acllite::CopyDataToHostEx(surf->surface_list[0].data_ptr, codec_image->size, codec_image->data.get(), codec_image->size, codec_image->deviceId)
, "[DecoderAcl] OnFrame(): copy codec buffer data to surf failed");
}
break;
default:
break;
}
surf->pts = frame_id;
create_params_.OnFrame(surf, create_params_.userdata);
}
Then control reaches class DeviceDecoder's
static int OnFrame_(BufSurface *surf, void *userdata) {
DeviceDecoder *thiz = reinterpret_cast<DeviceDecoder *>(userdata);
return thiz->OnFrame(surf);
}
and then
int DeviceDecoder::OnFrame(BufSurface *surf) {
surf->surface_list[0].width -= surf->surface_list[0].width & 1;
surf->surface_list[0].height -= surf->surface_list[0].height & 1;
BufSurfWrapperPtr wrapper = std::make_shared<BufSurfaceWrapper>(surf);
if (result_) {
result_->OnDecodeFrame(wrapper);
return 0;
}
return -1;
}
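The two `-= ... & 1` lines above round odd dimensions down to even values, which 4:2:0 layouts such as NV12 require because chroma is subsampled 2x2. The same trick in isolation (the helper name is hypothetical):

```cpp
#include <cstdint>

// Clear the lowest bit to round a dimension down to the nearest even
// value, mirroring what DeviceDecoder::OnFrame() does on the surface.
inline uint32_t FloorToEven(uint32_t v) { return v - (v & 1u); }
```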
and then
void FileHandlerImpl::OnDecodeFrame(BufSurfWrapperPtr wrapper) {
if (frame_count_++ % param_.interval != 0) {
// LOGI(SOURCE) << "frames are discarded" << frame_count_;
return; // discard frames
}
std::shared_ptr<CNFrameInfo> data = this->CreateFrameInfo();
if (!data) {
LOGW(SOURCE) << "[FileHandlerImpl] OnDecodeFrame(): failed to create FrameInfo.";
return;
}
data->timestamp = wrapper->GetPts();
if (!wrapper->GetBufSurface()) {
data->flags = static_cast<size_t>(CNFrameFlag::CN_FRAME_FLAG_INVALID);
this->SendFrameInfo(data);
return;
}
#if 0
BufSurface* src_buf = wrapper->GetBufSurface();
std::vector<uint8_t> indata(src_buf->surface_list[0].data_size);
MemcpyHD(indata.data(), BUF_MEMORY_NORMAL, src_buf->surface_list[0].data_ptr, BUF_MEMORY_NORMAL, src_buf->surface_list[0].data_size);
cv::Mat yuv_mat(src_buf->surface_list[0].height * 3 / 2, src_buf->surface_list[0].width, CV_8UC1, indata.data());
cv::Mat bgr_mat;
cv::cvtColor(yuv_mat, bgr_mat, cv::COLOR_YUV2BGR_NV12);
cv::imwrite("decoded.jpg", bgr_mat);
#endif
int ret = SourceRender::Process(data, std::move(wrapper), frame_id_++, param_);
if (ret < 0) {
LOGE(SOURCE) << "[FileHandlerImpl] OnDecodeFrame(): [" << stream_id_ << "]: Render frame failed";
return;
}
this->SendFrameInfo(data);
}
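The `frame_count_++ % param_.interval != 0` guard at the top of OnDecodeFrame() keeps frame 0 and then every interval-th frame, discarding the rest. The same selection logic in isolation (the function name is hypothetical):

```cpp
#include <cstdint>
#include <vector>

// Reproduce the interval-based frame dropping from OnDecodeFrame():
// a frame is kept only when frame_count % interval == 0, so frame 0
// is kept, then one frame out of every `interval`.
inline std::vector<uint64_t> KeptFrames(uint64_t total, uint64_t interval) {
    std::vector<uint64_t> kept;
    uint64_t frame_count = 0;
    for (uint64_t i = 0; i < total; ++i) {
        if (frame_count++ % interval != 0) continue;  // discard this frame
        kept.push_back(i);
    }
    return kept;
}
```

For example, with interval = 3, frames 0, 3, 6, 9, ... are kept, which is how the source thins the decoded stream before handing frames downstream.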
Then:
bool SendFrameInfo(std::shared_ptr<CNFrameInfo> data) { return handler_->SendData(data); }
bool SendData(std::shared_ptr<CNFrameInfo> data) {
if (this->module_) {
return this->module_->SendData(data);
}
return false;
}
bool Module::TransmitData(std::shared_ptr<CNFrameInfo> data) {
if (!HasTransmit()) {
return true;
}
if (!DoTransmitData(data)) {
return true;
}
return false;
}