当前位置: 首页 > article >正文

项目准备(flask+pyhon+MachineLearning)- 3

目录

1.商品信息

2. 商品销售预测

2.1 机器学习

2.2 预测功能

3. 模型评估


1.商品信息

@app.route('/products')
def products():
    """商品分析页面"""
    data = load_data()
    
    # 计算当前期间和上期间
    current_period = data[data['成交时间'] >= data['成交时间'].max() - timedelta(days=30)]
    previous_period = data[(data['成交时间'] < data['成交时间'].max() - timedelta(days=30)) & 
                         (data['成交时间'] >= data['成交时间'].max() - timedelta(days=60))]
    
    # 计算商品指标
    current_sales = current_period.groupby('商品ID').apply(
        lambda x: (x['销量'] * x['单价']).sum()
    )
    previous_sales = previous_period.groupby('商品ID').apply(
        lambda x: (x['销量'] * x['单价']).sum()
    )
    
    product_metrics = pd.DataFrame({
        'current_sales': current_sales,
        'previous_sales': previous_sales
    }).fillna(0)
    
    product_metrics['growth_rate'] = (
        (product_metrics['current_sales'] - product_metrics['previous_sales']) / 
        product_metrics['previous_sales']
    ).fillna(0)
    
    max_competitor_sales = product_metrics['current_sales'].max()
    product_metrics['market_share'] = (
        product_metrics['current_sales'] / max_competitor_sales
    )
    
    # BCG矩阵分类
    growth_rate_threshold = product_metrics['growth_rate'].median()
    market_share_threshold = product_metrics['market_share'].median()
    
    def classify_product(row):
        if row['growth_rate'] >= growth_rate_threshold:
            return '明星商品' if row['market_share'] >= market_share_threshold else '问题商品'
        else:
            return '现金牛' if row['market_share'] >= market_share_threshold else '瘦狗'
    
    product_metrics['category'] = product_metrics.apply(classify_product, axis=1)
    
    # 统计分类结果
    category_stats = product_metrics.groupby('category').agg({
        'current_sales': ['count', 'sum']
    })
    category_stats.columns = ['product_count', 'sales_amount']
    category_stats['sales_percentage'] = (
        category_stats['sales_amount'] / category_stats['sales_amount'].sum()
    )
    
    return render_template('products.html',
                         category_statistics=category_stats.to_dict('index'),
                         growth_rate_threshold=float(growth_rate_threshold),
                         market_share_threshold=float(market_share_threshold))

2. 商品销售预测

2.1 机器学习

def prepare_features(data):
    """准备特征数据"""
    # 删除包含NaN的行
    data = data.dropna(subset=['销量', '单价', '类别ID', '门店编号'])
    
    # 时间特征
    data['weekday'] = data['成交时间'].dt.weekday
    data['month'] = data['成交时间'].dt.month
    data['hour'] = data['成交时间'].dt.hour
    
    # 类别编码
    le_category = LabelEncoder()
    le_store = LabelEncoder()
    
    # 拟合编码器
    le_category.fit(data['类别ID'].astype(str))  # 将类别ID转换为字符串
    le_store.fit(data['门店编号'].astype(str))
    
    # 转换数据
    data['类别编码'] = le_category.transform(data['类别ID'].astype(str))
    data['门店编码'] = le_store.transform(data['门店编号'].astype(str))
    
    # 特征选择
    features = ['类别编码', '门店编码', '单价', 'weekday', 'month', 'hour']
    target = '销量'
    
    # 确保所有特征都是数值类型
    X = data[features].astype(float)
    y = data[target].astype(float)
    
    return X, y, le_category, le_store

# 创建全局变量来存储模型和编码器
model = None
scaler = None
label_encoder_category = None
label_encoder_store = None

def initialize_model():
    """初始化模型和编码器"""
    global model, scaler, label_encoder_category, label_encoder_store
    
    try:
        data = load_data()
        X, y, le_category, le_store = prepare_features(data)
        
        # 训练模型
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        
        # 标准化特征
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        
        # 训练决策树模型
        model = DecisionTreeRegressor(random_state=42, max_depth=10)
        model.fit(X_train_scaled, y_train)
        
        # 保存编码器
        label_encoder_category = le_category
        label_encoder_store = le_store
        
        return True
    except Exception as e:
        print(f"模型初始化错误: {str(e)}")
        return False

2.2 预测功能

@app.route('/predict', methods=['POST'])
def predict():
    """处理预测请求"""
    global model, scaler, label_encoder_category, label_encoder_store
    
    try:
        # 如果模型未初始化,先初始化
        if model is None or scaler is None:
            if not initialize_model():
                return jsonify({'error': '模型初始化失败'}), 500
        
        # 获取表单数据
        category = request.form['category']
        store = request.form['store']
        price = float(request.form['price'])
        weekday = int(request.form['weekday'])
        month = int(request.form['month'])
        
        try:
            # 转换类别编码和门店编码
            category_encoded = label_encoder_category.transform([str(category)])[0]
            store_encoded = label_encoder_store.transform([str(store)])[0]
        except ValueError as e:
            return jsonify({'error': f'无效的输入数据: {str(e)}'}), 400
        
        # 准备预测数据
        pred_data = pd.DataFrame([[
            category_encoded,
            store_encoded,
            price,
            weekday,
            month,
            12  # 使用默认时间
        ]], columns=['类别编码', '门店编码', '单价', 'weekday', 'month', 'hour'])
        
        # 标准化预测数据
        pred_data_scaled = scaler.transform(pred_data)
        
        # 预测
        prediction = model.predict(pred_data_scaled)[0]
        
        # 确保预测结果为正整数
        prediction = max(0, round(prediction))
        
        # 获取门店信息
        store_info = STORE_INFO.get(store, {})
        store_name = store_info.get('name', f'门店{store}')
        
        # 加载历史数据进行分析
        data = load_data()
        
        # 计算该类别的历史平均销量
        category_avg = data[data['类别ID'].astype(str) == str(category)]['销量'].mean()
        
        # 计算该门店的历史平均销量
        store_avg = data[data['门店编号'].astype(str) == str(store)]['销量'].mean()
        
        # 计算价格区间的平均销量
        price_range = 0.1  # 价格范围±10%
        price_lower = price * (1 - price_range)
        price_upper = price * (1 + price_range)
        price_avg = data[(data['单价'] >= price_lower) & 
                        (data['单价'] <= price_upper)]['销量'].mean()
        
        # 计算同时段(星期几和月份)的历史平均销量
        time_avg = data[(data['成交时间'].dt.weekday == weekday) & 
                       (data['成交时间'].dt.month == month)]['销量'].mean()
        
        # 生成分析结果
        analysis = {
            'category_comparison': round((prediction / category_avg * 100) if category_avg > 0 else 100),
            'store_comparison': round((prediction / store_avg * 100) if store_avg > 0 else 100),
            'price_comparison': round((prediction / price_avg * 100) if price_avg > 0 else 100),
            'time_comparison': round((prediction / time_avg * 100) if time_avg > 0 else 100),
            'category_avg': round(category_avg if not pd.isna(category_avg) else 0),
            'store_avg': round(store_avg if not pd.isna(store_avg) else 0),
            'price_avg': round(price_avg if not pd.isna(price_avg) else 0),
            'time_avg': round(time_avg if not pd.isna(time_avg) else 0)
        }
        
        return jsonify({
            'prediction': int(prediction),
            'category': category,
            'category_name': CATEGORY_NAMES.get(category, f'类别{category}'),
            'store': store,
            'store_name': store_name,
            'price': price,
            'weekday': weekday,
            'month': month,
            'analysis': analysis
        })
    except Exception as e:
        print(f"预测错误: {str(e)}")
        return jsonify({'error': str(e)}), 400

@app.route('/prediction')
def prediction_page():
    """销售预测页面"""
    data = load_data()
    categories = sorted(data['类别ID'].astype(str).unique().tolist())
    stores = sorted(data['门店编号'].astype(str).unique().tolist())
    
    # 创建类别选项列表,包含ID和名称
    category_options = [
        {'id': cat_id, 'name': CATEGORY_NAMES.get(cat_id, f'类别{cat_id}')} 
        for cat_id in categories
    ]
    
    # 创建门店选项列表
    store_options = [
        {'id': store_id, 'name': STORE_INFO.get(store_id, {}).get('name', f'门店{store_id}')}
        for store_id in stores
    ]
    
    # 初始化模型(如果需要)
    global model
    if model is None:
        initialize_model()
    
    return render_template('prediction.html', 
                         categories=category_options,
                         stores=store_options)

3. 模型评估

@app.route('/model_evaluation')
def model_evaluation():
    """模型评估页面"""
    data = load_data()
    
    # 准备特征
    X, y, le_category, le_store = prepare_features(data)
    
    # 训练模型并获取评估结果
    _, _, metrics, feature_importance, scatter_data, residual_data, feature_names, importance_scores = train_models(X, y)
    
    return render_template('model_evaluation.html',
                         metrics=metrics,
                         feature_importance=feature_importance,
                         scatter_data=scatter_data,
                         residual_data=residual_data,
                         feature_names=feature_names,
                         importance_scores=importance_scores)

4. 训练模型

def train_models(X, y):
    """训练模型"""
    # 数据分割
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    # 标准化特征
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    
    # 训练决策树模型
    dt_model = DecisionTreeRegressor(random_state=42, max_depth=10)
    dt_model.fit(X_train_scaled, y_train)
    
    # 预测
    y_pred = dt_model.predict(X_test_scaled)
    
    # 计算模型指标
    metrics = {
        'r2_score': r2_score(y_test, y_pred),
        'mse': mean_squared_error(y_test, y_pred),
        'mae': mean_absolute_error(y_test, y_pred),
        'rmse': np.sqrt(mean_squared_error(y_test, y_pred))
    }
    
    # 特征重要性
    feature_importance = []
    for name, importance in zip(X.columns, dt_model.feature_importances_):
        correlation = np.corrcoef(X[name], y)[0, 1]
        feature_importance.append({
            'name': name,
            'importance': importance,
            'correlation': correlation
        })
    
    # 准备图表数据
    scatter_data = [[float(actual), float(pred)] for actual, pred in zip(y_test, y_pred)]
    residuals = y_test - y_pred
    residual_data = [[float(pred), float(residual)] for pred, residual in zip(y_pred, residuals)]
    
    return dt_model, scaler, metrics, feature_importance, scatter_data, residual_data, X.columns.tolist(), dt_model.feature_importances_.tolist()


http://www.kler.cn/a/569185.html

相关文章:

  • Notpad++通过SFTP连接ubuntu20.04实现windows下文件修改
  • 计算机面试项目经历描述技巧
  • 530 Login fail. A secure connection is requiered(such as ssl)-java发送QQ邮箱(简单配置)
  • 回归实战详细代码+解析:预测新冠感染人数
  • DeepSeek的开源周有什么看点?
  • DeepEP库开源啦!DeepSeek优化GPU通信,破算力瓶颈。
  • 计算机网络——详解TCP三握四挥
  • Java进阶——常用工具类
  • 从头开始学SpringMVC—02获取请求参数向域对象共享数据
  • 前端js搭建(搭建后包含cookie,弹窗,禁用f12)
  • 聊一聊 IM 如何优化架构?
  • PyCharm接入本地部署DeepSeek 实现AI编程!【支持windows与linux】
  • hivePB级迁移方案
  • 算法-二叉树篇26-将有序数组转换为二叉搜索树
  • 基于 OpenAI ChatGPT 3.5 的 LangGraph 对话机器人示例
  • Visual Studio 2022开发C++程序实现目录下重复文件查找
  • 【SpringBoot+Vue】博客项目开发二:用户登录注册模块
  • c++ 文件及基本读写总结
  • Qt之QStateMachine等待
  • 【每日八股】MySQL篇(四):索引(下)