当前位置: 首页 > AI > 文章内容页

AsymmNet :使用非对称瓶颈实现的超轻量卷积神经网络!

时间:2025-07-18    作者:游乐小编    

AsymmNet源于德、中三方团队思路,通过调整卷积计算量分配提升性能,在特定区间效果显著。文中介绍了数据集处理、模型开发与训练等,显示其训练及收敛快,性能优。

asymmnet :使用非对称瓶颈实现的超轻量卷积神经网络! - 游乐网

① 项目背景

AsymmNet的思路来自于德国的Hasso_Plattner_Institute以及阿里云/字节跳动等三家团队的贡献。论文中发现使用深度可分离卷积的系列模型,它的主要计算量都集中在了(两个)PointWise卷积上,因此基于Invered_residual_block的设计理念,也就是说把第一个Pointwise卷积用来扩充通道数,来提升信息流规模,就是团队做的一个共识的分析;作者团队认为第一个Pointwise卷积主要是用来扩充通道,第二个Depthwise卷积用来学习特征的空间相关性,然后接下来的Pointwise卷积是用来学习通道相关性,这就像是从Inception_BlockXception一条通路传承下来的工程共识,虽然它没有被理论证明过的。所以,这里就有一个设想,如果能够把第一个Pointwise卷积(就是用来做通道扩容、信息扩容这件事的卷积),把它的一部分的features直接用搬运的方式、用Copy的方式来做,省下来的计算量把它迁移到主要任务是在学习特征的Depthwise卷积和第二个Pointwise卷积上面,把它转移过去,这样可以提升这个Block的特征学习能力和表达能力? 基于这个的想法,通过简单的改变和尝试,作者发现它的实验结果是在220M FLOPs这样的计算区间和低于220M的那种超轻量CNN模型的计算区间上面它的性能提升是极为明显而有效的,且是可以优于Invered_residual_block原始设计的。

AsymmNet :使用非对称瓶颈实现的超轻量卷积神经网络! - 游乐网

论文地址:https://arxiv.org/pdf/2104.07770.pdf

② 数据准备

2.1 解压缩数据集

我们将网上获取的数据集以压缩包的方式上传到aistudio数据集中,并加载到我们的项目内。

在使用之前我们进行数据集压缩包的一个解压。

In [ ]
!unzip -oq /home/aistudio/data/data69664/Images.zip -d work/dataset
登录后复制In [ ]
import paddleimport numpy as npfrom typing import Callable#参数配置config_parameters = {    "class_dim": 16,  #分类数    "target_path":"/home/aistudio/work/",                         'train_image_dir': '/home/aistudio/work/trainImages',    'eval_image_dir': '/home/aistudio/work/evalImages',    'epochs':100,    'batch_size': 32,    'lr': 0.01}
登录后复制

2.2 划分数据集

接下来我们使用标注好的文件进行数据集类的定义,方便后续模型训练使用。

In [ ]
import osimport shutiltrain_dir = config_parameters['train_image_dir']eval_dir = config_parameters['eval_image_dir']paths = os.listdir('work/dataset/Images')if not os.path.exists(train_dir):    os.mkdir(train_dir)if not os.path.exists(eval_dir):    os.mkdir(eval_dir)for path in paths:    imgs_dir = os.listdir(os.path.join('work/dataset/Images', path))    target_train_dir = os.path.join(train_dir,path)    target_eval_dir = os.path.join(eval_dir,path)    if not os.path.exists(target_train_dir):        os.mkdir(target_train_dir)    if not os.path.exists(target_eval_dir):        os.mkdir(target_eval_dir)    for i in range(len(imgs_dir)):        if ' ' in imgs_dir[i]:            new_name = imgs_dir[i].replace(' ', '_')        else:            new_name = imgs_dir[i]        target_train_path = os.path.join(target_train_dir, new_name)        target_eval_path = os.path.join(target_eval_dir, new_name)             if i % 5 == 0:            shutil.copyfile(os.path.join(os.path.join('work/dataset/Images', path), imgs_dir[i]), target_eval_path)        else:            shutil.copyfile(os.path.join(os.path.join('work/dataset/Images', path), imgs_dir[i]), target_train_path)print('finished train val split!')
登录后复制
finished train val split!
登录后复制

2.3 数据集定义与数据集展示

2.3.1 数据集展示

我们先看一下解压缩后的数据集长成什么样子,对比分析经典模型在Caltech101抽取16类mini版数据集上的效果

In [ ]
import osimport randomfrom matplotlib import pyplot as pltfrom PIL import Imageimgs = []paths = os.listdir('work/dataset/Images')for path in paths:       img_path = os.path.join('work/dataset/Images', path)    if os.path.isdir(img_path):        img_paths = os.listdir(img_path)        img = Image.open(os.path.join(img_path, random.choice(img_paths)))        imgs.append((img, path))f, ax = plt.subplots(4, 4, figsize=(12,12))for i, img in enumerate(imgs[:16]):    ax[i//4, i%4].imshow(img[0])    ax[i//4, i%4].axis('off')    ax[i//4, i%4].set_title('label: %s' % img[1])plt.show()
登录后复制
登录后复制

2.3.2 导入数据集的定义实现

In [ ]
#数据集的定义class Dataset(paddle.io.Dataset):    """    步骤一:继承paddle.io.Dataset类    """    def __init__(self, transforms: Callable, mode: str ='train'):        """        步骤二:实现构造函数,定义数据读取方式        """        super(Dataset, self).__init__()                self.mode = mode        self.transforms = transforms        train_image_dir = config_parameters['train_image_dir']        eval_image_dir = config_parameters['eval_image_dir']        train_data_folder = paddle.vision.DatasetFolder(train_image_dir)        eval_data_folder = paddle.vision.DatasetFolder(eval_image_dir)                if self.mode  == 'train':            self.data = train_data_folder        elif self.mode  == 'eval':            self.data = eval_data_folder    def __getitem__(self, index):        """        步骤三:实现__getitem__方法,定义指定index时如何获取数据,并返回单条数据(训练数据,对应的标签)        """        data = np.array(self.data[index][0]).astype('float32')        data = self.transforms(data)        label = np.array([self.data[index][1]]).astype('int64')                return data, label            def __len__(self):        """        步骤四:实现__len__方法,返回数据集总数目        """        return len(self.data)
登录后复制In [ ]
from paddle.vision import transforms as T#数据增强transform_train =T.Compose([T.Resize((256,256)),                            #T.RandomVerticalFlip(10),                            #T.RandomHorizontalFlip(10),                            T.RandomRotation(10),                            T.Transpose(),                            T.Normalize(mean=[0, 0, 0],                           # 像素值归一化                                        std =[255, 255, 255]),                    # transforms.ToTensor(), # transpose操作 + (img / 255),并且数据结构变为PaddleTensor                            T.Normalize(mean=[0.50950350, 0.54632660, 0.57409690],# 减均值 除标准差                                            std= [0.26059777, 0.26041326, 0.29220656])# 计算过程:output[channel] = (input[channel] - mean[channel]) / std[channel]                            ])transform_eval =T.Compose([ T.Resize((256,256)),                            T.Transpose(),                            T.Normalize(mean=[0, 0, 0],                           # 像素值归一化                                        std =[255, 255, 255]),                    # transforms.ToTensor(), # transpose操作 + (img / 255),并且数据结构变为PaddleTensor                            T.Normalize(mean=[0.50950350, 0.54632660, 0.57409690],# 减均值 除标准差                                            std= [0.26059777, 0.26041326, 0.29220656])# 计算过程:output[channel] = (input[channel] - mean[channel]) / std[channel]                            ])
登录后复制

2.3.3 实例化数据集类

根据所使用的数据集需求实例化数据集类,并查看总样本量。

In [ ]
train_dataset =Dataset(mode='train',transforms=transform_train)eval_dataset  =Dataset(mode='eval', transforms=transform_eval )#数据异步加载train_loader = paddle.io.DataLoader(train_dataset,                                     places=paddle.CUDAPlace(0),                                     batch_size=32,                                     shuffle=True,                                    #num_workers=2,                                    #use_shared_memory=True                                    )eval_loader = paddle.io.DataLoader (eval_dataset,                                     places=paddle.CUDAPlace(0),                                     batch_size=32,                                    #num_workers=2,                                    #use_shared_memory=True                                    )print('训练集样本量: {},验证集样本量: {}'.format(len(train_loader), len(eval_loader)))
登录后复制
训练集样本量: 45,验证集样本量: 12
登录后复制

③ 模型选择和开发

3.1 对比网络构建

本次我们选取了经典的卷积神经网络resnet50,vgg19,mobilenet_v2来进行实验比较。

In [ ]
network = paddle.vision.models.mobilenet_v2(num_classes=16)#模型封装model = paddle.Model(network)#模型可视化model.summary((-1, 3,256 , 256))
登录后复制

3.2 对比网络训练

In [ ]
#优化器选择class SaveBestModel(paddle.callbacks.Callback):    def __init__(self, target=0.5, path='work/best_model', verbose=0):        self.target = target        self.epoch = None        self.path = path    def on_epoch_end(self, epoch, logs=None):        self.epoch = epoch    def on_eval_end(self, logs=None):        if logs.get('acc') > self.target:            self.target = logs.get('acc')            self.model.save(self.path)            print('best acc is {} at epoch {}'.format(self.target, self.epoch))callback_visualdl = paddle.callbacks.VisualDL(log_dir='work/vgg19')callback_savebestmodel = SaveBestModel(target=0.5, path='work/best_model')callbacks = [callback_visualdl, callback_savebestmodel]base_lr = config_parameters['lr']epochs = config_parameters['epochs']def make_optimizer(parameters=None):    momentum = 0.9    learning_rate= paddle.optimizer.lr.CosineAnnealingDecay(learning_rate=base_lr, T_max=epochs, verbose=False)    weight_decay=paddle.regularizer.L2Decay(0.0001)    optimizer = paddle.optimizer.Momentum(        learning_rate=learning_rate,        momentum=momentum,        weight_decay=weight_decay,        parameters=parameters)    return optimizeroptimizer = make_optimizer(model.parameters())model.prepare(optimizer,              paddle.nn.CrossEntropyLoss(),              paddle.metric.Accuracy())model.fit(train_loader,          eval_loader,          epochs=100,          batch_size=1,           # 是否打乱样本集               callbacks=callbacks,           verbose=1)   # 日志展示格式
登录后复制

3.3 Asymmetrical bottlenecks

3.3.1 Asymmetrical bottlenecks模块的介绍

在总结了以前的工作后,作者认为要实现更节约资源的设计,feature reuse是有效的操作。

AsymmNet :使用非对称瓶颈实现的超轻量卷积神经网络! - 游乐网

图1 Asymmetrical bottlenecks模块细节示意图

In [ ]
__all__ = ['AsymmNet_Large', 'AsymmNet_Small', 'AsymmNet']import paddleimport paddle.nn as nnimport mathimport paddle.nn.functional as Fclass HardSigmoid(nn.Layer):    def __init__(self):        super(HardSigmoid, self).__init__()        self.relu = nn.ReLU6()    def forward(self, x):        return self.relu(x + 3) / 6class HardSwish(nn.Layer):    def __init__(self):        super(HardSwish, self).__init__()        self.sigmoid = HardSigmoid()    def forward(self, x):        return x * self.sigmoid(x)class Activation(nn.Layer):    def __init__(self, act_func):        super(Activation, self).__init__()        if act_func == "relu":            self.act = nn.ReLU()        elif act_func == "ReLU6":            self.act = nn.ReLU6()        elif act_func == "hard_sigmoid":            self.act = HardSigmoid()        elif act_func == "hard_swish":            self.act = HardSwish()        else:            raise NotImplementedError    def forward(self, x):        return self.act(x)def make_divisible(x, divisible_by=8):    return int(math.ceil(x * 1. / divisible_by) * divisible_by)class _BasicUnit(nn.Layer):    def __init__(self, num_in, num_out, kernel_size=1, strides=1, pad=0, num_groups=1,                 use_act=True, act_type="relu", norm_layer=nn.BatchNorm2D):        super(_BasicUnit, self).__init__()        self.use_act = use_act        self.conv = nn.Conv2D(in_channels=num_in, out_channels=num_out,                              kernel_size=kernel_size, stride=strides,                              padding=pad, groups=num_groups, bias_attr=False,                              )        self.bn = norm_layer(num_out)        if use_act is True:            self.act = Activation(act_type)    def forward(self, x):        out = self.conv(x)        out = self.bn(out)        if self.use_act:            out = self.act(out)        return outclass SE_Module(nn.Layer):    def __init__(self, channels, reduction=4):        super(SE_Module, self).__init__()        self.Avg = nn.AdaptiveAvgPool2D(1)        reduction_c = make_divisible(channels // reduction)        self.out = nn.Sequential(            nn.Conv2D(channels, reduction_c, 1, bias_attr=True),            nn.ReLU(),            nn.Conv2D(reduction_c, channels, 1, bias_attr=True),            HardSigmoid()        )    def forward(self, x):        y = self.Avg(x)        y = self.out(y)        return x * yclass AsymmBottleneck(nn.Layer):    def __init__(self, num_in, num_mid, num_out, kernel_size, asymmrate=1,                 act_type="relu", use_se=False, strides=1,                 norm_layer=nn.BatchNorm2D):        super(AsymmBottleneck, self).__init__()        assert isinstance(asymmrate, int)        self.asymmrate = asymmrate        self.use_se = use_se        self.use_short_cut_conv = (num_in == num_out and strides == 1)        self.do_expand = (num_mid > max(num_in, asymmrate * num_in))        if self.do_expand:            self.expand = _BasicUnit(num_in, num_mid - asymmrate * num_in,                                     kernel_size=1,                                     strides=1, pad=0, act_type=act_type,                                     norm_layer=norm_layer)            num_mid += asymmrate * num_in        self.dw_conv = _BasicUnit(num_mid, num_mid, kernel_size, strides,                                  pad=self._get_pad(kernel_size), act_type=act_type,                                  num_groups=num_mid, norm_layer=norm_layer)        if self.use_se:            self.se = SE_Module(num_mid)        self.pw_conv_linear = _BasicUnit(num_mid, num_out, kernel_size=1, strides=1,                                         pad=0, act_type=act_type, use_act=False,                                         norm_layer=norm_layer, num_groups=1)    def forward(self, x):        if self.do_expand:            out = self.expand(x)            feat = []            for i in range(self.asymmrate):                feat.append(x)            feat.append(out)            for i in range(self.asymmrate):                feat.append(x)            if self.asymmrate > 0:                out = paddle.concat(feat, axis=1)        else:            out = x        out = self.dw_conv(out)        if self.use_se:            out = self.se(out)        out = self.pw_conv_linear(out)        if self.use_short_cut_conv:            return x + out        return out    def _get_pad(self, kernel_size):        if kernel_size == 1:            return 0        elif kernel_size == 3:            return 1        elif kernel_size == 5:            return 2        elif kernel_size == 7:            return 3        else:            raise NotImplementedErrordef get_asymmnet_cfgs(model_name):    if model_name == 'asymmnet_large':        inplanes = 16        cfg = [            # k, exp, c,  se,     nl,  s,            # stage1            [3, 16, 16, False, 'relu', 1],            # stage2            [3, 64, 24, False, 'relu', 2],            [3, 72, 24, False, 'relu', 1],            # stage3            [5, 72, 40, True, 'relu', 2],            [5, 120, 40, True, 'relu', 1],            [5, 120, 40, True, 'relu', 1],            # stage4            [3, 240, 80, False, 'hard_swish', 2],            [3, 200, 80, False, 'hard_swish', 1],            [3, 184, 80, False, 'hard_swish', 1],            [3, 184, 80, False, 'hard_swish', 1],            [3, 480, 112, True, 'hard_swish', 1],            [3, 672, 112, True, 'hard_swish', 1],            # stage5            [5, 672, 160, True, 'hard_swish', 2],            [5, 960, 160, True, 'hard_swish', 1],            [5, 960, 160, True, 'hard_swish', 1],        ]        cls_ch_squeeze = 960        cls_ch_expand = 1280    elif model_name == 'asymmnet_small':        inplanes = 16        cfg = [            # k, exp, c,  se,     nl,  s,            [3, 16, 16, True, 'relu', 2],            [3, 72, 24, False, 'relu', 2],            [3, 88, 24, False, 'relu', 1],            [5, 96, 40, True, 'hard_swish', 2],            [5, 240, 40, True, 'hard_swish', 1],            [5, 240, 40, True, 'hard_swish', 1],            [5, 120, 48, True, 'hard_swish', 1],            [5, 144, 48, True, 'hard_swish', 1],            [5, 288, 96, True, 'hard_swish', 2],            [5, 576, 96, True, 'hard_swish', 1],            [5, 576, 96, True, 'hard_swish', 1],        ]        cls_ch_squeeze = 576        cls_ch_expand = 1280    else:        raise ValueError('{} model_name is not supported now!'.format(model_name))    return inplanes, cfg, cls_ch_squeeze, cls_ch_expandclass AsymmNet(nn.Layer):    def __init__(self, cfgs_name, num_classes=config_parameters['class_dim'], multiplier=1.0, asymmrate=1, dropout_rate=0.2,                 norm_layer=nn.BatchNorm2D):        super(AsymmNet, self).__init__()        inplanes, cfg, cls_ch_squeeze, cls_ch_expand = get_asymmnet_cfgs(cfgs_name)        k = multiplier        self.inplanes = make_divisible(inplanes * k)        self.first_block = nn.Sequential(            nn.Conv2D(3, self.inplanes, 3, 2, 1, bias_attr=False),            nn.BatchNorm2D(self.inplanes),            HardSwish(),        )        asymm_layers = []        for layer_cfg in cfg:            layer = self._make_layer(kernel_size=layer_cfg[0],                                     exp_ch=make_divisible(k * layer_cfg[1]),                                     out_channel=make_divisible(k * layer_cfg[2]),                                     use_se=layer_cfg[3],                                     act_func=layer_cfg[4],                                     asymmrate=asymmrate,                                     stride=layer_cfg[5],                                     norm_layer=norm_layer,                                     )            asymm_layers.append(layer)        self.asymm_block = nn.Sequential(*asymm_layers)        self.last_block = nn.Sequential(            nn.Conv2D(self.inplanes, make_divisible(k * cls_ch_squeeze), 1, bias_attr=False),            nn.BatchNorm2D(make_divisible(k * cls_ch_squeeze)),            HardSwish(),            nn.AdaptiveAvgPool2D(1),            nn.Conv2D(make_divisible(k * cls_ch_squeeze), cls_ch_expand, 1, bias_attr=False),            HardSwish(),            nn.Dropout2D(p=dropout_rate),            nn.Flatten(),        )        self.output = nn.Linear(cls_ch_expand, num_classes)    def _make_layer(self, kernel_size, exp_ch, out_channel, use_se, act_func, asymmrate, stride,                    norm_layer):        mid_planes = exp_ch        out_planes = out_channel        layer = AsymmBottleneck(self.inplanes, mid_planes,                                out_planes, kernel_size, asymmrate,                                act_func, strides=stride, use_se=use_se, norm_layer=norm_layer)        self.inplanes = out_planes        return layer    def forward(self, x):        x = self.first_block(x)        x = self.asymm_block(x)        x = self.last_block(x)        x = self.output(x)        return xclass AsymmNet_Large(AsymmNet):    def __init__(self, **kwargs):        super(AsymmNet_Large, self).__init__(cfgs_name='asymmnet_large', **kwargs)class AsymmNet_Small(AsymmNet):    def __init__(self, **kwargs):        super(AsymmNet_Small, self).__init__(cfgs_name='asymmnet_small', **kwargs)if __name__ == '__main__':        img = paddle.rand((1, 3, 256, 256))    vit = AsymmNet_Large()    out = vit(img)    print(out.shape)
登录后复制
[1, 16]
登录后复制
/opt/conda/envs/python35-paddle120-env/lib/python3.7/site-packages/paddle/nn/layer/norm.py:641: UserWarning: When training, we now always track global mean and variance.  "When training, we now always track global mean and variance.")
登录后复制

④改进模型的训练和优化器的选择

In [ ]
model = AsymmNet_Large()model = paddle.Model(model)model.summary((1,3,256,256))
登录后复制In [ ]
#优化器选择class SaveBestModel(paddle.callbacks.Callback):    def __init__(self, target=0.5, path='work/best_model', verbose=0):        self.target = target        self.epoch = None        self.path = path    def on_epoch_end(self, epoch, logs=None):        self.epoch = epoch    def on_eval_end(self, logs=None):        if logs.get('acc') > self.target:            self.target = logs.get('acc')            self.model.save(self.path)            print('best acc is {} at epoch {}'.format(self.target, self.epoch))callback_visualdl = paddle.callbacks.VisualDL(log_dir='work/AsymmNet_Net')callback_savebestmodel = SaveBestModel(target=0.5, path='work/best_model')callbacks = [callback_visualdl, callback_savebestmodel]base_lr = config_parameters['lr']epochs = config_parameters['epochs']def make_optimizer(parameters=None):    momentum = 0.9    learning_rate= paddle.optimizer.lr.CosineAnnealingDecay(learning_rate=base_lr, T_max=epochs, verbose=False)    weight_decay=paddle.regularizer.L2Decay(0.0002)    optimizer = paddle.optimizer.Momentum(        learning_rate=learning_rate,        momentum=momentum,        weight_decay=weight_decay,        parameters=parameters)    return optimizeroptimizer = make_optimizer(model.parameters())
登录后复制In [ ]
model.prepare(optimizer,              paddle.nn.CrossEntropyLoss(),              paddle.metric.Accuracy())
登录后复制In [32]
model.fit(train_loader,          eval_loader,          epochs=100,          batch_size=1,           # 是否打乱样本集               callbacks=callbacks,           verbose=1)   # 日志展示格式
登录后复制

⑤模型训练效果展示

AsymmNet模型训练速度很快,模型收敛的速度也非常快,性能有了大幅度的提升。

AsymmNet :使用非对称瓶颈实现的超轻量卷积神经网络! - 游乐网

热门推荐

更多

热门文章

更多

首页  返回顶部

本站所有软件都由网友上传,如有侵犯您的版权,请发邮件youleyoucom@outlook.com