当前位置: 首页 > article >正文

vue3之写一个aichat---实现聊天逻辑

乱七八糟的理思路

首先我们的已聊天界面长这样,可以看到消息是两部分组成的,一个是用户发出去的消息,一个是调用流式接口返回的消息,所以chatRecordList聊天记录列表就是下面的形式,role区分是ai助手消息还是用户消息;content为消息的内容;如果是AI助手消息还要记录消息的状态status: thinking(思考中) 、done(已完成回答)、 stop(停止思考)、 error(回答出错),isPlaying:后面要实现文字语音播放效果,用来记录是否正在播放;showThought: 是否显示思等待动效; id: 消息的唯一标识

chatRecordList.value = [
	{
        role: 'assistant',
        content: '',
        thought: '',
        status: 'thinking',
        isPlaying: false,
        showThought: true,
        products: [],
        id: Date.now() + 1
   },{
        role: 'user',
        content: value,
        status: 'done',
        id: Date.now()
 }
]

在这里插入图片描述

stores/chat.js

管理聊天相关的状态和逻辑

import { defineStore } from 'pinia'
import { ref, computed } from 'vue'
import { ChatService } from '@/services/chat.service'
import api from '@/api'
export const useChatStore = defineStore(
  'chat',  () => {
   	   const hasChatted = ref(false) //是否已聊天
       const isFromRecommend = ref(false) //是否来自推荐
       const isResponseStopped = ref(false) //是否停止响应
       
       async function startChat(value, fromRecommend = false) {
	      isResponseStopped.value = false//开始响应
	      isFromRecommend.value = fromRecommend
	      await chatService.startChat(value)
      }
     function initAbortController() {
      abortController.value = new AbortController()
      console.log('initAbortController', abortController.value)
      isResponseStopped.value = false
    }

    // 终止请求
    function abortRequest() {
      if (abortController.value) {
        abortController.value.abort()
        isResponseStopped.value = true
      }
    }
    

  },  
  {
    persist: {
      key: 'chat-store',//本地存储的键名,所有与这个键相关联的数据都会被存储在这个键下
      storage: localStorage,//指定浏览器的本地存储(localStorage)作为存储介质,数据会在会话之间持久存储,直至数据被明确删除,根据需要也可以选择存储到sessionStorage
      paths: ['hasChatted', 'assistantMessageId', 'sessionId', 'isHistory', 'historyTime']//指定需要持久化的状态属性,只有这些属性会被存储到本地存储中
    }
  })

services/chat.service.js

定义一个名为ChatService的类,用于处理聊天功能,包含了多个方法: 生成会话ID、处理流数据、处理流式响应、处理事件数据、预处理JSON内容以及开始聊天

import api from '@/api'
import { useUserStore } from '@/stores/user'
import getCookie from '@/utils/getCookie'
export class ChatService {
  constructor() {
    this.store = null//存储stores/chat.js下面的变量和函数,以便在当前类中使用
    this.currentEventType = null//当前事件类型
  }

  // 定义一个名为 setStore 的方法,用于设置 store 属性,在stores/chat.js中调用
  setStore(store) {
    // 将传入的 store 参数赋值给当前对象的 store 属性
    this.store = store
  }
    async startChat(value) {
    const userStore = useUserStore()
    this.store.setRecommendedQuestions([])
    this.store.initAbortController()//开始聊天的时候,初始化一个AbortController实列,用于取消请求
    // 如果 store 或 chatRecordList 未初始化,则抛出错误
    if (!this.store?.chatRecordList?.value) {
      throw new Error('聊天服务未正确初始化')
    }

    try {
      // 设置已开始聊天状态,将hasChatted设置为true,控制页面切换到已聊天组件
      this.store.setHasChatted(true)

      // 添加用户消息
      this.store.addMessage({
        role: 'user',
        content: value,
        status: 'done',
        id: Date.now()
      })

      // 添加助手消息
      const newAssistantMessageId = Date.now() + 1
      this.store.setAssistantMessageId(newAssistantMessageId)
      this.store.addMessage({
        role: 'assistant',
        content: '',
        thought: '',
        status: 'thinking',
        isPlaying: false,
        showThought: true,
        products: [],
        id: newAssistantMessageId
      })

      // 发送请求
      const response = await api.chat.getFlowChat({
        question: {
          Question: value,
          SessionId: this.store.sessionId.value,
          UserId: userStore.hasLoggedIn ? userStore.userId : '',
          UserCookie: getCookie('cookieUserName')
        },
        signal: this.store.abortController.signal//将AbortController的signal传递给请求,以便在需要时取消请求
      })

      if (!response.ok) {
        throw new Error('网络请求失败')
      }

      // 处理响应
      await this.processStreamResponse(response)
    } catch (error) {
      console.error('Chat error:', error)
      const assistantMessage = this.store.chatRecordList.value.find(
        msg => msg.id === this.store.assistantMessageId.value
      )
      if (assistantMessage) {
        assistantMessage.status = 'error'
        assistantMessage.content = ''
      }
      throw error
    }
  }
}

发起聊天的函数

先看发起聊天的函数,主要是是发送请求这里,封装的流式调用的接口如下
为什么使用fetch而不是封装好了axios?
1、流式数据支持
fetch的响应对象Response提供了原生的ReadableStream接口,允许通过流式逐块(chunk)读取数据,对于SEE这种余姚实时处理每个事件的场景,fetch可以监听流的data事件,实时解析每个分块的数据
axios是基于XMLHttpRequest(XHR)实现的,而XHR设计的目标是完整响应的获取,它会等待整个响应体接收完成后才触发回调(除非手动处理onpregress),虽然可以通过responseType: 'stream’获取流,但需要额外处理流的读取逻辑,且不如fetch的流式API直接。
2、数据解析的复杂性
SSE的协议要求
SSE的数据格式是多个\n\n分隔的事件块,每个事件可能包含data:、id:、event:等字段,需要实时解析这些字段,而axios默认的响应处理(如自动转换JSON)不适于这种流式数据处理。就算axios强制使用流模式,也需要自行实现缓冲区和事件解析逻辑,代码复杂度高
接收到的流数据大致长这样
在这里插入图片描述


export const getFlowChat = async (data, signal) => {
  const response = await fetch(
   '/chat/Events',
    {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        Accept: 'text/event-stream'
      },
      body: JSON.stringify(data),
      signal,
      credentials: 'include' // 包含 cookies
    }
  )
  return response
}

处理流式响应

  async processStreamResponse(response) {
  	//检查响应式对象是否有效
    if (!response?.body) {
      console.error('Invalid response object')
      return
    }

    const reader = response.body.getReader()
    const decoder = new TextDecoder('utf-8')
    let buffer = ''

    try {
      while (true) {
        // 检查是否需要停止响应
        if (this.store?.isResponseStopped?.value) {
          // 先取消 reader
          await reader.cancel()
          // 再终止请求
          this.store.abortRequest()
          // 最后更新消息状态
          const currentAssistantMessage = this.store.chatRecordList?.value?.find(
            msg => msg.id === this.store.assistantMessageId?.value
          )
          if (currentAssistantMessage) {
            currentAssistantMessage.status = 'stop'
            currentAssistantMessage.content = ''
            currentAssistantMessage.thought = ''
            currentAssistantMessage.showThought = false
          }
          break
        }

        const { done, value } = await reader.read()
        // console.log("done", done, value);
        if (done) break

        const chunk = decoder.decode(value, { stream: true })
        buffer += chunk

        // 处理完整的事件块
        while (buffer.includes('\n\n')) {
          const eventBlockEnd = buffer.indexOf('\n\n')
          const eventBlock = buffer.slice(0, eventBlockEnd)
          buffer = buffer.slice(eventBlockEnd + 2)

          // 如果事件块为空,直接跳过
          if (!eventBlock.trim()) {
            continue
          }

          // 解析事件块中的行
          const lines = eventBlock.split('\n')
          let currentEvent = null
          let currentData = null

          for (const line of lines) {
            const trimmedLine = line.trim()
            if (!trimmedLine) continue

            if (line.startsWith('event:')) {
              currentEvent = line.slice(6).trim()
            } else if (line.startsWith('data:')) {
              currentData = line.slice(5).trim()
              // 只有当事件和数据都存在时才处理
              if (currentEvent && currentData) {
                try {
                  await this.processEventData(currentEvent, currentData)
                } catch (e) {
                  console.warn(`处理事件数据失败: ${currentEvent}`, e)
                }
              }
            }
          }
        }
      }
    } catch (error) {
      console.error('Stream processing error:', error)
    } finally {
      try {
        await reader.cancel()
      } catch (e) {
        console.warn('Error cancelling reader:', e)
      }
    }
  }

初始化检查与准备
首先检查响应对象是否有效,即是否存在response.body,接着创建ReadableStream可读流的读取器(reader)和UT-8解码器(docoder),初始化buffer字符串,用于暂存未处理的流数据块

async processStreamResponse(response) {
  if (!response?.body) {
    console.error('Invalid response object');
    return;
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder('utf-8');
  let buffer = '';

主循环与中断处理
主动中断通过store.isResponseStopped标志判断用户是否要求停止响应,若需停止:
1、调用reader.cancel()终止流读取;
2、调用 this.store.abortRequest()(对应stores/chat.js中的abortRequest())方法终止底层请求;
3、修改当前消息状态为stop,并清空消息内容
触发场景是,当正在生成内容的时候,不想要了,那么手动点击“停止按钮”
流读取:通过reader.read()逐块读取数据,直到done为true为止
解码分块:使用TextDecoder解码二进制数据,{ stream: true }允许保留不完整字符供后续拼接

try {
  while (true) {
    // 检查是否需要停止响应
    if (this.store?.isResponseStopped?.value) {
      await reader.cancel();
      this.store.abortRequest();
      // 更新消息状态
      const currentAssistantMessage = this.store.chatRecordList?.value?.find(
        msg => msg.id === this.store.assistantMessageId?.value
      );
      if (currentAssistantMessage) {
        currentAssistantMessage.status = 'stop';
        currentAssistantMessage.content = '';
        currentAssistantMessage.thought = '';
        currentAssistantMessage.showThought = false;
      }
      break;
    }

    const { done, value } = await reader.read();
    if (done) break;

    const chunk = decoder.decode(value, { stream: true });
    buffer += chunk;

事件块逻辑处理
分割事件块:通过\n\n分隔符识别完整事件块,从buffer中提前并移除已处理部分
空块跳过:忽略空白事件块
逐行解析:拆分事件块为行,解析event:和data:字段
条件处理:仅当event和data同时存在时调用processEventData

// 处理完整的事件块
while (buffer.includes('\n\n')) {
  const eventBlockEnd = buffer.indexOf('\n\n');//查找当前缓冲区第一个出现的两个连续的换行符的位置,从而确定当前可以处理的事件块范围
  const eventBlock = buffer.slice(0, eventBlockEnd);//使用slice方法从缓冲区的开头到找到第一个\n\n标记之间的内容提取出来,作为完整的事件块进行处理,这确保了每次循环值处理一个完整的事件块,而不是部分或者多个事件块混合在一起
  buffer = buffer.slice(eventBlockEnd + 2);//在处理完当前事件块之后,需要更新缓冲区,移除已处理的部分,这里+2是因为要跳过两个换行符(\n\n),从而让缓冲区指向下一段未处理的数据,这样做是为了准备下一次循环,继续处理下一个事件块

  if (!eventBlock.trim()) continue;//事件块为空跳过处理

  const lines = eventBlock.split('\n');//将事件块按行分割成数组,每行代表一个键值对(如event:或data:)
  let currentEvent = null;
  let currentData = null;
//遍历行,根据每行的前缀提取相应的事件名称和数据,并在同时获取事件名和数据
  for (const line of lines) {
    const trimmedLine = line.trim();// 去除行首行尾的空白字符
    if (!trimmedLine) continue;//跳过空行

    if (line.startsWith('event:')) {
      currentEvent = line.slice(6).trim();//提取事件名称
    } else if (line.startsWith('data:')) {
      currentData = line.slice(5).trim();//提取数据内容
      // 仅在 event 和 data 同时存在时处理
      if (currentEvent && currentData) {
        try {
          await this.processEventData(currentEvent, currentData);// 处理事件和数据
        } catch (e) {
          console.warn(`处理事件数据失败: ${currentEvent}`, e);
        }
      }
    }
  }
}

处理事件数据

首先检查chatRecordList是否存在,接着通过preprocessJsonContent处理数据,主要是将特殊的字符进行转义,然后在chatRecordList找出当前的ai助手消息(是不是直接取数组最后一个也可以?),然后根据eventType分别处理parsedData

  async processEventData(eventType, data) {
    try {
      // 确保 store 和 chatRecordList 存在
      if (!this.store?.chatRecordList?.value) {
        console.error('Store or chatRecordList is not initialized')
        return
      }

      let processedJson = this.preprocessJsonContent(data)
      const parsedData = JSON.parse(processedJson)// 解析 JSON 数据转换为javascript对象


      // 找到当前的助手消息
      const assistantMessage = this.store.chatRecordList.value.find(
        msg => msg.id === this.store.assistantMessageId?.value
      )

      if (!assistantMessage) {
        console.error('Assistant message not found')
        return
      }

      switch (eventType) {
        case 'message':
          if (data.includes('"Content":"')) {
            // 根据 Type 更新消息状态和内容
            switch (parsedData.Type) {
              case 'reply':
                if (parsedData.Content) {
                  assistantMessage.content = parsedData.Content.replace(/\\n/g, '\n')
                }

                if (parsedData.isFinal) {
                  assistantMessage.status = 'done'
                  // 回答完成时自动收起 thought
                  assistantMessage.showThought = false
                } else {
                  assistantMessage.status = 'typing'
                }
                break

              case 'thought':
                if (!parsedData.isFinal) {
                  // 只在非完成状态时更新 thought
                  assistantMessage.status = 'thinking'
                  if (parsedData.Content) {
                    assistantMessage.thought = parsedData.Content.replace(/\\n/g, '\n')
                    assistantMessage.showThought = true
                  }
                }
                break
            }
          }
          break

        case 'product':
          assistantMessage.products = parsedData
          break

        case 'question':
          if (this.store?.setRecommendedQuestions) {
            // 确保问题数据是数组格式
            const questions = Array.isArray(parsedData) ? parsedData : [parsedData]
            this.store.setRecommendedQuestions(questions)
          }
          break

        default:
          console.log(`未处理的事件类型: ${eventType}`)
      }
    } catch (e) {
      console.warn(`处理事件数据失败: ${eventType}`, e)
      // 不要抛出错误,让流程继续
    }
  }
 preprocessJsonContent(jsonContent) {
    const contentMatch = jsonContent.match(/"Content":"([\s\S]*?)(?="}$)/)//正则匹配"Content":"格式的字符串,并提取其中的内容部分
    if (contentMatch) {
      const content = contentMatch[1]//提取的内容部分
      const escapedContent = content
        .replace(/\n/g, '\\n')
        .replace(/\r/g, '\\r')
        .replace(/\t/g, '\\t')
        .replace(/\\/g, '\\\\')
        .replace(/"/g, '\\"')//将内容中的特殊字符进行转义,替换换行符、回车符、制表符反斜杠、双引号为相应的转义序列,以便JSON解析

      return jsonContent.replace(content, escapedContent)//将转义后的内容替换回原始JSON字符串中
    }
    return jsonContent//如果匹配不到,则返回原始JSON字符串
  }

总结:通过TextDecoder API将接收到的二进制数据转换为UTF-8编码文本,通过正则匹配并替换JSON字符串中的特殊字符,确保其能正常的解析,结合pi’nia来实现管理和同步应用状态


http://www.kler.cn/a/598637.html

相关文章:

  • OpenRAND可重复的随机数生成库
  • git 合并多次提交 commit
  • 【xiaozhi赎回之路-2:语音可以自己配置就是用GPT本地API】
  • display: contens的使用
  • python爬虫Redis数据库
  • 【MyDB】5-索引管理之4-单元测试
  • Jupyter Notebook 常用命令(自用)
  • AI安全学习(刚开始,未完版)
  • git 命令回退版本
  • Kafka 八股文
  • python 游戏开发cocos2d库安装与使用
  • 【推荐项目】056-高校学籍管理系统
  • 回归——数学公式推导全过程
  • 【前端】webstorm中运行一个前端项目
  • C++核心语法快速整理
  • SpringBoot集成MQTT客户端
  • HarmonyOS:通过键值型数据库实现数据持久化
  • Mysql 安装教程和Workbench的安装教程以及workbench的菜单栏汉化
  • 响应式CMS架构优化SEO与用户体验
  • 常⻅CMS漏洞 -DeDeCMS 获取webshell