lerobot ACT实现分析
配置类ACTConfig
@PreTrainedConfig.register_subclass("act")
@dataclass
class ACTConfig(PreTrainedConfig):
# 输入/输出结构
chunk_size: int = 100 # 动作块长度(每次预测的动作序列长度)
n_action_steps: int = 100 # 每次策略调用执行的动作步数(≤ chunk_size)
temporal_ensemble_coeff: float | None = None # 时序集成系数(None表示禁用)
# VAE配置
use_vae: bool = True # 是否启用VAE(增强动作多样性)
latent_dim: int = 32 # 潜在空间维度
kl_weight: float = 10.0 # KL散度损失权重
# Transformer配置
dim_model: int = 512 # Transformer隐藏维度
n_heads: int = 8 # 注意力头数
n_encoder_layers: int = 4 # 编码器层数
n_decoder_layers: int = 1 # 解码器层数(原始实现bug,仅用第1层)
# 视觉Backbone
vision_backbone: str = "resnet18" # 图像特征提取网络
......
ACTConfig是ACT算法核心配置类,主要定义了模型结构、输入输出格式、训练参数和推理逻辑等。
输入/输出结构
参数主要配置模型输入观测、输出动作的基本格式,是连接环境与模型的桥梁。
# Input / output structure.
n_obs_steps: int = 1 # 输入观测的时间步数(当前仅支持1步观测,即当前时刻观测)
chunk_size: int = 100 # 动作块长度:每次预测的连续动作序列长度(核心参数,决定分块粒度)
n_action_steps: int = 100 # 每次策略调用执行的动作步数(≤ chunk_size,默认与chunk_size一致,即一次执行整段动作块)
normalization_mapping: dict[str, NormalizationMode] = field(
default_factory=lambda: {
"VISUAL": NormalizationMode.MEAN_STD, # 图像特征归一化:减均值除标准差
"STATE": NormalizationMode.MEAN_STD, # 状态特征(如机器人关节角)归一化:同上
"ACTION": NormalizationMode.MEAN_STD, # 动作归一化:同上(确保训练时输入分布稳定)
}
)
- chunk_size 是 ACT 算法的核心设计:将长时序动作生成分解为固定长度的“动作块”(如100步),避免一次性规划整个任务序列,降低计算复杂度。
- n_action_steps 控制每次策略调用后实际执行的动作步数。例如,若 chunk_size=100 且 n_action_steps=50,则模型预测100步动作,执行前50步,丢弃后50步(适用于需要频繁重新规划的场景)。
架构配置
从此前的具身智能ACT算法我们知道ACT模型算法主要是基于transformer结构,从实现上模型的核心组件可以分为视觉backbone、transformer、VAE结构。
(1)视觉backbone配置
# Vision backbone.
vision_backbone: str = "resnet18" # 视觉特征提取网络:使用ResNet18(轻量级,适合实时控制)
pretrained_backbone_weights: str | None = "ResNet18_Weights.IMAGENET1K_V1" # 预训练权重:使用ImageNet-1K预训练参数初始化,提升特征提取能力
replace_final_stride_with_dilation: int = False # 是否用空洞卷积替换ResNet的最终2x2 stride(默认关闭,保持特征图分辨率)
上面的参数是ACT算法中用于图像特征提取模块的核心配置,影响模型对视觉输入的理解能力和计算效率。
首先指定了图像特征提取的骨干网络为resnet18,其仅有18层网络,参数量约1100万,常用于实时机器人控制场景,ResNet是通过残差连接缓解深层网络梯度消失问题,能有效提取多尺度图像特征包括边缘纹理到语义信息。视觉 Backbone 的输出(如 ResNet-18 的 layer4 特征图)会被展平为序列,与机器人状态、潜在向量等多模态特征拼接后输入 Transformer 编码器。
其次指定了resnet18预训练权重的来源,默认使用使用 ImageNet-1K 数据集预训练的权重。
最后的replace_final_stride_with_dilation默认关闭,主要是控制是否用“空洞卷积”替换resnet最后一层的2*2步幅卷积。关闭空洞卷积适合对实时性要求高、特征分辨率要求低的场景,如粗粒度抓取任务。如果打开可保留更多的空洞细节(如物体边缘、纹理),适合精细操作如螺丝拧入、零件对齐、但是需要权衡计算量增加和内存的占用。
(2)transformer配置
# Transformer layers.
pre_norm: bool = False # Transformer块归一化位置:False=后归一化(原始ACT实现),True=前归一化(更稳定但需调参)
dim_model: int = 512 # Transformer隐藏层维度(特征维度)
n_heads: int = 8 # 多头注意力头数(8头,总注意力维度=512/8=64/头)
dim_feedforward: int = 3200 # 前馈网络中间维度(通常为dim_model的4-6倍,此处3200=512*6.25)
feedforward_activation: str = "relu" # 前馈网络激活函数(ReLU,原始ACT实现)
n_encoder_layers: int = 4 # Transformer编码器层数(4层,用于融合多模态输入特征)
# 注:原始ACT实现中n_decoder_layers=7,但因代码bug仅使用第1层,此处对齐原始实现设为1
n_decoder_layers: int = 1 # Transformer解码器层数(1层,用于生成动作块序列)
上面参数定义了ACT算法中Transformer 编码器/解码器的核心结构参数,直接决定模型的序列建模能力、计算效率和特征融合效果。
- pre_norm: 归一化位置 ,False=原始行为,True=训练更稳定(需重新调参),若训练发散,可尝试设为 True。
- dim_model:特征维度(模型容量),增大→更强表达能力,但计算/内存成本平方级增长,机器人实时场景建议 ≤ 1024。
- n_heads:注意力并行头数,增多更细粒度关注,但通信开销增大 ,保持 dim_model/n_heads = 64(如 512/8=64)。
- n_encoder_layers: 特征融合深度,增多融合更充分,但推理延迟增加,机械臂操作建议 4-6 层。
- n_decoder_layers: 动作生成深度,受原始 bug 限制,固定为 1 以对齐行为,若修复原始 bug,可尝试增至 3-4 层。
(3)VAE变分自编码配置
# VAE.
use_vae: bool = True # 是否启用VAE(默认启用,通过潜在空间建模动作分布)
latent_dim: int = 32 # VAE潜在空间维度(32维,压缩动作序列信息)
n_vae_encoder_layers: int = 4 # VAE编码器层数(4层Transformer,用于将动作块编码为潜在分布)
(4)推理配置
# Inference.
# Note: ACT原论文中启用时序集成时默认值为0.01
temporal_ensemble_coeff: float | None = None # 时序集成系数:None=禁用,>0=启用(指数加权平均平滑动作)
该参数就是是否启动ACT的Temporal Ensembling机制,时序集成(Temporal Ensembling) 功能的启用与权重计算方式,用于在推理时平滑动作序列,避免机器人执行突变动作(尤其适用于精细操作如机械臂抓取、插入等任务)。
要启动Temporal Ensembling机制需显式设置该参数为非 None 的浮点值(如 0.01),且需满足n_action_steps 必须设为 1(每次策略调用仅执行 1 步动作,确保每步都通过集成优化)。
当 temporal_ensemble_coeff = α(如 0.01)时,ACTTemporalEnsembler 会对连续多轮预测的动作块(chunk_size 长度)进行 指数加权平均。
(5)训练损失配置
# Training and loss computation.
dropout: float = 0.1
kl_weight: float = 10.0
dropout控制 Transformer 层的 随机失活概率,用于正则化,防止模型过拟合训练数据。在训练过程中,以 dropout 概率(此处 10%)随机将 Transformer 层(如多头注意力输出、前馈网络输出)的部分神经元激活值设为 0,强制模型学习更鲁棒的特征(不依赖特定神经元组合)
kl_weight控制 KL 散度损失(KL-divergence Loss) 的权重,仅在启用 VAE(use_vae=True,默认启用)时生效。10.0 是一个较大的权重,表明原始 ACT 实现中更注重约束潜在分布的“规范性”(接近标准正态),以确保 VAE 能生成多样化的动作序列,避免模型仅记忆训练数据中的动作模式。
(6)训练优化
# Training preset
optimizer_lr: float = 1e-5
optimizer_weight_decay: float = 1e-4
optimizer_lr_backbone: float = 1e-5
optimizer_lr控制除视觉Backbone外所有参数(如Transformer编码器/解码器、VAE层等)的梯度更新步长。学习率过大会导致训练不稳定(Loss震荡),过小则收敛缓慢。1e-5(0.00001)是训练Transformer类模型的经典学习率(如BERT、GPT等),尤其适用于。
optimizer_weight_decay权重衰减(L2正则化)系数,用于抑制过拟合。过小(如1e-5):正则化不足,易过拟合训练数据。过大(如1e-3):过度抑制参数更新,导致模型欠拟合。适用于机器人操作任务,训练数据通常包含噪声(如传感器误差、动作抖动),权重衰减可提升模型对噪声的鲁棒性。
optimizer_lr_backbone视觉Backbone(如ResNet18)参数的专用学习率。ACT原论文中Backbone与主模型联合训练,未使用更小的Backbone学习率。
在实际的工程中,可在get_optim_params 中显式区分Backbone与非Backbone参数,应用不同学习率:
def get_optim_params(self) -> dict:
return [
{
"params": [p for n, p in self.named_parameters() if not n.startswith("model.backbone") and p.requires_grad],
"lr": self.config.optimizer_lr, # 主参数学习率
},
{
"params": [p for n, p in self.named_parameters() if n.startswith("model.backbone") and p.requires_grad],
"lr": self.config.optimizer_lr_backbone, # Backbone专用学习率
},
]
策略入口类ACTPolicy
初始化逻辑
class ACTPolicy(PreTrainedPolicy):
def __init__(self, config: ACTConfig, dataset_stats=None):
super().__init__(config)
# 输入/输出归一化(标准化数据分布)
self.normalize_inputs = Normalize(config.input_features, config.normalization_mapping, dataset_stats)
self.unnormalize_outputs = Unnormalize(config.output_features, config.normalization_mapping, dataset_stats)
self.model = ACT(config) # 加载ACT神经网络
# 初始化时序集成器(若启用)
if config.temporal_ensemble_coeff is not None:
self.temporal_ensembler = ACTTemporalEnsembler(config.temporal_ensemble_coeff, config.chunk_size)
self.reset() # 重置动作队列/集成器
这段代码定义了一个基于Action Chunking Transformer (ACT)的策略类,主要用于机器人操作任务的动作生成。
self.normalize_inputs、self.normalize_targets、self.unnormalize_outputs。这3个参数用于数据预处理和后处理的关键组件,负责输入特征的归一化、目标动作的归一化以及模型输出动作的反归一化。
根据config.temporal_ensemble_coeff条件来判断是否初始化temporal ensembler,用于联系预测的动作块进行加权平均,提升动作输出的稳定性。ACTTemporalEnsembler通过指数权重(exp(-temporal_ensemble_coeff * i))对历史动作进行加权, older动作权重更高(原论文默认系数0.01)。
推理逻辑
select_action是ACTPolicy类的核心方法,主要的目的就是根据环境观测(batch)然后预测输出机器人要执行的动作。生成预测的动作有两种模式,一个是启用temporal ensemble方式另外一种不启用。
在进入预测生成动作之前先调用self.eval()强制策略进入评估模式(推理),因为策略处于训练模式(启用dropout等)。
(1)时间集成模式
def select_action(self, batch: dict[str, Tensor]) -> Tensor:
if self.config.temporal_ensemble_coeff is not None:
actions = self.predict_action_chunk(batch) # 生成动作块
action = self.temporal_ensembler.update(actions) # 时序集成平滑
return action
开启temporal ensemble:根据配置中temporal_ensemble_coeff条件优先走temporal ensemble模式。该模式先调用self.predict_action_chunk(batch)调用模型预测一个动作块((batch_size, chunk_size, action_dim)),即一次性预测多个连续动作。然后调用self.temporal_ensembler.update(actions)通过时间集成器对动作块进行加权平滑( older 动作权重更高,原论文默认系数 0.01),输出单个稳定动作。主要的目的就是论文中的减少动作抖动,提升机器人控制平滑性。需要注意的是如果开启了该模式,n_action_steps 必须为 1,否则会破坏集成器的时序加权逻辑。
时间集成核心实现
class ACTTemporalEnsembler:
def __init__(self, temporal_ensemble_coeff: float, chunk_size: int):
# 指数权重:w_i = exp(-coeff * i),i为动作索引(0为最旧动作)
self.ensemble_weights = torch.exp(-temporal_ensemble_coeff * torch.arange(chunk_size))
self.ensemble_weights_cumsum = torch.cumsum(self.ensemble_weights, dim=0) # 权重累加和(用于归一化)
def update(self, actions: Tensor) -> Tensor:
# actions: (batch_size, chunk_size, action_dim)
if self.ensembled_actions is None:
self.ensembled_actions = actions.clone() # 初始化集成动作
else:
# 在线加权更新:历史动作 * 累计权重 + 新动作 * 当前权重,再归一化
self.ensembled_actions *= self.ensemble_weights_cumsum[self.ensembled_actions_count - 1]
self.ensembled_actions += actions[:, :-1] * self.ensemble_weights[self.ensembled_actions_count]
self.ensembled_actions /= self.ensemble_weights_cumsum[self.ensembled_actions_count]
return self.ensembled_actions[:, 0] # 返回集成后的首步动作
(2)动作队列模式
def select_action(self, batch: dict[str, Tensor]) -> Tensor:
if len(self._action_queue) == 0:
# 生成动作块(chunk_size步),取前n_action_steps步存入队列
actions = self.predict_action_chunk(batch)[:, :self.config.n_action_steps]
# 队列形状:(n_action_steps, batch_size, action_dim),故转置后入队
self._action_queue.extend(actions.transpose(0, 1))
return self._action_queue.popleft() # 每次弹出队列首步动作
关闭temporal ensemble:未启用时间集成器是,使用简单的动作队列缓存动作块并逐步输出。首先调用调用 predict_action_chunk 获取动作块,这里将会输出一个chunk的动作。但是并不是把这个chunk的集合全都送入队列,而是截取前 n_action_steps 个动作(n_action_steps 为每次预测的动作步数,通常 ≤ chunk_size)。举个例子如果chunk_size是100,但是n_action_steps是50,那么策略一次预测出100个序列动作,但是只取前面的50个。最后把这动作块进行转置后加入队列,之所以转置是因为模型输出动作块形状为 (batch_size, n_action_steps, action_dim),而队列需要按时间步顺序存储(即 (n_action_steps, batch_size, action_dim)),因此通过 transpose(0, 1) 交换前两维。
为什么预测了chunk块,要用n_action_steps做限制了?
可能是因为利用了批量推理的效率,避免因动作块过长导致环境状态变化(如物体移动、机器人位姿偏移)时动作失效。同时限制单次执行的动作步数,强制模型在 n_action_steps 步后重新推理(基于最新观测),确保动作与环境状态同步。
训练损失计算
def forward(self, batch: dict[str, Tensor]) -> tuple[Tensor, dict]:
batch = self.normalize_inputs(batch) # 输入归一化
batch = self.normalize_targets(batch) # 目标动作归一化
actions_hat, (mu, log_sigma_x2) = self.model(batch) # 模型输出:预测动作、VAE分布参数
# L1损失(忽略填充动作)
l1_loss = (F.l1_loss(batch[ACTION], actions_hat, reduction="none") * ~batch["action_is_pad"].unsqueeze(-1)).mean()
loss_dict = {"l1_loss": l1_loss.item()}
# VAE KL散度损失(若启用)
if self.config.use_vae:
mean_kld = (-0.5 * (1 + log_sigma_x2 - mu.pow(2) - log_sigma_x2.exp())).sum(-1).mean()
loss_dict["kld_loss"] = mean_kld.item()
loss = l1_loss + mean_kld * self.config.kl_weight # 总损失 = 重构损失 + KL权重 * KL损失
else:
loss = l1_loss
return loss, loss_dict
这是ACTPolicy类训练模型的接口,负责接收输入数据、通过模推理生成动作预测、计算损失并返回总损失及损失组件字典。
首先对输入的观测数据进行归一化处理,normalize_inputs 基于数据集统计信息(均值、标准差)将输入特征缩放到标准分布(通常均值为0、方差为1),确保模型训练时输入数据分布稳定。
接着将图像特征统一整理到到 batch[OBS_IMAGES] 列表中,便于模型后续提取图像特征。
其次调用self.model(batch)进行模型推理返回模型预测的归一化动作序列,已经如果启用了VAE返回latent分布的均值和对数方差。
计算预测动作(actions_hat)与真实动作(batch[ACTION])的 L1 损失(平均绝对误差)。KL散度的理论意义在于度量两个概率分布之间的差异程度,当KL散度越大的时候,说明两者的差异程度越大;而当KL散度小的时候,则说明两者的差异程度小。如果两者相同的话,则该KL散度应该为0。如果启动了VAE,需要再计算KL散度,总的损失为L1损失与加权KL损失知乎,其中kl_weight是控制损失权重的超参数。如果没有启动VAE直接返回L1损失。
核心算法ACT
整体结构
class ACT(nn.Module):
def __init__(self, config: ACTConfig):
super().__init__()
self.config = config
# VAE编码器(可选):将动作序列编码为潜在分布
if config.use_vae:
self.vae_encoder = ACTEncoder(config, is_vae_encoder=True)
self.vae_encoder_latent_output_proj = nn.Linear(config.dim_model, config.latent_dim * 2) # 输出mu和log(sigma²)
# 视觉Backbone:ResNet提取图像特征
if config.image_features:
backbone_model = getattr(torchvision.models, config.vision_backbone)(weights=config.pretrained_backbone_weights)
self.backbone = IntermediateLayerGetter(backbone_model, return_layers={"layer4": "feature_map"}) # 取layer4特征图
# Transformer编码器-解码器
self.encoder = ACTEncoder(config) # 处理多模态输入(图像、状态、潜在向量)
self.decoder = ACTDecoder(config) # 生成动作块
self.action_head = nn.Linear(config.dim_model, config.action_feature.shape[0]) # 动作输出头
这段代码是ACT类的构造函数,主要是负责初始化模型的核心组件,包括VAE编码器、视觉backbone、transformer编码器/解码器、输入投影层、位置嵌入和动作预测头等。
(1)VAE编码器初始化
if self.config.use_vae:
self.vae_encoder = ACTEncoder(config, is_vae_encoder=True) # VAE 编码器(Transformer 架构)
self.vae_encoder_cls_embed = nn.Embedding(1, config.dim_model) # CLS 标记嵌入(用于 latent 分布参数)
# 机器人状态投影层:将关节状态特征映射到模型隐藏维度
if self.config.robot_state_feature:
self.vae_encoder_robot_state_input_proj = nn.Linear(
self.config.robot_state_feature.shape[0], config.dim_model
)
# 动作投影层:将动作特征映射到模型隐藏维度
self.vae_encoder_action_input_proj = nn.Linear(
self.config.action_feature.shape[0], config.dim_model
)
# Latent 分布投影层:将 VAE 编码器输出映射为 latent 均值和方差(维度=2*latent_dim)
self.vae_encoder_latent_output_proj = nn.Linear(config.dim_model, config.latent_dim * 2)
# 固定正弦位置嵌入:为 VAE 编码器输入序列添加位置信息(CLS + 机器人状态 + 动作序列)
num_input_token_encoder = 1 + config.chunk_size # 1(CLS) + chunk_size(动作序列长度)
if self.config.robot_state_feature:
num_input_token_encoder += 1 # 若包含机器人状态,增加 1 个 token
self.register_buffer(
"vae_encoder_pos_enc", # 注册为缓冲区(不参与梯度更新)
create_sinusoidal_pos_embedding(num_input_token_encoder, config.dim_model).unsqueeze(0),
)
调用ACTEncoder初始化一个VAE编码器,本质是一个transformer编码器,其参数is_vae_encoder=True 标志用于区分该编码器为 VAE 专用(影响层数、注意力机制等配置,具体见 ACTEncoder 实现)。
定义一个可学习的CLS标记,类似BERT中的[CLS],用于聚合VAE编码器输入序列的全局信息,最终生成latent分布参数(均值和方差),nn.Embedding(1, config.dim_model) 创建一个单元素嵌入表,输出维度为模型隐藏维度 dim_model。
当输入包含机器人状态特征(如关节角度、速度)时启用。通过线性层将机器人状态特征(原始维度)映射到模型隐藏维度 dim_model,确保与其他输入 token(如动作序列)维度一致,可拼接为序列输入。
同理将动作序列中的每个动作(原始维度,如机器人关节控制维度)通过线性层映射到 dim_model,转换为 Transformer 可处理的 token 序列。
将 VAE 编码器输出的 CLS 标记特征(维度 dim_model)映射到 latent 分布的参数空间。
最后的固定正弦位置嵌入,其作用是为 VAE 编码器的输入序列添加固定位置信息,帮助 Transformer 区分不同位置的 token(CLS、机器人状态、动作序列中的不同时间步)。
(2)视觉backbone初始化
if self.config.image_features:
backbone_model = getattr(torchvision.models, config.vision_backbone)(
replace_stride_with_dilation=[False, False, config.replace_final_stride_with_dilation], # 控制最后一层是否使用空洞卷积
weights=config.pretrained_backbone_weights, # 预训练权重(如 ImageNet)
norm_layer=FrozenBatchNorm2d, # 冻结 BatchNorm 层(避免微调时破坏预训练分布)
)
# 提取 ResNet 的 layer4 输出作为图像特征图(高层语义特征)
self.backbone = IntermediateLayerGetter(backbone_model, return_layers={"layer4": "feature_map"})
当ACT类中配置包含图像特征,初始化图像特征提取骨干网络,并通过 IntermediateLayerGetter 提取高层视觉特征供后续 Transformer 处理。
首先调用getattr动态加载 torchvision.models 中的 ResNet 模型(如 resnet18、resnet50),具体型号由配置 config.vision_backbone 指定。
然后使用 torchvision.ops.misc.IntermediateLayerGetter 从 ResNet 中提取指定层的输出,作为图像的高层特征。return_layers={“layer4”: “feature_map”}指定提取 ResNet 的 layer4(最后一个残差块)输出,并将其重命名为 feature_map。ResNet 的 layer4 输出包含最抽象的视觉语义信息(如物体轮廓、纹理),是下游任务(如 Transformer 编码)的关键输入。self.backbone 调用时返回字典 {“feature_map”: tensor},其中 tensor 为形状 (B, C, H, W) 的特征图(B 为 batch 大小,C 为通道数,H/W 为特征图高/宽)。
(3)transformer编码器/解码器初始化
# Transformer 编码器:处理输入特征(latent、机器人状态、环境状态、图像特征)
self.encoder = ACTEncoder(config)
# Transformer 解码器:生成动作序列(作为 VAE 解码器时,输入为 latent;否则直接处理编码器输出)
self.decoder = ACTDecoder(config)
这两行代码是初始化ACT核心组件transformer编码器和解码器。
(4)输入投影层
# 机器人状态投影:将机器人关节状态特征(如关节角度、速度)映射到 dim_model
if self.config.robot_state_feature:
self.encoder_robot_state_input_proj = nn.Linear(
self.config.robot_state_feature.shape[0], config.dim_model
)
# 环境状态投影:将环境状态特征(如物体位置)映射到 dim_model
if self.config.env_state_feature:
self.encoder_env_state_input_proj = nn.Linear(
self.config.env_state_feature.shape[0], config.dim_model
)
# Latent 投影:将 VAE 输出的 latent 向量映射到 dim_model
self.encoder_latent_input_proj = nn.Linear(config.latent_dim, config.dim_model)
# 图像特征投影:通过 1x1 卷积将 Backbone 输出的特征图(C×H×W)映射到 dim_model
if self.config.image_features:
self.encoder_img_feat_input_proj = nn.Conv2d(
backbone_model.fc.in_features, # Backbone 输出通道数(如 ResNet18 的 layer4 输出为 512)
config.dim_model,
kernel_size=1 # 1x1 卷积不改变空间维度,仅调整通道数
)
在 Transformer 编码器中,要求所有输入 token 具有相同的维度(dim_model),而不同输入特征(状态、图像、latent 等)的原始维度各异,投影层通过线性/卷积变换实现维度对齐。
投影层(Projection Layer) 是一类用于将不同类型的输入特征(如机器人状态、环境状态、 latent 向量、图像特征等)映射到统一维度的神经网络层。其核心作用是将原始输入特征的维度转换为 Transformer 编码器能够处理的隐藏维度(即代码中的 config.dim_model),确保多模态输入(如状态、图像)能被编码器统一处理。
在ACT类中定义了多个投影层
- self.config.robot_state_feature:输入为机器人状态特征(如关节角度、速度),原始维度为 self.config.robot_state_feature.shape[0],通过线性层(nn.Linear)将机器人状态的原始维度映射到 dim_model,使其成为 Transformer 编码器可接收的 token。
- self.config.env_state_feature:环境状态特征(如物体位置、场景参数),原始维度为 self.config.env_state_feature.shape[0],与机器人状态投影层类似,通过线性层将环境状态映射到 dim_model,实现多模态特征的维度统一。
- self.encoder_latent_input_proj:输入是Latent 向量(来自 VAE 采样或零向量),维度为 config.latent_dim,将 latent 向量从 latent 空间维度映射到 dim_model,作为 Transformer 编码器的核心输入 token 之一。
- self.encoder_img_feat_input_proj:输入是图像特征图(来自 ResNet 骨干网络的 layer4 输出),通道数为 backbone_model.fc.in_features(如 ResNet18 为 512),通过 1×1 卷积层(nn.Conv2d)将图像特征图的通道数调整为 dim_model,同时保持空间维度(H×W)不变,以便展平为序列 token 输入 Transformer。
(5)位置嵌入
# 1D 位置嵌入:用于 latent、机器人状态、环境状态等非图像特征(共 n_1d_tokens 个 token)
n_1d_tokens = 1 # latent 占 1 个 token
if self.config.robot_state_feature:
n_1d_tokens += 1 # 机器人状态占 1 个 token
if self.config.env_state_feature:
n_1d_tokens += 1 # 环境状态占 1 个 token
self.encoder_1d_feature_pos_embed = nn.Embedding(n_1d_tokens, config.dim_model) # 可学习的 1D 位置嵌入
# 2D 位置嵌入:用于图像特征图(H×W 空间位置)
if self.config.image_features:
self.encoder_cam_feat_pos_embed = ACTSinusoidalPositionEmbedding2d(config.dim_model // 2) # 正弦 2D 位置嵌入
位置嵌入是 Transformer 的关键组件,用于解决自注意力机制 对输入序列顺序不敏感 的问题。本代码中有一个1D特征位置嵌入层和图像特征位置嵌入式层。
- 1D 特征位置嵌入式层:为 1D 特征 token 提供 可学习的位置嵌入,帮助 Transformer 区分不同类型 token 的位置(如 latent 是第 1 个 token,机器人状态是第 2 个等)。
- 2D 图像特征位置嵌入层:为图像特征图的 2D 空间像素 提供 正弦位置嵌入,编码像素在特征图中的 (高度, 宽度) 空间位置信息。
(6)解码器位置嵌入与动作预测头
self.decoder_pos_embed = nn.Embedding(config.chunk_size, config.dim_model) # chunk_size 为动作序列长度
self.action_head = nn.Linear(config.dim_model, self.config.action_feature.shape[0])
- self.decoder_pos_embed:为解码器生成的动作序列(action chunk)提供 可学习的位置嵌入,帮助 Transformer 解码器区分动作序列中不同时间步的位置信息(如第 1 个动作、第 2 个动作等)。
- self.action_head:将解码器输出的高维特征(config.dim_model 维度)投影到实际动作空间维度,生成最终可执行的动作序列。
forward方法
forward负责执行 Action Chunking Transformer 的完整前向传播流程,涵盖 VAE 编码(可选)、多模态输入处理、Transformer 编码器-解码器计算,最终输出动作序列及潜在变量分布参数(若启用 VAE)。以下是分步骤解析。
(1)输入验证与 batch_size 确定
if self.config.use_vae and self.training:
assert "action" in batch, "actions must be provided when using the variational objective in training mode."
if "observation.images" in batch:
batch_size = batch["observation.images"][0].shape[0]
else:
batch_size = batch["observation.environment_state"].shape[0]
若启用变分目标(VAE)且处于训练模式,需确保输入包含动作序列(”action”),因为 VAE 编码器需以动作序列为目标数据。确定batch_size,根据输入模态(图像或环境状态)确定批次大小,确保后续张量操作维度对齐。
(2) Latent 向量生成(VAE 编码逻辑)
# 构建 VAE 编码器输入:[cls_token, 机器人状态(可选), 动作序列]
cls_embed = einops.repeat(self.vae_encoder_cls_embed.weight, "1 d -> b 1 d", b=batch_size) # (B, 1, D)
if self.config.robot_state_feature:
robot_state_embed = self.vae_encoder_robot_state_input_proj(batch["observation.state"]).unsqueeze(1) # (B, 1, D)
action_embed = self.vae_encoder_action_input_proj(batch["action"]) # (B, S, D)
vae_encoder_input = torch.cat([cls_embed, robot_state_embed, action_embed] if self.config.robot_state_feature else [cls_embed, action_embed], axis=1) # (B, S+2, D) 或 (B, S+1, D)
# 添加固定正弦位置嵌入
pos_embed = self.vae_encoder_pos_enc.clone().detach().permute(1, 0, 2) # (S+2, 1, D)
# 构建注意力掩码(忽略填充 token)
cls_joint_is_pad = torch.full((batch_size, 2 if self.config.robot_state_feature else 1), False, device=batch["observation.state"].device)
key_padding_mask = torch.cat([cls_joint_is_pad, batch["action_is_pad"]], axis=1) # (B, S+2) 或 (B, S+1)
# VAE 编码器前向传播,提取 cls token 输出
cls_token_out = self.vae_encoder(vae_encoder_input.permute(1, 0, 2), pos_embed=pos_embed, key_padding_mask=key_padding_mask)[0] # (B, D)
latent_pdf_params = self.vae_encoder_latent_output_proj(cls_token_out) # (B, 2*latent_dim)
mu = latent_pdf_params[:, :self.config.latent_dim] # 均值 (B, latent_dim)
log_sigma_x2 = latent_pdf_params[:, self.config.latent_dim:] # 2*log(标准差) (B, latent_dim)
# 重参数化采样 latent 向量
latent_sample = mu + log_sigma_x2.div(2).exp() * torch.randn_like(mu) # (B, latent_dim)
将动作序列编码为 latent 分布(均值 mu、方差相关参数 log_sigma_x2),并通过重参数化技巧采样得到 latent 向量,作为 Transformer 编码器的核心输入。
(3)无VAE时的latent向量初始化
mu = log_sigma_x2 = None
latent_sample = torch.zeros([batch_size, self.config.latent_dim], dtype=torch.float32).to(batch["observation.state"].device)
如果不启用VAE或非训练模式,直接使用零向量作为latent输入。
(4)transformer编码器输入构建
encoder_in_tokens = [self.encoder_latent_input_proj(latent_sample)] # latent 投影:(B, latent_dim) → (B, dim_model)
encoder_in_pos_embed = list(self.encoder_1d_feature_pos_embed.weight.unsqueeze(1)) # 1D token 位置嵌入:(n_1d_tokens, 1, dim_model)
# 添加机器人状态 token(若启用)
if self.config.robot_state_feature:
encoder_in_tokens.append(self.encoder_robot_state_input_proj(batch["observation.state"])) # (B, dim_model)
# 添加环境状态 token(若启用)
if self.config.env_state_feature:
encoder_in_tokens.append(self.encoder_env_state_input_proj(batch["observation.environment_state"])) # (B, dim_model)
1D特征token处理,通过线性层(nn.Linear)将 latent 向量、机器人/环境状态的原始维度映射到模型隐藏维度 dim_model,确保各 token 维度一致。为每个 1D token(latent、状态)分配可学习的位置嵌入,编码其在序列中的位置信息。
if self.config.image_features:
all_cam_features = []
all_cam_pos_embeds = []
for img in batch["observation.images"]: # 遍历多相机图像
# 骨干网络提取特征图(如 ResNet layer4 输出)
cam_features = self.backbone(img)["feature_map"] # (B, C_backbone, H, W)
# 图像位置嵌入(2D 正弦位置编码)
cam_pos_embed = self.encoder_cam_feat_pos_embed(cam_features).to(dtype=cam_features.dtype) # (1, dim_model, H, W)
# 特征投影:调整通道数至 dim_model
cam_features = self.encoder_img_feat_input_proj(cam_features) # (B, dim_model, H, W)
# 展平为序列:(H*W, B, dim_model)
cam_features = einops.rearrange(cam_features, "b c h w -> (h w) b c")
cam_pos_embed = einops.rearrange(cam_pos_embed, "b c h w -> (h w) b c")
all_cam_features.append(cam_features)
all_cam_pos_embeds.append(cam_pos_embed)
# 拼接多相机特征
encoder_in_tokens.extend(torch.cat(all_cam_features, axis=0))
encoder_in_pos_embed.extend(torch.cat(all_cam_pos_embeds, axis=0))
对于图像的特征输入,启用图像输入,通过视觉骨干网络提取特征并转换为序列 token。通过 1×1 卷积(encoder_img_feat_input_proj)将特征图通道数调整为 dim_model,再展平为序列 token(H*W 个像素 token)。通过 ACTSinusoidalPositionEmbedding2d 为像素 token 添加空间位置信息,编码其在特征图中的 (H, W) 坐标。
(5)transformer编码器-解码器前向传播
# 堆叠所有输入 token 和位置嵌入
encoder_in_tokens = torch.stack(encoder_in_tokens, axis=0) # (seq_len, B, dim_model)
encoder_in_pos_embed = torch.stack(encoder_in_pos_embed, axis=0) # (seq_len, 1, dim_model)
# 编码器前向传播
encoder_out = self.encoder(encoder_in_tokens, pos_embed=encoder_in_pos_embed) # (seq_len, B, dim_model)
上面为编码器输出,输入序列为包含 1D 特征 token(latent、状态)和图像像素 token,总长度为 seq_len = n_1d_tokens + sum(H*W for 各相机)。通过自注意力机制融合多模态输入,输出包含全局上下文的特征序列 encoder_out。
# 解码器输入初始化为零向量(类似 DETR 的目标查询)
decoder_in = torch.zeros((self.config.chunk_size, batch_size, self.config.dim_model), dtype=encoder_in_pos_embed.dtype, device=encoder_in_pos_embed.device) # (chunk_size, B, dim_model)
# 解码器前向传播(交叉注意力融合编码器输出)
decoder_out = self.decoder(
decoder_in,
encoder_out,
encoder_pos_embed=encoder_in_pos_embed, # 编码器位置嵌入
decoder_pos_embed=self.decoder_pos_embed.weight.unsqueeze(1), # 解码器动作序列位置嵌入
) # (chunk_size, B, dim_model)
# 转换维度并投影到动作空间
decoder_out = decoder_out.transpose(0, 1) # (B, chunk_size, dim_model)
actions = self.action_head(decoder_out) # (B, chunk_size, action_dim)
return actions, (mu, log_sigma_x2)
解码器部分,初始化为零向量序列(长度 chunk_size,即一次预测的动作数量),类似 DETR 的“目标查询”。解码器通过交叉注意力机制关注编码器输出的上下文特征,生成动作序列特征。通过 action_head(线性层)将解码器输出的高维特征投影到机器人动作空间维度(action_dim),得到最终动作序列。
最终返回actions和(mu, log_sigma_x2)。前者是形状 (B, chunk_size, action_dim),预测的动作序列;后者是若启用 VAE,返回 latent 分布的均值和方差参数(log_sigma_x2 = 2*log(σ)),否则为 (None, None)。
ACT编码器
ACTEncoder
ACTEncoder 是 Transformer 编码器的顶层容器,负责堆叠多个 ACTEncoderLayer(编码器层)并执行最终归一化,支持 VAE 编码器 和 主 Transformer 编码器 两种角色。
class ACTEncoder(nn.Module):
def __init__(self, config: ACTConfig, is_vae_encoder: bool = False):
super().__init__()
self.is_vae_encoder = is_vae_encoder
# 根据角色选择编码器层数(VAE 编码器 vs 主编码器)
num_layers = config.n_vae_encoder_layers if self.is_vae_encoder else config.n_encoder_layers
# 堆叠 num_layers 个编码器层
self.layers = nn.ModuleList([ACTEncoderLayer(config) for _ in range(num_layers)])
# 最终归一化(预归一化模式下启用)
self.norm = nn.LayerNorm(config.dim_model) if config.pre_norm else nn.Identity()
通过 is_vae_encoder 区分角色,分别使用 n_vae_encoder_layers(VAE 专用层数)或 n_encoder_layers(主编码器层数)。通过 nn.ModuleList 管理多个 ACTEncoderLayer,形成深度编码器结构。若 config.pre_norm=True(预归一化),对所有层输出做最终归一化;否则使用 nn.Identity(无操作),此时归一化在每层内部完成(后归一化)。
def forward(
self, x: Tensor, pos_embed: Tensor | None = None, key_padding_mask: Tensor | None = None
) -> Tensor:
for layer in self.layers:
x = layer(x, pos_embed=pos_embed, key_padding_mask=key_padding_mask)
x = self.norm(x)
return x
- 逐层特征提取:输入张量 x(形状通常为 (seq_len, batch_size, dim_model))依次通过所有 ACTEncoderLayer,每层融合自注意力和前馈网络特征。
- 位置嵌入与掩码:pos_embed 提供序列位置信息,key_padding_mask 标记需忽略的填充位置,两者均传递给每层。
- 最终归一化:所有层处理完毕后,通过 self.norm 输出最终特征。
ACTEncoderLayer
下面是单个编码器层的实现
class ACTEncoderLayer(nn.Module):
def __init__(self, config: ACTConfig):
super().__init__()
# 自注意力模块
self.self_attn = nn.MultiheadAttention(config.dim_model, config.n_heads, dropout=config.dropout)
# 前馈网络(Linear -> Activation -> Dropout -> Linear)
self.linear1 = nn.Linear(config.dim_model, config.dim_feedforward)
self.dropout = nn.Dropout(config.dropout)
self.linear2 = nn.Linear(config.dim_feedforward, config.dim_model)
# 归一化与 dropout 层
self.norm1, self.norm2 = nn.LayerNorm(config.dim_model), nn.LayerNorm(config.dim_model)
self.dropout1, self.dropout2 = nn.Dropout(config.dropout), nn.Dropout(config.dropout)
# 激活函数与归一化模式标记
self.activation = get_activation_fn(config.feedforward_activation)
self.pre_norm = config.pre_norm
ACTEncoderLayer 是编码器的核心计算单元,包含 自注意力机制、前馈网络 和 残差连接,支持预归一化(PreNorm)或后归一化(PostNorm)模式。
- 自注意力:nn.MultiheadAttention 实现多头注意力,输入维度 dim_model,头数 n_heads。
- 前馈网络:将特征从 dim_model 映射到 dim_feedforward(扩展维度),经激活和 dropout 后映射回 dim_model。
- 归一化与 dropout:每层包含两个归一化层(norm1 用于注意力,norm2 用于前馈网络)和两个 dropout 层,增强训练稳定性。
def forward(self, x, pos_embed: Tensor | None = None, key_padding_mask: Tensor | None = None) -> Tensor:
# 自注意力模块 + 残差连接
skip = x
if self.pre_norm: # 预归一化:先归一化再计算注意力
x = self.norm1(x)
q = k = x if pos_embed is None else x + pos_embed # query 和 key 融合位置嵌入
x = self.self_attn(q, k, value=x, key_padding_mask=key_padding_mask)[0] # 取注意力输出(忽略权重)
x = skip + self.dropout1(x) # 残差连接 + dropout
# 前馈网络模块 + 残差连接
if self.pre_norm: # 预归一化:先归一化再计算前馈
skip = x
x = self.norm2(x)
else: # 后归一化:先计算注意力再归一化
x = self.norm1(x)
skip = x
x = self.linear2(self.dropout(self.activation(self.linear1(x)))) # 前馈网络
x = skip + self.dropout2(x) # 残差连接 + dropout
if not self.pre_norm: # 后归一化:最后归一化输出
x = self.norm2(x)
return x
上面是forward方法,可以分为自注意力阶段和前馈网络阶段。
- 自注意力阶段:残差连接,skip 保存输入,注意力输出经 dropout1 后与 skip 相加。位置嵌入,q 和 k 若有 pos_embed 则叠加位置信息,帮助模型捕捉序列顺序。归一化时机,pre_norm=True 时,先对 x 归一化(norm1)再计算注意力;否则后归一化(注意力后通过 norm1 归一化)。
- 前馈网络阶段:前馈计算,x 经线性层扩展维度、激活(如 ReLU/GELU)、dropout、线性层压缩维度。残差与归一化,类似注意力阶段,pre_norm 决定归一化时机,最终输出融合残差的特征。
总结一下,ACTEncoder,通过堆叠多个 ACTEncoderLayer 实现深度编码,动态适配 VAE 或主编码器角色,输出融合全局依赖的序列特征。ACTEncoderLayer,单个编码器层核心,通过“自注意力+前馈网络+残差连接”提取局部与全局特征,支持预/后归一化模式,是 Transformer 编码器的基础组件。两者协同构成 ACT 模型的编码器部分,负责将多模态输入(如图像、状态)编码为上下文特征,供解码器生成动作序列。
ACT解码器
ACTDecoder
ACTDecoder 是 Transformer 解码器的顶层模块,负责堆叠多个 ACTDecoderLayer(解码器子层)并对最终输出进行归一化,实现从编码器上下文特征到动作序列的映射。
class ACTDecoder(nn.Module):
def __init__(self, config: ACTConfig):
super().__init__()
self.layers = nn.ModuleList([ACTDecoderLayer(config) for _ in range(config.n_decoder_layers)])
self.norm = nn.LayerNorm(config.dim_model)
通过 nn.ModuleList 创建 config.n_decoder_layers 个 ACTDecoderLayer 实例,构成深度解码器(每层包含自注意力、交叉注意力和前馈网络)。使用 nn.LayerNorm 对所有解码器层的输出进行归一化,稳定训练过程。
def forward(
self,
x: Tensor,
encoder_out: Tensor,
decoder_pos_embed: Tensor | None = None,
encoder_pos_embed: Tensor | None = None,
) -> Tensor:
for layer in self.layers:
x = layer(
x, encoder_out, decoder_pos_embed=decoder_pos_embed, encoder_pos_embed=encoder_pos_embed
)
if self.norm is not None:
x = self.norm(x)
return x
输入参数如下:
- x:解码器输入序列(初始为零向量,形状 (chunk_size, batch_size, dim_model),chunk_size 为动作序列长度);
- encoder_out:编码器输出特征(形状 (encoder_seq_len, batch_size, dim_model));
- decoder_pos_embed:解码器位置嵌入(为动作序列提供时序位置信息);
- encoder_pos_embed:编码器位置嵌入(为编码器特征提供位置信息,辅助交叉注意力)。
将输入 x、编码器输出 encoder_out 及位置嵌入依次传入每个 ACTDecoderLayer,更新 x 为每层输出。所有层处理完毕后,通过 self.norm 对输出进行归一化,返回形状为 (chunk_size, batch_size, dim_model) 的特征张量(后续将映射为动作序列)。
ACTDecoderLayer
下面再介绍下ACTDecoderLayer。
ACTDecoderLayer 是解码器的基础单元,包含 自注意力(捕捉动作序列内部依赖)、交叉注意力(融合编码器上下文特征)和 前馈网络(增强特征表达能力)三大模块,支持预归一化(PreNorm)或后归一化(PostNorm)模式。
class ACTDecoderLayer(nn.Module):
def __init__(self, config: ACTConfig):
super().__init__()
# 自注意力(解码器内部时序依赖建模)
self.self_attn = nn.MultiheadAttention(config.dim_model, config.n_heads, dropout=config.dropout)
# 交叉注意力(融合编码器输出特征)
self.multihead_attn = nn.MultiheadAttention(config.dim_model, config.n_heads, dropout=config.dropout)
# 前馈网络(特征变换与增强)
self.linear1 = nn.Linear(config.dim_model, config.dim_feedforward) # 升维
self.dropout = nn.Dropout(config.dropout)
self.linear2 = nn.Linear(config.dim_feedforward, config.dim_model) # 降维
# 归一化层(3个,分别对应自注意力、交叉注意力、前馈网络)
self.norm1, self.norm2, self.norm3 = [nn.LayerNorm(config.dim_model) for _ in range(3)]
# Dropout层(3个,增强正则化)
self.dropout1, self.dropout2, self.dropout3 = [nn.Dropout(config.dropout) for _ in range(3)]
# 激活函数(如ReLU/GELU)
self.activation = get_activation_fn(config.feedforward_activation)
# 归一化模式标记(PreNorm/PostNorm)
self.pre_norm = config.pre_norm
前向传播forward可以分为3个阶段,分为自注意力->交叉注意力->前馈网络,每个阶段都包含归一化->计算->dropout->残差连接的逻辑。
(1)自注意力阶段
skip = x # 残差连接的输入
if self.pre_norm: # 预归一化:先归一化,再计算注意力
x = self.norm1(x)
# Query和Key融合位置嵌入(Value不融合,保持原始特征)
q = k = self.maybe_add_pos_embed(x, decoder_pos_embed)
x = self.self_attn(q, k, value=x)[0] # 自注意力输出(忽略注意力权重)
x = skip + self.dropout1(x) # 残差连接 + Dropout
(2)交叉注意力阶段
if self.pre_norm: # 预归一化:更新残差输入,归一化当前特征
skip = x
x = self.norm2(x)
else: # 后归一化:先归一化自注意力输出,再更新残差输入
x = self.norm1(x)
skip = x
# Query(解码器特征)融合解码器位置嵌入,Key(编码器特征)融合编码器位置嵌入
x = self.multihead_attn(
query=self.maybe_add_pos_embed(x, decoder_pos_embed),
key=self.maybe_add_pos_embed(encoder_out, encoder_pos_embed),
value=encoder_out,
)[0] # 交叉注意力输出(忽略权重)
x = skip + self.dropout2(x) # 残差连接 + Dropout
(3)前馈网络
if self.pre_norm: # 预归一化:更新残差输入,归一化当前特征
skip = x
x = self.norm3(x)
else: # 后归一化:先归一化交叉注意力输出,再更新残差输入
x = self.norm2(x)
skip = x
# 前馈网络:升维→激活→Dropout→降维
x = self.linear2(self.dropout(self.activation(self.linear1(x))))
x = skip + self.dropout3(x) # 残差连接 + Dropout
if not self.pre_norm: # 后归一化:最后归一化前馈网络输出
x = self.norm3(x)