学习日记_20241123_聚类方法(高斯混合模型)续
前言
提醒:
文章内容为方便作者自己后日复习与查阅而进行的书写与发布,其中引用内容都会使用链接表明出处(如有侵权问题,请及时联系)。
其中内容多为一次书写,缺少检查与订正,如有问题或其他拓展及意见建议,欢迎评论区讨论交流。
文章目录
- 前言
- 续:
- 手动实现
- 代码分析
- def __init__(self, n_components, max_iter=100, tol=1e-6)
- def initialize_parameters(self, X)
- def e_step(self, X)
- def m_step(self, X)
- 数学背景
- fit(self, X)
- predict(self, X)
续:
学习日记_20241117_聚类方法(高斯混合模型)
手动实现
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import multivariate_normal
class GaussianMixtureModel:
def __init__(self, n_components, max_iter=100, tol=1e-6):
self.n_components = n_components
self.max_iter = max_iter
self.tol = tol
def initialize_parameters(self, X):
n_samples, n_features = X.shape
self.weights = np.ones(self.n_components) / self.n_components
self.means = X[np.random.choice(n_samples, self.n_components, replace=False)]
self.covariances = np.array([np.cov(X, rowvar=False)] * self.n_components)
def e_step(self, X):
n_samples = X.shape[0]
self.responsibilities = np.zeros((n_samples, self.n_components))
for k in range(self.n_components):
pdf = multivariate_normal(mean=self.means[k], cov=self.covariances[k])
self.responsibilities[:, k] = self.weights[k] * pdf.pdf(X)
self.responsibilities /= self.responsibilities.sum(axis=1, keepdims=True)
def m_step(self, X):
n_samples, n_features = X.shape
effective_n = self.responsibilities.sum(axis=0)
self.means = np.dot(self.responsibilities.T, X) / effective_n[:, np.newaxis]
self.covariances = np.zeros((self.n_components, n_features, n_features))
for k in range(self.n_components):
X_centered = X - self.means[k]
self.covariances[k] = (self.responsibilities[:, k][:, np.newaxis] * X_centered).T @ X_centered
self.covariances[k] /= effective_n[k]
self.weights = effective_n / n_samples
def fit(self, X):
self.initialize_parameters(X)
log_likelihood_old = 0
for iteration in range(self.max_iter):
self.e_step(X)
self.m_step(X)
log_likelihood = 0
for k in range(self.n_components):
pdf = multivariate_normal(mean=self.means[k], cov=self.covariances[k])
log_likelihood += np.sum(self.responsibilities[:, k] * np.log(self.weights[k] * pdf.pdf(X) + 1e-10))
if np.abs(log_likelihood - log_likelihood_old) < self.tol:
print(f"Converged at iteration {iteration}")
break
log_likelihood_old = log_likelihood
def predict(self, X):
n_samples = X.shape[0]
likelihoods = np.zeros((n_samples, self.n_components))
for k in range(self.n_components):
pdf = multivariate_normal(mean=self.means[k], cov=self.covariances[k])
likelihoods[:, k] = self.weights[k] * pdf.pdf(X)
return np.argmax(likelihoods, axis=1)
# 可视化结果
def plot_results(X, labels, means, covariances):
plt.figure(figsize=(8, 6))
unique_labels = np.unique(labels)
colors = ['r', 'g', 'b', 'c', 'm', 'y']
for i, label in enumerate(unique_labels):
plt.scatter(X[labels == label, 0], X[labels == label, 1], s=20, color=colors[i % len(colors)], label=f"Cluster {label + 1}")
# 绘制高斯分布的等高线
mean = means[label]
cov = covariances[label]
x, y = np.meshgrid(np.linspace(X[:, 0].min() - 1, X[:, 0].max() + 1, 100),
np.linspace(X[:, 1].min() - 1, X[:, 1].max() + 1, 100))
pos = np.dstack((x, y))
rv = multivariate_normal(mean, cov)
plt.contour(x, y, rv.pdf(pos), levels=5, colors=colors[i % len(colors)], alpha=0.5)
plt.title("Gaussian Mixture Model Clustering")
plt.xlabel("Feature 1")
plt.ylabel("Feature 2")
plt.legend()
plt.show()
# 测试
if __name__ == "__main__":
np.random.seed(42)
X1 = np.random.normal(0, 1, (100, 2))
X2 = np.random.normal(5, 1, (100, 2))
X3 = np.random.normal(10, 1, (100, 2))
X = np.vstack((X1, X2, X3))
plt.scatter(X[:, 0], X[:, 1])
plt.title("Original Data")
plt.xlabel("Feature 1")
plt.ylabel("Feature 2")
gmm = GaussianMixtureModel(n_components=3)
gmm.fit(X)
labels = gmm.predict(X)
plot_results(X, labels, gmm.means, gmm.covariances)
数据与结果为:
代码分析
def init(self, n_components, max_iter=100, tol=1e-6)
class GaussianMixtureModel:
def __init__(self, n_components, max_iter=100, tol=1e-6):
self.n_components = n_components
self.max_iter = max_iter
self.tol = tol
n_components
:高斯混合模型中的组件数,即聚类的数量。max_iter
:最大迭代次数,防止算法不收敛时无限循环。tol
:收敛的容忍度,用于判断算法是否收敛。
def initialize_parameters(self, X)
def initialize_parameters(self, X):
n_samples, n_features = X.shape
self.weights = np.ones(self.n_components) / self.n_components
self.means = X[np.random.choice(n_samples, self.n_components, replace=False)]
self.covariances = np.array([np.cov(X, rowvar=False)] * self.n_components)
self.weights = np.ones(self.n_components) / self.n_components
设 K K K 为高斯混合模型中的组件数(即n_components
)则每个组件的初始权重 π k \pi_k πk 为: π k = 1 K \pi_k = \frac{1}{K} πk=K1,其中 k k k 的取值范围是 1 1 1 到 K K K。self.weights
为一个长度为n_components
的数组:
π 1 = π 2 = ⋯ = π K = 1 K \pi_1 = \pi_2 = \cdots = \pi_K = \frac{1}{K} π1=π2=⋯=πK=K1
self.means = X[np.random.choice(n_samples, self.n_components, replace=False)]
设 X X X 为数据集,其中包含 n n n 个样本(即n_samples
),每个样本是一个特征向量。设 K K K 为高斯混合模型中的组件数(即n_components
)。
从数据集 X X X 中随机选择 K K K 个不重复的样本作为初始均值向量。用数学表达式表示就是:
μ k = X i k \mu_k = X_{i_k} μk=Xik
其中:
- μ k \mu_k μk 是第 k k k 个组件的初始均值向量。
- X i k X_{i_k} Xik 是数据集 X X X 中的第 i k i_k ik 个样本。
- i k i_k ik 是从集合 { 1 , 2 , … , n } \{1, 2, \ldots, n\} {1,2,…,n} 中随机选择的不重复的 K K K 个整数。
具体地,可以表示为:
μ 1 , μ 2 , … , μ K = X i 1 , X i 2 , … , X i K \mu_1, \mu_2, \ldots, \mu_K = X_{i_1}, X_{i_2}, \ldots, X_{i_K} μ1,μ2,…,μK=Xi1,Xi2,…,XiK
其中 { i 1 , i 2 , … , i K } \{i_1, i_2, \ldots, i_K\} {i1,i2,…,iK} 是从 { 1 , 2 , … , n } \{1, 2, \ldots, n\} {1,2,…,n} 中随机选择的不重复的 K K K 个整数。
在代码中,np.random.choice(n_samples, self.n_components, replace=False)
实现了从 n n n 个样本中随机选择 K K K 个不重复的样本索>引,然后通过这些索引从数据集 X X X 中提取相应的样本作为初始均值向量。
self.covariances = np.array([np.cov(X, rowvar=False)] * self.n_components)
这行代码的目的是初始化高斯混合模型(Gaussian Mixture Model,GMM)中每个组件的协方差矩阵。让我们逐步分析这行代码:
np.cov(X, rowvar=False)
: 这个函数计算输入数据X
的协方差矩阵。参数rowvar=False
表示每一列代表一个变量,而每一行代表一个观测值。这是在多维数据中常见的设置,其中每一维(或特征)存储在单独的列中。[np.cov(X, rowvar=False)] * self.n_components
: 这部分代码创建了一个列表,其中包含self.n_components
个相同的协方差矩阵。self.n_components
是GMM中的组件数。np.array(...)
: 最后,这个列表被转换成一个NumPy数组,使得self.covariances
成为一个二维数组,其中每个元素都是一个协方差矩阵。
def e_step(self, X)
def e_step(self, X):
n_samples = X.shape[0]
self.responsibilities = np.zeros((n_samples, self.n_components))
for k in range(self.n_components):
pdf = multivariate_normal(mean=self.means[k], cov=self.covariances[k])
self.responsibilities[:, k] = self.weights[k] * pdf.pdf(X)
self.responsibilities /= self.responsibilities.sum(axis=1, keepdims=True)
self.responsibilities = np.zeros((n_samples, self.n_components))
这行代码在高斯混合模型(Gaussian Mixture Model, GMM)中用于初始化一个责任矩阵(responsibility matrix)。
(n_samples, self.n_components)
指定了数组的形状,其中n_samples
是输入数据集中样本的数量,self.n_components
是模型中高斯成分的数量。- 责任矩阵
self.responsibilities
的每一行对应于一个样本,而每一列对应于一个高斯成分。- 矩阵中的每个元素
self.responsibilities[i, j]
表示样本i
属于高斯成分j
的责任值(即概率)。它衡量了样本i
在当前模型参数下属于成分j
的可能性。
这段代码是高斯混合模型(Gaussian Mixture Model, GMM)中 E 步骤(Expectation Step)的一部分。它的主要作用是计算每个样本属于每个高斯成分的责任值(responsibility),并进行归一化处理。我们来逐行分析这段代码的含义:
for k in range(self.n_components):
这里的循环遍历所有的高斯成分
k
,self.n_components
是高斯成分的数量。
pdf = multivariate_normal(mean=self.means[k], cov=self.covariances[k])
使用scipy.stats.multivariate_normal
创建一个多元高斯分布对象self.means[k]
,协方差矩阵为self.covariances[k]
。这表示当前成分k
的概率密度函数(PDF)。
self.responsibilities[:, k] = self.weights[k] * pdf.pdf(X)
- 这一行计算每个样本在高斯成分
k
下的责任值,并将其存储在责任矩阵self.responsibilities
的第k
列中。self.weights[k]
是成分k
的混合权重,表示该成分在总体中占的比例。pdf.pdf(X)
计算所有输入样本X
在成分k
下的概率密度值。self.weights[k] * pdf.pdf(X)
计算每个样本在成分k
下的加权概率密度(即责任值)。
self.responsibilities /= self.responsibilities.sum(axis=1, keepdims=True)
- 这行代码对责任矩阵进行归一化处理,使得每个样本在所有成分上的责任值之和为 1。
self.responsibilities.sum(axis=1, keepdims=True)
计算每个样本在所有成分上的总责任值。- 将责任矩阵每一行的值除以该行的总和,确保每个样本的责任值在所有成分之间是一个概率分布。
def m_step(self, X)
def m_step(self, X):
n_samples, n_features = X.shape
effective_n = self.responsibilities.sum(axis=0)
self.means = np.dot(self.responsibilities.T, X) / effective_n[:, np.newaxis]
self.covariances = np.zeros((self.n_components, n_features, n_features))
for k in range(self.n_components):
X_centered = X - self.means[k]
self.covariances[k] = (self.responsibilities[:, k][:, np.newaxis] * X_centered).T @ X_centered
self.covariances[k] /= effective_n[k]
self.weights = effective_n / n_samples
effective_n = self.responsibilities.sum(axis=0)
这行代码的作用是计算每个高斯成分的有效样本数量(effective number of samples),并将其存储在变量effective_n
中。我们来详细分析这一行代码的含义。
self.responsibilities
:
- 这是之前提到的责任矩阵,形状为
(n_samples, n_components)
,其中每一行对应一个样本,每一列对应一个高斯成分。矩阵中的每个值表示样本对某个成分的责任。self.responsibilities.sum(axis=0)
:
sum(axis=0)
表示对责任矩阵的列进行求和。- 这将返回一个一维数组,长度等于高斯成分的数量(
n_components
)。- 数组中的每个元素
effective_n[k]
表示第k
个高斯成分的有效样本数量,即所有样本在该成分上的责任值之和。
- 有效样本数量:有效样本数量反映了每个高斯成分在当前模型下的“重要性”或“权重”。如果某个成分的有效样本数量很小,说明这个成分对当前数据的解释能力较弱。
self.means = np.dot(self.responsibilities.T, X) / effective_n[:, np.newaxis]
这行代码用于更新高斯混合模型中每个高斯成分的均值(means)。让我们逐步分析这行代码及其背后的逻辑。
self.responsibilities.T
:
self.responsibilities
是一个形状为(n_samples, n_components)
的矩阵,表示每个样本对每个高斯成分的责任值。self.responsibilities.T
是将责任矩阵进行转置,得到一个形状为(n_components, n_samples)
的矩阵。np.dot(self.responsibilities.T, X)
:
X
是输入数据矩阵,形状为(n_samples, n_features)
,表示n_samples
个样本的特征。- 执行矩阵乘法
np.dot(self.responsibilities.T, X)
将会产生一个形状为(n_components, n_features)
的矩阵。- 这里,每一行对应一个高斯成分,表示该成分加权后的特征总和。具体来说,这些特征是根据每个样本在该成分上的责任值加权后的结果。
effective_n[:, np.newaxis]
:
effective_n
是一个一维数组,表示每个高斯成分的有效样本数量(我们在之前的讨论中提到过)。- 使用
np.newaxis
将effective_n
转换为形状为(n_components, 1)
的列向量,这样可以在后续的除法运算中进行广播。- 最后的除法:
- 将加权特征总和除以每个成分的有效样本数量,得到加权均值。
- 这一步的结果将更新
self.means
,即每个高斯成分的新均值。数学背景
在高斯混合模型中,均值的更新可以用以下公式表示:
μ k = ∑ i = 1 N γ i k x i ∑ i = 1 N γ i k \mu_k = \frac{\sum_{i=1}^{N} \gamma_{ik} x_i}{\sum_{i=1}^{N} \gamma_{ik}} μk=∑i=1Nγik∑i=1Nγikxi
其中:
- μ k \mu_k μk是第 k k k个高斯成分的均值。
- γ i k \gamma_{ik} γik是样本 i i i对成分 k k k的责任值。
- x i x_i xi 是样本 i i i的特征向量。
这行代码正是这个公式的实现形式,通过矩阵运算来有效地更新每个成分的均值。
这段代码的目的是为高斯混合模型中的每个高斯成分计算协方差矩阵,并更新权重。
self.covariances = np.zeros((self.n_components, n_features, n_features))
这一行初始化每个高斯成分的协方差矩阵为零矩阵,形状为(n_components, n_features, n_features)
,即为每个成分分配一个 n features × n features n_{\text{features}} \times n_{\text{features}} nfeatures×nfeatures 的协方差矩阵。for k in range(self.n_components):
这个循环遍历每一个高斯成分,从 0 到self.n_components - 1
。X_centered = X - self.means[k]
这行代码计算每个样本相对于当前成分均值的偏差。X_centered
是一个包含所有样本的矩阵,其中每个样本的特征值都减去了该成分的均值。self.covariances[k] = (self.responsibilities[:, k][:, np.newaxis] * X_centered).T @ X_centered
这里计算协方差矩阵的主要部分:
self.responsibilities[:, k][:, np.newaxis]
:提取第 k k k 个成分的责任值,并将其转换为列向量。* X_centered
:将责任值与居中后的数据相乘,以加权样本。(.T @ X_centered)
:进行矩阵乘法,计算加权样本的外积,从而得到一个 n features × n features n_{\text{features}} \times n_{\text{features}} nfeatures×nfeatures 的矩阵。- 协方差计算
- 计算得到的结果表示了当前成分的加权协方差矩阵,但为了得到正确的协方差矩阵,还需要将其除以该成分的有效样本数量。
self.covariances[k] /= effective_n[k]
- 将加权结果除以
effective_n[k]
,得到第 k k k 个高斯成分的协方差矩阵。
- 权重更新
self.weights = effective_n / n_samples
- 最后一行代码用于计算每个高斯成分的权重。权重的计算是将每个成分的有效样本数量除以总样本数量
n_samples
,从而得到每个成分在整个模型中的比重。
fit(self, X)
def fit(self, X):
self.initialize_parameters(X)
log_likelihood_old = 0
for iteration in range(self.max_iter):
self.e_step(X)
self.m_step(X)
log_likelihood = 0
for k in range(self.n_components):
pdf = multivariate_normal(mean=self.means[k], cov=self.covariances[k])
log_likelihood += np.sum(self.responsibilities[:, k] * np.log(self.weights[k] * pdf.pdf(X) + 1e-10))
if np.abs(log_likelihood - log_likelihood_old) < self.tol:
print(f"Converged at iteration {iteration}")
break
log_likelihood_old = log_likelihood
这段代码实现了高斯混合模型(GMM)中的
fit
方法。它通过期望最大化(EM)算法来拟合模型,计算对数似然(log likelihood)。
- 初始化参数:
Initialize μ k , Σ k , and π k for all components k \text{Initialize } \mu_k, \Sigma_k, \text{ and } \pi_k \text{ for all components } k Initialize μk,Σk, and πk for all components k- E步(期望步):
在E步中,计算每个数据点对每个成分的责任(即每个组件生成该数据点的概率):
r n k = π k ⋅ N ( x n ∣ μ k , Σ k ) ∑ j = 1 K π j ⋅ N ( x n ∣ μ j , Σ j ) r_{nk} = \frac{\pi_k \cdot \mathcal{N}(x_n | \mu_k, \Sigma_k)}{\sum_{j=1}^{K} \pi_j \cdot \mathcal{N}(x_n | \mu_j, \Sigma_j)} rnk=∑j=1Kπj⋅N(xn∣μj,Σj)πk⋅N(xn∣μk,Σk)
其中, r n k r_{nk} rnk 是数据点 x n x_n xn 对于成分 k k k 的责任值, N ( x n ∣ μ k , Σ k ) \mathcal{N}(x_n | \mu_k, \Sigma_k) N(xn∣μk,Σk) 是多元正态分布的概率密度函数。- M步(最大化步):
在M步中,更新模型参数,即均值、协方差和权重:
- 更新均值:
μ k = 1 N k ∑ n = 1 N r n k x n \mu_k = \frac{1}{N_k} \sum_{n=1}^{N} r_{nk} x_n μk=Nk1n=1∑Nrnkxn- 更新协方差:
Σ k = 1 N k ∑ n = 1 N r n k ( x n − μ k ) ( x n − μ k ) T \Sigma_k = \frac{1}{N_k} \sum_{n=1}^{N} r_{nk} (x_n - \mu_k)(x_n - \mu_k)^T Σk=Nk1n=1∑Nrnk(xn−μk)(xn−μk)T- 更新权重:
π k = N k N \pi_k = \frac{N_k}{N} πk=NNk
其中, N k = ∑ n = 1 N r n k N_k = \sum_{n=1}^{N} r_{nk} Nk=∑n=1Nrnk 是属于成分 k k k 的有效样本数量。
- 对数似然计算:
对数似然是整个数据集 X X X 在模型下的对数概率:
log L = ∑ n = 1 N log ( ∑ k = 1 K π k ⋅ N ( x n ∣ μ k , Σ k ) ) \log L = \sum_{n=1}^{N} \log \left( \sum_{k=1}^{K} \pi_k \cdot \mathcal{N}(x_n | \mu_k, \Sigma_k) \right) logL=n=1∑Nlog(k=1∑Kπk⋅N(xn∣μk,Σk))
由于在计算中添加了一个小的常数 1 e − 10 1e-10 1e−10 以避免对数计算中的数值问题,完整的表达式为:
log L ≈ ∑ n = 1 N ∑ k = 1 K r n k log ( π k ⋅ N ( x n ∣ μ k , Σ k ) + 1 e − 10 ) \log L \approx \sum_{n=1}^{N} \sum_{k=1}^{K} r_{nk} \log \left( \pi_k \cdot \mathcal{N}(x_n | \mu_k, \Sigma_k) + 1e-10 \right) logL≈n=1∑Nk=1∑Krnklog(πk⋅N(xn∣μk,Σk)+1e−10)- 收敛检查:
收敛条件是检查当前对数似然与上一轮的对数似然之差是否小于容忍度 tol \text{tol} tol:
∣ log L − log L old ∣ < tol |\log L - \log L_{\text{old}}| < \text{tol} ∣logL−logLold∣<tol
predict(self, X)
def predict(self, X):
n_samples = X.shape[0]
likelihoods = np.zeros((n_samples, self.n_components))
for k in range(self.n_components):
pdf = multivariate_normal(mean=self.means[k], cov=self.covariances[k])
likelihoods[:, k] = self.weights[k] * pdf.pdf(X)
return np.argmax(likelihoods, axis=1)
这段代码实现了高斯混合模型(GMM)中的
predict
方法,目的是根据已拟合的模型来预测给定数据点 X X X 的每个样本属于哪个组件。
- 输入数据:
- X X X 是一个 n × d n \times d n×d 的矩阵,其中 n n n 是样本数量, d d d 是特征维数。
- 初始化似然数组:
- 创建一个 n × K n \times K n×K 的矩阵
likelihoods
,其中 K K K 是组件的数量。这个矩阵将用于存储每个样本在每个组件下的似然值。
likelihoods = [ 0 0 … 0 0 0 … 0 ⋮ ⋮ ⋱ ⋮ 0 0 … 0 ] ( n × K ) \text{likelihoods} = \begin{bmatrix} 0 & 0 & \ldots & 0 \\ 0 & 0 & \ldots & 0 \\ \vdots & \vdots & \ddots & \vdots \\ 0 & 0 & \ldots & 0 \end{bmatrix} \quad (n \times K) likelihoods= 00⋮000⋮0……⋱…00⋮0 (n×K)- 计算每个组件的似然:
- 对于每个组件 k k k,计算数据点 X X X 在该组件下的加权似然值:
likelihoods [ : , k ] = π k ⋅ N ( X ∣ μ k , Σ k ) \text{likelihoods}[:, k] = \pi_k \cdot \mathcal{N}(X | \mu_k, \Sigma_k) likelihoods[:,k]=πk⋅N(X∣μk,Σk)
其中:- π k \pi_k πk 是组件 k k k 的权重(混合系数)。
- N ( X ∣ μ k , Σ k ) \mathcal{N}(X | \mu_k, \Sigma_k) N(X∣μk,Σk) 是多元正态分布在组件 k k k 下的概率密度函数。
- 确定预测标签:
- 对于每个样本,选择具有最大似然值的组件,返回其索引作为预测类别:
predictions = arg max k ( likelihoods [ : , k ] ) \text{predictions} = \arg\max_{k}(\text{likelihoods}[:, k]) predictions=argkmax(likelihoods[:,k])
这意味着,对于每个样本 $ n $,其最终预测标签为:
y ^ n = arg max k ( π k ⋅ N ( x n ∣ μ k , Σ k ) ) \hat{y}_n = \arg\max_{k} \left( \pi_k \cdot \mathcal{N}(x_n | \mu_k, \Sigma_k) \right) y^n=argkmax(πk⋅N(xn∣μk,Σk))- 返回预测结果:
- 结果是一个长度为 n n n 的数组,包含每个样本的预测组件索引。
总结起来,
predict
方法通过计算每个样本在每个高斯组件下的加权似然,将样本分配给最可能的组件,从而完成聚类或分类任务。