Java 多线程源码解析
00 min
2024-5-26
2024-5-26
type
status
date
slug
summary
tags
category
icon
password
😀
Java 多线程的源码实现解析
 

为什么要用线程池?

  • 降低资源消耗:通过重复利用现有的线程来执行任务,避免多次创建和销毁线程。
  • 提高相应速度:因为省去了创建线程这个步骤,所以在拿到任务时,可以立刻开始执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  • 提供附加功能:线程池的可拓展性使得我们可以自己加入新的功能,比如说定时、延时来执行某些线程。

线程池的设计

notion image
如上图所示,本文试图回答几个问题:
  1. 线程池如何维护自身状态(表示、获取、转移)?
  1. 线程池如何管理任务(任务获取,分配)?
  1. 线程池如何管理线程(表示、创建、执行任务、回收)?

线程池如何维护自身状态?

在JDK的 ThreadPoolExecutor 线程池中用一个原子整型来维护线程池的两个状态参数:
ctl 的高 3 位被用来表示线程池运行状态 runState, 其余 29 位用来表示线程池中的线程数量 workerCount
  1. RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务;
  1. SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态);
  1. STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;
  1. TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
  1. TERMINATED:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。
    1. 进入TERMINATED的条件如下:
      • 线程池不是RUNNING状态;
      • 线程池状态不是TIDYING状态或TERMINATED状态;
      • 如果线程池状态是SHUTDOWN并且workerQueue为空;
      • workerCount为0;
      • 设置TIDYING状态成功
五大状态的轮转过程:
notion image
二者分别通过下面两个函数获取:

线程池如何管理任务?

如图1所示,当用户提交一个任务时,线程池应该根据其状态做出不同的响应,对应的函数为 execute() 函数:

execute函数执行过程(分配)

  1. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
  1. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
  1. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
  1. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
  1. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
这里有一点要注意,就是在将任务添加到队列中后,做了一个recheck,这是因为在往阻塞队列中添加任务地时候,有可能阻塞队列已满,需要等待其他的任务移出队列,在这个过程中,线程池的状态可能会发生变化,所以需要double check。

getTask 函数(获取)

这里重要的地方是第二个if判断,目的是控制线程池的有效线程数量。由上文中的分析可以知道,在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,并且workQueue已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多余的非核心线程销毁掉,保持线程数量在corePoolSize即可。

线程池如何管理线程?

Worker 类(表示)

Worker类继承了 AbstractQueuedSynchronizer 类并且实现了 Runnable 接口。之所以继承 AbstractQueuedSynchronizer 类是因为线程池有一个需求是要获取线程的运行状态(工作中,空闲中)。Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的。
上述代码可以实现:
lock方法一旦获取了独占锁,表示当前线程正在执行任务中(runWorker函数在取到任务后会执行lock()方法后执行任务);如果正在执行任务(state = 1),则不应该中断线程;如果该线程现在不是独占锁的状态,也就是空闲(state = 0)的状态,说明它没有在处理任务,这时可以对该线程进行中断;线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程;

addWorker 函数(创建)

addWorker函数的作用是新建一个线程,其源码如下:

runWorker函数(执行与回收)

执行流程:
  1. while 循环通过 getTask 函数不断地从阻塞队列中获取任务;
  1. if判断:
      • 如果线程池状态大于等于STOP(正在停止)则设置当前线程的中断状态(保证当前线程中断)
      • 如果线程池状态小于STOP则清除中断状态(保证当前线程不中断)
  1. 调用 task.run() 方法执行任务;
  1. 如果 task == null, 跳出while循环,执行回收函数销毁线程;

processWorkerExit 函数(销毁)

执行流程:
  1. 判断是否为异常退出,如果是说明线程执行时出现了异常,需要建 workerCount 减 1;
  1. 统计线程池完成任务数量,将Worker引用从HashSet中移除(会被jvm回收),相当于销毁线程;
  1. 根据线程池状态判断是否结束线程池;
  1. 当线程池状态为 RUNNING 或 SHUTDOWN时:
    1. 如果任务为异常结束:

一个小 Demo

上一篇
图解红黑树
下一篇
人工神经网络原理解析

Comments
Loading...