一个基于用户行为日志的资源推荐系统落地
1 环境
python | 3.12.0 |
---|---|
mongoDB | 4.2.1 |
redis | 5.0.7 |
技术方案
MongoDB + Redis 组合
使用 MongoDB 作为主存储,存储完整的推荐结果。
使用 Redis 作为缓存,存储热门用户的推荐结果,以提高实时查询性能。
MongoDB + MySQL 组合
使用 MongoDB 存储推荐结果,利用其灵活性和高性能。
使用 MySQL 存储用户画像、物品属性等结构化数据,支持复杂查询。
2 准备
训练数据CSV:用户行为日志,资源维表,用户基础信息表
-- user_log.csv
user_id,resource_id,behavior_type,timestamp
user1,res1,click,2023-10-01 10:00:00
user1,res2,browse,2023-10-01 10:05:00
user1,res3,like,2023-10-01 10:10:00
user1,res4,forward,2023-10-01 10:15:00
user1,res5,purchase,2023-10-01 10:20:00
user2,res1,click,2023-10-01 11:00:00
user2,res3,browse,2023-10-01 11:05:00
user2,res5,like,2023-10-01 11:10:00
user2,res7,forward,2023-10-01 11:15:00
user2,res9,purchase,2023-10-01 11:20:00
user3,res2,click,2023-10-01 12:00:00
user3,res4,browse,2023-10-01 12:05:00
user3,res6,like,2023-10-01 12:10:00
user3,res8,forward,2023-10-01 12:15:00
user3,res10,purchase,2023-10-01 12:20:00
user4,res10,purchase,2023-10-01 12:20:00
user5,res10,purchase,2023-10-01 12:20:00
-- user_info.csv
user_id,resource_id,behavior_type,timestamp
user1,res1,click,2023-10-01 10:00:00
user1,res2,browse,2023-10-01 10:05:00
user1,res3,like,2023-10-01 10:10:00
user1,res4,forward,2023-10-01 10:15:00
user1,res5,purchase,2023-10-01 10:20:00
user2,res1,click,2023-10-01 11:00:00
user2,res3,browse,2023-10-01 11:05:00
user2,res5,like,2023-10-01 11:10:00
user2,res7,forward,2023-10-01 11:15:00
user2,res9,purchase,2023-10-01 11:20:00
user3,res2,click,2023-10-01 12:00:00
user3,res4,browse,2023-10-01 12:05:00
user3,res6,like,2023-10-01 12:10:00
user3,res8,forward,2023-10-01 12:15:00
user3,res10,purchase,2023-10-01 12:20:00
user4,res10,purchase,2023-10-01 12:20:00
user5,res10,purchase,2023-10-01 12:20:00
-- resource_info.csv
resource_id,resource_name,type,tags
res1,Python Basics,programming,beginner
res2,Advanced Python,programming,advanced
res3,Machine Learning,AI,data-science
res4,Deep Learning,AI,advanced
res5,Web Development,programming,beginner
res6,Data Analysis,data-science,beginner
res7,Statistics,data-science,advanced
res8,Big Data,data-science,advanced
res9,Cloud Computing,cloud,beginner
res10,DevOps,cloud,advanced
3 代码
这段代码实现了一个基于用户行为日志的资源推荐系统。它通过分析用户的行为日志(如点击、浏览、点赞等)来计算用户的兴趣得分,并基于资源的标签信息计算资源之间的相似性。最终,系统会为每个用户生成个性化的资源推荐,并将推荐结果缓存到 Redis 中,同时将推荐结果同步到 MongoDB 数据库中。以下是代码的详细解读:
1. 导入依赖库
pandas 和 numpy:用于数据处理和数值计算。
sklearn.feature_extraction.text.TfidfVectorizer:用于将文本标签转换为 TF-IDF 向量。
sklearn.metrics.pairwise.cosine_similarity:用于计算资源之间的余弦相似度。
redis:用于缓存用户推荐结果和资源相似性。
pymongo:用于将推荐结果存储到 MongoDB 中。
logging:用于记录日志信息。
concurrent.futures.ThreadPoolExecutor:用于多线程并发处理用户推荐生成任务。
threading.Lock:用于线程同步,确保 Redis 缓存操作的安全性。
2. 配置日志
使用 logging.basicConfig 配置日志级别和格式,记录程序运行时的信息、警告和错误。
3. Redis 和 MongoDB 连接配置
REDIS_CONFIG:配置 Redis 的连接信息(主机、端口、数据库编号)。
MONGO_CONFIG:配置 MongoDB 的连接信息(主机、端口、用户名、密码、数据库名称、集合名称等)。
4. 数据加载
load_data() 函数从 CSV 文件中加载用户信息、用户行为日志和资源信息数据。如果加载失败,会记录错误并返回 None。
5. 计算用户兴趣得分
calculate_user_interest_scores() 函数根据用户行为日志计算每个用户对每个资源的兴趣得分。不同的行为类型(如点击、浏览、点赞等)有不同的权重,最终生成一个用户兴趣得分的字典。
6. 计算资源相似性并缓存到 Redis
calculate_and_cache_resource_similarity() 函数使用 TF-IDF 向量化资源标签,并计算资源之间的余弦相似度。相似度计算结果会分批缓存到 Redis 中,缓存的有效期为 24 小时。
7. 生成用户推荐并缓存到 Redis
generate_and_cache_user_recommendations() 函数根据用户的兴趣得分和资源的相似性,生成每个用户的个性化推荐结果,并将推荐结果缓存到 Redis 中,缓存的有效期为 1 小时。
8. 从 Redis 读取推荐结果并写入 MongoDB
read_from_redis_and_write_to_mongodb() 函数从 Redis 中读取所有用户的推荐结果,并将这些结果写入 MongoDB 集合中。如果 MongoDB 中已存在该用户的推荐结果,则更新文档;否则插入新文档。
9. 同步推荐结果到 MongoDB
sync_recommendations_to_mongodb() 函数负责连接 MongoDB,并调用 read_from_redis_and_write_to_mongodb() 函数将 Redis 中的推荐结果同步到 MongoDB 中。
10. 主函数
main() 函数是程序的入口,依次执行以下步骤:
加载数据。
计算用户兴趣得分。
连接 Redis。
计算并缓存资源相似性。
使用多线程生成并缓存用户推荐结果。
打印每个用户的推荐结果。
将推荐结果同步到 MongoDB。
手动触发垃圾回收,释放内存。
11. 错误处理
代码中使用了大量的 try-except 块来捕获和处理异常,确保程序在出现错误时不会崩溃,并且能够记录详细的错误信息。
12. 多线程处理
使用 ThreadPoolExecutor 并发处理用户推荐生成任务,提高程序的执行效率。
13. 内存管理
在适当的地方手动触发垃圾回收(gc.collect()),以释放不再使用的内存,避免内存泄漏。
14. 时间处理
使用 pytz 库将 UTC 时间转换为北京时间,并记录推荐结果的时间戳。
15. 主程序入口
if __name__ == "__main__": 确保 main() 函数只在直接运行脚本时执行。
总结
这段代码实现了一个完整的推荐系统,涵盖了数据加载、用户兴趣计算、资源相似性计算、推荐生成、缓存管理和数据同步等多个环节。通过 Redis 缓存和 MongoDB 存储,系统能够高效地处理大规模数据,并为用户提供个性化的资源推荐。
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from collections import defaultdict
import redis
import logging
import traceback
from concurrent.futures import ThreadPoolExecutor
import pymongo
import gc
import threading
from datetime import datetime
import pytz
lock = threading.Lock()
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Redis 连接配置
REDIS_CONFIG = {
'host': 'localhost',
'port': 6379,
'db': 47
}
# MongoDB 连接配置
MONGO_CONFIG = {
'host': 'localhost', # MongoDB 服务器地址
'port': 27017, # MongoDB 服务器端口
'user': 'admin',
'password': 'admin240628',
'database': 'recommendation_db', # MongoDB 数据库名称
'collection': 'user_recommendations_test', # MongoDB 集合名称
'connect_timeout': 10, # 连接超时时间(秒)
'ssl': False # 启用或禁用 SSL
}
# 加载数据
def load_data():
try:
user_info = pd.read_csv('user_info.csv')
user_log = pd.read_csv('user_log.csv')
resource_info = pd.read_csv('resource_info.csv', encoding='utf-8', quotechar='"')
# 确保 resource_id 列是字符串类型
resource_info['resource_id'] = resource_info['resource_id'].astype(str)
logging.info("数据加载成功!")
return user_info, user_log, resource_info
except Exception as e:
logging.error(f"加载数据时发生错误: {e}")
traceback.print_exc()
return None, None, None
# 计算用户兴趣得分
def calculate_user_interest_scores(user_log):
try:
behavior_weights = {
"click": 1.0,
"browse": 0.5,
"like": 0.8,
"forward": 0.6,
"purchase": 1.2
}
user_interest_scores = defaultdict(lambda: defaultdict(float))
for _, row in user_log.iterrows():
user_id = row.get('user_id')
resource_id = row.get('resource_id')
behavior_type = row.get('behavior_type')
if user_id is None or resource_id is None or behavior_type is None:
logging.warning(f"行数据不完整: {row}")
continue
if behavior_type in behavior_weights:
user_interest_scores[user_id][resource_id] += behavior_weights[behavior_type]
logging.info("用户兴趣得分计算完成!")
return user_interest_scores
except Exception as e:
logging.error(f"计算用户兴趣得分时发生错误: {e}")
traceback.print_exc()
return None
# 计算资源相似性并缓存到 Redis
def calculate_and_cache_resource_similarity(resource_info, redis_client):
try:
# 检查数据
resource_info['tags'] = resource_info['tags'].fillna('')
if resource_info['tags'].empty:
logging.error("tags 列为空,无法计算相似性")
return
# 计算资源相似性
tfidf = TfidfVectorizer()
resource_tags = resource_info['tags']
tfidf_matrix = tfidf.fit_transform(resource_tags)
logging.info(f"TF-IDF 矩阵形状: {tfidf_matrix.shape}")
# 分批计算相似性
batch_size = 1000
for i in range(0, len(resource_info), batch_size):
batch = resource_info[i:i + batch_size]
batch_matrix = tfidf_matrix[i:i + batch_size]
batch_similarity = cosine_similarity(batch_matrix, tfidf_matrix)
# 缓存到 Redis
for j, row in batch.iterrows():
resource_id = row['resource_id']
similar_resources = {}
for k, similar_resource_id in enumerate(resource_info['resource_id']):
if resource_id!= similar_resource_id:
similarity_score = batch_similarity[j - i][k]
similar_resources[str(similar_resource_id)] = float(similarity_score)
# 缓存资源相似性到 Redis
cache_resource_similarity(redis_client, resource_id, similar_resources)
logging.info("资源相似性计算完成并缓存到 Redis")
except Exception as e:
logging.error(f"计算资源相似性时发生错误: {e}")
traceback.print_exc()
# 缓存资源相似性到 Redis
def cache_resource_similarity(redis_client, resource_id, similar_resources):
key = f"resource_similarity:{resource_id}"
if similar_resources: # 检查 similar_resources 是否不为空
redis_client.hset(key, mapping=similar_resources) # 使用 hset 替换 hmset
redis_client.expire(key, 86400) # 设置 TTL 为 24 小时
# 获取缓存的资源相似性
def get_cached_resource_similarity(redis_client, resource_id):
key = f"resource_similarity:{resource_id}"
return redis_client.hgetall(key)
# 生成用户推荐并缓存到 Redis
def generate_and_cache_user_recommendations(user_id, user_interest_scores, top_n=10, redis_client=None):
try:
# 检查是否已缓存
if redis_client.exists(f"user_recommendations:{user_id}"):
logging.info(f"用户 {user_id} 的推荐结果已缓存,跳过生成")
return
# 生成推荐结果
recommendations = defaultdict(float)
if user_id not in user_interest_scores:
logging.warning(f"用户 {user_id} 的兴趣得分为空")
return
for resource_id, score in user_interest_scores[user_id].items():
similar_resources = get_cached_resource_similarity(redis_client, resource_id)
if not similar_resources:
logging.warning(f"资源 {resource_id} 的相似性为空")
continue
for similar_resource_id, similarity_score in similar_resources.items():
similar_resource_id = similar_resource_id.decode('utf-8') # 解码为字符串
recommendations[similar_resource_id] += score * float(similarity_score)
# 缓存到 Redis
sorted_recommendations = sorted(recommendations.items(), key=lambda x: x[1], reverse=True)[:top_n]
cache_user_recommendations(redis_client, user_id, {str(resource_id): float(score) for resource_id, score in sorted_recommendations})
logging.info(f"用户 {user_id} 的推荐结果生成完成并缓存到 Redis")
except Exception as e:
logging.error(f"生成用户推荐时发生错误: {e}")
traceback.print_exc()
# 缓存用户推荐结果到 Redis
def cache_user_recommendations(redis_client, user_id, recommendations):
key = f"user_recommendations:{user_id}"
if recommendations: # 检查 recommendations 是否不为空
with lock: # 加锁
redis_client.hset(key, mapping=recommendations) # 使用 hset 替换 hmset
redis_client.expire(key, 3600) # 设置 TTL 为 1 小时
# 获取缓存的用户推荐结果
def get_cached_user_recommendations(redis_client, user_id):
key = f"user_recommendations:{user_id}"
recommendations = redis_client.hgetall(key)
# 确保资源 ID 解码为字符串类型
return {k.decode('utf-8'): float(v) for k, v in recommendations.items()}
def read_from_redis_and_write_to_mongodb(redis_client, collection):
# 获取所有用户推荐结果的键
user_recommendation_keys = redis_client.keys("user_recommendations:*")
logging.info(f"找到 {len(user_recommendation_keys)} 个用户推荐结果")
if not user_recommendation_keys:
logging.warning("Redis 中没有用户推荐结果")
return
for key in user_recommendation_keys:
user_id = key.decode('utf-8').split(":")[1]
recommendations = get_cached_user_recommendations(redis_client, user_id)
if not recommendations:
logging.warning(f"用户 {user_id} 的推荐结果为空")
continue
# 格式化推荐结果
formatted_recommendations = [
{"item_id": resource_id, "score": score}
for resource_id, score in recommendations.items()
]
# 将 UTC 时间转换为北京时间
beijing_timezone = pytz.timezone('Asia/Shanghai')
beijing_timestamp = datetime.now(pytz.utc).astimezone(beijing_timezone).isoformat()
# 构建 MongoDB 文档
document = {
"user_id": user_id,
"recommendations": formatted_recommendations,
"timestamp": beijing_timestamp
}
# 插入或更新 MongoDB 集合中的数据
existing_document = collection.find_one({'user_id': user_id})
if existing_document:
# 更新已存在的文档
collection.update_one(
{'_id': existing_document['_id']},
{'$set': document}
)
else:
# 插入新文档
collection.insert_one(document)
def sync_recommendations_to_mongodb(redis_client, mongo_config):
# 连接 MongoDB
try:
logging.info("正在尝试连接 MongoDB...")
mongo_client = pymongo.MongoClient(
host=mongo_config['host'],
port=mongo_config['port'],
username=mongo_config.get('user'),
password=mongo_config.get('password'),
authSource=mongo_config.get('authSource', 'admin'),
connectTimeoutMS=mongo_config.get('connect_timeout', 5000),
ssl=mongo_config.get('ssl', False)
)
logging.info("成功连接到 MongoDB")
db = mongo_client[mongo_config['database']]
collection = db[mongo_config['collection']]
# 从 Redis 读取数据并写入 MongoDB
read_from_redis_and_write_to_mongodb(redis_client, collection)
logging.info("用户推荐结果已同步到 MongoDB")
except pymongo.errors.PyMongoError as e:
logging.error(f"MongoDB 连接或操作错误: {e}")
traceback.print_exc()
except Exception as e:
logging.error(f"同步用户推荐结果到 MongoDB 时发生错误: {e}")
traceback.print_exc()
# 主函数
def main():
try:
# 加载数据
user_info, user_log, resource_info = load_data()
if user_info is None or user_log is None or resource_info is None:
logging.error("数据加载失败,程序退出")
return
# 计算用户兴趣得分
user_interest_scores = calculate_user_interest_scores(user_log)
if user_interest_scores is None:
logging.error("用户兴趣得分计算失败,程序退出")
return
# 连接 Redis
redis_client = redis.StrictRedis(host=REDIS_CONFIG['host'], port=REDIS_CONFIG['port'], db=REDIS_CONFIG['db'])
logging.info("连接 Redis 完成!")
# 计算并缓存资源相似性
calculate_and_cache_resource_similarity(resource_info, redis_client)
# 生成并缓存用户推荐
user_ids = user_info['user_id'].unique()
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(generate_and_cache_user_recommendations, user_id, user_interest_scores, 10, redis_client) for user_id in user_ids]
for future in futures:
future.result()
# 获取并打印用户推荐
for user_id in user_ids:
recommendations = get_cached_user_recommendations(redis_client, user_id)
print(f"为用户 {user_id} 推荐资源:")
sorted_data = sorted(recommendations.items(), key=lambda x: float(x[1]), reverse=True)[:10]
for i, (resource_id, score) in enumerate(sorted_data, 1):
try:
# 查找匹配的资源信息
matching_resources = resource_info[resource_info['resource_id'] == resource_id]
if not matching_resources.empty:
resource_name = matching_resources["resource_name"].values[0]
print(f"{i}. 资源:{resource_id} : {resource_name}, 推荐得分: {score:.2f}")
else:
# 记录更详细的错误信息
logging.warning(f"未找到资源 {resource_id} 的名称(类型: {type(resource_id)}),请检查数据完整性")
except Exception as e:
logging.error(f"获取资源 {resource_id} 的名称时发生错误: {e}")
# 同步推荐结果到 MySQL
sync_recommendations_to_mongodb(redis_client, MONGO_CONFIG)
# 在适当的地方手动触发垃圾回收
del user_info, user_log, resource_info
gc.collect()
except Exception as e:
logging.error(f"主函数运行时发生错误: {e}")
logging.error(traceback.format_exc())
except MemoryError:
logging.error("内存不足,请减少数据规模或增加系统内存")
except SystemError:
logging.error("系统错误,可能是 Python 环境或操作系统问题")
if __name__ == "__main__":
main()
4 推荐结果
为用户 user1 推荐资源:
1. 资源:res6 : Data Analysis, 推荐得分: 2.20
2. 资源:res9 : Cloud Computing, 推荐得分: 2.20
3. 资源:res1 : Python Basics, 推荐得分: 1.20
4. 资源:res7 : Statistics, 推荐得分: 1.10
5. 资源:res8 : Big Data, 推荐得分: 1.10
6. 资源:res10 : DevOps, 推荐得分: 1.10
7. 资源:res5 : Web Development, 推荐得分: 1.00
8. 资源:res2 : Advanced Python, 推荐得分: 0.60
9. 资源:res4 : Deep Learning, 推荐得分: 0.50
10. 资源:res3 : Machine Learning, 推荐得分: 0.00
为用户 user2 推荐资源:
1. 资源:res6 : Data Analysis, 推荐得分: 3.00
2. 资源:res5 : Web Development, 推荐得分: 2.20
3. 资源:res1 : Python Basics, 推荐得分: 2.00
4. 资源:res9 : Cloud Computing, 推荐得分: 1.80
5. 资源:res2 : Advanced Python, 推荐得分: 0.60
6. 资源:res4 : Deep Learning, 推荐得分: 0.60
7. 资源:res8 : Big Data, 推荐得分: 0.60
8. 资源:res10 : DevOps, 推荐得分: 0.60
9. 资源:res3 : Machine Learning, 推荐得分: 0.00
10. 资源:res7 : Statistics, 推荐得分: 0.00
结果数据持久化到mongoDB:
格式化推荐结果:
使用列表推导式将 Redis 中的推荐结果转换为 recommendations 列表,每个元素包含 item_id 和 score。
添加时间戳:
使用 datetime.utcnow().isoformat() 生成当前时间的 ISO 8601 格式字符串,并添加 Z 表示 UTC 时间。
构建 MongoDB 文档:
将 user_id、recommendations 列表和 timestamp 组合成一个文档。
插入或更新 MongoDB:
如果 MongoDB 中已存在该用户的推荐结果,则更新文档;否则插入新文档。
{
"_id" : ObjectId("677dd464bfec5987d02ade8d"),
"user_id" : "user2",
"recommendations" : [
{
"item_id" : "res6",
"score" : 3.0
},
{
"item_id" : "res5",
"score" : 2.2
},
{
"item_id" : "res1",
"score" : 2.0
},
{
"item_id" : "res9",
"score" : 1.8
},
{
"item_id" : "res2",
"score" : 0.6
},
{
"item_id" : "res4",
"score" : 0.6
},
{
"item_id" : "res8",
"score" : 0.6
},
{
"item_id" : "res10",
"score" : 0.6
},
{
"item_id" : "res3",
"score" : 0.0
},
{
"item_id" : "res7",
"score" : 0.0
}
],
"timestamp" : "2025-01-08T09:30:41.363509+08:00"
}