import numpy as np
import matplotlib.pyplot as plt
import re
import jieba
import os
from sklearn.model_selection import train_test_split
from gensim.models import KeyedVectors
from keras.models import Sequential
from keras.layers import Dense, GRU, Embedding, LSTM, Bidirectional
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.optimizers import Adam, RMSprop
from keras.callbacks import EarlyStopping, ModelCheckpoint, TensorBoard, ReduceLROnPlateau
import time
import warnings
warnings.filterwarnings("ignore")
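# NOTE on assumed library versions: the code below follows the pre-4.0 gensim API
# (cn_model.vocab / cn_model.index2word were renamed to key_to_index / index_to_key in
# gensim >= 4) and the Keras 2.x API (Embedding(weights=[...]), Adam(lr=...)).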
# Load the pretrained Chinese word vectors with gensim
start_time = time.time()
cn_model = KeyedVectors.load_word2vec_format(
'../datas/chinese_word_vectors/sgns.zhihu.bigram', binary=False)
end_time = time.time()
print('Loading the pretrained word vectors took {} seconds'.format(end_time - start_time))
def show_embedding():
    # The word vectors trained for this package have a dimensionality of 300
print(cn_model['山东大学'].shape[0])
print('**' * 45)
print(cn_model['小学'])
def cos_similarity():
"""
计算余弦相似度,演示词向量的含义
:return:
"""
# 1、计算相似度
print(cn_model.similarity('橘子', '橙子'))
print(cn_model.similarity('西瓜', '橙子'))
# 2、自己实现余弦相似度的计算。
cosa_b = np.dot(cn_model['橘子'] / np.linalg.norm(cn_model['橘子']),
cn_model['橙子'] / np.linalg.norm(cn_model['橙子']))
print(cosa_b)
# 3、找出最相似的单词,(通过余弦相似度)
print(cn_model.most_similar(positive=['大学'], topn=10))
# 4、找出最不相似的单词
test_words = '老师 会计师 程序员 律师 医生 老人'
test_words_result = cn_model.doesnt_match(test_words.split())
print('在词组:{}中,不同类别的是:{}'.format(test_words, test_words_result))
# 5
print(cn_model.most_similar(positive=['女人', '出轨'], negative=['男人'], topn=3))
def read_data():
"""
读入训练语料
:return:
"""
# 获得样本的索引,样本是存放在2个文件夹中,分别为 neg 和 pos文件夹中,每个文件夹中有2000个
pos_path = '../datas/chinese_sentiment/pos'
neg_path = '../datas/chinese_sentiment/neg'
pos_texts = os.listdir(pos_path)
neg_texts = os.listdir(neg_path)
print('总样本数量为:{}'.format(len(pos_texts) + len(neg_texts)))
# 将所有评论存储到一个列表中,每条评论为一个string
train_text_origin = []
for i in range(len(pos_texts)):
with open(pos_path + '/' + pos_texts[i], 'r', errors='ignore') as f:
text = f.read().strip()
train_text_origin.append(text)
for i in range(len(neg_texts)):
with open(neg_path + '/' + neg_texts[i], 'r', errors='ignore') as f:
text = f.read().strip()
train_text_origin.append(text)
return train_text_origin
def tokenize(train_text_origin):
"""
1、去除标点符号;2、分词;3、tokenize
:param train_text_origin:
:return:
"""
train_tokenize = []
for text in train_text_origin:
# 1、去掉标点符号
text = re.sub("[\s+\.\!\/_,$%^*(+\"\']+|[+——!,。?、~@#¥%……&*()]+", "", text)
# 2、使用jieba分词
cut = jieba.cut(text)
# jieba分词的输出结果是一个生成器,将生成器转为list
cut_list = [i for i in cut]
for i, word in enumerate(cut_list):
try:
# 将单词转为索引index
cut_list[i] = cn_model.vocab[word].index
except KeyError:
# 如果出现未登录词(单词不在字典中)
cut_list[i] = 0
train_tokenize.append(cut_list)
return train_tokenize
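# show_reverse_tokens() is referenced (commented out) in __main__ but is not defined in this
# listing. The sketch below is an assumption of what it does: it maps token indices back to
# words so the tokenization can be sanity-checked against the original review text.
def show_reverse_tokens(train_tokens, train_texts_origin):
    index = 35  # arbitrary sample to inspect (hypothetical choice)
    text = ''
    for i in train_tokens[index]:
        if i != 0:
            # index -> word lookup; requires gensim < 4.0 (gensim >= 4: cn_model.index_to_key[i])
            text = text + cn_model.index2word[i]
        else:
            text = text + ' '
    print('Reconstructed text:\n{}'.format(text))
    print('Original text:\n{}'.format(train_texts_origin[index]))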
def preprocess_data(train_tokens):
"""
判断截断的长度 多少最合理?
:param train_tokens:
:return:
"""
# 获取所有tokens的长度
num_tokens_len = np.array([len(token) for token in train_tokens])
# 平均tokens的长度
print(np.mean(num_tokens_len), np.max(num_tokens_len))
# 可视化一下
# plt.hist(np.log(num_tokens_len), bins=100)
# plt.xlim((0, 10))
# plt.ylabel('number of tokens')
# plt.xlabel('length of tokens')
# plt.title('Distribution of tokens')
# plt.show()
    # Truncation length: mean token length + 2 standard deviations (about 236),
    # which covers roughly 95% of the samples
    max_tokens = int(np.mean(num_tokens_len) + 2 * np.std(num_tokens_len))
    # Double-check the fraction of samples this length covers (about 0.9565)
    print(np.sum(num_tokens_len < max_tokens) / len(num_tokens_len))
    return max_tokens
def pad_and_truncate(train_tokens, max_tokens, num_words=50000):
    """
    Pad/truncate every token sequence to max_tokens, build the targets and split the data.
    """
    # Pad short sequences and truncate long ones (both at the front, 'pre')
    train_pad = pad_sequences(train_tokens, maxlen=max_tokens,
                              padding='pre', truncating='pre')
    # Words whose index falls outside the top num_words are mapped to 0
    train_pad[train_pad >= num_words] = 0
    print(train_pad[33])
    # Build the targets: the first 2000 samples are positive (1), the last 2000 negative (0)
    train_targets = np.concatenate([np.ones(2000), np.zeros(2000)])
    print(train_targets.shape, train_pad.shape)
    # Train/test split
    X_train, X_test, y_train, y_test = train_test_split(
        train_pad, train_targets, test_size=0.1, random_state=12)
    return X_train, X_test, y_train, y_test
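# embed_matrix() is called in __main__ but is not defined in this listing. The sketch below is
# a minimal reconstruction (an assumption, not necessarily the original code): it copies the
# vectors of the num_words most frequent words from cn_model into a (num_words, embedding_dims)
# matrix, so that row i holds the vector of the word whose index is i. The matrix is handed to
# the Embedding layer via weights=[...].
def embed_matrix(embedding_dims=300, num_words=50000):
    embedding_matrix = np.zeros((num_words, embedding_dims))
    for i in range(num_words):
        # index -> word lookup; requires gensim < 4.0 (gensim >= 4: cn_model.index_to_key[i])
        embedding_matrix[i, :] = cn_model[cn_model.index2word[i]]
    return embedding_matrix.astype('float32')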
def train(embedding_matrix, max_tokens, X_train, y_train, embedding_dims=300, num_words=50000):
    # Build and train the model
    model = Sequential()
    # First layer: embedding initialized with the pretrained vectors and frozen
    model.add(Embedding(num_words, embedding_dims, weights=[embedding_matrix],
                        input_length=max_tokens, trainable=False))
    # Second layer: bidirectional LSTM
    model.add(Bidirectional(LSTM(64, return_sequences=False)))
    # TODO: to stack more recurrent layers, the preceding layers must return sequences, e.g.:
    """
    model.add(GRU(units=32, return_sequences=True))
    model.add(GRU(units=16, return_sequences=True))
    model.add(GRU(units=32, return_sequences=False))
    """
    # Third layer: fully connected output with a sigmoid activation
    model.add(Dense(1, activation='sigmoid'))
    # Optimizer
    optimizer = Adam(lr=1e-3)
    model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])
    # Print the model structure
    model.summary()
    # Checkpoint that stores the best weights
    path_checkpoint = 'sentiment_checkpoint.keras'
    checkpoint = ModelCheckpoint(
        filepath=path_checkpoint, monitor='val_loss', verbose=1, save_weights_only=True,
        save_best_only=True)
    # Try to resume from an existing checkpoint
    try:
        model.load_weights(path_checkpoint)
        print('Checkpoint weights restored!')
    except Exception as e:
        print(e)
    # Early stopping: stop if val_loss has not improved within 3 epochs
    early_stopping = EarlyStopping(monitor='val_loss', patience=3, verbose=1)
    # Learning-rate decay on plateau
    lr_reduction = ReduceLROnPlateau(
        monitor='val_loss', factor=0.1, min_lr=1e-5, patience=0, verbose=1
    )
    # Callbacks
    callbacks = [early_stopping, checkpoint, lr_reduction]
    # Train the model
    model.fit(X_train, y_train, validation_split=0.1,
              epochs=1, batch_size=128, callbacks=callbacks)
def predict_sentiment(text, model, max_tokens):
    """
    :param text: the text to classify
    :param model: the trained model
    :param max_tokens: padding/truncation length used during training
    :return:
    """
    # 1. Strip punctuation
    text = re.sub(r"[\s+\.\!\/_,$%^*(+\"\']+|[+——!,。?、~@#¥%……&*()]+", "", text)
    # 2. Segment with jieba
    cut = jieba.cut(text)
    # 3. jieba returns a generator, so convert it to a list
    cut_list = [i for i in cut]
    for i, word in enumerate(cut_list):
        try:
            # Map each word to its index (word2int)
            cut_list[i] = cn_model.vocab[word].index
        except KeyError:
            # Out-of-vocabulary words get index 0
            cut_list[i] = 0
    # 4. Pad and truncate with the Keras preprocessing utilities
    tokens_pad = pad_sequences([cut_list], maxlen=max_tokens,
                               padding='pre', truncating='pre')
    # 5. Predict
    result = model.predict(x=tokens_pad)
    coef = result[0][0]
    if coef >= 0.5:
        print('Positive review, predicted probability: {:.4f}'.format(coef))
    else:
        print('Negative review, predicted probability: {:.4f}'.format(coef))
def test(embedding_matrix, max_tokens, X_test, y_test, embedding_dims=300, num_words=50000):
    # Rebuild the same architecture and evaluate it with the saved weights
    model = Sequential()
    # First layer: embedding initialized with the pretrained vectors and frozen
    model.add(Embedding(num_words, embedding_dims, weights=[embedding_matrix],
                        input_length=max_tokens, trainable=False))
    # Second layer: bidirectional LSTM
    model.add(Bidirectional(LSTM(64, return_sequences=False)))
    # Third layer: fully connected output with a sigmoid activation
    model.add(Dense(1, activation='sigmoid'))
    # Optimizer
    optimizer = Adam(lr=1e-3)
    model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])
    # Checkpoint that stores the best weights
    path_checkpoint = 'sentiment_checkpoint.keras'
    # Try to load the trained weights
    try:
        model.load_weights(path_checkpoint)
        print('Checkpoint weights restored!')
    except Exception as e:
        print(e)
    result = model.evaluate(X_test, y_test)
    print('Accuracy: {}'.format(result[1]))
test_list = [
'酒店设施不是新的,服务态度很不好',
'酒店卫生条件非常不好',
'床铺非常舒适',
'我觉得还好吧,就是有点吵'
]
for text in test_list:
        predict_sentiment(text, model, max_tokens)
if __name__ == '__main__':
# show_embedding()
# cos_similarity()
train_text_origin = read_data()
train_tokenize = tokenize(train_text_origin)
max_tokens = preprocess_data(train_tokens=train_tokenize)
    # show_reverse_tokens(train_tokenize, train_texts_origin=train_text_origin)
embedding_m = embed_matrix(embedding_dims=300)
X_train, X_test, y_train, y_test = pad_and_truncate(
train_tokenize, max_tokens, num_words=50000)
# train(embedding_m, max_tokens, X_train, y_train)
test(embedding_m, max_tokens, X_test, y_test)