Ai应用
-
Isaac Sim 快速入门:三种工作流程示例
简介 如果是NVIDIA Isaac Sim的新用户,可以按照本文的两个示例来体验Isaac Sim。本文主要提供Isaac Sim基础使用教程、机器人基础教程。 在快速入门教程中,所有可通过 GUI 执行的操作同样也能用 Python 实现。您可以在 GUI 操作与 Python 脚本之间自由切换。您在 GUI 中创建的所有内容都能保存为 USD 文件的一部分。 例如,您可以通过图形界面创建世界,并为机器人添加所需动作。随后将整个 USD 文件导入独立的 Python 脚本中,根据需求系统性地修改属性。 基础使用教程 本教程涵盖 Isaac Sim 的基础操作,包括界面导航、场景对象添加、查看对象基本属性以及运行模拟等内容。 通过本教程,您将从空白场景开始,根据三种不同工作流程的选择,最终实现机器人运动控制。提供三种不同工作流程的目的是展示 Isaac Sim 可根据需求以多种方式灵活使用。 可以查看两种工作流中的脚本以了解它们的差异。通过对比分析,有助于掌握如何执行完全相同的任务: 扩展脚本可在Window > Examples > Robotics Examples中找到,然后单击浏览器右上角的**Open Script **按钮。 独立脚本位于 \<isaac-sim-root-dir>/standalone_examples/tutorials/ 文件夹内。 可以通过编辑扩展示例中的任意脚本来体验"hot-reloading"功能。保存文件后,无需关闭模拟器即可立即看到变更生效。 在官方教程中有3个标签页,三个标签页执行相同操作并达成相同结果。 GUI: 图形用户界面 Extensions:扩展功能 Standalone Python:独立Python环境 GUI方式 步骤1:启动Isaac Sim linux:cd ~/isaacsim && ./isaac-sim.selector.sh windows:双击isaac-sim.selector.bat 模拟器完全加载后,创建新场景: 从顶部菜单栏点击File > New。首次启动 Isaac Sim 时,可能需要 5-10 分钟完成初始化。 步骤2: 添加地平面 为场景添加地平面:从顶部菜单栏点击Create > Physics > Ground Plane。 步骤3:添加光源 可以为场景添加光源以照亮其中的物体。如果场景中有光源但没有物体反射光线,场景仍会显得昏暗。 在顶部菜单栏中,点击Create > Lights > Distant Light。 步骤4:添加视觉立方体 "视觉"立方体是指没有附加物理属性的立方体,例如没有质量、没有碰撞体积。这种立方体不会受重力影响下落,也不会与其他物体发生碰撞。 从顶部菜单栏中,依次点击Create > Shape > Cube. 在用户界面最左侧找到箭头图标并点击Play。运行模拟时立方体不会有任何动作。 步骤5:移动、旋转与缩放立方体 使用左侧工具栏上的各种操控工具来操作立方体。 按下"W"键或点击移动工具即可拖拽移动立方体。通过点击箭头并拖拽可单轴移动,点击彩色方块并拖拽可双轴移动,点击工具中心的圆点并拖拽则可三轴自由移动。 按下“E”键或点击旋转控制器来旋转立方体。 按下“R”键或点击缩放控制器来调整立方体大小。点击箭头并拖动可单维度缩放,点击彩色方块并拖动可双维度缩放,点击控制器中心的圆圈并拖动则可实现三维同步缩放。 按下“esc”键取消选中立方体。 对于“移动”和“旋转”操作,可选择基于局部坐标系或世界坐标系进行操作。长按控制器即可查看选项。 可以通过立方体的Property属性面板进行更精确的修改,只需在对应输入框中输入具体数值即可。点击输入框旁的蓝色方块可将数值重置为默认值。 步骤6:添加物理与碰撞属性 常见的物理属性包括质量和惯性矩阵,这些属性使物体能够在重力作用下下落。碰撞属性则决定了物体能否与其他物体发生碰撞。 物理和碰撞属性可以分别添加,因此你可以创建一个能与其他物体碰撞但不受重力影响的物体,或是受重力影响但不会与其他物体碰撞的物体。但在多数情况下,这两个属性会同时添加。 为立方体添加物理和碰撞属性: 在场景树中找到对象("/World/Cube")并高亮显示它。 从工作区右下角的Property属性面板中,点击"Add"按钮并在下拉菜单中选择Physics。这将显示可添加到对象的一系列属性选项。 选择Rigid Body with Colliders Preset“带碰撞器的刚体预设”可为对象同时添加物理和碰撞网格。 按下播放Play按钮,观察立方体在重力作用下坠落并与地平面发生碰撞。 教程结束,记得保存你的工作。 扩展功能方式 通过一个名为"脚本编辑器"的现有扩展模块来演示扩展工作流的特性。脚本编辑器允许用户通过 Python 与场景进行交互。主要使用与独立 Python 工作流相同的 Python API。当我们开始与模拟时间轴交互时,特别是下一个教程中,这两种工作流的区别将变得清晰。 步骤1:启动 启动一个新的 Isaac Sim 实例,转到顶部菜单栏并点击Window > Script Editor。 步骤2:添加地平面 要通过交互式 Python 添加地平面,请将以下代码片段复制粘贴到脚本编辑器中,然后点击底部的运行Run按钮执行。 from isaacsim.core.api.objects.ground_plane import GroundPlane GroundPlane(prim_path="/World/GroundPlane", z_position=0) 步骤3:添加光源 可以为场景添加光源以照亮其中的物体。如果场景中有光源但没有物体反射光线,场景仍会显得昏暗。 在脚本编辑器中新建一个标签页(Tab > Add Tab)。 在脚本编辑器中复制粘贴以下代码片段并运行,即可添加光源。 import omni.usd from pxr import Sdf, UsdLux stage = omni.usd.get_context().get_stage() distantLight = UsdLux.DistantLight.Define(stage, Sdf.Path("/DistantLight")) distantLight.CreateIntensityAttr(300) 步骤4:添加视觉立方体 在脚本编辑器中新建一个标签页 (Tab > Add Tab)。 在脚本编辑器中复制粘贴以下代码片段并运行,即可添加两个立方体。我们将保留其中一个作为纯视觉对象,同时为另一个添加物理和碰撞属性以便对比。 import numpy as np from isaacsim.core.api.objects import VisualCuboid VisualCuboid( prim_path="/visual_cube", name="visual_cube", position=np.array([0, 0.5, 0.5]), size=0.3, color=np.array([255, 255, 0]), ) VisualCuboid( prim_path="/test_cube", name="test_cube", position=np.array([0, -0.5, 0.5]), size=0.3, color=np.array([0, 255, 255]), ) Isaac Sim 核心 API 是对原生 USD 和物理引擎 API 的封装。您可以使用原生 USD API 添加一个视觉立方体(不含物理和颜色属性)。请注意原生 USD API 更为冗长,但能提供对每个属性的更精细控制。 from pxr import UsdPhysics, PhysxSchema, Gf, PhysicsSchemaTools, UsdGeom import omni # USD api for getting the stage stage = omni.usd.get_context().get_stage() # Adding a Cube path = "/visual_cube_usd" cubeGeom = UsdGeom.Cube.Define(stage, path) cubePrim = stage.GetPrimAtPath(path) size = 0.5 offset = Gf.Vec3f(1.5,-0.2,1.0) cubeGeom.CreateSizeAttr(size) if not cubePrim.HasAttribute("xformOp:translate"): UsdGeom.Xformable(cubePrim).AddTranslateOp().Set(offset) else: cubePrim.GetAttribute("xformOp:translate").Set(offset) 步骤5:添加物理与碰撞属性 在 Isaac Sim 核心 API 中,我们为常用对象编写了封装器,这些封装器附带所有物理和碰撞属性。您可以通过以下代码片段添加一个具有物理和碰撞属性的立方体。 import numpy as np from isaacsim.core.api.objects import DynamicCuboid DynamicCuboid( prim_path="/dynamic_cube", name="dynamic_cube", position=np.array([0, -1.0, 1.0]), scale=np.array([0.6, 0.5, 0.2]), size=1.0, color=np.array([255, 0, 0]), ) 另外,如果想修改现有对象使其具备物理和碰撞属性,可以使用以下代码片段。 from isaacsim.core.prims import RigidPrim RigidPrim("/test_cube") from isaacsim.core.prims import GeometryPrim prim = GeometryPrim("/test_cube") prim.apply_collision_apis() 点击播放Play按钮,观察立方体在重力作用下坠落并与地平面碰撞。 步骤6: 移动、旋转与缩放立方体 使用核心 API 移动物体: import numpy as np from isaacsim.core.prims import XFormPrim translate_offset = np.array([[1.5,1.2,1.0]]) orientation_offset = np.array([[0.7,0.7,0,1]]) # note this is in radians scale = np.array([[1,1.5,0.2]]) stage = omni.usd.get_context().get_stage() cube_in_coreapi = XFormPrim(prim_paths_expr="/test_cube") cube_in_coreapi.set_world_poses(translate_offset, orientation_offset) cube_in_coreapi.set_local_scales(scale) 使用原始 USD API 移动物体: from pxr import UsdGeom, Gf import omni.usd stage = omni.usd.get_context().get_stage() cube_prim = stage.GetPrimAtPath("/visual_cube_usd") translate_offset = Gf.Vec3f(1.5,-0.2,1.0) rotate_offset = Gf.Vec3f(90,-90,180) # note this is in degrees scale = Gf.Vec3f(1,1.5,0.2) # translation if not cube_prim.HasAttribute("xformOp:translate"): UsdGeom.Xformable(cube_prim).AddTranslateOp().Set(translate_offset) else: cube_prim.GetAttribute("xformOp:translate").Set(translate_offset) # rotation if not cube_prim.HasAttribute("xformOp:rotateXYZ"): # there are also "xformOp:orient" for quaternion rotation, as well as "xformOp:rotateX", "xformOp:rotateY", "xformOp:rotateZ" for individual axis rotation UsdGeom.Xformable(cube_prim).AddRotateXYZOp().Set(rotate_offset) else: cube_prim.GetAttribute("xformOp:rotateXYZ").Set(rotate_offset) # scale if not cube_prim.HasAttribute("xformOp:scale"): UsdGeom.Xformable(cube_prim).AddScaleOp().Set(scale) else: cube_prim.GetAttribute("xformOp:scale").Set(scale) 独立python环境方式 脚本位于standalone_examples/tutorials/getting_started.py,要运行该脚本,请打开终端,导航至 Isaac Sim 安装根目录,并执行以下命令: ./python.sh standalone_examples/tutorials/getting_started.py 机器人基础教程 本小节介绍如何将机器人添加到场景中、移动机器人以及检查机器人状态。在开始教程前,请确保已经完成了上一章节isaac sim基础使用教程。 GUI方式 步骤1:向场景中添加机器人 新建场景:通过File > New Stage.。 添加机器人:向场景添加机器人,从顶部菜单栏点击Create > Robots > Franka Emika Panda Arm。 步骤2:检查机器人 使用物理检查器查看机器人关节属性。 前往Tools > Physics > Physics Inspector.。右侧将打开一个窗口。 选择 Franka 进行检查。窗口默认会显示关节信息,例如上下限位及默认位置。 点击右上角的三横线图标可查看更多选项,例如关节刚度和阻尼系数。 可选)修改这些数值,观察舞台上机器人随参数变化的运动。修改成功后会出现绿色对勾标记。 点击绿色对勾按钮,将当前参数设为机器人新的默认值。 步骤3:控制机器人 基于图形界面的机器人控制器位于 Omniverse 可视化编程工具 OmniGraphs 中。OmniGraph 相关章节提供了更深入的教程指导。本教程将通过快捷工具生成控制图,然后在 OmniGraph 编辑器中查看该控制图。 通过菜单栏选择 Tools > Robotics > Omnigraph Controllers > Joint Position.来打开控制图生成器。。 在新弹出的关节位置控制器**Articulation Position Controller Inputs **输入窗口中,点击 Robot Prim 字段旁的添加"Add"按钮。 选择 Franka 作为目标对象。 点击确定生成图表。 要移动机器人的话按照下面步骤 在右上角的“舞台”选项卡中,选择Graph > Position_Controller.。 选择 JointCommandArray 节点。您可以通过在舞台树中选择该节点,或在图表编辑器中选择该节点来完成此操作。 在右下角的Property选项卡中,可以看到关节命令值。构造数组节点Construct Array Node下的输入Inputs项对应机器人上的关节,从基座关节开始。 点击+按住+拖动不同的数值字段,或输入不同的值,可以看到机械臂位置发生变化。 点击Play开始模拟。 要生成可视化的图标 打开图表编辑器窗口:**Window > Graph Editors > Action Graph. **。该编辑器窗口会在包含机器人的视口选项卡下方以新选项卡形式打开。 调出新打开的浏览器标签页。 点击位于图形编辑器窗口中央的Edit Action Graph选项。 从列表中选择唯一存在的图形。 选择一个数组并查看Stage和Property选项卡,以了解每个数组节点关联的值。 在图形中选择"关节控制器"Articulation Controller对象以查看其属性。 Extension方式 步骤1:向场景添加机器人 新建一个场景(File > New)。要将机器人添加到场景中,请将以下代码片段复制粘贴到脚本编辑器中并运行。 import carb from isaacsim.core.prims import Articulation from isaacsim.core.utils.stage import add_reference_to_stage from isaacsim.storage.native import get_assets_root_path import numpy as np assets_root_path = get_assets_root_path() if assets_root_path is None: carb.log_error("Could not find Isaac Sim assets folder") usd_path = assets_root_path + "/Isaac/Robots/FrankaRobotics/FrankaPanda/franka.usd" prim_path = "/World/Arm" add_reference_to_stage(usd_path=usd_path, prim_path=prim_path) arm_handle = Articulation(prim_paths_expr=prim_path, name="Arm") arm_handle.set_world_poses(positions=np.array([[0, -1, 0]])) 步骤2:检查机器人 Isaac Sim 核心 API 提供了许多函数调用来获取机器人相关信息。以下是查询关节数量与名称、各类关节属性以及关节状态的示例代码。 在脚本编辑器中新建标签页,复制粘贴以下代码片段。该操作需在上一步添加机器人完成后执行(此时 arm_handle 已创建)。运行代码前需先点击播放Play按钮,这些命令需在物理引擎运行状态下才能生效。 # Get the number of joints num_joints = arm_handle.num_joints print("Number of joints: ", num_joints) # Get joint names joint_names = arm_handle.joint_names print("Joint names: ", joint_names) # Get joint limits joint_limits = arm_handle.get_dof_limits() print("Joint limits: ", joint_limits) # Get joint positions joint_positions = arm_handle.get_joint_positions() print("Joint positions: ", joint_positions) 请注意,点击"运行"时仅会打印一次状态信息,即使模拟正在持续运行。如需查看最新状态,需反复点击"运行"按钮。若希望在每个物理步长都打印信息,需要将这些命令插入到每个物理步长都会执行的回调函数中。在章节"工作流程"中详细讲解时间步进机制。 要将命令插入物理回调中,请在脚本编辑器的单独标签页中运行以下代码片段。 import asyncio from isaacsim.core.api.simulation_context import SimulationContext async def test(): def print_state(dt): joint_positions = arm_handle.get_joint_positions() print("Joint positions: ", joint_positions) simulation_context = SimulationContext() await simulation_context.initialize_simulation_context_async() await simulation_context.reset_async() simulation_context.add_physics_callback("printing_state", print_state) asyncio.ensure_future(test()) 按下播放键启动模拟,然后运行该代码片段。您将看到终端在每个物理步骤打印出的信息。 如果不再需要每个物理步骤都打印信息,可以通过运行以下代码片段来移除物理回调。 simulation_context = SimulationContext() simulation_context.remove_physics_callback("printing_state") 步骤3:控制机器人 在 Isaac Sim 中有多种控制机器人的方式。最底层是直接发送关节指令来设置位置、速度和力矩。以下是通过关节层级的 Articulation API 控制机器人的示例。 在脚本编辑器中新建一个标签页,复制粘贴以下代码片段。该代码需在前述添加机器人步骤完成后运行(此时 arm_handle 已建立)。运行代码片段前请先点击播放键Play。这些指令需在物理引擎运行状态下生效。我们将提供两个位置供您切换。若您已将上述打印状态代码片段添加至每个物理步骤,当机器人移动时您应能看到打印的关节编号发生变化。 # Set joint position randomly arm_handle.set_joint_positions([[-1.5, 0.0, 0.0, -1.5, 0.0, 1.5, 0.5, 0.04, 0.04]]) # Set all joints to 0 arm_handle.set_joint_positions([[0.0, 0.0, 0.0 , 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]]) 与前述 get_joint_positions 功能类似,此处的 set_joint_positions 仅在您点击"运行"时执行一次。若您希望在每个物理步骤发送指令,则需将这些命令插入到每个物理步骤都会执行的物理回调函数中。 独立Python方式 脚本位于 standalone_examples/tutorials/getting_started_robot.py 。要运行该脚本,请打开终端,导航至 Isaac Sim 安装的根目录,并执行以下命令: ./python.sh standalone_examples/tutorials/getting_started_robot.py 到此就完成了基本的操作了,接下来就是一系列的教程可以参考:《机器人设置教程系列》。 -
Isaac Sim v5.0.0:探索AI 机器人仿真平台
什么是isaac sim NVIDIA Issac Sim是一款基于NVIDIA omniverse构建的参考应用应用程序,使开发人员能够在基于物理的虚拟环境开发、模拟和测试AI机器人。 设计 Isaac Sim提供了一系列工作流程,用于导入和调整最常见格式(包括Onshape、统一机器人描述格式URDF和MuJoco XML格式MJCF)设计的机械系统。这通过使用通用常见描述USD实现,该开源3D常见描述API具有高度可扩展性,是Isaac Sim核心的统一数据交换格式。 微调与训练 Isaac Sim的核心功能在与其仿真能力本身:这是一个基于GPU的高保真physX物理引擎,能够支持工业规模的传感器PTX渲染。该平台通过直接调用GPU,实现了对各类传感器(包括摄像头、激光雷达和接触式传感器)的仿真模拟。这种能力进而支持数字孪生技术的实现,让您的端到端流程在真实机器人启动前就能完成测试运行。Isaac Sim提供了一整套工具链:通过Replicator生成合成数据,利用Omnigraph编程仿真环境,调整PhysX参数以匹配实现物理特性,最终通过强化学习RL等多样化方法训练控制智能体。 部署 Isaac Sim预先配备了所有必要组建,不仅能将智能体部署到真实机器人,还能构建与这类系统完全集成的应用程序。Omniverse提供了应用基础设施API,包括图形界面和文件管理功能。该平台还提供了与ROS 2桥接API,实现真实机器人与仿真的直接通信,同时搭建NVIDIA Isaac ROS:一套高性能硬件加速的ROS 2工具包,专为打造自主机器人而设计。 快速入门 快速安装:一小时内完成安装并开始使用。 Isaac Sim基础教程:助您快速上手 NVIDIA Isaac 仿真平台 工作站安装:本地工作站安装指南。 容器安装:远程桌面服务器安装指南。 开发工具:用于调试和开发的工具与环境。 python脚本与教程:使用 NVIDIA Isaac Sim 核心 Python API 构建环境、机器人和任务的工具与教程。 图形用户界面参考:通过图形用户界面了解 NVIDIA Isaac Sim 中的机器人基础概念。 导入与导出功能:支持多种文件格式的机器人及资产导入导出功能。 机器人设置:Isaac Sim 提供的机器人修改工具集。 机器人设置教程系列:关于机器人配置工具及工作流程的系列教学。 机器人仿真:用于模拟机器人的控制器与运动生成工具。 ROS2:ROS2桥接与接口。 Isaac Lab:强化学习框架与克隆器API。 合成数据生成:用于生成合成书记的工具集与工作流程。 数字孪生::用于构建和操作数字孪生的工具,如仓库物流、Cortex 和地图绘制。 系统架构 Isaac Sim旨在支持新型机器人工具的创建,并增强现有工具的功能。该平台为为C++和Python提供了灵活的API,可根据需求以不同深度集成到项目中。平台目标并非与现有软件竞争,而是与之协调并提升其能力。为此,Isaac Sim的许多组建都是开源的,可自由独立使用。可以在Onshape中设计机器人,用Isaac Sim模拟传感器,并通过ROS或其他消息系统控制场景。同样,也可以完全基于isaac sim提供的ingt构建完整的独立应用程序。 Omniverse Kit Isaac Sim基于Omniverse Kit构建,这是一个用于开发原生Omniverse应用和微服务的工具包。Omniverse Kit通过一系列轻量级插件提供多样化功能。这些插件采样C语言接口开发以确保API持久兼容性,同时提供python解释器以便进行便捷的脚本编写和定制。 通过 Python API 可以为 Omniverse Kit 编写新扩展,或为 Omniverse 创建新体验。 开发工作流 Isaac Sim基于C++和Python构建,通常分别通过编译插件和绑定进行操作。这意味着该平台能够支持多种工作流程,拥有构建和交互利用Isaac Sim项目。Isaac Sim提供完整的OMniverse应用程序,拥有与机器人交互和仿真,虽然这是用户与平台互动的最常见方式,但绝非唯一途径。Isaac Sim还以VS code和Jupyter Notebook扩展形式提供直接python开发支持。此外Isaac Sim不仅限与同步操作,还能通过ROS2实现硬件在环运行,从而促进仿真到现实的迁移以及数字孪生应用。 USD格式 NVIDIA Isaac Sim采用USD通过常见描述文件格式呈现常见。Universal Scene Description(USD)是由皮克斯开发的一种易于扩展的开源 3D 场景描述文件格式,专为内容创作和不同工具间的交互而设计。凭借其强大功能和通用性,USD 不仅被视觉特效领域广泛采用,还应用于建筑、设计、机器人、制造等多个学科领域。 安装指南 Isaac Sim支持windows和Linux系统安装。可通过容器( container)、工作站(workstation)、云端(in the cloud)、直播流(livestream)或者python环境进行部署,根据使用场景,还可以自定义硬件配置。 快速安装 快速安装适用于演示场景,可让您了解完整产品的功能概览。完成快速安装后,您能创建包含机器人的虚拟房间,这将更全面的展示产品能力。该只能面向具备基础计算机知识的安装人员。 windows或linux系统快速安装步骤: (1)下载以下任意安装包 windows: windows系统兼容性检查工具 linux:linux系统兼容性检查工具 (2)将安装包解压至制定文件夹 (3)运行脚本检查 window:请双击 omni.isaac.sim.compatibility_check.bat。 linux:./omni.isaac.sim.compatibility_check.sh 更多信息参阅:Isaac Sim兼容性检查 (4)下载任意一个安装包 window:https://download.isaacsim.omniverse.nvidia.com/isaac-sim-standalone-5.0.0-windows-x86_64.zip linux:https://download.isaacsim.omniverse.nvidia.com/isaac-sim-standalone-5.0.0-linux-x86_64.zip (5)创建一个isaac-sim的文件夹 在windows的C:/或linux根目录下直接创建一个名为isaac-sim的文件夹。然后将下载的文件解压到文件夹中去。 (6)在isaac-sim文件夹中,执行操作 window:请双击 isaac-sim.selector.bat linux:命令窗口中运行 ./post_install.sh ,然后运行 ./isaac-sim.selector.sh。 (7)在issac应用选择起窗口,选择"start" 有关应用选择器的详细信息,请参阅《Isaac Sim 应用选择器》。随后将打开另一个命令窗口并运行脚本,此过程可能比预期耗时更长。在此期间由于会出现空白窗口,可能看似安装失败。请继续等待。 (8)issac启动成功 (9)选择 选择创建一个房间 Create > Environment > Simple Room. (10)选择创建一个机械臂 Create > Robots > Franka Emika Panda Arm. (11)点击运行模拟 在屏幕最左侧寻找箭头按钮,点击它来运行一段简短模拟。 isaac 系统需求 对操作系统需求如下 对驱动的要求如下 工作站安装 工作站安装方式是在本地允许模拟器,需要对本地电脑有较高的要求,官方要求最低的配置要为RTX4080的显卡。因此若本地配置了GPU的windows或linux系统上以GUI应用程序允许isaac sim,推荐采用工作站安装方式。下面是安装步骤 (1)isaac Sim兼容性检查工具 Isaac 兼容性检查工具是一款轻量级应用程序,可通过编程方式检查本地的软硬件要求,会给出运行NVIDIA isaac sim时那些要求满足或不满足。 下载工具:Latest Release兼容性工具 解压:将压缩包解压到指定文件夹。 运行:在Linux系统上云霄omni.isaac.sim.compatibility_check.sh脚本,在windows系统上运行omni.isaac.sim.compatibility_check.bat文件。 点击工具的"Test Kit"按钮就会显示测试结果。 应用程序会以不同颜色高亮显示以下状态: 绿色:表示优秀 浅绿色:表示良好 橙色:表示基本满足,建议更高配置 红色:不足/不支持 应用程序检查维度为: NVIDIA GPU:驱动程序版本、支持RTX功能的GPU、显存容量。 CPU、内存和存储:CPU处理器、CPU核心数量、运行内存、可用存储空间。 others:操作系统、显示设备。 对于操作系统如果ubuntu大于版本号也会变成红色,可先测试是否可运行,实测是可以,但不排除有兼容性问题。 (2)下载软件包 下载链接:Latest Release,根据自己的系统选择软件包下载的本地。 在本地创建一个isaacsim文件夹,然后将压缩包解压到文件夹。 (3)运行启动 先创建extension_examples 的符号链接,请运行 post_install 脚本。 linux:./post_install.sh windows:双击 post_install.bat 文件 然后就可以启动应用程序 linux: 执行 ./isaac-sim.selector.sh windows:双击isaac-sim.selector.bat 文件 启动后会弹出以下界面 在弹出的窗口中选择isaac Sim Full,然后点击START就可以运行。启动过程中可能要一点时间,如果期间弹出“程序无法响应”,可以选择等待,以免被误杀。 启动后就可以开始第一个基础教程了:基础教程。 总结一下命令执行的实例: linux系统 mkdir ~/isaacsim cd ~/Downloads unzip "isaac-sim-standalone-5.0.0-linux-x86_64.zip" -d ~/isaacsim cd ~/isaacsim ./post_install.sh ./isaac-sim.selector.sh window系统 mkdir C:\isaacsim cd %USERPROFILE%/Downloads tar -xvzf "isaac-sim-standalone-5.0.0-windows-x86_64.zip" -C C:\isaacsim cd C:\isaacsim post_install.bat isaac-sim.selector.bat Docker容器安装 在远程服务器或者云端部署isaac sim建议使用docker容器的方式。 (1)检查系统是否满足需求 首先先确保系统满足运行NVIDIA isaac Sim所需的系统要求和驱动程序要求。 (2)安装docker Docker installation using the convenience script curl -fsSL https://get.docker.com -o get-docker.sh sudo sh get-docker.sh Post-install steps for Docker sudo groupadd docker sudo usermod -aG docker $USER newgrp docker Verify Docker docker run hello-world 详细的docker安装步骤见docker安装,安装后的配置步骤见配置步骤。 (3)安装NVIDIA容器工具包 Configure the repository curl -fsSL https://nvidia.github.io/libnvidia-container/gpgkey | sudo gpg --dearmor -o /usr/share/keyrings/nvidia-container-toolkit-keyring.gpg \ && curl -s -L https://nvidia.github.io/libnvidia-container/stable/deb/nvidia-container-toolkit.list | \ sed 's#deb https://#deb [signed-by=/usr/share/keyrings/nvidia-container-toolkit-keyring.gpg] https://#g' | \ sudo tee /etc/apt/sources.list.d/nvidia-container-toolkit.list \ && \ sudo apt-get update Install the NVIDIA Container Toolkit packages sudo apt-get install -y nvidia-container-toolkit sudo systemctl restart docker Configure the container runtime sudo nvidia-ctk runtime configure --runtime=docker sudo systemctl restart docker Verify NVIDIA Container Toolkit docker run --rm --runtime=nvidia --gpus all ubuntu nvidia-smi 安装最新版本的 NVIDIA 容器工具包以获取安全补丁。 (4)容器部署 docker pull nvcr.io/nvidia/isaac-sim:5.0.0 拉取isaac sim容器。 然后以交互式bash会话运行isaac sim容器。 docker run --name isaac-sim --entrypoint bash -it --runtime=nvidia --gpus all -e "ACCEPT_EULA=Y" --rm --network=host \ -e "PRIVACY_CONSENT=Y" \ -v ~/docker/isaac-sim/cache/kit:/isaac-sim/kit/cache:rw \ -v ~/docker/isaac-sim/cache/ov:/root/.cache/ov:rw \ -v ~/docker/isaac-sim/cache/pip:/root/.cache/pip:rw \ -v ~/docker/isaac-sim/cache/glcache:/root/.cache/nvidia/GLCache:rw \ -v ~/docker/isaac-sim/cache/computecache:/root/.nv/ComputeCache:rw \ -v ~/docker/isaac-sim/logs:/root/.nvidia-omniverse/logs:rw \ -v ~/docker/isaac-sim/data:/root/.local/share/ov/data:rw \ -v ~/docker/isaac-sim/documents:/root/Documents:rw \ nvcr.io/nvidia/isaac-sim:5.0.0 使用 -e "ACCEPT_EULA=Y" 标志即表示您接受 NVIDIA Omniverse 许可协议中规定的镜像许可协议。 使用 -e "PRIVACY_CONSENT=Y" 标志即表示您同意数据收集与使用协议中的条款。不设置此标志即可选择退出数据收集。 使用-e "PRIVACY_USERID=\<emai\l>" 标志可选择性地设置用于标记会话日志。 最后以原生直播模式启动 isaac sim ./runheadless.sh -v 运行直播客户端前,必须确保 Isaac Sim 应用已加载就绪。Isaac Sim 可能需要几分钟才能完全加载。-v 标志用于在着色器缓存预热时显示额外日志。要确认这一点,请留意控制台或日志中的这一行:Isaac Sim Full Streaming App is loaded。 首次加载 Isaac Sim 时,着色器缓存需要较长时间。后续运行 Isaac Sim 会更快,因为着色器已被缓存,且容器运行时缓存会被挂载。 (5)安装isaac sim WebRTC流媒体客户端 见后续章节。输入运行 Isaac Sim 容器的机器或实例的 IP 地址,点击连接按钮开始实时流传输。 云部署 Isaac Sim以容器形式提供,可在本地运行,也可在配备NVIDIA RTX的亚马逊云服务、微软、谷歌云平台、腾讯云或阿里云上运行,并支持将应用程序直接流式传输到您的桌面。这种基于云的交付方式能为任何桌面系统提供最新的RTX图像处理能力与性能,无需本地配置NVIDIA RTX GPU。 根据选择的云服务提供商,提供以下可选方案。 NVIDIA Brev:NVIDIA Brev Instructions AWS:Amazon Web Instructions Azure:Microsoft Cloud Instructions GCP:Google Cloud Instructions Tencent:Tencent Cloud Instructions Alibaba:Alibaba Cloud Instructions Volcano Engine:Volcano Engine Instructions Remote:Remote Workstation Instructions 上述链接提供了云端部署指南,包含通过 SSIsaac Automator 是一款高级工具,可帮助将自定义 Isaac Sim 部署自动化至公有云平台。该工具支持通过 SSH、基于网页的 VNC 客户端以及远程桌面客户端访问 Isaac Sim 实例,兼容 AWS、Azure、GCP 和阿里云等主流云服务商。H 和远程桌面客户端访问实例的操作说明。 直播客户端 本节将介绍如何以无界面模式直播运行 Isaac Sim 实例。需要注意的是每个 Isaac Sim 实例同一时间只能采用一种直播方式。同一时间仅允许一个客户端访问单个 Isaac Sim 实例。要远程退出 Isaac Sim 应用程序:点击文件菜单,然后在流式传输的 Isaac Sim 应用中选择退出。接着关闭 Isaac Sim WebRTC 流媒体客户端应用。当 Isaac Sim 运行在 A100 GPU 上时不支持直播功能。直播需要 NVENC(NVIDIA 编码器)支持,而 A100 GPU 不包含该编码器。 Isaac Sim WebRTC流媒体客户端是推荐的远程查看工具,可让您在桌面或工作站以无需配置高性能GPU可以查看Isaac Sim画面。 (1)服务端的启动 要使用Isaac Sim WebRTC流媒体客户端,需要先在远程运行isaac Sim。 linux:cd ~/isaacsim && ./isaac-sim.streaming.sh windows:双击isaac-sim.streaming.bat Docker:./runheadless.sh PIP:isaacsim isaacsim.exp.full.streaming --no-window Python sample:./python.sh standalone_examples/api/isaacsim.simulation_app/livestream.py 要可以通过互联网连接远程实例运行isaac sim,需要添加以下标志:--/app/livestream/publicEndpointAddress= --/app/livestream/port=49100。如在docker容器示例中: PUBLIC_IP=$(curl -s ifconfig.me) && ./runheadless.sh --/app/livestream/publicEndpointAddress=$PUBLIC_IP --/app/livestream/port=49100 然后在 Isaac Sim WebRTC 流媒体客户端应用中使用相同的公共 IP。运行 Isaac Sim 的主机必须开放UDP port 47998和TCP port 49100。 确保 Isaac Sim 应用已加载就绪。首次启动时,Isaac Sim 可能需要数分钟才能完全加载完成。 为确认加载状态,请在终端/控制台输出或应用日志中查找以下信息。使用 PIP 或 Python Sample 运行时可能不会显示该行信息。Isaac Sim Full Streaming App is loaded. (2)客户端的启动 请根据您的平台,从最新发布版块下载 Isaac Sim WebRTC 流媒体客户端。 运行 Isaac Sim WebRTC 流媒体客户端应用程序。 使用默认的 127.0.0.1 IP 地址作为服务器,连接到本地 Isaac Sim 实例。点击"连接"。连接过程可能需要一些时间。连接成功后,您将在客户端窗口中看到 Isaac Sim 界面。 需要注意的是建议在与 Isaac Sim 无头实例相同的网络中使用 WebRTC 流媒体客户端。连接到同一网络中无头模式的 Isaac Sim 实例时,请将 127.0.0.1 替换为运行 Isaac Sim 的计算机 IP 地址。 linux系统:在终端中运行 chmod +x xx.AppImage 命令,使应用程序获得可执行权限。双击 AppImage 文件即可运行 Isaac Sim WebRTC 流媒体客户端。重要提示:在 Ubuntu 22.04 或更高版本上运行需安装 libfuse2。具体安装方法请参阅《安装 FUSE 2》指南。 windows:若在连接本地或远程 Isaac Sim 实例时遇到问题,请确保 Windows 防火墙允许列表中已添加/kit/kit.exe 及 Isaac Sim WebRTC 流媒体客户端应用。 macbook:打开 DMG 文件后,点击并拖拽 Isaac Sim WebRTC 流媒体客户端应用程序至"应用程序"文件夹图标完成安装。 要重新加载连接,请在视图菜单中点击“重新加载”。如果一段时间后出现空白屏幕,此操作可能会有所帮助。 Python环境安装 主要是在python虚拟环境中通过PIP安装Isaac Sim和使用isaac Sim默认python环境。这里就不展开了,具体参考链接:python环境安装 Isaac Sim 资源库 isaac Sim提供多种资源与机器人模型,助您构建虚拟世界。部分资源专为isaac Sim及机器人应用打造,另一些则适用于其他基于NVIDIA Omniverse的应用程序。默认提供的资源均可在Window > Browsers选项卡中找到。 内容浏览器集中管理所有isaac Sim资源与文件,包含下来全部资源清单,以及URDF文件、配置文件、策略二进制等。Window > Browsers > Content。 isaac Sim最新版本提供示例资源包可供下载。使用这些资源时,需将文件下载至本地磁盘或Nucleus服务器。下文中所有资源路径均默认相对于 persistent.isaac.asset_root.default 设置中的默认资源根目录。详见本地资源包章节。 首次加载资源时耗时较长:机器人模型可能需要数分钟加载,大型环境场景的加载时间可能长达十分钟以上 资源分类如下: 机器人资产 相机与深度传感器 非视觉传感器 道具 环境 精选资源 Neural Volume渲染 机器人 NVIDIA Isaac Sim 支持多种具有不同底盘、外形和功能的机器人。这些机器人可分为轮式机器人、全向移动机器人、四足机器人、机械臂和空中机器人(无人机),它们位于内容浏览器的 Isaac Sim/Robots 文件夹中。 (1)轮式机器人 Limo是NVIDIA Isaac Sim 支持集成 ROS 系统的 AgileX Limo 差速驱动底盘机器人。Robots/AgilexRobotics/Limo/limo.usd NVIDIA 卡特机器人专为导航相关应用提供差速移动底盘。新一代 Nova 卡特机器人基于 Nova Orin 计算与传感器平台打造。 NVIDIA Isaac Sim 支持 Clearpath 移动机器人,包括 Dingo 和 Jackal。Clearpath 机器人位于 Robots/Clearpath 中。 Evobot 是一款采用两轮驱动的自平衡机器人,专为抓取和运输物体设计。该机器人由德国多特蒙德弗劳恩霍夫研究所开发。 Forklift 叉车模型采用单枢轴轮和滚轮设计,通过连接至关节动作的棱柱关节来控制升降操作。 JetBot是开源 NVIDIA JetBot 人工智能机器人平台为创客、学生和爱好者提供了构建创意趣味 AI 应用所需的一切。 Idealworks iw.hub 是一款配备激光雷达和摄像头的移动底盘,搭载 NVIDIA AGX GPU 实现自主导航。该平台负载能力达 1000 公斤,最高行驶速度 2.2 米/秒。 iRobot 公司推出的 Create3 机器人是一款先进的差速驱动机器人,专为多种教育应用场景设计。其圆形底盘集成了多种传感器和先进控制功能,特别适合室内导航、环境建图等任务。NVIDIA Isaac Sim 中的 Create3 机器人配备了差速驱动系统和各类传感器,可实现高度逼真的仿真效果。Create 3 机器人可在 Robots/iRobot/Create3/create_3.usd - 基础版本中找到。已配置移动底盘物理系统。更多信息参阅 iRobot Create 3 。 Leatherback 是 NVIDIA 用于自动驾驶的研究平台。皮背甲机器人位于 Robots/NVIDIA/Leatherback/leatherback.usd (2)全向移动机器人 Kaya机器人是一个展示 Isaac 机器人引擎在 NVIDIA Jetson Nano™平台上运行能力与灵活性的演示平台。该平台采用 3D 打印部件和爱好者级组件设计,力求实现最大程度的可及性,并配备三轮全向驱动系统,使其能够朝任意方向移动。 Robots/NVIDIA/Kaya/kaya.usd :基础版本 Robots/NVIDIA/Kaya/kaya_ogn_gamepad.usd:基础版本,外加使用全向控制器实现的游戏手柄操控功能。 O3dyn 是由多特蒙德弗劳恩霍夫研究所开发的自主全向运输机器人。凭借其全向轮,该机器人可实现任意方向移动,并通过四个杠杆抓取托盘,进而抬升托盘进行运输。 Robots/Fraunhofer/O3dyn/o3dyn.usd: 基础版本。包含移动底盘、夹具与升降机构的物理绑定,以及传感器定位功能。 Robots/Fraunhofer/O3dyn/o3dyn_controller.usd :基础版本,外加使用全向控制器实现的游戏手柄操控功能。 (3)四足机器人 Ant蚂蚁是一种基础四足机器人,腿部采用旋转关节设计,其原型源自 OpenAI Gym 中的 Ant 机器人。 Robots/IsaacSim/Ant/ant.usd :基础版本。可通过菜单栏中 Create>Robots>Ant 选项创建。 Robots/IsaacSim/Ant/ant_instanceable.usd :可实例化版本,专为强化学习场景配置以创建多个高效克隆体。 ANYmal 机器人是由 ANYbotics 开发的自主四足机器人。Isaac Sim 支持 B、C、D 三种型号。 Boston Dynamics Spot 波士顿动力 Spot 机器人位于 Robots/BostonDynamics/spot Unitree Quadruped Robots 宇树四足机器人A1、B2、Go1 和 Go2 是 Unitree Robotics 研发的四足机器人,在 Isaac Sim 中进行仿真。四足示例中使用的是 A1 型号。 关于四组机器人控制示例,请参阅:Isaac Sim 中的强化学习策略示例 (4)机械臂机器人 Denso Cobotta 包含以下 Denso 型号:Cobotta Pro 900、Cobotta Pro 1300。位于 Robots/Denso/Cobotta。 Fanuc CRX10iA/L 是一款 6 轴机器人,有效载荷为 10 公斤。 Festo费斯托协作机器人是一款六轴气动机械臂。 Flexiv Rizon 4 是一款 7 轴自适应机械臂。 RobotStudio是lerobot机械臂,包含以下 RobotStudio 模型:SO-100、SO-101。RobotStudio 机器人位于 Robots/RobotStudio。 太多了,只列出部分。 (5)空中机器人 Crazyflie 2.X 微型四轴飞行器机器人 Ingenuity火星直升机"机智号" (6)人形机器人 Fourier Intelligence GR1 傅里叶智能 GR1 Unitree Humanoids 宇树人形机器人,位于 Robots/Unitree Xiao Peng PX5 小鹏 PX5机器人位于 Robots/XiaoPeng/PX5 (7)移动式机械臂 Clearpath Ridgeback提供两种 Clearpath Ridgeback 型号配置:一种配备 Emika Franka Panda 机械臂,另一种配备 Universal Robots UR5 机械臂。位于 Robots>Clearpath 目录下。 Boston Dynamics Spot 波士顿动力 Spot 机器人,带机械臂的 Spot 机器人位于 Robots>BostonDynamics>spot 路径下 相机与深度传感器 Isaac Sim 支持相机和深度传感器,其数字孪生体可在内容浏览器中找到,位于 Isaac Sim/Sensors 目录下,并按制造商分类存放于子文件夹中。这里就不过多阐述。 非视觉传感器 Isaac Sim 模拟了多种类型的非视觉传感器模型,其数字孪生体可在内容浏览器的 Isaac Sim/Sensors 路径下找到,并按制造商分类存放于子文件夹中。 部分非视觉传感器类型尚未提供数字孪生体。有关这些传感器的详细信息(包括如何通过图形界面创建它们),请点击下方链接: 接触传感器 惯性测量单元传感器 光束传感器 激光雷达 雷达 道具 道具主要是一些角色人物如警察、医生、工人,以及杂项资产。 环境资产 环境资产提供如网格、房间、仓库、医院、办公室、赛道、小型仓库数字孪生 (1)简单网格 这个简易环境包含一块带有网格纹理的平坦地面和围边。系统提供了三种配置:前两种为直角转角,第三种则为圆角设计。 (2)简单房间 包含一张桌子的简易房间 在内容浏览器中搜索 simple_room.usd 或通过创建菜单:Create>Environments>Simple Room (3)仓库 一个包含货架及可放置物品的仓库环境。提供四种配置方案: (4)医院 医院环境,包含多个房间和空间。 (5)办公室 一个办公环境,包含多个房间和开放式平面布局。 (6)赛道 地面上勾勒出的 Jetracer 赛道轮廓。 (7)小型仓库数字孪生 一个小型仓库的数字孪生,可以使用。 精选资产 Nova Carter 搭载 Nova Orin™传感器与计算架构,是一套完整的机器人开发平台,可加速新一代自主移动机器人(AMR)的开发和部署。 Nova Carter 目前作为 Isaac AMR 和 Isaac ROS 软件的双重参考平台,支持真实场景与仿真环境下的开发工作。用户可通过赛格威机器人公司购买 Nova Carter 机器人。 关于功能完整的 Nova Carter Isaac Sim 资产详情,请参阅Nova Carter 文档页面。 注意Nova Carter 机器人在首次加载时可能需要数分钟时间。 Neural Volume渲染 NuRec(神经重建)技术能够利用源自真实世界图像的神经体积数据,在 Omniverse 中进行场景渲染。这些基于 3D 高斯模型的场景可作为标准 USD 资产加载至 Isaac Sim,用于可视化与仿真。 有关 NuRec 在 Omniverse 中的详细工作原理(包括数据准备、渲染设置及已知限制),请参阅NuRec 文档。要生成兼容场景,可使用开源项目3DGruT——该项目提供从图像集合训练 3D 高斯模型的工具,并能导出适用于 Omniverse 应用的 USDZ 格式数据。 示例展示了如何将 NuRec 场景加载到 Isaac Sim 中并运行模拟。该代码片段遍历提供的示例,首先加载指定的舞台,随后加载 carter 导航资源并设置起始位置。接着检查是否需要在生成位置创建碰撞地平面,若需要,则创建一个应用了碰撞 API 的平面基元。然后设置 carter 导航目标基元位置,并运行指定步数的模拟。在模拟过程中,轮式机器人将朝目标位置行进。 import asyncio import os import omni.kit.commands import omni.kit.app import omni.usd import omni.timeline from isaacsim.storage.native import get_assets_root_path_async from isaacsim.core.utils.stage import add_reference_to_stage from pxr import PhysxSchema, UsdGeom, UsdPhysics # User path of the HF NuRec dataset USER_PATH = "/home/user/PhysicalAI-Robotics-NuRec" # Paths for loading and placing the Nova Carter navigation asset and its target. NOVA_CARTER_NAV_URL = "/Isaac/Samples/Replicator/OmniGraph/nova_carter_nav_only.usd" NOVA_CARTER_NAV_USD_PATH = "/World/NovaCarterNav" NOVA_CARTER_NAV_TARGET_PATH = f"{NOVA_CARTER_NAV_USD_PATH}/targetXform" # Scenarios for testing navigation in the environments EXAMPLE_CONFIGS = [ { "name": "Voyager Cafe", "stage_url": f"{USER_PATH}/nova_carter-cafe/stage.usdz", "nav_start_loc": (0, 0, 0), "nav_relative_target_loc": (-3, -1.5, 0), "create_collision_ground_plane": False, "num_simulation_steps": 500, }, { "name": "Galileo Lab", "stage_url": f"{USER_PATH}/nova_carter-galileo/stage.usdz", "nav_start_loc": (3.5, 2.5, 0), "nav_relative_target_loc": (4, 0, 0), "create_collision_ground_plane": False, "num_simulation_steps": 500, }, { "name": "Wormhole", "stage_url": f"{USER_PATH}/nova_carter-wormhole/stage.usdz", "nav_start_loc": (0, 0, 0), "nav_relative_target_loc": (5, 0, 0), "create_collision_ground_plane": False, "num_simulation_steps": 500, }, { "name": "ZH Lounge", "stage_url": f"{USER_PATH}/zh_lounge/usd/zh_lounge.usda", "nav_start_loc": (-1.5, -3, -1.6), "nav_relative_target_loc": (-0.5, 5, -1.6), "create_collision_ground_plane": True, "num_simulation_steps": 500, }, ] async def run_example_async(example_config): example_name = example_config.get("name") print(f"Running example: '{example_name}'") # Open the stage stage_url = example_config.get("stage_url") if not stage_url: print(f"Stage URL not provided, exiting") return if not os.path.exists(stage_url): print(f"Stage URL does not exist: '{stage_url}', exiting") return print(f"Opening stage: '{stage_url}'") await omni.usd.get_context().open_stage_async(stage_url) stage = omni.usd.get_context().get_stage() # Make sure the physics scene is set to synchronous for the navigation to work for prim in stage.Traverse(): if prim.IsA(UsdPhysics.Scene): physx_scene = PhysxSchema.PhysxSceneAPI.Apply(prim) physx_scene.GetUpdateTypeAttr().Set("Synchronous") break # Load the carter navigation asset assets_root_path = await get_assets_root_path_async() carter_nav_path = assets_root_path + NOVA_CARTER_NAV_URL print(f"Loading carter nova asset: '{carter_nav_path}'") carter_nav_prim = add_reference_to_stage(usd_path=carter_nav_path, prim_path=NOVA_CARTER_NAV_USD_PATH) # Set the carter navigation start location nav_start_loc = example_config.get("nav_start_loc") if not nav_start_loc: print(f"Navigation start location not provided, exiting") return print(f"Setting carter navigation start location to: {nav_start_loc}") if not carter_nav_prim.GetAttribute("xformOp:translate"): UsdGeom.Xformable(carter_nav_prim).AddTranslateOp() carter_nav_prim.GetAttribute("xformOp:translate").Set(nav_start_loc) # Check if a collision ground plane needs to be created at the spawn location if example_config.get("create_collision_ground_plane"): plane_path = "/World/CollisionPlane" print(f"Creating collision ground plane {plane_path} at {nav_start_loc}") omni.kit.commands.execute("CreateMeshPrimWithDefaultXform", prim_path=plane_path, prim_type="Plane") plane_prim = stage.GetPrimAtPath(plane_path) plane_prim.GetAttribute("xformOp:scale").Set((10, 10, 1)) plane_prim.GetAttribute("xformOp:translate").Set(nav_start_loc) if not plane_prim.HasAPI(UsdPhysics.CollisionAPI): collision_api = UsdPhysics.CollisionAPI.Apply(plane_prim) else: collision_api = UsdPhysics.CollisionAPI(plane_prim) collision_api.CreateCollisionEnabledAttr(True) plane_prim.GetAttribute("visibility").Set("invisible") # Set the carter navigation target prim location nav_relative_target_loc = example_config.get("nav_relative_target_loc") if not nav_relative_target_loc: print(f"Navigation relative target location not provided, exiting") return print(f"Setting carter navigation target location to: {nav_relative_target_loc}") carter_navigation_target_prim = stage.GetPrimAtPath(NOVA_CARTER_NAV_TARGET_PATH) if not carter_navigation_target_prim.IsValid(): print(f"Carter navigation target prim not found at path: '{NOVA_CARTER_NAV_TARGET_PATH}', exiting") return if not carter_navigation_target_prim.GetAttribute("xformOp:translate"): UsdGeom.Xformable(carter_navigation_target_prim).AddTranslateOp() carter_navigation_target_prim.GetAttribute("xformOp:translate").Set(nav_relative_target_loc) # Run the simulation for the given number of steps num_simulation_steps = example_config.get("num_simulation_steps") if not num_simulation_steps: print(f"Number of simulation steps not provided, exiting") return print(f"Running {num_simulation_steps} simulation steps") timeline = omni.timeline.get_timeline_interface() timeline.play() for i in range(num_simulation_steps): if i % 10 == 0: print(f"Step {i}, time: {timeline.get_current_time():.4f}") await omni.kit.app.get_app().next_update_async() print(f"Simulation complete, pausing timeline") timeline.pause() async def run_examples_async(): for example_config in EXAMPLE_CONFIGS: await run_example_async(example_config) asyncio.ensure_future(run_examples_async()) 从 Hugging Face 下载 NVIDIA NuRec 数据集。更新脚本中的 USER_PATH 变量: USER_PATH = "/home/user/PhysicalAI-Robotics-NuRec"。 本文主要来自Isaac Sim Documentation V5.0.0的翻译。 -
NVIDIA Jetson平台简介:机器人和边缘AI
简介 NVIDIA Jetson平台提供用于开发和部署AI赋能机器人、无人机、IVA(Intelligent Video Analytics,智能视频)应用和自主机器的工具。在边缘生成式AI、NVIDIA Metropolis和Isaac平台支持下,Jetson提供可拓展得软件、现代AI堆栈、灵活的微服务和API、生成就绪型ROS软件包以及特定于应用程序的AI工作流。 Jetson 硬件Roadmap,分为商业方向和工业方向。 上图是商业硬件roadmap,主要分为orin(欧林)和thor(雷神)两个系列。 而工业方向roadmap主要是entry、mainstream、perfromance三个方向。 在软件方面,jetson提供JetPack软件包,截止目前最新的发布版本是JetPack 7.0,主要是基于ubuntu 24.04,集成了CUDA 13.0以及holoscan sensor bride支持。 硬件 NVIDIA Jetson 模组可提供适合各种性能水平和价位的加速计算功能,从而能够满足各种自主应用的需求。从制造业到建筑业,从医疗健康到物流行业,Jetson 平台都能提供出色的性能、卓越的能效和无比轻松的开发体验。下面是jetson系列提供的模组规格简要对比。 上面是NVIDIAjetson系列的模组从最小0.5TFLOPS算力到最大2070 TOPS算力的平台矩阵。 Nano:四核A57@1.43G CPU+128核Maxwell架构GPU+4GB LPDDR内存,可提供472 FGLOP的AI算力;并行运行多个神经网络并同时处理多个高分辨率传感器,其功耗仅需5~10W;应用在网络硬盘录像机(NVR)、家用机器人以及具备全面分析功能的智能网关上面。 TX2:双核NVIDIA Denver™@1.95G+四核Arm® Cortex®-A57@1.92G CPU+256核 Pascal GPU+4GB/8GB LPDDR内存,可提供1.3TFLOPS的AI算力;计算性能翻倍,功耗仅7.5W。可应用在工厂机器人、商用无人机、便携式医疗设备和企业协作设备中。 Xavier NX:6核NVIDIA Carmel ARM®v8.2 64 位 CPU + 48 个 Tensor Core 的 384 核 NVIDIA Volta™ GPU+8/16GB LPDDR4x内存;可提供14TOPS+功耗10W~21TOPS+功耗20W的AI算力;应用在 适用于无人机、便携式医疗设备、小型商业机器人、智能摄像头、高分辨率传感器、自动光学检测、智能工厂和其他 IoT 嵌入式系统等高性能 AI 系统。 AGX Xavier:8 核 NVIDIA Carmel Armv8.2 64 位 CPU+512 个 NVIDIA CUDA Core 和 64 个 Tensor Core Volta 架构GPU+32/64GB内存,提供32TOPS的AI算力,功耗在10W~40W;非常适用于配送和物流机器人、工厂系统和大型工业UAV等自主机器。 Orin Nano、NX、AGX:6\~12核 Arm® Cortex® A78AE v8.2@1.7G\~2.2G +(32\~64)x (1024\~2048)核Ampere 架构 GPU+4G\~64G LPDDR5内存;功耗满足7\~60W,提供算力34 TOPS\~275 TOPS的AI算力;Orin系列是包含7个相同架构的模组其性能是上一代AI推理的8倍并支持高速接口;强大的软件堆栈包含预训练的 AI 模型、参考 AI 工作流和垂直应用框架,可加速生成性 AI 的端到端开发,以及边缘 AI 和机器人应用。 Thor:12\~14核 Arm® Neoverse®-V3AE 64 位 CPU@2.6G+(64\~96)X(1536\~2560)核Blackwell 架构 GPU+128G LPDDR5X内存,提供1200~2070TFLOPS(FP4)算力;功耗在40~75W,与AGX Orin相比,Jetson Thor 系列模组的 AI 计算性能提高至 7.5 倍以上,能效提高至 3.5 倍。应用在人形机器人、空间智能、多传感器处理、生成式AI等多个场景。 软件 NVIDIA Jetson软件是永远边缘构建、部署和扩展人形机器人及生成式AI应用的旗舰平台。它支持全系列Jetson模块,为从原型开发到量产提供统一且可扩展的基础。NVIDIA JetPack SDK赋能实时传感处理、多摄像头追踪,以及如操作和导航等先进机器人功能,集成于强大的AI生态之中。开发者可借助诸如NVIDIA Holoscan(传感器流式处理)和Metropolis VSS(视频分析)等集成框架。通过NVIDIA Isaac机器人工作流程,包括想NVIDIA GROOT N1等基础生成式AI模型,Jetson软件为机器人实现快速、精准、变革性的AI赋能和规模化部署提供支持。 JetPack SDK:是一套完整的软件套件,用于NVIDIA Jetson平台上开发和部署AI驱动的边缘应用。 Holoscan传感器桥接器:将边缘传感器连接到AI工作流,以实现实时、高性能的传感器数据处理。 Jetson AI Lab:由NVIDIA工具和社区项目提供支持,激发机器人和生成式AI领域的创新和动手探索。 NVIDIA Isaac:提供NVIDIA CUDA加速库、框架和AI模型,用一个构建自主机器人,包括ARM、机械臂和人形机器人。 NVIDIA Metropolis:为智慧城市、工业和零售业开发和部署视觉AI应用,并在边缘进行实时视频分析。 JetPack SDK JetPack是NVIDIA Jetson平台官方软件套件,涵盖丰富的工具和库,可用于大招AI赋能边缘应用。目前最新的版本是JetPack7,采用Linux kernel 6.8及ubuntu 24.04 LTS,模块化云原生架构,结合最新的NVIDIA计算堆栈,无缝衔接NVIDIA AI工作流。 JetPack组件由AI计算堆栈、AI框架、Linux组件几个部分组成。 AI计算堆栈:由CUDA、cuDNN、TensorRT组成;用于提供硬件GPU的加速底层接口;CUDA提供NVIDIA GPU 上编写和运行通用计算程序的能力;cuDNN在CUDA之上的深度神经网络算子库,提供高度优化的深度学习核心算子(卷积、池化、激活函数、RNN/LSTM、注意力等)。Pytorch/TensorFlow调用cuDNN中的Conv2d、RNN等。TensorRT推理优化器,只负责推理不负责训练,把训练好的模型转换成高效的 GPU 可执行引擎底层依赖CUDA/cuDNN。与pytorch、TensorFlow不同其即是训练+推理框架。但相对pytorch、TensorFlow的推理,TensorRT性能效率更高。 AI框架:由pytorch、vLLM、SGLang、Triton推理服务器等部分组成。vLLM是便捷、快速的大型语言模型推理与服务库,SGLang 是专为大语言模型及视觉语言模型打造的高效推理框架。 Linux组件:主要是基础系统组件,基于ubuntu系统构建,提供刷机、安全、OTA、图形库(OpenGL、Vulkan、EGL等)、多媒体API、计算机视觉库(OpenCV、VisionWorks)等。 其他组件:Jetson平台服务、云原生设计、Nsight开发工具组成;平台服务提供预构建和可定制的云原生软件服务;云原生设计师提供容器化开发、kubernetes和微服务;Nsight提供强大的分析、调试、性能分析功能,在AI、图形和计算工作负载中优化GPU加速应用。 在JetPack SDK上各种应用SDK,如提供NVIDIA DeepStream SDK、NVIDIA Isaac ROS、NVIDIA Holoscan SDK。 Metropolis VIDIA Metropolis 是一个视觉 AI 应用平台和合作伙伴生态系统,可简化从边缘到云端的视觉 AI 智能体的开发、部署和可扩展性。可以做自动化视觉检查、智能交通系统、工业自动化、智能零售商店等等。 模型:可访问各种先进的AI模型,构建视觉AI应用,支持VLM等;提供TAO工具套件。对模型训练、适应和优化上手简单,不需要专业的AI知识或大型训练数据集,使用自己的数据微调即可完成。 工具:提供AI智能体Blueprints,借助大模型构建智能体,分析、解释和处理大量视频数据,以提供关键见解,帮助各行各业优化流程、提高安全性并降低成本。提供NVIDIA NIM一套易于使用的推理微服务,NIM 支持各种 AI 模型 (包括基础模型、LLM、VLM 等) ,可确保使用行业标准 API 在本地或云端进行无缝、可扩展的 AI 推理。提供DeepStream SDK,是基于 GStreamer 的完整流分析工具包。 数据:Omniverse集成 OpenUSDNVIDIA RTX™ 渲染技术,以及 物理 AI 集成到现有软件工具和仿真工作流中进行开发和测试 。NVIDIA Cosmos™ 是一个先进的生成式AI平台世界基础模型( WFM) 、高级标记器、护栏以及加速数据处理和管护流程,旨在加速物理 AI系统。NVIDIA Isaac SIM开发者在物理精准的虚拟环境中生成合成图像和视频数据,以训练自定义视觉AI模型。 Isaac NVIDIA Isaac AI机器人开发平台有NVIDIA CUDA加速库、应用框架和AI模型组成,可加速自主移动机器人、手臂和操作器以及人形机器人等。 NVIDIA Robotis提供了全栈、加速库和优化的AI模型,能够高效开发、训练、仿真、部署机器人系统。 Isaac ROS:机器人操作系统,是基于开源ROS2构建,包含了NVIDIA CUDA加速计算包的集合,便于简化和加速高级AI机器人应用开发。 Isaac Manipulator:基于Isaac ROS构建,支持开发AI驱动的机械臂,这些机械臂可以无缝感知和理解环境并与环境进行交互。 Isaac Perceptor:基于Isaac ROS构建,支持开发先进的自主移动机器人,能够在仓库货工厂等非结构化环境中进行感知和定位。 Isaac GR00T:用于通用机器人基础模型和数据流水线,以加速人形机器人的开发。 还提供了基于物理的虚拟环境中设计、仿真、测试和训练的框架。NVIDIA Isaac Sim和NVIDIA Isaac Lab。 NVIDIA Isaac Sim:是一款基于 NVIDIA Omniverse 构建的开源参考应用,使开发者能够在基于物理的虚拟环境中模拟和测试 AI 驱动的机器人开发解决方案。 NVIDIA Isaac Lab:Isaac Lab 基于 NVIDIA Isaac Sim™ 开发,使用 NVIDIA®PhysX® 以及基于物理性质的 NVIDIA RTX™ 渲染提供高保真物理仿真。 Holoscan SDK NVIDIA Holoscan 将传感器数据传输到 GPU 进行实时推理,从而加速边缘 AI 开发。 Holoscan 传感器桥接器:可在高吞吐量传感器数据与 GPU 之间提供关键链接,从而无缝集成异构传感器数据。它可标准化并管理从各种传感器接口 (如摄像头输入、超声波或内窥镜视频) 中的数据提取,确保以低延迟、同步和可靠的方式传输数据,从而实现实时 AI 处理。 NVIDIA IGX ORIN:是一个结合了企业级硬件、软件和支持的工业级平台,可在生产就绪型硬件上进行部署。虽然 Holoscan SDK 可在您的目标设备上灵活部署,但 IGX 使公司能够专注于应用开发,并更快地实现 AI 的优势。 Jetson AI Lab Jetson AI Lab 由 NVIDIA 工具和社区项目提供支持,其提供了各种大模型的快速部署示例,如大语言模型LLM/SLM、视觉语言大模型VLM、Web UI等等。 如上图,如果要运行一个模型,就可以通过如上图示例直接获取到运行命令,还可以调整参数。更详细的教程参考:https://www.jetson-ai-lab.com/tutorial-intro.html 参考:https://www.nvidia.cn/autonomous-machines/ -
Jetson nano平台随记
环境准备 烧录镜像 下载NVIDIA jetson nano镜像,其镜像是基于ubuntu18.04修改。使用开源的balenaEtcher烧录器写到SD卡上,然后插卡启动 网络准备 买一个无线网卡然后安装好驱动配置好wifi连接。 远程访问 方法1: 在nano上安装xrdp的方式,window就可以远程桌面访问。 方法2:在nano上安装VNC,远程访问设备需要下载VNC客户端,支持ubuntu系统。 pyhton独立环境 类型conda activate的环境 sudo apt-get install python3-pip pip3 install virturalenv 创建一个环境 python3 -m virtualenv -p python3 env --system-site-packages 激活环境 source env/bin/activate 图像和视频 主要是https://github.com/thehapyone/NanoCamera 安装opencv 创建一个swap空间,否则内存可能不够。 在安装opencv前,还要准备一下环境 使用wget下载opencv的包。 wget -O opencv_contrib.zip https://github.com/openc/opencv_corntrib/archive/4.5.1.1.zip 使用cmake进行编译。 使用jtop可以查看系统统计信息,前提是要按照pip install -U jeston-stats 硬件接上CSI的摄像头,接上之后可以在/dev/videox 看到节点。可以使用下面的命令测试就可以看到图像。 nvgstcaptrue-1.0 --orientation=2 --cap-dev-node=1指定节点如/dev/video1 读取显示 import cv2 img = cv2.imread('/assets/a.jpg') cv2.imshow("Output",img) cv2.waitkey(0) 对于平台CSI摄像头需要import nanocamera import nanocamera as nano camera = nano.Camera(flip=2, width=640,height=480,fps=30) -
从零实现 Transformer:中英文翻译实例
概述 在http://www.laumy.tech/2458.html#h37章节中,介绍了transformer的原理,本章用pytorch来实现一个将"我有一个苹果"翻译为英文"I have an apple"的模型,直观体会transformer原理实现。 接下来先上图看看整体的代码流程。 推理 训练 模型 编解码器 到这里就涵盖了整个transformer模型翻译的例子了,下面的章节只是对图中的代码进行展开说明,如果不想陷入细节,可以直接跳转到最后一节获取源码运行实验一下。 数据预处理 数据准备 (1) 准备原始文本对 既然要做翻译那得先有数据用于模型训练,因此需要先准备原始的中文->英文的文本对,下面是使用python列表(List)准备中英匹配语料,List中包含的是元组(Tuple)。 pairs = [ ("我 有 一个 苹果", "i have an apple"), ("我 有 一本 书", "i have a book"), ("你 有 一个 苹果", "you have an apple"), ("他 有 一个 苹果", "he has an apple"), ("她 有 一个 苹果", "she has an apple"), ("我们 有 一个 苹果", "we have an apple"), ("我 喜欢 苹果", "i like apples"), ("我 吃 苹果", "i eat apples"), ("你 喜欢 书", "you like books"), ("我 喜欢 书", "i like books"), ("我 有 两个 苹果", "i have two apples"), ("我 有 红色 苹果", "i have red apples"), ] 为了方便,在构建原始文本对时,中英文的分词就以空格划分,这样接下来就可以根据空格来进行构建词表。 (2)构建词表 因为神经网络不能直接处理文本,模型只能处理数字,比如不能直接处理"我"、"有","I"等中英文词,对于计算机来讲都是数字,所以需要把文字转换为对应的映射表。 所以词表就是一个"字典",把每个词映射到一个唯一的数字ID上,所有的文本都需要转换为数字序列。 如下示例,中英文的编号。 # 中文词表示例 SRC_STOI = { "我": 1, "有": 2, "一个": 3, "苹果": 4, "书": 5, "喜欢": 6, # ... 更多词 } # 英文词表示例 TGT_STOI = { "i": 1, "have": 2, "an": 3, "apple": 4, "a": 5, "book": 6, # ... 更多词 } 如何构建词表了。既然中文、英文都需要各自编号,那么得先把此前准备的原始文本队中文、英文各自拆出来,然后我们使用python的set集合,将中文、英文分别添加到set集合中,使用set集合的好处是可以自动去重,添加了重复元素,set就不会添加,这样就得到了各自的中文、英文词表。最后再对这些词表进行依次编号即可。 下面就看看使用python代码怎么实现,首先是将原始文本对拆解,把中文放一起,英文放一起。 src_texts = [p[0] for p in pairs] tgt_texts = [p[1] for p in pairs] print(src_texts) print(tgt_texts) src_texts ['我 有 一个 苹果', '我 有 一本 书', '你 有 一个 苹果', '他 有 一个 苹果', '她 有 一个 苹果', '我们 有 一个 苹果', '我 喜欢 苹果', '我 吃 苹果', '你 喜欢 书', '我 喜欢 书', '我 有 两个 苹果', '我 有 红色 苹果'] tgt_texts ['i have an apple', 'i have a book', 'you have an apple', 'he has an apple', 'she has an apple', 'we have an apple', 'i like apples', 'i eat apples', 'you like books', 'i like books', 'i have two apples', 'i have red apples'] 接下来实现一个build_vocab函数,主要的思路就是句子先按照空格进行分好词,接着将所有词添加到set集合中,set集合会自动去重,这里需要注意的时,需要再加上3个特殊的词,分别是pad、bos、eos分别表示填充、开始、结束。填充是因为输入句子是不定长的,但是对于transformer来说所有的输入矩阵处理都是固定长度,所以不够的需要补齐,而bos和eos是用于transformer解码的,便于开始和结束翻译过程,最后构建好词表后就按照词表中进行变化,3个特殊词分为为1、2、3其他的词依次编号。 def build_vocab(examples: List[str]): """构建词表(字符串→索引 与 索引→字符串) - 输入示例为用空格分词后的句子列表 - 加入特殊符号 `<pad>`, `<bos>`, `<eos>` 并将其它 token 排序,保证可复现 返回: stoi: dict[token->id] itos: List[id->token] """ tokens = set() # 建立一个集合,用于存储所有的词表(不重复的词) for s in examples: # 依次遍历获得每个句子 for t in s.split(): # 通过空格划分,依次遍历句子中的每个词, tokens.add(t.lower()) # 将词添加到set中,这里为了方便统一转换小写 itos = ["<pad>", "<bos>", "<eos>"] + sorted(tokens) # 加入3个特殊的词,同时对set中的词进行排序。 stoi = {t: i for i, t in enumerate(itos)} # 对词表中的词按照顺序依次编号 return stoi, itos SRC_STOI, SRC_ITOS = build_vocab(src_texts) TGT_STOI, TGT_ITOS = build_vocab(tgt_texts) build_vocab最终返回是一个字典和列表,字典是词:编号的映射,列表是存放的是词表。列表是按照编号顺序依次排布,这样我们可以通过编号定位到时那个词。 为什么要一个字典和列表了?因为transformer输入是词->编号(转换为编码数字给计算机处理),输出是编号->词过程(转化为句子给人看)。通过字典我们可以查询词对应的编号[key:value],而通过列表的索引(编号)我们可以查询到对应的词。 中文和英文分别各自对应一个字典和词表。 SRC_STOI {'<pad>': 0, '<bos>': 1, '<eos>': 2, '一个': 3, '一本': 4, '两个': 5, '书': 6, '他': 7, '你': 8, '吃': 9, '喜欢': 10, '她': 11, '我': 12, '我们': 13, '有': 14, '红色': 15, '苹果': 16} SRC_ITOS ['<pad>', '<bos>', '<eos>', '一个', '一本', '两个', '书', '他', '你', '吃', '喜欢', '她', '我', '我们', '有', '红色', '苹果'] TGT_STOI {'<pad>': 0, '<bos>': 1, '<eos>': 2, 'a': 3, 'an': 4, 'apple': 5, 'apples': 6, 'book': 7, 'books': 8, 'eat': 9, 'has': 10, 'have': 11, 'he': 12, 'i': 13, 'like': 14, 'red': 15, 'she': 16, 'two': 17, 'we': 18, 'you': 19} TGT_ITOS ['<pad>', '<bos>', '<eos>', 'a', 'an', 'apple', 'apples', 'book', 'books', 'eat', 'has', 'have', 'he', 'i', 'like', 'red', 'she', 'two', 'we', 'you'] 这样我们就给中文和英文的所有词都编好号了,同时通过列表也可以通过编号查询到词。 数据加载器 在pytorch中模型训练那必然少不了DataLoader和Dataset,关于这两个类的介绍在http://www.laumy.tech/2491.html#h23中有简要说明,这里就不阐述了。注意本小节说明的数据的批量处理都适用于训练准备,主要是实现Dataset和Dataloader用于pytorch模型的训练,如果只是推理则是不需要的。 (1)Dataset继承类实现 首先要实现DataLoader中关键的输入类Dataset继承类,用于产出“单个样本”,怎么按索引取到一个样本,以及总共有多少个样本。每个样本是中文句子->英文句子。样本集为此前定义pairs,但是要把pairs中句子转换为编号,词表在前面我们已经构建好了,直接查询就行,那这里我们定义一个Example用于定义样本,src是中文句子的编号列表,tgt是对于英文句子的编号列表。 @dataclass class Example: """单条并行样本 - src: 源语言索引序列(不含 BOS/EOS) - tgt: 目标语言索引序列(含 BOS/EOS) """ src: List[int] tgt: List[int] 接下来就是实现Dataset的继承类ToyDataset,返回有多少个样本,以及通过编号获取指定的样本。 class ToyDataset(Dataset): """语料数据集,用于快速过拟合演示。""" def __init__(self, pairs: List[Tuple[str, str]]): self.data = [Example(encode_src(s), encode_tgt(t)) for s, t in pairs] def __len__(self): return len(self.data) def __getitem__(self, idx): return self.data[idx] 需要把pairs句子中词列表编码为数字列表,这里实现encode_src用于将输入(即pairs中的中文)编号为列表,再实现encode_tgt将输出(即pairs中的英文)编号为列表。使用for列表推导式从pairs列表中获取到s(中文句子)和t(英文句子)然后传入encode_src和encoder_tgt进而构建一个新的列表元素Example。这样就组建样本的self.data的样本列表,元素为Example类型,可以通过idx获取到指定的样本。 def encode_src(s: str) -> List[int]: """将原语句(已空格分词)编码为索引序列(不含 BOS/EOS)。""" return [SRC_STOI[w.lower()] for w in s.split()] def encode_tgt(s: str) -> List[int]: """将目标语句编码为索引序列,并在首尾添加 BOS/EOS。""" return [BOS_IDX] + [TGT_STOI[w.lower()] for w in s.split()] + [EOS_IDX] 上面就是输入句子编码为编号向量的实现了,也很简单,通过此前构建的词表字典,通过词就可以搜索到对应编号了。这里需要注意的是编码的源句子(输入)是没有包含BOS和EOS的,因为transformer的编码器不需要BOS和EOS,而编码的目标句子(输出)需要在句子前加上BOS,句子结尾加上EOS,因为transformer的解码器输入需要通过BOS来翻译第一个词,通过EOS来结束一个句子的翻译,要是不明白为什么了可以看看前面transformer原理的文章。 (2)Dataload DataLoader 负责“成批取样”,模型训练输入数据不是一个样本一个样本的送入训练,而是按照批次(多个样本合成一个批次)进行训练,这样训练效率才高。DataLoader决定批大小、是否打乱、多进程加载,返回的是一个可迭代的对象。 DataLoader重点是要实现 collate_fn回调,也就是怎么把一个批里的样本“拼起来”。 loader = DataLoader(dataset, batch_size=8, shuffle=True, collate_fn=collate_fn) 训练transformer,准备数据。我们的目的是要能够返回批量数据,批量数据也有好几个类型。 输入给encoder批量数据:输入矩阵类型(B,S),包含补齐的padding。 输入给decoder的批量数据:输入给decoder的矩阵类型(B,T),包含BOS以及右对齐的padding。不能加EOS,因为EOS是预测的结果,防止模型训练作弊。 decoder输出的批量数据:解码器的监督目标,主要用于预测数据与实际的结果比较计算损失,矩阵类型(B,T),不含BOS但是包含EOS。 encoder输入的pad掩码数据:因为输入给encoder的数据有padding,所以要告诉transformer哪些做了补齐,后续计算的时候要处理。 decoder输入的pad掩码数据:同上。 def collate_fn(batch: List[Example]): """将一个 batch 的样本对齐为等长张量,并构造 teacher forcing 所需的输入/输出。 返回: - src: (B,S) 源序列,已 padding - tgt_in: (B,T) 解码器输入(含 BOS,右对齐 padding) - tgt_out: (B,T) 解码器监督目标(对 tgt_in 右移一位,含 EOS) - src_pad_mask: (B,S) 源端 padding 掩码,True 表示 padding 位置 - tgt_pad_mask: (B,T) 目标端 padding 掩码(针对输入序列) """ # padding to max length in batch src_max = max(len(b.src) for b in batch) tgt_max = max(len(b.tgt) for b in batch) src_batch = [] tgt_in_batch = [] tgt_out_batch = [] for ex in batch: src = ex.src + [PAD_IDX] * (src_max - len(ex.src)) # Teacher forcing: shift-in, shift-out tgt_in = ex.tgt[:-1] + [PAD_IDX] * (tgt_max - 1 - len(ex.tgt[:-1])) tgt_out = ex.tgt[1:] + [PAD_IDX] * (tgt_max - 1 - len(ex.tgt[1:])) src_batch.append(src) tgt_in_batch.append(tgt_in) tgt_out_batch.append(tgt_out) src = torch.tensor(src_batch, dtype=torch.long) # (B, S) tgt_in = torch.tensor(tgt_in_batch, dtype=torch.long) # (B, T_in) tgt_out = torch.tensor(tgt_out_batch, dtype=torch.long) # (B, T_out) src_pad_mask = src.eq(PAD_IDX) # (B, S) tgt_pad_mask = tgt_in.eq(PAD_IDX) # (B, T) return src, tgt_in, tgt_out, src_pad_mask, tgt_pad_mask 上面就是Dataloader回调函数如何获取批量数据的实现了,输入为一个列表(包含所有样本的列表)。输出为5个2维向量,分别对应的就是上面说的5个批量数据。 首先计算样本列表中最长的源序列长度src_max和目标序列长度tgt_max,为后续的不足长度的句子进行padding操作,提供基准的长度。 其次使用for循环遍历每个样本(Example),将源序列src(encoder的输入)使用PAD_IDX填充到相同长度,保持做对齐;将目标序列输入(tgt_in)去掉最后一个token(EOS)作为decoder的输入,目标序列输出比对样本tgb_out去掉第一个tokenBOS作为监督目标,使用的teacher Forcing机制,这样就是实现了输入预测下一个的训练模式数据准备。 最后就是准备src和tgt_in的mask矩阵,形状跟src和tgt_in一样,使用python的eq比对如果对应的位置是padding就是true,不是就是false。 模型架构 数据准备好了,接下来就是设计我们的模型了。我们的模型是一个翻译模型可以分为两个路径,一个是编码路径和解码路径。 编码路径:词嵌入->位置编码->编码器。 解码路径:词嵌入->位置编码->解码器->生成器。 Class Seq2SeqTransformer(nn.Module): def __init__(self, src_vocab_size, tgt_vocab_size, d_model=128, nhead=4, num_encoder_layers=2, num_decoder_layers=2, dim_ff=256, dropout=0.1): super().__init__() self.d_model = d_model # 编码路径 # 1.词嵌入层,将tokenID转换为密集向量 self.src_tok = nn.Embedding(src_vocab_size, d_model, padding_idx=PAD_IDX) self.tgt_tok = nn.Embedding(tgt_vocab_size, d_model, padding_idx=PAD_IDX) # 2. 对输入添加位置信息 self.pos_enc = PositionalEncoding(d_model, dropout=dropout) # 3. 源序列的编码 self.encoder = Encoder(d_model, nhead, dim_ff, num_encoder_layers, dropout) # 解码路径 # 1. 解码生成目标序列 self.decoder = Decoder(d_model, nhead, dim_ff, num_decoder_layers, dropout) # 2. 将解码器输出转换为词表概率 self.generator = nn.Linear(d_model, tgt_vocab_size) 词嵌入直接调用的是神经网络的库nn.Embedding,其他部分都要自己实现,接下来我们会一一展开。下面我们需要先实现模型Seq2SeqTransformer的方法,主要包括如下: make_subsequent_mask:解码器因果掩码,不允许解码器看到未来。 forward: 模型前向传播的方法,pytorch训练的时候自动调用。 greedy_decode:模型推理方法,用于推理的应用。 因果掩码 为什么需要掩码了?主要是让模型不能看到未来的词。 推理阶段虽然是自回归一个一个输入然后一个一个迭代输出,但是在训练阶段,我们解码器的样本是全部一次性输入的。如下的步骤,我们虽然给到模型输入为:"BOS i have an apple ",但是每个步骤给到模型看到的不能是全部,否则给模型都看到输入结果了,那还谈啥预测,模型会偷懒直接就照搬就是一个映射过程了。如当输入BOS i 期望预测输出i have,如果没有掩码模型都看到全部的"BOS i have an apple ",就不是预测了,模型的参数也没法迭代了。 # 步骤1: 输入BOS → 期望输出i # 步骤2: 输入BOS i → 期望输出i have # 步骤3: 输入BOS i have → 期望输出i have an # 步骤4: 输入BOS i have an → 期望输出 i have an apple # 步骤5: 输入BOS i have an apple → 期望输出i have an apple EOS 哪有个问题,为什么我们输入的时候不按照要多少输入多少,为啥要全部一下给到输入?输入倒是可以要多少输入多少,但是要要考虑模型的并行训练,实际上上面的5个步骤在模型训练时是并行进行的,模型训练要的是训练参数,在某个阶段看到什么输入遇到什么输出,都分好类了自然可以并行的,所以这就需要结合掩码了,告诉模型那个步骤你能看到哪些? 总结一下mask的作用就是让模型不能看到未来的词,同时也是让模型不要对padding位进行误预测。 def make_subsequent_mask(self, sz: int) -> torch.Tensor: """构造大小为 (sz, sz) 的下三角因果掩码;True 为屏蔽(不允许看未来)。""" return torch.triu(torch.ones(sz, sz, dtype=torch.bool), diagonal=1) mask是要生成一个下三角形状,示例如下: # 对于序列长度4 mask = make_subsequent_mask(4) # 结果: # [[False, True, True, True], # 位置0: 只能看位置0 # [False, False, True, True], # 位置1: 能看位置0,1 # [False, False, False, True], # 位置2: 能看位置0,1,2 # [False, False, False, False]] # 位置3: 能看所有位置 前向传播 def forward(self, src, tgt_in, src_pad_mask, tgt_pad_mask): """训练/教师强制阶段的前向。 参数: - src: (B, S) 源 token id - tgt_in: (B, T) 目标端输入(以 BOS 开头) - src_pad_mask: (B, S) True 为 padding - tgt_pad_mask: (B, T) True 为 padding(针对 tgt_in) 返回: - logits: (B, T, V) 词表维度的分类分布 """ # 1) 词嵌入 + 位置编码 src_emb = self.pos_enc(self.src_tok(src)) # (B,S,C) tgt_emb = self.pos_enc(self.tgt_tok(tgt_in)) # (B,T,C) # 2) 编码:仅使用 key_padding_mask 屏蔽 padding memory = self.encoder(src_emb, src_key_padding_mask=src_pad_mask) # (B,S,C) # 3) 解码:自注意力需要因果掩码 + padding 掩码;交叉注意力需要 memory 的 padding 掩码 tgt_mask = self.make_subsequent_mask(tgt_in.size(1)).to(src.device) # (T,T) out = self.decoder( tgt_emb, memory, tgt_mask=tgt_mask, tgt_key_padding_mask=tgt_pad_mask, memory_key_padding_mask=src_pad_mask, ) # (B,T,C) logits = self.generator(out) return logits 上面就是模型的训练了,也比较简单,就是对输入词进行词嵌入+位置编码计算,然后送入编码器得到输出特征矩阵memory;给编码器输入的只是padding的掩码,因为不要提取padding的词; 其次生成因果掩码,将编码器的的特征矩阵输出结果memory以及解码器侧自身的输入给到解码器最终得到(B,T,C)的输出矩阵,其包含了最终输出结果词位置的隐藏信息; 最后调用self.generator(out)即线性变化得到输出目标词表的概率分布(B,T,V);后面就可以用其使用交叉熵跟目标结果进行比对计算损失了。 解码推理 @torch.no_grad() def greedy_decode(self, src_ids: List[int], max_len=20, device="cpu"): """在推理阶段进行贪心解码。 参数: - src_ids: 源端 token id 序列(不含 BOS/EOS) - max_len: 最大生成长度(含 BOS/EOS) - device: 运行设备 返回: - 生成的目标端 id 序列(含 BOS/EOS) """ #切换为评估模式,关闭dropout/batchnorm等随机性 self.eval() # 将源端token id序列转换为张量,并添加一个维度,如[1, 2, 3, 4] -> [[1, 2, 3, 4]] # 变为批维度的 (1, S);dtype 为 long 主要是以适配 nn.Embedding的输入格式。 src = torch.tensor(src_ids, dtype=torch.long, device=device).unsqueeze(0) # 生成一个跟src相同形状的mask矩阵,让编码器不要计算提取pandding的位置信息。 #按元素判断 src 是否等于 PAD_IDX,等于的位置为 True,不等的位置为 False。 src_pad_mask = src.eq(PAD_IDX) # 计算src_tok= src 经过词嵌入+位置编码后的结果 src_tok = self.src_tok(src) src_pos = self.pos_enc(src_tok) # 将该结果送入编码器,返回的memory就是编码器提取的特征向量。 # 输入编码器,即使没有填充(pandding)的token,也需要传入src_key_padding_mask。 memory = self.encoder(src_pos, src_key_padding_mask=src_pad_mask) # 初始化目标端token id序列,维度为(1,1),初始值为BOS_IDX # 表示目标端序列的开始,BOS_IDX=1 # 推理时输入是没有PAD,但是仍然需要tgt_pad_mask. ys = torch.tensor([[BOS_IDX]], dtype=torch.long, device=device) for _ in range(max_len - 1): #计算本次解码的Mask,跟ys形状一样。 tgt_pad_mask = ys.eq(PAD_IDX) # 计算本次因果掩码,把未来看到的token都屏蔽。 tgt_mask = self.make_subsequent_mask(ys.size(1)).to(device) # 可以看到当推理模式时,解码器输入token数量依次是1,2,3,4..... out = self.decoder( self.pos_enc(self.tgt_tok(ys)), memory, tgt_key_padding_mask=tgt_pad_mask, memory_key_padding_mask=src_pad_mask, ) # 转化为预测词的概率分布 logits = self.generator(out[:, -1:, :]) # 使用贪心选择概率最大的作为本次预测的目标 next_token = logits.argmax(-1) next_id = next_token.item() # 显示选择的token token_text = TGT_ITOS[next_id] if next_id < len(TGT_ITOS) else f"ID_{next_id}" print(f"选择: {token_text}({next_id})") ys = torch.cat([ys, next_token], dim=1) # 当下一个输出为EOS时表示结束,则退出。 if next_id == EOS_IDX: break return ys.squeeze(0).tolist() 上面代码的设计要点主要为几个部分: 编码信息提取:将要翻译的句子进行词嵌入,位置编码,然后送入编码器计算提出特征信息memory,最终给到解码器作为输入。 自回归生成:最开始使用BOS一个token+编码器此前计算的输出memory、掩码等信息输入给解码器,解码器预测得到一个输出,然后将输出拼接会此前BOS的后面形成解码器新的输入,以此循环进行预测,直至遇到EOS结束。解侧输入序列长度逐步增长:1 → 2 → 3 → 4 → ...,最开始的序列为BOS表示开始。 掩码生成:使用了因果掩码和padding掩码;虽然推理阶段没有对输入数据进行padding操作,但是依旧需要这两个掩码,主要的考量是保持接口的一致性(原来的接口需要传递这个参数)。 贪心策略:解码器的输出进行线性变化得到词表的概率分布后,然后挑选概率最高的token。 结束循环:当判断到模型预测出EOS时,模式则结束,整个预测完成。 位置编码 class PositionalEncoding(nn.Module): """经典正弦/余弦位置编码。 给定嵌入 `x (B,L,C)`,按长度切片并与位置编码相加,再做 dropout。 """ def __init__(self, d_model: int, max_len: int = 5000, dropout: float = 0.1): super().__init__() self.dropout = nn.Dropout(dropout) # 创建一个形状为 (max_len, d_model) 的零张量,用于存储位置编码 pe = torch.zeros(max_len, d_model) # (L, C) # 创建一个形状为 (max_len, 1) 的张量,用于存储位置索引 position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) # (L, 1) # 创建一个形状为 (d_model//2,) 的张量,用于存储位置编码的缩放因子 div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) # sin, cos 交错 pe[:, 0::2] = torch.sin(position * div_term) pe[:, 1::2] = torch.cos(position * div_term) pe = pe.unsqueeze(0) # (1, L, C) self.register_buffer("pe", pe) def forward(self, x: torch.Tensor): # (B, L, C) """为输入嵌入添加位置编码并做 dropout。 参数: - x: (B, L, C) 返回: - (B, L, C) """ x = x + self.pe[:, : x.size(1)] return self.dropout(x) # 对于位置 pos 和维度 i: # 偶数维度: PE(pos, 2i) = sin(pos / 10000^(2i/d_model)) # 奇数维度: PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model)) # pe[:, 0::2]: 选择所有行的偶数列 (0, 2, 4, ...) # pe[:, 1::2]: 选择所有行的奇数列 (1, 3, 5, ...) # 计算过程: # 位置0: sin(0 * div_term), cos(0 * div_term), sin(0 * div_term), ... # 位置1: sin(1 * div_term), cos(1 * div_term), sin(1 * div_term), ... # 位置2: sin(2 * div_term), cos(2 * div_term), sin(2 * div_term), ... 位置编码比较简单,就是按照sin和cos按公式计算生成向量,最终返回词嵌入向量+位置编码向量。 编码器 class Encoder(nn.Module): def __init__(self, d_model: int, nhead: int, dim_ff: int, num_layers: int, dropout: float = 0.1): super().__init__() self.layers = nn.ModuleList([ EncoderLayer(d_model, nhead, dim_ff, dropout) for _ in range(num_layers) ]) def forward(self, x: torch.Tensor, src_key_padding_mask: torch.Tensor | None = None) -> torch.Tensor: """堆叠若干编码层。 参数: - x: (B, S, C) - src_key_padding_mask: (B, S) True 为 padding 返回: - (B, S, C) """ for layer in self.layers: x = layer(x, src_key_padding_mask=src_key_padding_mask) return x 编码器框架就是若干个编码层堆叠起来,但是每层的都有自己的参数,主要调用的是nn.ModuleList进行注册子模块,确保参数都能够被优化器找到,num_layers控制了编码器的深度。 前向传播函数也很简单,输入一次通过每一个编码层,得到的输出结果给到下一个编码层,以此循环最终经过最后一层编码器得得到的特征信息,给后续解码器使用。 class EncoderLayer(nn.Module): """Transformer 编码层(后归一化 post-norm 版本) 子层:自注意力 + 前馈;均带残差连接与 LayerNorm。 """ def __init__(self, d_model: int, nhead: int, dim_ff: int, dropout: float = 0.1): super().__init__() self.self_attn = MultiHeadAttention(d_model, nhead, dropout) self.norm1 = nn.LayerNorm(d_model) self.ff = PositionwiseFeedForward(d_model, dim_ff, dropout) self.norm2 = nn.LayerNorm(d_model) def forward(self, x: torch.Tensor, src_key_padding_mask: torch.Tensor | None = None) -> torch.Tensor: """单层编码层前向。 参数: - x: (B, S, C) - src_key_padding_mask: (B, S) True 为 padding 返回: - (B, S, C) """ # 自注意力子层 attn_out = self.self_attn(x, x, x, attn_mask=None, key_padding_mask=src_key_padding_mask) x = self.norm1(x + attn_out) # 前馈子层 ff_out = self.ff(x) x = self.norm2(x + ff_out) return x 编码层的组件为MultiHeadAttention、LayerNorm、PositionwiseFeedForward这与我们此前介绍的transformer原理一致。 其前向传播过程,首先输入X(查询),X(键),X(值),qkv都是一样的;注意力计算时,把attn_mask=None,因为编码器不需要因果掩码,但是需要padding mask。其次进行残差连接计算x+attn_out,再调用norml进行层归一化,最后是计算前馈网络,再进行归一化就得到一层的输出结果了。 class PositionwiseFeedForward(nn.Module): """前馈网络:逐位置的两层 MLP(含激活与 dropout)""" def __init__(self, d_model: int, dim_ff: int, dropout: float = 0.1): super().__init__() self.fc1 = nn.Linear(d_model, dim_ff) self.fc2 = nn.Linear(dim_ff, d_model) self.act = nn.ReLU() self.dropout = nn.Dropout(dropout) def forward(self, x: torch.Tensor) -> torch.Tensor: """两层逐位置前馈网络。 参数: - x: (B, L, C) 返回: - (B, L, C) """ x = self.fc2(self.dropout(self.act(self.fc1(x)))) x = self.dropout(x) return x 前馈网络主要两层: 第一层:d_model → dim_ff (通常 dim_ff = 4 * d_model) 激活函数:ReLU。 第二层:dim_ff → d_model 就是对输入进行升维然后非线性变化再降维,提取更多的信息。两层都使用了dropout,展开就是如下。 # 1. 第一层线性变换 x = self.fc1(x) # (B, L, C) → (B, L, dim_ff) # 2. 激活函数 x = self.act(x) # 应用ReLU # 3. 第一个dropout x = self.dropout(x) # 随机置零部分神经元 # 4. 第二层线性变换 x = self.fc2(x) # (B, L, dim_ff) → (B, L, C) # 5. 第二个dropout x = self.dropout(x) # 最终dropout 解码器 class Decoder(nn.Module): def __init__(self, d_model: int, nhead: int, dim_ff: int, num_layers: int, dropout: float = 0.1): super().__init__() self.layers = nn.ModuleList([ DecoderLayer(d_model, nhead, dim_ff, dropout) for _ in range(num_layers) ]) def forward( self, x: torch.Tensor, memory: torch.Tensor, tgt_mask: torch.Tensor | None = None, tgt_key_padding_mask: torch.Tensor | None = None, memory_key_padding_mask: torch.Tensor | None = None, ) -> torch.Tensor: """堆叠若干解码层。 参数: - x: (B, T, C) 目标端嵌入 - memory: (B, S, C) 编码器输出 - tgt_mask: (T, T) 因果掩码,True 为屏蔽 - tgt_key_padding_mask: (B, T) 目标端 padding 掩码 - memory_key_padding_mask: (B, S) 源端 padding 掩码 返回: - (B, T, C) """ for layer in self.layers: x = layer( x, memory, tgt_mask=tgt_mask, tgt_key_padding_mask=tgt_key_padding_mask, memory_key_padding_mask=memory_key_padding_mask, ) return x 与编码器类似,使用nn.ModuleList创建多个解码层,每个解码层都是独立的DecoderLayer实例;解码器的输入数据有两个,一个是解码器侧自己的输入序列,另外一个是编码器计算得到的特征信息。解码器的每一层都需要输入编码器给的特征序列,但是都是一样的;解码器层计算得到的输出将传递给下一层解码器层,循环得到最后的输出。 Decoder (解码器) ├── DecoderLayer 1 (解码层1) │ ├── MultiHeadAttention (自注意力) │ ├── LayerNorm1 + 残差连接 │ ├── MultiHeadAttention (交叉注意力) │ ├── LayerNorm2 + 残差连接 │ ├── PositionwiseFeedForward (前馈网络) │ └── LayerNorm3 + 残差连接 ├── DecoderLayer 2 (解码层2) │ └── ... (同上结构) └── ... (重复 num_layers 次) 输入: x (B, T, C) + memory (B, S, C) → DecoderLayer 1 → DecoderLayer 2 → ... → DecoderLayer N → 输出: (B, T, C) 其前向传播也大同小异,与编码器不同的是需要传递因果掩码,tgt_mask,防止看到未来信息,同时还传入了源序列的pandding掩码,跟输入给编码器的mask是一样的。 class DecoderLayer(nn.Module): """Transformer 解码层(自注意力 + 交叉注意力 + 前馈)""" def __init__(self, d_model: int, nhead: int, dim_ff: int, dropout: float = 0.1): super().__init__() self.self_attn = MultiHeadAttention(d_model, nhead, dropout) self.norm1 = nn.LayerNorm(d_model) self.cross_attn = MultiHeadAttention(d_model, nhead, dropout) self.norm2 = nn.LayerNorm(d_model) self.ff = PositionwiseFeedForward(d_model, dim_ff, dropout) self.norm3 = nn.LayerNorm(d_model) def forward( self, x: torch.Tensor, memory: torch.Tensor, tgt_mask: torch.Tensor | None = None, tgt_key_padding_mask: torch.Tensor | None = None, memory_key_padding_mask: torch.Tensor | None = None, ) -> torch.Tensor: """单层解码层前向。 参数: - x: (B, T, C) 解码器输入 - memory: (B, S, C) 编码器输出 - tgt_mask: (T, T) 因果掩码,true为屏蔽 - tgt_key_padding_mask: (B, T) - memory_key_padding_mask: (B, S) 返回: - (B, T, C) """ # 1) 解码器自注意力(带因果掩码 tgt_mask) sa = self.self_attn(x, x, x, attn_mask=tgt_mask, key_padding_mask=tgt_key_padding_mask) x = self.norm1(x + sa) # 2) 交叉注意力:Q 来自解码器,K/V 来自编码器 memory ca = self.cross_attn(x, memory, memory, attn_mask=None, key_padding_mask=memory_key_padding_mask) x = self.norm2(x + ca) # 3) 前馈 ff = self.ff(x) x = self.norm3(x + ff) return x 解码器层比编码器层多了一个cross_attn交叉注意力。除了输入数据有些不同,其他都基本类似,下面按前向传播的流程来分析一下。 首先是第一个子层自注意力的计算,输入X(q),X(k),X(v)来自解码器侧路径的输入,推理模式则是由自己预测自回归的输入,训练模式是给定的。自注意力传入了因果掩码attn_mask和屏蔽pandding mask。 其次就是计算残差和层归一化,与编码器类似。 接着就是计算交叉注意力了,核心的注意力类还是MultiHeadAttention,跟编码器和解码器的都来自一个。唯一的区别就是传入的参数不一样,其中查询Q来自于解码器当前的状态X即解码器上一个自注意力的的输出,特征路径是解码器给的信息。而键值K,V则使用的是编码器的输出memory,不使用因果掩码,因为因果掩码前面已经处理了。 最后就是前馈网络的升维和降维处理等了,跟编码器就一样了,就不阐述了。 三个子层的不同作用: 自注意力层:处理目标序列内部的关系,生成"i have an apple"时,"have"应该关注"i","an"应该关注"i have",通过因果掩码确保只能看到历史信息。 交叉注意力层:让解码器"看到"编码器的信息,翻译成英文时,需要参考中文源序列,通过交叉注意力,解码器可以访问编码器的完整表示。 前馈网络则层:增加非线性表达能力,每个位置独立计算,不涉及位置间的关系。 注意力 接下来就是核心MultiHeadAttention。 MultiHeadAttention class MultiHeadAttention(nn.Module): """多头注意力(Batch-first) - 输入输出为 (B, L, C) - 内部将通道 C 切分到 H 个头,每头维度 Dh=C/H - 支持两类掩码: 1) attn_mask: (Lq, Lk) 下三角等自回归掩码 2) key_padding_mask: (B, Lk) 序列 padding 掩码 两者会在内部合并为可广播到 (B,H,Lq,Lk) 的布尔张量。 """ def __init__(self, d_model: int, nhead: int, dropout: float = 0.1): super().__init__() assert d_model % nhead == 0, "d_model 必须能被 nhead 整除" self.d_model = d_model self.nhead = nhead self.d_head = d_model // nhead self.w_q = nn.Linear(d_model, d_model) self.w_k = nn.Linear(d_model, d_model) self.w_v = nn.Linear(d_model, d_model) self.attn = ScaledDotProductAttention(dropout) self.proj = nn.Linear(d_model, d_model) self.dropout = nn.Dropout(dropout) # 将 (B, L, C) 重塑为 (B, L, H, Dh),原来的数据都不会变化,只是形状改变了 # 加了一个维,然后交换了张量维度顺序。 def _shape(self, x: torch.Tensor) -> torch.Tensor: """(B, L, C) 切分重排为 (B, H, L, Dh)。""" B, L, C = x.shape # 第一步:将 (B, L, C) 重塑为 (B, L, H, Dh) x_reshaped = x.view(B, L, self.nhead, self.d_head) #x.view不复制数据,只是改变数据的"视角",数据在内存中存储顺序不变 # 第二步:交换维度 1 和 2,从 (B, L, H, Dh) 变为 (B, H, L, Dh) x_transposed = x_reshaped.transpose(1, 2) return x_transposed def _merge(self, x: torch.Tensor) -> torch.Tensor: """(B, H, L, Dh) 合并重排回 (B, L, C)。""" B, H, L, Dh = x.shape # 第一步:交换维度 1 和 2,从 (B, H, L, Dh) 变为 (B, L, H, Dh) x_transposed = x.transpose(1, 2) # 第二步:确保内存连续,然后重塑为 (B, L, H*Dh) x_contiguous = x_transposed.contiguous() # 第三步:重塑为 (B, L, C) 其中 C = H * Dh x_reshaped = x_contiguous.view(B, L, H * Dh) return x_reshaped # 因为QKV算的是矩阵,在transformer中涉及到两个mask # 一个是attn_mask控制哪些位置可以相互关注,如因果掩码防止看未来 # 一个是key_padding_mask控制哪些位置是有效的,如填充token不应该被关注 # 因为都要计算所以把这两个使用|合并起来,一起跟QKV计算即可,否则得计算两次。 # 对于encode来说传参只会穿key_pandding_mask,另外一个没有 # 对于decoder来说,两个都会传递。 def _build_attn_mask( self, Lq: int, Lk: int, attn_mask: torch.Tensor | None, key_padding_mask: torch.Tensor | None, device: torch.device, ) -> torch.Tensor | None: """将两类掩码合并成 (1/ B, 1/ H, Lq, Lk) 可广播布尔张量。True 表示屏蔽。""" mask = None if attn_mask is not None: # (Lq, Lk) -> (1,1,Lq,Lk) m1 = attn_mask.to(device).unsqueeze(0).unsqueeze(0) mask = m1 if mask is None else (mask | m1) if key_padding_mask is not None: # (B, Lk) -> (B,1,1,Lk) m2 = key_padding_mask.to(device).unsqueeze(1).unsqueeze(1) mask = m2 if mask is None else (mask | m2) return mask (0)网络层定义 self.w_q = nn.Linear(d_model, d_model) # 查询线性变换 self.w_k = nn.Linear(d_model, d_model) # 键线性变换 self.w_v = nn.Linear(d_model, d_model) # 值线性变换 self.attn = ScaledDotProductAttention(dropout) # 缩放点积注意力 self.proj = nn.Linear(d_model, d_model) # 输出投影 self.dropout = nn.Dropout(dropout) # 输出dropout w_q, w_k, w_v: 将输入转换为查询、键、值表示,attn为计算注意力权重和加权求和,proj将多头结果投影会原始维度,dropout是防止过拟合。 (1)将输入分成多个头 对输入按照head划分为多份,所以这里需要注意的是d_model必现要能被nhead整除,确保每个头有相同的维度。如原来的输入为(B,L,C)切分后变成(B, H, L, Dh),Dh=d_model/nhead。 第一步先使用view重塑为(B, H, L, Dh),然后第二步进行重排。举个例子输入为(B, L, C) = (1, 4, 6)重塑为(B, L, H, Dh) = (1, 4, 2, 3),重塑后的内存布局,[word1_head1_3, word1_head2_3, word2_head1_3, word2_head2_3, ...]每个词的头是交错存储的,为了适应多头注意力的并行计算还要重排一下,让每个头的数据连续存储。 (2)掩码合并 将key_padding_mask和attn_mask(因果)进行合并,这样后续计算就不用计算两次了。 # 使用逻辑或运算 | 合并 # True | True = True (屏蔽) # True | False = True (屏蔽) # False | False = False (不屏蔽) # 最终掩码形状: (B, H, Lq, Lk) 或 (1, H, Lq, Lk) # 可以广播到注意力计算的形状 (3)每个头计算注意力 Q = self._shape(self.w_q(query)) # (B,H,Lq,Dh) K = self._shape(self.w_k(key)) # (B,H,Lk,Dh) V = self._shape(self.w_v(value)) # (B,H,Lk,Dh) mask = self._build_attn_mask(Lq, Lk, attn_mask, key_padding_mask, device) out = self.attn(Q, K, V, mask) # (B,H,Lq,Dh) 计算注意力时,首先对输入分别进行计算线性变换(如QxWq,这样就有参数了)然后重排分别得到QKV,对于编码器来说输入的query、key、value都是一样的,计算QKV的方式也是一样的,都是进行线性nn.Linear层然后再进行重排,但是各自有各自参数,这就是要训练的参数。经过线性层的结果后都需要调用_shape进行重排划分为多个头的数据,便于输入给多头注意力;构建好合并后的掩码之后,就传递到attn中计算注意力。计算出的多头的注意力,需要合并为原来的形状,最后再通过一个线性变化得到最后的结果输出。 完整的数据流示例: # 输入: query (1, 4, 6), key (1, 4, 6), value (1, 4, 6) # 参数: d_model=6, nhead=2, d_head=3 # 步骤1: 线性变换 (保持形状) # w_q(query): (1, 4, 6) -> (1, 4, 6) # w_k(key): (1, 4, 6) -> (1, 4, 6) # w_v(value): (1, 4, 6) -> (1, 4, 6) # 每个词从6维变换到6维 # 学习查询、键、值的表示 # 步骤2: 分头 # _shape(w_q(query)): (1, 4, 6) -> (1, 2, 4, 3) # _shape(w_k(key)): (1, 4, 6) -> (1, 2, 4, 3) # _shape(w_v(value)): (1, 4, 6) -> (1, 2, 4, 3) # 将6维分成2个头,每个头3维 # 头1: 3维表示 # 头2: 3维表示 # 步骤3: 注意力计算 # attn(Q, K, V, mask): (1, 2, 4, 3) -> (1, 2, 4, 3) # 每个头独立计算注意力: # 头1: 计算4个位置之间的注意力,每个位置3维 # 头2: 计算4个位置之间的注意力,每个位置3维 # 步骤4: 合并头 # _merge(out): (1, 2, 4, 3) -> (1, 4, 6) # 将2个头的3维表示合并回6维 # 每个位置现在包含所有头的信息 # 步骤5: 输出变换 # proj(out): (1, 4, 6) -> (1, 4, 6) # dropout(out): (1, 4, 6) -> (1, 4, 6) # 最终输出: (1, 4, 6) ScaledDotProductAttention class ScaledDotProductAttention(nn.Module): """缩放点积注意力(单头) 给定 Q(查询)、K(键)、V(值) 与掩码,计算注意力加权输出。 形状约定: - Q: (B, H, Lq, Dh) - K: (B, H, Lk, Dh) - V: (B, H, Lk, Dh) - mask: 可广播到 (B, H, Lq, Lk),True 表示屏蔽。 """ def __init__(self, dropout: float = 0.1): super().__init__() self.dropout = nn.Dropout(dropout) def forward(self, Q: torch.Tensor, K: torch.Tensor, V: torch.Tensor, mask: torch.Tensor | None = None): """计算缩放点积注意力。 参数: - Q: (B, H, Lq, Dh) - K: (B, H, Lk, Dh) - V: (B, H, Lk, Dh) - mask: 可广播到 (B, H, Lq, Lk) 的布尔掩码,True 表示屏蔽 返回: - (B, H, Lq, Dh) """ d_k = Q.size(-1) # 注意力分数 = QK^T / sqrt(dk) scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(d_k) # (B,H,Lq,Lk) if mask is not None: # 对被屏蔽位置填充一个极小值,softmax 后 ~0 scores = scores.masked_fill(mask, float("-inf")) attn = torch.softmax(scores, dim=-1) # (B,H,Lq,Lk) attn = self.dropout(attn) out = torch.matmul(attn, V) # (B,H,Lq,Dh) return out 这里就是实现缩放点积注意力机制了,Q.transpose(-2, -1)将K的最后两个维度转置,torch.matmul(Q, K^T): 计算Q和K的点积,再math.sqrt(d_k): 缩放因子,防止分数过大。 可以看到会根据传入的mask进行处理,让mask=True的位置会被填充为-inf,这样经过softmax之后,这些位置就接近0,从而实现了屏蔽某位位置的效果。 softmax是将分数转换为概率分布,所有位置的权重和为1,分数越高的位置,权重越大,也就是跟词相关性越大提取的值越丰富,如果是0那基本不相关,掩码为true的位置就是0,也就是基本不提取信息。 总结一下,核心就是公式Attention(Q,K,V) = softmax(QK^T/√d_k)V计算。 应用 接下来就是调用应用了 device = torch.device("cuda" if torch.cuda.is_available() else "cpu") dataset = ToyDataset(pairs) loader = DataLoader(dataset, batch_size=8, shuffle=True, collate_fn=collate_fn) model = Seq2SeqTransformer( src_vocab_size=len(SRC_ITOS), tgt_vocab_size=len(TGT_ITOS), d_model=6, nhead=3, num_encoder_layers=2, num_decoder_layers=2, dim_ff=256, dropout=0.1 ).to(device) criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX) optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4) 定义dataset、loader准备数据,然后定义模型model,损失函数定义以及优化方法。 def evaluate_sample(sent="我 有 一个 苹果"): """辅助函数:对输入中文句子进行编码→推理→解码并打印结果。""" ids = encode_src(sent) print("ids",ids) pred_ids = model.greedy_decode(ids, device=device) pred_text = decode_tgt(pred_ids) print(f'INPUT : {sent}') print(f'OUTPUT: {pred_text}\n') print("Before training:") evaluate_sample("我 有 一个 苹果") 上面是整个应用翻译应用,在没有训练出参数,自然预测出的结果是不对的。 EPOCHS = 800 # 小步数即可过拟合玩具数据 for epoch in range(1, EPOCHS + 1): model.train() total_loss = 0.0 for src, tgt_in, tgt_out, src_pad_mask, tgt_pad_mask in loader: src = src.to(device) tgt_in = tgt_in.to(device) tgt_out = tgt_out.to(device) src_pad_mask = src_pad_mask.to(device) tgt_pad_mask = tgt_pad_mask.to(device) logits = model(src, tgt_in, src_pad_mask, tgt_pad_mask) # (B, T, V) loss = criterion(logits.reshape(-1, logits.size(-1)), tgt_out.reshape(-1)) optimizer.zero_grad() loss.backward() torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) optimizer.step() total_loss += loss.item() if epoch % 5 == 0 or epoch == 1: print(f"Epoch {epoch:02d} | loss={total_loss/len(loader):.4f}") evaluate_sample("我 有 一个 苹果") 上面是训练过程。 常见问题 (1) 解码器训练时的输入和推理时的输入有什么不同? 训练模式是固定长度输入,例如(2,5),所有样本都padding到相同长度,批次内所有样本的长度一致。 # 使用教师强制,目标序列已知 tgt_in = [BOS, i, have, an, apple,PAD] # 完整的输入序列 tgt_out = [i, have, an, apple, EOS] # 完整的监督目标 而推理模式序列长度随着时间步逐步增长,例如# 例如: (1, 1) → (1, 2) → (1, 3) → ...,每次生成后长度+1。 # 逐步生成,每次只预测下一个token ys = [[BOS_ID]] # 第1步 ys = [[BOS_ID, i]] # 第2步 ys = [[BOS_ID, i, have]] # 第3步 ys = [[BOS_ID, i, have, an]] # 第4步 ys = [[BOS_ID, i, have, an,apple]] # 第5步 之所以有这样的差异是训练时用的是Teacher Forcing优势,使用了并行计算让所有位置可以同时计算预测,提高效率快速收敛。而推理时是自回归模式,每个token的生成只能基于之前输出的信息。 (2)什么情况下输入数据需要PAD? 通常无论是编码器的输入还是解码器的输入如果不是批量并行计算都可以不用PAD,但如果是批量并行都需要PAD MASK。 在训练模式下,为了提高效率需要批量并行计算,所以无论编码器还是解码器的输入都是需要PAD,在本文中要不要PAD动作是在DataLoader的回调函数中collate_fn进行的,会对编码器和解码器的输入都会pad对齐到一样的长度。 因此最主要的考量是否要批量并行计算,因为并行计算如果长度不同,无法并行处理,无论是自注意力分数、前馈网络、还是残差连接,只有长度一致,才能并行一下处理多个样本。而往往训练模型基本都是批量处理。 总之只处理一个样本时可以不需要PAD,如果要批量都一定需要PAD。而只处理一个样本,往往是推理模式场景。 (3)既然推理模式的编码器和解码器输入没有进行PAD到一定长度,那为什么无论编码器和解码器都依旧还需要传入PAD mask? 需要PAD mask我认为本质上有两点原因:其一用于告知模型输入序列的长度,其二为了接口的一致性,因为transformer最核心的是无论编码器还是解码器最终的核心是Scaled Dot-Product Attetion,可以理解为这是一个共有底层函数,都要调用,做兼容了所以一定要传这个参数。 (3)推理模式的解码器既然是一个一个token往后生成的然后依次拼接回给到输入,未来的词其实根本就没有输入,为什么还需要下三角度的因果mask? 本质上还是保证接口的兼容性,这块都无论是推理还是训练模式都需要传入这个因果mask。 首先在实现层面让训练模式和推理模式代码能够兼容,训练模式使用的是teacher forcing把整个目标序列一次性喂进去,那自然不能让模型看到未来token。推理模式严格上如果一次一个token,每次只输入已经生成的部分,在这种最简单的视线下,确实不需要再加下三角mask,因为未来token不存在,自然无法attend到。但是大多数框架都选择统一接口,无论训练还是推理都传causal mask,避免在不同模式下切换逻辑。 其次从推理模式的多样性考虑,即使是推理阶段,也有可能遇到这种情况,也就是批量生成,一次生成多个序列,每个序列长度不同。 下三角是一个通用的"未来屏蔽"机制,不只是为了防止模型看见未来token,也是为了让实现和训练推理保持一致,并支持批量/并行推理优化。 附:完整源码 # toy_transformer_translation.py # A tiny, runnable Transformer seq2seq example to translate Chinese->English on a toy dataset. # PyTorch >= 2.0 recommended. import math import random from dataclasses import dataclass from typing import List, Tuple import torch import torch.nn as nn from torch.utils.data import DataLoader, Dataset random.seed(0) torch.manual_seed(0) # -------------------------- # 1) Toy parallel corpus # -------------------------- pairs = [ # 基本陈述 ("我 有 一个 苹果", "i have an apple"), ("我 有 一本 书", "i have a book"), ("你 有 一个 苹果", "you have an apple"), ("他 有 一个 苹果", "he has an apple"), ("她 有 一个 苹果", "she has an apple"), ("我们 有 一个 苹果", "we have an apple"), ("我 喜欢 苹果", "i like apples"), ("我 吃 苹果", "i eat apples"), ("你 喜欢 书", "you like books"), ("我 喜欢 书", "i like books"), # 稍作扩展 ("我 有 两个 苹果", "i have two apples"), ("我 有 红色 苹果", "i have red apples"), ] # 中文使用"空格分词(简化)",英文用空格分词 def build_vocab(examples: List[str]): """构建词表(字符串→索引 与 索引→字符串) - 输入示例为用空格分词后的句子列表 - 加入特殊符号 `<pad>`, `<bos>`, `<eos>` 并将其它 token 排序,保证可复现 返回: stoi: dict[token->id] itos: List[id->token] """ tokens = set() # 建立一个集合,用于存储所有不同的token for s in examples: # 遍历所有句子,s是句子,如我 有 一个 苹果 for t in s.split(): # 遍历句子中的每个token,t是token,如我 tokens.add(t.lower()) # 将token添加到集合中,并转换为小写,如我 # 特殊符号 itos = ["<pad>", "<bos>", "<eos>"] + sorted(tokens) # 将特殊符号和所有不同的token排序 # print(itos) stoi = {t: i for i, t in enumerate(itos)} # 将token和索引建立映射关系 # print(stoi) return stoi, itos src_texts = [p[0] for p in pairs] tgt_texts = [p[1] for p in pairs] print("src_texts",src_texts) print("tgt_texts",tgt_texts) SRC_STOI, SRC_ITOS = build_vocab(src_texts) print("SRC_STOI",SRC_STOI) print("SRC_ITOS",SRC_ITOS) TGT_STOI, TGT_ITOS = build_vocab(tgt_texts) print("TGT_STOI",TGT_STOI) print("TGT_ITOS",TGT_ITOS) PAD_IDX, BOS_IDX, EOS_IDX = 0, 1, 2 #将源语句编码为索引序列(不含 BOS/EOS),如我 有 一个 苹果 -> [1, 2, 3, 4] def encode_src(s: str) -> List[int]: """将原语句(已空格分词)编码为索引序列(不含 BOS/EOS)。""" return [SRC_STOI[w.lower()] for w in s.split()] def encode_tgt(s: str) -> List[int]: """将目标语句编码为索引序列,并在首尾添加 BOS/EOS。""" return [BOS_IDX] + [TGT_STOI[w.lower()] for w in s.split()] + [EOS_IDX] def decode_tgt(ids: List[int]) -> str: """将目标端索引序列解码回字符串(忽略 PAD/BOS,遇到 EOS 停止)。""" words = [] for i in ids: if i == EOS_IDX: break if i in (PAD_IDX, BOS_IDX): continue words.append(TGT_ITOS[i]) return " ".join(words) @dataclass class Example: """单条并行样本 - src: 源语言索引序列(不含 BOS/EOS) - tgt: 目标语言索引序列(含 BOS/EOS) """ src: List[int] tgt: List[int] class ToyDataset(Dataset): """极小玩具平行语料数据集,用于快速过拟合演示。""" def __init__(self, pairs: List[Tuple[str, str]]): self.data = [Example(encode_src(s), encode_tgt(t)) for s, t in pairs] def __len__(self): return len(self.data) def __getitem__(self, idx): return self.data[idx] def collate_fn(batch: List[Example]): """将一个 batch 的样本对齐为等长张量,并构造 teacher forcing 所需的输入/输出。 返回: - src: (B,S) 源序列,已 padding - tgt_in: (B,T) 解码器输入(含 BOS,右对齐 padding) - tgt_out: (B,T) 解码器监督目标(对 tgt_in 右移一位,含 EOS) - src_pad_mask: (B,S) 源端 padding 掩码,True 表示 padding 位置 - tgt_pad_mask: (B,T) 目标端 padding 掩码(针对输入序列) """ # padding to max length in batch src_max = max(len(b.src) for b in batch) tgt_max = max(len(b.tgt) for b in batch) src_batch = [] tgt_in_batch = [] tgt_out_batch = [] for ex in batch: src = ex.src + [PAD_IDX] * (src_max - len(ex.src)) # Teacher forcing: shift-in, shift-out tgt_in = ex.tgt[:-1] + [PAD_IDX] * (tgt_max - 1 - len(ex.tgt[:-1])) tgt_out = ex.tgt[1:] + [PAD_IDX] * (tgt_max - 1 - len(ex.tgt[1:])) src_batch.append(src) tgt_in_batch.append(tgt_in) tgt_out_batch.append(tgt_out) src = torch.tensor(src_batch, dtype=torch.long) # (B, S) tgt_in = torch.tensor(tgt_in_batch, dtype=torch.long) # (B, T_in) tgt_out = torch.tensor(tgt_out_batch, dtype=torch.long) # (B, T_out) src_pad_mask = src.eq(PAD_IDX) # (B, S) tgt_pad_mask = tgt_in.eq(PAD_IDX) # (B, T) return src, tgt_in, tgt_out, src_pad_mask, tgt_pad_mask # -------------------------- # 2) Positional encoding # -------------------------- class PositionalEncoding(nn.Module): """经典正弦/余弦位置编码。 给定嵌入 `x (B,L,C)`,按长度切片并与位置编码相加,再做 dropout。 """ def __init__(self, d_model: int, max_len: int = 5000, dropout: float = 0.1): super().__init__() self.dropout = nn.Dropout(dropout) # 创建一个形状为 (max_len, d_model) 的零张量,用于存储位置编码 pe = torch.zeros(max_len, d_model) # (L, C) # 创建一个形状为 (max_len, 1) 的张量,用于存储位置索引 position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) # (L, 1) # 创建一个形状为 (d_model//2,) 的张量,用于存储位置编码的缩放因子 div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) # sin, cos 交错 pe[:, 0::2] = torch.sin(position * div_term) pe[:, 1::2] = torch.cos(position * div_term) pe = pe.unsqueeze(0) # (1, L, C) self.register_buffer("pe", pe) def forward(self, x: torch.Tensor): # (B, L, C) """为输入嵌入添加位置编码并做 dropout。 参数: - x: (B, L, C) 返回: - (B, L, C) """ x = x + self.pe[:, : x.size(1)] return self.dropout(x) # -------------------------- # 3) 手写 Transformer 编码/解码层(含详细注释) # -------------------------- class ScaledDotProductAttention(nn.Module): """缩放点积注意力(单头) 给定 Q(查询)、K(键)、V(值) 与掩码,计算注意力加权输出。 形状约定: - Q: (B, H, Lq, Dh) - K: (B, H, Lk, Dh) - V: (B, H, Lk, Dh) - mask: 可广播到 (B, H, Lq, Lk),True 表示屏蔽。 """ def __init__(self, dropout: float = 0.1): super().__init__() self.dropout = nn.Dropout(dropout) def forward(self, Q: torch.Tensor, K: torch.Tensor, V: torch.Tensor, mask: torch.Tensor | None = None): """计算缩放点积注意力。 参数: - Q: (B, H, Lq, Dh) - K: (B, H, Lk, Dh) - V: (B, H, Lk, Dh) - mask: 可广播到 (B, H, Lq, Lk) 的布尔掩码,True 表示屏蔽 返回: - (B, H, Lq, Dh) """ d_k = Q.size(-1) # 注意力分数 = QK^T / sqrt(dk) scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(d_k) # (B,H,Lq,Lk) if mask is not None: # 对被屏蔽位置填充一个极小值,softmax 后 ~0 scores = scores.masked_fill(mask, float("-inf")) attn = torch.softmax(scores, dim=-1) # (B,H,Lq,Lk) attn = self.dropout(attn) out = torch.matmul(attn, V) # (B,H,Lq,Dh) return out class MultiHeadAttention(nn.Module): """多头注意力(Batch-first) - 输入输出为 (B, L, C) - 内部将通道 C 切分到 H 个头,每头维度 Dh=C/H - 支持两类掩码: 1) attn_mask: (Lq, Lk) 下三角等自回归掩码 2) key_padding_mask: (B, Lk) 序列 padding 掩码 两者会在内部合并为可广播到 (B,H,Lq,Lk) 的布尔张量。 """ def __init__(self, d_model: int, nhead: int, dropout: float = 0.1): super().__init__() assert d_model % nhead == 0, "d_model 必须能被 nhead 整除" self.d_model = d_model self.nhead = nhead self.d_head = d_model // nhead self.w_q = nn.Linear(d_model, d_model) self.w_k = nn.Linear(d_model, d_model) self.w_v = nn.Linear(d_model, d_model) self.attn = ScaledDotProductAttention(dropout) self.proj = nn.Linear(d_model, d_model) self.dropout = nn.Dropout(dropout) # 将 (B, L, C) 重塑为 (B, L, H, Dh),原来的数据都不会变化,只是形状改变了 # 加了一个维,然后交换了张量维度顺序。 def _shape(self, x: torch.Tensor) -> torch.Tensor: """(B, L, C) 切分重排为 (B, H, L, Dh)。""" B, L, C = x.shape # 第一步:将 (B, L, C) 重塑为 (B, L, H, Dh) x_reshaped = x.view(B, L, self.nhead, self.d_head) #x.view不复制数据,只是改变数据的"视角",数据在内存中存储顺序不变 # 第二步:交换维度 1 和 2,从 (B, L, H, Dh) 变为 (B, H, L, Dh) x_transposed = x_reshaped.transpose(1, 2) return x_transposed def _merge(self, x: torch.Tensor) -> torch.Tensor: """(B, H, L, Dh) 合并重排回 (B, L, C)。""" B, H, L, Dh = x.shape # 第一步:交换维度 1 和 2,从 (B, H, L, Dh) 变为 (B, L, H, Dh) x_transposed = x.transpose(1, 2) # 第二步:确保内存连续,然后重塑为 (B, L, H*Dh) x_contiguous = x_transposed.contiguous() # 第三步:重塑为 (B, L, C) 其中 C = H * Dh x_reshaped = x_contiguous.view(B, L, H * Dh) return x_reshaped # 因为QKV算的是矩阵,在transformer中涉及到两个mask # 一个是attn_mask控制哪些位置可以相互关注,如因果掩码防止看未来 # 一个是key_padding_mask控制哪些位置是有效的,如填充token不应该被关注 # 因为都要计算所以把这两个使用|合并起来,一起跟QKV计算即可,否则得计算两次。 # 对于encode来说传参只会穿key_pandding_mask,另外一个没有 # 对于decoder来说,两个都会传递。 def _build_attn_mask( self, Lq: int, Lk: int, attn_mask: torch.Tensor | None, key_padding_mask: torch.Tensor | None, device: torch.device, ) -> torch.Tensor | None: """将两类掩码合并成 (1/ B, 1/ H, Lq, Lk) 可广播布尔张量。True 表示屏蔽。""" mask = None if attn_mask is not None: # (Lq, Lk) -> (1,1,Lq,Lk) m1 = attn_mask.to(device).unsqueeze(0).unsqueeze(0) mask = m1 if mask is None else (mask | m1) if key_padding_mask is not None: # (B, Lk) -> (B,1,1,Lk) m2 = key_padding_mask.to(device).unsqueeze(1).unsqueeze(1) mask = m2 if mask is None else (mask | m2) return mask def forward( self, query: torch.Tensor, key: torch.Tensor, value: torch.Tensor, attn_mask: torch.Tensor | None = None, key_padding_mask: torch.Tensor | None = None, ) -> torch.Tensor: """多头注意力前向。 参数: - query, key, value: (B, L, C) - attn_mask: (Lq, Lk) 因果/结构掩码,True 为屏蔽 - key_padding_mask: (B, Lk) padding 掩码,True 为 padding 返回: - (B, Lq, C) """ # 输入均为 (B, L, C) B, Lq, _ = query.shape _, Lk, _ = key.shape device = query.device Q = self._shape(self.w_q(query)) # (B,H,Lq,Dh) K = self._shape(self.w_k(key)) # (B,H,Lk,Dh) V = self._shape(self.w_v(value)) # (B,H,Lk,Dh) mask = self._build_attn_mask(Lq, Lk, attn_mask, key_padding_mask, device) out = self.attn(Q, K, V, mask) # (B,H,Lq,Dh) out = self._merge(out) # (B,Lq,C) out = self.proj(out) out = self.dropout(out) return out class PositionwiseFeedForward(nn.Module): """前馈网络:逐位置的两层 MLP(含激活与 dropout)""" def __init__(self, d_model: int, dim_ff: int, dropout: float = 0.1): super().__init__() self.fc1 = nn.Linear(d_model, dim_ff) self.fc2 = nn.Linear(dim_ff, d_model) self.act = nn.ReLU() self.dropout = nn.Dropout(dropout) def forward(self, x: torch.Tensor) -> torch.Tensor: """两层逐位置前馈网络。 参数: - x: (B, L, C) 返回: - (B, L, C) """ x = self.fc2(self.dropout(self.act(self.fc1(x)))) x = self.dropout(x) return x class EncoderLayer(nn.Module): """Transformer 编码层(后归一化 post-norm 版本) 子层:自注意力 + 前馈;均带残差连接与 LayerNorm。 """ def __init__(self, d_model: int, nhead: int, dim_ff: int, dropout: float = 0.1): super().__init__() self.self_attn = MultiHeadAttention(d_model, nhead, dropout) self.norm1 = nn.LayerNorm(d_model) self.ff = PositionwiseFeedForward(d_model, dim_ff, dropout) self.norm2 = nn.LayerNorm(d_model) def forward(self, x: torch.Tensor, src_key_padding_mask: torch.Tensor | None = None) -> torch.Tensor: """单层编码层前向。 参数: - x: (B, S, C) - src_key_padding_mask: (B, S) True 为 padding 返回: - (B, S, C) """ # 自注意力子层 attn_out = self.self_attn(x, x, x, attn_mask=None, key_padding_mask=src_key_padding_mask) x = self.norm1(x + attn_out) # 前馈子层 ff_out = self.ff(x) x = self.norm2(x + ff_out) return x class DecoderLayer(nn.Module): """Transformer 解码层(自注意力 + 交叉注意力 + 前馈)""" def __init__(self, d_model: int, nhead: int, dim_ff: int, dropout: float = 0.1): super().__init__() self.self_attn = MultiHeadAttention(d_model, nhead, dropout) self.norm1 = nn.LayerNorm(d_model) self.cross_attn = MultiHeadAttention(d_model, nhead, dropout) self.norm2 = nn.LayerNorm(d_model) self.ff = PositionwiseFeedForward(d_model, dim_ff, dropout) self.norm3 = nn.LayerNorm(d_model) def forward( self, x: torch.Tensor, memory: torch.Tensor, tgt_mask: torch.Tensor | None = None, tgt_key_padding_mask: torch.Tensor | None = None, memory_key_padding_mask: torch.Tensor | None = None, ) -> torch.Tensor: """单层解码层前向。 参数: - x: (B, T, C) 解码器输入 - memory: (B, S, C) 编码器输出 - tgt_mask: (T, T) 因果掩码,true为屏蔽 - tgt_key_padding_mask: (B, T) - memory_key_padding_mask: (B, S) 返回: - (B, T, C) """ # 1) 解码器自注意力(带因果掩码 tgt_mask) sa = self.self_attn(x, x, x, attn_mask=tgt_mask, key_padding_mask=tgt_key_padding_mask) x = self.norm1(x + sa) # 2) 交叉注意力:Q 来自解码器,K/V 来自编码器 memory ca = self.cross_attn(x, memory, memory, attn_mask=None, key_padding_mask=memory_key_padding_mask) x = self.norm2(x + ca) # 3) 前馈 ff = self.ff(x) x = self.norm3(x + ff) return x class Encoder(nn.Module): def __init__(self, d_model: int, nhead: int, dim_ff: int, num_layers: int, dropout: float = 0.1): super().__init__() self.layers = nn.ModuleList([ EncoderLayer(d_model, nhead, dim_ff, dropout) for _ in range(num_layers) ]) def forward(self, x: torch.Tensor, src_key_padding_mask: torch.Tensor | None = None) -> torch.Tensor: """堆叠若干编码层。 参数: - x: (B, S, C) - src_key_padding_mask: (B, S) True 为 padding 返回: - (B, S, C) """ for layer in self.layers: x = layer(x, src_key_padding_mask=src_key_padding_mask) return x class Decoder(nn.Module): def __init__(self, d_model: int, nhead: int, dim_ff: int, num_layers: int, dropout: float = 0.1): super().__init__() self.layers = nn.ModuleList([ DecoderLayer(d_model, nhead, dim_ff, dropout) for _ in range(num_layers) ]) def forward( self, x: torch.Tensor, memory: torch.Tensor, tgt_mask: torch.Tensor | None = None, tgt_key_padding_mask: torch.Tensor | None = None, memory_key_padding_mask: torch.Tensor | None = None, ) -> torch.Tensor: """堆叠若干解码层。 参数: - x: (B, T, C) 目标端嵌入 - memory: (B, S, C) 编码器输出 - tgt_mask: (T, T) 因果掩码,True 为屏蔽 - tgt_key_padding_mask: (B, T) 目标端 padding 掩码 - memory_key_padding_mask: (B, S) 源端 padding 掩码 返回: - (B, T, C) """ for layer in self.layers: x = layer( x, memory, tgt_mask=tgt_mask, tgt_key_padding_mask=tgt_key_padding_mask, memory_key_padding_mask=memory_key_padding_mask, ) return x class Seq2SeqTransformer(nn.Module): """最小可运行的手写 Transformer 序列到序列模型 - 使用我们实现的 Encoder/Decoder/MHA/FFN - 仍保持与上文训练/解码接口一致 """ def __init__(self, src_vocab_size, tgt_vocab_size, d_model=128, nhead=4, num_encoder_layers=2, num_decoder_layers=2, dim_ff=256, dropout=0.1): super().__init__() self.d_model = d_model self.src_tok = nn.Embedding(src_vocab_size, d_model, padding_idx=PAD_IDX) self.tgt_tok = nn.Embedding(tgt_vocab_size, d_model, padding_idx=PAD_IDX) self.pos_enc = PositionalEncoding(d_model, dropout=dropout) self.encoder = Encoder(d_model, nhead, dim_ff, num_encoder_layers, dropout) self.decoder = Decoder(d_model, nhead, dim_ff, num_decoder_layers, dropout) self.generator = nn.Linear(d_model, tgt_vocab_size) def make_subsequent_mask(self, sz: int) -> torch.Tensor: """构造大小为 (sz, sz) 的下三角因果掩码;True 为屏蔽(不允许看未来)。""" return torch.triu(torch.ones(sz, sz, dtype=torch.bool), diagonal=1) def forward(self, src, tgt_in, src_pad_mask, tgt_pad_mask): """训练/教师强制阶段的前向。 参数: - src: (B, S) 源 token id - tgt_in: (B, T) 目标端输入(以 BOS 开头) - src_pad_mask: (B, S) True 为 padding - tgt_pad_mask: (B, T) True 为 padding(针对 tgt_in) 返回: - logits: (B, T, V) 词表维度的分类分布 """ # 1) 词嵌入 + 位置编码 src_emb = self.pos_enc(self.src_tok(src)) # (B,S,C) tgt_emb = self.pos_enc(self.tgt_tok(tgt_in)) # (B,T,C) # 2) 编码:仅使用 key_padding_mask 屏蔽 padding memory = self.encoder(src_emb, src_key_padding_mask=src_pad_mask) # (B,S,C) # 3) 解码:自注意力需要因果掩码 + padding 掩码;交叉注意力需要 memory 的 padding 掩码 tgt_mask = self.make_subsequent_mask(tgt_in.size(1)).to(src.device) # (T,T) out = self.decoder( tgt_emb, memory, tgt_mask=tgt_mask, tgt_key_padding_mask=tgt_pad_mask, memory_key_padding_mask=src_pad_mask, ) # (B,T,C) logits = self.generator(out) return logits @torch.no_grad() def greedy_decode(self, src_ids: List[int], max_len=20, device="cpu"): """在推理阶段进行贪心解码。 参数: - src_ids: 源端 token id 序列(不含 BOS/EOS) - max_len: 最大生成长度(含 BOS/EOS) - device: 运行设备 返回: - 生成的目标端 id 序列(含 BOS/EOS) """ #切换为评估模式,关闭dropout/batchnorm等随机性 self.eval() # 将源端token id序列转换为张量,并添加一个维度,如[1, 2, 3, 4] -> [[1, 2, 3, 4]] # 变为批维度的 (1, S);dtype 为 long 主要是以适配 nn.Embedding的输入格式。 src = torch.tensor(src_ids, dtype=torch.long, device=device).unsqueeze(0) # 生成一个跟src相同形状的mask矩阵,让编码器不要计算提取pandding的位置信息。 #按元素判断 src 是否等于 PAD_IDX,等于的位置为 True,不等的位置为 False。 src_pad_mask = src.eq(PAD_IDX) # 计算src_tok= src 经过词嵌入+位置编码后的结果 src_tok = self.src_tok(src) src_pos = self.pos_enc(src_tok) # 将该结果送入编码器,返回的memory就是编码器提取的特征向量。 # 输入编码器,即使没有填充(pandding)的token,也需要传入src_key_padding_mask。 memory = self.encoder(src_pos, src_key_padding_mask=src_pad_mask) # 初始化目标端token id序列,维度为(1,1),初始值为BOS_IDX # 表示目标端序列的开始,BOS_IDX=1 # 推理时输入是没有PAD,但是仍然需要tgt_pad_mask. ys = torch.tensor([[BOS_IDX]], dtype=torch.long, device=device) for _ in range(max_len - 1): #计算本次解码的Mask,跟ys形状一样。 tgt_pad_mask = ys.eq(PAD_IDX) # 计算本次因果掩码,把未来看到的token都屏蔽。 tgt_mask = self.make_subsequent_mask(ys.size(1)).to(device) # 可以看到当推理模式时,解码器输入token数量依次是1,2,3,4..... out = self.decoder( self.pos_enc(self.tgt_tok(ys)), memory, tgt_key_padding_mask=tgt_pad_mask, memory_key_padding_mask=src_pad_mask, ) # 转化为预测词的概率分布 logits = self.generator(out[:, -1:, :]) # 使用贪心选择概率最大的作为本次预测的目标 next_token = logits.argmax(-1) next_id = next_token.item() # 显示选择的token token_text = TGT_ITOS[next_id] if next_id < len(TGT_ITOS) else f"ID_{next_id}" print(f"选择: {token_text}({next_id})") ys = torch.cat([ys, next_token], dim=1) if next_id == EOS_IDX: break return ys.squeeze(0).tolist() # -------------------------- # 4) Train # -------------------------- device = torch.device("cuda" if torch.cuda.is_available() else "cpu") dataset = ToyDataset(pairs) loader = DataLoader(dataset, batch_size=8, shuffle=True, collate_fn=collate_fn) model = Seq2SeqTransformer( src_vocab_size=len(SRC_ITOS), tgt_vocab_size=len(TGT_ITOS), d_model=6, nhead=3, num_encoder_layers=2, num_decoder_layers=2, dim_ff=256, dropout=0.1 ).to(device) criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX) optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4) def evaluate_sample(sent="我 有 一个 苹果"): """辅助函数:对输入中文句子进行编码→推理→解码并打印结果。""" ids = encode_src(sent) print("ids",ids) pred_ids = model.greedy_decode(ids, device=device) pred_text = decode_tgt(pred_ids) print(f'INPUT : {sent}') print(f'OUTPUT: {pred_text}\n') print("Before training:") evaluate_sample("我 有 一个 苹果") EPOCHS = 80 # 小步数即可过拟合玩具数据 for epoch in range(1, EPOCHS + 1): model.train() total_loss = 0.0 for src, tgt_in, tgt_out, src_pad_mask, tgt_pad_mask in loader: src = src.to(device) tgt_in = tgt_in.to(device) tgt_out = tgt_out.to(device) src_pad_mask = src_pad_mask.to(device) tgt_pad_mask = tgt_pad_mask.to(device) logits = model(src, tgt_in, src_pad_mask, tgt_pad_mask) # (B, T, V) loss = criterion(logits.reshape(-1, logits.size(-1)), tgt_out.reshape(-1)) optimizer.zero_grad() loss.backward() torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) optimizer.step() total_loss += loss.item() if epoch % 5 == 0 or epoch == 1: print(f"Epoch {epoch:02d} | loss={total_loss/len(loader):.4f}") evaluate_sample("我 有 一个 苹果") print("After training:") evaluate_sample("我 有 一个 苹果") evaluate_sample("我 有 一本 书") evaluate_sample("你 有 一个 苹果") -
dataset和DataLoader
简介 Dataset和DataLoader在pytorch中主要用于数据的组织。这两个类通常一起搭配处理深度学习中的数据流。 Dataset 用于产出“单个样本”:定义怎么按索引取到一个样本,以及总共有多少个样本。 DataLoader 负责“成批取样”:决定批大小、是否打乱、多进程加载、并用 collate_fn 把一个批里的样本“拼起来”(对齐、padding、mask、teacher forcing 等)。 一句话记忆:Dataset 只管“单条样本”;DataLoader 负责“多条怎么一起、怎么并行、怎么对齐”。变长就写 collate_fn,性能就调 workers/pin_memory/分桶。 Dataset Dataset类作用:定义数据集的统一接口,支持自定义数据加载逻辑。 关键方法: init:初始化数据路径、预处理函数等。 len:返回数据集样本总数。 getitem:根据索引返回单个样本(数据+标签)。 通常情况下用户都会有自己的数据集,所以定义的数据集类继承dataset。 #准备一个数据集 pairs: List[Tuple[str, str]] = [ ("我 有 一个 苹果", "i have an apple"), ("我 有 一本 书", "i have a book"), ("你 喜欢 书", "you like books"), ("我 吃 苹果", "i eat apples"), ] def build_vocab(texts: List[str]): tokens = set() for s in texts: tokens.update([w.lower() for w in s.split()]) itos = ["<pad>", "<bos>", "<eos>"] + sorted(tokens) stoi = {t: i for i, t in enumerate(itos)} return stoi, itos src_texts = [s for s, _ in pairs] tgt_texts = [t for _, t in pairs] SRC_STOI, SRC_ITOS = build_vocab(src_texts) TGT_STOI, TGT_ITOS = build_vocab(tgt_texts) PAD_IDX, BOS_IDX, EOS_IDX = 0, 1, 2 def encode_src(s: str) -> List[int]: return [SRC_STOI[w.lower()] for w in s.split()] def encode_tgt(s: str) -> List[int]: return [BOS_IDX] + [TGT_STOI[w.lower()] for w in s.split()] + [EOS_IDX] # Dataset:定义“单样本怎么取” @dataclass class Example: src: List[int] tgt: List[int] class ToyDataset(Dataset): def __init__(self, pairs: List[Tuple[str, str]]): for s, t in pairs: print("encode_src(s)",encode_src(s)) print("encode_tgt(t)",encode_tgt(t)) self.data = [Example(encode_src(s), encode_tgt(t)) for s, t in pairs] def __len__(self) -> int: return len(self.data) def __getitem__(self, idx: int) -> Example: return self.data[idx] 样本结构:用 Example(src: List[int], tgt: List[int]) 表示一条样本的源序列与目标序列(都是 token id 列表)。 词表与编码:源序列仅分词并映射到 id。目标序列前加 bos、后加 eos,便于自回归训练。 协议:实现 len 和 getitem 两个方法即可被 DataLoader 使用。 DataLoader class torch.utils.data.DataLoader(Data[T_co]): def __init__( self, dataset, batch_size: int = 1, shuffle: bool | None = None, sampler = None, batch_sampler = None, num_workers: int = 0, collate_fn = None, pin_memory: bool = False, drop_last: bool = False, timeout: float = 0, worker_init_fn = None, multiprocessing_context = None, generator = None, prefetch_factor: int = 2, persistent_workers: bool = False, pin_memory_device: str = "" ): ... dataset: Dataset 或 IterableDataset 实例。 batch_size: 每批样本数。 shuffle: 是否在每个 epoch 打乱索引(Map-style 且未显式传 sampler 时有效)。 sampler: 自定义样本采样器(与 shuffle 互斥;指定它就不要再用 shuffle)。 batch_sampler: 一次直接产出“一个 batch 的索引列表”(与 batch_size、shuffle、sampler 互斥)。 num_workers: 进程数(0 为主进程;>0 开多进程并行加载)。 collate_fn(samples_list) -> batch: 批内拼接函数;变长序列需要自定义(默认会尝试堆叠等长 tensor)。 pin_memory: 将 batch 固定到页锁内存,配合 CUDA 加速 H2D 拷贝。 drop_last: 数据量不是 batch_size 整数倍时,是否丢弃最后不满的一批。 timeout: 从 worker 等待数据的秒数(>0 时生效)。 worker_init_fn(worker_id): 每个 worker 的初始化回调(设随机种子、打开文件等)。 multiprocessing_context: 指定多进程上下文(spawn/forkserver 等)。 generator: 控制随机性(打乱、采样)用的随机数生成器。 prefetch_factor: 每个 worker 预取多少个 batch(num_workers > 0 时有效)。 persistent_workers: True 时 DataLoader 第一次迭代后保持 worker 不销毁,提高多轮迭代性能。 pin_memory_device: 当 pin_memory=True 时,指定固定内存的设备标签(一般留空即可)。 DataLoader返回是一个可迭代的对象,每次迭代产出一个批次的样本。一个批次的内容就是把当批样本列表交给 collate_fn 的返回值(若未自定义,则用 PyTorch 的默认 default_collate)。而类型取决于两点Dataset.getitem 返回什么(tensor/数值/dict/tuple…)和collate_fn 如何把一批“样本列表”拼成“批次”。 这里重点阐述一下collate_fn是一个用户需要注册的回调函数,目的是要把一个批的样本拼接起来。同时对于输入样本如果张量的形状不一致如变长序列,进行padding、对齐、mask等动作。 def collate_fn(batch: List[Example]): src_max = max(len(b.src) for b in batch) tgt_max = max(len(b.tgt) for b in batch) src_batch: List[List[int]] = [] tgt_in_batch: List[List[int]] = [] tgt_out_batch: List[List[int]] = [] for ex in batch: src = ex.src + [PAD_IDX] * (src_max - len(ex.src)) # teacher forcing:输入去掉最后一个、输出去掉第一个 tgt_in = ex.tgt[:-1] + [PAD_IDX] * (tgt_max - 1 - len(ex.tgt[:-1])) tgt_out = ex.tgt[1:] + [PAD_IDX] * (tgt_max - 1 - len(ex.tgt[1:])) src_batch.append(src) tgt_in_batch.append(tgt_in) tgt_out_batch.append(tgt_out) src = torch.tensor(src_batch, dtype=torch.long) # (B,S) tgt_in = torch.tensor(tgt_in_batch, dtype=torch.long) # (B,T) tgt_out = torch.tensor(tgt_out_batch, dtype=torch.long) # (B,T) src_pad_mask = src.eq(PAD_IDX) # (B,S) True=PAD tgt_pad_mask = tgt_in.eq(PAD_IDX) # (B,T) True=PAD return src, tgt_in, tgt_out, src_pad_mask, tgt_pad_mask 输入:batch 是若干个 Example,每个包含 src: List[int] 与 tgt: List[int](目标序列已含 bos/eos)。 核心:对齐变长序列(右侧 padding),构造 teacher forcing 的 (tgt_in, tgt_out),并生成 padding 掩码。 输出: src: (B, S) tgt_in: (B, T) tgt_out: (B, T) src_pad_mask: (B, S);True=PAD tgt_pad_mask: (B, T);True=PAD 首先使用src_max/tgt_max计算批内最长长度,这样能够将所有样本右侧补到同一长度,方便堆叠为矩阵。 接着定义批内累积的容器src_batch,tgt_in_batch,tgt_out_batch。 src_batch: 编码器输入样本的批次。 tgt_in_batch:解码器输入样本的批次。 tgt_out_batch:解码器输出样本的批次。 其次使用for循环对每个样本进行补齐,使其跟src_max、tgt_max长度一致,[PAD_IDX] * (src_max - len(ex.src))的意思是将[PAD_IDX]的单元素列表重复src_max - len(ex.src)用于拼接追加到ex.src后,使其对齐。tgt_in和tgt_out同理。 在对tgt_in和tgt_out做样本补齐时,因为输入ex.tgt是包含了bos和eos目标序列,对于tgt_in输入需要去掉最后一个token bos,tgt_out输出需要去掉第一个token eos。 然后就是将补齐的序列依次添加到src_batch,tgt_in_batch,tgt_out_batch。这样就对输入的数据进行了分类,把编码器的输入整合了在一起,解码器的输入和输出整合了一起。 最后就是将批内对齐后的源序列列表转换为张量,同时计算src和tag_in的mask,也就是说对数据哪些位置添加了pad。 下面是collate_fn相关的打印数据,便于理解。 batch [Example(src=[9, 10, 3, 11], tgt=[1, 11, 10, 4, 5, 2]), Example(src=[6, 8, 5], tgt=[1, 13, 12, 8, 2])] src [9, 10, 3, 11] tgt_in [1, 11, 10, 4, 5] tgt_out [11, 10, 4, 5, 2] src [6, 8, 5, 0] tgt_in [1, 13, 12, 8, 0] tgt_out [13, 12, 8, 2, 0] src_batch [[9, 10, 3, 11], [6, 8, 5, 0]] tgt_in_batch [[1, 11, 10, 4, 5], [1, 13, 12, 8, 0]] tgt_out_batch [[11, 10, 4, 5, 2], [13, 12, 8, 2, 0]] src tensor([[ 9, 10, 3, 11], [ 6, 8, 5, 0]]) tgt_in tensor([[ 1, 11, 10, 4, 5], [ 1, 13, 12, 8, 0]]) tgt_out tensor([[11, 10, 4, 5, 2], [13, 12, 8, 2, 0]]) src_pad_mask tensor([[False, False, False, False], [False, False, False, True]]) tgt_pad_mask tensor([[False, False, False, False, False], [False, False, False, False, True]]) src tensor([[ 9, 10, 3, 11], [ 6, 8, 5, 0]]) tgt_in tensor([[ 1, 11, 10, 4, 5], [ 1, 13, 12, 8, 0]]) tgt_out tensor([[11, 10, 4, 5, 2], [13, 12, 8, 2, 0]]) src_mask tensor([[False, False, False, False], [False, False, False, True]]) tgt_mask tensor([[False, False, False, False, False], [False, False, False, False, True]]) 最后完整的示例代码 #!/usr/bin/env python3 """ 最小可运行示例:用 Dataset + DataLoader(含 collate_fn)演示变长序列如何拼批并生成 padding 掩码。 运行: python3 dataloader_demo.py """ from dataclasses import dataclass from typing import List, Tuple import torch from torch.utils.data import Dataset, DataLoader # -------------------------- # 1) 准备一点语料(空格分词) # -------------------------- pairs: List[Tuple[str, str]] = [ ("我 有 一个 苹果", "i have an apple"), ("我 有 一本 书", "i have a book"), ("你 喜欢 书", "you like books"), ("我 吃 苹果", "i eat apples"), ] def build_vocab(texts: List[str]): tokens = set() for s in texts: tokens.update([w.lower() for w in s.split()]) itos = ["<pad>", "<bos>", "<eos>"] + sorted(tokens) stoi = {t: i for i, t in enumerate(itos)} return stoi, itos src_texts = [s for s, _ in pairs] tgt_texts = [t for _, t in pairs] SRC_STOI, SRC_ITOS = build_vocab(src_texts) TGT_STOI, TGT_ITOS = build_vocab(tgt_texts) PAD_IDX, BOS_IDX, EOS_IDX = 0, 1, 2 def encode_src(s: str) -> List[int]: return [SRC_STOI[w.lower()] for w in s.split()] def encode_tgt(s: str) -> List[int]: return [BOS_IDX] + [TGT_STOI[w.lower()] for w in s.split()] + [EOS_IDX] # -------------------------- # 2) Dataset:定义“单样本怎么取” # -------------------------- @dataclass class Example: src: List[int] tgt: List[int] class ToyDataset(Dataset): def __init__(self, pairs: List[Tuple[str, str]]): for s, t in pairs: print("encode_src(s)",encode_src(s)) print("encode_tgt(t)",encode_tgt(t)) self.data = [Example(encode_src(s), encode_tgt(t)) for s, t in pairs] def __len__(self) -> int: return len(self.data) def __getitem__(self, idx: int) -> Example: return self.data[idx] # -------------------------- # 3) collate_fn:把“样本列表”拼成一批(对齐 padding + 生成 mask + teacher forcing) # -------------------------- def collate_fn(batch: List[Example]): src_max = max(len(b.src) for b in batch) #计算批次内最长长度,这样能将样本右侧补齐到同一长度,方便堆叠矩阵 tgt_max = max(len(b.tgt) for b in batch) src_batch: List[List[int]] = [] tgt_in_batch: List[List[int]] = [] tgt_out_batch: List[List[int]] = [] print("batch",batch) for ex in batch: src = ex.src + [PAD_IDX] * (src_max - len(ex.src)) # teacher forcing:输入去掉最后一个、输出去掉第一个 tgt_in = ex.tgt[:-1] + [PAD_IDX] * (tgt_max - 1 - len(ex.tgt[:-1])) tgt_out = ex.tgt[1:] + [PAD_IDX] * (tgt_max - 1 - len(ex.tgt[1:])) print("src",src) print("tgt_in",tgt_in) print("tgt_out",tgt_out) src_batch.append(src) tgt_in_batch.append(tgt_in) tgt_out_batch.append(tgt_out) print("src_batch",src_batch) print("tgt_in_batch",tgt_in_batch) print("tgt_out_batch",tgt_out_batch) src = torch.tensor(src_batch, dtype=torch.long) # (B,S) tgt_in = torch.tensor(tgt_in_batch, dtype=torch.long) # (B,T) tgt_out = torch.tensor(tgt_out_batch, dtype=torch.long) # (B,T) src_pad_mask = src.eq(PAD_IDX) # (B,S) True=PAD tgt_pad_mask = tgt_in.eq(PAD_IDX) # (B,T) True=PAD print("src",src) print("tgt_in",tgt_in) print("tgt_out",tgt_out) print("src_pad_mask",src_pad_mask) print("tgt_pad_mask",tgt_pad_mask) return src, tgt_in, tgt_out, src_pad_mask, tgt_pad_mask # -------------------------- # 4) DataLoader:定义“如何按批取样本”并演示输出 # -------------------------- def main(): dataset = ToyDataset(pairs) for i in range(len(dataset)): print("dataset",dataset.__getitem__(i)) loader = DataLoader( dataset, batch_size=2, shuffle=True, num_workers=0, # 跨平台演示,用 0;Linux 可调大 collate_fn=collate_fn, pin_memory=False, ) # EPOCH=40 # for epoch in range(EPOCH): # for src, tgt_in, tgt_out, src_mask, tgt_mask in loader: # 前向、loss、反传、优化 total_steps = 1000 data_iter = iter(loader) for step in range(total_steps): try: src, tgt_in, tgt_out, src_mask, tgt_mask = next(data_iter) except StopIteration: # 当前迭代器用尽,重建一个新的(相当于进入新一轮) data_iter = iter(loader) src, tgt_in, tgt_out, src_mask, tgt_mask = next(data_iter) print("src",src) print("tgt_in",tgt_in) print("tgt_out",tgt_out) print("src_mask",src_mask) print("tgt_mask",tgt_mask) if __name__ == "__main__": main() iter(loader): 把可迭代的 DataLoader 变成“批次迭代器”。 next(iterator): 从该迭代器中取“下一个批次”。第一次调用就是“第一个 batch”。 it = iter(loader) batch1 = next(it) batch2 = next(it) 在 shuffle=True 时,每次 iter(loader) 相当于开始“新的一轮遍历”,顺序会重新洗牌;drop_last、num_workers、pin_memory 等参数会影响批次数量、并行加载与传输性能。 当然除了用next迭代,还是用for循环的方式,如下: for epoch in range(EPOCH): for src, tgt_in, tgt_out, src_mask, tgt_mask in loader: print("src", src) print("tgt_in", tgt_in) print("tgt_out", tgt_out) print("src_mask", src_mask) print("tgt_mask", tgt_mask) -
数据维度
维度是什么 维度=数据需要“几个”索引才能定位到一个元素,也叫做轴数(axis)或阶(rank)。 可以看成"套盒子"的层数,盒子里面装盒子,再装数字。每多一层外括号/分类,就多一维。 0维=一个数;1维=一排数;2维=表格;3维=一摞表格;更高维=外面再套一层一层分类; 判断有几个维度的方法: 获取一个元素需要几个索引才能定位到。 多一层外括号=多一维;形状从外到内写“有多少个”。(外层是更粗粒度的分类,写在前面,如小批量彩色图像 (B, C, H, W);批次B、通道C、高H、宽W) 1D: ──●──●──●── 一条线 2D: 行×列 一张表 ┌───────┐ │● ● ● │ │● ● ● │ └───────┘ 3D: 多张2D表叠成“砖块” 从0到多维的例子 0维(标量):单个数 42 标量 shape:(), 只要“指它自己”就能找到,例:体温36.5。 1维(向量):一排数 [3, 5, 8] 向量shape:(N),需要1个索引(第几个)才能定位。 2维(矩阵/表格):多排多列 [ [1, 2, 3], [4, 5, 6] [7, 8, 9]] 矩阵shape:(R,C),需要2个索引(第几行,第几列)才能定位到。 3维度(立体):多张矩阵堆叠 [ [[1,2,3], [4,5,6]], [[7,8,9], [11,12,13]] ] 或 层0: [ [...], [...], ... ] 层1: [ [...], [...], ... ] ... 立体shape:(D,R,C),需要3个索引(第几层、第几行、第几列)才能定位到。 n维(张量):继续外面套一层索引 如4维度,小批量彩色图像 (B, C, H, W);批次B、通道C、高H、宽W 深度学习场景维度含义 图像/CNN:(B,C,H,W),B为batch个数,C为图像通道,H为图像高度,W为图像宽度。 文本/transformer:(B,S,C),Batch size,批大小。一次前向里同时处理的样本数。S有时也写作L,Sequence length,序列长度/时间步数(NLP 的 token 数、语音/时序的帧数)。在图像等场景里,若把二维特征展平成序列,也可表示展平后的步数。Channels/Features,特征维度。NLP 里常指 embedding 或 d_model;CV 里指通道数;时序里指每步的特征维度。[B, S, C] 通常表示“B 个样本,每个样本有 S 个时间步/位置,每个时间步有 C 维特征”。 怎么理解C(特征维/通道数)? 在一个张量形状 [B, S, C] 中,C 表示“每个位置(序列中的每个 token/时间步)所携带的特征向量维度”。也就是“描述一个位置所需的数值属性个数”。 表达能力上限: C 越大,单个位置能承载的信息越丰富(更“宽”的向量空间),可拟合更复杂的模式。 稳定性与信息瓶颈: 太小的 C 可能造成信息瓶颈,难以表达远距离依赖或复杂结构。 计算与显存代价: 层内线性/注意力的主计算大多与 C^2 成正比,激活占用与 BLC 成正比。增大 C 会显著提高计算/显存成本。 x.dim() # 轴数,也就是多少个维度。 x.shape # 形状,如 (B,L,C) x.size(-1) # 最后一维长度 -
ONNX Runtime C++端侧模型部署YOLOv5
加载准备 初始化ONNXRuntime环境 Ort::Env env(ORT_LOGGING_LEVEL_WARNING, "YOLOv5Inference"); Ort::Env 是 ONNX Runtime C++ API 中用于初始化运行环境的类,有多个重载的构造函数,下面是一个构造函数原型及参数作用如下。 Ort::Env( OrtLoggingLevel logging_level, const char* logid, OrtLoggingFunction logging_fn = nullptr, void* logger_param = nullptr ); logging_level:控制日志输出级别 logid: 自定义日志标签,用于区分不同模块的日志来源 logging_fn:自定义日志回调函数,若为 nullptr 则使用默认日志输出到控制台。 logger_param:传递给自定义日志函数的用户参数(如上下文对象) 设置会话参数 Ort::SessionOptions session_options; session_options.SetIntraOpNumThreads(1); session_options.SetGraphOptimizationLevel(GraphOptimizationLevel::ORT_ENABLE_ALL); 初始化一个空的会话配置对象session_options,SetIntraOpNumThreads限制单个算子(Intra-op)内部使用的线程数为 1,适用于轻量级任务或避免多线程竞争,SetGraphOptimizationLevel启用所有图优化策略(如算子融合、常量折叠),提升推理性能。 模型加载 Ort::Session session_(env, modelPath.c_str(), session_options); Ort::Session 是 ONNX Runtime C++ API 中用于加载 ONNX 模型并创建推理会话的核心类,其功能分解如下。 Ort::Session( const Ort::Env& env, const char* model_path, const Ort::SessionOptions& options ); env:全局运行环境对象,管理线程池和内存分配等资源,需优先初始化。 model_path:ONNX 模型文件的路径,c语言的字符串类型。 options:会话参数,配置会话行为,如线程数、优化级别、硬件后端等。 获取输入和输出信息 输入名称 Ort::AllocatorWithDefaultOptions allocator; //创建默认内存分配器对象,用于管理 ONNX Runtime 中的内存分配(如节点名称字符串的内存 std::vector<const char*> input_node_names_; //存储 C 风格字符串指针,用于直接传递给 ONNX Runtime 的推理接口 std::vector<std::string> input_names_; //存储标准字符串对象的vector,用于长期维护字符串内存 size_t num_inputs_; num_inputs_ = session_.GetInputCount(); //获取输入节点的个数,有多少个节点就决定了多个个name,一般都是1个。 input_node_names_.resize(num_inputs_); input_names_.resize(num_inputs_, ""); //预分配容器空间,避免动态扩容的开销 std::cout << "num_inputs = "<< num_inputs_<<std::endl; for (size_t i = 0; i < num_inputs_; ++i) { auto input_name = session_.GetInputNameAllocated(i, allocator); //通过分配器安全获取第 i 个输入节点的名称(返回 Ort::AllocatedStringPtr 对象) input_names_[i].append(input_name.get()); //获取名称的原始指针,存入 input_names_ 的字符串中 input_node_names_[i] = input_names_[i].c_str(); //将 std::string 转换为 C 风格指针,供 input_node_names_ 使用 } 上面函数示例了如何获取输入节点name,首先通过session_.GetInputCount()获取到输入的节点,然后使用for循环进行遍历每个节点,通过session_.GetInputNameAllocated(i, allocator)获取每个节点的名称,返回一个Ort::AllocatedStringPtr智能指针,需要通过.get方法返回c字符串,由于智能指针指向的存储空间退出for后会销毁,所以上述代码将其复制到input_names_中。 输入张量维度 Ort::TypeInfo input_type_info = session_.GetInputTypeInfo(0); //获取模型第0个输入节点的类型信息对象,返回Ort::TypeInfo类型 auto input_tensor_info = input_type_info.GetTensorTypeAndShapeInfo(); //从类型信息中提取张量相关的形状和数据类型信息,返回Ort::TensorTypeAndShapeInfo对象 std::vector<int64_t> input_dims = input_tensor_info.GetShape(); //获取输入张量的维度信息,返回std::vector<int64_t>容器,存储各维度大小 //典型YOLO模型的输入维度为[batch, channel, height, width] int inputWidth = input_dims[3]; int inputHeight = input_dims[2]; 上面函数示例获取输入张量形状,最终通过张量的形状获取到了输入图像的宽和高。实际上可以简化一下,按照下面的方式。 auto inputShapeInfo = session_.GetInputTypeInfo(0).GetTensorTypeAndShapeInfo().GetShape(); int ch = inputShapeInfo[1]; inputWidth = inputShapeInfo[2]; inputHeight = inputShapeInfo[3]; 输出名称 std::vector<const char*> output_node_names_; //存储C风格字符串指针的vector,用于兼容需要const char*的ONNX Runtime API调用 std::vector<std::string> output_names_; //存储标准字符串对象的vector,用于长期维护字符串内存 size_t num_outputs_; num_outputs_ = session_.GetOutputCount(); //获取模型输出节点数量 output_node_names_.resize(num_outputs_); output_names_.resize(num_outputs_, ""); //预分配两个vector的空间 for (size_t i = 0; i < num_outputs_; ++i) { auto output_name = session_.GetOutputNameAllocated(i, allocator); output_names_[i].append(output_name.get()); //将名称存入std::string保证生命周期 output_node_names_[i] = output_names_[i].c_str(); } //循环获取每个输出节点名称 上面示例了获取输出名称,与输入方法类似。 输入预处理 cv::Mat image = cv::imread(imagePath); if (image.empty()) { std::cerr << "Error: Could not read image." << std::endl; return -1; } cv::Mat originalImage = image.clone(); cv::Size image_shape = originalImage.size(); // 图像预处理 std::vector<float> inputTensor = preprocess(image, inputWidth, inputHeight); 使用opencv读取图像,调用preprocess进行预处理。 std::vector<float> preprocess(const cv::Mat& image, int inputWidth = 320, int inputHeight = 320) { cv::Mat resizedImage; cv::resize(image, resizedImage, cv::Size(inputWidth, inputHeight)); //图像缩放:使用OpenCV的resize函数将图像调整为指定尺寸(默认320x320) cv::cvtColor(resizedImage, resizedImage, cv::COLOR_BGR2RGB); //颜色空间转换:从BGR转换为RGB格式(多数深度学习模型使用RGB输入) resizedImage.convertTo(resizedImage, CV_32F, 1.0 / 255.0); //数值归一化:通过convertTo将像素值从[0,255]归一化到[0,1]范围 std::vector<float> inputTensor; for (int c = 0; c < 3; ++c) { for (int h = 0; h < inputHeight; ++h) { for (int w = 0; w < inputWidth; ++w) { inputTensor.push_back(resizedImage.at<cv::Vec3f>(h, w)[c]); } } } //通过三重循环将OpenCV的HWC格式(Height-Width-Channel)转换为CHW格式 //内存布局变为连续通道数据:RRR...GGG...BBB,最终输出std::vector<float> return inputTensor; } 上面的代码实现了模型对输入数据的部分预处理,包括输入图片缩放固定尺寸,数值归一化,以及将格式转换为CHW张量格式,但是对于输入模型,需要的数据格式为Ort::Value类型。 std::vector<int64_t> input_shape = {1, 3, inputHeight, inputWidth}; //input_shape采用NCHW格式(批次数-通道-高度-宽度),这是深度学习模型的通用输入布局 auto memory_info = Ort::MemoryInfo::CreateCpu(OrtArenaAllocator, OrtMemTypeDefault); //描述用于描述内存分配的信息,包括内存的位置(CPU 或 GPU)以及内存的具体类型(固定内存或常规内存) Ort::Value input_tensor = Ort::Value::CreateTensor<float>(memory_info, inputTensor.data(), inputTensor.size(), input_shape.data(), input_shape.size()); 关于CreateTensor对象解析如下。 static Ort::Value CreateTensor<float>( const MemoryInfo& memory_info, // 内存管理策略 float* p_data, // 输入数据指针 size_t p_data_length, // 输入的大小 const int64_t* shape, // 维度数组指针 size_t shape_length // 维度数量 ); memory_info:指定张量内存分配策略,通常由Ort::MemoryInfo::CreateCpu创建。 inputTensor.data():输入数据的地址(需确保内存连续)。 inputTensor.size():输入数据的大小。 input_shape.data():输入张量的形状数组信息(如NCHW格式的{1,3,640,640})。 shape_length: 输入张量的形状信息维度数 模型推理 std::vector<Ort::Value> outputs = session_.Run( Ort::RunOptions{nullptr}, input_node_names_.data(), &input_tensor, 1, output_node_names_.data(), output_node_names_.size()); 下面是函数的参数 OrtStatus * OrtApi::Run( const OrtRunOptions * run_options, const char *const *input_names, //输入节点名称的数组 const OrtValue *const *inputs, //模型输入的数据Ort::Value类型 size_t input_len, //输入张量数量,需与input_names数组长度一致 const char *const *output_names,//输出节点名称数组 size_t output_names_len,//输出节点名称的数量,与output_names数组数量保持一致。 ) 输出后处理 //张量信息提取,outputs[0]指向坐标、分数张量指针,outputs[1]指向类别张量的指针 float* dets_data = outputs[0].GetTensorMutableData<float>(); //坐标(格式为[x1,y1,x2,y2,score]) float* labels_pred_data = outputs[1].GetTensorMutableData<float>(); //类别 //张量维度的解析,用于获取检测框的数量 auto dets_tensor_info = outputs[0].GetTensorTypeAndShapeInfo(); std::vector<int64_t> dets_dims = dets_tensor_info.GetShape(); size_t num_detections = dets_dims[1]; //结构化重组,解析输出的张量将其存储dets、scores、lables_pred std::vector<std::vector<float>> dets(num_detections, std::vector<float>(4)); std::vector<float> scores(num_detections); std::vector<int> labels_pred(num_detections); //遍历解析存储坐标dets、分数scores、标签类别lables_pred for (size_t i = 0; i < num_detections; ++i) { for (int j = 0; j < 4; ++j) { dets[i][j] = dets_data[i * 5 + j]; } scores[i] = dets_data[i * 5 + 4]; labels_pred[i] = static_cast<int>(labels_pred_data[i]); } //将坐标信息进行缩放以适应正常的图片大小。 float scale_x = static_cast<float>(image_shape.width) / inputWidth; float scale_y = static_cast<float>(image_shape.height) / inputHeight; for (auto& det : dets) { det[0] *= scale_x; det[1] *= scale_y; det[2] *= scale_x; det[3] *= scale_y; } 上面的代码从输出张量信息中进行解析,将坐标、分数、标签类别依次存储到dets、scores、lables_pred中。 void visualizeResults(cv::Mat& image, const std::vector<std::vector<float>>& dets, const std::vector<float>& scores, const std::vector<int>& labels_pred, const std::vector<std::string>& labels, float conf_threshold = 0.4) { for (size_t i = 0; i < dets.size(); ++i) { const auto& det = dets[i]; float score = scores[i]; if (score > conf_threshold) { int class_id = labels_pred[i]; int x1 = static_cast<int>(det[0]); int y1 = static_cast<int>(det[1]); int x2 = static_cast<int>(det[2]); int y2 = static_cast<int>(det[3]); std::string label = labels[class_id]; cv::rectangle(image, cv::Point(x1, y1), cv::Point(x2, y2), cv::Scalar(0, 255, 0), 2); cv::putText(image, label + ": " + std::to_string(score), cv::Point(x1, y1 - 10), cv::FONT_HERSHEY_SIMPLEX, 0.9, cv::Scalar(0, 255, 0), 2); } } } 最终将获取到的将坐标、分数、标签类别传入到visualizeResults进行绘制。 发现一个开源的ai toolkit,相对比较全。https://github.com/xlite-dev/lite.ai.toolkit/tree/main -
pip install
是什么 pip install 是python包管理器,用于python软件包的下载、安装、卸载等功能。 怎么用 在线安装 pip install 软件包名 pip install 软件包名==版本号 例如pip install requests,或pip install requests==1.1。 也可以从文件列表中获取安装 pip install -r requirements.txt 从requirements.txt文件安装依赖,通常用于项目的依赖管理。 pip的软件包一般有两种格式: whl (Wheel) 格式: 文件是一种预编译的Python包格式,类似于Windows的.exe安装文件,但专门用于Python。 tar.gz:包含了Python包的源代码,需要先解压,然后pip会根据其中的setup.py文件进行编译和安装。 whl文件是pip推荐的安装包格式,因为它更快,而.tar.gz文件则用于源代码分发和离线安装。 torchvision-0.17.1-cp311-cp311-macosx_10_13_x86_64.whl 这个命名规则是什么?第一个cp311是编译是python版本为3.11,第二个cp311表示ABI(应用二进制接口)兼容 Python 3.1,确保与 Python 3.11 环境完全适;操作系统架构为macos 10.13以上,x86_64 intel/AMD 64位。 离线升级 pip install --no-index --find-links=./offline_packages -r requirements.txt no-index:表示不从网上获取安装。 find-links:选择本地包的路径 r:下载所有依赖,可省略。 获取软件包可以通过u盘或者下载的方式,看看怎么下载。 pip download -d ./offline_packages -r requirements.txt 升级 pip install --upgrade 软件包名 或简写方式: pip install -U 软件包 用于升级软件包名称。包括升级pip。 软件包源 查看源 pip config list 安装软件时不指定源就会默认从当前的源获取,对应的配置文件路径:~/.config/pip/pip.conf 设置源 pip config set global.index-url <源地址> 示例:pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple 永久设置源,将会写到配置文件。 pip config unset global.index-url 删除全局配置的源。 指定源 pip install xxx -i https:xxx 或者pip install xxx --index-url https:xxx 也可以从github中获取 pip install git+<仓库地址> 常用源 清华大学:https://pypi.tuna.tsinghua.edu.cn/simple 阿里云:https://mirrors.aliyun.com/pypi/simple/ 中国科学技术大学:https://pypi.mirrors.ustc.edu.cn/simple/ 卸载 pip uninstall 软件包名 卸载对应的软件包。 查看 pip --version 查看pip的版本。 pip list 列出安装了那些包 -
ONNX Runtime Python端侧模型部署YOLOv5
ONNX Runtime介绍 ONNX Runtime不依赖于Pytorch、tensorflow等机器学习训练模型框架。他提供了一种简单的方法,可以在CPU、GPU、NPU上运行模型。通常ONNX Runtime用于端侧设备模型的运行推理。要使用ONNX Runtime运行模型,一般的步骤如下: 用你最喜欢的框架(如pytorch、tensorflow、paddle等)训练一个模型。 将模型转换或导出为ONNX格式。 在端侧使用ONNX Runtime加载并运行模型。 模型的训练和导出为ONNX格式这里就不再阐述了。下面基于python在端侧运行模型的示例: import numpy # 导入numpy模块 import onnxruntime as rt # 导入onnxruntime模块 sess = rt.InferenceSession( "logreg_iris.onnx", providers=rt.get_available_providers()) # 加载模型logreg_iris.onnx input_name = sess.get_inputs()[0].name # 获取模型的输入名称,对应的是使用https://netron.app/中intput name。 pred_onx = sess.run(None, {input_name: X_test.astype(numpy.float32)})[0] # 运行模型推理,返回结果到pred_onx中 print(pred_onx) 上面给出的python示例中,端侧运行模型可以总结为2个步骤。加载模型,模型推理。 加载模型 class onnxruntime.InferenceSession( path_or_bytes: str | bytes | os.PathLike, sess_options: onnxruntime.SessionOptions | None = None, providers: Sequence[str | tuple[str, dict[Any, Any]]] | None = None, provider_options: Sequence[dict[Any, Any]] | None = None, **kwargs) path_or_bytes: 模型文件名或者ONNX、ORT格式二进制。 sess_options: 会话选项,比如配置线程数、优先级。 providers: 指定执行提供者优先级(['CUDAExecutionProvider','CPUExecutionProvider']) provider_options: 字典序列,为每个提供者配置专属参数(如CUDA设备ID) options = onnxruntime.SessionOptions() options.SetIntraOpNumThreads(4) # 多设备优先级配置 session = InferenceSession( "model.onnx", sess_options=options, providers=[ ('CUDAExecutionProvider', {'device_id': 0}), 'CPUExecutionProvider' ] ) 模型推理 outputs = senssion.run(output_names, input_feed, run_options=None) output_names:输出节点名称,字符串列表,指定需要获取的输出节点名称,若为None则返回所有输出 input_feed:输入数据,字典类型,结构为{"输入节点名": numpy数组/ORTValue},建议使用ORTValue封装输入数据以减少CPU-GPU拷贝开销。 run_options:运行参数,如日志级别。 import numpy as np import onnxruntime as ort # 创建示例数据 cpu_data = np.random.rand(1, 3, 224, 224).astype(np.float32) # 转换为GPU上的ORTValue gpu_ort_value = ort.OrtValue.ortvalue_from_numpy( cpu_data, device_type='cuda', # 关键参数:指定GPU设备 device_id=0 # GPU设备ID(多卡时指定) ) print(gpu_ort_value.device_name()) # 输出: 'Cuda' results = session.run( ["output_name"], {"input_name": gpu_ort_value} # 避免CPU->GPU拷贝 ) 在运行模型是,需要获取模型的输入和输出名称,可以通过调用对应的函数session.get_inputs(),session.get_outputs()来获取。inputs和outputs函数返回的是onnxruntime.NodeArg类,该类是ONNX Runtime中表示计算图节点输入/输出参数的核心类,该类有3个成员变量,如下: property name: 参数唯一标识符,对应计算图中的节点名称。 property shape: 张量形状。 property type:数据类型(如tensor(float32)/tensor(int64)) 以下是获取输入名称和输出名称的示例。 input_name = session.get_inputs()[0].name output_names = [output.name for output in session.get_outputs()] 详细请参考:https://onnxruntime.ai/docs/api/python/api_summary.html YOLOv5运行示例 加载模型 session_options = ort.SessionOptions() session_options.intra_op_num_threads = 1 # 加载 ONNX 模型 session = ort.InferenceSession( "yolov5_n.q.onnx", sess_options=session_options, providers=["XXXExecutionProvider"]) 创建SessionOptions对象用于定制化会话行为,限制算子内部并行线程数为1,加载名为yolov5_n.q.onnx的量化版YOLOv5模型,指定自定义执行提供者XXXExecutionProvider。 图像预处理 image = cv2.imread(args.image) #image shape (375, 500, 3) image_shape = image.shape[:2] #image_shape的值(375, 500),取前面2个值为图像的宽高 # 获取图像的尺寸大小高和宽。 input_tensor = preprocess(image) # 图像预处理函数 def preprocess(image, input_size=(640, 640)): # 调整图像大小为640*640, image = cv2.resize(image, input_size) # 转换颜色空间RGB image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # 归一化处理 #astype(np.float32)将图像像素值从整数类型(如uint8)转换为32位浮点数, #避免后续除法运算的精度损失,/ 255.0将像素值从[0,255]的原始范围线性映射到[0,1]区间, #符合神经网络输入的典型数值范围要求 image = image.astype(np.float32) / 255.0 # 转置模型为CHW格式,原输入是HWC格式。 # 输入的数据是(640,640,3),需要调整为NCHW模型格式 [batch, channel, height, width] # 使用np.transpose进行转置,变换成(3,640,640) image = np.transpose(image, (2, 0, 1)) image = np.expand_dims(image, axis=0) # 接着再加上一个轴变化成(1,3,640,640)tensor。 return image 如何理解深度学习中的轴了? 在深度学习中,轴可以理解为维度。如上图是一个NCHW排布的格式,把N当成第一个维度即位轴0,C第二维度即为轴1,H第三维度即为轴2,W为第四维度即为轴3。np.expand_dims(image, axis=0)即拓展了轴0,原来只有3个维度现在变成4个维度了,N为1。还可以按照指定的轴进行求和,即做压缩。执行np.sum(data, axis=0)时,也就是沿着N的维度就行压缩求和,就变成如上图。由原来的(N,C,W,H)变成了(C',W',H'),即N个CWH中的各自相加。如果是np.sum(data,axis=1),那就是按照C维度方向进行相加,结果就是(N,W,H),即如RGB格式就是每个图像RGB 3通道的像素相加,如下图所示。 模型推理 模型推理前,需要获取计算图输入和输入的名称 input_name = session.get_inputs()[0].name output_names = [output.name for output in session.get_outputs()] print('input name', input_name) print('output name', output_names) 输出结果与下图对应。 input name input output name ['dets', 'labels'] 获取到intput_name和ouput_names后,即可调用运行推理。 outputs = session.run(output_names, {input_name: input_tensor}) 模型后处理 # 把batch这个维度去掉 dets = outputs[0].squeeze() labels_pred = outputs[1].squeeze() #将坐标进行缩放以适应实际图片的大小。 input_size = (640, 640) scale_x = image_shape[1] / input_size[0] scale_y = image_shape[0] / input_size[1] dets[:, 0] *= scale_x dets[:, 1] *= scale_y dets[:, 2] *= scale_x dets[:, 3] *= scale_y 模型outputs有两个输出,一个是dets,这是一个二位数组dets[n][5],其中det[5]包含了坐标x1, y1, x2, y2,score,前面4个预选框的坐标,后面一个为预选框的分数。 def visualize_results(image, dets, labels_pred, labels, conf_threshold): for i in range(len(dets)): det = dets[i] score = det[4] #每个框的分数 if score > conf_threshold: #小于分数的剔除 class_id = int(labels_pred[i]) x1, y1, x2, y2 = map(int, det[:4]) label = labels[class_id] cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2) cv2.putText(image, f'{label}: {score:.2f}', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2) return image 根据阈值分数进行画框,最终完成结果的后处理,注意上面并没有进行极大值抑制。 -
密码保护:端侧vscode AI开发环境搭建
此内容受密码保护。如需查阅,请在下列字段中输入您的密码。 密码: -
llama.cpp部署大模型
安装llama.cpp 从GitHub上下载官方的源码。 git clone https://github.com/ggml-org/llama.cpp.git cd llama.cpp 使用camke进行编译,先创建build环境 cmake -B build 发现有报错curl没有安装。 -- The C compiler identification is GNU 11.3.0 -- The CXX compiler identification is GNU 11.3.0 -- Detecting C compiler ABI info -- Detecting C compiler ABI info - done -- Check for working C compiler: /usr/bin/cc - skipped -- Detecting C compile features -- Detecting C compile features - done -- Detecting CXX compiler ABI info -- Detecting CXX compiler ABI info - done -- Check for working CXX compiler: /usr/bin/c++ - skipped -- Detecting CXX compile features -- Detecting CXX compile features - done -- Found Git: /usr/bin/git (found version "2.34.1") -- Looking for pthread.h -- Looking for pthread.h - found -- Performing Test CMAKE_HAVE_LIBC_PTHREAD -- Performing Test CMAKE_HAVE_LIBC_PTHREAD - Success -- Found Threads: TRUE -- Warning: ccache not found - consider installing it for faster compilation or disable this warning with GGML_CCACHE=OFF -- CMAKE_SYSTEM_PROCESSOR: x86_64 -- GGML_SYSTEM_ARCH: x86 -- Including CPU backend -- Found OpenMP_C: -fopenmp (found version "4.5") -- Found OpenMP_CXX: -fopenmp (found version "4.5") -- Found OpenMP: TRUE (found version "4.5") -- x86 detected -- Adding CPU backend variant ggml-cpu: -march=native -- Could NOT find CURL (missing: CURL_LIBRARY CURL_INCLUDE_DIR) CMake Error at common/CMakeLists.txt:85 (message): Could NOT find CURL. Hint: to disable this feature, set -DLLAMA_CURL=OFF 使用apt-get安装libcur14,如下。 sudo apt-get update sudo apt-get install libcurl4-openssl-dev 安装curl成功后,解决了,继续执行cmake -B build,会生成build目录。 cmake -B build -- Warning: ccache not found - consider installing it for faster compilation or disable this warning with GGML_CCACHE=OFF -- CMAKE_SYSTEM_PROCESSOR: x86_64 -- GGML_SYSTEM_ARCH: x86 -- Including CPU backend -- x86 detected -- Adding CPU backend variant ggml-cpu: -march=native -- Found CURL: /usr/lib/x86_64-linux-gnu/libcurl.so (found version "7.81.0") -- Configuring done -- Generating done -- Build files have been written to: /root/autodl-tmp/llama.cpp/build 接着llama.cpp的源码。 cmake --build build --config Release 编译完成之后,生成的二进制都在llama.cpp/build/bin目录下。 模型下载 使用wget下载模型。 wget https://huggingface.co/bartowski/Llama-3.2-3B-Instruct-GGUF/resolve/main/Llama-3.2-3B-Instruct-Q8_0.gguf llamap.cpp只能使用GGUF格式的大模型,使用的模型可以在Hugging Face获取https://huggingface.co/。也可以在modelscope上获取https://modelscope.cn/models。 这里有个技巧,可能仓库里面有很多量化参数的模型,如果使用git全部clone下来会比较久,这里可以只下载指定的GGUF模型,点击要使用的模型,如下: 然后,获取到下面的下载链接。 如果是modelsscope,找到下载,然后鼠标长按左键不松手拖到上面的输入网址框获取到下载链接。 这样就可以使用wget进行下载了。 wget https://huggingface.co/Qwen/Qwen2.5-0.5B-Instruct-GGUF/resolve/main/qwen2.5-0.5b-instruct-q8_0.gguf wget https://modelscope.cn/models/Qwen/Qwen2.5-3B-Instruct-GGUF/resolve/master/qwen2.5-3b-instruct-q8_0.gguf 模型测试 运行大模型 ./llama.cpp/build/bin/llama-cli -m model/Llama-3.2-3B-Instruct-Q8_0.gguf 运行日志如下,可以看到使用的是CPU,没有使用GPU,因为前面编译的时候没有使能CUDA。 llama_perf_sampler_print: sampling time = 8.06 ms / 80 runs ( 0.10 ms per token, 9920.63 tokens per second) llama_perf_context_print: load time = 1070.39 ms llama_perf_context_print: prompt eval time = 859.42 ms / 15 tokens ( 57.29 ms per token, 17.45 tokens per second) llama_perf_context_print: eval time = 20880.31 ms / 65 runs ( 321.24 ms per token, 3.11 tokens per second) llama_perf_context_print: total time = 37979.41 ms / 80 tokens load time: 模型加载时间,耗时1070.39ms,属于一次性开销,与模型大小和硬件I/O性能相关。 prompt eaval time: 有些也称为prefill(TPS),表示提示词处理时间,处理15个输入Token耗时859.42ms,平均57.29ms/Token,速度17.45 Token/s。 eval time:有些也称为decode (TPS), 表示生成推理时间,生成65个Token耗时20880.31ms,平均321.24ms/Token,速度仅3.11 Token/s,显著低于采样阶段的9920.63 Token/s,说明生成阶段存在计算瓶颈。 sampling time: 采样80次仅8.06ms,速度高达9920.63 Token/s,表明采样算法本身效率极高,非性能瓶颈。 total time: 输入到输出的总耗时,包括模型加载时间、提示词处理时间、生成推理时间,其他时间(可能含内存交换或调度延迟) 可以使用vscode的打开多个终端,一个执行大模型交互,一个使用htop看看CPU和内存使用情况。 从上面看输入是17.45 token/s,输出是3.11 token/s,速度还是比较慢。 没有使用GPU,都是用cpu在推理。那么怎么使能使用gpu了?使用下面的方式,构建编译的时候打开CUDA,然后重新编译试一下。要用多线程编译,否则编译贼慢。 cd llama.cpp cmake -B build -DGGML_CUDA=ON cmake --build build --config Release -j16 重新运行模型后,看到硬件信息用了GPU了。 llama_perf_sampler_print: sampling time = 10.88 ms / 105 runs ( 0.10 ms per token, 9649.85 tokens per second) llama_perf_context_print: load time = 959.88 ms llama_perf_context_print: prompt eval time = 573.18 ms / 14 tokens ( 40.94 ms per token, 24.43 tokens per second) llama_perf_context_print: eval time = 17212.83 ms / 91 runs ( 189.15 ms per token, 5.29 tokens per second) llama_perf_context_print: total time = 34584.56 ms / 105 tokens 输出token有提升,但是看起来不明显,为啥了? -
密码保护:YOLOv5端侧部署代码分析
此内容受密码保护。如需查阅,请在下列字段中输入您的密码。 密码: -
端侧部署YOLOv5模型
导出 ONNX模型 python export.py --weights runs/train/exp2/weights/ NPU不支持动态输入,使用onnxim工具进行转换为固定输入,先安装onnxsim工具。 pip install onnxsim -i https://pypi.doubanio.com/simple/ 接着进行转换 python -m onnxsim runs/train/exp2/weights/best.onnx yolov5s-sim.onnx --input-shape 1,3,640,640 模型裁剪 在实际端侧中,NPU端量化的后处理运算不适合使用uint8量化,一般使用float的混合量化,但这样相对麻烦,本文示例将后处理放在CPU测进行,所以我们需要把下图中的后处理部分裁剪掉。 import onnx onnx.utils.extract_model('./yolov5s-mask.onnx', './yolov5s-mask-rt.onnx', ['images'], ['/model.24/Reshape_output_0','/model.24/Reshape_8_output_0','/model.24/Reshape_16_output_0']) 使用上面的python可以进行裁剪,输入为[images],输出为3个节点,下面是最后一个节点的示例截图。实际要根据模型文件进行调整。 python extrat-mask.py 接着使用上面的命令,就输出了裁剪后的模型yolov5s-mask-rt.onnx。 如果原生的模型没有将后处理裁剪,输入输出如下: 输出的tensor[1,25200,85],其中25200=3x(20x20+40x40+80x80),即3个特征图一共25200个先验框。 原生模型使用裁剪后使用netron.app打开得到输入输出如下: YOLOv5模型参数含义详细解释如下: 模型名称: 表示模型或图结构的名称,此处为从主图结构中提取的名称。 输入参数:float32[1,3,640,640],表示输入张量的数据类型和维度。1表示一次处理的样本,3为通道数,输入的高宽均为640,此前YOLOv3是416。 输出参数:有3个输出张量。 -- /model.24/Reshape_output_0:表示8倍降采样率,输出为float32[1,3,85,80,80],网格划分为80x80,每个网格有3给先验框,每个先验框预测包含85个元素(坐标4、置信度1、类别80)。 -- /model.24/Reshape_8_output_0:float32[1,3,85,40,40],网格尺寸为40x40。 -- /model.24/Reshape_16_output_0:float32[1,3,85,20,20],网格尺寸为20x20。 模型参数数量:parameter参数为7225908,表示模型中参数的总数,这是模型复杂度和计算资源需求的重要指标。 再来看看此次针对口罩微调后裁剪后的模型输入输出,使用netron.app打开得到输入输出如下: 上图可以看到,网格划分、先验框数量是一样的,不一样的是预测的元素为8(坐标4,置信度1,类别3)。 创建端侧转换环境 sudo docker images sudo docker run --ipc=host -itd -v /home/xxx/ai/docker_data:/workspace --name laumy_npu_v1.8.x ubuntu-npu:v1.8.11 /bin/bash sudo docker ps -a sudo docker exec -it 55f9cd9eb15e /bin/bash 在进行端侧部署前,需要准备量化环境,这里直接使用的是docker环境。 模型转换 创建目录 |-- data | |-- maksssksksss0.png | |-- maksssksksss1.png | |-- maksssksksss10.png | |-- maksssksksss11.png | |-- maksssksksss12.png | |-- maksssksksss13.png | |-- maksssksksss14.png | |-- maksssksksss15.png | |-- maksssksksss16.png | |-- maksssksksss17.png | |-- maksssksksss18.png | |-- maksssksksss19.png | |-- maksssksksss2.png | |-- maksssksksss3.png | |-- maksssksksss4.png | |-- maksssksksss5.png | |-- maksssksksss6.png | |-- maksssksksss7.png | |-- maksssksksss8.png | `-- maksssksksss9.png |-- dataset.txt `-- yolov5s-mask-rt.onnx 准备数据量化的数据data、数据配置dataset.txt(内容如下)、裁剪的模型yolov5s-mask-rt.onnx。 ./data/maksssksksss0.png ./data/maksssksksss1.png ./data/maksssksksss2.png ./data/maksssksksss3.png ./data/maksssksksss4.png ./data/maksssksksss5.png ./data/maksssksksss6.png ./data/maksssksksss7.png ./data/maksssksksss8.png ./data/maksssksksss9.png ./data/maksssksksss10.png ./data/maksssksksss11.png ./data/maksssksksss12.png ./data/maksssksksss13.png ./data/maksssksksss14.png ./data/maksssksksss15.png ./data/maksssksksss16.png ./data/maksssksksss17.png ./data/maksssksksss18.png ./data/maksssksksss19.png 模型导入 pegasus import onnx --model yolov5s-mask-rt.onnx --output-data yolov5s-mask-rt.data --output-model yolov5s-mask-rt.json 模型导入将输出yolov5s-mask-rt.json和yolov5s-mask-rt.data文件。前者为后期量化需要的网络结构文件,后者为模型网络权重文件。 前后处理配置文件yml 生成前处理配置文件 pegasus generate inputmeta --model yolov5s-mask-rt.json --input-meta-output yolov5s-mask-rt_inputmeta.yml 生成后处理配置文件,因为后处理是在cpu上处理并且我们已经裁剪掉了onn模型的后处理,可以不生成。 pegasus generate postprocess-file --model yolov5s-mask-rt.json --postprocess-file-output yolov5s-mask-rt_postprocess_file.yml 上面根据模型网络结构文件yolov5s-mask-rt.json转化生成后续量化需要的前处理和后处理描述文件,格式为yml格式。 input_meta: databases: - path: dataset.txt #表示模型量化需要的数据描述文件 type: TEXT ports: - lid: images_205 #表示输入节点名称。 category: image dtype: float32 sparse: false tensor_name: layout: nchw #输入数据的排列格式,n表示batch,c表示channel,h表示高,w表示宽 shape: #模型输入的形状 - 1 #输入数据的batch,如果后面量化的batch参数不为1,需要改这里。 - 3 - 640 - 640 fitting: scale preprocess: reverse_channel: true mean: - 0 - 0 - 0 scale: #3通道的缩放值,yolov5s需要改为0.00392157 - 1.0 - 1.0 - 1.0 preproc_node_params: add_preproc_node: false #是否添加预处理节点,用于格式转化和裁剪,这里要改为true preproc_type: IMAGE_RGB #预处理输入的格式 preproc_image_size: - 640 - 640 preproc_crop: enable_preproc_crop: false crop_rect: - 0 - 0 - 640 - 640 preproc_perm: - 0 - 1 - 2 - 3 redirect_to_output: false 对于模型的输入yml,需要将scale修改为0.00392157,同时默认使用cpu预处理图像,所以add_preproc_node设置为true。 量化 pegasus quantize --model yolov5s-mask-rt.json --model-data yolov5s-mask-rt.data --device CPU --iterations=12 --with-input-meta yolov5s-mask-rt_inputmeta.yml --rebuild --model-quantize yolov5s-mask-rt.quantize --quantizer asymmetric_affine --qtype uint8 下面是量化模型的参数解析: model: 模型的网络结构文件 model-data:模型需要的权重文件 with-input-meta:模型需要前处理配置文件 model-quantize: 输出的模型文件 iterations: 模型量化使用的数据量,设置1(默认)使用dataset第一条数据。若设置20,会先遍历dataset.txt中的20行数据。 qtype:量化数据类型,有int8,uint8,int16等。 batch_size: 量化多少轮,如果不为1,需要改输入的yml文件shape,先用默认。 量化后,会生成量化文件yolov5s-mask-rt.quantize。 推理 pegasus inference --model yolov5s-mask-rt.json --model-data yolov5s-mask-rt.data --dtype quantized --model-quantize yolov5s-mask-rt.quantize --device CPU --with-input-meta yolov5s-mask-rt_inputmeta.yml --postprocess-file yolov5s-mask-rt_postprocess_file.yml 模型推理是为了验证量化后的模型效果,这步可省略。 导出模型 pegasus export ovxlib --model yolov5s-mask-rt.json --model-data yolov5s-mask-rt.data --dtype quantized --model-quantize yolov5s-mask-rt.quantize --save-fused-graph --target-ide-project 'linux64' --with-input-meta yolov5s-mask-rt_inputmeta.yml --output-path ovxilb/yolov5s-mask-rt/yolov5s-simprj --pack-nbg-unify --postprocess-file yolov5s-mask-rt_postprocess_file.yml --optimize "VIP9000PICO_PID0XEE" --viv-sdk ${VIV_SDK} 模型导出,最终会生成ovxilb/yolov5s-mask-rt_nbg_unify/network_binary.nb 端侧部署 修改端侧后处理的分类名和数量配置文件。 改完之后,执行./build_linux.sh -t \编译生成端侧的应用,将可执行应用推到设备端。 ./yolov5 -b network_binary.nb -i mask-test.jpeg model_file=network_binary.nb, input=mask-test.jpeg, loop_count=1, malloc_mbyte=10 input 0 dim 3 640 640 1, data_format=2, quant_format=0, name=input[0], none-quant output 0 dim 80 80 8 3, data_format=0, name=uid_20000_sub_uid_1_out_0, none-quant output 1 dim 40 40 8 3, data_format=0, name=uid_20001_sub_uid_1_out_0, none-quant output 2 dim 20 20 8 3, data_format=0, name=uid_20002_sub_uid_1_out_0, none-quant nbg name=network_binary.nb, size: 7196096. create network 0: 35626 us. prepare network: 38972 us. buffer ptr: 0xb6996000, buffer size: 1228800 feed input cost: 94526 us. network: 0, loop count: 1 run time for this network 0: 119081 us. detection num: 1 1: 94%, [ 316, 228, 541, 460], 1 draw objects time : 154 ms destory npu finished. ~NpuUint. 参考: https://v853.docs.aw-ol.com/en/npu/dev_npu/ https://blog.csdn.net/weixin_42904656/article/details/127768309 -
云服务器搭建YOLOv5训练环境
介绍 本文使用AutoDL云服务搭建YOLOv5的运行环境。 获取云服务器 在这个链接上https://www.autodl.com/home订阅服务,这里选择的是按量计费。 镜像选择基础镜像Mniconda最新ubuntu环境。 交钱订阅完成后就可以获取到登录的信息了。 这里使用的是ssh工具根据获取到的登录名和密码进行登录,需要注意的是端口可能不是默认的22,按照实际的端口进行。 配置conda环境 由于autoDL的服务器可能并不能访问外网,这里先将conda的源更换为清华的源。 conda config --remove-key channels conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/pytorch/ conda config --set show_channel_urls yes pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple 接下来创建和激活虚拟环境 conda create -n yolov5 python==3.8.5 # 创建虚拟环境名称为yolov5, python版本为3.8.5 conda activate yolov5 # 激活yolov5环境 conda init # 如果提示没有初始化conda环境的话,执行conda init后退出控制台重新登录,再次激活。 conda env list # 通过上面的命令可以查看当前创建的conda虚拟环境 拉取YOLOv5代码环境 通过官网链接获取YOLOv5的代码。 cd autodl-tmp/ #这里先切换到audodl-tmp目录,这个空间比较大,读写也比较快。 git clone https://github.com/ultralytics/yolov5 # 在github上拉取代码 如果要在本地使用vscode查看代码的话,可以参考:https://www.autodl.com/docs/vscode/ 这里需要注意的是,audoDL可能没有访问GitHub,处理的办法就是参考这个方法在服务器上面做一个代理,https://github.com/VocabVictor/clash-for-AutoDL 或者https://gitee.com/laumy0929/clash-for-AutoDL 获取到CLASH_URL如下示例: 截屏2025-07-17 12.34.36 截屏2025-07-17 12.39.15 按照yolov5的代码环境需要的包 pip install -r requirements.txt 安装完成后,进行推理测试,看看环境是否安装正常,执行detect.py 指定模型文件和输入图片进行测试。 python detect.py --weights yolov5s.pt --source data/images/bus.jpg 执行上面脚本是,会自动拉取预训练的模型yolov5s.pt,最终将推理结果存储到runs/detect/exp2目录下。 训练自定义数据集 这里从GitHub上抓了一个口罩的书籍集,https://github.com/iAmEthanMai/mask-detection-dataset.git,使用git clone拉取到本地。 git clone https://github.com/iAmEthanMai/mask-detection-dataset.git 查看mask的数据集配置,有3个分类。这里使用yolov5s进行微调,复制一份yolov5的模型配置文件,并修改分类为3,需要根据实际的数据集存放的位置调整一下路径。 cp models/yolov5s.yaml models/mask_yolov5s.yaml 配置完成后就可以进行训练了。 python train.py --data mask-detection-dataset/data/data.yaml --cfg models/mask_yolov5s.yaml --weights pretrained/yolov5s.pt --epoch 100 --batch-size 4 模型文件将输出到:runs/train/exp5/weights/ python detect.py --weights runs/train/exp5/weights/best.pt --source mask-detection-dataset/data/images/maksssksksss1.png 上面执行命令使用训练的模型文件测试看看效果。 -
Windows Ai开发环境安装
annaconda可以理解为ai环境可以创建很多个房间,比如允许多个不同版本的python。每个房间可以保存不同的环境变量。 步骤1:下载安装包,安装anaconda,https://www.anaconda.com/ 步骤2:设置环境变量 设置环境变量需要根据软件实际的安装位置,这里的软件是安装的D盘的。在cmd命令中,执行conda info表示设置环境变量成功。 步骤3: 创建环境 打开Anaconda Prompt终端界面,创建开发环境前,先更新清华的源。 conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/pytorch/ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/pytorch/linux-64/ conda config --set show_channel_urls yes 然后进行安装: conda create -n py39_test python=3.9 -y 其中-n指定环境的名称, python=3.9表示安装python3.9的版本,-y表示同意所有安装过程中的所有确认。 步骤4: 激活环境 conda activate py39_test 步骤4:安装基础环境 pip install -r requirements.txt 使用pip install 进行安装,requirements.txt内容如下。 contourpy==1.3.0 cycler==0.12.1 filelock==3.16.1 fonttools==4.55.3 fsspec==2024.12.0 importlib_resources==6.5.2 Jinja2==3.1.5 kiwisolver==1.4.7 MarkupSafe==3.0.2 matplotlib==3.9.4 mpmath==1.3.0 networkx==3.2.1 numpy==2.0.2 packaging==24.2 pandas==2.2.3 pillow==11.1.0 pyparsing==3.2.1 python-dateutil==2.9.0.post0 pytz==2024.2 six==1.17.0 sympy==1.13.1 torch==2.5.1 torchaudio==2.5.1 torchvision==0.20.1 typing_extensions==4.12.2 tzdata==2024.2 zipp==3.21.0 使用pip list可以查看安装的包。 步骤5:安装pycharm,下载链接 https://www.jetbrains.com/pycharm/download/?section=windows -
小智Ai语音交互简要分析
app start 主要是初始化板级、显示、WiFi连接、音频codec、编解码、协议、音效、唤醒几个环节。 auto& board = Board::GetInstance(); //获取板级实例 SetDeviceState(kDeviceStateStarting);//设置出事状态为kDeviceStateStarting /* Setup the display */ auto display = board.GetDisplay(); //获取显示实例 /* Setup the audio codec */ auto codec = board.GetAudioCodec();//获取codec实例 opus_decode_sample_rate_ = codec->output_sample_rate();//获取当前codec的采样率 opus_decoder_ = std::make_unique<OpusDecoderWrapper>(opus_decode_sample_rate_, 1);//初始化opus解码,设置解码采样率 opus_encoder_ = std::make_unique<OpusEncoderWrapper>(16000, 1, OPUS_FRAME_DURATION_MS);//初始化opus编码,设置采样率16Khz // For ML307 boards, we use complexity 5 to save bandwidth // For other boards, we use complexity 3 to save CPU //根据板级来设置opus编码的复杂度 if (board.GetBoardType() == "ml307") { ESP_LOGI(TAG, "ML307 board detected, setting opus encoder complexity to 5"); opus_encoder_->SetComplexity(5); } else { ESP_LOGI(TAG, "WiFi board detected, setting opus encoder complexity to 3"); opus_encoder_->SetComplexity(3); } //如果codec的采样率不是16Khz,需要进行重采样,下面是重采样初始化。 if (codec->input_sample_rate() != 16000) { input_resampler_.Configure(codec->input_sample_rate(), 16000); reference_resampler_.Configure(codec->input_sample_rate(), 16000); } //注册codec输入音频的回调,表示有录音的pcm,触发mainloop处理。 codec->OnInputReady([this, codec]() { BaseType_t higher_priority_task_woken = pdFALSE; xEventGroupSetBitsFromISR(event_group_, AUDIO_INPUT_READY_EVENT, &higher_priority_task_woken); return higher_priority_task_woken == pdTRUE; }); //注册codec输出音频的回调,表示有录音的pcm,触发mainloop处理。 codec->OnOutputReady([this]() { BaseType_t higher_priority_task_woken = pdFALSE; xEventGroupSetBitsFromISR(event_group_, AUDIO_OUTPUT_READY_EVENT, &higher_priority_task_woken); return higher_priority_task_woken == pdTRUE; }); //启动硬件codec,使能录音和播放。 codec->Start(); //开启一个mainloop线程,处理主要逻辑 /* Start the main loop */ xTaskCreate([](void* arg) { Application* app = (Application*)arg; app->MainLoop(); vTaskDelete(NULL); }, "main_loop", 4096 * 2, this, 4, nullptr); //等待WiFi连接好 /* Wait for the network to be ready */ board.StartNetwork(); // Initialize the protocol display->SetStatus(Lang::Strings::LOADING_PROTOCOL);//显示正在加载协议 根据使用MQTT还是Websocet来选择通信协议 #ifdef CONFIG_CONNECTION_TYPE_WEBSOCKET protocol_ = std::make_unique<WebsocketProtocol>(); #else protocol_ = std::make_unique<MqttProtocol>(); #endif //注册网络接收异常回调函数 protocol_->OnNetworkError([this](const std::string& message) { SetDeviceState(kDeviceStateIdle); Alert(Lang::Strings::ERROR, message.c_str(), "sad", Lang::Sounds::P3_EXCLAMATION); }); //注册接收音频的回调函数,接收到音频后,往加入解码队列 protocol_->OnIncomingAudio([this](std::vector<uint8_t>&& data) { std::lock_guard<std::mutex> lock(mutex_); if (device_state_ == kDeviceStateSpeaking) { audio_decode_queue_.emplace_back(std::move(data)); } }); //注册接收协议打开音频的回调,主要是下发解码的的属性信息,包括采样率等。 protocol_->OnAudioChannelOpened([this, codec, &board]() { board.SetPowerSaveMode(false); if (protocol_->server_sample_rate() != codec->output_sample_rate()) { ESP_LOGW(TAG, "Server sample rate %d does not match device output sample rate %d, resampling may cause distortion", protocol_->server_sample_rate(), codec->output_sample_rate()); } SetDecodeSampleRate(protocol_->server_sample_rate()); auto& thing_manager = iot::ThingManager::GetInstance(); protocol_->SendIotDescriptors(thing_manager.GetDescriptorsJson()); std::string states; if (thing_manager.GetStatesJson(states, false)) { protocol_->SendIotStates(states); } }); //注册音频的关闭回调 protocol_->OnAudioChannelClosed([this, &board]() { board.SetPowerSaveMode(true); Schedule([this]() { auto display = Board::GetInstance().GetDisplay(); display->SetChatMessage("system", ""); SetDeviceState(kDeviceStateIdle); }); }); //注册json解析回调,通知文本,状态等信息 protocol_->OnIncomingJson([this, display](const cJSON* root) { // Parse JSON data auto type = cJSON_GetObjectItem(root, "type"); //文字转语音的状态,包括start,stop,sentence_start/stop(句子开始结束), if (strcmp(type->valuestring, "tts") == 0) { auto state = cJSON_GetObjectItem(root, "state"); if (strcmp(state->valuestring, "start") == 0) { Schedule([this]() { aborted_ = false; if (device_state_ == kDeviceStateIdle || device_state_ == kDeviceStateListening) { SetDeviceState(kDeviceStateSpeaking); } }); } else if (strcmp(state->valuestring, "stop") == 0) { Schedule([this]() { if (device_state_ == kDeviceStateSpeaking) { background_task_->WaitForCompletion(); if (keep_listening_) { protocol_->SendStartListening(kListeningModeAutoStop); SetDeviceState(kDeviceStateListening); } else { SetDeviceState(kDeviceStateIdle); } } }); //句子开始 } else if (strcmp(state->valuestring, "sentence_start") == 0) { auto text = cJSON_GetObjectItem(root, "text"); if (text != NULL) { ESP_LOGI(TAG, "<< %s", text->valuestring); Schedule([this, display, message = std::string(text->valuestring)]() { display->SetChatMessage("assistant", message.c_str()); }); } } =//stt:语音转文字信息 } else if (strcmp(type->valuestring, "stt") == 0) { auto text = cJSON_GetObjectItem(root, "text"); if (text != NULL) { ESP_LOGI(TAG, ">> %s", text->valuestring); Schedule([this, display, message = std::string(text->valuestring)]() { display->SetChatMessage("user", message.c_str()); }); } } else if (strcmp(type->valuestring, "llm") == 0) { auto emotion = cJSON_GetObjectItem(root, "emotion"); if (emotion != NULL) { Schedule([this, display, emotion_str = std::string(emotion->valuestring)]() { display->SetEmotion(emotion_str.c_str()); }); } } else if (strcmp(type->valuestring, "iot") == 0) { auto commands = cJSON_GetObjectItem(root, "commands"); if (commands != NULL) { auto& thing_manager = iot::ThingManager::GetInstance(); for (int i = 0; i < cJSON_GetArraySize(commands); ++i) { auto command = cJSON_GetArrayItem(commands, i); thing_manager.Invoke(command); } } } }); //启动协议 protocol_->Start(); //检测OTA的版本,如果版本比较低则进行升级 // Check for new firmware version or get the MQTT broker address ota_.SetCheckVersionUrl(CONFIG_OTA_VERSION_URL); ota_.SetHeader("Device-Id", SystemInfo::GetMacAddress().c_str()); ota_.SetHeader("Client-Id", board.GetUuid()); ota_.SetHeader("Accept-Language", Lang::CODE); auto app_desc = esp_app_get_description(); ota_.SetHeader("User-Agent", std::string(BOARD_NAME "/") + app_desc->version); xTaskCreate([](void* arg) { Application* app = (Application*)arg; app->CheckNewVersion(); vTaskDelete(NULL); }, "check_new_version", 4096 * 2, this, 2, nullptr); #if CONFIG_USE_AUDIO_PROCESSOR //初始化音频处理,主要是降噪,回声消除,VAD检测等。 audio_processor_.Initialize(codec->input_channels(), codec->input_reference()); audio_processor_.OnOutput([this](std::vector<int16_t>&& data) { background_task_->Schedule([this, data = std::move(data)]() mutable { opus_encoder_->Encode(std::move(data), [this](std::vector<uint8_t>&& opus) { //如果启动了音效处理,注册ouput的输出回调。 Schedule([this, opus = std::move(opus)]() { protocol_->SendAudio(opus); }); }); }); }); //注册VAD状态变化 audio_processor_.OnVadStateChange([this](bool speaking) { if (device_state_ == kDeviceStateListening) { Schedule([this, speaking]() { if (speaking) { voice_detected_ = true; } else { voice_detected_ = false; } auto led = Board::GetInstance().GetLed(); led->OnStateChanged();//只点个灯?? }); } }); #endif #if CONFIG_USE_WAKE_WORD_DETECT //启动唤醒检测,初始化唤醒 wake_word_detect_.Initialize(codec->input_channels(), codec->input_reference()); //唤醒词处理回调函数,其中获取到的唤醒词是字符串,还包括获取处理唤醒词的音频编解码 //唤醒词音频部分是否仅仅是唤醒词部分,还包含其他内容数据?需要确认 wake_word_detect_.OnWakeWordDetected([this](const std::string& wake_word) { Schedule([this, &wake_word]() { //如果是idle状态,主要逻辑是,处理业务为连接网络,编码唤醒词,重开唤醒检测 //推送唤醒的音频数据和预料字符串到云端服务器。 if (device_state_ == kDeviceStateIdle) { SetDeviceState(kDeviceStateConnecting); //将唤醒音频内容进行编码 wake_word_detect_.EncodeWakeWordData(); if (!protocol_->OpenAudioChannel()) { //重新再次打开唤醒检测, wake_word_detect_.StartDetection(); return; } //哪些情况会停止唤醒检测:1 检测到唤醒词后会停止。2.处于listening的时候会停止。3.OTA升级过程会停止 std::vector<uint8_t> opus; //编码并将唤醒数据推送到服务器(除了唤醒词可能还包括说话数据?) // Encode and send the wake word data to the server while (wake_word_detect_.GetWakeWordOpus(opus)) { protocol_->SendAudio(opus); } //发送唤醒词的字符串 // Set the chat state to wake word detected protocol_->SendWakeWordDetected(wake_word); ESP_LOGI(TAG, "Wake word detected: %s", wake_word.c_str()); keep_listening_ = true; SetDeviceState(kDeviceStateIdle); } else if (device_state_ == kDeviceStateSpeaking) { //如果说话状态,则将说话进行停止,设置一个停止标志位,并发送停止speak给服务不要再发opus了? AbortSpeaking(kAbortReasonWakeWordDetected); } else if (device_state_ == kDeviceStateActivating) { SetDeviceState(kDeviceStateIdle); } }); }); //启动唤醒检测 wake_word_detect_.StartDetection(); #endif //设置状态为IDLE状态 SetDeviceState(kDeviceStateIdle); esp_timer_start_periodic(clock_timer_handle_, 1000000); mainloop void Application::MainLoop() { while (true) { auto bits = xEventGroupWaitBits(event_group_, SCHEDULE_EVENT | AUDIO_INPUT_READY_EVENT | AUDIO_OUTPUT_READY_EVENT, pdTRUE, pdFALSE, portMAX_DELAY); //处理录音音频处理,将收到的音频做处理送到队列 if (bits & AUDIO_INPUT_READY_EVENT) { InputAudio(); } //处理云端音频处理,将编码的音频进行解码送播放器 if (bits & AUDIO_OUTPUT_READY_EVENT) { OutputAudio(); } //处理其他任务的队列 if (bits & SCHEDULE_EVENT) { std::unique_lock<std::mutex> lock(mutex_); std::list<std::function<void()>> tasks = std::move(main_tasks_); lock.unlock(); for (auto& task : tasks) { task(); } } } } 录音通路 录音处理 // I2S收到音频,触发app应用注册的回调函数通知函数codec->OnInputReady,如下 //通知有数据了,实际读数据通过Read去读。 IRAM_ATTR bool AudioCodec::on_recv(i2s_chan_handle_t handle, i2s_event_data_t *event, void *user_ctx) { auto audio_codec = (AudioCodec*)user_ctx; if (audio_codec->input_enabled_ && audio_codec->on_input_ready_) { return audio_codec->on_input_ready_(); } return false; } //通过eventsetbit触发通知mainloop线程处理音频 codec->OnInputReady([this, codec]() { BaseType_t higher_priority_task_woken = pdFALSE; xEventGroupSetBitsFromISR(event_group_, AUDIO_INPUT_READY_EVENT, &higher_priority_task_woken); return higher_priority_task_woken == pdTRUE; }); //在mainloop中触发Application::InputAudio() void Application::InputAudio() { //获取codec的实例 auto codec = Board::GetInstance().GetAudioCodec(); std::vector<int16_t> data; //获取codec的音频pcm数据存到data中。 if (!codec->InputData(data)) { return;//如果数据为空,直接返回 } //如果采样率不是16Khz,需要进行重采样 if (codec->input_sample_rate() != 16000) { if (codec->input_channels() == 2) { auto mic_channel = std::vector<int16_t>(data.size() / 2); auto reference_channel = std::vector<int16_t>(data.size() / 2); for (size_t i = 0, j = 0; i < mic_channel.size(); ++i, j += 2) { mic_channel[i] = data[j]; reference_channel[i] = data[j + 1]; } auto resampled_mic = std::vector<int16_t>(input_resampler_.GetOutputSamples(mic_channel.size())); auto resampled_reference = std::vector<int16_t>(reference_resampler_.GetOutputSamples(reference_channel.size())); input_resampler_.Process(mic_channel.data(), mic_channel.size(), resampled_mic.data()); reference_resampler_.Process(reference_channel.data(), reference_channel.size(), resampled_reference.data()); data.resize(resampled_mic.size() + resampled_reference.size()); for (size_t i = 0, j = 0; i < resampled_mic.size(); ++i, j += 2) { data[j] = resampled_mic[i]; data[j + 1] = resampled_reference[i]; } } else { auto resampled = std::vector<int16_t>(input_resampler_.GetOutputSamples(data.size())); input_resampler_.Process(data.data(), data.size(), resampled.data()); data = std::move(resampled); } } //如果启动了唤醒检测,判断唤醒检测是否还在运行,如果还在运行将当前的数据合并到唤醒 //检测的buffer中。 #if CONFIG_USE_WAKE_WORD_DETECT if (wake_word_detect_.IsDetectionRunning()) { wake_word_detect_.Feed(data); //会将当前的数据喂给AFE接口,用于做唤醒词 //唤醒词也直接送到云端了??? } #endif //如果打开了音效处理,将音频数据push到音效处理中,直接返回 #if CONFIG_USE_AUDIO_PROCESSOR if (audio_processor_.IsRunning()) { audio_processor_.Input(data); } #else //如果没有打开音效处理,判断当前的状态是否是监听状态,如果是将音频进行编码 //然后推送到远端服务中。 if (device_state_ == kDeviceStateListening) { background_task_->Schedule([this, data = std::move(data)]() mutable { opus_encoder_->Encode(std::move(data), [this](std::vector<uint8_t>&& opus) { Schedule([this, opus = std::move(opus)]() { protocol_->SendAudio(opus); }); }); }); } #endif } 音效处理 以下是音效处理过程 //将数据喂给AFE模块,当处理完了之后会触发回调? void AudioProcessor::Input(const std::vector<int16_t>& data) { input_buffer_.insert(input_buffer_.end(), data.begin(), data.end()); auto feed_size = afe_iface_->get_feed_chunksize(afe_data_) * channels_; while (input_buffer_.size() >= feed_size) { auto chunk = input_buffer_.data(); afe_iface_->feed(afe_data_, chunk); input_buffer_.erase(input_buffer_.begin(), input_buffer_.begin() + feed_size); } } void AudioProcessor::AudioProcessorTask() { auto fetch_size = afe_iface_->get_fetch_chunksize(afe_data_); auto feed_size = afe_iface_->get_feed_chunksize(afe_data_); ESP_LOGI(TAG, "Audio communication task started, feed size: %d fetch size: %d", feed_size, fetch_size); while (true) { //获取到PROCESSOR_RUNNING后,不会清除bit(第三个参数),也就说会再次得到运行。 //也就是说AudioProcessor::Start()后,这个会循环运行,直到调用Stop清除。 xEventGroupWaitBits(event_group_, PROCESSOR_RUNNING, pdFALSE, pdTRUE, portMAX_DELAY); //等待获取处理后的数据。 auto res = afe_iface_->fetch_with_delay(afe_data_, portMAX_DELAY); if ((xEventGroupGetBits(event_group_) & PROCESSOR_RUNNING) == 0) { continue; } if (res == nullptr || res->ret_value == ESP_FAIL) { if (res != nullptr) { ESP_LOGI(TAG, "Error code: %d", res->ret_value); } continue; } // VAD state change if (vad_state_change_callback_) { if (res->vad_state == VAD_SPEECH && !is_speaking_) { is_speaking_ = true; vad_state_change_callback_(true); } else if (res->vad_state == VAD_SILENCE && is_speaking_) { is_speaking_ = false; vad_state_change_callback_(false); } } //获取到数据,将数据回调给app->audio_processor_.OnOutput if (output_callback_) { output_callback_(std::vector<int16_t>(res->data, res->data + res->data_size / sizeof(int16_t))); } } } //处理的音效数据的回调,将数据进行编码,然后推送到云端服务器。 audio_processor_.OnOutput([this](std::vector<int16_t>&& data) { background_task_->Schedule([this, data = std::move(data)]() mutable { opus_encoder_->Encode(std::move(data), [this](std::vector<uint8_t>&& opus) { Schedule([this, opus = std::move(opus)]() { protocol_->SendAudio(opus); }); }); }); }); 播放通路 //1. 通过解析输入的json来启动状态的切换。 protocol_->OnIncomingJson([this, display](const cJSON* root) { // Parse JSON data auto type = cJSON_GetObjectItem(root, "type"); if (strcmp(type->valuestring, "tts") == 0) { auto state = cJSON_GetObjectItem(root, "state"); //收到云端音频,云端会发送start,需要切换到speaking状态。 if (strcmp(state->valuestring, "start") == 0) { Schedule([this]() { aborted_ = false; if (device_state_ == kDeviceStateIdle || device_state_ == kDeviceStateListening) { SetDeviceState(kDeviceStateSpeaking); } }); //本次话题结束后,云端会发送stop,可切换到idle。 } else if (strcmp(state->valuestring, "stop") == 0) { Schedule([this]() { if (device_state_ == kDeviceStateSpeaking) { background_task_->WaitForCompletion(); if (keep_listening_) { protocol_->SendStartListening(kListeningModeAutoStop); SetDeviceState(kDeviceStateListening); } else { SetDeviceState(kDeviceStateIdle); } } }); } else if (strcmp(state->valuestring, "sentence_start") == 0) { auto text = cJSON_GetObjectItem(root, "text"); if (text != NULL) { ESP_LOGI(TAG, "<< %s", text->valuestring); Schedule([this, display, message = std::string(text->valuestring)]() { display->SetChatMessage("assistant", message.c_str()); }); } } //2.解析到云端的json后,会发生状态的迁移 void Application::SetDeviceState(DeviceState state) { if (device_state_ == state) { return; } clock_ticks_ = 0; auto previous_state = device_state_; device_state_ = state; ESP_LOGI(TAG, "STATE: %s", STATE_STRINGS[device_state_]); // The state is changed, wait for all background tasks to finish background_task_->WaitForCompletion(); //如果后台有线程还在运行,等待运行结束 auto& board = Board::GetInstance(); auto codec = board.GetAudioCodec(); auto display = board.GetDisplay(); auto led = board.GetLed(); led->OnStateChanged(); switch (state) { case kDeviceStateUnknown: case kDeviceStateIdle: //idle状态,显示"待命" display->SetStatus(Lang::Strings::STANDBY); display->SetEmotion("neutral"); #if CONFIG_USE_AUDIO_PROCESSOR //关掉音效处理 audio_processor_.Stop(); #endif #if CONFIG_USE_WAKE_WORD_DETECT //开启语音唤醒检测 wake_word_detect_.StartDetection(); #endif break; case kDeviceStateConnecting: //连接状态,表示连接服务器 display->SetStatus(Lang::Strings::CONNECTING); display->SetEmotion("neutral"); display->SetChatMessage("system", ""); break; case kDeviceStateListening: //说话状态,显示说话中 display->SetStatus(Lang::Strings::LISTENING); display->SetEmotion("neutral"); //复位解码器,清除掉原来的 ResetDecoder(); //复位编码器的状态 opus_encoder_->ResetState(); #if CONFIG_USE_AUDIO_PROCESSOR //启动音效处理(回声消除?) audio_processor_.Start(); #endif #if CONFIG_USE_WAKE_WORD_DETECT //关闭唤醒检测 wake_word_detect_.StopDetection(); #endif //更新IOT状态 UpdateIotStates(); if (previous_state == kDeviceStateSpeaking) { // FIXME: Wait for the speaker to empty the buffer vTaskDelay(pdMS_TO_TICKS(120)); } break; case kDeviceStateSpeaking: display->SetStatus(Lang::Strings::SPEAKING); //复位解码器 ResetDecoder(); //使能codec输出 codec->EnableOutput(true); #if CONFIG_USE_AUDIO_PROCESSOR //音效处理停止 audio_processor_.Stop(); #endif #if CONFIG_USE_WAKE_WORD_DETECT //开启唤醒检测 wake_word_detect_.StartDetection(); #endif break; default: // Do nothing break; } } //3. 接收云端音频数据的回调,如果是speak状态,将数据入队到队列 protocol_->OnIncomingAudio([this](std::vector<uint8_t>&& data) { std::lock_guard<std::mutex> lock(mutex_); if (device_state_ == kDeviceStateSpeaking) { audio_decode_queue_.emplace_back(std::move(data)); } }); //4.当音频输出准备好后,不会不断的调用这个回调??触发mainloop调用OutputAudio codec->OnOutputReady([this]() { BaseType_t higher_priority_task_woken = pdFALSE; xEventGroupSetBitsFromISR(event_group_, AUDIO_OUTPUT_READY_EVENT, &higher_priority_task_woken); return higher_priority_task_woken == pdTRUE; }); //5. output处理 void Application::OutputAudio() { auto now = std::chrono::steady_clock::now(); auto codec = Board::GetInstance().GetAudioCodec(); const int max_silence_seconds = 10; std::unique_lock<std::mutex> lock(mutex_); //判断解码队列是否为空,如果为空,把codec输出关了,也就是不要再触发回调 if (audio_decode_queue_.empty()) { // Disable the output if there is no audio data for a long time if (device_state_ == kDeviceStateIdle) { auto duration = std::chrono::duration_cast<std::chrono::seconds>(now - last_output_time_).count(); if (duration > max_silence_seconds) { codec->EnableOutput(false); } } return; } //如果是在监听状态,清除掉解码队列,直接返回 if (device_state_ == kDeviceStateListening) { audio_decode_queue_.clear(); return; } //获取编码的数据 last_output_time_ = now; auto opus = std::move(audio_decode_queue_.front()); audio_decode_queue_.pop_front(); lock.unlock(); //将解码数据添加到调度中进行解码播放 background_task_->Schedule([this, codec, opus = std::move(opus)]() mutable { //如果禁止标志位置起,直接退出。在打断唤醒的时候回置起 if (aborted_) { return; } std::vector<int16_t> pcm; //解码为pcm if (!opus_decoder_->Decode(std::move(opus), pcm)) { return; } //如果云端的采样率和codec采样率不一样,进行重采样。 // Resample if the sample rate is different if (opus_decode_sample_rate_ != codec->output_sample_rate()) { int target_size = output_resampler_.GetOutputSamples(pcm.size()); std::vector<int16_t> resampled(target_size); output_resampler_.Process(pcm.data(), pcm.size(), resampled.data()); pcm = std::move(resampled); } //播放音频 codec->OutputData(pcm); }); } -
2条命令本地部署deepseek
环境是centos,下面是部署步骤。 命令1: 安装ollama 安装命令:curl -fsSL https://ollama.com/install.sh | sh 安装日志: >>> Cleaning up old version at /usr/local/lib/ollama >>> Installing ollama to /usr/local >>> Downloading Linux amd64 bundle ######################################################################## 100.0% >>> Creating ollama user... >>> Adding ollama user to video group... >>> Adding current user to ollama group... >>> Creating ollama systemd service... >>> Enabling and starting ollama service... Created symlink from /etc/systemd/system/default.target.wants/ollama.service to /etc/systemd/system/ollama.service. >>> The Ollama API is now available at 127.0.0.1:11434. >>> Install complete. Run "ollama" from the command line. 命令2:下载deepseek模型 安装命令:ollama run deepseek-r1:7b 安装完成后,会直接进入交互控制台: pulling manifest pulling 96c415656d37... 100% ▕█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▏ 4.7 GB pulling 369ca498f347... 100% ▕█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▏ 387 B pulling 6e4c38e1172f... 100% ▕█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▏ 1.1 KB pulling f4d24e9138dd... 100% ▕█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▏ 148 B pulling 40fb844194b2... 100% ▕█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▏ 487 B verifying sha256 digest writing manifest success >>> hello <think> </think> Hello! How can I assist you today? 😊 >>> 你好 <think> </think> 你好!有什么我可以帮助你的吗?😊 >>> 你是什么模型 <think> </think> 您好!我是由中国的深度求索(DeepSeek)公司开发的智能助手DeepSeek-R1。如您有任何任何问题,我会尽我所能为您提供帮助。 运行时,如果加上--verbose可以查看运行性能参数,如下: total duration: 379.511567ms load duration: 14.749448ms prompt eval count: 60 token(s) prompt eval duration: 15.863495ms prompt eval rate: 3782.27 tokens/s eval count: 64 token(s) eval duration: 322.980292ms eval rate: 198.15 tokens/s total duration:总耗时379.51ms,表示从请求开始到响应完成的整体处理时间 load duration: 模型加载耗时14.75ms,可能涉及模型初始化或数据加载阶段的时间消耗 prompt eval count:输入提示词(prompt)解析的token数量为60个 prompt eval duration:提示词解析耗时15.86ms,反映模型对输入文本的预处理效率 prompt eval rate: 提示词解析速率3782.27 tokens/s,属于高性能表现(通常千级tokens/s为优秀) eval count: 生成输出的token数量为64个 eval duration: 生成耗时322.98ms,占整体耗时的主要部分。 eval rate: 生成速率198.15 tokens/s,属于典型的大模型推理速度(百级tokens/s为常见范围) GGUF导入部署 这种方式可以通过导入GUFF格式的大模型,GUFF格式大模型可以从Hugging Face获取https://huggingface.co/。也可以在modelscope上获取https://modelscope.cn/models。 首先从Hugging Face或者modelscope下载GGUF格式的模型,然后部署主要分为两个步骤 创建模型 通过create指定模型modelfile。 ollama create qwen2.5:7b -f qwen2.5-7b.modelfile modelfile内容如下,指定了模型的路径,模型配置文件描述了模型的参数,更多信息这里不做阐述。 FROM "./qwen2.5-7b-instruct-q4_0.gguf" 运行模型 列出模型 ollama list 运行模型 verbose参数可以打印性能。 ollama run qwen2.5:7b --verbose 也可以使用ollama pull从ollama官方下载,https://ollama.com/search 支持API访问 修改ollama的本地端口 /etc/systemd/system/ollama.service [Unit] Description=Ollama Service After=network-online.target [Service] ExecStart=/usr/local/bin/ollama serve User=ollama Group=ollama Restart=always RestartSec=3 Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin" Environment="OLLAMA_HOST=0.0.0.0" [Install] WantedBy=default.target 然后重新启动 systemctl daemon-reload systemctl restart ollama 确认是否启动成功: sudo netstat -tulpn | grep 11434 # 确认监听0.0.0.0:11434:cite[3]:cite[6] 远程API调用示例 # 查询API版本(验证连通性) curl http://<服务器公网IP>:11434/api/version # 发送生成请求 curl http://localhost:11434/api/generate -d "{\\\\\\\\\\\\\\\\"model\\\\\\\\\\\\\\\\": \\\\\\\\\\\\\\\\"deepseek-r1:7b\\\\\\\\\\\\\\\\", \\\\\\\\\\\\\\\\"prompt\\\\\\\\\\\\\\\\": \\\\\\\\\\\\\\\\"为什么草是绿的\\\\\\\\\\\\\\\\"}" 参考:https://github.com/datawhalechina/handy-ollama/blob/main/docs/C4/1.%20Ollama%20API%20%E4%BD%BF%E7%94%A8%E6%8C%87%E5%8D%97.md 支持web聊天 安装docker 如果要按照网页版的聊天需要安装open ui,先安装docker。 (1)更新系统 sudo yum update -y (2)Docker 需要一些依赖包,你可以通过运行以下命令来安装: sudo yum install -y yum-utils device-mapper-persistent-data lvm2 (3)更新本地镜像源 sudo yum-config-manager --add-repo https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo sed -i 's/download.docker.com/mirrors.aliyun.com\\\\\\\\\\\\\\\\/docker-ce/g' /etc/yum.repos.d/docker-ce.repo yum makecache fast (4)安装docker sudo yum install -y docker-ce (5)设置开机自启动 sudo systemctl start docker sudo systemctl enable docker (6)验证 sudo docker --version systemctl status docker docker安装open webui 拉取并运行 Open WebUI 容器,将容器端口 8080 映射到主机 3000 端口 docker run -d -p 3000:8080 --add-host=host.docker.internal:host-gateway -v open-webui:/app/backend/data --name open-webui --restart always ghcr.io/open-webui/open-webui:main 如果3000端口被占用了,会报错,重新启动也会提示错误如下。 报错解决: docker run -d -p 6664:8080 --add-host=host.docker.internal:host-gateway -v open-webui:/app/backend/data --name open-webui --restart always ghcr.io/open-webui/open-webui:main docker: Error response from daemon: Conflict. The container name "/open-webui" is already in use by container "88f6e12e8e3814038911c30d788cb222d0792a9fc0af45f41140e07186e62a16". You have to remove (or rename) that container to be able to reuse that name. 你遇到的问题是 Docker 容器名称冲突。错误消息表明,容器名称 /open-webui 已经被另一个正在运行的容器占用,因此你无法启动新的容器。 (1)查看当前运行的容器: docker ps -a 88f6e12e8e38 ghcr.io/open-webui/open-webui:main "bash start.sh" 3 minutes ago Created open-webui (2)停止并删除已有的容器 docker stop open-webui docker rm open-webui 登录网址https://xxx:6664 配置即可访问。 -
豆包大模型接入体验
前置条件 需要先创建获得API key和创建推理接入点。 API key获取 https://www.volcengine.com/docs/82379/1361424#f79da451 创建推理接入点 https://www.volcengine.com/docs/82379/1099522 安装python环境 python版本需要安装到Python 2.7或以上版本。执行python --version可以检查当前Python的版本信息。我这里的版本已经到3.8.10 python3 --version Python 3.8.10 接着安装豆包sdk pip install volcengine-python-sdk Collecting volcengine-python-sdk Downloading volcengine-python-sdk-1.0.118.tar.gz (3.1 MB) |████████████████████████████████| 3.1 MB 9.7 kB/s Requirement already satisfied: certifi>=2017.4.17 in /usr/lib/python3/dist-packages (from volcengine-python-sdk) (2019.11.28) Requirement already satisfied: python-dateutil>=2.1 in /usr/lib/python3/dist-packages (from volcengine-python-sdk) (2.7.3) Requirement already satisfied: six>=1.10 in /usr/lib/python3/dist-packages (from volcengine-python-sdk) (1.14.0) Requirement already satisfied: urllib3>=1.23 in /usr/lib/python3/dist-packages (from volcengine-python-sdk) (1.25.8) Building wheels for collected packages: volcengine-python-sdk Building wheel for volcengine-python-sdk (setup.py) ... done Created wheel for volcengine-python-sdk: filename=volcengine_python_sdk-1.0.118-py3-none-any.whl size=10397043 sha256=c4546246eb0ef4e1c68e8047c6f2773d601821bd1acb7bc3a6162919f161423b Stored in directory: /home/apple/.cache/pip/wheels/d2/dc/23/70fa1060e1a527a290fc87a35469401b7588cdb51a2b75797d Successfully built volcengine-python-sdk Installing collected packages: volcengine-python-sdk Successfully installed volcengine-python-sdk-1.0.118 需要更新 pip install --upgrade 'volcengine-python-sdk[ark]' Requirement already up-to-date: volcengine-python-sdk[ark] in /home/apple/.local/lib/python3.8/site-packages (1.0.118) Requirement already satisfied, skipping upgrade: urllib3>=1.23 in /usr/lib/python3/dist-packages (from volcengine-python-sdk[ark]) (1.25.8) Requirement already satisfied, skipping upgrade: six>=1.10 in /usr/lib/python3/dist-packages (from volcengine-python-sdk[ark]) (1.14.0) Requirement already satisfied, skipping upgrade: python-dateutil>=2.1 in /usr/lib/python3/dist-packages (from volcengine-python-sdk[ark]) (2.7.3) Requirement already satisfied, skipping upgrade: certifi>=2017.4.17 in /usr/lib/python3/dist-packages (from volcengine-python-sdk[ark]) (2019.11.28) Collecting cryptography<43.0.4,>=43.0.3; extra == "ark" Downloading cryptography-43.0.3-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.0 MB) |████████████████████████████████| 4.0 MB 1.7 MB/s Collecting httpx<1,>=0.23.0; extra == "ark" Downloading httpx-0.28.1-py3-none-any.whl (73 kB) |████████████████████████████████| 73 kB 1.0 MB/s Collecting pydantic<3,>=1.9.0; extra == "ark" Downloading pydantic-2.10.4-py3-none-any.whl (431 kB) |████████████████████████████████| 431 kB 1.6 MB/s Collecting anyio<5,>=3.5.0; extra == "ark" Downloading anyio-4.5.2-py3-none-any.whl (89 kB) |████████████████████████████████| 89 kB 1.8 MB/s Collecting cffi>=1.12; platform_python_implementation != "PyPy" Downloading cffi-1.17.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (446 kB) |████████████████████████████████| 446 kB 1.2 MB/s Collecting httpcore==1.* Downloading httpcore-1.0.7-py3-none-any.whl (78 kB) |████████████████████████████████| 78 kB 1.8 MB/s Requirement already satisfied, skipping upgrade: idna in /usr/lib/python3/dist-packages (from httpx<1,>=0.23.0; extra == "ark"->volcengine-python-sdk[ark]) (2.8) Collecting pydantic-core==2.27.2 Downloading pydantic_core-2.27.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.0 MB) |████████████████████████████████| 2.0 MB 1.0 MB/s Collecting typing-extensions>=4.12.2 Downloading typing_extensions-4.12.2-py3-none-any.whl (37 kB) Collecting annotated-types>=0.6.0 Downloading annotated_types-0.7.0-py3-none-any.whl (13 kB) Collecting exceptiongroup>=1.0.2; python_version < "3.11" Downloading exceptiongroup-1.2.2-py3-none-any.whl (16 kB) Collecting sniffio>=1.1 Downloading sniffio-1.3.1-py3-none-any.whl (10 kB) Collecting pycparser Downloading pycparser-2.22-py3-none-any.whl (117 kB) |████████████████████████████████| 117 kB 2.9 MB/s Collecting h11<0.15,>=0.13 Downloading h11-0.14.0-py3-none-any.whl (58 kB) |████████████████████████████████| 58 kB 3.3 MB/s Installing collected packages: pycparser, cffi, cryptography, h11, httpcore, exceptiongroup, typing-extensions, sniffio, anyio, httpx, pydantic-core, annotated-types, pydantic WARNING: The script httpx is installed in '/home/apple/.local/bin' which is not on PATH. Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location. Successfully installed annotated-types-0.7.0 anyio-4.5.2 cffi-1.17.1 cryptography-43.0.3 exceptiongroup-1.2.2 h11-0.14.0 httpcore-1.0.7 httpx-0.28.1 pycparser-2.22 pydantic-2.10.4 pydantic-core-2.27.2 sniffio-1.3.1 typing-extensions-4.12.2 测试 单张图片测试 vim test.py import os # 通过 pip install volcengine-python-sdk[ark] 安装方舟SDK from volcenginesdkarkruntime import Ark # 替换为您的模型推理接入点 model="ep-20250101121404-stw4s" # 初始化Ark客户端,从环境变量中读取您的API Key client = Ark( api_key=os.getenv('ARK_API_KEY'), ) # 创建一个对话请求 response = client.chat.completions.create( # 指定您部署了视觉理解大模型的推理接入点ID model = model, messages = [ { "role": "user", # 指定消息的角色为用户 "content": [ # 消息内容列表 {"type": "text", "text":"这张图片讲了什么?"}, # 文本消息 { "type": "image_url", # 图片消息 # 图片的URL,需要大模型进行理解的图片链接 "image_url": {"url": "http://www.laumy.tech/wp-content/uploads/2024/12/wp_editor_md_7a3e5882d13fb51eecfaaf7fc8c53b59.jpg"} }, ], } ], ) print(response.choices[0]) 执行返回结果 python3 test.py Choice( finish_reason='stop', index=0, logprobs=None, message=ChatCompletionMessage( content='这张图片展示了一个WebRTC(Web实时通信)的流程示意图,涉及到PC(个人计算机)、MQTT代理(mqtt broker)和CEMARA设备。以下是流程图的主要步骤: \n\n1. **PC端操作**: \n - **连接和订阅**:PC端首先进行连接(connect),然后订阅相关主题("webrtc/id/jsonrpc"和"webrtc/id/jsonrpc-replay")。 \n - **发布消息**:PC端发布消息(pub),发送"offer"请求(offer (req))。 \n - **接收消息**:PC端接收来自MQTT代理的消息,包括"message"事件和相关的应答(res)。 \n - **创建应答**:PC端创建应答(pc.createAnswer),并设置远程描述(pc.setRemoteDescription)。 \n\n2. **STUN/TURN服务器交互**: \n - **STUN/TURN绑定请求和应答**:在STUN/TURN服务器上,PC端发起绑定请求(binding req)和应答(binding res),获取SDP(Session Description Protocol)信息。 \n - **ANSWER请求和应答**:PC端发送ANSWER请求(anser (req)),并接收ANSWER应答(anser (res))。 \n\n3. **检查和连接过程**: \n - **检查连接**:PC端按照优先级顺序检查连接的顺畅性(host、srflx、relay)。 \n - **连接完成**:经过一系列的检查和交互,PC端与CEMARA设备成功连接(CONNECTED)。\n\n4. **数据交互和完成**: \n - **数据交互**:PC端和CEMARA设备开始进行数据交互(agent_send和agent_recv)。 \n - **完成状态**:数据交互完成后,流程进入“COMPLETED”状态,表示整个WebRTC通信过程结束。 \n\n整个流程图清晰地展示了WebRTC通信过程中PC端与MQTT代理以及STUN/TURN服务器之间的交互过程,包括连接、消息发布、应答接收、绑定请求、检查连接等步骤,最终实现了PC端与CEMARA设备的数据通信。', role='assistant', function_call=None, tool_calls=None, audio=None ) )