
【spark】PySpark Kerberos example: pyspark-utils client utility classes

The xml_utils.py file:

# -*- coding: utf-8 -*-
import xml.etree.ElementTree as ET

def parse_xml(file_path):
    """Parse a Hadoop/Hive-style XML config file into a {name: value} dict."""
    props_map = {}
    tree = ET.parse(file_path)
    root = tree.getroot()
    # Each <property> element carries a <name>/<value> pair.
    props = root.findall("property")
    for prop in props:
        name = prop.find('name').text
        value = prop.find('value').text
        props_map[name] = value
    return props_map

# test only
if __name__ == '__main__':
    # Use a raw string so the backslashes in the Windows path are not treated as escapes.
    xml_file = r'D:\hive\conf\hive-site.xml'
    props = parse_xml(xml_file)
    print(props)

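parse_xml assumes the standard Hadoop/Hive configuration layout: a <configuration> root whose <property> children each contain a <name> and a <value>. A minimal, self-contained sketch of what it returns (the sample XML content and temporary path below are made up for illustration, not taken from a real cluster) could look like this:

# -*- coding: utf-8 -*-
# Illustrative sanity check for parse_xml; sample content only.
import os
import tempfile
from src.xml_utils import parse_xml

SAMPLE = """<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://metastore-host:9083</value>
  </property>
  <property>
    <name>hive.metastore.sasl.enabled</name>
    <value>true</value>
  </property>
</configuration>
"""

with tempfile.TemporaryDirectory() as tmp:
    xml_path = os.path.join(tmp, "hive-site.xml")
    with open(xml_path, "w", encoding="utf-8") as f:
        f.write(SAMPLE)
    props = parse_xml(xml_path)
    # Expected: {'hive.metastore.uris': 'thrift://metastore-host:9083',
    #            'hive.metastore.sasl.enabled': 'true'}
    print(props)
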
The spark_utils.py file:

# -*- coding: utf-8 -*-
import os
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from src.xml_utils import parse_xml

metastore_uris_key = "hive.metastore.uris"
metastore_principal_key = "hive.metastore.kerberos.principal"
metastore_sasl_enabled_key = "hive.metastore.sasl.enabled"


class SparkUtils(object):

    def __init__(self, hadoop_home
                 , hadoop_conf_dir
                 , principal
                 , keytab_path
                 , krb5_realm
                 , krb5_kdc
                 , krb5_conf
                 , hive_conf_dir
                 , log_level="INFO"):
        self.hadoop_home = hadoop_home
        self.hadoop_conf_dir = hadoop_conf_dir
        self.principal = principal
        self.keytab_path = keytab_path
        self.krb5_realm = krb5_realm
        self.krb5_kdc = krb5_kdc
        self.krb5_conf = krb5_conf
        self.hive_conf_dir = hive_conf_dir
        # Read the metastore connection settings from hive-site.xml.
        hive_site_map = parse_xml(f"{hive_conf_dir}/hive-site.xml")
        self.metastore_uris = hive_site_map.get(metastore_uris_key)
        self.metastore_principal = hive_site_map.get(metastore_principal_key)
        self.metastore_sasl_enabled = hive_site_map.get(metastore_sasl_enabled_key)
        # JVM options that point the driver and executors at the Kerberos (krb5) configuration.
        self.java_options = f"-Djava.security.krb5.conf={krb5_conf} -Djava.security.krb5.realm={krb5_realm} -Djava.security.krb5.kdc={krb5_kdc}"
        self.log_level = log_level

    def get_spark(self):
        os.environ['HADOOP_HOME'] = self.hadoop_home
        os.environ['HADOOP_CONF_DIR'] = self.hadoop_conf_dir
        conf = SparkConf().setAppName("pyspark-sql") \
            .setMaster("local[*]") \
            .set("spark.sql.catalogImplementation", "hive") \
            .set(metastore_uris_key, self.metastore_uris) \
            .set(metastore_principal_key, self.metastore_principal) \
            .set(metastore_sasl_enabled_key, self.metastore_sasl_enabled) \
            .set("spark.driver.extraJavaOptions", self.java_options) \
            .set("spark.executor.extraJavaOptions", self.java_options) \
            .set("spark.yarn.keytab", self.keytab_path) \
            .set("spark.yarn.principal", self.principal)

        sc = SparkContext(conf=conf)
        sc.setLogLevel(self.log_level)
        spark = SparkSession(sc)
        return spark

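get_spark() wires the session together through SparkConf + SparkContext. For reference, a roughly equivalent method using the SparkSession builder API with enableHiveSupport() is sketched below; it assumes the same module-level keys and self.* fields defined above and is only an illustration, not part of the original class:

    # Sketch of an alternative builder-style construction (illustrative only;
    # relies on the same module-level keys and self.* fields as get_spark above).
    def get_spark_via_builder(self):
        os.environ['HADOOP_HOME'] = self.hadoop_home
        os.environ['HADOOP_CONF_DIR'] = self.hadoop_conf_dir
        spark = SparkSession.builder \
            .appName("pyspark-sql") \
            .master("local[*]") \
            .enableHiveSupport() \
            .config(metastore_uris_key, self.metastore_uris) \
            .config(metastore_principal_key, self.metastore_principal) \
            .config(metastore_sasl_enabled_key, self.metastore_sasl_enabled) \
            .config("spark.driver.extraJavaOptions", self.java_options) \
            .config("spark.executor.extraJavaOptions", self.java_options) \
            .config("spark.yarn.keytab", self.keytab_path) \
            .config("spark.yarn.principal", self.principal) \
            .getOrCreate()
        spark.sparkContext.setLogLevel(self.log_level)
        return spark
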
The main script (usage example):

# -*- coding: utf-8 -*-

from pyspark.sql.types import StructType, StructField, StringType, LongType

from src.spark_utils import SparkUtils
import pandas as pd

if __name__ == '__main__':
    keytab_path = '/D:/kerberos/user.keytab'  # keytab location
    principal = 'user@XXXXX.COM'
    # Raw strings keep the Windows backslashes from being read as escape sequences.
    hadoop_home = r'/D:\env\components\hadoop'
    hadoop_conf_dir = r'D:\env\components\hadoop\etc\hadoop-exp'
    krb5_realm = "XXXXX.COM"
    krb5_kdc = "kdc.xxx.com"
    krb5_conf = r"/D:\xxx\krb5.conf"
    hive_conf_dir = r"D:\hive\conf"
    log_level = "INFO"

    spark_utils = SparkUtils(hadoop_home
                             , hadoop_conf_dir
                             , principal
                             , keytab_path
                             , krb5_realm
                             , krb5_kdc
                             , krb5_conf
                             , hive_conf_dir
                             , log_level)
    spark = spark_utils.get_spark()

    pd01 = spark.sql('select id,name,score from default.tbl1').toPandas()
    pd02 = spark.sql('select id,name,score from default.tbl2').toPandas()

    union_pd = pd.concat([pd01, pd02], ignore_index=True)
    # Sum score per (id, name); the source tables select 'score', not 'age'.
    agg_pd = (union_pd.groupby(['id', 'name'])
              .agg({'score': 'sum'})
              .reset_index()
              .rename(columns={'score': 'total_score'}))

    agg_pd_schema = StructType([
        StructField('id', StringType(), True),
        StructField('name', StringType(), True),
        StructField('total_score', LongType(), True)
    ])
    agg_df = spark.createDataFrame(agg_pd, schema=agg_pd_schema)
    # agg_df.show()
    agg_df.write.mode('overwrite').saveAsTable('default.tbl3')
    spark.stop()
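The script above pulls both tables to the driver via toPandas() and aggregates with pandas. If the tables are large, the same union and aggregation can stay in Spark; a minimal sketch against the same (hypothetical) tables might look like this:

# Sketch: the same union + aggregation done entirely in Spark (illustrative only).
from pyspark.sql import functions as F

df01 = spark.sql('select id, name, score from default.tbl1')
df02 = spark.sql('select id, name, score from default.tbl2')

agg_df = df01.unionByName(df02) \
    .groupBy('id', 'name') \
    .agg(F.sum('score').alias('total_score'))

agg_df.write.mode('overwrite').saveAsTable('default.tbl3')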

