当前位置: 首页 > article >正文

代码复现(四):DBINet

文章目录

  • datasets/AB2019BASDataset.py
  • datasets/ext_transforms.py
  • network/modules.py
  • network/DBINet.py
  • network/DBINet_Backbone.py
  • AB2019_train.py


  代码链接:DBINet

datasets/AB2019BASDataset.py

  加载Australia Bushfire 2019 Burned Area Segmentation Dataset的自定义数据集类AB2019BASDataset

  • def __init__(self, is_train, voc_dir):输入数据集目录路径voc_dir,数据jpeg图像路径为voc_dir/Images、掩膜png图像路径为voc_dir/Masks
  • def __getitem__(self, idx):根据索引获取图像和掩模,分别转换为RGB和灰度格式并返回。
  • def __len__(self):返回数据集样本总数。

执行逻辑见代码复现(二)。

datasets/ext_transforms.py

  定义了一些扩展的图像变换类,主要用于语义分割任务。
在这里插入图片描述

network/modules.py

  定义了DBINet、ResNet、PVT模型中的常用模块。

  • class ChannelAttention(nn.Module):实现通道注意力机制,以增强神经网络模型中通道维度的重要性。
  • class SpatialAttention(nn.Module):实现空间注意力机制,以增强神经网络模型中空间维度的重要性。
  • def autopad(k, p=None, d=1):根据卷积核大小、填充方式、膨胀系数来计算padding参数,使得卷积操作的输出尺寸与输入尺寸一致。
  • class Conv(nn.Module):实现卷积模块,包含 C o n v + B a t c h N o r m a l i z a t i o n + R e L U Conv+BatchNormalization+ReLU Conv+BatchNormalization+ReLU
  • class CBAM(nn.Module):实现卷积注意力模块。
  • class Bottleneck(nn.Module):实现ResNet的瓶颈块结果。
  • class C2(nn.Module):实现基于CSP(Cross Stage Partial)架构的瓶颈模块。
  • class SRM(nn.Module):定于 S e l f − R e f i n e m e n t Self-Refinement SelfRefinement模块,见论文Attention guided contextual feature fusion network for salient object detection、EfficientNet: Rethinking Model Scaling for Convolutional Neural Networks。
  • class Decoder(nn.Module):定义解码器模块。

在这里插入图片描述

class Decoder(nn.Module):
    #ch_in:输入通道数;ch_out:输出通道数;n:Bottleneck模块的数量;shortcut:是否使用残差连接;g:组卷积的数量;e:膨胀系数
    def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5):  # ch_in, ch_out, number, shortcut, groups, expansion
        super().__init__()
        #隐藏通道数
        self.c = int(c2 * e)
        self.cv1 = Conv(c1, 2 * self.c, 1, 1)
        self.cv2 = Conv(2 * self.c, c2, 1)  # optional act=FReLU(c2)
        #n个Bottleneck模块组成的顺序容器,用于提取特征
        self.m = nn.Sequential(*(Bottleneck(self.c, self.c, shortcut, g, k=((3, 3), (3, 3)), e=1.0) for _ in range(n)))
        #自我精细化模块(Self Refinement Module)
        self.sr=SRM(c2)

    def forward(self, x):
        a, b = self.cv1(x).split((self.c, self.c), 1)
        out=self.cv2(torch.cat((self.m(a), b), 1))
        out=self.sr(out)
        return out
  • class REBNCONV(nn.Module):定义卷积模块,包含 C o n v + B a t c h N o r m l i z a t i o n + R e L U Conv+BatchNormlization+ReLU Conv+BatchNormlization+ReLU操作。
  • def _upsample_like(src,tar):通过线性插值法完成上采样操作。
  • class Convertor(nn.Module):定义转换器模块。

在这里插入图片描述

class Convertor(nn.Module):
    def __init__(self, in_ch=64, out_ch=64):
        super(Convertor,self).__init__()
        mid_ch=in_ch
        self.rebnconv1 = REBNCONV(mid_ch,mid_ch,dirate=1)
        self.pool1 = nn.MaxPool2d(2,stride=2,ceil_mode=True)

        self.rebnconv2 = REBNCONV(mid_ch,mid_ch,dirate=1)
        self.pool2 = nn.MaxPool2d(2,stride=2,ceil_mode=True)

        self.rebnconv3 = REBNCONV(mid_ch,mid_ch,dirate=1)
        self.pool3 = nn.MaxPool2d(2,stride=2,ceil_mode=True)

        self.rebnconv4 = REBNCONV(mid_ch,mid_ch,dirate=1)

        self.rebnconv5 = REBNCONV(mid_ch,mid_ch,dirate=2)

        self.cbam1=CBAM(mid_ch, mid_ch)
        self.cbam2=CBAM(mid_ch, mid_ch)
        self.cbam3=CBAM(mid_ch, mid_ch)
        self.cbam4=CBAM(mid_ch, mid_ch)

        self.rebnconv4d = REBNCONV(mid_ch*2,mid_ch,dirate=1)
        self.rebnconv3d = REBNCONV(mid_ch*2,mid_ch,dirate=1)
        self.rebnconv2d = REBNCONV(mid_ch*2,mid_ch,dirate=1)
        self.rebnconv1d = REBNCONV(mid_ch*2,out_ch,dirate=1)

    def forward(self,x1,x2,x3,x4):

        hx1 = self.rebnconv1(x1)
        hx = self.pool1(hx1)

        hx2 = self.rebnconv2(hx+x2)
        hx = self.pool2(hx2)

        hx3 = self.rebnconv3(hx+x3)
        hx = self.pool3(hx3)

        hx4 = self.rebnconv4(hx+x4)

        hx5 = self.rebnconv5(hx4)

        hx4d = self.rebnconv4d(torch.cat((hx5,self.cbam4(hx4)),1))
        hx4dup = _upsample_like(hx4d,hx3)

        hx3d = self.rebnconv3d(torch.cat((hx4dup,self.cbam3(hx3)),1))
        hx3dup = _upsample_like(hx3d,hx2)

        hx2d = self.rebnconv2d(torch.cat((hx3dup,self.cbam2(hx2)),1))
        hx2dup = _upsample_like(hx2d,hx1)

        hx1d = self.rebnconv1d(torch.cat((hx2dup,self.cbam1(hx1)),1))

        return hx1d, hx2d, hx3d, hx4d

network/DBINet.py

在这里插入图片描述
  定义类DBINet,包含了模型的总体执行流程。

  • def __init__(self,**kwargs):初始化DBINet的各个模块。
  • def forward(self,inputs):定义DBINet的前向传播过程。
class DBINet(nn.Module):
    def __init__(self, **kwargs):
        super(DBINet, self).__init__()      
		#初始化DBI编码器架构
        self.backbone  = DBIBkb()
        #定义特征图通道数,mid_ch表中间层通道数,eout_channels包含从主干网络输出的不同阶段特征图通道数,out_ch是最终输出的通道数
        mid_ch=64
        eout_channels=[64, 256, 512, 1024, 2048]
        out_ch=1
      	#对解码器输出四张特征图的卷积操作,卷积将不同层级输出的特征图压缩到mid_ch通道数
        self.eside1=Conv(eout_channels[1], mid_ch)
        self.eside2=Conv(eout_channels[2], mid_ch)
        self.eside3=Conv(eout_channels[3], mid_ch)
        self.eside4=Conv(eout_channels[4], mid_ch)
		#定义转换器
        self.convertor=Convertor(mid_ch, mid_ch)
        #定义上采样,可将特征图空间分辨率增加两倍
        self.upsample2 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)

        #定义解码器部分
        self.decoder4 = Decoder(mid_ch, mid_ch)
        self.decoder3 = Decoder(mid_ch, mid_ch)
        self.decoder2 = Decoder(mid_ch, mid_ch)
        self.decoder1 = Decoder(mid_ch, mid_ch)
        #定义对特征图s1、s2、s3、s4的卷积操作
        self.dside1 = nn.Conv2d(mid_ch, out_ch, kernel_size=3, stride=1, padding=1)
        self.dside2 = nn.Conv2d(mid_ch, out_ch, kernel_size=3, stride=1, padding=1)
        self.dside3 = nn.Conv2d(mid_ch, out_ch, kernel_size=3, stride=1, padding=1)
        self.dside4 = nn.Conv2d(mid_ch, out_ch, kernel_size=3, stride=1, padding=1)

        #使用kaiming初始化法初始化卷积层和批量归一化层的参数
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                init_weights(m, init_type='kaiming')
            elif isinstance(m, nn.BatchNorm2d):
                init_weights(m, init_type='kaiming')


    def forward(self, inputs):
    	#获取输入特征图的高和宽,后续用于上采样时恢复原始尺寸
        H, W = inputs.size(2), inputs.size(3)

        #编码器输出(四个ResNet Block的输出和最后DFM融合的输出特征)
        outs=self.backbone(inputs)
        c1, c2, c3, c4, c5 = outs
        
        #通过卷积操作改变编码器输出特征图的通道数
        c1=self.eside1(c1)
        c2=self.eside2(c2)
        c3=self.eside3(c3)
        c4=self.eside4(c4)

        #接受转换器的输出
        ca1,ca2,ca3,ca4=self.convertor(c1,c2,c3,c4)
        #将ca1特征图通过双线性插值的方式调整到ca2, ca3, ca4 的尺寸,便于后续特征融合
        ca12=F.interpolate(ca1, size=ca2.size()[2:], mode='bilinear', align_corners=True)
        ca13=F.interpolate(ca1, size=ca3.size()[2:], mode='bilinear', align_corners=True)
        ca14=F.interpolate(ca1, size=ca4.size()[2:], mode='bilinear', align_corners=True)

        #将转换器输出、编码器输出、上一解码器的输出(decoder4则对应DFM的输出)进行融合,输入到解码器模块中
        up4=self.decoder4(ca14 + c4 + c5)
        up3=self.decoder3(ca13 + c3 + self.upsample2(up4))
        up2=self.decoder2(ca12 + c2 + self.upsample2(up3))
        up1=self.decoder1(ca1 + c1 + self.upsample2(up2))
        
        #通过卷积操作将特征图up1、up2、up3、up4转换为单通道的输出特征图d1、d2、d3、d4
        d1=self.dside1(up1)
        d2=self.dside2(up2)
        d3=self.dside3(up3)
        d4=self.dside4(up4)
		#通过插值法将输出特征图恢复到图像的原始尺寸(Upsample)
        S1 = F.interpolate(d1, size=(H, W), mode='bilinear', align_corners=True)
        S2 = F.interpolate(d2, size=(H, W), mode='bilinear', align_corners=True)
        S3 = F.interpolate(d3, size=(H, W), mode='bilinear', align_corners=True)
        S4 = F.interpolate(d4, size=(H, W), mode='bilinear', align_corners=True)
     
  		#返回四张特征图,并进行sigmoid操作作为最终输出
        return S1,S2,S3,S4, torch.sigmoid(S1), torch.sigmoid(S2), torch.sigmoid(S3), torch.sigmoid(S4)

network/DBINet_Backbone.py

  定义DBINet编码器的执行流程。

  • def autopad(k, p=None, d=1):实现same填充。计算卷积操作中的填充(padding)参数,确保输出形状与输入形状相同。
    • k:卷积核大小。
    • p:是否使用填充。
    • d:空洞卷积膨胀系数。
def autopad(k, p=None, d=1):  # kernel, padding, dilation
    # Pad to 'same' shape outputs
    if d > 1:
        k = d * (k - 1) + 1 if isinstance(k, int) else [d * (x - 1) + 1 for x in k]  # actual kernel-size
    if p is None:
        p = k // 2 if isinstance(k, int) else [x // 2 for x in k]  # auto-pad
    return p

在这里插入图片描述

  • class Conv(nn.Module):实现DBINet中的卷积模块( C o n v + B a t c h    N o r m a l i z a t i o n + R e L U Conv+Batch\;Normalization+ReLU Conv+BatchNormalization+ReLU)。
    • def __init__(self, c1, c2, k=1, s=1, p=None, g=1, d=1, act=True)

在这里插入图片描述

def __init__(self, c1, c2, k=1, s=1, p=None, g=1, d=1, act=True):
	super().__init__()
	self.conv = nn.Conv2d(c1, c2, k, s, autopad(k, p, d), groups=g, dilation=d, bias=False)
	self.bn = nn.BatchNorm2d(c2)
	self.act = self.default_act if act is True else act if isinstance(act, nn.Module) else nn.Identity()
  • def forward(self, x):定义前向传播过程( C o n v + B a t c h    N o r m a l i z a t i o n + R e L U Conv+Batch\;Normalization+ReLU Conv+BatchNormalization+ReLU)。
def forward(self, x):
	return self.act(self.bn(self.conv(x)))
  • def forward_fuse(self, x):前向融合传播,跳过批归一化层,只进行卷积和激活操作。这个函数常用于推理阶段,通过移除批归一化层加快推理速度。
def forward_fuse(self, x):
	return self.act(self.conv(x))
  • def weight_init(module):递归遍历一个模型的所有子模块,并根据模块类型(如卷积层、批归一化层、线性层等)使用不同方法进行初始化。
def weight_init(module):
#named_children()返回模型的所有子模块的名称和对应的子模块对象
    for n, m in module.named_children():
        print('initialize: '+n)
        if isinstance(m, nn.Conv2d):
        #卷积层使用Kaiming正态分布初始化权重,mode='fan_in'表示根据输入特征图数量来计算缩放系数,nonlinearity='relu'表明卷积层后面使用的是ReLU激活函数
            nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='relu')
            #若有偏置则初始化为零
            if m.bias is not None:
                nn.init.zeros_(m.bias)
        #批归一化层(BatchNorm2d)或组归一化层(GroupNorm)将尺度因子gamma初始化为1,偏置初始化为0.
        elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
            nn.init.ones_(m.weight)
            if m.bias is not None:
                nn.init.zeros_(m.bias)
        #线性层初始化方式同卷积层
        elif isinstance(m, nn.Linear):
            nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='relu')
            if m.bias is not None:
                nn.init.zeros_(m.bias)
        else:
        #其他自定义的子模块,则使用自己的initialize()方法初始化
            m.initialize()

在这里插入图片描述

  • class Bottleneck(nn.Module):实现ResNet网络中的瓶颈块(Bottleneck Block,上图即为普通残差结构与瓶颈结构)。通常用于深层卷积神经网络中,通过引入1x1的卷积来减少维度,然后再用 3x3 的卷积进行特征提取,最后通过另一个 1x1 的卷积恢复维度。这样设计能够降低计算量,同时保持较深网络的表达能力。
    • __init__(self, inplanes, planes, stride=1, downsample=None, dilation=1)

在这里插入图片描述

def __init__(self, inplanes, planes, stride=1, downsample=None, dilation=1):
	super(Bottleneck, self).__init__()
	#1x1卷积
	self.conv1      = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False)
	#batch normalization
	self.bn1        = nn.BatchNorm2d(planes)
	#3x3卷积,padding可使得在不改变特征图大小的情况下进行卷积操作
	self.conv2      = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=(3*dilation-1)//2, bias=False, dilation=dilation)
	self.bn2        = nn.BatchNorm2d(planes)
	#3x3卷积,通道数扩展到4*planes来恢复到瓶颈块的输出维度
	self.conv3      = nn.Conv2d(planes, planes*4, kernel_size=1, bias=False)
	self.bn3        = nn.BatchNorm2d(planes*4)
	#可选的下采样层
	self.downsample = downsample
  • def forward(self,x):定义前向传播。
def forward(self, x):
	#残差分支
	residual = x
	out      = F.relu(self.bn1(self.conv1(x)), inplace=True)
	out      = F.relu(self.bn2(self.conv2(out)), inplace=True)
	out      = self.bn3(self.conv3(out))
	#若downsample不为空,则调整残差分支尺寸,使之能与out相加
	if self.downsample is not None:
		residual = self.downsample(x)
	#残差连接+relu
	return F.relu(out+residual, inplace=True)

在这里插入图片描述

  • class DFM(nn.Module):定义DFM模块。
class DFM(nn.Module):
    def __init__(self, cur_in_ch, sup_in_ch, out_ch, mid_ch=64):
        super(DFM, self).__init__()
		#f_cur对应的卷积模块(Conv+BatchNorm+ReLU)
        self.cur_cv = Conv(cur_in_ch, mid_ch)
        #f_sup对应的卷积模块(Conv+BatchNorm+ReLU)          
        self.sup_cv = Conv(sup_in_ch, mid_ch)
        self.fuse1 = Conv(mid_ch*2, mid_ch)
        self.fuse2 = Conv(mid_ch, out_ch)

    def forward(self, x_cur, x_sup):
    	#f_cur、f_sup输入卷积模块
        x_cur1=self.cur_cv(x_cur)
        x_sup1=self.sup_cv(x_sup)
		#将f_cur、f_sup进行拼接
        xfuse=torch.cat([x_cur1, x_sup1], dim=1)
        #xfuse输入卷积模块,并与x_cur1执行元素加法
        x=self.fuse1(xfuse) + x_cur1
        #输入卷积模块得到f_out
        x=self.fuse2(x)
        return x

在这里插入图片描述

  • class DBIkb(nn.Module):实现DBI模型的编码器架构。
    • __init__(self, img_size=224, patch_size=4, in_chans=256, embed_dims=[64, 128, 320, 512], num_heads=[1, 2, 5, 8], mlp_ratios=[8, 8, 4, 4], qkv_bias=True, qk_scale=None, drop_rate=0.0, attn_drop_rate=0., drop_path_rate=0.1, norm_layer=partial(nn.LayerNorm, eps=1e-6), depths=[2, 2, 2, 2], sr_ratios=[8, 4, 2, 1], num_stages=4, F4=False):实现了用作主编码器的 R e s N e t 50 ResNet50 ResNet50和用作辅助编码器的 P V T ( P y r a m i d    V i s i o n    T r a n s f o r m e r ) PVT(Pyramid\;Vision\;Transformer) PVT(PyramidVisionTransformer) R e s N e t ResNet ResNet提取初步的低层次特征, P V T PVT PVT通过注意力机制提取高层次特征,并且使用 D F M DFM DFM进行多层次、多模态的特征融合,增强模型的表现能力。

在这里插入图片描述

def __init__(self, img_size=224, patch_size=4, in_chans=256, embed_dims=[64, 128, 320, 512],
             num_heads=[1, 2, 5, 8], mlp_ratios=[8, 8, 4, 4], qkv_bias=True, qk_scale=None, drop_rate=0.0,
             attn_drop_rate=0., drop_path_rate=0.1, norm_layer=partial(nn.LayerNorm, eps=1e-6), depths=[2, 2, 2, 2],
             sr_ratios=[8, 4, 2, 1], num_stages=4, F4=False):
    super().__init__()
    #定义ResNet模块初步对输入图像特征提取,其中由make_layer()构建每一层ResNet的具体实现
    mid_ch = 64
    self.inplanes = 64
    self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
    self.bn1 = nn.BatchNorm2d(64)
    self.layer1 = self.make_layer(64, 3, stride=1, dilation=1)
    self.layer2 = self.make_layer(128, 4, stride=2, dilation=1)
    self.layer3 = self.make_layer(256, 6, stride=2, dilation=1)
    self.layer4 = self.make_layer(512, 3, stride=2, dilation=1)
    self.CBkbs = [self.layer1, self.layer2, self.layer3, self.layer4]
	#定义DFM模块,用于在用于在不同层级当中融合ResNet和PVT提取的特征
    rout_chl = [256, 512, 1024, 2048]
    #融合ResNet的当前特征和PVT的支持特征,作为下一ResNet模块的输入(位于主编码器中)
    self.cdfm1 = DFM(cur_in_ch=rout_chl[1], sup_in_ch=embed_dims[1], out_ch=rout_chl[1])
    self.cdfm2 = DFM(cur_in_ch=rout_chl[2], sup_in_ch=embed_dims[2], out_ch=rout_chl[2])
    #融合PVT的当前特征和ResNet的支持特征,作为下一PVT模块的输入(位于辅助编码器中)
    self.tdfm1 = DFM(cur_in_ch=embed_dims[1], sup_in_ch=rout_chl[1], out_ch=embed_dims[1])
    self.tdfm2 = DFM(cur_in_ch=embed_dims[2], sup_in_ch=rout_chl[2], out_ch=embed_dims[2])
	#融合主、辅助编码器的输出,作为解码器的输入之一
    self.sumdfm = DFM(cur_in_ch=rout_chl[3], sup_in_ch=embed_dims[3], out_ch=mid_ch)
	#主编码器DFM模块列表
    self.cdfmx = [self.cdfm1, self.cdfm2]
    #辅助编码器DFM模块列表
    self.tdfmx = [self.tdfm1, self.tdfm2]

    #定义PVT模块
    #每个阶段包含的注意力块数目
    self.depths = depths
    self.F4 = F4
    self.num_stages = num_stages
	#每个层分配的路径丢弃率(drop path rate)
    dpr = [x.item() for x in torch.linspace(0, drop_path_rate, sum(depths))]  
    cur = 0
	#定义每个阶段的PVT模块
    for i in range(1, num_stages):
    	#使用PatchEmbed()将图像分割为patch并嵌入到高维特征向量中.参数列表:
    	##img_size:输入图像大小
    	##patch_size:图像patch的大小
    	##in_chans:shu如图像的通道数
    	##embed_dim:嵌入维度,将每个patch嵌入到一个embed_dim大小的特征(行)向量中
    	#设输入图像张量形状为(B,C,H,W),则返回嵌入向量序列(Tensor矩阵)形状为(B,N,embed_dim),其中N是patch的数目(也是嵌入向量的数目)
        patch_embed = PatchEmbed(img_size=img_size if i == 0 else img_size // (2 ** (i + 1)),
                                 patch_size=patch_size if i == 0 else 2,
                                 in_chans=in_chans if i == 1 else embed_dims[i - 1],
                                 embed_dim=embed_dims[i])
        num_patches = patch_embed.num_patches if i != num_stages - 1 else patch_embed.num_patches + 1
        #定义位置嵌入矩阵(可训练参数)
        pos_embed = nn.Parameter(torch.zeros(1, num_patches, embed_dims[i]))
        #位置嵌入的dropout,防止过拟合
        pos_drop = nn.Dropout(p=drop_rate)
        #定义当前阶段的多个Transformer模块
        block = nn.ModuleList([Block(
            dim=embed_dims[i], num_heads=num_heads[i], mlp_ratio=mlp_ratios[i], qkv_bias=qkv_bias,
            qk_scale=qk_scale, drop=drop_rate, attn_drop=attn_drop_rate, drop_path=dpr[cur + j],
            norm_layer=norm_layer, sr_ratio=sr_ratios[i])
            for j in range(depths[i])])
        cur += depths[i]
		#动态为类定义属性,包括每个阶段的嵌入模块、位置嵌入、位置Dropout和Transformer块
        setattr(self, f"patch_embed{i + 1}", patch_embed)
        setattr(self, f"pos_embed{i + 1}", pos_embed)
        setattr(self, f"pos_drop{i + 1}", pos_drop)
        setattr(self, f"block{i + 1}", block)
		#使用截断正太分布初始化位置嵌入
        trunc_normal_(pos_embed, std=.02)

    #对整个模块进行权重初始化(_init_weights在前面有定义)
    self.apply(self._init_weights)
  • def __init__weights(self,m):对模型中不同类型的层初始化参数。
def _init_weights(self, m):
	#全连接层
	if isinstance(m, nn.Linear):
	#截断正态分布初始化
	trunc_normal_(m.weight, std=.02)
	if isinstance(m, nn.Linear) and m.bias is not None:
		nn.init.constant_(m.bias, 0)
	#归一化层
	elif isinstance(m, nn.LayerNorm):
		nn.init.constant_(m.bias, 0)
		nn.init.constant_(m.weight, 1.0)
  • def _get_pos_embed(self, pos_embed, patch_embed, H, W):通过插值法调整位置嵌入的尺寸,使其能够与特征图的尺寸一致,便于后续位置嵌入的操作。
    • pos_embed:需调整尺寸的位置嵌入序列。
    • patch_embed:图像patch序列。
    • HW:特征图的高度与宽度。
def _get_pos_embed(self, pos_embed, patch_embed, H, W):
	#1.执行reshape将位置嵌入形状调整为(1,patch_embed.H,patch_embed.W,-1).其中,patch_embed.H、patch_embed.W表patch嵌入序列的高度、宽度.-1表自动推导出最后的维度(位置嵌入行向量通道数).
	#2.执行permute()变换维度,(B,H,W,embed_dim)->(B,embed_dim,H,W)
	#3.执行interpolate()进行双线性插值来调整位置嵌入尺寸,(patch_embed.H, patch_embed.W)->(H, W)
	#4.执行reshape()将插值后的位置嵌入调整到(1,embed_dim,H*W),即,将特征图的空间维度展平为一个一维的序列
	#5.执行permute()变换维度,(1,embed_dim,H*W)->(1,H*W,embed_dim)
	return F.interpolate(
	pos_embed.reshape(1, patch_embed.H, patch_embed.W, -1).permute(0, 3, 1, 2),
	size=(H, W), mode="bilinear").reshape(1, -1, H * W).permute(0, 2, 1)
  • def make_layer(self, planes, blocks, stride, dilation):构建残差网络ResNet的一个层级,通常由多个Bottleneck模块堆叠而成。

在这里插入图片描述

def make_layer(self, planes, blocks, stride, dilation):
	downsample = None
	#若步幅不为1,或输入通道数不等于Bottleneck模块的输出通道数(planes*4),则需下采样
	if stride != 1 or self.inplanes != planes*4:
		#构建下采样模块
		downsample = nn.Sequential(nn.Conv2d(self.inplanes, planes*4, kernel_size=1, stride=stride, bias=False), nn.BatchNorm2d(planes*4))
		#添加Bottleneck模块
        layers = [Bottleneck(self.inplanes, planes, stride, downsample, dilation=dilation)]
        self.inplanes = planes*4
        for _ in range(1, blocks):
            layers.append(Bottleneck(self.inplanes, planes, dilation=dilation))
    #将所有模块使用nn.Sequential()串联并返回
	return nn.Sequential(*layers)

在这里插入图片描述

  • def forward(self,x):定义DBI模型编码器的前向传播过程,每个阶段中, R e s N e t ResNet ResNet模块(CBkbs)和 P V T PVT PVTtdfmxblock)模块分别处理输入特征,并在中间阶段通过融合模块(DFM)进行特征交互,最终产生多级特征。执行流程如下:
    • 输入阶段
      • (1)输入图像通过卷积(7x7,步长为2,填充为3,卷积核个数为64)+BatchNormlization+ReLU+最大池化。
      • (2)输入ResNet Bokck1CBkbs[0])得到初始卷积特征图out1
    • 卷积模块和Transformer模块输入特征的初始化
      • (1)执行c_out = out1t_out = out1,将ResNet Bokck2PVT Block1的输入特征均初始化为out1,并将out1保存到列表couts中。
    • 循环阶段:依次处理ResNet2+PVT Block1、ResNet3+PVT Block2+DFM、ResNet4+PVT Block3+DFM、DFM,分别对应阶段1、2、3。
      • (1)Transformer分支:使用patch_embed对上一PVT Block的输出t_out分块处理得到curt_out,若是2、3阶段则通过DFM模块融合t_out(PVT提取的特征)和c_out(ResNet提取的特征)。之后将位置编码pos_embedcurt_out相加,并应用pos_drop得到更新后的curt_out。将curt_out输入到下一PVT Block中,之后重塑curt_out形状,将其调整为 ( B , C , H , W ) (B,C,H,W) (B,C,H,W)作为下一阶段的t_out
      • (2)卷积分支:若是2或3阶段,则通过 D F M DFM DFM模块将卷积特征 c o u t c_out cout T r a n s f o r m e r Transformer Transformer特征 t o u t 融合 t_out 融合 tout融合,再输入下一ResNet卷积块 C B k b s [ i ] CBkbs[i] CBkbs[i]进行处理,更新 c o u t c_out cout。之后将更新后的c_out添加到couts列表中。
    • 最终阶段:卷积特征c_outTransformer特征t_out最终通过sumdfm模块进行融合,产生最终的融合特征ct_fuse,并存储到couts列表中。
def forward(self, x):
	#存储四个ResNet模块输出的特征、最终合并的特征
    couts = []
    #获取batch_size
    B = x.shape[0]
    #输入阶段:卷积(7x7,步长为2,填充为3,卷积核个数为64)+BatchNormlization+ReLU+最大池化
    out1 = F.relu(self.bn1(self.conv1(x)), inplace=True)
    out1 = F.max_pool2d(out1, kernel_size=3, stride=2, padding=1)
    #输入ResNet Block1,得到初始卷积特征图out1
    out1 = self.CBkbs[0](out1)

    c_out = out1
    t_out = out1
    #将ResNet Bokck1的输出保存到couts列表
    couts.append(c_out)
    #num_stages=4,循环共执行三次,对应ResNet Block2+PVT Block1、DFM+ResNet Block3+PVT Block2、DFM+ResNet Block4+PVT Block3、DFM(最终融合)
    for i in range(1, self.num_stages):
  		#获取PVT分支
    	#获取patch_embed模块,用于将特征图分割为固定大小的patch并转化为一个低维向量
        patch_embed = getattr(self, f"patch_embed{i + 1}")
        #获取位置编码
        pos_embed = getattr(self, f"pos_embed{i + 1}")
        #获取位置编码后的dropout层
        pos_drop = getattr(self, f"pos_drop{i + 1}")
        #获取Transformer Block(Self-Attention+MLP模块)
        block = getattr(self, f"block{i + 1}")
		#当i==2或i==3时(2、3阶段),t_out和c_out 经过self.tdfmx[i-2](DFM)特征融合,即,融合CNN和Transformer模块提取的特征,并将其传入patch_embed.而i==1时(1阶段),直接将t_out(Transformer模块的输出)传入patch_embed,并融合.
		#t_out、c_out分别代表Transformer、CNN上一个阶段的输出
		#curt_out代表经过patch_embed处理后的输出特征图,(H,W)表示特征图的高度与宽度
        curt_out, (H, W) = patch_embed(self.tdfmx[i - 2](t_out, c_out) if i in [2, 3] else t_out)
        if i == self.num_stages - 1:
        #若是最后阶段,则调整位置编码,将其调整为适应当前特征图的大小(H,W),其中pos_embed移除了向量0维度的值.因为位置编码(pos_embed)通常包含一个特殊的class token的位置编码来作为图像的类别信息.但在特征提取、语义分割任务中更关注特征图中的每个位置,无需图像的整体类别信息,故只保留与特征图位置相关的编码.
            pos_embed = self._get_pos_embed(pos_embed[:, 1:], patch_embed, H, W)
        else:
        #调整位置编码,使其匹配当前特征图的大小(H,W)
            pos_embed = self._get_pos_embed(pos_embed, patch_embed, H, W)
        #融合位置编码与特征图,并输入dropout层
        curt_out = pos_drop(curt_out + pos_embed)
        #ResNet分支
		#若是2、3阶段,则使用DFM模块融合c_out、t_out(CNN、Transformer提取到的特征)并送入到对应的ResNet Block(i+1)中,否则直接将原始的c_out作为输入
        c_out = self.CBkbs[i](self.cdfmx[i - 2](c_out, t_out) if i in [2, 3] else c_out)
		#PVT模块对提取的特征进行处理
        for blk in block:
            curt_out = blk(curt_out, H, W)
        #重塑特征图形状,[B, num_patches, embed_dim]->[[B, H, W, embed_dim],即,将一维的patch序列重塑为二维图像特征,并通过permute将其调整为[B,embed_dim,H,W]以适应卷积网络的输入.
        t_out = curt_out.reshape(B, H, W, -1).permute(0, 3, 1, 2).contiguous()
		#保存ResNet提取的特征
        couts.append(c_out)
	#最后阶段,使用DFM融合特征并保存到couts列表中并返回
    ct_fuse = self.sumdfm(c_out, t_out)
    couts.append(ct_fuse)
    return couts

AB2019_train.py

  Burned_Area_Segment-2019数据集训练代码,与代码复现(二)基本一致。

  • def get_argparse():定义命令行参数。
参数名作用
–trainset_path训练集路径。
–testset_path测试集路径。
–num_classes类别数,默认2,仅火点和非火点两种。
–dataset数据集名称,默认LC8FPS256。
–threshold火灾像素阈值。
–model模型名称,默认FPSU2Net。
–epochs训练轮次,默认80。
–total_itrs总迭代次数,默认30000。
–lr学习率,默认0.001。
–lr_policy学习策略,默认poly,可选poly、step。
–step_size步长,默认10。
–batch_size批次大小,默认4。
–trainsize图像训练尺寸,默认256。
–n_cpu默认2。
–ckpt模型、优化器和调度器状态的保存路径,可用于加载检查点,默认None。
–loss_type损失函数类型,默认bi,可选bi、bce。
–gpu_idgpu编号,默认0。
–weight_decay默认0.0001。
–random_seed随机种子,默认1。
–val_interval检查点保存间隔。
  • def get_dataset(opts):返回训练集、验证集的Dataset对象。
  • def validate(opts, model, loader, device, metrics):验证模型性能,将模型的预测与真实标签进行比较,并使用指标(metrics)来评估模型的准确性。
    • opts:命令行参数对象。
    • model:需验证的模型。
    • loader:验证集数据集加载器。
    • device:计算设备。
    • metrics:性能指标计算对象。
def validate(opts, model, loader, device, metrics):
	#清除之前计算的指标结果,确保验证过程从头开始
    metrics.reset()
    #验证阶段模型无需更新权重,因此禁止梯度计算
    with torch.no_grad():
    	#遍历验证集的输入图像、标签
        for i, (images, labels) in tqdm(enumerate(loader)):

            images = images.to(device, dtype=torch.float32)
            labels = labels.to(device, dtype=torch.long)
			#将图像送入模型,返回多个输出特征图和对应的激活值
            s1,s2,s3,s4, s1_sig,s2_sig,s3_sig,s4_sig=model(images)
            #s1即为最终特征图,其激活值作为模型输出,并使用squeeze()调整张量形状作为模型预测
            outputs=s1_sig
            preds=outputs.squeeze()
            #将预测矩阵二值化
            preds[preds>=opts.threshold]=1
            preds[preds<opts.threshold]=0
			#outputs是模型在前向传播过程中输出的张量,需将其从计算图中分离.并使用cpu()将其转移到cpu,因为numpy不支持GPU张量.
            preds = outputs.detach().cpu().numpy()
            #预测结果转为int型,便于与真实标签比较
            preds=preds.astype(int)
            #将真实标签 labels 转换为 NumPy 数组
            targets = labels.cpu().numpy()
            #若真实标签只有一个维度(即batch大小为1),使用squeeze()去除多余的维度,确保与预测结果的形状一致
            if targets.shape[0]==1:
                targets=targets.squeeze()
           #更新评估指标
            metrics.update(targets, preds)   
		#获取最终评估结果
        score = metrics.get_results()
    #返回分数
    return score
  • def main():训练、验证模型。
def main():
    if not os.path.exists('CHKP'):
        utils.mkdir('CHKP')

    opts = get_argparser().parse_args()

    tb_writer = SummaryWriter()
    
    torch.cuda.empty_cache()
    os.environ['CUDA_VISIBLE_DEVICES'] = opts.gpu_id
    
    print("Device: %s" % device)

    torch.manual_seed(opts.random_seed)
    np.random.seed(opts.random_seed)
    random.seed(opts.random_seed)

    train_dst, val_dst = get_dataset(opts)
    opts.total_itrs=opts.epochs * (len(train_dst) // opts.batch_size)
    print('opts:',opts)

    train_loader = data.DataLoader(
        train_dst, batch_size=opts.batch_size, shuffle=True, num_workers=opts.n_cpu,
        drop_last=True)  
    val_loader = data.DataLoader(
        val_dst, batch_size=opts.batch_size, shuffle=True, num_workers=opts.n_cpu)
    print("Dataset: %s, Train set: %d, Val set: %d" %
          (opts.dataset, len(train_dst), len(val_dst)))


    model = eval(opts.model)()
    
    optimizer = torch.optim.Adam(model.parameters(), lr=opts.lr, betas=(0.9, 0.999), 
                                 eps=1e-08, weight_decay=opts.weight_decay)
    metrics = StreamSegMetrics(opts.num_classes)

 
    if opts.lr_policy == 'poly':
        scheduler = utils.PolyLR(optimizer, opts.total_itrs, power=0.9)
    elif opts.lr_policy == 'step':
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=opts.step_size, gamma=0.1)


    def save_ckpt(path):
        torch.save({
            "epoch": epoch+1,
            "model_state": model.module.state_dict(),
            "optimizer_state": optimizer.state_dict(),
            "scheduler_state": scheduler.state_dict(),
        }, path)
        print("Model saved as %s" % path)  
        
    cur_epoch=0
    if opts.ckpt is not None and os.path.isfile(opts.ckpt):
        checkpoint = torch.load(opts.ckpt, map_location=torch.device('cpu'))
        model.load_state_dict(checkpoint["model_state"])
        model = nn.DataParallel(model)
        model=model.to(device)
        
        optimizer.load_state_dict(checkpoint["optimizer_state"])
        scheduler.load_state_dict(checkpoint["scheduler_state"])
        cur_epoch = checkpoint["epoch"]   
        
        print("Model restored from %s" % opts.ckpt)
        del checkpoint  # free memory
    else:
        print("[!] Retrain")
        model = nn.DataParallel(model)
        model=model.to(device)


    for epoch in range(cur_epoch,opts.epochs):
        model.train()
        cur_itrs=0
        data_loader = tqdm(train_loader, file=sys.stdout)
        running_loss = 0.0
        
        for (images, gts) in data_loader:
            cur_itrs += 1

            images = images.to(device, dtype=torch.float32)
            gts = gts.to(device, dtype=torch.float32)

            optimizer.zero_grad()
           
            s1,s2,s3,s4, s1_sig,s2_sig,s3_sig,s4_sig= model(images)
            
            loss1 = CE(s1, gts) + IOU(s1_sig, gts)
            loss2 = CE(s2, gts) + IOU(s2_sig, gts)
            loss3 = CE(s3, gts) + IOU(s3_sig, gts)
            loss4 = CE(s4, gts) + IOU(s4_sig, gts)
    
            total_loss = loss1 + loss2/2 + loss3/4 +loss4/8 
            
            running_loss += total_loss.data.item()

            total_loss.backward()
            optimizer.step()

            data_loader.desc = "Epoch {}/{}, loss={:.4f}".format(epoch, opts.epochs, running_loss/cur_itrs)
            
            scheduler.step()
            

        if (epoch+1) % opts.val_interval == 0:
            save_ckpt('CHKP/latest_{}_{}.pth'.format(opts.model, opts.dataset))

                
        print("validation...")
        model.eval()
        #传入验证集加载器、输出模型评估分数
        val_score = validate(
            opts=opts, model=model, loader=val_loader, device=device, metrics=metrics)
		#打印验证分数
        print('val_score:',val_score)

        tags = ["train_loss", "learning_rate","Mean_Acc","Mean_IoU","BA_IoU"]

        tb_writer.add_scalar(tags[0], (running_loss/cur_itrs), epoch)
        tb_writer.add_scalar(tags[1], optimizer.param_groups[0]["lr"], epoch)
        tb_writer.add_scalar(tags[2], val_score['Mean Acc'], epoch)
        tb_writer.add_scalar(tags[3], val_score['Mean IoU'], epoch)
        tb_writer.add_scalar(tags[4], val_score['Class IoU'][1], epoch)

http://www.kler.cn/news/359350.html

相关文章:

  • MongoDB聚合管道(Aggregation Pipeline)
  • 计算机基础 -- 计算机补码的原理
  • 【LeetCode:910. 最小差值 II + 模拟 + 思维】
  • 问:JVM中GC类型有哪些?触发条件有哪些?区别是啥?
  • 基于Spring Boot的大创项目高效管理系统
  • 关于Vue脚手架
  • 大模型日报10月21日
  • 利用透视变换实现文档矫正功能
  • AUTOSAR_EXP_ARAComAPI的5章笔记(13)
  • iOS IPA上传到App Store Connect的三种方案详解
  • chat_gpt回答:python使用writearray写tiff速度太慢,有什么快速的方法吗
  • UML(Unified Modeling Language,统一建模语言)
  • 基于Neo4j的推理知识图谱展示:智能系统与图谱可视化
  • Go 1.19.4 命令调用、日志、包管理、反射-Day 17
  • Git的认识及基本操作
  • 基于IP的真实地址生成器
  • 2024-10-17 问AI: [AI面试题] 讨论 AI 的挑战和局限性
  • 深度学习:YOLO目标检测和YOLO-V1算法损失函数的计算
  • 030 elasticsearch查询、聚合
  • 【Java】多线程-thread类的常用方法和生命周期【主线学习笔记】