线程池底层原理详解与源码分析
线程池底层原理详解与源码分析
线程池执行任务的具体流程是怎样的?
ThreadPoolExecutor中提供了两种执行任务的方法:
实际上submit中最终还是调用的execute()方法,只不过会返回一个Future对象,用来获取任务执行结果:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

****
线程池的五种状态是如何流转的?
线程池有五种状态:
- ********
- ********
- ********
这五种状态并不能任意转换,只会有以下几种转换情况:
线程池中的线程是如何关闭的?
我们一般会使用thread.start()方法来开启一个线程,那如何停掉一个线程呢?
Thread类提供了一个stop(),但是标记了@Deprecated,为什么不推荐用stop()方法来停掉线程呢?
因为stop()方法太粗暴了,一旦调用了stop(),就会直接停掉线程,但是调用的时候根本不知道线程刚刚在做什么,任务做到哪一步了,这是很危险的。
这里强调一点,stop()会释放线程占用的synchronized锁(不会自动释放ReentrantLock锁,这也是不建议用stop()的一个因素)。
package com.junziln.study.main;
import java.util.concurrent.locks.ReentrantLock;
public class ThreadTest {
static int count = 0;
static final Object lock = new Object();
static final ReentrantLock reentrantLock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new Runnable() {
public void run() {
// synchronized (lock) {
reentrantLock.lock();
for (int i = 0; i < 100; i++) {
count++;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// }
reentrantLock.unlock();
}
});
thread.start();
Thread.sleep(5*1000);
thread.stop();
//
// Thread.sleep(5*1000);
reentrantLock.lock();
System.out.println(count);
reentrantLock.unlock();
// synchronized (lock) {
// System.out.println(count);
// }
}
}
可以调用thread的interrupt()来中断线程
public class ThreadTest {
static int count = 0;
static boolean stop = false;
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new Runnable() {
public void run() {
for (int i = 0; i < 100; i++) {
if (Thread.currentThread().isInterrupted()) {
break;
}
count++;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
}
}
});
thread.start();
Thread.sleep(5 * 1000);
thread.interrupt();
Thread.sleep(5 * 1000);
System.out.println(count);
}
}
线程池中就是通过interrupt()来停止线程的,比如shutdownNow()方法中会调用:
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
线程池为什么一定得是阻塞队列?
线程池中的线程在运行过程中,执行完创建线程时绑定的第一个任务后,就会不断的从队列中获取任务并执行,那么如果队列中没有任务了,线程为了不自然消亡,就会阻塞在获取队列任务时,等着队列中有任务过来就会拿到任务从而去执行任务。
通过这种方法能最终确保,线程池中能保留指定个数的核心线程数,关键代码为:
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
某个线程在从队列获取任务时,会判断是否使用超时阻塞获取,我们可以认为非核心线程会poll(),核心线程会take(),非核心线程超过时间还没获取到任务后面就会自然消亡了。
线程发生异常,会被移出线程池吗?
答案是会的,那有没有可能核心线程数在执行任务时都出错了,导致所有核心线程都被移出了线程池?

在源码中,当执行任务时出现异常时,最终会执行processWorkerExit(),执行完这个方法后,当前线程也就自然消亡了,但是!processWorkerExit()方法中会额外再新增一个线程,这样就能维持住固定的核心线程数。
Tomcat是如何自定义线程池的?
Tomcat中用的线程池为org.apache.tomcat.util.threads.ThreadPoolExecutor,注意类名和JUC下的一样,但是包名不一样。
Tomcat会创建这个线程池:
public void createExecutor() {
internalExecutor = true;
TaskQueue taskqueue = new TaskQueue();
TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
taskqueue.setParent( (ThreadPoolExecutor) executor);
}
注入传入的队列为TaskQueue,它的入队逻辑为
public boolean offer(Runnable o) {
//we can't do any checks
if (parent==null) {
return super.offer(o);
}
//we are maxed out on threads, simply queue the object
if (parent.getPoolSize() == parent.getMaximumPoolSize()) {
return super.offer(o);
}
//we have idle threads, just add it to the queue
if (parent.getSubmittedCount()<=(parent.getPoolSize())) {
return super.offer(o);
}
//if we have less threads than maximum force creation of a new thread
if (parent.getPoolSize()<parent.getMaximumPoolSize()) {
return false;
}
//if we reached here, we need to add it to the queue
return super.offer(o);
}
特殊在:
这样就控制了,Tomcat的这个线程池,在提交任务时:
所以随着任务的提交,会优先创建线程,直到线程个数等于最大线程数才会入队。
当然其中有一个比较细的逻辑是:在提交任务时,如果正在处理的任务数小于线程池中的线程个数,那么也会直接入队,而不会去创建线程,也就是上面源码中getSubmittedCount的作用。
线程池的核心线程数、最大线程数该如何设置?
线程池中有两个非常重要的参数:
那这两个参数该如何设置呢?
我们对线程池负责执行的任务分为三种情况:
CPU密集型任务的特点时,线程在执行任务时会一直利用CPU,所以对于这种情况,就尽可能避免发生线程上下文切换。
比如,现在我的电脑只有一个CPU,如果有两个线程在同时执行找素数的任务,那么这个CPU就需要额外的进行线程上下文切换,从而达到线程并行的效果,此时执行这两个任务的总时间为:
任务执行时间*2+线程上下文切换的时间
而如果只有一个线程,这个线程来执行两个任务,那么时间为:
任务执行时间*2
所以对于CPU密集型任务,线程数最好就等于CPU核心数,可以通过以下API拿到你电脑的核心数:
Runtime.getRuntime().availableProcessors()
只不过,为了应对线程执行过程发生缺页中断或其他异常导致线程阻塞的请求,我们可以额外在多设置一个线程,这样当某个线程暂时不需要CPU时,可以有替补线程来继续利用CPU。
所以,对于CPU密集型任务,我们可以设置线程数为:
我们在来看IO型任务,线程在执行IO型任务时,可能大部分时间都阻塞在IO上,假如现在有10个CPU,如果我们只设置了10个线程来执行IO型任务,那么很有可能这10个线程都阻塞在了IO上,这样这10个CPU就都没活干了,所以,对于IO型任务,我们通常会设置线程数为:
不过,就算是设置为了2*CPU核心数,也不一定是最佳的,比如,有10个CPU,线程数为20,那么也有可能这20个线程同时阻塞在了IO上,所以可以再增加线程,从而去压榨CPU的利用率。
通常,如果IO型任务执行的时间越长,那么同时阻塞在IO上的线程就可能越多,我们就可以设置更多的线程,但是,线程肯定不是越多越好,我们可以通过以下这个公式来进行计算:
线程数 = CPU核心数 *( 1 + 线程等待时间 / 线程运行总时间 )
可以利用jvisualvm抽样来估计这两个时间:
图中表示,在刚刚这次抽样过程中,run()总共的执行时间为538948ms,利用了CPU的时间为86873ms,所以没有利用CPU的时间为538948ms-86873ms。
所以我们可以计算出:
线程等待时间 = 538948ms-86873ms
线程运行总时间 = 538948ms
所以:线程数 = 8 *( 1 + (538948ms-86873ms) / 538948ms )= 14.xxx
所以根据公式算出来的线程为14、15个线程左右。
按上述公式,如果我们执行的任务IO密集型任务,那么:线程等待时间 = 线程运行总时间,所以:
线程数 = CPU核心数 *( 1 + 线程等待时间 / 线程运行总时间 )
= CPU核心数 *( 1 + 1 )
= CPU核心数 * 2
,实际工作中情况会更复杂,比如一个应用中,可能有多个线程池,除开线程池中的线程可能还有很多其他线程,或者除开这个应用还是一些其他应用也在运行,所以实际工作中如果要确定线程数,最好是压测。
@RestController
public class TestController {
@GetMapping("/test")
public String test() throws InterruptedException {
Thread.sleep(1000);
return "test";
}
}

当我们把线程数调整为500:
server.tomcat.threads.max=500

发现执行效率提高了一倍,假如再增加线程数到1000:
性能就降低了。
总结,我们再工作中,对于:
源码分析
构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize(核心线程大小):
当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
maximumPoolSize(最大线程数)****:
线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
runnableTaskQueue(任务队列)****:
用于保存等待执行的任务的阻塞队列。当所有的核心线程被占满时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务

keepAliveTime(线程活动保持时间):
表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0
RejectedExecutionHandler(拒绝策略)****:
ThreadFactory:
用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字,Debug和定位问题时非常又帮助。
TimeUnit(线程活动保持时间的单位):
可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
核心属性ctl
在线程池的源码中,会通过一个AtomicInteger类型的变量**,来表示线程池的状态和当前线程池中的工作线程数量**。
一个Integer占4个字节,也就是32个bit,线程池有5个状态:
- ********
- ********
2个bit能表示4种状态,那5种状态就至少需要三个bit位,比如在线程池的源码中就是这么来表示的:
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
Integer.SIZE为32,所以COUNT_BITS为29,最终各个状态对应的二级制为:
- ****
- ****
- ****
- ****
- ****
所以,只需要使用一个Integer数字的最高三个bit,就可以表示5种线程池的状态,而剩下的29个bit就可以用来表示工作线程数,比如,假如ctl为:**00000 00000000 00000000 00000**0,就表示线程池的状态为RUNNING,线程池中目前在工作的线程有10个,这里说的“在工作”意思是线程活着,要么在执行任务,要么在阻塞等待任务。
同时,在线程池中也提供了一些方法用来获取线程池状态和工作线程数
// 29,二进制为00000000 00000000 00000000 00011101
// 因为线程池的生命周期有 5 个状态,为了表达这 5 个状态,我们需要 3 个二进制位。
private static final int COUNT_BITS = Integer.SIZE - 3;
// 00011111 11111111 11111111 11111111
//表示了工作线程的最大数量 1左移29位,再-1
//00100000 00000000 00000000 00000000
//00011111 11111111 11111111 11111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// ~CAPACITY为11100000 00000000 00000000 00000000
// &操作之后,得到就是c的高3位
//计算出当前线程池的状态
//int c 00011111 11111111 11111111 11111111
// ~CAPACITY:11100000 00000000 00000000 00000000
//000 SHUTDOWN状态
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
// CAPACITY为00011111 11111111 11111111 11111111
// &操作之后,得到的就是c的低29位
private static int workerCountOf(int c) {
return c & CAPACITY;
}
同时,还有一个方法:
private static int ctlOf(int rs, int wc)
{ return rs | wc; }
就是用来把运行状态和工作线程数量进行合并的一个方法,不过传入这个方法的两个int数字有限制,rs的低29位都得为0,wc的高3位都得为0,这样经过或运算之后,才能得到准确的ctl。
execute方法:(提交优先级)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取ctl
// ctl初始值是ctlOf(RUNNING, 0),表示线程池处于运行中,工作线程数为0
int c = ctl.get();
1、判断当前的线程数是否小于corePoolSize如果是,
使用入参任务通过addWord方法创建一个新的线程,
如果能完成新线程创建exexute方法结束,成功提交任务;
// 工作线程数小于corePoolSize,则添加工作线程,并把command作为该线程要作为这个线程的第一个任务(firstTask)
if (workerCountOf(c) < corePoolSize) {
// true表示添加的是核心工作线程,具体一点就是,在addWorker内部会判断当前工作线程数是不是超过了corePoolSize
// 如果超过了则会添加失败,addWorker返回false,表示不能直接开启新的线程来执行任务,而是应该先入队
if (addWorker(command, true))
return;
// 如果添加核心工作线程失败,那就重新获取ctl,可能是线程池状态被其他线程修改了
// 也可能是其他线程也在向线程池提交任务,导致核心工作线程已经超过了corePoolSize
c = ctl.get();
}
2、在第一步没有完成任务提交;状态为运行并且能成功加入任务到工作队列后,
再进行一次check,如果状态在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了)
非运行状态下当然是需要reject;
然后再判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程;
//// 到这里说明,要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了
//创建核心线程失败 判断当前线程池状态是否是Running 如果是Running,执行offer方法将任务添加到工作队 off返回Boolean add添加失败抛异常
// 线程池状态是否还是RUNNING,如果是就把任务添加到阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
// 在任务入队时,线程池的状态可能也会发生改变
// 再次检查线程池的状态,如果线程池不是RUNNING了,那就不能再接受任务了,就得把任务从队列中移除,并进行拒绝策略
// 如果线程池的状态没有发生改变,仍然是RUNNING,那就不需要把任务从队列中移除掉
// 不过,为了确保刚刚入队的任务有线程会去处理它,需要判断一下工作线程数,如果为0,那就添加一个非核心的工作线程
// 添加的这个线程没有自己的任务,目的就是从队列中获取任务来执行
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果线程池状态不是RUNNING,或者线程池状态是RUNNING但是队列满了,则去添加一个非核心工作线程
// 实际上,addWorker中会判断线程池状态如果不是RUNNING,是不会添加工作线程的,拒绝策略不仅仅是在饱和状态下使用,在线程池进入到关闭阶段同样需要使用到;
// false表示非核心工作线程,作用是,在addWorker内部会判断当前工作线程数已经超过了maximumPoolSize,如果超过了则会添加不成功,执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
addWorker方法
addWorker方法是核心方法,是用来添加线程的,core参数表示添加的是核心线程还是非核心线程。
什么是添加线程?
实际上就要开启一个线程,不管是核心线程还是非核心线程其实都只是一个普通的线程,而核心和非核心的区别在于:
- ****
- ****
所以在addWorker方法中,首先就要判断工作线程有没有超过限制,如果没有超过限制再去开启一个线程。
并且在addWorker方法中,还得判断线程池的状态,如果线程池的状态不是RUNNING状态了,那就没必要要去添加线程了,当然有一种特例,就是线程池的状态是SHUTDOWN,但是队列中有任务,那此时还是需要添加添加一个线程的。
那这种特例是如何产生的呢?
我们前面提到的都是开启新的工作线程,那么工作线程怎么回收呢?不可能开启的工作线程一直活着,因为如果任务由多变少,那也就不需要过多的线程资源,所以线程池中会有机制对开启的工作线程进行回收,如何回收的,后文会提到,我们这里先分析,有没有可能线程池中所有的线程都被回收了,答案的是有的。
首先非核心工作线程被回收是可以理解的,那核心工作线程要不要回收掉呢?其实线程池存在的意义,就是提前生成好线程资源,需要线程的时候直接使用就可以,而不需要临时去开启线程,所以正常情况下,开启的核心工作线程是不用回收掉的,就算暂时没有任务要处理,也不用回收,就让核心工作线程在那等着就可以了。
但是!在线程池中有这么一个参数:,表示是否允许核心工作线程超时,意思就是是否允许核心工作线程回收,默认这个参数为false,但是我们可以调用allowCoreThreadTimeOut(boolean value)来把这个参数改为true,只要改了,那么核心工作线程也就会被回收了,那这样线程池中的所有工作线程都可能被回收掉,那如果所有工作线程都被回收掉之后,阻塞队列中来了一个任务,这样就形成了特例情况。
// 第一个参数是准备提交给这个线程执行的任务,之前说了,可以为 null
// 第二个参数为 true 代表使用核心线程数 corePoolSize 作为创建线程的界限,也就说创建这个线程的时候,
// 如果线程池中的线程总数已经达到 corePoolSize,那么不能响应这次创建线程的请求
// 如果是 false,代表使用最大线程数 maximumPoolSize 作为界限
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 这个非常不好理解
//如果线程池状态不是RUNNING,再次做后续判断,判断当前任务是否可以不处理
// 如果线程池已关闭,并满足以下条件之一,那么不创建新的 worker:
// 1. 线程池状态大于 SHUTDOWN,其实也就是 STOP, TIDYING, 或 TERMINATED
// 2. firstTask != null
// 3. workQueue.isEmpty()
也就是/**
* 只有如下两种情况可以新增worker,继续执行下去:
* case one:rs==RUNNING
* case two:rs==SHUTDOWN && firstTask ==null&&!workQueue.isEmpty()
*/
// 简单分析下:
// 还是状态控制的问题,当线程池处于 SHUTDOWN 的时候,不允许提交任务,但是已有的任务继续执行
// 当状态大于 SHUTDOWN 时,不允许提交任务,且中断正在执行的任务
// 多说一句:如果线程池处于 SHUTDOWN,但是 firstTask 为 null,且 workQueue 非空,那么是允许创建 worker 的
// 这是因为 SHUTDOWN 的语义:不允许提交新的任务,但是要把已经进入到 workQueue 的任务执行完,所以在满足条件的基础上,是允许创建新的 Worker 的
// 线程池如果是SHUTDOWN状态并且队列非空则创建线程,如果队列为空则不创建线程了
// 线程池如果是STOP状态则直接不创建线程了
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 判断工作线程数是否超过了限制
// 如果超过限制了,则return false
// 如果没有超过限制,则修改ctl,增加工作线程数,cas成功则退出外层retry循环,去创建新的工作线程
// 如果cas失败,则表示有其他线程也在提交任务,也在增加工作线程数,此时重新获取ctl
// 如果发现线程池的状态发生了变化,则继续回到retry,重新判断线程池的状态是不是SHUTDOWN或STOP
// 如果状态没有变化,则继续利用cas来增加工作线程数,直到cas成功
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
// 由于有并发,重新再读取一下 ctl
c = ctl.get(); // Re-read ctl
// 正常如果是 CAS 失败的话,进到下一个里层的for循环就可以了
// 可是如果是因为其他线程的操作,导致线程池的状态发生了变更,如有其他线程关闭了这个线程池
// 那么需要回到外层的for循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 上面主要是 ctl修改成功,也就是工作线程数+1成功
// 接下来就要开启一个新的工作线程了
/*
* 到这里,我们认为在当前这个时刻,可以开始创建线程来执行任务了,
* 因为该校验的都校验了,至于以后会发生什么,那是以后的事,至少当前是满足条件的
*/
boolean workerStarted = false;// 用于判断新的worker实例是否已经开始执行Thread.start
boolean workerAdded = false;// 是否已将这个 worker 添加到 workers 这个 HashSet 中
Worker w = null;
try {
// Worker实现了Runnable接口
// 在构造一个Worker对象时,就会利用ThreadFactory新建一个线程
// Worker对象有两个属性:
// Runnable firstTask:表示Worker待执行的第一个任务,第二个任务会从阻塞队列中获取
// Thread thread:表示Worker对应的线程,就是这个线程来获取队列中的任务并执行的
w = new Worker(firstTask);
// 拿出线程对象,还没有start
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//基于重新获取的ctl,拿到线程池的状态
int rs = runStateOf(ctl.get());
// case1如果线程池的状态是RUNNING
// case2或者线程池的状态变成了SHUTDOWN,但是当前线程没有自己的第一个任务,那就表示当前调用addWorker方法是为了从队列中获取任务来执行
// 正常情况下线程池的状态如果是SHUTDOWN,是不能创建新的工作线程的,但是队列中如果有任务,那就是上面说的特例情况
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 因为t是新构建的线程 如果Worker对象对应的线程已经在运行了,那就有问题,直接抛异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers用来记录当前线程池中工作线程,调用线程池的shutdown方法时会遍历worker对象中断对应线程
workers.add(w);
// largestPoolSize用来跟踪线程池在运行过程中工作线程数的峰值
int s = workers.size();
//如果现在的工作线程数,大于历史最大的工程线程数,就重新赋值给largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;//将工作线程添加的标识设置为true
}
} finally {
mainLock.unlock();
}
// 运行线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 在上述过程中如果抛了异常,需要从works中移除所添加的work,并且还要修改ctl,工作线程数-1,表示新建工作线程失败
if (! workerStarted)
addWorkerFailed(w);
}
// 最后表示添加工作线程成功
return workerStarted;
}
对于addWorker方法,核心逻辑就是:
因为 Doug Lea 把线程池中的线程包装成了一个个 Worker,翻译成工人,就是线程池中做任务的线程。所以到这里,我们知道任务是 Runnable(内部变量名叫 task 或 command),线程是 Worker。
Worker 这里又用到了抽象类 AbstractQueuedSynchronizer。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
// 这个是真正的线程,任务靠你啦
final Thread thread;
// 前面说了,这里的 Runnable 是任务。为什么叫 firstTask?因为在创建线程的时候,如果同时指定了
// 这个线程起来以后需要执行的第一个任务,那么第一个任务就是存放在这里的(线程可不止执行这一个任务)
// 当然了,也可以为 null,这样线程起来了,自己到任务队列(BlockingQueue)中取任务(getTask 方法)就行了
Runnable firstTask;
// 用于存放此线程完成的任务数,注意了,这里用了 volatile,保证可见性
volatile long completedTasks;
// Worker 只有这一个构造方法,传入 firstTask,也可以传 null
Worker(Runnable firstTask) {
//刚刚初始化的工作线程不允许被中断的
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 调用 ThreadFactory 来创建一个新的线程
this.thread = getThreadFactory().newThread(this);
}
// 这里调用了外部类的 runWorker 方法
public void run() {
runWorker(this);
}
...// 其他几个方法没什么好看的,就是用 AQS 操作,来获取这个线程的执行权,用了独占锁
}
runWorker方法(
// 此方法由 worker 线程启动后调用,这里用一个 while 循环来不断地从等待队列中获取任务并执行
// 前面说了,worker 在初始化的时候,可以指定 firstTask,那么第一个任务也就可以不需要从队列中获取
final void runWorker(Worker w) {
// 就是当前工作线程
Thread wt = Thread.currentThread();
// 把Worker要执行的第一个任务拿出来(如果有的话)
Runnable task = w.firstTask;
w.firstTask = null;//将Worker的firstTask设置为null
// 这个地方,后面单独分析中断的时候来分析
w.unlock(); // allow interrupts
//执行任务时,勾子函数中是否出现异常的标识
boolean completedAbruptly = true;
try {
// 判断当前线程是否有自己的第一个任务,如果没有就从阻塞队列中获取任务
// 如果阻塞队列中也没有任务,那线程就会阻塞在这里
// 但是并不会一直阻塞,在getTask方法中,会根据我们所设置的keepAliveTime来设置阻塞时间
// 如果当前线程去阻塞队列中获取任务时,等了keepAliveTime时间,还没有获取到任务,则getTask方法返回null,相当于退出循环
// 当然并不是所有线程都会有这个超时判断,主要还得看allowCoreThreadTimeOut属性和当前的工作线程数等等,后面单独分析
// 目前,我们只需要知道工作线程在执行getTask()方法时,可能能直接拿到任务,也可能阻塞,也可能阻塞超时最终返回null
//先从核心和非核心拿 ,再从队列拿,队列的优先级最低
while (task != null || (task = getTask()) != null) {
// 只要拿到了任务,就要去执行任务
// Work先加锁,跟shutdown方法有关,先忽略,后面会分析
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 下面这个if,最好把整篇文章都看完之后再来看这个if的逻辑
// 工作线程在运行过程中
// 如果发现线程池的状态变成了STOP,正常来说当前工作线程的中断标记应该为true,如果发现中断标记不为true,则需要中断自己
// 如果线程池的状态不是STOP,要么是RUNNING,要么是SHUTDOWN
// 但是如果发现中断标记为true,那是不对的,因为线程池状态不是STOP,工作线程仍然是要正常工作的,不能中断掉
// 就算是SHUTDOWN,也要等任务都执行完之后,线程才结束,而目前线程还在执行任务的过程中,不能中断
// 所以需要重置线程的中断标记,不过interrupted方法会自动清空中断标记
// 清空为中断标记后,再次判断一下线程池的状态,如果又变成了STOP,那就仍然中断自己
// 中断了自己后,会把当前任务执行完,在下一次循环调用getTask()方法时,从阻塞队列获取任务时,阻塞队列会负责判断当前线程的中断标记
// 如果发现中断标记为true,那就会抛出异常,最终退出while循环,线程执行结束
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 空方法,给自定义线程池来实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务.执行任务的run方法,并不是调用任务的start方法
// 注意执行任务时可能会抛异常,如果抛了异常会先依次执行三个finally,从而导致 completedAbruptly = false这行代码没有执行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 空方法,给自定义线程池来实现
afterExecute(task, thrown);
}
} finally {
// 置空 task,准备 getTask 获取下一个任务. 等下次循环就会从队列里面获取
task = null;
w.completedTasks++; // 跟踪当前Work总共执行了多少了任务
w.unlock();
}
}
// 正常退出了While循环
// 如果是执行任务的时候抛了异常,虽然也退出了循环,但是是不会执行这行代码的,只会直接进去下面的finally块中
// 所以,要么是线程从队列中获取任务时阻塞超时了从而退出了循环会进入到这里
// 要么是线程在阻塞的过程中被中断了,在getTask()方法中会处理中断的情况,如果被中断了,那么getTask()方法会返回null,从而退出循环
// completedAbruptly=false,表示线程正常退出
completedAbruptly = false;
} finally {
// 如果到这里,需要执行线程关闭:
// 1. 说明 getTask 返回 null,也就是说,队列中已经没有任务需要执行了,执行关闭
// 2. 任务执行过程中发生了异常
// 第一种情况,已经在代码处理了将 workCount 减 1,这个在 getTask 方法分析中会说
// 第二种情况,workCount 没有进行处理,所以需要在 processWorkerExit 中处理
// 因为当前线程退出了循环,如果不做某些处理,那么这个线程就运行结束了,就是上文说的回收(自然消亡)掉了,线程自己运行完了也就结束了
// 但是如果是由于执行任务的时候抛了异常,那么这个线程不应该直接结束,而应该继续从队列中获取下一个任务
// 可是代码都执行到这里了,该怎么继续回到while循环呢,怎么实现这个效果呢?
// 当然,如果是由于线程被中断了,或者线程阻塞超时了,那就应该正常的运行结束
// 只不过有一些善后工作要处理,比如修改ctl,工作线程数-1
processWorkerExit(w, completedAbruptly);
}
}
processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly为true,表示是执行任务的时候抛了异常,那就修改ctl,工作线程数-1
// 如果completedAbruptly为false,表示是线程阻塞超时了或者被中断了,实际上也要修改ctl,工作线程数-1
// 只不过在getTask方法中已经做过了,这里就不用再做一次了
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 当前Work要运行结束了,将完成的任务数累加到线程池上
completedTaskCount += w.completedTasks;
// 将当前Work对象从workers中移除
workers.remove(w);
} finally {
mainLock.unlock();
}
// 因为当前是处理线程退出流程中,所以要尝试去修改线程池的状态为TINDYING
tryTerminate();
int c = ctl.get();
// 如果线程池的状态为RUNNING或者SHUTDOWN,则可能要替补一个线程
if (runStateLessThan(c, STOP)) {
// completedAbruptly为false,表示线程是正常要退出了,则看是否需要保留线程
if (!completedAbruptly) {
// 如果allowCoreThreadTimeOut为true,但是阻塞队列中还有任务,那就至少得保留一个工作线程来处理阻塞队列中的任务
// 如果allowCoreThreadTimeOut为false,那min就是corePoolSize,表示至少得保留corePoolSize个工作线程活着
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果当前工作线程数大于等于min,则表示符合所需要保留的最小线程数,那就直接return,不会调用下面的addWorker方法新开一个工作线程了
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 如果线程池的状态为RUNNING或者SHUTDOWN
// 如果completedAbruptly为true,表示当前线程是执行任务时抛了异常,那就得新开一个工作线程
// 如果completedAbruptly为false,但是不符合所需要保留的最小线程数,那也得新开一个工作线程
addWorker(null, false);
}
}
总结一下,某个工作线程正常情况下会不停的循环从阻塞队列中获取任务来执行,正常情况下就是通过阻塞来保证线程永远活着,但是会有一些特殊情况:
getTask
// 此方法有三种可能:
// 1. 阻塞直到获取到任务返回。我们知道,默认 corePoolSize 之内的线程是不会被回收的,
// 它们会一直等待任务
// 2. 超时退出。keepAliveTime 起作用的时候,也就是如果这么多时间内都没有任务,那么应该执行关闭
// 3. 如果发生了以下条件,此方法必须返回 null:
// - 池中有大于 maximumPoolSize 个 workers 存在(通过调用 setMaximumPoolSize 进行设置)
// - 线程池处于 SHUTDOWN,而且 workQueue 是空的,前面说了,这种不再接受新的任务
// - 线程池处于 STOP,不仅不接受新的线程,连 workQueue 中的线程也不再执行
//从工作队列中获取任务
private Runnable getTask() {
// 表示上次从阻塞队列中获取任务是否超时
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);//获取线程池运行状态
// Check if queue empty only if necessary.
//同时满足如下两点,则线程池中工作线程数减1,并返回null
//1> rs >= SHUTDOWN,表示线程池不是RUNNING状态
//2> rs >= STOP 表示STOP、TIDYING和TERMINATED这三个状态,它们共同点就是【不接收新任务】也【不处理workQueue里的线程任务】 or 阻塞队列workQueue为空
// 如果线程池状态是STOP,表示当前线程不需要处理任务了,那就修改ctl工作线程数-1
// 如果线程池状态是SHUTDOWN,但是阻塞队列中为空,表示当前任务没有任务要处理了,那就修改ctl工作线程数-1
// return null表示当前线程无需处理任务,线程退出
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount(); // CAS 操作,减少工作线程数
return null;
}
// 当前工作线程数
int wc = workerCountOf(c);
// Are workers subject to culling?
// 用来判断当前线程是无限阻塞还是超时阻塞,如果一个线程超时阻塞,那么一旦超时了,那么这个线程最终就会退出
// 如果是无限阻塞,那除非被中断了,不然这个线程就一直等着获取队列中的任务
// timed是标志超时销毁
// allowCoreThreadTimeOut true 核心线程池也是可以销毁的
// allowCoreThreadTimeOut为true,表示线程池中的所有线程都可以被回收掉,则当前线程应该直接使用超时阻塞,一旦超时就回收
// allowCoreThreadTimeOut为false,则要看当前工作线程数是否超过了corePoolSize,如果超过了,则表示超过部分的线程要用超时阻塞,一旦超时就回收
// timed用于判断是否需要进行超时控制,当allowCoreThreadTimeOut被设置为ture或者活跃线程数大于核心线程数,则需要进行超时控制
// allowCoreThreadTimeOut默认为false,则表明核心线程不允许超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果工作线程数超过了工作线程的最大限制或者线程超时了,则要修改ctl,工作线程数减1,并且return null
// return null就会导致外层的while循环退出,从而导致线程直接运行结束
//同时满足以下两种情况,则线程池中工作线程数减1并返回nul1:
//case1:当前活动线程数workCount大于最大线程数,或者需要超时控制(timed = true)并且上次从阻塞队列中获取任务超时
//case2:如果有效线程数大于1,或者阻塞队列为空。 我就干掉我自己
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;// 如果 - 1失败,则循环重试
}
//--------------------------从工作队列获取任务------------------------
// 如果需要超时控制,则通过阻塞队列的poll方法进行超时控制,
// 否则,直接获取,如果队列为空,task方法会阻塞直到队列不为空
try {
// 要么超时阻塞,要么无限阻塞
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
// 表示没有超时,在阻塞期间获取到了任务
if (r != null)
return r;//如果拿到任务直接返回执行。
// 超时了,重新进入循环,上面的代码会判断出来当前线程阻塞超时了,最后return null,线程会运行结束
//如果r=nul1,表示超时了,则timeOut设置为true,标记为上一次超时状态
timedOut = true;// (达到了当前工作线程的最大生存时间)
} catch (InterruptedException retry) {
// 从阻塞队列获取任务时,被中断了,也会再次进入循环,此时并不是超时,但是重新进入循环后,会判断线程池的状态
// 如果线程池的状态变成了STOP或者SHUTDOWN,最终也会return null,线程会运行结束
// 但是如果线程池的状态仍然是RUNNING,那当前线程会继续从队列中去获取任务,表示忽略了本次中断
// 只有通过调用线程池的shutdown方法或shutdownNow方法才能真正中断线程池中的线程
timedOut = false;
}
}
}
特别注意:只有通过调用线程池的shutdown方法或shutdownNow方法才能真正中断线程池中的线程。
因为在java,中断一个线程,只是修改了该线程的一个标记,并不是直接kill了这个线程,被中断的线程到底要不要消失,由被中断的线程自己来判断,比如上面代码中,线程遇到了中断异常,它可以选择什么都不做,那线程就会继续进行外层循环,如果选择return,那就退出了循环,后续就会运行结束从而消失。
shutdown方法
调用线程池的shutdown方法,表示要关闭线程池,不接受新任务,但是要把阻塞队列中剩余的任务执行完。
根据前面execute方法的源码,只要线程池的状态不是RUNNING,那么就表示线程池不接受新任务,所以shutdown方法要做的第一件事情就是修改线程池状态。
那第二件事情就是要中断线程池中的工作线程,这些工作线程要么在执行任务,要么在阻塞等待任务:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改ctl,将线程池状态改为SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断工作线程
interruptIdleWorkers();
// 空方法,给子类扩展使用
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 调用terminated方法
tryTerminate();
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍历所有正在工作的线程,要么在执行任务,要么在阻塞等待任务
for (Worker w : workers) {
Thread t = w.thread;
// 如果线程没有被中断,并且能够拿到锁,就中断线程
// Worker在执行任务时会先加锁,执行完任务之后会释放锁
// 所以只要这里拿到了锁,就表示线程空出来了,可以中断了
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
还有一个种情况,就是目前所有工作线程都在执行任务,但是阻塞队列中还有剩余任务,那逻辑应该就是这些工作线程执行完当前任务后要继续执行队列中的剩余任务,但是根据我们看到的shutdown方法的逻辑,发现这些工作线程在执行完当前任务后,就会释放锁,那就可能会被中断掉,那队列中剩余的任务怎么办呢?
shutdownNow方法
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改ctl,将线程池状态改为STOP
advanceRunState(STOP);
// 中断工作线程
interruptWorkers();
// 返回阻塞队列中剩余的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 调用terminated方法
tryTerminate();
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 中断所有工作线程,不管有没有在执行任务
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
// 只要线程没有被中断,那就中断线程,中断的线程虽然也会进入processWorkerExit方法,但是该方法中判断了线程池的状态
// 线程池状态为STOP的情况下,不会再开启新的工作线程了
// 这里getState>-0表示,一个工作线程在创建好,但是还没运行时,这时state为-1,可以看看Worker的构造方法就知道了
// 表示一个工作线程还没开始运行,不能被中断,就算中断也没意义,都还没运行
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
mainLock
在上述源码中,发现很多地方都会用到mainLock,它是线程池中的一把全局锁,主要是用来控制workers集合的并发安全,因为如果没有这把全局锁,就有可能多个线程公用同一个线程池对象,如果一个线程在向线程池提交任务,一个线程在shutdown线程池,如果不做并发控制,那就有可能线程池shutdown了,但是还有工作线程没有被中断,如果1个线程在shutdown,99个线程在提交任务,那么最终就可能导致线程池关闭了,但是线程池中的很多线程都没有停止,仍然在运行,这肯定是不行,所以需要这把全局锁来对workers集合的操作进行并发安全控制。
到此,线程池中的所有核心方法的源码都分析一遍