当前位置: 首页 > article >正文

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1.标准版)

问题

        项目里使用了 AzureBlob 存储了用户上传的各种资源文件,近期 AzureBlob 的流量费用增长很快,想通过分析Blob的日志,获取一些可用的信息,所以有了这个需求:将存储账户的日志(读写,审计,监控等)通过Azure Event Hubs 发送到我的 elasticsearch, 并且发送给elasticsearch的日志进行必要的加工,即从中解析出一些特别的字段,方便分析。

实现步骤

1. 配置 Azure Blob 存储日志记录

Azure Blob 存储提供日志记录功能,你可以启用存储帐户的活动日志(例如:读取、写入、审计等)以便后续处理。具体步骤如下:

  • 启用日志记录
    • 进入 Azure 门户,选择你的存储账户。
    • 监控 部分,选择 诊断设置
    • 创建一个新的诊断设置,启用 Blob 存储 服务的相关日志项(例如,读取、写入、删除、审计日志等)。
    • 选择将日志输出到 Event Hubs,并选择一个已配置的 Event Hub 实例作为目标。
2. 创建 Azure Event Hubs

Event Hubs 将会作为一个中介,接收来自 Azure Blob 存储的日志并将其转发到 Elasticsearch。具体步骤如下:

  • 创建 Event Hubs 命名空间
    • 在 Azure 门户中,选择 创建资源,然后选择 Event Hubs
    • 创建一个新的 Event Hubs 命名空间并选择合适的区域。
    • 在命名空间下创建一个新的 Event Hub 实例。

3. 配置 Azure Function 解析日志

为了根据你的需要解析日志数据并发送到 Elasticsearch,你可以使用 Azure Function。此函数将从 Event Hubs 获取日志数据,解析日志内容并将它们转发到 Elasticsearch。你可以选择编写自定义代码来实现日志解析。

  • 创建 Azure Function

    • 在 Azure 门户中,选择 创建资源,然后选择 Function App
    • 创建一个新的 Function App,选择合适的区域和运行时堆栈(通常是 .NETNode.js)。
    • 在 Function App 中,创建一个新的 Event Hub Trigger 函数,以便在日志数据到达 Event Hub 时自动触发。
  • 编写函数代码

    • 在函数代码中,你可以使用以下技术解析日志数据:

      • 解析日志内容:提取你需要的字段(如操作类型、文件名、时间戳、请求者 IP 等)。
      • 使用 JSON 处理库:如果日志是 JSON 格式,你可以使用相关库(如 JSON.parse().NETJsonSerializer)来解析数据。
      • 创建 Elasticsearch 客户端:使用 Azure Function 的客户端库或 Elastic Search .NET 客户端(或其他语言的客户端)将解析后的日志发送到 Elasticsearch。

      示例代码(JavaScript,Node.js 环境):

      const { Client } = require('@elastic/elasticsearch');
      const client = new Client({ node: 'http://your-elasticsearch-server:9200' });
      
      module.exports = async function (context, eventHubMessages) {
          for (const message of eventHubMessages) {
              try {
                  const logData = JSON.parse(message.body);
                  
                  // 提取你需要的字段
                  const logEntry = {
                      timestamp: logData.timestamp,
                      operation: logData.operationName,
                      blobName: logData.blobName,
                      requestor: logData.requestorIp
                  };
      
                  // 将数据发送到 Elasticsearch
                  await client.index({
                      index: 'storage-logs',
                      document: logEntry
                  });
              } catch (err) {
                  context.log('Error processing message: ', err);
              }
          }
      };
  • 部署 Azure Function:部署并运行该 Azure Function,它将自动从 Event Hubs 获取日志消息,进行解析,并将结果推送到 Elasticsearch。

4. 配置 Elasticsearch 和 Kibana

在 Elasticsearch 中,你需要创建合适的索引模板和映射,以便于后续检索和分析。你还可以使用 Kibana 来可视化日志数据。

  • 配置索引模板和映射

    • 在 Elasticsearch 中,你可以为 storage-logs 索引定义自定义映射,确保字段类型(如时间戳、IP 地址等)符合预期,方便查询。

    示例映射(适用于上述日志):

    PUT /storage-logs
    {
        "mappings": {
            "properties": {
                "timestamp": { "type": "date" },
                "operation": { "type": "keyword" },
                "blobName": { "type": "text" },
                "requestor": { "type": "ip" }
            }
        }
    }
    
  • 创建 Kibana 仪表板

    • 在 Kibana 中,创建新的索引模式(Index Pattern),指向 storage-logs 索引。
    • 然后,使用 Kibana 创建查询、仪表板和可视化,帮助你对存储日志进行实时分析。
5. 使用 Logstash (可选)

如果你不想使用 Azure Function 来解析日志数据,另一个选择是使用 Logstash,它是 Elastic Stack 的一部分,可以帮助你从 Event Hubs 中提取数据并进行解析,然后将其发送到 Elasticsearch。

  • 配置 Logstash

    • 安装 Logstash 并配置它从 Azure Event Hubs 获取数据。可以使用 Event Hub Input Plugin 连接到 Event Hub。
    • 在 Logstash 配置中,使用过滤器(如 json 过滤器)来解析日志数据并转换字段,最后将数据输出到 Elasticsearch。

    示例 Logstash 配置文件:

    input {
      azure_event_hubs {
        connection_string => "your-event-hub-connection-string"
        codec => json
      }
    }
    
    filter {
      json {
        source => "message"
      }
    
      mutate {
        add_field => {
          "timestamp" => "%{[timestamp]}"
          "operation" => "%{[operationName]}"
        }
      }
    }
    
    output {
      elasticsearch {
        hosts => ["http://your-elasticsearch-server:9200"]
        index => "storage-logs"
        document_type => "_doc"
      }
    }
    
6. 测试和监控
  • 测试:在你配置完成之后,开始生成一些 Azure Blob 存储的日志(例如,进行文件操作)。确保这些日志通过 Event Hub 传递到 Azure Function 或 Logstash,成功解析并被发送到 Elasticsearch。
  • 监控:通过 Azure Monitor、Event Hubs 和 Elasticsearch 的监控工具来查看系统的运行状态和日志处理情况。

注意事项

  • 性能:Event Hubs 可以支持高吞吐量的日志数据传输,但要确保 Event Hubs 的吞吐量单位(TU)配置合理,以处理日志的高并发传输。
  • 数据延迟:从 Blob 存储到 Elasticsearch 的数据传输可能存在一定的延迟,尤其是在高负载情况下。可以通过优化 Event Hubs 和 Azure Function 的配置来减少延迟。
  • 日志解析:在解析日志时,要确保正则表达式、JSON 解析等操作高效执行,避免性能瓶颈。
  • 安全性:确保在使用 Event Hubs 和 Elasticsearch 时启用适当的身份验证和加密,避免敏感数据泄露。
  • 索引管理:Elasticsearch 中的日志数据可能会迅速增长,因此需要定期管理索引,如设置索引生命周期管理(ILM)以避免索引膨胀。

通过这种方式,你可以实现将 Azure Blob 存储的日志通过 Event Hubs 发送到 Elasticsearch,并根据需要解析并优化日志的结构,以便在 Elasticsearch 中进行更高效的查询和分析。


前情后续:

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1.标准版)-CSDN博客

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(2.换掉付费的Event Hubs)-CSDN博客

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(3.纯python的实惠版)-CSDN博客





http://www.kler.cn/a/513203.html

相关文章:

  • BH1750使用程序
  • 力扣 739. 每日温度
  • vue3使用音频audio标签
  • 使用nginx搭建通用的图片代理服务器,支持http/https/重定向式图片地址
  • ChatGPT被曝存在爬虫漏洞,OpenAI未公开承认
  • 内存故障原因与诊断(Reasons and Diagnosis of Memory Failure)
  • FPGA开发中的团队协作:构建高效协同的关键路径
  • WPF3-在xaml中引用其他程序集的名称空间
  • RestTemplate-调用远端接口应用场景
  • 根据经纬度查询地理位置信息API
  • React技术栈搭配(全栈)(MERN栈、PERN栈)
  • [Css]父元素监听鼠标移入子元素
  • Kotlin 2.1.0 入门教程(七)
  • 若依报错:无法访问com.ruoyi.common.annotation
  • 微信小程序使用picker根据接口给的省市区的数据实现省市区三级联动或者省市区街道等多级联动
  • 在Android Studio中如何实现综合实验MP3播放器(保姆级教程)
  • Java学习,List 元素替换
  • 服务器安装ESXI7.0系统及通过离线包方式升级到ESXI8.0
  • Y3编辑器功能指引
  • Redis单线程为什么能这么快
  • grafana+prometheus监控linux指标
  • 美区TikTok解封后如何回归使用?
  • 软件授权产品介绍
  • 算法题目总结-栈和队列
  • 数据库基础知识:理论、E-R图、事务、原则
  • 【VOS源码解析-2024CVPR-Cutie】1、train_wrapper结构解析