Ai
			- 
					
						  ubuntu系统xiaozhi server本地部署简介 本文主要是记录在ubuntu系统从零源码的方式本地部署小智Ai服务端的过程,项目的地址为:xiaozhi-server。在部署之前简单了解一下其项目框架,这里总结可以分为3部分:manager-web、manager-api、xiaozhi-server,这3部分的运行是互相独立的,相互之间通过http rest api的方式进行访问,如下图: manager-web: 前端控制台(Vue)。管理员用浏览器操作;调用后端接口,不直接连设备。 manager-api: 后端管理服务(Java Spring Boot)。负责用户/设备/模型/参数/OTA/激活等业务,对外提供 REST API;对数据库(MySQL)与缓存(Redis)读写。 xiaozhi-server: 实时语音与智能体服务(Python)。负责 WebSocket 连接、ASR/LLM/TTS、工具/视觉接口;启动时向 manager-api 拉取配置、运行时上报对话。 3个组件分别使用了不用的语言环境,其中manager-web使用的是Vue.js,而manager-api使用的是java spring boot,xiaozhi-server使用的是python。因此需要装3个不同的语言环境。同时对于后端manager-api需要对数据进行存储,因此还需要安装mysql、redis。下面就围绕这3部分进行展开。 先在本地拉取一份代码: git clone https://github.com/xinnan-tech/xiaozhi-esp32-server.git 值得注意的时,xiaozhi server最简化版本安装,只需要安装xiaozhi-server即可,简化版部署见后续章节。 manager-api安装 数据库安装 由于后端的数据管理需要用到数据库,因此需要安装mysql、redis。 (1)mysql安装 # 安装MySQL sudo apt update sudo apt install -y mysql-server # 启动MySQL服务 sudo systemctl start mysql sudo systemctl enable mysql # 创建数据库 sudo mysql -e "CREATE DATABASE xiaozhi_esp32_server CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;" # 创建用户并指定认证方式(关键改动) sudo mysql -e "CREATE USER 'xiaozhi'@'localhost' IDENTIFIED WITH mysql_native_password BY 'xiaozhi123';" # 授权 sudo mysql -e "GRANT ALL PRIVILEGES ON xiaozhi_esp32_server.* TO 'xiaozhi'@'localhost';" sudo mysql -e "FLUSH PRIVILEGES;" mysql数据库安装后,同时也创建了用户和密码,分别是xiaozhi和xiaozhi123,这个后续需要填充到manaer-api的配置文件中,以便manager-api可以访问。 (2)安装Redis # 安装Redis sudo apt install -y redis-server # 启动Redis服务 sudo systemctl start redis-server sudo systemctl enable redis-server # 检查Redis状态 redis-cli ping Spring boot环境安装 因为后端程序manager-api使用的是java spring boot,因此需要安装java的运行环境。官方提示安装JDK21和Maven,前者是java的运行环境,后者是java项目管理工具。 # 安装JDK 21 sudo apt install -y openjdk-21-jdk # 设置JAVA_HOME环境变量 echo 'export JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64' >> ~/.bashrc echo 'export PATH=$JAVA_HOME/bin:$PATH' >> ~/.bashrc source ~/.bashrc # 验证Java安装 java -version # 安装Maven sudo apt install -y maven # 验证Maven安装 mvn -version 配置数据库 数据库和java环境安装好后,就可以配置java spring boot与数据库的连接了。 在xiaozhi-esp32-server/main/manager-api/src/main/resources/application-dev.yml中配置数据库连接信息 @@ -13,8 +13,8 @@ spring: #MySQL driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/xiaozhi_esp32_server?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&nullCatalogMeansCurrent=true - username: root - password: 123456 + username: xiaozhi + password: xiaozhi123 initial-size: 10 max-active: 100 min-idle: 10 在xiaozhi-esp32-server/main/src/main/resources/application-dev.yml中配置Redis连接信息(redis默认配置好了,不用改) spring: data: redis: host: localhost port: 6379 password: database: 0 编译运行 配置好对数据库的连接后,就可以进行编译了。 # 进入manager-api目录 cd xiaozhi-esp32-server/main/manager-api # 编译项目 mvn clean package -DskipTests # 编译完成后的jar包位置 ls -lh target/*.jar 编译完成之后,就可以运行项目了。 java -jar target/xiaozhi-esp32-api.jar --spring.profiles.active=dev 运行如果没有什么报错就说明启动成功了。 manager-web安装 安装node.js 由于前端使用的的是vue.js,所以需要安装node.js环境。 # 安装Node.js 20 curl -fsSL https://deb.nodesource.com/setup_20.x | sudo -E bash - sudo apt install -y nodejs # 验证Node.js安装 node -v npm -v 安装依赖 node.js环境安装好后,就可以安装manager-web的依赖了。 # 进入manager-web目录 cd xiaozhi-esp32-server/main/manager-web # 安装依赖 npm install 启动 切换到manager-web路径下,就可以运行服务程序了。 cd xiaozhi-esp32-server/main/manager-web npm run serve 启动成功之后,就可以访问后台了。登陆地址:http://127.0.0.1:8001,登陆后进行注册一个用户就可以进入到后台进行配置了。 配置模型api key 要让设备能够访问,需要配置模型的api key,登陆到智普的后台,注册获取一个api key。 这里使用的是智谱ai,注册一个账户,然后申请一个api key 然后登陆智控台配置密钥。 xiaozhi-server安装 conda python环境 # 下载并安装miniconda wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh bash Miniconda3-latest-Linux-x86_64.sh -b -p $HOME/miniconda3 # 初始化conda $HOME/miniconda3/bin/conda init bash source ~/.bashrc # 创建Python环境 conda remove -n xiaozhi-esp32-server --all -y conda create -n xiaozhi-esp32-server python=3.10 -y # 激活环境 conda activate xiaozhi-esp32-server # 添加清华源 conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge # 安装必要的系统库 conda install -y libopus ffmpeg libiconv python依赖包 # 进入xiaozhi-server目录 cd xiaozhi-esp32-server/main/xiaozhi-server # 设置pip镜像源 pip config set global.index-url https://mirrors.aliyun.com/pypi/simple/ # 安装Python依赖 pip install -r requirements.txt 下载语音模型 # 进入models目录 cd xiaozhi-esp32-server/main/xiaozhi-server/models # 下载模型文件(推荐阿里云镜像) cd SenseVoiceSmall wget https://modelscope.cn/models/iic/SenseVoiceSmall/resolve/master/model.pt # 验证文件 ls -lh model.pt 配置密钥 配置密钥主要是xiaozhi-server与manager-api交互时需要进行认证,因此需要先获取密钥。 (1)先在本地创建配置文件 # 创建data目录 cd xiaozhi-esp32-server/main/xiaozhi-server mkdir -p data # 复制配置文件 cp config_from_api.yaml data/.config.yaml # 编辑配置文件 vim data/.config.yaml 配置.config.yaml: manager-api: url: http://127.0.0.1:8002/xiaozhi secret: 待会从智控台获取 server: websocket: ws://你的IP:8000/xiaozhi/v1/ (2)然后登陆智控台获取密钥 访问智控台:http://127.0.0.1:8001 注册账号(第一个为超级管理员) 登录 → 参数管理 → 找到 server.secret 并复制 回到xiaozhi-server配置: vim xiaozhi-esp32-server/main/xiaozhi-server/data/.config.yaml 设置为 manager-api: url: http://127.0.0.1:8002/xiaozhi secret: 你刚才复制的server.secret值 启动服务 cd xiaozhi-esp32-server/main/xiaozhi-server conda activate xiaozhi-esp32-server python app.py 执行成功的话应该是下面这样 (xiaozhi-esp32-server) liumingyuan@HP-ProBook:~/xiaozhi-esp32-server/main/xiaozhi-server$ python app.py 从API读取配置 251029 20:47:34[0.8.5-00000000000000][core.providers.vad.silero]-INFO-SileroVAD 251029 20:47:34[0.8.5-00000000000000][core.utils.modules_initialize]-INFO-初始化组件: vad成功 VAD_SileroVAD 251029 20:47:38[0.8.5-00000000000000][core.providers.asr.fun_local]-INFO-funasr version: 1.2.3. 251029 20:47:38[0.8.5-00000000000000][core.utils.modules_initialize]-INFO-ASR模块初始化完成 251029 20:47:38[0.8.5-00000000000000][core.utils.modules_initialize]-INFO-初始化组件: asr成功 ASR_FunASR 251029 20:47:38[0.8.5-00000000000000][__main__]-INFO-视觉分析接口是 http://10.0.90.104:8003/mcp/vision/explain 251029 20:47:38[0.8.5-00000000000000][__main__]-INFO-Websocket地址是 ws://10.0.90.104:8000/xiaozhi/v1/ 251029 20:47:38[0.8.5-00000000000000][__main__]-INFO-=======上面的地址是websocket协议地址,请勿用浏览器访问======= 251029 20:47:38[0.8.5-00000000000000][__main__]-INFO-如想测试websocket请用谷歌浏览器打开test目录下的test_page.html 251029 20:47:38[0.8.5-00000000000000][__main__]-INFO-============================================================= 服务访问地址: 智控台:http://127.0.0.1:8001 API文档:http://127.0.0.1:8002/xiaozhi/doc.html WebSocket:ws://127.0.0.1:8000/xiaozhi/v1/ OTA接口:http://127.0.0.1:8002/xiaozhi/ota/ 配置websocket和OTA 由于是全模块部署,所以需要登陆智能控台,设置ota和websocket的接口,需要注意的是weboscket的启动必须是要等xiaozhi-server app启动才能设置。 OTA接口:http://你电脑局域网的ip:8002/xiaozhi/ota/ Websocket接口:ws://你电脑局域网的ip:8000/xiaozhi/v1/ 请你务必把以上两个接口地址写入到智控台中:他们将会影响websocket地址发放和自动升级功能。 1、使用超级管理员账号,登录智控台,在顶部菜单找到参数管理,找到参数编码是server.websocket,输入你的Websocket接口。 2、使用超级管理员账号,登录智控台,在顶部菜单找到参数管理,找到数编码是server.ota,输入你的OTA接口。 简化部署 所谓简化部署就是只跑xiaozhi-server,前后端都不跑。简化部署先参考"xiaozhi-server安装"章节,然后再次基础上进行配置文件即可。与完整部署xiaozhi-server部分唯一的区别就是配置文件不一样。如果要对接前后端使用的默认文件是config_from_api.yaml而如果是简化部署使用的默认文件是config.yaml。 下面是配置步骤。 cd xiaozhi-esp32-server/main/xiaozhi-server/data cp .config.yaml .config.yaml_back #对云端的配置作个备份 cp ../config.yaml .config.yaml #拷贝默认的配置 设置API key 测试 本地服务搭建好好后可以进行测试验证,可以使用xiaozhi-server自带的test程序,也可以使用开源的客户端py-xiaozhi,或者直接搭建esp32的设备接入。这里先用前面两者方式。 xiaozhi-server test cd xiaozhi-esp32-server/main/xiaozhi-server/test python -m http.server 8006 然后网页登陆:http://localhost:8006/test_page.html py-xiaozhi git clone https://github.com/huangjunsen0406/py-xiaozhi.git sudo apt-get update && sudo apt-get install -y portaudio19-dev libportaudio2 conda create -n py-xiaozhi-client python=3.10 conda activate py-xiaozhi-client 配置完成之后就可以执行应用获取到设备验证码之后,登陆绑定即可进行对话。 python main.py --protocol websocket
- 
					
						  Lekiwi驱动链路分析系统架构 硬件组成 Lekiwi是一个底盘+机械臂的结构。 机械臂: 6个自由度(shoulder_pan, shoulder_lift, elbow_flex, wrist_flex, wrist_roll, gripper) 移动底盘:3个全向轮,三轮全向移动(left_wheel, back_wheel, right_wheel) github官网Lekiwi使用 Koch v1.1 机械臂、U2D2 电机控制器和 Dynamixel XL430 电机作为移动基座。我这里买到的使用的是feetech电机,机械臂和底盘一共9个motor接入到一个串口总线上,对于机械臂和底盘移动只需要通过一个串口总线进行。 软件架构 我这里将软件架构分为3层。 应用层:对设备的操作,实例化设备一个设备后,对设备进行连接,移动控制,观测数据获取。 总线层:实现一个MotorBus基类,对设备的一些操作进行统一定义、约束。实现操作的逻辑,具体的实现由继承设备来实现。 设备层:具体设备的实现,继承与MotorBus,实现对电机底层通信接口。 感觉这个框架不是很合理,主要的框架设计是设备层继承总线层的基类,继承相当于是扩展,应用层没有起到屏蔽设备的作用。在应用层要创建一个设备实例如self.bus = FeetechMotorsBus(),而FeetechMotorsBus继承MotorsBus,应用层直接操作FeetechMotorsBus实例,没有达到屏蔽底层的效果。可以完全借鉴Linux设备驱动模型,分为设备、总线、驱动(应用),设备注册后,总线负责匹配设备和驱动,完全隔离,不用关系底层的设备是什么。 驱动链路 初始化 class LeKiwi(Robot): config_class = LeKiwiConfig name = "lekiwi" def __init__(self, config: LeKiwiConfig): super().__init__(config) self.config = config norm_mode_body = MotorNormMode.DEGREES if config.use_degrees else MotorNormMode.RANGE_M100_100 self.bus = FeetechMotorsBus( port=self.config.port, motors={ # arm "arm_shoulder_pan": Motor(1, "sts3215", norm_mode_body), "arm_shoulder_lift": Motor(2, "sts3215", norm_mode_body), "arm_elbow_flex": Motor(3, "sts3215", norm_mode_body), "arm_wrist_flex": Motor(4, "sts3215", norm_mode_body), "arm_wrist_roll": Motor(5, "sts3215", norm_mode_body), "arm_gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100), # base "base_left_wheel": Motor(7, "sts3215", MotorNormMode.RANGE_M100_100), "base_back_wheel": Motor(8, "sts3215", MotorNormMode.RANGE_M100_100), "base_right_wheel": Motor(9, "sts3215", MotorNormMode.RANGE_M100_100), }, calibration=self.calibration, ) self.arm_motors = [motor for motor in self.bus.motors if motor.startswith("arm")] self.base_motors = [motor for motor in self.bus.motors if motor.startswith("base")] self.cameras = make_cameras_from_configs(config.cameras) 继承Robot,指定了配置类为LeKiwiConfig其定义了uart的端口、相机的编号等信息。将角度统一归一化到[-100,100],创建Feetech电机总线实例,创建Feetech电机型号sts3215,配置了机械臂电机ID为1至6,底盘编号为7至9编号。同时对相机也进行了初始化,用于后续的视觉观测。 连接 robot.connect()————> def connect(self, calibrate: bool = True) -> None: if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} already connected") self.bus.connect() if not self.is_calibrated and calibrate: logger.info( "Mismatch between calibration values in the motor and the calibration file or no calibration file found" ) self.calibrate() for cam in self.cameras.values(): cam.connect() self.configure() logger.info(f"{self} connected.") 初始化完成之后即可进行发起连接,连接主要是根据指定的串口好进行打开,然后进行握手验证,ping所有配置的电机,检查校准文件与电机的状态,并设置PID、加速度等参数。 校准 def calibrate(self) -> None: if self.calibration: # Calibration file exists, ask user whether to use it or run new calibration user_input = input( f"Press ENTER to use provided calibration file associated with the id {self.id}, or type 'c' and press ENTER to run calibration: " ) if user_input.strip().lower() != "c": logger.info(f"Writing calibration file associated with the id {self.id} to the motors") self.bus.write_calibration(self.calibration) return logger.info(f"\nRunning calibration of {self}") motors = self.arm_motors + self.base_motors self.bus.disable_torque(self.arm_motors) for name in self.arm_motors: self.bus.write("Operating_Mode", name, OperatingMode.POSITION.value) input("Move robot to the middle of its range of motion and press ENTER....") homing_offsets = self.bus.set_half_turn_homings(self.arm_motors) homing_offsets.update(dict.fromkeys(self.base_motors, 0)) full_turn_motor = [ motor for motor in motors if any(keyword in motor for keyword in ["wheel", "wrist_roll"]) ] unknown_range_motors = [motor for motor in motors if motor not in full_turn_motor] print( f"Move all arm joints except '{full_turn_motor}' sequentially through their " "entire ranges of motion.\nRecording positions. Press ENTER to stop..." ) range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors) for name in full_turn_motor: range_mins[name] = 0 range_maxes[name] = 4095 self.calibration = {} for name, motor in self.bus.motors.items(): self.calibration[name] = MotorCalibration( id=motor.id, drive_mode=0, homing_offset=homing_offsets[name], range_min=range_mins[name], range_max=range_maxes[name], ) self.bus.write_calibration(self.calibration) self._save_calibration() print("Calibration saved to", self.calibration_fpath) 首次校准,从归零到测量范围,生成并保存文件流程,后续使用按回车复用已有校准;输入 'c' 重新校准。可以总结为如下: LeKiwi.calibrate() → 用户选择(使用已有/重新校准) → 归零设置 → 运动范围测量 → 校准参数构建 → FeetechMotorsBus.write_calibration() → MotorsBus.write_calibration() → 逐个写入电机寄存器 校准主要是限定几个参数,如归零偏移、运动范围、驱动模式。校准的流程是由用户选择使用已经有的校准文件直接写入校准还是进行重新启动校准流程校准。 动作执行 def send_action(self, action: dict[str, Any]) -> dict[str, Any]: if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") arm_goal_pos = {k: v for k, v in action.items() if k.endswith(".pos")} base_goal_vel = {k: v for k, v in action.items() if k.endswith(".vel")} base_wheel_goal_vel = self._body_to_wheel_raw( base_goal_vel["x.vel"], base_goal_vel["y.vel"], base_goal_vel["theta.vel"] ) # Cap goal position when too far away from present position. # /!\ Slower fps expected due to reading from the follower. if self.config.max_relative_target is not None: present_pos = self.bus.sync_read("Present_Position", self.arm_motors) goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in arm_goal_pos.items()} arm_safe_goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target) arm_goal_pos = arm_safe_goal_pos # Send goal position to the actuators arm_goal_pos_raw = {k.replace(".pos", ""): v for k, v in arm_goal_pos.items()} self.bus.sync_write("Goal_Position", arm_goal_pos_raw) self.bus.sync_write("Goal_Velocity", base_wheel_goal_vel) return {**arm_goal_pos, **base_goal_vel} 输入为动作的序列,输出为实际发送的动作。首先将机械臂(后缀.pos)目标位置和底盘目标速度(后缀.vel)进行分离,如下: # 分离前 action = { "arm_shoulder_pan.pos": 45.0, # 机械臂位置 "arm_elbow_flex.pos": -30.0, "x.vel": 0.1, # 底盘速度 "y.vel": 0.0, "theta.vel": 0.05 } # 分离后 arm_goal_pos = { "arm_shoulder_pan.pos": 45.0, "arm_elbow_flex.pos": -30.0 } base_goal_vel = { "x.vel": 0.1, "y.vel": 0.0, "theta.vel": 0.05 } 然后调用_body_to_wheel_raw进行底盘运动学转换,输入为底盘坐标系速度 (x, y, θ),输出为三轮电机速度指令。转换的时候需要进行安全限制,实际获取机械臂的关节位置,然后计算步幅(目标位置-当前位置),超过max_relative_target 则裁剪使用安全的目标位置。 最后就是调用sync_write分别写入控制机械臂和底盘。下面总结一下流程: LeKiwi.send_action(action) → 动作分离(机械臂位置 + 底盘速度) → 底盘运动学转换 → 安全限制检查 → FeetechMotorsBus.sync_write() → MotorsBus._sync_write() → 批量写入电机寄存器 对Lekiwi的控制,由于底盘是3个万向轮,所以需要进行运动学转换,现将机械臂和底盘进行分离,计算之处底盘坐标系转换的3轮速度,在确保安全限制的条件下,调用sync_write进行写入,sync_write是同时写入多个电机。 状态读取 def get_observation(self) -> dict[str, Any]: if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") # Read actuators position for arm and vel for base start = time.perf_counter() arm_pos = self.bus.sync_read("Present_Position", self.arm_motors) base_wheel_vel = self.bus.sync_read("Present_Velocity", self.base_motors) base_vel = self._wheel_raw_to_body( base_wheel_vel["base_left_wheel"], base_wheel_vel["base_back_wheel"], base_wheel_vel["base_right_wheel"], ) arm_state = {f"{k}.pos": v for k, v in arm_pos.items()} obs_dict = {**arm_state, **base_vel} dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read state: {dt_ms:.1f}ms") # Capture images from cameras for cam_key, cam in self.cameras.items(): start = time.perf_counter() obs_dict[cam_key] = cam.async_read() dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") return obs_dict 首先调用sync_read分别读取机械臂位置和底盘的当前速度,然后调用底盘速度逆运动学转换(三轮电机->底盘坐标系速度(x, y, θ)),接着将机械臂位置和底盘转换的坐标系速度整合,再次就是对相机图像的待机,先遍历所有配置相机,将图像的数据添加到观测字段,最终再进行整合得到观测字典。 obs_dict = { # 机械臂位置 "arm_shoulder_pan.pos": 45.2, "arm_shoulder_lift.pos": -12.8, # ... 其他关节 # 底盘速度 "x.vel": 0.15, "y.vel": -0.08, "theta.vel": 0.12, # 相机图像 "camera_1": numpy_array, # (H, W, 3) "camera_2": numpy_array, # (H, W, 3) } 下面是总结流程 LeKiwi.get_observation() → FeetechMotorsBus.sync_read() → MotorsBus._sync_read() → 批量读取电机状态 → 底盘速度逆运动学 → 相机图像采集 获取状态与动作执行相反,首先读取到电机的状态,然后通过逆运动学,将底盘的速度转换为坐标系。
- 
					
						  lekiwi+Orin Nano环境搭建环境准备 简要记录在Orin nano平台搭建lekiwi环境,可以远程遥控底盘移动和机械臂示教的过程,需要的硬件如下: - NVIDIA Jetson Orin Nano开发板 - Lekiwi套件(底盘、主从机械臂) - PC,预装好Ubuntu系统 组装硬件 将底盘、主从机械臂、Orin nano组装好,需要注意的是由于官方默认使用的计算平台是树莓派,所以默认提供的供电接口是USB 5V。我们这里使用的Orin Nano平台,使用的是DC5525电源接口,因此需要提前购买准备DC5521 to DC5525的转接线。 详细组装可以参考 (1)https://github.com/SIGRobotics-UIUC/LeKiwi/blob/main/Assembly.md (2)https://huggingface.co/docs/lerobot/so101#step-by-step-assembly-instructions Orin nano 首先,首次启动先接上键盘、鼠标、显示器登录配置好网络和VNC。 sudo apt update sudo apt install vino # 设置vino开机自启 mkdir -p ~/.config/autostart cp /usr/share/applications/vino-server.desktop ~/.config/autostart/. cd /usr/lib/systemd/user/graphical-session.target.wants sudo ln -s ../vino-server.service ./. # 调整共享/认证设置 gsettings set org.gnome.Vino prompt-enabled false gsettings set org.gnome.Vino require-encryption false # 设置密码,默认thepassword gsettings set org.gnome.Vino authentication-methods "['vnc']" gsettings set org.gnome.Vino vnc-password $(echo -n 'thepassword'|base64) # 然后重启就可以使用VNC viewer访问了 sudo reboot 其次,安装conda 环境,由于Jetson架构是aarch64,所以下载miniconda aarch64的版本。 wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-aarch64.sh sh Miniconda3-latest-Linux-x86_64.sh source ~/.bashrc PC ubunut 同理安装conda环境,与jetson不一样的是,PC是X86架构。 wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh sh Miniconda3-latest-Linux-x86_64.sh source ~/.bashrc 软件安装 代码下载 在Orin nano和PC ubuntu上各自从github上克隆开源代码。 git clone https://github.com/huggingface/lerobot.git 配置安装 首先,在Orin nano和PC ubuntu上各自创建lekiwi的环境。 conda create -n lekiwi python=3.10 conda activate lekiwi 其次,切到lerobot环境下进行安装 cd lerobot pip install -e ".[feetech]" conda install -c conda-forge ffmpeg=7.1.1 遥操作 标定 需要给主臂、从臂进行标定,以限制关节的最大运动范围。在标定前,可以先看看视频怎么操作机械臂https://huggingface.co/docs/lerobot/so101#calibrate (1)从臂的运行命令如下 lerobot-calibrate \ --robot.type=lekiwi \ --robot.id=R1225280 \ --robot.cameras='{handeye: {type: opencv, index_or_path: 0, width: 640, height: 360, fps: 30}}' 标定的文件 (2)主臂的运行命令如下: lerobot-calibrate \ --teleop.type=so101_leader \ --teleop.port=/dev/ttyACM0 \ --teleop.id=R07252801 (3)标定好后会在下面路径存储标定的参数 # orin nano平台 ~/.cache/huggingface/lerobot/calibration/robots/lekiwi/R1225280.json # ubuntu平台 ~/.cache/huggingface/lerobot/calibration/teleoperators/so101_leader/R07252801.json 遥控 (1)由于PC和lekiwi之前的遥控使用的是gRPC,所以需要先安装zmq,否则会报错。 pip install zmq (2)orin nano启动命令,host.connection_time_s设置的是时间(单位是秒),超过这个时间会自动断开。 python -m lerobot.robots.lekiwi.lekiwi_host \ --robot.id=R1225280 \ --robot.cameras='{handeye: {type: opencv, index_or_path: 0, width: 640, height: 360, fps: 30}}' \ --host.connection_time_s=300 (3)PC ubuntu需要修改示例代码,修改点如下: diff --git a/examples/lekiwi/teleoperate.py b/examples/lekiwi/teleoperate.py index 6b430df4..cb4ad415 100644 --- a/examples/lekiwi/teleoperate.py +++ b/examples/lekiwi/teleoperate.py @@ -18,20 +18,20 @@ import time from lerobot.robots.lekiwi import LeKiwiClient, LeKiwiClientConfig from lerobot.teleoperators.keyboard.teleop_keyboard import KeyboardTeleop, KeyboardTeleopConfig -from lerobot.teleoperators.so100_leader import SO100Leader, SO100LeaderConfig +from lerobot.teleoperators.so101_leader import SO101Leader, SO101LeaderConfig from lerobot.utils.robot_utils import busy_wait from lerobot.utils.visualization_utils import init_rerun, log_rerun_data FPS = 30 # Create the robot and teleoperator configurations -robot_config = LeKiwiClientConfig(remote_ip="172.18.134.136", id="my_lekiwi") -teleop_arm_config = SO100LeaderConfig(port="/dev/tty.usbmodem585A0077581", id="my_awesome_leader_arm") +robot_config = LeKiwiClientConfig(remote_ip="192.168.0.33", id="my_lekiwi") +teleop_arm_config = SO101LeaderConfig(port="/dev/ttyACM0", id="R07252801") keyboard_config = KeyboardTeleopConfig(id="my_laptop_keyboard") # Initialize the robot and teleoperator robot = LeKiwiClient(robot_config) -leader_arm = SO100Leader(teleop_arm_config) +leader_arm = SO101Leader(teleop_arm_config) keyboard = KeyboardTeleop(keyboard_config) 将SO100改成SO101,因为我们的主臂使用的是SO101 将remote_ip改成jetson nano的ip 配置好SO101的uart端口和标定的文件 (4)PC ubunut执行命令 python examples/lekiwi/teleoperate.py 之后就可以使用键盘和主臂进行遥控操作了。 Key Action W 前进 S 后退 A 左移 D 右移 Z 左转(逆时针) X 右转(顺时针) R 加速一档 F 减速一档
- 
					
						  Jetson Orin Nano环境搭建安装浏览器 sudo apt update sudo apt install chromium-browser -y 安装后发现点击浏览器会没反应。按照下面方法配置。 snap download snapd --revision=24724 sudo snap ack snapd_24724.assert sudo snap install snapd_24724.snap sudo sudo snap refresh --hold snapd 配置VNC sudo apt update sudo apt install vino 然后配置 步骤1: 设置开机自启 对于 LXDE 桌面(例如 2 GB 版本的 Jetson Nano) mkdir -p ~/.config/autostart cp /usr/share/applications/vino-server.desktop ~/.config/autostart/. 对于 GNOME 桌面: cd /usr/lib/systemd/user/graphical-session.target.wants sudo ln -s ../vino-server.service ./. 步骤2:调整共享/认证设置 gsettings set org.gnome.Vino prompt-enabled false gsettings set org.gnome.Vino require-encryption false 步骤3:然后设置密码 # Replace thepassword with your desired password gsettings set org.gnome.Vino authentication-methods "['vnc']" gsettings set org.gnome.Vino vnc-password $(echo -n 'thepassword'|base64) 上面登录密码设置的是thepassword 步骤4: Reboot the system so that the settings take effect sudo reboot SSH ssh user@ip 输入密码就可以登录进去了。 conda环境 wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-aarch64.sh sh Miniconda3-latest-Linux-x86_64.sh source ~/.bashrc 注意这里是aarch64
- 
					
						  为什么AlphaGo能自学围棋?强化学习基本概念强化学习简介 什么是强化学习 以直升机控制飞行的程序来举例。 自动驾驶的直升机配备了机载计算机、GPS、加速度计、陀螺仪和磁罗盘,我们可以实时确定的知道直升机的位置。如何使用强化学习来让直升机飞行了? 在强化学习中,将直升机的位置、方向和速度等称为状态s,因此我们的目标或任务就是从直升机的状态映射到动作a的函数,意思是将两根控制杆推多远才能保持直升机在空中平衡、飞行而不坠毁。 要获取到动作a可以使用监督学习来训练神经网络,直接学习从x(状态a)到y(动作a)的映射,但事实证明直升机在控制移动时实际上时摸棱两可的,不好判断正确的行动是什么?是向做倾斜一点还是倾斜很多,还是稍微增加直升机的压力?要获得x的数据集和理想的动作y实际上是很困难的。因此对于控制直升机和其他机器人的许多任务,监督学习方法效果不佳,我们改用强化学习。 强化学习的关键输入称为reward(奖励),它告诉直升机何时表现良好,何时表现不佳。强化学习就像是在训练狗,你不知道狗会做出什么行为?但是我们可以判断狗行为的好坏,如果狗的行为是好的我们就就认为他是一个好狗给予奖励,如果它做出的行为不达我们的预期我们认为就是一个坏狗给予惩罚。我们期望它能自己学会如何做出好的动作行为而少做坏的动作行为。 火星探测器 火星探测器从初始状态运动到最终状态(terminal state),最终会得到一个累计的分数(return),每运动一次都会有相应的奖励reward。 当前火星探测器的初始位置在state 4,它可以向左也可以向右。 场景1:state 4(初始)(获得0)->state 3(获得0)->state 2(获得0)->state 1(获得100);最终得100。 场景2:state 4(初始)(获得0)->state 5(获得0)->state 6(获得40);最终得40。 场景3:state 4(初始)(获得0)->state 4(获得0)->state 4(获得0)->state 3(获得0)->state 2(获得0)->state 1(获得100);最终得100。 在每个时间步长上,机器人处于某种状态,我们称为S,它开始选择一个动作a,然后就获得一些奖励(reward),奖励值它从该状态获取,同时了它会切换为一个新的状态S’。公示表示为$(s,a,R(s),s')$,举个具体的例子,机器人处于状态4并采取行动时往左边走没有获得奖励(4,<-,0,3)。核心点就是状态(state)、动作(action)、奖励(reward)、下一个状态(next state),基本上就是每采取新动时都会发生情况,这就是强化学习算法要决定如何采取行动考虑的核心要素。 回报return 采取的行动会经历不同的状态以及如何享受不同的奖励,但是怎么去衡量一组特定的奖励比另外一组奖励好还是差了? 打个比方炒股方案1是你今天买入一支股票A然后后天就卖出了赚了1000块,而方案2你今天买入一支股票但是后天你亏了500,但是到第三天你赚了6000,那你更愿意追求那种方案了?虽然是第三天才显示有收益,但是第三天收益很高,显然会选择方案2。 为了计算这个回报分数,我们对动作后的奖励进行累加,但是了随着越往后的动作,我们需要添加一个折扣因子,也就是说越往前的比例系数越高,但是越往后所占的比例折扣就越大。如上图假设折扣因子(discount factor)为0.9,那么R1的折扣因子是1,R2的折扣因子就是0.9,R3的折扣因子就是0.9的平方依此类推。 最终获得的回报(return)取决于奖励(reward),而奖励取决于你采取的行动,因此回报取决于你采取的行动。 如折扣因子是0.5,如果从状态5向左走回报是6.25,如果是状态4向左走回报是12.5,而如果你状态就在6那么回报就是60。 总结一下强化学习的回报是系统获得的奖励总和,但需要加上折扣系数,每一个时间步的权重不一样,时间步越大奖励的系数结果就越小。 Policy 在强化学习中,可以通过很多不同的方式采取行动,比如我们可以决定始终选择更接近的奖励,如果最左边的奖励更接近则向左走,如果最右边的奖励更接近则向右走。当然我们也可以换种策略,始终追求更大的奖励等等。 策略Pi就是希望我们在那种状态下采取什么样的行动。state———>action。强化学习的目标就是要找到一个策略Pi,告诉在什么状态应该采取什么行动,执行每个状态以最大化回报。 小结 上面涉及到的概念,状态、动作、奖励、折扣因子、回报、策略。强化学习的目标就是让策略选择一个好的行动以获取最大的回报,这个决策过程被称为马尔可夫决策过程(MDP)。MDP指的是未来仅取决于当前的状态,而不取决于进入当前状态之前可能发生的任何事情,换句话说在马尔可夫决策过程中,未来取决于你现在所处的位置,而不取决于你是如何达到这里的;再换一种就是MDP是我们有一个机器人,我们要做的是选择动作a,根据这些动作、环境中发生的事情执行科学的任务。 State-action value funciton 状态动作函数Q 表示在状态s下,执行 动作a后,智能体所能获得的期望累计回报(Expected Return)。换句话说Q值衡量的是:"如果我在当前状态下做这个动作,长期来看能赚多少奖励?"。 计算$Q(s,a)$是强化学习算法的重要组成部分。也就是i说有办法计算s的Q,a,对于每个状态和每个动作,那么当你处于某个状态时,你所要做的就是看看不同的动作A,然后选择动作A,那就最大化s的Q,a。求Q的最大值。 贝尔曼方程 如何计算状态动作价值函数Q of S,A了? 在强化学习中有一个名为贝尔曼方程帮助我们解决计算状态动作值函数。下面来看看示例。 上图中有两个示例当在状态2,向右移动时,那么Q(2,->)=R(2)+0.5 max Q(3,a'0)=0+(0.5)25=12.5。当在状态4,向左移动时,那么Q(4,<-)=R(4)+0.5max Q(3,a')=0+(0.5)25=12.5。 贝尔曼方程体现的核心思想是:"一个状态的价值 = 即时奖励 + 后续状态的折扣价值"。也就是说,当前决策的好坏取决于当下奖励 + 未来的期望收益。 Continuous State Space continuous state space示例 对于火星车可能只有一个单一的状态1~6,对于下面的卡车来说有很多个状态,比如x,y,角度等等。而对于直升机来说又有更多参数集合的状态。 因此状态不仅仅是少数可能离散值的一个,它可以是一个数字向量。 月球着陆器 月球着陆车有很多状态变量,如上图所示。奖励函数我们可以设计成下面这样。 对于月球着陆器要学习的策略pi就是输入s,通过策略得到a,然后计算出最大的return。 学习Q 在状态s处,使用神经网络来计算4个动作中,那个动作的Q(s,nothing),Q(s,left),Q(s,main),Q(s,right)最大,然后选择那个最大值的动作。 那如何获得一个包含X和Y值的训练集,可以进行训练神经网络了?这就需要用到贝尔曼方程如上图,我们可以把贝尔曼方程的左边命名为X,右边命名为Y,神经网络的输入是一个状态和动作对。而输出Y就是就是Q。神经网络学习的就是X到Y的映射。 改进的神经网络架构 对于火星车前面的神经网络对应需要推理4次,即Q(s,nothing),Q(s,left),Q(s,main),Q(s,right),这样效率比较低,那么可以对算法进行改进推理一次直接输出4个动作的对应的Q。下面就是修改后的神经网络架构。 算法改进:ϵ贪婪策略 总结 学习流程 智能体观察当前状态 $s_t$ 根据策略 $\pi(a|s_t)$ 选择动作 $a_t$ 环境返回奖励 $r_t$ 和下一状态 $s_{t+1}$ 智能体根据反馈更新策略或价值函数 这一过程可用 马尔可夫决策过程(MDP) 表示: $$ [ \text{MDP} = (S, A, P, R, \gamma) ] $$ 其中: - 参数$S$:状态集合 - 参数$A$:动作集合 - 参数$P(s'|s,a)$:状态转移概率 - 参数$R(s,a)$:奖励函数 - 参数$\gamma$:折扣因子(0~1) 算法 (1)算法分类 类别 特征 代表算法 基于价值(Value-based) 学习 Q 值,间接得到策略 Q-learning、DQN 基于策略(Policy-based) 直接优化策略函数 REINFORCE、PPO Actor-Critic 混合 同时学习策略(Actor)和值函数(Critic) A2C、A3C、DDPG、SAC 基于模型(Model-based) 显式学习环境动态模型 Dyna-Q、Dreamer、MuZero (2)算法演进 阶段 特征 代表算法 传统表格法(Tabular RL) 离散状态空间,值表更新 Q-Learning, SARSA 深度强化学习(Deep RL) 神经网络逼近 Q 函数 DQN, Double DQN, Dueling DQN 连续动作控制(Continuous Control) 针对机械臂、无人车等连续控制问题 DDPG, TD3, SAC 策略梯度类(Policy Gradient) 直接优化策略参数 REINFORCE, PPO, TRPO 基于模型的RL(Model-based RL) 同时学习环境模型 + 策略 MuZero, DreamerV3 模仿学习 / 具身智能结合 利用演示或视觉模仿学习 GAIL, BC, VLA, RT系列 参考:本文主要来之吴恩达强化学习笔记
- 
					
						  机器人全身控制浅谈:理解 WBC 的原理概念 WBC(Whole-Body Control,全身控制)是什么?机器人是由“各关节”组成的,其不是“各关节各玩各的”而是一个耦合的整体。在某个时刻可能要做很多事情,比如保持平衡(重心别出圈)、手/脚要动作到目标位置、躯干姿态不能乱、关节不能超限、脚下不能打滑。这些都是一系列任务的组合。 WBC的核心就是把这些任务(目标)和约束(物理/安全)写进一个小型优化问题,在每个控制周期(几百hz~1Khz)求解,得到“当下这毫秒,各关节应该怎么动/用多大力”。 一句话总结就是WBC就是用优化的方法求解出要给“关节多少力“”以便让机器的各个关节一起配合完成多个目标,且不违反物理与安全约束。 原理 动力学方程 要解释WBC的原理,那必须绕不开动力学方程,这里就先对动力学方程做个简单介绍。 $$ M(q)\dot{v} + h(q,v) = S^T \tau + J_c^T \lambda $$ 配合接触约束: $$ J_c v = 0,\quad \lambda \in \text{摩擦锥} $$ 通俗理解公式就是:“惯性 × 加速度 + 自然出现的力 = 电机能给的力 + 地面/物体的反力” 公式左边:机器人自身的自然物理 变量$M(q)\dot{v}$:惯性项。$M(q)$是质量矩阵,描述机器人在不同姿态下的惯性特性。$\dot{v} $是广义加速度(关节加速度或身体的加速度);其意义就像$F=ma$里面的$ma$,表示加速一个有惯性的物体需要的力 变量$h(q,v)$:重力+速度相关项(科氏力、离心力)。如果机器人静止,这里主要就是重力。如果在运动,这里就会出现"速度带来的额外力",类似开车转弯时身体被甩出去的感觉。 公式右边:外界能提供的“驱动力” 变量$S^T \tau$:电机能施加的关节力矩。$\tau$电机产生的力矩(控制器的输出),$S^T$选择矩阵,把电机力矩映射到广义坐标系中。 变量$J_c^T \lambda$:接触点反力。$\lambda$来自地面或物体的力(约束反作用力),$J_c$接触点的雅可比,把关节速度映射到接触点速度。$J_c^T \lambda$把“地面推脚的力”转换回“关节的力”。 直观记忆就是类比现实生活中的推车子: 你推车:电机力矩$\tau$。 地面支撑车子:接触反力$\lambda$。 车子有质量,要加速就得克服惯性:$M(q)\dot{v}$。 重力和转弯的惯性: $h(q,v)$。 接触力约束 还要满足接触力约束$J_c v = 0,\quad \lambda \in \text{摩擦锥}$,其中$J_c v = 0$意义是接触点速度为0,比如机器人脚贴在地上不滑、不穿透;$\quad \lambda \in \text{摩擦锥}$意义是接触力必须满足摩擦模型,脚不会不穷大摩擦,力要在摩擦椎范围内。 公式中涉及到一个雅可比,什么是雅可比$Jc$? 假设有一个机械臂: 关节角度:就像是控制的"按钮"。 手末端的位置:就是最终关心的"结果"。 那么问题就是关节角度动一动,末端的位置会怎么动了?这个"关节空间的微小变化"影响到“末端空间的微小变化”。这样一个映射关系,就是雅可比矩阵$J$。 图中红色箭头表示关节角度的小变化$\Delta \theta_1 , \Delta \theta_2$。红色箭头的变换导致绿色箭头末端位置的变化:$ \Delta x , \Delta y$。雅可比矩阵 $J$ 就是把 $$ \begin{bmatrix}\Delta \theta_1 \ \Delta \theta_2\end{bmatrix} \longrightarrow \begin{bmatrix}\Delta x \ \Delta y\end{bmatrix} $$ 因此如果想要末端动多少就用$J$,想算末端力传回关节多少就用$J^T$。 总结一下雅可比$J$就是关节空间和任务空间的桥梁,作用就是我们关节动多少,末端/接触点动多少。 动力学方程在WBC中的用处? 动力学方程是机器人身体运动的"牛顿定律",我们来看看WBC的目标是什么?WBC不只是让机器人"走"或"站",而是要全身协调,比如要去抓杯子,脚要保持不滑,躯干要保持平衡,关节力矩不能超过电机限制。所以WBC的本质是解一个优化问题,找出一组关节力矩$\tau$,既能完成任务目标,又满足动力学方程和约束。 优化问题 WBC的核心思路是把机器人全身的目标任务转化为优化问题,在满足物理规律和约束条件的前提下,求出最合适的一组关节力矩$\tau$。 具体一点在WBC中求解的决策变量通常是如下三个: 最优的关节力矩$\tau$。 接触点反力$\lambda$。 机器人下一步的加速度$\dot{v} $。 优化问题转换为数学的目标函数如下: $$ \min_{x} | J_{\text{task}} \dot{v} - \dot{v}_{\text{des}} |^2 + | \tau |^2 + | \lambda |^2 $$ 公式中$J_{\text{task}}\dot{v} - \dot{v}{\text{des}}$表示实际加速度与期望加速度的误差,$J{\text{task}} \dot{v}$是在当前关节加速度下,末端/任务空的实际加速度,而$\dot{v}_{\text{des}}$是我们期望任务空间实现的期望加速度(比如手往前加速 0.5 m/s²,质心保持 0 加速度)。 同时优化问题还要满足以下约束条件: 动力学约束:$M(q)\dot{v} + h(q,v) = S^T \tau + J_c^T \lambda$。这是硬约束,控制器必须遵守。 接触约束:$J_c v = 0$,接触点不能乱动(不滑、不穿透)。 摩擦约束:$\lambda \in \mathcal{K}_{\text{fric}}$,接触力必须符合摩擦模型(不能无限大)。 力矩限制:$\tau_{\min} \leq \tau \leq \tau_{\max}$。 总结一下优化问题的目标函数意思就是要满足任务误差最小化(手/身体/质心的加速度跟踪目标),同时要满足能量或力矩最小化(不能浪费力),同时满足接触力正则化(力要稳定不能乱跳)。 方程有了,怎么求解了? 这个目标函数是一个二次型,符合QP,所以可以用现成的QP求解器来解,例如:OSQP、qpOASES、Gurobi(商业求解器)、CPLEX(商业求解器)、CVXPy(Python 封装,常用于原型),这里就不过多阐述了。 总结一下WBC核心就是要解决一个优化问题:二次目标(误差最小 + 力矩正则) + 动力学/接触/摩擦/限幅约束。其求解的方式通常使用QP 求解器(实时、高效、全局最优)。求解的结果是关节力矩$\tau$(给电机执行),同时还得到加速度$\dot{v}$和接触力$\lambda$。 示例 接下来我们调用cvxpy库看看示例,直观体验一下。 import cvxpy as cp import numpy as np # ---- 机械臂参数 ---- l1, l2 = 1.0, 1.0 m1, m2 = 1.0, 1.0 theta1, theta2 = np.deg2rad(45), np.deg2rad(30) # ---- 雅可比(末端位置对关节的导数)---- J_task = np.array([ [-l1*np.sin(theta1) - l2*np.sin(theta1+theta2), -l2*np.sin(theta1+theta2)], [ l1*np.cos(theta1) + l2*np.cos(theta1+theta2), l2*np.cos(theta1+theta2)] ]) # ---- 动力学质量矩阵 M(简化版)---- M = np.array([ [m1*l1**2 + m2*(l1**2 + l2**2 + 2*l1*l2*np.cos(theta2)), m2*(l2**2 + l1*l2*np.cos(theta2))], [m2*(l2**2 + l1*l2*np.cos(theta2)), m2*l2**2] ]) h = np.zeros(2) # 忽略重力/科氏项 # ---- 接触约束:假设末端y方向不能动(竖直方向约束)---- Jc = np.array([[0, 1]]) @ J_task # 取末端y方向的行 # ---- 变量 ---- ddq = cp.Variable(2) # 关节加速度 tau = cp.Variable(2) # 力矩 lam = cp.Variable(1) # 接触力 (竖直反作用力) # ---- 期望任务加速度(末端x方向=1.0, y方向=0.0)---- xddot_des = np.array([1.0, 0.0]) # ---- 目标函数:末端任务 + 力矩/接触力正则 ---- objective = cp.Minimize( cp.sum_squares(J_task @ ddq - xddot_des) + 0.01*cp.sum_squares(tau) + 0.01*cp.sum_squares(lam) ) # ---- 约束 ---- constraints = [ M @ ddq + h == tau + Jc.T @ lam, # 动力学方程 Jc @ ddq == 0, # 接触点不加速 tau >= -10, tau <= 10, lam >= 0 # 接触力必须向上推 ] # ---- 求解 ---- prob = cp.Problem(objective, constraints) prob.solve() print("Optimal joint accelerations:", ddq.value) print("Optimal torques:", tau.value) print("Optimal contact force λ:", lam.value) print("End-effector acc achieved:", J_task @ ddq.value) print("Desired end-effector acc :", xddot_des) 上面的示例中可以分为几部分: (1)任务 末端(手)在水平 $x$ 方向产生 $1.0 \text{m/s}^2$ 的加速度。在垂直 $y$ 方向不要加速(因为手撑在桌子上,不应该离开桌面)。 数学写法: $$ \ddot{x}_{des} = [1.0,0.0]^T $$ (2)决策变量 优化器要决定的量是 $$ \ddot{q} = [\ddot{\theta}_1,\ddot{\theta}_2]^T, \quad \tau = [\tau_1,\tau_2]^T, \quad \lambda $$ (3)要优化的目标函数 最小化 $$ \min_{\ddot{q},\;\tau,\;\lambda}| J_{\text{task}} \ddot{q} - \ddot{x}_{des} |^2+ 0.01 |\tau|^2+ 0.01 |\lambda|^2 $$ (4)约束条件 动力学约束:$M(q)\ddot{q} + h(q,\dot{q}) = \tau + J_c^T \lambda$ 接触约束:$J_c \ddot{q} = 0$ 力矩范围:$-10 \leq \tau_i \leq 10$ 接触力非负:$\lambda \geq 0$ (5)输出结果 最后调用 prob = cp.Problem(objective, constraints) prob.solve() 求解得出结果: Optimal joint accelerations: [ 0.50615867 -1.88900987] Optimal torques: [-1.12977187 -0.94450493] Optimal contact force λ: [1.29515155e-23] End-effector acc achieved: [ 9.77823461e-01 -5.00938335e-17] Desired end-effector acc : [1. 0.] QP 解出来的就是 最优关节加速度:$\ddot{q}$ 最优关节力矩:$\tau$ 接触反力:$\lambda$ 实际末端加速度:$\ddot{x} = J_{\text{task}} \ddot{q}$ 并与期望值 $\ddot{x}_{des}$ 对比。 上面的代码中只截取了关键部分,下面是绘制图像的效果如下,可以直观体会看看: WBC与MPC 上一篇文章我们分析了MPC,那么MPC与WBC什么关系了? MPC在WBC之上,MPC作为决策层做"未来几步的规划",比如预测未来1S内,质心应该怎么移动,脚该放哪里,其输出的是期望的任务轨迹。WBC是在下层,拿到MPC给的任务(目标加速度/姿态/接触序列)在动力学和接触约束下,求解QP得到当下这一瞬间的关节力矩$\tau$。简而言之MPC决定“机器人未来要往哪走”,WBC决定“当前每个关节该怎么出力”。 举个例子:双足机器人走路 MPC:优化未来 1 秒的 质心轨迹、摆腿位置、支撑相切换。输出:期望的 $\ddot{x}_{des}$(质心加速度)、脚的落点计划。 WBC:把这些期望当作任务输入,解 QP ,输出关节力矩 $\tau$,同时计算接触力 $\lambda$,保证机器人在每一步不摔倒。
- 
					
						  机器人控制利器:MPC入门与实践解析背景 MPC(Model Predictive Control)模型预测控制,是一种控制方法,广泛应用在机器人、无人驾驶、过程控制、能源系统等领域。它的核心思想用一句话来总结:利用系统模型预测未来,并通过优化选择当前最优的控制输入。 如上图是一个MPC应用框图,先来看看框图中的各个变量。 r(t):参考输入,系统希望达到的目标(设定值/reference signal),例如机器人要达到的位置、温度控制的目标值、车辆期望的速度等等。 e:误差e(t) = y(t) – r(t),用来计算实际的输出期望值的差距。 μ(t):控制输入,由MPC控制器计算出来,施加在系统上的控制量。 y(t):系统实际输出,比如位置、速度、温度等。 系统的目标是由MPC控制器计算输出一个μ(t)控制量,然后作用到系统,让系统的输出能够达到期望值。其中有一个反馈回路,形成一个闭环系统,实际的输出y(t)会反馈给比较器,与目标值对比,当未达到目标时,系统自动修正直到达到目标。 原理 系统模型 为了简单,我们先令系统的实际输出$y=x$,下面用数学建模来描述系统状态模型是 $$ x_{k+1} = A x_k + B u_k $$ 参数$x_k$:系统在时刻$k$的状态,比如汽车的位置和速度[p,v]。 参数$u_k$:系统在时刻$k$的控制输入,比如油门大小、方向盘角度、电机电压等。 A:状态的转移矩阵,决定了系统状态的演化方式。 B:输入矩阵,描述了控制输入如何影响状态的变换。 参数$x_{k+1}$:系统在下一时刻$k+1$的状态。 A和B是两个矩阵,A决定系统 在没有控制输入时,状态如何随时间演化。B决定控制输入$u_k$如何影响的状态。 上面的模型是线性模型,本文以此来进行分析。但是在实际场景中根据实际问题进行建模,模型可能是非线性的如下,这里就过多解释。 $$ x_{k+1} = f(x_k,u_k) $$ 预测未来 MPC原理就是从当前状态$x+k$出发,用模型递推,预测出未来的N步(如果看过之前关于ACT原理,其实MPC有很多类似之处): $$ x_{k+1},x_{k+2},x_{k+3},......,x_{k+N} $$ 目标函数 工程中我们最主要的目标是要求出控制量$u_t$以便让系统最终调整到我们预期的状态。 那如何来设计这个系统了? 前面我们建模了$k$时刻的状态$x_t$,那么这个状态+动作需要满足什么样的数学关系了? 与深度学习类似,我们要对状态+动作的数学关系求一个最小值的表达式。看公式: $$ J = \sum_{i=0}^{N-1} \Big[ (x_{k+i} - x_{k+i}^{\text{ref}})^T Q (x_{k+i} - x_{k+i}^{\text{ref}}) + u_{k+i}^T R u_{k+i} \Big] $$ 上面的公式可以分为两部分,前面部分代表的是k时刻输出状态与目标状态误差值,后面部分是控制的动作。也就是说我们最核心的是要误差越小越好,同时控制的动作不要太大。误差小好理解,动作变化不要太大是因为要保证控制动作要平滑。两部分都是求平方$e^2$和$u^2$放大比例(跟深度学习中的损失差不多),同时各自有一个权重Q和R,这两个参数是权重参数可调,Q用来平衡快速到目标,R用来调节平衡动作要平稳。 公式中$x$和$J$可以说是已知的,那么就可以求出$u$控制量了。 对于求解$u$,需要根据实际的模型,如果模型是线性模型+二次目标函数这样就比较简单使用二次规划QP可以快速解析或数值求解。如果是非线性模型,那么就变成非线性规划NLP,需要迭代求解器(如SQP、IPOPT)。当然如果是简单还可以使用穷举控制序列$U$,直接算$J$取最小。 约束条件 约束条件是在求解代价函数最小值过程中,显示的对控制量$u$、$x_k$做约束。这样的目的是比如对于车来说油门不能超过100%,速度不能超速,机械臂必能超过关节限位等。 因此在求救$J$时,可以添加约束条件,如下: $$ u_{\min} \leq u_k \leq u_{\max};x_{\min} \leq x_k \leq x_{\max} $$ 第一个是 控制输入约束(油门、方向盘角度不能无限大)。第二个是 状态约束(位置、速度等不能超过物理限制)。 工作流程 MPC控制系统中,其工作流程最核心的滚动时域控制,其核心点是每次预测出N步,但是并不是一下就全部执行完N步,因为预测也是有偏差同时在执行过程中会有变化。而是每次预测N步,但是只取第一步进行执行,然后根据新的输出结果重新预测下一个N步。这个其实跟ACT的时间集成有点类似,也就是在每个时间刻都会预测N步,而MPC是取第一步执行,而ACT是取k时刻和此前时刻的加权平均。 下面来看看具体的执行流程: 当前时刻$K$:系统处于某个状态如蓝线上面的红点,控制器采集到当前状态作为优化的起点。 预测未来N步:橙色窗口覆盖未来N步的时间区;红色虚线显示预测的未来轨迹,如果采用这一串控制动作$U=[u_k,u_{k+1}...]$系统怎么走。 优化并得到最优控制序列:在预测窗口里,MPC通过解$J$(误差+控制代价)最小值得到结果一整串最优控制动作序列$U$。 只执行一步:红色箭头指向当前窗口里的第一个控制输入$u_k$,下方蓝色阶梯控制曲线更新,系统真是状态推进到下一个时刻;上方状态曲线的红点更新到新的位置。 丢弃其余动作:窗口里的控制动作$(u_{k+1},u_{k+2}...)$丢弃,因为下一时刻会重新优化得到新的控制动作。 窗口前移,重复循环:橙色预测窗口右移一格$(k->k+1->k+2)$,控制器基于新状态重新预测、重新优化,再次执行第一步,丢弃其余周而复始直到达到目标。 因此整个过程核心就是MPC的滚动时域机制:预测未来——>优化整串动作——>执行第一步——>丢弃其余——>窗口前移——>重新计算。 示例程序 下面是一个简单的示例加深对MPC的认识。场景是一个一维的小车: 状态$x=[位置,速度]$,控制输入$u=加速度$。 小车从0开始,目标位置在10米,并希望最后速度接近0. 用MPC做控制,预测N步,计算代价,找到最优控制序列,但只执行第一个动作然后滚动。 求解$u$这里直接使用的是穷举法。 import numpy as np import matplotlib.pyplot as plt import matplotlib.animation as animation # --- 系统模型 (离散时间) --- dt = 0.2 # 状态空间模型: x_{k+1} = A x_k + B u_k # x = [位置, 速度],u = 加速度 A = np.array([[1, dt], [0, 1]]) B = np.array([[0.5*dt**2], [dt]]) # 初始状态: 位置=0, 速度=0 x = np.array([0.0, 0.0]) # 目标状态: 位置=10, 速度=0 (停在10米处) target = np.array([10.0, 0.0]) # --- MPC 参数 --- N = 5 # 预测时域长度 (未来看5步) U_candidates = [-1, 0, 1] # 控制输入候选集合 (加速度: -1=刹车, 0=不动, 1=加速) def simulate(x0, u_seq): """ 给定初始状态 x0 和一段控制序列 u_seq, 用系统模型递推未来轨迹,并计算代价 J """ x = x0.copy() cost = 0.0 traj = [x.copy()] # 保存预测轨迹 (用于可视化) for u in u_seq: # 状态更新 (预测未来) x = A @ x + B.flatten()*u traj.append(x.copy()) # 代价函数 J = 误差项 + 控制代价 err = x - target cost += err[0]**2 + 0.3*err[1]**2 + 0.1*(u**2) # 位置误差^2 + 速度误差^2(权重0.3) + 控制输入^2(权重0.1) return cost, np.array(traj) # --- MPC 主循环 (滚动时域控制) --- history_x = [] # 真实执行的状态轨迹 history_u = [] # 实际执行的控制输入 (只取最优序列的第一步) pred_trajs = [] # 每次优化得到的预测轨迹 (整串) for step in range(60): best_cost = 1e9 best_seq = None best_traj = None # 穷举所有可能的控制序列 U = [u_k, u_{k+1}, ..., u_{k+N-1}] for U in np.array(np.meshgrid(*[U_candidates]*N)).T.reshape(-1,N): cost, traj = simulate(x, U) if cost < best_cost: best_cost = cost best_seq = U # 当前最优控制序列 best_traj = traj # 当前最优预测轨迹 # 保存数据 (真实轨迹、控制输入、预测轨迹) history_x.append(x.copy()) history_u.append(best_seq[0]) # 只执行第一步 u_k* pred_trajs.append(best_traj) # 保存整条预测轨迹用于画红虚线 # 执行第一步 (滚动时域控制的核心:只执行u_k) u = best_seq[0] x = A @ x + B.flatten()*u # 收敛条件 (位置接近10, 速度≈0) if abs(x[0]-target[0]) < 0.1 and abs(x[1]-target[1]) < 0.1: history_x.append(x.copy()) history_u.append(0) # 终端时控制输入设为0 break history_x = np.array(history_x) # --- 动态可视化 --- fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(8, 6)) # 上图:位置随时间 ax1.axhline(target[0], color="green", linestyle="--", label="Target position") (line_real,) = ax1.plot([], [], "bo-", label="Real trajectory") # 蓝点=真实轨迹 (line_pred,) = ax1.plot([], [], "r--", label="Predicted trajectory") # 红虚线=预测轨迹 (point_exec,) = ax1.plot([], [], "ro", markersize=8, label="Execute point") # 红点=执行点 ax1.set_xlim(0, len(history_x)+N) ax1.set_ylim(0, target[0]+5) ax1.set_ylabel("Position") ax1.legend() # 下图:控制输入 (line_u,) = ax2.step([], [], where="post", label="Control input u") ax2.set_xlim(0, len(history_u)) ax2.set_ylim(min(U_candidates)-0.5, max(U_candidates)+0.5) ax2.set_xlabel("Time step") ax2.set_ylabel("u") ax2.legend() # --- 动画更新函数 --- def update(frame): # 蓝线:实际轨迹 line_real.set_data(np.arange(frame+1), history_x[:frame+1,0]) # 红虚线:预测轨迹 (窗口内) pred = pred_trajs[frame] line_pred.set_data(np.arange(frame, frame+len(pred)), pred[:,0]) # 红点:当前执行点 point_exec.set_data([frame], [history_x[frame,0]]) # 阶梯:控制输入 line_u.set_data(np.arange(frame+1), history_u[:frame+1]) return line_real, line_pred, point_exec, line_u # 动画循环:frames=len(pred_trajs),表示每个MPC优化时刻 ani = animation.FuncAnimation(fig, update, frames=len(pred_trajs), interval=800, blit=True, repeat=False) plt.show()
- 
					
						  语音生成模型:CosyVoice入门实践是什么 CosyVoice是阿里开源的一款文字转语音的开源模型,可以支持音色复刻。 怎么用 环境安装 (1)代码下载 git clone --recursive https://github.com/FunAudioLLM/CosyVoice.git cd CosyVoice git submodule update --init --recursive 因为CosyVoice仓库中还依赖了第三方的Matcha-TTS,所以克隆本地仓库后,还需要下载第三方的。 (2)创建conda环境 conda create -n cosyvoice -y python=3.10 conda activate cosyvoice pip install -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple/ --trusted-host=mirrors.aliyun.com 创建conda环境并安装依赖。如果没有安装cuda工具的话,还需要执行下面命令安装。 sudo apt install nvidia-cuda-toolkit (3)下载预训练模型 sudo apt update && sudo apt install git-lfs -y mkdir -p pretrained_models git clone https://www.modelscope.cn/iic/CosyVoice2-0.5B.git pretrained_models/CosyVoice2-0.5B git clone https://www.modelscope.cn/iic/CosyVoice-300M.git pretrained_models/CosyVoice-300M git clone https://www.modelscope.cn/iic/CosyVoice-300M-SFT.git pretrained_models/CosyVoice-300M-SFT git clone https://www.modelscope.cn/iic/CosyVoice-300M-Instruct.git pretrained_models/CosyVoice-300M-Instruct git clone https://www.modelscope.cn/iic/CosyVoice-ttsfrd.git pretrained_models/CosyVoice-ttsfrd 上面的模型文件选择一个即可,需要注意的是因为模型比较大,所以要在本地安装git-lfs才能下载大文件。 测试 python3 webui.py --port 50000 --model_dir pretrained_models/CosyVoice-300M 执行上面命令后,就可以登录网页输入http://127.0.0.1:50000/进行测试了。
- 
					
						  语音识别模型:SenseVoice入门实践是什么 SenseVoice是多语言识别的模型,支持语音转文字(ASR, Automatic Speech Recognition,自动语音识别),语种识别(LID, Language Identification),语音情感识别(SER, Speech Emotion Recognition),音频事件检测 / 声学事件分类(AED/AEC, Audio Event Detection / Classification),逆文本正则化 / 标点 / 富文本转写等。 怎么用 配置环境 配置一个conda sensevoice环境并且激活。 conda create -n sensevoice python=3.10 conda activate sensevoice 拉取代码,安装依赖 git clone https://github.com/FunAudioLLM/SenseVoice.git cd SenseVoice/ pip install -r requirements.txt 示例测试 代码中有两个示例分别是demo1.py和demo2.py,执行后可以看看效果。 python demo1.py 执行后会自动从modelscope上下载模型和相关配置到本地。 下载到本地后有模型和相关的配置文件以及示例音频。 (sensevoice) laumy@ThinkBook-14-G7-IAH:~/.cache/modelscope/hub/models/iic$ tree . ├── SenseVoiceSmall │ ├── am.mvn │ ├── chn_jpn_yue_eng_ko_spectok.bpe.model │ ├── configuration.json │ ├── config.yaml │ ├── example │ │ ├── en.mp3 │ │ ├── ja.mp3 │ │ ├── ko.mp3 │ │ ├── yue.mp3 │ │ └── zh.mp3 │ ├── fig │ │ ├── aed_figure.png │ │ ├── asr_results.png │ │ ├── inference.png │ │ ├── sensevoice.png │ │ ├── ser_figure.png │ │ └── ser_table.png │ ├── model.pt │ ├── README.md │ └── tokens.json └── speech_fsmn_vad_zh-cn-16k-common-pytorch ├── am.mvn ├── configuration.json ├── config.yaml ├── example │ └── vad_example.wav ├── fig │ └── struct.png ├── model.pt └── README.md 7 directories, 25 files 最后的识别效果为 还可以测试webui版本 python webui.py 然后网址输入:http://127.0.0.1:7860, 注意不能开代理否则会启动失败。 接口调用 主要简要分析一下使用funASR调用示例。 (1)模型加载 model = AutoModel(model=[str], device=[str], ncpu=[int], output_dir=[str], batch_size=[int], hub=[str], **kwargs) model:模型仓库中的名称 device:推理的设备,如gpu ncpu:cpu并行线程。 output_dir:输出结果的输出路径 batch_size:解码时的批处理,样本个数 hub:从modelscope下载模型。如果为hf,从huggingface下载模型。 (2)推理 res = model.generate(input=[str], output_dir=[str]) input:要解码的输入可以是wav文件路径, 例如: asr_example.wav。 output_dir: 输出结果的输出路径。 实时识别 编写一个示例实时识别 #!/usr/bin/env python3 # -*- encoding: utf-8 -*- # Real-time microphone transcription (pure memory) using ALSA (pyalsaaudio) + numpy import argparse import os import signal import sys import time from pathlib import Path from typing import Optional import numpy as np def _safe_imports() -> None: try: import alsaaudio # noqa: F401 except Exception: print( "缺少 pyalsaaudio,请先安装:\n pip install pyalsaaudio\nUbuntu 可能需要:sudo apt-get install -y python3-alsaaudio 或 alsa-utils", file=sys.stderr, ) raise def _safe_import_model(): try: from funasr import AutoModel from funasr.utils.postprocess_utils import rich_transcription_postprocess except Exception: print( "导入 funasr 失败。如果仓库根目录存在本地 'funasr.py',请重命名(如 'funasr_demo.py')以避免遮蔽外部库。", file=sys.stderr, ) raise return AutoModel, rich_transcription_postprocess def int16_to_float32(audio_int16: np.ndarray) -> np.ndarray: if audio_int16.dtype != np.int16: audio_int16 = audio_int16.astype(np.int16, copy=False) return (audio_int16.astype(np.float32) / 32768.0).clip(-1.0, 1.0) def resample_to_16k(audio_f32: np.ndarray, orig_sr: int) -> tuple[np.ndarray, int]: if orig_sr == 16000: return audio_f32, 16000 backend = os.environ.get("SV_RESAMPLE", "poly").lower() # poly|librosa|linear # 优先使用更快的 poly(scipy),其次 librosa,最后线性插值兜底 if backend in ("poly", "auto"): try: from scipy.signal import resample_poly # type: ignore y = resample_poly(audio_f32, 16000, orig_sr) return y.astype(np.float32, copy=False), 16000 except Exception: pass if backend in ("librosa", "auto"): try: import librosa # type: ignore y = librosa.resample(audio_f32, orig_sr=orig_sr, target_sr=16000) return y.astype(np.float32, copy=False), 16000 except Exception: pass # 线性插值兜底(最慢但零依赖) x = np.arange(audio_f32.size, dtype=np.float32) new_n = int(round(audio_f32.size * 16000.0 / float(orig_sr))) if new_n <= 1: return audio_f32, orig_sr new_x = np.linspace(0.0, x[-1] if x.size > 0 else 0.0, new_n, dtype=np.float32) y = np.interp(new_x, x, audio_f32).astype(np.float32) return y, 16000 def _build_cfg_from_env() -> dict: cfg = {} # runtime knobs try: cfg["min_rms"] = float(os.environ.get("SV_MIN_RMS", "0.003")) except Exception: cfg["min_rms"] = 0.003 try: cfg["overlap_sec"] = float(os.environ.get("SV_OVERLAP_SEC", "0.3")) except Exception: cfg["overlap_sec"] = 0.3 cfg["merge_vad"] = os.environ.get("SV_MERGE_VAD", "0") == "1" try: cfg["merge_len"] = float(os.environ.get("SV_MERGE_LEN", "2.0")) except Exception: cfg["merge_len"] = 2.0 cfg["debug"] = os.environ.get("SV_DEBUG") == "1" cfg["resample_backend"] = os.environ.get("SV_RESAMPLE", "poly").lower() cfg["dump_wav_path"] = os.environ.get("SV_DUMP_WAV") return cfg def _select_strong_channel(frame_any: np.ndarray, channels: int, debug: bool) -> np.ndarray: if channels <= 1: return frame_any frame_mat = frame_any.reshape(-1, channels) ch_rms = np.sqrt(np.mean(frame_mat.astype(np.float32) ** 2, axis=0)) sel = int(np.argmax(ch_rms)) if debug: print(f"[debug] multi-channel rms={ch_rms.tolist()}, select ch={sel}", flush=True) return frame_mat[:, sel] def _ensure_pcm_open(alsa_audio, device: str | None, samplerate: int, channels: int, period_frames: int) -> tuple[object, int, str, str]: tried: list[tuple[str, str, str]] = [] for dev in [device or "default", f"plughw:{(device or 'default').split(':')[-1]}" if (device and not device.startswith('plughw')) else None]: if not dev: continue for fmt in (alsa_audio.PCM_FORMAT_S16_LE, alsa_audio.PCM_FORMAT_S32_LE): try: p = alsa_audio.PCM(type=alsa_audio.PCM_CAPTURE, mode=alsa_audio.PCM_NORMAL, device=dev) p.setchannels(channels) p.setrate(samplerate) p.setformat(fmt) p.setperiodsize(period_frames) dtype = np.int16 if fmt == alsa_audio.PCM_FORMAT_S16_LE else np.int32 sample_bytes = 2 if dtype == np.int16 else 4 fmt_name = "S16_LE" if dtype == np.int16 else "S32_LE" return p, sample_bytes, fmt_name, dev except Exception as e: # noqa: BLE001 tried.append((dev, "S16_LE" if fmt == alsa_audio.PCM_FORMAT_S16_LE else "S32_LE", str(e))) continue raise RuntimeError(f"打开 ALSA 设备失败,尝试: {tried}") def _to_int16_mono(raw_bytes: bytes, sample_bytes: int, channels: int, debug: bool) -> np.ndarray: if sample_bytes == 2: frame_any = np.frombuffer(raw_bytes, dtype=np.int16) else: frame_any = np.frombuffer(raw_bytes, dtype=np.int32) frame_any = (frame_any.astype(np.int32) >> 16).astype(np.int16) if channels > 1: frame_any = _select_strong_channel(frame_any, channels, debug) return frame_any.astype(np.int16, copy=False) def _should_infer(audio_f32: np.ndarray, min_rms: float, debug: bool, frames_count: int) -> bool: if audio_f32.size == 0: return False rms = float(np.sqrt(np.mean(np.square(audio_f32)))) if rms < min_rms: if debug: print(f"[debug] frames={frames_count}, rms={rms:.4f} < min_rms={min_rms:.4f}, skip", flush=True) return False return True def _infer_block(model, arr: np.ndarray, sr: int, cfg: dict, language: str, running_cache: dict) -> str: prefer = os.environ.get("SV_INPUT_FORMAT", "f32") # candidate arrays f32 = arr.astype(np.float32, copy=False) i16 = (np.clip(f32, -1.0, 1.0) * 32767.0).astype(np.int16) candidates: list[tuple[np.ndarray, bool]] if prefer == "i16": candidates = [(i16, False), (f32, False), (i16[None, :], True), (f32[None, :], True)] elif prefer == "f32_2d": candidates = [(f32[None, :], True), (f32, False), (i16[None, :], True), (i16, False)] elif prefer == "i16_2d": candidates = [(i16[None, :], True), (i16, False), (f32[None, :], True), (f32, False)] else: candidates = [(f32, False), (i16, False), (f32[None, :], True), (i16[None, :], True)] res = None for cand, _is2d in candidates: try: try: res = model.generate( input=cand, input_fs=sr, cache=running_cache, language=language, use_itn=True, batch_size_s=60, merge_vad=cfg["merge_vad"], merge_length_s=cfg["merge_len"], ) except TypeError: res = model.generate( input=cand, fs=sr, cache=running_cache, language=language, use_itn=True, batch_size_s=60, merge_vad=cfg["merge_vad"], merge_length_s=cfg["merge_len"], ) break except Exception: continue if not res: return "" out = res[0].get("text", "") return out def run_in_memory( model_dir: str, device: str, language: str, samplerate: int, channels: int, block_seconds: float, alsa_device: Optional[str], ) -> None: _safe_imports() AutoModel, rich_post = _safe_import_model() # 关闭底层 tqdm 进度条等多余输出 os.environ.setdefault("TQDM_DISABLE", "1") print("加载模型...", flush=True) model = AutoModel( model=model_dir, trust_remote_code=True, remote_code="./model.py", vad_model="fsmn-vad", vad_kwargs={"max_single_segment_time": 30000}, device=device, disable_update=True, ) import alsaaudio # 更小的 periodsize,降低 I/O 错误概率;推理聚合由 block_seconds 控制 period_frames = max(256, int(0.1 * samplerate)) cfg = _build_cfg_from_env() # 打开设备,尝试 S16 -> S32;必要时切换 plughw 以启用自动重采样/重格式化 pcm, sample_bytes, fmt_name, open_dev = _ensure_pcm_open(alsaaudio, alsa_device, samplerate, channels, period_frames) if cfg["debug"]: print(f"[debug] opened ALSA device {open_dev} fmt={fmt_name} period={period_frames}", flush=True) print( f"开始录音(纯内存采集),设备={alsa_device or 'default'}, sr={samplerate}, ch={channels}, period={period_frames},按 Ctrl+C 停止。\n", flush=True, ) running_cache: dict = {} printed_text = "" # 聚合到这一帧阈值后才调用一次模型,降低调用频率与输出 min_frames = max(1, int(block_seconds * samplerate)) # 重叠帧数,减少句首/句尾丢失 overlap_frames = int(max(0, cfg["overlap_sec"]) * samplerate) block_buf: list[np.ndarray] = [] buf_frames = 0 stop_flag = False def handle_sigint(signum, frame): # noqa: ANN001 nonlocal stop_flag stop_flag = True signal.signal(signal.SIGINT, handle_sigint) # 连续读取 PCM,并聚合达到阈值后执行一次增量推理 while not stop_flag: try: length, data = pcm.read() except Exception as e: # noqa: BLE001 # 读失败时尝试重建(常见于某些 DMIC Raw) if cfg["debug"]: print(f"[debug] pcm.read error: {e}, reopen stream", flush=True) # 尝试使用 plughw 以获得自动格式/采样率适配 dev = alsa_device or "default" if not dev.startswith("plughw") and dev != "default": dev = f"plughw:{dev.split(':')[-1]}" pcm, sample_bytes, fmt_name, open_dev = _ensure_pcm_open(alsaaudio, dev, samplerate, channels, period_frames) length, data = pcm.read() if length <= 0: time.sleep(0.005) continue # 根据实际格式解析 bytes pcm_int16 = _to_int16_mono(data, sample_bytes, channels, cfg["debug"]) block_buf.append(pcm_int16) buf_frames += pcm_int16.size if buf_frames < min_frames: continue # 达到阈值,拼接并转换为 float32 [-1, 1] joined = np.concatenate(block_buf, axis=0) # 为下一个块保留尾部重叠 if overlap_frames > 0 and joined.size > overlap_frames: tail = joined[-overlap_frames:] block_buf = [tail.astype(np.int16, copy=False)] buf_frames = tail.size else: block_buf.clear() buf_frames = 0 audio_block_f32 = int16_to_float32(joined) # 若采样率不是 16k,重采样到 16k audio_block_f32, eff_sr = resample_to_16k(audio_block_f32, samplerate) if not _should_infer(audio_block_f32, cfg["min_rms"], cfg["debug"], joined.size): continue if cfg["debug"]: rms = float(np.sqrt(np.mean(np.square(audio_block_f32)))) print(f"[debug] frames={joined.size}, eff_sr={eff_sr}, rms={rms:.4f}, infer", flush=True) # 可选:调试时将当前块写到一个覆盖的临时 WAV(重采样后,单声道16k),方便用 aplay 检查采音 if cfg["dump_wav_path"]: import wave, tempfile dump_path = cfg["dump_wav_path"] or str( Path(tempfile.gettempdir()) / "sv_debug_block.wav" ) with wave.open(dump_path, "wb") as wf: wf.setnchannels(1) wf.setsampwidth(2) wf.setframerate(eff_sr) dump_i16 = (np.clip(audio_block_f32, -1.0, 1.0) * 32767.0).astype(np.int16) wf.writeframes(dump_i16.tobytes()) # 执行推理 res_text = _infer_block(model, audio_block_f32, eff_sr, cfg, language, running_cache) if not res_text: continue text = rich_post(res_text) if text.startswith(printed_text): new_part = text[len(printed_text) :] else: new_part = text if new_part: print(new_part, end="", flush=True) printed_text += new_part # 停止前做一次 flush:如果还有残留缓冲,强制推理,避免句尾丢失 if block_buf: try: joined = np.concatenate(block_buf, axis=0) audio_block_f32 = int16_to_float32(joined) audio_block_f32, eff_sr = resample_to_16k(audio_block_f32, samplerate) text = _infer_block(model, audio_block_f32, eff_sr, cfg, language, running_cache) text = rich_post(text) if text: print(text[len(printed_text):], end="", flush=True) except Exception: pass print("\n已停止。") def main() -> None: parser = argparse.ArgumentParser(description="Real-time mic transcription (in-memory ALSA)") parser.add_argument("--model", default="iic/SenseVoiceSmall", help="模型仓库或本地路径") parser.add_argument("--device", default="cuda:0", help="设备,如 cuda:0 或 cpu") parser.add_argument("--language", default="auto", help="语言代码或 'auto'") parser.add_argument("--samplerate", type=int, default=16000, help="采样率") parser.add_argument("--channels", type=int, default=1, help="通道数") parser.add_argument("--block-seconds", type=float, default=1.0, help="每次推理块时长(秒)") parser.add_argument("--min-rms", type=float, default=0.003, help="触发推理的最小能量阈值(0-1)") parser.add_argument("--overlap-seconds", type=float, default=0.3, help="块间重叠时长(秒),减少句首/句尾丢失") parser.add_argument("--merge-vad", action="store_true", help="启用 VAD 合并(默认关闭以获得更快输出)") parser.add_argument("--merge-length", type=float, default=2.0, help="VAD 合并的最大分段时长(秒)") parser.add_argument("--debug", action="store_true", help="打印调试信息(RMS、帧数等)") parser.add_argument("--alsa-device", default=None, help="ALSA 设备名(如 hw:0,0 或 default)") args = parser.parse_args() # 将 CLI 设置传入环境变量,供运行体内读取 if args.debug: os.environ["SV_DEBUG"] = "1" os.environ["SV_MIN_RMS"] = str(args.min_rms) os.environ["SV_MERGE_VAD"] = "1" if args.merge_vad else "0" os.environ["SV_MERGE_LEN"] = str(args.merge_length) os.environ["SV_OVERLAP_SEC"] = str(args.overlap_seconds) run_in_memory( model_dir=args.model, device=args.device, language=args.language, samplerate=args.samplerate, channels=args.channels, block_seconds=args.block_seconds, alsa_device=args.alsa_device, ) if __name__ == "__main__": main() 执行前先安装一下模块组件 pip install pyalsaaudio 接着运行,就可以实时说话转换为文字了。 python demo_realtime_memory.py --alsa-device plughw:0,6 --samplerate 48000 --channels 2 --block-seconds 1.0 --language zh --device cuda:0 参考:https://github.com/FunAudioLLM/SenseVoice
- 
					
						  Isaac Sim 快速入门:三种工作流程示例简介 如果是NVIDIA Isaac Sim的新用户,可以按照本文的两个示例来体验Isaac Sim。本文主要提供Isaac Sim基础使用教程、机器人基础教程。 在快速入门教程中,所有可通过 GUI 执行的操作同样也能用 Python 实现。您可以在 GUI 操作与 Python 脚本之间自由切换。您在 GUI 中创建的所有内容都能保存为 USD 文件的一部分。 例如,您可以通过图形界面创建世界,并为机器人添加所需动作。随后将整个 USD 文件导入独立的 Python 脚本中,根据需求系统性地修改属性。 基础使用教程 本教程涵盖 Isaac Sim 的基础操作,包括界面导航、场景对象添加、查看对象基本属性以及运行模拟等内容。 通过本教程,您将从空白场景开始,根据三种不同工作流程的选择,最终实现机器人运动控制。提供三种不同工作流程的目的是展示 Isaac Sim 可根据需求以多种方式灵活使用。 可以查看两种工作流中的脚本以了解它们的差异。通过对比分析,有助于掌握如何执行完全相同的任务: 扩展脚本可在Window > Examples > Robotics Examples中找到,然后单击浏览器右上角的**Open Script **按钮。 独立脚本位于 \<isaac-sim-root-dir>/standalone_examples/tutorials/ 文件夹内。 可以通过编辑扩展示例中的任意脚本来体验"hot-reloading"功能。保存文件后,无需关闭模拟器即可立即看到变更生效。 在官方教程中有3个标签页,三个标签页执行相同操作并达成相同结果。 GUI: 图形用户界面 Extensions:扩展功能 Standalone Python:独立Python环境 GUI方式 步骤1:启动Isaac Sim linux:cd ~/isaacsim && ./isaac-sim.selector.sh windows:双击isaac-sim.selector.bat 模拟器完全加载后,创建新场景: 从顶部菜单栏点击File > New。首次启动 Isaac Sim 时,可能需要 5-10 分钟完成初始化。 步骤2: 添加地平面 为场景添加地平面:从顶部菜单栏点击Create > Physics > Ground Plane。 步骤3:添加光源 可以为场景添加光源以照亮其中的物体。如果场景中有光源但没有物体反射光线,场景仍会显得昏暗。 在顶部菜单栏中,点击Create > Lights > Distant Light。 步骤4:添加视觉立方体 "视觉"立方体是指没有附加物理属性的立方体,例如没有质量、没有碰撞体积。这种立方体不会受重力影响下落,也不会与其他物体发生碰撞。 从顶部菜单栏中,依次点击Create > Shape > Cube. 在用户界面最左侧找到箭头图标并点击Play。运行模拟时立方体不会有任何动作。 步骤5:移动、旋转与缩放立方体 使用左侧工具栏上的各种操控工具来操作立方体。 按下"W"键或点击移动工具即可拖拽移动立方体。通过点击箭头并拖拽可单轴移动,点击彩色方块并拖拽可双轴移动,点击工具中心的圆点并拖拽则可三轴自由移动。 按下“E”键或点击旋转控制器来旋转立方体。 按下“R”键或点击缩放控制器来调整立方体大小。点击箭头并拖动可单维度缩放,点击彩色方块并拖动可双维度缩放,点击控制器中心的圆圈并拖动则可实现三维同步缩放。 按下“esc”键取消选中立方体。 对于“移动”和“旋转”操作,可选择基于局部坐标系或世界坐标系进行操作。长按控制器即可查看选项。 可以通过立方体的Property属性面板进行更精确的修改,只需在对应输入框中输入具体数值即可。点击输入框旁的蓝色方块可将数值重置为默认值。 步骤6:添加物理与碰撞属性 常见的物理属性包括质量和惯性矩阵,这些属性使物体能够在重力作用下下落。碰撞属性则决定了物体能否与其他物体发生碰撞。 物理和碰撞属性可以分别添加,因此你可以创建一个能与其他物体碰撞但不受重力影响的物体,或是受重力影响但不会与其他物体碰撞的物体。但在多数情况下,这两个属性会同时添加。 为立方体添加物理和碰撞属性: 在场景树中找到对象("/World/Cube")并高亮显示它。 从工作区右下角的Property属性面板中,点击"Add"按钮并在下拉菜单中选择Physics。这将显示可添加到对象的一系列属性选项。 选择Rigid Body with Colliders Preset“带碰撞器的刚体预设”可为对象同时添加物理和碰撞网格。 按下播放Play按钮,观察立方体在重力作用下坠落并与地平面发生碰撞。 教程结束,记得保存你的工作。 扩展功能方式 通过一个名为"脚本编辑器"的现有扩展模块来演示扩展工作流的特性。脚本编辑器允许用户通过 Python 与场景进行交互。主要使用与独立 Python 工作流相同的 Python API。当我们开始与模拟时间轴交互时,特别是下一个教程中,这两种工作流的区别将变得清晰。 步骤1:启动 启动一个新的 Isaac Sim 实例,转到顶部菜单栏并点击Window > Script Editor。 步骤2:添加地平面 要通过交互式 Python 添加地平面,请将以下代码片段复制粘贴到脚本编辑器中,然后点击底部的运行Run按钮执行。 from isaacsim.core.api.objects.ground_plane import GroundPlane GroundPlane(prim_path="/World/GroundPlane", z_position=0) 步骤3:添加光源 可以为场景添加光源以照亮其中的物体。如果场景中有光源但没有物体反射光线,场景仍会显得昏暗。 在脚本编辑器中新建一个标签页(Tab > Add Tab)。 在脚本编辑器中复制粘贴以下代码片段并运行,即可添加光源。 import omni.usd from pxr import Sdf, UsdLux stage = omni.usd.get_context().get_stage() distantLight = UsdLux.DistantLight.Define(stage, Sdf.Path("/DistantLight")) distantLight.CreateIntensityAttr(300) 步骤4:添加视觉立方体 在脚本编辑器中新建一个标签页 (Tab > Add Tab)。 在脚本编辑器中复制粘贴以下代码片段并运行,即可添加两个立方体。我们将保留其中一个作为纯视觉对象,同时为另一个添加物理和碰撞属性以便对比。 import numpy as np from isaacsim.core.api.objects import VisualCuboid VisualCuboid( prim_path="/visual_cube", name="visual_cube", position=np.array([0, 0.5, 0.5]), size=0.3, color=np.array([255, 255, 0]), ) VisualCuboid( prim_path="/test_cube", name="test_cube", position=np.array([0, -0.5, 0.5]), size=0.3, color=np.array([0, 255, 255]), ) Isaac Sim 核心 API 是对原生 USD 和物理引擎 API 的封装。您可以使用原生 USD API 添加一个视觉立方体(不含物理和颜色属性)。请注意原生 USD API 更为冗长,但能提供对每个属性的更精细控制。 from pxr import UsdPhysics, PhysxSchema, Gf, PhysicsSchemaTools, UsdGeom import omni # USD api for getting the stage stage = omni.usd.get_context().get_stage() # Adding a Cube path = "/visual_cube_usd" cubeGeom = UsdGeom.Cube.Define(stage, path) cubePrim = stage.GetPrimAtPath(path) size = 0.5 offset = Gf.Vec3f(1.5,-0.2,1.0) cubeGeom.CreateSizeAttr(size) if not cubePrim.HasAttribute("xformOp:translate"): UsdGeom.Xformable(cubePrim).AddTranslateOp().Set(offset) else: cubePrim.GetAttribute("xformOp:translate").Set(offset) 步骤5:添加物理与碰撞属性 在 Isaac Sim 核心 API 中,我们为常用对象编写了封装器,这些封装器附带所有物理和碰撞属性。您可以通过以下代码片段添加一个具有物理和碰撞属性的立方体。 import numpy as np from isaacsim.core.api.objects import DynamicCuboid DynamicCuboid( prim_path="/dynamic_cube", name="dynamic_cube", position=np.array([0, -1.0, 1.0]), scale=np.array([0.6, 0.5, 0.2]), size=1.0, color=np.array([255, 0, 0]), ) 另外,如果想修改现有对象使其具备物理和碰撞属性,可以使用以下代码片段。 from isaacsim.core.prims import RigidPrim RigidPrim("/test_cube") from isaacsim.core.prims import GeometryPrim prim = GeometryPrim("/test_cube") prim.apply_collision_apis() 点击播放Play按钮,观察立方体在重力作用下坠落并与地平面碰撞。 步骤6: 移动、旋转与缩放立方体 使用核心 API 移动物体: import numpy as np from isaacsim.core.prims import XFormPrim translate_offset = np.array([[1.5,1.2,1.0]]) orientation_offset = np.array([[0.7,0.7,0,1]]) # note this is in radians scale = np.array([[1,1.5,0.2]]) stage = omni.usd.get_context().get_stage() cube_in_coreapi = XFormPrim(prim_paths_expr="/test_cube") cube_in_coreapi.set_world_poses(translate_offset, orientation_offset) cube_in_coreapi.set_local_scales(scale) 使用原始 USD API 移动物体: from pxr import UsdGeom, Gf import omni.usd stage = omni.usd.get_context().get_stage() cube_prim = stage.GetPrimAtPath("/visual_cube_usd") translate_offset = Gf.Vec3f(1.5,-0.2,1.0) rotate_offset = Gf.Vec3f(90,-90,180) # note this is in degrees scale = Gf.Vec3f(1,1.5,0.2) # translation if not cube_prim.HasAttribute("xformOp:translate"): UsdGeom.Xformable(cube_prim).AddTranslateOp().Set(translate_offset) else: cube_prim.GetAttribute("xformOp:translate").Set(translate_offset) # rotation if not cube_prim.HasAttribute("xformOp:rotateXYZ"): # there are also "xformOp:orient" for quaternion rotation, as well as "xformOp:rotateX", "xformOp:rotateY", "xformOp:rotateZ" for individual axis rotation UsdGeom.Xformable(cube_prim).AddRotateXYZOp().Set(rotate_offset) else: cube_prim.GetAttribute("xformOp:rotateXYZ").Set(rotate_offset) # scale if not cube_prim.HasAttribute("xformOp:scale"): UsdGeom.Xformable(cube_prim).AddScaleOp().Set(scale) else: cube_prim.GetAttribute("xformOp:scale").Set(scale) 独立python环境方式 脚本位于standalone_examples/tutorials/getting_started.py,要运行该脚本,请打开终端,导航至 Isaac Sim 安装根目录,并执行以下命令: ./python.sh standalone_examples/tutorials/getting_started.py 机器人基础教程 本小节介绍如何将机器人添加到场景中、移动机器人以及检查机器人状态。在开始教程前,请确保已经完成了上一章节isaac sim基础使用教程。 GUI方式 步骤1:向场景中添加机器人 新建场景:通过File > New Stage.。 添加机器人:向场景添加机器人,从顶部菜单栏点击Create > Robots > Franka Emika Panda Arm。 步骤2:检查机器人 使用物理检查器查看机器人关节属性。 前往Tools > Physics > Physics Inspector.。右侧将打开一个窗口。 选择 Franka 进行检查。窗口默认会显示关节信息,例如上下限位及默认位置。 点击右上角的三横线图标可查看更多选项,例如关节刚度和阻尼系数。 可选)修改这些数值,观察舞台上机器人随参数变化的运动。修改成功后会出现绿色对勾标记。 点击绿色对勾按钮,将当前参数设为机器人新的默认值。 步骤3:控制机器人 基于图形界面的机器人控制器位于 Omniverse 可视化编程工具 OmniGraphs 中。OmniGraph 相关章节提供了更深入的教程指导。本教程将通过快捷工具生成控制图,然后在 OmniGraph 编辑器中查看该控制图。 通过菜单栏选择 Tools > Robotics > Omnigraph Controllers > Joint Position.来打开控制图生成器。。 在新弹出的关节位置控制器**Articulation Position Controller Inputs **输入窗口中,点击 Robot Prim 字段旁的添加"Add"按钮。 选择 Franka 作为目标对象。 点击确定生成图表。 要移动机器人的话按照下面步骤 在右上角的“舞台”选项卡中,选择Graph > Position_Controller.。 选择 JointCommandArray 节点。您可以通过在舞台树中选择该节点,或在图表编辑器中选择该节点来完成此操作。 在右下角的Property选项卡中,可以看到关节命令值。构造数组节点Construct Array Node下的输入Inputs项对应机器人上的关节,从基座关节开始。 点击+按住+拖动不同的数值字段,或输入不同的值,可以看到机械臂位置发生变化。 点击Play开始模拟。 要生成可视化的图标 打开图表编辑器窗口:**Window > Graph Editors > Action Graph. **。该编辑器窗口会在包含机器人的视口选项卡下方以新选项卡形式打开。 调出新打开的浏览器标签页。 点击位于图形编辑器窗口中央的Edit Action Graph选项。 从列表中选择唯一存在的图形。 选择一个数组并查看Stage和Property选项卡,以了解每个数组节点关联的值。 在图形中选择"关节控制器"Articulation Controller对象以查看其属性。 Extension方式 步骤1:向场景添加机器人 新建一个场景(File > New)。要将机器人添加到场景中,请将以下代码片段复制粘贴到脚本编辑器中并运行。 import carb from isaacsim.core.prims import Articulation from isaacsim.core.utils.stage import add_reference_to_stage from isaacsim.storage.native import get_assets_root_path import numpy as np assets_root_path = get_assets_root_path() if assets_root_path is None: carb.log_error("Could not find Isaac Sim assets folder") usd_path = assets_root_path + "/Isaac/Robots/FrankaRobotics/FrankaPanda/franka.usd" prim_path = "/World/Arm" add_reference_to_stage(usd_path=usd_path, prim_path=prim_path) arm_handle = Articulation(prim_paths_expr=prim_path, name="Arm") arm_handle.set_world_poses(positions=np.array([[0, -1, 0]])) 步骤2:检查机器人 Isaac Sim 核心 API 提供了许多函数调用来获取机器人相关信息。以下是查询关节数量与名称、各类关节属性以及关节状态的示例代码。 在脚本编辑器中新建标签页,复制粘贴以下代码片段。该操作需在上一步添加机器人完成后执行(此时 arm_handle 已创建)。运行代码前需先点击播放Play按钮,这些命令需在物理引擎运行状态下才能生效。 # Get the number of joints num_joints = arm_handle.num_joints print("Number of joints: ", num_joints) # Get joint names joint_names = arm_handle.joint_names print("Joint names: ", joint_names) # Get joint limits joint_limits = arm_handle.get_dof_limits() print("Joint limits: ", joint_limits) # Get joint positions joint_positions = arm_handle.get_joint_positions() print("Joint positions: ", joint_positions) 请注意,点击"运行"时仅会打印一次状态信息,即使模拟正在持续运行。如需查看最新状态,需反复点击"运行"按钮。若希望在每个物理步长都打印信息,需要将这些命令插入到每个物理步长都会执行的回调函数中。在章节"工作流程"中详细讲解时间步进机制。 要将命令插入物理回调中,请在脚本编辑器的单独标签页中运行以下代码片段。 import asyncio from isaacsim.core.api.simulation_context import SimulationContext async def test(): def print_state(dt): joint_positions = arm_handle.get_joint_positions() print("Joint positions: ", joint_positions) simulation_context = SimulationContext() await simulation_context.initialize_simulation_context_async() await simulation_context.reset_async() simulation_context.add_physics_callback("printing_state", print_state) asyncio.ensure_future(test()) 按下播放键启动模拟,然后运行该代码片段。您将看到终端在每个物理步骤打印出的信息。 如果不再需要每个物理步骤都打印信息,可以通过运行以下代码片段来移除物理回调。 simulation_context = SimulationContext() simulation_context.remove_physics_callback("printing_state") 步骤3:控制机器人 在 Isaac Sim 中有多种控制机器人的方式。最底层是直接发送关节指令来设置位置、速度和力矩。以下是通过关节层级的 Articulation API 控制机器人的示例。 在脚本编辑器中新建一个标签页,复制粘贴以下代码片段。该代码需在前述添加机器人步骤完成后运行(此时 arm_handle 已建立)。运行代码片段前请先点击播放键Play。这些指令需在物理引擎运行状态下生效。我们将提供两个位置供您切换。若您已将上述打印状态代码片段添加至每个物理步骤,当机器人移动时您应能看到打印的关节编号发生变化。 # Set joint position randomly arm_handle.set_joint_positions([[-1.5, 0.0, 0.0, -1.5, 0.0, 1.5, 0.5, 0.04, 0.04]]) # Set all joints to 0 arm_handle.set_joint_positions([[0.0, 0.0, 0.0 , 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]]) 与前述 get_joint_positions 功能类似,此处的 set_joint_positions 仅在您点击"运行"时执行一次。若您希望在每个物理步骤发送指令,则需将这些命令插入到每个物理步骤都会执行的物理回调函数中。 独立Python方式 脚本位于 standalone_examples/tutorials/getting_started_robot.py 。要运行该脚本,请打开终端,导航至 Isaac Sim 安装的根目录,并执行以下命令: ./python.sh standalone_examples/tutorials/getting_started_robot.py 到此就完成了基本的操作了,接下来就是一系列的教程可以参考:《机器人设置教程系列》。
- 
					
						  Isaac Sim v5.0.0:探索AI 机器人仿真平台什么是isaac sim NVIDIA Issac Sim是一款基于NVIDIA omniverse构建的参考应用应用程序,使开发人员能够在基于物理的虚拟环境开发、模拟和测试AI机器人。 设计 Isaac Sim提供了一系列工作流程,用于导入和调整最常见格式(包括Onshape、统一机器人描述格式URDF和MuJoco XML格式MJCF)设计的机械系统。这通过使用通用常见描述USD实现,该开源3D常见描述API具有高度可扩展性,是Isaac Sim核心的统一数据交换格式。 微调与训练 Isaac Sim的核心功能在与其仿真能力本身:这是一个基于GPU的高保真physX物理引擎,能够支持工业规模的传感器PTX渲染。该平台通过直接调用GPU,实现了对各类传感器(包括摄像头、激光雷达和接触式传感器)的仿真模拟。这种能力进而支持数字孪生技术的实现,让您的端到端流程在真实机器人启动前就能完成测试运行。Isaac Sim提供了一整套工具链:通过Replicator生成合成数据,利用Omnigraph编程仿真环境,调整PhysX参数以匹配实现物理特性,最终通过强化学习RL等多样化方法训练控制智能体。 部署 Isaac Sim预先配备了所有必要组建,不仅能将智能体部署到真实机器人,还能构建与这类系统完全集成的应用程序。Omniverse提供了应用基础设施API,包括图形界面和文件管理功能。该平台还提供了与ROS 2桥接API,实现真实机器人与仿真的直接通信,同时搭建NVIDIA Isaac ROS:一套高性能硬件加速的ROS 2工具包,专为打造自主机器人而设计。 快速入门 快速安装:一小时内完成安装并开始使用。 Isaac Sim基础教程:助您快速上手 NVIDIA Isaac 仿真平台 工作站安装:本地工作站安装指南。 容器安装:远程桌面服务器安装指南。 开发工具:用于调试和开发的工具与环境。 python脚本与教程:使用 NVIDIA Isaac Sim 核心 Python API 构建环境、机器人和任务的工具与教程。 图形用户界面参考:通过图形用户界面了解 NVIDIA Isaac Sim 中的机器人基础概念。 导入与导出功能:支持多种文件格式的机器人及资产导入导出功能。 机器人设置:Isaac Sim 提供的机器人修改工具集。 机器人设置教程系列:关于机器人配置工具及工作流程的系列教学。 机器人仿真:用于模拟机器人的控制器与运动生成工具。 ROS2:ROS2桥接与接口。 Isaac Lab:强化学习框架与克隆器API。 合成数据生成:用于生成合成书记的工具集与工作流程。 数字孪生::用于构建和操作数字孪生的工具,如仓库物流、Cortex 和地图绘制。 系统架构 Isaac Sim旨在支持新型机器人工具的创建,并增强现有工具的功能。该平台为为C++和Python提供了灵活的API,可根据需求以不同深度集成到项目中。平台目标并非与现有软件竞争,而是与之协调并提升其能力。为此,Isaac Sim的许多组建都是开源的,可自由独立使用。可以在Onshape中设计机器人,用Isaac Sim模拟传感器,并通过ROS或其他消息系统控制场景。同样,也可以完全基于isaac sim提供的ingt构建完整的独立应用程序。 Omniverse Kit Isaac Sim基于Omniverse Kit构建,这是一个用于开发原生Omniverse应用和微服务的工具包。Omniverse Kit通过一系列轻量级插件提供多样化功能。这些插件采样C语言接口开发以确保API持久兼容性,同时提供python解释器以便进行便捷的脚本编写和定制。 通过 Python API 可以为 Omniverse Kit 编写新扩展,或为 Omniverse 创建新体验。 开发工作流 Isaac Sim基于C++和Python构建,通常分别通过编译插件和绑定进行操作。这意味着该平台能够支持多种工作流程,拥有构建和交互利用Isaac Sim项目。Isaac Sim提供完整的OMniverse应用程序,拥有与机器人交互和仿真,虽然这是用户与平台互动的最常见方式,但绝非唯一途径。Isaac Sim还以VS code和Jupyter Notebook扩展形式提供直接python开发支持。此外Isaac Sim不仅限与同步操作,还能通过ROS2实现硬件在环运行,从而促进仿真到现实的迁移以及数字孪生应用。 USD格式 NVIDIA Isaac Sim采用USD通过常见描述文件格式呈现常见。Universal Scene Description(USD)是由皮克斯开发的一种易于扩展的开源 3D 场景描述文件格式,专为内容创作和不同工具间的交互而设计。凭借其强大功能和通用性,USD 不仅被视觉特效领域广泛采用,还应用于建筑、设计、机器人、制造等多个学科领域。 安装指南 Isaac Sim支持windows和Linux系统安装。可通过容器( container)、工作站(workstation)、云端(in the cloud)、直播流(livestream)或者python环境进行部署,根据使用场景,还可以自定义硬件配置。 快速安装 快速安装适用于演示场景,可让您了解完整产品的功能概览。完成快速安装后,您能创建包含机器人的虚拟房间,这将更全面的展示产品能力。该只能面向具备基础计算机知识的安装人员。 windows或linux系统快速安装步骤: (1)下载以下任意安装包 windows: windows系统兼容性检查工具 linux:linux系统兼容性检查工具 (2)将安装包解压至制定文件夹 (3)运行脚本检查 window:请双击 omni.isaac.sim.compatibility_check.bat。 linux:./omni.isaac.sim.compatibility_check.sh 更多信息参阅:Isaac Sim兼容性检查 (4)下载任意一个安装包 window:https://download.isaacsim.omniverse.nvidia.com/isaac-sim-standalone-5.0.0-windows-x86_64.zip linux:https://download.isaacsim.omniverse.nvidia.com/isaac-sim-standalone-5.0.0-linux-x86_64.zip (5)创建一个isaac-sim的文件夹 在windows的C:/或linux根目录下直接创建一个名为isaac-sim的文件夹。然后将下载的文件解压到文件夹中去。 (6)在isaac-sim文件夹中,执行操作 window:请双击 isaac-sim.selector.bat linux:命令窗口中运行 ./post_install.sh ,然后运行 ./isaac-sim.selector.sh。 (7)在issac应用选择起窗口,选择"start" 有关应用选择器的详细信息,请参阅《Isaac Sim 应用选择器》。随后将打开另一个命令窗口并运行脚本,此过程可能比预期耗时更长。在此期间由于会出现空白窗口,可能看似安装失败。请继续等待。 (8)issac启动成功 (9)选择 选择创建一个房间 Create > Environment > Simple Room. (10)选择创建一个机械臂 Create > Robots > Franka Emika Panda Arm. (11)点击运行模拟 在屏幕最左侧寻找箭头按钮,点击它来运行一段简短模拟。 isaac 系统需求 对操作系统需求如下 对驱动的要求如下 工作站安装 工作站安装方式是在本地允许模拟器,需要对本地电脑有较高的要求,官方要求最低的配置要为RTX4080的显卡。因此若本地配置了GPU的windows或linux系统上以GUI应用程序允许isaac sim,推荐采用工作站安装方式。下面是安装步骤 (1)isaac Sim兼容性检查工具 Isaac 兼容性检查工具是一款轻量级应用程序,可通过编程方式检查本地的软硬件要求,会给出运行NVIDIA isaac sim时那些要求满足或不满足。 下载工具:Latest Release兼容性工具 解压:将压缩包解压到指定文件夹。 运行:在Linux系统上云霄omni.isaac.sim.compatibility_check.sh脚本,在windows系统上运行omni.isaac.sim.compatibility_check.bat文件。 点击工具的"Test Kit"按钮就会显示测试结果。 应用程序会以不同颜色高亮显示以下状态: 绿色:表示优秀 浅绿色:表示良好 橙色:表示基本满足,建议更高配置 红色:不足/不支持 应用程序检查维度为: NVIDIA GPU:驱动程序版本、支持RTX功能的GPU、显存容量。 CPU、内存和存储:CPU处理器、CPU核心数量、运行内存、可用存储空间。 others:操作系统、显示设备。 对于操作系统如果ubuntu大于版本号也会变成红色,可先测试是否可运行,实测是可以,但不排除有兼容性问题。 (2)下载软件包 下载链接:Latest Release,根据自己的系统选择软件包下载的本地。 在本地创建一个isaacsim文件夹,然后将压缩包解压到文件夹。 (3)运行启动 先创建extension_examples 的符号链接,请运行 post_install 脚本。 linux:./post_install.sh windows:双击 post_install.bat 文件 然后就可以启动应用程序 linux: 执行 ./isaac-sim.selector.sh windows:双击isaac-sim.selector.bat 文件 启动后会弹出以下界面 在弹出的窗口中选择isaac Sim Full,然后点击START就可以运行。启动过程中可能要一点时间,如果期间弹出“程序无法响应”,可以选择等待,以免被误杀。 启动后就可以开始第一个基础教程了:基础教程。 总结一下命令执行的实例: linux系统 mkdir ~/isaacsim cd ~/Downloads unzip "isaac-sim-standalone-5.0.0-linux-x86_64.zip" -d ~/isaacsim cd ~/isaacsim ./post_install.sh ./isaac-sim.selector.sh window系统 mkdir C:\isaacsim cd %USERPROFILE%/Downloads tar -xvzf "isaac-sim-standalone-5.0.0-windows-x86_64.zip" -C C:\isaacsim cd C:\isaacsim post_install.bat isaac-sim.selector.bat Docker容器安装 在远程服务器或者云端部署isaac sim建议使用docker容器的方式。 (1)检查系统是否满足需求 首先先确保系统满足运行NVIDIA isaac Sim所需的系统要求和驱动程序要求。 (2)安装docker Docker installation using the convenience script curl -fsSL https://get.docker.com -o get-docker.sh sudo sh get-docker.sh Post-install steps for Docker sudo groupadd docker sudo usermod -aG docker $USER newgrp docker Verify Docker docker run hello-world 详细的docker安装步骤见docker安装,安装后的配置步骤见配置步骤。 (3)安装NVIDIA容器工具包 Configure the repository curl -fsSL https://nvidia.github.io/libnvidia-container/gpgkey | sudo gpg --dearmor -o /usr/share/keyrings/nvidia-container-toolkit-keyring.gpg \ && curl -s -L https://nvidia.github.io/libnvidia-container/stable/deb/nvidia-container-toolkit.list | \ sed 's#deb https://#deb [signed-by=/usr/share/keyrings/nvidia-container-toolkit-keyring.gpg] https://#g' | \ sudo tee /etc/apt/sources.list.d/nvidia-container-toolkit.list \ && \ sudo apt-get update Install the NVIDIA Container Toolkit packages sudo apt-get install -y nvidia-container-toolkit sudo systemctl restart docker Configure the container runtime sudo nvidia-ctk runtime configure --runtime=docker sudo systemctl restart docker Verify NVIDIA Container Toolkit docker run --rm --runtime=nvidia --gpus all ubuntu nvidia-smi 安装最新版本的 NVIDIA 容器工具包以获取安全补丁。 (4)容器部署 docker pull nvcr.io/nvidia/isaac-sim:5.0.0 拉取isaac sim容器。 然后以交互式bash会话运行isaac sim容器。 docker run --name isaac-sim --entrypoint bash -it --runtime=nvidia --gpus all -e "ACCEPT_EULA=Y" --rm --network=host \ -e "PRIVACY_CONSENT=Y" \ -v ~/docker/isaac-sim/cache/kit:/isaac-sim/kit/cache:rw \ -v ~/docker/isaac-sim/cache/ov:/root/.cache/ov:rw \ -v ~/docker/isaac-sim/cache/pip:/root/.cache/pip:rw \ -v ~/docker/isaac-sim/cache/glcache:/root/.cache/nvidia/GLCache:rw \ -v ~/docker/isaac-sim/cache/computecache:/root/.nv/ComputeCache:rw \ -v ~/docker/isaac-sim/logs:/root/.nvidia-omniverse/logs:rw \ -v ~/docker/isaac-sim/data:/root/.local/share/ov/data:rw \ -v ~/docker/isaac-sim/documents:/root/Documents:rw \ nvcr.io/nvidia/isaac-sim:5.0.0 使用 -e "ACCEPT_EULA=Y" 标志即表示您接受 NVIDIA Omniverse 许可协议中规定的镜像许可协议。 使用 -e "PRIVACY_CONSENT=Y" 标志即表示您同意数据收集与使用协议中的条款。不设置此标志即可选择退出数据收集。 使用-e "PRIVACY_USERID=\<emai\l>" 标志可选择性地设置用于标记会话日志。 最后以原生直播模式启动 isaac sim ./runheadless.sh -v 运行直播客户端前,必须确保 Isaac Sim 应用已加载就绪。Isaac Sim 可能需要几分钟才能完全加载。-v 标志用于在着色器缓存预热时显示额外日志。要确认这一点,请留意控制台或日志中的这一行:Isaac Sim Full Streaming App is loaded。 首次加载 Isaac Sim 时,着色器缓存需要较长时间。后续运行 Isaac Sim 会更快,因为着色器已被缓存,且容器运行时缓存会被挂载。 (5)安装isaac sim WebRTC流媒体客户端 见后续章节。输入运行 Isaac Sim 容器的机器或实例的 IP 地址,点击连接按钮开始实时流传输。 云部署 Isaac Sim以容器形式提供,可在本地运行,也可在配备NVIDIA RTX的亚马逊云服务、微软、谷歌云平台、腾讯云或阿里云上运行,并支持将应用程序直接流式传输到您的桌面。这种基于云的交付方式能为任何桌面系统提供最新的RTX图像处理能力与性能,无需本地配置NVIDIA RTX GPU。 根据选择的云服务提供商,提供以下可选方案。 NVIDIA Brev:NVIDIA Brev Instructions AWS:Amazon Web Instructions Azure:Microsoft Cloud Instructions GCP:Google Cloud Instructions Tencent:Tencent Cloud Instructions Alibaba:Alibaba Cloud Instructions Volcano Engine:Volcano Engine Instructions Remote:Remote Workstation Instructions 上述链接提供了云端部署指南,包含通过 SSIsaac Automator 是一款高级工具,可帮助将自定义 Isaac Sim 部署自动化至公有云平台。该工具支持通过 SSH、基于网页的 VNC 客户端以及远程桌面客户端访问 Isaac Sim 实例,兼容 AWS、Azure、GCP 和阿里云等主流云服务商。H 和远程桌面客户端访问实例的操作说明。 直播客户端 本节将介绍如何以无界面模式直播运行 Isaac Sim 实例。需要注意的是每个 Isaac Sim 实例同一时间只能采用一种直播方式。同一时间仅允许一个客户端访问单个 Isaac Sim 实例。要远程退出 Isaac Sim 应用程序:点击文件菜单,然后在流式传输的 Isaac Sim 应用中选择退出。接着关闭 Isaac Sim WebRTC 流媒体客户端应用。当 Isaac Sim 运行在 A100 GPU 上时不支持直播功能。直播需要 NVENC(NVIDIA 编码器)支持,而 A100 GPU 不包含该编码器。 Isaac Sim WebRTC流媒体客户端是推荐的远程查看工具,可让您在桌面或工作站以无需配置高性能GPU可以查看Isaac Sim画面。 (1)服务端的启动 要使用Isaac Sim WebRTC流媒体客户端,需要先在远程运行isaac Sim。 linux:cd ~/isaacsim && ./isaac-sim.streaming.sh windows:双击isaac-sim.streaming.bat Docker:./runheadless.sh PIP:isaacsim isaacsim.exp.full.streaming --no-window Python sample:./python.sh standalone_examples/api/isaacsim.simulation_app/livestream.py 要可以通过互联网连接远程实例运行isaac sim,需要添加以下标志:--/app/livestream/publicEndpointAddress= --/app/livestream/port=49100。如在docker容器示例中: PUBLIC_IP=$(curl -s ifconfig.me) && ./runheadless.sh --/app/livestream/publicEndpointAddress=$PUBLIC_IP --/app/livestream/port=49100 然后在 Isaac Sim WebRTC 流媒体客户端应用中使用相同的公共 IP。运行 Isaac Sim 的主机必须开放UDP port 47998和TCP port 49100。 确保 Isaac Sim 应用已加载就绪。首次启动时,Isaac Sim 可能需要数分钟才能完全加载完成。 为确认加载状态,请在终端/控制台输出或应用日志中查找以下信息。使用 PIP 或 Python Sample 运行时可能不会显示该行信息。Isaac Sim Full Streaming App is loaded. (2)客户端的启动 请根据您的平台,从最新发布版块下载 Isaac Sim WebRTC 流媒体客户端。 运行 Isaac Sim WebRTC 流媒体客户端应用程序。 使用默认的 127.0.0.1 IP 地址作为服务器,连接到本地 Isaac Sim 实例。点击"连接"。连接过程可能需要一些时间。连接成功后,您将在客户端窗口中看到 Isaac Sim 界面。 需要注意的是建议在与 Isaac Sim 无头实例相同的网络中使用 WebRTC 流媒体客户端。连接到同一网络中无头模式的 Isaac Sim 实例时,请将 127.0.0.1 替换为运行 Isaac Sim 的计算机 IP 地址。 linux系统:在终端中运行 chmod +x xx.AppImage 命令,使应用程序获得可执行权限。双击 AppImage 文件即可运行 Isaac Sim WebRTC 流媒体客户端。重要提示:在 Ubuntu 22.04 或更高版本上运行需安装 libfuse2。具体安装方法请参阅《安装 FUSE 2》指南。 windows:若在连接本地或远程 Isaac Sim 实例时遇到问题,请确保 Windows 防火墙允许列表中已添加/kit/kit.exe 及 Isaac Sim WebRTC 流媒体客户端应用。 macbook:打开 DMG 文件后,点击并拖拽 Isaac Sim WebRTC 流媒体客户端应用程序至"应用程序"文件夹图标完成安装。 要重新加载连接,请在视图菜单中点击“重新加载”。如果一段时间后出现空白屏幕,此操作可能会有所帮助。 Python环境安装 主要是在python虚拟环境中通过PIP安装Isaac Sim和使用isaac Sim默认python环境。这里就不展开了,具体参考链接:python环境安装 Isaac Sim 资源库 isaac Sim提供多种资源与机器人模型,助您构建虚拟世界。部分资源专为isaac Sim及机器人应用打造,另一些则适用于其他基于NVIDIA Omniverse的应用程序。默认提供的资源均可在Window > Browsers选项卡中找到。 内容浏览器集中管理所有isaac Sim资源与文件,包含下来全部资源清单,以及URDF文件、配置文件、策略二进制等。Window > Browsers > Content。 isaac Sim最新版本提供示例资源包可供下载。使用这些资源时,需将文件下载至本地磁盘或Nucleus服务器。下文中所有资源路径均默认相对于 persistent.isaac.asset_root.default 设置中的默认资源根目录。详见本地资源包章节。 首次加载资源时耗时较长:机器人模型可能需要数分钟加载,大型环境场景的加载时间可能长达十分钟以上 资源分类如下: 机器人资产 相机与深度传感器 非视觉传感器 道具 环境 精选资源 Neural Volume渲染 机器人 NVIDIA Isaac Sim 支持多种具有不同底盘、外形和功能的机器人。这些机器人可分为轮式机器人、全向移动机器人、四足机器人、机械臂和空中机器人(无人机),它们位于内容浏览器的 Isaac Sim/Robots 文件夹中。 (1)轮式机器人 Limo是NVIDIA Isaac Sim 支持集成 ROS 系统的 AgileX Limo 差速驱动底盘机器人。Robots/AgilexRobotics/Limo/limo.usd NVIDIA 卡特机器人专为导航相关应用提供差速移动底盘。新一代 Nova 卡特机器人基于 Nova Orin 计算与传感器平台打造。 NVIDIA Isaac Sim 支持 Clearpath 移动机器人,包括 Dingo 和 Jackal。Clearpath 机器人位于 Robots/Clearpath 中。 Evobot 是一款采用两轮驱动的自平衡机器人,专为抓取和运输物体设计。该机器人由德国多特蒙德弗劳恩霍夫研究所开发。 Forklift 叉车模型采用单枢轴轮和滚轮设计,通过连接至关节动作的棱柱关节来控制升降操作。 JetBot是开源 NVIDIA JetBot 人工智能机器人平台为创客、学生和爱好者提供了构建创意趣味 AI 应用所需的一切。 Idealworks iw.hub 是一款配备激光雷达和摄像头的移动底盘,搭载 NVIDIA AGX GPU 实现自主导航。该平台负载能力达 1000 公斤,最高行驶速度 2.2 米/秒。 iRobot 公司推出的 Create3 机器人是一款先进的差速驱动机器人,专为多种教育应用场景设计。其圆形底盘集成了多种传感器和先进控制功能,特别适合室内导航、环境建图等任务。NVIDIA Isaac Sim 中的 Create3 机器人配备了差速驱动系统和各类传感器,可实现高度逼真的仿真效果。Create 3 机器人可在 Robots/iRobot/Create3/create_3.usd - 基础版本中找到。已配置移动底盘物理系统。更多信息参阅 iRobot Create 3 。 Leatherback 是 NVIDIA 用于自动驾驶的研究平台。皮背甲机器人位于 Robots/NVIDIA/Leatherback/leatherback.usd (2)全向移动机器人 Kaya机器人是一个展示 Isaac 机器人引擎在 NVIDIA Jetson Nano™平台上运行能力与灵活性的演示平台。该平台采用 3D 打印部件和爱好者级组件设计,力求实现最大程度的可及性,并配备三轮全向驱动系统,使其能够朝任意方向移动。 Robots/NVIDIA/Kaya/kaya.usd :基础版本 Robots/NVIDIA/Kaya/kaya_ogn_gamepad.usd:基础版本,外加使用全向控制器实现的游戏手柄操控功能。 O3dyn 是由多特蒙德弗劳恩霍夫研究所开发的自主全向运输机器人。凭借其全向轮,该机器人可实现任意方向移动,并通过四个杠杆抓取托盘,进而抬升托盘进行运输。 Robots/Fraunhofer/O3dyn/o3dyn.usd: 基础版本。包含移动底盘、夹具与升降机构的物理绑定,以及传感器定位功能。 Robots/Fraunhofer/O3dyn/o3dyn_controller.usd :基础版本,外加使用全向控制器实现的游戏手柄操控功能。 (3)四足机器人 Ant蚂蚁是一种基础四足机器人,腿部采用旋转关节设计,其原型源自 OpenAI Gym 中的 Ant 机器人。 Robots/IsaacSim/Ant/ant.usd :基础版本。可通过菜单栏中 Create>Robots>Ant 选项创建。 Robots/IsaacSim/Ant/ant_instanceable.usd :可实例化版本,专为强化学习场景配置以创建多个高效克隆体。 ANYmal 机器人是由 ANYbotics 开发的自主四足机器人。Isaac Sim 支持 B、C、D 三种型号。 Boston Dynamics Spot 波士顿动力 Spot 机器人位于 Robots/BostonDynamics/spot Unitree Quadruped Robots 宇树四足机器人A1、B2、Go1 和 Go2 是 Unitree Robotics 研发的四足机器人,在 Isaac Sim 中进行仿真。四足示例中使用的是 A1 型号。 关于四组机器人控制示例,请参阅:Isaac Sim 中的强化学习策略示例 (4)机械臂机器人 Denso Cobotta 包含以下 Denso 型号:Cobotta Pro 900、Cobotta Pro 1300。位于 Robots/Denso/Cobotta。 Fanuc CRX10iA/L 是一款 6 轴机器人,有效载荷为 10 公斤。 Festo费斯托协作机器人是一款六轴气动机械臂。 Flexiv Rizon 4 是一款 7 轴自适应机械臂。 RobotStudio是lerobot机械臂,包含以下 RobotStudio 模型:SO-100、SO-101。RobotStudio 机器人位于 Robots/RobotStudio。 太多了,只列出部分。 (5)空中机器人 Crazyflie 2.X 微型四轴飞行器机器人 Ingenuity火星直升机"机智号" (6)人形机器人 Fourier Intelligence GR1 傅里叶智能 GR1 Unitree Humanoids 宇树人形机器人,位于 Robots/Unitree Xiao Peng PX5 小鹏 PX5机器人位于 Robots/XiaoPeng/PX5 (7)移动式机械臂 Clearpath Ridgeback提供两种 Clearpath Ridgeback 型号配置:一种配备 Emika Franka Panda 机械臂,另一种配备 Universal Robots UR5 机械臂。位于 Robots>Clearpath 目录下。 Boston Dynamics Spot 波士顿动力 Spot 机器人,带机械臂的 Spot 机器人位于 Robots>BostonDynamics>spot 路径下 相机与深度传感器 Isaac Sim 支持相机和深度传感器,其数字孪生体可在内容浏览器中找到,位于 Isaac Sim/Sensors 目录下,并按制造商分类存放于子文件夹中。这里就不过多阐述。 非视觉传感器 Isaac Sim 模拟了多种类型的非视觉传感器模型,其数字孪生体可在内容浏览器的 Isaac Sim/Sensors 路径下找到,并按制造商分类存放于子文件夹中。 部分非视觉传感器类型尚未提供数字孪生体。有关这些传感器的详细信息(包括如何通过图形界面创建它们),请点击下方链接: 接触传感器 惯性测量单元传感器 光束传感器 激光雷达 雷达 道具 道具主要是一些角色人物如警察、医生、工人,以及杂项资产。 环境资产 环境资产提供如网格、房间、仓库、医院、办公室、赛道、小型仓库数字孪生 (1)简单网格 这个简易环境包含一块带有网格纹理的平坦地面和围边。系统提供了三种配置:前两种为直角转角,第三种则为圆角设计。 (2)简单房间 包含一张桌子的简易房间 在内容浏览器中搜索 simple_room.usd 或通过创建菜单:Create>Environments>Simple Room (3)仓库 一个包含货架及可放置物品的仓库环境。提供四种配置方案: (4)医院 医院环境,包含多个房间和空间。 (5)办公室 一个办公环境,包含多个房间和开放式平面布局。 (6)赛道 地面上勾勒出的 Jetracer 赛道轮廓。 (7)小型仓库数字孪生 一个小型仓库的数字孪生,可以使用。 精选资产 Nova Carter 搭载 Nova Orin™传感器与计算架构,是一套完整的机器人开发平台,可加速新一代自主移动机器人(AMR)的开发和部署。 Nova Carter 目前作为 Isaac AMR 和 Isaac ROS 软件的双重参考平台,支持真实场景与仿真环境下的开发工作。用户可通过赛格威机器人公司购买 Nova Carter 机器人。 关于功能完整的 Nova Carter Isaac Sim 资产详情,请参阅Nova Carter 文档页面。 注意Nova Carter 机器人在首次加载时可能需要数分钟时间。 Neural Volume渲染 NuRec(神经重建)技术能够利用源自真实世界图像的神经体积数据,在 Omniverse 中进行场景渲染。这些基于 3D 高斯模型的场景可作为标准 USD 资产加载至 Isaac Sim,用于可视化与仿真。 有关 NuRec 在 Omniverse 中的详细工作原理(包括数据准备、渲染设置及已知限制),请参阅NuRec 文档。要生成兼容场景,可使用开源项目3DGruT——该项目提供从图像集合训练 3D 高斯模型的工具,并能导出适用于 Omniverse 应用的 USDZ 格式数据。 示例展示了如何将 NuRec 场景加载到 Isaac Sim 中并运行模拟。该代码片段遍历提供的示例,首先加载指定的舞台,随后加载 carter 导航资源并设置起始位置。接着检查是否需要在生成位置创建碰撞地平面,若需要,则创建一个应用了碰撞 API 的平面基元。然后设置 carter 导航目标基元位置,并运行指定步数的模拟。在模拟过程中,轮式机器人将朝目标位置行进。 import asyncio import os import omni.kit.commands import omni.kit.app import omni.usd import omni.timeline from isaacsim.storage.native import get_assets_root_path_async from isaacsim.core.utils.stage import add_reference_to_stage from pxr import PhysxSchema, UsdGeom, UsdPhysics # User path of the HF NuRec dataset USER_PATH = "/home/user/PhysicalAI-Robotics-NuRec" # Paths for loading and placing the Nova Carter navigation asset and its target. NOVA_CARTER_NAV_URL = "/Isaac/Samples/Replicator/OmniGraph/nova_carter_nav_only.usd" NOVA_CARTER_NAV_USD_PATH = "/World/NovaCarterNav" NOVA_CARTER_NAV_TARGET_PATH = f"{NOVA_CARTER_NAV_USD_PATH}/targetXform" # Scenarios for testing navigation in the environments EXAMPLE_CONFIGS = [ { "name": "Voyager Cafe", "stage_url": f"{USER_PATH}/nova_carter-cafe/stage.usdz", "nav_start_loc": (0, 0, 0), "nav_relative_target_loc": (-3, -1.5, 0), "create_collision_ground_plane": False, "num_simulation_steps": 500, }, { "name": "Galileo Lab", "stage_url": f"{USER_PATH}/nova_carter-galileo/stage.usdz", "nav_start_loc": (3.5, 2.5, 0), "nav_relative_target_loc": (4, 0, 0), "create_collision_ground_plane": False, "num_simulation_steps": 500, }, { "name": "Wormhole", "stage_url": f"{USER_PATH}/nova_carter-wormhole/stage.usdz", "nav_start_loc": (0, 0, 0), "nav_relative_target_loc": (5, 0, 0), "create_collision_ground_plane": False, "num_simulation_steps": 500, }, { "name": "ZH Lounge", "stage_url": f"{USER_PATH}/zh_lounge/usd/zh_lounge.usda", "nav_start_loc": (-1.5, -3, -1.6), "nav_relative_target_loc": (-0.5, 5, -1.6), "create_collision_ground_plane": True, "num_simulation_steps": 500, }, ] async def run_example_async(example_config): example_name = example_config.get("name") print(f"Running example: '{example_name}'") # Open the stage stage_url = example_config.get("stage_url") if not stage_url: print(f"Stage URL not provided, exiting") return if not os.path.exists(stage_url): print(f"Stage URL does not exist: '{stage_url}', exiting") return print(f"Opening stage: '{stage_url}'") await omni.usd.get_context().open_stage_async(stage_url) stage = omni.usd.get_context().get_stage() # Make sure the physics scene is set to synchronous for the navigation to work for prim in stage.Traverse(): if prim.IsA(UsdPhysics.Scene): physx_scene = PhysxSchema.PhysxSceneAPI.Apply(prim) physx_scene.GetUpdateTypeAttr().Set("Synchronous") break # Load the carter navigation asset assets_root_path = await get_assets_root_path_async() carter_nav_path = assets_root_path + NOVA_CARTER_NAV_URL print(f"Loading carter nova asset: '{carter_nav_path}'") carter_nav_prim = add_reference_to_stage(usd_path=carter_nav_path, prim_path=NOVA_CARTER_NAV_USD_PATH) # Set the carter navigation start location nav_start_loc = example_config.get("nav_start_loc") if not nav_start_loc: print(f"Navigation start location not provided, exiting") return print(f"Setting carter navigation start location to: {nav_start_loc}") if not carter_nav_prim.GetAttribute("xformOp:translate"): UsdGeom.Xformable(carter_nav_prim).AddTranslateOp() carter_nav_prim.GetAttribute("xformOp:translate").Set(nav_start_loc) # Check if a collision ground plane needs to be created at the spawn location if example_config.get("create_collision_ground_plane"): plane_path = "/World/CollisionPlane" print(f"Creating collision ground plane {plane_path} at {nav_start_loc}") omni.kit.commands.execute("CreateMeshPrimWithDefaultXform", prim_path=plane_path, prim_type="Plane") plane_prim = stage.GetPrimAtPath(plane_path) plane_prim.GetAttribute("xformOp:scale").Set((10, 10, 1)) plane_prim.GetAttribute("xformOp:translate").Set(nav_start_loc) if not plane_prim.HasAPI(UsdPhysics.CollisionAPI): collision_api = UsdPhysics.CollisionAPI.Apply(plane_prim) else: collision_api = UsdPhysics.CollisionAPI(plane_prim) collision_api.CreateCollisionEnabledAttr(True) plane_prim.GetAttribute("visibility").Set("invisible") # Set the carter navigation target prim location nav_relative_target_loc = example_config.get("nav_relative_target_loc") if not nav_relative_target_loc: print(f"Navigation relative target location not provided, exiting") return print(f"Setting carter navigation target location to: {nav_relative_target_loc}") carter_navigation_target_prim = stage.GetPrimAtPath(NOVA_CARTER_NAV_TARGET_PATH) if not carter_navigation_target_prim.IsValid(): print(f"Carter navigation target prim not found at path: '{NOVA_CARTER_NAV_TARGET_PATH}', exiting") return if not carter_navigation_target_prim.GetAttribute("xformOp:translate"): UsdGeom.Xformable(carter_navigation_target_prim).AddTranslateOp() carter_navigation_target_prim.GetAttribute("xformOp:translate").Set(nav_relative_target_loc) # Run the simulation for the given number of steps num_simulation_steps = example_config.get("num_simulation_steps") if not num_simulation_steps: print(f"Number of simulation steps not provided, exiting") return print(f"Running {num_simulation_steps} simulation steps") timeline = omni.timeline.get_timeline_interface() timeline.play() for i in range(num_simulation_steps): if i % 10 == 0: print(f"Step {i}, time: {timeline.get_current_time():.4f}") await omni.kit.app.get_app().next_update_async() print(f"Simulation complete, pausing timeline") timeline.pause() async def run_examples_async(): for example_config in EXAMPLE_CONFIGS: await run_example_async(example_config) asyncio.ensure_future(run_examples_async()) 从 Hugging Face 下载 NVIDIA NuRec 数据集。更新脚本中的 USER_PATH 变量: USER_PATH = "/home/user/PhysicalAI-Robotics-NuRec"。 本文主要来自Isaac Sim Documentation V5.0.0的翻译。
- 
					
						  NVIDIA Jetson平台简介:机器人和边缘AI简介 NVIDIA Jetson平台提供用于开发和部署AI赋能机器人、无人机、IVA(Intelligent Video Analytics,智能视频)应用和自主机器的工具。在边缘生成式AI、NVIDIA Metropolis和Isaac平台支持下,Jetson提供可拓展得软件、现代AI堆栈、灵活的微服务和API、生成就绪型ROS软件包以及特定于应用程序的AI工作流。 Jetson 硬件Roadmap,分为商业方向和工业方向。 上图是商业硬件roadmap,主要分为orin(欧林)和thor(雷神)两个系列。 而工业方向roadmap主要是entry、mainstream、perfromance三个方向。 在软件方面,jetson提供JetPack软件包,截止目前最新的发布版本是JetPack 7.0,主要是基于ubuntu 24.04,集成了CUDA 13.0以及holoscan sensor bride支持。 硬件 NVIDIA Jetson 模组可提供适合各种性能水平和价位的加速计算功能,从而能够满足各种自主应用的需求。从制造业到建筑业,从医疗健康到物流行业,Jetson 平台都能提供出色的性能、卓越的能效和无比轻松的开发体验。下面是jetson系列提供的模组规格简要对比。 上面是NVIDIAjetson系列的模组从最小0.5TFLOPS算力到最大2070 TOPS算力的平台矩阵。 Nano:四核A57@1.43G CPU+128核Maxwell架构GPU+4GB LPDDR内存,可提供472 FGLOP的AI算力;并行运行多个神经网络并同时处理多个高分辨率传感器,其功耗仅需5~10W;应用在网络硬盘录像机(NVR)、家用机器人以及具备全面分析功能的智能网关上面。 TX2:双核NVIDIA Denver™@1.95G+四核Arm® Cortex®-A57@1.92G CPU+256核 Pascal GPU+4GB/8GB LPDDR内存,可提供1.3TFLOPS的AI算力;计算性能翻倍,功耗仅7.5W。可应用在工厂机器人、商用无人机、便携式医疗设备和企业协作设备中。 Xavier NX:6核NVIDIA Carmel ARM®v8.2 64 位 CPU + 48 个 Tensor Core 的 384 核 NVIDIA Volta™ GPU+8/16GB LPDDR4x内存;可提供14TOPS+功耗10W~21TOPS+功耗20W的AI算力;应用在 适用于无人机、便携式医疗设备、小型商业机器人、智能摄像头、高分辨率传感器、自动光学检测、智能工厂和其他 IoT 嵌入式系统等高性能 AI 系统。 AGX Xavier:8 核 NVIDIA Carmel Armv8.2 64 位 CPU+512 个 NVIDIA CUDA Core 和 64 个 Tensor Core Volta 架构GPU+32/64GB内存,提供32TOPS的AI算力,功耗在10W~40W;非常适用于配送和物流机器人、工厂系统和大型工业UAV等自主机器。 Orin Nano、NX、AGX:6\~12核 Arm® Cortex® A78AE v8.2@1.7G\~2.2G +(32\~64)x (1024\~2048)核Ampere 架构 GPU+4G\~64G LPDDR5内存;功耗满足7\~60W,提供算力34 TOPS\~275 TOPS的AI算力;Orin系列是包含7个相同架构的模组其性能是上一代AI推理的8倍并支持高速接口;强大的软件堆栈包含预训练的 AI 模型、参考 AI 工作流和垂直应用框架,可加速生成性 AI 的端到端开发,以及边缘 AI 和机器人应用。 Thor:12\~14核 Arm® Neoverse®-V3AE 64 位 CPU@2.6G+(64\~96)X(1536\~2560)核Blackwell 架构 GPU+128G LPDDR5X内存,提供1200~2070TFLOPS(FP4)算力;功耗在40~75W,与AGX Orin相比,Jetson Thor 系列模组的 AI 计算性能提高至 7.5 倍以上,能效提高至 3.5 倍。应用在人形机器人、空间智能、多传感器处理、生成式AI等多个场景。 软件 NVIDIA Jetson软件是永远边缘构建、部署和扩展人形机器人及生成式AI应用的旗舰平台。它支持全系列Jetson模块,为从原型开发到量产提供统一且可扩展的基础。NVIDIA JetPack SDK赋能实时传感处理、多摄像头追踪,以及如操作和导航等先进机器人功能,集成于强大的AI生态之中。开发者可借助诸如NVIDIA Holoscan(传感器流式处理)和Metropolis VSS(视频分析)等集成框架。通过NVIDIA Isaac机器人工作流程,包括想NVIDIA GROOT N1等基础生成式AI模型,Jetson软件为机器人实现快速、精准、变革性的AI赋能和规模化部署提供支持。 JetPack SDK:是一套完整的软件套件,用于NVIDIA Jetson平台上开发和部署AI驱动的边缘应用。 Holoscan传感器桥接器:将边缘传感器连接到AI工作流,以实现实时、高性能的传感器数据处理。 Jetson AI Lab:由NVIDIA工具和社区项目提供支持,激发机器人和生成式AI领域的创新和动手探索。 NVIDIA Isaac:提供NVIDIA CUDA加速库、框架和AI模型,用一个构建自主机器人,包括ARM、机械臂和人形机器人。 NVIDIA Metropolis:为智慧城市、工业和零售业开发和部署视觉AI应用,并在边缘进行实时视频分析。 JetPack SDK JetPack是NVIDIA Jetson平台官方软件套件,涵盖丰富的工具和库,可用于大招AI赋能边缘应用。目前最新的版本是JetPack7,采用Linux kernel 6.8及ubuntu 24.04 LTS,模块化云原生架构,结合最新的NVIDIA计算堆栈,无缝衔接NVIDIA AI工作流。 JetPack组件由AI计算堆栈、AI框架、Linux组件几个部分组成。 AI计算堆栈:由CUDA、cuDNN、TensorRT组成;用于提供硬件GPU的加速底层接口;CUDA提供NVIDIA GPU 上编写和运行通用计算程序的能力;cuDNN在CUDA之上的深度神经网络算子库,提供高度优化的深度学习核心算子(卷积、池化、激活函数、RNN/LSTM、注意力等)。Pytorch/TensorFlow调用cuDNN中的Conv2d、RNN等。TensorRT推理优化器,只负责推理不负责训练,把训练好的模型转换成高效的 GPU 可执行引擎底层依赖CUDA/cuDNN。与pytorch、TensorFlow不同其即是训练+推理框架。但相对pytorch、TensorFlow的推理,TensorRT性能效率更高。 AI框架:由pytorch、vLLM、SGLang、Triton推理服务器等部分组成。vLLM是便捷、快速的大型语言模型推理与服务库,SGLang 是专为大语言模型及视觉语言模型打造的高效推理框架。 Linux组件:主要是基础系统组件,基于ubuntu系统构建,提供刷机、安全、OTA、图形库(OpenGL、Vulkan、EGL等)、多媒体API、计算机视觉库(OpenCV、VisionWorks)等。 其他组件:Jetson平台服务、云原生设计、Nsight开发工具组成;平台服务提供预构建和可定制的云原生软件服务;云原生设计师提供容器化开发、kubernetes和微服务;Nsight提供强大的分析、调试、性能分析功能,在AI、图形和计算工作负载中优化GPU加速应用。 在JetPack SDK上各种应用SDK,如提供NVIDIA DeepStream SDK、NVIDIA Isaac ROS、NVIDIA Holoscan SDK。 Metropolis VIDIA Metropolis 是一个视觉 AI 应用平台和合作伙伴生态系统,可简化从边缘到云端的视觉 AI 智能体的开发、部署和可扩展性。可以做自动化视觉检查、智能交通系统、工业自动化、智能零售商店等等。 模型:可访问各种先进的AI模型,构建视觉AI应用,支持VLM等;提供TAO工具套件。对模型训练、适应和优化上手简单,不需要专业的AI知识或大型训练数据集,使用自己的数据微调即可完成。 工具:提供AI智能体Blueprints,借助大模型构建智能体,分析、解释和处理大量视频数据,以提供关键见解,帮助各行各业优化流程、提高安全性并降低成本。提供NVIDIA NIM一套易于使用的推理微服务,NIM 支持各种 AI 模型 (包括基础模型、LLM、VLM 等) ,可确保使用行业标准 API 在本地或云端进行无缝、可扩展的 AI 推理。提供DeepStream SDK,是基于 GStreamer 的完整流分析工具包。 数据:Omniverse集成 OpenUSDNVIDIA RTX™ 渲染技术,以及 物理 AI 集成到现有软件工具和仿真工作流中进行开发和测试 。NVIDIA Cosmos™ 是一个先进的生成式AI平台世界基础模型( WFM) 、高级标记器、护栏以及加速数据处理和管护流程,旨在加速物理 AI系统。NVIDIA Isaac SIM开发者在物理精准的虚拟环境中生成合成图像和视频数据,以训练自定义视觉AI模型。 Isaac NVIDIA Isaac AI机器人开发平台有NVIDIA CUDA加速库、应用框架和AI模型组成,可加速自主移动机器人、手臂和操作器以及人形机器人等。 NVIDIA Robotis提供了全栈、加速库和优化的AI模型,能够高效开发、训练、仿真、部署机器人系统。 Isaac ROS:机器人操作系统,是基于开源ROS2构建,包含了NVIDIA CUDA加速计算包的集合,便于简化和加速高级AI机器人应用开发。 Isaac Manipulator:基于Isaac ROS构建,支持开发AI驱动的机械臂,这些机械臂可以无缝感知和理解环境并与环境进行交互。 Isaac Perceptor:基于Isaac ROS构建,支持开发先进的自主移动机器人,能够在仓库货工厂等非结构化环境中进行感知和定位。 Isaac GR00T:用于通用机器人基础模型和数据流水线,以加速人形机器人的开发。 还提供了基于物理的虚拟环境中设计、仿真、测试和训练的框架。NVIDIA Isaac Sim和NVIDIA Isaac Lab。 NVIDIA Isaac Sim:是一款基于 NVIDIA Omniverse 构建的开源参考应用,使开发者能够在基于物理的虚拟环境中模拟和测试 AI 驱动的机器人开发解决方案。 NVIDIA Isaac Lab:Isaac Lab 基于 NVIDIA Isaac Sim™ 开发,使用 NVIDIA®PhysX® 以及基于物理性质的 NVIDIA RTX™ 渲染提供高保真物理仿真。 Holoscan SDK NVIDIA Holoscan 将传感器数据传输到 GPU 进行实时推理,从而加速边缘 AI 开发。 Holoscan 传感器桥接器:可在高吞吐量传感器数据与 GPU 之间提供关键链接,从而无缝集成异构传感器数据。它可标准化并管理从各种传感器接口 (如摄像头输入、超声波或内窥镜视频) 中的数据提取,确保以低延迟、同步和可靠的方式传输数据,从而实现实时 AI 处理。 NVIDIA IGX ORIN:是一个结合了企业级硬件、软件和支持的工业级平台,可在生产就绪型硬件上进行部署。虽然 Holoscan SDK 可在您的目标设备上灵活部署,但 IGX 使公司能够专注于应用开发,并更快地实现 AI 的优势。 Jetson AI Lab Jetson AI Lab 由 NVIDIA 工具和社区项目提供支持,其提供了各种大模型的快速部署示例,如大语言模型LLM/SLM、视觉语言大模型VLM、Web UI等等。 如上图,如果要运行一个模型,就可以通过如上图示例直接获取到运行命令,还可以调整参数。更详细的教程参考:https://www.jetson-ai-lab.com/tutorial-intro.html 参考:https://www.nvidia.cn/autonomous-machines/
- 
					
						  Jetson nano平台随记环境准备 烧录镜像 下载NVIDIA jetson nano镜像,其镜像是基于ubuntu18.04修改。使用开源的balenaEtcher烧录器写到SD卡上,然后插卡启动 网络准备 买一个无线网卡然后安装好驱动配置好wifi连接。 远程访问 方法1: 在nano上安装xrdp的方式,window就可以远程桌面访问。 方法2:在nano上安装VNC,远程访问设备需要下载VNC客户端,支持ubuntu系统。 pyhton独立环境 类型conda activate的环境 sudo apt-get install python3-pip pip3 install virturalenv 创建一个环境 python3 -m virtualenv -p python3 env --system-site-packages 激活环境 source env/bin/activate 图像和视频 主要是https://github.com/thehapyone/NanoCamera 安装opencv 创建一个swap空间,否则内存可能不够。 在安装opencv前,还要准备一下环境 使用wget下载opencv的包。 wget -O opencv_contrib.zip https://github.com/openc/opencv_corntrib/archive/4.5.1.1.zip 使用cmake进行编译。 使用jtop可以查看系统统计信息,前提是要按照pip install -U jeston-stats 硬件接上CSI的摄像头,接上之后可以在/dev/videox 看到节点。可以使用下面的命令测试就可以看到图像。 nvgstcaptrue-1.0 --orientation=2 --cap-dev-node=1指定节点如/dev/video1 读取显示 import cv2 img = cv2.imread('/assets/a.jpg') cv2.imshow("Output",img) cv2.waitkey(0) 对于平台CSI摄像头需要import nanocamera import nanocamera as nano camera = nano.Camera(flip=2, width=640,height=480,fps=30)
- 
					
						  SmolVLA 异步推理:远程 Policy Server 与本地 Client 实操概述 本文记录lerobot smolvla异步推理实践,将SmolVLA的策略server部署到AutoDL上,真机client在本地笔记本上运行。 下面是代码的流程图: 环境准备 先登录AutoDL事先搭建好lerobot的环境,这里就不再重复了,参考往期文章。lerobot环境搭建好后,需要先安装smolvla和gRPC。 # 建议先升级打包工具 python -m pip install -U pip setuptools wheel pip install -e ".[smolvla]" # 安装 gRPC 及相关 python -m pip install grpcio grpcio-tools protobuf 服务器 python src/lerobot/scripts/server/policy_server.py --host=127.0.0.1 --port=8080 --fps=30 --inference_latency=0.033 --obs_queue_timeout=2 启动成功后的日志如下: python src/lerobot/scripts/server/policy_server.py --host=127.0.0.1 --port=8080 --fps=30 --inference_latency=0 --obs_queue_timeout=2 INFO 2025-08-28 10:33:07 y_server.py:384 {'fps': 30, 'host': '127.0.0.1', 'inference_latency': 0.0, 'obs_queue_timeout': 2.0, 'port': 8080} INFO 2025-08-28 10:33:07 y_server.py:394 PolicyServer started on 127.0.0.1:8080 被客户端连接后的日志: INFO 2025-08-28 10:40:42 y_server.py:104 Client ipv4:127.0.0.1:45038 connected and ready INFO 2025-08-28 10:40:42 y_server.py:130 Receiving policy instructions from ipv4:127.0.0.1:45038 | Policy type: smolvla | Pretrained name or path: outputs/smolvla_weigh_08181710/pretrained_model | Actions per chunk: 50 | Device: cuda Loading HuggingFaceTB/SmolVLM2-500M-Video-Instruct weights ... INFO 2025-08-28 10:40:54 odeling.py:1004 We will use 90% of the memory on device 0 for storing the model, and 10% for the buffer to avoid OOM. You can set `max_memory` in to a higher value to use more memory (at your own risk). Reducing the number of VLM layers to 16 ... Loading weights from local directory INFO 2025-08-28 10:41:14 y_server.py:150 Time taken to put policy on cuda: 32.3950 seconds INFO 2025-08-28 10:41:14 ort/utils.py:74 <Logger policy_server (NOTSET)> Starting receiver INFO 2025-08-28 10:41:14 y_server.py:175 Received observation #0 | Avg FPS: 3.45 | Target: 30.00 | One-way latency: -9.22ms INFO 2025-08-28 10:41:14 y_server.py:205 Running inference for observation #0 (must_go: True) INFO 2025-08-28 10:41:15 ort/utils.py:74 <Logger policy_server (NOTSET)> Starting receiver INFO 2025-08-28 10:41:15 y_server.py:175 Received observation #0 | Avg FPS: 3.45 | Target: 30.00 | One-way latency: -9.58ms 服务器仅本地监听(12.0.0.1),这样不暴露公网,客户端通过SSH隧道安全转发。 nohup python src/lerobot/scripts/server/policy_server.py --host=127.0.0.1 --port=8080 --fps=30 --inference_latency=0.033 --obs_queue_timeout=2 >/tmp/policy_server.log 2>&1 & 也可以选择后台运行。 客户端 建立SSH转发 在本地客户端线建立SSH本地端口转发(隧道) ssh -p <服务器ssh的port> -fN -L 8080:127.0.0.1:8080 <用户名@服务器ssh的ip或域名> 如:ssh -p 20567 -fN -L 8080:127.0.0.1:8080 root@connect.xx.xxx.com 如果不想后台运行,运行在前台(Crtl+C结束) ssh -p 20567 -N -L 8080:127.0.0.1:8080 root@connect.xx.xxx.com 本地运行 python src/lerobot/scripts/server/robot_client.py \ --robot.type=so101_follower --robot.port=/dev/ttyACM0 --robot.id=R12252801 \ --robot.cameras="{ handeye: {type: opencv, index_or_path: 6, width: 640, height: 480, fps: 30}, fixed: {type: opencv, index_or_path: 0, width: 640, height: 480, fps: 30}}" \ --policy_type=smolvla \ --pretrained_name_or_path=outputs/smolvla_weigh_08181710/pretrained_model \ --policy_device=cuda \ --actions_per_chunk=50 \ --fps=30 \ --server_address=localhost:8080 \ --chunk_size_threshold=0.8 \ --debug_visualize_queue_size=True 其他参数 --debug_visualize_queue_size=True: 执行结束后可视化队列情况。 需要安装 pip install matplotlib。 --aggregate_fn_name=conservative:当新动作到达时,如果队列中已经存在相同时间步的动作,系统会使用聚合函数来合并它们。如果为latest_only(默认),只用最新动作,这样可能会抖动剧烈。 --pretrained_name_or_path 会在“服务器上”加载。需要确保服务器上outputs/smolvla_weigh_08181710路径有权重文件。 连接执行的日志如下: python src/lerobot/scripts/server/robot_client.py --robot.type=so101_follower --robot.port=/dev/ttyACM0 --robot.id=R12252801 --robot.cameras="{ handeye: {type: opencv, index_or_path: 6, width: 320, height: 240, fps: 25}, fixed: {type: opencv, index_or_path: 0, width: 320, height: 240, fps: 25}}" --policy_type=smolvla --pretrained_name_or_path=outputs/smolvla_weigh_08181710/pretrained_model --policy_device=cuda --actions_per_chunk=50 --chunk_size_threshold=0.8 --fps=30 --server_address=localhost:8080 --aggregate_fn_name=average INFO 2025-08-28 10:40:38 t_client.py:478 {'actions_per_chunk': 50, 'aggregate_fn_name': 'average', 'chunk_size_threshold': 0.8, 'debug_visualize_queue_size': False, 'fps': 30, 'policy_device': 'cuda', 'policy_type': 'smolvla', 'pretrained_name_or_path': 'outputs/smolvla_weigh_08181710/pretrained_model', 'robot': {'calibration_dir': None, 'cameras': {'fixed': {'color_mode': <ColorMode.RGB: 'rgb'>, 'fps': 25, 'height': 240, 'index_or_path': 0, 'rotation': <Cv2Rotation.NO_ROTATION: 0>, 'warmup_s': 1, 'width': 320}, 'handeye': {'color_mode': <ColorMode.RGB: 'rgb'>, 'fps': 25, 'height': 240, 'index_or_path': 6, 'rotation': <Cv2Rotation.NO_ROTATION: 0>, 'warmup_s': 1, 'width': 320}}, 'disable_torque_on_disconnect': True, 'id': 'R12252801', 'max_relative_target': None, 'port': '/dev/ttyACM0', 'use_degrees': False}, 'server_address': 'localhost:8080', 'task': '', 'verify_robot_cameras': True} INFO 2025-08-28 10:40:40 a_opencv.py:179 OpenCVCamera(6) connected. INFO 2025-08-28 10:40:41 a_opencv.py:179 OpenCVCamera(0) connected. INFO 2025-08-28 10:40:41 follower.py:104 R12252801 SO101Follower connected. WARNING 2025-08-28 10:40:42 ils/utils.py:54 No accelerated backend detected. Using default cpu, this will be slow. WARNING 2025-08-28 10:40:42 /policies.py:80 Device 'cuda' is not available. Switching to 'cpu'. WARNING 2025-08-28 10:40:42 ils/utils.py:54 No accelerated backend detected. Using default cpu, this will be slow. WARNING 2025-08-28 10:40:42 /policies.py:80 Device 'cuda' is not available. Switching to 'cpu'. INFO 2025-08-28 10:40:42 t_client.py:121 Initializing client to connect to server at localhost:8080 INFO 2025-08-28 10:40:42 t_client.py:140 Robot connected and ready INFO 2025-08-28 10:40:42 t_client.py:163 Sending policy instructions to policy server INFO 2025-08-28 10:41:14 t_client.py:486 Starting action receiver thread... INFO 2025-08-28 10:41:14 t_client.py:454 Control loop thread starting INFO 2025-08-28 10:41:14 t_client.py:280 Action receiving thread starting INFO 2025-08-28 10:41:15 t_client.py:216 Sent observation #0 | INFO 2025-08-28 10:41:15 t_client.py:469 Control loop (ms): 288.72 INFO 2025-08-28 10:41:15 t_client.py:216 Sent observation #0 | INFO 2025-08-28 10:41:15 t_client.py:469 Control loop (ms): 132.22 INFO 2025-08-28 10:41:15 t_client.py:216 Sent observation #0 | INFO 2025-08-28 10:41:15 t_client.py:469 Control loop (ms): 127.84 INFO 2025-08-28 10:41:15 t_client.py:216 Sent observation #0 | INFO 2025-08-28 10:41:15 t_client.py:469 Control loop (ms): 123.95 INFO 2025-08-28 10:41:15 t_client.py:216 Sent observation #0 | INFO 2025-08-28 10:41:15 t_client.py:469 Control loop (ms): 140.21 INFO 2025-08-28 10:41:15 t_client.py:469 Control loop (ms): 0.54 INFO 2025-08-28 10:41:15 t_client.py:469 Control loop (ms): 0.42 如果使用ACT策略,对于ACT来说chunk_size_threshold不要设置太大,实测发现不然一个chunk到下一个chunk抖动比较严重 python src/lerobot/scripts/server/robot_client.py \ --robot.type=so101_follower \ --robot.port=/dev/ttyACM0 \ --robot.id=R12252801 \ --robot.cameras="{ handeye: {type: opencv, index_or_path: 6, width: 640, height: 480, fps: 30}, fixed: {type: opencv, index_or_path: 0, width: 640, height: 480, fps: 30}}" \ --policy_type=act \ --pretrained_name_or_path=outputs/act_weigh_07271539/pretrained_model \ --policy_device=cuda \ --actions_per_chunk=100 \ --fps=30 \ --server_address=localhost:8080 \ --chunk_size_threshold=0.1 以上就是用SSH隧道的方式实现异步推理的过程。 参考:https://hugging-face.cn/docs/lerobot/async 常见问题 (0)服务器监控日志分析 Received observation #136:服务器接收到第136个观察数据 Avg FPS: 11.52:实际观测数据帧率,根据客户端每秒采样时间计算,跟服务端没有关系。 Target:目标设置的观测数据帧率,如15帧/s。 One-way latency: 客户端到服务器的单向网络延迟为1.71ms。 inference time: 模型推理耗时时长为1667ms。 action chunk #136:生成了第136个动作块。 Total time: 推理+序列化总处理时长。 (1)服务端实际的观测帧率低 INFO 2025-08-29 09:47:05 y_server.py:175 Received observation #573 | Avg FPS: 1.20 | Target: 30.00 | One-way latency: 35.78ms 可以看到上面的收到的观测帧率平均只有1.2,看看服务端计算FPS的逻辑。 # 每次接收观测时调用 self.total_obs_count += 1 # 包括所有接收的观测(包括被过滤的) total_duration = current_timestamp - self.first_timestamp avg_fps = (self.total_obs_count - 1) / total_duration 影响服务端的接受观测帧帧率的有客户端观测发送频率低,服务端观测被过滤,推理时间长间接影响 关于客户端观测数据的发送限制如下: # 只有当队列大小/动作块大小 <= 阈值时才发送观测 if queue_size / action_chunk_size <= chunk_size_threshold: send_observation() 可以看到只有满足上面的小于动作阈值才会发送,所以要加大发送的帧率需要改大阈值chunk_size_threshold,减少actions_per_chunk,减少queue_size。 对于ACT策略的FPS 低可能不是问题,这是观察发送频率,不是控制频率,因为ACT 策略就是低频观察,高频执行,主要看机器是不是以30FPS动作执行就好。smolvla也是同样的。因此有时候不要过于误解这个观测帧率,太高的观测帧率也不一定是好事。 (2)观测数据被过滤 y_server.py:191 Observation #510 has been filtered out 服务端会根据这次和上次的关节角度计算欧拉来判断相似性,默认的阈值参数是1,可以改小一点,对相似性的判断严苛一下。 def observations_similar( obs1: TimedObservation, obs2: TimedObservation, lerobot_features: dict[str, dict], atol: float = 1 ) -> bool: ...... return bool(torch.linalg.norm(obs1_state - obs2_state) < atol) 如上修改atol的值,可以改小为如0.5。 总结一下: 对于分布式推理不要过于去纠结实际的观测帧率,而是应该看控制的实际帧率,只要控制动作的帧率(如下的延时,大概就是30fps)是满足的就是可以的,也就是说动作队列中动作不要去获取的时候是空。 对于参数优化重点看服务端的推理延时和客户端的队列管理。 服务端是可以设置推理延时的 # 推理延迟控制 --inference_latency=0.033 # 模型推理延迟(秒) # 观察队列管理 --obs_queue_timeout=2 # 获取观测队列超时时间(秒) 客户端动作管理 --actions_per_chunk=100 # 一个动作块的序列大小,越大推理负载就重 --chunk_size_threshold=0.5 # 队列阈值,越大缓存的动作序列越多,越小实时性越好。 --fps=30 # 控制频率 如果要低延时那么就需要把fps提高,也就是服务端的推理时间设置小,客户端的chunk、threshold要小,fps要高。
- 
					
						  LeRobot SmolVLA:从训练到推理链路剖析框架 本文主要对lerobot SmolVLA策略代码进行分析,下面是策略实现关键部分框图。 SmolVLAPolicay类封装向上提供策略的调用。SmolVLAConfig是对SmolVLA策略的配置,用于配置输出动作序列长度、观测图像输入后到模型的缩放尺寸以及微调策略等等。SmolVLAPolicay类中关键的成员是VLAFlowMatching类,是实现SmolVLA模型flow matching机制训练、推理的核心。在VLAFlowMatching类中关系成员是SmolVLMWithExpertModel类,其定义了VLM+Expert模型具体实现。 SmolVLA策略实现主要涉及SmolVLAPolicy、VLAFlowMatching 、SmolVLMWithExperModel三个类来实现。就以模型训练、模型推理两条主线来进行总结。 训练 训练过程可以分为一下几个核心部分: 数据输入处理:图像、文本和状态通过各自处理方法嵌入并标准化,合并成统一的输入,供后续层次处理。 VLM与专家模型交互前向传播:图像、文本和状态数据通过VLM和专家模型进行多层次的自注意力和交叉注意力计算,得到联合特征表示。 损失计算与优化:通过计算预测动作和目标动作之间的损失,具体是速度场的损失,使用反向传播更新参数。 模型参数冻结与训练策略:通过冻结不必要的模型部分(VLM),专注优化重要部分,减少计算的开销。 输入处理 SmolVLA模型分为前缀prefix、后缀suffix输入。前缀主要是观测端数据由图像、文本和机器的状态嵌入构成,提供给VLM处理,目的是为模型提供上下文信息,理解任务的背景。后缀是用于生成过程中,输入的是噪声动作+时间步,经过Expert模型处理输出具体的预测动作。 (1)前缀prefix嵌入 def embed_prefix(self, images, img_masks, lang_tokens, lang_masks, state: torch.Tensor = None): embs = [] pad_masks = [] att_masks = [] # 处理图像 for _img_idx, (img, img_mask) in enumerate(zip(images, img_masks)): img_emb = self.vlm_with_expert.embed_image(img) embs.append(img_emb) pad_masks.append(img_mask) # 处理语言 lang_emb = self.vlm_with_expert.embed_language_tokens(lang_tokens) embs.append(lang_emb) pad_masks.append(lang_masks) # 处理状态 state_emb = self.state_proj(state) embs.append(state_emb) state_mask = torch.ones_like(state_emb) pad_masks.append(state_mask) # 合并所有嵌入 embs = torch.cat(embs, dim=1) pad_masks = torch.cat(pad_masks, dim=1) att_masks = torch.cat(att_masks, dim=1) return embs, pad_masks, att_masks 代码的流程是依次对输入图像、语言、机器状态进行分别做embedding,然后进行按列合并为一个前缀输入。 图像嵌入:通过embed_image方法转换为嵌入表示,每个图像的嵌入被添加到embs列表中,img_mask则记录图像的有效区域。 文本嵌入:通过embed_language_tokens() 被转换为嵌入表示,lang_emb 是语言的嵌入,包含了语言的语法和语义信息。 状态嵌入:状态信息通过 state_proj() 映射到与图像和文本相同维度的空间,得到 state_emb。 最终图像嵌入、文本嵌入和状态嵌入通过 torch.cat() 方法按列合并成一个大的 前缀输入(Prefix)。pad_masks 和 att_masks 也被合并成一个统一的输入,确保每个模态的信息能够与其他模态的输入一起传递。 图像和文本嵌入调用已经隐式包含了位置编码,状态信息state_proj 转换为嵌入,尽管没有显式的位置信息,但会在模型中通过与其他模态嵌入的融合获取上下文信息。 (2)后缀Suffix嵌入 def embed_suffix(self, noisy_actions, timestep): embs = [] pad_masks = [] att_masks = [] # 使用 MLP 融合时间步长和动作信息 action_emb = self.action_in_proj(noisy_actions) device = action_emb.device bsize = action_emb.shape[0] dtype = action_emb.dtype # 使用正弦-余弦位置编码生成时间嵌入 time_emb = create_sinusoidal_pos_embedding( timestep, self.vlm_with_expert.expert_hidden_size, self.config.min_period, self.config.max_period, device=device, ) time_emb = time_emb.type(dtype=dtype) # 将时间嵌入和动作嵌入结合 time_emb = time_emb[:, None, :].expand_as(action_emb) action_time_emb = torch.cat([action_emb, time_emb], dim=2) action_time_emb = self.action_time_mlp_in(action_time_emb) action_time_emb = F.silu(action_time_emb) # swish == silu action_time_emb = self.action_time_mlp_out(action_time_emb) # 将生成的动作嵌入加入到输入中 embs.append(action_time_emb) bsize, action_time_dim = action_time_emb.shape[:2] action_time_mask = torch.ones(bsize, action_time_dim, dtype=torch.bool, device=device) pad_masks.append(action_time_mask) # 设置注意力掩码,防止图像、语言和状态的输入与动作输入相互影响 att_masks += [1] * self.config.chunk_size embs = torch.cat(embs, dim=1) pad_masks = torch.cat(pad_masks, dim=1) att_masks = torch.tensor(att_masks, dtype=embs.dtype, device=embs.device) att_masks = att_masks[None, :].expand(bsize, len(att_masks)) return embs, pad_masks, att_masks 后缀的输入主要是提供给Expert专家模型用于flow matching预测出输出,输入是噪声动作(noisy actions)+时间步长(timestep)。上述代码可以分为以下几个部分: 时间步长嵌入:时间步长(timestep)用于表示当前的生成步骤,生成一个正弦-余弦位置编码(Sine-Cosine Positional Embedding)。create_sinusoidal_pos_embedding() 使用正弦和余弦函数生成时间嵌入,增强模型对时序的理解。 动作嵌入:动作通过 action_in_proj 进行嵌入,得到 action_emb。这一步是将生成的动作(采样的噪声动作)转化为嵌入表示。 融合时间和动作:动作嵌入与时间嵌入(time_emb)通过 torch.cat() 进行拼接,形成一个新的包含时间信息的动作嵌入。这样,生成的动作不仅包含来自环境的信息,还加入了时间步长的变化。 MLP处理:合并后的动作嵌入通过 action_time_mlp_in 和 action_time_mlp_out 层进行处理。这个过程是对动作嵌入进行进一步的处理,确保其能够适应后续的生成任务。 最终,生成的动作嵌入被加入到 embs 列表中,并通过 torch.cat() 合并为一个统一的后缀输入。这个后缀输入将与前缀输入一起通过 Transformer 层进行处理。 前向传播 forward是整个前向传播的核心,将将输入组合后通过模型计算输出。 def forward( self, images, img_masks, lang_tokens, lang_masks, state, actions, noise=None, time=None ): # 1. 前缀输入的生成 prefix_embs, prefix_pad_masks, prefix_att_masks = self.embed_prefix( images, img_masks, lang_tokens, lang_masks, state ) # 2. 后缀输入的生成 suffix_embs, suffix_pad_masks, suffix_att_masks = self.embed_suffix(actions, time) # 3. 拼接前缀和后缀的嵌入 pad_masks = torch.cat([prefix_pad_masks, suffix_pad_masks], dim=1) att_masks = torch.cat([prefix_att_masks, suffix_att_masks], dim=1) # 4. 计算注意力掩码 att_2d_masks = make_att_2d_masks(pad_masks, att_masks) position_ids = torch.cumsum(pad_masks, dim=1) - 1 # 5. 前向计算 (_, suffix_out), _ = self.vlm_with_expert.forward( attention_mask=att_2d_masks, position_ids=position_ids, past_key_values=None, inputs_embeds=[prefix_embs, suffix_embs], use_cache=False, fill_kv_cache=False, ) # 6. 速度场预测计算损失 suffix_out = suffix_out[:, -self.config.chunk_size :] suffix_out = suffix_out.to(dtype=torch.float32) v_t = self.action_out_proj(suffix_out) losses = F.mse_loss(v_t, actions, reduction="none") return losses 代码调用前缀、后缀输入然后进行拼接得到inputs_embeds,然后再计算注意力的掩码就可以调用VLM+Expert模型进行前向计算。在前向计算中有两个参数use_cache 和 fill_kv_cache 参数,这两个参数的设置控制 key-value 缓存 的使用。 (1)模型组合 models = [self.get_vlm_model().text_model, self.lm_expert] model_layers = self.get_model_layers(models) def get_model_layers(self, models: list) -> list: vlm_layers = [] expert_layers = [] multiple_of = self.num_vlm_layers // self.num_expert_layers for i in range(self.num_vlm_layers): if multiple_of > 0 and i > 0 and i % multiple_of != 0: expert_layer = None else: expert_layer_index = i // multiple_of if multiple_of > 0 else i expert_layer = models[1].layers[expert_layer_index] vlm_layers.append(models[0].layers[i]) expert_layers.append(expert_layer) return [vlm_layers, expert_layers] 模型混合主要是生成一个混合的模型层列表,通过get_model_layers函数计算并返回 VLM 层和 Expert 层的对齐关系,VLM层和Expert层对齐是基于multiple_of来进行层级分配的。如果某些 VLM 层 没有对应的 Expert 层,则设置为 None,仅由 VLM 层处理。默认情况下VLM和Expert的层数一样都为16,下图看看VLM=8,Expert=4的示例。 因此最终对于SmolVLA来说,模型是一个混合的模型层列表model_layers。可以通过model_layers[i][layer_idx]来访问具体的模型,model_layers[0][x]为VLM模型,model_layers[1][x]为Expert模型。如model_layers[0][2]为第二层的VLM,model_layers[1][2]为第二层的Expert,model_layers[1][1]为None。 (2)处理输入嵌入 for hidden_states in inputs_embeds: if hidden_states is None: continue batch_size = hidden_states.shape[0] 遍历输入嵌入(inputs_embeds),检查是否有无效的输入(即 None),并获取当前批次的大小batch_size。 inputs_embeds:模型的输入嵌入数据,可能包含多种模态的输入(例如,图像嵌入、文本嵌入等)。 hidden_states.shape[0]:获取当前输入数据的批次大小。 (3)自注意力与交叉注意力 num_layers = self.num_vlm_layers head_dim = self.vlm.config.text_config.head_dim for layer_idx in range(num_layers): if ( fill_kv_cache or "cross" not in self.attention_mode or (self.self_attn_every_n_layers > 0 and layer_idx % self.self_attn_every_n_layers == 0) ): att_outputs, past_key_values = self.forward_attn_layer() else: att_outputs, past_key_values = self.forward_cross_attn_layer() 使用VLM层数来进行遍历,因为VLM侧的层数是Expert的一倍。进行如循环根据条件来判断是进行自注意力计算还是交叉注意力计算。 判断使用自注意力的条件是有3种情况(其中一种满足即可): fill_kv_cache:如果需要填充 键值缓存(key-value cache),则使用自注意力计算。 "cross" not in self.attention_mode:如果当前没有启用交叉注意力模式,则使用自注意力。 self_attn_every_n_layers:在每隔 n 层计算自注意力时,执行该条件。通常用于启用跨层的自注意力机制。 具体关于自注意力与交叉注意力计算的细节见后续章节。 (4)残差连接与前馈网络 out_emb += hidden_states after_first_residual = out_emb.clone() out_emb = layer.post_attention_layernorm(out_emb) out_emb = layer.mlp(out_emb) out_emb += after_first_residual 残差连接:每一层都使用残差连接,将当前层的输出与原始输入相加,防止深层网络的梯度消失问题。 前馈网络(MLP):通过前馈神经网络(通常包括一个隐藏层和激活函数)进行处理,进一步捕捉输入的非线性关系。 (5)输出处理 outputs_embeds = [] for i, hidden_states in enumerate(inputs_embeds): if hidden_states is not None: out_emb = models[i].norm(hidden_states) outputs_embeds.append(out_emb) else: outputs_embeds.append(None) return outputs_embeds, past_key_values 遍历输入嵌入(inputs_embeds),对每个有效的 hidden_states 进行 归一化处理(models[i].norm())。如果嵌入无效(即 None),则直接将 None 放入输出列表中,以保持输入结构的对齐。最终返回 处理后的嵌入 和 past_key_values(如果有的话)。 归一化(通常是层归一化)确保嵌入在后续计算中具有更好的数值稳定性,帮助模型学习。对缺失的嵌入(None)进行特殊处理,主要是VLM+Expert对齐时,Expert通常为VLM的一半,而模型遍历是时按照VLM层次来遍历的,所以有一半的Expert是None,但是这部的None不能在处理VLM层的时候断掉Expert的输入,否则Expert模型梯度链就断了。 损失计算 SmolVLAPolicy.forward(...) ...... 调 VLAFlowMatching 计算逐样本/逐步/逐维损失(不聚合) losses = self.model.forward(images, img_masks, lang_tokens, lang_masks, state, actions, noise, time) if actions_is_pad is not None: in_episode_bound = ~actions_is_pad losses = losses * in_episode_bound.unsqueeze(-1) # 去掉为对齐而pad出的 action 维度 losses = losses[:, :, : self.config.max_action_dim] # 聚合为标量 loss(反向传播用) loss = losses.mean() return loss, {"loss": loss.item()} 在SmolVLAPolicy.forward(...)调用VLAFlowMatching.forward计算返回损失,下面直接来看VLAFlowMatching.forward。 def forward(self, images, img_masks, lang_tokens, lang_masks, state, actions, noise=None, time=None) -> Tensor: # ① 采样噪声与时间 if noise is None: noise = self.sample_noise(actions.shape, actions.device) # ~N(0,1) if time is None: time = self.sample_time(actions.shape[0], actions.device) # Beta(1.5,1.0)→偏向 t≈1 # ② 合成中间点 x_t 与“真向量场” u_t time_expanded = time[:, None, None] # [B,1,1] x_t = time_expanded * noise + (1 - time_expanded) * actions # convex组合 u_t = noise - actions # ③ 前缀/后缀嵌入(图像+文本+状态 | 动作+时间),拼注意力mask/位置id prefix_embs, prefix_pad_masks, prefix_att_masks = self.embed_prefix(images, img_masks, lang_tokens, lang_masks, state) suffix_embs, suffix_pad_masks, suffix_att_masks = self.embed_suffix(x_t, time) pad_masks = torch.cat([prefix_pad_masks, suffix_pad_masks], dim=1) att_masks = torch.cat([prefix_att_masks, suffix_att_masks], dim=1) att_2d_masks = make_att_2d_masks(pad_masks, att_masks) position_ids = torch.cumsum(pad_masks, dim=1) - 1 # ④ 走双塔文本Transformer:prefix + suffix(训练时不建缓存) (_, suffix_out), _ = self.vlm_with_expert.forward( attention_mask=att_2d_masks, position_ids=position_ids, past_key_values=None, inputs_embeds=[prefix_embs, suffix_embs], use_cache=False, fill_kv_cache=False, ) suffix_out = suffix_out[:, -self.config.chunk_size :] # 取后缀对应的输出token # ⑤ Expert头→动作向量场 v_t,并与 u_t 做逐元素 MSE suffix_out = suffix_out.to(dtype=torch.float32) # 数值稳定 v_t = self.action_out_proj(suffix_out) losses = F.mse_loss(u_t, v_t, reduction="none") # [B, T, A] 不聚合 return losses 核心思想还是学习一个向量场 $v_{\theta}(x_t,t)$ 去逼近真实向量场 $u_t = \epsilon - a$,其中 $$ x_t = t \cdot \epsilon + (1 - t) \cdot a, \quad \epsilon \sim \mathcal{N}(0,I) $$ $a$ 是机器真实的动作如舵机的角度,对应上述代码的action;$\epsilon$是noisy action,最开始随机生成采样而来,对应上述的noise。 模型参数 模型参数冻结主要是以下两个方法决定 SmolVLMWithExpertModel.set_requires_grad(管 VLM/Expert的大部分参数); VLAFlowMatching.set_requires_grad(只管 state 的投影头)。 (1)VLM/Expert大部分参数 def set_requires_grad(self): # 1) 冻结视觉编码器(可选) if self.freeze_vision_encoder: self.get_vlm_model().vision_model.eval() for p in self.get_vlm_model().vision_model.parameters(): p.requires_grad = False # 2) 只训练 Expert(常见默认) if self.train_expert_only: self.vlm.eval() for p in self.vlm.parameters(): p.requires_grad = False else: # 3) 非“只训 Expert”时,VLM 只冻结一小部分层,避免 DDP unused params last_layers = [self.num_vlm_layers - 1] if (self.num_vlm_layers != self.num_expert_layers and self.num_vlm_layers % self.num_expert_layers == 0): last_layers.append(self.num_vlm_layers - 2) frozen = ["lm_head", "text_model.model.norm.weight"] for L in last_layers: frozen.append(f"text_model.model.layers.{L}.") for name, p in self.vlm.named_parameters(): if any(k in name for k in frozen): p.requires_grad = False # 4) Expert 侧不训练 lm_head(没用到 LM 头) for name, p in self.lm_expert.named_parameters(): if "lm_head" in name: p.requires_grad = False 冻结视觉编码器:把 VLM 的 vision encoder 切到 eval(),并把其所有参数 requires_grad=False。对于VLM视觉部分已经比较稳定了,若下游数据量不大,继续训练易带来不稳定与显存开销;冻结能省资源并保持视觉表征稳定。 只训练 Expert:把VLM的(视觉编码+LLM)都起到eval且全部冻结。这是一种轻量微调策略——只训练 Expert+ 动作/时间/状态投影头,能在小数据上快速稳定收敛,避免对大模型语义分布造成破坏。 非“只训 Expert”时,VLM 只冻结一小部分层:永远冻结 VLM 的 lm_head(语言模型头,动作任务用不到),冻结text_model.model.norm.weight,降低训练不稳定,冻结最后 1 层; 总结一下: 目标 典型设置 实际可训练部分 轻量微调(默认/推荐起步) freeze_vision_encoder=True + train_expert_only=True Expert 全部层(除 lm_head) + 动作/时间/状态头(VLAFlowMatching 里的 action_in/out_proj、action_time_mlp_、state_proj) 加强表达(部分放开 VLM) freeze_vision_encoder=True/False + train_expert_only=False Expert 全部层 + 大多数 VLM 文本层(但冻结 lm_head、末尾 norm、最后 1–2 层) + 动作/时间/状态头 除了上面的参数之外在 SmolVLMWithExpertModel.train 中又做了一层保险: def train(self, mode=True): super().train(mode) if self.freeze_vision_encoder: self.get_vlm_model().vision_model.eval() if self.train_expert_only: self.vlm.eval() 即使外部调用了 model.train(),被冻的模块仍保持 eval(),避免 Dropout/BN 等训练态行为干扰。是否参与反向仍由 requires_grad 决定;两者配合保证“真冻结”。 (2)state 的投影头 class VLAFlowMatching(nn.Module): def __init__(self, config): super().__init__() self.config = config self.vlm_with_expert = SmolVLMWithExpertModel( ... ) # —— 与动作/状态/时间相关的投影头 —— self.state_proj = nn.Linear( self.config.max_state_dim, self.vlm_with_expert.config.text_config.hidden_size ) self.action_in_proj = nn.Linear(self.config.max_action_dim, self.vlm_with_expert.expert_hidden_size) self.action_out_proj = nn.Linear(self.vlm_with_expert.expert_hidden_size, self.config.max_action_dim) self.action_time_mlp_in = nn.Linear(self.vlm_with_expert.expert_hidden_size * 2, self.vlm_with_expert.expert_hidden_size) self.action_time_mlp_out = nn.Linear(self.vlm_with_expert.expert_hidden_size, self.vlm_with_expert.expert_hidden_size) self.set_requires_grad() # ← 这里调用 ... def set_requires_grad(self): for params in self.state_proj.parameters(): params.requires_grad = self.config.train_state_proj 根据 config.train_state_proj(布尔值)开/关状态投影层 state_proj 的可训练性。这里只对state_proj做控制,这个是把机器人状态(关节角、抓取开合等)映射到 VLM 文本编码器的隐藏维度。不同机器人/任务,状态分布差异很大(量纲、范围、相关性);是否需要学习这个映射,取决于你的数据规模与分布,所以可以根据train_state_proj=True/False来决定是否要训练或冻结。其它头(action_in/out_proj、action_time_mlp_*)对动作/时间更直接,通常都需要学习,因此默认不在这里冻结。 推理 推理的入口函数入口:SmolVLAPolicy.predict_action_chunk ->select_action-> VLAFlowMatching.sample_actions(...),推理跟训练流程大致相同,这里只简单总结一下不同点。 前缀缓存 prefix_embs, prefix_pad_masks, prefix_att_masks = self.embed_prefix(...) prefix_att_2d_masks = make_att_2d_masks(prefix_pad_masks, prefix_att_masks) prefix_position_ids = torch.cumsum(prefix_pad_masks, dim=1) - 1 # 只喂前缀,构建 KV cache _, past_key_values = self.vlm_with_expert.forward( attention_mask=prefix_att_2d_masks, position_ids=prefix_position_ids, past_key_values=None, inputs_embeds=[prefix_embs, None], # ★ 只有前缀 use_cache=self.config.use_cache, # 通常 True fill_kv_cache=True, # ★ 建缓存 ) 与训练的差别是训练不建缓存,推理先把 VLM 的 Q/K/V(更准确:K/V)算出来并存起来(past_key_values),这步只走 self-attn 分支(因为 fill_kv_cache=True),Expert 不参与。另外需要注意的时传递的输入只有prefix_embs而训练是inputs_embeds=[prefix_embs, suffix_embs]既要传递prefix_embs也有传递suffix_embs,这里的后缀编码为插值点的嵌入,即x_t = time_expanded * noise + (1 - time_expanded) * actions。因为没有Expert的输入,所以自注意力算的也只有VLM的输入。 后缀循环 dt = -1.0 / self.config.num_steps x_t = noise # 初始噪声 time = torch.tensor(1.0, ...) while time >= -dt/2: v_t = self.denoise_step(prefix_pad_masks, past_key_values, x_t, time) x_t += dt * v_t # Euler 更新 time += dt return x_t # 作为动作 做 ODE 去噪循环(Euler),每一步只算后缀。与训练的差别是“采一个随机 t 直接监督向量场”,推理是“从 t=1 积分到 t=0”(ODE 解)。这里的 num_steps 控制积分步数(精度/速度权衡)。 denoise_step(...)----> suffix_embs, suffix_pad_masks, suffix_att_masks = self.embed_suffix(x_t, timestep) # 组装 prefix/suffix 的联合注意力掩码(prefix 只提供 pad_2d 以允许被看) full_att_2d_masks = torch.cat([prefix_pad_2d_masks, suffix_att_2d_masks], dim=2) position_ids = prefix_offsets + torch.cumsum(suffix_pad_masks, dim=1) - 1 outputs_embeds, _ = self.vlm_with_expert.forward( attention_mask=full_att_2d_masks, position_ids=position_ids, past_key_values=past_key_values, # ★ 复用 prefix KV inputs_embeds=[None, suffix_embs],# ★ 只有后缀 use_cache=self.config.use_cache, # True fill_kv_cache=False, # ★ 不再建缓存 ) suffix_out = outputs_embeds[1][:, -chunk_size:] v_t = self.action_out_proj(suffix_out) denoise_step(...)拿缓存 + 只喂后缀(前缀为None),分层走 cross/self。VLM 不再重算 Q/K/V,层内 cross-attn 时,Expert 的 Query 去看 prefix 的 K/V 缓存;若该层被 self_attn_every_n_layers 强制 self,则只做 Expert 自注意(VLM 旁路,因为没有输入前缀)。与训练的差别是训练时两侧一起算(inputs_embeds=[prefix, suffix]),且无缓存。 训练 vs 推理 维度 训练(VLAFlowMatching.forward) 推理(sample_actions + denoise_step) 是否用真动作 用,参与构造 x_t,t 与 u_t=noise-actions,形成监督 不用(没有 label),从噪声解 ODE 得动作 时间使用 随机采样 t~Beta(1.5,1.0),单步监督 从 t=1 到 t=0 迭代(步长 dt=-1/num_steps) 是否建 KV Cache 否(use_cache=False, fill_kv_cache=False) 是:先prefix-only 建缓存;循环中 suffix-only 复用缓存 两塔前向喂法 一次性 inputs_embeds=[prefix, suffix] 两段:① [prefix, None](建缓存);② [None, suffix](复用缓存) 层内注意力路由 由 attention_mode / self_attn_every_n_layers 决定,但无缓存上下文 相同路由;cross 时 Expert-Q × cached VLM-KV;self 时只 Expert 自注意 位置编码(RoPE) 每层对参与计算的 Q/K 应用 同上;prefix 的位置在建缓存时用过;suffix 在每步都重算 损失/梯度 MSE(u_t, v_t) → 反向 无损失、无反向 输出后处理 返回标量 loss(policy 中聚合/掩码后) x_t 作为动作 → unnormalize →(可选)Aloha 映射;支持 n-step 队列 注意力 注意力的计算是模型训练和推理的核心,主要涉及自注意力和交叉注意力,这里单独总结一章节进行梳理分析。 自注意力 自注意力的代码注意在forward_attn_layer函数中,接下来根据代码来进行分析。 (1)自注意力QKV计算 query_states = [] key_states = [] value_states = [] 首先定义了Self-Attention 中的 Query、Key 和 Value。这些将用于计算注意力权重。 for i, hidden_states in enumerate(inputs_embeds): layer = model_layers[i][layer_idx] if hidden_states is None or layer is None: continue hidden_states = layer.input_layernorm(hidden_states) inputs_embeds 是一个包含不同模态输入的列表或张量。例如,它可能包含 VLM 的前缀输入(图像、文本、状态)和 Expert 的后缀输入(动作、时间)。enumerate(inputs_embeds) 会遍历 inputs_embeds 中的每个元素,并返回 i(当前元素的索引)和 hidden_states(对应的输入嵌入)。通过 enumerate 我们可以分别处理每个输入模态,i 用来区分是处理 VLM 还是 Expert。i=0 对应 VLM 的输入,i=1 对应 Expert 的输入。 因此model_layers[i][layer_idx]根据 i 来选择当前是处理 VLM 层还是 Expert 层。如果 i=0,则选择 VLM 的层;如果 i=1,则选择 Expert 的层。layer_idx是当前处理的层的索引,指定当前模型中的哪一层进行处理。 当判断hidden_states is None或layer is None是则跳过不处理,对于Expert侧来说会为空,因为外层是按照VLM层数来遍历的,Expert只有VLM的一半,因此每隔VLM一层就会有一个Expert为空。 先使用input_layernorm对当前输入hidden_states进行归一化。然后就各自进行Q/K/V计算。 hidden_states = hidden_states.to(dtype=layer.self_attn.q_proj.weight.dtype) query_state = layer.self_attn.q_proj(hidden_states).view(hidden_shape) key_state = layer.self_attn.k_proj(hidden_states).view(hidden_shape) value_state = layer.self_attn.v_proj(hidden_states).view(hidden_shape) query_states.append(query_state) key_states.append(key_state) value_states.append(value_state) 在for循环中,遍历VLM和Expert各自计算Q/K/V,然后把VLM和Expert计算的Q/K/V都分类各自加入到相同的列表中,如VLM和Expert的Q加入列表query_states.append。 (2)拼接QKV query_states = torch.cat(query_states, dim=1) key_states = torch.cat(key_states, dim=1) value_states = torch.cat(value_states, dim=1) 将VLM和Expert计算出来的Query、Key、Value各自拼接成一个大的张量,用于后续的注意力计算,从这里可以看出。VLM和Expert的注意力计算是使用一个transformer同时对VLM+Expert的输入拼接输入计算的。相当于VLM和Expert的输入可以双向注意力。 (3)EoPE编码 seq_len = query_states.shape[1] if seq_len < position_ids.shape[1]: _position_ids = position_ids[:, :seq_len] _attention_mask = attention_mask[:, :seq_len, :seq_len] else: _position_ids = position_ids _attention_mask = attention_mask attention_mask_ = _attention_mask position_ids_ = _position_ids query_states = apply_rope(query_states, position_ids_) key_states = apply_rope(key_states, position_ids_) 这段代码主要处理的是位置编码和注意力掩码,这里主要是引入了RoPE编码,计算两个位置之间的相对距离来构造编码,而不是仅仅依赖于绝对位置,提高增强模型的泛化能力。 (4)缓存机制 if use_cache: if fill_kv_cache: past_key_values[layer_idx] = { "key_states": key_states, "value_states": value_states, } else: # TODO here, some optimization can be done - similar to a `StaticCache` we can declare the `max_len` before. # so we create an empty cache, with just one cuda malloc, and if (in autoregressive case) we reach # the max len, then we (for instance) double the cache size. This implementation already exists # in `transformers`. (molbap) key_states = torch.cat([past_key_values[layer_idx]["key_states"], key_states], dim=1) value_states = torch.cat([past_key_values[layer_idx]["value_states"], value_states], dim=1) 将每一层的Key和Value缓存到past_key_values[layer_idx]中,模型训练时这里的use_cache设置为0,当模型是推理时use_cache设置为1,fill_kv_cache设置为1。主要是在推理阶段,会先调用VLM+Expert模型推理一次将Key、Value进行缓存保存起来,后续就只是推理Expert了,VLM将不再计算了,通过这样的方式以提高计算效率。 (5)注意力输出 att_output = attention_interface( attention_mask_, batch_size, head_dim, query_states, key_states, value_states ) return [att_output], past_key_values 注意力计算时会把可用来源(VLM 前缀、Expert 后缀)各自算出的 Q/K/V在序列维度拼接后统一做一次注意力,但掩码保证了“单向可见”,即VLM 与 Expert 的 Q/K/V都参与拼接,但二维掩码使 VLM 基本不看 Expert,Expert 能看 VLM。 交叉注意力 交叉注意力在forward_cross_attn_layer中实现。下面来进行分析。 (1)前缀自注意力 if len(inputs_embeds) == 2 and not past_key_values: seq_len = inputs_embeds[0].shape[1] position_id, expert_position_id = position_ids[:, :seq_len], position_ids[:, seq_len:] prefix_attention_mask = attention_mask[:, :seq_len, :seq_len] layer = model_layers[0][layer_idx] # 选 VLM 的第 layer_idx 层 hidden_states = layer.input_layernorm(inputs_embeds[0]) # 投影出 VLM 的 Q/K/V query_state = layer.self_attn.q_proj(hidden_states).view(B, Lp, H, Dh) key_state = layer.self_attn.k_proj(hidden_states).view(B, Lp, H, Dh) value_state = layer.self_attn.v_proj(hidden_states).view(B, Lp, H, Dh) # 对 Q/K 施加 RoPE(相对位置编码) query_states = apply_rope(query_state, position_id) key_states = apply_rope(key_state, position_id) # 只在 prefix 上自注意力(用 prefix 的方阵 mask) att_output = attention_interface(prefix_attention_mask, batch_size, head_dim, query_states, key_states, value_state) att_outputs.append(att_output) else: expert_position_id = position_ids 当满足inputs_embeds有前缀+后缀的数据且没有缓存的时,只取VLM的输入prefix用于计算自注意力,输出结果为att_outputs。同时如果这层是Expert的交叉注意力,那么VLM计算出来的K/V后面要给到后面Expert用作cross的K/V。 上面前缀自注意力只有只有训练的模型的时候进入交叉注意力每次都会跑,在推理阶段时每次推理只会跑一次。 (2)K/V cache缓存处理 if use_cache and past_key_values is None: past_key_values = {} if use_cache: if fill_kv_cache: past_key_values[layer_idx] = {"key_states": key_states, "value_states": value_states} else: key_states = past_key_values[layer_idx]["key_states"] value_states = past_key_values[layer_idx]["value_states"] 推理的时候会用到缓存,在推理时会调用两次forward。 建缓存阶段(prefix-only):外层会先单独跑一遍,只给 inputs_embeds=[prefix_embs, None],fill_kv_cache=True,把 VLM prefix 的 K/V 存到 past_key_values[layer_idx]。 后缀阶段(真正 cross):用 inputs_embeds=[prefix_embs, suffix_embs] 或者只给 suffix,fill_kv_cache=False,此时直接复用缓存里的 prefix K/V,不用再算。 (3)Expert的交叉注意力 expert_layer = model_layers[1][layer_idx] # 取 Expert 的第 layer_idx 层(可能是 None) if expert_layer is not None: expert_hidden_states = expert_layer.input_layernorm(inputs_embeds[1]) expert_layer is None 的出现是由 get_model_layers 对齐规则决定的,multiple_of = num_vlm_layers // num_expert_layers。Expert要能够计算交叉注意力也要满足当前层是否有Expert层。因为VLM和Expert是对齐的,不一定每一层都有Expert,而当self_attn_every_n_layers设置为2时,相当于是奇数层才会自注意力,而当VLM为16,Expert为8,那么正好Expert都在偶数层基数层没有,所以整个模型都没有注意力机制计算。 expert_query_state = expert_layer.self_attn.q_proj(expert_hidden_states) \ .view(B, Ls, He, Dhe) # 先把 VLM 的 K/V 合并 head 维,变为 [B, Lp, H*Dh] _key_states = key_states.to(dtype=expert_layer.self_attn.k_proj.weight.dtype).view(*key_states.shape[:2], -1) _value_states = value_states.to(dtype=expert_layer.self_attn.v_proj.weight.dtype).view(*value_states.shape[:2], -1) # 再喂给 Expert 自己的 k_proj/v_proj,把维度映射到 Expert 的头数与 head_dim expert_key_states = expert_layer.self_attn.k_proj(_key_states) \ .view(*_key_states.shape[:-1], -1, expert_layer.self_attn.head_dim) # [B, Lp, He, Dhe] expert_value_states = expert_layer.self_attn.v_proj(_value_states) \ .view(*_value_states.shape[:-1], -1, expert_layer.self_attn.head_dim) Expert的expert_query_state来自自己的输入,而expert_key_states、expert_value_states来之与key_states、value_states即为VLM计算过来的缓存K/V。也就是Expert计算注意力是Q使用自己的,而K/V使用的是VLM的。但是需要注意的是可能两边的模型VLM和Expert的hidden宽度、KV头数/维度不一样,先把 VLM K/V 的多头维合并(view(*, H*Dh)),再用 Expert 自己的 k_proj/v_proj 做一次线性变换,映射到 Expert 的多头维度。这就是代码里 “cross K/V 适配层” 的作用;对应到 init,当 attention_mode 包含 "cross" 时,会把 Expert 的 k_proj/v_proj 重定义成输入维=VLM 的 kv_heads x head_dim,输出维=Expert 的。 # 让 Expert 的 token 位置从 0 开始(RoPE 需要相对位置) expert_position_id = expert_position_id - torch.min(expert_position_id, dim=1, keepdim=True).values # 行选择 Expert 的 queries(后缀那段),列只到 prefix 的 K/V 长度(严格 cross,不看自己) expert_attention_mask = attention_mask[:, -inputs_embeds[1].shape[1]:, : expert_key_states.shape[1] ] # 对 Expert 的 Query 施加 RoPE expert_query_states = apply_rope(expert_query_state, expert_position_id) att_output = attention_interface(expert_attention_mask, batch_size, head_dim, expert_query_states, expert_key_states, expert_value_states) att_outputs.append(att_output) 接下来就是计算mask,确保Expert计算cross时只看到前缀(纯cross-attn),不能自回看(不看后缀自身)。再计算RoPE的位置编码,最后调用attention_interface计算交叉注意力得到结果输出。 return att_outputs, past_key_values 最终返回的是两个流对应的自注意力输出,att_outputs 的 长度与 inputs_embeds 对齐,索引0代表VLM 流的输出(前面 prefix 自注意力的结果);索引 1 代表Expert 流的输出(本层 cross 的结果;没有 Expert 就是 None)。外层主循环会据此对两个流分别过 o_proj + 残差 + MLP 等,继续下一层。 总结一下:cross-attn 分支“不拼接 Expert 的 K/V”:Expert 的 Q 只对 VLM 的 K/V(经投影到 Expert 维度)做注意。训练时VLM K/V现场算出并可选择写入缓存;Expert Q 只看这份 VLM K/V。推理时先用前缀阶段填好 VLM KV 缓存;去噪时 Expert Q 直接用缓存的 VLM K/V。VLM 不产生 Q,不会“看”Expert。 Expert要计算交叉注意力需要满足什么条件? 主要看3个参数 L = num_vlm_layers:VLM 总层数 E = num_expert_layers:Expert 总层数(必须 > 0 且能整除 L) S = self_attn_every_n_layers:每隔 S 层强制走一次自注意力(=这层不做 cross) 某层做 cross 的条件 : i % M 0 且(S = 0 或 i % S != 0) 举例1:L=16, E=8;有Expert的层是{0,2,4,6,8,10,12,14},若S=2这些层全是S的倍数,那么没有一层做cross。若S=3,做cross的为{2,4,8,10,14}。 总结一下就是能做cross的,先看每隔几层做cross(间接有self_attn_every_n_layers决定)同时要满足能做cross的这几层有没有Expert。一般情况下,当VLM和Expert具有相同层数是,奇数层做Cross,如果Expert为VLM的一半是需要设置self_attn_every_n_layers设置大于2以上的奇数才能做cross。 层类型 训练时 推理时 Self-Attn VLM & Expert 各自算 QKV → 拼接 → 双向注意 → 切分结果 同训练,但 prefix KV 在首轮缓存,后续复用;双向依旧存在,但 VLM 冻结 Cross-Attn VLM 自注意更新自身 KV;Expert 只算 Q,从 VLM KV(线性投影后)读条件 prefix KV 已缓存;Expert 只算 Q,直接读缓存的 VLM KV;无需重复计算 模型配置 SmolVLAConfig 模型配置主要是SmolVLAConfig类,其决定了训练/推理是模型结构、预处理、优化器/调度器、以及VLM骨干选择与冻结策略。 class SmolVLAConfig(PreTrainedConfig): # Input / output structure. n_obs_steps: int = 1 chunk_size: int = 50 n_action_steps: int = 50 normalization_mapping: dict[str, NormalizationMode] = field( default_factory=lambda: { "VISUAL": NormalizationMode.IDENTITY, "STATE": NormalizationMode.MEAN_STD, "ACTION": NormalizationMode.MEAN_STD, } ) # Shorter state and action vectors will be padded max_state_dim: int = 32 max_action_dim: int = 32 # Image preprocessing resize_imgs_with_padding: tuple[int, int] = (512, 512) # Add empty images. Used by smolvla_aloha_sim which adds the empty # left and right wrist cameras in addition to the top camera. empty_cameras: int = 0 # Converts the joint and gripper values from the standard Aloha space to # the space used by the pi internal runtime which was used to train the base model. adapt_to_pi_aloha: bool = False # Converts joint dimensions to deltas with respect to the current state before passing to the model. # Gripper dimensions will remain in absolute values. use_delta_joint_actions_aloha: bool = False # Tokenizer tokenizer_max_length: int = 48 # Decoding num_steps: int = 10 # Attention utils use_cache: bool = True # Finetuning settings freeze_vision_encoder: bool = True train_expert_only: bool = True train_state_proj: bool = True # Training presets optimizer_lr: float = 1e-4 optimizer_betas: tuple[float, float] = (0.9, 0.95) optimizer_eps: float = 1e-8 optimizer_weight_decay: float = 1e-10 optimizer_grad_clip_norm: float = 10 scheduler_warmup_steps: int = 1_000 scheduler_decay_steps: int = 30_000 scheduler_decay_lr: float = 2.5e-6 vlm_model_name: str = "HuggingFaceTB/SmolVLM2-500M-Video-Instruct" # Select the VLM backbone. load_vlm_weights: bool = False # Set to True in case of training the expert from scratch. True when init from pretrained SmolVLA weights add_image_special_tokens: bool = False # Whether to use special image tokens around image features. attention_mode: str = "cross_attn" prefix_length: int = -1 pad_language_to: str = "longest" # "max_length" num_expert_layers: int = 8 # Less or equal to 0 is the default where the action expert has the same number of layers of VLM. Otherwise the expert have less layers. num_vlm_layers: int = 16 # Number of layers used in the VLM (first num_vlm_layers layers) self_attn_every_n_layers: int = 2 # Interleave SA layers each self_attn_every_n_layers expert_width_multiplier: float = 0.75 # The action expert hidden size (wrt to the VLM) min_period: float = 4e-3 # sensitivity range for the timestep used in sine-cosine positional encoding max_period: float = 4.0 可以分为几个部分 (1)输入输出与时序 n_obs_steps: 输入观测的历史步数,默认为1。 chunk_size:每次模型生成的动作序列长度(后缀序列长度)。 n_action_steps:外部消费的动作步数,需要满足n_action_steps <= chunk_size(代码中已校验)。 采样与训练的后缀长度在 VLAFlowMatching.sample_actions/forward 中使用,动作队列在 SmolVLAPolicy 中按 n_action_steps 出队。 (2)归一化与特征维度 normalization_mapping:各模态的标准化策略,视觉默认 Identity,状态与动作 MeanStd。 max_state_dim/max_action_dim:状态、动作向量的固定上限维度;短向量会 pad 到该维度(pad_vector)。 Normalize/Unnormalize 与 state_proj/action_ x _proj 的投影维度。 (3)图像预处理与空相机 resize_imgs_with_padding=(512,512):视觉输入 pad-resize 到固定分辨率,然后再做 [-1,1] 归一化(SigLIP 习惯)。 empty_cameras:允许在 batch 缺少图像时补空相机占位(用于多摄像头但部分缺失的场景)。 (4)Aloha 相关开关 adapt_to_pi_aloha:状态/动作与 Aloha 空间的双向转换(关节翻转、夹爪角度/线性空间互转)。 use_delta_joint_actions_aloha:将关节维度转为相对量(目前未在 LeRobot 中实现,置 True 会报错)。 (5)文本与采样步数 tokenizer_max_length=48:语言 token 最大长度。 num_steps=10:Flow Matching 反推理的 Euler 步数(越大越精细,越慢)。 prepare_language、sample_actions 的迭代去噪循环。 (6)缓存与注意力 use_cache=True:是否使用 KV-Cache(前缀只算一次,后续重复用)。 attention_mode="cross_attn":与 SmolVLMWithExpertModel 的交叉注意力对齐策略。 prefix_length=-1/pad_language_to="longest":前缀长度/语言 padding 策略;用于构造 attention_mask 与 position_ids。 (7)微调的策略 freeze_vision_encoder=True:冻结 VLM 视觉编码器。 train_expert_only=True:只训练动作 expert(VLM 其它部分冻结)。 train_state_proj=True:是否训练状态投影层。 影响SmolVLMWithExpertModel.set_requires_grad 以及 VLM 参数的 requires_grad 设置。 (8)优化器与调度器 optimizer_* 与 scheduler_*:在训练入口 TrainPipelineConfig.validate() 使用,生成默认的 AdamW + 余弦退火带预热调度。 可被 CLI 覆写(如 --optimizer.lr 等)。 (9)VLM骨干与权重加载 vlm_model_name="HuggingFaceTB/SmolVLM2-500M-Video-Instruct":指定用哪个 VLM 仓库(用于取 tokenizer/processor,和构建骨干结构)。 load_vlm_weights=False:是否直接从该 VLM 仓库下载骨干权重。为 False时只拿 AutoConfig 构结构,权重随机初始化,随后通常被策略检查点覆盖。为 True时用 AutoModelForImageTextToText.from_pretrained 加载骨干权重(仅在 --policy.type=smolvla 路线下常用)。 与 --policy.path 的关系为用 --policy.path=lerobot/smolvla_base 时,实际权重来自本地/Hub 的策略检查点(包含 VLM+expert),不会使用骨干权重,但仍会用 vlm_model_name 主要是加载 tokenizer/processor。用 --policy.type=smolvla 时,vlm_model_name 决定骨干结构,load_vlm_weights 决定是否拉骨干权重,expert 按本地配置新建训练。 (10)层数与宽度对齐 num_vlm_layers:把 VLM 的文本层裁剪为前 N 层再用。裁剪层数后设为 self.num_vlm_layers。 num_expert_layers:专家 expert 模型的层数;若 ≤0 则默认与 VLM 层数相同。决定 expert 与 VLM 的层对齐步长 multiple_of = num_vlm_layers // num_expert_layers。只有在 i % multiple_of = 0 的 VLM 层位点才映射到一个 expert 层用于交叉注意力;其他层的 expert_layer 为空。 self_attn_every_n_layers:每隔 n 层强制走“仅自注意力”而不是交叉注意力。当 attention_mode 含 “cross” 且 fill_kv_cache=False 时,如果 layer_idx % n = 0 则走 self-attn 分支,否则走 cross-attn 分支。例如n=2 → 偶数层自注意、奇数层尝试交叉注意,但还需该层“有映射到的 expert 层”(见 multiple_of)才真正执行 cross-attn。 expert_width_multiplier:expert 的隐藏维度 = VLM 隐藏维度 × multiplier(同时重设 FFN 的 intermediate_size)。expert 更窄以降算力;但会改动线性层形状,需与加载的检查点一致,否则会维度不匹配。为实现 cross-attn,代码会按 VLM hidden 尺寸重建部分 q/k/v 投影,使其能接收来自 VLM 的输入(跳过“只自注意”层)。 在SmolVLAConfig配置集中定义了 SmolVLA 的“结构与训练/推理开关”。训练微调常用 --policy.path=lerobot/smolvla_base,此时多数结构参数不宜修改,微调时从smolvla_base中加载config.json配置;而从骨干自建训练时才需要精细调 num_expert_layers/num_vlm_layers/expert_width_multiplier/load_vlm_weights 等,并确保与骨干 hidden_size/层数一致。 加载流程 策略的加载主要分为两条入口路径,两者互斥,通过启动时参数指定。 (1)--policy.path=....方式 用 --policy.path=.....:指定一个已存在的策略checkpoint(Hub 上或本地目录)。如训练时微调可以指定lerobot/smolvla_base,推理时指定output/train/pretrained_model。会从 path/config.json 里反序列化成 SmolVLAConfig;会加载同目录下的 model.safetensors(整个策略权重:VLM骨干 + 动作专家 + 投影层等);训练开始时,模型已经有了一套完整的初始化参数(通常是预训练好的)。 python -m lerobot.scripts.train \ --policy.path=lerobot/smolvla_base \ --dataset.repo_id=xxx \ --batch_size=64 --steps=200000 这里会拿 Hugging Face Hub 上的 lerobot/smolvla_base(含 config.json + model.safetensors,整个策略权重:VLM骨干 + 动作专家 + 投影层等)来初始化。 (2)--policy.type=smolvla方式 指定一个 策略类别(由 @PreTrainedConfig.register_subclass("smolvla") 注册)。会创建一个全新的 SmolVLAConfig 对象(带默认超参),而不是加载 checkpoint。没有预训练权重,除非配合 load_vlm_weights=True,这时只会拉取纯VLM背骨的预训练权重(而动作专家层仍然是随机初始化)。可以用命令行参数覆盖任意超参(比如 --policy.num_expert_layers=4)。 python -m lerobot.scripts.train \ --policy.type=smolvla \ --dataset.repo_id=xxx \ --batch_size=64 --steps=200000 \ --policy.load_vlm_weights=True 从零(或仅用 VLM 预训练骨干)开始训练一个新策略。 下面以推理和训练举例说明其调用流程。 (1)训练使用policy.path方式 在 validate() 中读取 path,并把所有 --policy.xxx 作为“同层覆写”传入配置加载。 policy_path = parser.get_path_arg("policy") self.policy = PreTrainedConfig.from_pretrained(policy_path, cli_overrides=cli_overrides) self.policy.pretrained_path = policy_path 判断是从本地目录还是Hub下载获取配置文件,然后应用得到 SmolVLAConfig。只加载“配置”(config.json),不加载模型权重。权重加载发生在后续 policy_cls.from_pretrained(...)(另一个类,见 policies/pretrained.py)。 @classmethod def from_pretrained(cls, pretrained_name_or_path, *, ..., **policy_kwargs) -> T: model_id = str(pretrained_name_or_path) # 1) 决定从本地目录还是Hub取配置文件(只取config,不取权重) if Path(model_id).is_dir(): if CONFIG_NAME in os.listdir(model_id): config_file = os.path.join(model_id, CONFIG_NAME) else: print(f"{CONFIG_NAME} not found in {Path(model_id).resolve()}") else: try: config_file = hf_hub_download(repo_id=model_id, filename=CONFIG_NAME, ...) except HfHubHTTPError as e: raise FileNotFoundError(...) from e # 2) 应用CLI覆写(如 --policy.xxx=...) cli_overrides = policy_kwargs.pop("cli_overrides", []) with draccus.config_type("json"): return draccus.parse(cls, config_file, args=cli_overrides) 构建策略,注入数据集特征与统计,若存在 pretrained_path 则连同权重加载。 cfg.input_features/output_features = ... if cfg.pretrained_path: policy = policy_cls.from_pretrained(**kwargs) else: policy = policy_cls(**kwargs) 加载权重(目录或 Hub 的 model.safetensors),随后迁移到 device、设 eval()(训练循环里会再切回 train())。 if os.path.isdir(model_id): policy = cls._load_as_safetensor(...) policy.to(config.device); policy.eval() SmolVLA 特定初始化,即使走 path,仍按 vlm_model_name 加载 tokenizer/processor(非权重),并实例化骨干+expert。 self.language_tokenizer = AutoProcessor.from_pretrained(self.config.vlm_model_name).tokenizer self.model = VLAFlowMatching(config) (2)训练使用policy.type方式 draccus 按类型直接实例化 SmolVLAConfig(该类已注册)并解析 --policy.xxx。 @PreTrainedConfig.register_subclass("smolvla") class SmolVLAConfig(PreTrainedConfig): make_policy 同上;因无 pretrained_path,默认从零构建。若配置 load_vlm_weights=true,才会把骨干权重从 vlm_model_name 拉下来(expert 仍需训练)。 if load_vlm_weights: self.vlm = AutoModelForImageTextToText.from_pretrained(model_id, ...) else: config = AutoConfig.from_pretrained(model_id) self.vlm = SmolVLMForConditionalGeneration(config=config) (3)推理模式只能使用policy.path方式 policy_path = parser.get_path_arg("policy") self.policy = PreTrainedConfig.from_pretrained(policy_path, cli_overrides=cli_overrides) self.policy.pretrained_path = policy_path record 的配置按policy.path加载训练的模型,随后通过 predict_action/select_action 使用策略进行推理。 policy.path 对比 policy.type 维度 policy.path=... policy.type=smolvla 配置来源 从 checkpoint 目录/Hub 仓库里的 config.json 反序列化成 SmolVLAConfig 通过 @PreTrainedConfig.register_subclass("smolvla") 新建一个默认 SmolVLAConfig,命令行可覆写 权重来源 从 checkpoint 里的 model.safetensors 加载完整策略权重(VLM骨干 + 动作专家 + 投影层) 默认全随机;若 load_vlm_weights=True,则只加载 VLM骨干权重(SmolVLM2),动作专家仍随机 归一化统计 不从 checkpoint 恢复,而是来自数据集 dataset_stats(normalize_inputs/targets在加载时被忽略) 同左 Tokenizer/Processor 仍然会用 config.vlm_model_name(默认 HuggingFaceTB/SmolVLM2)加载 tokenizer/processor 同左 常见场景 - 直接推理- 微调已有策略 - 从零开始训练新策略- 换结构做实验(改 num_expert_layers、expert_width_multiplier等) 推理可用性 一键可用(权重完整) 不可直接用(专家没训练,输出无意义),除非后续手动加载你自己训练好的权重 是否需要 HuggingFaceTB/SmolVLM2 权重 不需要(只用到它的 processor/tokenizer) 如果 load_vlm_weights=True → 需要拉骨干权重;否则全随机
- 
					
						  从数学角度理解flow matching中的线性插值什么是插值 插值的核心问题是:在已知两个点的情况下,如何找到它们之间的中间点。 举个人走路的例子,起点在家门口(A点),终点在公司(B点),总的路程为1000米,假设人是匀速移动,如果走到一半(t=0.5),那么人就在家和公司的正中间,如果走到四分之一(t=0.25),那么离家250米,离公司750米。 线性插值(LERP) 是最简单的一种:它假设两个点之间的变化是“直线型、匀速”的。 公式 $$ x_t = (1-t)x_0 + t x_1, \quad t \in [0,1] $$ 把两个量$x_0$和$x_1$按加权$1-t$与$t$做加权平均,得到他们之间的线性过渡点。$x_0$是起点,$x_1$是终点,$t$是插值因子,控制起点与重点的位置。当 $t=0$,结果是 $x_0$;当 $t=1$,结果是 $x_1$;当 $t=0.5$,结果是中点;$t$从0到1连续变化时,$x_t$沿着二者之间等速变化(为什么是等速,待会解释)。 先上个图看看直观体会一下。 如图所示起点$x_0$为坐标(1,1),终点$x_1$(4,3),当$t$从0开始连续变化是,$x_t$的位置会朝着$x_1$的方向变化。 当$t=0$时,坐标为(1 - 0) x (1, 1) + 0 x (4, 3) = (1, 1)。 当$t=0.2$时,坐标为(1 - 0.2) x (1, 1) + 0.2 x (4, 3) = (1.6, 1.4)。 当$t=0.5$时,坐标为(1 - 0.5) x (1, 1) + 0.5 x (4, 3) = (2.5, 2)。 当$t=1$时,坐标为(1 - 01) x (1, 1) + 1 x (4, 3) = (4, 3)。 通过公式可以算出每个时间的位置,但是如果要知道每个位置变化的速度/趋势我们应该怎么来衡量了? 那自然就是要求导数了。 $$ \frac{d}{dt}x_t = x_1 - x_0 $$ 可以看到其倒数是一个常数,那就意味着变化是匀速的,以上图为例,都是朝着(4,3)-(1,1)=(3,2)的向量方向去变化。 有了这个变化量,假设我们时间步$\Delta t=0.1$,那么意味着每次变化的位置是0.1 x (3,2) = (0.3, 0.2);假设当前位置是(1,1)那么就可以计算出下一个时间步的位置为$(1,1)+(0.3,0.2)=(1.3,0.2)$。这就与我们此前flow matching里面的公式$x_{k+1} = x_k + v_\theta(x_k, t_k)\Delta t$。
- 
					
						  轻量SmolVLA:半层VLM、视觉压缩与异步推理赋能具身智能概述 SmolVLA 是一套轻量级视觉-语言-行动(VLA)策略:前端用小型 VLM(视觉 SigLIP + 语言 SmolLM2)做感知与理解;后端用一个“动作专家”专门预测一段连续的低层控制。它与Pi0相比,参数规模少了将近10倍只有约0.45B(450M)。它的目标是在低算力下也能稳定执行多任务机器人控制,并保持接近甚至超过更大模型的效果。 SmolVLA 通过冻结 VLM、只训练动作专家(Action Expert),再配上四件“硬核小技巧”——取 VLM 的前半层、把每帧视觉 token 压到 64、以及Self-Attn—>Cross-Attn交替方式、异步推理;在大幅降算力与时延的同时,保持/逼近甚至超过更大模型的性能;注意力计算交替方式让动作专家既能不断获取外部视觉/语言指导,又能在内部序列里建立自己的时序与物理一致性,从而在算力可控的前提下提升稳定性和表现;提供异步执行,把“算下一段动作”和“执行当前动作”并行起来,显著减少空窗时间。 原理 结构 其模型结构主要有前端的VLM+后端的动作专家Action Expert组成,结构组成与Pi基本一致但实现方式有很大差异不同。先总结一下组件,稍后我们在稍作展开补充。 输入:文本指令token+视觉token(多摄像头采集的图像)+机器的状态(关节角、传感器)。 VLM(感知端):采用SmolVLM-2,VLM共有L层,但是只N=⌊L/2⌋层隐藏表示喂给动作专家。 Action Expert(控制端):一个Flow Matching Transformer,以以Cross → Causal Self → Cross 的“三明治”层为基本单元,按块预测n步动作序列。 输出:一次预测长度为n的动作块,对应机器的控制指令。 SmolVLA与Pi0有很多相似之处,不过其背后有四个关键设计,分别是Layer Skipping(层跳过)、Visual tokens reduction(视觉token压缩)、动作专家交替Self-Attn与Cross-Attn、异步推理。本小节先围绕前面3部分进行解析,异步推理于后续章节展开。 (1)Layer Skipping 层跳过就是把感知端的VLM(视觉+语言)的解码器中间层拿来当条件特征,而不是等它把整个层都计算完再输出;具体的做法就是只去$N=L/2$层的隐藏层表示送给动作专家,VLM权重冻结不训练。之所以要这样做经验规律表示(论文中作者提到)深层 LLM 层更偏“词级生成/长链路语义”,而中层已经集中了“指令 + 视觉”的对齐语义,对控制足够;继续往后让语言头生成 token 既耗算,又不是控制必须。前半层就停下,少算一半的自注意 + MLP,显存开销也随之下降。 大概得实现是把“文本指令 token、图像 token、状态 token”拼接,送入解码器;在第$N$层获取特征信息H然后用一个线性投影到Action expert所需的维度$d_a$作为$K/V$。如果在长时、极强推理型任务(需要深层语言生成)时可以适当调大N。 (2)视觉token压缩 在transformer里面,“token”就是序列里的一个位置。对图像来说,我们把一张图拆成很多小块(patch)或网格上的特征点,每个块/点用一个向量表示,这个向量就是视觉 token(不明白的可以看看ViT原理解析介绍)。视觉token压缩具体的做法是保整图、不裁块,把空间上密的token折叠到通道里,从而让token数变少。 设 ViT patch 后得到的特征图大小为 $\frac{H}{p} \times \frac{W}{p} \times d$,选一个下采样因子 $r$ (整数),做 space-to-depth: $$ \underbrace{\frac{H}{p} \times \frac{W}{p}}{\text{原网格}} \xrightarrow{\div r} \underbrace{\frac{H}{pr} \times \frac{W}{pr}}{\text{更稀疏的网格}}, \quad \underbrace{d}{\text{通道}} \xrightarrow{\times r^2} \underbrace{d \cdot r^2}{\text{更厚的通道}} $$ 计算示例: 输入尺寸:$512 \times 512$ 图像 Patch大小 $p=16$ ⇒ $32 \times 32$ token网格 选$r=4$ ⇒ $\frac{32}{4} \times \frac{32}{4} = 8 \times 8$ 网格 Token数:$8 \times 8 = 64$(减少$4^2=16$倍) 通道维度:$d=3 \rightarrow d=3 \times 16=48$ 可以看到如果是按照ViT的默认方式patch数量为32x32=1024,每个patch的维度为3x16x16=768,然后如果输入编码的$d_mode=512$那么经过线性投影变成矩阵(1024,512),即1024个token数量,每个token维度是512;而如果进行压缩后patch数量为8x8=64,每个patch的维度为48x16x16=12288,经过线性投影后变成(64,512)即64个token,每个维度是512。这里也可以看到原来是768降为到512,压缩的是从12288降维到512,降得比较猛,效果真的没有衰减吗? 总结一下smolvla在视觉token上进行了压缩,使用space-to-depth,对于512X512的图每帧token从1024降低到了64帧,如ViT的patch操作后得到的特征图维度为$\mathbb{R}^{\frac{H}{p} \times \frac{W}{p} \times d}$,选择下采样因子 $r$(整数)进行space-to-depth操作: $$ \mathbb{R}^{\frac{H}{p}\times\frac{W}{p}\times d} \xrightarrow{\text{S2D}_{r}} \mathbb{R}^{\frac{H}{pr}\times\frac{W}{pr}\times(d\cdot r^{2})} $$ 这样token数减少$r^{2}$倍,把细节挪到通道数去。 (3)动作专家交替Self-Attn与Cross-Attn 在动作专家中使用了交叉注意力机制,具体的排布可以配置。VLM的每一层与右边的Expert是一一对齐的,当然也可以配置Expert模型只有VLM层数的一半,每两层VLM才有一层Expert,那么其中VLM对齐层将为NONE,下图以VLM和Expert都为4层来示例交替注意力的实现。 Self-Attn(管自己,守时序):只允许第 i 步看 ≤i 的历史步(因果掩码),在动作序列内部传播动力学与约束,做轨迹的时间一致性与平滑。这一步相当于“内化刚才读到的证据”并让各步动作彼此协调。在计算注意力时,会将VLM的QKV与Expert的QKV进行拼接起来一起送入transformer计算,但通过掩码保证 VLM 的Q只看自己(不去读 Expert),而 Expert 的 Q 可以访问 VLM 的 K/V(即“读”VLM 语义),这样既提供了计算效率也提升了Expert的语义丰富性。 Cross-Attn(看环境,取证):看环境取证,让每个动作 token 先从条件特征里“读”一遍(条件=VLM中间层输出,含文本指令+多路视觉+状态)。这样动作表示一开始就被场景锚定,知道当下该关注哪里/哪件物体。具体是交叉注意力计算Q来自Expert Action自己,而K/V 来自 VLM 对应层的输出缓存。 训练 目标让动作专家 在观测条件 $o_t$ 下输出$v_\theta$速度场,把“噪声动作”沿路径推向真实动作块 $A_t$。这里跟Pi0和Flow Matching是一样大同小异,就简要说明一下。 观测条件:$o_t=H^{(N)}\in\mathbb{R}^{T\times d_o}\ \xrightarrow{\text{proj}}\ O_N\in\mathbb{R}^{T\times d_a}$(VLM 冻结;$O_N$ 作为 Cross-Attn 的 K/V)。 真实动作块:$A_t\in\mathbb{R}^{n\times d_{\text{act}}}$(建议标准化/白化)。 噪声:$\varepsilon\sim\mathcal N(0,I)$(同形)。 路径时间:$\tau\sim\mathrm{Beta}(\alpha,\beta)$。这里与Pi0不同。 路径与目标速度场: $$ A_t^{\tau}=\tau A_t+(1-\tau)\varepsilon,\left(A_t^{\tau}\mid A_t\right)=\varepsilon-A_t $$ 计算损失函数: $$ \mathcal{L}^{\tau}(\theta) = \mathbb{E}{p\left(\mathbf{A}{t} \mid \mathbf{o}{t}\right), q\left(\mathbf{A}{t}^{\tau} \mid \mathbf{A}{t}\right)} \left[ \left|| \mathbf{v}{\theta}\left(\mathbf{A}{t}^{\tau}, \mathbf{o}{t}\right) - \mathbf{u}\left(\mathbf{A}{t}^{\tau} \mid \mathbf{A}{t}\right) \right||^{2} \right] $$ 这里 $||\cdot||^{2}$ 表示欧氏范数平方;实现即逐元素 MSE。 为提升推理效率,动作专家隐藏宽度取 $d_a=0.75\times d$($d$ 为 VLM 的隐藏宽度)。 以下是一个简单的示例,可看看过程理解一下。 # 条件:取 VLM 第 N 层隐藏(冻结) with torch.no_grad(): H = vlm_hidden_at_layer_N(obs_tokens) # [T, d_o] KV = proj(H) # [T, d_a] 供 Cross-Attn 作 K/V(可缓存) # 构造路径与目标速度 A = sample_action_chunk() # [n, d_act] (已标准化) eps = torch.randn_like(A) # [n, d_act] tau = Beta(alpha, beta).sample(()).to(A.device) A_tau = tau * A + (1 - tau) * eps u = eps - A # 前向与损失 pred = v_theta(A_tau, KV) # 与 u 同形 loss = F.mse_loss(pred, u) # 对应 ||·||^2 loss.backward(); optimizer.step(); optimizer.zero_grad() 推理 在SmolVLA提到了异步推理,先来看看同步推理。 取最新观测 → 得到 $O_N$; 以噪声初始化,做 $K\approx10$ 步显式积分(Euler/Heun)得到一个动作块 $[a_t,\dots,a_{t+n-1}]$; 执行动作块 → 重复。 同步(sync)推理一次性生成长度为 $n$ 的动作队列(chunk)$A_t=\big[a_t,\dots,a_{t+n-1}\big]$,执行完再用新观测预测下一段。执行与推理串行,会产生“空窗”(执行停下等待推理)。而异步(async)推理是解耦“动作执行”和“动作预测”。机器人端持续消费现有队列;当队列余额低于阈值就异步把当前观测发给策略端预测“下一段”,回来后与旧队列重叠拼接。这样执行与推理并行,显著降低总时延,同时仍保持接近的成功率。 异步推理在架构上可以分为两个部分: RobotClient(机器人端):以控制周期 $\Delta t$ 持续下发队列头部动作;本地维护动作队列 $A_t$ 与触发逻辑;可做相似度过滤(见下节)。 PolicyServer(策略端):接收观测 $o_t$,运行策略 $\pi$ 预测新队列 $\tilde A_{t+1}$ 后返回;可放在更强的远端算力(GPU/工作站/云)。 看看论文中给出的算法实现: 设时域 $T$、段长 $n$、触发阈值 $g\in[0,1]$。 初始化:采集 $o_0$,发送到策略端,得到首段 $A_0\leftarrow\pi(o_0)$。 主循环 对 $t=0\dots T$:取出并执行一步 $a_t\leftarrow\text{PopFront}(A_t)$;若 队列余额占比 $\dfrac{|A_t|}{n}<g$,采集新观测 $o_{t+1}$;若 NeedsProcessing$(o_{t+1})$ 为真(见“相似度过滤”),则异步触发:①发送 $o_{t+1}$ 到策略端,得到新段 $\tilde A_{t+1}\leftarrow\pi(o_{t+1})$(异步返回);②用重叠拼接函数 $f(\cdot)$ 合并:$A_{t+1}\leftarrow f(A_t,\tilde A_{t+1})$;若本轮异步推理尚未结束:$A_{t+1}\leftarrow A_t$(继续消费旧队列)。 论文中的 NeedsProcessing 用于避免重复观测触发;$f$ 表示对重叠步的拼接(线性渐入/平滑器等,见下重叠拼接(Overlap & Merge))。 关键触发量,队列余额阈值 $g$: 触发条件:当 $\dfrac{|A_t|}{n}<g$ 时触发一次异步预测。 直觉:$g$ 越大,越提前触发,越不容易见底;但也会更频繁地调用策略端(算力/网络开销更高)。 论文的三个代表场景:$g=0$(顺序极限):耗尽队列才发起新预测 → 一定出现空窗等待;$g=0.7$(典型异步):每段大约消耗 $1-g=0.3$ 的比例就触发,计算摊在执行过程中,队列不见底;$g=1$(计算密集极限):步步都发观测 → 几乎“满队列”,反应最快但计算最贵(等同每个 tick 都前向一次)。 对于相似性过滤做法:主要动机是观测几乎不变时没必要反复调用服务器 → 降低抖动与无效请求。具体做法(论文)是用关节空间距离作为近似(例如欧式距离),若两次观测间距离 $<\varepsilon$(阈值,$\varepsilon\in\mathbb{R}^+$)则丢弃本次请求。兜底做法是若队列真的耗尽,则无论相似度如何都要处理最近的观测,以防停摆。 重叠拼接(Overlap & Merge):核心思想通过重叠区域平滑过渡避免硬切抖动,数学上实现是设旧队列尾部与新队列头部重叠 $w$ 步,对第 $k=0,\dots,w-1$ 步做线性渐入融合: $$ a_{t+k}^{\text{merge}} = \alpha_k \tilde{a}{t+1+k} + (1-\alpha_k) a{t+k}, \quad \alpha_k = \frac{k+1}{w} $$ 也可用余弦窗、Slerp 或在位姿/速度层加滤波器;关键是重叠 + 平滑避免硬切抖动。 总结一下对于异步并发处理有优势,但是需要处理其中的细节主要是: 维护动作队列。前台执行当前队列,后台异步预测下一段;在重叠窗口内平滑拼接新旧段。 避免队列见底的解析下界,设控制周期为 $\Delta t$,则避免队列耗尽的充分条件为 $$ g\ \ge\ \frac{\mathbb E[\ell_S]/\Delta t}{n} $$ 其中 $\ell_S$ 为一次(本地/远端)推理延迟,$\Delta t$ 控制周期,$n$ 为动作块长度。从触发到返回的时间内(平均 $\mathbb{E}[\ell_S]$ 秒)你还要有足够的剩余动作可执行(约 $\mathbb{E}[\ell_S]/\Delta t$ 个),所以触发点的剩余比例至少为这部分占 $n$ 的比值。论文配合给出真实控制频率示例(如 $30$ FPS $\to \Delta t=33,$ms),并分析了不同 $g$ 对队列曲线的影响(下图)。 数据 论文中提到的复现配置如下: 模型与输入:冻结 VLM,仅训动作专家;取 前半层 $N=\lfloor L/2\rfloor$ 的 $H^{(N)}$ → 投影成 $O_N$。图像 512×512;64 视觉 token/帧;状态→1 token;bfloat16。 动作块与解算: 每段 $n=50$;推理 10 步 Flow-Matching 积分。 优化:训练 200k step;global batch 256;AdamW($\beta_1=0.9,\ \beta_2=0.95$);余弦退火学习率 $1\times10^{-4}\to2.5\times10^{-6}$。 参数量: 总计 ≈450M;动作专家 ≈100M;若 VLM 有 32 层,取前 16 层。 论文中提到需要关注的信息: 模拟(LIBERO/Meta-World):中等规模(~0.45B)已对标/超过若干更大基线;放大到 ~2.25B 继续提升。 真实机器人(SO100/101):多任务平均成功率 ≈78%,优于 π0 与 ACT。 异步 vs 同步:成功率相近,但异步平均完成时间缩短 ~30%,固定窗口内完成次数显著更多。 论文中提到的落地经验: 形状与缓存:把 $H^{(N)}(T\times d_o)$ 投到 $d_a$ 后当 K/V;两次 Cross 复用 KV 缓存。 因果掩码:Self-Attn 必须用因果掩码(第 $i$ 步不可看未来)。 视觉压缩:优先用 space-to-depth 固定 64/帧;任务特别细腻时用 $r=2$(256 token)或多尺度/ROI 方案。 起步超参:$n=50$、积分步数 10、$N=\lfloor L/2\rfloor$、$d_a=0.75d$。 异步阈值:按 $g\ge\frac{\ell_S/\Delta t}{n}$ 设定,取略高于下界更稳;配合相似度过滤与重叠拼接。 动作归一化:对不同量纲(角/位移/速度)做标准化,训练更稳、积分不发散。 交替注意力有效:Cross + 因果 Self 明显优于单一注意力;“用前半层”普遍优于“直接换小 VLM”。 参考:https://arxiv.org/abs/2506.01844
- 
					
						  浅析Pi0 :VLM 与 Flow Matching 的结合之道概述 传统机器人策略模型往往局限在单一任务或平台,难以跨场景泛化。与此同时,大规模 视觉-语言模型(VLM) 已展现出卓越的语义理解与任务指令解析能力。如果能将 VLM 的语义理解能力 与 Flow Matching 的连续动作建模能力 结合,有望构建具备泛化与实时性的机器人通用控制器。 Pi0 (π0)正是这样一个探索:基于 PaliGemma(3B 参数 VLM) 作为感知与语义主干,结合 Flow Matching 动作生成器,实现语言到多机器人动作的端到端建模。它借鉴了大语言模型的“预训练 + 微调”范式,把互联网级别的语义知识和机器人操控数据结合起来,从而实现跨平台、跨任务的通用机器人控制。 我们此前分析了VLM、Flow Matching原理,掌握这些之后理解Pi0是非常简单的。 原理 结构 模型结构主要有VLM主干+ Action Expert动作专家构成。 VLM主干:基于 PaliGemma(一个 3B 参数的 VLM),继承互联网规模的图像+语言知识。 Action Expert(动作专家):额外的子网络,负责用 Flow Matching 方法预测连续动作向量。 模型的输入包括观测的多视角RGB图像、语言指令、机器人自身状态(关节角、传感器),经过模型处理后输出为高频动作序列(每秒50HZ动作chunk),这些动作控制单臂、双臂、移动操作臂等多类机器人。 训练 我们训练的目标是让$A_t^0 \sim \mathcal{N}(0, I)$ ——>$A_t$(真实动作),希望模型学会如何把一个“噪声动作”流动成一个真实的动作。就像扩散模型是“噪声 → 图像”,这里是“噪声动作 → 专家动作”。 在训练的时候要让噪声动作流向真实动作,我们需要构建一个路径,这里依旧使用的是直线路径。 $$ A_t^\tau = \tau A_t + (1-\tau)\epsilon, \quad \epsilon \sim \mathcal{N}(0,I) $$ 这个公式跟我们在Flow Matching文章中的训练公式是不是一样的。我们在噪声动作$\epsilon$和真实动作$A_t$之间,采样一个"插值点"。$\tau $表示时间的进度,当$\tau = 0$时完全是噪声,当$\tau = 1$时完全是真实动作,这个就构造了一条噪声到动作的直线路径。 我们的目标是要让模型告诉我们"从当前点$A_t$应该往哪个方向移动,才能逐渐靠近真实动作",因此就是在计算在每个时间速度。 $$ u(A_t^\tau \mid A_t) \triangleq \frac{d}{d\tau} A_t^\tau $$ 代入公式可得: $$ \frac{d}{d\tau} A_t^\tau = A_t - \epsilon $$ 而论文中成$u(A_t^\tau \mid A_t) = \epsilon - A_t$,只是方向约定相反,本质上没有差异。上面的公式,目标速度就是噪声 - 动作,它定义了“流动的方向”。就像在地图上,目标向量场就是指路的“箭头”。这样得到了真实的速度场,我们就可以在训练的时候计算损失了。 $$ L(\theta) = \mathbb{E}\big[ | v_\theta(A_t^\tau, o_t) - u(A_t^\tau \mid A_t) |^2 \big] $$ $v_\theta$是神经网络(Action Expert),输入 当前 noisy action + 观察$o_t$,输出预测的速度场。损失函数就是 预测的速度场 vs 真实的目标速度场 的均方误差 (MSE)。训练目标:让模型学会在任意中间点给出正确的“流动方向”。 推理 $$ A_t^{\tau+\delta} = A_t^\tau + \delta v_\theta(A_t^\tau, o_t) $$ 推理生成也比较简单,从噪声动作$A_t$开始,每次迭代一步:输入当前的$A_t^\tau $和观察的$o_t$,接着模型给出速度场,就沿着这个方向走一步(步长$\delta$),然后按照这个步骤重复迭代,最终得到真实的动作$A_t$。和扩散模型不同:这里不需要几十/上百步,只要 ~10 步 ODE 积分,就能得到高质量动作,适合机器人实时控制。 参考: https://arxiv.org/abs/2410.24164
- 
					
						  Flow Matching:让生成模型“流动”起来背景 上一篇文章分析了diffusion扩散模型。diffusion扩散模型做法是加噪声、再一步步去噪,训练过程复杂,还需要 carefully 设计噪声调度。 Flow Matching提出了更直接的方式:与其通过一大堆离散的“加噪/去噪”步骤,不如直接学习一个连续的流动 (flow),让点从噪声“顺滑地流动”到目标数据。 原理 把生成过程看作流体运动,想象有一堆水滴(噪声),通过一个力场,它们会被推动、流动,最后聚集成目标形状(真实分布的数据)。Flow Matching从物理学角度学一个"速度场",让数据点从"源分布(噪声)"流动到"目标分布(真实数据)"。 如图所示左边是源随机点云,中间是目标形状,右边是实际使用模型生成的形状。为了更直观的体会再来看下图从源分布逼近目标分布的过程。 左图就是源分布的点在不同时间应该朝那个方向运动直到最终的目标分布,右图是不同时刻让这些点应该往哪个方向进行流动速度场。 接下来看看数学怎么表示,我们希望从源分布$p_{src}$(比如高斯分布)按照流动的方式到目标分布$p_{data}$,那么方式就是在每个时间$t$为每个点$x$都指定一个速度$v_\theta(x,t)$,这样在不同时间就知道点该往哪里动,那么点的轨迹就完全确定了。在数学上点的位置$x(t)$随着时间变化,那就是速度场向量,即常微分方程 $$ \frac{dx}{dt} = v_\theta(x, t) $$ 左边的$\frac{dx}{dt}$描述的是随时间的变化率,右边$v_\theta(x, t)$就是我们要学习的"速度场",它给出"$t$时刻,位置$x$应该往哪里动"。 总结一下Flow Matching 里速度场写成 ODE,是因为它给出“点的位置随时间的变化率”,这正是常微分方程的定义,生成过程就是解 ODE,从噪声轨迹流到数据。 推理 模型要做的事情就是要预测出下一个时间刻应该往哪里走,输出是一个速度场;推理的过程就是解常微分方程ODE。 输入:当前位置$x \in \mathbb{R}^d$,当前时间$t \in [0,1]$。 输出:模型计算输出当前的速度向量,即$\frac{dx}{dt} = v_\theta(x,t)$。 更新:根据速度向量$v_\theta(x,t)$通过积分公式把所有时间段速度累积起来得到最终点$x(1)$。 $$ x(1) = x(0) + \int_{0}^{1} v_\theta(x(t),t)\,dt $$ 直观理解就是神经网络提供"切线方向",积分就是"把所有切线拼起来",形成完整的轨迹,从噪声走到目标分布。 但实际过程中我们用离散的数值方式,比如欧拉法,如下: 时间从$t$=$0$到$t$=$1$,分成若干小步(比如50或100步),在每一步按照上面公式更新。 输入:当前的位置$x_k$和当前时间步$t_k$。 输出: 模型预测计算速度向量场$v_\theta(x_k, t_k)$。 更新:通过欧拉法更新公式更新下一步位置$x_{k+1} = x_k + v_\theta(x_k, t_k)\Delta t$ 每一步模型计算出速度向量$v_\theta(x_k, t_k)$然后根据公式进行更新下一步的位置,新位置=旧位置+速度x时间步长;$v_\theta(x_k, t_k)\Delta t$计算每次迭代的移动距离(速度x时间),这就是基本的欧拉积分法,直观的意义是在短时间$\Delta t$内,点会沿着速度场方向前进一点。不断的进行多步迭代,从$x_0$出发,逐步得到$x_1$,$x_1$,$x_2$,$x_3$,....,$x_k$,当$k$=$K$时,$t_K$=$1$,就得到最终的$x(1)$。 怎么理解$t_k$、$x_k$、$\Delta t$? $t_k$是第$k$步对应的时间点,如果flow matching的时间区间是[0,1],我们把它切成$K$个小步(如50或100步),每个时间点就是$t0$=$0.00$,$t1$=$0.01$;$\Delta t$是时间步长如把时间区间[0,1]均匀分成100步,那么$\Delta t$=$1/100$=$0.01$;$x_k$是表示在$t_k$时的点(或点云),初始时从高斯噪音采样到。 下面再来一个直观图展示了Flow Matching推理的过程。 灰色箭头:代表速度场$v_\theta(x_k, t_k)$,告诉每个位置的点应该往哪里走。上图设定的目标是(2,2)。 绿色点:初始$x(0)$来自噪声分布即源分布。 红色叉:表示目标位置,代表数据分布的一个样本区域。 蓝色折现轨迹:数值积分结果,点一步一步验证速度场北推向目标。 训练 我们希望模型学会把源分布$p_{src}$流动到目标分布$p_{data}$;换句话说就是有$x_0 \sim p_{\text{src}}$,输出目标点$x_1 \sim p_{\text{data}}$我们要训练一个速度场网络$v_\theta(x_k, t_k)$,让它指导点$x_t$沿正确的路径从$x_0$——>$x_1$。 要训练行动轨迹需要知道真实轨迹这样才能和实际预测值做比较求损失,而训练的关键却正好是不知道真实的速度场。那如何构建训练的目标了?可以设计一个简单的"参考轨迹",如直线路径$x_0$——>$x_1$。 $$ x_t = (1 - t)x_0 + tx_1 $$ 给定输入样本$(x_0 \sim p_{\text{src}},x_1 \sim p_{\text{data}})$,其中$x_0$是源随机位置,$x_1$是目标位置。在训练的时候我们自己定义一条直线路径$x_0$——>$x_1$,我们不能一步到位,而是要有一个流动的过程。 这条直线路径上的真实速度公式对$t$求偏导,而恰巧速度是一个常数(始终指向目标点$x_1$)。 $$ u^\star = \frac{dx_t}{dt} = x_1 - x_0 $$ 既然速度方向就是一个常数$x1-x0$,为什么不直接一步把$x1$变成$x0$,而要搞成连续流动了? 如果一步到位公式就变成$x_1 = x_0 + (x_1 - x_0)$,相当于直接跳到目标点,完全不需要ODE、积分、网络。但问题在于训练时我们有配对的$(x_0, x_1)$,所以能写下$(x_1-x_0)$,而推理时了我们只有$x_0 \sim p_{\text{src}}$,并不知道该对应那个$x1$,因此不能一步到位,因为没有$x_1$可直接计算。 最后我们训练目标就是网络预测的速度$v_\theta(x_k, t_k)$,损失就网络预测的速度$v_\theta(x_k, t_k)$与真实的速度$x_1-x_0$的均方误差。训练完成之后,网络就学会了在任何位置$x_t$、时间$t$给出正确的速度场。 $$ \mathbb{E}\Big[ || v_\theta(x_t, t) - (x_1 - x_0) ||^2 \Big] $$ 源码示例 为了加深理解,程序实现一个最小的 Conditional Flow Matching(直线路径的 Rectified Flow)示例,学习时间条件速度场 vθ(x,t),把二维标准高斯源分布推到左右两个高斯簇的目标分布。训练后输出两张图:训练损失曲线 cfm_loss.png,以及三联静态图 cfm_overview.png(源/目标/生成)。 # -*- coding: utf-8 -*- # Flow Matching demo: source N(0,I) -> target: Two Gaussians (left & right) # 输出: # 1) cfm_loss.png(训练损失) # 2) cfm_overview.png(三联图:Source / Target / Generated) # 依赖:pip install torch matplotlib import time, warnings warnings.filterwarnings("ignore", category=UserWarning, module="matplotlib") import numpy as np import torch, torch.nn as nn, torch.optim as optim import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt # ------------------------- 配置 ------------------------- device = torch.device("cuda" if torch.cuda.is_available() else "cpu") torch.manual_seed(0) XLIM = (-4.0, 4.0) YLIM = (-3.0, 3.0) # ------------------------- 数据分布 ------------------------- def sample_source(n): return torch.randn(n, 2, device=device) def sample_target(n): sigma = 0.35 means = torch.tensor([[-2.0, 0.0], [2.0, 0.0]], device=device) idx = torch.randint(0, 2, (n,), device=device) mu = means[idx] return mu + sigma * torch.randn(n, 2, device=device) # ------------------------- 模型:速度场 v_theta(x,t) ------------------------- class VelocityNet(nn.Module): def __init__(self, h=64): super().__init__() self.net = nn.Sequential( nn.Linear(3, h), nn.ReLU(), nn.Linear(h, h), nn.ReLU(), nn.Linear(h, 2), ) def forward(self, x, t): return self.net(torch.cat([x, t], -1)) # ------------------------- 训练(CFM,直线路径) ------------------------- def train_cfm(steps=2000, batch=512, lr=1e-3): net = VelocityNet().to(device) opt = optim.Adam(net.parameters(), lr=lr) loss_hist = [] t0 = time.time() for s in range(1, steps + 1): x0 = sample_source(batch) x1 = sample_target(batch) t = torch.rand(batch, 1, device=device) xt = (1 - t) * x0 + t * x1 u = x1 - x0 pred = net(xt, t) loss = ((pred - u)**2).mean() opt.zero_grad(set_to_none=True) loss.backward(); opt.step() loss_hist.append(float(loss)) if s % 200 == 0: print(f"[{s}/{steps}] loss={loss:.4f}") print(f"Train time: {time.time() - t0:.2f}s") return net, loss_hist # ------------------------- 采样(生成轨迹) ------------------------- @torch.no_grad() def generate_traj(net, n=3000, steps=60): x = sample_source(n) dt = 1.0 / steps traj = [x.cpu().numpy()] for k in range(steps): t = torch.full((n,1), (k + 0.5) * dt, device=device) x = x + net(x, t) * dt traj.append(x.cpu().numpy()) return traj # ------------------------- Matplotlib 工具 ------------------------- def save_loss(loss_hist, path): plt.figure(figsize=(6, 3.6)) plt.plot(loss_hist) plt.title("Training Loss (CFM)") plt.xlabel("step"); plt.ylabel("MSE") plt.tight_layout(); plt.savefig(path, dpi=140); plt.close() print(f"Saved {path}") def save_overview(src, tgt, gen, path): fig, axes = plt.subplots(1, 3, figsize=(12, 4)) titles = ["Source (Noise)", "Target (Two Gaussians)", "Generated (Flow Matching ODE)"] for ax, title, pts in zip(axes, titles, [src, tgt, gen]): ax.scatter(pts[:, 0], pts[:, 1], s=5, alpha=0.75) ax.set_title(title) ax.set_xlim(*XLIM); ax.set_ylim(*YLIM) ax.set_xticks([]); ax.set_yticks([]) plt.tight_layout(); plt.savefig(path, dpi=140); plt.close() print(f"Saved {path}") # (已移除 GIF 相关工具与依赖) # ------------------------- 主程序 ------------------------- if __name__ == "__main__": # 训练 net, loss_hist = train_cfm(steps=2000, batch=512, lr=1e-3) save_loss(loss_hist, "cfm_loss.png") # 数据与生成 src = sample_source(3000).cpu().numpy() tgt = sample_target(3000).cpu().numpy() traj = generate_traj(net, n=3000, steps=60) gen = traj[-1] # 三联静态图 save_overview(src, tgt, gen, "cfm_overview.png") # (已移除 GIF 生成步骤) print("All done.") (1)模型结构 模型结构为VelocityNet,使用了一个小型 MLP,输入 3 维(x 的 2 维 + t 的 1 维),输出 2 维速度向量。结构为Linear(3,64) → ReLU → Linear(64,64) → ReLU → Linear(64,2)。forward(x,t) 直接拼接 [x, t] 后送入网络。这里没有使用时间位置编码。 (2)训练过程 训练函数为train_cfm(steps=2000, batch=512, lr=1e-3),具体过程如下: 1) 每步采样源 x0 ~ source 和目标 x1 ~ target,独立均匀采样 t~U(0,1)。 2) 构造直线桥接点 xt = (1 - t)x0 + tx1。 3) 定义理想恒定速度 u = x1 - x0(常速,不依赖 t)。 4) 让网络在 (xt, t) 上预测 pred = vθ(xt,t),用 MSE(pred, u) 作为损失。 5) Adam 更新一次;每 200 步打印当前损失。 6) 返回训练好的 net 与 loss_hist。 直观理解,虽然 u 依赖 (x0, x1),但模型只观察 (xt,t)。训练学到的是条件期望 E[x1 - x0 | xt, t],也就是让网络在直线路径上学会把点往“正确方向”推的平均速度。这是直线路径 CFM 的核心思想。 (3)采样 采样函数为generate_traj,从源分布采样 n 个起点,设步长 dt=1/steps。用无梯度模式按欧拉法更新:对每步 k,用中点时间 t=(k+0.5)dt 预测速度 vθ(x,t),然后 x ← x + vθ(x,t)dt。记录每一步的点云到列表,返回整个轨迹(列表元素是 numpy 数组)。主程序中只使用最后一步作为“生成结果”。 (4)主流程 最后就是主流程先调 train_cfm 进行训练,保存 cfm_loss.png。分别采样 3000 个源样本 src 与目标样本 tgt。生成 n=3000、steps=60 的轨迹 traj,并取 gen = traj[-1] 作为最终生成样本。保存 cfm_overview.png,展示源/目标/生成的对比。 整体主要的实现点为 目标路径:x_t = (1 - t) x0 + t x1,直线连接源与目标。 理想速度:dx/dt = x1 - x0,使点沿直线以恒定速度匀速前进。 学习目标:在 (xt,t) 上回归 u = x1 - x0 的条件期望;推断时只需网络与当前状态,无需知道具体的 x0 或 x1。 数值积分:使用欧拉法简单高效;采用中点时间能略微减小离散误差。
- 
					
						  Diffusion:如何从噪声中生成清晰图像概述 图像生成是当下研究的热点,diffusion是一种人工智能领域图像生成的基础模型,当下Stable diffusion、DALL·E、MidJourney文生图模型的基座都使用了diffusion。 diffusion扩散模型属于生成式模型,生成图像不是正向从0到1构成图像而是反向的预先生成一个随机的噪声图中然后根据文本提示词逐渐的去噪"扣"出图像。主要思想是先训练一个权重模型,把一张清晰照片弄得越来越模糊(加入噪声),然后把模糊的图片融合文本提示词作为输入去训练一个模型学会“擦亮它”,反向恢复成清晰图像。训练完成后,就得到了模型的权重,那么使用这个权重模型只要给一副完全随机的“噪点图”和要生成图片的提示词,它就能一步步去掉噪声,变出一幅崭新、逼真的图片。 借用米开朗基罗雕刻"大卫像"时说的"我在大理石中看见天使,于是我不停地雕刻,直至使他自由”。而diffusion也是这样的原理,通过随机生成的一个噪声图片,结合输入的文字去掉噪音恢复到你想象的照片样子。 工作原理 推理 (1)输入阶段 输入阶段有3个输入信息,分别是随机噪声图像、文本提示、时间步。 随机噪声图像:最开始随机生成一个高斯噪声的图片。 文本提示:告诉模型,想要生成的内容是什么。 时间步:指明当前是去噪第几步,模型是一个多步迭代去噪的过程。按照数字依次递减进行迭代,数值越小去噪强度越弱。 (2)模型处理 核心组件是Noise Predictor(一般是一个U-Net结构神经网络),输入的带噪图像$X_{t}$、时间步$t$、以及提示文本通过Noise Predictor预测出这张图里有多少噪声,生成一张噪声图片$\epsilon^\theta(x_t, t, c)$。 (3)输出阶段 将输入-减去预测出的噪声图片就得到最后的去噪图片了,$x_{t-1} = x_t - \epsilon^\theta(x_t, t, c)$。 (4)迭代 迭代一轮得到一个降噪图片之后,接着将输出的降噪图片作为输入的带噪图片按照之前的步骤进行重复,直至$t$=$T$(比如$1000$)一直迭代到$t$=$0$得到最终的图像。当所有步骤完成后,随机噪声逐渐被“洗掉”,生成的就是一张符合条件描述的清晰图像。 下面是推理过程的算法伪代码 初始化:$x_T \sim \mathcal{N}(0, I)$从标准高斯分布中采样一个随机噪声向量(或噪声图像),作为生成过程的起点。 迭代循环:从$t$=$T$到$t$=$1$逐步迭代,每次去掉一部分噪声。如果$t$>$1$,额外采样一个噪声向量$z\sim \mathcal{N}(0, I)$。如果$t$=$1$,则$z$=$0$,即最后一步不加噪声。 核心公式:先去掉预测的噪声(括号里面的部分)得到更接近干净数据的样子,接着在进行缩放调整(除以$\sqrt{\alpha_t}$),最后加一点随机噪声$\sigma_t z$来保持生成的多样性。 输出:当循环结束时,最终的$x_0$就是最终生成的清晰图像了。 对于核心公式的参数这里稍微补充一下 参数 $\epsilon_\theta(x_t, t)$是预测的噪声; 参数$\alpha_t$取值范围是$0$~$1$,控制在第$t$步中保留多少原始图像信息加入多少噪声,当$\alpha_t$接近$1$时几乎保留全部信息,噪声小;当值趋于0时,原始信号衰减就大,噪声比例高; 参数$\bar{\alpha}_t$累积乘积参数,表示从第$1$步到$t$步累积保留原始信息的比例。 参数$\sigma_t z$随机扰动项,保持采样的多样性。 训练 训练模型我们需要把模型的输出结果和真实值进行比较才能进行梯度下降找到网络权重,那该如何设计准备训练结果和真实值的数据? diffusion模型的核心是要预测出图片的噪声分布然后减去预测的噪声得到真实的输出照片。以上图第一步进行说明,使用原始的图片,通过随机生成一个噪声图($x_{1}$)迭加作用到原始图片上这样就得到了模型的带噪声的输入图像,然后融合文本、时间步模型前向计算得到噪声图($x_2$)。已经知道了真实的噪声图是$x_{1}$,那么计算$x_{1}$和$x_{2}$的相似性就可以计算出损失了。 训练过程中关于图片-文本可以从Lion平台上获取,通过上面步骤取样照片然后不断加强噪声得到越来越模糊的图片送入模型预测进行计算迭代权重,让模型学会真正准确预测每一步中"加进去的噪声",训练完成之后,模型学会了如何"识别噪声",在推理时就从纯随机噪声$x_T$出发,通过文本提示词反向迭代去噪得到最终的想要的照片。 论文中的伪代码如下: repeat:表示循环执行训练过程。 采样数据:$x_0 \sim q(x_0)$从真实数据分布$q(x_0)$中采样一个训练样本比如一张猫。 随机采样时间步:$t \sim \text{Uniform}({1, \dots, T})$随机挑选一个扩散的时间步$t$,确保模型能在不同噪声水平都学会去噪。 采样噪声:$\epsilon \sim \mathcal{N}(0, I)$从标准的高斯分布中采样一份噪声,用于后续得到到原始图片上。 梯度下降更新参数:计算预测噪声和真实噪声$\epsilon$的均方误差。 模型 本章节简要说一下业界文生图模型,其结构可以总结为以上3个部分,文本编码器、生成式模型、解码器。 文本编码器:将用户输入的文本提示通过预训练的文本编码器如CLIP Text Encoder将自然语言转化为向量表示。 生成式模型:将编码的文本向量和噪声图像noisy latent作为输入,然后逐步迭代去噪。这里的模型如有diffusion、autoregressive等。输出是压缩到更低维的"潜在空间"。 解码器:将生成式模型的输入Latten Representation通过解码器还原最终生成清晰图像。生成式模型一般输出的是压缩的低维潜在空间,这样可以降低每一步迭代的计算量,最终加一个解码器来将其还原。 下面是stable diffusion、DALL-E、Imagen的模型结构图,核心组成都是上面3个部分,这里就不过多阐述了。 stable diffusion DALL-E Imagen 本文主要来自李宏毅Diffusion Model原理解析的笔记。