当前位置: 首页 > article >正文

StructuredStreaming (二)——kafka

将数据从kafka的某个主题中抽取出来,再将数据放入另一个主题中

一、导入jar包

通过百度网盘分享的文件:python-kafka
链接:https://pan.baidu.com/s/1q2UlOJFBNNuhRC87AlAACg?pwd=c6dy 
提取码:c6dy

 将这六个jar包放入本地pyspark中jars下

二、案例编写

         首先需要生成数据,将数据源源不断的导入topicA中,再使用spark读取kafka中的数据,然后将清洗过的数据导入etlTopic中

import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
import pyspark.sql.functions as F

if __name__ == '__main__':
    os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'

    # 创建一个sparkSession对象
    spark = SparkSession.builder.appName("kafkaDemo").getOrCreate()

    kafkaDf = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "bigdata01:9092") \
        .option("subscribe", "topicA") \
        .option("startingOffsets", "latest") \
        .load()
    # 这个就是为了 筛出想要的数据
    dataDf = kafkaDf.selectExpr("CAST(value AS STRING)")
    dataDf.createOrReplaceTempView("tmp")
    etlDf = spark.sql("""
    select * from tmp where value like "%success%"
    """)

    etlDf.writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "bigdata01:9092") \
        .option("topic", "etlTopic") \
        .option("checkpointLocation", "../../resources/ckp") \
        .start().awaitTermination()

    spark.stop()


http://www.kler.cn/a/399996.html

相关文章:

  • gitlab 服务器集群配置及 存储扩展配置
  • 7.揭秘C语言输入输出内幕:printf与scanf的深度剖析
  • STM32单片机设计防儿童人员误锁/滞留车内警报系统
  • MongoDB在现代Web开发中的应用
  • shell脚本配置nginx
  • 哈希表学习分享
  • Docker: ubuntu系统下Docker的安装
  • 免费开源!DBdoctor推出开源版系统诊断工具systool
  • Android屏幕横竖屏切换和生命周期
  • Hadoop 3.x 新特性详解
  • 【数据分享】中国渔业统计年鉴(1979-2024) pdf
  • GaussDB性能调优
  • 机器学习—偏差或方差与神经网络
  • 基于 AI 智能名片 2 + 1 链动模式商城小程序的立体连接营销策略研究
  • 问:Spring MVC DispatcherServlet流程步骤梳理
  • go 学习网站,go例子 go demo go学习视频
  • 基于Canny边缘检测和轮廓检测
  • 版本控制【Git Bash】【Gitee】
  • Django5 2024全栈开发指南(一):框架简介、环境搭建与项目结构
  • 浅谈数据仓库的架构及其演变
  • C++中的观察者模式:通俗易懂的讲解与实现
  • 113页PPT制造业研发工艺协同及制造一体化
  • 四十、Python(pytest框架-下)
  • github进不去解决办法-误打误撞进去了
  • Redis GEO 功能解析
  • Spring Cloud Ribbon 实现“负载均衡”的详细配置说明