当前位置: 首页 > article >正文

基于 K8S kubernetes 的常见日志收集方案

目录

1、日志对我们来说到底重不重要?

2、常见的日志收集方案

2.1 EFK

2.2 ELK Stack

2.3 ELK+filebeat

2.4 其他方案

2、elasticsearch组件介绍

3、filebeat组件介绍

3.1 filebeat和beat关系

3.2 filebeat是什么?

3.3 Filebeat工作原理

3.4 传输方案

4、logstash组件介绍

4.1 Logstash工作原理

5、fluentd组件介绍

6、fluentd、filebeat、logstash对比分析

7、安装elasticsearch组件

7.1 创建名称空间

7.2 查看kube-logging名称空间是否创建成功

7.3 安装elasticsearch组件

7.3.1 创建headless service服务

7.3.2 通过statefulset创建elasticsearch集群

7.3.2.1 创建Storageclass,实现存储类动态供给

7.3.2.2 安装elasticsearch集群

8、安装kibana可视化UI界面

9、安装fluentd组件

10、测试收集pod容器日志

11、基于EFK+logstash+kafka构建高吞吐量的日志收集平台

1、部署fluentd

2、接入kafka

3、配置logstash

4、启动logstash


文档中的YAML文件配置直接复制粘贴可能存在格式错误,故实验中所需要的YAML文件以及本地包均打包至网盘

链接:https://pan.baidu.com/s/15Ryaoa0_9ABQElLw9y28DA 
提取码:xdbm 

实验环境:

EFK安装在k8s集群,k8s环境如下:

k8s集群:        k8s的控制节点

ip:192.168.40.180

主机名:xianchaomaster1

配置:6vCPU/6Gi内存

                    

k8s的工作节点:

ip:192.168.40.181

主机名:xianchaonode1

配置:6vCPU/8Gi内存

1、日志对我们来说到底重不重要?

在生产环境或者测试环境,如果某个服务或者业务组件出现问题,如何定位和排查?需要靠日志,日志是定位问题的重要手段,就像办案人员要根据现场留下的线索推断案情一样。

监控、日志:企业必须具备的

日志打印的常见级别:

日志打印通常有四种级别,从高到底分别是:ERROR、WARN、INFO、DEBUG。应该选用哪种级别是很重要的问题。

日志级别中的优先级是什么意思在你的系统中如果开启了某一级别的日志后,就不会打印比它级别低的日志。例如,程序如果开启了INFO级别日志,DEBUG日志就不会打印通常在生产环境中开启INFO日志。

1、DEBUG:

DEBU可以打印出最详细的日志信息,主要用于开发过程中打印一些运行信息。

2、INFO:

INFO可以打印一些你感兴趣的或者重要的信息,这个可以用于生产环境中输出程序运行的一些重要信息,但是不能滥用,避免打印过多的日志。

3、WARNING:

WARNING 表明发生了一些暂时不影响运行的错误,会出现潜在错误的情形,有些信息不是错误信息,但是也要给程序员的一些提示 

4、ERROR:

ERROR 可以打印错误和异常信息,如果不想输出太多的日志,可以使用这个级别这一级就是比较重要的错误了,软件的某些功能已经不能继续执行了。

那么应该打印什么级别的日志呢?先我们应该明确谁在看日志。

通常来说,系统出了问题客户不会进到系统对着控制台查看日志输出,所以日志所面对的主体对象必然是软件开发人员(包括测试测试、维护人员)。

下面我们假设几种场景来帮助我们理解日志级别

程序开发结束后交由给测试人员进行测试,测试人员根据测试用例发现某个用例的输出和预期不符,此时他的一反应该是查看日志。此时的日志是INFO级别日志不会出现DEBUG级别的日志,现在就需要根据日志打印分为两种情况决定他下一步操作:

通过查看INFO日志发现是由于自己操作失误,造成了程序结果和预期不符合,这种情况不是程序出错,所以并不是bug,不需要开发人员到场。

通过查看INFO日志发现自己的操作正确,根据INFO日志查看并不是操作失误造成,这个时候就需要开发人员到场确认。

以上两种情况是理想情况,测试人员仅根据INFO级别的日志就能判断出程序的输出结果与预期不符是因为自己操作失误还是程序bug。更为现实的情况实际是测试人员并不能根据INFO级别的日志判断是否是自己失误还是程序bug。

综上,INFO级别的日志应该是能帮助测试人员判断这是否是一个真正的bug,而不是自己操作失误造成的。假设测试人员现在已经初步判断这是一个bug,并且这个bug不那么明显,此时就需要开发人员到场确认。开发人员到达现场后,一步应该是查看INFO日志初步作判断验证测试人员的看法,接着如果不能判断出问题所在则应该是将日志级别调整至DEBUG级别,打印出DEBUG级别的日志,通过DEBUG日志来分析定位bug出在哪里。所以,DEBUG级别的日志应该是能帮助开发人员分析定位bug所在的位置。

ERRORWARN的级别都比INFO要高,所以在设定日志级别在INFO时,这两者的日志也会被打印。根据上面INFO和DEBUG级别的区别以及适用人员可以知道,ERROR和WARN是同时给测试和开发、运维观察的。WARN级别称之为“警告”,这个“警告”实际上就有点含糊了,它不算错,你可以选择忽视它,但也可以选择重视它。例如,现在一个WARN日志打出这么一条日志“系统有崩溃的风险”,这个时候就需要引起足够的重视,它代表现在不会崩溃,但是它有崩溃的风险。或者出现“某用户在短时间内将密码输出很多次过后才进入了系统”,这个时候是不是系统被暴力破解了呢?等等,这个级别日志如同它的字面含义,给你一个警告,你可以选择忽视,也可以重视,但至少它现在不会给系统带来其他影响。

 ERROR级别称之为“错误”,这个含义就更明显了,就是系统出现了错误,需要处理。为常见的就是捕获异常时所打印的日志。

2、常见的日志收集方案

常见的进行日志分析的方法:直接在日志文件中grep、awk就可以获得自己想要的信息。但在规模较大的场景中,此方法效率低下,面临问题包括日志量太大如何归档、文本搜索太慢怎么办、如何多维度查询。需要集中化的日志管理,把所有服务器上的日志收集汇总。常见解决思路是建立集中式日志收集系统,将所有节点上的日志统一收集,管理,访问。

大型系统是一个分布式部署的架构,不同的服务模块部署在不同的服务器上,问题出现时,大部分情况需要根据问题暴露的关键信息,定位到具体的服务器和服务模块,构建一套集中式日志系统,可以提高定位问题的效率。

对大量的日志业务数据进行分析,如平台PV、UV、IP、PageTOP等维度进行分析查询等。另外安全审计、数据挖掘、行为分析等都少不了日志对其作为支撑。

2.1 EFK

Kubernetes集群上运行多个服务和应用程序时,日志收集系统可以帮助快速分类和分析Pod生成的大量日志数据。Kubernetes中比较流行的日志收集解决方案是Elasticsearch、Fluentd和Kibana(EFK)技术栈,也是官方推荐的一种方案。

Elasticsearch是一个实时的,分布式的,可扩展的搜索引擎,它允许进行全文本和结构化搜索以及对日志进行分析。它通常用于索引和搜索大量日志数据,也可以用于搜索许多不同种类的文档。

Elasticsearch通常与Kibana一起部署,kibana可以把Elasticsearch采集到的数据通过dashboard(仪表板)可视化展示出来。Kibana允许通过Web界面浏览Elasticsearch日志数据,也可自定义查询条件快速检索出elasticccsearch中的日志数据

Fluentd是一个流行的开源数据收集器,我们在 Kubernetes 集群节点上安装 Fluentd,通过获取容器日志文件、过滤和转换日志数据,然后将数据传递到 Elasticsearch 集群,在该集群中对其进行索引和存储。

2.2 ELK Stack

E - Elasticsearch(简称:ES)

L - Logstash

K - Kibana

Elasticsearch:日志存储和搜索引擎,它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。

Logstash:是一个完全开源的工具,他可以对你的日志进行收集、过滤,并将其存储供以后使用(支持动态的从各种数据源搜集数据,并对数据进行过滤、分析、丰富、统一格式等操作。)。

Kibana 也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助您汇总、分析和搜索重要数据日志。

流程

应用程序(AppServer)–>Logstash–>ElasticSearch–>Kibana–>浏览器(Browser):

Logstash收集AppServer产生的Log,并存放到ElasticSearch集群中,而Kibana则从ElasticSearch集群中查询数据生成图表,再返回给Browser。

考虑到聚合端(日志处理、清洗等)负载问题和采集端传输效率,一般在日志量比较大的时候在采集端和聚合端增加队列,以用来实现日志消峰。

2.3 ELK+filebeat

Filebeat(采集)—> Logstash(聚合、处理)—> ElasticSearch (存储)—>Kibana (展示)

2.4 其他方案

ELK日志流程可以有多种方案(不同组件可自由组合,根据自身业务配置),常见有以下:

    Logstash(采集、处理)—> ElasticSearch (存储)—>Kibana (展示)

    Logstash(采集)—> Logstash(聚合、处理)—> ElasticSearch (存储)—>Kibana (展示)

    Filebeat(采集、处理)—> ElasticSearch (存储)—>Kibana (展示)

    Filebeat(采集)—> Logstash(聚合、处理)—> ElasticSearch (存储)—>Kibana (展示)

Filebeat(采集)—> Kafka/Redis(消峰) —> Logstash(聚合、处理)—> ElasticSearch (存储)—>Kibana (展示)

2、elasticsearch组件介绍

Elasticsearch 是一个分布式的免费开源搜索和分析引擎,适用于包括文本、数字、地理空间、结构化和非结构化数据等在内的所有类型的数据。Elasticsearch 在 Apache Lucene 的基础上开发而成,由 Elasticsearch N.V.(即现在的 Elastic)于 2010 年首次发布。Elasticsearch 以其简单的 REST 风格 API、分布式特性、速度和可扩展性而闻名,是 Elastic Stack 的核心组件;Elastic Stack 是一套适用于数据采集、扩充、存储、分析和可视化的免费开源工具。人们通常将 Elastic Stack 称为 ELK Stack(代指 Elasticsearch、Logstash 和 Kibana),目前 Elastic Stack 包括一系列丰富的轻量型数据采集代理,这些代理统称为 Beats,可用来向 Elasticsearch 发送数据。

3、filebeat组件介绍

3.1 filebeat和beat关系

filebeat是Beats中的一员。
Beats是一个轻量级日志采集器,Beats家族有6个成员,早期的ELK架构中使用Logstash收集、解析日志,但是Logstash对内存、cpu、io等资源消耗比较高。相比Logstash,Beats所占系统的CPU和内存几乎可以忽略不计。


目前Beats包含六种工具:

1Packetbeat:网络数据(收集网络流量数据)

2Metricbeat:指标(收集系统、进程和文件系统级别的CPU和内存使用情况等数据)

3Filebeat:日志文件(收集文件数据)

4Winlogbeat:windows事件日志(收集Windows事件日志数据)

5Auditbeat:审计数据(收集审计日志)

6Heartbeat:运行时间监控(收集系统运行时的数据)

3.2 filebeat是什么?

Filebeat是用于转发和收集日志数据的轻量级传送工具。Filebeat监视指定的日志文件或位置,收集日志事件,并将它们转发到Elasticsearch或 Logstash

Filebeat的工作方式如下:启动Filebeat时,它将启动一个或多个输入,这些输入将在为日志数据指定的位置中查找。对于Filebeat所找到的每个日志,Filebeat都会启动收集器。每个收集器都读取单个日志以获取新内容,并将新日志数据发送到libbeat,libbeat将聚集事件,并将聚集的数据发送到为Filebeat配置的输出。

   工作的流程图如下:

Filebeat 有两个主要组件:

    harvester:一个harvester负责读取一个单个文件的内容。harvester逐行读取每个文件,并把这些内容发送到输出。每个文件启动一个harvester。

    Input:一个input负责管理harvesters,并找到所有要读取的源。如果input类型是log,则input查找驱动器上与已定义的log日志路径匹配的所有文件,并为每个文件启动一个harvester。

3.3 Filebeat工作原理

在任何环境下,应用程序都有停机的可能性。 Filebeat 读取并转发日志行,如果中断,则会记住所有事件恢复联机状态时所在位置。

Filebeat带有内部模块(auditd,Apache,Nginx,System和MySQL),可通过一个指定命令来简化通用日志格式的收集,解析和可视化。

FileBeat 不会让你的管道超负荷。FileBeat 如果是向 Logstash 传输数据,当 Logstash 忙于处理数据,会通知 FileBeat 放慢读取速度。一旦拥塞得到解决,FileBeat将恢复到原来的速度并继续传播。

Filebeat保持每个文件的状态,并经常刷新注册表文件中的磁盘状态。状态用于记住harvester正在读取的最后偏移量,并确保发送所有日志行。Filebeat将每个事件的传递状态存储在注册表文件中。所以它能保证事件至少传递一次到配置的输出,没有数据丢失。

3.4 传输方案

1、output.elasticsearch

如果你希望使用 filebeat 直接向 elasticsearch 输出数据,需要配置 output.elasticsearch

output.elasticsearch:

  hosts: ["192.168.40.180:9200"]

2、output.logstash

如果使用filebeat向 logstash输出数据,然后由 logstash 再向elasticsearch 输出数据,需要配置 output.logstash。 logstash 和 filebeat 一起工作时,如果 logstash 忙于处理数据,会通知FileBeat放慢读取速度。一旦拥塞得到解决,FileBeat 将恢复到原来的速度并继续传播。这样,可以减少管道超负荷的情况。

output.logstash:

  hosts: ["192.168.40.180:5044"]  

3、output.kafka

如果使用filebeat向kafka输出数据,然后由 logstash 作为消费者拉取kafka中的日志,并再向elasticsearch 输出数据,需要配置 output.logstash

output.kafka:

  enabled: true

  hosts: ["192.168.40.180:9092"]

  topic: elfk8stest

4、logstash组件介绍

Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。Logstash 是一个应用程序日志、事件的传输、处理、管理和搜索的平台。你可以用它来统一对应用程序日志进行收集管理,提供 Web 接口用于查询和统计。

输入:采集各种样式、大小和来源的数据

数据往往以各种各样的形式,或分散或集中地存在于很多系统中。Logstash 支持各种输入选择 ,可以在同一时间从众多常用来源捕捉事件。能够以连续的流式传输方式,轻松地从你的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。

过滤器:实时解析和转换数据

数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。

Logstash 能够动态地转换和解析数据,不受格式或复杂度的影响:

1、利用 Grok 从非结构化数据中派生出结构

2、从 IP 地址破译出地理坐标

3、将 PII 数据匿名化,完全排除敏感字段

4、整体处理不受数据源、格式或架构的影响

输出:选择你的存储,导出你的数据

尽管 Elasticsearch 是我们的首选输出方向,能够为我们的搜索和分析带来无限可能,但它并非唯一选择。Logstash 提供众多输出选择,你可以将数据发送到你要指定的地方。 

4.1 Logstash工作原理

Logstash 有两个必要元素:input 和 output ,一个可选元素:filter。 这三个元素,分别代表 Logstash 事件处理的三个阶段:输入 > 过滤器 > 输出

Input负责从数据源采集数据。

filter 将数据修改为你指定的格式或内容。

output 将数据传输到目的地。

在实际应用场景中,通常输入、输出、过滤器不止一个。Logstash 的这三个元素都使用插件式管理方式,可以根据应用需要,灵活的选用各阶段需要的插件,并组合使用。

常用input模块

    Logstash 支持各种输入选择 ,可以在同一时间从众多常用来源捕捉事件。能够以连续的流式传输方式,可从日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。

    file:从文件系统上的文件读取

    syslog:在众所周知的端口514上侦听系统日志消息,并根据RFC3164格式进行解析

    redis:从redis服务器读取,使用redis通道和redis列表。 Redis经常用作集中式Logstash安装中的“代理”,它将接收来自远程Logstash“托运人”的Logstash事件排队。

    beats:处理由Filebeat发送的事件。

常用的filter模块

    过滤器是Logstash管道中的中间处理设备。可以将条件过滤器组合在一起,对事件执行操作。

    grok:解析和结构任意文本。 Grok目前是Logstash中将非结构化日志数据解析为结构化和可查询的最佳方法。

    mutate:对事件字段执行一般转换。可以重命名,删除,替换和修改事件中的字段。

    drop:完全放弃一个事件,例如调试事件。

    clone:制作一个事件的副本,可能会添加或删除字段。

    geoip:添加有关IP地址的地理位置的信息

常用output

    elasticsearch:将事件数据发送给 Elasticsearch(推荐模式)。

    file:将事件数据写入文件或磁盘。

    graphite:将事件数据发送给 graphite(一个流行的开源工具,存储和绘制指标, http://graphite.readthedocs.io/en/latest/)。

    statsd:将事件数据发送到 statsd (这是一种侦听统计数据的服务,如计数器和定时器,通过UDP发送并将聚合发送到一个或多个可插入的后端服务)。

常用code插件

    json:以JSON格式对数据进行编码或解码。

    multiline:将多行文本事件(如java异常和堆栈跟踪消息)合并为单个事件。

input {

      kafka {

        bootstrap_servers => "192.168.40.180:9092"

        auto_offset_reset => "latest"

        consumer_threads => 5

        decorate_events => true

        topics => ["elktest"]

      }

}

          

output {

    elasticsearch {

        hosts => ["192.168.40.180:9200"]

        index => "elkk8stest-%{+YYYY.MM.dd}"

        }

}        

5、fluentd组件介绍

fluentd是一个针对日志的收集、处理、转发系统。通过丰富的插件系统,可以收集来自于各种系统或应用的日志,转化为用户指定的格式后,转发到用户所指定的日志存储系统之中。

fluentd 常常被拿来和Logstash比较,我们常说ELK,L就是这个agent。fluentd 是随着Docker和es一起流行起来的agent。

  • fluentd 比 logstash 更省资源;
  • 更轻量级的 fluent-bid 对应 filebeat,作为部署在结点上的日志收集器;
  • fluentd 有更多强大、开放的插件数量和社区。插件多,也非常灵活,规则也不复杂。

6、fluentd、filebeat、logstash对比分析

常见的日志采集工具有Logstash、Filebeat、Fluentd、Logagent、rsyslog等等,那么他们之间有什么区别呢?什么情况下我们应该用哪一种工具?

Logstash

Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。

优势

Logstash 主要的点就是它的灵活性,主要因为它有很多插件,详细的文档以及直白的配置格式让它可以在多种场景下应用。我们基本上可以在网上找到很多资源,几乎可以处理任何问题。

劣势

Logstash 致命的问题是它的性能以及资源消耗(默认的堆大小是 1GB)。尽管它的性能在近几年已经有很大提升,与它的替代者们相比还是要慢很多的。这里有 Logstash 与 rsyslog 性能对比以及Logstash 与 filebeat 的性能对比。它在大数据量的情况下会是个问题。

另一个问题是它目前不支持缓存,目前的典型替代方案是将 Redis 或 Kafka 作为中心缓冲池:

典型应用场景

因为 Logstash 自身的灵活性以及网络上丰富的资料,Logstash 适用于原型验证阶段使用,或者解析非常的复杂的时候。在不考虑服务器资源的情况下,如果服务器的性能足够好,我们也可以为每台服务器安装 Logstash 。我们也不需要使用缓冲,因为文件自身就有缓冲的行为,而 Logstash 也会记住上次处理的位置。

如果服务器性能较差,并不推荐为每个服务器安装 Logstash ,这样就需要一个轻量的日志传输工具将数据从服务器端经由一个或多个 Logstash 中心服务器传输到 Elasticsearch

随着日志项目的推进,可能会因为性能或代价的问题,需要调整日志传输的方式(log shipper)。当判断 Logstash 的性能是否足够好时,重要的是对吞吐量的需求有着准确的估计,这也决定了需要为 Logstash 投入多少硬件资源。

Filebeat

作为 Beats 家族的一员,Filebeat 是一个轻量级的日志传输工具,它的存在正弥补了 Logstash 的缺点:Filebeat 作为一个轻量级的日志传输工具可以将日志推送到中心 Logstash。

在版本 5.x 中,Elasticsearch 具有解析的能力(像 Logstash 过滤器)— Ingest。这也就意味着可以将数据直接用 Filebeat 推送到 Elasticsearch,并让 Elasticsearch 既做解析的事情,又做存储的事情。也不需要使用缓冲,因为 Filebeat 也会和 Logstash 一样记住上次读取的偏移,如果需要缓冲(例如,不希望将日志服务器的文件系统填满),可以使用 Redis/Kafka,因为 Filebeat 可以与它们进行通信。

优势

Filebeat 只是一个二进制文件没有任何依赖。它占用资源极少,尽管它还十分年轻,正式因为它简单,所以几乎没有什么可以出错的地方,所以它的可靠性还是很高的。它也为我们提供了很多可以调节的点,例如:它以何种方式搜索新的文件,以及当文件有一段时间没有发生变化时,何时选择关闭文件句柄。

劣势

Filebeat 的应用范围十分有限,所以在某些场景下我们会碰到问题。例如,如果使用 Logstash 作为下游管道,我们同样会遇到性能问题。正因为如此,Filebeat 的范围在扩大。开始时,它只能将日志发送到 Logstash 和 Elasticsearch,而现在它可以将日志发送给 Kafka 和 Redis,在 5.x 版本中,它还具备过滤的能力。

Fluentd

Fluentd 创建的初衷主要是尽可能的使用 JSON 作为日志输出,所以传输工具及其下游的传输线不需要猜测子字符串里面各个字段的类型。这样,它为几乎所有的语言都提供库,这也意味着,我们可以将它插入到我们自定义的程序中。

优势

和多数 Logstash 插件一样,Fluentd 插件是用 Ruby 语言开发的非常易于编写维护。所以它数量很多,几乎所有的源和目标存储都有插件(各个插件的成熟度也不太一样)。这也意味这我们可以用 Fluentd 来串联所有的东西。

劣势

因为在多数应用场景下,我们会通过 Fluentd 得到结构化的数据,它的灵活性并不好。但是我们仍然可以通过正则表达式,来解析非结构化的数据。尽管,性能在大多数场景下都很好,但它并不是最好的,和 syslog-ng 一样,它的缓冲只存在与输出端,单线程核心以及 Ruby GIL 实现的插件意味着它大的节点下性能是受限的,不过,它的资源消耗在大多数场景下是可以接受的。对于小的或者嵌入式的设备,可能需要看看 Fluent Bit,它和 Fluentd 的关系与 Filebeat 和 Logstash 之间的关系类似。

典型应用场景

Fluentd 在日志的数据源和目标存储各种各样时非常合适,因为它有很多插件。而且,如果大多数数据源都是自定义的应用,所以可以发现用 fluentd 的库要比将日志库与其他传输工具结合起来要容易很多。特别是在我们的应用是多种语言编写的时候,即我们使用了多种日志库,日志的行为也不太一样。

Logagent

Logagent 是 Sematext 提供的传输工具,它用来将日志传输到 Logsene(一个基于 SaaS 平台的 Elasticsearch API),因为 Logsene 会暴露 Elasticsearch API,所以 Logagent 可以很容易将数据推送到 Elasticsearch 。

优势

可以获取 /var/log 下的所有信息,解析各种格式(Elasticsearch,Solr,MongoDB,Apache HTTPD等等),它可以掩盖敏感的数据信息,例如,个人验证信息(PII),出生年月日,信用卡号码,等等。它还可以基于 IP 做 GeoIP 丰富地理位置信息(例如,access logs)。同样,它轻量又快速,可以将其置入任何日志块中。在新的 2.0 版本中,它以第三方 node.js 模块化方式增加了支持对输入输出的处理插件。重要的是 Logagent 有本地缓冲,所以不像 Logstash ,在数据传输目的地不可用时会丢失日志。

劣势

尽管 Logagent 有些比较有意思的功能(例如,接收 Heroku 或 CloudFoundry 日志),但是它并没有 Logstash 灵活。

典型应用场景

Logagent 作为一个可以做所有事情的传输工具是值得选择的(提取、解析、缓冲和传输)。

logtail

阿里云日志服务的生产者,目前在阿里集团内部机器上运行,经过3年多时间的考验,目前为阿里公有云用户提供日志收集服务。

采用C++语言实现,对稳定性、资源控制、管理等下过很大的功夫,性能良好。相比于logstash、fluentd的社区支持,logtail功能较为单一,专注日志收集功能。

优势

logtail占用机器cpu、内存资源最少,结合阿里云日志服务的E2E体验良好。

劣势

logtail目前对特定日志类型解析的支持较弱,后续需要把这一块补起来。

rsyslog

绝大多数 Linux 发布版本默认的 syslog 守护进程,rsyslog 可以做的不仅仅是将日志从 syslog socket 读取并写入 /var/log/messages 。它可以提取文件、解析、缓冲(磁盘和内存)以及将它们传输到多个目的地,包括 Elasticsearch 。可以从此处找到如何处理 Apache 以及系统日志。

优势

rsyslog 是经测试过的最快的传输工具。如果只是将它作为一个简单的 router/shipper 使用,几乎所有的机器都会受带宽的限制,但是它非常擅长处理解析多个规则。它基于语法的模块(mmnormalize)无论规则数目如何增加,它的处理速度始终是线性增长的。这也就意味着,如果当规则在 20-30 条时,如解析 Cisco 日志时,它的性能可以大大超过基于正则式解析的 grok ,达到 100 倍(当然,这也取决于 grok 的实现以及 liblognorm 的版本)。

它同时也是我们能找到的最轻的解析器,当然这也取决于我们配置的缓冲。

劣势

rsyslog 的配置工作需要更大的代价(这里有一些例子),这让两件事情非常困难:

文档难以搜索和阅读,特别是那些对术语比较陌生的开发者。

5.x 以上的版本格式不太一样(它扩展了 syslogd 的配置格式,同时也仍然支持旧的格式),尽管新的格式可以兼容旧格式,但是新的特性(例如,Elasticsearch 的输出)只在新的配置下才有效,然后旧的插件(例如,Postgres 输出)只在旧格式下支持。

尽管在配置稳定的情况下,rsyslog 是可靠的(它自身也提供多种配置方式,最终都可以获得相同的结果),它还是存在一些 bug 。

典型应用场景

rsyslog 适合那些非常轻的应用(应用,小VM,Docker容器)。如果需要在另一个传输工具(例如,Logstash)中进行处理,可以直接通过 TCP 转发 JSON ,或者连接 Kafka/Redis 缓冲。

rsyslog 还适合我们对性能有着非常严格的要求时,特别是在有多个解析规则时。那么这就值得为之投入更多的时间研究它的配置。

重点:Logstash、fluentd、filebeat

实验需要的课件说明:

下文需要的资源清单yaml文件和离线镜像压缩包都在课件里,可上传自己机器上,直接使用即可

7、安装elasticsearch组件

7.1 创建名称空间

安装Elasticsearch集群之前,我们先创建一个名称空间,在这个名称空间下安装日志收工具elasticsearch、fluentd、kibana。我们创建一个kube-logging名称空间,将EFK组件安装到该名称空间中。

[root@xianchaomaster1]# kubectl apply -f kube-logging.yaml

kube-logging.yaml文件内容如下:

kind: Namespace

apiVersion: v1

metadata:

  name: kube-logging

7.2 查看kube-logging名称空间是否创建成功

[root@xianchaomaster1 efk]# kubectl get namespaces | grep kube-logging

#显示如下,说明创建成功:

kube-logging      Active   86s

7.3 安装elasticsearch组件

通过上面步骤已经创建了一个名称空间kube-logging,在这个名称空间下去安装日志收集组件efk,首先,我们需要部署一个3节点的Elasticsearch集群。我们使用3个Elasticsearch Pods可以避免高可用的多节点群集中发生的“脑裂问题。 Elasticsearch脑裂可参考如下:https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-node.html#split-brain

7.3.1 创建headless service服务

[root@xianchaomaster1]# kubectl apply -f elasticsearch_svc.yaml

elasticsearch_svc.yaml文件内容如下:

kind: Service

apiVersion: v1

metadata:

  name: elasticsearch

  namespace: kube-logging

  labels:

    app: elasticsearch

spec:

  selector:

    app: elasticsearch

  clusterIP: None

  ports:

    - port: 9200

      name: rest

    - port: 9300

      name: inter-node

查看elasticsearch的service是否创建成功

[root@xianchaomaster1]# kubectl get services --namespace=kube-logging

看到如下,说明在kube-logging名称空间下创建了一个名字是elasticsearch的headless service:

NAME            TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)             AGE

elasticsearch   ClusterIP   None         <none>        9200/TCP,9300/TCP   2m

现在我们已经为 Pod 设置了无头服务和一个稳定的域名.elasticsearch.kube-logging.svc.cluster.local,接下来我们通过 StatefulSet来创建具体的 Elasticsearch的Pod 应用。

7.3.2 通过statefulset创建elasticsearch集群

7.3.2.1 创建Storageclass,实现存储类动态供给

#安装nfs服务,选择k8s集群的xianchaomaster1节点,k8s集群的xianchaomaster1节点的ip是192.168.40.180

#yum安装nfs

[root@xianchaomaster1 ~]# yum install nfs-utils -y

[root@xianchaonode1 ~]# yum install nfs-utils -y

#启动nfs服务

[root@xianchaomaster1 ~]# systemctl start nfs

[root@xianchaonode1 ~]# systemctl start nfs

#设置nfs开机自启动

[root@xianchaomaster1 ~]# systemctl enable nfs.service

[root@xianchaonode1 ~]# systemctl enable nfs.service

#在xianchaomaster1上创建一个nfs共享目录

[root@xianchaomaster1 ~]# mkdir /data/v1 -p

#编辑/etc/exports文件

  1. [root@xianchaomaster1 ~]# vim /etc/exports

/data/v1 *(rw,no_root_squash)

#加载配置,使配置生效

[root@xianchaomaster1 ~]# exportfs -arv

[root@xianchaomaster1 ~]# systemctl restart nfs

#创建nfs作为存储的供应商

1、创建运行nfs-provisioner需要的sa账号

[root@xianchaomaster1 nfs]# cat serviceaccount.yaml

apiVersion: v1

kind: ServiceAccount

metadata:

  name: nfs-provisioner

[root@xianchaomaster1 nfs]# kubectl apply -f serviceaccount.yaml

serviceaccount/nfs-provisioner created

扩展:什么是sa?

sa的全称是serviceaccount。

serviceaccount是为了方便Pod里面的进程调用Kubernetes API或其他外部服务而设计的

指定了serviceaccount之后,我们把pod创建出来了,我们在使用这个pod时,这个pod就有了我们指定的账户的权限了

2、对sa授权 

[root@xianchaomaster1]# kubectl create clusterrolebinding nfs-provisioner-clusterrolebinding --clusterrole=cluster-admin --serviceaccount=default:nfs-provisioner

#把nfs-subdir-external-provisioner.tar.gz上传到xianchaonode1上,手动解压。

[root@xianchaonode1 ~]# docker load -i nfs-subdir-external-provisioner.tar.gz

#通过deployment创建pod用来运行nfs-provisioner

[root@xianchaomaster1]# kubectl apply -f deployment.yaml

deployment.yaml文件解释说明:

kind: Deployment

apiVersion: apps/v1

metadata:

  name: nfs-provisioner

spec:

  selector:

    matchLabels:

      app: nfs-provisioner

  replicas: 1

  strategy:

    type: Recreate

  template:

    metadata:

      labels:

        app: nfs-provisioner

    spec:

      serviceAccount: nfs-provisioner

      containers:

        - name: nfs-provisioner

          image: registry.cn-beijing.aliyuncs.com/mydlq/nfs-subdir-external-provisioner:v4.0.0

          imagePullPolicy: IfNotPresent

          volumeMounts:

            - name: nfs-client-root

              mountPath: /persistentvolumes

          env:

            - name: PROVISIONER_NAME

              value: example.com/nfs

            - name: NFS_SERVER

              value: 192.168.40.180

#这个需要写nfs服务端所在的ip地址,大家需要写自己安装了nfs服务的机器ip

            - name: NFS_PATH

              value: /data/v1

#这个是nfs服务端共享的目录

      volumes:

        - name: nfs-client-root

          nfs:

            server: 192.168.40.180

            path: /data/v1

#验证nfs是否创建成功

[root@xianchaomaster1]# kubectl get pods | grep nfs

#显示如下说明创建成功:

nfs-provisioner-5975849bb4-92dhq   1/1     Running     3          11h

7.3.2.2 安装elasticsearch集群

#创建stoorageclass

[root@xianchaomaster1]# kubectl apply -f class.yaml

class.yaml文件内容如下:

apiVersion: storage.k8s.io/v1

kind: StorageClass

metadata:

  name: do-block-storage

provisioner: example.com/nfs

注:

provisioner:  example.com/nfs   

#该值需要和nfs provisioner配置的PROVISIONER_NAME处的value值保持一致

#把elasticsearch_7_2_0.tar.gzbusybox.tar.gz

上传到xianchaonode1,手动解压:

[root@xianchaonode1 ~]# docker load -i elasticsearch_7_2_0.tar.gz

[root@xianchaonode1 ~]# docker load -i busybox.tar.gz

#更新资源清单文件

[root@xianchaomaster1]# kubectl apply -f elasticsearch-statefulset.yaml

elasticsearch-statefulset.yaml文件解释说明:

apiVersion: apps/v1

kind: StatefulSet

metadata:

  name: es-cluster

  namespace: kube-logging

spec:

  serviceName: elasticsearch

  replicas: 3

  selector:

    matchLabels:

      app: elasticsearch

  template:

    metadata:

      labels:

        app: elasticsearch

上面内容的解释:在kube-logging的名称空间中定义了一个es-cluster的StatefulSet。然后,我们使用serviceName 字段与我们前创建的headless ElasticSearch服务相关联。这样可以确保可以使用以下DNS地址访问StatefulSet中的每个Pod:,es-cluster-[0,1,2].elasticsearch.kube-logging.svc.cluster.local,其中[0,1,2]与Pod分配的序号数相对应我们指定3replicas(3个Pod副本selector matchLabels 设置为app: elasticseach。该.spec.selector.matchLabels和.spec.template.metadata.labels字段必须匹配。

2)statefulset中定义pod模板,内容如下:

. . .

    spec:

      containers:

      - name: elasticsearch

        image: docker.elastic.co/elasticsearch/elasticsearch:7.2.0

        imagePullPolicy: IfNotPresent

        resources:

            limits:

              cpu: 1000m

            requests:

              cpu: 100m

        ports:

        - containerPort: 9200

          name: rest

          protocol: TCP

        - containerPort: 9300

          name: inter-node

          protocol: TCP

        volumeMounts:

        - name: data

          mountPath: /usr/share/elasticsearch/data

        env:

          - name: cluster.name

            value: k8s-logs

          - name: node.name

            valueFrom:

              fieldRef:

                fieldPath: metadata.name

          - name: discovery.seed_hosts

            value: "es-cluster-0.elasticsearch,es-cluster-1.elasticsearch,es-cluster-2.elasticsearch"

          - name: cluster.initial_master_nodes

            value: "es-cluster-0,es-cluster-1,es-cluster-2"

          - name: ES_JAVA_OPTS

            value: "-Xms512m -Xmx512m"

上面内容解释:在statefulset中定义了pod,容器的名字是elasticsearch,镜像是docker.elastic.co/elasticsearch/elasticsearch:7.2.0。使用resources字段来指定容器至少需要0.1个vCPU,并且容器最多可以使用1个vC​​PU了解有关资源请求和限制,可参考Resource Management for Pods and Containers | Kubernetes

容器暴露了9200和9300两个端口,名称要和上面定义的 Service 保持一致,通过volumeMount声明了数据持久化目录,定义了一个data数据卷通过volumeMount把它挂载到容器里的/usr/share/elasticsearch/data目录

容器中设置一些环境变量:

cluster.name:Elasticsearch 集群的名称,我们这里 k8s-logs。

node.name:节点的名称,通过metadata.name来获取。这将解析为 es-cluster-[0,1,2],取决于节点的指定顺序。

discovery.seed_hosts:此字段用于设置在Elasticsearch集群中节点相互连接的发现方法它为我们的集群指定了一个静态主机列表。由于我们之前配置的无头服务,我们的 Pod 具有唯一的 DNS 地址es-cluster-[0,1,2].elasticsearch.kube-logging.svc.cluster.local,因此我们相应地设置此地址变量即可。由于都在同一个 namespace 下面,所以我们可以将其缩短为es-cluster-[0,1,2].elasticsearch。

要了解有关 Elasticsearch 发现的更多信息,请参阅 Elasticsearch 官方文档:Discovery and cluster formation | Elasticsearch Guide [8.15] | Elastic

ES_JAVA_OPTS:这里我们设置为-Xms512m -Xmx512m,告诉JVM使用512 MB的最小和最大堆。这个值应该根据群集的资源可用性和需求调整这些参数。

要了解更多信息,请参阅设置堆大小的相关文档:Heap size settings | Elasticsearch Guide [8.15] | Elastic

3)initcontainer内容

 . . .

      initContainers:

      - name: fix-permissions

        image: busybox

        command: ["sh", "-c", "chown -R 1000:1000 /usr/share/elasticsearch/data"]

        securityContext:

          privileged: true

        volumeMounts:

        - name: data

          mountPath: /usr/share/elasticsearch/data

      - name: increase-vm-max-map

        image: busybox

        command: ["sysctl", "-w", "vm.max_map_count=262144"]

        securityContext:

          privileged: true

      - name: increase-fd-ulimit

        image: busybox

        command: ["sh", "-c", "ulimit -n 65536"]

        securityContext:

          privileged: true

这里我们定义了几个在主应用程序之前运行的 Init 容器,这些初始容器按照定义的顺序依次执行,执行完成后才会启动主应用容器。

第一个名为 fix-permissions 的容器用来运行 chown 命令,将 Elasticsearch 数据目录的用户和组更改为1000:1000(Elasticsearch 用户的 UID)。因为默认情况下,Kubernetes 用 root 用户挂载数据目录,这会使得 Elasticsearch 无法访问该数据目录,可以参考 Elasticsearch 生产中的一些默认注意事项相关文档说明:https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html#_notes_for_production_use_and_defaults

第二个名为 increase-vm-max-map 的容器用来增加操作系统对mmap计数的限制,默认情况下该值可能太低,导致内存不足的错误,要了解更多关于该设置的信息,可以查看 Elasticsearch 官方文档说明:Virtual memory | Elasticsearch Guide [8.15] | Elastic

最后一个初始化容器是用来执行ulimit命令增加打开文件描述符的最大数量的。

此外 Elastisearch Notes for Production Use 文档还提到了由于性能原因最好禁用 swap,对于 Kubernetes 集群而言,最好也是禁用 swap 分区的。

现在我们已经定义了主应用容器和它之前运行的 Init Containers 来调整一些必要的系统参数,接下来可以添加数据目录的持久化相关的配置

4)在 StatefulSet 中,使用volumeClaimTemplates来定义volume 模板即可:

. . .

  volumeClaimTemplates:

  - metadata:

      name: data

      labels:

        app: elasticsearch

    spec:

      accessModes: [ "ReadWriteOnce" ]

      storageClassName: do-block-storage

      resources:

        requests:

          storage: 10Gi

我们这里使用 volumeClaimTemplates 来定义持久化模板,Kubernetes 会使用它为 Pod 创建 PersistentVolume,设置访问模式为ReadWriteOnce,这意味着它只能被 mount 到单个节点上进行读写,然后最重要的是使用了一个名为do-block-storage的 StorageClass 对象,所以我们需要提前创建该对象,我们这里使用的 NFS 作为存储后端,所以需要安装一个对应的nfs  provisioner 驱动

#查看es的pod是否创建成功

[root@xianchaomaster1]# kubectl get pods -n kube-logging

显示如下,说明es集群创建成功了:

NAME           READY   STATUS    RESTARTS   AGE

es-cluster-0   1/1     Running   0          55s

es-cluster-1   1/1     Running   0          35s

es-cluster-2   1/1     Running   0          28s

[root@xianchaomaster1]# kubectl get svc -n kube-logging

NAME            TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)             

elasticsearch   ClusterIP   None         <none>        9200/TCP,9300/TCP

8、安装kibana可视化UI界面

#把kibana_7_2_0.tar.gz上传到xianchaonode1节点,手动解压:

[root@xianchaonode1 ~]# docker load -i kibana_7_2_0.tar.gz

#更新资源清单文件

[root@xianchaomaster]# kubectl apply -f kibana.yaml

kibana.yaml资源清单文件解读

apiVersion: v1

kind: Service

metadata:

  name: kibana

  namespace: kube-logging

  labels:

    app: kibana

spec:

  type: NodePort

  ports:

  - port: 5601

  selector:

    app: kibana

---

apiVersion: apps/v1

kind: Deployment

metadata:

  name: kibana

  namespace: kube-logging

  labels:

    app: kibana

spec:

  replicas: 1

  selector:

    matchLabels:

      app: kibana

  template:

    metadata:

      labels:

        app: kibana

    spec:

      containers:

      - name: kibana

        image: docker.elastic.co/kibana/kibana:7.2.0

        imagePullPolicy: IfNotPresent

        resources:

          limits:

            cpu: 1000m

          requests:

            cpu: 100m

        env:

          - name: ELASTICSEARCH_URL

            value: http://elasticsearch.kube-logging.svc.cluster.local:9200

        ports:

        - containerPort: 5601

上面我们定义了两个资源对象,一个 Service 和 Deployment,为了测试方便,我们将 Service 设置为了 NodePort 类型,Kibana Pod 中配置都比较简单,唯一需要注意的是我们使用 ELASTICSEARCH_URL 这个环境变量来设置Elasticsearch 集群的端点和端口,直接使用 Kubernetes DNS 即可,此端点对应服务名称为 elasticsearch,由于是一个 headless service,所以该域将解析为3个 Elasticsearch Pod 的 IP 地址列表。

[root@xianchaomaster1]#kubectl get pods -n kube-logging

显示如下,看到kibana开头的pod,说明kibana也已经部署成功了

NAME                      READY   STATUS    RESTARTS   AGE

es-cluster-0              1/1     Running   0          170m

es-cluster-1              1/1     Running   0          170m

es-cluster-2              1/1     Running   0          170m

kibana-5749b5778b-c9djr   1/1     Running   0          4m3s

[root@xianchaomaster1 efk]# kubectl get svc -n kube-logging

NAME            TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)             

elasticsearch   ClusterIP   None             <none>        9200/TCP,9300/TCP   

kibana          NodePort    10.106.209.160   <none>        5601:31966/TCP      

在浏览器中打开http://<任意节点IP>:31966即可,如果看到如下欢迎界面证明 Kibana 已经成功部署到了Kubernetes集群之中。

9、安装fluentd组件

我们使用daemonset控制器部署fluentd组件,这样可以保证集群中的每个节点都可以运行同样fluentd的pod副本,这样就可以收集k8s集群中每个节点的日志,在k8s集群中,容器应用程序的输入输出日志会重定向到node节点里的json文件中,fluentd可以tail和过滤以及把日志转换成指定的格式发送到elasticsearch集群中。除了容器日志,fluentd也可以采集kubelet、kube-proxy、docker的日志。

# 离线镜像压缩包fluentd.tar.gz上传到xianchaonode1和xianchaomaster1上,手动解压:

[root@xianchaonode1 ~]# docker load -i fluentd.tar.gz

[root@xianchaomaster1 ~]# docker load -i fluentd.tar.gz

#更新资源清单文件

[root@xianchaomaster]# kubectl apply -f fluentd.yaml

#查看fluentd是否部署成功

[root@xianchaomaster1]# kubectl get pods -n kube-logging

显示如下,看到fluentd的pod的status状态是running,说明部署成功:

NAME                 READY   STATUS    RESTARTS  

es-cluster-0             1/1      Running   0          

es-cluster-1             1/1      Running   0          

es-cluster-2             1/1      Running   0         

fluentd-dszg7           1/1      Running   0         

fluentd-dwk7t           1/1      Running   0       

kibana-84cf7f59c-2546x  1/1     Running   0       

Fluentd 启动成功后,我们可以前往 Kibana 的 Dashboard 页面中

点击Try our sample data

点击左侧的Discover

可以看到如下配置页面:

在这里可以配置我们需要的 Elasticsearch 索引,前面 Fluentd 配置文件中我们采集的日志使用的是 logstash 格式,这里只需要在文本框中输入logstash-*即可匹配到 Elasticsearch 集群中的所有日志数据

点击Next step

选择@timestamp,创建索引

点击左侧的discover,可看到如下:

10、测试收集pod容器日志

#把busybox.tar.gz上传到xianchaonode1节点,手动解压:

[root@xianchaonode1 ~]# docker load -i busybox.tar.gz

[root@xianchaomaster1]# kubectl apply -f pod.yaml

Kibana查询语言KQL官方地址:

https://www.elastic.co/guide/en/kibana/7.2/kuery-query.html

登录到kibana的控制面板,在discover处的搜索栏中输入kubernetes.pod_name:counter这将过滤名为的Pod的日志数据counter,如下所示:

通过上面几个步骤,我们已经在k8s集群成功部署了elasticsearch,fluentd,kibana,这里使用的efk系统包括3个Elasticsearch Pod,一个Kibana Pod和一组作为DaemonSet部署的Fluentd Pod。

11、基于EFK+logstash+kafka构建高吞吐量的日志收集平台

fluentd-->kafka-->logstash-->elasticsearch-->kibana

1、部署fluentd

https://github.com/kubernetes/kubernetes/blob/master/cluster/addons/fluentd-elasticsearch/fluentd-es-ds.yaml

[root@xianchaomaster1 ~]# cat fluentd-configmap.yaml

kind: ConfigMap

apiVersion: v1

metadata:

  name: fluentd-config

  namespace: logging

  labels:

    addonmanager.kubernetes.io/mode: Reconcile

data:

  system.conf: |-

    <system>

      root_dir /tmp/fluentd-buffers/

    </system>

  containers.input.conf: |-

    <source>

      @id fluentd-containers.log

      @type tail

      path /var/log/containers/*.log

      pos_file /var/log/es-containers.log.pos

      time_format %Y-%m-%dT%H:%M:%S.%NZ

      localtime

      tag raw.kubernetes.*

      format json

      read_from_head true

    </source>

    # Detect exceptions in the log output and forward them as one log entry.

    <match raw.kubernetes.**>

      @id raw.kubernetes

      @type detect_exceptions

      remove_tag_prefix raw

      message log

      stream stream

      multiline_flush_interval 5

      max_bytes 500000

      max_lines 1000

    </match>

  system.input.conf: |-

    # Logs from systemd-journal for interesting services.

    <source>

      @id journald-docker

      @type systemd

      filters [{ "_SYSTEMD_UNIT": "docker.service" }]

      <storage>

        @type local

        persistent true

      </storage>

      read_from_head true

      tag docker

    </source>

    <source>

      @id journald-kubelet

      @type systemd

      filters [{ "_SYSTEMD_UNIT": "kubelet.service" }]

      <storage>

        @type local

        persistent true

      </storage>

      read_from_head true

      tag kubelet

    </source>

  forward.input.conf: |-

    # Takes the messages sent over TCP

    <source>

      @type forward

    </source>

  output.conf: |-

    # Enriches records with Kubernetes metadata

    <filter kubernetes.**>

      @type kubernetes_metadata

    </filter>

    <match **>

      @id elasticsearch

      @type elasticsearch

      @log_level info

      include_tag_key true

      host es主机ip

      port 9200

      logstash_format true

      request_timeout    30s

      <buffer>

        @type file

        path /var/log/fluentd-buffers/kubernetes.system.buffer

        flush_mode interval

        retry_type exponential_backoff

        flush_thread_count 2

        flush_interval 5s

        retry_forever

        retry_max_interval 30

        chunk_limit_size 2M

        queue_limit_length 8

        overflow_action block

      </buffer>

    </match>

[root@xianchaomaster1 ~]# cat fluentd-daemonset.yaml

apiVersion: v1

kind: ServiceAccount

metadata:

  name: fluentd-es

  namespace: logging

  labels:

    k8s-app: fluentd-es

    kubernetes.io/cluster-service: "true"

    addonmanager.kubernetes.io/mode: Reconcile

---

kind: ClusterRole

apiVersion: rbac.authorization.k8s.io/v1

metadata:

  name: fluentd-es

  labels:

    k8s-app: fluentd-es

    kubernetes.io/cluster-service: "true"

    addonmanager.kubernetes.io/mode: Reconcile

rules:

- apiGroups:

  - ""

  resources:

  - "namespaces"

  - "pods"

  verbs:

  - "get"

  - "watch"

  - "list"

---

kind: ClusterRoleBinding

apiVersion: rbac.authorization.k8s.io/v1

metadata:

  name: fluentd-es

  labels:

    k8s-app: fluentd-es

    kubernetes.io/cluster-service: "true"

    addonmanager.kubernetes.io/mode: Reconcile

subjects:

- kind: ServiceAccount

  name: fluentd-es

  namespace: logging

  apiGroup: ""

roleRef:

  kind: ClusterRole

  name: fluentd-es

  apiGroup: ""

---

apiVersion: apps/v1

kind: DaemonSet

metadata:

  name: fluentd-es

  namespace: logging

  labels:

    k8s-app: fluentd-es

    version: v2.0.4

    kubernetes.io/cluster-service: "true"

    addonmanager.kubernetes.io/mode: Reconcile

spec:

  selector:

    matchLabels:

      k8s-app: fluentd-es

      version: v2.0.4

  template:

    metadata:

      labels:

        k8s-app: fluentd-es

        kubernetes.io/cluster-service: "true"

        version: v2.0.4

      # This annotation ensures that fluentd does not get evicted if the node

      # supports critical pod annotation based priority scheme.

      # Note that this does not guarantee admission on the nodes (#40573).

      annotations:

        scheduler.alpha.kubernetes.io/critical-pod: ''

    spec:

      serviceAccountName: fluentd-es

      containers:

      - name: fluentd-es

        image: cnych/fluentd-elasticsearch:v2.0.4

        env:

        - name: FLUENTD_ARGS

          value: --no-supervisor -q

        resources:

          limits:

            memory: 500Mi

          requests:

            cpu: 100m

            memory: 200Mi

        volumeMounts:

        - name: varlog

          mountPath: /var/log

        - name: varlibdockercontainers

          mountPath: /var/lib/docker/containers

          readOnly: true

        - name: config-volume

          mountPath: /etc/fluent/config.d

      nodeSelector:

        beta.kubernetes.io/fluentd-ds-ready: "true"

      tolerations:

      - key: node-role.kubernetes.io/master

        operator: Exists

        effect: NoSchedule

      terminationGracePeriodSeconds: 30

      volumes:

      - name: varlog

        hostPath:

          path: /var/log

      - name: varlibdockercontainers

        hostPath:

          path: /var/lib/docker/containers

      - name: config-volume

        configMap:

          name: fluentd-config

创建节点标签

[root@xianchaomaster1 ~]# kubectl label nodes xianchaomaster1 beta.kubernetes.io/fluentd-ds-ready=true

[root@xianchaomaster1 ~]# kubectl label nodes xianchaonode1 beta.kubernetes.io/fluentd-ds-ready=true

[root@xianchaomaster1 ~]# kubectl apply  -f fluentd-daemonset.yaml 

2、接入kafka

cat kafka-config.yaml

kind: ConfigMap

apiVersion: v1

metadata:

  name: fluentd-config

  namespace: logging

  labels:

    addonmanager.kubernetes.io/mode: Reconcile

data:

  system.conf: |-

    <system>

      root_dir /tmp/fluentd-buffers/

    </system>

  containers.input.conf: |-

    <source>

      @id fluentd-containers.log

      @type tail

      path /var/log/containers/*.log

      pos_file /var/log/es-containers.log.pos

      time_format %Y-%m-%dT%H:%M:%S.%NZ

      localtime

      tag raw.kubernetes.*

      format json

      read_from_head true

    </source>

    # Detect exceptions in the log output and forward them as one log entry.

    <match raw.kubernetes.**>

      @id raw.kubernetes

      @type detect_exceptions

      remove_tag_prefix raw

      message log

      stream stream

      multiline_flush_interval 5

      max_bytes 500000

      max_lines 1000

    </match>

  system.input.conf: |-

    # Logs from systemd-journal for interesting services.

    <source>

      @id journald-docker

      @type systemd

      filters [{ "_SYSTEMD_UNIT": "docker.service" }]

      <storage>

        @type local

        persistent true

      </storage>

      read_from_head true

      tag docker

    </source>

    <source>

      @id journald-kubelet

      @type systemd

      filters [{ "_SYSTEMD_UNIT": "kubelet.service" }]

      <storage>

        @type local

        persistent true

      </storage>

      read_from_head true

      tag kubelet

    </source>

  forward.input.conf: |-

    # Takes the messages sent over TCP

    <source>

      @type forward

    </source>

  output.conf: |-

    # Enriches records with Kubernetes metadata

    <filter kubernetes.**>

      @type kubernetes_metadata

    </filter>

    <match **>

      @id kafka

      @type kafka2

      @log_level info

      include_tag_key true

      # list of seed brokers

      brokers kafka ip:9092

      use_event_time true

      # buffer settings

      <buffer>

        @type file

        path /var/log/fluentd-buffers/kubernetes.system.buffer

        flush_mode interval

        retry_type exponential_backoff

        flush_thread_count 2

        flush_interval 5s

        retry_forever

        retry_max_interval 30

        chunk_limit_size 2M

        queue_limit_length 8

        overflow_action block

      </buffer>

      # data type settings

      <format>

        @type json

      </format>

      # topic settings

      topic_key topic

      default_topic messages

      # producer settings

      required_acks -1

      compression_codec gzip

    </match>

重启fluentd 

3、配置logstash

配置logstash消费messages日志写入elasticsearch

cat config/kafkaInput_fluentd.conf

input {

    kafka {

        bootstrap_servers => ["kafka ip:9092"]

        client_id => "fluentd"

        group_id => "fluentd"

        consumer_threads => 1

        auto_offset_reset => "latest"

        topics => ["messages"]

    }

}

 

filter {

        json{

                source => "message"

        }

        

       ruby {

       code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"

       }

      ruby {

        code => "event.set('@timestamp',event.get('timestamp'))"

       }

      ruby {

        code => "event.set('find_time',event.get('@timestamp').time.localtime - 8*60*60)"

       }

     mutate {

    remove_field => ["timestamp"]

    remove_field => ["message"]

    }

     

}

output {

          elasticsearch{

               hosts => ["es ip地址: 9200"]

               index => "kubernetes_%{+YYYY_MM_dd}"

 

          }

#    stdout {

#           codec => rubydebug

#           }

}

4、启动logstash

nohup ./bin/logstash -f config/kafkaInput_fluentd.conf --config.reload.automatic --path.data=/opt/logstash/data_fluentd 2>&1 > fluentd.log &


http://www.kler.cn/a/314756.html

相关文章:

  • 消息中间件类型介绍
  • Zookeeper 集群安装
  • 数据结构——栈的实现
  • 【简博士统计学习方法】第1章:2. 统计学习方法的基本分类
  • Effective C++读书笔记——item13(使用对象管理资源)
  • 游戏关卡设计的常用模式
  • 大模型如何学习数据
  • NLP 文本分类核心问题
  • LangChain教程 - 构建一个检索增强生成 (RAG) 应用程序
  • 面试金典题8
  • go webapi上传文件
  • 【Linux】Docker:离线主机部署
  • 【Temporal】日志打印控制
  • 【AI视频】AI虚拟主播制作网站推荐
  • 深度学习02-pytorch-06-张量的形状操作
  • 基于深度学习的智能电网优化
  • Java异常架构与异常关键字
  • Spring后端直接用枚举类接收参数,自定义通用枚举类反序列化器
  • IT行业:未来发展的无限可能
  • 【医学半监督】置信度指导遮蔽学习的半监督医学图像分割
  • 51单片机-系列-数码管中断和定时器
  • Lsposed Java HOOK原理及检测
  • 我的AI工具箱Tauri版-VideoIntroductionClipCut视频介绍混剪
  • Nacos与Eureka的区别:深入解析微服务中的服务注册与发现
  • npm切换为淘宝镜像源
  • GPU加速生物信息分析的尝试