
Linear Chain Conditional Random Field — supervised learning method, probabilistic model, discriminative model, log-linear model, parametric model

Definition

Conditional Random Field

Let $X$ and $Y$ be random variables, and let $P(Y|X)$ be the conditional probability distribution of $Y$ given $X$. If the random variable $Y$ forms a Markov random field represented by an undirected graph $G=(V,E)$, that is,

$$P(Y_v \mid X, Y_\omega, \omega \ne v) = P(Y_v \mid X, Y_\omega, \omega \sim v)$$

holds for every node $v$, then the conditional probability distribution $P(Y|X)$ is called a conditional random field. Here $\omega \sim v$ denotes all nodes $\omega$ connected to node $v$ by an edge in the graph $G=(V,E)$, $\omega \ne v$ denotes all nodes other than $v$, and $Y_v$, $Y_u$ and $Y_\omega$ are the random variables associated with nodes $v$, $u$ and $\omega$.
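For the linear chain case used in the rest of this post, the graph is simply a chain over $Y_1, Y_2, \cdots, Y_n$, and the Markov property above reduces to the following standard restatement (added here for concreteness; at $i=1$ and $i=n$ only the single existing neighbour is conditioned on):

$$P(Y_i \mid X, Y_1, \cdots, Y_{i-1}, Y_{i+1}, \cdots, Y_n) = P(Y_i \mid X, Y_{i-1}, Y_{i+1}), \quad i = 1, 2, \cdots, n$$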

Parametric Form of the Linear Chain Conditional Random Field

Let $P(Y|X)$ be a linear chain conditional random field. Then, given that the random variable $X$ takes the value $x$, the conditional probability that the random variable $Y$ takes the value $y$ has the form

$$P(y|x) = \frac{1}{Z(x)} \exp\bigg( \sum_{i,k} \lambda_k t_k(y_{i-1}, y_i, x, i) + \sum_{i,l} \mu_l s_l(y_i, x, i) \bigg)$$

where

$$Z(x) = \sum_y \exp\bigg( \sum_{i,k} \lambda_k t_k(y_{i-1}, y_i, x, i) + \sum_{i,l} \mu_l s_l(y_i, x, i) \bigg)$$

Here $t_k$ and $s_l$ are feature functions, $\lambda_k$ and $\mu_l$ are the corresponding weights, and $Z(x)$ is the normalization factor, whose sum runs over all possible output sequences.
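To make the formula concrete, below is a minimal sketch (not part of the original post) that evaluates $P(y|x)$ by brute force for a toy chain with two labels. The observation sequence, feature functions and weights are all made-up assumptions used purely for illustration; $Z(x)$ is obtained by enumerating every possible output sequence.

import itertools
from math import exp

x = [0, 1, 1]                      # toy observation sequence
n_y, seq_len = 2, len(x)           # two possible labels, chain of length 3

def s(y_i, x, i):                  # hypothetical state feature: label matches observation
    return float(y_i == x[i])

def t(y_prev, y_i, x, i):          # hypothetical transition feature: label repeats
    return float(y_prev == y_i)

lam, mu = 0.8, 1.2                 # made-up weights for t and s

def score(y):
    """Unnormalized log score: weighted sum of state and transition features."""
    total = sum(mu * s(y[i], x, i) for i in range(seq_len))
    total += sum(lam * t(y[i - 1], y[i], x, i) for i in range(1, seq_len))
    return total

# Z(x): sum of exp(score) over all n_y ** seq_len possible output sequences.
Z = sum(exp(score(y)) for y in itertools.product(range(n_y), repeat=seq_len))

y = (0, 1, 1)
print('P(y|x) =', exp(score(y)) / Z)   # a proper probability between 0 and 1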

Input Space

$$X = (X_1, X_2, X_3, \cdots, X_n)$$
$$Y = (Y_1, Y_2, Y_3, \cdots, Y_n), \quad Y \in R^n$$
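In the tagging demo at the end of this post, each training example is exactly such a pair of sequences: the observation sequence $x$ is a sentence split into words and the state sequence $y$ assigns one label per word. A minimal sketch of that encoding (the tag meanings are an interpretation of the demo data, not stated in the original):

x = ['i', 'am', 'good', '.']   # observation sequence X_1, ..., X_4
y = [0, 1, 2, 3]               # state sequence Y_1, ..., Y_4; roughly: 0 pronoun, 1 verb, 2 adjective, 3 punctuation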

Algorithm

The implementation below trains a linear chain CRF with improved iterative scaling, using the forward-backward algorithm to compute feature expectations, and decodes with the Viterbi algorithm; it is demonstrated on a toy word-tagging task.

from math import log

import numpy as np
from rich.console import Console
from rich.table import Table

class LinearChainConditionalRandomField:
    def __init__(self, feature_funcs, trans_feature_funcs, sequence_length, n_x, n_y, max_iteration=100, verbose=False):
        """
        `feature_funcs是列表中的一组函数s(y_i,X,i)
        `trans_feature_funcs是列表中的一组函数t(y_{i-1},y_i,X,i)
        `sequence_length是每个输入序列的长度
        `n_x`是序列x中每个项目的可能值的数量
        `n`是序列y中每个项目的可能值的数量
        """
        self.feature_funcs = feature_funcs
        self.trans_feature_funcs = trans_feature_funcs
        self.n_x = n_x
        self.n_y = n_y
        self.sequence_length = sequence_length
        self.max_iteration = max_iteration
        self.verbose = verbose
        
    def newton(self, f, g, x0, epsilon=1e-6):
        """
        Find the zero point where f(x) = 0 using Newton's method.
        g(x) is the derivative of f.
        """
        prex = x0
        x = x0 - f(x0) / g(x0)
        while abs(x - prex) > epsilon:
            prex, x = x, x - f(x) / g(x)
        return x

    def get_trans(self, x):
        """得到给定观测序列x的转移矩阵"""
        trans_feature = np.zeros([self.sequence_length, self.n_y, self.n_y])
        for i in range(self.sequence_length):
            for y_i_1 in range(self.n_y):
                for y_i in range(self.n_y):
                    for j, func in enumerate(self.used_feature_funcs):
                        trans_feature[i, y_i_1, y_i] += self.w_feature_funcs[j] * func(y_i, x, i)
            if i > 0:
                for y_i_1 in range(self.n_y):
                    for y_i in range(self.n_y):
                        for j, func in enumerate(self.used_trans_feature_funcs):
                            trans_feature[i, y_i_1, y_i] += self.w_trans_feature_funcs[j] * func(y_i_1, y_i, x, i)
        return np.exp(trans_feature)

    def fit(self, X, Y):
        """
        X是观测序列的二维矩阵
        Y是隐藏状态序列的二维矩阵
        通过改进迭代缩放优化权重
        """
        E_feature = np.zeros(len(self.feature_funcs))
        E_trans_feature = np.zeros(len(self.trans_feature_funcs))

        # Because each x is an entire sequence, its value space is too large to
        # enumerate. We therefore store every observation sequence that occurs in
        # the training data and iterate only over those existing x.
        p_x = {tuple(x): 0. for x in X}

        for x, y in zip(X, Y):
            x_key = tuple(x)
            p_x[x_key] += 1 / len(X)
            for i, yi in enumerate(y):
                for j, func in enumerate(self.feature_funcs):
                    E_feature[j] += func(yi, x, i) / len(X)
            for i in range(1, self.sequence_length):
                yi_1, yi = y[i - 1], y[i]
                for j, func in enumerate(self.trans_feature_funcs):
                    E_trans_feature[j] += func(yi_1, yi, x, i) / len(X)

        # Features that never fire on the training data are useless; filter them out.
        self.used_feature_funcs = [func for E, func in zip(E_feature, self.feature_funcs) if E != 0]
        self.used_trans_feature_funcs = [func for E, func in zip(E_trans_feature, self.trans_feature_funcs) if E != 0]
        E_feature = E_feature[E_feature.nonzero()]
        E_trans_feature = E_trans_feature[E_trans_feature.nonzero()]
        self.w_feature_funcs = np.zeros(len(self.used_feature_funcs))
        self.w_trans_feature_funcs = np.zeros(len(self.used_trans_feature_funcs))

        # Precompute all possible values of the feature functions.
        feature = np.zeros([len(self.used_feature_funcs), len(p_x), self.sequence_length, self.n_y])
        trans_feature = np.zeros([len(self.used_trans_feature_funcs), len(p_x), self.sequence_length, self.n_y, self.n_y])
        for x_i, x_key in enumerate(p_x):
            x = np.array(x_key)
            for func_i, func in enumerate(self.used_trans_feature_funcs):
                for i in range(1, self.sequence_length):
                    for y_i_1 in range(self.n_y):
                        for y_i in range(self.n_y):
                            trans_feature[func_i, x_i, i, y_i_1, y_i] = func(y_i_1, y_i, x, i)
            for func_i, func in enumerate(self.used_feature_funcs):
                for i in range(self.sequence_length):
                    for y_i in range(self.n_y):
                        feature[func_i, x_i, i, y_i] = func(y_i, x, i)

        # For each x, precompute the maximum total feature count over all label sequences.
        max_feature = np.zeros(len(p_x), dtype=int)
        sum_trans_feature = trans_feature.sum(axis=0)
        sum_feature = feature.sum(axis=0)
        for x_i, x_key in enumerate(p_x):
            cur_max_feature = np.zeros(self.n_y)
            for i in range(self.sequence_length):
                cur_max_feature = (cur_max_feature[:, None] + sum_trans_feature[x_i, i]).max(axis=0) + sum_feature[x_i, i]
            max_feature[x_i] = cur_max_feature.max()
        n_coef = max(max_feature) + 1

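        # Improved iterative scaling, in outline: for each feature function f with
        # empirical expectation E~[f], solve for an update delta satisfying
        #     sum_x p~(x) * E_model[f | x] * exp(delta * T(x)) = E~[f],
        # where T(x) is the maximum total feature count for x computed above.
        # Substituting beta = exp(delta) turns this into a polynomial in beta whose
        # coefficients are accumulated in `coef`; `newton` finds its root, and
        # delta = log(beta) is added to the corresponding weight.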
        # Start training
        for iteration in range(self.max_iteration):
            if self.verbose:
                print(f'Iteration {iteration} starts...')
            loss = 0.
            for funcs, w, E_experience in zip(
                    [self.used_feature_funcs, self.used_trans_feature_funcs],
                    [self.w_feature_funcs, self.w_trans_feature_funcs],
                    [E_feature, E_trans_feature]):
                for func_i in range(len(funcs)):
                    # Accumulate the polynomial coefficients of the IIS update
                    # equation for this feature function.
                    coef = np.zeros(n_coef)
                    # Iterate only over observation sequences that occur in the data.
                    for x_i, x_key in enumerate(p_x):
                        cur_p_x = p_x[x_key]
                        x = np.array(x_key)

                        trans = self.get_trans(x)
                        # Forward algorithm
                        cur_prob = np.ones(self.n_y)
                        forward_prob = np.zeros([self.sequence_length + 1, self.n_y])
                        forward_prob[0] = cur_prob
                        for i in range(self.sequence_length):
                            cur_prob = cur_prob @ trans[i]
                            forward_prob[i + 1] = cur_prob
                        # Backward algorithm
                        cur_prob = np.ones(self.n_y)
                        backward_prob = np.zeros([self.sequence_length + 1, self.n_y])
                        backward_prob[-1] = cur_prob
                        for i in range(self.sequence_length - 1, -1, -1):
                            cur_prob = trans[i] @ cur_prob
                            backward_prob[i] = cur_prob

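                        # Sanity checks (early iterations only, to keep training cheap):
                        # the forward-pass total, the backward-pass total, and the
                        # forward/backward products at every position all equal Z(x).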
                        if iteration < 10:
                            np.testing.assert_almost_equal(
                                forward_prob[-1].sum(),
                                backward_prob[0].sum()
                            )
                            for i in range(1, self.sequence_length + 1):
                                np.testing.assert_almost_equal(
                                    forward_prob[i] @ backward_prob[i],
                                    forward_prob[-1].sum()
                                )
                            for i in range(0, self.sequence_length):
                                np.testing.assert_almost_equal(
                                    (np.outer(forward_prob[i], backward_prob[i + 1]) * trans[i]).sum(),
                                    forward_prob[-1].sum()
                                )

                        # Expectation of the current feature function under the model, given x
                        cur_E_feature = 0.
                        if funcs is self.used_feature_funcs:
                            for i in range(1, self.sequence_length + 1):
                                cur_E_feature += (
                                    forward_prob[i] * backward_prob[i] * feature[func_i, x_i, i - 1]
                                ).sum()
                        elif funcs is self.used_trans_feature_funcs:
                            for i in range(0, self.sequence_length):
                                cur_E_feature += (
                                    np.outer(forward_prob[i], backward_prob[i + 1]) * trans[i] * trans_feature[func_i, x_i, i]
                                ).sum()
                        else:
                            raise Exception("Unknown function set!")
                        cur_E_feature /= forward_prob[-1].sum()

                        coef[max_feature[x_i]] += cur_p_x * cur_E_feature

                    # Update w
                    dw_i = log(self.newton(
                        lambda x: sum(c * x ** i for i, c in enumerate(coef)) - E_experience[func_i],
                        lambda x: sum(i * c * x ** (i  - 1) for i, c in enumerate(coef) if i > 0),
                        1
                    ))
                    w[func_i] += dw_i
                    loss += abs(E_experience[func_i] - coef.sum())
            loss /= len(self.feature_funcs) + len(self.trans_feature_funcs)
            if self.verbose:
                print(f'Iteration {iteration} ends, Loss: {loss}')

    def predict(self, X):
        """
        Predict the state sequence y with the Viterbi algorithm.
        X is a 2-D array containing a batch of observation sequences x.
        """
        ans = np.zeros([len(X), self.sequence_length])
        for x_i, x in enumerate(X):
            # Precompute all possible values of the feature functions for this x
            feature = np.zeros([len(self.used_feature_funcs), self.sequence_length, self.n_y])
            trans_feature = np.zeros([len(self.used_trans_feature_funcs), self.sequence_length, self.n_y, self.n_y])
            for func_i, func in enumerate(self.used_trans_feature_funcs):
                for i in range(1, self.sequence_length):
                    for y_i_1 in range(self.n_y):
                        for y_i in range(self.n_y):
                            trans_feature[func_i, i, y_i_1, y_i] = func(y_i_1, y_i, x, i)
            for func_i, func in enumerate(self.used_feature_funcs):
                for i in range(self.sequence_length):
                    for y_i in range(self.n_y):
                        feature[func_i, i, y_i] = func(y_i, x, i)
            feature = (self.w_feature_funcs[:, None, None] * feature).sum(axis=0)
            trans_feature = (self.w_trans_feature_funcs[:, None, None, None] * trans_feature).sum(axis=0)

            # Viterbi algorithm (log-space: weighted feature scores are added, never exponentiated)
            pre_state = np.zeros([self.sequence_length, self.n_y], dtype=int) - 1
            prob = np.zeros([self.sequence_length, self.n_y])
            cur_prob = np.ones(self.n_y)
            for i in range(self.sequence_length):
                trans_prob = cur_prob[:, None] + trans_feature[i]
                pre_state[i] = trans_prob.argmax(axis=0)
                cur_prob = trans_prob.max(axis=0) + feature[i]
                prob[i] = cur_prob

            # Backtrack along the stored best-predecessor states
            cur_state = prob[-1].argmax()
            for i in range(self.sequence_length - 1, -1, -1):
                ans[x_i, i] = cur_state
                cur_state = pre_state[i, cur_state]
        return ans


if __name__ == '__main__':
    def demonstrate(X, Y, testX, n_y):
        console = Console(markup=False)

        vocab = set(X.flatten())
        vocab_size = len(vocab)
        word2num = {word: num for num, word in enumerate(vocab)}

        f_word2num = np.vectorize(lambda word: word2num[word])

        numX, num_testX = map(f_word2num, (X, testX))

        sequence_length = numX.shape[-1]

        class FeatureFunc:
            def __init__(self, x_i, y_i):
                self.x_i = x_i
                self.y_i = y_i

            def __call__(self, y_i, x, i):
                return int(y_i == self.y_i and x[i] == self.x_i)

        class TransFeatureFunc:
            def __init__(self, y_i_1, y_i):
                self.y_i = y_i
                self.y_i_1 = y_i_1

            def __call__(self, y_i_1, y_i, x, i):
                return int(y_i_1 == self.y_i_1 and y_i == self.y_i)

        feature_funcs = [FeatureFunc(x_i, y_i)
                         for x_i in range(vocab_size)
                         for y_i in range(n_y)]
        trans_feature_funcs = [TransFeatureFunc(y_i_1, y_i)
                               for y_i_1 in range(n_y)
                               for y_i in range(n_y)]

        linear_chain_conditional_random_field = LinearChainConditionalRandomField(
            feature_funcs,
            trans_feature_funcs,
            sequence_length,
            vocab_size,
            n_y,
            verbose=True
        )
        linear_chain_conditional_random_field.fit(numX, Y)
        pred = linear_chain_conditional_random_field.predict(num_testX)

        # show in table
        table = Table()
        for x, p in zip(testX, pred):
            table.add_row(*map(str, x))
            table.add_row(*map(str, p))
        console.print(table)

    X = np.array([s.split() for s in
                  ['i am good .',
                   'i am bad .',
                   'you are good .',
                   'you are bad .',
                   'it is good .',
                   'it is bad .',
                   ]
                  ])
    Y = np.array([
        [0, 1, 2, 3],
        [0, 1, 2, 3],
        [0, 1, 2, 3],
        [0, 1, 2, 3],
        [0, 1, 2, 3],
        [0, 1, 2, 3],  # one label sequence per training sentence (six sentences in X)
    ])
    testX = np.array([s.split() for s in
                  ['you is good .',
                   'i are bad .',
                   'it are good .']
                  ])
    testX = np.concatenate([X, testX])
    demonstrate(X, Y, testX, 4)
