当前位置: 首页 > article >正文

【AI实践】xiaozhi-esp32虾哥开源版-分析

语音交互总流程

客户端(ESP32) 服务器
| |
| 本地唤醒词检测"小智" |
| |
| 打开音频通道 |
|------------------------>|
| |
| 发送唤醒词音频 |
|------------------------>|
| |
| 发送唤醒事件 |
|------------------------>|
| {"type":"listen", |
| "state":"detect", |
| "text":"小智"} |
| |
| 开始持续音频传输 |
|------------------------>|
| (Opus编码的音频数据) |
| |
| ASR文本结果 |
|<------------------------|
| {"type":"stt", |
| "text":"天气怎么样"} |

芯片侧

系统层调用

codec调用

语音前端AFE

唤醒词检测

VAD检测

音频输入 -> AFE处理 -> VAD检测 -> 状态回调
| | | |
| | | +-> LED显示
| | | +-> 设备状态更新
| | |
| | +-> AFE_VAD_SPEECH/AFE_VAD_SILENCE
| |
| +-> 回声消除(AEC)
| +-> 语音增强(SE)
|
+-> 16kHz采样率
+-> 单声道/双声道

VAD模式的选择影响:

  1. 检测灵敏度:
  • 更高的模式 = 更高的检测门限
  • 更低的模式 = 更容易触发
  1. 环境适应:
  • 嘈杂环境选择高模式
  • 安静环境可以用低模式
  1. 功耗影响:
  • 高模式需要更多处理
  • 低模式处理负担较轻
  1. 用户体验:
  • 高模式可能需要更大声说话
  • 低模式说话音量要求低

在这个项目中选择VAD_MODE_3的原因:

  1. 提高准确性
  2. 减少误触发
  3. 适应各种环境
  4. 与唤醒词检测配合

你可以根据实际使用环境和需求调整VAD模式:

  • 家庭安静环境:可以考虑MODE_1或MODE_2
  • 公共场所:建议使用MODE_3
  • 开发测试:可以用MODE_1方便调试

唤醒词Oneshot

“你好小智,天气”,连读发音

idf从系统环境变量获取sdk路径。

端云通道

客户端 服务器

| |

|------ 唤醒词检测 ------>|

| |

|------ 开始监听 -------->|

| |

|------ 音频数据 -------->|

| |

|<----- ASR结果 ---------|

| |

|<----- TTS音频 ---------|

| |

|------ 停止监听 -------->|

通信类型

获取ASR文本

TTS接收播放

MQTT服务器 ESP32设备

| |

|--- TTS音频数据(Opus) ---->|

| |

| | 1. 加入解码队列

| |

| | 2. OutputAudio线程处理

| |

| | 3. Opus解码为PCM

| |

| | 4. 重采样(如需要)

| |

| | 5. 输出到DAC

总结

当前采用面板板Wi-Fi方案

单麦克风,没有AEC没有全双工

wakeup-speaking-listening-speaking-listening

Server服务

虾哥的服务没有开源

但是提供了一个使用后台

小智 AI 聊天机器人

烧录后,管理设备填入mac地址;

tts引擎使用的是字节跳动,火山引擎

音色列表--语音技术-火山引擎

LLM可以选择

Server服务(Xinnan科技)

非虾哥服务端,兼容虾哥,其它开发者开发的。


 

分析下 xiaozhi-esp32-server文件下的,这是服务器端的代码

让我帮你分析服务器端的代码。首先我需要找到相关的文件。

Searched ind:\wsl\github\xiaozhi-ai

1 result

让我看看服务器目录下的文件结构:

Analyzed

d:\wsl\github\xiaozhi-ai\xiaozhi-esp32-server

这是一个基于Python的服务器项目,让我们分析主要的组件:

  1. 主要入口文件: 让我先查看 app.py,这是服务器的入口文件:

Analyzed

app.py:(1-100)

让我们看看主要的配置文件 config.yaml:

Analyzed

config.yaml:(1-100)

根据代码分析,这是一个基于Python的语音对话服务器,主要功能和结构如下:

  1. 主要组件
    • WebSocket服务器:处理实时语音通信
    • WebUI管理界面:提供Web管理界面(目前处于开发中)
    • 多个AI模块:ASR(语音识别)、VAD(语音活动检测)、LLM(大语言模型)、TTS(文本转语音)
  1. 核心功能
    • 语音识别:使用FunASR模型
    • 语音活动检测:使用SileroVAD
    • 大语言模型:支持多种LLM(ChatGLM、DeepSeek、Ali等)
    • 文本转语音:支持EdgeTTS
  1. 项目结构
    • app.py:主入口文件
    • core/:核心功能实现
    • manager/:管理界面相关代码
    • models/:AI模型存储目录
    • config/:配置文件目录
    • ZhiKongTaiWeb/:Web界面代码
  1. 配置特点
    • 支持设备认证(可选)
    • 可配置不同的AI模型
    • 支持多设备管理
    • 可自定义提示词和对话风格
  1. 部署相关
    • 提供Docker支持(有Dockerfile和docker-compose配置)
    • 默认WebSocket服务端口:8000
    • 默认管理界面端口:8002

这是一个功能完整的语音对话服务器,主要用于处理ESP32设备的语音交互。它采用模块化设计,可以灵活配置不同的AI模型和服务。服务器支持实时语音处理,并可以通过WebSocket与客户端(ESP32设备)进行通信。

需要注意的是,管理界面(WebUI)目前还在开发中,建议保持默认配置(disabled)。同时,项目支持Docker部署,便于在不同环境中快速搭建服务。

录音 -> VAD分段 -> 累积音频 -> VAD检测结束 -> ASR整段识别 -> 文本处理 -> LLM对话

录音上传

录音上传和 ASR 处理的完整流程如下:

  1. 录音数据格式
  • ESP32 将音频编码为 Opus 格式
  • 通过 WebSocket 二进制消息发送到服务器
  • 采样率为 16kHz,单声道
  1. 服务器接收流程
python
CopyInsert


async def handleAudioMessage(conn, audio):
    if not conn.asr_server_receive:
        logger.bind(tag=TAG).debug(f"前期数据处理中,暂停接收")
        return
  1. VAD 处理流程
python
CopyInsert


class SileroVAD(VAD):
    def is_vad(self, conn, opus_packet):
        # 1. 解码 Opus 数据
        pcm_frame = self.decoder.decode(opus_packet, 960)
        conn.client_audio_buffer += pcm_frame
        
        # 2. 分帧处理(每帧 512 采样点)
        while len(conn.client_audio_buffer) >= 512 * 2:
            chunk = conn.client_audio_buffer[:512 * 2]
            conn.client_audio_buffer = conn.client_audio_buffer[512 * 2:]
            
            # 3. 数据格式转换
            audio_int16 = np.frombuffer(chunk, dtype=np.int16)
            audio_float32 = audio_int16.astype(np.float32) / 32768.0
            audio_tensor = torch.from_numpy(audio_float32)
            
            # 4. VAD 检测
            speech_prob = self.model(audio_tensor, 16000).item()
            client_have_voice = speech_prob >= self.vad_threshold
            
            # 5. 语音段落检测
            if conn.client_have_voice and not client_have_voice:
                stop_duration = time.time() * 1000 - conn.client_have_voice_last_time
                if stop_duration >= self.silence_threshold_ms:
                    conn.client_voice_stop = True
  1. 音频数据累积
python
CopyInsert


if have_voice == False and conn.client_have_voice == False:
    await no_voice_close_connect(conn)
    conn.asr_audio.clear()
    return
conn.client_no_voice_last_time = 0.0
conn.asr_audio.append(audio)
  1. ASR 处理触发
python
CopyInsert


if conn.client_voice_stop:
    conn.client_abort = False
    conn.asr_server_receive = False
    # 调用 ASR 进行语音识别
    text, file_path = conn.asr.speech_to_text(conn.asr_audio, conn.session_id)
    logger.bind(tag=TAG).info(f"识别文本: {text}")
  1. 处理模式
python
CopyInsert


if conn.client_listen_mode == "auto":
    have_voice = conn.vad.is_vad(conn, audio)
else:
    have_voice = conn.client_have_voice
  1. 关键参数配置
python
CopyInsert


self.vad_threshold = config.get("threshold")  # VAD 检测阈值
self.silence_threshold_ms = config.get("min_silence_duration_ms")  # 静默时长阈值

整个流程的特点:

  1. 实时处理
  • 流式接收音频数据
  • 实时 VAD 检测
  • 动态累积有效语音段
  1. 智能分段
  • 使用 SileroVAD 模型进行语音活动检测
  • 基于概率阈值判断语音存在
  • 通过静默时长判断语音段结束
  1. 数据处理
  • Opus 解码为 PCM
  • 固定大小分帧处理
  • 数据格式规范化
  1. 双模式支持
  • 自动模式:依赖 VAD 检测
  • 手动模式:依赖客户端控制
  1. 错误处理
  • Opus 解码错误处理
  • 异常状态恢复
  • 资源清理机制
  1. 性能优化
  • 缓冲区管理
  • 分帧处理减少延迟
  • 异步处理避免阻塞

这种设计实现了高效的语音采集和处理流程,能够准确捕获用户的语音输入,并通过 VAD 智能判断语音段落,为后续的 ASR 处理提供高质量的语音数据。

VAD

VAD 的具体实现,特别是它的说话结束判断逻辑:

VAD 不仅在开始检测语音活动,也负责判断说话结束。让我解释一下关键的逻辑:

  1. 语音开始检测
python
CopyInsert


# 检测语音活动
speech_prob = self.model(audio_tensor, 16000).item()
client_have_voice = speech_prob >= self.vad_threshold

if client_have_voice:
    conn.client_have_voice = True
    conn.client_have_voice_last_time = time.time() * 1000
  • 使用 Silero VAD 模型计算语音概率
  • 如果概率超过阈值(默认 0.5),则认为有语音
  • 记录检测到语音的时间戳
  1. 说话结束判断
python
CopyInsert


# 如果之前有声音,但本次没有声音,且与上次有声音的时间差已经超过了静默阈值,则认为已经说完一句话
if conn.client_have_voice and not client_have_voice:
    stop_duration = time.time() * 1000 - conn.client_have_voice_last_time
    if stop_duration >= self.silence_threshold_ms:
        conn.client_voice_stop = True

关键参数在配置文件中:

yaml
CopyInsert


VAD:
  SileroVAD:
    threshold: 0.5  # 语音检测阈值
    model_dir: models/snakers4_silero-vad
    min_silence_duration_ms: 700  # 静默持续时间阈值,如果说话停顿比较长,可以把这个值设置大一些

说话结束的判定条件是:

  1. 之前检测到有声音 (conn.client_have_voice)
  2. 当前帧没有检测到声音 (not client_have_voice)
  3. 从上次检测到声音到现在的时间间隔超过了设定的静默阈值(默认 700ms)

这个机制可以防止:

  • 说话中的短暂停顿被误判为说话结束
  • 等待时间过长导致的延迟
  • 背景噪音的干扰

如果你觉得系统对说话结束的判断太快或太慢,可以通过调整 min_silence_duration_ms 参数来优化:

  • 值设置大一些:更容易处理说话时的长停顿,但响应可能会变慢
  • 值设置小一些:响应更快,但可能会把说话中的停顿误判为说话结束

  1. 在 VAD 判停之前,所有有效的音频数据都会被添加到 conn.asr_audio 缓冲区中
  2. 但是这些音频数据并不会被送到 ASR 模型进行处理
  3. 只有当 conn.client_voice_stop 为 True(即 VAD 检测到说话结束)时,才会调用 speech_to_text 进行识别

这种设计的原因:

  1. 资源效率:避免频繁调用 ASR 模型,减少计算资源消耗
  2. 准确性:等待完整的语音段可以获得更好的识别效果
  3. 实现简单:不需要处理复杂的流式 ASR 逻辑

但这也意味着:

  1. ASR 不是实时进行的,需要等待整段语音结束
  2. 所有音频数据都会被缓存在内存中,直到 VAD 判停
  3. 如果语音段很长,可能会导致一定的延迟

ASR识别

现在我可以为你完整地描述 SenseVoiceSmall 的调用流程:

  1. 初始化阶段
python
CopyInsert


# 1. 服务器启动时初始化 ASR 模型
model = AutoModel(
    model="models/SenseVoiceSmall",
    vad_kwargs={"max_single_segment_time": 30000},
    disable_update=True,
    hub="hf"
)

音频处理流程

a. 接收音频数据

    • WebSocket 服务器接收来自 ESP32 的 Opus 格式音频数据
    • 数据先经过 VAD (语音活动检测) 判断是否有声音
python
CopyInsert


have_voice = conn.vad.is_vad(conn, audio)

b. 音频数据缓存

    • 如果检测到有声音,将音频数据添加到缓存
python
CopyInsert


conn.asr_audio.append(audio)

c. 音频转换处理

    • 当语音段结束时,开始处理缓存的音频数据
    • 将 Opus 格式音频解码为 WAV 格式
python
CopyInsert


def save_audio_to_file(self, opus_data, session_id):
    decoder = opuslib_next.Decoder(16000, 1)  # 16kHz, 单声道
    for opus_packet in opus_data:
        pcm_frame = decoder.decode(opus_packet, 960)
        pcm_data.append(pcm_frame)

d. 语音识别

    • 使用 SenseVoiceSmall 模型进行语音识别
python
CopyInsert


result = self.model.generate(
    input=file_path,
    cache={},
    language="auto",
    use_itn=True,
    batch_size_s=60,
)
text = rich_transcription_postprocess(result[0]["text"])

后处理流程

a. 文本处理

    • 对识别出的文本进行处理(去除标点符号等)
python
CopyInsert


text_len, text_without_punctuation = remove_punctuation_and_length(text)

b. 命令判断

    • 判断是否是退出命令
    • 如果不是命令,则进入对话流程
python
CopyInsert


if text_len <= conn.max_cmd_length and await handleCMDMessage(conn, text_without_punctuation):
    return
if text_len > 0:
    await startToChat(conn, text)

c. 清理工作

    • 清理临时音频文件
    • 重置 VAD 状态
python
CopyInsert


conn.asr_audio.clear()
conn.reset_vad_states()
  1. 特点和优化
    • 支持实时语音识别
    • 使用 VAD 进行语音活动检测,提高识别效率
    • 支持自动和手动两种语音检测模式
    • 临时文件自动清理机制
    • 异步处理,不阻塞主线程
    • 支持多种语言自动识别("auto" 模式)

这就是 SenseVoiceSmall 模型在系统中的完整调用流程,从接收音频数据到最终输出文本结果的整个过程。

其中,后处理

步骤总结:

  1. ASR 基础处理
    • 音频文件保存
    • 使用 FunASR 模型进行识别
    • 使用 ITN 进行文本规范化
  1. 文本清理
    • 去除全角/半角标点
    • 去除空格
    • 去除表情符号
    • 去除首尾特殊字符
  1. 命令识别
    • 检查文本长度
    • 匹配退出命令
    • 执行相应操作
  1. 错误处理
    • 音频处理异常
    • 识别失败处理
    • 日志记录

这些处理确保了:

  1. 文本的规范化和清洁度
  2. 命令的准确识别
  3. 与 LLM 的良好交互
  4. 系统的稳定性

LLM调用

分析 LLM (大语言模型) 的完整调用流程:

  1. 初始化阶段
python
CopyInsert


# 在 ConnectionHandler 初始化时
self.llm = _llm  # LLM 实例
self.dialogue = Dialogue()  # 对话管理器
self.llm_finish_task = False  # LLM 任务完成标志
  1. 对话初始化
python
CopyInsert


def _initialize_components(self):
    # 设置系统提示词
    self.prompt = self.config["prompt"]
    if self.private_config:
        self.prompt = self.private_config.private_config.get("prompt", self.prompt)
    # 添加时间信息
    if "{date_time}" in self.prompt:
        date_time = time.strftime("%Y-%m-%d %H:%M", time.localtime())
        self.prompt = self.prompt.replace("{date_time}", date_time)
    # 添加系统提示词到对话历史
    self.dialogue.put(Message(role="system", content=self.prompt))
  1. 对话处理流程
python
CopyInsert


def chat(self, query):
    # 1. 添加用户输入到对话历史
    self.dialogue.put(Message(role="user", content=query))
    
    # 2. 调用 LLM 生成回复
    llm_responses = self.llm.response(
        self.session_id, 
        self.dialogue.get_llm_dialogue()
    )
    
    # 3. 流式处理 LLM 响应
    response_message = []
    start = 0
    self.llm_finish_task = False
    
    for content in llm_responses:
        response_message.append(content)
        
        # 检查是否被用户中断
        if self.client_abort:
            start = len(response_message)
            break
            
        # 分段处理文本,转换为语音
        if is_segment(response_message):
            segment_text = "".join(response_message[start:])
            segment_text = get_string_no_punctuation_or_emoji(segment_text)
            if len(segment_text) > 0:
                self.recode_first_last_text(segment_text)
                future = self.executor.submit(self.speak_and_play, segment_text)
                self.tts_queue.put(future)
                start = len(response_message)
    
    # 4. 更新对话历史
    self.dialogue.put(Message(
        role="assistant", 
        content="".join(response_message)
    ))
  1. LLM 提供者实现(以 ChatGLM 为例):
python
CopyInsert


class LLMProvider(LLMProviderBase):
    def __init__(self, config):
        self.model_name = config.get("model_name")  # glm-4-flash
        self.api_key = config.get("api_key")
        self.base_url = config.get("url")  # https://open.bigmodel.cn/api/paas/v4/
        self.client = openai.OpenAI(
            api_key=self.api_key, 
            base_url=self.base_url
        )

    def response(self, session_id, dialogue):
        responses = self.client.chat.completions.create(
            model=self.model_name,
            messages=dialogue,
            stream=True
        )
        for chunk in responses:
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
  1. 特点和优化
    • 支持流式输出:LLM 生成的文本会实时转换为语音
    • 支持用户中断:可以随时停止当前对话
    • 分段处理:长文本会被分段处理,提升响应速度
    • 对话历史管理:维护完整的对话上下文
    • 私有配置支持:可以为不同设备配置不同的 LLM 和提示词
    • 错误处理:包含完整的错误处理机制
  1. 配置示例
yaml
CopyInsert


LLM:
  ChatGLMLLM:
    type: openai
    model_name: glm-4-flash  # 免费模型
    url: https://open.bigmodel.cn/api/paas/v4/
    api_key: your_api_key

这就是完整的 LLM 调用流程,从接收用户输入到生成回复,再到转换为语音输出的整个过程。系统采用了流式处理和异步执行的方式,可以提供更好的实时交互体验。

TTS调用

LLM 是流式输出的,但会按照句子为单位分段给 TTS。具体流程如下:

  1. LLM 流式输出
python
CopyInsert


# 获取 LLM 的流式响应
llm_responses = self.llm.response(self.session_id, self.dialogue.get_llm_dialogue())

# 逐个处理 token
for content in llm_responses:
    response_message.append(content)
    
    # 检查是否需要中断
    if self.client_abort:
        start = len(response_message)
        break

    # 记录生成时间
    end_time = time.time()
    self.logger.bind(tag=TAG).debug(f"大模型返回时间时间: {end_time - start_time} 秒, 生成token={content}")
  1. 按句子分段处理
python
CopyInsert


# 检查是否可以分段(遇到标点符号)
if is_segment(response_message):
    # 将当前积累的 tokens 组合成文本
    segment_text = "".join(response_message[start:])
    # 清理标点和表情
    segment_text = get_string_no_punctuation_or_emoji(segment_text)
    
    if len(segment_text) > 0:
        # 记录第一段和最后一段文本
        self.recode_first_last_text(segment_text)
        # 提交 TTS 任务
        future = self.executor.submit(self.speak_and_play, segment_text)
        self.tts_queue.put(future)
        # 更新起始位置
        start = len(response_message)
  1. 分段判断逻辑
python
CopyInsert


def is_segment(tokens):
    # 检查最后一个字符是否为标点符号
    if tokens[-1] in (",", ".", "?", ",", "。", "?", "!", "!", ";", ";", ":", ":"):
        return True
    else:
        return False
  1. 处理流程
  • LLM 以 token 为单位流式输出
  • 系统会缓存这些 token 直到遇到标点符号
  • 遇到标点时,将累积的 token 组合成一个完整句子
  • 将句子交给 TTS 处理
  • TTS 将语音通过 WebSocket 发送给 ESP32
  1. 优化设计
  • 使用线程池处理 TTS 任务
  • 维护任务队列确保顺序执行
  • 支持中断机制
  • 记录第一段和最后一段文本用于状态控制

这种设计的优点:

  1. 实时性:不需要等待 LLM 生成完整回复
  2. 自然度:按句子分段更符合人类说话方式
  3. 响应速度:边生成边转换为语音
  4. 资源效率:避免处理过长的文本
  5. 可控性:支持随时中断生成和播放

所以总结来说,是 LLM 流式输出 token,但系统会将这些 token 按句子组织后再交给 TTS 处理,这样既保证了实时性,又确保了语音输出的自然性。

分析一下 TTS 的完整调用流程:

  1. TTS 基础架构: 系统采用了基类 TTSProviderBase 和多个具体实现类的设计模式:
python
CopyInsert


class TTSProviderBase(ABC):
    def __init__(self, config, delete_audio_file):
        self.delete_audio_file = delete_audio_file
        self.output_file = config.get("output_file")

    @abstractmethod
    def generate_filename(self):
        pass

    def to_tts(self, text):
        tmp_file = self.generate_filename()
        try:
            max_repeat_time = 5
            while not os.path.exists(tmp_file) and max_repeat_time > 0:
                asyncio.run(self.text_to_speak(text, tmp_file))
                if not os.path.exists(tmp_file):
                    max_repeat_time = max_repeat_time - 1
                    
            return tmp_file
        except Exception as e:
            logger.bind(tag=TAG).info(f"Failed to generate TTS file: {e}")
  1. TTS 提供者实现: 系统支持多个 TTS 提供者,以 EdgeTTS 为例:
python
CopyInsert


class TTSProvider(TTSProviderBase):
    def __init__(self, config, delete_audio_file):
        super().__init__(config, delete_audio_file)
        self.voice = config.get("voice")

    def generate_filename(self, extension=".mp3"):
        return os.path.join(
            self.output_file, 
            f"tts-{datetime.now().date()}@{uuid.uuid4().hex}{extension}"
        )

    async def text_to_speak(self, text, output_file):
        communicate = edge_tts.Communicate(text, voice=self.voice)
        await communicate.save(output_file)
  1. TTS 调用流程

ConnectionHandler 中的调用流程:

python
CopyInsert


class ConnectionHandler:
    def __init__(self, config, _vad, _asr, _llm, _tts):
        # TTS 相关初始化
        self.tts = _tts
        self.tts_queue = queue.Queue()
        self.executor = ThreadPoolExecutor(max_workers=10)
        
        # TTS 状态变量
        self.tts_first_text = None
        self.tts_last_text = None
        self.tts_start_speak_time = None
        self.tts_duration = 0

    def speak_and_play(self, text):
        # 将文本转换为语音并播放
        tmp_file = self.tts.to_tts(text)
        return tmp_file

    def _priority_thread(self):
        # TTS 任务处理线程
        while not self.stop_event.is_set():
            try:
                future = self.tts_queue.get()
                # 处理 TTS 任务
                tmp_file = future.result()
                # 处理生成的音频文件...
  1. TTS 配置示例
yaml
CopyInsert


TTS:
  EdgeTTS:
    type: edge
    voice: zh-CN-XiaoxiaoNeural
    output_file: audio_cache
  1. 完整工作流程

a) 初始化阶段

  • 根据配置创建对应的 TTS 提供者实例
  • 初始化线程池和任务队列
  • 设置音频输出目录

b) 文本转语音阶段

  • 生成唯一的输出文件名
  • 调用 TTS API 将文本转换为语音
  • 保存语音文件到指定目录
  • 重试机制处理失败情况

c) 任务调度阶段

  • 将 TTS 任务提交到线程池
  • 通过任务队列管理多个 TTS 请求
  • 异步处理避免阻塞主线程

d) 性能优化

  • 使用线程池并发处理多个 TTS 请求
  • 实现音频文件缓存机制
  • 支持异步生成和播放
  1. 错误处理
  • 最多重试 5 次生成失败的音频
  • 详细的日志记录
  • 异常捕获和处理
  1. 特点和优势
  • 模块化设计,易于扩展新的 TTS 提供者
  • 异步处理提高响应速度
  • 支持多种音频格式
  • 完善的错误处理机制
  • 文件命名保证唯一性

根据性能测试报告,EdgeTTS 的平均合成时间为 1.019s,这个性能表现相当不错,适合实时对话场景。

下发设备端

TTS 的输出确实是通过 WebSocket 流式传输给 ESP32 的。让我详细解释这个流程:

  1. 音频消息发送流程
python
CopyInsert


async def sendAudioMessage(conn, audios, duration, text):
    base_delay = conn.tts_duration

    # 1. 发送开始标记
    if text == conn.tts_first_text:
        logger.bind(tag=TAG).info(f"发送第一段语音: {text}")
        conn.tts_start_speak_time = time.time()

    # 2. 发送句子开始标记
    sentence_task = asyncio.create_task(
        schedule_with_interrupt(base_delay, send_tts_message(conn, "sentence_start", text))
    )
    conn.scheduled_tasks.append(sentence_task)

    conn.tts_duration += duration

    # 3. 流式发送音频数据包
    for idx, opus_packet in enumerate(audios):
        await conn.websocket.send(opus_packet)  # 直接通过 WebSocket 发送音频数据

    # 4. 检查是否需要发送结束标记
    if conn.llm_finish_task and text == conn.tts_last_text:
        stop_duration = conn.tts_duration - (time.time() - conn.tts_start_speak_time)
        stop_task = asyncio.create_task(
            schedule_with_interrupt(stop_duration, send_tts_message(conn, 'stop'))
        )
  1. TTS 状态消息
python
CopyInsert


async def send_tts_message(conn, state, text=None):
    """发送 TTS 状态消息"""
    message = {
        "type": "tts",
        "state": state,  # 可能的状态:start, sentence_start, stop
        "session_id": conn.session_id
    }
    if text is not None:
        message["text"] = text

    await conn.websocket.send(json.dumps(message))
  1. 音频数据格式
  • 音频数据被编码为 Opus 格式的数据包
  • 每个数据包直接通过 WebSocket 发送
  • ESP32 端接收到数据包后进行解码和播放
  1. 流程控制
  • sentence_start: 每个音频文件开始前发送
  • tts.start: 第一段语音开始时发送
  • stop: 最后一段语音结束时发送
  1. 时序控制
  • 使用 tts_duration 跟踪音频持续时间
  • 使用 base_delay 控制发送延迟
  • 通过 schedule_with_interrupt 实现可中断的延迟调度
  1. 错误处理
  • 通过 try-except 处理发送异常
  • 支持中断和重试机制

这种流式传输的设计有以下优点:

  1. 实时性:音频数据生成即发送,不需要等待整个音频生成完成
  2. 低延迟:分包传输减少了等待时间
  3. 可中断:支持随时中断当前播放
  4. 状态同步:通过状态消息保持服务器和客户端的同步
  5. 资源效率:不需要在服务器端存储完整的音频文件

ESP32 端需要实现相应的 WebSocket 客户端来:

  1. 接收并解析状态消息
  2. 缓冲接收到的 Opus 音频数据包
  3. 解码 Opus 数据并通过 DAC 播放
  4. 处理开始和结束标记以正确控制播放流程

总结

交互能力

受到硬件限制,单麦,无aec,因此无法全双工体验;

设备说话,人说法,无法打断或者抢说;

后续可以通过带有独立Codec芯片,支持AEC的情况下实测;

耗时分析

唤醒后,没有播放传统的唤醒反馈语,直接进入角色问候语,因为这个角色是配置在云端的,所以需要走一下LLM模型对话,tts流下发。 耗时2.0s左右;

以下为端端耗时;

虾哥服务实测

1.8s~2.1s左右,端到端延迟,大部分情况是2.0s左右;

Xinnan科技

析从录音最后一包到 TTS 首包的延迟:

  1. VAD 结束判断
python
CopyInsert


if conn.client_have_voice and not client_have_voice:
    stop_duration = time.time() * 1000 - conn.client_have_voice_last_time
    if stop_duration >= self.silence_threshold_ms:  # 通常设置为 500-1000ms
        conn.client_voice_stop = True

VAD 静默判断:500-1000ms

  1. ASR 处理
python
CopyInsert


# 保存音频文件
start_time = time.time()
file_path = self.save_audio_to_file(opus_data, session_id)
logger.bind(tag=TAG).debug(f"音频文件保存耗时: {time.time() - start_time:.3f}s")

# 语音识别
start_time = time.time()
result = self.model.generate(
    input=file_path,
    cache={},
    language="auto",
    use_itn=True,
    batch_size_s=60,
)
logger.bind(tag=TAG).debug(f"语音识别耗时: {time.time() - start_time:.3f}s")
  • 音频保存:10-50ms
  • ASR 识别:500-2000ms(取决于语音长度和模型性能)
  1. LLM 处理
python
CopyInsert


async def startToChat(conn, text):
    # 异步发送 stt 信息
    stt_task = asyncio.create_task(
        schedule_with_interrupt(0, send_stt_message(conn, text))
    )
    conn.scheduled_tasks.append(stt_task)
    conn.executor.submit(conn.chat, text)

LLM 首次响应:1000-3000ms(取决于模型和网络)

  1. TTS 生成
python
CopyInsert


# TTS 处理和首包发送
base_delay = conn.tts_duration
sentence_task = asyncio.create_task(
    schedule_with_interrupt(base_delay, send_tts_message(conn, "sentence_start", text))
)

TTS 生成首包:500-1000ms

总延迟估计:

CopyInsert

VAD 静默判断(500-1000ms)+ 
音频保存(10-50ms)+ 
ASR 识别(500-2000ms)+ 
LLM 首次响应(1000-3000ms)+ 
TTS 首包生成(500-1000ms)

最终格式化为:

CopyInsert

VAD(500-1000ms) + 音频保存(10-50ms) + ASR(500-2000ms) + LLM(1000-3000ms) + TTS(500-1000ms)

理论总延迟范围:2.5秒 - 7秒

注意:

  1. 这些是理论估计值,实际延迟可能因为硬件性能、网络状况、模型大小等因素而变化
  2. LLM 的响应时间变化最大,因为它依赖于输入文本长度和服务器负载
  3. 部分处理是并行的,所以实际总延迟可能小于各部分延迟之和

以上是Windsurf 推测的,推理算力、模型大小影响很大,不一定准,后端服务代码有打印日志时间戳,可以查看具体耗时;

start_time = time.time()
result = self.model.generate(
    input=file_path,
    cache={},
    language="auto",
    use_itn=True,
    batch_size_s=60,
)
text = rich_transcription_postprocess(result[0]["text"])
logger.bind(tag=TAG).debug(f"语音识别耗时: {time.time() - start_time:.3f}s | 结果: {text}")

http://www.kler.cn/a/569217.html

相关文章:

  • PyTorch 中的 nn.ModuleList 是什么?与普通列表有啥区别?
  • C#调用CANoeCLRAdapter.dll文章(一)
  • Go语言学习笔记(六)——标准库
  • 算法系列之双指针(待完善题目)
  • openssl下aes128算法xts模式加解密运算实例
  • MySQL零基础教程13—分组查询(group by 和 having)
  • 消息中间件应用的常见问题与方案?
  • 华为 Open Gauss 数据库在 Spring Boot 中使用 Flyway
  • 【Delphi】如何解决使用webView2时主界面置顶,而导致网页选择文件对话框被覆盖问题
  • Python的那些事第三十四篇:基于 Plotly 的交互式图表与仪表板设计与应用
  • 【北京迅为】itop-3568 开发板openharmony鸿蒙烧写及测试-第1章 体验OpenHarmony—烧写镜像
  • 6-2JVM解释器
  • docker利用docker-compose-gpu.yml启动RAGFLOW,文档解析出错【亲测已解决】
  • 高效API开发:FastAPI中的缓存技术与性能优化
  • 前缀和算法 算法4
  • unsloth报错FileNotFoundError: [WinError 3] 系统找不到指定的路径。
  • Transformer 代码剖析2 - 模型训练 (pytorch实现)
  • 【大模型学习笔记】0基础本地部署dify教程
  • AI辅助学习vue第十四章
  • 欧拉22.03系统安装离线redis 6.2.5