当前位置: 首页 > article >正文

将 Logstash 管道转换为 OpenTelemetry Collector 管道

作者:来自 Elastic Mirko Bez•Taha Derouiche

本指南通过演示如何将常见的 Logstash 管道转换为等效的 OpenTelemetry Collector 配置,帮助 Logstash 用户过渡到 OpenTelemetry。我们将重点介绍日志信号。

简介

Elastic 可观察性策略与 OpenTelemetry 越来越一致。随着 OpenTelemetry 的 Elastic Distributions 的最近发布,我们正在扩展我们的产品,使 OpenTelemetry 更容易使用,Elastic Agent 现在提供 “otel” 模式,使其能够运行 OpenTelemetry Collector 的自定义发行版,无缝增强你的可观察性入门和 Elastic 体验。

这篇文章旨在通过演示如何将一些标准 Logstash 管道转换为相应的 OpenTelemetry Collector 配置,帮助熟悉 Logstash 的用户过渡到 OpenTelemetry。

什么是 OpenTelemetry Collector,我为什么要关心它?

OpenTelemetry 是一个开源框架,可确保与供应商无关的数据收集,为可观察性数据的收集、处理和提取提供标准化方法。 Elastic 完全致力于这一原则,旨在使可观察性真正与供应商无关,并消除用户在切换平台时重新检测其可观察性的需要。

通过采用 OpenTelemetry,你可以获得以下好处:

  • 统一的可观察性:通过使用 OpenTelemetry Collector,你可以从单个工具收集和管理日志、指标和跟踪,从而提供对系统性能和行为的整体可观察性。这简化了微服务等复杂分布式环境中的监控和调试。
  • 灵活性和可扩展性:无论你运行的是小型服务还是大型分布式系统,OpenTelemetry Collector 都可以扩展以处理生成的数据量,从而提供部署为代理(与应用程序一起运行)或网关(集中式集线器)的灵活性。
  • 开放标准:由于 OpenTelemetry 是云原生计算基金会 (CNCF) 下的一个开源项目,因此它可确保你使用广泛接受的标准,从而有助于实现可观测性堆栈的长期可持续性和兼容性。
  • 简化的遥测管道:使用接收器、处理器和导出器构建管道的能力通过集中数据流并最大限度地减少对多个代理的需求,简化了遥测管理。

在接下来的部分中,我们将解释 OTEL Collector 和 Logstash 的结构。

OTEL 收集器配置

OpenTelemetry 收集器配置有不同的部分:

  • 接收器(receivers):从不同来源收集数据。
  • 处理器(processors):转换接收器收集的数据
  • 导出器(exporters):将数据发送到不同的收集器
  • 连接器(connectors):将两个管道连接在一起
  • 服务(service):定义哪些组件处于活动状态
    • 管道(pipelines):结合定义的接收器、处理器、导出器和连接器来处理数据
    • 扩展(extensision)是可选组件,可扩展收集器的功能以完成不直接涉及处理遥测数据的任务(例如,健康监测)
    • 遥测(telemetry),你可以在其中设置收集器本身的可观察性(例如,日志记录和监控)

我们可以将其以以下示意图形式可视化:

我们参考官方文档 Configuration | OpenTelemetry 深入了解组件。

Logstash 管道定义

Logstash 管道由三个主要组件组成:

  • 输入插件:允许我们从不同来源读取数据
  • 过滤器插件:允许我们转换和过滤数据
  • 输出插件:允许我们发送数据

Logstash 还有一个特殊的输入和一个特殊的输出,允许管道到管道通信,我们可以将其视为与 OpenTelemetry 连接器类似的概念。更多有关 Logstash 的介绍,请参考文章:

  • Logstash:Logstash 入门教程 (一)

  • 如何安装 Elastic 栈中的 Logstash

  • Logstash:Logstash-to-Logstash 通信

Logstash 管道与 Otel Collector 组件的比较

我们可以将 Logstash 管道和 OTEL Collector 管道组件之间的关系图示如下:

理论讲得够多了!让我们深入研究一些例子。

将 Logstash 管道转换为 OpenTelemetry Collector 管道

示例 1:解析和转换日志行

让我们考虑以下行:

2024-09-20T08:33:27: user frank accessed from 89.66.167.22:10592 path /blog with error 404

我们将应用以下步骤:

  1. 从文件 /tmp/demo-line.log 中读取行。
  2. 将输出定义为 Elasticsearch 数据流 logs-access-default。
  3. 提取 @timestamp、user.name、client.ip、client.port、url.path 和 http.status.code。
  4. 删除与 SYSTEM 用户相关的日志消息。
  5. 使用相关日期格式解析日期时间戳并将其存储在 @timestamp 中。
  6. 根据已知代码的描述添加代码 http.status.code_description。
  7. 将数据发送到 Elasticsearch。

Logstash pipeline

input {
    file {
        path => "/tmp/demo-line.log" #[1]
        start_position => "beginning"
        add_field => { #[2]
            "[data_stream][type]" => "logs"
            "[data_stream][dataset]" => "access_log"
            "[data_stream][namespace]" => "default"
        }
    }
}

filter {
    grok { #[3]
        match => {
            "message" => "%{TIMESTAMP_ISO8601:[date]}: user %{WORD:[user][name]} accessed from %{IP:[client][ip]}:%{NUMBER:[client][port]:int} path %{URIPATH:[url][path]} with error %{NUMBER:[http][status][code]}"
        }
    }
    if "_grokparsefailure" not in [tags] {
        if [user][name] == "SYSTEM" { #[4]
            drop {}
        }
        date { #[5]
            match => ["[date]", "ISO8601"]
            target => "[@timestamp]"
            timezone => "UTC"
            remove_field => [ "date" ]
        }
        translate { #[6]
            source => "[http][status][code]"
            target => "[http][status][code_description]"
            dictionary => {
                "200" => "OK"
                "403" => "Permission denied"
                "404" => "Not Found"
                "500" => "Server Error"
            }
            fallback => "Unknown error"
        }
    }
}

output {
    elasticsearch { #[7]
        hosts => "elasticsearch-enpoint:443"
        api_key => "${ES_API_KEY}"
    }
}

OpenTelemtry Collector 配置

receivers:
  filelog: #[1]
    start_at: beginning
    include:
      - /tmp/demo-line.log
    include_file_name: false
    include_file_path: true
    storage: file_storage 
    operators:
    # Copy the raw message into event.original (this is done OOTB by Logstash in ECS mode)
    - type: copy
      from: body
      to: attributes['event.original']
    - type: add #[2]
      field: attributes["data_stream.type"]
      value: "logs"
    - type: add #[2]
      field: attributes["data_stream.dataset"]
      value: "access_log_otel" 
    - type: add #[2]
      field: attributes["data_stream.namespace"]
      value: "default"

extensions:
  file_storage:
    directory: /var/lib/otelcol/file_storage

processors:
  # Adding  host.name (this is done OOTB by Logstash)
  resourcedetection/system:
    detectors: ["system"]
    system:
      hostname_sources: ["os"]
      resource_attributes:
        os.type:
          enabled: false

  transform/grok: #[3]
    log_statements:
      - context: log
        statements:
        - 'merge_maps(attributes, ExtractGrokPatterns(attributes["event.original"], "%{TIMESTAMP_ISO8601:date}: user %{WORD:user.name} accessed from %{IP:client.ip}:%{NUMBER:client.port:int} path %{URIPATH:url.path} with error %{NUMBER:http.status.code}", true), "insert")'

  filter/exclude_system_user:  #[4]
    error_mode: ignore
    logs:
      log_record:
        - attributes["user.name"] == "SYSTEM"

  transform/parse_date: #[5]
    log_statements:
      - context: log
        statements:
          - set(time, Time(attributes["date"], "%Y-%m-%dT%H:%M:%S"))
          - delete_key(attributes, "date")
        conditions:
          - attributes["date"] != nil

  transform/translate_status_code:  #[6]
    log_statements:
      - context: log
        conditions:
        - attributes["http.status.code"] != nil
        statements:
        - set(attributes["http.status.code_description"], "OK")                where attributes["http.status.code"] == "200"
        - set(attributes["http.status.code_description"], "Permission Denied") where attributes["http.status.code"] == "403"
        - set(attributes["http.status.code_description"], "Not Found")         where attributes["http.status.code"] == "404"
        - set(attributes["http.status.code_description"], "Server Error")      where attributes["http.status.code"] == "500"
        - set(attributes["http.status.code_description"], "Unknown Error")     where attributes["http.status.code_description"] == nil

exporters:
  elasticsearch: #[7]
    endpoints: ["elasticsearch-enpoint:443"]
    api_key: ${env:ES_API_KEY}
    tls:
    logs_dynamic_index:
      enabled: true
    mapping:
      mode: ecs

service:
  extensions: [file_storage]
  pipelines:
    logs:
      receivers:
        - filelog
      processors:
        - resourcedetection/system
        - transform/grok
        - filter/exclude_system_user
        - transform/parse_date
        - transform/translate_status_code
      exporters:
        - elasticsearch

这些将在 Elasticsearch 中生成以下文档:

{
    "@timestamp": "2024-09-20T08:33:27.000Z",
    "client": {
        "ip": "89.66.167.22",
        "port": 10592
    },
    "data_stream": {
        "dataset": "access_log",
        "namespace": "default",
        "type": "logs"
    },
    "event": {
        "original": "2024-09-20T08:33:27: user frank accessed from 89.66.167.22:10592 path /blog with error 404"
    },
    "host": {
        "hostname": "my-laptop",
        "name": "my-laptop",
     },
    "http": {
        "status": {
            "code": "404",
            "code_description": "Not Found"
        }
    },
    "log": {
        "file": {
            "path": "/tmp/demo-line.log"
        }
    },
    "message": "2024-09-20T08:33:27: user frank accessed from 89.66.167.22:10592 path /blog with error 404",
    "url": {
        "path": "/blog"
    },
    "user": {
        "name": "frank"
    }
}

示例 2:解析和转换 NDJSON 格式的日志文件

让我们考虑以下 json 行:

{"log\_level":"INFO","message":"User login successful","service":"auth-service","timestamp":"2024-10-11 12:34:56.123 \+0100","user":{"id":"A1230","name":"john\_doe"}}

我们将应用以下步骤:

从文件 /tmp/demo.ndjson 中读取一行。

  1. 将输出定义为 Elasticsearch 数据流 logs-json-default
  2. 解析 JSON 并分配相关键和值。
  3. 解析日期。
  4. 覆盖消息字段。
  5. 重命名字段以遵循 ECS 约定。
  6. 将数据发送到 Elasticsearch。

Logstash pipeline

input {
    file {
        path => "/tmp/demo.ndjson" #[1]
        start_position => "beginning"
        add_field => { #[2]
            "[data_stream][type]" => "logs"
            "[data_stream][dataset]" => "json"
            "[data_stream][namespace]" => "default"
        }
    }
}

filter {
  if [message] =~ /^\{.*/ {
    json { #[3] & #[5]
        source => "message"
    }
  }
  date { #[4]
    match => ["[timestamp]", "yyyy-MM-dd HH:mm:ss.SSS Z"]
    remove_field => "[timestamp]"
  }
  mutate {
    rename => { #[6]
      "service" => "[service][name]"
      "log_level" => "[log][level]"
    }
  }
}


output {
    elasticsearch { # [7]
        hosts => "elasticsearch-enpoint:443"
        api_key => "${ES_API_KEY}"
    }
}

OpenTelemtry Collector 配置

receivers:
  filelog/json: # [1]
    include: 
      - /tmp/demo.ndjson
    retry_on_failure:
      enabled: true
    start_at: beginning
    storage: file_storage 
    operators:
     # Copy the raw message into event.original (this is done OOTB by Logstash in ECS mode)
    - type: copy
      from: body
      to: attributes['event.original']
    - type: add #[2]
      field: attributes["data_stream.type"]
      value: "logs"      
    - type: add #[2]
      field: attributes["data_stream.dataset"]
      value: "otel" #[2]
    - type: add
      field: attributes["data_stream.namespace"]
      value: "default"     


extensions:
  file_storage:
    directory: /var/lib/otelcol/file_storage

processors:
  # Adding  host.name (this is done OOTB by Logstash)
  resourcedetection/system:
    detectors: ["system"]
    system:
      hostname_sources: ["os"]
      resource_attributes:
        os.type:
          enabled: false

  transform/json_parse:  #[3]
    error_mode: ignore
    log_statements:
      - context: log
        statements:
          - merge_maps(attributes, ParseJSON(body), "upsert")
        conditions: 
          - IsMatch(body, "^\\{")
      

  transform/parse_date:  #[4]
    error_mode: ignore
    log_statements:
      - context: log
        statements:
          - set(time, Time(attributes["timestamp"], "%Y-%m-%d %H:%M:%S.%L %z"))
          - delete_key(attributes, "timestamp")
        conditions: 
          - attributes["timestamp"] != nil

  transform/override_message_field: [5]
    error_mode: ignore
    log_statements:
      - context: log
        statements:
          - set(body, attributes["message"])
          - delete_key(attributes, "message")

  transform/set_log_severity: # [6]
    error_mode: ignore
    log_statements:
      - context: log
        statements:
          - set(severity_text, attributes["log_level"])          

  attributes/rename_attributes: #[6]
    actions:
      - key: service.name
        from_attribute: service
        action: insert
      - key: service
        action: delete
      - key: log_level
        action: delete

exporters:
  elasticsearch: #[7]
    endpoints: ["elasticsearch-enpoint:443"]
    api_key: ${env:ES_API_KEY}
    tls:
    logs_dynamic_index:
      enabled: true
    mapping:
      mode: ecs

service:
  extensions: [file_storage]
  pipelines:
    logs/json:
      receivers: 
        - filelog/json
      processors:
        - resourcedetection/system    
        - transform/json_parse
        - transform/parse_date        
        - transform/override_message_field
        - transform/set_log_severity
        - attributes/rename_attributes
      exporters: 
        - elasticsearch

这些将在 Elasticsearch 中生成以下文档:

{
    "@timestamp": "2024-10-11T12:34:56.123000000Z",
    "data_stream": {
        "dataset": "otel",
        "namespace": "default",
        "type": "logs"
    },
    "event": {
        "original": "{\"log_level\":\"WARNING\",\"message\":\"User login successful\",\"service\":\"auth-service\",\"timestamp\":\"2024-10-11 12:34:56.123 +0100\",\"user\":{\"id\":\"A1230\",\"name\":\"john_doe\"}}"
    },
    "host": {
        "hostname": "my-laptop",
        "name": "my-laptop",
     },
    "log": {
        "file": {
            "name": "json.log"
        },
        "level": "WARNING"
    },
    "message": "User login successful",
    "service": {
        "name": "auth-service"
    },
    "user": {
        "id": "A1230",
        "name": "john_doe"
    }
}

结论

在这篇文章中,我们展示了如何将典型的 Logstash 管道转换为用于日志的 OpenTelemetry Collector 管道的示例。虽然 OpenTelemetry 提供了用于收集和导出日志的强大工具,但如果你的管道依赖于复杂的转换或脚本,Logstash 仍然是更好的选择。这是因为 Logstash 提供了更广泛的内置功能和更灵活的方法来处理高级数据操作任务。

下一步是什么?

现在你已经看到了将 Logstash 管道转换为 OpenTelemetry 的基本(但现实)示例,现在轮到你深入研究了。根据你的需要,你可以进一步探索并在以下存储库中找到更详细的资源:

  • OpenTelemetry Collector:了解核心 OpenTelemetry 组件,从接收器到导出器。
  • OpenTelemetry Collector Contrib:查找社区贡献的组件以实现更广泛的集成和功能。
  • Elastic 的 opentelemetry-collector-components:深入了解 Elastic 针对 OpenTelemetry Collector 的扩展,为 Elastic Stack 用户提供更多定制功能。

如果你遇到特定挑战或需要处理更高级的用例,这些存储库将是发现可以增强你的管道的其他组件或集成的绝佳资源。所有这些存储库都具有类似的结构,其中包含名为接收器、处理器、导出器和连接器的文件夹,阅读本博客后应该会熟悉这些文件夹。无论你是迁移简单的 Logstash 管道还是处理更复杂的数据转换,这些工具和社区都将为你提供成功实施 OpenTelemetry 所需的支持。

原文:Convert Logstash pipelines to OpenTelemetry Collector Pipelines — Elastic Observability Labs


http://www.kler.cn/a/370184.html

相关文章:

  • ipad和macbook同步zotero文献附件失败的解决办法
  • 55.【5】BUUCTF WEB NCTF2019 sqli
  • day 21
  • adb 命令使用大全
  • 什么样的问题适合用递归
  • 基于python的博客系统设计与实现
  • JavaScript如何判断变量数据类型 - 2024最新版前端秋招面试短期突击面试题【100道】
  • SpringBoot 集成RabbitMQ 实现钉钉日报定时发送功能
  • [LeetCode] 526. 优美的排列
  • Docker | 校园网上docker pull或者docker run失败的一种解决方法
  • 探索C嘎嘎:认识string类
  • 【大数据分析与挖掘模型】matlab实现——非线性回归预测模型
  • 【计算机网络 - 基础问题】每日 3 题(五十七)
  • 《等保测评:安全与发展的双轮驱动》
  • 14 C语言中的关键字
  • Prometheus+Telegraf实现自定义监控项配置
  • RDD的常用转换算子
  • Qt实现播放器顶部、底部悬浮工具栏
  • typescript学习计划(一)--简单介绍typescript
  • VUE组件学习 | 六、v-if, v-else-if, v-else组件
  • OpenAI否认今年将发布“Orion”模型,其语音转写工具Whisper被曝存在重大缺陷|AI日报
  • 【C++】--------- 内存管理
  • 【spark】spark structrued streaming读写kafka 使用kerberos认证
  • 【网络篇】计算机网络——链路层详述(笔记)
  • List<T>属性和方法使用
  • springboot整合kafka