在项目中有很多情况下需要使用多线程的方式执行任务,那么有什么比较好的方法来判断多线程任务执行完毕,我们再进行下一步的操作,之前的话我会设置等待时间来等待
javaexecutor.awaitTermination(30, TimeUnit.MINUTES);
java
package com.example.demo.component.threadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CountThreadTask {
    //创建一个最大线程数100的线程池
    private static ExecutorService es =
            new ThreadPoolExecutor(3, 100, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(100));
    public static void main(String[] args) throws Exception {
        for (int i = 1; i <= 10; i++) {
            int finalI = i;
            es.execute(() -> { //提交执行
                System.out.println("线程" + finalI + "执行完成!");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        ThreadPoolExecutor threadPool = ((ThreadPoolExecutor) es);
        System.out.println("线程池任务总数量:"+threadPool.getTaskCount());
        System.out.println("---------线程池开始执行-----------");
        while (true) {
            if (threadPool.getTaskCount() == threadPool.getCompletedTaskCount()) {
                System.out.println("---------线程池执行完了-----------");
                break;
            }
            //间隔2s查询一次
            Thread.sleep(2000);
            System.out.println("线程池还未执行完,敬请等待!已完成的任务数量:"+threadPool.getCompletedTaskCount());
        }
        
    }
}
getTaskCount():返回线程池计划执行的任务总数。注意:由于任务和线程的状态可能在计算过程中动态变化,因此该方法返回值只是一个近似值,不是精准的。
getCompletedTaskCount():返回线程池中已完成的任务数,注意:跟getTaskCount()方法一致,该方法返回值也是一个近似值。
getPoolSize():返回线程池当前的线程数量。
getActiveCount():返回当前线程池中正在执行任务的线程数量。
方式总结:
由于getTaskCount() 与 getCompletedTaskCount()方法返回值都是一个近似值而不是精确值,固结果可能有一定的偏差,这也是该方式的一大缺点。
java
package com.example.demo.component.threadPool;
import java.util.concurrent.*;
/**
 * 使用 FutrueTask 等待线程池执行完全部任务
 */
public class FutureTaskTask {
    
    //创建一个最大线程数100的线程池
    private static ExecutorService es =
            new ThreadPoolExecutor(4, 100, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(100));
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建任务1
        FutureTask<Integer> task1 = new FutureTask<>(() -> {
            System.out.println("---Task 1 开始执行---");
            Thread.sleep(2000);
            System.out.println("------Task 1 执行结束------");
            return 1;
        });
        // 创建任务2
        FutureTask<Integer> task2 = new FutureTask<>(() -> {
            System.out.println("---Task 2 开始执行---");
            Thread.sleep(3000);
            System.out.println("------Task 2 执行结束------");
            return 2;
        });
        // 创建任务3
        FutureTask<Integer> task3 = new FutureTask<>(() -> {
            System.out.println("---Task 3 开始执行---");
            Thread.sleep(1000);
            System.out.println("------Task 3 执行结束------");
            return 3;
        });
        // 创建任务4
        FutureTask<Integer> task4 = new FutureTask<>(() -> {
            System.out.println("---Task 4 开始执行---");
            Thread.sleep(500);
            System.out.println("------Task 4 执行结束------");
            return 4;
        });
        // 提交4个任务给线程池
        es.submit(task1);
        es.submit(task2);
        es.submit(task3);
        es.submit(task4);
        // 等待所有任务执行完毕
        task1.get();
        task2.get();
        task3.get();
        task4.get();
        //执行完毕
        System.out.println("线程池执行完了!");
    }
}
CountDownLatch身为同步工具类,作用之一可协调多个线程之间的同步,或者说接通线程之间的通信(而不是互斥)。CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后再继续执行。其中,计数器初始值为全线程的数量,当每一个线程完成自己任务后,计数器的值就会自动减1;当计数器的值 = 0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。
java
package com.example.demo.component.threadPool;
import java.util.concurrent.*;
/**
 * 使用CountDownLatch
 */
public class CountDownLatchTask {
    //创建一个最大线程数100的线程池
    private static ExecutorService es =
            new ThreadPoolExecutor(1, 100, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(100));
    public static void main(String[] args) throws Exception {
        //计数器,判断线程是否执行结束
        //初始值为10
        CountDownLatch taskLatch = new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            es.execute(() -> { //提交执行
                try {
                    //模拟线程执行方法,执行1s
                    Thread.sleep(1000);
                    taskLatch.countDown();
                    System.out.println("当前计数器值为:" + taskLatch.getCount());     
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        //当前线程阻塞,等待计数器置为0
        taskLatch.await();
        System.out.println("线程池执行完了!");
    }
}
方式总结:
虽然使用CountDownLatch可达到统计线程是否被执行完,该方式使用起来代码简洁优雅,不需要对线程池进行操作。但由于CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。
CyclicBarrier 和 CountDownLatch 类似,你可以把它理解为一个可以重复使用的循环计数器,CyclicBarrier 可调用 reset() 方法将自己重置到初始状态,这是与CountDownLatch不一样的特性,那具体如何使用CyclicBarrier达到统计线程池所有线程都被执行完的需求吧
javapackage com.example.demo.component.threadPool;
import java.util.Random;
import java.util.concurrent.*;
/**
 * 使用CyclicBarrier
 */
public class CyclicBarrierTask {
    //创建一个最大线程数100的线程池
    private static ExecutorService es =
            new ThreadPoolExecutor(5, 100, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(100));
    public static void main(String[] args) throws InterruptedException {
        //任务总数
        final int taskCount = 5;
        //循环计数器
        CyclicBarrier cyclicBarrier = new CyclicBarrier(taskCount, new Runnable() {
            @Override
            public void run() {
                // 线程池执行完
                System.out.println("---------线程池执行完了-----------");
            }
        });
        // 添加任务
        for (int i = 0; i < taskCount; i++) {
            final int finalI = i;
            es.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        //随机休眠1-4秒
                        TimeUnit.SECONDS.sleep(new Random().nextInt(5));
                        System.out.println("任务" + finalI + "执行完成");
                        // 线程执行完
                        cyclicBarrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}
使用线程池的 isTerminated() 方法,在执行 shutdown() 进行线程池的关闭后, 隔间调用isTerminated()判断线程池中的所有任务是否已经完成即可。那具体如何使用 isTerminated() 方法达到统计线程池所有线程都被执行完的需求吧
javapackage com.example.demo.component.threadPool;
import java.util.concurrent.*;
/**
 * 使用isTerminated()
 */
public class IsTerminatedTask {
    //创建一个最大线程数100的线程池
    private static ExecutorService es =
            new ThreadPoolExecutor(4, 100, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(100));
    public static void main(String[] args) throws Exception {
        for (int i = 1; i <= 10; i++) {
            int finalI = i;
            es.execute(() -> { //提交执行
                System.out.println("线程" + finalI + "执行完成!");
                try {
                    //模拟线程执行过程
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        //关闭线程池
        es.shutdown();
        //隔间1s判断是否执行完了,如果所有任务在关闭后完成,返回true。
        while (!es.isTerminated()) {
            Thread.sleep(1000);
        }
        System.out.println("---------线程池执行完了-----------");
    }
}
本文作者:Weee
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!