lerobot示教
- lerobot
- 4天前
- 34热度
- 0评论
启动
示教的功能主要是主臂控制,从臂跟随,在数据采集是非常的一环。下面是模块启动的执行命令:
python -m lerobot.teleoperate \
--robot.type=so101_follower \
--robot.port=/dev/ttyACM0 \
--robot.id=R12252801 \
--teleop.type=so101_leader \
--teleop.port=/dev/ttyACM1 \
--teleop.id=R07252608
python解释器回安装sys.path查找lerobot.teleoperate模块,找到执行lerobot/src/lerobot/teleoperate.py文件,调用teleoperate函数。
## 配置解析
@draccus.wrap()
def teleoperate(cfg: TeleoperateConfig):
init_logging()
logging.info(pformat(asdict(cfg)))
## 初始化可视化工具
if cfg.display_data:
_init_rerun(session_name="teleoperation")
##创建操作设备和机器人实例
teleop = make_teleoperator_from_config(cfg.teleop)
robot = make_robot_from_config(cfg.robot)
## 连接设备
teleop.connect()
robot.connect()
##示教循环
try:
teleop_loop(teleop, robot, cfg.fps, display_data=cfg.display_data, duration=cfg.teleop_time_s)
except KeyboardInterrupt:
pass
finally:
if cfg.display_data:
rr.rerun_shutdown()
## 断开连接
teleop.disconnect()
robot.disconnect()
if __name__ == "__main__":
teleoperate()
下面展开来说明一下。
配置解析
@draccus.wrap()
def teleoperate(cfg: TeleoperateConfig):
# 初始化日志记录
init_logging()
# 打印配置信息
logging.info(pformat(draccus.asdict(cfg)))
在teleoperate.py中teleoperate函数使用@draccus.wrap()装饰器解析命令行参数,将参数转换为TeleoperateConfig类型的配置对象。dracus是用于配置解析的库,类型与argparse。
这里的TeleoperateConfig是一个自定义的配置类,定义如下:
from dataclasses import dataclass
@dataclass
class RobotConfig:
type: str
port: str
id: str
@dataclass
class TeleopConfig:
type: str
port: str
id: str
@dataclass
class TeleoperateConfig:
teleop: TeleoperatorConfig
robot: RobotConfig
# Limit the maximum frames per second.
fps: int = 60
teleop_time_s: float | None = None
# Display all cameras on screen
display_data: bool = False
可以看到TeleoperateConfig类中属性定义了robot,teleop两个对象,命令行的参数将会映射到,如:
- --robot.type=so101_follower:对应cfg.robot.type
- --teleop.port=/dev/ttyACM1:对应cfg.teleop.port
初始化可视化工具
if cfg.display_data:
_init_rerun(session_name="teleoperation")
当参数--display_data=true,即cfg.display_data 为 True,则调用 _init_rerun 函数初始化 rerun 可视化工具。rerun 是一个用于记录和可视化科学数据的工具。
def _init_rerun(session_name: str = "lerobot_control_loop") -> None:
"""Initializes the Rerun SDK for visualizing the control loop."""
batch_size = os.getenv("RERUN_FLUSH_NUM_BYTES", "8000")
os.environ["RERUN_FLUSH_NUM_BYTES"] = batch_size
rr.init(session_name)
memory_limit = os.getenv("LEROBOT_RERUN_MEMORY_LIMIT", "10
rr.spawn(memory_limit=memory_limit)
_init_rerun函数的主要功能是初始化Rerun SDK,设置数据刷新的字节数和内存使用限制,然后启动Rerun会话,为后续的控制循环可视化工作做准备。
其中有两个参数是从环境变量中获取,分别是RERUN_FLUSH_NUM_BYTES和LEROBOT_RERUN_MEMORY_LIMIT,用于控制每次刷新是处理数据的字节数和限制使用的内存量。
创建主从臂实例
teleop = make_teleoperator_from_config(cfg.teleop)
robot = make_robot_from_config(cfg.robot)
调用调用 make_teleoperator_from_config 和make_robot_from_config 函数,根据配置对象创建 SO101Leader遥操作设备实例和 SO101Follower 机器人实例。
def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator:
if config.type == "keyboard":
from .keyboard import KeyboardTeleop
return KeyboardTeleop(config)
elif config.type == "koch_leader":
from .koch_leader import KochLeader
return KochLeader(config)
elif config.type == "so100_leader":
from .so100_leader import SO100Leader
return SO100Leader(config)
elif config.type == "so101_leader":
from .so101_leader import SO101Leader
return SO101Leader(config)
......
以leader为例,最终调用SO101Leader对象实例。
class SO101Leader(Teleoperator):
"""
SO-101 Leader Arm designed by TheRobotStudio and Hugging Face.
"""
config_class = SO101LeaderConfig
name = "so101_leader"
def __init__(self, config: SO101LeaderConfig):
super().__init__(config)
#调用父类Teleoperator够着函数初始化属性,包括
#self.id:设备的唯一标识符,从配置对象中获取。
#self.calibration_dir:校准文件的存储目录。
#self.calibration_fpath:校准文件的完整路径。
#self.calibration:存储电机校准信息的字典。
self.config = config
#类型为 SO101LeaderConfig,存储传入的配置对象,方便在类的其他方法中访问配置信息。
norm_mode_body = MotorNormMode.DEGREES if config.use_degrees else MotorNormMode.RANGE_M100_100
self.bus = FeetechMotorsBus(
port=self.config.port,
motors={
"shoulder_pan": Motor(1, "sts3215", norm_mode_body),
"shoulder_lift": Motor(2, "sts3215", norm_mode_body),
"elbow_flex": Motor(3, "sts3215", norm_mode_body),
"wrist_flex": Motor(4, "sts3215", norm_mode_body),
"wrist_roll": Motor(5, "sts3215", norm_mode_body),
"gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100),
},
calibration=self.calibration,
)
#类型为 FeetechMotorsBus,用于与 Feetech 电机进行通信,初始化时传入端口、电机信息和校准信息。
SO101Leader类继承了Teleoperator,super().init(config) 调用父类 Teleoperator 的构造函数初始化的属性。
class Teleoperator(abc.ABC):
def __init__(self, config: TeleoperatorConfig):
self.id = config.id
#设备的id
self.calibration_dir = (
config.calibration_dir
if config.calibration_dir
else HF_LEROBOT_CALIBRATION / TELEOPERATORS / self.name
)
#校准文件的存储目录
self.calibration_dir.mkdir(parents=True, exist_ok=True)
#创建校准目录
self.calibration_fpath = self.calibration_dir / f"{self.id}.json"
self.calibration: dict[str, MotorCalibration] = {}
# 初始化一个字典,键为字符串,值为MotorCalibration
if self.calibration_fpath.is_file():
self._load_calibration()
#检查校准文件是否存在,存在调用加载校准信息的方法。
Teleoperator定义了一个遥控的基类,其构造函数中接受一个TeleoperatorConfig类型的参数config,主要有以下属性。
- self.id:设备的唯一标识符,从传入的配置对象 config 里获取,用于在系统中唯一标识该 SO101Leader 设备实例。
- self.calibration_dir:校准文件的存储目录。若 config.calibration_dir 有值则使用该值,否则使用默认路径。此目录用于存放设备的校准文件。
- self.calibration_fpath:校准文件的完整路径,由 self.calibration_dir 和设备 id 组合而成,文件格式为 JSON。
- self.calibration:存储电机校准信息的字典,键为电机名称,值为 MotorCalibration 类型的对象,用于保存电机的校准参数。
而SO101Leader类中自己定义的实力属性有
- self.config:类型为 SO101LeaderConfig,存储传入的配置对象。借助这个属性,类的其他方法可以访问配置信息,如端口号、是否使用角度单位等。
- self.bus:类型为 FeetechMotorsBus,用于和 Feetech 电机进行通信。初始化时传入端口、电机信息和校准信息,后续可以通过该对象实现电机的读写操作、校准等功能。
总结下就是SO101Leader 类包含 2 个类属性和多个实例属性,这些属性从不同层面描述了 SO101 领导者手臂设备的类型、配置、校准信息以及与电机的通信对象等。而SO101Follower也是同理这里就不过多阐述。
连接设备
teleop.connect()
robot.connect()
连接设备就是各自调用此前创建实例的connect方法,建立与硬件设备的连接。以teleop为例,最总调用到self.bus.connect发起连接并进行配置。
def connect(self, calibrate: bool = True) -> None:
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.bus.connect()
if not self.is_calibrated and calibrate:
self.calibrate()
self.configure()
logger.info(f"{self} connected.")
跟随控制
def teleop_loop(
teleop: Teleoperator, robot: Robot, fps: int, display_data: bool = False, duration: float | None = None
):
display_len = max(len(key) for key in robot.action_features)
start = time.perf_counter()
while True:
loop_start = time.perf_counter()
#从主臂获取动作位置数据
action = teleop.get_action()
#如果启动了实时显示,进行显示
if display_data:
observation = robot.get_observation()
log_rerun_data(observation, action)
#将主臂的动作位置数据发送给从臂
robot.send_action(action)
#计算循环执行时间
dt_s = time.perf_counter() - loop_start
busy_wait(1 / fps - dt_s)
loop_s = time.perf_counter() - loop_start
#打印动作的信息,每个舵机的位置。
print("\n" + "-" * (display_len + 10))
print(f"{'NAME':<{display_len}} | {'NORM':>7}")
for motor, value in action.items():
print(f"{motor:<{display_len}} | {value:>7.2f}")
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
if duration is not None and time.perf_counter() - start >= duration:
return
# 将终端的文本光标向上移动一定的行数,以此实现覆盖上一次输出内容的效果,从而在终端里模拟实时更新的显示
move_cursor_up(len(action) + 5)