线程池处理流程

- 判断核心线程池是否已满, 不满则创建新线程执行任务
- 等待队列如果有界, 判断等待队列是否已满, 不满, 则添加任务到等待队列
- 判断最大线程数是否已满, 不满则创建新线程执行任务
- 最大线程数已满, 按照既定策略处理新任务
全参构造及各参数含义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class ThreadPoolExecutor extends AbstractExecutorService { public ThreadPoolExecutor(int corePoolSize, // 核心线程数 int maximumPoolSize, // 最大线程数 long keepAliveTime, // 核心线程外线程的存活时间 TimeUnit unit, // 存活时间的单位 BlockingQueue<Runnable> workQueue, // 保存等待执行的线程的阻塞队列 ThreadFactory threadFactory, // 线程工厂 RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } }
|
1.workQueue阻塞队列
ArrayBlockingQueue: 是一个基于数组结构的有界阻塞队列, 此队列按 FIFO(先进先出) 原则对元素进行排序.
LinkedBlockingQueue: 一个基于链表结构的阻塞队列,此队列按 FIFO(先进先出) 排序元素, 吞吐量通常要高于ArrayBlockingQueue. 静态工厂方法Executors.newFixedThreadPool()使用了这个队列
SynchronousQueue: 一个不存储元素的阻塞队列. 每个插入操作必须等到另一个线程调用移除操作, 否则插入操作一直处于阻塞状态, 吞吐量通常要高于LinkedBlockingQueue, 静态工厂方法Executors.newCachedThreadPool使用了这个队列.
PriorityBlockingQueue: 一个具有优先级的无限阻塞队列.
2.threadFactory线程工厂
可以使用默认的工厂也可以自定义工厂, 或者使用 google guava 提供的工厂, 可以为线程命名和设置是否为守护线程
1 2 3 4
| // 默认工厂 ThreadFactory threadFactory = Executors.defaultThreadFactory(); // google guava工具提供 ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
|
3.handler线程拒绝策略
当线程池达到最大线程数, 并且队列满了, 新的线程要采取的处理策略.
1.AbortPolicy 拒绝新任务并抛出RejectedExecutionException异常
2.CallerRunsPolicy 直接在调用程序的线程中运行
3.DiscardOldestPolicy 放弃最早的任务, 即队列最前面的任务
4.DiscardPolicy 丢弃, 不处理
Executors初始化线程池的四种方式
这四种初始化线程池的方式, 前三种都是调用 ThreadPoolExecutor 类的构造创建的线程池, 只不过使用的阻塞队列方式不同.
- newFixedThreadPool()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } }
|
- newCachedThreadPool()
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public class Executors {
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } }
|
3.newSingleThreadExecutor()
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public class Executors {
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } }
|
4.newSingleThreadExecutor()
1 2 3 4 5 6 7 8 9 10 11 12
| public class Executors {
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } }
|
源码解析
变量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } }
|
execute方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class ThreadPoolExecutor extends AbstractExecutorService {
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
}
|
addWorker方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
| public class ThreadPoolExecutor extends AbstractExecutorService {
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c);
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } }
boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
}
|