Whisper 音视频转写
Whisper 音视频转写 API 接口文档
api.py
import os
import shutil
import socket
import torch
import whisper
from moviepy.editor import VideoFileClip
import opencc
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Request
from fastapi.responses import JSONResponse
from typing import Optional
from fastapi.staticfiles import StaticFiles
app = FastAPI(
title="Whisper 音视频转写 API",
description="基于 OpenAI Whisper 模型的音视频转写服务,支持上传文件或使用服务器上的文件生成字幕。",
version="1.0.0"
)
# 挂载静态目录,用于提供文件下载
app.mount("/static", StaticFiles(directory="/media/ubuntu/SOFT/whisper_test"), name="static")
# 支持的文件扩展名
ALLOWED_EXTENSIONS = {'mp3', 'wav', 'mp4', 'avi', 'mov'}
UPLOAD_DIR = "/media/ubuntu/SOFT/whisper_test/uploads"
# 检查文件扩展名是否允许
def allowed_file(filename: str):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
# 格式化时间戳为 SRT 格式
def format_timestamp(seconds: float) -> str:
milliseconds = int(seconds * 1000)
hours = milliseconds // (1000 * 60 * 60)
minutes = (milliseconds // (1000 * 60)) % 60
seconds = (milliseconds // 1000) % 60
milliseconds = milliseconds % 1000
return f"{hours:02}:{minutes:02}:{seconds:02},{milliseconds:03}"
# 生成 SRT 文件内容
def generate_srt(transcription_segments) -> str:
srt_content = ""
converter = opencc.OpenCC('t2s') # 繁体转简体
for i, segment in enumerate(transcription_segments):
start_time = format_timestamp(segment['start']) # 获取开始时间戳
end_time = format_timestamp(segment['end']) # 获取结束时间戳
text = converter.convert(segment['text'].strip()) # 繁体转简体
srt_content += f"{i+1}\n{start_time} --> {end_time}\n{text}\n\n"
return srt_content
# 处理音频文件并生成 SRT 文件,返回转录文本
def transcribe_audio_to_srt(audio_path: str, srt_path: str, model_name="tiny"):
device = "cuda" if torch.cuda.is_available() else "cpu" # 判断是否使用 GPU
model = whisper.load_model(model_name).to(device) # 加载模型
result = model.transcribe(audio_path, language="zh") # 转录音频
print("当前模型:",model_name,"转录内容:",result["text"],"\n")
srt_content = generate_srt(result['segments']) # 生成 SRT 文件内容
with open(srt_path, "w", encoding="utf-8") as srt_file:
srt_file.write(srt_content) # 将内容写入 SRT 文件
return result["text"] # 返回转录的文本内容
# 从视频中提取音频
def extract_audio_from_video(video_path: str, audio_path: str):
video_clip = VideoFileClip(video_path) # 读取视频文件
audio_clip = video_clip.audio # 获取音频
audio_clip.write_audiofile(audio_path, codec='libmp3lame', bitrate="192k") # 保存为 MP3
audio_clip.close() # 关闭音频文件
video_clip.close() # 关闭视频文件
# 处理单个音频或视频文件,生成 SRT 文件,并保留相对目录结构
def process_file_with_structure(file_path: str, input_dir: str, output_dir: str, model_name="tiny"):
# 生成相对路径,保持输入和输出目录结构一致
rel_path = os.path.relpath(file_path, input_dir)
output_srt_dir = os.path.join(output_dir, os.path.dirname(rel_path))
os.makedirs(output_srt_dir, exist_ok=True) # 创建对应的输出目录
srt_output_path = os.path.join(output_srt_dir, os.path.splitext(os.path.basename(file_path))[0] + ".srt") # 生成 SRT 文件路径
if file_path.lower().endswith((".mp3", ".wav")): # 如果是音频文件
text_content = transcribe_audio_to_srt(file_path, srt_output_path, model_name) # 直接处理音频并返回转录文本
elif file_path.lower().endswith((".mp4", ".avi", ".mov")): # 如果是视频文件
audio_path = os.path.join(output_srt_dir, os.path.splitext(os.path.basename(file_path))[0] + "_audio.mp3")
extract_audio_from_video(file_path, audio_path) # 提取音频
text_content = transcribe_audio_to_srt(audio_path, srt_output_path, model_name) # 处理提取的音频并返回转录文本
os.remove(audio_path) # 删除临时音频文件
return srt_output_path, text_content # 返回 SRT 文件路径和转录文本
# 遍历目录并处理所有音视频文件,保持目录结构
def process_directory_with_structure(input_dir: str, output_dir: str, model_name="tiny"):
srt_files = []
for root, _, files in os.walk(input_dir):
for file in files:
if allowed_file(file):
file_path = os.path.join(root, file)
srt_output_path, text_content = process_file_with_structure(file_path, input_dir, output_dir, model_name)
srt_files.append((srt_output_path, text_content))
return srt_files
# 获取局域网 IP 地址
def get_local_ip():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80)) # Google Public DNS
ip = s.getsockname()[0]
s.close()
return ip
# 处理服务器上的文件和目录
@app.post("/transcribe_server/", summary="处理服务器上的目录或文件生成字幕文件", description="通过指定服务器的目录或文件路径,生成字幕文件。")
async def transcribe_server(
request: Request,
model: Optional[str] = Form("tiny"),
input: str = Form(..., description="输入的服务器目录或文件路径"),
output: Optional[str] = Form(None, description="输出目录路径。如果未指定,则默认在输入路径下创建'srt'文件夹。")
):
"""
处理服务器上的目录或文件,生成字幕文件。
"""
input_path = input
output_path = output
if not os.path.exists(input_path):
raise HTTPException(status_code=400, detail="输入路径不存在")
# 如果是目录
if os.path.isdir(input_path):
if not output_path:
output_path = os.path.join(input_path, "srt") # 默认在输入路径下创建 srt 文件夹
srt_files = process_directory_with_structure(input_path, output_path, model)
# 创建下载链接
local_ip = get_local_ip() # 获取局域网 IP 地址
download_links = [f"http://{local_ip}:5001/static/{os.path.relpath(srt[0], '/media/ubuntu/SOFT/whisper_test')}" for srt in srt_files]
return JSONResponse(content={
"input": input_path,
"output": output_path,
"srt_files": [srt[0] for srt in srt_files],
"transcripts": [srt[1] for srt in srt_files],
"download_links": download_links
})
# 如果是文件
elif os.path.isfile(input_path):
if not output_path:
output_path = os.path.join(os.path.dirname(input_path), "srt") # 默认在输入文件所在目录下创建 srt 文件夹
srt_file, text_content = process_file_with_structure(input_path, os.path.dirname(input_path), output_path, model)
# 创建下载链接
local_ip = get_local_ip() # 获取局域网 IP 地址
srt_download_link = f"http://{local_ip}:5001/static/{os.path.relpath(srt_file, '/media/ubuntu/SOFT/whisper_test')}"
return JSONResponse(content={
"input": input_path,
"output": output_path,
"srt_file": srt_file,
"content": text_content,
"download_link": srt_download_link
})
else:
raise HTTPException(status_code=400, detail="输入路径无效:不是有效的文件或目录")
# 处理客户端上传的文件,生成 SRT 文件并返回下载链接和文本内容
@app.post("/transcribe_client/", summary="处理客户端上传的文件生成字幕文件", description="上传客户端的文件,生成 SRT 文件,并返回下载链接和转录内容。")
async def transcribe_client(
request: Request,
model: Optional[str] = Form("tiny"),
input_file: UploadFile = File(..., description="客户端上传的文件")
):
"""
处理客户端上传的文件,生成字幕文件,并返回生成的 SRT 文件路径和转录文本。
"""
if not os.path.exists(UPLOAD_DIR):
os.makedirs(UPLOAD_DIR) # 确保临时目录存在
# 将上传的文件保存到服务器的临时目录
file_location = os.path.join(UPLOAD_DIR, input_file.filename)
with open(file_location, "wb") as f:
shutil.copyfileobj(input_file.file, f)
input_path = file_location # 使用上传的文件路径作为输入路径
if os.path.isfile(input_path):
output_path = os.path.join(UPLOAD_DIR, "srt")
srt_file, text_content = process_file_with_structure(input_path, UPLOAD_DIR, output_path, model)
print("srt_file:",srt_file)
# 返回下载链接和转录文本
local_ip = get_local_ip() # 获取局域网 IP 地址
srt_download_link = f"http://{local_ip}:5001/static/{os.path.relpath(srt_file, '/media/ubuntu/SOFT/whisper_test')}"
print("srt_download_link",srt_download_link)
print("",os.path.relpath(srt_file, '/media/ubuntu/SOFT/whisper_test/srt'))
return JSONResponse(content={
"input": input_path,
"output": output_path,
"srt_file": srt_file,
"content": text_content, # 返回转录的文本内容
"download_link": srt_download_link # 返回生成 SRT 文件的下载链接
})
raise HTTPException(status_code=400, detail="上传的文件无效,必须是音频或视频文件。")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=5001)
项目简介
基于 OpenAI Whisper 模型的音视频转写服务,支持上传文件或使用服务器上的文件生成字幕。该 API 提供了处理音频和视频文件的能力,并将其转录为 SRT 字幕文件。
运行环境
- Python 3.x
- FastAPI
- torch
- whisper
- moviepy
- opencc
安装依赖
在运行该项目之前,请确保安装以下依赖:
pip install fastapi[all] torch moviepy opencc-python-reimplemented
启动服务器
在项目根目录下运行以下命令启动 FastAPI 服务器:
uvicorn main:app --host 0.0.0.0 --port 5001 --reload
接口列表
1. /transcribe_server/
描述:处理服务器上的目录或文件,生成字幕文件。
请求方法:POST
请求参数:
参数 | 类型 | 必填 | 描述 |
---|---|---|---|
model | string | 否 | 使用的 Whisper 模型,默认为 tiny |
input | string | 是 | 输入的服务器目录或文件路径 |
output | string | 否 | 输出目录路径,默认为输入路径下创建 srt 文件夹 |
返回示例:
{
"input": "/path/to/server/directory",
"output": "/path/to/server/directory/srt",
"srt_files": ["/path/to/server/directory/srt/file1.srt"],
"transcripts": ["转录内容"],
"download_links": ["http://192.168.1.1:5001/static/file1.srt"]
}
2. /transcribe_client/
描述:处理客户端上传的文件,生成字幕文件。
请求方法:POST
请求参数:
参数 | 类型 | 必填 | 描述 |
---|---|---|---|
model | string | 否 | 使用的 Whisper 模型,默认为 tiny |
input_file | UploadFile | 是 | 客户端上传的音频或视频文件 |
返回示例:
{
"input": "/media/ubuntu/SOFT/whisper_test/uploads/example.wav",
"output": "/media/ubuntu/SOFT/whisper_test/uploads/srt",
"srt_file": "/media/ubuntu/SOFT/whisper_test/uploads/srt/example.srt",
"content": "转录后的文本内容",
"download_link": "http://192.168.1.1:5001/static/example.srt"
}
接口调用示例
使用 Python 调用接口
import requests
# 调用 /transcribe_server 接口
response = requests.post("http://192.168.1.1:5001/transcribe_server/", data={
"model": "tiny",
"input": "/path/to/server/directory"
})
print(response.json())
# 调用 /transcribe_client 接口
files = {'input_file': open('C:/path/to/your/example.wav', 'rb')}
response = requests.post("http://192.168.1.1:5001/transcribe_client/", files=files, data={"model": "tiny"})
print(response.json())
使用 cURL 测试接口
调用 /transcribe_server/
curl -X POST "http://192.168.1.1:5001/transcribe_server/" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "model=tiny&input=/path/to/server/directory"
调用 /transcribe_client/
curl -X POST "http://192.168.1.1:5001/transcribe_client/" \
-F "model=tiny" \
-F "input_file=@C:/path/to/your/example.wav"
使用 Postman 测试接口
- 打开 Postman,创建一个新的请求。
- 设置请求方法为
POST
。 - 输入请求 URL,例如
http://192.168.1.1:5001/transcribe_server/
或http://192.168.1.1:5001/transcribe_client/
。 - 在
Body
选项中,选择form-data
:- 对于
/transcribe_server/
:- 添加字段
model
(可选),值为tiny
。 - 添加字段
input
(必填),值为服务器上的目录路径。
- 添加字段
- 对于
/transcribe_client/
:- 添加字段
model
(可选),值为tiny
。 - 添加字段
input_file
(必填),值为上传的音频或视频文件。
- 添加字段
- 对于
- 点击
Send
发送请求,查看返回结果。
注意事项
- 确保输入的目录或文件路径正确。
- 上传的文件类型必须为支持的音频或视频格式(mp3, wav, mp4, avi, mov)。
- 下载链接将在响应中返回,确保使用正确的局域网 IP 地址进行访问。