Waiting on a list of Future

I have a method that returns a List of futures:

List<Future<O>> futures = getFutures();

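Now I want to wait until either all futures have finished processing successfully, or any of the tasks whose output is returned by a future throws an exception. If even one task throws an exception, there is no point in waiting for the other futures.

A simple approach would be: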

void waitForAll(List<Future<O>> futures) {
    for (Future<O> f : futures) {
        try {
            f.get();
        } catch (Exception e) {
            // TODO: catch specific exceptions
            // this future threw an exception, meaning someone could not do its task
            return;
        }
    }
}

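But the problem here is that if, for example, the 4th future throws an exception, then I will wait unnecessarily for the first 3 futures to become available.

How can I solve this? Would a CountDownLatch help in any way? I can't use Future.isDone, because the Java doc says: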

boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.

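You can use a CompletionService to receive the futures as soon as they are ready, and if one of them throws an exception, cancel the processing. Something like this: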

Executor executor = Executors.newFixedThreadPool(4);
CompletionService<SomeResult> completionService =
    new ExecutorCompletionService<SomeResult>(executor);

//4 tasks
for(int i = 0; i < 4; i++) {
    completionService.submit(new Callable<SomeResult>() {
        public SomeResult call() {
            ...
            return result;
        }
    });
}

int received = 0;
boolean errors = false;

while(received < 4 && !errors) {
    Future<SomeResult> resultFuture = completionService.take(); //blocks if none available
    try {
        SomeResult result = resultFuture.get();
        received ++;
        ... // do something with the result
    }
    catch(Exception e) {
        //log
        errors = true;
    }
}

I think you can improve this further by cancelling any tasks that are still executing if one of them throws an error.
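The cancellation idea mentioned above could look roughly like the following. This is only a sketch: the class name FailFast and the helper awaitAllOrFail are made up for illustration, and the results come back in completion order rather than submission order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FailFast {

    // Hypothetical helper: returns all results, or throws as soon as any task
    // fails, cancelling whatever has not finished yet.
    public static <T> List<T> awaitAllOrFail(ExecutorService pool, List<Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        CompletionService<T> cs = new ExecutorCompletionService<>(pool);
        List<Future<T>> futures = new ArrayList<>();
        for (Callable<T> task : tasks) {
            futures.add(cs.submit(task));
        }
        List<T> results = new ArrayList<>();
        try {
            for (int i = 0; i < tasks.size(); i++) {
                // take() hands back futures in completion order;
                // get() rethrows a task failure as ExecutionException
                results.add(cs.take().get());
            }
            return results;
        } finally {
            // cancel() is a no-op for futures that have already completed
            for (Future<T> f : futures) {
                f.cancel(true);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> { Thread.sleep(5000); return "slow"; });
        tasks.add(() -> { throw new IllegalStateException("boom"); });
        tasks.add(() -> { Thread.sleep(5000); return "also slow"; });
        try {
            awaitAllOrFail(pool, tasks);
        } catch (ExecutionException e) {
            // prints "Failed fast: boom" without waiting for the 5 second tasks
            System.out.println("Failed fast: " + e.getCause().getMessage());
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that cancel(true) only interrupts the worker thread; a task that ignores interruption will keep running in the background.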

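You can use an ExecutorCompletionService. The documentation even has an example for your exact use case:

Suppose instead that you would like to use the first non-null result of the set of tasks, ignoring any that encounter exceptions, and cancelling all other tasks when the first one is ready: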

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    int n = solvers.size();
    List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
    Result result = null;
    try {
        for (Callable<Result> s : solvers)
            futures.add(ecs.submit(s));
        for (int i = 0; i < n; ++i) {
            try {
                Result r = ecs.take().get();
                if (r != null) {
                    result = r;
                    break;
                }
            } catch (ExecutionException ignore) {
            }
        }
    } finally {
        for (Future<Result> f : futures)
            f.cancel(true);
    }

    if (result != null)
        use(result);
}

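The important thing to notice here is that ecs.take() will get the first completed task, not just the first submitted one. Thus you should get them in the order in which they finish executing (or throw an exception).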
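To see the completion-order behaviour of take() in isolation, here is a small illustrative sketch. The class name CompletionOrder and the sleep durations are arbitrary choices for the demo; the slow task is deliberately submitted first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionOrder {

    // Submits a slow task first and a fast task second, then records
    // the order in which take() hands the results back.
    public static List<String> completionOrder() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletionService<String> ecs = new ExecutorCompletionService<>(pool);
            ecs.submit(() -> { Thread.sleep(500); return "slow"; }); // submitted first
            ecs.submit(() -> { Thread.sleep(50); return "fast"; });  // submitted second
            List<String> order = new ArrayList<>();
            for (int i = 0; i < 2; i++) {
                order.add(ecs.take().get()); // blocks until the next task completes
            }
            return order;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(completionOrder());
    }
}
```

With both tasks running concurrently, "fast" should come out of take() before "slow", even though it was submitted later.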

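Maybe this will help (nothing replaces a raw thread, yeah!). I suggest running each Future with a separate thread (they run in parallel); then, whenever one of them gets an error, it just signals the manager (the Handler class).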

class Handler{
//...
private Thread thisThread;
private boolean failed=false;
private Thread[] trds;
public void waitFor(){
thisThread=Thread.currentThread();
List<Future<Object>> futures = getFutures();
trds=new Thread[futures.size()];
for (int i = 0; i < trds.length; i++) {
RunTask rt=new RunTask(futures.get(i), this);
trds[i]=new Thread(rt);
}
synchronized (this) {
for(Thread tx:trds){
tx.start();
}
}
for(Thread tx:trds){
try {tx.join();
} catch (InterruptedException e) {
System.out.println("Job failed!");break;
}
}if(!failed){System.out.println("Job Done");}
}


private List<Future<Object>> getFutures() {
return null;
}


public synchronized void cancelOther(){if(failed){return;}
failed=true;
for(Thread tx:trds){
tx.stop();//Deprecated; throws UnsupportedOperationException since Java 20, so this only works on older JDKs
}thisThread.interrupt();
}
//...
}
class RunTask implements Runnable{
private Future f;private Handler h;
public RunTask(Future f,Handler h){this.f=f;this.h=h;}
public void run(){
try{
f.get();//beware about state of working, the stop() method throws ThreadDeath Error at any thread state (unless it blocked by some operation)
}catch(Exception e){System.out.println("Error, stopping other guys...");h.cancelOther();}
catch(Throwable t){System.out.println("Oops, some other guy has stopped working...");}
}
}

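I have to say the above code may have errors (I didn't check it), but I hope it explains the solution. Please give it a try.

A CompletionService takes your Callables via its .submit() method, and you can retrieve the computed futures with its .take() method.

One thing you must not forget is to terminate the ExecutorService by calling its .shutdown() method. Also, you can only call this method if you have kept a reference to the executor service, so make sure to keep one.

Example code - for a fixed number of work items to be processed in parallel: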
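One way to make sure the shutdown is never skipped, even when a task fails, is to put it in a finally block. The sketch below assumes a throwaway pool owned by a single method; the class name ShutdownExample, the method sumOfSquares and the 5 second grace period are illustrative choices only.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownExample {

    // Computes 1^2 + 2^2 + ... + n^2 on a pool that is always shut down afterwards.
    public static int sumOfSquares(int n) throws Exception {
        ExecutorService service = Executors.newFixedThreadPool(4); // keep the reference
        try {
            CompletionService<Integer> cs = new ExecutorCompletionService<>(service);
            for (int i = 1; i <= n; i++) {
                final int x = i;
                cs.submit(() -> x * x);
            }
            int sum = 0;
            for (int i = 0; i < n; i++) {
                sum += cs.take().get();
            }
            return sum;
        } finally {
            service.shutdown(); // stop accepting new tasks; submitted tasks still run
            if (!service.awaitTermination(5, TimeUnit.SECONDS)) {
                service.shutdownNow(); // interrupt anything still running
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOfSquares(3)); // prints 14
    }
}
```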

ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());


CompletionService<YourCallableImplementor> completionService =
new ExecutorCompletionService<YourCallableImplementor>(service);


ArrayList<Future<YourCallableImplementor>> futures = new ArrayList<Future<YourCallableImplementor>>();


for (String computeMe : elementsToCompute) {
futures.add(completionService.submit(new YourCallableImplementor(computeMe)));
}
//now retrieve the futures after computation (auto wait for it)
int received = 0;


while(received < elementsToCompute.size()) {
Future<YourCallableImplementor> resultFuture = completionService.take();
YourCallableImplementor result = resultFuture.get();
received ++;
}
//important: shutdown your ExecutorService
service.shutdown();

Example code - for a dynamic number of work items to be processed in parallel:

public void runIt(){
ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
CompletionService<CallableImplementor> completionService = new ExecutorCompletionService<CallableImplementor>(service);
ArrayList<Future<CallableImplementor>> futures = new ArrayList<Future<CallableImplementor>>();


//Initial workload is 8 tasks
for (int i = 0; i < 8; i++) {
futures.add(completionService.submit(write.new CallableImplementor()));
}
boolean finished = false;
while (!finished) {
try {
Future<CallableImplementor> resultFuture;
resultFuture = completionService.take();
CallableImplementor result = resultFuture.get();
finished = doSomethingWith(result.getResult());
result.setResult(null);
result = null;
resultFuture = null;
//After work package has been finished create new work package and add it to futures
futures.add(completionService.submit(write.new CallableImplementor()));
} catch (InterruptedException | ExecutionException e) {
//handle interrupted and assert correct thread / work packet count
}
}


//important: shutdown your ExecutorService
service.shutdown();
}


public class CallableImplementor implements Callable<CallableImplementor>{
boolean result;


@Override
public CallableImplementor call() throws Exception {
//business logic goes here
return this;
}


public boolean getResult() {
return result;
}


public void setResult(boolean result) {
this.result = result;
}
}

If you are using Java 8, you can do this more easily with CompletableFuture and CompletableFuture.allOf, which applies the callback only after all supplied CompletableFutures are done.

// Waits for *all* futures to complete and returns a list of results.
// If *any* future completes exceptionally then the resulting future will also complete exceptionally.


public static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) {
CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]);


return CompletableFuture.allOf(cfs)
.thenApply(ignored -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}
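One caveat: allOf only completes once every input future has completed, so by itself it does not stop waiting when one input fails early. A possible fail-fast variant is sketched below; the class and method name allOrFailFast are made up here, and the sketch only stops waiting, it does not cancel the remaining futures.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AllOrFailFast {

    // Completes with all results, or exceptionally as soon as any input fails.
    public static <T> CompletableFuture<List<T>> allOrFailFast(List<CompletableFuture<T>> futures) {
        CompletableFuture<List<T>> result = new CompletableFuture<>();
        // Propagate the first failure immediately
        for (CompletableFuture<T> f : futures) {
            f.whenComplete((value, error) -> {
                if (error != null) {
                    result.completeExceptionally(error);
                }
            });
        }
        // Complete normally once every input has finished successfully
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenRun(() -> result.complete(
                futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList())));
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completes in this demo
        CompletableFuture<String> failing = new CompletableFuture<>();
        CompletableFuture<List<String>> all = allOrFailFast(Arrays.asList(pending, failing));
        failing.completeExceptionally(new IllegalStateException("boom"));
        // true, although `pending` has not completed
        System.out.println(all.isCompletedExceptionally());
    }
}
```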

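If you are using Java 8 and don't want to work with CompletableFutures, I have written a tool to retrieve the results of a List<Future<T>> using streams. The key point is that you cannot simply use map(Future::get), because it throws checked exceptions.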

public final class Futures
{


private Futures()
{}


public static <E> Collector<Future<E>, Collection<E>, List<E>> present()
{
return new FutureCollector<>();
}


private static class FutureCollector<T> implements Collector<Future<T>, Collection<T>, List<T>>
{
private final List<Throwable> exceptions = new LinkedList<>();


@Override
public Supplier<Collection<T>> supplier()
{
return LinkedList::new;
}


@Override
public BiConsumer<Collection<T>, Future<T>> accumulator()
{
return (r, f) -> {
try
{
r.add(f.get());
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
}
catch (ExecutionException e)
{
exceptions.add(e.getCause());
}
};
}


@Override
public BinaryOperator<Collection<T>> combiner()
{
return (l1, l2) -> {
l1.addAll(l2);
return l1;
};
}


@Override
public Function<Collection<T>, List<T>> finisher()
{
return l -> {


List<T> ret = new ArrayList<>(l);
if (!exceptions.isEmpty())
throw new AggregateException(exceptions, ret);


return ret;
};


}


@Override
public Set<java.util.stream.Collector.Characteristics> characteristics()
{
return java.util.Collections.emptySet();
}
}
}

This requires an AggregateException that works like C#'s:

public class AggregateException extends RuntimeException
{
/**
*
*/
private static final long serialVersionUID = -4477649337710077094L;


private final List<Throwable> causes;
private List<?> successfulElements;


public AggregateException(List<Throwable> causes, List<?> l)
{
this.causes = causes;
successfulElements = l;
}


public AggregateException(List<Throwable> causes)
{
this.causes = causes;
}


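@Override
public synchronized Throwable getCause()
{
// Returning `this` would make the exception its own cause; expose the first underlying cause instead
return causes.isEmpty() ? null : causes.get(0);
}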


public List<Throwable> getCauses()
{
return causes;
}


public List<?> getSuccessfulElements()
{
return successfulElements;
}


public void setSuccessfulElements(List<?> successfulElements)
{
this.successfulElements = successfulElements;
}


}

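This component behaves exactly like C#'s Task.WaitAll. I am working on a variant that does the same as CompletableFuture.allOf (the equivalent of Task.WhenAll).

The reason I did this is that I am using Spring's ListenableFuture and don't want to port to CompletableFuture, even though it is the more standard way.

Using CompletableFuture in Java 8:

// Kick off multiple, asynchronous lookups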
CompletableFuture<User> page1 = gitHubLookupService.findUser("Test1");
CompletableFuture<User> page2 = gitHubLookupService.findUser("Test2");
CompletableFuture<User> page3 = gitHubLookupService.findUser("Test3");


// Wait until they are all done
CompletableFuture.allOf(page1,page2,page3).join();


logger.info("--> " + page1.get());

/**
* execute suppliers as future tasks then wait / join for getting results
* @param functors a supplier(s) to execute
* @return a list of results
*/
private List getResultsInFuture(Supplier<?>... functors) {
CompletableFuture[] futures = stream(functors)
.map(CompletableFuture::supplyAsync)
.collect(Collectors.toList())
.toArray(new CompletableFuture[functors.length]);
CompletableFuture.allOf(futures).join();
return stream(futures).map(a-> {
try {
return a.get();
} catch (InterruptedException | ExecutionException e) {
//logger.error("an error occurred during runtime execution a function",e);
return null;
}
}).collect(Collectors.toList());
}

I have a utility class that contains these:

@FunctionalInterface
public interface CheckedSupplier<X> {
X get() throws Throwable;
}


public static <X> Supplier<X> uncheckedSupplier(final CheckedSupplier<X> supplier) {
return () -> {
try {
return supplier.get();
} catch (final Throwable checkedException) {
throw new IllegalStateException(checkedException);
}
};
}

Once you have that, using a static import, you can simply wait for all futures like this:

futures.stream().forEach(future -> uncheckedSupplier(future::get).get());

You can also collect all their results like this:

List<MyResultType> results = futures.stream()
.map(future -> uncheckedSupplier(future::get).get())
.collect(Collectors.toList());

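Just revisiting my old post, I noticed that you had another concern:

But the problem here is that if, for example, the 4th future throws an exception, then I will wait unnecessarily for the first 3 futures to become available.

In this case, the simple solution is to do it in parallel: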

futures.stream().parallel()
.forEach(future -> uncheckedSupplier(future::get).get());

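This way the first exception, although it will not stop the futures, will break the forEach statement, just as in the serial example. Since all futures are awaited in parallel, you won't have to wait for the first 3 to complete.

If you want to combine a List of CompletableFutures, you can do this: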

List<CompletableFuture<Void>> futures = new ArrayList<>();
// ... Add futures to this ArrayList of CompletableFutures


// CompletableFuture.allOf() takes variadic arguments
// You can use this syntax to pass a List instead
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[futures.size()]));


// Wait for all individual CompletableFuture to complete
// All individual CompletableFutures are executed in parallel
allFutures.get();

More on Future & CompletableFuture, useful links:
1. Future: https://www.baeldung.com/java-future
2. CompletableFuture: https://www.baeldung.com/java-completablefuture
3. CompletableFuture: https://www.callicoder.com/java-8-completablefuture-tutorial/

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;


public class Stack2 {
public static void waitFor(List<Future<?>> futures) {
List<Future<?>> futureCopies = new ArrayList<Future<?>>(futures);//contains futures whose status has not been checked yet
while (!futureCopies.isEmpty()) {//worst case :all task worked without exception, then this method should wait for all tasks
Iterator<Future<?>> futureCopiesIterator = futureCopies.iterator();
while (futureCopiesIterator.hasNext()) {
Future<?> future = futureCopiesIterator.next();
if (future.isDone()) {//already done
futureCopiesIterator.remove();
try {
future.get();// no longer waiting
} catch (InterruptedException e) {
//ignore
//only happen when current Thread interrupted
} catch (ExecutionException e) {
Throwable throwable = e.getCause();// real cause of exception
futureCopies.forEach(f -> f.cancel(true));//cancel other tasks that not completed
return;
}
}
}
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);


Runnable runnable1 = new Runnable (){
public void run(){
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
}
};
Runnable runnable2 = new Runnable (){
public void run(){
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
}
}
};




Runnable fail = new Runnable (){
public void run(){
try {
Thread.sleep(1000);
throw new RuntimeException("bla bla bla");
} catch (InterruptedException e) {
}
}
};


List<Future<?>> futures = Stream.of(runnable1,fail,runnable2)
.map(executorService::submit)
.collect(Collectors.toList());


double start = System.nanoTime();
waitFor(futures);
double end = (System.nanoTime()-start)/1e9;
System.out.println(end +" seconds");
executorService.shutdownNow();//release the pool threads so the JVM can exit


}
}

This is what I use to wait on a list of futures for a certain amount of time. I think it's cleaner.

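CountDownLatch countDownLatch = new CountDownLatch(somethings.size());
// Some parallel work
for (Something something : somethings) {
    completionService.submit(() -> {
        try {
            work(something);
        } catch (ConnectException e) {
            // ignored here; log or record the failure as needed
        } finally {
            countDownLatch.countDown();
        }
        return null; // submit() expects a Callable, so return a value
    });
}
try {
    if (!countDownLatch.await(secondsToWait, TimeUnit.SECONDS)){
        // timed out before all tasks finished
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}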
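If the tasks can be collected up front, the timed overload of ExecutorService.invokeAll might be a simpler alternative to a hand-rolled latch: it blocks until all tasks finish or the timeout expires, and cancels whatever is still running at that point. A rough sketch follows; the class name TimedWait and the helper runWithTimeout are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class TimedWait {

    // Runs all tasks and waits at most `timeout`; futures of unfinished tasks come back cancelled.
    public static List<Future<String>> runWithTimeout(
            List<Callable<String>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            // The returned list is in the same order as the task list
            return pool.invokeAll(tasks, timeout, unit);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> "fast");
        tasks.add(() -> { Thread.sleep(5000); return "slow"; });
        for (Future<String> f : runWithTimeout(tasks, 500, TimeUnit.MILLISECONDS)) {
            // A cancelled future means the task did not finish within the timeout
            System.out.println(f.isCancelled() ? "timed out" : f.get());
        }
    }
}
```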

A Guava-based solution can be implemented using Futures.FutureCombiner.

Here is the code example given in the javadoc:

 final ListenableFuture<Instant> loginDateFuture =
loginService.findLastLoginDate(username);
final ListenableFuture<List<String>> recentCommandsFuture =
recentCommandsService.findRecentCommands(username);
ListenableFuture<UsageHistory> usageFuture =
Futures.whenAllSucceed(loginDateFuture, recentCommandsFuture)
.call(
() ->
new UsageHistory(
username,
Futures.getDone(loginDateFuture),
Futures.getDone(recentCommandsFuture)),
executor);

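For more information, see the ListenableFutureExplained section of the user guide.

If you are curious about how it works under the hood, I suggest looking at this part of the source code: AggregateFuture.java#L127-L186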