KNN算法原理及python代码实现
目录
一、KNN算法简介
二、KNN算法的工作原理
扩展:距离度量的几种常见方法
三、优缺点分析
四、应用场景
五、python代码实现
5.1 问题引入
5.2、需求概要分析
5.3 K近邻算法解决问题的一般流程
5.3.1 数据准备
5.3.2 选择距离度量方法
5.3.3 确定K值
5.3.4 找到K个最近邻居
5.3.5 预测
5.3.6 评估
扩展:常见的评估指标
5.3.7 优化
5.4 python代码实现及结果分析
扩展:不进行评估 & 初始给定数据集不进行划分,全部用作训练使用
一、KNN算法简介
- 定义:KNN是一种基于实例的学习方法,它不构建模型而是直接使用训练数据集来对新样本进行预测。
- 基本思想:通过计算新样本与已知类别样本之间的距离,找到最近的k个邻居,并根据这些邻居的多数类别来进行预测。
二、KNN算法的工作原理
-
输入:训练数据集(包含特征和标签)、测试集(待预测样本)、参数K(邻居数量)。
注:
- K值的选择直接影响模型的复杂度和泛化能力:较小的K值会使模型对噪声敏感,容易过拟合;较大的K值则可能导致模型欠拟合。
- 常用的确认k值的办法是K折交叉验证(Cross-Validation),具体思路如下:通过将训练数据集分成若干子集,使用其中的一部分作为验证集,其余部分作为训练集进行训练。轮流使用不同的子集作为验证集,重复这个过程多次,最终根据验证集上的表现来评估不同K值的效果。
-
输出:待预测样本的类别或数值。
具体步骤:
-
计算距离:计算待预测样本与训练集中每个样本的距离,常用欧氏距离、曼哈顿距离等。
-
选择K个最近邻居:根据距离排序,选择距离最近的K个样本。
-
分类任务:统计K个邻居中出现最多的类别,作为预测结果。
-
回归任务:计算K个邻居的平均值,作为预测结果。
扩展:距离度量的几种常见方法
欧氏距离:适用于连续数值型数据
其中,x和y是两个样本的特征向量,n是特征的维度。
曼哈顿距离
适用于特征值在不同维度上变化范围差异较大的情况。
明可夫斯基距离:是欧氏距离和曼哈顿距离的推广形式
当p=2时,为欧氏距离;当p=1时,为曼哈顿距离。
余弦相似度:适用于文本等稀疏数据
其中,x⋅y表示向量的点积,∥x∥和∥y∥表示向量的模。
三、优缺点分析
-
优点:
-
简单易实现。
-
无需训练过程,适合动态数据集。
-
-
缺点:
-
计算量大,尤其在大数据集上。
-
对噪声和无关特征敏感。
-
需要大量存储空间。
-
四、应用场景
-
分类:如手写数字识别、图像分类。
-
回归:如房价预测。
五、python代码实现
5.1 问题引入
海伦一直使用在线约会网站寻找适合自己的约会对象。她曾交往过三种类型的人:
- 不喜欢的人
- 一般喜欢的人
- 非常喜欢的人
这些人包含以下三种特征:
- 每年获得的飞行常客里程数
- 玩视频游戏所耗时间百分比
- 每周消费的冰淇淋公升数
该网站现在需要尽可能向海伦推荐她喜欢的人,需要我们设计一个分类器,根据用户的以上三种特征,识别出是否该向海伦推荐。
5.2、需求概要分析
根据问题,我们可知,样本特征个数为3,样本标签为三类。现需要实现将一个待分类样本的三个特征值输入程序后,能够识别该样本的类别,并且将该类别输出。
5.3 K近邻算法解决问题的一般流程
5.3.1 数据准备
这包括数据收集、数据清洗和数据预处理
数据预处理可能包括归一化或标准化特征,以确保所有特征在计算距离时具有相等的权重。
上图为给定数据集的部分截图,第一列表示每年获得的飞行常客里程数,第二列表示玩视频游戏所耗时间百分比,第三列表示每周消费的冰淇淋公升数,第四列表示海伦的态度(不喜欢,一般喜欢,非常喜欢)
观察上图,我们可以发现数据集中的数据值,不难发现,第一个特征的值和第二个特征的值的值远大于第三个特征的值,如果我们不进行处理的话,很容易就造成前两个特征的值会主导距离度量计算的值,导致整个模型会更偏向于依赖前两个特征的值,从而忽略了第三个特征
面对这种情况,我们的解决办法是对数据进行归一化处理,归一化可以确保每个特征都最终距离的计算有大致的相等的贡献。这样,训练出来的模型就不会因为某个特征的值过大,就偏向该特征。
这一步的侧重点在对数据进行归一化处理
公式:
其中
- x 是原始特征值。
- min(𝑋)min(X) 是特征列中的最小值。
- max(𝑋)max(X) 是特征列中的最大值。
5.3.2 选择距离度量方法
确定用于比较样本之间相似性的度量方法,常见的如欧几里得距离、曼哈顿距离等。
距离度量方法我们选择 欧几里得距离
5.3.3 确定K值
选择一个K值,即在分类或回归时应考虑的邻居数量。这是一个超参数,可以通过交叉验证等方法来选择最优的K值。
数据总量是1000,其中训练数据和测试数据比例为 8:2
这里我是直接指定 k=10(最后评估出来的精度有0.94,我感觉还是可以的)
5.3.4 找到K个最近邻居
对于每一个需要预测的未标记的样本
- -计算该样本与训练集中所有样本的距离
- 根据距离对它们进行排序。
- 选择距离最近的K个样本
5.3.5 预测
- 分类任务:查看K个最近邻居中最常见的类别,作为预测结果。例如,如果K=3,并且三个最近邻居的类别是[1, 2, 1],那么预测结果就是类别1。
- 回归任务:预测结果可以是K个最近邻居的平均值或加权平均值。
这里我们要做的是分类任务
5.3.6 评估
使用适当的评价指标(如准确率、均方误差等)评估模型的性能。
这里我们计算 精度 还有 F1分数
扩展:常见的评估指标
回归任务
- 均方误差(Mean Squared Error,MSE):
分类任务
- 错误率(Error Rate)
错误率是分类任务中的基本性能度量,表示被错误分类的样本比例。
(error:错误分类的样本数;m:样本总数)
- 精度(Accuracy)
精度是分类任务中最直观的性能度量,表示模型正确分类的样本比例。
(correct:正确分类的样本数;m:样本总数)
- 召回率 (Recall)和精确率(Precision)
召回率衡量实际为正例的样本中被模型预测为正例的比例。
精确率衡量被模型预测为正例的样本中实际为正例的比例
- F1分数 (F1 Score)
F1 分数是精却率和召回率的调和平均值,用于综合评估模型的分类性能。
5.3.7 优化
基于性能评估结果,可能需要返回并调整某些参数,如K值、距离度量方法等,以获得更好的性能。
5.4 python代码实现及结果分析
import numpy as np
import pandas as pd
from collections import Counter
import matplotlib.pyplot as plt
# 读取数据
file_path = r"D:\Desktop\Code\PYTHON\knn\data.txt"
data = pd.read_csv(file_path, sep='\t', header=None, names=['Miles', 'GameTime', 'IceCream', 'Label'])
label_mapping = {'didntLike': 0, 'smallDoses': 1, 'largeDoses': 2}
data['Label'] = data['Label'].map(label_mapping)
features = data[['Miles', 'GameTime', 'IceCream']].values
labels = data['Label'].values
def min_max_normalize(feature):
return (feature - np.min(feature)) / (np.max(feature) - np.min(feature))
normalized_features = np.apply_along_axis(min_max_normalize, 0, features)
def train_test_split_manual(X, y, test_size=0.2, random_state=None):
if random_state is not None:
np.random.seed(random_state)
shuffled_indices = np.random.permutation(len(X))
test_size = int(test_size * len(X))
test_indices = shuffled_indices[:test_size]
train_indices = shuffled_indices[test_size:]
X_train, X_test = X[train_indices], X[test_indices]
y_train, y_test = y[train_indices], y[test_indices]
return X_train, X_test, y_train, y_test
X_train, X_test, y_train, y_test = train_test_split_manual(normalized_features, labels, test_size=0.2, random_state=42)
class KNN:
def __init__(self, k=10, distance_metric='euclidean'):
self.k = k
self.distance_metric = distance_metric
def fit(self, X, y):
self.X_train = X
self.y_train = y
def predict(self, X):
predictions = [self._predict(x) for x in X]
return np.array(predictions)
def _predict(self, x):
distances = []
for x_train in self.X_train:
dist = np.sqrt(np.sum((x_train - x) ** 2)) if self.distance_metric == 'euclidean' else np.sum(
np.abs(x_train - x))
distances.append(dist)
k_indices = np.argsort(distances)[:self.k]
k_nearest_labels = [self.y_train[i] for i in k_indices]
most_common_label = Counter(k_nearest_labels).most_common(1)[0][0]
return most_common_label
# 计算准确率
def calculate_accuracy(y_true, y_pred):
correct_predictions = sum(y_t == y_p for y_t, y_p in zip(y_true, y_pred))
return correct_predictions / len(y_true)
# 计算F1分数
def calculate_f1_score(y_true, y_pred):
true_positives = {label: 0 for label in set(y_true)}
false_positives = {label: 0 for label in set(y_true)}
false_negatives = {label: 0 for label in set(y_true)}
for y_t, y_p in zip(y_true, y_pred):
if y_t == y_p:
true_positives[y_t] += 1
else:
false_positives[y_p] += 1
false_negatives[y_t] += 1
precision_recall_f1_per_class = {}
for label in set(y_true):
precision = true_positives[label] / (true_positives[label] + false_positives[label]) if (true_positives[label] +
false_positives[
label]) != 0 else 0
recall = true_positives[label] / (true_positives[label] + false_negatives[label]) if (true_positives[label] +
false_negatives[
label]) != 0 else 0
f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) != 0 else 0
precision_recall_f1_per_class[label] = {'Precision': precision, 'Recall': recall, 'F1': f1}
# 计算加权平均F1分数
unique_labels, counts = np.unique(y_true, return_counts=True)
class_weights = counts / len(y_true)
weighted_f1 = sum(
[precision_recall_f1_per_class[label]['F1'] * weight for label, weight in zip(unique_labels, class_weights)])
return weighted_f1
k = 10
knn = KNN(k=k, distance_metric='euclidean')
knn.fit(X_train, y_train)
predictions = knn.predict(X_test)
# 输出信息
print(f"Total data size: {len(data)}")
print(f"Training data size: {len(X_train)}")
print(f"Test data size: {len(X_test)}")
correct_predictions = sum(y_t == y_p for y_t, y_p in zip(y_test, predictions))
acc = calculate_accuracy(y_test, predictions)
f1 = calculate_f1_score(y_test, predictions)
print(f"Accuracy: {acc:.2f}")
print(f"F1 Score: {f1:.2f}")
def visualize_classification_3d(X_train, y_train, X_test, y_test, predictions):
fig = plt.figure(figsize=(12, 6))
ax1 = fig.add_subplot(121, projection='3d')
ax2 = fig.add_subplot(122, projection='3d')
colors = ['red', 'green', 'blue']
labels_name = ['didntLike', 'smallDoses', 'largeDoses']
# 绘制训练集分布图
for i in range(len(labels_name)):
mask = y_train == i
ax1.scatter(X_train[mask, 0], X_train[mask, 1], X_train[mask, 2], c=colors[i], label=labels_name[i], alpha=0.5)
ax1.set_title('Training Set Distribution')
ax1.set_xlabel('Normalized ' + data.columns[0])
ax1.set_ylabel('Normalized ' + data.columns[1])
ax1.set_zlabel('Normalized ' + data.columns[2])
# 绘制测试集及其预测结果
for i in range(len(labels_name)):
mask = y_test == i
ax2.scatter(X_test[mask, 0], X_test[mask, 1], X_test[mask, 2], c=colors[i], marker='x',
label=f'Test {labels_name[i]}')
wrong_pred_mask = predictions != y_test
ax2.scatter(X_test[wrong_pred_mask, 0], X_test[wrong_pred_mask, 1], X_test[wrong_pred_mask, 2], c='black', marker='o',
edgecolors='red', s=100, label='Wrong Predictions')
ax2.set_title('Test Set with Predictions')
ax2.set_xlabel('Normalized ' + data.columns[0])
ax2.set_ylabel('Normalized ' + data.columns[1])
ax2.set_zlabel('Normalized ' + data.columns[2])
ax1.legend()
ax2.legend()
plt.tight_layout()
plt.show()
visualize_classification_3d(X_train, y_train, X_test, y_test, predictions)
终端输出
可视化结果输出
注:左图是基于训练集的结果图,右图是基于测试集的测试结果图,其中重点标注了测试结果错误的样本点
扩展:不进行评估 & 初始给定数据集不进行划分,全部用作训练使用
import numpy as np
import pandas as pd
from collections import Counter
# 1. 读取并预处理数据
file_path = r"D:\Desktop\Code\PYTHON\knn\data.txt"
data = pd.read_csv(file_path, sep='\t', header=None, names=['Miles', 'GameTime', 'IceCream', 'Label'])
# 将标签列转换为数值类型
label_mapping = {'didntLike': 0, 'smallDoses': 1, 'largeDoses': 2}
data['Label'] = data['Label'].map(label_mapping)
# 分离特征和标签
features = data[['Miles', 'GameTime', 'IceCream']].values
labels = data['Label'].values
# 归一化处理
def min_max_normalize(feature):
return (feature - np.min(feature)) / (np.max(feature) - np.min(feature))
normalized_features = np.apply_along_axis(min_max_normalize, 0, features)
# 2. 定义KNN算法
class KNN:
def __init__(self, k=10, distance_metric='euclidean'):
self.k = k
self.distance_metric = distance_metric
def fit(self, X, y):
self.X_train = X
self.y_train = y
def predict(self, X):
predictions = [self._predict(x) for x in X]
return np.array(predictions)
def _predict(self, x):
# 计算距离
distances = []
for x_train in self.X_train:
if self.distance_metric == 'euclidean':
dist = np.sqrt(np.sum((x_train - x) ** 2))
elif self.distance_metric == 'manhattan':
dist = np.sum(np.abs(x_train - x))
else:
raise ValueError("Unsupported distance metric")
distances.append(dist)
# 找到最近的K个邻居
k_indices = np.argsort(distances)[:self.k]
k_nearest_labels = [self.y_train[i] for i in k_indices]
# 多数表决
most_common_label = Counter(k_nearest_labels).most_common(1)[0][0]
return most_common_label
# 3. 使用KNN进行分类
k = 10
knn = KNN(k=k, distance_metric='euclidean')
knn.fit(normalized_features, labels)
# 4. 手动输入特征值并进行预测
def get_input():
try:
miles = float(input("请输入每年获得的飞行常客里程数: "))
game_time = float(input("请输入玩视频游戏所耗时间百分比: "))
ice_cream = float(input("请输入每周消费的冰淇淋公升数: "))
return np.array([miles, game_time, ice_cream])
except ValueError:
print("输入无效,请输入数字!")
return get_input()
def normalize_input(input_feature, original_features):
normalized_input = []
for i, feature in enumerate(input_feature):
min_val = np.min(original_features[:, i])
max_val = np.max(original_features[:, i])
normalized_val = (feature - min_val) / (max_val - min_val)
normalized_input.append(normalized_val)
return np.array(normalized_input)
# 获取用户输入并进行预测
user_input = get_input()
normalized_input = normalize_input(user_input, features)
# 进行预测
prediction = knn.predict([normalized_input])[0]
reverse_label_mapping = {0: 'didntLike', 1: 'smallDoses', 2: 'largeDoses'}
predicted_label = reverse_label_mapping[prediction]
print(f"预测结果: {predicted_label}")