文本转语音-音画适时推送rtsp并播放
文本语音 rtsp适时播放叫号系统的底层逻辑
发布Linux, unix socket 和window win32做为音频源的 python10下的(ffmpeg version 7.1) 可运行版本.
这两天在弄这个,前2篇是通过虚拟声卡,达到了最简单的一个逻辑,播放文本就从声卡发声,不播无所谓,自动忙音。 那个工作在windows平台,
而今天的这个相似功能的代码是mac os,理论支持windows,和linux,依赖ffmpeg和xiu(一个rust流服务器)的rtsp服务。
今天的难点有点多
- asyncio的任务 async def _tts_worker(self, text: str) 运行中有各种错误, engine runAndWait是不行的。 内部有它的event loop。所以init和endLoop,是暂时找到的解决办法。同时经历了,这个,和调用 ffmpeg 外部指令,并直接获取- 代表的stdout。 会遇到各种问题。做了捕获和处理。但是查找的时候,不是太容易。
- self._start_ffmpeg() 他需要, create socket 或pipe完成以后,才能运行。 调试我都手工在外部启动。 作用就是,输出到rtsp服务器,以备播放。
- input handle,等都是ai生成的,因为有好多种循环,这是比较省心在。
- 最紧急隐蔽在是, async def _heartbeat(self) 他需要计算播放静音的时间,长了不行,短了不行。 这个最初在测试代码,就几个函数。然后AI,生成了三个theading的版本,两个Queue。 然后转到了异步版本,明显快了很多。
- 在windows上使用win32pipen可以达到unix socket的效果很相似, 记得还有FIFO是linux专用的,当然还有stdin,和stdout。对于ffmpeg,这是一些程序内部的传送机制
- rtsp是需要一个后台的服务的,xiu是开源的rust项目,可以使。另外window推荐metamtx,双击运行,什么也不管。
音画同步应该是另个问题了,几天前,鼓捣了一下图片。让编辑后的,马上 在视频中显示。 这个另外一个话题了。做的这些就为了,让报号和点单,有个界面。
ffmpeg -re -framerate 30 -f image2 -loop 1 -i "image1.jpg" -c:v libx264 -preset ultrafast -tune zerolatency -pix_fmt rgba -f rtsp -rtsp_transport tcp rtsp://localhost:8554/live
合并的代码,就当成剩下的作业,有空再来做。
对于刚接触的,最好是慢慢和AI调试着来,一些功能就做出来。
语音推送使用ffmpeg独立进程,实现了前后中断后自动重启。
程序主体
可独立运行,也可以结合ffmg管理推送进程
- macos ,理论Linux适用,单文件可执行
main.py
import asyncio
import struct
import pyttsx3
import tempfile
import os
import socket
from aioconsole import ainput
from contextlib import suppress
from typing import Optional
class AsyncTTSController:
def __init__(self):
# 使用Unix域套接字
self.socket_path = "/tmp/tts_audio.sock"
self.server_socket: Optional[socket.socket] = None
self.client_socket: Optional[socket.socket] = None
# 进程控制
self.ffmpeg_process: Optional[asyncio.subprocess.Process] = None
self.running = False
# TTS引擎
self.engine = pyttsx3.init()
self.engine.setProperty('rate', 180)
self.engine.setProperty('volume', 1.0)
# 音频参数
self.sample_rate = 24000
self.channels = 1
self.bits_per_sample = 16
self.silence = self._generate_silence(0.2)
self.wav_header = self._generate_wav_header()
# 状态管理
self.connection_active = False
self.last_heartbeat = 0.0
self.heartbeat_interval = 2.0
self.sending_audio = 0
def _generate_wav_header(self) -> bytes:
"""生成WAV文件头"""
byte_rate = self.sample_rate * self.channels * self.bits_per_sample // 8
block_align = self.channels * self.bits_per_sample // 8
return struct.pack(
'<4sI4s4sIHHIIHH4sI',
b'RIFF', 36, b'WAVE', b'fmt ', 16, 1, self.channels,
self.sample_rate, byte_rate, block_align, self.bits_per_sample,
b'data', 0
)
def _generate_silence(self, duration: float) -> bytes:
"""生成静音数据"""
samples = int(self.sample_rate * duration)
return bytes(samples * self.channels * (self.bits_per_sample // 8))
async def _async_create_socket(self) -> None:
"""创建Unix域套接字"""
with suppress(Exception):
if os.path.exists(self.socket_path):
os.unlink(self.socket_path)
self.server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.server_socket.setblocking(False)
self.server_socket.bind(self.socket_path)
self.server_socket.listen(1)
loop = asyncio.get_running_loop()
while self.running and not self.connection_active:
try:
self.client_socket, _ = await loop.sock_accept(self.server_socket)
self.connection_active = True
print("客户端已连接")
await loop.sock_sendall(self.client_socket, self.wav_header)
except (BlockingIOError, InterruptedError):
await asyncio.sleep(0.1)
except Exception as e:
print(f"连接错误: {str(e)}")
self.connection_active = False
await asyncio.sleep(1)
async def _start_ffmpeg(self) -> None:
"""启动FFmpeg进程"""
with suppress(Exception):
if self.ffmpeg_process:
self.ffmpeg_process.terminate()
await self.ffmpeg_process.wait()
socketid='unix:'+self.socket_path
self.ffmpeg_process = await asyncio.create_subprocess_exec(
'ffmpeg',
'-f', 's16le',
'-ar', str(self.sample_rate),
'-ac', str(self.channels),
'-i', socketid, # 修改输入源为套接字路径
'-c:a', 'aac',
'-f', 'rtsp',
'-rtsp_transport', 'tcp',
'rtsp://localhost:8554/mystream',
stdout=asyncio.subprocess.DEVNULL,
stdin=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.PIPE
)
asyncio.create_task(self._monitor_ffmpeg_errors())
async def _monitor_ffmpeg_errors(self) -> None:
"""监控FFmpeg错误输出"""
while self.running and self.ffmpeg_process:
line = await self.ffmpeg_process.stderr.readline()
if not line:
break
# print(f"[FFmpeg Error] {line.decode().strip()}")
async def _async_write_socket(self, data: bytes) -> None:
"""安全写入套接字"""
try:
if self.client_socket and self.connection_active:
loop = asyncio.get_running_loop()
await loop.sock_sendall(self.client_socket, data)
except (BrokenPipeError, ConnectionResetError):
print("连接已断开,尝试重连...")
await self._reconnect_pipeline()
except Exception as e:
print(f"写入错误: {str(e)}")
self.connection_active = False
async def _reconnect_pipeline(self) -> None:
"""完整重连流程"""
print("启动重连流程...")
self.connection_active = False
if self.client_socket:
self.client_socket.close()
task1=asyncio.create_task(self._async_create_socket()),
task2=asyncio.create_task( self._start_ffmpeg()),
await task2
await task1
# await asyncio.gather(task1, task2)
#await self._async_create_socket()
#await self._start_ffmpeg()
# 剩余的heartbeat、tts_worker、input_handler等方法保持相同...
async def stop(self) -> None:
"""安全关闭"""
self.running = False
with suppress(Exception):
if self.ffmpeg_process:
self.ffmpeg_process.terminate()
await self.ffmpeg_process.wait()
if self.client_socket:
self.client_socket.close()
if self.server_socket:
self.server_socket.close()
if os.path.exists(self.socket_path):
os.unlink(self.socket_path)
print("所有资源已释放")
async def _heartbeat(self) -> None:
"""心跳维持机制"""
while self.running:
if self.connection_active :
for i in range(10):
if self.sending_audio<0:
await self._async_write_socket(self.silence)
else :
self.sending_audio-= 2
await asyncio.sleep(0.2)
# print(self.sending_audio,"slend")
# await asyncio.sleep(self.heartbeat_interval)
else:
await asyncio.sleep(0.5)
def _sync_tts(self,text,tmp_filename):
eng=pyttsx3.init()
# eng.say(text)
eng.save_to_file(text, 'temp3.wav')
eng.runAndWait()
eng.endLoop()
async def _tts_worker(self, text: str) -> None:
"""异步TTS处理核心"""
tmp_filename = None
#with open('audio1.raw','rb') as chunkf:
# data=chunkf.read()
# secdd=len(data)/48000
# self.sending_audio=int(secdd*10)
# await self._async_write_socket(data)
# #await asyncio.sleep(secdd)
# print (secdd,len(data) )
# 创建临时文件
with tempfile.NamedTemporaryFile(delete=False) as tmp:
tmp_filename = tmp.name
# # 同步TTS操作转异步执行
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, self._sync_tts, *(text, 'temp3.wav',))
# 转换音频格式
# await asyncio.sleep(1.3)
# self._sync_tts(text,tmp_filename)
try:
proc = await asyncio.create_subprocess_exec(
'ffmpeg',
'-hide_banner',
'-loglevel', 'error',
'-y',
'-i', 'temp3.wav', # 输入文件路径
'-f', 's16le', # 强制输出格式为PCM s16le
'-acodec', 'pcm_s16le', # 明确指定音频编解码器 👈 关键修复
'-ar', str(self.sample_rate),
'-ac', str(self.channels),
'-', # 输出到标准输出
stdout=asyncio.subprocess.PIPE
)
# 流式发送音频数据
sum=0
while chunk := await proc.stdout.read(4096):
sum+=len(chunk)
await self._async_write_socket(chunk)
self.sending_audio=int(sum*10/48000)
print("write data x0.1s:",self.sending_audio)
finally:
if tmp_filename and os.path.exists(tmp_filename):
1
# os.unlink(tmp_filename)
async def _input_handler(self) -> None:
"""异步输入处理"""
while self.running:
try:
text = await ainput("请输入文本(输入q退出): ")
if text.lower() == 'q':
self.running = False
break
if text.strip():
await self._tts_worker(text)
except Exception as e:
print(f"输入错误: {str(e)}")
async def run(self) -> None:
"""主运行循环"""
self.running = True
#
#await self._start_ffmpeg()
tasks = [
asyncio.create_task(self._async_create_socket()),
asyncio.create_task( self._start_ffmpeg()),
asyncio.create_task(self._input_handler()),
asyncio.create_task(self._heartbeat()),
]
await asyncio.gather(*tasks)
# 以下保持不变...
if __name__ == "__main__":
controller = AsyncTTSController()
try:
asyncio.run(controller.run())
except KeyboardInterrupt:
asyncio.run(controller.stop())
"""
ffmpeg -y -i temp.wav -f s16le -acodec pcm_s16le -ar 24000 -ac 1 audio.raw
ffmpeg -ar 24000 -ac 1 -f s16le -i unix:/tmp/tts_audio.sock -f rtsp rtsp://localhost:8554/mystream
"""
- window10系统python10 可运行版本
主要让deepseek,执行了,socket 到 win32pipen的替换.因为本来就是换过去的.这一块的代码完全没有手工介入. 唯一改的是注释eng.endLoop(),并不用每次init() ,应改是pyttsx3的一个跨平台特性. ,异步的win32下支持稳定.
def _sync_tts(self, text, tmp_filename):
eng = self.engine #pyttsx3.init()
eng.save_to_file(text, 'temp3.wav')
eng.runAndWait()
# eng.endLoop()
main-win.py
import asyncio
import struct
import pyttsx3
import tempfile
import os
from aioconsole import ainput
from contextlib import suppress
from typing import Optional
import win32pipe
import win32file
import pywintypes
class AsyncTTSController:
def __init__(self):
# 使用Windows命名管道
self.pipe_name = r'\\.\pipe\tts_audio_pipe'
self.pipe_handle = None
# 进程控制
self.ffmpeg_process: Optional[asyncio.subprocess.Process] = None
self.running = False
# TTS引擎
self.engine = pyttsx3.init()
self.engine.setProperty('rate', 180)
self.engine.setProperty('volume', 1.0)
# 音频参数
self.sample_rate = 24000
self.channels = 1
self.bits_per_sample = 16
self.silence = self._generate_silence(0.2)
self.wav_header = self._generate_wav_header()
# 状态管理
self.connection_active = False
self.last_heartbeat = 0.0
self.heartbeat_interval = 2.0
self.sending_audio = 0
def _generate_wav_header(self) -> bytes:
"""生成WAV文件头"""
byte_rate = self.sample_rate * self.channels * self.bits_per_sample // 8
block_align = self.channels * self.bits_per_sample // 8
return struct.pack(
'<4sI4s4sIHHIIHH4sI',
b'RIFF', 36, b'WAVE', b'fmt ', 16, 1, self.channels,
self.sample_rate, byte_rate, block_align, self.bits_per_sample,
b'data', 0
)
def _generate_silence(self, duration: float) -> bytes:
"""生成静音数据"""
samples = int(self.sample_rate * duration)
return bytes(samples * self.channels * (self.bits_per_sample // 8))
async def _async_create_pipe(self) -> None:
"""创建命名管道"""
while self.running and not self.connection_active:
try:
# 创建命名管道
self.pipe_handle = win32pipe.CreateNamedPipe(
self.pipe_name,
win32pipe.PIPE_ACCESS_DUPLEX,
win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_READMODE_BYTE | win32pipe.PIPE_WAIT,
1, # 最大实例数
65536, 65536, # 输入输出缓冲区大小
0, # 默认超时
None # 安全属性
)
# 异步等待连接
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, win32pipe.ConnectNamedPipe, self.pipe_handle, None)
self.connection_active = True
print("客户端已连接")
await self._async_write_socket(self.wav_header)
except pywintypes.error as e:
if e.winerror == 536: # ERROR_PIPE_CONNECTED
self.connection_active = True
print("客户端已连接")
elif e.winerror == 232: # 客户端断开
print("客户端断开连接")
self.connection_active = False
if self.pipe_handle:
win32file.CloseHandle(self.pipe_handle)
self.pipe_handle = None
await asyncio.sleep(1)
else:
print(f"管道错误: {e}")
await asyncio.sleep(1)
except Exception as e:
print(f"其他错误: {e}")
await asyncio.sleep(1)
async def _start_ffmpeg(self) -> None:
"""启动FFmpeg进程"""
with suppress(Exception):
if self.ffmpeg_process:
self.ffmpeg_process.terminate()
await self.ffmpeg_process.wait()
self.ffmpeg_process = await asyncio.create_subprocess_exec(
'ffmpeg',
'-f', 's16le',
'-ar', str(self.sample_rate),
'-ac', str(self.channels),
'-i', self.pipe_name,
'-c:a', 'aac',
'-f', 'rtsp',
'-rtsp_transport', 'tcp',
'rtsp://localhost:8554/mystream',
stdout=asyncio.subprocess.DEVNULL,
stdin=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.PIPE
)
asyncio.create_task(self._monitor_ffmpeg_errors())
async def _monitor_ffmpeg_errors(self) -> None:
"""监控FFmpeg错误输出"""
while self.running and self.ffmpeg_process:
line = await self.ffmpeg_process.stderr.readline()
if not line:
break
# print(f"[FFmpeg Error] {line.decode().strip()}")
async def _async_write_socket(self, data: bytes) -> None:
"""安全写入管道"""
try:
if self.connection_active and self.pipe_handle:
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, win32file.WriteFile, self.pipe_handle, data)
except pywintypes.error as e:
print(f"写入错误: {e}")
self.connection_active = False
await self._reconnect_pipeline()
except Exception as e:
print(f"其他写入错误: {e}")
self.connection_active = False
async def _reconnect_pipeline(self) -> None:
"""完整重连流程"""
print("启动重连流程...")
self.connection_active = False
if self.pipe_handle:
win32file.CloseHandle(self.pipe_handle)
self.pipe_handle = None
await asyncio.gather(
self._async_create_pipe(),
self._start_ffmpeg()
)
async def _heartbeat(self) -> None:
"""心跳维持机制"""
while self.running:
if self.connection_active:
for i in range(10):
if self.sending_audio < 0:
await self._async_write_socket(self.silence)
else:
self.sending_audio -= 2
await asyncio.sleep(0.2)
else:
await asyncio.sleep(0.5)
def _sync_tts(self, text, tmp_filename):
eng = pyttsx3.init()
eng.save_to_file(text, 'temp3.wav')
eng.runAndWait()
# eng.endLoop()
async def _tts_worker(self, text: str) -> None:
"""异步TTS处理核心"""
await asyncio.get_event_loop().run_in_executor(None, self._sync_tts, text, 'temp3.wav')
try:
proc = await asyncio.create_subprocess_exec(
'ffmpeg',
'-hide_banner',
'-loglevel', 'error',
'-y',
'-i', 'temp3.wav',
'-f', 's16le',
'-acodec', 'pcm_s16le',
'-ar', str(self.sample_rate),
'-ac', str(self.channels),
'-',
stdout=asyncio.subprocess.PIPE
)
sum_bytes = 0
while chunk := await proc.stdout.read(4096):
sum_bytes += len(chunk)
await self._async_write_socket(chunk)
self.sending_audio = int(sum_bytes * 10 / 48000)
print(f"写入数据 x0.1s: {self.sending_audio}")
finally:
if os.path.exists('temp3.wav'):
os.remove('temp3.wav')
async def _input_handler(self) -> None:
"""异步输入处理"""
while self.running:
try:
text = await ainput("请输入文本(输入q退出): ")
if text.lower() == 'q':
self.running = False
break
if text.strip():
await self._tts_worker(text)
except Exception as e:
print(f"输入错误: {str(e)}")
async def run(self) -> None:
"""主运行循环"""
self.running = True
tasks = [
asyncio.create_task(self._async_create_pipe()),
asyncio.create_task(self._start_ffmpeg()),
asyncio.create_task(self._input_handler()),
asyncio.create_task(self._heartbeat()),
]
await asyncio.gather(*tasks)
async def stop(self) -> None:
"""安全关闭"""
self.running = False
with suppress(Exception):
if self.ffmpeg_process:
self.ffmpeg_process.terminate()
await self.ffmpeg_process.wait()
if self.pipe_handle:
win32pipe.DisconnectNamedPipe(self.pipe_handle)
win32file.CloseHandle(self.pipe_handle)
print("所有资源已释放")
if __name__ == "__main__":
controller = AsyncTTSController()
try:
asyncio.run(controller.run())
except KeyboardInterrupt:
asyncio.run(controller.stop())
独立的ffmpeg启动和监控的独立代码
验证了一下rtsp断线重建连结,也验证了 上面 的main.py的socket server退出后,ffmpeg自动重启连接。 要使用这个更稳健球的程序,需要
注释main.py run中的asyncio.create_task( self._start_ffmpeg()),
代码不用修改,把管道名这个快, 彻底修改为,ffmpeg认识的window样式,就可运行.
r’\.\pipe\tts_audio_pipe’
ffmg,py
import asyncio
from contextlib import suppress
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class FFmpegManager:
def __init__(self):
self.ffmpeg_process = None
self._retry_count = 0
self._max_retries = 5
self._retry_lock = asyncio.Lock()
self._is_running = False
self.sample_rate=24000
self.channels=1
self.socket_path = "/tmp/tts_audio.sock"
async def _start_ffmpeg(self) -> None:
"""带自动重试的FFmpeg启动函数"""
async with self._retry_lock:
await self._safe_terminate()
try:
socketid = 'unix:' + self.socket_path
self.ffmpeg_process =await asyncio.create_subprocess_exec(
'ffmpeg',
'-f', 's16le',
'-ar', str(self.sample_rate),
'-ac', str(self.channels),
'-i', socketid, # 修改输入源为套接字路径
'-c:a', 'aac',
'-f', 'rtsp',
'-rtsp_transport', 'tcp',
'rtsp://localhost:8554/mystream',
stdout=asyncio.subprocess.DEVNULL,
stdin=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.PIPE
)
self._retry_count = 0 # 重置重试计数器
asyncio.create_task(self._monitor_ffmpeg_errors())
self._is_running = True
except Exception as e:
logging.error(f"FFmpeg启动失败: {str(e)}")
await self._handle_retry()
async def _monitor_ffmpeg_errors(self):
"""增强型进程监控"""
while self._is_running:
logging.info("loop error cathch")
stderr = await self.ffmpeg_process.stderr.readline()
if stderr:
logging.error(f"FFmpeg错误输出: {stderr.decode().strip()}")
# 检测进程状态
return_code = self.ffmpeg_process.returncode
if return_code is not None:
logging.warning(f"FFmpeg异常退出,返回码: {return_code}")
self._is_running = False
await self._handle_retry()
break
async def _handle_retry(self):
"""智能重试策略"""
if self._retry_count >= self._max_retries:
logging.critical("达到最大重试次数,放弃重启")
return
# 指数退避算法
delay = min(2 ** self._retry_count, 30) # 最大间隔30秒
self._retry_count += 1
logging.info(f"将在 {delay} 秒后尝试第 {self._retry_count} 次重启")
await asyncio.sleep(delay)
await self._start_ffmpeg()
async def _safe_terminate(self):
"""安全终止现有进程"""
if self.ffmpeg_process:
with suppress(Exception):
self.ffmpeg_process.terminate()
await self.ffmpeg_process.wait()
self.ffmpeg_process = None
# 以下保持不变...
async def main():
controller=FFmpegManager()
try:
await controller._start_ffmpeg()
logging.info('rung')
await asyncio.sleep(1160)
except KeyboardInterrupt:
logging.info(3)
asyncio.run(controller._safe_terminate())
if __name__ == "__main__":
asyncio.run(main())