Kmeans算法来实现RFM指标计算步骤
K-Means(K均值)是一种经典的无监督聚类算法,主要用于将数据集划分为 KKK 个不同的簇(Cluster)。
它基于最小化簇内样本的平方误差,即最小化数据点与簇中心的距离之和。
1. K-Means 算法原理
(1) 主要步骤
-
初始化
选择 KKK 个初始聚类中心(可以随机选取,也可以使用K-Means++优化选择)。 -
分配样本到最近的簇中心
对于数据集中的每个点,将其分配给最近的聚类中心(使用欧几里得距离、曼哈顿距离等度量)。 -
更新簇中心
计算每个簇中所有点的均值,并将该均值作为新的簇中心。 -
重复步骤 2 和 3
直到聚类中心不再发生变化或者达到最大迭代次数。
(2) 目标函数
K-Means 试图最小化以下目标函数(平方误差):
2. K-Means 的特点
(1) 优势
- 计算速度快,时间复杂度接近 O(nKT)O(nKT)O(nKT)(其中 TTT 是迭代次数)。
- 适用于大规模数据集,易于并行化(可以结合MapReduce/Spark实现)。
- 聚类结果可解释性强,适用于数据分析、推荐系统、用户分群等场景。
(2) 局限性
- K值需要人为指定,如果选择不当,可能会导致聚类效果不佳。
- 对初始簇中心敏感,可能陷入局部最优(K-Means++ 可以优化初始中心选择)。
- 对异常点敏感,受离群点影响较大。
- 适用于凸形簇,对非球形分布的数据表现较差。
3. K-Means Demo示例
# 使用机器学习算法实现rfm
from utils.base import BasicTag
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
class RFMTag(BasicTag):
def make_tag(self):
rule_dict = self.data_dict
# 1-计算获取RFM的值
# 消费周期计算
recency = F.datediff(F.current_date(), F.from_unixtime(F.max('finishtime'))).alias('diff_dt').alias('recency')
# 订单量
frequency = F.countDistinct('ordersn').alias('frequency')
# 订单金额
monetary = F.sum('orderamount').alias('monetary')
df_rfm = self.df_es.groupby('memberid').agg(recency,frequency,monetary)
# 2-根据计算的结果,将数据划分5个等级进行打分
# -----------------------------R数据打分----------------------------------------------------------------------------------
# 2-1 特征工程将数据转为向量(只有转化为坐标向量(VectorAssembler),才能在坐标轴上计算方向及两点之间的距离,再求平均距离)
v_r = VectorAssembler(inputCols=['recency'],outputCol='v_r')
# 添加转化数据,将上面订单金额加入进行转化
df_r_fit = v_r.transform(df_rfm)
# 2-2 使用kmeas算法打分,
# k参数负责划分几个分数段;
# featuresCol负责传入向量值用于聚类的特征向量的列名;
# predictionCol 用于存储聚类结果的列名。KMeans算法会在训练数据上进行聚类分析,并将每个数据点分配到一个聚类。这个聚类结果会存储在predictionCol指定的列中。
#+---+-----------------+ +---+-----------------------+
#|id | v_r | |id | v_r | k_r |
#+---+-----------------+ +---+-----------------+-----+
#| 1 | [1.0, 2.0, 3.0] | 执行KMeans聚类后: | 1 | [1.0, 2.0, 3.0] | 0 |
#| 2 | [4.0, 5.0, 6.0] | ————————————————————————> | 2 | [4.0, 5.0, 6.0] | 1 |
#| 3 | [7.0, 8.0, 9.0] | | 3 | [7.0, 8.0, 9.0] | 2 |
#| 4 |[10.0, 11.0, 12.0]| | 4 |[10.0, 11.0, 12.0]| 3 |
kmeans = KMeans(k=5,featuresCol='v_r',predictionCol='k_r')
# 添加数据----kmeans.fit( )用于训练模型,训练完生成一个KMeans模型对象k_r_fit
k_r_fit = kmeans.fit(df_r_fit)
# 转化数据,对训练结果进行预测,将预测结果添加到数据框:df_k_r中
df_k_r = k_r_fit.transform(df_r_fit)
df_k_r.show()
# 计算k值中心判断k值的大小--clusterCenters()获取每个聚类的中心点(质心),存储在cluster变量中。
cluster = k_r_fit.clusterCenters()
# 遍历k值(质点)中心,计算k值的和
# cluster_dict = {}:创建一个空字典,用于存储每个聚类中心点的索引和其特征值的和。
# enumerate:是 Python 的一个内置函数,用于在遍历可迭代对象(如列表、元组、字符串等)时,同时获取每个元素的索引和值。它返回一个枚举对象,该对象生成一系列包含索引和值的元组。
# 示例列表:
#------------------------------------------+
# fruits = ['apple', 'banana', 'cherry'] |
# #使用 enumerate 进行遍历 |
# for index, value in enumerate(fruits): |
# rint(index, value) |
#--------输出------------------------------+
# 0 apple |
# 1 banana |
# 2 cherry |
#-----------------------------------------+
cluster_dict = {}
for i, v in enumerate(cluster):
cluster_dict[i] = sum(v)
print(cluster_dict) # 输出结果:{0: 1445.0, 1: 1446.0, 2: 1447.0}
# 中心点排序,对cluster_dict按特征值的和进行升序排序--并强制类型转化为字典格式
cluster_dict_sort = dict(sorted(cluster_dict.items(), key=lambda x: x[1]))
# 对k中心替换大小
# count = 0:初始化计数器。
# for k, v in cluster_dict_sort.items():遍历排序后的字典cluster_dict_sort。
# cluster_dict_sort[k] = count:将排序后的中心点索引替换为从0开始的递增值。
# count += 1:计数器递增。然后使用cluster_dict_sort[k] 进行值的替换
count = 0
for k, v in cluster_dict_sort.items():
cluster_dict_sort[k] = count
count += 1
# 替换原来df中的k值编号
@F.udf(returnType=IntegerType())
def repace_data(k):
return cluster_dict_sort.get(k)
df_replace_r = df_k_r.select(df_k_r.memberid,df_k_r.recency,df_k_r.frequency,df_k_r.monetary,repace_data('k_r').alias('k_r'))
#-----------------------------F数据打分----------------------------------------------------------------------------------
# 2-1 特征工程将数据转为向量
f_r = VectorAssembler(inputCols=['frequency'], outputCol='f_r')
# 添加转化数据
df_f_fit = f_r.transform(df_replace_r)
# 2-2 kmeas算法打分
kmeans = KMeans(k=5, featuresCol='f_r', predictionCol='k_f')
# 添加数据
f_r_fit = kmeans.fit(df_f_fit)
# 转化数据
df_f_r = f_r_fit.transform(df_f_fit)
# 计算k值中心判断k值的大小
cluster = f_r_fit.clusterCenters()
# 遍历k值中心,计算k值的和
cluster_dict = {}
for i, v in enumerate(cluster):
cluster_dict[i] = sum(v)
# 中心点排序
cluster_dict_sort = dict(sorted(cluster_dict.items(), key=lambda x: x[1]))
# 对k中心替换大小
count = 0
for k, v in cluster_dict_sort.items():
cluster_dict_sort[k] = count
count += 1
# 替换原来df中的k值编号
@F.udf(returnType=IntegerType())
def repace_data1(k):
return cluster_dict_sort.get(k)
df_replace_f = df_f_r.select(df_f_r.memberid, df_f_r.recency, df_f_r.frequency, df_f_r.monetary,df_f_r.k_r,
repace_data1('k_f').alias('k_f'))
print('F计算完成')
# -----------------------------M数据打分----------------------------------------------------------------------------------
# 2-1 特征工程将数据转为向量
# TODO:inputColss输入字段修改 outputCol字段修改
m_r = VectorAssembler(inputCols=['monetary'], outputCol='m_r')
# 添加转化数据
# TODO: 替换上一步的df数据
df_f_fit = m_r.transform(df_replace_f)
# 2-2 kmeas算法打分
# TODO:inputColss输入字段修改 outputCol字段修改
kmeans = KMeans(k=5, featuresCol='m_r', predictionCol='k_m')
# 添加数据
f_r_fit = kmeans.fit(df_f_fit)
# 转化数据
df_f_r = f_r_fit.transform(df_f_fit)
df_f_r.show()
print('M评分')
# 计算k值中心判断k值的大小
cluster = f_r_fit.clusterCenters()
# 遍历k值中心,计算k值的和
cluster_dict = {}
for i, v in enumerate(cluster):
cluster_dict[i] = sum(v)
# 中心点排序
cluster_dict_sort = dict(sorted(cluster_dict.items(), key=lambda x: x[1]))
# 对k中心替换大小
count = 0
for k, v in cluster_dict_sort.items():
cluster_dict_sort[k] = count
count += 1
# 替换原来df中的k值编号
@F.udf(returnType=IntegerType())
def repace_data2(k):
return cluster_dict_sort.get(k)
# TODO:
df_replace_m = df_f_r.select(df_f_r.memberid, df_f_r.recency, df_f_r.frequency, df_f_r.monetary, df_f_r.k_r, df_f_r.k_f,
repace_data2('k_m').alias('k_m'))
df_replace_m.show()
print('M计算完成')
# 3-计算所有用户的平均打分--结果为一行数据
df_rfm_score_avg = df_replace_m.select(F.avg('k_r').alias('r_avg'),F.avg('k_f').alias('f_avg'),F.avg('k_m').alias('m_avg'))
# +-----+-----+-----+
# |r_avg|f_avg|m_avg|
# +-----+-----+-----+
# | 2.0 | 3.0 | 4.0 |
# +-----+-----+-----+
# 将平均分转为row对象可以获第一行取值(实际上只有一行)
# row 是一个 Row 对象,它允许你像访问属性一样访问列值。
row = df_rfm_score_avg.first()
# 4-通过平均值判断rfm的高低
df_rfm_num = df_replace_m.select(
df_replace_m.memberid,
# 大于为1 否则为0
F.when(df_replace_m.k_r >= row['r_avg'],1).otherwise(0).alias('rn'),
F.when(df_replace_m.k_f > row['f_avg'],1).otherwise(0).alias('fn'),
F.when(df_replace_m.k_m > row['m_avg'],1).otherwise(0).alias('mn'),
)
# 5-标签匹配
# df_rfm_num 是一个包含 RFM 分数的 DataFrame。
# select 方法选择了 memberid 列和通过 concat 方法连接的 RFM 分数(即 rn、fn、mn)。
# F.concat 将 R、F、M 分数转化为字符串并连接成一个新的字符串。
# 最后,alias('rfm') 为这个新字符串列命名为 rfm
df_rfm_str = (df_rfm_num.select
(
df_rfm_num.memberid,
F.concat # F.concat 将 R、F、M 分数转化为字符串并连接成一个新的字符串。并且alias('rfm') 为这个新字符串列命名为 rfm
(
df_rfm_num.rn.cast('string'),
df_rfm_num.fn.cast('string'),
df_rfm_num.mn.cast('string')).alias('rfm'
)
)
)
df_rfm_str.show()
# 这是一个用户定义函数(UDF),用于根据 rule_dict 字典将 RFM 字符串映射到相应的标签。
# @F.udf(returnType=StringType()) 装饰器用于将 Python 函数转化为 PySpark UDF,并指定返回类型为 StringType()。
@F.udf(returnType=StringType())
def match(data):
return rule_dict.get(data)
# 应用 match 函数并创建新 DataFrame:
self.df_new_tag = (df_rfm_str.select
(
df_rfm_str.memberid.alias('userID'),
match(df_rfm_str.rfm).alias('tagsID')
)
)
# 创建对象
ss = SparkSession.builder.config('spark.sql.shuffle.partitions','6').getOrCreate()
rfm = RFMTag()
# 执行
rfm.action('RFM', ss, 'tfec_tags', 'tbl_basic_tag')