Pyspark实例_读取json文件存入到hive表内
- 需求:
- 将一个json文件的数据写入到hive数据库中
- 实例思路:
- 使用spark.read.json读取json文件转为RDD
- 将RDD注册为临时表
- 通过sql语句筛选和转换想要插入的数据
- spark.sql执行插入
# -*- coding: utf-8 -*-
# 导入findspark模块,并初始化它,以便在环境中找到Spark
import findspark
findspark.init()
import sys
from pyspark.sql import SparkSession
import json
# 创建一个SparkSession,指定运行模式为YARN,应用名称为"json_to_hive",并启用Hive支持
spark = SparkSession.builder.master("yarn").appName("json_to_hive").enableHiveSupport().getOrCreate()
# 指定JSON文件名称
file_name = "xxx.json"
# 指定JSON文件的路径
json_path = "/opt/app/json_data/" + file_name
# 打开JSON文件,并读取其内容到data变量中
with open(json_path, 'r') as file:
data = json.load(file)
# 从data变量中提取名为'data'的键对应的值,这是一个列表,包含多个JSON对象
json_data = data.get('data')
# 使用Spark的read.json方法从json_data列表(通过spark.sparkContext.parallelize转换为RDD)中读取数据,并创建DataFrame
df = spark.read.json(spark.sparkContext.parallelize(json_data))
# 将DataFrame注册为一个临时表,以便后续可以使用SQL语句进行查询和操作
df.registerTempTable("tmp_table")
# 定义一个SQL语句,用于将临时表中的数据插入到Hive表中
# 使用insert overwrite语句会覆盖目标表分区中的现有数据
insert_table_sql = """
insert overwrite table database.table_name partition(par_column='par_name')
select field1, field2 from tmp_table
"""
# 执行SQL语句,将数据插入到Hive表中
spark.sql(insert_table_sql)
# 当所有操作完成后,停止SparkSession以释放资源
spark.stop()