当前位置: 首页 > article >正文

飞书机器人告警实现

功能实现说明

1. 准备好config.json文件,用于存放配置文件信息

2. 准备好config.py用于读取配置文件信息

3. feishu.py用于实现获取临时token以及推送消息至指定机器人并推到指定飞书用户

config.json

{
    "auth":{
        "app_id": "cli_xxxxxc",
        "app_secret": "xxxxxxx"
    },
    "user_ids":{
        "replace_your_feishu_user_name":"replace_your_feishu_user_name's_user_id"
    }
}

config.py

"""
This module provides a Config class for managing configuration settings for the Feishu API integration.

It reads configuration data from a JSON file and provides methods to access various settings.
"""

import json

class Config:
    """
    A class to manage configuration settings for the Feishu API integration.

    This class reads configuration data from a JSON file and provides methods
    to access app credentials and user IDs.
    """

    def __init__(self, config_file='config.json'):
        """
        Initialize the Config instance.

        Args:
            config_file (str): Path to the configuration JSON file. Defaults to 'config.json'.

        Raises:
            FileNotFoundError: If the specified config file is not found.
            json.JSONDecodeError: If the config file is not valid JSON.
            KeyError: If expected keys are missing in the config file.
        """
        with open(config_file, 'r') as f:
            config = json.load(f)
        
        # Extract configuration values
        self.app_id = config['auth']['app_id']
        self.app_secret = config['auth']['app_secret']
        self.user_ids = config['user_ids']

    def get_app_id(self):
        """
        Get the Feishu app ID.

        Returns:
            str: The app ID.
        """
        return self.app_id

    def get_app_secret(self):
        """
        Get the Feishu app secret.

        Returns:
            str: The app secret.
        """
        return self.app_secret

    def get_user_id(self, name):
        """
        Get a user ID by name.

        Args:
            name (str): The name of the user.

        Returns:
            str: The user ID if found, None otherwise.
        """
        return self.user_ids.get(name)

if __name__=='__main__':
    # Example usage of the Config class
    config = Config()
    print(config.get_app_id())
    print(config.get_app_secret())
    print(config.get_user_id('test'))

feishu.py

飞书机器人消息推送功能实现

import urllib3
import json
import ssl
import warnings
from config import Config

# Suppress only the single InsecureRequestWarning from urllib3 needed.
warnings.filterwarnings('ignore', category=urllib3.exceptions.InsecureRequestWarning)

class Feishu:
    """
    A class to interact with the Feishu API.

    This class provides methods to authenticate and communicate with the Feishu API,
    including obtaining a tenant access token for further API operations.
    """

    def __init__(self):
        """
        Initialize the Feishu instance.

        Sets up the configuration, base URL, and creates a custom SSL context
        to allow unverified HTTPS connections (for development purposes only).
        """
        self.config = Config()
        self.base_url = "https://open.feishu.cn/open-apis"
        self.tenant_access_token = None
        
        # Create a custom SSL context
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
        
        # Use the custom SSL context in PoolManager
        self.http = urllib3.PoolManager(
            ssl_context=ssl_context
        )

    def get_tenant_access_token(self):
        """
        Obtain a tenant access token from the Feishu API.

        This method sends a POST request to the Feishu API to get a tenant access token.
        It uses the app_id and app_secret from the config to authenticate the request.

        Returns:
            str: The tenant access token if successful, None otherwise.

        Raises:
            Exception: If an error occurs during the API request.
        """
        url = f"{self.base_url}/auth/v3/tenant_access_token/internal"
        headers = {
            "Content-Type": "application/json"
        }
        data = {
            "app_id": self.config.get_app_id(),
            "app_secret": self.config.get_app_secret()
        }
        try:
            response = self.http.request("POST", url, headers=headers, body=json.dumps(data))
            response_data = json.loads(response.data.decode('utf-8'))
            
            if response.status == 200:
                self.tenant_access_token = response_data.get("tenant_access_token")
                if self.tenant_access_token:
                    return self.tenant_access_token
                else:
                    print("Error: tenant_access_token not found in the response")
            else:
                print(f"Error: HTTP {response.status}")
            
            print("Response data:", response_data)
        except Exception as e:
            print(f"An error occurred: {str(e)}")
        
        return None

if __name__ == "__main__":
    feishu = Feishu()
    token = feishu.get_tenant_access_token()
    if token:
        print(f"Successfully obtained tenant access token: {token}")
    else:
        print("Failed to obtain tenant access token")


http://www.kler.cn/a/488343.html

相关文章:

  • Jmeter-压测时接口如何按照顺序执行
  • ORB-SALM3配置流程及问题记录
  • 高通,联发科(MTK)等手机平台调优汇总
  • AI多模态技术介绍:视觉语言模型(VLMs)指南
  • Windows 安装 Docker 和 Docker Compose
  • XML通过HTTP POST 请求发送到指定的 API 地址,进行数据回传
  • U盘加密软件哪个好用?免安装、安全、防复制
  • 在 Go 应用中 如何像 FastAPI 一样优雅地构建控制器
  • 【DES加密】
  • el-date-picker日期时间选择器的选择时间限制到分钟级别
  • uniapp页面高度设置(铺满可视区域、顶部状态栏高度、底部导航栏高度)
  • 51单片机——串口通信(重点)
  • 深入解析 Python 2 与 Python 3 的差异与演进
  • 【Python】Python之Selenium基础教程+实战demo:提升你的测试+测试数据构造的效率!
  • [研发效率]什么是软件工程生产力
  • 【Go】:图片上添加水印的全面指南——从基础到高级特性
  • 【Linux】gawk编辑器
  • Python Matplotlib 教程-Matplotlib 如何绘制常见图表
  • 信息科技伦理与道德3:智能决策
  • C++二十三种设计模式之解释器模式
  • 【c++实战项目】负载均衡式在线OJ
  • C#里使用libxl读取EXCEL文件的例子
  • 在Ubuntu中使用systemd设置后台自启动服务
  • ffmpeg7.0 aac转pcm
  • Docker入门篇[SpringBoot之Docker实战系列] - 第534篇
  • 商品详情API接口数据解析,API接口系列(示例返回数据(JSON格式))