当前位置: 首页 > article >正文

MOEFeedForward 模块

代码

class FeedForward(nn.Module):
    def __init__(self, config: LMConfig):
        super().__init__()
        if config.hidden_dim is None:
            hidden_dim = 4 * config.dim
            hidden_dim = int(2 * hidden_dim / 3)
            config.hidden_dim = config.multiple_of * ((hidden_dim + config.multiple_of - 1) // config.multiple_of)
        self.w1 = nn.Linear(config.dim, config.hidden_dim, bias=False)
        self.w2 = nn.Linear(config.hidden_dim, config.dim, bias=False)
        self.w3 = nn.Linear(config.dim, config.hidden_dim, bias=False)
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, x):
        return self.dropout(self.w2(F.silu(self.w1(x)) * self.w3(x)))


class MoEGate(nn.Module):
    def __init__(self, config: LMConfig):
        super().__init__()
        self.config = config
        self.top_k = config.num_experts_per_tok
        self.n_routed_experts = config.n_routed_experts

        self.scoring_func = config.scoring_func
        self.alpha = config.aux_loss_alpha
        self.seq_aux = config.seq_aux

        self.norm_topk_prob = config.norm_topk_prob
        self.gating_dim = config.dim
        self.weight = nn.Parameter(torch.empty((self.n_routed_experts, self.gating_dim)))
        self.reset_parameters()

    def reset_parameters(self) -> None:
        import torch.nn.init as init
        init.kaiming_uniform_(self.weight, a=math.sqrt(5))

    def forward(self, hidden_states):
        bsz, seq_len, h = hidden_states.shape
        hidden_states = hidden_states.view(-1, h)
        logits = F.linear(hidden_states, self.weight, None)
        if self.scoring_func == 'softmax':
            scores = logits.softmax(dim=-1)
        else:
            raise NotImplementedError(f'insupportable scoring function for MoE gating: {self.scoring_func}')

        topk_weight, topk_idx = torch.topk(scores, k=self.top_k, dim=-1, sorted=False)

        if self.top_k > 1 and self.norm_topk_prob:
            denominator = topk_weight.sum(dim=-1, keepdim=True) + 1e-20
            topk_weight = topk_weight / denominator

        if self.training and self.alpha > 0.0:
            scores_for_aux = scores
            aux_topk = self.top_k
            topk_idx_for_aux_loss = topk_idx.view(bsz, -1)
            if self.seq_aux:
                scores_for_seq_aux = scores_for_aux.view(bsz, seq_len, -1)
                ce = torch.zeros(bsz, self.n_routed_experts, device=hidden_states.device)
                ce.scatter_add_(1, topk_idx_for_aux_loss,
                                torch.ones(bsz, seq_len * aux_topk, device=hidden_states.device)).div_(
                    seq_len * aux_topk / self.n_routed_experts)
                aux_loss = (ce * scores_for_seq_aux.mean(dim=1)).sum(dim=1).mean() * self.alpha
            else:
                mask_ce = F.one_hot(topk_idx_for_aux_loss.view(-1), num_classes=self.n_routed_experts)
                ce = mask_ce.float().mean(0)
                Pi = scores_for_aux.mean(0)
                fi = ce * self.n_routed_experts
                aux_loss = (Pi * fi).sum() * self.alpha
        else:
            aux_loss = 0
        return topk_idx, topk_weight, aux_loss


class MOEFeedForward(nn.Module):
    def __init__(self, config: LMConfig):
        super().__init__()
        self.config = config
        self.experts = nn.ModuleList([
            FeedForward(config)
            for _ in range(config.n_routed_experts)
        ])
        self.gate = MoEGate(config)
        if config.n_shared_experts is not None:
            self.shared_experts = FeedForward(config)

    def forward(self, x):
        identity = x
        orig_shape = x.shape
        bsz, seq_len, _ = x.shape
        # 使用门控机制选择专家
        topk_idx, topk_weight, aux_loss = self.gate(x)
        x = x.view(-1, x.shape[-1])
        flat_topk_idx = topk_idx.view(-1)
        if self.training:
            # 训练模式下,重复输入数据
            x = x.repeat_interleave(self.config.num_experts_per_tok, dim=0)
            y = torch.empty_like(x, dtype=torch.float16)
            for i, expert in enumerate(self.experts):
                y[flat_topk_idx == i] = expert(x[flat_topk_idx == i]).to(y.dtype)  # 确保类型一致
            y = (y.view(*topk_weight.shape, -1) * topk_weight.unsqueeze(-1)).sum(dim=1)
            y = y.view(*orig_shape)
        else:
            # 推理模式下,只选择最优专家
            y = self.moe_infer(x, flat_topk_idx, topk_weight.view(-1, 1)).view(*orig_shape)
        if self.config.n_shared_experts is not None:
            y = y + self.shared_experts(identity)
        self.aux_loss = aux_loss
        return y

    @torch.no_grad()
    def moe_infer(self, x, flat_expert_indices, flat_expert_weights):
        expert_cache = torch.zeros_like(x)
        idxs = flat_expert_indices.argsort()
        tokens_per_expert = flat_expert_indices.bincount().cpu().numpy().cumsum(0)
        token_idxs = idxs // self.config.num_experts_per_tok
        # 例如当tokens_per_expert=[6, 15, 20, 26, 33, 38, 46, 52]
        # 当token_idxs=[3, 7, 19, 21, 24, 25,  4,  5,  6, 10, 11, 12...]
        # 意味着当token_idxs[:6] -> [3,  7, 19, 21, 24, 25,  4]位置的token都由专家0处理,token_idxs[6:15]位置的token都由专家1处理......
        for i, end_idx in enumerate(tokens_per_expert):
            start_idx = 0 if i == 0 else tokens_per_expert[i - 1]
            if start_idx == end_idx:
                continue
            expert = self.experts[i]
            exp_token_idx = token_idxs[start_idx:end_idx]
            expert_tokens = x[exp_token_idx]
            expert_out = expert(expert_tokens).to(expert_cache.dtype)
            expert_out.mul_(flat_expert_weights[idxs[start_idx:end_idx]])
            # 使用 scatter_add_ 进行 sum 操作
            expert_cache.scatter_add_(0, exp_token_idx.view(-1, 1).repeat(1, x.shape[-1]), expert_out)

        return expert_cache

代码解释

解释一下这段代码的主要组成部分:

  1. FeedForward 类:
  • 实现了一个基础的前馈网络
  • 使用 SwiGLU 激活函数(F.silu(self.w1(x)) * self.w3(x)
  • 包含三个线性层(w1、w2、w3)和一个 dropout 层
  1. MoEGate 类(门控机制):
  • 负责决定每个 token 应该由哪些专家处理
  • 主要步骤:
    1. 计算每个 token 对应每个专家的分数(使用 softmax)
    2. 选择 top-k 个最高分的专家
    3. 计算辅助损失(aux_loss)来平衡专家的使用
  1. MOEFeedForward 类(混合专家系统):
  • 包含多个专家(FeedForward)和一个门控网络(MoEGate)

  • 训练模式:

    1. 使用门控网络选择每个 token 的专家
    2. 将输入数据复制多份,分发给不同专家
    3. 专家并行处理数据
    4. 根据门控权重合并结果
  • 推理模式(moe_infer):

    1. 对专家索引排序,将相同专家的 token 批量处理
    2. 使用 scatter_add_ 将专家输出累加到正确位置
    3. 更高效的推理实现,避免了数据重复
  1. 特殊功能:
  • 支持共享专家(n_shared_experts
  • 实现了专家负载均衡(通过辅助损失)
  • 支持每个 token 选择多个专家(num_experts_per_tok

这是一个典型的 MoE(Mixture of Experts)实现,用于大型语言模型中提高模型容量和计算效率。

示例

# 创建 MoE 实例
dim = 512                    # 输入维度
n_routed_experts = 4         # 专家数量
num_experts_per_tok = 2      # 每个token选择的专家数量

moe = MOEFeedForward(
    dim=dim,
    n_routed_experts=n_routed_experts,
    num_experts_per_tok=num_experts_per_tok,
    hidden_dim=None,         # FFN隐藏层维度,None时自动计算
    dropout=0.1             # dropout比率
)

# 创建示例输入
batch_size = 2
seq_len = 10
x = torch.randn(batch_size, seq_len, dim)  # 形状: [2, 10, 512]

moe(x)

输出

After gate - topk_idx.shape: torch.Size([20, 2]), topk_weight.shape: torch.Size([20, 2])
After view - x.shape: torch.Size([20, 512]), flat_topk_idx.shape: torch.Size([40])
After repeat_interleave - x.shape: torch.Size([40, 512])
Empty y tensor shape: torch.Size([40, 512])
Expert 0 - input shape: torch.Size([9, 512])
Expert 0 - output shape: torch.Size([9, 512])
Expert 1 - input shape: torch.Size([13, 512])
Expert 1 - output shape: torch.Size([13, 512])
Expert 2 - input shape: torch.Size([11, 512])
Expert 2 - output shape: torch.Size([11, 512])
Expert 3 - input shape: torch.Size([7, 512])
Expert 3 - output shape: torch.Size([7, 512])
Before view - y.shape: torch.Size([40, 512])
topk_weight.shape: torch.Size([20, 2])
After view and sum - y.shape: torch.Size([20, 512])
Final y.shape: torch.Size([2, 10, 512])

相应的torch函数

import torch
# empty: 创建未初始化的张量
x = torch.empty((2, 3))  # 创建形状为 2x3 的未初始化张量

# zeros_like: 创建与输入相同形状的全零张量
a = torch.tensor([[1, 2], [3, 4]])
b = torch.zeros_like(a)  # 创建形状为 2x2 的全零张量
print(b)  # tensor([[0, 0], [0, 0]])
tensor([[0, 0],
        [0, 0]])
import torch.nn.functional as F
x = torch.tensor([[1, 2, 3, 4], [5, 6, 7, 8]])
# view: 改变张量形状
y = x.view(-1)  # 展平为一维
print(y)  # tensor([1, 2, 3, 4, 5, 6, 7, 8])

# -1 表示自动计算该维度大小
z = x.view(-1, 2)  # 重塑为 4x2
print(z)  # tensor([[1, 2], [3, 4], [5, 6], [7, 8]])
tensor([1, 2, 3, 4, 5, 6, 7, 8])
tensor([[1, 2],
        [3, 4],
        [5, 6],
        [7, 8]])
# linear: 线性变换 y = xA^T + b
input = torch.randn(2, 3)  # 2个样本,每个3维
weight = torch.randn(4, 3)  # 输出4维
output = F.linear(input, weight)  # 形状变为 [2, 4]

# softmax: 将数值转换为概率分布
logits = torch.tensor([1.0, 2.0, 3.0])
probs = F.softmax(logits, dim=0)
print(probs)  # tensor([0.0900, 0.2447, 0.6652])
tensor([0.0900, 0.2447, 0.6652])
# 找出最大的k个值及其索引
x = torch.tensor([1, 5, 2, 8, 3])
values, indices = torch.topk(x, k=2)
print(values)   # tensor([8, 5])
print(indices)  # tensor([3, 1])
tensor([8, 5])
tensor([3, 1])
x = torch.tensor([1, 2, 3])
# 每个元素重复2次
y = x.repeat_interleave(2)
print(y)  # tensor([1, 1, 2, 2, 3, 3])
tensor([1, 1, 2, 2, 3, 3])
# 统计每个数字出现的次数
x = torch.tensor([1, 1, 2, 3, 1, 2])
counts = x.bincount()
print(counts)  # tensor([0, 3, 2, 1])  # 0出现0次,1出现3次,2出现2次,3出现1次
tensor([0, 3, 2, 1])
# 在指定位置累加值
src = torch.tensor([[1, 2], [3, 4]], dtype=torch.float)  # 指定数据类型为 float
index = torch.tensor([[0, 1], [0, 1]])
out = torch.zeros(2, 2, dtype=torch.float)  # 确保与 src 的数据类型相同
out.scatter_add_(0, index, src)
print(out) 
tensor([[4., 0.],
        [0., 6.]])
# 返回排序后的索引
x = torch.tensor([3, 1, 4, 1, 5])
indices = x.argsort()
print(indices)  # tensor([1, 3, 0, 2, 4])  # 最小值在位置1和3,然后是0,2,4
tensor([1, 3, 0, 2, 4])

http://www.kler.cn/a/584506.html

相关文章:

  • DeepSeek模型本地化部署方案及Python实现
  • ArcGIS Pro 车牌分区数据处理与地图制作全攻略
  • CMOS电平标准详解
  • JAVA面试_进阶部分_Ibatis与Hibernate的区别
  • STM32第一天建立工程
  • 医疗APP开发如何实现跨机构数据互通
  • Html5记忆翻牌游戏开发经验分享
  • c++介绍函数指针 十
  • 智享三代实景无人直播:实景呈现+智能互动,以科技之力稳抓流量与商机
  • Oracle中In和Exists区别分析
  • 56.HarmonyOS NEXT 登录模块开发教程(十):总结与展望
  • Redis中常见的问题
  • Chrome 扩展开发API实战:Runtime(八)
  • 自然语言处理:文本聚类
  • 【笔记一下】RAG 专题基础学习
  • TightVNC服务端安装与配置:Windows远程桌面的内网穿透解决方案
  • 【亲测有用】数据集成平台能力演示(支持国产数据库DaMeng与KingBase)
  • OpenAI智能体初探:使用 OpenAI Responses API 在 PDF 中实现检索增强生成(RAG)
  • 芯片研发不需要PPT
  • 【Axure视频教程】中继器表格——控制开关按钮