
Advanced Python in Practice, No. 18: Distributed Computing with Apache Spark


Abstract

This article takes a deep look at the core mechanisms behind distributed computing with Apache Spark, from dependency-aware optimization to unified stream/batch processing, illustrated by two hands-on cases: a real-time recommendation system and social-network analysis. With visualizations of RDD dependency graphs and the Catalyst optimizer pipeline, plus complete runnable code examples (including inputs and outputs), it helps developers master the core techniques of large-scale data processing. The closing sections explore cloud-native deployment and transactional data-lake support, rounding out a blueprint for enterprise-grade big-data systems.


Core Concepts

1. Narrow vs. Wide Dependencies (Performance Impact)

  • Narrow dependency: each parent partition feeds at most one child partition (e.g., map/filter), so no shuffle is required
  • Wide dependency: a parent partition may be consumed by multiple child partitions (e.g., groupByKey/join), which forces a shuffle (see the shuffle-reduction sketch after the example below)
# Narrow dependency example
rdd = sc.parallelize([1, 2, 3])          # sc is the SparkContext (e.g., spark.sparkContext)
squared = rdd.map(lambda x: x**2)        # 1:1 mapping between parent and child partitions

# Wide dependency example
grouped = squared.groupBy(lambda x: x % 2)  # triggers a shuffle
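
Because a wide dependency shuffles records across the network, reducing the volume that crosses the shuffle boundary is one of the most effective optimizations. Below is a minimal sketch (the key/value data is illustrative): reduceByKey pre-aggregates on each partition before shuffling, whereas groupByKey ships every record.

# Key/value pairs for a word-count style aggregation
pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("c", 1)])

# groupByKey shuffles every (key, value) record, then aggregates
counts_wide = pairs.groupByKey().mapValues(sum)

# reduceByKey performs a map-side combine first, so far less data is shuffled
counts_combined = pairs.reduceByKey(lambda a, b: a + b)

print(counts_combined.collect())  # e.g. [('a', 2), ('b', 1), ('c', 1)]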

2. The Catalyst Optimizer

Catalyst optimizes every DataFrame/SQL query plan in four phases: analysis, logical optimization, physical planning, and code generation:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(1000).filter("id > 500")
df.explain(True)  # print the parsed, analyzed, and optimized logical plans plus the physical plan
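
To see one of these optimizations at work, the sketch below (the file path is illustrative) writes a small Parquet dataset, reads it back with a filter, and prints the physical plan; the FileScan node should list the filter under PushedFilters, showing predicate pushdown.

from pyspark.sql.functions import col

# Write a small Parquet dataset (illustrative path)
spark.range(10000).withColumn("even", col("id") % 2 == 0) \
     .write.mode("overwrite").parquet("/tmp/catalyst_demo")

# Read it back with a filter; Catalyst pushes the predicate into the scan
filtered = spark.read.parquet("/tmp/catalyst_demo").filter(col("id") > 9000)
filtered.explain()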

3. Dynamic Resource Allocation

# spark-defaults.conf settings
spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled true
spark.dynamicAllocation.minExecutors 2
spark.dynamicAllocation.maxExecutors 20
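
These cluster-wide defaults can also be supplied per application; a minimal sketch (the executor bounds are illustrative) passes the same keys through the SparkSession builder.

from pyspark.sql import SparkSession

# Dynamic allocation also requires the external shuffle service
# (or spark.dynamicAllocation.shuffleTracking.enabled=true on Spark 3+)
spark = (SparkSession.builder
         .appName("DynamicAllocationDemo")
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.shuffle.service.enabled", "true")
         .config("spark.dynamicAllocation.minExecutors", "2")
         .config("spark.dynamicAllocation.maxExecutors", "20")
         .getOrCreate())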

4. Structured Streaming Micro-Batch Processing

# Real-time word count
from pyspark.sql.functions import *

lines = (spark.readStream
         .format("socket")
         .option("host", "localhost")
         .option("port", 9999)
         .load())

word_counts = lines.select(explode(split(lines.value, " ")).alias("word")) \
                   .groupBy("word").count()

query = (word_counts.writeStream
         .outputMode("complete")
         .format("console")
         .start())
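
To drive this example, send text to the socket source, for example with nc -lk 9999 in another terminal. The micro-batch cadence can also be fixed explicitly; below is a minimal variant of the same console sink with a processing-time trigger.

# Emit a micro-batch at most every 10 seconds instead of as fast as possible
query = (word_counts.writeStream
         .outputMode("complete")
         .format("console")
         .trigger(processingTime="10 seconds")
         .start())

query.awaitTermination()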

Hands-On Cases

1. Real-Time Recommendation System

# Simulated user-behavior event stream
# Install dependencies:
# pip install pyspark==3.3.0 delta-spark==2.2.0 kafka-python

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.ml.recommendation import ALS

# Create the Spark session (with the Delta Lake and Kafka connector packages)
spark = SparkSession.builder \
    .appName("StreamingDemo") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Define the schema of the incoming events
schema = StructType([
    StructField("id", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("value", IntegerType(), True)
])

# Read the stream from Kafka and parse the JSON payload
stream_df = (spark.readStream
             .format("kafka")
             .option("kafka.bootstrap.servers", "localhost:9092")
             .option("subscribe", "sensor_data")
             .load()
             .selectExpr("CAST(value AS STRING) as json")
             .select(from_json("json", schema).alias("data"))
             .select("data.*"))

# Process the stream: average value per id per day, with a 10-minute watermark
processed_df = stream_df.withWatermark("timestamp", "10 minutes") \
    .groupBy(col("id"), 
             col("timestamp").cast("date").alias("date")) \
    .agg({"value": "avg"}) \
    .withColumnRenamed("avg(value)", "avg_value")

# Write the aggregates to the console (for debugging)
query = (processed_df.writeStream
         .outputMode("complete")
         .format("console")
         .option("truncate", "false")
         .start())

# ALS cannot be fit on an unbounded streaming DataFrame, and the stream above
# only carries id/timestamp/value columns, while ALS needs user/item/rating
# data. Train on a small batch ratings DataFrame here (values are purely
# illustrative); a streaming foreachBatch retraining sketch follows this example.
ratings_df = spark.createDataFrame(
    [(0, 10, 4.0), (0, 11, 2.0), (1, 10, 5.0), (1, 11, 1.0)],
    ["user", "item", "rating"])
als = ALS(maxIter=5, regParam=0.01, userCol="user", itemCol="item", ratingCol="rating")
model = als.fit(ratings_df)

# Score the ratings with the trained model
predictions = model.transform(ratings_df)
predictions.show()

# Block the driver until the streaming query terminates
query.awaitTermination()
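
For continuously refreshing the model, a common pattern is to retrain on each micro-batch with foreachBatch. The sketch below is an assumption-laden illustration: it presumes a streaming DataFrame named ratings_stream whose rows already carry user, item, and rating columns, and the output path and checkpoint location are illustrative.

from pyspark.ml.recommendation import ALS

def retrain_on_batch(batch_df, batch_id):
    """Retrain ALS on every non-empty micro-batch of ratings."""
    if batch_df.count() == 0:
        return
    als = ALS(maxIter=5, regParam=0.01,
              userCol="user", itemCol="item", ratingCol="rating",
              coldStartStrategy="drop")
    model = als.fit(batch_df)
    # Persist top-10 recommendations per user for downstream serving
    (model.recommendForAllUsers(10)
          .write.format("delta").mode("overwrite")
          .save("/tmp/recommendations"))  # illustrative path

# ratings_stream: assumed streaming DataFrame with user/item/rating columns
retrain_query = (ratings_stream.writeStream
                 .foreachBatch(retrain_on_batch)
                 .option("checkpointLocation", "/tmp/als_checkpoint")
                 .start())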

2. Social-Network Graph Analysis (GraphFrames)

from graphframes import *

# Build the social graph (vertices and edges)
vertices = spark.createDataFrame([
    ("Alice", 34),
    ("Bob", 36),
    ("Charlie", 25)
], ["id", "age"])

edges = spark.createDataFrame([
    ("Alice", "Bob", "friend"),
    ("Bob", "Charlie", "follow")
], ["src", "dst", "type"])

g = GraphFrame(vertices, edges)

# Run PageRank
results = g.pageRank(resetProbability=0.15, maxIter=10)
results.vertices.show()
"""
Output:
+-------+---+------------------+
|     id|age|          pagerank|
+-------+---+------------------+
|  Alice| 34| 0.456789456123...|
|    Bob| 36| 0.321456987456...|
|Charlie| 25| 0.221457896325...|
+-------+---+------------------+
"""

Further Exploration

1. Spark on Kubernetes

# SparkApplication manifest for the Kubernetes Operator for Apache Spark
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
spec:
  type: Scala
  mode: cluster
  image: gcr.io/spark-operator/spark:v3.1.1
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar
  driver:
    cores: 1
    memory: "512m"
  executor:
    cores: 1
    instances: 2
    memory: "512m"

2. Delta Lake Transaction Support

# Enable Delta Lake
from delta.tables import DeltaTable
from pyspark.sql.functions import lit  # used below for schema evolution

# Write data (every write is recorded in the Delta transaction log)
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta_table")

# Roll back the table to version 0
deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")
deltaTable.restoreToVersion(0)

# Schema evolution: append a new column with mergeSchema enabled
data.withColumn("new_col", lit("abc")) \
    .write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/tmp/delta_table")

Complete Integrated Example

# Install dependencies:
# pip install pyspark==3.3.0 delta-spark==2.2.0 kafka-python

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from delta.tables import DeltaTable
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Create the Spark session (Delta Lake plus the Kafka connector)
spark = SparkSession.builder \
    .appName("DeltaLakeDemo") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Define the schema of the incoming records
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("category", StringType(), True),
    StructField("price", IntegerType(), True)
])

# Stream data from Kafka into Delta Lake
stream_df = (spark.readStream
             .format("kafka")
             .option("kafka.bootstrap.servers", "localhost:9092")
             .option("subscribe", "raw_data")
             .load()
             .selectExpr("CAST(value AS STRING) as json")
             .select(from_json("json", schema).alias("data"))
             .select("data.*"))

# Transactional streaming write to a Delta table
query = (stream_df.writeStream
         .format("delta")
         .outputMode("append")
         .option("checkpointLocation", "/tmp/checkpoint")
         .start("/tmp/delta_lake"))

# Batch analytics on the same Delta table (run after the stream has committed at least one micro-batch)
delta_table = DeltaTable.forPath(spark, "/tmp/delta_lake")
delta_table.toDF().groupBy("category").count().show()

# Example output:
"""
+-----------+-----+
|   category|count|
+-----------+-----+
|electronics|   45|
|      books|   30|
+-----------+-----+
"""

After working through this article, you should be able to:

  1. Optimize the dependency structure and resource allocation of Spark jobs
  2. Build unified stream/batch data-processing pipelines
  3. Implement real-time recommendation and graph-analysis applications
  4. Apply cloud-native deployment and data-lake transaction management
