Date: 2025-07-18 | Author: 游乐小编
AsymmNet originates from a three-party German-Chinese collaboration. It improves performance by rebalancing how computation is allocated across the convolutions of a bottleneck block, with notable gains in a specific computational range. This article walks through dataset preparation, model implementation, and training, and shows that the model trains and converges quickly with strong performance.
Paper: https://arxiv.org/pdf/2104.07770.pdf
We upload the dataset, obtained online, to the AI Studio datasets area as a compressed archive and mount it in our project. Before using it, we unzip the archive.
In [ ]
!unzip -oq /home/aistudio/data/data69664/Images.zip -d work/dataset

In [ ]
import paddle
import numpy as np
from typing import Callable

# configuration
config_parameters = {
    "class_dim": 16,                                      # number of classes
    "target_path": "/home/aistudio/work/",
    'train_image_dir': '/home/aistudio/work/trainImages',
    'eval_image_dir': '/home/aistudio/work/evalImages',
    'epochs': 100,
    'batch_size': 32,
    'lr': 0.01
}
Next, we define a dataset class over the labelled files so it can be consumed during model training.
In [ ]
import os
import shutil

train_dir = config_parameters['train_image_dir']
eval_dir = config_parameters['eval_image_dir']

paths = os.listdir('work/dataset/Images')

if not os.path.exists(train_dir):
    os.mkdir(train_dir)
if not os.path.exists(eval_dir):
    os.mkdir(eval_dir)

for path in paths:
    imgs_dir = os.listdir(os.path.join('work/dataset/Images', path))
    target_train_dir = os.path.join(train_dir, path)
    target_eval_dir = os.path.join(eval_dir, path)
    if not os.path.exists(target_train_dir):
        os.mkdir(target_train_dir)
    if not os.path.exists(target_eval_dir):
        os.mkdir(target_eval_dir)
    for i in range(len(imgs_dir)):
        # replace spaces in file names so later tooling does not choke on them
        if ' ' in imgs_dir[i]:
            new_name = imgs_dir[i].replace(' ', '_')
        else:
            new_name = imgs_dir[i]
        target_train_path = os.path.join(target_train_dir, new_name)
        target_eval_path = os.path.join(target_eval_dir, new_name)
        # every fifth image goes to the eval split (a 4:1 train/eval ratio)
        if i % 5 == 0:
            shutil.copyfile(os.path.join('work/dataset/Images', path, imgs_dir[i]),
                            target_eval_path)
        else:
            shutil.copyfile(os.path.join('work/dataset/Images', path, imgs_dir[i]),
                            target_train_path)

print('finished train val split!')
finished train val split!
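As a quick sanity check (our addition, not in the original notebook), we can count the copied files and confirm the roughly 4:1 split produced by the `i % 5` rule above:

In [ ]
# count images in each split; reuses train_dir/eval_dir defined in the previous cell
import os

for split_dir in [train_dir, eval_dir]:
    total = sum(len(os.listdir(os.path.join(split_dir, c)))
                for c in os.listdir(split_dir))
    print('{}: {} images'.format(split_dir, total))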
Let's first look at what the unzipped dataset looks like. We will then compare classic models on this 16-class mini dataset extracted from Caltech101.
In [ ]
import os
import random
from matplotlib import pyplot as plt
from PIL import Image

imgs = []
paths = os.listdir('work/dataset/Images')
for path in paths:
    img_path = os.path.join('work/dataset/Images', path)
    if os.path.isdir(img_path):
        img_paths = os.listdir(img_path)
        # pick one random image per class for the preview grid
        img = Image.open(os.path.join(img_path, random.choice(img_paths)))
        imgs.append((img, path))

f, ax = plt.subplots(4, 4, figsize=(12, 12))
for i, img in enumerate(imgs[:16]):
    ax[i//4, i%4].imshow(img[0])
    ax[i//4, i%4].axis('off')
    ax[i//4, i%4].set_title('label: %s' % img[1])
plt.show()
In [ ]
# dataset definition
class Dataset(paddle.io.Dataset):
    """
    Step 1: inherit from paddle.io.Dataset
    """
    def __init__(self, transforms: Callable, mode: str = 'train'):
        """
        Step 2: implement the constructor and define how data is read
        """
        super(Dataset, self).__init__()
        self.mode = mode
        self.transforms = transforms
        train_image_dir = config_parameters['train_image_dir']
        eval_image_dir = config_parameters['eval_image_dir']
        train_data_folder = paddle.vision.DatasetFolder(train_image_dir)
        eval_data_folder = paddle.vision.DatasetFolder(eval_image_dir)
        if self.mode == 'train':
            self.data = train_data_folder
        elif self.mode == 'eval':
            self.data = eval_data_folder

    def __getitem__(self, index):
        """
        Step 3: implement __getitem__, returning one (image, label)
        pair for the given index
        """
        data = np.array(self.data[index][0]).astype('float32')
        data = self.transforms(data)
        label = np.array([self.data[index][1]]).astype('int64')
        return data, label

    def __len__(self):
        """
        Step 4: implement __len__, returning the total number of samples
        """
        return len(self.data)

In [ ]
from paddle.vision import transforms as T

# data augmentation
transform_train = T.Compose([
    T.Resize((256, 256)),
    # T.RandomVerticalFlip(10),
    # T.RandomHorizontalFlip(10),
    T.RandomRotation(10),
    T.Transpose(),                                              # HWC -> CHW
    T.Normalize(mean=[0, 0, 0],                                 # scale pixel values to [0, 1]
                std=[255, 255, 255]),
    T.Normalize(mean=[0.50950350, 0.54632660, 0.57409690],      # subtract mean, divide by std:
                std=[0.26059777, 0.26041326, 0.29220656])       # output[c] = (input[c] - mean[c]) / std[c]
])

transform_eval = T.Compose([
    T.Resize((256, 256)),
    T.Transpose(),
    T.Normalize(mean=[0, 0, 0],
                std=[255, 255, 255]),
    T.Normalize(mean=[0.50950350, 0.54632660, 0.57409690],
                std=[0.26059777, 0.26041326, 0.29220656])
])
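The two chained `Normalize` transforms are equivalent to a single normalization whose mean and std are scaled by 255. A small numerical check (our addition) makes the identity explicit:

In [ ]
import numpy as np

x = np.float32(128.0)                        # an arbitrary red-channel pixel value
step1 = (x - 0.0) / 255.0                    # first Normalize: scale to [0, 1]
step2 = (step1 - 0.50950350) / 0.26059777    # second Normalize: standardize
single = (x - 0.50950350 * 255) / (0.26059777 * 255)
print(np.isclose(step2, single))             # True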
We instantiate the dataset class for the splits we need and check the dataset size.
In [ ]
train_dataset = Dataset(mode='train', transforms=transform_train)
eval_dataset = Dataset(mode='eval', transforms=transform_eval)

# asynchronous data loading
train_loader = paddle.io.DataLoader(train_dataset,
                                    places=paddle.CUDAPlace(0),
                                    batch_size=32,
                                    shuffle=True,
                                    # num_workers=2,
                                    # use_shared_memory=True
                                    )
eval_loader = paddle.io.DataLoader(eval_dataset,
                                   places=paddle.CUDAPlace(0),
                                   batch_size=32,
                                   # num_workers=2,
                                   # use_shared_memory=True
                                   )

# note: len(loader) is the number of batches, not the number of samples
print('training batches: {}, validation batches: {}'.format(len(train_loader), len(eval_loader)))
training batches: 45, validation batches: 12
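Note that `len(DataLoader)` counts batches rather than samples: with a batch size of 32, the 45 training batches correspond to roughly 45 × 32 ≈ 1,440 training images and the 12 eval batches to roughly 380, consistent with the 4:1 split above.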
For a baseline comparison, we pick the classic convolutional networks resnet50, vgg19, and mobilenet_v2.
In [ ]
network = paddle.vision.models.mobilenet_v2(num_classes=16)

# wrap the network in the high-level Model API
model = paddle.Model(network)

# print a per-layer summary
model.summary((-1, 3, 256, 256))
In [ ]
# optimizer setup and best-model checkpoint callback
class SaveBestModel(paddle.callbacks.Callback):
    def __init__(self, target=0.5, path='work/best_model', verbose=0):
        self.target = target
        self.epoch = None
        self.path = path

    def on_epoch_end(self, epoch, logs=None):
        self.epoch = epoch

    def on_eval_end(self, logs=None):
        if logs.get('acc') > self.target:
            self.target = logs.get('acc')
            self.model.save(self.path)
            print('best acc is {} at epoch {}'.format(self.target, self.epoch))

callback_visualdl = paddle.callbacks.VisualDL(log_dir='work/vgg19')
callback_savebestmodel = SaveBestModel(target=0.5, path='work/best_model')
callbacks = [callback_visualdl, callback_savebestmodel]

base_lr = config_parameters['lr']
epochs = config_parameters['epochs']

def make_optimizer(parameters=None):
    momentum = 0.9
    learning_rate = paddle.optimizer.lr.CosineAnnealingDecay(learning_rate=base_lr,
                                                             T_max=epochs,
                                                             verbose=False)
    weight_decay = paddle.regularizer.L2Decay(0.0001)
    optimizer = paddle.optimizer.Momentum(learning_rate=learning_rate,
                                          momentum=momentum,
                                          weight_decay=weight_decay,
                                          parameters=parameters)
    return optimizer

optimizer = make_optimizer(model.parameters())

model.prepare(optimizer,
              paddle.nn.CrossEntropyLoss(),
              paddle.metric.Accuracy())

model.fit(train_loader,
          eval_loader,
          epochs=100,
          batch_size=1,        # ignored here: batching is handled by the DataLoader
          callbacks=callbacks,
          verbose=1)           # log verbosity
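For reference (our addition), `CosineAnnealingDecay` with its default minimum of 0 follows lr(t) = base_lr · (1 + cos(πt / T_max)) / 2, so the learning rate halves at mid-training and reaches zero at epoch 100:

In [ ]
import math

base_lr, T_max = 0.01, 100
for t in [0, 25, 50, 75, 100]:
    lr = base_lr * (1 + math.cos(math.pi * t / T_max)) / 2
    print('epoch {:3d}: lr = {:.6f}'.format(t, lr))
# epoch 0: lr = 0.010000 ... epoch 50: lr = 0.005000 ... epoch 100: lr = 0.000000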
Surveying prior work, the authors conclude that feature reuse is an effective operation for achieving more resource-efficient designs.
Figure 1: Details of the asymmetrical bottleneck block
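Before the full implementation, here is a minimal standalone sketch (our addition, with hypothetical channel sizes) of the feature-reuse concatenation that Figure 1 depicts: the 1×1 expansion produces only part of the intermediate channels, and copies of the raw input fill in the rest for free:

In [ ]
import paddle

asymmrate, num_in, num_mid = 1, 16, 64
x = paddle.rand([1, num_in, 32, 32])

# expand only (num_mid - asymmrate * num_in) channels with a 1x1 conv...
expand = paddle.nn.Conv2D(num_in, num_mid - asymmrate * num_in, 1)
out = expand(x)

# ...then reuse the input itself on both sides of the expanded features
out = paddle.concat([x, out, x], axis=1)
print(out.shape)  # [1, 80, 32, 32]: num_mid + asymmrate * num_in channels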
In [ ]
__all__ = ['AsymmNet_Large', 'AsymmNet_Small', 'AsymmNet']

import math

import paddle
import paddle.nn as nn
import paddle.nn.functional as F


class HardSigmoid(nn.Layer):
    def __init__(self):
        super(HardSigmoid, self).__init__()
        self.relu = nn.ReLU6()

    def forward(self, x):
        return self.relu(x + 3) / 6


class HardSwish(nn.Layer):
    def __init__(self):
        super(HardSwish, self).__init__()
        self.sigmoid = HardSigmoid()

    def forward(self, x):
        return x * self.sigmoid(x)


class Activation(nn.Layer):
    def __init__(self, act_func):
        super(Activation, self).__init__()
        if act_func == "relu":
            self.act = nn.ReLU()
        elif act_func == "ReLU6":
            self.act = nn.ReLU6()
        elif act_func == "hard_sigmoid":
            self.act = HardSigmoid()
        elif act_func == "hard_swish":
            self.act = HardSwish()
        else:
            raise NotImplementedError

    def forward(self, x):
        return self.act(x)


def make_divisible(x, divisible_by=8):
    return int(math.ceil(x * 1. / divisible_by) * divisible_by)


class _BasicUnit(nn.Layer):
    """Conv + BN (+ optional activation) building block."""
    def __init__(self, num_in, num_out, kernel_size=1, strides=1, pad=0,
                 num_groups=1, use_act=True, act_type="relu",
                 norm_layer=nn.BatchNorm2D):
        super(_BasicUnit, self).__init__()
        self.use_act = use_act
        self.conv = nn.Conv2D(in_channels=num_in,
                              out_channels=num_out,
                              kernel_size=kernel_size,
                              stride=strides,
                              padding=pad,
                              groups=num_groups,
                              bias_attr=False)
        self.bn = norm_layer(num_out)
        if use_act is True:
            self.act = Activation(act_type)

    def forward(self, x):
        out = self.conv(x)
        out = self.bn(out)
        if self.use_act:
            out = self.act(out)
        return out


class SE_Module(nn.Layer):
    """Squeeze-and-excitation channel attention."""
    def __init__(self, channels, reduction=4):
        super(SE_Module, self).__init__()
        self.Avg = nn.AdaptiveAvgPool2D(1)
        reduction_c = make_divisible(channels // reduction)
        self.out = nn.Sequential(
            nn.Conv2D(channels, reduction_c, 1, bias_attr=True),
            nn.ReLU(),
            nn.Conv2D(reduction_c, channels, 1, bias_attr=True),
            HardSigmoid()
        )

    def forward(self, x):
        y = self.Avg(x)
        y = self.out(y)
        return x * y


class AsymmBottleneck(nn.Layer):
    def __init__(self, num_in, num_mid, num_out, kernel_size, asymmrate=1,
                 act_type="relu", use_se=False, strides=1,
                 norm_layer=nn.BatchNorm2D):
        super(AsymmBottleneck, self).__init__()
        assert isinstance(asymmrate, int)
        self.asymmrate = asymmrate
        self.use_se = use_se
        self.use_short_cut_conv = (num_in == num_out and strides == 1)
        self.do_expand = (num_mid > max(num_in, asymmrate * num_in))
        if self.do_expand:
            # expand only part of the channels; the rest are reused from the input
            self.expand = _BasicUnit(num_in, num_mid - asymmrate * num_in,
                                     kernel_size=1, strides=1, pad=0,
                                     act_type=act_type, norm_layer=norm_layer)
            num_mid += asymmrate * num_in
        self.dw_conv = _BasicUnit(num_mid, num_mid, kernel_size, strides,
                                  pad=self._get_pad(kernel_size),
                                  act_type=act_type, num_groups=num_mid,
                                  norm_layer=norm_layer)
        if self.use_se:
            self.se = SE_Module(num_mid)
        self.pw_conv_linear = _BasicUnit(num_mid, num_out, kernel_size=1,
                                         strides=1, pad=0, act_type=act_type,
                                         use_act=False, norm_layer=norm_layer,
                                         num_groups=1)

    def forward(self, x):
        if self.do_expand:
            out = self.expand(x)
            feat = []
            for i in range(self.asymmrate):
                feat.append(x)
            feat.append(out)
            for i in range(self.asymmrate):
                feat.append(x)
            if self.asymmrate > 0:
                out = paddle.concat(feat, axis=1)
        else:
            out = x
        out = self.dw_conv(out)
        if self.use_se:
            out = self.se(out)
        out = self.pw_conv_linear(out)
        if self.use_short_cut_conv:
            return x + out
        return out

    def _get_pad(self, kernel_size):
        if kernel_size == 1:
            return 0
        elif kernel_size == 3:
            return 1
        elif kernel_size == 5:
            return 2
        elif kernel_size == 7:
            return 3
        else:
            raise NotImplementedError


def get_asymmnet_cfgs(model_name):
    if model_name == 'asymmnet_large':
        inplanes = 16
        cfg = [
            # k, exp, c, se, nl, s
            # stage1
            [3, 16, 16, False, 'relu', 1],
            # stage2
            [3, 64, 24, False, 'relu', 2],
            [3, 72, 24, False, 'relu', 1],
            # stage3
            [5, 72, 40, True, 'relu', 2],
            [5, 120, 40, True, 'relu', 1],
            [5, 120, 40, True, 'relu', 1],
            # stage4
            [3, 240, 80, False, 'hard_swish', 2],
            [3, 200, 80, False, 'hard_swish', 1],
            [3, 184, 80, False, 'hard_swish', 1],
            [3, 184, 80, False, 'hard_swish', 1],
            [3, 480, 112, True, 'hard_swish', 1],
            [3, 672, 112, True, 'hard_swish', 1],
            # stage5
            [5, 672, 160, True, 'hard_swish', 2],
            [5, 960, 160, True, 'hard_swish', 1],
            [5, 960, 160, True, 'hard_swish', 1],
        ]
        cls_ch_squeeze = 960
        cls_ch_expand = 1280
    elif model_name == 'asymmnet_small':
        inplanes = 16
        cfg = [
            # k, exp, c, se, nl, s
            [3, 16, 16, True, 'relu', 2],
            [3, 72, 24, False, 'relu', 2],
            [3, 88, 24, False, 'relu', 1],
            [5, 96, 40, True, 'hard_swish', 2],
            [5, 240, 40, True, 'hard_swish', 1],
            [5, 240, 40, True, 'hard_swish', 1],
            [5, 120, 48, True, 'hard_swish', 1],
            [5, 144, 48, True, 'hard_swish', 1],
            [5, 288, 96, True, 'hard_swish', 2],
            [5, 576, 96, True, 'hard_swish', 1],
            [5, 576, 96, True, 'hard_swish', 1],
        ]
        cls_ch_squeeze = 576
        cls_ch_expand = 1280
    else:
        raise ValueError('{} model_name is not supported now!'.format(model_name))
    return inplanes, cfg, cls_ch_squeeze, cls_ch_expand


class AsymmNet(nn.Layer):
    def __init__(self, cfgs_name, num_classes=config_parameters['class_dim'],
                 multiplier=1.0, asymmrate=1, dropout_rate=0.2,
                 norm_layer=nn.BatchNorm2D):
        super(AsymmNet, self).__init__()
        inplanes, cfg, cls_ch_squeeze, cls_ch_expand = get_asymmnet_cfgs(cfgs_name)
        k = multiplier
        self.inplanes = make_divisible(inplanes * k)
        self.first_block = nn.Sequential(
            nn.Conv2D(3, self.inplanes, 3, 2, 1, bias_attr=False),
            nn.BatchNorm2D(self.inplanes),
            HardSwish(),
        )
        asymm_layers = []
        for layer_cfg in cfg:
            layer = self._make_layer(kernel_size=layer_cfg[0],
                                     exp_ch=make_divisible(k * layer_cfg[1]),
                                     out_channel=make_divisible(k * layer_cfg[2]),
                                     use_se=layer_cfg[3],
                                     act_func=layer_cfg[4],
                                     asymmrate=asymmrate,
                                     stride=layer_cfg[5],
                                     norm_layer=norm_layer)
            asymm_layers.append(layer)
        self.asymm_block = nn.Sequential(*asymm_layers)
        self.last_block = nn.Sequential(
            nn.Conv2D(self.inplanes, make_divisible(k * cls_ch_squeeze), 1, bias_attr=False),
            nn.BatchNorm2D(make_divisible(k * cls_ch_squeeze)),
            HardSwish(),
            nn.AdaptiveAvgPool2D(1),
            nn.Conv2D(make_divisible(k * cls_ch_squeeze), cls_ch_expand, 1, bias_attr=False),
            HardSwish(),
            nn.Dropout2D(p=dropout_rate),
            nn.Flatten(),
        )
        self.output = nn.Linear(cls_ch_expand, num_classes)

    def _make_layer(self, kernel_size, exp_ch, out_channel, use_se, act_func,
                    asymmrate, stride, norm_layer):
        mid_planes = exp_ch
        out_planes = out_channel
        layer = AsymmBottleneck(self.inplanes, mid_planes, out_planes,
                                kernel_size, asymmrate, act_func,
                                strides=stride, use_se=use_se,
                                norm_layer=norm_layer)
        self.inplanes = out_planes
        return layer

    def forward(self, x):
        x = self.first_block(x)
        x = self.asymm_block(x)
        x = self.last_block(x)
        x = self.output(x)
        return x


class AsymmNet_Large(AsymmNet):
    def __init__(self, **kwargs):
        super(AsymmNet_Large, self).__init__(cfgs_name='asymmnet_large', **kwargs)


class AsymmNet_Small(AsymmNet):
    def __init__(self, **kwargs):
        super(AsymmNet_Small, self).__init__(cfgs_name='asymmnet_small', **kwargs)


if __name__ == '__main__':
    img = paddle.rand((1, 3, 256, 256))
    net = AsymmNet_Large()
    out = net(img)
    print(out.shape)
[1, 16]
In [ ]
model = AsymmNet_Large()
model = paddle.Model(model)
model.summary((1, 3, 256, 256))

In [ ]
# optimizer setup and best-model checkpoint callback
class SaveBestModel(paddle.callbacks.Callback):
    def __init__(self, target=0.5, path='work/best_model', verbose=0):
        self.target = target
        self.epoch = None
        self.path = path

    def on_epoch_end(self, epoch, logs=None):
        self.epoch = epoch

    def on_eval_end(self, logs=None):
        if logs.get('acc') > self.target:
            self.target = logs.get('acc')
            self.model.save(self.path)
            print('best acc is {} at epoch {}'.format(self.target, self.epoch))

callback_visualdl = paddle.callbacks.VisualDL(log_dir='work/AsymmNet_Net')
callback_savebestmodel = SaveBestModel(target=0.5, path='work/best_model')
callbacks = [callback_visualdl, callback_savebestmodel]

base_lr = config_parameters['lr']
epochs = config_parameters['epochs']

def make_optimizer(parameters=None):
    momentum = 0.9
    learning_rate = paddle.optimizer.lr.CosineAnnealingDecay(learning_rate=base_lr,
                                                             T_max=epochs,
                                                             verbose=False)
    weight_decay = paddle.regularizer.L2Decay(0.0002)
    optimizer = paddle.optimizer.Momentum(learning_rate=learning_rate,
                                          momentum=momentum,
                                          weight_decay=weight_decay,
                                          parameters=parameters)
    return optimizer

optimizer = make_optimizer(model.parameters())

In [ ]
model.prepare(optimizer,
              paddle.nn.CrossEntropyLoss(),
              paddle.metric.Accuracy())

In [32]
model.fit(train_loader,
          eval_loader,
          epochs=100,
          batch_size=1,        # ignored here: batching is handled by the DataLoader
          callbacks=callbacks,
          verbose=1)           # log verbosity
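Once training finishes, the checkpoint written by `SaveBestModel` can be restored and re-evaluated; a minimal sketch (our addition), reusing the path configured above:

In [ ]
# restore the best checkpoint and re-run evaluation on the validation loader
model.load('work/best_model')
result = model.evaluate(eval_loader, verbose=1)
print(result)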
In our runs, the AsymmNet model trained quickly and converged very fast, with a substantial improvement in performance.