day16_推荐系统和总结
文章目录
- day16_推荐系统和总结
- 一、推荐实现
- 1、基于流行度推荐(掌握)
- 1.1 近期热门商品推荐
- 1.2 个人热门商品推荐
- 2、基于隐语义模型的协同过滤推荐(了解)
- 2.1 ALS算法介绍
- 2.2 推荐代码
- 3、基于物品的协同过滤推荐(了解)
- 4、基于用户的协同过滤推荐(了解)
- 5、基于关联规则的推荐(熟悉)
- 5.1 关联规则详解
- 5.2 FP-growth算法理解
- 5.3 SparkMLlib中的FP-growth算法
- 5.4 完整代码
- 6、服务部署(了解)
day16_推荐系统和总结
一、推荐实现
推荐系统一般是由Java后端与前端人员进行开发的,大数据开发人员比较少参与主要是提供数据。
为了实现推荐功能,需要启动Hadoop、Hive、ES、Doris、SparkSubmit
启动Hadoop、启动Hive
cd /
./up01.sh start
启动ES
1- 切换用户
su es
2- 进入目录
cd /home/es/elasticsearch-7.10.2/bin
3- 启动
elasticsearch -d
4- 退出es用户
exit
启动Doris
/export/server/doris/fe/bin/start_fe.sh --daemon
/export/server/doris/be/bin/start_be.sh --daemon
/export/server/doris/apache_hdfs_broker/bin/start_broker.sh --daemon
启动SparkSubmit
cd /export/server/spark/sbin
./start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10001 \
--hiveconf hive.server2.thrift.bind.host=up01 \
--hiveconf spark.sql.warehouse.dir=hdfs://up01:8020/user/hive/warehouse \
--master local[*]
1、基于流行度推荐(掌握)
基于流行度推荐,也就是基于统计的推荐,主要用来解决用户的冷启动问题,对于新用户首次登录。基于流行度的推荐也可以用于单独的热门商品模块。
1.1 近期热门商品推荐
可以按商品销售的单量进行倒序排序,然后存入Doirs中。表中可以多存入一些数据,在使用时,根据品类进行倒序查询,取到相关商品即可。
- 计算的sql如下
-- 近期热门商品推荐
select
current_date() as recommend_date, -- 推荐时间,区别是什么时候推荐的
third_category_no,
third_category_name,
goods_no,
goods_name,
count(order_no) as order_count -- 订单量
from dwm.dwm_sold_goods_sold_dtl_i
where
-- 过滤最近一段时间内的销售数据
datediff(current_date(),to_date(trade_date))<=40 and goods_no is not null
group by third_category_no,third_category_name,goods_no,goods_name
order by order_count desc
limit 300 -- 推荐比项目经理要求的推荐数目多一些
- Doris的建表语句如下
create database if not exists recommend_db;
CREATE TABLE IF NOT EXISTS recommend_db.popular_hot_goods (
recommend_date DATE comment '计算日期',
goods_no bigint comment '商品编码',
third_category_no STRING comment '三级品类编码',
third_category_name STRING comment '三级品类名称',
goods_name STRING comment '商品名称',
order_count INT comment '销售数量'
)
UNIQUE KEY(recommend_date, goods_no)
comment '热门商品推荐'
PARTITION BY RANGE(recommend_date) ()
DISTRIBUTED BY HASH(goods_no) BUCKETS 1
sql (
"dynamic_partition.create_history_partition" = "true",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-365",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "10",
"replication_allocation" = "tag.location.default: 1"
);
注意:
1- 如果多个字段作为UNIQUE KEY,那么String类型不能够使用。因此这里将goods_no的类型进行了强制转换。为了将不同日期的数据分开存放,这里使用动态分区表。
2- 建表中的字段顺序需要与UNIQUE KEY中字段顺序保持一致。并且UNIQUE KEY中的字段要放在最上面。如果不遵守会报如下的错:
- 推荐代码
from pyspark.sql import SparkSession
import os
import pyspark.sql.functions as F
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.appName("topn_goods_recommend")\
.master("local[*]") \
.config("spark.sql.warehouse.dir", "hdfs://up01:8020/user/hive/warehouse") \
.config("hive.metastore.uris", "thrift://up01:9083") \
.config("spark.sql.shuffle.partitions",2)\
.enableHiveSupport() \
.getOrCreate()
# 2- 读取分析Hive中的数据,获取TOPN的热门商品
topn_goods_df = spark.sql("""
select
current_date() as recommend_date, -- 推荐时间,区别是什么时候推荐的
third_category_no,
third_category_name,
goods_no,
goods_name,
count(order_no) as order_count -- 订单量
from dwm.dwm_sold_goods_sold_dtl_i
where
-- 过滤最近一段时间内的销售数据
datediff(current_date(),to_date(trade_date))<=40 and goods_no is not null
group by third_category_no,third_category_name,goods_no,goods_name
order by order_count desc
limit 300 -- 推荐比项目经理要求的推荐数目多一些
""")
# 3- 数据存储到Doris中
topn_goods_df.write.jdbc(
url="jdbc:mysql://192.168.88.166:9030/recommend_db?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false",
table="popular_hot_goods",
mode="append",
sql={ 'user' : 'root', 'password' : '123456' }
)
# 4- 释放资源
spark.stop()
1.2 个人热门商品推荐
也就是根据销售单量统计每个人喜欢购买的前N个商品
- 计算的sql如下
select
current_date() as recommend_date, -- 推荐时间,区别是什么时候推荐的
zt_id as user_id,
goods_no,
goods_name,
third_category_no,
third_category_name,
order_count
from (
select
*,
row_number() over(partition by zt_id order by order_count desc) as rn
from (
select
zt_id,
third_category_no,
third_category_name,
goods_no,
goods_name,
count(order_no) as order_count -- 订单量
from dwm.dwm_sold_goods_sold_dtl_i
where
-- 过滤最近一段时间内的销售数据
datediff(current_date(),to_date(trade_date))<=40 and goods_no is not null
and zt_id is not null and zt_id!=0
group by zt_id,third_category_no,third_category_name,goods_no,goods_name
) tmp_1
) tmp_2 where rn<=20
- Doris建表语句
CREATE TABLE IF NOT EXISTS recommend_db.popular_person_hot_goods (
recommend_date DATE comment '计算日期',
user_id INT comment '会员ID',
goods_no STRING comment '商品编码',
goods_name STRING comment '商品名称',
third_category_no STRING comment '三级品类编码',
third_category_name STRING comment '三级品类名称',
order_count INT comment '订单数'
)
UNIQUE KEY(recommend_date, user_id)
comment '个人热门商品推荐'
PARTITION BY RANGE(recommend_date) ()
DISTRIBUTED BY HASH(user_id) BUCKETS 1
sql (
"dynamic_partition.create_history_partition" = "true",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-365",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "10",
"replication_allocation" = "tag.location.default: 1"
);
- 推荐代码
from pyspark.sql import SparkSession
import os
import pyspark.sql.functions as F
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.appName("topn_user_goods_recommend")\
.master("local[*]") \
.config("spark.sql.warehouse.dir", "hdfs://up01:8020/user/hive/warehouse") \
.config("hive.metastore.uris", "thrift://up01:9083") \
.config("spark.sql.shuffle.partitions",8)\
.enableHiveSupport() \
.getOrCreate()
# 2- 读取分析Hive中的数据,获取TOPN的热门商品
topn_goods_df = spark.sql("""
select
current_date() as recommend_date, -- 推荐时间,区别是什么时候推荐的
zt_id as user_id,
goods_no,
goods_name,
third_category_no,
third_category_name,
order_count
from (
select
*,
row_number() over(partition by zt_id order by order_count desc) as rn
from (
select
zt_id,
third_category_no,
third_category_name,
goods_no,
goods_name,
count(order_no) as order_count -- 订单量
from dwm.dwm_sold_goods_sold_dtl_i
where
-- 过滤最近一段时间内的销售数据
datediff(current_date(),to_date(trade_date))<=40 and goods_no is not null
and zt_id is not null and zt_id!=0
group by zt_id,third_category_no,third_category_name,goods_no,goods_name
) tmp_1
) tmp_2 where rn<=20
""")
# 3- 数据存储到Doris中
topn_goods_df.write.jdbc(
url="jdbc:mysql://192.168.88.166:9030/recommend_db?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false",
table="popular_person_hot_goods",
mode="append",
sql={ 'user' : 'root', 'password' : '123456' }
)
# 4- 释放资源
spark.stop()
2、基于隐语义模型的协同过滤推荐(了解)
基于隐语义模型的协同过滤方法结合了协同过滤的思想和隐语义模型的技术,通过矩阵分解等方法,将用户-项目交互矩阵分解为两个低维矩阵,分别表示用户在隐空间中的向量和项目在隐空间中的向量。
2.1 ALS算法介绍
ALS算法是2008年以来,用的比较多的协同过滤算法。它已经集成到Spark的Mllib库中,使用起来比较方便。从协同过滤的分类来说,ALS算法属于User-Item CF,也叫做混合CF。它同时考虑了User和Item两个方面。
spark.ml目前支持基于模型的协同过滤,使用交替最小二乘法(ALS)算法实现。
spark.ml的实现具有以下参数:
- numBlocks:用户和物品将被分成的块数,以便并行计算(默认为10)
- rank:模型中的潜在因子数量(默认为10)
- maxIter:运行的最大迭代次数(默认为10)
- regParam:在ALS中指定的正则化参数(默认为1.0)
- implicitPrefs:指定是否使用显式反馈ALS变体或适用于隐式反馈数据的变体(默认为false,表示使用显式反馈)
- alpha:适用于ALS隐式反馈变体的参数,决定了对偏好观察的基本置信度(默认为1.0)
- nonnegative:指定是否对最小二乘法使用非负约束(默认为false)
注意:基于DataFrame的ALS API目前仅支持整数类型的用户和物品ID。
2.2 推荐代码
如果使用基于ALS的协同过滤模型进行推荐,关键是要构造用户对商品的评分数据。评分主要来源于用户的行为,包括浏览、加购、下单、购买、退单、评论、收藏等,一般在企业中,都会将这些因素考虑进去。
具体的评分方法是:浏览 1分,加购 2分,下单 3分, 支付 5分,退单 -5分。
- Doris建表语句
CREATE DATABASE IF NOT EXISTS recommend_db;
CREATE TABLE IF NOT EXISTS recommend_db.als_goods_for_user (
user_id INT comment '用户id',
goods_nos STRING comment '推荐的商品列表'
)
UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10
sql("replication_num" = "1");
- 推荐代码
import os
from datetime import datetime
import numpy as np
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, DoubleType
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python'
def get_best_parameter(df):
# 切分数据集
training, test = df.randomSplit([0.8, 0.2], seed=88)
# 使用ALS构建推荐模型
# 将冷启动策略设置为“drop”,以确保不会获得NaN评估指标
als = ALS(userCol='user_id', itemCol='goods_no', ratingCol='score', coldStartStrategy='drop')
# 创建参数网格
param_grid = ParamGridBuilder() \
.addGrid(als.rank, [5, 10, 15]) \
.addGrid(als.maxIter, [5, 10, 20]) \
.addGrid(als.regParam, [0.01, 0.05, 0.1]) \
.build()
# 创建评估器
evaluator = RegressionEvaluator(metricName='rmse', labelCol='score', predictionCol='prediction')
# 创建交叉验证器
crossval = CrossValidator(estimator=als,
estimatorParamMaps=param_grid,
evaluator=evaluator,
numFolds=3) # 3折交叉验证
# 训练模型
cv_model = crossval.fit(training)
# 选择最佳模型
best_model = cv_model.bestModel
# 评估最佳模型
predictions = best_model.transform(test)
rmse = evaluator.evaluate(predictions)
print('最优模型的均方根误差为:' + str(rmse))
# 获取最佳参数
rank = best_model._java_obj.parent().getRank()
maxIter = best_model._java_obj.parent().getMaxIter()
regParam = best_model._java_obj.parent().getRegParam()
print('最佳参数组合:')
print('rank: ', rank)
print('maxIter: ', maxIter)
print('regParam: ', regParam)
return rank, maxIter, regParam
if __name__ == '__main__':
# 1)创建整合Hive的SparkSession
# 1- 创建SparkSession对象
spark = SparkSession.builder \
.appName("recommend") \
.master("local[*]") \
.config("spark.sql.warehouse.dir", "hdfs://up01:8020/user/hive/warehouse") \
.config("hive.metastore.uris", "thrift://up01:9083") \
.config("spark.sql.shuffle.partitions", 5) \
.enableHiveSupport() \
.getOrCreate()
# 2)从业务库中计算历史评分
select_sql = """
select zt_id as user_id, goods_no, sum(if(trade_type in(2, 5), -1, 1)) * 5 as score
from dwm.dwm_sold_goods_sold_dtl_i
where dt >= date_sub(current_date, 90) and dt <= date_sub(current_date, 1)
and zt_id != 0 and zt_id is not null
group by zt_id, goods_no
"""
hive_df = spark.sql(select_sql)
hive_df.show()
# 3)读取并解析日志数据
file_path = 'hdfs://up01:8020/xtzg/etl/dwd_user_event_etl_result/dt=2025-02-14'
# 读取ORC格式的数据
log_df = spark.read.format('orc').load(file_path).select('user_id', F.split('goods_name', '=')[0].alias('goods_no'), (F.col('is_browse')*1 + F.col('is_cart')*2 + F.col('is_order')*3 + F.col('is_buy')*5 - F.col('is_back_order')*5).alias('score') )
log_df.show()
# 4) 数据合并并聚合
# 因为als模型中,需要userCol和itemCol都是整型,所以需要将类型转成int,又因为goods_no有0开头的,所以需要再前边拼接一个数字
# 因为频次过多会导致评分过大,所以可以使用log将数据变平滑
union_df = hive_df.unionAll(log_df).groupby('user_id', 'goods_no').agg(F.sum('score').alias('score')).\
select(F.col('user_id').astype(IntegerType()).alias('user_id'), F.concat(F.lit('1'), F.col('goods_no')).astype(IntegerType()).alias('goods_no'), 'score')
# union_df.printSchema()
union_df.show()
# 5) 训练模型并得到推荐结果
# 获取最佳超参数
# rank, maxIter, regParam = get_best_parameter(union_df)
rank, maxIter, regParam = 15, 20, 0.1
als = ALS(rank=rank, maxIter=maxIter, regParam=regParam, userCol='user_id', itemCol='goods_no', ratingCol='score',
coldStartStrategy='drop')
als_model: ALSModel = als.fit(union_df)
# 为每个用户生成十大商品推荐
userRecs = als_model.recommendForAllUsers(10)
userRecs.printSchema()
# userRecs.show(truncate=False)
# 处理 goods_no,将int转为str,并去掉前缀1
doris_df = userRecs.withColumn(
'goods_nos',
F.expr("""
TRANSFORM(recommendations, x -> named_struct(
'goods_no', substr(CAST(x.goods_no AS STRING), 2),
'rating', x.rating
))
""")).select('user_id', F.col('goods_nos').astype(StringType()).alias('goods_nos'))
doris_df.printSchema()
doris_df.show(truncate=False)
# 保存到 Doris
doris_df.write.jdbc(
url="jdbc:mysql://192.168.88.166:9030/recommend_db?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false",
table="als_goods_for_user",
mode="append",
sql={'user': 'root', 'password': '123456'}
)
# 释放资源
spark.stop()
3、基于物品的协同过滤推荐(了解)
基于物品的协同过滤就是计算出每个标的物最相似的标的物列表,然后就可以为用户推荐用户喜欢的标的物相似的标的物。 这里可以借助ALS算法生成的矩阵来完成。
离线计算的ALS 算法,算法最终会为用户、商品分别生成最终的特征矩阵,分别是表示用户特征矩阵的U(K x U)矩阵,每个用户由 K 个特征描述;表示物品特征矩阵的V(I x K)矩阵,每个物品也由 K 个特征描述。
- Doris建表语句
CREATE TABLE IF NOT EXISTS recommend_db.als_sim_goods_list (
id INT comment 'id',
goods_no STRING comment '商品编码',
sim_goods_list STRING comment '相似的商品列表'
)
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10
sql("replication_num" = "1");
- 推荐代码:写到ALS的后面
# 6)为每个商品生成十大用户推荐
# goodsRecs = als_model.recommendForAllItems(10)
# 通过 itemFactors 获得商品的特征表达
# als_model.itemFactors.show(truncate=False)
# 获取商品id及对应的特征表达
item_factors_df = als_model.itemFactors.select(F.expr("substr(cast(id as string), 2) as goods_no"), 'features')
item_factors_df.show(truncate=False)
# 定义计算余弦相似度的 UDF
def consin_sim(vec1, vec2):
vec1 = np.array(vec1)
vec2 = np.array(vec2)
num = np.dot(vec1, vec2)
# np.linalg.norm()用于求范数,默认是二范数
denom = np.linalg.norm(vec1) * np.linalg.norm(vec2)
if denom == 0:
return 0.0
return round(float(num / denom), 4)
consin_sim_udf = F.udf(consin_sim, DoubleType())
# item_factors_df自关联,计算相似度,再将相似度小于0.75的过滤掉
cartesian_goods_df = item_factors_df.alias('df1') \
.crossJoin(item_factors_df.alias('df2')) \
.filter(F.col('df1.goods_no') != F.col('df2.goods_no')) \
.withColumn('simScore', consin_sim_udf(F.col('df1.features'), F.col('df2.features'))) \
.filter('simScore >= 0.75')
# 按照 goods_no 进行分组并构建推荐结果
goods_recs_df = cartesian_goods_df.groupBy('df1.goods_no') \
.agg(F.collect_list(F.struct(F.col('df2.goods_no').alias('rec_goods_no'), F.col('simScore').alias('score'))).alias(
'rec_goods_nos'))
# 对相似的goods_no列表进行排序,并选取前10个【因为个别商品相似的商品太多,所以只保留10个即可】
# 使用 expr 和 array_sort 函数进行排序,并使用 slice 函数只保留前10个元素
sorted_df = goods_recs_df.withColumn(
'sim_goods_list',
F.expr(
"slice(array_sort(rec_goods_nos, (x, y) -> case when x.score > y.score then -1 when x.score < y.score then 1 else 0 end), 1, 10)")
).withColumn(
'id',
F.expr(
"cast(concat('1', goods_no) as int)"
)
).select('id', 'goods_no', F.col('sim_goods_list').astype(StringType()).alias('sim_goods_list'))
# 显示结果
sorted_df.printSchema()
sorted_df.show(truncate=False)
# 保存到 Doris
write_to_doris(sorted_df, 'recommend_db.als_sim_goods_list')
4、基于用户的协同过滤推荐(了解)
UserCF算法主要是考虑用户与用户之间的相似度,给用户推荐和他兴趣相似的其他用户喜欢的物品。俗话说"物以群分,人以类聚",人们总是倾向于跟自己志同道合的人交朋友。同理,你朋友喜欢的东西你大概率也可能会喜欢,UserCF算法正是利用了这个原理。
- Doris建表语句
CREATE TABLE IF NOT EXISTS recommend_db.user_cf_goods_for_user (
user_id INT comment '用户id',
goods_nos STRING comment '推荐的商品列表'
)
UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10
sql("replication_num" = "1");
- 推荐代码
import os
from pyspark.ml.feature import CountVectorizer, Normalizer
from pyspark.sql import DataFrame, Window, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, ArrayType, StructType, StructField, StringType, FloatType
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder \
.appName("recommend") \
.master("local[*]") \
.config("spark.sql.warehouse.dir", "hdfs://up01:8020/user/hive/warehouse") \
.config("hive.metastore.uris", "thrift://up01:9083") \
.config("spark.sql.shuffle.partitions", 5) \
.enableHiveSupport() \
.getOrCreate()
# 2)从es读取标签数据并将标签数据合并
es_df = spark.read.format("es") \
.option("es.nodes", "192.168.88.166:9200") \
.option("es.resource", "user_profile_tags") \
.option("es.read.field.include", "user_id,tags_id_times,tags_id_once,tags_id_streaming") \
.load()
temp_df = es_df.select('user_id', F.concat(F.coalesce('tags_id_times', F.lit('')), F.lit(','),
F.coalesce('tags_id_once', F.lit('')), F.lit(','),
F.coalesce('tags_id_streaming', F.lit(''))).alias('tags'))
temp_df.show()
# 3)将标签数据转换成向量
# 使用split函数将字符串切分成数组,然后使用filter将''的元素过滤掉
tags_df = temp_df.select('user_id', F.split('tags', ',').alias('tags')).select('user_id', F.expr(
"filter(tags, x -> x != '')").alias('tags'))
# tags_df.show(truncate=False)
# 将标签数组转换为向量
cv = CountVectorizer(inputCol='tags', outputCol='features')
model = cv.fit(tags_df)
user_df = model.transform(tags_df)
# 因为数据量比较大,容易运行不出来,所以可以抽样,取少量数据
# user_df = user_df.sample(fraction=0.01, seed=66)
user_df.show(truncate=False)
# 4)计算用户相似度
# 标准化向量
normalizer = Normalizer(inputCol="features", outputCol="norm_features")
norm_df = normalizer.transform(user_df).select('user_id', 'norm_features')
norm_df.printSchema()
# norm_df.show(truncate=False)
# 将稀疏向量列转换为稠密向量列
def to_dense(vector):
return vector.toArray().tolist()
to_dense_udf = F.udf(to_dense, ArrayType(DoubleType()))
dense_df = norm_df.withColumn('dense_features', to_dense_udf('norm_features'))
# dense_df.show(truncate=False)
# 计算用户之间的余弦相似度
join_df = dense_df.alias('u1').join(dense_df.alias('u2'), F.col('u1.user_id') != F.col('u2.user_id')) \
.select(F.col('u1.user_id').alias('user1'), F.col('u2.user_id').alias('user2'),
F.col('u1.dense_features').astype(ArrayType(FloatType())).alias('f1'),
F.col('u2.dense_features').astype(ArrayType(FloatType())).alias('f2'))
user_sim = join_df.select('user1', 'user2', F.zip_with('f1', 'f2', lambda x, y: x * y).alias('f3')) \
.withColumn('cosine_sim', F.round(F.aggregate('f3', F.lit(0.0), lambda acc, x: acc + x), 4)) \
.select('user1', 'user2', 'cosine_sim')
# print('-----------------------',user_sim.count(),'------------------------------')
user_sim.printSchema()
# user_sim.show(truncate=False)
# 5)获取每个用户最相似的10个用户
# 定义窗口函数
windowSpec = Window.partitionBy('user1').orderBy(F.col('cosine_sim').desc())
# 取rn前10的列
rn_df = user_sim.withColumn('rn', F.row_number().over(windowSpec)).filter('rn <= 10')
# rn_df.show(truncate=False)
# 6)查询每个用户评分最高的商品
# 计算评分时,因为不同用户购买频次不同会导致评分差距过大,在进行商品推荐时,该评分对结果影响很大,所以可以对score使用log函数,将这种变化变平缓些
select_sql = """
select user_id, goods_no, round(log(score), 3) as score
from (
select
user_id, goods_no, score, rank() over(partition by user_id order by score desc) as rn
from (
select zt_id as user_id, goods_no, sum(if(trade_type in(2, 5), -1, 1)) * 5 as score
from dwm.dwm_sold_goods_sold_dtl_i
where dt >= date_sub(current_date, 90) and dt <= date_sub(current_date, 1)
and zt_id != 0 and zt_id is not null
group by zt_id, goods_no
) tmp
where score > 0 -- score为0或负的不推荐
) t
where rn <= 10
"""
hive_df = spark.sql(select_sql)
# hive_df.show()
# 按照 goods_no 进行分组并构建推荐结果
prefer_goods_df = hive_df.groupBy('user_id') \
.agg(F.collect_list(F.struct('goods_no', 'score')).alias('prefer_goods_nos'))
# prefer_goods_df.show(truncate=False)
# 7)用户关联商品,给用户进行推荐
# 关联商品
join_df = rn_df.join(prefer_goods_df, rn_df['user1'] == prefer_goods_df['user_id'], 'inner').select('user1', 'user2', 'cosine_sim', F.col('prefer_goods_nos').alias('user1_goods_no'))
join_df = join_df.join(prefer_goods_df, join_df['user2'] == prefer_goods_df['user_id'], 'inner').select('user1', 'user2', 'cosine_sim', 'user1_goods_no', F.col('prefer_goods_nos').alias('user2_goods_no'))
# join_df.show(truncate=False)
join_df.printSchema()
# 定义一个udf,将cosine_sim,user1_goods_no和user2_goods_no都传进去,去掉user2_goods_no中的user1_goods_no,并计算user2_goods_no的分数
def calculate_score(cosine_sim, user1_goods_no, user2_goods_no):
user1_goods = [item.goods_no for item in user1_goods_no]
user2_goods = []
for item in user2_goods_no:
if item['goods_no'] not in user1_goods:
user2_goods.append({'goods_no': item['goods_no'], 'score': round(item['score'] * cosine_sim, 3)})
return user2_goods
# 返回值类型
schema = ArrayType(StructType([
StructField('goods_no', StringType(), nullable=False),
StructField('score', DoubleType(), nullable=False)
]))
calculate_score_udf = F.udf(calculate_score, schema)
# 获取用户及推荐的商品
rec_df = join_df.select(F.col('user1').alias('user_id'), calculate_score_udf('cosine_sim', 'user1_goods_no', 'user2_goods_no').alias('rec_goods')).\
filter(F.size(F.col('rec_goods')) > 0)
# rec_df.show(truncate=False)
# 展开 rec_goods 中的元素,然后按照 user_id 进行分组并聚合成列表
goods_recs_df = rec_df.withColumn('goods_no_element', F.explode(F.col('rec_goods'))).groupBy('user_id').agg(F.collect_list('goods_no_element').alias('rec_goods_nos'))
# 对推荐的goods_no列表进行排序,并选取前10个【因为个别用户推荐的商品太多,所以只保留10个即可】
# 使用 expr 和 array_sort 函数进行排序,并使用 slice 函数只保留前10个元素
sorted_df = goods_recs_df.withColumn(
'rec_goods_list',
F.expr(
'slice(array_sort(rec_goods_nos, (x, y) -> case when x.score > y.score then -1 when x.score < y.score then 1 else 0 end), 1, 10)')
).select('user_id', F.col('rec_goods_list').astype(StringType()).alias('goods_nos'))
# sorted_df.show(truncate=False)
# 8)结果保存
# 保存到 Doris
sorted_df.write.jdbc(
url="jdbc:mysql://192.168.88.166:9030/recommend_db?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false",
table="user_cf_goods_for_user",
mode="append",
sql={'user': 'root', 'password': '123456'}
)
spark.stop()
5、基于关联规则的推荐(熟悉)
5.1 关联规则详解
- 什么是关联规则(Association Rules)?
答:关联规则是数据挖掘中的概念,通过分析数据,找到数据之间的关联。电商中经常用来分析购买物品之间的相关性,例如,“购买尿布的用户,有大概率购买啤酒”,这就是一个关联规则。
- 什么是关联规则推荐(Association Rule Based Recommendaion)?
答:顾名思义,利用关联规则,来实施推荐。关联规则推荐的目标,是希望达到“将尿布放入购物车之后,再推荐啤酒”比“直接推荐啤酒”获取有更好的售卖效果。
- 关联规则推荐的典型应用?
线下,可以将尿布和啤酒放在一起;
线上,可以在用户将尿布放入购物车后,立刻推荐啤酒。
- 如何实施?
假设某电商会售卖ABCD四种商品,历史上共5笔订单,分别卖出{A,B,C}, {B,C,D}, {A,B,C,D}, {A,C}, {C}
5.2 FP-growth算法理解
常用的算法有Aprior算法和FP-growth算法,FP-growth算法比Apriori算法效率更高,并且在PySpark中对FP-growth算法进行了实现,所以这里重点讲一下FP-growth算法原理。
FP-growth(Frequent Pattern Tree, 频繁模式树),是韩家炜老师提出的挖掘频繁项集的方法,是将数据集存储在一个特定的称作FP树的结构之后发现频繁项集或频繁项对,即常在一块出现的元素项的集合FP树。
5.3 SparkMLlib中的FP-growth算法
spark.ml中提供了FPGrowth()方法来实现FP-growth算法。spark.ml的FP-growth实现接受以下(超)参数:
- minSupport:将一个项目集识别为频繁项目集的最低支持度。例如,如果一个项目在5个事务中出现了3次,它的支持度就是3/5=0.6。
- minConfidence:生成关联规则的最低置信度。置信度是表明一个关联规则被发现为真的频率。例如,如果在事务中项目集X出现了4次,而X和Y共同出现了2次,则规则X => Y的置信度为2/4=0.5。该参数不会影响频繁项目集的挖掘,但会指定从频繁项目集中生成关联规则的最低置信度。
- numPartitions:用于分配工作的分区数。默认情况下,该参数未设置,使用输入数据集的分区数。
模型训练完成后,会生成 FPGrowthModel 对象。FPGrowthModel 提供以下方法或属性:
-
freqItemsets:频繁项目集,以包含以下列的数据框格式提供:
- items:array:一个给定的项目集。
- freq:long:根据配置的模型参数,该项目集出现的次数。
-
associationRules:生成的置信度高于 minConfidence 的关联规则,以包含以下列的数据框格式提供:
- antecedent:array:作为关联规则假设的项目集。如果关联规则为A->B,则 antecedent 为 A。
- consequent:array:总是包含一个元素的项目集,代表关联规则的结论。如果关联规则为A->B,则 antecedent 为 B。
- confidence:double:置信度,定义参见上文中的 minConfidence。
- lift:double:提升度,计算方法为 support(antecedent ∪ consequent) / (support(antecedent) x support(consequent))。
- support:double:频繁项目集的支持度,定义参见上文中的 minSupport。
-
transform:根据传入的items,比对关联规则,将符合规则的结果添加到预测结果中。transform 方法将汇总所有适用规则的结果作为预测。预测列的数据类型与 items 列相同,并且不包含 items 列中的现有项目。
5.4 完整代码
- Doris建表语句
CREATE TABLE IF NOT EXISTS recommend_db.fpgrowth_association_goods
( `calculate_date` DATETIME COMMENT "计算时间",
`antecedent` ARRAY<STRING> COMMENT "购买的商品",
`consequent` ARRAY<STRING> COMMENT "关联(推荐)商品",
`confidence` DOUBLE COMMENT "置信度",
`lift` DOUBLE COMMENT "提升度",
`support` DOUBLE COMMENT "支持度"
)
DUPLICATE KEY(calculate_date)
comment '关联规则推荐'
PARTITION BY RANGE(calculate_date) ()
DISTRIBUTED BY HASH(calculate_date) BUCKETS 1
sql (
"dynamic_partition.create_history_partition" = "true",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-365",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "10",
"replication_allocation" = "tag.location.default: 1"
);
因为在doris中只是为了备份,所以存储成冗余模型即可。为了区分不同时间计算的结果,在表中添加了calculate_date字段,作为区分。然后为了分区存储,使用了动态分区的方式。
- 推荐代码
from pyspark.ml.fpm import FPGrowth, FPGrowthModel
from pyspark.sql import SparkSession
import os
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, StructType, StructField, ArrayType
from tags.utils.hdfs_utils import HDFSUtil
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 实现商品推荐的功能
def get_recommend_goods(current_goods_no, spark, fpg_model):
schema = StructType([
StructField("items",ArrayType(StringType()))
])
current_goods_no_df = spark.createDataFrame(data=[(current_goods_no,)],schema=schema)
# 使用模型进行商品推荐
result_df = fpg_model.transform(current_goods_no_df)
result_df.show()
result_df.printSchema()
print(result_df.collect())
# 返回最终的推荐商品ID
return result_df.collect()[0][1]
# 基于关联规则的推荐
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.appName("fp_growth")\
.master("local[*]") \
.config("spark.sql.warehouse.dir", "hdfs://up01:8020/user/hive/warehouse") \
.config("hive.metastore.uris", "thrift://up01:9083") \
.config("spark.sql.shuffle.partitions",5) \
.enableHiveSupport() \
.getOrCreate()
# 2- 数据输入:分析商品间的关联关系
order_df = spark.sql("""
select
order_no,
collect_set(goods_no) as items -- 将当前订单下的多个商品合到一个Set集合中
from dwm.dwm_sold_goods_sold_dtl_i
where datediff(current_date(),to_date(trade_date))<=40
and goods_no is not null
and parent_order_no is not null
and order_no is not null
group by order_no
""")
# 3- 通过FP-growth分析商品间的关联关系的频率
path = "/xtzg/recommend/fpg"
if HDFSUtil().exists(path):
# 如果之前已经训练好了模型,那么直接加载出来使用即可
fpg_model = FPGrowthModel.load("hdfs://192.168.88.166:8020"+path)
else:
# 3.1- 创建算法模型实例对象
fpGrowth = FPGrowth(itemsCol="items", minSupport=0.001, minConfidence=0.6)
# 3.2- 对算法模型使用数据进行训练
fpg_model = fpGrowth.fit(order_df)
# 3.3- 再将训练好的模型存储到HDFS
fpg_model.save("hdfs://192.168.88.166:8020"+path)
rule_result = fpg_model.associationRules
rule_result.show(n=100)
rule_result.printSchema()
# 4- 模型训练后的商品关联信息存放到Doris中
# doris_df = rule_result.withColumn("calculate_date",F.current_timestamp())
doris_df = rule_result.select(
F.current_timestamp().alias("calculate_date"),
rule_result.antecedent.cast(StringType()).alias("antecedent"),
rule_result.consequent.cast(StringType()).alias("consequent"),
"confidence",
"lift",
"support"
)
doris_df.write.jdbc(
url="jdbc:mysql://192.168.88.166:9030/recommend_db?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false",
table="fpgrowth_association_goods",
mode="append",
sql={ 'user' : 'root', 'password' : '123456' }
)
# 5- 使用训练好的模型来进行商品的推荐:这里是模拟后面顾客来购买东西的时候,进行推荐的效果
current_goods_no = ["3224064"]
recommend_goods = get_recommend_goods(current_goods_no, spark, fpg_model)
print(recommend_goods)
# 6- 释放资源
spark.stop()
可能遇到的错误:
原因: 将数据输入到Doris的array字段中的时候,需要在输入前将对应的数据格式进行类型转换为字符串
6、服务部署(了解)
配置好get_recommend_goods方法后,每次调用都需要spark session和fpg_model,如果每次调用都新建非常浪费资源,时效也会非常差。所以这里需要将get_recommend_goods方法布置成接口服务。服务启动后,则可以只实例化一个spark session和fgp_model,并实时响应查询推荐商品的请求
这里借助Flask来实现。Flask是一个用Python编写的Web应用程序框架。Flask中文官网:https://dormousehole.readthedocs.io/en/latest/
Flask安装命令
pip install Flask -i https://mirrors.aliyun.com/pypi/simple/
Flask代码
import ast
import os
from flask import Flask, request
from pyspark.ml.fpm import FPGrowthModel
from pyspark.sql import SparkSession
from tags.recommend.fpgrowth_association_goods import get_recommend_goods
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python'
# 初始化SparkSession
spark = SparkSession.builder \
.config("spark.sql.shuffle.partitions", 5) \
.appName("recommend_api") \
.getOrCreate()
# 加载模型
hdfs_path = "/xtzg/recommend/fpg"
fpg_model: FPGrowthModel = FPGrowthModel.load("hdfs://192.168.88.166:8020"+hdfs_path)
app = Flask(__name__)
@app.route('/recommend')
def recommend():
# 处理get请求,获取?后边的参数
data = request.args.to_dict()
# print(data)
# 使用literal_eval将字符串转换为list
goods_list = ast.literal_eval(data['goods_list'])
print('-----数据来了:', goods_list)
# 方法调用
recommended_goods = get_recommend_goods(goods_list, spark, fpg_model)
return recommended_goods
@app.route('/')
def hello_world():
return '欢迎来到小兔智购商品推荐系统'
if __name__ == "__main__":
app.run(host='0.0.0.0', port=5000)
启动Flask,然后可以看到生成的URL。
访问:
这里模拟发送一个get请求,在url后加?,然后是key=value,如下
http://192.168.88.166:5000/recommend?goods_list=[‘3215330’]
则可以得到响应的结果
在工作中,由后端程序调用接口,得到响应结果后,再发生给前端进行渲染,生成推荐结果。