当前位置: 首页 > article >正文

【实战ES】实战 Elasticsearch:快速上手与深度实践-7.1.1Spark Streaming实时写入ES

👉 点击关注不迷路
👉 点击关注不迷路
👉 点击关注不迷路


文章大纲

  • 7.1.1 Spark Streaming实时写入Elasticsearch深度实践指南
    • 1. 核心集成原理
      • 1.1 数据流转架构
      • 1.2 技术选型对比
    • 2. 全链路配置实战
      • 2.1 环境准备清单
      • 2.2 基础写入模板
    • 3. 高级调优策略
      • 3.1 `性能调优参数表`
      • 3.2 `写入模式对比`
    • 4. 企业级应用案例
      • 4.1 电商实时日志分析
      • 4.2 物联网时序数据
    • 5. 容错与监控
      • 5.1 故障处理机制
      • 5.2 监控指标体系
    • 6. 安全加固方案
      • 6.1 安全连接配置
      • 6.2 权限控制模板

7.1.1 Spark Streaming实时写入Elasticsearch深度实践指南

  • Spark Streaming与Elasticsearch实时数据管道架构
开始
数据源接入
创建Spark Streaming上下文
数据接收与转换
数据是否符合格式?
数据预处理
数据清洗与转换
数据分区与批量处理
配置Elasticsearch连接
写入Elasticsearch
写入是否成功?
监控与日志记录
错误处理与重试
是否继续接收数据?
结束

1. 核心集成原理

1.1 数据流转架构

Kafka/Flume/S3
ETL
聚合计算
数据源
Spark Streaming
数据处理
结构化数据
指标数据
Elasticsearch Writer
Elasticsearch Cluster
Kibana可视化

1.2 技术选型对比

集成方式吞吐量延迟可靠性开发复杂度
原生ES-Hadoop连接器5万条/秒2-5秒
自定义Bulk API8万条/秒1-3秒
Logstash管道3万条/秒5-10秒
Kafka Connect6万条/秒3-8秒

2. 全链路配置实战

2.1 环境准备清单

// 此为 build.sbt 文件,用于配置 Scala 项目的依赖项和构建信息
// 通过定义 libraryDependencies 来指定项目所需的外部库

// 使用 Seq 函数来创建一个依赖项的序列,将多个依赖项组合在一起
libraryDependencies ++= Seq(
  // 添加 Apache Spark Core 库依赖
  // "org.apache.spark" 是库的组织名称,代表该库来自 Apache Spark 项目
  // "spark-core" 是库的名称,它包含了 Spark 的核心功能,如 RDD(弹性分布式数据集)等
  // "3.3.1" 是库的版本号,指定使用该版本的 Spark Core
  "org.apache.spark" %% "spark-core" % "3.3.1",
  
  // 添加 Apache Spark Streaming 库依赖
  // "spark-streaming" 提供了 Spark 进行实时流处理的功能,可用于处理连续的数据流
  // 同样使用版本号 "3.3.1",确保与 Spark Core 版本兼容
  "org.apache.spark" %% "spark-streaming" % "3.3.1",
  
  // 添加 Elasticsearch 与 Spark 集成的库依赖
  // "org.elasticsearch" 是库的组织名称,表明该库来自 Elasticsearch 项目
  // "elasticsearch-spark-30" 是用于将 Spark 与 Elasticsearch 集成的库,允许将 Spark 处理的数据写入 Elasticsearch 或从 Elasticsearch 读取数据
  // "8.6.1" 是该集成库的版本号
  "org.elasticsearch" %% "elasticsearch-spark-30" % "8.6.1",
  
  // 添加 Apache Spark SQL 与 Kafka 集成的库依赖
  // "spark-sql-kafka-0-10" 用于在 Spark SQL 中与 Kafka 进行交互,可从 Kafka 主题读取数据或向 Kafka 主题写入数据
  // 版本号 "3.3.1" 保证与其他 Spark 组件版本一致
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.1"
)

// 版本兼容矩阵
Spark版本ES-Hadoop版本ES Server版本特性支持
3.0.x7.17.x7.x-8.x基础写入
3.3.x8.6.x8.x向量化写入/安全连接
3.5.x8.9.x8.x原生向量搜索支持

2.2 基础写入模板

// 导入 Spark 配置类,用于设置 Spark 应用的各种参数
import org.apache.spark.SparkConf
// 导入 Spark Streaming 上下文类,用于创建和管理流式计算任务
import org.apache.spark.streaming._
// 导入 Kafka 相关工具类,用于从 Kafka 主题接收数据流
import org.apache.spark.streaming.kafka010._
// 导入 Kafka 消费策略枚举
import org.apache.kafka.common.serialization.StringDeserializer
// 导入 Elasticsearch 与 Spark SQL 集成的工具类,用于将数据写入 Elasticsearch
import org.elasticsearch.spark.sql._
// 导入 SparkSession 类,用于创建和管理 Spark 会话
import org.apache.spark.sql.SparkSession

// 创建一个 SparkConf 对象,用于配置 Spark 应用的属性
val conf = new SparkConf()
  // 设置 Spark 应用的名称为 "ES-Streaming",方便在 Spark 集群管理界面识别
  .setAppName("ES-Streaming")
  // 设置 Elasticsearch 节点的地址,这里指定了两个节点,多个节点地址用逗号分隔
  .set("es.nodes", "es-node1:9200,es-node2:9200")
  // 设置当写入 Elasticsearch 时,如果索引不存在则自动创建
  .set("es.index.auto.create", "true")
  // 指定 Elasticsearch 文档的 ID 字段为 "id",在写入数据时会根据该字段进行操作
  .set("es.mapping.id", "id")

// 创建一个 StreamingContext 对象,用于初始化 Spark Streaming 环境
// 第一个参数是之前创建的 SparkConf 对象,第二个参数是批处理间隔为 5 秒
val ssc = new StreamingContext(conf, Seconds(5))

// 定义 Kafka 参数,指定 Kafka 服务器地址、键和值的反序列化器等
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "kafka-broker1:9092,kafka-broker2:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-streaming-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// 使用 KafkaUtils 创建一个直接从 Kafka 主题接收数据的 DStream
// 第一个参数是 StreamingContext 对象,第二个参数是消费策略,这里选择 PreferConsistent 表示均匀分配分区
// 第三个参数是订阅的 Kafka 主题和 Kafka 参数,这里订阅了 "log-topic" 主题
val kafkaStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String]("log-topic", kafkaParams)
)

// 对从 Kafka 接收到的每个 RDD 进行处理
kafkaStream.foreachRDD { rdd =>
  // 导入 Elasticsearch 与 Spark SQL 集成的隐式转换和方法
  import org.elasticsearch.spark.sql._
  
  // 创建或获取一个 SparkSession 对象,使用当前 RDD 的 SparkContext 配置
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  
  // 将 RDD 中的每条消息的值(JSON 字符串)解析为 DataFrame
  // rdd.map(_.value) 提取出消息的值部分,spark.read.json 方法将其解析为 DataFrame
  val df = spark.read.json(rdd.map(_.value))
  
  // 将 DataFrame 中的数据保存到 Elasticsearch
  // 第一个参数指定 Elasticsearch 的索引名称,这里使用了动态索引名,根据 @timestamp 字段的日期生成索引
  // 第二个参数是一个 Map,包含了一些写入 Elasticsearch 的配置参数
  df.saveToEs("logs-{@timestamp:YYYY.MM.dd}",
    Map(
      // 启用日期字段的自动检测,让 Elasticsearch 自动识别日期字段
      "es.mapping.date.detection" -> "true",
      // 设置写入操作类型为 upsert,如果文档已存在则更新,不存在则插入
      "es.write.operation" -> "upsert",
      // 告诉 Elasticsearch 输入的数据是 JSON 格式
      "es.input.json" -> "true"
    )
  )
}

// 启动 Spark Streaming 上下文,开始接收和处理数据流
ssc.start()
// 等待 StreamingContext 终止,使程序保持运行状态,直到手动停止或发生异常
ssc.awaitTermination()

3. 高级调优策略

3.1 性能调优参数表

参数默认值推荐值调优效果
es.batch.size.entries10005000提升35%写入吞吐量
es.batch.write.refreshtruefalse减少60%索引刷新开销
es.batch.write.retry.count35提升网络波动下的可靠性
es.http.timeout1m30s降低慢节点影响
es.nodes.discoveryfalsetrue自动发现节点提升负载均衡
es.mapping.id-“business_id”避免文档重复写入

3.2 写入模式对比

写入策略配置参数适用场景性能基准(万条/秒)
批量提交es.batch.size.entries=5000高吞吐离线数据8.2
实时逐条es.batch.size.entries=1低延迟关键事务1.5
分区并行es.spark.partition.multiplier=4大数据量场景12.4
异步回压es.spark.streaming.backpressure.enabled=true流量波动场景9.8

4. 企业级应用案例

4.1 电商实时日志分析

// 定义一个样例类 LogEvent,用于表示结构化的日志事件
// 样例类会自动为其属性生成 getter、setter 方法,还会生成 equals、hashCode、toString 等方法,方便数据处理和比较
case class LogEvent(userId: String, 
                   action: String, 
                   timestamp: Long,
                   geo: String)

// 对从 Kafka 接收到的数据流 kafkaStream 进行处理
val logStream = kafkaStream
  // 从 Kafka 消息中提取消息的值部分,因为 Kafka 消息是键值对形式,这里只关注值
  .map(_.value)
  // 使用 transform 方法对 RDD 进行转换操作
  .transform(rdd => {
    // 使用 SparkSession 的 read.json 方法将 RDD 中的 JSON 字符串解析为 DataFrame
    // 然后使用 as[LogEvent] 方法将 DataFrame 转换为 Dataset[LogEvent],即按照 LogEvent 样例类的结构进行映射
    spark.read.json(rdd).as[LogEvent]
      // 过滤掉 userId 为 null 的日志事件,确保处理的数据中 userId 是有效的
      .filter(_.userId != null)
      // 使用自定义的 geohashUDF 函数对 geo 字段进行处理,生成一个新的列 geo_hash
      // 这里假设 geohashUDF 是一个已经定义好的用户自定义函数,用于将地理位置信息转换为地理哈希值
      .withColumn("geo_hash", geohashUDF(col("geo")))
  })

// 对处理后的日志流 logStream 中的每个 RDD 进行操作
logStream.foreachRDD { rdd =>
  // 将 RDD 中的数据保存到 Elasticsearch
  // 第一个参数指定 Elasticsearch 的索引名称,使用动态索引名,根据 timestamp 字段的日期生成索引
  rdd.saveToEs("logs-{timestamp:yyyy-MM-dd}",
    // 第二个参数是一个 Map,包含了一些写入 Elasticsearch 的配置参数
    Map(
      // 指定 Elasticsearch 文档的 ID 生成规则,使用 userId 和 timestamp 拼接而成
      // 这样可以确保每个文档的 ID 是唯一的,避免数据重复
      "es.mapping.id" -> "concat(userId,'_',timestamp)",
      // 设置写入操作类型为 create,表示只有当文档不存在时才进行写入操作
      // 如果文档已存在,写入操作会失败
      "es.write.operation" -> "create",
      // 启用 Spark 写入 Elasticsearch 后的清理操作,释放相关资源
      "es.spark.cleanup" -> "true"
    )
  )
}
  • 性能指标
数据规模批次间隔处理延迟吞吐量资源消耗
1亿条/天5秒3.2秒6.8万条/秒32核/64GB
5亿条/天10秒6.5秒9.4万条/秒64核/128GB
10亿条/天15秒9.1秒12.1万条/秒128核/256GB

4.2 物联网时序数据

  • 从 Kafka 接收 IoT 数据,对数据进行解析和转换,并将处理后的数据连同元数据一起写入 Elasticsearch 的功能。
// 定义一个包含 Elasticsearch 配置信息的 Map,键值对用于设置与 Elasticsearch 交互时的各种参数
val iotConf = Map(
  // 设置是否仅通过广域网(WAN)连接到 Elasticsearch 节点
  // 值为 "true" 表示只使用广域网连接,通常在集群部署环境中使用,避免本地网络干扰
  "es.nodes.wan.only" -> "true",
  
  // 指定 Elasticsearch 中的资源(索引和文档类型),这里使用动态索引名
  // 索引名会根据数据中的 device_type 字段和 @timestamp 字段的年月部分动态生成
  // 例如,不同的设备类型和时间会对应不同的索引,便于数据的组织和管理
  "es.resource" -> "iot-{device_type}-{@timestamp:YYYY-MM}",
  
  // 配置 Elasticsearch 文档的路由规则
  // 使用 device_id 作为路由键,确保相同 device_id 的文档会被路由到同一分片,提高查询性能
  "es.mapping.routing" -> "device_id",
  
  // 设置是否自动创建 Elasticsearch 索引
  // 值为 "true" 表示如果写入数据时指定的索引不存在,会自动创建该索引
  "es.index.auto.create" -> "true"
)

// 对从 Kafka 接收到的数据流 kafkaStream 进行处理
kafkaStream
  // 使用 map 函数对 kafkaStream 中的每条消息应用 parseIoTData 函数
  // parseIoTData 函数用于将 Kafka 消息解析为特定的 IoT 数据格式
  .map(parseIoTData)
  // 对处理后的 DStream 中的每个 RDD 进行操作
  .foreachRDD { rdd =>
    // 对 RDD 中的每个元素应用 transformData 函数
    // transformData 函数用于对解析后的 IoT 数据进行进一步的转换和处理
    rdd.map(transformData)
      // 将处理后的数据连同元数据一起保存到 Elasticsearch
      // iotConf 是之前定义的 Elasticsearch 配置信息,用于指定写入的索引、路由规则等
      .saveToEsWithMeta(iotConf)
  }
  • 优化效果
优化项优化前优化后提升比例
写入吞吐量4.2万条/秒7.8万条/秒85%↑
索引延迟8.5秒2.3秒73%↓
存储压缩率1:31:566%↑
查询响应时间420ms120ms71%↓

5. 容错与监控

5.1 故障处理机制

// 配置 Elasticsearch 批量写入的重试策略

// 设置批量写入操作失败时的重试次数
// 这里将重试次数设置为 5 次,意味着当批量写入操作失败时,Spark 会尝试重新执行该操作,最多重试 5 次
conf.set("es.batch.write.retry.count", "5")

// 设置每次重试之间的等待时间
// 这里将等待时间设置为 10 秒,即每次重试操作之间会间隔 10 秒,避免频繁重试给 Elasticsearch 带来过大压力
conf.set("es.batch.write.retry.wait", "10s")

// 配置死信队列处理,当向 Elasticsearch 写入数据时出现错误,将无法写入的数据存储到死信队列

// 指定 DataFrame 写入的数据格式为 Elasticsearch
df.write
  .format("es")
  // 设置处理 Elasticsearch 写入错误的处理类
  // 这里指定使用 com.example.ESErrorHandler 类来处理写入过程中出现的错误
  // 该类需要开发者自定义实现,用于处理各种 Elasticsearch 写入错误情况
  .option("es.write.rest.error.handlers", "com.example.ESErrorHandler")
  // 设置死信队列的存储路径
  // 当 com.example.ESErrorHandler 类处理错误时,将无法写入 Elasticsearch 的数据存储到指定的路径 /dlq 下
  // 可以后续对死信队列中的数据进行分析和处理,以找出写入失败的原因并尝试重新写入
  .option("es.write.rest.error.handler.com.example.ESErrorHandler.deadLetterPath", "/dlq")
  // 执行写入操作,将 DataFrame 中的数据写入到 Elasticsearch 中
  .save()

5.2 监控指标体系

指标类别监控项告警阈值采集方式
资源使用Executor CPU使用率>85%持续5分钟Prometheus
数据流量批次处理速率<1000条/秒Spark Metrics
写入健康ES Bulk拒绝率>1%ES Cluster Stats
时效性端到端延迟>10秒打点日志分析

6. 安全加固方案

6.1 安全连接配置

// 以下配置用于设置 Spark 与 Elasticsearch 之间的安全连接及相关参数

// 开启 Elasticsearch 连接的 SSL 加密
// 当设置为 "true" 时,Spark 与 Elasticsearch 之间的通信将使用 SSL 协议进行加密,确保数据在传输过程中的安全性
conf.set("es.net.ssl", "true")

// 指定 SSL 密钥库的存储位置
// 密钥库(keystore)通常包含客户端的私钥和证书,用于 SSL 握手过程中的身份验证
// 这里设置密钥库的路径为 /path/to/keystore.jks,确保 Spark 能够找到并使用该密钥库进行安全连接
conf.set("es.net.ssl.keystore.location", "/path/to/keystore.jks")

// 设置密钥库的访问密码
// 密钥库是加密存储的,需要使用密码来解锁
// 这里设置的密码 "password" 用于访问 /path/to/keystore.jks 密钥库
conf.set("es.net.ssl.keystore.pass", "password")

// 指定 SSL 信任库的存储位置
// 信任库(truststore)包含了客户端信任的证书颁发机构(CA)的证书
// 通过设置信任库的路径为 /path/to/truststore.jks,Spark 可以验证 Elasticsearch 服务器提供的证书是否由受信任的 CA 颁发
conf.set("es.net.ssl.truststore.location", "/path/to/truststore.jks")

// 禁用 Elasticsearch 节点的自动发现功能
// 默认情况下,Elasticsearch 客户端会尝试自动发现集群中的其他节点
// 当设置为 "false" 时,Spark 将不会自动发现 Elasticsearch 集群中的其他节点,而是使用预先配置的节点信息进行连接
// 这种设置通常用于固定节点配置的场景,或者在网络环境复杂的情况下避免自动发现带来的问题
conf.set("es.nodes.discovery", "false")

6.2 权限控制模板

// 向 Elasticsearch 发送 PUT 请求,用于创建一个名为 spark_writer 的安全角色
PUT _security/role/spark_writer
{
    // 定义该角色在集群层面拥有的权限
    "cluster": [
        // 赋予该角色对集群进行监控的权限,拥有此权限可以查看集群的状态信息、节点信息等监控数据
        "monitor"
    ],
    // 定义该角色在索引层面拥有的权限,可针对不同的索引进行不同的权限设置
    "indices": [
        {
            // 指定该权限所作用的索引名称模式,这里表示对所有以 "logs-" 开头的索引生效
            "names": ["logs-*"],
            // 定义该角色对匹配上述模式的索引所拥有的权限
            "privileges": [
                // 允许该角色创建新的索引,当需要向符合 "logs-*" 模式的索引写入数据但该索引不存在时,可以自动创建
                "create_index",
                // 允许该角色向索引中写入新的文档,即可以执行插入操作
                "index"
            ],
            // 配置字段级别的安全权限,用于控制对索引中特定字段的访问
            "field_security": { 
                // 授予该角色对索引中除了特定字段外的所有字段的访问权限
                "grant": ["*"],
                // 明确列出不授予访问权限的字段,这里是 "password" 和 "credit_card" 字段
                // 这意味着即使该角色有对索引的操作权限,但无法访问和操作这些敏感字段
                "except": ["password", "credit_card"]
            }
        }
    ]
}

附录:运维工具箱

工具类别推荐方案核心功能
性能诊断Spark History Server作业执行分析
日志分析ELK Stack管道异常追踪
资源监控Grafana+Prometheus实时资源监控
数据校验Great Expectations数据质量检查

最佳实践

  1. 生产环境必须启用SSL加密
  2. 建议采用结构化数据格式(Parquet/JSON)
  3. 定期执行索引模板优化
  4. 建立数据Schema版本控制

http://www.kler.cn/a/579999.html

相关文章:

  • java学习总结三:springMVC
  • 【批量图片识别改名】如何自动识别图片文字并命名,一次性识别多张图片内容作为文件名,基于WPF和百度OCR,教你如何实现
  • 从头开始开发基于虹软SDK的人脸识别考勤系统(python+RTSP开源)(四)
  • Java本地方法根据线上地址下载图片到本地然后返回本地可以访问的地址
  • c语言笔记 一维数组与二维数组
  • python爬虫:Android自动化工具Auto.js的详细使用
  • RabbitMQ高级特性----生产者确认机制
  • craco.config.js是什么?
  • Java剪刀石头布
  • 小程序实现存储用户注册信息功能 前后端+数据库联调
  • 【2025】基于php+vue的舞蹈培训机构管理系统(源码+文档+调试+图文修改+答疑)
  • 静态网页的爬虫(以电影天堂为例)
  • 基于SpringBoot实现旅游酒店平台功能三
  • 【Academy】Web 缓存欺骗 ------ Web cache deception
  • 深入理解隐式类型转换:从原理到应用
  • FPGA|Verilog-自己写的SPI驱动
  • 我们在开发时,什么时候用到虚函数和纯虚函数?
  • MacOS安装FFmpeg和FFprobe
  • 洛谷 P1433 吃奶酪
  • Spring Cloud 负载均衡器架构选型