当前位置: 首页 > article >正文

基于发布-订阅模型的音视频流分发框架

有时需要同时网络推流和把流封装为某格式,或做一些其它操作。这就需要一个分发流的机制,把同一路流分发给多个使用者去操作,下面实现了一个简易的线程安全的音视频流分发框架。代码如下:

avStreamHub.h

#ifndef STREAMHUB_H
#define STREAMHUB_H

#include <stddef.h>
#include <pthread.h>

typedef void (*AVStreamCallback)(void *data, size_t size, void *arg);

typedef struct Subscriber
{
    AVStreamCallback callback;
    void *arg;
} Subscriber;

typedef struct AVStreamHub
{
    Subscriber *subscribers; // 存储订阅者的数组
    size_t size;             // 当前订阅者数量
    size_t capacity;         // 动态数组容量
    pthread_mutex_t lock;    // 互斥锁
} AVStreamHub;

/**
 * @brief 初始化AVStreamHub
 * @param hub 指向AVStreamHub结构体的指针
 * @param initial_capacity 初始分配的订阅者容量
 * @return 0 成功,-1 失败
 */
int avStreamHub_init(AVStreamHub *hub, size_t initial_capacity);

/**
 * @brief 添加订阅者
 * @param hub 指向AVStreamHub结构体的指针
 * @param callback 订阅者的回调函数
 * @param arg 回调携带的参数
 * @return 0 成功,< 0 失败
 */
int avStreamHub_addSub(AVStreamHub *hub, AVStreamCallback callback, void *arg);

/**
 * @brief 删除订阅者
 * @param hub 指向AVStreamHub结构体的指针
 * @param callback 要移除的订阅者的回调函数
 * @return 无
 */
void avStreamHub_removeSub(AVStreamHub *hub, AVStreamCallback callback);

/**
 * @brief 将数据流分发给所有已注册的订阅者
 * @param hub 指向AVStreamHub结构体的指针
 * @param data 指向要分发的数据的指针
 * @param size 数据的大小
 * @return 无
 */
void avStreamHub_publish(AVStreamHub *hub, void *data, size_t size);

/**
 * @brief 销毁AVStreamHub
 * @param hub 指向AVStreamHub结构体的指针
 * @return 无
 */
void avStreamHub_destroy(AVStreamHub *hub);

#endif // STREAMHUB_H

avStreamHub.c

#include "avStreamHub.h"
#include <stdlib.h>
#include <string.h>

#define INITIAL_CAPACITY 2

int avStreamHub_init(AVStreamHub *hub, size_t initial_capacity)
{
    hub->size = 0;
    hub->capacity = 0;

    if (initial_capacity == 0)
    {
        initial_capacity = INITIAL_CAPACITY;
    }
    hub->subscribers = (Subscriber *)malloc(initial_capacity * sizeof(Subscriber));
    if (!hub->subscribers)
    {
        return -1;
    }
    memset(hub->subscribers, 0, initial_capacity * sizeof(Subscriber));

    hub->capacity = initial_capacity;
    pthread_mutex_init(&hub->lock, NULL);

    return 0;
}

// 添加订阅者
int avStreamHub_addSub(AVStreamHub *hub, AVStreamCallback callback, void *arg)
{
    pthread_mutex_lock(&hub->lock);

    // 检查该回调是否已存在
    for (size_t i = 0; i < hub->size; i++)
    {
        if (hub->subscribers[i].callback == callback)
        {
            pthread_mutex_unlock(&hub->lock);
            return -1; // 回调已存在
        }
    }

    // 如果容量不足则扩展容量
    if (hub->size == hub->capacity)
    {
        hub->subscribers = (Subscriber *)realloc(hub->subscribers,
                                                 hub->capacity * 2 * sizeof(Subscriber));
        if (!hub->subscribers)
        {
            pthread_mutex_unlock(&hub->lock);
            return -2;
        }
        hub->capacity *= 2;
    }

    // 添加新订阅者
    hub->subscribers[hub->size].callback = callback;
    hub->subscribers[hub->size].arg = arg;
    hub->size++;

    pthread_mutex_unlock(&hub->lock);
    return 0;
}

// 删除订阅者
void avStreamHub_removeSub(AVStreamHub *hub, AVStreamCallback callback)
{
    pthread_mutex_lock(&hub->lock);

    for (size_t i = 0; i < hub->size; i++)
    {
        if (hub->subscribers[i].callback == callback)
        {
            size_t j = i;
            for (; j < hub->size - 1; j++)
            {
                hub->subscribers[j] = hub->subscribers[j + 1];
            }
            hub->subscribers[j].callback = NULL;
            hub->subscribers[j].arg = NULL;
            hub->size--;
            break;
        }
    }

    pthread_mutex_unlock(&hub->lock);
}

// 分发数据
void avStreamHub_publish(AVStreamHub *hub, void *data, size_t size)
{
    pthread_mutex_lock(&hub->lock);

    for (size_t i = 0; i < hub->size; i++)
    {
        hub->subscribers[i].callback(data, size, hub->subscribers[i].arg);
    }

    pthread_mutex_unlock(&hub->lock);
}

// 销毁AVStreamHub
void avStreamHub_destroy(AVStreamHub *hub)
{
    free(hub->subscribers);
    hub->subscribers = NULL;
    hub->size = 0;
    hub->capacity = 0;
    pthread_mutex_destroy(&hub->lock);
}

http://www.kler.cn/a/293900.html

相关文章:

  • Android 下内联汇编,Android Studio 汇编开发
  • WordPress 2024主题实例镜像
  • 网络安全-Linux基础(bash脚本)
  • PHP爬虫快速获取京东商品详情(代码示例)
  • TVM计算图分割--分割方式
  • SpringBoot整合Freemarker(三)
  • 2024 第十二届重庆国际植保暨新型肥料农药产业博览会
  • 上海大学《2022年836+915自动控制原理真题及答案》 (完整版)
  • GIT:git add命令指定文件夹
  • dubbo 服务消费原理分析之应用级服务发现
  • [论文笔记]Making Large Language Models A Better Foundation For Dense Retrieval
  • 《长得太长也是错?——后端 Long 型 ID 精度丢失的“奇妙”修复之旅》
  • Python精选200Tips:11-20
  • JAVA学习-练习试用Java实现“删除有序数组中的重复项”
  • NLP中文本预处理
  • 反向沙箱-安全上网解决方案
  • [Docker]当下实测可用Docker镜像源
  • 专为游戏行业设计的安全防护盾——游戏盾
  • 企业财税自动化解决方案的安全与合规性保障
  • 完整指南:CNStream流处理多路并发框架适配到NVIDIA Jetson Orin (三) 代码编译、各种问题解决、代码修改
  • 使用Ansible实现高效服务器配置管理的最佳实践
  • el-table使用type=“expand”根据数据条件隐藏展开按钮
  • vue2,vue3基于elementUI的el-table实现复制粘贴功能
  • 【docker】基于docker-compose 安装elasticsearch + kibana + ik分词器(8.10.4版本)
  • HDFS工具类
  • 高级架构师备考计划