当前位置: 首页 > article >正文

PySpark查找Dataframe中的非ASCII字符并导出Excel文件

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
import pandas as pd

# 初始化Spark会话
spark = SparkSession.builder.appName("StringFilter").getOrCreate()

# 假设原始DataFrame为df(这里需要替换为实际的数据源读取逻辑)
# df = spark.read.csv("input.csv", header=True)

# 获取所有字符串类型的列名
string_columns = [field.name for field in df.schema.fields if isinstance(field.dataType, StringType)]

# 定义允许的字符集合
allowed_chars = set('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!"#$%&\'()*+,-./:;<=>?@[]^_`{|}~')

# 定义检查非法字符的UDF
def has_invalid_chars(s):
    if s is None:
        return False
    return any(c not in allowed_chars for c in s)

has_invalid_udf = udf(has_invalid_chars, BooleanType())

# 构建过滤条件
if not string_columns:
    # 如果没有字符串列,直接创建空DataFrame
    result_df = spark.createDataFrame([], df.schema)
else:
    # 组合所有字符串列的检查条件
    condition = None
    for col_name in string_columns:
        col_condition = has_invalid_udf(col(col_name))
        if condition is None:
            condition = col_condition
        else:
            condition = condition | col_condition
    
    # 过滤出包含非法字符的行
    filtered_df = df.filter(condition)
    
    # 创建结构相同的空DataFrame并合并结果
    empty_df = spark.createDataFrame([], df.schema)
    result_df = empty_df.union(filtered_df)

# 导出为Excel文件(注意:此操作会将数据收集到Driver节点)
pd_df = result_df.toPandas()
pd_df.to_excel("output.xlsx", index=False)

# 停止Spark会话(根据实际需要决定是否保留会话)
spark.stop()

代码说明:

  1. 初始化与数据读取:需要根据实际数据源替换读取逻辑(示例中被注释掉的spark.read.csv部分)
  2. 获取字符串列:通过分析Schema获取所有字符串类型的字段
  3. 定义字符白名单:使用集合类型提升查询效率
  4. UDF定义:用于检查字符串是否包含非法字符
  5. 条件构建:使用逻辑或组合所有字符串列的检查条件
  6. 结果处理
    • 直接处理空字符串列的边界情况
    • 使用union保持与原DataFrame结构一致
  7. Excel导出
    • 通过转换为Pandas DataFrame实现
    • 注意大数据量时可能存在的内存问题

注意事项:

  1. 大数据量场景下建议分批次处理或使用分布式写入方式
  2. Excel导出操作会触发数据收集到Driver节点,需确保资源充足
  3. 实际应用中建议添加异常处理机制
  4. 空值处理逻辑可根据业务需求调整(当前版本忽略NULL值)

http://www.kler.cn/a/543387.html

相关文章:

  • TDengine 产品由哪些组件构成
  • 自然语言处理NLP入门 -- 第一节基础概念
  • WPF 设置宽度为 父容器 宽度的一半
  • 团结引擎 Shader Graph:解锁图形创作新高度
  • 基础设施在平台工程中的作用
  • C++智能指针的使用
  • 07贪心 + 动态规划(D1_基础学习)
  • 蓝桥杯试题:归并排序
  • PySide (PyQt)的视图(QGraphicsView)和场景(QGraphicsScene)
  • 深入理解现代前端框架:Vue.js 的进阶探秘
  • bash shell笔记——循环结构
  • 网络初识-
  • 基于微信小程序的超市售货管理平台
  • HttpServletRequest 作用
  • 从洗牌算法到前端工程化:如何用JavaScript实现真正的随机排序并应用到实际项目中
  • 2025常用的SEO工具有哪些?
  • 文档格式转换引擎开发:支持PDF与OFD的技术实现
  • 快速搭建支持图片实时处理的文件服务
  • pytest测试专题 - 1.1 运行pytest
  • SpringBoot3.2新武器RestClient如何替代Feign?
  • 42页精品PPT | 数字政府智慧政务大数据平台建设方案
  • STM 32 IO模拟两线串行接口
  • 使用EVE-NG-锐捷实现ACL访问控制
  • 深度剖析观察者模式:从理论到实战的Java实现
  • Prompt 工程优化方
  • 如何本地部署DeepSeek集成Word办公软件