5-RNN-06_剧本生成项目

Dreama ·
更新时间:2024-09-20
· 876 次阅读

""" 项目:电视剧本生成。数据集:https://www.kaggle.com/wcukierski/the-simpsons-by-the-data 提交时间:12月29日之前。 提交邮箱:yingjun@ibefeing.com """ from utils import helper import numpy as np import warnings from tensorflow.contrib import seq2seq import tensorflow as tf # 数据读取和数据预处理 def read_data_and_preprocess(): data_dir = '../datas/simpson/simpsons/moes_tavern_lines.txt' text = helper.load_data(data_dir) # 去除剧本开头的版权说明。 text = text[81:] print('该剧本长度:%d' % len(text)) # 探索数据集 view_sentence_range = (0, 100) print('数据集统计信息') print('唯一单词的数量: {}'.format(len({word: None for word in text.split()}))) scenes = text.split('\n\n') print('场景数量为: {}'.format(len(scenes))) sentence_count_scene = [scene.count('\n') for scene in scenes] print('每个场景平均句子数量: {}'.format(np.average(sentence_count_scene))) sentences = [sentence for scene in scenes for sentence in scene.split('\n')] print('句子总数量为: {}'.format(len(sentences))) word_count_sentence = [len(sentence.split()) for sentence in sentences] print('每句的平均单词数量: {}'.format(np.average(word_count_sentence))) print() print('The sentences {} to {}:'.format(*view_sentence_range)) print('\n'.join(text.split('\n')[view_sentence_range[0]:view_sentence_range[1]])) # todo 数据预处理:1、查找表(vocab_to_int int_to_vocab);2、标点符号标记 # 1、查找表创建2个字典: # - `vocab_to_int`:{单词:数字} # - `int_to_vocab`:{数字:单词} def create_lookup_tables(text): """ Create lookup tables for vocabulary :param text: The text of tv scripts split into words :return: A tuple of dicts (vocab_to_int, int_to_vocab) """ # todo 需要编程: words = sorted(list(set(text))) vocab_to_int = {word: idx for idx, word in enumerate(words)} int_to_vocab = dict(enumerate(words)) return vocab_to_int, int_to_vocab # 标点符号代替符 """ "!" 转换成: "||Exclamation_Mark||"。key值是标点符号,value是替代的文本。 - Period ( . ) - Comma ( , ) - Quotation Mark ( " ) - Semicolon ( ; ) - Exclamation mark ( ! ) - Question mark ( ? ) - Left Parentheses ( ( ) - Right Parentheses ( ) ) - Dash ( -- ) - Return ( \n ) """ def token_lookup(): """ Generate a dict to turn punctuation into a token. :return: Tokenize dictionary where the key is the punctuation and the value is the token """ token_dict = {'.': '||Period||', ', ': '||Comma||', '"': '||Quotation_Mark||', ';': '||Semicolon||', '!': '||Exclamation_mark||','?':'||Question_mark||', '(':'||Left_Parentheses||', ')': '||Right_Parentheses||', '--': '||Dash||', '\n': '||Return||'} return token_dict # 预处理数据(并生成 训练 验证 和测试数据。),并保存本地 def preprocess_and_save_data(): helper.preprocess_and_save_data(data_dir, token_lookup, create_lookup_tables) # todo 检查点!重启后,可以直接从这里开始 int_text, vocab_to_int, int_to_vocab, token_dict = helper.load_preprocess() # todo 构建RNN网络 # todo 1、创建占位符 def get_inputs(n_steps): """ 为input, targets, and learning rate 创建占位符 :return: Tuple (input, targets, learning rate) """ # todo 需要编程 inputs = tf.placeholder(tf.int32, [None, n_steps], name='input') targets = tf.placeholder(tf.int32, [None, n_steps], name='targets') learning_rate = tf.placeholder(tf.float32) keep_prob = tf.placeholder(tf.float32, name='keep_prob') return inputs, targets, learning_rate, keep_prob # todo Build RNN Cell and Initialize """ 1\堆栈一个或者多个[`BasicLSTMCells`] 2\再使用函数[`tf.identity()`],初始化状态,并命名为:"initial_state" Return : `(Cell, InitialState)` """ def get_init_cell(batch_size, rnn_size, keep_prob, lstm_layers=2): """ Create an RNN Cell and initialize it. :param batch_size: Size of batches :param rnn_size: Size of RNNs :return: Tuple (cell, initialize state) """ # todo 需要编程 def build_cell(lstm_size, keep_prob): cell = tf.nn.rnn_cell.BasicLSTMCell(num_units=lstm_size) drop = tf.nn.rnn_cell.DropoutWrapper(cell, output_keep_prob=keep_prob) return drop # 堆栈cell 构建多层隐藏层 multi_cell = tf.nn.rnn_cell.MultiRNNCell( [build_cell(rnn_size, keep_prob) for _ in range(lstm_layers)] ) # todo 这里使用tf.identity(final_state同)是为了生成一个op操作符,断点继续训练时候要用。 initial_state = multi_cell.zero_state(batch_size, tf.float32) initial_state1 = tf.identity(initial_state, name='initial_state') return multi_cell, initial_state, initial_state1 # todo 单词嵌入-对`input_data`应用单词嵌入;返回 嵌入序列 def get_embed(input_data, vocab_size, embed_dim): """ Create embedding for . :param input_data: TF placeholder for text input. :param vocab_size: Number of words in vocabulary. :param embed_dim: Number of embedding dimensions :return: Embedded input. """ # todo 需要编程: # 1、构建嵌入矩阵的查找表 lookup_w = tf.Variable( initial_value=tf.random_uniform([vocab_size, embed_dim], -1.0, 1.0) ) # 2、获得嵌入输出 embed = tf.nn.embedding_lookup(params=lookup_w, ids=input_data) # [N, n_steps, embed_size] return embed # todo 使用函数[`tf.nn.dynamic_rnn()`]创建RNN """ - 使用函数[`tf.nn.dynamic_rnn()`]创建RNN。 - 对final state应用[`tf.identity()`],并命名为:"final_state"(name='final_state') Return :a tuple `(Outputs, FinalState)` """ def build_rnn(cell, inputs, initial_state): """ Create a RNN using a RNN Cell :param cell: RNN Cell :param inputs: 传入嵌入的输出(而不是Input占位符) :return: Tuple (Outputs, Final State) """ # todo 需要编程: rnn_outputs, final_state = tf.nn.dynamic_rnn( cell, inputs, initial_state=initial_state ) # rnn_outputs shape = [批量, 时间步, 隐藏层节点数量] final_state1 = tf.identity(final_state, name='final_state') return rnn_outputs, final_state, final_state1 # todo 构建RNN网络 """ 运用上面完成的函数,完成以下应用: - 将输入:`input_data`传入函数 `get_embed(input_data, vocab_size, embed_dim)` 并获得嵌入输出。 - 使用`cell` 和函数 `build_rnn(cell, inputs)`构建RNN; - 应用一个全连接层(使用线性激活),输出节点个数== `vocab_size` 。 Return:a tuple (Logits, FinalState) """ def build_nn(cell, input_data, vocab_size, embed_dim, initial_state): """ Build part of the neural network :param cell: RNN cell :param rnn_size: Size of rnns :param input_data: Input data :param vocab_size: Vocabulary size :param embed_dim: Number of embedding dimensions :return: Tuple (Logits, FinalState) """ # todo 1、获取嵌入层的输出 embed = get_embed(input_data, vocab_size, embed_dim) # todo-2、获取 隐藏层的输出 rnn_outputs, final_state, final_state1 = build_rnn( cell, embed, initial_state=initial_state) # 3、构建全连接,获取logits # https://github.com/tensorflow/tensorflow/blob/r1.14/tensorflow/contrib/layers/python/layers/layers.py # 该api接受3-d 的张量,会拉成2-d矩阵,做全连接获得2-d的logits后,再转置为3-D的logits(shape = [批量, 时间步, 类别数量]) logits = tf.contrib.layers.fully_connected(rnn_outputs, vocab_size, activation_fn=None) # [N, n-steps, num_classes] return logits, final_state, final_state1 # todo 批次 """ 完成`get_batches`函数的代码编写:使用 `int_text`创建input 和 targets的batches。 The batches是一个 Numpy数组, shape =`(number of batches, 2, batch size, sequence length)`。每一个batch包含2部分: - 第一部分是单个**input**的batch, shape = `[batch size, sequence length]` - 第二部分是单个**targets**的batch, shape = `[batch size, sequence length]` 最后一个batch没有办法填充满,那么就直接丢弃掉。 例如: `get_batches([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15], 2, 3)` 返回以下形式的Numpy数组: [ # First Batch [ # Batch of Input [[ 1 2 3], [ 7 8 9]], # Batch of targets [[ 2 3 4], [ 8 9 10]] ], # Second Batch [ # Batch of Input [[ 4 5 6], [10 11 12]], # Batch of targets [[ 5 6 7], [11 12 13]] ] ] """ # todo-方法一 def get_batches(int_text, batch_size, n_steps): """ Return batches of input and target :param int_text: Text with the words replaced by their ids :param batch_size: The size of batch :param n_steps: The length of sequence :return: Batches as a Numpy array """ # todo 需要编程: # 计算有多少个批量 n_batches = len(int_text) // (batch_size * n_steps) origin_x = np.array(int_text[:n_batches* batch_size * n_steps]) origin_y = np.array(int_text[1: n_batches * batch_size * n_steps + 1]) # 对origin重塑 x_reshaped = np.reshape(origin_x, newshape=[batch_size, -1]) y_reshaped = np.reshape(origin_y, newshape=[batch_size, -1]) batch_reshape_x = np.split(x_reshaped, n_batches, axis=1) batch_reshape_y = np.split(y_reshaped, n_batches, axis=1) batches = np.array(list(zip(batch_reshape_x, batch_reshape_y))) return batches # todo 超参数的设置 num_epochs = 80 # 训练次数 batch_size = 32 # 批量大小 rnn_size = 128 # RNN隐藏层节点数量大小(神经元个数) embed_dim = 200 # 嵌入维度大小 Embedding Dimension Size seq_length = 100 # 序列长度(就是 n_steps) learning_rate = 1e-3 show_every_n_batches = 30 # 打印进度的批次数量 save_dir = './save' # todo 构建模型图(损失和 优化器在模型图中构建。) train_graph = tf.Graph() with train_graph.as_default(): vocab_size = len(int_to_vocab) input_text, targets, lr, keep_prob = get_inputs(n_steps=seq_length) input_data_shape = tf.shape(input_text) multi_cell, initial_state, initial_state1 = get_init_cell( input_data_shape[0], rnn_size, keep_prob) logits, final_state, final_state1 = build_nn( multi_cell, input_text, vocab_size, embed_dim, initial_state) # 生成单词的概率 probs = tf.nn.softmax(logits, name='probs') # 损失函数 Loss function """ 将交叉熵损失的函数过程全部封装在该函数内了。(average_across_timesteps average_across_batch 两个参数使用True的情况下,返回一个标量loss(即该批次的均交叉熵损失。)) seq2seq.sequence_loss( logits, # [batch_size, sequence_length, num_decoder_symbols] targets, # [batch_size, sequence_length] 使用的是真实标签,所以是2-D的。 weights, # [batch_size, sequence_length] 作为掩码(比如有效的时间步为1,padding的设置为0) average_across_timesteps=True, # 设置为True的话,会沿着 时间步 累加loss并求均值。 average_across_batch=True, # 设置为True的话,会沿着 批次 累加loss并求均值。 softmax_loss_function=None, name=None): """ cost = seq2seq.sequence_loss( logits=logits, targets=targets, weights=tf.ones([input_data_shape[0], input_data_shape[1]]) ) optimizer = tf.train.AdamOptimizer(lr) # 梯度裁剪 gradients = optimizer.compute_gradients(cost) capped_gradients = [(tf.clip_by_value(grad, -1., 1.), var) for grad, var in gradients if grad is not None] train_op = optimizer.apply_gradients(capped_gradients) # todo 训练 def training(): batches = get_batches(int_text, batch_size, seq_length) with tf.Session(graph=train_graph) as sess: sess.run(tf.global_variables_initializer()) for epoch_i in range(num_epochs): # 将initial_state 跑出来,并赋值给state。 state = sess.run(initial_state, {input_text: batches[0][0]}) for batch_i, (x, y) in enumerate(batches): feed = {input_text: x, targets: y, initial_state: state, lr: learning_rate, keep_prob: 0.6} train_loss, state, _ = sess.run([cost, final_state, train_op], feed) # 每30步 打印1次模型损失。 if (epoch_i * len(batches) + batch_i) % show_every_n_batches == 0: print('Epoch {:>3} Batch {:>4}/{} train_loss = {:.3f}'.format( epoch_i, batch_i, len(batches), train_loss)) # 模型持久化 saver = tf.train.Saver() saver.save(sess, save_dir) print('Model Trained and Saved') # todo 生成电视剧本 # checkpoints _, vocab_to_int, int_to_vocab, token_dict = helper.load_preprocess() seq_length, load_dir = helper.load_params() print(load_dir) """ ### Get Tensors 使用函数[`get_tensor_by_name()`](https://www.tensorflow.org/api_docs/python/tf/Graph#get_tensor_by_name)中的方法`loaded_graph`获取tensors。Tensors中的名字如下: - "input:0" - "initial_state:0" - "final_state:0" - "probs:0" Return:a tuple `(InputTensor, InitialStateTensor, FinalStateTensor, ProbsTensor)` """ def get_tensors(loaded_graph): """ Get input, initial state, final state, and probabilities tensor from :param loaded_graph: TensorFlow graph loaded from file :return: Tuple (InputTensor, InitialStateTensor, FinalStateTensor, ProbsTensor) """ # todo 需要编程: input_tensor = loaded_graph.get_tensor_by_name('input:0') initial_state_tensor = loaded_graph.get_tensor_by_name("initial_state:0") final_state_tensor = loaded_graph.get_tensor_by_name("final_state:0") probs_tensor = loaded_graph.get_tensor_by_name("probs:0") return input_tensor, initial_state_tensor, final_state_tensor, probs_tensor # Choose Word # 使用 `probabilities`参数选取下一个次(选取概率最大的单词) def pick_word(probabilities, int_to_vocab): """ Pick the next word in the generated text :param probabilities: Probabilites of the next word :param int_to_vocab: Dictionary of word ids as the keys and words as the values :return: String of the predicted word """ # todo 需要编程: word = int_to_vocab[np.argmax(probabilities)] return word # todo 使用训练模型生成电视剧本 def test(): gen_length = 400 # homer_simpson, moe_szyslak, or Barney_Gumble prime_word = 'moe_szyslak' # print(vocab_to_int) # print(vocab_to_int[prime_word]) loaded_graph = tf.Graph() with tf.Session(graph=loaded_graph) as sess: # 载入保存的模型 loader = tf.train.import_meta_graph(load_dir + '.meta') loader.restore(sess, load_dir) # 从载入的模型获取 tensors input_text, initial_state, final_state, probs = get_tensors(loaded_graph) # 句子生成设置 gen_sentences = [prime_word + ':'] prev_state = sess.run(initial_state, {input_text: np.array([[1]])}) # 生成句子 for n in range(gen_length): # 动态输入 dyn_input = [[vocab_to_int[word] for word in gen_sentences[-seq_length:]]] # print(dyn_input) dyn_seq_length = len(dyn_input[0]) # 获取预测值 probabilities, prev_state = sess.run( [probs, final_state], {input_text: dyn_input, initial_state: prev_state}) pred_word = pick_word(probabilities[0][dyn_seq_length - 1], int_to_vocab) gen_sentences.append(pred_word) # 将标点符号 切换回来。 tv_script = ' '.join(gen_sentences) for key, token in token_dict.items(): ending = ' ' if key in ['\n', '(', '"'] else '' tv_script = tv_script.replace(' ' + token.lower(), key) tv_script = tv_script.replace('\n ', '\n') tv_script = tv_script.replace('( ', '(') print(tv_script) if __name__ == '__main__': training() # test()
作者:HJZ11



rnn

需要 登录 后方可回复, 如果你还没有账号请 注册新账号
相关文章