CompletionService是什么?
Callable+Future可以实现多个task并行执行,但是如果遇到前面的task执行较慢时需要阻塞等待前面的task执行完后面task才能取得结果。
CompletionService的主要功能就是一边生成任务,一边获取任务的返回值。让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不再依赖任务顺序了。
他只有一个实现类:ExecutorCompletionService
CompletionService原理
内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序,内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果。
CompletionService的应用场景
当需要批量提交异步任务的时候建议你使用CompletionService:CompletionService将线程池Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批量异步任务的管理更简单。
CompletionService能够让异步任务的执行结果有序化:先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如ForkingCluster这样的需求。
线程池隔离:CompletionService支持自己创建线程池,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。
CompletionService的使用方式
importjava.util.Random;importjava.util.concurrent.Callable;importjava.util.concurrent.CompletionService;importjava.util.concurrent.ExecutionException;importjava.util.concurrent.ExecutorCompletionService;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importjava.util.concurrent.TimeUnit;publicclassCompletionServiceTest{publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{//创建线程池ExecutorServiceexecutor=Executors.newFixedThreadPool(5);//创建CompletionServiceCompletionServiceIntegercs=newExecutorCompletionService(executor);//开启5个任务for(inti=0;i5;i++){cs.submit(newCompletionTask());}//将询价结果异步保存到数据库for(inti=0;i5;i++){System.out.println(Thread.currentThread().getName()+"获取到任务结果:"+cs.take().get());}}}/***自定义的任务*/classCompletionTaskimplementsCallableInteger{
OverridepublicIntegercall()throwsException{//获取随机的世界intrandomSleepTime=newRandom().nextInt(10)*;//开始执行System.out.println(Thread.currentThread().getName()+"开始执行,当前任务需要等待"+randomSleepTime+"毫秒");//线程等待,模拟正在执行TimeUnit.MILLISECONDS.sleep(randomSleepTime);//结束执行System.out.println(Thread.currentThread().getName()+"结束执行,当前任务需要等待"+randomSleepTime+"毫秒");//返回他等待的时间returnrandomSleepTime;}}运行结果:等待时间短的线程优先返回结果
CompletionServiceTest执行结果.png
CompletionService的构造方法源码分析
/***传入线程池的构造方法*/publicExecutorCompletionService(Executorexecutor){//线程池为null,直接抛出异常if(executor==null)thrownewNullPointerException();//把使用的线程池赋值this.executor=executor;//判断是不是AbstractExecutorService的子类this.aes=(executorinstanceofAbstractExecutorService)?(AbstractExecutorService)executor:null;//默认只是用LinkedBlockingQueue进行结果的存放this.