什么时候应该在 ExecutorService 上使用 CompletionService?

我刚在 这篇博文中找到了 CompletionService。但是,这并没有真正展示 CompletionService 相对于标准 ExecutorService 的优势。同样的代码可以用。那么,CompletionService 什么时候有用呢?

你能给出一个简短的代码示例,使其清晰明了吗?例如,此代码示例仅显示不需要 CompletionService 的地方(= 等效于 ExecutorService)

    ExecutorService taskExecutor = Executors.newCachedThreadPool();
//        CompletionService<Long> taskCompletionService =
//                new ExecutorCompletionService<Long>(taskExecutor);
Callable<Long> callable = new Callable<Long>() {
@Override
public Long call() throws Exception {
return 1L;
}
};


Future<Long> future = // taskCompletionService.submit(callable);
taskExecutor.submit(callable);


while (!future.isDone()) {
// Do some work...
System.out.println("Working on something...");
}
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
68655 次浏览

I think the javadoc best answers the question of when the CompletionService is useful in a way an ExecutorService isn't.

A service that decouples the production of new asynchronous tasks from the consumption of the results of completed tasks.

Basically, this interface allows a program to have producers which create and submit tasks (and even examine the results of those submissions) without knowing about any other consumers of the results of those tasks. Meanwhile, consumers which are aware of the CompletionService could poll for or take results without being aware of the producers submitting the tasks.

For the record, and I could be wrong because it is rather late, but I am fairly certain that the sample code in that blog post causes a memory leak. Without an active consumer taking results out of the ExecutorCompletionService's internal queue, I'm not sure how the blogger expected that queue to drain.

Basically you use a CompletionService if you want to execute multiple tasks in parallel and then work with them in their completion order. So, if I execute 5 jobs, the CompletionService will give me the first one that that finishes. The example where there is only a single task confers no extra value over an Executor apart from the ability to submit a Callable.

Omitting many details:

  • ExecutorService = incoming queue + worker threads
  • CompletionService = incoming queue + worker threads + output queue

With ExecutorService, once you have submitted the tasks to run, you need to manually code for efficiently getting the results of the tasks completed.

With CompletionService, this is pretty much automated. The difference is not very evident in the code you have presented because you are submitting just one task. However, imagine you have a list of tasks to be submitted. In the example below, multiple tasks are submitted to the CompletionService. Then, instead of trying to find out which task has completed (to get the results), it just asks the CompletionService instance to return the results as they become available.

public class CompletionServiceTest {


class CalcResult {
long result ;


CalcResult(long l) {
result = l;
}
}


class CallableTask implements Callable<CalcResult> {
String taskName ;
long  input1 ;
int input2 ;


CallableTask(String name , long v1 , int v2 ) {
taskName = name;
input1 = v1;
input2 = v2 ;
}


public CalcResult call() throws Exception {
System.out.println(" Task " + taskName + " Started -----");
for(int i=0;i<input2 ;i++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
System.out.println(" Task " + taskName + " Interrupted !! ");
e.printStackTrace();
}
input1 += i;
}
System.out.println(" Task " + taskName + " Completed @@@@@@");
return new CalcResult(input1) ;
}


}


public void test(){
ExecutorService taskExecutor = Executors.newFixedThreadPool(3);
CompletionService<CalcResult> taskCompletionService = new ExecutorCompletionService<CalcResult>(taskExecutor);


int submittedTasks = 5;
for (int i=0;i< submittedTasks;i++) {
taskCompletionService.submit(new CallableTask (
String.valueOf(i),
(i * 10),
((i * 10) + 10  )
));
System.out.println("Task " + String.valueOf(i) + "subitted");
}
for (int tasksHandled=0;tasksHandled<submittedTasks;tasksHandled++) {
try {
System.out.println("trying to take from Completion service");
Future<CalcResult> result = taskCompletionService.take();
System.out.println("result for a task availble in queue.Trying to get()");
// above call blocks till atleast one task is completed and results availble for it
// but we dont have to worry which one


// process the result here by doing result.get()
CalcResult l = result.get();
System.out.println("Task " + String.valueOf(tasksHandled) + "Completed - results obtained : " + String.valueOf(l.result));


} catch (InterruptedException e) {
// Something went wrong with a task submitted
System.out.println("Error Interrupted exception");
e.printStackTrace();
} catch (ExecutionException e) {
// Something went wrong with the result
e.printStackTrace();
System.out.println("Error get() threw exception");
}
}
}
}

First of all, if we do not want to waste processor time, we will not use

while (!future.isDone()) {
// Do some work...
}

We must use

service.shutdown();
service.awaitTermination(14, TimeUnit.DAYS);

The bad thing about this code is that it will shut down ExecutorService. If we want to continue work with it (i.e. we have some recursicve task creation), we have two alternatives: invokeAll or ExecutorService.

invokeAll will wait untill all tasks will be complete. ExecutorService grants us ability to take or poll results one by one.

And, finily, recursive example:

ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUMBER);
ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);


while (Tasks.size() > 0) {
for (final Task task : Tasks) {
completionService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return DoTask(task);
}
});
}


try {
int taskNum = Tasks.size();
Tasks.clear();
for (int i = 0; i < taskNum; ++i) {
Result result = completionService.take().get();
if (result != null)
Tasks.add(result.toTask());
}
} catch (InterruptedException e) {
//  error :(
} catch (ExecutionException e) {
//  error :(
}
}

See it by yourself at run time,try to implement both solutions (Executorservice and Completionservice) and you'll see how different they behave and it will be more clear on when to use one or the other. There is an example here if you want http://rdafbn.blogspot.co.uk/2013/01/executorservice-vs-completionservice-vs.html

Let's say you have 5 long running task(callable task) and you have submitted those task to executer service. Now imagine you don't want to wait for all 5 task to compete instead you want to do some sort of processing on these task if any one completes. Now this can be done either by writing polling logic on future objects or use this API.

If the task producer is not interested in the results and it is another component's responsibility to process results of asynchronous task executed by executor service, then you should use CompletionService. It helps you in separating task result processor from task producer. See example http://www.zoftino.com/java-concurrency-executors-framework-tutorial

there is another advantage of using completionservice: Performance

when you call future.get(), you are spin waiting:

from java.util.concurrent.CompletableFuture

  private Object waitingGet(boolean interruptible) {
Signaller q = null;
boolean queued = false;
int spins = -1;
Object r;
while ((r = result) == null) {
if (spins < 0)
spins = (Runtime.getRuntime().availableProcessors() > 1) ?
1 << 8 : 0; // Use brief spin-wait on multiprocessors
else if (spins > 0) {
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}

when you have a long-running task, this will be a disaster for performance.

with completionservice, once the task is done, it's result will be enqueued and you can poll the queue with lower performance overhand.

completionservice achieve this by using wrap task with a done hook.

java.util.concurrent.ExecutorCompletionService

    private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
package com.barcap.test.test00;


import java.util.concurrent.*;


/**
* Created by Sony on 25-04-2019.
*/
public class ExecutorCompletest00 {


public static void main(String[] args) {


ExecutorService exc= Executors.newFixedThreadPool( 10 );
ExecutorCompletionService executorCompletionService= new ExecutorCompletionService( exc );


for (int i=1;i<10;i++){
Task00 task00= new Task00( i );
executorCompletionService.submit( task00 );
}
for (int i=1;i<20;i++){
try {
Future<Integer> future= (Future <Integer>) executorCompletionService.take();
Integer inttest=future.get();
System.out.println(" the result of completion service is "+inttest);


break;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
}

=======================================================

package com.barcap.test.test00;


import java.util.*;
import java.util.concurrent.*;


/**
* Created by Sony on 25-04-2019.
*/
public class ExecutorServ00 {


public static void main(String[] args) {
ExecutorService executorService=Executors.newFixedThreadPool( 9 );
List<Future> futList= new ArrayList <>(  );
for (int i=1;i<10;i++) {
Future result= executorService.submit( new Task00( i ) );
futList.add( result );
}


for (Future<Integer> futureEach :futList ){
try {
Integer inm=   futureEach.get();


System.out.println("the result of future executorservice is "+inm);
break;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
}

===========================================================

package com.barcap.test.test00;


import java.util.concurrent.*;


/**
* Created by Sony on 25-04-2019.
*/
public class Task00 implements Callable<Integer> {


int i;


public Task00(int i) {
this.i = i;
}


@Override
public Integer call() throws Exception {
System.out.println(" the current thread is "+Thread.currentThread().getName()  +" the result should be "+i);
int sleepforsec=100000/i;
Thread.sleep( sleepforsec );
System.out.println(" the task complted for "+Thread.currentThread().getName()  +" the result should be "+i);






return i;
}
}

======================================================================

difference of logs for executor completion service: the current thread is pool-1-thread-1 the result should be 1 the current thread is pool-1-thread-2 the result should be 2 the current thread is pool-1-thread-3 the result should be 3 the current thread is pool-1-thread-4 the result should be 4 the current thread is pool-1-thread-6 the result should be 6 the current thread is pool-1-thread-5 the result should be 5 the current thread is pool-1-thread-7 the result should be 7 the current thread is pool-1-thread-9 the result should be 9 the current thread is pool-1-thread-8 the result should be 8 the task complted for pool-1-thread-9 the result should be 9 teh result is 9 the task complted for pool-1-thread-8 the result should be 8 the task complted for pool-1-thread-7 the result should be 7 the task complted for pool-1-thread-6 the result should be 6 the task complted for pool-1-thread-5 the result should be 5 the task complted for pool-1-thread-4 the result should be 4 the task complted for pool-1-thread-3 the result should be 3

the task complted for pool-1-thread-2 the result should be 2

the current thread is pool-1-thread-1 the result should be 1 the current thread is pool-1-thread-3 the result should be 3 the current thread is pool-1-thread-2 the result should be 2 the current thread is pool-1-thread-5 the result should be 5 the current thread is pool-1-thread-4 the result should be 4 the current thread is pool-1-thread-6 the result should be 6 the current thread is pool-1-thread-7 the result should be 7 the current thread is pool-1-thread-8 the result should be 8 the current thread is pool-1-thread-9 the result should be 9 the task complted for pool-1-thread-9 the result should be 9 the task complted for pool-1-thread-8 the result should be 8 the task complted for pool-1-thread-7 the result should be 7 the task complted for pool-1-thread-6 the result should be 6 the task complted for pool-1-thread-5 the result should be 5 the task complted for pool-1-thread-4 the result should be 4 the task complted for pool-1-thread-3 the result should be 3 the task complted for pool-1-thread-2 the result should be 2 the task complted for pool-1-thread-1 the result should be 1 the result of future is 1

=======================================================

for executorservice the result will only be avialable after all tasks complted.

executor completionservice any result avilable make that return.

assuming you execute a tasks in parallel and you save the Future results in a list:

The practical main difference between ExecutorService and CompletionService is:

ExecutorService get() will try to retrieve the results in the submitted order waiting for completion.

CompletionService take() + get() will try to retrieve the results in the completion order disregarding the submission order.