卷积神经网络图像分类算法小集

目录结构

训练结构

· 在项目根目录下新建数据集文件夹data_set,建立子文件夹(数据集名称)用于存放训练集和测试集;

· 在项目根目录下新建文件夹class_j,用于存放分类json文件(class_indices.json);

· 在项目根目录下新建文件夹models,用于存放训练好的模型文件;

· 神经网络model.py

· 训练脚本train.py

· 预测脚本predict.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# project
├── data_set
│   └── data
│       ├── train
│       │   ├── 00001.jpg
│       │   ├── 00002.jpg
│       │   ├── 00003.jpg
│       │   ├── ...
│       │   └── 10000.jpg
│       └── val
│           ├── 00001.jpg
│           ├── 00002.jpg
│           ├── 00003.jpg
│           ├── ...
│           └── 01000.jpg
├── class_j
│   └── class_indices.json
├── models
│   └── model.pth
├── model.py
├── train.py
└── predict.py

封装结构

GoogLeNet神经网络为例:

1
2
3
4
5
6
# GoogLeNet
├── class_j
│ ├── class_indices.json
│── weights
│ ├── GoogLeNet_GPU_v1.pth
└── model.py

神经网络

VGG

神经网络

model.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
"""
VGG模型
"""
import os
import time
import json
from io import BytesIO
from urllib.request import urlopen

import torch
import torch.nn as nn
from torchvision import transforms
from PIL import Image, ImageFile

from settings import PR, vgg_model

ImageFile.LOAD_TRUNCATED_IMAGES = True

# Pretrained weight files: download URLs (hosted on download.pytorch.org) keyed by VGG variant name.
model_urls = {
'vgg11': 'https://download.pytorch.org/models/vgg11-bbd30ac9.pth',
'vgg13': 'https://download.pytorch.org/models/vgg13-c768596a.pth',
'vgg16': 'https://download.pytorch.org/models/vgg16-397923af.pth',
'vgg19': 'https://download.pytorch.org/models/vgg19-dcbb9e9d.pth'
}

# Layer layout of each VGG variant, consumed by make_features(): an integer adds
# a 3x3 convolution with that many output channels followed by a ReLU, and 'M'
# adds a 2x2 max-pooling layer with stride 2.
cfgs = {
'vgg11': [64, 'M', 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
'vgg13': [64, 64, 'M', 128, 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
'vgg16': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512, 'M', 512, 512, 512, 'M'],
'vgg19': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 256, 'M', 512, 512, 512, 512, 'M', 512, 512, 512, 512, 'M'],
}


class VGG(nn.Module):
    """VGG classifier: a convolutional trunk followed by a three-layer MLP head.

    Args:
        features: Module that maps an image batch to a feature map with 512
            channels (the first linear layer expects 512 * 7 * 7 inputs).
        num_classes: Number of output logits.
        init_weights: When True, Xavier-initialise every Conv2d/Linear weight
            and zero the biases.
    """

    def __init__(self, features, num_classes=1000, init_weights=False):
        super(VGG, self).__init__()
        self.features = features
        # Parameter-free pooling that forces the feature map to 7x7 so the
        # classifier also accepts inputs that are not exactly 224x224. For a
        # 7x7 feature map it is the identity, and it adds no state_dict keys,
        # so existing checkpoints keep loading unchanged.
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))
        self.classifier = nn.Sequential(
            nn.Linear(512 * 7 * 7, 4096),  # 1st linear layer (2048 would reduce parameters)
            nn.ReLU(True),
            nn.Dropout(p=0.5),
            nn.Linear(4096, 4096),  # 2nd linear layer
            nn.ReLU(True),
            nn.Dropout(p=0.5),
            nn.Linear(4096, num_classes),  # 3rd linear layer
        )
        if init_weights:
            self._initialize_weights()

    def forward(self, x):
        """Return class logits of shape (N, num_classes) for an image batch."""
        x = self.features(x)               # e.g. N x 3 x 224 x 224 -> N x 512 x 7 x 7
        x = self.avgpool(x)                # N x 512 x 7 x 7 regardless of input size
        x = torch.flatten(x, start_dim=1)  # N x (512*7*7)
        x = self.classifier(x)             # N x num_classes

        return x

    def _initialize_weights(self):
        """Xavier-uniform init for Conv2d/Linear weights; biases are set to zero."""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                nn.init.constant_(m.bias, 0)


def make_features(cfg: list):
    """Assemble the convolutional trunk described by ``cfg``.

    Each integer entry appends a 3x3 convolution (padding 1) with that many
    output channels plus an in-place ReLU; each ``"M"`` entry appends a 2x2
    max-pool with stride 2. The network input is assumed to have 3 channels.

    Returns:
        An ``nn.Sequential`` holding the layers in ``cfg`` order.
    """
    modules = []
    channels = 3
    for entry in cfg:
        if entry != "M":
            modules.append(nn.Conv2d(channels, entry, kernel_size=3, padding=1))
            modules.append(nn.ReLU(True))
            channels = entry
            continue
        modules.append(nn.MaxPool2d(kernel_size=2, stride=2))

    return nn.Sequential(*modules)


def vgg(model_name="vgg16", **kwargs):
    """Build the VGG variant named ``model_name``.

    ``model_name`` must be a key of ``cfgs``; any extra keyword arguments are
    forwarded to the ``VGG`` constructor.
    """
    assert model_name in cfgs, "Warning: model number {} not in cfgs dist!".format(model_name)
    trunk = make_features(cfgs[model_name])
    return VGG(trunk, **kwargs)


class VGGNetImageClass:
    """CPU inference wrapper around a 13-class VGG19 checkpoint.

    The class-index mapping is read from
    ``<PR>/im_weight_vgg/class_j/class_indices.json`` and the weights from
    ``<PR>/im_weight_vgg/weights/<vgg_model>``.
    """

    def __init__(self):
        # Inference is pinned to the CPU; use
        # torch.device("cuda" if torch.cuda.is_available() else "cpu") to opt into a GPU.
        self.device = torch.device("cpu")
        self.data_transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])
        self.json_path = os.path.join(PR, "im_weight_vgg/class_j/class_indices.json")
        self.weights_path = os.path.join(PR, f"im_weight_vgg/weights/{vgg_model}")

        self.model = vgg(model_name="vgg19", num_classes=13).to(self.device)
        # NOTE: torch.load unpickles the file, so only load checkpoints from a trusted source.
        self.model.load_state_dict(torch.load(self.weights_path, map_location=self.device))
        self.model.eval()

        with open(self.json_path, 'r', encoding='utf-8') as f:
            self.class_indices = json.load(f)

    def detection(self, img):
        """Classify one PIL image.

        Args:
            img: A PIL image in any mode; it is converted to RGB when needed.

        Returns:
            ``{'class': <predicted class index>, 'prob': <softmax probability>}``.
        """
        # The 3-channel Normalize above fails on RGBA / greyscale / palette
        # images, so bring every input to RGB first.
        if img.mode != "RGB":
            img = img.convert("RGB")
        img = self.data_transform(img)     # [C, H, W]
        img = torch.unsqueeze(img, dim=0)  # [1, C, H, W]
        with torch.no_grad():
            output = torch.squeeze(self.model(img.to(self.device))).cpu()
            predict = torch.softmax(output, dim=0)
            predict_cla = int(torch.argmax(predict).item())
        result = {
            'class': predict_cla,
            'prob': predict[predict_cla].item()
        }
        return result

训练

train.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
"""
训练(GPU)
"""
import os
import sys
import json
import time
import torch
import torch.nn as nn
from torchvision import transforms, datasets, utils
import matplotlib.pyplot as plt
import numpy as np
import torch.optim as optim
from tqdm import tqdm

from model import vgg

from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True


def main():
    """Fine-tune a 13-class VGG19 and save a checkpoint whenever validation accuracy improves.

    Images are read from ``./data_set/data/{train,val}`` with ImageFolder, the
    index-to-class mapping is written to ``class_j/class_indices.json`` and
    training starts from ``./models/VGG19Net_GPU_v5.pth``.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"use device is {device}")

    data_transform = {
        "train": transforms.Compose([
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        ]),
        "val": transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        ])
    }
    data_root = os.path.abspath(os.path.join(os.getcwd(), "./"))
    image_path = os.path.join(data_root, "data_set", "data")
    assert os.path.exists(image_path), "{} path does not exist.".format(image_path)
    train_dataset = datasets.ImageFolder(root=os.path.join(image_path, "train"),
                                         transform=data_transform["train"])
    train_num = len(train_dataset)

    # Persist the index -> class-name mapping for the prediction side.
    flower_list = train_dataset.class_to_idx
    cla_dict = {index: name for name, index in flower_list.items()}
    json_str = json.dumps(cla_dict, indent=12)
    # Create the target folder so a fresh checkout does not fail on open().
    os.makedirs("class_j", exist_ok=True)
    with open("class_j/class_indices.json", 'w') as json_file:
        json_file.write(json_str)

    batch_size = 32
    # Data loading runs in the main process (0 workers), as the original
    # script forced after computing a worker count it never used.
    nw = 0
    print(f"Using {nw} dataloader workers every process.")

    train_loader = torch.utils.data.DataLoader(train_dataset,
                                               batch_size=batch_size,
                                               shuffle=True,
                                               num_workers=nw)
    val_dataset = datasets.ImageFolder(root=os.path.join(image_path, "val"),
                                       transform=data_transform["val"])
    val_num = len(val_dataset)
    val_loader = torch.utils.data.DataLoader(val_dataset,
                                             batch_size=4,
                                             shuffle=False,
                                             num_workers=nw)
    print(f"Using {train_num} images for training, {val_num} images for validation.")

    model_name = "vgg19"
    # Build the 13-class network. Random Xavier init is skipped because every
    # weight is overwritten by the checkpoint loaded right below.
    net = vgg(model_name=model_name, num_classes=13, init_weights=False)

    # Load the starting weights.
    model_weight_path = './models/VGG19Net_GPU_v5.pth'
    net.load_state_dict(torch.load(model_weight_path, map_location='cpu'))
    net.to(device)
    loss_function = nn.CrossEntropyLoss()
    optimizer = optim.Adam(net.parameters(), lr=0.0001)

    epochs = 300
    save_path = "./VGG19Net_GPU_RE.pth"
    save_root, save_ext = os.path.splitext(save_path)
    best_accuracy = 0.0
    train_steps = len(train_loader)
    for epoch in range(epochs):
        net.train()
        running_loss = 0.0
        train_bar = tqdm(train_loader, file=sys.stdout)
        for step, data in enumerate(train_bar):
            images, labels = data
            optimizer.zero_grad()
            outputs = net(images.to(device))
            loss = loss_function(outputs, labels.to(device))
            loss.backward()
            optimizer.step()
            batch_loss = loss.item()
            running_loss += batch_loss
            train_bar.desc = "train epoch [{}/{}] loss:{:.3f}".format(epoch + 1,
                                                                     epochs,
                                                                     batch_loss)
        # Validation
        net.eval()
        acc = 0.0
        with torch.no_grad():
            val_bar = tqdm(val_loader, file=sys.stdout)
            for val_data in val_bar:
                val_images, val_labels = val_data
                outputs = net(val_images.to(device))
                predict_y = torch.max(outputs, dim=1)[1]
                acc += torch.eq(predict_y, val_labels.to(device)).sum().item()
        val_accuracy = acc / val_num
        # "%.3f" (three decimals) instead of the original "%3f" (minimum width 3).
        print("[epoch %d ] train_loss: %.3f val_accurancy: %.3f" %
              (epoch + 1, running_loss / train_steps, val_accuracy))
        if val_accuracy > best_accuracy:
            best_accuracy = val_accuracy
            # Put the accuracy before the extension so the file keeps its
            # ".pth" suffix (the original produced "...RE.pth0.91").
            torch.save(net.state_dict(), f"{save_root}_{val_accuracy:.4f}{save_ext}")
    print("Finshed Training.")


if __name__ == '__main__':
    # Expose only the GPU with index 1 to this process.
    os.environ["CUDA_VISIBLE_DEVICES"] = "1"
    main()

预测

predict.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
"""
预测
"""
from io import BytesIO
from urllib.request import urlopen
from PIL import Image, ImageFile

from im_weight_vgg.model import VGGNetImageClass

if __name__ == '__main__':
    im_class = VGGNetImageClass()
    url = 'http://192.168.3.18:300/files/group1/M00/11/0E/wKgCBWRUgjWAGTfNAAEKiw0XSZc371.jpg'
    # Close the HTTP response deterministically and bound the wait so an
    # unreachable file server cannot hang the script.
    with urlopen(url, timeout=30) as response:
        img_bytes = response.read()
    # The classifier normalises three channels, so force RGB just like the
    # local-file example below does.
    img_pil = Image.open(BytesIO(img_bytes)).convert("RGB")

    print(im_class.detection(img_pil))
    print(im_class.detection(Image.open("-532576361772532412_120.jpg").convert("RGB")))

附:vgg16+transformer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import torch
from collections import namedtuple
from torchvision import models
import torch.nn as nn
import torch.nn.functional as F


# VGG16神经网络定义
class VGG16(torch.nn.Module):
    """Pretrained torchvision VGG16 trunk split into four consecutive stages.

    ``forward`` returns the activations after each stage as a ``VggOutputs``
    namedtuple with fields ``relu1_2``, ``relu2_2``, ``relu3_3`` and
    ``relu4_3``. Unless ``requires_grad`` is True all parameters are frozen.
    """

    def __init__(self, requires_grad=False):
        super(VGG16, self).__init__()
        backbone = models.vgg16(pretrained=True).features
        # (attribute name, first layer index, one-past-last layer index)
        stages = (
            ("slice1", 0, 4),
            ("slice2", 4, 9),
            ("slice3", 9, 16),
            ("slice4", 16, 23),
        )
        for attr, _, _ in stages:
            setattr(self, attr, torch.nn.Sequential())
        for attr, first, stop in stages:
            stage = getattr(self, attr)
            for layer_idx in range(first, stop):
                # Keep the original layer index as the sub-module name.
                stage.add_module(str(layer_idx), backbone[layer_idx])

        if requires_grad:
            return
        for weight in self.parameters():
            weight.requires_grad = False

    def forward(self, X):
        """Run the four stages in sequence and return every intermediate activation."""
        relu1_2 = self.slice1(X)
        relu2_2 = self.slice2(relu1_2)
        relu3_3 = self.slice3(relu2_2)
        relu4_3 = self.slice4(relu3_3)

        outputs_type = namedtuple("VggOutputs", ["relu1_2", "relu2_2", "relu3_3", "relu4_3"])
        return outputs_type(relu1_2, relu2_2, relu3_3, relu4_3)


class TransformerNet(torch.nn.Module):
    """Image-to-image network: 3 encoder convs, 5 residual blocks, 3 decoder convs.

    The two stride-2 encoder convolutions are mirrored by two upsampling
    decoder convolutions; the final 9x9 convolution has neither normalisation
    nor ReLU.
    """

    def __init__(self):
        super(TransformerNet, self).__init__()
        encoder = [
            ConvBlock(3, 32, kernel_size=9, stride=1),
            ConvBlock(32, 64, kernel_size=3, stride=2),
            ConvBlock(64, 128, kernel_size=3, stride=2),
        ]
        bottleneck = [ResidualBlock(128) for _ in range(5)]
        decoder = [
            ConvBlock(128, 64, kernel_size=3, upsample=True),
            ConvBlock(64, 32, kernel_size=3, upsample=True),
            ConvBlock(32, 3, kernel_size=9, stride=1, normalize=False, relu=False),
        ]
        self.model = nn.Sequential(*encoder, *bottleneck, *decoder)

    def forward(self, x):
        """Apply the full encoder / residual / decoder stack to ``x``."""
        out = self.model(x)
        return out


class ResidualBlock(torch.nn.Module):
    """Two 3x3 ConvBlocks with a skip connection; the channel count is preserved.

    The second ConvBlock is built with ``relu=False``, so the sum is returned
    without a final activation.
    """

    def __init__(self, channels):
        super(ResidualBlock, self).__init__()
        first = ConvBlock(channels, channels, kernel_size=3, stride=1, normalize=True, relu=True)
        second = ConvBlock(channels, channels, kernel_size=3, stride=1, normalize=True, relu=False)
        self.block = nn.Sequential(first, second)

    def forward(self, x):
        """Return ``block(x) + x``."""
        residual = self.block(x)
        return residual + x


class ConvBlock(torch.nn.Module):
    """Reflection-padded convolution with optional upsampling, instance norm and ReLU.

    Args:
        in_channels: Channels of the input feature map.
        out_channels: Channels produced by the convolution.
        kernel_size: Square kernel size; ``kernel_size // 2`` pixels of
            reflection padding are added on every side before the convolution.
        stride: Convolution stride.
        upsample: When True, the input is first enlarged 2x with
            ``F.interpolate`` (its default interpolation mode).
        normalize: When True, apply an affine ``InstanceNorm2d`` after the convolution.
        relu: When True, apply ReLU last.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, upsample=False, normalize=True, relu=True):
        super(ConvBlock, self).__init__()
        self.upsample = upsample
        self.block = nn.Sequential(
            nn.ReflectionPad2d(kernel_size // 2),
            # Pass the stride in its documented int form; the original wrapped
            # it in a 1-tuple, which only worked through implicit expansion.
            nn.Conv2d(in_channels, out_channels, kernel_size, stride)
        )
        self.norm = nn.InstanceNorm2d(out_channels, affine=True) if normalize else None
        self.relu = relu

    def forward(self, x):
        """Apply (upsample) -> pad + conv -> (instance norm) -> (ReLU)."""
        if self.upsample:
            x = F.interpolate(x, scale_factor=2)
        x = self.block(x)
        if self.norm is not None:
            x = self.norm(x)
        if self.relu:
            x = F.relu(x)
        return x

if __name__ == '__main__':
    model_x = VGG16()
    print(model_x)
    # Smoke-test the forward pass on a single random image. The original
    # allocated a 224-image batch here and never fed it to the model.
    input1 = torch.rand([1, 3, 224, 224])
    with torch.no_grad():
        features = model_x(input1)
    for field_name, feature_map in zip(features._fields, features):
        print(field_name, tuple(feature_map.shape))

GoogLeNet

model.py

神经网络

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
import json
import torch.nn as nn
import torch
import torch.nn.functional as F
from torchvision import transforms
from settings import GoogLeNet_model

"""
# 定义卷积+激活函数操作模板
"""


class BasicConv2d(nn.Module):
    """Convolution followed by an in-place ReLU.

    Every keyword argument beyond the channel counts is forwarded verbatim to
    ``nn.Conv2d``.
    """

    def __init__(self, in_channels, out_channels, **kwargs):
        super(BasicConv2d, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, **kwargs)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        """Return ``relu(conv(x))``."""
        return self.relu(self.conv(x))


"""
# 定义 Inception 辅助分类器模板
"""


class InceptionAux(nn.Module):
    """Auxiliary classifier head: average-pool, 1x1 conv, then two linear layers.

    ``fc1`` expects 2048 inputs, i.e. a 128 x 4 x 4 map after the 1x1
    convolution (the case for a 14x14 input feature map).
    """

    def __init__(self, in_channels, num_classes):
        super(InceptionAux, self).__init__()
        self.averagePool = nn.AvgPool2d(kernel_size=5, stride=3)
        self.conv = BasicConv2d(in_channels, 128, kernel_size=1)  # -> [batch, 128, 4, 4]
        self.fc1 = nn.Linear(2048, 1024)
        self.fc2 = nn.Linear(1024, num_classes)

    def forward(self, x):
        """Return auxiliary logits of shape (N, num_classes)."""
        # 14x14 maps (N*512 for aux1, N*528 for aux2) shrink to 4x4.
        pooled = self.averagePool(x)
        # Channel reduction to N*128*4*4, then flatten to N*2048.
        flat = torch.flatten(self.conv(pooled), 1)
        flat = F.dropout(flat, 0.5, training=self.training)
        # N*2048 -> N*1024
        hidden = F.relu(self.fc1(flat), inplace=True)
        hidden = F.dropout(hidden, 0.5, training=self.training)
        # N*1024 -> N*num_classes
        return self.fc2(hidden)


"""
# Inception 模板
"""


class Inception(nn.Module):
    """Inception module: four parallel branches concatenated along the channel axis.

    Branches: 1x1 conv; 1x1 reduce -> 3x3 conv; 1x1 reduce -> 5x5 conv;
    3x3 max-pool -> 1x1 projection. Padding is chosen so every branch keeps
    the input's spatial size.
    """

    def __init__(self, in_channels, ch1x1, ch3x3red, ch3x3, ch5x5red, ch5x5, pool_proj):
        super(Inception, self).__init__()
        self.branch1 = BasicConv2d(in_channels, ch1x1, kernel_size=1)

        reduce3 = BasicConv2d(in_channels, ch3x3red, kernel_size=1)
        conv3 = BasicConv2d(ch3x3red, ch3x3, kernel_size=3, padding=1)  # output size == input size
        self.branch2 = nn.Sequential(reduce3, conv3)

        reduce5 = BasicConv2d(in_channels, ch5x5red, kernel_size=1)
        # The official implementation uses 3x3 here, see
        # https://github.com/pytorch/vision/issues/906
        conv5 = BasicConv2d(ch5x5red, ch5x5, kernel_size=5, padding=2)  # output size == input size
        self.branch3 = nn.Sequential(reduce5, conv5)

        pool = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
        project = BasicConv2d(in_channels, pool_proj, kernel_size=1)
        self.branch4 = nn.Sequential(pool, project)

    def forward(self, x):
        """Run all four branches on ``x`` and concatenate their outputs on dim 1."""
        out1 = self.branch1(x)
        out2 = self.branch2(x)
        out3 = self.branch3(x)
        out4 = self.branch4(x)
        return torch.cat([out1, out2, out3, out4], 1)


"""
# GoogLeNet 模型
"""


class GoogLeNet(nn.Module):
    """GoogLeNet classifier with two optional auxiliary heads.

    Args:
        num_classes: Number of output logits of the main and auxiliary heads.
        aux_logits: When True, build ``aux1``/``aux2``; they are evaluated
            only while the module is in training mode.
        init_weights: When True, run ``_initialize_weights`` after construction.

    In training mode with ``aux_logits`` enabled ``forward`` returns
    ``(logits, aux2_logits, aux1_logits)``; otherwise it returns only ``logits``.
    """

    def __init__(self, num_classes=1000, aux_logits=True, init_weights=False):
        super(GoogLeNet, self).__init__()
        self.aux_logits = aux_logits

        # Stem
        self.conv1 = BasicConv2d(3, 64, kernel_size=7, stride=2, padding=3)
        self.maxpool1 = nn.MaxPool2d(3, stride=2, ceil_mode=True)
        self.conv2 = BasicConv2d(64, 64, kernel_size=1)
        self.conv3 = BasicConv2d(64, 192, kernel_size=3, padding=1)
        self.maxpool2 = nn.MaxPool2d(3, stride=2, ceil_mode=True)

        # Stage 3
        self.inception3a = Inception(192, 64, 96, 128, 16, 32, 32)
        self.inception3b = Inception(256, 128, 128, 192, 32, 96, 64)
        self.maxpool3 = nn.MaxPool2d(3, stride=2, ceil_mode=True)

        # Stage 4
        self.inception4a = Inception(480, 192, 96, 208, 16, 48, 64)
        self.inception4b = Inception(512, 160, 112, 224, 24, 64, 64)
        self.inception4c = Inception(512, 128, 128, 256, 24, 64, 64)
        self.inception4d = Inception(512, 112, 144, 288, 32, 64, 64)
        self.inception4e = Inception(528, 256, 160, 320, 32, 128, 128)
        self.maxpool4 = nn.MaxPool2d(3, stride=2, ceil_mode=True)

        # Stage 5
        self.inception5a = Inception(832, 256, 160, 320, 32, 128, 128)
        self.inception5b = Inception(832, 384, 192, 384, 48, 128, 128)

        # Auxiliary heads hang off inception4a (512 ch) and inception4d (528 ch).
        if self.aux_logits:
            self.aux1 = InceptionAux(512, num_classes)
            self.aux2 = InceptionAux(528, num_classes)

        # Main head
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.dropout = nn.Dropout(0.4)
        self.fc = nn.Linear(1024, num_classes)
        if init_weights:
            self._initialize_weights()

    def forward(self, x):
        """Compute logits; shape comments assume an N x 3 x 224 x 224 input."""
        # The auxiliary heads are skipped outside training mode.
        with_aux = self.training and self.aux_logits

        # Stem: N*3*224*224 -> N*64*56*56 -> N*192*28*28
        x = self.maxpool1(self.conv1(x))
        x = self.maxpool2(self.conv3(self.conv2(x)))

        # Stage 3: N*192*28*28 -> N*480*28*28 -> N*480*14*14
        x = self.inception3b(self.inception3a(x))
        x = self.maxpool3(x)

        # Stage 4: N*512*14*14 after 4a, where the first auxiliary head taps in.
        x = self.inception4a(x)
        if with_aux:
            aux1 = self.aux1(x)

        # N*528*14*14 after 4d, where the second auxiliary head taps in.
        x = self.inception4d(self.inception4c(self.inception4b(x)))
        if with_aux:
            aux2 = self.aux2(x)

        # N*832*14*14 -> N*832*7*7
        x = self.maxpool4(self.inception4e(x))

        # Stage 5: N*832*7*7 -> N*1024*7*7
        x = self.inception5b(self.inception5a(x))

        # Head: N*1024*1*1 -> N*1024 -> N*num_classes
        x = torch.flatten(self.avgpool(x), 1)
        x = self.fc(self.dropout(x))

        if with_aux:
            return x, aux2, aux1
        return x

    def _initialize_weights(self):
        """Kaiming-normal init for convs, N(0, 0.01) for linear layers, zero biases."""
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')
                if module.bias is not None:
                    nn.init.constant_(module.bias, 0)
                continue
            if isinstance(module, nn.Linear):
                nn.init.normal_(module.weight, 0, 0.01)
                nn.init.constant_(module.bias, 0)


class GoogLeNetImageClass:
    """CPU inference wrapper around a 13-class GoogLeNet checkpoint.

    The class-index mapping is read from
    ``im_weight_gln/class_j/class_indices.json`` and the weights from
    ``im_weight_gln/weights/<GoogLeNet_model>``; both paths are relative to
    the current working directory.
    """

    def __init__(self):
        # Inference is pinned to the CPU; use
        # torch.device("cuda" if torch.cuda.is_available() else "cpu") to opt into a GPU.
        self.device = torch.device("cpu")
        self.data_transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])
        self.json_path = "im_weight_gln/class_j/class_indices.json"
        self.weights_path = f"im_weight_gln/weights/{GoogLeNet_model}"

        # The network is built without auxiliary heads; strict=False lets a
        # checkpoint that still contains aux1/aux2 weights load anyway.
        # NOTE: torch.load unpickles the file, so only load checkpoints from a trusted source.
        self.model = GoogLeNet(num_classes=13, aux_logits=False).to(self.device)
        self.model.load_state_dict(torch.load(self.weights_path, map_location=self.device), strict=False)
        self.model.eval()

        with open(self.json_path, 'r', encoding='utf-8') as f:
            self.class_indices = json.load(f)

    def detection(self, img):
        """Classify one PIL image.

        Args:
            img: A PIL image in any mode; it is converted to RGB when needed.

        Returns:
            ``{'class': <predicted class index>, 'prob': <softmax probability>}``.
        """
        # The 3-channel Normalize above fails on RGBA / greyscale / palette
        # images, so bring every input to RGB first.
        if img.mode != "RGB":
            img = img.convert("RGB")
        img = self.data_transform(img)     # [C, H, W]
        img = torch.unsqueeze(img, dim=0)  # [1, C, H, W]
        with torch.no_grad():
            output = torch.squeeze(self.model(img.to(self.device))).cpu()
            predict = torch.softmax(output, dim=0)
            predict_cla = int(torch.argmax(predict).item())
        result = {
            'class': predict_cla,
            'prob': predict[predict_cla].item()
        }
        return result

训练

train.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""
训练(GPU)
"""
import os
import sys
import json
import time
import torch
import torch.nn as nn
from torchvision import transforms, datasets, utils
import matplotlib.pyplot as plt
import numpy as np
import torch.optim as optim
from tqdm import tqdm

from model import GoogLeNet


def main():
    """Train a 13-class GoogLeNet with auxiliary losses and keep the best checkpoint.

    Images are read from ``/exp/work/algorithm/vgg/data_set/data/{train,val}``
    with ImageFolder, the index-to-class mapping is written to
    ``class_j/class_indices.json`` and the best weights (by validation
    accuracy) are saved to ``./GoogLeNet_GPU.pth``.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"use device is {device}")

    data_transform = {
        "train": transforms.Compose([
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        ]),
        "val": transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        ])
    }
    image_path = "/exp/work/algorithm/vgg/data_set/data"
    assert os.path.exists(image_path), "{} path does not exist.".format(image_path)
    train_dataset = datasets.ImageFolder(root=os.path.join(image_path, "train"),
                                         transform=data_transform["train"])
    train_num = len(train_dataset)

    # Persist the index -> class-name mapping. The original wrote
    # "calss_indices.json" (typo) into the working directory, a file the
    # predictor never reads; use the class_j/class_indices.json location instead.
    flower_list = train_dataset.class_to_idx
    cla_dict = {index: name for name, index in flower_list.items()}
    json_str = json.dumps(cla_dict, indent=12)
    os.makedirs("class_j", exist_ok=True)
    with open("class_j/class_indices.json", 'w') as json_file:
        json_file.write(json_str)

    batch_size = 32
    # Data loading runs in the main process (0 workers), as the original
    # script forced after computing a worker count it never used.
    nw = 0
    print(f"Using {nw} dataloader workers every process.")

    train_loader = torch.utils.data.DataLoader(train_dataset,
                                               batch_size=batch_size,
                                               shuffle=True,
                                               num_workers=nw)
    val_dataset = datasets.ImageFolder(root=os.path.join(image_path, "val"),
                                       transform=data_transform["val"])
    val_num = len(val_dataset)
    val_loader = torch.utils.data.DataLoader(val_dataset,
                                             batch_size=4,
                                             shuffle=False,
                                             num_workers=nw)
    print(f"Using {train_num} images for training, {val_num} images for validation.")

    # Instantiate the network (13 classes) with both auxiliary heads.
    net = GoogLeNet(num_classes=13, aux_logits=True, init_weights=True)
    net.to(device)
    loss_function = nn.CrossEntropyLoss()
    optimizer = optim.Adam(net.parameters(), lr=0.0001)

    epochs = 300
    save_path = "./GoogLeNet_GPU.pth"
    best_accuracy = 0.0
    train_steps = len(train_loader)
    for epoch in range(epochs):
        net.train()
        running_loss = 0.0
        train_bar = tqdm(train_loader, file=sys.stdout)
        for step, data in enumerate(train_bar):
            images, labels = data
            labels = labels.to(device)  # moved once, shared by the three losses
            optimizer.zero_grad()
            logits, aux_logits2, aux_logits1 = net(images.to(device))
            loss0 = loss_function(logits, labels)
            loss1 = loss_function(aux_logits1, labels)
            loss2 = loss_function(aux_logits2, labels)
            # Each auxiliary loss contributes with weight 0.3.
            loss = loss0 + loss1 * 0.3 + loss2 * 0.3
            loss.backward()
            optimizer.step()
            batch_loss = loss.item()
            running_loss += batch_loss
            train_bar.desc = "train epoch [{}/{}] loss:{:.3f}".format(epoch + 1,
                                                                     epochs,
                                                                     batch_loss)
        # Validation (in eval mode the network returns only the main logits).
        net.eval()
        acc = 0.0
        with torch.no_grad():
            val_bar = tqdm(val_loader, file=sys.stdout)
            for val_data in val_bar:
                val_images, val_labels = val_data
                outputs = net(val_images.to(device))
                predict_y = torch.max(outputs, dim=1)[1]
                acc += torch.eq(predict_y, val_labels.to(device)).sum().item()
        val_accuracy = acc / val_num
        # "%.3f" (three decimals) instead of the original "%3f" (minimum width 3).
        print("[epoch %d ] train_loss: %.3f val_accurancy: %.3f" %
              (epoch + 1, running_loss / train_steps, val_accuracy))
        if val_accuracy > best_accuracy:
            best_accuracy = val_accuracy
            torch.save(net.state_dict(), save_path)
    print("Finshed Training.")


if __name__ == '__main__':
    # Expose only the GPU with index 1 to this process.
    os.environ["CUDA_VISIBLE_DEVICES"] = "1"
    main()

预测

predict.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
"""
预测
"""
from io import BytesIO
from urllib.request import urlopen
from PIL import Image, ImageFile

from im_weight_gln.model import GoogLeNetImageClass

if __name__ == '__main__':
    im_class = GoogLeNetImageClass()
    url = 'http://192.168.3.18:300/files/group1/M00/11/0E/wKgCBWRUgjWAGTfNAAEKiw0XSZc371.jpg'
    # Close the HTTP response deterministically and bound the wait so an
    # unreachable file server cannot hang the script.
    with urlopen(url, timeout=30) as response:
        img_bytes = response.read()
    # The classifier normalises three channels, so force RGB just like the
    # local-file example below does.
    img_pil = Image.open(BytesIO(img_bytes)).convert("RGB")

    print(im_class.detection(img_pil))
    print(im_class.detection(Image.open("-532576361772532412_120.jpg").convert("RGB")))

ResNet/ResNext

神经网络

model.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
"""
ResNet模型
"""

import os
import time
import json
from io import BytesIO
from urllib.request import urlopen

import torch
import torch.nn as nn
from torchvision import transforms
from PIL import Image, ImageFile

from settings import PR, ResNet_model

ImageFile.LOAD_TRUNCATED_IMAGES = True

# Download URLs (hosted on download.pytorch.org) of pretrained ResNet weight files, keyed by variant name.
model_urls = {
'resnet18': 'https://download.pytorch.org/models/resnet18-5c106cde.pth',
'resnet34': 'https://download.pytorch.org/models/resnet34-333f7ec4.pth',
'resnet50': 'https://download.pytorch.org/models/resnet50-19c8e357.pth',
'resnet101': 'https://download.pytorch.org/models/resnet101-5d3b4d8f.pth',
'resnet152': 'https://download.pytorch.org/models/resnet152-b121ed2d.pth',
}

"""
# 定义 BasicBlock 模块
# ResNet18/34的残差结构, 用的是2个3x3大小的卷积
"""


class BasicBlock(nn.Module):
    """Residual block built from two 3x3 convolutions.

    Args:
        in_channel: Channels of the block input.
        out_channel: Channels produced by both convolutions.
        stride: Stride of the first convolution.
        downsample: Optional module applied to the shortcut so it matches the
            main branch's shape; ``None`` keeps the identity shortcut.
        **kwargs: Accepted and ignored, so the block can be constructed with
            the same call as ``Bottleneck`` (``groups``, ``width_per_group``).
    """

    # The block's output has out_channel * expansion channels.
    expansion = 1

    def __init__(self, in_channel, out_channel, stride=1, downsample=None, **kwargs):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channel, out_channel, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channel)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(out_channel, out_channel, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channel)
        self.downsample = downsample

    def forward(self, x):
        """Return ``relu(main_branch(x) + shortcut(x))``."""
        # Shortcut branch: projected only when a downsample module was given.
        shortcut = x if self.downsample is None else self.downsample(x)

        y = self.relu(self.bn1(self.conv1(x)))
        y = self.bn2(self.conv2(y))

        y += shortcut
        return self.relu(y)


"""
# 定义 Bottleneck 模块
# ResNet50/101/152的残差结构,用的是1x1+3x3+1x1的卷积
"""


class Bottleneck(nn.Module):
    """Residual block built from 1x1 -> 3x3 -> 1x1 convolutions.

    Note: in the original paper the first 1x1 convolution of a downsampling
    block has stride 2 and the 3x3 convolution stride 1. As in the official
    PyTorch implementation, the stride is applied in the 3x3 convolution
    here, which is reported to improve top-1 accuracy by roughly 0.5%.
    See ResNet v1.5:
    https://ngc.nvidia.com/catalog/model-scripts/nvidia:resnet_50_v1_5_for_pytorch

    Args:
        in_channel: Channels of the block input.
        out_channel: Base width; the block outputs ``out_channel * expansion`` channels.
        stride: Stride of the 3x3 convolution.
        downsample: Optional module applied to the shortcut branch.
        groups: Number of groups of the 3x3 convolution.
        width_per_group: Scales the inner width relative to a base of 64.
    """

    # The last convolution has 4x as many kernels as the first two.
    expansion = 4

    def __init__(self, in_channel, out_channel, stride=1, downsample=None, groups=1, width_per_group=64):
        super(Bottleneck, self).__init__()

        # Inner width of the 1x1-reduce and 3x3 convolutions.
        width = int(out_channel * (width_per_group / 64.)) * groups
        expanded = out_channel * self.expansion

        self.conv1 = nn.Conv2d(in_channel, width, kernel_size=1, stride=1, bias=False)
        self.bn1 = nn.BatchNorm2d(width)

        self.conv2 = nn.Conv2d(width, width, kernel_size=3, stride=stride,
                               padding=1, groups=groups, bias=False)
        self.bn2 = nn.BatchNorm2d(width)

        self.conv3 = nn.Conv2d(width, expanded, kernel_size=1, stride=1, bias=False)
        self.bn3 = nn.BatchNorm2d(expanded)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample

    def forward(self, x):
        """Return ``relu(main_branch(x) + shortcut(x))``."""
        # Shortcut branch: projected only when a downsample module was given.
        shortcut = x if self.downsample is None else self.downsample(x)

        y = self.relu(self.bn1(self.conv1(x)))
        y = self.relu(self.bn2(self.conv2(y)))
        y = self.bn3(self.conv3(y))

        y += shortcut
        return self.relu(y)


"""
# 残差网络结构
"""


class ResNet(nn.Module):
    """Generic ResNet / ResNeXt backbone with an optional classification head.

    Args:
        block: Residual block class (``BasicBlock`` or ``Bottleneck``); must
            expose an ``expansion`` class attribute.
        blocks_num: List with the number of residual blocks in each of the
            four stages (conv2_x .. conv5_x).
        num_classes: Number of output logits of the head.
        include_top: When False, the pooling + linear head is omitted and
            ``forward`` returns the last feature map.
        groups: Forwarded to every block.
        width_per_group: Forwarded to every block.
    """

    def __init__(self, block, blocks_num, num_classes=1000, include_top=True, groups=1, width_per_group=64):
        super(ResNet, self).__init__()
        self.include_top = include_top
        self.in_channel = 64  # running channel count, updated by _make_layer
        self.groups = groups
        self.width_per_group = width_per_group

        # Stem
        self.conv1 = nn.Conv2d(3, self.in_channel, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(self.in_channel)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Four residual stages: (base channels, stride of the first block).
        stage_plan = ((64, 1), (128, 2), (256, 2), (512, 2))
        for position, (channel, stride) in enumerate(stage_plan):
            stage = self._make_layer(block, channel, blocks_num[position], stride=stride)
            setattr(self, "layer%d" % (position + 1), stage)

        if self.include_top:
            self.avgpool = nn.AdaptiveAvgPool2d((1, 1))  # output size = (1, 1)
            self.fc = nn.Linear(512 * block.expansion, num_classes)

        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')

    def _make_layer(self, block, channel, block_num, stride=1):
        """Build one stage of ``block_num`` blocks.

        ``channel`` is the base width of the stage (the kernel count of each
        block's first convolution). Only the first block receives ``stride``
        and, when the shape changes, a 1x1-conv + BatchNorm shortcut projection.
        """
        out_channel = channel * block.expansion
        common = dict(groups=self.groups, width_per_group=self.width_per_group)

        projection = None
        if stride != 1 or self.in_channel != out_channel:
            projection = nn.Sequential(
                nn.Conv2d(self.in_channel, out_channel, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channel)
            )

        stack = [block(self.in_channel, channel, downsample=projection, stride=stride, **common)]
        self.in_channel = out_channel
        for _ in range(1, block_num):
            stack.append(block(self.in_channel, channel, **common))

        return nn.Sequential(*stack)

    def forward(self, x):
        """Return logits, or the final feature map when ``include_top`` is False."""
        x = self.maxpool(self.relu(self.bn1(self.conv1(x))))

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)

        if not self.include_top:
            return x
        x = torch.flatten(self.avgpool(x), 1)
        return self.fc(x)


"""
# resnet34 结构
# https://download.pytorch.org/models/resnet34-333f7ec4.pth
"""


def resnet34(num_classes=1000, include_top=True):
    """Build a ResNet-34: BasicBlock stages of depth 3, 4, 6 and 3."""
    stage_depths = [3, 4, 6, 3]
    return ResNet(BasicBlock, stage_depths, num_classes=num_classes, include_top=include_top)


"""
# resnet50 结构
# https://download.pytorch.org/models/resnet50-19c8e357.pth
"""


def resnet50(num_classes=1000, include_top=True):
    """Build a ResNet-50: Bottleneck stages of depth 3, 4, 6 and 3."""
    stage_depths = [3, 4, 6, 3]
    return ResNet(Bottleneck, stage_depths, num_classes=num_classes, include_top=include_top)


"""
# resnet101 结构
# https://download.pytorch.org/models/resnet101-5d3b4d8f.pth
"""


def resnet101(num_classes=1000, include_top=True):
    """Build a ResNet-101: Bottleneck stages of depth 3, 4, 23 and 3."""
    stage_depths = [3, 4, 23, 3]
    return ResNet(Bottleneck, stage_depths, num_classes=num_classes, include_top=include_top)


"""
# resnet152 结构
# https://download.pytorch.org/models/resnet152-b121ed2d.pth
"""


def resnet152(num_classes=1000, include_top=True):
    """Build a ResNet-152: Bottleneck stages of depth 3, 8, 36 and 3."""
    stage_depths = [3, 8, 36, 3]
    return ResNet(Bottleneck, stage_depths, num_classes=num_classes, include_top=include_top)


"""
# resnext50_32x4d 结构
# https://download.pytorch.org/models/resnext50_32x4d-7cdf4587.pth
"""


def resnext50_32x4d(num_classes=1000, include_top=True):
    """Build a ResNeXt-50 (32 groups, width 4 per group) on the ResNet-50 layout."""
    options = dict(
        num_classes=num_classes,
        include_top=include_top,
        groups=32,
        width_per_group=4,
    )
    return ResNet(Bottleneck, [3, 4, 6, 3], **options)


"""
# resnext101_32x8d 结构
# https://download.pytorch.org/models/resnext101_32x8d-8ba56ff5.pth
"""


def resnext101_32x8d(num_classes=1000, include_top=True):
    """Build a ResNeXt-101 (32 groups, width 8 per group) on the ResNet-101 layout."""
    options = dict(
        num_classes=num_classes,
        include_top=include_top,
        groups=32,
        width_per_group=8,
    )
    return ResNet(Bottleneck, [3, 4, 23, 3], **options)


class ResNetImageClass:
    """CPU inference wrapper around a 13-class ResNet-152 checkpoint.

    The class-index mapping is read from
    ``im_weight_Res/class_j/class_indices.json`` and the weights from
    ``im_weight_Res/weights/<ResNet_model>``; both paths are relative to the
    current working directory.
    """

    def __init__(self):
        # Inference is pinned to the CPU; use
        # torch.device("cuda" if torch.cuda.is_available() else "cpu") to opt into a GPU.
        self.device = torch.device("cpu")
        # Same preprocessing as the "val" transform of the ResNet training
        # script (resize to 256, centre-crop 224, ImageNet mean/std). The
        # original used Resize((224, 224)) with 0.5/0.5 normalisation, which
        # feeds the network a different input distribution than it was
        # validated on.
        self.data_transform = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])
        self.json_path = "im_weight_Res/class_j/class_indices.json"
        self.weights_path = f"im_weight_Res/weights/{ResNet_model}"

        self.model = resnet152(num_classes=13).to(self.device)
        # NOTE: torch.load unpickles the file, so only load checkpoints from a trusted source.
        self.model.load_state_dict(torch.load(self.weights_path, map_location=self.device))
        self.model.eval()

        with open(self.json_path, 'r', encoding='utf-8') as f:
            self.class_indices = json.load(f)

    def detection(self, img):
        """Classify one PIL image.

        Args:
            img: A PIL image in any mode; it is converted to RGB when needed.

        Returns:
            ``{'class': <predicted class index>, 'prob': <softmax probability>}``.
        """
        # The 3-channel Normalize above fails on RGBA / greyscale / palette
        # images, so bring every input to RGB first.
        if img.mode != "RGB":
            img = img.convert("RGB")
        img = self.data_transform(img)     # [C, H, W]
        img = torch.unsqueeze(img, dim=0)  # [1, C, H, W]
        with torch.no_grad():
            output = torch.squeeze(self.model(img.to(self.device))).cpu()
            predict = torch.softmax(output, dim=0)
            predict_cla = int(torch.argmax(predict).item())
        result = {
            'class': predict_cla,
            'prob': predict[predict_cla].item()
        }
        return result

训练

train.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""
训练(GPU)
"""
import os
import sys
import json
import time
import torch
import torch.nn as nn
from torchvision import transforms, datasets, utils
import matplotlib.pyplot as plt
import numpy as np
import torch.optim as optim
from tqdm import tqdm

from model import resnet152


def main():
    """Fine-tune ResNet-152 on an ImageFolder dataset and keep the best checkpoint.

    Reads ``train`` and ``val`` sub-folders of the hard-coded dataset directory,
    writes the index-to-class mapping to ``calss_indices.json``, loads the
    pretrained weights from ``./ResNet152_GPU_v2.pth``, replaces the final fully
    connected layer with a 13-way head, and trains for 300 epochs.  Whenever the
    validation accuracy improves, the weights are saved to ``./ResNet152_GPU_RE.pth``.

    Raises:
        FileNotFoundError: if the dataset directory or the pretrained checkpoint is missing.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"use device is {device}")

    data_transform = {
        "train": transforms.Compose([
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ]),
        "val": transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        ])
    }
    image_path = "/exp/work/algorithm/vgg/data_set/data"
    # Explicit raise instead of assert: asserts are stripped when Python runs with -O.
    if not os.path.exists(image_path):
        raise FileNotFoundError("{} path does not exist.".format(image_path))
    train_dataset = datasets.ImageFolder(root=os.path.join(image_path, "train"),
                                         transform=data_transform["train"])
    train_num = len(train_dataset)

    # Invert ImageFolder's {class_name: index} so predictions can be mapped back to names.
    cla_dict = {index: name for name, index in train_dataset.class_to_idx.items()}
    json_str = json.dumps(cla_dict, indent=12)
    # NOTE: the file name is misspelled ("calss"); it is kept byte-for-byte because
    # other scripts in this project open the file under exactly this name.
    with open("calss_indices.json", 'w') as json_file:
        json_file.write(json_str)

    batch_size = 16
    # Data loading runs in the main process; the original worker-count heuristic was
    # computed and then immediately overwritten with 0, so only the 0 is kept.
    nw = 0
    print(f"Using {nw} dataloader workers every process.")

    train_loader = torch.utils.data.DataLoader(train_dataset,
                                               batch_size=batch_size,
                                               shuffle=True,
                                               num_workers=nw)
    val_dataset = datasets.ImageFolder(root=os.path.join(image_path, "val"),
                                       transform=data_transform["val"])
    val_num = len(val_dataset)
    val_loader = torch.utils.data.DataLoader(val_dataset,
                                             batch_size=4,
                                             shuffle=False,
                                             num_workers=nw)
    print(f"Using {train_num} images for training, {val_num} images for validation.")

    net = resnet152()  # instantiate with the default 1000-class head
    # Load pretrained weights before the head is replaced, so the checkpoint must
    # match the default 1000-class layout.
    # download url: https://download.pytorch.org/models/resnet152-b121ed2d.pth
    model_weight_path = "./ResNet152_GPU_v2.pth"
    if not os.path.exists(model_weight_path):
        raise FileNotFoundError("file {} does not exist.".format(model_weight_path))
    net.load_state_dict(torch.load(model_weight_path, map_location='cpu'))
    # To freeze the backbone, uncomment:
    # for param in net.parameters():
    #     param.requires_grad = False

    # Replace the classification head with a 13-class layer.
    in_channel = net.fc.in_features
    net.fc = nn.Linear(in_channel, 13)
    net.to(device)
    loss_function = nn.CrossEntropyLoss()

    # Optimize only the parameters that still require gradients.
    params = [p for p in net.parameters() if p.requires_grad]
    optimizer = optim.Adam(params, lr=0.0001)

    epochs = 300
    save_path = "./ResNet152_GPU_RE.pth"
    best_accuracy = 0.0
    train_steps = len(train_loader)
    for epoch in range(epochs):
        net.train()
        running_loss = 0.0
        train_bar = tqdm(train_loader, file=sys.stdout)
        for images, labels in train_bar:
            optimizer.zero_grad()
            logits = net(images.to(device))
            loss = loss_function(logits, labels.to(device))
            loss.backward()
            optimizer.step()
            loss_value = loss.item()
            running_loss += loss_value
            train_bar.desc = "train epoch [{}/{}] loss:{:.3f}".format(epoch + 1,
                                                                     epochs,
                                                                     loss_value)
        # Validation pass
        net.eval()
        acc = 0.0
        with torch.no_grad():
            val_bar = tqdm(val_loader, file=sys.stdout)
            for val_images, val_labels in val_bar:
                outputs = net(val_images.to(device))
                predict_y = torch.max(outputs, dim=1)[1]
                acc += torch.eq(predict_y, val_labels.to(device)).sum().item()
        val_accuracy = acc / val_num
        # "%.3f" (three decimals) was intended; the previous "%3f" only set a minimum width.
        print("[epoch %d ] train_loss: %.3f val_accurancy: %.3f" %
              (epoch + 1, running_loss / train_steps, val_accuracy))
        # Persist only checkpoints that improve on the best validation accuracy so far.
        if val_accuracy > best_accuracy:
            best_accuracy = val_accuracy
            torch.save(net.state_dict(), save_path)
    print("Finished Training.")


if __name__ == '__main__':
    main()

预测

predict.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
"""
预测
"""
from io import BytesIO
from urllib.request import urlopen
from PIL import Image, ImageFile

from im_weight_Res.model import ResNetImageClass

if __name__ == '__main__':
    # Smoke test: classify one image fetched over HTTP and one image read from disk.
    im_class = ResNetImageClass()
    url = 'http://192.168.3.18:300/files/group1/M00/11/0E/wKgCBWRUgjWAGTfNAAEKiw0XSZc371.jpg'
    # Use the response as a context manager so the connection is closed deterministically.
    with urlopen(url) as response:
        img_bytes = response.read()
    # Force three channels, matching the local-file call below; the classifier's
    # transform normalizes exactly three channels.
    img_pil = Image.open(BytesIO(img_bytes)).convert("RGB")

    print(im_class.detection(img_pil))
    print(im_class.detection(Image.open("-532576361772532412_120.jpg").convert("RGB")))

AlexNet

神经网络

model.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import torch.nn as nn
import torch


class AlexNet(nn.Module):
    """AlexNet variant with the channel widths used below (48/128/192/192/128).

    Args:
        num_classes: size of the final linear layer's output.
        init_weights: when True, run ``_initialize_weights`` after building the layers.

    The shape comments assume a ``[3, 224, 224]`` input per sample.
    """

    def __init__(self, num_classes=1000, init_weights=False):
        super(AlexNet, self).__init__()
        # Feature extractor
        self.features = nn.Sequential(
            nn.Conv2d(3, 48, kernel_size=11, stride=4, padding=2),   # in [3, 224, 224] -> out [48, 55, 55]
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),                   # out [48, 27, 27]
            nn.Conv2d(48, 128, kernel_size=5, padding=2),            # out [128, 27, 27]
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),                   # out [128, 13, 13]
            nn.Conv2d(128, 192, kernel_size=3, padding=1),           # out [192, 13, 13]
            nn.ReLU(inplace=True),
            nn.Conv2d(192, 192, kernel_size=3, padding=1),           # out [192, 13, 13]
            nn.ReLU(inplace=True),
            nn.Conv2d(192, 128, kernel_size=3, padding=1),           # out [128, 13, 13]
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2)                    # out [128, 6, 6]
        )
        # Classifier
        self.classifier = nn.Sequential(
            nn.Dropout(p=0.5),  # randomly deactivate units with probability 0.5
            nn.Linear(128 * 6 * 6, 2048),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(2048, 2048),
            nn.ReLU(inplace=True),
            nn.Linear(2048, num_classes)
        )
        if init_weights:
            self._initialize_weights()

    def forward(self, x):
        """Run the feature extractor, flatten everything but the batch axis, then classify."""
        x = self.features(x)
        x = torch.flatten(x, start_dim=1)
        x = self.classifier(x)

        return x

    def _initialize_weights(self):
        """Initialize conv layers with Kaiming-normal and linear layers with N(0, 0.01)."""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                # normal_(tensor, mean, std): the earlier call passed 0.01 as the *mean*
                # and left std at its default of 1.0; spell both out explicitly.
                nn.init.normal_(m.weight, mean=0.0, std=0.01)
                nn.init.constant_(m.bias, 0)

训练

train.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""
训练(GPU)
"""
import os
import sys
import json
import time
import torch
import torch.nn as nn
from torchvision import transforms, datasets, utils
import matplotlib.pyplot as plt
import numpy as np
import torch.optim as optim
from tqdm import tqdm

from model import AlexNet


def main():
    """Train AlexNet (5 classes) on ``data_set/flower_data`` and keep the best checkpoint.

    Reads ``train`` and ``val`` sub-folders below ``<cwd>/data_set/flower_data``,
    writes the index-to-class mapping to ``calss_indices.json``, trains for 10
    epochs, and saves the weights to ``./AlexNet.pth`` whenever the validation
    accuracy improves.

    Raises:
        FileNotFoundError: if the dataset directory does not exist.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"use device is {device}")

    data_transform = {
        "train": transforms.Compose([
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        ]),
        "val": transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        ])
    }
    data_root = os.path.abspath(os.getcwd())
    image_path = os.path.join(data_root, "data_set", "flower_data")
    # Explicit raise instead of assert: asserts are stripped when Python runs with -O.
    if not os.path.exists(image_path):
        raise FileNotFoundError("{} path does not exist.".format(image_path))
    train_dataset = datasets.ImageFolder(root=os.path.join(image_path, "train"),
                                         transform=data_transform["train"])
    train_num = len(train_dataset)

    # Invert ImageFolder's {class_name: index} so predictions can be mapped back to names.
    cla_dict = {index: name for name, index in train_dataset.class_to_idx.items()}
    json_str = json.dumps(cla_dict, indent=4)
    # NOTE: the file name is misspelled ("calss"); it is kept byte-for-byte because
    # the prediction script opens the file under exactly this name.
    with open("calss_indices.json", 'w') as json_file:
        json_file.write(json_str)

    batch_size = 32
    # Data loading runs in the main process; the original worker-count heuristic was
    # computed and then immediately overwritten with 0, so only the 0 is kept.
    nw = 0
    print(f"Using {nw} dataloader workers every process.")

    train_loader = torch.utils.data.DataLoader(train_dataset,
                                               batch_size=batch_size,
                                               shuffle=True,
                                               num_workers=nw)
    val_dataset = datasets.ImageFolder(root=os.path.join(image_path, "val"),
                                       transform=data_transform["val"])
    val_num = len(val_dataset)
    val_loader = torch.utils.data.DataLoader(val_dataset,
                                             batch_size=4,
                                             shuffle=False,
                                             num_workers=nw)
    print(f"Using {train_num} images for training, {val_num} images for validation.")

    net = AlexNet(num_classes=5, init_weights=True)
    net.to(device)
    loss_function = nn.CrossEntropyLoss()
    optimizer = optim.Adam(net.parameters(), lr=0.0002)

    epochs = 10
    save_path = "./AlexNet.pth"
    best_accuracy = 0.0
    train_steps = len(train_loader)
    for epoch in range(epochs):
        net.train()
        running_loss = 0.0
        train_bar = tqdm(train_loader, file=sys.stdout)
        for images, labels in train_bar:
            optimizer.zero_grad()
            outputs = net(images.to(device))
            loss = loss_function(outputs, labels.to(device))
            loss.backward()
            optimizer.step()
            loss_value = loss.item()
            running_loss += loss_value
            train_bar.desc = "train epoch [{}/{}] loss:{:.3f}".format(epoch + 1,
                                                                     epochs,
                                                                     loss_value)
        # Validation pass
        net.eval()
        acc = 0.0
        with torch.no_grad():
            val_bar = tqdm(val_loader, file=sys.stdout)
            for val_images, val_labels in val_bar:
                outputs = net(val_images.to(device))
                predict_y = torch.max(outputs, dim=1)[1]
                acc += torch.eq(predict_y, val_labels.to(device)).sum().item()
        val_accuracy = acc / val_num
        # "%.3f" (three decimals) was intended; the previous "%3f" only set a minimum width.
        print("[epoch %d ] train_loss: %.3f val_accurancy: %.3f" %
              (epoch + 1, running_loss / train_steps, val_accuracy))
        # Persist only checkpoints that improve on the best validation accuracy so far.
        if val_accuracy > best_accuracy:
            best_accuracy = val_accuracy
            torch.save(net.state_dict(), save_path)
    print("Finshed Training.")


if __name__ == '__main__':
    main()

预测

(暂未封装)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
"""
预测
"""

import os
import json
import torch
from PIL import Image
from torchvision import transforms
import matplotlib.pyplot as plt
from model import AlexNet


def main():
    """Classify ``./sunflowers01.jpg`` with the trained AlexNet and show the result.

    Loads the class mapping from ``./calss_indices.json`` and the weights from
    ``./AlexNet.pth``, runs a single forward pass on the CPU, and displays the
    image with the predicted class and probability as the plot title.
    """
    data_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])

    image_path = "./sunflowers01.jpg"
    # The transform normalizes exactly three channels, so force RGB up front;
    # grayscale or RGBA files would otherwise fail inside Normalize.
    img = Image.open(image_path).convert("RGB")
    plt.imshow(img)
    img = data_transform(img)            # [C, H, W]
    img = torch.unsqueeze(img, dim=0)    # add the batch axis -> [N, C, H, W]

    json_path = "./calss_indices.json"
    with open(json_path, 'r') as f:
        class_indict = json.load(f)

    # Inference runs on the CPU.
    model = AlexNet(num_classes=5)
    weights_path = "./AlexNet.pth"
    # map_location lets a checkpoint saved from a CUDA training run load on a
    # machine without a GPU.
    model.load_state_dict(torch.load(weights_path, map_location="cpu"))
    model.eval()  # disable Dropout
    with torch.no_grad():
        output = torch.squeeze(model(img))   # drop the batch axis
        predict = torch.softmax(output, dim=0)
        predict_cla = int(torch.argmax(predict).item())
    # JSON object keys are strings, hence the str() around the index.
    print_res = "class: {} prob: {:.3}".format(class_indict[str(predict_cla)],
                                               predict[predict_cla].item())
    plt.title(print_res)
    plt.show()


if __name__ == '__main__':
    main()

LeNet

神经网络

model.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import torch.nn as nn
import torch.nn.functional as F


class LeNet(nn.Module):
    """LeNet-style CNN: two conv/pool stages followed by three linear layers (10 outputs).

    The shape comments in ``forward`` assume a ``(3, 32, 32)`` input per sample.
    """

    def __init__(self):
        super().__init__()

        # Convolutional stages; conv1 spells out the Conv2d defaults for reference.
        self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=1, padding=0, bias=True)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=5)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Fully connected head
        self.fc1 = nn.Linear(in_features=32 * 5 * 5, out_features=120)
        self.fc2 = nn.Linear(in_features=120, out_features=84)
        self.fc3 = nn.Linear(in_features=84, out_features=10)

    def forward(self, x):
        """Forward pass returning the raw 10-way logits."""
        stage1 = self.pool1(F.relu(self.conv1(x)))        # (16, 28, 28) -> pooled (16, 14, 14)
        stage2 = self.pool2(F.relu(self.conv2(stage1)))   # (32, 10, 10) -> pooled (32, 5, 5)
        flat = stage2.view(-1, 32 * 5 * 5)                # (32*5*5,) per sample
        hidden = F.relu(self.fc1(flat))                   # (120,)
        hidden = F.relu(self.fc2(hidden))                 # (84,)
        logits = self.fc3(hidden)                         # (10,)
        return logits

训练

train.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""
训练
"""
import torch
import torchvision
import torch.nn as nn
from model import LeNet
import torch.optim as optim
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
import time


def main():
    """Train LeNet on CIFAR-10 for 5 epochs on the CPU and save the weights.

    Every 500 training steps the whole test split (loaded once as a single batch)
    is evaluated and the mean training loss plus test accuracy are printed.  The
    per-epoch wall time is printed as well, and the final weights are written to
    ``./LeNet.pth``.
    """
    preprocess = transforms.Compose([
        transforms.ToTensor(),                                   # image -> tensor
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),  # normalization
    ])

    # Training split.  download=False expects the data to already be under ./data;
    # set it to True for the first run, then switch it back.
    train_set = torchvision.datasets.CIFAR10(
        root='./data', train=True, download=False, transform=preprocess)
    train_loader = torch.utils.data.DataLoader(
        train_set, batch_size=50, shuffle=True, num_workers=0)

    # Test split, served as one batch of 10000 so it can be evaluated in a single pass.
    val_set = torchvision.datasets.CIFAR10(
        root='./data', train=False, download=False, transform=preprocess)
    val_loader = torch.utils.data.DataLoader(
        val_set, batch_size=10000, shuffle=False, num_workers=0)
    val_image, val_label = next(iter(val_loader))

    # ---------------------------------------------------------------------------
    # Optional dataset preview (lower the test batch size before enabling):
    # class_labels = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
    # def img_show(img):
    #     img = img / 2 + 0.5
    #     np_img = img.numpy()
    #     plt.imshow(np.transpose(np_img, (1, 2, 0)))
    #     plt.show()
    #
    # print(''.join(" %5s " % class_labels[val_label[j]] for j in range(5)))
    # img_show(torchvision.utils.make_grid(val_image))
    # ---------------------------------------------------------------------------

    # The available device is detected here, but training below is pinned to the CPU
    # and this value is not used.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    net = LeNet()
    net.to("cpu")
    loss_function = nn.CrossEntropyLoss()                 # cross-entropy loss
    optimizer = optim.Adam(net.parameters(), lr=0.001)    # optimizer over all parameters

    report_every = 500
    for epoch in range(5):
        start_time = time.perf_counter()
        running_loss = 0.0
        for step, (inputs, labels) in enumerate(train_loader, start=0):
            optimizer.zero_grad()                      # clear accumulated gradients
            loss = loss_function(net(inputs), labels)  # forward pass + loss
            loss.backward()                            # backward pass
            optimizer.step()                           # parameter update

            running_loss += loss.item()
            if (step + 1) % report_every != 0:
                continue

            # Periodic report: evaluate on the full test batch without tracking gradients.
            with torch.no_grad():
                outputs = net(val_image)
                predict_y = torch.max(outputs, dim=1)[1]
                accuracy = torch.eq(predict_y, val_label).sum().item() / val_label.size(0)

            print("[%d, %5d] train_loss: %.3f test_accuracy: %.3f" %
                  (epoch + 1, step + 1, running_loss / 500, accuracy))
            running_loss = 0.0
        elapsed = time.perf_counter() - start_time
        print("cost time = ", elapsed)

    print("Finished trainning")

    save_path = "./LeNet.pth"
    torch.save(net.state_dict(), save_path)  # persist the trained weights


if __name__ == '__main__':
    main()

预测

(暂未封装)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
""""
测试
"""
import torch
import torchvision.transforms as transforms
from PIL import Image
from model import LeNet


def main():
    """Classify ``cat_test2.jpg`` with the trained LeNet and print the predicted label.

    Loads the weights from ``LeNet.pth``, resizes the image to 32x32, and prints
    the name of the highest-scoring class.
    """
    transform = transforms.Compose([
        transforms.Resize((32, 32)),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])

    data_class = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

    net = LeNet()
    # map_location lets a checkpoint saved on a GPU load on a CPU-only machine.
    net.load_state_dict(torch.load('LeNet.pth', map_location=torch.device("cpu")))
    net.eval()

    # The network's first layer takes three channels, so force RGB; grayscale or
    # RGBA files would otherwise fail inside Normalize.
    test_image = Image.open('cat_test2.jpg').convert("RGB")
    test_image = transform(test_image)                  # [C, H, W]
    test_image = torch.unsqueeze(test_image, dim=0)     # [N, C, H, W]

    with torch.no_grad():
        outputs = net(test_image)
        # .item() yields a plain int; calling int() on a 1-element ndarray is
        # deprecated in recent NumPy releases.
        predict = torch.max(outputs, dim=1)[1].item()
    print(f"It is {data_class[predict]}")


if __name__ == '__main__':
    main()

Powered by Hexo and Hexo-theme-hiker

Copyright © 2017 - 2025 青域 All Rights Reserved.

UV : | PV :