当前位置: 首页 > article >正文

QQ频道机器人零基础开发详解(基于QQ官方机器人文档)[第七期]

QQ频道机器人零基础开发详解(基于QQ官方机器人文档)[第七期]

第七期介绍:事件订阅之WebSocket方式

目录

  • QQ频道机器人零基础开发详解(基于QQ官方机器人文档)[第七期]
    • 第七期介绍:事件订阅之WebSocket方式
  • WebSocket方式
    • 通用数据结构 Payload
    • 长连接维护 OpCode
    • 发起连接到 Gateway
    • 登录鉴权获得 Session
    • 发送心跳 Ack
    • 恢复登录态 Session
    • 事件订阅Intents
      • 举例
    • 权限
    • 分片连接LoadBalance
    • 获得合适的分片数
    • 分片规则
    • 最大连接数
    • 完整代码示例
  • 致谢和更新

在这里插入图片描述


不懂得的也可以来私聊或评论区问哦~
在这里插入图片描述
原力到一千才可以推广,三连啊喂!!!

在这里插入图片描述

WebSocket方式

通过 WebSocket 建立与QQ机器人开放平台的长链接通信管道,当需要事件通知的时候QQ后台通过 WebSocket 连接下发事件到开发者服务器上。

开发者需要维护 WebSocket 长链接的状态,包括连接状态维护、登录鉴权、心跳维护、断线恢复重连等。

优势:本地服务器即可发起调试,无需依赖公网域名和公网服务器(WebHook)接收回调通知。

在进行接下来操作之前,先学会建立WebSocket连接

import asyncio
import websockets

async def connect_to_gateway(url):
    async with websockets.connect(url) as websocket:
        while True:
            data = await websocket.recv()
            print(f"Received: {data}")
            # 这里可以添加处理接收到的数据的逻辑

通用数据结构 Payload

payload 指的是在 websocket 连接上传输的数据,网关的上下行消息采用的都是同一个结构,如下:

{
  "op": 0,
  "d": {},
  "s": 42,
  "t": "GATEWAY_EVENT_NAME"
}
字段描述
op指的是 opcode,参考连接维护
s下行消息都会有一个序列号,标识消息的唯一性,客户端需要再发送心跳的时候,携带客户端收到的最新的s
t代表事件类型。主要用在op为 0 Dispatch 的时候
d代表事件内容,不同事件类型的事件内容格式都不同,请注意识别。主要用在op为 0 Dispatch 的时候

长连接维护 OpCode

所有 opcode 列表如下:

CODE名称客户端行为描述
0DispatchReceive服务端进行消息推送
1HeartbeatSend/Receive客户端或服务端发送心跳
2IdentifySend客户端发送鉴权
6ResumeSend客户端恢复连接
7ReconnectReceive服务端通知客户端重新连接
9Invalid SessionReceive当 identify 或 resume 的时候,如果参数有错,服务端会返回该消息
10HelloReceive当客户端与网关建立 ws 连接之后,网关下发的第一条消息
11Heartbeat ACKReceive/Reply当发送心跳成功之后,就会收到该消息
12HTTP Callback ACKReply仅用于 http 回调模式的回包,代表机器人收到了平台推送的数据

客户端行为含义如下:

  • Receive 客户端接收到服务端 push 的消息
  • Send 客户端发送消息
  • Reply 客户端接收到服务端发送的消息之后的回包(HTTP 回调模式)

发起连接到 Gateway

第一步先调用 获取通用WSS 接入点 | QQ机器人文档 或 获取带分片WSS 接入点 | QQ机器人文档 接口获取网关地址。

会得到一个类似下面这样的地址:

wss://api.sgroup.qq.com/websocket/

然后进行 websocket 长连接建立,一旦连接成功,就会返回 OpCode 10 Hello 消息。这个消息主要的内容是心跳周期,单位毫秒(milliseconds),如下:

{
  "op": 10,
  "d": {
    "heartbeat_interval": 45000
  }
}

登录鉴权获得 Session

websocket 长连接建立之后,需要进行登录鉴权,登录鉴权成功后会获得一个 session 会话 id,只有登录成功后,QQ后台才会下发事件通知,

发送一个 OpCode 2 Identify 消息, payload 如下:

{
  "op": 2,
  "d": {
    "token": "token string",
    "intents": 513,
    "shard": [0, 4],
    "properties": {
      "$os": "linux",
      "$browser": "my_library",
      "$device": "my_library"
    }
  }
}
字段描述
token格式为"QQBot {AccessToken}"
intents是此次连接所需要接收的事件,具体可参考 Intents 事件订阅intents
shard考虑到开发者事件接收时可以实现负载均衡,QQ 提供了分片逻辑,事件通知会落在不同的分片上,该参数是个拥有两个元素的数组。
properties目前无实际作用,可以按照自己的实际情况填写,也可以留空

python示例

async def identify(websocket):
    token = "你的机器人Token"
    payload = {
        "op": 2,
        "d": {
            "token": token,
            "intents": 513,  # 根据需要订阅的事件修改
            "shard": [0, 4],  # 根据实际情况修改
            "properties": {
                "$os": "linux",
                "$browser": "my_library",
                "$device": "my_library"
            }
        }
    }
    await websocket.send(json.dumps(payload))

鉴权成功之后,QQ 后台会下发一个 Ready Event, payload 如下:

{
  "op": 0,
  "s": 1,
  "t": "READY",
  "d": {
    "version": 1,
    "session_id": "082ee18c-0be3-491b-9d8b-fbd95c51673a",
    "user": {
      "id": "6158788878435714165",
      "username": "群pro测试机器人",
      "bot": true
    },
    "shard": [0, 0]
  }
}

发送心跳 Ack

鉴权成功之后,就需要按照周期进行心跳发送。d 为客户端收到的最新的消息的 s,如果是首次连接,d 为传 null, payload 如下:

{
  "op": 1,
  "d": 251
}

python示例

async def send_heartbeat(websocket, seq):
    while True:
        payload = {"op": 1, "d": seq}
        await websocket.send(json.dumps(payload))
        await asyncio.sleep(45)  # 根据服务器返回的心跳间隔调整

心跳发送成功之后会收到 OpCode 11 Heartbeat ACK 消息, payload 如下:

{
  "op": 11
}

恢复登录态 Session

有很多原因可能会导致 websocket 长连接断开,断开之后短时间内重连会补发中间遗漏的事件,以保障业务逻辑的正确性。断开重连 gateway 后不需要发送重新登录 Opcode 2 Identify请求。在连接到 Gateway 之后,需要发送 Opcode 6 Resume消息, payload 如下:

{
  "op": 6,
  "d": {
    "token": "my_token",
    "session_id": "session_id_i_stored",
    "seq": 1337
  }
}

python示例

async def resume(websocket, session_id, seq):
    token = "你的机器人Token"
    payload = {
        "op": 6,
        "d": {
            "token": token,
            "session_id": session_id,
            "seq": seq
        }
    }
    await websocket.send(json.dumps(payload))

其中 seq 指的是在接收事件时候的 s 字段,我们推荐开发者在处理过事件之后记录下 s 这样可以在 resume 的时候传递给 websocket, websocket 会自动补发这个 seq 之后的事件。

恢复成功之后,就开始补发遗漏事件,所有事件补发完成之后,会下发一个 Resumed Event, payload 如下:

{
  "op": 0,
  "s": 2002,
  "t": "RESUMED",
  "d": ""
}

事件订阅Intents

事件的 intents 是一个标记位,每一位都代表不同的事件,如果需要接收某类事件,就将该位置为 1。

每个 intents 位代表的是一类事件,可以使用使用 websocket 传输的数据中的 t 字段的值来区分。

事件和位移的关系如下:

  • GUILDS (1 << 0)

    • GUILD_CREATE
    • GUILD_UPDATE
    • GUILD_DELETE
    • CHANNEL_CREATE
    • CHANNEL_UPDATE
    • CHANNEL_DELETE
  • GUILD_MEMBERS (1 << 1)

    • GUILD_MEMBER_ADD
    • GUILD_MEMBER_UPDATE
    • GUILD_MEMBER_REMOVE
  • GUILD_MESSAGES (1 << 9)

    • MESSAGE_CREATE
    • MESSAGE_DELETE
  • GUILD_MESSAGE_REACTIONS (1 << 10)

    • MESSAGE_REACTION_ADD
    • MESSAGE_REACTION_REMOVE
  • DIRECT_MESSAGE (1 << 12)

    • DIRECT_MESSAGE_CREATE
    • DIRECT_MESSAGE_DELETE
  • GROUP_AND_C2C_EVENT (1 << 25)

    • C2C_MESSAGE_CREATE
    • FRIEND_ADD
    • FRIEND_DEL
    • C2C_MSG_REJECT
    • C2C_MSG_RECEIVE
    • GROUP_AT_MESSAGE_CREATE
    • GROUP_ADD_ROBOT
    • GROUP_DEL_ROBOT
    • GROUP_MSG_REJECT
    • GROUP_MSG_RECEIVE
  • INTERACTION (1 << 26)

    • INTERACTION_CREATE
  • MESSAGE_AUDIT (1 << 27)

    • MESSAGE_AUDIT_PASS
    • MESSAGE_AUDIT_REJECT
  • FORUMS_EVENT (1 << 28)

    • FORUM_THREAD_CREATE
    • FORUM_THREAD_UPDATE
    • FORUM_THREAD_DELETE
    • FORUM_POST_CREATE
    • FORUM_POST_DELETE
    • FORUM_REPLY_CREATE
    • FORUM_REPLY_DELETE
    • FORUM_PUBLISH_AUDIT_RESULT
  • AUDIO_ACTION (1 << 29)

    • AUDIO_START
    • AUDIO_FINISH
    • AUDIO_ON_MIC
    • AUDIO_OFF_MIC
  • PUBLIC_GUILD_MESSAGES (1 << 30)

    • AT_MESSAGE_CREATE
    • PUBLIC_MESSAGE_DELETE

举例

如开发者需要接收用户 at 机器人的消息,那么就需要在 intents 中设置接收 PUBLIC_GUILD_MESSAGES。则需要先计算 1 << 30 的值。然后与 0 做位或操作,得到最终需要传递的 intents。

如果涉及到多个事件类型的接收,则需要将多个结果做位或操作,如:0|1<<30|1<<1 代表订阅 PUBLIC_GUILD_MESSAGES 和 GUILD_MEMBERS 这两类事件。

权限

事件类型的订阅,是有权限控制的,除了 GUILDS,PUBLIC_GUILD_MESSAGES,GUILD_MEMBERS 事件是基础的事件,默认有权限订阅之外,其他的特殊事件,都需要经过申请才能够使用,
如果在鉴权的时候传递了无权限的 intents, websocket 会报错,并直接关闭连接。请开发者注意订阅事件的范围需要控制在自己所需要的范围之内。

如果拥有的某个特殊事件类型的权限被取消,则在当前连接上不会报错,但是将不会收到对应的事件类型,如果重新连接,则报错,所以如果开发者的事件类型权限被取消,请及时调整监听事件代码,避免报错导致的无法连接。

分片连接LoadBalance

随着bot的增长并被添加到越来越多的频道中,事件越来越多,业务有必要对事件进行水平分割,实现负载均衡。机器人网关实现了一种用户可控制的分片方法,该方法允许跨多个网关连接拆分事件。 分片完全由用户控制,并且不需要在单独的连接之间进行状态共享。

要在连接上启用分片,需要在建立连接的时候指定分片参数,具体参考gateway。

获得合适的分片数

使用 /gateway/bot 接口获取网关地址的时候,会同时返回一个建议的 shard数,及最大并发限制。

{
  "url": "wss://sandbox.api.sgroup.qq.com/websocket",
  "shards": 1,
  "session_start_limit": {
    "total": 1000,
    "remaining": 1000,
    "reset_after": 86400000,
    "max_concurrency": 1
  }
}

分片规则

分片是按照频道id进行哈希的,同一个频道的信息会固定从同一个链接推送。具体哈希计算规则如下:

shard_id = (guild_id >> 22) % num_shards

最大连接数

每个机器人创建的连接数不能超过 remaining 剩余连接数

完整代码示例

import requests
import asyncio
import json
import base64
import websockets
import re
from model.SparkApi import xunfei_chat, getText, xunfei_chat_1
from openai import OpenAI
from model.word2picture import xunfei_picture

# 获取 WebSocket 网关 URL 的函数
def get_ws_url():
    url = "https://api.sgroup.qq.com/gateway"
    response = requests.get(url, headers=headers).json()
    return response["url"]

# 获取访问令牌的函数
def get_access_token():
    url = "https://bots.qq.com/app/getAppAccessToken"
    json_headers = {"Content-Type": "application/json"}
    data = {"appId": "# 你自己的", "clientSecret": "# 你自己的"}
    response = requests.post(url, json=data, headers=json_headers)
    access_token = response.json()['access_token']
    return access_token

# 鉴权方式
headers = {"Content-Type": "application/json", "Authorization": "QQBot " + get_access_token()}

class QQRobotClient:
    def __init__(self, token, intents):
        self.token = token
        self.intents = intents
        self.session_id = None
        self.seq = None
        self.heartbeat_interval = None

    async def connect(self, gateway_url):
        await self.reconnect(gateway_url)

    async def reconnect(self, gateway_url):
        while True:
            try:
                async with websockets.connect(gateway_url) as websocket:
                    await self.handle_connection(websocket)
            except websockets.exceptions.ConnectionClosed as e:
                print(f"连接关闭: {e}")
                await asyncio.sleep(5)  # 等待5秒后重连
                continue  # 继续重连循环

    async def handle_connection(self, websocket):
        print("连接成功")
        await self.send_hello(websocket)
        if self.session_id and self.seq:
            await self.resume(websocket)
        else:
            await self.identify(websocket)
        self.heartbeat_interval = 45000  # 设置心跳间隔
        while True:
            try:
                message = await websocket.recv()
                self.handle_message(message, websocket)
            except websockets.exceptions.ConnectionClosed as e:
                print(f"连接关闭: {e}")
                break

    async def send_hello(self, websocket):
        hello_message = {"op": 10, "d": {}}
        await websocket.send(json.dumps(hello_message))

    async def identify(self, websocket):
        identify_message = {
            "op": 2,
            "d": {
                "token": self.token,
                "intents": self.intents,
                "shard": [0, 1],
                "properties": {}
            }
        }
        await websocket.send(json.dumps(identify_message))

    async def resume(self, websocket):
        resume_message = {
            "op": 6,
            "d": {
                "token": self.token,
                "session_id": self.session_id,
                "seq": self.seq
            }
        }
        await websocket.send(json.dumps(resume_message))

    def handle_message(self, message, websocket):
        data = json.loads(message)
        if data["op"] == 0:  # Dispatch event
            self.handle_dispatch(data, websocket)
        elif data["op"] == 11:  # Heartbeat ACK
            pass  # 心跳确认,无需额外操作
        elif data["op"] == 10:  # Hello
            self.heartbeat_interval = data["d"]["heartbeat_interval"]
        else:
            print(f"Unhandled opcode: {data['op']}")

    def handle_dispatch(self, data, websocket):
        event_type = data["t"]
        if event_type == "RESUMED":
            print("Resumed successfully")
        elif event_type == "AT_MESSAGE_CREATE":
            self.handle_channel_event(data["d"])
        elif event_type == "GROUP_AT_MESSAGE_CREATE":
            self.handle_group_event(data["d"])
        self.seq = data["s"]  # 更新序列号

    def handle_channel_event(self, event_data):
        print(f"接受到频道事件: {event_data}")
        # 接下来写你自己的代码
    def handle_group_event(self, event_data):
        print(f"接受到群事件: {event_data}")
		# 接下来写你自己的代码
    
    def run(self, gateway_url):
        asyncio.run(self.connect(gateway_url))

# Usage
intents = (1 << 25) | (1 << 30)# 监听哪些就添加哪些
token = "QQBot " + get_access_token()
client = QQRobotClient(token, intents)
client.run(get_ws_url())

致谢和更新

该栏目目前制作完成,内容会在原基础上持续更新,具体更新时间以下面⬇️更新时间为准。
文章持续更新,如果三连支持,速更!!!
请在评论区提出疑惑和建议
上次更新: 9/19/2024, AM
在这里插入图片描述

⬅️第六期:频道模块之内容管理


http://www.kler.cn/a/319872.html

相关文章:

  • 【Pytorch实用教程】TCN(Temporal Convolutional Network,时序卷积网络)简介
  • 前端基础笔记
  • 基于机器学习的用户健康风险分类及预测分析
  • 【Mysql进阶知识】Mysql 程序的介绍、选项在命令行配置文件的使用、选项在配置文件中的语法
  • 一文掌握Docker
  • 数据结构--二叉树
  • Go Mail设置指南:如何提升发送邮件效率?
  • 【Linux 从基础到进阶】Hadoop 大数据平台搭建与优化
  • ARM/Linux嵌入式面经(三九):中科驭数
  • 解决多尺度网络中上采样尺寸不一致问题
  • 低代码中实现数据映射的必要性与方案
  • 18 vue3之定义自定义指令Directive
  • 10.Lab Nine —— file system-上
  • 跳跃列表(Skip List)详解
  • JS显示数字时钟的格式时间
  • Vue.js 与 Flask 或 Django 后端配合
  • ArrayList源码实现(一)
  • Scala第一天
  • Tomcat may not be running
  • Facebook个人账户被停用是什么原因?如何解决?
  • 剖析:基于 RDMA 的多机数据分发和接收场景
  • 基于Java的宠物之家小程序 宠物服务小程序【源码+调试】
  • sort 命令:文本排序
  • 计算机的错误计算(一百零四)
  • 通过两个类计算一个长方形的周长和面积
  • MySql语言操作数据库---增删改查数据库,表,数据