当前位置: 首页 > article >正文

Pyspark实例_读取json文件存入到hive表内

  • 需求:
    • 将一个json文件的数据写入到hive数据库中
  • 实例思路:
    • 使用spark.read.json读取json文件转为RDD
    • 将RDD注册为临时表
    • 通过sql语句筛选和转换想要插入的数据
    • spark.sql执行插入
# -*- coding: utf-8 -*-

# 导入findspark模块,并初始化它,以便在环境中找到Spark
import findspark
findspark.init()

import sys
from pyspark.sql import SparkSession
import json

# 创建一个SparkSession,指定运行模式为YARN,应用名称为"json_to_hive",并启用Hive支持
spark = SparkSession.builder.master("yarn").appName("json_to_hive").enableHiveSupport().getOrCreate()


# 指定JSON文件名称
file_name = "xxx.json"
# 指定JSON文件的路径
json_path = "/opt/app/json_data/" + file_name


# 打开JSON文件,并读取其内容到data变量中
with open(json_path, 'r') as file:
    data = json.load(file)
# 从data变量中提取名为'data'的键对应的值,这是一个列表,包含多个JSON对象
json_data = data.get('data')


# 使用Spark的read.json方法从json_data列表(通过spark.sparkContext.parallelize转换为RDD)中读取数据,并创建DataFrame
df = spark.read.json(spark.sparkContext.parallelize(json_data))

# 将DataFrame注册为一个临时表,以便后续可以使用SQL语句进行查询和操作
df.registerTempTable("tmp_table")

# 定义一个SQL语句,用于将临时表中的数据插入到Hive表中
# 使用insert overwrite语句会覆盖目标表分区中的现有数据
insert_table_sql = """
insert overwrite table database.table_name partition(par_column='par_name')  
select field1, field2 from tmp_table
"""

# 执行SQL语句,将数据插入到Hive表中
spark.sql(insert_table_sql)

# 当所有操作完成后,停止SparkSession以释放资源
spark.stop()

http://www.kler.cn/a/550266.html

相关文章:

  • 麒麟v10 nginx脚本安装
  • DeepSeek R1本地部署 DeepSeek Api接口调用 DeepSeek RAG知识库工作流详解
  • 人工智能3d点云之Pointnet++项目实战源码解读(点云分类与分割)
  • OpenGL ES - 数学基础
  • 什么是网络安全威胁?常见威胁有哪些?
  • 从低清到4K的魔法:FlashVideo突破高分辨率视频生成计算瓶颈(港大港中文字节)
  • Java并发编程5--Java内存模型的基础
  • 同步异步日志系统-项目介绍
  • 【MySQL】第六弹---数据库表约束详解:从空属性到主键的全方位指南
  • Vue 前端开发中的路由知识:从入门到精通
  • DeepSeek 的创新融合:多行业应用实践探索
  • Android 中使用 FFmpeg 进行音视频处理
  • 网工项目理论1.7 设备选型
  • node.js + html调用ChatGPTApi实现Ai网站demo(带源码)
  • PosgreSQL比MySQL更优秀吗?
  • 容联云联络中心AICC:深度整合DeepSeek,业务验证结果公开
  • MDX语言的安全开发
  • 安全筑基,智能赋能:BeeWorks IM引领企业协同新纪元
  • 缺陷检测之图片标注工具--labme
  • 汇能感知摄像头模组/模块产品有哪些?