将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(2 换掉付费的Event Hubs)
前情回顾:
将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1)-CSDN博客
前边的方案是挺好的,但 Azure Event Hubs 是付费服务,我这里只是一个获取日志进行必要的分析,并且不要求实时性,关键 没钱,那怎么办呢?
替代方案:
如果对实时性没有严格要求,并且希望避免使用付费的 Azure Event Hubs 服务,可以采用更经济的方式,比如将 Azure Blob 存储的日志直接发送到 Elasticsearch 或通过 Azure Storage Queue 和 Azure Function 来实现。这些方法可以有效减少成本,同时满足日志解析和分析需求。
1. 直接从 Azure Blob 存储读取日志并处理
思路
直接从 Azure Blob 存储 中读取生成的日志文件,解析所需字段后发送到 Elasticsearch。这种方式适合没有实时性需求的场景,定期运行任务即可。
实现步骤
-
启用 Azure 存储日志记录:
- 在存储账户中启用 诊断设置。
- 将日志记录输出到同一存储账户的 Blob 容器中,例如
insights-logs
。
-
使用 Azure Function 或定时任务读取日志:
- 编写脚本(可以用 Python、PowerShell、C# 等语言),从指定的 Blob 容器中下载日志文件。
- 解析日志文件,根据需要提取字段并格式化为 Elasticsearch 可接受的 JSON 数据。
- 将数据批量发送到 Elasticsearch。
-
示例代码:
- 使用 Python 和
azure-storage-blob
库:
from azure.storage.blob import BlobServiceClient from elasticsearch import Elasticsearch import json # Azure Blob Storage 配置 storage_account_name = "your_storage_account" storage_account_key = "your_storage_account_key" container_name = "insights-logs" # Elasticsearch 配置 es = Elasticsearch("http://your-elasticsearch-server:9200") def process_blob_logs(): blob_service_client = BlobServiceClient( account_url=f"https://{storage_account_name}.blob.core.windows.net", credential=storage_account_key ) container_client = blob_service_client.get_container_client(container_name) # 列出日志文件 blobs = container_client.list_blobs() for blob in blobs: blob_client = container_client.get_blob_client(blob) log_data = blob_client.download_blob().readall().decode('utf-8') # 假设日志是 JSON 格式 for line in log_data.splitlines(): try: log_entry = json.loads(line) # 提取你需要的字段 parsed_entry = { "timestamp": log_entry["time"], "operation": log_entry["operationName"], "blobName": log_entry.get("blobName"), "requestorIp": log_entry.get("requestorIpAddress"), } # 写入 Elasticsearch es.index(index="storage-logs", document=parsed_entry) except Exception as e: print(f"Error processing log entry: {e}") if __name__ == "__main__": process_blob_logs()
- 使用 Python 和
-
设置定时运行:
- 如果使用 Azure Function,配置 Timer Trigger 定期运行该脚本。
- 如果使用本地脚本,使用
cron
(Linux)或计划任务(Windows)实现定时任务。
2. 使用 Azure Storage Queue
思路
利用 Azure Storage Queue 作为消息队列替代 Event Hubs,用于暂存日志文件元数据或小型消息,然后由处理程序(如 Azure Function 或脚本)消费队列消息,读取并处理日志。
实现步骤
-
启用日志记录:
- 将日志写入 Blob 存储。
-
配置 Azure Storage Queue:
- 创建一个 Azure Queue。
- 编写脚本将日志的元数据(例如 Blob 的路径)写入队列。
-
编写处理脚本:
- 消费队列消息,根据消息中的 Blob 路径读取并解析日志文件。
-
示例代码:
- 使用 Python 和
azure-storage-queue
:
from azure.storage.queue import QueueClient from azure.storage.blob import BlobServiceClient import json queue_name = "your-queue" storage_account_name = "your_storage_account" storage_account_key = "your_storage_account_key" def process_queue_messages(): queue_client = QueueClient( account_url=f"https://{storage_account_name}.queue.core.windows.net", credential=storage_account_key, queue_name=queue_name ) blob_service_client = BlobServiceClient( account_url=f"https://{storage_account_name}.blob.core.windows.net", credential=storage_account_key ) messages = queue_client.receive_messages() for msg in messages: blob_path = msg.content # 假设队列消息中存储的是 Blob 路径 container_name, blob_name = blob_path.split('/', 1) blob_client = blob_service_client.get_blob_client(container_name, blob_name) log_data = blob_client.download_blob().readall().decode('utf-8') # 解析日志并发送到 Elasticsearch(同上) print(f"Processing blob: {blob_path}") queue_client.delete_message(msg) # 删除已处理消息 if __name__ == "__main__": process_queue_messages()
- 使用 Python 和
3. 直接读取和解析日志文件后推送到 Elasticsearch
这种方法可以完全避开队列服务,直接通过脚本定期从存储账户下载日志文件并解析后推送到 Elasticsearch。
注意事项
- 定时任务频率:根据日志生成的频率设定脚本运行的时间间隔。
- 日志存储策略:Blob 日志文件可能会快速增长,考虑启用存储生命周期管理规则定期删除过期日志文件。
- 安全性:确保存储帐户密钥或连接字符串的安全性,可使用 Azure 的 Managed Identity 替代密钥。
比较与选择
方案 | 适用场景 | 实现复杂度 | 成本 |
---|---|---|---|
直接从 Blob 存储读取 | 数据量不大,无实时性需求 | 低 | 低 |
使用 Storage Queue | 有一定的队列需求,异步处理 | 中 | 较低 |
使用 Azure Event Hubs | 需要高吞吐量和实时性 | 高 | 高 |
推荐:
- 如果数据量不大,选择 直接从 Blob 存储读取。
- 如果需要解耦消息和处理程序,可以选择 Azure Storage Queue。
前情后续:
将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1)-CSDN博客
将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(2)-CSDN博客
将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(3)-CSDN博客