当前位置: 首页 > article >正文

用SparkSQL和PySpark完成按时间字段顺序将字符串字段中的值组合在一起分组显示

用SparkSQL和PySpark完成以下数据转换。
源数据:
userid,page_name,visit_time
1,A,2021-2-1
2,B,2024-1-1
1,C,2020-5-4
2,D,2028-9-1

目的数据:
user_id,page_name_path
1,C->A
2,B->D

PySpark:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 初始化SparkSession(如果在已有环境中可以直接使用已有的spark对象)
spark = SparkSession.builder.appName("DataTransformation").getOrCreate()

# 创建示例数据的DataFrame
data = [
    (1, "A", "2021-2-1"),
    (2, "B", "2024-1-1"),
    (1, "C", "2020-5-4"),
    (2, "D", "2028-9-1")
]
columns = ["userid", "page_name", "visit_time"]
df = spark.createDataFrame(data, columns)

# 将visit_time转换为日期类型,方便后续排序
df = df.withColumn("visit_time", F.to_date(F.col("visit_time")))

# 按照userid分区,根据visit_time排序创建窗口
window_spec = Window.partitionBy("userid").orderBy("visit_time")

# 使用collect_list函数收集每个userid对应的page_name列表,然后使用concat_ws函数将其拼接为指定格式
result_df = df.withColumn("page_name_list", F.collect_list("page_name").over(window_spec)) \
             .groupBy("userid") \
             .agg(F.concat_ws("->", F.col("page_name_list")).alias("page_name_path")) \
             .select("userid", "page_name_path")

# 重命名userid列为user_id(和目标数据列名一致)
result_df = result_df.withColumnRenamed("userid", "user_id")

# 展示结果
result_df.show()

SparkSQL:

SELECT userid AS user_id,
       CONCAT_WS('->', collect_list(page_name) OVER (PARTITION BY userid ORDER BY visit_time)) AS page_name_path
FROM page_visits
GROUP BY userid

http://www.kler.cn/a/446473.html

相关文章:

  • 彻底认识和理解探索分布式网络编程中的SSL安全通信机制
  • python 曲线拟合,曲线拟合交点
  • 音频接口:PDM TDM128 TDM256
  • C语言进阶(2) ---- 指针的进阶
  • 2024年合肥师范学院信息安全小组内部选拔赛(c211)WP
  • 服务器防火墙设置某个端口号只允许固定 ip地址访问
  • mac 安装graalvm
  • 【Http,Netty,Socket,WebSocket的应用场景和区别】
  • CESS 出席华盛顿区块链政策峰会:参与国家安全与数据隐私保护专题讨论
  • BOE(京东方)“向新2025”年终媒体智享会首站落地上海 六大维度创新开启产业发展新篇章
  • 【HTML】DOCTYPE的作用?
  • SAP RESTful架构和OData协议
  • 微信小程序之今日热搜新闻
  • 【论文速读】| FirmRCA:面向 ARM 嵌入式固件的后模糊测试分析,并实现高效的基于事件的故障定位
  • 推送本地仓库到远程git仓库
  • 问题解决:发现Excel中的部分内容有问题。是否让我们尽量尝试恢复? 如果您信任此工作簿的源,请单击“是”。
  • 解析基于 SSM 框架 Vue 电脑测评系统:把握电脑测评精髓
  • Dash:数据可视化的未来之星
  • 【从零开始入门unity游戏开发之——C#篇23】C#面向对象继承——`as`类型转化和`is`类型检查、向上转型和向下转型、里氏替换原则(LSP)
  • 用bootstrap搭建侧边栏
  • uniapp新建项目hello,什么都没干提示应用未关联服务空间,请在uniCloud目录右键关联服务空间
  • Linux中部署项目
  • 精准采集整车信号:风丘混合动力汽车工况测试
  • Solon 集成 activemq-client
  • Next.js v15- Metadata
  • vimdc