基于发布-订阅模型的音视频流分发框架
有时需要同时网络推流和把流封装为某格式,或做一些其它操作。这就需要一个分发流的机制,把同一路流分发给多个使用者去操作,下面实现了一个简易的线程安全的音视频流分发框架。代码如下:
avStreamHub.h
#ifndef STREAMHUB_H
#define STREAMHUB_H
#include <stddef.h>
#include <pthread.h>
typedef void (*AVStreamCallback)(void *data, size_t size, void *arg);
typedef struct Subscriber
{
AVStreamCallback callback;
void *arg;
} Subscriber;
typedef struct AVStreamHub
{
Subscriber *subscribers; // 存储订阅者的数组
size_t size; // 当前订阅者数量
size_t capacity; // 动态数组容量
pthread_mutex_t lock; // 互斥锁
} AVStreamHub;
/**
* @brief 初始化AVStreamHub
* @param hub 指向AVStreamHub结构体的指针
* @param initial_capacity 初始分配的订阅者容量
* @return 0 成功,-1 失败
*/
int avStreamHub_init(AVStreamHub *hub, size_t initial_capacity);
/**
* @brief 添加订阅者
* @param hub 指向AVStreamHub结构体的指针
* @param callback 订阅者的回调函数
* @param arg 回调携带的参数
* @return 0 成功,< 0 失败
*/
int avStreamHub_addSub(AVStreamHub *hub, AVStreamCallback callback, void *arg);
/**
* @brief 删除订阅者
* @param hub 指向AVStreamHub结构体的指针
* @param callback 要移除的订阅者的回调函数
* @return 无
*/
void avStreamHub_removeSub(AVStreamHub *hub, AVStreamCallback callback);
/**
* @brief 将数据流分发给所有已注册的订阅者
* @param hub 指向AVStreamHub结构体的指针
* @param data 指向要分发的数据的指针
* @param size 数据的大小
* @return 无
*/
void avStreamHub_publish(AVStreamHub *hub, void *data, size_t size);
/**
* @brief 销毁AVStreamHub
* @param hub 指向AVStreamHub结构体的指针
* @return 无
*/
void avStreamHub_destroy(AVStreamHub *hub);
#endif // STREAMHUB_H
avStreamHub.c
#include "avStreamHub.h"
#include <stdlib.h>
#include <string.h>
#define INITIAL_CAPACITY 2
int avStreamHub_init(AVStreamHub *hub, size_t initial_capacity)
{
hub->size = 0;
hub->capacity = 0;
if (initial_capacity == 0)
{
initial_capacity = INITIAL_CAPACITY;
}
hub->subscribers = (Subscriber *)malloc(initial_capacity * sizeof(Subscriber));
if (!hub->subscribers)
{
return -1;
}
memset(hub->subscribers, 0, initial_capacity * sizeof(Subscriber));
hub->capacity = initial_capacity;
pthread_mutex_init(&hub->lock, NULL);
return 0;
}
// 添加订阅者
int avStreamHub_addSub(AVStreamHub *hub, AVStreamCallback callback, void *arg)
{
pthread_mutex_lock(&hub->lock);
// 检查该回调是否已存在
for (size_t i = 0; i < hub->size; i++)
{
if (hub->subscribers[i].callback == callback)
{
pthread_mutex_unlock(&hub->lock);
return -1; // 回调已存在
}
}
// 如果容量不足则扩展容量
if (hub->size == hub->capacity)
{
hub->subscribers = (Subscriber *)realloc(hub->subscribers,
hub->capacity * 2 * sizeof(Subscriber));
if (!hub->subscribers)
{
pthread_mutex_unlock(&hub->lock);
return -2;
}
hub->capacity *= 2;
}
// 添加新订阅者
hub->subscribers[hub->size].callback = callback;
hub->subscribers[hub->size].arg = arg;
hub->size++;
pthread_mutex_unlock(&hub->lock);
return 0;
}
// 删除订阅者
void avStreamHub_removeSub(AVStreamHub *hub, AVStreamCallback callback)
{
pthread_mutex_lock(&hub->lock);
for (size_t i = 0; i < hub->size; i++)
{
if (hub->subscribers[i].callback == callback)
{
size_t j = i;
for (; j < hub->size - 1; j++)
{
hub->subscribers[j] = hub->subscribers[j + 1];
}
hub->subscribers[j].callback = NULL;
hub->subscribers[j].arg = NULL;
hub->size--;
break;
}
}
pthread_mutex_unlock(&hub->lock);
}
// 分发数据
void avStreamHub_publish(AVStreamHub *hub, void *data, size_t size)
{
pthread_mutex_lock(&hub->lock);
for (size_t i = 0; i < hub->size; i++)
{
hub->subscribers[i].callback(data, size, hub->subscribers[i].arg);
}
pthread_mutex_unlock(&hub->lock);
}
// 销毁AVStreamHub
void avStreamHub_destroy(AVStreamHub *hub)
{
free(hub->subscribers);
hub->subscribers = NULL;
hub->size = 0;
hub->capacity = 0;
pthread_mutex_destroy(&hub->lock);
}