当前位置: 首页 > article >正文

python实现webrtc通过whep拉取实时音频流

需求背景:通过 WHEP 方式从流媒体服务器平台(基于 SRS 服务器改造的平台)拉取实时音频流。服务端下发的是 48kHz、16bit、双声道音频流;接收到数据后,先转换成 16kHz、16bit、单声道音频流,再对其进行 base64 编码(注意:base64 是编码而非加密),最后通过 WebSocket 传递给第三方。

1:通道及轨道的建立

class AudioTrack(MediaStreamTrack):
    """Audio-kind media track handed to the peer connection's transceiver.

    The class adds no behaviour of its own: ``recv`` simply defers to the
    parent implementation, and ``frames`` starts out as an empty list.
    """

    kind = "audio"

    def __init__(self):
        super().__init__()
        # Per-instance frame list; nothing in this class appends to it.
        self.frames = list()

    async def recv(self):
        """Return whatever the parent class's ``recv`` produces."""
        received = await super().recv()
        return received
    

    async def pull_run(self, url, trace_id, stmId, timestamp):
        """Pull a live audio stream over WHEP and forward it through the WebSocket client.

        Does nothing when ``self.pc`` already exists. Otherwise it creates the
        peer connection, registers the track / connection-state handlers and
        performs the SDP offer/answer exchange with ``url``. Every received
        audio frame is passed to ``convert_audio_frames_to_bytes``; whenever
        that returns a chunk, the chunk is base64-encoded and sent as a message.

        Args:
            url: WHEP endpoint the SDP offer is POSTed to.
            trace_id: Copied into the ``traceId`` field of every message.
            stmId: Stream id; copied into every message and used in log lines.
            timestamp: Only logged when the connection is created (each
                message carries ``time.time()`` instead).

        Returns:
            None. When negotiation fails the error is logged, the peer
            connection is released (``self.pc`` reset to ``None``) and the
            method returns early, so a later call can retry.
        """
        if self.pc is not None:
            return

        rtc_conf = RTCConfiguration()
        rtc_conf.iceServers = []
        self.pc = RTCPeerConnection(rtc_conf)
        logger.info(
            f"创建RTCPeerConnection, url: {url}, trace_id: {trace_id}, stmId: {stmId}, timestamp: {timestamp}")

        # Add the audio track as a receive-only transceiver.
        audio_track = AudioTrack()
        self.pc.addTransceiver(audio_track, "recvonly")
        # NOTE(review): aiortc does not appear to call an `onicecandidate`
        # attribute (no trickle ICE) — confirm whether this hook ever fires.
        self.pc.onicecandidate = lambda candidate: asyncio.create_task(send_candidate(candidate))

        from semantic_business_app.websocket.client_manager import WebSocketClientSingleton
        client = WebSocketClientSingleton.get_instance()

        self.byte_io, self.output, stream = self.initialize_audio_stream()

        async def _discard_peer_connection():
            # Release the half-initialised connection so it does not leak and
            # so the `self.pc is not None` guard does not block a retry.
            stale_pc, self.pc = self.pc, None
            if stale_pc is None:
                return
            try:
                await stale_pc.close()
            except Exception as close_err:
                logger.error(f"stmId={stmId} failed to release peer connection: {close_err}")

        # Handle the incoming audio track.
        @self.pc.on("track")
        async def on_track(track):
            if track.kind != "audio":
                return
            logger.info("Receiving audio track")
            while True:
                try:
                    frame = await track.recv()
                except Exception as recv_err:
                    # recv() raising means the track ended or the connection
                    # was closed; leave the loop instead of dying with an
                    # unhandled exception inside the event handler.
                    logger.info(f"stmId={stmId} audio track ended, stop pulling: {recv_err}")
                    break

                # The former `frame.to_ndarray() is not None` test was always
                # true and converted every frame for nothing, so it is gone.
                coby = self.convert_audio_frames_to_bytes(self.byte_io, self.output, stream, frame)
                if not coby:
                    continue

                # Build the outgoing message.
                message = {
                    "traceId": trace_id,
                    "stmId": stmId,
                    "timestamp": time.time(),
                    "data": base64.b64encode(coby).decode('utf-8')
                }
                # Lazy %-formatting: the base64 payload is only rendered when
                # debug logging is actually enabled.
                logger.debug("拉流message: %s", message)
                if client.ws and client.ws.sock and client.ws.sock.connected:  # connection check
                    logger.debug(f"stmId={stmId}-有效websocket链接,消息发送")
                    client.send_message(message)
                    # Empty byte_io so the next batch starts from scratch.
                    self.byte_io.seek(0)
                    self.byte_io.truncate(0)
                else:
                    logger.info(f"WebSocket连接无效,拉流信息无法无法下发,stmId={stmId}直接断开连接。")
                    # NOTE(review): called without `await`, as in the original —
                    # confirm pull_manager.close_connection is synchronous.
                    client.pull_manager.close_connection(stmId)
                    # Nothing can be delivered any more; stop reading frames.
                    break

        # Register before negotiating so state changes during connection
        # setup (including failures) are logged as well.
        self.pc.on("connectionstatechange", self.on_connection_state_change)

        try:
            # Create the offer and send it.
            offer = await self.pc.createOffer()
            await self.pc.setLocalDescription(offer)
            # Send pc.localDescription rather than the bare offer: aiortc adds
            # the ICE credentials/candidates and DTLS fingerprint to the local
            # description only after setLocalDescription() has completed.
            answer = await send_sdp(self.pc.localDescription, url)
            if not answer:
                logger.info(f"stmId={stmId} 无效answer,连接失败")
                await _discard_peer_connection()
                return
            await self.pc.setRemoteDescription(answer)
            self.is_connected = True
            logger.info(f"stmId={stmId} pull设置远程描述成功")
        except Exception as e:
            logger.error(f"stmId={stmId} 连接过程出错: {e}")
            await _discard_peer_connection()
            return


    def on_connection_state_change(self):
        """Log the peer connection's current connection state and ICE state."""
        peer = self.pc
        logger.info(f"连接状态: {peer.connectionState}, ICE连接状态: {peer.iceConnectionState}")


    async def send_sdp(e_sdp, url):
        """POST an SDP offer to the WHEP endpoint and return the SDP answer.

        Args:
            e_sdp: Session description whose ``sdp`` attribute holds the offer text.
            url: WHEP endpoint URL.

        Returns:
            An ``RTCSessionDescription`` of type ``'answer'`` built from the
            response body, or ``None`` when the server replies with a non-2xx
            status or an empty body, so the caller's ``if answer:`` check can
            detect the failure.
        """
        payload = e_sdp.sdp.encode()
        headers = {
            "Content-Type": "application/sdp",
            # Byte length of the encoded body; the character count of the str
            # is wrong as soon as the SDP contains a non-ASCII character.
            "Content-Length": str(len(payload)),
        }
        async with aiohttp.ClientSession() as session:
            # NOTE: ssl=False disables TLS certificate verification — only
            # acceptable for trusted/internal endpoints.
            async with session.post(url, data=payload, headers=headers, ssl=False) as response:
                response_data = await response.text()
                if not 200 <= response.status < 300:
                    # An error page must not be handed on as an SDP answer.
                    logger.error(f"WHEP request failed, status={response.status}, body={response_data}")
                    return None
                if not response_data.strip():
                    logger.error(f"WHEP request returned an empty answer, status={response.status}")
                    return None
                return RTCSessionDescription(sdp=response_data, type='answer')

    
    # 发送候选
    async def send_candidate(candidate):
        """Log a gathered ICE candidate; a falsy candidate is ignored."""
        if not candidate:
            return
        logger.info(f"收集到的候选: {candidate}")

2:编码转换及转换成可发送的字节数据

    # 初始化音频流,默认16000采样率
    def initialize_audio_stream(self, sample_rate=16000):
        """Open an in-memory WAV container with one mono ``pcm_s16le`` stream.

        Also sets ``self.file_flag`` and ``self.byte_flag`` to ``True``.

        Args:
            sample_rate: Rate passed to the output stream (default 16000).

        Returns:
            A ``(byte_io, output, stream)`` tuple: the ``BytesIO`` buffer, the
            container opened on top of it, and the stream added to it.
        """
        buffer = io.BytesIO()
        container = av.open(buffer, mode='w', format='wav')
        audio_stream = container.add_stream('pcm_s16le', rate=sample_rate, layout='mono')
        self.file_flag = self.byte_flag = True
        return buffer, container, audio_stream

    def convert_audio_frames_to_bytes(self, byte_io, output, stream, frame):
        """Encode ``frame`` into ``output`` and return the buffer once it is big enough.

        Args:
            byte_io: Buffer whose accumulated content is inspected.
            output: Container that the encoded packets are muxed into.
            stream: Stream used to encode ``frame``.
            frame: The audio frame to encode.

        Returns:
            The full content of ``byte_io`` once its length reaches
            ``self.chunk_size``; otherwise ``None``. ``None`` is also returned
            when either ``self.file_flag`` or ``self.byte_flag`` is falsy, or
            when an ``av.AVError`` is raised (the error is logged).
        """
        try:
            if not (self.file_flag and self.byte_flag):
                return None

            encoded = stream.encode(frame)  # encode the frame into packet(s)
            if encoded:
                output.mux(encoded)  # write the encoded packet(s) into the byte stream

            buffered = byte_io.getvalue()
            # Only hand the data out once enough has accumulated; the caller
            # sends it over a websocket, so chunks are bounded by this
            # threshold instead of growing without limit.
            return buffered if len(buffered) >= self.chunk_size else None
        except av.AVError as e:
            logger.info("pull流已关闭, 无效数据: %s", e)
            return None

3:轨道及通道关闭

    async def close_connection(self):
        """Stop the received audio tracks and close the peer connection.

        Does nothing when ``self.pc`` is falsy. Cleanup is best-effort: any
        error is logged rather than raised. The peer connection is still
        closed when stopping a track fails, and ``self.pc`` /
        ``self.is_connected`` are always reset, so a failed shutdown cannot
        leave stale state that blocks a later reconnect.
        """
        pc = self.pc
        if not pc:
            return

        try:
            try:
                # Stop every audio track attached to a receiver.
                for receiver in pc.getReceivers():
                    track = receiver.track
                    if not track or track.kind != "audio":
                        continue
                    logger.info("停止pull所有音频轨道")
                    stop = getattr(track, 'stop', None)
                    if not callable(stop):
                        continue
                    if asyncio.iscoroutinefunction(stop):
                        await stop()  # coroutine function: must be awaited
                    else:
                        stop()  # plain function: call directly
            finally:
                # Close the connection even if stopping a track raised.
                await pc.close()
            logger.info("关闭pull连接")
        except Exception as e:
            logger.error(f"关闭连接时出错: {e}")
        finally:
            # Always drop the reference and clear the flag, success or not.
            self.pc = None
            self.is_connected = False

 


http://www.kler.cn/a/508489.html

相关文章:

  • 分频器code
  • 第4章 Kafka核心API——Kafka客户端操作
  • Power Automate 实现字符串分割、替换、换行显示
  • 贪心算法(题1)区间选点
  • MyBatis(四)参数与配置详解
  • 利用EXCEL进行XXE攻击
  • [leetcode](适合有一定基础需要刷题的宝宝)map STL的增删查改
  • 怎么修复损坏的U盘?而且不用格式化的方式!
  • (一)相机标定——四大坐标系的介绍、对应转换、畸变原理以及OpenCV完整代码实战(C++版)
  • MySQL下载安装及配置
  • mysql-5.7.18保姆级详细安装教程
  • 数据仓库复用性:业务需求复用性设计
  • Mac 使用 GVM 管理多版本 Go 环境
  • Big-endian(大端字节序)与Little-endian(小端字节序)区别
  • 【数据库】MySQL数据库SQL语句汇总
  • 基于微信小程序的电子点菜系统设计与实现(KLW+源码+讲解)
  • MySQL 与 Redis 数据一致性 2
  • Python使用seleniumwire接管Chrome查看控制台中参数
  • Debian 设定 tomcat 定时重启
  • LabVIEW时域近场天线测试
  • Django创建项目速成
  • ESP32云开发二( http + led + lcd)
  • Whisper-Medium 模型:音频转文本的原理、实践与硬件推荐
  • 深度学习-86-大模型训练之为什么要设计成预训练和微调两个阶段
  • 第十三章:数据库技术
  • GPT-5 传言:一场正在幕后发生的 AI 变革