当前位置: 首页 > article >正文

Java技术栈 —— Spark入门(二)之实时WordCount

Java技术栈 —— Spark入门(二)

  • 一、kafka
    • 1.1 创建topic
    • 1.2 准备input与查看output
  • 二、spark
    • 2.1 spark下的程序文件
    • 2.2 用spark-submit提交作业

参考文章:

参考文章或视频链接
[1] 《Kafka + Spark Stream实时WordCount》

实验环境:
假设你的用户为root,以下软件安装路径为/opt

软件版本
spark: 3.5.2 (scala 2.12)
kafka: 3.8.0 (scala 2.13)

实验结构图

在这里插入图片描述

一、kafka

1.1 创建topic

# 创建input
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test.wordcount.input --partitions 1 --replication-factor 1
# 创建output
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test.wordcount.output --partitions 1 --replication-factor 1

1.2 准备input与查看output

# 打开两个terminal终端
# 准备键盘输入作为prodcuer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test.wordcount.input
# 在屏幕上查看输出
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test.wordcount.output

二、spark

2.1 spark下的程序文件

# coding=utf-8
# /opt/spark-3.5.2-bin-hadoop3/jobs/pyjobs/kafka-wordcount.py
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql import functions as F

bootstrapServers = "localhost:9092"

spark = SparkSession\
    .builder\
    .appName("StructuredKafkaWordCount")\
    .getOrCreate()

# 基于来自kafka的数据流,创建dataframe
lines = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", bootstrapServers)\
    .option("subscribe", "test.wordcount.input")\
    .option("failOnDataLoss", False)\
    .option("group.id", "wordcount-group3")\
    .load()\
    .selectExpr("CAST(value AS STRING)")

# 将单行数据拆分,转成多行数据
words = lines.select(
    explode(split(lines.value, ' ')).alias('word')
)

# 对单词进行分组,并计算总数
wordCounts = words.groupBy('word').count()

# 将两列数据合并成单列数据
wordCounts = wordCounts.select(F.concat(F.col("word"), F.lit("|"), F.col("count").cast("string")).alias("value"))

# 测试时,可以不将结果写入kafka,直接输出到控制台
# query = wordCounts \
#     .writeStream \
#     .outputMode("complete") \
#     .format("console") \
#     .start()

# 将结果输出到 test.wordcount.output
query = wordCounts \
    .writeStream \
    .format('kafka') \
    .outputMode('update') \
    .option("kafka.bootstrap.servers", bootstrapServers) \
    .option('checkpointLocation', '/spark/job-checkpoint') \
    .option("topic", "test.wordcount.output") \
    .start()

query.awaitTermination()

2.2 用spark-submit提交作业

# 提交Spark作业,这个过程需要保证网络畅通,会将一些依赖下载到/root/.ivy2/jars目录下
$SPARK_HOME/bin/spark-submit \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,\
org.apache.kafka:kafka-clients:3.5.2 \
/opt/spark-3.5.2-bin-hadoop3/jobs/pyjobs/kafka-wordcount.py

http://www.kler.cn/news/285157.html

相关文章:

  • 基于微信小程序的电动车租赁系统---附源码97332
  • 遇到的BUG及解决方法
  • 【读书笔记-《30天自制操作系统》-12】Day13
  • 监控平台之上报(未完成)
  • Python算法工程师面试整理-Python 编程技巧
  • 使用Ansible stat模块检查目录是否存在
  • 【Docker】Dockerfile实列-Nginx镜像构建
  • 类与ES6类之间的继承
  • 叶斯神经网络(BNN)在训练过程中损失函数不收敛或跳动剧烈可能是由多种因素
  • 全网最适合入门的面向对象编程教程:42 Python常用复合数据类型-collections容器数据类型
  • P02-Java流程控制基本结构
  • codetest
  • Linux下递归设置目标目录及其子目录和文件的权限
  • Qt/C++地址转坐标/坐标转地址/逆地址解析/支持百度高德腾讯和天地图
  • 项目策划书六度自由双足机器人
  • WHAT - 通过 react-use 源码学习 React(Animations 篇)
  • Qt QTableWidget可编辑设置,设置部分可编辑
  • 线性表之静态链表
  • Jenkins发邮件功能如何配置以实现自动化?
  • 推理引擎测试-算力共享:test_inference_engine
  • 力扣68.文本左右对齐
  • 18043 找出3个数中最大的数
  • Datawhale x李宏毅苹果书入门 AI夏令营 task03学习笔记
  • 数据结构——单向链表
  • 五、实现随机地图
  • 【STM32】通用定时器TIM(输出比较)
  • 【sqlite3】MySQL8转sqlite3需要对sql做的一些处理
  • PyCharm 自定义字体大小
  • C++ 有向图算法
  • Tiptap中BubbleMenu讲解