线程池原理及源码解析

线程池处理流程

线程池处理流程

  1. 判断核心线程池是否已满, 不满则创建新线程执行任务
  2. 等待队列如果有界, 判断等待队列是否已满, 不满, 则添加任务到等待队列
  3. 判断最大线程数是否已满, 不满则创建新线程执行任务
  4. 最大线程数已满, 按照既定策略处理新任务

    全参构造及各参数含义

    public class ThreadPoolExecutor extends AbstractExecutorService {
     public ThreadPoolExecutor(int corePoolSize, // 核心线程数
                               int maximumPoolSize, // 最大线程数
                               long keepAliveTime, // 核心线程外线程的存活时间
                               TimeUnit unit, // 存活时间的单位
                               BlockingQueue<Runnable> workQueue, // 保存等待执行的线程的阻塞队列
                               ThreadFactory threadFactory, // 线程工厂
                               RejectedExecutionHandler handler) { // 线程拒绝策略
         if (corePoolSize < 0 ||
             maximumPoolSize <= 0 ||
             maximumPoolSize < corePoolSize ||
             keepAliveTime < 0)
             throw new IllegalArgumentException();
         if (workQueue == null || threadFactory == null || handler == null)
             throw new NullPointerException();
         this.acc = System.getSecurityManager() == null ?
                 null :
                 AccessController.getContext();
         this.corePoolSize = corePoolSize;
         this.maximumPoolSize = maximumPoolSize;
         this.workQueue = workQueue;
         this.keepAliveTime = unit.toNanos(keepAliveTime);
         this.threadFactory = threadFactory;
         this.handler = handler;
     }
     // 省略 . . .
    }
    
    1.workQueue阻塞队列
    ArrayBlockingQueue: 是一个基于数组结构的有界阻塞队列, 此队列按 FIFO(先进先出) 原则对元素进行排序.
    LinkedBlockingQueue: 一个基于链表结构的阻塞队列,此队列按 FIFO(先进先出) 排序元素, 吞吐量通常要高于ArrayBlockingQueue. 静态工厂方法Executors.newFixedThreadPool()使用了这个队列
    SynchronousQueue: 一个不存储元素的阻塞队列. 每个插入操作必须等到另一个线程调用移除操作, 否则插入操作一直处于阻塞状态, 吞吐量通常要高于LinkedBlockingQueue, 静态工厂方法Executors.newCachedThreadPool使用了这个队列.
    PriorityBlockingQueue: 一个具有优先级的无限阻塞队列.

2.threadFactory线程工厂
可以使用默认的工厂也可以自定义工厂, 或者使用 google guava 提供的工厂, 可以为线程命名和设置是否为守护线程

// 默认工厂
ThreadFactory threadFactory = Executors.defaultThreadFactory();
// google guava工具提供
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();

3.handler线程拒绝策略
当线程池达到最大线程数, 并且队列满了, 新的线程要采取的处理策略.
1.AbortPolicy 拒绝新任务并抛出RejectedExecutionException异常
2.CallerRunsPolicy 直接在调用程序的线程中运行
3.DiscardOldestPolicy 放弃最早的任务, 即队列最前面的任务
4.DiscardPolicy 丢弃, 不处理

Executors初始化线程池的四种方式

这四种初始化线程池的方式, 前三种都是调用 ThreadPoolExecutor 类的构造创建的线程池, 只不过使用的阻塞队列方式不同.

  1. newFixedThreadPool()
public class Executors {

    /**
     * 固定线程池
     * 核心线程数 = 最大线程数
     * 超时时间为0
     * LinkedBlockingQueue无界队列, 会持续等待
     * 使用默认拒绝策略 AbortPolicy
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
}
  1. newCachedThreadPool()
public class Executors {
    /**
     * 无界线程池
     * 核心线程数0 最大线程数 (2³¹ -1)
     * 超时时间 60秒
     * SynchronousQueue不存储元素的阻塞队列
     * 线程空闲时间超过60秒, 会自动释放资源, 提交任务如果没有空闲线程, 则会创建新线程
     */
    public static ExecutorService newCachedThreadPool() {
           return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                         60L, TimeUnit.SECONDS,
                                         new SynchronousQueue<Runnable>());
    }
}

3.newSingleThreadExecutor()


public class Executors {
    /**
     * 创建只有 1个线程的线程池
     * 如果线程异常, 则创建一个新的线程继续执行任务
     *
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
}

4.newSingleThreadExecutor()


public class Executors {

    /**
     * ScheduledThreadPoolExecutor 继承 ThreadPoolExecutor 类
     * 可以在指定时间周期内执行任务
     *
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
          return new ScheduledThreadPoolExecutor(corePoolSize);
    }
}

源码解析

变量

public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
    * ctx 为原子类型的变量, 有两个概念
    * workerCount, 表示有效的线程数
    * runState, 表示线程状态, 是否正在运行, 关闭等
    */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 容量 2²⁹-1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    // runState is stored in the high-order bits 线程池的五中状态
    // 即高3位为111, 接受新任务并处理排队任务
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 即高3位为000, 不接受新任务, 但处理排队任务
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 即高3位为001, 不接受新任务, 不处理排队任务, 并中断正在进行的任务
    private static final int STOP       =  1 << COUNT_BITS;
    // 即高3位为010, 所有任务都已终止, 工作线程为0, 线程转换到状态TIDYING, 将运行terminate()钩子方法
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 即高3位为011, 标识terminate()已经完成
    private static final int TERMINATED =  3 << COUNT_BITS;
    // Packing and unpacking ctl 用来计算线程的方法
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
}

execute方法

public class ThreadPoolExecutor extends AbstractExecutorService {

    public void execute(Runnable command) {
        // 空则抛出异常
        if (command == null)
            throw new NullPointerException();
        // 获取当前线程池的状态
        int c = ctl.get();
        // 计算工作线程数 并判断是否小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            // addWorker提交任务, 提交成功则结束
            if (addWorker(command, true))
                return;
            // 提交失败再次获取当前状态
            c = ctl.get();
        }
        // 判断线程状态, 并插入队列, 失败则移除
        if (isRunning(c) && workQueue.offer(command)) {
            // 再次获取状态
            int recheck = ctl.get();
            // 如果状态不是RUNNING, 并移除失败
            if (! isRunning(recheck) && remove(command))
                // 调用拒绝策略
                reject(command);
            // 如果工作线程为0 则调用 addWorker
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 提交任务失败 走拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

}

addWorker方法

public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * 检查任务是否可以提交
     *
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        // 外层循环
        for (;;) {
            // 获取当前状态
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary. 检查线程池是否关闭
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            // 内层循环
            for (;;) {
                int wc = workerCountOf(c);
                // 工作线程大于容量 或者大于 核心或最大线程数
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // CAS 线程数增加, 成功则调到外层循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 失败则再次获取线程状态
                c = ctl.get();  // Re-read ctl
                // 不相等则重新走外层循环
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        /**
         * 创建新worker 开始新线程
         */
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 加锁
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 判断线程是否存活, 已存活抛出非法异常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //  设置包含池中的所有工作线程。仅在持有mainLock时访问 workers是 HashSet 集合
                        //  private final HashSet<Worker> workers = new HashSet<Worker>();
                        workers.add(w);
                        int s = workers.size();
                        // 设置池最大大小, 并将 workerAdded设置为 true
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    // 解锁
                    mainLock.unlock();
                }
                // 添加成功 开始启动线程 并将 workerStarted 设置为 true
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 启动线程失败
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    /**
     * 启动线程失败, 加锁
     * 移除线程, 并减少线程总数
     * 转换状态
     */
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

}

   版权声明

文章作者: liuzhihang
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源!

评论