python实现webrtc通过whep拉取实时音频流
需求背景:通过whep的方式从流媒体服务器平台(基于srs服务器改造的平台)拉取实时音频流,数据传递采用48khz、16bit、双声道音频流,接收到数据后,转换成16khz、16bit、单声道音频流,并将其以base64编码字节流方式通过websocket传递给第三方;
1:通道及轨道的建立
class AudioTrack(MediaStreamTrack):
    """Receive-only audio track placeholder attached to the peer connection.

    Frames are not buffered here; consumers call ``recv()`` which simply
    delegates to the aiortc base implementation.
    """

    kind = "audio"

    def __init__(self):
        super().__init__()
        # Kept for interface compatibility; nothing in this class appends to it.
        self.frames = []

    async def recv(self):
        # Delegate frame delivery entirely to the base MediaStreamTrack.
        return await super().recv()
async def pull_run(self, url, trace_id, stmId, timestamp):
    """Negotiate a WHEP pull session and forward received audio over WebSocket.

    Creates the RTCPeerConnection on first use, attaches a receive-only audio
    transceiver, performs the WHEP offer/answer exchange against ``url``, and
    for every received frame re-encodes it (via convert_audio_frames_to_bytes)
    and sends it base64-encoded to the singleton WebSocket client.

    Args:
        url: WHEP endpoint the SDP offer is POSTed to.
        trace_id: correlation id echoed in every forwarded message.
        stmId: stream id used for logging and downstream teardown.
        timestamp: caller-supplied timestamp (logging only).
    """
    if self.pc is None:
        rtc_conf = RTCConfiguration()
        rtc_conf.iceServers = []  # SRS-based WHEP works without STUN/TURN here
        self.pc = RTCPeerConnection(rtc_conf)
        logger.info(
            f"创建RTCPeerConnection, url: {url}, trace_id: {trace_id}, stmId: {stmId}, timestamp: {timestamp}")

    # Receive-only: we only pull media from the server.
    audio_track = AudioTrack()
    self.pc.addTransceiver(audio_track, "recvonly")
    self.pc.onicecandidate = lambda candidate: asyncio.create_task(send_candidate(candidate))

    # Imported lazily — presumably to avoid a circular import at module load.
    from semantic_business_app.websocket.client_manager import WebSocketClientSingleton
    client = WebSocketClientSingleton.get_instance()
    self.byte_io, self.output, stream = self.initialize_audio_stream()

    # FIX: register before negotiation so state transitions that happen during
    # connection setup are logged (originally registered only after a
    # successful setRemoteDescription).
    self.pc.on("connectionstatechange", self.on_connection_state_change)

    # Handle incoming audio frames.
    @self.pc.on("track")
    async def on_track(track):
        if track.kind != "audio":
            return
        logger.info("Receiving audio track")
        # FIX: track.recv() raises once the remote stream ends; without this
        # try/except the handler previously died with an unhandled exception.
        try:
            while True:
                frame = await track.recv()
                if frame.to_ndarray() is None:
                    continue
                coby = self.convert_audio_frames_to_bytes(self.byte_io, self.output, stream, frame)
                if not coby:
                    continue  # buffer still below chunk_size — keep accumulating
                message = {
                    "traceId": trace_id,
                    "stmId": stmId,
                    "timestamp": time.time(),
                    "data": base64.b64encode(coby).decode('utf-8')
                }
                logger.debug(f"拉流message: {message}")
                if client.ws and client.ws.sock and client.ws.sock.connected:  # 检查连接状态
                    logger.debug(f"stmId={stmId}-有效websocket链接,消息发送")
                    client.send_message(message)
                    # Reset the buffer so the next batch starts fresh.
                    self.byte_io.seek(0)
                    self.byte_io.truncate(0)
                else:
                    # FIX: typo "无法无法" in the original log message.
                    logger.info(f"WebSocket连接无效,拉流信息无法下发,stmId={stmId}直接断开连接。")
                    client.pull_manager.close_connection(stmId)
                    # FIX: stop the loop once downstream is torn down instead
                    # of re-entering recv() on a closing connection.
                    break
        except Exception as e:
            logger.info(f"stmId={stmId} 音频轨道接收结束: {e}")

    try:
        # Create and send the offer (WHEP offer/answer exchange).
        offer = await self.pc.createOffer()
        await self.pc.setLocalDescription(offer)
        # FIX: send localDescription, not the raw offer — aiortc completes ICE
        # gathering inside setLocalDescription and only localDescription.sdp
        # contains the candidates; WHEP has no trickle-ICE fallback.
        answer = await send_sdp(self.pc.localDescription, url)
        if answer:
            await self.pc.setRemoteDescription(answer)
            self.is_connected = True
            logger.info(f"stmId={stmId} pull设置远程描述成功")
        else:
            logger.info(f"stmId={stmId} 无效answer,连接失败")
            return
    except Exception as e:
        logger.error(f"stmId={stmId} 连接过程出错: {e}")
        return
def on_connection_state_change(self):
    """Log the peer connection's current connection and ICE states."""
    conn_state = self.pc.connectionState
    ice_state = self.pc.iceConnectionState
    logger.info(f"连接状态: {conn_state}, ICE连接状态: {ice_state}")
async def send_sdp(e_sdp, url):
    """POST the local SDP to the WHEP endpoint and return the remote answer.

    Args:
        e_sdp: session description whose ``.sdp`` text is the request body.
        url: WHEP resource URL.

    Returns:
        RTCSessionDescription of type 'answer', or None when the server
        responds with a non-success HTTP status (callers already treat a
        falsy answer as a failed negotiation).
    """
    # Encode once: Content-Length must be the byte length of the encoded
    # body, not the character count of the str (they differ for non-ASCII).
    body = e_sdp.sdp.encode()
    async with aiohttp.ClientSession() as session:
        async with session.post(
            url,
            data=body,
            headers={
                "Content-Type": "application/sdp",
                "Content-Length": str(len(body))
            },
            ssl=False
        ) as response:
            response_data = await response.text()
            # FIX: previously any response body (including an HTML error page)
            # was wrapped as an SDP answer and handed to setRemoteDescription.
            # WHEP answers 201 Created; SRS-based servers may use 200.
            if response.status not in (200, 201):
                return None
            return RTCSessionDescription(sdp=response_data, type='answer')
# Send (currently: only log) a locally gathered ICE candidate.
async def send_candidate(candidate):
    """Log a gathered ICE candidate; candidates are not forwarded anywhere."""
    if not candidate:
        return
    logger.info(f"收集到的候选: {candidate}")
2:编码转换及生成可发送的字节数据
# Initialize the in-memory audio pipeline; default output sample rate 16000 Hz.
def initialize_audio_stream(self, sample_rate=16000):
    """Create an in-memory WAV container with a mono 16-bit PCM stream.

    Args:
        sample_rate: output sampling rate in Hz (defaults to 16000).

    Returns:
        Tuple of (BytesIO buffer, PyAV output container, PyAV audio stream).
    """
    buffer = io.BytesIO()
    container = av.open(buffer, mode='w', format='wav')
    audio_stream = container.add_stream('pcm_s16le', rate=sample_rate, layout='mono')
    # Flags gate convert_audio_frames_to_bytes; both must be True to encode.
    self.file_flag = True
    self.byte_flag = True
    return buffer, container, audio_stream
def convert_audio_frames_to_bytes(self, byte_io, output, stream, frame):
    """Encode one frame into the WAV buffer; return the bytes once full.

    Returns the accumulated buffer contents when they reach ``chunk_size``
    (the cap prevents unbounded growth past the WebSocket payload limit),
    otherwise None while still accumulating.
    """
    try:
        if not (self.file_flag and self.byte_flag):
            return None
        encoded = stream.encode(frame)  # encode the frame into packet(s)
        if encoded:
            output.mux(encoded)  # write the encoded packet(s) into the buffer
        audio_bytes = byte_io.getvalue()
        # Flush only when the configured chunk size is reached; otherwise the
        # buffer would grow until it exceeded the WebSocket send capacity.
        if len(audio_bytes) >= self.chunk_size:
            return audio_bytes
        return None
    except av.AVError as e:
        logger.info("pull流已关闭, 无效数据: %s", e)
        return None
3:轨道及通道关闭
async def close_connection(self):
    """Stop received audio tracks and tear down the peer connection."""
    if not self.pc:
        return
    try:
        # Stop every audio track we are receiving.
        for receiver in self.pc.getReceivers():
            track = receiver.track
            if not track or track.kind != "audio":
                continue
            logger.info("停止pull所有音频轨道")
            stop_fn = getattr(track, 'stop', None)
            if callable(stop_fn):
                # stop() may be sync or async depending on the track type.
                if asyncio.iscoroutinefunction(stop_fn):
                    await stop_fn()
                else:
                    stop_fn()
        await self.pc.close()
        self.pc = None
        self.is_connected = False
        logger.info("关闭pull连接")
    except Exception as e:
        logger.error(f"关闭连接时出错: {e}")