【Java】线程池实现原理以及实践场景

核心设计与实现

UML图

我们首先来看一下 ThreadPoolExecutor 的
UML 类图,了解下 ThreadPoolExecutor 的继承关系。

·
最底层的接口是Executor,它只是提供了一种思想,将任务提交和任务的执行解耦,用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供 Runnable 对象,将任务的运行逻辑提交到执行器 (Executor) 里面。由 Executor 框架完成线程的调配和任务的执行部分。

1
2
3
4
5
package java.util.concurrent;

public interface Executor {
void execute(Runnable var1);
}

ExecutorService 接口增加了一些能力:(1)扩充执行任务的能力,补充可以为一
个或一批异步任务生成 Future 的方法;(2)提供了管控线程池的方法,比如停止线
程池的运行

AbstractExecutorService 则是上层的抽象类,将执行任务的流程串联了起来.

ThreadPoolExecutor 将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。

线程池在内部是一个生产者消费者模型,将线程和任务进行分离。

任务相当于是生产者,一个任务过来了,会判断这个任务是由线程来执行,还是放入队列,还是拒绝执行。

线程则相当于是消费者,会根据任务的请求进行线程的分配,这个线程是执行任务,还是说执行完成之后去执行另外一个任务完成线程的复用,最后回收线程。

线程池的生命周期管理

线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态 (runState) 和线程数量 (workerCount)。在具体实现中,线程池将运行状态(runState、线程数量 (workerCount)
两个关键参数的维护放在了一起,如下代码所示:

1
private final AtomicInteger ctl;

ctl 这个 AtomicInteger 类型,ctl同时包含两个信息,高3位表示的是runState(运行状态),低29位保存的是workCount(线程数量),将两个值保存在同一个变量里面,避免了在做相关操作的时候,出现的不一致问题。这里获取状态和获取数量都采用的是位运算,速度会快很多。

1
2
3
4
5
6
7
8
9
10
11
private static int runStateOf(int c) {
return c & -536870912;
}

private static int workerCountOf(int c) {
return c & 536870911;
}

private static int ctlOf(int rs, int wc) {
return rs | wc;
}

运行状态:

  • Running,就绪状态,能够接受新的任务,能够处理队列中的任务。
  • Shutdown,关闭状态,不能接受新的任务,可以处理队列中的任务。
  • Stop,不接受新的任务,不会处理队列中的任务。
  • Tidying,所有的任务都终止了,workCount=0,
  • Terminated,

任务执行机制

拒绝策略

任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到 maximumPoolSize 时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。

  • 抛异常(默认)
  • 丢弃任务
  • 丢弃最前面的任务
  • 由当前线程去执行。

Worker线程管理

Worker线程

线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker

1
2
3
4
5
6
7
8
9
10
11
12
13
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;

Worker(Runnable firstTask) {
this.setState(-1);
this.firstTask = firstTask;
this.thread = ThreadPoolExecutor.this.getThreadFactory().newThread(this);
}
}

Worker这个工作线程,继承AQS,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask.thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务.

firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。

​Worker是通过继承AQS,使用AQS来实现独占锁这个功能,因为我不需要重入。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。

1
2
3
4
5
6
7
8
public void lock() {
this.acquire(1);
}

public void unlock() {
this.release(1);
}

1.lock方法一旦获取了独占锁,表示当前线程正在执行任务中。

2.如果正在执行任务,则不应该中断线程。

3.如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。

4.线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。

线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。线程池使用一张Hash表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程是否在运行。

线程池增加线程

增加线程是通过线程池中的addWorker方法,该方法的功能就是增加一个线程。

addWorker方法有两个参数:firstTask、core。firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize,其执行流程如下图所示:

Worker线程执行任务

在 Worker 类中的 run 方法调用了 runWorker 方法来执行任务,runWorker 方法的执行过程如下:

  1. while 循环不断地通过 getTask() 方法获取任务。
  2. getTask() 方法从阻塞队列中取任务。
  3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程
    不是中断状态。
  4. 执行任务。
  5. 如果 getTask 结果为 null 则跳出循环,执行 processWorkerExit() 方法,销
    毁线程。

Worker线程回收

线程池中线程的销毁依赖 JVM 自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被 JVM 回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可

Worker 被创建出来后,就会不断地进行轮询,然后获取任务去执行,线程可以无限等待获取任务,当无法获取到任务的时候,也就是说getTask方法为空的时候,循环会结束,worker会主动消除自身在线程池的引用。

1
2
3
4
5
6
7
8
9
try {
while(task != null || (task = this.getTask()) != null) {
...
}

completedAbruptly = false;
} finally {
this.processWorkerExit(w,completedAbruptly);
}

回收线程的时候会记录线程执行任务的情况,然后将线程的引用移出线程池,之后线程池自适应当前的状态。

实践场景

场景一:快速响应用户请求

用户发起的实时请求,服务需要追求这个响应的时间,比如说用户需要查看一个商品的信息,那我需要吧商品的信息维度都要显示出来,展示给用户。

这种场景因为需要实时响应,越快越好,所以这种场景下,所以尽量的不用队列去缓冲任务,而是尽可能的调高我的core和max,去创造更多的线程去执行任务。

场景二:快速处理批量任务

大量的任务,需要快速执行。但是和追求相应速度的场景相比,这种情况下我们并不需要瞬时间完成,而是关注如何使用有限的资源,尽可能在单位时间内处理更多的任务,也就是吞吐量优先的问题。所以在这种情况下,我们可以设置缓冲区,然后调整合适的core和max设置线程池的参数。

实际问题的思考方案

实际上线程池的难点在于他的参数配置,需要我们合理的预估我们执行的任务,IO 密集型和 CPU 密集型的任务运行起来的
情况差异非常大。没有一个很通用的一个公式。为了解决参数不好配,修改参数成本高等问题。在 Java 线程池留有高扩展性的基础上,封装线程池,允许线程池监听同步外部的消息,根据消息进行修改配置。同时对线程池进行监控,遇到警告可以即使邮件通知。所以可以使用分布式的配置平台来完成线程池参数动态的调整。

线程池参数动态化

线程池是提供了set方法的,比如说setCorePoolSize,setRejectedExecutionHandler,以 setCorePoolSize 为方法例,在运行期线程池使用方调用此方法设置corePoolSize 之后,线程池会直接覆盖原来的 corePoolSize 值,并且基于当前值和原始值的比较结果采取不同的处理策略。对于当前值小于当前工作线程数的情况,说明有多余的 worker 线程,此时会向当前 idle 的 worker 线程发起中断请求以实现回收,多余的 worker 在下次 idel 的时候也会被回收;对于当前值大于原始值且当前队列中有待执行任务,则线程池会创建新的 worker 线程来执行队列任务.

所以线程池参数动态化,是能够让线程池的运行达到一个平衡点的。

使用搜索:谷歌必应百度