graylog+sidecar通过docker-compose部署并采集SSH登录日志
文章目录
- 前言
- 一、graylog日志系统数据流向清洗图
- 二、资源准备及部署
- 1.docker-compose部署
- 2.准备docker-compose.yml文件
- 3.安装graylog-sidecar并配置
- 4.给sidecar创建token
- 三、graylog-WEB配置采集SSH日志
- 1.配置Inputs
- 2.创建sidecar采集器
- 3.将页面创建好的sidecar与服务器绑定
- 4.创建indices
- 5.分流并创建流规则
- 6.通过提取器extractor替换source字段的值
- 7.移除日志中无关紧要的日志字段
- 8.最终效果演示
- 8.1.分流效果演示
- 8.2.日志效果展示
- 补充知识点
- 1.使用pipelines从message字段中获取源IP、用户、端口、状态
- 2.使用pipelines规则获取被连接的服务器IP
- 3.通过提取器Extractor获取登录失败的次数
- 4.增加一个字段serviceName,获取对应的服务名称
- 总结
前言
本篇文章以采集ssh登录日志为例,结合docker、docker-compose对graylog+sidecar采集日志系统的搭建部署、日志采集流程、配置方法做一个详细的介绍,相比于ELK,graylog日志平台在国内比较冷门.能用的知识比较少,只有一个官网博客.但是我维护的项目中实际用的是graylog,对比ELK比较轻量且维护性强。话不多说,开始展示.
正则玩的好的话,用graylog会轻松很多的!!
一、graylog日志系统数据流向清洗图
自己理解画的图,若有问题请大佬指正
官网图
二、资源准备及部署
ip | 作用 |
---|---|
192.168.56.160 | graylog-server(4.2)、mongodb(4.2)、es(7.10)、docker(1.19)、docker-compose(1.25) |
192.168.56.162 | graylog-sidecar、被采集日志的机器 |
docker自行部署,不在描述。系统centos7.6
1.docker-compose部署
192.168.56.160
[root@localhost export]# curl -L https://github.com/docker/compose/releases/download/1.25.4/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose
[root@localhost export]# chmod +x /usr/local/bin/docker-compose
2.准备docker-compose.yml文件
192.168.56.160
[root@localhost graylog]# mkdir -p mongo_data
[root@localhost graylog]# mkdir -p es_data
[root@localhost graylog]# mkdir -p graylog_data/config
[root@localhost graylog]# ll
总用量 0
drwxr-xr-x. 2 root root 6 1月 2 14:46 es_data
drwxr-xr-x. 3 root root 20 1月 2 14:47 graylog_data
drwxr-xr-x. 2 root root 6 1月 2 14:46 mongo_data
下载相关文件并调整conf文件配置
[root@localhost graylog]# wget https://raw.githubusercontent.com/Graylog2/graylog-docker/4.2/config/graylog.conf
[root@localhost graylog]# wget https://raw.githubusercontent.com/Graylog2/graylog-docker/4.2/config/log4j2.xml
#设置高亮和时区
[root@localhost graylog]# vim graylog.conf
修改 root_timezone = PRC
修改 allow_highlighting = true
此处的yml是基于官网提供的yml文件上进行了简单修改,只是用于做测试
version: '3'
services:
mongodb:
image: swr.cn-north-4.myhuaweicloud.com/ddn-k8s/docker.io/mongo:4.2
volumes:
- /export/graylog/mongo_data:/data/db
- /etc/localtime:/etc/localtime:ro
- /usr/share/zoneinfo/Asia/Shanghai:/etc/timezone:ro
elasticsearch:
image: swr.cn-north-4.myhuaweicloud.com/ddn-k8s/docker.io/elasticsearch:7.10.1
volumes:
- /export/graylog/es_data:/usr/share/elasticsearch/data
- /etc/localtime:/etc/localtime:ro
- /usr/share/zoneinfo/Asia/Shanghai:/etc/timezone:ro
environment:
- http.host=0.0.0.0
- transport.host=localhost
- network.host=0.0.0.0
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- TZ=Asia/Shanghai
ulimits:
memlock:
soft: -1
hard: -1
# Graylog: https://hub.docker.com/r/graylog/graylog/
graylog:
image: graylog/graylog:4.2
volumes:
- /export/graylog/graylog_data:/usr/share/graylog/data
- /etc/localtime:/etc/localtime:ro
- /usr/share/zoneinfo/Asia/Shanghai:/etc/timezone:ro
environment:
# CHANGE ME (must be at least 16 characters)!
- GRAYLOG_PASSWORD_SECRET=somepasswordpepper
- GRAYLOG_ROOT_PASSWORD_SHA2=8c6976e5b5410415bde908bd4dee15dfb167a9c873fc4bb8a81f6f2ab448a918
- GRAYLOG_HTTP_EXTERNAL_URI=http://127.0.0.1:9000/
- TZ=Asia/Shanghai
entrypoint: /usr/bin/tini -- wait-for-it elasticsearch:9200 -- /docker-entrypoint.sh
links:
- mongodb:mongo
- elasticsearch
restart: always
depends_on:
- mongodb
- elasticsearch
ports:
# Graylog web interface and REST API
- 9000:9000
# Syslog TCP
- 1514:1514
# Syslog UDP
- 1514:1514/udp
# GELF TCP
- 12201:12201
# GELF UDP
- 12201:12201/udp
# Beats
- 5044:5044
启动yml文件,并验证graylog是否能成功访问
[root@localhost graylog]# docker-compose up -d
至此,192.168.56.160机器所需要的资源准备完成,并成功启动graylog
3.安装graylog-sidecar并配置
graylog-server与sidecar依赖的版本
https://go2docs.graylog.org/current/getting_in_log_data/install_sidecar_on_linux.htm?tocpath=Get%20in%20Logs%7CGraylog%20Sidecar%7CInstall%20Graylog%20Sidecar%7C_____1
Sidecar Version | Graylog Server Version |
---|---|
1.5.x | 5.2.x or higher |
1.4.x | 5.0.x or higher |
1.3.x | 5.0.x |
1.2.x | 3.2.5 or higher |
1.1.x | 3.2.5 or higher |
安装graylog-sidecar的脚本
#!/bin/bash
# desc: 用于自动部署graylog agent 并配置cgroup 资源限制
#参数定义
date=`date +"%Y-%m-%d-%H:%M:%S"`
centosVersion=$(awk '{print $(NF-1)}' /etc/redhat-release)
#日志相关
echo "graylog agent 自动部署 当前时间:$date,当前系统:$centosVersion"
# 在/tmp 执行
cd /tmp/
# 下载包 如果没有包则下载,有包则忽略此步骤
#wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.13.3-x86_64.rpm
#wget https://github.com/Graylog2/collector-sidecar/releases/download/1.1.0/graylog-sidecar-1.1.0-1.x86_64.rpm
# 安装包
rpm -ivh filebeat-7.13.3-x86_64.rpm
rpm -ivh graylog-sidecar-1.1.0-1.x86_64.rpm
# 配置
graylog-sidecar -service install
cat>/usr/bin/filebeat_cgroup.sh<<EOF
mkdir -p /sys/fs/cgroup/memory/system.slice/graylog-sidecar.service
mkdir -p /sys/fs/cgroup/cpu/system.slice/graylog-sidecar.service
echo 9000 > /sys/fs/cgroup/cpu/system.slice/graylog-sidecar.service/cpu.cfs_quota_us
echo 100000 > /sys/fs/cgroup/cpu/system.slice/graylog-sidecar.service/cpu.cfs_period_us
echo 0 > /sys/fs/cgroup/memory/system.slice/graylog-sidecar.service/memory.swappiness
echo 0 > /sys/fs/cgroup/memory/system.slice/graylog-sidecar.service/memory.oom_control
echo 104857600 > /sys/fs/cgroup/memory/system.slice/graylog-sidecar.service/memory.limit_in_bytes
EOF
cat>/etc/systemd/system/graylog-sidecar.service<<EOF
[Unit]
Description=Wrapper service for Graylog controlled collector
ConditionFileIsExecutable=/usr/bin/graylog-sidecar
[Service]
StartLimitInterval=5
StartLimitBurst=10
ExecStartPre=/bin/bash /usr/bin/filebeat_cgroup.sh
ExecStart=/usr/bin/graylog-sidecar
Restart=always
RestartSec=120
EnvironmentFile=-/etc/sysconfig/graylog-sidecar
[Install]
WantedBy=multi-user.target
EOF
cat>/etc/graylog/sidecar/sidecar.yml<<EOF
server_url: "http://192.168.56.160:9000/api/" #修改
server_api_token: "1tb0974q9eothfnsrrlmu710re77d2u3i14qm6thubim67cqlmuu" #修改
node_name: "$ip"
send_status: true
EOF
node_ip=`ifconfig | grep 'inet' | grep -v '127.0.0.1' |cut -d: -f2|awk '{ print $2}'`
sed -i "s/^node_name.*$/node_name: \"$node_ip\"/" /etc/graylog/sidecar/sidecar.yml
chmod a+x /usr/bin/filebeat_cgroup.sh
systemctl daemon-reload
systemctl enable graylog-sidecar
systemctl restart graylog-sidecar
systemctl status graylog-sidecar
echo "`date +"%Y-%m-%d-%H:%M:%S"` graylog agent 安装完成..."
4.给sidecar创建token
修改sidecar.yml文件,并重启
[root@localhost sidecar]# vim sidecar.yml
server_url: "http://192.168.56.160:9000/api/"
server_api_token: "1oce920dm3rd5jhuibee80b5ssnquok79cg8vds5arjlrglti66v" #修改这个
node_name: "192.168.56.162"
send_status: true
[root@localhost sidecar]# systemctl restart graylog-sidecar
[root@localhost sidecar]# systemctl status graylog-sidecar
三、graylog-WEB配置采集SSH日志
背景:
1、服务器的/var/log/secure这个文件记录了通过ssh协议连接服务器的信息(成功、失败、来源IP、用户等)
2、采集这个日志到graylog中展示
1.配置Inputs
如下图(示例):
2.创建sidecar采集器
如下图(示例):
上述配置字段含义解释
# Needed for Graylog
fields_under_root: true #自定义字段将直接位于输出文档的最外层,而不是嵌套在 fields 子字典中。
fields.collector_node_id: ${sidecar.nodeName} #当 Graylog 集群中有多个 Collector 节点时,这个字段可以用来追踪每个日志事件的来源。
fields.gl2_source_collector: ${sidecar.nodeId} #指定消息的来源采集器,其值为 Sidecar 节点 ID
fields.localIp: ${sidecar.nodeName} #指定消息的本地 IP 地址,其值为 Sidecar节点名称
fields.inputType: agent #指定消息的输入类型,其值为“agent”,表示该消息是来自 Agent 的日志数据。可以自定义
filebeat.registry.flush: 60s #指定 Filebeat 注册表的刷新时间
filebeat.shutdown_timeout: 10s #指定 Filebeat 的关闭超时时间
max_procs: 2 #指定 Filebeat 的最大进程数,默认值为逻辑 CPU 数量的两倍 该参数用于限制 Filebeat 运行时的并发度,防止系统资源被占用过多
multiline.pattern: '^\w{3} [ :0-9]{11} [A-Za-z0-9._-]+' #这个按照日志中的错误需要
multiline.negate: false #设置为false表示不否定多行模式
multiline.match: after #设置为after表示在多行模式下,从当前行的末尾开始匹配
multiline.max_lines: 200 #最大多行数
3.将页面创建好的sidecar与服务器绑定
如下图(示例):
4.创建indices
如下图(示例):
建立新索引 在 System / Sidecar 界面点击 Indices,可以看到默认只有三个索引。
所有推过来的消息全部都挤在在 Default index set 中,一旦滚起来,只会越查越慢。
主要用于将日志存储到es的索引中,环境默认分片的压力
这里开始创建新的索引,点击右上角 Create index set:
Title: 索引的标题
Description: 索引的描述
Index prefer: 这个相当于是索引的名字。这个是关联 ES 里 Index_name 的配置,而且这个值会存档到 MongoDB 以及 ES 中,
建议写个自己能看懂的如果有用到 “-” 号要注意,两个 Index prefer 如果横杠前相同,横杠后不同,也会被报冲突,
比如 aaa-bbb 与 aaa-c1234d 会被报冲突。
Analyzer: 要使用什么分词器。这个是 ES 里的 Analyzer 配置,如果你的 ES 自己建了分析器的话,
比如自定义了词元过滤器(tokenfilter)和分词器(tokenizer)的分析器(Analyzer),
这里输入你的分析器的名字是可以直接调用到的。如果你有其他文字或者任何特殊需求,要用到特别的分析器的话,
这里可以填你的自定义 Analyzer 名。不过默认 Standard 分析器就已经能满足大部分需求了
index shards: 这个涉及到 ES 的分片概念,可以去查看(相关文档)(https://blog.csdn.net/qq_38486203/article/details/80077844)。
index replicas: 索引副本数。默认0。
max. number of segments: 当触发 ES 对 index 的 optimization 时,最大的切割份数。默认是1,这个是做 Index rotation(索引切割)时定义用的参数。
Fields type refresh interval: Fields 列表的变动(比如被前面的 Pipeline 删掉)在多久后会更新到 Index 里,默认是5 ,单位未知,推测秒。
Select rotation strategy: 使用什么方法进行 Index rotation(索引切割)。
索引切割是维护索引易用性和查询速度很重要的一环,以前在给 ES 手动写索引模版的时候这点就很烦人,现在 Graylog 可以全 自动配置了。
有三个可选项,时间(Index time)、字节大小(Index size)、信息统计数量(Index Msg Count)
Select retention stratege: 如何处理过期的索引。Delete index(删除索引)、close index(关闭索引)、Do nothing(不做任何事情)。
max number of indices: 至少保留多少个切割后的数据,也就是存档多少份历史数据。当超过设定的最大索引数量时,会删除旧的索引
Rotation period:
如果保存策略是以时间为单位 这里可以写P1~nD(天数)、P1M(一个月)、PT6H(6小时) 根据实际情况修改
如果保存策略是以大小为单位 1073741824(1GB)、MB 根据实际情况修改
如果保存策略是以数量为单位 默认是20000000个
5.分流并创建流规则
如下图(示例):
上述索引建立好后,开始对stream进行分类。因为刚才创建的索引并没有实质性的分类效果。当日志进来后,还是走默认的stream流All messages。
为什么需要创建新的流?
1、打个比方,在目前的默认情况下只有一个自来水厂(目前只配置了一个 Sidecar),一个水龙头(目前只配置了一个 Input),
一个水管(目前默认只有一个 Stream),两个杯子(目前配置了两个 Index 集,一个是默认的 Default Index Set,
一个是上面我们刚新建的 Index Set),这里的水管就是我们目前提到的 Stream。
2、按照正常的理解思路,水(Log)从一个水龙头(Input)进来后,如果只有一根水管(Stream),你是无论如何也分不到两个杯子(Index)里的,
也就是说我们刚建好的索引实际上完全没用上。如果我们有多个服务,也就是多个杯子,这时候必须要拉新的水管,才能同时给多个杯子里装水。
步骤
1、新建 Stream 点击导航栏的 Stream 选项卡,打开 Stream 列表。这里展示了目前你所拥有的所有 Stream。
如果没错的话,当前默认只有三个 Stream,其中的 All Message 就充当了当前环境下的主水管。
2、既然我们已经有了两个杯子(可能你已经建了多个杯子),那么我们就要接入一个全新的水管。
点击右侧的 Create Stream:
Title: 标题。这个稍后可以改
Description: 描述。稍后可以改
Index Set: 使用哪个 Index ,也就是前面提到的“杯子”,在下拉列表里可以选择自己的 Index,代表这个 Stream 会将所有符合匹配规则(稍后讲到)的数据捕获到自己的数据流里,同时储存到指定的 Index 中,这个稍后也可以改。
[ ] Remove matches from 'All Messages' Stream: 这个参数要注意一下,如果打上勾,那么所有被本 Stream 捕获的日志数据,都会从默认的 All Message 中移除。也就是说,如果不勾,那数据就有双份,因为没有被移除。如果勾选了数据就只有单份。
但是同样默认的 All Message Stream 用的是 Default Index Set,如果你建立了多个 Stream,全部都不勾选这个的话All Message 的 Index 压力就会很大。这个请按照自己的实际情况配置,我建议是勾上。
3、配置与启用
Stream Stream 建立好后,还是无法发挥作用的,因为我们还没有写 Rules,也就是 Stream 无法知道应该抓取什么样的数据,
相当于水龙头里流的是各式各样的混合液,你新接的水管无法从中分离出纯净水,
那么我们要给水管设定判定规则,决定符合什么条件的才算 “纯净水”。
Field: 从哪个 Field 进行判断。这里就要重新提到我们之前在对 Sidecar 进行配置时,那里的“log_file_path”参数的作用了
Type: 匹配规则。默认是“match exactly(全量匹配)”,这里用作分类的话可以选“contain(包含)”选项,其他选项如果有需要可以自己调试
Value: 要匹配 Fields 中的哪个值。可以观察一下我们在对 Sidecar 配置完成后,发进来的日志的详细信息——点击 Search 页面,
再点击右侧的任意一条日志,将其展开,选择对应的值
Inverted: 是否反选。也就是对 Type 进行反向选择,如果选“contain”,还打钩了这个框,那就是“not contain”,其他规则也类似
Description: 描述
快速规则添加完成后,这个 Stream 只要点击 Start Stream,就可以开始根据你设定的 Rule 抓取日志了。
6.通过提取器extractor替换source字段的值
方式一、采用提取器操作,方式二、见最下方的补充,采用pipelines规则实现
提取器定义
定义:
提取器Extractor是一种功能强大的工具,用于从日志消息的内容中提取数据并将其存储为字段。
这些提取器可以自动化地解析和分析日志数据,帮助用户更好地理解和搜索日志。
提取器广泛用于日志数据的预处理,可以提取结构化信息并转换为特定的字段,供后续的搜索、分析或报警使用
提取器的作用
1、从原始日志消息中提取有用的信息,并将其存储为 Graylog 消息的字段(这个字段可以是自定义的新字段或者是替换掉原字段的值)
2、可以用来过滤、重命名、转换或者删除不需要的字段,优化后续的数据处理流程。
3、可以将原始日志中的非结构化数据(如字符串、JSON、XML 等)转换为结构化的字段
提取器类型
Graylog 支持多种类型的提取器,每种提取器适用于不同的日志格式:
Grok 提取器: 最常用的提取器之一,适用于结构化的文本数据。可以使用正则表达式匹配日志中的模式,并将提取的内容分配到字段。
JSON 提取器: 适用于提取 JSON 格式的日志数据。如果日志是 JSON 格式,使用 JSON 提取器可以轻松地提取字段。
XML 提取器: 用于解析 XML 格式的日志数据。
Regex 提取器: 基于正则表达式的提取器,适用于各种日志格式,特别是那些不规则的文本数据。
如下图(示例):
需求: 通过提取器extractor解析message字段中的日志,提取包含的IP地址,将原source字段的localhost.localdomain替换为提取到的IP地址
在此处解释一下配置提取器的参数含义
Extractor type | 提取器表达式类型(选择的是什么就是什么) |
---|---|
source field | 针对哪个字段进行提取(选择的是什么就是什么) |
Regular expression | 正则表达式,可以进行测试看是否能正确获取到值 |
condition | Always try to extract: 这个选项表示无论消息内容如何,都会尝试提取规则。这意味着提取器将始终尝试从日志消息中提取数据,无论消息是否包含预期的模式或字符串。 Only attempt extraction if field contains string: 这个选项表示只有当消息中包含特定的字符串时,提取器才会尝试提取数据。你需要指定一个字符串,如果消息中包含这个字符串,提取器就会应用提取规则。 Only attempt extraction if field matches regular expression: 这个选项表示只有当消息符合特定的正则表达式时,提取器才会尝试提取数据。需要提供一个正则表达式,如果消息内容与该正则表达式匹配,提取器就会应用提取规则 |
store as field | 将获取到的值用哪个字段接收(可以用原字段(覆盖原值),也可以自定义字段(会新增)) |
Extraction strategy | Copy(复制): 当选择"copy"策略时,提取器会从原始消息中提取出指定的数据,并将其复制到一个新的字段中。原始消息保持不变,新的字段包含了提取出的数据。这种方法不会改变原始消息的内容。 Cut(剪切): 当选择"cut"策略时,提取器会从原始消息中提取出指定的数据,并将其剪切到一个新的字段中。这意味着原始消息中提取出的部分将被移除,新的字段包含了提取出的数据。这种方法会改变原始消息的内容,因为它移除了被提取的部分。 |
Extractor title | 提取器名称 |
Add Converter | 选择一个转换器,该转换器用于将提取出的数据转换为不同的格式或类型。 |
最终替换效果展示
7.移除日志中无关紧要的日志字段
需求: 如下图示例
创建pipelines步骤:
1、新建 Pipeline 点击 System 选项卡,找到“Pipeline”。点击“Add new pipeline”:Title: Pipeline 的标题
2、Description: 描述。稍后可以改。创建后,点击右上角的 “Manage rules” ,点击 “Create Rule”,创建一条新的 Pipeline 规则
3、Rule source: 规则内容
上述配置如下所示
rule "funciton RemoveFields"
when
has_field("beats_type")
then
remove_field("beats_type");
remove_field("@metadata_beat");
remove_field("@metadata_type");
remove_field("@metadata_version");
remove_field("agent_ephemeral_id");
remove_field("@timestamp");
remove_field("agent_id");
remove_field("agent_type");
remove_field("agent_version");
remove_field("ecs_version");
remove_field("input_type");
remove_field("log_offset");
remove_field("host_name");
remove_field("log_flags");
remove_field("agent_name");
remove_field("agent_hostname");
end
绑定pipeline到对应的流中
注意事项: 此处建议也把默认的All messages流也选择上,刚开始我测试时没有选择它,发现创建的剔除无关字段的规则并没有再自定义流中生效,选择后就生效了
8.最终效果演示
8.1.分流效果演示
8.2.日志效果展示
补充知识点
1.使用pipelines从message字段中获取源IP、用户、端口、状态
message字段如下所示
需求:使用pipelines从message字段中获取源IP、用户、端口、状态
在pipelines中创建规则rules
查看结果: 完美的实现了从message中获取想要的字段信息
2.使用pipelines规则获取被连接的服务器IP
实际上这个值直接从sidecar.yml中的node_name中可以获取到,但是此处演示使用pipeline规则获取
从控制台查看gl2_remote_ip地址
查看结果
3.通过提取器Extractor获取登录失败的次数
步骤如下
验证结果如下
4.增加一个字段serviceName,获取对应的服务名称
需求:如果日志保存在宿主机中且以/xxx/xxx/logs/opsService/opsService.log 形式保存的日志,那么在graylog中增加一个字段,用于获取到该日志对应的微服务名称。此处即为serviceName: opsService
演示:以/var/log/secure日志文件为主,最终获取到serviceName:secure
验证结果如下
总结
从采用docker-compose部署组件到agent的安装调试,实现了一个简单的从0到1的graylog+sidecar的日志系统,涉及到的难点就是pipelines、提取器、正则表达式三大部分,针对不同的服务日志如何采用以上三部分取出日志中的重点信息并通过规则之间的相互配合,把日志的重点呈现给后端开发人员。当然,graylog不只有这一个功能,还具备根据日志关键字实现日志告警,后续有空为大家更新相关文章。