当前位置: 首页 > article >正文

Dataworks_PySpark开发流程

PySpark是由Spark官方开发的Python语言第三方库,Python开发者可以通过使用python语言来编写Spark程序和SparkSQL完成开发。

之所以采用PySpark而不采用Java/Scala,是由于:

  1. Dataworks可通过将代码在线写入DataWorks Python资源的方式,实现PySpark作业开发,并通过ODPS Spark节点提交运行该代码逻辑,开发过程较为简单。

  2. 而使用Java或Scala语言类型代码前,需先在本地开发好Spark on MaxCompute作业代码,再通过DataWorks上传为MaxCompute的资源。

步骤一、编写PySpark代码

  1. 在Dataworks业务流程中,右键点击MacCompute文件夹下的资源文件夹,选择新建资源,选择Python。

  2. 设置python脚本名称,以.py为后缀名,点击新建

  3. 编写PySpark代码(注意:代码中不能含有中文)

    1. 代码示例1:判断一个字符串是否可以转换为数字

      注意,如果表/代码中含有中文,必须修改为utf8编码格式
      默认编码是ascii,出现中文字符会报错
      
      
      # -*- coding: utf-8 -*-
      import sys
      reload(sys)
      sys.setdefaultencoding('utf8')
      # -*- coding: utf-8 -*-
      # Spark2.x
      import sys
      from pyspark.sql import SparkSession
      
      try:
          # for python 2
          reload(sys)
          sys.setdefaultencoding('utf8')
      except:
          # python 3 not needed
          pass
      
      if __name__ == '__main__':
          spark = SparkSession.builder\
              .appName("spark sql")\
              .config("spark.sql.broadcastTimeout", 20 * 60)\
              .config("spark.sql.crossJoin.enabled", True)\
              .config("odps.exec.dynamic.partition.mode", "nonstrict")\
              .config("spark.sql.catalogImplementation", "odps")\
              .getOrCreate()
      
      def is_number(s):
          try:
              float(s)
              return True
          except ValueError:
              pass
      
          try:
              import unicodedata
              unicodedata.numeric(s)
              return True
          except (TypeError, ValueError):
              pass
      
          return False
      
      print(is_number('foo'))
      print(is_number('1'))
      print(is_number('1.3'))
      print(is_number('-1.37'))
      print(is_number('1e3'))
    2. 代码示例2:建表、插入数据、读取数据
      # -*- coding: utf-8 -*-
      #Spark2.x
      from pyspark.sql import SparkSession
      if __name__ == '__main__':
          spark = SparkSession.builder.appName("spark sql").getOrCreate()
          spark.sql("DROP TABLE IF EXISTS spark_sql_test_table")
          spark.sql("CREATE TABLE spark_sql_test_table(name STRING, num BIGINT)")
          spark.sql("INSERT INTO spark_sql_test_table SELECT 'abc', 100000")
          spark.sql("SELECT * FROM spark_sql_test_table").show()
          spark.sql("SELECT COUNT(*) FROM spark_sql_test_table").show()
  4. 点击提交,保存并提交资源

步骤二、创建并配置ODPS Spark节点

  1. 右键点击文件夹,选择新建节点,选择新建ODPS Spark节点
  2. 输入ODPS Spark节点名称,点击确认
  3. 配置节点参数:选择Spark版本,选择Python语言,选择主python资源(步骤一的PySpark代码),添加配置项目;设置调度配置参数,点击提交,保存并提交节点

步骤三、查看输出结果

由于数据开发中的ODPS Spark节点没有运行入口,因此需要在开发环境的运维中心执行Spark任务。

  1. 进入运维中心,点击周期任务运维,点击补数据实例,点击新建补数据任务
  2. 搜索ODPS Spark节点任务,点击添加
  3. 设置补数据运行策略,点击提交
  4. 等待运行完成,点击实例名称pyspark_test,点击查看日志,点击logview url
  5. 点击master-0,点击StdOut,查看输出结果

参考文档:

  1. 开发ODPS Spark任务
  2. PySpark开发示例

  3. PySpark基础操作

  4. SparkSQL基础语法

  5. pyspark中文api_pyspark中文文档-CSDN博客


http://www.kler.cn/a/284968.html

相关文章:

  • uni-app表单⑪
  • 双十一云服务器抢购后,用SD-WAN连通多云网络
  • 如何在有限内存下对外部大文件进行排序
  • 16S,18S引物覆盖度测试:SILVA和PR2
  • 文本语义分块、RAG 系统的分块难题:小型语言模型如何找到最佳断点
  • 【Vue】Vue3.0(二十二) v-model 在原始Dom元素、自定义输入组件中双向绑定的底层实现原理详解
  • azure-search-openai-demo-csharp does not deploy correctly to azure clooad
  • vue项目打包压缩静态资源—使用compression-webpack-plugin
  • 是否应该使用WordPress自动更新的功能
  • ComfyUI使用Flux模型
  • 黑马JavaWeb开发笔记07——Ajax、Axios请求、前后端分离开发介绍、Yapi详细配置步骤
  • 网络压缩之参数量化(parameter quantization)
  • Spring Boot发送http请求
  • C语言补习课
  • gdb 教程
  • springboot学习(2)
  • 美团2024秋招编程题:小美的red子序列数量之和
  • WebSocket 实现消息推送
  • AOP 面向切片编程
  • 我的推荐:腾讯云罗云《从零构建向量数据库》
  • 无人机之遥控器防水性能篇
  • Ubuntu 20.04 安装 GitHub CLI(gh),并使用
  • C语言——简单的do while循环找100~999之间的水仙花数(所有的三位水仙花数)
  • 数据结构(三)——双向链表,循环链表,内核链表,栈和队列
  • 『功能项目』怪物反击主角复活【14】
  • spring security 会话管理