Tensorflow2.0之图像说明文字生成

Shirley ·
更新时间:2024-09-21
· 752 次阅读

文章目录项目介绍代码实现1、导入需要的库2、下载数据集3、读取 json 文件4、载入图片5、载入模型6、获取图片特征6.1 删除重复的图片6.2 切片、分批6.3 将图片输入网络以获取特征7、文本 →\rightarrow→ 数字向量7.1 构建分词器7.2 构建数字向量7.3 将数字向量填充到同一长度8、划分训练集和验证集9、建立 tf.data 数据集10、编码器11、Bahdanau 注意力12、解码器13、设置超参数建立模型14、初始化优化器15、损失函数16、配置检查点17、梯度下降18、训练19、验证19.1 验证函数19.2 画注意力图19.3 随机测试验证集图片19.4 测试自己的图片 项目介绍

在此项目中,我们希望能在输入一张图像之后得到一句话来描述该图像,比如输入下面这张图像后会输出 “a man in an all excited to a podium” 之类的说明性文字。
在这里插入图片描述
注意:这里我们使用 MS-COCO 数据集来练习,这个数据集比较大,大概有13G左右。

代码实现 1、导入需要的库 import tensorflow as tf # You'll generate plots of attention in order to see which parts of an image # our model focuses on during captioning import matplotlib.pyplot as plt # Scikit-learn includes many helpful utilities from sklearn.model_selection import train_test_split from sklearn.utils import shuffle import re import numpy as np import os import time import json from glob import glob from PIL import Image import pickle 2、下载数据集 # Download image files image_folder = '/train2014/' if not os.path.exists(os.path.abspath('.') + image_folder): image_zip = tf.keras.utils.get_file('train2014.zip', cache_subdir=os.path.abspath('.'), origin = 'http://images.cocodataset.org/zips/train2014.zip', extract = True) PATH = os.path.dirname(image_zip) + image_folder os.remove(image_zip) else: PATH = os.path.abspath('.') + image_folder # Download caption annotation files annotation_folder = '/annotations/' if not os.path.exists(os.path.abspath('.') + annotation_folder): annotation_zip = tf.keras.utils.get_file('captions.zip', cache_subdir=os.path.abspath('.'), origin = 'http://images.cocodataset.org/annotations/annotations_trainval2014.zip', extract = True) annotation_file = os.path.dirname(annotation_zip)+'/annotations/captions_train2014.json' os.remove(annotation_zip)

其中 image_zip 是下载的图片压缩包;annotation_zip 是下载的说明性文字压缩包,其中 captions_train2014.json 文件中包含训练集的说明性文字以及对应的图片名称。

3、读取 json 文件 with open(annotation_file, 'r') as f: annotations = json.load(f) annotations.keys() dict_keys(['info', 'images', 'licenses', 'annotations'])

通过打印 annotations 类型可知它是一个字典,它的键值包括 ‘info’,‘images’,‘licenses’ 和 ‘annotations’,其中我们只需要用到 ‘annotations’。

annotations[‘annotations’] 返回一个列表,这个列表中的每个元素都是含有三个键值的字典,这三个键值分别是 ‘image_id’,‘id’ 和 ‘caption’,其中我们需要 ‘caption’ 来返回说明性文字和 ‘image_id’ 来返回这个说明性文字对应的图片代号。

为节约训练时间,我们只取其中30000个样本训练。

all_captions = [] all_img_name_vector = [] for annot in annotations['annotations']: caption = ' ' + annot['caption'] + ' ' image_id = annot['image_id'] full_coco_image_path = PATH + 'COCO_train2014_' + '%012d.jpg' % (image_id) all_img_name_vector.append(full_coco_image_path) all_captions.append(caption) train_captions, img_name_vector = shuffle(all_captions, all_img_name_vector, random_state=1) num_examples = 30000 train_captions = train_captions[:num_examples] img_name_vector = img_name_vector[:num_examples]

关于 sklearn.utils.shuffle() 的用法请参考:用 sklearn.utils.shuffle 来打乱样本顺序。

4、载入图片 def load_image(image_path): img = tf.io.read_file(image_path) img = tf.image.decode_jpeg(img, channels=3) img = tf.image.resize(img, (299, 299)) img = tf.keras.applications.inception_v3.preprocess_input(img) return img, image_path

其中 tf.io.read_file(image_path) 输出的是图片信息,我们在将其进行解码( tf.image.decode_jpeg(img, channels=3) )后可以得到图片像素。

因为我们用 Inception_v3 网络来提取图片特征,所以我们在这里要将图片像素范围转化为 Inception_v3 网络需要的范围,所以要使用 tf.keras.applications.inception_v3.preprocess_input(img)。

5、载入模型

在这里我们使用 Inception_v3 网络来提取图片特征,输出为该网络中最后一个卷积层的输出。

image_model = tf.keras.applications.InceptionV3(include_top=False, weights='imagenet') new_input = image_model.input hidden_layer = image_model.layers[-1].output image_features_extract_model = tf.keras.Model(new_input, hidden_layer) 6、获取图片特征 6.1 删除重复的图片

因为一张图片可能对应不同的说明性文字,所以图片数据集中存在重复的问题。

encode_train = sorted(set(img_name_vector))

此时,encode_train 列表中的图片名称是不重复的。

6.2 切片、分批

首先将列表 encode_train 转化为 dataset 类型的数据。

image_dataset = tf.data.Dataset.from_tensor_slices(encode_train)

然后我们把它映射到 load_image() 函数实现从图片名称到图片的转换。

image_dataset = image_dataset.map(load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE).batch(16) 6.3 将图片输入网络以获取特征 for img, path in image_dataset: batch_features = image_features_extract_model(img) batch_features = tf.reshape(batch_features, (batch_features.shape[0], -1, batch_features.shape[3])) for bf, p in zip(batch_features, path): path_of_feature = p.numpy().decode("utf-8") np.save(path_of_feature, bf.numpy())

这里的 np.save(path_of_feature, bf.numpy()) 的目的是:在调用 np.load(path_of_feature+’.npy’) 的时候能输出这个路径下的图片对应的特征 bf.numpy()。

最终得到的图片特征的维度为:(batch_size, 64, 2048)。

7、文本 →\rightarrow→ 数字向量 7.1 构建分词器

为了节省内存,我们把词汇表大小限制在前5000个单词,其他的单词用 “” 代替。

top_k = 5000 tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=top_k, oov_token="", filters='!"#$%&()*+.,-/:;=?@[\]^_`{|}~ ') tokenizer.fit_on_texts(train_captions) 7.2 构建数字向量 train_seqs = tokenizer.texts_to_sequences(train_captions) 7.3 将数字向量填充到同一长度 cap_vector = tf.keras.preprocessing.sequence.pad_sequences(train_seqs, padding='post') 8、划分训练集和验证集 img_name_train, img_name_val, cap_train, cap_val = train_test_split(img_name_vector, cap_vector, test_size=0.2, random_state=0) 9、建立 tf.data 数据集

在 6.2 部分中建立的数据集是为了将图片输入 Inception_v3 网络得到特征,而在这里建立的数据集对应的样本是图片的(解码前的)名称,标签是这张图片对应的说明性文字的数字向量。

dataset = tf.data.Dataset.from_tensor_slices((img_name_train, cap_train))

接着,我们要将数据集中的图片名称转换为这张图片对应的特征,所以我们要使用 np.load() 函数。

def map_func(img_name, cap): img_tensor = np.load(img_name.decode('utf-8')+'.npy') return img_tensor, cap dataset = dataset.map(lambda item1, item2: tf.numpy_function( map_func, [item1, item2], [tf.float32, tf.int32]), num_parallel_calls=tf.data.experimental.AUTOTUNE)

接着,对数据集进行打乱、分批:

dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE) dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE) 10、编码器

因为在之前我们已经用卷积神经网络提取了特征(batch_size, 64, 2048),所以在这个编码器中我们只需要定义全连接层(其神经元个数是词嵌入维度)即可。

class CNN_Encoder(tf.keras.Model): def __init__(self, embedding_dim): super(CNN_Encoder, self).__init__() self.fc = tf.keras.layers.Dense(embedding_dim) def call(self, x): x = self.fc(x) x = tf.nn.relu(x) return x

经过编码器后,图像特征的形状变为(batch_size, 64, embedding_dim)。

11、Bahdanau 注意力

相关论文参考:BahdanauAttention。

class BahdanauAttention(tf.keras.Model): def __init__(self, units): super(BahdanauAttention, self).__init__() self.W1 = tf.keras.layers.Dense(units) self.W2 = tf.keras.layers.Dense(units) self.V = tf.keras.layers.Dense(1) def call(self, features, hidden): hidden_with_time_axis = tf.expand_dims(hidden, 1) score = tf.nn.tanh(self.W1(features) + self.W2(hidden_with_time_axis)) attention_weights = tf.nn.softmax(self.V(score), axis=1) context_vector = attention_weights * features context_vector = tf.reduce_sum(context_vector, axis=1) return context_vector, attention_weights

这里的 features 其实就是编码器中输出的结果,经过含 units 个神经元的 Dense 层之后,其形状从 (batch_size, 64, embedding_dim) 变成了 (batch_size, 64, units)。

这里的 hidden 其实就是解码器中输出的隐层向量,我们需要将其维度从 (batch_size, embedding_dim) 变成 (batch_size, 1, embedding_dim) 来执行之后的加法以计算分数,将增加维度后的向量经过含 units 个神经元的 Dense 层之后,其形状从 (batch_size, 1, embedding_dim) 变成了 (batch_size, 1, units)。

将以上两个输出相加得到的形状为 (batch_size, 64, units),经过含1个神经元的 Dense 层之后得到 score,其形状变成 (batch_size, 64, 1)。

Softmax 默认被应用于最后一个轴,但是这里我们想将它应用于第二个轴(即 axis=1),因为分数 (score) 的形状是 (batch_size, 64, 1)。我们想为每个输入的特征 (batch_size, 64, embedding_dim) 分配一个权重,所以 softmax 应该用在 64 这个轴上。经过 Softmax 层之后,得到的注意力权重形状和 score 的形状相同,都是 (batch_size, 64, 1)。
【注】Softmax 的不同的轴的计算规则参考:tf.nn.softmax(x, axis)里axis起什么作用?

将注意力权重和 features 相乘,得到上下文向量,其形状为 (batch_size, 16, embedding_dim)。此向量也就是加了权重的编码向量。将上下文向量基于第二个轴求和(原因与之前相同),得到最终的上下文向量,其形状为 (batch_size, embedding_dim)。

12、解码器 class RNN_Decoder(tf.keras.Model): def __init__(self, embedding_dim, units, vocab_size): super(RNN_Decoder, self).__init__() self.units = units self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim) self.gru = tf.keras.layers.GRU(self.units, return_sequences=True, return_state=True, recurrent_initializer='glorot_uniform') self.fc1 = tf.keras.layers.Dense(self.units) self.fc2 = tf.keras.layers.Dense(vocab_size) self.attention = BahdanauAttention(self.units) def call(self, x, features, hidden): context_vector, attention_weights = self.attention(features, hidden) x = self.embedding(x) x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1) output, state = self.gru(x) x = self.fc1(output) x = tf.reshape(x, (-1, x.shape[2])) x = self.fc2(x) return x, state, attention_weights def reset_state(self, batch_size): return tf.zeros((batch_size, self.units))

在解码器中,我们首先通过 Bahdanau 注意力得到上下文向量和注意力权重,他们的形状分别为 (batch_size, embedding_dim) 和 (batch_size, 64, 1)。

这里的输入 x 是一个词对应的数字(第一个输入模型的数字一定是 “” 对应的数字),其形状为(batch_size, 1),经过词嵌入层之后,其形状变为(batch_size, embedding_dim),将其与增加了第二维度的上下文向量合并后得到形状为 (batch_size, 64, 2*embedding_dim)。

将其输入 GRU,得到输出为 (batch_size, 1, units),隐藏状态为 (batch_size, units)。

然后,经过全连接层后,得到 (batch_size, vocab_size)。

13、设置超参数建立模型 BATCH_SIZE = 64 BUFFER_SIZE = 1000 embedding_dim = 256 units = 512 vocab_size = top_k num_steps = len(img_name_train) // BATCH_SIZE features_shape = 2048 attention_features_shape = 64 encoder = CNN_Encoder(embedding_dim) decoder = RNN_Decoder(embedding_dim, units, vocab_size) 14、初始化优化器 optimizer = tf.keras.optimizers.Adam() 15、损失函数

当输入的向量中出现0元素,说明这个元素所在的文本已经结束了,这个文本不再参与损失的计算,所以在计算损失的时候,要使用掩膜处理,将已结束文本的损失置零。

loss_object = tf.keras.losses.SparseCategoricalCrossentropy( from_logits=True, reduction='none') def loss_function(real, pred): mask = tf.math.logical_not(tf.math.equal(real, 0)) loss_ = loss_object(real, pred) mask = tf.cast(mask, dtype=loss_.dtype) loss_ *= mask return tf.reduce_mean(loss_) 16、配置检查点 checkpoint_path = "./checkpoints/train" ckpt = tf.train.Checkpoint(encoder=encoder, decoder=decoder, optimizer = optimizer) ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5) # 如果检查点存在,则恢复最新的检查点。 start_epoch = 0 if ckpt_manager.latest_checkpoint: start_epoch = int(ckpt_manager.latest_checkpoint.split('-')[-1]) # restoring the latest checkpoint in checkpoint_path ckpt.restore(ckpt_manager.latest_checkpoint) 17、梯度下降 @tf.function def train_step(img_tensor, target): loss = 0 # initializing the hidden state for each batch # because the captions are not related from image to image hidden = decoder.reset_state(batch_size=target.shape[0]) dec_input = tf.expand_dims([tokenizer.word_index['']] * target.shape[0], 1) with tf.GradientTape() as tape: features = encoder(img_tensor) for i in range(1, target.shape[1]): # passing the features through the decoder predictions, hidden, _ = decoder(dec_input, features, hidden) loss += loss_function(target[:, i], predictions) # using teacher forcing dec_input = tf.expand_dims(target[:, i], 1) total_loss = (loss / int(target.shape[1])) trainable_variables = encoder.trainable_variables + decoder.trainable_variables gradients = tape.gradient(loss, trainable_variables) optimizer.apply_gradients(zip(gradients, trainable_variables)) return loss, total_loss 18、训练 loss_plot = [] EPOCHS = 20 # 训练从 start_epoch 训练到 EPOCHS for epoch in range(start_epoch, EPOCHS): start = time.time() total_loss = 0 for (batch, (img_tensor, target)) in enumerate(dataset): batch_loss, t_loss = train_step(img_tensor, target) total_loss += t_loss if batch % 100 == 0: print ('Epoch {} Batch {} Loss {:.4f}'.format( epoch + 1, batch, batch_loss.numpy() / int(target.shape[1]))) # storing the epoch end loss value to plot later loss_plot.append(total_loss / num_steps) if epoch % 5 == 0: ckpt_manager.save() print ('Epoch {} Loss {:.6f}'.format(epoch + 1, total_loss/num_steps)) print ('Time taken for 1 epoch {} sec\n'.format(time.time() - start)) 19、验证 19.1 验证函数 1、初始化解码器的隐藏状态; 2、为图像添加维度; 3、提取图像特征并将所得形状转换成编码器需要的形状; 4、将图像特征输入编码器; 5、初始化解码器输入为 ‘’; 6、经解码器得到的数据形状为 (batch_size, vocab_size),要用 tf.random.categorical() 函数找出每个样本的概率最大的下一个单词。 7、将预测的单词放到一个列表中。 def evaluate(image): attention_plot = np.zeros((max_length, attention_features_shape)) hidden = decoder.reset_state(batch_size=1) temp_input = tf.expand_dims(load_image(image)[0], 0) img_tensor_val = image_features_extract_model(temp_input) img_tensor_val = tf.reshape(img_tensor_val, (img_tensor_val.shape[0], -1, img_tensor_val.shape[3])) features = encoder(img_tensor_val) dec_input = tf.expand_dims([tokenizer.word_index['']], 0) result = [] for i in range(max_length): predictions, hidden, attention_weights = decoder(dec_input, features, hidden) attention_plot[i] = tf.reshape(attention_weights, (-1, )).numpy() predicted_id = tf.random.categorical(predictions, 1)[0][0].numpy() result.append(tokenizer.index_word[predicted_id]) if tokenizer.index_word[predicted_id] == '': return result, attention_plot dec_input = tf.expand_dims([predicted_id], 0) attention_plot = attention_plot[:len(result), :] return result, attention_plot 19.2 画注意力图 def plot_attention(image, result, attention_plot): temp_image = np.array(Image.open(image)) fig = plt.figure(figsize=(10, 10)) len_result = len(result) for l in range(len_result): temp_att = np.resize(attention_plot[l], (8, 8)) ax = fig.add_subplot(len_result//2, len_result//2, l+1) ax.set_title(result[l]) img = ax.imshow(temp_image) ax.imshow(temp_att, cmap='gray', alpha=0.6, extent=img.get_extent()) plt.tight_layout() plt.show() 19.3 随机测试验证集图片 # captions on the validation set rid = np.random.randint(0, len(img_name_val)) image = img_name_val[rid] real_caption = ' '.join([tokenizer.index_word[i] for i in cap_val[rid] if i not in [0]]) result, attention_plot = evaluate(image) print ('Real Caption:', real_caption) print ('Prediction Caption:', ' '.join(result)) plot_attention(image, result, attention_plot) 19.4 测试自己的图片 image_url = 'https://tensorflow.org/images/surf.jpg' image_extension = image_url[-4:] image_path = tf.keras.utils.get_file('image'+image_extension, origin=image_url) result, attention_plot = evaluate(image_path) print ('Prediction Caption:', ' '.join(result)) plot_attention(image_path, result, attention_plot) # opening the image Image.open(image_path)
作者:cofisher



说明文 tensorflow

需要 登录 后方可回复, 如果你还没有账号请 注册新账号
相关文章