lerobot ACT实现分析

🕒 2025-08-04 📁 lerobot 👤 laumy 🔥 117 热度

配置类ACTConfig

@PreTrainedConfig.register_subclass("act")
@dataclass
class ACTConfig(PreTrainedConfig):
    # 输入/输出结构
    chunk_size: int = 100  # 动作块长度(每次预测的动作序列长度)
    n_action_steps: int = 100  # 每次策略调用执行的动作步数(≤ chunk_size)
    temporal_ensemble_coeff: float | None = None  # 时序集成系数(None表示禁用)

    # VAE配置
    use_vae: bool = True  # 是否启用VAE(增强动作多样性)
    latent_dim: int = 32  # 潜在空间维度
    kl_weight: float = 10.0  # KL散度损失权重

    # Transformer配置
    dim_model: int = 512  # Transformer隐藏维度
    n_heads: int = 8  # 注意力头数
    n_encoder_layers: int = 4  # 编码器层数
    n_decoder_layers: int = 1  # 解码器层数(原始实现bug,仅用第1层)

    # 视觉Backbone
    vision_backbone: str = "resnet18"  # 图像特征提取网络
    ......

ACTConfig是ACT算法核心配置类,主要定义了模型结构、输入输出格式、训练参数和推理逻辑等。

输入/输出结构

参数主要配置模型输入观测、输出动作的基本格式,是连接环境与模型的桥梁。

# Input / output structure.
n_obs_steps: int = 1  # 输入观测的时间步数(当前仅支持1步观测,即当前时刻观测)
chunk_size: int = 100  # 动作块长度:每次预测的连续动作序列长度(核心参数,决定分块粒度)
n_action_steps: int = 100  # 每次策略调用执行的动作步数(≤ chunk_size,默认与chunk_size一致,即一次执行整段动作块)

normalization_mapping: dict[str, NormalizationMode] = field(
    default_factory=lambda: {
        "VISUAL": NormalizationMode.MEAN_STD,  # 图像特征归一化:减均值除标准差
        "STATE": NormalizationMode.MEAN_STD,   # 状态特征(如机器人关节角)归一化:同上
        "ACTION": NormalizationMode.MEAN_STD,  # 动作归一化:同上(确保训练时输入分布稳定)
    }
)
  • chunk_size 是 ACT 算法的核心设计:将长时序动作生成分解为固定长度的“动作块”(如100步),避免一次性规划整个任务序列,降低计算复杂度。
  • n_action_steps 控制每次策略调用后实际执行的动作步数。例如,若 chunk_size=100 且 n_action_steps=50,则模型预测100步动作,执行前50步,丢弃后50步(适用于需要频繁重新规划的场景)。

架构配置

从此前的具身智能ACT算法我们知道ACT模型算法主要是基于transformer结构,从实现上模型的核心组件可以分为视觉backbone、transformer、VAE结构。

(1)视觉backbone配置

# Vision backbone.
vision_backbone: str = "resnet18"  # 视觉特征提取网络:使用ResNet18(轻量级,适合实时控制)
pretrained_backbone_weights: str | None = "ResNet18_Weights.IMAGENET1K_V1"  # 预训练权重:使用ImageNet-1K预训练参数初始化,提升特征提取能力
replace_final_stride_with_dilation: int = False  # 是否用空洞卷积替换ResNet的最终2x2 stride(默认关闭,保持特征图分辨率)

上面的参数是ACT算法中用于图像特征提取模块的核心配置,影响模型对视觉输入的理解能力和计算效率。

首先指定了图像特征提取的骨干网络为resnet18,其仅有18层网络,参数量约1100万,常用于实时机器人控制场景,ResNet是通过残差连接缓解深层网络梯度消失问题,能有效提取多尺度图像特征包括边缘纹理到语义信息。视觉 Backbone 的输出(如 ResNet-18 的 layer4 特征图)会被展平为序列,与机器人状态、潜在向量等多模态特征拼接后输入 Transformer 编码器。

其次指定了resnet18预训练权重的来源,默认使用使用 ImageNet-1K 数据集预训练的权重。

最后的replace_final_stride_with_dilation默认关闭,主要是控制是否用“空洞卷积”替换resnet最后一层的2*2步幅卷积。关闭空洞卷积适合对实时性要求高、特征分辨率要求低的场景,如粗粒度抓取任务。如果打开可保留更多的空洞细节(如物体边缘、纹理),适合精细操作如螺丝拧入、零件对齐、但是需要权衡计算量增加和内存的占用。

(2)transformer配置

# Transformer layers.
pre_norm: bool = False  # Transformer块归一化位置:False=后归一化(原始ACT实现),True=前归一化(更稳定但需调参)
dim_model: int = 512    # Transformer隐藏层维度(特征维度)
n_heads: int = 8        # 多头注意力头数(8头,总注意力维度=512/8=64/头)
dim_feedforward: int = 3200  # 前馈网络中间维度(通常为dim_model的4-6倍,此处3200=512*6.25)
feedforward_activation: str = "relu"  # 前馈网络激活函数(ReLU,原始ACT实现)
n_encoder_layers: int = 4  # Transformer编码器层数(4层,用于融合多模态输入特征)
# 注:原始ACT实现中n_decoder_layers=7,但因代码bug仅使用第1层,此处对齐原始实现设为1
n_decoder_layers: int = 1  # Transformer解码器层数(1层,用于生成动作块序列)

上面参数定义了ACT算法中Transformer 编码器/解码器的核心结构参数,直接决定模型的序列建模能力、计算效率和特征融合效果。

  • pre_norm: 归一化位置 ,False=原始行为,True=训练更稳定(需重新调参),若训练发散,可尝试设为 True。
  • dim_model:特征维度(模型容量),增大→更强表达能力,但计算/内存成本平方级增长,机器人实时场景建议 ≤ 1024。
  • n_heads:注意力并行头数,增多更细粒度关注,但通信开销增大 ,保持 dim_model/n_heads = 64(如 512/8=64)。
  • n_encoder_layers: 特征融合深度,增多融合更充分,但推理延迟增加,机械臂操作建议 4-6 层。
  • n_decoder_layers: 动作生成深度,受原始 bug 限制,固定为 1 以对齐行为,若修复原始 bug,可尝试增至 3-4 层。

(3)VAE变分自编码配置

# VAE.
use_vae: bool = True          # 是否启用VAE(默认启用,通过潜在空间建模动作分布)
latent_dim: int = 32          # VAE潜在空间维度(32维,压缩动作序列信息)
n_vae_encoder_layers: int = 4 # VAE编码器层数(4层Transformer,用于将动作块编码为潜在分布)

(4)推理配置

# Inference.
# Note: ACT原论文中启用时序集成时默认值为0.01
temporal_ensemble_coeff: float | None = None  # 时序集成系数:None=禁用,>0=启用(指数加权平均平滑动作)

该参数就是是否启动ACT的Temporal Ensembling机制,时序集成(Temporal Ensembling) 功能的启用与权重计算方式,用于在推理时平滑动作序列,避免机器人执行突变动作(尤其适用于精细操作如机械臂抓取、插入等任务)。

要启动Temporal Ensembling机制需显式设置该参数为非 None 的浮点值(如 0.01),且需满足n_action_steps 必须设为 1(每次策略调用仅执行 1 步动作,确保每步都通过集成优化)。

当 temporal_ensemble_coeff = α(如 0.01)时,ACTTemporalEnsembler 会对连续多轮预测的动作块(chunk_size 长度)进行 指数加权平均。

(5)训练损失配置

    # Training and loss computation.
    dropout: float = 0.1
    kl_weight: float = 10.0

dropout控制 Transformer 层的 随机失活概率,用于正则化,防止模型过拟合训练数据。在训练过程中,以 dropout 概率(此处 10%)随机将 Transformer 层(如多头注意力输出、前馈网络输出)的部分神经元激活值设为 0,强制模型学习更鲁棒的特征(不依赖特定神经元组合)

kl_weight控制 KL 散度损失(KL-divergence Loss) 的权重,仅在启用 VAE(use_vae=True,默认启用)时生效。10.0 是一个较大的权重,表明原始 ACT 实现中更注重约束潜在分布的“规范性”(接近标准正态),以确保 VAE 能生成多样化的动作序列,避免模型仅记忆训练数据中的动作模式。

(6)训练优化

    # Training preset
    optimizer_lr: float = 1e-5
    optimizer_weight_decay: float = 1e-4
    optimizer_lr_backbone: float = 1e-5

optimizer_lr控制除视觉Backbone外所有参数(如Transformer编码器/解码器、VAE层等)的梯度更新步长。学习率过大会导致训练不稳定(Loss震荡),过小则收敛缓慢。1e-5(0.00001)是训练Transformer类模型的经典学习率(如BERT、GPT等),尤其适用于。

optimizer_weight_decay权重衰减(L2正则化)系数,用于抑制过拟合。过小(如1e-5):正则化不足,易过拟合训练数据。过大(如1e-3):过度抑制参数更新,导致模型欠拟合。适用于机器人操作任务,训练数据通常包含噪声(如传感器误差、动作抖动),权重衰减可提升模型对噪声的鲁棒性。

optimizer_lr_backbone视觉Backbone(如ResNet18)参数的专用学习率。ACT原论文中Backbone与主模型联合训练,未使用更小的Backbone学习率。

在实际的工程中,可在get_optim_params 中显式区分Backbone与非Backbone参数,应用不同学习率:

def get_optim_params(self) -> dict:
    return [
        {
            "params": [p for n, p in self.named_parameters() if not n.startswith("model.backbone") and p.requires_grad],
            "lr": self.config.optimizer_lr,  # 主参数学习率
        },
        {
            "params": [p for n, p in self.named_parameters() if n.startswith("model.backbone") and p.requires_grad],
            "lr": self.config.optimizer_lr_backbone,  # Backbone专用学习率
        },
    ]

策略入口类ACTPolicy

初始化逻辑

class ACTPolicy(PreTrainedPolicy):
    def __init__(self, config: ACTConfig, dataset_stats=None):
        super().__init__(config)
        # 输入/输出归一化(标准化数据分布)
        self.normalize_inputs = Normalize(config.input_features, config.normalization_mapping, dataset_stats)
        self.unnormalize_outputs = Unnormalize(config.output_features, config.normalization_mapping, dataset_stats)

        self.model = ACT(config)  # 加载ACT神经网络

        # 初始化时序集成器(若启用)
        if config.temporal_ensemble_coeff is not None:
            self.temporal_ensembler = ACTTemporalEnsembler(config.temporal_ensemble_coeff, config.chunk_size)

        self.reset()  # 重置动作队列/集成器

这段代码定义了一个基于Action Chunking Transformer (ACT)的策略类,主要用于机器人操作任务的动作生成。

self.normalize_inputs、self.normalize_targets、self.unnormalize_outputs。这3个参数用于数据预处理和后处理的关键组件,负责输入特征的归一化、目标动作的归一化以及模型输出动作的反归一化。

根据config.temporal_ensemble_coeff条件来判断是否初始化temporal ensembler,用于联系预测的动作块进行加权平均,提升动作输出的稳定性。ACTTemporalEnsembler通过指数权重(exp(-temporal_ensemble_coeff * i))对历史动作进行加权, older动作权重更高(原论文默认系数0.01)。

推理逻辑

select_action是ACTPolicy类的核心方法,主要的目的就是根据环境观测(batch)然后预测输出机器人要执行的动作。生成预测的动作有两种模式,一个是启用temporal ensemble方式另外一种不启用。

在进入预测生成动作之前先调用self.eval()强制策略进入评估模式(推理),因为策略处于训练模式(启用dropout等)。

(1)时间集成模式

def select_action(self, batch: dict[str, Tensor]) -> Tensor:
    if self.config.temporal_ensemble_coeff is not None:
        actions = self.predict_action_chunk(batch)  # 生成动作块
        action = self.temporal_ensembler.update(actions)  # 时序集成平滑
        return action

开启temporal ensemble:根据配置中temporal_ensemble_coeff条件优先走temporal ensemble模式。该模式先调用self.predict_action_chunk(batch)调用模型预测一个动作块((batch_size, chunk_size, action_dim)),即一次性预测多个连续动作。然后调用self.temporal_ensembler.update(actions)通过时间集成器对动作块进行加权平滑( older 动作权重更高,原论文默认系数 0.01),输出单个稳定动作。主要的目的就是论文中的减少动作抖动,提升机器人控制平滑性。需要注意的是如果开启了该模式,n_action_steps 必须为 1,否则会破坏集成器的时序加权逻辑。

时间集成核心实现

class ACTTemporalEnsembler:
    def __init__(self, temporal_ensemble_coeff: float, chunk_size: int):
        # 指数权重:w_i = exp(-coeff * i),i为动作索引(0为最旧动作)
        self.ensemble_weights = torch.exp(-temporal_ensemble_coeff * torch.arange(chunk_size))
        self.ensemble_weights_cumsum = torch.cumsum(self.ensemble_weights, dim=0)  # 权重累加和(用于归一化)

    def update(self, actions: Tensor) -> Tensor:
        # actions: (batch_size, chunk_size, action_dim)
        if self.ensembled_actions is None:
            self.ensembled_actions = actions.clone()  # 初始化集成动作
        else:
            # 在线加权更新:历史动作 * 累计权重 + 新动作 * 当前权重,再归一化
            self.ensembled_actions *= self.ensemble_weights_cumsum[self.ensembled_actions_count - 1]
            self.ensembled_actions += actions[:, :-1] * self.ensemble_weights[self.ensembled_actions_count]
            self.ensembled_actions /= self.ensemble_weights_cumsum[self.ensembled_actions_count]
        return self.ensembled_actions[:, 0]  # 返回集成后的首步动作

(2)动作队列模式

def select_action(self, batch: dict[str, Tensor]) -> Tensor:
    if len(self._action_queue) == 0:
        # 生成动作块(chunk_size步),取前n_action_steps步存入队列
        actions = self.predict_action_chunk(batch)[:, :self.config.n_action_steps]
        # 队列形状:(n_action_steps, batch_size, action_dim),故转置后入队
        self._action_queue.extend(actions.transpose(0, 1))
    return self._action_queue.popleft()  # 每次弹出队列首步动作

关闭temporal ensemble:未启用时间集成器是,使用简单的动作队列缓存动作块并逐步输出。首先调用调用 predict_action_chunk 获取动作块,这里将会输出一个chunk的动作。但是并不是把这个chunk的集合全都送入队列,而是截取前 n_action_steps 个动作(n_action_steps 为每次预测的动作步数,通常 ≤ chunk_size)。举个例子如果chunk_size是100,但是n_action_steps是50,那么策略一次预测出100个序列动作,但是只取前面的50个。最后把这动作块进行转置后加入队列,之所以转置是因为模型输出动作块形状为 (batch_size, n_action_steps, action_dim),而队列需要按时间步顺序存储(即 (n_action_steps, batch_size, action_dim)),因此通过 transpose(0, 1) 交换前两维。

为什么预测了chunk块,要用n_action_steps做限制了?

可能是因为利用了批量推理的效率,避免因动作块过长导致环境状态变化(如物体移动、机器人位姿偏移)时动作失效。同时限制单次执行的动作步数,强制模型在 n_action_steps 步后重新推理(基于最新观测),确保动作与环境状态同步。

训练损失计算

def forward(self, batch: dict[str, Tensor]) -> tuple[Tensor, dict]:
    batch = self.normalize_inputs(batch)  # 输入归一化
    batch = self.normalize_targets(batch)  # 目标动作归一化

    actions_hat, (mu, log_sigma_x2) = self.model(batch)  # 模型输出:预测动作、VAE分布参数

    # L1损失(忽略填充动作)
    l1_loss = (F.l1_loss(batch[ACTION], actions_hat, reduction="none") * ~batch["action_is_pad"].unsqueeze(-1)).mean()
    loss_dict = {"l1_loss": l1_loss.item()}

    # VAE KL散度损失(若启用)
    if self.config.use_vae:
        mean_kld = (-0.5 * (1 + log_sigma_x2 - mu.pow(2) - log_sigma_x2.exp())).sum(-1).mean()
        loss_dict["kld_loss"] = mean_kld.item()
        loss = l1_loss + mean_kld * self.config.kl_weight  # 总损失 = 重构损失 + KL权重 * KL损失
    else:
        loss = l1_loss
    return loss, loss_dict

这是ACTPolicy类训练模型的接口,负责接收输入数据、通过模推理生成动作预测、计算损失并返回总损失及损失组件字典。

首先对输入的观测数据进行归一化处理,normalize_inputs 基于数据集统计信息(均值、标准差)将输入特征缩放到标准分布(通常均值为0、方差为1),确保模型训练时输入数据分布稳定。

接着将图像特征统一整理到到 batch[OBS_IMAGES] 列表中,便于模型后续提取图像特征。

其次调用self.model(batch)进行模型推理返回模型预测的归一化动作序列,已经如果启用了VAE返回latent分布的均值和对数方差。

计算预测动作(actions_hat)与真实动作(batch[ACTION])的 L1 损失(平均绝对误差)。KL散度的理论意义在于度量两个概率分布之间的差异程度,当KL散度越大的时候,说明两者的差异程度越大;而当KL散度小的时候,则说明两者的差异程度小。如果两者相同的话,则该KL散度应该为0。如果启动了VAE,需要再计算KL散度,总的损失为L1损失与加权KL损失知乎,其中kl_weight是控制损失权重的超参数。如果没有启动VAE直接返回L1损失。

核心算法ACT

整体结构

class ACT(nn.Module):
    def __init__(self, config: ACTConfig):
        super().__init__()
        self.config = config

        # VAE编码器(可选):将动作序列编码为潜在分布
        if config.use_vae:
            self.vae_encoder = ACTEncoder(config, is_vae_encoder=True)
            self.vae_encoder_latent_output_proj = nn.Linear(config.dim_model, config.latent_dim * 2)  # 输出mu和log(sigma²)

        # 视觉Backbone:ResNet提取图像特征
        if config.image_features:
            backbone_model = getattr(torchvision.models, config.vision_backbone)(weights=config.pretrained_backbone_weights)
            self.backbone = IntermediateLayerGetter(backbone_model, return_layers={"layer4": "feature_map"})  # 取layer4特征图

        # Transformer编码器-解码器
        self.encoder = ACTEncoder(config)  # 处理多模态输入(图像、状态、潜在向量)
        self.decoder = ACTDecoder(config)  # 生成动作块
        self.action_head = nn.Linear(config.dim_model, config.action_feature.shape[0])  # 动作输出头

这段代码是ACT类的构造函数,主要是负责初始化模型的核心组件,包括VAE编码器、视觉backbone、transformer编码器/解码器、输入投影层、位置嵌入和动作预测头等。

(1)VAE编码器初始化

if self.config.use_vae:
    self.vae_encoder = ACTEncoder(config, is_vae_encoder=True)  # VAE 编码器(Transformer 架构)
    self.vae_encoder_cls_embed = nn.Embedding(1, config.dim_model)  # CLS 标记嵌入(用于 latent 分布参数)
    # 机器人状态投影层:将关节状态特征映射到模型隐藏维度
    if self.config.robot_state_feature:
        self.vae_encoder_robot_state_input_proj = nn.Linear(
            self.config.robot_state_feature.shape[0], config.dim_model
        )
    # 动作投影层:将动作特征映射到模型隐藏维度
    self.vae_encoder_action_input_proj = nn.Linear(
        self.config.action_feature.shape[0], config.dim_model
    )
    # Latent 分布投影层:将 VAE 编码器输出映射为 latent 均值和方差(维度=2*latent_dim)
    self.vae_encoder_latent_output_proj = nn.Linear(config.dim_model, config.latent_dim * 2)
    # 固定正弦位置嵌入:为 VAE 编码器输入序列添加位置信息(CLS + 机器人状态 + 动作序列)
    num_input_token_encoder = 1 + config.chunk_size  # 1(CLS) + chunk_size(动作序列长度)
    if self.config.robot_state_feature:
        num_input_token_encoder += 1  # 若包含机器人状态,增加 1 个 token
    self.register_buffer(
        "vae_encoder_pos_enc",  # 注册为缓冲区(不参与梯度更新)
        create_sinusoidal_pos_embedding(num_input_token_encoder, config.dim_model).unsqueeze(0),
    )

调用ACTEncoder初始化一个VAE编码器,本质是一个transformer编码器,其参数is_vae_encoder=True 标志用于区分该编码器为 VAE 专用(影响层数、注意力机制等配置,具体见 ACTEncoder 实现)。

定义一个可学习的CLS标记,类似BERT中的[CLS],用于聚合VAE编码器输入序列的全局信息,最终生成latent分布参数(均值和方差),nn.Embedding(1, config.dim_model) 创建一个单元素嵌入表,输出维度为模型隐藏维度 dim_model。

当输入包含机器人状态特征(如关节角度、速度)时启用。通过线性层将机器人状态特征(原始维度)映射到模型隐藏维度 dim_model,确保与其他输入 token(如动作序列)维度一致,可拼接为序列输入。

同理将动作序列中的每个动作(原始维度,如机器人关节控制维度)通过线性层映射到 dim_model,转换为 Transformer 可处理的 token 序列。

将 VAE 编码器输出的 CLS 标记特征(维度 dim_model)映射到 latent 分布的参数空间。

最后的固定正弦位置嵌入,其作用是为 VAE 编码器的输入序列添加固定位置信息,帮助 Transformer 区分不同位置的 token(CLS、机器人状态、动作序列中的不同时间步)。

(2)视觉backbone初始化

if self.config.image_features:
    backbone_model = getattr(torchvision.models, config.vision_backbone)(
        replace_stride_with_dilation=[False, False, config.replace_final_stride_with_dilation],  # 控制最后一层是否使用空洞卷积
        weights=config.pretrained_backbone_weights,  # 预训练权重(如 ImageNet)
        norm_layer=FrozenBatchNorm2d,  # 冻结 BatchNorm 层(避免微调时破坏预训练分布)
    )
    # 提取 ResNet 的 layer4 输出作为图像特征图(高层语义特征)
    self.backbone = IntermediateLayerGetter(backbone_model, return_layers={"layer4": "feature_map"})

当ACT类中配置包含图像特征,初始化图像特征提取骨干网络,并通过 IntermediateLayerGetter 提取高层视觉特征供后续 Transformer 处理。

首先调用getattr动态加载 torchvision.models 中的 ResNet 模型(如 resnet18、resnet50),具体型号由配置 config.vision_backbone 指定。

然后使用 torchvision.ops.misc.IntermediateLayerGetter 从 ResNet 中提取指定层的输出,作为图像的高层特征。return_layers={“layer4”: “feature_map”}指定提取 ResNet 的 layer4(最后一个残差块)输出,并将其重命名为 feature_map。ResNet 的 layer4 输出包含最抽象的视觉语义信息(如物体轮廓、纹理),是下游任务(如 Transformer 编码)的关键输入。self.backbone 调用时返回字典 {“feature_map”: tensor},其中 tensor 为形状 (B, C, H, W) 的特征图(B 为 batch 大小,C 为通道数,H/W 为特征图高/宽)。

(3)transformer编码器/解码器初始化

# Transformer 编码器:处理输入特征(latent、机器人状态、环境状态、图像特征)
self.encoder = ACTEncoder(config)
# Transformer 解码器:生成动作序列(作为 VAE 解码器时,输入为 latent;否则直接处理编码器输出)
self.decoder = ACTDecoder(config)

这两行代码是初始化ACT核心组件transformer编码器和解码器。

(4)输入投影层

# 机器人状态投影:将机器人关节状态特征(如关节角度、速度)映射到 dim_model
if self.config.robot_state_feature:
    self.encoder_robot_state_input_proj = nn.Linear(
        self.config.robot_state_feature.shape[0], config.dim_model
    )
# 环境状态投影:将环境状态特征(如物体位置)映射到 dim_model
if self.config.env_state_feature:
    self.encoder_env_state_input_proj = nn.Linear(
        self.config.env_state_feature.shape[0], config.dim_model
    )
# Latent 投影:将 VAE 输出的 latent 向量映射到 dim_model
self.encoder_latent_input_proj = nn.Linear(config.latent_dim, config.dim_model)
# 图像特征投影:通过 1x1 卷积将 Backbone 输出的特征图(C×H×W)映射到 dim_model
if self.config.image_features:
    self.encoder_img_feat_input_proj = nn.Conv2d(
        backbone_model.fc.in_features,  # Backbone 输出通道数(如 ResNet18 的 layer4 输出为 512)
        config.dim_model, 
        kernel_size=1  # 1x1 卷积不改变空间维度,仅调整通道数
    )

在 Transformer 编码器中,要求所有输入 token 具有相同的维度(dim_model),而不同输入特征(状态、图像、latent 等)的原始维度各异,投影层通过线性/卷积变换实现维度对齐。
投影层(Projection Layer) 是一类用于将不同类型的输入特征(如机器人状态、环境状态、 latent 向量、图像特征等)映射到统一维度的神经网络层。其核心作用是将原始输入特征的维度转换为 Transformer 编码器能够处理的隐藏维度(即代码中的 config.dim_model),确保多模态输入(如状态、图像)能被编码器统一处理。

在ACT类中定义了多个投影层

  • self.config.robot_state_feature:输入为机器人状态特征(如关节角度、速度),原始维度为 self.config.robot_state_feature.shape[0],通过线性层(nn.Linear)将机器人状态的原始维度映射到 dim_model,使其成为 Transformer 编码器可接收的 token。
  • self.config.env_state_feature:环境状态特征(如物体位置、场景参数),原始维度为 self.config.env_state_feature.shape[0],与机器人状态投影层类似,通过线性层将环境状态映射到 dim_model,实现多模态特征的维度统一。
  • self.encoder_latent_input_proj:输入是Latent 向量(来自 VAE 采样或零向量),维度为 config.latent_dim,将 latent 向量从 latent 空间维度映射到 dim_model,作为 Transformer 编码器的核心输入 token 之一。
  • self.encoder_img_feat_input_proj:输入是图像特征图(来自 ResNet 骨干网络的 layer4 输出),通道数为 backbone_model.fc.in_features(如 ResNet18 为 512),通过 1×1 卷积层(nn.Conv2d)将图像特征图的通道数调整为 dim_model,同时保持空间维度(H×W)不变,以便展平为序列 token 输入 Transformer。

(5)位置嵌入

# 1D 位置嵌入:用于 latent、机器人状态、环境状态等非图像特征(共 n_1d_tokens 个 token)
n_1d_tokens = 1  # latent 占 1 个 token
if self.config.robot_state_feature:
    n_1d_tokens += 1  # 机器人状态占 1 个 token
if self.config.env_state_feature:
    n_1d_tokens += 1  # 环境状态占 1 个 token
self.encoder_1d_feature_pos_embed = nn.Embedding(n_1d_tokens, config.dim_model)  # 可学习的 1D 位置嵌入

# 2D 位置嵌入:用于图像特征图(H×W 空间位置)
if self.config.image_features:
    self.encoder_cam_feat_pos_embed = ACTSinusoidalPositionEmbedding2d(config.dim_model // 2)  # 正弦 2D 位置嵌入

位置嵌入是 Transformer 的关键组件,用于解决自注意力机制 对输入序列顺序不敏感 的问题。本代码中有一个1D特征位置嵌入层和图像特征位置嵌入式层。

  • 1D 特征位置嵌入式层:为 1D 特征 token 提供 可学习的位置嵌入,帮助 Transformer 区分不同类型 token 的位置(如 latent 是第 1 个 token,机器人状态是第 2 个等)。
  • 2D 图像特征位置嵌入层:为图像特征图的 2D 空间像素 提供 正弦位置嵌入,编码像素在特征图中的 (高度, 宽度) 空间位置信息。

(6)解码器位置嵌入与动作预测头

self.decoder_pos_embed = nn.Embedding(config.chunk_size, config.dim_model)  # chunk_size 为动作序列长度

self.action_head = nn.Linear(config.dim_model, self.config.action_feature.shape[0])
  • self.decoder_pos_embed:为解码器生成的动作序列(action chunk)提供 可学习的位置嵌入,帮助 Transformer 解码器区分动作序列中不同时间步的位置信息(如第 1 个动作、第 2 个动作等)。
  • self.action_head:将解码器输出的高维特征(config.dim_model 维度)投影到实际动作空间维度,生成最终可执行的动作序列。

forward方法

forward负责执行 Action Chunking Transformer 的完整前向传播流程,涵盖 VAE 编码(可选)、多模态输入处理、Transformer 编码器-解码器计算,最终输出动作序列及潜在变量分布参数(若启用 VAE)。以下是分步骤解析。

(1)输入验证与 batch_size 确定

if self.config.use_vae and self.training:
    assert "action" in batch, "actions must be provided when using the variational objective in training mode."

if "observation.images" in batch:
    batch_size = batch["observation.images"][0].shape[0]
else:
    batch_size = batch["observation.environment_state"].shape[0]

若启用变分目标(VAE)且处于训练模式,需确保输入包含动作序列(”action”),因为 VAE 编码器需以动作序列为目标数据。确定batch_size,根据输入模态(图像或环境状态)确定批次大小,确保后续张量操作维度对齐。

(2) Latent 向量生成(VAE 编码逻辑)

# 构建 VAE 编码器输入:[cls_token, 机器人状态(可选), 动作序列]
cls_embed = einops.repeat(self.vae_encoder_cls_embed.weight, "1 d -> b 1 d", b=batch_size)  # (B, 1, D)
if self.config.robot_state_feature:
    robot_state_embed = self.vae_encoder_robot_state_input_proj(batch["observation.state"]).unsqueeze(1)  # (B, 1, D)
action_embed = self.vae_encoder_action_input_proj(batch["action"])  # (B, S, D)
vae_encoder_input = torch.cat([cls_embed, robot_state_embed, action_embed] if self.config.robot_state_feature else [cls_embed, action_embed], axis=1)  # (B, S+2, D) 或 (B, S+1, D)

# 添加固定正弦位置嵌入
pos_embed = self.vae_encoder_pos_enc.clone().detach().permute(1, 0, 2)  # (S+2, 1, D)

# 构建注意力掩码(忽略填充 token)
cls_joint_is_pad = torch.full((batch_size, 2 if self.config.robot_state_feature else 1), False, device=batch["observation.state"].device)
key_padding_mask = torch.cat([cls_joint_is_pad, batch["action_is_pad"]], axis=1)  # (B, S+2) 或 (B, S+1)

# VAE 编码器前向传播,提取 cls token 输出
cls_token_out = self.vae_encoder(vae_encoder_input.permute(1, 0, 2), pos_embed=pos_embed, key_padding_mask=key_padding_mask)[0]  # (B, D)
latent_pdf_params = self.vae_encoder_latent_output_proj(cls_token_out)  # (B, 2*latent_dim)
mu = latent_pdf_params[:, :self.config.latent_dim]  # 均值 (B, latent_dim)
log_sigma_x2 = latent_pdf_params[:, self.config.latent_dim:]  # 2*log(标准差) (B, latent_dim)

# 重参数化采样 latent 向量
latent_sample = mu + log_sigma_x2.div(2).exp() * torch.randn_like(mu)  # (B, latent_dim)

将动作序列编码为 latent 分布(均值 mu、方差相关参数 log_sigma_x2),并通过重参数化技巧采样得到 latent 向量,作为 Transformer 编码器的核心输入。

(3)无VAE时的latent向量初始化

mu = log_sigma_x2 = None
latent_sample = torch.zeros([batch_size, self.config.latent_dim], dtype=torch.float32).to(batch["observation.state"].device)

如果不启用VAE或非训练模式,直接使用零向量作为latent输入。

(4)transformer编码器输入构建

encoder_in_tokens = [self.encoder_latent_input_proj(latent_sample)]  # latent 投影:(B, latent_dim) → (B, dim_model)
encoder_in_pos_embed = list(self.encoder_1d_feature_pos_embed.weight.unsqueeze(1))  # 1D token 位置嵌入:(n_1d_tokens, 1, dim_model)

# 添加机器人状态 token(若启用)
if self.config.robot_state_feature:
    encoder_in_tokens.append(self.encoder_robot_state_input_proj(batch["observation.state"]))  # (B, dim_model)

# 添加环境状态 token(若启用)
if self.config.env_state_feature:
    encoder_in_tokens.append(self.encoder_env_state_input_proj(batch["observation.environment_state"]))  # (B, dim_model)

1D特征token处理,通过线性层(nn.Linear)将 latent 向量、机器人/环境状态的原始维度映射到模型隐藏维度 dim_model,确保各 token 维度一致。为每个 1D token(latent、状态)分配可学习的位置嵌入,编码其在序列中的位置信息。

if self.config.image_features:
    all_cam_features = []
    all_cam_pos_embeds = []
    for img in batch["observation.images"]:  # 遍历多相机图像
        # 骨干网络提取特征图(如 ResNet layer4 输出)
        cam_features = self.backbone(img)["feature_map"]  # (B, C_backbone, H, W)
        # 图像位置嵌入(2D 正弦位置编码)
        cam_pos_embed = self.encoder_cam_feat_pos_embed(cam_features).to(dtype=cam_features.dtype)  # (1, dim_model, H, W)
        # 特征投影:调整通道数至 dim_model
        cam_features = self.encoder_img_feat_input_proj(cam_features)  # (B, dim_model, H, W)
        # 展平为序列:(H*W, B, dim_model)
        cam_features = einops.rearrange(cam_features, "b c h w -> (h w) b c")
        cam_pos_embed = einops.rearrange(cam_pos_embed, "b c h w -> (h w) b c")
        all_cam_features.append(cam_features)
        all_cam_pos_embeds.append(cam_pos_embed)
    # 拼接多相机特征
    encoder_in_tokens.extend(torch.cat(all_cam_features, axis=0))
    encoder_in_pos_embed.extend(torch.cat(all_cam_pos_embeds, axis=0))

对于图像的特征输入,启用图像输入,通过视觉骨干网络提取特征并转换为序列 token。通过 1×1 卷积(encoder_img_feat_input_proj)将特征图通道数调整为 dim_model,再展平为序列 token(H*W 个像素 token)。通过 ACTSinusoidalPositionEmbedding2d 为像素 token 添加空间位置信息,编码其在特征图中的 (H, W) 坐标。

(5)transformer编码器-解码器前向传播

# 堆叠所有输入 token 和位置嵌入
encoder_in_tokens = torch.stack(encoder_in_tokens, axis=0)  # (seq_len, B, dim_model)
encoder_in_pos_embed = torch.stack(encoder_in_pos_embed, axis=0)  # (seq_len, 1, dim_model)

# 编码器前向传播
encoder_out = self.encoder(encoder_in_tokens, pos_embed=encoder_in_pos_embed)  # (seq_len, B, dim_model)

上面为编码器输出,输入序列为包含 1D 特征 token(latent、状态)和图像像素 token,总长度为 seq_len = n_1d_tokens + sum(H*W for 各相机)。通过自注意力机制融合多模态输入,输出包含全局上下文的特征序列 encoder_out。

# 解码器输入初始化为零向量(类似 DETR 的目标查询)
decoder_in = torch.zeros((self.config.chunk_size, batch_size, self.config.dim_model), dtype=encoder_in_pos_embed.dtype, device=encoder_in_pos_embed.device)  # (chunk_size, B, dim_model)

# 解码器前向传播(交叉注意力融合编码器输出)
decoder_out = self.decoder(
    decoder_in,
    encoder_out,
    encoder_pos_embed=encoder_in_pos_embed,  # 编码器位置嵌入
    decoder_pos_embed=self.decoder_pos_embed.weight.unsqueeze(1),  # 解码器动作序列位置嵌入
)  # (chunk_size, B, dim_model)

# 转换维度并投影到动作空间
decoder_out = decoder_out.transpose(0, 1)  # (B, chunk_size, dim_model)
actions = self.action_head(decoder_out)  # (B, chunk_size, action_dim)

return actions, (mu, log_sigma_x2)

解码器部分,初始化为零向量序列(长度 chunk_size,即一次预测的动作数量),类似 DETR 的“目标查询”。解码器通过交叉注意力机制关注编码器输出的上下文特征,生成动作序列特征。通过 action_head(线性层)将解码器输出的高维特征投影到机器人动作空间维度(action_dim),得到最终动作序列。

最终返回actions和(mu, log_sigma_x2)。前者是形状 (B, chunk_size, action_dim),预测的动作序列;后者是若启用 VAE,返回 latent 分布的均值和方差参数(log_sigma_x2 = 2*log(σ)),否则为 (None, None)。

ACT编码器

ACTEncoder

ACTEncoder 是 Transformer 编码器的顶层容器,负责堆叠多个 ACTEncoderLayer(编码器层)并执行最终归一化,支持 VAE 编码器 和 主 Transformer 编码器 两种角色。

class ACTEncoder(nn.Module):
    def __init__(self, config: ACTConfig, is_vae_encoder: bool = False):
        super().__init__()
        self.is_vae_encoder = is_vae_encoder
        # 根据角色选择编码器层数(VAE 编码器 vs 主编码器)
        num_layers = config.n_vae_encoder_layers if self.is_vae_encoder else config.n_encoder_layers
        # 堆叠 num_layers 个编码器层
        self.layers = nn.ModuleList([ACTEncoderLayer(config) for _ in range(num_layers)])
        # 最终归一化(预归一化模式下启用)
        self.norm = nn.LayerNorm(config.dim_model) if config.pre_norm else nn.Identity()

通过 is_vae_encoder 区分角色,分别使用 n_vae_encoder_layers(VAE 专用层数)或 n_encoder_layers(主编码器层数)。通过 nn.ModuleList 管理多个 ACTEncoderLayer,形成深度编码器结构。若 config.pre_norm=True(预归一化),对所有层输出做最终归一化;否则使用 nn.Identity(无操作),此时归一化在每层内部完成(后归一化)。

def forward(
    self, x: Tensor, pos_embed: Tensor | None = None, key_padding_mask: Tensor | None = None
) -> Tensor:
    for layer in self.layers:
        x = layer(x, pos_embed=pos_embed, key_padding_mask=key_padding_mask)
    x = self.norm(x)
    return x
  • 逐层特征提取:输入张量 x(形状通常为 (seq_len, batch_size, dim_model))依次通过所有 ACTEncoderLayer,每层融合自注意力和前馈网络特征。
  • 位置嵌入与掩码:pos_embed 提供序列位置信息,key_padding_mask 标记需忽略的填充位置,两者均传递给每层。
  • 最终归一化:所有层处理完毕后,通过 self.norm 输出最终特征。

ACTEncoderLayer

下面是单个编码器层的实现

class ACTEncoderLayer(nn.Module):
    def __init__(self, config: ACTConfig):
        super().__init__()
        # 自注意力模块
        self.self_attn = nn.MultiheadAttention(config.dim_model, config.n_heads, dropout=config.dropout)
        # 前馈网络(Linear -> Activation -> Dropout -> Linear)
        self.linear1 = nn.Linear(config.dim_model, config.dim_feedforward)
        self.dropout = nn.Dropout(config.dropout)
        self.linear2 = nn.Linear(config.dim_feedforward, config.dim_model)
        # 归一化与 dropout 层
        self.norm1, self.norm2 = nn.LayerNorm(config.dim_model), nn.LayerNorm(config.dim_model)
        self.dropout1, self.dropout2 = nn.Dropout(config.dropout), nn.Dropout(config.dropout)
        # 激活函数与归一化模式标记
        self.activation = get_activation_fn(config.feedforward_activation)
        self.pre_norm = config.pre_norm

ACTEncoderLayer 是编码器的核心计算单元,包含 自注意力机制、前馈网络 和 残差连接,支持预归一化(PreNorm)或后归一化(PostNorm)模式。

  • 自注意力:nn.MultiheadAttention 实现多头注意力,输入维度 dim_model,头数 n_heads。
  • 前馈网络:将特征从 dim_model 映射到 dim_feedforward(扩展维度),经激活和 dropout 后映射回 dim_model。
  • 归一化与 dropout:每层包含两个归一化层(norm1 用于注意力,norm2 用于前馈网络)和两个 dropout 层,增强训练稳定性。
def forward(self, x, pos_embed: Tensor | None = None, key_padding_mask: Tensor | None = None) -> Tensor:
    # 自注意力模块 + 残差连接
    skip = x
    if self.pre_norm:  # 预归一化:先归一化再计算注意力
        x = self.norm1(x)
    q = k = x if pos_embed is None else x + pos_embed  #  query 和 key 融合位置嵌入
    x = self.self_attn(q, k, value=x, key_padding_mask=key_padding_mask)[0]  # 取注意力输出(忽略权重)
    x = skip + self.dropout1(x)  # 残差连接 + dropout

    # 前馈网络模块 + 残差连接
    if self.pre_norm:  # 预归一化:先归一化再计算前馈
        skip = x
        x = self.norm2(x)
    else:  # 后归一化:先计算注意力再归一化
        x = self.norm1(x)
        skip = x
    x = self.linear2(self.dropout(self.activation(self.linear1(x))))  # 前馈网络
    x = skip + self.dropout2(x)  # 残差连接 + dropout
    if not self.pre_norm:  # 后归一化:最后归一化输出
        x = self.norm2(x)
    return x

上面是forward方法,可以分为自注意力阶段和前馈网络阶段。

  • 自注意力阶段:残差连接,skip 保存输入,注意力输出经 dropout1 后与 skip 相加。位置嵌入,q 和 k 若有 pos_embed 则叠加位置信息,帮助模型捕捉序列顺序。归一化时机,pre_norm=True 时,先对 x 归一化(norm1)再计算注意力;否则后归一化(注意力后通过 norm1 归一化)。
  • 前馈网络阶段:前馈计算,x 经线性层扩展维度、激活(如 ReLU/GELU)、dropout、线性层压缩维度。残差与归一化,类似注意力阶段,pre_norm 决定归一化时机,最终输出融合残差的特征。

总结一下,ACTEncoder,通过堆叠多个 ACTEncoderLayer 实现深度编码,动态适配 VAE 或主编码器角色,输出融合全局依赖的序列特征。ACTEncoderLayer,单个编码器层核心,通过“自注意力+前馈网络+残差连接”提取局部与全局特征,支持预/后归一化模式,是 Transformer 编码器的基础组件。两者协同构成 ACT 模型的编码器部分,负责将多模态输入(如图像、状态)编码为上下文特征,供解码器生成动作序列。

ACT解码器

ACTDecoder

ACTDecoder 是 Transformer 解码器的顶层模块,负责堆叠多个 ACTDecoderLayer(解码器子层)并对最终输出进行归一化,实现从编码器上下文特征到动作序列的映射。

class ACTDecoder(nn.Module):
    def __init__(self, config: ACTConfig):
        super().__init__()
        self.layers = nn.ModuleList([ACTDecoderLayer(config) for _ in range(config.n_decoder_layers)])
        self.norm = nn.LayerNorm(config.dim_model)

通过 nn.ModuleList 创建 config.n_decoder_layers 个 ACTDecoderLayer 实例,构成深度解码器(每层包含自注意力、交叉注意力和前馈网络)。使用 nn.LayerNorm 对所有解码器层的输出进行归一化,稳定训练过程。

def forward(
    self,
    x: Tensor,
    encoder_out: Tensor,
    decoder_pos_embed: Tensor | None = None,
    encoder_pos_embed: Tensor | None = None,
) -> Tensor:
    for layer in self.layers:
        x = layer(
            x, encoder_out, decoder_pos_embed=decoder_pos_embed, encoder_pos_embed=encoder_pos_embed
        )
    if self.norm is not None:
        x = self.norm(x)
    return x

输入参数如下:

  • x:解码器输入序列(初始为零向量,形状 (chunk_size, batch_size, dim_model),chunk_size 为动作序列长度);
  • encoder_out:编码器输出特征(形状 (encoder_seq_len, batch_size, dim_model));
  • decoder_pos_embed:解码器位置嵌入(为动作序列提供时序位置信息);
  • encoder_pos_embed:编码器位置嵌入(为编码器特征提供位置信息,辅助交叉注意力)。

将输入 x、编码器输出 encoder_out 及位置嵌入依次传入每个 ACTDecoderLayer,更新 x 为每层输出。所有层处理完毕后,通过 self.norm 对输出进行归一化,返回形状为 (chunk_size, batch_size, dim_model) 的特征张量(后续将映射为动作序列)。

ACTDecoderLayer

下面再介绍下ACTDecoderLayer。

ACTDecoderLayer 是解码器的基础单元,包含 自注意力(捕捉动作序列内部依赖)、交叉注意力(融合编码器上下文特征)和 前馈网络(增强特征表达能力)三大模块,支持预归一化(PreNorm)或后归一化(PostNorm)模式。

class ACTDecoderLayer(nn.Module):
    def __init__(self, config: ACTConfig):
        super().__init__()
        # 自注意力(解码器内部时序依赖建模)
        self.self_attn = nn.MultiheadAttention(config.dim_model, config.n_heads, dropout=config.dropout)
        # 交叉注意力(融合编码器输出特征)
        self.multihead_attn = nn.MultiheadAttention(config.dim_model, config.n_heads, dropout=config.dropout)
        # 前馈网络(特征变换与增强)
        self.linear1 = nn.Linear(config.dim_model, config.dim_feedforward)  # 升维
        self.dropout = nn.Dropout(config.dropout)
        self.linear2 = nn.Linear(config.dim_feedforward, config.dim_model)  # 降维
        # 归一化层(3个,分别对应自注意力、交叉注意力、前馈网络)
        self.norm1, self.norm2, self.norm3 = [nn.LayerNorm(config.dim_model) for _ in range(3)]
        # Dropout层(3个,增强正则化)
        self.dropout1, self.dropout2, self.dropout3 = [nn.Dropout(config.dropout) for _ in range(3)]
        # 激活函数(如ReLU/GELU)
        self.activation = get_activation_fn(config.feedforward_activation)
        # 归一化模式标记(PreNorm/PostNorm)
        self.pre_norm = config.pre_norm

前向传播forward可以分为3个阶段,分为自注意力->交叉注意力->前馈网络,每个阶段都包含归一化->计算->dropout->残差连接的逻辑。

(1)自注意力阶段

skip = x  # 残差连接的输入
if self.pre_norm:  # 预归一化:先归一化,再计算注意力
    x = self.norm1(x)
# Query和Key融合位置嵌入(Value不融合,保持原始特征)
q = k = self.maybe_add_pos_embed(x, decoder_pos_embed)
x = self.self_attn(q, k, value=x)[0]  # 自注意力输出(忽略注意力权重)
x = skip + self.dropout1(x)  # 残差连接 + Dropout

(2)交叉注意力阶段

if self.pre_norm:  # 预归一化:更新残差输入,归一化当前特征
    skip = x
    x = self.norm2(x)
else:  # 后归一化:先归一化自注意力输出,再更新残差输入
    x = self.norm1(x)
    skip = x
# Query(解码器特征)融合解码器位置嵌入,Key(编码器特征)融合编码器位置嵌入
x = self.multihead_attn(
    query=self.maybe_add_pos_embed(x, decoder_pos_embed),
    key=self.maybe_add_pos_embed(encoder_out, encoder_pos_embed),
    value=encoder_out,
)[0]  # 交叉注意力输出(忽略权重)
x = skip + self.dropout2(x)  # 残差连接 + Dropout

(3)前馈网络

if self.pre_norm:  # 预归一化:更新残差输入,归一化当前特征
    skip = x
    x = self.norm3(x)
else:  # 后归一化:先归一化交叉注意力输出,再更新残差输入
    x = self.norm2(x)
    skip = x
# 前馈网络:升维→激活→Dropout→降维
x = self.linear2(self.dropout(self.activation(self.linear1(x))))
x = skip + self.dropout3(x)  # 残差连接 + Dropout
if not self.pre_norm:  # 后归一化:最后归一化前馈网络输出
    x = self.norm3(x)

发表你的看法

\t