当前位置: 首页 > article >正文

企业微信应用消息收发实施记录

一、前置配置

1.1 进入我的企业页面,记录下企业ID

1.2 创建企微应用,记录下应用的 AgentIdSecret

1.3 设置应用的企业可信IP,将服务器公网 IP 填入即可。

1.4 设置应用接收消息API

填入服务器 API 地址,并记录下随机获取的 TokenEncodingAESKey。完成后,先不要点击保存,后续等服务端应用启动后再保存,即可完成校验

二、服务端部署

2.1 企业应用消息收发流程拓扑

2.2 企微相关开发者文档说明

①、消息接收概述(主要说明了 消息加解密方法、消息收发协议、消息收发格式等)

概述 - 文档 - 企业微信开发者中心 (qq.com)

②、消息加解密官方库(包含多种代码语言,本文使用的是python库,解压使用的文件如下:)

加解密库下载与返回码 - 文档 - 企业微信开发者中心 (qq.com)

注意:需要使用 WXBizMsgCrypt3.py 这个文件

③、企微应用主动发送消息(被动方式回复消息的格式不支持markdown和文件类型,为使回复内容更美观,可以采用主动发送消息的方式进行指定回复。)

发送应用消息 - 文档 - 企业微信开发者中心 (qq.com)

2.3 安装python相关依赖库。

pip3 install -r requirements.txt

 requirements 内容如下:

bcrypt==4.1.1
blinker==1.8.2
certifi==2024.8.30
cffi==1.17.0
charset-normalizer==3.3.2
click==8.1.7
colorama==0.4.6
crypto==1.4.1
cryptography==36.0.2
flask==3.0.3
idna==3.8
importlib-metadata==8.4.0
itsdangerous==2.2.0
jinja2==3.1.4
MarkupSafe==2.1.5
Naked==0.1.32
paramiko==3.0.0
pycparser==2.22
pycryptodome==3.20.0
PyNaCl==1.5.0
PyYAML==6.0.2
requests==2.32.3
shellescape==3.8.1
urllib3==2.2.2
werkzeug==3.0.4
zipp==3.20.1

2.4 主程序 app.py 内容:

# -*- coding: utf-8 -*-
from flask import Flask, request, make_response
from WXBizMsgCrypt3 import WXBizMsgCrypt
import xml.etree.ElementTree as ET
from xml.etree.ElementTree import fromstring

# help_list
from help_list import help_list

# funny
from funny.help_funny_list import help_funny_list
from funny.get_weather import get_weather
from funny.get_myrb import get_myrb
from funny.get_music import get_music
from funny.get_fortune import get_fortune
from funny.get_tellocal import get_tel
from funny.get_express import get_express

# ops_tools
from ops_tools.get_ops import get_ops

app = Flask(__name__)


def printXML(xml_content):
    # 创建XML元素
    element = ET.XML(xml_content)

    # 使用indent()函数进行格式化打印
    ET.indent(element)
    print(ET.tostring(element, encoding='unicode'))


# 对应接受消息回调模式中的token,EncodingAESKey 和 企业信息中的企业id
qy_api = [
    WXBizMsgCrypt("***************", "**************************", "*********************"), ]


# 开启消息接受模式时验证接口连通性
def signature(request, i):
    msg_signature = request.args.get('msg_signature', '')
    timestamp = request.args.get('timestamp', '')
    nonce = request.args.get('nonce', '')
    echo_str = request.args.get('echostr', '')
    ret, sEchoStr = qy_api[i].VerifyURL(msg_signature, timestamp, nonce, echo_str)
    if (ret != 0):
        print("ERR: VerifyURL ret: " + str(ret))
        return ("failed")
    else:
        return (sEchoStr)


# 接收用户消息,可进行被动响应
def handle_user_message(request, i):
    user_message = request.data
    printXML(user_message)
    msg_signature = request.args.get('msg_signature', '')
    timestamp = request.args.get('timestamp', '')
    nonce = request.args.get('nonce', '')
    ret, sMsg = qy_api[i].DecryptMsg(user_message.decode('utf-8'), msg_signature, timestamp, nonce)
    decrypt_data = {}
    for node in list(fromstring(sMsg.decode('utf-8'))):
        decrypt_data[node.tag] = node.text
    # 解析后得到的decrypt_data: {"ToUserName":"企业号", "FromUserName":"发送者用户名", "CreateTime":"发送时间", "Content":"用户发送的内容", "MsgId":"唯一id,需要针对此id做出响应", "AagentID": "应用id"}
    # 用户应根据Content的内容自定义要做出的行为,包括响应返回数据,如下例子,如果发送的是123,就返回hello world

    content_text = decrypt_data.get('Content', '')
    to_username_text = decrypt_data.get('ToUserName', '')
    from_username_text = decrypt_data.get('FromUserName', '')
    create_time_text = decrypt_data.get('CreateTime', '')

    # 主菜单
    if content_text == '#help':
        sRespData = help_list(to_username_text, from_username_text, create_time_text)

    # 生活菜单
    if content_text == '#help02':
        sRespData = help_funny_list(to_username_text, from_username_text, create_time_text)

    # 天气查询
    if content_text == '#天气查询':
        sRespData = get_weather(to_username_text, from_username_text, create_time_text)

    # 摸鱼日报
    if content_text == '#摸鱼日报':
        sRespData = get_myrb(to_username_text, from_username_text, create_time_text)

    # 随机点歌
    if content_text == '#随机点歌':
        sRespData = get_music(to_username_text, from_username_text, create_time_text)

    # 星座运势
    if "#星座运势#" in content_text:
        sRespData = get_fortune(content_text, to_username_text, from_username_text, create_time_text)

    # 电话查询
    if "#电话查询#" in content_text:
        sRespData = get_tel(content_text, to_username_text, from_username_text, create_time_text)

    # 快递查询
    if "#快递查询#" in content_text:
        sRespData = get_express(content_text, to_username_text, from_username_text, create_time_text)

    # OPS工具
    if "#ops#" in content_text:
        sRespData = get_ops(content_text, to_username_text, from_username_text, create_time_text)

    ret, send_msg = qy_api[i].EncryptMsg(sReplyMsg=sRespData, sNonce=nonce)
    if ret == 0:
        return send_msg
    else:
        print(send_msg)


@app.route('/company_wechat', methods=['GET', 'POST'])
def company_wechat():
    if request.method == 'GET':
        return signature(request, 0)
    else:
        print("收到请求......")
        return handle_user_message(request, 0)

# Flask服务端口,可自定义
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=6969, debug=True)

 将刚刚记录下的 TokenEncodingAESKey企业ID 分别替换至该段:

2.5 解密库 WXBizMsgCrypt3.py 内容:

# -*- encoding:utf-8 -*-

""" 对企业微信发送给企业后台的消息加解密示例代码.
@copyright: Copyright (c) 1998-2014 Tencent Inc.

"""
# ------------------------------------------------------------------------
import logging
import base64
import random
import hashlib
import time
import struct
from Crypto.Cipher import AES
import xml.etree.cElementTree as ET
import socket
import ierror

"""
关于Crypto.Cipher模块,ImportError: No module named 'Crypto'解决方案
请到官方网站 https://www.dlitz.net/software/pycrypto/ 下载pycrypto。
下载后,按照README中的“Installation”小节的提示进行pycrypto安装。
"""

class FormatException(Exception):
    pass

def throw_exception(message, exception_class=FormatException):
    """my define raise exception function"""
    raise exception_class(message)

class SHA1:
    """计算企业微信的消息签名接口"""

    def getSHA1(self, token, timestamp, nonce, encrypt):
        """用SHA1算法生成安全签名
        @param token:  票据
        @param timestamp: 时间戳
        @param encrypt: 密文
        @param nonce: 随机字符串
        @return: 安全签名
        """
        try:
            sortlist = [token, timestamp, nonce, encrypt]
            sortlist.sort()
            sha = hashlib.sha1()
            sha.update("".join(sortlist).encode())
            return ierror.WXBizMsgCrypt_OK, sha.hexdigest()
        except Exception as e:
            logger = logging.getLogger()
            logger.error(e)
            return ierror.WXBizMsgCrypt_ComputeSignature_Error, None

class XMLParse:
    """提供提取消息格式中的密文及生成回复消息格式的接口"""

    # xml消息模板
    AES_TEXT_RESPONSE_TEMPLATE = """<xml>
<Encrypt><![CDATA[%(msg_encrypt)s]]></Encrypt>
<MsgSignature><![CDATA[%(msg_signaturet)s]]></MsgSignature>
<TimeStamp>%(timestamp)s</TimeStamp>
<Nonce><![CDATA[%(nonce)s]]></Nonce>
</xml>"""

    def extract(self, xmltext):
        """提取出xml数据包中的加密消息
        @param xmltext: 待提取的xml字符串
        @return: 提取出的加密消息字符串
        """
        try:
            xml_tree = ET.fromstring(xmltext)
            encrypt = xml_tree.find("Encrypt")
            return ierror.WXBizMsgCrypt_OK, encrypt.text
        except Exception as e:
            logger = logging.getLogger()
            logger.error(e)
            return ierror.WXBizMsgCrypt_ParseXml_Error, None

    def generate(self, encrypt, signature, timestamp, nonce):
        """生成xml消息
        @param encrypt: 加密后的消息密文
        @param signature: 安全签名
        @param timestamp: 时间戳
        @param nonce: 随机字符串
        @return: 生成的xml字符串
        """
        resp_dict = {
            'msg_encrypt': encrypt,
            'msg_signaturet': signature,
            'timestamp': timestamp,
            'nonce': nonce,
        }
        resp_xml = self.AES_TEXT_RESPONSE_TEMPLATE % resp_dict
        return resp_xml

class PKCS7Encoder():
    """提供基于PKCS7算法的加解密接口"""

    block_size = 32

    def encode(self, text):
        """ 对需要加密的明文进行填充补位
        @param text: 需要进行填充补位操作的明文
        @return: 补齐明文字符串
        """
        text_length = len(text)
        # 计算需要填充的位数
        amount_to_pad = self.block_size - (text_length % self.block_size)
        if amount_to_pad == 0:
            amount_to_pad = self.block_size
        # 获得补位所用的字符
        pad = chr(amount_to_pad)
        return text + (pad * amount_to_pad).encode()

    def decode(self, decrypted):
        """删除解密后明文的补位字符
        @param decrypted: 解密后的明文
        @return: 删除补位字符后的明文
        """
        pad = ord(decrypted[-1])
        if pad < 1 or pad > 32:
            pad = 0
        return decrypted[:-pad]

class Prpcrypt(object):
    """提供接收和推送给企业微信消息的加解密接口"""

    def __init__(self, key):

        # self.key = base64.b64decode(key+"=")
        self.key = key
        # 设置加解密模式为AES的CBC模式
        self.mode = AES.MODE_CBC

    def encrypt(self, text, receiveid):
        """对明文进行加密
        @param text: 需要加密的明文
        @return: 加密得到的字符串
        """
        # 16位随机字符串添加到明文开头
        text = text.encode()
        text = self.get_random_str() + struct.pack("I", socket.htonl(len(text))) + text + receiveid.encode()

        # 使用自定义的填充方式对明文进行补位填充
        pkcs7 = PKCS7Encoder()
        text = pkcs7.encode(text)
        # 加密
        cryptor = AES.new(self.key, self.mode, self.key[:16])
        try:
            ciphertext = cryptor.encrypt(text)
            # 使用BASE64对加密后的字符串进行编码
            return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext)
        except Exception as e:
            logger = logging.getLogger()
            logger.error(e)
            return ierror.WXBizMsgCrypt_EncryptAES_Error, None

    def decrypt(self, text, receiveid):
        """对解密后的明文进行补位删除
        @param text: 密文
        @return: 删除填充补位后的明文
        """
        try:
            cryptor = AES.new(self.key, self.mode, self.key[:16])
            # 使用BASE64对密文进行解码,然后AES-CBC解密
            plain_text = cryptor.decrypt(base64.b64decode(text))
        except Exception as e:
            logger = logging.getLogger()
            logger.error(e)
            return ierror.WXBizMsgCrypt_DecryptAES_Error, None
        try:
            pad = plain_text[-1]
            # 去掉补位字符串
            # pkcs7 = PKCS7Encoder()
            # plain_text = pkcs7.encode(plain_text)
            # 去除16位随机字符串
            content = plain_text[16:-pad]
            xml_len = socket.ntohl(struct.unpack("I", content[: 4])[0])
            xml_content = content[4: xml_len + 4]
            from_receiveid = content[xml_len + 4:]
        except Exception as e:
            logger = logging.getLogger()
            logger.error(e)
            return ierror.WXBizMsgCrypt_IllegalBuffer, None

        if from_receiveid.decode('utf8') != receiveid:
            return ierror.WXBizMsgCrypt_ValidateCorpid_Error, None
        return 0, xml_content

    def get_random_str(self):
        """ 随机生成16位字符串
        @return: 16位字符串
        """
        return str(random.randint(1000000000000000, 9999999999999999)).encode()

class WXBizMsgCrypt(object):
    # 构造函数
    def __init__(self, sToken, sEncodingAESKey, sReceiveId):
        try:
            self.key = base64.b64decode(sEncodingAESKey + "=")
            assert len(self.key) == 32
        except:
            throw_exception("[error]: EncodingAESKey unvalid !", FormatException)
            # return ierror.WXBizMsgCrypt_IllegalAesKey,None
        self.m_sToken = sToken
        self.m_sReceiveId = sReceiveId

        # 验证URL
        # @param sMsgSignature: 签名串,对应URL参数的msg_signature
        # @param sTimeStamp: 时间戳,对应URL参数的timestamp
        # @param sNonce: 随机串,对应URL参数的nonce
        # @param sEchoStr: 随机串,对应URL参数的echostr
        # @param sReplyEchoStr: 解密之后的echostr,当return返回0时有效
        # @return:成功0,失败返回对应的错误码

    def VerifyURL(self, sMsgSignature, sTimeStamp, sNonce, sEchoStr):
        sha1 = SHA1()
        ret, signature = sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, sEchoStr)
        if ret != 0:
            return ret, None
        if not signature == sMsgSignature:
            return ierror.WXBizMsgCrypt_ValidateSignature_Error, None
        pc = Prpcrypt(self.key)
        ret, sReplyEchoStr = pc.decrypt(sEchoStr, self.m_sReceiveId)
        return ret, sReplyEchoStr

    def EncryptMsg(self, sReplyMsg, sNonce, timestamp=None):
        # 将企业回复用户的消息加密打包
        # @param sReplyMsg: 企业号待回复用户的消息,xml格式的字符串
        # @param sTimeStamp: 时间戳,可以自己生成,也可以用URL参数的timestamp,如为None则自动用当前时间
        # @param sNonce: 随机串,可以自己生成,也可以用URL参数的nonce
        # sEncryptMsg: 加密后的可以直接回复用户的密文,包括msg_signature, timestamp, nonce, encrypt的xml格式的字符串,
        # return:成功0,sEncryptMsg,失败返回对应的错误码None
        pc = Prpcrypt(self.key)
        ret, encrypt = pc.encrypt(sReplyMsg, self.m_sReceiveId)
        encrypt = encrypt.decode('utf8')
        if ret != 0:
            return ret, None
        if timestamp is None:
            timestamp = str(int(time.time()))
        # 生成安全签名
        sha1 = SHA1()
        ret, signature = sha1.getSHA1(self.m_sToken, timestamp, sNonce, encrypt)
        if ret != 0:
            return ret, None
        xmlParse = XMLParse()
        return ret, xmlParse.generate(encrypt, signature, timestamp, sNonce)

    def DecryptMsg(self, sPostData, sMsgSignature, sTimeStamp, sNonce):
        # 检验消息的真实性,并且获取解密后的明文
        # @param sMsgSignature: 签名串,对应URL参数的msg_signature
        # @param sTimeStamp: 时间戳,对应URL参数的timestamp
        # @param sNonce: 随机串,对应URL参数的nonce
        # @param sPostData: 密文,对应POST请求的数据
        #  xml_content: 解密后的原文,当return返回0时有效
        # @return: 成功0,失败返回对应的错误码
        # 验证安全签名
        xmlParse = XMLParse()
        ret, encrypt = xmlParse.extract(sPostData)
        if ret != 0:
            return ret, None
        sha1 = SHA1()
        ret, signature = sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, encrypt)
        if ret != 0:
            return ret, None
        if not signature == sMsgSignature:
            return ierror.WXBizMsgCrypt_ValidateSignature_Error, None
        pc = Prpcrypt(self.key)
        ret, xml_content = pc.decrypt(encrypt, self.m_sReceiveId)
        return ret, xml_content

2.6 启动应用,测试收发

nohup python3 app.py > /dev/null 2>&1 &

 测试收发(旧图):

三、菜单功能示例

3.1 help_funny_list.py 菜单内容:

注:改用了主动发送消息的方式,将回复内容设为markdown,并发送至指定成员ID)

import requests
import json


# 帮助菜单
def help_funny_list(to_username_text, from_username_text, create_time_text):

	# 获取access_token
    token_api = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'

    params = {
        'corpid': "******************",
        'corpsecret': "******************"
    }
    access_token = requests.get(token_api, params=params).json()['access_token']
    print(access_token)
	
	# 主动发送消息
    send_api = f'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}'

    payload = json.dumps({
        "touser": from_username_text,
        "msgtype": "markdown",
        "agentid": 1000003,
        "markdown": {
            "content": "# 【其他功能菜单】\n "
                       ">**【`#天气查询`】:查询实时天气信息**\n\n\n "
                       ">**【`#电话查询`】:查询手机号归属地信息**\n "
                       ">[<font color=\"warning\">指令格式</font>]:<font color=\"comment\">#电话查询#手机号码</font>\n"
                       ">[<font color=\"info\">指令示例</font>]:<font color=\"comment\">##电话查询#15200000000</font>\n\n\n"
                       ">**【`#快递查询`】:查询实时快递物流信息**\n "
                       ">[<font color=\"warning\">指令格式</font>]:<font color=\"comment\">#快递查询#快递公司#手机尾号#快递单号</font>\n"
                       ">[<font color=\"info\">指令示例</font>]:<font color=\"comment\">#快递查询#京东快递#95**#JD01425**************</font>\n"
                       "→[点击查看可用快递列表](http://work.weixin.qq.com/api/doc)\n\n\n"
                       ">**【`#星座运势`】:查询当日十二星座运势**\n "
                       ">[<font color=\"warning\">指令格式</font>]:<font color=\"comment\">#星座运势#星座名称</font>\n"
                       ">[<font color=\"info\">指令示例</font>]:<font color=\"comment\">#星座运势#金牛座</font>\n\n\n"
                       ">**【`#随机点歌`】:随机获取网易云在线歌曲**\n\n\n "
                       ">**【`#摸鱼日报`】:获取当日宜忌事项、历史事件、热点新闻**\n\n\n "
                       ">**【`#help`】:获取主菜单**"
        }
    })
    headers = {
        'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
        'Content-Type': 'application/json',
        'Accept': '*/*',
        'Host': 'qyapi.weixin.qq.com',
        'Connection': 'keep-alive'
    }
    response = requests.post(send_api, headers=headers, data=payload, timeout=15)
    if response.status_code == 200:
        print('ok')

将刚刚记录下来的 企业IDSecret 分别替换至该段:

 效果如下:

3.2 通过 paramiko 交互远程服务器,回复服务器信息

get_ops.py 内容如下:

import paramiko
import os


def get_ops(content_text, to_username_text, from_username_text, create_time_text):
    # 使用 split 以#分割字符串
    parts = content_text.split('#')
    # 检查分割后的列表是否有足够的分段
    if len(parts) >= 4:
        ip_address = parts[2]  # 获取ip地址
        command = parts[3]  # 获取命令
        client = paramiko.SSHClient()
        # 添加服务器密钥,如果使用的是密钥形式
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # 连接SSH服务端
        client.connect(ip_address, port=22, username='root', password='********')
        # 执行命令
        stdin, stdout, stderr = client.exec_command(command)
        # 获取命令执行结果
        result = stdout.read().decode('utf-8', errors='ignore')
        sRespData = """<xml>
                <ToUserName>{to_username}</ToUserName>
                <FromUserName>{from_username}</FromUserName>
                <CreateTime>{create_time}</CreateTime>
                <MsgType>text</MsgType>
                <Content>{content}</Content>
                </xml>
                """.format(to_username=to_username_text,
                           from_username=from_username_text,
                           create_time=create_time_text,
                           content=result, )
        return sRespData

效果如下:

3.3 查询物流信息,被动回复纯文本格式

get_express.py 内容如下:

import requests
import json
import re


def get_express(content_text, to_username_text, from_username_text, create_time_text):
    
    # 将接收到的消息内容以#进行分割
    parts = content_text.split('#')
    if len(parts) >= 4:
        com_str = parts[2]
        phone_int = parts[3]
        no_str = parts[4]

    # 查询本地json文件中com_str对应的NO值
    with open('funny/exp.json', 'r', encoding='utf-8') as f:
        data = json.load(f)["result"]
    for item in data:
        if item["com"] == com_str:
            com_no = (item["no"])
            break
    else:
        error_msg = '输入有误,未找到物流信息'

	# 聚合平台物流查询接口,接口文档:https://www.juhe.cn/docs/api/id/43
    api_url = "http://v.juhe.cn/exp/index"
    params = {
        "key": "*********************************",
        "com": com_no,
        "no": no_str,
        "receiverPhone": phone_int
    }
    response = requests.get(api_url, params=params)
    json_data = response.json()
    
    # 提取result中的值
    exp_info = {
        "company": json_data["result"].get("company"),
        "no": json_data["result"].get("no"),
        "status_detail": json_data["result"].get("status_detail")
    }
   
   # 从 list 中提取每一项的 datetime 和 remark ,然后格式化为字符串
    list_items = "\n\n".join(
        "【物流时间】:{}\n【物流详情】:{}".format(item.get("datetime"), item.get("remark"))
        for item in json_data["result"].get("list", [])
    )
    # 构造最终的回复字符串,包括所有物流详情
    reply = ('【物流公司】:{company}\n【物流单号】:{no}\n【物流状态】:{status_detail}\n{list_items}'.format(**exp_info,
                                                                                                      list_items=list_items))

    sRespData = """<xml>
            <ToUserName>{to_username}</ToUserName>
            <FromUserName>{from_username}</FromUserName>
            <CreateTime>{create_time}</CreateTime>
            <MsgType>text</MsgType>
            <Content>{content}</Content>
            </xml>
            """.format(to_username=to_username_text,
                       from_username=from_username_text,
                       create_time=create_time_text,
                       content=reply, )
    return sRespData

 效果如下:

3.4 获取随机音乐链接,被动回复图文格式

get_music.py 内容如下:

import requests
import json
import re


def get_music(to_username_text, from_username_text, create_time_text):
    api_url = "https://api.52vmy.cn/api/music/wy/rand"
    response = requests.get(api_url)
    data = response.json().get('data', {})
    music_info = {key: data.get(key) for key in ['song', 'singer', 'cover', 'Music']}
    sRespData = """<xml>
                        <ToUserName>{to_username}</ToUserName>
                        <FromUserName>{from_username}</FromUserName>
                        <CreateTime>{create_time}</CreateTime>
                        <MsgType>news</MsgType>
                        <ArticleCount>1</ArticleCount>
                            <Articles>
                                <item>
                                    <Title>歌曲名:{title}</Title>
                                    <Description>演唱者:{description}</Description>
                                    <PicUrl>{picurl}</PicUrl>
                                    <Url>{url}</Url>
                                </item>
                            </Articles>
                        </xml>
                        """.format(to_username=to_username_text,
                                   from_username=from_username_text,
                                   create_time=create_time_text,
                                   title=music_info['song'],
                                   description=music_info['singer'],
                                   picurl=music_info['cover'],
                                   url=music_info['Music'])
    return sRespData

效果如下:

3.5 其他问题

由于应用被动回复消息的格式不支持文件类型,如需将文件回复至企微可以采用2种方式:

①、在服务端配置nginx静态目录,通过静态页面路径 + 文件名 的形式拼接出完整的文件 url 地址,再通过图文类型的消息格式带入回复。

②、改用主动发送消息的方式,通过企微素材上传接口上传文件,并获取对应的 media_id ,再通过文件类型消息带入 media_id 指定对应成员完成发送,示例:

# ===================== 【获取access_token】 ==========================

# API 地址
url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
# 企业ID
corpid = '*****************'
# SECRET
corpsecret = '******************************'

params = {
    'corpid': corpid,
    'corpsecret': corpsecret
}

access_token = requests.get(url, params=params).json()['access_token']
print(access_token)

# ================== 【上传素材获取media_id】 =======================

# API 地址
url = 'https://qyapi.weixin.qq.com/cgi-bin/media/upload'

params = {
    'access_token':  access_token,
    'type': 'file'
}

# 要上传的文件
files = {
    'media': ('111.xlsx', open(r'C:\Users\Looper\Desktop\111.xlsx', 'rb'), 'application/octet-stream')
}

media_id = requests.post(url, params=params, files=files).json()['media_id']
print(media_id)

到此,企微应用的消息收发实施,全部测试完毕。  


http://www.kler.cn/a/313819.html

相关文章:

  • 字节跳动Android面试题汇总及参考答案(80+面试题,持续更新)
  • docker:docker: Get https://registry-1.docker.io/v2/: net/http: request canceled
  • uniapp使用scroll-view下拉刷新与上滑加载
  • Golang | Leetcode Golang题解之第559题N叉树的最大深度
  • HarmonyOS 如何实现传输中的数据加密
  • Flutter Getx状态管理
  • Spring Boot实现:Java免税商品购物商城全攻略
  • 8. 详细描述一条 SQL 语句在 MySQL 中的执行过程。
  • 深度学习——微积分求导,反向传播
  • 简单多状态dp第三弹 leetcode -买卖股票的最佳时机问题
  • 嵌入式Linux学习笔记(6)-线程处理、线程同步、线程池(c语言实现)
  • Spring Boot与gRPC的完美融合:构建高效用户服务与订单服务通信
  • 视频存储EasyCVR视频监控汇聚管理平台设备录像下载报错404是什么原因?
  • GO GIN 推荐的库
  • 2024年上海初中生古诗文大会倒计时一个半月:来做2024官方模拟题
  • 新手学习Python第十天-新手笔记(速学)
  • 【深度学习】(1)--神经网络
  • Pytorch Lightning框架
  • Java集合(List篇)
  • SpringBootAdmin源码修改编译002_踩坑记录一堆坑_记录过程_没有成功---VUE工作笔记0027
  • linux 操作系统下dhcrelay命令介绍和案例应用
  • 28V_1MHZ电子烟,无线鼠标,医疗器械等专用恒频升压转换器超小体积封装
  • 用户态缓存:高效数据交互与性能优化
  • Spring Boot中的响应与分层解耦架构
  • C一语言—动态内存管理
  • 24年蓝桥杯及攻防世界赛题-MISC-1