原创

Java七种创建线程池的方法以及实现原理

1.newSingleThreadExecutor

    //创建一个单线程的线程池,只有一个工作线程来顺序执行任务!
    @Test
    public void newSingleThreadExecutorT(){
        ExecutorService s1 = Executors.newSingleThreadExecutor();
        for(int i=0;i<3;i++){
            final int k = i;
            s1.execute(new Runnable() {
                public void run() {
                    Thread.currentThread().setName("ThreadName,k->" + k);
                    try {
                        Thread.sleep(1000);
                        System.out.println(Thread.currentThread().getName() + " k->" + k);
                    } catch (InterruptedException e) {

                    }
                }
            });
        }
        try {
            //等待线程池所有执行完
            if(s1.awaitTermination(10, TimeUnit.SECONDS)){
                //s1.shutdownNow();
            }
        }catch (InterruptedException e){
            System.out.printf("00");
        }finally {

        }
        System.out.println("执行完了。。。");
    }

源码分析

 public static ExecutorService newSingleThreadExecutor() {
        return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
    }

1)底层最终还是使用ThreadPoolExecutor,并设定核心和最大线程数都是1
2)重点看又包了一层FinalizableDelegatedExecutorService下面看在源码

    static class FinalizableDelegatedExecutorService
        extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
        protected void finalize() {
            super.shutdown();
        }
    }

1)源码可以看出静态类FinalizableDelegatedExecutorService继承了DelegatedExecutorService
2)并且增加了一个finalize方法,这个方法里面会调用shutdown关闭线程池
3)finalize方法会在虚拟机利用垃圾回收清理对象时被调用!
4)其实FinalizableDelegatedExecutorService的实例会自动帮你关闭线程池这样

2.newCachedThreadPool

//创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
    //线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程
    @Test
    public void newCachedThreadPoolT(){
        ExecutorService e = Executors.newCachedThreadPool();
        final CountDownLatch countDownLatch = new CountDownLatch(10);
        for(int i=0;i<10;i++) {
            final int k = i;
            e.execute(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(2000);
                        Thread.currentThread().setName("ThreadName->k" + k);
                        System.out.println("线程" + Thread.currentThread().getName() + "执行了!");
                    } catch (InterruptedException e) {

                    }finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }
        boolean timeoutFlag = false;
        try {
            timeoutFlag = countDownLatch.await(4, TimeUnit.SECONDS);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
            System.out.println(e1.getMessage());
        }
        if(timeoutFlag)
        {
            System.out.println("所有子线程执行完成");
        }
        System.out.println("执行完了。。。");
    }

源码分析

    /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
。。。。。
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

1)此线程缓存线程,避免了每次执行任务都要创建、销毁线程的开销
2)还需要注意每个任务执行时间要小于keepAliveTime,不然线程会被销毁,达不到缓存线程的目的
3)源码来看还是使用ThreadPoolExecutor来创建线程池,核心数为0,最大线程数为Integer.MAX_VALUE

3.newFixedThreadPool

    //创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
    @Test
    public void newFixedThreadPoolT(){
        ExecutorService e = Executors.newFixedThreadPool(5);
        for(int i=0;i<10;i++) {
            final int k = i;
            e.execute(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(2000);
                        Thread.currentThread().setName("ThreadName->k" + k);
                        System.out.println("线程" + Thread.currentThread().getName() + "执行了!");
                    } catch (InterruptedException e) {

                    }finally {

                    }
                }
            });
        }
    }

1)这个就相对普通了 就是创建了一个核心数和最大线程一样的ThreadPoolExecutor对象
2)线程池中的线程是一个定量,可以严格控制线程的并发量
3)超出一定量的线程被提交时候需在队列中等待

4.newScheduledThreadPool

    @Test
    public static void main(String[] a){
        ScheduledExecutorService exc = Executors.newScheduledThreadPool(5);

        exc.scheduleAtFixedRate(new Runnable() {
            public void run() {
                System.out.println("延迟1秒后在每2秒执行");
            }
        }, 1,2, TimeUnit.SECONDS);

//        exc.schedule(new Runnable() {
//            public void run() {
//                System.out.println("延迟2秒执行");
//            }
//        }, 2, TimeUnit.SECONDS);

    }

1)创建一个定长线程池,支持定时及周期性任务执行

5.newWorkStealingPool

    public static void main(String[] a) throws IOException {
        ExecutorService exc = Executors.newWorkStealingPool();
        System.out.println(Runtime.getRuntime().availableProcessors());
        for(int i=0;i<10;i++) {
            final int k = i;
            exc.execute(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(2000);
                        Thread.currentThread().setName("ThreadName->k" + k);
                        System.out.println("线程" + Thread.currentThread().getName() + "执行了!");
                    } catch (InterruptedException e) {

                    }finally {
                        //countDownLatch.countDown();
                    }
                }
            });
        }
        System.out.println("主线程执行完了。。。");
        System.in.read();
    }

先看下源码

    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
    ...
        public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }

1)jdk8或者7新增的一种线程池实现
2)源码可以看出来newWorkStealingPool不是基于ThreadPoolExecutor实现的,而底层调用的是ForkJoinPool线程池
3)不过也都是在Executors类中实现
4)看源码中的默认参数,默认使用当前计算机中可用的cpu数量(Runtime.getRuntime().availableProcessors())这很有意思
5)这个新的线程池会创建一个拥有多个任务队列的线程池,可以减少连接数,默认创建当前可用cpu核数量的线程来并行执行,
6)适用于并行处理大耗时的操作

6.newSingleThreadScheduledExecutor


    public static void main(String[] a) throws IOException {
        ScheduledExecutorService s1 = Executors.newSingleThreadScheduledExecutor();
        for(int i=0;i<3;i++){
            final int k = i;
            s1.scheduleWithFixedDelay(new Runnable() {
                public void run() {
                    Thread.currentThread().setName("ThreadName,k->" + k);
                    try {
                        Thread.sleep(1000);
                        System.out.println(Thread.currentThread().getName() + " k->" + k);
                    } catch (InterruptedException e) {

                    }
                }
            },1,3, TimeUnit.SECONDS);
        }
        System.in.read();
    }

1)创建单线程池延迟任务

7.ThreadPoolExecutor

    @Test
    public void ThreadPoolExecutorT(){
        ThreadPoolExecutor e = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue());
        final CountDownLatch countDownLatch = new CountDownLatch(10);
        for(int i=0;i<10;i++) {
            final int k = i;
            e.execute(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(2000);
                        Thread.currentThread().setName("ThreadName->k" + k);
                        System.out.println("线程" + Thread.currentThread().getName() + "执行了!");
                    } catch (InterruptedException e) {

                    }finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }
        boolean timeoutFlag = false;
        try {
            timeoutFlag = countDownLatch.await(4, TimeUnit.SECONDS);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
            System.out.println(e1.getMessage());
        }
        if(timeoutFlag)
        {
            System.out.println("所有子线程执行完成");
        }
        System.out.println("执行完了。。。");
    }

1)从源码看就知道 上面的前4种线程池的创建最后都是用ThreadPoolExecutor,也就是对它的一次包装,jdk给你预先提供好了多种线程池的选择

核心参数

  1. corePoolSize:核心线程数,线程池里一直不会被销毁的线程数量
  2. maximumPoolSize:最大线程数量
  3. keepAliveTime:非核心线程空闲时的存活时间,该参数只有在线程数量 > corePoolSize情况下才有用
  4. unit:keepAlive时间单位
  5. workQueue:工作队列,JDK提供这几种工作队列
    1)ArrayBlockingQueue:基于数组的有界阻塞队列,任务以FIFO顺序排序
    2)LinkedBlockingQueue:基于链表的阻塞队列,任务以FIFO顺序排列,吞吐量优于ArrayBlockingQueue,在使用时需要注意,此阻塞队列在不设置大小的时候,默认的长度是Integer.MAX_VALUE
    3)PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含任务的排序不是FIFO,而是依据任务的自然排序顺序或者是构造函数的Comparator决定的顺序
    4)SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的,典型的生产者-消费者模型,它不存储元素,每一次的插入必须要等另一个线程的移除操作完成
    5)threadFactory:创建线程工厂,可以自定义线程工厂给线程池里的线程设置一个自定义线程名

过几天在写ThreadPoolExecutor源码的文章,这样上面的4种线程池原理就会更清晰,如果我忘了在这里更新文章地址就自己在博客搜一下

正文到此结束
本文目录