原创

Java线程池实现原理与源码解析(JDK1.8)

1.介绍

已经是第三次看线程池源码了,前2次是为了面试哈哈

2.从理解ThreadPoolExecutor类的成员变量ctl以及相关运算开始

  • 这个一定要理解,不然下面的可能会看不懂
    //ctl是一个Integer的原子变量,构造参数下面在看
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    "Integer.SIZE=32,COUNTBITS=29"
    private static final int COUNT_BITS = Integer.SIZE - 3;

    "//536870911 二进制(00011111111111111111111111111111)线程的最大个数(低29位)"
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    "线程池状态"
    "//-536870912二进制(11100000000000000000000000000000)高三位"
    private static final int RUNNING    = -1 << COUNT_BITS;

    "//0        二进制(00000000000000000000000000000000)高三位"
    private static final int SHUTDOWN   =  0 << COUNT_BITS;

    "//536870912 二进制(100000000000000000000000000000)高三位"
    private static final int STOP       =  1 << COUNT_BITS;

    "//1073741824二进制(01000000000000000000000000000000)高三位"
    private static final int TIDYING    =  2 << COUNT_BITS;

    "//1610612736二进制(01100000000000000000000000000000)高三位"
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    "//ctl相关的云运行以及对应线程池的属性"

    "//获取高3位(运行状态)"
    private static int runStateOf(int c)     { return c & ~CAPACITY; }

    "//获取低29位(线程个数)"
    private static int workerCountOf(int c)  { return c & CAPACITY; }

    "//计算ctl新值(包含了线程状态和线程个数)"
    private static int ctlOf(int rs, int wc) { return rs | wc; }

3.execute提交任务方法开始分析

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

根据if判断分析提交任务方法分为三步

  1. 先从ctl中取出线程状态和线程个数的组合值
  2. 从workerCountOf方法中取出低29位即线程个数,如果小于核心线程数则向workers里面新增一个核心线程执行该任务
  3. 利用isRunning方法(c < SHUTDOWN ),判断是否在运行,如果处于RUNNING则添加任务到任务队列,添加成功则继续执行
  4. if体里面再次检查recheck变量,如果当前线程池状态不是RUNNING则从队列中删除任务,并执行拒绝策略reject(command),因为添加任务到任务队列之后,线程池的状态已经发生变化,所以第二次校验
  5. workerCountOf(recheck) == 0 如果当前线程池为空则添加一个线程
  6. !addWorker(command, false) 如果队列满了,则新增线程,新增失败执行拒绝策略

看到这里有没有觉得线程池代码设计真是精秒,不愧是在自由环境下写的源码.....

4.分析addWorker方法

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            "1.检查队列是否为空尽在必要的时候"
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            "2.循环 利用CAS原理增加线程"
            for (;;) {
                int wc = workerCountOf(c);
                "3.判断是否超过线程最大个数和设定个数 (CAPACITY值为536870911)"
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                "4.利用CAS原子增加线程个数,同时只会增加一个线程"    
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                "5.如果增加失败,判断状态是否发生变化"
                if (runStateOf(c) != rs)
                    "6.如果当前状态不等于之前的rs 则调到外层循环retry处,重新获取线程池状态,否则继续循环CAS增加线程"
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        "7.CAS增加线程成功之后"
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            "8.将待执行的任务线程包装成Worker,获取一个工作线程this.thread即w.thread"
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                "9.加锁再次检查"
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    "10.再次检查线程池状态,避免获取锁之前调用了shutdown接口"
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        "11.添加一个任务线程进workers(HashSet)"    
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    "12.必要的释放锁"
                    mainLock.unlock();
                }
                "13.如果添加成功则调用start()启动"
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
  1. 整体看分为2部分,
  2. 第一部分双重循环的目的是通过CAS操作增加线程
  3. 第二部门增加成功把并发安全的任务添加到一个HashSet执行任务
正文到此结束
本文目录