最新文章
  • lerobot学习率调度器

    lerobot学习率调度器

    学习率调度器简介 是什么 学习率调度器(Learning Rate Scheduler)是深度学习训练中动态调整优化器学习率的工具(注意是在优化器的基础上动态调整学习率),通过优化收敛过程提升模型性能。 学习率(η)控制梯度更新步长,参数更新量 = -η × 梯度。过大步长导致震荡,过小则陷入局部最优或训练缓慢,固定学习率易导致训练初期震荡或后期收敛缓慢,调度器在训练期间自适应调整初期较高学习率加速收敛,中期稳定探索最优解,后期精细调优避免震荡。 以下是常见的学习率调度器 StepLR:每隔固定epoch将学习率乘以衰减因子(如step_size=10, gamma=0.5)。 ExponentialLR:每epoch按指数衰减(lr = lr × gamma)。 CosineAnnealingLR:按余弦曲线周期下降:η = η_min + 0.5×(η_max - η_min)×(1 + cos(π×t/T_max))。 OneCycleLR:分两阶段:线性升温至峰值,再退火至极小值。 PowerLR:基于幂律关系调整,与批次大小和token数量无关。 怎么用 import torch.optim as optim from torch.optim.lr_scheduler import StepLR # 1. 定义模型和优化器 model = ... # 神经网络模型 optimizer = optim.SGD(model.parameters(), lr=0.1) # 初始学习率0.1 # 2. 创建调度器(绑定优化器) scheduler = StepLR(optimizer, step_size=30, gamma=0.5) # 每30epoch衰减50% # 3. 训练循环 for epoch in range(100): # 前向传播 + 反向传播 train(...) optimizer.zero_grad() loss.backward() optimizer.step() # 4. 更新学习率(通常在epoch结束时) scheduler.step() # 查看当前学习率 print(f"Epoch {epoch}: LR={scheduler.get_last_lr()[0]:.6f}") 上面代码演示了基础的学习率调度使用示例,创建一个学习率调度器,传入的参数有优化器,主要的目的是让学习率调度器和优化器绑定,因为相当于是在优化器的基础上改进学习率调度。接下来就是在前向、反向传播更新完参数后调用scheduler.step()更新学习率,以便下一次计算使用。 from torch.optim.lr_scheduler import CosineAnnealingLR optimizer = SGD(model.parameters(), lr=0.01) # 基础学习率 total_epochs = 100 warmup_epochs = 10 # 预热epoch数 for epoch in range(total_epochs): # 1. 预热阶段:前10epoch线性增长 if epoch < warmup_epochs: lr = 0.01 * (epoch / warmup_epochs) for param_group in optimizer.param_groups: param_group['lr'] = lr # 2. 余弦退火阶段 else: scheduler = CosineAnnealingLR(optimizer, T_max=total_epochs-warmup_epochs, eta_min=1e-5) scheduler.step() # 训练代码同上... print(f"Epoch {epoch}: LR={optimizer.param_groups[0]['lr']:.6f}") 上面使用的是余弦退火+预热的方式。 lerobot调度器 抽象基类 @dataclass class LRSchedulerConfig(draccus.ChoiceRegistry, abc.ABC): num_warmup_steps: int # 预热步数(所有调度器的公共参数) @property def type(self) -> str: return self.get_choice_name(self.__class__) # 获取子类注册名称(如 "diffuser") @abc.abstractmethod def build(self, optimizer: Optimizer, num_training_steps: int) -> LRScheduler | None: """创建学习率调度器实例""" raise NotImplementedError LRSchedulerConfig是学习率调度器配置的核心抽象,通过抽象接口定义+配置注册机制,实现了学习率调度策略的统一管理与灵活扩展。 其同样继承了abc.ABC标记为抽象基类,强制子类事项build抽象方法,同时继承draccus.ChoiceRegistry提供子类注册机制,通过 @LRSchedulerConfig.register_subclass("名称") 将子类与调度器类型绑定(如 "diffuser" → DiffuserSchedulerConfig),支持配置驱动的动态实例化。同时使用了@dataclass 装饰器自动生成构造函数、repr 等方法,简化调度器超参数的定义与管理。 其核心属性只有一个num_warmup_steps表示学习率预热步数。预热是深度学习训练的常见技巧(尤其在 Transformer 等模型中),将其作为基类字段可避免子类重复定义。也是几乎所有学习率调度器的基础参数,用于控制“预热阶段”(学习率从低到高线性增长的步数),避免训练初期因高学习率导致的不稳定。 其只有两个方法type和build,type是用于获取子类注册调度器类型名称进而匹配到对应子类,而build的方法是强制子类实现。 实例化子类 lerobot的学习率调度实现了3个DiffuserSchedulerConfig、VQBeTSchedulerConfig、CosineDecayWithWarmupSchedulerConfig子类,这里以DiffuserSchedulerConfig简单说明。 @LRSchedulerConfig.register_subclass("diffuser") @dataclass class DiffuserSchedulerConfig(LRSchedulerConfig): name: str = "cosine" # 调度器类型(如 "cosine"、"linear",来自 diffusers) num_warmup_steps: int | None = None # 预热步数(可选,未指定则不预热) def build(self, optimizer: Optimizer, num_training_steps: int) -> LambdaLR: from diffusers.optimization import get_scheduler # 复用 Diffusers 的调度器实现 kwargs = {**asdict(self), "num_training_steps": num_training_steps, "optimizer": optimizer} # 构造调度器参数:类字段 + 训练相关参数 return get_scheduler(**kwargs) # 调用 diffusers API 创建调度器 diffusers.optimization.get_scheduler 是 Diffusers 库提供的调度器工厂函数,支持多种预定义策略(如 "cosine"、"linear"、"constant" 等)。 asdict(self)将 DiffuserSchedulerConfig 实例的字段(如 name="cosine"、num_warmup_steps=1000)转换为字典;而num_training_steps:总训练步数(调度器需基于此计算衰减周期);最后的optimizer就是待绑定的优化器实例(调度器需调整其参数组的学习率)。 状态管理 (1)存储 def save_scheduler_state(scheduler: LRScheduler, save_dir: Path) -> None: state_dict = scheduler.state_dict() # 获取调度器状态(如当前 step、预热步数等) write_json(state_dict, save_dir / SCHEDULER_STATE) # 保存为 JSON 文件(如 "scheduler_state.json") 该函数主要是将学习率调度器的状态写入到磁盘,为后续断点续训提供支持。state_dict = scheduler.state_dict()获取调度器的当前状态字典,包含调度器运行所需的所有动态信息。接着调用write_json写入到到文件中,文件名默认scheduler_state.json。 (2)加载 def load_scheduler_state(scheduler: LRScheduler, save_dir: Path) -> LRScheduler: # 从 JSON 加载状态,并按当前调度器结构适配(确保兼容性) state_dict = deserialize_json_into_object(save_dir / SCHEDULER_STATE, scheduler.state_dict()) scheduler.load_state_dict(state_dict) # 恢复状态 return scheduler 该函数也比较简单,主要负责从磁盘加载之前保存的调度器状态(如当前训练步数、学习率调整进度等),使训练能够从断点处继续,确保学习率调整策略的连续性。 工程调用 命令参数 在lerobot中,启动训练是可以通过参数来实例化使用哪些调度器,如下。 --scheduler.type=diffuser指定使用diffuser调度类。 --scheduler.num_warmup_steps=1000指定预热步数。 --scheduler.num_warmup_steps=0.5指定余弦衰减周期。 创建 工程中通过 optim/factory.py 中的 make_optimizer_and_scheduler 函数统一创建优化器和调度器,并完成两者的绑定。 def make_optimizer_and_scheduler( cfg: TrainPipelineConfig, policy: PreTrainedPolicy ) -> tuple[Optimizer, LRScheduler | None]: # 1. 创建优化器(基于优化器配置,如 AdamConfig、MultiAdamConfig) optimizer = cfg.optimizer.build(policy.parameters()) # policy.parameters() 为模型参数 # 2. 创建调度器(基于调度器配置,如 DiffuserSchedulerConfig、VQBeTSchedulerConfig) # 关键:通过调度器配置的 `build` 方法,将优化器作为参数传入,完成绑定。 lr_scheduler = cfg.scheduler.build(optimizer, cfg.steps) if cfg.scheduler is not None else None return optimizer, lr_scheduler 优化器创建:cfg.optimizer.build(...) 根据配置生成优化器实例(如 Adam、AdamW),并关联模型参数(policy.parameters())。 调度器绑定:cfg.scheduler.build(optimizer, cfg.steps) 调用调度器配置类(如 DiffuserSchedulerConfig)的 build 方法,将优化器实例作为参数传入,生成与该优化器绑定的调度器实例(如 LambdaLR) 更新 在训练过程中,如工程 scripts/train.py 中,调度器被集成到训练循环,通过 scheduler.step() 更新学习率通过调用scheduler.step() 更新学习率。 def train(): # ... 初始化优化器、调度器 ... optimizer, lr_scheduler = make_optimizer_and_scheduler(cfg, policy) for step in range(start_step, cfg.steps): # ... 前向传播、损失计算、反向传播 ... optimizer.step() if lr_scheduler is not None: lr_scheduler.step() # 调用调度器更新学习率 续训恢复 在断点续训是,通过 utils/train_utils.py 中的 save_training_state 和 load_training_state 函数处理断点续训,其中调用了 schedulers.py 的状态管理函数: def save_training_state(step: int, optimizer: Optimizer, lr_scheduler: LRScheduler | None, save_dir: Path): # ... 保存优化器状态 ... if lr_scheduler is not None: save_scheduler_state(lr_scheduler, save_dir) # 调用 schedulers.py 中的保存函数 def load_training_state(checkpoint_path: Path, optimizer: Optimizer, lr_scheduler: LRScheduler | None): # ... 加载优化器状态 ... if lr_scheduler is not None: lr_scheduler = load_scheduler_state(lr_scheduler, checkpoint_path) # 调用 schedulers.py 中的加载函数 return step, optimizer, lr_scheduler
  • lerobot策略优化器

    lerobot策略优化器

    torch.optim简介 在学校lerobot的策略优化器前,我们先再复习一下什么是优化器。 什么优化器 优化器官方解释就是在深度学习中让损失函数通过梯度下降思想逐步调整参数以达到最小损失。 简单理解优化器的就是更新计算参数的,根据损失函数的梯度方向调整模型权重和偏置值,公式为:新参数 = 旧参数 - 学习率 × 梯度。通过迭代逐步逼近最优解。在文章http://www.laumy.tech/2050.html我们已经探讨过常用的优化算法。 接下来我们再来从PyTorch使用的角度复习一下。torch.optim 是 PyTorch 官方优化器模块,提供了 SGD、Adam、AdamW 等主流优化算法的实现,所有的优化器都继承自基类 torch.optim.Optimizer。其核心作用是如下: 自动化参数更新:根据反向传播计算的梯度,按特定优化策略(如 Adam 的自适应学习率)更新模型参数。 统一接口抽象:通过 Optimizer 基类封装不同算法,提供一致的使用流程(zero_grad() 清空梯度 → step() 更新参数)。 在Pytorch中优化器统一封装成torch.optim接口调用,可以有以下优势。 避免重复实现复杂算法:无需手动编写 Adam 的动量、二阶矩估计等逻辑,直接调用成熟接口。 灵活支持训练需求:支持单/多参数组优化(如不同模块用不同学习率)、学习率调度、梯度清零等核心训练逻辑。 工程化与可维护性:通过统一接口管理超参数(lr、weight_decay),便于实验对比与代码复用。 torch.optim怎么用 Step 1:定义模型与优化器 import torch from torch import nn, optim # 1. 定义模型(示例:简单线性层) model = nn.Linear(in_features=10, out_features=2) # 2. 初始化优化器:传入模型参数 + 超参数 optimizer = optim.Adam( params=model.parameters(), # 待优化参数(PyTorch 参数迭代器) lr=1e-3, # 学习率(核心超参数) betas=(0.9, 0.999), # Adam 动量参数(控制历史梯度影响) weight_decay=0.01 # 权重衰减(L2 正则化,可选) ) 定义了一个optimizer使用的是Adam优化器,该优化器学习率设置为1e-3,权重衰减为0.01。 Step 2:训练循环 # 模拟输入数据(batch_size=32,特征维度=10) inputs = torch.randn(32, 10) targets = torch.randn(32, 2) # 模拟目标值 # 训练循环 for epoch in range(10): # ① 清空过往梯度(必须!否则梯度累积导致更新异常) optimizer.zero_grad() # ② 前向传播 + 计算损失 outputs = model(inputs) loss = nn.MSELoss()(outputs, targets) # 均方误差损失 # ③ 反向传播计算梯度 + 优化器更新参数 loss.backward() # 自动计算所有可训练参数的梯度 optimizer.step() # 根据梯度更新参数 print(f"Epoch {epoch}, Loss: {loss.item():.4f}") loss是损失,通过调用loss.backward()进行反向传播Pytorch就自动把梯度计算好保存了,然后调用关键的一部optimizer.step()就可以执行参数更新了(即新参数=旧参数-学习率*梯度),需要注意的是,在调用loss.backward()进行反向传播计算梯度时,要先调用optimizer.zero_grad()把之前的梯度值情况,因此每计算一次梯度都是被保存,不情况会导致梯度累积。 Step 3:差异化优化参数分组 # 定义参数组(不同模块用不同学习率) optimizer = optim.Adam([ { "params": model.backbone.parameters(), # backbone 参数 "lr": 1e-5 # 小学习率微调 }, { "params": model.head.parameters(), # 任务头参数 "lr": 1e-3 # 大学习率更新 } ], betas=(0.9, 0.999)) # 公共超参数(所有组共享) 上面是针对同一个模型内不同模块使用不同的超参数lr。 抽象基类OptimizerConfig OptimizerConfig 是所有优化器配置的抽象基类,通过 draccus.ChoiceRegistry 实现子类注册机制(类似插件系统),为新增优化器类型提供统一接口。 @dataclass class OptimizerConfig(draccus.ChoiceRegistry, abc.ABC): lr: float weight_decay: float grad_clip_norm: float @property def type(self) -> str: return self.get_choice_name(self.__class__) @classmethod def default_choice_name(cls) -> str | None: return "adam" @abc.abstractmethod def build(self) -> torch.optim.Optimizer | dict[str, torch.optim.Optimizer]: raise NotImplementedError 继承关系 class OptimizerConfig(draccus.ChoiceRegistry, abc.ABC): OptimizerConfig 继承abc.ABC和draccus.ChoiceRegistry,前者标记为抽象基类,强制子类实现build的抽象方法,确保接口的一致性。后者提供子类注册机制,通过@OptimizerConfig.register_subclass("名称") 将子类与优化器类型绑定(如 "adam" → AdamConfig),支持配置驱动的动态实例化。 核心属性 lr: float # 学习率(核心超参数) weight_decay: float # 权重衰减(L2正则化系数) grad_clip_norm: float # 梯度裁剪阈值(防止梯度爆炸) 上面3个参数都是优化器的基础配置,避免子类重复定义。 lr/weight_decay:直接传递给 torch.optim 优化器(如 Adam 的 lr 参数) grad_clip_norm:不参与优化器创建,而是在训练流程中用于梯度裁剪(如 train.py 中 torch.nn.utils.clip_grad_norm_) 核心方法 (1)type属性用于表示优化器类型 @property def type(self) -> str: return self.get_choice_name(self.__class__) 通过 draccus.ChoiceRegistry 的 get_choice_name 方法,获取子类注册的优化器类型名称(如 AdamConfig 的 type 为 "adam")。 在实际应用中,在配置解析时,通过 type 字段(如 {"type": "adam"})即可匹配到对应子类(AdamConfig),实现“配置→实例”的自动映射。 (2)default_choice_name默认优化器类型 @classmethod def default_choice_name(cls) -> str | None: return "adam" 当配置中为显式指定type时,默认是用adam类型即AdamConfig,旨在简化用户配置,无需手动指定常见优化器类型。 (3)build抽象接口,优化器创建接口 @abc.abstractmethod def build(self) -> torch.optim.Optimizer | dict[str, torch.optim.Optimizer]: """Build the optimizer. It can be a single optimizer or a dictionary of optimizers.""" raise NotImplementedError 强制子类实现build的方法。 实例化子类 optimizers.py中一共定义了4种优化器配置子类,adam,adamw,sgd, multi_adam,其中前3个是单参数优化器,最后一个是多参数优化器,最终均通过build方法创建torch.optim实例 单参数优化器 @OptimizerConfig.register_subclass("adam") @dataclass class AdamConfig(OptimizerConfig): lr: float = 1e-3 betas: tuple[float, float] = (0.9, 0.999) eps: float = 1e-8 weight_decay: float = 0.0 grad_clip_norm: float = 10.0 def build(self, params: dict) -> torch.optim.Optimizer: kwargs = asdict(self) # 将 dataclass 字段转为字典 kwargs.pop("grad_clip_norm") # 移除梯度裁剪阈值(非优化器参数) return torch.optim.Adam(params, **kwargs) # 创建 PyTorch Adam 实例 AdamConfig 是 OptimizerConfig 的核心子类,封装了 Adam 优化器的配置与实例化逻辑,通过 draccus 注册机制与工程训练流程深度集成。 @OptimizerConfig.register_subclass("adam")将 AdamConfig 类与字符串 "adam" 绑定,实现 配置驱动的动态实例化,当配置文件中 optimizer.type: "adam" 时,draccus 会自动解析并实例化 AdamConfig。继承自 OptimizerConfig 的 ChoiceRegistry 机制,确保子类可通过 type 字段被唯一标识。 @dataclass自动生成 init、repr 等方法,简化超参数管理,无需手动编写构造函数,直接通过类字段定义超参数(如 lr=1e-3)。 在AdamConfig中默认初始化了一些参数值,其中lr、betas、eps、weight_decay直接对应 torch.optim.Adam 的参数,通过 build 方法传递给 PyTorch 优化器,而grad_clip_norm不参与优化器创建,而是用于训练时的梯度裁剪(如 train.py 中 torch.nn.utils.clip_grad_norm_),实现“优化器参数”与“训练流程参数”的职责分离。 在最后的build方法中,调用torch.optim.Adam(params, **kwargs) 实例化优化器。在此之前,先调用asdict(self)将 AdamConfig 实例的字段(如 lr、betas)转换为字典 {"lr": 1e-3, "betas": (0.9, 0.999), ...},再调用kwargs.pop("grad_clip_norm")剔除 grad_clip_norm(梯度裁剪阈值),因其不属于torch.optim.Adam 的参数(优化器仅负责参数更新,梯度裁剪是训练流程的独立步骤)。 多参数优化器 @OptimizerConfig.register_subclass("multi_adam") @dataclass class MultiAdamConfig(OptimizerConfig): optimizer_groups: dict[str, dict[str, Any]] = field(default_factory=dict) # 组内超参数 def build(self, params_dict: dict[str, list]) -> dict[str, torch.optim.Optimizer]: optimizers = {} for name, params in params_dict.items(): # 合并默认超参数与组内超参数(组内参数优先) group_config = self.optimizer_groups.get(name, {}) optimizer_kwargs = { "lr": group_config.get("lr", self.lr), # 组内 lr 或默认 lr "betas": group_config.get("betas", (0.9, 0.999)), "weight_decay": group_config.get("weight_decay", self.weight_decay), } optimizers[name] = torch.optim.Adam(params, **optimizer_kwargs) # 为每组创建独立优化器 return optimizers # 返回优化器字典:{"backbone": optimizer1, "head": optimizer2, ...} MultiAdamConfig 是 OptimizerConfig 的关键子类,专为多参数组优化场景设计,支持为模型不同模块(如 backbone 与 head)创建独立的 Adam 优化器,实现差异化超参数配置。 首先跟前面单参数的属性不同点是多了一个optimizer_groups,这是一个超参数字典,存储多组不同的超参数,示例如下。 optimizer_groups={ "backbone": {"lr": 1e-5, "weight_decay": 1e-4}, # 低学习率微调 backbone "head": {"lr": 1e-3, "betas": (0.95, 0.999)} # 高学习率更新 head,自定义动量参数 } build的方法主要逻辑如下: def build(self, params_dict: dict[str, list]) -> dict[str, torch.optim.Optimizer]: optimizers = {} for name, params in params_dict.items(): # 1. 获取组内超参数(无则使用默认) group_config = self.optimizer_groups.get(name, {}) # 2. 合并默认与组内超参数 optimizer_kwargs = { "lr": group_config.get("lr", self.lr), "betas": group_config.get("betas", (0.9, 0.999)), "eps": group_config.get("eps", 1e-5), "weight_decay": group_config.get("weight_decay", self.weight_decay), } # 3. 为该组创建 Adam 优化器 optimizers[name] = torch.optim.Adam(params, **optimizer_kwargs) return optimizers # 返回:{组名: 优化器实例} 其中params_dict是超参数组的来源,是字典类型,键为参数组名称(需与 optimizer_groups 键匹配),值为该组参数列表(如模型某模块的 parameters())。通常是策略类的get_optim_params方法提供,如下: # 策略类中拆分参数组(示例逻辑) def get_optim_params(self): return { "backbone": self.backbone.parameters(), "head": self.head.parameters() } 主要的核心逻辑是对于每个参数组,优先使用 optimizer_groups 中的超参数(如 group_config.get("lr")),无则回退到默认值(如 self.lr),然后为每个参数组创建独立的 torch.optim.Adam 实例,确保参数更新互不干扰。 优化器状态管理 状态保存 将优化器的某一个时刻参数进行存储,方便过程查看以及重新加载模型训练等等。 def save_optimizer_state( optimizer: torch.optim.Optimizer | dict[str, torch.optim.Optimizer], # 优化器实例或字典 save_dir: Path # 根保存目录 ) -> None: if isinstance(optimizer, dict): # 1. 处理多参数优化器字典(如 MultiAdamConfig 创建的优化器) for name, opt in optimizer.items(): # 遍历优化器名称与实例(如 "backbone": opt1) optimizer_dir = save_dir / name # 创建子目录:根目录/优化器名称(如 save_dir/backbone) optimizer_dir.mkdir(exist_ok=True, parents=True) # 确保目录存在(含父目录创建) _save_single_optimizer_state(opt, optimizer_dir) # 委托单优化器保存逻辑 else: # 2. 处理单参数优化器(如 AdamConfig 创建的优化器) _save_single_optimizer_state(optimizer, save_dir) # 直接使用根目录保存 区分单参数和多参数优化器,对应多参数优化器为每个优化器创建独立子目录(如 save_dir/backbone、save_dir/head),避免不同优化器的状态文件冲突。如果是单优化器,则直接调用 _save_single_optimizer_state,状态文件保存于 save_dir 根目录,结构简洁。 def _save_single_optimizer_state(optimizer: torch.optim.Optimizer, save_dir: Path) -> None: """Save a single optimizer's state to disk.""" # 1. 获取优化器完整状态字典(含参数组和内部状态) state = optimizer.state_dict() # 2. 分离参数组(超参数配置)与剩余状态(张量数据) param_groups = state.pop("param_groups") # 参数组:学习率、权重衰减等超参数(非张量) flat_state = flatten_dict(state) # 剩余状态:动量、二阶矩等张量(展平嵌套字典,便于序列化) # 3. 保存张量状态(safetensors)与参数组(JSON) save_file(flat_state, save_dir / OPTIMIZER_STATE) # 张量数据:高效二进制存储(如 "optimizer_state.safetensors") write_json(param_groups, save_dir / OPTIMIZER_PARAM_GROUPS) # 参数组:JSON 格式(如 "optimizer_param_groups.json"方便可视化查看) 存储张量和非张量的格式 param_groups:包含优化器的超参数配置(如 lr、weight_decay、betas),是列表嵌套字典的结构,存储为JSON格式,JSON 序列化后可直接查看超参数,便于训练过程追溯。其文件名为optimizer_param_groups.json。 state:包含优化器的内部状态张量(如 Adam 的 exp_avg、exp_avg_sq 动量缓冲区),是嵌套字典结构,通过 flatten_dict 展平后用 safetensors 保存,safetensors专为张量设计的存储格式,支持高效读写、内存映射,避免 PyTorch torch.save 的 pickle 兼容性问题。其文件名为optimizer_state.safetensors。 状态加载 def load_optimizer_state( optimizer: torch.optim.Optimizer | dict[str, torch.optim.Optimizer], # 待恢复的优化器(单实例或字典) save_dir: Path # 状态文件根目录 ) -> torch.optim.Optimizer | dict[str, torch.optim.Optimizer]: if isinstance(optimizer, dict): # 1. 处理多优化器字典(如 MultiAdamConfig 创建的优化器) loaded_optimizers = {} for name, opt in optimizer.items(): # 遍历优化器名称与实例(如 "backbone": opt1) optimizer_dir = save_dir / name # 子目录路径:根目录/优化器名称(如 save_dir/backbone) if optimizer_dir.exists(): # 仅当目录存在时加载(避免新增优化器时出错) loaded_optimizers[name] = _load_single_optimizer_state(opt, optimizer_dir) else: loaded_optimizers[name] = opt # 目录不存在时返回原优化器 return loaded_optimizers else: # 2. 处理单优化器(如 AdamConfig 创建的优化器) return _load_single_optimizer_state(optimizer, save_dir) # 直接从根目录加载 同样是区分单参数和多参数,对于多参数组根据save_dir / name 定位每个优化器的独立子目录(与 save_optimizer_state 的保存结构对应),如果是单参数优化器直接调用 _load_single_optimizer_state,从 save_dir 根目录加载状态文件。 def _load_single_optimizer_state(optimizer: torch.optim.Optimizer, save_dir: Path) -> torch.optim.Optimizer: """Load a single optimizer's state from disk.""" # 1. 获取当前优化器的状态字典结构(用于校验与适配) current_state_dict = optimizer.state_dict() # 2. 加载并恢复张量状态(safetensors → 嵌套字典) flat_state = load_file(save_dir / OPTIMIZER_STATE) # 加载展平的张量状态(如 "optimizer_state.safetensors") state = unflatten_dict(flat_state) # 恢复为嵌套字典(与保存时的 flatten_dict 对应) # 3. 处理优化器内部状态(如动量缓冲区) if "state" in state: # 将字符串键转为整数(safetensors 保存时键为字符串,PyTorch 期望参数索引为整数) loaded_state_dict = {"state": {int(k): v for k, v in state["state"].items()}} else: loaded_state_dict = {"state": {}} # 新创建的优化器可能无状态,初始化为空 # 4. 处理参数组(超参数配置,如学习率、权重衰减) if "param_groups" in current_state_dict: # 从 JSON 反序列化参数组,并确保结构与当前优化器匹配 param_groups = deserialize_json_into_object( save_dir / OPTIMIZER_PARAM_GROUPS, # 加载参数组 JSON 文件(如 "optimizer_param_groups.json") current_state_dict["param_groups"] # 以当前参数组结构为模板,确保兼容性 ) loaded_state_dict["param_groups"] = param_groups # 5. 将恢复的状态字典加载到优化器 optimizer.load_state_dict(loaded_state_dict) return optimizer 张量的状态恢复部分,通过 unflatten_dict 将保存时展平的状态(flatten_dict)恢复为嵌套字典,匹配 PyTorch 优化器状态的原始结构。接着通过state["state"] 的键在保存时被序列化为字符串(如 "0"),加载时需转回整数(如 0),以匹配 PyTorch 参数索引的整数类型。 对于参数组恢复先通过JSON 反序列化,deserialize_json_into_object 将 JSON 文件中的参数组配置(如 [{"lr": 1e-3, ...}, ...])反序列化为 Python 对象。再以当前优化器的 current_state_dict["param_groups"] 为模板,确保加载的参数组与当前优化器的参数结构兼容(如参数组数量、超参数字段匹配),避免因配置变更导致的加载失败。 最后合并 state(张量数据)和 param_groups(超参数配置)为完整状态字典,通过 optimizer.load_state_dict 完成优化器状态恢复。 工程调用 创建流程 # 1. 策略提供参数(如多参数组) params = policy.get_optim_params() # 例如:{"backbone": [params1...], "head": [params2...]} # 2. 配置解析:根据 config.optimizer.type 实例化对应子类(如 MultiAdamConfig) cfg.optimizer = MultiAdamConfig( lr=1e-3, optimizer_groups={"backbone": {"lr": 1e-5}, "head": {"lr": 1e-3}} ) # 3. 创建优化器实例 optimizer = cfg.optimizer.build(params) # 返回:{"backbone": Adam, "head": Adam} 训练流程 def update_policy(...): # 前向传播计算损失 loss, output_dict = policy.forward(batch) # 反向传播与梯度裁剪 grad_scaler.scale(loss).backward() grad_scaler.unscale_(optimizer) torch.nn.utils.clip_grad_norm_(policy.parameters(), grad_clip_norm) # 参数更新 grad_scaler.step(optimizer) optimizer.zero_grad() # 清空梯度
  • lerobot训练

    lerobot训练

    初始化 @parser.wrap() def train(cfg: TrainPipelineConfig): cfg.validate() # 验证配置合法性(如路径、超参数范围) init_logging() # 初始化日志系统(本地文件+控制台输出) if cfg.seed is not None: set_seed(cfg.seed) # 固定随机种子(确保训练可复现) device = get_safe_torch_device(cfg.policy.device, log=True) # 自动选择训练设备(GPU/CPU) torch.backends.cudnn.benchmark = True # 启用CuDNN自动优化(加速卷积运算) torch.backends.cuda.matmul.allow_tf32 = True # 启用TF32精度(加速矩阵乘法) 初始化阶段主要是解析参数,初始化日志,确定训练的设备。 参数解析依旧是使用了装饰器parser.wrap,通过命令后参数构建生成TrainPipelineConfig类,该类是LeRobot 框架中训练流程的核心配置类,继承自 HubMixin(支持 Hugging Face Hub 交互),通过 dataclass 定义训练全流程的参数(如数据集路径、模型超参、训练步数等)。其核心作用是: 参数聚合:统一管理数据集、策略模型、优化器、评估等模块的配置,避免参数分散。 合法性校验:通过 validate 方法确保配置参数有效(如路径存在、超参数范围合理)。 可复现性支持:固定随机种子、保存/加载配置,确保训练过程可复现。 Hub 集成:支持从 Hub 加载预训练配置或推送配置至 Hub,便于共享和断点续训。 (1)核心属性 dataset:DatasetConfig, 数据集配置(如 repo_id="laumy/record-07271539"、图像预处理参数) env: envs.EnvConfig,评估环境配置(如仿真环境类型、任务名称,仅用于训练中评估) policy: PreTrainedConfig,策略模型配置(如 ACT 的 Transformer 层数、视觉编码器类型)。 output_dir: Path,训练输出目录(保存 checkpoint、日志、评估视频)。 resume,是否从 checkpoint 续训(需指定 checkpoint_path) seed,随机种子(控制模型初始化、数据 shuffle、评估环境随机性,确保复现)。 num_workers,数据加载线程数,用于加速数据预处理。 batch_size,训练批次大小,即单步输入样本数。 steps,总训练步数,每次参数更新计为1步。 log_freq,日志记录频率,每 200 步打印一次训练指标,如 loss、梯度范数。 eval_freq,评估频率(每 20,000 步在环境中测试策略性能,计算成功率、平均奖励) save_checkpoint,是否保存 checkpoint(模型权重、优化器状态),用于续训。 wandb,Weights & Biases 日志配置(控制是否上传指标、视频至 WandB) use_policy_training_preset,是否使用策略内置的训练预设(如 ACT 策略默认 AdamW 优化器、学习率)。 optimizer,优化器配置(如学习率、权重衰减,仅当 use_policy_training_preset=False 时需手动设置)。 scheduler,学习率调度器配置(如余弦退火,同上)。 (2)核心方法 post_init:初始化实例后设置 checkpoint_path(断点续训时的路径),为后续配置校验做准备。 validate:确保所有配置参数合法且一致,例如续训时校验 checkpoint 路径存在,自动生成唯一输出目录(避免覆盖),强制要求非预设模式下手动指定优化器。 _save_pretrained:将配置保存为 JSON 文件(train_config.json),用于 Hub 共享或本地存储。 from_pretrained::从 Hub 或本地路径加载配置(支持断点续训或复用已有配置)。 数据与模型准备 # 1. 加载离线数据集(如机器人操作轨迹数据) logging.info("Creating dataset") dataset = make_dataset(cfg) # 从HuggingFace Hub或本地路径加载数据集 # 2. 初始化策略模型(如ACT、Diffusion Policy) logging.info("Creating policy") policy = make_policy( cfg=cfg.policy, # 策略配置(如ACT的transformer层数、视觉编码器类型) ds_meta=dataset.meta, # 数据集元信息(输入/输出维度、特征类型) ) # 3. 创建优化器和学习率调度器 logging.info("Creating optimizer and scheduler") optimizer, lr_scheduler = make_optimizer_and_scheduler(cfg, policy) # 默认AdamW优化器 grad_scaler = GradScaler(device.type, enabled=cfg.policy.use_amp) # 混合精度训练梯度缩放器 make_dataset(cfg):根据配置加载数据集(如 laumy/record-07271539),返回 LeRobotDataset 对象,包含观测(图像、关节状态)和动作序列。 make_policy(...):根据 policy.type(如 act)和数据集元信息初始化模型,自动适配输入维度(如图像分辨率、状态维度)和输出维度(如动作空间大小)。 make_optimizer_and_scheduler(cfg, policy):创建优化器(默认AdamW,学习率 1e-5)和调度器(默认无,可配置余弦退火等),支持对不同参数组设置不同学习率(如视觉 backbone 微调)。 数据加载 数据加载调用的是make_dataset函数,其是LeRobot 框架中数据集创建的核心工厂函数,负责根据训练配置(TrainPipelineConfig)初始化离线机器人数据集。它整合了图像预处理、时序特征处理(delta timestamps)和数据集元信息加载,最终返回可直接用于训练的 LeRobotDataset 对象。 根据数据集配置初始化图像预处理管道(如Resize、Normalize、RandomCrop等)。若 cfg.dataset.image_transforms.enable=True(通过命令行或配置文件设置),则创建 ImageTransforms 实例,加载预设的图像变换参数(如分辨率、是否翻转等),否则不进行图像预处理。 image_transforms = ( ImageTransforms(cfg.dataset.image_transforms) if cfg.dataset.image_transforms.enable else None ) (1)单数据集加载 if isinstance(cfg.dataset.repo_id, str): # 加载数据集元信息(特征定义、统计数据、帧率等) ds_meta = LeRobotDatasetMetadata( cfg.dataset.repo_id, root=cfg.dataset.root, revision=cfg.dataset.revision ) # 计算时序偏移(delta timestamps) delta_timestamps = resolve_delta_timestamps(cfg.policy, ds_meta) # 创建 LeRobotDataset 实例 dataset = LeRobotDataset( cfg.dataset.repo_id, # 数据集标识(HuggingFace Hub repo_id 或本地路径) root=cfg.dataset.root, # 本地缓存根目录 episodes=cfg.dataset.episodes, # 指定加载的轨迹片段(如 ["ep001", "ep002"]) delta_timestamps=delta_timestamps, # 时序特征偏移(见下文详解) image_transforms=image_transforms, # 图像预处理管道 revision=cfg.dataset.revision, # 数据集版本(如 Git commit hash) video_backend=cfg.dataset.video_backend, # 视频解码后端(如 "pyav") ) 首先实例化LeRobotDatasetMetadata,加载数据集的信息,包括特征定义如observation.images.laptop 的形状、action的维度等,以及统计信息如图像均值/方差、动作范围,还有帧率fps以便时序偏移计算。 其次调用resolve_delta_timestamps根据模型计算时序特征偏移,例如如果策略需要当前帧及前2帧的观测,则生成[-0.04, -0.02, 0](单位:秒),用于从数据中提取多时序特征。 接着实例化LeRobotDataset,其实现数据加载、时序特征拼接、图像预处理等功能,为悬链提供批次化数据,具体见http://www.laumy.tech/2332.html#h37 (2)多数据集支持 else: raise NotImplementedError("The MultiLeRobotDataset isn't supported for now.") # 以下为预留的多数据集加载代码(暂未实现) dataset = MultiLeRobotDataset( cfg.dataset.repo_id, # 多数据集标识列表(如 ["repo1", "repo2"]) image_transforms=image_transforms, video_backend=cfg.dataset.video_backend, ) logging.info(f"多数据集索引映射: {pformat(dataset.repo_id_to_index)}") 预留多数据集合并功能(如融合不同场景的机器人轨迹数据),目前未实现,直接抛出 NotImplementedError。 (3)imsageNet统计量替换 if cfg.dataset.use_imagenet_stats: for key in dataset.meta.camera_keys: # 遍历所有相机图像特征(如 "observation.images.laptop") for stats_type, stats in IMAGENET_STATS.items(): # IMAGENET_STATS = {"mean": [...], "std": [...]} dataset.meta.stats[key][stats_type] = torch.tensor(stats, dtype=torch.float32) 其目的主要将数据集图像的归一化统计量(均值/方差)替换为 ImageNet 预训练模型的统计量。当使用预训练视觉编码器(如 ResNet)时,用 ImageNet 统计量归一化图像,可提升模型迁移学习效果(避免因数据集自身统计量导致的分布偏移)。 模型加载 # 初始化策略模型(如ACT、Diffusion Policy) logging.info("Creating policy") policy = make_policy( cfg=cfg.policy, # 策略配置(如ACT的transformer层数、视觉编码器类型) ds_meta=dataset.meta, # 数据集元信息(输入/输出维度、特征类型) ) make_policy 是 LeRobot 框架中策略模型实例化的核心工厂函数,负责根据配置(PreTrainedConfig)、数据集元信息(ds_meta)或环境配置(env_cfg),动态创建并初始化策略模型(如 ACT、Diffusion、TDMPC 等)。其核心作用是自动适配策略输入/输出维度(基于数据或环境特征),并支持加载预训练权重或初始化新模型。 在函数中,根据策略类型cfg.type,如 "act"、"diffusion")动态获取对应的策略类,如若 cfg.type = "act",get_policy_class 返回 ACTPolicy 类(ACT 策略的实现)。如果模型需要明确输入特征(如图像、状态)和输出特征(如动作)的维度,需进一步通过数据集或环境解析。 if ds_meta is not None: features = dataset_to_policy_features(ds_meta.features) # 从数据集元信息提取特征 kwargs["dataset_stats"] = ds_meta.stats # 数据集统计量(用于输入归一化,如图像均值/方差) else: if not cfg.pretrained_path: logging.warning("无数据集统计量,归一化模块可能初始化异常") # 无预训练时,缺少数据统计量会导致归一化参数异常 features = env_to_policy_features(env_cfg) # 从环境配置提取特征(如 Gym 环境的观测/动作空间) 如果定义了离线数据进行解析特征,否则基于环境解析特征。 获取到特征后进行配置输入/输出特征。 cfg.output_features = {key: ft for key, ft in features.items() if ft.type is FeatureType.ACTION} cfg.input_features = {key: ft for key, ft in features.items() if key not in cfg.output_features} 将解析后的特征划分为输入特征(如图像、状态)和输出特征(仅动作,FeatureType.ACTION),并更新到策略配置 cfg 中。例如若 features 包含 "observation.images.laptop"(图像)和 "action"(动作),则:input_features = {"observation.images.laptop": ...},output_features = {"action": ...}。 接着对模型进行实例化,也就是预训练模型的加载或新模型的初始化。 if cfg.pretrained_path: # 加载预训练策略(如从 HuggingFace Hub 或本地路径) kwargs["pretrained_name_or_path"] = cfg.pretrained_path policy = policy_cls.from_pretrained(**kwargs) # 调用策略类的 from_pretrained 方法加载权重 else: # 初始化新模型(随机权重) policy = policy_cls(** kwargs) # 传入配置和特征信息初始化模型结构 最后将模型迁移到目标设备,如cuda:0或cpu。 policy.to(cfg.device) # 将模型移至目标设备(如 "cuda:0"、"cpu") assert isinstance(policy, nn.Module) # 确保返回的是 PyTorch 模型 return policy 创建优化器 # 创建优化器和学习率调度器 logging.info("Creating optimizer and scheduler") optimizer, lr_scheduler = make_optimizer_and_scheduler(cfg, policy) # 默认AdamW优化器 grad_scaler = GradScaler(device.type, enabled=cfg.policy.use_amp) # 混合精度训练梯度缩放器 负责初始化训练核心组件:优化器(更新模型参数)、学习率调度器(动态调整学习率)和梯度缩放器(混合精度训练支持)。三者共同构成策略模型的“参数更新引擎”,直接影响训练效率和收敛稳定性。 (1)优化器与调度器创建 通过工厂函数 make_optimizer_and_scheduler 动态创建优化器和学习率调度器,参数来源于配置 cfg 和策略模型 policy。该函数根据 TrainPipelineConfig 中的 optimizer 和 scheduler 配置,或策略预设(use_policy_training_preset=True),生成优化器和调度器。其中优化器默认使用 AdamW(带权重衰减的 Adam),参数包括学习率(cfg.optimizer.lr)、权重衰减系数(cfg.optimizer.weight_decay)等,优化对象为 policy.parameters()(策略模型的可学习参数);而调度器如果配置了 scheduler.type(如 "cosine"),则创建对应学习率调度器(如余弦退火调度器),否则返回 None。 若 cfg.use_policy_training_preset=True(默认),则直接使用策略内置的优化器参数(如 ACT 策略默认 lr=3e-4,weight_decay=1e-4),无需手动配置 optimizer 和 scheduler。 (2)梯度缩放器初始化 GradScaler用于解决低精度(如 float16)训练中的梯度下溢问题。device.type参数模型所在设备类型("cuda"/"mps"/"cpu"),确保缩放器与设备匹配。参数enabled=cfg.policy.use_amp确定是否启用混合精度训练(由策略配置 use_amp 控制)。若为 False,缩放器将禁用(梯度不缩放)。 本质原理是混合精度训练时,前向传播使用低精度(加速计算),但梯度可能因数值过小而下溢(变为 0)。GradScaler 通过梯度缩放(放大损失值 → 梯度按比例放大 → 更新时反缩放)避免下溢,同时保证参数更新精度。 数据加载器配置 # 创建时序感知采样器(针对机器人轨迹数据) if hasattr(cfg.policy, "drop_n_last_frames"): # 如ACT策略需丢弃轨迹末尾帧 sampler = EpisodeAwareSampler( dataset.episode_data_index, # 轨迹索引信息 drop_n_last_frames=cfg.policy.drop_n_last_frames, # 丢弃每段轨迹末尾N帧 shuffle=True, # 轨迹内随机打乱 ) else: sampler = None # 普通随机采样 # 构建DataLoader(多线程加载+内存锁定) dataloader = torch.utils.data.DataLoader( dataset, num_workers=cfg.num_workers, # 数据加载线程数(加速IO) batch_size=cfg.batch_size, # 批次大小 sampler=sampler, pin_memory=device.type != "cpu", # 内存锁定(加速CPU→GPU数据传输) ) dl_iter = cycle(dataloader) # 循环迭代器(数据集遍历完后自动重启) EpisodeAwareSampler:确保采样的batch包含完整轨迹片段(避免时序断裂),适配机器人操作等时序依赖任务。 cycle(dataloader):将DataLoader转换为无限迭代器,支持训练步数(cfg.steps)远大于数据集长度的场景。 采样器选择 hasattr(cfg.policy, "drop_n_last_frames")用于检查模型中是否支持drop_n_last_frames 属性(如 ACT 策略需丢弃每段轨迹的最后 N 帧,避免无效数据)。如果支持,则启用时序感知采样EpisodeAwareSampler。器策略依赖时序连续的轨迹数据(如机器人操作的连贯动作序列)。核心参数如下: dataset.episode_data_index:数据集轨迹索引(记录每段轨迹的起始/结束位置),确保采样时不跨轨迹断裂时序。 drop_n_last_frames=cfg.policy.drop_n_last_frames:丢弃每段轨迹的最后 N 帧(如因传感器延迟导致的无效帧)。 shuffle=True:轨迹内部随机打乱(但保持轨迹内时序连续性),平衡随机性与时序完整性。 若模型策略中不支持时序感知采样,那么则采用普通的随机采样,使用默认随机采样(shuffle=True,sampler=None),DataLoader 直接对数据集全局打乱。 数据加载管道 dataloader = torch.utils.data.DataLoader基于采样器配置,创建 PyTorch DataLoader,实现多线程并行数据加载,为训练循环提供高效的批次数据。 num_workers=cfg.num_workers:使用配置的线程数并行加载数据(如 8 线程),避免数据加载成为训练瓶颈。 pin_memory=True(当使用 GPU 时):将数据加载到固定内存页,加速数据从 CPU 异步传输到 GPU,减少等待时间。 drop_last=False:保留最后一个可能不完整的批次(尤其在小数据集场景,避免数据浪费)。 batch_size:控制每个批次的样本数量。batch_size=8 时,所有张量第一维度为 8。 sampler:控制采样顺序(时序连续/随机)。EpisodeAwareSampler 确保动作序列来自同一段轨迹。 num_workers:控制并行加载的子进程数,影响数据加载速度(非数据结构)。num_workers=8 比单线程加载快 5-10 倍(取决于硬件)。注意其并行只会影响数据加载速度,不会影响训练速度,当前数据加载是持续循环进行,而非一次性完成,但是这个相对训练时间。通过控制并行数据加载进程数,减少 GPU 等待时间,提升整体效率。 dataloader 是 torch.utils.data.DataLoader 类的实例,本质是一个可迭代对象(iterable),用于按批次加载数据。其核心作用是将原始数据集(LeRobotDataset)转换为训练可用的批次化数据,支持多线程并行加载、自定义采样顺序等功能。 DataLoader 通过以下步骤将原始数据集(LeRobotDataset)转换为批次化数据 数据集索引采样:原始数据集 dataset(LeRobotDataset 实例),包含所有机器人轨迹数据。通过 sampler 参数(如 EpisodeAwareSampler 或默认随机采样)生成样本索引序列,决定数据加载顺序。 多线程并行加载:num_workers=cfg.num_workers:启动 num_workers 个子进程并行执行 dataset.getitem(index),从磁盘/内存中加载单个样本数据(如读取图像、解析状态)。 样本拼接:默认行为:DataLoader 使用 torch.utils.data.default_collate 函数,将多个单样本字典(来自不同子进程)拼接为批次字典,对每个特征键(如 "observation.images.laptop"),将所有单样本张量(shape=[C, H, W])堆叠为批次张量(shape=[B, C, H, W]);非张量数据(如列表)会被转换为张量或保留为列表(取决于数据类型)。 内存优化:pin_memory=device.type != "cpu":当使用 GPU 时,启用内存锁定(pin memory),将加载的张量数据存入 CPU 的固定内存页,加速后续异步传输到 GPU 的过程(batch[key].to(device, non_blocking=True)) 循环迭代器 dl_iter = cycle(dataloader) 将 DataLoader 转换为无限迭代器,支持按“训练步数”(cfg.steps)而非“ epochs” 训练。离线训练通常以固定步数(如 100,000 步)为目标,而非遍历数据集次数(epochs)。当数据集较小时,cycle(dataloader) 可在数据集遍历结束后自动重启,确保训练步数达标。 那dataloader输出的批次数据结构长什么样的? 当通过 next(dl_iter) 获取批次数据时(如代码中 first_batch = next(dl_iter)),返回的是一个字典类型的批次数据,结构如下: dl_iter = cycle(dataloader) first_batch = next(dl_iter) for key, value in first_batch.items(): if isinstance(value, torch.Tensor): print(f"{key}: shape={value.shape}, dtype={value.dtype}") else: print(f"{key}: type={type(value)}") 打印如下: observation.images.handeye: shape=torch.Size([8, 3, 480, 640]), dtype=torch.float32 observation.images.fixed: shape=torch.Size([8, 3, 480, 640]), dtype=torch.float32 action: shape=torch.Size([8, 100, 6]), dtype=torch.float32 observation.state: shape=torch.Size([8, 6]), dtype=torch.float32 timestamp: shape=torch.Size([8]), dtype=torch.float32 frame_index: shape=torch.Size([8]), dtype=torch.int64 episode_index: shape=torch.Size([8]), dtype=torch.int64 index: shape=torch.Size([8]), dtype=torch.int64 task_index: shape=torch.Size([8]), dtype=torch.int64 action_is_pad: shape=torch.Size([8, 100]), dtype=torch.bool task: type=<class 'list'> 可以看到一个batch有8组数据,因为TrainPipelineConfig::batch_size设置的8,控制每个批次的样本数量,batch_size定义了每次参数更新是输入模型的样本数量,如这里的8,就是表示输入8个样本更新一次参数。batch_size的设定要根据模型大小、GPU内存、数据特征综合确定。 如果batch_size过小,批次(如 batch_size=1)的梯度受单个样本噪声影响大,导致参数更新方向不稳定,Loss 曲线剧烈震荡(如下图示例),难以收敛到稳定最小值,如果模型使用了BN层,过小的批次会导致BN统计量(均值/方差)估计不准,影响特征表达。同时GPU利用率低,训练速度会变慢; 如果batch_size过大最直接的影响就是GPU内存会溢出,同时会导致收敛速度变慢或陷入次优解。 一般情况下,若数据集样本量少(如仅 1k 样本),可设 batch_size=32(全量数据集的 3%),避免批次占比过大导致过拟合;若模型中含有BN层,batch_size 建议 ≥ 16,确保 BN 统计量稳定。 batch_size 状态 典型问题 解决方案 过大(OOM) GPU 内存溢出,收敛慢 减小批次/降低图像分辨率/梯度累积 过小(<4) Loss 波动大,GPU 利用率低 增大批次至 8-32(需满足内存) 合理(8-32) 梯度稳定,GPU 利用率高(80%-90%) 维持默认或根据模型/数据微调 batch_size 的核心是平衡内存、速度与收敛性,建议从默认值开始,结合硬件条件和训练监控动态调整。 开始训练 训练模式设置 policy.train() 将策略模型切换为训练模式,确保所有层(如 Dropout、BatchNorm)按训练逻辑运行。PyTorch模型模式有差异,分为训练模式和评估模式。 训练模式:启用 Dropout(随机丢弃神经元防止过拟合)、BatchNorm 更新运行时统计量(均值/方差)。 评估模式(policy.eval()):关闭 Dropout、BatchNorm 使用训练阶段累积的统计量。 在训练循环前显式调用,避免因模型残留评估模式导致训练效果异常(如 Dropout 未激活导致过拟合)。 指标跟踪初始化 train_metrics = { "loss": AverageMeter("loss", ":.3f"), # 训练损失(格式:保留3位小数) "grad_norm": AverageMeter("grdn", ":.3f"), # 梯度范数(格式:缩写"grdn",保留3位小数) "lr": AverageMeter("lr", ":0.1e"), # 学习率(格式:科学计数法,保留1位小数) "update_s": AverageMeter("updt_s", ":.3f"), # 单步更新耗时(格式:缩写"updt_s",保留3位小数) "dataloading_s": AverageMeter("data_s", ":.3f"),# 数据加载耗时(格式:缩写"data_s",保留3位小数) } 通过 AverageMeter 类(来自 lerobot.utils.logging_utils)定义需跟踪的核心训练指标,支持实时平均计算和格式化输出训练信息,这个类实例最终通过参数传递给MetricsTracker。AverageMeter 功能为内部维护 sum(累积和)、count(样本数)、avg(平均值),通过 update 方法更新指标,并按指定格式(如 ":.3f")输出。例:每步训练后调用 train_metrics["loss"].update(loss.item()),自动累积并计算平均 loss。 train_tracker = MetricsTracker( cfg.batch_size, # 批次大小(用于计算每样本指标) dataset.num_frames, # 数据集总帧数(用于进度比例计算) dataset.num_episodes, # 数据集总轨迹数(辅助日志上下文) train_metrics, # 上述定义的指标跟踪器字典 initial_step=step # 初始步数(支持断点续训时从上次步数开始跟踪) ) 创建 MetricsTracker 实例(来自 lerobot.utils.logging_utils),用于聚合、格式化和记录所有训练指标,主要的作用如下: 指标更新:训练循环中通过 train_tracker.loss = loss.item() 便捷更新单个指标。 平均计算:自动对 AverageMeter 指标进行滑动平均(如每 log_freq 步输出平均 loss)。 日志输出:调用 logging.info(train_tracker) 时,按统一格式打印所有指标(如 loss: 0.523 | grdn: 1.234 | lr: 3.0e-4)。 断点续训支持:通过 initial_step=step 确保从断点恢复训练时,指标统计不重复计算。 总结下该代码段是训练前的关键初始化步骤,通过 AverageMeter 和 MetricsTracker 构建训练全流程的指标监控框架,为后续训练循环中的指标更新、日志记录和性能调优提供参考。 启动循环训练 logging.info("Start offline training on a fixed dataset") # 启动训练循环,从当前 step(初始为 0 或断点续训的步数)迭代至 cfg.steps(配置文件中定义的总训练步数,如 100,000 步)。 for _ in range(step, cfg.steps): # 1. 加载数据batch batch = next(dl_iter) # 从循环迭代器获取batch batch = {k: v.to(device, non_blocking=True) for k, v in batch.items() if isinstance(v, torch.Tensor)} # 数据移至设备 # 2. 单步训练(前向传播→损失计算→反向传播→参数更新) train_tracker, output_dict = update_policy( train_tracker, policy, batch, optimizer, cfg.optimizer.grad_clip_norm, # 梯度裁剪阈值(默认1.0) grad_scaler=grad_scaler, use_amp=cfg.policy.use_amp, # 启用混合精度训练 ) # 3. 训练状态更新与记录 step += 1 if is_log_step: # 按log_freq记录训练指标(loss、梯度范数等) logging.info(train_tracker) if is_saving_step: # 按save_freq保存模型checkpoint save_checkpoint(checkpoint_dir, step, cfg, policy, optimizer, lr_scheduler) if is_eval_step: # 按eval_freq在环境中评估策略性能 eval_info = eval_policy(eval_env, policy, cfg.eval.n_episodes) # 执行评估 update_policy(...):核心训练函数,实现: -- 前向传播:policy.forward(batch) 计算损失(如动作预测MSE损失+VAE KL散度)。 -- 反向传播:grad_scaler.scale(loss).backward() 缩放损失梯度(混合精度训练)。 -- 梯度裁剪:torch.nn.utils.clip_grad_norm_ 限制梯度范数(防止梯度爆炸)。 -- 参数更新:grad_scaler.step(optimizer) 更新模型参数,optimizer.zero_grad() 清空梯度缓存。 save_checkpoint(...):保存模型权重、优化器状态、学习率调度器状态和当前步数,支持断点续训。 eval_policy(...):在仿真环境中测试策略性能,计算平均奖励、成功率等指标,并保存评估视频。 上面代码是train 函数的核心训练循环,负责执行离线训练的完整流程:从数据加载、模型参数更新,到指标记录、模型保存与策略评估。循环以“训练步数”(step)为驱动,从初始步数(0 或断点续训的步数)运行至目标步数(cfg.steps),确保模型充分训练并实时监控性能。 (1)循环初始化,按步数迭代 for _ in range(step, cfg.steps): 启动训练循环,从当前 step(初始为 0 或断点续训的步数)迭代至 cfg.steps(配置文件中定义的总训练步数,如 100,000 步)。 (2)数据加载与耗时记录 start_time = time.perf_counter() batch = next(dl_iter) # 从无限迭代器获取批次数据 train_tracker.dataloading_s = time.perf_counter() - start_time # 记录数据加载耗时 dl_iter = cycle(dataloader):cycle 将 DataLoader 转换为无限迭代器,数据集遍历完毕后自动重启,确保训练步数达标(而非受限于数据集大小)。 dataloading_s 指标:通过 train_tracker 记录单批次加载时间,用于监控数据加载是否成为训练瓶颈(若该值接近模型更新时间 update_s,需优化数据加载)。 (3)批次数据设备迁移 for key in batch: if isinstance(batch[key], torch.Tensor): batch[key] = batch[key].to(device, non_blocking=True) 将批次中的张量数据(如图像、动作)异步传输到目标设备(GPU/CPU)。其中non_blocking=True:启用异步数据传输,允许 CPU 在数据传输至 GPU 的同时执行后续计算(如模型前向传播准备),提升硬件利用率。 (4)模型参数更新 train_tracker, output_dict = update_policy( train_tracker, policy, batch, optimizer, cfg.optimizer.grad_clip_norm, grad_scaler=grad_scaler, lr_scheduler=lr_scheduler, use_amp=cfg.policy.use_amp, ) 调用 update_policy 函数执行单次参数更新,流程包括: 前向传播:计算模型输出和损失(loss = policy.forward(batch))。 混合精度训练:通过 torch.autocast 启用低精度计算(若 use_amp=True),加速训练并节省显存。 反向传播:梯度缩放(grad_scaler.scale(loss).backward())避免数值下溢,梯度裁剪(clip_grad_norm_)防止梯度爆炸。 参数更新:优化器.step() 更新参数,学习率调度器.step() 动态调整学习率。 指标记录:返回更新后的训练指标(loss、梯度范数、学习率等)。 (5)步数递增与状态跟踪 step += 1 # 步数递增(在更新后,确保日志/评估对应已完成的更新) train_tracker.step() # 更新指标跟踪器的当前步数 step用于更新当前系统的训练步数,是整个训练过程的基础计算器。train_tracker.step()通知训练指标跟踪器进入新阶段。 is_log_step = cfg.log_freq > 0 and step % cfg.log_freq == 0 is_saving_step = step % cfg.save_freq == 0 or step == cfg.steps is_eval_step = cfg.eval_freq > 0 and step % cfg.eval_freq == 0 通过计算这些标志,用于后续的逻辑控制。 is_log_step:日志打印标志位,当配置的日志频率(cfg.log_freq)大于0且当前步数是cfg.log_freq 倍数时激活日志的打印。cfg.log_freq是来自用户的命令行参数的配置。默认是200步打印一次。 is_saving_step:保存标志位,当配置的保存频率相等或是其倍数时激活保存。默认是20000步保存一次。 is_eval_step:模型评估标志为。默认是20000评估一次。 标志通过模块化设计实现了训练过程的精细化控制,参数均来自TrainConfig配置类。 (6)训练指标日志(按频率触发) is_log_step = cfg.log_freq > 0 and step % cfg.log_freq == 0 if is_log_step: logging.info(train_tracker) # 控制台打印平均指标(如 loss: 0.523 | grdn: 1.234) if wandb_logger: wandb_log_dict = train_tracker.to_dict() if output_dict: wandb_log_dict.update(output_dict) # 合并模型输出指标(如策略特定的辅助损失) wandb_logger.log_dict(wandb_log_dict, step) # 上传至 WandB train_tracker.reset_averages() # 重置平均计数器,准备下一轮统计 根据前面计算的is_log_step满足时,默认是200步,则调用logging.info()输出训练的指标,如损失、准确率。如果启动了wandb,则将跟踪器数据转换为字典,合并额外输出(output_dict)后记录到WandB,关联当前步数。最后调用train_tracker.reset_averages() 清除跟踪累计值,为下一周期计数做准备。 (7)模型 checkpoint 保存(按频率触发) if cfg.save_checkpoint and is_saving_step: logging.info(f"Checkpoint policy after step {step}") checkpoint_dir = get_step_checkpoint_dir(cfg.output_dir, cfg.steps, step) save_checkpoint(checkpoint_dir, step, cfg, policy, optimizer, lr_scheduler) # 保存模型、优化器、调度器状态 update_last_checkpoint(checkpoint_dir) # 更新 "last" 软链接指向最新 checkpoint if wandb_logger: wandb_logger.log_policy(checkpoint_dir) # 上传 checkpoint 至 WandB artifacts 默认是启动了save_checkpoint, 每20000步将训练结束的状态进行保存一次,便于支持断点续训和模型版本管理。其中save_checkpoint函数和输出目录结构如下: 005000/ # training step at checkpoint ├── pretrained_model/ │ ├── config.json # 存储模型架构定义,包括网络层数、隐藏维度、激活函数类型等拓扑结构信息 │ ├── model.safetensors # 采用SafeTensors格式存储模型权重,包含所有可学习参数的张量数据,具有内存安全和高效加载特性 │ └── train_config.json # 序列化的训练配置对象,包含超参数(学习率、批大小)、数据路径、预处理策略等完整训练上下文 └── training_state/ ├── optimizer_param_groups.json # 记录参数分组信息,包括不同层的学习率、权重衰减等差异化配置 ├── optimizer_state.safetensors # 保存优化器动态状态,如Adam的一阶矩(momentum)和二阶矩(variance)估计,SGD的动量缓冲区等 ├── rng_state.safetensors # 捕获PyTorch全局RNG和CUDA(如使用)的随机数状态,确保恢复训练时数据采样和权重初始化的一致性 ├── scheduler_state.json #学习率调度器的内部状态,包括当前调度阶段、预热状态、周期信息等 └── training_step.json #当前训练迭代次数,用于精确定位训练数据读取位置 pretrained_dir = checkpoint_dir / PRETRAINED_MODEL_DIR policy.save_pretrained(pretrained_dir) # 保存模型架构和权重 cfg.save_pretrained(pretrained_dir) # 保存训练配置 save_training_state(checkpoint_dir, step, optimizer, scheduler) # 保存训练动态状态 update_last_checkpoint用于维护一个指向最新检查点目录的符号链接(symlink),在训练过程中跟踪和管理最新的模型检查点。 def update_last_checkpoint(checkpoint_dir: Path) -> Path: # 1. 构建符号链接路径:在检查点父目录下创建名为 LAST_CHECKPOINT_LINK 的链接 last_checkpoint_dir = checkpoint_dir.parent / LAST_CHECKPOINT_LINK # 2. 如果符号链接已存在,则先删除旧链接 if last_checkpoint_dir.is_symlink(): last_checkpoint_dir.unlink() # 3. 计算当前检查点目录相对于父目录的相对路径 relative_target = checkpoint_dir.relative_to(checkpoint_dir.parent) # 4. 创建新的符号链接,指向当前检查点目录 last_checkpoint_dir.symlink_to(relative_target) (8)策略评估(按频率触发) is_eval_step = cfg.eval_freq > 0 and step % cfg.eval_freq == 0 if cfg.env and is_eval_step: step_id = get_step_identifier(step, cfg.steps) logging.info(f"Eval policy at step {step}") with torch.no_grad(), torch.autocast(...) if use_amp else nullcontext(): eval_info = eval_policy(eval_env, policy, cfg.eval.n_episodes, ...) # 在环境中执行策略评估 # 记录评估指标(平均奖励、成功率、耗时)并上传至 WandB eval_tracker = MetricsTracker(...) eval_tracker.avg_sum_reward = eval_info["aggregated"]["avg_sum_reward"] ... logging.info(eval_tracker) if wandb_logger: wandb_logger.log_dict(...) wandb_logger.log_video(...) # 上传评估视频 在强化学习和机器人控制领域,策略评估(Policy Evaluation) 是指在特定环境中系统性测试智能体策略(Policy)性能的过程。它通过执行预设数量的评估回合,收集关键指标(如奖励、成功率、执行时间等),客观衡量策略的实际效果。总结一下就是有以下4个核心作用。 性能监控:跟踪训练过程中策略性能的变化趋势,判断模型是否收敛或退化 过拟合检测:通过独立评估集验证策略泛化能力,避免在训练数据上过拟合 决策依据:基于评估指标决定是否保存模型、调整超参数或终止训练 行为分析:通过可视化记录(如代码中的评估视频)观察策略执行细节,发现异常行为 负责在指定训练步骤对策略进行系统性评估,并记录关键指标与可视化结果。代码块实现了周期性策略评估机制,当满足环境配置(cfg.env)和评估步骤标志(is_eval_step)时才会触发。 按 cfg.eval_freq(这里默认是20000步)在环境中评估策略性能,核心功能: 无梯度推理:torch.no_grad() 禁用梯度计算,节省显存并加速评估。 指标计算:通过 eval_policy 获取平均奖励(avg_sum_reward)、成功率(pc_success)等关键指标。 可视化:保存评估视频(如机器人执行任务的轨迹)并上传至 WandB,直观观察策略行为。 训练更新 训练环境准备 device = get_device_from_parameters(policy) policy.train() with torch.autocast(device_type=device.type) if use_amp else nullcontext(): loss, output_dict = policy.forward(batch) 先调用get_device_from_parameters从模型参数自动推断当前计算设备,确保后续张量操作与模型参数在同医社保上,避免跨设备数据传输错误。 接着调用policy.train()将模型切换为训练模式,主要是启动dropout层随机失活功能,激活BatchNormalization层的移动平均统计更新等,与评估模式区别推理时需要调用policy.eval()。 最后使用了with预计用于创建一个上下文管理器,在进入代码块是调用管理器enter()方法,退出时调用exit()方法,这种机制确保资源被正确获取和释放或在特定上下文中执行代码,其代码等价于如下。 if use_amp: context_manager = torch.autocast(device_type=device.type) else: context_manager = nullcontext() with context_manager: loss, output_dict = policy.forward(batch) 也就是当条件激活use_amp启用混合精度训练,否则使用空上下文,torch.autocast会动态选择最优精度,对数值稳定性要求高的操作保留FP32。 最后就是调用loss, output_dict = policy.forward(batch)这是模型前向计算的核心函数,返回的是损失值和输出字典。 batch包含的是训练数据(图像、关节动作等),policy.forward()处理输入并生成预测值并计算与真实值的差距loss,然后loss将用于后续梯队的计算。 这里在总结一下什么是混合精度训练? 混合精度训练(Mixed Precision Training)主要指 FP16(半精度浮点数)与 FP32(单精度浮点数)的混合使用。 FP16:用于模型前向/反向传播的计算(如矩阵乘法、激活函数等),占用内存少(仅为 FP32 的一半),且支持 GPU 硬件加速(如 NVIDIA Tensor Cores)。 FP32:用于存储模型参数、梯度和优化器状态,避免低精度导致的数值精度损失(如梯度下溢、参数更新不稳定)。 默认是使用F32精度进行,因为使用低精度FP16可以加速训练速度(计算量小,可以并发更多数据)、降低内存(仅占用FP32一半的内存)等好处,所以代码中如果use_amp=True,将启动混合精度训练。但是低精度也有个坏处就是数值精度会损失导致梯度为0参数不稳定,但是可以通过GradScaler进行放大loss进而放大梯度,反向传播计算完了之后再反缩放回来,确保梯度进行不丢失。 梯队计算 grad_scaler.scale(loss).backward() grad_scaler.unscale_(optimizer) grad_norm = torch.nn.utils.clip_grad_norm_( policy.parameters(), grad_clip_norm, error_if_nonfinite=False, ) 这段代码是训练处理计算梯队的核心流程,主要就是用于计算梯队。 首先grad_scaler.scale(loss).backward()是将损失放大然后间接放大梯队,grad_scaler 是 PyTorch 的 GradScaler 实例,用于自动混合精度训练中管理梯度缩放,scale(loss)将损失值放大scaler倍(通常是2^n),避免梯度在反向传播中因数值过小而下溢为0。在混合精度训练(如使用 FP16)时,小梯度可能因数值精度不足而“下溢”(变为 0)。通过 scale(loss) 将损失值放大(例如放大 2^k 倍),反向传播时梯度也会同步放大,避免梯度下溢。最后backward()触发反向传播,计算所有可训练参数的梯度(此时梯度已被放大)。 其次再调用grad_scaler.unscale_(optimizer)将放大的梯度恢复到原始尺寸,由于损失被放大,方向传播的梯队也被同等放大,使用unscale_对所有参数梯度执行反缩放,相当于处于scaler。 最后是调用 torch.nn.utils.clip_grad_norm_执行梯度裁剪(Gradient Clipping),限制梯度的 L2 范数,防止梯度爆炸。其返回结果grad_norm是一个标量float,表示 梯度裁剪后所有参数梯度的总 L2 范数。 总结一下,段代码是混合精度训练中梯度处理的标准流程,解决了两个核心问题。 数值精度问题:通过 scale(loss) 和 unscale_(optimizer) 确保小梯度在低精度(如 FP16)下不丢失,同时恢复梯度原始范围用于后续优化。 梯度爆炸问题:通过 clip_grad_norm_ 限制梯度大小,避免过大梯度导致参数更新不稳定(例如权重跳变、Loss 震荡)。 但需要注意的是,如果没有启动混合精度grad_scaler.scale(loss)和grad_scaler.unscale_(optimizer)没有缩放和反缩放作用,但梯队采集逻辑正常运作。为启动混合精度时整个流程与标准 FP32 训练完全一致,grad_scaler 相关操作因「禁用状态」而自动失效,不会引入额外计算开销。 参数优化与更新 with lock if lock is not None else nullcontext(): grad_scaler.step(optimizer) grad_scaler.update() optimizer.zero_grad() if lr_scheduler is not None: lr_scheduler.step() 先试用with lock条件性启动线程锁,确保参数更新的线程安全。 grad_scaler.step(optimizer)执行参数更新,为了下一次重新计算损失。 接着调用grad_scaler.update()动态调整梯度的缩放因子,主要是自动平衡FP16的数值范围限制,在避免梯度下溢和溢出之间找到最优缩放比例。 最后就是调用optimizer.zero_grad()清除优化器中所有参数梯度缓存,因为Pytorch梯队计算默认都是累积模式(param.grad会累加),需要手动清零与loss.backward()配对,形成成"清零→前向→反向→更新→清零"的完整循环。 lr_scheduler.step()是按照批次更新学习率,可以使用循环学习率、余弦退化调度以及梯队的自适应调度等策略。 对于训练的闭环可以看成:scale(loss)→backward()→unscale_()→clip_grad_norm_()→step()→update()形成完整的混合精度训练流程,解决FP16数值范围限制问题。 最后update_policy返回的是train_metrics和output_dict。前者是传递给日志系统(如TensorBoard/WandB)进行可视化,后者是包含模型前向传播的详细输出(如预测值、中间特征),可用于后续分析 训练状态维护 if has_method(policy, "update"): policy.update() train_metrics.loss = loss.item() train_metrics.grad_norm = grad_norm.item() train_metrics.lr = optimizer.param_groups[0]["lr"] return train_metrics, output_dict 如果policy有update的方法,则调用进行更新,这里主要是做兼容性设计。 最后train_metrics主要是记录量化训练过程中的关键特征,为监控、调试和优化提供数据支撑。 训练收尾 # 训练结束后清理 if eval_env: eval_env.close() # 关闭评估环境(释放资源) logging.info("End of training") # 推送模型至HuggingFace Hub(若启用) if cfg.policy.push_to_hub: policy.push_model_to_hub(cfg) # 保存模型配置、权重至Hub,支持后续部署 eval_env.close():关闭仿真环境(如Gym/DM Control),释放显存和CPU资源。 policy.push_model_to_hub(cfg):将训练好的模型(权重+配置)推送至HuggingFace Hub,支持跨设备共享和部署。 总结
  • lerobot录制

    lerobot录制

    简介 lerobot record是关键核心流程,其包括了数据的采集和模型推理两部分。 如果是数据采集模式,命令启动如下 python -m lerobot.record \ --robot.disable_torque_on_disconnect=true \ --robot.type=so101_follower \ --robot.port=/dev/ttyACM0 \ --robot.id=R12252801 \ --robot.cameras="{ handeye: {type: opencv, index_or_path: 6, width: 640, height: 480, fps: 30}, fixed: {type: opencv, index_or_path: 0, width: 640, height: 480, fps: 30}}" \ --teleop.type=so101_leader \ --teleop.port=/dev/ttyACM1 \ --teleop.id=R07252608 \ --dataset.repo_id=${HF_USER}/record-07271148\ --dataset.num_episodes=10 \ --dataset.reset_time_s=5 \ --dataset.push_to_hub=false \ --dataset.single_task="Grab the cube" \ --display_data=true 如果是模型推理模式,则命令如下: python -m lerobot.record \ --robot.type=so101_follower \ --robot.disable_torque_on_disconnect=true \ --robot.port=/dev/ttyACM0 \ --robot.cameras="{ handeye: {type: opencv, index_or_path: 6, width: 640, height: 480, fps: 30}, fixed: {type: opencv, index_or_path: 0, width: 640, height: 480, fps: 30}}" \ --robot.id=R12252801 \ --display_data=false \ --dataset.single_task="Put brick into the box" \ --policy.path=outputs/weigh_07280842/pretrained_model \ --dataset.episode_time_s=240 \ --dataset.repo_id=${HF_USER}/eval_so101_07271148 默认录制时长是60s,60S后会停止,如果要改长加上--dataset.episode_time_s=640 主要的区别是如果是采集模式需要使用--teleop参数启动遥控机器,如果是模型推理模式则不需要启动遥控机器,但是需要指定模型路径--policy.path,本质上就是机器人的动作指令来源于哪里,要么来之遥控器的,要么来自模型推理出来的。 在阅读本文前,这里先做一个总结: 从上图可以看出整个录制流程主要围绕机器设备、遥控设备、模型、数据集四个要素进行展开。 机器:有SO101Follower、LeKiwi等机器,都继承Robot类。通过命令行参数robot.type调用make_robot_from_config函数选择创建具体的实例设备,函数返回的还是Robot但是指向的是具体的机器实例如SO101Follower,利用了多态的特性做到了解耦,如果要新添机器时,只需要参考SO101Follower添加一个新的设备即可。在创建机器实例时传递RobotConfig参数,这个参数依旧是抽象基类,其继承了draccus.ChoiceRregistry,通过命令行参数robot.type选择注册具体的配置如SO101FollowerConfig。 遥控:用于控制机器,常用于数据的采集。这里同样通过命令行参数teleop.type调用make_teleoperator_from_config函数选择创建具体的设备实例,创建实例时需要传递TeleoperatorConfig参数,其也是一个抽象基类,基于命令选择注册实例化的配置类参数,如SO101LeaderConfig。 模型:模型用于决策推理动作,其和遥控二选一,如果指定了遥控了,模型就不需要指定了。通用使用了机器、遥控的解耦机制,具体的实例化为ACT或DiffusionPolicy等。 数据:通过参数dataset.xxx将参数构建为DataRecordConfig类,然后将其中的信息传递给LeRobotDataset。 lerobot的整个代码框架将具体的设备、配置、模型等做到了解耦,方便拓展新的设备,其设计思想值得借鉴。下面本文将先按照执行命令启动流程可以分为参数解析、硬件设备初始化与连接、数据集初始化、采集循环、数据保存等几个阶段,接下来将按照这几个阶段进行介绍。 关键配置类 在介绍关键流程时,先来总结一下关键record流程需要的配置类数据结构,这些Config类主要用于机器、遥控、模型实例化传递的参数。 RecordConfig RecordConfig是整个record入口函数的传递参数,其通过命令行的参数构建形成RecordConfig对象传递给函数。 class RecordConfig: # 1. 核心以来配置 # 机器人硬件配置如型号、端口、相机参数等,有RobotConfig类定义 # 如s101_folloer的通信端口、关节限制等,必现通过命令行或代码显式传入。 robot: RobotConfig #数据集录制配置,如repo_id,num_episodes、fpsdeng,必现显式传入。 dataset: DatasetRecordConfig # 2.控制方式配置(可选) # 遥控操作器配置,如So100_leader,可选。 teleop: TeleoperatorConfig | None = None # 预训练策略配置,如模型路径、推理设备等。 policy: PreTrainedConfig | None = None #3. UI与反馈配置(可选) # 是否实时显示相机画面,通过rerun可视化工具 display_data: bool = False # 是否启用语音合成反馈,人机的提示声,默认开启。 play_sounds: bool = True # 是否从现有数据集续录 resume: bool = False def __post_init__(self): # 如果指定了policy,则进行加载模型 policy_path = parser.get_path_arg("policy") if policy_path: cli_overrides = parser.get_cli_overrides("policy") self.policy = PreTrainedConfig.from_pretrained(policy_path, cli_overrides=cli_overrides) self.policy.pretrained_path = policy_path # teleop和policy必须要指定一个 if self.teleop is None and self.policy is None: raise ValueError("Choose a policy, a teleoperator or both to control the robot") @classmethod def __get_path_fields__(cls) -> list[str]: """This enables the parser to load config from the policy using `--policy.path=local/dir`""" return ["policy"] RecordConfig中有几个关键的成员,分别是RobotConfig,DatasetRrcordConfig、TeleoperatorConfig、PreTrainedConfig。其中除了DatasetRrcordConfig外的其他几个都是继承draccus.ChoiceRegistry 和 abc.ABC,是一个抽象基类,需通过注册的子类(如特定机器人型号的配置类)实例化,种设计既保证了配置的结构化(继承 abc.ABC),又支持灵活的子类选择(通过 draccus.ChoiceRegistry 实现配置注册与解析)。 RobotConfig 控制硬件接口,确保机器人正确连接和数据采集; DatasetRecordConfig 控制数据存储,定义数据集格式和元信息; TeleoperatorConfig 和 PreTrainedConfig 控制机器人行为,分别对应手动和自动控制模式。 RobotConfig #标记为数据类,所有字段必现通过关键参数传入,自动生成__init__等方法 @dataclass(kw_only=True) #继承draccus框架选择注册机制,允许子类如SO100FollowerConfig、KochFollowerConfig #作为可选机器人型号注册,支持通过配置文件或命令行参数 #动态选择例如--robot.type=s101_follower #abc.ABC抽象基类,不可直接实例化,必现通过子类具体机器人型号配置使用。 class RobotConfig(draccus.ChoiceRegistry, abc.ABC): # 机器的唯一标识实例 id: str | None = None # 标定文件存储目录 calibration_dir: Path | None = None #是 dataclass 的初始化后钩子,用于补充参数校验逻辑,确保机器人配置的合法性。 #主要是检查cameras中的宽高、帧率等。 def __post_init__(self): if hasattr(self, "cameras") and self.cameras: for _, config in self.cameras.items(): for attr in ["width", "height", "fps"]: if getattr(config, attr) is None: raise ValueError( f"Specifying '{attr}' is required for the camera to be used in a robot" ) #通过 draccus.ChoiceRegistry 的 get_choice_name 方法,动态返回子类的注册名称(即机器人型号)。 @property def type(self) -> str: return self.get_choice_name(self.__class__) RobotConfig 是抽象基类(ABC),仅定义所有机器共有的通用配置字段,如id、calibration_dir。其继承了draccus.ChoiceRegistry实现了不同机器人型号的动态注册与选择。 下面以一个继承实例说明 @RobotConfig.register_subclass("so101_follower") @dataclass class SO101FollowerConfig(RobotConfig): # 机器的通信端口,如/dev/ttyACM0,通过--robot.port命令传入 port: str # 断开连接时是否关闭电机扭矩,通过命令--robot.disable_torque_on_disconnect disable_torque_on_disconnect: bool = True # 电机相对位置目标安全上限,防止运动幅度过大。 max_relative_target: int | None = None # 相机的配置通过字典的方式。 cameras: dict[str, CameraConfig] = field(default_factory=dict) #--robot.cameras="{ handeye: {type: opencv, index_or_path: 6, width: 640, height: 480, fps: 30}, fixed: {type: opencv, index_or_path: 0, width: 640, height: 480, fps: 30}} # 是否使用角度制而非弧度制 use_degrees: bool = False 这是框架自定义的子类注册装饰器,作用是将 SO101FollowerConfig 类与字符串 so101_follower 绑定,使其成为 RobotConfig 的可选型号。装饰器内部会将 SO101FollowerConfig 类添加到 RobotConfig 的“子类注册表”中,键为 so101_follower,值为类本身。当用户通过命令行传入 --robot.type=so101_follower 时,框架会从注册表中查找并实例化该类。 总结一下,RobotConfig是机器的硬件配置,但是其是一个虚拟基类,其具体实例的机器型号通过子类继承RobotConfig,有很多型号的子类,其巧妙的使用了draccus.ChoiceRegistry注册机制(RobotConfig继承),通过参数--robot.type来指定具体实例化的设备。同时这里也使用了多态的特性,通过统一的RobotConfig接口操作不同实例的具体实现,如通过 robot.type 属性(基类定义),可以动态判断具体子类类型,并执行对应逻辑。 if cfg.robot.type == "so101_follower": # 通过基类接口获取子类类型 print(f"SO101 特有端口: {cfg.robot.port}") # 访问子类特有属性 elif cfg.robot.type == "so100_follower": print(f"SO100 特有IP: {cfg.robot.ip_address}") # 另一子类的特有属性 DatasetRecordConfig class DatasetRecordConfig: #1. 数据集标识与存储 #数据集唯一标识符,格式为 '{hf_username}/{dataset_name}' (e.g. `lerobot/test`),用于定位hugging Face Hub仓库或本地路径 repo_id: str #录制任务的文字描述,用于标注数据用途。 single_task: str # 数据集本地存储根目录,如果未指定使用~/.cache/huggingface/datasets root: str | Path | None = None # 2.录制控制参数 # 录制的帧率,控制数据采集帧率。 fps: int = 30 # 单段录制时长,默认60S episode_time_s: int | float = 60 # 重置环境时长,两端录制之间预留环境重置时间。 reset_time_s: int | float = 60 # 总录制段数,控制数据集包含样本数量(如50段*60S=3000S总数据) num_episodes: int = 50 #3. 数据处理与上传 # 是否将录制图像帧编码为视频文件,默认开启 video: bool = True # 是否上传数据集到hugging Face Hub,默认自动上传。 push_to_hub: bool = True # Hub仓库是否设置为私有,默认是公开 private: bool = False # 上传到hub上的数据集标签 tags: list[str] | None = None #4. 图像存储性能参数 #图像写入的线程数 num_image_writer_processes: int = 0 #每个相机图像写入的线程数 num_image_writer_threads_per_camera: int = 4 def __post_init__(self): if self.single_task is None: raise ValueError("You need to provide a task as argument in `single_task`.") #__post_init__ 是 dataclass 的初始化后钩子方法,在 __init__ 初始化所有字段后自动执行, # 用于补充校验逻辑。 # 确保 single_task 字段不为空。因为 single_task 是描述录制任务的核心元数据(无默认值且必选), # 若未提供则直接抛出错误,避免后续数据标注缺失关键信息。 DatasetRecordConfig 是数据集录制任务的参数容器,被嵌套在 RecordConfig 中(作为 RecordConfig.dataset 字段),最终通过 parser.wrap() 装饰器从命令行参数解析生成实例。 例如,用户通过命令行指定: --dataset.repo_id=aliberts/record-test --dataset.num_episodes=2 --dataset.single_task="Grab the cube" 这些参数会被解析为 DatasetRecordConfig 实例,其字段值(如 num_episodes=2)直接控制录制逻辑(如 record_loop 函数的循环次数)。 TeleoperatorConfig @dataclass(kw_only=True) class TeleoperatorConfig(draccus.ChoiceRegistry, abc.ABC): # Allows to distinguish between different teleoperators of the same type id: str | None = None # Directory to store calibration file calibration_dir: Path | None = None @property def type(self) -> str: return self.get_choice_name(self.__class__) TeleoperatorConfig 是远程遥控操作器(如手柄、键盘、 leader 机器人)的抽象配置基类,用于定义所有遥操作器共有的通用配置字段和动态选择机制。它与RobotConfig类似,同样继承了draccus.ChoiceRegistry 实现了注册机制,其具体的子类又继承TeleoperatorConfig。 @TeleoperatorConfig.register_subclass("so101_leader") @dataclass class SO101LeaderConfig(TeleoperatorConfig): # Port to connect to the arm port: str use_degrees: bool = False 用户通过-teleop.type指定来实例化具体的操作设备,如--teleop.type=so101_leader时,具体的流程如下。 框架通过 draccus.ChoiceRegistry 查找注册名称为 so101_leader 的子类(如 SO101LeaderConfig); 实例化该子类,接收命令行参数(如 --teleop.port=/dev/ttyACM0)并初始化特有字段(如 port); 最终通过 TeleoperatorConfig 基类引用(如 cfg.teleop)传入 make_teleoperator_from_config 函数,创建具体遥操作器实例。 PreTrainedConfig @dataclass class PreTrainedConfig(draccus.ChoiceRegistry, HubMixin, abc.ABC): #1. 观测与特征配置 # 策略输入的观测部署,如=2表示输入当前+前1步观测 n_obs_steps: int = 1 #特征归一化模式映射{"image":"mead_std"} normalization_mapping: dict[str, NormalizationMode] = field(default_factory=dict) #输入特征的规范{"image": PolicyFeature(type=VISUAL, shape=(3, 224, 224))}) input_features: dict[str, PolicyFeature] = field(default_factory=dict) #输出特征规范输出特征规范(定义策略输出的特征类型,如 {"action": PolicyFeature(type=ACTION, shape=(6,))}) output_features: dict[str, PolicyFeature] = field(default_factory=dict) #2. 设备与性能配置 # 策略运行设备如是否使用cuda或cpu device: str | None = None # cuda | cpu | mp #是否启用自动混合精度训练 use_amp: bool = False #3. hugging face Hub继承 #是否将配置上传到Hub push_to_hub: bool = False #Hub的id repo_id: str | None = None # 仓库是否私有 private: bool | None = None # 仓库的标签 tags: list[str] | None = None # Add tags to your policy on the hub. license: str | None = None #方法在__init__后执行,处理运行设备选择和AMP的可用性 def __post_init__(self): self.pretrained_path = None #自动选择可用设备,如果用户指定的设备不可用。 if not self.device or not is_torch_device_available(self.device): auto_device = auto_select_torch_device() logging.warning(f"Device '{self.device}' is not available. Switching to '{auto_device}'.") self.device = auto_device.type #自动禁用不支持AMP if self.use_amp and not is_amp_available(self.device): logging.warning( f"Automatic Mixed Precision (amp) is not available on device '{self.device}'. Deactivating AMP." ) self.use_amp = False #通过 draccus.ChoiceRegistry 的 get_choice_name 方法,返回子类注册的策略型号名称(如 sac、tdmpc),用于日志打印和策略选择逻辑 @property def type(self) -> str: return self.get_choice_name(self.__class__) #抽象方法 observation_delta_indices:返回观测特征的差分索引 action_delta_indices : 返回动作特征的差分索引 reward_delta_indices : 返回奖励特征的差分索引 et_optimizer_preset() : 返回优化器配置 get_scheduler_preset() : 返回学习率调度器配置(如 CosineAnnealing)。 validate_features() :校验 input_features 和 output_features 的合法性(如形状匹配) #将策略配置实例(含超参数、特征规范、设备设置等)序列化 #为标准 JSON 文件 config.json,存储到指定目录。这是策略配 #置上传到 Hugging Face Hub 的前置步骤——Hub 要求模型/配 #置必须包含 config.json 以描述其参数,而 _save_pretrained 正 #是生成该文件的标准化实现。例如,当调用 #config.push_to_hub() 时,框架会先调用 _save_pretrained 将 #配置保存到临时目录,再将该目录上传到 Hub,最终用户可通 #过 PreTrainedConfig.from_pretrained("repo_id") 加载此 JSON #配置。 def _save_pretrained(self, save_directory: Path) -> None: with open(save_directory / CONFIG_NAME, "w") as f, draccus.config_type("json"): draccus.dump(self, f, indent=4) #from_pretrained 是 PreTrainedConfig 的 核心类方法,用于从 #本地目录 或 Hugging Face Hub 加载预训练策略的配置文件 #(config.json),并实例化为具体的配置对象(如 SACConfig、 #TDMPCConfig)。它是策略配置“复用与共享”的入口,支持通过 #命令行参数覆盖配置,实现灵活的参数调整。 @classmethod def from_pretrained( cls: Type[T], pretrained_name_or_path: str | Path, *, force_download: bool = False, resume_download: bool = None, proxies: dict | None = None, token: str | bool | None = None, cache_dir: str | Path | None = None, local_files_only: bool = False, revision: str | None = None, **policy_kwargs, ) -> T: model_id = str(pretrained_name_or_path) config_file: str | None = None #从本地目录加载 if Path(model_id).is_dir(): if CONFIG_NAME in os.listdir(model_id): config_file = os.path.join(model_id, CONFIG_NAME) else: print(f"{CONFIG_NAME} not found in {Path(model_id).resolve()}") #从hugging face hub下载 else: try: config_file = hf_hub_download( repo_id=model_id, filename=CONFIG_NAME, revision=revision, cache_dir=cache_dir, force_download=force_download, proxies=proxies, resume_download=resume_download, token=token, local_files_only=local_files_only, ) except HfHubHTTPError as e: raise FileNotFoundError( f"{CONFIG_NAME} not found on the HuggingFace Hub in {model_id}" ) from e # HACK: this is very ugly, ideally we'd like to be able to do that natively with draccus # something like --policy.path (in addition to --policy.type) cli_overrides = policy_kwargs.pop("cli_overrides", []) with draccus.config_type("json"): return draccus.parse(cls, config_file, args=cli_overrides) PreTrainedConfig是所有策略模型如ACT,TDMPC的抽象配置类,定义了策略训练/推理所需的通用参数,特征规范、设备配置及Hugging Face Hub交互机制。它通过 dataclass、draccus.ChoiceRegistry 和 abc.ABC 实现“配置标准化”“多策略兼容”和“Hub 集成”,是策略初始化的核心参数载体。 draccus.ChoiceRegistry:通用跟前面的RobotConfig类似,支持动态子类注册,允许子类(如 SACConfig、TDMPCConfig)作为“策略选项”注册,支持通过 --policy.type=sac 动态选择。 HubMixin:Hugging Face Hub 交互混入类,提供 from_pretrained(从 Hub/本地加载配置)和 _save_pretrained(保存配置到 Hub/本地)方法,实现策略配置的共享与复用。 abc.ABC:抽象基类,包含未实现的抽象方法(如 get_optimizer_preset),强制子类必须实现核心逻辑,确保策略配置的完整性。 参数解析 输入的参数会draccus 解析器读取所有 --xxx 参数,映射到 RecordConfig 类(定义在 record.py 中),生成结构化配置对象 RecordConfig类型的cfg实例,关键的配置项如下: robot.type=so101_follower:指定机器人类型为 so101_follower(从动机器人)。 teleop.type=so101_leader:指定遥操作器类型为 so101_leader(主动遥操作器)。 dataset.num_episodes=10:采集10个回合数据。 display_data=true:启用 Rerun 可视化工具显示摄像头画面和机器人状态。 RecordConfig的cfg实例构造是,会调用RecordConfig.post_init 检查如single_task,--teleop.type以及-policy.type必现要选择一个。因为录制要么就是验证模式,要么就是数据采集模式。验证模式就是通过大模型推理得到的动作数据,而采集模式通过遥控臂得到的数据控制设备。 硬件初始化与连接 机器人的初始化so101_follower robot = make_robot_from_config(cfg.robot) robot.connect() 传入的参数是cfg.robot,cfg.robot是一个基类,实际实例化为so101_follower的实例,这里使用了多态的特性。根据传入的参数--robot.port=/dev/ttyACM0 连接到机器人串口,初始化通信协议(如 ROS 或自定义串口协议)。根据 --robot.cameras 配置两个 OpenCV 摄像头。--robot.disable_torque_on_disconnect=true 确保程序退出时机器人断电,避免碰撞风险。 遥控机器初始化so101_leader teleop = make_teleoperator_from_config(cfg.teleop) if cfg.teleop is not None else None teleop.connect() 遥控机器初始化是可选的,如果指定了模型即是验证的方式,那么就不用启动遥控机器了,只有采集数据的时候才需要遥控机器。 总结一下,通过make_robot_from_config根据传入的robot.type创建一个机器实例,这里是class SO101Follower(Robot)。如果是采集模式还会创建一个遥控机器人实例,通过make_teleoperator_from_config生成实例class SO101Leader(Teleoperator)。 数据集创建 数据特征定义 # 动作特征->数据集动作特征 action_features = hw_to_dataset_features(robot.action_features, "action", cfg.dataset.video) # 观测特征->数据集观测特征 obs_features = hw_to_dataset_features(robot.observation_features, "observation", cfg.dataset.video) # 整个动作特征、观测特征 dataset_features = {**action_features, **obs_features} from pprint import pprint print("Action Features:") pprint(action_features) print("Observation Features:") pprint(obs_features) print("Dataset Features:") pprint(dataset_features) 数据应该长什么样? 这总的要格式要求吧? hw_to_dataset_features就是将机器硬件特征(电机位置、摄像头图像)转换为数据集特征描述,也就是说数据要按照这个格式来。格式包含数据类型dtype、形状shape等。下面根据直接打印action_features、obs_features、dataset_features的打印的结果来分别分析。 Action Features: { 'action': { 'dtype': 'float32', 'shape': (6,), 'names': ['shoulder_pan.pos', 'shoulder_lift.pos', 'elbow_flex.pos', 'wrist_flex.pos', 'wrist_roll.pos', 'gripper.pos'] } } Observation Features: { 'observation.state': { # 机器人关节状态 'dtype': 'float32', 'shape': (6,), 'names': ['shoulder_pan.pos', ..., 'gripper.pos'] # 与动作特征电机名称一致 }, 'observation.images.handeye': { # 手眼相机图像 'dtype': 'video', 'shape': (480, 640, 3), 'names': ['height', 'width', 'channels'] }, 'observation.images.fixed': { # 固定视角相机图像 'dtype': 'video', 'shape': (480, 640, 3), 'names': ['height', 'width', 'channels'] } } 动作与观测特征 生成action_features、obs_features都是调用hw_to_dataset_features函数,下面来看看这个函数的实现。 # 函数入参为3个 # -hw_features: dict[str, type | tuple] 机器硬件特征字典,键为特征名称如关节名、摄像头名,值为特征的类型如float或图像尺寸元组(height,width,channels),注意这里的type是类型不是实际的数值 # -prefix: str 特征前缀,用于区分数据集不同的部分 # -use_video: bool 是否将摄像头图像编码为视频 def hw_to_dataset_features( hw_features: dict[str, type | tuple], prefix: str, use_video: bool = True ) -> dict[str, dict]: features = {} # 1.硬件特征分类,将关节特征和图像特征分出来 # jointfs是关键特征,通过for循环遍历hw_features.items()满足ftype为float。然后将key和ftype通过新的键值对存储到joint_fs中。 #示例输出:{"shoulder_pan.pos": float, "shoulder_lift.pos": float, ..., "gripper.pos": float}(共6个关节)。 joint_fts = {key: ftype for key, ftype in hw_features.items() if ftype is float} #cam_fs是摄像头的图像特征,值为(height, width, channels)元组。 #示例输出:{"handeye": (480, 640, 3), "fixed": (480, 640, 3)}(两个摄像头,分辨率480×640,RGB三通道)。 cam_fts = {key: shape for key, shape in hw_features.items() if isinstance(shape, tuple)} #2. 关节特征转换数值型 #根据传入的action动作指令,构建一个新的键值对,键为action。 if joint_fts and prefix == "action": features[prefix] = { "dtype": "float32", #统一数值类型为float32,适合模型训练 "shape": (len(joint_fts),),#形状关键数量,如6个关键,(6,) "names": list(joint_fts),#关节名称列表,与机人电机意义对应 } #3. 观测装特特征,与前面action结构类似,只是键值为observation.state if joint_fts and prefix == "observation": features[f"{prefix}.state"] = { "dtype": "float32", "shape": (len(joint_fts),), "names": list(joint_fts), } #4. 摄像头特征转换,为每个摄像头生成图像/视频存储特征 for key, shape in cam_fts.items(): features[f"{prefix}.images.{key}"] = { "dtype": "video" if use_video else "image", #存储类型:视频或图像 "shape": shape, #图像尺寸(height, width, channels) "names": ["height", "width", "channels"],#形状维度名称 } _validate_feature_names(features) return features 数据集的创建关键来源hw_features: dict[str, type | tuple]参数,该值来源于robot.action_features、robot.observation_features,根据参数的实例化以SO101Follower实例为例。 @property def _motors_ft(self) -> dict[str, type]: return {f"{motor}.pos": float for motor in self.bus.motors} @property def _cameras_ft(self) -> dict[str, tuple]: return { cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras } @cached_property def observation_features(self) -> dict[str, type | tuple]: return {**self._motors_ft, **self._cameras_ft} @cached_property def action_features(self) -> dict[str, type]: return self._motors_ft 电机位置特征结构_motors_ft定义了电机位置特征结构,描述关节运动的状态,其来源于self.bus.motors管理的电机列表,在Init中初始化,包含6个关节电机: motors={ "shoulder_pan": Motor(1, "sts3215", ...), # 肩转 "shoulder_lift": Motor(2, "sts3215", ...), # 肩抬 "elbow_flex": Motor(3, "sts3215", ...), # 肘弯 "wrist_flex": Motor(4, "sts3215", ...), # 腕弯 "wrist_roll": Motor(5, "sts3215", ...), # 腕转 "gripper": Motor(6, "sts3215", ...), # 夹爪 } 其输出格式返回字典 {电机名.pos: 数据类型},例如: { "shoulder_pan.pos": float, "shoulder_lift.pos": float, ..., "gripper.pos": float } float表示电机位置是浮点数值。 摄像头图像特征定义机器人摄像头图像特征的结构,用于描述视觉传感器数据格式。来源self.cameras 是由 make_cameras_from_configs 创建的摄像头实例(如 handeye、fixed 摄像头),配置来自 self.config.cameras(包含分辨率等参数)。其输出格式为返回字典 {摄像头名: (高度, 宽度, 通道数)},例如: { "handeye": (480, 640, 3), # 480px高、640px宽、RGB三通道 "fixed": (480, 640, 3) } 观测特征observation_features整合了电机位置和摄像头图像特征,定义机器人完整可观测装特,提供数据集录制和策略决策。通过字典解包(**)合并 _motors_ft(电机位置)和 _cameras_ft(摄像头图像),输出示例: { # 电机位置特征(来自 _motors_ft) "shoulder_pan.pos": float, "shoulder_lift.pos": float, ..., "gripper.pos": float, # 摄像头图像特征(来自 _cameras_ft) "handeye": (480, 640, 3), "fixed": (480, 640, 3) } 动作特征action_features定义机器人的动作指令格式,即遥控机器人。其直接服用_motors_ft,说明动作指令就是电机目标位置。 数据集特征合并 dataset_features = {**action_features, **obs_features} 合并动作特征与观测特征,形成数据集完整存储结构,用于初始化 LeRobotDataset。确保每个时间步的记录包含「观测→动作」的完整配对,满足训练需求(如模仿学习中,观测为输入,动作为标签)。 Dataset Features: { 'action': { 'dtype': 'float32', 'shape': (6,), 'names': [ 'shoulder_pan.pos', 'shoulder_lift.pos', 'elbow_flex.pos', 'wrist_flex.pos', 'wrist_roll.pos', 'gripper.pos' ] }, 'observation.state': { 'dtype': 'float32', 'shape': (6,), 'names': [ 'shoulder_pan.pos', 'shoulder_lift.pos', 'elbow_flex.pos', 'wrist_flex.pos', 'wrist_roll.pos', 'gripper.pos' ] }, 'observation.images.handeye': { 'dtype': 'video', 'shape': (480, 640, 3), 'names': ['height', 'width', 'channels'] }, 'observation.images.fixed': { 'dtype': 'video', 'shape': (480, 640, 3), 'names': ['height', 'width', 'channels'] } } action:是遥控操作机器(如SO101Leader)的关节位置数据 observation.state:机器人(如S0101Follower)实时采集的关节位置数据。 observation.images.xx:机器人实时采集的图像数据,可有多个。 创建数据集 创建数据集,是创建一个空的数据集。 传入的参数有repo_id标识了数据集的目录,root为数据集的根目录等等,具体如下阐述。 dataset = LeRobotDataset.create( cfg.dataset.repo_id, cfg.dataset.fps, root=cfg.dataset.root, robot_type=robot.name, features=dataset_features, use_videos=cfg.dataset.video, image_writer_processes=cfg.dataset.num_image_writer_processes, image_writer_threads=cfg.dataset.num_image_writer_threads_per_camera * len(robot.cameras), ) cfg.dataset.repo_id: 数据集的唯一标识,用于本地粗才能路径标识和上传hugging Face Hub上传的仓库名称 cfg.dataset.fps:录制帧率。 root=cfg.dataset.root:本地存储的路径,可以通过--dataset.root指定,如果没有提供,则默认使用缓存目录~/.cache/huggingface/ robot_type=robot.name:机器的唯一标识,会写入到meta.json中。 features=dataset_features:核心参数,数据集特征定义,由上一章节合并特征。 use_videos=cfg.dataset.video: 图像的存储格式,true表示使用mpr格式。False保留为PNG格式,占用空间大。内部在录制结束后,通过encode_episode_videos 调用 ffmpeg 将 PNG 序列转为视频(定义于 lerobot_dataset.py). image_writer_processes: 图像写入的进程数量,默认0,仅使用线程即一个进程。 image_writer_threads:图像写入的线程数,默认4线程/摄像头 × N摄像头。 LeRobotDataset.create LeRobotDataset.create返回的是一个LeRobotDataset对象实例。 典型的LeRobotDataset本地存储为: . ├── data │ ├── chunk-000 │ │ ├── episode_000000.parquet │ │ ├── episode_000001.parquet │ │ ├── episode_000002.parquet │ │ └── ... │ ├── chunk-001 │ │ ├── episode_001000.parquet │ │ ├── episode_001001.parquet │ │ ├── episode_001002.parquet │ │ └── ... │ └── ... ├── meta │ ├── episodes.jsonl │ ├── info.json │ ├── stats.json │ └── tasks.jsonl └── videos ├── chunk-000 │ ├── observation.images.laptop │ │ ├── episode_000000.mp4 │ │ ├── episode_000001.mp4 │ │ ├── episode_000002.mp4 │ │ └── ... │ ├── observation.images.phone │ │ ├── episode_000000.mp4 │ │ ├── episode_000001.mp4 │ │ ├── episode_000002.mp4 │ │ └── ... ├── chunk-001 └── ... 核心属性 meta: LeRobotDatasetMetadata类,元数据管理器,存储数据集“说明书”:特征定义(动作/观测维度)、帧率、机器人类型、任务描述等。 repo_id: str类型,数据集唯一标识(如 lerobot/pick_cube),用于 Hugging Face Hub 定位和本地路径标识。 root:Path,本地存储根路径(默认:~/.cache/huggingface/lerobot/{repo_id}),包含 data/(Parquet)、meta/(元数据)、videos/(视频)。 revision: str类型,表示数据集的版本,默认代码库版本为v2.1。 tolerance_s:float类型,时间戳校验容差,确保录制视频帧率稳定性。 hf_dataset: Hugging Face Dataset 对象,存储非图像数据(关节位置、时间戳等),以 Parquet 格式。 episode_data_index: dirc类型,episode 索引映射表,记录每个 episode 的起始/结束帧位置(如 {"from": [0, 300], "to": [299, 599]}),加速帧检索。 delta_indices: dict类型,时间戳偏移索引(如 {"past": [-2, -1], "future": [1, 2]}),用于多模态数据同步(如动作与图像对齐)。 episode_buffer: dict类型,内存缓冲区,临时存储当前录制的帧数据(如 {"action": [], "observation.images.handeye": [], "timestamp": []})。 image_writer:AsyncImageWriter,异步图像写入器(多线程/多进程),避免图像存储阻塞主控制循环(确保录制帧率稳定)。 image_transforms:图像预处理函数(如 torchvision.transforms.Resize、Normalize),训练时动态应用于摄像头图像。 video_backend: str类型,视频解码后端(默认 torchcodec 或 pyav),支持从 MP4 中精准提取指定时间戳的帧。 核心方法 init: 数据集加载的入口,根据本地缓存状态完成元数据校验、数据下载(若缺失) 和时间戳同步检查,确保数据集可用。 add_frame:逐帧添加数据到缓冲区,录制时将单帧数据(动作、观测、图像)添加到 episode_buffer,并异步写入图像(避免阻塞控制循环)。 save_episode:写入到磁盘,录制完成一个 episode 后,将 episode_buffer 中的数据写入磁盘。包括非图像数据转换为Parquet文件存储到xxx.parquet,图像数据编码为MP4存储,元数据更新info.json,episodes.jsonl,episodes_stats.jsonl。 getiterm:数据的加载与训练适配,实现 torch.utils.data.Dataset 接口,支持通过 DataLoader 加载数据用于训练。 push_to_hub:将数据集(元数据、Parquet、视频)上传至 Hugging Face Hub,自动生成数据集卡片(README.md),包含特征说明、硬件兼容性和统计信息 pull_from_repo:从 Hub 拉取指定版本的数据集,支持按 allow_patterns 筛选文件(如仅拉取元数据 meta/ 或特定 episode)。 start_image_writer/stop_image_writer:为避免图像存储阻塞录制主循环(导致帧率波动),通过 AsyncImageWriter 启动多线程/多进程写入器。 create:数据的新建或加载,是初始化的类方法。 @classmethod def create( cls, repo_id: str, # 数据集标识(用户指定) fps: int, # 帧率(默认 30 FPS) features: dict, # 特征定义(动作+观测,来自机器人硬件) root: ..., # 本地路径 robot_type: str, # 机器人类型(如 "so101_follower") use_videos: bool = True, # 是否编码视频(默认 True) image_writer_processes: int = 0, # 图像写入进程数 image_writer_threads: int = 4 * len(robot.cameras), # 图像写入线程数 ) -> "LeRobotDataset": # 1. 初始化元数据(调用 LeRobotDatasetMetadata.create) obj.meta = LeRobotDatasetMetadata.create( repo_id=repo_id, fps=fps, features=features, robot_type=robot_type, use_videos=use_videos ) # 2. 启动图像写入器(异步写入) if image_writer_processes or image_writer_threads: obj.start_image_writer(image_writer_processes, image_writer_threads) # 3. 初始化 episode 缓冲区(内存临时存储) obj.episode_buffer = obj.create_episode_buffer() # 4. 创建空 HF Dataset(存储非图像数据) obj.hf_dataset = obj.create_hf_dataset() return obj 元数据驱动:通过 LeRobotDatasetMetadata.create 确保数据集特征与机器人硬件严格对齐,避免录制后因特征不匹配导致训练错误。 性能优化:异步图像写入器(多线程/多进程)解决了录制时 I/O 阻塞问题,保障实时控制循环的稳定性。 标准化存储:统一 Parquet+MP4 格式,兼容 Hugging Face Hub 生态和 PyTorch DataLoader,简化“录制-训练”流程。 LeRobotDataset.create 是一个类方法,通过 @classmethod 装饰,其核心特性是无需先实例化LeRobotDataset 类即可调用,直接通过类名 LeRobotDataset.create(...) 触发,并返回一个初始化完成的 LeRobotDataset 实例,通常用于工厂模式或单例模式的场景。 类方法(@classmethod)的第一个参数是 cls(代表类本身),因此可以直接通过类名触发,而非实例。LeRobotDataset.create 的作用正是绕开普通构造函数 init,为新建数据集执行特殊初始化逻辑(如元数据创建、目录结构搭建、异步写入器启动等),最终返回一个完整的 LeRobotDataset 实例。 LeRobotDatasetMetadata.create 在LeRobotDataset中会调用到obj.meta = LeRobotDatasetMetadata.create创建一个LeRobotDatasetMetadata对象,其核心功能是为全新数据集初始化元数据结构,包括目录创建、特征定义合并、元数据文件生成(如 meta/info.json)和硬件兼容性校验,确保后续机器人数据录制(关节状态、摄像头图像等)与存储格式严格对齐。 @classmethod def create( cls, repo_id: str, fps: int, features: dict, robot_type: str | None = None, root: str | Path | None = None, use_videos: bool = True, ) -> "LeRobotDatasetMetadata": """Creates metadata for a LeRobotDataset.""" obj = cls.__new__(cls) #创建空的 LeRobotDatasetMetadata 实例(不触发 __init__,避免加载现有元数据) obj.repo_id = repo_id # 数据集标识(如 "username/pick_cube") obj.root = Path(root) if root is not None else HF_LEROBOT_HOME / repo_id # 本地路径(默认缓存目录) obj.root.mkdir(parents=True, exist_ok=False) # TODO(aliberts, rcadene): implement sanity check for features # 合并用户提供的特征与默认特征(补充必选字段) features = {**features, **DEFAULT_FEATURES} _validate_feature_names(features) # 校验特征名称合法性(如禁止含空格、特殊字符,确保与硬件接口一致) obj.tasks, obj.task_to_task_index = {}, {} # 任务列表(如 {"pick": 0, "place": 1})及索引映射 obj.episodes_stats, obj.stats, obj.episodes = {}, {}, {} # 初始化 episode 统计信息、全局统计、episode 列表 obj.info = create_empty_dataset_info(CODEBASE_VERSION, fps, features, use_videos, robot_type) #生成数据集的“总说明书”(meta/info.json),包含以下关键信息 if len(obj.video_keys) > 0 and not use_videos: raise ValueError() write_json(obj.info, obj.root / INFO_PATH) obj.revision = None return obj LeRobotDatasetMetadata.create 是新建数据集的“元数据基石”,通过标准化目录结构、特征定义和兼容性校验,确保机器人录制数据(动作、图像、状态)的存储格式与硬件特征严格对齐。其输出的 LeRobotDatasetMetadata 实例是连接机器人硬件与数据集文件系统的核心桥梁,为后续数据录制、编码和训练加载提供统一的元信息描述。 加载数据集 加载数据集是加载已经存在的数据集,针对的场景是针对此前的录制场景接着录制,与创建数据集不同,创建数据集是从头开始,创建一个空的数据集。那么lerobot怎么进行恢复了? 前面章节无论是LeRobotDataset.create 还是LeRobotDatasetMetadata.create都会绕过类的构造函数xxx.init的运行,那么xxx.init在哪里运行了? LeRobotDataset.init 是类的默认构造函数,仅在直接实例化 LeRobotDataset 时调用,核心场景是 恢复已有数据集的录制或加载。而 LeRobotDataset.create 是类方法,用于创建全新数据集,二者分工明确,对应不同的用户需求。接下来针对恢复的场景来说明。 当输入参数--resume=true,即在原来的数据集基础上进行。 if cfg.resume: #走恢复模式 dataset = LeRobotDataset( cfg.dataset.repo_id,# 数据集标识(如 "lerobot/pick_cube") root=cfg.dataset.root,# 本地存储路径(默认:~/.cache/huggingface/lerobot/{repo_id}) ) ) if hasattr(robot, "cameras") and len(robot.cameras) > 0: dataset.start_image_writer( num_processes=cfg.dataset.num_image_writer_processes, num_threads=cfg.dataset.num_image_writer_threads_per_camera * len(robot.cameras), ) sanity_check_dataset_robot_compatibility(dataset, robot, cfg.dataset.fps, dataset_features) else: dataset = LeRobotDataset.create(......) 关键流程是实例化LeRobotDataset,触发LeRobotDataset.init 方法,LeRobotDataset类的初始化代码如下。 LeRobotDataset.init LeRobotDataset: def __init__( self, repo_id: str, # 数据集标识(如 "lerobot/pick_cube") root: ..., # 本地存储路径(默认:~/.cache/huggingface/lerobot/{repo_id}) episodes: list[int] | None = None, # 指定加载的 episode 索引(如 [0, 2, 5]) image_transforms: Callable | None = None, # 图像预处理(如 Resize、Normalize) delta_timestamps: ..., # 时间戳偏移(用于多模态数据同步) tolerance_s: float = 1e-4, # 时间戳校验容差(确保帧率稳定性) revision: ..., # 数据集版本(默认代码库版本 v2.1) force_cache_sync: bool = False, # 强制同步缓存(忽略本地文件,重新拉取) download_videos: bool = True, # 是否下载视频文件 video_backend: str | None = None, # 视频解码后端(如 "pyav"、"torchcodec") ): # 1. 初始化基础路径与配置 self.repo_id = repo_id self.root = Path(root) or HF_LEROBOT_HOME / repo_id # 本地路径 #没有创建的话默认就是~/.cache/huggingface/lerobot self.root.mkdir(exist_ok=True, parents=True) # 创建目录(若不存在) # 2. 加载元数据(通过 LeRobotDatasetMetadata) self.meta = LeRobotDatasetMetadata( self.repo_id, self.root, self.revision, force_cache_sync=force_cache_sync ) # 3. 加载实际数据(优先本地缓存,缺失则从 Hub 下载) try: if force_cache_sync: raise FileNotFoundError # 强制同步时跳过本地缓存 # 验证本地数据文件完整性 assert all((self.root / fpath).is_file() for fpath in self.get_episodes_file_paths()) self.hf_dataset = self.load_hf_dataset() # 加载 Parquet 格式数据(非图像) except (AssertionError, FileNotFoundError): # 从 Hub 下载数据(含 Parquet 和视频文件) self.revision = get_safe_version(self.repo_id, self.revision) # 校验版本合法性 self.download_episodes(download_videos) # 下载指定 episode 数据 self.hf_dataset = self.load_hf_dataset() # 重新加载数据 # 4. 数据校验(确保时间戳与帧率匹配) timestamps = torch.stack(tuple(self.hf_dataset["timestamp"])).numpy() episode_indices = torch.stack(tuple(self.hf_dataset["episode_index"])).numpy() check_timestamps_sync( # 校验每帧时间间隔是否为 1/fps ± tolerance_s timestamps, episode_indices, self.episode_data_index, self.fps, self.tolerance_s ) LeRobotDataset 类的构造函数(init 方法),其核心作用是加载已存在的机器人数据集(本地或 Hugging Face Hub),并完成元数据校验、数据完整性检查、时间戳同步等关键步骤,为后续数据访问(如训练、可视化)提供统一接口。该方法是 LeRobotDataset 类的“入口”,仅在恢复已有数据集或加载数据集用于训练时调用。 LeRobotDataset 是基于 PyTorch Dataset 的子类,专为机器人数据设计,支持两种场景: 加载本地已有数据集:从指定路径(root)直接读取元数据和数据文件。 从 Hugging Face Hub 下载数据集:若本地数据缺失,自动从 Hub 拉取并加载。 无论哪种场景,init 均确保数据集的元数据一致性(如机器人类型、帧率)、数据完整性(文件不缺失)和时间戳有效性(符合录制帧率),为下游任务(如策略训练)提供可靠数据。 LeRobotDatasetMetadata.init 在LeRobotDataset.init中实例化了self.meta = LeRobotDatasetMetadata,该类创建时会调用构造函数LeRobotDatasetMetadata.init。 lass LeRobotDatasetMetadata: def __init__( self, repo_id: str, root: str | Path | None = None, revision: str | None = None, force_cache_sync: bool = False, ): self.repo_id = repo_id # 数据集唯一标识 self.revision = revision if revision else CODEBASE_VERSION # 版本(默认代码库版本) self.root = Path(root) if root is not None else HF_LEROBOT_HOME / repo_id #本地路径(默认缓存目录) try: if force_cache_sync: raise FileNotFoundError self.load_metadata() except (FileNotFoundError, NotADirectoryError): if is_valid_version(self.revision): self.revision = get_safe_version(self.repo_id, self.revision) (self.root / "meta").mkdir(exist_ok=True, parents=True) self.pull_from_repo(allow_patterns="meta/") # 加载本地元数据 self.load_metadata() def load_metadata(self): self.info = load_info(self.root) check_version_compatibility(self.repo_id, self._version, CODEBASE_VERSION) self.tasks, self.task_to_task_index = load_tasks(self.root) self.episodes = load_episodes(self.root) if self._version < packaging.version.parse("v2.1"): self.stats = load_stats(self.root) self.episodes_stats = backward_compatible_episodes_stats(self.stats, self.episodes) else: self.episodes_stats = load_episodes_stats(self.root) self.stats = aggregate_stats(list(self.episodes_stats.values())) def load_metadata(self): self.info = load_info(self.root) check_version_compatibility(self.repo_id, self._version, CODEBASE_VERSION) self.tasks, self.task_to_task_index = load_tasks(self.root) self.episodes = load_episodes(self.root) if self._version < packaging.version.parse("v2.1"): self.stats = load_stats(self.root) self.episodes_stats = backward_compatible_episodes_stats(self.stats, self.episodes) else: self.episodes_stats = load_episodes_stats(self.root) self.stats = aggregate_stats(list(self.episodes_stats.values())) 从 self.root/meta 目录加载核心元数据文件(info.json、tasks.jsonl、episodes.jsonl 等),并校验元数据版本与当前代码库兼容性(check_version_compatibility) 录制流程 policy = None if cfg.policy is None else make_policy(cfg.policy, ds_meta=dataset.meta) #如果是能了策略模型,进行实例创建 listener, events = init_keyboard_listener() #初始化监听键盘事件 先判断是否指定了策略模型,如果指定了进行创建实例。同时初始化键盘监听器 recorded_episodes = 0 while recorded_episodes < cfg.dataset.num_episodes and not events["stop_recording"]: # 录制当前回合数据 log_say(f"Recording episode {dataset.num_episodes}", cfg.play_sounds) # 语音提示开始录制 record_loop( # 核心数据采集函数 robot=robot, teleop=teleop, # 遥操作器输入(或策略生成动作) policy=policy, dataset=dataset, # 数据写入目标 control_time_s=cfg.dataset.episode_time_s, # 单回合录制时长(默认60秒) display_data=cfg.display_data, # 实时可视化 ) # 回合间环境重置(给用户调整物体位置的时间) # 该阶段不写入数据,用于用户重置环境。 if not events["stop_recording"] and (...): log_say("Reset the environment", cfg.play_sounds) record_loop( # 重置阶段不记录数据(dataset=None) robot=robot, teleop=teleop, control_time_s=cfg.dataset.reset_time_s, # 重置时长(用户指定5秒) dataset=None, # 禁用数据写入 ) # 处理重录事件(用户按 'r' 触发) if events["rerecord_episode"]: log_say("Re-record episode", cfg.play_sounds) dataset.clear_episode_buffer() # 清空当前回合无效数据 continue # 重新录制当前回合 dataset.save_episode() # 保存当前回合数据到磁盘(图像、状态、动作) recorded_episodes += 1 # 回合计数+1 核心是调用record_loop进行采集录制,过程中监听用户输入的键盘值处理相关逻辑如右键结束单次录制,左键重复录制,ESC退出流程。 当前回合录制结束后,继续调用record_loop只是传递的参数不一样,等待环境复位。录制完当前回合后,调用dataset.save_episode()将数据写入到磁盘中。 录制循环 record_loop函数在两种场景下被调用,主要的作用是协调机器人控制(遥操作/策略)、数据采集与存储,确保数据帧率稳定且格式符合数据集要求。 正常录制:按配置时长(control_time_s)录制 episode 数据,保存到数据集。 环境重置:录制结束后,执行一段无数据保存的控制(让用户手动重置环境)。 初始化与参数校验 # 校验数据集帧率与录制帧率一致性 if dataset is not None and dataset.fps != fps: raise ValueError(f"The dataset fps should be equal to requested fps ({dataset.fps} != {fps}).") # 多遥操作器处理(如机械臂遥操作 + 键盘底盘控制) teleop_arm = teleop_keyboard = None if isinstance(teleop, list): # 分离键盘遥操作器和机械臂遥操作器 teleop_keyboard = next((t for t in teleop if isinstance(t, KeyboardTeleop)), None) teleop_arm = next((t for t in teleop if isinstance(t, (SO100Leader, ...))), None) # 校验:仅支持 LeKiwi 机器人,且必须包含 1 个键盘遥操作器 + 1 个机械臂遥操作器 if not (teleop_arm and teleop_keyboard and len(teleop) == 2 and robot.name == "lekiwi_client"): raise ValueError(...) # 策略重置(若使用策略控制,确保初始状态一致) if policy is not None: policy.reset() 首先校验一下数据集的帧率与录制帧率是否一致,如果遥控机器人有多个实例,比如主臂+键盘,一般这种组合用于lekiwi,包含一个键盘遥控操作器和1个机械臂遥控操作器。最后判断是否使用模型推理控制,如果是使用了先进行复位。 主循环控制与数据采集 timestamp = 0 # episode 已录制时长(秒) start_episode_t = time.perf_counter() # 起始时间戳 while timestamp < control_time_s: start_loop_t = time.perf_counter() # 本轮循环起始时间 # 检查是否提前退出(如用户按特定按键) if events["exit_early"]: events["exit_early"] = False break # 1. 获取机器人观测(传感器数据:摄像头图像、关节角度等) observation = robot.get_observation() # 2. 构建观测帧(符合数据集格式) if policy is not None or dataset is not None: observation_frame = build_dataset_frame(dataset.features, observation, prefix="observation") # 3. 生成动作(策略/遥操作器二选一) if policy is not None: # 策略生成动作:输入观测帧,输出动作值 action_values = predict_action( observation_frame, policy, device=get_safe_torch_device(...), ... ) # 格式化动作:按机器人动作特征(如关节名称)构建字典 action = {key: action_values[i].item() for i, key in enumerate(robot.action_features)} elif policy is None and isinstance(teleop, Teleoperator): # 单遥操作器:直接获取动作 action = teleop.get_action() elif policy is None and isinstance(teleop, list): # 多遥操作器:合并机械臂动作和底盘动作 arm_action = teleop_arm.get_action() # 机械臂动作 base_action = robot._from_keyboard_to_base_action(teleop_keyboard.get_action()) # 底盘动作 action = {**arm_action, **base_action} # 合并动作 # 4. 执行动作并记录实际发送的动作(可能因安全限制被裁剪) sent_action = robot.send_action(action) # 5. 保存数据到数据集(若启用) if dataset is not None: action_frame = build_dataset_frame(dataset.features, sent_action, prefix="action") # 动作帧 frame = {**observation_frame, **action_frame} # 合并观测与动作 dataset.add_frame(frame, task=single_task) # 添加到数据集 # 6. 可视化数据(若启用) if display_data: log_rerun_data(observation, action) # 记录数据用于 Rerun 可视化 # 7. 维持帧率:等待剩余时间以确保循环周期为 1/FPS 秒 dt_s = time.perf_counter() - start_loop_t # 本轮循环耗时 busy_wait(1 / fps - dt_s) # busy 等待以补足时间 # 更新 episode 已录制时长 timestamp = time.perf_counter() - start_episode_t 这里分为两种模式,一个是模型推理模式和遥控操作模式,前者是使用大模型进行预测产生动作处理,后者是用于大模型训练采集数据。 对于采集数据模式流程如下: 获取观测原始数据:调用robot.get_observation()获取观测数据,包括机器关节的数据、摄像头数据等。用于后续的数据存储。 获取遥控机器人的动作数据:调用action = teleop.get_action()获取到动作数据。如果是多遥控操作器,需要将其合并动作。 发送机器人执行动作:调用sent_action = robot.send_action(action)执行动作。 保存数据集:先调用build_dataset_frame构建遥控臂的数据action,然后将action与从臂机器的观测数据action_values进行合并frame,最后调用dataset.add_frame添加到数据集中。 可视化数据:如果启动了可视化数据,调用log_rerun_data记录数据用于rerun可视化。 时长更新:录制一轮有一个默认的等待时长,调用busy_wait进行等待。 对于模型推理模式流程如下: 获取观测原始数据:调用robot.get_observation()获取观测数据,包括机器关节的数据、摄像头数据等。 构建模型输入数据:要能够喂给模型做预测动作,需要将数据转换为模型可接收的格式,调用build_dataset_frame将观测原始数据进行转换。 模型生成预测动作:调用predict_action得到预测动作action_values,接着将action_values转换为机器的动作特征action。 发送预测动作:调用robot.send_action(action)将预测动作发送给机器进行执行。 下面是observation = robot.get_observation()返回的数据格式,主要包括的是6个舵机关节位置信息+两个相机的图像信息,一共8个键值对。 observation: {'elbow_flex.pos': 99.54710144927537, 'fixed': array([[[ 58, 48, 39], [ 56, 46, 37], [ 49, 39, 29], ..., ..., [ 45, 81, 69], [ 54, 87, 76], [ 55, 88, 77]]], shape=(480, 640, 3), dtype=uint8), 'gripper.pos': 2.666666666666667, 'handeye': array([[[76, 71, 75], [75, 70, 74], [69, 67, 70], ..., ..., [38, 73, 31], [31, 68, 25], [26, 63, 20]]], shape=(480, 640, 3), dtype=uint8), 'shoulder_lift.pos': -98.65771812080537, 'shoulder_pan.pos': -10.490956072351423, 'wrist_flex.pos': 54.17743324720067, 'wrist_roll.pos': -3.5714285714285694} 原始的观测数据,要经过处理,以便喂给模型或者存储到本地,输出的数据如下,一共有3个键值对,包括2个摄像头的键值对和一个舵机关节位置的,可以看到将前面6个舵机的合并为一个键值对了。 observation_frame: {'observation.images.fixed': array([[[ 80, 55, 61], [ 73, 48, 54], [ 68, 48, 49], ..., ..., [ 62, 82, 73], [ 69, 89, 80], [ 73, 90, 82]]], shape=(480, 640, 3), dtype=uint8), 'observation.images.handeye': array([[[67, 78, 72], [72, 83, 77], [69, 85, 75], ..., ..., [51, 71, 20], [39, 59, 6], [42, 63, 7]]], shape=(480, 640, 3), dtype=uint8), 'observation.state': array([-10.490956 , -98.657715 , 99.547104 , 54.177433 , -3.5714285, 2.6666667], dtype=float32)} 发送给机器人的动作数据,比较简单如下,也就是6个舵机的关节位置信息。 sent_action: {'elbow_flex.pos': 99.63685882886972, 'gripper.pos': 0.6509357200976403, 'shoulder_lift.pos': -99.36102236421725, 'shoulder_pan.pos': -10.0, 'wrist_flex.pos': 53.53886235345203, 'wrist_roll.pos': -3.598634095087988} 存储录制数据时,需要遥控机器action_frame、观测机器abservation_frame(前面已经列出了),下面看看相关数据的格式。 action_frame是遥控机器的sent_action转换而来,调用action_frame = build_dataset_frame(dataset.features, sent_action, prefix="action"),实际上就是将sent_action数据6个键值对改为1个键值对。 action_frame: {'action': array([-10. , -99.36102 , 99.636856 , 53.538864 , -3.598634 , 0.6509357], dtype=float32)} 最后将经过处理的一个遥控机器数据和经过处理的观测数据进行合并,最终得到如下4个键值对的数据。 frame: {'action': array([-10. , -99.36102 , 99.636856 , 53.538864 , -3.598634 , 0.6509357], dtype=float32), 'observation.images.fixed': array([[[ 70, 62, 51], [ 63, 55, 44], [ 61, 54, 44], ..., ..., [ 53, 95, 83], [ 47, 90, 80], [ 58, 104, 93]]], shape=(480, 640, 3), dtype=uint8), 'observation.images.handeye': array([[[76, 80, 55], [78, 82, 59], [77, 79, 57], ..., ..., [52, 64, 18], [47, 61, 12], [50, 66, 17]]], shape=(480, 640, 3), dtype=uint8), 'observation.state': array([-10.490956 , -98.657715 , 99.547104 , 54.177433 , -3.5714285, 2.6666667], dtype=float32)} 可视化显示 # 6. 可视化数据(若启用) if display_data: log_rerun_data(observation, action) # 记录数据用于 Rerun 可视化 可以看到界面显示的observation.xxx和action.xxx就是调用的这里log_rerun_data函数,理论上来说observation和action要越吻合越好,因为action是模型推理或主臂的动作,observation是实际的动作。 数据存储 在上一节中,循序录制过程中,将经过处理的遥控机器数据action_frame和经过处理后的机器观测数据observation_frame合并得到的数据frame,然后先调用dataset.add_frame(frame, task=single_task)写入,最后调用save_episode方法将缓存数据写入磁盘。 写入缓存 add_frame 是 LeRobotDataset 类的核心方法之一,负责将单帧机器人数据暂存到内存缓冲区(episode_buffer),并对图像数据进行预处理(如格式转换、临时存储)。该方法是数据录制流程中的关键环节,确保每帧数据符合数据集格式要求,并为后续的 save_episode 方法(将缓冲区数据写入磁盘)做准备。 def add_frame(self, frame: dict, task: str, timestamp: float | None = None) -> None: # 1. 数据格式转换PyTorch Tensor → NumPy 数组 # 目的是统一数据格式为 NumPy 数组(数据集底层存储格式),避免因混合 Tensor/NumPy 类型导致后续处理异常。 for name in frame: if isinstance(frame[name], torch.Tensor): frame[name] = frame[name].numpy() #2. 数据校验,确保帧格式符合数据集定义 #检查 frame 中的所有特征(如 observation.state、action) #是否与 self.features 中定义的 dtype(数据类型)、shape #(维度)一致。例如,若 self.features 定义 action 为 #float32 且形状为 (6,),则校验 frame["action"] 是否满足这些 #条件,确保数据规范性。 validate_frame(frame, self.features) #3. 如果没有初始化 episode 缓冲区,则初始化创建 # 内存缓冲区,用于累积当前 episode 的所有帧数据。结构 #与 self.features 对应,例如包含 observation.state、 #action、timestamp 等键,每个键的值为列表(按帧顺序存 #储数据) if self.episode_buffer is None: self.episode_buffer = self.create_episode_buffer() # 4. 记录帧索引与时间戳 # 未提供 timestamp,默认按帧率(self.fps)计算相对时间 #(如 30 FPS 时,第 0 帧为 0s,第 1 帧为 1/30 ≈0.033s),确保时间序列连续性 frame_index = self.episode_buffer["size"] if timestamp is None: timestamp = frame_index / self.fps # 记录存储frame_index,timestamp,task self.episode_buffer["frame_index"].append(frame_index) self.episode_buffer["timestamp"].append(timestamp) self.episode_buffer["task"].append(task) # 5. 添加一帧数据到episode_buffer for key in frame: if key not in self.features: raise ValueError( f"An element of the frame is not in the features. '{key}' not in '{self.features.keys()}'." ) # 如果是图像,处理追加到数据缓冲区 if self.features[key]["dtype"] in ["image", "video"]: img_path = self._get_image_file_path( episode_index=self.episode_buffer["episode_index"], image_key=key, frame_index=frame_index ) if frame_index == 0: img_path.parent.mkdir(parents=True, exist_ok=True) #通过 _get_image_file_path 生成标准化路径,调用存储图片,可同步可异步。 self._save_image(frame[key], img_path) self.episode_buffer[key].append(str(img_path)) else: # 如果是动作特征数据直接追加到原始数据缓冲区 self.episode_buffer[key].append(frame[key]) #6. 最后更新缓冲区大小 self.episode_buffer["size"] += 1 add_frame 是机器人数据录制的“数据暂存中枢”,通过内存缓冲区累积帧数据、预处理图像并校验数据格式,为后续持久化存储奠定基础。其设计兼顾了录制效率(异步图像写入)和数据可靠性(格式校验),是构建标准化机器人数据集的核心环节,其设计,总结有以下优点: 数据暂存与累积:通过 episode_buffer 在内存中临时存储一整段 episode 的数据,避免频繁磁盘 I/O 影响录制帧率。 图像预处理解耦:将图像保存与数据记录分离,支持异步图像写入(AsyncImageWriter),确保主录制循环(record_loop)不受图像存储速度影响,维持稳定帧率。 数据一致性校验:通过 validate_frame 提前过滤无效数据,避免错误数据进入后续流程(如 save_episode 写入磁盘) 写入磁盘 录制完一轮,会调用dataset.save_episode()进行存储写入磁盘。 def save_episode(self, episode_data: dict | None = None) -> None: #1. 输入处理与数据校验 if not episode_data: episode_buffer = self.episode_buffer #使用内存缓冲区数据(默认流程) #校验缓冲区数据的完整性,确保所有特征(如 #observation.state、action)的长度一致(与 episode 总帧 #数匹配),且 episode_index 未超出当前数据集范围 validate_episode_buffer(episode_buffer, self.meta.total_episodes, self.features) # 2.从缓冲区中提取非特征数据(这些键不直接存储到 Parquet) episode_length = episode_buffer.pop("size")# 当前 episode 的总帧数 tasks = episode_buffer.pop("task")# 每帧的任务标签列表(可能重复) episode_tasks = list(set(tasks))# 当前 episode 涉及的唯一任务列表 episode_index = episode_buffer["episode_index"] # 当前 episode 的索引 #3. 构建全局索引与episode标识,确保每帧在整个数据集中有唯一标识,便于后续数据加载时定位。 # 生成全局帧索引(从数据集总帧数开始累加) episode_buffer["index"] = np.arange(self.meta.total_frames, self.meta.total_frames + episode_length) #生成 episode_index 数组(所有帧均属于当前 episode) episode_buffer["episode_index"] = np.full((episode_length,), episode_index) # 4. 任务标签处理与元数据更新 # 添加新任务到元数据(若任务不存在) for task in episode_tasks: task_index = self.meta.get_task_index(task) if task_index is None: self.meta.add_task(task)# 新增任务并更新 tasks.jsonl # 生成 task_index 数组(将每帧的任务标签映射为整数索引) episode_buffer["task_index"] = np.array([self.meta.get_task_index(task) for task in tasks]) # 5.数值特征数据格式化 for key, ft in self.features.items(): # 跳过索引类特征、图像和视频(这些由其他逻辑处理) if key in ["index", "episode_index", "task_index"] or ft["dtype"] in ["image", "video"]: continue # 将列表形式的帧数据堆叠为二维数组(shape: [episode_length, feature_dim]) episode_buffer[key] = np.stack(episode_buffer[key]) #若 action 特征为长度 6 的向量,episode_buffer["action"] # 会从 [ [a0], [a1], ..., [aN] ] 堆叠为 [ [a0_0, ..., a0_5], ..., [aN_0, ..., aN_5] ],符合 Parquet 存储格式。 #6. 等待异步图像写入完成 self._wait_image_writer()# 确保所有临时图像文件已写入磁盘 # 保存数值到Parquet文件 self._save_episode_table(episode_buffer, episode_index) ep_stats = compute_episode_stats(episode_buffer, self.features) # _save_episode_table 行为 # 从缓冲区提取特征数据,构造 datasets.Dataset 对象。 # 将数据写入 Parquet 文件(路径由元数据的 get_data_file_path 生成,如 data/chunk-000/episode_000000.parquet)。 # 更新内存中的 hf_dataset(拼接新 episode 数据)。 #8. 若启用视频模式,将图像编码存储为视频文件 if len(self.meta.video_keys) > 0: video_paths = self.encode_episode_videos(episode_index) # 将临时图像编码为 MP4 #调用 ffmpeg 将临时目录下的图像帧(如 images/observation.images.laptop/episode_000000/frame_*.png) #编码为 MP4 视频,存储路径由元数据的 get_video_file_path 定义 for key in self.meta.video_keys: episode_buffer[key] = video_paths[key] # 记录视频文件路径 # `meta.save_episode` be executed after encoding the videos #9. 更新元数据与校验时间戳 # 更新元数据(总 episodes、总 frames、chunks 等) self.meta.save_episode(episode_index, episode_length, episode_tasks, ep_stats) # 校验时间戳连续性(确保帧间隔符合帧率 1/fps ± tolerance_s) ep_data_index = get_episode_data_index(self.meta.episodes, [episode_index]) ep_data_index_np = {k: t.numpy() for k, t in ep_data_index.items()} check_timestamps_sync( episode_buffer["timestamp"],# 帧时间戳数组 episode_buffer["episode_index"],# episode 索引数组 ep_data_index_np,# episode 帧范围 self.fps,# 帧率 self.tolerance_s,# 时间容差 ) #10. 文件完整性校验与资源清理 # 验证视频文件数量(每个 episode × 每个视频 key 应对应一个 MP4) video_files = list(self.root.rglob("*.mp4")) assert len(video_files) == self.num_episodes * len(self.meta.video_keys) # 验证 Parquet 文件数量(每个 episode 对应一个 Parquet) parquet_files = list(self.root.rglob("*.parquet")) assert len(parquet_files) == self.num_episodes # 删除临时图像目录(已编码为视频,无需保留) img_dir = self.root / "images" if img_dir.is_dir(): shutil.rmtree(self.root / "images") # 重置缓冲区,准备下一段 episode 录制 if not episode_data: # Reset the buffer self.episode_buffer = self.create_episode_buffer() save_episode 是 LeRobotDataset 类的核心方法,负责将内存缓冲区(episode_buffer)中累积的单段 episode 数据持久化到磁盘,同时更新元数据、生成视频文件、校验数据完整性,并重置缓冲区以准备下一段录制。该方法是数据录制流程的“收尾环节”,确保每段 episode 数据符合 LeRobot 数据集格式规范。主要功能如下: 数据持久化:将内存中的帧数据(数值特征、图像路径)写入 Parquet 文件(结构化数据)和 MP4 文件(视频数据)。 元数据更新:更新数据集元信息(如总 episode 数、总帧数、任务标签),确保元数据与实际数据一致。 数据校验:验证时间戳连续性、文件完整性(如视频/Parquet 文件数量匹配预期),避免无效数据入库。 资源清理:删除临时图像文件(已编码为视频),释放内存缓冲区。 save_episode 是 LeRobot 数据集录制的“最终执行者”,通过系统化的数据格式化、编码、校验和清理,将内存中的临时帧数据转化为符合规范的磁盘存储,同时维护元数据一致性和数据完整性。其设计确保了录制数据的可靠性、存储效率和下游可用性,是连接实时录制与离线数据使用的关键桥梁。 结束采集 log_say("Stop recording", cfg.play_sounds, blocking=True) # 语音提示结束录制 robot.disconnect() # 断开机器人连接(禁用扭矩,确保安全) if teleop is not None: teleop.disconnect() # 断开遥操作器连接 if not is_headless() and listener is not None: listener.stop() # 停止键盘监听器 断开设备,停止键盘监听。 if cfg.dataset.push_to_hub: dataset.push_to_hub(tags=cfg.dataset.tags, private=cfg.dataset.private) # 上传数据集到 Hugging Face Hub 将数据上传到hugging Face Hub上。
  • python补习

    python补习

    装饰器 函数装饰器 什么是装饰器 装饰器是python的一种高级语法,本质上是函数包装器,可以在不修改函数代码的前提下为函数添加额外功能如日志记录、性能计时、权限校验,也可以修改函数的输入和输出。装饰器通过@装饰器名语法应用与函数,也是一种语法糖,简化包装的代码。 基本语法与原理 装饰器是一个接收函数作为参数,并返回新函数的函数。当用@decorator修饰函数func时,相当于执行func=decorator(func),即原函数被替换为装饰器返回的新函数。 # 定义装饰器:打印函数调用信息 def log_decorator(func): def wrapper(*args, **kwargs): print(f"调用函数: {func.__name__}") # 额外功能:打印日志 result = func(*args, **kwargs) # 执行原函数 print(f"函数 {func.__name__} 执行完毕") return result # 返回原函数结果 return wrapper # 应用装饰器 @log_decorator def add(a, b): return a + b # 调用函数 add(1, 2) 上面打印的输出为 调用函数: add 函数 add 执行完毕 3 可以看到,相当于给add函数做了一层包装。这里 log_decorator 为 add 函数添加了“调用日志”功能,原 add 函数代码未做任何修改。 带参数的装饰器 如果装饰器需要自定义参数(如 @parser.wrap() 中的空括号),需在基础装饰器外再嵌套一层参数接收函数。 # 带参数的装饰器:自定义日志前缀 def log_decorator(prefix="LOG"): def decorator(func): def wrapper(*args, **kwargs): print(f"[{prefix}] 调用函数: {func.__name__}") # 使用装饰器参数 result = func(*args, **kwargs) return result return wrapper return decorator # 应用装饰器(传递参数) @log_decorator(prefix="DEBUG") def multiply(a, b): return a * b multiply(3, 4) # 输出: [DEBUG] 调用函数: multiply → 返回 12 装饰器在 Python 中非常常用,典型场景包括: 日志记录:自动记录函数调用、参数、返回值; 性能计时:统计函数执行时间; 权限校验:检查用户是否有权限调用函数; 输入/输出处理:自动转换参数类型、格式化返回值; 资源管理:自动打开/关闭文件、数据库连接等。 类装饰器 前面描述了函数装饰器,还有类的装饰器。类的装饰器是通过修改类的定义、添加/覆盖方法或属性,或返回一个新类,来增强类的功能,与函数装饰器(修饰函数)不同,类装饰器专注于修饰类本身。类装饰器通过 @装饰器名 语法应用于类,是一种“语法糖”,简化了类的动态修改逻辑。 类装饰器是一个接收类作为参数,并返回新类的函数。当用 @decorator 修饰类 MyClass 时,相当于执行 MyClass = decorator(MyClass),即原类被“替换”为装饰器返回的新类。 简单类装饰器示例 # 定义类装饰器:为类添加一个属性和方法 def add_greeting(cls): cls.greeting = "Hello from decorator!" # 新添加类属性 def say_hello(self): # 新定义要添加的实例方法 return f"{self.greeting} I'm {self.name}." cls.say_hello = say_hello # 将方法绑定到类 return cls # 返回修改后的类 # 应用装饰器 @add_greeting class Person: def __init__(self, name): self.name = name # 使用装饰后的类 p = Person("Alice") print(p.greeting) # 输出:Hello from decorator! print(p.say_hello()) # 输出:Hello from decorator! I'm Alice. 内置装饰器@dataclass @dataclass 是 Python 标准库 dataclasses 提供的内置类装饰器,用于快速定义数据存储类(Data Class),核心作用是: 自动生成 init 方法:无需手动编写 def init(self, robot, dataset, ...): ...,装饰器会根据类字段自动生成。 自动生成 repr、eq 等方法:方便打印实例(如 print(cfg))和比较实例是否相等。 支持字段默认值和类型注解:如 teleop: TeleoperatorConfig | None = None 中的默认值和类型约束。 先来看看如果不适用@dataclass装饰器,定义一个普通的类,需要手动编写init,repr等方法,如下 class DatasetConfig: def __init__(self, repo_id: str, num_episodes: int = 50): self.repo_id = repo_id # 手动绑定 self.xxx self.num_episodes = num_episodes def __repr__(self): # 手动编写打印逻辑 return f"DatasetConfig(repo_id={self.repo_id!r}, num_episodes={self.num_episodes!r})" # 使用 cfg = DatasetConfig("aliberts/record-test", 2) print(cfg) # 输出:DatasetConfig(repo_id='aliberts/record-test', num_episodes=2) 如果使用了@dataclass,则不需要编写init等方法,但是需要添加属性的类型注解声明,如下面repo_id,num_episodes。如果有默认值的,如下的int=50,可选传递参数,如果没有默认值的,必现要传递参数如repo_id。 from dataclasses import dataclass @dataclass class DatasetConfig: repo_id: str # 必选字段(无默认值) num_episodes: int = 50 # 可选字段(默认值 50) # 使用(效果与普通类完全一致) cfg = DatasetConfig("aliberts/record-test", 2) print(cfg) # 自动生成 __repr__:DatasetConfig(repo_id='aliberts/record-test', num_episodes=2) @dataclass提供初始化后回调方法,在init执行完毕后自动调用,用于字段校验,动态修改字段值等。 from dataclasses import dataclass @dataclass class DatasetConfig: repo_id: str # 必选字段(无默认值) num_episodes: int = 50 # 可选字段(默认值 50) def __post_init__(self): if self.repo_id is None: raise ValueError("You need to provide a repo_id as argument.") 函数返回类型注解 函数返回类型注解是python 3.0+引入的类型提示语法,用于显式声明函数预期返回值的类型。它不会改变函数的运行逻辑,只是为了提升代码的可读性、支持IDE智能提示,便于静态代码检查工具检测潜在错误。其语法格式为如下: def 函数名(参数: 参数类型) -> 返回值类型: # 函数逻辑 return 返回值 返回类型注解通过->类型语法声明,位于函数定义参数列表之后、冒号:之前。 def record(cfg: RecordConfig) -> LeRobotDataset: # ... 函数逻辑 ... return dataset # dataset 是 LeRobotDataset 实例 这里 -> LeRobotDataset 表示:record 函数执行完毕后,预期返回一个 LeRobotDataset 类的实例。 注解都有哪些类型了,除了基础的int、float、str、bool、None(空)这几个类型外,还有容器类型、组合类型、特殊类型等。 容器类型 列表,list[值类型] ,用于标注列表、字典、元组等容器的元素类型,Python 3.9+ 支持直接用 list[int] 形式,旧版本需从 typing 模块导入(如 List[int])。 motor_names: list[str] = ["shoulder_pan", "elbow_flex"] # 字符串列表 positions: list[float] = [0.2, 0.5, -0.3] # 浮点数列表 字典,dict[键类型, 值类型]如下示例 def _motors_ft(self) -> dict[str, type]: # 键为字符串,值为类型对象(如 float) return {f"{motor}.pos": float for motor in self.bus.motors} 元组,tuple[类型1, 类型2, ...],如下示例: def _cameras_ft(self) -> dict[str, tuple]: # 值为元组(高、宽、通道数) return {cam: (height, width, 3) for cam in self.cameras} # 更精确标注:tuple[int, int, int](高、宽、3通道) 集合类型,set[元素类型],如下示例唯一电机ID集合。 motor_ids: set[int] = {1, 2, 3, 4, 5, 6} # 整数集合 组合类型 Union,Union[类型1, 类型2, ...],允许整数或字符串的参数 from typing import Union def get_motor(motor_id: Union[int, str]) -> Motor: # motor_id 可为 int 或 str ... # Python 3.10+ 简写: def get_motor(motor_id: int | str) -> Motor: ... Option,Optional[类型],(等价于 Union[类型, None]),可能为None的配置参数。 from typing import Optional def connect(port: Optional[str] = None) -> None: # port 可为字符串或 None ... 特殊类型 Any,任意类型,关闭类型检查,允许任何类型(常用于动态数据,如lerobot代码中 get_observation 返回 dict[str, Any]) from typing import Any def get_observation(self) -> dict[str, Any]: # 值可为任意类型(电机位置/图像等) ... Callable,Callable[[参数类型1, 参数类型2], 返回值类型],接受函数作为参数。 from typing import Callable def calibrate(callback: Callable[[str], None]) -> None: # callback 是 (str) -> None 的函数 callback("Calibration done") type,Type[类型](标注“类型本身”而非实例,如lerobot代码中 dict[str, type]),接受类作为参数。 from typing import Type def create_robot(robot_class: Type[Robot]) -> Robot: # robot_class 是 Robot 的子类 return robot_class() 配置选择注册机制 以draccus.ChoiceRegistry为例说明,draccus.ChoiceRegistry 是 draccus 配置框架提供的子类注册与动态选择机制。它允许将基类的多个子类注册为“可选选项”,并通过配置参数(如命令行、配置文件)动态选择具体子类。在工程中,这一机制用于实现 “同一接口,多种实现” 的灵活配置(例如不同机器人型号共享 RobotConfig 接口,但有各自的硬件参数实现)。 注册与选择流程 1. 基类:继承ChoiceRegistry并声明接口。如示例基类(如 RobotConfig)继承 draccus.ChoiceRegistry,作为所有子类的“公共接口”。它定义通用字段和方法,不包含具体实现细节。 from dataclasses import dataclass import abc import draccus @dataclass(kw_only=True) class RobotConfig(draccus.ChoiceRegistry, abc.ABC): # 继承 ChoiceRegistry # 通用字段(所有子类共享) id: str | None = None # 机器人标识 calibration_dir: Path | None = None # 校准文件路径 @property def type(self) -> str: # 获取当前子类的注册名称(核心方法) return self.get_choice_name(self.__class__) 2. 子类:注册为可选项。每个具体实现(如不同机器人型号)定义一个 RobotConfig 的子类,补充特有字段和逻辑。draccus 会自动将子类注册为一个可选选项,如下: 例如,so101_follower 机器人的配置子类: # SO101FollowerConfig(so101_follower 型号) @dataclass(kw_only=True) class SO101FollowerConfig(RobotConfig): port: str # 型号特有字段(通信端口) disable_torque_on_disconnect: bool = True # 型号特有字段(扭矩控制) # KochFollowerConfig(koch_follower 型号) @dataclass(kw_only=True) class KochFollowerConfig(RobotConfig): ip_address: str # 型号特有字段(以太网通信地址) timeout_ms: int = 500 # 型号特有字段(通信超时) SO101FollowerConfig/KochFollowerConfig继承了RobotConfig,而RobotConfig继承了draccus.ChoiceRegistry。 3. 动态选择:通过配置参数指定子类。用户通过配置参数(如命令行 --robot.type=so101_follower)指定要使用的子类。draccus 会: 根据参数值(so101_follower)查找注册的子类; 实例化该子类,并将其他配置参数(如 --robot.port=/dev/ttyACM1)映射到子类字段; 返回实例化后的子类对象,作为业务逻辑的输入。 # 命令行参数示例 python -m lerobot.record \ --robot.type=so101_follower \ # 选择 SO101FollowerConfig 子类 --robot.id=black \ # 设置通用字段 id --robot.port=/dev/ttyACM0 \ # 设置型号特有字段 port --robot.disable_torque_on_disconnect=true # 设置型号特有字段 动作先行思维 C语言的风格是写法是条件先行,再写动作。而python支持动作先行写法,再补充条件,主要是为了简写。请看下面示例。 A if cond else B # 简洁写法(条件表达式) teleop = make_teleoperator_from_config(cfg.teleop) if cfg.teleop is not None else None # 等价传统写法(if-else 块) if cfg.teleop is not None: teleop = make_teleoperator_from_config(cfg.teleop) else: teleop = None 使用的简洁方法是: A if cond else B。 or短路运算 # 例:若 config_path 为空,则默认使用 "./config.json" config_path = user_provided_path or "./config.json" for循环 C 的 for 循环强调“初始化→条件→增量”的控制流,而 Python 的 for 更关注“迭代对象→元素处理”,动作(循环体)直接跟在迭代逻辑后。 # 例:遍历数据集并处理每个帧 for frame in dataset.frames: process_frame(frame) # 动作(循环体)直接跟在迭代逻辑后 Python 无需显式初始化索引、判断终止条件或手动增量(如 i++),迭代逻辑由“可迭代对象”(如列表、字典、生成器)内部处理。 列表推导式 结构是列表推导式:[表达式 for 变量 in 可迭代对象 if 条件] Python 的列表推导式将“对元素的处理动作”放在最前面,直接表达“要生成什么样的列表”,而非 C 中“如何生成列表”的步骤式逻辑。 # 例:筛选偶数并计算平方(动作:x**2,条件:x%2==0) even_squares = [x**2 for x in range(10) if x % 2 == 0] # 结果:[0, 4, 16, 36, 64] 结构拆解: 动作:(x**2),定义每个元素的转换方式(先明确“要做什么”); 迭代逻辑:(for x in range(10)),从哪里获取元素; 条件:(if x\%2 =\= 0),筛选元素的规则,后补充限制条件。 也可以直接做赋值,看下面例子 @dataclass class Example: src: List[int] tgt: List[int] 收集列表直接赋值为data data = [Example(s,t)] for s, t in paris if len(s) > 0] 等价于 for s, t in pairs: if len(s) > 0: src_ids = s; tgt_idg = s; data.append(Example(s,t)) 字典推导式 核心逻辑是:{新键: 新值 for 键, 值 in 迭代器 if 条件} 先定义“键和值的生成动作”,再说明迭代范围和筛选规则,适用于快速构建字典。示例:将遥操作器原始动作(如 {"shoulder": 0.2, "gripper": 0.9})转换为带前缀的数据集格式: # 原始遥操作动作 teleop_action = {"shoulder": 0.2, "elbow": 0.5, "gripper": 0.9} # 字典推导式:先定义键值动作(添加前缀),再迭代 dataset_action = { f"action.{key}": value # 动作:键添加前缀,值保持不变 for key, value in teleop_action.items() # 迭代范围:遥操作动作字典 if key != "gripper" # 筛选条件:排除 gripper(假设无需记录) } print(dataset_action) # 输出:{"action.shoulder": 0.2, "action.elbow": 0.5} 再来看看几个例子: features = {} joint_fts = {key: ftype for key, ftype in hw_features.items() if ftype is float} 遍历 hw_features 的所有键值对(key, ftype),仅保留 值为 float 类型 的键值对(即电机角度等浮点型特征)。若hw_features 含 {"shoulder.pos": float, "camera": (480,640,3)},则 joint_fts 为 {"shoulder.pos": float}。函数的作用就是筛选电机特征。实际打印如下: joint_fts : {'shoulder_pan.pos': <class 'float'>, 'shoulder_lift.pos': <class 'float'>, 'elbow_flex.pos': <class 'float'>, 'wrist_flex.pos': <class 'float'>, 'wrist_roll.pos': <class 'float'>, 'gripper.pos': <class 'float'>} cam_fts = {key: shape for key, shape in hw_features.items() if isinstance(shape, tuple)} 遍历 hw_features,仅保留 值为元组类型 的键值对(即相机尺寸等元组特征,如 (高, 宽, 通道数))。示例:若 hw_features 含 {"camera": (480,640,3)},则 cam_fts 为 {"camera": (480,640,3)}。函数的作用就是筛选相机特征。 cam_fts : {'handeye': (480, 640, 3), 'fixed': (480, 640, 3)} Action Features: {'action': {'dtype': 'float32', 'shape': (6,), 'names': ['shoulder_pan.pos', 'shoulder_lift.pos', 'elbow_flex.pos', 'wrist_flex.pos', 'wrist_roll.pos', 'gripper.pos']}} 面向对象 三大特性 (1)封装 class MotorsBus: def __init__(self, port): self.port = port self._private_var = "内部使用" # 私有变量 def public_method(self): return self._private_var # 通过公共方法访问私有数据 (2)继承 # 基类 class MotorsBus(abc.ABC): def __init__(self, port): self.port = port @abc.abstractmethod def write_calibration(self): pass # 子类 class FeetechMotorsBus(MotorsBus): def write_calibration(self): # 具体实现 pass (3)多态 # 同一个接口,不同实现 bus1 = FeetechMotorsBus(port) bus2 = DynamixelMotorsBus(port) # 调用相同方法,但执行不同逻辑 bus1.write_calibration() # Feetech 的实现 bus2.write_calibration() # Dynamixel 的实现 抽象基类 定义抽象接口 import abc class MotorsBus(abc.ABC): @abc.abstractmethod def write_calibration(self): pass # 强制子类实现 子类实现 class FeetechMotorsBus(MotorsBus): def write_calibration(self): # 具体实现 for motor, calibration in calibration_dict.items(): self.write("Homing_Offset", motor, calibration.homing_offset) 类型注解 class MotorsBus(ABC): """电机总线基类""" def __init__(self, port: str, motors: Dict[str, 'Motor']): # 基础类型注解 self.port: str = port self.motors: Dict[str, 'Motor'] = motors # 可选类型注解 self.calibration: Optional[Dict[str, 'MotorCalibration']] = None # 延迟初始化的类型注解(仅声明类型,不赋值) self.port_handler: 'PortHandler' self.packet_handler: 'PacketHandler' # 私有变量类型注解 self._comm_success: int self._no_error: int 父类只声明变量的类型,子类需要实现。 class FeetechMotorsBus(MotorsBus): """Feetech 电机总线实现""" def __init__(self, port: str, motors: Dict[str, 'Motor']): super().__init__(port, motors) # 子类中的具体初始化 import scservo_sdk as scs self.port_handler = scs.PortHandler(self.port) # 实际赋值 self.packet_handler = scs.PacketHandler(0) 异步编程 为什么需要async 在传统python中 def download_file(): data = requests.get("https://example.com/file") return data 如上示例,代码是阻塞了,程序必须等请求返回结果后才能执行下一行,但如果要同时下载上百个文件,就会非常慢,为了解决这种I/O阻塞问题,python提供了异步编程,让多个任务可以并发运行(并非正在并行,但效率更高)。 基本的语法 import asyncio async def say_hello(): print("Hello") await asyncio.sleep(1) print("World") asyncio.run(say_hello()) async def:定义一个协程函数(coroutine function),调用它不会立即执行,而是返回一个协程对象(coroutine object)。 await:挂起当前协程,等待另外一个异步操作完成后再继续执行。 可以理解会把say_hello任务放到一个线程中运行。 那如何实现“并发”了? import asyncio async def download(name, delay): print(f"开始下载 {name}") await asyncio.sleep(delay) print(f"下载完成 {name}") async def main(): await asyncio.gather( download("文件1", 2), download("文件2", 1), download("文件3", 3), ) asyncio.run(main()) 上面的示例就会“并发”下载3个文件,输出顺序不是严格依次的,因为 await asyncio.sleep() 会释放控制权,其他任务可同时运行。 print(f"开始下载 {name}")这句会交叉打印,而不是依次文件1、文件2、文件3排序严格运行。可以理解会把download放到了3个线程中运行。 async with 这是异步上下文管理器,普通的with会调用: __enter__() 和 __exit__() 对于上下文管理器语法,会给上下文对象起一个变量名,语法为: with 表达式 as 变量: ... 示例 with open("file.txt") as f: data = f.read() with包括的上下文中创建了对象f,在离开with上下文中,会自动销毁f。普通的with可以自动帮助打开文件->执行enter,读取内容,离开代码块是自动关闭文件,执行exit。所以这里的with是资源安全保护壳,不管有没有异常都会自动清理资源。 在异步世界里,有些资源(比如网络连接、数据库连接、WebSocket)也需要“自动清理”,但它们的清理动作本身是异步的,必须 await 才能完成。于是python提供了async with。他会自动调用。 __aenter__() # 异步进入 __aexit__() # 异步退出 下面举个例子: import httpx import asyncio async def fetch(url): async with httpx.AsyncClient() as client: # 注意这里 response = await client.get(url) return response.text async def main(): html = await fetch("https://www.python.org") print(len(html)) asyncio.run(main()) 上面的as client是创建异步http客户端的对象,Python 会调用它的 aenter() 方法(异步版的 enter()),这个方法返回的对象会被赋值给你在 as client 中定义的变量,离开 with 块时,会自动调用 aexit() 关闭连接、释放资源。 除了async with外,还有async for,也是类似的用法。下面举例: import asyncio import websockets async def listen(): async with websockets.connect("wss://echo.websocket.org") as ws: await ws.send("hello") async for message in ws: # 持续监听消息流 print("收到:", message) asyncio.run(listen()) 闭包、函数对象与回调注册机制 def register_service(self, sd: ServiceDescriptor) -> None: async def impl(arguments: Dict[str, Any]) -> Any: return self._tool_router.call_tool(sd.full_name, arguments or {}) self._server.add_tool( impl, name=sd.full_name, description=sd.description, ) 如上示例,impl作为回调函数注册到self._server.add_tool作为回调函数,从C语言转到python后可能会有一个疑问? 为什么impl函数在离开register_service函数作用域后依然不会被销毁? 函数是对象 def foo(): print("hi") bar = foo # 函数对象赋给变量 bar() # 调用 在python中,函数本质上是对象,意味着函数可以赋值给变量,存入列表或字典,作为参数传递,作为返回值返回,持久化引用(例如在回调系统中长期存活)。 因此:函数与普通对象无异,只要有引用,它就不会被销毁。 闭包:内部函数会"捕获"外层变量 如果在函数内部在定义函数,并且内部函数使用了外部变量,那么python会自动创建闭包(closure)。 def outer(): x = 42 def inner(): print(x) return inner f = outer() f() # 输出 42 这里 inner 捕获了外部的变量 x,即使 outer() 已经结束,x 依然存在于 inner.closure 中。 闭包让内部函数拥有独立于作用域的状态。 作用域结束 ≠ 对象销毁 python中必须要区分两类东西: 项目 是否会在函数返回后销毁? 包含内容 栈帧/作用域(Stack Frame) ✔ 会销毁 局部变量表、执行上下文 对象本身(Object) ✘ 不会销毁(只要有引用) 函数对象、闭包变量、列表、字典等 这就是为什么函数内部定义的内部函数,外部变量被闭包捕获后,不会随着外层结束而消失。 决定对象是否被销毁的只有"引用计数",只有当引用计数变为0对象才会被销毁。
  • lerobot示教

    lerobot示教

    启动 示教的功能主要是主臂控制,从臂跟随,在数据采集是非常的一环。下面是模块启动的执行命令: python -m lerobot.teleoperate \ --robot.type=so101_follower \ --robot.port=/dev/ttyACM0 \ --robot.id=R12252801 \ --teleop.type=so101_leader \ --teleop.port=/dev/ttyACM1 \ --teleop.id=R07252608 python解释器回安装sys.path查找lerobot.teleoperate模块,找到执行lerobot/src/lerobot/teleoperate.py文件,调用teleoperate函数。 ## 配置解析 @draccus.wrap() def teleoperate(cfg: TeleoperateConfig): init_logging() logging.info(pformat(asdict(cfg))) ## 初始化可视化工具 if cfg.display_data: _init_rerun(session_name="teleoperation") ##创建操作设备和机器人实例 teleop = make_teleoperator_from_config(cfg.teleop) robot = make_robot_from_config(cfg.robot) ## 连接设备 teleop.connect() robot.connect() ##示教循环 try: teleop_loop(teleop, robot, cfg.fps, display_data=cfg.display_data, duration=cfg.teleop_time_s) except KeyboardInterrupt: pass finally: if cfg.display_data: rr.rerun_shutdown() ## 断开连接 teleop.disconnect() robot.disconnect() if __name__ == "__main__": teleoperate() 下面展开来说明一下。 配置解析 @draccus.wrap() def teleoperate(cfg: TeleoperateConfig): # 初始化日志记录 init_logging() # 打印配置信息 logging.info(pformat(draccus.asdict(cfg))) 在teleoperate.py中teleoperate函数使用@draccus.wrap()装饰器解析命令行参数,将参数转换为TeleoperateConfig类型的配置对象。dracus是用于配置解析的库,类型与argparse。 这里的TeleoperateConfig是一个自定义的配置类,定义如下: from dataclasses import dataclass @dataclass class RobotConfig: type: str port: str id: str @dataclass class TeleopConfig: type: str port: str id: str @dataclass class TeleoperateConfig: teleop: TeleoperatorConfig robot: RobotConfig # Limit the maximum frames per second. fps: int = 60 teleop_time_s: float | None = None # Display all cameras on screen display_data: bool = False 可以看到TeleoperateConfig类中属性定义了robot,teleop两个对象,命令行的参数将会映射到,如: --robot.type=so101_follower:对应cfg.robot.type --teleop.port=/dev/ttyACM1:对应cfg.teleop.port 初始化可视化工具 if cfg.display_data: _init_rerun(session_name="teleoperation") 当参数--display_data=true,即cfg.display_data 为 True,则调用 _init_rerun 函数初始化 rerun 可视化工具。rerun 是一个用于记录和可视化科学数据的工具。 def _init_rerun(session_name: str = "lerobot_control_loop") -> None: """Initializes the Rerun SDK for visualizing the control loop.""" batch_size = os.getenv("RERUN_FLUSH_NUM_BYTES", "8000") os.environ["RERUN_FLUSH_NUM_BYTES"] = batch_size rr.init(session_name) memory_limit = os.getenv("LEROBOT_RERUN_MEMORY_LIMIT", "10%") rr.spawn(memory_limit=memory_limit) _init_rerun函数的主要功能是初始化Rerun SDK,设置数据刷新的字节数和内存使用限制,然后启动Rerun会话,为后续的控制循环可视化工作做准备。 其中有两个参数是从环境变量中获取,分别是RERUN_FLUSH_NUM_BYTES和LEROBOT_RERUN_MEMORY_LIMIT,用于控制每次刷新是处理数据的字节数和限制使用的内存量。 创建主从臂实例 teleop = make_teleoperator_from_config(cfg.teleop) robot = make_robot_from_config(cfg.robot) 调用调用 make_teleoperator_from_config 和make_robot_from_config 函数,根据配置对象创建 SO101Leader遥操作设备实例和 SO101Follower 机器人实例。 def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator: if config.type == "keyboard": from .keyboard import KeyboardTeleop return KeyboardTeleop(config) elif config.type == "koch_leader": from .koch_leader import KochLeader return KochLeader(config) elif config.type == "so100_leader": from .so100_leader import SO100Leader return SO100Leader(config) elif config.type == "so101_leader": from .so101_leader import SO101Leader return SO101Leader(config) ...... 以leader为例,最终调用SO101Leader对象实例。 class SO101Leader(Teleoperator): """ SO-101 Leader Arm designed by TheRobotStudio and Hugging Face. """ config_class = SO101LeaderConfig name = "so101_leader" def __init__(self, config: SO101LeaderConfig): super().__init__(config) #调用父类Teleoperator够着函数初始化属性,包括 #self.id:设备的唯一标识符,从配置对象中获取。 #self.calibration_dir:校准文件的存储目录。 #self.calibration_fpath:校准文件的完整路径。 #self.calibration:存储电机校准信息的字典。 self.config = config #类型为 SO101LeaderConfig,存储传入的配置对象,方便在类的其他方法中访问配置信息。 norm_mode_body = MotorNormMode.DEGREES if config.use_degrees else MotorNormMode.RANGE_M100_100 self.bus = FeetechMotorsBus( port=self.config.port, motors={ "shoulder_pan": Motor(1, "sts3215", norm_mode_body), "shoulder_lift": Motor(2, "sts3215", norm_mode_body), "elbow_flex": Motor(3, "sts3215", norm_mode_body), "wrist_flex": Motor(4, "sts3215", norm_mode_body), "wrist_roll": Motor(5, "sts3215", norm_mode_body), "gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100), }, calibration=self.calibration, ) #类型为 FeetechMotorsBus,用于与 Feetech 电机进行通信,初始化时传入端口、电机信息和校准信息。 SO101Leader类继承了Teleoperator,super().init(config) 调用父类 Teleoperator 的构造函数初始化的属性。 class Teleoperator(abc.ABC): def __init__(self, config: TeleoperatorConfig): self.id = config.id #设备的id self.calibration_dir = ( config.calibration_dir if config.calibration_dir else HF_LEROBOT_CALIBRATION / TELEOPERATORS / self.name ) #校准文件的存储目录 self.calibration_dir.mkdir(parents=True, exist_ok=True) #创建校准目录 self.calibration_fpath = self.calibration_dir / f"{self.id}.json" self.calibration: dict[str, MotorCalibration] = {} # 初始化一个字典,键为字符串,值为MotorCalibration if self.calibration_fpath.is_file(): self._load_calibration() #检查校准文件是否存在,存在调用加载校准信息的方法。 Teleoperator定义了一个遥控的基类,其构造函数中接受一个TeleoperatorConfig类型的参数config,主要有以下属性。 self.id:设备的唯一标识符,从传入的配置对象 config 里获取,用于在系统中唯一标识该 SO101Leader 设备实例。 self.calibration_dir:校准文件的存储目录。若 config.calibration_dir 有值则使用该值,否则使用默认路径。此目录用于存放设备的校准文件。 self.calibration_fpath:校准文件的完整路径,由 self.calibration_dir 和设备 id 组合而成,文件格式为 JSON。 self.calibration:存储电机校准信息的字典,键为电机名称,值为 MotorCalibration 类型的对象,用于保存电机的校准参数。 而SO101Leader类中自己定义的实力属性有 - self.config:类型为 SO101LeaderConfig,存储传入的配置对象。借助这个属性,类的其他方法可以访问配置信息,如端口号、是否使用角度单位等。 - self.bus:类型为 FeetechMotorsBus,用于和 Feetech 电机进行通信。初始化时传入端口、电机信息和校准信息,后续可以通过该对象实现电机的读写操作、校准等功能。 总结下就是SO101Leader 类包含 2 个类属性和多个实例属性,这些属性从不同层面描述了 SO101 领导者手臂设备的类型、配置、校准信息以及与电机的通信对象等。而SO101Follower也是同理这里就不过多阐述。 连接设备 teleop.connect() robot.connect() 连接设备就是各自调用此前创建实例的connect方法,建立与硬件设备的连接。以teleop为例,最总调用到self.bus.connect发起连接并进行配置。 def connect(self, calibrate: bool = True) -> None: if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} already connected") self.bus.connect() if not self.is_calibrated and calibrate: self.calibrate() self.configure() logger.info(f"{self} connected.") 跟随控制 def teleop_loop( teleop: Teleoperator, robot: Robot, fps: int, display_data: bool = False, duration: float | None = None ): display_len = max(len(key) for key in robot.action_features) start = time.perf_counter() while True: loop_start = time.perf_counter() #从主臂获取动作位置数据 action = teleop.get_action() #如果启动了实时显示,进行显示 if display_data: observation = robot.get_observation() log_rerun_data(observation, action) #将主臂的动作位置数据发送给从臂 robot.send_action(action) #计算循环执行时间 dt_s = time.perf_counter() - loop_start busy_wait(1 / fps - dt_s) loop_s = time.perf_counter() - loop_start #打印动作的信息,每个舵机的位置。 print("\n" + "-" * (display_len + 10)) print(f"{'NAME':<{display_len}} | {'NORM':>7}") for motor, value in action.items(): print(f"{motor:<{display_len}} | {value:>7.2f}") print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)") if duration is not None and time.perf_counter() - start >= duration: return # 将终端的文本光标向上移动一定的行数,以此实现覆盖上一次输出内容的效果,从而在终端里模拟实时更新的显示 move_cursor_up(len(action) + 5)
  • 密码保护:手机给电脑配置代理

    密码保护:手机给电脑配置代理

    此内容受密码保护。如需查看请在下方输入访问密码: 密码:
  • lerobot设备标定

    lerobot设备标定

    why calibrate 先来看看标定后的数据 { "shoulder_pan": { #肩部旋转关节 "id": 1, "drive_mode": 0, "homing_offset": -1620, "range_min": 1142, "range_max": 2931 }, "shoulder_lift": { #肩部升降关节 "id": 2, "drive_mode": 0, "homing_offset": 2025, "range_min": 844, "range_max": 3053 }, "elbow_flex": { #肘部弯曲关节 "id": 3, "drive_mode": 0, "homing_offset": -1208, "range_min": 963, "range_max": 3078 }, "wrist_flex": { #腕部弯曲关节 "id": 4, "drive_mode": 0, "homing_offset": 2021, "range_min": 884, "range_max": 3222 }, "wrist_roll": { #腕部旋转关节 "id": 5, "drive_mode": 0, "homing_offset": -777, "range_min": 142, "range_max": 3961 }, "gripper": { #夹爪关节 "id": 6, "drive_mode": 0, "homing_offset": 909, "range_min": 1978, "range_max": 3522 } } 上面数据是每个舵机的相关参数,一共有6个舵机,每个舵机都有一个id来标识,可以对应实物来看实际是从下到上。 id: 电机的唯一标识符,用于在总线通信时精准定位和控制特定电机。 drive_mode: 电机的驱动模式,取值为 0 表示特定的驱动模式,不同驱动模式会影响电机的运动特性与控制方式。 homing_offset:归位偏移量,指电机从物理零点位置到校准零点位置的偏移量。此参数能保证电机在每次启动时都能回到准确的零点位置,从而提升运动精度。 range_min 和 range_max:电机运动范围的最小值和最大值,以数值形式呈现。这两个参数限定了电机的运动边界,避免因超出范围而导致硬件损坏或者运动异常。range_min 和 range_max:电机运动范围的最小值和最大值,以数值形式呈现。这两个参数限定了电机的运动边界,避免因超出范围而导致硬件损坏或者运动异常。 上面的参数信息在代码中lerobot/src/lerobot/robots/koch_follower/koch_follower.py 中回进行配置。 从上面的配置信息可知,之所以要标定,就是要获取电机的一些物理特性,主要是几个方面:明确电机物理运动范围、归一化电机位置、确保机器人运行精度等。 明确电机范围 机器的电机要有特定的物理运动范围,因为发送要是超过这个范围,回造成硬件损坏。使用calibrate操作可以记录每个电机的最小和最大位置限制,保证后续发送的指令都在安全范围内。 lerobot/src/lerobot/robots/so101_follower/so101_follower.py def calibrate(self) -> None: # ...已有代码... print( "Move all joints sequentially through their entire ranges " "of motion.\nRecording positions. Press ENTER to stop..." ) range_mins, range_maxes = self.bus.record_ranges_of_motion() self.calibration = {} for motor, m in self.bus.motors.items(): self.calibration[motor] = MotorCalibration( id=m.id, drive_mode=0, homing_offset=homing_offsets[motor], range_min=range_mins[motor], range_max=range_maxes[motor], ) # ...已有代码... 归一化电机位置 不同电机的原始位置值可能都离散的任意值,这些依赖于具体的电机型号,不具备通用性。calibrate 操作能将这些原始位置值归一化为有意义的连续值,像百分比、角度等,方便后续处理和控制。 lerobot/src/lerobot/motors/motors_bus.py def _normalize(self, id_, val, min_, max_, drive_mode): motor = self._id_to_name(id_) if self.motors[motor].norm_mode is MotorNormMode.RANGE_M100_100: norm = (((val - min_) / (max_ - min_)) * 200) - 100 normalized_values[id_] = -norm if drive_mode else norm # ...已有代码... 确保机器运行精度 lerobot/src/lerobot/robots/so100_leader/so100_leader.py 中的 calibrate def calibrate(self) -> None: # ...已有代码... input(f"Move {self} to the middle of its range of motion and press ENTER....") homing_offsets = self.bus.set_half_turn_homings() # ...已有代码... 通过校准,可以确定电机的归位偏移量(homing offset),这有助于提高机器人运动的精度和重复性。在实际应用中,准确的位置控制对机器人完成任务至关重要。 最后经过校准后的数据,后续无需在重复校准,校准保存的路径一般在~/.cache/huggingface/lerobot/calibration,实际运行过程中,回进行加载。 保存 def _save_calibration(self, fpath: Path | None = None) -> None: draccus.dump(self.calibration, f, indent=4) 加载 def _load_calibration(self, fpath: Path | None = None) -> None: self.calibration = draccus.load(dict[str, MotorCalibration], f) 校准流程 python -m lerobot.calibrate \ --robot.type=so101_follower \ --robot.port=/dev/ttyACM0 \ --robot.id=R12252801 上面是执行命令示例,接下来个跟踪一下流程。Python 解释器接收到 python -m lerobot.calibrate 命令后,会将 lerobot.calibrate 当作一个可执行模块来处理。按照 Python 模块查找机制,会在 sys.path 所包含的路径里寻找 lerobot/calibrate.py 文件,对应的文件路径为lerobot/src/lerobot/calibrate.py。 @draccus.wrap() def calibrate(cfg: CalibrateConfig): init_logging() logging.info(pformat(asdict(cfg))) if isinstance(cfg.device, RobotConfig): device = make_robot_from_config(cfg.device) elif isinstance(cfg.device, TeleoperatorConfig): device = make_teleoperator_from_config(cfg.device) device.connect(calibrate=False) device.calibrate() device.disconnect() if __name__ == "__main__": calibrate() 调用calibrate含税,从calibrate可以看到主要的步骤为: 参数解析与配置初始化 创建设备实例 连接设备 设备校准 设备断开连接 参数解析与配置初始化 calibrate函数被@draccus.wrap()装饰,draccus是一个用于解析命令行参数并创建配置对象的库。 @dataclass class CalibrateConfig: teleop: TeleoperatorConfig | None = None robot: RobotConfig | None = None def __post_init__(self): if bool(self.teleop) == bool(self.robot): raise ValueError("Choose either a teleop or a robot.") self.device = self.robot if self.robot else self.teleop draccus会把命令行参数--robot.type=so101_follower --robot.port=/dev/ttyACM0 --robot.id=R12252801解析成CalibrateConfig类的实例。robot属性会被初始化为RobotConfig类型的对象,如果是--teleop.type=so101_leader 将会初始化为TeleoperatorConfig类型的对象。RobotConfig对象在config.py中定义的。 @dataclass(kw_only=True) class RobotConfig(draccus.ChoiceRegistry, abc.ABC): # Allows to distinguish between different robots of the same type id: str | None = None # Directory to store calibration file calibration_dir: Path | None = None def __post_init__(self): if hasattr(self, "cameras") and self.cameras: for _, config in self.cameras.items(): for attr in ["width", "height", "fps"]: if getattr(config, attr) is None: raise ValueError( f"Specifying '{attr}' is required for the camera to be used in a robot" ) @property def type(self) -> str: return self.get_choice_name(self.__class__) 其会根据命令行参数或配置文件中的类型字段,动态选择并创建对应的配置子类实例。 如当前的参数有type,port,id分别设置为so101_follower、/dev/ttyACM0 和 R12252801。 创建设备实例 根据cfg.device类型,调用对应工厂含税创建设备实例。 if isinstance(cfg.device, RobotConfig): device = make_robot_from_config(cfg.device) elif isinstance(cfg.device, TeleoperatorConfig): device = make_teleoperator_from_config(cfg.device) 由于 cfg.device 是 RobotConfig 类型,所以会调用 make_robot_from_config(cfg.device)。该函数可能定义在 /home/laumy/lerobot/src/lerobot/robots/init.py 或者相关的工厂模块中,依据 cfg.device.type 的值(即 so101_follower),会创建 SO101Follower 类的实例。 连接设备 调用 device.connect(calibrate=False) 方法连接设备,lerobot/src/lerobot/robots/so101_follower/so101_follower.py 文件中,其 connect 方法可能如下: def connect(self, calibrate: bool = True) -> None: if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} already connected") # 连接串口总线 self.bus = DynamixelBus(port=self.config.port, baudrate=1000000) self.bus.connect() if not self.is_calibrated and calibrate: self.calibrate() # 连接摄像头 for cam in self.cameras.values(): cam.connect() self.configure() logger.info(f"{self} connected.") 因为传入的 calibrate=False,所以不会触发自动校准。在连接过程中,会先连接串口总线,再连接摄像头,最后进行设备配置。 标定动作 调用 device.calibrate() 方法对设备进行校准,SO101Follower 类的 calibrate 方法可能如下: def calibrate(self) -> None: logger.info(f"\nRunning calibration of {self}") # 禁用扭矩 self.bus.disable_torque() # 设置电机工作模式为位置模式 for motor in self.bus.motors: self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) # 手动操作提示 input(f"Move {self} to the middle of its range of motion and press ENTER....") # 设置半圈归零偏移量 homing_offsets = self.bus.set_half_turn_homings() print( "Move all joints sequentially through their entire ranges " "of motion.\nRecording positions. Press ENTER to stop..." ) # 记录关节运动范围 range_mins, range_maxes = self.bus.record_ranges_of_motion() self.calibration = {} for motor, m in self.bus.motors.items(): self.calibration[motor] = MotorCalibration( id=m.id, drive_mode=0, homing_offset=homing_offsets[motor], range_min=range_mins[motor], range_max=range_maxes[motor], ) # 将校准数据写入电机 self.bus.write_calibration(self.calibration) # 保存校准数据 self._save_calibration() 校准过程包含禁用扭矩、设置电机工作模式、手动操作提示、记录关节运动范围、保存校准数据等步骤。 设备断开连接 调用 device.disconnect() 方法断开设备连接,SO101Follower 类的 disconnect 方法如下: def disconnect(self): if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") # 断开串口总线连接 self.bus.disconnect(self.config.disable_torque_on_disconnect) # 断开摄像头连接 for cam in self.cameras.values(): cam.disconnect() logger.info(f"{self} disconnected.")
  • 模型训练GPU跑飞

    模型训练GPU跑飞

    问题 当前使用的是魔改版的NVIDIA 2080 Ti 22G显卡,发现在模型训练过程中,跑着跑着就报错了,具体如下: raceback (most recent call last): File "/home/laumy/lerobot/./src/lerobot/scripts/train.py", line 291, in <module> train() File "/home/laumy/lerobot/src/lerobot/configs/parser.py", line 226, in wrapper_inner response = fn(cfg, *args, **kwargs) File "/home/laumy/lerobot/./src/lerobot/scripts/train.py", line 212, in train train_tracker, output_dict = update_policy( File "/home/laumy/lerobot/./src/lerobot/scripts/train.py", line 101, in update_policy train_metrics.loss = loss.item() RuntimeError: CUDA error: unspecified launch failure CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect. For debugging consider passing CUDA_LAUNCH_BLOCKING=1 Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions. 然后使用nvidia-smi发现显卡都找不到了。 nvidia-smi No devices were found 排查 重启电脑,重新训练模型,同时执行以下命令查看显卡情况。 watch -n nvidia-smi 发现训练过程中,温度飙升非常快,初步怀疑是性能跑太满,导致温度过高保护了。 解决 限制GPU的功率和核心频率 sudo nvidia-smi -pl 150 # 将功率限制设置为150W sudo nvidia-smi -lgc 1000,1000 # 限制核心频率为1000MHz 限制后继续跑,发现没有问题了。也可以使用nvidia-smi -a来详细查看参数。 另外如果想实时查看GPU监控,可以使用 nvtop 修改显卡为高性能模式 把GPU的runtime PM和PCIe ASPM关掉,下面是开机自启动配置脚本。 sudo vim /usr/local/sbin/fix-nvidia-runtime-pm.sh #!/bin/bash # 强制关闭 ASPM,避免因节能进入低功耗模式 echo performance | tee /sys/module/pcie_aspm/parameters/policy # 获取 NVIDIA GPU 的 PCIe 设备路径 GPU_DEV="0000:$(lspci | awk '/NVIDIA/{print $1; exit}')" if [ -z "$GPU_DEV" ]; then echo "NVIDIA GPU not found, exiting script." exit 1 fi # 设置 GPU 的 power/control 为 'on',确保设备不会休眠 CUR="/sys/bus/pci/devices/${GPU_DEV}" while [ -n "$CUR" ] && [ -e "$CUR" ]; do if [ -w "$CUR/power/control" ]; then echo on | sudo tee "$CUR/power/control" fi # 上一级桥 PARENT=$(readlink -f "$CUR/..") [[ "$PARENT" == "/sys/devices" ]] && break CUR="$PARENT" done # 启用 NVIDIA persistence 模式,避免 GPU 重置 nvidia-smi -pm 1 # 输出信息确认操作成功 echo "GPU power control set to 'on' and ASPM disabled." 配置可执行sudo chmod +x /usr/local/sbin/fix-nvidia-runtime-pm.sh 然后配置开机自启动 sudo vim /etc/systemd/system/fix-nvidia-runtime-pm.service [Unit] Description=Force NVIDIA GPU power control 'on' and disable ASPM After=multi-user.target Wants=multi-user.target [Service] Type=oneshot ExecStart=/usr/local/sbin/fix-nvidia-runtime-pm.sh RemainAfterExit=true [Install] WantedBy=multi-user.target 然后设置开机自启动 sudo systemctl daemon-reload sudo systemctl enable fix-nvidia-runtime-pm.service sudo systemctl start fix-nvidia-runtime-pm.service 重启后可以验证一下 cat /sys/bus/pci/devices/0000:$(lspci | awk '/NVIDIA/{print $1; exit}')/power/control 如果打开的是on成功 cat /sys/module/pcie_aspm/parameters/policy 输出应该是 [performance]
  • 安装unbuntu双系统

    安装unbuntu双系统

    准备 本文只作为个人安装简单记录,不做详细过程,如果是新手,可不必花时间再往下阅读。 准备一个16G以上的U盘,安装过程中跟实际的笔记本硬件会差异比较大。 下载ubuntu镜像 先下载ubuntu镜像:https://cn.ubuntu.com/download/desktop,我这里使用的是Ubuntu 24.04.2 LTS版本。或者可以从国内的镜像站获取https://mirror.nyist.edu.cn/ubuntu-releases/ 制作U盘启动盘 下载U盘启动盘制作工具,将镜像写入U盘中 准备ubuntu系统分区 打开磁盘管理,选中一个大一点的磁盘,点击压缩卷。 输入压缩空间容量就是要拓展出来的分区,不要进行格式化新建卷,如下就行。 设置非安全启动 需要将BIOS设置为非安全启动,否则不能启动U盘。系统关机重启,一直按F1(有的是F2),然后进入设置界面,将security boot改为disabled,然后保存。 选中U盘启动 关掉security boot后,重启按住F12选中U盘启动,接下来就可以安装向导安装ubuntu了。 特别需要注意的是,要选择手动分区安装。 然后选择剩余空间,进行格式化挂载。 选择好挂载点和文件系统Ext4. 接下来就进行下一步安装。 修改默认启动顺序 windows基础上装了ubuntu双系统后,默认将是ubuntu启动,如果要修改默认启动顺序,方法如下。开机重启,在刚开始启动的时候会让选择是ubuntu还是window,默认就是ubuntu系统第1行,然后记住windows是第几行,一般是第3行,确认了windows是第几行后,然后先进入登录ubuntu系统。 sudo vim /etc/defaut/grub 将GRUB_DEFAULT=0修改为GRUB_DEFAULT=2,这里的2对应的第三行。 然后保存退出,
\t