SENET(channel attention)

FECAM(Frequency Enhanced Channel Attention Mechanism)

As a module to enhance the frequency domain modeling capability of transformers and LSTM



import torch
import torch.nn as nn
import torch.nn.functional as F

import matplotlib.pyplot as plt

import numpy as np
import math
from math import sqrt
from utils.masking import TriangularCausalMask, ProbMask
# from masking import TriangularCausalMask, ProbMask#for parameter count
import os

class FullAttention(nn.Module):
    def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):
        super(FullAttention, self).__init__()
        self.scale = scale
        self.mask_flag = mask_flag
        self.output_attention = output_attention
        self.dropout = nn.Dropout(attention_dropout)

    def forward(self, queries, keys, values, attn_mask):
        B, L, H, E = queries.shape
        _, S, _, D = values.shape
        scale = self.scale or 1. / sqrt(E)

        scores = torch.einsum("blhe,bshe->bhls", queries, keys)

        if self.mask_flag:
            if attn_mask is None:
                attn_mask = TriangularCausalMask(B, L, device=queries.device)

            scores.masked_fill_(attn_mask.mask, -np.inf)

        A = self.dropout(torch.softmax(scale * scores, dim=-1))
        V = torch.einsum("bhls,bshd->blhd", A, values)

        if self.output_attention:
            return (V.contiguous(), A)
            return (V.contiguous(), None)

class ProbAttention(nn.Module):
    def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):
        super(ProbAttention, self).__init__()
        self.factor = factor
        self.scale = scale
        self.mask_flag = mask_flag
        self.output_attention = output_attention
        self.dropout = nn.Dropout(attention_dropout)

    def _prob_QK(self, Q, K, sample_k, n_top):  # n_top: c*ln(L_q)
        # Q [B, H, L, D]
        B, H, L_K, E = K.shape
        _, _, L_Q, _ = Q.shape

        # calculate the sampled Q_K
        K_expand = K.unsqueeze(-3).expand(B, H, L_Q, L_K, E)
        index_sample = torch.randint(L_K, (L_Q, sample_k))  # real U = U_part(factor*ln(L_k))*L_q
        K_sample = K_expand[:, :, torch.arange(L_Q).unsqueeze(1), index_sample, :]
        Q_K_sample = torch.matmul(Q.unsqueeze(-2), K_sample.transpose(-2, -1)).squeeze()

        # find the Top_k query with sparisty measurement
        M = Q_K_sample.max(-1)[0] - torch.div(Q_K_sample.sum(-1), L_K)
        M_top = M.topk(n_top, sorted=False)[1]

        # use the reduced Q to calculate Q_K
        Q_reduce = Q[torch.arange(B)[:, None, None],
                   torch.arange(H)[None, :, None],
                   M_top, :]  # factor*ln(L_q)
        Q_K = torch.matmul(Q_reduce, K.transpose(-2, -1))  # factor*ln(L_q)*L_k

        return Q_K, M_top

    def _get_initial_context(self, V, L_Q):
        B, H, L_V, D = V.shape
        if not self.mask_flag:
            # V_sum = V.sum(dim=-2)
            V_sum = V.mean(dim=-2)
            contex = V_sum.unsqueeze(-2).expand(B, H, L_Q, V_sum.shape[-1]).clone()
        else:  # use mask
            assert (L_Q == L_V)  # requires that L_Q == L_V, i.e. for self-attention only
            contex = V.cumsum(dim=-2)
        return contex

    def _update_context(self, context_in, V, scores, index, L_Q, attn_mask):
        B, H, L_V, D = V.shape

        if self.mask_flag:
            attn_mask = ProbMask(B, H, L_Q, index, scores, device=V.device)
            scores.masked_fill_(attn_mask.mask, -np.inf)

        attn = torch.softmax(scores, dim=-1)  # nn.Softmax(dim=-1)(scores)

        context_in[torch.arange(B)[:, None, None],
        torch.arange(H)[None, :, None],
        index, :] = torch.matmul(attn, V).type_as(context_in)
        if self.output_attention:
            attns = (torch.ones([B, H, L_V, L_V]) / L_V).type_as(attn).to(attn.device)
            attns[torch.arange(B)[:, None, None], torch.arange(H)[None, :, None], index, :] = attn
            return (context_in, attns)
            return (context_in, None)

    def forward(self, queries, keys, values, attn_mask):
        B, L_Q, H, D = queries.shape
        _, L_K, _, _ = keys.shape

        queries = queries.transpose(2, 1)
        keys = keys.transpose(2, 1)
        values = values.transpose(2, 1)

        U_part = self.factor * np.ceil(np.log(L_K)).astype('int').item()  # c*ln(L_k)
        u = self.factor * np.ceil(np.log(L_Q)).astype('int').item()  # c*ln(L_q)

        U_part = U_part if U_part < L_K else L_K
        u = u if u < L_Q else L_Q

        scores_top, index = self._prob_QK(queries, keys, sample_k=U_part, n_top=u)

        # add scale factor
        scale = self.scale or 1. / sqrt(D)
        if scale is not None:
            scores_top = scores_top * scale
        # get the context
        context = self._get_initial_context(values, L_Q)
        # update the context with selected top_k queries
        context, attn = self._update_context(context, values, scores_top, index, L_Q, attn_mask)

        return context.contiguous(), attn

class AttentionLayer(nn.Module):
    def __init__(self, attention, d_model, n_heads, d_keys=None,
        super(AttentionLayer, self).__init__()

        d_keys = d_keys or (d_model // n_heads)
        d_values = d_values or (d_model // n_heads)

        self.inner_attention = attention
        self.query_projection = nn.Linear(d_model, d_keys * n_heads)
        self.key_projection = nn.Linear(d_model, d_keys * n_heads)
        self.value_projection = nn.Linear(d_model, d_values * n_heads)
        self.out_projection = nn.Linear(d_values * n_heads, d_model)
        self.n_heads = n_heads

    def forward(self, queries, keys, values, attn_mask):
        B, L, _ = queries.shape
        _, S, _ = keys.shape
        H = self.n_heads

        queries = self.query_projection(queries).view(B, L, H, -1)
        keys = self.key_projection(keys).view(B, S, H, -1)
        values = self.value_projection(values).view(B, S, H, -1)

        out, attn = self.inner_attention(
        out = out.view(B, L, -1)

        return self.out_projection(out), attn

这个代码实现了一个注意力机制模块,主要包括了两种注意力机制(全局注意力 FullAttention 和 概率注意力 ProbAttention),以及一个常规的 AttentionLayer 用于处理多头注意力。以下是详细的解释:

1. 基础导入

import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np
import math
from math import sqrt
from utils.masking import TriangularCausalMask, ProbMask
  • torchtorch.nntorch.nn.functional 是 PyTorch 的核心模块,用于定义和操作神经网络结构。
  • matplotlib.pyplotnumpy 是常用的 Python 库,用于绘图和数值计算。
  • TriangularCausalMaskProbMask 用于为注意力机制提供掩码(masking),以确保特定位置的注意力权重被正确计算。

2. FullAttention 类(全局注意力)

class FullAttention(nn.Module):
    def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):
        super(FullAttention, self).__init__()
        self.scale = scale
        self.mask_flag = mask_flag
        self.output_attention = output_attention
        self.dropout = nn.Dropout(attention_dropout)
  • FullAttention 实现了全局自注意力机制。它通过 querieskeys 计算注意力权重。
  • mask_flag:决定是否应用掩码。
  • scale:对注意力分数进行缩放。
  • attention_dropout:为防止过拟合,应用 Dropout。

forward 函数:

def forward(self, queries, keys, values, attn_mask):
    B, L, H, E = queries.shape
    _, S, _, D = values.shape
    scale = self.scale or 1. / sqrt(E)

    scores = torch.einsum("blhe,bshe->bhls", queries, keys)
    if self.mask_flag:
        if attn_mask is None:
            attn_mask = TriangularCausalMask(B, L, device=queries.device)
        scores.masked_fill_(attn_mask.mask, -np.inf)
    A = self.dropout(torch.softmax(scale * scores, dim=-1))
    V = torch.einsum("bhls,bshd->blhd", A, values)
    return (V.contiguous(), A if self.output_attention else None)
  • torch.einsum:使用爱因斯坦求和约定来进行矩阵乘法,计算 querieskeys 的相似度。
  • mask_flag:当使用掩码时,将填充 -np.inf 来忽略特定位置的注意力分数。
  • softmax:用于将分数转换为概率分布。
  • V:是使用注意力分数和 values 计算出的加权和。

3. ProbAttention 类(概率注意力)

class ProbAttention(nn.Module):
    def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):
        super(ProbAttention, self).__init__()
        self.factor = factor
        self.scale = scale
        self.mask_flag = mask_flag
        self.output_attention = output_attention
        self.dropout = nn.Dropout(attention_dropout)
  • ProbAttention 实现了一种基于稀疏注意力的机制。通过概率的方法减少计算量,特别适用于长序列的处理。
  • factor:控制采样和稀疏度。
  • scaleattention_dropout 类似于 FullAttention 中的作用。

_prob_QK 函数:

def _prob_QK(self, Q, K, sample_k, n_top):
    K_expand = K.unsqueeze(-3).expand(B, H, L_Q, L_K, E)
    index_sample = torch.randint(L_K, (L_Q, sample_k))
    K_sample = K_expand[:, :, torch.arange(L_Q).unsqueeze(1), index_sample, :]
    Q_K_sample = torch.matmul(Q.unsqueeze(-2), K_sample.transpose(-2, -1)).squeeze()

    M = Q_K_sample.max(-1)[0] - torch.div(Q_K_sample.sum(-1), L_K)
    M_top = M.topk(n_top, sorted=False)[1]

    Q_reduce = Q[torch.arange(B)[:, None, None], torch.arange(H)[None, :, None], M_top, :]
    Q_K = torch.matmul(Q_reduce, K.transpose(-2, -1))
    return Q_K, M_top
  • 目的:通过抽样计算 QK 的相似度,从而减少全局计算开销。
  • 采样:通过从 K 中随机选择一部分,并与 Q 进行计算,得出最具代表性的 Top_k 查询,减少计算负担。

4. AttentionLayer 类

class AttentionLayer(nn.Module):
    def __init__(self, attention, d_model, n_heads, d_keys=None, d_values=None):
        super(AttentionLayer, self).__init__()
        self.inner_attention = attention
        self.query_projection = nn.Linear(d_model, d_keys * n_heads)
        self.key_projection = nn.Linear(d_model, d_keys * n_heads)
        self.value_projection = nn.Linear(d_model, d_values * n_heads)
        self.out_projection = nn.Linear(d_values * n_heads, d_model)
        self.n_heads = n_heads
  • AttentionLayer 是多头注意力层,将输入的 querieskeysvalues 进行线性变换,然后通过注意力机制处理。
  • d_model:输入特征的维度。
  • n_heads:多头注意力的头数。
  • inner_attention:可以是 FullAttentionProbAttention
forward 函数:
def forward(self, queries, keys, values, attn_mask):
    B, L, _ = queries.shape
    H = self.n_heads

    queries = self.query_projection(queries).view(B, L, H, -1)
    keys = self.key_projection(keys).view(B, S, H, -1)
    values = self.value_projection(values).view(B, S, H, -1)

    out, attn = self.inner_attention(
    out = out.view(B, L, -1)
    return self.out_projection(out), attn
  • 线性变换querieskeysvalues 都通过线性层进行投影,转换为适应多头注意力的形状。
  • 多头注意力:将 querieskeysvalues 分成多个头来计算注意力。
  • 输出投影:最后通过 out_projection 投影到原始维度。


  • 全局注意力FullAttention)计算每对 querieskeys 的相似度,用于标准的 Transformer 模型。
  • 概率注意力ProbAttention)通过稀疏采样减少计算复杂度,特别适用于长序列输入。
  • AttentionLayer 实现了一个多头注意力机制,可以选择全局注意力或概率注意力来处理输入。


在 PyTorch 中,forward 函数是每个自定义神经网络模块(nn.Module)的核心方法,它定义了模型的前向传播过程。前向传播是指输入数据如何经过网络层,逐步计算出输出结果的过程。

forward 函数的主要作用:

  1. 定义模型的计算逻辑forward 函数负责具体的计算过程。每当你将输入传入模型时,模型会自动调用 forward 函数来执行从输入到输出的所有计算。

    forward 函数中,你将定义:

    • 数据如何流经每一层(如卷积层、全连接层、注意力机制等)。
    • 应用哪些激活函数、正则化(如 BatchNormDropout)等操作。
    • 最终如何产生输出。
  2. 将输入映射到输出forward 函数的核心任务是将输入数据(张量)传递到各层并生成输出。例如,在卷积神经网络中,输入张量会经过卷积层、池化层、激活函数等模块,最后输出结果。

  3. 自动执行反向传播: 虽然 forward 仅定义前向传播过程,但它与自动微分(autograd)紧密相连。当你调用 loss.backward() 时,PyTorch 会自动根据 forward 中的操作来计算梯度,进而进行反向传播和参数更新。


import torch
import torch.nn as nn
import torch.nn.functional as F
from layers.dctnet import dct_channel_block, dct
# from dctnet import dct_channel_block, dct

class ConvLayer(nn.Module):
    def __init__(self, c_in):
        super(ConvLayer, self).__init__()
        self.downConv = nn.Conv1d(in_channels=c_in,
        self.norm = nn.BatchNorm1d(c_in)
        self.activation = nn.ELU()
        self.maxPool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)

    def forward(self, x):
        x = self.downConv(x.permute(0, 2, 1))
        x = self.norm(x)
        x = self.activation(x)
        x = self.maxPool(x)
        x = x.transpose(1, 2)
        return x

class EncoderLayer(nn.Module):
    def __init__(self, attention, d_model, d_ff=None, dropout=0.1, activation="relu"):
        super(EncoderLayer, self).__init__()
        d_ff = d_ff or 4 * d_model
        self.attention = attention
        self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)
        self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        self.activation = F.relu if activation == "relu" else F.gelu
        self.dct_norm = nn.LayerNorm([512], eps=1e-6)

    def forward(self, x, attn_mask=None):
        # print(x.shape)
        #torch.Size([32, 96, 512])
        #torch.Size([32, 49, 512])
        before self-attention
        # mid  = self.dct_layer(x)#
        # x = x+mid
        new_x, attn = self.attention(
            x, x, x,
        x = x + self.dropout(new_x)
        y = x = self.norm1(x)
        y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))
        y = self.dropout(self.conv2(y).transpose(-1, 1))
        # print("y.shape:",y.shape)#y.shape: torch.Size([32, 96, 512])#为了减少计算量,不然要做512次L=96的dct
        result = self.norm2(x + y)
        # mid  = self.dct_layer(result)
        # result = result+mid
        # result = self.dct_norm(result) #norm 144

        return result, attn
        # return self.norm2(x + y), attn

class Encoder(nn.Module):
    def __init__(self, attn_layers, conv_layers=None, norm_layer=None):
        super(Encoder, self).__init__()
        self.attn_layers = nn.ModuleList(attn_layers)
        self.conv_layers = nn.ModuleList(conv_layers) if conv_layers is not None else None
        self.norm = norm_layer

    def forward(self, x, attn_mask=None):
        # x [B, L, D]
        attns = []
        if self.conv_layers is not None:
            for attn_layer, conv_layer in zip(self.attn_layers, self.conv_layers):
                x, attn = attn_layer(x, attn_mask=attn_mask)
                x = conv_layer(x)
            x, attn = self.attn_layers[-1](x)
            for attn_layer in self.attn_layers:
                x, attn = attn_layer(x, attn_mask=attn_mask)

        if self.norm is not None:
            x = self.norm(x)

        return x, attns

class DecoderLayer(nn.Module):
    def __init__(self, self_attention, cross_attention, d_model, d_ff=None,
                 dropout=0.1, activation="relu"):
        super(DecoderLayer, self).__init__()
        d_ff = d_ff or 4 * d_model
        self.self_attention = self_attention
        self.cross_attention = cross_attention
        self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)
        self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        self.activation = F.relu if activation == "relu" else F.gelu
        # self.dct_layer=dct_channel_block(512)
        # self.dct_norm = nn.LayerNorm([512], eps=1e-6)

    def forward(self, x, cross, x_mask=None, cross_mask=None):
        x = x + self.dropout(self.self_attention(
            x, x, x,
        x = self.norm1(x)

        x = x + self.dropout(self.cross_attention(
            x, cross, cross,

        y = x = self.norm2(x)
        y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))
        y = self.dropout(self.conv2(y).transpose(-1, 1))
        # result = self.norm3(x + y)
        # mid  = self.dct_layer(result)
        # result = result+mid
        # result = self.dct_norm(result) #norm 144
        # return result
        # return self.norm3(x + y)
        return self.norm3(x + y)

class Decoder(nn.Module):
    def __init__(self, layers, norm_layer=None, projection=None):
        super(Decoder, self).__init__()
        self.layers = nn.ModuleList(layers)
        self.norm = norm_layer
        self.projection = projection

    def forward(self, x, cross, x_mask=None, cross_mask=None):
        for layer in self.layers:
            x = layer(x, cross, x_mask=x_mask, cross_mask=cross_mask)

        if self.norm is not None:
            x = self.norm(x)

        if self.projection is not None:
            x = self.projection(x)
        return x

这段代码定义了一个类似 Transformer 的神经网络架构,包含卷积层、编码器层和解码器层。它可以用于处理序列数据(如时间序列、自然语言处理等)。具体包括以下组件:

1. ConvLayer(卷积层):

class ConvLayer(nn.Module):
    def __init__(self, c_in):
        super(ConvLayer, self).__init__()
        self.downConv = nn.Conv1d(in_channels=c_in, out_channels=c_in, kernel_size=3, padding=2, padding_mode='circular')
        self.norm = nn.BatchNorm1d(c_in)
        self.activation = nn.ELU()
        self.maxPool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)
  • 这个卷积层首先对输入进行 1D 卷积,然后进行批归一化(BatchNorm)和 ELU 激活,再通过最大池化层减少特征维度。
  • 作用:将高维的输入进行降采样,同时提取局部特征。

2. EncoderLayer(编码器层)

class EncoderLayer(nn.Module):
    def __init__(self, attention, d_model, d_ff=None, dropout=0.1, activation="relu"):
        super(EncoderLayer, self).__init__()
        d_ff = d_ff or 4 * d_model
        self.attention = attention
        self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)
        self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        self.activation = F.relu if activation == "relu" else F.gelu
        self.dct_layer = dct_channel_block(512)  # DCT增强
        self.dct_norm = nn.LayerNorm([512], eps=1e-6)
  • 作用:这个层结合了注意力机制、自注意力(self-attention)、卷积层(用于前馈网络),并且通过 LayerNorm 和 Dropout 进行正则化。它还引入了 DCT(离散余弦变换)来增强输入的特征处理。
  • 自注意力机制:通过 attention 来捕获不同位置之间的相关性。
  • 卷积网络:通过卷积进一步对特征进行变换和学习。

3. Encoder(编码器)

class Encoder(nn.Module):
    def __init__(self, attn_layers, conv_layers=None, norm_layer=None):
        super(Encoder, self).__init__()
        self.attn_layers = nn.ModuleList(attn_layers)
        self.conv_layers = nn.ModuleList(conv_layers) if conv_layers is not None else None
        self.norm = norm_layer
  • 作用:这个类是编码器的整体结构,由多个 EncoderLayer 组成。如果有卷积层(conv_layers),会在每次注意力计算后对输入进行卷积操作。
  • 流程:输入经过每一层的注意力层,最后通过 LayerNorm 进行归一化。

4. DecoderLayer(解码器层)

class DecoderLayer(nn.Module):
    def __init__(self, self_attention, cross_attention, d_model, d_ff=None, dropout=0.1, activation="relu"):
        super(DecoderLayer, self).__init__()
        d_ff = d_ff or 4 * d_model
        self.self_attention = self_attention
        self.cross_attention = cross_attention
        self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)
        self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        self.activation = F.relu if activation == "relu" else F.gelu
  • 作用:解码器层包括了自注意力(self_attention)和交叉注意力(cross_attention)。自注意力用于捕捉解码器内部的依赖关系,而交叉注意力用于将编码器的输出与解码器的输入结合。
  • 卷积层和归一化:使用卷积和归一化对注意力层的输出进一步处理。

5. Decoder(解码器)

class Decoder(nn.Module):
    def __init__(self, layers, norm_layer=None, projection=None):
        super(Decoder, self).__init__()
        self.layers = nn.ModuleList(layers)
        self.norm = norm_layer
        self.projection = projection
  • 作用:解码器由多个 DecoderLayer 组成。输入首先通过每一层的解码器,结合来自编码器的输出,最后通过归一化和可选的投影层生成最终的输出。

6. 前向传播 (forward) 函数

每个模块都有一个 forward 函数,定义了数据在该模块中的流动过程。

def forward(self, x):
    x = self.downConv(x.permute(0, 2, 1))
    x = self.norm(x)
    x = self.activation(x)
    x = self.maxPool(x)
    x = x.transpose(1, 2)
    return x
  • 操作顺序
    1. 输入经过一维卷积处理。
    2. 批归一化。
    3. 激活函数 ELU。
    4. 最大池化。
    5. 变换维度并返回输出。
def forward(self, x, attn_mask=None):
    new_x, attn = self.attention(x, x, x, attn_mask=attn_mask)
    x = x + self.dropout(new_x)
    y = x = self.norm1(x)
    y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))
    y = self.dropout(self.conv2(y).transpose(-1, 1))
    result = self.norm2(x + y)
    return result, attn
  • 操作顺序
    1. 输入通过注意力层进行自注意力计算。
    2. 经过残差连接和 Dropout。
    3. 进行 LayerNorm 归一化。
    4. 输入通过前馈卷积网络。
    5. 最后输出经过归一化的结果。
def forward(self, x, cross, x_mask=None, cross_mask=None):
    x = x + self.dropout(self.self_attention(x, x, x, attn_mask=x_mask)[0])
    x = self.norm1(x)
    x = x + self.dropout(self.cross_attention(x, cross, cross, attn_mask=cross_mask)[0])

    y = x = self.norm2(x)
    y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))
    y = self.dropout(self.conv2(y).transpose(-1, 1))

    return self.norm3(x + y)
  • 操作顺序
    1. 输入通过自注意力计算。
    2. 进行 LayerNorm 归一化。
    3. 再通过交叉注意力与编码器的输出结合。
    4. 经过前馈卷积层,最后输出结果。


  • 卷积层 用于特征提取和降采样。
  • 编码器解码器 使用注意力机制来捕捉不同时间步之间的依赖关系,并通过卷积层进行进一步的特征处理。
  • 前向传播 函数定义了每个模块内部的数据流动和处理逻辑,结合了卷积、注意力机制和正则化。


import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils import weight_norm
import math

class PositionalEmbedding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEmbedding, self).__init__()
        # Compute the positional encodings once in log space.
        pe = torch.zeros(max_len, d_model).float()
        pe.require_grad = False

        position = torch.arange(0, max_len).float().unsqueeze(1)
        div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return self.pe[:, :x.size(1)]

class TokenEmbedding(nn.Module):
    def __init__(self, c_in, d_model):
        super(TokenEmbedding, self).__init__()
        padding = 1 if torch.__version__ >= '1.5.0' else 2
        self.tokenConv = nn.Conv1d(in_channels=c_in, out_channels=d_model,
                                   kernel_size=3, padding=padding, padding_mode='circular', bias=False)
        for m in self.modules():
            if isinstance(m, nn.Conv1d):
                nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='leaky_relu')

    def forward(self, x):
        x = self.tokenConv(x.permute(0, 2, 1)).transpose(1, 2)
        return x

class FixedEmbedding(nn.Module):
    def __init__(self, c_in, d_model):
        super(FixedEmbedding, self).__init__()

        w = torch.zeros(c_in, d_model).float()
        w.require_grad = False

        position = torch.arange(0, c_in).float().unsqueeze(1)
        div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()

        w[:, 0::2] = torch.sin(position * div_term)
        w[:, 1::2] = torch.cos(position * div_term)

        self.emb = nn.Embedding(c_in, d_model)
        self.emb.weight = nn.Parameter(w, requires_grad=False)

    def forward(self, x):
        return self.emb(x).detach()

class TemporalEmbedding(nn.Module):
    def __init__(self, d_model, embed_type='fixed', freq='h'):
        super(TemporalEmbedding, self).__init__()

        minute_size = 4
        hour_size = 24
        weekday_size = 7
        day_size = 32
        month_size = 13

        Embed = FixedEmbedding if embed_type == 'fixed' else nn.Embedding
        if freq == 't':
            self.minute_embed = Embed(minute_size, d_model)
        self.hour_embed = Embed(hour_size, d_model)
        self.weekday_embed = Embed(weekday_size, d_model)
        self.day_embed = Embed(day_size, d_model)
        self.month_embed = Embed(month_size, d_model)

    def forward(self, x):
        x = x.long()

        minute_x = self.minute_embed(x[:, :, 4]) if hasattr(self, 'minute_embed') else 0.
        hour_x = self.hour_embed(x[:, :, 3])
        weekday_x = self.weekday_embed(x[:, :, 2])
        day_x = self.day_embed(x[:, :, 1])
        month_x = self.month_embed(x[:, :, 0])

        return hour_x + weekday_x + day_x + month_x + minute_x

class TimeFeatureEmbedding(nn.Module):
    def __init__(self, d_model, embed_type='timeF', freq='h'):
        super(TimeFeatureEmbedding, self).__init__()

        freq_map = {'h': 4, 't': 5, 's': 6, 'm': 1, 'a': 1, 'w': 2, 'd': 3, 'b': 3}
        d_inp = freq_map[freq]
        self.embed = nn.Linear(d_inp, d_model, bias=False)

    def forward(self, x):
        return self.embed(x)

class DataEmbedding(nn.Module):
    def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
        super(DataEmbedding, self).__init__()

        self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
        self.position_embedding = PositionalEmbedding(d_model=d_model)
        self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,
                                                    freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(
            d_model=d_model, embed_type=embed_type, freq=freq)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, x_mark):
        x = self.value_embedding(x) + self.temporal_embedding(x_mark) + self.position_embedding(x)
        return self.dropout(x)

class DataEmbedding_wo_pos(nn.Module):
    def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
        super(DataEmbedding_wo_pos, self).__init__()

        self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
        self.position_embedding = PositionalEmbedding(d_model=d_model)
        self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,
                                                    freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(
            d_model=d_model, embed_type=embed_type, freq=freq)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, x_mark):
        x = self.value_embedding(x) + self.temporal_embedding(x_mark)
        return self.dropout(x)

class DataEmbedding_wo_pos_temp(nn.Module):
    def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
        super(DataEmbedding_wo_pos_temp, self).__init__()

        self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
        self.position_embedding = PositionalEmbedding(d_model=d_model)
        self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,
                                                    freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(
            d_model=d_model, embed_type=embed_type, freq=freq)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, x_mark):
        x = self.value_embedding(x)
        return self.dropout(x)

class DataEmbedding_wo_temp(nn.Module):
    def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
        super(DataEmbedding_wo_temp, self).__init__()

        self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
        self.position_embedding = PositionalEmbedding(d_model=d_model)
        self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,
                                                    freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(
            d_model=d_model, embed_type=embed_type, freq=freq)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, x_mark):
        x = self.value_embedding(x) + self.position_embedding(x)
        return self.dropout(x)

这段代码实现了一系列嵌入层(Embedding Layers),主要用于将输入数据(如时间序列数据)转换为适合模型处理的向量表示。这种嵌入是序列建模中非常重要的一部分,特别是在处理时间序列数据时。


  1. PositionalEmbedding(位置嵌入)

    • 作用:为每个序列位置编码一个向量,使得模型能够感知序列中每个位置的顺序。常用于 Transformer 模型中,因为这种模型本身不具备位置信息的感知能力。
    • 实现细节
      • 使用正弦和余弦函数为不同位置生成不同的嵌入,位置嵌入向量的奇偶位分别使用 sincos
      • pe 是一个预计算的二维张量,包含最大序列长度(max_len)和每个位置的嵌入(d_model 的维度)。
  2. TokenEmbedding(令牌嵌入)

    • 作用:将输入的数据通过一维卷积进行嵌入。可以理解为将输入的多维数据映射到一个高维的特征空间。
    • 实现细节
      • 使用 1D 卷积将输入通道数(c_in)转换为模型维度(d_model)。
      • 初始化权重时使用 Kaiming 初始化方法,适合 ReLU 和 LeakyReLU 激活函数。
  3. FixedEmbedding(固定嵌入)

    • 作用:通过固定的正弦和余弦函数生成嵌入,类似于位置嵌入,但它是用于固定长度的输入数据。
    • 实现细节
      • 每个位置的奇数位使用 sin,偶数位使用 cos
      • 权重矩阵 w 是固定的,不参与训练。
  4. TemporalEmbedding(时间嵌入)

    • 作用:为时间特征(如小时、天、月份等)生成嵌入,特别适用于处理时间序列数据。
    • 实现细节
      • 支持嵌入不同的时间粒度(如分钟、小时、周几、天、月份等)。
      • 可以选择使用固定嵌入(FixedEmbedding)或可学习嵌入(nn.Embedding)。
  5. TimeFeatureEmbedding(时间特征嵌入)

    • 作用:为特定时间粒度(如小时、天、周等)的数值进行线性嵌入。
    • 实现细节
      • 根据 freq_map 映射不同的时间粒度,并使用线性变换嵌入。
  6. DataEmbedding(数据嵌入)

    • 作用:组合了 TokenEmbeddingPositionalEmbeddingTemporalEmbedding,生成完整的嵌入向量。
    • 实现细节
      • 先将输入数据通过 TokenEmbedding 进行嵌入。
      • 然后结合 TemporalEmbedding 提取时间相关特征。
      • 还会加入位置嵌入(PositionalEmbedding)来提供序列位置信息。
      • 最后通过 Dropout 防止过拟合。
  7. DataEmbedding_wo_pos(无位置嵌入的数据嵌入)

    • 作用:与 DataEmbedding 类似,但不包含位置嵌入。
    • 实现细节
      • 只包含 TokenEmbeddingTemporalEmbedding,省略位置嵌入部分。
  8. DataEmbedding_wo_pos_temp(无位置和时间嵌入的数据嵌入)

    • 作用:只进行 TokenEmbedding,不包括时间和位置嵌入。
    • 实现细节
      • 仅使用 TokenEmbedding 来处理输入数据。
  9. DataEmbedding_wo_temp(无时间嵌入的数据嵌入)

    • 作用:不包括时间嵌入,但仍然使用位置嵌入。
    • 实现细节
      • 包含 TokenEmbeddingPositionalEmbedding,省略了 TemporalEmbedding


这些嵌入层的主要作用是将输入的时间序列数据转换成向量表示,这样可以更好地被神经网络模型(如 Transformer 或 CNN)处理。时间序列数据通常包含时间信息(如天、小时等),嵌入层通过提取这些时间信息来增强模型对时间依赖性的感知。通过组合 TokenEmbedding(提取特征)、PositionalEmbedding(位置信息)和 TemporalEmbedding(时间信息),模型可以捕捉到输入数据的复杂时空关系。


from distutils.command.config import config
import torch.nn as nn
import math
import numpy as np
import torch
    from torch import irfft
    from torch import rfft
except ImportError:
    def rfft(x, d):
        t = torch.fft.fft(x, dim = (-d))
        r = torch.stack((t.real, t.imag), -1)
        return r
    def irfft(x, d):
        t = torch.fft.ifft(torch.complex(x[:,:,0], x[:,:,1]), dim = (-d))
        return t.real

def dct(x, norm=None):
    Discrete Cosine Transform, Type II (a.k.a. the DCT)

    For the meaning of the parameter `norm`, see:

    :param x: the input signal
    :param norm: the normalization, None or 'ortho'
    :return: the DCT-II of the signal over the last dimension
    x_shape = x.shape
    N = x_shape[-1]
    x = x.contiguous().view(-1, N)

    v = torch.cat([x[:, ::2], x[:, 1::2].flip([1])], dim=1)

    # Vc = torch.fft.rfft(v, 1, onesided=False)
    Vc = rfft(v, 1)

    k = - torch.arange(N, dtype=x.dtype, device=x.device)[None, :] * np.pi / (2 * N)
    W_r = torch.cos(k)
    W_i = torch.sin(k)

    V = Vc[:, :, 0] * W_r - Vc[:, :, 1] * W_i

    if norm == 'ortho':
        V[:, 0] /= np.sqrt(N) * 2
        V[:, 1:] /= np.sqrt(N / 2) * 2

    V = 2 * V.view(*x_shape)

    return V

# class senet_block(nn.Module):
#     def __init__(self, channel=512, ratio=1):
#         super(dct_channel_block, self).__init__()
#         self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovation
#         self.fc = nn.Sequential(
#                 nn.Linear(channel, channel // 4, bias=False),
#                 nn.ReLU(inplace=True),
#                 nn.Linear(channel //4, channel, bias=False),
#                 nn.Sigmoid()
#         )

#     def forward(self, x):
#         # b, c, l = x.size() # (B,C,L)
#         # y = self.avg_pool(x) # (B,C,L) -> (B,C,1)
#         # print("y",y.shape)
#         x = x.permute(0,2,1)
#         b, c, l = x.size() 
#         y = self.avg_pool(x).view(b, c) # (B,C,L) ->(B,C,1)
#         # print("y",y.shape)
#         # y = self.fc(y).view(b, c, 96)

#         y = self.fc(y).view(b,c,1)
#         # print("y",y.shape)
#         # return x * y
#         return (x*y).permute(0,2,1)
class dct_channel_block(nn.Module):
    def __init__(self, channel):
        super(dct_channel_block, self).__init__()
        # self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovation
        self.fc = nn.Sequential(
                nn.Linear(channel, channel*2, bias=False),
                nn.Linear( channel*2, channel, bias=False),
        # self.dct_norm = nn.LayerNorm([512], eps=1e-6)
        self.dct_norm = nn.LayerNorm([96], eps=1e-6)#for lstm on length-wise
        # self.dct_norm = nn.LayerNorm([36], eps=1e-6)#for lstm on length-wise on ill with input =36

    def forward(self, x):
        b, c, l = x.size() # (B,C,L) (32,96,512)
        # y = self.avg_pool(x) # (B,C,L) -> (B,C,1)
        # y = self.avg_pool(x).view(b, c) # (B,C,L) -> (B,C,1)
        # print("y",y.shape
        # y = self.fc(y).view(b, c, 96)
        list = []
        for i in range(c):
            # print("freq-shape:",freq.shape)

        stack_dct = torch.tensor(stack_dct)
        for traffic mission:f_weight = self.dct_norm(f_weight.permute(0,2,1))#matters for traffic datasets
        lr_weight = self.dct_norm(stack_dct) 
        lr_weight = self.fc(stack_dct)
        lr_weight = self.dct_norm(lr_weight) 
        # print("lr_weight",lr_weight.shape)
        return x *lr_weight #result

if __name__ == '__main__':
    tensor = torch.rand(8,7,96)
    dct_model = dct_channel_block()
    result = dct_model.forward(tensor) 


1. 导入库和兼容性处理

from distutils.command.config import config
import torch.nn as nn
import math
import numpy as np
import torch

导入了 PyTorch 的 nn 模块用于神经网络构建,mathnumpy 用于数学运算。

    from torch import irfft
    from torch import rfft
except ImportError:
    def rfft(x, d):
        t = torch.fft.fft(x, dim=(-d))
        r = torch.stack((t.real, t.imag), -1)
        return r

    def irfft(x, d):
        t = torch.fft.ifft(torch.complex(x[:, :, 0], x[:, :, 1]), dim=(-d))
        return t.real

这一段代码用于兼容旧版本的 PyTorch,定义了 rfftirfft 函数。在较新的 PyTorch 版本中,rfftirfft 函数可能已被替换,因此这里使用自定义实现确保代码的兼容性。

2. DCT(离散余弦变换)

def dct(x, norm=None):

这个函数实现了 离散余弦变换 (DCT),具体是 DCT-II 类型。DCT 是一种频域转换,通常用于信号处理和特征提取,它可以将时域信号转换为频域表示。

  • 重新排列数据:将输入张量中的偶数和奇数索引部分组合在一起。
  • 进行 FFT(快速傅里叶变换):通过 rfft 函数进行快速傅里叶变换,将信号转换到频域。
  • 正弦和余弦加权:通过 cossin 函数对 FFT 的结果进行加权,得到离散余弦变换的结果。
  • 归一化处理:如果指定 norm='ortho',进行正交归一化。

3. DCT 通道块

class dct_channel_block(nn.Module):
    def __init__(self, channel):
        super(dct_channel_block, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(channel, channel * 2, bias=False),
            nn.Linear(channel * 2, channel, bias=False),
        self.dct_norm = nn.LayerNorm([96], eps=1e-6)

这个类实现了基于 DCT 的通道块,主要用于对输入张量进行变换,增强特征的表达能力。

  • 全连接层(fc:使用了两层全连接层,结合 ReLU 激活函数和 Dropout,来对 DCT 变换后的频域特征进行进一步处理。
  • LayerNorm(dct_norm:用于标准化处理,以稳定训练过程。
前向传播 (forward):
def forward(self, x):
    b, c, l = x.size() # (B,C,L) (32,96,512)
    list = []
    for i in range(c):
        freq = dct(x[:, i, :])

    stack_dct = torch.stack(list, dim=1)
    stack_dct = torch.tensor(stack_dct)
    lr_weight = self.dct_norm(stack_dct)
    lr_weight = self.fc(stack_dct)
    lr_weight = self.dct_norm(lr_weight)
    return x * lr_weight
  • 输入x 的形状为 (B, C, L),其中 B 是批量大小,C 是通道数,L 是序列长度。
  • DCT 变换:对于每个通道,计算 DCT,得到频域表示。
  • 处理 DCT 结果:将 DCT 结果通过全连接层和标准化层处理,得到特征权重。
  • 输出:将原始输入与 DCT 处理后的权重相乘,得到最终的输出。

4. 主函数

if __name__ == '__main__':
    tensor = torch.rand(8, 7, 96)
    dct_model = dct_channel_block(96)
    result = dct_model.forward(tensor)
    print("result.shape:", result.shape)

这部分代码用于测试 dct_channel_block 类:

  • 输入:随机生成一个形状为 (8, 7, 96) 的张量,代表批量大小为 8,通道数为 7,序列长度为 96。
  • 模型实例化:实例化 dct_channel_block,并使用前向传播方法对输入进行处理。
  • 输出:打印处理后的输出张量的形状。


  • DCT 的作用:DCT 可以将时域信号转换为频域信号,在信号处理和特征提取中非常有用。通过这种变换,可以捕捉输入数据中的频率模式,从而增强模型的表达能力。
  • 通道块的作用dct_channel_block 结合 DCT 和全连接层来处理输入特征,并生成频域权重以调整输入特征。
  • 代码的目的:实现了一种结合 DCT 的特征增强机制,旨在通过频域处理提升神经网络在时序数据上的表现。


from pip import main
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torch
import torch_dct as dct
import numpy as np
import math
from layers.dctnet import dct_channel_block,dct
# from dctnet import dct_channel_block,dct #for parameters calc
import argparse
from fvcore.nn import FlopCountAnalysis,parameter_count_table  

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
class Model(nn.Module):#2022.11.7修改前,这个Model能跑通
    def __init__(self,configs,input_size=None,hidden_size=16, output_size=None,batch_size=None,num_layers=2):#input_size与output_size是通道数
        super(Model, self).__init__()
        # self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovation
        self.seq_len = configs.seq_len
        self.pred_len = configs.pred_len
        self.input_size = configs.enc_in #channel
        self.hidden_size = hidden_size #输出维度 也就是输出通道
        self.num_layers = num_layers
        self.output_size = configs.enc_in #channel #输出个数
        self.num_directions = 1 # 单向LSTM
        self.batch_size = configs.batch_size
        self.lstm = nn.LSTM(self.input_size, self.hidden_size, self.num_layers, batch_first=True)
        self.linear = nn.Linear(self.hidden_size, self.output_size)#通道数对齐层
        self.linear_out_len = nn.Linear(self.seq_len, self.pred_len)#输出长度对齐层
        # self.linear = nn.Linear()
        #add dce-block
        self.dct_norm = nn.LayerNorm([configs.enc_in], eps=1e-6)#作为模块一般normal channel效果好点
        # self.dct_norm = nn.LayerNorm([512], eps=1e-6)#作为模块一般normal channel效果好点
    def forward(self, x):
        # x = x.permute(0,2,1) # (B,L,C)=》(B,C,L)#forL
        # b, c, l = x.size() # (B,C,L)

        # batch_size, seq_len = x.shape[0], x.shape[1]
        # h_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size)
        # c_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size)
        h_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device)
        c_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device)
        # output(batch_size, seq_len, num_directions * hidden_size)
        output, _ = self.lstm(x, (h_0, c_0)) # output(5, 30, 64)
        # print("output.shape:",output.shape)#result.shape: torch.Size([8, 96, 8])
        result = self.linear(output)  # (B,L,C)
        # output.shape: torch.Size([8, 96, 16])#16是hidden_size
        # result.shape: torch.Size([8, 96, 8])#8是为了符合通道数对齐,exchange_rate有8个变量
        result = self.dct_layer(result.permute(0,2,1))#加入dct模块,mse降低0.12个点
        result_len =  self.linear_out_len(result)#为了输出长度对齐 (8,8,96)
        # result_len =  self.linear_out_len(result.permute(0,2,1))#为了输出长度对齐 (8,8,96)

        # print("result.shape:",result.shape)#result.shape: torch.Size([8, 96, 8])
        # result = result.permute(0,2,1)#(B,L,C)=》(B,C,L)
        # result = self.dct_layer(result_len)#加入dct模块,mse降低0.12个点
        # result = result.permute(0,2,1)#(B,C,L)=》(B,L,C)

        return  result_len.permute(0,2,1)
# lstm = LSTM(input_size=7,hidden_size=64, output_size=7,batch_size=8,num_layers=5).to(device)
# tensor = torch.rand(8, 96, 7).to(device)
# result = lstm(tensor)
# print("result.shape:",result.shape)
if __name__ == '__main__': 
    parser = argparse.ArgumentParser(description='Autoformer & Transformer family for Time Series Forecasting')
    # forecasting task
    parser.add_argument('--seq_len', type=int, default=96, help='input sequence length')
    parser.add_argument('--label_len', type=int, default=48, help='start token length')
    parser.add_argument('--pred_len', type=int, default=96, help='prediction sequence length')
    parser.add_argument('--embed_type', type=int, default=0, help='0: default 1: value embedding + temporal embedding + positional embedding 2: value embedding + temporal embedding 3: value embedding + positional embedding 4: value embedding')
    parser.add_argument('--enc_in', type=int, default=7, help='encoder input size') # DLinear with --individual, use this hyperparameter as the number of channels
    parser.add_argument('--dec_in', type=int, default=7, help='decoder input size')
    parser.add_argument('--c_out', type=int, default=7, help='output size')
    parser.add_argument('--d_model', type=int, default=512, help='dimension of model')
    parser.add_argument('--n_heads', type=int, default=8, help='num of heads')
    parser.add_argument('--e_layers', type=int, default=2, help='num of encoder layers')
    parser.add_argument('--d_layers', type=int, default=1, help='num of decoder layers')

    # optimization
    parser.add_argument('--num_workers', type=int, default=10, help='data loader num workers')
    parser.add_argument('--itr', type=int, default=2, help='experiments times')
    parser.add_argument('--train_epochs', type=int, default=10, help='train epochs')
    parser.add_argument('--batch_size', type=int, default=32, help='batch size of train input data')
    parser.add_argument('--patience', type=int, default=3, help='early stopping patience')
    parser.add_argument('--learning_rate', type=float, default=0.0001, help='optimizer learning rate')
    parser.add_argument('--des', type=str, default='test', help='exp description')
    parser.add_argument('--loss', type=str, default='mse', help='loss function')
    parser.add_argument('--lradj', type=str, default='type1', help='adjust learning rate')
    parser.add_argument('--use_amp', action='store_true', help='use automatic mixed precision training', default=False)

    # GPU
    parser.add_argument('--use_gpu', type=bool, default=True, help='use gpu')
    parser.add_argument('--gpu', type=int, default=0, help='gpu')
    parser.add_argument('--use_multi_gpu', action='store_true', help='use multiple gpus', default=False)
    parser.add_argument('--devices', type=str, default='0,1,2,3', help='device ids of multile gpus')
    parser.add_argument('--test_flop', action='store_true', default=False, help='See utils/tools for usage')

    args = parser.parse_args()
    model = Model(args)

这段代码实现了一个基于 LSTM(长短期记忆网络)和 DCT(离散余弦变换)的时间序列预测模型,并且可以处理不同的模型配置和超参数。这段代码主要用于时间序列数据的预测任务,具体解释如下:

1. 模型类 Model

class Model(nn.Module):
    def __init__(self, configs, input_size=None, hidden_size=16, output_size=None, batch_size=None, num_layers=2):
        super(Model, self).__init__()
  • configs:通过命令行传递的配置参数(如序列长度、批量大小、隐藏层大小等)。
  • input_size:输入的特征数量(通道数),从 configs.enc_in 中获取。
  • hidden_size:LSTM 隐藏层的大小,默认是 16,即 LSTM 输出的特征维度。
  • num_layers:LSTM 网络的层数,默认为 2
  • output_size:模型的输出特征数,从 configs.enc_in 中获取。


  • 一个 LSTM 层,用于处理输入的时间序列数据。
  • 两个 线性层linearlinear_out_len),用于通道数和序列长度的对齐。
  • 一个基于 DCT 的增强模块 dct_channel_block,用于提取频域特征。

2. LSTM 和线性层

self.lstm = nn.LSTM(self.input_size, self.hidden_size, self.num_layers, batch_first=True)
self.linear = nn.Linear(self.hidden_size, self.output_size)  # 通道数对齐
self.linear_out_len = nn.Linear(self.seq_len, self.pred_len)  # 输出长度对齐
  • LSTM 层:输入维度为 input_size,输出维度为 hidden_size,LSTM 层数为 num_layers
  • 线性层 linear:用于将 LSTM 的输出从 hidden_size 映射回输入的特征数量(通道数)。
  • 线性层 linear_out_len:用于将序列长度从 seq_len 映射到预测的序列长度 pred_len

3. DCT 通道块

self.dct_layer = dct_channel_block(configs.seq_len)
self.dct_norm = nn.LayerNorm([configs.enc_in], eps=1e-6)
  • dct_channel_block:通过 DCT(离散余弦变换)提取频域信息,并对输入特征进行处理。它有助于改善模型对时间序列中的周期性或频域特征的捕捉能力。
  • dct_norm:标准化层,用于将经过 DCT 处理的特征进行归一化。

4. 前向传播函数 forward

def forward(self, x):
    h_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device)
    c_0 = torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device)
    output, _ = self.lstm(x, (h_0, c_0))  # LSTM 前向传播
    result = self.linear(output)  # 通道数对齐

    result = self.dct_layer(result.permute(0, 2, 1))  # DCT 模块
    result_len = self.linear_out_len(result)  # 长度对齐

    return result_len.permute(0, 2, 1)
  • 初始化隐藏状态和细胞状态h_0c_0 是 LSTM 的初始隐藏状态和细胞状态。
  • LSTM 前向传播:输入 x 经过 LSTM 层,得到 output,并进行通道数对齐。
  • DCT 处理:经过 LSTM 输出后的结果通过 DCT 模块处理,并对频域特征进行操作。
  • 长度对齐:通过 linear_out_len 线性层调整预测序列的长度为 pred_len
  • 输出:返回最终的预测结果,并调整形状。

5. 命令行参数配置

parser = argparse.ArgumentParser(description='Autoformer & Transformer family for Time Series Forecasting')
parser.add_argument('--seq_len', type=int, default=96, help='input sequence length')
parser.add_argument('--pred_len', type=int, default=96, help='prediction sequence length')
parser.add_argument('--enc_in', type=int, default=7, help='encoder input size')
parser.add_argument('--batch_size', type=int, default=32, help='batch size of train input data')
args = parser.parse_args()
model = Model(args)
  • seq_len:输入序列的长度。
  • pred_len:预测序列的长度。
  • enc_in:输入特征的通道数。
  • batch_size:批量大小。

通过 argparse 模块,可以通过命令行传入这些参数来控制模型的配置。

6. 模型参数和 FLOPs 计算

from fvcore.nn import FlopCountAnalysis, parameter_count_table  
  • parameter_count_table:用于计算模型的参数数量。
  • FlopCountAnalysis:用于分析模型的浮点操作(FLOPs)。

这段代码通过 fvcore 库计算模型的参数量,并输出这些信息以便进行模型复杂度的评估。


  • 该模型结合了 LSTMDCT,用于时间序列数据的预测。
  • 通过 DCT 增强模型对频域特征的捕捉能力,有助于提高模型的预测性能。
  • 通过命令行参数,模型的配置可以灵活调整,并且支持多 GPU 训练和 FLOPs 计算。


import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torch
import torch_dct as dct
import numpy as np
import math
from layers.dctnet import dct_channel_block,dct
class Model(nn.Module):#2022.11.7修改前,这个Model能跑通#CNN
    def __init__(self,configs):
        super(Model, self).__init__()
        # self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovation
        self.seq_len = configs.seq_len
        self.pred_len = configs.pred_len
        self.channel_num = configs.enc_in
        self.conv1 = nn.Conv1d(in_channels=self.channel_num,out_channels=self.channel_num,kernel_size=5,stride=1,padding=2) #输入通道数和输出通道数应该一样
        # input = torch.randn(32,7,96)
        # batch_size x text_len x embedding_size -> batch_size x embedding_size x text_len
        self.linear = nn.Linear(self.seq_len,self.pred_len)
        # self.dct_norm = nn.LayerNorm([7], eps=1e-6)#作为模块一般normal channel效果好点

    def forward(self, x):
        # print("x.shape:",x.shape)
        x = x.permute(0,2,1) # (B,L,C)=》(B,C,L)#forL and 1-d conv
        # b, c, l = x.size() # (B,C,L)
        out = self.conv1(x) #b,c,l
        out = self.linear(x)#b,c,l
        # print(out.size())
        # out  = self.dct_layer(out)#加入dct模块,mse降低0.12个点
        # out = self.linear(x)#b,c,l
        # x = x+mid
        # x = x.permute(0,2,1) 
        # x = self.dct_norm(x) #norm 144
        # x = x.permute(0,2,1) 
        return  (out).permute(0,2,1)#b,l,c

这段代码定义了一个基于 CNN(卷积神经网络)DCT(离散余弦变换) 的简单时间序列预测模型。模型通过 1D 卷积和 DCT 增强特征提取,结合线性层对输出进行预测。


1. 模型初始化
class Model(nn.Module):
    def __init__(self, configs):
        super(Model, self).__init__()
        self.seq_len = configs.seq_len
        self.pred_len = configs.pred_len
        self.channel_num = configs.enc_in
        self.conv1 = nn.Conv1d(in_channels=self.channel_num, out_channels=self.channel_num, kernel_size=5, stride=1, padding=2)
        self.dct_layer = dct_channel_block(self.seq_len)
        self.linear = nn.Linear(self.seq_len, self.pred_len)
  • seq_len:输入序列的长度。
  • pred_len:预测序列的长度。
  • channel_num:输入的通道数,代表时间序列的特征数量(例如 7 维度的输入表示有 7 个不同的特征)。
  • conv1:一维卷积层,输入和输出通道数保持一致,卷积核大小为 5,步幅为 1,并使用 padding=2 保证输入和输出序列的长度保持不变。

  • dct_layer:通过 dct_channel_block 提取频域特征,用离散余弦变换增强特征。

  • linear:线性层用于将序列长度从 seq_len 映射到预测长度 pred_len,在输出前对时间序列进行长度调整。

2. 前向传播函数 forward
def forward(self, x):
    x = x.permute(0, 2, 1)  # 转置输入,调整维度顺序 (B, L, C) -> (B, C, L)
    out = self.conv1(x)  # 应用一维卷积,提取特征 (B, C, L)
    out = self.linear(x)  # 应用线性层进行序列长度变换
    return out.permute(0, 2, 1)  # 再次转置,调整维度回到 (B, L, C)
  • 输入:输入 x 的形状为 (B, L, C),即 B 表示批量大小,L 表示序列长度,C 表示输入的特征维度(通道数)。

  • 转置:在进行 1D 卷积前,先通过 x.permute(0, 2, 1) 将输入从形状 (B, L, C) 转为 (B, C, L),使得输入符合 1D 卷积的要求。

  • 卷积:通过 conv1 提取特征,卷积后维度仍然是 (B, C, L)

  • 线性层:将经过卷积后的特征通过线性层 linear,将序列长度从 seq_len 转换为 pred_len

  • 输出:最后将输出再转置回 (B, L, C),作为最终的预测结果。


# out  = self.dct_layer(out)  # DCT 层:可选的 DCT 增强模块,用于通过离散余弦变换进一步处理特征。
# out = self.linear(x)  # 可选的线性层,可以用于对卷积后输出再进行进一步处理。
# x = self.dct_norm(x)  # 标准化层,用于标准化 DCT 后的特征。

这些部分暂时被注释掉了,表示 DCT 层和标准化层可能是可选的改进点。在某些情况下,通过 DCT 提取频域特征可以提高模型的性能。

3. 总结

  • 卷积操作:1D 卷积用于捕捉输入时间序列的局部特征。
  • DCT 操作:可选的 DCT 层能够将时域信号转换为频域信号,捕捉输入数据中的周期性或频率特征,从而进一步增强模型的表现。
  • 线性映射:线性层负责调整序列的长度,保证输入和输出长度匹配。

这个模型设计相对简单,但由于使用了卷积操作和 DCT 层,因此在处理具有时空依赖性的数据(如时间序列预测)时有一定优势。


import torch
import torch.nn as nn
import torch.nn.functional as F
from layers.Transformer_EncDec import Decoder, DecoderLayer, Encoder, EncoderLayer, ConvLayer
from layers.SelfAttention_Family import FullAttention, AttentionLayer
from layers.Embed import DataEmbedding,DataEmbedding_wo_pos,DataEmbedding_wo_temp,DataEmbedding_wo_pos_temp
import numpy as np
from layers.dctnet import dct_channel_block,dct

class Model(nn.Module):
    Vanilla Transformer with O(L^2) complexity
    def __init__(self, configs):
        super(Model, self).__init__()
        self.pred_len = configs.pred_len
        self.output_attention = configs.output_attention

        # Embedding
        if configs.embed_type == 0:
            self.enc_embedding = DataEmbedding(configs.enc_in, configs.d_model, configs.embed, configs.freq,
            self.dec_embedding = DataEmbedding(configs.dec_in, configs.d_model, configs.embed, configs.freq,
        elif configs.embed_type == 1:
            self.enc_embedding = DataEmbedding(configs.enc_in, configs.d_model, configs.embed, configs.freq,
            self.dec_embedding = DataEmbedding(configs.dec_in, configs.d_model, configs.embed, configs.freq,
        elif configs.embed_type == 2:
            self.enc_embedding = DataEmbedding_wo_pos(configs.enc_in, configs.d_model, configs.embed, configs.freq,
            self.dec_embedding = DataEmbedding_wo_pos(configs.dec_in, configs.d_model, configs.embed, configs.freq,

        elif configs.embed_type == 3:
            self.enc_embedding = DataEmbedding_wo_temp(configs.enc_in, configs.d_model, configs.embed, configs.freq,
            self.dec_embedding = DataEmbedding_wo_temp(configs.dec_in, configs.d_model, configs.embed, configs.freq,
        elif configs.embed_type == 4:
            self.enc_embedding = DataEmbedding_wo_pos_temp(configs.enc_in, configs.d_model, configs.embed, configs.freq,
            self.dec_embedding = DataEmbedding_wo_pos_temp(configs.dec_in, configs.d_model, configs.embed, configs.freq,
        # Encoder
        self.encoder = Encoder(
                        FullAttention(False, configs.factor, attention_dropout=configs.dropout,
                                      output_attention=configs.output_attention), configs.d_model, configs.n_heads),
                ) for l in range(configs.e_layers)
        # Decoder
        self.decoder = Decoder(
                        FullAttention(True, configs.factor, attention_dropout=configs.dropout, output_attention=False),
                        configs.d_model, configs.n_heads),
                        FullAttention(False, configs.factor, attention_dropout=configs.dropout, output_attention=False),
                        configs.d_model, configs.n_heads),
                for l in range(configs.d_layers)
            projection=nn.Linear(configs.d_model, configs.c_out, bias=True)
        self.dct_norm = nn.LayerNorm([512], eps=1e-6)

    def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec,
                enc_self_mask=None, dec_self_mask=None, dec_enc_mask=None):

        enc_out = self.enc_embedding(x_enc, x_mark_enc)
        enc_out, attns = self.encoder(enc_out, attn_mask=enc_self_mask)
        # print("enc_out.shape:",enc_out.shape)#enc_out.shape: torch.Size([32, 96, 512])
        # mid  = self.dct_layer(enc_out)
        # enc_out = enc_out+mid
        # enc_out = self.dct_norm(enc_out) #norm 144

        dec_out = self.dec_embedding(x_dec, x_mark_dec)
        dec_out = self.decoder(dec_out, enc_out, x_mask=dec_self_mask, cross_mask=dec_enc_mask)

        if self.output_attention:
            return dec_out[:, -self.pred_len:, :], attns
            return dec_out[:, -self.pred_len:, :]  # [B, L, D]

这段代码实现了一个结合 TransformerDCT(离散余弦变换) 的神经网络模型,主要用于时间序列预测任务。模型结构基于 Transformer 编码器-解码器框架,支持多种嵌入方式,并集成了 DCT 通道块用于增强特征处理。

1. 模型简介

该模型的核心是标准的 Transformer 编码器-解码器结构,具有以下主要模块:

  • 嵌入模块:通过 DataEmbedding 和其变种为输入数据生成嵌入表示。
  • 编码器(Encoder):基于多层自注意力机制的编码器,用于提取输入序列的高维特征。
  • 解码器(Decoder):结合自注意力和交叉注意力的解码器,用于生成预测序列。
  • DCT 模块:用于对编码器输出进行频域增强处理(当前注释掉,作为可选模块)。

2. 嵌入模块

if configs.embed_type == 0:
    self.enc_embedding = DataEmbedding(configs.enc_in, configs.d_model, configs.embed, configs.freq, configs.dropout)
    self.dec_embedding = DataEmbedding(configs.dec_in, configs.d_model, configs.embed, configs.freq, configs.dropout)
  • DataEmbedding:为输入数据生成嵌入。嵌入方式由 configs.embed_type 控制,支持五种不同类型的嵌入:
    • 0:默认的嵌入(值、时间、位置嵌入的组合)。
    • 1:与 embed_type=0 相同。
    • 2:没有位置嵌入。
    • 3:没有时间嵌入。
    • 4:没有时间和位置嵌入。

3. 编码器(Encoder)

self.encoder = Encoder(
                FullAttention(False, configs.factor, attention_dropout=configs.dropout, output_attention=configs.output_attention),
                configs.d_model, configs.n_heads),
        ) for l in range(configs.e_layers)
  • Encoder:由多个 EncoderLayer 组成,每层包括自注意力机制和前馈神经网络。
  • FullAttention:注意力机制,False 表示不使用掩码(即标准自注意力)。
  • LayerNorm:标准化层,用于稳定训练。

4. 解码器(Decoder)

self.decoder = Decoder(
                FullAttention(True, configs.factor, attention_dropout=configs.dropout, output_attention=False),
                configs.d_model, configs.n_heads),
                FullAttention(False, configs.factor, attention_dropout=configs.dropout, output_attention=False),
                configs.d_model, configs.n_heads),
        for l in range(configs.d_layers)
    projection=nn.Linear(configs.d_model, configs.c_out, bias=True)
  • Decoder:由多个 DecoderLayer 组成,每层结合了自注意力和交叉注意力机制。
  • projection:最后一层是线性层,用于将解码器的输出投影到目标输出维度(configs.c_out)。

5. DCT 通道块

self.dct_layer = dct_channel_block(512)
self.dct_norm = nn.LayerNorm([512], eps=1e-6)
  • dct_channel_block:DCT 模块用于对编码器的输出进行频域变换,但该部分目前被注释掉。
  • dct_norm:LayerNorm 层用于对经过 DCT 的特征进行标准化处理。

6. 前向传播函数

def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec,
            enc_self_mask=None, dec_self_mask=None, dec_enc_mask=None):

    enc_out = self.enc_embedding(x_enc, x_mark_enc)
    enc_out, attns = self.encoder(enc_out, attn_mask=enc_self_mask)
    # DCT 模块(可选)
    # mid  = self.dct_layer(enc_out)
    # enc_out = enc_out + mid
    # enc_out = self.dct_norm(enc_out)

    dec_out = self.dec_embedding(x_dec, x_mark_dec)
    dec_out = self.decoder(dec_out, enc_out, x_mask=dec_self_mask, cross_mask=dec_enc_mask)

    if self.output_attention:
        return dec_out[:, -self.pred_len:, :], attns
        return dec_out[:, -self.pred_len:, :]
  • 输入

    • x_encx_mark_enc 是编码器的输入和时间标记。
    • x_decx_mark_dec 是解码器的输入和时间标记。
    • enc_self_maskdec_self_maskdec_enc_mask 是可选的注意力掩码。
  • 编码器:通过 enc_embeddingencoder 模块处理编码器输入。

  • DCT:注释掉的 DCT 模块可以对编码器输出进行频域处理。

  • 解码器:通过 dec_embeddingdecoder 模块处理解码器输入,并结合编码器的输出进行预测。

  • 输出

    • 如果 output_attentionTrue,返回解码器的输出和注意力矩阵。
    • 否则,只返回解码器的最终预测。

7. 总结

  • 该模型是一个结合 Transformer 和 DCT 的时间序列预测模型。
  • Transformer 编码器-解码器架构使模型能够捕捉复杂的时间依赖关系。
  • DCT 模块作为可选部分,能够增强模型对频域特征的捕捉。
  • 模型支持多种嵌入方式,通过调整 configs.embed_type 来选择不同的嵌入策略。


import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torch
import torch_dct as dct

import numpy as np
import math
# class Model(nn.Module):SENET for ETTmx

#     """
#     Just one Linear layer
#     """
#     def __init__(self,configs,channel=7,ratio=1):
#         super(Model, self).__init__()

#         self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovation
#         self.fc = nn.Sequential(
#                 nn.Linear(7,14, bias=False),
#                 nn.Dropout(p=0.1),
#                 nn.ReLU(inplace=True) ,
#                 nn.Linear(14,7, bias=False),
#                 nn.Sigmoid()
#         )
#         self.seq_len = configs.seq_len
#         self.pred_len = configs.pred_len

#         self.Linear_More_1 = nn.Linear(self.seq_len,self.pred_len * 2)
#         self.Linear_More_2 = nn.Linear(self.pred_len*2,self.pred_len)
#         self.relu = nn.ReLU()
#         self.gelu = nn.GELU()    

#         self.drop = nn.Dropout(p=0.1)
#         # Use this line if you want to visualize the weights
#     def forward(self, x):
#         # x: [Batch, Input length, Channel]
#         x = x.permute(0,2,1) # (B,L,C)->(B,C,L)
#         b, c, l = x.size() # (B,C,L)
#         y = self.avg_pool(x).view(b, c) # (B,C,L) 
#         # np.save('f_weight.npy', f_weight_np)
# #         # np.save('%d f_weight.npy' %epoch, f_weight_np)
#         # print("y",y.shape)
#         # return (x * y).permute(0,2,1)
#         return (z).permute(0,2,1)

class my_Layernorm(nn.Module):
    Special designed layernorm for the seasonal part
    def __init__(self, channels):
        super(my_Layernorm, self).__init__()
        self.layernorm = nn.LayerNorm(channels)

    def forward(self, x):
        x_hat = self.layernorm(x)
        bias = torch.mean(x_hat, dim=1).unsqueeze(1).repeat(1, x.shape[1], 1)
        return x_hat - bias
class Model(nn.Module):
    def __init__(self,configs,channel=96,ratio=1):
        super(Model, self).__init__()
        # self.avg_pool = nn.AdaptiveAvgPool1d(1) #innovation
        self.seq_len = configs.seq_len
        self.pred_len = configs.pred_len
        self.channel_num = configs.enc_in
        self.fc = nn.Sequential(
                nn.Linear(channel, channel*2, bias=False),
                nn.Linear( channel*2, channel, bias=False),
        self.fc_inverse = nn.Sequential(
            nn.Linear(channel, channel//2, bias=False),
            nn.Linear( channel//2, channel, bias=False),
        # self.fc_plot = nn.Linear(channel, channel, bias=False)
        self.mid_Linear = nn.Linear(self.seq_len, self.seq_len)

        self.Linear = nn.Linear(self.seq_len, self.pred_len)
        self.Linear_1 = nn.Linear(self.seq_len, self.pred_len)
        # self.dct_norm = nn.LayerNorm([self.channel_num], eps=1e-6)
        self.dct_norm = nn.LayerNorm(self.seq_len, eps=1e-6)
        # self.my_layer_norm = nn.LayerNorm([96], eps=1e-6)
    def forward(self, x):
        x = x.permute(0,2,1) # (B,L,C)=》(B,C,L)#forL

        b, c, l = x.size() # (B,C,L)
        list = []
        for i in range(c):#i represent channel 
            freq=dct.dct(x[:,i,:])     #dct
            # print("freq-shape:",freq.shape)
        stack_dct = torch.tensor(stack_dct)#(B,L,C)
        stack_dct = self.dct_norm(stack_dct)#matters for traffic
        f_weight = self.fc(stack_dct)
        f_weight = self.dct_norm(f_weight)#matters for traffic

        #visualization for fecam tensor
        f_weight_cpu = f_weight
        f_weight_np = f_weight_cpu.cpu().detach().numpy()
        np.save('f_weight_weather_wf.npy', f_weight_np)
        # np.save('%d f_weight.npy' %epoch, f_weight_np)


        # f_weight = self.dct_norm(f_weight.permute(0,2,1))#matters for traffic
        # result = self.Linear(x)#forL
        # f_weight_np = result.cpu().detach().numpy()
        # np.save('f_weight.npy', f_weight_np)
        # x = x.permute(0,2,1)
        # result = self.Linear((x *(f_weight_inverse)))#forL 
        result = self.Linear((x *(f_weight)))#forL
        return  result.permute(0,2,1)

这段代码实现了一个时间序列预测模型,模型结合了 DCT(离散余弦变换)线性层 来对输入的序列进行处理,并输出预测结果。模型在数据处理中引入了频域变换,并对频域特征进行了加权处理。以下是详细的解释:

1. my_Layernorm

class my_Layernorm(nn.Module):
    Special designed layernorm for the seasonal part
    def __init__(self, channels):
        super(my_Layernorm, self).__init__()
        self.layernorm = nn.LayerNorm(channels)

    def forward(self, x):
        x_hat = self.layernorm(x)
        bias = torch.mean(x_hat, dim=1).unsqueeze(1).repeat(1, x.shape[1], 1)
        return x_hat - bias
  • 作用:实现了一种特殊的 LayerNorm 层,它对每个样本进行标准化,并减去每个样本的均值,目的是更好地处理序列中具有季节性或周期性的特征。

2. 模型 Model 初始化

class Model(nn.Module):
    def __init__(self, configs, channel=96, ratio=1):
        super(Model, self).__init__()
        self.seq_len = configs.seq_len
        self.pred_len = configs.pred_len
        self.channel_num = configs.enc_in
        self.fc = nn.Sequential(
            nn.Linear(channel, channel*2, bias=False),
            nn.Linear(channel*2, channel, bias=False),
        self.fc_inverse = nn.Sequential(
            nn.Linear(channel, channel//2, bias=False),
            nn.Linear(channel//2, channel, bias=False),
        self.mid_Linear = nn.Linear(self.seq_len, self.seq_len)
        self.Linear = nn.Linear(self.seq_len, self.pred_len)
        self.dct_norm = nn.LayerNorm(self.seq_len, eps=1e-6)
  • fcfc_inverse:两个线性层用于调整输入通道的权重。fc 是增大通道维度的网络,fc_inverse 是缩小通道维度的网络,主要用于特征的重整和处理。
  • mid_LinearLinear:分别用于序列长度的映射。mid_Linear 用于中间的特征处理,Linear 将输入的序列长度从 seq_len 映射到 pred_len
  • dct_norm:用于对经过 DCT 处理的频域特征进行标准化,以确保数据分布稳定,减少模型训练的波动。

3. 前向传播 forward

def forward(self, x):
    x = x.permute(0, 2, 1)  # 调整输入维度 (B, L, C) -> (B, C, L)

    b, c, l = x.size()  # 获取输入数据的维度 (Batch, Channel, Length)
    list = []

    # 对每个通道的输入数据进行 DCT
    for i in range(c):
        freq = dct.dct(x[:, i, :])  # 对第 i 个通道应用 DCT
    # 将各通道的 DCT 结果堆叠在一起
    stack_dct = torch.stack(list, dim=1)
    stack_dct = torch.tensor(stack_dct)

    # 对 DCT 结果进行 LayerNorm 归一化
    stack_dct = self.dct_norm(stack_dct)
    # 通过全连接层对 DCT 结果进行处理
    f_weight = self.fc(stack_dct)
    f_weight = self.dct_norm(f_weight)

    # 保存处理后的权重用于可视化
    f_weight_cpu = f_weight
    f_weight_np = f_weight_cpu.cpu().detach().numpy()
    np.save('f_weight_weather_wf.npy', f_weight_np)

    # 将原始输入和经过处理的 DCT 权重相乘,并通过线性层进行序列预测
    result = self.Linear(x * f_weight)  # 结合权重后的输入数据进行预测
    return result.permute(0, 2, 1)  # 调整输出维度回 (B, L, C)
  1. 维度调整:通过 permute 将输入的维度从 (B, L, C) 转换为 (B, C, L),方便后续进行 1D 操作。
  2. DCT 处理:对每个通道的数据应用 DCT(离散余弦变换),捕捉数据的频域特征。
  3. 堆叠 DCT 结果:将每个通道的 DCT 结果堆叠在一起,形成一个新的张量。
  4. 标准化:通过 LayerNorm 对 DCT 结果进行标准化,确保特征的稳定性。
  5. 全连接层处理:通过全连接层调整 DCT 结果的权重,提取出更高层次的频域特征。
  6. 可视化保存:将处理后的权重保存到 .npy 文件中,用于后续的可视化分析。
  7. 预测结果生成:将输入数据和处理后的权重相乘,然后通过线性层输出预测结果。
  8. 维度恢复:最后将输出的维度恢复为 (B, L, C),与输入数据的形状保持一致。

4. 总结

  • DCT 应用:该模型在每个通道上应用了 DCT,能够提取出输入数据中的频域特征,这对于具有周期性或频率特征的时间序列数据非常有效。
  • 全连接层权重调整:通过全连接层对 DCT 特征进行加权处理,有助于模型学习不同频率成分对预测的影响。
  • 标准化:使用 LayerNorm 保证频域特征在不同通道之间的平衡,避免某些通道的特征在模型中过度放大或缩小。

这个模型结合了 DCT 和神经网络的优势,通过频域特征处理提高了对复杂时间序列数据的预测能力,特别适合具有周期性变化的时间序列任务。



这段代码是一个基于命令行参数的实验框架,用于时间序列预测任务,使用了 Transformer 系列模型(如 Autoformer、Informer、Transformer)。通过命令行参数,可以灵活调整模型、数据集、超参数等配置,执行训练、测试和预测流程。





