当前位置: 首页 > article >正文

Spark Streaming的核心功能及其示例PySpark代码

Spark Streaming是Apache Spark中用于实时流数据处理的模块。以下是一些常见功能的实用PySpark代码示例:

  1. 基础流处理:从TCP套接字读取数据并统计单词数量
from pyspark import `SparkContext
from pyspark.streaming import StreamingContext

# 创建SparkContext和StreamingContext
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)  # 1秒的批处理间隔

# 创建一个DStream,从TCP源读取数据
lines = ssc.socketTextStream("localhost", 9999)

# 对每一行数据进行分词,映射为(word, 1)的键值对,然后按单词统计数量
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

# 打印每个RDD中的前10个元素
word_counts.pprint()

# 启动流计算
ssc.start()
# 等待流计算结束
ssc.awaitTermination()

在上述代码中:

  • sc 是 SparkContext ,用于与Spark集群交互。
  • ssc 是 StreamingContext ,定义了批处理间隔。
  • lines 是一个 DStream ,从指定的TCP套接字读取数据。
  • words 对每行数据进行分词, word_counts 统计每个单词出现的次数。
  • pprint 方法打印每个批次的前10个元素。
  1. 使用窗口函数
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "WindowedWordCount")
ssc = StreamingContext(sc, 1)

lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1))

# 使用窗口函数,窗口大小为3秒,滑动间隔为1秒
windowed_word_counts = word_counts.reduceByKeyAndWindow(lambda a, b: a + b, lambda a, b: a - b, 3, 1)

windowed_word_counts.pprint()

ssc.start()
ssc.awaitTermination()

在这个示例中:

  • reduceByKeyAndWindow 方法用于在窗口上进行聚合操作。
  • 第一个参数是用于合并窗口内元素的函数,第二个参数是用于移除窗口外元素的函数。
  1. 状态更新
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "StatefulWordCount")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")  # 启用检查点

def updateFunction(new_values, running_count):
    if running_count is None:
        running_count = 0
    return sum(new_values, running_count)

lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1))

# 使用updateStateByKey进行状态更新
stateful_word_counts = word_counts.updateStateByKey(updateFunction)

stateful_word_counts.pprint()

ssc.start()
ssc.awaitTermination()

在上述代码中:

  • updateStateByKey 方法用于维护每个键的状态。
  • updateFunction 定义了如何根据新值和现有状态更新状态。
  1. 与Kafka集成
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext("local[2]", "KafkaWordCount")
ssc = StreamingContext(sc, 1)

# Kafka参数
kafkaParams = {"metadata.broker.list": "localhost:9092"}
topics = ["test"]

# 创建Kafka输入DStream
kvs = KafkaUtils.createDirectStream(ssc, topics, kafkaParams)
lines = kvs.map(lambda x: x[1])

words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

word_counts.pprint()

ssc.start()
ssc.awaitTermination()

在这个示例中:

  • KafkaUtils.createDirectStream 用于从Kafka主题读取数据。
  • kvs 是一个包含Kafka消息的DStream, lines 提取消息内容。

http://www.kler.cn/a/509243.html

相关文章:

  • Linux——线程的慨念及控制
  • Oracle数据库传统审计怎么用
  • 网络安全构成要素
  • 基于高光谱数据的叶片水分估测方法研究 【Matlab Python Origin】
  • 用c语言实现哈希表:03_hashing (哈希函数设计)
  • Flask学习入门笔记
  • 电梯系统的UML文档05
  • java根据模板导出word,并在word中插入echarts相关统计图片以及表格
  • 爱德华EDWARDS EPX180EPX500 Vacuum Pumps instruction Manual
  • 基于 Java Spring Boot 开发的项目复盘与技术总结
  • Spring Boot应用关闭分析
  • Linux中的nc命令是网络工具中的一种,用于进行网络连接和数据传输。下面是nc命令的使用方法和实例:
  • 2.使用Spring BootSpring AI快速构建AI应用程序
  • PHP企业IM客服系统
  • 鸿蒙学习构建视图的基本语法(二)
  • Jenkins下载安装
  • 利用 LNMP 实现 WordPress 站点搭建
  • qt自定义加载字体库ttf
  • Android 项目依赖冲突问题:Duplicate class found in modules
  • 阿里云 Serverless 助力盟主直播:高并发下的稳定性和成本优化