JVM——线程池实现原理
线程池实现原理
- 1、对象池设计模式
- 2、生产者—消费者模式
- 2.1、设计原理
- 2.2、实现案例
- 3、普通线程池的实现原理
- 3.1、设计原理
- 3.2、源码分析
- 4、FutureTask实现原理
- 4.1、设计原理
- 4.2、源码分析
- 5、ScheduledThreadPoolExecutor实现原理
- 5.1、设计原理
- 5.2、源码分析
- 6、Executors实现原理
1、对象池设计模式
当需要频繁使用对象,但对象创建和销毁特别昂贵(很耗费系统资源),并且每个对象使用时间非常短,那么这种高频但效率低的对象使用会严重影响程序的执行效率。为了提高对象使用效率,工程师发明了对象池模式,如图所示。
对象池模式创建了一组可以重复使用的对象。当需要一个对象时,它会从对象池中租用一个空闲的对象。如果先前准备好的对象可用,则立即返回,避免实例化成本。如果池中不存在任何空闲对象,则创建一个新的对象并返回。当对象已经执行完成并且不再使用时,会将它归还到对象池中,归还对象池中的对象可以再次被其他线程使用。
如果一个对象在使用过程中出现了异常,不满足再次被使用的情况,是不能放到对象池中的。在某些对象池中,资源是有限的,因此指定了最大对象数。如果达到最大对象数量,在请求新的对象时,可能会引发异常或者线程被阻塞,直到有对象释放回池中。
对象池从逻辑架构上可以分为4个模块:对象创建工厂、已使用队列、空闲队列、对象池大小动态调节器。对象池逻辑架构如图所示。
1. 对象创建工厂
对象创建工厂负责对象的创建与销毁工作。在对象创建时,工厂需要完成对象的创建、初始化、基础属性的赋值等工作。在对象销毁时,工厂需要完成对象资源的释放、对象引导的释放,并加快垃圾回收工作。
2. 已使用队列
已使用队列是用来存储从对象池中租借出去的对象。当一个程序从对象池中租借对象时,对象池先从空闲队列中获取空闲的对象,然后将对象加入已使用队列,最后返回空闲的对象。当关闭应用时,需要将对象池中所有已使用的对象进行销毁,防止资源泄露。
3. 空闲队列
空闲队列存储着未被线程使用的空闲对象。当应用执行完任务时,程序会归还对象,对象池将归还的对象放入空闲队列。在整个应用关闭时,需要将对象池中所有空闲对象进行销毁,防止资源泄露。
4. 对象池大小动态调节器
每个业务系统都会有高峰与低谷。在业务高峰时,对象使用的频率非常高,在业务低峰时,对象的使用频率比较低,所以对象池必须具备动态调节对象数量的能力。对象池用3个参数来调节大小:min、count、max。min表示最小的对象个数,count表示当前对象的个数,max表示最大的对象个数。在对象池初始化时,对象池会根据min来初始化一批对象供应用程序调用,能够满足业务低峰期使用需求。在业务高峰期到来时,应用程序从对象池中租借对象,对象池发现空闲队列中没有对象了,就会触发扩容,对象池会创建新的对象返回给应用程序。当到达最大值max之后就不会创建新的对象了,让应用程序进入等待。在业务低峰期时,空闲队列中的对象非常多,对象池会再次触发缩容,移除并释放一部分空闲的对象,防止过多空闲对象占用内存。
2、生产者—消费者模式
在多线程开发过程中,往往会有一批线程负责数据的生产,另一批线程负责数据的处理,系统很难保证两部分线程的执行效率是一致的。如果生产数据的速度很快,而处理数据的速度很慢,很容易造成数据没地方存储。如果数据处理的速度大于数据产生的速度,那么数据处理的线程就会经常处于等待状态。为了解决这个问题,图灵奖获得者Edsger W. Dijkstra教授于1965年提出了生产者—消费者模式。
生产者—消费者模式采用了一个缓存区来解决数据的生产与处理之间的速度平衡问题。产生数据的线程称作生产者,处理数据的线程称作消费者,缓存区称作消息队列,如图所示。生产者和消费者不直接通信,而是通过消息队列来进行通信。生产者生产完数据之后直接传递到消息队列,当消息队列满了之后会进行阻塞,这样就能够控制消息的产生速度。消费者直接从消息队列获取数据,如果消息队列空了,就将消费者线程阻塞。这样整个消息队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
2.1、设计原理
整个生产者—消费者模式的核心模块是消息队列,其必须具有:高效的修改能力、并发控制的能力、快速定位队列长度的能力、阻塞生产线程与消费线程的能力。
1. 数据结构
消息队列面临着非常高频的数据插入与数据移除场景,所以在底层的存储结构设计上要支持高性能修改的能力。消息队列一般会采用链表与环形数组作为底层存储形式,其数据结构如图所示。
链表每次的插入与删除都只用改变链表的指针就可以了,天生对修改友好。而环形数组是一段连续线性空间,每次插入时会移动插入的索引,每次移除时会修改读取的索引,不会改变队列的结构,所以修改性能非常好。
2. 并发控制
因为消息队列时刻面临着多个线程在同时修改,所以需要确保多线程修改的安全性。一般会在普通的队列上应用锁机制,例如采用synchronized关键字、ReentrantLock独占锁等同步机制来完成队列的并发控制。消息队列并发控制如图所示。
3. 队列长度
在向队列中插入数据时,生产者需要知道队列是否满了,当队列满了就进行等待。从队列中获取数据的时候,消费者需要知道队列是否空了,当队列空了就进行等待。所以队列要有快速感知队列中元素个数的能力。一般队列内部会设置一个int或者AtomicInteger类型的count变量来表明队列的元素个数,每次添加元素的时候会对count加1,每次移除元素的时候会对count减1。消息队列长度控制如图所示。
4. 线程阻塞、唤醒
队列需要具备主动阻塞、唤醒线程的能力。生产者向队列插入数据时,如果队列满了,队列能够主动阻塞生产者线程。当队列有容量时,队列能够主动唤醒生产者线程,让它继续插入数据。当消费者从队列中移除数据时,如果队列空了,队列要能主动将消费者线程阻塞。当队列有数据时,队列要能主动唤醒消费者线程,让它继续读取数据。
下表总结了Java提供的各种线程安全的消息队列特性。
2.2、实现案例
下面以LinkedBlockingQueue为消息队列来构建一个生产者与消费者的模式,消息队列中存储的是String的字符串消息。生产者通过循环不停产生随机的字符串,并将其插入消息队列中。消费者不停地从消息队列中获取字符串,并打印字符串信息。此案例的UML图如图所示。
Producer实现了Runnable接口,并在内部定义了LinkedBlockingQueue的实例queue,可以在构造函数里对消息队列进行赋值。run方法实现了不停地产生消息,并通过消息队列的put方法向消息队列中插入数据。Producer实现如代码所示。
Consumer也实现了Runnable接口,获得了线程的能力,在内部定义了LinkedBlocking-Queue的实例queue,可以在构造函数里对消息队列进行赋值。run方法不停调用消息队列的take方法来获取数据,并打印出消息队列的信息。Consumer实现如代码所示。
ProducerConsumer是整个生产者—消费者模式的构建类,在内部定义了LinkedBlocking-Queue变量来存储字符串,并利用消息队列构建了生产者线程与消费者线程,其实现如代码所示。
3、普通线程池的实现原理
Java的线程对象的生命周期大致包括三个阶段:创建阶段T1、使用阶段T2、销毁阶段T3。Java线程对象的生命周期如图所示。
每个Java线程至少需要构建5个关联对象,非常消耗系统资源。线程的销毁需要将前面创建的对象销毁掉,需要进行大量内存清理与回收。在线程对象的生命周期中,只有使用阶段才是真正对业务系统是有意义的,创建阶段与销毁阶段会带来大量系统资源损耗。为了提高线程使用效率,JDK1.5提供了线程池ThreadPoolExecutor。ThreadPoolExecutor采用了对象池设计模式,将线程作为一种对象缓存在对象池,让线程能够重复使用,极大地提高了程序运行的性能与效率。
3.1、设计原理
ThreadPoolExecutor采用了生产者—消费者模式与对象池模式相结合的设计模式,保留了生产者模式中的生产者与消息队列,消费者是对象池中的工作线程。因为对象池里的对象不需要被外部租用了,所以将空闲队列与已使用队列合并成了一个工作线程队列。在此基础上,线程池增加了任务调度模块与状态管理模块。任务调度模块负责协调线程任务的提交与执行。状态管理模块负责控制线程池的启动与停止。ThreadPoolExecutor逻辑架构如图所示。
1. 任务调度器
任务调度器是整个线程池的核心控制模块,当线程池收到业务线程提交的任务后,由任务调度器进行统一调度处理。任务调度器主要负责创建线程执行任务、将任务放入任务队列、拒绝执行任务。任务调度流程如图所示。
任务调度器按照尽可能少地建线程的原则来进行调度。线程池首先会判断当前线程数量是否小于核心线程数。如果当前线程数小于核心线程数,线程池就创建线程来执行任务。如果当前线程数大于核心线程数,线程池会将任务加入任务队列。如果任务队列也满了,接着判断线程数是否小于最大线程数:如果当前线程数小于最大线程数,线程池会创建非核心线程来执行任务。如果队列满了,且当前线程数也达到了允许的最大线程数,线程池就会启动拒绝策略来拒绝任务。线程池任务执行流程如图所示。
虽然慢启动的调度策略能够减少系统资源损耗,但是在高并发的场景里,会延迟任务执行时间。系统高峰时,大量的任务会被挤压任务队列里得不到处理。
2. 任务队列
ThreadPoolExecutor采用的任务队列是LinkedBlockingQueue。BlockingQueue是一个接口,具体的功能由子类实现。ThreadPoolExecutor可选任务队列如表所示。
LinkedBlockingQueue具有更高的并发性能,所以是线程池默认的任务队列。
3. 工作线程
当创建一个工作线程时,线程池可以指定一个任务。工作线程会优先执行线程池指定的任务。在完成指定任务后,工作线程会从任务队列中获取任务来执行。工作线程执行流程如图所示。
工作线程会不断地从队列中获取并执行任务。当任务队列为空时,工作线程会被任务队列阻塞。工作线程的执行过程是基于AOP(面向切面编程)思想来设计的,如图所示。ThreadPoolExecutor在线程任务执行前后提供了2个扩展方法:beforeExecute方法与afterExecute方法。beforeExecute方法可以用来执行准备工作(前置处理工作),afterExecute方法可以用来进行善后工作(后置处理工作)。
4. 生命周期管理
为了能够安全停止,线程池定义了5种生命周期状态:RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED。线程池的生命周期如图所示。
线程池定义了一个Integer类型的原子变量clt。clt的高3位用来记录线程池状态,后面的29位用来记录线程个数。线程池状态说明如表所示。
5. 调整线程池大小
为了灵活控制线程池的运行效率,ThreadPoolExecutor提供了动态调整线程池中核心线程数大小的能力。核心线程数的动态调整如图所示。
线程池会判断新设置的核心线程数是否比当前线程数小。如果新设置的核心线程数比当前线程数小,则线程池会将所有阻塞在消息队列中的线程销毁。如果新设置的核心线程数比当前线程数大,则线程池会创建新的工作线程。
6. 拒绝策略
当线程池达到了最大线程数且任务队列已满时,就会启用拒绝策略来拒绝任务。线程池提供了4种拒绝策略,详细信息如表所示。
3.2、源码分析
Worker实现了Runnable接口获得独立运行的能力,继承了AbstractQueuedSynchronizer,获得了锁的能力。ThreadPoolExecutor定义了workers、workQueue、mainLock等变量。workers用来存储所有工作线程。workQueue是线程池的任务队列。mainLock是独占锁,用来实现线程池的并发控制。ThreadPoolExecutor的UML图如图所示。
1. 工作线程定义
Worker是工作线程的实现,负责执行线程任务,定义了thread、firstTask、completedTasks变量。thread是当前执行任务的线程。firstTask是工作线程创建时指定的任务,如果firstTask不为空,Worker会优先执行firstTask的任务,执行完firstTask任务后,Worker再从任务队列中取任务。如果firstTask为空,则工作线程会直接从队列中获取任务。completedTasks表示完成的任务数,每完成一个任务,将completedTasks加1。下面代码是Worker的具体代码实现。
Worker的构造函数会调用线程工厂ThreadFactory的newThread方法来创建具体的线程,并将thread指针指向当前创建的线程。
Worker实现了AQS的tryAcquire方法(获取独占锁信号)和tryAcquire方法(释放锁信号)。同时,Worker内部也定义了独占锁获取与释放的相关方法。
2. 执行线程任务
runWorker方法用于执行具体线程任务。runWorker方法执行流程如下。
- 判断firstTask是否为空:如果firstTask不为空,先执行firstTask;如果firstTask为空,从任务队列中获取任务。
- 获取独占锁,只有成功获取到锁了才能执行任务,防止执行任务过程中线程被线程池中断。
- 确保工作线程与线程池的状态是一致的。如果线程池是STOP状态,要确保当前线程是中断状态。如果线程池不是STOP状态,要确保当前线程没有被中断。
- 调用beforeExecute方法,执行前置处理工作。
- 调用任务的run方法执行任务。
- 调用afterExecute方法,执行后置处理工作。
- 释放独占锁。
线程任务执行过程如代码所示。
在执行任务时,工作线程需要获取独占锁,确保线程执行过程中不被打扰,整个流程如图所示。
3. 获取线程任务
getTask方法用于从任务队列中获取任务,如代码所示。它首先调用workQueue的poll与take方法从任务队列中获取任务。如果发生以下情况,getTask方法会停止获取任务。
- 如果线程池处于SHUTDOWN状态,且任务队列为空,getTask方法会停止获取任务。
- 如果线程池处于STOP状态,getTask方法会停止获取任务。
- 如果线程池处于正常状态,但当前的线程数大于核心线程数,getTask方法会停止获取任务。
- 如果设置了任务获取超时时间,在规定的时间内没能获取到任务,getTask方法会停止获取任务。
4. 线程池变量
线程池内部定义了AtomicInteger类型的变量ctl,ctl的高3位用来表示状态,低29位表示线程数量。runStateOf方法的功能是将clt的值转换成线程的状态,workerCountOf方法的功能是将clt的值转换成线程的数量。线程池的常量定义如代码所示。
5. 线程池任务调度
execute方法是线程池提交任务的入口方法,执行流程如下。
- execute方法首先会判断当前线程数量是否小于核心线程数,如果当前线程数小于核心线程数就调用addWorker方法创建核心线程来执行任务。
- 如果当前线程数大于核心线程数,execute方法就将任务插入任务队列。
- 如果任务队列也满了,接着判断当前线程数是否小于线程池的最大线程数。如果当前线程数小于最大线程数,execute方法就调用addWorker方法创建非核心线程来执行任务。
- 如果当前线程数大于最大线程数,execute方法就调用reject方法拒绝任务。
如下代码是线程池任务调度的实现。
addWorker方法有3个核心功能:创建工作线程、将工作线程加入线程池、执行工作线程。addWorker方法有2个参数:firstTask与core。firstTask是初始线程任务,core表示是否为核心线程,这两个参数会构成4个业务场景,如表所示。
addWorker方法会先判断线程池的状态,如果线程池被关闭则无法创建线程。如果参数core为true,且当前线程数大于核心线程数,则直接返回。如果参数core为false,且当前线程数大于最大线程数,则直接返回。接着调用Worker的构造函数来创建工作线程,创建成功后会将其加入工作线程队列workers中,然后调用start方法来启动工作线程。工作线程创建过程如代码所示。
6. 调整线程池大小
ThreadPoolExecutor提供了2个方法来动态调整线程池大小,setCorePoolSize方法用来调整核心线程数的大小,setMaximumPoolSize方法用来调整最大线程数的大小。
setCorePoolSize方法会将新设置的核心线程数与原来的数的差值记作delta。如果当前线程数大于核心线程数,线程池就调用interruptIdleWorkers方法回收空闲的线程。如果delta值大于0说明需要扩容,则调用addWorker方法来创建新的核心线程,如代码所示。
如代码所示,setMaximumPoolSize方法会判断当前线程数是否大于新设置的最大线程数,如果大于就调用interruptIdleWorkers方法来实现缩容。
interruptIdleWorkers方法的功能是通过中断来结束工作线程。如代码所示,它首先获取到线程池锁,确保只有一个线程能操作线程池,然后对workers中的线程进行遍历,如果线程处于空闲状态,则调用interrupt方法中断线程。
7. 关闭线程池
ThreadPoolExecutor提供了2个方法来实现线程池关闭。shutdown方法是缓慢关闭线程池,shutdownNow方法是暴力停止线程池。
shutdown方法会将线程池的状态设置成SHUTDOWN,然后调用interruptIdleWorkers方法中断工作线程,然后调用onShutdown方法进行后续的处理。线程池正常关闭的实现如代码所示。
shutdownNow方法会先将线程池的状态设置成STOP,接着调用interruptIdleWorkers方法中断工作中的线程,最后调用drainQueue方法清空并备份任务队列中的所有任务。线程池强制关闭的实现如代码所示。
4、FutureTask实现原理
4.1、设计原理
Future模式是多线程设计中衍生出来的一种设计模式,它的核心思想是异步调用。当业务线程需要执行耗时的任务时,可以先将任务提交到线程池去执行,业务线程可以继续处理其他逻辑,处理完其他逻辑之后,业务线程再获取前面任务执行的结果。
将任务提交到线程池后,线程池先返回一个Future契约。业务线程可以凭借这个契约去获取线程的执行结果。异步任务执行的过程如图所示。
FutureTask由3个核心部件组成:状态值、结果数据、等待队列,状态值用来表示当前任务的状态,结果数据用来存储任务执行的结果,等待队列是用来存储获取结果的等待线程。
FutureTask首先会将任务设置成开始执行的状态,接着去执行任务。在任务执行完成之后,FutureTask首先保存任务的结果数据,接着修改任务的状态,最后唤醒等待队列中所有的业务线程。
当业务线程来获取结果时,FutureTask首先会检查任务的状态。如果任务在执行中,FutureTask会将业务线程加入等待队列中进行等待;如果任务执行完成,FutureTask直接返回执行结果。
FutureTask的任务状态有7种状态值:NEW、COMPLETING、NORMAL、EXCEPTIONAL、INTERRUPTING、INTERRUPTED、CANCELLED。NEW表示任务处于创建状态,COMPLETING表示任务处于执行中的状态,NORMAL表示任务已正常结束的状态,EXCEPTIONAL表示任务异常结束的状态,INTERRUPTING表示任务被中断中的状态,INTERRUPTED表示任务已经中断的状态,CANCELLED表示任务被取消的状态。其中COM-PLETING和INTERRUPTING是一种中间状态,持续时间非常短暂。
FutureTask定义了变量outcome来存储任务结果。如果任务正常结束,FutureTask会将任务结果赋值给outcome;如果任务执行异常,FutureTask会将异常信息赋值给outcome。
4.2、源码分析
Future接口是异步任务的顶层接口。RunnableFuture接口通过继承Runnable接口与Future接口,获得了线程执行与获取任务结果的能力。FutureTask是RunnableFuture接口的具体实现类,也是异步任务的核心逻辑实现类。ExecutorService是Java线程池的顶层接口,它定义了提交线程任务、获取任务结果的方法。AbstractExecutorService继承了ExecutorService接口,是异步任务的线程池实现类,提供了将Runnable与Callable线程任务转换成FutureTask任务的功能,并提供了将FutureTask任务提交到线程池执行的能力。FutureTask的UML图如图所示。
1. Future接口
Future接口是异步任务的顶层接口,用来获取任务结果,如代码所示。
2. RunnableFuture接口
RunnableFuture接口定义如代码所示。
3. FutureTask实现类
FutureTask定义了state、callable、outcome、runner、waiters等变量。state表示任务的当前状态。callable表示具体的执行任务。outcome用来存储任务的执行结果。runner用来表示具体执行任务的线程。waiters是等待获取结果的线程列表的头节点。FutureTask的变量定义如代码所示。
WaitNode是等待获取结果的业务线程节点,它定义了2个变量:thread与next,thread指向等待获取结果的业务线程,next指向链表的后继节点。WaitNode实现如代码所示。
4. 任务执行
run方法的功能是执行异步任务。首先,它通过CAS的方式将当前线程设置为任务线程。然后,它会调用Callable的call方法来执行任务。如果任务执行成功,它会调用set方法来设置任务的执行结果;如果执行出错,它会调用setException方法来设置异常信息。任务执行过程如代码所示。
set方法的功能是修改任务状态、保存任务结果。set方法先将任务的状态设置成COMPLETING,然后将任务结果设置给outcome,最后将任务状态设置成NORMAL。在设置完状态之后,set方法会调用finishCompletion方法来唤醒所有等待结果的线程。set方法代码如代码所示。
finishCompletion方法的功能是唤醒等待结果的业务线程。如代码所示,它首先通过CAS的方式将waiters节点设置为null,然后调用LockSupport的unpark方法依次唤醒所有等待的线程。
5. 重复执行
runAndReset方法提供了重复执行任务的能力。周期性任务就是通过runAndReset方法来执行的,但runAndReset方法无法返回执行结果。重复执行的实现如代码所示。
6. 获取结果
如代码所示,get方法是用来获取异步任务的执行结果的。如果任务没有完成,它会调用awaitDone方法让当前线程进行等待;如果任务执行完成了,它会调用report方法来获取执行的结果。
awaitDone方法的功能是等待任务执行结束。如果任务已经结束了,它就直接返回;如果任务处于COMPLETING状态,它会调用Thread类的yield方法放弃当前CPU的调度。如果任务处于NEW状态,它会将当前线程加入等待队列进行等待。等待任务执行完成实现如代码所示。
report方法的功能是汇总任务执行结果,如代码所示。
7. 取消任务
cancel方法的功能是取消任务,它会根据mayInterruptIfRunning参数来修改任务的状态:如果为true,则将任务设置成INTERRUPTING状态,并触发线程中断;如果为false,则将任务设置成CANCELLED状态。取消任务实现如代码所示。
FutureTask是Java提供的异步线程任务,采用了多线程场景中的Future设计模式。业务线程可以将耗时的任务提交给线程池去异步地处理,自己则可以去处理其他任务,在其他执行完成后再来获取耗时的任务结果,这样就极大减少了整个线程的执行时长。
5、ScheduledThreadPoolExecutor实现原理
ScheduledThreadPoolExecutor是执行延迟任务与周期性任务的线程池。在它之前,延迟任务与周期性任务是通过Timer和TimerTask来实现的。但是Timer是单线程执行的,一旦任务报错会终止整个定时器,其他任务也会受到牵连。ScheduledThreadPoolExecutor是利用线程池来执行定时任务的,每个工作线处理一个定时任务,规避了定时器Timer的缺陷,确保了定时任务能够安全地执行。
ScheduledThreadPoolExecutor通过继承ThreadPoolExecutor获得了线程池的能力。为了更好地适用延迟任务与周期性任务场景,ScheduledThreadPoolExecutor对线程任务、任务队列、调度机制等做了重新定义。
ScheduledThreadPoolExecutor的具体方法如表所示。
5.1、设计原理
为了更好地满足延迟任务与周期性任务的场景,ScheduledThreadPoolExecutor对普通线程池的任务队列、调度机制、线程任务等组件进行了改造。它将原来基于FIFO原则的阻塞队列改造成带有时间权重值的延迟队列。它重新定义了线程任务:ScheduledFutureTask。逻辑结构如图所示。
1. 延迟队列
ScheduledThreadPoolExecutor重新定义了任务队列,采用DelayedWorkQueue作为任务队列。队列头部是最先到期的任务。线程任务会阻塞在队列头部等待任务到期。只有在延迟时间到了后,工作线程才能从队列中取出任务执行。
堆的结构可以分为大顶堆和小顶堆。DelayedWorkQueue底层的数据结构是小顶堆。小顶堆的每个节点的值都小于其左孩子和右孩子节点的值,堆顶的值最小。数组的小顶堆结构如图所示。
基于数组实现的堆能够按照二叉树进行数据的遍历访问,父节点和子节点关系计算公式如代码所示。
ParentIndex是父节点的索引,LeftIndex是左子节点的数组索引,RightIndex是右子节点的数组索引,index是当前节点的数组索引。
将当前数组索引index减1再除以2就可以获取父节点的索引ParentIndex。例如在图中,数值9对应的索引值是4,9的父节点的数组索引(4-1)/2是1,对应的索引值是5。又如,数值10的数组索引是2,它的左子节点的数组索引LeftIndex是2×2+1,也就是5,对应的值是15,它的右子节点的数组索引RightIndex是2×2+2,也就是6,对应的值是11。
在数据插入时,小顶堆会先将数据插入在数组的最后一个位置,也就是插入在叶子节点上。然后将当前插入节点值和其父节点的值进行比较。如果当前节点大于父节点的值,符合小顶堆的规则,则不进行调整;如果当前节点的值小于父节点的值,则需要进行数据交换。节点会依次向上调整,直到根节点或者其中某个节点的值大于其父节点的值。
- 原数组如图所示,在数组最后的位置,也就是在索引10的位置插入4,如图所示。
- 根据(10-1)/2也就是4查找到数组索引为4,对应的索引值是9,然后将4与9进行比较,发现4比9小,让4与9进行交换,如图9-25c所示。
- 接着以索引4为基点,通过(4-1)/2找到索引为1的父节点5,并将5与4进行比较,发现4比5小,接着让4与5进行交换,如图9-25d所示。
- 基于当前节点索引1,通过(1-1)/2找到父节点的索引为0,然将数组索引为0上的3与数组索引为1的4比较,发现3小于4,终止调整。
小顶堆的堆顶元素最小,所以每次删除都是移除堆顶的元素。在删除堆顶元素时,小顶堆首先移除0号索引位置的数据,将数组最后一个位置的数据迁移到0号索引位置上来。然后从0号位置开始逐层向下遍历,比较当前节点与左右子节点的值,找到左右子节点中较小的值,并进行交换,直到小于或者等于左、右孩子中的任何一个为止。
- 移除数组0号下标对应的元素3(见图9-26a),并将数组中最后一个元素9移动到0号下标对应的元素中,如图9-26b所示。
- 从堆顶开始向下遍历查找左右子节点,0号位置的左子节点是4,右子节点是10,发现9比4小,所以9要与4进行交换,如图9-26c所示。
- 查找9的左右子节点,左子节点是7、右子节点是5,最小的子节点是5,所以9与5进行交换,如图9-26d所示。
2. 任务调度
ScheduledThreadPoolExecutor的任务调度与ThreadPoolExecutor任务调度有很大的差别。每次业务线程提交任务到线程池中时,ScheduledThreadPoolExecutor首先会将任务加入延迟任务队列,延迟任务队列会根据任务执行的时间进行重排序,让最早执行的任务移动到队列的头部。
将任务加入队列后,ScheduledThreadPoolExecutor会判断当前线程数是否小于核心线程数。如果当前线程数小于核心线程数,ScheduledThreadPoolExecutor会创建新的工作线程,创建好的工作线程会直接加入线程池。线程池中的工作线程会尝试从延迟任务队列中获取任务。任务调度过程如图所示。
3. 线程任务
周期性任务执行结束之后,ScheduledThreadPoolExecutor会重新计算它的下一个周期时间,然后将任务重新加入任务队列。线程任务执行流程如图所示。
5.2、源码分析
ScheduledThreadPoolExecutor的UML图如图所示。
1. ScheduledFutureTask
ScheduledFutureTask内部定义了3个变量:time、period、outerTask。time表示任务的具体执行时间,period是任务的间隔周期,计算将time+period可以得到任务下次执行的时间,outerTask是下一周期的任务。ScheduledFutureTask变量定义如代码所示。
同时,ScheduledFutureTask定义了3个工具方法:isPeriodic方法、setNextRunTime方法与compareTo方法。isPeriodic方法是通过period字段值来判断是不是周期性任务:0表示定时任务,大于0表示周期性任务。setNextRunTime方法的功能是设置任务的下一个周期的执行时间(time+period)。compareTo方法是比较两个任务的执行时间,确定最先需要执行的任务。DelayedWorkQueue通过compareTo方法来实现任务优先级比较。compareTo方法实现如代码所示。
run方法的执行逻辑如下。
- run方法首先会调用isPeriodic方法来判断当前任务是否为周期性任务。
- 如果当前任务不是周期性任务,run方法就调用FutureTask的run方法来执行任务。
- 如果是周期性任务,run方法就调用FutureTask的runAndReset方法来执行任务。
- 在执行完周期性任务后,run方法调用setNextRunTime方法设置好下次执行的时间,然后调用reExecutePeriodic方法将当前任务重新加入任务队列。
run方法是任务执行的核心逻辑实现,如代码所示。
2. DelayedWorkQueue
DelayedWorkQueue是基于小顶堆构建的优先级队列。它在内部定义了用于存储周期性任务的queue数组(默认的容量是16),定义了用来表示数组中有多少个元素的size变量,以及定义了用于并发控制的ReentrantLock变量lock。DelayedWorkQueue变量定义如代码所示。
offer方法用于在DelayedWorkQueue队列中插入定时任务。offer方法首先会获取独占锁,确保同一个时刻只有一个线程可在队列中插入任务。接着offer方法根据数组元素大小判断是否需要扩容,如果需要扩容则调用grow方法实现数组扩容。然后会判断数组是否为空,如果为空则将任务直接赋值到数组的头部,如果数据不为空,则调用siftUp方法将任务插入到数组的尾部。如下代码是offer方法的具体代码实现。
siftUp方法首先会将数据插入到队列的尾部,然后按照小顶堆的规则对整个数组进行数据调整。siftUp方法通过(k-1)/2的公式计算出父节点,然后比较当前节点与父节点的时间值。如果父节点的时间大于当前节点的时间,则将父节点和当前节点互换。按照上述方式依次遍历各自的父节点,直到父节点时间值比当前时间值小或者父节点是头节点为止。如下代码是siftUp方法的具体实现。
take方法的功能是从延迟队列中获取需要立即执行的任务。在通过take方法获取任务时,工作线程首先需要获取独占锁,确保线程池中只有一个线程可以从任务队列上获取任务。然后判断队列是否为空,如果队列为空就进行等待。如果队列不为空,判断队列中头节的任务是否到执行时间了。如果任务到了执行时间,就调用finishPoll方法将头节点的任务从队列中移除;如果任务还没到执行时间,会继续等待。如下代码是take方法实现。
finishPoll方法比较简单,就是获取队尾元素,并将队尾最后一个元素设置到队头,然后调用siftDown方法按照小顶堆的规则对整个队列进行数据整理,如代码所示。
siftDown方法是从堆顶向下来整理数据的,也就是从数组的头部向后遍历来整理数组中的数据。若k是队列头节点索引下标,key是队列的最后一个元素。通过2k+1得出左子节点的下标child,2k+2得出右子节点的下标right。找出左子节点与右子节点的最小值c,然后将c与key进行比较。如果key大于c,则将c的值移动到k的位置上。通过遍历查找,让数组中的数据符合小顶堆的规则。移除元素后重新排序的实现如代码所示。
3. ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor提供了submit方法,以将Callable、Runnable等线程任务转换成周期性任务,然后调用schedule方法将任务提交到线程池中。schedule方法先将Callable任务转换成ScheduledFutureTask任务,然后调用delayedExecute方法将任务提交到线程池执行,如代码所示。
delayedExecute方法用于完成周期性线程任务的调度,它先会将任务直接插入任务队列,然后调用ensurePrestart方法来创建工作线程。如下代码是任务调度的具体实现。
6、Executors实现原理
为了让开发者方便地使用线程池,JDK提供了线程池的工厂Executors来创建各种线程池。
Executors是个静态工厂类,提供了丰富的方法来创建各种线程池,方便开发者使用。其UML图如图所示。
ExecutorService定义了线程任务提交、线程任务执行、获取任务结果、关闭线程池等接口方法。AbstractExecutorService是一个抽象类,它实现了ExecutorService接口,对ExecutorService接口的任务提交、任务执行的相关方法进行了默认实现。它能将Runnable线程任务转换成FutureTask任务并提交到线程池执行。
ThreadPoolExecutor是Java默认线程池的实现类,它实现了线程池的任务调度、线程任务执行、线程池大小动态调整、线程池安全停止等相关的功能。
ForkJoinPool是Fork-Join任务的线程池,采用工作窃取的方式进行任务调度,适用于线程执行过程中产生子任务的业务场景。
Executors降低了线程池使用门槛,使得初级开发者也能快速上手。但是Executors也屏蔽了线程池的构造细节,容易造成系统故障。例如固定线程数的线程池与单线程池的任务队列的最大长度为Integer的最大值,很容易造成线程任务在任务队列中积压,甚至导致OOM。对于缓存线程的线程池,允许的最大线程数为Integer的最大值(2147483647),可能会造成系统创建大量的线程,从而导致OOM。所以阿里巴巴的“Java开发规范”禁止使用Executors来创建线程池。该规范规定开发人员必须通过ThreadPoolExecutor的构造函数来创建线程池。
