import os import random import csv #图片所在的路径 path = '/Users/mac/Desktop/wxd/flag/category/' #files列表保存所有类别的路径 files=[] same_pairs=[] different_pairs=[] for file in os.listdir(path): if file[0]=='.': continue file_path = os.path.join(path,file) files.append(file_path) #该地址为csv要保存到的路径,a表示追加写入 with open('/Users/mac/Desktop/wxd/flag/data.csv','a') as f: #保存相同对 writer = csv.writer(f) for file in files: imgs = os.listdir(file) for i in range(0,len(imgs)-1): for j in range(i+1,len(imgs)): pairs = [] name = file.split(sep='/')[-1] pairs.append(path+name+'/'+imgs[i]) pairs.append(path+name+'/'+imgs[j]) pairs.append(1) writer.writerow(pairs) #保存不同对 for i in range(0,len(files)-1): for j in range(i+1,len(files)): filea = files[i] fileb = files[j] imga_li = os.listdir(filea) imgb_li = os.listdir(fileb) random.shuffle(imga_li) random.shuffle(imgb_li) a_li = imga_li[:] b_li = imgb_li[:] for p in range(len(a_li)): for q in range(len(b_li)): pairs = [] name1 = filea.split(sep='/')[-1] name2 = fileb.split(sep='/')[-1] pairs.append(path+name1+'/'+a_li[p]) pairs.append(path+name2+'/'+b_li[q]) pairs.append(0) writer.writerow(pairs)



from __future__ import absolute_import from __future__ import print_function import numpy as np from keras.models import Model from keras.layers import Input, Dense, Dropout, BatchNormalization, Conv2D, MaxPooling2D, AveragePooling2D, concatenate, \ Activation, ZeroPadding2D from keras.layers import add, Flatten from keras.utils import plot_model from keras.metrics import top_k_categorical_accuracy from keras.preprocessing.image import ImageDataGenerator from keras.models import load_model import tensorflow as tf import random import os import cv2 import csv import numpy as np from keras.models import Model from keras.layers import Input, Flatten, Dense, Dropout, Lambda from keras.optimizers import RMSprop from keras import backend as K from keras.callbacks import ModelCheckpoint from keras.preprocessing.image import img_to_array """ 自定义的参数 """ im_width = 224 im_height = 224 epochs = 100 batch_size = 64 iterations = 1000 csv_path = '' model_result = '' # 计算欧式距离 def euclidean_distance(vects): x, y = vects sum_square = K.sum(K.square(x - y), axis=1, keepdims=True) return K.sqrt(K.maximum(sum_square, K.epsilon())) def eucl_dist_output_shape(shapes): shape1, shape2 = shapes return (shape1[0], 1) # 计算loss def contrastive_loss(y_true, y_pred): '''Contrastive loss from Hadsell-et-al.'06 ''' margin = 1 square_pred = K.square(y_pred) margin_square = K.square(K.maximum(margin - y_pred, 0)) return K.mean(y_true * square_pred + (1 - y_true) * margin_square) def compute_accuracy(y_true, y_pred): '''计算准确率 ''' pred = y_pred.ravel() < 0.5 print('pred:', pred) return np.mean(pred == y_true) def accuracy(y_true, y_pred): '''Compute classification accuracy with a fixed threshold on distances. ''' return K.mean(K.equal(y_true, K.cast(y_pred < 0.5, y_true.dtype))) def processImg(filename): """ :param filename: 图像的路径 :return: 返回的是归一化矩阵 """ img = cv2.imread(filename) img = cv2.resize(img, (im_width, im_height)) img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) img = img_to_array(img) img /= 255 return img def Conv2d_BN(x, nb_filter, kernel_size, strides=(1, 1), padding='same', name=None): if name is not None: bn_name = name + '_bn' conv_name = name + '_conv' else: bn_name = None conv_name = None x = Conv2D(nb_filter, kernel_size, padding=padding, strides=strides, activation='relu', name=conv_name)(x) x = BatchNormalization(axis=3, name=bn_name)(x) return x def bottleneck_Block(inpt, nb_filters, strides=(1, 1), with_conv_shortcut=False): k1, k2, k3 = nb_filters x = Conv2d_BN(inpt, nb_filter=k1, kernel_size=1, strides=strides, padding='same') x = Conv2d_BN(x, nb_filter=k2, kernel_size=3, padding='same') x = Conv2d_BN(x, nb_filter=k3, kernel_size=1, padding='same') if with_conv_shortcut: shortcut = Conv2d_BN(inpt, nb_filter=k3, strides=strides, kernel_size=1) x = add([x, shortcut]) return x else: x = add([x, inpt]) return x def resnet_50(): width = im_width height = im_height channel = 3 inpt = Input(shape=(width, height, channel)) x = ZeroPadding2D((3, 3))(inpt) x = Conv2d_BN(x, nb_filter=64, kernel_size=(7, 7), strides=(2, 2), padding='valid') x = MaxPooling2D(pool_size=(3, 3), strides=(2, 2), padding='same')(x) # conv2_x x = bottleneck_Block(x, nb_filters=[64, 64, 256], strides=(1, 1), with_conv_shortcut=True) x = bottleneck_Block(x, nb_filters=[64, 64, 256]) x = bottleneck_Block(x, nb_filters=[64, 64, 256]) # conv3_x x = bottleneck_Block(x, nb_filters=[128, 128, 512], strides=(2, 2), with_conv_shortcut=True) x = bottleneck_Block(x, nb_filters=[128, 128, 512]) x = bottleneck_Block(x, nb_filters=[128, 128, 512]) x = bottleneck_Block(x, nb_filters=[128, 128, 512]) # conv4_x x = bottleneck_Block(x, nb_filters=[256, 256, 1024], strides=(2, 2), with_conv_shortcut=True) x = bottleneck_Block(x, nb_filters=[256, 256, 1024]) x = bottleneck_Block(x, nb_filters=[256, 256, 1024]) x = bottleneck_Block(x, nb_filters=[256, 256, 1024]) x = bottleneck_Block(x, nb_filters=[256, 256, 1024]) x = bottleneck_Block(x, nb_filters=[256, 256, 1024]) # conv5_x x = bottleneck_Block(x, nb_filters=[512, 512, 2048], strides=(2, 2), with_conv_shortcut=True) x = bottleneck_Block(x, nb_filters=[512, 512, 2048]) x = bottleneck_Block(x, nb_filters=[512, 512, 2048]) x = AveragePooling2D(pool_size=(7, 7))(x) x = Flatten()(x) x = Dense(128, activation='relu')(x) return Model(inpt, x) def generator(imgs, batch_size): """ 自定义迭代器 :param imgs: 列表,每个包含一对矩阵以及label :param batch_size: :return: """ while 1: random.shuffle(imgs) li = imgs[:batch_size] pairs = [] labels = [] for i in li: img1 = i[0] img2 = i[1] im1 = cv2.imread(img1) im2 = cv2.imread(img2) if im1 is None or im2 is None: continue label = int(i[2]) img1 = processImg(img1) img2 = processImg(img2) pairs.append([img1, img2]) labels.append(label) pairs = np.array(pairs) labels = np.array(labels) yield [pairs[:, 0], pairs[:, 1]], labels input_shape = (im_width, im_height, 3) base_network = resnet_50() input_a = Input(shape=input_shape) input_b = Input(shape=input_shape) # because we re-use the same instance `base_network`, # the weights of the network # will be shared across the two branches processed_a = base_network(input_a) processed_b = base_network(input_b) distance = Lambda(euclidean_distance, output_shape=eucl_dist_output_shape)([processed_a, processed_b]) with tf.device("/gpu:0"): model = Model([input_a, input_b], distance) # train rms = RMSprop() rows = csv.reader(open(csv_path, 'r'), delimiter=',') imgs = list(rows) checkpoint = ModelCheckpoint(filepath=model_result+'flag_{epoch:03d}.h5', verbose=1) model.compile(loss=contrastive_loss, optimizer=rms, metrics=[accuracy]) model.fit_generator(generator(imgs, batch_size), epochs=epochs, steps_per_epoch=iterations, callbacks=[checkpoint])



model = load_model(model_path,custom_objects={'contrastive_loss': contrastive_loss }) #选取自己的.h模型名称


