线性链条件随机场(Linear Chain Conditional Random Field)——有监督学习方法、概率模型、判别模型、对数线性模型、参数化模型
定义
条件随机场
设 $X$ 与 $Y$ 是随机变量，$P(Y \mid X)$ 是在给定 $X$ 的条件下 $Y$ 的条件概率分布。若随机变量 $Y$ 构成一个由无向图 $G=(V,E)$ 表示的马尔可夫随机场，即

$$P(Y_v \mid X, Y_\omega, \omega \ne v) = P(Y_v \mid X, Y_\omega, \omega \sim v)$$
对任意结点 $v$ 成立，则称条件概率分布 $P(Y \mid X)$ 为条件随机场。上式中 $\omega \sim v$ 表示在图 $G=(V,E)$ 中与结点 $v$ 有边连接的所有结点 $\omega$；$\omega \ne v$ 表示结点 $v$ 以外的所有结点；$Y_v$、$Y_u$ 与 $Y_\omega$ 为结点 $v$、$u$ 与 $\omega$ 对应的随机变量。
线性链条件随机场的参数化形式
设 $P(Y \mid X)$ 为线性链条件随机场，则在随机变量 $X$ 取值为 $x$ 的条件下，随机变量 $Y$ 取值为 $y$ 的条件概率具有如下形式：

$$P(y \mid x) = \frac{1}{Z(x)} \exp\bigg( \sum_{i,k} \lambda_k t_k(y_{i-1}, y_i, x, i) + \sum_{i,l} \mu_l s_l(y_i, x, i) \bigg)$$
其中，

$$Z(x) = \sum_y \exp\bigg( \sum_{i,k} \lambda_k t_k(y_{i-1}, y_i, x, i) + \sum_{i,l} \mu_l s_l(y_i, x, i) \bigg)$$
式中，$t_k$ 和 $s_l$ 是特征函数，$\lambda_k$ 和 $\mu_l$ 是对应的权值。$Z(x)$ 是规范化因子，求和是在所有可能的输出序列上进行的。
输入空间 $X = (X_1, X_2, X_3, \cdots, X_n)$

输出空间 $Y = (Y_1, Y_2, Y_3, \cdots, Y_n), \; Y \in R^n$
算法
from math import log
import os
from sklearn import datasets
# from matplotlib.tri.triinterpolate import LinearTriInterpolator
import numpy as np
from functools import partial
import sys
from pathlib import Path
from rich.console import Console
from rich.table import Table
import jieba
import jieba.posseg as psg # conda install -c conda-forge jieba
# sys.path.append(str(Path(os.path.abspath(__file__)).parent.parent))
# from utils import *
class LinearChainConditionalRandomField:
    """Linear-chain CRF trained with Improved Iterative Scaling (IIS).

    State (node) features have signature ``s(y_i, x, i)``; transition (edge)
    features have signature ``t(y_{i-1}, y_i, x, i)``.  Training expectations
    and model expectations are computed with the forward-backward algorithm;
    decoding uses Viterbi.
    """
    def __init__(self, feature_funcs, trans_feature_funcs, sequence_length, n_x, n_y, max_iteration=100, verbose=False):
        """
        `feature_funcs` is a list of state feature functions s(y_i, X, i)
        `trans_feature_funcs` is a list of transition feature functions t(y_{i-1}, y_i, X, i)
        `sequence_length` is the length of every input sequence
        `n_x` is the number of possible values of each item in sequence x
        `n_y` is the number of possible values of each item in sequence y
        """
        self.feature_funcs = feature_funcs
        self.trans_feature_funcs = trans_feature_funcs
        self.n_x = n_x
        self.n_y = n_y
        self.sequence_length = sequence_length
        self.max_iteration = max_iteration
        self.verbose = verbose

    def newton(self, f, g, x0, epsilon=1e-6):
        """
        Find the zero point where f(x) = 0 of function f.
        g(x) is the gradient function of f.
        Plain Newton-Raphson iteration starting from x0, stopping once
        successive iterates differ by less than epsilon.
        """
        prex = x0
        x = x0 - f(x0) / g(x0)
        while abs(x - prex) > epsilon:
            prex, x = x, x - f(x) / g(x)
        return x

    def get_trans(self, x):
        """Build the per-position transition matrices for observation sequence x.

        Returns exp(M) where M[i, y_prev, y_cur] is the sum of weighted state
        features at position i plus (for i > 0) the weighted transition
        features — i.e. the matrices M_i(x) of the CRF matrix form.
        Must be called after ``fit`` has set the ``used_*`` function lists
        and their weights.
        """
        trans_feature = np.zeros([self.sequence_length, self.n_y, self.n_y])
        for i in range(self.sequence_length):
            # State features only depend on y_i, so the same score is added
            # to every row (every possible previous state).
            for y_i_1 in range(self.n_y):
                for y_i in range(self.n_y):
                    for j, func in enumerate(self.used_feature_funcs):
                        trans_feature[i, y_i_1, y_i] += self.w_feature_funcs[j] * func(y_i, x, i)
            if i > 0:
                # Transition features are undefined at position 0 (no y_{-1}).
                for y_i_1 in range(self.n_y):
                    for y_i in range(self.n_y):
                        for j, func in enumerate(self.used_trans_feature_funcs):
                            trans_feature[i, y_i_1, y_i] += self.w_trans_feature_funcs[j] * func(y_i_1, y_i, x, i)
        return np.exp(trans_feature)

    def fit(self, X, Y):
        """
        X is a 2-D matrix of observation sequences.
        Y is a 2-D matrix of hidden-state sequences.
        Optimizes the feature weights with Improved Iterative Scaling.
        """
        # Empirical (data) expectation of every feature function.
        E_feature = np.zeros(len(self.feature_funcs))
        E_trans_feature = np.zeros(len(self.trans_feature_funcs))
        # Because every x is a whole sequence, its value space is far too
        # large to enumerate.  We therefore store only the sequences that
        # occur in the training data and iterate over those, with p_x as the
        # empirical distribution over them.
        p_x = {tuple(x): 0. for x in X}
        for x, y in zip(X, Y):
            x_key = tuple(x)
            p_x[x_key] += 1 / len(X)
            for i, yi in enumerate(y):
                for j, func in enumerate(self.feature_funcs):
                    E_feature[j] += func(yi, x, i) / len(X)
            for i in range(1, self.sequence_length):
                yi_1, yi = y[i - 1], y[i]
                for j, func in enumerate(self.trans_feature_funcs):
                    E_trans_feature[j] += func(yi_1, yi, x, i) / len(X)
        # Features that never fire in the training data are useless
        # (their optimal weight is unbounded/undefined) — filter them out.
        self.used_feature_funcs = [func for E, func in zip(E_feature, self.feature_funcs) if E != 0]
        self.used_trans_feature_funcs = [func for E, func in zip(E_trans_feature, self.trans_feature_funcs) if E != 0]
        E_feature = E_feature[E_feature.nonzero()]
        E_trans_feature = E_trans_feature[E_trans_feature.nonzero()]
        self.w_feature_funcs = np.zeros(len(self.used_feature_funcs))
        self.w_trans_feature_funcs = np.zeros(len(self.used_trans_feature_funcs))
        # Precompute every feature function value for every stored x,
        # position and state (pair), so the training loop never re-calls them.
        feature = np.zeros([len(self.used_feature_funcs), len(p_x), self.sequence_length, self.n_y])
        trans_feature = np.zeros([len(self.used_trans_feature_funcs), len(p_x), self.sequence_length, self.n_y, self.n_y])
        for x_i, x_key in enumerate(p_x):
            x = np.array(x_key)
            for func_i, func in enumerate(self.used_trans_feature_funcs):
                for i in range(1, self.sequence_length):
                    for y_i_1 in range(self.n_y):
                        for y_i in range(self.n_y):
                            trans_feature[func_i, x_i, i, y_i_1, y_i] = func(y_i_1, y_i, x, i)
            for func_i, func in enumerate(self.used_feature_funcs):
                for i in range(self.sequence_length):
                    for y_i in range(self.n_y):
                        feature[func_i, x_i, i, y_i] = func(y_i, x, i)
        # Precompute, for each x, the maximum total feature count over all
        # label sequences — IIS uses it as the exponent f#(x) in the update
        # polynomial.  Computed by a Viterbi-style max over positions.
        max_feature = np.zeros(len(p_x), dtype=int)
        sum_trans_feature = trans_feature.sum(axis=0)
        sum_feature = feature.sum(axis=0)
        for x_i, x_key in enumerate(p_x):
            cur_max_feature = np.zeros(self.n_y)
            for i in range(self.sequence_length):
                cur_max_feature = (cur_max_feature[:, None] + sum_trans_feature[x_i, i]).max(axis=0) + sum_feature[x_i, i]
            max_feature[x_i] = cur_max_feature.max()
        n_coef = max(max_feature) + 1
        # Start training.
        for iteration in range(self.max_iteration):
            if self.verbose:
                print(f'Iteration {iteration} starts...')
            loss = 0.
            # Treat state and transition features uniformly: iterate both
            # (function list, weight vector, empirical expectation) triples.
            for funcs, w, E_experience in zip(
                    [self.used_feature_funcs, self.used_trans_feature_funcs],
                    [self.w_feature_funcs, self.w_trans_feature_funcs],
                    [E_feature, E_trans_feature]):
                for func_i in range(len(funcs)):
                    # coef[m] accumulates the model expectation of this
                    # feature over the x whose f#(x) == m; the IIS update
                    # solves sum_m coef[m] * beta**m = E_experience[func_i].
                    coef = np.zeros(n_coef)
                    # Only iterate the x that actually occur (see p_x above).
                    for x_i, x_key in enumerate(p_x):
                        cur_p_x = p_x[x_key]
                        x = np.array(x_key)
                        trans = self.get_trans(x)
                        # Forward recursion (unnormalized alpha values).
                        cur_prob = np.ones(self.n_y)
                        forward_prob = np.zeros([self.sequence_length + 1, self.n_y])
                        forward_prob[0] = cur_prob
                        for i in range(self.sequence_length):
                            cur_prob = cur_prob @ trans[i]
                            forward_prob[i + 1] = cur_prob
                        # Backward recursion (unnormalized beta values).
                        cur_prob = np.ones(self.n_y)
                        backward_prob = np.zeros([self.sequence_length + 1, self.n_y])
                        backward_prob[-1] = cur_prob
                        for i in range(self.sequence_length - 1, -1, -1):
                            cur_prob = trans[i] @ cur_prob
                            backward_prob[i] = cur_prob
                        # Sanity checks (only in early iterations, before the
                        # weights grow and float error accumulates): every way
                        # of computing the partition function Z(x) must agree.
                        if iteration < 10:
                            np.testing.assert_almost_equal(
                                forward_prob[-1].sum(),
                                backward_prob[0].sum()
                            )
                            for i in range(1, self.sequence_length + 1):
                                np.testing.assert_almost_equal(
                                    forward_prob[i] @ backward_prob[i],
                                    forward_prob[-1].sum()
                                )
                            for i in range(0, self.sequence_length):
                                np.testing.assert_almost_equal(
                                    (np.outer(forward_prob[i], backward_prob[i + 1]) * trans[i]).sum(),
                                    forward_prob[-1].sum()
                                )
                        # Model expectation of this feature function given x.
                        cur_E_feature = 0.
                        if funcs is self.used_feature_funcs:
                            # State feature: marginal P(y_i | x) via alpha*beta.
                            for i in range(1, self.sequence_length + 1):
                                cur_E_feature += (
                                    forward_prob[i] * backward_prob[i] * feature[func_i, x_i, i - 1]
                                ).sum()
                        elif funcs is self.used_trans_feature_funcs:
                            # Transition feature: pairwise marginal
                            # P(y_{i-1}, y_i | x) via alpha * M_i * beta.
                            for i in range(0, self.sequence_length):
                                cur_E_feature += (
                                    np.outer(forward_prob[i], backward_prob[i + 1]) * trans[i] * trans_feature[func_i, x_i, i]
                                ).sum()
                        else:
                            raise Exception("Unknown function set!")
                        cur_E_feature /= forward_prob[-1].sum()
                        coef[max_feature[x_i]] += cur_p_x * cur_E_feature
                    # Update w: solve the IIS polynomial for beta = exp(dw)
                    # with Newton's method, then add log(beta) to the weight.
                    dw_i = log(self.newton(
                        lambda x: sum(c * x ** i for i, c in enumerate(coef)) - E_experience[func_i],
                        lambda x: sum(i * c * x ** (i - 1) for i, c in enumerate(coef) if i > 0),
                        1
                    ))
                    w[func_i] += dw_i
                    # Gap between empirical and model expectations; zero at
                    # convergence.
                    loss += abs(E_experience[func_i] - coef.sum())
            loss /= len(self.feature_funcs) + len(self.trans_feature_funcs)
            if self.verbose:
                print(f'Iteration {iteration} ends, Loss: {loss}')

    def predict(self, X):
        """
        Predict the state sequence y with the Viterbi algorithm.
        X is a group of observation sequences in a 2-D array.
        Returns a 2-D array of the same length with the decoded states.
        """
        ans = np.zeros([len(X), self.sequence_length])
        for x_i, x in enumerate(X):
            # Precompute all feature function values for this sequence.
            feature = np.zeros([len(self.used_feature_funcs), self.sequence_length, self.n_y])
            trans_feature = np.zeros([len(self.used_trans_feature_funcs), self.sequence_length, self.n_y, self.n_y])
            for func_i, func in enumerate(self.used_trans_feature_funcs):
                for i in range(1, self.sequence_length):
                    for y_i_1 in range(self.n_y):
                        for y_i in range(self.n_y):
                            trans_feature[func_i, i, y_i_1, y_i] = func(y_i_1, y_i, x, i)
            for func_i, func in enumerate(self.used_feature_funcs):
                for i in range(self.sequence_length):
                    for y_i in range(self.n_y):
                        feature[func_i, i, y_i] = func(y_i, x, i)
            # Collapse the per-function axes into weighted log-scores.
            feature = (self.w_feature_funcs[:, None, None] * feature).sum(axis=0)
            trans_feature = (self.w_trans_feature_funcs[:, None, None, None] * trans_feature).sum(axis=0)
            # Viterbi algorithm in log space (scores are added, not multiplied).
            pre_state = np.zeros([self.sequence_length, self.n_y], dtype=int) - 1
            prob = np.zeros([self.sequence_length, self.n_y])
            cur_prob = np.ones(self.n_y)
            for i in range(self.sequence_length):
                trans_prob = cur_prob[:, None] + trans_feature[i]
                pre_state[i] = trans_prob.argmax(axis=0)
                cur_prob = trans_prob.max(axis=0) + feature[i]
                prob[i] = cur_prob
            # Trace back through the argmax pointers.
            cur_state = prob[-1].argmax()
            for i in range(self.sequence_length - 1, -1, -1):
                ans[x_i, i] = cur_state
                cur_state = pre_state[i, cur_state]
        return ans
if __name__ == '__main__':
    def demonstrate(X, Y, testX, n_y):
        """Fit a CRF on (X, Y), decode testX, and print words/tags as a table.

        X, testX: 2-D arrays of word strings; Y: 2-D array of integer tags;
        n_y: number of distinct tags.
        """
        console = Console(markup=False)
        # Sort the vocabulary so the word->id mapping is deterministic
        # across runs (set iteration order of strings depends on hash seed).
        vocab = sorted(set(X.flatten()))
        vocab_size = len(vocab)
        word2num = {word: num for num, word in enumerate(vocab)}
        f_word2num = np.vectorize(lambda word: word2num[word])
        numX, num_testX = map(f_word2num, (X, testX))
        sequence_length = numX.shape[-1]

        class FeatureFunc:
            """State feature: fires when the word at position i is x_i and the tag is y_i."""
            def __init__(self, x_i, y_i):
                self.x_i = x_i
                self.y_i = y_i

            def __call__(self, y_i, x, i):
                return int(y_i == self.y_i and x[i] == self.x_i)

        class TransFeatureFunc:
            """Transition feature: fires on the tag bigram (y_{i-1}, y_i)."""
            def __init__(self, y_i_1, y_i):
                self.y_i = y_i
                self.y_i_1 = y_i_1

            def __call__(self, y_i_1, y_i, x, i):
                return int(y_i_1 == self.y_i_1 and y_i == self.y_i)

        # One state feature per (word, tag) pair, one transition feature per
        # tag bigram.
        feature_funcs = [FeatureFunc(x_i, y_i)
                         for x_i in range(vocab_size)
                         for y_i in range(n_y)]
        trans_feature_funcs = [TransFeatureFunc(y_i_1, y_i)
                               for y_i_1 in range(n_y)
                               for y_i in range(n_y)]
        linear_chain_conditional_random_field = LinearChainConditionalRandomField(
            feature_funcs,
            trans_feature_funcs,
            sequence_length,
            vocab_size,
            n_y,
            verbose=True
        )
        linear_chain_conditional_random_field.fit(numX, Y)
        pred = linear_chain_conditional_random_field.predict(num_testX)
        # Show each test sentence and its predicted tags as two table rows.
        table = Table()
        for x, p in zip(testX, pred):
            table.add_row(*map(str, x))
            table.add_row(*map(str, p))
        console.print(table)

    X = np.array([s.split() for s in
                  ['i am good .',
                   'i am bad .',
                   'you are good .',
                   'you are bad .',
                   'it is good .',
                   'it is bad .',
                   ]
                  ])
    # BUG FIX: Y previously had only 5 rows for 6 training sentences, so
    # zip(X, Y) in fit() silently dropped the last example. One tag row per
    # sentence (pronoun=0, verb=1, adjective=2, period=3 by position).
    Y = np.array([
        [0, 1, 2, 3],
        [0, 1, 2, 3],
        [0, 1, 2, 3],
        [0, 1, 2, 3],
        [0, 1, 2, 3],
        [0, 1, 2, 3],
    ])
    testX = np.array([s.split() for s in
                      ['you is good .',
                       'i are bad .',
                       'it are good .']
                      ])
    testX = np.concatenate([X, testX])
    demonstrate(X, Y, testX, 4)