Developing a Structured Streaming application on Spark 3.4.4 that reads file data
The Maven dependency file (pom.xml) is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.lh.pblh123</groupId>
    <artifactId>spark2024</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <!-- Use a domestic (China) mirror repository to speed up Maven downloads -->
    <repositories>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>https://maven.aliyun.com/repository/public</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency> <!-- Spark core dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.4.4</version>
            <exclusions> <!-- exclude the conflicting slf4j-log4j12 logging binding -->
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency> <!-- Spark SQL dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.4.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.4.4</version> <!-- adjust to the version you actually use -->
        </dependency>
        <dependency>
            <groupId>com.typesafe.play</groupId>
            <artifactId>play-json_2.12</artifactId>
            <version>2.10.6</version>
        </dependency>
        <!-- Spark Streaming dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.4.4</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>2.12.17</scalaVersion>
                    <args>
                        <arg>-target:jvm-1.8</arg>
                    </args>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
The source code is as follows:
File generator code: SparkFilesourceGenerate.scala
package cn.lh.pblh123.spark2024.theorycourse.charpter8
import play.api.libs.json.Json
import java.io.{File, PrintWriter}
import java.nio.file.{Files, Paths, StandardCopyOption}
import java.util.concurrent.ThreadLocalRandom
object SparkFilesourceGenerate {
  val test_data_temp_dir = "datas/tmpdirs/"
  val test_data_dir = "datas/tmpdirs/testdata/"
  val action_def = List("login", "logout", "purchase")
  val district_def = List("beijing", "shanghai", "guizhou", "hangzhou")
  // If the test directory exists, remove the old data, then (re)create the directories
  def test_setup(): Unit = {
    val f = new File(test_data_temp_dir)
    if (f.exists()) {
      try {
        val files = f.listFiles()
        if (files != null) {
          for (file <- files) {
            delete_dir(file)
          }
        }
      } catch {
        case e: Exception => println(s"Error deleting files in $test_data_temp_dir: ${e.getMessage}")
      }
    }
    try {
      if (f.exists()) f.delete()
      f.mkdirs() // create the temp directory, including any missing parent directories
      new File(test_data_dir).mkdirs() // also create the target directory that generated files are moved into
    } catch {
      case e: Exception => println(s"Error creating directory $test_data_temp_dir: ${e.getMessage}")
    }
  }
  // Recursively delete a file or a directory and its contents
  def delete_dir(dir: File): Unit = {
    if (dir.isDirectory) {
      try {
        val files = dir.listFiles()
        if (files != null) {
          for (file <- files) {
            delete_dir(file)
          }
        }
        dir.delete() // delete the now-empty directory itself
      } catch {
        case e: Exception => println(s"Error deleting directory $dir: ${e.getMessage}")
      }
    } else if (dir.isFile) {
      try {
        dir.delete()
      } catch {
        case e: Exception => println(s"Error deleting file $dir: ${e.getMessage}")
      }
    }
  }
  // Restore the test environment by cleaning up the test data directory
  def test_cleanup(): Unit = {
    val f = new File(test_data_dir)
    if (f.exists()) {
      try {
        val files = f.listFiles()
        if (files != null) {
          for (file <- files) {
            delete_dir(file)
          }
        }
      } catch {
        case e: Exception => println(s"Error deleting files in $test_data_dir: ${e.getMessage}")
      }
    }
  }
  // Write a test file into the temp directory, then move it into the test data directory
  def generate_test_data(filename: String, data: String): Unit = {
    val f = new File(test_data_temp_dir + filename)
    val writer = new PrintWriter(f)
    try {
      writer.write(data)
    } finally {
      writer.close()
    }
    try {
      Files.move(Paths.get(test_data_temp_dir + filename), Paths.get(test_data_dir + filename), StandardCopyOption.REPLACE_EXISTING)
    } catch {
      case e: Exception => println(s"Error moving file from $test_data_temp_dir to $test_data_dir: ${e.getMessage}")
    }
  }
  /**
   * Generate a single JSON record
   * @return JSON string
   */
  def generate_json_data(): String = {
    val eventTime = System.currentTimeMillis() / 1000 // event time in epoch seconds
    val action = action_def(ThreadLocalRandom.current().nextInt(action_def.length)) // pick a random action
    val district = district_def(ThreadLocalRandom.current().nextInt(district_def.length)) // pick a random district
    Json.obj(
      "eventTime" -> eventTime,
      "action" -> action,
      "district" -> district
    ).toString()
  }
  def main(args: Array[String]): Unit = {
    try {
      // Initialize the test environment
      test_setup()
      // Generate 1000 files in a loop
      for (i <- 1 to 1000) {
        // Build the file name
        val filename = "pblh-mail-" + i + ".json"
        // StringBuilder used to accumulate the generated JSON records
        val sb = new StringBuilder
        // Generate 100 JSON records per file
        for (_ <- 1 to 100) {
          sb.append(generate_json_data()).append("\n")
        }
        // Write the generated data to the file
        generate_test_data(filename, sb.toString())
        // Print a progress message
        Console.println(s"Generated file $filename, progress: $i/1000")
        // Sleep for 1 second after each iteration
        Thread.sleep(1000)
      }
      // Clean up the test environment
      test_cleanup()
    } catch {
      // Catch and print any exception thrown in the main method
      case e: Exception => println(s"Error in main method: ${e.getMessage}")
    }
  }
}
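Before running the streaming job, it can be useful to spot-check the generator's output with a plain batch read. The following is a minimal sketch and not part of the original post: it assumes the generator has already written some files under datas/tmpdirs/testdata/, and the object name CheckGeneratedFiles is made up for illustration. Each line in the files should look roughly like {"eventTime":1730000000,"action":"purchase","district":"beijing"}.
import org.apache.spark.sql.SparkSession

object CheckGeneratedFiles {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("CheckGeneratedFiles")
      .master("local[*]") // assumption: a quick local run, only for verification
      .getOrCreate()

    // Batch-read the generated JSON lines with schema inference;
    // each record should contain eventTime, action and district fields.
    val df = spark.read.json("datas/tmpdirs/testdata/")
    df.printSchema()
    df.show(5, truncate = false)

    spark.stop()
  }
}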
Structured Streaming file-processing code, which counts the purchase records in the files:
StructuredFilePurchaseCount.scala
package cn.lh.pblh123.spark2024.theorycourse.charpter8
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{asc, window}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
import org.apache.log4j.{Level, Logger}
object StructuredFilePurchaseCount {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: StructuredFilePurchaseCount <masterUrl> <jsonFilePath>")
      System.exit(1)
    }
    val masterUrl = args(0)
    val jsonFilePath = args(1) // JSON directory path taken from the command line
    // Check that masterUrl is valid
    if (!masterUrl.startsWith("local") && !masterUrl.startsWith("spark")) {
      System.err.println("Invalid Spark Master URL: " + masterUrl)
      System.exit(1)
    }
    // Check that jsonFilePath exists
    if (!new java.io.File(jsonFilePath).exists()) {
      System.err.println("JSON file path does not exist: " + jsonFilePath)
      System.exit(1)
    }
    val spark = SparkSession.builder.appName("StructuredFilePurchaseCount").master(masterUrl).getOrCreate()
    Logger.getLogger("org").setLevel(Level.ERROR)
    var query: Option[org.apache.spark.sql.streaming.StreamingQuery] = None
    try {
      // Define the schema
      val schema = StructType(Array(
        StructField("eventTime", TimestampType, true),
        StructField("action", StringType, true),
        StructField("district", StringType, true)
      ))
      // Read the JSON files as a stream
      val maxFilesPerTrigger = 100 // adjust to your workload
      val lines = spark.readStream.format("json").schema(schema).option("maxFilesPerTrigger", maxFilesPerTrigger).load(jsonFilePath)
      // Debugging aid: inspect the schema of the input stream
      lines.printSchema()
      // lines.writeStream.format("console").option("truncate", "false").start().awaitTermination(10000)
      import spark.implicits._
      // Keep only purchase events
      val filteredLines = lines.filter($"action".isNotNull && $"action" === "purchase" && $"eventTime".isNotNull)
      // Debugging aid: inspect the filtered data
      // filteredLines.writeStream.format("console").option("truncate", "false").start().awaitTermination(10000)
      // Define the window duration
      val windowDuration = "1 minutes"
      // Group by district and event-time window, then count
      val windowedCounts = filteredLines.groupBy($"district", window($"eventTime", windowDuration))
        .count()
        .sort(asc("window"))
      // Debugging aid: inspect the windowed counts
      // windowedCounts.writeStream.format("console").option("truncate", "false").start().awaitTermination(10000)
      // Start the streaming query
      query = Some(windowedCounts.writeStream.outputMode("complete")
        .format("console").option("truncate", "false")
        .trigger(Trigger.ProcessingTime("10 seconds"))
        .start())
      // Wait for the query to terminate
      query.get.awaitTermination()
    } catch {
      case e: Exception =>
        val logger = Logger.getLogger(this.getClass)
        logger.error("An error occurred: " + e.getMessage, e)
        System.exit(1)
    } finally {
      query.foreach(_.stop())
      spark.stop()
    }
  }
}
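A design note on the sink: the query above runs in complete output mode, so Spark keeps the state for every window it has ever produced. The following is a minimal sketch, not part of the original post, of an append-mode alternative; it assumes the same file layout (datas/tmpdirs/testdata/), a local master, and a hypothetical object name. With an event-time watermark, the windowed aggregation can run in append mode and state for windows older than the watermark can be discarded; sorting is only supported in complete mode, so the sort is omitted here.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

object StructuredFilePurchaseCountAppend {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("StructuredFilePurchaseCountAppend").master("local[2]").getOrCreate()
    import spark.implicits._

    val schema = StructType(Array(
      StructField("eventTime", TimestampType, true),
      StructField("action", StringType, true),
      StructField("district", StringType, true)
    ))

    // Stream the generated JSON files and keep only purchase events
    val purchases = spark.readStream.format("json").schema(schema)
      .load("datas/tmpdirs/testdata/")
      .filter($"action" === "purchase" && $"eventTime".isNotNull)

    // With a watermark, the windowed aggregation can run in append mode,
    // so Spark can drop state for windows older than the watermark.
    val windowedCounts = purchases
      .withWatermark("eventTime", "2 minutes") // tolerate up to 2 minutes of late data
      .groupBy($"district", window($"eventTime", "1 minutes"))
      .count()

    windowedCounts.writeStream
      .outputMode("append") // a window's count is emitted once the watermark passes its end
      .format("console")
      .option("truncate", "false")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()
      .awaitTermination()
  }
}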
The programs run as follows:
Start the file generator program to simulate the JSON file data.
Start StructuredFilePurchaseCount to process the JSON files.
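For reference, one way to run the two programs (an assumed local setup, not taken from the original post): start SparkFilesourceGenerate first with no arguments so that datas/tmpdirs/testdata/ is created and keeps receiving new files, then launch StructuredFilePurchaseCount with two program arguments such as local[2] datas/tmpdirs/testdata/, which satisfy the masterUrl and jsonFilePath checks in the code.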