当前位置: 首页 > article >正文

AWS EMR上的Spark日志实时搜索关键指标网页呈现的设计和实现

为了在AWS EMR上实现基于Spark的大数据日志处理系统,并通过Kafka、ElasticSearch和Python Flask构建实时搜索与可视化平台,以下是详细的设计与实现方案:


一、架构设计

日志生成端
Apache Kafka
AWS EMR Spark Streaming
ElasticSearch
Python Flask Web
用户浏览器
核心组件说明
  1. Kafka Cluster:日志收集缓冲层
    • 建议使用Amazon MSK(托管Kafka服务)
    • 按日志量规划分区数和副本数
  2. EMR Spark:实时处理引擎
    • 启用Spark Structured Streaming
    • 使用EMR 6.x+版本(内置Spark 3.x)
  3. ElasticSearch:搜索与存储层
    • 推荐使用Amazon OpenSearch Service(托管ES)
    • 配置热/冷节点架构优化成本
  4. Flask Web:可视化层
    • 部署于EC2或ECS容器
    • 集成Jinja2模板与ECharts可视化

二、详细实现步骤

1. Kafka日志采集配置
# 生产者示例(Python)
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='kafka-broker1:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

log_data = {
    "timestamp": datetime.now().isoformat(),
    "level": "ERROR",
    "service": "payment-gateway",
    "message": "Transaction timeout"
}
producer.send('app-logs', log_data)
2. EMR集群配置
  • Bootstrap Action:
    #!/bin/bash
    sudo pip-3.7 install elasticsearch-hadoop
    
  • 集群参数
    {
      "Classification": "spark-defaults",
      "Properties": {
        "spark.sql.streaming.schemaInference": "true",
        "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0"
      }
    }
    
3. Spark Streaming处理(Scala示例)
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-broker1:9092")
  .option("subscribe", "app-logs")
  .load()

// JSON解析与字段提取
val logsDF = df.selectExpr("CAST(value AS STRING)")
  .select(from_json($"value", schema).as("log"))
  .select("log.*")
  .withColumn("timestamp", to_timestamp($"timestamp"))

// 关键指标计算
val metricsDF = logsDF
  .withWatermark("timestamp", "5 minutes")
  .groupBy(window($"timestamp", "1 minute"), $"service")
  .agg(
    count("*").as("total_errors"),
    sum(when($"level" === "ERROR", 1).otherwise(0)).as("critical_errors")
  )

// 写入ElasticSearch
metricsDF.writeStream
  .outputMode("update")
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes", "opensearch-domain:9200")
  .option("es.mapping.id", "window")
  .option("checkpointLocation", "/checkpoint")
  .start("metrics-index/_doc")
4. ElasticSearch索引优化
PUT /metrics-index
{
  "settings": {
    "number_of_shards": 3,
    "refresh_interval": "30s"
  },
  "mappings": {
    "dynamic": "strict",
    "properties": {
      "window": {"type": "date_range", "format": "epoch_millis"},
      "service": {"type": "keyword"},
      "total_errors": {"type": "integer"},
      "critical_errors": {"type": "integer"}
    }
  }
}
5. Flask Web服务实现
# app.py
from flask import Flask, render_template
from elasticsearch import Elasticsearch

app = Flask(__name__)
es = Elasticsearch(['opensearch-domain:9200'])

@app.route('/dashboard')
def dashboard():
    query = {
        "size": 0,
        "aggs": {
            "services": {
                "terms": {"field": "service.keyword"},
                "aggs": {"total_errors": {"sum": {"field": "total_errors"}}}
            }
        }
    }
    res = es.search(index="metrics-index", body=query)
    return render_template('dashboard.html', data=res['aggregations'])

# templates/dashboard.html
<script src="https://cdn.jsdelivr.net/npm/echarts@5.4.0/dist/echarts.min.js"></script>
<div id="chart" style="width:800px;height:600px;"></div>
<script>
  let chart = echarts.init(document.getElementById('chart'));
  let option = {
    title: {text: '实时服务错误统计'},
    tooltip: {},
    xAxis: {data: {{ data.services.buckets|map(attribute='key')|list|tojson }} },
    yAxis: {},
    series: [{type: 'bar', data: {{ data.services.buckets|map(attribute='total_errors.value')|list|tojson }}}]
  };
  chart.setOption(option);
</script>

三、性能优化策略

  1. Kafka优化

    • 启用Snappy压缩
    • 配置linger.ms=20和batch.size=16384
  2. Spark调优

    spark-submit --executor-memory 8G \
                 --executor-cores 4 \
                 --conf spark.sql.shuffle.partitions=200
    
  3. ES写入优化

    • 设置es.batch.size.bytes=10mb
    • 禁用副本写入es.write.operation=create
  4. Web层缓存

    @app.route('/dashboard')
    @cache.cached(timeout=10)  # 使用Flask-Caching
    def dashboard():
        # ...
    

---

### **四、监控与运维**

1. **监控指标**:
   - Kafka Consumer Lag
   - Spark Streaming批处理时间
   - ES JVM Heap使用率

2. **日志排查工具**:
   ```bash
   # 查看Spark Streaming进度
   yarn logs -applicationId <appId> -log_files stdout

   # ES慢查询日志
   GET /_search?pretty&pre_filter_shard_size=1024&typed_keys=true

五、安全方案

  1. 网络隔离

    • 将Kafka/ES部署在私有子网
    • 使用Security Group限制访问源
  2. 认证授权

    es = Elasticsearch(
        hosts=['https://opensearch-domain:9200'],
        http_auth=('admin', 'password'),
        use_ssl=True
    )
    

该方案可实现每秒处理万级日志事件,并在5秒内完成从日志产生到可视化展示的全流程。建议根据实际业务需求调整时间窗口和聚合粒度,可通过增加EMR Task节点实现水平扩展。


http://www.kler.cn/a/529837.html

相关文章:

  • 2181、合并零之间的节点
  • 深入解析:一个简单的浮动布局 HTML 示例
  • 【4Day创客实践入门教程】Day2 探秘微控制器——单片机与MicroPython初步
  • [EAI-028] Diffusion-VLA,能够进行多模态推理和机器人动作预测的VLA模型
  • ArkTS语言介绍
  • TensorFlow 示例摄氏度到华氏度的转换(一)
  • 测压表压力表计量表针头针尾检测数据集VOC+YOLO格式4862张4类别
  • 使用MATLAB进行雷达数据采集可视化
  • MySQL的覆盖索引
  • Games104——网络游戏的架构基础
  • Eigen::Tensor使用帮助
  • Qt常用控件 输入类控件
  • 文本复制兼容方案最佳实现落地。
  • Node.js常用知识
  • 在Qt中,slots 关键字有什么用?
  • GIt使用笔记大全
  • 面经-C语言——指针相关概念总结
  • 1. 【.NET Aspire 从入门到实战】--理论入门与环境搭建--引言
  • vue3-element-plus-admin 框架
  • 如何让跨域文件管控简单又高效
  • Hot100之贪心算法
  • 记录一下【Facebook 】expansionToken参数逆向
  • lstm代码解析1.1
  • Ubuntu 下 nginx-1.24.0 源码分析 main函数 — ngx_cdecl 宏
  • kamailio-Core 说明书 版本:Kamailio SIP Server v6.0.x(稳定版)
  • 效用曲线的三个实例