The previous post, 《一文读懂NLP之隐马尔科夫模型(HMM)详解加python实现》, covered the theory behind the HMM in detail; this post implements the model from scratch.
Define the HMM model:
import numpy as np
import random


class HMM(object):
    def __init__(self, n, m, a=None, b=None, pi=None):
        # number of possible hidden states
        self.N = n
        # number of possible observations
        self.M = m
        # state transition probability matrix
        self.A = a
        # observation (emission) probability matrix
        self.B = b
        # initial state probability vector
        self.Pi = pi
        # observation sequence
        self.X = None
        # state sequence
        self.Y = None
        # sequence length
        self.T = 0
        # alpha matrix used by the forward algorithm
        self.alpha = None
        # beta matrix used by the backward algorithm
        self.beta = None
2 Probability Computation Problem
Given the model parameters $\lambda=(\boldsymbol{\pi},A,B)$ and an observation sequence $\boldsymbol{x}$, compute the probability of the observation sequence under this model, i.e. $p(\boldsymbol{x}|\lambda)$.
First define the box-and-ball model: the state set is $Q=\{1,2,3\}$ and the observation set is $V=\{红, 白\}$ (red, white), with

$$A=\begin{bmatrix} 0.5 & 0.2 & 0.3\\ 0.3 & 0.5 & 0.2\\ 0.2 & 0.3 & 0.5 \end{bmatrix},\qquad B=\begin{bmatrix} 0.5 & 0.5 \\ 0.4 & 0.6\\ 0.7 & 0.3 \end{bmatrix},\qquad \pi=(0.2,0.4,0.4)^T$$

Let $T=3$ and $\boldsymbol{x}=(红, 白, 红)$, i.e. (red, white, red).
In code:
q = [1, 2, 3]
v = ["红", "白"]
n = len(q)
m = len(v)
x = ["红", "白", "红"]
# build lookup dictionaries between observation symbols and ids
char_to_id = {}
id_to_char = {}
for i in v:
    char_to_id[i] = len(char_to_id)
    id_to_char[char_to_id[i]] = i
X = [char_to_id[i] for i in x]
a = np.array([[0.5, 0.2, 0.3],
              [0.3, 0.5, 0.2],
              [0.2, 0.3, 0.5]])
b = np.array([[0.5, 0.5],
              [0.4, 0.6],
              [0.7, 0.3]])
pi = np.array([0.2, 0.4, 0.4])
2.1 Forward Algorithm
The algorithm itself is not re-derived here; see the previous post 《一文读懂NLP之隐马尔科夫模型(HMM)详解加python实现》 for the details.
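For convenience, the quantities the code below computes are the standard forward variables:
$$\alpha_1(i)=\pi_i\,b_i(x_1),\qquad \alpha_{t+1}(i)=\Big[\sum_{j=1}^{N}\alpha_t(j)\,a_{ji}\Big]b_i(x_{t+1}),\qquad p(\boldsymbol{x}|\lambda)=\sum_{i=1}^{N}\alpha_T(i)$$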
def forward(self, x):
    """
    Forward algorithm.
    Computes the probability of the observation sequence given the model parameters.
    :param x: observation sequence
    :return: p(x|lambda)
    """
    # sequence length
    self.T = len(x)
    self.X = np.array(x)
    # alpha is a matrix with T rows and N columns
    self.alpha = np.zeros((self.T, self.N))
    # initialization
    for i in range(self.N):
        self.alpha[0][i] = self.Pi[i] * self.B[i][self.X[0]]
    # recursion
    for t in range(1, self.T):
        for i in range(self.N):
            probability_sum = 0
            for j in range(self.N):
                probability_sum += self.alpha[t - 1][j] * self.A[j][i]
            self.alpha[t][i] = probability_sum * self.B[i][self.X[t]]
    # termination
    return sum(self.alpha[self.T - 1])
Run:
hmm = HMM(n, m, a, b, pi)
print("forward algorithm:", hmm.forward(X))
Result:
forward algorithm: 0.130218
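As a quick sanity check (not part of the original code), $p(\boldsymbol{x}|\lambda)$ can also be obtained by brute force, enumerating all $N^T$ hidden state sequences; for $T=3$ this is cheap and should reproduce the forward result:

from itertools import product

# enumerate every hidden state sequence and sum the joint probabilities p(x, y | lambda)
def brute_force_probability(a, b, pi, x):
    n, t = len(pi), len(x)
    total = 0.0
    for states in product(range(n), repeat=t):
        p = pi[states[0]] * b[states[0]][x[0]]
        for k in range(1, t):
            p *= a[states[k - 1]][states[k]] * b[states[k]][x[k]]
        total += p
    return total

print(brute_force_probability(a, b, pi, X))  # ≈ 0.130218, same as the forward algorithm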
2.2 Backward Algorithm
Given the HMM $\boldsymbol{\lambda}$, the backward probability at time $t$ is the probability of the partial observation sequence $x_{t+1},x_{t+2},\ldots,x_T$ given that the state at time $t$ is $q_i$:
$$\beta_t(i)=p(x_{t+1},x_{t+2},x_{t+3},\ldots,x_T\,|\,y_t=q_i,\boldsymbol{\lambda})$$
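The recursion implemented below is:
$$\beta_T(i)=1,\qquad \beta_t(i)=\sum_{j=1}^{N}a_{ij}\,b_j(x_{t+1})\,\beta_{t+1}(j),\quad t=T-1,\ldots,1,\qquad p(\boldsymbol{x}|\lambda)=\sum_{i=1}^{N}\pi_i\,b_i(x_1)\,\beta_1(i)$$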
def backward(self, x):
    """
    Backward algorithm.
    Computes p(x|lambda) using the backward variables.
    """
    # sequence length
    self.T = len(x)
    self.X = np.array(x)
    # beta is a matrix with T rows and N columns
    self.beta = np.zeros((self.T, self.N))
    # at t = T, all beta values are set to 1
    for i in range(self.N):
        self.beta[self.T - 1][i] = 1
    # recurse from t = T-1 down to t = 1
    for t in range(self.T - 2, -1, -1):
        for i in range(self.N):
            for j in range(self.N):
                self.beta[t][i] += self.A[i][j] * self.B[j][self.X[t + 1]] * self.beta[t + 1][j]
    # termination
    sum_probability = 0
    for i in range(self.N):
        sum_probability += self.Pi[i] * self.B[i][self.X[0]] * self.beta[0][i]
    return sum_probability
Run:
hmm = HMM(n, m, a, b, pi)
# print("forward algorithm:", hmm.forward(X))
print("backward algorithm:", hmm.forward(X))
Result:
backward algorithm: 0.130218
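As expected, the backward algorithm gives the same value as the forward algorithm. More generally, $p(\boldsymbol{x}|\lambda)=\sum_i\alpha_t(i)\beta_t(i)$ for any fixed $t$, which gives a handy consistency check (not in the original post):

hmm.forward(X)
hmm.backward(X)
# combine alpha and beta at t = 1 (0-indexed): still ≈ 0.130218
print(np.sum(hmm.alpha[1, :] * hmm.beta[1, :]))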
3 Model Training Problem
Given a training set $(\boldsymbol{x}^{(i)},\boldsymbol{y}^{(i)})$, estimate the model parameters $\lambda=(\boldsymbol{\pi},A,B)$ so that the probability of the observation sequence under the model, $p(\boldsymbol{x}|\lambda)$, is maximized.
3.1 Supervised Learning: Maximum Likelihood Estimation
Estimating the initial state probability vector:
Count, over the $S$ samples, how often the initial state is $q_i$:
$$\hat{\pi}_i=\frac{N_{q_i}}{S}$$
where $N_{q_i}$ is the number of samples whose initial state is $q_i$ and $S$ is the total number of samples.
Estimating the state transition probability matrix:
Let $A_{ij}$ be the number of times a sample is in state $q_i$ at time $t$ and in state $q_j$ at time $t+1$. The estimate is:
$$\hat{a}_{ij}=\frac{A_{ij}}{\sum\limits_{j=1}^{N}A_{ij}},\qquad i=1,2,\ldots,N;\quad j=1,2,\ldots,N$$
Estimating the emission probability matrix:
Let $B_{ij}$ be the number of times a sample is in state $i$ with observation $j$. The estimate of the probability $b_{ij}$ of observing $j$ in state $i$ is:
$$\hat{b}_{ij}=\frac{B_{ij}}{\sum\limits_{j=1}^{M}B_{ij}},\qquad i=1,2,\ldots,N;\quad j=1,2,\ldots,M$$
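The train() method below assumes each training sample is a flat list of length 2T: the first T entries are observation ids and the last T entries are the corresponding hidden state ids, which is exactly the layout produced by generate_train_data further down. For example, with T = 3 a sample might look like:

# [x_1, x_2, x_3, y_1, y_2, y_3]: observations first, then hidden states
sample = [0, 1, 0, 2, 2, 1]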
def train(self, train_data):
    """
    Train the model with maximum likelihood estimation.
    :param train_data: training data; each sample is [observations..., hidden states...]
    :return: nothing; the estimated parameters are stored on the model
    """
    self.T = int(len(train_data[0]) / 2)
    sample_num = len(train_data)
    # initialization
    self.init()
    # estimate the initial state probability vector
    for sequence in train_data:
        self.Pi[sequence[0 + self.T]] += 1
    self.Pi = self.Pi / sample_num
    # estimate the state transition matrix
    a_num = np.zeros((self.N, self.N))
    for sequence in train_data:
        for i in range(self.T - 1):
            a_num[sequence[i + self.T]][sequence[i + 1 + self.T]] += 1.0
    temp = a_num.sum(axis=1).reshape((self.N, 1))
    self.A = a_num / temp
    # estimate the emission probability matrix (count every time step)
    b_num = np.zeros((self.N, self.M))
    for sequence in train_data:
        for i in range(self.T):
            b_num[sequence[i + self.T]][sequence[i]] += 1.0
    temp = b_num.sum(axis=1).reshape((self.N, 1))
    self.B = b_num / temp
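Note that train() calls self.init(), which is not shown in this post (it is in the full code on GitHub). A minimal sketch that simply zero-initializes the parameter arrays before counting would be:

def init(self):
    # zero-initialize the parameters before frequency counting
    # (a sketch only; the repository version may differ)
    self.Pi = np.zeros(self.N)
    self.A = np.zeros((self.N, self.N))
    self.B = np.zeros((self.N, self.M))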
Before training, we need to generate training data.
def generate_train_data(n, m, t, a, b, pi, nums=10000):
    """
    Generate training data.
    :param pi: initial state probability vector
    :param b: emission probability matrix
    :param a: state transition matrix
    :param n: number of hidden states
    :param m: number of observation symbols
    :param t: sequence length
    :param nums: number of samples
    :return: training data set
    """
    train_data = []
    for i in range(nums):
        state_sequence = []
        observation_sequence = []
        # sample the initial state from pi
        temp = 0
        p = random.random()
        for j in range(n):
            temp += pi[j]
            if p > temp:
                continue
            else:
                state_sequence.append(j)
                break
        # recursion over time steps
        for t_index in range(t):
            # sample the next state (except at the first step, whose state was drawn above)
            if t_index != 0:
                temp = 0
                p = random.random()
                for state in range(n):
                    temp += a[state_sequence[-1]][state]
                    if p > temp:
                        continue
                    else:
                        state_sequence.append(state)
                        break
            # sample the observation emitted by the current state
            temp = 0
            p = random.random()
            for observation in range(m):
                temp += b[state_sequence[-1]][observation]
                if p > temp:
                    continue
                else:
                    observation_sequence.append(observation)
                    break
        # one sample = observations (length t) followed by states (length t)
        observation_sequence.extend(state_sequence)
        train_data.append(observation_sequence)
    return train_data
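The cumulative-probability sampling above can also be written more compactly with numpy (an equivalent alternative, not the code used here):

# draw indices directly from the given probability vectors
state = np.random.choice(n, p=pi)                 # initial state
next_state = np.random.choice(n, p=a[state])      # next state given the current one
observation = np.random.choice(m, p=b[state])     # observation emitted by the current state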
Start training:
train_data = generate_train_data(n, m, 8, a, b, pi)
print(train_data)
hmm = HMM(n, m)
hmm.train(train_data)
print(hmm.Pi)
print(hmm.A)
print(hmm.B)
Result:
[0.1984 0.4011 0.4005]
[[0.49525581 0.20134884 0.30339535]
[0.29948182 0.50112204 0.19939614]
[0.20220083 0.30086282 0.49693635]]
[[0.50483721 0.49516279]
[0.39422253 0.60577747]
[0.70305531 0.29694469]]
As you can see, the estimated parameters are almost identical to the true ones used to generate the data.
3.2 Baum-Welch Algorithm
When only observation sequences (and no hidden state labels) are available, the parameters are re-estimated iteratively with the following update formulas:
$$a_{ij}=\frac{\sum\limits_{t=1}^{T-1}\xi_t(i,j)}{\sum\limits_{t=1}^{T-1}\gamma_t(i)}$$
$$b_{ij}=\frac{\sum\limits_{t=1,\,x_t=v_j}^{T}\gamma_t(i)}{\sum\limits_{t=1}^{T}\gamma_t(i)}$$
$$\pi_i=\gamma_1(i)$$
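Here $\gamma_t(i)$ is the probability of being in state $q_i$ at time $t$, and $\xi_t(i,j)$ is the probability of being in state $q_i$ at time $t$ and state $q_j$ at time $t+1$, both conditioned on the observations. They are computed from the forward and backward variables, which is exactly what the code below does:
$$\gamma_t(i)=\frac{\alpha_t(i)\,\beta_t(i)}{\sum\limits_{j=1}^{N}\alpha_t(j)\,\beta_t(j)},\qquad \xi_t(i,j)=\frac{\alpha_t(i)\,a_{ij}\,b_j(x_{t+1})\,\beta_{t+1}(j)}{\sum\limits_{i=1}^{N}\sum\limits_{j=1}^{N}\alpha_t(i)\,a_{ij}\,b_j(x_{t+1})\,\beta_{t+1}(j)}$$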
def baum_welch(self, x, criterion=0.001):
    self.T = len(x)
    self.X = x
    while True:
        # run forward and backward to fill the alpha and beta matrices
        _ = self.forward(self.X)
        _ = self.backward(self.X)
        xi = np.zeros((self.T - 1, self.N, self.N), dtype=float)
        for t in range(self.T - 1):
            # naive element-by-element version (kept for reference):
            # for i in range(self.N):
            #     gamma[t][i] = self.calculate_gamma(t, i)
            #     for j in range(self.N):
            #         xi[t][i][j] = self.calculate_psi(t, i, j)
            # for i in range(self.N):
            #     gamma[self.T-1][i] = self.calculate_gamma(self.T-1, i)
            # vectorized numpy version
            denominator = np.sum(np.dot(self.alpha[t, :], self.A) *
                                 self.B[:, self.X[t + 1]] * self.beta[t + 1, :])
            for i in range(self.N):
                molecular = self.alpha[t, i] * self.A[i, :] * self.B[:, self.X[t + 1]] * self.beta[t + 1, :]
                xi[t, i, :] = molecular / denominator
        gamma = np.sum(xi, axis=2)
        prod = (self.alpha[self.T - 1, :] * self.beta[self.T - 1, :])
        gamma = np.vstack((gamma, prod / np.sum(prod)))
        new_pi = gamma[0, :]
        new_a = np.sum(xi, axis=0) / np.sum(gamma[:-1, :], axis=0).reshape(-1, 1)
        new_b = np.zeros(self.B.shape, dtype=float)
        for k in range(self.B.shape[1]):
            mask = self.X == k
            new_b[:, k] = np.sum(gamma[mask, :], axis=0) / np.sum(gamma, axis=0)
        if np.max(abs(self.Pi - new_pi)) < criterion and \
                np.max(abs(self.A - new_a)) < criterion and \
                np.max(abs(self.B - new_b)) < criterion:
            break
        self.A, self.B, self.Pi = new_a, new_b, new_pi
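The post does not show a run of baum_welch; a minimal usage sketch might look like the following (the initial guesses a0, b0, pi0 are assumptions made up here, since EM only finds a local optimum and is sensitive to its starting point):

# start from rough, non-uniform guesses and refine them on one observation sequence
a0 = np.array([[0.4, 0.3, 0.3],
               [0.3, 0.4, 0.3],
               [0.3, 0.3, 0.4]])
b0 = np.array([[0.6, 0.4],
               [0.5, 0.5],
               [0.4, 0.6]])
pi0 = np.array([0.4, 0.3, 0.3])
hmm = HMM(n, m, a0, b0, pi0)
hmm.baum_welch(np.array(X), criterion=0.001)
print(hmm.Pi, hmm.A, hmm.B, sep="\n")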
4 Sequence Prediction Problem
The sequence prediction (decoding) problem: given the model parameters $\lambda=(\boldsymbol{\pi},A,B)$ and an observation sequence $\boldsymbol{x}$, find the most likely state sequence $\boldsymbol{y}$, i.e. maximize $p(\boldsymbol{y}|\boldsymbol{x})$.
4.1 Viterbi Algorithm
Initialization: at $t=1$, the candidate optimal paths consist of the $N$ individual states, each with an empty predecessor:
$$\delta_1(i)=\pi_i\,b_i(x_1),\qquad \psi_1(i)=0,\quad i=1,2,\ldots,N$$
Recursion: for $t\geq 2$, each candidate path swallows one more state (like the snake in the game Snake) and grows by one step; the cost is computed from the transition and emission probabilities, the new locally optimal paths are found, and the two arrays are updated:
$$\delta_t(i)=\max_{1\leq j\leq N}\big(\delta_{t-1}(j)\,a_{ji}\big)\,b_i(x_t),\qquad \psi_t(i)=\arg\max_{1\leq j\leq N}\big(\delta_{t-1}(j)\,a_{ji}\big),\quad i=1,2,\ldots,N$$
Termination: at the final time step, take the largest probability $p^*$ in the $\delta_T(i)$ array and the index $y_T^*$ of the corresponding final state:
$$p^*=\max_{1\leq i\leq N}\delta_T(i),\qquad y_T^*=\arg\max_{1\leq i\leq N}\delta_T(i)$$
Backtracking: follow the predecessor array $\psi_t$ backwards to recover the optimal path of state indices $\boldsymbol{y}^*=(y_1^*,y_2^*,\ldots,y_T^*)$.
def viterbi(self, x):
    self.X = x
    self.T = len(x)
    self.Y = np.zeros(self.T)
    # initialize delta and psi
    delta = np.zeros((self.T, self.N))
    psi = np.zeros((self.T, self.N))
    # initialization at t = 1
    for i in range(self.N):
        delta[0][i] = self.Pi[i] * self.B[i][self.X[0]]
        psi[0][i] = 0
    # recursion
    for t in range(1, self.T):
        for i in range(self.N):
            temp = 0
            index = 0
            for j in range(self.N):
                if temp < delta[t - 1][j] * self.A[j][i]:
                    temp = delta[t - 1][j] * self.A[j][i]
                    index = j
            delta[t][i] = temp * self.B[i][self.X[t]]
            psi[t][i] = index
    # termination
    self.Y[-1] = delta.argmax(axis=1)[-1]
    p = delta[self.T - 1][int(self.Y[-1])]
    # backtracking
    for i in range(self.T - 1, 0, -1):
        self.Y[i - 1] = psi[i][int(self.Y[i])]
    return p, self.Y
Run:
# run the Viterbi algorithm
hmm = HMM(n, m, a, b, pi)
p, y = hmm.viterbi(X)
print(p)
print(y)
Result:
0.014699999999999998
[2. 2. 2.]
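Since the states are 0-indexed in the code, [2. 2. 2.] corresponds to the state path (3, 3, 3), i.e. the most probable path is 3 → 3 → 3 with probability $p^*\approx 0.0147$.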
The full code is available on my GitHub; please give it a star if you find it helpful.