《Python实战进阶》No18: 使用 Apache Spark 进行分布式计算
No18: 使用 Apache Spark 进行分布式计算
摘要
本文深入解析Apache Spark分布式计算的核心机制,从依赖关系优化到流批一体处理,结合实时推荐系统和社交网络分析实战案例。通过可视化RDD依赖关系图、Catalyst优化器流程图,配合完整可运行的代码示例(含输入输出),帮助开发者掌握大规模数据处理的核心技术。扩展部分探讨云原生部署和数据湖事务支持,为构建企业级大数据系统提供完整解决方案。
核心概念
1. 宽依赖 vs 窄依赖(性能影响)
- 窄依赖:每个父分区对应单个子分区(如map/filter)
- 宽依赖:父分区可能被多个子分区依赖(如groupByKey/join)
# 窄依赖示例
rdd = sc.parallelize([1,2,3])
squared = rdd.map(lambda x: x**2) # 父子分区1:1
# 宽依赖示例
grouped = squared.groupBy(lambda x: x%2) # 触发shuffle
2. Catalyst优化器
通过四个阶段优化查询计划:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.range(1000).filter("id > 500")
df.explain(True) # 查看优化后的物理计划
3. 动态资源分配
# spark-defaults.conf配置
spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled true
spark.dynamicAllocation.minExecutors 2
spark.dynamicAllocation.maxExecutors 20
4. Structured Streaming微批处理
# 实时词频统计
from pyspark.sql.functions import *
lines = (spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load())
word_counts = lines.select(explode(split(lines.value, " ")).alias("word")) \
.groupBy("word").count()
query = (word_counts.writeStream
.outputMode("complete")
.format("console")
.start())
实战案例
1. 实时推荐系统
# 模拟用户行为数据
# 安装依赖
# pip install pyspark==3.3.0 delta-spark==2.2.0 kafka-python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.ml.recommendation import ALS
# 创建Spark会话
spark = SparkSession.builder \
.appName("StreamingDemo") \
.config("spark.jars.packages", "io.delta:delta-core_2.12:2.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# 定义数据模式
schema = StructType([
StructField("id", StringType(), True),
StructField("timestamp", TimestampType(), True),
StructField("value", IntegerType(), True)
])
# 读取流数据
stream_df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "sensor_data")
.load()
.selectExpr("CAST(value AS STRING) as json")
.select(from_json("json", schema).alias("data"))
.select("data.*"))
# 处理流数据
processed_df = stream_df.withWatermark("timestamp", "10 minutes") \
.groupBy(col("id"),
col("timestamp").cast("date").alias("date")) \
.agg({"value": "avg"}) \
.withColumnRenamed("avg(value)", "avg_value")
# 输出到控制台(用于调试)
query = (processed_df.writeStream
.outputMode("complete")
.format("console")
.option("truncate", "false")
.start())
# 等待查询终止
query.awaitTermination()
# 实时训练ALS模型
als = ALS(maxIter=5, regParam=0.01, userCol="user", itemCol="item", ratingCol="rating")
model = als.fit(stream_df) # 实际需使用checkpoint机制
# 输出预测结果
predictions = model.transform(stream_df)
query = predictions.writeStream.format("console").start()
2. 社交网络图分析(GraphFrames)
from graphframes import *
# 构建社交关系图
vertices = spark.createDataFrame([
("Alice", 34),
("Bob", 36),
("Charlie", 25)
], ["id", "age"])
edges = spark.createDataFrame([
("Alice", "Bob", "friend"),
("Bob", "Charlie", "follow")
], ["src", "dst", "type"])
g = GraphFrame(vertices, edges)
# PageRank计算
results = g.pageRank(resetProbability=0.15, maxIter=10)
results.vertices.show()
"""
输出:
+-------+---+------------------+
| id|age| pagerank|
+-------+---+------------------+
| Alice| 34| 0.456789456123...|
| Bob| 36| 0.321456987456...|
|Charlie| 25| 0.221457896325...|
+-------+---+------------------+
"""
扩展思考
1. Spark on Kubernetes
# spark-submit配置示例
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pi
spec:
type: Scala
mode: cluster
image: gcr.io/spark-operator/spark:v3.1.1
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar
driver:
cores: 1
memory: "512m"
executor:
cores: 1
instances: 2
memory: "512m"
2. Delta Lake事务支持
# 启用Delta Lake
from delta import DeltaTable
# 写入事务日志
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta_table")
# 版本回滚
deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")
deltaTable.restoreToVersion(0)
# Schema演化
data.withColumn("new_col", lit("abc")) \
.write.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.save("/tmp/delta_table")
完整代码集成
# 安装依赖
# pip install pyspark==3.3.0 delta-spark==2.2.0 kafka-python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from delta.tables import DeltaTable
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# 创建Spark会话
spark = SparkSession.builder \
.appName("DeltaLakeDemo") \
.config("spark.jars.packages", "io.delta:delta-core_2.12:2.2.0") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# 定义数据模式
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("category", StringType(), True),
StructField("price", IntegerType(), True)
])
# 流式数据写入Delta Lake
stream_df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "raw_data")
.load()
.selectExpr("CAST(value AS STRING) as json")
.select(from_json("json", schema).alias("data"))
.select("data.*"))
# 使用事务写入
query = (stream_df.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/tmp/checkpoint")
.start("/tmp/delta_lake"))
# 批量分析
delta_table = DeltaTable.forPath(spark, "/tmp/delta_lake")
delta_table.toDF().groupBy("category").count().show()
# 输出示例:
"""
+----------+-----+
| category|count|
+----------+-----+
|electronics| 45|
| books| 30|
+----------+-----+
"""
通过本文的学习,您应该能够:
- 优化Spark作业的依赖关系和资源分配
- 实现流批一体的数据处理管道
- 构建实时推荐系统和图分析应用
- 掌握云原生部署和数据湖事务管理