当前位置: 首页 > article >正文

Python脚本之获取Splunk数据发送到第三方UDP端口

原文地址:https://www.program-park.top/2024/10/12/python_21/

  在 Linux 环境执行脚本,Python需要引入对应依赖:

pip install splunk-sdk

  离线环境下,可手动执行python进入 Python 解释器的交互式界面,输入以下命令:

import sys
print(sys.path)

  该命令会输出一个列表,包含了 Python 解释器在搜索模块时会查看的所有目录,如/usr/local/lib/python2.7/site-packages/。再去 Splunk 目录,将./etc/apps/splunk_instrumentation/bin/splunk_instrumentation/splunklib目录复制到/usr/local/lib/python2.7/site-packages/一份,Python 环境即有了 Splunk 的依赖。
  脚本执行命令:

python forwarder_udp.py original 1 0 day 192.9.9.9 514

  Python环境:2.7,脚本如下:

# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import print_function
from logging import handlers
import sys, io, logging, socket, os
import splunklib.client as client
import splunklib.results as results

# _create_unverified_https_context = ssl._create_unverified_context
# ssl._create_default_https_context = _create_unverified_https_context
reload(sys)
sys.setdefaultencoding('utf8')

# log日志,存放路径:/logs
class Logger:
    level_relations = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'crit': logging.CRITICAL
    }

    def __init__(self, option, level='info', when='W0', backCount=8):
        logfile = "/logs/{}.log".format(option)
        if not os.path.exists('/logs'):
            os.makedirs("/logs")
        self.logger = logging.getLogger(logfile)
        fmt = '%(asctime)s - %(levelname)s: %(message)s'
        format_str = logging.Formatter(fmt)
        self.logger.setLevel(self.level_relations.get(level))
        self.logger.handlers = []
        # stream_handler = logging.StreamHandler()
        # stream_handler.setFormatter(format_str)
        # self.logger.addHandler(stream_handler)
        file_handler = handlers.TimedRotatingFileHandler(filename=logfile, when=when, backupCount=backCount,
                                                         encoding='utf-8')
        file_handler.setFormatter(format_str)
        self.logger.addHandler(file_handler)

# splunk客户端
# option:数据分类标识
class ConnectPhoenix:
    def __init__(self,option):
        self.HOST = "10.10.10.1"
        self.PORT = 8089
        self.USERNAME = "admin"
        self.PASSWORD = "123456"
        self.option = option

    def phoenixService(self):
        phoenix_service = client.connect(
            host=self.HOST,
            port=self.PORT,
            username=self.USERNAME,
            password=self.PASSWORD,
            verify=False,
            app="search")
        return phoenix_service

    # 获取查询SPL
    def get_query(self):
        if self.option == 'original':
            return 'search index=ri_* | table _time,_raw'

    # 获取查询结果
    # period:起始时间
    # delay:截止时间
    # time_type:时间类型,day、hour、minute
    def get_results(self, period, delay, time_type):
        query = self.get_query()
        if time_type == 'day':
            kwargs = {'earliest_time': '-%dd@d'%(int(period)), 'latest_time': '-%dd@d'%(int(delay))}
        elif time_type == 'hour':
            kwargs = {'earliest_time': '-%dh@h' % (int(period)), 'latest_time': '-%dh@h' % (int(delay))}
        elif time_type == 'minute':
            kwargs = {'earliest_time': '-%dm@m'%(int(period)), 'latest_time': '-%dm@m'%(int(delay))}
        phoenix_service = self.phoenixService()
        phoenix_jobs = phoenix_service.jobs
        job = phoenix_jobs.export(query, **kwargs)
        query_results = results.ResultsReader(io.BufferedReader(job))
        return query_results

# 针对查询结果做ETL
# log:查询结果
# option:数据分类标识
class FormatLog:
    def __init__(self,log,option):
        self.log = log
        self.option = option

    def format_log(self):
        if self.option == 'original':
            logdir = self.log['_raw']

        return logdir

# 转发数据到第三方端口
# option:任务类型
# period:查询起始时间
# delay:查询终止时间
# time_type:查询周期
# output_ip:目的IP
# output_port:目的端口
class Forwardudp:
    def __init__(self, option, period, delay, time_type, output_ip, output_port):
        self.option = option
        self.period = period
        self.delay = delay
        self.time_type = time_type
        self.output_ip = output_ip
        self.output_port = output_port

    # 转发数据
    def Forward_udp(self):
        log = Logger(self.option, level='info')
        phoenix_server = ConnectPhoenix(self.option)
        query_results = phoenix_server.get_results(self.period, self.delay, self.time_type)
        # 日志计数
        count_data = 0
        # 创建套接字
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # 遍历发送数据
        for result in query_results:
            if isinstance(result, results.Message):
                pass
            else:
                formatLog = FormatLog(result, self.option)
                logdic = formatLog.format_log().decode('utf-8')
                if logdic != '':
                    try:
                        s.sendto(str(logdic), (self.output_ip, self.output_port))
                        print(logdic)
                        count_data += 1
                    except Exception as e:
                        log.logger.info("Error: {}".format(e))

        log.logger.info("forwarder process send {} msg to {}".format(count_data, self.output_ip))


if __name__=='__main__':
    # get args from cron cmd
    option = sys.argv[1] if len(sys.argv) > 1 else 'original' # 任务名,默认original
    period = sys.argv[2] if len(sys.argv) > 1 else '1' # 查询起始时间=初始化为整点(当前时间 - period(time_type)),例:13:26:15执行查询前一小时数据,12:00:00=初始化为整点(13:26:15 - 1(hour))
    delay = sys.argv[3] if len(sys.argv) > 0 else '0' # 查询截止时间=初始化为整点(当前时间 - delay(time_type)) - 1s,例:13:26:15执行查询前一小时数据,12:59:59=初始化为整点(13:26:15 - 1(hour)) - 1s
    time_type = sys.argv[4] if len(sys.argv) > 4 else 'day' # 查询时间类型,默认day,可选:day、hour、minute
    output_ip = sys.argv[5] if len(sys.argv) > 5 else '192.9.9.9' # 日志转发目的IP
    output_port = int(sys.argv[6]) if len(sys.argv) > 5 else 514  # 日志转发目的端口

    forwardudp = Forwardudp(option, period, delay, time_type, output_ip, output_port)
    forwardudp.Forward_udp()

http://www.kler.cn/a/353874.html

相关文章:

  • WebSocket监听接口
  • 在 PhpStorm 中配置命令行直接运行 PHP 的步骤
  • 【OJ刷题】同向双指针问题
  • SQL从入门到实战
  • Leecode刷题C语言之按键变更的次数
  • PyTorch 框架实现线性回归:从数据预处理到模型训练全流程
  • java01作业说明:
  • 机器学习:情感分析的原理、应用场景及优缺点介绍
  • 对TCP/IP、HTTP协议原理的分析和总结
  • C++学习笔记----9、发现继承的技巧(一)---- 使用继承构建类(4)
  • 雷达手势识别技术
  • C++ IO多路复用 poll模型
  • 前端学习-CSS的三大特性(十七)
  • python 爬虫 入门 一、基础工具
  • 分布式数据库环境(HBase分布式数据库)的搭建与配置
  • [算法日常] 逆序对
  • 音乐播放器项目专栏介绍​
  • Linux的kafka安装部署
  • 自动化测试与敏捷开发的重要性
  • docker (desktopcompose) download
  • KVM 建黑群晖
  • HarmonyOS NEXT 应用开发实战(六、组件导航Navigation使用详解)
  • JavaWeb Servlet--09深入:注册系统03--删除用户业务
  • 使用WordPress从零开始搭建一个本地网站实现远程访问
  • [Python]将pdf文件转为svg
  • VMware16的安装及VMware配置Linux虚拟机