vue3之写一个aichat---实现聊天逻辑
乱七八糟的理思路
首先我们的已聊天界面长这样,可以看到消息是两部分组成的,一个是用户发出去的消息,一个是调用流式接口返回的消息,所以chatRecordList聊天记录列表就是下面的形式,role区分是ai助手消息还是用户消息;content为消息的内容;如果是AI助手消息还要记录消息的状态status: thinking(思考中) 、done(已完成回答)、 stop(停止思考)、 error(回答出错),isPlaying:后面要实现文字语音播放效果,用来记录是否正在播放;showThought: 是否显示思等待动效; id: 消息的唯一标识
chatRecordList.value = [
{
role: 'assistant',
content: '',
thought: '',
status: 'thinking',
isPlaying: false,
showThought: true,
products: [],
id: Date.now() + 1
},{
role: 'user',
content: value,
status: 'done',
id: Date.now()
}
]
stores/chat.js
管理聊天相关的状态和逻辑
import { defineStore } from 'pinia'
import { ref, computed } from 'vue'
import { ChatService } from '@/services/chat.service'
import api from '@/api'
export const useChatStore = defineStore(
'chat', () => {
const hasChatted = ref(false) //是否已聊天
const isFromRecommend = ref(false) //是否来自推荐
const isResponseStopped = ref(false) //是否停止响应
async function startChat(value, fromRecommend = false) {
isResponseStopped.value = false//开始响应
isFromRecommend.value = fromRecommend
await chatService.startChat(value)
}
function initAbortController() {
abortController.value = new AbortController()
console.log('initAbortController', abortController.value)
isResponseStopped.value = false
}
// 终止请求
function abortRequest() {
if (abortController.value) {
abortController.value.abort()
isResponseStopped.value = true
}
}
},
{
persist: {
key: 'chat-store',//本地存储的键名,所有与这个键相关联的数据都会被存储在这个键下
storage: localStorage,//指定浏览器的本地存储(localStorage)作为存储介质,数据会在会话之间持久存储,直至数据被明确删除,根据需要也可以选择存储到sessionStorage
paths: ['hasChatted', 'assistantMessageId', 'sessionId', 'isHistory', 'historyTime']//指定需要持久化的状态属性,只有这些属性会被存储到本地存储中
}
})
services/chat.service.js
定义一个名为ChatService的类,用于处理聊天功能,包含了多个方法: 生成会话ID、处理流数据、处理流式响应、处理事件数据、预处理JSON内容以及开始聊天
import api from '@/api'
import { useUserStore } from '@/stores/user'
import getCookie from '@/utils/getCookie'
export class ChatService {
constructor() {
this.store = null//存储stores/chat.js下面的变量和函数,以便在当前类中使用
this.currentEventType = null//当前事件类型
}
// 定义一个名为 setStore 的方法,用于设置 store 属性,在stores/chat.js中调用
setStore(store) {
// 将传入的 store 参数赋值给当前对象的 store 属性
this.store = store
}
async startChat(value) {
const userStore = useUserStore()
this.store.setRecommendedQuestions([])
this.store.initAbortController()//开始聊天的时候,初始化一个AbortController实列,用于取消请求
// 如果 store 或 chatRecordList 未初始化,则抛出错误
if (!this.store?.chatRecordList?.value) {
throw new Error('聊天服务未正确初始化')
}
try {
// 设置已开始聊天状态,将hasChatted设置为true,控制页面切换到已聊天组件
this.store.setHasChatted(true)
// 添加用户消息
this.store.addMessage({
role: 'user',
content: value,
status: 'done',
id: Date.now()
})
// 添加助手消息
const newAssistantMessageId = Date.now() + 1
this.store.setAssistantMessageId(newAssistantMessageId)
this.store.addMessage({
role: 'assistant',
content: '',
thought: '',
status: 'thinking',
isPlaying: false,
showThought: true,
products: [],
id: newAssistantMessageId
})
// 发送请求
const response = await api.chat.getFlowChat({
question: {
Question: value,
SessionId: this.store.sessionId.value,
UserId: userStore.hasLoggedIn ? userStore.userId : '',
UserCookie: getCookie('cookieUserName')
},
signal: this.store.abortController.signal//将AbortController的signal传递给请求,以便在需要时取消请求
})
if (!response.ok) {
throw new Error('网络请求失败')
}
// 处理响应
await this.processStreamResponse(response)
} catch (error) {
console.error('Chat error:', error)
const assistantMessage = this.store.chatRecordList.value.find(
msg => msg.id === this.store.assistantMessageId.value
)
if (assistantMessage) {
assistantMessage.status = 'error'
assistantMessage.content = ''
}
throw error
}
}
}
发起聊天的函数
先看发起聊天的函数,主要是是发送请求这里,封装的流式调用的接口如下
为什么使用fetch而不是封装好了axios?
1、流式数据支持
fetch的响应对象Response提供了原生的ReadableStream接口,允许通过流式逐块(chunk)读取数据,对于SEE这种余姚实时处理每个事件的场景,fetch可以监听流的data事件,实时解析每个分块的数据
axios是基于XMLHttpRequest(XHR)实现的,而XHR设计的目标是完整响应的获取,它会等待整个响应体接收完成后才触发回调(除非手动处理onpregress),虽然可以通过responseType: 'stream’获取流,但需要额外处理流的读取逻辑,且不如fetch的流式API直接。
2、数据解析的复杂性
SSE的协议要求
SSE的数据格式是多个\n\n分隔的事件块,每个事件可能包含data:、id:、event:等字段,需要实时解析这些字段,而axios默认的响应处理(如自动转换JSON)不适于这种流式数据处理。就算axios强制使用流模式,也需要自行实现缓冲区和事件解析逻辑,代码复杂度高
接收到的流数据大致长这样
export const getFlowChat = async (data, signal) => {
const response = await fetch(
'/chat/Events',
{
method: 'POST',
headers: {
'Content-Type': 'application/json',
Accept: 'text/event-stream'
},
body: JSON.stringify(data),
signal,
credentials: 'include' // 包含 cookies
}
)
return response
}
处理流式响应
async processStreamResponse(response) {
//检查响应式对象是否有效
if (!response?.body) {
console.error('Invalid response object')
return
}
const reader = response.body.getReader()
const decoder = new TextDecoder('utf-8')
let buffer = ''
try {
while (true) {
// 检查是否需要停止响应
if (this.store?.isResponseStopped?.value) {
// 先取消 reader
await reader.cancel()
// 再终止请求
this.store.abortRequest()
// 最后更新消息状态
const currentAssistantMessage = this.store.chatRecordList?.value?.find(
msg => msg.id === this.store.assistantMessageId?.value
)
if (currentAssistantMessage) {
currentAssistantMessage.status = 'stop'
currentAssistantMessage.content = ''
currentAssistantMessage.thought = ''
currentAssistantMessage.showThought = false
}
break
}
const { done, value } = await reader.read()
// console.log("done", done, value);
if (done) break
const chunk = decoder.decode(value, { stream: true })
buffer += chunk
// 处理完整的事件块
while (buffer.includes('\n\n')) {
const eventBlockEnd = buffer.indexOf('\n\n')
const eventBlock = buffer.slice(0, eventBlockEnd)
buffer = buffer.slice(eventBlockEnd + 2)
// 如果事件块为空,直接跳过
if (!eventBlock.trim()) {
continue
}
// 解析事件块中的行
const lines = eventBlock.split('\n')
let currentEvent = null
let currentData = null
for (const line of lines) {
const trimmedLine = line.trim()
if (!trimmedLine) continue
if (line.startsWith('event:')) {
currentEvent = line.slice(6).trim()
} else if (line.startsWith('data:')) {
currentData = line.slice(5).trim()
// 只有当事件和数据都存在时才处理
if (currentEvent && currentData) {
try {
await this.processEventData(currentEvent, currentData)
} catch (e) {
console.warn(`处理事件数据失败: ${currentEvent}`, e)
}
}
}
}
}
}
} catch (error) {
console.error('Stream processing error:', error)
} finally {
try {
await reader.cancel()
} catch (e) {
console.warn('Error cancelling reader:', e)
}
}
}
初始化检查与准备
首先检查响应对象是否有效,即是否存在response.body,接着创建ReadableStream可读流的读取器(reader)和UT-8解码器(docoder),初始化buffer字符串,用于暂存未处理的流数据块
async processStreamResponse(response) {
if (!response?.body) {
console.error('Invalid response object');
return;
}
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
let buffer = '';
主循环与中断处理
主动中断:通过store.isResponseStopped标志判断用户是否要求停止响应,若需停止:
1、调用reader.cancel()终止流读取;
2、调用 this.store.abortRequest()(对应stores/chat.js中的abortRequest())方法终止底层请求;
3、修改当前消息状态为stop,并清空消息内容
触发场景是,当正在生成内容的时候,不想要了,那么手动点击“停止按钮”
流读取:通过reader.read()逐块读取数据,直到done为true为止
解码分块:使用TextDecoder解码二进制数据,{ stream: true }允许保留不完整字符供后续拼接
try {
while (true) {
// 检查是否需要停止响应
if (this.store?.isResponseStopped?.value) {
await reader.cancel();
this.store.abortRequest();
// 更新消息状态
const currentAssistantMessage = this.store.chatRecordList?.value?.find(
msg => msg.id === this.store.assistantMessageId?.value
);
if (currentAssistantMessage) {
currentAssistantMessage.status = 'stop';
currentAssistantMessage.content = '';
currentAssistantMessage.thought = '';
currentAssistantMessage.showThought = false;
}
break;
}
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value, { stream: true });
buffer += chunk;
事件块逻辑处理
分割事件块:通过\n\n分隔符识别完整事件块,从buffer中提前并移除已处理部分
空块跳过:忽略空白事件块
逐行解析:拆分事件块为行,解析event:和data:字段
条件处理:仅当event和data同时存在时调用processEventData
// 处理完整的事件块
while (buffer.includes('\n\n')) {
const eventBlockEnd = buffer.indexOf('\n\n');//查找当前缓冲区第一个出现的两个连续的换行符的位置,从而确定当前可以处理的事件块范围
const eventBlock = buffer.slice(0, eventBlockEnd);//使用slice方法从缓冲区的开头到找到第一个\n\n标记之间的内容提取出来,作为完整的事件块进行处理,这确保了每次循环值处理一个完整的事件块,而不是部分或者多个事件块混合在一起
buffer = buffer.slice(eventBlockEnd + 2);//在处理完当前事件块之后,需要更新缓冲区,移除已处理的部分,这里+2是因为要跳过两个换行符(\n\n),从而让缓冲区指向下一段未处理的数据,这样做是为了准备下一次循环,继续处理下一个事件块
if (!eventBlock.trim()) continue;//事件块为空跳过处理
const lines = eventBlock.split('\n');//将事件块按行分割成数组,每行代表一个键值对(如event:或data:)
let currentEvent = null;
let currentData = null;
//遍历行,根据每行的前缀提取相应的事件名称和数据,并在同时获取事件名和数据
for (const line of lines) {
const trimmedLine = line.trim();// 去除行首行尾的空白字符
if (!trimmedLine) continue;//跳过空行
if (line.startsWith('event:')) {
currentEvent = line.slice(6).trim();//提取事件名称
} else if (line.startsWith('data:')) {
currentData = line.slice(5).trim();//提取数据内容
// 仅在 event 和 data 同时存在时处理
if (currentEvent && currentData) {
try {
await this.processEventData(currentEvent, currentData);// 处理事件和数据
} catch (e) {
console.warn(`处理事件数据失败: ${currentEvent}`, e);
}
}
}
}
}
处理事件数据
首先检查chatRecordList是否存在,接着通过preprocessJsonContent处理数据,主要是将特殊的字符进行转义,然后在chatRecordList找出当前的ai助手消息(是不是直接取数组最后一个也可以?),然后根据eventType分别处理parsedData
async processEventData(eventType, data) {
try {
// 确保 store 和 chatRecordList 存在
if (!this.store?.chatRecordList?.value) {
console.error('Store or chatRecordList is not initialized')
return
}
let processedJson = this.preprocessJsonContent(data)
const parsedData = JSON.parse(processedJson)// 解析 JSON 数据转换为javascript对象
// 找到当前的助手消息
const assistantMessage = this.store.chatRecordList.value.find(
msg => msg.id === this.store.assistantMessageId?.value
)
if (!assistantMessage) {
console.error('Assistant message not found')
return
}
switch (eventType) {
case 'message':
if (data.includes('"Content":"')) {
// 根据 Type 更新消息状态和内容
switch (parsedData.Type) {
case 'reply':
if (parsedData.Content) {
assistantMessage.content = parsedData.Content.replace(/\\n/g, '\n')
}
if (parsedData.isFinal) {
assistantMessage.status = 'done'
// 回答完成时自动收起 thought
assistantMessage.showThought = false
} else {
assistantMessage.status = 'typing'
}
break
case 'thought':
if (!parsedData.isFinal) {
// 只在非完成状态时更新 thought
assistantMessage.status = 'thinking'
if (parsedData.Content) {
assistantMessage.thought = parsedData.Content.replace(/\\n/g, '\n')
assistantMessage.showThought = true
}
}
break
}
}
break
case 'product':
assistantMessage.products = parsedData
break
case 'question':
if (this.store?.setRecommendedQuestions) {
// 确保问题数据是数组格式
const questions = Array.isArray(parsedData) ? parsedData : [parsedData]
this.store.setRecommendedQuestions(questions)
}
break
default:
console.log(`未处理的事件类型: ${eventType}`)
}
} catch (e) {
console.warn(`处理事件数据失败: ${eventType}`, e)
// 不要抛出错误,让流程继续
}
}
preprocessJsonContent(jsonContent) {
const contentMatch = jsonContent.match(/"Content":"([\s\S]*?)(?="}$)/)//正则匹配"Content":"格式的字符串,并提取其中的内容部分
if (contentMatch) {
const content = contentMatch[1]//提取的内容部分
const escapedContent = content
.replace(/\n/g, '\\n')
.replace(/\r/g, '\\r')
.replace(/\t/g, '\\t')
.replace(/\\/g, '\\\\')
.replace(/"/g, '\\"')//将内容中的特殊字符进行转义,替换换行符、回车符、制表符反斜杠、双引号为相应的转义序列,以便JSON解析
return jsonContent.replace(content, escapedContent)//将转义后的内容替换回原始JSON字符串中
}
return jsonContent//如果匹配不到,则返回原始JSON字符串
}
总结:通过TextDecoder API将接收到的二进制数据转换为UTF-8编码文本,通过正则匹配并替换JSON字符串中的特殊字符,确保其能正常的解析,结合pi’nia来实现管理和同步应用状态