type
status
date
slug
summary
tags
category
icon
password
Java 多线程的源码实现解析
为什么要用线程池?
- 降低资源消耗:通过重复利用现有的线程来执行任务,避免多次创建和销毁线程。
- 提高相应速度:因为省去了创建线程这个步骤,所以在拿到任务时,可以立刻开始执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
- 提供附加功能:线程池的可拓展性使得我们可以自己加入新的功能,比如说定时、延时来执行某些线程。
线程池的设计
如上图所示,本文试图回答几个问题:
- 线程池如何维护自身状态(表示、获取、转移)?
- 线程池如何管理任务(任务获取,分配)?
- 线程池如何管理线程(表示、创建、执行任务、回收)?
线程池如何维护自身状态?
在JDK的 ThreadPoolExecutor 线程池中用一个原子整型来维护线程池的两个状态参数:
ctl 的高 3 位被用来表示线程池运行状态
runState
, 其余 29 位用来表示线程池中的线程数量 workerCount
:- RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务;
- SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态);
- STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;
- TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
- TERMINATED:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。
- 线程池不是RUNNING状态;
- 线程池状态不是TIDYING状态或TERMINATED状态;
- 如果线程池状态是SHUTDOWN并且workerQueue为空;
- workerCount为0;
- 设置TIDYING状态成功
进入TERMINATED的条件如下:
五大状态的轮转过程:
二者分别通过下面两个函数获取:
线程池如何管理任务?
如图1所示,当用户提交一个任务时,线程池应该根据其状态做出不同的响应,对应的函数为 execute() 函数:
execute函数执行过程(分配)
- 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
- 如果workerCount < corePoolSize,则
创建
并启动一个线程来执行新提交的任务。
- 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务
添加
到该阻塞队列中。
- 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则
创建
并启动一个线程来执行新提交的任务。
- 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据
拒绝
策略来处理该任务, 默认的处理方式是直接抛异常。
这里有一点要注意,就是在将任务添加到队列中后,做了一个recheck,这是因为在往阻塞队列中添加任务地时候,有可能阻塞队列已满,需要等待其他的任务移出队列,在这个过程中,线程池的状态可能会发生变化,所以需要double check。
getTask 函数(获取)
这里重要的地方是
第二个if判断
,目的是控制线程池的有效线程数量。由上文中的分析可以知道,在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,并且workQueue已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多余的非核心线程销毁掉,保持线程数量在corePoolSize即可。线程池如何管理线程?
Worker 类(表示)
Worker类继承了 AbstractQueuedSynchronizer 类并且实现了 Runnable 接口。之所以继承 AbstractQueuedSynchronizer 类是因为线程池有一个需求是要获取线程的运行状态(工作中,空闲中)。Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的。
上述代码可以实现:
lock方法一旦获取了独占锁,表示当前线程正在执行任务中(runWorker函数在取到任务后会执行lock()方法后执行任务);如果正在执行任务(state = 1),则不应该中断线程;如果该线程现在不是独占锁的状态,也就是空闲(state = 0)的状态,说明它没有在处理任务,这时可以对该线程进行中断;线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程;
addWorker 函数(创建)
addWorker函数的作用是新建一个线程,其源码如下:
runWorker函数(执行与回收)
执行流程:
- while 循环通过 getTask 函数不断地从阻塞队列中获取任务;
- if判断:
- 如果线程池状态大于等于STOP(正在停止)则设置当前线程的中断状态(保证当前线程中断)
- 如果线程池状态小于STOP则清除中断状态(保证当前线程不中断)
- 调用 task.run() 方法执行任务;
- 如果 task == null, 跳出while循环,执行回收函数销毁线程;
processWorkerExit 函数(销毁)
执行流程:
- 判断是否为异常退出,如果是说明线程执行时出现了异常,需要建 workerCount 减 1;
- 统计线程池完成任务数量,将Worker引用从HashSet中移除(会被jvm回收),相当于销毁线程;
- 根据线程池状态判断是否结束线程池;
- 当线程池状态为 RUNNING 或 SHUTDOWN时:
如果任务为异常结束:
一个小 Demo
- Author:Lamyh
- URL:https://blog.ccyh.xyz/article/multi-thread
- Copyright:All articles in this blog, except for special statements, adopt BY-NC-SA agreement. Please indicate the source!