import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split, KFold
from sklearn.metrics import mean_squared_error, r2_score
from mpl_toolkits.mplot3d import Axes3D  # registers the 3D projection for the surface plot
# 1. Load and prepare the data
data = pd.read_csv('data.csv')
# Extract the input (X) and output (Y)
X = data['X'].values.reshape(-1, 1)
Y = data['Y'].values
# Split into training and test sets: 70% train, 30% test
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.3, random_state=42)
# Prepend a column of ones to X to model the intercept (bias) term
X_train_b = np.c_[np.ones((X_train.shape[0], 1)), X_train]
X_test_b = np.c_[np.ones((X_test.shape[0], 1)), X_test]
# Initialize the parameters (theta)
theta = np.zeros(2)
# Define the default hyperparameters
learning_rate = 0.01
n_iterations = 1000
# Cost function: half the mean squared error, J(theta) = (1 / 2m) * sum((X.theta - Y)^2)
def compute_cost(X, Y, theta):
    m = len(Y)
    predictions = X.dot(theta)
    cost = (1 / (2 * m)) * np.sum((predictions - Y) ** 2)
    return cost
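# Optional sanity check (a minimal sketch): the cost above is half the mean
# squared error, so it should equal sklearn's mean_squared_error divided by
# two for any theta, e.g. the all-zeros starting point.
assert np.isclose(compute_cost(X_train_b, Y_train, theta),
                  mean_squared_error(Y_train, X_train_b.dot(theta)) / 2)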
# Batch gradient descent: theta := theta - lr * (1/m) * X^T (X.theta - Y),
# recording the cost after every update
def gradient_descent(X, Y, theta, learning_rate, n_iterations):
    m = len(Y)
    cost_history = np.zeros(n_iterations)
    for iteration in range(n_iterations):
        gradients = (1 / m) * X.T.dot(X.dot(theta) - Y)
        theta = theta - learning_rate * gradients
        cost_history[iteration] = compute_cost(X, Y, theta)
    return theta, cost_history
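# Aside (a sketch, assuming the standard convergence analysis for quadratic
# costs): the Hessian here is (1/m) * X^T X, and batch gradient descent
# converges only when learning_rate < 2 / lambda_max(Hessian). Computing the
# bound shows how much headroom the learning-rate grid below has.
hessian = X_train_b.T.dot(X_train_b) / len(Y_train)
lr_upper_bound = 2.0 / np.linalg.eigvalsh(hessian).max()
print(f"Step-size upper bound for convergence: {lr_upper_bound:.4f}")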
# k-fold cross-validation: fit each fold with gradient descent, return the mean validation MSE
def cross_validation(X, Y, learning_rate, n_iterations, k_folds=5):
    kfold = KFold(n_splits=k_folds, shuffle=True, random_state=42)
    cv_mse = []
    for train_index, val_index in kfold.split(X):
        X_train_fold, X_val_fold = X[train_index], X[val_index]
        Y_train_fold, Y_val_fold = Y[train_index], Y[val_index]
        # Add the bias column to this fold's training data
        X_train_fold_b = np.c_[np.ones((X_train_fold.shape[0], 1)), X_train_fold]
        # Initialize theta
        theta = np.zeros(X_train_fold_b.shape[1])
        # Fit the model with gradient descent
        theta_final, _ = gradient_descent(X_train_fold_b, Y_train_fold, theta, learning_rate, n_iterations)
        # Predict on the validation fold (predict adds the bias column itself)
        Y_val_pred = predict(X_val_fold, theta_final)
        # Record this fold's mean squared error
        mse = mean_squared_error(Y_val_fold, Y_val_pred)
        cv_mse.append(mse)
    # Return the average MSE across folds
    return np.mean(cv_mse)
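# Reference baseline (a sketch, using standard scikit-learn calls): the
# closed-form LinearRegression scored with 5-fold cross-validation gives an
# MSE that the gradient-descent search below should roughly match once it
# has converged.
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import cross_val_score
baseline_mse = -cross_val_score(LinearRegression(), X, Y, cv=5,
                                scoring='neg_mean_squared_error').mean()
print(f"Closed-form 5-fold baseline MSE: {baseline_mse:.4f}")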
# Prediction function: adds the bias column, then applies theta
def predict(X, theta):
    X_b = np.c_[np.ones((X.shape[0], 1)), X]  # add the intercept column
    return X_b.dot(theta)
# Grid-search the learning rate and iteration count, scored by cross-validation
best_mse = float('inf')
best_learning_rate = None
best_iterations = None
learning_rates = [0.001, 0.01, 0.02]
iteration_steps = [400, 500, 1000, 2000, 4000]
mse_results = np.zeros((len(learning_rates), len(iteration_steps)))
for i, lr in enumerate(learning_rates):
    for j, iterations in enumerate(iteration_steps):
        cv_mse = cross_validation(X_train, Y_train, lr, iterations)
        mse_results[i, j] = cv_mse  # record each combination's MSE
        if cv_mse < best_mse:
            best_mse = cv_mse
            best_learning_rate = lr
            best_iterations = iterations
print(f"Best MSE after cross-validation: {best_mse}, Best Learning Rate: {best_learning_rate}, Best Iterations: {best_iterations}")
# Retrain on the full training set with the best hyperparameters
theta_final, cost_history = gradient_descent(X_train_b, Y_train, np.zeros(X_train_b.shape[1]), best_learning_rate, best_iterations)
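# Sanity check (a sketch): the converged gradient-descent solution should
# closely match the closed-form least-squares fit on the same data.
theta_closed = np.linalg.lstsq(X_train_b, Y_train, rcond=None)[0]
print(f"Gradient descent theta: {theta_final}, closed-form theta: {theta_closed}")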
# Evaluate the fit on the training and test sets
Y_train_pred = predict(X_train, theta_final)
Y_test_pred = predict(X_test, theta_final)
# Compute the mean squared error and R^2
train_mse = mean_squared_error(Y_train, Y_train_pred)
test_mse = mean_squared_error(Y_test, Y_test_pred)
train_r2 = r2_score(Y_train, Y_train_pred)
test_r2 = r2_score(Y_test, Y_test_pred)
print(f"Train MSE: {train_mse}, Train R2: {train_r2}")print(f"Test MSE: {test_mse}, Test R2: {test_r2}")
# Plot 1: training/test scatter with the fitted line
plt.figure(figsize=(10, 6))
plt.scatter(X_train, Y_train, color='blue', label='Train Data')
plt.scatter(X_test, Y_test, color='orange', label='Test Data')
# Draw the fitted line
X_range = np.linspace(X.min(), X.max(), 100).reshape(-1, 1)
Y_pred_line = predict(X_range, theta_final)
plt.plot(X_range, Y_pred_line, color='red', label='Fitted Line')
# Predict several new input samples and plot the results
X_new_sample = np.array([7.0, 8.5, 6.0, 9.0, 5.5])  # example new inputs
Y_new_pred = predict(X_new_sample, theta_final)
print(Y_new_pred)
plt.scatter(X_new_sample, Y_new_pred, color='green', marker='x', s=100, label='Predictions for New Samples')
plt.title('Fitted Line with Training, Test, and Predicted Points')
plt.xlabel('X')
plt.ylabel('Y')
plt.legend()
plt.grid(True)
plt.show()
# Plot 2: cost-function history during training
plt.figure(figsize=(10, 6))
plt.plot(range(len(cost_history)), cost_history, color='green', label='Cost Function')
plt.title('Cost over Iterations')
plt.xlabel('Number of Iterations')
plt.ylabel('Cost (half MSE)')
plt.grid(True)
plt.legend()
plt.show()
# Plot 3: hyperparameter search surface (learning rate and iterations vs. MSE)
iter_grid, lr_grid = np.meshgrid(iteration_steps, learning_rates)
fig = plt.figure(figsize=(10, 6))
ax = fig.add_subplot(111, projection='3d')
ax.plot_surface(iter_grid, lr_grid, mse_results, cmap='viridis')
ax.set_xlabel('Iterations')
ax.set_ylabel('Learning Rate')
ax.set_zlabel('MSE')
ax.set_title('Learning Rate and Iterations vs. MSE')
plt.show()