最新文章
-
轻量SmolVLA:半层VLM、视觉压缩与异步推理赋能具身智能
概述 SmolVLA 是一套轻量级视觉-语言-行动(VLA)策略:前端用小型 VLM(视觉 SigLIP + 语言 SmolLM2)做感知与理解;后端用一个“动作专家”专门预测一段连续的低层控制。它与Pi0相比,参数规模少了将近10倍只有约0.45B(450M)。它的目标是在低算力下也能稳定执行多任务机器人控制,并保持接近甚至超过更大模型的效果。 SmolVLA 通过冻结 VLM、只训练动作专家(Action Expert),再配上四件“硬核小技巧”——取 VLM 的前半层、把每帧视觉 token 压到 64、以及Self-Attn—>Cross-Attn交替方式、异步推理;在大幅降算力与时延的同时,保持/逼近甚至超过更大模型的性能;注意力计算交替方式让动作专家既能不断获取外部视觉/语言指导,又能在内部序列里建立自己的时序与物理一致性,从而在算力可控的前提下提升稳定性和表现;提供异步执行,把“算下一段动作”和“执行当前动作”并行起来,显著减少空窗时间。 原理 结构 其模型结构主要有前端的VLM+后端的动作专家Action Expert组成,结构组成与Pi基本一致但实现方式有很大差异不同。先总结一下组件,稍后我们在稍作展开补充。 输入:文本指令token+视觉token(多摄像头采集的图像)+机器的状态(关节角、传感器)。 VLM(感知端):采用SmolVLM-2,VLM共有L层,但是只N=⌊L/2⌋层隐藏表示喂给动作专家。 Action Expert(控制端):一个Flow Matching Transformer,以以Cross → Causal Self → Cross 的“三明治”层为基本单元,按块预测n步动作序列。 输出:一次预测长度为n的动作块,对应机器的控制指令。 SmolVLA与Pi0有很多相似之处,不过其背后有四个关键设计,分别是Layer Skipping(层跳过)、Visual tokens reduction(视觉token压缩)、动作专家交替Self-Attn与Cross-Attn、异步推理。本小节先围绕前面3部分进行解析,异步推理于后续章节展开。 (1)Layer Skipping 层跳过就是把感知端的VLM(视觉+语言)的解码器中间层拿来当条件特征,而不是等它把整个层都计算完再输出;具体的做法就是只去$N=L/2$层的隐藏层表示送给动作专家,VLM权重冻结不训练。之所以要这样做经验规律表示(论文中作者提到)深层 LLM 层更偏“词级生成/长链路语义”,而中层已经集中了“指令 + 视觉”的对齐语义,对控制足够;继续往后让语言头生成 token 既耗算,又不是控制必须。前半层就停下,少算一半的自注意 + MLP,显存开销也随之下降。 大概得实现是把“文本指令 token、图像 token、状态 token”拼接,送入解码器;在第$N$层获取特征信息H然后用一个线性投影到Action expert所需的维度$d_a$作为$K/V$。如果在长时、极强推理型任务(需要深层语言生成)时可以适当调大N。 (2)视觉token压缩 在transformer里面,“token”就是序列里的一个位置。对图像来说,我们把一张图拆成很多小块(patch)或网格上的特征点,每个块/点用一个向量表示,这个向量就是视觉 token(不明白的可以看看ViT原理解析介绍)。视觉token压缩具体的做法是保整图、不裁块,把空间上密的token折叠到通道里,从而让token数变少。 设 ViT patch 后得到的特征图大小为 $\frac{H}{p} \times \frac{W}{p} \times d$,选一个下采样因子 $r$ (整数),做 space-to-depth: $$ \underbrace{\frac{H}{p} \times \frac{W}{p}}{\text{原网格}} \xrightarrow{\div r} \underbrace{\frac{H}{pr} \times \frac{W}{pr}}{\text{更稀疏的网格}}, \quad \underbrace{d}{\text{通道}} \xrightarrow{\times r^2} \underbrace{d \cdot r^2}{\text{更厚的通道}} $$ 计算示例: 输入尺寸:$512 \times 512$ 图像 Patch大小 $p=16$ ⇒ $32 \times 32$ token网格 选$r=4$ ⇒ $\frac{32}{4} \times \frac{32}{4} = 8 \times 8$ 网格 Token数:$8 \times 8 = 64$(减少$4^2=16$倍) 通道维度:$d=3 \rightarrow d=3 \times 16=48$ 可以看到如果是按照ViT的默认方式patch数量为32x32=1024,每个patch的维度为3x16x16=768,然后如果输入编码的$d_mode=512$那么经过线性投影变成矩阵(1024,512),即1024个token数量,每个token维度是512;而如果进行压缩后patch数量为8x8=64,每个patch的维度为48x16x16=12288,经过线性投影后变成(64,512)即64个token,每个维度是512。这里也可以看到原来是768降为到512,压缩的是从12288降维到512,降得比较猛,效果真的没有衰减吗? 总结一下smolvla在视觉token上进行了压缩,使用space-to-depth,对于512X512的图每帧token从1024降低到了64帧,如ViT的patch操作后得到的特征图维度为$\mathbb{R}^{\frac{H}{p} \times \frac{W}{p} \times d}$,选择下采样因子 $r$(整数)进行space-to-depth操作: $$ \mathbb{R}^{\frac{H}{p}\times\frac{W}{p}\times d} \xrightarrow{\text{S2D}_{r}} \mathbb{R}^{\frac{H}{pr}\times\frac{W}{pr}\times(d\cdot r^{2})} $$ 这样token数减少$r^{2}$倍,把细节挪到通道数去。 (3)动作专家交替Self-Attn与Cross-Attn 在动作专家中使用了交叉注意力机制,具体的排布可以配置。VLM的每一层与右边的Expert是一一对齐的,当然也可以配置Expert模型只有VLM层数的一半,每两层VLM才有一层Expert,那么其中VLM对齐层将为NONE,下图以VLM和Expert都为4层来示例交替注意力的实现。 Self-Attn(管自己,守时序):只允许第 i 步看 ≤i 的历史步(因果掩码),在动作序列内部传播动力学与约束,做轨迹的时间一致性与平滑。这一步相当于“内化刚才读到的证据”并让各步动作彼此协调。在计算注意力时,会将VLM的QKV与Expert的QKV进行拼接起来一起送入transformer计算,但通过掩码保证 VLM 的Q只看自己(不去读 Expert),而 Expert 的 Q 可以访问 VLM 的 K/V(即“读”VLM 语义),这样既提供了计算效率也提升了Expert的语义丰富性。 Cross-Attn(看环境,取证):看环境取证,让每个动作 token 先从条件特征里“读”一遍(条件=VLM中间层输出,含文本指令+多路视觉+状态)。这样动作表示一开始就被场景锚定,知道当下该关注哪里/哪件物体。具体是交叉注意力计算Q来自Expert Action自己,而K/V 来自 VLM 对应层的输出缓存。 训练 目标让动作专家 在观测条件 $o_t$ 下输出$v_\theta$速度场,把“噪声动作”沿路径推向真实动作块 $A_t$。这里跟Pi0和Flow Matching是一样大同小异,就简要说明一下。 观测条件:$o_t=H^{(N)}\in\mathbb{R}^{T\times d_o}\ \xrightarrow{\text{proj}}\ O_N\in\mathbb{R}^{T\times d_a}$(VLM 冻结;$O_N$ 作为 Cross-Attn 的 K/V)。 真实动作块:$A_t\in\mathbb{R}^{n\times d_{\text{act}}}$(建议标准化/白化)。 噪声:$\varepsilon\sim\mathcal N(0,I)$(同形)。 路径时间:$\tau\sim\mathrm{Beta}(\alpha,\beta)$。这里与Pi0不同。 路径与目标速度场: $$ A_t^{\tau}=\tau A_t+(1-\tau)\varepsilon,\left(A_t^{\tau}\mid A_t\right)=\varepsilon-A_t $$ 计算损失函数: $$ \mathcal{L}^{\tau}(\theta) = \mathbb{E}{p\left(\mathbf{A}{t} \mid \mathbf{o}{t}\right), q\left(\mathbf{A}{t}^{\tau} \mid \mathbf{A}{t}\right)} \left[ \left|| \mathbf{v}{\theta}\left(\mathbf{A}{t}^{\tau}, \mathbf{o}{t}\right) - \mathbf{u}\left(\mathbf{A}{t}^{\tau} \mid \mathbf{A}{t}\right) \right||^{2} \right] $$ 这里 $||\cdot||^{2}$ 表示欧氏范数平方;实现即逐元素 MSE。 为提升推理效率,动作专家隐藏宽度取 $d_a=0.75\times d$($d$ 为 VLM 的隐藏宽度)。 以下是一个简单的示例,可看看过程理解一下。 # 条件:取 VLM 第 N 层隐藏(冻结) with torch.no_grad(): H = vlm_hidden_at_layer_N(obs_tokens) # [T, d_o] KV = proj(H) # [T, d_a] 供 Cross-Attn 作 K/V(可缓存) # 构造路径与目标速度 A = sample_action_chunk() # [n, d_act] (已标准化) eps = torch.randn_like(A) # [n, d_act] tau = Beta(alpha, beta).sample(()).to(A.device) A_tau = tau * A + (1 - tau) * eps u = eps - A # 前向与损失 pred = v_theta(A_tau, KV) # 与 u 同形 loss = F.mse_loss(pred, u) # 对应 ||·||^2 loss.backward(); optimizer.step(); optimizer.zero_grad() 推理 在SmolVLA提到了异步推理,先来看看同步推理。 取最新观测 → 得到 $O_N$; 以噪声初始化,做 $K\approx10$ 步显式积分(Euler/Heun)得到一个动作块 $[a_t,\dots,a_{t+n-1}]$; 执行动作块 → 重复。 同步(sync)推理一次性生成长度为 $n$ 的动作队列(chunk)$A_t=\big[a_t,\dots,a_{t+n-1}\big]$,执行完再用新观测预测下一段。执行与推理串行,会产生“空窗”(执行停下等待推理)。而异步(async)推理是解耦“动作执行”和“动作预测”。机器人端持续消费现有队列;当队列余额低于阈值就异步把当前观测发给策略端预测“下一段”,回来后与旧队列重叠拼接。这样执行与推理并行,显著降低总时延,同时仍保持接近的成功率。 异步推理在架构上可以分为两个部分: RobotClient(机器人端):以控制周期 $\Delta t$ 持续下发队列头部动作;本地维护动作队列 $A_t$ 与触发逻辑;可做相似度过滤(见下节)。 PolicyServer(策略端):接收观测 $o_t$,运行策略 $\pi$ 预测新队列 $\tilde A_{t+1}$ 后返回;可放在更强的远端算力(GPU/工作站/云)。 看看论文中给出的算法实现: 设时域 $T$、段长 $n$、触发阈值 $g\in[0,1]$。 初始化:采集 $o_0$,发送到策略端,得到首段 $A_0\leftarrow\pi(o_0)$。 主循环 对 $t=0\dots T$:取出并执行一步 $a_t\leftarrow\text{PopFront}(A_t)$;若 队列余额占比 $\dfrac{|A_t|}{n}<g$,采集新观测 $o_{t+1}$;若 NeedsProcessing$(o_{t+1})$ 为真(见“相似度过滤”),则异步触发:①发送 $o_{t+1}$ 到策略端,得到新段 $\tilde A_{t+1}\leftarrow\pi(o_{t+1})$(异步返回);②用重叠拼接函数 $f(\cdot)$ 合并:$A_{t+1}\leftarrow f(A_t,\tilde A_{t+1})$;若本轮异步推理尚未结束:$A_{t+1}\leftarrow A_t$(继续消费旧队列)。 论文中的 NeedsProcessing 用于避免重复观测触发;$f$ 表示对重叠步的拼接(线性渐入/平滑器等,见下重叠拼接(Overlap & Merge))。 关键触发量,队列余额阈值 $g$: 触发条件:当 $\dfrac{|A_t|}{n}<g$ 时触发一次异步预测。 直觉:$g$ 越大,越提前触发,越不容易见底;但也会更频繁地调用策略端(算力/网络开销更高)。 论文的三个代表场景:$g=0$(顺序极限):耗尽队列才发起新预测 → 一定出现空窗等待;$g=0.7$(典型异步):每段大约消耗 $1-g=0.3$ 的比例就触发,计算摊在执行过程中,队列不见底;$g=1$(计算密集极限):步步都发观测 → 几乎“满队列”,反应最快但计算最贵(等同每个 tick 都前向一次)。 对于相似性过滤做法:主要动机是观测几乎不变时没必要反复调用服务器 → 降低抖动与无效请求。具体做法(论文)是用关节空间距离作为近似(例如欧式距离),若两次观测间距离 $<\varepsilon$(阈值,$\varepsilon\in\mathbb{R}^+$)则丢弃本次请求。兜底做法是若队列真的耗尽,则无论相似度如何都要处理最近的观测,以防停摆。 重叠拼接(Overlap & Merge):核心思想通过重叠区域平滑过渡避免硬切抖动,数学上实现是设旧队列尾部与新队列头部重叠 $w$ 步,对第 $k=0,\dots,w-1$ 步做线性渐入融合: $$ a_{t+k}^{\text{merge}} = \alpha_k \tilde{a}{t+1+k} + (1-\alpha_k) a{t+k}, \quad \alpha_k = \frac{k+1}{w} $$ 也可用余弦窗、Slerp 或在位姿/速度层加滤波器;关键是重叠 + 平滑避免硬切抖动。 总结一下对于异步并发处理有优势,但是需要处理其中的细节主要是: 维护动作队列。前台执行当前队列,后台异步预测下一段;在重叠窗口内平滑拼接新旧段。 避免队列见底的解析下界,设控制周期为 $\Delta t$,则避免队列耗尽的充分条件为 $$ g\ \ge\ \frac{\mathbb E[\ell_S]/\Delta t}{n} $$ 其中 $\ell_S$ 为一次(本地/远端)推理延迟,$\Delta t$ 控制周期,$n$ 为动作块长度。从触发到返回的时间内(平均 $\mathbb{E}[\ell_S]$ 秒)你还要有足够的剩余动作可执行(约 $\mathbb{E}[\ell_S]/\Delta t$ 个),所以触发点的剩余比例至少为这部分占 $n$ 的比值。论文配合给出真实控制频率示例(如 $30$ FPS $\to \Delta t=33,$ms),并分析了不同 $g$ 对队列曲线的影响(下图)。 数据 论文中提到的复现配置如下: 模型与输入:冻结 VLM,仅训动作专家;取 前半层 $N=\lfloor L/2\rfloor$ 的 $H^{(N)}$ → 投影成 $O_N$。图像 512×512;64 视觉 token/帧;状态→1 token;bfloat16。 动作块与解算: 每段 $n=50$;推理 10 步 Flow-Matching 积分。 优化:训练 200k step;global batch 256;AdamW($\beta_1=0.9,\ \beta_2=0.95$);余弦退火学习率 $1\times10^{-4}\to2.5\times10^{-6}$。 参数量: 总计 ≈450M;动作专家 ≈100M;若 VLM 有 32 层,取前 16 层。 论文中提到需要关注的信息: 模拟(LIBERO/Meta-World):中等规模(~0.45B)已对标/超过若干更大基线;放大到 ~2.25B 继续提升。 真实机器人(SO100/101):多任务平均成功率 ≈78%,优于 π0 与 ACT。 异步 vs 同步:成功率相近,但异步平均完成时间缩短 ~30%,固定窗口内完成次数显著更多。 论文中提到的落地经验: 形状与缓存:把 $H^{(N)}(T\times d_o)$ 投到 $d_a$ 后当 K/V;两次 Cross 复用 KV 缓存。 因果掩码:Self-Attn 必须用因果掩码(第 $i$ 步不可看未来)。 视觉压缩:优先用 space-to-depth 固定 64/帧;任务特别细腻时用 $r=2$(256 token)或多尺度/ROI 方案。 起步超参:$n=50$、积分步数 10、$N=\lfloor L/2\rfloor$、$d_a=0.75d$。 异步阈值:按 $g\ge\frac{\ell_S/\Delta t}{n}$ 设定,取略高于下界更稳;配合相似度过滤与重叠拼接。 动作归一化:对不同量纲(角/位移/速度)做标准化,训练更稳、积分不发散。 交替注意力有效:Cross + 因果 Self 明显优于单一注意力;“用前半层”普遍优于“直接换小 VLM”。 参考:https://arxiv.org/abs/2506.01844 -
浅析Pi0 :VLM 与 Flow Matching 的结合之道
概述 传统机器人策略模型往往局限在单一任务或平台,难以跨场景泛化。与此同时,大规模 视觉-语言模型(VLM) 已展现出卓越的语义理解与任务指令解析能力。如果能将 VLM 的语义理解能力 与 Flow Matching 的连续动作建模能力 结合,有望构建具备泛化与实时性的机器人通用控制器。 Pi0 (π0)正是这样一个探索:基于 PaliGemma(3B 参数 VLM) 作为感知与语义主干,结合 Flow Matching 动作生成器,实现语言到多机器人动作的端到端建模。它借鉴了大语言模型的“预训练 + 微调”范式,把互联网级别的语义知识和机器人操控数据结合起来,从而实现跨平台、跨任务的通用机器人控制。 我们此前分析了VLM、Flow Matching原理,掌握这些之后理解Pi0是非常简单的。 原理 结构 模型结构主要有VLM主干+ Action Expert动作专家构成。 VLM主干:基于 PaliGemma(一个 3B 参数的 VLM),继承互联网规模的图像+语言知识。 Action Expert(动作专家):额外的子网络,负责用 Flow Matching 方法预测连续动作向量。 模型的输入包括观测的多视角RGB图像、语言指令、机器人自身状态(关节角、传感器),经过模型处理后输出为高频动作序列(每秒50HZ动作chunk),这些动作控制单臂、双臂、移动操作臂等多类机器人。 训练 我们训练的目标是让$A_t^0 \sim \mathcal{N}(0, I)$ ——>$A_t$(真实动作),希望模型学会如何把一个“噪声动作”流动成一个真实的动作。就像扩散模型是“噪声 → 图像”,这里是“噪声动作 → 专家动作”。 在训练的时候要让噪声动作流向真实动作,我们需要构建一个路径,这里依旧使用的是直线路径。 $$ A_t^\tau = \tau A_t + (1-\tau)\epsilon, \quad \epsilon \sim \mathcal{N}(0,I) $$ 这个公式跟我们在Flow Matching文章中的训练公式是不是一样的。我们在噪声动作$\epsilon$和真实动作$A_t$之间,采样一个"插值点"。$\tau $表示时间的进度,当$\tau = 0$时完全是噪声,当$\tau = 1$时完全是真实动作,这个就构造了一条噪声到动作的直线路径。 我们的目标是要让模型告诉我们"从当前点$A_t$应该往哪个方向移动,才能逐渐靠近真实动作",因此就是在计算在每个时间速度。 $$ u(A_t^\tau \mid A_t) \triangleq \frac{d}{d\tau} A_t^\tau $$ 代入公式可得: $$ \frac{d}{d\tau} A_t^\tau = A_t - \epsilon $$ 而论文中成$u(A_t^\tau \mid A_t) = \epsilon - A_t$,只是方向约定相反,本质上没有差异。上面的公式,目标速度就是噪声 - 动作,它定义了“流动的方向”。就像在地图上,目标向量场就是指路的“箭头”。这样得到了真实的速度场,我们就可以在训练的时候计算损失了。 $$ L(\theta) = \mathbb{E}\big[ | v_\theta(A_t^\tau, o_t) - u(A_t^\tau \mid A_t) |^2 \big] $$ $v_\theta$是神经网络(Action Expert),输入 当前 noisy action + 观察$o_t$,输出预测的速度场。损失函数就是 预测的速度场 vs 真实的目标速度场 的均方误差 (MSE)。训练目标:让模型学会在任意中间点给出正确的“流动方向”。 推理 $$ A_t^{\tau+\delta} = A_t^\tau + \delta v_\theta(A_t^\tau, o_t) $$ 推理生成也比较简单,从噪声动作$A_t$开始,每次迭代一步:输入当前的$A_t^\tau $和观察的$o_t$,接着模型给出速度场,就沿着这个方向走一步(步长$\delta$),然后按照这个步骤重复迭代,最终得到真实的动作$A_t$。和扩散模型不同:这里不需要几十/上百步,只要 ~10 步 ODE 积分,就能得到高质量动作,适合机器人实时控制。 参考: https://arxiv.org/abs/2410.24164 -
Flow Matching:让生成模型“流动”起来
背景 上一篇文章分析了diffusion扩散模型。diffusion扩散模型做法是加噪声、再一步步去噪,训练过程复杂,还需要 carefully 设计噪声调度。 Flow Matching提出了更直接的方式:与其通过一大堆离散的“加噪/去噪”步骤,不如直接学习一个连续的流动 (flow),让点从噪声“顺滑地流动”到目标数据。 原理 把生成过程看作流体运动,想象有一堆水滴(噪声),通过一个力场,它们会被推动、流动,最后聚集成目标形状(真实分布的数据)。Flow Matching从物理学角度学一个"速度场",让数据点从"源分布(噪声)"流动到"目标分布(真实数据)"。 如图所示左边是源随机点云,中间是目标形状,右边是实际使用模型生成的形状。为了更直观的体会再来看下图从源分布逼近目标分布的过程。 左图就是源分布的点在不同时间应该朝那个方向运动直到最终的目标分布,右图是不同时刻让这些点应该往哪个方向进行流动速度场。 接下来看看数学怎么表示,我们希望从源分布$p_{src}$(比如高斯分布)按照流动的方式到目标分布$p_{data}$,那么方式就是在每个时间$t$为每个点$x$都指定一个速度$v_\theta(x,t)$,这样在不同时间就知道点该往哪里动,那么点的轨迹就完全确定了。在数学上点的位置$x(t)$随着时间变化,那就是速度场向量,即常微分方程 $$ \frac{dx}{dt} = v_\theta(x, t) $$ 左边的$\frac{dx}{dt}$描述的是随时间的变化率,右边$v_\theta(x, t)$就是我们要学习的"速度场",它给出"$t$时刻,位置$x$应该往哪里动"。 总结一下Flow Matching 里速度场写成 ODE,是因为它给出“点的位置随时间的变化率”,这正是常微分方程的定义,生成过程就是解 ODE,从噪声轨迹流到数据。 推理 模型要做的事情就是要预测出下一个时间刻应该往哪里走,输出是一个速度场;推理的过程就是解常微分方程ODE。 输入:当前位置$x \in \mathbb{R}^d$,当前时间$t \in [0,1]$。 输出:模型计算输出当前的速度向量,即$\frac{dx}{dt} = v_\theta(x,t)$。 更新:根据速度向量$v_\theta(x,t)$通过积分公式把所有时间段速度累积起来得到最终点$x(1)$。 $$ x(1) = x(0) + \int_{0}^{1} v_\theta(x(t),t)\,dt $$ 直观理解就是神经网络提供"切线方向",积分就是"把所有切线拼起来",形成完整的轨迹,从噪声走到目标分布。 但实际过程中我们用离散的数值方式,比如欧拉法,如下: 时间从$t$=$0$到$t$=$1$,分成若干小步(比如50或100步),在每一步按照上面公式更新。 输入:当前的位置$x_k$和当前时间步$t_k$。 输出: 模型预测计算速度向量场$v_\theta(x_k, t_k)$。 更新:通过欧拉法更新公式更新下一步位置$x_{k+1} = x_k + v_\theta(x_k, t_k)\Delta t$ 每一步模型计算出速度向量$v_\theta(x_k, t_k)$然后根据公式进行更新下一步的位置,新位置=旧位置+速度x时间步长;$v_\theta(x_k, t_k)\Delta t$计算每次迭代的移动距离(速度x时间),这就是基本的欧拉积分法,直观的意义是在短时间$\Delta t$内,点会沿着速度场方向前进一点。不断的进行多步迭代,从$x_0$出发,逐步得到$x_1$,$x_1$,$x_2$,$x_3$,....,$x_k$,当$k$=$K$时,$t_K$=$1$,就得到最终的$x(1)$。 怎么理解$t_k$、$x_k$、$\Delta t$? $t_k$是第$k$步对应的时间点,如果flow matching的时间区间是[0,1],我们把它切成$K$个小步(如50或100步),每个时间点就是$t0$=$0.00$,$t1$=$0.01$;$\Delta t$是时间步长如把时间区间[0,1]均匀分成100步,那么$\Delta t$=$1/100$=$0.01$;$x_k$是表示在$t_k$时的点(或点云),初始时从高斯噪音采样到。 下面再来一个直观图展示了Flow Matching推理的过程。 灰色箭头:代表速度场$v_\theta(x_k, t_k)$,告诉每个位置的点应该往哪里走。上图设定的目标是(2,2)。 绿色点:初始$x(0)$来自噪声分布即源分布。 红色叉:表示目标位置,代表数据分布的一个样本区域。 蓝色折现轨迹:数值积分结果,点一步一步验证速度场北推向目标。 训练 我们希望模型学会把源分布$p_{src}$流动到目标分布$p_{data}$;换句话说就是有$x_0 \sim p_{\text{src}}$,输出目标点$x_1 \sim p_{\text{data}}$我们要训练一个速度场网络$v_\theta(x_k, t_k)$,让它指导点$x_t$沿正确的路径从$x_0$——>$x_1$。 要训练行动轨迹需要知道真实轨迹这样才能和实际预测值做比较求损失,而训练的关键却正好是不知道真实的速度场。那如何构建训练的目标了?可以设计一个简单的"参考轨迹",如直线路径$x_0$——>$x_1$。 $$ x_t = (1 - t)x_0 + tx_1 $$ 给定输入样本$(x_0 \sim p_{\text{src}},x_1 \sim p_{\text{data}})$,其中$x_0$是源随机位置,$x_1$是目标位置。在训练的时候我们自己定义一条直线路径$x_0$——>$x_1$,我们不能一步到位,而是要有一个流动的过程。 这条直线路径上的真实速度公式对$t$求偏导,而恰巧速度是一个常数(始终指向目标点$x_1$)。 $$ u^\star = \frac{dx_t}{dt} = x_1 - x_0 $$ 既然速度方向就是一个常数$x1-x0$,为什么不直接一步把$x1$变成$x0$,而要搞成连续流动了? 如果一步到位公式就变成$x_1 = x_0 + (x_1 - x_0)$,相当于直接跳到目标点,完全不需要ODE、积分、网络。但问题在于训练时我们有配对的$(x_0, x_1)$,所以能写下$(x_1-x_0)$,而推理时了我们只有$x_0 \sim p_{\text{src}}$,并不知道该对应那个$x1$,因此不能一步到位,因为没有$x_1$可直接计算。 最后我们训练目标就是网络预测的速度$v_\theta(x_k, t_k)$,损失就网络预测的速度$v_\theta(x_k, t_k)$与真实的速度$x_1-x_0$的均方误差。训练完成之后,网络就学会了在任何位置$x_t$、时间$t$给出正确的速度场。 $$ \mathbb{E}\Big[ || v_\theta(x_t, t) - (x_1 - x_0) ||^2 \Big] $$ 源码示例 为了加深理解,程序实现一个最小的 Conditional Flow Matching(直线路径的 Rectified Flow)示例,学习时间条件速度场 vθ(x,t),把二维标准高斯源分布推到左右两个高斯簇的目标分布。训练后输出两张图:训练损失曲线 cfm_loss.png,以及三联静态图 cfm_overview.png(源/目标/生成)。 # -*- coding: utf-8 -*- # Flow Matching demo: source N(0,I) -> target: Two Gaussians (left & right) # 输出: # 1) cfm_loss.png(训练损失) # 2) cfm_overview.png(三联图:Source / Target / Generated) # 依赖:pip install torch matplotlib import time, warnings warnings.filterwarnings("ignore", category=UserWarning, module="matplotlib") import numpy as np import torch, torch.nn as nn, torch.optim as optim import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt # ------------------------- 配置 ------------------------- device = torch.device("cuda" if torch.cuda.is_available() else "cpu") torch.manual_seed(0) XLIM = (-4.0, 4.0) YLIM = (-3.0, 3.0) # ------------------------- 数据分布 ------------------------- def sample_source(n): return torch.randn(n, 2, device=device) def sample_target(n): sigma = 0.35 means = torch.tensor([[-2.0, 0.0], [2.0, 0.0]], device=device) idx = torch.randint(0, 2, (n,), device=device) mu = means[idx] return mu + sigma * torch.randn(n, 2, device=device) # ------------------------- 模型:速度场 v_theta(x,t) ------------------------- class VelocityNet(nn.Module): def __init__(self, h=64): super().__init__() self.net = nn.Sequential( nn.Linear(3, h), nn.ReLU(), nn.Linear(h, h), nn.ReLU(), nn.Linear(h, 2), ) def forward(self, x, t): return self.net(torch.cat([x, t], -1)) # ------------------------- 训练(CFM,直线路径) ------------------------- def train_cfm(steps=2000, batch=512, lr=1e-3): net = VelocityNet().to(device) opt = optim.Adam(net.parameters(), lr=lr) loss_hist = [] t0 = time.time() for s in range(1, steps + 1): x0 = sample_source(batch) x1 = sample_target(batch) t = torch.rand(batch, 1, device=device) xt = (1 - t) * x0 + t * x1 u = x1 - x0 pred = net(xt, t) loss = ((pred - u)**2).mean() opt.zero_grad(set_to_none=True) loss.backward(); opt.step() loss_hist.append(float(loss)) if s % 200 == 0: print(f"[{s}/{steps}] loss={loss:.4f}") print(f"Train time: {time.time() - t0:.2f}s") return net, loss_hist # ------------------------- 采样(生成轨迹) ------------------------- @torch.no_grad() def generate_traj(net, n=3000, steps=60): x = sample_source(n) dt = 1.0 / steps traj = [x.cpu().numpy()] for k in range(steps): t = torch.full((n,1), (k + 0.5) * dt, device=device) x = x + net(x, t) * dt traj.append(x.cpu().numpy()) return traj # ------------------------- Matplotlib 工具 ------------------------- def save_loss(loss_hist, path): plt.figure(figsize=(6, 3.6)) plt.plot(loss_hist) plt.title("Training Loss (CFM)") plt.xlabel("step"); plt.ylabel("MSE") plt.tight_layout(); plt.savefig(path, dpi=140); plt.close() print(f"Saved {path}") def save_overview(src, tgt, gen, path): fig, axes = plt.subplots(1, 3, figsize=(12, 4)) titles = ["Source (Noise)", "Target (Two Gaussians)", "Generated (Flow Matching ODE)"] for ax, title, pts in zip(axes, titles, [src, tgt, gen]): ax.scatter(pts[:, 0], pts[:, 1], s=5, alpha=0.75) ax.set_title(title) ax.set_xlim(*XLIM); ax.set_ylim(*YLIM) ax.set_xticks([]); ax.set_yticks([]) plt.tight_layout(); plt.savefig(path, dpi=140); plt.close() print(f"Saved {path}") # (已移除 GIF 相关工具与依赖) # ------------------------- 主程序 ------------------------- if __name__ == "__main__": # 训练 net, loss_hist = train_cfm(steps=2000, batch=512, lr=1e-3) save_loss(loss_hist, "cfm_loss.png") # 数据与生成 src = sample_source(3000).cpu().numpy() tgt = sample_target(3000).cpu().numpy() traj = generate_traj(net, n=3000, steps=60) gen = traj[-1] # 三联静态图 save_overview(src, tgt, gen, "cfm_overview.png") # (已移除 GIF 生成步骤) print("All done.") (1)模型结构 模型结构为VelocityNet,使用了一个小型 MLP,输入 3 维(x 的 2 维 + t 的 1 维),输出 2 维速度向量。结构为Linear(3,64) → ReLU → Linear(64,64) → ReLU → Linear(64,2)。forward(x,t) 直接拼接 [x, t] 后送入网络。这里没有使用时间位置编码。 (2)训练过程 训练函数为train_cfm(steps=2000, batch=512, lr=1e-3),具体过程如下: 1) 每步采样源 x0 ~ source 和目标 x1 ~ target,独立均匀采样 t~U(0,1)。 2) 构造直线桥接点 xt = (1 - t)x0 + tx1。 3) 定义理想恒定速度 u = x1 - x0(常速,不依赖 t)。 4) 让网络在 (xt, t) 上预测 pred = vθ(xt,t),用 MSE(pred, u) 作为损失。 5) Adam 更新一次;每 200 步打印当前损失。 6) 返回训练好的 net 与 loss_hist。 直观理解,虽然 u 依赖 (x0, x1),但模型只观察 (xt,t)。训练学到的是条件期望 E[x1 - x0 | xt, t],也就是让网络在直线路径上学会把点往“正确方向”推的平均速度。这是直线路径 CFM 的核心思想。 (3)采样 采样函数为generate_traj,从源分布采样 n 个起点,设步长 dt=1/steps。用无梯度模式按欧拉法更新:对每步 k,用中点时间 t=(k+0.5)dt 预测速度 vθ(x,t),然后 x ← x + vθ(x,t)dt。记录每一步的点云到列表,返回整个轨迹(列表元素是 numpy 数组)。主程序中只使用最后一步作为“生成结果”。 (4)主流程 最后就是主流程先调 train_cfm 进行训练,保存 cfm_loss.png。分别采样 3000 个源样本 src 与目标样本 tgt。生成 n=3000、steps=60 的轨迹 traj,并取 gen = traj[-1] 作为最终生成样本。保存 cfm_overview.png,展示源/目标/生成的对比。 整体主要的实现点为 目标路径:x_t = (1 - t) x0 + t x1,直线连接源与目标。 理想速度:dx/dt = x1 - x0,使点沿直线以恒定速度匀速前进。 学习目标:在 (xt,t) 上回归 u = x1 - x0 的条件期望;推断时只需网络与当前状态,无需知道具体的 x0 或 x1。 数值积分:使用欧拉法简单高效;采用中点时间能略微减小离散误差。 -
Diffusion:如何从噪声中生成清晰图像
概述 图像生成是当下研究的热点,diffusion是一种人工智能领域图像生成的基础模型,当下Stable diffusion、DALL·E、MidJourney文生图模型的基座都使用了diffusion。 diffusion扩散模型属于生成式模型,生成图像不是正向从0到1构成图像而是反向的预先生成一个随机的噪声图中然后根据文本提示词逐渐的去噪"扣"出图像。主要思想是先训练一个权重模型,把一张清晰照片弄得越来越模糊(加入噪声),然后把模糊的图片融合文本提示词作为输入去训练一个模型学会“擦亮它”,反向恢复成清晰图像。训练完成后,就得到了模型的权重,那么使用这个权重模型只要给一副完全随机的“噪点图”和要生成图片的提示词,它就能一步步去掉噪声,变出一幅崭新、逼真的图片。 借用米开朗基罗雕刻"大卫像"时说的"我在大理石中看见天使,于是我不停地雕刻,直至使他自由”。而diffusion也是这样的原理,通过随机生成的一个噪声图片,结合输入的文字去掉噪音恢复到你想象的照片样子。 工作原理 推理 (1)输入阶段 输入阶段有3个输入信息,分别是随机噪声图像、文本提示、时间步。 随机噪声图像:最开始随机生成一个高斯噪声的图片。 文本提示:告诉模型,想要生成的内容是什么。 时间步:指明当前是去噪第几步,模型是一个多步迭代去噪的过程。按照数字依次递减进行迭代,数值越小去噪强度越弱。 (2)模型处理 核心组件是Noise Predictor(一般是一个U-Net结构神经网络),输入的带噪图像$X_{t}$、时间步$t$、以及提示文本通过Noise Predictor预测出这张图里有多少噪声,生成一张噪声图片$\epsilon^\theta(x_t, t, c)$。 (3)输出阶段 将输入-减去预测出的噪声图片就得到最后的去噪图片了,$x_{t-1} = x_t - \epsilon^\theta(x_t, t, c)$。 (4)迭代 迭代一轮得到一个降噪图片之后,接着将输出的降噪图片作为输入的带噪图片按照之前的步骤进行重复,直至$t$=$T$(比如$1000$)一直迭代到$t$=$0$得到最终的图像。当所有步骤完成后,随机噪声逐渐被“洗掉”,生成的就是一张符合条件描述的清晰图像。 下面是推理过程的算法伪代码 初始化:$x_T \sim \mathcal{N}(0, I)$从标准高斯分布中采样一个随机噪声向量(或噪声图像),作为生成过程的起点。 迭代循环:从$t$=$T$到$t$=$1$逐步迭代,每次去掉一部分噪声。如果$t$>$1$,额外采样一个噪声向量$z\sim \mathcal{N}(0, I)$。如果$t$=$1$,则$z$=$0$,即最后一步不加噪声。 核心公式:先去掉预测的噪声(括号里面的部分)得到更接近干净数据的样子,接着在进行缩放调整(除以$\sqrt{\alpha_t}$),最后加一点随机噪声$\sigma_t z$来保持生成的多样性。 输出:当循环结束时,最终的$x_0$就是最终生成的清晰图像了。 对于核心公式的参数这里稍微补充一下 参数 $\epsilon_\theta(x_t, t)$是预测的噪声; 参数$\alpha_t$取值范围是$0$~$1$,控制在第$t$步中保留多少原始图像信息加入多少噪声,当$\alpha_t$接近$1$时几乎保留全部信息,噪声小;当值趋于0时,原始信号衰减就大,噪声比例高; 参数$\bar{\alpha}_t$累积乘积参数,表示从第$1$步到$t$步累积保留原始信息的比例。 参数$\sigma_t z$随机扰动项,保持采样的多样性。 训练 训练模型我们需要把模型的输出结果和真实值进行比较才能进行梯度下降找到网络权重,那该如何设计准备训练结果和真实值的数据? diffusion模型的核心是要预测出图片的噪声分布然后减去预测的噪声得到真实的输出照片。以上图第一步进行说明,使用原始的图片,通过随机生成一个噪声图($x_{1}$)迭加作用到原始图片上这样就得到了模型的带噪声的输入图像,然后融合文本、时间步模型前向计算得到噪声图($x_2$)。已经知道了真实的噪声图是$x_{1}$,那么计算$x_{1}$和$x_{2}$的相似性就可以计算出损失了。 训练过程中关于图片-文本可以从Lion平台上获取,通过上面步骤取样照片然后不断加强噪声得到越来越模糊的图片送入模型预测进行计算迭代权重,让模型学会真正准确预测每一步中"加进去的噪声",训练完成之后,模型学会了如何"识别噪声",在推理时就从纯随机噪声$x_T$出发,通过文本提示词反向迭代去噪得到最终的想要的照片。 论文中的伪代码如下: repeat:表示循环执行训练过程。 采样数据:$x_0 \sim q(x_0)$从真实数据分布$q(x_0)$中采样一个训练样本比如一张猫。 随机采样时间步:$t \sim \text{Uniform}({1, \dots, T})$随机挑选一个扩散的时间步$t$,确保模型能在不同噪声水平都学会去噪。 采样噪声:$\epsilon \sim \mathcal{N}(0, I)$从标准的高斯分布中采样一份噪声,用于后续得到到原始图片上。 梯度下降更新参数:计算预测噪声和真实噪声$\epsilon$的均方误差。 模型 本章节简要说一下业界文生图模型,其结构可以总结为以上3个部分,文本编码器、生成式模型、解码器。 文本编码器:将用户输入的文本提示通过预训练的文本编码器如CLIP Text Encoder将自然语言转化为向量表示。 生成式模型:将编码的文本向量和噪声图像noisy latent作为输入,然后逐步迭代去噪。这里的模型如有diffusion、autoregressive等。输出是压缩到更低维的"潜在空间"。 解码器:将生成式模型的输入Latten Representation通过解码器还原最终生成清晰图像。生成式模型一般输出的是压缩的低维潜在空间,这样可以降低每一步迭代的计算量,最终加一个解码器来将其还原。 下面是stable diffusion、DALL-E、Imagen的模型结构图,核心组成都是上面3个部分,这里就不过多阐述了。 stable diffusion DALL-E Imagen 本文主要来自李宏毅Diffusion Model原理解析的笔记。 -
视觉 Token 如何注入语言模型?VLM拆解
VLM与LLM 如果说我们有一张图片、一个图表想让大模型来帮忙理解那应该要怎么实现了? 标准的LLM语言大模型只能处理文本序列,是不能够读取图像的,如果没有办法将视觉的数据转换为LLM能够理解的形式,那么LLM是无法处理的。需要注意的是我们这里说的LLM并不是transformer,LLM指的是大语言模型如DeepSeek,GPT,Qwen,其是使用了transformer架构应用,而transformer是一种神经网络架构。LLM的token专门指的是文本token来自Tokennizer其输入是字节流,而transformer不一定是文本单位,可以是任何序列元素如词、图像(上节说的ViT)等。 要解决语言大模型理解图片,那么这就是视觉-语言大模派上用场了。回顾一下我们此前说的ViT视觉大模型,是不是就是用提取图像特征的,因此本章节我们要介绍的正是视觉大模型与语言大模型的融合:vision language model,即视觉-语言大模型。 视觉-语言大模型是视觉大模型+语言大模型的结合,其主要有哪些用途?核心用处是让 AI 能够“读图如读文”,在多模态场景下实现理解、生成和交互,如下示例: 内容理解:多模态的问答VQA,比如给一张图让大模型理解图片里面描述了什么,让其识别图片里物体、动作、关系,自动生成图片说明(Image Captioning)等等。 信息获取与搜索:给一张图找对应的描述,或给一句话找到相关图片(比如电商商品搜索)以及搜索引擎文字搜图或图搜文字等。 模型结构 发展到今天有很多的视觉-语言大模型,各自都有自己的架构实现。我们先以VILA为例来说明一下视觉-语言大模型的关键组件,上图来自论文:VILA: On Pre-training for Visual Language Models。 上图我们先来分析一下其运作流程,可以分为左右两部分:左图可以看成是怎么跑起来的(数据流推理/前向),右图是如何训练的步骤。 数据流 左图:数据流推理 ViT: 首先将图像送入ViT视觉编码器,提取出视觉特征。 Projector:因为ViT输出的特征维度可能与LLM词嵌入维度不一致,所以这里也需要通过一个线性层/小MLP做映射,把视觉特征空间转换为LLM的嵌入空间为,为上图的visual tokens。 token融合:文本提示经过tokenizer转换为text tual tokens与visu tokens在同一序列中进行拼接或交错输入到LLM。 LLM生成:进入LLM后,视觉与文本已在同一token流中就可以共同参与计算注意力,最后输出最后的结果a cat。 训练策略 右图:训练策略 训练主要分为3个阶段,projector初始化,交错式预训练、监督微调,主要涉及projector和LLM模型参数更新,火焰代表参数会被更新,雪花代表冻结不更新。 Step 0 Projector初始化:只训Projector,LLM冻结,通常ViT也冻结,目的是先把视觉特征大致对齐到LLM词向量空间,避免一上来就动LLM破坏语言能力。 Step 1 交错式预训练:同时更新Projector与LLM,在包含图像-文本交错(图像token混在文本序列里)的数据上做自回归训练。更新LLM才参数才能让LLM学会"在文本上下文中使用视觉特征";图像和文本的输入进行交错能够教会模型跨模态对齐与引用。 Step2 监督微调:联合微调projector与LLM,输入数据是指令时的多模态问答/对话。这样可以把能力对齐到agent任务上,同时避免LLM文本能力退化。 小结 通过VILA架构为例,我们大概了解了VLM视觉-语言大模型的架构,我们总结下VLM模型架构主要可以分为三大部分: 视觉编码器:将视觉输入转换为结构化的数值表示,提取语义信息。如基于transformer架构的ViT,将图像分割成小块,通过transformer编码全局和局部特征;如传统基于CNN卷积神经网络ResNet,擅长提取局部纹理特征。 投影器:视觉和文本嵌入必须对齐到一个共享的多模态嵌入空间。通常由一个较小的模块完成,称为投影层或融合层:常见的实现方式有MLP通过全连接层转化维度(如DeepSeek-VL);交叉注意力机制通过动态关联图像区域与文本token(如llama 3.2 vision),增强空间理解。 LLM:接收图像+文本融合后的多模态输入,生成自然语言响应(如描述、答案、推理)。 QA1:这里的投影器projector与此前我们分析ViT中的projection线性投影有什么不一样? ViT中的projection作用是将图像分割后的每个小块线性映射为固定维度向量(token)作为transformer编码器的输入;而VLM的projector是将视觉编码器(如ViT)输出特征映射到语言大模型(LLM)的文本嵌入空间,解决跨模态语义鸿沟。一个是作用在ViT的输入映射为transformer的标准输入另外是一个作用再ViT的输出映射为LLM的标准输入。 QA2:为什么要将图像和文本进行融合多模态嵌入空间? 多模态嵌入空间是VLM具备推理能力的关键,通过在同一潜在空间表示视觉和文本信息,主要有以下优势: 上下文感知:使不同模态之间能够进行丰富的交互,这意味着模型能够将文本概念(例如,“公交车”、“十字路口”)准确地与视觉特征信息(公交车位置、颜色、十字路口)连续起来。 语义连接:将抽象的文本概念与具体的视觉示例进行对齐。例如模型不仅将“行人”理解为单词,还将其视为图像中可视觉识别的实体。 跨模态推理:允许模型在不同模态之间进行推理,回答复杂的视觉问题,进行逻辑推断,或检测微妙的视觉-文本差异。 模型预训练 训练史 先来看看视觉识别训练的发展,可以划分为5个阶段:传统机器学习与预测,深度学习从零训练与预测,监督式预训练、微调与预测,无监督预训练、微调与预测,视觉语言模型预训练与零样本预测。稍微总结一下各自特点。 传统机器学习与预测:需要人工设计学习特征。 深度学习从零训练与预测:从零自己标注大量数据(因为没法迁移),从零训练。 监督式预训练、微调与预测:预训练复用公开标注好的海量数据(可以迁移,所以可用公开别人标注好的海量数据),从零标注一些少量数进行微调。 无监督预训练、微调与预测:预训练数据集再扩大了,可以直接爬取互联网的数据进行训练,但还是需要从零标注一些少量数据进行微调。 视觉语言模型预训练与零样本预测:不需要进行微调了,那么也不需要标注的数据集了,做到零样本。 VLM的预训练与零样本预测方式与过往的相比,对下游视觉识别任务上实现零样本,去掉了微调的过程,那么这种方式就可以有效利用大规模的网络数据。 预训练架构 因为VLM有很多种模型架构,因此预训练的架构也有区别,下面列出常见的几种。 双塔式架构:视觉和文本模态分别通过独立的编码器处理(如ViT处理图像、BERT处理文本),模态交互仅发生在编码后的特征层面,在最后进行融合,典型的模型有CLIP、ALIGN等。 双分支架构:在独立编码器基础上引入动态交互模块,支持灵活切换双塔或单塔模式,实现任务自适应融合如VLMo、Mini-Gemini等。 单塔式架构:像和文本输入共享同一Transformer编码器,通过交叉注意力机制实现早期深度融合,典型的模型如ViLT,FLAVA等。 预训练目标 前面阐述了当前视觉-语言大模型通常采用预训练与零样本预测的方式。那么在视觉语言大模型(Vision-Language Models, VLM)中我们的预训练目标是什么了?所谓预训练目标(Pre-training Objectives)是让模型从海量无标注图文对中自动学习跨模态关联的核心机制。这些目标的目的建立视觉与语言模态的语义对齐,为下游任务(如视觉问答、图像描述)提供通用表征基础。而当前的训练目标大致可以分为3类:对比目标、生成目标、对齐目标。 对比目标:让模型学会"配对"正确的图文,并区分错误的组合,比如正样本匹配的图文对(如猫图 + “一只猫”),模型需让它们的特征向量高度相似;负样本不匹配的图文对(如猫图 + “一辆汽车”),模型需让它们的特征向量差异巨大。计算的损失函数为所有配对的相似度误差(如 InfoNCE损失),指导模型调整参数,代表模型有CLIP、ALIGN等,该方式一般适用于零样本分类、图文检索的模型。 生成目标:让模型“填空”或“创作”,通过预测缺失内容学习深层语义。具体输入通过mask遮住文本或图像,训练模型让其复原得到网络权重。该方式一般应用与图像描述、视觉问答(VQA)的模型。 对齐目标:让模型能够把句子的词精准对应到图中位置,要求最高。比如用目标检测框出识别图中的物体(如汽车),与文本中的词精确关联。该方式一般用于目标检测、语义分割等场景。 VLM模型 当前已经出现了很多视觉语言模型,各自的模型都具有独特的功能,在视觉语言研究领域和实际应用上扮演着重要的贡献,除了在第2章节我们介绍的VILA外,这里我们在本章节再补充举例几个进行简要说明一下。 CLIP 上图是CLIP模型,是一个典型的双塔式视觉-语言模型,由视觉编码器(ViT)和文本编码器(Transformer)等核心组件构成。通过预训练对比目标的方式学习实现图像与文本的跨膜态对齐,其核心创新点在于无需任务特定训练,直接利用自然语言提示(Pormt)完成零样本预测,支持识别训练数据中为出现的新类别。 从图中我们可以看成可以分为3个阶段,对比预训练、创建零样本分类器、零样本预测。 (1)对比预训练阶段 输入是海量的图文对,如图片输入狗+文本输入"pepper the aussie pup"。 编码:文本编码器(如transformer)将文本嵌入向量,图像编码器(如ViT/ResNet)将图像嵌入向量。 目标:图文预文本嵌入向量的点积度量图文相似性。通过对比损失(infoNCE)计算图文相似度矩阵。拉近匹配对(如对角线深蓝块,如狗图与"狗"文本),推远不匹配对(非对角线浅色块,如狗图与“汽车”文本)。 (2)创建零样本分类器 输入:新任务的类别标签(如 "dog", "bird", "car")。 处理:将标签转化为提示文本(如 "a photo of a {label}"),文本编码器生成所有标签的文本嵌入向量。 输出:得到一组文本嵌入,构成无需训练的分类器权重(传统模型需图像数据训练分类头) (3)零样本预测 输入:一张新图像(如鸟的图片)。 处理:图像编码器生成图像嵌入向量(左侧绿色向量),计算该向量与所有类别文本嵌入相似度。 输出:选择相似度最高的文本标签作为预测结果(如输出 "a photo of a bird")。 总结一下就是,通过上面的预训练,将配对的图文靠近,非配对的原理,学到语义对齐的公共空间,这样在在推理时把“类别标签”也写成一句话,当作“文本查询”;用这句“查询”去和图像向量比相似度,谁最像选谁。 LLaVA LLaVA是把视觉模型提取的图像特征通过一个映射层转成语言模型能理解的 token,然后和用户的语言指令一起输入到大语言模型(LLM),从而实现图像理解与多模态对话。其架构主要由Vision Encoder(视觉编码器)、Projector(视觉特征投影)、Language Instruction(语言指令输入)、LLM大模型几个组件构成,跟我们前面第2章节总结的结构类似,这里就不过多阐述了。下面简要说一下流程: 输入图像:输入的图像通过Vision Encoder提取特征$Z_{v}$。具体来说,预训练用的是CLIP模型的视觉编码器ViT-L/14。 特征投影:通过projector W提取的图像特征$Z_{v}$转换成LLM能够处理的token表示$H_{v}$。 输入指令:用户文本$X_{q}$转换为token表示$H_{q}$。 拼接输入:将[$H_{v}$,$H_{q}$]拼接一起送入LLM。 语言生成:LLM输出语言响应$X_{a}$,完成图像理解+问答。 LLaVA 是一个用于对齐视觉和语言数据以处理复杂多模态任务的复杂模型。它采用独特的方法,将图像处理与大型语言模型融合,以增强其解释和响应图像相关查询的能力。通过利用文本和视觉表示,LLaVA 在视觉问答、交互式图像生成以及涉及图像的基于对话的任务中表现出色。其与强大语言模型的集成使其能够生成详细描述,并协助实时视觉语言交互。 参考: 1. An Introduction to Vision-Language Modeling 2. Vision Language Transformers: A Survey 3. Understanding Vision-Language Models (VLMs): A Practical Guide 4. Guide to Vision-Language Models (VLMs) -
解读ViT:Transformer 在视觉领域如何落地
背景 计算机视觉领域,一直都是卷积天下。传统的卷积神经网络(CNN)依赖于卷积核提取局部特征,效果很好,但是也有一些不足,如需要人工设计卷积结构包括卷积核大小和层数,另外就是难以捕捉全局的依赖关系。 transformer最早更多的是应用在NLP领域的架构,用注意力机制来捕捉长距离的依赖。那把transformer应用在视觉领域了,会有什么效果吗?而在2021年发表的https://arxiv.org/abs/2010.11929这篇论文就是使用transformer应用在图像识别的领域。 论文中提到基于transformer使用监督学习方式训练模型进行图像分类时,在中等规模数据集(如ImageNet)上如果没有使用强正则化其准确率略低于同等规模的ResNet。但是当加大数据集(1400W至3亿张图像)训练时,发现其识别水平超越了现有技术。 模型概览 上图就是整个ViT模型结构了,对transformer比较熟悉的,整个结构就很简单了。可以发现只有transformer encoder没有transformer decoder。 这里先整体看看其流程步骤: 图像切块:原图输入为224x224分辨率的图像,将其切分为14x14共196块的(如使用卷积),每块大小的分辨率为16x16。 分块展平:将每块为16x16分辨率的patch展平为一维向量,共计有196个这样的向量。由于每块是RGB 3通道图像,因此向量维度为16x16x3= 768,按照RGB排布进行展开为一维向量。因此最后的数据形状为(196,768)。 线性投影:对每个patch的向量乘以一个权重矩阵,映射到D维的embedding空间,这个D维跟transformer输入维度一致(默认是512)。因此经过转换后的数据就变成了(196,768)->(196,512)。 位置编码:对经过线性映射的patch加上位置编码,每个patch一个位置向量,其向量的维度与patch维度一致,总的位置编码矩阵为(196,512)。将这个位置编码与经过线性映射的进行相加得到输入。 编码输入:经过位置编码后的输入然后在最开始加上了[CLS]向量送入编码器。因此输入的数据为(197,512)。如果算上批量数据最后就是(B,197,768)。B为batch size,197为patch数,512为embedding维度。 编码输出:最后经过多层感知机MLP得到最后的输出,如果是分类任务的话,就是(B,C)结果,B为batch size,C为类别数。也就是结果每行就是一个概率分布。 常见问题 (1)图像是如何切分展平的? 以输入尺寸3x224x224的RGB图像为例,块大小为16x16,因此块的数量为14x14=196个块。每个块3x16x16被拉成一维向量长度为16x16x3=768,也就是每个块被展平为768维向量,一共有196个块,也就是说转换为(196,768)的矩阵。 (2)每个patch为什么要展平? 主要是transformer的输入要求,因为transformer是序列处理器,其输入必现是一维的向量序列,而图像分块后得到的每个块是二维矩阵。还记得在transformer实现文章中吗?输入的是(seq,d_model),seq为token的数量,而d_model为每个token嵌入的向量。当然这里的图像最后还需要经过映射降维跟这里的d_model保持一致,这样才能输入到transformer的编码器中。 (3)线性投影有什么作用? 主要有两个作用,其一是图像分块展平后得到的是高维稀疏向量(如16163=768),包含了大量冗余信息如局部宽高、噪声等,缺乏高层语义表达,数据量大,计算量也大,线性投影是一个可训练全连接权重矩阵,可以提取保留关键局部特征;其二是为了适配transformer输入结构,Transformer要求输入为固定维度向量序列(如 D=512)。线性投影统一所有图像块的输出维度,确保自注意力机制可计算。 (4)这里的位置编码与transformer的有什么不同吗? ViT中的位置编码使用的是自适应位置编码,transformer中用的是正余弦固定公式,因为ViT中的输入序列位置一般都有限,因此用1D的可学习的位置编码即可,这个位置编码是一个可学习的参数矩阵,初始化为全0,在训练过程中通过反向传播自动优化。 (5)输出的MLP与transformer FFN有什么不同吗? 基本一样的,FFN是前馈神经网络的统称,MLP是具体的前馈神经网络具体实现特指全连接网络。 (6)最后的输出是什么样的? ViT最后的输出结构根据实际任务需求有关,如果是图像分类任务,在最终输出是[CLS] token向量经 MLP Head映射后的logits(未归一化的类别分数),形状为 [B, K](K为类别数); (7)整个处理流程数据变化是怎么样的? 处理阶段 输入形状 操作 输出形状 示例值(B=64) 原始输入 [B, C, H, W] — [64, 3, 224, 224] Patch分块 + 展平 [B, C, H, W] 卷积核尺寸=步长=P(如 16×16) [B, N, P²·C] [64, 196, 768] 线性投影(Patch Embedding) [B, N, P²·C] 全连接层映射至目标维度 D=512 [B, N, D] [64, 196, 512] 添加 Class Token [B, N, D] 序列前拼接可学习的 [CLS] 向量 [B, N+1, D] [64, 197, 512] 位置编码叠加 [B, N+1, D] 加可学习位置编码 E_{pos} ∈ ℝ^{1×(N+1)×D} [B, N+1, D] [64, 197, 512] Transformer 编码器 [B, N+1, D] 多头自注意力(MSA) + MLP 前馈网络 [B, N+1, D] [64, 197, 512] 分类头输出 [B, D](仅取 [CLS]) 全连接层映射至类别数 K [B, K] [64, 1000] -
lerobot之smolvla体验
环境安装 pip install -e ".[smolvla]" 在原来lerobot的环境基础上。 启动训练 本文主要是记录复现lerobot smolvla策略的效果,为了快速看到效果,这里不进行采集数据了,直接用此前ACT采集的数据,将数据打包放到autodl云服务器上进行训练。 python src/lerobot/scripts/train.py \ --dataset.root=/root/autodl-tmp/lerobot/data/record-07271539 \ --dataset.repo_id=laumy/record-07271539 \ --policy.push_to_hub=false \ --policy.path=lerobot/smolvla_base \ --policy.device=cuda \ --output_dir=outputs/train/smolvla_test \ --job_name=smolvla_test --batch_size=64 \ --steps=20000 --wandb.enable=false 或者 python -m lerobot.scripts.train \ --policy.type=smolvla \ --policy.vlm_model_name=HuggingFaceTB/SmolVLM2-500M-Video-Instruct \ --policy.load_vlm_weights=true \ --policy.num_vlm_layers=16 \ --policy.num_expert_layers=8 \ --dataset.repo_id=laumy/record-07271539 \ --output_dir=outputs/train/smolvla_test2 \ --job_name=smolvla_test \ --batch_size=64 --steps=20000 --wandb.enable=false 如果数据集在huggingface上面,则需要先登陆hugging face huggingface-cli login 填写token. python src/lerobot/scripts/train.py \ --dataset.repo_id=laumy0929/grab_candy_or_lemon \ --policy.path=lerobot/smolvla_base \ --policy.device=cuda \ --policy.repo_id=laumy0929/smolvla_test \ --output_dir=outputs/train/smolvla_test \ --job_name=smolvla_test --batch_size=64 \ --steps=20000 --wandb.enable=false 关于数据集的获取取决于两个参数,一个是repo_id另外一个是dataset.root。 repo_id: 必填字段,是在 Hugging Face Hub 上的数据集标识(datasets 仓库名)。 dataset.root :选填字段,是本地数据集所在目录。 训练首先从 dataset.root 读取本地数据;如果本地缺失需要的文件,才会用 repo_id 到 Hub 拉取缺的内容到这个 root 目录里。 下面有几个场景。 如果同时给定了dataset.root和dataset.repo_id 如果 root 目录已经是规范的 LeRobot v2 数据集结构(有 meta/info.json、data/.parquet、可选 videos/.mp4),会直接用本地文件,不会下载。 如果本地缺少 meta(或部分 data 文件),代码会用 repo_id 从 Hub 把缺的部分同步到你指定的 root 目录后再加载。 如果只传dataset.repo_id 会把本地根目录设为默认缓存:~/.cache/huggingface/lerobot/{repo_id}(若设置了环境变量 LEROBOT_HOME,则用 $LEROBOT_HOME/{repo_id}),如果本地缓存里已经有完整数据,则直接用本地文件,不再下载。如果本地没有缓存,远端也没有数据,就会报错。 推理验证 python -m lerobot.record \ --robot.type=so101_follower \ --robot.disable_torque_on_disconnect=true \ --robot.port=/dev/ttyACM0 --robot.cameras="{ handeye: {type: opencv, index_or_path: 4, width: 640, height: 480, fps: 30}, fixed: {type: opencv, index_or_path: 6, width: 640, height: 480, fps: 30}}" \ --robot.id=R12252801 \ --display_data=false \ --dataset.single_task="Grab the cube" \ --policy.path=outputs/smolvla_weigh_08181710/pretrained_model \ --dataset.episode_time_s=240 \ --dataset.repo_id=laumy/eval_smolvla_08181710 常见问题 训练报错如下: Traceback (most recent call last): File "/root/miniconda3/envs/lerobot/lib/python3.10/site-packages/urllib3/connection.py", line 198, in _new_conn sock = connection.create_connection( File "/root/miniconda3/envs/lerobot/lib/python3.10/site-packages/urllib3/util/connection.py", line 85, in create_connection raise err File "/root/miniconda3/envs/lerobot/lib/python3.10/site-packages/urllib3/util/connection.py", line 73, in create_connection sock.connect(sa) TimeoutError: [Errno 110] Connection timed out The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/root/miniconda3/envs/lerobot/lib/python3.10/site-packages/urllib3/connectionpool.py", line 787, in urlopen response = self._make_request( File "/root/miniconda3/envs/lerobot/lib/python3.10/site-packages/urllib3/connectionpool.py", line 488, in _make_request raise new_e File "/root/miniconda3/envs/lerobot/lib/python3.10/site-packages/urllib3/connectionpool.py", line 464, in _make_request self._validate_conn(conn) File "/root/miniconda3/envs/lerobot/lib/python3.10/site-packages/urllib3/connectionpool.py", line 1093, in _validate_conn conn.connect() File "/root/miniconda3/envs/lerobot/lib/python3.10/site-packages/urllib3/connection.py", line 753, in connect self.sock = sock = self._new_conn() File "/root/miniconda3/envs/lerobot/lib/python3.10/site-packages/urllib3/connection.py", line 207, in _new_conn raise ConnectTimeoutError( urllib3.exceptions.ConnectTimeoutError: (<urllib3.connection.HTTPSConnection object at 0x7fe651566380>, 'Connection to huggingface.co timed out. (connect timeout=None)') The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/root/miniconda3/envs/lerobot/lib/python3.10/site-packages/requests/adapters.py", line 667, in send resp = conn.urlopen( File "/root/miniconda3/envs/lerobot/lib/python3.10/site-packages/urllib3/connectionpool.py", line 841, in urlopen retries = retries.increment( File "/root/miniconda3/envs/lerobot/lib/python3.10/site-packages/urllib3/util/retry.py", line 519, in increment raise MaxRetryError(_pool, url, reason) from reason # type: ignore[arg-type] urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='huggingface.co', port=443): Max retries exceeded with url: /api/models/HuggingFaceTB/SmolVLM2-500M-Video-Instruct/tree/main/additional_chat_templates?recursive=False&expand=False (Caused by ConnectTimeoutError(<urllib3.connection.HTTPSConnection object at 0x7fe651566380>, 'Connection to huggingface.co timed out. (connect timeout=None)')) During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/root/autodl-tmp/lerobot/src/lerobot/scripts/train.py", line 291, in <module> train() File "/root/autodl-tmp/lerobot/src/lerobot/configs/parser.py", line 226, in wrapper_inner response = fn(cfg, *args, **kwargs) File "/root/autodl-tmp/lerobot/src/lerobot/scripts/train.py", line 139, in train policy = make_policy( File "/root/autodl-tmp/lerobot/src/lerobot/policies/factory.py", line 168, in make_policy policy = policy_cls.from_pretrained(**kwargs) File "/root/autodl-tmp/lerobot/src/lerobot/policies/pretrained.py", line 101, in from_pretrained instance = cls(config, **kwargs) File "/root/autodl-tmp/lerobot/src/lerobot/policies/smolvla/modeling_smolvla.py", line 356, in __init__ self.language_tokenizer = AutoProcessor.from_pretrained(self.config.vlm_model_name).tokenizer File "/root/miniconda3/envs/lerobot/lib/python3.10/site-packages/transformers/models/auto/processing_auto.py", line 288, in from_pretrained config_dict, _ = ProcessorMixin.get_processor_dict(pretrained_model_name_or_path, **kwargs) File "/root/miniconda3/envs/lerobot/lib/python3.10/site-packages/transformers/processing_utils.py", line 873, in get_processor_dict for template in list_repo_templates( File "/root/miniconda3/envs/lerobot/lib/python3.10/site-packages/transformers/utils/hub.py", line 161, in list_repo_templates return [ File "/root/miniconda3/envs/lerobot/lib/python3.10/site-packages/transformers/utils/hub.py", line 161, in <listcomp> return [ File "/root/miniconda3/envs/lerobot/lib/python3.10/site-packages/huggingface_hub/hf_api.py", line 3168, in list_repo_tree for path_info in paginate(path=tree_url, headers=headers, params={"recursive": recursive, "expand": expand}): File "/root/miniconda3/envs/lerobot/lib/python3.10/site-packages/huggingface_hub/utils/_pagination.py", line 36, in paginate r = session.get(path, params=params, headers=headers) File "/root/miniconda3/envs/lerobot/lib/python3.10/site-packages/requests/sessions.py", line 602, in get return self.request("GET", url, **kwargs) File "/root/miniconda3/envs/lerobot/lib/python3.10/site-packages/requests/sessions.py", line 589, in request resp = self.send(prep, **send_kwargs) File "/root/miniconda3/envs/lerobot/lib/python3.10/site-packages/requests/sessions.py", line 703, in send r = adapter.send(request, **kwargs) File "/root/miniconda3/envs/lerobot/lib/python3.10/site-packages/huggingface_hub/utils/_http.py", line 96, in send return super().send(request, *args, **kwargs) File "/root/miniconda3/envs/lerobot/lib/python3.10/site-packages/requests/adapters.py", line 688, in send raise ConnectTimeout(e, request=request) requests.exceptions.ConnectTimeout: (MaxRetryError("HTTPSConnectionPool(host='huggingface.co', port=443): Max retries exceeded with url: /api/models/HuggingFaceTB/SmolVLM2-500M-Video-Instruct/tree/main/additional_chat_templates?recursive=False&expand=False (Caused by ConnectTimeoutError(<urllib3.connection.HTTPSConnection object at 0x7fe651566380>, 'Connection to huggingface.co timed out. (connect timeout=None)'))"), '(Request ID: 7f4d5747-ec95-47cc-a55f-cb3e230c52e2)') 原因是训练在初始化 SmolVLA 的 VLM 时需要从 Hugging Face Hub 拉取资源(AutoProcessor.from_pretrained 默认用 vlm_model_name=HuggingFaceTB/SmolVLM2-500M-Video-Instruct)。你的机器连到 huggingface.co 超时,导致下载失败并报 ConnectTimeout。 解决办法:export HF_ENDPOINT=https://hf-mirror.com 把原本指向 https://huggingface.co 的所有 Hub 请求(模型/数据集下载、API 调用)改走 https://hf-mirror.com。作用范围仅当前这个终端会话。关闭终端或开新终端就失效。 训练过程过程中警告 huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks... To disable this warning, you can either: - Avoid using `tokenizers` before the fork if possible - Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false) huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks... To disable this warning, you can either: - Avoid using `tokenizers` before the fork if possible - Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false) huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks... To disable this warning, you can either: - Avoid using `tokenizers` before the fork if possible - Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false) huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks... To disable this warning, you can either: - Avoid using `tokenizers` before the fork if possible - Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false) huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks... To disable this warning, you can either: - Avoid using `tokenizers` before the fork if possible - Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false) huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks... To disable this warning, you can either: - Avoid using `tokenizers` before the fork if possible - Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false) huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks... To disable this warning, you can either: - Avoid using `tokenizers` before the fork if possible - Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false) huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks... To disable this warning, you can either: - Avoid using `tokenizers` before the fork if possible - Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false) huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks... To disable this warning, you can either: - Avoid using `tokenizers` before the fork if possible - Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false) huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks... To disable this warning, you can either: - Avoid using `tokenizers` before the fork if possible - Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false) TOKENIZERS_PARALLELISM是分词器在一次调用会开多线程并行处理文本。分词器库是hugging Face的分词器库,负责把文本指令变成模型可用的token id序列,也能把id还原会文本,跟我们此前在一步步实现transformer 的词表类型。出现这样的警告是tokenizers它开了多线程并发,而 DataLoader 再 fork 出子进程并发(本身DataLoader是可以并发),这样容易有死锁风险,为安全起见,库检测到这种顺序就自动把自己的多线程并行关掉,并给出提示。如果要关掉tokenizers的多线程并发,export TOKENIZERS_PARALLELISM=false。 -
从零实现 Transformer:中英文翻译实例
概述 在http://www.laumy.tech/2458.html#h37章节中,介绍了transformer的原理,本章用pytorch来实现一个将"我有一个苹果"翻译为英文"I have an apple"的模型,直观体会transformer原理实现。 接下来先上图看看整体的代码流程。 推理 训练 模型 编解码器 到这里就涵盖了整个transformer模型翻译的例子了,下面的章节只是对图中的代码进行展开说明,如果不想陷入细节,可以直接跳转到最后一节获取源码运行实验一下。 数据预处理 数据准备 (1) 准备原始文本对 既然要做翻译那得先有数据用于模型训练,因此需要先准备原始的中文->英文的文本对,下面是使用python列表(List)准备中英匹配语料,List中包含的是元组(Tuple)。 pairs = [ ("我 有 一个 苹果", "i have an apple"), ("我 有 一本 书", "i have a book"), ("你 有 一个 苹果", "you have an apple"), ("他 有 一个 苹果", "he has an apple"), ("她 有 一个 苹果", "she has an apple"), ("我们 有 一个 苹果", "we have an apple"), ("我 喜欢 苹果", "i like apples"), ("我 吃 苹果", "i eat apples"), ("你 喜欢 书", "you like books"), ("我 喜欢 书", "i like books"), ("我 有 两个 苹果", "i have two apples"), ("我 有 红色 苹果", "i have red apples"), ] 为了方便,在构建原始文本对时,中英文的分词就以空格划分,这样接下来就可以根据空格来进行构建词表。 (2)构建词表 因为神经网络不能直接处理文本,模型只能处理数字,比如不能直接处理"我"、"有","I"等中英文词,对于计算机来讲都是数字,所以需要把文字转换为对应的映射表。 所以词表就是一个"字典",把每个词映射到一个唯一的数字ID上,所有的文本都需要转换为数字序列。 如下示例,中英文的编号。 # 中文词表示例 SRC_STOI = { "我": 1, "有": 2, "一个": 3, "苹果": 4, "书": 5, "喜欢": 6, # ... 更多词 } # 英文词表示例 TGT_STOI = { "i": 1, "have": 2, "an": 3, "apple": 4, "a": 5, "book": 6, # ... 更多词 } 如何构建词表了。既然中文、英文都需要各自编号,那么得先把此前准备的原始文本队中文、英文各自拆出来,然后我们使用python的set集合,将中文、英文分别添加到set集合中,使用set集合的好处是可以自动去重,添加了重复元素,set就不会添加,这样就得到了各自的中文、英文词表。最后再对这些词表进行依次编号即可。 下面就看看使用python代码怎么实现,首先是将原始文本对拆解,把中文放一起,英文放一起。 src_texts = [p[0] for p in pairs] tgt_texts = [p[1] for p in pairs] print(src_texts) print(tgt_texts) src_texts ['我 有 一个 苹果', '我 有 一本 书', '你 有 一个 苹果', '他 有 一个 苹果', '她 有 一个 苹果', '我们 有 一个 苹果', '我 喜欢 苹果', '我 吃 苹果', '你 喜欢 书', '我 喜欢 书', '我 有 两个 苹果', '我 有 红色 苹果'] tgt_texts ['i have an apple', 'i have a book', 'you have an apple', 'he has an apple', 'she has an apple', 'we have an apple', 'i like apples', 'i eat apples', 'you like books', 'i like books', 'i have two apples', 'i have red apples'] 接下来实现一个build_vocab函数,主要的思路就是句子先按照空格进行分好词,接着将所有词添加到set集合中,set集合会自动去重,这里需要注意的时,需要再加上3个特殊的词,分别是pad、bos、eos分别表示填充、开始、结束。填充是因为输入句子是不定长的,但是对于transformer来说所有的输入矩阵处理都是固定长度,所以不够的需要补齐,而bos和eos是用于transformer解码的,便于开始和结束翻译过程,最后构建好词表后就按照词表中进行变化,3个特殊词分为为1、2、3其他的词依次编号。 def build_vocab(examples: List[str]): """构建词表(字符串→索引 与 索引→字符串) - 输入示例为用空格分词后的句子列表 - 加入特殊符号 `<pad>`, `<bos>`, `<eos>` 并将其它 token 排序,保证可复现 返回: stoi: dict[token->id] itos: List[id->token] """ tokens = set() # 建立一个集合,用于存储所有的词表(不重复的词) for s in examples: # 依次遍历获得每个句子 for t in s.split(): # 通过空格划分,依次遍历句子中的每个词, tokens.add(t.lower()) # 将词添加到set中,这里为了方便统一转换小写 itos = ["<pad>", "<bos>", "<eos>"] + sorted(tokens) # 加入3个特殊的词,同时对set中的词进行排序。 stoi = {t: i for i, t in enumerate(itos)} # 对词表中的词按照顺序依次编号 return stoi, itos SRC_STOI, SRC_ITOS = build_vocab(src_texts) TGT_STOI, TGT_ITOS = build_vocab(tgt_texts) build_vocab最终返回是一个字典和列表,字典是词:编号的映射,列表是存放的是词表。列表是按照编号顺序依次排布,这样我们可以通过编号定位到时那个词。 为什么要一个字典和列表了?因为transformer输入是词->编号(转换为编码数字给计算机处理),输出是编号->词过程(转化为句子给人看)。通过字典我们可以查询词对应的编号[key:value],而通过列表的索引(编号)我们可以查询到对应的词。 中文和英文分别各自对应一个字典和词表。 SRC_STOI {'<pad>': 0, '<bos>': 1, '<eos>': 2, '一个': 3, '一本': 4, '两个': 5, '书': 6, '他': 7, '你': 8, '吃': 9, '喜欢': 10, '她': 11, '我': 12, '我们': 13, '有': 14, '红色': 15, '苹果': 16} SRC_ITOS ['<pad>', '<bos>', '<eos>', '一个', '一本', '两个', '书', '他', '你', '吃', '喜欢', '她', '我', '我们', '有', '红色', '苹果'] TGT_STOI {'<pad>': 0, '<bos>': 1, '<eos>': 2, 'a': 3, 'an': 4, 'apple': 5, 'apples': 6, 'book': 7, 'books': 8, 'eat': 9, 'has': 10, 'have': 11, 'he': 12, 'i': 13, 'like': 14, 'red': 15, 'she': 16, 'two': 17, 'we': 18, 'you': 19} TGT_ITOS ['<pad>', '<bos>', '<eos>', 'a', 'an', 'apple', 'apples', 'book', 'books', 'eat', 'has', 'have', 'he', 'i', 'like', 'red', 'she', 'two', 'we', 'you'] 这样我们就给中文和英文的所有词都编好号了,同时通过列表也可以通过编号查询到词。 数据加载器 在pytorch中模型训练那必然少不了DataLoader和Dataset,关于这两个类的介绍在http://www.laumy.tech/2491.html#h23中有简要说明,这里就不阐述了。注意本小节说明的数据的批量处理都适用于训练准备,主要是实现Dataset和Dataloader用于pytorch模型的训练,如果只是推理则是不需要的。 (1)Dataset继承类实现 首先要实现DataLoader中关键的输入类Dataset继承类,用于产出“单个样本”,怎么按索引取到一个样本,以及总共有多少个样本。每个样本是中文句子->英文句子。样本集为此前定义pairs,但是要把pairs中句子转换为编号,词表在前面我们已经构建好了,直接查询就行,那这里我们定义一个Example用于定义样本,src是中文句子的编号列表,tgt是对于英文句子的编号列表。 @dataclass class Example: """单条并行样本 - src: 源语言索引序列(不含 BOS/EOS) - tgt: 目标语言索引序列(含 BOS/EOS) """ src: List[int] tgt: List[int] 接下来就是实现Dataset的继承类ToyDataset,返回有多少个样本,以及通过编号获取指定的样本。 class ToyDataset(Dataset): """语料数据集,用于快速过拟合演示。""" def __init__(self, pairs: List[Tuple[str, str]]): self.data = [Example(encode_src(s), encode_tgt(t)) for s, t in pairs] def __len__(self): return len(self.data) def __getitem__(self, idx): return self.data[idx] 需要把pairs句子中词列表编码为数字列表,这里实现encode_src用于将输入(即pairs中的中文)编号为列表,再实现encode_tgt将输出(即pairs中的英文)编号为列表。使用for列表推导式从pairs列表中获取到s(中文句子)和t(英文句子)然后传入encode_src和encoder_tgt进而构建一个新的列表元素Example。这样就组建样本的self.data的样本列表,元素为Example类型,可以通过idx获取到指定的样本。 def encode_src(s: str) -> List[int]: """将原语句(已空格分词)编码为索引序列(不含 BOS/EOS)。""" return [SRC_STOI[w.lower()] for w in s.split()] def encode_tgt(s: str) -> List[int]: """将目标语句编码为索引序列,并在首尾添加 BOS/EOS。""" return [BOS_IDX] + [TGT_STOI[w.lower()] for w in s.split()] + [EOS_IDX] 上面就是输入句子编码为编号向量的实现了,也很简单,通过此前构建的词表字典,通过词就可以搜索到对应编号了。这里需要注意的是编码的源句子(输入)是没有包含BOS和EOS的,因为transformer的编码器不需要BOS和EOS,而编码的目标句子(输出)需要在句子前加上BOS,句子结尾加上EOS,因为transformer的解码器输入需要通过BOS来翻译第一个词,通过EOS来结束一个句子的翻译,要是不明白为什么了可以看看前面transformer原理的文章。 (2)Dataload DataLoader 负责“成批取样”,模型训练输入数据不是一个样本一个样本的送入训练,而是按照批次(多个样本合成一个批次)进行训练,这样训练效率才高。DataLoader决定批大小、是否打乱、多进程加载,返回的是一个可迭代的对象。 DataLoader重点是要实现 collate_fn回调,也就是怎么把一个批里的样本“拼起来”。 loader = DataLoader(dataset, batch_size=8, shuffle=True, collate_fn=collate_fn) 训练transformer,准备数据。我们的目的是要能够返回批量数据,批量数据也有好几个类型。 输入给encoder批量数据:输入矩阵类型(B,S),包含补齐的padding。 输入给decoder的批量数据:输入给decoder的矩阵类型(B,T),包含BOS以及右对齐的padding。不能加EOS,因为EOS是预测的结果,防止模型训练作弊。 decoder输出的批量数据:解码器的监督目标,主要用于预测数据与实际的结果比较计算损失,矩阵类型(B,T),不含BOS但是包含EOS。 encoder输入的pad掩码数据:因为输入给encoder的数据有padding,所以要告诉transformer哪些做了补齐,后续计算的时候要处理。 decoder输入的pad掩码数据:同上。 def collate_fn(batch: List[Example]): """将一个 batch 的样本对齐为等长张量,并构造 teacher forcing 所需的输入/输出。 返回: - src: (B,S) 源序列,已 padding - tgt_in: (B,T) 解码器输入(含 BOS,右对齐 padding) - tgt_out: (B,T) 解码器监督目标(对 tgt_in 右移一位,含 EOS) - src_pad_mask: (B,S) 源端 padding 掩码,True 表示 padding 位置 - tgt_pad_mask: (B,T) 目标端 padding 掩码(针对输入序列) """ # padding to max length in batch src_max = max(len(b.src) for b in batch) tgt_max = max(len(b.tgt) for b in batch) src_batch = [] tgt_in_batch = [] tgt_out_batch = [] for ex in batch: src = ex.src + [PAD_IDX] * (src_max - len(ex.src)) # Teacher forcing: shift-in, shift-out tgt_in = ex.tgt[:-1] + [PAD_IDX] * (tgt_max - 1 - len(ex.tgt[:-1])) tgt_out = ex.tgt[1:] + [PAD_IDX] * (tgt_max - 1 - len(ex.tgt[1:])) src_batch.append(src) tgt_in_batch.append(tgt_in) tgt_out_batch.append(tgt_out) src = torch.tensor(src_batch, dtype=torch.long) # (B, S) tgt_in = torch.tensor(tgt_in_batch, dtype=torch.long) # (B, T_in) tgt_out = torch.tensor(tgt_out_batch, dtype=torch.long) # (B, T_out) src_pad_mask = src.eq(PAD_IDX) # (B, S) tgt_pad_mask = tgt_in.eq(PAD_IDX) # (B, T) return src, tgt_in, tgt_out, src_pad_mask, tgt_pad_mask 上面就是Dataloader回调函数如何获取批量数据的实现了,输入为一个列表(包含所有样本的列表)。输出为5个2维向量,分别对应的就是上面说的5个批量数据。 首先计算样本列表中最长的源序列长度src_max和目标序列长度tgt_max,为后续的不足长度的句子进行padding操作,提供基准的长度。 其次使用for循环遍历每个样本(Example),将源序列src(encoder的输入)使用PAD_IDX填充到相同长度,保持做对齐;将目标序列输入(tgt_in)去掉最后一个token(EOS)作为decoder的输入,目标序列输出比对样本tgb_out去掉第一个tokenBOS作为监督目标,使用的teacher Forcing机制,这样就是实现了输入预测下一个的训练模式数据准备。 最后就是准备src和tgt_in的mask矩阵,形状跟src和tgt_in一样,使用python的eq比对如果对应的位置是padding就是true,不是就是false。 模型架构 数据准备好了,接下来就是设计我们的模型了。我们的模型是一个翻译模型可以分为两个路径,一个是编码路径和解码路径。 编码路径:词嵌入->位置编码->编码器。 解码路径:词嵌入->位置编码->解码器->生成器。 Class Seq2SeqTransformer(nn.Module): def __init__(self, src_vocab_size, tgt_vocab_size, d_model=128, nhead=4, num_encoder_layers=2, num_decoder_layers=2, dim_ff=256, dropout=0.1): super().__init__() self.d_model = d_model # 编码路径 # 1.词嵌入层,将tokenID转换为密集向量 self.src_tok = nn.Embedding(src_vocab_size, d_model, padding_idx=PAD_IDX) self.tgt_tok = nn.Embedding(tgt_vocab_size, d_model, padding_idx=PAD_IDX) # 2. 对输入添加位置信息 self.pos_enc = PositionalEncoding(d_model, dropout=dropout) # 3. 源序列的编码 self.encoder = Encoder(d_model, nhead, dim_ff, num_encoder_layers, dropout) # 解码路径 # 1. 解码生成目标序列 self.decoder = Decoder(d_model, nhead, dim_ff, num_decoder_layers, dropout) # 2. 将解码器输出转换为词表概率 self.generator = nn.Linear(d_model, tgt_vocab_size) 词嵌入直接调用的是神经网络的库nn.Embedding,其他部分都要自己实现,接下来我们会一一展开。下面我们需要先实现模型Seq2SeqTransformer的方法,主要包括如下: make_subsequent_mask:解码器因果掩码,不允许解码器看到未来。 forward: 模型前向传播的方法,pytorch训练的时候自动调用。 greedy_decode:模型推理方法,用于推理的应用。 因果掩码 为什么需要掩码了?主要是让模型不能看到未来的词。 推理阶段虽然是自回归一个一个输入然后一个一个迭代输出,但是在训练阶段,我们解码器的样本是全部一次性输入的。如下的步骤,我们虽然给到模型输入为:"BOS i have an apple ",但是每个步骤给到模型看到的不能是全部,否则给模型都看到输入结果了,那还谈啥预测,模型会偷懒直接就照搬就是一个映射过程了。如当输入BOS i 期望预测输出i have,如果没有掩码模型都看到全部的"BOS i have an apple ",就不是预测了,模型的参数也没法迭代了。 # 步骤1: 输入BOS → 期望输出i # 步骤2: 输入BOS i → 期望输出i have # 步骤3: 输入BOS i have → 期望输出i have an # 步骤4: 输入BOS i have an → 期望输出 i have an apple # 步骤5: 输入BOS i have an apple → 期望输出i have an apple EOS 哪有个问题,为什么我们输入的时候不按照要多少输入多少,为啥要全部一下给到输入?输入倒是可以要多少输入多少,但是要要考虑模型的并行训练,实际上上面的5个步骤在模型训练时是并行进行的,模型训练要的是训练参数,在某个阶段看到什么输入遇到什么输出,都分好类了自然可以并行的,所以这就需要结合掩码了,告诉模型那个步骤你能看到哪些? 总结一下mask的作用就是让模型不能看到未来的词,同时也是让模型不要对padding位进行误预测。 def make_subsequent_mask(self, sz: int) -> torch.Tensor: """构造大小为 (sz, sz) 的下三角因果掩码;True 为屏蔽(不允许看未来)。""" return torch.triu(torch.ones(sz, sz, dtype=torch.bool), diagonal=1) mask是要生成一个下三角形状,示例如下: # 对于序列长度4 mask = make_subsequent_mask(4) # 结果: # [[False, True, True, True], # 位置0: 只能看位置0 # [False, False, True, True], # 位置1: 能看位置0,1 # [False, False, False, True], # 位置2: 能看位置0,1,2 # [False, False, False, False]] # 位置3: 能看所有位置 前向传播 def forward(self, src, tgt_in, src_pad_mask, tgt_pad_mask): """训练/教师强制阶段的前向。 参数: - src: (B, S) 源 token id - tgt_in: (B, T) 目标端输入(以 BOS 开头) - src_pad_mask: (B, S) True 为 padding - tgt_pad_mask: (B, T) True 为 padding(针对 tgt_in) 返回: - logits: (B, T, V) 词表维度的分类分布 """ # 1) 词嵌入 + 位置编码 src_emb = self.pos_enc(self.src_tok(src)) # (B,S,C) tgt_emb = self.pos_enc(self.tgt_tok(tgt_in)) # (B,T,C) # 2) 编码:仅使用 key_padding_mask 屏蔽 padding memory = self.encoder(src_emb, src_key_padding_mask=src_pad_mask) # (B,S,C) # 3) 解码:自注意力需要因果掩码 + padding 掩码;交叉注意力需要 memory 的 padding 掩码 tgt_mask = self.make_subsequent_mask(tgt_in.size(1)).to(src.device) # (T,T) out = self.decoder( tgt_emb, memory, tgt_mask=tgt_mask, tgt_key_padding_mask=tgt_pad_mask, memory_key_padding_mask=src_pad_mask, ) # (B,T,C) logits = self.generator(out) return logits 上面就是模型的训练了,也比较简单,就是对输入词进行词嵌入+位置编码计算,然后送入编码器得到输出特征矩阵memory;给编码器输入的只是padding的掩码,因为不要提取padding的词; 其次生成因果掩码,将编码器的的特征矩阵输出结果memory以及解码器侧自身的输入给到解码器最终得到(B,T,C)的输出矩阵,其包含了最终输出结果词位置的隐藏信息; 最后调用self.generator(out)即线性变化得到输出目标词表的概率分布(B,T,V);后面就可以用其使用交叉熵跟目标结果进行比对计算损失了。 解码推理 @torch.no_grad() def greedy_decode(self, src_ids: List[int], max_len=20, device="cpu"): """在推理阶段进行贪心解码。 参数: - src_ids: 源端 token id 序列(不含 BOS/EOS) - max_len: 最大生成长度(含 BOS/EOS) - device: 运行设备 返回: - 生成的目标端 id 序列(含 BOS/EOS) """ #切换为评估模式,关闭dropout/batchnorm等随机性 self.eval() # 将源端token id序列转换为张量,并添加一个维度,如[1, 2, 3, 4] -> [[1, 2, 3, 4]] # 变为批维度的 (1, S);dtype 为 long 主要是以适配 nn.Embedding的输入格式。 src = torch.tensor(src_ids, dtype=torch.long, device=device).unsqueeze(0) # 生成一个跟src相同形状的mask矩阵,让编码器不要计算提取pandding的位置信息。 #按元素判断 src 是否等于 PAD_IDX,等于的位置为 True,不等的位置为 False。 src_pad_mask = src.eq(PAD_IDX) # 计算src_tok= src 经过词嵌入+位置编码后的结果 src_tok = self.src_tok(src) src_pos = self.pos_enc(src_tok) # 将该结果送入编码器,返回的memory就是编码器提取的特征向量。 # 输入编码器,即使没有填充(pandding)的token,也需要传入src_key_padding_mask。 memory = self.encoder(src_pos, src_key_padding_mask=src_pad_mask) # 初始化目标端token id序列,维度为(1,1),初始值为BOS_IDX # 表示目标端序列的开始,BOS_IDX=1 # 推理时输入是没有PAD,但是仍然需要tgt_pad_mask. ys = torch.tensor([[BOS_IDX]], dtype=torch.long, device=device) for _ in range(max_len - 1): #计算本次解码的Mask,跟ys形状一样。 tgt_pad_mask = ys.eq(PAD_IDX) # 计算本次因果掩码,把未来看到的token都屏蔽。 tgt_mask = self.make_subsequent_mask(ys.size(1)).to(device) # 可以看到当推理模式时,解码器输入token数量依次是1,2,3,4..... out = self.decoder( self.pos_enc(self.tgt_tok(ys)), memory, tgt_key_padding_mask=tgt_pad_mask, memory_key_padding_mask=src_pad_mask, ) # 转化为预测词的概率分布 logits = self.generator(out[:, -1:, :]) # 使用贪心选择概率最大的作为本次预测的目标 next_token = logits.argmax(-1) next_id = next_token.item() # 显示选择的token token_text = TGT_ITOS[next_id] if next_id < len(TGT_ITOS) else f"ID_{next_id}" print(f"选择: {token_text}({next_id})") ys = torch.cat([ys, next_token], dim=1) # 当下一个输出为EOS时表示结束,则退出。 if next_id == EOS_IDX: break return ys.squeeze(0).tolist() 上面代码的设计要点主要为几个部分: 编码信息提取:将要翻译的句子进行词嵌入,位置编码,然后送入编码器计算提出特征信息memory,最终给到解码器作为输入。 自回归生成:最开始使用BOS一个token+编码器此前计算的输出memory、掩码等信息输入给解码器,解码器预测得到一个输出,然后将输出拼接会此前BOS的后面形成解码器新的输入,以此循环进行预测,直至遇到EOS结束。解侧输入序列长度逐步增长:1 → 2 → 3 → 4 → ...,最开始的序列为BOS表示开始。 掩码生成:使用了因果掩码和padding掩码;虽然推理阶段没有对输入数据进行padding操作,但是依旧需要这两个掩码,主要的考量是保持接口的一致性(原来的接口需要传递这个参数)。 贪心策略:解码器的输出进行线性变化得到词表的概率分布后,然后挑选概率最高的token。 结束循环:当判断到模型预测出EOS时,模式则结束,整个预测完成。 位置编码 class PositionalEncoding(nn.Module): """经典正弦/余弦位置编码。 给定嵌入 `x (B,L,C)`,按长度切片并与位置编码相加,再做 dropout。 """ def __init__(self, d_model: int, max_len: int = 5000, dropout: float = 0.1): super().__init__() self.dropout = nn.Dropout(dropout) # 创建一个形状为 (max_len, d_model) 的零张量,用于存储位置编码 pe = torch.zeros(max_len, d_model) # (L, C) # 创建一个形状为 (max_len, 1) 的张量,用于存储位置索引 position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) # (L, 1) # 创建一个形状为 (d_model//2,) 的张量,用于存储位置编码的缩放因子 div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) # sin, cos 交错 pe[:, 0::2] = torch.sin(position * div_term) pe[:, 1::2] = torch.cos(position * div_term) pe = pe.unsqueeze(0) # (1, L, C) self.register_buffer("pe", pe) def forward(self, x: torch.Tensor): # (B, L, C) """为输入嵌入添加位置编码并做 dropout。 参数: - x: (B, L, C) 返回: - (B, L, C) """ x = x + self.pe[:, : x.size(1)] return self.dropout(x) # 对于位置 pos 和维度 i: # 偶数维度: PE(pos, 2i) = sin(pos / 10000^(2i/d_model)) # 奇数维度: PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model)) # pe[:, 0::2]: 选择所有行的偶数列 (0, 2, 4, ...) # pe[:, 1::2]: 选择所有行的奇数列 (1, 3, 5, ...) # 计算过程: # 位置0: sin(0 * div_term), cos(0 * div_term), sin(0 * div_term), ... # 位置1: sin(1 * div_term), cos(1 * div_term), sin(1 * div_term), ... # 位置2: sin(2 * div_term), cos(2 * div_term), sin(2 * div_term), ... 位置编码比较简单,就是按照sin和cos按公式计算生成向量,最终返回词嵌入向量+位置编码向量。 编码器 class Encoder(nn.Module): def __init__(self, d_model: int, nhead: int, dim_ff: int, num_layers: int, dropout: float = 0.1): super().__init__() self.layers = nn.ModuleList([ EncoderLayer(d_model, nhead, dim_ff, dropout) for _ in range(num_layers) ]) def forward(self, x: torch.Tensor, src_key_padding_mask: torch.Tensor | None = None) -> torch.Tensor: """堆叠若干编码层。 参数: - x: (B, S, C) - src_key_padding_mask: (B, S) True 为 padding 返回: - (B, S, C) """ for layer in self.layers: x = layer(x, src_key_padding_mask=src_key_padding_mask) return x 编码器框架就是若干个编码层堆叠起来,但是每层的都有自己的参数,主要调用的是nn.ModuleList进行注册子模块,确保参数都能够被优化器找到,num_layers控制了编码器的深度。 前向传播函数也很简单,输入一次通过每一个编码层,得到的输出结果给到下一个编码层,以此循环最终经过最后一层编码器得得到的特征信息,给后续解码器使用。 class EncoderLayer(nn.Module): """Transformer 编码层(后归一化 post-norm 版本) 子层:自注意力 + 前馈;均带残差连接与 LayerNorm。 """ def __init__(self, d_model: int, nhead: int, dim_ff: int, dropout: float = 0.1): super().__init__() self.self_attn = MultiHeadAttention(d_model, nhead, dropout) self.norm1 = nn.LayerNorm(d_model) self.ff = PositionwiseFeedForward(d_model, dim_ff, dropout) self.norm2 = nn.LayerNorm(d_model) def forward(self, x: torch.Tensor, src_key_padding_mask: torch.Tensor | None = None) -> torch.Tensor: """单层编码层前向。 参数: - x: (B, S, C) - src_key_padding_mask: (B, S) True 为 padding 返回: - (B, S, C) """ # 自注意力子层 attn_out = self.self_attn(x, x, x, attn_mask=None, key_padding_mask=src_key_padding_mask) x = self.norm1(x + attn_out) # 前馈子层 ff_out = self.ff(x) x = self.norm2(x + ff_out) return x 编码层的组件为MultiHeadAttention、LayerNorm、PositionwiseFeedForward这与我们此前介绍的transformer原理一致。 其前向传播过程,首先输入X(查询),X(键),X(值),qkv都是一样的;注意力计算时,把attn_mask=None,因为编码器不需要因果掩码,但是需要padding mask。其次进行残差连接计算x+attn_out,再调用norml进行层归一化,最后是计算前馈网络,再进行归一化就得到一层的输出结果了。 class PositionwiseFeedForward(nn.Module): """前馈网络:逐位置的两层 MLP(含激活与 dropout)""" def __init__(self, d_model: int, dim_ff: int, dropout: float = 0.1): super().__init__() self.fc1 = nn.Linear(d_model, dim_ff) self.fc2 = nn.Linear(dim_ff, d_model) self.act = nn.ReLU() self.dropout = nn.Dropout(dropout) def forward(self, x: torch.Tensor) -> torch.Tensor: """两层逐位置前馈网络。 参数: - x: (B, L, C) 返回: - (B, L, C) """ x = self.fc2(self.dropout(self.act(self.fc1(x)))) x = self.dropout(x) return x 前馈网络主要两层: 第一层:d_model → dim_ff (通常 dim_ff = 4 * d_model) 激活函数:ReLU。 第二层:dim_ff → d_model 就是对输入进行升维然后非线性变化再降维,提取更多的信息。两层都使用了dropout,展开就是如下。 # 1. 第一层线性变换 x = self.fc1(x) # (B, L, C) → (B, L, dim_ff) # 2. 激活函数 x = self.act(x) # 应用ReLU # 3. 第一个dropout x = self.dropout(x) # 随机置零部分神经元 # 4. 第二层线性变换 x = self.fc2(x) # (B, L, dim_ff) → (B, L, C) # 5. 第二个dropout x = self.dropout(x) # 最终dropout 解码器 class Decoder(nn.Module): def __init__(self, d_model: int, nhead: int, dim_ff: int, num_layers: int, dropout: float = 0.1): super().__init__() self.layers = nn.ModuleList([ DecoderLayer(d_model, nhead, dim_ff, dropout) for _ in range(num_layers) ]) def forward( self, x: torch.Tensor, memory: torch.Tensor, tgt_mask: torch.Tensor | None = None, tgt_key_padding_mask: torch.Tensor | None = None, memory_key_padding_mask: torch.Tensor | None = None, ) -> torch.Tensor: """堆叠若干解码层。 参数: - x: (B, T, C) 目标端嵌入 - memory: (B, S, C) 编码器输出 - tgt_mask: (T, T) 因果掩码,True 为屏蔽 - tgt_key_padding_mask: (B, T) 目标端 padding 掩码 - memory_key_padding_mask: (B, S) 源端 padding 掩码 返回: - (B, T, C) """ for layer in self.layers: x = layer( x, memory, tgt_mask=tgt_mask, tgt_key_padding_mask=tgt_key_padding_mask, memory_key_padding_mask=memory_key_padding_mask, ) return x 与编码器类似,使用nn.ModuleList创建多个解码层,每个解码层都是独立的DecoderLayer实例;解码器的输入数据有两个,一个是解码器侧自己的输入序列,另外一个是编码器计算得到的特征信息。解码器的每一层都需要输入编码器给的特征序列,但是都是一样的;解码器层计算得到的输出将传递给下一层解码器层,循环得到最后的输出。 Decoder (解码器) ├── DecoderLayer 1 (解码层1) │ ├── MultiHeadAttention (自注意力) │ ├── LayerNorm1 + 残差连接 │ ├── MultiHeadAttention (交叉注意力) │ ├── LayerNorm2 + 残差连接 │ ├── PositionwiseFeedForward (前馈网络) │ └── LayerNorm3 + 残差连接 ├── DecoderLayer 2 (解码层2) │ └── ... (同上结构) └── ... (重复 num_layers 次) 输入: x (B, T, C) + memory (B, S, C) → DecoderLayer 1 → DecoderLayer 2 → ... → DecoderLayer N → 输出: (B, T, C) 其前向传播也大同小异,与编码器不同的是需要传递因果掩码,tgt_mask,防止看到未来信息,同时还传入了源序列的pandding掩码,跟输入给编码器的mask是一样的。 class DecoderLayer(nn.Module): """Transformer 解码层(自注意力 + 交叉注意力 + 前馈)""" def __init__(self, d_model: int, nhead: int, dim_ff: int, dropout: float = 0.1): super().__init__() self.self_attn = MultiHeadAttention(d_model, nhead, dropout) self.norm1 = nn.LayerNorm(d_model) self.cross_attn = MultiHeadAttention(d_model, nhead, dropout) self.norm2 = nn.LayerNorm(d_model) self.ff = PositionwiseFeedForward(d_model, dim_ff, dropout) self.norm3 = nn.LayerNorm(d_model) def forward( self, x: torch.Tensor, memory: torch.Tensor, tgt_mask: torch.Tensor | None = None, tgt_key_padding_mask: torch.Tensor | None = None, memory_key_padding_mask: torch.Tensor | None = None, ) -> torch.Tensor: """单层解码层前向。 参数: - x: (B, T, C) 解码器输入 - memory: (B, S, C) 编码器输出 - tgt_mask: (T, T) 因果掩码,true为屏蔽 - tgt_key_padding_mask: (B, T) - memory_key_padding_mask: (B, S) 返回: - (B, T, C) """ # 1) 解码器自注意力(带因果掩码 tgt_mask) sa = self.self_attn(x, x, x, attn_mask=tgt_mask, key_padding_mask=tgt_key_padding_mask) x = self.norm1(x + sa) # 2) 交叉注意力:Q 来自解码器,K/V 来自编码器 memory ca = self.cross_attn(x, memory, memory, attn_mask=None, key_padding_mask=memory_key_padding_mask) x = self.norm2(x + ca) # 3) 前馈 ff = self.ff(x) x = self.norm3(x + ff) return x 解码器层比编码器层多了一个cross_attn交叉注意力。除了输入数据有些不同,其他都基本类似,下面按前向传播的流程来分析一下。 首先是第一个子层自注意力的计算,输入X(q),X(k),X(v)来自解码器侧路径的输入,推理模式则是由自己预测自回归的输入,训练模式是给定的。自注意力传入了因果掩码attn_mask和屏蔽pandding mask。 其次就是计算残差和层归一化,与编码器类似。 接着就是计算交叉注意力了,核心的注意力类还是MultiHeadAttention,跟编码器和解码器的都来自一个。唯一的区别就是传入的参数不一样,其中查询Q来自于解码器当前的状态X即解码器上一个自注意力的的输出,特征路径是解码器给的信息。而键值K,V则使用的是编码器的输出memory,不使用因果掩码,因为因果掩码前面已经处理了。 最后就是前馈网络的升维和降维处理等了,跟编码器就一样了,就不阐述了。 三个子层的不同作用: 自注意力层:处理目标序列内部的关系,生成"i have an apple"时,"have"应该关注"i","an"应该关注"i have",通过因果掩码确保只能看到历史信息。 交叉注意力层:让解码器"看到"编码器的信息,翻译成英文时,需要参考中文源序列,通过交叉注意力,解码器可以访问编码器的完整表示。 前馈网络则层:增加非线性表达能力,每个位置独立计算,不涉及位置间的关系。 注意力 接下来就是核心MultiHeadAttention。 MultiHeadAttention class MultiHeadAttention(nn.Module): """多头注意力(Batch-first) - 输入输出为 (B, L, C) - 内部将通道 C 切分到 H 个头,每头维度 Dh=C/H - 支持两类掩码: 1) attn_mask: (Lq, Lk) 下三角等自回归掩码 2) key_padding_mask: (B, Lk) 序列 padding 掩码 两者会在内部合并为可广播到 (B,H,Lq,Lk) 的布尔张量。 """ def __init__(self, d_model: int, nhead: int, dropout: float = 0.1): super().__init__() assert d_model % nhead == 0, "d_model 必须能被 nhead 整除" self.d_model = d_model self.nhead = nhead self.d_head = d_model // nhead self.w_q = nn.Linear(d_model, d_model) self.w_k = nn.Linear(d_model, d_model) self.w_v = nn.Linear(d_model, d_model) self.attn = ScaledDotProductAttention(dropout) self.proj = nn.Linear(d_model, d_model) self.dropout = nn.Dropout(dropout) # 将 (B, L, C) 重塑为 (B, L, H, Dh),原来的数据都不会变化,只是形状改变了 # 加了一个维,然后交换了张量维度顺序。 def _shape(self, x: torch.Tensor) -> torch.Tensor: """(B, L, C) 切分重排为 (B, H, L, Dh)。""" B, L, C = x.shape # 第一步:将 (B, L, C) 重塑为 (B, L, H, Dh) x_reshaped = x.view(B, L, self.nhead, self.d_head) #x.view不复制数据,只是改变数据的"视角",数据在内存中存储顺序不变 # 第二步:交换维度 1 和 2,从 (B, L, H, Dh) 变为 (B, H, L, Dh) x_transposed = x_reshaped.transpose(1, 2) return x_transposed def _merge(self, x: torch.Tensor) -> torch.Tensor: """(B, H, L, Dh) 合并重排回 (B, L, C)。""" B, H, L, Dh = x.shape # 第一步:交换维度 1 和 2,从 (B, H, L, Dh) 变为 (B, L, H, Dh) x_transposed = x.transpose(1, 2) # 第二步:确保内存连续,然后重塑为 (B, L, H*Dh) x_contiguous = x_transposed.contiguous() # 第三步:重塑为 (B, L, C) 其中 C = H * Dh x_reshaped = x_contiguous.view(B, L, H * Dh) return x_reshaped # 因为QKV算的是矩阵,在transformer中涉及到两个mask # 一个是attn_mask控制哪些位置可以相互关注,如因果掩码防止看未来 # 一个是key_padding_mask控制哪些位置是有效的,如填充token不应该被关注 # 因为都要计算所以把这两个使用|合并起来,一起跟QKV计算即可,否则得计算两次。 # 对于encode来说传参只会穿key_pandding_mask,另外一个没有 # 对于decoder来说,两个都会传递。 def _build_attn_mask( self, Lq: int, Lk: int, attn_mask: torch.Tensor | None, key_padding_mask: torch.Tensor | None, device: torch.device, ) -> torch.Tensor | None: """将两类掩码合并成 (1/ B, 1/ H, Lq, Lk) 可广播布尔张量。True 表示屏蔽。""" mask = None if attn_mask is not None: # (Lq, Lk) -> (1,1,Lq,Lk) m1 = attn_mask.to(device).unsqueeze(0).unsqueeze(0) mask = m1 if mask is None else (mask | m1) if key_padding_mask is not None: # (B, Lk) -> (B,1,1,Lk) m2 = key_padding_mask.to(device).unsqueeze(1).unsqueeze(1) mask = m2 if mask is None else (mask | m2) return mask (0)网络层定义 self.w_q = nn.Linear(d_model, d_model) # 查询线性变换 self.w_k = nn.Linear(d_model, d_model) # 键线性变换 self.w_v = nn.Linear(d_model, d_model) # 值线性变换 self.attn = ScaledDotProductAttention(dropout) # 缩放点积注意力 self.proj = nn.Linear(d_model, d_model) # 输出投影 self.dropout = nn.Dropout(dropout) # 输出dropout w_q, w_k, w_v: 将输入转换为查询、键、值表示,attn为计算注意力权重和加权求和,proj将多头结果投影会原始维度,dropout是防止过拟合。 (1)将输入分成多个头 对输入按照head划分为多份,所以这里需要注意的是d_model必现要能被nhead整除,确保每个头有相同的维度。如原来的输入为(B,L,C)切分后变成(B, H, L, Dh),Dh=d_model/nhead。 第一步先使用view重塑为(B, H, L, Dh),然后第二步进行重排。举个例子输入为(B, L, C) = (1, 4, 6)重塑为(B, L, H, Dh) = (1, 4, 2, 3),重塑后的内存布局,[word1_head1_3, word1_head2_3, word2_head1_3, word2_head2_3, ...]每个词的头是交错存储的,为了适应多头注意力的并行计算还要重排一下,让每个头的数据连续存储。 (2)掩码合并 将key_padding_mask和attn_mask(因果)进行合并,这样后续计算就不用计算两次了。 # 使用逻辑或运算 | 合并 # True | True = True (屏蔽) # True | False = True (屏蔽) # False | False = False (不屏蔽) # 最终掩码形状: (B, H, Lq, Lk) 或 (1, H, Lq, Lk) # 可以广播到注意力计算的形状 (3)每个头计算注意力 Q = self._shape(self.w_q(query)) # (B,H,Lq,Dh) K = self._shape(self.w_k(key)) # (B,H,Lk,Dh) V = self._shape(self.w_v(value)) # (B,H,Lk,Dh) mask = self._build_attn_mask(Lq, Lk, attn_mask, key_padding_mask, device) out = self.attn(Q, K, V, mask) # (B,H,Lq,Dh) 计算注意力时,首先对输入分别进行计算线性变换(如QxWq,这样就有参数了)然后重排分别得到QKV,对于编码器来说输入的query、key、value都是一样的,计算QKV的方式也是一样的,都是进行线性nn.Linear层然后再进行重排,但是各自有各自参数,这就是要训练的参数。经过线性层的结果后都需要调用_shape进行重排划分为多个头的数据,便于输入给多头注意力;构建好合并后的掩码之后,就传递到attn中计算注意力。计算出的多头的注意力,需要合并为原来的形状,最后再通过一个线性变化得到最后的结果输出。 完整的数据流示例: # 输入: query (1, 4, 6), key (1, 4, 6), value (1, 4, 6) # 参数: d_model=6, nhead=2, d_head=3 # 步骤1: 线性变换 (保持形状) # w_q(query): (1, 4, 6) -> (1, 4, 6) # w_k(key): (1, 4, 6) -> (1, 4, 6) # w_v(value): (1, 4, 6) -> (1, 4, 6) # 每个词从6维变换到6维 # 学习查询、键、值的表示 # 步骤2: 分头 # _shape(w_q(query)): (1, 4, 6) -> (1, 2, 4, 3) # _shape(w_k(key)): (1, 4, 6) -> (1, 2, 4, 3) # _shape(w_v(value)): (1, 4, 6) -> (1, 2, 4, 3) # 将6维分成2个头,每个头3维 # 头1: 3维表示 # 头2: 3维表示 # 步骤3: 注意力计算 # attn(Q, K, V, mask): (1, 2, 4, 3) -> (1, 2, 4, 3) # 每个头独立计算注意力: # 头1: 计算4个位置之间的注意力,每个位置3维 # 头2: 计算4个位置之间的注意力,每个位置3维 # 步骤4: 合并头 # _merge(out): (1, 2, 4, 3) -> (1, 4, 6) # 将2个头的3维表示合并回6维 # 每个位置现在包含所有头的信息 # 步骤5: 输出变换 # proj(out): (1, 4, 6) -> (1, 4, 6) # dropout(out): (1, 4, 6) -> (1, 4, 6) # 最终输出: (1, 4, 6) ScaledDotProductAttention class ScaledDotProductAttention(nn.Module): """缩放点积注意力(单头) 给定 Q(查询)、K(键)、V(值) 与掩码,计算注意力加权输出。 形状约定: - Q: (B, H, Lq, Dh) - K: (B, H, Lk, Dh) - V: (B, H, Lk, Dh) - mask: 可广播到 (B, H, Lq, Lk),True 表示屏蔽。 """ def __init__(self, dropout: float = 0.1): super().__init__() self.dropout = nn.Dropout(dropout) def forward(self, Q: torch.Tensor, K: torch.Tensor, V: torch.Tensor, mask: torch.Tensor | None = None): """计算缩放点积注意力。 参数: - Q: (B, H, Lq, Dh) - K: (B, H, Lk, Dh) - V: (B, H, Lk, Dh) - mask: 可广播到 (B, H, Lq, Lk) 的布尔掩码,True 表示屏蔽 返回: - (B, H, Lq, Dh) """ d_k = Q.size(-1) # 注意力分数 = QK^T / sqrt(dk) scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(d_k) # (B,H,Lq,Lk) if mask is not None: # 对被屏蔽位置填充一个极小值,softmax 后 ~0 scores = scores.masked_fill(mask, float("-inf")) attn = torch.softmax(scores, dim=-1) # (B,H,Lq,Lk) attn = self.dropout(attn) out = torch.matmul(attn, V) # (B,H,Lq,Dh) return out 这里就是实现缩放点积注意力机制了,Q.transpose(-2, -1)将K的最后两个维度转置,torch.matmul(Q, K^T): 计算Q和K的点积,再math.sqrt(d_k): 缩放因子,防止分数过大。 可以看到会根据传入的mask进行处理,让mask=True的位置会被填充为-inf,这样经过softmax之后,这些位置就接近0,从而实现了屏蔽某位位置的效果。 softmax是将分数转换为概率分布,所有位置的权重和为1,分数越高的位置,权重越大,也就是跟词相关性越大提取的值越丰富,如果是0那基本不相关,掩码为true的位置就是0,也就是基本不提取信息。 总结一下,核心就是公式Attention(Q,K,V) = softmax(QK^T/√d_k)V计算。 应用 接下来就是调用应用了 device = torch.device("cuda" if torch.cuda.is_available() else "cpu") dataset = ToyDataset(pairs) loader = DataLoader(dataset, batch_size=8, shuffle=True, collate_fn=collate_fn) model = Seq2SeqTransformer( src_vocab_size=len(SRC_ITOS), tgt_vocab_size=len(TGT_ITOS), d_model=6, nhead=3, num_encoder_layers=2, num_decoder_layers=2, dim_ff=256, dropout=0.1 ).to(device) criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX) optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4) 定义dataset、loader准备数据,然后定义模型model,损失函数定义以及优化方法。 def evaluate_sample(sent="我 有 一个 苹果"): """辅助函数:对输入中文句子进行编码→推理→解码并打印结果。""" ids = encode_src(sent) print("ids",ids) pred_ids = model.greedy_decode(ids, device=device) pred_text = decode_tgt(pred_ids) print(f'INPUT : {sent}') print(f'OUTPUT: {pred_text}\n') print("Before training:") evaluate_sample("我 有 一个 苹果") 上面是整个应用翻译应用,在没有训练出参数,自然预测出的结果是不对的。 EPOCHS = 800 # 小步数即可过拟合玩具数据 for epoch in range(1, EPOCHS + 1): model.train() total_loss = 0.0 for src, tgt_in, tgt_out, src_pad_mask, tgt_pad_mask in loader: src = src.to(device) tgt_in = tgt_in.to(device) tgt_out = tgt_out.to(device) src_pad_mask = src_pad_mask.to(device) tgt_pad_mask = tgt_pad_mask.to(device) logits = model(src, tgt_in, src_pad_mask, tgt_pad_mask) # (B, T, V) loss = criterion(logits.reshape(-1, logits.size(-1)), tgt_out.reshape(-1)) optimizer.zero_grad() loss.backward() torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) optimizer.step() total_loss += loss.item() if epoch % 5 == 0 or epoch == 1: print(f"Epoch {epoch:02d} | loss={total_loss/len(loader):.4f}") evaluate_sample("我 有 一个 苹果") 上面是训练过程。 常见问题 (1) 解码器训练时的输入和推理时的输入有什么不同? 训练模式是固定长度输入,例如(2,5),所有样本都padding到相同长度,批次内所有样本的长度一致。 # 使用教师强制,目标序列已知 tgt_in = [BOS, i, have, an, apple,PAD] # 完整的输入序列 tgt_out = [i, have, an, apple, EOS] # 完整的监督目标 而推理模式序列长度随着时间步逐步增长,例如# 例如: (1, 1) → (1, 2) → (1, 3) → ...,每次生成后长度+1。 # 逐步生成,每次只预测下一个token ys = [[BOS_ID]] # 第1步 ys = [[BOS_ID, i]] # 第2步 ys = [[BOS_ID, i, have]] # 第3步 ys = [[BOS_ID, i, have, an]] # 第4步 ys = [[BOS_ID, i, have, an,apple]] # 第5步 之所以有这样的差异是训练时用的是Teacher Forcing优势,使用了并行计算让所有位置可以同时计算预测,提高效率快速收敛。而推理时是自回归模式,每个token的生成只能基于之前输出的信息。 (2)什么情况下输入数据需要PAD? 通常无论是编码器的输入还是解码器的输入如果不是批量并行计算都可以不用PAD,但如果是批量并行都需要PAD MASK。 在训练模式下,为了提高效率需要批量并行计算,所以无论编码器还是解码器的输入都是需要PAD,在本文中要不要PAD动作是在DataLoader的回调函数中collate_fn进行的,会对编码器和解码器的输入都会pad对齐到一样的长度。 因此最主要的考量是否要批量并行计算,因为并行计算如果长度不同,无法并行处理,无论是自注意力分数、前馈网络、还是残差连接,只有长度一致,才能并行一下处理多个样本。而往往训练模型基本都是批量处理。 总之只处理一个样本时可以不需要PAD,如果要批量都一定需要PAD。而只处理一个样本,往往是推理模式场景。 (3)既然推理模式的编码器和解码器输入没有进行PAD到一定长度,那为什么无论编码器和解码器都依旧还需要传入PAD mask? 需要PAD mask我认为本质上有两点原因:其一用于告知模型输入序列的长度,其二为了接口的一致性,因为transformer最核心的是无论编码器还是解码器最终的核心是Scaled Dot-Product Attetion,可以理解为这是一个共有底层函数,都要调用,做兼容了所以一定要传这个参数。 (3)推理模式的解码器既然是一个一个token往后生成的然后依次拼接回给到输入,未来的词其实根本就没有输入,为什么还需要下三角度的因果mask? 本质上还是保证接口的兼容性,这块都无论是推理还是训练模式都需要传入这个因果mask。 首先在实现层面让训练模式和推理模式代码能够兼容,训练模式使用的是teacher forcing把整个目标序列一次性喂进去,那自然不能让模型看到未来token。推理模式严格上如果一次一个token,每次只输入已经生成的部分,在这种最简单的视线下,确实不需要再加下三角mask,因为未来token不存在,自然无法attend到。但是大多数框架都选择统一接口,无论训练还是推理都传causal mask,避免在不同模式下切换逻辑。 其次从推理模式的多样性考虑,即使是推理阶段,也有可能遇到这种情况,也就是批量生成,一次生成多个序列,每个序列长度不同。 下三角是一个通用的"未来屏蔽"机制,不只是为了防止模型看见未来token,也是为了让实现和训练推理保持一致,并支持批量/并行推理优化。 附:完整源码 # toy_transformer_translation.py # A tiny, runnable Transformer seq2seq example to translate Chinese->English on a toy dataset. # PyTorch >= 2.0 recommended. import math import random from dataclasses import dataclass from typing import List, Tuple import torch import torch.nn as nn from torch.utils.data import DataLoader, Dataset random.seed(0) torch.manual_seed(0) # -------------------------- # 1) Toy parallel corpus # -------------------------- pairs = [ # 基本陈述 ("我 有 一个 苹果", "i have an apple"), ("我 有 一本 书", "i have a book"), ("你 有 一个 苹果", "you have an apple"), ("他 有 一个 苹果", "he has an apple"), ("她 有 一个 苹果", "she has an apple"), ("我们 有 一个 苹果", "we have an apple"), ("我 喜欢 苹果", "i like apples"), ("我 吃 苹果", "i eat apples"), ("你 喜欢 书", "you like books"), ("我 喜欢 书", "i like books"), # 稍作扩展 ("我 有 两个 苹果", "i have two apples"), ("我 有 红色 苹果", "i have red apples"), ] # 中文使用"空格分词(简化)",英文用空格分词 def build_vocab(examples: List[str]): """构建词表(字符串→索引 与 索引→字符串) - 输入示例为用空格分词后的句子列表 - 加入特殊符号 `<pad>`, `<bos>`, `<eos>` 并将其它 token 排序,保证可复现 返回: stoi: dict[token->id] itos: List[id->token] """ tokens = set() # 建立一个集合,用于存储所有不同的token for s in examples: # 遍历所有句子,s是句子,如我 有 一个 苹果 for t in s.split(): # 遍历句子中的每个token,t是token,如我 tokens.add(t.lower()) # 将token添加到集合中,并转换为小写,如我 # 特殊符号 itos = ["<pad>", "<bos>", "<eos>"] + sorted(tokens) # 将特殊符号和所有不同的token排序 # print(itos) stoi = {t: i for i, t in enumerate(itos)} # 将token和索引建立映射关系 # print(stoi) return stoi, itos src_texts = [p[0] for p in pairs] tgt_texts = [p[1] for p in pairs] print("src_texts",src_texts) print("tgt_texts",tgt_texts) SRC_STOI, SRC_ITOS = build_vocab(src_texts) print("SRC_STOI",SRC_STOI) print("SRC_ITOS",SRC_ITOS) TGT_STOI, TGT_ITOS = build_vocab(tgt_texts) print("TGT_STOI",TGT_STOI) print("TGT_ITOS",TGT_ITOS) PAD_IDX, BOS_IDX, EOS_IDX = 0, 1, 2 #将源语句编码为索引序列(不含 BOS/EOS),如我 有 一个 苹果 -> [1, 2, 3, 4] def encode_src(s: str) -> List[int]: """将原语句(已空格分词)编码为索引序列(不含 BOS/EOS)。""" return [SRC_STOI[w.lower()] for w in s.split()] def encode_tgt(s: str) -> List[int]: """将目标语句编码为索引序列,并在首尾添加 BOS/EOS。""" return [BOS_IDX] + [TGT_STOI[w.lower()] for w in s.split()] + [EOS_IDX] def decode_tgt(ids: List[int]) -> str: """将目标端索引序列解码回字符串(忽略 PAD/BOS,遇到 EOS 停止)。""" words = [] for i in ids: if i == EOS_IDX: break if i in (PAD_IDX, BOS_IDX): continue words.append(TGT_ITOS[i]) return " ".join(words) @dataclass class Example: """单条并行样本 - src: 源语言索引序列(不含 BOS/EOS) - tgt: 目标语言索引序列(含 BOS/EOS) """ src: List[int] tgt: List[int] class ToyDataset(Dataset): """极小玩具平行语料数据集,用于快速过拟合演示。""" def __init__(self, pairs: List[Tuple[str, str]]): self.data = [Example(encode_src(s), encode_tgt(t)) for s, t in pairs] def __len__(self): return len(self.data) def __getitem__(self, idx): return self.data[idx] def collate_fn(batch: List[Example]): """将一个 batch 的样本对齐为等长张量,并构造 teacher forcing 所需的输入/输出。 返回: - src: (B,S) 源序列,已 padding - tgt_in: (B,T) 解码器输入(含 BOS,右对齐 padding) - tgt_out: (B,T) 解码器监督目标(对 tgt_in 右移一位,含 EOS) - src_pad_mask: (B,S) 源端 padding 掩码,True 表示 padding 位置 - tgt_pad_mask: (B,T) 目标端 padding 掩码(针对输入序列) """ # padding to max length in batch src_max = max(len(b.src) for b in batch) tgt_max = max(len(b.tgt) for b in batch) src_batch = [] tgt_in_batch = [] tgt_out_batch = [] for ex in batch: src = ex.src + [PAD_IDX] * (src_max - len(ex.src)) # Teacher forcing: shift-in, shift-out tgt_in = ex.tgt[:-1] + [PAD_IDX] * (tgt_max - 1 - len(ex.tgt[:-1])) tgt_out = ex.tgt[1:] + [PAD_IDX] * (tgt_max - 1 - len(ex.tgt[1:])) src_batch.append(src) tgt_in_batch.append(tgt_in) tgt_out_batch.append(tgt_out) src = torch.tensor(src_batch, dtype=torch.long) # (B, S) tgt_in = torch.tensor(tgt_in_batch, dtype=torch.long) # (B, T_in) tgt_out = torch.tensor(tgt_out_batch, dtype=torch.long) # (B, T_out) src_pad_mask = src.eq(PAD_IDX) # (B, S) tgt_pad_mask = tgt_in.eq(PAD_IDX) # (B, T) return src, tgt_in, tgt_out, src_pad_mask, tgt_pad_mask # -------------------------- # 2) Positional encoding # -------------------------- class PositionalEncoding(nn.Module): """经典正弦/余弦位置编码。 给定嵌入 `x (B,L,C)`,按长度切片并与位置编码相加,再做 dropout。 """ def __init__(self, d_model: int, max_len: int = 5000, dropout: float = 0.1): super().__init__() self.dropout = nn.Dropout(dropout) # 创建一个形状为 (max_len, d_model) 的零张量,用于存储位置编码 pe = torch.zeros(max_len, d_model) # (L, C) # 创建一个形状为 (max_len, 1) 的张量,用于存储位置索引 position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) # (L, 1) # 创建一个形状为 (d_model//2,) 的张量,用于存储位置编码的缩放因子 div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) # sin, cos 交错 pe[:, 0::2] = torch.sin(position * div_term) pe[:, 1::2] = torch.cos(position * div_term) pe = pe.unsqueeze(0) # (1, L, C) self.register_buffer("pe", pe) def forward(self, x: torch.Tensor): # (B, L, C) """为输入嵌入添加位置编码并做 dropout。 参数: - x: (B, L, C) 返回: - (B, L, C) """ x = x + self.pe[:, : x.size(1)] return self.dropout(x) # -------------------------- # 3) 手写 Transformer 编码/解码层(含详细注释) # -------------------------- class ScaledDotProductAttention(nn.Module): """缩放点积注意力(单头) 给定 Q(查询)、K(键)、V(值) 与掩码,计算注意力加权输出。 形状约定: - Q: (B, H, Lq, Dh) - K: (B, H, Lk, Dh) - V: (B, H, Lk, Dh) - mask: 可广播到 (B, H, Lq, Lk),True 表示屏蔽。 """ def __init__(self, dropout: float = 0.1): super().__init__() self.dropout = nn.Dropout(dropout) def forward(self, Q: torch.Tensor, K: torch.Tensor, V: torch.Tensor, mask: torch.Tensor | None = None): """计算缩放点积注意力。 参数: - Q: (B, H, Lq, Dh) - K: (B, H, Lk, Dh) - V: (B, H, Lk, Dh) - mask: 可广播到 (B, H, Lq, Lk) 的布尔掩码,True 表示屏蔽 返回: - (B, H, Lq, Dh) """ d_k = Q.size(-1) # 注意力分数 = QK^T / sqrt(dk) scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(d_k) # (B,H,Lq,Lk) if mask is not None: # 对被屏蔽位置填充一个极小值,softmax 后 ~0 scores = scores.masked_fill(mask, float("-inf")) attn = torch.softmax(scores, dim=-1) # (B,H,Lq,Lk) attn = self.dropout(attn) out = torch.matmul(attn, V) # (B,H,Lq,Dh) return out class MultiHeadAttention(nn.Module): """多头注意力(Batch-first) - 输入输出为 (B, L, C) - 内部将通道 C 切分到 H 个头,每头维度 Dh=C/H - 支持两类掩码: 1) attn_mask: (Lq, Lk) 下三角等自回归掩码 2) key_padding_mask: (B, Lk) 序列 padding 掩码 两者会在内部合并为可广播到 (B,H,Lq,Lk) 的布尔张量。 """ def __init__(self, d_model: int, nhead: int, dropout: float = 0.1): super().__init__() assert d_model % nhead == 0, "d_model 必须能被 nhead 整除" self.d_model = d_model self.nhead = nhead self.d_head = d_model // nhead self.w_q = nn.Linear(d_model, d_model) self.w_k = nn.Linear(d_model, d_model) self.w_v = nn.Linear(d_model, d_model) self.attn = ScaledDotProductAttention(dropout) self.proj = nn.Linear(d_model, d_model) self.dropout = nn.Dropout(dropout) # 将 (B, L, C) 重塑为 (B, L, H, Dh),原来的数据都不会变化,只是形状改变了 # 加了一个维,然后交换了张量维度顺序。 def _shape(self, x: torch.Tensor) -> torch.Tensor: """(B, L, C) 切分重排为 (B, H, L, Dh)。""" B, L, C = x.shape # 第一步:将 (B, L, C) 重塑为 (B, L, H, Dh) x_reshaped = x.view(B, L, self.nhead, self.d_head) #x.view不复制数据,只是改变数据的"视角",数据在内存中存储顺序不变 # 第二步:交换维度 1 和 2,从 (B, L, H, Dh) 变为 (B, H, L, Dh) x_transposed = x_reshaped.transpose(1, 2) return x_transposed def _merge(self, x: torch.Tensor) -> torch.Tensor: """(B, H, L, Dh) 合并重排回 (B, L, C)。""" B, H, L, Dh = x.shape # 第一步:交换维度 1 和 2,从 (B, H, L, Dh) 变为 (B, L, H, Dh) x_transposed = x.transpose(1, 2) # 第二步:确保内存连续,然后重塑为 (B, L, H*Dh) x_contiguous = x_transposed.contiguous() # 第三步:重塑为 (B, L, C) 其中 C = H * Dh x_reshaped = x_contiguous.view(B, L, H * Dh) return x_reshaped # 因为QKV算的是矩阵,在transformer中涉及到两个mask # 一个是attn_mask控制哪些位置可以相互关注,如因果掩码防止看未来 # 一个是key_padding_mask控制哪些位置是有效的,如填充token不应该被关注 # 因为都要计算所以把这两个使用|合并起来,一起跟QKV计算即可,否则得计算两次。 # 对于encode来说传参只会穿key_pandding_mask,另外一个没有 # 对于decoder来说,两个都会传递。 def _build_attn_mask( self, Lq: int, Lk: int, attn_mask: torch.Tensor | None, key_padding_mask: torch.Tensor | None, device: torch.device, ) -> torch.Tensor | None: """将两类掩码合并成 (1/ B, 1/ H, Lq, Lk) 可广播布尔张量。True 表示屏蔽。""" mask = None if attn_mask is not None: # (Lq, Lk) -> (1,1,Lq,Lk) m1 = attn_mask.to(device).unsqueeze(0).unsqueeze(0) mask = m1 if mask is None else (mask | m1) if key_padding_mask is not None: # (B, Lk) -> (B,1,1,Lk) m2 = key_padding_mask.to(device).unsqueeze(1).unsqueeze(1) mask = m2 if mask is None else (mask | m2) return mask def forward( self, query: torch.Tensor, key: torch.Tensor, value: torch.Tensor, attn_mask: torch.Tensor | None = None, key_padding_mask: torch.Tensor | None = None, ) -> torch.Tensor: """多头注意力前向。 参数: - query, key, value: (B, L, C) - attn_mask: (Lq, Lk) 因果/结构掩码,True 为屏蔽 - key_padding_mask: (B, Lk) padding 掩码,True 为 padding 返回: - (B, Lq, C) """ # 输入均为 (B, L, C) B, Lq, _ = query.shape _, Lk, _ = key.shape device = query.device Q = self._shape(self.w_q(query)) # (B,H,Lq,Dh) K = self._shape(self.w_k(key)) # (B,H,Lk,Dh) V = self._shape(self.w_v(value)) # (B,H,Lk,Dh) mask = self._build_attn_mask(Lq, Lk, attn_mask, key_padding_mask, device) out = self.attn(Q, K, V, mask) # (B,H,Lq,Dh) out = self._merge(out) # (B,Lq,C) out = self.proj(out) out = self.dropout(out) return out class PositionwiseFeedForward(nn.Module): """前馈网络:逐位置的两层 MLP(含激活与 dropout)""" def __init__(self, d_model: int, dim_ff: int, dropout: float = 0.1): super().__init__() self.fc1 = nn.Linear(d_model, dim_ff) self.fc2 = nn.Linear(dim_ff, d_model) self.act = nn.ReLU() self.dropout = nn.Dropout(dropout) def forward(self, x: torch.Tensor) -> torch.Tensor: """两层逐位置前馈网络。 参数: - x: (B, L, C) 返回: - (B, L, C) """ x = self.fc2(self.dropout(self.act(self.fc1(x)))) x = self.dropout(x) return x class EncoderLayer(nn.Module): """Transformer 编码层(后归一化 post-norm 版本) 子层:自注意力 + 前馈;均带残差连接与 LayerNorm。 """ def __init__(self, d_model: int, nhead: int, dim_ff: int, dropout: float = 0.1): super().__init__() self.self_attn = MultiHeadAttention(d_model, nhead, dropout) self.norm1 = nn.LayerNorm(d_model) self.ff = PositionwiseFeedForward(d_model, dim_ff, dropout) self.norm2 = nn.LayerNorm(d_model) def forward(self, x: torch.Tensor, src_key_padding_mask: torch.Tensor | None = None) -> torch.Tensor: """单层编码层前向。 参数: - x: (B, S, C) - src_key_padding_mask: (B, S) True 为 padding 返回: - (B, S, C) """ # 自注意力子层 attn_out = self.self_attn(x, x, x, attn_mask=None, key_padding_mask=src_key_padding_mask) x = self.norm1(x + attn_out) # 前馈子层 ff_out = self.ff(x) x = self.norm2(x + ff_out) return x class DecoderLayer(nn.Module): """Transformer 解码层(自注意力 + 交叉注意力 + 前馈)""" def __init__(self, d_model: int, nhead: int, dim_ff: int, dropout: float = 0.1): super().__init__() self.self_attn = MultiHeadAttention(d_model, nhead, dropout) self.norm1 = nn.LayerNorm(d_model) self.cross_attn = MultiHeadAttention(d_model, nhead, dropout) self.norm2 = nn.LayerNorm(d_model) self.ff = PositionwiseFeedForward(d_model, dim_ff, dropout) self.norm3 = nn.LayerNorm(d_model) def forward( self, x: torch.Tensor, memory: torch.Tensor, tgt_mask: torch.Tensor | None = None, tgt_key_padding_mask: torch.Tensor | None = None, memory_key_padding_mask: torch.Tensor | None = None, ) -> torch.Tensor: """单层解码层前向。 参数: - x: (B, T, C) 解码器输入 - memory: (B, S, C) 编码器输出 - tgt_mask: (T, T) 因果掩码,true为屏蔽 - tgt_key_padding_mask: (B, T) - memory_key_padding_mask: (B, S) 返回: - (B, T, C) """ # 1) 解码器自注意力(带因果掩码 tgt_mask) sa = self.self_attn(x, x, x, attn_mask=tgt_mask, key_padding_mask=tgt_key_padding_mask) x = self.norm1(x + sa) # 2) 交叉注意力:Q 来自解码器,K/V 来自编码器 memory ca = self.cross_attn(x, memory, memory, attn_mask=None, key_padding_mask=memory_key_padding_mask) x = self.norm2(x + ca) # 3) 前馈 ff = self.ff(x) x = self.norm3(x + ff) return x class Encoder(nn.Module): def __init__(self, d_model: int, nhead: int, dim_ff: int, num_layers: int, dropout: float = 0.1): super().__init__() self.layers = nn.ModuleList([ EncoderLayer(d_model, nhead, dim_ff, dropout) for _ in range(num_layers) ]) def forward(self, x: torch.Tensor, src_key_padding_mask: torch.Tensor | None = None) -> torch.Tensor: """堆叠若干编码层。 参数: - x: (B, S, C) - src_key_padding_mask: (B, S) True 为 padding 返回: - (B, S, C) """ for layer in self.layers: x = layer(x, src_key_padding_mask=src_key_padding_mask) return x class Decoder(nn.Module): def __init__(self, d_model: int, nhead: int, dim_ff: int, num_layers: int, dropout: float = 0.1): super().__init__() self.layers = nn.ModuleList([ DecoderLayer(d_model, nhead, dim_ff, dropout) for _ in range(num_layers) ]) def forward( self, x: torch.Tensor, memory: torch.Tensor, tgt_mask: torch.Tensor | None = None, tgt_key_padding_mask: torch.Tensor | None = None, memory_key_padding_mask: torch.Tensor | None = None, ) -> torch.Tensor: """堆叠若干解码层。 参数: - x: (B, T, C) 目标端嵌入 - memory: (B, S, C) 编码器输出 - tgt_mask: (T, T) 因果掩码,True 为屏蔽 - tgt_key_padding_mask: (B, T) 目标端 padding 掩码 - memory_key_padding_mask: (B, S) 源端 padding 掩码 返回: - (B, T, C) """ for layer in self.layers: x = layer( x, memory, tgt_mask=tgt_mask, tgt_key_padding_mask=tgt_key_padding_mask, memory_key_padding_mask=memory_key_padding_mask, ) return x class Seq2SeqTransformer(nn.Module): """最小可运行的手写 Transformer 序列到序列模型 - 使用我们实现的 Encoder/Decoder/MHA/FFN - 仍保持与上文训练/解码接口一致 """ def __init__(self, src_vocab_size, tgt_vocab_size, d_model=128, nhead=4, num_encoder_layers=2, num_decoder_layers=2, dim_ff=256, dropout=0.1): super().__init__() self.d_model = d_model self.src_tok = nn.Embedding(src_vocab_size, d_model, padding_idx=PAD_IDX) self.tgt_tok = nn.Embedding(tgt_vocab_size, d_model, padding_idx=PAD_IDX) self.pos_enc = PositionalEncoding(d_model, dropout=dropout) self.encoder = Encoder(d_model, nhead, dim_ff, num_encoder_layers, dropout) self.decoder = Decoder(d_model, nhead, dim_ff, num_decoder_layers, dropout) self.generator = nn.Linear(d_model, tgt_vocab_size) def make_subsequent_mask(self, sz: int) -> torch.Tensor: """构造大小为 (sz, sz) 的下三角因果掩码;True 为屏蔽(不允许看未来)。""" return torch.triu(torch.ones(sz, sz, dtype=torch.bool), diagonal=1) def forward(self, src, tgt_in, src_pad_mask, tgt_pad_mask): """训练/教师强制阶段的前向。 参数: - src: (B, S) 源 token id - tgt_in: (B, T) 目标端输入(以 BOS 开头) - src_pad_mask: (B, S) True 为 padding - tgt_pad_mask: (B, T) True 为 padding(针对 tgt_in) 返回: - logits: (B, T, V) 词表维度的分类分布 """ # 1) 词嵌入 + 位置编码 src_emb = self.pos_enc(self.src_tok(src)) # (B,S,C) tgt_emb = self.pos_enc(self.tgt_tok(tgt_in)) # (B,T,C) # 2) 编码:仅使用 key_padding_mask 屏蔽 padding memory = self.encoder(src_emb, src_key_padding_mask=src_pad_mask) # (B,S,C) # 3) 解码:自注意力需要因果掩码 + padding 掩码;交叉注意力需要 memory 的 padding 掩码 tgt_mask = self.make_subsequent_mask(tgt_in.size(1)).to(src.device) # (T,T) out = self.decoder( tgt_emb, memory, tgt_mask=tgt_mask, tgt_key_padding_mask=tgt_pad_mask, memory_key_padding_mask=src_pad_mask, ) # (B,T,C) logits = self.generator(out) return logits @torch.no_grad() def greedy_decode(self, src_ids: List[int], max_len=20, device="cpu"): """在推理阶段进行贪心解码。 参数: - src_ids: 源端 token id 序列(不含 BOS/EOS) - max_len: 最大生成长度(含 BOS/EOS) - device: 运行设备 返回: - 生成的目标端 id 序列(含 BOS/EOS) """ #切换为评估模式,关闭dropout/batchnorm等随机性 self.eval() # 将源端token id序列转换为张量,并添加一个维度,如[1, 2, 3, 4] -> [[1, 2, 3, 4]] # 变为批维度的 (1, S);dtype 为 long 主要是以适配 nn.Embedding的输入格式。 src = torch.tensor(src_ids, dtype=torch.long, device=device).unsqueeze(0) # 生成一个跟src相同形状的mask矩阵,让编码器不要计算提取pandding的位置信息。 #按元素判断 src 是否等于 PAD_IDX,等于的位置为 True,不等的位置为 False。 src_pad_mask = src.eq(PAD_IDX) # 计算src_tok= src 经过词嵌入+位置编码后的结果 src_tok = self.src_tok(src) src_pos = self.pos_enc(src_tok) # 将该结果送入编码器,返回的memory就是编码器提取的特征向量。 # 输入编码器,即使没有填充(pandding)的token,也需要传入src_key_padding_mask。 memory = self.encoder(src_pos, src_key_padding_mask=src_pad_mask) # 初始化目标端token id序列,维度为(1,1),初始值为BOS_IDX # 表示目标端序列的开始,BOS_IDX=1 # 推理时输入是没有PAD,但是仍然需要tgt_pad_mask. ys = torch.tensor([[BOS_IDX]], dtype=torch.long, device=device) for _ in range(max_len - 1): #计算本次解码的Mask,跟ys形状一样。 tgt_pad_mask = ys.eq(PAD_IDX) # 计算本次因果掩码,把未来看到的token都屏蔽。 tgt_mask = self.make_subsequent_mask(ys.size(1)).to(device) # 可以看到当推理模式时,解码器输入token数量依次是1,2,3,4..... out = self.decoder( self.pos_enc(self.tgt_tok(ys)), memory, tgt_key_padding_mask=tgt_pad_mask, memory_key_padding_mask=src_pad_mask, ) # 转化为预测词的概率分布 logits = self.generator(out[:, -1:, :]) # 使用贪心选择概率最大的作为本次预测的目标 next_token = logits.argmax(-1) next_id = next_token.item() # 显示选择的token token_text = TGT_ITOS[next_id] if next_id < len(TGT_ITOS) else f"ID_{next_id}" print(f"选择: {token_text}({next_id})") ys = torch.cat([ys, next_token], dim=1) if next_id == EOS_IDX: break return ys.squeeze(0).tolist() # -------------------------- # 4) Train # -------------------------- device = torch.device("cuda" if torch.cuda.is_available() else "cpu") dataset = ToyDataset(pairs) loader = DataLoader(dataset, batch_size=8, shuffle=True, collate_fn=collate_fn) model = Seq2SeqTransformer( src_vocab_size=len(SRC_ITOS), tgt_vocab_size=len(TGT_ITOS), d_model=6, nhead=3, num_encoder_layers=2, num_decoder_layers=2, dim_ff=256, dropout=0.1 ).to(device) criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX) optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4) def evaluate_sample(sent="我 有 一个 苹果"): """辅助函数:对输入中文句子进行编码→推理→解码并打印结果。""" ids = encode_src(sent) print("ids",ids) pred_ids = model.greedy_decode(ids, device=device) pred_text = decode_tgt(pred_ids) print(f'INPUT : {sent}') print(f'OUTPUT: {pred_text}\n') print("Before training:") evaluate_sample("我 有 一个 苹果") EPOCHS = 80 # 小步数即可过拟合玩具数据 for epoch in range(1, EPOCHS + 1): model.train() total_loss = 0.0 for src, tgt_in, tgt_out, src_pad_mask, tgt_pad_mask in loader: src = src.to(device) tgt_in = tgt_in.to(device) tgt_out = tgt_out.to(device) src_pad_mask = src_pad_mask.to(device) tgt_pad_mask = tgt_pad_mask.to(device) logits = model(src, tgt_in, src_pad_mask, tgt_pad_mask) # (B, T, V) loss = criterion(logits.reshape(-1, logits.size(-1)), tgt_out.reshape(-1)) optimizer.zero_grad() loss.backward() torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) optimizer.step() total_loss += loss.item() if epoch % 5 == 0 or epoch == 1: print(f"Epoch {epoch:02d} | loss={total_loss/len(loader):.4f}") evaluate_sample("我 有 一个 苹果") print("After training:") evaluate_sample("我 有 一个 苹果") evaluate_sample("我 有 一本 书") evaluate_sample("你 有 一个 苹果") -
dataset和DataLoader
简介 Dataset和DataLoader在pytorch中主要用于数据的组织。这两个类通常一起搭配处理深度学习中的数据流。 Dataset 用于产出“单个样本”:定义怎么按索引取到一个样本,以及总共有多少个样本。 DataLoader 负责“成批取样”:决定批大小、是否打乱、多进程加载、并用 collate_fn 把一个批里的样本“拼起来”(对齐、padding、mask、teacher forcing 等)。 一句话记忆:Dataset 只管“单条样本”;DataLoader 负责“多条怎么一起、怎么并行、怎么对齐”。变长就写 collate_fn,性能就调 workers/pin_memory/分桶。 Dataset Dataset类作用:定义数据集的统一接口,支持自定义数据加载逻辑。 关键方法: init:初始化数据路径、预处理函数等。 len:返回数据集样本总数。 getitem:根据索引返回单个样本(数据+标签)。 通常情况下用户都会有自己的数据集,所以定义的数据集类继承dataset。 #准备一个数据集 pairs: List[Tuple[str, str]] = [ ("我 有 一个 苹果", "i have an apple"), ("我 有 一本 书", "i have a book"), ("你 喜欢 书", "you like books"), ("我 吃 苹果", "i eat apples"), ] def build_vocab(texts: List[str]): tokens = set() for s in texts: tokens.update([w.lower() for w in s.split()]) itos = ["<pad>", "<bos>", "<eos>"] + sorted(tokens) stoi = {t: i for i, t in enumerate(itos)} return stoi, itos src_texts = [s for s, _ in pairs] tgt_texts = [t for _, t in pairs] SRC_STOI, SRC_ITOS = build_vocab(src_texts) TGT_STOI, TGT_ITOS = build_vocab(tgt_texts) PAD_IDX, BOS_IDX, EOS_IDX = 0, 1, 2 def encode_src(s: str) -> List[int]: return [SRC_STOI[w.lower()] for w in s.split()] def encode_tgt(s: str) -> List[int]: return [BOS_IDX] + [TGT_STOI[w.lower()] for w in s.split()] + [EOS_IDX] # Dataset:定义“单样本怎么取” @dataclass class Example: src: List[int] tgt: List[int] class ToyDataset(Dataset): def __init__(self, pairs: List[Tuple[str, str]]): for s, t in pairs: print("encode_src(s)",encode_src(s)) print("encode_tgt(t)",encode_tgt(t)) self.data = [Example(encode_src(s), encode_tgt(t)) for s, t in pairs] def __len__(self) -> int: return len(self.data) def __getitem__(self, idx: int) -> Example: return self.data[idx] 样本结构:用 Example(src: List[int], tgt: List[int]) 表示一条样本的源序列与目标序列(都是 token id 列表)。 词表与编码:源序列仅分词并映射到 id。目标序列前加 bos、后加 eos,便于自回归训练。 协议:实现 len 和 getitem 两个方法即可被 DataLoader 使用。 DataLoader class torch.utils.data.DataLoader(Data[T_co]): def __init__( self, dataset, batch_size: int = 1, shuffle: bool | None = None, sampler = None, batch_sampler = None, num_workers: int = 0, collate_fn = None, pin_memory: bool = False, drop_last: bool = False, timeout: float = 0, worker_init_fn = None, multiprocessing_context = None, generator = None, prefetch_factor: int = 2, persistent_workers: bool = False, pin_memory_device: str = "" ): ... dataset: Dataset 或 IterableDataset 实例。 batch_size: 每批样本数。 shuffle: 是否在每个 epoch 打乱索引(Map-style 且未显式传 sampler 时有效)。 sampler: 自定义样本采样器(与 shuffle 互斥;指定它就不要再用 shuffle)。 batch_sampler: 一次直接产出“一个 batch 的索引列表”(与 batch_size、shuffle、sampler 互斥)。 num_workers: 进程数(0 为主进程;>0 开多进程并行加载)。 collate_fn(samples_list) -> batch: 批内拼接函数;变长序列需要自定义(默认会尝试堆叠等长 tensor)。 pin_memory: 将 batch 固定到页锁内存,配合 CUDA 加速 H2D 拷贝。 drop_last: 数据量不是 batch_size 整数倍时,是否丢弃最后不满的一批。 timeout: 从 worker 等待数据的秒数(>0 时生效)。 worker_init_fn(worker_id): 每个 worker 的初始化回调(设随机种子、打开文件等)。 multiprocessing_context: 指定多进程上下文(spawn/forkserver 等)。 generator: 控制随机性(打乱、采样)用的随机数生成器。 prefetch_factor: 每个 worker 预取多少个 batch(num_workers > 0 时有效)。 persistent_workers: True 时 DataLoader 第一次迭代后保持 worker 不销毁,提高多轮迭代性能。 pin_memory_device: 当 pin_memory=True 时,指定固定内存的设备标签(一般留空即可)。 DataLoader返回是一个可迭代的对象,每次迭代产出一个批次的样本。一个批次的内容就是把当批样本列表交给 collate_fn 的返回值(若未自定义,则用 PyTorch 的默认 default_collate)。而类型取决于两点Dataset.getitem 返回什么(tensor/数值/dict/tuple…)和collate_fn 如何把一批“样本列表”拼成“批次”。 这里重点阐述一下collate_fn是一个用户需要注册的回调函数,目的是要把一个批的样本拼接起来。同时对于输入样本如果张量的形状不一致如变长序列,进行padding、对齐、mask等动作。 def collate_fn(batch: List[Example]): src_max = max(len(b.src) for b in batch) tgt_max = max(len(b.tgt) for b in batch) src_batch: List[List[int]] = [] tgt_in_batch: List[List[int]] = [] tgt_out_batch: List[List[int]] = [] for ex in batch: src = ex.src + [PAD_IDX] * (src_max - len(ex.src)) # teacher forcing:输入去掉最后一个、输出去掉第一个 tgt_in = ex.tgt[:-1] + [PAD_IDX] * (tgt_max - 1 - len(ex.tgt[:-1])) tgt_out = ex.tgt[1:] + [PAD_IDX] * (tgt_max - 1 - len(ex.tgt[1:])) src_batch.append(src) tgt_in_batch.append(tgt_in) tgt_out_batch.append(tgt_out) src = torch.tensor(src_batch, dtype=torch.long) # (B,S) tgt_in = torch.tensor(tgt_in_batch, dtype=torch.long) # (B,T) tgt_out = torch.tensor(tgt_out_batch, dtype=torch.long) # (B,T) src_pad_mask = src.eq(PAD_IDX) # (B,S) True=PAD tgt_pad_mask = tgt_in.eq(PAD_IDX) # (B,T) True=PAD return src, tgt_in, tgt_out, src_pad_mask, tgt_pad_mask 输入:batch 是若干个 Example,每个包含 src: List[int] 与 tgt: List[int](目标序列已含 bos/eos)。 核心:对齐变长序列(右侧 padding),构造 teacher forcing 的 (tgt_in, tgt_out),并生成 padding 掩码。 输出: src: (B, S) tgt_in: (B, T) tgt_out: (B, T) src_pad_mask: (B, S);True=PAD tgt_pad_mask: (B, T);True=PAD 首先使用src_max/tgt_max计算批内最长长度,这样能够将所有样本右侧补到同一长度,方便堆叠为矩阵。 接着定义批内累积的容器src_batch,tgt_in_batch,tgt_out_batch。 src_batch: 编码器输入样本的批次。 tgt_in_batch:解码器输入样本的批次。 tgt_out_batch:解码器输出样本的批次。 其次使用for循环对每个样本进行补齐,使其跟src_max、tgt_max长度一致,[PAD_IDX] * (src_max - len(ex.src))的意思是将[PAD_IDX]的单元素列表重复src_max - len(ex.src)用于拼接追加到ex.src后,使其对齐。tgt_in和tgt_out同理。 在对tgt_in和tgt_out做样本补齐时,因为输入ex.tgt是包含了bos和eos目标序列,对于tgt_in输入需要去掉最后一个token bos,tgt_out输出需要去掉第一个token eos。 然后就是将补齐的序列依次添加到src_batch,tgt_in_batch,tgt_out_batch。这样就对输入的数据进行了分类,把编码器的输入整合了在一起,解码器的输入和输出整合了一起。 最后就是将批内对齐后的源序列列表转换为张量,同时计算src和tag_in的mask,也就是说对数据哪些位置添加了pad。 下面是collate_fn相关的打印数据,便于理解。 batch [Example(src=[9, 10, 3, 11], tgt=[1, 11, 10, 4, 5, 2]), Example(src=[6, 8, 5], tgt=[1, 13, 12, 8, 2])] src [9, 10, 3, 11] tgt_in [1, 11, 10, 4, 5] tgt_out [11, 10, 4, 5, 2] src [6, 8, 5, 0] tgt_in [1, 13, 12, 8, 0] tgt_out [13, 12, 8, 2, 0] src_batch [[9, 10, 3, 11], [6, 8, 5, 0]] tgt_in_batch [[1, 11, 10, 4, 5], [1, 13, 12, 8, 0]] tgt_out_batch [[11, 10, 4, 5, 2], [13, 12, 8, 2, 0]] src tensor([[ 9, 10, 3, 11], [ 6, 8, 5, 0]]) tgt_in tensor([[ 1, 11, 10, 4, 5], [ 1, 13, 12, 8, 0]]) tgt_out tensor([[11, 10, 4, 5, 2], [13, 12, 8, 2, 0]]) src_pad_mask tensor([[False, False, False, False], [False, False, False, True]]) tgt_pad_mask tensor([[False, False, False, False, False], [False, False, False, False, True]]) src tensor([[ 9, 10, 3, 11], [ 6, 8, 5, 0]]) tgt_in tensor([[ 1, 11, 10, 4, 5], [ 1, 13, 12, 8, 0]]) tgt_out tensor([[11, 10, 4, 5, 2], [13, 12, 8, 2, 0]]) src_mask tensor([[False, False, False, False], [False, False, False, True]]) tgt_mask tensor([[False, False, False, False, False], [False, False, False, False, True]]) 最后完整的示例代码 #!/usr/bin/env python3 """ 最小可运行示例:用 Dataset + DataLoader(含 collate_fn)演示变长序列如何拼批并生成 padding 掩码。 运行: python3 dataloader_demo.py """ from dataclasses import dataclass from typing import List, Tuple import torch from torch.utils.data import Dataset, DataLoader # -------------------------- # 1) 准备一点语料(空格分词) # -------------------------- pairs: List[Tuple[str, str]] = [ ("我 有 一个 苹果", "i have an apple"), ("我 有 一本 书", "i have a book"), ("你 喜欢 书", "you like books"), ("我 吃 苹果", "i eat apples"), ] def build_vocab(texts: List[str]): tokens = set() for s in texts: tokens.update([w.lower() for w in s.split()]) itos = ["<pad>", "<bos>", "<eos>"] + sorted(tokens) stoi = {t: i for i, t in enumerate(itos)} return stoi, itos src_texts = [s for s, _ in pairs] tgt_texts = [t for _, t in pairs] SRC_STOI, SRC_ITOS = build_vocab(src_texts) TGT_STOI, TGT_ITOS = build_vocab(tgt_texts) PAD_IDX, BOS_IDX, EOS_IDX = 0, 1, 2 def encode_src(s: str) -> List[int]: return [SRC_STOI[w.lower()] for w in s.split()] def encode_tgt(s: str) -> List[int]: return [BOS_IDX] + [TGT_STOI[w.lower()] for w in s.split()] + [EOS_IDX] # -------------------------- # 2) Dataset:定义“单样本怎么取” # -------------------------- @dataclass class Example: src: List[int] tgt: List[int] class ToyDataset(Dataset): def __init__(self, pairs: List[Tuple[str, str]]): for s, t in pairs: print("encode_src(s)",encode_src(s)) print("encode_tgt(t)",encode_tgt(t)) self.data = [Example(encode_src(s), encode_tgt(t)) for s, t in pairs] def __len__(self) -> int: return len(self.data) def __getitem__(self, idx: int) -> Example: return self.data[idx] # -------------------------- # 3) collate_fn:把“样本列表”拼成一批(对齐 padding + 生成 mask + teacher forcing) # -------------------------- def collate_fn(batch: List[Example]): src_max = max(len(b.src) for b in batch) #计算批次内最长长度,这样能将样本右侧补齐到同一长度,方便堆叠矩阵 tgt_max = max(len(b.tgt) for b in batch) src_batch: List[List[int]] = [] tgt_in_batch: List[List[int]] = [] tgt_out_batch: List[List[int]] = [] print("batch",batch) for ex in batch: src = ex.src + [PAD_IDX] * (src_max - len(ex.src)) # teacher forcing:输入去掉最后一个、输出去掉第一个 tgt_in = ex.tgt[:-1] + [PAD_IDX] * (tgt_max - 1 - len(ex.tgt[:-1])) tgt_out = ex.tgt[1:] + [PAD_IDX] * (tgt_max - 1 - len(ex.tgt[1:])) print("src",src) print("tgt_in",tgt_in) print("tgt_out",tgt_out) src_batch.append(src) tgt_in_batch.append(tgt_in) tgt_out_batch.append(tgt_out) print("src_batch",src_batch) print("tgt_in_batch",tgt_in_batch) print("tgt_out_batch",tgt_out_batch) src = torch.tensor(src_batch, dtype=torch.long) # (B,S) tgt_in = torch.tensor(tgt_in_batch, dtype=torch.long) # (B,T) tgt_out = torch.tensor(tgt_out_batch, dtype=torch.long) # (B,T) src_pad_mask = src.eq(PAD_IDX) # (B,S) True=PAD tgt_pad_mask = tgt_in.eq(PAD_IDX) # (B,T) True=PAD print("src",src) print("tgt_in",tgt_in) print("tgt_out",tgt_out) print("src_pad_mask",src_pad_mask) print("tgt_pad_mask",tgt_pad_mask) return src, tgt_in, tgt_out, src_pad_mask, tgt_pad_mask # -------------------------- # 4) DataLoader:定义“如何按批取样本”并演示输出 # -------------------------- def main(): dataset = ToyDataset(pairs) for i in range(len(dataset)): print("dataset",dataset.__getitem__(i)) loader = DataLoader( dataset, batch_size=2, shuffle=True, num_workers=0, # 跨平台演示,用 0;Linux 可调大 collate_fn=collate_fn, pin_memory=False, ) # EPOCH=40 # for epoch in range(EPOCH): # for src, tgt_in, tgt_out, src_mask, tgt_mask in loader: # 前向、loss、反传、优化 total_steps = 1000 data_iter = iter(loader) for step in range(total_steps): try: src, tgt_in, tgt_out, src_mask, tgt_mask = next(data_iter) except StopIteration: # 当前迭代器用尽,重建一个新的(相当于进入新一轮) data_iter = iter(loader) src, tgt_in, tgt_out, src_mask, tgt_mask = next(data_iter) print("src",src) print("tgt_in",tgt_in) print("tgt_out",tgt_out) print("src_mask",src_mask) print("tgt_mask",tgt_mask) if __name__ == "__main__": main() iter(loader): 把可迭代的 DataLoader 变成“批次迭代器”。 next(iterator): 从该迭代器中取“下一个批次”。第一次调用就是“第一个 batch”。 it = iter(loader) batch1 = next(it) batch2 = next(it) 在 shuffle=True 时,每次 iter(loader) 相当于开始“新的一轮遍历”,顺序会重新洗牌;drop_last、num_workers、pin_memory 等参数会影响批次数量、并行加载与传输性能。 当然除了用next迭代,还是用for循环的方式,如下: for epoch in range(EPOCH): for src, tgt_in, tgt_out, src_mask, tgt_mask in loader: print("src", src) print("tgt_in", tgt_in) print("tgt_out", tgt_out) print("src_mask", src_mask) print("tgt_mask", tgt_mask) -
数据维度
维度是什么 维度=数据需要“几个”索引才能定位到一个元素,也叫做轴数(axis)或阶(rank)。 可以看成"套盒子"的层数,盒子里面装盒子,再装数字。每多一层外括号/分类,就多一维。 0维=一个数;1维=一排数;2维=表格;3维=一摞表格;更高维=外面再套一层一层分类; 判断有几个维度的方法: 获取一个元素需要几个索引才能定位到。 多一层外括号=多一维;形状从外到内写“有多少个”。(外层是更粗粒度的分类,写在前面,如小批量彩色图像 (B, C, H, W);批次B、通道C、高H、宽W) 1D: ──●──●──●── 一条线 2D: 行×列 一张表 ┌───────┐ │● ● ● │ │● ● ● │ └───────┘ 3D: 多张2D表叠成“砖块” 从0到多维的例子 0维(标量):单个数 42 标量 shape:(), 只要“指它自己”就能找到,例:体温36.5。 1维(向量):一排数 [3, 5, 8] 向量shape:(N),需要1个索引(第几个)才能定位。 2维(矩阵/表格):多排多列 [ [1, 2, 3], [4, 5, 6] [7, 8, 9]] 矩阵shape:(R,C),需要2个索引(第几行,第几列)才能定位到。 3维度(立体):多张矩阵堆叠 [ [[1,2,3], [4,5,6]], [[7,8,9], [11,12,13]] ] 或 层0: [ [...], [...], ... ] 层1: [ [...], [...], ... ] ... 立体shape:(D,R,C),需要3个索引(第几层、第几行、第几列)才能定位到。 n维(张量):继续外面套一层索引 如4维度,小批量彩色图像 (B, C, H, W);批次B、通道C、高H、宽W 深度学习场景维度含义 图像/CNN:(B,C,H,W),B为batch个数,C为图像通道,H为图像高度,W为图像宽度。 文本/transformer:(B,S,C),Batch size,批大小。一次前向里同时处理的样本数。S有时也写作L,Sequence length,序列长度/时间步数(NLP 的 token 数、语音/时序的帧数)。在图像等场景里,若把二维特征展平成序列,也可表示展平后的步数。Channels/Features,特征维度。NLP 里常指 embedding 或 d_model;CV 里指通道数;时序里指每步的特征维度。[B, S, C] 通常表示“B 个样本,每个样本有 S 个时间步/位置,每个时间步有 C 维特征”。 怎么理解C(特征维/通道数)? 在一个张量形状 [B, S, C] 中,C 表示“每个位置(序列中的每个 token/时间步)所携带的特征向量维度”。也就是“描述一个位置所需的数值属性个数”。 表达能力上限: C 越大,单个位置能承载的信息越丰富(更“宽”的向量空间),可拟合更复杂的模式。 稳定性与信息瓶颈: 太小的 C 可能造成信息瓶颈,难以表达远距离依赖或复杂结构。 计算与显存代价: 层内线性/注意力的主计算大多与 C^2 成正比,激活占用与 BLC 成正比。增大 C 会显著提高计算/显存成本。 x.dim() # 轴数,也就是多少个维度。 x.shape # 形状,如 (B,L,C) x.size(-1) # 最后一维长度