【AI实践】xiaozhi-esp32虾哥开源版-分析
语音交互总流程
客户端(ESP32) 服务器
| |
| 本地唤醒词检测"小智" |
| |
| 打开音频通道 |
|------------------------>|
| |
| 发送唤醒词音频 |
|------------------------>|
| |
| 发送唤醒事件 |
|------------------------>|
| {"type":"listen", |
| "state":"detect", |
| "text":"小智"} |
| |
| 开始持续音频传输 |
|------------------------>|
| (Opus编码的音频数据) |
| |
| ASR文本结果 |
|<------------------------|
| {"type":"stt", |
| "text":"天气怎么样"} |
芯片侧
系统层调用
codec调用
语音前端AFE
唤醒词检测
VAD检测
音频输入 -> AFE处理 -> VAD检测 -> 状态回调
| | | |
| | | +-> LED显示
| | | +-> 设备状态更新
| | |
| | +-> AFE_VAD_SPEECH/AFE_VAD_SILENCE
| |
| +-> 回声消除(AEC)
| +-> 语音增强(SE)
|
+-> 16kHz采样率
+-> 单声道/双声道
VAD模式的选择影响:
- 检测灵敏度:
- 更高的模式 = 更高的检测门限
- 更低的模式 = 更容易触发
- 环境适应:
- 嘈杂环境选择高模式
- 安静环境可以用低模式
- 功耗影响:
- 高模式需要更多处理
- 低模式处理负担较轻
- 用户体验:
- 高模式可能需要更大声说话
- 低模式说话音量要求低
在这个项目中选择VAD_MODE_3的原因:
- 提高准确性
- 减少误触发
- 适应各种环境
- 与唤醒词检测配合
你可以根据实际使用环境和需求调整VAD模式:
- 家庭安静环境:可以考虑MODE_1或MODE_2
- 公共场所:建议使用MODE_3
- 开发测试:可以用MODE_1方便调试
唤醒词Oneshot
“你好小智,天气”,连读发音
idf从系统环境变量获取sdk路径。
端云通道
客户端 服务器
| |
|------ 唤醒词检测 ------>|
| |
|------ 开始监听 -------->|
| |
|------ 音频数据 -------->|
| |
|<----- ASR结果 ---------|
| |
|<----- TTS音频 ---------|
| |
|------ 停止监听 -------->|
通信类型
获取ASR文本
TTS接收播放
MQTT服务器 ESP32设备
| |
|--- TTS音频数据(Opus) ---->|
| |
| | 1. 加入解码队列
| |
| | 2. OutputAudio线程处理
| |
| | 3. Opus解码为PCM
| |
| | 4. 重采样(如需要)
| |
| | 5. 输出到DAC
总结
当前采用面板板Wi-Fi方案
单麦克风,没有AEC没有全双工
wakeup-speaking-listening-speaking-listening
Server服务
虾哥的服务没有开源
但是提供了一个使用后台
小智 AI 聊天机器人
烧录后,管理设备填入mac地址;
tts引擎使用的是字节跳动,火山引擎
音色列表--语音技术-火山引擎
LLM可以选择
Server服务(Xinnan科技)
非虾哥服务端,兼容虾哥,其它开发者开发的。
分析下 xiaozhi-esp32-server文件下的,这是服务器端的代码
让我帮你分析服务器端的代码。首先我需要找到相关的文件。
Searched ind:\wsl\github\xiaozhi-ai
1 result
让我看看服务器目录下的文件结构:
Analyzed
d:\wsl\github\xiaozhi-ai\xiaozhi-esp32-server
这是一个基于Python的服务器项目,让我们分析主要的组件:
- 主要入口文件: 让我先查看
app.py
,这是服务器的入口文件:
Analyzed
app.py:(1-100)
让我们看看主要的配置文件 config.yaml:
Analyzed
config.yaml:(1-100)
根据代码分析,这是一个基于Python的语音对话服务器,主要功能和结构如下:
- 主要组件:
-
- WebSocket服务器:处理实时语音通信
- WebUI管理界面:提供Web管理界面(目前处于开发中)
- 多个AI模块:ASR(语音识别)、VAD(语音活动检测)、LLM(大语言模型)、TTS(文本转语音)
- 核心功能:
-
- 语音识别:使用FunASR模型
- 语音活动检测:使用SileroVAD
- 大语言模型:支持多种LLM(ChatGLM、DeepSeek、Ali等)
- 文本转语音:支持EdgeTTS
- 项目结构:
-
app.py
:主入口文件core/
:核心功能实现manager/
:管理界面相关代码models/
:AI模型存储目录config/
:配置文件目录ZhiKongTaiWeb/
:Web界面代码
- 配置特点:
-
- 支持设备认证(可选)
- 可配置不同的AI模型
- 支持多设备管理
- 可自定义提示词和对话风格
- 部署相关:
-
- 提供Docker支持(有Dockerfile和docker-compose配置)
- 默认WebSocket服务端口:8000
- 默认管理界面端口:8002
这是一个功能完整的语音对话服务器,主要用于处理ESP32设备的语音交互。它采用模块化设计,可以灵活配置不同的AI模型和服务。服务器支持实时语音处理,并可以通过WebSocket与客户端(ESP32设备)进行通信。
需要注意的是,管理界面(WebUI)目前还在开发中,建议保持默认配置(disabled)。同时,项目支持Docker部署,便于在不同环境中快速搭建服务。
录音 -> VAD分段 -> 累积音频 -> VAD检测结束 -> ASR整段识别 -> 文本处理 -> LLM对话
录音上传
录音上传和 ASR 处理的完整流程如下:
- 录音数据格式:
- ESP32 将音频编码为 Opus 格式
- 通过 WebSocket 二进制消息发送到服务器
- 采样率为 16kHz,单声道
- 服务器接收流程:
python
CopyInsert
async def handleAudioMessage(conn, audio):
if not conn.asr_server_receive:
logger.bind(tag=TAG).debug(f"前期数据处理中,暂停接收")
return
- VAD 处理流程:
python
CopyInsert
class SileroVAD(VAD):
def is_vad(self, conn, opus_packet):
# 1. 解码 Opus 数据
pcm_frame = self.decoder.decode(opus_packet, 960)
conn.client_audio_buffer += pcm_frame
# 2. 分帧处理(每帧 512 采样点)
while len(conn.client_audio_buffer) >= 512 * 2:
chunk = conn.client_audio_buffer[:512 * 2]
conn.client_audio_buffer = conn.client_audio_buffer[512 * 2:]
# 3. 数据格式转换
audio_int16 = np.frombuffer(chunk, dtype=np.int16)
audio_float32 = audio_int16.astype(np.float32) / 32768.0
audio_tensor = torch.from_numpy(audio_float32)
# 4. VAD 检测
speech_prob = self.model(audio_tensor, 16000).item()
client_have_voice = speech_prob >= self.vad_threshold
# 5. 语音段落检测
if conn.client_have_voice and not client_have_voice:
stop_duration = time.time() * 1000 - conn.client_have_voice_last_time
if stop_duration >= self.silence_threshold_ms:
conn.client_voice_stop = True
- 音频数据累积:
python
CopyInsert
if have_voice == False and conn.client_have_voice == False:
await no_voice_close_connect(conn)
conn.asr_audio.clear()
return
conn.client_no_voice_last_time = 0.0
conn.asr_audio.append(audio)
- ASR 处理触发:
python
CopyInsert
if conn.client_voice_stop:
conn.client_abort = False
conn.asr_server_receive = False
# 调用 ASR 进行语音识别
text, file_path = conn.asr.speech_to_text(conn.asr_audio, conn.session_id)
logger.bind(tag=TAG).info(f"识别文本: {text}")
- 处理模式:
python
CopyInsert
if conn.client_listen_mode == "auto":
have_voice = conn.vad.is_vad(conn, audio)
else:
have_voice = conn.client_have_voice
- 关键参数配置:
python
CopyInsert
self.vad_threshold = config.get("threshold") # VAD 检测阈值
self.silence_threshold_ms = config.get("min_silence_duration_ms") # 静默时长阈值
整个流程的特点:
- 实时处理:
- 流式接收音频数据
- 实时 VAD 检测
- 动态累积有效语音段
- 智能分段:
- 使用 SileroVAD 模型进行语音活动检测
- 基于概率阈值判断语音存在
- 通过静默时长判断语音段结束
- 数据处理:
- Opus 解码为 PCM
- 固定大小分帧处理
- 数据格式规范化
- 双模式支持:
- 自动模式:依赖 VAD 检测
- 手动模式:依赖客户端控制
- 错误处理:
- Opus 解码错误处理
- 异常状态恢复
- 资源清理机制
- 性能优化:
- 缓冲区管理
- 分帧处理减少延迟
- 异步处理避免阻塞
这种设计实现了高效的语音采集和处理流程,能够准确捕获用户的语音输入,并通过 VAD 智能判断语音段落,为后续的 ASR 处理提供高质量的语音数据。
VAD
VAD 的具体实现,特别是它的说话结束判断逻辑:
VAD 不仅在开始检测语音活动,也负责判断说话结束。让我解释一下关键的逻辑:
- 语音开始检测:
python
CopyInsert
# 检测语音活动
speech_prob = self.model(audio_tensor, 16000).item()
client_have_voice = speech_prob >= self.vad_threshold
if client_have_voice:
conn.client_have_voice = True
conn.client_have_voice_last_time = time.time() * 1000
- 使用 Silero VAD 模型计算语音概率
- 如果概率超过阈值(默认 0.5),则认为有语音
- 记录检测到语音的时间戳
- 说话结束判断:
python
CopyInsert
# 如果之前有声音,但本次没有声音,且与上次有声音的时间差已经超过了静默阈值,则认为已经说完一句话
if conn.client_have_voice and not client_have_voice:
stop_duration = time.time() * 1000 - conn.client_have_voice_last_time
if stop_duration >= self.silence_threshold_ms:
conn.client_voice_stop = True
关键参数在配置文件中:
yaml
CopyInsert
VAD:
SileroVAD:
threshold: 0.5 # 语音检测阈值
model_dir: models/snakers4_silero-vad
min_silence_duration_ms: 700 # 静默持续时间阈值,如果说话停顿比较长,可以把这个值设置大一些
说话结束的判定条件是:
- 之前检测到有声音 (
conn.client_have_voice
) - 当前帧没有检测到声音 (
not client_have_voice
) - 从上次检测到声音到现在的时间间隔超过了设定的静默阈值(默认 700ms)
这个机制可以防止:
- 说话中的短暂停顿被误判为说话结束
- 等待时间过长导致的延迟
- 背景噪音的干扰
如果你觉得系统对说话结束的判断太快或太慢,可以通过调整 min_silence_duration_ms
参数来优化:
- 值设置大一些:更容易处理说话时的长停顿,但响应可能会变慢
- 值设置小一些:响应更快,但可能会把说话中的停顿误判为说话结束
- 在 VAD 判停之前,所有有效的音频数据都会被添加到
conn.asr_audio
缓冲区中 - 但是这些音频数据并不会被送到 ASR 模型进行处理
- 只有当
conn.client_voice_stop
为 True(即 VAD 检测到说话结束)时,才会调用speech_to_text
进行识别
这种设计的原因:
- 资源效率:避免频繁调用 ASR 模型,减少计算资源消耗
- 准确性:等待完整的语音段可以获得更好的识别效果
- 实现简单:不需要处理复杂的流式 ASR 逻辑
但这也意味着:
- ASR 不是实时进行的,需要等待整段语音结束
- 所有音频数据都会被缓存在内存中,直到 VAD 判停
- 如果语音段很长,可能会导致一定的延迟
ASR识别
现在我可以为你完整地描述 SenseVoiceSmall 的调用流程:
- 初始化阶段:
python
CopyInsert
# 1. 服务器启动时初始化 ASR 模型
model = AutoModel(
model="models/SenseVoiceSmall",
vad_kwargs={"max_single_segment_time": 30000},
disable_update=True,
hub="hf"
)
音频处理流程:
a. 接收音频数据:
-
- WebSocket 服务器接收来自 ESP32 的 Opus 格式音频数据
- 数据先经过 VAD (语音活动检测) 判断是否有声音
python
CopyInsert
have_voice = conn.vad.is_vad(conn, audio)
b. 音频数据缓存:
-
- 如果检测到有声音,将音频数据添加到缓存
python
CopyInsert
conn.asr_audio.append(audio)
c. 音频转换处理:
-
- 当语音段结束时,开始处理缓存的音频数据
- 将 Opus 格式音频解码为 WAV 格式
python
CopyInsert
def save_audio_to_file(self, opus_data, session_id):
decoder = opuslib_next.Decoder(16000, 1) # 16kHz, 单声道
for opus_packet in opus_data:
pcm_frame = decoder.decode(opus_packet, 960)
pcm_data.append(pcm_frame)
d. 语音识别:
-
- 使用 SenseVoiceSmall 模型进行语音识别
python
CopyInsert
result = self.model.generate(
input=file_path,
cache={},
language="auto",
use_itn=True,
batch_size_s=60,
)
text = rich_transcription_postprocess(result[0]["text"])
后处理流程:
a. 文本处理:
-
- 对识别出的文本进行处理(去除标点符号等)
python
CopyInsert
text_len, text_without_punctuation = remove_punctuation_and_length(text)
b. 命令判断:
-
- 判断是否是退出命令
- 如果不是命令,则进入对话流程
python
CopyInsert
if text_len <= conn.max_cmd_length and await handleCMDMessage(conn, text_without_punctuation):
return
if text_len > 0:
await startToChat(conn, text)
c. 清理工作:
-
- 清理临时音频文件
- 重置 VAD 状态
python
CopyInsert
conn.asr_audio.clear()
conn.reset_vad_states()
- 特点和优化:
-
- 支持实时语音识别
- 使用 VAD 进行语音活动检测,提高识别效率
- 支持自动和手动两种语音检测模式
- 临时文件自动清理机制
- 异步处理,不阻塞主线程
- 支持多种语言自动识别("auto" 模式)
这就是 SenseVoiceSmall 模型在系统中的完整调用流程,从接收音频数据到最终输出文本结果的整个过程。
其中,后处理
步骤总结:
- ASR 基础处理:
-
- 音频文件保存
- 使用 FunASR 模型进行识别
- 使用 ITN 进行文本规范化
- 文本清理:
-
- 去除全角/半角标点
- 去除空格
- 去除表情符号
- 去除首尾特殊字符
- 命令识别:
-
- 检查文本长度
- 匹配退出命令
- 执行相应操作
- 错误处理:
-
- 音频处理异常
- 识别失败处理
- 日志记录
这些处理确保了:
- 文本的规范化和清洁度
- 命令的准确识别
- 与 LLM 的良好交互
- 系统的稳定性
LLM调用
分析 LLM (大语言模型) 的完整调用流程:
- 初始化阶段:
python
CopyInsert
# 在 ConnectionHandler 初始化时
self.llm = _llm # LLM 实例
self.dialogue = Dialogue() # 对话管理器
self.llm_finish_task = False # LLM 任务完成标志
- 对话初始化:
python
CopyInsert
def _initialize_components(self):
# 设置系统提示词
self.prompt = self.config["prompt"]
if self.private_config:
self.prompt = self.private_config.private_config.get("prompt", self.prompt)
# 添加时间信息
if "{date_time}" in self.prompt:
date_time = time.strftime("%Y-%m-%d %H:%M", time.localtime())
self.prompt = self.prompt.replace("{date_time}", date_time)
# 添加系统提示词到对话历史
self.dialogue.put(Message(role="system", content=self.prompt))
- 对话处理流程:
python
CopyInsert
def chat(self, query):
# 1. 添加用户输入到对话历史
self.dialogue.put(Message(role="user", content=query))
# 2. 调用 LLM 生成回复
llm_responses = self.llm.response(
self.session_id,
self.dialogue.get_llm_dialogue()
)
# 3. 流式处理 LLM 响应
response_message = []
start = 0
self.llm_finish_task = False
for content in llm_responses:
response_message.append(content)
# 检查是否被用户中断
if self.client_abort:
start = len(response_message)
break
# 分段处理文本,转换为语音
if is_segment(response_message):
segment_text = "".join(response_message[start:])
segment_text = get_string_no_punctuation_or_emoji(segment_text)
if len(segment_text) > 0:
self.recode_first_last_text(segment_text)
future = self.executor.submit(self.speak_and_play, segment_text)
self.tts_queue.put(future)
start = len(response_message)
# 4. 更新对话历史
self.dialogue.put(Message(
role="assistant",
content="".join(response_message)
))
- LLM 提供者实现(以 ChatGLM 为例):
python
CopyInsert
class LLMProvider(LLMProviderBase):
def __init__(self, config):
self.model_name = config.get("model_name") # glm-4-flash
self.api_key = config.get("api_key")
self.base_url = config.get("url") # https://open.bigmodel.cn/api/paas/v4/
self.client = openai.OpenAI(
api_key=self.api_key,
base_url=self.base_url
)
def response(self, session_id, dialogue):
responses = self.client.chat.completions.create(
model=self.model_name,
messages=dialogue,
stream=True
)
for chunk in responses:
if chunk.choices and chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
- 特点和优化:
-
- 支持流式输出:LLM 生成的文本会实时转换为语音
- 支持用户中断:可以随时停止当前对话
- 分段处理:长文本会被分段处理,提升响应速度
- 对话历史管理:维护完整的对话上下文
- 私有配置支持:可以为不同设备配置不同的 LLM 和提示词
- 错误处理:包含完整的错误处理机制
- 配置示例:
yaml
CopyInsert
LLM:
ChatGLMLLM:
type: openai
model_name: glm-4-flash # 免费模型
url: https://open.bigmodel.cn/api/paas/v4/
api_key: your_api_key
这就是完整的 LLM 调用流程,从接收用户输入到生成回复,再到转换为语音输出的整个过程。系统采用了流式处理和异步执行的方式,可以提供更好的实时交互体验。
TTS调用
LLM 是流式输出的,但会按照句子为单位分段给 TTS。具体流程如下:
- LLM 流式输出:
python
CopyInsert
# 获取 LLM 的流式响应
llm_responses = self.llm.response(self.session_id, self.dialogue.get_llm_dialogue())
# 逐个处理 token
for content in llm_responses:
response_message.append(content)
# 检查是否需要中断
if self.client_abort:
start = len(response_message)
break
# 记录生成时间
end_time = time.time()
self.logger.bind(tag=TAG).debug(f"大模型返回时间时间: {end_time - start_time} 秒, 生成token={content}")
- 按句子分段处理:
python
CopyInsert
# 检查是否可以分段(遇到标点符号)
if is_segment(response_message):
# 将当前积累的 tokens 组合成文本
segment_text = "".join(response_message[start:])
# 清理标点和表情
segment_text = get_string_no_punctuation_or_emoji(segment_text)
if len(segment_text) > 0:
# 记录第一段和最后一段文本
self.recode_first_last_text(segment_text)
# 提交 TTS 任务
future = self.executor.submit(self.speak_and_play, segment_text)
self.tts_queue.put(future)
# 更新起始位置
start = len(response_message)
- 分段判断逻辑:
python
CopyInsert
def is_segment(tokens):
# 检查最后一个字符是否为标点符号
if tokens[-1] in (",", ".", "?", ",", "。", "?", "!", "!", ";", ";", ":", ":"):
return True
else:
return False
- 处理流程:
- LLM 以 token 为单位流式输出
- 系统会缓存这些 token 直到遇到标点符号
- 遇到标点时,将累积的 token 组合成一个完整句子
- 将句子交给 TTS 处理
- TTS 将语音通过 WebSocket 发送给 ESP32
- 优化设计:
- 使用线程池处理 TTS 任务
- 维护任务队列确保顺序执行
- 支持中断机制
- 记录第一段和最后一段文本用于状态控制
这种设计的优点:
- 实时性:不需要等待 LLM 生成完整回复
- 自然度:按句子分段更符合人类说话方式
- 响应速度:边生成边转换为语音
- 资源效率:避免处理过长的文本
- 可控性:支持随时中断生成和播放
所以总结来说,是 LLM 流式输出 token,但系统会将这些 token 按句子组织后再交给 TTS 处理,这样既保证了实时性,又确保了语音输出的自然性。
分析一下 TTS 的完整调用流程:
- TTS 基础架构: 系统采用了基类
TTSProviderBase
和多个具体实现类的设计模式:
python
CopyInsert
class TTSProviderBase(ABC):
def __init__(self, config, delete_audio_file):
self.delete_audio_file = delete_audio_file
self.output_file = config.get("output_file")
@abstractmethod
def generate_filename(self):
pass
def to_tts(self, text):
tmp_file = self.generate_filename()
try:
max_repeat_time = 5
while not os.path.exists(tmp_file) and max_repeat_time > 0:
asyncio.run(self.text_to_speak(text, tmp_file))
if not os.path.exists(tmp_file):
max_repeat_time = max_repeat_time - 1
return tmp_file
except Exception as e:
logger.bind(tag=TAG).info(f"Failed to generate TTS file: {e}")
- TTS 提供者实现: 系统支持多个 TTS 提供者,以 EdgeTTS 为例:
python
CopyInsert
class TTSProvider(TTSProviderBase):
def __init__(self, config, delete_audio_file):
super().__init__(config, delete_audio_file)
self.voice = config.get("voice")
def generate_filename(self, extension=".mp3"):
return os.path.join(
self.output_file,
f"tts-{datetime.now().date()}@{uuid.uuid4().hex}{extension}"
)
async def text_to_speak(self, text, output_file):
communicate = edge_tts.Communicate(text, voice=self.voice)
await communicate.save(output_file)
- TTS 调用流程:
在 ConnectionHandler
中的调用流程:
python
CopyInsert
class ConnectionHandler:
def __init__(self, config, _vad, _asr, _llm, _tts):
# TTS 相关初始化
self.tts = _tts
self.tts_queue = queue.Queue()
self.executor = ThreadPoolExecutor(max_workers=10)
# TTS 状态变量
self.tts_first_text = None
self.tts_last_text = None
self.tts_start_speak_time = None
self.tts_duration = 0
def speak_and_play(self, text):
# 将文本转换为语音并播放
tmp_file = self.tts.to_tts(text)
return tmp_file
def _priority_thread(self):
# TTS 任务处理线程
while not self.stop_event.is_set():
try:
future = self.tts_queue.get()
# 处理 TTS 任务
tmp_file = future.result()
# 处理生成的音频文件...
- TTS 配置示例:
yaml
CopyInsert
TTS:
EdgeTTS:
type: edge
voice: zh-CN-XiaoxiaoNeural
output_file: audio_cache
- 完整工作流程:
a) 初始化阶段:
- 根据配置创建对应的 TTS 提供者实例
- 初始化线程池和任务队列
- 设置音频输出目录
b) 文本转语音阶段:
- 生成唯一的输出文件名
- 调用 TTS API 将文本转换为语音
- 保存语音文件到指定目录
- 重试机制处理失败情况
c) 任务调度阶段:
- 将 TTS 任务提交到线程池
- 通过任务队列管理多个 TTS 请求
- 异步处理避免阻塞主线程
d) 性能优化:
- 使用线程池并发处理多个 TTS 请求
- 实现音频文件缓存机制
- 支持异步生成和播放
- 错误处理:
- 最多重试 5 次生成失败的音频
- 详细的日志记录
- 异常捕获和处理
- 特点和优势:
- 模块化设计,易于扩展新的 TTS 提供者
- 异步处理提高响应速度
- 支持多种音频格式
- 完善的错误处理机制
- 文件命名保证唯一性
根据性能测试报告,EdgeTTS 的平均合成时间为 1.019s,这个性能表现相当不错,适合实时对话场景。
下发设备端
TTS 的输出确实是通过 WebSocket 流式传输给 ESP32 的。让我详细解释这个流程:
- 音频消息发送流程:
python
CopyInsert
async def sendAudioMessage(conn, audios, duration, text):
base_delay = conn.tts_duration
# 1. 发送开始标记
if text == conn.tts_first_text:
logger.bind(tag=TAG).info(f"发送第一段语音: {text}")
conn.tts_start_speak_time = time.time()
# 2. 发送句子开始标记
sentence_task = asyncio.create_task(
schedule_with_interrupt(base_delay, send_tts_message(conn, "sentence_start", text))
)
conn.scheduled_tasks.append(sentence_task)
conn.tts_duration += duration
# 3. 流式发送音频数据包
for idx, opus_packet in enumerate(audios):
await conn.websocket.send(opus_packet) # 直接通过 WebSocket 发送音频数据
# 4. 检查是否需要发送结束标记
if conn.llm_finish_task and text == conn.tts_last_text:
stop_duration = conn.tts_duration - (time.time() - conn.tts_start_speak_time)
stop_task = asyncio.create_task(
schedule_with_interrupt(stop_duration, send_tts_message(conn, 'stop'))
)
- TTS 状态消息:
python
CopyInsert
async def send_tts_message(conn, state, text=None):
"""发送 TTS 状态消息"""
message = {
"type": "tts",
"state": state, # 可能的状态:start, sentence_start, stop
"session_id": conn.session_id
}
if text is not None:
message["text"] = text
await conn.websocket.send(json.dumps(message))
- 音频数据格式:
- 音频数据被编码为 Opus 格式的数据包
- 每个数据包直接通过 WebSocket 发送
- ESP32 端接收到数据包后进行解码和播放
- 流程控制:
sentence_start
: 每个音频文件开始前发送tts.start
: 第一段语音开始时发送stop
: 最后一段语音结束时发送
- 时序控制:
- 使用
tts_duration
跟踪音频持续时间 - 使用
base_delay
控制发送延迟 - 通过
schedule_with_interrupt
实现可中断的延迟调度
- 错误处理:
- 通过
try-except
处理发送异常 - 支持中断和重试机制
这种流式传输的设计有以下优点:
- 实时性:音频数据生成即发送,不需要等待整个音频生成完成
- 低延迟:分包传输减少了等待时间
- 可中断:支持随时中断当前播放
- 状态同步:通过状态消息保持服务器和客户端的同步
- 资源效率:不需要在服务器端存储完整的音频文件
ESP32 端需要实现相应的 WebSocket 客户端来:
- 接收并解析状态消息
- 缓冲接收到的 Opus 音频数据包
- 解码 Opus 数据并通过 DAC 播放
- 处理开始和结束标记以正确控制播放流程
总结
交互能力
受到硬件限制,单麦,无aec,因此无法全双工体验;
设备说话,人说法,无法打断或者抢说;
后续可以通过带有独立Codec芯片,支持AEC的情况下实测;
耗时分析
唤醒后,没有播放传统的唤醒反馈语,直接进入角色问候语,因为这个角色是配置在云端的,所以需要走一下LLM模型对话,tts流下发。 耗时2.0s左右;
以下为端端耗时;
虾哥服务实测
1.8s~2.1s左右,端到端延迟,大部分情况是2.0s左右;
Xinnan科技
析从录音最后一包到 TTS 首包的延迟:
- VAD 结束判断:
python
CopyInsert
if conn.client_have_voice and not client_have_voice:
stop_duration = time.time() * 1000 - conn.client_have_voice_last_time
if stop_duration >= self.silence_threshold_ms: # 通常设置为 500-1000ms
conn.client_voice_stop = True
VAD 静默判断:500-1000ms
- ASR 处理:
python
CopyInsert
# 保存音频文件
start_time = time.time()
file_path = self.save_audio_to_file(opus_data, session_id)
logger.bind(tag=TAG).debug(f"音频文件保存耗时: {time.time() - start_time:.3f}s")
# 语音识别
start_time = time.time()
result = self.model.generate(
input=file_path,
cache={},
language="auto",
use_itn=True,
batch_size_s=60,
)
logger.bind(tag=TAG).debug(f"语音识别耗时: {time.time() - start_time:.3f}s")
- 音频保存:10-50ms
- ASR 识别:500-2000ms(取决于语音长度和模型性能)
- LLM 处理:
python
CopyInsert
async def startToChat(conn, text):
# 异步发送 stt 信息
stt_task = asyncio.create_task(
schedule_with_interrupt(0, send_stt_message(conn, text))
)
conn.scheduled_tasks.append(stt_task)
conn.executor.submit(conn.chat, text)
LLM 首次响应:1000-3000ms(取决于模型和网络)
- TTS 生成:
python
CopyInsert
# TTS 处理和首包发送
base_delay = conn.tts_duration
sentence_task = asyncio.create_task(
schedule_with_interrupt(base_delay, send_tts_message(conn, "sentence_start", text))
)
TTS 生成首包:500-1000ms
总延迟估计:
CopyInsert
VAD 静默判断(500-1000ms)+
音频保存(10-50ms)+
ASR 识别(500-2000ms)+
LLM 首次响应(1000-3000ms)+
TTS 首包生成(500-1000ms)
最终格式化为:
CopyInsert
VAD(500-1000ms) + 音频保存(10-50ms) + ASR(500-2000ms) + LLM(1000-3000ms) + TTS(500-1000ms)
理论总延迟范围:2.5秒 - 7秒
注意:
- 这些是理论估计值,实际延迟可能因为硬件性能、网络状况、模型大小等因素而变化
- LLM 的响应时间变化最大,因为它依赖于输入文本长度和服务器负载
- 部分处理是并行的,所以实际总延迟可能小于各部分延迟之和
以上是Windsurf 推测的,推理算力、模型大小影响很大,不一定准,后端服务代码有打印日志时间戳,可以查看具体耗时;
start_time = time.time()
result = self.model.generate(
input=file_path,
cache={},
language="auto",
use_itn=True,
batch_size_s=60,
)
text = rich_transcription_postprocess(result[0]["text"])
logger.bind(tag=TAG).debug(f"语音识别耗时: {time.time() - start_time:.3f}s | 结果: {text}")