当前位置: 首页 > article >正文

ELK入门教程(超详细)

什么是ELK?

ELK是ElasticsearchLogstashKibana三大开源框架首字母大写简称(后来出现的filebeat属于beats家族中的一员,可以用来替代logstash的数据收集功能,比较轻量级),也被称为Elastic Stack

Filebeat 

Filebeat是用于转发和集中日志数据的轻量级传送工具。Filebeat监视您指定的日志文件或位置,收集日志事件,并将它们转发到Elasticsearch或 Logstash进行索引。Filebeat的工作方式如下:启动Filebeat时,它将启动一个或多个输入,这些输入将在为日志数据指定的位置中查找。对于Filebeat所找到的每个日志,Filebeat都会启动收集器。每个收集器都读取单个日志以获取新内容,并将新日志数据发送到libbeat,libbeat将聚集事件,并将聚集的数据发送到为Filebeat配置的输出。

Logstash

Logstash是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。Logstash能够动态地采集、转换和传输数据,不受格式或复杂度的影响。利用Grok从非结构化数据中派生出结构,从IP地址解码出地理坐标,匿名化或排除敏感字段,并简化整体处理过程。

ElasticSearch

Elasticsearch是Elastic Stack核心的分布式搜索和分析引擎,是一个基于Lucene、分布式、通过Restful方式进行交互的近实时搜索平台框架。Elasticsearch为所有类型的数据提供近乎实时的搜索和分析。无论您是结构化文本还是非结构化文本,数字数据或地理空间数据,Elasticsearch都能以支持快速搜索的方式有效地对其进行存储和索引。

Kibana

Kibana是一个针对Elasticsearch的开源分析及可视化平台,用来搜索、查看交互存储在Elasticsearch索引中的数据。使用Kibana,可以通过各种图表进行高级数据分析及展示。并且可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以汇总、分析和搜索重要数据日志。还可以让海量数据更容易理解。它操作简单,基于浏览器的用户界面可以快速创建仪表板(dashboard)实时显示Elasticsearch查询动态。

Logstash入门

使用Docker-Compose启动Logstash服务,其中docker-compose.yml文件如下:

version: "3.1"
# 服务配置
services:
  logstash:
    container_name: logstash-7.17.0
    image: docker.elastic.co/logstash/logstash:7.17.0
    volumes:
      - ./logstash/data:/usr/share/logstash/data
      - ./logstash/pipeline:/usr/share/logstash/pipeline
    networks:
      - elk_net

# 网络配置
networks:
  elk_net:
    driver: bridge

示例配置1(标准输入、输出) 

每隔10秒输出字符串:Hello from Logstash!

 input {
  heartbeat {
    interval => 10
    message  => 'Hello from Logstash!'
  }
}
output {
    stdout {
        codec => rubydebug
    }
}

示例配置2(读取文件)

读取文件内容(文件的最后一行将不会读取),示例文件为test.log

hello world!
From Shanghai To Beijing
this is a log for test in logstash!

配置文件如下:

input {
  file {
    path => "/usr/share/logstash/data/test.log"
    start_position => "beginning"
  }
}
output {
    stdout {
        codec => rubydebug
    }

读取结果如下(顺序已打乱)

 

示例配置3(Grok插件) 

Grok为正则表达式,Logstash(v4.4.3)已内置120种表达式,参考网址为:https://github.com/logstash-plugins/logstash-patterns-core/tree/main/patterns.

读取Nginx日志文件,并使用Grok进行过滤。Nginx文件示例如下:

112.195.209.90 - - [20/Feb/2018:12:12:14 +0800] "GET / HTTP/1.1" 200 190 "-" "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Mobile Safari/537.36" "-" 

配置文件如下:

input {
  file {
    path => "/usr/share/logstash/data/test.log"
    start_position => "beginning"
  }
}
filter {
    grok {
        match => {
            "message" => "%{COMBINEDAPACHELOG}"
        }
    }
}
output {
    stdout {
        codec => rubydebug
    }

解析结果为:

 

Kibana在Dev Tools中内置了Grok Debugger(调试器):

 

示例解析配置4(多行读取插件 multiline) 

对 multiline 插件来说,有三个设置比较重要:negate、pattern 和 what。

  • • pattern: 类型是string,要匹配的正则表达式

  • • negate: 类型是boolean,默认false,否定正则表达式

  • • what: 必须设置,可以为 previous 或 next, 如果正则表达式匹配了,那么该事件是属于下一个或是前一个事件

multiline插件可以多行读取。示例文件内容如下(注意最后一行为空行):

[Aug/08/08 14:54:03] hello world
[Aug/08/09 14:54:04] hello logstash
    hello best practice
    hello raochenlin
[Aug/08/10 14:54:05] the end

配置文件:

input {
  file {
    path => "/usr/share/logstash/data/test.log"
    start_position => "beginning"
    codec => multiline {
        pattern => "^\["
        negate => true
        what => "previous"
    }
  }
}
output {
    stdout {
        codec => rubydebug
    }

解析结果如下:

{
       "message" => "[Aug/08/08 14:54:03] hello world",
    "@timestamp" => 2023-12-23T10:41:20.884Z,
          "path" => "/usr/share/logstash/data/test.log",
          "host" => "b62820accf76",
      "@version" => "1"
}
{
       "message" => "[Aug/08/09 14:54:04] hello logstash\n    hello best practice\n    hello raochenlin",
          "path" => "/usr/share/logstash/data/test.log",
          "tags" => [
        [0] "multiline"
    ],
    "@timestamp" => 2023-12-23T10:44:24.846Z,
          "host" => "b62820accf76",
      "@version" => "1"

由于参数what设置为previous,因此只解析出两条数据。当what设置为next时,可解析出三条数据,但解析结果有变化,如下:

{
       "message" => "[Aug/08/08 14:54:03] hello world",
          "path" => "/usr/share/logstash/data/test.log",
    "@timestamp" => 2023-12-23T10:49:23.395Z,
          "host" => "492dfb254e78",
      "@version" => "1"
}
{
       "message" => "    hello best practice\n    hello raochenlin\n[Aug/08/10 14:54:05] the end",
    "@timestamp" => 2023-12-23T10:49:23.415Z,
          "path" => "/usr/share/logstash/data/test.log",
          "host" => "492dfb254e78",
      "@version" => "1",
          "tags" => [
        [0] "multiline"
    ]
}
{
       "message" => "[Aug/08/09 14:54:04] hello logstash",
          "path" => "/usr/share/logstash/data/test.log",
    "@timestamp" => 2023-12-23T10:49:23.414Z,
          "host" => "492dfb254e78",
      "@version" => "1"

ELK搭建简单示例 

结合Logstash, ElasticSearch与Kibana,将data文件夹中的以log结尾的文件,逐行导入至ElasticSearch中。

logstash.conf配置如下:

input {
  file {
    path => "/usr/share/logstash/data/*.log"
    start_position => "beginning"
  }
}
output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
        hosts => ["http://elasticsearch:9200"]
        index => "test_log"
        action => "index"
    }
}

docker-compose.yml文件如下:

version: "3.1"
# 服务配置
services:
  logstash:
    container_name: logstash-7.17.0
    image: docker.elastic.co/logstash/logstash:7.17.0
    volumes:
      - ./logstash/config/logstash.yml:/usr/share/logstash/logstash.yml
      - ./logstash/data:/usr/share/logstash/data
      - ./logstash/pipeline:/usr/share/logstash/pipeline
    networks:
      - elk_net
    depends_on:
      - elasticsearch

  elasticsearch:
    container_name: elasticsearch-7.17.0
    image: elasticsearch:7.17.0
    environment:
      - "ES_JAVA_OPTS=-Xms1024m -Xmx1024m"
      - "http.host=0.0.0.0"
      - "node.name=elastic01"
      - "cluster.name=cluster_elasticsearch"
      - "discovery.type=single-node"
    ports:
      - "9200:9200"
      - "9300:9300"
    volumes:
      - ./es/plugins:/usr/share/elasticsearch/plugins
      - ./es/data:/usr/share/elasticsearch/data
    networks:
      - elk_net

  kibana:
    container_name: kibana-7.17.0
    image: kibana:7.17.0
    ports:
      - "5601:5601"
    networks:
      - elk_net
    depends_on:
      - elasticsearch

# 网络配置
networks:
  elk_net:
    driver: bridge

 ELK日志系统实战

我们来设计这样一个日志系统,其中Logstash可将Flask运行过程中的日志进行收集,并导入至ElasticSearch中,再使用Kibana进行数据分析。

Flask服务

使用Flask构建简单的web服务,代码如下:

# -*- coding: utf-8 -*-
# @file: server.py
# @time: 2023/12/23 19:17
import time
import random
from flask import Flask, Response
import logging

logging.basicConfig(filename='../logstash/data/flask.log',
                    level=logging.DEBUG,
                    format='%(asctime)s-%(filename)s-%(funcName)s-%(levelname)s-%(message)s')
logger = logging.getLogger()

app = Flask("elk_test")


@app.route('/')
def index():
    t1 = time.time()
    logger.info(f"api_endpoint: /, status: 200, cost_time: {(time.time() - t1) * 1000}")
    return "Hello index", 200


@app.route("/io_task")
def io_task():
    t1 = time.time()
    time.sleep(2)
    logger.info(f"api_endpoint: /io_task, status: 200, cost_time: {(time.time() - t1) * 1000}")
    return "IO bound task finish!", 200


@app.route("/cpu_task")
def cpu_task():
    t1 = time.time()
    for i in range(10000):
        n = i*i*i
    logger.info(f"api_endpoint: /cpu_task, status: 200, cost_time: {(time.time() - t1) * 1000}")
    return "CPU bound task finish!", 200


@app.route("/random_sleep")
def random_sleep():
    t1 = time.time()
    time.sleep(random.randint(0, 5))
    logger.info(f"api_endpoint: /random_sleep, status: 200, cost_time: {(time.time() - t1) * 1000}")
    return "random sleep", 200


@app.route("/random_status")
def random_status():
    t1 = time.time()
    status_code = random.choice([200] * 6 + [300, 400, 400, 500])
    logger.info(f"api_endpoint: /random_status, status: {status_code}, cost_time: {(time.time() - t1) * 1000}")
    return Response("random status", status=status_code)


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=5000, debug=True)

使用下面的shell脚本进行HTTP请求模拟:

TIMES=5
for i in $(eval echo "{1..$TIMES}")
do
    siege -c 1 -r 10 http://localhost:5000/
    siege -c 1 -r 5 http://localhost:5000/io_task
    siege -c 1 -r 5 http://localhost:5000/cpu_task
    siege -c 1 -r 3 http://localhost:5000/random_sleep
    siege -c 1 -r 10 http://localhost:5000/random_status
    sleep 5
done 

日志记录在flask.log文件中。

ELK搭建

对上述日志,搭建ELK,docker-compose.yml同上述ELK搭建简单示例,logstash.conf改动如下:

input {
  file {
    path => "/usr/share/logstash/data/flask.log"
    start_position => "beginning"
  }
}
filter {
    # 只对cost_time所在列进行解析
    if "cost_time" in [message] {
        grok {
            match => {
                "message" => "%{TIMESTAMP_ISO8601:request_finish_time}-%{WORD:script}.py-%{WORD:module}-%{LOGLEVEL:loglevel}-api_endpoint: %{DATA:api_endpoint}, status: %{NUMBER:status:int}, cost_time: %{NUMBER:cost_time:float}"
            }
        }
        # 使用mutate过滤器替换字符
        mutate {
            # 替换空格为T
            gsub => [ "request_finish_time", " ", "T" ]
            # 替换逗号为点
            gsub => [ "request_finish_time", ",", "." ]
        }

        # 使用date过滤器解析和格式化日期
        date {
            match => [ "request_finish_time", "ISO8601" ]
        }
    }
    else {
        drop { }
    }
}
output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
        hosts => ["http://elasticsearch:9200"]
        index => "flask_log"
        action => "index"
    }
}

只对有cost_time所在的行进行解析,其它行丢弃,导入至ElasticSearch中的flask_log这个索引中。

数据分析 

对上述的五个API Endpoint进行请求占比分析,饼图如下:

同时,对cost_time进行数据分析,其平均值,90, 95, 99分位数如下表:

 

上述的日志记录方式还有待改进,比如记录程序报错信息,使用json字段解析而不是Grok表达式会更容易些。 


http://www.kler.cn/a/458044.html

相关文章:

  • NCCL源码解读3.1:double binary tree双二叉树构建算法,相比ring环算法的优势
  • DC-2 靶场渗透
  • 4.Web安全——JavaScript基础
  • 小程序发版后,强制更新为最新版本
  • 【图像处理】OpenCv + Python 实现 Photoshop 中的色彩平衡功能
  • CMake配置区分Debug和Release模式
  • 【算法】复杂性理论初步
  • Wordpress Tutor LMS插件存在SQL注入漏洞(CVE-2024-10400)
  • 【机器学习】SVM支持向量机(二)
  • mysql建立主从集群
  • 38. 日志
  • MySQL root用户密码忘记怎么办(Reset root account password)
  • 爬虫案例-爬取网页图片
  • 基于STM32的智能垃圾桶的Proteus仿真
  • 使用 pushy 热更新后 sentry 不能正常显示源码
  • 玉米中的元基因调控网络突出了功能上相关的调控相互作用。/biosample_parser.py
  • 秒鲨后端之MyBatis【2】默认的类型别名、MyBatis的增删改查、idea中设置文件的配置模板、MyBatis获取参数值的两种方式、特殊SQL的执行
  • py打包工具
  • Python + 深度学习从 0 到 1(02 / 99)
  • 基于深度学习(HyperLPR3框架)的中文车牌识别系统-Qt调用Python
  • 在vue3中使用tsx结合render封装一个项目内通用的弹窗组件
  • Docker的概述与安装
  • 算法基础一:冒泡排序
  • React引入Echart水球图
  • systemverilog语法:assertion sequence
  • node-js Express防盗链