当前位置: 首页 > article >正文

sglang框架源码笔记

文章目录

    • 整体架构
      • 1. **客户端(Client)**:
      • 2. **服务器端(Server)**:
      • 3. **调度器与模型工作节点(Scheduler & Model Worker)**:
    • TpModelWorker类
    • ModelRunner类
    • TpModelWorkerClient类
    • Scheduler类
      • `Scheduler` 类及相关函数概述
        • 1. **`Scheduler` 类简介**
        • 2. **主要职责与功能**
        • 3. **构造函数 (`__init__`)**
        • 4. **调度循环 (`event_loop_normal`, `event_loop_overlap`)**
        • 5. **批次相关函数**
        • 6. **缓存和内存管理**
        • 7. **请求处理**
        • 8. **日志与统计**
        • 9. **权重更新与会话管理**
        • 10. **投机解码和混合式预填充**
      • 总结
    • DataParallelController类
      • `DataParallelController` 类及相关函数概述
        • 1. **`DataParallelController` 类简介**
        • 2. **主要职责与功能**
        • 3. **构造函数 (`__init__`)**
        • 4. **工作者启动函数**
        • 5. **负载均衡调度方法**
        • 6. **事件循环与请求调度**
        • 7. **辅助函数**
        • 8. **异常处理**
      • 总结
    • TokenizerManager 类
      • 主要功能组件:
      • 辅助功能:
      • 作用:
      • 总结:
    • DetokenizerManager 类
      • `DetokenizerManager` 类及其相关函数的实现总结:
        • 1. **`__init__` 方法**
        • 2. **`trim_matched_stop` 方法**
        • 3. **`event_loop` 方法**
        • 4. **`handle_batch_embedding_out` 方法**
        • 5. **`handle_batch_token_id_out` 方法**
        • 6. **`LimitedCapacityDict` 类**
        • 7. **`run_detokenizer_process` 函数**
      • 总结:

整体架构

在这里插入图片描述
这张图片展示了 SGLang 系统的架构概览,主要由 客户端服务器端 两大部分组成,展示了其各个组件之间的交互。以下是各部分的详细解释:

1. 客户端(Client)

  • Native generation API:用于本地生成模型输出的 API,允许用户直接与模型进行交互,获取生成的文本或其他输出。
  • OpenAI-compatible API:兼容 OpenAI API 的接口,允许用户使用与 OpenAI 模型相同的接口调用模型,这使得与其他 OpenAI 模型的兼容性更高。
  • Structured language frontend:这是一个结构化语言前端,可能用于输入特定格式的请求,支持更复杂或定制化的输入方式,如通过结构化语言定义输入数据格式。

2. 服务器端(Server)

  • API server:处理客户端请求的服务器,接收来自客户端的请求并协调后续的处理。
  • Tokenizer:负责对输入数据进行标记化(tokenization),将原始文本转化为模型能够理解的 token。模型推理通常需要标记化后的数据。
  • Detokenizer:与 Tokenizer 对应,负责将模型生成的 tokens 转换回原始文本(反标记化)。

3. 调度器与模型工作节点(Scheduler & Model Worker)

  • KV Cache memory pool:关键值(KV)缓存内存池,每个 token 都占用一个内存页面。缓存用于加速模型计算,存储中间结果或其他必要的数据。
  • Radix tree cache:可能是用来优化数据访问速度的一种树形缓存结构,特别适合快速查找和更新。
  • Finite state machine analyzer:有限状态机分析器,用于分析和控制模型推理过程中可能的状态变化,确保推理过程的正确性和效率。
  • Attention backend (FlashInfer and Triton):注意力机制的后端,使用 FlashInferTriton 进行加速计算。FlashInfer 和 Triton 可能是针对 GPU 和大规模推理的加速工具,可以显著提高推理性能。

TpModelWorker类

import logging
import threading
from typing import Optional

# 导入配置文件和工具函数
from sglang.srt.configs.model_config import ModelConfig
from sglang.srt.hf_transformers_utils import get_processor, get_tokenizer
from sglang.srt.managers.io_struct import (
    GetWeightsByNameReqInput,
    InitWeightsUpdateGroupReqInput,
    UpdateWeightFromDiskReqInput,
    UpdateWeightsFromDistributedReqInput,
    UpdateWeightsFromTensorReqInput,
)
from sglang.srt.managers.schedule_batch import ModelWorkerBatch, global_server_args_dict
from sglang.srt.model_executor.forward_batch_info import ForwardBatch
from sglang.srt.model_executor.model_runner import ModelRunner
from sglang.srt.server_args import ServerArgs
from sglang.srt.utils import MultiprocessingSerializer, broadcast_pyobj, set_random_seed

# 设置日志记录器
logger = logging.getLogger(__name__)

class TpModelWorker:
    """A tensor parallel model worker (一个张量并行模型工作者)"""

    def __init__(
        self,
        server_args: ServerArgs,
        gpu_id: int,
        tp_rank: int,
        dp_rank: Optional[int],
        nccl_port: int,
        is_draft_worker: bool = False,
    ):
        # 解析传入的参数,初始化张量并行工作者
        self.tp_rank = tp_rank

        # 初始化模型配置对象,用于加载模型配置
        self.model_config = ModelConfig(
            (
                server_args.model_path
                if not is_draft_worker
                else server_args.speculative_draft_model_path
            ),
            trust_remote_code=server_args.trust_remote_code,
            revision=server_args.revision,
            context_length=server_args.context_length,
            model_override_args=server_args.json_model_override_args,
            is_embedding=server_args.is_embedding,
            dtype=server_args.dtype,
            quantization=server_args.quantization,
        )
        
        # 初始化模型执行器 (ModelRunner),负责实际的推理过程
        self.model_runner = ModelRunner(
            model_config=self.model_config,
            mem_fraction_static=server_args.mem_fraction_static,
            gpu_id=gpu_id,
            tp_rank=tp_rank,
            tp_size=server_args.tp_size,
            nccl_port=nccl_port,
            server_args=server_args,
            is_draft_worker=is_draft_worker,
        )
        
        # 如果需要跳过tokenizer初始化,则不初始化tokenizer和processor
        if server_args.skip_tokenizer_init:
            self.tokenizer = self.processor = None
        else:
            # 如果模型是多模态的,使用处理器和tokenizer
            if self.model_config.is_multimodal:
                self.processor = get_processor(
                    server_args.tokenizer_path,
                    tokenizer_mode=server_args.tokenizer_mode,
                    trust_remote_code=server_args.trust_remote_code,
                    revision=server_args.revision,
                )
                self.tokenizer = self.processor.tokenizer
            else:
                # 否则,单独初始化tokenizer
                self.tokenizer = get_tokenizer(
                    server_args.tokenizer_path,
                    tokenizer_mode=server_args.tokenizer_mode,
                    trust_remote_code=server_args.trust_remote_code,
                    revision=server_args.revision,
                )
        # 获取模型运行设备(GPU)
        self.device = self.model_runner.device

        # 配置与内存相关的参数
        self.max_total_num_tokens = self.model_runner.max_total_num_tokens
        self.max_prefill_tokens = server_args.max_prefill_tokens
        self.max_running_requests = min(
            (
                self.max_total_num_tokens // 2
                if server_args.max_running_requests is None
                else server_args.max_running_requests
                // (server_args.dp_size if server_args.enable_dp_attention else 1)
            ),
            self.model_runner.req_to_token_pool.size,
        )
        self.max_req_len = min(
            self.model_config.context_len - 1,
            self.max_total_num_tokens - 1,
        )
        self.max_req_input_len = self.max_req_len - 5
        # 确保内存池足够大
        assert (
            self.max_req_len > 0 and self.max_req_input_len > 0
        ), "Memory pool size is too small"

        # 在张量并行工作者之间同步随机种子
        self.random_seed = broadcast_pyobj(
            [server_args.random_seed],
            self.tp_rank,
            self.model_runner.tp_group.cpu_group,
        )[0]
        # 设置随机种子,确保可重复性
        set_random_seed(self.random_seed)

    def get_worker_info(self):
        """返回与工作者相关的信息,如最大token数量、最大请求长度、随机种子等"""
        return (
            self.max_total_num_tokens,
            self.max_prefill_tokens,
            self.max_running_requests,
            self.max_req_len,
            self.max_req_input_len,
            self.random_seed,
            self.device,
            global_server_args_dict,
            self.model_runner.req_to_token_pool.size,
            self.model_runner.req_to_token_pool.max_context_len,
            self.model_runner.token_to_kv_pool.size,
        )

    def get_pad_input_ids_func(self):
        """返回用于填充输入ID的函数(如果模型有提供的话)"""
        return getattr(self.model_runner.model, "pad_input_ids", None)

    def get_tp_cpu_group(self):
        """返回张量并行的CPU组,用于协调并行工作"""
        return self.model_runner.tp_group.cpu_group

    def get_attention_tp_cpu_group(self):
        """返回注意力机制相关的张量并行CPU组"""
        return self.model_runner.attention_tp_group.cpu_group

    def get_memory_pool(self):
        """返回工作者的内存池,用于存储tokens和key-value对"""
        return (
            self.model_runner.req_to_token_pool,
            self.model_runner.token_to_kv_pool,
        )

    def forward_batch_generation(
        self,
        model_worker_batch: ModelWorkerBatch,
        launch_done: Optional[threading.Event] = None,
        skip_sample: bool = False,
    ):
        """执行一批生成任务的前向传播,并根据需要采样token"""
        # 初始化前向批次
        forward_batch = ForwardBatch.init_new(model_worker_batch, self.model_runner)
        # 执行前向传播
        logits_output = self.model_runner.forward(forward_batch)
        
        # 如果提供了launch_done事件,标记为完成
        if launch_done:
            launch_done.set()

        # 如果skip_sample为True,则跳过token采样
        if skip_sample:
            next_token_ids = None
        else:
            next_token_ids = self.model_runner.sample(logits_output, model_worker_batch)

        return logits_output, next_token_ids

    def forward_batch_embedding(self, model_worker_batch: ModelWorkerBatch):
        """执行一批embedding任务的前向传播,返回嵌入向量"""
        forward_batch = ForwardBatch.init_new(model_worker_batch, self.model_runner)
        logits_output = self.model_runner.forward(forward_batch)
        embeddings = logits_output.embeddings
        return embeddings

    def update_weights_from_disk(self, recv_req: UpdateWeightFromDiskReqInput):
        """从磁盘更新权重"""
        success, message = self.model_runner.update_weights_from_disk(
            recv_req.model_path, recv_req.load_format
        )
        return success, message

    def init_weights_update_group(self, recv_req: InitWeightsUpdateGroupReqInput):
        """初始化权重更新的分布式组"""
        success, message = self.model_runner.init_weights_update_group(
            recv_req.master_address,
            recv_req.master_port,
            recv_req.rank_offset,
            recv_req.world_size,
            recv_req.group_name,
            recv_req.backend,
        )
        return success, message

    def update_weights_from_distributed(
        self, recv_req: UpdateWeightsFromDistributedReqInput
    ):
        """从分布式源更新权重"""
        success, message = self.model_runner.update_weights_from_distributed(
            recv_req.name, recv_req.dtype, recv_req.shape
        )
        return success, message

    def update_weights_from_tensor(self, recv_req: UpdateWeightsFromTensorReqInput):
        """从张量更新权重"""
        success, message = self.model_runner.update_weights_from_tensor(
            MultiprocessingSerializer.deserialize(recv_req.serialized_named_tensors)
        )
        return success, message

    def get_weights_by_name(self, recv_req: GetWeightsByNameReqInput):
        """根据权重名称获取对应的权重参数"""
        parameter = self.model_runner.get_weights_by_name(
            recv_req.name, recv_req.truncate_size
        )
        return parameter

ModelRunner类

import gc
import json
import logging
import time
from typing import List, Optional, Tuple

import torch
import torch.distributed as dist

from sglang.srt.configs.device_config import DeviceConfig
from sglang.srt.configs.load_config import LoadConfig
from sglang.srt.configs.model_config import AttentionArch, ModelConfig
from sglang.srt.distributed import (
    get_tp_group,
    init_distributed_environment,
    initialize_model_parallel,
    set_custom_all_reduce,
)
from sglang.srt.distributed.parallel_state import monkey_patch_vllm_parallel_state
from sglang.srt.layers.attention.double_sparsity_backend import DoubleSparseAttnBackend
from sglang.srt.layers.attention.flashinfer_backend import FlashInferAttnBackend
from sglang.srt.layers.attention.flashinfer_mla_backend import FlashInferMLAAttnBackend
from sglang.srt.layers.attention.torch_native_backend import TorchNativeAttnBackend
from sglang.srt.layers.attention.triton_backend import TritonAttnBackend
from sglang.srt.layers.dp_attention import (
    get_attention_tp_group,
    get_attention_tp_size,
    initialize_dp_attention,
)
from sglang.srt.layers.logits_processor import LogitsProcessorOutput
from sglang.srt.layers.sampler import Sampler
from sglang.srt.layers.torchao_utils import apply_torchao_config_to_model
from sglang.srt.lora.lora_manager import LoRAManager
from sglang.srt.managers.schedule_batch import global_server_args_dict
from sglang.srt.mem_cache.memory_pool import (
    DoubleSparseTokenToKVPool,
    MHATokenToKVPool,
    MLATokenToKVPool,
    ReqToTokenPool,
)
from sglang.srt.model_executor.cuda_graph_runner import CudaGraphRunner
from sglang.srt.model_executor.forward_batch_info import ForwardBatch
from sglang.srt.model_loader import get_model
from sglang.srt.server_args import ServerArgs
from sglang.srt.speculative.spec_info import SpeculativeAlgorithm
from sglang.srt.torch_memory_saver_adapter import TorchMemorySaverAdapter
from sglang.srt.utils import (
    enable_show_time_cost,
    get_available_gpu_memory,
    init_custom_process_group,
    is_cuda,
    is_hip,
    monkey_patch_p2p_access_check,
    monkey_patch_vllm_gguf_config,
    set_cpu_offload_max_bytes,
    set_cuda_arch,
)

logger = logging.getLogger(__name__)

class ModelRunner:
    """ModelRunner运行模型的前向传播过程,负责推理的核心工作。"""

    def __init__(
        self,
        model_config: ModelConfig,
        mem_fraction_static: float,
        gpu_id: int,
        tp_rank: int,
        tp_size: int,
        nccl_port: int,
        server_args: ServerArgs,
        is_draft_worker: bool = False,
    ):
        # 解析传入的配置参数
        self.model_config = model_config
        self.mem_fraction_static = mem_fraction_static
        self.device = server_args.device
        self.gpu_id = gpu_id
        self.tp_rank = tp_rank
        self.tp_size = tp_size
        self.dist_port = nccl_port
        self.server_args = server_args
        self.is_draft_worker = is_draft_worker
        self.is_generation = model_config.is_generation  # 是否为生成模型
        self.is_multimodal = model_config.is_multimodal  # 是否为多模态模型
        self.should_log = tp_rank == 0  # 只有rank为0的节点进行日志记录
        self.spec_algorithm = SpeculativeAlgorithm.from_string(
            server_args.speculative_algorithm
        )

        # 针对特定模型的调整
        if (
            self.model_config.attention_arch == AttentionArch.MLA
            and not self.server_args.disable_mla
        ):
            # MLA优化,仅适用于非CPU设备
            if self.server_args.device != "cpu":
                if server_args.enable_flashinfer_mla:
                    logger.info(
                        "MLA优化已开启,使用flashinfer mla后端。"
                    )
                    self.server_args.attention_backend = "flashinfer_mla"
                else:
                    logger.info("MLA优化已开启,使用triton后端。")
                    self.server_args.attention_backend = "triton"

        if self.server_args.enable_double_sparsity:
            # 启用双重稀疏优化,使用triton后端
            logger.info(
                "双重稀疏优化已开启,使用triton后端,不启用CUDA图。"
            )
            self.server_args.attention_backend = "triton"
            self.server_args.disable_cuda_graph = True
            if self.server_args.ds_heavy_channel_type is None:
                raise ValueError(
                    "请指定用于双重稀疏优化的重通道类型。"
                )
            self.init_double_sparsity_channel_config(
                self.server_args.ds_heavy_channel_type
            )

        if self.is_multimodal:
            self.mem_fraction_static *= 0.95
            logger.info(
                f"由于这是一个多模态模型,自动将--mem-fraction-static减少为 {self.mem_fraction_static:.3f} "
            )
            
            # 特定架构的调整
            if self.model_config.hf_config.architectures == [
                "MllamaForConditionalGeneration"
            ]:
                logger.info("为mllama模型自动关闭--chunked-prefill-size。")
                server_args.chunked_prefill_size = -1

            if self.model_config.hf_config.architectures == [
                "Qwen2VLForConditionalGeneration"
            ]:
                # TODO: qwen2-vl暂时不支持基数缓存,自动禁用radix缓存
                logger.info(
                    "为qwen2-vl自动关闭--chunked-prefill-size并禁用基数缓存。"
                )
                server_args.chunked_prefill_size = -1
                server_args.disable_radix_cache = True

        # 全局变量更新
        if server_args.show_time_cost:
            enable_show_time_cost()
        if server_args.disable_outlines_disk_cache:
            from outlines.caching import disable_cache

            disable_cache()

        global_server_args_dict.update(
            {
                "attention_backend": server_args.attention_backend,
                "sampling_backend": server_args.sampling_backend,
                "triton_attention_reduce_in_fp32": server_args.triton_attention_reduce_in_fp32,
                "disable_mla": server_args.disable_mla,
                "torchao_config": server_args.torchao_config,
                "enable_nan_detection": server_args.enable_nan_detection,
                "enable_dp_attention": server_args.enable_dp_attention,
                "enable_ep_moe": server_args.enable_ep_moe,
                "device": server_args.device,
                "enable_flashinfer_mla": server_args.enable_flashinfer_mla,
                "disable_radix_cache": server_args.disable_radix_cache,
            }
        )

        set_cpu_offload_max_bytes(int(server_args.cpu_offload_gb * 1024**3))

        # 获取加载前的内存
        min_per_gpu_memory = self.init_torch_distributed()

        self.memory_saver_adapter = TorchMemorySaverAdapter.create(
            enable=self.server_args.enable_memory_saver
        )

        # 加载模型
        self.sampler = Sampler()
        self.load_model()

        # 应用torchao量化
        torchao_applied = getattr(self.model, "torchao_applied", False)
        # 在分层加载中,torchao可能已经被应用
        if not torchao_applied:
            apply_torchao_config_to_model(
                self.model, global_server_args_dict["torchao_config"]
            )

        # 应用torch TP(如果模型支持)
        supports_torch_tp = getattr(self.model, "supports_torch_tp", False)
        if self.tp_size > 1 and supports_torch_tp:
            self.apply_torch_tp()
            self.torch_tp_applied = True
        else:
            self.torch_tp_applied = False

        # 初始化内存池和注意力后端
        if server_args.lora_paths is not None:
            self.init_lora_manager()
        self.init_memory_pool(
            min_per_gpu_memory,
            server_args.max_running_requests,
            server_args.max_total_tokens,
        )
        if self.device == "cuda":
            self.init_cublas()
            self.init_attention_backend()
            self.init_cuda_graphs()
        else:
            self.cuda_graph_runner = None
            self.init_attention_backend()

    def init_torch_distributed(self):
        """初始化torch分布式环境,包括通信后端和分布式训练设置。"""
        logger.info("初始化torch分布式环境开始。")

        torch.get_device_module(self.device).set_device(self.gpu_id)
        if self.device == "cuda":
            backend = "nccl"
        elif self.device == "xpu":
            backend = "gloo"
        elif self.device == "hpu":
            backend = "hccl"
        elif self.device == "cpu":
            backend = "gloo"

        if not self.server_args.enable_p2p_check:
            monkey_patch_p2p_access_check()

        # 初始化分布式环境
        if self.server_args.dist_init_addr:
            dist_init_method = f"tcp://{self.server_args.dist_init_addr}"
        else:
            dist_init_method = f"tcp://127.0.0.1:{self.dist_port}"
        set_custom_all_reduce(not self.server_args.disable_custom_all_reduce)

        if not self.is_draft_worker:
            init_distributed_environment(
                backend=backend,
                world_size=self.tp_size,
                rank=self.tp_rank,
                local_rank=self.gpu_id,
                distributed_init_method=dist_init_method,
                timeout=self.server_args.dist_timeout,
            )
            initialize_model_parallel(tensor_model_parallel_size=self.tp_size)
            initialize_dp_attention(
                enable_dp_attention=self.server_args.enable_dp_attention,
                tp_rank=self.tp_rank,
                tp_size=self.tp_size,
                dp_size=self.server_args.dp_size,
            )

        min_per_gpu_memory = get_available_gpu_memory(
            self.device, self.gpu_id, distributed=self.tp_size > 1
        )
        self.tp_group = get_tp_group()
        self.attention_tp_group = get_attention_tp_group()

        # 检查内存是否均衡
        if self.tp_size > 1:
            local_gpu_memory = get_available_gpu_memory(self.device, self.gpu_id)
            if min_per_gpu_memory < local_gpu_memory * 0.9:
                raise ValueError(
                    "内存容量不均衡,部分GPU可能被其他进程占用。"
                )

        return min_per_gpu_memory

    def load_model(self):
        """加载模型并设置相关配置。"""
        logger.info(
            f"开始加载权重,当前内存={get_available_gpu_memory(self.device, self.gpu_id):.2f} GB"
        )

        # 设置线程数以减少线程冲突,提升加载速度
        if self.device != "cpu":
            torch.set_num_threads(1)
        if self.device == "cuda":
            if torch.cuda.get_device_capability()[0] < 8:
                logger.info(
                    "计算能力低于sm80,使用float16代替bfloat16。"
                )
                self.server_args.dtype = "float16"
                self.model_config.dtype = torch.float16
                if torch.cuda.get_device_capability()[1] < 5:
                    raise RuntimeError("SGLang仅支持sm75及以上。")

        set_cuda_arch()

        # 准备加载配置
        self.load_config = LoadConfig(
            load_format=self.server_args.load_format,
            download_dir=self.server_args.download_dir,
        )
        if self.server_args.load_format == "gguf":
            monkey_patch_vllm_gguf_config()

        # 加载模型
        monkey_patch_vllm_parallel_state()
        with self.memory_saver_adapter.region():
            self.model = get_model(
                model_config=self.model_config,
                load_config=self.load_config,
                device_config=DeviceConfig(self.device),
            )
        monkey_patch_vllm_parallel_state(reverse=True)

        # 加载KV缓存的缩放因子
        if self.server_args.kv_cache_dtype == "fp8_e4m3":
            if self.server_args.quantization_param_path is not None:
                if callable(getattr(self.model, "load_kv_cache_scales", None)):
                    self.model.load_kv_cache_scales(
                        self.server_args.quantization_param_path
                    )
                    logger.info(
                        "从%s加载KV缓存缩放因子",
                        self.server_args.quantization_param_path,
                    )
                else:
                    raise RuntimeError(
                        "使用FP8 KV缓存并提供缩放因子,但模型不支持加载这些因子。"
                    )
            else:
                logger.warning(
                    "使用FP8 KV缓存,但没有提供缩放因子,默认使用1.0的因子。"
                )

        # 设置其他参数
        self.sliding_window_size = (
            self.model.get_attention_sliding_window_size()
            if hasattr(self.model, "get_attention_sliding_window_size")
            else None
        )
        self.dtype = self.model_config.dtype

        logger.info(
            f"权重加载结束,模型类型={type(self.model).__name__}, dtype={self.dtype}, "
            f"当前内存={get_available_gpu_memory(self.device, self.gpu_id):.2f} GB"
        )

    def update_weights_from_disk(
        self, model_path: str, load_format: str
    ) -> tuple[bool, str]:
        """从磁盘更新引擎权重。"""
        from sglang.srt.model_loader.loader import (
            DefaultModelLoader,
            device_loading_context,
            get_model_loader,
        )
        from sglang.srt.model_loader.utils import set_default_torch_dtype

        logger.info(
            f"开始在线从磁盘更新引擎权重,当前内存={get_available_gpu_memory(self.device, self.gpu_id):.2f} GB"
        )

        target_device = torch.device(self.device)
        self.model_config.model_path = model_path
        load_config = LoadConfig(load_format=load_format)

        # 当前仅支持vllm的DefaultModelLoader
        loader = get_model_loader(load_config)
        if not isinstance(loader, DefaultModelLoader):
            message = f"获取模型加载器失败: {loader}."
            return False, message

        def get_weight_iter(config):
            iter = loader._get_weights_iterator(
                DefaultModelLoader.Source(
                    config.model_path,
                    revision=config.revision,
                    fall_back_to_pt=getattr(
                        self.model, "fall_back_to_pt_during_load", True
                    ),
                )
            )
            return iter

        def model_load_weights(model, iter):
            model.load_weights(iter)
            for _, module in self.model.named_modules():
                quant_method = getattr(module, "quant_method", None)
                if quant_method is not None:
                    with device_loading_context(module, target_device):
                        quant_method.process_weights_after_loading(module)
            return model

        with set_default_torch_dtype(self.model_config.dtype):
            try:
                iter = get_weight_iter(self.model_config)
            except Exception as e:
                message = f"获取权重迭代器失败: {e}."
                return False, message
            try:
                model = model_load_weights(self.model, iter)
            except Exception as e:
                message = (
                    f"更新权重失败: {e}.\n将回滚到原始权重。"
                )
                del iter
                gc.collect()
                iter = get_weight_iter(self.model_config)
                self.model = model_load_weights(self.model, iter)
                return False, message

        self.model = model
        self.server_args.model_path = model_path
        self.server_args.load_format = load_format
        self.load_config = load_config

        logger.info("更新权重结束。")
        return True, "成功从磁盘更新模型权重。"

TpModelWorkerClient类

"""A tensor parallel worker."""

import dataclasses
import logging
import signal
import threading
from queue import Queue
from typing import Optional

import psutil
import torch

from sglang.srt.managers.io_struct import (
    GetWeightsByNameReqInput,
    InitWeightsUpdateGroupReqInput,
    UpdateWeightFromDiskReqInput,
    UpdateWeightsFromDistributedReqInput,
    UpdateWeightsFromTensorReqInput,
)
from sglang.srt.managers.schedule_batch import ModelWorkerBatch
from sglang.srt.managers.tp_worker import TpModelWorker
from sglang.srt.server_args import ServerArgs
from sglang.srt.utils import get_compiler_backend
from sglang.utils import get_exception_traceback

logger = logging.getLogger(__name__)

@torch.compile(dynamic=True, backend=get_compiler_backend())
def resolve_future_token_ids(input_ids, future_token_ids_map):
    """解决未来的token id,通过查找映射更新输入的input_ids"""
    input_ids[:] = torch.where(
        input_ids < 0,
        future_token_ids_map[torch.clamp(-input_ids, min=0)],
        input_ids,
    )


class TpModelWorkerClient:
    """一个张量并行模型工作者客户端,负责与模型进行交互"""

    def __init__(
        self,
        server_args: ServerArgs,
        gpu_id: int,
        tp_rank: int,
        dp_rank: Optional[int],
        nccl_port: int,
    ):
        # 初始化模型工作者
        self.worker = TpModelWorker(server_args, gpu_id, tp_rank, dp_rank, nccl_port)
        self.max_running_requests = self.worker.max_running_requests
        self.device = self.worker.device
        self.gpu_id = gpu_id

        # 初始化未来token映射
        self.future_token_ids_ct = 0
        self.future_token_ids_limit = self.max_running_requests * 3
        self.future_token_ids_map = torch.empty(
            (self.max_running_requests * 5,), dtype=torch.int32, device=self.device
        )

        # 启动线程
        self.input_queue = Queue()  # 输入队列,存储待处理的batch
        self.output_queue = Queue()  # 输出队列,存储处理结果
        self.forward_stream = torch.get_device_module(self.device).Stream()  # 为前向计算创建流
        self.forward_thread = threading.Thread(
            target=self.forward_thread_func,  # 启动前向线程
        )
        self.forward_thread.start()
        self.parent_process = psutil.Process().parent()  # 获取父进程
        self.scheduler_stream = torch.get_device_module(self.device).current_stream()
        if self.device == "cpu":
            self.scheduler_stream.synchronize = lambda: None  # 如果是CPU设备,则同步函数为空操作

    def get_worker_info(self):
        """获取工作者信息,包括最大token数、内存池大小等"""
        return self.worker.get_worker_info()

    def get_pad_input_ids_func(self):
        """获取填充input_ids的函数"""
        return self.worker.get_pad_input_ids_func()

    def get_tp_cpu_group(self):
        """获取张量并行的CPU组"""
        return self.worker.get_tp_cpu_group()

    def get_attention_tp_cpu_group(self):
        """获取注意力张量并行的CPU组"""
        return self.worker.get_attention_tp_cpu_group()

    def get_memory_pool(self):
        """获取内存池,包含tokens和key-value对池"""
        return (
            self.worker.model_runner.req_to_token_pool,
            self.worker.model_runner.token_to_kv_pool,
        )

    def forward_thread_func(self):
        """前向计算线程的函数,处理模型推理"""
        try:
            with torch.get_device_module(self.device).stream(self.forward_stream):
                self.forward_thread_func_()
        except Exception:
            traceback = get_exception_traceback()  # 获取异常堆栈
            logger.error(f"TpModelWorkerClient遇到异常: {traceback}")
            self.parent_process.send_signal(signal.SIGQUIT)  # 向父进程发送退出信号

    @torch.no_grad()
    def forward_thread_func_(self):
        """前向计算的实际函数,处理模型的推理和结果收集"""
        batch_pt = 0
        batch_lists = [None] * 2

        while True:
            # 从输入队列获取batch
            model_worker_batch, future_token_ids_ct = self.input_queue.get()
            if not model_worker_batch:
                break  # 如果batch为空,退出循环

            # 保持对model_worker_batch的引用,否则它的tensor成员会被释放,导致CUDA内存访问错误
            batch_lists[batch_pt % 2] = model_worker_batch
            batch_pt += 1

            # 创建事件
            self.launch_done = threading.Event()
            copy_done = torch.get_device_module(self.device).Event()

            # 解决输入中的未来token ids
            input_ids = model_worker_batch.input_ids
            resolve_future_token_ids(input_ids, self.future_token_ids_map)

            # 进行前向推理
            logits_output, next_token_ids = self.worker.forward_batch_generation(
                model_worker_batch, self.launch_done
            )

            # 更新未来token id映射
            bs = len(model_worker_batch.seq_lens)
            self.future_token_ids_map[
                future_token_ids_ct + 1 : future_token_ids_ct + bs + 1
            ] = next_token_ids

            # 将结果复制到CPU
            if model_worker_batch.return_logprob:
                logits_output.next_token_logprobs = (
                    logits_output.next_token_logprobs.to("cpu", non_blocking=True)
                )
                if logits_output.input_token_logprobs is not None:
                    logits_output.input_token_logprobs = (
                        logits_output.input_token_logprobs.to("cpu", non_blocking=True)
                    )
            if logits_output.hidden_states is not None:
                logits_output.hidden_states = logits_output.hidden_states.to(
                    "cpu", non_blocking=True
                )
            next_token_ids = next_token_ids.to("cpu", non_blocking=True)
            copy_done.record()

            # 将处理结果放入输出队列
            self.output_queue.put((copy_done, logits_output, next_token_ids))

    def resolve_batch_result(self, bid: int):
        """从输出队列获取并处理一个batch的结果"""
        copy_done, logits_output, next_token_ids = self.output_queue.get()
        copy_done.synchronize()  # 等待复制完成
        self.launch_done.wait()  # 等待前向推理完成

        # 将logits结果转换为list格式
        if logits_output.next_token_logprobs is not None:
            logits_output.next_token_logprobs = (
                logits_output.next_token_logprobs.tolist()
            )
            if logits_output.input_token_logprobs is not None:
                logits_output.input_token_logprobs = (
                    logits_output.input_token_logprobs.tolist()
                )
        next_token_ids = next_token_ids.tolist()  # 将token id转换为list
        return logits_output, next_token_ids

    def forward_batch_generation(self, model_worker_batch: ModelWorkerBatch):
        """生成新的batch并将其推送到队列进行处理"""
        # 创建sampling_info的副本,因为调度器会就地更新它
        sampling_info = model_worker_batch.sampling_info
        sampling_info.update_penalties()  # 更新惩罚
        model_worker_batch.sampling_info = self.cur_sampling_info = dataclasses.replace(
            sampling_info,
            sampling_info_done=threading.Event(),
            scaling_penalties=sampling_info.scaling_penalties,
            linear_penalties=sampling_info.linear_penalties,
        )

        # 同步CUDA流,避免非法内存访问错误
        self.scheduler_stream.synchronize()

        # 将新batch推送到输入队列
        self.input_queue.put((model_worker_batch, self.future_token_ids_ct))

        # 分配输出的future对象
        bs = len(model_worker_batch.seq_lens)
        future_next_token_ids = torch.arange(
            -(self.future_token_ids_ct + 1),
            -(self.future_token_ids_ct + 1 + bs),
            -1,
            dtype=torch.int32,
            device=self.device,
        )
        self.future_token_ids_ct = (
            self.future_token_ids_ct + bs
        ) % self.future_token_ids_limit
        return None, future_next_token_ids

    def update_weights_from_disk(self, recv_req: UpdateWeightFromDiskReqInput):
        """从磁盘更新权重"""
        success, message = self.worker.update_weights_from_disk(recv_req)
        return success, message

    def init_weights_update_group(self, recv_req: InitWeightsUpdateGroupReqInput):
        """初始化权重更新组,用于分布式训练"""
        success, message = self.worker.init_weights_update_group(recv_req)
        return success, message

    def update_weights_from_distributed(
        self, recv_req: UpdateWeightsFromDistributedReqInput
    ):
        """从分布式系统更新权重"""
        success, message = self.worker.update_weights_from_distributed(recv_req)
        return success, message

    def update_weights_from_tensor(self, recv_req: UpdateWeightsFromTensorReqInput):
        """从tensor更新权重"""
        success, message = self.worker.update_weights_from_tensor(recv_req)
        return success, message

    def get_weights_by_name(self, recv_req: GetWeightsByNameReqInput):
        """根据权重名称获取模型权重"""
        return self.worker.get_weights_by_name(recv_req)

    def __delete__(self):
        """析构函数,清理资源"""
        self.input_queue.put((None, None))  # 停止输入队列的处理
        self.copy_queue.put((None, None, None))  # 停止复制队列的处理

TpModelWorkerClient 类是一个负责管理张量并行模型工作者的客户端。它与 TpModelWorker 进行交互,管理输入输出队列并处理模型推理任务。
该类通过多线程实现前向推理,并在计算完成后将结果传回主线程。
各个方法实现了模型加载、前向推理、权重更新等功能。

Scheduler类

Scheduler 类及相关函数概述

1. Scheduler 类简介

Scheduler 类是一个调度器,负责管理和调度一个张量并行(TP)GPU工作者,它协调了模型的生成(generation)或嵌入(embedding)任务的执行。调度器通过接收请求、运行批次、并生成结果来处理请求,并管理与硬件(如GPU、内存池)相关的操作。它还涉及到缓存管理、内存池释放、权重更新等操作。

2. 主要职责与功能
  • 调度批次:根据请求的优先级和资源的可用性,决定下一个需要执行的批次。
  • 处理生成与嵌入请求:根据请求类型(生成或嵌入)调度对应的任务,处理模型生成的输出(如logits、token、embedding等)。
  • 内存与缓存管理:通过内存池和缓存的管理,优化GPU资源的利用,避免内存泄漏。
  • 异步处理:调度器支持异步任务处理,能够并行进行数据预填充与生成解码工作,提升吞吐量。
  • Speculative Decoding(投机解码):如果启用了投机解码算法,调度器会启动额外的草案工作者来提前生成部分预测,帮助加速推理过程。
3. 构造函数 (__init__)
  • 初始化调度器,设置多种参数,包括模型配置、硬件信息(如GPU ID、张量并行大小等),以及缓存、内存池等。
  • 根据配置,选择是否启用重叠调度(enable_overlap)。
  • 初始化与其他模块的通信(如与tokenizer的IPC通信)。
  • 启动与GPU工作者相关的实例(如 TpWorkerClass),以及用于投机解码的工作者(如 EAGLEWorker)。
4. 调度循环 (event_loop_normal, event_loop_overlap)
  • event_loop_normal:正常的调度循环,依次接收请求、处理请求、执行批次并生成结果。
  • event_loop_overlap:异步调度循环,能够重叠执行CPU处理与GPU计算,提升效率。
5. 批次相关函数
  • get_next_batch_to_run:获取下一个要运行的批次,决定是否执行预填充(prefill)或解码(decode),并处理DP注意力机制。
  • get_new_batch_prefill:检查是否有请求可以进行预填充,并根据策略生成新的批次。
  • update_running_batch:更新当前正在运行的解码批次,检查是否内存溢出,必要时进行回退。
  • run_batch:运行批次,调用相应的生成或嵌入函数,并返回结果。
6. 缓存和内存管理
  • flush_cache:清空缓存和内存池,用于释放资源。
  • check_memory:检查内存使用情况,发现内存泄漏时触发警告。
7. 请求处理
  • recv_requests:接收来自不同源的请求,并将请求广播到其他张量并行工作者。
  • process_input_requests:处理接收到的请求,分发到合适的处理函数(如生成请求、嵌入请求等)。
  • handle_generate_request:处理生成请求,创建新的生成任务。
  • handle_embedding_request:处理嵌入请求,生成相应的嵌入结果。
8. 日志与统计
  • log_prefill_stats:记录预填充批次的统计数据,如缓存命中率、使用的token数等。
  • log_decode_stats:记录解码批次的统计数据,如生成的token数、生成吞吐量等。
9. 权重更新与会话管理
  • update_weights_from_diskupdate_weights_from_distributedupdate_weights_from_tensor:更新模型的权重,包括从磁盘、分布式或张量输入更新。
  • open_sessionclose_session:用于管理会话,允许为每个会话创建、关闭任务。
10. 投机解码和混合式预填充
  • 投机解码(Speculative Decoding):如果启用,调度器会启动草案工作者(draft_worker),提前生成一些token,优化推理速度。
  • 混合式预填充(Mixed Chunked Prefill):支持混合式预填充,使得可以更高效地进行批次的生成。

总结

Scheduler 类是一个高度集成的调度器,负责协调多个工作者、管理请求、调度批次并优化GPU和内存资源的利用。它能够灵活处理生成与嵌入任务,支持异步操作、投机解码、混合预填充等高级功能,并能够高效管理缓存、内存以及模型权重的更新。

DataParallelController类

DataParallelController 类及相关函数概述

1. DataParallelController 类简介

DataParallelController 类是一个管理数据并行工作者的控制器。它负责根据不同的负载均衡方法(如轮询调度和最短队列调度)将请求分发到多个数据并行(DP)工作者。控制器还负责初始化、管理与数据并行相关的调度程序进程,并进行模型加载、请求调度等任务。

2. 主要职责与功能
  • 负载均衡调度:根据负载均衡策略(ROUND_ROBINSHORTEST_QUEUE)选择合适的工作者,并将请求分发给工作者。
  • 初始化与启动调度器:初始化与启动数据并行调度器进程,为每个数据并行工作者创建独立的调度程序。
  • 调度请求:根据当前负载情况(如调度方法)将请求分发给不同的工作者。
  • 跨进程通信:通过 ZeroMQ 实现跨进程通信,接收请求并将请求转发给合适的工作者。
  • 模型信息管理:管理模型的信息,例如最大token数、请求输入长度等。
3. 构造函数 (__init__)
  • 初始化调度器,设置服务器和端口参数。
  • 选择负载均衡调度方法(轮询调度或最短队列调度)。
  • 初始化 ZeroMQ 上下文和 IPC(进程间通信)套接字。
  • 启动数据并行工作者,分别为每个 dp_rank 启动调度器进程。
  • 配置调度器与工作者的通信通道。
4. 工作者启动函数
  • launch_dp_schedulers:为每个数据并行工作者启动调度程序进程,并初始化相关的端口。
  • launch_dp_attention_schedulers:为数据并行注意力机制的工作者启动调度程序进程,并计算注意力并行信息。
  • launch_tensor_parallel_group:为每个Tensor并行组(TP group)启动进程,并设置工作者和调度器的通信端口。
5. 负载均衡调度方法
  • round_robin_scheduler:按照轮询的方式将请求依次发送给每个工作者,循环利用所有工作者。
  • shortest_queue_scheduler:通过最短队列的方式,将请求分发给当前队列最短的工作者(此方法尚未实现)。
6. 事件循环与请求调度
  • event_loop:控制器的事件循环,持续接收请求,并根据负载均衡方法将请求分发给合适的工作者。如果是控制请求,则将请求转发给Tensor并行组的第一个工作者。
7. 辅助函数
  • run_data_parallel_controller_process:启动 DataParallelController 进程,并处理日志记录和异常处理。
  • setproctitle.setproctitle:设置进程标题,便于进程管理。
  • configure_logger:配置日志记录。
  • get_zmq_socket:创建 ZeroMQ 套接字,用于进程间通信。
8. 异常处理
  • 在调度过程中,如果发生异常,会捕获并记录异常信息,并向父进程发送信号以终止程序。

总结

DataParallelController 类是一个用于管理数据并行工作者的控制器。它负责根据不同的负载均衡策略(如轮询或最短队列)调度请求到各个工作者,并进行进程间通信。它还负责启动和管理调度程序进程、模型信息的管理、内存池的释放等。该类使用 ZeroMQ 进行进程间通信,并通过多个子进程来实现数据并行的工作调度。

TokenizerManager 类

TokenizerManager 类是一个处理文本标记化的核心组件,负责管理与模型相关的各种功能,包括文本的标记化、批处理请求、会话管理、模型权重更新、性能监控等。以下是该类及其相关函数的总结:

主要功能组件:

  1. 初始化 (__init__):

    • 初始化过程中,接受 server_argsport_args 参数,配置 tokenizer 管理器。
    • 通过 ZeroMQ(zmq)套接字实现进程间通信,分别用于从 detokenizer 接收数据和向调度器发送请求。
    • 加载模型配置并根据是否多模态(multimodal)加载相应的处理器和标记器。
  2. 标记化处理:

    • _tokenize_one_request:处理单个请求的标记化,支持通过文本或嵌入向量(embeds)作为输入,并根据模型的上下文限制对输入进行检查。
    • generate_request:处理生成请求,标记化输入并将其发送给调度器进行处理。支持单个请求或批量请求的处理。
  3. 会话管理:

    • open_session & close_session:负责创建和关闭会话,生成会话 ID 或关闭现有会话。
    • _handle_open_session_req_output:处理会话打开请求的响应。
  4. 模型权重更新:

    • update_weights_from_disk, update_weights_from_distributed, update_weights_from_tensor:支持从磁盘、分布式系统或直接通过张量更新模型权重。
    • _wait_for_model_update_from_disk:等待并处理来自磁盘的模型更新请求,支持单节点或分布式环境中的更新。
  5. 批处理请求:

    • _handle_batch_request:处理批量请求的标记化和处理,支持并发处理多个请求,并生成多个响应。它也处理并行采样和批量策略。
    • _handle_batch_output:处理批量输出,支持处理不同类型的结果,如字符串、token ID 和嵌入(embedding)等。
  6. 请求管理:

    • _send_one_request:发送标记化的请求给调度器进行处理。
    • _wait_one_response:等待并处理单个请求的响应,支持超时或连接断开的处理。
    • create_abort_task:创建后台任务,用于在客户端断开连接时终止请求。
  7. 内存和性能监控:

    • flush_cache, abort_request, start_profile, stop_profile:这些方法用于管理缓存刷新、请求中止以及性能分析控制。
    • release_memory_occupation, resume_memory_occupation:用于系统中的内存管理。
  8. 日志和度量监控:

    • configure_logging:配置请求日志参数,如请求日志记录、请求转储文件夹等。
    • collect_metrics:收集性能度量指标,如第一次标记化时间、每个输出 token 的时间、端到端延迟等。
    • dump_requests:在达到设定的阈值时,将请求数据转储到文件中,以便离线分析。
  9. 信号处理与优雅关闭:

    • auto_create_handle_loop:自动创建事件循环以处理信号和请求。
    • sigterm_watchdog:确保在关闭时优雅地处理未完成的请求,并最终结束进程。

辅助功能:

  • detokenize_logprob_tokens:将 token 概率值和 token 索引反向标记化回文本或 token ID。
  • detokenize_top_logprobs_tokens:与 detokenize_logprob_tokens 类似,但用于处理 top-k tokens 的标记化。
  • convert_logprob_style:转换输出的 logprob 样式,包括返回 top-k logprobs 和其他相关的元数据。

作用:

TokenizerManager 类作为中间层,处理文本标记化和输入处理任务,管理请求的标记化、批处理请求、会话管理、模型更新、响应处理等。它与调度器、detokenizer 等其他系统组件进行交互,并提供了高效的请求处理和性能监控机制。

总结:

  • 进程间通信通过 ZeroMQ 套接字进行。
  • 模型标记化和生成标记化请求。
  • 批量处理高效处理多个请求。
  • 会话管理和请求状态追踪。
  • 模型权重更新与同步。
  • 日志记录度量监控
  • 内存管理性能分析
  • 信号处理与优雅关闭,确保进程在关闭时处理所有请求。

该类的核心目的是优化标记化任务、请求处理和模型管理,提供详细的性能监控和日志记录,以便进行高效的推理操作。

DetokenizerManager 类

DetokenizerManager 类及其相关函数的实现总结:

DetokenizerManager 是一个处理 解码反tokenization(反向token化) 的类,它的主要功能是将模型的输出 token id 转换为可读文本,通常用于生成模型的输出。它作为一个独立进程运行,并通过与其他进程(如 tokenizer)进行消息交换来完成任务。它支持批量处理(即同时处理多个请求)。

1. __init__ 方法
  • 初始化 DetokenizerManager,设置各种必要的参数和变量。
  • 初始化 ZeroMQ 套接字,进行进程间通信:
    • recv_from_scheduler:从调度器接收任务(例如需要反token化的 token id)。
    • send_to_tokenizer:将解码结果发送给 tokenizer 进程。
  • 根据传入的配置参数(如 skip_tokenizer_init)来决定是否初始化 tokenizer(即模型的反token化器)。
  • 使用 TypeBasedDispatcher 来处理不同类型的请求:
    • BatchEmbeddingOut:无需解码,直接返回。
    • BatchTokenIDOut:需要解码。
2. trim_matched_stop 方法
  • 用于处理解码过程中匹配到的停止符号(如 stop token 或文本),根据停止标志(finished_reason)来裁剪解码输出。
  • 如果匹配到停止符号,裁剪掉停止符号后的部分。
3. event_loop 方法
  • 这是主事件循环,负责不断接收调度器发送过来的任务,并将处理结果通过 ZeroMQ 套接字返回给 tokenizer 进程。
  • 对接收到的任务(如 BatchTokenIDOut 类型的任务)进行处理,并调用相应的解码逻辑。
4. handle_batch_embedding_out 方法
  • 处理 BatchEmbeddingOut 类型的输出。对于 embedding 类型的任务,不需要进行解码,直接返回原始的 embedding 输出。
5. handle_batch_token_id_out 方法
  • 处理 BatchTokenIDOut 类型的输出。此方法的核心功能是 增量解码(incremental decoding)。
  • 主要步骤:
    1. 遍历每个请求,检查 decode_status(解码状态),如果该请求的解码状态不存在或与当前解码版本不匹配,则初始化解码状态。
    2. 更新解码进度,进行 token ID 到文本的转换。
    3. 通过 tokenizer 反token化 token ids,将其转换为文本字符串。
    4. 对于流式解码的请求,根据是否结束来更新解码状态并生成部分输出文本。
    5. 合成最终的输出文本,并将结果封装为 BatchStrOut 类型发送回调度器。
6. LimitedCapacityDict
  • 这是一个扩展自 OrderedDict 的自定义字典类,它用于存储解码状态(DecodeStatus)。
  • 它有一个容量限制,当字典的大小超过设置的容量时,会移除最早加入的项,以确保字典不会占用过多内存。
  • 通过这种方式,它能有效管理解码状态,避免内存溢出。
7. run_detokenizer_process 函数
  • 该函数负责启动 DetokenizerManager 进程并处理异常。
  • 它设置进程标题、日志配置,并初始化一个 DetokenizerManager 实例来运行解码任务。
  • 如果遇到异常,会捕获并记录错误信息,并向父进程发送终止信号。

总结:

DetokenizerManager 是一个用于解码模型输出的管理器,它负责反token化 token id 并将其转换为文本。它与其他进程(如 tokenizer)进行通信,并能够处理大规模的批量解码任务。为了应对内存压力,它使用了带有限制容量的字典来管理解码状态,并通过增量解码的方式来逐步生成文本输出。此外,它还支持流式解码,允许在解码过程中逐步获取部分结果。


http://www.kler.cn/a/562822.html

相关文章:

  • React面试(一)
  • Linux-IPC-消息队列
  • Magma:多模态 AI 智体的基础模型
  • 半导体制造工艺(二)光刻工艺—掩模版
  • C++ Primer 泛型算法定制操作
  • 【十二】Golang 映射
  • Buildroot 添加自定义模块-内置文件到文件系统
  • 飞腾腾锐D2000 + OpenHarmony 4.1release部署deepseek大模型
  • 大白话React 虚拟 DOM,好处在哪里?跟vue有什区别
  • MySQL数据库入门:从零开始掌握数据库基础
  • C语言【进阶篇】之指针——涵盖基础、数组与高级概念
  • seacmsv9注入管理员账号密码+orderby+limit
  • 图的路径搜索算法
  • 通义灵码插件安装入门教学 - IDEA(安装篇)
  • 2. 在Linux 当中安装 Nginx(13步) 下载安装启动(详细说明+附加详细截图说明)
  • qt-C++笔记之QtCreator新建项目即Create Project所提供模板的逐个尝试
  • 【FastGPT】Linux系统使用podman-compose方式部署指南
  • web安全——分析应用程序
  • 山东大学软件学院ai导论实验之生成对抗网络
  • Qt 自定义控件及插件使用浅谈