【实战ES】实战 Elasticsearch:快速上手与深度实践-7.1.1Spark Streaming实时写入ES
👉 点击关注不迷路
👉 点击关注不迷路
👉 点击关注不迷路
文章大纲
- 7.1.1 Spark Streaming实时写入Elasticsearch深度实践指南
- 1. 核心集成原理
- 1.1 数据流转架构
- 1.2 技术选型对比
- 2. 全链路配置实战
- 2.1 环境准备清单
- 2.2 基础写入模板
- 3. 高级调优策略
- 3.1 `性能调优参数表`
- 3.2 `写入模式对比`
- 4. 企业级应用案例
- 4.1 电商实时日志分析
- 4.2 物联网时序数据
- 5. 容错与监控
- 5.1 故障处理机制
- 5.2 监控指标体系
- 6. 安全加固方案
- 6.1 安全连接配置
- 6.2 权限控制模板
7.1.1 Spark Streaming实时写入Elasticsearch深度实践指南
Spark Streaming与Elasticsearch实时数据管道架构
1. 核心集成原理
1.1 数据流转架构
1.2 技术选型对比
集成方式 | 吞吐量 | 延迟 | 可靠性 | 开发复杂度 |
---|---|---|---|---|
原生ES-Hadoop连接器 | 5万条/秒 | 2-5秒 | 高 | 低 |
自定义Bulk API | 8万条/秒 | 1-3秒 | 中 | 高 |
Logstash管道 | 3万条/秒 | 5-10秒 | 高 | 中 |
Kafka Connect | 6万条/秒 | 3-8秒 | 高 | 中 |
2. 全链路配置实战
2.1 环境准备清单
// 此为 build.sbt 文件,用于配置 Scala 项目的依赖项和构建信息
// 通过定义 libraryDependencies 来指定项目所需的外部库
// 使用 Seq 函数来创建一个依赖项的序列,将多个依赖项组合在一起
libraryDependencies ++= Seq(
// 添加 Apache Spark Core 库依赖
// "org.apache.spark" 是库的组织名称,代表该库来自 Apache Spark 项目
// "spark-core" 是库的名称,它包含了 Spark 的核心功能,如 RDD(弹性分布式数据集)等
// "3.3.1" 是库的版本号,指定使用该版本的 Spark Core
"org.apache.spark" %% "spark-core" % "3.3.1",
// 添加 Apache Spark Streaming 库依赖
// "spark-streaming" 提供了 Spark 进行实时流处理的功能,可用于处理连续的数据流
// 同样使用版本号 "3.3.1",确保与 Spark Core 版本兼容
"org.apache.spark" %% "spark-streaming" % "3.3.1",
// 添加 Elasticsearch 与 Spark 集成的库依赖
// "org.elasticsearch" 是库的组织名称,表明该库来自 Elasticsearch 项目
// "elasticsearch-spark-30" 是用于将 Spark 与 Elasticsearch 集成的库,允许将 Spark 处理的数据写入 Elasticsearch 或从 Elasticsearch 读取数据
// "8.6.1" 是该集成库的版本号
"org.elasticsearch" %% "elasticsearch-spark-30" % "8.6.1",
// 添加 Apache Spark SQL 与 Kafka 集成的库依赖
// "spark-sql-kafka-0-10" 用于在 Spark SQL 中与 Kafka 进行交互,可从 Kafka 主题读取数据或向 Kafka 主题写入数据
// 版本号 "3.3.1" 保证与其他 Spark 组件版本一致
"org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.1"
)
// 版本兼容矩阵
Spark版本 | ES-Hadoop版本 | ES Server版本 | 特性支持 |
---|---|---|---|
3.0.x | 7.17.x | 7.x-8.x | 基础写入 |
3.3.x | 8.6.x | 8.x | 向量化写入/安全连接 |
3.5.x | 8.9.x | 8.x | 原生向量搜索支持 |
2.2 基础写入模板
// 导入 Spark 配置类,用于设置 Spark 应用的各种参数
import org.apache.spark.SparkConf
// 导入 Spark Streaming 上下文类,用于创建和管理流式计算任务
import org.apache.spark.streaming._
// 导入 Kafka 相关工具类,用于从 Kafka 主题接收数据流
import org.apache.spark.streaming.kafka010._
// 导入 Kafka 消费策略枚举
import org.apache.kafka.common.serialization.StringDeserializer
// 导入 Elasticsearch 与 Spark SQL 集成的工具类,用于将数据写入 Elasticsearch
import org.elasticsearch.spark.sql._
// 导入 SparkSession 类,用于创建和管理 Spark 会话
import org.apache.spark.sql.SparkSession
// 创建一个 SparkConf 对象,用于配置 Spark 应用的属性
val conf = new SparkConf()
// 设置 Spark 应用的名称为 "ES-Streaming",方便在 Spark 集群管理界面识别
.setAppName("ES-Streaming")
// 设置 Elasticsearch 节点的地址,这里指定了两个节点,多个节点地址用逗号分隔
.set("es.nodes", "es-node1:9200,es-node2:9200")
// 设置当写入 Elasticsearch 时,如果索引不存在则自动创建
.set("es.index.auto.create", "true")
// 指定 Elasticsearch 文档的 ID 字段为 "id",在写入数据时会根据该字段进行操作
.set("es.mapping.id", "id")
// 创建一个 StreamingContext 对象,用于初始化 Spark Streaming 环境
// 第一个参数是之前创建的 SparkConf 对象,第二个参数是批处理间隔为 5 秒
val ssc = new StreamingContext(conf, Seconds(5))
// 定义 Kafka 参数,指定 Kafka 服务器地址、键和值的反序列化器等
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "kafka-broker1:9092,kafka-broker2:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "spark-streaming-group",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// 使用 KafkaUtils 创建一个直接从 Kafka 主题接收数据的 DStream
// 第一个参数是 StreamingContext 对象,第二个参数是消费策略,这里选择 PreferConsistent 表示均匀分配分区
// 第三个参数是订阅的 Kafka 主题和 Kafka 参数,这里订阅了 "log-topic" 主题
val kafkaStream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String]("log-topic", kafkaParams)
)
// 对从 Kafka 接收到的每个 RDD 进行处理
kafkaStream.foreachRDD { rdd =>
// 导入 Elasticsearch 与 Spark SQL 集成的隐式转换和方法
import org.elasticsearch.spark.sql._
// 创建或获取一个 SparkSession 对象,使用当前 RDD 的 SparkContext 配置
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
// 将 RDD 中的每条消息的值(JSON 字符串)解析为 DataFrame
// rdd.map(_.value) 提取出消息的值部分,spark.read.json 方法将其解析为 DataFrame
val df = spark.read.json(rdd.map(_.value))
// 将 DataFrame 中的数据保存到 Elasticsearch
// 第一个参数指定 Elasticsearch 的索引名称,这里使用了动态索引名,根据 @timestamp 字段的日期生成索引
// 第二个参数是一个 Map,包含了一些写入 Elasticsearch 的配置参数
df.saveToEs("logs-{@timestamp:YYYY.MM.dd}",
Map(
// 启用日期字段的自动检测,让 Elasticsearch 自动识别日期字段
"es.mapping.date.detection" -> "true",
// 设置写入操作类型为 upsert,如果文档已存在则更新,不存在则插入
"es.write.operation" -> "upsert",
// 告诉 Elasticsearch 输入的数据是 JSON 格式
"es.input.json" -> "true"
)
)
}
// 启动 Spark Streaming 上下文,开始接收和处理数据流
ssc.start()
// 等待 StreamingContext 终止,使程序保持运行状态,直到手动停止或发生异常
ssc.awaitTermination()
3. 高级调优策略
3.1 性能调优参数表
参数 | 默认值 | 推荐值 | 调优效果 |
---|---|---|---|
es.batch.size.entries | 1000 | 5000 | 提升35%写入吞吐量 |
es.batch.write.refresh | true | false | 减少60%索引刷新开销 |
es.batch.write.retry.count | 3 | 5 | 提升网络波动下的可靠性 |
es.http.timeout | 1m | 30s | 降低慢节点影响 |
es.nodes.discovery | false | true | 自动发现节点提升负载均衡 |
es.mapping.id | - | “business_id” | 避免文档重复写入 |
3.2 写入模式对比
写入策略 | 配置参数 | 适用场景 | 性能基准(万条/秒) |
---|---|---|---|
批量提交 | es.batch.size.entries=5000 | 高吞吐离线数据 | 8.2 |
实时逐条 | es.batch.size.entries=1 | 低延迟关键事务 | 1.5 |
分区并行 | es.spark.partition.multiplier=4 | 大数据量场景 | 12.4 |
异步回压 | es.spark.streaming.backpressure.enabled=true | 流量波动场景 | 9.8 |
4. 企业级应用案例
4.1 电商实时日志分析
// 定义一个样例类 LogEvent,用于表示结构化的日志事件
// 样例类会自动为其属性生成 getter、setter 方法,还会生成 equals、hashCode、toString 等方法,方便数据处理和比较
case class LogEvent(userId: String,
action: String,
timestamp: Long,
geo: String)
// 对从 Kafka 接收到的数据流 kafkaStream 进行处理
val logStream = kafkaStream
// 从 Kafka 消息中提取消息的值部分,因为 Kafka 消息是键值对形式,这里只关注值
.map(_.value)
// 使用 transform 方法对 RDD 进行转换操作
.transform(rdd => {
// 使用 SparkSession 的 read.json 方法将 RDD 中的 JSON 字符串解析为 DataFrame
// 然后使用 as[LogEvent] 方法将 DataFrame 转换为 Dataset[LogEvent],即按照 LogEvent 样例类的结构进行映射
spark.read.json(rdd).as[LogEvent]
// 过滤掉 userId 为 null 的日志事件,确保处理的数据中 userId 是有效的
.filter(_.userId != null)
// 使用自定义的 geohashUDF 函数对 geo 字段进行处理,生成一个新的列 geo_hash
// 这里假设 geohashUDF 是一个已经定义好的用户自定义函数,用于将地理位置信息转换为地理哈希值
.withColumn("geo_hash", geohashUDF(col("geo")))
})
// 对处理后的日志流 logStream 中的每个 RDD 进行操作
logStream.foreachRDD { rdd =>
// 将 RDD 中的数据保存到 Elasticsearch
// 第一个参数指定 Elasticsearch 的索引名称,使用动态索引名,根据 timestamp 字段的日期生成索引
rdd.saveToEs("logs-{timestamp:yyyy-MM-dd}",
// 第二个参数是一个 Map,包含了一些写入 Elasticsearch 的配置参数
Map(
// 指定 Elasticsearch 文档的 ID 生成规则,使用 userId 和 timestamp 拼接而成
// 这样可以确保每个文档的 ID 是唯一的,避免数据重复
"es.mapping.id" -> "concat(userId,'_',timestamp)",
// 设置写入操作类型为 create,表示只有当文档不存在时才进行写入操作
// 如果文档已存在,写入操作会失败
"es.write.operation" -> "create",
// 启用 Spark 写入 Elasticsearch 后的清理操作,释放相关资源
"es.spark.cleanup" -> "true"
)
)
}
- 性能指标:
数据规模 | 批次间隔 | 处理延迟 | 吞吐量 | 资源消耗 |
---|---|---|---|---|
1亿条/天 | 5秒 | 3.2秒 | 6.8万条/秒 | 32核/64GB |
5亿条/天 | 10秒 | 6.5秒 | 9.4万条/秒 | 64核/128GB |
10亿条/天 | 15秒 | 9.1秒 | 12.1万条/秒 | 128核/256GB |
4.2 物联网时序数据
- 从 Kafka 接收 IoT 数据,对数据进行解析和转换,并将处理后的数据连同元数据一起写入 Elasticsearch 的功能。
// 定义一个包含 Elasticsearch 配置信息的 Map,键值对用于设置与 Elasticsearch 交互时的各种参数
val iotConf = Map(
// 设置是否仅通过广域网(WAN)连接到 Elasticsearch 节点
// 值为 "true" 表示只使用广域网连接,通常在集群部署环境中使用,避免本地网络干扰
"es.nodes.wan.only" -> "true",
// 指定 Elasticsearch 中的资源(索引和文档类型),这里使用动态索引名
// 索引名会根据数据中的 device_type 字段和 @timestamp 字段的年月部分动态生成
// 例如,不同的设备类型和时间会对应不同的索引,便于数据的组织和管理
"es.resource" -> "iot-{device_type}-{@timestamp:YYYY-MM}",
// 配置 Elasticsearch 文档的路由规则
// 使用 device_id 作为路由键,确保相同 device_id 的文档会被路由到同一分片,提高查询性能
"es.mapping.routing" -> "device_id",
// 设置是否自动创建 Elasticsearch 索引
// 值为 "true" 表示如果写入数据时指定的索引不存在,会自动创建该索引
"es.index.auto.create" -> "true"
)
// 对从 Kafka 接收到的数据流 kafkaStream 进行处理
kafkaStream
// 使用 map 函数对 kafkaStream 中的每条消息应用 parseIoTData 函数
// parseIoTData 函数用于将 Kafka 消息解析为特定的 IoT 数据格式
.map(parseIoTData)
// 对处理后的 DStream 中的每个 RDD 进行操作
.foreachRDD { rdd =>
// 对 RDD 中的每个元素应用 transformData 函数
// transformData 函数用于对解析后的 IoT 数据进行进一步的转换和处理
rdd.map(transformData)
// 将处理后的数据连同元数据一起保存到 Elasticsearch
// iotConf 是之前定义的 Elasticsearch 配置信息,用于指定写入的索引、路由规则等
.saveToEsWithMeta(iotConf)
}
- 优化效果:
优化项 | 优化前 | 优化后 | 提升比例 |
---|---|---|---|
写入吞吐量 | 4.2万条/秒 | 7.8万条/秒 | 85%↑ |
索引延迟 | 8.5秒 | 2.3秒 | 73%↓ |
存储压缩率 | 1:3 | 1:5 | 66%↑ |
查询响应时间 | 420ms | 120ms | 71%↓ |
5. 容错与监控
5.1 故障处理机制
// 配置 Elasticsearch 批量写入的重试策略
// 设置批量写入操作失败时的重试次数
// 这里将重试次数设置为 5 次,意味着当批量写入操作失败时,Spark 会尝试重新执行该操作,最多重试 5 次
conf.set("es.batch.write.retry.count", "5")
// 设置每次重试之间的等待时间
// 这里将等待时间设置为 10 秒,即每次重试操作之间会间隔 10 秒,避免频繁重试给 Elasticsearch 带来过大压力
conf.set("es.batch.write.retry.wait", "10s")
// 配置死信队列处理,当向 Elasticsearch 写入数据时出现错误,将无法写入的数据存储到死信队列
// 指定 DataFrame 写入的数据格式为 Elasticsearch
df.write
.format("es")
// 设置处理 Elasticsearch 写入错误的处理类
// 这里指定使用 com.example.ESErrorHandler 类来处理写入过程中出现的错误
// 该类需要开发者自定义实现,用于处理各种 Elasticsearch 写入错误情况
.option("es.write.rest.error.handlers", "com.example.ESErrorHandler")
// 设置死信队列的存储路径
// 当 com.example.ESErrorHandler 类处理错误时,将无法写入 Elasticsearch 的数据存储到指定的路径 /dlq 下
// 可以后续对死信队列中的数据进行分析和处理,以找出写入失败的原因并尝试重新写入
.option("es.write.rest.error.handler.com.example.ESErrorHandler.deadLetterPath", "/dlq")
// 执行写入操作,将 DataFrame 中的数据写入到 Elasticsearch 中
.save()
5.2 监控指标体系
指标类别 | 监控项 | 告警阈值 | 采集方式 |
---|---|---|---|
资源使用 | Executor CPU使用率 | >85%持续5分钟 | Prometheus |
数据流量 | 批次处理速率 | <1000条/秒 | Spark Metrics |
写入健康 | ES Bulk拒绝率 | >1% | ES Cluster Stats |
时效性 | 端到端延迟 | >10秒 | 打点日志分析 |
6. 安全加固方案
6.1 安全连接配置
// 以下配置用于设置 Spark 与 Elasticsearch 之间的安全连接及相关参数
// 开启 Elasticsearch 连接的 SSL 加密
// 当设置为 "true" 时,Spark 与 Elasticsearch 之间的通信将使用 SSL 协议进行加密,确保数据在传输过程中的安全性
conf.set("es.net.ssl", "true")
// 指定 SSL 密钥库的存储位置
// 密钥库(keystore)通常包含客户端的私钥和证书,用于 SSL 握手过程中的身份验证
// 这里设置密钥库的路径为 /path/to/keystore.jks,确保 Spark 能够找到并使用该密钥库进行安全连接
conf.set("es.net.ssl.keystore.location", "/path/to/keystore.jks")
// 设置密钥库的访问密码
// 密钥库是加密存储的,需要使用密码来解锁
// 这里设置的密码 "password" 用于访问 /path/to/keystore.jks 密钥库
conf.set("es.net.ssl.keystore.pass", "password")
// 指定 SSL 信任库的存储位置
// 信任库(truststore)包含了客户端信任的证书颁发机构(CA)的证书
// 通过设置信任库的路径为 /path/to/truststore.jks,Spark 可以验证 Elasticsearch 服务器提供的证书是否由受信任的 CA 颁发
conf.set("es.net.ssl.truststore.location", "/path/to/truststore.jks")
// 禁用 Elasticsearch 节点的自动发现功能
// 默认情况下,Elasticsearch 客户端会尝试自动发现集群中的其他节点
// 当设置为 "false" 时,Spark 将不会自动发现 Elasticsearch 集群中的其他节点,而是使用预先配置的节点信息进行连接
// 这种设置通常用于固定节点配置的场景,或者在网络环境复杂的情况下避免自动发现带来的问题
conf.set("es.nodes.discovery", "false")
6.2 权限控制模板
// 向 Elasticsearch 发送 PUT 请求,用于创建一个名为 spark_writer 的安全角色
PUT _security/role/spark_writer
{
// 定义该角色在集群层面拥有的权限
"cluster": [
// 赋予该角色对集群进行监控的权限,拥有此权限可以查看集群的状态信息、节点信息等监控数据
"monitor"
],
// 定义该角色在索引层面拥有的权限,可针对不同的索引进行不同的权限设置
"indices": [
{
// 指定该权限所作用的索引名称模式,这里表示对所有以 "logs-" 开头的索引生效
"names": ["logs-*"],
// 定义该角色对匹配上述模式的索引所拥有的权限
"privileges": [
// 允许该角色创建新的索引,当需要向符合 "logs-*" 模式的索引写入数据但该索引不存在时,可以自动创建
"create_index",
// 允许该角色向索引中写入新的文档,即可以执行插入操作
"index"
],
// 配置字段级别的安全权限,用于控制对索引中特定字段的访问
"field_security": {
// 授予该角色对索引中除了特定字段外的所有字段的访问权限
"grant": ["*"],
// 明确列出不授予访问权限的字段,这里是 "password" 和 "credit_card" 字段
// 这意味着即使该角色有对索引的操作权限,但无法访问和操作这些敏感字段
"except": ["password", "credit_card"]
}
}
]
}
附录:运维工具箱
工具类别 | 推荐方案 | 核心功能 |
---|---|---|
性能诊断 | Spark History Server | 作业执行分析 |
日志分析 | ELK Stack | 管道异常追踪 |
资源监控 | Grafana+Prometheus | 实时资源监控 |
数据校验 | Great Expectations | 数据质量检查 |
最佳实践:
生产环境必须启用SSL加密
- 建议采用
结构化数据格式(Parquet/JSON)
定期执行索引模板优化
- 建立数据Schema版本控制