本文章我们来学习一下使用PaddlePaddle实现人脸对比和人脸识别,使用的训练数据集是CASIA-WebFace。
数据集介绍我们使用的是CASIA-WebFace数据集,该人脸数据集是目前最大的公开人脸数据集。该人脸数据集一共有包含10,575个人,494,414张图像,包含彩色图和灰图。各大人脸数据集情况如下表。
Dataset
Subjects
Images
Availability
LFW [1]
5,749
13,233
Public
WDRef [2]
2,995
99,773
Public (feature only)
CelebFaces [3]
10,177
202,599
Private
SFC [4]
4,030
4,400,000
Private
CACD [5]
2,000
163,446
Public (partial annotated)
CASIA-WebFace
10,575
494,414
Public
训练模型为了方便读取数据集,我们要生成一个图像列表,用于训练时读取数据,这个列表的作用具体可以阅读笔者之前的文章《我的PaddlePaddle学习之路》笔记四——自定义图像数据集的识别,执行下面代码生成人脸数据集的图像列表。下载CASIA-WebFace数据集并解压,执行代码时传入解压后的根目录,执行之后会在/home/test生成一个图像列表文件夹。
# 生成图像列表程序 import os import json class CreateDataList: def __init__(self): pass def createTrainDataList(self, data_root_path): # # 把生产的数据列表都放在自己的总类别文件夹中 data_list_path = '' # 所有类别的信息 class_detail = [] # 获取所有类别 class_dirs = os.listdir(data_root_path) # 类别标签 class_label = 0 # 获取总类别的名称 father_paths = data_root_path.split('/') while True: if father_paths[father_paths.__len__() - 1] == '': del father_paths[father_paths.__len__() - 1] else: break father_path = father_paths[father_paths.__len__() - 1] all_class_images = 0 # 读取每个类别 for class_dir in class_dirs: # 每个类别的信息 class_detail_list = {} test_sum = 0 trainer_sum = 0 # 把生产的数据列表都放在自己的总类别文件夹中 data_list_path = "/home/test/%s/" % father_path # 统计每个类别有多少张图片 class_sum = 0 # 获取类别路径 path = data_root_path + "/" + class_dir # 获取所有图片 img_paths = os.listdir(path) for img_path in img_paths: # 每张图片的路径 name_path = path + '/' + img_path # 如果不存在这个文件夹,就创建 isexist = os.path.exists(data_list_path) if not isexist: os.makedirs(data_list_path) # 每10张图片取一个做测试数据 trainer_sum += 1 with open(data_list_path + "trainer.list", 'a') as f: f.write(name_path + "t%d" % class_label + "n") class_sum += 1 all_class_images += 1 class_label += 1 # 说明的json文件的class_detail数据 class_detail_list['class_name'] = class_dir class_detail_list['class_label'] = class_label class_detail_list['class_test_images'] = test_sum class_detail_list['class_trainer_images'] = trainer_sum class_detail.append(class_detail_list) # 获取类别数量 all_class_sum = class_dirs.__len__() # 说明的json文件信息 readjson = {} readjson['all_class_name'] = father_path readjson['all_class_sum'] = all_class_sum readjson['all_class_images'] = all_class_images readjson['class_detail'] = class_detail jsons = json.dumps(readjson, sort_keys=True, indent=4, separators=(',', ': ')) with open(data_list_path + "readme.json",'w') as f: f.write(jsons) if __name__ == '__main__': createDataList = CreateDataList() createDataList.createTrainDataList('/home/test/WebFace/')编写读取图像的reader,这个reader对图像做的预处理的进行中心裁剪,因为人脸都是居中的,进行居中裁剪可以去掉其他的背景的影响。
# 把图像和label读取成reader # coding=utf-8 import cv2 import numpy as np import paddle.v2 as paddle import random from multiprocessing import cpu_count class MyReader: def __init__(self, imageSize, type_size, center_crop_size = 128): self.imageSize = imageSize self.type_size = type_size self.center_crop_size = center_crop_size self.default_image_size = 250 def train_mapper(self, sample): img, label = sample sparse_label = [0 for i in range(self.type_size)] sparse_label[label - 1] = 1 # 裁剪中心图片 def crop_img(img, center_crop_size): img = cv2.imread(img, 0) if center_crop_size < self.default_image_size: side = (self.default_image_size - center_crop_size) / 2 img = img[side: self.default_image_size - side - 1, side: self.default_image_size - side - 1] return img img = crop_img(img, self.center_crop_size) img = cv2.resize(img, (self.imageSize, self.imageSize)) return img.flatten().astype('float32'), label, sparse_label # 获取训练的reader def train_reader(self, train_list, buffered_size=1024): def reader(): with open(train_list, 'r') as f: lines = [line.strip() for line in f] # 打乱数据 random.shuffle(lines) for line in lines: line = line.strip().split('t') img_path = line[0] img_label = line[1] yield img_path, int(img_label) return paddle.reader.xmap_readers(self.train_mapper, reader, cpu_count(), buffered_size)编写卷积神经网络,这个是根据resnet修改的网络。使用了6个卷积块,最后的返回值是最后一个池化层和最后一个全连接层,输出最后一层池化层是为了在预测的是获取图像的人脸特征,做人脸对比。
import numpy as np import paddle.v2 as paddle def conv_bn_layer(input, ch_out, filter_size, stride, padding, active_type=paddle.activation.Relu(), ch_in=None): tmp = paddle.layer.img_conv( input=input, filter_size=filter_size, num_channels=ch_in, num_filters=ch_out, stride=stride, padding=padding, act=paddle.activation.Linear(), bias_attr=False) return paddle.layer.batch_norm(input=tmp, act=active_type, moving_average_fraction=0.999) def shortcut(ipt, ch_in, ch_out, stride): if ch_in != ch_out: return conv_bn_layer(ipt, ch_out, 1, stride, 0, paddle.activation.Linear()) else: return ipt def basicblock(ipt, ch_in, ch_out, stride): tmp = conv_bn_layer(ipt, ch_out, 3, stride, 1) tmp = conv_bn_layer(tmp, ch_out, 3, 1, 1, paddle.activation.Linear()) short = shortcut(ipt, ch_in, ch_out, stride) return paddle.layer.addto(input=[tmp, short], act=paddle.activation.Relu()) def layer_warp(block_func, ipt, ch_in, ch_out, count, stride): tmp = block_func(ipt, ch_in, ch_out, stride) for i in range(1, count): tmp = block_func(tmp, ch_out, ch_out, 1) return tmp def resnet(ipt, class_dim): n = 1 feature_maps = 512 ipt_bn = ipt - 128.0 # 获取卷积层输出 conv1 = conv_bn_layer(ipt_bn, ch_in=1, ch_out=8, filter_size=3, stride=1, padding=1) # 多个残差块组合 res0 = layer_warp(basicblock, conv1, 8, 16, n, 1) res1 = layer_warp(basicblock, res0, 16, 32, n, 1) res2 = layer_warp(basicblock, res1, 32, 64, n, 2) res3 = layer_warp(basicblock, res2, 64, 128, n, 2) res4 = layer_warp(basicblock, res3, 128, 256, n, 2) res5 = layer_warp(basicblock, res4, 256, feature_maps, n, 2) # 最后使用池化层来降维 pool = paddle.layer.img_pool(input=res5, name='pool', pool_size=8, stride=1, pool_type=paddle.pooling.Avg()) fc = paddle.layer.fc(input=pool, size=class_dim, act=paddle.activation.Softmax()) return pool, fc开始训练模型
# 训练代码 import os import sys import paddle.v2 as paddle from paddle.v2.plot import Ploter step = 0 class PaddleUtil: # **********************获取参数*************************************** def get_parameters(self, parameters_path=None, cost=None): if not parameters_path: # 使用cost创建parameters if not cost: raise NameError('请输入cost参数') else: # 根据损失函数创建参数 parameters = paddle.parameters.create(cost) print "cost" return parameters else: # 使用之前训练好的参数 try: # 使用训练好的参数 with open(parameters_path, 'r') as f: parameters = paddle.parameters.Parameters.from_tar(f) print "使用parameters" return parameters except Exception as e: raise NameError("你的参数文件错误,具体问题是:%s" % e) # ***********************获取训练器*************************************** # datadim 数据大小 def get_trainer(self, datadim, type_size, parameters_path, batch_size): # 获得图片对于的信息标签 label = paddle.layer.data(name="label", type=paddle.data_type.integer_value(type_size)) image = paddle.layer.data(name="image", type=paddle.data_type.dense_vector(datadim)) # 获取全连接层,也就是分类器 fea, out = resnet(image, class_dim=type_size) # 获得损失函数 cost = paddle.layer.classification_cost(input=out, label=label) # 获得参数 if not parameters_path: parameters = self.get_parameters(cost=cost) else: parameters = self.get_parameters(parameters_path=parameters_path) ''' 定义优化方法 learning_rate 迭代的速度 momentum 跟前面动量优化的比例 regularzation 正则化,防止过拟合 ''' optimizer = paddle.optimizer.Momentum( momentum=0.9, regularization=paddle.optimizer.L2Regularization(rate=0.0005 * batch_size), learning_rate=0.00001 / batch_size, learning_rate_decay_a=0.1, learning_rate_decay_b=128000 * 35, learning_rate_schedule="discexp", ) ''' 创建训练器 cost 分类器 parameters 训练参数,可以通过创建,也可以使用之前训练好的参数 update_equation 优化方法 ''' trainer = paddle.trainer.SGD(cost=cost, parameters=parameters, update_equation=optimizer) return trainer # ***********************开始训练*************************************** def start_trainer(self, trainer, num_passes, save_parameters_name, trainer_reader, batch_size): # 获得数据 reader = paddle.batch(reader=paddle.reader.shuffle(reader=trainer_reader, buf_size=5000), batch_size=batch_size) # 保证保存模型的目录是存在的 father_path = save_parameters_name[:save_parameters_name.rfind("/")] if not os.path.exists(father_path): os.makedirs(father_path) # 指定每条数据和padd.layer.data的对应关系 feeding = {"image": 0, "label": 1} train_title = "Train cost" error_title = "Error" cost_ploter = Ploter(train_title, error_title) # 定义训练事件,画出折线图,该事件的图可以在notebook上显示,命令行不会正常输出 def event_handler_plot(event): global step if isinstance(event, paddle.event.EndIteration): if step % 1 == 0: cost_ploter.append(train_title, step, event.cost) # cost_ploter.append(error_title, step, event.metrics['classification_error_evaluator']) cost_ploter.plot() step += 1 if step % 100 == 0: # 保存训练好的参数 with open(save_parameters_name, 'w') as f: trainer.save_parameter_to_tar(f) ''' 开始训练 reader 训练数据 num_passes 训练的轮数 event_handler 训练的事件,比如在训练的时候要做一些什么事情 feeding 说明每条数据和padd.layer.data的对应关系 ''' trainer.train(reader=reader, num_passes=num_passes, event_handler=event_handler_plot, feeding=feeding) if __name__ == '__main__': paddle.init(use_gpu=True, trainer_count=1) # 类别总数 type_size = 10575 # 图片大小 imageSize = 128 # 中心裁剪大小 crop_size = 128 # Batch Size batch_size = 256 # 保存的model路径 parameters_path = "/home/test/model.tar" # 数据的大小 datadim = imageSize * imageSize paddleUtil = PaddleUtil() # *******************************开始训练************************************** myReader = MyReader(imageSize=imageSize, type_size=type_size, center_crop_size=crop_size) trainer_reader = myReader.train_reader(train_list="/home/test/train_set/trainer.list") # 获取训练器 trainer = paddleUtil.get_trainer(datadim=datadim, type_size=type_size, parameters_path=None, batch_size=batch_size) paddleUtil.start_trainer(trainer=trainer, num_passes=50, save_parameters_name=parameters_path, trainer_reader=trainer_reader, batch_size=batch_size)预测经过上面的训练之后,获得得到了一个训练好的模型,我们将会使用这个模型来进行人脸对比和人脸识别。
人脸对比人脸对比,人脸对比其实就是做普通的分类预测,但是输出的不是最后一层全连接层,而是最后一层池化层,这样输出的就是人脸的特征,然后使用对角余弦函数来计算他们的相似度。
通过人脸对比的方式实现一些场景的应用。比如对比证件上的人脸和真实的人脸是否为同一个人,操作方式判断人脸相似度的result是否达到预设值,推荐相似度为0.8时,为同一个人。利用这种的人脸对比方式,有可以实现人脸识别。 首先我们可以把人脸以注册人脸的方式加入到注册人脸库中,加关联到该人脸的信息;然后要进行识别时,把要识别的人脸和已注册的人脸库中的人脸进行对比,当对比为识别为同一个人脸,就算识别成功这样的处理方式好处是,不需要每次增加新的用户时,需要收集大量该用户的人脸,只有收集一张或者多张多角度的人脸,完全可以使用同一个模型进行人脸对比。# 预测代码 import numpy as np import paddle.v2 as paddle import os import cv2 import math from sklearn import preprocessing # 获取参数 def get_parameters(parameters_path): with open(parameters_path, 'r') as f: parameters = paddle.parameters.Parameters.from_tar(f) return parameters # 获取预测器 def get_inference(parameters, fea): inferer = paddle.inference.Inference(output_layer=fea, parameters=parameters) return inferer # 预处理图片 def load_image(file, imageSize): img = cv2.imread(file, 0) img = np.reshape(img, [img.shape[0], img.shape[1], 1]) img = paddle.image.center_crop(img, 128, is_color=False) img = cv2.resize(img, (imageSize, imageSize)).flatten() return img # 使用训练好的参数进行预测 def to_prediction(inferer, image_paths, imageSize): # 获得要预测的图片 test_data = [] for image_path in image_paths: test_data.append([load_image(image_path, imageSize)]) # 获得预测结果 probs = inferer.infer(input=test_data) # 获取两个图片的预测输出 prob1 = probs[0] prob2 = probs[1] # 对角余弦值 dist = np.dot(prob1, prob2) / (np.linalg.norm(prob1) * np.linalg.norm(prob2)) return dist if __name__ == '__main__': paddle.init(use_gpu=True, trainer_count=1) # 类别总数 type_size = 10575 # 图片大小 imageSize = 128 # 保存的model路径 parameters_path = "/home/test/model.tar" # 数据的大小 datadim = imageSize * imageSize # 获取预测器 parameters = get_parameters(parameters_path=parameters_path) image = paddle.layer.data(name="image", type=paddle.data_type.dense_vector(datadim)) fea, out = resnet(image, class_dim=type_size) inferer = get_inference(parameters=parameters, fea=fea) image_path = [] image_path1, image_path2 = "/home/test/0.jpg", "/home/test/1.jpg" image_path.append(image_path1) image_path.append(image_path2) # 得到两张图的相似度 result = to_prediction(inferer=inferer, image_paths=image_path, imageSize=imageSize) print("两张图像的相似度为:" + result )人脸识别这个是人脸识别方式是不推荐使用的,它就是一个分类的操作,输入一张人脸图片,获取对应的人脸的label和概率。 但是如果要加入新的人脸,需要收集大量该用户的人脸,并再次进行训练,得到新的模型。 这样的识别方式,扩展性非常弱,但是识别速度比较快,不需要每张人脸都进行对比。
# 预测代码 import cv2 import numpy as np import paddle.v2 as paddle # 获取参数 def get_parameters(parameters_path): with open(parameters_path, 'r') as f: parameters = paddle.parameters.Parameters.from_tar(f) return parameters # 获取预测器 def get_inference(parameters, fea): inferer = paddle.inference.Inference(output_layer=fea, parameters=parameters) return inferer # 预处理图片 def load_image(file, imageSize): img = cv2.imread(file, 0) img = np.reshape(img, [img.shape[0], img.shape[1], 1]) img = paddle.image.center_crop(img, 128, is_color=False) img = cv2.resize(img, (imageSize, imageSize)).flatten() return img # 使用训练好的参数进行预测 def to_prediction(inferer, image_paths, imageSize): # 获得要预测的图片 test_data = [] test_data.append([load_image(image_path, imageSize)]) # 获得预测结果 probs = inferer.infer(input=test_data) # 处理预测结果 lab = np.argsort(-probs) # 返回概率最大的值和其对应的概率值 return lab[0][0], probs[0][(lab[0][0])] if __name__ == '__main__': paddle.init(use_gpu=True, trainer_count=1) # 类别总数 type_size = 10575 # 图片大小 imageSize = 128 # 保存的model路径 parameters_path = "/home/test/model.tar" # 数据的大小 datadim = imageSize * imageSize # 获取预测器 parameters = get_parameters(parameters_path=parameters_path) image = paddle.layer.data(name="image", type=paddle.data_type.dense_vector(datadim)) fea, out = resnet(image, class_dim=type_size) inferer = get_inference(parameters=parameters, fea=out) image_path = "/home/test/0.jpg" # 获取人脸对比的label和概率 result, probability = to_prediction(inferer=inferer, image_paths=image_path, imageSize=imageSize) print('预测结果为:%d,可信度为:%f' % (result, probability)) ---来自腾讯云社区的---夜雨飘零
微信扫一扫打赏
支付宝扫一扫打赏