调用钉钉接口发送消息
调用钉钉接口发送消息
通过创建钉钉开放平台创建H5小程序,通过该小程序可以实现向企业内的钉钉用户发送消息(消息是以工作通知的形式发送)
1、目前仅支持发送文本消息,相同内容的文本只能成功发送一次,但是接口返回发送成功
2、需要给小程序开放各种权限
import json
import objectpath
import requests
# 在钉钉开放平台上创建应用后获取
AGENT_ID = None
APP_KEY = ""
APP_SECRET = ""
dd_domain = "https://oapi.dingtalk.com"
class DingtalkHelper():
def __init__(self):
self.access_token = self.get_token()
def get_token(self):
"""
获取token
:return:
"""
url = f"{dd_domain}/gettoken"
data = {
"appkey": APP_KEY,
"appsecret": APP_SECRET,
}
res_json = requests.get(url=url, params=data).json()
return objectpath.Tree(res_json).execute("$.access_token")
def get_depList(self):
"""
获取部门列表
:return:
"""
url = f"{dd_domain}/department/list"
data = {
"access_token": self.access_token,
}
res_json = requests.get(url=url, params=data).json()
departmentIds_list = list(objectpath.Tree(res_json).execute("$..*[@.name is not null].id"))
return departmentIds_list
def get_memberList(self, depId: str):
"""
获取部门用户userid列表
:param depId:
:return:
"""
url = f"{dd_domain}/user/getDeptMember"
data = {
"access_token": self.access_token,
"deptId": depId,
}
res_json = requests.get(url=url, params=data).json()
return res_json
def get_userInfo(self, userId):
"""
获取部门用户userid列表
:param userId:
:return:
"""
url = f"{dd_domain}/user/get"
data = {
"access_token": self.access_token,
"userid": userId,
}
res_json = requests.get(url=url, params=data).json()
userIds_list = list(objectpath.Tree(res_json).execute("$..*[@.userid is not null].(mobile, userid)"))
userInfo = userIds_list[0]
return {userInfo.get("mobile"): userInfo.get("userid")}
def send_ddMsg(self, userId, content):
"""
向企业个人发送钉钉通知
:param userId:
:return:
"""
url = f"{dd_domain}/message/send?access_token={self.access_token}"
body_dict = {
"touser": userId,
"agentid": AGENT_ID,
"msgtype": "text",
"text": {"content": content}
}
res = requests.post(url=url, data=json.dumps(body_dict))
print(res.text)
def getAll_userIds(self):
"""
获取公司内所有人员的userId
:return:
"""
deptIds_list = self.get_depList()
userIds_list = list()
for deptId in deptIds_list:
res_json = self.get_memberList(deptId)
userIds = objectpath.Tree(res_json).execute("$.userIds")
userIds_list.extend(userIds)
return userIds_list
def getAll_usersInfo(self):
"""
获取公司内所有人员的mobile和userId的映射关系表
:return:
"""
userIds_list = self.getAll_userIds()
usersInfo_dict = dict()
for userId in userIds_list:
userInfo = self.get_userInfo(userId)
usersInfo_dict.update(userInfo)
return usersInfo_dict