Files
nano-vllm/nanovllm/layers/rotary_embedding.py
T
Rain-Bus ffd2defdfc add Chinese annotations to all source files for learning purposes
Annotated 16 source files covering the full architecture:
engine (scheduler, block manager, model runner), layers (attention,
linear, sampler, etc.), model (qwen3), and utils.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-25 21:33:15 +08:00

85 lines
2.7 KiB
Python

from functools import lru_cache
import torch
from torch import nn
def apply_rotary_emb(
x: torch.Tensor,
cos: torch.Tensor,
sin: torch.Tensor,
) -> torch.Tensor:
"""应用旋转位置编码(RoPE)。
将向量沿最后一维分成两半 (x1, x2),然后做旋转变换:
y1 = x1 * cos - x2 * sin
y2 = x2 * cos + x1 * sin
这等价于在二维平面上将每个相邻的 (x1, x2) 对旋转 theta 角度,
其中 theta = position / (base^(2i/d))。
"""
x1, x2 = torch.chunk(x.float(), 2, dim=-1)
y1 = x1 * cos - x2 * sin
y2 = x2 * cos + x1 * sin
return torch.cat((y1, y2), dim=-1).to(x.dtype)
class RotaryEmbedding(nn.Module):
"""旋转位置编码(Rotary Position Embedding, RoPE)。
RoPE 通过旋转矩阵编码位置信息,使得注意力计算中
内积只依赖相对位置(q_i · k_j 只与 i-j 有关),
从而天然支持外推到更长序列。
预计算所有位置的 cos 和 sin 值并缓存,避免重复计算。
缓存形状: [max_position_embeddings, 1, head_dim],中间维是 num_heads 的广播维度。
"""
def __init__(
self,
head_size: int,
rotary_dim: int,
max_position_embeddings: int,
base: float,
) -> None:
super().__init__()
self.head_size = head_size
assert rotary_dim == head_size
# 计算逆频率: 1 / (base^(2i/d)), i = 0, 1, ..., d/2-1
inv_freq = 1.0 / (base**(torch.arange(0, rotary_dim, 2, dtype=torch.float) / rotary_dim))
# 计算所有位置的频率: pos * inv_freq
t = torch.arange(max_position_embeddings, dtype=torch.float)
freqs = torch.einsum("i,j -> ij", t, inv_freq)
cos = freqs.cos()
sin = freqs.sin()
# 拼接 cos 和 sin,形状 [max_pos, 1, head_dim]
cache = torch.cat((cos, sin), dim=-1).unsqueeze_(1)
self.register_buffer("cos_sin_cache", cache, persistent=False)
@torch.compile
def forward(
self,
positions: torch.Tensor,
query: torch.Tensor,
key: torch.Tensor,
) -> tuple[torch.Tensor, torch.Tensor]:
"""根据位置索引查找 cos/sin 并应用到 Q 和 K。"""
cos_sin = self.cos_sin_cache[positions]
cos, sin = cos_sin.chunk(2, dim=-1)
query = apply_rotary_emb(query, cos, sin)
key = apply_rotary_emb(key, cos, sin)
return query, key
@lru_cache(1)
def get_rope(
head_size: int,
rotary_dim: int,
max_position: int,
base: float,
):
"""获取 RotaryEmbedding 的单例实例。
使用 lru_cache 确保相同参数只创建一次。
"""
rotary_emb = RotaryEmbedding(head_size, rotary_dim, max_position, base)
return rotary_emb