当前位置: 首页 > article >正文

Pyspark_结构化流2

Pyspark

注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Spark Flink Kafka Hbase Hive Flume等等~写的都是纯干货,各种顶会的论文解读,一起进步。
今天继续和大家分享一下Pyspark_结构化流2
#博学谷IT学习技术支持


文章目录

  • Pyspark
  • 前言
  • 一、输出模式
    • 1.Append 模式
    • 2.Complete模式
    • 3.Update模式
  • 二、输出位置
    • 1.File Sink
    • 2.Foreach Sink
    • 3.ForeachBatch Sink
    • 4.Memory Sink
  • 总结


前言

接下次继续Pyspark_结构化流,今天是Sink(输出)操作。


一、输出模式

在结构化流中定义好df 或者 处理好df后, 调用 writeStream 完成数据写出操作, 在写出的过程中, 同样也可以设置一些相关的属性, 启动流式应用运行
在这里插入图片描述
输出模式:在进行输出的时候, 必须通过outPutMode来设置输出方案, 输出模式共提供三种输出模式

  • 1- append 模式: 增量模式
    • 指的: 当流处理管道中, 有了新的数据后,才会触发输出, 而且Append模式仅支持追加, 不支持聚合操作, 如果执行聚合会直接报错,同时不支持排序操作. 如果存在排序, 也会直接报错
  • 2- complete模式: 完全(全量)模式
    • 指的: 每一次都是针对整个所有批次数据进行处理, 由于数据会越来越多, 要求必须对数据进行聚合操作, 否则会直接报错
  • 3- update模式: 更新模式
    • 指的: 当处理模式中没有聚合操作的时候, 此模式与append基本是一致的, 但是如果有了聚合操作, 仅输出变更和新增的数据, 但是不支持排序操作

1.Append 模式

说明: 当有了聚合计算操作后, 会直接报出错误
说明: 在执行排序操作后, 会直接报出错误

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

if __name__ == '__main__':
    print("spark streaming append")

    spark = SparkSession.builder.appName("spark streaming append").master("local[*]") \
        .config('spark.sql.shuffle.partitions', 4) \
        .getOrCreate()

    df = spark.readStream.format("socket").option("host", "node1").option("port", "44444").load()

    df = df.withColumn("words", F.explode(F.split("value", " "))).select("words")

    df.writeStream.format("console").outputMode("append").start().awaitTermination()

2.Complete模式

说明: 当没有聚合操作的时候, 会报出错误
说明: 完全模式下, 支持对数据进行排序操作

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

if __name__ == '__main__':
    print("spark streaming complete")

    spark = SparkSession.builder.appName("spark streaming complete").master("local[*]") \
        .config('spark.sql.shuffle.partitions', 4) \
        .getOrCreate()

    df = spark.readStream.format("socket").option("host", "node1").option("port", "44444").load()

    df = df.withColumn("words", F.explode(F.split("value", " "))).groupBy("words").agg(
        F.count("words").alias("word_count")
    ).orderBy("word_count", ascending=False)

    df.writeStream.format("console").outputMode("complete").start().awaitTermination()

3.Update模式

说明: 当对数据进行排序操作的时候, 会直接报出错误

from pyspark.sql import SparkSession
import pyspark.sql.functions as F


if __name__ == '__main__':
    print("spark streaming update")

    spark = SparkSession.builder.appName("spark streaming update").master("local[*]") \
        .config('spark.sql.shuffle.partitions', 4) \
        .getOrCreate()

    df = spark.readStream.format("socket").option("host", "node1").option("port", "44444").load()

    df = df.withColumn("words", F.explode(F.split("value", " "))).select("words").groupBy("words").agg(
        F.count("words").alias("word_count")
    )

    df.writeStream.format("console").outputMode("update").start().awaitTermination()

二、输出位置

默认情况下, Spark的结构化流支持多种输出的方案:

1- console sink: 将结果输出到控制台, 主要是应用测试中 支持三种方式

2- File Sink: 文件接收器, 将结果输出到某个目录下, 形成文件数据, 仅支持追加方式

3- foreach Sink 或 froeachBatch Sink : 将数据进行遍历处理, 遍历后输出到任何位置 三种方式均支持

4- MemMory Sink: 将结果输出到内存中, 主要目的是进行再次迭代计算, 数据集不能过大, 仅支持append 和 complete模式

5- Kafka sink 将结果输出到Kafka 类似是Kafka的生产者 三种模式都支持

1.File Sink

文件输出方案:
一个批次对应一个文件, 有多少个批次, 就会产生多少个文件

import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import Window as win

if __name__ == '__main__':
    print("spark file sink")

    spark = SparkSession.builder.appName("spark file sink").master("local[*]") \
        .config('spark.sql.shuffle.partitions', 4) \
        .getOrCreate()

    df = spark.readStream.format("socket").option("host", "node1").option("port", "44444").load()

    df = df.withColumn("words", F.explode(F.split("value", " ")))

    df.writeStream.format("csv").outputMode("append") \
        .option("header", True).option("sep", "\001") \
        .option("checkpointLocation", "hdfs://node1:8020/structuredStreaming/chk") \
        .start("hdfs://node1:8020/structuredStreaming/output1").awaitTermination()

可以通过设置触发器, 调整每一批次产生间隔时间

from pyspark.sql import SparkSession

if __name__ == '__main__':
    print("spark streaming trigger")

    spark = SparkSession.builder.appName("spark streaming trigger").master("local[*]") \
        .config('spark.sql.shuffle.partitions', 4) \
        .getOrCreate()

    df = spark.readStream \
        .format('rate') \
        .option('rowsPerSecond', 1) \
        .option('rampUpTime', 0) \
        .option('numPartitions', 1) \
        .load()

    df.writeStream.format("csv") \
        .outputMode("append") \
        .option("header", True).option("sep", "\001") \
        .option("checkpointLocation", "hdfs://node1:8020/structuredStreaming/chk") \
        .trigger(processingTime="5 seconds") \
        .start("hdfs://node1:8020/structuredStreaming/output1")

    df.writeStream.format("console") \
        .outputMode("append") \
        .trigger(processingTime="5 seconds") \
        .start().awaitTermination()

2.Foreach Sink

foreach sink: 对输出的数据, 一个个进行处理操作
方法一:通过process_row函数方式

from pyspark.sql import SparkSession

def process_row(row):
    print(f"{row.name},{row.address}")


if __name__ == '__main__':
    print("spark streaming foreach")

    spark = SparkSession.builder.appName("spark streaming foreach").master("local[*]") \
        .config('spark.sql.shuffle.partitions', 4) \
        .getOrCreate()

    df = spark.readStream.format("socket").option("host", "node1").option("port", "44444").load()

    df.createTempView("t1")

    df_res = spark.sql("""
        select
            split(value,"-")[0] as name,
            split(value,"-")[1] as address
        from t1
    """)

    df_res.writeStream.foreach(process_row).outputMode("append").trigger(
        processingTime="10 seconds").start().awaitTermination()

方法二:通过自定义ForeachWriter类的方式

from pyspark.sql import SparkSession

class ForeachWriter(object):
    def open(self, partition_id, epoch_id):
        print(f'open方法正在执行, 正在初始化; {partition_id},{epoch_id}')
        return True

    def process(self, row):
        print(f"{row.name},{row.address}")

    def stop(self):
        print('释放资源')


if __name__ == '__main__':
    print("spark streaming foreach")

    spark = SparkSession.builder.appName("spark streaming foreach").master("local[1]") \
        .config('spark.sql.shuffle.partitions', 4) \
        .getOrCreate()

    df = spark.readStream.format("socket").option("host", "node1").option("port", "44444").load()

    df.createTempView("t1")

    df_res = spark.sql("""
        select
            split(value,"-")[0] as name,
            split(value,"-")[1] as address
        from t1
    """)

    df_res.writeStream.foreach(ForeachWriter()).outputMode("append").trigger(
        processingTime="10 seconds").start().awaitTermination()

3.ForeachBatch Sink

对输出的数据, 进行一批一批的处理操作

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

def foreachBatch(df, batch_id):
    print(f"第{batch_id}批数据")
    df = df.withColumn("new_address", F.concat("name", "address"))
    df.show()

if __name__ == '__main__':
    print("spark streaming foreachBatch")

    spark = SparkSession.builder.appName("spark streaming foreachBatch").master("local[*]") \
        .config('spark.sql.shuffle.partitions', 4) \
        .getOrCreate()

    df = spark.readStream.format("socket").option("host", "node1").option("port", "44444").load()

    df.createTempView("t1")

    df_res = spark.sql("""
        select
            split(value,"-")[0] as name,
            split(value,"-")[1] as address
        from t1
    """)

    df_res.writeStream.foreachBatch(foreachBatch).outputMode("append").trigger(
        processingTime="10 seconds").start().awaitTermination()

4.Memory Sink

内存输出 基于内存的数据进行二次迭代计算

import time
from pyspark.sql import SparkSession

if __name__ == '__main__':
    print("spark streaming memory sink")

    # 1- 创建SparkSession镀锡
    spark = SparkSession.builder.appName("spark streaming memory sink").master("local[*]") \
        .config('spark.sql.shuffle.partitions', 4) \
        .getOrCreate()

    # 2- 读取外部数据源: 监听某一个端口号
    df = spark.readStream.format("socket").option("host", "node1").option("port", "44444").load()

    df.createTempView('t1')

    # 3- 处理数据
    df_res = spark.sql("""
        select
            split(value,'-')[0] as name,
            split(value,'-')[1] as address
        from t1
    """)

    df_res.writeStream.format("memory").queryName("t2").outputMode("append").trigger(
        processingTime="10 seconds").start()

    while True:
        spark.sql("""
            select * from t2
        """).show()
        time.sleep(5)


总结

今天主要和大家分享了输出操作,包括输出模式和输出位置。


http://www.kler.cn/a/5479.html

相关文章:

  • docker学习记录-部署若依springcloud项目
  • Vben5登录过期无法再次登录问题,http状态码
  • 强化学习方法分类详解
  • 【PyCharm】如何把本地整个项目同步到服务器?
  • 计算机网络 (8)物理层的传输方式
  • 【2024年-9月-21日-开源社区openEuler实践记录】PilotGo:简化运维管理的开源利器
  • 数据的存储--->【大小端字节序】(Big Endian)(Little Endian)
  • Python —— 给女儿写个雷霆战机
  • 命令行上的数据科学第二版:十、多语言数据科学
  • Leetcode.1849 将字符串拆分为递减的连续值
  • 第二十八章 变换坐标总结
  • C++模板基础(四)
  • 有了Bug,先看看类型
  • Activation Function激活函数
  • 三极管用作开关电路的一些思考
  • 跟着AI学AI(1): 线性回归模型
  • 如何使用Spring+OpenAI生成图像
  • 多传感器融合定位GNSS、IMU、Lidar、Camera
  • 06 Laplacian算法
  • HTML5 SSE
  • 数据结构和算法(3):递归
  • 程序员万万不能去的3种公司,越做越倒退,过来人的经验
  • VerilogHDL基本语法和程序
  • PCB模块化设计24——DCDC电源模块PCB布局布线设计规范
  • python Format()函数的用法___实例详解(一)(全,例多)___各种格式化替换,format对齐打印
  • WTI原油交易价格(1986年1⽉2⽇-2022年9⽉6⽇)