当前位置: 首页 > article >正文

WebSocket实现消息实时推送

文章目录

    • websocket介绍
      • 特点
      • 工作原理
    • 用websocket实现实时推送
      • 引入依赖
      • WebSocket 函数定义
      • 变量声明
      • 初始化 WebSocket 连接
      • WebSocket 连接的初始化和事件处理
      • 连接打开事件
      • 接收消息处理
      • 连接关闭和重连机制
      • 心跳机制
      • 使用 WebSocket
      • 代码完整显示

websocket介绍

WebSocket 是一种网络通信协议,旨在实现客户端和服务器之间的双向通信。它允许在单个 TCP 连接上进行全双工(即同时进行发送和接收)通信。WebSocket 特别适用于需要实时更新的应用,比如在线游戏、聊天应用、实时数据流等。

特点

双向通信:与传统的 HTTP 请求-响应模型不同,WebSocket 允许客户端和服务器同时发送和接收数据,这意味着一方可以主动向另一方发送消息,而不需要等待请求。

持久连接:WebSocket 建立的连接是持久的,客户端和服务器在一次连接后可以持续交换消息,而不需要频繁建立和关闭连接,从而减少了网络延迟和开销。

低延迟:由于使用了持久连接,WebSocket 可以减少消息传输中的延迟,使实时应用的响应速度更快。

轻量级:WebSocket 协议在数据传输中相对轻量,不需要像 HTTP 那样包含大量的头信息,数据包开销更小。

工作原理

握手:首先,客户端通过发送 HTTP 请求向服务器发起 WebSocket 连接。请求中包含一些特定的头信息,表明希望升级到 WebSocket 协议。

建立连接:服务器收到请求后,如果支持 WebSocket,将返回一个响应,确认升级连接。此时,HTTP 连接转变为 WebSocket 连接。

数据传输:连接建立后,双方可以自由地发送和接收消息。消息格式可以是文本(如 JSON)或二进制数据。

关闭连接:一方可以发送关闭帧,另一方收到后也会关闭连接,整个通信过程结束。

用websocket实现实时推送

这个封装提供了一个基础的 WebSocket 功能,可以支持实时消息推送。通过增加事件监听的管理、错误处理、灵活的心跳机制等功能,可以使这个封装更加健壮和灵活。

引入依赖

import { ElMessage, getCache } from "@/utils";

ElMessage: 用于显示消息提示。
getCache: 用于获取缓存数据,这里用来获取用户ID。

WebSocket 函数定义

function webSocket(params: string) {
  ...
  return {
    onMessageFns,
    init,
  };
}

webSocket 函数接收一个参数 params(通常是用户ID),用于建立特定的 WebSocket 连接。

变量声明

  let urlParams: string = params;
  const isReconnect = ref(true);
  let reconnection: any;
  let ws: WebSocket | null = null;
  let websocketConnectedCount = 0;
  let serverTimeoutObj: any = null;
  const hearbeatInterval = 300000; // 心跳间隔

is_reconnect: 用于标识是否可以重连。
ws: 存储 WebSocket 实例。
websocketConnectdCount : 记录连接失败次数。
serverTimeoutObj : 用于管理心跳检测的定时器。

初始化 WebSocket 连接

const init = () => {
  if (!("WebSocket" in window)) {
    ElMessage({
      message: "抱歉,浏览器不支持Websocket!",
      type: "warning",
      duration: 1000,
    });
    return;
  }
  try {
    initWebSocket();
  } catch (e) {
    console.log("尝试创建连接失败");
    reConnect();
  }
};

首先检查浏览器是否支持 WebSocket。如果支持,则尝试初始化连接;如果失败,则调用 reConnect 进行重连。

WebSocket 连接的初始化和事件处理

 function initWebSocket() {
    const baseUrl = import.meta.env.VITE_BaseUrl?.slice(7);
    const url = `ws://${baseUrl}/websocket/${urlParams}`;
    ws = new WebSocket(url);

    ws.onopen = (e: Event) => {
      websocketOpen(e);
    };

    ws.onmessage = (e: MessageEvent) => {
      websocketOnMessage(e);
    };

    ws.onerror = () => {
      console.log("WebSocket连接发生错误");
      isReconnect.value = false;
      websocketConnectedCount++;
      if (websocketConnectedCount <= 5) {
        reConnect();
      }
    };

    ws.onclose = (e: CloseEvent) => {
      websocketClose(e);
    };
  }

创建 WebSocket 实例,并设置各类事件处理函数(打开、接收消息、错误、关闭)。

连接打开事件

function websocketOpen(e: Event) {
    console.log("连接成功");
    reset();
    start();
    const data = { sendType: "HEALTH" };
    ws?.send(JSON.stringify(data));
  }
}

连接成功时,重置心跳并发送健康检查消息。

接收消息处理

const onMessageFns = new Set<(e: MessageEvent) => void>();
  function websocketOnMessage(e: MessageEvent) {
    onMessageFns.forEach((callback) => callback(e));
    reset();
    start();
    return e.data;
  }

接收到消息时,执行所有注册的回调函数,并重置心跳。

连接关闭和重连机制

function websocketclose(e: any) {
  console.log(e);
  is_reconnect.value = false;
  console.log("connection closed (" + e.code + ")");
}

let reConnect = () => {
  console.log("尝试重新连接");
  if (is_reconnect) return; // 如果已经连上就不再重连
  reconnection && clearTimeout(reconnection);
  reconnection = setTimeout(function () {
    init();
  }, 5000);
};

连接关闭时设置标识,并在指定时间后尝试重连。

心跳机制

 const reset = () => {
    clearTimeout(serverTimeoutObj);
  };

  const start = () => {
    serverTimeoutObj = setInterval(() => {
      if (ws?.readyState === WebSocket.OPEN) {
        console.log("连接状态,发送消息保持连接");
        const data = { sendType: "HEALTH" };
        ws?.send(JSON.stringify(data));
        reset();
      } else {
        console.log("断开连接, 尝试重连");
        webSocket(urlParams);
      }
    }, hearbeatInterval);
  };

定期发送健康检查消息,保持连接活跃。

使用 WebSocket

import webSocket from "@/utils/websocket";

const userId = getCache("userId");
const ws = webSocket(userId)!;

onMounted(async () => {
  const addMessage = () => {
    ws.onMessageFns.add((value: MessageEvent) => {
      try {
        if (value.data !== "来自后台的反馈:连接成功" && value.data !== "SUCCESS") {
          const parsedData = JSON.parse(value.data);
          console.log("获取到的信息", parsedData);
        }
      } catch (error) {
        console.error("消息解析错误:", error);
      }
    });
  };
  addMessage();
});

在组件挂载时,获取用户ID并创建 WebSocket 实例。添加消息接收处理,过滤特定消息并处理 JSON 数据。

代码完整显示

import { ElMessage, getCache } from "@/utils";

function webSocket(params: string) {
  let urlParams: string = params;
  const isReconnect = ref(true);
  let reconnection: any;
  let ws: WebSocket | null = null;
  let websocketConnectedCount = 0;
  let serverTimeoutObj: any = null;
  const hearbeatInterval = 300000; // 心跳间隔

  const init = () => {
    if (!("WebSocket" in window)) {
      ElMessage({
        message: "抱歉,浏览器不支持WebSocket!",
        type: "warning",
        duration: 1000,
      });
      return;
    }
    try {
      initWebSocket();
    } catch (e) {
      console.log("尝试创建连接失败");
      reConnect();
    }
  };

  function initWebSocket() {
    const baseUrl = import.meta.env.VITE_BaseUrl?.slice(7);
    const url = `ws://${baseUrl}/websocket/${urlParams}`;
    ws = new WebSocket(url);

    ws.onopen = (e: Event) => {
      websocketOpen(e);
    };

    ws.onmessage = (e: MessageEvent) => {
      websocketOnMessage(e);
    };

    ws.onerror = () => {
      console.log("WebSocket连接发生错误");
      isReconnect.value = false;
      websocketConnectedCount++;
      if (websocketConnectedCount <= 5) {
        reConnect();
      }
    };

    ws.onclose = (e: CloseEvent) => {
      websocketClose(e);
    };
  }

  function websocketOpen(e: Event) {
    console.log("连接成功");
    reset();
    start();
    const data = { sendType: "HEALTH" };
    ws?.send(JSON.stringify(data));
  }

  const onMessageFns = new Set<(e: MessageEvent) => void>();
  function websocketOnMessage(e: MessageEvent) {
    onMessageFns.forEach((callback) => callback(e));
    reset();
    start();
    return e.data;
  }

  function websocketClose(e: CloseEvent) {
    console.log("connection closed (" + e.code + ")");
    isReconnect.value = false;
  }

  const reConnect = () => {
    console.log("尝试重新连接");
    if (isReconnect.value) return; // 如果已经连上就不再重连
    reconnection && clearTimeout(reconnection);
    reconnection = setTimeout(init, 5000);
  };

  const reset = () => {
    clearTimeout(serverTimeoutObj);
  };

  const start = () => {
    serverTimeoutObj = setInterval(() => {
      if (ws?.readyState === WebSocket.OPEN) {
        console.log("连接状态,发送消息保持连接");
        const data = { sendType: "HEALTH" };
        ws?.send(JSON.stringify(data));
        reset();
      } else {
        console.log("断开连接, 尝试重连");
        webSocket(urlParams);
      }
    }, hearbeatInterval);
  };

  return {
    onMessageFns,
    init,
  };
}

// 使用 WebSocket
import webSocket from "@/utils/websocket";

const userId = getCache("userId");
const ws = webSocket(userId)!;

onMounted(async () => {
  const addMessage = () => {
    ws.onMessageFns.add((value: MessageEvent) => {
      try {
        if (value.data !== "来自后台的反馈:连接成功" && value.data !== "SUCCESS") {
          const parsedData = JSON.parse(value.data);
          console.log("获取到的信息", parsedData);
        }
      } catch (error) {
        console.error("消息解析错误:", error);
      }
    });
  };
  addMessage();
});

http://www.kler.cn/a/382473.html

相关文章:

  • Unity中 Xlua使用整理(二)
  • Java Spring Boot实现基于URL + IP访问频率限制
  • 手机的ip地址是根据电话卡归属地定吗
  • “深入浅出”系列之QT:(6)如何在一个项目中调用另一个项目
  • 分布式环境下定时任务扫描时间段模板创建可预订时间段
  • JS进阶--JS听到了不灭的回响
  • C# 一个工具类让winform自动根据窗体大小缩放所有控件
  • 【EasyExcel】EasyExcel导出表格包含合计行、自定义样式、自适应列宽
  • Rust 构建 TCP/UDP 网络服务
  • 导航栏渐变色iOS
  • 在不知道root密码的情况下向MobaXterm发送信息
  • 机器学习在运维中的应用
  • 仓库(Repository)
  • Go + Wasm
  • 深入理解 C++ 中的 std::vector
  • 淘宝商品详情 API:助力电商业务腾飞的新桥梁
  • 流程与模式
  • Python正则表达式匹配汉字、英文、数字、常用符号等
  • Automated Isotope Identification Algorithm UsingArtificial Neural Networks-论文阅读
  • Rust常用数据结构教程 String与str,元组和数组
  • 【K8S系列】Kubernetes 中 Service 更改未生效的故障排查与解决方案【已解决】
  • 智能提醒助理系列-jdk8升级到21,springboot2.3升级到3.3【性能篇】
  • WandB概念、主要功能、详细说明和总结
  • 鸿蒙ArkTS中的布局容器组件(Scroll、List、Tabs)
  • react中得类组件和函数组件有啥区别,怎么理解这两个函数
  • 源文件到可执行文件流程