当前位置: 首页 > article >正文

spark读取数据性能提升

1. 背景

spark默认的jdbc只会用单task读取数据,读取大数据量时,效率低。

2. 解决方案

根据分区字段,如日期进行划分,增加task数量提升效率。

  /**
    * 返回每个task按时间段划分的过滤语句
    * @param startDate
    * @param endDate
    * @param threadCount
    * @return
    */
  def getPredicateDates(startDate: String, endDate: String, threadCount: Int): Array[String] = {
    getPredicates(startDate, endDate, threadCount).map(x=>s"recordDate>='${x._1}' and recordDate <='${x._2}'")
  }


  /**
    * 将startDate到endDate间的日期,根据给定的threadCount参数,做时间段划分,例如:
    * getPredicates("2017-01-01", "2017-01-31", 10)
    * 返回:
    * 2017-01-01 -> 2017-01-04
    * 2017-01-05 -> 2017-01-08
    * 2017-01-09 -> 2017-01-12
    * 2017-01-13 -> 2017-01-16
    * 2017-01-17 -> 2017-01-20
    * 2017-01-21 -> 2017-01-24
    * 2017-01-25 -> 2017-01-28
    * 2017-01-29 -> 2017-01-31
    *
    * @param startDate   开始日期
    * @param endDate     结束日期
    * @param threadCount 线程数
    * @return 包含各个连续时段的数组
    */
  def getPredicates(startDate: String, endDate: String, threadCount: Int): Array[(String, String)] = {
    val dayDiff = DateTimeUtils.rangeDay(startDate, endDate)

    val buff = new ArrayBuffer[(String, String)]()

    if (dayDiff <= threadCount) {
      //天数差小于期望的线程数,则按照每天一个线程处理
      var tempDate = startDate
      while (tempDate <= endDate) {
        buff += (tempDate -> tempDate)
        tempDate = DateTimeUtils.dateAddOne(tempDate)
      }
    } else {
      //天数差大于期望的线程数,则按照线程数对时间段切分
      val offset = (dayDiff / threadCount).toInt
      var tempDate = startDate

      while (DateTimeUtils.dateAddN(tempDate, offset) <= endDate) {
        buff += (tempDate -> DateTimeUtils.dateAddN(tempDate, offset))
        tempDate = DateTimeUtils.dateAddOne(DateTimeUtils.dateAddN(tempDate, offset))
      }

      if (tempDate != endDate) {
        buff += (tempDate -> endDate)
      }
    }

    buff.toArray
  }

DateTimeUtils工具类
import java.text.SimpleDateFormat
import java.util.{Calendar, Date, Locale}

object DateTimeUtils {

  def rangeDay(startDateStr: String, endDateStr: String): Long = {
    val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
    val startDate: Date = dateFormat.parse(startDateStr)
    val endDate: Date = dateFormat.parse(endDateStr)

    (endDate.getTime() - startDate.getTime()) / 1000 / 60 / 60 / 24
  }


  def dateAddOne(dateStr: String): String = {
    var dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
    var dateInfo: Date = dateFormat.parse(dateStr)
    var cal: Calendar = Calendar.getInstance()
    cal.setTime(dateInfo)
    cal.add(Calendar.DATE, 1)
    dateFormat.format(cal.getTime)
  }

  def dateAddN(dateStr: String, value: Int): String = {
    var dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
    var dateInfo: Date = dateFormat.parse(dateStr)
    var cal: Calendar = Calendar.getInstance()
    cal.setTime(dateInfo)
    cal.add(Calendar.DATE, value)
    dateFormat.format(cal.getTime)
  }
}

举例

    val startDate = DateTimeUtils.dateAddN(calcDate,-365) //获取计算日期一年前的日期作为开始时间
    val predicates= getPredicateDates(startDate,calcDate,12) //分12个task读取,提高性能
    val url = PropUtils.getProxyJdbc() //jdbc连接的代理(需按自己的项目实现)
    val res = spark.read.jdbc(url, tableName, predicates,PropUtils.getProperties()) 

3. 实验及结论

使用1个节点 8核16G的Clickhouse数据库,spark从clickhouse读取近4亿行数据。

单Task运行时间:14min

按日期划分成12个Task,运行时间:1.6min

结论:性能提升88.6%


http://www.kler.cn/a/319929.html

相关文章:

  • 从AI生成内容到虚拟现实:娱乐体验的新边界
  • MySQL 数据库 :SQL 语句规约(不得使用外键与级联,一切外键概念必须在应用层解决。)
  • LoadBalancer负载均衡服务调用
  • Dubbo泛化调用
  • 【Kotlin】上手学习之类型篇
  • 从字符串使用看Golang和Rust对内存使用的区别
  • PostgreSQL技术内幕12:PostgreSQL事务原理解析-锁管理
  • 【Axure视频教程】跨页面控制中继器表格
  • 商城小程序后端开发实践中出现的问题及其解决方法
  • 【算法——双指针】
  • 机器学习中的KNN算法:原理、应用与实践
  • xpath在爬虫中的应用、xpath插件的安装及使用
  • Python爬虫-Post请求中,参数只有value没有key,如何正确处理?
  • 关联式容器——map与set
  • 集合ArrayList常用方法
  • 鸿蒙界面开发——组件(9):进度条Progress 滑动条Slider
  • 开源数据集网站合集
  • 初试Bootstrap前端框架
  • Spring Boot房屋租赁平台:现代化解决方案
  • 微信小程序IOS真机调试-onPullDownRefresh和onReachBottom不生效
  • 年轻用户对Facebook的使用趋势分析
  • 【MySQL】数据库的操作
  • ⭐ Unity 对象池的应用 Cube流星落
  • 【Roblox/Lua】Roblox抽奖游戏设计概述
  • 扩展uview复选组件库支持自定义图片+自定义内容
  • 笔记整理—内核!启动!—linux应用编程、网络编程部分(3)文件共享与标准IO