当前位置: 首页 > article >正文

完整地实现了推荐系统的构建、实验和评估过程,为不同推荐算法在同一数据集上的性能比较提供了可重复实验的框架

{
   
 "cells": [
  {
   
   "cell_type": "markdown",
   "metadata": {
   },
   "source": [
    "# 基于用户的协同过滤算法"
   ]
  },
  {
   
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
   },
   "outputs": [],
   "source": [
    "# 导入包\n",
    "import random\n",
    "import math\n",
    "import time\n",
    "from tqdm import tqdm"
   ]
  },
  {
   
   "cell_type": "markdown",
   "metadata": {
   },
   "source": [
    "## 一. 通用函数定义"
   ]
  },
  {
   
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
   },
   "outputs": [],
   "source": [
    "# 定义装饰器,监控运行时间\n",
    "def timmer(func):\n",
    "    def wrapper(*args, **kwargs):\n",
    "        start_time = time.time()\n",
    "        res = func(*args, **kwargs)\n",
    "        stop_time = time.time()\n",
    "        print('Func %s, run time: %s' % (func.__name__, stop_time - start_time))\n",
    "        return res\n",
    "    return wrapper"
   ]
  },
  {
   
   "cell_type": "markdown",
   "metadata": {
   },
   "source": [
    "### 1. 数据处理相关\n",
    "1. load data\n",
    "2. split data"
   ]
  },
  {
   
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
   },
   "outputs": [],
   "source": [
    "class Dataset():\n",
    "    \n",
    "    def __init__(self, fp):\n",
    "        # fp: data file path\n",
    "        self.data = self.loadData(fp)\n",
    "    \n",
    "    @timmer\n",
    "    def loadData(self, fp):\n",
    "        data = []\n",
    "        for l in open(fp):\n",
    "            data.append(tuple(map(int, l.strip().split('::')[:2])))\n",
    "        return data\n",
    "    \n",
    "    @timmer\n",
    "    def splitData(self, M, k, seed=1):\n",
    "        '''\n",
    "        :params: data, 加载的所有(user, item)数据条目\n",
    "        :params: M, 划分的数目,最后需要取M折的平均\n",
    "        :params: k, 本次是第几次划分,k~[0, M)\n",
    "        :params: seed, random的种子数,对于不同的k应设置成一样的\n",
    "        :return: train, test\n",
    "        '''\n",
    "        train, test = [], []\n",
    "        random.seed(seed)\n",
    "        for user, item in self.data:\n",
    "            # 这里与书中的不一致,本人认为取M-1较为合理,因randint是左右都覆盖的\n",
    "            if random.randint(0, M-1) == k:  \n",
    "                test.append((user, item))\n",
    "            else:\n",
    "                train.append((user, item))\n",
    "\n",
    "        # 处理成字典的形式,user->set(items)\n",
    "        def convert_dict(data):\n",
    "            data_dict = {}\n",
    "            for user, item in data:\n",
    "                if user not in data_dict:\n",
    "                    data_dict[user] = set()\n",
    "                data_dict[user].add(item)\n",
    "            data_dict = {k: list(data_dict[k]) for k in data_dict}\n",
    "            return data_dict\n",
    "\n",
    "        return convert_dict(train), convert_dict(test)"
   ]
  },
  {
   
   "cell_type": "markdown",
   "metadata": {
   },
   "source": [
    "### 2. 评价指标\n",
    "1. Precision\n",
    "2. Recall\n",
    "3. Coverage\n",
    "4. Popularity(Novelty)"
   ]
  },
  {
   
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
   },
   "outputs": [],
   "source": [
    "class Metric():\n",
    "    \n",
    "    def __init__(self, train, test, GetRecommendation):\n",
    "        '''\n",
    "        :params: train, 训练数据\n",
    "        :params: test, 测试数据\n",
    "        :params: GetRecommendation, 为某个用户获取推荐物品的接口函数\n",
    "        '''\n",
    "        self.train = train\n",
    "        self.test = test\n",
    "        self.GetRecommendation = GetRecommendation\n",
    "        self.recs = self.getRec()\n",
    "        \n",
    "    # 为test中的每个用户进行推荐\n",
    "    def getRec(self):\n",
    "        recs = {}\n",
    "        for user in self.test:\n",
    "            rank = self.GetRecommendation(user)\n",
    "            recs[user] = rank\n",
    "        return recs\n",
    "        \n",
    "    # 定义精确率指标计算方式\n",
    "    def precision(self):\n",
    "        all, hit = 0, 0\n",
    "        for user in self.test:\n",
    "            test_items = set(self.test[user])\n",
    "            rank = self.recs[user]\n",
    "            for item, score in rank:\n",
    "                if item in test_items:\n",
    "                    hit += 1\n",
    "            all += len(rank)\n",
    "        return round(hit / all * 100, 2)\n",
    "    \n",
    "    # 定义召回率指标计算方式\n",
    "    def recall(self):\n",
    "        all, hit = 0, 0\n",
    "        for user in self.test:\n",
    "            test_items = set(self.test[user])\n",
    "            rank = self.recs[user]\n",
    "            for item, score in rank:\n",
    "                if item in test_items:\n",
    "                    hit += 1\n",
    "            all += len(test_items)\n",
    "        return round(hit / all * 100, 2)\n",
    "    \n",
    "    # 定义覆盖率指标计算方式\n",
    "    def coverage(self):\n",
    "        all_item, recom_item = set(), set()\n",
    "        for user in self.test:\n",
    "            for item in self.train[user]:\n",
    "                all_item.add(item)\n",
    "            rank = self.recs[user]\n",
    "            for item, score in rank:\n",
    "                recom_item.add(item)\n",
    "        return round(len(recom_item) / len(all_item) * 100, 2)\n",
    "    \n",
    "    # 定义新颖度指标计算方式\n",
    "    def popularity(self):\n",
    "        # 计算物品的流行度\n",
    "        item_pop = {}\n",
    "        for user in self.train:\n",
    "            for item in self.train[user]:\n",
    "                if item not in item_pop:\n",
    "                    item_pop[item] = 0\n",
    "                item_pop[item] += 1\n",
    "\n",
    "        num, pop = 0, 0\n",
    "        for user in self.test:\n",
    "            rank = self.recs[user]\n",
    "            for item, score in rank:\n",
    "                # 取对数,防止因长尾问题带来的被流行物品所主导\n",
    "                pop += math.log(1 + item_pop[item])\n",
    "                num += 1\n",
    "        return round(pop / num, 6)\n",
    "    \n",
    "    def eval(self):\n",
    "        metric = {'Precision': self.precision(),\n",
    "                  'Recall': self.recall(),\n",
    "                  'Coverage': self.coverage(),\n",
    "                  'Popularity': self.popularity()}\n",
    "        print('Metric:', metric)\n",
    "        return metric"
   ]
  },
  {
   
   "cell_type": "markdown",
   "metadata": {
   },
   "source": [
    "## 二. 算法实现\n",
    "1. Random\n",
    "2. MostPopular\n",
    "3. UserCF\n",
    "4. UserIIF"
   ]
  },
  {
   
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
   },
   "outputs": [],
   "source": [
    "# 1. 随机推荐\n",
    "def Random(train, K, N):\n",
    "    '''\n",
    "    :params: train, 训练数据集\n",
    "    :params: K, 可忽略\n",
    "    :params: N, 超参数,设置取TopN推荐物品数目\n",
    "    :return: GetRecommendation,推荐接口函数\n",
    "    '''\n",
    "    items = {}\n",
    "    for user in train:\n",
    "        for item in train[user]:\n",
    "            items[item] = 1\n",
    "    \n",
    "    def GetRecommendation(user):\n",
    "        # 随机推荐N个未见过的\n",
    "        user_items = set(train[user])\n",
    "        rec_items = {k: items[k] for k in items if k not in user_items}\n",
    "        rec_items = list(rec_items.items())\n",
    "        random.shuffle(rec_items)\n",
    "        return rec_items[:N]\n",
    "    \n",
    "    return GetRecommendation"
   ]
  },
  {
   
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
   },
   "outputs": [],
   "source": [
    "# 2. 热门推荐\n",
    "def MostPopular(train, K, N):\n",
    "    '''\n",
    "    :params: train, 训练数据集\n",
    "    :params: K, 可忽略\n",
    "    :params: N, 超参数,设置取TopN推荐物品数目\n",
    "    :return: GetRecommendation, 推荐接口函数\n",
    "    '''\n",
    "    items = {}\n",
    "    for user in train:\n",
    "        for item in train[user]:\n",
    "            if item not in items:\n",
    "                items[item] = 0\n",
    "            items[item] += 1\n",
    "        \n",
    "    def GetRecommendation(user):\n",
    "        # 随机推荐N个没见过的最热门的\n",
    "        user_items = set(train[user])\n",
    "        rec_items = {k: items[k] for k in items if k not in user_items}\n",
    "        rec_items = list(sorted(rec_items.items(), key=lambda x: x[1], reverse=True))\n",
    "        return rec_items[:N]\n",
    "    \n",
    "    return GetRecommendation"
   ]
  },
  {
   
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
   },
   "outputs": [],
   "source": [
    "# 3. 基于用户余弦相似度的推荐\n",
    "def UserCF(train, K, N):\n",
    "    '''\n",
    "    :params: train, 训练数据集\n",
    "    :params: K, 超参数,设置取TopK相似用户数目\n",
    "    :params: N, 超参数,设置取TopN推荐物品数目\n",
    "    :return: GetRecommendation, 推荐接口函数\n",
    "    '''\n",
    "    # 计算item->user的倒排索引\n",
    "    item_users = {}\n",
    "    for user in train:\n",
    "        for item in train[user]:\n",
    "            if item not in item_users:\n",
    "                item_users[item] = []\n",
    "            item_users[item].append(user)\n",
    "    \n",
    "    # 计算用户相似度矩阵\n",
    "    sim = {}\n",
    "    num = {}\n",
    "    for item in item_users:\n",
    "        users = item_users[item]\n",
    "        for i in range(len(users)):\n",
    "            u = users[i]\n",
    "            if u not in num:\n",
    "                num[u] = 0\n",
    "            num[u] += 1\n",
    "            if u not in sim:\n",
    "                sim[u] = {}\n",
    "            for j in range(len(users)):\n",
    "                if j == i: continue\n",
    "                v = users[j]\n",
    "                if v not in sim[u]:\n",
    "                    sim[u][v] = 0\n",
    "                sim[u][v] += 1\n",
    "    for u in sim:\n",
    "        for v in sim[u]:\n",
    "            sim[u][v] /= math.sqrt(num[u] * num[v])\n",
    "    \n",
    "    # 按照相似度排序\n",
    "    sorted_user_sim = {k: list(sorted(v.items(), \\\n",
    "                               key=lambda x: x[1], reverse=True)) \\\n",
    "                       for k, v in sim.items()}\n",
    "    \n",
    "    # 获取接口函数\n",
    "    def GetRecommendation(user):\n",
    "        items = {}\n",
    "        seen_items = set(train[user])\n",
    "        for u, _ in sorted_user_sim[user][:K]:\n",
    "            for item in train[u]:\n",
    "                # 要去掉用户见过的\n",
    "                if item not in seen_items:\n",
    "                    if item not in items:\n",
    "                        items[item] = 0\n",
    "                    items[item] += sim[user][u]\n",
    "        recs = list(sorted(items.items(), key=lambda x: x[1], reverse=True))[:N]\n",
    "        return recs\n",
    "    \n",
    "    return GetRecommendation"
   ]
  },
  {
   
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
   },
   "outputs": [],
   "source": [
    "# 4. 基于改进的用户余弦相似度的推荐\n",
    "def UserIIF(train, K, N):\n",
    "    '''\n",
    "    :params: train, 训练数据集\n",
    "    :params: K, 超参数,设置取TopK相似用户数目\n",
    "    :params: N, 超参数,设置取TopN推荐物品数目\n",
    "    :return: GetRecommendation, 推荐接口函数\n",
    "    '''\n",
    "    # 计算item->user的倒排索引\n",
    "    item_users = {}\n",
    "    for user in train:\n",
    "        for item in train[user]:\n",
    "            if item not in item_users:\n",
    "                item_users[item] = []\n",
    "            item_users[item].append(user)\n",
    "    \n",
    "    # 计算用户相似度矩阵\n",
    "    sim = {}\n",
    "    num = {}\n",
    "    for item in item_users:\n",
    "        users = item_users[item]\n",
    "        for i in range(len(users)):\n",
    "            u = users[i]\n",
    "            if u not in num:\n",
    "                num[u] = 0\n",
    "            num[u] += 1\n",
    "            if u not in sim:\n",
    "                sim[u] = {}\n",
    "            for j in range(len(users)):\n",
    "                if j == i: continue\n",
    "                v = users[j]\n",
    "                if v not in sim[u]:\n",
    "                    sim[u][v] = 0\n",
    "                # 相比UserCF,主要是改进了这里\n",
    "                sim[u][v] += 1 / math.log(1 + len(users))\n",
    "    for u in sim:\n",
    "        for v in sim[u]:\n",
    "            sim[u][v] /&#

http://www.kler.cn/a/512569.html

相关文章:

  • 逆波兰表达式求值(力扣150)
  • Ubuntu cuda-cudnn中断安装如何卸载
  • 【云岚到家】-day03-门户缓存实现实战
  • oneplus3t-lineageos-16.1编译-android9,
  • VLAN基础理论
  • 【系统分享01】Python+Vue电影推荐系统
  • docker pull error with proxy
  • 【Linux】常见指令(三)
  • YOLOv8改进,YOLOv8检测头融合DiverseBranchBlock,并添加小目标检测层(四头检测),适合目标检测、分割等
  • 手机怎么远程操控电脑?
  • 算法-求字符串公共前缀
  • Docker 部署 mysql
  • Java设计模式—观察者模式
  • Python实现PDF文档转图片功能
  • c++ 给定欧氏平面中的一组线可以形成的三角形的数量
  • 嵌入式Linux驱动开发之pinctrl和gpio子系统
  • 《Vue3 七》Vue 中的动画
  • 【语言处理和机器学习】概述篇(基础小白入门篇)
  • 蒙操作系统(HarmonyOS)
  • 具身智能新突破!Physical Intelligence推出机器人动作tokenizer,训练提速5倍
  • 高级java每日一道面试题-2025年01月20日-数据库篇-并发事务带来哪些问题?
  • JeecgBoot 低代码 AI 大模型集成 DeepSeek
  • 【云岚到家】-day03-门户缓存实现实战
  • 服务器日志自动上传到阿里云OSS备份
  • 【网络协议】【http】【https】RSA+AES-TLS1.2
  • Unity3D学习笔记(一)