当前位置: 首页 > article >正文

语音识别控制(软件、硬件)

1. 环境

python版本:3.11.9

2. 完整代码

import sqlite3
import time
import wave  # 使用wave库可读、写wav类型的音频文件
from funasr import AutoModel
import sounddevice as sd
import numpy as np
from modelscope import pipeline, Tasks
from pypinyin import lazy_pinyin
import pyaudio  # 使用pyaudio库可以进行录音,播放,生成wav文件
# 模型参数设置
chunk_size = [0, 10, 5]
encoder_chunk_look_back = 7
decoder_chunk_look_back = 5
is_task_running= True
model = AutoModel(model="D:\SpeechRecognize\speech_paraformer-large-vad-punc_asr_nat-zh-cn-16k-common-vocab8404-pytorch")

# 假设模型要求的采样率为 16000
fs = 16000
duration = 3 #时间
chunk_stride = chunk_size[1] * 960
cache = {}
window_size = 3

# 连接到 SQLite 数据库,如果不存在则会创建新的数据库文件
conn = sqlite3.connect('speech_recognition.db')
cursor = conn.cursor()

# 创建表格
cursor.execute('''
    CREATE TABLE IF NOT EXISTS speech_data
    (text TEXT, time_stamp TEXT, batch TEXT)
''')

def record(time):  # 录音程序
    # 定义数据流块
    CHUNK = 1024  # 音频帧率(也就是每次读取的数据是多少,默认1024FORMAT = pyaudio.paInt16  # 采样时生成wav文件正常格式
    CHANNELS = 1  # 音轨数(每条音轨定义了该条音轨的属性,如音轨的音色、音色库、通道数、输入/输出端口、音量等。可以多个音轨,不唯一)
    RATE = 16000  # 采样率(即每秒采样多少数据)
    RECORD_SECONDS = time  # 录音时间
    WAVE_OUTPUT_FILENAME = "./output.wav"  # 保存音频路径
    p = pyaudio.PyAudio()  # 创建PyAudio对象
    stream = p.open(format=FORMAT,  # 采样生成wav文件的正常格式
                    channels=CHANNELS,  # 音轨数
                    rate=RATE,  # 采样率
                    input=True,  # Ture代表这是一条输入流,False代表这不是输入流
                    frames_per_buffer=CHUNK)  # 每个缓冲多少帧
    print("* 开始录音")  # 开始录音标志
    frames = []  # 定义frames为一个空列表
    for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):  # 计算要读多少次,每秒的采样率/每次读多少数据*录音时间=需要读多少次
        data = stream.read(CHUNK)  # 每次读chunk个数据
        frames.append(data)  # 将读出的数据保存到列表中
    print("* 结束语音")  # 结束录音标志
    stream.stop_stream()  # 停止输入流
    stream.close()  # 关闭输入流
    p.terminate()  # 终止pyaudio
    wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')  # 以'wb‘二进制流写的方式打开一个文件
    wf.setnchannels(CHANNELS)  # 设置音轨数
    wf.setsampwidth(p.get_sample_size(FORMAT))  # 设置采样点数据的格式,和FOMART保持一致
    wf.setframerate(RATE)  # 设置采样率与RATE要一致
    wf.writeframes(b''.join(frames))  # 将声音数据写入文件
    wf.close()  # 数据流保存完,关闭文件
    while is_task_running:
    start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    myrecording = sd.rec(int(fs * duration), samplerate=fs, channels=1)
    sd.wait()
    speech_chunk = myrecording.flatten()
    # 噪声处理
    filtered_chunk = np.convolve(speech_chunk, np.ones(window_size) / window_size, mode='same')
    speech_chunk = filtered_chunk
    is_final = False
    res = model.generate(input=speech_chunk, cache=cache, is_final=is_final, chunk_size=chunk_size,
                         encoder_chunk_look_back=encoder_chunk_look_back,
                         decoder_chunk_look_back=decoder_chunk_look_back)
    text_result=''.join(lazy_pinyin(str(res[0]['text']))).replace(" ", "")
    # 唤醒词
    s1=''.join(lazy_pinyin(str("小爱")))
    if s1 in text_result:
        #关闭循环
        is_task_running ==False
        print("已唤醒,开始录音")
        record(5)  # 定义录音时间,单位/s
        inference_pipeline = pipeline(
            task=Tasks.auto_speech_recognition,
            model='D:/SpeechRecognize/speech_seaco_paraformer_large_asr_nat-zh-cn-16k-common-vocab8404-pytorch',
            model_revision="v2.0.4")
        rec_result = inference_pipeline('./output.wav', hotword='')
        same = ''.join(lazy_pinyin(rec_result[0]["text"].replace(" ", "")))
        print("语音转文字" + same)
        #匹配字符关键词
        #关键词1 、、、、
        g1 = ''.join(lazy_pinyin(str("打开空调")))
        if g1 in same:
            #通讯发送消息,我会提供五种硬件通讯方式 MTTTSocketModBusTcpIP、串口、HTTP请求
            print("发送给设备")
        is_task_running == True
    cursor.execute("INSERT INTO speech_data (text, time_stamp, batch) VALUES (?,?,?)",
                   (text_result, start_time, 'eerr'))
    conn.commit()

3. 硬件通讯

很多人搞不懂,如何用软件控制硬件,但是实际上没大家想的那么复杂,一般的硬件都会提供接口,只要找到厂家要他的通讯方式和通讯内容,就可以实现用软件控制硬件

3.1ModbusTCPIP

比较通用的工业通讯协议,读写PLC数据

from pymodbus.client import ModbusTcpClient
def read_data(ip, port, postion):
    # 创建 Modbus TCP 客户端并连接
    client = ModbusTcpClient(ip, port=port)  # 请替换为实际的设备 IP 和端口
    client.connect()
    try:
        # 读取保持寄存器
        num = 0
        result = client.read_holding_registers(postion, 1)
        for value in result.registers:
            print(value)
            num = value
    except Exception as e:
        print("Exception:", e)
    finally:
        # 关闭连接
        client.close()
    return str(num)


def send_data(ip, port, postion, num):
    client = ModbusTcpClient(ip, port=port)
    client.connect()

    try:
        # 发送数据到保持寄存器
        write_result = client.write_registers(postion, [num])  # 从地址 20 开始写入数据
        if not write_result.isError():
            print("Write Success")
        else:
            print("Write Error:", write_result)

    except Exception as e:
        print("Exception:", e)

    finally:
        client.close()
    return 2

3.2MQTT

最近比较流行的工业协议
读取消息

import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe("your_topic")

def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect("broker_ip_address", 1883, 60)

client.loop_forever()

写入消息

import paho.mqtt.publish as publish

publish.single("your_topic", "your_message", hostname="broker_ip_address", port=1883)

3.3 Socket TCPIP

def socketddd(ip, port, code):

    # 要发送的内容
    content = str(code)+'\a\r\n'
    # 创建套接字
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # 连接服务器
    server_address = (str(ip), int(port))
    s.connect(server_address)

    # 发送数据
    s.sendall(content.encode())

    # 关闭连接
    s.close()

3.4 串口

比较基础的串口

def task1():
    serials = serial.Serial('COM5', 9600, timeout=0.5)
    while is_task_running1:
        if serials.isOpen():
            print("open success")
            send_data_hex = bytes.fromhex('5A 06 00 00 60\r\n')
            serials.write(send_data_hex)  # 编码
        else:
            print("open failed")
        time.sleep(1)  # 每隔 5 秒执行一次

4. 效果

在这里插入图片描述

5.问题

1.必须有麦克风才能跑起来
2.关于模型包,可以直接从模型社区下载(也可私信我)
3.最后的效果与你电脑的显卡有直接联系
4.关于唤醒后定时录音,实际上可以通过计算音频波形判断是否有音频输入,一般2秒没有音频就默认结束录音
5.关于通讯,实际上本质就是报文,在局域网和公网只要连接上即可


http://www.kler.cn/a/326344.html

相关文章:

  • js代理模式
  • 基于 OPENCV 和 MFC 的图像处理程序
  • Zustand selector 发生 infinate loops的原因以及解决
  • xml简介
  • 【计算机网络】什么是网关(Gateway)?
  • YARN WebUI 服务
  • 【Pytorch图像+序列双输入网络源代码】
  • mac 触控板 三指拖动
  • 【软件工程】模块化思想概述
  • 线性模型到神经网络
  • PyCharm开发工具的安装和基础使用
  • JVM(HotSpot):字符串常量池(StringTable)
  • DK5V100R20ST1直插TO220F功率12V 3A同步整流芯片
  • 解决目标主机showmount -e信息泄露(CVE-1999-0554)
  • 开创远程就可以监测宠物健康新篇章
  • C++ | Leetcode C++题解之第432题全O(1)的数据结构
  • Centos 8安装VNC及多用户配置详细教程
  • java socket bio 改造为 netty nio
  • 【算法业务】基于Multi-Armed Bandits的个性化push文案自动优选算法实践
  • 电商搜索效率飞跃:阿里巴巴搜索API返回值的力量
  • 零工市场小程序如何提高找兼职的效率?
  • FFmpeg源码:avio_feof函数分析
  • 源代码保密技术的升级:模块化沙箱
  • 介绍Java中的反射并举例至少5个反射中常用的API-----Java基础相关面试题分享
  • 经典文献阅读之--WiROS(用于机器人的WiFi感知工具箱)
  • 百分点科技再获多项数据智能领域奖项