当前位置: 首页 > article >正文

记录使用Spark计算订单明细表销量排行榜的实现

一、引入相关依赖(版本很重要)

<!--以下版本经过测试验证-->
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-core_2.12</artifactId>
	<version>3.5.3</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-sql_2.12</artifactId>
	<version>3.5.3</version>
</dependency>
<dependency>
	<groupId>io.netty</groupId>
	<artifactId>netty-all</artifactId>
	<version>4.1.58.Final</version>
</dependency>
<dependency>
	<groupId>org.codehaus.janino</groupId>
	<artifactId>janino</artifactId>
	<version>3.1.6</version>
</dependency>

二、实现Spark编码

  • ProductRanking.java
public class ProductRanking {
    private Long skuId;
    private String goodsName;
    private Double totalQuantity;
    private Double totalSales;

    public ProductRanking(Long skuId, String goodsName, Double totalQuantity, Double totalSales) {
        this.skuId = skuId;
        this.goodsName = goodsName;
        this.totalQuantity = totalQuantity;
        this.totalSales = totalSales;
    }

    public Long getSkuId() {
        return skuId;
    }

    public void setSkuId(Long skuId) {
        this.skuId = skuId;
    }

    public String getGoodsName() {
        return goodsName;
    }

    public void setGoodsName(String goodsName) {
        this.goodsName = goodsName;
    }

    public Double getTotalQuantity() {
        return totalQuantity;
    }

    public void setTotalQuantity(Double totalQuantity) {
        this.totalQuantity = totalQuantity;
    }

    public Double getTotalSales() {
        return totalSales;
    }

    public void setTotalSales(Double totalSales) {
        this.totalSales = totalSales;
    }
}
  • SparkJob.java
public class SparkJob {
    public List<ProductRanking> getRanking(){
        // 初始化 SparkSession
        SparkSession spark = SparkSession.builder()
                .appName("Order Item Ranking")
                .master("local[*]")  // 使用本地模式
                .getOrCreate();

        // MySQL JDBC 连接配置
        String jdbcUrl = "jdbc:mysql://localhost:3306/atest?useUnicode=true&characterEncoding=utf8&useSSL=false";
        Properties jdbcProps = new Properties();
        jdbcProps.put("user", "root");
        jdbcProps.put("password", "123456");
        jdbcProps.put("driver", "com.mysql.jdbc.Driver");

        // 设置查询条件:假设要查询 2023 年 1 月 1 日之后的订单
        String query = "(SELECT * FROM nasi_mts_trade_order_item ) AS order_item_filtered";

        // 从 MySQL 中读取符合条件的 order_item 表数据
        Dataset<Row> orderItemDF = spark.read()
                .jdbc(jdbcUrl, query, jdbcProps);

        // 显示数据的前几行,检查数据是否加载正确
//        orderItemDF.show();

        // 统计每个商品的销量
        Dataset<Row> productRankingDF = orderItemDF.groupBy("sku_id", "goods_name")
                .agg(
                        functions.sum("net_weight").alias("total_quantity"),
                        functions.sum(functions.col("net_weight").multiply(functions.col("goods_price"))).alias("total_sales")
                )
                .orderBy(functions.col("total_quantity").desc())  // 按销量降序排序
                .limit(10);  // 获取销量前10名

        // 显示商品排名
//        productRankingDF.show();

        // 转换为 JSON 并映射为 Java 对象
        List<Row> rows = productRankingDF.collectAsList();

        // 将每一行映射为 com.cnnasi.erp.mts.spark.ProductRanking 对象
        List<ProductRanking> ra = rows.stream().map(row -> new ProductRanking(
                row.getAs("sku_id"),
                row.getAs("goods_name"),
                row.getAs("total_quantity"),
                row.getAs("total_sales")
        )).collect(Collectors.toList());
        log.info(JSONObject.toJSONString(ra));
        // 停止 SparkSession
        spark.stop();
        return ra;
    }
}

http://www.kler.cn/a/420468.html

相关文章:

  • rpm包转deb包或deb包转rpm包
  • 【MyBatis】验证多级缓存及 Cache Aside 模式的应用
  • 论文导读 I RAFT:使语言模型适应特定领域的RAG
  • 【分页查询】.NET开源 ORM 框架 SqlSugar 系列
  • 《以 C++为笔,绘就手势识别人机交互新画卷》
  • 图像与文字的创意融合:使用Python进行视觉艺术创作
  • 流量特征分析
  • 【娱乐项目】竖式算术器
  • IDEA使用HotSwapHelper进行热部署
  • Docker Stack简介及使用
  • 近几年,GIS专业的五类就业方向!
  • vue2组件跨层级数据共享provide 和 inject
  • Unity类银河战士恶魔城学习总结(P156 Audio Settings音频设置)
  • 聚观早报 | 戴尔发布第三财季财报;REDMI K80屏幕细节
  • Android 车载虚拟化底层技术-Kernel4.19-Android10(双card)技术实现
  • 瀚高创库建表pgsql
  • linux的挂卸载
  • 【docker】多阶段构建与单阶段构建
  • leetcode每日一题(20241202)
  • BERT的中文问答系统36-2
  • Java GET请求 请求参数在Body中使用Json格式传参
  • 简单介绍下 VitePress 中的 vp-doc 和 vp-raw
  • Git:分支管理
  • 面试小札:JVM虚拟机
  • LeetCode Hot100 21~30
  • 数仓3.0与大模型(如大型语言模型和其他深度学习模型)之间的关系