最新文章
-
ubuntu系统xiaozhi server本地部署
简介 本文主要是记录在ubuntu系统从零源码的方式本地部署小智Ai服务端的过程,项目的地址为:xiaozhi-server。在部署之前简单了解一下其项目框架,这里总结可以分为3部分:manager-web、manager-api、xiaozhi-server,这3部分的运行是互相独立的,相互之间通过http rest api的方式进行访问,如下图: manager-web: 前端控制台(Vue)。管理员用浏览器操作;调用后端接口,不直接连设备。 manager-api: 后端管理服务(Java Spring Boot)。负责用户/设备/模型/参数/OTA/激活等业务,对外提供 REST API;对数据库(MySQL)与缓存(Redis)读写。 xiaozhi-server: 实时语音与智能体服务(Python)。负责 WebSocket 连接、ASR/LLM/TTS、工具/视觉接口;启动时向 manager-api 拉取配置、运行时上报对话。 3个组件分别使用了不用的语言环境,其中manager-web使用的是Vue.js,而manager-api使用的是java spring boot,xiaozhi-server使用的是python。因此需要装3个不同的语言环境。同时对于后端manager-api需要对数据进行存储,因此还需要安装mysql、redis。下面就围绕这3部分进行展开。 先在本地拉取一份代码: git clone https://github.com/xinnan-tech/xiaozhi-esp32-server.git 值得注意的时,xiaozhi server最简化版本安装,只需要安装xiaozhi-server即可,简化版部署见后续章节。 manager-api安装 数据库安装 由于后端的数据管理需要用到数据库,因此需要安装mysql、redis。 (1)mysql安装 # 安装MySQL sudo apt update sudo apt install -y mysql-server # 启动MySQL服务 sudo systemctl start mysql sudo systemctl enable mysql # 创建数据库 sudo mysql -e "CREATE DATABASE xiaozhi_esp32_server CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;" # 创建用户并指定认证方式(关键改动) sudo mysql -e "CREATE USER 'xiaozhi'@'localhost' IDENTIFIED WITH mysql_native_password BY 'xiaozhi123';" # 授权 sudo mysql -e "GRANT ALL PRIVILEGES ON xiaozhi_esp32_server.* TO 'xiaozhi'@'localhost';" sudo mysql -e "FLUSH PRIVILEGES;" mysql数据库安装后,同时也创建了用户和密码,分别是xiaozhi和xiaozhi123,这个后续需要填充到manaer-api的配置文件中,以便manager-api可以访问。 (2)安装Redis # 安装Redis sudo apt install -y redis-server # 启动Redis服务 sudo systemctl start redis-server sudo systemctl enable redis-server # 检查Redis状态 redis-cli ping Spring boot环境安装 因为后端程序manager-api使用的是java spring boot,因此需要安装java的运行环境。官方提示安装JDK21和Maven,前者是java的运行环境,后者是java项目管理工具。 # 安装JDK 21 sudo apt install -y openjdk-21-jdk # 设置JAVA_HOME环境变量 echo 'export JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64' >> ~/.bashrc echo 'export PATH=$JAVA_HOME/bin:$PATH' >> ~/.bashrc source ~/.bashrc # 验证Java安装 java -version # 安装Maven sudo apt install -y maven # 验证Maven安装 mvn -version 配置数据库 数据库和java环境安装好后,就可以配置java spring boot与数据库的连接了。 在xiaozhi-esp32-server/main/manager-api/src/main/resources/application-dev.yml中配置数据库连接信息 @@ -13,8 +13,8 @@ spring: #MySQL driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/xiaozhi_esp32_server?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&nullCatalogMeansCurrent=true - username: root - password: 123456 + username: xiaozhi + password: xiaozhi123 initial-size: 10 max-active: 100 min-idle: 10 在xiaozhi-esp32-server/main/src/main/resources/application-dev.yml中配置Redis连接信息(redis默认配置好了,不用改) spring: data: redis: host: localhost port: 6379 password: database: 0 编译运行 配置好对数据库的连接后,就可以进行编译了。 # 进入manager-api目录 cd xiaozhi-esp32-server/main/manager-api # 编译项目 mvn clean package -DskipTests # 编译完成后的jar包位置 ls -lh target/*.jar 编译完成之后,就可以运行项目了。 java -jar target/xiaozhi-esp32-api.jar --spring.profiles.active=dev 运行如果没有什么报错就说明启动成功了。 manager-web安装 安装node.js 由于前端使用的的是vue.js,所以需要安装node.js环境。 # 安装Node.js 20 curl -fsSL https://deb.nodesource.com/setup_20.x | sudo -E bash - sudo apt install -y nodejs # 验证Node.js安装 node -v npm -v 安装依赖 node.js环境安装好后,就可以安装manager-web的依赖了。 # 进入manager-web目录 cd xiaozhi-esp32-server/main/manager-web # 安装依赖 npm install 启动 切换到manager-web路径下,就可以运行服务程序了。 cd xiaozhi-esp32-server/main/manager-web npm run serve 启动成功之后,就可以访问后台了。登陆地址:http://127.0.0.1:8001,登陆后进行注册一个用户就可以进入到后台进行配置了。 配置模型api key 要让设备能够访问,需要配置模型的api key,登陆到智普的后台,注册获取一个api key。 这里使用的是智谱ai,注册一个账户,然后申请一个api key 然后登陆智控台配置密钥。 xiaozhi-server安装 conda python环境 # 下载并安装miniconda wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh bash Miniconda3-latest-Linux-x86_64.sh -b -p $HOME/miniconda3 # 初始化conda $HOME/miniconda3/bin/conda init bash source ~/.bashrc # 创建Python环境 conda remove -n xiaozhi-esp32-server --all -y conda create -n xiaozhi-esp32-server python=3.10 -y # 激活环境 conda activate xiaozhi-esp32-server # 添加清华源 conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge # 安装必要的系统库 conda install -y libopus ffmpeg libiconv python依赖包 # 进入xiaozhi-server目录 cd xiaozhi-esp32-server/main/xiaozhi-server # 设置pip镜像源 pip config set global.index-url https://mirrors.aliyun.com/pypi/simple/ # 安装Python依赖 pip install -r requirements.txt 下载语音模型 # 进入models目录 cd xiaozhi-esp32-server/main/xiaozhi-server/models # 下载模型文件(推荐阿里云镜像) cd SenseVoiceSmall wget https://modelscope.cn/models/iic/SenseVoiceSmall/resolve/master/model.pt # 验证文件 ls -lh model.pt 配置密钥 配置密钥主要是xiaozhi-server与manager-api交互时需要进行认证,因此需要先获取密钥。 (1)先在本地创建配置文件 # 创建data目录 cd xiaozhi-esp32-server/main/xiaozhi-server mkdir -p data # 复制配置文件 cp config_from_api.yaml data/.config.yaml # 编辑配置文件 vim data/.config.yaml 配置.config.yaml: manager-api: url: http://127.0.0.1:8002/xiaozhi secret: 待会从智控台获取 server: websocket: ws://你的IP:8000/xiaozhi/v1/ (2)然后登陆智控台获取密钥 访问智控台:http://127.0.0.1:8001 注册账号(第一个为超级管理员) 登录 → 参数管理 → 找到 server.secret 并复制 回到xiaozhi-server配置: vim xiaozhi-esp32-server/main/xiaozhi-server/data/.config.yaml 设置为 manager-api: url: http://127.0.0.1:8002/xiaozhi secret: 你刚才复制的server.secret值 启动服务 cd xiaozhi-esp32-server/main/xiaozhi-server conda activate xiaozhi-esp32-server python app.py 执行成功的话应该是下面这样 (xiaozhi-esp32-server) liumingyuan@HP-ProBook:~/xiaozhi-esp32-server/main/xiaozhi-server$ python app.py 从API读取配置 251029 20:47:34[0.8.5-00000000000000][core.providers.vad.silero]-INFO-SileroVAD 251029 20:47:34[0.8.5-00000000000000][core.utils.modules_initialize]-INFO-初始化组件: vad成功 VAD_SileroVAD 251029 20:47:38[0.8.5-00000000000000][core.providers.asr.fun_local]-INFO-funasr version: 1.2.3. 251029 20:47:38[0.8.5-00000000000000][core.utils.modules_initialize]-INFO-ASR模块初始化完成 251029 20:47:38[0.8.5-00000000000000][core.utils.modules_initialize]-INFO-初始化组件: asr成功 ASR_FunASR 251029 20:47:38[0.8.5-00000000000000][__main__]-INFO-视觉分析接口是 http://10.0.90.104:8003/mcp/vision/explain 251029 20:47:38[0.8.5-00000000000000][__main__]-INFO-Websocket地址是 ws://10.0.90.104:8000/xiaozhi/v1/ 251029 20:47:38[0.8.5-00000000000000][__main__]-INFO-=======上面的地址是websocket协议地址,请勿用浏览器访问======= 251029 20:47:38[0.8.5-00000000000000][__main__]-INFO-如想测试websocket请用谷歌浏览器打开test目录下的test_page.html 251029 20:47:38[0.8.5-00000000000000][__main__]-INFO-============================================================= 服务访问地址: 智控台:http://127.0.0.1:8001 API文档:http://127.0.0.1:8002/xiaozhi/doc.html WebSocket:ws://127.0.0.1:8000/xiaozhi/v1/ OTA接口:http://127.0.0.1:8002/xiaozhi/ota/ 配置websocket和OTA 由于是全模块部署,所以需要登陆智能控台,设置ota和websocket的接口,需要注意的是weboscket的启动必须是要等xiaozhi-server app启动才能设置。 OTA接口:http://你电脑局域网的ip:8002/xiaozhi/ota/ Websocket接口:ws://你电脑局域网的ip:8000/xiaozhi/v1/ 请你务必把以上两个接口地址写入到智控台中:他们将会影响websocket地址发放和自动升级功能。 1、使用超级管理员账号,登录智控台,在顶部菜单找到参数管理,找到参数编码是server.websocket,输入你的Websocket接口。 2、使用超级管理员账号,登录智控台,在顶部菜单找到参数管理,找到数编码是server.ota,输入你的OTA接口。 简化部署 所谓简化部署就是只跑xiaozhi-server,前后端都不跑。简化部署先参考"xiaozhi-server安装"章节,然后再次基础上进行配置文件即可。与完整部署xiaozhi-server部分唯一的区别就是配置文件不一样。如果要对接前后端使用的默认文件是config_from_api.yaml而如果是简化部署使用的默认文件是config.yaml。 下面是配置步骤。 cd xiaozhi-esp32-server/main/xiaozhi-server/data cp .config.yaml .config.yaml_back #对云端的配置作个备份 cp ../config.yaml .config.yaml #拷贝默认的配置 设置API key 测试 本地服务搭建好好后可以进行测试验证,可以使用xiaozhi-server自带的test程序,也可以使用开源的客户端py-xiaozhi,或者直接搭建esp32的设备接入。这里先用前面两者方式。 xiaozhi-server test cd xiaozhi-esp32-server/main/xiaozhi-server/test python -m http.server 8006 然后网页登陆:http://localhost:8006/test_page.html py-xiaozhi git clone https://github.com/huangjunsen0406/py-xiaozhi.git sudo apt-get update && sudo apt-get install -y portaudio19-dev libportaudio2 conda create -n py-xiaozhi-client python=3.10 conda activate py-xiaozhi-client 配置完成之后就可以执行应用获取到设备验证码之后,登陆绑定即可进行对话。 python main.py --protocol websocket -
Lekiwi驱动链路分析
系统架构 硬件组成 Lekiwi是一个底盘+机械臂的结构。 机械臂: 6个自由度(shoulder_pan, shoulder_lift, elbow_flex, wrist_flex, wrist_roll, gripper) 移动底盘:3个全向轮,三轮全向移动(left_wheel, back_wheel, right_wheel) github官网Lekiwi使用 Koch v1.1 机械臂、U2D2 电机控制器和 Dynamixel XL430 电机作为移动基座。我这里买到的使用的是feetech电机,机械臂和底盘一共9个motor接入到一个串口总线上,对于机械臂和底盘移动只需要通过一个串口总线进行。 软件架构 我这里将软件架构分为3层。 应用层:对设备的操作,实例化设备一个设备后,对设备进行连接,移动控制,观测数据获取。 总线层:实现一个MotorBus基类,对设备的一些操作进行统一定义、约束。实现操作的逻辑,具体的实现由继承设备来实现。 设备层:具体设备的实现,继承与MotorBus,实现对电机底层通信接口。 感觉这个框架不是很合理,主要的框架设计是设备层继承总线层的基类,继承相当于是扩展,应用层没有起到屏蔽设备的作用。在应用层要创建一个设备实例如self.bus = FeetechMotorsBus(),而FeetechMotorsBus继承MotorsBus,应用层直接操作FeetechMotorsBus实例,没有达到屏蔽底层的效果。可以完全借鉴Linux设备驱动模型,分为设备、总线、驱动(应用),设备注册后,总线负责匹配设备和驱动,完全隔离,不用关系底层的设备是什么。 驱动链路 初始化 class LeKiwi(Robot): config_class = LeKiwiConfig name = "lekiwi" def __init__(self, config: LeKiwiConfig): super().__init__(config) self.config = config norm_mode_body = MotorNormMode.DEGREES if config.use_degrees else MotorNormMode.RANGE_M100_100 self.bus = FeetechMotorsBus( port=self.config.port, motors={ # arm "arm_shoulder_pan": Motor(1, "sts3215", norm_mode_body), "arm_shoulder_lift": Motor(2, "sts3215", norm_mode_body), "arm_elbow_flex": Motor(3, "sts3215", norm_mode_body), "arm_wrist_flex": Motor(4, "sts3215", norm_mode_body), "arm_wrist_roll": Motor(5, "sts3215", norm_mode_body), "arm_gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100), # base "base_left_wheel": Motor(7, "sts3215", MotorNormMode.RANGE_M100_100), "base_back_wheel": Motor(8, "sts3215", MotorNormMode.RANGE_M100_100), "base_right_wheel": Motor(9, "sts3215", MotorNormMode.RANGE_M100_100), }, calibration=self.calibration, ) self.arm_motors = [motor for motor in self.bus.motors if motor.startswith("arm")] self.base_motors = [motor for motor in self.bus.motors if motor.startswith("base")] self.cameras = make_cameras_from_configs(config.cameras) 继承Robot,指定了配置类为LeKiwiConfig其定义了uart的端口、相机的编号等信息。将角度统一归一化到[-100,100],创建Feetech电机总线实例,创建Feetech电机型号sts3215,配置了机械臂电机ID为1至6,底盘编号为7至9编号。同时对相机也进行了初始化,用于后续的视觉观测。 连接 robot.connect()————> def connect(self, calibrate: bool = True) -> None: if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} already connected") self.bus.connect() if not self.is_calibrated and calibrate: logger.info( "Mismatch between calibration values in the motor and the calibration file or no calibration file found" ) self.calibrate() for cam in self.cameras.values(): cam.connect() self.configure() logger.info(f"{self} connected.") 初始化完成之后即可进行发起连接,连接主要是根据指定的串口好进行打开,然后进行握手验证,ping所有配置的电机,检查校准文件与电机的状态,并设置PID、加速度等参数。 校准 def calibrate(self) -> None: if self.calibration: # Calibration file exists, ask user whether to use it or run new calibration user_input = input( f"Press ENTER to use provided calibration file associated with the id {self.id}, or type 'c' and press ENTER to run calibration: " ) if user_input.strip().lower() != "c": logger.info(f"Writing calibration file associated with the id {self.id} to the motors") self.bus.write_calibration(self.calibration) return logger.info(f"\nRunning calibration of {self}") motors = self.arm_motors + self.base_motors self.bus.disable_torque(self.arm_motors) for name in self.arm_motors: self.bus.write("Operating_Mode", name, OperatingMode.POSITION.value) input("Move robot to the middle of its range of motion and press ENTER....") homing_offsets = self.bus.set_half_turn_homings(self.arm_motors) homing_offsets.update(dict.fromkeys(self.base_motors, 0)) full_turn_motor = [ motor for motor in motors if any(keyword in motor for keyword in ["wheel", "wrist_roll"]) ] unknown_range_motors = [motor for motor in motors if motor not in full_turn_motor] print( f"Move all arm joints except '{full_turn_motor}' sequentially through their " "entire ranges of motion.\nRecording positions. Press ENTER to stop..." ) range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors) for name in full_turn_motor: range_mins[name] = 0 range_maxes[name] = 4095 self.calibration = {} for name, motor in self.bus.motors.items(): self.calibration[name] = MotorCalibration( id=motor.id, drive_mode=0, homing_offset=homing_offsets[name], range_min=range_mins[name], range_max=range_maxes[name], ) self.bus.write_calibration(self.calibration) self._save_calibration() print("Calibration saved to", self.calibration_fpath) 首次校准,从归零到测量范围,生成并保存文件流程,后续使用按回车复用已有校准;输入 'c' 重新校准。可以总结为如下: LeKiwi.calibrate() → 用户选择(使用已有/重新校准) → 归零设置 → 运动范围测量 → 校准参数构建 → FeetechMotorsBus.write_calibration() → MotorsBus.write_calibration() → 逐个写入电机寄存器 校准主要是限定几个参数,如归零偏移、运动范围、驱动模式。校准的流程是由用户选择使用已经有的校准文件直接写入校准还是进行重新启动校准流程校准。 动作执行 def send_action(self, action: dict[str, Any]) -> dict[str, Any]: if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") arm_goal_pos = {k: v for k, v in action.items() if k.endswith(".pos")} base_goal_vel = {k: v for k, v in action.items() if k.endswith(".vel")} base_wheel_goal_vel = self._body_to_wheel_raw( base_goal_vel["x.vel"], base_goal_vel["y.vel"], base_goal_vel["theta.vel"] ) # Cap goal position when too far away from present position. # /!\ Slower fps expected due to reading from the follower. if self.config.max_relative_target is not None: present_pos = self.bus.sync_read("Present_Position", self.arm_motors) goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in arm_goal_pos.items()} arm_safe_goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target) arm_goal_pos = arm_safe_goal_pos # Send goal position to the actuators arm_goal_pos_raw = {k.replace(".pos", ""): v for k, v in arm_goal_pos.items()} self.bus.sync_write("Goal_Position", arm_goal_pos_raw) self.bus.sync_write("Goal_Velocity", base_wheel_goal_vel) return {**arm_goal_pos, **base_goal_vel} 输入为动作的序列,输出为实际发送的动作。首先将机械臂(后缀.pos)目标位置和底盘目标速度(后缀.vel)进行分离,如下: # 分离前 action = { "arm_shoulder_pan.pos": 45.0, # 机械臂位置 "arm_elbow_flex.pos": -30.0, "x.vel": 0.1, # 底盘速度 "y.vel": 0.0, "theta.vel": 0.05 } # 分离后 arm_goal_pos = { "arm_shoulder_pan.pos": 45.0, "arm_elbow_flex.pos": -30.0 } base_goal_vel = { "x.vel": 0.1, "y.vel": 0.0, "theta.vel": 0.05 } 然后调用_body_to_wheel_raw进行底盘运动学转换,输入为底盘坐标系速度 (x, y, θ),输出为三轮电机速度指令。转换的时候需要进行安全限制,实际获取机械臂的关节位置,然后计算步幅(目标位置-当前位置),超过max_relative_target 则裁剪使用安全的目标位置。 最后就是调用sync_write分别写入控制机械臂和底盘。下面总结一下流程: LeKiwi.send_action(action) → 动作分离(机械臂位置 + 底盘速度) → 底盘运动学转换 → 安全限制检查 → FeetechMotorsBus.sync_write() → MotorsBus._sync_write() → 批量写入电机寄存器 对Lekiwi的控制,由于底盘是3个万向轮,所以需要进行运动学转换,现将机械臂和底盘进行分离,计算之处底盘坐标系转换的3轮速度,在确保安全限制的条件下,调用sync_write进行写入,sync_write是同时写入多个电机。 状态读取 def get_observation(self) -> dict[str, Any]: if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") # Read actuators position for arm and vel for base start = time.perf_counter() arm_pos = self.bus.sync_read("Present_Position", self.arm_motors) base_wheel_vel = self.bus.sync_read("Present_Velocity", self.base_motors) base_vel = self._wheel_raw_to_body( base_wheel_vel["base_left_wheel"], base_wheel_vel["base_back_wheel"], base_wheel_vel["base_right_wheel"], ) arm_state = {f"{k}.pos": v for k, v in arm_pos.items()} obs_dict = {**arm_state, **base_vel} dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read state: {dt_ms:.1f}ms") # Capture images from cameras for cam_key, cam in self.cameras.items(): start = time.perf_counter() obs_dict[cam_key] = cam.async_read() dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") return obs_dict 首先调用sync_read分别读取机械臂位置和底盘的当前速度,然后调用底盘速度逆运动学转换(三轮电机->底盘坐标系速度(x, y, θ)),接着将机械臂位置和底盘转换的坐标系速度整合,再次就是对相机图像的待机,先遍历所有配置相机,将图像的数据添加到观测字段,最终再进行整合得到观测字典。 obs_dict = { # 机械臂位置 "arm_shoulder_pan.pos": 45.2, "arm_shoulder_lift.pos": -12.8, # ... 其他关节 # 底盘速度 "x.vel": 0.15, "y.vel": -0.08, "theta.vel": 0.12, # 相机图像 "camera_1": numpy_array, # (H, W, 3) "camera_2": numpy_array, # (H, W, 3) } 下面是总结流程 LeKiwi.get_observation() → FeetechMotorsBus.sync_read() → MotorsBus._sync_read() → 批量读取电机状态 → 底盘速度逆运动学 → 相机图像采集 获取状态与动作执行相反,首先读取到电机的状态,然后通过逆运动学,将底盘的速度转换为坐标系。 -
lekiwi+Orin Nano环境搭建
环境准备 简要记录在Orin nano平台搭建lekiwi环境,可以远程遥控底盘移动和机械臂示教的过程,需要的硬件如下: - NVIDIA Jetson Orin Nano开发板 - Lekiwi套件(底盘、主从机械臂) - PC,预装好Ubuntu系统 组装硬件 将底盘、主从机械臂、Orin nano组装好,需要注意的是由于官方默认使用的计算平台是树莓派,所以默认提供的供电接口是USB 5V。我们这里使用的Orin Nano平台,使用的是DC5525电源接口,因此需要提前购买准备DC5521 to DC5525的转接线。 详细组装可以参考 (1)https://github.com/SIGRobotics-UIUC/LeKiwi/blob/main/Assembly.md (2)https://huggingface.co/docs/lerobot/so101#step-by-step-assembly-instructions Orin nano 首先,首次启动先接上键盘、鼠标、显示器登录配置好网络和VNC。 sudo apt update sudo apt install vino # 设置vino开机自启 mkdir -p ~/.config/autostart cp /usr/share/applications/vino-server.desktop ~/.config/autostart/. cd /usr/lib/systemd/user/graphical-session.target.wants sudo ln -s ../vino-server.service ./. # 调整共享/认证设置 gsettings set org.gnome.Vino prompt-enabled false gsettings set org.gnome.Vino require-encryption false # 设置密码,默认thepassword gsettings set org.gnome.Vino authentication-methods "['vnc']" gsettings set org.gnome.Vino vnc-password $(echo -n 'thepassword'|base64) # 然后重启就可以使用VNC viewer访问了 sudo reboot 其次,安装conda 环境,由于Jetson架构是aarch64,所以下载miniconda aarch64的版本。 wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-aarch64.sh sh Miniconda3-latest-Linux-x86_64.sh source ~/.bashrc PC ubunut 同理安装conda环境,与jetson不一样的是,PC是X86架构。 wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh sh Miniconda3-latest-Linux-x86_64.sh source ~/.bashrc 软件安装 代码下载 在Orin nano和PC ubuntu上各自从github上克隆开源代码。 git clone https://github.com/huggingface/lerobot.git 配置安装 首先,在Orin nano和PC ubuntu上各自创建lekiwi的环境。 conda create -n lekiwi python=3.10 conda activate lekiwi 其次,切到lerobot环境下进行安装 cd lerobot pip install -e ".[feetech]" conda install -c conda-forge ffmpeg=7.1.1 遥操作 标定 需要给主臂、从臂进行标定,以限制关节的最大运动范围。在标定前,可以先看看视频怎么操作机械臂https://huggingface.co/docs/lerobot/so101#calibrate (1)从臂的运行命令如下 lerobot-calibrate \ --robot.type=lekiwi \ --robot.id=R1225280 \ --robot.cameras='{handeye: {type: opencv, index_or_path: 0, width: 640, height: 360, fps: 30}}' 标定的文件 (2)主臂的运行命令如下: lerobot-calibrate \ --teleop.type=so101_leader \ --teleop.port=/dev/ttyACM0 \ --teleop.id=R07252801 (3)标定好后会在下面路径存储标定的参数 # orin nano平台 ~/.cache/huggingface/lerobot/calibration/robots/lekiwi/R1225280.json # ubuntu平台 ~/.cache/huggingface/lerobot/calibration/teleoperators/so101_leader/R07252801.json 遥控 (1)由于PC和lekiwi之前的遥控使用的是gRPC,所以需要先安装zmq,否则会报错。 pip install zmq (2)orin nano启动命令,host.connection_time_s设置的是时间(单位是秒),超过这个时间会自动断开。 python -m lerobot.robots.lekiwi.lekiwi_host \ --robot.id=R1225280 \ --robot.cameras='{handeye: {type: opencv, index_or_path: 0, width: 640, height: 360, fps: 30}}' \ --host.connection_time_s=300 (3)PC ubuntu需要修改示例代码,修改点如下: diff --git a/examples/lekiwi/teleoperate.py b/examples/lekiwi/teleoperate.py index 6b430df4..cb4ad415 100644 --- a/examples/lekiwi/teleoperate.py +++ b/examples/lekiwi/teleoperate.py @@ -18,20 +18,20 @@ import time from lerobot.robots.lekiwi import LeKiwiClient, LeKiwiClientConfig from lerobot.teleoperators.keyboard.teleop_keyboard import KeyboardTeleop, KeyboardTeleopConfig -from lerobot.teleoperators.so100_leader import SO100Leader, SO100LeaderConfig +from lerobot.teleoperators.so101_leader import SO101Leader, SO101LeaderConfig from lerobot.utils.robot_utils import busy_wait from lerobot.utils.visualization_utils import init_rerun, log_rerun_data FPS = 30 # Create the robot and teleoperator configurations -robot_config = LeKiwiClientConfig(remote_ip="172.18.134.136", id="my_lekiwi") -teleop_arm_config = SO100LeaderConfig(port="/dev/tty.usbmodem585A0077581", id="my_awesome_leader_arm") +robot_config = LeKiwiClientConfig(remote_ip="192.168.0.33", id="my_lekiwi") +teleop_arm_config = SO101LeaderConfig(port="/dev/ttyACM0", id="R07252801") keyboard_config = KeyboardTeleopConfig(id="my_laptop_keyboard") # Initialize the robot and teleoperator robot = LeKiwiClient(robot_config) -leader_arm = SO100Leader(teleop_arm_config) +leader_arm = SO101Leader(teleop_arm_config) keyboard = KeyboardTeleop(keyboard_config) 将SO100改成SO101,因为我们的主臂使用的是SO101 将remote_ip改成jetson nano的ip 配置好SO101的uart端口和标定的文件 (4)PC ubunut执行命令 python examples/lekiwi/teleoperate.py 之后就可以使用键盘和主臂进行遥控操作了。 Key Action W 前进 S 后退 A 左移 D 右移 Z 左转(逆时针) X 右转(顺时针) R 加速一档 F 减速一档 -
Jetson Orin Nano环境搭建
安装浏览器 sudo apt update sudo apt install chromium-browser -y 安装后发现点击浏览器会没反应。按照下面方法配置。 snap download snapd --revision=24724 sudo snap ack snapd_24724.assert sudo snap install snapd_24724.snap sudo sudo snap refresh --hold snapd 配置VNC sudo apt update sudo apt install vino 然后配置 步骤1: 设置开机自启 对于 LXDE 桌面(例如 2 GB 版本的 Jetson Nano) mkdir -p ~/.config/autostart cp /usr/share/applications/vino-server.desktop ~/.config/autostart/. 对于 GNOME 桌面: cd /usr/lib/systemd/user/graphical-session.target.wants sudo ln -s ../vino-server.service ./. 步骤2:调整共享/认证设置 gsettings set org.gnome.Vino prompt-enabled false gsettings set org.gnome.Vino require-encryption false 步骤3:然后设置密码 # Replace thepassword with your desired password gsettings set org.gnome.Vino authentication-methods "['vnc']" gsettings set org.gnome.Vino vnc-password $(echo -n 'thepassword'|base64) 上面登录密码设置的是thepassword 步骤4: Reboot the system so that the settings take effect sudo reboot SSH ssh user@ip 输入密码就可以登录进去了。 conda环境 wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-aarch64.sh sh Miniconda3-latest-Linux-x86_64.sh source ~/.bashrc 注意这里是aarch64 -
为什么AlphaGo能自学围棋?强化学习基本概念
强化学习简介 什么是强化学习 以直升机控制飞行的程序来举例。 自动驾驶的直升机配备了机载计算机、GPS、加速度计、陀螺仪和磁罗盘,我们可以实时确定的知道直升机的位置。如何使用强化学习来让直升机飞行了? 在强化学习中,将直升机的位置、方向和速度等称为状态s,因此我们的目标或任务就是从直升机的状态映射到动作a的函数,意思是将两根控制杆推多远才能保持直升机在空中平衡、飞行而不坠毁。 要获取到动作a可以使用监督学习来训练神经网络,直接学习从x(状态a)到y(动作a)的映射,但事实证明直升机在控制移动时实际上时摸棱两可的,不好判断正确的行动是什么?是向做倾斜一点还是倾斜很多,还是稍微增加直升机的压力?要获得x的数据集和理想的动作y实际上是很困难的。因此对于控制直升机和其他机器人的许多任务,监督学习方法效果不佳,我们改用强化学习。 强化学习的关键输入称为reward(奖励),它告诉直升机何时表现良好,何时表现不佳。强化学习就像是在训练狗,你不知道狗会做出什么行为?但是我们可以判断狗行为的好坏,如果狗的行为是好的我们就就认为他是一个好狗给予奖励,如果它做出的行为不达我们的预期我们认为就是一个坏狗给予惩罚。我们期望它能自己学会如何做出好的动作行为而少做坏的动作行为。 火星探测器 火星探测器从初始状态运动到最终状态(terminal state),最终会得到一个累计的分数(return),每运动一次都会有相应的奖励reward。 当前火星探测器的初始位置在state 4,它可以向左也可以向右。 场景1:state 4(初始)(获得0)->state 3(获得0)->state 2(获得0)->state 1(获得100);最终得100。 场景2:state 4(初始)(获得0)->state 5(获得0)->state 6(获得40);最终得40。 场景3:state 4(初始)(获得0)->state 4(获得0)->state 4(获得0)->state 3(获得0)->state 2(获得0)->state 1(获得100);最终得100。 在每个时间步长上,机器人处于某种状态,我们称为S,它开始选择一个动作a,然后就获得一些奖励(reward),奖励值它从该状态获取,同时了它会切换为一个新的状态S’。公示表示为$(s,a,R(s),s')$,举个具体的例子,机器人处于状态4并采取行动时往左边走没有获得奖励(4,<-,0,3)。核心点就是状态(state)、动作(action)、奖励(reward)、下一个状态(next state),基本上就是每采取新动时都会发生情况,这就是强化学习算法要决定如何采取行动考虑的核心要素。 回报return 采取的行动会经历不同的状态以及如何享受不同的奖励,但是怎么去衡量一组特定的奖励比另外一组奖励好还是差了? 打个比方炒股方案1是你今天买入一支股票A然后后天就卖出了赚了1000块,而方案2你今天买入一支股票但是后天你亏了500,但是到第三天你赚了6000,那你更愿意追求那种方案了?虽然是第三天才显示有收益,但是第三天收益很高,显然会选择方案2。 为了计算这个回报分数,我们对动作后的奖励进行累加,但是了随着越往后的动作,我们需要添加一个折扣因子,也就是说越往前的比例系数越高,但是越往后所占的比例折扣就越大。如上图假设折扣因子(discount factor)为0.9,那么R1的折扣因子是1,R2的折扣因子就是0.9,R3的折扣因子就是0.9的平方依此类推。 最终获得的回报(return)取决于奖励(reward),而奖励取决于你采取的行动,因此回报取决于你采取的行动。 如折扣因子是0.5,如果从状态5向左走回报是6.25,如果是状态4向左走回报是12.5,而如果你状态就在6那么回报就是60。 总结一下强化学习的回报是系统获得的奖励总和,但需要加上折扣系数,每一个时间步的权重不一样,时间步越大奖励的系数结果就越小。 Policy 在强化学习中,可以通过很多不同的方式采取行动,比如我们可以决定始终选择更接近的奖励,如果最左边的奖励更接近则向左走,如果最右边的奖励更接近则向右走。当然我们也可以换种策略,始终追求更大的奖励等等。 策略Pi就是希望我们在那种状态下采取什么样的行动。state———>action。强化学习的目标就是要找到一个策略Pi,告诉在什么状态应该采取什么行动,执行每个状态以最大化回报。 小结 上面涉及到的概念,状态、动作、奖励、折扣因子、回报、策略。强化学习的目标就是让策略选择一个好的行动以获取最大的回报,这个决策过程被称为马尔可夫决策过程(MDP)。MDP指的是未来仅取决于当前的状态,而不取决于进入当前状态之前可能发生的任何事情,换句话说在马尔可夫决策过程中,未来取决于你现在所处的位置,而不取决于你是如何达到这里的;再换一种就是MDP是我们有一个机器人,我们要做的是选择动作a,根据这些动作、环境中发生的事情执行科学的任务。 State-action value funciton 状态动作函数Q 表示在状态s下,执行 动作a后,智能体所能获得的期望累计回报(Expected Return)。换句话说Q值衡量的是:"如果我在当前状态下做这个动作,长期来看能赚多少奖励?"。 计算$Q(s,a)$是强化学习算法的重要组成部分。也就是i说有办法计算s的Q,a,对于每个状态和每个动作,那么当你处于某个状态时,你所要做的就是看看不同的动作A,然后选择动作A,那就最大化s的Q,a。求Q的最大值。 贝尔曼方程 如何计算状态动作价值函数Q of S,A了? 在强化学习中有一个名为贝尔曼方程帮助我们解决计算状态动作值函数。下面来看看示例。 上图中有两个示例当在状态2,向右移动时,那么Q(2,->)=R(2)+0.5 max Q(3,a'0)=0+(0.5)25=12.5。当在状态4,向左移动时,那么Q(4,<-)=R(4)+0.5max Q(3,a')=0+(0.5)25=12.5。 贝尔曼方程体现的核心思想是:"一个状态的价值 = 即时奖励 + 后续状态的折扣价值"。也就是说,当前决策的好坏取决于当下奖励 + 未来的期望收益。 Continuous State Space continuous state space示例 对于火星车可能只有一个单一的状态1~6,对于下面的卡车来说有很多个状态,比如x,y,角度等等。而对于直升机来说又有更多参数集合的状态。 因此状态不仅仅是少数可能离散值的一个,它可以是一个数字向量。 月球着陆器 月球着陆车有很多状态变量,如上图所示。奖励函数我们可以设计成下面这样。 对于月球着陆器要学习的策略pi就是输入s,通过策略得到a,然后计算出最大的return。 学习Q 在状态s处,使用神经网络来计算4个动作中,那个动作的Q(s,nothing),Q(s,left),Q(s,main),Q(s,right)最大,然后选择那个最大值的动作。 那如何获得一个包含X和Y值的训练集,可以进行训练神经网络了?这就需要用到贝尔曼方程如上图,我们可以把贝尔曼方程的左边命名为X,右边命名为Y,神经网络的输入是一个状态和动作对。而输出Y就是就是Q。神经网络学习的就是X到Y的映射。 改进的神经网络架构 对于火星车前面的神经网络对应需要推理4次,即Q(s,nothing),Q(s,left),Q(s,main),Q(s,right),这样效率比较低,那么可以对算法进行改进推理一次直接输出4个动作的对应的Q。下面就是修改后的神经网络架构。 算法改进:ϵ贪婪策略 总结 学习流程 智能体观察当前状态 $s_t$ 根据策略 $\pi(a|s_t)$ 选择动作 $a_t$ 环境返回奖励 $r_t$ 和下一状态 $s_{t+1}$ 智能体根据反馈更新策略或价值函数 这一过程可用 马尔可夫决策过程(MDP) 表示: $$ [ \text{MDP} = (S, A, P, R, \gamma) ] $$ 其中: - 参数$S$:状态集合 - 参数$A$:动作集合 - 参数$P(s'|s,a)$:状态转移概率 - 参数$R(s,a)$:奖励函数 - 参数$\gamma$:折扣因子(0~1) 算法 (1)算法分类 类别 特征 代表算法 基于价值(Value-based) 学习 Q 值,间接得到策略 Q-learning、DQN 基于策略(Policy-based) 直接优化策略函数 REINFORCE、PPO Actor-Critic 混合 同时学习策略(Actor)和值函数(Critic) A2C、A3C、DDPG、SAC 基于模型(Model-based) 显式学习环境动态模型 Dyna-Q、Dreamer、MuZero (2)算法演进 阶段 特征 代表算法 传统表格法(Tabular RL) 离散状态空间,值表更新 Q-Learning, SARSA 深度强化学习(Deep RL) 神经网络逼近 Q 函数 DQN, Double DQN, Dueling DQN 连续动作控制(Continuous Control) 针对机械臂、无人车等连续控制问题 DDPG, TD3, SAC 策略梯度类(Policy Gradient) 直接优化策略参数 REINFORCE, PPO, TRPO 基于模型的RL(Model-based RL) 同时学习环境模型 + 策略 MuZero, DreamerV3 模仿学习 / 具身智能结合 利用演示或视觉模仿学习 GAIL, BC, VLA, RT系列 参考:本文主要来之吴恩达强化学习笔记 -
机器人全身控制浅谈:理解 WBC 的原理
概念 WBC(Whole-Body Control,全身控制)是什么?机器人是由“各关节”组成的,其不是“各关节各玩各的”而是一个耦合的整体。在某个时刻可能要做很多事情,比如保持平衡(重心别出圈)、手/脚要动作到目标位置、躯干姿态不能乱、关节不能超限、脚下不能打滑。这些都是一系列任务的组合。 WBC的核心就是把这些任务(目标)和约束(物理/安全)写进一个小型优化问题,在每个控制周期(几百hz~1Khz)求解,得到“当下这毫秒,各关节应该怎么动/用多大力”。 一句话总结就是WBC就是用优化的方法求解出要给“关节多少力“”以便让机器的各个关节一起配合完成多个目标,且不违反物理与安全约束。 原理 动力学方程 要解释WBC的原理,那必须绕不开动力学方程,这里就先对动力学方程做个简单介绍。 $$ M(q)\dot{v} + h(q,v) = S^T \tau + J_c^T \lambda $$ 配合接触约束: $$ J_c v = 0,\quad \lambda \in \text{摩擦锥} $$ 通俗理解公式就是:“惯性 × 加速度 + 自然出现的力 = 电机能给的力 + 地面/物体的反力” 公式左边:机器人自身的自然物理 变量$M(q)\dot{v}$:惯性项。$M(q)$是质量矩阵,描述机器人在不同姿态下的惯性特性。$\dot{v} $是广义加速度(关节加速度或身体的加速度);其意义就像$F=ma$里面的$ma$,表示加速一个有惯性的物体需要的力 变量$h(q,v)$:重力+速度相关项(科氏力、离心力)。如果机器人静止,这里主要就是重力。如果在运动,这里就会出现"速度带来的额外力",类似开车转弯时身体被甩出去的感觉。 公式右边:外界能提供的“驱动力” 变量$S^T \tau$:电机能施加的关节力矩。$\tau$电机产生的力矩(控制器的输出),$S^T$选择矩阵,把电机力矩映射到广义坐标系中。 变量$J_c^T \lambda$:接触点反力。$\lambda$来自地面或物体的力(约束反作用力),$J_c$接触点的雅可比,把关节速度映射到接触点速度。$J_c^T \lambda$把“地面推脚的力”转换回“关节的力”。 直观记忆就是类比现实生活中的推车子: 你推车:电机力矩$\tau$。 地面支撑车子:接触反力$\lambda$。 车子有质量,要加速就得克服惯性:$M(q)\dot{v}$。 重力和转弯的惯性: $h(q,v)$。 接触力约束 还要满足接触力约束$J_c v = 0,\quad \lambda \in \text{摩擦锥}$,其中$J_c v = 0$意义是接触点速度为0,比如机器人脚贴在地上不滑、不穿透;$\quad \lambda \in \text{摩擦锥}$意义是接触力必须满足摩擦模型,脚不会不穷大摩擦,力要在摩擦椎范围内。 公式中涉及到一个雅可比,什么是雅可比$Jc$? 假设有一个机械臂: 关节角度:就像是控制的"按钮"。 手末端的位置:就是最终关心的"结果"。 那么问题就是关节角度动一动,末端的位置会怎么动了?这个"关节空间的微小变化"影响到“末端空间的微小变化”。这样一个映射关系,就是雅可比矩阵$J$。 图中红色箭头表示关节角度的小变化$\Delta \theta_1 , \Delta \theta_2$。红色箭头的变换导致绿色箭头末端位置的变化:$ \Delta x , \Delta y$。雅可比矩阵 $J$ 就是把 $$ \begin{bmatrix}\Delta \theta_1 \ \Delta \theta_2\end{bmatrix} \longrightarrow \begin{bmatrix}\Delta x \ \Delta y\end{bmatrix} $$ 因此如果想要末端动多少就用$J$,想算末端力传回关节多少就用$J^T$。 总结一下雅可比$J$就是关节空间和任务空间的桥梁,作用就是我们关节动多少,末端/接触点动多少。 动力学方程在WBC中的用处? 动力学方程是机器人身体运动的"牛顿定律",我们来看看WBC的目标是什么?WBC不只是让机器人"走"或"站",而是要全身协调,比如要去抓杯子,脚要保持不滑,躯干要保持平衡,关节力矩不能超过电机限制。所以WBC的本质是解一个优化问题,找出一组关节力矩$\tau$,既能完成任务目标,又满足动力学方程和约束。 优化问题 WBC的核心思路是把机器人全身的目标任务转化为优化问题,在满足物理规律和约束条件的前提下,求出最合适的一组关节力矩$\tau$。 具体一点在WBC中求解的决策变量通常是如下三个: 最优的关节力矩$\tau$。 接触点反力$\lambda$。 机器人下一步的加速度$\dot{v} $。 优化问题转换为数学的目标函数如下: $$ \min_{x} | J_{\text{task}} \dot{v} - \dot{v}_{\text{des}} |^2 + | \tau |^2 + | \lambda |^2 $$ 公式中$J_{\text{task}}\dot{v} - \dot{v}{\text{des}}$表示实际加速度与期望加速度的误差,$J{\text{task}} \dot{v}$是在当前关节加速度下,末端/任务空的实际加速度,而$\dot{v}_{\text{des}}$是我们期望任务空间实现的期望加速度(比如手往前加速 0.5 m/s²,质心保持 0 加速度)。 同时优化问题还要满足以下约束条件: 动力学约束:$M(q)\dot{v} + h(q,v) = S^T \tau + J_c^T \lambda$。这是硬约束,控制器必须遵守。 接触约束:$J_c v = 0$,接触点不能乱动(不滑、不穿透)。 摩擦约束:$\lambda \in \mathcal{K}_{\text{fric}}$,接触力必须符合摩擦模型(不能无限大)。 力矩限制:$\tau_{\min} \leq \tau \leq \tau_{\max}$。 总结一下优化问题的目标函数意思就是要满足任务误差最小化(手/身体/质心的加速度跟踪目标),同时要满足能量或力矩最小化(不能浪费力),同时满足接触力正则化(力要稳定不能乱跳)。 方程有了,怎么求解了? 这个目标函数是一个二次型,符合QP,所以可以用现成的QP求解器来解,例如:OSQP、qpOASES、Gurobi(商业求解器)、CPLEX(商业求解器)、CVXPy(Python 封装,常用于原型),这里就不过多阐述了。 总结一下WBC核心就是要解决一个优化问题:二次目标(误差最小 + 力矩正则) + 动力学/接触/摩擦/限幅约束。其求解的方式通常使用QP 求解器(实时、高效、全局最优)。求解的结果是关节力矩$\tau$(给电机执行),同时还得到加速度$\dot{v}$和接触力$\lambda$。 示例 接下来我们调用cvxpy库看看示例,直观体验一下。 import cvxpy as cp import numpy as np # ---- 机械臂参数 ---- l1, l2 = 1.0, 1.0 m1, m2 = 1.0, 1.0 theta1, theta2 = np.deg2rad(45), np.deg2rad(30) # ---- 雅可比(末端位置对关节的导数)---- J_task = np.array([ [-l1*np.sin(theta1) - l2*np.sin(theta1+theta2), -l2*np.sin(theta1+theta2)], [ l1*np.cos(theta1) + l2*np.cos(theta1+theta2), l2*np.cos(theta1+theta2)] ]) # ---- 动力学质量矩阵 M(简化版)---- M = np.array([ [m1*l1**2 + m2*(l1**2 + l2**2 + 2*l1*l2*np.cos(theta2)), m2*(l2**2 + l1*l2*np.cos(theta2))], [m2*(l2**2 + l1*l2*np.cos(theta2)), m2*l2**2] ]) h = np.zeros(2) # 忽略重力/科氏项 # ---- 接触约束:假设末端y方向不能动(竖直方向约束)---- Jc = np.array([[0, 1]]) @ J_task # 取末端y方向的行 # ---- 变量 ---- ddq = cp.Variable(2) # 关节加速度 tau = cp.Variable(2) # 力矩 lam = cp.Variable(1) # 接触力 (竖直反作用力) # ---- 期望任务加速度(末端x方向=1.0, y方向=0.0)---- xddot_des = np.array([1.0, 0.0]) # ---- 目标函数:末端任务 + 力矩/接触力正则 ---- objective = cp.Minimize( cp.sum_squares(J_task @ ddq - xddot_des) + 0.01*cp.sum_squares(tau) + 0.01*cp.sum_squares(lam) ) # ---- 约束 ---- constraints = [ M @ ddq + h == tau + Jc.T @ lam, # 动力学方程 Jc @ ddq == 0, # 接触点不加速 tau >= -10, tau <= 10, lam >= 0 # 接触力必须向上推 ] # ---- 求解 ---- prob = cp.Problem(objective, constraints) prob.solve() print("Optimal joint accelerations:", ddq.value) print("Optimal torques:", tau.value) print("Optimal contact force λ:", lam.value) print("End-effector acc achieved:", J_task @ ddq.value) print("Desired end-effector acc :", xddot_des) 上面的示例中可以分为几部分: (1)任务 末端(手)在水平 $x$ 方向产生 $1.0 \text{m/s}^2$ 的加速度。在垂直 $y$ 方向不要加速(因为手撑在桌子上,不应该离开桌面)。 数学写法: $$ \ddot{x}_{des} = [1.0,0.0]^T $$ (2)决策变量 优化器要决定的量是 $$ \ddot{q} = [\ddot{\theta}_1,\ddot{\theta}_2]^T, \quad \tau = [\tau_1,\tau_2]^T, \quad \lambda $$ (3)要优化的目标函数 最小化 $$ \min_{\ddot{q},\;\tau,\;\lambda}| J_{\text{task}} \ddot{q} - \ddot{x}_{des} |^2+ 0.01 |\tau|^2+ 0.01 |\lambda|^2 $$ (4)约束条件 动力学约束:$M(q)\ddot{q} + h(q,\dot{q}) = \tau + J_c^T \lambda$ 接触约束:$J_c \ddot{q} = 0$ 力矩范围:$-10 \leq \tau_i \leq 10$ 接触力非负:$\lambda \geq 0$ (5)输出结果 最后调用 prob = cp.Problem(objective, constraints) prob.solve() 求解得出结果: Optimal joint accelerations: [ 0.50615867 -1.88900987] Optimal torques: [-1.12977187 -0.94450493] Optimal contact force λ: [1.29515155e-23] End-effector acc achieved: [ 9.77823461e-01 -5.00938335e-17] Desired end-effector acc : [1. 0.] QP 解出来的就是 最优关节加速度:$\ddot{q}$ 最优关节力矩:$\tau$ 接触反力:$\lambda$ 实际末端加速度:$\ddot{x} = J_{\text{task}} \ddot{q}$ 并与期望值 $\ddot{x}_{des}$ 对比。 上面的代码中只截取了关键部分,下面是绘制图像的效果如下,可以直观体会看看: WBC与MPC 上一篇文章我们分析了MPC,那么MPC与WBC什么关系了? MPC在WBC之上,MPC作为决策层做"未来几步的规划",比如预测未来1S内,质心应该怎么移动,脚该放哪里,其输出的是期望的任务轨迹。WBC是在下层,拿到MPC给的任务(目标加速度/姿态/接触序列)在动力学和接触约束下,求解QP得到当下这一瞬间的关节力矩$\tau$。简而言之MPC决定“机器人未来要往哪走”,WBC决定“当前每个关节该怎么出力”。 举个例子:双足机器人走路 MPC:优化未来 1 秒的 质心轨迹、摆腿位置、支撑相切换。输出:期望的 $\ddot{x}_{des}$(质心加速度)、脚的落点计划。 WBC:把这些期望当作任务输入,解 QP ,输出关节力矩 $\tau$,同时计算接触力 $\lambda$,保证机器人在每一步不摔倒。 -
机器人控制利器:MPC入门与实践解析
背景 MPC(Model Predictive Control)模型预测控制,是一种控制方法,广泛应用在机器人、无人驾驶、过程控制、能源系统等领域。它的核心思想用一句话来总结:利用系统模型预测未来,并通过优化选择当前最优的控制输入。 如上图是一个MPC应用框图,先来看看框图中的各个变量。 r(t):参考输入,系统希望达到的目标(设定值/reference signal),例如机器人要达到的位置、温度控制的目标值、车辆期望的速度等等。 e:误差e(t) = y(t) – r(t),用来计算实际的输出期望值的差距。 μ(t):控制输入,由MPC控制器计算出来,施加在系统上的控制量。 y(t):系统实际输出,比如位置、速度、温度等。 系统的目标是由MPC控制器计算输出一个μ(t)控制量,然后作用到系统,让系统的输出能够达到期望值。其中有一个反馈回路,形成一个闭环系统,实际的输出y(t)会反馈给比较器,与目标值对比,当未达到目标时,系统自动修正直到达到目标。 原理 系统模型 为了简单,我们先令系统的实际输出$y=x$,下面用数学建模来描述系统状态模型是 $$ x_{k+1} = A x_k + B u_k $$ 参数$x_k$:系统在时刻$k$的状态,比如汽车的位置和速度[p,v]。 参数$u_k$:系统在时刻$k$的控制输入,比如油门大小、方向盘角度、电机电压等。 A:状态的转移矩阵,决定了系统状态的演化方式。 B:输入矩阵,描述了控制输入如何影响状态的变换。 参数$x_{k+1}$:系统在下一时刻$k+1$的状态。 A和B是两个矩阵,A决定系统 在没有控制输入时,状态如何随时间演化。B决定控制输入$u_k$如何影响的状态。 上面的模型是线性模型,本文以此来进行分析。但是在实际场景中根据实际问题进行建模,模型可能是非线性的如下,这里就过多解释。 $$ x_{k+1} = f(x_k,u_k) $$ 预测未来 MPC原理就是从当前状态$x+k$出发,用模型递推,预测出未来的N步(如果看过之前关于ACT原理,其实MPC有很多类似之处): $$ x_{k+1},x_{k+2},x_{k+3},......,x_{k+N} $$ 目标函数 工程中我们最主要的目标是要求出控制量$u_t$以便让系统最终调整到我们预期的状态。 那如何来设计这个系统了? 前面我们建模了$k$时刻的状态$x_t$,那么这个状态+动作需要满足什么样的数学关系了? 与深度学习类似,我们要对状态+动作的数学关系求一个最小值的表达式。看公式: $$ J = \sum_{i=0}^{N-1} \Big[ (x_{k+i} - x_{k+i}^{\text{ref}})^T Q (x_{k+i} - x_{k+i}^{\text{ref}}) + u_{k+i}^T R u_{k+i} \Big] $$ 上面的公式可以分为两部分,前面部分代表的是k时刻输出状态与目标状态误差值,后面部分是控制的动作。也就是说我们最核心的是要误差越小越好,同时控制的动作不要太大。误差小好理解,动作变化不要太大是因为要保证控制动作要平滑。两部分都是求平方$e^2$和$u^2$放大比例(跟深度学习中的损失差不多),同时各自有一个权重Q和R,这两个参数是权重参数可调,Q用来平衡快速到目标,R用来调节平衡动作要平稳。 公式中$x$和$J$可以说是已知的,那么就可以求出$u$控制量了。 对于求解$u$,需要根据实际的模型,如果模型是线性模型+二次目标函数这样就比较简单使用二次规划QP可以快速解析或数值求解。如果是非线性模型,那么就变成非线性规划NLP,需要迭代求解器(如SQP、IPOPT)。当然如果是简单还可以使用穷举控制序列$U$,直接算$J$取最小。 约束条件 约束条件是在求解代价函数最小值过程中,显示的对控制量$u$、$x_k$做约束。这样的目的是比如对于车来说油门不能超过100%,速度不能超速,机械臂必能超过关节限位等。 因此在求救$J$时,可以添加约束条件,如下: $$ u_{\min} \leq u_k \leq u_{\max};x_{\min} \leq x_k \leq x_{\max} $$ 第一个是 控制输入约束(油门、方向盘角度不能无限大)。第二个是 状态约束(位置、速度等不能超过物理限制)。 工作流程 MPC控制系统中,其工作流程最核心的滚动时域控制,其核心点是每次预测出N步,但是并不是一下就全部执行完N步,因为预测也是有偏差同时在执行过程中会有变化。而是每次预测N步,但是只取第一步进行执行,然后根据新的输出结果重新预测下一个N步。这个其实跟ACT的时间集成有点类似,也就是在每个时间刻都会预测N步,而MPC是取第一步执行,而ACT是取k时刻和此前时刻的加权平均。 下面来看看具体的执行流程: 当前时刻$K$:系统处于某个状态如蓝线上面的红点,控制器采集到当前状态作为优化的起点。 预测未来N步:橙色窗口覆盖未来N步的时间区;红色虚线显示预测的未来轨迹,如果采用这一串控制动作$U=[u_k,u_{k+1}...]$系统怎么走。 优化并得到最优控制序列:在预测窗口里,MPC通过解$J$(误差+控制代价)最小值得到结果一整串最优控制动作序列$U$。 只执行一步:红色箭头指向当前窗口里的第一个控制输入$u_k$,下方蓝色阶梯控制曲线更新,系统真是状态推进到下一个时刻;上方状态曲线的红点更新到新的位置。 丢弃其余动作:窗口里的控制动作$(u_{k+1},u_{k+2}...)$丢弃,因为下一时刻会重新优化得到新的控制动作。 窗口前移,重复循环:橙色预测窗口右移一格$(k->k+1->k+2)$,控制器基于新状态重新预测、重新优化,再次执行第一步,丢弃其余周而复始直到达到目标。 因此整个过程核心就是MPC的滚动时域机制:预测未来——>优化整串动作——>执行第一步——>丢弃其余——>窗口前移——>重新计算。 示例程序 下面是一个简单的示例加深对MPC的认识。场景是一个一维的小车: 状态$x=[位置,速度]$,控制输入$u=加速度$。 小车从0开始,目标位置在10米,并希望最后速度接近0. 用MPC做控制,预测N步,计算代价,找到最优控制序列,但只执行第一个动作然后滚动。 求解$u$这里直接使用的是穷举法。 import numpy as np import matplotlib.pyplot as plt import matplotlib.animation as animation # --- 系统模型 (离散时间) --- dt = 0.2 # 状态空间模型: x_{k+1} = A x_k + B u_k # x = [位置, 速度],u = 加速度 A = np.array([[1, dt], [0, 1]]) B = np.array([[0.5*dt**2], [dt]]) # 初始状态: 位置=0, 速度=0 x = np.array([0.0, 0.0]) # 目标状态: 位置=10, 速度=0 (停在10米处) target = np.array([10.0, 0.0]) # --- MPC 参数 --- N = 5 # 预测时域长度 (未来看5步) U_candidates = [-1, 0, 1] # 控制输入候选集合 (加速度: -1=刹车, 0=不动, 1=加速) def simulate(x0, u_seq): """ 给定初始状态 x0 和一段控制序列 u_seq, 用系统模型递推未来轨迹,并计算代价 J """ x = x0.copy() cost = 0.0 traj = [x.copy()] # 保存预测轨迹 (用于可视化) for u in u_seq: # 状态更新 (预测未来) x = A @ x + B.flatten()*u traj.append(x.copy()) # 代价函数 J = 误差项 + 控制代价 err = x - target cost += err[0]**2 + 0.3*err[1]**2 + 0.1*(u**2) # 位置误差^2 + 速度误差^2(权重0.3) + 控制输入^2(权重0.1) return cost, np.array(traj) # --- MPC 主循环 (滚动时域控制) --- history_x = [] # 真实执行的状态轨迹 history_u = [] # 实际执行的控制输入 (只取最优序列的第一步) pred_trajs = [] # 每次优化得到的预测轨迹 (整串) for step in range(60): best_cost = 1e9 best_seq = None best_traj = None # 穷举所有可能的控制序列 U = [u_k, u_{k+1}, ..., u_{k+N-1}] for U in np.array(np.meshgrid(*[U_candidates]*N)).T.reshape(-1,N): cost, traj = simulate(x, U) if cost < best_cost: best_cost = cost best_seq = U # 当前最优控制序列 best_traj = traj # 当前最优预测轨迹 # 保存数据 (真实轨迹、控制输入、预测轨迹) history_x.append(x.copy()) history_u.append(best_seq[0]) # 只执行第一步 u_k* pred_trajs.append(best_traj) # 保存整条预测轨迹用于画红虚线 # 执行第一步 (滚动时域控制的核心:只执行u_k) u = best_seq[0] x = A @ x + B.flatten()*u # 收敛条件 (位置接近10, 速度≈0) if abs(x[0]-target[0]) < 0.1 and abs(x[1]-target[1]) < 0.1: history_x.append(x.copy()) history_u.append(0) # 终端时控制输入设为0 break history_x = np.array(history_x) # --- 动态可视化 --- fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(8, 6)) # 上图:位置随时间 ax1.axhline(target[0], color="green", linestyle="--", label="Target position") (line_real,) = ax1.plot([], [], "bo-", label="Real trajectory") # 蓝点=真实轨迹 (line_pred,) = ax1.plot([], [], "r--", label="Predicted trajectory") # 红虚线=预测轨迹 (point_exec,) = ax1.plot([], [], "ro", markersize=8, label="Execute point") # 红点=执行点 ax1.set_xlim(0, len(history_x)+N) ax1.set_ylim(0, target[0]+5) ax1.set_ylabel("Position") ax1.legend() # 下图:控制输入 (line_u,) = ax2.step([], [], where="post", label="Control input u") ax2.set_xlim(0, len(history_u)) ax2.set_ylim(min(U_candidates)-0.5, max(U_candidates)+0.5) ax2.set_xlabel("Time step") ax2.set_ylabel("u") ax2.legend() # --- 动画更新函数 --- def update(frame): # 蓝线:实际轨迹 line_real.set_data(np.arange(frame+1), history_x[:frame+1,0]) # 红虚线:预测轨迹 (窗口内) pred = pred_trajs[frame] line_pred.set_data(np.arange(frame, frame+len(pred)), pred[:,0]) # 红点:当前执行点 point_exec.set_data([frame], [history_x[frame,0]]) # 阶梯:控制输入 line_u.set_data(np.arange(frame+1), history_u[:frame+1]) return line_real, line_pred, point_exec, line_u # 动画循环:frames=len(pred_trajs),表示每个MPC优化时刻 ani = animation.FuncAnimation(fig, update, frames=len(pred_trajs), interval=800, blit=True, repeat=False) plt.show() -
语音生成模型:CosyVoice入门实践
是什么 CosyVoice是阿里开源的一款文字转语音的开源模型,可以支持音色复刻。 怎么用 环境安装 (1)代码下载 git clone --recursive https://github.com/FunAudioLLM/CosyVoice.git cd CosyVoice git submodule update --init --recursive 因为CosyVoice仓库中还依赖了第三方的Matcha-TTS,所以克隆本地仓库后,还需要下载第三方的。 (2)创建conda环境 conda create -n cosyvoice -y python=3.10 conda activate cosyvoice pip install -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple/ --trusted-host=mirrors.aliyun.com 创建conda环境并安装依赖。如果没有安装cuda工具的话,还需要执行下面命令安装。 sudo apt install nvidia-cuda-toolkit (3)下载预训练模型 sudo apt update && sudo apt install git-lfs -y mkdir -p pretrained_models git clone https://www.modelscope.cn/iic/CosyVoice2-0.5B.git pretrained_models/CosyVoice2-0.5B git clone https://www.modelscope.cn/iic/CosyVoice-300M.git pretrained_models/CosyVoice-300M git clone https://www.modelscope.cn/iic/CosyVoice-300M-SFT.git pretrained_models/CosyVoice-300M-SFT git clone https://www.modelscope.cn/iic/CosyVoice-300M-Instruct.git pretrained_models/CosyVoice-300M-Instruct git clone https://www.modelscope.cn/iic/CosyVoice-ttsfrd.git pretrained_models/CosyVoice-ttsfrd 上面的模型文件选择一个即可,需要注意的是因为模型比较大,所以要在本地安装git-lfs才能下载大文件。 测试 python3 webui.py --port 50000 --model_dir pretrained_models/CosyVoice-300M 执行上面命令后,就可以登录网页输入http://127.0.0.1:50000/进行测试了。 -
语音识别模型:SenseVoice入门实践
是什么 SenseVoice是多语言识别的模型,支持语音转文字(ASR, Automatic Speech Recognition,自动语音识别),语种识别(LID, Language Identification),语音情感识别(SER, Speech Emotion Recognition),音频事件检测 / 声学事件分类(AED/AEC, Audio Event Detection / Classification),逆文本正则化 / 标点 / 富文本转写等。 怎么用 配置环境 配置一个conda sensevoice环境并且激活。 conda create -n sensevoice python=3.10 conda activate sensevoice 拉取代码,安装依赖 git clone https://github.com/FunAudioLLM/SenseVoice.git cd SenseVoice/ pip install -r requirements.txt 示例测试 代码中有两个示例分别是demo1.py和demo2.py,执行后可以看看效果。 python demo1.py 执行后会自动从modelscope上下载模型和相关配置到本地。 下载到本地后有模型和相关的配置文件以及示例音频。 (sensevoice) laumy@ThinkBook-14-G7-IAH:~/.cache/modelscope/hub/models/iic$ tree . ├── SenseVoiceSmall │ ├── am.mvn │ ├── chn_jpn_yue_eng_ko_spectok.bpe.model │ ├── configuration.json │ ├── config.yaml │ ├── example │ │ ├── en.mp3 │ │ ├── ja.mp3 │ │ ├── ko.mp3 │ │ ├── yue.mp3 │ │ └── zh.mp3 │ ├── fig │ │ ├── aed_figure.png │ │ ├── asr_results.png │ │ ├── inference.png │ │ ├── sensevoice.png │ │ ├── ser_figure.png │ │ └── ser_table.png │ ├── model.pt │ ├── README.md │ └── tokens.json └── speech_fsmn_vad_zh-cn-16k-common-pytorch ├── am.mvn ├── configuration.json ├── config.yaml ├── example │ └── vad_example.wav ├── fig │ └── struct.png ├── model.pt └── README.md 7 directories, 25 files 最后的识别效果为 还可以测试webui版本 python webui.py 然后网址输入:http://127.0.0.1:7860, 注意不能开代理否则会启动失败。 接口调用 主要简要分析一下使用funASR调用示例。 (1)模型加载 model = AutoModel(model=[str], device=[str], ncpu=[int], output_dir=[str], batch_size=[int], hub=[str], **kwargs) model:模型仓库中的名称 device:推理的设备,如gpu ncpu:cpu并行线程。 output_dir:输出结果的输出路径 batch_size:解码时的批处理,样本个数 hub:从modelscope下载模型。如果为hf,从huggingface下载模型。 (2)推理 res = model.generate(input=[str], output_dir=[str]) input:要解码的输入可以是wav文件路径, 例如: asr_example.wav。 output_dir: 输出结果的输出路径。 实时识别 编写一个示例实时识别 #!/usr/bin/env python3 # -*- encoding: utf-8 -*- # Real-time microphone transcription (pure memory) using ALSA (pyalsaaudio) + numpy import argparse import os import signal import sys import time from pathlib import Path from typing import Optional import numpy as np def _safe_imports() -> None: try: import alsaaudio # noqa: F401 except Exception: print( "缺少 pyalsaaudio,请先安装:\n pip install pyalsaaudio\nUbuntu 可能需要:sudo apt-get install -y python3-alsaaudio 或 alsa-utils", file=sys.stderr, ) raise def _safe_import_model(): try: from funasr import AutoModel from funasr.utils.postprocess_utils import rich_transcription_postprocess except Exception: print( "导入 funasr 失败。如果仓库根目录存在本地 'funasr.py',请重命名(如 'funasr_demo.py')以避免遮蔽外部库。", file=sys.stderr, ) raise return AutoModel, rich_transcription_postprocess def int16_to_float32(audio_int16: np.ndarray) -> np.ndarray: if audio_int16.dtype != np.int16: audio_int16 = audio_int16.astype(np.int16, copy=False) return (audio_int16.astype(np.float32) / 32768.0).clip(-1.0, 1.0) def resample_to_16k(audio_f32: np.ndarray, orig_sr: int) -> tuple[np.ndarray, int]: if orig_sr == 16000: return audio_f32, 16000 backend = os.environ.get("SV_RESAMPLE", "poly").lower() # poly|librosa|linear # 优先使用更快的 poly(scipy),其次 librosa,最后线性插值兜底 if backend in ("poly", "auto"): try: from scipy.signal import resample_poly # type: ignore y = resample_poly(audio_f32, 16000, orig_sr) return y.astype(np.float32, copy=False), 16000 except Exception: pass if backend in ("librosa", "auto"): try: import librosa # type: ignore y = librosa.resample(audio_f32, orig_sr=orig_sr, target_sr=16000) return y.astype(np.float32, copy=False), 16000 except Exception: pass # 线性插值兜底(最慢但零依赖) x = np.arange(audio_f32.size, dtype=np.float32) new_n = int(round(audio_f32.size * 16000.0 / float(orig_sr))) if new_n <= 1: return audio_f32, orig_sr new_x = np.linspace(0.0, x[-1] if x.size > 0 else 0.0, new_n, dtype=np.float32) y = np.interp(new_x, x, audio_f32).astype(np.float32) return y, 16000 def _build_cfg_from_env() -> dict: cfg = {} # runtime knobs try: cfg["min_rms"] = float(os.environ.get("SV_MIN_RMS", "0.003")) except Exception: cfg["min_rms"] = 0.003 try: cfg["overlap_sec"] = float(os.environ.get("SV_OVERLAP_SEC", "0.3")) except Exception: cfg["overlap_sec"] = 0.3 cfg["merge_vad"] = os.environ.get("SV_MERGE_VAD", "0") == "1" try: cfg["merge_len"] = float(os.environ.get("SV_MERGE_LEN", "2.0")) except Exception: cfg["merge_len"] = 2.0 cfg["debug"] = os.environ.get("SV_DEBUG") == "1" cfg["resample_backend"] = os.environ.get("SV_RESAMPLE", "poly").lower() cfg["dump_wav_path"] = os.environ.get("SV_DUMP_WAV") return cfg def _select_strong_channel(frame_any: np.ndarray, channels: int, debug: bool) -> np.ndarray: if channels <= 1: return frame_any frame_mat = frame_any.reshape(-1, channels) ch_rms = np.sqrt(np.mean(frame_mat.astype(np.float32) ** 2, axis=0)) sel = int(np.argmax(ch_rms)) if debug: print(f"[debug] multi-channel rms={ch_rms.tolist()}, select ch={sel}", flush=True) return frame_mat[:, sel] def _ensure_pcm_open(alsa_audio, device: str | None, samplerate: int, channels: int, period_frames: int) -> tuple[object, int, str, str]: tried: list[tuple[str, str, str]] = [] for dev in [device or "default", f"plughw:{(device or 'default').split(':')[-1]}" if (device and not device.startswith('plughw')) else None]: if not dev: continue for fmt in (alsa_audio.PCM_FORMAT_S16_LE, alsa_audio.PCM_FORMAT_S32_LE): try: p = alsa_audio.PCM(type=alsa_audio.PCM_CAPTURE, mode=alsa_audio.PCM_NORMAL, device=dev) p.setchannels(channels) p.setrate(samplerate) p.setformat(fmt) p.setperiodsize(period_frames) dtype = np.int16 if fmt == alsa_audio.PCM_FORMAT_S16_LE else np.int32 sample_bytes = 2 if dtype == np.int16 else 4 fmt_name = "S16_LE" if dtype == np.int16 else "S32_LE" return p, sample_bytes, fmt_name, dev except Exception as e: # noqa: BLE001 tried.append((dev, "S16_LE" if fmt == alsa_audio.PCM_FORMAT_S16_LE else "S32_LE", str(e))) continue raise RuntimeError(f"打开 ALSA 设备失败,尝试: {tried}") def _to_int16_mono(raw_bytes: bytes, sample_bytes: int, channels: int, debug: bool) -> np.ndarray: if sample_bytes == 2: frame_any = np.frombuffer(raw_bytes, dtype=np.int16) else: frame_any = np.frombuffer(raw_bytes, dtype=np.int32) frame_any = (frame_any.astype(np.int32) >> 16).astype(np.int16) if channels > 1: frame_any = _select_strong_channel(frame_any, channels, debug) return frame_any.astype(np.int16, copy=False) def _should_infer(audio_f32: np.ndarray, min_rms: float, debug: bool, frames_count: int) -> bool: if audio_f32.size == 0: return False rms = float(np.sqrt(np.mean(np.square(audio_f32)))) if rms < min_rms: if debug: print(f"[debug] frames={frames_count}, rms={rms:.4f} < min_rms={min_rms:.4f}, skip", flush=True) return False return True def _infer_block(model, arr: np.ndarray, sr: int, cfg: dict, language: str, running_cache: dict) -> str: prefer = os.environ.get("SV_INPUT_FORMAT", "f32") # candidate arrays f32 = arr.astype(np.float32, copy=False) i16 = (np.clip(f32, -1.0, 1.0) * 32767.0).astype(np.int16) candidates: list[tuple[np.ndarray, bool]] if prefer == "i16": candidates = [(i16, False), (f32, False), (i16[None, :], True), (f32[None, :], True)] elif prefer == "f32_2d": candidates = [(f32[None, :], True), (f32, False), (i16[None, :], True), (i16, False)] elif prefer == "i16_2d": candidates = [(i16[None, :], True), (i16, False), (f32[None, :], True), (f32, False)] else: candidates = [(f32, False), (i16, False), (f32[None, :], True), (i16[None, :], True)] res = None for cand, _is2d in candidates: try: try: res = model.generate( input=cand, input_fs=sr, cache=running_cache, language=language, use_itn=True, batch_size_s=60, merge_vad=cfg["merge_vad"], merge_length_s=cfg["merge_len"], ) except TypeError: res = model.generate( input=cand, fs=sr, cache=running_cache, language=language, use_itn=True, batch_size_s=60, merge_vad=cfg["merge_vad"], merge_length_s=cfg["merge_len"], ) break except Exception: continue if not res: return "" out = res[0].get("text", "") return out def run_in_memory( model_dir: str, device: str, language: str, samplerate: int, channels: int, block_seconds: float, alsa_device: Optional[str], ) -> None: _safe_imports() AutoModel, rich_post = _safe_import_model() # 关闭底层 tqdm 进度条等多余输出 os.environ.setdefault("TQDM_DISABLE", "1") print("加载模型...", flush=True) model = AutoModel( model=model_dir, trust_remote_code=True, remote_code="./model.py", vad_model="fsmn-vad", vad_kwargs={"max_single_segment_time": 30000}, device=device, disable_update=True, ) import alsaaudio # 更小的 periodsize,降低 I/O 错误概率;推理聚合由 block_seconds 控制 period_frames = max(256, int(0.1 * samplerate)) cfg = _build_cfg_from_env() # 打开设备,尝试 S16 -> S32;必要时切换 plughw 以启用自动重采样/重格式化 pcm, sample_bytes, fmt_name, open_dev = _ensure_pcm_open(alsaaudio, alsa_device, samplerate, channels, period_frames) if cfg["debug"]: print(f"[debug] opened ALSA device {open_dev} fmt={fmt_name} period={period_frames}", flush=True) print( f"开始录音(纯内存采集),设备={alsa_device or 'default'}, sr={samplerate}, ch={channels}, period={period_frames},按 Ctrl+C 停止。\n", flush=True, ) running_cache: dict = {} printed_text = "" # 聚合到这一帧阈值后才调用一次模型,降低调用频率与输出 min_frames = max(1, int(block_seconds * samplerate)) # 重叠帧数,减少句首/句尾丢失 overlap_frames = int(max(0, cfg["overlap_sec"]) * samplerate) block_buf: list[np.ndarray] = [] buf_frames = 0 stop_flag = False def handle_sigint(signum, frame): # noqa: ANN001 nonlocal stop_flag stop_flag = True signal.signal(signal.SIGINT, handle_sigint) # 连续读取 PCM,并聚合达到阈值后执行一次增量推理 while not stop_flag: try: length, data = pcm.read() except Exception as e: # noqa: BLE001 # 读失败时尝试重建(常见于某些 DMIC Raw) if cfg["debug"]: print(f"[debug] pcm.read error: {e}, reopen stream", flush=True) # 尝试使用 plughw 以获得自动格式/采样率适配 dev = alsa_device or "default" if not dev.startswith("plughw") and dev != "default": dev = f"plughw:{dev.split(':')[-1]}" pcm, sample_bytes, fmt_name, open_dev = _ensure_pcm_open(alsaaudio, dev, samplerate, channels, period_frames) length, data = pcm.read() if length <= 0: time.sleep(0.005) continue # 根据实际格式解析 bytes pcm_int16 = _to_int16_mono(data, sample_bytes, channels, cfg["debug"]) block_buf.append(pcm_int16) buf_frames += pcm_int16.size if buf_frames < min_frames: continue # 达到阈值,拼接并转换为 float32 [-1, 1] joined = np.concatenate(block_buf, axis=0) # 为下一个块保留尾部重叠 if overlap_frames > 0 and joined.size > overlap_frames: tail = joined[-overlap_frames:] block_buf = [tail.astype(np.int16, copy=False)] buf_frames = tail.size else: block_buf.clear() buf_frames = 0 audio_block_f32 = int16_to_float32(joined) # 若采样率不是 16k,重采样到 16k audio_block_f32, eff_sr = resample_to_16k(audio_block_f32, samplerate) if not _should_infer(audio_block_f32, cfg["min_rms"], cfg["debug"], joined.size): continue if cfg["debug"]: rms = float(np.sqrt(np.mean(np.square(audio_block_f32)))) print(f"[debug] frames={joined.size}, eff_sr={eff_sr}, rms={rms:.4f}, infer", flush=True) # 可选:调试时将当前块写到一个覆盖的临时 WAV(重采样后,单声道16k),方便用 aplay 检查采音 if cfg["dump_wav_path"]: import wave, tempfile dump_path = cfg["dump_wav_path"] or str( Path(tempfile.gettempdir()) / "sv_debug_block.wav" ) with wave.open(dump_path, "wb") as wf: wf.setnchannels(1) wf.setsampwidth(2) wf.setframerate(eff_sr) dump_i16 = (np.clip(audio_block_f32, -1.0, 1.0) * 32767.0).astype(np.int16) wf.writeframes(dump_i16.tobytes()) # 执行推理 res_text = _infer_block(model, audio_block_f32, eff_sr, cfg, language, running_cache) if not res_text: continue text = rich_post(res_text) if text.startswith(printed_text): new_part = text[len(printed_text) :] else: new_part = text if new_part: print(new_part, end="", flush=True) printed_text += new_part # 停止前做一次 flush:如果还有残留缓冲,强制推理,避免句尾丢失 if block_buf: try: joined = np.concatenate(block_buf, axis=0) audio_block_f32 = int16_to_float32(joined) audio_block_f32, eff_sr = resample_to_16k(audio_block_f32, samplerate) text = _infer_block(model, audio_block_f32, eff_sr, cfg, language, running_cache) text = rich_post(text) if text: print(text[len(printed_text):], end="", flush=True) except Exception: pass print("\n已停止。") def main() -> None: parser = argparse.ArgumentParser(description="Real-time mic transcription (in-memory ALSA)") parser.add_argument("--model", default="iic/SenseVoiceSmall", help="模型仓库或本地路径") parser.add_argument("--device", default="cuda:0", help="设备,如 cuda:0 或 cpu") parser.add_argument("--language", default="auto", help="语言代码或 'auto'") parser.add_argument("--samplerate", type=int, default=16000, help="采样率") parser.add_argument("--channels", type=int, default=1, help="通道数") parser.add_argument("--block-seconds", type=float, default=1.0, help="每次推理块时长(秒)") parser.add_argument("--min-rms", type=float, default=0.003, help="触发推理的最小能量阈值(0-1)") parser.add_argument("--overlap-seconds", type=float, default=0.3, help="块间重叠时长(秒),减少句首/句尾丢失") parser.add_argument("--merge-vad", action="store_true", help="启用 VAD 合并(默认关闭以获得更快输出)") parser.add_argument("--merge-length", type=float, default=2.0, help="VAD 合并的最大分段时长(秒)") parser.add_argument("--debug", action="store_true", help="打印调试信息(RMS、帧数等)") parser.add_argument("--alsa-device", default=None, help="ALSA 设备名(如 hw:0,0 或 default)") args = parser.parse_args() # 将 CLI 设置传入环境变量,供运行体内读取 if args.debug: os.environ["SV_DEBUG"] = "1" os.environ["SV_MIN_RMS"] = str(args.min_rms) os.environ["SV_MERGE_VAD"] = "1" if args.merge_vad else "0" os.environ["SV_MERGE_LEN"] = str(args.merge_length) os.environ["SV_OVERLAP_SEC"] = str(args.overlap_seconds) run_in_memory( model_dir=args.model, device=args.device, language=args.language, samplerate=args.samplerate, channels=args.channels, block_seconds=args.block_seconds, alsa_device=args.alsa_device, ) if __name__ == "__main__": main() 执行前先安装一下模块组件 pip install pyalsaaudio 接着运行,就可以实时说话转换为文字了。 python demo_realtime_memory.py --alsa-device plughw:0,6 --samplerate 48000 --channels 2 --block-seconds 1.0 --language zh --device cuda:0 参考:https://github.com/FunAudioLLM/SenseVoice -
windows访问Linux搭建的samba服务
Linux 服务 安装 Samba 是Linux上实现SMB/CIFS 协议的服务,可以让 Windows、Linux、macOS 之间互相访问文件夹。 安装samba sudo apt update sudo apt install samba -y 配置 (1)配置文件 sudo vim /etc/samba/smb.conf [share] path = /home/bianbu browseable = yes writable = yes valid users = bianbu read only = no [share] 是共享名称,如window访问可以"\服务器IP\share",如下图。 path 访问的共享目录路径 valid users 表示允许访问的用户,这里一般要与系统的用户一致。 (2)设置用户名 设置Samba用户,注意用户名要是系统中存在的用户名。 sudo smbpasswd -a bianbu 启动 启动服务器并设置开机自启动 sudo systemctl restart smbd sudo systemctl enable smbd 检查状态 systemctl status smbd bianbu@bianbu:~$ sudo systemctl restart smbd sudo systemctl enable smbd Synchronizing state of smbd.service with SysV service script with /usr/lib/systemd/systemd-sysv-install. Executing: /usr/lib/systemd/systemd-sysv-install enable smbd bianbu@bianbu:~$ systemctl status smbd ● smbd.service - Samba SMB Daemon Loaded: loaded (/usr/lib/systemd/system/smbd.service; enabled; preset: enabled) Active: active (running) since Tue 2025-09-23 15:41:53 CST; 8s ago Docs: man:smbd(8) man:samba(7) man:smb.conf(5) Main PID: 22211 (smbd) Status: "smbd: ready to serve connections..." Tasks: 3 (limit: 4089) Memory: 7.1M (peak: 7.6M) CPU: 435ms CGroup: /system.slice/smbd.service ├─22211 /usr/sbin/smbd --foreground --no-process-group ├─22220 "smbd: notifyd" . └─22221 "smbd: cleanupd " 9月 23 15:41:53 bianbu systemd[1]: Starting smbd.service - Samba SMB Daemon... 9月 23 15:41:53 bianbu (smbd)[22211]: smbd.service: Referenced but unset environment variable evaluates to an empty string: SMBDOPTIONS 9月 23 15:41:53 bianbu systemd[1]: Started smbd.service - Samba SMB Daemon. windows连接 ip就是服务器地址,share就是smb.conf。注意如果连接不上,就把"使用其他凭据连接勾选上"。 接着输入用户命令和密码,这里的用户名和密码就是使用sudo smbpasswd -a bianbu命令配置的。 排查 (1)测试linux段samba服务是否正常 测试本地samba服务运行情况。 sudo apt update sudo apt install smbclient -y 然后运行 smbclient -L localhost -U bianbu smbclient是samba提供的客户端工具,用来测试samba服务器是否能正常工作。 ** 在window上命令测试是否联通** 在window上试试命令: net use * \\192.168.0.34\Shared /user:192.168.0.34\bianbu 密码