Python脚本之获取Splunk数据发送到第三方UDP端口
原文地址:https://www.program-park.top/2024/10/12/python_21/
在 Linux 环境执行脚本,Python需要引入对应依赖:
pip install splunk-sdk
离线环境下,可手动执行python
进入 Python 解释器的交互式界面,输入以下命令:
import sys
print(sys.path)
该命令会输出一个列表,包含了 Python 解释器在搜索模块时会查看的所有目录,如/usr/local/lib/python2.7/site-packages/
。再去 Splunk 目录,将./etc/apps/splunk_instrumentation/bin/splunk_instrumentation/splunklib
目录复制到/usr/local/lib/python2.7/site-packages/
一份,Python 环境即有了 Splunk 的依赖。
脚本执行命令:
python forwarder_udp.py original 1 0 day 192.9.9.9 514
Python环境:2.7,脚本如下:
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import print_function
from logging import handlers
import sys, io, logging, socket, os
import splunklib.client as client
import splunklib.results as results
# _create_unverified_https_context = ssl._create_unverified_context
# ssl._create_default_https_context = _create_unverified_https_context
reload(sys)
sys.setdefaultencoding('utf8')
# log日志,存放路径:/logs
class Logger:
level_relations = {
'debug': logging.DEBUG,
'info': logging.INFO,
'warning': logging.WARNING,
'error': logging.ERROR,
'crit': logging.CRITICAL
}
def __init__(self, option, level='info', when='W0', backCount=8):
logfile = "/logs/{}.log".format(option)
if not os.path.exists('/logs'):
os.makedirs("/logs")
self.logger = logging.getLogger(logfile)
fmt = '%(asctime)s - %(levelname)s: %(message)s'
format_str = logging.Formatter(fmt)
self.logger.setLevel(self.level_relations.get(level))
self.logger.handlers = []
# stream_handler = logging.StreamHandler()
# stream_handler.setFormatter(format_str)
# self.logger.addHandler(stream_handler)
file_handler = handlers.TimedRotatingFileHandler(filename=logfile, when=when, backupCount=backCount,
encoding='utf-8')
file_handler.setFormatter(format_str)
self.logger.addHandler(file_handler)
# splunk客户端
# option:数据分类标识
class ConnectPhoenix:
def __init__(self,option):
self.HOST = "10.10.10.1"
self.PORT = 8089
self.USERNAME = "admin"
self.PASSWORD = "123456"
self.option = option
def phoenixService(self):
phoenix_service = client.connect(
host=self.HOST,
port=self.PORT,
username=self.USERNAME,
password=self.PASSWORD,
verify=False,
app="search")
return phoenix_service
# 获取查询SPL
def get_query(self):
if self.option == 'original':
return 'search index=ri_* | table _time,_raw'
# 获取查询结果
# period:起始时间
# delay:截止时间
# time_type:时间类型,day、hour、minute
def get_results(self, period, delay, time_type):
query = self.get_query()
if time_type == 'day':
kwargs = {'earliest_time': '-%dd@d'%(int(period)), 'latest_time': '-%dd@d'%(int(delay))}
elif time_type == 'hour':
kwargs = {'earliest_time': '-%dh@h' % (int(period)), 'latest_time': '-%dh@h' % (int(delay))}
elif time_type == 'minute':
kwargs = {'earliest_time': '-%dm@m'%(int(period)), 'latest_time': '-%dm@m'%(int(delay))}
phoenix_service = self.phoenixService()
phoenix_jobs = phoenix_service.jobs
job = phoenix_jobs.export(query, **kwargs)
query_results = results.ResultsReader(io.BufferedReader(job))
return query_results
# 针对查询结果做ETL
# log:查询结果
# option:数据分类标识
class FormatLog:
def __init__(self,log,option):
self.log = log
self.option = option
def format_log(self):
if self.option == 'original':
logdir = self.log['_raw']
return logdir
# 转发数据到第三方端口
# option:任务类型
# period:查询起始时间
# delay:查询终止时间
# time_type:查询周期
# output_ip:目的IP
# output_port:目的端口
class Forwardudp:
def __init__(self, option, period, delay, time_type, output_ip, output_port):
self.option = option
self.period = period
self.delay = delay
self.time_type = time_type
self.output_ip = output_ip
self.output_port = output_port
# 转发数据
def Forward_udp(self):
log = Logger(self.option, level='info')
phoenix_server = ConnectPhoenix(self.option)
query_results = phoenix_server.get_results(self.period, self.delay, self.time_type)
# 日志计数
count_data = 0
# 创建套接字
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 遍历发送数据
for result in query_results:
if isinstance(result, results.Message):
pass
else:
formatLog = FormatLog(result, self.option)
logdic = formatLog.format_log().decode('utf-8')
if logdic != '':
try:
s.sendto(str(logdic), (self.output_ip, self.output_port))
print(logdic)
count_data += 1
except Exception as e:
log.logger.info("Error: {}".format(e))
log.logger.info("forwarder process send {} msg to {}".format(count_data, self.output_ip))
if __name__=='__main__':
# get args from cron cmd
option = sys.argv[1] if len(sys.argv) > 1 else 'original' # 任务名,默认original
period = sys.argv[2] if len(sys.argv) > 1 else '1' # 查询起始时间=初始化为整点(当前时间 - period(time_type)),例:13:26:15执行查询前一小时数据,12:00:00=初始化为整点(13:26:15 - 1(hour))
delay = sys.argv[3] if len(sys.argv) > 0 else '0' # 查询截止时间=初始化为整点(当前时间 - delay(time_type)) - 1s,例:13:26:15执行查询前一小时数据,12:59:59=初始化为整点(13:26:15 - 1(hour)) - 1s
time_type = sys.argv[4] if len(sys.argv) > 4 else 'day' # 查询时间类型,默认day,可选:day、hour、minute
output_ip = sys.argv[5] if len(sys.argv) > 5 else '192.9.9.9' # 日志转发目的IP
output_port = int(sys.argv[6]) if len(sys.argv) > 5 else 514 # 日志转发目的端口
forwardudp = Forwardudp(option, period, delay, time_type, output_ip, output_port)
forwardudp.Forward_udp()