lerobot ACT算法实现
- lerobot
- 6小时前
- 18热度
- 0评论
配置类ACTConfig
@PreTrainedConfig.register_subclass("act")
@dataclass
class ACTConfig(PreTrainedConfig):
# 输入/输出结构
chunk_size: int = 100 # 动作块长度(每次预测的动作序列长度)
n_action_steps: int = 100 # 每次策略调用执行的动作步数(≤ chunk_size)
temporal_ensemble_coeff: float | None = None # 时序集成系数(None表示禁用)
# VAE配置
use_vae: bool = True # 是否启用VAE(增强动作多样性)
latent_dim: int = 32 # 潜在空间维度
kl_weight: float = 10.0 # KL散度损失权重
# Transformer配置
dim_model: int = 512 # Transformer隐藏维度
n_heads: int = 8 # 注意力头数
n_encoder_layers: int = 4 # 编码器层数
n_decoder_layers: int = 1 # 解码器层数(原始实现bug,仅用第1层)
# 视觉Backbone
vision_backbone: str = "resnet18" # 图像特征提取网络
......
ACTConfig是ACT算法核心配置类,主要定义了模型结构、输入输出格式、训练参数和推理逻辑等。
输入/输出结构
参数主要配置模型输入观测、输出动作的基本格式,是连接环境与模型的桥梁。
# Input / output structure.
n_obs_steps: int = 1 # 输入观测的时间步数(当前仅支持1步观测,即当前时刻观测)
chunk_size: int = 100 # 动作块长度:每次预测的连续动作序列长度(核心参数,决定分块粒度)
n_action_steps: int = 100 # 每次策略调用执行的动作步数(≤ chunk_size,默认与chunk_size一致,即一次执行整段动作块)
normalization_mapping: dict[str, NormalizationMode] = field(
default_factory=lambda: {
"VISUAL": NormalizationMode.MEAN_STD, # 图像特征归一化:减均值除标准差
"STATE": NormalizationMode.MEAN_STD, # 状态特征(如机器人关节角)归一化:同上
"ACTION": NormalizationMode.MEAN_STD, # 动作归一化:同上(确保训练时输入分布稳定)
}
)
- chunk_size 是 ACT 算法的核心设计:将长时序动作生成分解为固定长度的“动作块”(如100步),避免一次性规划整个任务序列,降低计算复杂度。
- n_action_steps 控制每次策略调用后实际执行的动作步数。例如,若 chunk_size=100 且 n_action_steps=50,则模型预测100步动作,执行前50步,丢弃后50步(适用于需要频繁重新规划的场景)。
架构配置
从此前的具身智能ACT算法我们知道ACT模型算法主要是基于transformer结构,从实现上模型的核心组件可以分为视觉backbone、transformer、VAE结构。
(1)视觉backbone配置
# Vision backbone.
vision_backbone: str = "resnet18" # 视觉特征提取网络:使用ResNet18(轻量级,适合实时控制)
pretrained_backbone_weights: str | None = "ResNet18_Weights.IMAGENET1K_V1" # 预训练权重:使用ImageNet-1K预训练参数初始化,提升特征提取能力
replace_final_stride_with_dilation: int = False # 是否用空洞卷积替换ResNet的最终2x2 stride(默认关闭,保持特征图分辨率)
上面的参数是ACT算法中用于图像特征提取模块的核心配置,影响模型对视觉输入的理解能力和计算效率。
首先指定了图像特征提取的骨干网络为resnet18,其仅有18层网络,参数量约1100万,常用于实时机器人控制场景,ResNet是通过残差连接缓解深层网络梯度消失问题,能有效提取多尺度图像特征包括边缘纹理到语义信息。视觉 Backbone 的输出(如 ResNet-18 的 layer4 特征图)会被展平为序列,与机器人状态、潜在向量等多模态特征拼接后输入 Transformer 编码器。
其次指定了resnet18预训练权重的来源,默认使用使用 ImageNet-1K 数据集预训练的权重。
最后的replace_final_stride_with_dilation默认关闭,主要是控制是否用“空洞卷积”替换resnet最后一层的2*2步幅卷积。关闭空洞卷积适合对实时性要求高、特征分辨率要求低的场景,如粗粒度抓取任务。如果打开可保留更多的空洞细节(如物体边缘、纹理),适合精细操作如螺丝拧入、零件对齐、但是需要权衡计算量增加和内存的占用。
(2)transformer配置
# Transformer layers.
pre_norm: bool = False # Transformer块归一化位置:False=后归一化(原始ACT实现),True=前归一化(更稳定但需调参)
dim_model: int = 512 # Transformer隐藏层维度(特征维度)
n_heads: int = 8 # 多头注意力头数(8头,总注意力维度=512/8=64/头)
dim_feedforward: int = 3200 # 前馈网络中间维度(通常为dim_model的4-6倍,此处3200=512*6.25)
feedforward_activation: str = "relu" # 前馈网络激活函数(ReLU,原始ACT实现)
n_encoder_layers: int = 4 # Transformer编码器层数(4层,用于融合多模态输入特征)
# 注:原始ACT实现中n_decoder_layers=7,但因代码bug仅使用第1层,此处对齐原始实现设为1
n_decoder_layers: int = 1 # Transformer解码器层数(1层,用于生成动作块序列)
上面参数定义了ACT算法中Transformer 编码器/解码器的核心结构参数,直接决定模型的序列建模能力、计算效率和特征融合效果。
- pre_norm: 归一化位置 ,False=原始行为,True=训练更稳定(需重新调参),若训练发散,可尝试设为 True。
- dim_model:特征维度(模型容量),增大→更强表达能力,但计算/内存成本平方级增长,机器人实时场景建议 ≤ 1024。
- n_heads:注意力并行头数,增多更细粒度关注,但通信开销增大 ,保持 dim_model/n_heads = 64(如 512/8=64)。
- n_encoder_layers: 特征融合深度,增多融合更充分,但推理延迟增加,机械臂操作建议 4-6 层。
- n_decoder_layers: 动作生成深度,受原始 bug 限制,固定为 1 以对齐行为,若修复原始 bug,可尝试增至 3-4 层。
(3)VAE变分自编码配置
# VAE.
use_vae: bool = True # 是否启用VAE(默认启用,通过潜在空间建模动作分布)
latent_dim: int = 32 # VAE潜在空间维度(32维,压缩动作序列信息)
n_vae_encoder_layers: int = 4 # VAE编码器层数(4层Transformer,用于将动作块编码为潜在分布)
(4)推理配置
# Inference.
# Note: ACT原论文中启用时序集成时默认值为0.01
temporal_ensemble_coeff: float | None = None # 时序集成系数:None=禁用,>0=启用(指数加权平均平滑动作)
该参数就是是否启动ACT的Temporal Ensembling机制,时序集成(Temporal Ensembling) 功能的启用与权重计算方式,用于在推理时平滑动作序列,避免机器人执行突变动作(尤其适用于精细操作如机械臂抓取、插入等任务)。
要启动Temporal Ensembling机制需显式设置该参数为非 None 的浮点值(如 0.01),且需满足n_action_steps 必须设为 1(每次策略调用仅执行 1 步动作,确保每步都通过集成优化)。
当 temporal_ensemble_coeff = α(如 0.01)时,ACTTemporalEnsembler 会对连续多轮预测的动作块(chunk_size 长度)进行 指数加权平均。
(5)训练损失配置
# Training and loss computation.
dropout: float = 0.1
kl_weight: float = 10.0
dropout控制 Transformer 层的 随机失活概率,用于正则化,防止模型过拟合训练数据。在训练过程中,以 dropout 概率(此处 10
kl_weight控制 KL 散度损失(KL-divergence Loss) 的权重,仅在启用 VAE(use_vae=True,默认启用)时生效。10.0 是一个较大的权重,表明原始 ACT 实现中更注重约束潜在分布的“规范性”(接近标准正态),以确保 VAE 能生成多样化的动作序列,避免模型仅记忆训练数据中的动作模式。
(6)训练优化
# Training preset
optimizer_lr: float = 1e-5
optimizer_weight_decay: float = 1e-4
optimizer_lr_backbone: float = 1e-5
optimizer_lr控制除视觉Backbone外所有参数(如Transformer编码器/解码器、VAE层等)的梯度更新步长。学习率过大会导致训练不稳定(Loss震荡),过小则收敛缓慢。1e-5(0.00001)是训练Transformer类模型的经典学习率(如BERT、GPT等),尤其适用于。
optimizer_weight_decay权重衰减(L2正则化)系数,用于抑制过拟合。过小(如1e-5):正则化不足,易过拟合训练数据。过大(如1e-3):过度抑制参数更新,导致模型欠拟合。适用于机器人操作任务,训练数据通常包含噪声(如传感器误差、动作抖动),权重衰减可提升模型对噪声的鲁棒性。
optimizer_lr_backbone视觉Backbone(如ResNet18)参数的专用学习率。ACT原论文中Backbone与主模型联合训练,未使用更小的Backbone学习率。
在实际的工程中,可在get_optim_params 中显式区分Backbone与非Backbone参数,应用不同学习率:
def get_optim_params(self) -> dict:
return [
{
"params": [p for n, p in self.named_parameters() if not n.startswith("model.backbone") and p.requires_grad],
"lr": self.config.optimizer_lr, # 主参数学习率
},
{
"params": [p for n, p in self.named_parameters() if n.startswith("model.backbone") and p.requires_grad],
"lr": self.config.optimizer_lr_backbone, # Backbone专用学习率
},
]
策略入口类ACTPolicy
初始化逻辑
class ACTPolicy(PreTrainedPolicy):
def __init__(self, config: ACTConfig, dataset_stats=None):
super().__init__(config)
# 输入/输出归一化(标准化数据分布)
self.normalize_inputs = Normalize(config.input_features, config.normalization_mapping, dataset_stats)
self.unnormalize_outputs = Unnormalize(config.output_features, config.normalization_mapping, dataset_stats)
self.model = ACT(config) # 加载ACT神经网络
# 初始化时序集成器(若启用)
if config.temporal_ensemble_coeff is not None:
self.temporal_ensembler = ACTTemporalEnsembler(config.temporal_ensemble_coeff, config.chunk_size)
self.reset() # 重置动作队列/集成器
推理逻辑
(1)动作队列模式
def select_action(self, batch: dict[str, Tensor]) -> Tensor:
if len(self._action_queue) == 0:
# 生成动作块(chunk_size步),取前n_action_steps步存入队列
actions = self.predict_action_chunk(batch)[:, :self.config.n_action_steps]
# 队列形状:(n_action_steps, batch_size, action_dim),故转置后入队
self._action_queue.extend(actions.transpose(0, 1))
return self._action_queue.popleft() # 每次弹出队列首步动作
(2)时间集成模式
def select_action(self, batch: dict[str, Tensor]) -> Tensor:
if self.config.temporal_ensemble_coeff is not None:
actions = self.predict_action_chunk(batch) # 生成动作块
action = self.temporal_ensembler.update(actions) # 时序集成平滑
return action
时间集成核心实现
class ACTTemporalEnsembler:
def __init__(self, temporal_ensemble_coeff: float, chunk_size: int):
# 指数权重:w_i = exp(-coeff * i),i为动作索引(0为最旧动作)
self.ensemble_weights = torch.exp(-temporal_ensemble_coeff * torch.arange(chunk_size))
self.ensemble_weights_cumsum = torch.cumsum(self.ensemble_weights, dim=0) # 权重累加和(用于归一化)
def update(self, actions: Tensor) -> Tensor:
# actions: (batch_size, chunk_size, action_dim)
if self.ensembled_actions is None:
self.ensembled_actions = actions.clone() # 初始化集成动作
else:
# 在线加权更新:历史动作 * 累计权重 + 新动作 * 当前权重,再归一化
self.ensembled_actions *= self.ensemble_weights_cumsum[self.ensembled_actions_count - 1]
self.ensembled_actions += actions[:, :-1] * self.ensemble_weights[self.ensembled_actions_count]
self.ensembled_actions /= self.ensemble_weights_cumsum[self.ensembled_actions_count]
return self.ensembled_actions[:, 0] # 返回集成后的首步动作
训练损失计算
def forward(self, batch: dict[str, Tensor]) -> tuple[Tensor, dict]:
batch = self.normalize_inputs(batch) # 输入归一化
batch = self.normalize_targets(batch) # 目标动作归一化
actions_hat, (mu, log_sigma_x2) = self.model(batch) # 模型输出:预测动作、VAE分布参数
# L1损失(忽略填充动作)
l1_loss = (F.l1_loss(batch[ACTION], actions_hat, reduction="none") * ~batch["action_is_pad"].unsqueeze(-1)).mean()
loss_dict = {"l1_loss": l1_loss.item()}
# VAE KL散度损失(若启用)
if self.config.use_vae:
mean_kld = (-0.5 * (1 + log_sigma_x2 - mu.pow(2) - log_sigma_x2.exp())).sum(-1).mean()
loss_dict["kld_loss"] = mean_kld.item()
loss = l1_loss + mean_kld * self.config.kl_weight # 总损失 = 重构损失 + KL权重 * KL损失
else:
loss = l1_loss
return loss, loss_dict
核心算法ACT
整体结构
class ACT(nn.Module):
def __init__(self, config: ACTConfig):
super().__init__()
self.config = config
# VAE编码器(可选):将动作序列编码为潜在分布
if config.use_vae:
self.vae_encoder = ACTEncoder(config, is_vae_encoder=True)
self.vae_encoder_latent_output_proj = nn.Linear(config.dim_model, config.latent_dim * 2) # 输出mu和log(sigma²)
# 视觉Backbone:ResNet提取图像特征
if config.image_features:
backbone_model = getattr(torchvision.models, config.vision_backbone)(weights=config.pretrained_backbone_weights)
self.backbone = IntermediateLayerGetter(backbone_model, return_layers={"layer4": "feature_map"}) # 取layer4特征图
# Transformer编码器-解码器
self.encoder = ACTEncoder(config) # 处理多模态输入(图像、状态、潜在向量)
self.decoder = ACTDecoder(config) # 生成动作块
self.action_head = nn.Linear(config.dim_model, config.action_feature.shape[0]) # 动作输出头
动作序列编码
def forward(self, batch: dict[str, Tensor]):
if self.config.use_vae and "action" in batch:
# VAE输入:[CLS, 机器人状态, 动作序列]
cls_embed = einops.repeat(self.vae_encoder_cls_embed.weight, "1 d -> b 1 d", b=batch_size) # CLS token
action_embed = self.vae_encoder_action_input_proj(batch["action"]) # 动作嵌入:(B, chunk_size, D)
vae_encoder_input = torch.cat([cls_embed, action_embed], axis=1) # (B, chunk_size+1, D)
# VAE编码器输出CLS token,映射为潜在分布参数
cls_token_out = self.vae_encoder(vae_encoder_input.permute(1, 0, 2))[0] # (B, D)
latent_pdf_params = self.vae_encoder_latent_output_proj(cls_token_out) # (B, 2*latent_dim)
mu = latent_pdf_params[:, :self.config.latent_dim] # 均值
log_sigma_x2 = latent_pdf_params[:, self.config.latent_dim:] # log(方差)
# 重参数化采样:latent = mu + sigma * ε(ε~N(0,1))
latent_sample = mu + log_sigma_x2.div(2).exp() * torch.randn_like(mu)
else:
latent_sample = torch.zeros([batch_size, self.config.latent_dim]).to(device) # 禁用VAE时,latent为0向量
Transformer编码器
# 1. 潜在向量投影
latent_embed = self.encoder_latent_input_proj(latent_sample).unsqueeze(0) # (1, B, D)
# 2. 机器人状态投影(若启用)
if self.config.robot_state_feature:
robot_state_embed = self.encoder_robot_state_input_proj(batch["observation.state"]).unsqueeze(0) # (1, B, D)
# 3. 图像特征提取与投影
if self.config.image_features:
cam_features = self.backbone(img)["feature_map"] # ResNet layer4特征:(B, 512, H, W)
cam_features = self.encoder_img_feat_input_proj(cam_features) # 1x1卷积投影至D维:(B, D, H, W)
cam_features = einops.rearrange(cam_features, "b c h w -> (h w) b c") # 展平为序列:(H*W, B, D)
# 4. 编码器输入:拼接所有特征,添加位置嵌入
encoder_in_tokens = torch.cat([latent_embed, robot_state_embed, cam_features], axis=0) # (总序列长, B, D)
encoder_out = self.encoder(encoder_in_tokens, pos_embed=encoder_in_pos_embed) # Transformer编码器输出
Transformer解码器
# 解码器输入:全零向量(类似DETR的"对象查询")
decoder_in = torch.zeros((self.config.chunk_size, batch_size, self.config.dim_model), device=device)
# 解码器位置嵌入:可学习参数,形状 (chunk_size, D)
decoder_pos_embed = self.decoder_pos_embed.weight.unsqueeze(1) # (chunk_size, 1, D)
# 解码器输出:(chunk_size, B, D)
decoder_out = self.decoder(decoder_in, encoder_out, decoder_pos_embed=decoder_pos_embed)
actions = self.action_head(decoder_out.transpose(0, 1)) # (B, chunk_size, action_dim)