博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java线程池的原理学习(三)
阅读量:7164 次
发布时间:2019-06-29

本文共 7329 字,大约阅读时间需要 24 分钟。

接上文:

ThreadPoolExecutor深入剖析

线程池的五种状态

ThreadPoolExecutor 类中将线程状态( runState)分为了以下五种:

RUNNING:可以接受新任务并且处理进入队列中的任务

SHUTDOWN:不接受新任务,但是仍然执行队列中的任务
STOP:不接受新任务也不执行队列中的任务
TIDYING:所有任务中止,队列为空,进入该状态下的任务会执行 terminated()方法
TERMINATEDterminated()方法执行完成后进入该状态

状态之间的转换

  • RUNNING -> SHUTDOWN

调用了 shutdown()方法,可能是在 finalize()方法中被隐式调用

  • (RUNNING or SHUTDOWN) -> STOP

调用 shutdownNow()

  • SHUTDOWN -> TIDYING

当队列和线程池都为空时

  • STOP -> TIDYING

线程池为空时

  • TIDYING -> TERMINATED

terminated()方法执行完成

线程池状态实现

如果查看 ThreadPoolExecutor的源码,会发现开头定义了这几个变量来代表线程状态和活动线程的数量:

//原子变量    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));    private static final int COUNT_BITS = Integer.SIZE - 3;    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;    // runState is stored in the high-order bits    private static final int RUNNING    = -1 << COUNT_BITS;    private static final int SHUTDOWN   =  0 << COUNT_BITS;    private static final int STOP       =  1 << COUNT_BITS;    private static final int TIDYING    =  2 << COUNT_BITS;    private static final int TERMINATED =  3 << COUNT_BITS;

这个类中将二进制数分为了两部分,高位代表线程池状态( runState),低位代表活动线程数( workerCount), CAPACITY代表最大的活动线程数,为2^29-1,下面为了更直观的看到这些数我做了些打印:

public class Test1 {    public static void main(String[] args) {            final int COUNT_BITS = Integer.SIZE - 3;            final int CAPACITY   = (1 << COUNT_BITS) - 1;            final int RUNNING    = -1 << COUNT_BITS;            final int SHUTDOWN   =  0 << COUNT_BITS;            final int STOP       =  1 << COUNT_BITS;            final int TIDYING    =  2 << COUNT_BITS;            final int TERMINATED =  3 << COUNT_BITS;                        System.out.println(Integer.toBinaryString(CAPACITY));            System.out.println(Integer.toBinaryString(RUNNING));            System.out.println(Integer.toBinaryString(SHUTDOWN));            System.out.println(Integer.toBinaryString(STOP));            System.out.println(Integer.toBinaryString(TIDYING));            System.out.println(Integer.toBinaryString(TERMINATED));    }}

输出:

1111111111111111111111111111111100000000000000000000000000000010000000000000000000000000000010000000000000000000000000000001100000000000000000000000000000

打印的时候会将高位0省略

可以看到,第一行代表线程容量,后面5行提取高3位得到:

111 - RUNNING000 - SHUTDOWN001 - STOP010 - TIDYING011 - TERMINATED

分别对应5种状态,可以看到这样定义之后,只需要通过简单的移位操作就可以进行状态的转换。

重要方法

execute方法:

public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();               int c = ctl.get();        /**分三步执行         * 如果workerCount
=corePoolSize,判断线程池是否处于运行状态,再将任务加入队列 * */ if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //用于double check //如果线程池处于非运行态,则将任务从缓存队列中删除 if (! isRunning(recheck) && remove(command)) reject(command); //拒绝任务 else if (workerCountOf(recheck) == 0) //如果活动线程数为0,则创建新线程 addWorker(null, false); } //如果线程池不处于RUNNING状态,或者workQueue满了,则执行以下代码 else if (!addWorker(command, false)) reject(command); }

可以看到,在类中使用了 Work类来代表任务,下面是 Work类的简单摘要:

private final class Worker extends AbstractQueuedSynchronizer        implements Runnable    {        /** Thread this worker is running in.  Null if factory fails. */        final Thread thread;        /** Initial task to run.  Possibly null. */        Runnable firstTask;        /** Per-thread task counter */        volatile long completedTasks;        /**         * Creates with given first task and thread from ThreadFactory.         * @param firstTask the first task (null if none)         */        Worker(Runnable firstTask) {            this.firstTask = firstTask;            this.thread = getThreadFactory().newThread(this);        }        /** Delegates main run loop to outer runWorker  */        public void run() {            runWorker(this);        }        ...

Work类实现了 Runnable接口,使用了线程工厂创建线程,使用 runWork方法来运行任务

创建新线程时用到了 addWorker()方法:

/**     * 检查在当前线程池状态和限制下能否创建一个新线程,如果可以,会相应改变workerCount,     * 每个worker都会运行他们的firstTask     * @param firstTask 第一个任务     * @param core true使用corePoolSize作为边界,false使用maximumPoolSize     * @return false 线程池关闭或者已经具备关闭的条件或者线程工厂没有创建新线程     */
private boolean addWorker(Runnable firstTask, boolean core) {        retry:        for (;;) {            int c = ctl.get();            int rs = runStateOf(c);            // 只有当rs < SHUTDOWN才有可能接受新任务            if (rs >= SHUTDOWN &&                ! (rs == SHUTDOWN &&                   firstTask == null &&                   ! workQueue.isEmpty()))                return false;            for (;;) {                int wc = workerCountOf(c); //工作线程数量                if (wc >= CAPACITY ||                    wc >= (core ? corePoolSize : maximumPoolSize)) //不合法则返回                    return false;                if (compareAndIncrementWorkerCount(c)) //将工作线程数量+1                    break retry;                c = ctl.get();  // Re-read ctl                if (runStateOf(c) != rs) //判断线程池状态有没有改变,改变了则进行外循环,否则只进行内循环                    continue retry;                // else CAS failed due to workerCount change; retry inner loop            }        }        //创建新线程        Worker w = new Worker(firstTask);        Thread t = w.thread;        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            //再次检查状态,防止ThreadFactory创建线程失败或者状态改变了            int c = ctl.get();            int rs = runStateOf(c);            if (t == null ||                (rs >= SHUTDOWN &&                 ! (rs == SHUTDOWN &&                    firstTask == null))) {                decrementWorkerCount();  //减少线程数量                tryTerminate();//尝试中止线程                return false;            }            workers.add(w);//添加到工作线程Set集合中            int s = workers.size();            if (s > largestPoolSize)                largestPoolSize = s;        } finally {            mainLock.unlock();        }        t.start();//执行任务       //状态变成了STOP(调用了shutdownNow方法)        if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())            t.interrupt();        return true;    }

再看 Work中runWork方法:

final void runWorker(Worker w) {        Runnable task = w.firstTask;        w.firstTask = null;        boolean completedAbruptly = true;//线程是否异常中止        try {            //先取firstTask,再从队列中取任务直到为null            while (task != null || (task = getTask()) != null) {                w.lock();                clearInterruptsForTaskRun();                try {                    beforeExecute(w.thread, task);//实现钩子方法                    Throwable thrown = null;                    try {                        task.run();//运行任务                    } catch (RuntimeException x) {                        thrown = x; throw x;                    } catch (Error x) {                        thrown = x; throw x;                    } catch (Throwable x) {                        thrown = x; throw new Error(x);                    } finally {                        afterExecute(task, thrown);//实现钩子方法                    }                } finally {                    task = null;                    w.completedTasks++;                    w.unlock();                }            }            completedAbruptly = false;//成功运行,说明没有异常中止        } finally {            processWorkerExit(w, completedAbruptly);        }    }

转载地址:http://rhtwm.baihongyu.com/

你可能感兴趣的文章