Task categories:
- Object detection: detect objects and return bounding boxes
- Image classification and image retrieval
- Super-resolution reconstruction
- Medical imaging
- OCR
- Autonomous driving
- Face recognition
A simple MNIST model
import torch
class Config(object):
def __init__(self):
self.device_name = "cuda:0" if torch.cuda.is_available() else "cpu"
self.device = torch.device(self.device_name)
def get_status(self):
print('torch version:', torch.__version__)
        print(f'Using {self.device_name}. Number of available GPUs = {torch.cuda.device_count()}')
config = Config()
config.get_status()
# %%
import torch.utils.data
from torchvision import transforms, datasets
from torch import nn
input_size, num_classes = 28, 10
num_epochs, batch_size = 3, 64
# %%
# datasets.MNIST could be used directly as the Dataset, but for generality we only use it here as a data downloader
train_dataset_tmp = datasets.MNIST(root='./data', train=True, download=True)
test_dataset_tmp = datasets.MNIST(root='./data', train=False, download=True)
train_data, train_targets = train_dataset_tmp.data.reshape(-1, 1, 28, 28).float(), train_dataset_tmp.targets
test_data, test_targets = test_dataset_tmp.data.reshape(-1, 1, 28, 28).float(), test_dataset_tmp.targets
class MyTransformer(object):
def __init__(self):
pass
def __call__(self, data):
return torch.tensor(data).to(config.device)
class TargetTransformer(object):
def __init__(self):
pass
def __call__(self, label):
return torch.tensor(label).to(config.device)
my_transformer = MyTransformer()
target_transformer = TargetTransformer()
class MyDataSet(torch.utils.data.Dataset):
def __init__(self, data, targets, transform, target_transform):
self.data, self.targets = data, targets
self.transform = transform
self.target_transform = target_transform
def __len__(self):
return len(self.targets)
def __getitem__(self, idx):
return self.transform(self.data[idx]), self.target_transform(self.targets[idx])
train_dataset = MyDataSet(data=train_data, targets=train_targets,
transform=my_transformer, target_transform=target_transformer)
test_dataset = MyDataSet(data=test_data, targets=test_targets,
transform=my_transformer, target_transform=target_transformer)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=True)
# %% Build the model
class MyCNN(nn.Module):
def __init__(self):
super(MyCNN, self).__init__()
self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels=1  # grayscale input, 1 channel
                      , out_channels=16, kernel_size=5, stride=1
                      , padding=2)  # to keep the output size equal to the input (with stride=1), use padding = (kernel_size-1)/2; here kernel_size=5, so padding=2 keeps 28x28
, nn.ReLU()
, nn.MaxPool2d(kernel_size=2)
)
self.conv2 = nn.Sequential(
nn.Conv2d(in_channels=16, out_channels=32, kernel_size=5, stride=1, padding=2)
, nn.ReLU()
, nn.MaxPool2d(2)
)
        # the last layer is a fully connected layer
self.out = nn.Linear(32 * 7 * 7, 10)
def forward(self, x):
x = self.conv1(x)
x = self.conv2(x)
        x = x.view(x.size(0), -1)  # flatten to (batch_size, 32*7*7)
output = self.out(x)
return output
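The 32 * 7 * 7 input size of the final linear layer comes from the two stride-2 max-poolings applied to a 28x28 input (28 -> 14 -> 7) with 32 output channels. A quick shape probe (illustrative only, not part of the original script):
probe = MyCNN()                                   # CPU copy just for checking shapes
dummy = torch.zeros(1, 1, 28, 28)                 # one fake grayscale image
print(probe.conv1(dummy).shape)                   # torch.Size([1, 16, 14, 14])
print(probe.conv2(probe.conv1(dummy)).shape)      # torch.Size([1, 32, 7, 7]) -> flattened to 32*7*7
print(probe(dummy).shape)                         # torch.Size([1, 10])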
# %% Training
my_cnn = MyCNN().to(config.device)
criterion = nn.CrossEntropyLoss()
# optimizer
optimizer = torch.optim.Adam(my_cnn.parameters())
def accuracy(predictions, labels):
pred = torch.max(predictions.data, 1)[1]
rights = pred.eq(labels.data.view_as(pred)).sum()
return rights, len(labels)
for epoch in range(num_epochs):
    # accumulate the results of the current epoch
train_rights = []
for batch_idx, (data, target) in enumerate(train_loader):
        my_cnn.train()  # switch to training mode (Dropout etc. become active)
output = my_cnn(data)
loss = criterion(output, target)
optimizer.zero_grad()
loss.backward()
optimizer.step()
right = accuracy(output, target)
train_rights.append(right)
if batch_idx % 100 == 0:
            my_cnn.eval()  # switch to evaluation mode
test_rights = []
for (data, target) in test_loader:
output = my_cnn(data)
right = accuracy(output, target)
test_rights.append(right)
            # compute accuracies
train_r = (sum(tup[0] for tup in train_rights).cpu(), sum(tup[1] for tup in train_rights))
test_r = (sum(tup[0] for tup in test_rights).cpu(), sum(tup[1] for tup in test_rights))
            print('epoch {} [{}/{} ({:.0f}%)] loss = {:.6f} train_acc={:.4f} test_acc={:.4f}'
.format(epoch, batch_idx * batch_size, len(train_loader.dataset), 100 * batch_idx / len(train_loader),
loss.data,
train_r[0].numpy() / train_r[1],
test_r[0].numpy() / test_r[1]
))
Not used (kept for reference):
# datasets.MNIST can produce a ready-to-use Dataset directly; the code above deliberately does not do this and only uses it as a downloader, in order to demonstrate a more general approach
train_dataset = datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True)
test_dataset = datasets.MNIST(root='./data', train=False, transform=transforms.ToTensor(), download=True)
The torchvision package
from torchvision import datasets,transforms
datasets
# classic benchmark datasets, returned as Dataset objects ready to be wrapped in a DataLoader
transforms
# image preprocessing, e.g. normalization, resizing, and image augmentation
models: classic pretrained models
https://pytorch.org/vision/stable/models.html
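A minimal sketch of how the three pieces fit together (CIFAR10 and resnet18 are just illustrative choices here, not ones used elsewhere in this note):
from torchvision import datasets, transforms, models
preprocess = transforms.Compose([transforms.Resize(256),
                                 transforms.CenterCrop(224),
                                 transforms.ToTensor()])
cifar_train = datasets.CIFAR10(root='./data', train=True, download=True, transform=preprocess)
resnet = models.resnet18(pretrained=True)  # classic pretrained model from torchvision.models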
Image classification on top of pre-trained classic models
Data preprocessing
- image preprocessing (resize, normalization, etc.) and augmentation: torchvision.transforms
- batching: DataLoader
Model training
- load a pretrained model, then do transfer learning
- the last layer is usually a fully connected layer; replace it according to your own task
- you can train all layers, train only the last few layers, or first train the last few layers and then all of them (see the sketch after this list)
- when choosing a pretrained network, the images it was trained on should ideally be similar to those of the new task
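A minimal sketch of the "freeze the backbone, replace the final fully connected layer" idea (resnet18 and the 2-class head are illustrative; the full version used below is the get_model function):
from torch import nn
from torchvision import models
backbone = models.resnet18(pretrained=True)
for p in backbone.parameters():
    p.requires_grad = False                            # freeze all pretrained layers
backbone.fc = nn.Linear(backbone.fc.in_features, 2)    # new, trainable head for a 2-class task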
Transfer learning
https://pytorch.org/tutorials/beginner/transfer_learning_tutorial.html
# from __future__ import print_function, division
import torch
from torch import nn, optim
import torch.backends.cudnn as cudnn
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
import copy
import datetime
cudnn.benchmark = True
plt.ion() # interactive mode
logger_filename = 'log{}.txt'.format(datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S'))
open(logger_filename, 'w').close()  # truncate/create the log file
def print_logger(info):
    # print the log message and also append it to the log file
info = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + ' ' + info
with open(logger_filename, 'a') as logger_file:
print(info, file=logger_file, flush=True)
print(info)
# %%
model_name = 'ResNet152' # ResNet18, ResNet152
batch_size = 64  # 8-64, depending on GPU memory
num_workers = 8
num_threads = 30  # ??? how should this value be set; it has little impact when training on a GPU
torch.set_num_threads(num_threads)
print_logger(f'batch size = {batch_size}, num_workers = {num_workers}')
train_on_gpu = torch.cuda.is_available()
if train_on_gpu:
print_logger('Training on GPU ...')
else:
print_logger('Training on CPU ...')
device = torch.device('cuda:0' if train_on_gpu else 'cpu')
# %%
data_dir = 'data/hymenoptera_data'
data_transforms = {
'train': transforms.Compose([
        # data augmentation
transforms.RandomResizedCrop(224)
        # the following transforms are optional
, transforms.RandomHorizontalFlip(p=0.5)
, transforms.RandomVerticalFlip(p=0.5)
, transforms.RandomRotation(degrees=30)
        , transforms.ColorJitter(brightness=0.2, contrast=0.1, saturation=0.1, hue=0.1)  # brightness, contrast, saturation, hue
        , transforms.RandomGrayscale(p=0.025)  # convert to grayscale with probability p; output still has 3 channels with R=G=B
, transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]),
'val': transforms.Compose([transforms.Resize(256),
        transforms.CenterCrop(224),  # center crop to the input size expected by the pretrained model
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]),
}
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x), data_transforms[x])
for x in ['train', 'val']}
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size,
shuffle=True, num_workers=num_workers)
for x in ['train', 'val']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}
class_names = image_datasets['train'].classes
num_classes = len(class_names)
# image_datasets['train']  # show dataset details
# image_datasets['train'].classes  # list of class names
# image_datasets['train'].class_to_idx  # mapping from class name to index
print_logger(f'dataset_sizes = {dataset_sizes}; class_names = {class_names}')
# %%
def imshow(inp, title=None):
inp = np.array([0.229, 0.224, 0.225]) * inp.numpy().transpose((1, 2, 0)) + np.array([0.485, 0.456, 0.406])
plt.imshow(np.clip(inp, 0, 1))
if title is not None:
plt.title(title)
plt.pause(0.001) # pause a bit so that plots are updated
inputs, classes = next(iter(dataloaders['train']))
imshow(torchvision.utils.make_grid(inputs), title=[class_names[x] for x in classes])
# %% ??? how to choose a model
# commonly used: ['resnet', 'squeezenet', 'densenet', 'inception']
# less common: ['alexnet', 'vgg']
def get_model(model_name, num_classes, train_whole):
    # train_whole: whether to also train the earlier (pretrained) layers during transfer learning
if model_name == 'ResNet18': # 45M
model_ft = models.resnet18(pretrained=True)
if not train_whole:
            # do not train the earlier layers (freeze them)
for param in model_ft.parameters():
param.requires_grad = False
num_ftrs = model_ft.fc.in_features
model_ft.fc = nn.Linear(num_ftrs, num_classes)
model_ft = model_ft.to(device)
    elif model_name == 'ResNet152':  # ResNet152: ~242M
model_ft = models.resnet152(pretrained=True)
if not train_whole:
            # do not train the earlier layers (freeze them)
for param in model_ft.parameters():
param.requires_grad = False
        num_ftrs = model_ft.fc.in_features  # input dimension of the original final layer
        # model_ft.fc = nn.Sequential(nn.Linear(num_ftrs, num_classes),
# nn.LogSoftmax(dim=1))
model_ft.fc = nn.Linear(num_ftrs, num_classes)
model_ft = model_ft.to(device)
else:
print(f'Invalid model_name {model_name}')
model_ft = None
return model_ft
model_ft = get_model(model_name, num_classes=num_classes, train_whole=False)
# %% Optimizer
num_epochs = 25
learning_rate = 0.001
criterion = nn.CrossEntropyLoss()
optimizer_ft = optim.SGD(model_ft.parameters(), lr=learning_rate, momentum=0.9)
scheduler = optim.lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)
# alternatives
# optimizer_ft = optim.Adam(params=params_to_update, lr=1e-2)
# # learning-rate decay: every 7 epochs, multiply the learning rate by 0.1
# scheduler = optim.lr_scheduler.StepLR(optimizer=optimizer_ft, step_size=7, gamma=0.1)
# # if the last layer is nn.LogSoftmax(), use nn.NLLLoss(); nn.CrossEntropyLoss() is equivalent to nn.LogSoftmax() followed by nn.NLLLoss()
# criterion = nn.NLLLoss()
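The commented-out Adam line above refers to params_to_update, which is never defined in this note; a minimal way to build it would be to collect only the parameters that get_model left trainable. The last two lines also double-check the CrossEntropyLoss = LogSoftmax + NLLLoss equivalence mentioned above (random tensors, illustrative only):
params_to_update = [p for p in model_ft.parameters() if p.requires_grad]  # e.g. only the new fc layer
# optimizer_ft = optim.Adam(params=params_to_update, lr=1e-2)
logits, labels_demo = torch.randn(4, num_classes), torch.randint(0, num_classes, (4,))
print(torch.allclose(nn.CrossEntropyLoss()(logits, labels_demo),
                     nn.NLLLoss()(nn.LogSoftmax(dim=1)(logits), labels_demo)))  # True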
# %%
def train_model(model, dataloaders, criterion, optimizer, scheduler, num_epochs=25):
since = time.time()
    # keep a copy of the best model weights
best_model_wts = copy.deepcopy(model.state_dict())
best_acc = 0.0
for epoch in range(num_epochs):
print_logger('-' * 10 + f'Epoch {epoch}/{num_epochs - 1}' + '-' * 10)
for phase in ['train', 'val']:
if phase == 'train':
model.train()
else:
model.eval()
running_loss, running_corrects = 0, 0
for inputs, labels in dataloaders[phase]:
inputs = inputs.to(device)
labels = labels.to(device)
optimizer.zero_grad()
with torch.set_grad_enabled(phase == 'train'):
outputs = model(inputs)
_, preds = torch.max(outputs, 1)
loss = criterion(outputs, labels)
if phase == 'train':
loss.backward()
optimizer.step()
running_loss += loss.item() * inputs.size(0)
running_corrects += torch.sum(preds == labels.data)
if phase == 'train':
scheduler.step()
epoch_loss = running_loss / dataset_sizes[phase]
epoch_acc = running_corrects.double() / dataset_sizes[phase]
print_logger(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
            # update the best model so far
if phase == 'val' and epoch_acc > best_acc:
best_acc = epoch_acc
best_model_wts = copy.deepcopy(model.state_dict())
time_elapsed = time.time() - since
print_logger(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
print_logger(f'Best val Acc: {best_acc:4f}')
    # load the best weights as the final model
model.load_state_dict(best_model_wts)
return model, best_acc
model_ft, best_acc = train_model(model_ft, dataloaders, criterion, optimizer_ft, scheduler, num_epochs=num_epochs)
# %%
def visualize_model(model, num_images=6):
was_training = model.training
model.eval()
images_so_far = 0
fig = plt.figure()
with torch.no_grad():
for i, (inputs, labels) in enumerate(dataloaders['val']):
inputs = inputs.to(device)
labels = labels.to(device)
outputs = model(inputs)
_, preds = torch.max(outputs, 1)
for j in range(inputs.size()[0]):
images_so_far += 1
ax = plt.subplot(num_images // 2, 2, images_so_far)
ax.axis('off')
ax.set_title(f'predicted: {class_names[preds[j]]}')
imshow(inputs.cpu().data[j])
if images_so_far == num_images:
model.train(mode=was_training)
return
model.train(mode=was_training)
visualize_model(model_ft)
# %% Save the model
model_filename = 'checkpoint.pth'
state = {'state_dict': model_ft.state_dict(),
'best_acc': best_acc,
'optimizer': optimizer_ft.state_dict(), }
torch.save(state, model_filename)
# %% Load the model
checkpoint = torch.load(model_filename)
model_ft.load_state_dict(checkpoint['state_dict'])
# todo: save the best model to disk during training and resume training from it (see the sketch below)
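A minimal sketch for that todo, assuming we reuse the checkpoint format above and simply write it to disk whenever the best validation accuracy improves inside train_model:
def save_checkpoint(model, optimizer, best_acc, filename='checkpoint.pth'):
    # same keys as the checkpoint saved above
    torch.save({'state_dict': model.state_dict(),
                'best_acc': best_acc,
                'optimizer': optimizer.state_dict()}, filename)
# inside train_model, call save_checkpoint(model, optimizer, best_acc)
# right after best_model_wts is updated; training can then resume from the saved file as shown below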
# %% Continue training: this time train all layers
checkpoint = torch.load(model_filename)
model_ft.load_state_dict(checkpoint['state_dict'])
best_acc = checkpoint['best_acc']
optimizer = optim.SGD(model_ft.parameters(), lr=learning_rate, momentum=0.9)
optimizer.load_state_dict(checkpoint['optimizer'])
for param in model_ft.parameters():
    param.requires_grad = True
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)  # new scheduler bound to the new optimizer
model_ft, best_acc = train_model(model_ft, dataloaders, criterion, optimizer, scheduler, num_epochs=num_epochs)
# %% Using the model for inference
from PIL import Image
val_transform = data_transforms['val']  # avoid shadowing the imported transforms module
img = Image.open('bee1.jpeg')
img = val_transform(img).reshape(1, 3, 224, 224).to(device)
model_ft.eval()
with torch.no_grad():
    predicts = model_ft(img)
_, pred = torch.max(predicts, 1)
class_names[pred.item()]
# %%
def get_model2(model_name, num_classes, train_whole):
input_size = 224
if model_name == 'alexnet': # 233M
model_ft = models.alexnet(pretrained=True)
num_ftrs = model_ft.classifier[6].in_features
model_ft.classifier[6] = nn.Linear(num_ftrs, num_classes)
elif model_name == 'vgg16': # 553M
model_ft = models.vgg16(pretrained=True)
num_ftrs = model_ft.classifier[6].in_features
model_ft.classifier[6] = nn.Linear(num_ftrs, num_classes)
elif model_name == 'squeezenet':
model_ft = models.squeezenet1_0(pretrained=True) # 5M
model_ft.classifier[1] = nn.Conv2d(512, num_classes, kernel_size=(1, 1), stride=(1, 1))
model_ft.num_classes = num_classes
elif model_name == 'densenet':
model_ft = models.densenet121(pretrained=True) # 31M
num_ftrs = model_ft.classifier.in_features
model_ft.classifier = nn.Linear(num_ftrs, num_classes)
model_ft.num_classes = num_classes
elif model_name == 'inception':
model_ft = models.inception_v3(pretrained=True) # 104M
        num_ftrs = model_ft.AuxLogits.fc.in_features
        model_ft.AuxLogits.fc = nn.Linear(num_ftrs, num_classes)
        num_ftrs = model_ft.fc.in_features
        model_ft.fc = nn.Linear(num_ftrs, num_classes)
input_size = 299
elif model_name == 'vgg19': # 575M
from collections import OrderedDict
model_ft = models.vgg19(pretrained=True)
model_ft.classifier = nn.Sequential(OrderedDict([
('fc1', nn.Linear(25088, 4096)), # First layer
('relu', nn.ReLU()), # Apply activation function
            ('fc2', nn.Linear(4096, num_classes)),  # Output layer sized to the task
            ('output', nn.LogSoftmax(dim=1))  # Log-probabilities; pair with nn.NLLLoss
]))
else:
print('Invalid model name')
return
return model_ft, input_size
Notes:
- torch expects image tensors to have the shape
size = (batch_size, channels, height, width)
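For example, an image loaded as an H x W x C numpy array has to be permuted to channels-first and given a batch dimension before being fed to a model. An illustrative conversion with a dummy array:
import numpy as np
hwc = np.zeros((224, 224, 3), dtype=np.float32)    # (height, width, channels)
chw = torch.from_numpy(hwc).permute(2, 0, 1)       # -> (channels, height, width)
batch = chw.unsqueeze(0)                           # -> (batch_size, channels, height, width)
print(batch.shape)                                 # torch.Size([1, 3, 224, 224])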