企业微信应用消息收发实施记录
一、前置配置
1.1 进入我的企业页面,记录下企业ID。
1.2 创建企微应用,记录下应用的 AgentId 和 Secret。
1.3 设置应用的企业可信IP,将服务器公网 IP 填入即可。
1.4 设置应用接收消息API
填入服务器 API 地址,并记录下随机获取的 Token 和 EncodingAESKey。完成后,先不要点击保存,后续等服务端应用启动后再保存,即可完成校验。
二、服务端部署
2.1 企业应用消息收发流程拓扑
2.2 企微相关开发者文档说明
①、消息接收概述(主要说明了 消息加解密方法、消息收发协议、消息收发格式等)
概述 - 文档 - 企业微信开发者中心 (qq.com)
②、消息加解密官方库(包含多种代码语言,本文使用的是python库,解压使用的文件如下:)
加解密库下载与返回码 - 文档 - 企业微信开发者中心 (qq.com)
注意:需要使用 WXBizMsgCrypt3.py
这个文件。
③、企微应用主动发送消息(被动方式回复消息的格式不支持markdown和文件类型,为使回复内容更美观,可以采用主动发送消息的方式进行指定回复。)
发送应用消息 - 文档 - 企业微信开发者中心 (qq.com)
2.3 安装python相关依赖库。
pip3 install -r requirements.txt
requirements 内容如下:
bcrypt==4.1.1
blinker==1.8.2
certifi==2024.8.30
cffi==1.17.0
charset-normalizer==3.3.2
click==8.1.7
colorama==0.4.6
crypto==1.4.1
cryptography==36.0.2
flask==3.0.3
idna==3.8
importlib-metadata==8.4.0
itsdangerous==2.2.0
jinja2==3.1.4
MarkupSafe==2.1.5
Naked==0.1.32
paramiko==3.0.0
pycparser==2.22
pycryptodome==3.20.0
PyNaCl==1.5.0
PyYAML==6.0.2
requests==2.32.3
shellescape==3.8.1
urllib3==2.2.2
werkzeug==3.0.4
zipp==3.20.1
2.4 主程序 app.py
内容:
# -*- coding: utf-8 -*-
from flask import Flask, request, make_response
from WXBizMsgCrypt3 import WXBizMsgCrypt
import xml.etree.ElementTree as ET
from xml.etree.ElementTree import fromstring
# help_list
from help_list import help_list
# funny
from funny.help_funny_list import help_funny_list
from funny.get_weather import get_weather
from funny.get_myrb import get_myrb
from funny.get_music import get_music
from funny.get_fortune import get_fortune
from funny.get_tellocal import get_tel
from funny.get_express import get_express
# ops_tools
from ops_tools.get_ops import get_ops
app = Flask(__name__)
def printXML(xml_content):
# 创建XML元素
element = ET.XML(xml_content)
# 使用indent()函数进行格式化打印
ET.indent(element)
print(ET.tostring(element, encoding='unicode'))
# 对应接受消息回调模式中的token,EncodingAESKey 和 企业信息中的企业id
qy_api = [
WXBizMsgCrypt("***************", "**************************", "*********************"), ]
# 开启消息接受模式时验证接口连通性
def signature(request, i):
msg_signature = request.args.get('msg_signature', '')
timestamp = request.args.get('timestamp', '')
nonce = request.args.get('nonce', '')
echo_str = request.args.get('echostr', '')
ret, sEchoStr = qy_api[i].VerifyURL(msg_signature, timestamp, nonce, echo_str)
if (ret != 0):
print("ERR: VerifyURL ret: " + str(ret))
return ("failed")
else:
return (sEchoStr)
# 接收用户消息,可进行被动响应
def handle_user_message(request, i):
user_message = request.data
printXML(user_message)
msg_signature = request.args.get('msg_signature', '')
timestamp = request.args.get('timestamp', '')
nonce = request.args.get('nonce', '')
ret, sMsg = qy_api[i].DecryptMsg(user_message.decode('utf-8'), msg_signature, timestamp, nonce)
decrypt_data = {}
for node in list(fromstring(sMsg.decode('utf-8'))):
decrypt_data[node.tag] = node.text
# 解析后得到的decrypt_data: {"ToUserName":"企业号", "FromUserName":"发送者用户名", "CreateTime":"发送时间", "Content":"用户发送的内容", "MsgId":"唯一id,需要针对此id做出响应", "AagentID": "应用id"}
# 用户应根据Content的内容自定义要做出的行为,包括响应返回数据,如下例子,如果发送的是123,就返回hello world
content_text = decrypt_data.get('Content', '')
to_username_text = decrypt_data.get('ToUserName', '')
from_username_text = decrypt_data.get('FromUserName', '')
create_time_text = decrypt_data.get('CreateTime', '')
# 主菜单
if content_text == '#help':
sRespData = help_list(to_username_text, from_username_text, create_time_text)
# 生活菜单
if content_text == '#help02':
sRespData = help_funny_list(to_username_text, from_username_text, create_time_text)
# 天气查询
if content_text == '#天气查询':
sRespData = get_weather(to_username_text, from_username_text, create_time_text)
# 摸鱼日报
if content_text == '#摸鱼日报':
sRespData = get_myrb(to_username_text, from_username_text, create_time_text)
# 随机点歌
if content_text == '#随机点歌':
sRespData = get_music(to_username_text, from_username_text, create_time_text)
# 星座运势
if "#星座运势#" in content_text:
sRespData = get_fortune(content_text, to_username_text, from_username_text, create_time_text)
# 电话查询
if "#电话查询#" in content_text:
sRespData = get_tel(content_text, to_username_text, from_username_text, create_time_text)
# 快递查询
if "#快递查询#" in content_text:
sRespData = get_express(content_text, to_username_text, from_username_text, create_time_text)
# OPS工具
if "#ops#" in content_text:
sRespData = get_ops(content_text, to_username_text, from_username_text, create_time_text)
ret, send_msg = qy_api[i].EncryptMsg(sReplyMsg=sRespData, sNonce=nonce)
if ret == 0:
return send_msg
else:
print(send_msg)
@app.route('/company_wechat', methods=['GET', 'POST'])
def company_wechat():
if request.method == 'GET':
return signature(request, 0)
else:
print("收到请求......")
return handle_user_message(request, 0)
# Flask服务端口,可自定义
if __name__ == '__main__':
app.run(host='0.0.0.0', port=6969, debug=True)
将刚刚记录下的 Token
,EncodingAESKey
和 企业ID
分别替换至该段:
2.5 解密库 WXBizMsgCrypt3.py
内容:
# -*- encoding:utf-8 -*-
""" 对企业微信发送给企业后台的消息加解密示例代码.
@copyright: Copyright (c) 1998-2014 Tencent Inc.
"""
# ------------------------------------------------------------------------
import logging
import base64
import random
import hashlib
import time
import struct
from Crypto.Cipher import AES
import xml.etree.cElementTree as ET
import socket
import ierror
"""
关于Crypto.Cipher模块,ImportError: No module named 'Crypto'解决方案
请到官方网站 https://www.dlitz.net/software/pycrypto/ 下载pycrypto。
下载后,按照README中的“Installation”小节的提示进行pycrypto安装。
"""
class FormatException(Exception):
pass
def throw_exception(message, exception_class=FormatException):
"""my define raise exception function"""
raise exception_class(message)
class SHA1:
"""计算企业微信的消息签名接口"""
def getSHA1(self, token, timestamp, nonce, encrypt):
"""用SHA1算法生成安全签名
@param token: 票据
@param timestamp: 时间戳
@param encrypt: 密文
@param nonce: 随机字符串
@return: 安全签名
"""
try:
sortlist = [token, timestamp, nonce, encrypt]
sortlist.sort()
sha = hashlib.sha1()
sha.update("".join(sortlist).encode())
return ierror.WXBizMsgCrypt_OK, sha.hexdigest()
except Exception as e:
logger = logging.getLogger()
logger.error(e)
return ierror.WXBizMsgCrypt_ComputeSignature_Error, None
class XMLParse:
"""提供提取消息格式中的密文及生成回复消息格式的接口"""
# xml消息模板
AES_TEXT_RESPONSE_TEMPLATE = """<xml>
<Encrypt><![CDATA[%(msg_encrypt)s]]></Encrypt>
<MsgSignature><![CDATA[%(msg_signaturet)s]]></MsgSignature>
<TimeStamp>%(timestamp)s</TimeStamp>
<Nonce><![CDATA[%(nonce)s]]></Nonce>
</xml>"""
def extract(self, xmltext):
"""提取出xml数据包中的加密消息
@param xmltext: 待提取的xml字符串
@return: 提取出的加密消息字符串
"""
try:
xml_tree = ET.fromstring(xmltext)
encrypt = xml_tree.find("Encrypt")
return ierror.WXBizMsgCrypt_OK, encrypt.text
except Exception as e:
logger = logging.getLogger()
logger.error(e)
return ierror.WXBizMsgCrypt_ParseXml_Error, None
def generate(self, encrypt, signature, timestamp, nonce):
"""生成xml消息
@param encrypt: 加密后的消息密文
@param signature: 安全签名
@param timestamp: 时间戳
@param nonce: 随机字符串
@return: 生成的xml字符串
"""
resp_dict = {
'msg_encrypt': encrypt,
'msg_signaturet': signature,
'timestamp': timestamp,
'nonce': nonce,
}
resp_xml = self.AES_TEXT_RESPONSE_TEMPLATE % resp_dict
return resp_xml
class PKCS7Encoder():
"""提供基于PKCS7算法的加解密接口"""
block_size = 32
def encode(self, text):
""" 对需要加密的明文进行填充补位
@param text: 需要进行填充补位操作的明文
@return: 补齐明文字符串
"""
text_length = len(text)
# 计算需要填充的位数
amount_to_pad = self.block_size - (text_length % self.block_size)
if amount_to_pad == 0:
amount_to_pad = self.block_size
# 获得补位所用的字符
pad = chr(amount_to_pad)
return text + (pad * amount_to_pad).encode()
def decode(self, decrypted):
"""删除解密后明文的补位字符
@param decrypted: 解密后的明文
@return: 删除补位字符后的明文
"""
pad = ord(decrypted[-1])
if pad < 1 or pad > 32:
pad = 0
return decrypted[:-pad]
class Prpcrypt(object):
"""提供接收和推送给企业微信消息的加解密接口"""
def __init__(self, key):
# self.key = base64.b64decode(key+"=")
self.key = key
# 设置加解密模式为AES的CBC模式
self.mode = AES.MODE_CBC
def encrypt(self, text, receiveid):
"""对明文进行加密
@param text: 需要加密的明文
@return: 加密得到的字符串
"""
# 16位随机字符串添加到明文开头
text = text.encode()
text = self.get_random_str() + struct.pack("I", socket.htonl(len(text))) + text + receiveid.encode()
# 使用自定义的填充方式对明文进行补位填充
pkcs7 = PKCS7Encoder()
text = pkcs7.encode(text)
# 加密
cryptor = AES.new(self.key, self.mode, self.key[:16])
try:
ciphertext = cryptor.encrypt(text)
# 使用BASE64对加密后的字符串进行编码
return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext)
except Exception as e:
logger = logging.getLogger()
logger.error(e)
return ierror.WXBizMsgCrypt_EncryptAES_Error, None
def decrypt(self, text, receiveid):
"""对解密后的明文进行补位删除
@param text: 密文
@return: 删除填充补位后的明文
"""
try:
cryptor = AES.new(self.key, self.mode, self.key[:16])
# 使用BASE64对密文进行解码,然后AES-CBC解密
plain_text = cryptor.decrypt(base64.b64decode(text))
except Exception as e:
logger = logging.getLogger()
logger.error(e)
return ierror.WXBizMsgCrypt_DecryptAES_Error, None
try:
pad = plain_text[-1]
# 去掉补位字符串
# pkcs7 = PKCS7Encoder()
# plain_text = pkcs7.encode(plain_text)
# 去除16位随机字符串
content = plain_text[16:-pad]
xml_len = socket.ntohl(struct.unpack("I", content[: 4])[0])
xml_content = content[4: xml_len + 4]
from_receiveid = content[xml_len + 4:]
except Exception as e:
logger = logging.getLogger()
logger.error(e)
return ierror.WXBizMsgCrypt_IllegalBuffer, None
if from_receiveid.decode('utf8') != receiveid:
return ierror.WXBizMsgCrypt_ValidateCorpid_Error, None
return 0, xml_content
def get_random_str(self):
""" 随机生成16位字符串
@return: 16位字符串
"""
return str(random.randint(1000000000000000, 9999999999999999)).encode()
class WXBizMsgCrypt(object):
# 构造函数
def __init__(self, sToken, sEncodingAESKey, sReceiveId):
try:
self.key = base64.b64decode(sEncodingAESKey + "=")
assert len(self.key) == 32
except:
throw_exception("[error]: EncodingAESKey unvalid !", FormatException)
# return ierror.WXBizMsgCrypt_IllegalAesKey,None
self.m_sToken = sToken
self.m_sReceiveId = sReceiveId
# 验证URL
# @param sMsgSignature: 签名串,对应URL参数的msg_signature
# @param sTimeStamp: 时间戳,对应URL参数的timestamp
# @param sNonce: 随机串,对应URL参数的nonce
# @param sEchoStr: 随机串,对应URL参数的echostr
# @param sReplyEchoStr: 解密之后的echostr,当return返回0时有效
# @return:成功0,失败返回对应的错误码
def VerifyURL(self, sMsgSignature, sTimeStamp, sNonce, sEchoStr):
sha1 = SHA1()
ret, signature = sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, sEchoStr)
if ret != 0:
return ret, None
if not signature == sMsgSignature:
return ierror.WXBizMsgCrypt_ValidateSignature_Error, None
pc = Prpcrypt(self.key)
ret, sReplyEchoStr = pc.decrypt(sEchoStr, self.m_sReceiveId)
return ret, sReplyEchoStr
def EncryptMsg(self, sReplyMsg, sNonce, timestamp=None):
# 将企业回复用户的消息加密打包
# @param sReplyMsg: 企业号待回复用户的消息,xml格式的字符串
# @param sTimeStamp: 时间戳,可以自己生成,也可以用URL参数的timestamp,如为None则自动用当前时间
# @param sNonce: 随机串,可以自己生成,也可以用URL参数的nonce
# sEncryptMsg: 加密后的可以直接回复用户的密文,包括msg_signature, timestamp, nonce, encrypt的xml格式的字符串,
# return:成功0,sEncryptMsg,失败返回对应的错误码None
pc = Prpcrypt(self.key)
ret, encrypt = pc.encrypt(sReplyMsg, self.m_sReceiveId)
encrypt = encrypt.decode('utf8')
if ret != 0:
return ret, None
if timestamp is None:
timestamp = str(int(time.time()))
# 生成安全签名
sha1 = SHA1()
ret, signature = sha1.getSHA1(self.m_sToken, timestamp, sNonce, encrypt)
if ret != 0:
return ret, None
xmlParse = XMLParse()
return ret, xmlParse.generate(encrypt, signature, timestamp, sNonce)
def DecryptMsg(self, sPostData, sMsgSignature, sTimeStamp, sNonce):
# 检验消息的真实性,并且获取解密后的明文
# @param sMsgSignature: 签名串,对应URL参数的msg_signature
# @param sTimeStamp: 时间戳,对应URL参数的timestamp
# @param sNonce: 随机串,对应URL参数的nonce
# @param sPostData: 密文,对应POST请求的数据
# xml_content: 解密后的原文,当return返回0时有效
# @return: 成功0,失败返回对应的错误码
# 验证安全签名
xmlParse = XMLParse()
ret, encrypt = xmlParse.extract(sPostData)
if ret != 0:
return ret, None
sha1 = SHA1()
ret, signature = sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, encrypt)
if ret != 0:
return ret, None
if not signature == sMsgSignature:
return ierror.WXBizMsgCrypt_ValidateSignature_Error, None
pc = Prpcrypt(self.key)
ret, xml_content = pc.decrypt(encrypt, self.m_sReceiveId)
return ret, xml_content
2.6 启动应用,测试收发
nohup python3 app.py > /dev/null 2>&1 &
测试收发(旧图):
三、菜单功能示例
3.1 help_funny_list.py
菜单内容:
(注:改用了主动发送消息的方式,将回复内容设为markdown,并发送至指定成员ID)
import requests
import json
# 帮助菜单
def help_funny_list(to_username_text, from_username_text, create_time_text):
# 获取access_token
token_api = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
params = {
'corpid': "******************",
'corpsecret': "******************"
}
access_token = requests.get(token_api, params=params).json()['access_token']
print(access_token)
# 主动发送消息
send_api = f'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}'
payload = json.dumps({
"touser": from_username_text,
"msgtype": "markdown",
"agentid": 1000003,
"markdown": {
"content": "# 【其他功能菜单】\n "
">**【`#天气查询`】:查询实时天气信息**\n\n\n "
">**【`#电话查询`】:查询手机号归属地信息**\n "
">[<font color=\"warning\">指令格式</font>]:<font color=\"comment\">#电话查询#手机号码</font>\n"
">[<font color=\"info\">指令示例</font>]:<font color=\"comment\">##电话查询#15200000000</font>\n\n\n"
">**【`#快递查询`】:查询实时快递物流信息**\n "
">[<font color=\"warning\">指令格式</font>]:<font color=\"comment\">#快递查询#快递公司#手机尾号#快递单号</font>\n"
">[<font color=\"info\">指令示例</font>]:<font color=\"comment\">#快递查询#京东快递#95**#JD01425**************</font>\n"
"→[点击查看可用快递列表](http://work.weixin.qq.com/api/doc)\n\n\n"
">**【`#星座运势`】:查询当日十二星座运势**\n "
">[<font color=\"warning\">指令格式</font>]:<font color=\"comment\">#星座运势#星座名称</font>\n"
">[<font color=\"info\">指令示例</font>]:<font color=\"comment\">#星座运势#金牛座</font>\n\n\n"
">**【`#随机点歌`】:随机获取网易云在线歌曲**\n\n\n "
">**【`#摸鱼日报`】:获取当日宜忌事项、历史事件、热点新闻**\n\n\n "
">**【`#help`】:获取主菜单**"
}
})
headers = {
'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
'Content-Type': 'application/json',
'Accept': '*/*',
'Host': 'qyapi.weixin.qq.com',
'Connection': 'keep-alive'
}
response = requests.post(send_api, headers=headers, data=payload, timeout=15)
if response.status_code == 200:
print('ok')
将刚刚记录下来的 企业ID
和 Secret
分别替换至该段:
效果如下:
3.2 通过 paramiko
交互远程服务器,回复服务器信息
get_ops.py
内容如下:
import paramiko
import os
def get_ops(content_text, to_username_text, from_username_text, create_time_text):
# 使用 split 以#分割字符串
parts = content_text.split('#')
# 检查分割后的列表是否有足够的分段
if len(parts) >= 4:
ip_address = parts[2] # 获取ip地址
command = parts[3] # 获取命令
client = paramiko.SSHClient()
# 添加服务器密钥,如果使用的是密钥形式
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接SSH服务端
client.connect(ip_address, port=22, username='root', password='********')
# 执行命令
stdin, stdout, stderr = client.exec_command(command)
# 获取命令执行结果
result = stdout.read().decode('utf-8', errors='ignore')
sRespData = """<xml>
<ToUserName>{to_username}</ToUserName>
<FromUserName>{from_username}</FromUserName>
<CreateTime>{create_time}</CreateTime>
<MsgType>text</MsgType>
<Content>{content}</Content>
</xml>
""".format(to_username=to_username_text,
from_username=from_username_text,
create_time=create_time_text,
content=result, )
return sRespData
效果如下:
3.3 查询物流信息,被动回复纯文本格式
。
get_express.py
内容如下:
import requests
import json
import re
def get_express(content_text, to_username_text, from_username_text, create_time_text):
# 将接收到的消息内容以#进行分割
parts = content_text.split('#')
if len(parts) >= 4:
com_str = parts[2]
phone_int = parts[3]
no_str = parts[4]
# 查询本地json文件中com_str对应的NO值
with open('funny/exp.json', 'r', encoding='utf-8') as f:
data = json.load(f)["result"]
for item in data:
if item["com"] == com_str:
com_no = (item["no"])
break
else:
error_msg = '输入有误,未找到物流信息'
# 聚合平台物流查询接口,接口文档:https://www.juhe.cn/docs/api/id/43
api_url = "http://v.juhe.cn/exp/index"
params = {
"key": "*********************************",
"com": com_no,
"no": no_str,
"receiverPhone": phone_int
}
response = requests.get(api_url, params=params)
json_data = response.json()
# 提取result中的值
exp_info = {
"company": json_data["result"].get("company"),
"no": json_data["result"].get("no"),
"status_detail": json_data["result"].get("status_detail")
}
# 从 list 中提取每一项的 datetime 和 remark ,然后格式化为字符串
list_items = "\n\n".join(
"【物流时间】:{}\n【物流详情】:{}".format(item.get("datetime"), item.get("remark"))
for item in json_data["result"].get("list", [])
)
# 构造最终的回复字符串,包括所有物流详情
reply = ('【物流公司】:{company}\n【物流单号】:{no}\n【物流状态】:{status_detail}\n{list_items}'.format(**exp_info,
list_items=list_items))
sRespData = """<xml>
<ToUserName>{to_username}</ToUserName>
<FromUserName>{from_username}</FromUserName>
<CreateTime>{create_time}</CreateTime>
<MsgType>text</MsgType>
<Content>{content}</Content>
</xml>
""".format(to_username=to_username_text,
from_username=from_username_text,
create_time=create_time_text,
content=reply, )
return sRespData
效果如下:
3.4 获取随机音乐链接,被动回复图文格式
。
get_music.py
内容如下:
import requests
import json
import re
def get_music(to_username_text, from_username_text, create_time_text):
api_url = "https://api.52vmy.cn/api/music/wy/rand"
response = requests.get(api_url)
data = response.json().get('data', {})
music_info = {key: data.get(key) for key in ['song', 'singer', 'cover', 'Music']}
sRespData = """<xml>
<ToUserName>{to_username}</ToUserName>
<FromUserName>{from_username}</FromUserName>
<CreateTime>{create_time}</CreateTime>
<MsgType>news</MsgType>
<ArticleCount>1</ArticleCount>
<Articles>
<item>
<Title>歌曲名:{title}</Title>
<Description>演唱者:{description}</Description>
<PicUrl>{picurl}</PicUrl>
<Url>{url}</Url>
</item>
</Articles>
</xml>
""".format(to_username=to_username_text,
from_username=from_username_text,
create_time=create_time_text,
title=music_info['song'],
description=music_info['singer'],
picurl=music_info['cover'],
url=music_info['Music'])
return sRespData
效果如下:
3.5 其他问题
由于应用被动回复消息的格式不支持文件类型,如需将文件回复至企微可以采用2种方式:
①、在服务端配置nginx静态目录,通过静态页面路径 + 文件名 的形式拼接出完整的文件 url 地址,再通过图文类型的消息格式带入回复。
②、改用主动发送消息的方式,通过企微素材上传接口上传文件,并获取对应的 media_id ,再通过文件类型消息带入 media_id 指定对应成员完成发送,示例:
# ===================== 【获取access_token】 ==========================
# API 地址
url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
# 企业ID
corpid = '*****************'
# SECRET
corpsecret = '******************************'
params = {
'corpid': corpid,
'corpsecret': corpsecret
}
access_token = requests.get(url, params=params).json()['access_token']
print(access_token)
# ================== 【上传素材获取media_id】 =======================
# API 地址
url = 'https://qyapi.weixin.qq.com/cgi-bin/media/upload'
params = {
'access_token': access_token,
'type': 'file'
}
# 要上传的文件
files = {
'media': ('111.xlsx', open(r'C:\Users\Looper\Desktop\111.xlsx', 'rb'), 'application/octet-stream')
}
media_id = requests.post(url, params=params, files=files).json()['media_id']
print(media_id)
到此,企微应用的消息收发实施,全部测试完毕。