当前位置: 首页 > article >正文

pytorch千问模型源码分析

class Qwen2Config(PretrainedConfig):
    model_type = "qwen2"
    # 表明在推理过程中,对于某些操作,模型或库会忽略 past_key_values 的存在。这对于控制序列生成的行为是非常有用的,
    # 特别是在需要初始化生成过程或格式化输出结果时。然而,实际应用中,past_key_values 经常用于加速连续生成过程,特
    # 别是在长时间依赖的场景下
    keys_to_ignore_at_inference = ["past_key_values"]
    def __init__(
        self,
        # 用途:用于初始化嵌入层(embedding layer),以及作为最终全连接层(fully connected layer)的输出维度。
        vocab_size=151936,# 词汇表的大小,即模型可以识别的不同单词或标记的数量。
        hidden_size=4096,# 含义:隐藏层的维度,即每个Transformer编码器或解码器层的输出向量的大小。
        # 决定了模型内部状态的表示能力
        intermediate_size=22016,# 前馈神经网络(feed-forward network, FFN)中间层的维度。
        # FFN通常由两个线性层组成,第一个线性层的输出维度为 intermediate_size,用于提升模型的学习能力。
        num_hidden_layers=32,# Transformer模型中编码器或解码器堆叠的层数。增加模型的深度,以增强其捕捉复杂特征的能力。
        num_attention_heads=32,# 含义:每个Transformer层中多头注意力机制(multi-head attention mechanism)的头数。
        # 允许多个并行的注意力机制运行,从而捕捉不同的特征。
        num_key_value_heads=32,# 每层中用于计算键(Key)和值(Value)的注意力头的数量
        # 优化计算资源,有时候为了节省计算成本,可以设置 num_key_value_heads 小于 num_attention_heads。
        hidden_act="silu",# 隐藏层使用的激活函数。引入非线性,使模型能够学习复杂的映射关系
        max_position_embeddings=32768,# 模型支持的最大位置嵌入的长度。决定了模型能够处理的最大序列长度。
        initializer_range=0.02,# 模型权重初始化的标准差范围。控制模型参数初始化时的随机性。
        rms_norm_eps=1e-6,# RMSNorm 层中使用的数值稳定性项。防止除法运算中的除零错误。
        use_cache=True,# 是否使用缓存机制来存储过去计算的结果。在生成任务中,可以加速推理过程
        tie_word_embeddings=False,# 是否共享输入嵌入层(input embedding)和输出嵌入层(output embedding)的权重。
        # 减少模型参数数量,有时可以提高模型性能。
        rope_theta=10000.0,# 旋转位置嵌入(Rotary Positional Embedding)中的超参数。
        # 帮助模型理解不同位置的相对关系。
        use_sliding_window=False,# 含义:是否使用滑动窗口机制。
        # 用途:在处理长序列时,可以减少内存消耗。
        sliding_window=4096, # 含义:滑动窗口的大小。定义了滑动窗口覆盖的序列长度。
        max_window_layers=28,# 含义:最多可以有多少层使用滑动窗口机制。
        # 用途:限制滑动窗口机制使用的层数,平衡计算效率和模型性能。
        attention_dropout=0.0,# 含义:注意力机制中的Dropout概率。随机丢弃一些注意力权重来防止过拟合。
        **kwargs,
    ):
        self.vocab_size = vocab_size
        self.max_position_embeddings = max_position_embeddings
        self.hidden_size = hidden_size
        self.intermediate_size = intermediate_size
        self.num_hidden_layers = num_hidden_layers
        self.num_attention_heads = num_attention_heads
        self.use_sliding_window = use_sliding_window
        self.sliding_window = sliding_window
        self.max_window_layers = max_window_layers

        # for backward compatibility
        if num_key_value_heads is None:
            num_key_value_heads = num_attention_heads

        self.num_key_value_heads = num_key_value_heads
        self.hidden_act = hidden_act
        self.initializer_range = initializer_range
        self.rms_norm_eps = rms_norm_eps
        self.use_cache = use_cache
        self.rope_theta = rope_theta
        self.attention_dropout = attention_dropout

        super().__init__(
            tie_word_embeddings=tie_word_embeddings,
            **kwargs,
        )
# 规范化技术,旨在替代传统的 Layer Normalization(LN)
# 核心思想是对输入张量的每个样本的每个特征进行规范化,使其均值为 0,方差为 1
class Qwen2RMSNorm(nn.Module):
    def __init__(self, hidden_size, eps=1e-6): # 隐藏层的大小
        super().__init__()
        # 一个可学习的权重参数,初始化为全 1 张量。
        self.weight = nn.Parameter(torch.ones(hidden_size))
        # 用于防止除零错误的小常数。
        self.variance_epsilon = eps
    def forward(self, hidden_states):
        # 记录输入张量的数据类型,以便最终转换回原始类型。
        input_dtype = hidden_states.dtype
        # 转换为 torch.float32 类型,以确保数值稳定性。
        hidden_states = hidden_states.to(torch.float32)
        # 计算每个样本的方差
        variance = hidden_states.pow(2).mean(-1, keepdim=True)
        # 计算每个样本的 RMS 值,并对每个样本进行规范化
        hidden_states = hidden_states * torch.rsqrt(variance + self.variance_epsilon)
        # 应用可学习的权重,其中 γγ 是一个可学习的参数,用于缩放规范化后的张量。
        return self.weight * hidden_states.to(input_dtype)
# 用于生成旋转位置嵌入。这种嵌入方法在 Transformer 模型中用于捕捉序列中的位置信息,尤其适用于长序列任务。
# 通过旋转的方式将位置信息编码到嵌入向量中。具体步骤如下:
# 生成频率:通过指数函数生成一系列频率值。计算正弦和余弦:利用生成的频率计算正弦和余弦值
# ,旋转嵌入:将输入向量按一定规则旋转,以嵌入位置信息。
class Qwen2RotaryEmbedding(nn.Module):
    def __init__(self, dim, max_position_embeddings=2048, base=10000, device=None):
        super().__init__()
        self.dim = dim
        # 最大位置嵌入的长度,默认为 2048,base:基数,默认为 10000。。
        self.max_position_embeddings = max_position_embeddings
        self.base = base
        # inv_freq:计算频率的逆值。
        # 位置列表先归一化(从绝对位置变成相对位置),之后取指数(1--接近10000),之后取倒数,位置从1--越来越小
        inv_freq = 1.0 / (self.base ** (torch.arange(0, self.dim, 2, dtype=torch.int64).float().to(device) / self.dim))
        # register_buffer:将 inv_freq 注册为缓冲区,以便在模型保存和加载时保持不变。
        # register_buffer 方法用于注册一个非训练的缓冲区(buffer),这意味着它不会被梯度更新。当你使用 register_buffer 注册一个缓
        # 冲区时,它会被保存在模型的状态字典(state dict)中,并且在模型保存和加载时也会被序列化。
        # persistent=True:缓冲区会出现在模型的状态字典中,并且会被序列化和加载。
        # persistent=False:缓冲区不会出现在模型的状态字典中,但在实际保存和加载时,仍然会被序列化并加载。
        self.register_buffer("inv_freq", inv_freq, persistent=False)
        # Build here to make `torch.jit.trace` work.生成正弦和余弦缓存
        self._set_cos_sin_cache(
            seq_len=max_position_embeddings, device=self.inv_freq.device, dtype=torch.get_default_dtype()
        )
    def _set_cos_sin_cache(self, seq_len, device, dtype):
        self.max_seq_len_cached = seq_len
        # t 是一个包含位置索引的张量,形状为 (seq_len,)。
        t = torch.arange(self.max_seq_len_cached, device=device, dtype=torch.int64).type_as(self.inv_freq)
        # torch.outer:计算外积,得到一个形状为 (seq_len, dim/2) 的张量
        freqs = torch.outer(t, self.inv_freq) # 计算频率。
        # Different from paper, but it uses a different permutation in order to obtain the same calculation
        # 拼接频率。emb 的形状为 (seq_len, dim)。
        # 在旋转位置嵌入(RoPE)中,我们通常将嵌入向量分为两个部分,并分别应用正弦和余弦变换。具体来说:
        # 对于每个位置 tt,计算频率 ff,得到一个形状为 (seq_len, dim/2) 的张量。
        # 将频率张量拼接两次,得到一个形状为 (seq_len, dim) 的张量。
        # 这样做的原因是,我们将嵌入向量分为两部分,每部分对应一个频率值。
        emb = torch.cat((freqs, freqs), dim=-1)
        # cos_cached 和 sin_cached:注册正弦和余弦缓存。
        self.register_buffer("cos_cached", emb.cos().to(dtype), persistent=False)
        self.register_buffer("sin_cached", emb.sin().to(dtype), persistent=False)
    def forward(self, x, seq_len=None): # x:输入张量。
        # x: [bs, num_attention_heads, seq_len, head_size]
        # 如果 seq_len 大于已缓存的最大长度,则重新生成缓存。
        if seq_len > self.max_seq_len_cached:
            self._set_cos_sin_cache(seq_len=seq_len, device=x.device, dtype=x.dtype)
        return ( # 返回正弦和余弦缓存的切片。
            self.cos_cached[:seq_len].to(dtype=x.dtype),
            self.sin_cached[:seq_len].to(dtype=x.dtype),
        )
class Qwen2MLP(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.hidden_size = config.hidden_size # d
        self.intermediate_size = config.intermediate_size # hd
        self.gate_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False) # d-->hd
        self.up_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False)# d-->hd
        self.down_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=False) # hd-->d
        self.act_fn = ACT2FN[config.hidden_act]
    def forward(self, hidden_state): # (h,s,d)
        # 门控信号生成:gate_proj(hidden_state) 生成门控信号
        # 特征调整:gate_output 与 up_output 相乘,将门控信号应用于特征表示。
        # 门控机制的作用:通过门控信号动态调整哪些特征应该通过哪些特征应该被抑制。
        # 激活函数的选择:如果 config.hidden_act 是 "sigmoid",那么激活函数将是 sigmoid
        return self.down_proj(self.act_fn(self.gate_proj(hidden_state)) * self.up_proj(hidden_state))
class Qwen2Attention(nn.Module):
    def __init__(self, config: Qwen2Config, layer_idx: Optional[int] = None):
        super().__init__() # 调用父类的初始化方法
        self.config = config # 配置类实例
        self.layer_idx = layer_idx # 层索引
        if layer_idx is None:
            logger.warning_once(
                f"Instantiating {self.__class__.__name__} without passing `layer_idx` is not recommended and will "
                "to errors during the forward call, if caching is used. Please make sure to provide a `layer_idx` "
                "when creating this class."
            )
        
        self.hidden_size = config.hidden_size # d
        self.num_heads = config.num_attention_heads # q_h
        self.head_dim = self.hidden_size // self.num_heads # dk
        self.num_key_value_heads = config.num_key_value_heads # kv_h
        self.num_key_value_groups = self.num_heads // self.num_key_value_heads # 比例
        self.max_position_embeddings = config.max_position_embeddings # p
        self.rope_theta = config.rope_theta # base
        self.is_causal = True # 是否用因果掩码
        self.attention_dropout = config.attention_dropout # dropout
        # 嵌入维度必须能被整除
        if (self.head_dim * self.num_heads) != self.hidden_size:
            raise ValueError(
                f"hidden_size must be divisible by num_heads (got `hidden_size`: {self.hidden_size}"
                f" and `num_heads`: {self.num_heads})."
            )
        # 线性投影
        self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=True)
        #需要注意的是这里的投影维度可能和q的投影维度不同
        self.k_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=True)
        self.v_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=True)
        # 最后一个线性转换层
        self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=False)
        # 旋转位置嵌入层
        self.rotary_emb = Qwen2RotaryEmbedding(
            self.head_dim, # dk
            max_position_embeddings=self.max_position_embeddings,# max_position
            base=self.rope_theta, # base
        )
    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,# 可选
        position_ids: Optional[torch.LongTensor] = None,# 可选
        past_key_value: Optional[Cache] = None, # 可选参数:缓存
        output_attentions: bool = False,# 是否输出注意力权重
        use_cache: bool = False, # 是否使用缓存
        cache_position: Optional[torch.LongTensor] = None, # 缓存位置
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]:
        bsz, q_len, _ = hidden_states.size() # b,s,d
        # 投影
        query_states = self.q_proj(hidden_states)
        key_states = self.k_proj(hidden_states)
        value_states = self.v_proj(hidden_states)
        # (b,q_len,q_h,dk)-->(b,q_h,q_len,dk),transpose:换轴(转置)
        query_states = query_states.view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)
        # (b,k_h,k_len,dk)
        key_states = key_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)
        value_states = value_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)
        kv_seq_len = key_states.shape[-2] # k_len
        # 缓存上个时间步的key,value表示
        if past_key_value is not None: # 如果设置了缓存
            if self.layer_idx is None: # 就必须有layer_idx,不然报错
                raise ValueError(
                    f"The cache structure has changed since version v4.36. If you are using {self.__class__.__name__} "
                    "for auto-regressive decoding with k/v caching, please make sure to initialize the attention class "
                    "with a layer index."
                )
            kv_seq_len += past_key_value.get_usable_length(kv_seq_len, self.layer_idx)
        # 旋转位置嵌入,传kv_len
        # 键/值序列长度:kv_seq_len 是键和值向量的长度,这是因为键和值向量代表的是相同的序列。
        # 查询序列长度:q_len 是查询向量的长度,这可能不同于键/值向量的长度。
        # 旋转位置嵌入:在计算旋转位置嵌入时,使用键/值序列长度是为了确保位置信息与键和值向量一致。
        cos, sin = self.rotary_emb(value_states, seq_len=kv_seq_len)
        # 返回带位置信息的嵌入表示
        query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin, position_ids)
        # 如果past_key_value is not None
        if past_key_value is not None:
            cache_kwargs = {"sin": sin, "cos": cos, "cache_position": cache_position}  # Specific to RoPE models
            # 更新当前的key,value表示
            key_states, value_states = past_key_value.update(key_states, value_states, self.layer_idx, cache_kwargs)
        # repeat k/v heads if n_kv_heads < n_heads
        # 如果键值头数量少于查询头数量,则重复键值头以匹配查询头数量。
        key_states = repeat_kv(key_states, self.num_key_value_groups)
        value_states = repeat_kv(value_states, self.num_key_value_groups)
        # (b,q_h,q_len,dk)@(b,k_h,dk,k_len)-->(b,h,q_len,k_len)
        attn_weights = torch.matmul(query_states, key_states.transpose(2, 3)) / math.sqrt(self.head_dim)
        if attn_weights.size()


http://www.kler.cn/a/329620.html

相关文章:

  • 搜维尔科技:Xsens人形机器人解决方案的优势
  • Typora + PowerShell 在终端打开文件
  • 快手极速版如何查找ip归属地?怎么关掉
  • mysql8.0 重要指标参数介绍
  • 使用FRP进行内网穿透
  • AI在SEO中的关键词优化策略探讨
  • leetcode.每日一题.2516.每种字符至少取 K 个
  • 【C++】C++基础
  • 魔都千丝冥缘——软件终端架构思维———未来之窗行业应用跨平台架构
  • D21【python接口自动化学习】-python基础之内置数据类型
  • Git记录
  • C语言:排序(1)
  • 毕业设计选题:基于ssm+vue+uniapp的家庭记账本小程序
  • 在线远程考试|基于springBoot的在线远程考试系统设计与实现(附项目源码+论文+数据库)
  • 【C++】“list”的介绍和常用接口的模拟实现
  • 进程通信——内存映射
  • Java项目实战II基于Java+Spring Boot+MySQL的智能物流管理系统(文档+源码+数据库)
  • [大语言模型-论文精读] 阿里巴巴-通过多阶段对比学习实现通用文本嵌入
  • 从0开始实现es6 promise类
  • 【可答疑】基于51单片机的体温心率血氧检测系统(含仿真、代码、报告等)
  • I2C-Tools的安装与使用方法(详解,一篇教会你熟练使用)
  • 数据库索引和磁盘的关系大揭秘
  • Leetcode 3307. Find the K-th Character in String Game II
  • 无线通信系统仿真与原型设计:MATLAB实践指南
  • LDRA Testbed(TBrun)软件集成测试(部件测试)_操作指南
  • postgresql僵尸进程的处理思路