Files
Rain-Bus ffd2defdfc add Chinese annotations to all source files for learning purposes
Annotated 16 source files covering the full architecture:
engine (scheduler, block manager, model runner), layers (attention,
linear, sampler, etc.), model (qwen3), and utils.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-25 21:33:15 +08:00

88 lines
3.4 KiB
Python

import torch
from torch import nn
import torch.nn.functional as F
import torch.distributed as dist
from nanovllm.utils.context import get_context
class VocabParallelEmbedding(nn.Module):
"""词表并行 Embedding:将词表按 TP rank 切分。
每个 rank 只存储词表中属于自己的分片。前向计算时:
1. 只查找属于自己的 token ID,其他位置输出零。
2. 通过 all-reduce 聚合所有 rank 的结果。
"""
def __init__(
self,
num_embeddings: int,
embedding_dim: int,
):
super().__init__()
self.tp_rank = dist.get_rank()
self.tp_size = dist.get_world_size()
assert num_embeddings % self.tp_size == 0
self.num_embeddings = num_embeddings
self.num_embeddings_per_partition = self.num_embeddings // self.tp_size
self.vocab_start_idx = self.num_embeddings_per_partition * self.tp_rank
self.vocab_end_idx = self.vocab_start_idx + self.num_embeddings_per_partition
self.weight = nn.Parameter(torch.empty(self.num_embeddings_per_partition, embedding_dim))
self.weight.weight_loader = self.weight_loader
def weight_loader(self, param: nn.Parameter, loaded_weight: torch.Tensor):
"""加载属于当前 rank 的词表分片。"""
param_data = param.data
shard_size = param_data.size(0)
start_idx = self.tp_rank * shard_size
loaded_weight = loaded_weight.narrow(0, start_idx, shard_size)
param_data.copy_(loaded_weight)
def forward(self, x: torch.Tensor):
if self.tp_size > 1:
# 构造 mask:标记哪些 token ID 属于当前 rank 的范围
mask = (x >= self.vocab_start_idx) & (x < self.vocab_end_idx)
# 将全局 token ID 转为局部索引
x = mask * (x - self.vocab_start_idx)
y = F.embedding(x, self.weight)
if self.tp_size > 1:
# 非当前 rank 范围的 token 输出清零,然后 all-reduce 求和
y = mask.unsqueeze(1) * y
dist.all_reduce(y)
return y
class ParallelLMHead(VocabParallelEmbedding):
"""并行 LM Head:将隐藏状态映射为词表 logits。
与 Embedding 共享权重(如果模型配置了 tie_word_embeddings),
但前向逻辑不同:使用 F.linear(矩阵乘法)而非 F.embedding(查表)。
Prefill 阶段只需要每个序列最后一个 token 的 logits(因为只有最后一个 token 会用于采样),
所以先用 cu_seqlens 提取最后位置,再做矩阵乘法,减少计算量。
"""
def __init__(
self,
num_embeddings: int,
embedding_dim: int,
bias: bool = False,
):
assert not bias
super().__init__(num_embeddings, embedding_dim)
def forward(self, x: torch.Tensor):
context = get_context()
if context.is_prefill:
# Prefill: 只取每个序列最后一个 token 的隐藏状态用于采样
last_indices = context.cu_seqlens_q[1:] - 1
x = x[last_indices].contiguous()
# 矩阵乘法得到 logits
logits = F.linear(x, self.weight)
if self.tp_size > 1:
# Gather 所有 rank 的 logits 分片到 rank 0
all_logits = [torch.empty_like(logits) for _ in range(self.tp_size)] if self.tp_rank == 0 else None
dist.gather(logits, all_logits, 0)
logits = torch.cat(all_logits, -1) if self.tp_rank == 0 else None
return logits