Whose kitty are you?
Classmate Xiaoming: The kitten from Ergouzi's family keeps coming over to steal food, and my Xiaohuamao is going hungry. What do I do???
What do I do??? Identifying individual pets is something only university research groups can manage. What is a kindergartner like me supposed to do???
Xiaoming scratched his head in despair and let out an anguished "aaaah"...
Then Xiaoming came across a news report: "Monkey facial recognition technology is here!" new.qq.com/…/2021022…
Don't worry, kindergartner Xiaoming. I'll teach you to use the MegEngine deep learning framework to recognize kittens, so any kitten it doesn't know gets turned away.
1. Data collection
All the cat videos were collected from publicly available footage, and the kitten face photos are screenshots taken from those videos, so no separate photography was needed.
!unzip -q data/data71411/cat.zip
replace 1.mp4? [y]es, [n]o, [A]ll, [N]one, [r]ename: ^C
1.1 Use Python and OpenCV to grab one frame per second from each video, numbering and saving the frames.
import cv2
import os

for i in range(1, 5):
    # create a directory for this video's frames
    print(i)
    mp4_file = str(i) + '.mp4'
    dir_path = os.path.join('dataset', str(i))
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
    # save one frame per second
    vidcap = cv2.VideoCapture(mp4_file)
    success, image = vidcap.read()
    fps = int(vidcap.get(cv2.CAP_PROP_FPS))
    count = 0
    while success:
        if count % fps == 0:
            cv2.imwrite("{}/{}.jpg".format(dir_path, int(count / fps)), image)
            print('Processed second %d:' % int(count / fps), success)
        success, image = vidcap.read()
        count += 1
1.2 Processing the generated images
Remove abnormal images, such as frames of end credits.
Manually...
import matplotlib.pyplot as plt
%matplotlib inline
import cv2 as cv
import numpy as np

# display inside the Jupyter notebook
def visualize_images():
    img = cv.imread('dataset/1/1.jpg')  # note: cv.imread returns BGR, so colors may look shifted in matplotlib
    plt.imshow(img)
    plt.show()

visualize_images()
1.3 A look at the dataset
Four different kittens.
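To eyeball the four classes, a small sketch like the following displays the first saved frame of each kitten (assuming the dataset/1 … dataset/4 folders produced in step 1.1):

import cv2 as cv
import matplotlib.pyplot as plt

plt.figure(figsize=(12, 3))
for i in range(1, 5):
    img = cv.imread('dataset/{}/0.jpg'.format(i))   # first saved frame of kitten i
    plt.subplot(1, 4, i)
    plt.imshow(cv.cvtColor(img, cv.COLOR_BGR2RGB))  # OpenCV loads BGR; convert for matplotlib
    plt.title('cat {}'.format(i))
    plt.axis('off')
plt.show()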
1.4 Generating the data lists
For a custom dataset, first build image lists: split the images into a training set and a test set and attach labels. The program below is self-contained: pass it the path of a top-level folder and it walks every subcategory inside, producing lists in a fixed format. Here the root directory holding the cat-face categories, cat, is passed in via data_root_path. The run leaves three files behind in the target directory: readme.json, train.txt and test.txt.
import os
import json

# root folder whose subfolders are the classes
data_root_path = 'cat'
# per-class metadata
class_detail = []
# folder names of all classes, here ['1', '2', '3', '4']
class_dirs = os.listdir(data_root_path)
# class label counter
class_label = 0
# name of the parent directory (recorded in readme.json)
father_paths = data_root_path.split('/')
while True:
    if father_paths[-1] == '':
        del father_paths[-1]
    else:
        break
father_path = father_paths[-1]

data_list_path = './'
# clear out any previous lists
with open("test.txt", 'w') as f:
    pass
with open("train.txt", 'w') as f:
    pass

# total number of images
all_class_images = 0
# walk every class
for class_dir in class_dirs:
    # metadata for this class
    class_detail_list = {}
    test_sum = 0
    trainer_sum = 0
    # number of images in this class
    class_sum = 0
    # class folder path
    path = data_root_path + "/" + class_dir
    # all images of this class
    img_paths = os.listdir(path)
    for img_path in img_paths:  # iterate over the images in the folder
        name_path = path + '/' + img_path  # path of one image
        if class_sum % 10 == 0:  # every 10th image goes to the test set
            test_sum += 1  # number of test images
            with open(data_list_path + "test.txt", 'a') as f:
                f.write(name_path + "\t%d" % class_label + "\n")  # class_label: 0,1,2,...
        else:
            trainer_sum += 1  # number of training images
            with open(data_list_path + "train.txt", 'a') as f:
                f.write(name_path + "\t%d" % class_label + "\n")  # class_label: 0,1,2,...
        class_sum += 1  # images in this class
        all_class_images += 1  # images over all classes
    # class entry for the readme.json description file
    class_detail_list['class_name'] = class_dir  # class name
    class_detail_list['class_label'] = class_label  # class label
    class_detail_list['class_test_images'] = test_sum  # test images of this class
    class_detail_list['class_trainer_images'] = trainer_sum  # training images of this class
    class_detail.append(class_detail_list)
    class_label += 1

# number of classes
all_class_sum = len(class_dirs)
# global description for readme.json
readjson = {}
readjson['all_class_name'] = father_path  # parent directory name
readjson['all_class_sum'] = all_class_sum
readjson['all_class_images'] = all_class_images
readjson['class_detail'] = class_detail
jsons = json.dumps(readjson, sort_keys=True, indent=4, separators=(',', ': '))
with open(data_list_path + "readme.json", 'w') as f:
    f.write(jsons)
print('Data list generation complete!')
Data list generation complete!
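For reference, each line in the generated train.txt / test.txt is an image path and its class label separated by a tab. Illustrative contents (the actual paths depend on your folders):

cat/1/1.jpg	0
cat/1/2.jpg	0
cat/2/1.jpg	1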
1.5 Building the dataset
import numpy as np
import megengine.data.transform as T
from megengine.data.dataset import Dataset
from PIL import Image

class MiaoMiaoDataset(Dataset):
    """
    Definition of the 4-class kitten dataset
    """
    def __init__(self, mode='train'):
        """
        Read the image list for the given split
        """
        self.data = []
        with open('{}.txt'.format(mode)) as f:
            for line in f.readlines():
                info = line.strip().split('\t')
                if len(info) > 0:
                    self.data.append([info[0].strip(), info[1].strip()])
        if mode == 'train':
            self.transforms = T.Compose([
                T.Resize((224, 224)),
                T.RandomHorizontalFlip(0.5),  # random horizontal flip
                T.ToMode("CHW"),              # layout conversion HWC => CHW
                T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # normalization
            ])
        else:
            self.transforms = T.Compose([
                T.Resize((224, 224)),         # resize
                # T.RandomCrop(IMAGE_SIZE),   # random crop
                T.ToMode("CHW"),              # layout conversion HWC => CHW
                T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # normalization
            ])

    def get_origin_data(self):
        return self.data

    def __getitem__(self, index):
        """
        Fetch a single sample by index
        """
        image_file, label = self.data[index]
        image = Image.open(image_file)
        if image.mode != 'RGB':
            image = image.convert('RGB')
        image = self.transforms(np.array(image))  # MegEngine transforms operate on numpy arrays
        return image, np.array(label, dtype='int64')

    def __len__(self):
        """
        Total number of samples
        """
        return len(self.data)

train_dataset = MiaoMiaoDataset(mode='train')
test_dataset = MiaoMiaoDataset(mode='test')
print('train_data len: {}, test_data len: {}'.format(len(train_dataset), len(test_dataset)))
train_data len: 45, test_data len:7
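A quick sanity check on one sample confirms the transform pipeline produces CHW tensors of the expected size (this check is an addition, not part of the original listing):

sample_image, sample_label = train_dataset[0]
print(sample_image.shape, sample_label)  # expected: (3, 224, 224) and an int64 label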
2. Model definition
The data are now split into training and test sets, and we know the number of classes.
Next we define the model, revisiting the ResNet family of networks.
import math
import megengine.functional as F
import megengine.hub as hub
import megengine.module as M

class BasicBlock(M.Module):
    expansion = 1

    def __init__(
        self,
        in_channels,
        channels,
        stride=1,
        groups=1,
        base_width=64,
        dilation=1,
        norm=M.BatchNorm2d,
    ):
        super().__init__()
        if groups != 1 or base_width != 64:
            raise ValueError("BasicBlock only supports groups=1 and base_width=64")
        if dilation > 1:
            raise NotImplementedError("Dilation > 1 not supported in BasicBlock")
        self.conv1 = M.Conv2d(
            in_channels, channels, 3, stride, padding=dilation, bias=False
        )
        self.bn1 = norm(channels)
        self.conv2 = M.Conv2d(channels, channels, 3, 1, padding=1, bias=False)
        self.bn2 = norm(channels)
        self.downsample = (
            M.Identity()
            if in_channels == channels and stride == 1
            else M.Sequential(
                M.Conv2d(in_channels, channels, 1, stride, bias=False), norm(channels),
            )
        )

    def forward(self, x):
        identity = x
        x = self.conv1(x)
        x = self.bn1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        identity = self.downsample(identity)
        x += identity
        x = F.relu(x)
        return x
class Bottleneck(M.Module):
    expansion = 4

    def __init__(
        self,
        in_channels,
        channels,
        stride=1,
        groups=1,
        base_width=64,
        dilation=1,
        norm=M.BatchNorm2d,
    ):
        super().__init__()
        width = int(channels * (base_width / 64.0)) * groups
        self.conv1 = M.Conv2d(in_channels, width, 1, 1, bias=False)
        self.bn1 = norm(width)
        self.conv2 = M.Conv2d(
            width,
            width,
            3,
            stride,
            padding=dilation,
            groups=groups,
            dilation=dilation,
            bias=False,
        )
        self.bn2 = norm(width)
        self.conv3 = M.Conv2d(width, channels * self.expansion, 1, 1, bias=False)
        self.bn3 = norm(channels * self.expansion)
        self.downsample = (
            M.Identity()
            if in_channels == channels * self.expansion and stride == 1
            else M.Sequential(
                M.Conv2d(in_channels, channels * self.expansion, 1, stride, bias=False),
                norm(channels * self.expansion),
            )
        )

    def forward(self, x):
        identity = x
        x = self.conv1(x)
        x = self.bn1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = F.relu(x)
        x = self.conv3(x)
        x = self.bn3(x)
        identity = self.downsample(identity)
        x += identity
        x = F.relu(x)
        return x
class ResNet(M.Module):
    def __init__(
        self,
        block,
        layers,
        num_classes=1000,
        zero_init_residual=False,
        groups=1,
        width_per_group=64,
        replace_stride_with_dilation=None,
        norm=M.BatchNorm2d,
    ):
        super().__init__()
        self.in_channels = 64
        self.dilation = 1
        if replace_stride_with_dilation is None:
            # each element in the tuple indicates if we should replace
            # the 2x2 stride with a dilated convolution instead
            replace_stride_with_dilation = [False, False, False]
        if len(replace_stride_with_dilation) != 3:
            raise ValueError(
                "replace_stride_with_dilation should be None "
                "or a 3-element tuple, got {}".format(replace_stride_with_dilation)
            )
        self.groups = groups
        self.base_width = width_per_group
        self.conv1 = M.Conv2d(
            3, self.in_channels, kernel_size=7, stride=2, padding=3, bias=False
        )
        self.bn1 = norm(self.in_channels)
        self.maxpool = M.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, layers[0], norm=norm)
        self.layer2 = self._make_layer(
            block,
            128,
            layers[1],
            stride=2,
            dilate=replace_stride_with_dilation[0],
            norm=norm,
        )
        self.layer3 = self._make_layer(
            block,
            256,
            layers[2],
            stride=2,
            dilate=replace_stride_with_dilation[1],
            norm=norm,
        )
        self.layer4 = self._make_layer(
            block,
            512,
            layers[3],
            stride=2,
            dilate=replace_stride_with_dilation[2],
            norm=norm,
        )
        self.fc = M.Linear(512 * block.expansion, num_classes)

        for m in self.modules():
            if isinstance(m, M.Conv2d):
                M.init.msra_normal_(m.weight, mode="fan_out", nonlinearity="relu")
                if m.bias is not None:
                    fan_in, _ = M.init.calculate_fan_in_and_fan_out(m.weight)
                    bound = 1 / math.sqrt(fan_in)
                    M.init.uniform_(m.bias, -bound, bound)
            elif isinstance(m, M.BatchNorm2d):
                M.init.ones_(m.weight)
                M.init.zeros_(m.bias)
            elif isinstance(m, M.Linear):
                M.init.msra_uniform_(m.weight, a=math.sqrt(5))
                if m.bias is not None:
                    fan_in, _ = M.init.calculate_fan_in_and_fan_out(m.weight)
                    bound = 1 / math.sqrt(fan_in)
                    M.init.uniform_(m.bias, -bound, bound)

        # Zero-initialize the last BN in each residual branch,
        # so that the residual branch starts with zeros, and each residual block
        # behaves like an identity. According to https://arxiv.org/abs/1706.02677
        # This improves the model by 0.2~0.3%.
        if zero_init_residual:
            for m in self.modules():
                if isinstance(m, Bottleneck):
                    M.init.zeros_(m.bn3.weight)
                elif isinstance(m, BasicBlock):
                    M.init.zeros_(m.bn2.weight)

    def _make_layer(
        self, block, channels, blocks, stride=1, dilate=False, norm=M.BatchNorm2d
    ):
        previous_dilation = self.dilation
        if dilate:
            self.dilation *= stride
            stride = 1
        layers = []
        layers.append(
            block(
                self.in_channels,
                channels,
                stride,
                groups=self.groups,
                base_width=self.base_width,
                dilation=previous_dilation,
                norm=norm,
            )
        )
        self.in_channels = channels * block.expansion
        for _ in range(1, blocks):
            layers.append(
                block(
                    self.in_channels,
                    channels,
                    groups=self.groups,
                    base_width=self.base_width,
                    dilation=self.dilation,
                    norm=norm,
                )
            )
        return M.Sequential(*layers)

    def extract_features(self, x):
        outputs = {}
        x = self.conv1(x)
        x = self.bn1(x)
        x = F.relu(x)
        x = self.maxpool(x)
        outputs["stem"] = x
        x = self.layer1(x)
        outputs["res2"] = x
        x = self.layer2(x)
        outputs["res3"] = x
        x = self.layer3(x)
        outputs["res4"] = x
        x = self.layer4(x)
        outputs["res5"] = x
        return outputs

    def forward(self, x):
        x = self.extract_features(x)["res5"]
        x = F.avg_pool2d(x, 7)
        x = F.flatten(x, 1)
        x = self.fc(x)
        return x
@hub.pretrained(
    "https://data.megengine.org.cn/models/weights/resnet18_naiveaug_70312_78a63ca6.pkl"
)
def resnet18(**kwargs):
    r"""ResNet-18 model from
    `"Deep Residual Learning for Image Recognition" <https://arxiv.org/pdf/1512.03385.pdf>`_
    """
    return ResNet(BasicBlock, [2, 2, 2, 2], **kwargs)

@hub.pretrained(
    "https://data.megengine.org.cn/models/weights/resnet34_naiveaug_73960_fd9d869d.pkl"
)
def resnet34(**kwargs):
    r"""ResNet-34 model from
    `"Deep Residual Learning for Image Recognition" <https://arxiv.org/pdf/1512.03385.pdf>`_
    """
    return ResNet(BasicBlock, [3, 4, 6, 3], **kwargs)

@hub.pretrained(
    "https://data.megengine.org.cn/models/weights/resnet50_fbaug_76254_4e14b7d1.pkl"
)
def resnet50(**kwargs):
    r"""ResNet-50 model from
    `"Deep Residual Learning for Image Recognition" <https://arxiv.org/pdf/1512.03385.pdf>`_
    """
    return ResNet(Bottleneck, [3, 4, 6, 3], **kwargs)

@hub.pretrained(
    "https://data.megengine.org.cn/models/weights/resnet101_fbaug_77944_b7932921.pkl"
)
def resnet101(**kwargs):
    r"""ResNet-101 model from
    `"Deep Residual Learning for Image Recognition" <https://arxiv.org/pdf/1512.03385.pdf>`_
    """
    return ResNet(Bottleneck, [3, 4, 23, 3], **kwargs)

@hub.pretrained(
    "https://data.megengine.org.cn/models/weights/resnet152_fbaug_78582_7551aff3.pkl"
)
def resnet152(**kwargs):
    r"""ResNet-152 model from
    `"Deep Residual Learning for Image Recognition" <https://arxiv.org/pdf/1512.03385.pdf>`_
    """
    return ResNet(Bottleneck, [3, 8, 36, 3], **kwargs)

@hub.pretrained(
    "https://data.megengine.org.cn/models/weights/resnext50_32x4d_fbaug_77592_c4b04e5e.pkl"
)
def resnext50_32x4d(**kwargs):
    r"""ResNeXt-50 32x4d model from
    `"Aggregated Residual Transformation for Deep Neural Networks"
    <https://arxiv.org/pdf/1611.05431.pdf>`_

    Args:
        pretrained (bool): If True, returns a model pre-trained on ImageNet
        progress (bool): If True, displays a progress bar of the download to stderr
    """
    kwargs["groups"] = 32
    kwargs["width_per_group"] = 4
    return ResNet(Bottleneck, [3, 4, 6, 3], **kwargs)

@hub.pretrained(
    "https://data.megengine.org.cn/models/weights/resnext101_32x8d_fbaug_79520_80efb344.pkl"
)
def resnext101_32x8d(**kwargs):
    r"""ResNeXt-101 32x8d model from
    `"Aggregated Residual Transformation for Deep Neural Networks"
    <https://arxiv.org/pdf/1611.05431.pdf>`_

    Args:
        pretrained (bool): If True, returns a model pre-trained on ImageNet
        progress (bool): If True, displays a progress bar of the download to stderr
    """
    kwargs["groups"] = 32
    kwargs["width_per_group"] = 8
    return ResNet(Bottleneck, [3, 4, 23, 3], **kwargs)
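For our 4-kitten task the 1000-way ImageNet head is more than we need. A minimal sketch of adapting the network, assuming we fine-tune from the pretrained weights that @hub.pretrained loads when called with pretrained=True:

import megengine.module as M

model = resnet18(pretrained=True)                   # ImageNet weights from the URL above
model.fc = M.Linear(512 * BasicBlock.expansion, 4)  # replace the classifier head with 4 outputs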
import time
import logging
import megengine
import megengine.distributed as dist

class AverageMeter:
    """Computes and stores the average and current value"""

    def __init__(self, name, fmt=":.3f"):
        self.name = name
        self.fmt = fmt
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

    def __str__(self):
        fmtstr = "{name} {val" + self.fmt + "} ({avg" + self.fmt + "})"
        return fmtstr.format(**self.__dict__)

def valid(func, data_queue, args):
    objs = AverageMeter("Loss")
    top1 = AverageMeter("Acc@1")
    top5 = AverageMeter("Acc@5")
    clck = AverageMeter("Time")
    t = time.time()
    for step, (image, label) in enumerate(data_queue):
        image = megengine.tensor(image, dtype="float32")
        label = megengine.tensor(label, dtype="int32")
        n = image.shape[0]
        loss, acc1, acc5 = func(image, label)
        objs.update(loss.item(), n)
        top1.update(100 * acc1.item(), n)
        top5.update(100 * acc5.item(), n)
        clck.update(time.time() - t, n)
        t = time.time()
        if step % args.print_freq == 0 and dist.get_rank() == 0:
            logging.info("Test step %d, %s %s %s %s", step, objs, top1, top5, clck)
    return objs.avg, top1.avg, top5.avg
3. Model training
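The worker() function below calls a build_dataset() helper that the article never shows. Here is a minimal sketch of what it might look like, wrapping the MiaoMiaoDataset from section 1.5 in MegEngine dataloaders; the helper's name and signature are assumptions inferred from the call site:

from megengine.data import DataLoader, Infinite, RandomSampler, SequentialSampler

def build_dataset(args):
    train_dataset = MiaoMiaoDataset(mode='train')
    test_dataset = MiaoMiaoDataset(mode='test')
    # transforms already run inside MiaoMiaoDataset.__getitem__;
    # Infinite wraps the sampler so iter(train_dataloader) never runs out,
    # matching the "infinite" comment in worker() below
    train_dataloader = DataLoader(
        train_dataset,
        sampler=Infinite(RandomSampler(train_dataset, batch_size=args.batch_size, drop_last=True)),
    )
    valid_dataloader = DataLoader(
        test_dataset,
        sampler=SequentialSampler(test_dataset, batch_size=args.batch_size),
    )
    return train_dataloader, valid_dataloader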
import os
import time
import logging
import megengine
import megengine.autodiff as autodiff
import megengine.distributed as dist
import megengine.functional as F
import megengine.optimizer as optim

def worker(args):
    # pylint: disable=too-many-statements
    if dist.get_rank() == 0:
        os.makedirs(os.path.join(args.save, args.arch), exist_ok=True)
        megengine.logger.set_log_file(os.path.join(args.save, args.arch, "log.txt"))

    # build dataset
    train_dataloader, valid_dataloader = build_dataset(args)
    train_queue = iter(train_dataloader)  # infinite
    steps_per_epoch = 1280000 // (dist.get_world_size() * args.batch_size)

    # build model: args.arch names one of the constructors defined above,
    # e.g. "resnet18" (pass num_classes there to match your number of kittens)
    model = globals()[args.arch]()

    # sync parameters and buffers across processes
    if dist.get_world_size() > 1:
        dist.bcast_list_(model.parameters())
        dist.bcast_list_(model.buffers())

    # autodiff gradient manager
    gm = autodiff.GradManager().attach(
        model.parameters(),
        callbacks=dist.make_allreduce_cb("mean") if dist.get_world_size() > 1 else None,
    )

    # optimizer: apply weight decay only to multi-dimensional weights
    params_wd = []
    params_nwd = []
    for n, p in model.named_parameters():
        if n.find("weight") >= 0 and len(p.shape) > 1:
            print("include ", n, p.shape)
            params_wd.append(p)
        else:
            print("NOT include ", n, p.shape)
            params_nwd.append(p)
    opt = optim.SGD(
        [
            {"params": params_wd},
            {"params": params_nwd, "weight_decay": 0},
        ],
        lr=args.lr * dist.get_world_size(),
        momentum=args.momentum,
        weight_decay=args.weight_decay,
    )

    # train and valid functions
    def train_step(image, label):
        with gm:
            logits = model(image)
            loss = F.nn.cross_entropy(logits, label, label_smooth=0.1)
            acc1, acc5 = F.topk_accuracy(logits, label, topk=(1, 5))
            gm.backward(loss)
            opt.step().clear_grad()
        return loss, acc1, acc5

    def valid_step(image, label):
        logits = model(image)
        loss = F.nn.cross_entropy(logits, label, label_smooth=0.1)
        acc1, acc5 = F.topk_accuracy(logits, label, topk=(1, 5))
        # calculate mean values across workers
        if dist.get_world_size() > 1:
            loss = F.distributed.all_reduce_sum(loss) / dist.get_world_size()
            acc1 = F.distributed.all_reduce_sum(acc1) / dist.get_world_size()
            acc5 = F.distributed.all_reduce_sum(acc5) / dist.get_world_size()
        return loss, acc1, acc5

    # linear learning rate scheduler
    def adjust_learning_rate(step):
        lr = args.lr * dist.get_world_size() * (1 - step / (args.epochs * steps_per_epoch))
        for param_group in opt.param_groups:
            param_group["lr"] = lr
        return lr

    # start training
    objs = AverageMeter("Loss")
    top1 = AverageMeter("Acc@1")
    top5 = AverageMeter("Acc@5")
    clck = AverageMeter("Time")
    for step in range(0, args.epochs * steps_per_epoch):
        lr = adjust_learning_rate(step)
        t = time.time()
        image, label = next(train_queue)
        image = megengine.tensor(image, dtype="float32")
        label = megengine.tensor(label, dtype="int32")
        loss, acc1, acc5 = train_step(image, label)
        objs.update(loss.item())
        top1.update(100 * acc1.item())
        top5.update(100 * acc5.item())
        clck.update(time.time() - t)
        if step % args.print_freq == 0 and dist.get_rank() == 0:
            logging.info(
                "Epoch %d Step %d, LR %.4f, %s %s %s %s",
                step // steps_per_epoch,
                step,
                lr,
                objs,
                top1,
                top5,
                clck,
            )
            objs.reset()
            top1.reset()
            top5.reset()
            clck.reset()
        if (step + 1) % steps_per_epoch == 0:
            model.eval()
            _, valid_acc1, valid_acc5 = valid(valid_step, valid_dataloader, args)
            model.train()
            logging.info(
                "Epoch %d Test Acc@1 %.3f, Acc@5 %.3f",
                (step + 1) // steps_per_epoch,
                valid_acc1,
                valid_acc5,
            )
            if dist.get_rank() == 0:
                megengine.save(
                    {
                        "epoch": (step + 1) // steps_per_epoch,
                        "state_dict": model.state_dict(),
                    },
                    os.path.join(args.save, args.arch, "checkpoint.pkl"),
                )
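The article never shows how worker() is launched. A minimal sketch, under the assumption that the hyperparameter names mirror the attributes worker() reads from args:

import argparse
import megengine.distributed as dist

parser = argparse.ArgumentParser()
parser.add_argument("--arch", default="resnet18")  # any constructor defined above
parser.add_argument("--save", default="./output")
parser.add_argument("--batch-size", type=int, default=16)
parser.add_argument("--lr", type=float, default=0.0125)
parser.add_argument("--momentum", type=float, default=0.9)
parser.add_argument("--weight-decay", type=float, default=1e-4)
parser.add_argument("--epochs", type=int, default=10)
parser.add_argument("--print-freq", type=int, default=10)
parser.add_argument("--ngpus", type=int, default=1)
args = parser.parse_args()

if args.ngpus > 1:
    dist.launcher(worker)(args)  # spawns one process per visible GPU
else:
    worker(args)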
4. Prediction
Run predictions on the test_dataset data.
print('Test set size: {}'.format(len(test_dataset)))
Test set size: 7
# run prediction
result = model.predict(test_dataset)
Predict begin...
step 7/7 [==============================] - 32ms/step
Predict samples: 7
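model.predict() above is a high-level convenience API kept from the original article. With a bare MegEngine module, the same per-sample predictions can be obtained explicitly; a sketch, assuming model is the trained network:

import megengine
import megengine.functional as F
import numpy as np

model.eval()
predictions = []
for i in range(len(test_dataset)):
    image, label = test_dataset[i]
    batch = megengine.tensor(np.expand_dims(image, 0), dtype="float32")  # add batch dim
    logits = model(batch)
    predictions.append(int(F.argmax(logits, axis=1).item()))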
# print the 7 test results
for idx in range(7):
    predict_label = str(np.argmax(result[0][idx]))
    real_label = str(test_dataset[idx][1])
    print('Sample ID: {}, true label: {}, predicted: {}'.format(idx, real_label, predict_label))
Sample ID: 0, true label: 0, predicted: 0
Sample ID: 1, true label: 0, predicted: 0
Sample ID: 2, true label: 2, predicted: 2
Sample ID: 3, true label: 3, predicted: 3
Sample ID: 4, true label: 3, predicted: 3
Sample ID: 5, true label: 4, predicted: 0
Sample ID: 6, true label: 4, predicted: 1
# plotting helper
from PIL import Image
import matplotlib.font_manager as font_manager
import matplotlib.pyplot as plt
%matplotlib inline

fontpath = 'MINGHEI_R.TTF'
font = font_manager.FontProperties(fname=fontpath, size=10)

def show_img(img, predict):
    plt.figure()
    plt.title(predict, fontproperties=font)
    plt.imshow(img, cmap=plt.cm.binary)
    plt.show()
# show the test samples with their predictions
origin_data = test_dataset.get_origin_data()
for i in range(7):
    img_path = origin_data[i][0]
    real_label = str(origin_data[i][1])
    predict_label = str(np.argmax(result[0][i]))
    img = Image.open(img_path)
    title = 'Sample ID: {}, true label: {}, predicted: {}'.format(i, real_label, predict_label)
    show_img(img, title)