Understanding NLP: A Python Implementation and Demo of the HMM Model

Tawnie · Updated 2024-11-13 · 734 reads

Contents:

- 1. Introduction
- 2 The Probability Computation Problem
  - 2.1 The Forward Algorithm
  - 2.2 The Backward Algorithm
- 3 The Model Training Problem
  - 3.1 Supervised Learning: Maximum Likelihood Estimation
  - 3.2 The Baum-Welch Algorithm
- 4 The Sequence Prediction Problem
  - 4.1 The Viterbi Algorithm

1. Introduction

The previous article, 《一文读懂NLP之隐马尔科夫模型(HMM)详解加python实现》, covered the theory behind the HMM in detail. This article implements the model from scratch.
First, define the HMM model:

```python
import numpy as np


class HMM(object):
    def __init__(self, n, m, a=None, b=None, pi=None):
        # number of possible hidden states
        self.N = n
        # number of possible observations
        self.M = m
        # state transition probability matrix
        self.A = a
        # emission (observation) probability matrix
        self.B = b
        # initial state probability vector
        self.Pi = pi
        # observation sequence
        self.X = None
        # state sequence
        self.Y = None
        # sequence length
        self.T = 0
        # forward probabilities (alpha)
        self.alpha = None
        # backward probabilities (beta)
        self.beta = None
```

2 The Probability Computation Problem

Given the model parameters $\lambda=(\boldsymbol{\pi},A,B)$ and an observation sequence $\boldsymbol{x}$, compute the probability of the observation sequence under the model, $p(\boldsymbol{x}|\lambda)$.

First, define the box-and-ball model: the state set is $Q=\{1,2,3\}$ and the observation set is $V=\{\text{red},\text{white}\}$, with

$$A=\begin{bmatrix} 0.5 & 0.2 & 0.3\\ 0.3 & 0.5 & 0.2\\ 0.2 & 0.3 & 0.5 \end{bmatrix},\qquad B=\begin{bmatrix} 0.5 & 0.5\\ 0.4 & 0.6\\ 0.7 & 0.3 \end{bmatrix},\qquad \pi=(0.2,0.4,0.4)^T$$

Let $T=3$ and $\boldsymbol{x}=(\text{red},\text{white},\text{red})$.
In code:

```python
q = [1, 2, 3]
v = ["红", "白"]
n = len(q)
m = len(v)
x = ["红", "白", "红"]

# build mappings between observation symbols and ids
char_to_id = {}
id_to_char = {}
for i in v:
    char_to_id[i] = len(char_to_id)
    id_to_char[char_to_id[i]] = i
X = [char_to_id[i] for i in x]

a = np.array([[0.5, 0.2, 0.3],
              [0.3, 0.5, 0.2],
              [0.2, 0.3, 0.5]])
b = np.array([[0.5, 0.5],
              [0.4, 0.6],
              [0.7, 0.3]])
pi = np.array([0.2, 0.4, 0.4])
```

2.1 The Forward Algorithm

The derivation of the algorithm is covered in the previous article, 《一文读懂NLP之隐马尔科夫模型(HMM)详解加python实现》, so only the steps are summarized here.

Initialization:
According to $\boldsymbol{\pi}$, the state at time $t=1$ is $y_1=q_i$ with probability $\pi_i$; according to the emission matrix $\boldsymbol{B}$, state $y_1=q_i$ then generates $x_1$ with probability $b_{ix_1}$. Therefore:

$$\alpha_1(i)=\pi_i b_{ix_1}$$
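For the running example (first observation red), this gives:

$$\alpha_1(1)=0.2\times 0.5=0.10,\quad \alpha_1(2)=0.4\times 0.4=0.16,\quad \alpha_1(3)=0.4\times 0.7=0.28$$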
Recursion:
For $t=1,2,\dots,T-1$:

$$\alpha_{t+1}(i)=\Big[\sum_{j=1}^{N}\alpha_t(j)a_{ji}\Big]b_{ix_{t+1}},\quad i=1,2,\dots,N$$
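Continuing the example with $t=1$ (second observation white), one recursion step for state 1 is:

$$\alpha_2(1)=(0.10\times 0.5+0.16\times 0.3+0.28\times 0.2)\times 0.5=0.154\times 0.5=0.077$$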
Termination:

$$p(\boldsymbol{x}|\boldsymbol{\lambda})=\sum_{i=1}^{N}\alpha_T(i)$$

Implementation:

```python
def forward(self, x):
    """
    Forward algorithm: compute the probability of the observation
    sequence given the model parameters.
    :param x: observation sequence
    :return: p(x | lambda)
    """
    # sequence length
    self.T = len(x)
    self.X = np.array(x)
    # alpha is a T-by-N matrix
    self.alpha = np.zeros((self.T, self.N))
    # initialization
    for i in range(self.N):
        self.alpha[0][i] = self.Pi[i] * self.B[i][self.X[0]]
    # recursion
    for t in range(1, self.T):
        for i in range(self.N):
            probability_sum = 0
            for j in range(self.N):
                probability_sum += self.alpha[t - 1][j] * self.A[j][i]
            self.alpha[t][i] = probability_sum * self.B[i][self.X[t]]
    # termination
    return sum(self.alpha[self.T - 1])
```

Run:

```python
hmm = HMM(n, m, a, b, pi)
print("forward algorithm:", hmm.forward(X))
```

Result:

```
forward algorithm: 0.130218
```
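As a cross-check, the three steps above can also be written with numpy matrix operations. A minimal standalone sketch (the function `forward_vectorized` is mine, not part of the HMM class above):

```python
def forward_vectorized(pi, a, b, x):
    """Vectorized forward algorithm: returns p(x | lambda)."""
    # initialization: alpha_1(i) = pi_i * b_{i, x_1}
    alpha = pi * b[:, x[0]]
    for t in range(1, len(x)):
        # recursion: alpha_{t+1}(i) = (sum_j alpha_t(j) * a_{ji}) * b_{i, x_{t+1}}
        alpha = (alpha @ a) * b[:, x[t]]
    # termination: sum over the final alpha values
    return alpha.sum()

# should agree with hmm.forward(X) above: 0.130218
print("forward (vectorized):", forward_vectorized(pi, a, b, X))
```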

2.2 The Backward Algorithm

Given the HMM $\boldsymbol{\lambda}$, the backward probability $\beta_t(i)$ is defined as the probability of the partial observation sequence $x_{t+1},x_{t+2},x_{t+3},\dots,x_T$ from time $t+1$ to $T$, given that the state at time $t$ is $q_i$:

$$\beta_t(i)=p(x_{t+1},x_{t+2},x_{t+3},\dots,x_T\mid y_t=q_i,\boldsymbol{\lambda})$$

Initialization:
When $t=T$, there is nothing left to observe after time $T$, so:

$$\beta_T(i)=1,\quad i=1,2,\dots,N$$
Recursion:
For $t=T-1,T-2,\dots,1$:

$$\beta_t(i)=\sum_{j=1}^{N}a_{ij}b_{jx_{t+1}}\beta_{t+1}(j),\quad i=1,2,\dots,N$$
Termination:

$$p(\boldsymbol{x}|\boldsymbol{\lambda})=\sum_{i=1}^{N}\pi_i b_{ix_1}\beta_1(i)$$

Implementation:

```python
def backward(self, x):
    """
    Backward algorithm: compute p(x | lambda) using backward probabilities.
    """
    # sequence length
    self.T = len(x)
    self.X = np.array(x)
    # beta is a T-by-N matrix
    self.beta = np.zeros((self.T, self.N))
    # when t = T, set all values to 1
    for i in range(self.N):
        self.beta[self.T - 1][i] = 1
    # recurse from t = T-1 down to t = 1
    for t in range(self.T - 2, -1, -1):
        for i in range(self.N):
            for j in range(self.N):
                self.beta[t][i] += self.A[i][j] * self.B[j][self.X[t + 1]] * self.beta[t + 1][j]
    # termination
    sum_probability = 0
    for i in range(self.N):
        sum_probability += self.Pi[i] * self.B[i][self.X[0]] * self.beta[0][i]
    return sum_probability
```

Run:

```python
hmm = HMM(n, m, a, b, pi)
# print("forward algorithm:", hmm.forward(X))
print("backward algorithm:", hmm.backward(X))
```

Result:

```
backward algorithm: 0.130218
```
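A useful sanity check (a standard identity, not shown in the original post): for any time step $t$, the forward and backward variables satisfy $p(\boldsymbol{x}|\boldsymbol{\lambda})=\sum_{i=1}^{N}\alpha_t(i)\beta_t(i)$. A quick sketch, assuming the class and example data from above:

```python
hmm = HMM(n, m, a, b, pi)
hmm.forward(X)   # fills hmm.alpha
hmm.backward(X)  # fills hmm.beta
for t in range(hmm.T):
    # every time step should yield (approximately) the same 0.130218
    print(t, np.sum(hmm.alpha[t] * hmm.beta[t]))
```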

3 The Model Training Problem

Given training samples $(\boldsymbol{x}^{(i)},\boldsymbol{y}^{(i)})$, estimate the model parameters $\lambda=(\boldsymbol{\pi},A,B)$ so that the probability of the observation sequences, $p(\boldsymbol{x}|\lambda)$, is maximized. When the state sequences are labeled, this is supervised learning by counting (maximum likelihood estimation); when only observations are available, the Baum-Welch algorithm is used.

3.1 Supervised Learning: Maximum Likelihood Estimation

Estimating the initial state probability vector
Count, over the $S$ samples, how often each state $q_i$ appears as the initial state:

$$\hat{\pi}_i=\frac{N_{q_i}}{S}$$

where $N_{q_i}$ is the number of samples whose initial state is $q_i$ and $S$ is the total number of samples.

Estimating the state transition matrix
Let $A_{ij}$ be the number of times the samples are in state $q_i$ at time $t$ and in state $q_j$ at time $t+1$. Then the estimate is:

$$\hat{a}_{ij}=\frac{A_{ij}}{\sum\limits_{j=1}^{N}A_{ij}},\quad i=1,2,\dots,N;\ j=1,2,\dots,N$$

Estimating the emission matrix
Let $B_{ij}$ be the number of times the samples are in state $i$ with observation $j$. Then the estimate of $b_{ij}$, the probability of observing $j$ while in state $i$, is:

$$\hat{b}_{ij}=\frac{B_{ij}}{\sum\limits_{j=1}^{M}B_{ij}},\quad i=1,2,\dots,N;\ j=1,2,\dots,M$$

```python
def train(self, train_data):
    """
    Train the model with maximum likelihood estimation.
    :param train_data: each sample is [observations..., hidden states...]
    """
    self.T = int(len(train_data[0]) / 2)
    sample_num = len(train_data)
    # zero-initialize the parameters
    self.init()
    # estimate the initial state probability vector
    for sequence in train_data:
        self.Pi[sequence[0 + self.T]] += 1
    self.Pi = self.Pi / sample_num
    # estimate the state transition matrix
    a_num = np.zeros((self.N, self.N))
    for sequence in train_data:
        for i in range(self.T - 1):
            a_num[sequence[i + self.T]][sequence[i + 1 + self.T]] += 1.0
    temp = a_num.sum(axis=1).reshape((self.N, 1))
    self.A = a_num / temp
    # estimate the emission matrix (count all T positions)
    b_num = np.zeros((self.N, self.M))
    for sequence in train_data:
        for i in range(self.T):
            b_num[sequence[i + self.T]][sequence[i]] += 1.0
    temp = b_num.sum(axis=1).reshape((self.N, 1))
    self.B = b_num / temp
```
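Note that `train` calls `self.init()`, which isn't shown in the original post; presumably it just allocates zeroed parameter arrays before counting. A minimal sketch under that assumption:

```python
def init(self):
    # assumed helper (not in the original post): allocate zeroed
    # parameters so the counting loops in train() can accumulate into them
    self.Pi = np.zeros(self.N)
    self.A = np.zeros((self.N, self.N))
    self.B = np.zeros((self.N, self.M))
```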

Before training starts, we need to generate training data:

```python
import random


def generate_train_data(n, m, t, a, b, pi, nums=10000):
    """
    Generate training data.
    :param pi: initial state probability vector
    :param b: emission probability matrix
    :param a: state transition matrix
    :param n: number of hidden states
    :param m: number of observation values
    :param t: sequence length
    :param nums: number of samples
    :return: training data set
    """
    train_data = []
    for i in range(nums):
        state_sequence = []
        observation_sequence = []
        # sample the initial state from pi
        temp = 0
        p = random.random()
        for j in range(n):
            temp += pi[j]
            if p > temp:
                continue
            else:
                state_sequence.append(j)
                break
        # recursion
        for t_index in range(t):
            # sample the next state
            if t_index != 0:
                temp = 0
                p = random.random()
                for state in range(n):
                    temp += a[state_sequence[-1]][state]
                    if p > temp:
                        continue
                    else:
                        state_sequence.append(state)
                        break
            # sample the observation from the current state
            temp = 0
            p = random.random()
            for observation in range(m):
                temp += b[state_sequence[-1]][observation]
                if p > temp:
                    continue
                else:
                    observation_sequence.append(observation)
                    break
        observation_sequence.extend(state_sequence)
        train_data.append(observation_sequence)
    return train_data
```
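The cumulative-sum sampling above can be expressed more compactly with numpy's built-in categorical sampler. A sketch of an equivalent generator (`generate_train_data_np` is a hypothetical alternative, not from the post):

```python
def generate_train_data_np(n, m, t, a, b, pi, nums=10000):
    """Same sample format as generate_train_data: [observations..., states...]."""
    rng = np.random.default_rng()
    train_data = []
    for _ in range(nums):
        states, observations = [], []
        state = rng.choice(n, p=pi)
        for _ in range(t):
            states.append(int(state))
            # emit an observation from the current state, then transition
            observations.append(int(rng.choice(m, p=b[state])))
            state = rng.choice(n, p=a[state])
        train_data.append(observations + states)
    return train_data
```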

Start training:

```python
train_data = generate_train_data(n, m, 8, a, b, pi)
print(train_data)
hmm = HMM(n, m)
hmm.train(train_data)
print(hmm.Pi)
print(hmm.A)
print(hmm.B)
```

Result:

```
[0.1984 0.4011 0.4005]
[[0.49525581 0.20134884 0.30339535]
 [0.29948182 0.50112204 0.19939614]
 [0.20220083 0.30086282 0.49693635]]
[[0.50483721 0.49516279]
 [0.39422253 0.60577747]
 [0.70305531 0.29694469]]
```

As you can see, the estimated parameters are almost identical to the ones used to generate the data.

3.2 The Baum-Welch Algorithm

When only observation sequences are available (no state labels), the parameters are estimated with the Baum-Welch (EM) algorithm. Its re-estimation formulas are:

$$a_{ij}=\frac{\sum\limits_{t=1}^{T-1}\xi_t(i,j)}{\sum\limits_{t=1}^{T-1}\gamma_t(i)}$$

$$b_{ij}=\frac{\sum\limits_{t=1,\,x_t=v_j}^{T}\gamma_t(i)}{\sum\limits_{t=1}^{T}\gamma_t(i)}$$

$$\pi_i=\gamma_1(i)$$
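These updates rely on two intermediate quantities, stated here for completeness (standard definitions in terms of the forward and backward variables): $\gamma_t(i)$ is the probability of being in state $q_i$ at time $t$, and $\xi_t(i,j)$ is the probability of being in state $q_i$ at time $t$ and in state $q_j$ at time $t+1$, both given $\boldsymbol{x}$ and $\boldsymbol{\lambda}$:

$$\gamma_t(i)=\frac{\alpha_t(i)\beta_t(i)}{\sum\limits_{j=1}^{N}\alpha_t(j)\beta_t(j)},\qquad \xi_t(i,j)=\frac{\alpha_t(i)\,a_{ij}\,b_{jx_{t+1}}\,\beta_{t+1}(j)}{\sum\limits_{i=1}^{N}\sum\limits_{j=1}^{N}\alpha_t(i)\,a_{ij}\,b_{jx_{t+1}}\,\beta_{t+1}(j)}$$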

```python
def baum_welch(self, x, criterion=0.001):
    self.T = len(x)
    self.X = x
    while True:
        # run forward and backward to fill the alpha and beta matrices
        _ = self.forward(self.X)
        _ = self.backward(self.X)
        xi = np.zeros((self.T - 1, self.N, self.N), dtype=float)
        for t in range(self.T - 1):
            # naive element-wise approach (kept for reference):
            # for i in range(self.N):
            #     gamma[t][i] = self.calculate_gamma(t, i)
            #     for j in range(self.N):
            #         xi[t][i][j] = self.calculate_psi(t, i, j)
            # for i in range(self.N):
            #     gamma[self.T-1][i] = self.calculate_gamma(self.T-1, i)
            # vectorized numpy approach:
            denominator = np.sum(np.dot(self.alpha[t, :], self.A) * self.B[:, self.X[t + 1]] * self.beta[t + 1, :])
            for i in range(self.N):
                numerator = self.alpha[t, i] * self.A[i, :] * self.B[:, self.X[t + 1]] * self.beta[t + 1, :]
                xi[t, i, :] = numerator / denominator
        gamma = np.sum(xi, axis=2)
        prod = (self.alpha[self.T - 1, :] * self.beta[self.T - 1, :])
        gamma = np.vstack((gamma, prod / np.sum(prod)))
        new_pi = gamma[0, :]
        new_a = np.sum(xi, axis=0) / np.sum(gamma[:-1, :], axis=0).reshape(-1, 1)
        new_b = np.zeros(self.B.shape, dtype=float)
        for k in range(self.B.shape[1]):
            mask = self.X == k
            new_b[:, k] = np.sum(gamma[mask, :], axis=0) / np.sum(gamma, axis=0)
        # stop once all parameters have converged
        if np.max(abs(self.Pi - new_pi)) < criterion and \
                np.max(abs(self.A - new_a)) < criterion and \
                np.max(abs(self.B - new_b)) < criterion:
            break
        self.A, self.B, self.Pi = new_a, new_b, new_pi
```
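The post doesn't show `baum_welch` being run. A plausible usage sketch: since EM only finds a local optimum, it needs initial guesses, so the starting values `a0`, `b0`, `pi0` below are my assumptions, not from the original. On a single length-3 sequence the estimates will simply overfit to it; this only demonstrates the mechanics:

```python
# rough row-stochastic initial guesses (assumed)
a0 = np.array([[0.4, 0.3, 0.3],
               [0.3, 0.4, 0.3],
               [0.3, 0.3, 0.4]])
b0 = np.array([[0.6, 0.4],
               [0.5, 0.5],
               [0.4, 0.6]])
pi0 = np.array([0.3, 0.3, 0.4])

hmm = HMM(n, m, a0, b0, pi0)
hmm.baum_welch(np.array(X))
print(hmm.Pi)
print(hmm.A)
print(hmm.B)
```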

4 The Sequence Prediction Problem

The sequence prediction problem: given the model parameters $\lambda=(\boldsymbol{\pi},A,B)$ and an observation sequence $\boldsymbol{x}$, find the most likely state sequence $\boldsymbol{y}$, i.e. the one that maximizes $p(\boldsymbol{y}|\boldsymbol{x})$.

4.1 The Viterbi Algorithm

Initialization: at $t=1$, there are $N$ candidate optimal paths, one per state, each with an empty predecessor:

$$\delta_1(i)=\pi_i b_{ix_1},\quad i=1,2,\dots,N$$

$$\psi_1(i)=0,\quad i=1,2,\dots,N$$

Recursion: for $t\geq 2$, each candidate path swallows one more state, like the snake in a game of Snake, growing by one unit; the cost of the extension is computed from the transition and emission probabilities. Find the new locally optimal paths and update the two arrays:

$$\delta_t(i)=\max_{1\leq j \leq N}[\delta_{t-1}(j)a_{ji}]\,b_{ix_t},\quad i=1,2,\dots,N$$

$$\psi_t(i)=\arg\max_{1\leq j \leq N}[\delta_{t-1}(j)a_{ji}],\quad i=1,2,\dots,N$$
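For the running example, $\delta_1=(0.10,\,0.16,\,0.28)$ (the same values as $\alpha_1$). One recursion step for state 3 with the second observation white gives:

$$\delta_2(3)=\max(0.10\times 0.3,\ 0.16\times 0.2,\ 0.28\times 0.5)\times 0.3=0.14\times 0.3=0.042,\qquad \psi_2(3)=3$$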

Termination: find the maximum probability $P^*$ in the final row of the $\delta_T(i)$ array, together with the index $y_T^*$ of the corresponding final state:

$$P^*=\max_{1\leq i \leq N}\delta_T(i)$$

$$y_T^*=\arg\max_{1\leq i \leq N}\delta_T(i)$$

Backtracking: follow the predecessor array $\psi_t$ backwards to recover the state indices of the optimal path $\boldsymbol{y}^*=(y_1^*,y_2^*,\dots,y_T^*)$:

$$y_t^*=\psi_{t+1}(y_{t+1}^*),\quad t=T-1,T-2,\dots,1$$

Implementation:

```python
def viterbi(self, x):
    self.X = x
    self.T = len(x)
    self.Y = np.zeros(self.T)
    # initialize delta and psi
    delta = np.zeros((self.T, self.N))
    psi = np.zeros((self.T, self.N))
    # initialization, t = 1
    for i in range(self.N):
        delta[0][i] = self.Pi[i] * self.B[i][self.X[0]]
        psi[0][i] = 0
    # recursion
    for t in range(1, self.T):
        for i in range(self.N):
            temp = 0
            index = 0
            for j in range(self.N):
                if temp < delta[t - 1][j] * self.A[j][i]:
                    temp = delta[t - 1][j] * self.A[j][i]
                    index = j
            delta[t][i] = temp * self.B[i][self.X[t]]
            # record the best predecessor state
            psi[t][i] = index
    # termination
    self.Y[-1] = delta.argmax(axis=1)[-1]
    p = delta[self.T - 1][int(self.Y[-1])]
    # backtracking
    for i in range(self.T - 1, 0, -1):
        self.Y[i - 1] = psi[i][int(self.Y[i])]
    return p, self.Y
```

Run:

```python
# run the Viterbi algorithm
hmm = HMM(n, m, a, b, pi)
p, y = hmm.viterbi(X)
print(p)
print(y)
```

Result:

```
0.014699999999999998
[2. 2. 2.]
```

The state indices $(2, 2, 2)$ map back to the state sequence $(3, 3, 3)$ in $Q$, with path probability $P^*\approx 0.0147$.

For the full code, see my Github; stars are much appreciated.


Author: Elenstone



nlp hmm Python
