1. Add the required dependencies (the versions matter)
The explicit netty-all and janino pins below are the kind typically needed to avoid classpath conflicts when embedding Spark in an existing application; this particular combination is the one that was tested.
<!-- The following versions have been tested and verified together -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.5.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.5.3</version>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.58.Final</version>
</dependency>
<dependency>
    <groupId>org.codehaus.janino</groupId>
    <artifactId>janino</artifactId>
    <version>3.1.6</version>
</dependency>
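The job below also reads from MySQL over JDBC and serializes its result with fastjson, so those two libraries must be on the classpath as well. They are not part of the verified list above, so the coordinates and versions here are assumptions; use whatever matches your environment:
<!-- Assumed additions: MySQL JDBC driver and fastjson (not in the verified list above) -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.33</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>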
2. Implement the Spark job
First a plain result bean for the ranking rows, then the job itself.
/**
 * Result bean for one row of the ranking: the SKU, its display name,
 * the summed quantity, and the summed sales amount.
 */
public class ProductRanking {
    private Long skuId;
    private String goodsName;
    private Double totalQuantity;
    private Double totalSales;

    public ProductRanking(Long skuId, String goodsName, Double totalQuantity, Double totalSales) {
        this.skuId = skuId;
        this.goodsName = goodsName;
        this.totalQuantity = totalQuantity;
        this.totalSales = totalSales;
    }

    public Long getSkuId() {
        return skuId;
    }

    public void setSkuId(Long skuId) {
        this.skuId = skuId;
    }

    public String getGoodsName() {
        return goodsName;
    }

    public void setGoodsName(String goodsName) {
        this.goodsName = goodsName;
    }

    public Double getTotalQuantity() {
        return totalQuantity;
    }

    public void setTotalQuantity(Double totalQuantity) {
        this.totalQuantity = totalQuantity;
    }

    public Double getTotalSales() {
        return totalSales;
    }

    public void setTotalSales(Double totalSales) {
        this.totalSales = totalSales;
    }
}
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;

public class SparkJob {

    // The snippet calls log.info(...), so a logger is declared explicitly here
    // (the original presumably relied on Lombok's @Slf4j or similar)
    private static final Logger log = LoggerFactory.getLogger(SparkJob.class);

    public List<ProductRanking> getRanking() {
        // Initialize the SparkSession in local mode, using all available cores
        SparkSession spark = SparkSession.builder()
                .appName("Order Item Ranking")
                .master("local[*]")
                .getOrCreate();

        // MySQL JDBC connection settings
        String jdbcUrl = "jdbc:mysql://localhost:3306/atest?useUnicode=true&characterEncoding=utf8&useSSL=false";
        Properties jdbcProps = new Properties();
        jdbcProps.put("user", "root");
        jdbcProps.put("password", "123456");
        // Connector/J 8.x driver class; the legacy 5.x name was com.mysql.jdbc.Driver
        jdbcProps.put("driver", "com.mysql.cj.jdbc.Driver");

        // Push a subquery down to MySQL; Spark's JDBC reader requires the alias.
        // No filter is applied here; see the commented variant below for a date filter.
        String query = "(SELECT * FROM nasi_mts_trade_order_item) AS order_item_filtered";
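        // The original walkthrough assumed restricting to orders after 2023-01-01.
        // A sketch of that pushed-down filter (the column name create_time is an
        // assumption; substitute your actual order-date column):
        // String query = "(SELECT * FROM nasi_mts_trade_order_item"
        //         + " WHERE create_time >= '2023-01-01') AS order_item_filtered";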
        // Read the matching order_item rows from MySQL
        Dataset<Row> orderItemDF = spark.read()
                .jdbc(jdbcUrl, query, jdbcProps);

        // Uncomment to inspect the first rows and verify the data loaded correctly
        // orderItemDF.show();

        // Aggregate per product: total quantity and total sales (quantity * unit price)
        Dataset<Row> productRankingDF = orderItemDF.groupBy("sku_id", "goods_name")
                .agg(
                        functions.sum("net_weight").alias("total_quantity"),
                        functions.sum(functions.col("net_weight").multiply(functions.col("goods_price"))).alias("total_sales")
                )
                .orderBy(functions.col("total_quantity").desc()) // sort by quantity, descending
                .limit(10); // keep the top 10
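        // The same aggregation expressed in Spark SQL, if you prefer SQL over the
        // DataFrame API (equivalent under the same column names):
        // orderItemDF.createOrReplaceTempView("order_item");
        // Dataset<Row> productRankingDF = spark.sql(
        //         "SELECT sku_id, goods_name, SUM(net_weight) AS total_quantity,"
        //         + " SUM(net_weight * goods_price) AS total_sales"
        //         + " FROM order_item GROUP BY sku_id, goods_name"
        //         + " ORDER BY total_quantity DESC LIMIT 10");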
        // Uncomment to display the ranking
        // productRankingDF.show();

        // Collect the (small) top-10 result to the driver
        List<Row> rows = productRankingDF.collectAsList();

        // Map each row to a ProductRanking object
        List<ProductRanking> rankings = rows.stream().map(row -> new ProductRanking(
                row.getAs("sku_id"),
                row.getAs("goods_name"),
                row.getAs("total_quantity"),
                row.getAs("total_sales")
        )).collect(Collectors.toList());
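        // Caveat: Row.getAs simply casts. If net_weight or goods_price is DECIMAL
        // in MySQL, the sums come back as java.math.BigDecimal and the Double
        // fields above would throw ClassCastException; in that case convert via
        // Number, e.g. ((Number) row.getAs("total_quantity")).doubleValue().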
        // Log the result as JSON (fastjson), then stop the SparkSession
        log.info(JSONObject.toJSONString(rankings));
        spark.stop();
        return rankings;
    }
}
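To try this end to end, the job can be driven from a plain main method. This is a minimal sketch; SparkJobDemo is an illustrative name, not part of the original code:
import java.util.List;

public class SparkJobDemo {
    public static void main(String[] args) {
        // Run the job and print the top-10 ranking it returns
        List<ProductRanking> top10 = new SparkJob().getRanking();
        for (ProductRanking r : top10) {
            System.out.printf("%d %s qty=%.2f sales=%.2f%n",
                    r.getSkuId(), r.getGoodsName(), r.getTotalQuantity(), r.getTotalSales());
        }
    }
}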