lerobot ACT算法实现

配置类ACTConfig

@PreTrainedConfig.register_subclass("act")
@dataclass
class ACTConfig(PreTrainedConfig):
    # 输入/输出结构
    chunk_size: int = 100  # 动作块长度(每次预测的动作序列长度)
    n_action_steps: int = 100  # 每次策略调用执行的动作步数(≤ chunk_size)
    temporal_ensemble_coeff: float | None = None  # 时序集成系数(None表示禁用)

    # VAE配置
    use_vae: bool = True  # 是否启用VAE(增强动作多样性)
    latent_dim: int = 32  # 潜在空间维度
    kl_weight: float = 10.0  # KL散度损失权重

    # Transformer配置
    dim_model: int = 512  # Transformer隐藏维度
    n_heads: int = 8  # 注意力头数
    n_encoder_layers: int = 4  # 编码器层数
    n_decoder_layers: int = 1  # 解码器层数(原始实现bug,仅用第1层)

    # 视觉Backbone
    vision_backbone: str = "resnet18"  # 图像特征提取网络
    ......

ACTConfig是ACT算法核心配置类,主要定义了模型结构、输入输出格式、训练参数和推理逻辑等。

输入/输出结构

参数主要配置模型输入观测、输出动作的基本格式,是连接环境与模型的桥梁。

# Input / output structure.
n_obs_steps: int = 1  # 输入观测的时间步数(当前仅支持1步观测,即当前时刻观测)
chunk_size: int = 100  # 动作块长度:每次预测的连续动作序列长度(核心参数,决定分块粒度)
n_action_steps: int = 100  # 每次策略调用执行的动作步数(≤ chunk_size,默认与chunk_size一致,即一次执行整段动作块)

normalization_mapping: dict[str, NormalizationMode] = field(
    default_factory=lambda: {
        "VISUAL": NormalizationMode.MEAN_STD,  # 图像特征归一化:减均值除标准差
        "STATE": NormalizationMode.MEAN_STD,   # 状态特征(如机器人关节角)归一化:同上
        "ACTION": NormalizationMode.MEAN_STD,  # 动作归一化:同上(确保训练时输入分布稳定)
    }
)
  • chunk_size 是 ACT 算法的核心设计:将长时序动作生成分解为固定长度的“动作块”(如100步),避免一次性规划整个任务序列,降低计算复杂度。
  • n_action_steps 控制每次策略调用后实际执行的动作步数。例如,若 chunk_size=100 且 n_action_steps=50,则模型预测100步动作,执行前50步,丢弃后50步(适用于需要频繁重新规划的场景)。

架构配置

从此前的具身智能ACT算法我们知道ACT模型算法主要是基于transformer结构,从实现上模型的核心组件可以分为视觉backbone、transformer、VAE结构。

(1)视觉backbone配置

# Vision backbone.
vision_backbone: str = "resnet18"  # 视觉特征提取网络:使用ResNet18(轻量级,适合实时控制)
pretrained_backbone_weights: str | None = "ResNet18_Weights.IMAGENET1K_V1"  # 预训练权重:使用ImageNet-1K预训练参数初始化,提升特征提取能力
replace_final_stride_with_dilation: int = False  # 是否用空洞卷积替换ResNet的最终2x2 stride(默认关闭,保持特征图分辨率)

上面的参数是ACT算法中用于图像特征提取模块的核心配置,影响模型对视觉输入的理解能力和计算效率。

首先指定了图像特征提取的骨干网络为resnet18,其仅有18层网络,参数量约1100万,常用于实时机器人控制场景,ResNet是通过残差连接缓解深层网络梯度消失问题,能有效提取多尺度图像特征包括边缘纹理到语义信息。视觉 Backbone 的输出(如 ResNet-18 的 layer4 特征图)会被展平为序列,与机器人状态、潜在向量等多模态特征拼接后输入 Transformer 编码器。

其次指定了resnet18预训练权重的来源,默认使用使用 ImageNet-1K 数据集预训练的权重。

最后的replace_final_stride_with_dilation默认关闭,主要是控制是否用“空洞卷积”替换resnet最后一层的2*2步幅卷积。关闭空洞卷积适合对实时性要求高、特征分辨率要求低的场景,如粗粒度抓取任务。如果打开可保留更多的空洞细节(如物体边缘、纹理),适合精细操作如螺丝拧入、零件对齐、但是需要权衡计算量增加和内存的占用。

(2)transformer配置

# Transformer layers.
pre_norm: bool = False  # Transformer块归一化位置:False=后归一化(原始ACT实现),True=前归一化(更稳定但需调参)
dim_model: int = 512    # Transformer隐藏层维度(特征维度)
n_heads: int = 8        # 多头注意力头数(8头,总注意力维度=512/8=64/头)
dim_feedforward: int = 3200  # 前馈网络中间维度(通常为dim_model的4-6倍,此处3200=512*6.25)
feedforward_activation: str = "relu"  # 前馈网络激活函数(ReLU,原始ACT实现)
n_encoder_layers: int = 4  # Transformer编码器层数(4层,用于融合多模态输入特征)
# 注:原始ACT实现中n_decoder_layers=7,但因代码bug仅使用第1层,此处对齐原始实现设为1
n_decoder_layers: int = 1  # Transformer解码器层数(1层,用于生成动作块序列)

上面参数定义了ACT算法中Transformer 编码器/解码器的核心结构参数,直接决定模型的序列建模能力、计算效率和特征融合效果。

  • pre_norm: 归一化位置 ,False=原始行为,True=训练更稳定(需重新调参),若训练发散,可尝试设为 True。
  • dim_model:特征维度(模型容量),增大→更强表达能力,但计算/内存成本平方级增长,机器人实时场景建议 ≤ 1024。
  • n_heads:注意力并行头数,增多更细粒度关注,但通信开销增大 ,保持 dim_model/n_heads = 64(如 512/8=64)。
  • n_encoder_layers: 特征融合深度,增多融合更充分,但推理延迟增加,机械臂操作建议 4-6 层。
  • n_decoder_layers: 动作生成深度,受原始 bug 限制,固定为 1 以对齐行为,若修复原始 bug,可尝试增至 3-4 层。

(3)VAE变分自编码配置

# VAE.
use_vae: bool = True          # 是否启用VAE(默认启用,通过潜在空间建模动作分布)
latent_dim: int = 32          # VAE潜在空间维度(32维,压缩动作序列信息)
n_vae_encoder_layers: int = 4 # VAE编码器层数(4层Transformer,用于将动作块编码为潜在分布)

(4)推理配置

# Inference.
# Note: ACT原论文中启用时序集成时默认值为0.01
temporal_ensemble_coeff: float | None = None  # 时序集成系数:None=禁用,>0=启用(指数加权平均平滑动作)

该参数就是是否启动ACT的Temporal Ensembling机制,时序集成(Temporal Ensembling) 功能的启用与权重计算方式,用于在推理时平滑动作序列,避免机器人执行突变动作(尤其适用于精细操作如机械臂抓取、插入等任务)。

要启动Temporal Ensembling机制需显式设置该参数为非 None 的浮点值(如 0.01),且需满足n_action_steps 必须设为 1(每次策略调用仅执行 1 步动作,确保每步都通过集成优化)。

当 temporal_ensemble_coeff = α(如 0.01)时,ACTTemporalEnsembler 会对连续多轮预测的动作块(chunk_size 长度)进行 指数加权平均。

(5)训练损失配置

    # Training and loss computation.
    dropout: float = 0.1
    kl_weight: float = 10.0

dropout控制 Transformer 层的 随机失活概率,用于正则化,防止模型过拟合训练数据。在训练过程中,以 dropout 概率(此处 10

kl_weight控制 KL 散度损失(KL-divergence Loss) 的权重,仅在启用 VAE(use_vae=True,默认启用)时生效。10.0 是一个较大的权重,表明原始 ACT 实现中更注重约束潜在分布的“规范性”(接近标准正态),以确保 VAE 能生成多样化的动作序列,避免模型仅记忆训练数据中的动作模式。

(6)训练优化

    # Training preset
    optimizer_lr: float = 1e-5
    optimizer_weight_decay: float = 1e-4
    optimizer_lr_backbone: float = 1e-5

optimizer_lr控制除视觉Backbone外所有参数(如Transformer编码器/解码器、VAE层等)的梯度更新步长。学习率过大会导致训练不稳定(Loss震荡),过小则收敛缓慢。1e-5(0.00001)是训练Transformer类模型的经典学习率(如BERT、GPT等),尤其适用于。

optimizer_weight_decay权重衰减(L2正则化)系数,用于抑制过拟合。过小(如1e-5):正则化不足,易过拟合训练数据。过大(如1e-3):过度抑制参数更新,导致模型欠拟合。适用于机器人操作任务,训练数据通常包含噪声(如传感器误差、动作抖动),权重衰减可提升模型对噪声的鲁棒性。

optimizer_lr_backbone视觉Backbone(如ResNet18)参数的专用学习率。ACT原论文中Backbone与主模型联合训练,未使用更小的Backbone学习率。

在实际的工程中,可在get_optim_params 中显式区分Backbone与非Backbone参数,应用不同学习率:

def get_optim_params(self) -> dict:
    return [
        {
            "params": [p for n, p in self.named_parameters() if not n.startswith("model.backbone") and p.requires_grad],
            "lr": self.config.optimizer_lr,  # 主参数学习率
        },
        {
            "params": [p for n, p in self.named_parameters() if n.startswith("model.backbone") and p.requires_grad],
            "lr": self.config.optimizer_lr_backbone,  # Backbone专用学习率
        },
    ]

策略入口类ACTPolicy

初始化逻辑

class ACTPolicy(PreTrainedPolicy):
    def __init__(self, config: ACTConfig, dataset_stats=None):
        super().__init__(config)
        # 输入/输出归一化(标准化数据分布)
        self.normalize_inputs = Normalize(config.input_features, config.normalization_mapping, dataset_stats)
        self.unnormalize_outputs = Unnormalize(config.output_features, config.normalization_mapping, dataset_stats)

        self.model = ACT(config)  # 加载ACT神经网络

        # 初始化时序集成器(若启用)
        if config.temporal_ensemble_coeff is not None:
            self.temporal_ensembler = ACTTemporalEnsembler(config.temporal_ensemble_coeff, config.chunk_size)

        self.reset()  # 重置动作队列/集成器

推理逻辑

(1)动作队列模式

def select_action(self, batch: dict[str, Tensor]) -> Tensor:
    if len(self._action_queue) == 0:
        # 生成动作块(chunk_size步),取前n_action_steps步存入队列
        actions = self.predict_action_chunk(batch)[:, :self.config.n_action_steps]
        # 队列形状:(n_action_steps, batch_size, action_dim),故转置后入队
        self._action_queue.extend(actions.transpose(0, 1))
    return self._action_queue.popleft()  # 每次弹出队列首步动作

(2)时间集成模式

def select_action(self, batch: dict[str, Tensor]) -> Tensor:
    if self.config.temporal_ensemble_coeff is not None:
        actions = self.predict_action_chunk(batch)  # 生成动作块
        action = self.temporal_ensembler.update(actions)  # 时序集成平滑
        return action

时间集成核心实现

class ACTTemporalEnsembler:
    def __init__(self, temporal_ensemble_coeff: float, chunk_size: int):
        # 指数权重:w_i = exp(-coeff * i),i为动作索引(0为最旧动作)
        self.ensemble_weights = torch.exp(-temporal_ensemble_coeff * torch.arange(chunk_size))
        self.ensemble_weights_cumsum = torch.cumsum(self.ensemble_weights, dim=0)  # 权重累加和(用于归一化)

    def update(self, actions: Tensor) -> Tensor:
        # actions: (batch_size, chunk_size, action_dim)
        if self.ensembled_actions is None:
            self.ensembled_actions = actions.clone()  # 初始化集成动作
        else:
            # 在线加权更新:历史动作 * 累计权重 + 新动作 * 当前权重,再归一化
            self.ensembled_actions *= self.ensemble_weights_cumsum[self.ensembled_actions_count - 1]
            self.ensembled_actions += actions[:, :-1] * self.ensemble_weights[self.ensembled_actions_count]
            self.ensembled_actions /= self.ensemble_weights_cumsum[self.ensembled_actions_count]
        return self.ensembled_actions[:, 0]  # 返回集成后的首步动作

训练损失计算

def forward(self, batch: dict[str, Tensor]) -> tuple[Tensor, dict]:
    batch = self.normalize_inputs(batch)  # 输入归一化
    batch = self.normalize_targets(batch)  # 目标动作归一化

    actions_hat, (mu, log_sigma_x2) = self.model(batch)  # 模型输出:预测动作、VAE分布参数

    # L1损失(忽略填充动作)
    l1_loss = (F.l1_loss(batch[ACTION], actions_hat, reduction="none") * ~batch["action_is_pad"].unsqueeze(-1)).mean()
    loss_dict = {"l1_loss": l1_loss.item()}

    # VAE KL散度损失(若启用)
    if self.config.use_vae:
        mean_kld = (-0.5 * (1 + log_sigma_x2 - mu.pow(2) - log_sigma_x2.exp())).sum(-1).mean()
        loss_dict["kld_loss"] = mean_kld.item()
        loss = l1_loss + mean_kld * self.config.kl_weight  # 总损失 = 重构损失 + KL权重 * KL损失
    else:
        loss = l1_loss
    return loss, loss_dict

核心算法ACT

整体结构

class ACT(nn.Module):
    def __init__(self, config: ACTConfig):
        super().__init__()
        self.config = config

        # VAE编码器(可选):将动作序列编码为潜在分布
        if config.use_vae:
            self.vae_encoder = ACTEncoder(config, is_vae_encoder=True)
            self.vae_encoder_latent_output_proj = nn.Linear(config.dim_model, config.latent_dim * 2)  # 输出mu和log(sigma²)

        # 视觉Backbone:ResNet提取图像特征
        if config.image_features:
            backbone_model = getattr(torchvision.models, config.vision_backbone)(weights=config.pretrained_backbone_weights)
            self.backbone = IntermediateLayerGetter(backbone_model, return_layers={"layer4": "feature_map"})  # 取layer4特征图

        # Transformer编码器-解码器
        self.encoder = ACTEncoder(config)  # 处理多模态输入(图像、状态、潜在向量)
        self.decoder = ACTDecoder(config)  # 生成动作块
        self.action_head = nn.Linear(config.dim_model, config.action_feature.shape[0])  # 动作输出头

动作序列编码

def forward(self, batch: dict[str, Tensor]):
    if self.config.use_vae and "action" in batch:
        # VAE输入:[CLS, 机器人状态, 动作序列]
        cls_embed = einops.repeat(self.vae_encoder_cls_embed.weight, "1 d -> b 1 d", b=batch_size)  # CLS token
        action_embed = self.vae_encoder_action_input_proj(batch["action"])  # 动作嵌入:(B, chunk_size, D)
        vae_encoder_input = torch.cat([cls_embed, action_embed], axis=1)  # (B, chunk_size+1, D)

        # VAE编码器输出CLS token,映射为潜在分布参数
        cls_token_out = self.vae_encoder(vae_encoder_input.permute(1, 0, 2))[0]  # (B, D)
        latent_pdf_params = self.vae_encoder_latent_output_proj(cls_token_out)  # (B, 2*latent_dim)
        mu = latent_pdf_params[:, :self.config.latent_dim]  # 均值
        log_sigma_x2 = latent_pdf_params[:, self.config.latent_dim:]  # log(方差)

        # 重参数化采样:latent = mu + sigma * ε(ε~N(0,1))
        latent_sample = mu + log_sigma_x2.div(2).exp() * torch.randn_like(mu)
    else:
        latent_sample = torch.zeros([batch_size, self.config.latent_dim]).to(device)  # 禁用VAE时,latent为0向量

Transformer编码器

# 1. 潜在向量投影
latent_embed = self.encoder_latent_input_proj(latent_sample).unsqueeze(0)  # (1, B, D)

# 2. 机器人状态投影(若启用)
if self.config.robot_state_feature:
    robot_state_embed = self.encoder_robot_state_input_proj(batch["observation.state"]).unsqueeze(0)  # (1, B, D)

# 3. 图像特征提取与投影
if self.config.image_features:
    cam_features = self.backbone(img)["feature_map"]  # ResNet layer4特征:(B, 512, H, W)
    cam_features = self.encoder_img_feat_input_proj(cam_features)  # 1x1卷积投影至D维:(B, D, H, W)
    cam_features = einops.rearrange(cam_features, "b c h w -> (h w) b c")  # 展平为序列:(H*W, B, D)

# 4. 编码器输入:拼接所有特征,添加位置嵌入
encoder_in_tokens = torch.cat([latent_embed, robot_state_embed, cam_features], axis=0)  # (总序列长, B, D)
encoder_out = self.encoder(encoder_in_tokens, pos_embed=encoder_in_pos_embed)  # Transformer编码器输出

Transformer解码器

# 解码器输入:全零向量(类似DETR的"对象查询")
decoder_in = torch.zeros((self.config.chunk_size, batch_size, self.config.dim_model), device=device)
# 解码器位置嵌入:可学习参数,形状 (chunk_size, D)
decoder_pos_embed = self.decoder_pos_embed.weight.unsqueeze(1)  # (chunk_size, 1, D)

# 解码器输出:(chunk_size, B, D)
decoder_out = self.decoder(decoder_in, encoder_out, decoder_pos_embed=decoder_pos_embed)
actions = self.action_head(decoder_out.transpose(0, 1))  # (B, chunk_size, action_dim)