线程池处理流程

线程池处理流程

  1. 判断核心线程池是否已满, 不满则创建新线程执行任务
  2. 等待队列如果有界, 判断等待队列是否已满, 不满, 则添加任务到等待队列
  3. 判断最大线程数是否已满, 不满则创建新线程执行任务
  4. 最大线程数已满, 按照既定策略处理新任务

    全参构造及各参数含义

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    public class ThreadPoolExecutor extends AbstractExecutorService {
    public ThreadPoolExecutor(int corePoolSize, // 核心线程数
    int maximumPoolSize, // 最大线程数
    long keepAliveTime, // 核心线程外线程的存活时间
    TimeUnit unit, // 存活时间的单位
    BlockingQueue<Runnable> workQueue, // 保存等待执行的线程的阻塞队列
    ThreadFactory threadFactory, // 线程工厂
    RejectedExecutionHandler handler) { // 线程拒绝策略
    if (corePoolSize < 0 ||
    maximumPoolSize <= 0 ||
    maximumPoolSize < corePoolSize ||
    keepAliveTime < 0)
    throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
    throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
    null :
    AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
    }
    // 省略 . . .
    }

1.workQueue阻塞队列
ArrayBlockingQueue: 是一个基于数组结构的有界阻塞队列, 此队列按 FIFO(先进先出) 原则对元素进行排序.
LinkedBlockingQueue: 一个基于链表结构的阻塞队列,此队列按 FIFO(先进先出) 排序元素, 吞吐量通常要高于ArrayBlockingQueue. 静态工厂方法Executors.newFixedThreadPool()使用了这个队列
SynchronousQueue: 一个不存储元素的阻塞队列. 每个插入操作必须等到另一个线程调用移除操作, 否则插入操作一直处于阻塞状态, 吞吐量通常要高于LinkedBlockingQueue, 静态工厂方法Executors.newCachedThreadPool使用了这个队列.
PriorityBlockingQueue: 一个具有优先级的无限阻塞队列.

2.threadFactory线程工厂
可以使用默认的工厂也可以自定义工厂, 或者使用 google guava 提供的工厂, 可以为线程命名和设置是否为守护线程

1
2
3
4
// 默认工厂
ThreadFactory threadFactory = Executors.defaultThreadFactory();
// google guava工具提供
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();

3.handler线程拒绝策略
当线程池达到最大线程数, 并且队列满了, 新的线程要采取的处理策略.
1.AbortPolicy 拒绝新任务并抛出RejectedExecutionException异常
2.CallerRunsPolicy 直接在调用程序的线程中运行
3.DiscardOldestPolicy 放弃最早的任务, 即队列最前面的任务
4.DiscardPolicy 丢弃, 不处理

Executors初始化线程池的四种方式

这四种初始化线程池的方式, 前三种都是调用 ThreadPoolExecutor 类的构造创建的线程池, 只不过使用的阻塞队列方式不同.

  1. newFixedThreadPool()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Executors {

/**
* 固定线程池
* 核心线程数 = 最大线程数
* 超时时间为0
* LinkedBlockingQueue无界队列, 会持续等待
* 使用默认拒绝策略 AbortPolicy
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
}
  1. newCachedThreadPool()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Executors {
/**
* 无界线程池
* 核心线程数0 最大线程数 (2³¹ -1)
* 超时时间 60秒
* SynchronousQueue不存储元素的阻塞队列
* 线程空闲时间超过60秒, 会自动释放资源, 提交任务如果没有空闲线程, 则会创建新线程
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
}

3.newSingleThreadExecutor()

1
2
3
4
5
6
7
8
9
10
11
12
13
14

public class Executors {
/**
* 创建只有 1个线程的线程池
* 如果线程异常, 则创建一个新的线程继续执行任务
*
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
}

4.newSingleThreadExecutor()

1
2
3
4
5
6
7
8
9
10
11
12

public class Executors {

/**
* ScheduledThreadPoolExecutor 继承 ThreadPoolExecutor 类
* 可以在指定时间周期内执行任务
*
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
}

源码解析

变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* ctx 为原子类型的变量, 有两个概念
* workerCount, 表示有效的线程数
* runState, 表示线程状态, 是否正在运行, 关闭等
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 容量 2²⁹-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits 线程池的五中状态
// 即高3位为111, 接受新任务并处理排队任务
private static final int RUNNING = -1 << COUNT_BITS;
// 即高3位为000, 不接受新任务, 但处理排队任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 即高3位为001, 不接受新任务, 不处理排队任务, 并中断正在进行的任务
private static final int STOP = 1 << COUNT_BITS;
// 即高3位为010, 所有任务都已终止, 工作线程为0, 线程转换到状态TIDYING, 将运行terminate()钩子方法
private static final int TIDYING = 2 << COUNT_BITS;
// 即高3位为011, 标识terminate()已经完成
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl 用来计算线程的方法
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
}

execute方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ThreadPoolExecutor extends AbstractExecutorService {

public void execute(Runnable command) {
// 空则抛出异常
if (command == null)
throw new NullPointerException();
// 获取当前线程池的状态
int c = ctl.get();
// 计算工作线程数 并判断是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
// addWorker提交任务, 提交成功则结束
if (addWorker(command, true))
return;
// 提交失败再次获取当前状态
c = ctl.get();
}
// 判断线程状态, 并插入队列, 失败则移除
if (isRunning(c) && workQueue.offer(command)) {
// 再次获取状态
int recheck = ctl.get();
// 如果状态不是RUNNING, 并移除失败
if (! isRunning(recheck) && remove(command))
// 调用拒绝策略
reject(command);
// 如果工作线程为0 则调用 addWorker
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 提交任务失败 走拒绝策略
else if (!addWorker(command, false))
reject(command);
}

}

addWorker方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* 检查任务是否可以提交
*
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 外层循环
for (;;) {
// 获取当前状态
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary. 检查线程池是否关闭
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 内层循环
for (;;) {
int wc = workerCountOf(c);
// 工作线程大于容量 或者大于 核心或最大线程数
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS 线程数增加, 成功则调到外层循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 失败则再次获取线程状态
c = ctl.get(); // Re-read ctl
// 不相等则重新走外层循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
/**
* 创建新worker 开始新线程
*/
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 判断线程是否存活, 已存活抛出非法异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 设置包含池中的所有工作线程。仅在持有mainLock时访问 workers是 HashSet 集合
// private final HashSet<Worker> workers = new HashSet<Worker>();
workers.add(w);
int s = workers.size();
// 设置池最大大小, 并将 workerAdded设置为 true
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
// 解锁
mainLock.unlock();
}
// 添加成功 开始启动线程 并将 workerStarted 设置为 true
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 启动线程失败
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
/**
* 启动线程失败, 加锁
* 移除线程, 并减少线程总数
* 转换状态
*/
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}

}