Ai
  • 具身智能ACT算法

    具身智能ACT算法

    基本原理 简单总结一下什么是ACT算法。传统的机器算法过程是观测关节位置J1经过模型预测动作A2然后执行,观测到J2预测数A3,观测到J3遇到A4依次类推,这样就有一个问题,假设预测出的A2跟实际相比偏差就比较大那么对应的观测到的J2就偏离比较大。如果要连续预测K步,就要连续采集K步,缺点就是误差会累积同时预测效率也比较低。那么对于ACT算法是怎么进行优化的了? ACT算法是一下观测连续的K个动作,然后预测出K个动作,这样相对于传统算法效率就提升了K倍。同时也可以解决累积误差,计时K个连续的动作中,有某个动作偏差比较大,但是整体经过模型就会弱化不至于累积。假设K是10 ,简单举个例子理解过程,T0时刻观测到J1数据(开始时只有一个数据),模型直接预测数10个动作序列,等机器按顺序依次执行完这10个动作后,模型下一次就直接把这10个动作当做输入然后预测下一批的10个动作,依次类推。 基于transformer的动作分块(ACT)架构。分为训练模式和测试模式。 当为训练模式是,ACT为左图的编码器+右图的解码器。左图可以理解为一个CVAE的编码器,将关节序列、动作序列、CLS经过transformer编码压缩为风格变量Z。然后将Z再加上采集的摄像头数据、关节序列作为输入给到右边的解码器最终输出动作序列。 当为测试模式时,左图丢弃不再,只需要使用右图的部分,输入为摄像头数据、关节序列、Z(被简单设置为0,表示先验的平均值)。可以这么理解Z为CVAE模型中的风格,经过训练后,Z已经让模型的参数定型,在后续的测试过程中就不需要了,因为参数已经固定了就不用了。 以lerobot的机械臂为例,个人理解关节序列指的是从臂的舵机位置,而动作序列是主臂的舵机位置。 动作分块 动作分块Action Chunking机制,传统机器人每执行一步都要重新观测环境(如拍摄一张照片),走一步采集一步预测一步,而ACT采用的是"分块执行"策略。 具体就是累积到每K步观测一次(如K=100),然后一次性输出K个动作序列,执行这就可以按顺序执行这组K个动作序列。 动作分块也可以理解为决策频率进行了压缩,传统的单步策略需每一步观测环境并生成动作(如T次决策),而ACT是每K步观测一次,一次性生成后续K个动作序列(决策点将至T/K个),例如若K=10,1000步的任务仅需100次决策,效率提升了10倍。 将各个动作组合在一起并作为一个单元的执行,从而使起存储和执行更有效率,直观地讲一组动作可以对应抓住糖果包装纸的一角或将电池插入插槽,在实现中将块大小固定为K,每K步agent会收到一次观测然后预测生成K个动作然后机器按顺序执行。如上图所示假设K为4,t=0时刻策略观测到4个动作,然后就会生成4个动作序列,让机器按顺序执行;紧接着到t=4这个时刻,策略又观测到4个动作,生成4个预测动作机器按这个4个动作顺序执行。 分块的还可以帮助模拟人类演示中的非马尔可夫行为,具体来说是单步策略会难以应对时间相关的混杂因素,例如在采集演示过程中的停顿,会让模型这时候不知道该如何做,因为这些行为不仅仅取决于状态,还取决于时间步长,而动作分块就可以缓解当混杂因素位于一个块内是,不会引入历史条件策略的因果混淆问题。 时间集成 仅仅简单的是用动作分块实现还有一个问题,那就是每K步突然合并一个新的环境观测,可能会导致机器人运动不平稳,也就是说执行完一系列动作后,再到下一个序列动作时,可能差异比较大会导致机器抽搐。如下图中t0时刻开始执行的0~3序列切到t4时刻执行的4~7训练,这个切换过程可能会导致机器运动不平稳。 为了解决这个问题,提出了时间步查询策略。假设动作分块为K,那么每个时间刻都预测了K个序列,如上图t0时刻预测了0~3,t1时刻预测了1~4,t2时刻预测了2~5。然后每个时间刻真正要执行的序列为该时间集成加权平均,加权方案为wi = exp(−m ∗ i),其中w0是最早的动作权重,合并新的观测速度由m控制,其中m越小表示合并越快。如上图1位置实际执行的动作为t0时刻预测的第2个动作与t1时刻预测的第1个动作加权平均,2位置实际执行的动作是t0时刻第3个预测动作、t1时刻第2个预测动作、t2时刻第1个预测动作加权平均,以此类推。靠得越紧的预测动作权重值越大,靠得远的权重值越小。 详细架构 训练 步骤1 采样数据 准备好采样数据: images:对应的4组RGB图像。 joints:2个机器,每个机器有7个自由度,那么对应14个关节位置信息。 action sequence:演示数据集长度为K组的目标动作序列,每组14个关节位置信息。 怎么理解joints和action sequence了? 假设当前是T0时刻,采样到机器4组摄像头数据得到4 * (4806403)的图像数据,然后也采样到当前时刻机器的关节位置信息(14,)。那么action sequence数据怎么来了,要从T0时刻开始计时,到T0+K时刻进行记录K组(每组14关节位置信息)关节位置信息,然后将这些信息组合得到一组完整的数据。但一般采样的这K组数据一般使用领导臂,不使用机器的,作者在论文中提到主要是考虑因为机器是通过低级PID控制器来转换执行的,采用机器的记录数据可能会导致符合误差。 步骤2 推理Z 步骤3 预测动作 推理
  • lerobot学习率调度器

    lerobot学习率调度器

    学习率调度器简介 是什么 学习率调度器(Learning Rate Scheduler)是深度学习训练中动态调整优化器学习率的工具(注意是在优化器的基础上动态调整学习率),通过优化收敛过程提升模型性能。 学习率(η)控制梯度更新步长,参数更新量 = -η × 梯度。过大步长导致震荡,过小则陷入局部最优或训练缓慢,固定学习率易导致训练初期震荡或后期收敛缓慢,调度器在训练期间自适应调整初期较高学习率加速收敛,中期稳定探索最优解,后期精细调优避免震荡。 以下是常见的学习率调度器 StepLR:每隔固定epoch将学习率乘以衰减因子(如step_size=10, gamma=0.5)。 ExponentialLR:每epoch按指数衰减(lr = lr × gamma)。 CosineAnnealingLR:按余弦曲线周期下降:η = η_min + 0.5×(η_max - η_min)×(1 + cos(π×t/T_max))。 OneCycleLR:分两阶段:线性升温至峰值,再退火至极小值。 PowerLR:基于幂律关系调整,与批次大小和token数量无关。 怎么用 import torch.optim as optim from torch.optim.lr_scheduler import StepLR # 1. 定义模型和优化器 model = ... # 神经网络模型 optimizer = optim.SGD(model.parameters(), lr=0.1) # 初始学习率0.1 # 2. 创建调度器(绑定优化器) scheduler = StepLR(optimizer, step_size=30, gamma=0.5) # 每30epoch衰减50% # 3. 训练循环 for epoch in range(100): # 前向传播 + 反向传播 train(...) optimizer.zero_grad() loss.backward() optimizer.step() # 4. 更新学习率(通常在epoch结束时) scheduler.step() # 查看当前学习率 print(f"Epoch {epoch}: LR={scheduler.get_last_lr()[0]:.6f}") 上面代码演示了基础的学习率调度使用示例,创建一个学习率调度器,传入的参数有优化器,主要的目的是让学习率调度器和优化器绑定,因为相当于是在优化器的基础上改进学习率调度。接下来就是在前向、反向传播更新完参数后调用scheduler.step()更新学习率,以便下一次计算使用。 from torch.optim.lr_scheduler import CosineAnnealingLR optimizer = SGD(model.parameters(), lr=0.01) # 基础学习率 total_epochs = 100 warmup_epochs = 10 # 预热epoch数 for epoch in range(total_epochs): # 1. 预热阶段:前10epoch线性增长 if epoch < warmup_epochs: lr = 0.01 * (epoch / warmup_epochs) for param_group in optimizer.param_groups: param_group['lr'] = lr # 2. 余弦退火阶段 else: scheduler = CosineAnnealingLR(optimizer, T_max=total_epochs-warmup_epochs, eta_min=1e-5) scheduler.step() # 训练代码同上... print(f"Epoch {epoch}: LR={optimizer.param_groups[0]['lr']:.6f}") 上面使用的是余弦退火+预热的方式。 lerobot调度器 抽象基类 @dataclass class LRSchedulerConfig(draccus.ChoiceRegistry, abc.ABC): num_warmup_steps: int # 预热步数(所有调度器的公共参数) @property def type(self) -> str: return self.get_choice_name(self.__class__) # 获取子类注册名称(如 "diffuser") @abc.abstractmethod def build(self, optimizer: Optimizer, num_training_steps: int) -> LRScheduler | None: """创建学习率调度器实例""" raise NotImplementedError LRSchedulerConfig是学习率调度器配置的核心抽象,通过抽象接口定义+配置注册机制,实现了学习率调度策略的统一管理与灵活扩展。 其同样继承了abc.ABC标记为抽象基类,强制子类事项build抽象方法,同时继承draccus.ChoiceRegistry提供子类注册机制,通过 @LRSchedulerConfig.register_subclass("名称") 将子类与调度器类型绑定(如 "diffuser" → DiffuserSchedulerConfig),支持配置驱动的动态实例化。同时使用了@dataclass 装饰器自动生成构造函数、repr 等方法,简化调度器超参数的定义与管理。 其核心属性只有一个num_warmup_steps表示学习率预热步数。预热是深度学习训练的常见技巧(尤其在 Transformer 等模型中),将其作为基类字段可避免子类重复定义。也是几乎所有学习率调度器的基础参数,用于控制“预热阶段”(学习率从低到高线性增长的步数),避免训练初期因高学习率导致的不稳定。 其只有两个方法type和build,type是用于获取子类注册调度器类型名称进而匹配到对应子类,而build的方法是强制子类实现。 实例化子类 lerobot的学习率调度实现了3个DiffuserSchedulerConfig、VQBeTSchedulerConfig、CosineDecayWithWarmupSchedulerConfig子类,这里以DiffuserSchedulerConfig简单说明。 @LRSchedulerConfig.register_subclass("diffuser") @dataclass class DiffuserSchedulerConfig(LRSchedulerConfig): name: str = "cosine" # 调度器类型(如 "cosine"、"linear",来自 diffusers) num_warmup_steps: int | None = None # 预热步数(可选,未指定则不预热) def build(self, optimizer: Optimizer, num_training_steps: int) -> LambdaLR: from diffusers.optimization import get_scheduler # 复用 Diffusers 的调度器实现 kwargs = {**asdict(self), "num_training_steps": num_training_steps, "optimizer": optimizer} # 构造调度器参数:类字段 + 训练相关参数 return get_scheduler(**kwargs) # 调用 diffusers API 创建调度器 diffusers.optimization.get_scheduler 是 Diffusers 库提供的调度器工厂函数,支持多种预定义策略(如 "cosine"、"linear"、"constant" 等)。 asdict(self)将 DiffuserSchedulerConfig 实例的字段(如 name="cosine"、num_warmup_steps=1000)转换为字典;而num_training_steps:总训练步数(调度器需基于此计算衰减周期);最后的optimizer就是待绑定的优化器实例(调度器需调整其参数组的学习率)。 状态管理 (1)存储 def save_scheduler_state(scheduler: LRScheduler, save_dir: Path) -> None: state_dict = scheduler.state_dict() # 获取调度器状态(如当前 step、预热步数等) write_json(state_dict, save_dir / SCHEDULER_STATE) # 保存为 JSON 文件(如 "scheduler_state.json") 该函数主要是将学习率调度器的状态写入到磁盘,为后续断点续训提供支持。state_dict = scheduler.state_dict()获取调度器的当前状态字典,包含调度器运行所需的所有动态信息。接着调用write_json写入到到文件中,文件名默认scheduler_state.json。 (2)加载 def load_scheduler_state(scheduler: LRScheduler, save_dir: Path) -> LRScheduler: # 从 JSON 加载状态,并按当前调度器结构适配(确保兼容性) state_dict = deserialize_json_into_object(save_dir / SCHEDULER_STATE, scheduler.state_dict()) scheduler.load_state_dict(state_dict) # 恢复状态 return scheduler 该函数也比较简单,主要负责从磁盘加载之前保存的调度器状态(如当前训练步数、学习率调整进度等),使训练能够从断点处继续,确保学习率调整策略的连续性。 工程调用 命令参数 在lerobot中,启动训练是可以通过参数来实例化使用哪些调度器,如下。 --scheduler.type=diffuser指定使用diffuser调度类。 --scheduler.num_warmup_steps=1000指定预热步数。 --scheduler.num_warmup_steps=0.5指定余弦衰减周期。 创建 工程中通过 optim/factory.py 中的 make_optimizer_and_scheduler 函数统一创建优化器和调度器,并完成两者的绑定。 def make_optimizer_and_scheduler( cfg: TrainPipelineConfig, policy: PreTrainedPolicy ) -> tuple[Optimizer, LRScheduler | None]: # 1. 创建优化器(基于优化器配置,如 AdamConfig、MultiAdamConfig) optimizer = cfg.optimizer.build(policy.parameters()) # policy.parameters() 为模型参数 # 2. 创建调度器(基于调度器配置,如 DiffuserSchedulerConfig、VQBeTSchedulerConfig) # 关键:通过调度器配置的 `build` 方法,将优化器作为参数传入,完成绑定。 lr_scheduler = cfg.scheduler.build(optimizer, cfg.steps) if cfg.scheduler is not None else None return optimizer, lr_scheduler 优化器创建:cfg.optimizer.build(...) 根据配置生成优化器实例(如 Adam、AdamW),并关联模型参数(policy.parameters())。 调度器绑定:cfg.scheduler.build(optimizer, cfg.steps) 调用调度器配置类(如 DiffuserSchedulerConfig)的 build 方法,将优化器实例作为参数传入,生成与该优化器绑定的调度器实例(如 LambdaLR) 更新 在训练过程中,如工程 scripts/train.py 中,调度器被集成到训练循环,通过 scheduler.step() 更新学习率通过调用scheduler.step() 更新学习率。 def train(): # ... 初始化优化器、调度器 ... optimizer, lr_scheduler = make_optimizer_and_scheduler(cfg, policy) for step in range(start_step, cfg.steps): # ... 前向传播、损失计算、反向传播 ... optimizer.step() if lr_scheduler is not None: lr_scheduler.step() # 调用调度器更新学习率 续训恢复 在断点续训是,通过 utils/train_utils.py 中的 save_training_state 和 load_training_state 函数处理断点续训,其中调用了 schedulers.py 的状态管理函数: def save_training_state(step: int, optimizer: Optimizer, lr_scheduler: LRScheduler | None, save_dir: Path): # ... 保存优化器状态 ... if lr_scheduler is not None: save_scheduler_state(lr_scheduler, save_dir) # 调用 schedulers.py 中的保存函数 def load_training_state(checkpoint_path: Path, optimizer: Optimizer, lr_scheduler: LRScheduler | None): # ... 加载优化器状态 ... if lr_scheduler is not None: lr_scheduler = load_scheduler_state(lr_scheduler, checkpoint_path) # 调用 schedulers.py 中的加载函数 return step, optimizer, lr_scheduler
  • lerobot策略优化器

    lerobot策略优化器

    torch.optim简介 在学校lerobot的策略优化器前,我们先再复习一下什么是优化器。 什么优化器 优化器官方解释就是在深度学习中让损失函数通过梯度下降思想逐步调整参数以达到最小损失。 简单理解优化器的就是更新计算参数的,根据损失函数的梯度方向调整模型权重和偏置值,公式为:新参数 = 旧参数 - 学习率 × 梯度。通过迭代逐步逼近最优解。在文章http://www.laumy.tech/2050.html我们已经探讨过常用的优化算法。 接下来我们再来从PyTorch使用的角度复习一下。torch.optim 是 PyTorch 官方优化器模块,提供了 SGD、Adam、AdamW 等主流优化算法的实现,所有的优化器都继承自基类 torch.optim.Optimizer。其核心作用是如下: 自动化参数更新:根据反向传播计算的梯度,按特定优化策略(如 Adam 的自适应学习率)更新模型参数。 统一接口抽象:通过 Optimizer 基类封装不同算法,提供一致的使用流程(zero_grad() 清空梯度 → step() 更新参数)。 在Pytorch中优化器统一封装成torch.optim接口调用,可以有以下优势。 避免重复实现复杂算法:无需手动编写 Adam 的动量、二阶矩估计等逻辑,直接调用成熟接口。 灵活支持训练需求:支持单/多参数组优化(如不同模块用不同学习率)、学习率调度、梯度清零等核心训练逻辑。 工程化与可维护性:通过统一接口管理超参数(lr、weight_decay),便于实验对比与代码复用。 torch.optim怎么用 Step 1:定义模型与优化器 import torch from torch import nn, optim # 1. 定义模型(示例:简单线性层) model = nn.Linear(in_features=10, out_features=2) # 2. 初始化优化器:传入模型参数 + 超参数 optimizer = optim.Adam( params=model.parameters(), # 待优化参数(PyTorch 参数迭代器) lr=1e-3, # 学习率(核心超参数) betas=(0.9, 0.999), # Adam 动量参数(控制历史梯度影响) weight_decay=0.01 # 权重衰减(L2 正则化,可选) ) 定义了一个optimizer使用的是Adam优化器,该优化器学习率设置为1e-3,权重衰减为0.01。 Step 2:训练循环 # 模拟输入数据(batch_size=32,特征维度=10) inputs = torch.randn(32, 10) targets = torch.randn(32, 2) # 模拟目标值 # 训练循环 for epoch in range(10): # ① 清空过往梯度(必须!否则梯度累积导致更新异常) optimizer.zero_grad() # ② 前向传播 + 计算损失 outputs = model(inputs) loss = nn.MSELoss()(outputs, targets) # 均方误差损失 # ③ 反向传播计算梯度 + 优化器更新参数 loss.backward() # 自动计算所有可训练参数的梯度 optimizer.step() # 根据梯度更新参数 print(f"Epoch {epoch}, Loss: {loss.item():.4f}") loss是损失,通过调用loss.backward()进行反向传播Pytorch就自动把梯度计算好保存了,然后调用关键的一部optimizer.step()就可以执行参数更新了(即新参数=旧参数-学习率*梯度),需要注意的是,在调用loss.backward()进行反向传播计算梯度时,要先调用optimizer.zero_grad()把之前的梯度值情况,因此每计算一次梯度都是被保存,不情况会导致梯度累积。 Step 3:差异化优化参数分组 # 定义参数组(不同模块用不同学习率) optimizer = optim.Adam([ { "params": model.backbone.parameters(), # backbone 参数 "lr": 1e-5 # 小学习率微调 }, { "params": model.head.parameters(), # 任务头参数 "lr": 1e-3 # 大学习率更新 } ], betas=(0.9, 0.999)) # 公共超参数(所有组共享) 上面是针对同一个模型内不同模块使用不同的超参数lr。 抽象基类OptimizerConfig OptimizerConfig 是所有优化器配置的抽象基类,通过 draccus.ChoiceRegistry 实现子类注册机制(类似插件系统),为新增优化器类型提供统一接口。 @dataclass class OptimizerConfig(draccus.ChoiceRegistry, abc.ABC): lr: float weight_decay: float grad_clip_norm: float @property def type(self) -> str: return self.get_choice_name(self.__class__) @classmethod def default_choice_name(cls) -> str | None: return "adam" @abc.abstractmethod def build(self) -> torch.optim.Optimizer | dict[str, torch.optim.Optimizer]: raise NotImplementedError 继承关系 class OptimizerConfig(draccus.ChoiceRegistry, abc.ABC): OptimizerConfig 继承abc.ABC和draccus.ChoiceRegistry,前者标记为抽象基类,强制子类实现build的抽象方法,确保接口的一致性。后者提供子类注册机制,通过@OptimizerConfig.register_subclass("名称") 将子类与优化器类型绑定(如 "adam" → AdamConfig),支持配置驱动的动态实例化。 核心属性 lr: float # 学习率(核心超参数) weight_decay: float # 权重衰减(L2正则化系数) grad_clip_norm: float # 梯度裁剪阈值(防止梯度爆炸) 上面3个参数都是优化器的基础配置,避免子类重复定义。 lr/weight_decay:直接传递给 torch.optim 优化器(如 Adam 的 lr 参数) grad_clip_norm:不参与优化器创建,而是在训练流程中用于梯度裁剪(如 train.py 中 torch.nn.utils.clip_grad_norm_) 核心方法 (1)type属性用于表示优化器类型 @property def type(self) -> str: return self.get_choice_name(self.__class__) 通过 draccus.ChoiceRegistry 的 get_choice_name 方法,获取子类注册的优化器类型名称(如 AdamConfig 的 type 为 "adam")。 在实际应用中,在配置解析时,通过 type 字段(如 {"type": "adam"})即可匹配到对应子类(AdamConfig),实现“配置→实例”的自动映射。 (2)default_choice_name默认优化器类型 @classmethod def default_choice_name(cls) -> str | None: return "adam" 当配置中为显式指定type时,默认是用adam类型即AdamConfig,旨在简化用户配置,无需手动指定常见优化器类型。 (3)build抽象接口,优化器创建接口 @abc.abstractmethod def build(self) -> torch.optim.Optimizer | dict[str, torch.optim.Optimizer]: """Build the optimizer. It can be a single optimizer or a dictionary of optimizers.""" raise NotImplementedError 强制子类实现build的方法。 实例化子类 optimizers.py中一共定义了4种优化器配置子类,adam,adamw,sgd, multi_adam,其中前3个是单参数优化器,最后一个是多参数优化器,最终均通过build方法创建torch.optim实例 单参数优化器 @OptimizerConfig.register_subclass("adam") @dataclass class AdamConfig(OptimizerConfig): lr: float = 1e-3 betas: tuple[float, float] = (0.9, 0.999) eps: float = 1e-8 weight_decay: float = 0.0 grad_clip_norm: float = 10.0 def build(self, params: dict) -> torch.optim.Optimizer: kwargs = asdict(self) # 将 dataclass 字段转为字典 kwargs.pop("grad_clip_norm") # 移除梯度裁剪阈值(非优化器参数) return torch.optim.Adam(params, **kwargs) # 创建 PyTorch Adam 实例 AdamConfig 是 OptimizerConfig 的核心子类,封装了 Adam 优化器的配置与实例化逻辑,通过 draccus 注册机制与工程训练流程深度集成。 @OptimizerConfig.register_subclass("adam")将 AdamConfig 类与字符串 "adam" 绑定,实现 配置驱动的动态实例化,当配置文件中 optimizer.type: "adam" 时,draccus 会自动解析并实例化 AdamConfig。继承自 OptimizerConfig 的 ChoiceRegistry 机制,确保子类可通过 type 字段被唯一标识。 @dataclass自动生成 init、repr 等方法,简化超参数管理,无需手动编写构造函数,直接通过类字段定义超参数(如 lr=1e-3)。 在AdamConfig中默认初始化了一些参数值,其中lr、betas、eps、weight_decay直接对应 torch.optim.Adam 的参数,通过 build 方法传递给 PyTorch 优化器,而grad_clip_norm不参与优化器创建,而是用于训练时的梯度裁剪(如 train.py 中 torch.nn.utils.clip_grad_norm_),实现“优化器参数”与“训练流程参数”的职责分离。 在最后的build方法中,调用torch.optim.Adam(params, **kwargs) 实例化优化器。在此之前,先调用asdict(self)将 AdamConfig 实例的字段(如 lr、betas)转换为字典 {"lr": 1e-3, "betas": (0.9, 0.999), ...},再调用kwargs.pop("grad_clip_norm")剔除 grad_clip_norm(梯度裁剪阈值),因其不属于torch.optim.Adam 的参数(优化器仅负责参数更新,梯度裁剪是训练流程的独立步骤)。 多参数优化器 @OptimizerConfig.register_subclass("multi_adam") @dataclass class MultiAdamConfig(OptimizerConfig): optimizer_groups: dict[str, dict[str, Any]] = field(default_factory=dict) # 组内超参数 def build(self, params_dict: dict[str, list]) -> dict[str, torch.optim.Optimizer]: optimizers = {} for name, params in params_dict.items(): # 合并默认超参数与组内超参数(组内参数优先) group_config = self.optimizer_groups.get(name, {}) optimizer_kwargs = { "lr": group_config.get("lr", self.lr), # 组内 lr 或默认 lr "betas": group_config.get("betas", (0.9, 0.999)), "weight_decay": group_config.get("weight_decay", self.weight_decay), } optimizers[name] = torch.optim.Adam(params, **optimizer_kwargs) # 为每组创建独立优化器 return optimizers # 返回优化器字典:{"backbone": optimizer1, "head": optimizer2, ...} MultiAdamConfig 是 OptimizerConfig 的关键子类,专为多参数组优化场景设计,支持为模型不同模块(如 backbone 与 head)创建独立的 Adam 优化器,实现差异化超参数配置。 首先跟前面单参数的属性不同点是多了一个optimizer_groups,这是一个超参数字典,存储多组不同的超参数,示例如下。 optimizer_groups={ "backbone": {"lr": 1e-5, "weight_decay": 1e-4}, # 低学习率微调 backbone "head": {"lr": 1e-3, "betas": (0.95, 0.999)} # 高学习率更新 head,自定义动量参数 } build的方法主要逻辑如下: def build(self, params_dict: dict[str, list]) -> dict[str, torch.optim.Optimizer]: optimizers = {} for name, params in params_dict.items(): # 1. 获取组内超参数(无则使用默认) group_config = self.optimizer_groups.get(name, {}) # 2. 合并默认与组内超参数 optimizer_kwargs = { "lr": group_config.get("lr", self.lr), "betas": group_config.get("betas", (0.9, 0.999)), "eps": group_config.get("eps", 1e-5), "weight_decay": group_config.get("weight_decay", self.weight_decay), } # 3. 为该组创建 Adam 优化器 optimizers[name] = torch.optim.Adam(params, **optimizer_kwargs) return optimizers # 返回:{组名: 优化器实例} 其中params_dict是超参数组的来源,是字典类型,键为参数组名称(需与 optimizer_groups 键匹配),值为该组参数列表(如模型某模块的 parameters())。通常是策略类的get_optim_params方法提供,如下: # 策略类中拆分参数组(示例逻辑) def get_optim_params(self): return { "backbone": self.backbone.parameters(), "head": self.head.parameters() } 主要的核心逻辑是对于每个参数组,优先使用 optimizer_groups 中的超参数(如 group_config.get("lr")),无则回退到默认值(如 self.lr),然后为每个参数组创建独立的 torch.optim.Adam 实例,确保参数更新互不干扰。 优化器状态管理 状态保存 将优化器的某一个时刻参数进行存储,方便过程查看以及重新加载模型训练等等。 def save_optimizer_state( optimizer: torch.optim.Optimizer | dict[str, torch.optim.Optimizer], # 优化器实例或字典 save_dir: Path # 根保存目录 ) -> None: if isinstance(optimizer, dict): # 1. 处理多参数优化器字典(如 MultiAdamConfig 创建的优化器) for name, opt in optimizer.items(): # 遍历优化器名称与实例(如 "backbone": opt1) optimizer_dir = save_dir / name # 创建子目录:根目录/优化器名称(如 save_dir/backbone) optimizer_dir.mkdir(exist_ok=True, parents=True) # 确保目录存在(含父目录创建) _save_single_optimizer_state(opt, optimizer_dir) # 委托单优化器保存逻辑 else: # 2. 处理单参数优化器(如 AdamConfig 创建的优化器) _save_single_optimizer_state(optimizer, save_dir) # 直接使用根目录保存 区分单参数和多参数优化器,对应多参数优化器为每个优化器创建独立子目录(如 save_dir/backbone、save_dir/head),避免不同优化器的状态文件冲突。如果是单优化器,则直接调用 _save_single_optimizer_state,状态文件保存于 save_dir 根目录,结构简洁。 def _save_single_optimizer_state(optimizer: torch.optim.Optimizer, save_dir: Path) -> None: """Save a single optimizer's state to disk.""" # 1. 获取优化器完整状态字典(含参数组和内部状态) state = optimizer.state_dict() # 2. 分离参数组(超参数配置)与剩余状态(张量数据) param_groups = state.pop("param_groups") # 参数组:学习率、权重衰减等超参数(非张量) flat_state = flatten_dict(state) # 剩余状态:动量、二阶矩等张量(展平嵌套字典,便于序列化) # 3. 保存张量状态(safetensors)与参数组(JSON) save_file(flat_state, save_dir / OPTIMIZER_STATE) # 张量数据:高效二进制存储(如 "optimizer_state.safetensors") write_json(param_groups, save_dir / OPTIMIZER_PARAM_GROUPS) # 参数组:JSON 格式(如 "optimizer_param_groups.json"方便可视化查看) 存储张量和非张量的格式 param_groups:包含优化器的超参数配置(如 lr、weight_decay、betas),是列表嵌套字典的结构,存储为JSON格式,JSON 序列化后可直接查看超参数,便于训练过程追溯。其文件名为optimizer_param_groups.json。 state:包含优化器的内部状态张量(如 Adam 的 exp_avg、exp_avg_sq 动量缓冲区),是嵌套字典结构,通过 flatten_dict 展平后用 safetensors 保存,safetensors专为张量设计的存储格式,支持高效读写、内存映射,避免 PyTorch torch.save 的 pickle 兼容性问题。其文件名为optimizer_state.safetensors。 状态加载 def load_optimizer_state( optimizer: torch.optim.Optimizer | dict[str, torch.optim.Optimizer], # 待恢复的优化器(单实例或字典) save_dir: Path # 状态文件根目录 ) -> torch.optim.Optimizer | dict[str, torch.optim.Optimizer]: if isinstance(optimizer, dict): # 1. 处理多优化器字典(如 MultiAdamConfig 创建的优化器) loaded_optimizers = {} for name, opt in optimizer.items(): # 遍历优化器名称与实例(如 "backbone": opt1) optimizer_dir = save_dir / name # 子目录路径:根目录/优化器名称(如 save_dir/backbone) if optimizer_dir.exists(): # 仅当目录存在时加载(避免新增优化器时出错) loaded_optimizers[name] = _load_single_optimizer_state(opt, optimizer_dir) else: loaded_optimizers[name] = opt # 目录不存在时返回原优化器 return loaded_optimizers else: # 2. 处理单优化器(如 AdamConfig 创建的优化器) return _load_single_optimizer_state(optimizer, save_dir) # 直接从根目录加载 同样是区分单参数和多参数,对于多参数组根据save_dir / name 定位每个优化器的独立子目录(与 save_optimizer_state 的保存结构对应),如果是单参数优化器直接调用 _load_single_optimizer_state,从 save_dir 根目录加载状态文件。 def _load_single_optimizer_state(optimizer: torch.optim.Optimizer, save_dir: Path) -> torch.optim.Optimizer: """Load a single optimizer's state from disk.""" # 1. 获取当前优化器的状态字典结构(用于校验与适配) current_state_dict = optimizer.state_dict() # 2. 加载并恢复张量状态(safetensors → 嵌套字典) flat_state = load_file(save_dir / OPTIMIZER_STATE) # 加载展平的张量状态(如 "optimizer_state.safetensors") state = unflatten_dict(flat_state) # 恢复为嵌套字典(与保存时的 flatten_dict 对应) # 3. 处理优化器内部状态(如动量缓冲区) if "state" in state: # 将字符串键转为整数(safetensors 保存时键为字符串,PyTorch 期望参数索引为整数) loaded_state_dict = {"state": {int(k): v for k, v in state["state"].items()}} else: loaded_state_dict = {"state": {}} # 新创建的优化器可能无状态,初始化为空 # 4. 处理参数组(超参数配置,如学习率、权重衰减) if "param_groups" in current_state_dict: # 从 JSON 反序列化参数组,并确保结构与当前优化器匹配 param_groups = deserialize_json_into_object( save_dir / OPTIMIZER_PARAM_GROUPS, # 加载参数组 JSON 文件(如 "optimizer_param_groups.json") current_state_dict["param_groups"] # 以当前参数组结构为模板,确保兼容性 ) loaded_state_dict["param_groups"] = param_groups # 5. 将恢复的状态字典加载到优化器 optimizer.load_state_dict(loaded_state_dict) return optimizer 张量的状态恢复部分,通过 unflatten_dict 将保存时展平的状态(flatten_dict)恢复为嵌套字典,匹配 PyTorch 优化器状态的原始结构。接着通过state["state"] 的键在保存时被序列化为字符串(如 "0"),加载时需转回整数(如 0),以匹配 PyTorch 参数索引的整数类型。 对于参数组恢复先通过JSON 反序列化,deserialize_json_into_object 将 JSON 文件中的参数组配置(如 [{"lr": 1e-3, ...}, ...])反序列化为 Python 对象。再以当前优化器的 current_state_dict["param_groups"] 为模板,确保加载的参数组与当前优化器的参数结构兼容(如参数组数量、超参数字段匹配),避免因配置变更导致的加载失败。 最后合并 state(张量数据)和 param_groups(超参数配置)为完整状态字典,通过 optimizer.load_state_dict 完成优化器状态恢复。 工程调用 创建流程 # 1. 策略提供参数(如多参数组) params = policy.get_optim_params() # 例如:{"backbone": [params1...], "head": [params2...]} # 2. 配置解析:根据 config.optimizer.type 实例化对应子类(如 MultiAdamConfig) cfg.optimizer = MultiAdamConfig( lr=1e-3, optimizer_groups={"backbone": {"lr": 1e-5}, "head": {"lr": 1e-3}} ) # 3. 创建优化器实例 optimizer = cfg.optimizer.build(params) # 返回:{"backbone": Adam, "head": Adam} 训练流程 def update_policy(...): # 前向传播计算损失 loss, output_dict = policy.forward(batch) # 反向传播与梯度裁剪 grad_scaler.scale(loss).backward() grad_scaler.unscale_(optimizer) torch.nn.utils.clip_grad_norm_(policy.parameters(), grad_clip_norm) # 参数更新 grad_scaler.step(optimizer) optimizer.zero_grad() # 清空梯度
  • lerobot训练

    lerobot训练

    初始化 @parser.wrap() def train(cfg: TrainPipelineConfig): cfg.validate() # 验证配置合法性(如路径、超参数范围) init_logging() # 初始化日志系统(本地文件+控制台输出) if cfg.seed is not None: set_seed(cfg.seed) # 固定随机种子(确保训练可复现) device = get_safe_torch_device(cfg.policy.device, log=True) # 自动选择训练设备(GPU/CPU) torch.backends.cudnn.benchmark = True # 启用CuDNN自动优化(加速卷积运算) torch.backends.cuda.matmul.allow_tf32 = True # 启用TF32精度(加速矩阵乘法) 初始化阶段主要是解析参数,初始化日志,确定训练的设备。 参数解析依旧是使用了装饰器parser.wrap,通过命令后参数构建生成TrainPipelineConfig类,该类是LeRobot 框架中训练流程的核心配置类,继承自 HubMixin(支持 Hugging Face Hub 交互),通过 dataclass 定义训练全流程的参数(如数据集路径、模型超参、训练步数等)。其核心作用是: 参数聚合:统一管理数据集、策略模型、优化器、评估等模块的配置,避免参数分散。 合法性校验:通过 validate 方法确保配置参数有效(如路径存在、超参数范围合理)。 可复现性支持:固定随机种子、保存/加载配置,确保训练过程可复现。 Hub 集成:支持从 Hub 加载预训练配置或推送配置至 Hub,便于共享和断点续训。 (1)核心属性 dataset:DatasetConfig, 数据集配置(如 repo_id="laumy/record-07271539"、图像预处理参数) env: envs.EnvConfig,评估环境配置(如仿真环境类型、任务名称,仅用于训练中评估) policy: PreTrainedConfig,策略模型配置(如 ACT 的 Transformer 层数、视觉编码器类型)。 output_dir: Path,训练输出目录(保存 checkpoint、日志、评估视频)。 resume,是否从 checkpoint 续训(需指定 checkpoint_path) seed,随机种子(控制模型初始化、数据 shuffle、评估环境随机性,确保复现)。 num_workers,数据加载线程数,用于加速数据预处理。 batch_size,训练批次大小,即单步输入样本数。 steps,总训练步数,每次参数更新计为1步。 log_freq,日志记录频率,每 200 步打印一次训练指标,如 loss、梯度范数。 eval_freq,评估频率(每 20,000 步在环境中测试策略性能,计算成功率、平均奖励) save_checkpoint,是否保存 checkpoint(模型权重、优化器状态),用于续训。 wandb,Weights & Biases 日志配置(控制是否上传指标、视频至 WandB) use_policy_training_preset,是否使用策略内置的训练预设(如 ACT 策略默认 AdamW 优化器、学习率)。 optimizer,优化器配置(如学习率、权重衰减,仅当 use_policy_training_preset=False 时需手动设置)。 scheduler,学习率调度器配置(如余弦退火,同上)。 (2)核心方法 post_init:初始化实例后设置 checkpoint_path(断点续训时的路径),为后续配置校验做准备。 validate:确保所有配置参数合法且一致,例如续训时校验 checkpoint 路径存在,自动生成唯一输出目录(避免覆盖),强制要求非预设模式下手动指定优化器。 _save_pretrained:将配置保存为 JSON 文件(train_config.json),用于 Hub 共享或本地存储。 from_pretrained::从 Hub 或本地路径加载配置(支持断点续训或复用已有配置)。 数据与模型准备 # 1. 加载离线数据集(如机器人操作轨迹数据) logging.info("Creating dataset") dataset = make_dataset(cfg) # 从HuggingFace Hub或本地路径加载数据集 # 2. 初始化策略模型(如ACT、Diffusion Policy) logging.info("Creating policy") policy = make_policy( cfg=cfg.policy, # 策略配置(如ACT的transformer层数、视觉编码器类型) ds_meta=dataset.meta, # 数据集元信息(输入/输出维度、特征类型) ) # 3. 创建优化器和学习率调度器 logging.info("Creating optimizer and scheduler") optimizer, lr_scheduler = make_optimizer_and_scheduler(cfg, policy) # 默认AdamW优化器 grad_scaler = GradScaler(device.type, enabled=cfg.policy.use_amp) # 混合精度训练梯度缩放器 make_dataset(cfg):根据配置加载数据集(如 laumy/record-07271539),返回 LeRobotDataset 对象,包含观测(图像、关节状态)和动作序列。 make_policy(...):根据 policy.type(如 act)和数据集元信息初始化模型,自动适配输入维度(如图像分辨率、状态维度)和输出维度(如动作空间大小)。 make_optimizer_and_scheduler(cfg, policy):创建优化器(默认AdamW,学习率 1e-5)和调度器(默认无,可配置余弦退火等),支持对不同参数组设置不同学习率(如视觉 backbone 微调)。 数据加载 数据加载调用的是make_dataset函数,其是LeRobot 框架中数据集创建的核心工厂函数,负责根据训练配置(TrainPipelineConfig)初始化离线机器人数据集。它整合了图像预处理、时序特征处理(delta timestamps)和数据集元信息加载,最终返回可直接用于训练的 LeRobotDataset 对象。 根据数据集配置初始化图像预处理管道(如Resize、Normalize、RandomCrop等)。若 cfg.dataset.image_transforms.enable=True(通过命令行或配置文件设置),则创建 ImageTransforms 实例,加载预设的图像变换参数(如分辨率、是否翻转等),否则不进行图像预处理。 image_transforms = ( ImageTransforms(cfg.dataset.image_transforms) if cfg.dataset.image_transforms.enable else None ) (1)单数据集加载 if isinstance(cfg.dataset.repo_id, str): # 加载数据集元信息(特征定义、统计数据、帧率等) ds_meta = LeRobotDatasetMetadata( cfg.dataset.repo_id, root=cfg.dataset.root, revision=cfg.dataset.revision ) # 计算时序偏移(delta timestamps) delta_timestamps = resolve_delta_timestamps(cfg.policy, ds_meta) # 创建 LeRobotDataset 实例 dataset = LeRobotDataset( cfg.dataset.repo_id, # 数据集标识(HuggingFace Hub repo_id 或本地路径) root=cfg.dataset.root, # 本地缓存根目录 episodes=cfg.dataset.episodes, # 指定加载的轨迹片段(如 ["ep001", "ep002"]) delta_timestamps=delta_timestamps, # 时序特征偏移(见下文详解) image_transforms=image_transforms, # 图像预处理管道 revision=cfg.dataset.revision, # 数据集版本(如 Git commit hash) video_backend=cfg.dataset.video_backend, # 视频解码后端(如 "pyav") ) 首先实例化LeRobotDatasetMetadata,加载数据集的信息,包括特征定义如observation.images.laptop 的形状、action的维度等,以及统计信息如图像均值/方差、动作范围,还有帧率fps以便时序偏移计算。 其次调用resolve_delta_timestamps根据模型计算时序特征偏移,例如如果策略需要当前帧及前2帧的观测,则生成[-0.04, -0.02, 0](单位:秒),用于从数据中提取多时序特征。 接着实例化LeRobotDataset,其实现数据加载、时序特征拼接、图像预处理等功能,为悬链提供批次化数据,具体见http://www.laumy.tech/2332.html#h37 (2)多数据集支持 else: raise NotImplementedError("The MultiLeRobotDataset isn't supported for now.") # 以下为预留的多数据集加载代码(暂未实现) dataset = MultiLeRobotDataset( cfg.dataset.repo_id, # 多数据集标识列表(如 ["repo1", "repo2"]) image_transforms=image_transforms, video_backend=cfg.dataset.video_backend, ) logging.info(f"多数据集索引映射: {pformat(dataset.repo_id_to_index)}") 预留多数据集合并功能(如融合不同场景的机器人轨迹数据),目前未实现,直接抛出 NotImplementedError。 (3)imsageNet统计量替换 if cfg.dataset.use_imagenet_stats: for key in dataset.meta.camera_keys: # 遍历所有相机图像特征(如 "observation.images.laptop") for stats_type, stats in IMAGENET_STATS.items(): # IMAGENET_STATS = {"mean": [...], "std": [...]} dataset.meta.stats[key][stats_type] = torch.tensor(stats, dtype=torch.float32) 其目的主要将数据集图像的归一化统计量(均值/方差)替换为 ImageNet 预训练模型的统计量。当使用预训练视觉编码器(如 ResNet)时,用 ImageNet 统计量归一化图像,可提升模型迁移学习效果(避免因数据集自身统计量导致的分布偏移)。 模型加载 # 初始化策略模型(如ACT、Diffusion Policy) logging.info("Creating policy") policy = make_policy( cfg=cfg.policy, # 策略配置(如ACT的transformer层数、视觉编码器类型) ds_meta=dataset.meta, # 数据集元信息(输入/输出维度、特征类型) ) make_policy 是 LeRobot 框架中策略模型实例化的核心工厂函数,负责根据配置(PreTrainedConfig)、数据集元信息(ds_meta)或环境配置(env_cfg),动态创建并初始化策略模型(如 ACT、Diffusion、TDMPC 等)。其核心作用是自动适配策略输入/输出维度(基于数据或环境特征),并支持加载预训练权重或初始化新模型。 在函数中,根据策略类型cfg.type,如 "act"、"diffusion")动态获取对应的策略类,如若 cfg.type = "act",get_policy_class 返回 ACTPolicy 类(ACT 策略的实现)。如果模型需要明确输入特征(如图像、状态)和输出特征(如动作)的维度,需进一步通过数据集或环境解析。 if ds_meta is not None: features = dataset_to_policy_features(ds_meta.features) # 从数据集元信息提取特征 kwargs["dataset_stats"] = ds_meta.stats # 数据集统计量(用于输入归一化,如图像均值/方差) else: if not cfg.pretrained_path: logging.warning("无数据集统计量,归一化模块可能初始化异常") # 无预训练时,缺少数据统计量会导致归一化参数异常 features = env_to_policy_features(env_cfg) # 从环境配置提取特征(如 Gym 环境的观测/动作空间) 如果定义了离线数据进行解析特征,否则基于环境解析特征。 获取到特征后进行配置输入/输出特征。 cfg.output_features = {key: ft for key, ft in features.items() if ft.type is FeatureType.ACTION} cfg.input_features = {key: ft for key, ft in features.items() if key not in cfg.output_features} 将解析后的特征划分为输入特征(如图像、状态)和输出特征(仅动作,FeatureType.ACTION),并更新到策略配置 cfg 中。例如若 features 包含 "observation.images.laptop"(图像)和 "action"(动作),则:input_features = {"observation.images.laptop": ...},output_features = {"action": ...}。 接着对模型进行实例化,也就是预训练模型的加载或新模型的初始化。 if cfg.pretrained_path: # 加载预训练策略(如从 HuggingFace Hub 或本地路径) kwargs["pretrained_name_or_path"] = cfg.pretrained_path policy = policy_cls.from_pretrained(**kwargs) # 调用策略类的 from_pretrained 方法加载权重 else: # 初始化新模型(随机权重) policy = policy_cls(** kwargs) # 传入配置和特征信息初始化模型结构 最后将模型迁移到目标设备,如cuda:0或cpu。 policy.to(cfg.device) # 将模型移至目标设备(如 "cuda:0"、"cpu") assert isinstance(policy, nn.Module) # 确保返回的是 PyTorch 模型 return policy 创建优化器 # 创建优化器和学习率调度器 logging.info("Creating optimizer and scheduler") optimizer, lr_scheduler = make_optimizer_and_scheduler(cfg, policy) # 默认AdamW优化器 grad_scaler = GradScaler(device.type, enabled=cfg.policy.use_amp) # 混合精度训练梯度缩放器 负责初始化训练核心组件:优化器(更新模型参数)、学习率调度器(动态调整学习率)和梯度缩放器(混合精度训练支持)。三者共同构成策略模型的“参数更新引擎”,直接影响训练效率和收敛稳定性。 (1)优化器与调度器创建 通过工厂函数 make_optimizer_and_scheduler 动态创建优化器和学习率调度器,参数来源于配置 cfg 和策略模型 policy。该函数根据 TrainPipelineConfig 中的 optimizer 和 scheduler 配置,或策略预设(use_policy_training_preset=True),生成优化器和调度器。其中优化器默认使用 AdamW(带权重衰减的 Adam),参数包括学习率(cfg.optimizer.lr)、权重衰减系数(cfg.optimizer.weight_decay)等,优化对象为 policy.parameters()(策略模型的可学习参数);而调度器如果配置了 scheduler.type(如 "cosine"),则创建对应学习率调度器(如余弦退火调度器),否则返回 None。 若 cfg.use_policy_training_preset=True(默认),则直接使用策略内置的优化器参数(如 ACT 策略默认 lr=3e-4,weight_decay=1e-4),无需手动配置 optimizer 和 scheduler。 (2)梯度缩放器初始化 GradScaler用于解决低精度(如 float16)训练中的梯度下溢问题。device.type参数模型所在设备类型("cuda"/"mps"/"cpu"),确保缩放器与设备匹配。参数enabled=cfg.policy.use_amp确定是否启用混合精度训练(由策略配置 use_amp 控制)。若为 False,缩放器将禁用(梯度不缩放)。 本质原理是混合精度训练时,前向传播使用低精度(加速计算),但梯度可能因数值过小而下溢(变为 0)。GradScaler 通过梯度缩放(放大损失值 → 梯度按比例放大 → 更新时反缩放)避免下溢,同时保证参数更新精度。 数据加载器配置 # 创建时序感知采样器(针对机器人轨迹数据) if hasattr(cfg.policy, "drop_n_last_frames"): # 如ACT策略需丢弃轨迹末尾帧 sampler = EpisodeAwareSampler( dataset.episode_data_index, # 轨迹索引信息 drop_n_last_frames=cfg.policy.drop_n_last_frames, # 丢弃每段轨迹末尾N帧 shuffle=True, # 轨迹内随机打乱 ) else: sampler = None # 普通随机采样 # 构建DataLoader(多线程加载+内存锁定) dataloader = torch.utils.data.DataLoader( dataset, num_workers=cfg.num_workers, # 数据加载线程数(加速IO) batch_size=cfg.batch_size, # 批次大小 sampler=sampler, pin_memory=device.type != "cpu", # 内存锁定(加速CPU→GPU数据传输) ) dl_iter = cycle(dataloader) # 循环迭代器(数据集遍历完后自动重启) EpisodeAwareSampler:确保采样的batch包含完整轨迹片段(避免时序断裂),适配机器人操作等时序依赖任务。 cycle(dataloader):将DataLoader转换为无限迭代器,支持训练步数(cfg.steps)远大于数据集长度的场景。 采样器选择 hasattr(cfg.policy, "drop_n_last_frames")用于检查模型中是否支持drop_n_last_frames 属性(如 ACT 策略需丢弃每段轨迹的最后 N 帧,避免无效数据)。如果支持,则启用时序感知采样EpisodeAwareSampler。器策略依赖时序连续的轨迹数据(如机器人操作的连贯动作序列)。核心参数如下: dataset.episode_data_index:数据集轨迹索引(记录每段轨迹的起始/结束位置),确保采样时不跨轨迹断裂时序。 drop_n_last_frames=cfg.policy.drop_n_last_frames:丢弃每段轨迹的最后 N 帧(如因传感器延迟导致的无效帧)。 shuffle=True:轨迹内部随机打乱(但保持轨迹内时序连续性),平衡随机性与时序完整性。 若模型策略中不支持时序感知采样,那么则采用普通的随机采样,使用默认随机采样(shuffle=True,sampler=None),DataLoader 直接对数据集全局打乱。 数据加载管道 dataloader = torch.utils.data.DataLoader基于采样器配置,创建 PyTorch DataLoader,实现多线程并行数据加载,为训练循环提供高效的批次数据。 num_workers=cfg.num_workers:使用配置的线程数并行加载数据(如 8 线程),避免数据加载成为训练瓶颈。 pin_memory=True(当使用 GPU 时):将数据加载到固定内存页,加速数据从 CPU 异步传输到 GPU,减少等待时间。 drop_last=False:保留最后一个可能不完整的批次(尤其在小数据集场景,避免数据浪费)。 batch_size:控制每个批次的样本数量。batch_size=8 时,所有张量第一维度为 8。 sampler:控制采样顺序(时序连续/随机)。EpisodeAwareSampler 确保动作序列来自同一段轨迹。 num_workers:控制并行加载的子进程数,影响数据加载速度(非数据结构)。num_workers=8 比单线程加载快 5-10 倍(取决于硬件)。注意其并行只会影响数据加载速度,不会影响训练速度,当前数据加载是持续循环进行,而非一次性完成,但是这个相对训练时间。通过控制并行数据加载进程数,减少 GPU 等待时间,提升整体效率。 dataloader 是 torch.utils.data.DataLoader 类的实例,本质是一个可迭代对象(iterable),用于按批次加载数据。其核心作用是将原始数据集(LeRobotDataset)转换为训练可用的批次化数据,支持多线程并行加载、自定义采样顺序等功能。 DataLoader 通过以下步骤将原始数据集(LeRobotDataset)转换为批次化数据 数据集索引采样:原始数据集 dataset(LeRobotDataset 实例),包含所有机器人轨迹数据。通过 sampler 参数(如 EpisodeAwareSampler 或默认随机采样)生成样本索引序列,决定数据加载顺序。 多线程并行加载:num_workers=cfg.num_workers:启动 num_workers 个子进程并行执行 dataset.getitem(index),从磁盘/内存中加载单个样本数据(如读取图像、解析状态)。 样本拼接:默认行为:DataLoader 使用 torch.utils.data.default_collate 函数,将多个单样本字典(来自不同子进程)拼接为批次字典,对每个特征键(如 "observation.images.laptop"),将所有单样本张量(shape=[C, H, W])堆叠为批次张量(shape=[B, C, H, W]);非张量数据(如列表)会被转换为张量或保留为列表(取决于数据类型)。 内存优化:pin_memory=device.type != "cpu":当使用 GPU 时,启用内存锁定(pin memory),将加载的张量数据存入 CPU 的固定内存页,加速后续异步传输到 GPU 的过程(batch[key].to(device, non_blocking=True)) 循环迭代器 dl_iter = cycle(dataloader) 将 DataLoader 转换为无限迭代器,支持按“训练步数”(cfg.steps)而非“ epochs” 训练。离线训练通常以固定步数(如 100,000 步)为目标,而非遍历数据集次数(epochs)。当数据集较小时,cycle(dataloader) 可在数据集遍历结束后自动重启,确保训练步数达标。 那dataloader输出的批次数据结构长什么样的? 当通过 next(dl_iter) 获取批次数据时(如代码中 first_batch = next(dl_iter)),返回的是一个字典类型的批次数据,结构如下: dl_iter = cycle(dataloader) first_batch = next(dl_iter) for key, value in first_batch.items(): if isinstance(value, torch.Tensor): print(f"{key}: shape={value.shape}, dtype={value.dtype}") else: print(f"{key}: type={type(value)}") 打印如下: observation.images.handeye: shape=torch.Size([8, 3, 480, 640]), dtype=torch.float32 observation.images.fixed: shape=torch.Size([8, 3, 480, 640]), dtype=torch.float32 action: shape=torch.Size([8, 100, 6]), dtype=torch.float32 observation.state: shape=torch.Size([8, 6]), dtype=torch.float32 timestamp: shape=torch.Size([8]), dtype=torch.float32 frame_index: shape=torch.Size([8]), dtype=torch.int64 episode_index: shape=torch.Size([8]), dtype=torch.int64 index: shape=torch.Size([8]), dtype=torch.int64 task_index: shape=torch.Size([8]), dtype=torch.int64 action_is_pad: shape=torch.Size([8, 100]), dtype=torch.bool task: type=<class 'list'> 可以看到一个batch有8组数据,因为TrainPipelineConfig::batch_size设置的8,控制每个批次的样本数量,batch_size定义了每次参数更新是输入模型的样本数量,如这里的8,就是表示输入8个样本更新一次参数。batch_size的设定要根据模型大小、GPU内存、数据特征综合确定。 如果batch_size过小,批次(如 batch_size=1)的梯度受单个样本噪声影响大,导致参数更新方向不稳定,Loss 曲线剧烈震荡(如下图示例),难以收敛到稳定最小值,如果模型使用了BN层,过小的批次会导致BN统计量(均值/方差)估计不准,影响特征表达。同时GPU利用率低,训练速度会变慢; 如果batch_size过大最直接的影响就是GPU内存会溢出,同时会导致收敛速度变慢或陷入次优解。 一般情况下,若数据集样本量少(如仅 1k 样本),可设 batch_size=32(全量数据集的 3%),避免批次占比过大导致过拟合;若模型中含有BN层,batch_size 建议 ≥ 16,确保 BN 统计量稳定。 batch_size 状态 典型问题 解决方案 过大(OOM) GPU 内存溢出,收敛慢 减小批次/降低图像分辨率/梯度累积 过小(<4) Loss 波动大,GPU 利用率低 增大批次至 8-32(需满足内存) 合理(8-32) 梯度稳定,GPU 利用率高(80%-90%) 维持默认或根据模型/数据微调 batch_size 的核心是平衡内存、速度与收敛性,建议从默认值开始,结合硬件条件和训练监控动态调整。 开始训练 训练模式设置 policy.train() 将策略模型切换为训练模式,确保所有层(如 Dropout、BatchNorm)按训练逻辑运行。PyTorch模型模式有差异,分为训练模式和评估模式。 训练模式:启用 Dropout(随机丢弃神经元防止过拟合)、BatchNorm 更新运行时统计量(均值/方差)。 评估模式(policy.eval()):关闭 Dropout、BatchNorm 使用训练阶段累积的统计量。 在训练循环前显式调用,避免因模型残留评估模式导致训练效果异常(如 Dropout 未激活导致过拟合)。 指标跟踪初始化 train_metrics = { "loss": AverageMeter("loss", ":.3f"), # 训练损失(格式:保留3位小数) "grad_norm": AverageMeter("grdn", ":.3f"), # 梯度范数(格式:缩写"grdn",保留3位小数) "lr": AverageMeter("lr", ":0.1e"), # 学习率(格式:科学计数法,保留1位小数) "update_s": AverageMeter("updt_s", ":.3f"), # 单步更新耗时(格式:缩写"updt_s",保留3位小数) "dataloading_s": AverageMeter("data_s", ":.3f"),# 数据加载耗时(格式:缩写"data_s",保留3位小数) } 通过 AverageMeter 类(来自 lerobot.utils.logging_utils)定义需跟踪的核心训练指标,支持实时平均计算和格式化输出训练信息,这个类实例最终通过参数传递给MetricsTracker。AverageMeter 功能为内部维护 sum(累积和)、count(样本数)、avg(平均值),通过 update 方法更新指标,并按指定格式(如 ":.3f")输出。例:每步训练后调用 train_metrics["loss"].update(loss.item()),自动累积并计算平均 loss。 train_tracker = MetricsTracker( cfg.batch_size, # 批次大小(用于计算每样本指标) dataset.num_frames, # 数据集总帧数(用于进度比例计算) dataset.num_episodes, # 数据集总轨迹数(辅助日志上下文) train_metrics, # 上述定义的指标跟踪器字典 initial_step=step # 初始步数(支持断点续训时从上次步数开始跟踪) ) 创建 MetricsTracker 实例(来自 lerobot.utils.logging_utils),用于聚合、格式化和记录所有训练指标,主要的作用如下: 指标更新:训练循环中通过 train_tracker.loss = loss.item() 便捷更新单个指标。 平均计算:自动对 AverageMeter 指标进行滑动平均(如每 log_freq 步输出平均 loss)。 日志输出:调用 logging.info(train_tracker) 时,按统一格式打印所有指标(如 loss: 0.523 | grdn: 1.234 | lr: 3.0e-4)。 断点续训支持:通过 initial_step=step 确保从断点恢复训练时,指标统计不重复计算。 总结下该代码段是训练前的关键初始化步骤,通过 AverageMeter 和 MetricsTracker 构建训练全流程的指标监控框架,为后续训练循环中的指标更新、日志记录和性能调优提供参考。 启动循环训练 logging.info("Start offline training on a fixed dataset") # 启动训练循环,从当前 step(初始为 0 或断点续训的步数)迭代至 cfg.steps(配置文件中定义的总训练步数,如 100,000 步)。 for _ in range(step, cfg.steps): # 1. 加载数据batch batch = next(dl_iter) # 从循环迭代器获取batch batch = {k: v.to(device, non_blocking=True) for k, v in batch.items() if isinstance(v, torch.Tensor)} # 数据移至设备 # 2. 单步训练(前向传播→损失计算→反向传播→参数更新) train_tracker, output_dict = update_policy( train_tracker, policy, batch, optimizer, cfg.optimizer.grad_clip_norm, # 梯度裁剪阈值(默认1.0) grad_scaler=grad_scaler, use_amp=cfg.policy.use_amp, # 启用混合精度训练 ) # 3. 训练状态更新与记录 step += 1 if is_log_step: # 按log_freq记录训练指标(loss、梯度范数等) logging.info(train_tracker) if is_saving_step: # 按save_freq保存模型checkpoint save_checkpoint(checkpoint_dir, step, cfg, policy, optimizer, lr_scheduler) if is_eval_step: # 按eval_freq在环境中评估策略性能 eval_info = eval_policy(eval_env, policy, cfg.eval.n_episodes) # 执行评估 update_policy(...):核心训练函数,实现: -- 前向传播:policy.forward(batch) 计算损失(如动作预测MSE损失+VAE KL散度)。 -- 反向传播:grad_scaler.scale(loss).backward() 缩放损失梯度(混合精度训练)。 -- 梯度裁剪:torch.nn.utils.clip_grad_norm_ 限制梯度范数(防止梯度爆炸)。 -- 参数更新:grad_scaler.step(optimizer) 更新模型参数,optimizer.zero_grad() 清空梯度缓存。 save_checkpoint(...):保存模型权重、优化器状态、学习率调度器状态和当前步数,支持断点续训。 eval_policy(...):在仿真环境中测试策略性能,计算平均奖励、成功率等指标,并保存评估视频。 上面代码是train 函数的核心训练循环,负责执行离线训练的完整流程:从数据加载、模型参数更新,到指标记录、模型保存与策略评估。循环以“训练步数”(step)为驱动,从初始步数(0 或断点续训的步数)运行至目标步数(cfg.steps),确保模型充分训练并实时监控性能。 (1)循环初始化,按步数迭代 for _ in range(step, cfg.steps): 启动训练循环,从当前 step(初始为 0 或断点续训的步数)迭代至 cfg.steps(配置文件中定义的总训练步数,如 100,000 步)。 (2)数据加载与耗时记录 start_time = time.perf_counter() batch = next(dl_iter) # 从无限迭代器获取批次数据 train_tracker.dataloading_s = time.perf_counter() - start_time # 记录数据加载耗时 dl_iter = cycle(dataloader):cycle 将 DataLoader 转换为无限迭代器,数据集遍历完毕后自动重启,确保训练步数达标(而非受限于数据集大小)。 dataloading_s 指标:通过 train_tracker 记录单批次加载时间,用于监控数据加载是否成为训练瓶颈(若该值接近模型更新时间 update_s,需优化数据加载)。 (3)批次数据设备迁移 for key in batch: if isinstance(batch[key], torch.Tensor): batch[key] = batch[key].to(device, non_blocking=True) 将批次中的张量数据(如图像、动作)异步传输到目标设备(GPU/CPU)。其中non_blocking=True:启用异步数据传输,允许 CPU 在数据传输至 GPU 的同时执行后续计算(如模型前向传播准备),提升硬件利用率。 (4)模型参数更新 train_tracker, output_dict = update_policy( train_tracker, policy, batch, optimizer, cfg.optimizer.grad_clip_norm, grad_scaler=grad_scaler, lr_scheduler=lr_scheduler, use_amp=cfg.policy.use_amp, ) 调用 update_policy 函数执行单次参数更新,流程包括: 前向传播:计算模型输出和损失(loss = policy.forward(batch))。 混合精度训练:通过 torch.autocast 启用低精度计算(若 use_amp=True),加速训练并节省显存。 反向传播:梯度缩放(grad_scaler.scale(loss).backward())避免数值下溢,梯度裁剪(clip_grad_norm_)防止梯度爆炸。 参数更新:优化器.step() 更新参数,学习率调度器.step() 动态调整学习率。 指标记录:返回更新后的训练指标(loss、梯度范数、学习率等)。 (5)步数递增与状态跟踪 step += 1 # 步数递增(在更新后,确保日志/评估对应已完成的更新) train_tracker.step() # 更新指标跟踪器的当前步数 step用于更新当前系统的训练步数,是整个训练过程的基础计算器。train_tracker.step()通知训练指标跟踪器进入新阶段。 is_log_step = cfg.log_freq > 0 and step % cfg.log_freq == 0 is_saving_step = step % cfg.save_freq == 0 or step == cfg.steps is_eval_step = cfg.eval_freq > 0 and step % cfg.eval_freq == 0 通过计算这些标志,用于后续的逻辑控制。 is_log_step:日志打印标志位,当配置的日志频率(cfg.log_freq)大于0且当前步数是cfg.log_freq 倍数时激活日志的打印。cfg.log_freq是来自用户的命令行参数的配置。默认是200步打印一次。 is_saving_step:保存标志位,当配置的保存频率相等或是其倍数时激活保存。默认是20000步保存一次。 is_eval_step:模型评估标志为。默认是20000评估一次。 标志通过模块化设计实现了训练过程的精细化控制,参数均来自TrainConfig配置类。 (6)训练指标日志(按频率触发) is_log_step = cfg.log_freq > 0 and step % cfg.log_freq == 0 if is_log_step: logging.info(train_tracker) # 控制台打印平均指标(如 loss: 0.523 | grdn: 1.234) if wandb_logger: wandb_log_dict = train_tracker.to_dict() if output_dict: wandb_log_dict.update(output_dict) # 合并模型输出指标(如策略特定的辅助损失) wandb_logger.log_dict(wandb_log_dict, step) # 上传至 WandB train_tracker.reset_averages() # 重置平均计数器,准备下一轮统计 根据前面计算的is_log_step满足时,默认是200步,则调用logging.info()输出训练的指标,如损失、准确率。如果启动了wandb,则将跟踪器数据转换为字典,合并额外输出(output_dict)后记录到WandB,关联当前步数。最后调用train_tracker.reset_averages() 清除跟踪累计值,为下一周期计数做准备。 (7)模型 checkpoint 保存(按频率触发) if cfg.save_checkpoint and is_saving_step: logging.info(f"Checkpoint policy after step {step}") checkpoint_dir = get_step_checkpoint_dir(cfg.output_dir, cfg.steps, step) save_checkpoint(checkpoint_dir, step, cfg, policy, optimizer, lr_scheduler) # 保存模型、优化器、调度器状态 update_last_checkpoint(checkpoint_dir) # 更新 "last" 软链接指向最新 checkpoint if wandb_logger: wandb_logger.log_policy(checkpoint_dir) # 上传 checkpoint 至 WandB artifacts 默认是启动了save_checkpoint, 每20000步将训练结束的状态进行保存一次,便于支持断点续训和模型版本管理。其中save_checkpoint函数和输出目录结构如下: 005000/ # training step at checkpoint ├── pretrained_model/ │ ├── config.json # 存储模型架构定义,包括网络层数、隐藏维度、激活函数类型等拓扑结构信息 │ ├── model.safetensors # 采用SafeTensors格式存储模型权重,包含所有可学习参数的张量数据,具有内存安全和高效加载特性 │ └── train_config.json # 序列化的训练配置对象,包含超参数(学习率、批大小)、数据路径、预处理策略等完整训练上下文 └── training_state/ ├── optimizer_param_groups.json # 记录参数分组信息,包括不同层的学习率、权重衰减等差异化配置 ├── optimizer_state.safetensors # 保存优化器动态状态,如Adam的一阶矩(momentum)和二阶矩(variance)估计,SGD的动量缓冲区等 ├── rng_state.safetensors # 捕获PyTorch全局RNG和CUDA(如使用)的随机数状态,确保恢复训练时数据采样和权重初始化的一致性 ├── scheduler_state.json #学习率调度器的内部状态,包括当前调度阶段、预热状态、周期信息等 └── training_step.json #当前训练迭代次数,用于精确定位训练数据读取位置 pretrained_dir = checkpoint_dir / PRETRAINED_MODEL_DIR policy.save_pretrained(pretrained_dir) # 保存模型架构和权重 cfg.save_pretrained(pretrained_dir) # 保存训练配置 save_training_state(checkpoint_dir, step, optimizer, scheduler) # 保存训练动态状态 update_last_checkpoint用于维护一个指向最新检查点目录的符号链接(symlink),在训练过程中跟踪和管理最新的模型检查点。 def update_last_checkpoint(checkpoint_dir: Path) -> Path: # 1. 构建符号链接路径:在检查点父目录下创建名为 LAST_CHECKPOINT_LINK 的链接 last_checkpoint_dir = checkpoint_dir.parent / LAST_CHECKPOINT_LINK # 2. 如果符号链接已存在,则先删除旧链接 if last_checkpoint_dir.is_symlink(): last_checkpoint_dir.unlink() # 3. 计算当前检查点目录相对于父目录的相对路径 relative_target = checkpoint_dir.relative_to(checkpoint_dir.parent) # 4. 创建新的符号链接,指向当前检查点目录 last_checkpoint_dir.symlink_to(relative_target) (8)策略评估(按频率触发) is_eval_step = cfg.eval_freq > 0 and step % cfg.eval_freq == 0 if cfg.env and is_eval_step: step_id = get_step_identifier(step, cfg.steps) logging.info(f"Eval policy at step {step}") with torch.no_grad(), torch.autocast(...) if use_amp else nullcontext(): eval_info = eval_policy(eval_env, policy, cfg.eval.n_episodes, ...) # 在环境中执行策略评估 # 记录评估指标(平均奖励、成功率、耗时)并上传至 WandB eval_tracker = MetricsTracker(...) eval_tracker.avg_sum_reward = eval_info["aggregated"]["avg_sum_reward"] ... logging.info(eval_tracker) if wandb_logger: wandb_logger.log_dict(...) wandb_logger.log_video(...) # 上传评估视频 在强化学习和机器人控制领域,策略评估(Policy Evaluation) 是指在特定环境中系统性测试智能体策略(Policy)性能的过程。它通过执行预设数量的评估回合,收集关键指标(如奖励、成功率、执行时间等),客观衡量策略的实际效果。总结一下就是有以下4个核心作用。 性能监控:跟踪训练过程中策略性能的变化趋势,判断模型是否收敛或退化 过拟合检测:通过独立评估集验证策略泛化能力,避免在训练数据上过拟合 决策依据:基于评估指标决定是否保存模型、调整超参数或终止训练 行为分析:通过可视化记录(如代码中的评估视频)观察策略执行细节,发现异常行为 负责在指定训练步骤对策略进行系统性评估,并记录关键指标与可视化结果。代码块实现了周期性策略评估机制,当满足环境配置(cfg.env)和评估步骤标志(is_eval_step)时才会触发。 按 cfg.eval_freq(这里默认是20000步)在环境中评估策略性能,核心功能: 无梯度推理:torch.no_grad() 禁用梯度计算,节省显存并加速评估。 指标计算:通过 eval_policy 获取平均奖励(avg_sum_reward)、成功率(pc_success)等关键指标。 可视化:保存评估视频(如机器人执行任务的轨迹)并上传至 WandB,直观观察策略行为。 训练更新 训练环境准备 device = get_device_from_parameters(policy) policy.train() with torch.autocast(device_type=device.type) if use_amp else nullcontext(): loss, output_dict = policy.forward(batch) 先调用get_device_from_parameters从模型参数自动推断当前计算设备,确保后续张量操作与模型参数在同医社保上,避免跨设备数据传输错误。 接着调用policy.train()将模型切换为训练模式,主要是启动dropout层随机失活功能,激活BatchNormalization层的移动平均统计更新等,与评估模式区别推理时需要调用policy.eval()。 最后使用了with预计用于创建一个上下文管理器,在进入代码块是调用管理器enter()方法,退出时调用exit()方法,这种机制确保资源被正确获取和释放或在特定上下文中执行代码,其代码等价于如下。 if use_amp: context_manager = torch.autocast(device_type=device.type) else: context_manager = nullcontext() with context_manager: loss, output_dict = policy.forward(batch) 也就是当条件激活use_amp启用混合精度训练,否则使用空上下文,torch.autocast会动态选择最优精度,对数值稳定性要求高的操作保留FP32。 最后就是调用loss, output_dict = policy.forward(batch)这是模型前向计算的核心函数,返回的是损失值和输出字典。 batch包含的是训练数据(图像、关节动作等),policy.forward()处理输入并生成预测值并计算与真实值的差距loss,然后loss将用于后续梯队的计算。 这里在总结一下什么是混合精度训练? 混合精度训练(Mixed Precision Training)主要指 FP16(半精度浮点数)与 FP32(单精度浮点数)的混合使用。 FP16:用于模型前向/反向传播的计算(如矩阵乘法、激活函数等),占用内存少(仅为 FP32 的一半),且支持 GPU 硬件加速(如 NVIDIA Tensor Cores)。 FP32:用于存储模型参数、梯度和优化器状态,避免低精度导致的数值精度损失(如梯度下溢、参数更新不稳定)。 默认是使用F32精度进行,因为使用低精度FP16可以加速训练速度(计算量小,可以并发更多数据)、降低内存(仅占用FP32一半的内存)等好处,所以代码中如果use_amp=True,将启动混合精度训练。但是低精度也有个坏处就是数值精度会损失导致梯度为0参数不稳定,但是可以通过GradScaler进行放大loss进而放大梯度,反向传播计算完了之后再反缩放回来,确保梯度进行不丢失。 梯队计算 grad_scaler.scale(loss).backward() grad_scaler.unscale_(optimizer) grad_norm = torch.nn.utils.clip_grad_norm_( policy.parameters(), grad_clip_norm, error_if_nonfinite=False, ) 这段代码是训练处理计算梯队的核心流程,主要就是用于计算梯队。 首先grad_scaler.scale(loss).backward()是将损失放大然后间接放大梯队,grad_scaler 是 PyTorch 的 GradScaler 实例,用于自动混合精度训练中管理梯度缩放,scale(loss)将损失值放大scaler倍(通常是2^n),避免梯度在反向传播中因数值过小而下溢为0。在混合精度训练(如使用 FP16)时,小梯度可能因数值精度不足而“下溢”(变为 0)。通过 scale(loss) 将损失值放大(例如放大 2^k 倍),反向传播时梯度也会同步放大,避免梯度下溢。最后backward()触发反向传播,计算所有可训练参数的梯度(此时梯度已被放大)。 其次再调用grad_scaler.unscale_(optimizer)将放大的梯度恢复到原始尺寸,由于损失被放大,方向传播的梯队也被同等放大,使用unscale_对所有参数梯度执行反缩放,相当于处于scaler。 最后是调用 torch.nn.utils.clip_grad_norm_执行梯度裁剪(Gradient Clipping),限制梯度的 L2 范数,防止梯度爆炸。其返回结果grad_norm是一个标量float,表示 梯度裁剪后所有参数梯度的总 L2 范数。 总结一下,段代码是混合精度训练中梯度处理的标准流程,解决了两个核心问题。 数值精度问题:通过 scale(loss) 和 unscale_(optimizer) 确保小梯度在低精度(如 FP16)下不丢失,同时恢复梯度原始范围用于后续优化。 梯度爆炸问题:通过 clip_grad_norm_ 限制梯度大小,避免过大梯度导致参数更新不稳定(例如权重跳变、Loss 震荡)。 但需要注意的是,如果没有启动混合精度grad_scaler.scale(loss)和grad_scaler.unscale_(optimizer)没有缩放和反缩放作用,但梯队采集逻辑正常运作。为启动混合精度时整个流程与标准 FP32 训练完全一致,grad_scaler 相关操作因「禁用状态」而自动失效,不会引入额外计算开销。 参数优化与更新 with lock if lock is not None else nullcontext(): grad_scaler.step(optimizer) grad_scaler.update() optimizer.zero_grad() if lr_scheduler is not None: lr_scheduler.step() 先试用with lock条件性启动线程锁,确保参数更新的线程安全。 grad_scaler.step(optimizer)执行参数更新,为了下一次重新计算损失。 接着调用grad_scaler.update()动态调整梯度的缩放因子,主要是自动平衡FP16的数值范围限制,在避免梯度下溢和溢出之间找到最优缩放比例。 最后就是调用optimizer.zero_grad()清除优化器中所有参数梯度缓存,因为Pytorch梯队计算默认都是累积模式(param.grad会累加),需要手动清零与loss.backward()配对,形成成"清零→前向→反向→更新→清零"的完整循环。 lr_scheduler.step()是按照批次更新学习率,可以使用循环学习率、余弦退化调度以及梯队的自适应调度等策略。 对于训练的闭环可以看成:scale(loss)→backward()→unscale_()→clip_grad_norm_()→step()→update()形成完整的混合精度训练流程,解决FP16数值范围限制问题。 最后update_policy返回的是train_metrics和output_dict。前者是传递给日志系统(如TensorBoard/WandB)进行可视化,后者是包含模型前向传播的详细输出(如预测值、中间特征),可用于后续分析 训练状态维护 if has_method(policy, "update"): policy.update() train_metrics.loss = loss.item() train_metrics.grad_norm = grad_norm.item() train_metrics.lr = optimizer.param_groups[0]["lr"] return train_metrics, output_dict 如果policy有update的方法,则调用进行更新,这里主要是做兼容性设计。 最后train_metrics主要是记录量化训练过程中的关键特征,为监控、调试和优化提供数据支撑。 训练收尾 # 训练结束后清理 if eval_env: eval_env.close() # 关闭评估环境(释放资源) logging.info("End of training") # 推送模型至HuggingFace Hub(若启用) if cfg.policy.push_to_hub: policy.push_model_to_hub(cfg) # 保存模型配置、权重至Hub,支持后续部署 eval_env.close():关闭仿真环境(如Gym/DM Control),释放显存和CPU资源。 policy.push_model_to_hub(cfg):将训练好的模型(权重+配置)推送至HuggingFace Hub,支持跨设备共享和部署。 总结
  • lerobot录制

    lerobot录制

    简介 lerobot record是关键核心流程,其包括了数据的采集和模型推理两部分。 如果是数据采集模式,命令启动如下 python -m lerobot.record \ --robot.disable_torque_on_disconnect=true \ --robot.type=so101_follower \ --robot.port=/dev/ttyACM0 \ --robot.id=R12252801 \ --robot.cameras="{ handeye: {type: opencv, index_or_path: 6, width: 640, height: 480, fps: 30}, fixed: {type: opencv, index_or_path: 0, width: 640, height: 480, fps: 30}}" \ --teleop.type=so101_leader \ --teleop.port=/dev/ttyACM1 \ --teleop.id=R07252608 \ --dataset.repo_id=${HF_USER}/record-07271148\ --dataset.num_episodes=10 \ --dataset.reset_time_s=5 \ --dataset.push_to_hub=false \ --dataset.single_task="Grab the cube" \ --display_data=true 如果是模型推理模式,则命令如下: python -m lerobot.record \ --robot.type=so101_follower \ --robot.disable_torque_on_disconnect=true \ --robot.port=/dev/ttyACM0 \ --robot.cameras="{ handeye: {type: opencv, index_or_path: 6, width: 640, height: 480, fps: 30}, fixed: {type: opencv, index_or_path: 0, width: 640, height: 480, fps: 30}}" \ --robot.id=R12252801 \ --display_data=false \ --dataset.single_task="Put brick into the box" \ --policy.path=outputs/weigh_07280842/pretrained_model \ --dataset.episode_time_s=240 \ --dataset.repo_id=${HF_USER}/eval_so101_07271148 默认录制时长是60s,60S后会停止,如果要改长加上--dataset.episode_time_s=640 主要的区别是如果是采集模式需要使用--teleop参数启动遥控机器,如果是模型推理模式则不需要启动遥控机器,但是需要指定模型路径--policy.path,本质上就是机器人的动作指令来源于哪里,要么来之遥控器的,要么来自模型推理出来的。 在阅读本文前,这里先做一个总结: 从上图可以看出整个录制流程主要围绕机器设备、遥控设备、模型、数据集四个要素进行展开。 机器:有SO101Follower、LeKiwi等机器,都继承Robot类。通过命令行参数robot.type调用make_robot_from_config函数选择创建具体的实例设备,函数返回的还是Robot但是指向的是具体的机器实例如SO101Follower,利用了多态的特性做到了解耦,如果要新添机器时,只需要参考SO101Follower添加一个新的设备即可。在创建机器实例时传递RobotConfig参数,这个参数依旧是抽象基类,其继承了draccus.ChoiceRregistry,通过命令行参数robot.type选择注册具体的配置如SO101FollowerConfig。 遥控:用于控制机器,常用于数据的采集。这里同样通过命令行参数teleop.type调用make_teleoperator_from_config函数选择创建具体的设备实例,创建实例时需要传递TeleoperatorConfig参数,其也是一个抽象基类,基于命令选择注册实例化的配置类参数,如SO101LeaderConfig。 模型:模型用于决策推理动作,其和遥控二选一,如果指定了遥控了,模型就不需要指定了。通用使用了机器、遥控的解耦机制,具体的实例化为ACT或DiffusionPolicy等。 数据:通过参数dataset.xxx将参数构建为DataRecordConfig类,然后将其中的信息传递给LeRobotDataset。 lerobot的整个代码框架将具体的设备、配置、模型等做到了解耦,方便拓展新的设备,其设计思想值得借鉴。下面本文将先按照执行命令启动流程可以分为参数解析、硬件设备初始化与连接、数据集初始化、采集循环、数据保存等几个阶段,接下来将按照这几个阶段进行介绍。 关键配置类 在介绍关键流程时,先来总结一下关键record流程需要的配置类数据结构,这些Config类主要用于机器、遥控、模型实例化传递的参数。 RecordConfig RecordConfig是整个record入口函数的传递参数,其通过命令行的参数构建形成RecordConfig对象传递给函数。 class RecordConfig: # 1. 核心以来配置 # 机器人硬件配置如型号、端口、相机参数等,有RobotConfig类定义 # 如s101_folloer的通信端口、关节限制等,必现通过命令行或代码显式传入。 robot: RobotConfig #数据集录制配置,如repo_id,num_episodes、fpsdeng,必现显式传入。 dataset: DatasetRecordConfig # 2.控制方式配置(可选) # 遥控操作器配置,如So100_leader,可选。 teleop: TeleoperatorConfig | None = None # 预训练策略配置,如模型路径、推理设备等。 policy: PreTrainedConfig | None = None #3. UI与反馈配置(可选) # 是否实时显示相机画面,通过rerun可视化工具 display_data: bool = False # 是否启用语音合成反馈,人机的提示声,默认开启。 play_sounds: bool = True # 是否从现有数据集续录 resume: bool = False def __post_init__(self): # 如果指定了policy,则进行加载模型 policy_path = parser.get_path_arg("policy") if policy_path: cli_overrides = parser.get_cli_overrides("policy") self.policy = PreTrainedConfig.from_pretrained(policy_path, cli_overrides=cli_overrides) self.policy.pretrained_path = policy_path # teleop和policy必须要指定一个 if self.teleop is None and self.policy is None: raise ValueError("Choose a policy, a teleoperator or both to control the robot") @classmethod def __get_path_fields__(cls) -> list[str]: """This enables the parser to load config from the policy using `--policy.path=local/dir`""" return ["policy"] RecordConfig中有几个关键的成员,分别是RobotConfig,DatasetRrcordConfig、TeleoperatorConfig、PreTrainedConfig。其中除了DatasetRrcordConfig外的其他几个都是继承draccus.ChoiceRegistry 和 abc.ABC,是一个抽象基类,需通过注册的子类(如特定机器人型号的配置类)实例化,种设计既保证了配置的结构化(继承 abc.ABC),又支持灵活的子类选择(通过 draccus.ChoiceRegistry 实现配置注册与解析)。 RobotConfig 控制硬件接口,确保机器人正确连接和数据采集; DatasetRecordConfig 控制数据存储,定义数据集格式和元信息; TeleoperatorConfig 和 PreTrainedConfig 控制机器人行为,分别对应手动和自动控制模式。 RobotConfig #标记为数据类,所有字段必现通过关键参数传入,自动生成__init__等方法 @dataclass(kw_only=True) #继承draccus框架选择注册机制,允许子类如SO100FollowerConfig、KochFollowerConfig #作为可选机器人型号注册,支持通过配置文件或命令行参数 #动态选择例如--robot.type=s101_follower #abc.ABC抽象基类,不可直接实例化,必现通过子类具体机器人型号配置使用。 class RobotConfig(draccus.ChoiceRegistry, abc.ABC): # 机器的唯一标识实例 id: str | None = None # 标定文件存储目录 calibration_dir: Path | None = None #是 dataclass 的初始化后钩子,用于补充参数校验逻辑,确保机器人配置的合法性。 #主要是检查cameras中的宽高、帧率等。 def __post_init__(self): if hasattr(self, "cameras") and self.cameras: for _, config in self.cameras.items(): for attr in ["width", "height", "fps"]: if getattr(config, attr) is None: raise ValueError( f"Specifying '{attr}' is required for the camera to be used in a robot" ) #通过 draccus.ChoiceRegistry 的 get_choice_name 方法,动态返回子类的注册名称(即机器人型号)。 @property def type(self) -> str: return self.get_choice_name(self.__class__) RobotConfig 是抽象基类(ABC),仅定义所有机器共有的通用配置字段,如id、calibration_dir。其继承了draccus.ChoiceRegistry实现了不同机器人型号的动态注册与选择。 下面以一个继承实例说明 @RobotConfig.register_subclass("so101_follower") @dataclass class SO101FollowerConfig(RobotConfig): # 机器的通信端口,如/dev/ttyACM0,通过--robot.port命令传入 port: str # 断开连接时是否关闭电机扭矩,通过命令--robot.disable_torque_on_disconnect disable_torque_on_disconnect: bool = True # 电机相对位置目标安全上限,防止运动幅度过大。 max_relative_target: int | None = None # 相机的配置通过字典的方式。 cameras: dict[str, CameraConfig] = field(default_factory=dict) #--robot.cameras="{ handeye: {type: opencv, index_or_path: 6, width: 640, height: 480, fps: 30}, fixed: {type: opencv, index_or_path: 0, width: 640, height: 480, fps: 30}} # 是否使用角度制而非弧度制 use_degrees: bool = False 这是框架自定义的子类注册装饰器,作用是将 SO101FollowerConfig 类与字符串 so101_follower 绑定,使其成为 RobotConfig 的可选型号。装饰器内部会将 SO101FollowerConfig 类添加到 RobotConfig 的“子类注册表”中,键为 so101_follower,值为类本身。当用户通过命令行传入 --robot.type=so101_follower 时,框架会从注册表中查找并实例化该类。 总结一下,RobotConfig是机器的硬件配置,但是其是一个虚拟基类,其具体实例的机器型号通过子类继承RobotConfig,有很多型号的子类,其巧妙的使用了draccus.ChoiceRegistry注册机制(RobotConfig继承),通过参数--robot.type来指定具体实例化的设备。同时这里也使用了多态的特性,通过统一的RobotConfig接口操作不同实例的具体实现,如通过 robot.type 属性(基类定义),可以动态判断具体子类类型,并执行对应逻辑。 if cfg.robot.type == "so101_follower": # 通过基类接口获取子类类型 print(f"SO101 特有端口: {cfg.robot.port}") # 访问子类特有属性 elif cfg.robot.type == "so100_follower": print(f"SO100 特有IP: {cfg.robot.ip_address}") # 另一子类的特有属性 DatasetRecordConfig class DatasetRecordConfig: #1. 数据集标识与存储 #数据集唯一标识符,格式为 '{hf_username}/{dataset_name}' (e.g. `lerobot/test`),用于定位hugging Face Hub仓库或本地路径 repo_id: str #录制任务的文字描述,用于标注数据用途。 single_task: str # 数据集本地存储根目录,如果未指定使用~/.cache/huggingface/datasets root: str | Path | None = None # 2.录制控制参数 # 录制的帧率,控制数据采集帧率。 fps: int = 30 # 单段录制时长,默认60S episode_time_s: int | float = 60 # 重置环境时长,两端录制之间预留环境重置时间。 reset_time_s: int | float = 60 # 总录制段数,控制数据集包含样本数量(如50段*60S=3000S总数据) num_episodes: int = 50 #3. 数据处理与上传 # 是否将录制图像帧编码为视频文件,默认开启 video: bool = True # 是否上传数据集到hugging Face Hub,默认自动上传。 push_to_hub: bool = True # Hub仓库是否设置为私有,默认是公开 private: bool = False # 上传到hub上的数据集标签 tags: list[str] | None = None #4. 图像存储性能参数 #图像写入的线程数 num_image_writer_processes: int = 0 #每个相机图像写入的线程数 num_image_writer_threads_per_camera: int = 4 def __post_init__(self): if self.single_task is None: raise ValueError("You need to provide a task as argument in `single_task`.") #__post_init__ 是 dataclass 的初始化后钩子方法,在 __init__ 初始化所有字段后自动执行, # 用于补充校验逻辑。 # 确保 single_task 字段不为空。因为 single_task 是描述录制任务的核心元数据(无默认值且必选), # 若未提供则直接抛出错误,避免后续数据标注缺失关键信息。 DatasetRecordConfig 是数据集录制任务的参数容器,被嵌套在 RecordConfig 中(作为 RecordConfig.dataset 字段),最终通过 parser.wrap() 装饰器从命令行参数解析生成实例。 例如,用户通过命令行指定: --dataset.repo_id=aliberts/record-test --dataset.num_episodes=2 --dataset.single_task="Grab the cube" 这些参数会被解析为 DatasetRecordConfig 实例,其字段值(如 num_episodes=2)直接控制录制逻辑(如 record_loop 函数的循环次数)。 TeleoperatorConfig @dataclass(kw_only=True) class TeleoperatorConfig(draccus.ChoiceRegistry, abc.ABC): # Allows to distinguish between different teleoperators of the same type id: str | None = None # Directory to store calibration file calibration_dir: Path | None = None @property def type(self) -> str: return self.get_choice_name(self.__class__) TeleoperatorConfig 是远程遥控操作器(如手柄、键盘、 leader 机器人)的抽象配置基类,用于定义所有遥操作器共有的通用配置字段和动态选择机制。它与RobotConfig类似,同样继承了draccus.ChoiceRegistry 实现了注册机制,其具体的子类又继承TeleoperatorConfig。 @TeleoperatorConfig.register_subclass("so101_leader") @dataclass class SO101LeaderConfig(TeleoperatorConfig): # Port to connect to the arm port: str use_degrees: bool = False 用户通过-teleop.type指定来实例化具体的操作设备,如--teleop.type=so101_leader时,具体的流程如下。 框架通过 draccus.ChoiceRegistry 查找注册名称为 so101_leader 的子类(如 SO101LeaderConfig); 实例化该子类,接收命令行参数(如 --teleop.port=/dev/ttyACM0)并初始化特有字段(如 port); 最终通过 TeleoperatorConfig 基类引用(如 cfg.teleop)传入 make_teleoperator_from_config 函数,创建具体遥操作器实例。 PreTrainedConfig @dataclass class PreTrainedConfig(draccus.ChoiceRegistry, HubMixin, abc.ABC): #1. 观测与特征配置 # 策略输入的观测部署,如=2表示输入当前+前1步观测 n_obs_steps: int = 1 #特征归一化模式映射{"image":"mead_std"} normalization_mapping: dict[str, NormalizationMode] = field(default_factory=dict) #输入特征的规范{"image": PolicyFeature(type=VISUAL, shape=(3, 224, 224))}) input_features: dict[str, PolicyFeature] = field(default_factory=dict) #输出特征规范输出特征规范(定义策略输出的特征类型,如 {"action": PolicyFeature(type=ACTION, shape=(6,))}) output_features: dict[str, PolicyFeature] = field(default_factory=dict) #2. 设备与性能配置 # 策略运行设备如是否使用cuda或cpu device: str | None = None # cuda | cpu | mp #是否启用自动混合精度训练 use_amp: bool = False #3. hugging face Hub继承 #是否将配置上传到Hub push_to_hub: bool = False #Hub的id repo_id: str | None = None # 仓库是否私有 private: bool | None = None # 仓库的标签 tags: list[str] | None = None # Add tags to your policy on the hub. license: str | None = None #方法在__init__后执行,处理运行设备选择和AMP的可用性 def __post_init__(self): self.pretrained_path = None #自动选择可用设备,如果用户指定的设备不可用。 if not self.device or not is_torch_device_available(self.device): auto_device = auto_select_torch_device() logging.warning(f"Device '{self.device}' is not available. Switching to '{auto_device}'.") self.device = auto_device.type #自动禁用不支持AMP if self.use_amp and not is_amp_available(self.device): logging.warning( f"Automatic Mixed Precision (amp) is not available on device '{self.device}'. Deactivating AMP." ) self.use_amp = False #通过 draccus.ChoiceRegistry 的 get_choice_name 方法,返回子类注册的策略型号名称(如 sac、tdmpc),用于日志打印和策略选择逻辑 @property def type(self) -> str: return self.get_choice_name(self.__class__) #抽象方法 observation_delta_indices:返回观测特征的差分索引 action_delta_indices : 返回动作特征的差分索引 reward_delta_indices : 返回奖励特征的差分索引 et_optimizer_preset() : 返回优化器配置 get_scheduler_preset() : 返回学习率调度器配置(如 CosineAnnealing)。 validate_features() :校验 input_features 和 output_features 的合法性(如形状匹配) #将策略配置实例(含超参数、特征规范、设备设置等)序列化 #为标准 JSON 文件 config.json,存储到指定目录。这是策略配 #置上传到 Hugging Face Hub 的前置步骤——Hub 要求模型/配 #置必须包含 config.json 以描述其参数,而 _save_pretrained 正 #是生成该文件的标准化实现。例如,当调用 #config.push_to_hub() 时,框架会先调用 _save_pretrained 将 #配置保存到临时目录,再将该目录上传到 Hub,最终用户可通 #过 PreTrainedConfig.from_pretrained("repo_id") 加载此 JSON #配置。 def _save_pretrained(self, save_directory: Path) -> None: with open(save_directory / CONFIG_NAME, "w") as f, draccus.config_type("json"): draccus.dump(self, f, indent=4) #from_pretrained 是 PreTrainedConfig 的 核心类方法,用于从 #本地目录 或 Hugging Face Hub 加载预训练策略的配置文件 #(config.json),并实例化为具体的配置对象(如 SACConfig、 #TDMPCConfig)。它是策略配置“复用与共享”的入口,支持通过 #命令行参数覆盖配置,实现灵活的参数调整。 @classmethod def from_pretrained( cls: Type[T], pretrained_name_or_path: str | Path, *, force_download: bool = False, resume_download: bool = None, proxies: dict | None = None, token: str | bool | None = None, cache_dir: str | Path | None = None, local_files_only: bool = False, revision: str | None = None, **policy_kwargs, ) -> T: model_id = str(pretrained_name_or_path) config_file: str | None = None #从本地目录加载 if Path(model_id).is_dir(): if CONFIG_NAME in os.listdir(model_id): config_file = os.path.join(model_id, CONFIG_NAME) else: print(f"{CONFIG_NAME} not found in {Path(model_id).resolve()}") #从hugging face hub下载 else: try: config_file = hf_hub_download( repo_id=model_id, filename=CONFIG_NAME, revision=revision, cache_dir=cache_dir, force_download=force_download, proxies=proxies, resume_download=resume_download, token=token, local_files_only=local_files_only, ) except HfHubHTTPError as e: raise FileNotFoundError( f"{CONFIG_NAME} not found on the HuggingFace Hub in {model_id}" ) from e # HACK: this is very ugly, ideally we'd like to be able to do that natively with draccus # something like --policy.path (in addition to --policy.type) cli_overrides = policy_kwargs.pop("cli_overrides", []) with draccus.config_type("json"): return draccus.parse(cls, config_file, args=cli_overrides) PreTrainedConfig是所有策略模型如ACT,TDMPC的抽象配置类,定义了策略训练/推理所需的通用参数,特征规范、设备配置及Hugging Face Hub交互机制。它通过 dataclass、draccus.ChoiceRegistry 和 abc.ABC 实现“配置标准化”“多策略兼容”和“Hub 集成”,是策略初始化的核心参数载体。 draccus.ChoiceRegistry:通用跟前面的RobotConfig类似,支持动态子类注册,允许子类(如 SACConfig、TDMPCConfig)作为“策略选项”注册,支持通过 --policy.type=sac 动态选择。 HubMixin:Hugging Face Hub 交互混入类,提供 from_pretrained(从 Hub/本地加载配置)和 _save_pretrained(保存配置到 Hub/本地)方法,实现策略配置的共享与复用。 abc.ABC:抽象基类,包含未实现的抽象方法(如 get_optimizer_preset),强制子类必须实现核心逻辑,确保策略配置的完整性。 参数解析 输入的参数会draccus 解析器读取所有 --xxx 参数,映射到 RecordConfig 类(定义在 record.py 中),生成结构化配置对象 RecordConfig类型的cfg实例,关键的配置项如下: robot.type=so101_follower:指定机器人类型为 so101_follower(从动机器人)。 teleop.type=so101_leader:指定遥操作器类型为 so101_leader(主动遥操作器)。 dataset.num_episodes=10:采集10个回合数据。 display_data=true:启用 Rerun 可视化工具显示摄像头画面和机器人状态。 RecordConfig的cfg实例构造是,会调用RecordConfig.post_init 检查如single_task,--teleop.type以及-policy.type必现要选择一个。因为录制要么就是验证模式,要么就是数据采集模式。验证模式就是通过大模型推理得到的动作数据,而采集模式通过遥控臂得到的数据控制设备。 硬件初始化与连接 机器人的初始化so101_follower robot = make_robot_from_config(cfg.robot) robot.connect() 传入的参数是cfg.robot,cfg.robot是一个基类,实际实例化为so101_follower的实例,这里使用了多态的特性。根据传入的参数--robot.port=/dev/ttyACM0 连接到机器人串口,初始化通信协议(如 ROS 或自定义串口协议)。根据 --robot.cameras 配置两个 OpenCV 摄像头。--robot.disable_torque_on_disconnect=true 确保程序退出时机器人断电,避免碰撞风险。 遥控机器初始化so101_leader teleop = make_teleoperator_from_config(cfg.teleop) if cfg.teleop is not None else None teleop.connect() 遥控机器初始化是可选的,如果指定了模型即是验证的方式,那么就不用启动遥控机器了,只有采集数据的时候才需要遥控机器。 总结一下,通过make_robot_from_config根据传入的robot.type创建一个机器实例,这里是class SO101Follower(Robot)。如果是采集模式还会创建一个遥控机器人实例,通过make_teleoperator_from_config生成实例class SO101Leader(Teleoperator)。 数据集创建 数据特征定义 # 动作特征->数据集动作特征 action_features = hw_to_dataset_features(robot.action_features, "action", cfg.dataset.video) # 观测特征->数据集观测特征 obs_features = hw_to_dataset_features(robot.observation_features, "observation", cfg.dataset.video) # 整个动作特征、观测特征 dataset_features = {**action_features, **obs_features} from pprint import pprint print("Action Features:") pprint(action_features) print("Observation Features:") pprint(obs_features) print("Dataset Features:") pprint(dataset_features) 数据应该长什么样? 这总的要格式要求吧? hw_to_dataset_features就是将机器硬件特征(电机位置、摄像头图像)转换为数据集特征描述,也就是说数据要按照这个格式来。格式包含数据类型dtype、形状shape等。下面根据直接打印action_features、obs_features、dataset_features的打印的结果来分别分析。 Action Features: { 'action': { 'dtype': 'float32', 'shape': (6,), 'names': ['shoulder_pan.pos', 'shoulder_lift.pos', 'elbow_flex.pos', 'wrist_flex.pos', 'wrist_roll.pos', 'gripper.pos'] } } Observation Features: { 'observation.state': { # 机器人关节状态 'dtype': 'float32', 'shape': (6,), 'names': ['shoulder_pan.pos', ..., 'gripper.pos'] # 与动作特征电机名称一致 }, 'observation.images.handeye': { # 手眼相机图像 'dtype': 'video', 'shape': (480, 640, 3), 'names': ['height', 'width', 'channels'] }, 'observation.images.fixed': { # 固定视角相机图像 'dtype': 'video', 'shape': (480, 640, 3), 'names': ['height', 'width', 'channels'] } } 动作与观测特征 生成action_features、obs_features都是调用hw_to_dataset_features函数,下面来看看这个函数的实现。 # 函数入参为3个 # -hw_features: dict[str, type | tuple] 机器硬件特征字典,键为特征名称如关节名、摄像头名,值为特征的类型如float或图像尺寸元组(height,width,channels),注意这里的type是类型不是实际的数值 # -prefix: str 特征前缀,用于区分数据集不同的部分 # -use_video: bool 是否将摄像头图像编码为视频 def hw_to_dataset_features( hw_features: dict[str, type | tuple], prefix: str, use_video: bool = True ) -> dict[str, dict]: features = {} # 1.硬件特征分类,将关节特征和图像特征分出来 # jointfs是关键特征,通过for循环遍历hw_features.items()满足ftype为float。然后将key和ftype通过新的键值对存储到joint_fs中。 #示例输出:{"shoulder_pan.pos": float, "shoulder_lift.pos": float, ..., "gripper.pos": float}(共6个关节)。 joint_fts = {key: ftype for key, ftype in hw_features.items() if ftype is float} #cam_fs是摄像头的图像特征,值为(height, width, channels)元组。 #示例输出:{"handeye": (480, 640, 3), "fixed": (480, 640, 3)}(两个摄像头,分辨率480×640,RGB三通道)。 cam_fts = {key: shape for key, shape in hw_features.items() if isinstance(shape, tuple)} #2. 关节特征转换数值型 #根据传入的action动作指令,构建一个新的键值对,键为action。 if joint_fts and prefix == "action": features[prefix] = { "dtype": "float32", #统一数值类型为float32,适合模型训练 "shape": (len(joint_fts),),#形状关键数量,如6个关键,(6,) "names": list(joint_fts),#关节名称列表,与机人电机意义对应 } #3. 观测装特特征,与前面action结构类似,只是键值为observation.state if joint_fts and prefix == "observation": features[f"{prefix}.state"] = { "dtype": "float32", "shape": (len(joint_fts),), "names": list(joint_fts), } #4. 摄像头特征转换,为每个摄像头生成图像/视频存储特征 for key, shape in cam_fts.items(): features[f"{prefix}.images.{key}"] = { "dtype": "video" if use_video else "image", #存储类型:视频或图像 "shape": shape, #图像尺寸(height, width, channels) "names": ["height", "width", "channels"],#形状维度名称 } _validate_feature_names(features) return features 数据集的创建关键来源hw_features: dict[str, type | tuple]参数,该值来源于robot.action_features、robot.observation_features,根据参数的实例化以SO101Follower实例为例。 @property def _motors_ft(self) -> dict[str, type]: return {f"{motor}.pos": float for motor in self.bus.motors} @property def _cameras_ft(self) -> dict[str, tuple]: return { cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras } @cached_property def observation_features(self) -> dict[str, type | tuple]: return {**self._motors_ft, **self._cameras_ft} @cached_property def action_features(self) -> dict[str, type]: return self._motors_ft 电机位置特征结构_motors_ft定义了电机位置特征结构,描述关节运动的状态,其来源于self.bus.motors管理的电机列表,在Init中初始化,包含6个关节电机: motors={ "shoulder_pan": Motor(1, "sts3215", ...), # 肩转 "shoulder_lift": Motor(2, "sts3215", ...), # 肩抬 "elbow_flex": Motor(3, "sts3215", ...), # 肘弯 "wrist_flex": Motor(4, "sts3215", ...), # 腕弯 "wrist_roll": Motor(5, "sts3215", ...), # 腕转 "gripper": Motor(6, "sts3215", ...), # 夹爪 } 其输出格式返回字典 {电机名.pos: 数据类型},例如: { "shoulder_pan.pos": float, "shoulder_lift.pos": float, ..., "gripper.pos": float } float表示电机位置是浮点数值。 摄像头图像特征定义机器人摄像头图像特征的结构,用于描述视觉传感器数据格式。来源self.cameras 是由 make_cameras_from_configs 创建的摄像头实例(如 handeye、fixed 摄像头),配置来自 self.config.cameras(包含分辨率等参数)。其输出格式为返回字典 {摄像头名: (高度, 宽度, 通道数)},例如: { "handeye": (480, 640, 3), # 480px高、640px宽、RGB三通道 "fixed": (480, 640, 3) } 观测特征observation_features整合了电机位置和摄像头图像特征,定义机器人完整可观测装特,提供数据集录制和策略决策。通过字典解包(**)合并 _motors_ft(电机位置)和 _cameras_ft(摄像头图像),输出示例: { # 电机位置特征(来自 _motors_ft) "shoulder_pan.pos": float, "shoulder_lift.pos": float, ..., "gripper.pos": float, # 摄像头图像特征(来自 _cameras_ft) "handeye": (480, 640, 3), "fixed": (480, 640, 3) } 动作特征action_features定义机器人的动作指令格式,即遥控机器人。其直接服用_motors_ft,说明动作指令就是电机目标位置。 数据集特征合并 dataset_features = {**action_features, **obs_features} 合并动作特征与观测特征,形成数据集完整存储结构,用于初始化 LeRobotDataset。确保每个时间步的记录包含「观测→动作」的完整配对,满足训练需求(如模仿学习中,观测为输入,动作为标签)。 Dataset Features: { 'action': { 'dtype': 'float32', 'shape': (6,), 'names': [ 'shoulder_pan.pos', 'shoulder_lift.pos', 'elbow_flex.pos', 'wrist_flex.pos', 'wrist_roll.pos', 'gripper.pos' ] }, 'observation.state': { 'dtype': 'float32', 'shape': (6,), 'names': [ 'shoulder_pan.pos', 'shoulder_lift.pos', 'elbow_flex.pos', 'wrist_flex.pos', 'wrist_roll.pos', 'gripper.pos' ] }, 'observation.images.handeye': { 'dtype': 'video', 'shape': (480, 640, 3), 'names': ['height', 'width', 'channels'] }, 'observation.images.fixed': { 'dtype': 'video', 'shape': (480, 640, 3), 'names': ['height', 'width', 'channels'] } } action:是遥控操作机器(如SO101Leader)的关节位置数据 observation.state:机器人(如S0101Follower)实时采集的关节位置数据。 observation.images.xx:机器人实时采集的图像数据,可有多个。 创建数据集 创建数据集,是创建一个空的数据集。 传入的参数有repo_id标识了数据集的目录,root为数据集的根目录等等,具体如下阐述。 dataset = LeRobotDataset.create( cfg.dataset.repo_id, cfg.dataset.fps, root=cfg.dataset.root, robot_type=robot.name, features=dataset_features, use_videos=cfg.dataset.video, image_writer_processes=cfg.dataset.num_image_writer_processes, image_writer_threads=cfg.dataset.num_image_writer_threads_per_camera * len(robot.cameras), ) cfg.dataset.repo_id: 数据集的唯一标识,用于本地粗才能路径标识和上传hugging Face Hub上传的仓库名称 cfg.dataset.fps:录制帧率。 root=cfg.dataset.root:本地存储的路径,可以通过--dataset.root指定,如果没有提供,则默认使用缓存目录~/.cache/huggingface/ robot_type=robot.name:机器的唯一标识,会写入到meta.json中。 features=dataset_features:核心参数,数据集特征定义,由上一章节合并特征。 use_videos=cfg.dataset.video: 图像的存储格式,true表示使用mpr格式。False保留为PNG格式,占用空间大。内部在录制结束后,通过encode_episode_videos 调用 ffmpeg 将 PNG 序列转为视频(定义于 lerobot_dataset.py). image_writer_processes: 图像写入的进程数量,默认0,仅使用线程即一个进程。 image_writer_threads:图像写入的线程数,默认4线程/摄像头 × N摄像头。 LeRobotDataset.create LeRobotDataset.create返回的是一个LeRobotDataset对象实例。 典型的LeRobotDataset本地存储为: . ├── data │ ├── chunk-000 │ │ ├── episode_000000.parquet │ │ ├── episode_000001.parquet │ │ ├── episode_000002.parquet │ │ └── ... │ ├── chunk-001 │ │ ├── episode_001000.parquet │ │ ├── episode_001001.parquet │ │ ├── episode_001002.parquet │ │ └── ... │ └── ... ├── meta │ ├── episodes.jsonl │ ├── info.json │ ├── stats.json │ └── tasks.jsonl └── videos ├── chunk-000 │ ├── observation.images.laptop │ │ ├── episode_000000.mp4 │ │ ├── episode_000001.mp4 │ │ ├── episode_000002.mp4 │ │ └── ... │ ├── observation.images.phone │ │ ├── episode_000000.mp4 │ │ ├── episode_000001.mp4 │ │ ├── episode_000002.mp4 │ │ └── ... ├── chunk-001 └── ... 核心属性 meta: LeRobotDatasetMetadata类,元数据管理器,存储数据集“说明书”:特征定义(动作/观测维度)、帧率、机器人类型、任务描述等。 repo_id: str类型,数据集唯一标识(如 lerobot/pick_cube),用于 Hugging Face Hub 定位和本地路径标识。 root:Path,本地存储根路径(默认:~/.cache/huggingface/lerobot/{repo_id}),包含 data/(Parquet)、meta/(元数据)、videos/(视频)。 revision: str类型,表示数据集的版本,默认代码库版本为v2.1。 tolerance_s:float类型,时间戳校验容差,确保录制视频帧率稳定性。 hf_dataset: Hugging Face Dataset 对象,存储非图像数据(关节位置、时间戳等),以 Parquet 格式。 episode_data_index: dirc类型,episode 索引映射表,记录每个 episode 的起始/结束帧位置(如 {"from": [0, 300], "to": [299, 599]}),加速帧检索。 delta_indices: dict类型,时间戳偏移索引(如 {"past": [-2, -1], "future": [1, 2]}),用于多模态数据同步(如动作与图像对齐)。 episode_buffer: dict类型,内存缓冲区,临时存储当前录制的帧数据(如 {"action": [], "observation.images.handeye": [], "timestamp": []})。 image_writer:AsyncImageWriter,异步图像写入器(多线程/多进程),避免图像存储阻塞主控制循环(确保录制帧率稳定)。 image_transforms:图像预处理函数(如 torchvision.transforms.Resize、Normalize),训练时动态应用于摄像头图像。 video_backend: str类型,视频解码后端(默认 torchcodec 或 pyav),支持从 MP4 中精准提取指定时间戳的帧。 核心方法 init: 数据集加载的入口,根据本地缓存状态完成元数据校验、数据下载(若缺失) 和时间戳同步检查,确保数据集可用。 add_frame:逐帧添加数据到缓冲区,录制时将单帧数据(动作、观测、图像)添加到 episode_buffer,并异步写入图像(避免阻塞控制循环)。 save_episode:写入到磁盘,录制完成一个 episode 后,将 episode_buffer 中的数据写入磁盘。包括非图像数据转换为Parquet文件存储到xxx.parquet,图像数据编码为MP4存储,元数据更新info.json,episodes.jsonl,episodes_stats.jsonl。 getiterm:数据的加载与训练适配,实现 torch.utils.data.Dataset 接口,支持通过 DataLoader 加载数据用于训练。 push_to_hub:将数据集(元数据、Parquet、视频)上传至 Hugging Face Hub,自动生成数据集卡片(README.md),包含特征说明、硬件兼容性和统计信息 pull_from_repo:从 Hub 拉取指定版本的数据集,支持按 allow_patterns 筛选文件(如仅拉取元数据 meta/ 或特定 episode)。 start_image_writer/stop_image_writer:为避免图像存储阻塞录制主循环(导致帧率波动),通过 AsyncImageWriter 启动多线程/多进程写入器。 create:数据的新建或加载,是初始化的类方法。 @classmethod def create( cls, repo_id: str, # 数据集标识(用户指定) fps: int, # 帧率(默认 30 FPS) features: dict, # 特征定义(动作+观测,来自机器人硬件) root: ..., # 本地路径 robot_type: str, # 机器人类型(如 "so101_follower") use_videos: bool = True, # 是否编码视频(默认 True) image_writer_processes: int = 0, # 图像写入进程数 image_writer_threads: int = 4 * len(robot.cameras), # 图像写入线程数 ) -> "LeRobotDataset": # 1. 初始化元数据(调用 LeRobotDatasetMetadata.create) obj.meta = LeRobotDatasetMetadata.create( repo_id=repo_id, fps=fps, features=features, robot_type=robot_type, use_videos=use_videos ) # 2. 启动图像写入器(异步写入) if image_writer_processes or image_writer_threads: obj.start_image_writer(image_writer_processes, image_writer_threads) # 3. 初始化 episode 缓冲区(内存临时存储) obj.episode_buffer = obj.create_episode_buffer() # 4. 创建空 HF Dataset(存储非图像数据) obj.hf_dataset = obj.create_hf_dataset() return obj 元数据驱动:通过 LeRobotDatasetMetadata.create 确保数据集特征与机器人硬件严格对齐,避免录制后因特征不匹配导致训练错误。 性能优化:异步图像写入器(多线程/多进程)解决了录制时 I/O 阻塞问题,保障实时控制循环的稳定性。 标准化存储:统一 Parquet+MP4 格式,兼容 Hugging Face Hub 生态和 PyTorch DataLoader,简化“录制-训练”流程。 LeRobotDataset.create 是一个类方法,通过 @classmethod 装饰,其核心特性是无需先实例化LeRobotDataset 类即可调用,直接通过类名 LeRobotDataset.create(...) 触发,并返回一个初始化完成的 LeRobotDataset 实例,通常用于工厂模式或单例模式的场景。 类方法(@classmethod)的第一个参数是 cls(代表类本身),因此可以直接通过类名触发,而非实例。LeRobotDataset.create 的作用正是绕开普通构造函数 init,为新建数据集执行特殊初始化逻辑(如元数据创建、目录结构搭建、异步写入器启动等),最终返回一个完整的 LeRobotDataset 实例。 LeRobotDatasetMetadata.create 在LeRobotDataset中会调用到obj.meta = LeRobotDatasetMetadata.create创建一个LeRobotDatasetMetadata对象,其核心功能是为全新数据集初始化元数据结构,包括目录创建、特征定义合并、元数据文件生成(如 meta/info.json)和硬件兼容性校验,确保后续机器人数据录制(关节状态、摄像头图像等)与存储格式严格对齐。 @classmethod def create( cls, repo_id: str, fps: int, features: dict, robot_type: str | None = None, root: str | Path | None = None, use_videos: bool = True, ) -> "LeRobotDatasetMetadata": """Creates metadata for a LeRobotDataset.""" obj = cls.__new__(cls) #创建空的 LeRobotDatasetMetadata 实例(不触发 __init__,避免加载现有元数据) obj.repo_id = repo_id # 数据集标识(如 "username/pick_cube") obj.root = Path(root) if root is not None else HF_LEROBOT_HOME / repo_id # 本地路径(默认缓存目录) obj.root.mkdir(parents=True, exist_ok=False) # TODO(aliberts, rcadene): implement sanity check for features # 合并用户提供的特征与默认特征(补充必选字段) features = {**features, **DEFAULT_FEATURES} _validate_feature_names(features) # 校验特征名称合法性(如禁止含空格、特殊字符,确保与硬件接口一致) obj.tasks, obj.task_to_task_index = {}, {} # 任务列表(如 {"pick": 0, "place": 1})及索引映射 obj.episodes_stats, obj.stats, obj.episodes = {}, {}, {} # 初始化 episode 统计信息、全局统计、episode 列表 obj.info = create_empty_dataset_info(CODEBASE_VERSION, fps, features, use_videos, robot_type) #生成数据集的“总说明书”(meta/info.json),包含以下关键信息 if len(obj.video_keys) > 0 and not use_videos: raise ValueError() write_json(obj.info, obj.root / INFO_PATH) obj.revision = None return obj LeRobotDatasetMetadata.create 是新建数据集的“元数据基石”,通过标准化目录结构、特征定义和兼容性校验,确保机器人录制数据(动作、图像、状态)的存储格式与硬件特征严格对齐。其输出的 LeRobotDatasetMetadata 实例是连接机器人硬件与数据集文件系统的核心桥梁,为后续数据录制、编码和训练加载提供统一的元信息描述。 加载数据集 加载数据集是加载已经存在的数据集,针对的场景是针对此前的录制场景接着录制,与创建数据集不同,创建数据集是从头开始,创建一个空的数据集。那么lerobot怎么进行恢复了? 前面章节无论是LeRobotDataset.create 还是LeRobotDatasetMetadata.create都会绕过类的构造函数xxx.init的运行,那么xxx.init在哪里运行了? LeRobotDataset.init 是类的默认构造函数,仅在直接实例化 LeRobotDataset 时调用,核心场景是 恢复已有数据集的录制或加载。而 LeRobotDataset.create 是类方法,用于创建全新数据集,二者分工明确,对应不同的用户需求。接下来针对恢复的场景来说明。 当输入参数--resume=true,即在原来的数据集基础上进行。 if cfg.resume: #走恢复模式 dataset = LeRobotDataset( cfg.dataset.repo_id,# 数据集标识(如 "lerobot/pick_cube") root=cfg.dataset.root,# 本地存储路径(默认:~/.cache/huggingface/lerobot/{repo_id}) ) ) if hasattr(robot, "cameras") and len(robot.cameras) > 0: dataset.start_image_writer( num_processes=cfg.dataset.num_image_writer_processes, num_threads=cfg.dataset.num_image_writer_threads_per_camera * len(robot.cameras), ) sanity_check_dataset_robot_compatibility(dataset, robot, cfg.dataset.fps, dataset_features) else: dataset = LeRobotDataset.create(......) 关键流程是实例化LeRobotDataset,触发LeRobotDataset.init 方法,LeRobotDataset类的初始化代码如下。 LeRobotDataset.init LeRobotDataset: def __init__( self, repo_id: str, # 数据集标识(如 "lerobot/pick_cube") root: ..., # 本地存储路径(默认:~/.cache/huggingface/lerobot/{repo_id}) episodes: list[int] | None = None, # 指定加载的 episode 索引(如 [0, 2, 5]) image_transforms: Callable | None = None, # 图像预处理(如 Resize、Normalize) delta_timestamps: ..., # 时间戳偏移(用于多模态数据同步) tolerance_s: float = 1e-4, # 时间戳校验容差(确保帧率稳定性) revision: ..., # 数据集版本(默认代码库版本 v2.1) force_cache_sync: bool = False, # 强制同步缓存(忽略本地文件,重新拉取) download_videos: bool = True, # 是否下载视频文件 video_backend: str | None = None, # 视频解码后端(如 "pyav"、"torchcodec") ): # 1. 初始化基础路径与配置 self.repo_id = repo_id self.root = Path(root) or HF_LEROBOT_HOME / repo_id # 本地路径 #没有创建的话默认就是~/.cache/huggingface/lerobot self.root.mkdir(exist_ok=True, parents=True) # 创建目录(若不存在) # 2. 加载元数据(通过 LeRobotDatasetMetadata) self.meta = LeRobotDatasetMetadata( self.repo_id, self.root, self.revision, force_cache_sync=force_cache_sync ) # 3. 加载实际数据(优先本地缓存,缺失则从 Hub 下载) try: if force_cache_sync: raise FileNotFoundError # 强制同步时跳过本地缓存 # 验证本地数据文件完整性 assert all((self.root / fpath).is_file() for fpath in self.get_episodes_file_paths()) self.hf_dataset = self.load_hf_dataset() # 加载 Parquet 格式数据(非图像) except (AssertionError, FileNotFoundError): # 从 Hub 下载数据(含 Parquet 和视频文件) self.revision = get_safe_version(self.repo_id, self.revision) # 校验版本合法性 self.download_episodes(download_videos) # 下载指定 episode 数据 self.hf_dataset = self.load_hf_dataset() # 重新加载数据 # 4. 数据校验(确保时间戳与帧率匹配) timestamps = torch.stack(tuple(self.hf_dataset["timestamp"])).numpy() episode_indices = torch.stack(tuple(self.hf_dataset["episode_index"])).numpy() check_timestamps_sync( # 校验每帧时间间隔是否为 1/fps ± tolerance_s timestamps, episode_indices, self.episode_data_index, self.fps, self.tolerance_s ) LeRobotDataset 类的构造函数(init 方法),其核心作用是加载已存在的机器人数据集(本地或 Hugging Face Hub),并完成元数据校验、数据完整性检查、时间戳同步等关键步骤,为后续数据访问(如训练、可视化)提供统一接口。该方法是 LeRobotDataset 类的“入口”,仅在恢复已有数据集或加载数据集用于训练时调用。 LeRobotDataset 是基于 PyTorch Dataset 的子类,专为机器人数据设计,支持两种场景: 加载本地已有数据集:从指定路径(root)直接读取元数据和数据文件。 从 Hugging Face Hub 下载数据集:若本地数据缺失,自动从 Hub 拉取并加载。 无论哪种场景,init 均确保数据集的元数据一致性(如机器人类型、帧率)、数据完整性(文件不缺失)和时间戳有效性(符合录制帧率),为下游任务(如策略训练)提供可靠数据。 LeRobotDatasetMetadata.init 在LeRobotDataset.init中实例化了self.meta = LeRobotDatasetMetadata,该类创建时会调用构造函数LeRobotDatasetMetadata.init。 lass LeRobotDatasetMetadata: def __init__( self, repo_id: str, root: str | Path | None = None, revision: str | None = None, force_cache_sync: bool = False, ): self.repo_id = repo_id # 数据集唯一标识 self.revision = revision if revision else CODEBASE_VERSION # 版本(默认代码库版本) self.root = Path(root) if root is not None else HF_LEROBOT_HOME / repo_id #本地路径(默认缓存目录) try: if force_cache_sync: raise FileNotFoundError self.load_metadata() except (FileNotFoundError, NotADirectoryError): if is_valid_version(self.revision): self.revision = get_safe_version(self.repo_id, self.revision) (self.root / "meta").mkdir(exist_ok=True, parents=True) self.pull_from_repo(allow_patterns="meta/") # 加载本地元数据 self.load_metadata() def load_metadata(self): self.info = load_info(self.root) check_version_compatibility(self.repo_id, self._version, CODEBASE_VERSION) self.tasks, self.task_to_task_index = load_tasks(self.root) self.episodes = load_episodes(self.root) if self._version < packaging.version.parse("v2.1"): self.stats = load_stats(self.root) self.episodes_stats = backward_compatible_episodes_stats(self.stats, self.episodes) else: self.episodes_stats = load_episodes_stats(self.root) self.stats = aggregate_stats(list(self.episodes_stats.values())) def load_metadata(self): self.info = load_info(self.root) check_version_compatibility(self.repo_id, self._version, CODEBASE_VERSION) self.tasks, self.task_to_task_index = load_tasks(self.root) self.episodes = load_episodes(self.root) if self._version < packaging.version.parse("v2.1"): self.stats = load_stats(self.root) self.episodes_stats = backward_compatible_episodes_stats(self.stats, self.episodes) else: self.episodes_stats = load_episodes_stats(self.root) self.stats = aggregate_stats(list(self.episodes_stats.values())) 从 self.root/meta 目录加载核心元数据文件(info.json、tasks.jsonl、episodes.jsonl 等),并校验元数据版本与当前代码库兼容性(check_version_compatibility) 录制流程 policy = None if cfg.policy is None else make_policy(cfg.policy, ds_meta=dataset.meta) #如果是能了策略模型,进行实例创建 listener, events = init_keyboard_listener() #初始化监听键盘事件 先判断是否指定了策略模型,如果指定了进行创建实例。同时初始化键盘监听器 recorded_episodes = 0 while recorded_episodes < cfg.dataset.num_episodes and not events["stop_recording"]: # 录制当前回合数据 log_say(f"Recording episode {dataset.num_episodes}", cfg.play_sounds) # 语音提示开始录制 record_loop( # 核心数据采集函数 robot=robot, teleop=teleop, # 遥操作器输入(或策略生成动作) policy=policy, dataset=dataset, # 数据写入目标 control_time_s=cfg.dataset.episode_time_s, # 单回合录制时长(默认60秒) display_data=cfg.display_data, # 实时可视化 ) # 回合间环境重置(给用户调整物体位置的时间) # 该阶段不写入数据,用于用户重置环境。 if not events["stop_recording"] and (...): log_say("Reset the environment", cfg.play_sounds) record_loop( # 重置阶段不记录数据(dataset=None) robot=robot, teleop=teleop, control_time_s=cfg.dataset.reset_time_s, # 重置时长(用户指定5秒) dataset=None, # 禁用数据写入 ) # 处理重录事件(用户按 'r' 触发) if events["rerecord_episode"]: log_say("Re-record episode", cfg.play_sounds) dataset.clear_episode_buffer() # 清空当前回合无效数据 continue # 重新录制当前回合 dataset.save_episode() # 保存当前回合数据到磁盘(图像、状态、动作) recorded_episodes += 1 # 回合计数+1 核心是调用record_loop进行采集录制,过程中监听用户输入的键盘值处理相关逻辑如右键结束单次录制,左键重复录制,ESC退出流程。 当前回合录制结束后,继续调用record_loop只是传递的参数不一样,等待环境复位。录制完当前回合后,调用dataset.save_episode()将数据写入到磁盘中。 录制循环 record_loop函数在两种场景下被调用,主要的作用是协调机器人控制(遥操作/策略)、数据采集与存储,确保数据帧率稳定且格式符合数据集要求。 正常录制:按配置时长(control_time_s)录制 episode 数据,保存到数据集。 环境重置:录制结束后,执行一段无数据保存的控制(让用户手动重置环境)。 初始化与参数校验 # 校验数据集帧率与录制帧率一致性 if dataset is not None and dataset.fps != fps: raise ValueError(f"The dataset fps should be equal to requested fps ({dataset.fps} != {fps}).") # 多遥操作器处理(如机械臂遥操作 + 键盘底盘控制) teleop_arm = teleop_keyboard = None if isinstance(teleop, list): # 分离键盘遥操作器和机械臂遥操作器 teleop_keyboard = next((t for t in teleop if isinstance(t, KeyboardTeleop)), None) teleop_arm = next((t for t in teleop if isinstance(t, (SO100Leader, ...))), None) # 校验:仅支持 LeKiwi 机器人,且必须包含 1 个键盘遥操作器 + 1 个机械臂遥操作器 if not (teleop_arm and teleop_keyboard and len(teleop) == 2 and robot.name == "lekiwi_client"): raise ValueError(...) # 策略重置(若使用策略控制,确保初始状态一致) if policy is not None: policy.reset() 首先校验一下数据集的帧率与录制帧率是否一致,如果遥控机器人有多个实例,比如主臂+键盘,一般这种组合用于lekiwi,包含一个键盘遥控操作器和1个机械臂遥控操作器。最后判断是否使用模型推理控制,如果是使用了先进行复位。 主循环控制与数据采集 timestamp = 0 # episode 已录制时长(秒) start_episode_t = time.perf_counter() # 起始时间戳 while timestamp < control_time_s: start_loop_t = time.perf_counter() # 本轮循环起始时间 # 检查是否提前退出(如用户按特定按键) if events["exit_early"]: events["exit_early"] = False break # 1. 获取机器人观测(传感器数据:摄像头图像、关节角度等) observation = robot.get_observation() # 2. 构建观测帧(符合数据集格式) if policy is not None or dataset is not None: observation_frame = build_dataset_frame(dataset.features, observation, prefix="observation") # 3. 生成动作(策略/遥操作器二选一) if policy is not None: # 策略生成动作:输入观测帧,输出动作值 action_values = predict_action( observation_frame, policy, device=get_safe_torch_device(...), ... ) # 格式化动作:按机器人动作特征(如关节名称)构建字典 action = {key: action_values[i].item() for i, key in enumerate(robot.action_features)} elif policy is None and isinstance(teleop, Teleoperator): # 单遥操作器:直接获取动作 action = teleop.get_action() elif policy is None and isinstance(teleop, list): # 多遥操作器:合并机械臂动作和底盘动作 arm_action = teleop_arm.get_action() # 机械臂动作 base_action = robot._from_keyboard_to_base_action(teleop_keyboard.get_action()) # 底盘动作 action = {**arm_action, **base_action} # 合并动作 # 4. 执行动作并记录实际发送的动作(可能因安全限制被裁剪) sent_action = robot.send_action(action) # 5. 保存数据到数据集(若启用) if dataset is not None: action_frame = build_dataset_frame(dataset.features, sent_action, prefix="action") # 动作帧 frame = {**observation_frame, **action_frame} # 合并观测与动作 dataset.add_frame(frame, task=single_task) # 添加到数据集 # 6. 可视化数据(若启用) if display_data: log_rerun_data(observation, action) # 记录数据用于 Rerun 可视化 # 7. 维持帧率:等待剩余时间以确保循环周期为 1/FPS 秒 dt_s = time.perf_counter() - start_loop_t # 本轮循环耗时 busy_wait(1 / fps - dt_s) # busy 等待以补足时间 # 更新 episode 已录制时长 timestamp = time.perf_counter() - start_episode_t 这里分为两种模式,一个是模型推理模式和遥控操作模式,前者是使用大模型进行预测产生动作处理,后者是用于大模型训练采集数据。 对于采集数据模式流程如下: 获取观测原始数据:调用robot.get_observation()获取观测数据,包括机器关节的数据、摄像头数据等。用于后续的数据存储。 获取遥控机器人的动作数据:调用action = teleop.get_action()获取到动作数据。如果是多遥控操作器,需要将其合并动作。 发送机器人执行动作:调用sent_action = robot.send_action(action)执行动作。 保存数据集:先调用build_dataset_frame构建遥控臂的数据action,然后将action与从臂机器的观测数据action_values进行合并frame,最后调用dataset.add_frame添加到数据集中。 可视化数据:如果启动了可视化数据,调用log_rerun_data记录数据用于rerun可视化。 时长更新:录制一轮有一个默认的等待时长,调用busy_wait进行等待。 对于模型推理模式流程如下: 获取观测原始数据:调用robot.get_observation()获取观测数据,包括机器关节的数据、摄像头数据等。 构建模型输入数据:要能够喂给模型做预测动作,需要将数据转换为模型可接收的格式,调用build_dataset_frame将观测原始数据进行转换。 模型生成预测动作:调用predict_action得到预测动作action_values,接着将action_values转换为机器的动作特征action。 发送预测动作:调用robot.send_action(action)将预测动作发送给机器进行执行。 下面是observation = robot.get_observation()返回的数据格式,主要包括的是6个舵机关节位置信息+两个相机的图像信息,一共8个键值对。 observation: {'elbow_flex.pos': 99.54710144927537, 'fixed': array([[[ 58, 48, 39], [ 56, 46, 37], [ 49, 39, 29], ..., ..., [ 45, 81, 69], [ 54, 87, 76], [ 55, 88, 77]]], shape=(480, 640, 3), dtype=uint8), 'gripper.pos': 2.666666666666667, 'handeye': array([[[76, 71, 75], [75, 70, 74], [69, 67, 70], ..., ..., [38, 73, 31], [31, 68, 25], [26, 63, 20]]], shape=(480, 640, 3), dtype=uint8), 'shoulder_lift.pos': -98.65771812080537, 'shoulder_pan.pos': -10.490956072351423, 'wrist_flex.pos': 54.17743324720067, 'wrist_roll.pos': -3.5714285714285694} 原始的观测数据,要经过处理,以便喂给模型或者存储到本地,输出的数据如下,一共有3个键值对,包括2个摄像头的键值对和一个舵机关节位置的,可以看到将前面6个舵机的合并为一个键值对了。 observation_frame: {'observation.images.fixed': array([[[ 80, 55, 61], [ 73, 48, 54], [ 68, 48, 49], ..., ..., [ 62, 82, 73], [ 69, 89, 80], [ 73, 90, 82]]], shape=(480, 640, 3), dtype=uint8), 'observation.images.handeye': array([[[67, 78, 72], [72, 83, 77], [69, 85, 75], ..., ..., [51, 71, 20], [39, 59, 6], [42, 63, 7]]], shape=(480, 640, 3), dtype=uint8), 'observation.state': array([-10.490956 , -98.657715 , 99.547104 , 54.177433 , -3.5714285, 2.6666667], dtype=float32)} 发送给机器人的动作数据,比较简单如下,也就是6个舵机的关节位置信息。 sent_action: {'elbow_flex.pos': 99.63685882886972, 'gripper.pos': 0.6509357200976403, 'shoulder_lift.pos': -99.36102236421725, 'shoulder_pan.pos': -10.0, 'wrist_flex.pos': 53.53886235345203, 'wrist_roll.pos': -3.598634095087988} 存储录制数据时,需要遥控机器action_frame、观测机器abservation_frame(前面已经列出了),下面看看相关数据的格式。 action_frame是遥控机器的sent_action转换而来,调用action_frame = build_dataset_frame(dataset.features, sent_action, prefix="action"),实际上就是将sent_action数据6个键值对改为1个键值对。 action_frame: {'action': array([-10. , -99.36102 , 99.636856 , 53.538864 , -3.598634 , 0.6509357], dtype=float32)} 最后将经过处理的一个遥控机器数据和经过处理的观测数据进行合并,最终得到如下4个键值对的数据。 frame: {'action': array([-10. , -99.36102 , 99.636856 , 53.538864 , -3.598634 , 0.6509357], dtype=float32), 'observation.images.fixed': array([[[ 70, 62, 51], [ 63, 55, 44], [ 61, 54, 44], ..., ..., [ 53, 95, 83], [ 47, 90, 80], [ 58, 104, 93]]], shape=(480, 640, 3), dtype=uint8), 'observation.images.handeye': array([[[76, 80, 55], [78, 82, 59], [77, 79, 57], ..., ..., [52, 64, 18], [47, 61, 12], [50, 66, 17]]], shape=(480, 640, 3), dtype=uint8), 'observation.state': array([-10.490956 , -98.657715 , 99.547104 , 54.177433 , -3.5714285, 2.6666667], dtype=float32)} 可视化显示 # 6. 可视化数据(若启用) if display_data: log_rerun_data(observation, action) # 记录数据用于 Rerun 可视化 可以看到界面显示的observation.xxx和action.xxx就是调用的这里log_rerun_data函数,理论上来说observation和action要越吻合越好,因为action是模型推理或主臂的动作,observation是实际的动作。 数据存储 在上一节中,循序录制过程中,将经过处理的遥控机器数据action_frame和经过处理后的机器观测数据observation_frame合并得到的数据frame,然后先调用dataset.add_frame(frame, task=single_task)写入,最后调用save_episode方法将缓存数据写入磁盘。 写入缓存 add_frame 是 LeRobotDataset 类的核心方法之一,负责将单帧机器人数据暂存到内存缓冲区(episode_buffer),并对图像数据进行预处理(如格式转换、临时存储)。该方法是数据录制流程中的关键环节,确保每帧数据符合数据集格式要求,并为后续的 save_episode 方法(将缓冲区数据写入磁盘)做准备。 def add_frame(self, frame: dict, task: str, timestamp: float | None = None) -> None: # 1. 数据格式转换PyTorch Tensor → NumPy 数组 # 目的是统一数据格式为 NumPy 数组(数据集底层存储格式),避免因混合 Tensor/NumPy 类型导致后续处理异常。 for name in frame: if isinstance(frame[name], torch.Tensor): frame[name] = frame[name].numpy() #2. 数据校验,确保帧格式符合数据集定义 #检查 frame 中的所有特征(如 observation.state、action) #是否与 self.features 中定义的 dtype(数据类型)、shape #(维度)一致。例如,若 self.features 定义 action 为 #float32 且形状为 (6,),则校验 frame["action"] 是否满足这些 #条件,确保数据规范性。 validate_frame(frame, self.features) #3. 如果没有初始化 episode 缓冲区,则初始化创建 # 内存缓冲区,用于累积当前 episode 的所有帧数据。结构 #与 self.features 对应,例如包含 observation.state、 #action、timestamp 等键,每个键的值为列表(按帧顺序存 #储数据) if self.episode_buffer is None: self.episode_buffer = self.create_episode_buffer() # 4. 记录帧索引与时间戳 # 未提供 timestamp,默认按帧率(self.fps)计算相对时间 #(如 30 FPS 时,第 0 帧为 0s,第 1 帧为 1/30 ≈0.033s),确保时间序列连续性 frame_index = self.episode_buffer["size"] if timestamp is None: timestamp = frame_index / self.fps # 记录存储frame_index,timestamp,task self.episode_buffer["frame_index"].append(frame_index) self.episode_buffer["timestamp"].append(timestamp) self.episode_buffer["task"].append(task) # 5. 添加一帧数据到episode_buffer for key in frame: if key not in self.features: raise ValueError( f"An element of the frame is not in the features. '{key}' not in '{self.features.keys()}'." ) # 如果是图像,处理追加到数据缓冲区 if self.features[key]["dtype"] in ["image", "video"]: img_path = self._get_image_file_path( episode_index=self.episode_buffer["episode_index"], image_key=key, frame_index=frame_index ) if frame_index == 0: img_path.parent.mkdir(parents=True, exist_ok=True) #通过 _get_image_file_path 生成标准化路径,调用存储图片,可同步可异步。 self._save_image(frame[key], img_path) self.episode_buffer[key].append(str(img_path)) else: # 如果是动作特征数据直接追加到原始数据缓冲区 self.episode_buffer[key].append(frame[key]) #6. 最后更新缓冲区大小 self.episode_buffer["size"] += 1 add_frame 是机器人数据录制的“数据暂存中枢”,通过内存缓冲区累积帧数据、预处理图像并校验数据格式,为后续持久化存储奠定基础。其设计兼顾了录制效率(异步图像写入)和数据可靠性(格式校验),是构建标准化机器人数据集的核心环节,其设计,总结有以下优点: 数据暂存与累积:通过 episode_buffer 在内存中临时存储一整段 episode 的数据,避免频繁磁盘 I/O 影响录制帧率。 图像预处理解耦:将图像保存与数据记录分离,支持异步图像写入(AsyncImageWriter),确保主录制循环(record_loop)不受图像存储速度影响,维持稳定帧率。 数据一致性校验:通过 validate_frame 提前过滤无效数据,避免错误数据进入后续流程(如 save_episode 写入磁盘) 写入磁盘 录制完一轮,会调用dataset.save_episode()进行存储写入磁盘。 def save_episode(self, episode_data: dict | None = None) -> None: #1. 输入处理与数据校验 if not episode_data: episode_buffer = self.episode_buffer #使用内存缓冲区数据(默认流程) #校验缓冲区数据的完整性,确保所有特征(如 #observation.state、action)的长度一致(与 episode 总帧 #数匹配),且 episode_index 未超出当前数据集范围 validate_episode_buffer(episode_buffer, self.meta.total_episodes, self.features) # 2.从缓冲区中提取非特征数据(这些键不直接存储到 Parquet) episode_length = episode_buffer.pop("size")# 当前 episode 的总帧数 tasks = episode_buffer.pop("task")# 每帧的任务标签列表(可能重复) episode_tasks = list(set(tasks))# 当前 episode 涉及的唯一任务列表 episode_index = episode_buffer["episode_index"] # 当前 episode 的索引 #3. 构建全局索引与episode标识,确保每帧在整个数据集中有唯一标识,便于后续数据加载时定位。 # 生成全局帧索引(从数据集总帧数开始累加) episode_buffer["index"] = np.arange(self.meta.total_frames, self.meta.total_frames + episode_length) #生成 episode_index 数组(所有帧均属于当前 episode) episode_buffer["episode_index"] = np.full((episode_length,), episode_index) # 4. 任务标签处理与元数据更新 # 添加新任务到元数据(若任务不存在) for task in episode_tasks: task_index = self.meta.get_task_index(task) if task_index is None: self.meta.add_task(task)# 新增任务并更新 tasks.jsonl # 生成 task_index 数组(将每帧的任务标签映射为整数索引) episode_buffer["task_index"] = np.array([self.meta.get_task_index(task) for task in tasks]) # 5.数值特征数据格式化 for key, ft in self.features.items(): # 跳过索引类特征、图像和视频(这些由其他逻辑处理) if key in ["index", "episode_index", "task_index"] or ft["dtype"] in ["image", "video"]: continue # 将列表形式的帧数据堆叠为二维数组(shape: [episode_length, feature_dim]) episode_buffer[key] = np.stack(episode_buffer[key]) #若 action 特征为长度 6 的向量,episode_buffer["action"] # 会从 [ [a0], [a1], ..., [aN] ] 堆叠为 [ [a0_0, ..., a0_5], ..., [aN_0, ..., aN_5] ],符合 Parquet 存储格式。 #6. 等待异步图像写入完成 self._wait_image_writer()# 确保所有临时图像文件已写入磁盘 # 保存数值到Parquet文件 self._save_episode_table(episode_buffer, episode_index) ep_stats = compute_episode_stats(episode_buffer, self.features) # _save_episode_table 行为 # 从缓冲区提取特征数据,构造 datasets.Dataset 对象。 # 将数据写入 Parquet 文件(路径由元数据的 get_data_file_path 生成,如 data/chunk-000/episode_000000.parquet)。 # 更新内存中的 hf_dataset(拼接新 episode 数据)。 #8. 若启用视频模式,将图像编码存储为视频文件 if len(self.meta.video_keys) > 0: video_paths = self.encode_episode_videos(episode_index) # 将临时图像编码为 MP4 #调用 ffmpeg 将临时目录下的图像帧(如 images/observation.images.laptop/episode_000000/frame_*.png) #编码为 MP4 视频,存储路径由元数据的 get_video_file_path 定义 for key in self.meta.video_keys: episode_buffer[key] = video_paths[key] # 记录视频文件路径 # `meta.save_episode` be executed after encoding the videos #9. 更新元数据与校验时间戳 # 更新元数据(总 episodes、总 frames、chunks 等) self.meta.save_episode(episode_index, episode_length, episode_tasks, ep_stats) # 校验时间戳连续性(确保帧间隔符合帧率 1/fps ± tolerance_s) ep_data_index = get_episode_data_index(self.meta.episodes, [episode_index]) ep_data_index_np = {k: t.numpy() for k, t in ep_data_index.items()} check_timestamps_sync( episode_buffer["timestamp"],# 帧时间戳数组 episode_buffer["episode_index"],# episode 索引数组 ep_data_index_np,# episode 帧范围 self.fps,# 帧率 self.tolerance_s,# 时间容差 ) #10. 文件完整性校验与资源清理 # 验证视频文件数量(每个 episode × 每个视频 key 应对应一个 MP4) video_files = list(self.root.rglob("*.mp4")) assert len(video_files) == self.num_episodes * len(self.meta.video_keys) # 验证 Parquet 文件数量(每个 episode 对应一个 Parquet) parquet_files = list(self.root.rglob("*.parquet")) assert len(parquet_files) == self.num_episodes # 删除临时图像目录(已编码为视频,无需保留) img_dir = self.root / "images" if img_dir.is_dir(): shutil.rmtree(self.root / "images") # 重置缓冲区,准备下一段 episode 录制 if not episode_data: # Reset the buffer self.episode_buffer = self.create_episode_buffer() save_episode 是 LeRobotDataset 类的核心方法,负责将内存缓冲区(episode_buffer)中累积的单段 episode 数据持久化到磁盘,同时更新元数据、生成视频文件、校验数据完整性,并重置缓冲区以准备下一段录制。该方法是数据录制流程的“收尾环节”,确保每段 episode 数据符合 LeRobot 数据集格式规范。主要功能如下: 数据持久化:将内存中的帧数据(数值特征、图像路径)写入 Parquet 文件(结构化数据)和 MP4 文件(视频数据)。 元数据更新:更新数据集元信息(如总 episode 数、总帧数、任务标签),确保元数据与实际数据一致。 数据校验:验证时间戳连续性、文件完整性(如视频/Parquet 文件数量匹配预期),避免无效数据入库。 资源清理:删除临时图像文件(已编码为视频),释放内存缓冲区。 save_episode 是 LeRobot 数据集录制的“最终执行者”,通过系统化的数据格式化、编码、校验和清理,将内存中的临时帧数据转化为符合规范的磁盘存储,同时维护元数据一致性和数据完整性。其设计确保了录制数据的可靠性、存储效率和下游可用性,是连接实时录制与离线数据使用的关键桥梁。 结束采集 log_say("Stop recording", cfg.play_sounds, blocking=True) # 语音提示结束录制 robot.disconnect() # 断开机器人连接(禁用扭矩,确保安全) if teleop is not None: teleop.disconnect() # 断开遥操作器连接 if not is_headless() and listener is not None: listener.stop() # 停止键盘监听器 断开设备,停止键盘监听。 if cfg.dataset.push_to_hub: dataset.push_to_hub(tags=cfg.dataset.tags, private=cfg.dataset.private) # 上传数据集到 Hugging Face Hub 将数据上传到hugging Face Hub上。
  • lerobot示教

    lerobot示教

    启动 示教的功能主要是主臂控制,从臂跟随,在数据采集是非常的一环。下面是模块启动的执行命令: python -m lerobot.teleoperate \ --robot.type=so101_follower \ --robot.port=/dev/ttyACM0 \ --robot.id=R12252801 \ --teleop.type=so101_leader \ --teleop.port=/dev/ttyACM1 \ --teleop.id=R07252608 python解释器回安装sys.path查找lerobot.teleoperate模块,找到执行lerobot/src/lerobot/teleoperate.py文件,调用teleoperate函数。 ## 配置解析 @draccus.wrap() def teleoperate(cfg: TeleoperateConfig): init_logging() logging.info(pformat(asdict(cfg))) ## 初始化可视化工具 if cfg.display_data: _init_rerun(session_name="teleoperation") ##创建操作设备和机器人实例 teleop = make_teleoperator_from_config(cfg.teleop) robot = make_robot_from_config(cfg.robot) ## 连接设备 teleop.connect() robot.connect() ##示教循环 try: teleop_loop(teleop, robot, cfg.fps, display_data=cfg.display_data, duration=cfg.teleop_time_s) except KeyboardInterrupt: pass finally: if cfg.display_data: rr.rerun_shutdown() ## 断开连接 teleop.disconnect() robot.disconnect() if __name__ == "__main__": teleoperate() 下面展开来说明一下。 配置解析 @draccus.wrap() def teleoperate(cfg: TeleoperateConfig): # 初始化日志记录 init_logging() # 打印配置信息 logging.info(pformat(draccus.asdict(cfg))) 在teleoperate.py中teleoperate函数使用@draccus.wrap()装饰器解析命令行参数,将参数转换为TeleoperateConfig类型的配置对象。dracus是用于配置解析的库,类型与argparse。 这里的TeleoperateConfig是一个自定义的配置类,定义如下: from dataclasses import dataclass @dataclass class RobotConfig: type: str port: str id: str @dataclass class TeleopConfig: type: str port: str id: str @dataclass class TeleoperateConfig: teleop: TeleoperatorConfig robot: RobotConfig # Limit the maximum frames per second. fps: int = 60 teleop_time_s: float | None = None # Display all cameras on screen display_data: bool = False 可以看到TeleoperateConfig类中属性定义了robot,teleop两个对象,命令行的参数将会映射到,如: --robot.type=so101_follower:对应cfg.robot.type --teleop.port=/dev/ttyACM1:对应cfg.teleop.port 初始化可视化工具 if cfg.display_data: _init_rerun(session_name="teleoperation") 当参数--display_data=true,即cfg.display_data 为 True,则调用 _init_rerun 函数初始化 rerun 可视化工具。rerun 是一个用于记录和可视化科学数据的工具。 def _init_rerun(session_name: str = "lerobot_control_loop") -> None: """Initializes the Rerun SDK for visualizing the control loop.""" batch_size = os.getenv("RERUN_FLUSH_NUM_BYTES", "8000") os.environ["RERUN_FLUSH_NUM_BYTES"] = batch_size rr.init(session_name) memory_limit = os.getenv("LEROBOT_RERUN_MEMORY_LIMIT", "10%") rr.spawn(memory_limit=memory_limit) _init_rerun函数的主要功能是初始化Rerun SDK,设置数据刷新的字节数和内存使用限制,然后启动Rerun会话,为后续的控制循环可视化工作做准备。 其中有两个参数是从环境变量中获取,分别是RERUN_FLUSH_NUM_BYTES和LEROBOT_RERUN_MEMORY_LIMIT,用于控制每次刷新是处理数据的字节数和限制使用的内存量。 创建主从臂实例 teleop = make_teleoperator_from_config(cfg.teleop) robot = make_robot_from_config(cfg.robot) 调用调用 make_teleoperator_from_config 和make_robot_from_config 函数,根据配置对象创建 SO101Leader遥操作设备实例和 SO101Follower 机器人实例。 def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator: if config.type == "keyboard": from .keyboard import KeyboardTeleop return KeyboardTeleop(config) elif config.type == "koch_leader": from .koch_leader import KochLeader return KochLeader(config) elif config.type == "so100_leader": from .so100_leader import SO100Leader return SO100Leader(config) elif config.type == "so101_leader": from .so101_leader import SO101Leader return SO101Leader(config) ...... 以leader为例,最终调用SO101Leader对象实例。 class SO101Leader(Teleoperator): """ SO-101 Leader Arm designed by TheRobotStudio and Hugging Face. """ config_class = SO101LeaderConfig name = "so101_leader" def __init__(self, config: SO101LeaderConfig): super().__init__(config) #调用父类Teleoperator够着函数初始化属性,包括 #self.id:设备的唯一标识符,从配置对象中获取。 #self.calibration_dir:校准文件的存储目录。 #self.calibration_fpath:校准文件的完整路径。 #self.calibration:存储电机校准信息的字典。 self.config = config #类型为 SO101LeaderConfig,存储传入的配置对象,方便在类的其他方法中访问配置信息。 norm_mode_body = MotorNormMode.DEGREES if config.use_degrees else MotorNormMode.RANGE_M100_100 self.bus = FeetechMotorsBus( port=self.config.port, motors={ "shoulder_pan": Motor(1, "sts3215", norm_mode_body), "shoulder_lift": Motor(2, "sts3215", norm_mode_body), "elbow_flex": Motor(3, "sts3215", norm_mode_body), "wrist_flex": Motor(4, "sts3215", norm_mode_body), "wrist_roll": Motor(5, "sts3215", norm_mode_body), "gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100), }, calibration=self.calibration, ) #类型为 FeetechMotorsBus,用于与 Feetech 电机进行通信,初始化时传入端口、电机信息和校准信息。 SO101Leader类继承了Teleoperator,super().init(config) 调用父类 Teleoperator 的构造函数初始化的属性。 class Teleoperator(abc.ABC): def __init__(self, config: TeleoperatorConfig): self.id = config.id #设备的id self.calibration_dir = ( config.calibration_dir if config.calibration_dir else HF_LEROBOT_CALIBRATION / TELEOPERATORS / self.name ) #校准文件的存储目录 self.calibration_dir.mkdir(parents=True, exist_ok=True) #创建校准目录 self.calibration_fpath = self.calibration_dir / f"{self.id}.json" self.calibration: dict[str, MotorCalibration] = {} # 初始化一个字典,键为字符串,值为MotorCalibration if self.calibration_fpath.is_file(): self._load_calibration() #检查校准文件是否存在,存在调用加载校准信息的方法。 Teleoperator定义了一个遥控的基类,其构造函数中接受一个TeleoperatorConfig类型的参数config,主要有以下属性。 self.id:设备的唯一标识符,从传入的配置对象 config 里获取,用于在系统中唯一标识该 SO101Leader 设备实例。 self.calibration_dir:校准文件的存储目录。若 config.calibration_dir 有值则使用该值,否则使用默认路径。此目录用于存放设备的校准文件。 self.calibration_fpath:校准文件的完整路径,由 self.calibration_dir 和设备 id 组合而成,文件格式为 JSON。 self.calibration:存储电机校准信息的字典,键为电机名称,值为 MotorCalibration 类型的对象,用于保存电机的校准参数。 而SO101Leader类中自己定义的实力属性有 - self.config:类型为 SO101LeaderConfig,存储传入的配置对象。借助这个属性,类的其他方法可以访问配置信息,如端口号、是否使用角度单位等。 - self.bus:类型为 FeetechMotorsBus,用于和 Feetech 电机进行通信。初始化时传入端口、电机信息和校准信息,后续可以通过该对象实现电机的读写操作、校准等功能。 总结下就是SO101Leader 类包含 2 个类属性和多个实例属性,这些属性从不同层面描述了 SO101 领导者手臂设备的类型、配置、校准信息以及与电机的通信对象等。而SO101Follower也是同理这里就不过多阐述。 连接设备 teleop.connect() robot.connect() 连接设备就是各自调用此前创建实例的connect方法,建立与硬件设备的连接。以teleop为例,最总调用到self.bus.connect发起连接并进行配置。 def connect(self, calibrate: bool = True) -> None: if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} already connected") self.bus.connect() if not self.is_calibrated and calibrate: self.calibrate() self.configure() logger.info(f"{self} connected.") 跟随控制 def teleop_loop( teleop: Teleoperator, robot: Robot, fps: int, display_data: bool = False, duration: float | None = None ): display_len = max(len(key) for key in robot.action_features) start = time.perf_counter() while True: loop_start = time.perf_counter() #从主臂获取动作位置数据 action = teleop.get_action() #如果启动了实时显示,进行显示 if display_data: observation = robot.get_observation() log_rerun_data(observation, action) #将主臂的动作位置数据发送给从臂 robot.send_action(action) #计算循环执行时间 dt_s = time.perf_counter() - loop_start busy_wait(1 / fps - dt_s) loop_s = time.perf_counter() - loop_start #打印动作的信息,每个舵机的位置。 print("\n" + "-" * (display_len + 10)) print(f"{'NAME':<{display_len}} | {'NORM':>7}") for motor, value in action.items(): print(f"{motor:<{display_len}} | {value:>7.2f}") print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)") if duration is not None and time.perf_counter() - start >= duration: return # 将终端的文本光标向上移动一定的行数,以此实现覆盖上一次输出内容的效果,从而在终端里模拟实时更新的显示 move_cursor_up(len(action) + 5)
  • lerobot设备标定

    lerobot设备标定

    why calibrate 先来看看标定后的数据 { "shoulder_pan": { #肩部旋转关节 "id": 1, "drive_mode": 0, "homing_offset": -1620, "range_min": 1142, "range_max": 2931 }, "shoulder_lift": { #肩部升降关节 "id": 2, "drive_mode": 0, "homing_offset": 2025, "range_min": 844, "range_max": 3053 }, "elbow_flex": { #肘部弯曲关节 "id": 3, "drive_mode": 0, "homing_offset": -1208, "range_min": 963, "range_max": 3078 }, "wrist_flex": { #腕部弯曲关节 "id": 4, "drive_mode": 0, "homing_offset": 2021, "range_min": 884, "range_max": 3222 }, "wrist_roll": { #腕部旋转关节 "id": 5, "drive_mode": 0, "homing_offset": -777, "range_min": 142, "range_max": 3961 }, "gripper": { #夹爪关节 "id": 6, "drive_mode": 0, "homing_offset": 909, "range_min": 1978, "range_max": 3522 } } 上面数据是每个舵机的相关参数,一共有6个舵机,每个舵机都有一个id来标识,可以对应实物来看实际是从下到上。 id: 电机的唯一标识符,用于在总线通信时精准定位和控制特定电机。 drive_mode: 电机的驱动模式,取值为 0 表示特定的驱动模式,不同驱动模式会影响电机的运动特性与控制方式。 homing_offset:归位偏移量,指电机从物理零点位置到校准零点位置的偏移量。此参数能保证电机在每次启动时都能回到准确的零点位置,从而提升运动精度。 range_min 和 range_max:电机运动范围的最小值和最大值,以数值形式呈现。这两个参数限定了电机的运动边界,避免因超出范围而导致硬件损坏或者运动异常。range_min 和 range_max:电机运动范围的最小值和最大值,以数值形式呈现。这两个参数限定了电机的运动边界,避免因超出范围而导致硬件损坏或者运动异常。 上面的参数信息在代码中lerobot/src/lerobot/robots/koch_follower/koch_follower.py 中回进行配置。 从上面的配置信息可知,之所以要标定,就是要获取电机的一些物理特性,主要是几个方面:明确电机物理运动范围、归一化电机位置、确保机器人运行精度等。 明确电机范围 机器的电机要有特定的物理运动范围,因为发送要是超过这个范围,回造成硬件损坏。使用calibrate操作可以记录每个电机的最小和最大位置限制,保证后续发送的指令都在安全范围内。 lerobot/src/lerobot/robots/so101_follower/so101_follower.py def calibrate(self) -> None: # ...已有代码... print( "Move all joints sequentially through their entire ranges " "of motion.\nRecording positions. Press ENTER to stop..." ) range_mins, range_maxes = self.bus.record_ranges_of_motion() self.calibration = {} for motor, m in self.bus.motors.items(): self.calibration[motor] = MotorCalibration( id=m.id, drive_mode=0, homing_offset=homing_offsets[motor], range_min=range_mins[motor], range_max=range_maxes[motor], ) # ...已有代码... 归一化电机位置 不同电机的原始位置值可能都离散的任意值,这些依赖于具体的电机型号,不具备通用性。calibrate 操作能将这些原始位置值归一化为有意义的连续值,像百分比、角度等,方便后续处理和控制。 lerobot/src/lerobot/motors/motors_bus.py def _normalize(self, id_, val, min_, max_, drive_mode): motor = self._id_to_name(id_) if self.motors[motor].norm_mode is MotorNormMode.RANGE_M100_100: norm = (((val - min_) / (max_ - min_)) * 200) - 100 normalized_values[id_] = -norm if drive_mode else norm # ...已有代码... 确保机器运行精度 lerobot/src/lerobot/robots/so100_leader/so100_leader.py 中的 calibrate def calibrate(self) -> None: # ...已有代码... input(f"Move {self} to the middle of its range of motion and press ENTER....") homing_offsets = self.bus.set_half_turn_homings() # ...已有代码... 通过校准,可以确定电机的归位偏移量(homing offset),这有助于提高机器人运动的精度和重复性。在实际应用中,准确的位置控制对机器人完成任务至关重要。 最后经过校准后的数据,后续无需在重复校准,校准保存的路径一般在~/.cache/huggingface/lerobot/calibration,实际运行过程中,回进行加载。 保存 def _save_calibration(self, fpath: Path | None = None) -> None: draccus.dump(self.calibration, f, indent=4) 加载 def _load_calibration(self, fpath: Path | None = None) -> None: self.calibration = draccus.load(dict[str, MotorCalibration], f) 校准流程 python -m lerobot.calibrate \ --robot.type=so101_follower \ --robot.port=/dev/ttyACM0 \ --robot.id=R12252801 上面是执行命令示例,接下来个跟踪一下流程。Python 解释器接收到 python -m lerobot.calibrate 命令后,会将 lerobot.calibrate 当作一个可执行模块来处理。按照 Python 模块查找机制,会在 sys.path 所包含的路径里寻找 lerobot/calibrate.py 文件,对应的文件路径为lerobot/src/lerobot/calibrate.py。 @draccus.wrap() def calibrate(cfg: CalibrateConfig): init_logging() logging.info(pformat(asdict(cfg))) if isinstance(cfg.device, RobotConfig): device = make_robot_from_config(cfg.device) elif isinstance(cfg.device, TeleoperatorConfig): device = make_teleoperator_from_config(cfg.device) device.connect(calibrate=False) device.calibrate() device.disconnect() if __name__ == "__main__": calibrate() 调用calibrate含税,从calibrate可以看到主要的步骤为: 参数解析与配置初始化 创建设备实例 连接设备 设备校准 设备断开连接 参数解析与配置初始化 calibrate函数被@draccus.wrap()装饰,draccus是一个用于解析命令行参数并创建配置对象的库。 @dataclass class CalibrateConfig: teleop: TeleoperatorConfig | None = None robot: RobotConfig | None = None def __post_init__(self): if bool(self.teleop) == bool(self.robot): raise ValueError("Choose either a teleop or a robot.") self.device = self.robot if self.robot else self.teleop draccus会把命令行参数--robot.type=so101_follower --robot.port=/dev/ttyACM0 --robot.id=R12252801解析成CalibrateConfig类的实例。robot属性会被初始化为RobotConfig类型的对象,如果是--teleop.type=so101_leader 将会初始化为TeleoperatorConfig类型的对象。RobotConfig对象在config.py中定义的。 @dataclass(kw_only=True) class RobotConfig(draccus.ChoiceRegistry, abc.ABC): # Allows to distinguish between different robots of the same type id: str | None = None # Directory to store calibration file calibration_dir: Path | None = None def __post_init__(self): if hasattr(self, "cameras") and self.cameras: for _, config in self.cameras.items(): for attr in ["width", "height", "fps"]: if getattr(config, attr) is None: raise ValueError( f"Specifying '{attr}' is required for the camera to be used in a robot" ) @property def type(self) -> str: return self.get_choice_name(self.__class__) 其会根据命令行参数或配置文件中的类型字段,动态选择并创建对应的配置子类实例。 如当前的参数有type,port,id分别设置为so101_follower、/dev/ttyACM0 和 R12252801。 创建设备实例 根据cfg.device类型,调用对应工厂含税创建设备实例。 if isinstance(cfg.device, RobotConfig): device = make_robot_from_config(cfg.device) elif isinstance(cfg.device, TeleoperatorConfig): device = make_teleoperator_from_config(cfg.device) 由于 cfg.device 是 RobotConfig 类型,所以会调用 make_robot_from_config(cfg.device)。该函数可能定义在 /home/laumy/lerobot/src/lerobot/robots/init.py 或者相关的工厂模块中,依据 cfg.device.type 的值(即 so101_follower),会创建 SO101Follower 类的实例。 连接设备 调用 device.connect(calibrate=False) 方法连接设备,lerobot/src/lerobot/robots/so101_follower/so101_follower.py 文件中,其 connect 方法可能如下: def connect(self, calibrate: bool = True) -> None: if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} already connected") # 连接串口总线 self.bus = DynamixelBus(port=self.config.port, baudrate=1000000) self.bus.connect() if not self.is_calibrated and calibrate: self.calibrate() # 连接摄像头 for cam in self.cameras.values(): cam.connect() self.configure() logger.info(f"{self} connected.") 因为传入的 calibrate=False,所以不会触发自动校准。在连接过程中,会先连接串口总线,再连接摄像头,最后进行设备配置。 标定动作 调用 device.calibrate() 方法对设备进行校准,SO101Follower 类的 calibrate 方法可能如下: def calibrate(self) -> None: logger.info(f"\nRunning calibration of {self}") # 禁用扭矩 self.bus.disable_torque() # 设置电机工作模式为位置模式 for motor in self.bus.motors: self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) # 手动操作提示 input(f"Move {self} to the middle of its range of motion and press ENTER....") # 设置半圈归零偏移量 homing_offsets = self.bus.set_half_turn_homings() print( "Move all joints sequentially through their entire ranges " "of motion.\nRecording positions. Press ENTER to stop..." ) # 记录关节运动范围 range_mins, range_maxes = self.bus.record_ranges_of_motion() self.calibration = {} for motor, m in self.bus.motors.items(): self.calibration[motor] = MotorCalibration( id=m.id, drive_mode=0, homing_offset=homing_offsets[motor], range_min=range_mins[motor], range_max=range_maxes[motor], ) # 将校准数据写入电机 self.bus.write_calibration(self.calibration) # 保存校准数据 self._save_calibration() 校准过程包含禁用扭矩、设置电机工作模式、手动操作提示、记录关节运动范围、保存校准数据等步骤。 设备断开连接 调用 device.disconnect() 方法断开设备连接,SO101Follower 类的 disconnect 方法如下: def disconnect(self): if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") # 断开串口总线连接 self.bus.disconnect(self.config.disable_torque_on_disconnect) # 断开摄像头连接 for cam in self.cameras.values(): cam.disconnect() logger.info(f"{self} disconnected.")
  • 模型训练GPU跑飞

    模型训练GPU跑飞

    问题 当前使用的是魔改版的NVIDIA 2080 Ti 22G显卡,发现在模型训练过程中,跑着跑着就报错了,具体如下: raceback (most recent call last): File "/home/laumy/lerobot/./src/lerobot/scripts/train.py", line 291, in <module> train() File "/home/laumy/lerobot/src/lerobot/configs/parser.py", line 226, in wrapper_inner response = fn(cfg, *args, **kwargs) File "/home/laumy/lerobot/./src/lerobot/scripts/train.py", line 212, in train train_tracker, output_dict = update_policy( File "/home/laumy/lerobot/./src/lerobot/scripts/train.py", line 101, in update_policy train_metrics.loss = loss.item() RuntimeError: CUDA error: unspecified launch failure CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect. For debugging consider passing CUDA_LAUNCH_BLOCKING=1 Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions. 然后使用nvidia-smi发现显卡都找不到了。 nvidia-smi No devices were found 排查 重启电脑,重新训练模型,同时执行以下命令查看显卡情况。 watch -n nvidia-smi 发现训练过程中,温度飙升非常快,初步怀疑是性能跑太满,导致温度过高保护了。 解决 限制GPU的功率和核心频率 sudo nvidia-smi -pl 150 # 将功率限制设置为150W sudo nvidia-smi -lgc 1000,1000 # 限制核心频率为1000MHz 限制后继续跑,发现没有问题了。也可以使用nvidia-smi -a来详细查看参数。 另外如果想实时查看GPU监控,可以使用 nvtop 修改显卡为高性能模式 把GPU的runtime PM和PCIe ASPM关掉,下面是开机自启动配置脚本。 sudo vim /usr/local/sbin/fix-nvidia-runtime-pm.sh #!/bin/bash # 强制关闭 ASPM,避免因节能进入低功耗模式 echo performance | tee /sys/module/pcie_aspm/parameters/policy # 获取 NVIDIA GPU 的 PCIe 设备路径 GPU_DEV="0000:$(lspci | awk '/NVIDIA/{print $1; exit}')" if [ -z "$GPU_DEV" ]; then echo "NVIDIA GPU not found, exiting script." exit 1 fi # 设置 GPU 的 power/control 为 'on',确保设备不会休眠 CUR="/sys/bus/pci/devices/${GPU_DEV}" while [ -n "$CUR" ] && [ -e "$CUR" ]; do if [ -w "$CUR/power/control" ]; then echo on | sudo tee "$CUR/power/control" fi # 上一级桥 PARENT=$(readlink -f "$CUR/..") [[ "$PARENT" == "/sys/devices" ]] && break CUR="$PARENT" done # 启用 NVIDIA persistence 模式,避免 GPU 重置 nvidia-smi -pm 1 # 输出信息确认操作成功 echo "GPU power control set to 'on' and ASPM disabled." 配置可执行sudo chmod +x /usr/local/sbin/fix-nvidia-runtime-pm.sh 然后配置开机自启动 sudo vim /etc/systemd/system/fix-nvidia-runtime-pm.service [Unit] Description=Force NVIDIA GPU power control 'on' and disable ASPM After=multi-user.target Wants=multi-user.target [Service] Type=oneshot ExecStart=/usr/local/sbin/fix-nvidia-runtime-pm.sh RemainAfterExit=true [Install] WantedBy=multi-user.target 然后设置开机自启动 sudo systemctl daemon-reload sudo systemctl enable fix-nvidia-runtime-pm.service sudo systemctl start fix-nvidia-runtime-pm.service 重启后可以验证一下 cat /sys/bus/pci/devices/0000:$(lspci | awk '/NVIDIA/{print $1; exit}')/power/control 如果打开的是on成功 cat /sys/module/pcie_aspm/parameters/policy 输出应该是 [performance]
  • lerobot搭建

    lerobot搭建

    设备查询 本文是记录ubuntu系统lerobot试验的快捷命令,方便开始负责执行设备,不会介绍为什么? python -m lerobot.find_port sudo chmod +666 /dev/ttyACM0 /dev/ttyACM1 python -m lerobot.find_cameras 机器标定 从臂标定 python -m lerobot.calibrate \ --robot.type=so101_follower \ --robot.port=/dev/ttyACM0 \ --robot.id=R12252801 主臂标定 python -m lerobot.calibrate \ --teleop.type=so101_leader \ --teleop.port=/dev/ttyACM1 \ --teleop.id=R07252608 标定的文件路径默认存储在/home/laumy/.cache/huggingface/lerobot/calibration。如果要更改路径。 --robot.calibration_dir=/home/laumy/lerobot/calibrations --teleop.calibration_dir=/home/laumy/lerobot/calibrations 如果指定了路径,后续的代码都需要指定。 示教 python -m lerobot.teleoperate \ --robot.type=so101_follower \ --robot.port=/dev/ttyACM0 \ --robot.id=R12252801 \ --teleop.type=so101_leader \ --teleop.port=/dev/ttyACM1 \ --teleop.id=R07252608 数据采集 python -m lerobot.record \ --robot.disable_torque_on_disconnect=true \ --robot.type=so101_follower \ --robot.port=/dev/ttyACM0 \ --robot.id=R12252801 \ --robot.cameras="{ handeye: {type: opencv, index_or_path: 6, width: 640, height: 480, fps: 30}, fixed: {type: opencv, index_or_path: 0, width: 640, height: 480, fps: 30}}" \ --teleop.type=so101_leader \ --teleop.port=/dev/ttyACM1 \ --teleop.id=R07252608 \ --dataset.repo_id=laumy/record-07271148\ --dataset.num_episodes=10 \ --dataset.reset_time_s=5 \ --dataset.push_to_hub=false \ --dataset.single_task="Grab the cube" \ --display_data=true repo_id必现为"用户名/数据集名"格式,代码中会检测是否有"/"。 数据存储默认的路径为/home/laumy/.cache/huggingface/lerobot/laumy/record-07271148如果要修改路径的话,加上下面参数。 --dataset.root=/home/laumy/lerobot/data/record_07271148 如果要继续上一次的录制,可以添加在上面命令基础上加上: --resume=true 如果数据采集过程中,突然异常终止,无法恢复报错如下时。 Traceback (most recent call last): File "/home/laumy/lerobot/./src/lerobot/scripts/train.py", line 291, in <module> train() File "/home/laumy/lerobot/src/lerobot/configs/parser.py", line 226, in wrapper_inner response = fn(cfg, *args, **kwargs) File "/home/laumy/lerobot/./src/lerobot/scripts/train.py", line 128, in train dataset = make_dataset(cfg) File "/home/laumy/lerobot/src/lerobot/datasets/factory.py", line 90, in make_dataset dataset = LeRobotDataset( File "/home/laumy/lerobot/src/lerobot/datasets/lerobot_dataset.py", line 489, in __init__ check_timestamps_sync(timestamps, episode_indices, ep_data_index_np, self.fps, self.tolerance_s) File "/home/laumy/lerobot/src/lerobot/datasets/utils.py", line 585, in check_timestamps_sync raise ValueError( ValueError: One or several timestamps unexpectedly violate the tolerance inside episode range. This might be due to synchronization issues during data collection. [{'diff': np.float32(-15.966666), 'episode_index': 90, 'timestamps': [np.float32(15.966666), np.float32(0.0)]}] 解决办法就是到数据集路径下~/.cache/huggingface/lerobot/laumy/record-07261516/检查数据,一般就是数据集不匹配了,打开data和videos下面的数据,看看数量是否和meta里面的jsonl对齐了,因为程序采集过程中异常终止,可能只写了data目录或videos目录,但是meta目录下没来得及写就崩溃退出,把最新的一组数据删除了并把meta下面的所有文件数量对齐就可以了。 需要注意的时,info.json中的数据要特别进行修改,下面是需要修改info.json的地方。 { "codebase_version": "v2.1", "robot_type": "so101_follower", "total_episodes": 54, ------采集的周期 "total_frames": 21243, ------采集的总长度,是episodes.jsonl的总和,这个可以通过后面的脚步来计算确认 "total_tasks": 1, "total_videos": 108, -----总的视频数量,一般是episodes * 2 "total_chunks": 1, "chunks_size": 1000, "fps": 30, "splits": { "train": "0:54" -----用于划分给训练集的数据,后面的数据一般要和total_episodes一致。 }, 如果meta/info.json中total_frames是所有视频的总帧数,是从meta/episodes.jsonl中所有的length字段获取的总和,如果做了调整要重新计算更新一下这个total_frames。下面是重新计算的脚步。 import os import json import argparse from pathlib import Path def calibrate_total_frames(dataset_root): dataset_root = Path(dataset_root) # 手动指定 INFO_PATH 和 EPISODES_PATH info_path = dataset_root / "info.json" # 替换为实际的元数据文件路径 episodes_path = dataset_root / "episodes.jsonl" # 替换为实际的剧集信息文件路径 # 检查元数据文件是否存在 if not info_path.exists(): print(f"Metadata file {info_path} does not exist.") return # 检查剧集信息文件是否存在 if not episodes_path.exists(): print(f"Episodes file {episodes_path} does not exist.") return # 加载元数据python ./src/lerobot/scripts/train.py \ --dataset.repo_id=${HF_USER}/record-07271539 \ --policy.type=act \ --output_dir=outputs/train/weigh_07271539 \ --job_name=act_so101_test \ --policy.device=cuda \ --policy.push_to_hub=false \ --wandb.enable=false with open(info_path, 'r') as f: info = json.load(f) # 重新计算 total_frames total_frames = 0 with open(episodes_path, 'r') as f: for line in f: episode = json.loads(line) total_frames += episode.get('length', 0) # 更新元数据 # info["total_frames"] = total_frames # with open(info_path, 'w') as f: # json.dump(info, f, indent=4) print(f"Total frames calibrated to {total_frames}") if __name__ == "__main__": parser = argparse.ArgumentParser(description='Calibrate total frames in dataset metadata.') parser.add_argument('dataset_root', type=str, help='Root path of the dataset') args = parser.parse_args() calibrate_total_frames(args.dataset_root) 训练 python ./src/lerobot/scripts/train.py \ --dataset.repo_id=laumy/record-07271539 \ --policy.type=act \ --output_dir=outputs/train/weigh_07271539 \ --job_name=act_so101_test \ --policy.device=cuda \ --policy.push_to_hub=false \ --wandb.enable=false 训练过程如果不小心终止了,执行下面命令可以接着上次的训练。 python ./src/lerobot/scripts/train.py \ --config_path=outputs/train/weigh_07271539/checkpoints/last/pretrained_model/train_config.json \ --resume=true --steps=200000 --steps参数表示迭代次数,这里表示200K次。 测试 python -m lerobot.record \ --robot.type=so101_follower \ --robot.disable_torque_on_disconnect=true \ --robot.port=/dev/ttyACM0 \ --robot.cameras="{ handeye: {type: opencv, index_or_path: 6, width: 640, height: 480, fps: 30}, fixed: {type: opencv, index_or_path: 0, width: 640, height: 480, fps: 30}}" \ --robot.id=R12252801 \ --display_data=false \ --dataset.single_task="Put brick into the box" \ --policy.path=outputs/weigh_07280842/pretrained_model \ --dataset.episode_time_s=240 \ --dataset.repo_id=laumy/eval_so101_07280842 默认录制时长是60s,60S后会停止,如果要改长加上--dataset.episode_time_s=640 class ACTConfig(PreTrainedConfig): n_obs_steps: int = 1 chunk_size: int = 100 n_action_steps: int = 1 n_obs_steps:传递给策略的观测时间窗口大小,具体是指包含当前步骤在内的连续观测步数。也就是说我要观测几步动作来预测接下来的动作。 chunk_size=100: 动作预测分块大小,为单次模型前向传播预测的动作序列总长度(以环境步数为单位),简单理解就是预测的动作步数,实际执行的动作会从这里面选择。 n_action_steps:从预测的动作分块中实际执行的步数,所以必须满足 n_action_steps ≤ chunk_size。 观测序列 → [O_t-n, ..., O_t] → 策略网络 → [A1...A100] → 执行[A1...A50] ↑ ↑ ↑ ↑ n_obs_steps=3 chunk_size=1 n_action_steps=50 一般来说如果是抓取固定的物体,n_obs_steps(3~5)设置可设较小值,chun_size设置大一点以减少策略调用频率,n_action_steps一般取chunk_size/2。如果是动态的环境(如装配、堆叠)n_obs_steps建议设置大一点10~20,chunk_size也缩减一点。 action.xxx: 代表的是要设定执行的动作,也就是主臂或者模型推理处理发送的指令。 observation.xxx:实际执行的动作。 一般情况下observation要跟action越吻合越好。 其他(废弃) 用于记录,可不用看 mac 找uart和摄像头 export HF_USER=laumy python -m lerobot.find_port python -m lerobot.find_cameras 从臂标定 python -m lerobot.calibrate \ --robot.type=so101_follower \ --robot.port=/dev/tty.usbmodem5A7A0576331 \ --robot.id=R12252801 主臂标定 python -m lerobot.calibrate \ --teleop.type=so101_leader \ --teleop.port=/dev/tty.usbmodem5A7A0582001 \ --teleop.id=R07252608 示教 python -m lerobot.teleoperate \ --robot.type=so101_follower \ --robot.port=/dev/ttyACM0 \ --robot.id=R12252801 \ --teleop.type=so101_leader \ --teleop.port=/dev/ttyACM1 \ --teleop.id=R07252608 采集数据 python -m lerobot.record \ --robot.disable_torque_on_disconnect=true \ --robot.type=so101_follower \ --robot.port=/dev/ttyACM0 \ --robot.id=R12252801 \ --robot.cameras="{ handeye: {type: opencv, index_or_path: 8, width: 640, height: 360, fps: 30}, fixed: {type: opencv, index_or_path: 10, width: 640, height: 360, fps: 30}}" \ --teleop.type=so101_leader \ --teleop.port=/dev/ttyACM1 \ --teleop.id=R07252608 \ --dataset.repo_id=${HF_USER}/record-test \ --dataset.num_episodes=10 \ --dataset.reset_time_s=5 \ --dataset.push_to_hub=false \ --dataset.single_task="Grab the cube" 训练 python ./src/lerobot/scripts/train.py \ --dataset.repo_id=${HF_USER}/data_07181406 \ --policy.type=act \ --output_dir=outputs/train/weigh_07181406 \ --job_name=act_so101_test \ --policy.device=cuda \ --policy.push_to_hub=false \ --wandb.enable=false 测试 python -m lerobot.record \ --robot.type=so101_follower \ --robot.disable_torque_on_disconnect=true \ --robot.port=/dev/ttyACM0 \ --robot.cameras="{ handeye: {type: opencv, index_or_path: 8, width: 640, height: 360, fps: 30}, fixed: {type: opencv, index_or_path: 10, width: 640, height: 360, fps: 30}}" \ --robot.id=R12252801 \ --display_data=false \ --dataset.single_task="Put lego brick into the transparent box" \ --policy.path=outputs/100000/pretrained_model \ --dataset.repo_id=${HF_USER}/eval_so101 windows 找uart和摄像头 set HF_USER=86152 python -m lerobot.find_port python -m lerobot.find_cameras 从臂标定 python -m lerobot.calibrate ` --robot.type=so101_follower ` --robot.port=COM5 ` --robot.id=R12252801 主臂标定 python -m lerobot.calibrate ` --teleop.type=so101_leader ` --teleop.port=COM6 ` --teleop.id=R07252608 示教 python -m lerobot.teleoperate ` --robot.type=so101_follower ` --robot.port=COM5 ` --robot.id=R12252801 ` --teleop.type=so101_leader ` --teleop.port=COM6 ` --teleop.id=R07252608 录制 t 训练 python src/lerobot/scripts/train.py ` --dataset.repo_id=86152/data_07181406 ` --policy.type=act ` --output_dir=outputs/train/weigh_07181406 ` --job_name=act_so101_test ` --policy.device=cuda ` --policy.push_to_hub=false ` --wandb.enable=false 测试 python -m lerobot.record ` --robot.type=so101_follower ` --robot.disable_torque_on_disconnect=true ` --robot.port=COM5 ` --robot.cameras="{ handeye: {type: opencv, index_or_path: 1, width: 640, height: 360, fps: 30}, fixed: {type: opencv, index_or_path: 2, width: 640, height: 360, fps: 30}}" ` --robot.id=R12252801 ` --display_data=false ` --dataset.single_task="Put lego brick into the transparent box" ` --dataset.repo_id=${HF_USER}/eval_so101 ` --policy.path=outputs/train/weigh_07172300/checkpoints/last/pretrained_model unbuntu export HF_USER=laumy python lerobot/scripts/find_motors_bus_port.py python lerobot/common/robot_devices/cameras/opencv.py python lerobot/scripts/control_robot.py \ --robot.type=so101 \ --control.type=record \ --control.fps=30 \ --control.single_task="Grasp a lego block and put it in the bin." \ --control.repo_id=${HF_USER}/so101_test \ --control.tags='["so101","tutorial"]' \ --control.warmup_time_s=5 \ --control.episode_time_s=30 \ --control.reset_time_s=5 \ --control.num_episodes=10 \ --control.push_to_hub=false 如果要继续上一次的录制,可以添加在上面命令基础上加上:--control.resume=true 可视化训练数据 python lerobot/scripts/visualize_dataset_html.py --repo-id ${HF_USER}/so101_test 拷贝数据到服务器 scp -P 18620 so101_test.tar.gz root@connect.bjb1.seetacloud.com:~/ python lerobot/scripts/train.py \ --dataset.repo_id=${HF_USER}/so101_test \ --policy.type=act \ --output_dir=outputs/train/act_so101_test0717 \ --job_name=act_so101_test \ --policy.device=cuda \ --wandb.enable=false python lerobot/scripts/control_robot.py \ --robot.type=so101 \ --control.type=record \ --control.fps=30 \ --control.single_task="Grasp a lego block and put it in the bin." \ --control.repo_id=${HF_USER}/eval_act_so101_test \ --control.tags='["tutorial"]' \ --control.warmup_time_s=5 \ --control.episode_time_s=30 \ --control.reset_time_s=30 \ --control.num_episodes=10 \ --control.push_to_hub=false \ --control.policy.path=outputs/train/act_so101_test/checkpoints/last/pretrained_model
  • ONNX Runtime C++端侧模型部署YOLOv5

    ONNX Runtime C++端侧模型部署YOLOv5

    加载准备 初始化ONNXRuntime环境 Ort::Env env(ORT_LOGGING_LEVEL_WARNING, "YOLOv5Inference"); Ort::Env 是 ONNX Runtime C++ API 中用于初始化运行环境的类,有多个重载的构造函数,下面是一个构造函数原型及参数作用如下。 Ort::Env( OrtLoggingLevel logging_level, const char* logid, OrtLoggingFunction logging_fn = nullptr, void* logger_param = nullptr ); logging_level:控制日志输出级别 logid: 自定义日志标签,用于区分不同模块的日志来源 logging_fn:自定义日志回调函数,若为 nullptr 则使用默认日志输出到控制台。 logger_param:传递给自定义日志函数的用户参数(如上下文对象) 设置会话参数 Ort::SessionOptions session_options; session_options.SetIntraOpNumThreads(1); session_options.SetGraphOptimizationLevel(GraphOptimizationLevel::ORT_ENABLE_ALL); 初始化一个空的会话配置对象session_options,SetIntraOpNumThreads限制单个算子(Intra-op)内部使用的线程数为 1,适用于轻量级任务或避免多线程竞争,SetGraphOptimizationLevel启用所有图优化策略(如算子融合、常量折叠),提升推理性能。 模型加载 Ort::Session session_(env, modelPath.c_str(), session_options); Ort::Session 是 ONNX Runtime C++ API 中用于加载 ONNX 模型并创建推理会话的核心类,其功能分解如下。 Ort::Session( const Ort::Env& env, const char* model_path, const Ort::SessionOptions& options ); env:全局运行环境对象,管理线程池和内存分配等资源,需优先初始化。 model_path:ONNX 模型文件的路径,c语言的字符串类型。 options:会话参数,配置会话行为,如线程数、优化级别、硬件后端等。 获取输入和输出信息 输入名称 Ort::AllocatorWithDefaultOptions allocator; //创建默认内存分配器对象,用于管理 ONNX Runtime 中的内存分配(如节点名称字符串的内存 std::vector<const char*> input_node_names_; //存储 C 风格字符串指针,用于直接传递给 ONNX Runtime 的推理接口 std::vector<std::string> input_names_; //存储标准字符串对象的vector,用于长期维护字符串内存 size_t num_inputs_; num_inputs_ = session_.GetInputCount(); //获取输入节点的个数,有多少个节点就决定了多个个name,一般都是1个。 input_node_names_.resize(num_inputs_); input_names_.resize(num_inputs_, ""); //预分配容器空间,避免动态扩容的开销 std::cout << "num_inputs = "<< num_inputs_<<std::endl; for (size_t i = 0; i < num_inputs_; ++i) { auto input_name = session_.GetInputNameAllocated(i, allocator); //通过分配器安全获取第 i 个输入节点的名称(返回 Ort::AllocatedStringPtr 对象) input_names_[i].append(input_name.get()); //获取名称的原始指针,存入 input_names_ 的字符串中 input_node_names_[i] = input_names_[i].c_str(); //将 std::string 转换为 C 风格指针,供 input_node_names_ 使用 } 上面函数示例了如何获取输入节点name,首先通过session_.GetInputCount()获取到输入的节点,然后使用for循环进行遍历每个节点,通过session_.GetInputNameAllocated(i, allocator)获取每个节点的名称,返回一个Ort::AllocatedStringPtr智能指针,需要通过.get方法返回c字符串,由于智能指针指向的存储空间退出for后会销毁,所以上述代码将其复制到input_names_中。 输入张量维度 Ort::TypeInfo input_type_info = session_.GetInputTypeInfo(0); //获取模型第0个输入节点的类型信息对象,返回Ort::TypeInfo类型 auto input_tensor_info = input_type_info.GetTensorTypeAndShapeInfo(); //从类型信息中提取张量相关的形状和数据类型信息,返回Ort::TensorTypeAndShapeInfo对象 std::vector<int64_t> input_dims = input_tensor_info.GetShape(); //获取输入张量的维度信息,返回std::vector<int64_t>容器,存储各维度大小 //典型YOLO模型的输入维度为[batch, channel, height, width] int inputWidth = input_dims[3]; int inputHeight = input_dims[2]; 上面函数示例获取输入张量形状,最终通过张量的形状获取到了输入图像的宽和高。实际上可以简化一下,按照下面的方式。 auto inputShapeInfo = session_.GetInputTypeInfo(0).GetTensorTypeAndShapeInfo().GetShape(); int ch = inputShapeInfo[1]; inputWidth = inputShapeInfo[2]; inputHeight = inputShapeInfo[3]; 输出名称 std::vector<const char*> output_node_names_; //存储C风格字符串指针的vector,用于兼容需要const char*的ONNX Runtime API调用 std::vector<std::string> output_names_; //存储标准字符串对象的vector,用于长期维护字符串内存 size_t num_outputs_; num_outputs_ = session_.GetOutputCount(); //获取模型输出节点数量 output_node_names_.resize(num_outputs_); output_names_.resize(num_outputs_, ""); //预分配两个vector的空间 for (size_t i = 0; i < num_outputs_; ++i) { auto output_name = session_.GetOutputNameAllocated(i, allocator); output_names_[i].append(output_name.get()); //将名称存入std::string保证生命周期 output_node_names_[i] = output_names_[i].c_str(); } //循环获取每个输出节点名称 上面示例了获取输出名称,与输入方法类似。 输入预处理 cv::Mat image = cv::imread(imagePath); if (image.empty()) { std::cerr << "Error: Could not read image." << std::endl; return -1; } cv::Mat originalImage = image.clone(); cv::Size image_shape = originalImage.size(); // 图像预处理 std::vector<float> inputTensor = preprocess(image, inputWidth, inputHeight); 使用opencv读取图像,调用preprocess进行预处理。 std::vector<float> preprocess(const cv::Mat& image, int inputWidth = 320, int inputHeight = 320) { cv::Mat resizedImage; cv::resize(image, resizedImage, cv::Size(inputWidth, inputHeight)); //图像缩放:使用OpenCV的resize函数将图像调整为指定尺寸(默认320x320) cv::cvtColor(resizedImage, resizedImage, cv::COLOR_BGR2RGB); //颜色空间转换:从BGR转换为RGB格式(多数深度学习模型使用RGB输入) resizedImage.convertTo(resizedImage, CV_32F, 1.0 / 255.0); //数值归一化:通过convertTo将像素值从[0,255]归一化到[0,1]范围 std::vector<float> inputTensor; for (int c = 0; c < 3; ++c) { for (int h = 0; h < inputHeight; ++h) { for (int w = 0; w < inputWidth; ++w) { inputTensor.push_back(resizedImage.at<cv::Vec3f>(h, w)[c]); } } } //通过三重循环将OpenCV的HWC格式(Height-Width-Channel)转换为CHW格式 //内存布局变为连续通道数据:RRR...GGG...BBB,最终输出std::vector<float> return inputTensor; } 上面的代码实现了模型对输入数据的部分预处理,包括输入图片缩放固定尺寸,数值归一化,以及将格式转换为CHW张量格式,但是对于输入模型,需要的数据格式为Ort::Value类型。 std::vector<int64_t> input_shape = {1, 3, inputHeight, inputWidth}; //input_shape采用NCHW格式(批次数-通道-高度-宽度),这是深度学习模型的通用输入布局 auto memory_info = Ort::MemoryInfo::CreateCpu(OrtArenaAllocator, OrtMemTypeDefault); //描述用于描述内存分配的信息,包括内存的位置(CPU 或 GPU)以及内存的具体类型(固定内存或常规内存) Ort::Value input_tensor = Ort::Value::CreateTensor<float>(memory_info, inputTensor.data(), inputTensor.size(), input_shape.data(), input_shape.size()); 关于CreateTensor对象解析如下。 static Ort::Value CreateTensor<float>( const MemoryInfo& memory_info, // 内存管理策略 float* p_data, // 输入数据指针 size_t p_data_length, // 输入的大小 const int64_t* shape, // 维度数组指针 size_t shape_length // 维度数量 ); memory_info:指定张量内存分配策略,通常由Ort::MemoryInfo::CreateCpu创建。 inputTensor.data():输入数据的地址(需确保内存连续)。 inputTensor.size():输入数据的大小。 input_shape.data():输入张量的形状数组信息(如NCHW格式的{1,3,640,640})。 shape_length: 输入张量的形状信息维度数 模型推理 std::vector<Ort::Value> outputs = session_.Run( Ort::RunOptions{nullptr}, input_node_names_.data(), &input_tensor, 1, output_node_names_.data(), output_node_names_.size()); 下面是函数的参数 OrtStatus * OrtApi::Run( const OrtRunOptions * run_options, const char *const *input_names, //输入节点名称的数组 const OrtValue *const *inputs, //模型输入的数据Ort::Value类型 size_t input_len, //输入张量数量,需与input_names数组长度一致 const char *const *output_names,//输出节点名称数组 size_t output_names_len,//输出节点名称的数量,与output_names数组数量保持一致。 ) 输出后处理 //张量信息提取,outputs[0]指向坐标、分数张量指针,outputs[1]指向类别张量的指针 float* dets_data = outputs[0].GetTensorMutableData<float>(); //坐标(格式为[x1,y1,x2,y2,score]) float* labels_pred_data = outputs[1].GetTensorMutableData<float>(); //类别 //张量维度的解析,用于获取检测框的数量 auto dets_tensor_info = outputs[0].GetTensorTypeAndShapeInfo(); std::vector<int64_t> dets_dims = dets_tensor_info.GetShape(); size_t num_detections = dets_dims[1]; //结构化重组,解析输出的张量将其存储dets、scores、lables_pred std::vector<std::vector<float>> dets(num_detections, std::vector<float>(4)); std::vector<float> scores(num_detections); std::vector<int> labels_pred(num_detections); //遍历解析存储坐标dets、分数scores、标签类别lables_pred for (size_t i = 0; i < num_detections; ++i) { for (int j = 0; j < 4; ++j) { dets[i][j] = dets_data[i * 5 + j]; } scores[i] = dets_data[i * 5 + 4]; labels_pred[i] = static_cast<int>(labels_pred_data[i]); } //将坐标信息进行缩放以适应正常的图片大小。 float scale_x = static_cast<float>(image_shape.width) / inputWidth; float scale_y = static_cast<float>(image_shape.height) / inputHeight; for (auto& det : dets) { det[0] *= scale_x; det[1] *= scale_y; det[2] *= scale_x; det[3] *= scale_y; } 上面的代码从输出张量信息中进行解析,将坐标、分数、标签类别依次存储到dets、scores、lables_pred中。 void visualizeResults(cv::Mat& image, const std::vector<std::vector<float>>& dets, const std::vector<float>& scores, const std::vector<int>& labels_pred, const std::vector<std::string>& labels, float conf_threshold = 0.4) { for (size_t i = 0; i < dets.size(); ++i) { const auto& det = dets[i]; float score = scores[i]; if (score > conf_threshold) { int class_id = labels_pred[i]; int x1 = static_cast<int>(det[0]); int y1 = static_cast<int>(det[1]); int x2 = static_cast<int>(det[2]); int y2 = static_cast<int>(det[3]); std::string label = labels[class_id]; cv::rectangle(image, cv::Point(x1, y1), cv::Point(x2, y2), cv::Scalar(0, 255, 0), 2); cv::putText(image, label + ": " + std::to_string(score), cv::Point(x1, y1 - 10), cv::FONT_HERSHEY_SIMPLEX, 0.9, cv::Scalar(0, 255, 0), 2); } } } 最终将获取到的将坐标、分数、标签类别传入到visualizeResults进行绘制。 发现一个开源的ai toolkit,相对比较全。https://github.com/xlite-dev/lite.ai.toolkit/tree/main
  • pip install

    pip install

    是什么 pip install 是python包管理器,用于python软件包的下载、安装、卸载等功能。 怎么用 在线安装 pip install 软件包名 pip install 软件包名==版本号 例如pip install requests,或pip install requests==1.1。 也可以从文件列表中获取安装 pip install -r requirements.txt 从requirements.txt文件安装依赖,通常用于项目的依赖管理。 pip的软件包一般有两种格式: whl (Wheel) 格式: 文件是一种预编译的Python包格式,类似于Windows的.exe安装文件,但专门用于Python。 tar.gz:包含了Python包的源代码,需要先解压,然后pip会根据其中的setup.py文件进行编译和安装。 whl文件是pip推荐的安装包格式,因为它更快,而.tar.gz文件则用于源代码分发和离线安装。 torchvision-0.17.1-cp311-cp311-macosx_10_13_x86_64.whl 这个命名规则是什么?第一个cp311是编译是python版本为3.11,第二个cp311表示ABI(应用二进制接口)兼容 Python 3.1,确保与 Python 3.11 环境完全适;操作系统架构为macos 10.13以上,x86_64 intel/AMD 64位。 离线升级 pip install --no-index --find-links=./offline_packages -r requirements.txt no-index:表示不从网上获取安装。 find-links:选择本地包的路径 r:下载所有依赖,可省略。 获取软件包可以通过u盘或者下载的方式,看看怎么下载。 pip download -d ./offline_packages -r requirements.txt 升级 pip install --upgrade 软件包名 或简写方式: pip install -U 软件包 用于升级软件包名称。包括升级pip。 软件包源 查看源 pip config list 安装软件时不指定源就会默认从当前的源获取,对应的配置文件路径:~/.config/pip/pip.conf 设置源 pip config set global.index-url <源地址> 示例:pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple 永久设置源,将会写到配置文件。 pip config unset global.index-url 删除全局配置的源。 指定源 pip install xxx -i https:xxx 或者pip install xxx --index-url https:xxx 也可以从github中获取 pip install git+<仓库地址> 常用源 清华大学:https://pypi.tuna.tsinghua.edu.cn/simple 阿里云:https://mirrors.aliyun.com/pypi/simple/ 中国科学技术大学:https://pypi.mirrors.ustc.edu.cn/simple/ 卸载 pip uninstall 软件包名 卸载对应的软件包。 查看 pip --version 查看pip的版本。 pip list 列出安装了那些包
  • ONNX Runtime Python端侧模型部署YOLOv5

    ONNX Runtime Python端侧模型部署YOLOv5

    ONNX Runtime介绍 ONNX Runtime不依赖于Pytorch、tensorflow等机器学习训练模型框架。他提供了一种简单的方法,可以在CPU、GPU、NPU上运行模型。通常ONNX Runtime用于端侧设备模型的运行推理。要使用ONNX Runtime运行模型,一般的步骤如下: 用你最喜欢的框架(如pytorch、tensorflow、paddle等)训练一个模型。 将模型转换或导出为ONNX格式。 在端侧使用ONNX Runtime加载并运行模型。 模型的训练和导出为ONNX格式这里就不再阐述了。下面基于python在端侧运行模型的示例: import numpy # 导入numpy模块 import onnxruntime as rt # 导入onnxruntime模块 sess = rt.InferenceSession( "logreg_iris.onnx", providers=rt.get_available_providers()) # 加载模型logreg_iris.onnx input_name = sess.get_inputs()[0].name # 获取模型的输入名称,对应的是使用https://netron.app/中intput name。 pred_onx = sess.run(None, {input_name: X_test.astype(numpy.float32)})[0] # 运行模型推理,返回结果到pred_onx中 print(pred_onx) 上面给出的python示例中,端侧运行模型可以总结为2个步骤。加载模型,模型推理。 加载模型 class onnxruntime.InferenceSession( path_or_bytes: str | bytes | os.PathLike, sess_options: onnxruntime.SessionOptions | None = None, providers: Sequence[str | tuple[str, dict[Any, Any]]] | None = None, provider_options: Sequence[dict[Any, Any]] | None = None, **kwargs) path_or_bytes: 模型文件名或者ONNX、ORT格式二进制。 sess_options: 会话选项,比如配置线程数、优先级。 providers: 指定执行提供者优先级(['CUDAExecutionProvider','CPUExecutionProvider']) provider_options: 字典序列,为每个提供者配置专属参数(如CUDA设备ID) options = onnxruntime.SessionOptions() options.SetIntraOpNumThreads(4) # 多设备优先级配置 session = InferenceSession( "model.onnx", sess_options=options, providers=[ ('CUDAExecutionProvider', {'device_id': 0}), 'CPUExecutionProvider' ] ) 模型推理 outputs = senssion.run(output_names, input_feed, run_options=None) output_names:输出节点名称,字符串列表,指定需要获取的输出节点名称,若为None则返回所有输出 input_feed:输入数据,字典类型,结构为{"输入节点名": numpy数组/ORTValue},建议使用ORTValue封装输入数据以减少CPU-GPU拷贝开销。 run_options:运行参数,如日志级别。 import numpy as np import onnxruntime as ort # 创建示例数据 cpu_data = np.random.rand(1, 3, 224, 224).astype(np.float32) # 转换为GPU上的ORTValue gpu_ort_value = ort.OrtValue.ortvalue_from_numpy( cpu_data, device_type='cuda', # 关键参数:指定GPU设备 device_id=0 # GPU设备ID(多卡时指定) ) print(gpu_ort_value.device_name()) # 输出: 'Cuda' results = session.run( ["output_name"], {"input_name": gpu_ort_value} # 避免CPU->GPU拷贝 ) 在运行模型是,需要获取模型的输入和输出名称,可以通过调用对应的函数session.get_inputs(),session.get_outputs()来获取。inputs和outputs函数返回的是onnxruntime.NodeArg类,该类是ONNX Runtime中表示计算图节点输入/输出参数的核心类,该类有3个成员变量,如下: property name: 参数唯一标识符,对应计算图中的节点名称。 property shape: 张量形状。 property type:数据类型(如tensor(float32)/tensor(int64)) 以下是获取输入名称和输出名称的示例。 input_name = session.get_inputs()[0].name output_names = [output.name for output in session.get_outputs()] 详细请参考:https://onnxruntime.ai/docs/api/python/api_summary.html YOLOv5运行示例 加载模型 session_options = ort.SessionOptions() session_options.intra_op_num_threads = 1 # 加载 ONNX 模型 session = ort.InferenceSession( "yolov5_n.q.onnx", sess_options=session_options, providers=["XXXExecutionProvider"]) 创建SessionOptions对象用于定制化会话行为,限制算子内部并行线程数为1,加载名为yolov5_n.q.onnx的量化版YOLOv5模型,指定自定义执行提供者XXXExecutionProvider。 图像预处理 image = cv2.imread(args.image) #image shape (375, 500, 3) image_shape = image.shape[:2] #image_shape的值(375, 500),取前面2个值为图像的宽高 # 获取图像的尺寸大小高和宽。 input_tensor = preprocess(image) # 图像预处理函数 def preprocess(image, input_size=(640, 640)): # 调整图像大小为640*640, image = cv2.resize(image, input_size) # 转换颜色空间RGB image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # 归一化处理 #astype(np.float32)将图像像素值从整数类型(如uint8)转换为32位浮点数, #避免后续除法运算的精度损失,/ 255.0将像素值从[0,255]的原始范围线性映射到[0,1]区间, #符合神经网络输入的典型数值范围要求 image = image.astype(np.float32) / 255.0 # 转置模型为CHW格式,原输入是HWC格式。 # 输入的数据是(640,640,3),需要调整为NCHW模型格式 [batch, channel, height, width] # 使用np.transpose进行转置,变换成(3,640,640) image = np.transpose(image, (2, 0, 1)) image = np.expand_dims(image, axis=0) # 接着再加上一个轴变化成(1,3,640,640)tensor。 return image 如何理解深度学习中的轴了? 在深度学习中,轴可以理解为维度。如上图是一个NCHW排布的格式,把N当成第一个维度即位轴0,C第二维度即为轴1,H第三维度即为轴2,W为第四维度即为轴3。np.expand_dims(image, axis=0)即拓展了轴0,原来只有3个维度现在变成4个维度了,N为1。还可以按照指定的轴进行求和,即做压缩。执行np.sum(data, axis=0)时,也就是沿着N的维度就行压缩求和,就变成如上图。由原来的(N,C,W,H)变成了(C',W',H'),即N个CWH中的各自相加。如果是np.sum(data,axis=1),那就是按照C维度方向进行相加,结果就是(N,W,H),即如RGB格式就是每个图像RGB 3通道的像素相加,如下图所示。 模型推理 模型推理前,需要获取计算图输入和输入的名称 input_name = session.get_inputs()[0].name output_names = [output.name for output in session.get_outputs()] print('input name', input_name) print('output name', output_names) 输出结果与下图对应。 input name input output name ['dets', 'labels'] 获取到intput_name和ouput_names后,即可调用运行推理。 outputs = session.run(output_names, {input_name: input_tensor}) 模型后处理 # 把batch这个维度去掉 dets = outputs[0].squeeze() labels_pred = outputs[1].squeeze() #将坐标进行缩放以适应实际图片的大小。 input_size = (640, 640) scale_x = image_shape[1] / input_size[0] scale_y = image_shape[0] / input_size[1] dets[:, 0] *= scale_x dets[:, 1] *= scale_y dets[:, 2] *= scale_x dets[:, 3] *= scale_y 模型outputs有两个输出,一个是dets,这是一个二位数组dets[n][5],其中det[5]包含了坐标x1, y1, x2, y2,score,前面4个预选框的坐标,后面一个为预选框的分数。 def visualize_results(image, dets, labels_pred, labels, conf_threshold): for i in range(len(dets)): det = dets[i] score = det[4] #每个框的分数 if score > conf_threshold: #小于分数的剔除 class_id = int(labels_pred[i]) x1, y1, x2, y2 = map(int, det[:4]) label = labels[class_id] cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2) cv2.putText(image, f'{label}: {score:.2f}', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2) return image 根据阈值分数进行画框,最终完成结果的后处理,注意上面并没有进行极大值抑制。
  • 密码保护:端侧vscode AI开发环境搭建

    密码保护:端侧vscode AI开发环境搭建

    此内容受密码保护。如需查阅,请在下列字段中输入您的密码。 密码:
  • llama.cpp部署大模型

    llama.cpp部署大模型

    安装llama.cpp 从GitHub上下载官方的源码。 git clone https://github.com/ggml-org/llama.cpp.git cd llama.cpp 使用camke进行编译,先创建build环境 cmake -B build 发现有报错curl没有安装。 -- The C compiler identification is GNU 11.3.0 -- The CXX compiler identification is GNU 11.3.0 -- Detecting C compiler ABI info -- Detecting C compiler ABI info - done -- Check for working C compiler: /usr/bin/cc - skipped -- Detecting C compile features -- Detecting C compile features - done -- Detecting CXX compiler ABI info -- Detecting CXX compiler ABI info - done -- Check for working CXX compiler: /usr/bin/c++ - skipped -- Detecting CXX compile features -- Detecting CXX compile features - done -- Found Git: /usr/bin/git (found version "2.34.1") -- Looking for pthread.h -- Looking for pthread.h - found -- Performing Test CMAKE_HAVE_LIBC_PTHREAD -- Performing Test CMAKE_HAVE_LIBC_PTHREAD - Success -- Found Threads: TRUE -- Warning: ccache not found - consider installing it for faster compilation or disable this warning with GGML_CCACHE=OFF -- CMAKE_SYSTEM_PROCESSOR: x86_64 -- GGML_SYSTEM_ARCH: x86 -- Including CPU backend -- Found OpenMP_C: -fopenmp (found version "4.5") -- Found OpenMP_CXX: -fopenmp (found version "4.5") -- Found OpenMP: TRUE (found version "4.5") -- x86 detected -- Adding CPU backend variant ggml-cpu: -march=native -- Could NOT find CURL (missing: CURL_LIBRARY CURL_INCLUDE_DIR) CMake Error at common/CMakeLists.txt:85 (message): Could NOT find CURL. Hint: to disable this feature, set -DLLAMA_CURL=OFF 使用apt-get安装libcur14,如下。 sudo apt-get update sudo apt-get install libcurl4-openssl-dev 安装curl成功后,解决了,继续执行cmake -B build,会生成build目录。 cmake -B build -- Warning: ccache not found - consider installing it for faster compilation or disable this warning with GGML_CCACHE=OFF -- CMAKE_SYSTEM_PROCESSOR: x86_64 -- GGML_SYSTEM_ARCH: x86 -- Including CPU backend -- x86 detected -- Adding CPU backend variant ggml-cpu: -march=native -- Found CURL: /usr/lib/x86_64-linux-gnu/libcurl.so (found version "7.81.0") -- Configuring done -- Generating done -- Build files have been written to: /root/autodl-tmp/llama.cpp/build 接着llama.cpp的源码。 cmake --build build --config Release 编译完成之后,生成的二进制都在llama.cpp/build/bin目录下。 模型下载 使用wget下载模型。 wget https://huggingface.co/bartowski/Llama-3.2-3B-Instruct-GGUF/resolve/main/Llama-3.2-3B-Instruct-Q8_0.gguf llamap.cpp只能使用GGUF格式的大模型,使用的模型可以在Hugging Face获取https://huggingface.co/。也可以在modelscope上获取https://modelscope.cn/models。 这里有个技巧,可能仓库里面有很多量化参数的模型,如果使用git全部clone下来会比较久,这里可以只下载指定的GGUF模型,点击要使用的模型,如下: 然后,获取到下面的下载链接。 如果是modelsscope,找到下载,然后鼠标长按左键不松手拖到上面的输入网址框获取到下载链接。 这样就可以使用wget进行下载了。 wget https://huggingface.co/Qwen/Qwen2.5-0.5B-Instruct-GGUF/resolve/main/qwen2.5-0.5b-instruct-q8_0.gguf wget https://modelscope.cn/models/Qwen/Qwen2.5-3B-Instruct-GGUF/resolve/master/qwen2.5-3b-instruct-q8_0.gguf 模型测试 运行大模型 ./llama.cpp/build/bin/llama-cli -m model/Llama-3.2-3B-Instruct-Q8_0.gguf 运行日志如下,可以看到使用的是CPU,没有使用GPU,因为前面编译的时候没有使能CUDA。 llama_perf_sampler_print: sampling time = 8.06 ms / 80 runs ( 0.10 ms per token, 9920.63 tokens per second) llama_perf_context_print: load time = 1070.39 ms llama_perf_context_print: prompt eval time = 859.42 ms / 15 tokens ( 57.29 ms per token, 17.45 tokens per second) llama_perf_context_print: eval time = 20880.31 ms / 65 runs ( 321.24 ms per token, 3.11 tokens per second) llama_perf_context_print: total time = 37979.41 ms / 80 tokens load time: 模型加载时间,耗时1070.39ms,属于一次性开销,与模型大小和硬件I/O性能相关。 prompt eaval time: 有些也称为prefill(TPS),表示提示词处理时间,处理15个输入Token耗时859.42ms,平均57.29ms/Token,速度17.45 Token/s。 eval time:有些也称为decode (TPS), 表示生成推理时间,生成65个Token耗时20880.31ms,平均321.24ms/Token,速度仅3.11 Token/s,显著低于采样阶段的9920.63 Token/s,说明生成阶段存在计算瓶颈。 sampling time: 采样80次仅8.06ms,速度高达9920.63 Token/s,表明采样算法本身效率极高,非性能瓶颈。 total time: 输入到输出的总耗时,包括模型加载时间、提示词处理时间、生成推理时间,其他时间(可能含内存交换或调度延迟) 可以使用vscode的打开多个终端,一个执行大模型交互,一个使用htop看看CPU和内存使用情况。 从上面看输入是17.45 token/s,输出是3.11 token/s,速度还是比较慢。 没有使用GPU,都是用cpu在推理。那么怎么使能使用gpu了?使用下面的方式,构建编译的时候打开CUDA,然后重新编译试一下。要用多线程编译,否则编译贼慢。 cd llama.cpp cmake -B build -DGGML_CUDA=ON cmake --build build --config Release -j16 重新运行模型后,看到硬件信息用了GPU了。 llama_perf_sampler_print: sampling time = 10.88 ms / 105 runs ( 0.10 ms per token, 9649.85 tokens per second) llama_perf_context_print: load time = 959.88 ms llama_perf_context_print: prompt eval time = 573.18 ms / 14 tokens ( 40.94 ms per token, 24.43 tokens per second) llama_perf_context_print: eval time = 17212.83 ms / 91 runs ( 189.15 ms per token, 5.29 tokens per second) llama_perf_context_print: total time = 34584.56 ms / 105 tokens 输出token有提升,但是看起来不明显,为啥了?
  • transformer

    transformer

    模型结构 transform使用了自注意力机制,由编码器和解码器组成。 编码器 transformer的编码器输入一排向量,输出另外一排同样长度的向量。transformer的编码中加入了残差连接和层归一化,其中N X表示重复N此。首先在输入的地方需要加上位置编码,经过自注意力处理后,再嘉盛残差连接和层归一化。接下来经过全连接的前馈神经网络,再做一次残差连接和层归一化,这就是一个完整的块输出,而这个块重复N此。 上图中的块就是前面说的多头注意力+残差连接和层归一化+全连接前馈神经网络等组成。编码器可以理解为就是对输入进行编码处理。 解码器 解码器分为自回归解码和非自回归解码。 自回归解码(Autoregressive,AT) 以语言识别为例,输入一段声音,输出一串文字。 首先,将一段"机器学习"的音频输入给编码器,编码器会输出一排向量。然后将这一排向量送入到编码器中。 其次,解码器输入一个代表开始的特殊符号BOS(Begin of Sequence),这是一个特殊的词元(token),代表开始。编码器读入BOS后,就会输出一个向量。这个向量代表了词表中每个词的概率,跟分类一样,经过了softmax操作,总和为1。向量的长度和词表一样大,每个中文一对应一个分值。 接着,在向量中挑选分数最高的作为解码器的第一个输出。这里应该就是"机"。 最后,把编码器的输出"机"当成解码器新的输入,输入为特殊符号"BOS"和"机",解码器同样输出一个向量,这个向量里面给出了每一个中文字的分数,这里应该是"器"分数最高,这个过程反复持续下去。 上面的运作过程中,解码器把上次的输出当做输入反复下去,那么如何让解码器停止了?要让解码器停止,也需要准备一个特别的结束符号"EOS",当产生完"习"之后,再把"习"当做编码器输入以后,解码器要能够输出"EOS",这个EOS的概率必须最大,输出了EOS,整个解码产生的序列就结束了。 总结一下,自回归模型就是,解码器先读入编码的输入,然后输入BOS,输出W1,再把W1当做输入,再输出W2,直到输出EOS为止。 非回归解码(NAT) 自回归编码是根据上次解码器的输入一个字一个字的输出,假设要输入长度一百个字的句子,就需要做一百次的解码。那能不能一次性全部输出了?这就是非自回归解码器,假设产生的是中文的句子,非自回归不是一次产生一个字,而是一次把整个具体都产生出来。那要怎么做了,有两个做法。 方法1:用分类器来解决,将编码器的的输出结果先给分类器,分类器得到一个数字,这个数字达标的是解码器要输出的长度,比如输入是5,非自回归的解码器就是吃5个BOS,这样就产生了5个中文的子。 方法2:给编码器一堆的BOS词元,因为输出的句子有上限,假设不超过300个字,那就输入300个BOS,解码器就输出300个字,输出句子中EOS右边的输出就裁掉。 简单来说,非自回归解码是一次性输出句子,与自回归解码不同的是,非字回归解码输入的全是BOS,而自回归解码输入的是上一轮的输出。 transform的训练 既然要训练,就要去衡量误差,这个误差怎么衡量了? 解码器的输出一个是概率分布,以输出的"机"为例,当输入"BOS"的时候,输出的答案应该要跟"机"这个向量越接近越好。 参考书籍:《深度学习详解》
  • 自注意力机制

    自注意力机制

    运作原理 自注意力机制要解决的是让机器根据输入序列能根据上下文来理解。举个例子,输入句子为"我有一个苹果手机",对于机器来说这里的"苹果"应该是指水果还是手机品牌了?所以要解决这个问题,就需要在上下文中去理解,那怎么在上下文中去理解了?那就是由句子中的其他词对于施加权重,让"苹果"更靠近"手机"。具体怎么做了?来看看下面的图。 上图中的a1~a4是输入的词,每个输入的词都需要跟句子中的其他词做运算得到一个输出b1~b4。如a1要得到b1,那么a1需要与a2、a3、a4输入的词进行相关运算得到b1,同理其他a2、a3、a4对应输出b2、b3、b4。注意这里a1到b1的输出并不是a1与其他a2~a4的简单相乘或相加,那具体是怎么个相关运算了? 计算向量关联程度的方法有点积和相加,目前比较常用的是点积。下面以点积来进行说明。在自注意力模型中,采样查询-键-值(Query-Key-Value)的模式。主要分为3个步骤,分别是计算QK内积、再计算V向量、最后加权得到b。 QK内积 q: q称为查询,就是使用搜索引擎查找相关文章的关键字。q的计算方式为输入乘上Wq矩阵得到,如把a1乘上Wq得到q1。 k: k称为键值,输入乘上Wk得到向量k。如a2,a3,a4乘以Wk得到k2,k3,k4。 qk:把q和k做点积就得到a12,a13,a14,即表征a1与a2,a3,a4之间的关联性了。 通常情况下,得到最终的qk内积结果(记为axx)会进行一次归一化处理得到a',可以使用softmax也可以使用别的激活函数,如下图所示。 最终处理的结果a'表示的是输入a1与其他a2~a4存在的关联性分数,也称为注意力分数,也可以说是一个权重值,上下文中其他的词对a1最终词的解释权重。 V向量 qk内积计算了注意力分数,那接下来需要根据注意力的分数提取出信息得到最终的b。那么要进行提取,那必然需要先获取到其他词的特征信息,怎么获取了,获取的方式非常简单,就是让各自输入乘以Wv矩阵得到一个向量V。比如a1乘以Wv得到V1,a2乘以Wv得到V2。 加权和b 得到了各自的注意力分数qk,也获取到了各自输入的特征信息,最后就可以计算最终的输出b了。公式为: $b^1 = \sum_{i} \alpha_{1,i}' v^i$ 。就是特征信息V和注意力分数进行相乘,然后把所有结果加起来。 如果a1和a2的关联性很强,那么a12'的值就大,跟V2相乘值对应也就大,这样b1的值就可能比较接近V2。所以谁的注意力分数越大,谁的V就会主导抽出的结果。 小结 上面通过以a1进行相关运算后输出b1过程,a2、a3、a4计算过程同理,同时输入的各自计算是并行的,不需要各自依赖,这也是与RNN的本质区别。同时计算过程中出现的Wq、Wk、Wv都是要学习的参数。而在实际过程中,并行运算都是通过矩阵的方式进行的,这里就不再过多阐述了。 多头注意力,所谓多头注意力,就是对应的qk有多个,也就是说W参数也有多个。 位置编码,在计算QKV的时候,引入位置编码,让输入的位置也占一定的权重。 参考书籍:《深度学习详解》
  • 密码保护:YOLOv5端侧部署代码分析

    密码保护:YOLOv5端侧部署代码分析

    此内容受密码保护。如需查阅,请在下列字段中输入您的密码。 密码:
  • 端侧部署YOLOv5模型

    端侧部署YOLOv5模型

    导出 ONNX模型 python export.py --weights runs/train/exp2/weights/ NPU不支持动态输入,使用onnxim工具进行转换为固定输入,先安装onnxsim工具。 pip install onnxsim -i https://pypi.doubanio.com/simple/ 接着进行转换 python -m onnxsim runs/train/exp2/weights/best.onnx yolov5s-sim.onnx --input-shape 1,3,640,640 模型裁剪 在实际端侧中,NPU端量化的后处理运算不适合使用uint8量化,一般使用float的混合量化,但这样相对麻烦,本文示例将后处理放在CPU测进行,所以我们需要把下图中的后处理部分裁剪掉。 import onnx onnx.utils.extract_model('./yolov5s-mask.onnx', './yolov5s-mask-rt.onnx', ['images'], ['/model.24/Reshape_output_0','/model.24/Reshape_8_output_0','/model.24/Reshape_16_output_0']) 使用上面的python可以进行裁剪,输入为[images],输出为3个节点,下面是最后一个节点的示例截图。实际要根据模型文件进行调整。 python extrat-mask.py 接着使用上面的命令,就输出了裁剪后的模型yolov5s-mask-rt.onnx。 如果原生的模型没有将后处理裁剪,输入输出如下: 输出的tensor[1,25200,85],其中25200=3x(20x20+40x40+80x80),即3个特征图一共25200个先验框。 原生模型使用裁剪后使用netron.app打开得到输入输出如下: YOLOv5模型参数含义详细解释如下: 模型名称: 表示模型或图结构的名称,此处为从主图结构中提取的名称。 输入参数:float32[1,3,640,640],表示输入张量的数据类型和维度。1表示一次处理的样本,3为通道数,输入的高宽均为640,此前YOLOv3是416。 输出参数:有3个输出张量。 -- /model.24/Reshape_output_0:表示8倍降采样率,输出为float32[1,3,85,80,80],网格划分为80x80,每个网格有3给先验框,每个先验框预测包含85个元素(坐标4、置信度1、类别80)。 -- /model.24/Reshape_8_output_0:float32[1,3,85,40,40],网格尺寸为40x40。 -- /model.24/Reshape_16_output_0:float32[1,3,85,20,20],网格尺寸为20x20。 模型参数数量:parameter参数为7225908,表示模型中参数的总数,这是模型复杂度和计算资源需求的重要指标。 再来看看此次针对口罩微调后裁剪后的模型输入输出,使用netron.app打开得到输入输出如下: 上图可以看到,网格划分、先验框数量是一样的,不一样的是预测的元素为8(坐标4,置信度1,类别3)。 创建端侧转换环境 sudo docker images sudo docker run --ipc=host -itd -v /home/xxx/ai/docker_data:/workspace --name laumy_npu_v1.8.x ubuntu-npu:v1.8.11 /bin/bash sudo docker ps -a sudo docker exec -it 55f9cd9eb15e /bin/bash 在进行端侧部署前,需要准备量化环境,这里直接使用的是docker环境。 模型转换 创建目录 |-- data | |-- maksssksksss0.png | |-- maksssksksss1.png | |-- maksssksksss10.png | |-- maksssksksss11.png | |-- maksssksksss12.png | |-- maksssksksss13.png | |-- maksssksksss14.png | |-- maksssksksss15.png | |-- maksssksksss16.png | |-- maksssksksss17.png | |-- maksssksksss18.png | |-- maksssksksss19.png | |-- maksssksksss2.png | |-- maksssksksss3.png | |-- maksssksksss4.png | |-- maksssksksss5.png | |-- maksssksksss6.png | |-- maksssksksss7.png | |-- maksssksksss8.png | `-- maksssksksss9.png |-- dataset.txt `-- yolov5s-mask-rt.onnx 准备数据量化的数据data、数据配置dataset.txt(内容如下)、裁剪的模型yolov5s-mask-rt.onnx。 ./data/maksssksksss0.png ./data/maksssksksss1.png ./data/maksssksksss2.png ./data/maksssksksss3.png ./data/maksssksksss4.png ./data/maksssksksss5.png ./data/maksssksksss6.png ./data/maksssksksss7.png ./data/maksssksksss8.png ./data/maksssksksss9.png ./data/maksssksksss10.png ./data/maksssksksss11.png ./data/maksssksksss12.png ./data/maksssksksss13.png ./data/maksssksksss14.png ./data/maksssksksss15.png ./data/maksssksksss16.png ./data/maksssksksss17.png ./data/maksssksksss18.png ./data/maksssksksss19.png 模型导入 pegasus import onnx --model yolov5s-mask-rt.onnx --output-data yolov5s-mask-rt.data --output-model yolov5s-mask-rt.json 模型导入将输出yolov5s-mask-rt.json和yolov5s-mask-rt.data文件。前者为后期量化需要的网络结构文件,后者为模型网络权重文件。 前后处理配置文件yml 生成前处理配置文件 pegasus generate inputmeta --model yolov5s-mask-rt.json --input-meta-output yolov5s-mask-rt_inputmeta.yml 生成后处理配置文件,因为后处理是在cpu上处理并且我们已经裁剪掉了onn模型的后处理,可以不生成。 pegasus generate postprocess-file --model yolov5s-mask-rt.json --postprocess-file-output yolov5s-mask-rt_postprocess_file.yml 上面根据模型网络结构文件yolov5s-mask-rt.json转化生成后续量化需要的前处理和后处理描述文件,格式为yml格式。 input_meta: databases: - path: dataset.txt #表示模型量化需要的数据描述文件 type: TEXT ports: - lid: images_205 #表示输入节点名称。 category: image dtype: float32 sparse: false tensor_name: layout: nchw #输入数据的排列格式,n表示batch,c表示channel,h表示高,w表示宽 shape: #模型输入的形状 - 1 #输入数据的batch,如果后面量化的batch参数不为1,需要改这里。 - 3 - 640 - 640 fitting: scale preprocess: reverse_channel: true mean: - 0 - 0 - 0 scale: #3通道的缩放值,yolov5s需要改为0.00392157 - 1.0 - 1.0 - 1.0 preproc_node_params: add_preproc_node: false #是否添加预处理节点,用于格式转化和裁剪,这里要改为true preproc_type: IMAGE_RGB #预处理输入的格式 preproc_image_size: - 640 - 640 preproc_crop: enable_preproc_crop: false crop_rect: - 0 - 0 - 640 - 640 preproc_perm: - 0 - 1 - 2 - 3 redirect_to_output: false 对于模型的输入yml,需要将scale修改为0.00392157,同时默认使用cpu预处理图像,所以add_preproc_node设置为true。 量化 pegasus quantize --model yolov5s-mask-rt.json --model-data yolov5s-mask-rt.data --device CPU --iterations=12 --with-input-meta yolov5s-mask-rt_inputmeta.yml --rebuild --model-quantize yolov5s-mask-rt.quantize --quantizer asymmetric_affine --qtype uint8 下面是量化模型的参数解析: model: 模型的网络结构文件 model-data:模型需要的权重文件 with-input-meta:模型需要前处理配置文件 model-quantize: 输出的模型文件 iterations: 模型量化使用的数据量,设置1(默认)使用dataset第一条数据。若设置20,会先遍历dataset.txt中的20行数据。 qtype:量化数据类型,有int8,uint8,int16等。 batch_size: 量化多少轮,如果不为1,需要改输入的yml文件shape,先用默认。 量化后,会生成量化文件yolov5s-mask-rt.quantize。 推理 pegasus inference --model yolov5s-mask-rt.json --model-data yolov5s-mask-rt.data --dtype quantized --model-quantize yolov5s-mask-rt.quantize --device CPU --with-input-meta yolov5s-mask-rt_inputmeta.yml --postprocess-file yolov5s-mask-rt_postprocess_file.yml 模型推理是为了验证量化后的模型效果,这步可省略。 导出模型 pegasus export ovxlib --model yolov5s-mask-rt.json --model-data yolov5s-mask-rt.data --dtype quantized --model-quantize yolov5s-mask-rt.quantize --save-fused-graph --target-ide-project 'linux64' --with-input-meta yolov5s-mask-rt_inputmeta.yml --output-path ovxilb/yolov5s-mask-rt/yolov5s-simprj --pack-nbg-unify --postprocess-file yolov5s-mask-rt_postprocess_file.yml --optimize "VIP9000PICO_PID0XEE" --viv-sdk ${VIV_SDK} 模型导出,最终会生成ovxilb/yolov5s-mask-rt_nbg_unify/network_binary.nb 端侧部署 修改端侧后处理的分类名和数量配置文件。 改完之后,执行./build_linux.sh -t \编译生成端侧的应用,将可执行应用推到设备端。 ./yolov5 -b network_binary.nb -i mask-test.jpeg model_file=network_binary.nb, input=mask-test.jpeg, loop_count=1, malloc_mbyte=10 input 0 dim 3 640 640 1, data_format=2, quant_format=0, name=input[0], none-quant output 0 dim 80 80 8 3, data_format=0, name=uid_20000_sub_uid_1_out_0, none-quant output 1 dim 40 40 8 3, data_format=0, name=uid_20001_sub_uid_1_out_0, none-quant output 2 dim 20 20 8 3, data_format=0, name=uid_20002_sub_uid_1_out_0, none-quant nbg name=network_binary.nb, size: 7196096. create network 0: 35626 us. prepare network: 38972 us. buffer ptr: 0xb6996000, buffer size: 1228800 feed input cost: 94526 us. network: 0, loop count: 1 run time for this network 0: 119081 us. detection num: 1 1: 94%, [ 316, 228, 541, 460], 1 draw objects time : 154 ms destory npu finished. ~NpuUint. 参考: https://v853.docs.aw-ol.com/en/npu/dev_npu/ https://blog.csdn.net/weixin_42904656/article/details/127768309
  • 云服务器搭建YOLOv5训练环境

    云服务器搭建YOLOv5训练环境

    介绍 本文使用AutoDL云服务搭建YOLOv5的运行环境。 获取云服务器 在这个链接上https://www.autodl.com/home订阅服务,这里选择的是按量计费。 镜像选择基础镜像Mniconda最新ubuntu环境。 交钱订阅完成后就可以获取到登录的信息了。 这里使用的是ssh工具根据获取到的登录名和密码进行登录,需要注意的是端口可能不是默认的22,按照实际的端口进行。 配置conda环境 由于autoDL的服务器可能并不能访问外网,这里先将conda的源更换为清华的源。 conda config --remove-key channels conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/pytorch/ conda config --set show_channel_urls yes pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple 接下来创建和激活虚拟环境 conda create -n yolov5 python==3.8.5 # 创建虚拟环境名称为yolov5, python版本为3.8.5 conda activate yolov5 # 激活yolov5环境 conda init # 如果提示没有初始化conda环境的话,执行conda init后退出控制台重新登录,再次激活。 conda env list # 通过上面的命令可以查看当前创建的conda虚拟环境 拉取YOLOv5代码环境 通过官网链接获取YOLOv5的代码。 cd autodl-tmp/ #这里先切换到audodl-tmp目录,这个空间比较大,读写也比较快。 git clone https://github.com/ultralytics/yolov5 # 在github上拉取代码 如果要在本地使用vscode查看代码的话,可以参考:https://www.autodl.com/docs/vscode/ 这里需要注意的是,audoDL可能没有访问GitHub,处理的办法就是参考这个方法在服务器上面做一个代理,https://github.com/VocabVictor/clash-for-AutoDL 或者https://gitee.com/laumy0929/clash-for-AutoDL 获取到CLASH_URL如下示例: 截屏2025-07-17 12.34.36 截屏2025-07-17 12.39.15 按照yolov5的代码环境需要的包 pip install -r requirements.txt 安装完成后,进行推理测试,看看环境是否安装正常,执行detect.py 指定模型文件和输入图片进行测试。 python detect.py --weights yolov5s.pt --source data/images/bus.jpg 执行上面脚本是,会自动拉取预训练的模型yolov5s.pt,最终将推理结果存储到runs/detect/exp2目录下。 训练自定义数据集 这里从GitHub上抓了一个口罩的书籍集,https://github.com/iAmEthanMai/mask-detection-dataset.git,使用git clone拉取到本地。 git clone https://github.com/iAmEthanMai/mask-detection-dataset.git 查看mask的数据集配置,有3个分类。这里使用yolov5s进行微调,复制一份yolov5的模型配置文件,并修改分类为3,需要根据实际的数据集存放的位置调整一下路径。 cp models/yolov5s.yaml models/mask_yolov5s.yaml 配置完成后就可以进行训练了。 python train.py --data mask-detection-dataset/data/data.yaml --cfg models/mask_yolov5s.yaml --weights pretrained/yolov5s.pt --epoch 100 --batch-size 4 模型文件将输出到:runs/train/exp5/weights/ python detect.py --weights runs/train/exp5/weights/best.pt --source mask-detection-dataset/data/images/maksssksksss1.png 上面执行命令使用训练的模型文件测试看看效果。
  • YOLOv2和YOLOv3

    YOLOv2和YOLOv3

    YOLOv2 回顾一下YOLOv1有哪些缺陷? 边界框训练时回归不稳定,导致定位误差大。 每个网格只能预测两个边界框且只能识别一类目标。 小目标检测效果差。 针对以上的问题,YOLOv2进行了改进,下面从检测机制优化、网络结构优化、训练策略优化3个维度进行。 检测机制优化 锚框(Anchor Box)机制 YOLOv1每个网格只会预测一个目标,因为每个网格预测的B个边界框的类别概率都是共享的,要是有多2个目标的中心都落在了一个网格中,那么有一个目标就没法预测了。怎么解决了? 让每个边界框都对应一个类别概率,这样就能做到每个网格可以预测多个目标了。 每个边界框训练是没有基准的,这样训练的时候就很不稳定。如果预先定义边界框,使训练的时候按照这些预定义的边界框作为基准进行训练调整。这里做个类比来理解,假设我们的目标坐标是(8,8),那么如果没有设置基准,从坐标(0,0)找到(8,8)就相对比较远,那假设我们从(6,6)这个基准开始找,那找到(8,8)的概率就大了。 上面预先定义的边界框就称为先验框(anchor Box),那么新的问题来了,这个anchor Box我们每个网格设置多少个?设置什么样的形状了?实际的数据集中Ground Truth(真实标签)边界框有些是长方形、有些是正方形。 YOLOv2使用了K均值聚类算法用于生成先验框(Anchor Boxes),其核心目标是从训练数据中自动学习边界框的尺寸和比例,替代人工预设的锚框,从而提升检测召回率与定位精度。YOLOv2通过聚类COCO数据集,得到5个先验框尺寸(如(0.25,0.33), (0.5,0.75)...),覆盖常见物体形状。 pw,ph是先验框anchor的宽高(根据K类均值聚类得来),tx,ty,tw,th模型预测的偏移量(训练得到)。所以通过pw,ph,tx,ty,tw,th就可以计算出实际要预测的框bx,by,bw,bh。模型最终预测出tx,ty,tw,th这四个值就可以计算出bx,by,bw,bh。 这里需要注意 YOLOv2中对tx,ty进行了sigmod归一化,防止训练初期,中心点数值极大训练不稳定。 YOLOv2预测边界框的宽和高初始值是基于先验框而来,是模型对每个锚框输出宽高缩放因子(t_w, t_h),通过聚类生成的先验框指数变换得到最终宽高。而YOLOv1是根据图像实际宽高缩放而来。 全卷积网络与先验框 YOLOv1最后阶段使用的是全连接层,使用全连接层不仅仅参数量大,同时会将先前的特征图包含的空间信息破坏,在YOLOv2中改成了全卷积结构。 可以看到输出也发生了变化YOLOv1的输出是7 x 7 x (1+4+1+4+20),而YOLOv2输出是13 x 13 x k x (1 + 4 + 20),这里的k是每个网格的先验框数量,一般为5。(1+4+20)分别是锚框的置信度、边界框坐标、类别概率,每个anchor先验框都对应一个(1+4+20),也就是说每个先验框可以检测一个目标,这样就解决了YOLOv1中每个网格只能检测一个目标的问题,YOLOv2中每个网格有5个先验框,就可以检测最多5个类别。 网络结构优化 加入批量归一化: YOLOv1中每层卷积都是由线性卷积+非线性激活函数组成,由于批量归一化得到越来越普遍的应用,并且效果较好,因此在YOLOv2每层卷积层都加入了批量归一化。所以卷积层就变成了线性卷积+归一化+非线性激活函数组成。 融合高分辨率特征图:YOLOv1输出的特征图是13 x 13 x 1024,分辨率越低丢失的特征就越多。为了解决这个问题,YOLOv2在第17层单独抽出一层26 x 26 x 512的特征图,然后通过特殊的降采样得到13 x 13 x 256的特征图,最后将这个13 x 13 x 256的特征图与前面13 x 13 x 1024的相加,这样达到提高特征信息保留。 多尺度训练 YOLOv2在训练上也做了进一步的优化,因为同一张图像,缩放到不同尺寸,不同尺寸包含的图像信息也不同。因此为了提高精度,引入了多尺度训练训练机制。 具体就是在训练网络时,对图像按照320、352、384、416、448、480、512、544、576、608等不同输入尺寸进行训练。 总结一下,YOLOv2针对YOLOv1的改进点有以下。 增加先验框机制:每个网格使用K类均值聚类预设K个先验框作为基准训练。每个先验框负责预测一个目标。 加入批量归一化:每个卷积层对训练数据做批量归一化处理。 高分辨率特征图:网格划分13 x 13,主干网络中抽离一路高分辨率特征图进行特殊处理然后再加回去。 对尺度训练:训练阶段使用不同尺度的图像数据进行训练。 YOLOv3 对应目标检测网络可以由主干网络、颈部网络、检测头。 主干网络:提取多尺度特征。通过卷积层、池化层等操作,将输入图像逐层抽象化,生成不同层级的特征图。有浅层特征和深层特征。浅层特征是保留细节(如边缘、纹理),适合小目标检测;深层特征是蕴含语义信息(如物体整体结构),适合大目标识别。 颈部网络:融合与优化特征。连接主干网络与检测头,整合不同层级的特征图,增强模型对不同尺度目标的感知能力。 检测头:执行具体检测任务。基于融合后的特征,输出目标的位置、类别及置信度。 YOLOv2对于小目标的检测还是不够精确,这一缺陷的主要原因是YOLOv2只使用了32倍的降采样率。 浅层卷积层:没有经过更多的卷积层处理,提出的语义信息较少,具有较浅的语义信息;但对应没有过多的降采样因此具备较多的位置信息。 深层卷积层:经过更多的卷积层处理,提取更多的语义信息;但是位置信息经过了太多的降采样,丢失了位置信息。 语义信息可以理解为是什么类别的物品,位置信息是这个物品在图中的什么坐标位置。浅层卷积层更适合检测小目标(语义信息不需要这么多),深层卷积层适合检测大目标(需要更多的语义信息)。根据这个认知,YOLOv3主干网络就使用了3个不同尺寸的特征图,分布对应的降采样是32、16、8倍。对于小尺度目标使用的是8倍将采样并在浅层网络进行先提出输出针对性处理。而大尺度目标使用32倍降采样在最深层的网络中进行输出。 输入是416 x 416的图像,输出的是3个特征图,分布是C1=B X 256 X 52 X 52;C2=B x 512 x 26 x 26;C3=B x 1024 x 13 x 13,这里的B是先验框,一般为3。针对输入图像做了52x52、26x26、13x13三种不同疏密度的网格。 3个特征图根据多级检测结构推理后,得到最终预测的3个结果。y1= B x (4 + 1 + Nc) x 13 x 13;y2= B x (4 + 1 + Nc) x26 x 26;y3= B x (4 + 1 + Nc) x 52 x 52; 其中B为先验框数量,一般是3。Nc是类别个数,根据实际数据集,YOLOv3使用的是COCO数据集,有80个类别。 参考: 书籍《YOLO目标检测》
\t