Design and Implementation of Real-Time Log Search and Key-Metric Web Visualization with Spark on AWS EMR
To build a Spark-based log-processing system on AWS EMR, with Kafka, ElasticSearch, and Python Flask providing real-time search and a visualization front end, the design and implementation are as follows:
I. Architecture Design
Core components
- Kafka Cluster: log collection and buffering layer
  - Amazon MSK (managed Kafka) is recommended
  - Size partition and replica counts to the log volume, as in the sketch below
- EMR Spark: real-time processing engine
  - Use Spark Structured Streaming
  - Use EMR 6.x+ (ships with Spark 3.x)
- ElasticSearch: search and storage layer
  - Amazon OpenSearch Service (managed ES) is recommended
  - Use a hot/cold node architecture to optimize cost
- Flask Web: visualization layer
  - Deploy on EC2 or in ECS containers
  - Integrate Jinja2 templates with ECharts for charts
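For example, the log topic can be created with explicit partition and replica counts through kafka-python's admin client; a minimal sketch, where the broker address and the counts are illustrative:

```python
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers='kafka-broker1:9092')

# 6 partitions for parallel consumption, 3 replicas for durability (illustrative values)
admin.create_topics([
    NewTopic(name='app-logs', num_partitions=6, replication_factor=3)
])
```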
II. Detailed Implementation Steps
1. Kafka Log Ingestion
# Producer example (Python)
import json
from datetime import datetime
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='kafka-broker1:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

log_data = {
    "timestamp": datetime.now().isoformat(),
    "level": "ERROR",
    "service": "payment-gateway",
    "message": "Transaction timeout"
}
producer.send('app-logs', log_data)
producer.flush()  # block until buffered records are sent
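To confirm events are actually reaching the topic, a throwaway consumer can tail it (same broker and topic as above):

```python
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'app-logs',
    bootstrap_servers='kafka-broker1:9092',
    auto_offset_reset='earliest',
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    consumer_timeout_ms=5000,  # stop iterating after 5 s without messages
)
for record in consumer:
    print(record.value)
```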
2. EMR Cluster Configuration
- Bootstrap Action (note: elasticsearch-hadoop ships as a JAR and is resolved through spark.jars.packages below, not via pip; the bootstrap only needs to cover Python-side libraries):
  #!/bin/bash
  sudo pip3 install elasticsearch
- Cluster parameters (the elasticsearch-spark connector is added alongside the Kafka connector so the ES sink in step 3 resolves; pick versions matching your Spark/ES):
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.sql.streaming.schemaInference": "true",
      "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,org.elasticsearch:elasticsearch-spark-30_2.12:7.17.9"
    }
  }
3. Spark Streaming Processing (Scala example)
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

// Schema matching the producer's JSON payload
val schema = new StructType()
  .add("timestamp", StringType)
  .add("level", StringType)
  .add("service", StringType)
  .add("message", StringType)

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-broker1:9092")
  .option("subscribe", "app-logs")
  .load()

// Parse JSON and extract fields
val logsDF = df.selectExpr("CAST(value AS STRING)")
  .select(from_json($"value", schema).as("log"))
  .select("log.*")
  .withColumn("timestamp", to_timestamp($"timestamp"))

// Compute key metrics per service over 1-minute windows
val metricsDF = logsDF
  .withWatermark("timestamp", "5 minutes")
  .groupBy(window($"timestamp", "1 minute"), $"service")
  .agg(
    count("*").as("total_errors"),                                         // all events in the window
    sum(when($"level" === "ERROR", 1).otherwise(0)).as("critical_errors") // ERROR-level events only
  )

// Write to Elasticsearch; a composite id keyed on service + window start
// makes repeated window updates idempotent
metricsDF
  .withColumn("doc_id", concat($"service", lit("_"), $"window.start".cast("long").cast("string")))
  .writeStream
  .outputMode("update")
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes", "opensearch-domain:9200")
  .option("es.mapping.id", "doc_id")
  .option("es.mapping.exclude", "doc_id") // keep the id out of the document body (strict mapping)
  .option("es.write.operation", "upsert")
  .option("checkpointLocation", "/checkpoint")
  .start("metrics-index")
4. ElasticSearch Index Optimization
PUT /metrics-index
{
"settings": {
"number_of_shards": 3,
"refresh_interval": "30s"
},
"mappings": {
"dynamic": "strict",
"properties": {
"window": {"type": "date_range", "format": "epoch_millis"},
"service": {"type": "keyword"},
"total_errors": {"type": "integer"},
"critical_errors": {"type": "integer"}
}
}
}
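With this mapping in place, ad-hoc searches stay cheap. For example, pulling the last hour of per-minute metrics for one service with elasticsearch-py (endpoint as elsewhere in this document):

```python
from elasticsearch import Elasticsearch

es = Elasticsearch(['opensearch-domain:9200'])

# Last hour of windows for one service, newest first
query = {
    "query": {"bool": {"filter": [
        {"term": {"service": "payment-gateway"}},
        {"range": {"window.start": {"gte": "now-1h"}}}
    ]}},
    "sort": [{"window.start": {"order": "desc"}}],
    "size": 60
}
res = es.search(index="metrics-index", body=query)
for hit in res['hits']['hits']:
    print(hit['_source'])
```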
5. Flask Web Service
# app.py
from flask import Flask, render_template
from elasticsearch import Elasticsearch

app = Flask(__name__)
es = Elasticsearch(['opensearch-domain:9200'])

@app.route('/dashboard')
def dashboard():
    # service is mapped as keyword, so it can feed a terms aggregation directly
    query = {
        "size": 0,
        "aggs": {
            "services": {
                "terms": {"field": "service"},
                "aggs": {"total_errors": {"sum": {"field": "total_errors"}}}
            }
        }
    }
    res = es.search(index="metrics-index", body=query)
    return render_template('dashboard.html', data=res['aggregations'])
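A quick smoke test of the route without a browser, via Flask's built-in test client (assumes app.py above is importable and the index already holds data):

```python
# smoke_test.py
from app import app

with app.test_client() as client:
    resp = client.get('/dashboard')
    print(resp.status_code)  # expect 200 when ES is reachable
```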
<!-- templates/dashboard.html -->
<script src="https://cdn.jsdelivr.net/npm/echarts@5.4.0/dist/echarts.min.js"></script>
<div id="chart" style="width:800px;height:600px;"></div>
<script>
let chart = echarts.init(document.getElementById('chart'));
let option = {
title: {text: 'Real-Time Service Error Statistics'},
tooltip: {},
xAxis: {data: {{ data.services.buckets|map(attribute='key')|list|tojson }} },
yAxis: {},
series: [{type: 'bar', data: {{ data.services.buckets|map(attribute='total_errors.value')|list|tojson }}}]
};
chart.setOption(option);
</script>
III. Performance Optimization
- Kafka optimization:
  - Enable Snappy compression
  - Set linger.ms=20 and batch.size=16384 (see the producer sketch below)
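Applied to the kafka-python producer from step 1, those settings might look like this (snappy compression additionally requires the python-snappy package):

```python
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='kafka-broker1:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    compression_type='snappy',  # needs python-snappy installed
    linger_ms=20,               # wait up to 20 ms to fill batches
    batch_size=16384,           # 16 KB batches
)
```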
- Spark tuning:
  spark-submit --executor-memory 8G \
    --executor-cores 4 \
    --conf spark.sql.shuffle.partitions=200
- ES write optimization:
  - Set es.batch.size.bytes=10mb
  - Set es.write.operation=create for append-only indices
  - Disable index replicas during heavy write phases
- Web-layer caching (Flask-Caching):
  from flask_caching import Cache
  cache = Cache(app, config={"CACHE_TYPE": "SimpleCache"})

  @app.route('/dashboard')
  @cache.cached(timeout=10)  # cache the rendered dashboard for 10 s
  def dashboard():
      # ... query ES as in step 5
IV. Monitoring and Operations
1. Monitoring metrics:
   - Kafka consumer lag
   - Spark Streaming batch processing time
   - ES JVM heap usage
2. Log troubleshooting tools:
```bash
# Inspect Spark Streaming progress
yarn logs -applicationId <appId> -log_files stdout
```
   To catch expensive dashboard queries, enable the ES slow query log:
   PUT /metrics-index/_settings
   {"index.search.slowlog.threshold.query.warn": "2s"}
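For the managed components, these metrics are also exposed through CloudWatch. A sketch pulling the OpenSearch Service domain's JVM memory pressure with boto3 (the domain name and account id are placeholders):

```python
import boto3
from datetime import datetime, timedelta

cloudwatch = boto3.client('cloudwatch', region_name='us-east-1')

# JVMMemoryPressure for an OpenSearch Service domain (AWS/ES namespace)
resp = cloudwatch.get_metric_statistics(
    Namespace='AWS/ES',
    MetricName='JVMMemoryPressure',
    Dimensions=[
        {'Name': 'DomainName', 'Value': 'opensearch-domain'},
        {'Name': 'ClientId', 'Value': '123456789012'},  # AWS account id
    ],
    StartTime=datetime.utcnow() - timedelta(hours=1),
    EndTime=datetime.utcnow(),
    Period=300,
    Statistics=['Maximum'],
)
for point in sorted(resp['Datapoints'], key=lambda p: p['Timestamp']):
    print(point['Timestamp'], point['Maximum'])
```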
V. Security
- Network isolation:
  - Deploy Kafka/ES in private subnets
  - Restrict access sources with Security Groups
- Authentication and authorization:
  es = Elasticsearch(
      hosts=['https://opensearch-domain:9200'],
      http_auth=('admin', 'password'),
      use_ssl=True
  )
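If the domain uses IAM rather than internal basic auth, requests can instead be signed with SigV4. A sketch using the requests-aws4auth package; the region, host, and credential resolution are assumptions:

```python
import boto3
from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

region = 'us-east-1'
creds = boto3.Session().get_credentials()
awsauth = AWS4Auth(creds.access_key, creds.secret_key, region, 'es',
                   session_token=creds.token)

es = Elasticsearch(
    hosts=[{'host': 'opensearch-domain', 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)
```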
Properly sized, this design can process on the order of tens of thousands of log events per second, with roughly five seconds of end-to-end latency from log emission to dashboard display. Tune the window length and aggregation granularity to the actual workload, and scale out horizontally by adding EMR task nodes.