当前位置: 首页 > article >正文

一个基于用户行为日志的资源推荐系统落地

1 环境

python3.12.0
mongoDB4.2.1
redis5.0.7

在这里插入图片描述

技术方案
MongoDB + Redis 组合

使用 MongoDB 作为主存储,存储完整的推荐结果。

使用 Redis 作为缓存,存储热门用户的推荐结果,以提高实时查询性能。

MongoDB + MySQL 组合

使用 MongoDB 存储推荐结果,利用其灵活性和高性能。

使用 MySQL 存储用户画像、物品属性等结构化数据,支持复杂查询。

2 准备

训练数据CSV:用户行为日志,资源维表,用户基础信息表

-- user_log.csv
user_id,resource_id,behavior_type,timestamp
user1,res1,click,2023-10-01 10:00:00
user1,res2,browse,2023-10-01 10:05:00
user1,res3,like,2023-10-01 10:10:00
user1,res4,forward,2023-10-01 10:15:00
user1,res5,purchase,2023-10-01 10:20:00
user2,res1,click,2023-10-01 11:00:00
user2,res3,browse,2023-10-01 11:05:00
user2,res5,like,2023-10-01 11:10:00
user2,res7,forward,2023-10-01 11:15:00
user2,res9,purchase,2023-10-01 11:20:00
user3,res2,click,2023-10-01 12:00:00
user3,res4,browse,2023-10-01 12:05:00
user3,res6,like,2023-10-01 12:10:00
user3,res8,forward,2023-10-01 12:15:00
user3,res10,purchase,2023-10-01 12:20:00
user4,res10,purchase,2023-10-01 12:20:00
user5,res10,purchase,2023-10-01 12:20:00
-- user_info.csv
user_id,resource_id,behavior_type,timestamp
user1,res1,click,2023-10-01 10:00:00
user1,res2,browse,2023-10-01 10:05:00
user1,res3,like,2023-10-01 10:10:00
user1,res4,forward,2023-10-01 10:15:00
user1,res5,purchase,2023-10-01 10:20:00
user2,res1,click,2023-10-01 11:00:00
user2,res3,browse,2023-10-01 11:05:00
user2,res5,like,2023-10-01 11:10:00
user2,res7,forward,2023-10-01 11:15:00
user2,res9,purchase,2023-10-01 11:20:00
user3,res2,click,2023-10-01 12:00:00
user3,res4,browse,2023-10-01 12:05:00
user3,res6,like,2023-10-01 12:10:00
user3,res8,forward,2023-10-01 12:15:00
user3,res10,purchase,2023-10-01 12:20:00
user4,res10,purchase,2023-10-01 12:20:00
user5,res10,purchase,2023-10-01 12:20:00
-- resource_info.csv 
resource_id,resource_name,type,tags
res1,Python Basics,programming,beginner
res2,Advanced Python,programming,advanced
res3,Machine Learning,AI,data-science
res4,Deep Learning,AI,advanced
res5,Web Development,programming,beginner
res6,Data Analysis,data-science,beginner
res7,Statistics,data-science,advanced
res8,Big Data,data-science,advanced
res9,Cloud Computing,cloud,beginner
res10,DevOps,cloud,advanced

3 代码

这段代码实现了一个基于用户行为日志的资源推荐系统。它通过分析用户的行为日志(如点击、浏览、点赞等)来计算用户的兴趣得分,并基于资源的标签信息计算资源之间的相似性。最终,系统会为每个用户生成个性化的资源推荐,并将推荐结果缓存到 Redis 中,同时将推荐结果同步到 MongoDB 数据库中。以下是代码的详细解读:

1. 导入依赖库
pandas 和 numpy:用于数据处理和数值计算。

sklearn.feature_extraction.text.TfidfVectorizer:用于将文本标签转换为 TF-IDF 向量。

sklearn.metrics.pairwise.cosine_similarity:用于计算资源之间的余弦相似度。

redis:用于缓存用户推荐结果和资源相似性。

pymongo:用于将推荐结果存储到 MongoDB 中。

logging:用于记录日志信息。

concurrent.futures.ThreadPoolExecutor:用于多线程并发处理用户推荐生成任务。

threading.Lock:用于线程同步,确保 Redis 缓存操作的安全性。

2. 配置日志
使用 logging.basicConfig 配置日志级别和格式,记录程序运行时的信息、警告和错误。

3. Redis 和 MongoDB 连接配置
REDIS_CONFIG:配置 Redis 的连接信息(主机、端口、数据库编号)。

MONGO_CONFIG:配置 MongoDB 的连接信息(主机、端口、用户名、密码、数据库名称、集合名称等)。

4. 数据加载
load_data() 函数从 CSV 文件中加载用户信息、用户行为日志和资源信息数据。如果加载失败,会记录错误并返回 None5. 计算用户兴趣得分
calculate_user_interest_scores() 函数根据用户行为日志计算每个用户对每个资源的兴趣得分。不同的行为类型(如点击、浏览、点赞等)有不同的权重,最终生成一个用户兴趣得分的字典。

6. 计算资源相似性并缓存到 Redis
calculate_and_cache_resource_similarity() 函数使用 TF-IDF 向量化资源标签,并计算资源之间的余弦相似度。相似度计算结果会分批缓存到 Redis 中,缓存的有效期为 24 小时。

7. 生成用户推荐并缓存到 Redis
generate_and_cache_user_recommendations() 函数根据用户的兴趣得分和资源的相似性,生成每个用户的个性化推荐结果,并将推荐结果缓存到 Redis 中,缓存的有效期为 1 小时。

8. 从 Redis 读取推荐结果并写入 MongoDB
read_from_redis_and_write_to_mongodb() 函数从 Redis 中读取所有用户的推荐结果,并将这些结果写入 MongoDB 集合中。如果 MongoDB 中已存在该用户的推荐结果,则更新文档;否则插入新文档。

9. 同步推荐结果到 MongoDB
sync_recommendations_to_mongodb() 函数负责连接 MongoDB,并调用 read_from_redis_and_write_to_mongodb() 函数将 Redis 中的推荐结果同步到 MongoDB 中。

10. 主函数
main() 函数是程序的入口,依次执行以下步骤:

加载数据。

计算用户兴趣得分。

连接 Redis。

计算并缓存资源相似性。

使用多线程生成并缓存用户推荐结果。

打印每个用户的推荐结果。

将推荐结果同步到 MongoDB。

手动触发垃圾回收,释放内存。

11. 错误处理
代码中使用了大量的 try-except 块来捕获和处理异常,确保程序在出现错误时不会崩溃,并且能够记录详细的错误信息。

12. 多线程处理
使用 ThreadPoolExecutor 并发处理用户推荐生成任务,提高程序的执行效率。

13. 内存管理
在适当的地方手动触发垃圾回收(gc.collect()),以释放不再使用的内存,避免内存泄漏。

14. 时间处理
使用 pytz 库将 UTC 时间转换为北京时间,并记录推荐结果的时间戳。

15. 主程序入口
if __name__ == "__main__": 确保 main() 函数只在直接运行脚本时执行。

总结
这段代码实现了一个完整的推荐系统,涵盖了数据加载、用户兴趣计算、资源相似性计算、推荐生成、缓存管理和数据同步等多个环节。通过 Redis 缓存和 MongoDB 存储,系统能够高效地处理大规模数据,并为用户提供个性化的资源推荐。
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from collections import defaultdict
import redis
import logging
import traceback
from concurrent.futures import ThreadPoolExecutor
import pymongo
import gc
import threading
from datetime import datetime
import pytz
lock = threading.Lock()


# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


# Redis 连接配置
REDIS_CONFIG = {
    'host': 'localhost',
    'port': 6379,
    'db': 47
}

# MongoDB 连接配置
MONGO_CONFIG = {
    'host': 'localhost',  # MongoDB 服务器地址
    'port': 27017,  # MongoDB 服务器端口
    'user': 'admin',
    'password': 'admin240628',
    'database': 'recommendation_db',  # MongoDB 数据库名称
    'collection': 'user_recommendations_test',  # MongoDB 集合名称
    'connect_timeout': 10,  # 连接超时时间(秒)
    'ssl': False  # 启用或禁用 SSL
}


# 加载数据
def load_data():
    try:
        user_info = pd.read_csv('user_info.csv')
        user_log = pd.read_csv('user_log.csv')
        resource_info = pd.read_csv('resource_info.csv', encoding='utf-8', quotechar='"')
        # 确保 resource_id 列是字符串类型
        resource_info['resource_id'] = resource_info['resource_id'].astype(str)
        logging.info("数据加载成功!")
        return user_info, user_log, resource_info
    except Exception as e:
        logging.error(f"加载数据时发生错误: {e}")
        traceback.print_exc()
        return None, None, None


# 计算用户兴趣得分
def calculate_user_interest_scores(user_log):
    try:
        behavior_weights = {
            "click": 1.0,
            "browse": 0.5,
            "like": 0.8,
            "forward": 0.6,
            "purchase": 1.2
        }

        user_interest_scores = defaultdict(lambda: defaultdict(float))
        for _, row in user_log.iterrows():
            user_id = row.get('user_id')
            resource_id = row.get('resource_id')
            behavior_type = row.get('behavior_type')

            if user_id is None or resource_id is None or behavior_type is None:
                logging.warning(f"行数据不完整: {row}")
                continue

            if behavior_type in behavior_weights:
                user_interest_scores[user_id][resource_id] += behavior_weights[behavior_type]
        logging.info("用户兴趣得分计算完成!")
        return user_interest_scores
    except Exception as e:
        logging.error(f"计算用户兴趣得分时发生错误: {e}")
        traceback.print_exc()
        return None


# 计算资源相似性并缓存到 Redis
def calculate_and_cache_resource_similarity(resource_info, redis_client):
    try:
        # 检查数据
        resource_info['tags'] = resource_info['tags'].fillna('')
        if resource_info['tags'].empty:
            logging.error("tags 列为空,无法计算相似性")
            return

        # 计算资源相似性
        tfidf = TfidfVectorizer()
        resource_tags = resource_info['tags']
        tfidf_matrix = tfidf.fit_transform(resource_tags)
        logging.info(f"TF-IDF 矩阵形状: {tfidf_matrix.shape}")

        # 分批计算相似性
        batch_size = 1000
        for i in range(0, len(resource_info), batch_size):
            batch = resource_info[i:i + batch_size]
            batch_matrix = tfidf_matrix[i:i + batch_size]
            batch_similarity = cosine_similarity(batch_matrix, tfidf_matrix)

            # 缓存到 Redis
            for j, row in batch.iterrows():
                resource_id = row['resource_id']
                similar_resources = {}
                for k, similar_resource_id in enumerate(resource_info['resource_id']):
                    if resource_id!= similar_resource_id:
                        similarity_score = batch_similarity[j - i][k]
                        similar_resources[str(similar_resource_id)] = float(similarity_score)
                # 缓存资源相似性到 Redis
                cache_resource_similarity(redis_client, resource_id, similar_resources)

        logging.info("资源相似性计算完成并缓存到 Redis")
    except Exception as e:
        logging.error(f"计算资源相似性时发生错误: {e}")
        traceback.print_exc()


# 缓存资源相似性到 Redis
def cache_resource_similarity(redis_client, resource_id, similar_resources):
    key = f"resource_similarity:{resource_id}"
    if similar_resources:  # 检查 similar_resources 是否不为空
        redis_client.hset(key, mapping=similar_resources)  # 使用 hset 替换 hmset
        redis_client.expire(key, 86400)  # 设置 TTL 为 24 小时


# 获取缓存的资源相似性
def get_cached_resource_similarity(redis_client, resource_id):
    key = f"resource_similarity:{resource_id}"
    return redis_client.hgetall(key)


# 生成用户推荐并缓存到 Redis
def generate_and_cache_user_recommendations(user_id, user_interest_scores, top_n=10, redis_client=None):
    try:
        # 检查是否已缓存
        if redis_client.exists(f"user_recommendations:{user_id}"):
            logging.info(f"用户 {user_id} 的推荐结果已缓存,跳过生成")
            return

        # 生成推荐结果
        recommendations = defaultdict(float)
        if user_id not in user_interest_scores:
            logging.warning(f"用户 {user_id} 的兴趣得分为空")
            return

        for resource_id, score in user_interest_scores[user_id].items():
            similar_resources = get_cached_resource_similarity(redis_client, resource_id)
            if not similar_resources:
                logging.warning(f"资源 {resource_id} 的相似性为空")
                continue

            for similar_resource_id, similarity_score in similar_resources.items():
                similar_resource_id = similar_resource_id.decode('utf-8')  # 解码为字符串
                recommendations[similar_resource_id] += score * float(similarity_score)

        # 缓存到 Redis
        sorted_recommendations = sorted(recommendations.items(), key=lambda x: x[1], reverse=True)[:top_n]
        cache_user_recommendations(redis_client, user_id, {str(resource_id): float(score) for resource_id, score in sorted_recommendations})
        logging.info(f"用户 {user_id} 的推荐结果生成完成并缓存到 Redis")
    except Exception as e:
        logging.error(f"生成用户推荐时发生错误: {e}")
        traceback.print_exc()

# 缓存用户推荐结果到 Redis
def cache_user_recommendations(redis_client, user_id, recommendations):
    key = f"user_recommendations:{user_id}"
    if recommendations:  # 检查 recommendations 是否不为空
        with lock:  # 加锁
            redis_client.hset(key, mapping=recommendations)  # 使用 hset 替换 hmset
            redis_client.expire(key, 3600)  # 设置 TTL 为 1 小时

# 获取缓存的用户推荐结果
def get_cached_user_recommendations(redis_client, user_id):
    key = f"user_recommendations:{user_id}"
    recommendations = redis_client.hgetall(key)
    # 确保资源 ID 解码为字符串类型
    return {k.decode('utf-8'): float(v) for k, v in recommendations.items()}
def read_from_redis_and_write_to_mongodb(redis_client, collection):
    # 获取所有用户推荐结果的键
    user_recommendation_keys = redis_client.keys("user_recommendations:*")
    logging.info(f"找到 {len(user_recommendation_keys)} 个用户推荐结果")

    if not user_recommendation_keys:
        logging.warning("Redis 中没有用户推荐结果")
        return

    for key in user_recommendation_keys:
        user_id = key.decode('utf-8').split(":")[1]
        recommendations = get_cached_user_recommendations(redis_client, user_id)
        if not recommendations:
            logging.warning(f"用户 {user_id} 的推荐结果为空")
            continue

        # 格式化推荐结果
        formatted_recommendations = [
            {"item_id": resource_id, "score": score}
            for resource_id, score in recommendations.items()
        ]

        # 将 UTC 时间转换为北京时间
        beijing_timezone = pytz.timezone('Asia/Shanghai')
        beijing_timestamp = datetime.now(pytz.utc).astimezone(beijing_timezone).isoformat()

        # 构建 MongoDB 文档
        document = {
            "user_id": user_id,
            "recommendations": formatted_recommendations,
            "timestamp": beijing_timestamp
        }

        # 插入或更新 MongoDB 集合中的数据
        existing_document = collection.find_one({'user_id': user_id})
        if existing_document:
            # 更新已存在的文档
            collection.update_one(
                {'_id': existing_document['_id']},
                {'$set': document}
            )
        else:
            # 插入新文档
            collection.insert_one(document)
def sync_recommendations_to_mongodb(redis_client, mongo_config):
    # 连接 MongoDB
    try:
        logging.info("正在尝试连接 MongoDB...")
        mongo_client = pymongo.MongoClient(
            host=mongo_config['host'],
            port=mongo_config['port'],
            username=mongo_config.get('user'),
            password=mongo_config.get('password'),
            authSource=mongo_config.get('authSource', 'admin'),
            connectTimeoutMS=mongo_config.get('connect_timeout', 5000),
            ssl=mongo_config.get('ssl', False)
        )
        logging.info("成功连接到 MongoDB")
        db = mongo_client[mongo_config['database']]
        collection = db[mongo_config['collection']]

        # 从 Redis 读取数据并写入 MongoDB
        read_from_redis_and_write_to_mongodb(redis_client, collection)
        logging.info("用户推荐结果已同步到 MongoDB")
    except pymongo.errors.PyMongoError as e:
        logging.error(f"MongoDB 连接或操作错误: {e}")
        traceback.print_exc()
    except Exception as e:
        logging.error(f"同步用户推荐结果到 MongoDB 时发生错误: {e}")
        traceback.print_exc()
# 主函数
def main():
    try:
        # 加载数据
        user_info, user_log, resource_info = load_data()
        if user_info is None or user_log is None or resource_info is None:
            logging.error("数据加载失败,程序退出")
            return

        # 计算用户兴趣得分
        user_interest_scores = calculate_user_interest_scores(user_log)
        if user_interest_scores is None:
            logging.error("用户兴趣得分计算失败,程序退出")
            return

        # 连接 Redis
        redis_client = redis.StrictRedis(host=REDIS_CONFIG['host'], port=REDIS_CONFIG['port'], db=REDIS_CONFIG['db'])
        logging.info("连接 Redis 完成!")

        # 计算并缓存资源相似性
        calculate_and_cache_resource_similarity(resource_info, redis_client)

        # 生成并缓存用户推荐
        user_ids = user_info['user_id'].unique()
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(generate_and_cache_user_recommendations, user_id, user_interest_scores, 10, redis_client) for user_id in user_ids]
            for future in futures:
                future.result()

        # 获取并打印用户推荐
        for user_id in user_ids:
            recommendations = get_cached_user_recommendations(redis_client, user_id)
            print(f"为用户 {user_id} 推荐资源:")
            sorted_data = sorted(recommendations.items(), key=lambda x: float(x[1]), reverse=True)[:10]
            for i, (resource_id, score) in enumerate(sorted_data, 1):
                try:
                    # 查找匹配的资源信息
                    matching_resources = resource_info[resource_info['resource_id'] == resource_id]
                    if not matching_resources.empty:
                        resource_name = matching_resources["resource_name"].values[0]
                        print(f"{i}. 资源:{resource_id} : {resource_name}, 推荐得分: {score:.2f}")
                    else:
                        # 记录更详细的错误信息
                        logging.warning(f"未找到资源 {resource_id} 的名称(类型: {type(resource_id)}),请检查数据完整性")
                except Exception as e:
                    logging.error(f"获取资源 {resource_id} 的名称时发生错误: {e}")

        # 同步推荐结果到 MySQL
        sync_recommendations_to_mongodb(redis_client, MONGO_CONFIG)
        # 在适当的地方手动触发垃圾回收
        del user_info, user_log, resource_info
        gc.collect()
    except Exception as e:
        logging.error(f"主函数运行时发生错误: {e}")
        logging.error(traceback.format_exc())
    except MemoryError:
        logging.error("内存不足,请减少数据规模或增加系统内存")
    except SystemError:
        logging.error("系统错误,可能是 Python 环境或操作系统问题")


if __name__ == "__main__":
    main()

4 推荐结果

为用户 user1 推荐资源:
1. 资源:res6 : Data Analysis, 推荐得分: 2.20
2. 资源:res9 : Cloud Computing, 推荐得分: 2.20
3. 资源:res1 : Python Basics, 推荐得分: 1.20
4. 资源:res7 : Statistics, 推荐得分: 1.10
5. 资源:res8 : Big Data, 推荐得分: 1.10
6. 资源:res10 : DevOps, 推荐得分: 1.10
7. 资源:res5 : Web Development, 推荐得分: 1.00
8. 资源:res2 : Advanced Python, 推荐得分: 0.60
9. 资源:res4 : Deep Learning, 推荐得分: 0.50
10. 资源:res3 : Machine Learning, 推荐得分: 0.00
为用户 user2 推荐资源:
1. 资源:res6 : Data Analysis, 推荐得分: 3.00
2. 资源:res5 : Web Development, 推荐得分: 2.20
3. 资源:res1 : Python Basics, 推荐得分: 2.00
4. 资源:res9 : Cloud Computing, 推荐得分: 1.80
5. 资源:res2 : Advanced Python, 推荐得分: 0.60
6. 资源:res4 : Deep Learning, 推荐得分: 0.60
7. 资源:res8 : Big Data, 推荐得分: 0.60
8. 资源:res10 : DevOps, 推荐得分: 0.60
9. 资源:res3 : Machine Learning, 推荐得分: 0.00
10. 资源:res7 : Statistics, 推荐得分: 0.00

结果数据持久化到mongoDB:

格式化推荐结果:

使用列表推导式将 Redis 中的推荐结果转换为 recommendations 列表,每个元素包含 item_id 和 score。

添加时间戳:

使用 datetime.utcnow().isoformat() 生成当前时间的 ISO 8601 格式字符串,并添加 Z 表示 UTC 时间。

构建 MongoDB 文档:

将 user_id、recommendations 列表和 timestamp 组合成一个文档。

插入或更新 MongoDB:

如果 MongoDB 中已存在该用户的推荐结果,则更新文档;否则插入新文档。
{
    "_id" : ObjectId("677dd464bfec5987d02ade8d"),
    "user_id" : "user2",
    "recommendations" : [ 
        {
            "item_id" : "res6",
            "score" : 3.0
        }, 
        {
            "item_id" : "res5",
            "score" : 2.2
        }, 
        {
            "item_id" : "res1",
            "score" : 2.0
        }, 
        {
            "item_id" : "res9",
            "score" : 1.8
        }, 
        {
            "item_id" : "res2",
            "score" : 0.6
        }, 
        {
            "item_id" : "res4",
            "score" : 0.6
        }, 
        {
            "item_id" : "res8",
            "score" : 0.6
        }, 
        {
            "item_id" : "res10",
            "score" : 0.6
        }, 
        {
            "item_id" : "res3",
            "score" : 0.0
        }, 
        {
            "item_id" : "res7",
            "score" : 0.0
        }
    ],
    "timestamp" : "2025-01-08T09:30:41.363509+08:00"
}

http://www.kler.cn/a/499944.html

相关文章:

  • 网易云音乐登录两部手机:IP属地归属何方?
  • 数据结构大作业——家谱管理系统(超详细!完整代码!)
  • python类和对象
  • [石榴翻译] 维吾尔语音识别 + TTS语音合成
  • 《拉依达的嵌入式\驱动面试宝典》—操作系统篇(八)
  • 3D机器视觉的类型、应用和未来趋势
  • vue.js+websocket+mongodb实现纯粹的聊天室项目
  • React面试合集
  • 牛客网刷题 ——C语言初阶(6指针)——BC106 上三角矩阵判定
  • 天气app的收获
  • 频域自适应空洞卷积FADC详解
  • Spring Boot 支持哪些日志框架
  • vue实现淘宝web端,装饰淘宝店铺APP,以及后端设计成能快速响应前端APP
  • # LeetCode 3270. 求出数字答案 —— Python 解题思路与实现
  • Spring Boot + Jasypt 实现application.yml 属性加密的快速示例
  • 【25考研】川大计算机复试情况,重点是啥?怎么准备?
  • 重新面试之JVM
  • 不同方式获取音频时长 - python 实现
  • Selenium python爬虫 是否需要设置浏览器窗口大小 ,有些按钮显示 不全会导致无法正常与这些元素进行交互
  • webpack03
  • 数据结构初阶---排序
  • 【机器学习篇】 科技异次元的超强 “魔杖”,开启奇幻新程
  • 金融项目实战 01|功能测试分析与设计
  • c++ 预备