
Reading File Data with Structured Streaming on Spark 3.4.4

The Maven dependency file (pom.xml) is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.lh.pblh123</groupId>
    <artifactId>spark2024</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

<!--    Use the Aliyun mirror for faster dependency downloads in China -->
    <repositories>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>https://maven.aliyun.com/repository/public</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.4.4</version>
            <exclusions>  <!-- exclude the conflicting slf4j-log4j12 binding -->
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.4.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.4.4</version> <!-- adjust to match your Spark version -->
        </dependency>

        <dependency>
            <groupId>com.typesafe.play</groupId>
            <artifactId>play-json_2.12</artifactId>
            <version>2.10.6</version>
        </dependency>

        <!-- Spark Streaming dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.4.4</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>

    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
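            <!-- Note: org.scala-tools:maven-scala-plugin is a legacy plugin; its
                 maintained successor is net.alchim31.maven:scala-maven-plugin,
                 which may be required on newer Maven/JDK setups. -->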
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>2.12.17</scalaVersion>
                    <args>
                        <arg>-target:jvm-1.8</arg>
                    </args>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
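With this pom.xml in place, the project can be packaged from the project root in the usual way (the jar name below follows from the artifactId and version declared above):

mvn clean package
# produces target/spark2024-1.0-SNAPSHOT.jar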

The source code is as follows:

File generator code: SparkFilesourceGenerate.scala
package cn.lh.pblh123.spark2024.theorycourse.charpter8

import play.api.libs.json.Json

import java.io.{File, PrintWriter}
import java.nio.file.{Files, Paths, StandardCopyOption}
import java.util.concurrent.ThreadLocalRandom

object SparkFilesourceGenerate {

  val test_data_temp_dir = "datas/tmpdirs/"
  val test_data_dir = "datas/tmpdirs/testdata/"

  val action_def = List("login", "logout", "purchase")
  val district_def = List("beijing", "shanghai", "guizhou", "hangzhou")

  // If the test directory exists, remove its old contents, then (re)create it
  def test_setup(): Unit = {
    val f = new File(test_data_temp_dir)
    if (f.exists()) {
      try {
        val files = f.listFiles()
        if (files != null) {
          for (file <- files) {
            delete_dir(file)
          }
        }
      } catch {
        case e: Exception => println(s"Error deleting files in $test_data_temp_dir: ${e.getMessage}")
      }
    }
    try {
      if (f.exists()) f.delete()
      f.mkdirs()
      // The streaming source directory must exist before files are moved into it
      new File(test_data_dir).mkdirs()
    } catch {
      case e: Exception => println(s"Error creating directory $test_data_temp_dir: ${e.getMessage}")
    }
  }

  // Recursively delete a file or directory
  def delete_dir(dir: File): Unit = {
    if (dir.isDirectory) {
      try {
        val files = dir.listFiles()
        if (files != null) {
          for (file <- files) {
            delete_dir(file)
          }
        }
        dir.delete() // remove the now-empty directory itself
      } catch {
        case e: Exception => println(s"Error deleting directory $dir: ${e.getMessage}")
      }
    } else if (dir.isFile) {
      try {
        dir.delete()
      } catch {
        case e: Exception => println(s"Error deleting file $dir: ${e.getMessage}")
      }
    }
  }

  // Restore the test environment by cleaning out the source directory
  def test_cleanup(): Unit = {
    val f = new File(test_data_dir)
    if (f.exists()) {
      try {
        val files = f.listFiles()
        if (files != null) {
          for (file <- files) {
            delete_dir(file)
          }
        }
      } catch {
        case e: Exception => println(s"Error deleting files in $test_data_dir: ${e.getMessage}")
      }
    }
  }

  // Write the data to a temp file, then move it into the streaming source directory
  def generate_test_data(filename: String, data: String): Unit = {
    val f = new File(test_data_temp_dir + filename)
    val writer = new PrintWriter(f)
    try {
      writer.write(data)
    } finally {
      writer.close()
    }
    try {
      Files.move(Paths.get(test_data_temp_dir + filename), Paths.get(test_data_dir + filename), StandardCopyOption.REPLACE_EXISTING)
    } catch {
      case e: Exception => println(s"Error moving file from $test_data_temp_dir to $test_data_dir: ${e.getMessage}")
    }
  }

  /**
   * Generate a single JSON record
   * @return a JSON string
   */
  def generate_json_data(): String = {
    val eventTime = System.currentTimeMillis() / 1000 // epoch seconds
    val action = action_def(ThreadLocalRandom.current().nextInt(action_def.size)) // random action
    val district = district_def(ThreadLocalRandom.current().nextInt(district_def.size)) // random district
    Json.obj(
      "eventTime" -> eventTime,
      "action" -> action,
      "district" -> district
    ).toString()
  }

  def main(args: Array[String]): Unit = {
    try {
      // Initialize the test environment
      test_setup()

      // Generate 1000 files in a loop
      for (i <- 1 to 1000) {
        // Build the file name
        val filename = "pblh-mail-" + i + ".json"

        // StringBuilder to accumulate the generated JSON records
        val sb = new StringBuilder

        // Generate 100 JSON records per file
        for (_ <- 1 to 100) {
          sb.append(generate_json_data()).append("\n")
        }

        // Write the generated data to a file
        generate_test_data(filename, sb.toString())

        // Progress message
        Console.println(s"Generated file $filename, progress: ${i}/1000")

        // Sleep for one second between files
        Thread.sleep(1000)
      }

      // Clean up the test environment
      test_cleanup()
    } catch {
      // Catch and print any exception thrown in main
      case e: Exception => println(s"Error in main method: ${e.getMessage}")
    }
  }

}
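Each generated file contains 100 newline-delimited JSON records. A single record produced by generate_json_data looks like this (timestamp illustrative):

{"eventTime":1732089600,"action":"purchase","district":"beijing"}

Note that eventTime is written as epoch seconds; Spark's JSON reader interprets an integer value as epoch seconds when the target column is declared TimestampType, which is what the consumer's schema below relies on.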

Structured Streaming file-processing code, which counts the "purchase" records in the files

StructuredFilePurchaseCount.scala
package cn.lh.pblh123.spark2024.theorycourse.charpter8

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{asc, window}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
import org.apache.log4j.{Level, Logger}

object StructuredFilePurchaseCount {

  def main(args: Array[String]): Unit = {

    if (args.length != 2) {
      System.err.println("Usage: StructuredFilePurchaseCount <jsonFilePath>")
      System.exit(1)
    }

    val masterUrl = args(0)
    val jsonFilePath = args(1) // source path, taken from the command line

    // Validate the Spark master URL
    if (!masterUrl.startsWith("local") && !masterUrl.startsWith("spark")) {
      System.err.println("Invalid Spark Master URL: " + masterUrl)
      System.exit(1)
    }

    // Check that jsonFilePath exists
    if (!new java.io.File(jsonFilePath).exists()) {
      System.err.println("JSON file does not exist: " + jsonFilePath)
      System.exit(1)
    }

    val spark = SparkSession.builder.appName("StructuredFilePurchaseCount").master(masterUrl).getOrCreate()

    Logger.getLogger("org").setLevel(Level.ERROR)

    var query: Option[org.apache.spark.sql.streaming.StreamingQuery] = None

    try {
      // Define the input schema
      val schema = StructType(Array(
        StructField("eventTime", TimestampType, true),
        StructField("action", StringType, true),
        StructField("district", StringType, true)
      ))

      // Read the JSON files as a stream
      val maxFilesPerTrigger = 100 // adjust to your workload
      val lines = spark.readStream.format("json").schema(schema).option("maxFilesPerTrigger", maxFilesPerTrigger).load(jsonFilePath)

      // Debug aids: print the input schema; uncomment below to dump the raw stream
      lines.printSchema()
//      lines.writeStream.format("console").option("truncate", "false").start().awaitTermination(10000)

      import spark.implicits._
      // Keep only non-null purchase events
      val filteredLines = lines.filter($"action".isNotNull && $"action" === "purchase" && $"eventTime".isNotNull)

      // Debug aid: uncomment to inspect the filtered stream
//      filteredLines.writeStream.format("console").option("truncate", "false").start().awaitTermination(10000)

      // Window duration for the event-time aggregation
      val windowDuration = "1 minutes"

      // Count events per district and per 1-minute window
      val windowedCounts = filteredLines.groupBy($"district", window($"eventTime", windowDuration))
        .count()
        .sort(asc("window"))

      // Debug aid: uncomment to inspect the windowed counts
//      windowedCounts.writeStream.format("console").option("truncate", "false").start().awaitTermination(10000)

      // Start the streaming query: write the full result table to the console every 10 seconds
      query = Some(windowedCounts.writeStream.outputMode("complete")
        .format("console").option("truncate", "false")
        .trigger(Trigger.ProcessingTime("10 seconds"))
        .start())

      // Block until the query terminates
      query.get.awaitTermination()

    } catch {
      case e: Exception =>
        val logger = Logger.getLogger(this.getClass)
        logger.error("An error occurred: " + e.getMessage, e)
        System.exit(1)
    } finally {
      query.foreach(_.stop())
      spark.stop()
    }
  }

}
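For intuition about the window aggregation, the same grouping can be tried on a static DataFrame. A minimal sketch (data and timestamps illustrative; assumes an existing SparkSession named spark):

import org.apache.spark.sql.functions.window
import spark.implicits._

val demo = Seq(
  ("2024-11-20 10:03:27", "purchase", "beijing"),
  ("2024-11-20 10:03:41", "purchase", "beijing"),
  ("2024-11-20 10:04:05", "purchase", "shanghai")
).toDF("eventTime", "action", "district")
  .withColumn("eventTime", $"eventTime".cast("timestamp"))

// The first two events fall into the window [10:03:00, 10:04:00),
// the third into [10:04:00, 10:05:00).
demo.groupBy($"district", window($"eventTime", "1 minutes")).count().show(false)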

The programs run as follows.

First, start the file generator to simulate the JSON file data.
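The generator is an ordinary JVM main class, so it can be run from the IDE; one command-line option is spark-submit (class and jar names taken from the code and pom.xml above; --packages pulls in play-json, which is not part of the Spark distribution):

spark-submit \
  --packages com.typesafe.play:play-json_2.12:2.10.6 \
  --class cn.lh.pblh123.spark2024.theorycourse.charpter8.SparkFilesourceGenerate \
  target/spark2024-1.0-SNAPSHOT.jar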

Then start StructuredFilePurchaseCount to process the JSON files.
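One way to launch the consumer, passing the master URL and the source directory that the generator writes to (paths from the generator code; adjust to your environment):

spark-submit \
  --class cn.lh.pblh123.spark2024.theorycourse.charpter8.StructuredFilePurchaseCount \
  target/spark2024-1.0-SNAPSHOT.jar \
  local[2] datas/tmpdirs/testdata/

Each 10-second trigger then prints the full result table to the console, roughly like this (counts illustrative):

-------------------------------------------
Batch: 1
-------------------------------------------
+--------+------------------------------------------+-----+
|district|window                                    |count|
+--------+------------------------------------------+-----+
|beijing |{2024-11-20 10:03:00, 2024-11-20 10:04:00}|34   |
|shanghai|{2024-11-20 10:03:00, 2024-11-20 10:04:00}|29   |
+--------+------------------------------------------+-----+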

