Linux
  • 进程创建

    进程创建

    fork创建了一个新的进程,也就是fork执行后就会返回两次,分别是父进程返回和子进程返回。 exec可以加载新的程序运行(原程序是A,可以在A中运行后加载可执行程序B,B是A的子进程)。而如果没有exec,A程序执行fork后,仅只是将fork之后的代码复制了一份。exec最早是为了实现shell而涉及的,目的是能够A程序启动B程序后,可以改变进程的环境变量实现如 ps > 1.txt这种处理。 父进程可以调用wait来等待子进程退出,子进程结束后会销毁其使用的资源,但是会保留task_struct+栈空间(一般累计4K或8K大小),这块空间子进程不能销毁,只能等父进程使用wait来进行销毁。如果父进程没有wait来销毁这块内存,子进程就会变成僵尸进程。如果父进程没有调用wait而先于子进程结束,那么init进程会收留这个子进程,调用wait进行销毁掉。 进程终止一是自愿终止包括显式调用exit()系统调用或者从某个程序主函数返回;另外一个式被动收到终止信号或者异常终止。进程主动终止:在main函数中返回,链接程序会自动添加exit系统调用或主动调用exit函数两种方式。进程被动终止:进程收到一个自己不能处理的信号、进程在内核态执行时产生异常、进程收到SIGKILL等终止信号。 进程创建 Linux系统中,创建进程用户空间一般有3个函数,fork、vfork、clone,3个函数最终调用的是kernel_clone(旧一点的内核版本调用_do_fork)。 fork:子进程是父进程的翻版,完成复制了一份栈、数据段、堆、文件等等。在操作系统中使用写时复制的技术,当fork一个进程后,父子进程是共享一份系统资源的,当父进程或子进程有任一进程有写操作时,就会触发缺页异常复制一份属于自己的一份资源,从此父子进程的这类资源无任何关系。 vfork:fork和vfork函数类似,vfork的父进程会一直阻塞,直到子进程调用exit或者execve为止。vfork比fork实现多了两个标志位,分别是CLONE_VFORK和CLONE_VM。CLONE_VFORK会让父进程被挂起,直到子进程释放虚拟内存资源,CLONE_VM表示父子进程执行在相同的进程地址空间中。vfork与fork的区别是,父子进程是共享内存空间的,也就是说子进程修改了内存,父进程也会受影响。同时父进程需要等待子进程运行结束才能运行。vfork最初的设计是系统没有MMU而设计的,没法实现COW机制。 clone:Clone通常用于创建用线程,在linux系统中没有专门的线程,而是把线程当成普通进程来看待,在内核中还是以task_struct数据结构来描述线程,并没有使用特殊的数据结构或者调度算法来描述线程。Clone函数功能强调,可以传递众多参数,可以有选择性的继承父进程的资源(共享使用),如可以和vfork一样,与父进程共享一个进程地址空间,从而创建线程,也可以不和父进程共享进程地址空间,甚至可以创建兄弟关系进程。 写时复制 子进程在被创建后,父子进程是共享所有资源的,当父进程或子进程任一进程先触发写操作,触发写保护缺页异常,然后复制一份页面内容。Linux使用写时复制技术使得创建新进程的开销变得很小,免去了复制父进程整个进程地址空间中的内容避免巨大开销,只需要复制父进程页表的一点开销。
  • 进程基本概念

    进程基本概念

    进程标识 进程是程序加载到内存的执行过程。进程与程序相比用于操作系统的资源如内存空间、文件、signal等。对于进程的标识我们使用process id来标识(PID)。 线程是进程中活跃状态的实体,也是操作系统实际调度的基本单元。进程中的所有线程是共享一些资源的。在linux中,实际上不区分进程和线程,进程和线程都是task_struct结构体来描述。在linux中使用thread ID(TID)来标识进程中的线程,thread id在所属进程中是唯一的,在linux系统中也是全局唯一的。对于单线程的进程来说,process ID和thread ID是一样的;而对于多线程来说,每个线程有自己的thread ID,但是所有线程共享一个PID。 进程组是一组进程的集合,创建进程组主要是将一些拥有共同特性的进程组合起来便于管理,如可以发一个信号给一个进程组,则这个组内的进程都会收到该信号。任何一个进程都不是独立存在的,一定是属于某个进程组,当fork的时候进程就归属到创建这所属的进程组。进程组用process group ID(PGID)来标识,进程组内的所有进程都有相同的GPID,等于该组组长的PID。可以通过setpgid、getpgid、setpgrp和getpgrp等接口函数访问PGID。 会话是一个用户登录后会创建一个会话,这个会话用sesssion ID(SID)来进行标识。登录的第一个进程较会话领头进程,通常是shell/bash。领头进程PID=SID。用户登录系统后,不断提交任务给操作系统,最后退出登录,就会销毁该session。可以通过getsid,setid来操作SID。 命名空间是用来隔离内核资源的,当一个进程运行在linux系统上的时候,它就拥有了很多系统资源如PID,网络设备,文件系统等。Linux内核通过namespace可以让一些进程只能看到与自己相关的一部分资源,而另外一些进程也只能看到与他们自己相关的资源,让互不联系的进程感觉不到对方的存在。目前linux现有的namespace有如下7种: namespace 隔离内容 Cgroup Cgoup root directory IPC System V IPC,POSIX消息队列 Network 网络设备、栈、端口等 Mount 挂载点 PID 进程ID User 用户和组ID UTS 主机名和NIS域名 与namespace相关的函数只有三个clone,setns,unshare。分别是创建一个进程放到对应namespace,将当前进程加入到已知namespace,退出指定类型namespace并加如到创建的namespace。 PID命名空间对进程PID重新标号,即不同的namespace下进程可以有同一个PID,如下容器1的a和容器2的a。他们分别对应在内核空间是PID namespace 1和PID namespace2。内核种为所有的PID namespace维护了一个树状结构,最顶层的是系统初始化创建的,被称为root namespace,由他创建的新的PID namespace成为它的chid namespace。父节点是可以看到字节点种的进程的,可以通过信号对子节点的进程产生影响,但是子节点无法看到父节点的进程。PID namespace对容器应用特别重要,可以实现容器内进程的暂停/恢复等功能,还可以支持容器在跨主机的迁移前后保持内部进程的PID不发生变化。 进程描述 Linux系统要对进程进行操作,需要抽象出所拥有的资源,我们称为进程控制块(Process Control Block,PCB),也称为进程描述符号,Linux中使用struct task_struct结构体来进行描述。task_struct数据结构包含的内容可以归类为几类: 进程属性相关。 进程间的关系。 进程调度相关信息。 内存管理相关信息。 文件管理相关信息。 信号相关信息。 资源限制相关信息。 上图中列出了task_struct数据结构中包含的一些内容。在linux中,进程和线程都是使用task_struct来进行描述。 进程状态 就绪态:进程获得了可以运行的所有资源和准备条件 运行态:CPU正在运行该进程。(linux中就绪态和运行态都是TASK_RUNNING) 浅度睡眠:进程需要某些资源不满足而进入等待,当条件满足时转为就绪队列。 深度睡眠:与浅度睡眠不同的时,进程睡眠等待不受干扰,不响应信号,如SIGKILL信号无法终止。 暂停:进程运行停止 僵死:进程已经消亡,但是task_struct数据结构还没有释放,父进程通过wait来获取子进程消亡原因。僵死进程已经放弃了几乎所有的内存空间,不会再执行代码,也不能被调度。之所以产生僵死进程时因为其父进程没有调用wait函数来等待子进程结束(父进程没来收尸,就变僵尸了),如果父进程异常退出了,那么init进程会自动接手这个子进程,也就是说父进程还活着,但是并没有调用wait来清除子进程。 进程间的关系 linux内核启动时,会创建一个init_task进程,这是系统中所有进程的祖先,称为进程0或idle进程,当系统没有进程调度时,调度器就会运行idle进程。在smp中,每个cpu都有一个进程0。在执行kernel_init函数后,会启动进程1(用户第一个进程),进程1是所有用户进程的祖先,可以通过pstree来查看进程关系。 上图中,procd等同于init,有用openwrt的1号进程使用的是procd,当然也可以改成init。 Linux系统中task_struct数据结构使用4个成员来描述进程间的关系,如下: real_parent: 指向创建当前进程的进程描述描述符,如果创建的进程不存在,则指向init进程。 parent:指向进程当前的父进程,通常和real_parent一致。 children:指向其子进程,所有的子进程被链接到这个链表上。 sibling:指向兄弟进程,所有的兄弟进程链接成一个链表。 获取当前进程 系统运行时,调度操作的数据结构就是task_struct,因此在系统调度时要运行进程,必现要找到对应进程的task_struct结构体。Linux内核提供了current宏来方便快速找到当前要运行或正在运行的task_struct数据结构。 current的实现和具体的架构有关,通常有两类实现方式。在ARM32系统中,存放了一个thread_info的数据结构在内核栈里面,current宏通过arm32的SP寄存器来获取当前内核栈的地址,对齐后即可获取到thread_info数据结构的指针,最后通过thread_info->task成员获取task_struct数据结构。 在linux 5.0内核中,新增了一个配置选项CONFIG_THREAD_INFO_IN_TASK,将thread_info存放在task_struct数据结构中。在ARM64处理上,利用SP_EL0寄存器粗放囊当前进程的task_struct数据结构的地址。
  • 中断小结

    中断小结

    上下文 是否抢占 顶半部 中断 否 Softirq/tasklet 软中断 是 workqueue 进程 是 threaded_irq 进程 是 Tasklet:底半部,优先级比较高,处理函数中不能睡眠。 workqueue:底半部,处理函数可以睡眠,也可以执行比较长的应用。 threaded_irq:底半部,处理函数可以睡眠,也可以执行比较长的应用,支持IRQ_ONESHOT。 中断处理过程 为什么软中断中不能睡眠 软中断的触发是在中断执行irq_exit的时候,软中断可以线程化和非线程化处理,当软中断是非线程化时,还是处于中断上下文的,尽管进入软中断会打开本地cpu中断响应,但是但是其他寄存器的值是没有恢复的,如果此时进入睡眠将会触发调度,那么上一个任务就无法返回。 中断是否会丢失 会,使用边沿触发时容易丢失。因为CPU响应一个中断时会屏蔽掉CPU的本地响应,无法进行再响应其他中断,但是中断控制器只会pengding一个信号,没法叠加信号,即在CPU响应处理A中断时,B中断要是触发了1次以上,那么等A中断处理完后再响应B中断时也只会处理1次,本质上还是中断控制器没法叠加信号。因此在驱动编写时,如果不想丢中断,建议使用电平触发,因为电平触发如果没有与外设进行同步清除触发源时,电平会一直触发中断响应。 如何解决电平触发中断洪泛 可使用request_thread_irq(irq,irq_default_primary_handler,xx_isr,IRQF_ONESHOT)。对于电平触发的外设,如果需要使用睡眠的API接口操作外设写相关寄存器才能清除中断就可使用request_thread_irq。如下图的场景,当点击触摸屏LCD触发高电平中断时,当CPU收到高电平触发时需要通过I2C接口写触摸屏的寄存器清除中断,高电平才会拉低进入下一流程。此时就会面临一个问题,当CPU响应中断处理时,在顶半部是无法调用I2C的接口写寄存器清除的,因为I2C的接口可能是睡眠,在中断上下文中是无法进行睡眠,那么处理写I2C的操作需要移到底半部去处理,但是一进入底半部就会开中断,因为是高电平,那么就又会触发中断,因此就引入IRQF_ONESHOT,当注册中断是声明了该参数,中断响应从顶半部退出,直到底半步结束才会打开中断控制器对该外设的中断响应,这就解决了中断洪泛的问题。irq_default_primary_handler是默认的顶半部处理函数,不做任何处理,直接返回IRQ_WAKE_THREAD进入底半部,当然用户也可以自定义声明函数,但是返回值要使用IRQ_WAKE_THREAD,如果自定义声明的顶半部为NULL,那么系统会默认设定irq_default_primary_handler。 为什么linux不是实时操作系统 进入中断处理时,CPU就关闭了本地中断响应,没法再响应其他中断,即linux的中断时没法嵌套的,即使有再高优先级的中断也是没法处理。另外软中断的处理要比任何进程优先级高,因为软中断是可以在中断上下文中运行。 除了中断,软中断外,spinlock在处理过程中是关闭抢占调度的,所以在spinlock期间也是也没法调度的。看下下面的场景: T0时刻normal task执行系统调用进入到内核。 T1时刻获取到了spin lock,进入临界区保护阶段。 T2时刻产生了IRQ1中断,进而进行处理IRQ1中断。 T3时刻唤醒了高优先级的RT task,但此时系统处于中断中无法进行调度。 T4时刻IRQ1中断处理结束,但接着又触发了IRQ2中断,进入IRQ2中断处理。 T5时刻IR2中断处理结束,但仍处于spin lock临界区,依旧无法调度RT task。 T6时刻,spin lock释放,高优先级的RT task得到调度运行。 T7时刻,RT task运行结束,normal task继续得到运行。 T8时刻,从内核态返回用户态。 如何让linux变成实时操作系统 打上RTlinux的patch:https://mirrors.edge.kernel.org/pub/linux/kernel/主要的改进是将spinlock改完可抢占,将所有中断处理改完线程化等等。中断处理线程化,其核心原理比较简单,就是顶半部不处理,直接返回IRQ_WAKE_THREAD,这样所有的操作都进入到中断线程中处理。 如何调试? cat /proc/interrupts /proc/irq/xx/smp_affinity root@TinaLinux:/# cd /sys/kernel/debug/tracing/ root@TinaLinux:/sys/kernel/debug/tracing# echo irqsoff > current_tracer root@TinaLinux:/sys/kernel/debug/tracing# echo 1 > tracing_on root@TinaLinux:/sys/kernel/debug/tracing# echo 0 > tracing_on root@TinaLinux:/sys/kernel/debug/tracing# cat trace pinctrl/*/pins 参考文献 1.GICv3_Software_Overview_Official_Release_B.pdf 2.aarch64_exception_and_interrupt_handling_100933_0100_en.pdf 3.ARM64体系结构编程与实践 4.奔跑吧 Linux内核:基于Linux 4.x内核源代码问题分析 5.http://www.wowotech.net/ 6.https://www.cnblogs.com/LoyenWang/
  • workqueue

    workqueue

    API接口 初始化 函数 说明 DECLARE_WORK(n, f) 静态定义一个work,实际就是定义一个struct work_struct的全局变量。 DECLARE_DELAYED_WORK(_work, _func) 静态定义一个work,与上面的区别就是work可以在指定时间之后再由线程来执行。 INIT_WORK(_work, _func) 可以动态的分配一个struct work_struct,但是调用该函数进行初始化。 INIT_DELAYED_WORK(_work, _func) 动态分配,延迟work执行。 触发执行 函数 说明 schedule_work(struct work_struct *work) 调度一个work运行,会将work挂入到默认workqueue(system_wq)中运行。 queue_work(struct workqueue_struct *wq,struct work_struct *work) 调度一个work在指定的workqueue上运行。 queue_delayed_work(struct workqueue_struct*wq,struct delayed_work *dwork,unsigned long delay) 延迟一段时间调度一个work在指定的workqueue上运行。 schedule_work实际上也是调用queue_work,将其wq指定为system_wq,在workqueue_init_early进行初始化工作队列时会默认创建几个workqueue。 创建workqueue create_workqueue(name) 创建一个普通的workqueue,该workqueue将在每个cpu上都创建一个worker thread,后文会描述。 create_freezable_workqueue(name) 在suspend的时候不冻结内核线程的worker thread create_singlethread_workqueue(name) 只有一个thread,所有的work在thread中排队运行。 cstruct workqueue_struct *alloc_workqueue(const char *fmt,unsigned int flags,int max_active, ...) 最原始的分配函数,上面三个函数都会调用到该函数,其中第三个参数是工作队列中当前能够运行的最大work数量,当大于该值其work将会被添加到未激活的链表中等待运行的work完成后才能运行。上面三个函数的max_active都是1,因此work都是排队运行的,因此要并行work使用alloc_workqueue来创建。 除了系统定义的几个默认workqueue,用户可以调用上面的函数自己创建workqueue,对于queue_work来说一般用就可以指定使用自己创建的workqueue。 数据结构 workqueue涉及到几个重要的数据结构,可以结合下面的图来进行理解。 work_struct: “任务项”,也可以称为工作,填充了用户实际要处理函数任务。初始化后的work将会被添加到worker_pool中链表上。 workqueue_struct:“项目”;任务由那个工作队列负责运行,即可理解为任务所属“项目”,“项目”是多个任务的集合。一个工作队列可以处理多个任务。系统在初始化时默认创建了一些工作队列如system_wq,system_highpri_wq等,用户也可以调用alloc_workqueue来创建一个工作队列。系统定义了一个全局的链表workqueues,所有的工作队列都连接到该链表上。 worker:“工人”;每一个worker对应一个task,该worker上可能挂接着多个等待处理的任务。 worker_pool:“工厂”;一个“工厂”里面多个“工人”,工厂还没被触发的任务都挂在worklist链表上。 pool_workqueue:“厂长”;建立workqueue_struct和pool_workqueue的联系。 workqueue是把work推迟到一个内核线程中去执行,结合上面的数据结构关系具体描述就是:一个项目(workqueue)上可以处理很多个工作(work),这些项目(workqueue)的工作(work)交给工厂负责人(pool_wokerqueue)协调到一个工厂(woker_pool)去生产,工厂(worker_pool)中当收到要启动处理工作(work)时,就安排一个工人(worker)去执行工作(work)。 worker_pool是管理了多个worker,每个worker对应一个task。因此我们也称worker_pool为线程池。线程池的线程数量是可以动态分配或移除。线程池可以分为与特定CPU绑定的线程池和没有绑定的线程池两类。 Bound 线程池:这种线程池根据优先级分为高低两类,分别用来处理高优先级和低优先级的任务。绑定的线程池在系统中使用全局数组定义好了,DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], cpu_worker_pools), 取决于cpu的数量,根据高低优先级那么如果有n个cpu,那么就对应有2n个worker_pool。 Unbound线程池:这类线程池可以运行在任意cpu上,其worker_pool是动态创建的,创建worker_pool时会判断其线程池熟悉,如线程优先级nice,如果属性一样就再重复创建,共有一个线程池。 worker_pool线程池与workqueue是没有直接联系的,当用户创建一个workqueue只是选择一个或多个线程池而已,对于bound类型的线程池,每个cpu有两个线程池对于高低优先级;对应unbound类型线程池,根据属性动态创建线程池。创建线程池后,默认情况下线程池会创建一个worker 线程来处理work,随着work数量的提交,woker_pool动态的调整worker来应对work数量。 后续约定workqueue简写wq;worker_pool简写pool,也称为线程池;pool_workqueue简写pwq; 初始化 workqueue初始化分为两个阶段,分别为早期workqueue_init_early和workqueue_init。 workqueue early init void __init workqueue_init_early(void) { int std_nice[NR_STD_WORKER_POOLS] = { 0, -20 }; pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC); BUG_ON(!alloc_cpumask_var(&wq_unbound_cpumask, GFP_KERNEL)); cpumask_copy(wq_unbound_cpumask, housekeeping_cpumask(hk_flags)); //静态为每个cpu创建两个线程池,用于处理高优先级和普通优先级的work。 for_each_possible_cpu(cpu) { struct worker_pool *pool; i = 0; //为每个cpu定义了一个静态的worker_pool[2],这里遍历数组进行初始化,设定线程池运行的cpu、nice值以及所属node节点。其中nice值会有差别,依次为0和-20,这就决定了,这两个线程池的优先级运行优先级是不同的。 for_each_cpu_worker_pool(pool, cpu) { BUG_ON(init_worker_pool(pool)); //初始化worker_pool,分配struct workqueue_attrs pool->cpu = cpu; cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu)); //设置要运行的cpu,线程池是绑定在cpu上的。 pool->attrs->nice = std_nice[i++]; //设置线程的优先级 pool->node = cpu_to_node(cpu); //设置所属node节点 //分配worker pool id mutex_lock(&wq_pool_mutex); BUG_ON(worker_pool_assign_id(pool)); mutex_unlock(&wq_pool_mutex); } } //除了静态为每个cpu创建两个线程池外,还可以创建跟cpu不绑定的线程池,线程池将会//动态的创建,这里先分配线程池的属性。 //动态创建的线程池又分为两类,分别是跟node节点绑定的线程池和跟node节点不绑定 //且线程池中任务运行按顺序执行的工作队列。 for (i = 0; i < NR_STD_WORKER_POOLS; i++) { struct workqueue_attrs *attrs; //用于Per-node 线程池的属性创建 BUG_ON(!(attrs = alloc_workqueue_attrs())); //分配一个workqueue_attrs。 attrs->nice = std_nice[i]; //也有两个优先级,因此每个节点至少也会创建两个线程池 unbound_std_wq_attrs[i] = attrs;//属性赋值到全局变量中保存。 /* * An ordered wq should have only one pwq as ordering is * guaranteed by max_active which is enforced by pwqs. * Turn off NUMA so that dfl_pwq is used for all nodes. */ //用于工作队列中任务顺序执行线程池属性创建 BUG_ON(!(attrs = alloc_workqueue_attrs())); attrs->nice = std_nice[i]; //高低优先级的线程池 attrs->no_numa = true; //与节点无关 ordered_wq_attrs[i] = attrs; } //系统将默认创建几个工作队列。 system_wq = alloc_workqueue("events", 0, 0); system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0); system_long_wq = alloc_workqueue("events_long", 0, 0); system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND, WQ_UNBOUND_MAX_ACTIVE); system_freezable_wq = alloc_workqueue("events_freezable", WQ_FREEZABLE, 0); system_power_efficient_wq = alloc_workqueue("events_power_efficient", WQ_POWER_EFFICIENT, 0); system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_power_efficient", WQ_FREEZABLE | WQ_POWER_EFFICIENT, 0); BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq || !system_unbound_wq || !system_freezable_wq || !system_power_efficient_wq || !system_freezable_power_efficient_wq); } 在workqueue_init_early的初始化过程中,先初始化每个cpu上的线程池,在系统中为每个cpu上静态定义每个cpu上定义了一个struct worker_pool cpu_worker_pools[2]的数组(DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], cpu_worker_pools)),即每个cpu上有两个线程池,分别用于高优先级和低优先级。 除了为每个CPU绑定两个高低优先级的线程池外,还可以动态的创建线程池,动态创建的线程池可分为跟内存node绑定和跟node节点不绑定且工作队列中的任务顺序执行,所以系统先为没有绑定cpu类型的线程池先分配好属性,存储在unbound_std_wq_attrs[2]和ordered_wq_attrs[2]中,从属性的数组定义也可以看出,每一类也有两个线程池对应的是高优先级和普通优先级。 workqueue init void __init workqueue_init(void) { struct workqueue_struct *wq; struct worker_pool *pool; int cpu, bkt; /* * It\'d be simpler to initialize NUMA in workqueue_init_early() but * CPU to node mapping may not be available that early on some * archs such as power and arm64. As per-cpu pools created * previously could be missing node hint and unbound pools NUMA * affinity, fix them up. * * Also, while iterating workqueues, create rescuers if requested. */ //对numa的wq初始化,嵌入式设备一般只有一个node,所以对于numa的我们暂不深入 wq_numa_init(); mutex_lock(&wq_pool_mutex); for_each_possible_cpu(cpu) { for_each_cpu_worker_pool(pool, cpu) { pool->node = cpu_to_node(cpu); } } list_for_each_entry(wq, &workqueues, list) { wq_update_unbound_numa(wq, smp_processor_id(), true); WARN(init_rescuer(wq), \"workqueue: failed to create early rescuer for %s\", wq->name); } mutex_unlock(&wq_pool_mutex); 遍历CPU,为每个绑定cpu的线程池创建一个worker /* create the initial workers */ for_each_online_cpu(cpu) { for_each_cpu_worker_pool(pool, cpu) { pool->flags &= ~POOL_DISASSOCIATED; BUG_ON(!create_worker(pool)); } } //为unbound类型的创建一个线程池? hash_for_each(unbound_pool_hash, bkt, pool, hash_node) BUG_ON(!create_worker(pool)); wq_online = true; wq_watchdog_init(); } 创建工作队列 创建工作队列主要的工作就是确定好工作队列使用的线程池(worker_pool),工作队列分为BOUND CPU类型和UNBOUND CPU类型。用户可以调用create_workqueue、create_freezable_workqueue以及create_singlethread_workqueue等API接口来创建队列,这些API接口最终都会调用alloc_workqueue来实现,只不过每个函数传入的参数是不一样的,对于后两个函数就是UNBOUND类型,同时create_singlethread_workqueue还是ORDERED类型,该类型的工作队列表示其所处理的工作都是按顺序排队运行的。 在解释创建工作队列的流程前我们再来说明下workqueue、pool_workqueue、worker_pool之间的关系。worker_pool是线程的集合,可以实际处理任务的,挂在workqueue上的工作最终都会交由worker_pool来处理,但是workqueue和worker_pool并没有直接一对一的关系,而是多对多的关系,可以理解要生产的产品和工厂是解耦的,工厂是一个共享的池子,因此要做一个产品(workqueue)和工厂产线生产(worker_pool)之间的联系需要个中间人来张罗,所以pool_workqueue就是这中间人,用于确定制作这个产品选择那个工厂。 Workqueue如何获取到worker_pool? (1)对于bound workqueue,是每个cpu都绑定了两个高优先级和普通优先级的线程池,这些cpu上的线程池都是共享的,因此对于workqueue需要为每个cpu的每个线程池分配一个pool_workqueue来对应到线程池,即该workqueue需要分配2*cpu个数的pool_workqueue,pool_workqueue与worker_pool是一一对应的关系。在alloc_workqueue调用alloc_percpu来分配pwq,然后遍历每个cpu获取到worker_pool,建立器pwq,wq,pool之间的联系。 (2)对于unbound workqueue,就稍微复杂些因为workqueue要找的线程池没有与cpu进行绑定,不像bound workqueue定义好了worker_pool,所以了workqueue需要动态的分配创建pwq,pool。而unbound workqueue又分为两种类型排队运行的工作队列和标准的工作队列,排队运行的工作队列所有的工作将排队依次运行而标准的工作队列意味这可并发。两种类型的队列属性分别存储在ordered_wq_attrs[]和unbound_std_wq_attrs[]两个数组中。当检查到workqueue是unbound类型后,就调用apply_workqueue_attrs分别传入不同的属性创建线程池。 A.unbound std类型:与内存节点node有关联,将会为每个node节点创建高优先级和普通优先级的线程池,之所以这么做是因为不同的node节点之间切会带来性能损耗,因此每个node都创建2个线程池。 B.unbound ordered类型:与内存节点node没有关联了,多个同一优先级的workqueue共享一个worker_pool。 调度工作运行 触发调度 触发调度最后都会调用到_queue_work来执行,只是根据不同的场景有不同的参数可选择配置。对于常用的queue_work(struct workqueue_struct *wq,struct work_struct *work)来说是将work递交到指定的wq来运行,一般用户自己创建的wq,而schedule_work(struct work_struct *work)只有一个参数,将work递交到默认的system_wq来运行,queue_work和schedule_work递交的work在wq中处理,都不绑定cpu,这里的绑定不是WQ_UNBOUND,WQ_UNBOUND和WORK_CPU_UNBOUND是有区别的,WQ_UNBOUND确定WQ的类型是normal Per-CPU worker_pool还是unbound worker_pool,确定了WQ的类型才使用WORK_CPU_UNBOUND进一部分确定work是否要绑定cpu运行。 static void __queue_work(int cpu, struct workqueue_struct *wq, struct work_struct *work) { struct pool_workqueue *pwq; struct worker_pool *last_pool; struct list_head *worklist; unsigned int work_flags; unsigned int req_cpu = cpu; /* * While a work item is PENDING && off queue, a task trying to * steal the PENDING will busy-loop waiting for it to either get * queued or lose PENDING. Grabbing PENDING and queueing should * happen with IRQ disabled. */ lockdep_assert_irqs_disabled(); /* if draining, only works from the same workqueue are allowed */ if (unlikely(wq->flags & __WQ_DRAINING) && WARN_ON_ONCE(!is_chained_work(wq))) return; rcu_read_lock(); retry: /* pwq which will be used unless @work is executing elsewhere */ //先判断wq unbound还是bound,如果是unbound,那么cpu选择的范围需要根据node //节点来限制,如果是bound那么cpu就限定了。 //WQ_UNBOUND用于区分是wq是bound类型还是未bound类型,WORK_CPU_UNBOUND //用于进一步限定woker_pool线程池是否指定选择那一个cpu。 if (wq->flags & WQ_UNBOUND) { if (req_cpu == WORK_CPU_UNBOUND) cpu = wq_select_unbound_cpu(raw_smp_processor_id()); //unbound类型,cpu也没有绑定,则选择一个cpu,优先选择本地cpu。 pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu)); } else { if (req_cpu == WORK_CPU_UNBOUND) cpu = raw_smp_processor_id(); //如果req_cpu类型是WORK_CPU_UNBOUND,那么也是选择当前代码运行的cpu pwq = per_cpu_ptr(wq->cpu_pwqs, cpu); } /* * If @work was previously on a different pool, it might still be * running there, in which case the work needs to be queued on that * pool to guarantee non-reentrancy. */ //查询work是否已经在线程池内了 last_pool = get_work_pool(work); //如果work已经在线程池中,且当前所在的worker_pool与获取的worker_pool不是同一个 if (last_pool && last_pool != pwq->pool) { struct worker *worker; raw_spin_lock(&last_pool->lock); //查询work对应的挂到那个worker上 worker = find_worker_executing_work(last_pool, work); //如果当前的worker所在的wq和请求的wq相同,则获取当前运行worker的pwq,//相当于前面获取的pwq被覆盖更新了,这样做的目的是防止work被重复放到不同的 //线程池中处理,也就是说当一个work被某一个线程池中接受处理后,一直需要等待//其处理完毕,不能更换另外另外一个线程池worker_pool。 if (worker && worker->current_pwq->wq == wq) { pwq = worker->current_pwq; } else { /* meh... not running there, queue here */ raw_spin_unlock(&last_pool->lock); raw_spin_lock(&pwq->pool->lock); } } else { raw_spin_lock(&pwq->pool->lock); } /* * pwq is determined and locked. For unbound pools, we could have * raced with pwq release and it could already be dead. If its * refcnt is zero, repeat pwq selection. Note that pwqs never die * without another pwq replacing it in the numa_pwq_tbl or while * work items are executing on it, so the retrying is guaranteed to * make forward-progress. */ if (unlikely(!pwq->refcnt)) { if (wq->flags & WQ_UNBOUND) { raw_spin_unlock(&pwq->pool->lock); cpu_relax(); goto retry; } /* oops */ WARN_ONCE(true, \"workqueue: per-cpu pwq for %s on cpu%d has 0 refcnt\", wq->name, cpu); } /* pwq determined, queue */ trace_workqueue_queue_work(req_cpu, pwq, work); if (WARN_ON(!list_empty(&work->entry))) goto out; pwq->nr_in_flight[pwq->work_color]++; work_flags = work_color_to_flags(pwq->work_color); //如果work运行的数量还没有超过限制,则获取pwq对应pool的worklist,也就是说后面 //会将work加入到该work中,如果数量超了则挂到临时pwq未激活的链表上。 if (likely(pwq->nr_active < pwq->max_active)) { trace_workqueue_activate_work(work); pwq->nr_active++; worklist = &pwq->pool->worklist; //获取到的worklist如果上面没有工作,则更新时间。 if (list_empty(worklist)) pwq->pool->watchdog_ts = jiffies; } else { work_flags |= WORK_STRUCT_INACTIVE; worklist = &pwq->inactive_works; } debug_work_activate(work); //将work插入到worklist上。 insert_work(pwq, work, worklist, work_flags); out: raw_spin_unlock(&pwq->pool->lock); rcu_read_unlock(); } static void insert_work(struct pool_workqueue *pwq, struct work_struct *work, struct list_head *head, unsigned int extra_flags) { struct worker_pool *pool = pwq->pool; /* record the work call stack in order to print it in KASAN reports */ kasan_record_aux_stack_noalloc(work); //设置flag,并将work添加到worklist链表上 /* we own @work, set data and link */ set_work_pwq(work, pwq, extra_flags); list_add_tail(&work->entry, head); get_pwq(pwq); /* * Ensure either wq_worker_sleeping() sees the above * list_add_tail() or we see zero nr_running to avoid workers lying * around lazily while there are works to be processed. */ smp_mb(); //如果没有空闲的worker了,则唤醒一个新的worker运行。 if (__need_more_worker(pool)) wake_up_worker(pool); } static void wake_up_worker(struct worker_pool *pool) { //从pool的idle_list上获取一个worker,触发运行。 struct worker *worker = first_idle_worker(pool); if (likely(worker)) wake_up_process(worker->task); } 执行调度 static int worker_thread(void *__worker) { struct worker *worker = __worker; struct worker_pool *pool = worker->pool; /* tell the scheduler that this is a workqueue worker */ //通知调度器,这是一个worker set_pf_worker(true); woke_up: raw_spin_lock_irq(&pool->lock); /* am I supposed to die? */ //判断该worker是否要退出? if (unlikely(worker->flags & WORKER_DIE)) { raw_spin_unlock_irq(&pool->lock); WARN_ON_ONCE(!list_empty(&worker->entry)); set_pf_worker(false); set_task_comm(worker->task, \"kworker/dying\"); ida_free(&pool->worker_ida, worker->id); worker_detach_from_pool(worker); kfree(worker); return 0; } //离开idle状态,被唤醒之前都是idle状态 worker_leave_idle(worker); recheck: //检查线程池上是否有work要处理并且当前没有正在运行的worker了则进行处理,否则 //进入休眠状态。 /* no more worker necessary? */ if (!need_more_worker(pool)) goto sleep; //如果pool上的idle worker数量为0,则创建一个worker备用 /* do we need to manage? */ if (unlikely(!may_start_working(pool)) && manage_workers(worker)) goto recheck; /* * ->scheduled list can only be filled while a worker is * preparing to process a work or actually processing it. * Make sure nobody diddled with it while I was sleeping. */ WARN_ON_ONCE(!list_empty(&worker->scheduled)); /* * Finish PREP stage. We\'re guaranteed to have at least one idle * worker or that someone else has already assumed the manager * role. This is where @worker starts participating in concurrency * management if applicable and concurrency management is restored * after being rebound. See rebind_workers() for details. */ worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND); do { //从线程池pool->worklist上的获取一个work struct work_struct *work = list_first_entry(&pool->worklist, struct work_struct, entry); pool->watchdog_ts = jiffies; //如果work不是linked,这里的link指的是有关联的work? if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) { /* optimization path, not strictly necessary */ //处理work process_one_work(worker, work); if (unlikely(!list_empty(&worker->scheduled))) process_scheduled_works(worker); } else { //特殊的work,先将其插入到worker->scheduled上,然后调度一起运行。 move_linked_works(work, &worker->scheduled, NULL); process_scheduled_works(worker); } } while (keep_working(pool)); //一直循环,直到pool->worklist上链表的work处理完成。 worker_set_flags(worker, WORKER_PREP); sleep: /* * pool->lock is held and there\'s no work to process and no need to * manage, sleep. Workers are woken up only while holding * pool->lock or from local cpu, so setting the current state * before releasing pool->lock is enough to prevent losing any * event. */ //处理完了,继续进入idle状态 worker_enter_idle(worker); __set_current_state(TASK_IDLE); raw_spin_unlock_irq(&pool->lock); schedule(); goto woke_up; } static void process_one_work(struct worker *worker, struct work_struct *work) __releases(&pool->lock) __acquires(&pool->lock) { struct pool_workqueue *pwq = get_work_pwq(work); struct worker_pool *pool = worker->pool; bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE; unsigned long work_data; struct worker *collision; #ifdef CONFIG_LOCKDEP /* * It is permissible to free the struct work_struct from * inside the function that is called from it, this we need to * take into account for lockdep too. To avoid bogus \"held * lock freed\" warnings as well as problems when looking into * work->lockdep_map, make a copy and use that here. */ struct lockdep_map lockdep_map; lockdep_copy_map(&lockdep_map, &work->lockdep_map); #endif /* ensure we\'re on the correct CPU */ WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) && raw_smp_processor_id() != pool->cpu); /* * A single work shouldn\'t be executed concurrently by * multiple workers on a single cpu. Check whether anyone is * already processing the work. If so, defer the work to the * currently executing one. */ //检查work已经在其他worker上执行,则将work放入对应的worker->scheduled中延后执行 collision = find_worker_executing_work(pool, work); if (unlikely(collision)) { move_linked_works(work, &collision->scheduled, NULL); return; } //将要执行的work更新为当前要运行的work /* claim and dequeue */ debug_work_deactivate(work); hash_add(pool->busy_hash, &worker->hentry, (unsigned long)work); worker->current_work = work; worker->current_func = work->func; worker->current_pwq = pwq; work_data = *work_data_bits(work); worker->current_color = get_work_color(work_data); /* * Record wq name for cmdline and debug reporting, may get * overridden through set_worker_desc(). */ strscpy(worker->desc, pwq->wq->name, WORKER_DESC_LEN); list_del_init(&work->entry); /* * CPU intensive works don\'t participate in concurrency management. * They\'re the scheduler\'s responsibility. This takes @worker out * of concurrency management and the next code block will chain * execution of the pending work items. */ //如果当前cpu是密集型,则设置一个标志位 if (unlikely(cpu_intensive)) worker_set_flags(worker, WORKER_CPU_INTENSIVE); /* * Wake up another worker if necessary. The condition is always * false for normal per-cpu workers since nr_running would always * be >= 1 at this point. This is used to chain execution of the * pending work items for WORKER_NOT_RUNNING workers such as the * UNBOUND and CPU_INTENSIVE ones. */ //再次检查,是否需要wake up另外的worker,对于normal per-cpu worker总不会触发, //主要针对unbound 和 cpu 密集型 if (need_more_worker(pool)) wake_up_worker(pool); /* * Record the last pool and clear PENDING which should be the last * update to @work. Also, do this inside @pool->lock so that * PENDING and queued state changes happen together while IRQ is * disabled. */ set_work_pool_and_clear_pending(work, pool->id); raw_spin_unlock_irq(&pool->lock); lock_map_acquire(&pwq->wq->lockdep_map); lock_map_acquire(&lockdep_map); /* * Strictly speaking we should mark the invariant state without holding * any locks, that is, before these two lock_map_acquire()\'s. * * However, that would result in: * * A(W1) * WFC(C) * A(W1) * C(C) * * Which would create W1->C->W1 dependencies, even though there is no * actual deadlock possible. There are two solutions, using a * read-recursive acquire on the work(queue) \'locks\', but this will then * hit the lockdep limitation on recursive locks, or simply discard * these locks. * * AFAICT there is no possible deadlock scenario between the * flush_work() and complete() primitives (except for single-threaded * workqueues), so hiding them isn\'t a problem. */ lockdep_invariant_state(true); trace_workqueue_execute_start(work); //执行work函数 worker->current_func(work); /* * While we must be careful to not use \"work\" after this, the trace * point will only record its address. */ trace_workqueue_execute_end(work, worker->current_func); lock_map_release(&lockdep_map); lock_map_release(&pwq->wq->lockdep_map); if (unlikely(in_atomic() || lockdep_depth(current) > 0)) { pr_err(\"BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\\n\" \" last function: %ps\\n\", current->comm, preempt_count(), task_pid_nr(current), worker->current_func); debug_show_held_locks(current); dump_stack(); } /* * The following prevents a kworker from hogging CPU on !PREEMPTION * kernels, where a requeueing work item waiting for something to * happen could deadlock with stop_machine as such work item could * indefinitely requeue itself while all other CPUs are trapped in * stop_machine. At the same time, report a quiescent RCU state so * the same condition doesn\'t freeze RCU. */ cond_resched(); raw_spin_lock_irq(&pool->lock); /* clear cpu intensive status */ if (unlikely(cpu_intensive)) worker_clr_flags(worker, WORKER_CPU_INTENSIVE); /* tag the worker for identification in schedule() */ worker->last_func = worker->current_func; /* we\'re done with it, release */ hash_del(&worker->hentry); worker->current_work = NULL; worker->current_func = NULL; worker->current_pwq = NULL; worker->current_color = INT_MAX; pwq_dec_nr_in_flight(pwq, work_data); //pwq->nr_active--,判断是否要处理延时队列, //将此前pwq->inactive_works添加到pool->worklist } 总结:工作队列可以分为3种,Per CPU,Unbound,Ordered(属于unbound的一种) - Per CPU: create_workequeue(name),创建的WQ使用的线程池是使用Per - CPU静态定义的线程池,每个CPU有两个线程池,分别对于高优先级和低优先级。当触发queue_work时,如果没有指定cpu,则将work递交给当前运行cpu的线程池。该类型的wq,同一个work不能同时递交到多个线程池上运行,同时在一个线程池中也不能同时有多个worker来运行,work一旦选定worker将需要在该worker上运行结束;不同的work可以递交到不同的线程池,这样会在不同的cpu上并发执行。 - Unbound:create_freezable_workqueue(name),创建的WQ使用的线程池动态创建的,会优先选择当前代码运行的cpu,获取到对应的node节点,然后查询当前node节点上是否有线程池,如果有则递交到该线程池处理,如果没有则新建一个线程池。对应这种类型的wq是需要考虑功耗的,在选择cpu时会尽可能的选择当前运行的cpu,让已经休眠的cpu尽可能的保持休眠,毕竟cpu从休眠到唤醒会有更大功耗消耗。如果没有NUMA的架构,那么就只有一个dfl默认的线程池。 - Ordered:create_singlethread_workqueue(name),该类型的wq也是unbound的一种,只不过该工作队列只有一个线程池,这样可以保证工作队列的工作可以顺序运行,也就是说在该wq上的work是没法并发运行的,只能排队运行。 线程池动态管理 线程池将进行动态的增减管理worker,当创一个新的wq后,线程池至少会创建一个worker,该worker会挂到pool->idle_list链表上,当用户调用queue_work或者有另外一个worker处理work导致进入休眠将会触发worker从idle转到running状态进行处理work,当worker从idle进入running中状态时检查是否还有剩余的worker,如果没有了需要动态再创建要给worker,以备后续其他work的运行。处于running状态的worker如果在执行work回调函数时遇到阻塞该worker将进入到休眠状态,唤醒后再次返回到运行状态。当worker处理完线程池pool->worklist上的所有work时将会进入到idle状态,后台会有一个定时器检查处于idle状态的worker,如果大于1个就进行销毁。 下面是running->sleep触发另外一个worker运行和worker销毁的流程。
  • 软中断和tasklet

    软中断和tasklet

    Linux的中断分为上下部机制,上半部在中断上下文中关闭了本地CPU中断响应,下半部是在中断线程中处理。在Linux系统没有引入中断线程化机制之前,就已经出现了一些下半部的机制,如软中断SoftIRQ,Tasklet和workqueue。 SoftIRQ是预留给系统对时间要求比较严格进行使用的,Linux系统已经定义了软中断的类型,通常情况下用户部需要修改软中断的类型,对于用户来说可以使用tasklet机制。 enum { HI_SOFTIRQ=0, 最高优先级的软中断 TIMER_SOFTIRQ, Timer定时器软中断 NET_TX_SOFTIRQ, 网络发包软中断 NET_RX_SOFTIRQ, 网络收包软中断 BLOCK_SOFTIRQ, 块设备软中断 IRQ_POLL_SOFTIRQ, IO轮询的块设备软中断 TASKLET_SOFTIRQ, tasklet软中断 SCHED_SOFTIRQ, 进程调度以及负载均衡软中断 HRTIMER_SOFTIRQ, 高精度定时器软中断 RCU_SOFTIRQ, RCU服务软中断 NR_SOFTIRQS }; 抢占计数 在讨论softirq前,我们先来讨论下抢占计数,这与softirq有密切的关系。Linux配置打开了CONFIG_PREEMPT表示允许高优先级的任务抢占低优先级任务,但是在spin lock,中断/软中断上下文中依旧不允许抢占的。在linux系统中使用了一个Per-CPU的32位变量来标识一些特殊场景,如下。 PREEMPT_BITS:用于记录禁止抢占计数,当调用preempt_enable/preempt_disable会进行减加操作,直到count位0才能允许开启抢占。上述还包括spin lock,read lock等都会禁止抢占。 SOFTIRQ_BITS:软中断计数区域,进入软中断会调用local_bh_disble计数+1,退出软中断调用local_bh_enable计数-1,当SOFTIRQ_BITS为正数就表明处于softirq上下文中,in_softirq为真。其中第8位表示正在处理软中断。 HARDIRQ_BITS:硬件中断计数区,进入+1,退出时-1,当HARDIRQ_BITS为正数时表明处于hardirq上下文中,in_hardirq为真。 MNI_BITS:置位表示进入MNI中断上下文。 #define nmi_count() (preempt_count() & NMI_MASK) #define hardirq_count() (preempt_count() & HARDIRQ_MASK) #ifdef CONFIG_PREEMPT_RT # define softirq_count() (current->softirq_disable_cnt & SOFTIRQ_MASK) #else # define softirq_count() (preempt_count() & SOFTIRQ_MASK) #endif #define irq_count() (nmi_count() | hardirq_count() | softirq_count()) /* * Macros to retrieve the current execution context: * * in_nmi() - We're in NMI context * in_hardirq() - We're in hard IRQ context * in_serving_softirq() - We're in softirq context * in_task() - We're in task context */ #define in_nmi() (nmi_count()) #define in_hardirq() (hardirq_count()) #define in_serving_softirq() (softirq_count() & SOFTIRQ_OFFSET) #define in_task() (!(in_nmi() | in_hardirq() | in_serving_softirq())) /* * The following macros are deprecated and should not be used in new code: * in_irq() - Obsolete version of in_hardirq() * in_softirq() - We have BH disabled, or are processing softirqs * in_interrupt() - We're in NMI,IRQ,SoftIRQ context or have BH disabled */ #define in_irq() (hardirq_count()) #define in_softirq() (softirq_count()) #define in_interrupt() (irq_count()) in_irq: 说明HARDIRQ_BITS位有置起,说明当前有正在处理的中断handler(top half),只要hardirq_count大于0,就说明是IRQ Context。 in_softirq:说明SOFTIRQ_BITS位有置起,但是也不能完全说是当前正在执行softirq,其稍微有点特殊。当softirq正在执行的时候,softirq count会增加,那是处于softirq context没有错,但是再其他场景下也会将softirq count增加,比如在进程上下文中出于对临界区的包含,会调用local_bh_disable/enable保护临界区,那么这也会置起SOFTIRQ_BITS,但是并没有在执行softirq,因此对于softirq还是使用in_serving_softirq来判断是否softirq正在处理。 当MNI_BITS|HARDIRQ_BITS|SOFTIRQ_BITS置位表示处于中断上下文中,in_interrupt为真,当使能CONFIG_PREEMPT后,中断返回会检测这个preempt_count变量,当该值如果为0时才能允许被抢占。 注册软中断 系统中为每个CPU都创建了一个中断线程用于处理软中断。 DEFINE_PER_CPU(struct task_struct *, ksoftirqd); EXPORT_PER_CPU_SYMBOL_GPL(ksoftirqd); static struct smp_hotplug_thread softirq_threads = { .store = &ksoftirqd, .thread_should_run = ksoftirqd_should_run, .thread_fn = run_ksoftirqd, .thread_comm = "ksoftirqd/%u", }; static __init int spawn_ksoftirqd(void) { cpuhp_setup_state_nocalls(CPUHP_SOFTIRQ_DEAD, "softirq:dead", NULL, takeover_tasklets); BUG_ON(smpboot_register_percpu_thread(&softirq_threads)); //为每个CPU创建一个softirq处理线程 return 0; } early_initcall(spawn_ksoftirqd); struct softirq_action { void (*action)(struct softirq_action *); }; static struct softirq_action softirq_vec[NR_SOFTIRQS] 系统中定义了一个全局struct softirq_action变量用于存储各类型中断的回调函数,系统通过open_softirq函数来注册回调函数。 void open_softirq(int nr, void (*action)(struct softirq_action *)) { softirq_vec[nr].action = action; } 在linux系统中为每个CPU都维护了一个softirq的状态信息。 typedef struct { unsigned int __softirq_pending; #ifdef ARCH_WANTS_NMI_IRQSTAT unsigned int __nmi_count; #endif } ____cacheline_aligned irq_cpustat_t; DECLARE_PER_CPU_ALIGNED(irq_cpustat_t, irq_stat); 触发softirq 从上图可以看出触发softirq主要有两个函数raise_softirq和raise_softirq_irqoff。raise_softirq会关闭本地中断响应,而raise_softirq_irqoff不会。 raise_softirq是用于进程上下文触发软中断的,而在进程上下文关闭本地cpu的中断响应是为了保护临界访问,软中断的触发实际是设置软中断的pending位,而只需要关闭本地中断响应是因为每个cpu都维护了一个软中断的pending位。 raise_softirq_irqoff是用于中断上下文触发中断的,实际上大部分的软中断都是在中断上下文触发的,因为本身就处于中断上下文了,本地中断响应已经关闭了,因此不需要再支持local_irq_save。 软中断处理过程中,调用__raise_softirq_irqoff设置软中断的pengding位,这样再执行软中断是会进行检测该位,如果使能才会执行软中断。 在raise_softirq_irqoff中还会进行判断是否处于中断上下文中且是否有软中断置位,对于处于进程上下文触发的软中断,那么可以直接激活软中断线程进程处理软中断。而处于中断上下文中,则什么都不用做,因为在中断退出时才会检测软中断进行执行软中断,因此对于处于中断上下文触发的软中断,仅仅是设置软中断的pending位。 执行softirq 软中断执行时机可以分为3种情况,分别是在中断退出时检测是否有软中断执行、进程上下文中主动执行、在spin_unlock_bh(实际调用__local_bh_enable_ip)。 先来看一下执行中断退出执行软中断的场景,如上图进程被中断打断后进入到中断上下文中,进入中断后cpu硬件会自动关闭cpu本地中断响应,处理中断完成后在执行irq_exit中断退出时,当检测到有软中断pending时执行软中断;如果软中断是在中断上下文执行时,在软中断处理中会调用local_irq_enable打开CPU本地中断响应再处理软中断程序,如果是触发的软中断线程,硬中断已经完成退出也会使能本地中断。因此在软中断执行过程中打开了中断响应,所以可能会再次进入硬中断上下文。 再来看上面这张图,在一个task中处理一个变量此时被硬件中断打断进行中断处理函数,在中断处理快结束时如果有软中断pending将会先处理软中断,如果软中断中也访问了该变量,那么就出现竞态异常,因此为了处理进程和软中断的竞态,调用spin_lock_bh和spin_unlock_bh进行保护,在硬件中断处理完要进入软中断将会被禁止,硬件中断会被直接退出,继而task可以继续运行,当task再执行spin_unlock_bh时会触发执行软中断。另外如果软中断处理函数中的竞态可能在多核直接发生,为了保护多核的临界处理在软中断中只需要调用spin_lock和spin_unlock即可,不需要调用spin_lock_bh和spin_unlock_bh,因为每个cpu上只有一个软中断可以运行不需要做软中断之间的临界保护。总结就是在进程上下文中要避免软中断和多核的竞态保护就调用spin_lock_bh和spin_unlock_bh,软中断中避免多核的竞态保护就调用spin_lock和spin_unlock即可。 Tasklet Tasklet是软中断类型的一种,便于用户实现软中断。 struct tasklet_struct { struct tasklet_struct *next; 多个tasklet串成一个链表 unsigned long state; 标记tasklet的状态 atomic_t count; 正数表示本地已经有运行的tasklet,只有为0才能运行。 bool use_callback; union { void (*func)(unsigned long data); void (*callback)(struct tasklet_struct *t); }; tasklet的处理回调函数 unsigned long data; 传参 }; Tasklet是softirq的一种,系统中分配了两个标志可用于tasklet软中断,分别是高优先级的HI_SOFTIRQ和普通优先级的TASKLET_SOFTIRQ,系统中每个CPU对应维护两个tasklet链表,一个HI_SOFTIRQ类型的链表和一个TASKLET_SOFTIRQ类型的链表,用于管理维护用户注册的tasklet。在进行处理软中断是,会优先级处理HI_SOFTIRQ类型的tasklet。 /* * Tasklets */ struct tasklet_head { struct tasklet_struct *head; struct tasklet_struct **tail; }; static DEFINE_PER_CPU(struct tasklet_head, tasklet_vec); static DEFINE_PER_CPU(struct tasklet_head, tasklet_hi_vec); Softirq在初始化时会为tasklet注册全局的软中断回调函数。 void __init softirq_init(void) { int cpu; for_each_possible_cpu(cpu) { per_cpu(tasklet_vec, cpu).tail = &per_cpu(tasklet_vec, cpu).head; per_cpu(tasklet_hi_vec, cpu).tail = &per_cpu(tasklet_hi_vec, cpu).head; } open_softirq(TASKLET_SOFTIRQ, tasklet_action); open_softirq(HI_SOFTIRQ, tasklet_hi_action); } tasklet的创建 用户可以使用下面两个宏来定义一个tasklet,区别一个已经处于enable另外一个处于disable。 #define DECLARE_TASKLET(name, _callback) \\ struct tasklet_struct name = { \\ .count = ATOMIC_INIT(0), \\ .callback = _callback, \\ .use_callback = true, \\ } #define DECLARE_TASKLET_DISABLED(name, _callback) \\ struct tasklet_struct name = { \\ .count = ATOMIC_INIT(1), \\ .callback = _callback, \\ .use_callback = true, \\ } 如:static DECLARE_TASKLET(fst_tx_task, fst_process_tx_work_q); 除了使用宏来定义外,还可以使用tasklet_init来动态初始化分配。 void tasklet_init(struct tasklet_struct *t, void (*func)(unsigned long), unsigned long data) { t->next = NULL; t->state = 0; atomic_set(&t->count, 0); t->func = func; t->use_callback = false; t->data = data; } EXPORT_SYMBOL(tasklet_init); 触发执行tasklet 触发tasklet得到执行,也比较简单,上面以普通的tasklet为例。用户可调用tasklet_schedule,该函数将tasklet插入到链表中然后调用raise_softirq_irqoff触发软中断,最终触发了softirq,关于softirq的运行在上一章节中已经描述,最后softirq运行tasklet_action,在该函数中进行while遍历链表执行对应的tasklet回调函数。
  • Linux中断实现

    Linux中断实现

    interrupt controller初始化 设备树中对gic-v3的描述如下,其中interrupt-controller标识了该设备是一个中断控制器。 interrupt-controller@3400000 { compatible = "arm,gic-v3"; #interrupt-cells = <0x03>; #address-cells = <0x00>; interrupt-controller; reg = <0x00 0x3400000 0x00 0x10000 0x00 0x3460000 0x00 0xff004>; interrupt-parent = <0x18>; phandle = <0x18>; }; compatible:中断设备节点的属性别名 Interrupt-cells:用来描述子节点interrupts属性的值,为3表明interrupts有3个32bits整数来描述。 Interrupt-parent:标识此设备节点属于那一个中断控制器,gic可以设置为自己。 驱动匹配compatible的定义在drivers/irqchip/irq-gic-v3.c中,如下: IRQCHIP_DECLARE(gic_v3, "arm,gic-v3", gic_of_init); 展开后为: #define _OF_DECLARE(table, name, compat, fn, fn_type) \\ static const struct of_device_id __of_table_gic_v3 \\ __used __section("__" #table "_of_table") \\ __aligned(__alignof__(struct of_device_id)) \\ = { .compatible = "arm,gic-v3", \\ .data = gic_of_init } 上图是中断控制器驱动匹配设备树的调用路径,调用到of_irq_init函数,传入的参数是__irqchip_of_table,该table中定义了所有interrupt controller的compatible信息,在一个完整的系统中不仅只有一个gic控制器,还包括一些级联等其他控制器,通过宏定义IRQCHIP_DECLARE的方式会集中在__irqchip_of_table这个段。 IRQ domain 硬件中断号:1.1和1.2章节控制器为每个硬件中断源分配了一个唯一编号,用于区分不同的中断源。 软件中断号:Linux系统在处理中断过程中使用的编号,也称为虚拟中断号。 为什么要进行中断映射? 上图是一个HW interrupt ID 到Linux IRQ的映射关系图示例。在Linux软件处理过程中,不应该关注中断来自那个中断来源,尤其在中断系统结构中,会出现级联的情况,中断控制器的中断源是另外一个中断控制器的输出,不同的中断控制器会出现重复中断源编号。使用级联这样做的好处就是扩展的中断请求的数量,同时对中断源可以按照控制器分类。如GPIO类型中断控制器,对于Port A(GPIO A1~GPIO A20),这20个GPIO组成的一个中断控制器给到主控制器,也就是说GPIO A1~GPIO A20给到上一级的控制是一个中断信号,系统收到这个信号需要再去读取GPIO Port A的控制器相关掩码进一步判断到是那个GPIO。 Linux系统为了处理硬件中断到虚拟中断的映射关系,引入了Linux IRQ domain。每个中断控制器对应一个IRQ domain,在2.1章节中IRQ domain负责将硬件的中断编号 HW interrupt ID与Linux系统中的IRQ number进行映射。目前IRQ domain支持2中映射方式:linear map,tree map,no map Linear map:系统中维护一个数组,硬件中断号的就是数组的索引,取值就是对应的IRQ number。 Tree map:当HW interrupt数量比较多时,使用Linear map会消耗比较大的内存,选择用树的方式进行映射可以节约内存。 No map:有些控制器可以支持通过些寄存器配置HW interrupt id而不是由物理连接决定,那么这种情况下就不需要进行映射了。 上图是设备树中级联描述关系,pinctrl设备虽然没有描述interrupt-parent节点,那么就默认使用开头定义的interrupt-parent = ,因此pinctrl节点的父中断控制器是interrupt-controller@0,而interrupt-contrller@0指明了interrupt-parent=,所有其父中断控制器是interrupt-controller@3400000即GIC,Timer_arch的interrupt-parent直接指向的是GIC。 上图为struct irq_domain的数据结构,下面是对关键变量的说明。 - link:所有创建的irq domain链接到全局链表irq_domain_list中。 - ops: 对应irq domain操作使用方法集合。 - revmap_size:映射表的大小。 - revmap_tree:Radix Tree映射的根节点。 - revmap[]:反向映射表,将硬件中断号映射回中断数据结构。 root domain创建(GIC) 在interrupt controller初始化章节,在of_irq_init函数中会调用for_each_matching_node进行遍历设备设备树是否与__irqchip_of_table中的定义的匹配,如果匹配并且是interrupt-controller则获取对应的data(即回调函数),并将其添加intc_desc_list链表中。最后在逐一遍历调用对应的回调函数,gic控制器就对应调用的是gic_of_init。 gic控制器初始化调用gic_of_init函数,在该函数中较关键的是调用gic_init_bases进行了一系列的初始化。在linux系统中,定义了一个struct gic_chip_data gic_data[CONFIG_ARM_GIC_MAX_NR] __read_mostly的全局变量,用于管理gic中断控制器。在gic_init_bases创建了gic irq domain(下一章节描述),设置中断函数的入口gic_handler_irq等等。 在interrupt controller初始化章节,gic_init_bases调用irq_domain_crate_tree创建了一个irq domain,该函数调用__irq_domain_add添加一个irq domain。 首先分配了一个struct domain数据结构并初始化相关成员,其中domain->revmap_tree树用于hwirq和virq直接的映射;接着填充domain的操作函数结合ops;再调用irq_domain_check_hierarchy检测domain是否为级联(有子中断控制器连接?),判断的方式是检测ops->alloc是否为空,这里明显是不为空,所以gic是hierarchy,这里决定着后续在映射中断号时的区别(见2.2.2),最后将domain添加到全局链表irq_domain_list中。 中断号映射(GIC) 中断号的映射就是将硬件中断号与Linux系统中的中断号建立起映射关系。在linux系统中,每个linux中断号都对应struct irq_desc实体,其与各关键数据结构的关系如上图。 Linux系统中大部分在DTS描述的节点是可以转换为platform_device的,如果其节点在DTS中指定了中断属性,那么可以在DTS解析设备树的时候可以获取到设备的中断信息,进而建立起映射关系。而对于如果节点没有转换为platform_device信息的,如I2C,SPI设备,那么需要在驱动中主动调用of_irq_get函数去实现,如下。 static int spi_probe(struct device *dev) { const struct spi_driver *sdrv = to_spi_driver(dev->driver); struct spi_device *spi = to_spi_device(dev); int ret; ret = of_clk_set_defaults(dev->of_node, false); if (ret) return ret; if (dev->of_node) { spi->irq = of_irq_get(dev->of_node, 0); if (spi->irq == -EPROBE_DEFER) return -EPROBE_DEFER; if (spi->irq < 0) spi->irq = 0; } ret = dev_pm_domain_attach(dev, true); if (ret) return ret; if (sdrv->probe) { ret = sdrv->probe(spi); if (ret) dev_pm_domain_detach(dev, true); } return ret; } 下面描述从DTS中解析中断属性建立起映射的过程,下面是DTS中设备对中断相关的描述。 xxx@xxxxx { ... interrupt-parent = <0x18>; interrupts = <0x00 0x37 0x04>; ... }; interrupt-parent:指向其父节点,表示中断信号由那个中断控制器来处理,这个属性取值是一个整数或引用到其他节点的phandle。也就是说该字段指明了所属的中断控制器。 interrupts:指定中断的详细信息,<中断类型,中断号,方式>。 内核初始化阶段,会解析DTS中设备,进而解析设备中中断信息,首先调用of_irq_count函数统计设备节点dev中的中断数量num_irq(大多数的设备只有一个中断),接着根据num_irq中断数量调用of_irq_to_resource解析节点的中断信息。在of_irq_to_resource函数中调用of_irq_get函数获取中断号IRQ。 在of_irq_get函数中调用of_irq_parse_one解析中断的描述信息,包括DTS中interrupt-parent和interruts的信息,包含了该中断所属的父节点,中断类型,中断号,中断触发方式等。 获取到中断信息后,调用irq_domain_translate将DTS中描述的中断号(interrupts的第二个cells)转为hwiq,转换方法为如果是SPI类型的中断+32,如果是PPI类型的中断+16。见1.1和1.2章节中,PPI和SGI类型的中断占用了前32号。 获取到hwiq后先调用irq_find_mapping查询是否能获取到virq,如果获取到了则结束整个过程。如果没有获取到中断号,先进行判断domain是否为hierarchy,如果是级联的方式则调用irq_domain_alloc_irqs来进行映射,否则调用irq_carete_mapping来进行映射,对于gic来说这里调用的是irq_domain_alloc_irqs。 无中断控制器映射 中断line接入到的控制器是非级联的,则调用irq_create_mapping进行映射,主要分为两部分,创建desc和建立hwirq和virq的映射。 调用irq_domain_alloc_decs分配一个struct desc描述符,一个virq对应一个struct desc实体。struct desc实体通过virq作为键值插入到irq_desc_tree中,这样就建立起virq与desc直接的联系。 调用irq_domain_associate建立hwirq和virq的映射,会根据hwirq的值来进行判断,如果hwirq比较小采用线性映射的方式即revmap[hwirq]=irq_data,irq_data中存储了irq,hwirq,domain等信息。如果hwirq较大,则使用树映射。 有中断控制器的映射 级联中断控制器通常是中断输入接了有下一级的中断控制器,一般GIC都是级联中断控制器,从上图函数调用关系(黄色部分是与非级联的差异)可以看出级联中断控制器与非级联中断控制器的主要区别是子中断控制器需要为父中断控制器分配irq_data,由其irq_data->parent_data指向其值(有什么用?),调用irq_domain_alloc_irqs_hierarchy创建中断控制器的级联信息(见下)。 以gic_irq_domain_alloc为例,主要的作用先调用gic_irq_domain_translate进行转译hwirq(看起来与irq_domain_translate重复了),接着调用gic_irq_domain_map完成domain信息的设置,根据中断类型设置中断的入口函数,SPIs类型的入口函数为handle_fasteoi_irq,注意2.1章节gic_handle_irq是所有中断的入口,最后再根据中断类型进行分类下陷到各类型入口,之间的关系是gic_handle_irq->......->handle_fasteoi_irq。 小结 (1)解析DTS中断信息,包括父节点,中断号,中断类型,触发类型等。 (2)从allocated_irq位图中获取一个空闲的IRQ中断号。 (3)为IRQ中断号分配一个struct irq_desc实体,并使用IRQ作为键值插入到irq_desc_tree中,以此就建立了IRQ与desc之间的联系。 (4)设置desc->handle_irq的处理函数。 (5)建立IRQ中断与hwirq中断的联系,分为直接映射和树映射。 中断注册 常用申请中断的几个函数如下。 int request_irq(unsigned int irq,irq_handler_t handler,unsigned long flags,const char *name,void *dev) int devm_request_irq(struct device *dev,unsigned int irq,irq_handler_t handler,unsigned long irqflags,const char *devname,void *dev_id) int request_threaded_irq(unsigned int irq,irq_handler_t handler,irq_handler_t thread_fn,unsigned long flags,const char *name,void *dev); 中断注册常用的函数是request_irq和request_threaded_irq,request_irq最终也会调用到request_theaded_irq,如上图将中断注册的重要几个阶段进行了描述。 (1)首先调用irq_to_desc获取中断描述符irq_desc,传入的参数是linux 的irq number,前面我们提到每个linux irq都对应一个irq_desc,其中断注册的数据信息将会该结构进行导出。 (2)其次会对用户设置的中断处理函数handler和thread_fn进行判断,如果hanlder传入为空,则设置默认为irq_default_primary_handler,也就是说当中断处理第三级函数将会调用该函数,该函数如下仅仅是返回IRQ_WAKE_THRAED,该返回将会唤醒创建的中断线程,在下章节我们再详细描述。 /* * Default primary interrupt handler for threaded interrupts. Is * assigned as primary handler when request_threaded_irq is called * with handler == NULL. Useful for oneshot interrupts. */ static irqreturn_t irq_default_primary_handler(int irq, void *dev_id) { return IRQ_WAKE_THREAD; } (3)接着分配struct irqaction实体,会将用户注册的中断回调等信息进行填充。在中断系统中,会存在irq line不够用的情况,那么就让外设共享一个irq line,也就是说一个irq number对应多个外设,每个外设都对应一个irqaction实体,同故宫irqaction->next链接起来。在中断处理时会遍历这些irqaction进行处理。结合目前这种场景应该时比较少的? (4)最后调用__setup_irq进行设置,这部分又可以再分为4个重要阶段。 首先判断是否设置了中断嵌套,如果设置了中断嵌套则将handler设置为irq_nested_primary_handler,本身linux是不支持中断嵌套的,因此这种场景用途是如何,目前还未遇到? 其次判断是否设置了IRQ_NOTHREAD标志,如果没有说明可以进行强制中断线程化。调用irq_setup_forced_threading进行处理,进入函数如果用户handler为irq_default_primary_handler则直接返回,因为中断线程化实际上就是在中断上下文调用irq_default_primary_handler返回IRQ_WAKE_THREAD激活中断线程,通常handler如果被用设置为NULL则会被设置为irq_default_primary_handler。如果不是上述场景,则接着往下,当handler和thread_fn都不为空,那么则相当于需要创建两个中断线程,一个中断线程用于处理handler的回调,另外一个中断线程用于处理thread_fn,因此需要再申请一个secondary action,在实际的中断处理函数中,第一个中断线程函数会回调handler,然后在激活第二线程处理thread_fn,可以看到new->thread_fn=new->handler表示在线程中回调用户注册的handler函数,而实际的new->handler则为irq_default_primary_handler。最后再调用setup_irq_thread创建中断线程。 接着获取desc->action,如果该值不为空,说明已经注册过该函数,对应中断共享的场景,需要区分处理了那些外设,因此用thread_mask来进行标识。最后*old_ptr=new用于将当前的action放入链表。 最后调用wake_up_and_wait_for_irq_thread_ready来激活中断线程,等待中断函数的触发。 小结,整个中断注册过程中重点需要关注的是否支持强制中断线程化,如果没有强制启动中断线程化,那么用户handler则是中断的顶半部(在中断上下文中处理),而thread_fn则是在线程中处理的。当强制中断线程化的时候,中断的顶半部则为irq_default_primary_handler,该函数直接激活中断线程,用户注册的handler将会变成底半步(在线程中处理用户注册的中断回调),在中断线程中回调处理用户注册的中断服务函数。而当用户同时设置了handler和thread_fn,会创建两个中断线程,第一个线程用于回调处理handler,第二个线程用于处理thread_fn,这种场景应该是比较少的,目前还没有遇到,可以结合下一章节中断处理流程来进行分析理解。 中断处理 中断的处理流程这里分为3级,第一级中断是所有中断的入口gic_handle_irq,对于gic来说中断进行了分配包括PPI,SPI等,每一类注册的中断函数是不一样,因此根据每类注册的中断函数irq desc->handler下陷到第二级中断处理。上面是以SPI类型的中断为例,入口函数为handle_fasteoi_irq,在二级中断处理流程中会最终调用到用户注册的中断回调函数action->handler,下陷到第三级的中断处理。 CPU响应GIC的中断后,会立即关闭本地CPU的中断响应(屏蔽掉CPSR相关的位),只有等中断处理结束后在再次打开才能再次响应中断,这个期间我们称为处理中断的上下文,中断的上下文是禁止调用睡眠的。 上图的第一级和第二级是处于中断上下文中的,而第三级如果没有启用线程化那么也是处于中断上下文(第三级的函数直接在第二级的回调函数中调用),如果启动中断线程化处理,那么用户的中断回调函数即在任务中回调,即脱离了中断上下文。 在中断上下文中处理我们称之为中断上半部(顶半部),在中断线程中处理我们称为下半部(底半部)。因为上半部本地CPU已经关了本地中断响应,无法响应其他中断,需要等待该中断处理完成(发送eoi指令),因此上半部中对于程序的要求比较高,执行程序不能太长,更不能执行睡眠函数,这样会影响其他中断的响应,因此对于程序处理时间太长或有睡眠要求的都启用中断线程化来进行处理,即在下半部运行。 在第三级中启用了中断线程化,需要注意的是oneshot处理,该机制是为了解决中断洪泛引入的。在启用中断线程化是,第一级和第二级中断处理完成后即退出了退出了中断上下文,本地CPU打开了中断响应(上图handle_fasteoi_irq -> chip->irq_eoi),那么之后cpu即可再次响应中断,如果该中断来得非常快,尤其是电平类触发的外设再没有读写数据或清除外设标志时,电平时不会消除的,这就会导致该中断还在线程中没处理完,下一个中断又触发了导致中断洪泛,为了解决这个问题通过oneshot标志来进行判断,在第二级中断处理时如果检测到了oneshot标志,那么就先调用mask_irq设置中断控制器不再响应该中断,这样即使第二级中断处理结束退出打开本地CPU中断响应该中断也不会再触发,因为中断控制器的中断影响被屏蔽了,最后等到该中断在线程中处理完成之后在使能该中断(在irq_finalize_oneshot中unmask_irq)。 级联中断控制器 此前描述了IRQ domain,重点说明了root domain(GIC)创建流程,在实际的架构中,还存在着级联的中断控制器,根中断控制器连接子中断控制器,子中断控制器的输出接入到根中断控制器的输入。根据级联的结构分为两种情况N对1和1对1。 级联N对1是子中断控制器的多个中断共用一个中断输出,如上图的Port B、Port C及Port D各自对应一个中断输出接入到GIC的中断输入,而Port B、Port C及PortD各自对应是Port B1~Port Bn、Port C1~Port Cn及Port D1~Port Dn中断输入,在中断处理上以Port B1为例,linux系统先处理irq number 93的中断(Port B 69的映射),在93号中断处理函数中接着再查询具体是Port B上的那一个端口触发的中断,查询到是PB1且PB1映射号是IRQ 96,则再处理IRQ 96的中断。 级联1对1的子中断控制器每一个中断输入直接对应到根中断控制器的输入,对于这种结构,中断处理流程就与非级联的没有多大区别了,本身就是一对一的关系。 本小结pinctrl为例来说明子中断控制器的创建过程和中断处理流程的差异。Pinctrl是一个中断控制器,对应的就是级联N对1的这种结构。 child domain的创建 pio: pinctrl@xxxx { #address-cells = <1>; compatible = "xxx,xxx-pinctrl"; reg = <0x0 0x02000000 0x0 0x800>; interrupts = <GIC_SPI 69 IRQ_TYPE_LEVEL_HIGH>, /* GPIOB */ <GIC_SPI 71 IRQ_TYPE_LEVEL_HIGH>, /* GPIOC */ <GIC_SPI 73 IRQ_TYPE_LEVEL_HIGH>, /* GPIOD */ <GIC_SPI 75 IRQ_TYPE_LEVEL_HIGH>, /* GPIOE */ <GIC_SPI 77 IRQ_TYPE_LEVEL_HIGH>, /* GPIOF */ <GIC_SPI 79 IRQ_TYPE_LEVEL_HIGH>, /* GPIOG */ <GIC_SPI 81 IRQ_TYPE_LEVEL_HIGH>, /* GPIOH */ <GIC_SPI 83 IRQ_TYPE_LEVEL_HIGH>, /* GPIOI */ <GIC_SPI 85 IRQ_TYPE_LEVEL_HIGH>, /* GPIOJ */ <GIC_SPI 140 IRQ_TYPE_LEVEL_HIGH>; /* GPIOK */ clocks = <&ccu CLK_APB1>, <&dcxo24M>, <&rtc_ccu CLK_OSC32K>; clock-names = "apb", "hosc", "losc"; gpio-controller; #gpio-cells = <3>; interrupt-controller; interrupt-parent = <&gic>; #interrupt-cells = <3>; } Pinctrl的设备树描述如上,interrupt-controller标识了其是一个中断控制器,interrupt-parent为gic。interrupts字段标识了各个bank连接到GIC上的中断号,一共有10个bank,对应的就是gic控制器10个中断输入,在kernel_init阶段会遍历解析interrupts字段逐一建立GIC的中断号映射(2.3章节中的描述),所以Pinctrl的bank的中断映射已经建立完成了,如下图中69->93,71->94,73->95的映射。gpio-controller标识了该节点还是一个gpio的控制器,gpio-cells标识引用的描述方法,这里有3个字段,如下是一个wlan设备节点对GPIO控制器的引用,wlan_regon = <>对应的就是gpio-cells的描述,&pio表示引用了pio这个gpio控制器,PB1表示使用的该GPIO控制器上的GPIO号,GPIO_ACTIVE_HIGH表示该GPIO号默认的电平状态。 wlan { compatible = "xxx,xxx-wlan"; wlan_en = <&pio PB 1 GPIO_ACTIVE_HIGH>; }; GIC的驱动匹配compatible的定义使用IRQCHIP_DECLARE(gic_v3, \"arm,gic-v3\", gic_of_init)来实现,在start_kernel的时候进行解析,而pinctrl的驱动则是由各厂商来进行实现,下面是示例。 static int xxx_pinctrl_probe(struct platform_device *pdev) { int ret; return xxx_bsp_pinctrl_init(pdev, &xxxx_pinctrl_data); } static struct of_device_id xxx_pinctrl_match[] = { { .compatible = "xxx,xxx-pinctrl", }, //与设备树pio: pinctrl@xxxx 对应 {} }; MODULE_DEVICE_TABLE(of, xxx_pinctrl_match); static struct platform_driver xxx_pinctrl_driver = { .probe = xxx_pinctrl_probe, .driver = { .name = "xxx-pinctrl", .pm = PINCTRL_PM_OPS, .of_match_table = xxx_pinctrl_match, }, }; static int __init xxx_pio_init(void) { return platform_driver_register(&xxx_pinctrl_driver); } fs_initcall(xxx_pio_init); Xxx_pio_init注册一个pinctrl的驱动,驱动的描述在xxx_pinctrl_driver中,其中保活了of_device_id的信息将与设备树进行匹配,最后调用xxx_pinctrl_probe函数进行初始化。 上图只列出pinctrl与中断相关的关键流程,下面进行简要说明。 (1)分配一个struct gpio_chip结构,该结构中存储了gpio的操作函数集合。 (2)分配一个数组pctl->irq,用于存储每个bank对应的irq number。 (3)调用irq_domain_add_linear为pinctrl创建一个中断domain,每个中断控制器都有一个domain。 (4)调用platform_get_irq获取每个bank的irq number(virq),对应gpio控制器中interrupts的描述,该函数会调用of_irq_get来获取中断号,of_irq_get中会判断hwirq是否已经建立了到virq的映射,在kernel_init中会解析DTS所有的interrupts统一建立好根中断控制器的中断号映射关系,因此这里调用of_irq_get中调用virq= irq_find_mapping(domain, hwirq)即可查询到hwirq对应的virq。 (5)遍历每个bank,调用irq_create_mapping将bank上输入的gpio建立起中断映射,即挂接在每个bank上的gpio都用于一个irq desc描述符。 (6)为每个gpio对应的irq设置irq chip和desc->handler,在中断注册请求的时候还会调用.irq_set_type进行设置一次,这个流程看起来有点多余。 (7)调用irq_set_chained_handler_and_data为每个bank的irq注册回调函数,注册了该回调函数,对于GPIO的中断就于直接连接到GIC的中断不同,GPIO的中断会先调用irq_set_chained_handler_and_data注册的中断xxx_pinctrl_irq_handler,而直接连接到GIC的中断调用handle_fasteoi_irq(见2.5章节)。 小结,Pinctrl的初始化中创建了一个domain,先是获取到bank的irq,在建立起bank外接GPIO的中断号映射得到irq,最后会为bank的irq和gpio的irq分别注册回调函数,bank是直接连接到gic上的,所以在中断处理时系统首先响应bank对应的irq中断处理函数,继而再bank 对应的irq处理函数中查询具体是挂在该bank上的那一个gpio,进行处理该gpio的irq中断,可以结合下一章节的中断处理流程就比较清楚初始化的流程。 中断处理流程 与未级联的处理流程相比,多了一级。在第二级中插入了一级响应,第一级响应后下陷不再是调用到handle_fasteoi_irq,而是调用pinctrl初始化是调用irq_set_chained_handler_and_data注册的中断处理函数,因为pinctrl中断控制器是一个bank对应一个gic的中断输入,而bank上又外挂了多个gpio,即多个gpio对应的是一个gic的输入,因此需要插入一级先处理bank的中断响应,在该中断响应中识别出bank上的那一个gpio,获取到该gpio的irq再进行处理第三级和第四级。 对应上述的第1、2、3级是处于中断上下文中。
  • 中断基本概念

    中断基本概念

    在现代嵌入式系统中,处理器上会挂接很多个外设,CPU在执行任务的时候,可能会同时由多个中断发生,那么中断必要要进行响应处理并维护一个队列一一运行,这样自然会影响CPU的效率,为了让CPU专注于实际运算,中断控制器孕育而生,各中断信号源都先交给中断控制器处理,由中断控制器进行管理,同时接受多个中断请求并进行优先级判断,然后选中一个最高优先级的请求送个CPU进行处理,在CPU响应处理中断时,中断控制器仍然可以响应外部中期的请求,在多核系统中,中断控制器还可以承担路由的作用,将某些中断送到指定CPU进行处理,以达到中断处理的负载均衡。在X86架构中断控制器称为APIC(Advanced Programmable Interrupt Controller),ARM架构的中断控制器则称为GIC(Generic Interrupt Controller),本文主要探讨的是GIC模块。 ARM架构的GIC目前已经发展了多个版本,不同的GIC IP使用于不同的ARM架构,下面是不同版本直接的关键features。 Version Key features Architecture Tpically used with GICv1 Support for up to eight PEs.Support for up to 1020 interrupt IDs.Support for two Security states. A5/A9/R7 GICv2 All key features of GICv1.Support for virtualization. GIC400 A7/A15/A53/A57 GICv3 All key features of GICv2.Support for more than eight PEs.Support for message-based interrupts.Support for more than 1020 interrupt IDs.System register access to the CPU Interface registers.An enhanced security model, separating Secure and Non-secure Group 1 interrupts. GIC500 A72/A53/A57 GICv4 All key features of GICv3 and.Direct injection of virtual interrupts GIC600 A53/A57/A72 GICv2架构 GICv2将中断源类型进行分类。 类型 中断号范围 说明 SGI ID0~ID15 Software Generated Interrupt,软件触发中断,通常用于多核之间通信。 PPI ID16~ID31 Pivate Peripheral Interrupt,每个处理核私有中断,如timer,hw fault等。 SPI ID32~ID1019 Shared Peripheral Interrupt,公用外设中断,最多支持998个。 中断源类型的划分是输入给GIC的角度进行的,而GIC输出到CPU只有IRQ(Interrupt Request)和FIQ(Fast Interupt Request),FIQ主要用于安全OS,不适用于Linux内核本文也暂不讨论。 GIC在GICv2中,有两大模块组成,distributor和interface。 Distributor:实现中断的分发,从上图可以看出,SPIs Interrupt ID 32~1019中断是共享的,将会根据事先配置的寄存器GICD_xxx选择最高优先级的中断发往CPU核,PPI、SGI是各CPU独有的中断,不参与目的core的仲裁。distributor对中断主要提供全局中断使能、每个中断使能、中断优先级、中断分组、中断目的CPU核、中断触发方式、中断状态管理、可修改中断的Pending状态等。 Interface:将GICD发送的中断信息,通过IRQ/FIQ传输给CPU核。这里分为CPU interface和Virtual CPU interface,后者暂未接触先不讨论。interface提供将中断请求发送给CPU、CPU对中断进行确认、中断处理通知完成、设置中断优先级屏蔽等。 GICv3架构 类型 中断号范围 说明 SIGs ID0~ID15 Software Generated Interrupt,软件触发中断,通常用于多核之间通信。 PPIs ID16~ID31 Pivate Peripheral Interrupt,每个处理核私有中断,如timer,hw fault等。 SPIs ID32~ID1019 Shared Peripheral Interrupt,公用外设中断,最多支持998个。 ID1020~ID1023 Used to signal special cases 1024~8191 Reserved LIPs 8192~ Locality-specific Peripheral Interrupt,外设不通过专用中断线向GIC发中断,而是基于消息的中断,配置信息存储在memmory中,通过ITS可以解析消息发送给Redistributor触发中断。 相对于GICv2架构GICv3多一个模块,Resdistributor。 Distributor:主要用来管理配置SPIs类型的中断。 Resdistributor:每个PE(process element,即cpu核)都对应一个,所以想要的配置是针对某个CPU核的,主要用来配置SGI和PPI类型中断。 CPU Interface:每个PE对应一个interface,与GICv2的interface一致。 引入LPIs后,对于中断源触发信号给中断控制器就可以分为两类,peripheral interrupt signal和Peripheral interrupt message。 在GICv3,SPIs类型也是可以配置成message 类型中断,但是LPIs只能是message类型中断。 GIC V3版本中寄存器如上图所示,提供了两种方式访问,memory-mapped和系统寄存器访问。 寄存器 说明(memory-mapped访问方式) GICC CPU interface寄存器 GICD Distributor寄存器 GICR Redistributor寄存器 寄存器 说明(系统寄存器方式方式) ICC 物理 CPU interface系统寄存器 中断处理过程 状态机 上图是中断控制器的状态机,四种状态适用于SPI,PPI和SGI类型的中断源,而LPIs没有active或active and pending状态。 Inactive:中断源没有触发信号。 Pending:中断源触发了信号,GIC会将IAR(Interrupt Acklowlege Register)中该中断源对应bit置1,然后通知了CPU,但CPU没有响应ACK给GIC。 Active:CPU读取了IAR寄存器置1的位,CPU通过interface读取IAR的寄存器表示确认了这个中断并开始处理,此时中断源进入Active状态。 Active and Pending:CPU ACK了这个中断请求后,中断控制器就解除了对该中断源的屏蔽,控制器可以继续响应中断,那么当CPU还在处理中断服务器程序时,此时中断源又触发了信号给中断控制器,此时的状态就处于Acitive and Pending,简称AP。 当CPU完成了中断程序处理,就会写中断控制器的EOI(End of Interrupt)寄存器,中断源的状态就会再次回到inactive状态。 CPU在处理中断程序期间是屏蔽掉IRQ的响应的,处理完成之后才会打开中断响应。这里要注意的是CPU的中断屏蔽和中断控制器的屏蔽是两回事,CPU屏蔽的是IRQ而中断控制器屏蔽的是SPIs/PPIs/SGIs/LPIs。 触发方式 电平触发 电平触发方式时序图如上,可以分为以下几个阶段变化: Inactive to Pending:中断源触发了信号给到中断控制器,GIC收到信号后,如果CPU使能了该中断则触发信号给PE。上图Peripheral To GIC的信号被拉高,继而触发GIC To Core的信号被拉高。 Pending to AP:PE通过CPU interface读取了IARs寄存器,状态转为Active。由于CPU操作的是中断控制器,但并没有对设备进行处理,所以设备给中断控制器的触发信号高电平会一直维持,因此会再次触发中断控制器对应bit置位,表示中断又来了。 AP to Active:对应高电平的触发类型,通常需要CPU收到中断信号后再去响应外设,如写外设相关寄存器或读写外设的数据等,外设的中断源才会取消触发(高电平->低电平)。状态再次从AP转到Active。 Active to Inactive:CPU处理完了中断响应程序,然后写中断控制器EOIRs寄存器,表示中断处理完成。 边沿触发 Inactive to Pending:中断源触发了信号给到中断控制器,GIC收到信号后,如果CPU使能了该中断则触发信号给PE,在CPU没有ACK之前,GIC to Core的电平会被一直拉高,直到CPU做出ACK动作。 Pending to Active:与电平触发类似。 Active to A&P:Linux在处理中断过程中是不允许中断嵌套的,但此时如果有外设再次触发中断,但也不影响,因为中断控制器可以进行处理相当于锁存主第二个中断的到来,这样第二个中断也不会丢失。 A&P to Pending:边沿触发方式通常不需要CPU对外设进行中断信号清除,当cpu处理完了当前中断后,会转入准备处理第二个中断。 边沿触发有丢中断的可能,一般情况下可靠性高的使用电平触发方式。 中断控制器的pengding位,在软件处理完成写eoi寄存器后硬件自己会清除,所以软件不需要清pending,但是对于外设是电平类型触发来说,需要清除外设的电平触发源(操作的不是中断控制器,是外设)。
  • Arm64体系结构简介

    Arm64体系结构简介

    ARM简介 ARM版本 典型处理器 主要特性 v1 26位地址空间 v2 增加乘法、乘加法、支持协处理指令等 v3 地址空间扩展到32位,增加SPSP和CPSR等 v4 ARM7TDMI/ARM920T 增加Thumb指令等 v5 ARM926EJ-S 增加Jazelle和VFPv2等 v6 ARM11 MPCore 增加SIMD、TrustZone以及Thumb-2等 v7 Cortex-A8/Cortex-A9 增加NEON和VFPv3/v4扩展 v8 Cortex-A72 支持32位和64位指令集处理器体系结构 v9 Cortex-x2 支持可伸缩矢量扩展计算、机密计算体系结构 A系列:面向性能密集型系统的应用处理器内核。 R系列:面向实时应用的高性能内核。 M系列:面向各类嵌入式应用的微控制器内核。 Armv8体系结构 通用寄存器 AArch64状态下支持31个64位通用寄存器,分别是X0~X30,而AArch32状态支持16个32位通用寄存器。在AArch64状态下使用X表示64位寄存器,另外还可以使用W来表示32位的数据。X29(FP)是栈帧寄存器,指向栈的顶部(SP指向栈的当前位置),X30(LR)是链接寄存器,保持函数返回地址。 下图是AArch32状态下AArch64到AArch32状态的寄存器mapping。 处理器状态 Aarch64使用了PSTATE寄存器来表示当前处理器的状态(armv7使用的是CPSR),上图中I和F表示cpu本地中断的屏蔽位。SP为选择SP寄存器,在不同的异常等级下,SP寄存器是不一样的(见下小节),当运行在EL0是即为SP_EL0。 特殊寄存器 除了31个通用的寄存器外,还有上面的特殊寄存器。不同特权模式下的寄存器有所差别。 - XZR/WZR:两个零寄存器。WZR是32位零位寄存器,XZR是64位零位寄存器。 - PC:指向当前运行指令的下一条指令地址。 - SP:每个异常等级都有一个专门的SP寄存器SP_ELn。高等级的特权模式可以访问低等级的SP寄存器。 - SPSR:备份程序状态寄存器,运行一个异常程序是,处理器的一些状态信息会保存在备份程序状态寄存器里,如会将处理器的PSTATE寄存器的值暂时保存到SPSR里,当处理完成返回时,再把SPSR的值恢复到PSTATE寄存器。 - ELR:存放异常返回地址。 异常 异常类型 中断 中止 复位 系统调用 异常等级 Armv8支持aarch64和aarch32两种执行状态,aarch64是64位执行状态,运行A64指令集;aarch32是兼容armv7架构32位执行状态,运行A32的指令集。aarch64有4中异常等级。 EL0:用户特权,用于运行普通的用户程序。 EL1:系统特权,用于操作系统内核。如果使能虚拟化,则运行虚拟机操作系统内核。 EL2:运行虚拟化扩展的虚拟机监控器(hypervisor)。 EL3:运行安全os下的监控器(secure monitor)。 除了EL0以外,每个异常等级都有两个选择,选择自己等级或使用EL0等级,如果选用自己等级SP为SP_ELn(SP_ELnH),如果使用EL0那么SP为SP_EL0(SP_ELnt)。 ARMv8有4种以上等级,只有在获取异常或异常返回发生等级的变化。当处理器从较高等级切换到较低等级时,执行状态可以不变或者可以从aarch64切换到aarch32。当处理器从较低等级切换到较高等级时,执行状态可以不变或从aarch32切换到aarch64。 中断发生硬件处理过程 当事件触发异常发生时,处理器硬件会自动的执行以下动作。 1. 进入异常: (1)将PC保存到ELR_ELx (2)PSTATE保存到SPSR_ELx,会关闭到cpu本地的IRQ和FIQ中断,防止中断嵌套 1. 退出异常: (1)将ELR_ELx的数据保存到PC (2)将SPSR_ELx的数据保存到PSTATE,打开中断。 以上过程都是硬件自动完成。硬件处理完成后,就进入程序处理阶段,分为三个阶段保存上下文、中断处理、恢复上下文。 上下文:进入中断后,需要将打断的应用当前的相关寄存器保存起来,一般是存储在当前任务内核堆栈里面,当中断退出时,再从堆栈中获取恢复。 中断处理:包括确定中断源,将触发的中断源屏蔽,接着再处理中断服务程序。 异常向量表 异常发生时,处理器会跳转到相应位置执行异常相关指令。异常相关的处理指令通常存储在内存种,这个存储位置为异常向量,在ARMv8体系结构中,EL1、EL2、EL3都有一个异常向量表,每个异常向量表存储的基地址在VBAR_ELx寄存器中,EL0没有异常向量表,那是因为EL0的异常发生在用户态,当用户态发生发生异常时系统会跃迁到内核态EL1处理,所以EL0没有异常向量表,也可以认为EL0和EL1是共用EL1的异常向量表。 下图是一张异常向量表的示例,可以看到异常向量表分为4组,每组有4种类型。 异常向量表为什么分为4组? (1)当系统的发生异常处于内核态,异常等级为ELx(EL1/EL2/EL3)时,可以选择使用SP_EL0或SP_ELx来作为栈指针,所以分为Current EL with SP0和Current EL with SPx两组。目前对于arm64的嵌入式设备来说,通常只有到EL1等级,EL2用于虚拟化,EL3用于安全基本,所以EL2和EL3基本没有用。 (2)当系统的发生在用户态(EL0),会下陷到内核变为EL1,但是会根据执行状态是aarch64或者aarch32来区分两组,Lower EL using AArch64或Lower EL using AArch32。 arch/arm64/kernel/entry.S SYM_CODE_START(vectors) #内核态发生异常,异常等级为EL1,但是栈依旧使用的是SP_EL0 kernel_ventry 1, t, 64, sync // Synchronous EL1t kernel_ventry 1, t, 64, irq // IRQ EL1t kernel_ventry 1, t, 64, fiq // FIQ EL1h kernel_ventry 1, t, 64, error // Error EL1t #内核态发生异常,异常等级为EL1,但是栈使用是SP_EL1 kernel_ventry 1, h, 64, sync // Synchronous EL1h kernel_ventry 1, h, 64, irq // IRQ EL1h kernel_ventry 1, h, 64, fiq // FIQ EL1h kernel_ventry 1, h, 64, error // Error EL1h #用户态发生异常,异常等级为EL0(切到内核会转为EL1),到内核后执行状态是aarch64 kernel_ventry 0, t, 64, sync // Synchronous 64-bit EL0 kernel_ventry 0, t, 64, irq // IRQ 64-bit EL0 kernel_ventry 0, t, 64, fiq // FIQ 64-bit EL0 kernel_ventry 0, t, 64, error // Error 64-bit EL0 #用户态发生异常,异常等级为EL0(切到内核会转为EL1),到内核后执行状态是aarch32 kernel_ventry 0, t, 32, sync // Synchronous 32-bit EL0 kernel_ventry 0, t, 32, irq // IRQ 32-bit EL0 kernel_ventry 0, t, 32, fiq // FIQ 32-bit EL0 kernel_ventry 0, t, 32, error // Error 32-bit EL0 SYM_CODE_END(vectors)
\t