from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
import pandas as pd
spark = SparkSession.builder.appName("StringFilter").getOrCreate()
string_columns = [field.name for field in df.schema.fields if isinstance(field.dataType, StringType)]
allowed_chars = set('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!"#$%&\'()*+,-./:;<=>?@[]^_`{|}~')
def has_invalid_chars(s):
if s is None:
return False
return any(c not in allowed_chars for c in s)
has_invalid_udf = udf(has_invalid_chars, BooleanType())
if not string_columns:
result_df = spark.createDataFrame([], df.schema)
else:
condition = None
for col_name in string_columns:
col_condition = has_invalid_udf(col(col_name))
if condition is None:
condition = col_condition
else:
condition = condition | col_condition
filtered_df = df.filter(condition)
empty_df = spark.createDataFrame([], df.schema)
result_df = empty_df.union(filtered_df)
pd_df = result_df.toPandas()
pd_df.to_excel("output.xlsx", index=False)
spark.stop()
代码说明:
- 初始化与数据读取:需要根据实际数据源替换读取逻辑(示例中被注释掉的
spark.read.csv
部分) - 获取字符串列:通过分析Schema获取所有字符串类型的字段
- 定义字符白名单:使用集合类型提升查询效率
- UDF定义:用于检查字符串是否包含非法字符
- 条件构建:使用逻辑或组合所有字符串列的检查条件
- 结果处理:
- 直接处理空字符串列的边界情况
- 使用
union
保持与原DataFrame结构一致
- Excel导出:
- 通过转换为Pandas DataFrame实现
- 注意大数据量时可能存在的内存问题
注意事项:
- 大数据量场景下建议分批次处理或使用分布式写入方式
- Excel导出操作会触发数据收集到Driver节点,需确保资源充足
- 实际应用中建议添加异常处理机制
- 空值处理逻辑可根据业务需求调整(当前版本忽略NULL值)