lerobot示教

启动

示教的功能主要是主臂控制,从臂跟随,在数据采集是非常的一环。下面是模块启动的执行命令:

python -m lerobot.teleoperate \
  --robot.type=so101_follower \
  --robot.port=/dev/ttyACM0 \
  --robot.id=R12252801 \
  --teleop.type=so101_leader \
  --teleop.port=/dev/ttyACM1 \
  --teleop.id=R07252608

python解释器回安装sys.path查找lerobot.teleoperate模块,找到执行lerobot/src/lerobot/teleoperate.py文件,调用teleoperate函数。

## 配置解析
@draccus.wrap()
def teleoperate(cfg: TeleoperateConfig):
    init_logging()
    logging.info(pformat(asdict(cfg)))
## 初始化可视化工具
    if cfg.display_data:
        _init_rerun(session_name="teleoperation")
##创建操作设备和机器人实例
    teleop = make_teleoperator_from_config(cfg.teleop)
    robot = make_robot_from_config(cfg.robot)
## 连接设备
    teleop.connect()
    robot.connect()
##示教循环
    try:
        teleop_loop(teleop, robot, cfg.fps, display_data=cfg.display_data, duration=cfg.teleop_time_s)
    except KeyboardInterrupt:
        pass
    finally:
        if cfg.display_data:
            rr.rerun_shutdown()
## 断开连接
        teleop.disconnect()
        robot.disconnect()


if __name__ == "__main__":
    teleoperate()

下面展开来说明一下。

配置解析

@draccus.wrap()
def teleoperate(cfg: TeleoperateConfig):
    # 初始化日志记录
    init_logging()
    # 打印配置信息
    logging.info(pformat(draccus.asdict(cfg)))

在teleoperate.py中teleoperate函数使用@draccus.wrap()装饰器解析命令行参数,将参数转换为TeleoperateConfig类型的配置对象。dracus是用于配置解析的库,类型与argparse。

这里的TeleoperateConfig是一个自定义的配置类,定义如下:

from dataclasses import dataclass

@dataclass
class RobotConfig:
    type: str
    port: str
    id: str

@dataclass
class TeleopConfig:
    type: str
    port: str
    id: str

@dataclass
class TeleoperateConfig:
    teleop: TeleoperatorConfig
    robot: RobotConfig
    # Limit the maximum frames per second.
    fps: int = 60
    teleop_time_s: float | None = None
    # Display all cameras on screen
    display_data: bool = False

可以看到TeleoperateConfig类中属性定义了robot,teleop两个对象,命令行的参数将会映射到,如:

  • --robot.type=so101_follower:对应cfg.robot.type
  • --teleop.port=/dev/ttyACM1:对应cfg.teleop.port

初始化可视化工具

    if cfg.display_data:
        _init_rerun(session_name="teleoperation")

当参数--display_data=true,即cfg.display_data 为 True,则调用 _init_rerun 函数初始化 rerun 可视化工具。rerun 是一个用于记录和可视化科学数据的工具。

def _init_rerun(session_name: str = "lerobot_control_loop") -> None:
    """Initializes the Rerun SDK for visualizing the control loop."""
    batch_size = os.getenv("RERUN_FLUSH_NUM_BYTES", "8000")
    os.environ["RERUN_FLUSH_NUM_BYTES"] = batch_size
    rr.init(session_name)
    memory_limit = os.getenv("LEROBOT_RERUN_MEMORY_LIMIT", "10
    rr.spawn(memory_limit=memory_limit)

_init_rerun函数的主要功能是初始化Rerun SDK,设置数据刷新的字节数和内存使用限制,然后启动Rerun会话,为后续的控制循环可视化工作做准备。

其中有两个参数是从环境变量中获取,分别是RERUN_FLUSH_NUM_BYTES和LEROBOT_RERUN_MEMORY_LIMIT,用于控制每次刷新是处理数据的字节数和限制使用的内存量。

创建主从臂实例

teleop = make_teleoperator_from_config(cfg.teleop)
robot = make_robot_from_config(cfg.robot)

调用调用 make_teleoperator_from_config 和make_robot_from_config 函数,根据配置对象创建 SO101Leader遥操作设备实例和 SO101Follower 机器人实例。

def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator:
    if config.type == "keyboard":
        from .keyboard import KeyboardTeleop

        return KeyboardTeleop(config)
    elif config.type == "koch_leader":
        from .koch_leader import KochLeader

        return KochLeader(config)
    elif config.type == "so100_leader":
        from .so100_leader import SO100Leader

        return SO100Leader(config)
    elif config.type == "so101_leader":
        from .so101_leader import SO101Leader

        return SO101Leader(config)
    ......

以leader为例,最终调用SO101Leader对象实例。

class SO101Leader(Teleoperator):
    """
    SO-101 Leader Arm designed by TheRobotStudio and Hugging Face.
    """

    config_class = SO101LeaderConfig
    name = "so101_leader"

    def __init__(self, config: SO101LeaderConfig):
        super().__init__(config)
        #调用父类Teleoperator够着函数初始化属性,包括
        #self.id:设备的唯一标识符,从配置对象中获取。
        #self.calibration_dir:校准文件的存储目录。
        #self.calibration_fpath:校准文件的完整路径。
        #self.calibration:存储电机校准信息的字典。
        self.config = config
        #类型为 SO101LeaderConfig,存储传入的配置对象,方便在类的其他方法中访问配置信息。
        norm_mode_body = MotorNormMode.DEGREES if config.use_degrees else MotorNormMode.RANGE_M100_100
        self.bus = FeetechMotorsBus(
            port=self.config.port,
            motors={
                "shoulder_pan": Motor(1, "sts3215", norm_mode_body),
                "shoulder_lift": Motor(2, "sts3215", norm_mode_body),
                "elbow_flex": Motor(3, "sts3215", norm_mode_body),
                "wrist_flex": Motor(4, "sts3215", norm_mode_body),
                "wrist_roll": Motor(5, "sts3215", norm_mode_body),
                "gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100),
            },
            calibration=self.calibration,
        )
       #类型为 FeetechMotorsBus,用于与 Feetech 电机进行通信,初始化时传入端口、电机信息和校准信息。

SO101Leader类继承了Teleoperator,super().init(config) 调用父类 Teleoperator 的构造函数初始化的属性。

class Teleoperator(abc.ABC):
    def __init__(self, config: TeleoperatorConfig):
        self.id = config.id
        #设备的id
        self.calibration_dir = (
            config.calibration_dir
            if config.calibration_dir
            else HF_LEROBOT_CALIBRATION / TELEOPERATORS / self.name
        )
        #校准文件的存储目录
        self.calibration_dir.mkdir(parents=True, exist_ok=True)
        #创建校准目录
        self.calibration_fpath = self.calibration_dir / f"{self.id}.json"
        self.calibration: dict[str, MotorCalibration] = {}
        # 初始化一个字典,键为字符串,值为MotorCalibration
        if self.calibration_fpath.is_file():
            self._load_calibration()
        #检查校准文件是否存在,存在调用加载校准信息的方法。

Teleoperator定义了一个遥控的基类,其构造函数中接受一个TeleoperatorConfig类型的参数config,主要有以下属性。

  • self.id:设备的唯一标识符,从传入的配置对象 config 里获取,用于在系统中唯一标识该 SO101Leader 设备实例。
  • self.calibration_dir:校准文件的存储目录。若 config.calibration_dir 有值则使用该值,否则使用默认路径。此目录用于存放设备的校准文件。
  • self.calibration_fpath:校准文件的完整路径,由 self.calibration_dir 和设备 id 组合而成,文件格式为 JSON。
  • self.calibration:存储电机校准信息的字典,键为电机名称,值为 MotorCalibration 类型的对象,用于保存电机的校准参数。

而SO101Leader类中自己定义的实力属性有
- self.config:类型为 SO101LeaderConfig,存储传入的配置对象。借助这个属性,类的其他方法可以访问配置信息,如端口号、是否使用角度单位等。
- self.bus:类型为 FeetechMotorsBus,用于和 Feetech 电机进行通信。初始化时传入端口、电机信息和校准信息,后续可以通过该对象实现电机的读写操作、校准等功能。

总结下就是SO101Leader 类包含 2 个类属性和多个实例属性,这些属性从不同层面描述了 SO101 领导者手臂设备的类型、配置、校准信息以及与电机的通信对象等。而SO101Follower也是同理这里就不过多阐述。

连接设备

    teleop.connect()
    robot.connect()

连接设备就是各自调用此前创建实例的connect方法,建立与硬件设备的连接。以teleop为例,最总调用到self.bus.connect发起连接并进行配置。

    def connect(self, calibrate: bool = True) -> None:
        if self.is_connected:
            raise DeviceAlreadyConnectedError(f"{self} already connected")

        self.bus.connect()
        if not self.is_calibrated and calibrate:
            self.calibrate()

        self.configure()
        logger.info(f"{self} connected.")

跟随控制

def teleop_loop(
    teleop: Teleoperator, robot: Robot, fps: int, display_data: bool = False, duration: float | None = None
):
    display_len = max(len(key) for key in robot.action_features)
    start = time.perf_counter()
    while True:
        loop_start = time.perf_counter()
        #从主臂获取动作位置数据
        action = teleop.get_action()
        #如果启动了实时显示,进行显示
        if display_data:
            observation = robot.get_observation()
            log_rerun_data(observation, action)
        #将主臂的动作位置数据发送给从臂
        robot.send_action(action)
        #计算循环执行时间
        dt_s = time.perf_counter() - loop_start
        busy_wait(1 / fps - dt_s)
        loop_s = time.perf_counter() - loop_start

        #打印动作的信息,每个舵机的位置。
        print("\n" + "-" * (display_len + 10))
        print(f"{'NAME':<{display_len}} | {'NORM':>7}")
        for motor, value in action.items():
            print(f"{motor:<{display_len}} | {value:>7.2f}")
        print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")

        if duration is not None and time.perf_counter() - start >= duration:
            return
         # 将终端的文本光标向上移动一定的行数,以此实现覆盖上一次输出内容的效果,从而在终端里模拟实时更新的显示
        move_cursor_up(len(action) + 5)