建模杂谈系列256 规则函数化改造
说明
之前尝试用FastAPI来构造规则,碰到的问题是由于请求量过大(TPS > 1000), 从而导致微服务端口资源耗尽。所以现在的point是:
- 1 如何使用函数来替代微服务(同时要保留使用微服务的优点)
- 2 进一步抽象并规范规则的执行
- 3 等效合并规则的方法
内容
0 机制讨论
过去在使用tornado作为后端服务的时候,是没有碰到端口耗尽的问题的,也许是tornado本身采取的是长连接,更适合大批量数据请求的后端任务。
FastAPI更适合做短、平的IO类异步需求,不可以用于级联,TPS大约400-1200的样子。
这次的业务场景是实体匹配,我们需要从原文中提取出实体,然后完成匹配。
数据样本:
ent_list = ['基金', '美芯晟', '高新兴', '骏成科技', '证券时报', '深市主板', '创业板', '沪市', '科创板', '计算机',
'机械设备', '共有3', '潍柴动力', '乐心医疗', '嘉曼服饰', '敏芯股份', '渝开发', '长虹美菱', '德联集团', '数据宝',
'中航西飞', '顺络电子', '基金家数', '华利集团', '杰瑞股份', '邦彦技术', '兴瑞科技', '深天马', '漫步',
'金力永磁', '太阳能', '普蕊斯', '德方纳米', '华锐精密', '伊之密', '西子洁能', '陕西华达', '浙江鼎力', '诺瓦星云', '远光']
original_text_half_width = '''昨日基金共对31家公司进行调研,扎堆调研美芯晟、高新兴、骏成科技等。证券时报•数据宝统计,6月5日共40家公司被机构调研,按调研机构类型看,基金参与31家公司的调研活动,其中,10家以上基金扎堆调研公司共6家。美芯晟最受关注,参与调研的基金达27家;高新兴、骏成科技等分别获18家、15家基金集体调研。基金参与调研的公司中,按所属板块统计,深市主板公司有13家,创业板公司有13家,沪市主板公司有1家,科创板公司有4家。所属行业来看,基金调研的公司共涉及13个行业,所属电子行业最多,有7家公司上榜;计算机、机械设备等紧随其后,分别有4家、4家公司上榜。从基金调研公司的A股总市值统计,总市值在500亿元以上的共有3家,其中总市值超千亿元的有潍柴动力等,总市值不足100亿元的有17家,分别是乐心医疗、嘉曼服饰、敏芯股份等。市场表现上,基金调研股中,近5日上涨的有10只,涨幅居前的有敏芯股份、高新兴、骏成科技等,涨幅为21.46%、19.43%、13.83%;下跌的有21只,跌幅居前的有渝开发、长虹美菱、德联集团等,跌幅为12.31%、9.92%、9.22%。数据宝统计,基金参与调研股中,近5日资金净流入的有12只,中航西飞近5日净流入资金1.53亿元,主力资金净流入最多;净流入资金较多的还有高新兴、顺络电子等,净流入资金分别为8217.28万元、3140.67万元。(数据宝)6月5日基金调研公司一览代码简称基金家数最新收盘价(元)近5日涨跌幅(%)行业688458美芯晟27 35.85 -4.48电子300098高新兴18 3.75 19.43计算机301106骏成科技15 32.42 13.83电子002138顺络电子14 25.27 7.76电子300979华利集团14 67.90 1.19纺织服饰300803指南针13 42.92 -0.60计算机002353杰瑞股份9 34.33 -2.05机械设备301276嘉曼服饰8 22.71 -0.74纺织服饰688132邦彦技术8 18.55 5.64国防军工688286敏芯股份6 44.60 21.46电子000338潍柴动力6 15.74 -2.24汽车002937兴瑞科技5 20.93 1.45电子000514渝开发4 3.49 -12.31房地产000050深天马A4 7.31 -2.40电子002351漫步者3 12.73 -0.08电子300748金力永磁2 14.06 -2.77有色金属000591太阳能2 5.05 -6.31公用事业301257普蕊斯2 40.05 -4.53医药生物000768中航西飞2 25.23 4.69国防军工300769德方纳米2 33.88 -2.98电力设备300562乐心医疗1 8.61 -6.62医药生物688059华锐精密1 54.53 -6.79机械设备002666德联集团1 3.94 -9.22基础化工300415伊之密1 22.34 1.79机械设备002534西子洁能1 10.99 -4.18电力设备301517陕西华达1 61.60 2.56国防军工301362民爆光电1 34.00 -7.34家用电器603338浙江鼎力1 61.86 -4.93机械设备301589诺瓦星云1 219.68 -8.31计算机000521长虹美菱1 8.90 -9.92家用电器002063远光软件1 5.70 -1.55计算机注:本文系新闻报道,不构成投资建议,股市有风险,投资需谨慎。'''
在逻辑上,我们会按照实际情况设计分级,在程序上,我们要有一个合并的逻辑。这种逻辑要简单,不要offend逻辑。
1 现有的服务
采用“WaterFall”的方法逐步批量的处理并分流数据。
一条规则是如此
# reject
@app.post("/r000/")
async def r000(justent:JustEnt):
the_ent = justent.some_ent
the_result = RuleResult()
try:
if judge_existence(the_ent, word_list=r0_exe_clude_list):
the_result.status = 'reject'
else:
the_result.status = 'pass'
return the_result.dict()
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
在发起调用时,采用异步方式,每次根据请求的目标先品出参数,然后将渠道的结果进行解析。
import time
def waterfall_api_mode(last_fall, rule_name ,reject_list = None, get_list = None, mappling_list = None, raw = None , base_url = None):
next_fall = []
last_ent_list = last_fall
pure_rule_url = rule_name + '/'
if len(last_ent_list):
rule_url = base_url + pure_rule_url
# api mode
tick1 = time.time()
task_list = []
for ent in last_ent_list:
tem_dict = {}
tem_dict['task_id'] = ent
tem_dict['url'] = rule_url
if raw is None :
tem_dict['json_params'] = {'some_ent':ent}
else:
tem_dict['json_params'] = {'some_ent':ent,'raw':raw}
task_list.append(tem_dict)
rule_res = asyncio.run(json_player(task_list, concurrent = 10))
# 解析结果,保留pass
for tem_res in rule_res:
for k,v in tem_res.items():
# print(k,v)
if v['status'] == 'pass':
next_fall.append(k)
elif v['status'] == 'get':
if get_list is not None :
get_list.append(v['data'])
if mappling_list is not None :
mappling_list.append({'ent':k,'mapping_ent': v['data']})
elif v['status'] == 'reject':
if reject_list is not None :
reject_list.append(k)
tick2 = time.time()
print('takes %.2f ' %(tick2-tick1))
return next_fall
在批量调用规则时,采用几乎一样的形式即可,这是非常简洁的地方。
# ============= fall of short
# r100_1
next_fall_short = waterfall_api_mode(next_fall_short, 'r100_1',base_url = base_url)
# r100
next_fall_short = waterfall_api_mode(next_fall_short, 'r100', get_list = mr.short_result,mappling_list=mr.mapping_list ,base_url = base_url)
# r102
next_fall_short = waterfall_api_mode(next_fall_short, 'r102', get_list = mr.short_result,mappling_list=mr.mapping_list ,base_url = base_url)
# r102_1
next_fall_short = waterfall_api_mode(next_fall_short, 'r102_1', get_list = mr.short_result,mappling_list=mr.mapping_list ,base_url = base_url)
# r103
next_fall_short = waterfall_api_mode(next_fall_short, 'r103', raw = original_text_half_width,base_url = base_url)
# r104
next_fall_short = waterfall_api_mode(next_fall_short, 'r104',base_url = base_url)
# r105
next_fall_short = waterfall_api_mode(next_fall_short, 'r105',base_url = base_url)
# r106
next_fall_short = waterfall_api_mode(next_fall_short, 'r106',base_url = base_url)
# r107
next_fall_short = waterfall_api_mode(next_fall_short, 'r107',base_url = base_url)
# r200
next_fall_short = waterfall_api_mode(next_fall_short, 'r200',base_url = base_url)
# r201
next_fall_short = waterfall_api_mode(next_fall_short, 'r201',base_url = base_url)
# r202
next_fall_short = waterfall_api_mode(next_fall_short, 'r202',base_url = base_url)
# r203
next_fall_short = waterfall_api_mode(next_fall_short, 'r203',base_url = base_url)
# r299
next_fall_short = waterfall_api_mode(next_fall_short, 'r299', get_list = mr.short_result,mappling_list=mr.mapping_list,base_url = base_url )
觉得还不错,需要保持的地方:
- 1 数据规范。使用pydantic,这个可以继续保持
- 2 waterfall_api_mode ,可以作为waterfall_func_mode, 且这次可以规定输出为4部分:get、pass、reject、error
- 3 执行时,每条规则除了顺序之外,应该还有层次,实现BFS。规则分为若干模式,例如 is_in , is_not_in, 在每个层之间的同类规则可以合并。
2 设计与改进
诶,我突然想到了我的APIFunc。
总体上说,这个框架还是比较强大的,但是非常僵化,所以最终没有走向实际应用。所以我觉得完全可以进行拆解,重构。当然,里面有一部分问题的解决还是蛮厉害的,反正这一会我想不出来。
有几块内容是需要添加上的:
- 1 logging对象:灵活的进行记录,后续会和logstash结合在一起(ELK)
- 2 错误输出:遇到错误时发送到kafka
- 3 shortuuid: 每次处理会生成一个shortuuid用于追溯,代表一次会话之内的
修改的部分:
- 1 原来有很多数据的校验部分,现在可以用pydantic来控制
- 2 BFS替代逐个的链式
- 3 没有列式方法,全部是行式方法
- 4 g变量:会存储额外的字典,不必完全按照df格式
优化的部分:
- 1 修饰器方法,支持按依赖定义规则。例如 on depends of [rule1,rule2], def new rule。
保留的部分:
- 1 reinit_data 重新初始化数据
2.1 原型部分
2.1.1 Logging
import logging
from logging.handlers import RotatingFileHandler
def get_logger(name, lpath='/var/log/', module='default.default'):
logger = logging.getLogger(name)
# 防止重复添加 handler
if not logger.handlers:
fpath = lpath + name + '.log'
handler = RotatingFileHandler(fpath, maxBytes=100 * 1024 * 1024, backupCount=10)
# 设置日志格式为 [时间] - [日志级别] - [模块名称] - 消息
formatter = logging.Formatter(
'[%(asctime)s] - [%(levelname)s] - [{}] - %(message)s'.format(module),
datefmt='%Y-%m-%d %H:%M:%S'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger
logger = get_logger('example')
# 记录不同级别的日志
logger.info('[part.a]系统启动完成')
logger.warning('[[part.b]磁盘空间不足,剩余空间小于10%')
logger.error('无法连接数据库,请检查网络设置')
logger.debug('这是调试信息,不会显示在日志中')
logger.critical('系统崩溃,立即采取措施')
2.1.2 BFS
先回忆一下过去的成果,当时的结论是:使用networkx作为核心的图计算工具,而neo4只是后端的存储backup。可以认为是pandas和mysql的关系。
在使用的时候,可以为一个项目设置一个独立的名称,这个独立的名称也就是节点的label,或者可以认为是节点的“表”。在需要的时候,可以整个读取(neo4j disk),在内存中处理(networkx memory)。
这段代码定义了一个很小的图,
import networkx as nx
import matplotlib.pyplot as plt
# =======================> 图的定义
# Create a directed graph
G = nx.DiGraph()
def hello():
print('This is Node Running ...')
G.add_node(1)
G.nodes[1]['name'] = 'MinuteData'
G.nodes[1]['run'] = hello
G.add_node(2)
G.nodes[2]['name'] = 'CaptialDataDaily'
G.nodes[2]['run'] = hello
G.add_node(3)
G.nodes[3]['name'] = 'MergeData'
G.nodes[3]['run'] = hello
G.add_edge(1,3)
G.add_edge(2,3)
G.add_node(4)
G.nodes[4]['name'] = 'FeatureHorizonal'
G.nodes[4]['run'] = hello
G.add_edge(3,4)
G.add_node(5)
G.nodes[5]['name'] = 'ImbalanceSample'
G.nodes[5]['run'] = hello
G.add_edge(4,5)
G.add_node(6)
G.nodes[6]['name'] = 'FeatureVertical'
G.nodes[6]['run'] = hello
G.add_edge(5,6)
# =======================> 图的绘画
# 获取节点标签属性
node_labels = nx.get_node_attributes(G, "name")
# pos = nx.shell_layout(G)
pos = nx.spring_layout(G)
nx.draw(G, pos, with_labels=False, node_size=1000, font_size=12, font_color='black', arrows=True)
# 绘制节点标签
_ = nx.draw_networkx_labels(G, pos, labels=node_labels)
这里,可以看到节点的依赖关系可以很清楚的展示出来。
然后稍微跳一下
# 输入一个nx图,给出BFS层级字典
def BFS(some_G,max_depth = 100):
layer_dict = {}
# 初始化节点
init_node_list = [node for node, in_degree in some_G.in_degree() if in_degree == 0]
layer_dict[0] = init_node_list
# 节点的入度字典
in_degree_dict = dict(some_G.in_degree())
all_nodes = set(some_G.nodes)
travel_nodes = set(init_node_list)
# 迭代节点
for i in range(1,max_depth):
last_layer_nodes = layer_dict[i-1]
layer_dict[i] = []
for last_node in last_layer_nodes:
out_nodes = list(some_G.successors(last_node))
if len(out_nodes):
for out_node in out_nodes:
out_node_degree = in_degree_dict[out_node]
out_node_degree1 = out_node_degree-1
if out_node_degree1 == 0:
layer_dict[i].append(out_node)
travel_nodes.add(out_node)
else:
in_degree_dict[out_node] = out_node_degree1
gap_set = all_nodes - travel_nodes
if len(gap_set) ==0:
break
return layer_dict
BFS(G)
{0: [1, 2], 1: [3], 2: [4], 3: [5], 4: [6]}
给到一个定义好的图,通过BFS可以很快把层级梳理出来。
所以,将原来的修饰器改一改,将节点的依赖关系在启动修饰器的时候解释。函数可以在修饰器下临时定义,也可以引用已经编辑好的。现在已经具备了使用形式化参数(如slice_list_batch
)来调用函数了,既有本地的包,也有微服务。
2.1.3 会话
我们将程序的每一次执行视为一次会话。
将程序的每一次执行视为一次会话是一种有用的抽象,可以帮助我们追踪、分析和管理程序的行为。每次执行都可以被认为是一个独立的会话,这些会话可以包括一系列输入、处理和输出。以下是将程序执行视为会话时的一些要点:
1. 会话的定义
- 每次程序的执行周期(从启动到结束)被视为一个独立的会话。
- 会话的范围可以根据程序的复杂度定义,可能包括启动、执行逻辑、处理数据、生成结果,并最终结束。
2. 会话数据
- 输入数据:用户输入或外部数据源提供的信息。
- 上下文信息:会话中的环境或系统状态(如用户信息、配置设置、会话 ID)。
- 日志记录:在每个会话中生成的日志信息,帮助监控、调试和跟踪程序执行的过程。
- 输出数据:会话完成后生成的结果或操作。
3. 会话标识
- 每个会话可以使用唯一的标识符(例如 UUID、时间戳)来区分和追踪。
- 日志和监控系统可以根据这个标识符来收集会话信息。
4. 会话的生命周期
- 开始:程序启动或用户发起的操作。
- 执行:程序的核心逻辑运行,处理输入并生成中间或最终结果。
- 结束:程序完成执行或用户操作结束。程序可以写入日志、清理资源或返回结果。
5. 会话状态
- 成功:程序按预期完成所有操作。
- 失败:程序执行中出现错误或异常。
- 中断:程序由于外部原因或用户取消而中途停止。
6. 会话管理
- 可以通过记录每次会话的执行时间、状态、输入和输出数据,来分析系统的性能和稳定性。
- 会话管理有助于调试(当出现问题时可以回溯某一具体会话)、分析(汇总和统计会话数据)以及优化程序。
7. 会话存储
- 将会话数据存储到数据库、日志文件或分布式系统中,以便后续分析或复盘。
通过这种“会话”概念,能够更好地组织和管理程序的执行过程,尤其在需要跟踪状态、并发操作、或者执行历史时非常有用。
两个需要增加的点(以前没这么实施)
- 1 生成uuid,用于生成会话的唯一ID
import uuid
def get_uuid(version=4, name=None, namespace=None):
"""
生成 UUID。
参数:
- version: UUID 版本 (1, 3, 4, 5)
- name: 当使用 UUID3 或 UUID5 时,需要提供的名称
- namespace: 当使用 UUID3 或 UUID5 时,需要提供的命名空间 (uuid.NAMESPACE_DNS, uuid.NAMESPACE_URL 等)
返回:
- 生成的 UUID 字符串
"""
if version == 1:
# 基于时间生成 UUID
return uuid.uuid1()
elif version == 3:
if name is None or namespace is None:
raise ValueError("UUID3 需要提供 name 和 namespace 参数")
# 基于 MD5 哈希的命名空间 UUID
return uuid.uuid3(namespace, name)
elif version == 4:
# 生成随机的 UUID
return uuid.uuid4()
elif version == 5:
if name is None or namespace is None:
raise ValueError("UUID5 需要提供 name 和 namespace 参数")
# 基于 SHA-1 哈希的命名空间 UUID
return uuid.uuid5(namespace, name)
else:
raise ValueError("不支持的 UUID 版本。版本应为 1, 3, 4 或 5")
- 2 会话数据存储
在高性能的场景下,里面增加的每一个操作可能都会导致系统的不稳定。但是,如果是必要的操作,那么也不能省。
我问了下大模型,自己也想了想,觉得还是用kafka比较合适。
python操作kafka一般使用confluent-kafka,在有些环境下安装会有点问题。例如,我在ubuntu18.04上安装时,爆了一些底层错误,类似C之类的依赖;在20.04上安装就没有问题。但总归要考虑这种环境问题差异会比较麻烦,所以我也做了一个kafka_agent,以API的形式提供kafka的访问。缺点是,json序列化的过程要加多一次。
我们来考虑当前场景时,并不是对每一个请求都发送会话数据:
- 1 正常的执行(INFO):可以考虑按很低的概率抓取会话数据。
- 2 错误(ERROR): 可以完全抓取,但这个类型的比例应该本身就是极低的。
- 3 特定的抓取(DEBUG):可以在请求时用特定的字段区分,这类型的会话数据会被抓取。
总之,需要发起数据存储的概率非常低,总体上可能不到1%,所以这些额外的开销应该可以接受。反之,如果因为会话数据的存储影响了处理,说明:
- 1 程序的水准过低,错误率太高。
- 2 确实有必要进行并行:一边运行,一边监控。
如果是程序问题,那么就需要不断优化;如果是需要同步进行并行检查,那么就设置缓冲队列,加分布式处理。
使用kafka agent
假设topic为event_collect ,发送一个消息
import requests as req
from pydantic import BaseModel,field_validator
import pandas as pd
import json
import time
class Producer(BaseModel):
servers : str
raw_msg_list : list
is_json : bool = True
topic : str
@property
def msg_list(self):
# change raw - json
if self.is_json:
tick1 = time.time()
the_list = pd.Series(self.raw_msg_list).apply(json.dumps).to_list()
print('takes %.2f for json dumps ' %(time.time() - tick1 ))
return the_list
else:
return self.raw_msg_list
回顾一下kafka的搭建,可以使用docker-compose
搭建,但是我还是比较喜欢直接用docker。
首先需要搭建zookeeper。
docker run -d --restart=always --name zookeeper -e \
ZOOKEEPER_CLIENT_PORT=2181 -e \
ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 \
registry.cn-hangzhou.aliyuncs.com/andy08008/zookeeper0718:v100
然后再搭kafka,假设kafka分为内网和外网监听。
创建kafka持久化的路径
mkdir /home/data2T/kafka_data
然后创建
WAN_IP=XXXX
LAN_IP=192.168.0.159
docker run -d --name kafka \
-p xxxx:xxxx \
-p 9092:9092 \
--link zookeeper:zk \
-e HOST_IP=localhost \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_ZOOKEEPER_CONNECT=zk:2181 \
-e KAFKA_ADVERTISED_LISTENERS=INTERNAL://${LAN_IP}:9092,EXTERNAL://${WAN_IP}:xxxx \
-e KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:xxxx \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT \
-e KAFKA_LISTENER_NAME=INTERNAL \
-e KAFKA_LISTENER_NAME=EXTERNAL \
-e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \
-e KAFKA_LOG_DIRS=/data/kafka-logs \
-v /home/data2T/kafka_data:/data/kafka-logs \
registry.cn-hangzhou.aliyuncs.com/andy08008/kafka0718:v100
这时候就可以使用kafka_agent进行连接了
生产
import requests as req
from pydantic import BaseModel,field_validator
import pandas as pd
import json
import time
class Producer(BaseModel):
servers : str
raw_msg_list : list
is_json : bool = True
topic : str
@property
def msg_list(self):
# change raw - json
if self.is_json:
tick1 = time.time()
the_list = pd.Series(self.raw_msg_list).apply(json.dumps).to_list()
print('takes %.2f for json dumps ' %(time.time() - tick1 ))
return the_list
else:
return self.raw_msg_list
msg_list = [{'id':i ,'value':'abc' } for i in range(10)]
produces = Producer(servers = WAN_IP,raw_msg_list = msg_list, topic='mytest200' )
import time
tick1 = time.time()
resp = req.post(f'http://{agent_url}/send_msg/',json = produces.dict()).json()
tick2 = time.time()
print('takes %.2f' %(tick2 - tick1) )
resp 10
# 外网被占用的情况下,耗时比较久
takes 1.44
消费
import requests as req
from pydantic import BaseModel,field_validator
import pandas as pd
import json
import time
# group.id: 声明不同的group.id 可以重头消费
class InputConsumer(BaseModel):
servers : str
groupid : str = 'default01'
is_commit: bool = True
msg_num : int = 3
topic : str
is_json : bool = True
# 外网
the_consumer = InputConsumer(servers = f'{WAN_IP}', msg_num =10, topic='mytest202')
import time
tick1 = time.time()
resp = req.post(f'http://{agent_url}/consume_msg/',json = the_consumer.dict()).json()
tick2 = time.time()
print('takes %.2f' %(tick2 - tick1) )
# 内网
lan_the_consumer = InputConsumer(servers = f'{LAN_IP}', msg_num =10, topic='mytest202')
import time
tick1 = time.time()
resp = req.post(f'http://{agent_url}/consume_msg/',json = lan_the_consumer.dict()).json()
tick2 = time.time()
print('takes %.2f' %(tick2 - tick1) )
我发现在带宽在被占满的情况,从公网拉取的消息结果为空,但是从内网可以拉取到结果。
原因大致如下:对应方法是保证带宽,或者在消费端进行修改
当我取消掉模拟耗带宽的操作(rsync大文件),此时无论WAN还是LAN都恢复正常了。
Q1: 使用代理,性能是否会有影响?
A1: 由于向代理发送,接受消息都要经过json序列化,效率将会大幅下降。80%以上的开销均为序列化开销。
生产者: 10万json+agent 1秒 外/ 0.78 内,仅10万json 0.5秒
消费者: 10万条 2.1秒 |1.95 |1.79
但可以看到,这样的速度仍然可以大规模使用。
Q2: 如果输错了服务器地址会怎样?
A2: 服务将陷入短暂不可用情况。在取消错误的请求后,大约5分钟,代理重连太久后才会自动取消。
结论:保存数据走内网kafka。
其他
Logstash的调试
Logstash 是一个开源的 数据收集引擎,通常用于实时数据处理和日志管理。它可以从多种来源收集数据,将其过滤、解析,并将处理后的数据发送到不同的目标存储系统。Logstash 是 ELK/Elastic Stack(Elasticsearch、Logstash、Kibana)的一部分,通常与 Elasticsearch 和 Kibana 搭配使用来构建一个完整的日志和事件管理系统。
Logstash 的主要功能
-
数据收集:
- Logstash 支持从各种数据源收集数据,例如日志文件、数据库、网络、消息队列等。通过插件系统,它能够轻松集成到不同的数据源环境中。
-
数据过滤与解析:
- Logstash 可以对收集到的数据进行过滤和解析,例如使用正则表达式提取字段,重新格式化数据,或者对数据进行清洗。
- Logstash 的过滤器插件支持丰富的处理操作,比如 Grok 解析、JSON、日期处理、去重、聚合等。
-
数据输出:
- Logstash 可以将处理后的数据发送到多个目标系统,比如 Elasticsearch(用于搜索和分析)、文件、数据库、消息队列、监控系统等。
Logstash 主要的架构组件
-
Inputs(输入插件):用于指定数据来源,如文件、数据库、消息队列等。常见的输入插件包括
file
、syslog
、kafka
、http
等。 -
Filters(过滤插件):用于处理、解析和转换数据,可以使用 Grok、正则表达式、日期处理等插件来解析复杂的日志格式。
-
Outputs(输出插件):用于定义数据的存储位置,比如发送到 Elasticsearch、存储到文件、发送到消息队列等。
常见使用场景
-
日志管理与分析:
- Logstash 经常与 Elasticsearch 和 Kibana 搭配使用来实现集中式日志管理,将来自不同服务的日志集中采集、分析和展示。
-
实时数据流处理:
- 它还可以用来处理实时数据流,例如从 Kafka 或 Redis 获取消息,对数据进行实时处理后发送到目标系统。
-
系统监控与安全分析:
- 在 DevOps 环境中,Logstash 用于实时监控应用程序、服务器和网络设备的日志,并通过 Kibana 展示给运维人员,实现系统健康监控和安全日志分析。
简单工作流程
- 输入:从不同的数据源收集数据(如文件、数据库、API 等)。
- 过滤:通过解析、格式化和过滤等操作对数据进行处理。
- 输出:将处理后的数据发送到指定目标(如 Elasticsearch、Kafka、文件等)。
示例
下面是一个简单的 Logstash 配置,它从一个日志文件中收集数据,解析后发送到 Elasticsearch:
input {
file {
path => "/var/log/example.log"
start_position => "beginning"
}
}
filter {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}" }
}
date {
match => ["timestamp", "ISO8601"]
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "logs-%{+YYYY.MM.dd}"
}
}
这个配置会将日志文件中的数据解析为 JSON 格式,并按日期创建 Elasticsearch 索引。
Logstash 通过灵活的输入、过滤、输出插件,使它成为处理异构数据的强大工具。
日志
这段日志输出来自 Logstash,显示了一个日志事件的详细信息。下面是每个字段的解释:
9月 15 19:09:07 m7 logstash[23910]: {
9月 15 19:09:07 m7 logstash[23910]: "@version" => "1",
9月 15 19:09:07 m7 logstash[23910]: "log" => {
9月 15 19:09:07 m7 logstash[23910]: "file" => {
9月 15 19:09:07 m7 logstash[23910]: "path" => "/var/log/example.log"
9月 15 19:09:07 m7 logstash[23910]: }
9月 15 19:09:07 m7 logstash[23910]: },
9月 15 19:09:07 m7 logstash[23910]: "module" => "part.b",
9月 15 19:09:07 m7 logstash[23910]: "log_message" => "无法连接数据库,请检查网络设置",
9月 15 19:09:07 m7 logstash[23910]: "timestamp" => "2024-09-15 19:08:15",
9月 15 19:09:07 m7 logstash[23910]: "message" => "[2024-09-15 19:08:15] - [ERROR] - [part.b] - 无法连接数据库,请检查网络设置",
9月 15 19:09:07 m7 logstash[23910]: "level" => "ERROR",
9月 15 19:09:07 m7 logstash[23910]: "@timestamp" => 2024-09-15T11:09:06.865477838Z,
9月 15 19:09:07 m7 logstash[23910]: "event" => {
9月 15 19:09:07 m7 logstash[23910]: "original" => "[2024-09-15 19:08:15] - [ERROR] - [part.b] - 无法连接数据库,请检查网络设置"
9月 15 19:09:07 m7 logstash[23910]: },
9月 15 19:09:07 m7 logstash[23910]: "host" => {
9月 15 19:09:07 m7 logstash[23910]: "name" => "m7"
9月 15 19:09:07 m7 logstash[23910]: }
9月 15 19:09:07 m7 logstash[23910]: }
详细解释:
-
@version
: 指示事件的版本,通常为“1”,表示使用的事件格式的版本。 -
log
: 包含日志文件的信息。file
: 具体的日志文件信息。path
: 日志文件的完整路径,即日志数据的来源。
-
module
: 动态模块名称,即日志消息中包含的模块标识。这个值是你在日志记录中自定义的,比如在你的代码中你设置为part.b
。 -
log_message
: 从原始消息中提取出的主要日志内容(不包含时间戳和日志级别)。 -
timestamp
: 日志事件的时间戳,表示事件发生的实际时间。 -
message
: 日志的原始格式化消息,包含时间戳、日志级别、模块名称和日志内容。 -
level
: 日志的级别,例如“ERROR”、“INFO”等。表示事件的严重性。 -
@timestamp
: Logstash 处理事件的时间戳,通常是 Logstash 解析日志并将其写入 Elasticsearch 的时间。 -
event
: 包含原始日志消息的完整文本,通常用于保持日志的原始格式。 -
host
: 提供了有关 Logstash 运行的主机的信息。name
: 主机名,显示 Logstash 实例所在的机器名。
这个日志条目展示了从日志文件中提取的数据,以及 Logstash 对其进行解析和处理后的结构化数据。
要特别注意日志偏移的设置,这个相当于是logstash的断点续传。
在实际生产环境中,sincedb_path
选项是用于 Logstash 跟踪文件读取进度的机制,默认情况下它不会设置为 /dev/null
。下面解释一下它的常用场景和配置方式:
1. sincedb
是什么?
sincedb
文件:用于 Logstash 记录输入插件(如file
)读取文件的当前位置。每次 Logstash 读取文件时,它会更新sincedb
文件,以便在 Logstash 重启或系统重启时能够从上次停止的地方继续读取,而不是从头开始。- 位置:默认情况下,
sincedb
文件存储在用户的主目录下,例如:- Linux:
~/.sincedb_*
- Windows:
C:\Users\Username\.sincedb_*
- Linux:
每个 sincedb
文件会跟踪一个特定日志文件的 inode 信息及读取进度。
2. 实际场景下 sincedb_path
的使用
-
正常生产环境:
-
典型配置:你通常会为
sincedb_path
指定一个具体的文件路径,确保 Logstash 在重启时能够继续处理文件。例如:sincedb_path => "/var/lib/logstash/sincedb"
在这个例子中,
sincedb
文件会存储在/var/lib/logstash/
目录下,确保 Logstash 有足够的权限去读取和写入该文件。
-
-
文件路径管理:如果你有多个不同的日志文件输入,可以为每个文件输入指定不同的
sincedb_path
,以避免冲突。例如:input { file { path => "/var/log/app1.log" sincedb_path => "/var/lib/logstash/sincedb_app1" } file { path => "/var/log/app2.log" sincedb_path => "/var/lib/logstash/sincedb_app2" } }
3. sincedb_path => "/dev/null"
在实际中的用途
在某些特殊场景下,你可能会临时使用 /dev/null
,但不建议在生产环境中使用。
使用 /dev/null
的情况:
-
调试/开发阶段:
-
当你在开发或调试 Logstash 配置时,你可能希望每次启动 Logstash 时都从头读取日志文件。在这种情况下,你可以临时将
sincedb_path
设置为/dev/null
,这样 Logstash 每次都会忽略之前的进度,从文件的开头开始读取。sincedb_path => "/dev/null"
-
-
短期任务:
- 对于一次性读取文件的任务或临时性的日志分析,你可能不需要记录进度。在这种情况下,使用
/dev/null
也是合理的。
- 对于一次性读取文件的任务或临时性的日志分析,你可能不需要记录进度。在这种情况下,使用
不建议在生产环境中使用的原因:
- 文件读取进度丢失:如果你将
sincedb_path
设置为/dev/null
,Logstash 无法保存文件读取进度。在生产环境中,如果 Logstash 服务重启或系统出现问题,你将丢失已处理文件的位置信息,Logstash 会从头开始读取整个日志文件,这可能会导致重复处理日志。
4. 总结:sincedb_path 在生产环境的最佳实践
-
指定合适的路径:在生产环境中,建议明确指定
sincedb_path
到一个持久存储的路径,通常位于/var/lib/logstash
之类的目录,确保 Logstash 能记录文件读取进度。例如:
sincedb_path => "/var/lib/logstash/sincedb_example"
-
使用
/dev/null
慎重:仅在调试、开发或一次性任务中使用/dev/null
,避免在生产环境中使用,以防日志文件重新读取时产生问题。
整体上,我们写好了程序,当逻辑较为复杂时,或者我们将之作为服务进行长期运行时,容易"失联"。我们并不知道程序/服务出了什么问题,进行定位时需要切到非常细的操作,经常达到让人望而却步的程度。
比较可行的方法是程序将日志追加到文件,然后由其他程序(如logstash)进行读取,解析,转存到es中,供监控和后续分析。
追加到日志是代价比较低,且不会犯错的操作。通过rotate,我们也避免了磁盘满的风险。
日志分为5个级别,我们关注其中四个(忽略Debug):
例如,Info 可以是类心跳的信息,确保程序正常运行,无论是Idle还是处理数据。给到的FeedBack是在常态运行。另一个点就是,提前准备好可以测试其功能的样本数据,隔一段时间调一次,确保无论是空载还是满载都能得到反馈。
Warning 是一些预警,例如磁盘空间不足、内存空间不足、网络带宽不足等。这些随时可能会导致程序崩溃、挂起。
Error 是一些错误,例如数据库连接中断,部分数据逻辑处理错误。
Critical 是致命性错误,例如显卡出问题了,模型无法载入。
日志文件 /var/log/example.log
[2024-09-15 19:08:15] - [INFO] - [part.a] - 系统启动完成
[2024-09-15 19:08:15] - [WARNING] - [part.b] - 磁盘空间不足,剩余空间小于10%
[2024-09-15 19:08:15] - [ERROR] - [part.b] - 无法连接数据库,请检查网络设置
[2024-09-15 19:08:15] - [CRITICAL] - [part.b] - 系统崩溃,立即采取措施
写入vim /etc/logstash/conf.d/debug_logstash.conf
, grok语句解析4个变量:
- 1 时间戳 timestamp
- 2 日志等级:level
- 3 模块名称:module
- 4 消息主体:log_message
input {
file {
path => "/var/log/example.log"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
grok {
match => {
"message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] - \[%{LOGLEVEL:level}\] - \[%{DATA:module}\] - %{GREEDYDATA:log_message}"
}
}
}
output {
stdout { codec => rubydebug }
}
校验语句
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/debug_logstash.conf --config.test_and_exit
重启服务
systemctl restart logstash
观察结果
journalctl -u logstash -f
总体上感觉grok解析还是有点麻烦,尽量简单点好了;倒是JSON解析可能更适合我,但是显然效率会稍微低一点。
篇幅太长了,再写一篇续吧。
本篇:
- 1 介绍了问题的由来,现状(api)。
- 2 完成了设计思路,以及一些对应组件的validate
- 1 logging : python的logging 和 logstash配合
- 2 graph: 使用 networkx 来进行BFS计算,规则之间可以按照nx的方式定义依赖
- 3 uuid: 使用 uuid 来表示会话
- 4 kafka: 回顾kafka的搭建,使用kafka agent进行数据提交保存
下篇:
- 1 重构新的规则对象
- 核心功能:允许灵活的定义规则(graph)
- 2 将本次的日志、会话(uuid及保存)实现
- 3 梳理未来规则分类与合并的思路