Fetch处理大模型流式数据请求与解析
为什么有的大模型可以一次返回多个
data
?
Server-Sent Events (SSE):允许服务器连续发送多个 data: 行,每个代表一个独立的数据块。
流式响应:大模型服务通常以流式响应方式返回数据,提高响应速度。
批量处理:服务器可能将多个数据块打包发送,减少网络开销。
一、问题分析
从上图中只能简易看出这种数据特点前两个特点:
- 分段返回
- 每段有多条类似json数据
- 每段的最后一条json数据有可能不完整
二、解决方案
/**
* 异步函数 fetchEventGpt,用于向服务器发送获取 GPT 数据的请求
*
* @param data 要发送的数据对象
* @returns 返回响应体内容
*/
const fetchEventGpt = async (data: any) => {
const response = await fetch('/getGPT', {
method: 'Post',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(data),
})
// 等待直到请求完成
console.log(response)
return response.body
}
/**
* 异步解析流数据并返回 Response 对象。
*
* 该函数通过 fetchEventGpt 发送请求,获取 ReadableStream 流数据,然后通过 ReadableStream API 进行解析。
* 解析过程中,会将流数据分片并尝试解析 JSON 对象,处理可识别内容,并将结果通过 controller.enqueue 返回。
* 如果解析结束,会清空缓存区并关闭连接。
*
* @return {Response} Response 对象,包含解析后的流数据
*/
const parseStream = async () => {
const response = await fetchEventGpt({
messages: [
{
role: 'user',
content: '核能',
},
{
role: 'assistant',
content:
'<br/>根据您提供的文章内容,以下是对“核能”的相关信息的整理:<br/><br/>1. 知识产权归属 :本文件及其附件的知识产权属于核电运行研究(上海)有限公司和中核()供应链管理有限公司,未经书面许可不得擅自使用。<br/><br/>2. 民用核设施事故责任 :根据《民法典》第一千二百三十七条,民用核设施或运入运出核设施的核材料发生事故造成他人损害的,营运单位应承担侵权责任。但是,如果损害是由战争、武装冲突、暴乱等情形或者受害人故意造成的,则营运单位不承担责任。<br/><br/>3. 招标代理机构招标人 :本次招标的代理机构为中核(上海)供应链管理有限公司,招标人为核电运行研究(上海)有限公司。<br/><br/>4. 招标投标系统检测 :根据第九条,实验室应按照《招标投标系统检测技术规范》等进行检测,检测内容包括数据项、业务规则、功能、接口、性能、安全性、可靠性、易用性、运行环境等,并对相关文档进行审核。<br/><br/>5. 招标项目目标 :项目的目标是突破微波干燥处理核电厂放射性废物的关键技术,研制微波干燥成套装备,解决核电厂放射性废物处理难题。<br/><br/>6. 投标文件提交方式 :电子投标文件应通过中核集团电子采购平台提交,纸质版投标文件提交地点为中核(上海)供应链管理有限公司。<br/><br/>以上内容涉及核能相关的法律法规、招标流程、技术规范等方面,希望能对您。 ',
},
{
role: 'user',
content: '核能',
},
],
classify: '',
}).catch(() => {
return new Error('请求失败')
})
if (!(response instanceof ReadableStream)) {
return new Error('请求失败')
}
const reader = response.getReader() as any
const decoder = new TextDecoder('utf-8')
const encoder = new TextEncoder()
let jsonBuffer = ''
const readableStream = new ReadableStream({
async start(controller) {
function push() {
reader
.read()
.then(({ done, value }: any) => {
if (done) {
controller.close()
return
}
// 1、流返回的块数据
const chunk = decoder.decode(value, { stream: true })
// 2、更新到缓存区
jsonBuffer += chunk
// 3、尝试分片解析json
let boundaryIndex = 0
// 当前片内容
let result = ''
while ((boundaryIndex = jsonBuffer.indexOf('\n')) >= 0) {
// 3.1 数据块切片
const jsonString = jsonBuffer.slice(0, boundaryIndex)
// 3.2 更新缓存区
jsonBuffer = jsonBuffer.slice(boundaryIndex + 2)
try {
const jsonStr = jsonString.replace('data:', '')
const jsonObject = JSON.parse(jsonStr) // 解析 JSON
// 处理可识别内容 - 伪代码,根据实际对象处理
const content = jsonObject?.data?.content
controller.enqueue(encoder.encode(content))
// 解析结束 - 我们业务是根据此字段标识,根据实际情况调整
if (jsonObject?.data?.isEnd === true) {
// 清空缓存区
jsonBuffer = ''
break
}
} catch (error) {
console.log('json解析出错', error)
}
}
if (jsonBuffer) {
try {
const jsonObject = JSON.parse(jsonBuffer)
} catch (error) {
console.log(error)
}
}
push()
})
.catch((err: any) => {
controller.error(err)
})
}
push()
},
})
return new Response(readableStream)
}