StructuredStreaming (一)
一、sparkStreaming的不足
1.基于微批,延迟高不能做到真正的实时
2.DStream基于RDD,不直接支持SQL
3.流批处理的API应用层不统一,(流用的DStream-底层是RDD,批用的DF/DS/RDD)
4.不支持EventTime事件时间(一般流处理都会有两个时间:事件发生的事件,一个是事件处理的时间)
5.数据的Exactly-Once(恰好一次语义)需要手动实现
二、StructuredStreaming 的介绍
1、2016年Spark2.0版本中发布
2、基于SparkSQL引擎的可扩展、容错的全新的流处理引擎。
3、并不是对Spark Streaming的简单改进,而是重新开发的全新流式引擎
准实时技术:来一批处理一批 实时:来一条处理一条 离线:一般都是处理一些静止的数据
三、socket+console
1、在虚拟机中下载nc
yum install -y nc
2、启动
nc -lk 9999
案例:wordcount
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
import pyspark.sql.functions as F
if __name__ == '__main__':
os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'
# 创建一个sparkSession对象
spark = SparkSession.builder.appName("socketDemo").getOrCreate()
socketDf = spark.readStream.format("socket") \
.option("host", "bigdata01") \
.option("port", 9999) \
.load()
# 处理
# 方式一:使用dsl语法
splitDf = socketDf.select(explode(F.split(socketDf.value, " ")).alias("word"))
resultDf1 = splitDf.groupBy("word").count()
# 方式二:使用sql
socketDf.createOrReplaceTempView("wordcount")
resultDf2 = spark.sql("""
with t1 as(
select num from wordcount lateral view explode(split(value," ")) c as num
)select num,count(*) counts from t1 group by num;
""")
# 下面的就是sink的写法 后续会写
query1 = resultDf1.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query2 = resultDf2.writeStream \
.outputMode("complete") \
.format("console") \
.start() \
.awaitTermination()
spark.stop()
四、file+console
文件中的数据:
1;yuwen;43
1;shuxue;55
2;yuwen;77
2;shuxue;88
3;yuwen;98
3;shuxue;65
3;yingyu;88
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StringType, DoubleType, LongType, IntegerType, StructType
if __name__ == '__main__':
os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'
# 创建一个sparkSession对象
spark = SparkSession.builder.appName("socketDemo").getOrCreate()
# score_schema = StructType([
# StructField(name="stu_id", dataType=IntegerType(), nullable=False),
# StructField(name="subject_name", dataType=StringType(), nullable=True),
# StructField(name="score", dataType=DoubleType(), nullable=True)
# ])
score_schema = StructType().add("stu_id", IntegerType()).add("subject_name", StringType()).add("score",DoubleType())
socketDf = spark.readStream.format("csv") \
.option("sep", ";") \
.schema(score_schema) \
.load("../../resources/input1")
socketDf.writeStream \
.outputMode("append") \
.format("console") \
.option("truncate", False) \
.start() \
.awaitTermination()
spark.stop()