Convert List<CompletableFuture> to CompletableFuture<List>

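I am trying to convert List<CompletableFuture<T>> to CompletableFuture<List<T>>. This is quite useful when you have many asynchronous tasks and need the results of all of them.

If any of them fails, the final future should fail as well. This is how I have implemented it: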

public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
if(com.isEmpty()){
throw new IllegalArgumentException();
}
Stream<? extends CompletableFuture<T>> stream = com.stream();
CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
x.add(y);
return x;
},exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
ls1.addAll(ls2);
return ls1;
},exec));
}

To run it:

ExecutorService executorService = Executors.newCachedThreadPool();
Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep((long) (Math.random() * 10));
} catch (InterruptedException e) {
e.printStackTrace();
}
return x;
}, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);

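If any one of them fails, the whole thing fails. It gives the expected output even with a million futures. My problem is this: when there are more than 5000 futures and any one of them fails, I get a StackOverflowError:

Exception in thread "pool-1-thread-2611" java.lang.StackOverflowError
at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)
at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487)

What am I doing wrong?

Note: the future returned above fails as soon as any one of the futures fails. The accepted answer should preserve this behavior.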


Use CompletableFuture.allOf(...):

static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}
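As a small sketch of how a failure surfaces through this approach: when any input future has failed, join() on the combined future throws a CompletionException whose cause is the original exception. The class name and the causeOfFailure helper below are hypothetical and exist only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

final class SequenceFailureSketch {

    // Same shape as the sequence method shown above.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
        return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> com.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    // Hypothetical helper: returns the cause seen by the caller when one input failed,
    // or null if no failure was observed.
    static Throwable causeOfFailure() {
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        try {
            sequence(Arrays.asList(CompletableFuture.completedFuture(1), failed)).join();
            return null;
        } catch (CompletionException e) {
            // allOf wraps the original exception; thenApply passes the wrapper through.
            return e.getCause();
        }
    }
}
```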

A few comments on your implementation:

Your use of .thenComposeAsync, .thenApplyAsync and .thenCombineAsync is likely not doing what you expect. These ...Async methods run the function supplied to them in a separate thread. So, in your case, you are causing the addition of the new item to the list to run in the supplied executor. There is no need to stuff light-weight operations into a cached thread executor. Do not use thenXXXXAsync methods without a good reason.
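To make the difference concrete, here is a minimal sketch comparing thenApply with thenApplyAsync on an already completed future. The class name, method name, and the "sketch-pool" thread name are assumptions made for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AsyncVariantSketch {

    // Returns { thread name used by thenApply, thread name used by thenApplyAsync }.
    static String[] threadNames() {
        // Single named thread so the executor's thread is easy to recognise.
        ExecutorService exec =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-pool"));
        try {
            CompletableFuture<Integer> done = CompletableFuture.completedFuture(1);

            // The future is already complete, so the function runs in the calling thread.
            String sync = done.thenApply(x -> Thread.currentThread().getName()).join();

            // The Async variant always hands the function to the supplied executor.
            String async = done.thenApplyAsync(
                    x -> Thread.currentThread().getName(), exec).join();

            return new String[] { sync, async };
        } finally {
            exec.shutdown();
        }
    }
}
```

For a cheap operation such as adding one element to a list, the hand-off to the executor is pure overhead.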

Additionally, reduce should not be used to accumulate into mutable containers. Even though it might work correctly when the stream is sequential, it will fail if the stream were to be made parallel. To perform mutable reduction, use .collect instead.
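A minimal sketch of mutable reduction with collect, using plain integers rather than futures to keep it short; the class and method names are hypothetical. Each parallel chunk gets its own container from the supplier, so no list is shared between threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

final class MutableReductionSketch {

    // collect(supplier, accumulator, combiner): every parallel chunk accumulates into
    // its own fresh ArrayList, and the partial lists are merged with addAll.
    static List<Integer> collectToList(int n) {
        return IntStream.range(0, n)
                .parallel()
                .boxed()
                .collect(ArrayList::new, List::add, List::addAll);
    }
}
```

Because the stream is ordered, the resulting list keeps the encounter order even when run in parallel.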

If you want to complete the entire computation exceptionally immediately after the first failure, do the following in your sequence method:

CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);


com.forEach(f -> f.whenComplete((t, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
}
}));


return result;

If, additionally, you want to cancel the remaining operations on first failure, add exec.shutdownNow(); right after result.completeExceptionally(ex);. This, of course, assumes that exec exists only for this one computation. If it doesn't, you'll have to loop over and cancel each remaining Future individually.
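For the case where the executor is shared, a sketch of cancelling each remaining future individually might look like the following. The class and method names are hypothetical. Note that CompletableFuture.cancel only completes the future with a CancellationException; it does not interrupt a task that is already running.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class FailFastSequence {

    static <T> CompletableFuture<List<T>> sequenceFailFast(List<CompletableFuture<T>> com) {
        CompletableFuture<List<T>> result =
                CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
                        .thenApply(v -> com.stream()
                                .map(CompletableFuture::join)
                                .collect(Collectors.toList()));

        com.forEach(f -> f.whenComplete((t, ex) -> {
            // completeExceptionally returns true only for the call that actually
            // completes the result, so the cancellation loop runs at most once.
            if (ex != null && result.completeExceptionally(ex)) {
                // cancel has no effect on futures that have already completed.
                com.forEach(other -> other.cancel(true));
            }
        }));

        return result;
    }
}
```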

As Misha has pointed out, you are overusing …Async operations. Further, you are composing a complex chain of operations modelling a dependency which doesn’t reflect your program logic:

  • you create a job x which depends on the first and second job of your list
  • you create a job x+1 which depends on job x and the third job of your list
  • you create a job x+2 which depends on job x+1 and the 4th job of your list
  • you create a job x+5000 which depends on job x+4999 and the last job of your list

Then, canceling (explicitly or due to an exception) this recursively composed job might be performed recursively and might fail with a StackOverflowError. That’s implementation-dependent.

As already shown by Misha, there is a method, allOf which allows you to model your original intention, to define one job which depends on all jobs of your list.

However, it’s worth noting that even that isn’t necessary. Since you are using an unbounded thread pool executor, you can simply post an asynchronous job collecting the results into a list and you are done. Waiting for the completion is implied by asking for the result of each job anyway.

ExecutorService executorService = Executors.newCachedThreadPool();
List<CompletableFuture<Integer>> que = IntStream.range(0, 100000)
.mapToObj(x -> CompletableFuture.supplyAsync(() -> {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos((long)(Math.random()*10)));
return x;
}, executorService)).collect(Collectors.toList());
CompletableFuture<List<Integer>> sequence = CompletableFuture.supplyAsync(
() -> que.stream().map(CompletableFuture::join).collect(Collectors.toList()),
executorService);

Using methods for composing dependent operations is important when the number of threads is limited and the jobs may spawn additional asynchronous jobs, to avoid having waiting jobs steal threads from jobs which have to complete first, but neither is the case here.

In this specific case, one job simply iterating over this large number of prerequisite jobs and waiting if necessary may be more efficient than modelling this large number of dependencies and having each job notify the dependent job about its completion.

An example sequence operation using thenCombine on CompletableFuture

public<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com){


CompletableFuture<List<T>> identity = CompletableFuture.completedFuture(new ArrayList<T>());


BiFunction<CompletableFuture<List<T>>,CompletableFuture<T>,CompletableFuture<List<T>>> combineToList =
(acc,next) -> acc.thenCombine(next,(a,b) -> { a.add(b); return a;});


BinaryOperator<CompletableFuture<List<T>>> combineLists = (a,b)-> a.thenCombine(b,(l1,l2)-> { l1.addAll(l2); return l1;}) ;


return com.stream()
.reduce(identity,
combineToList,
combineLists);


}

If you don't mind using third-party libraries, cyclops-react (I am the author) has a set of utility methods for CompletableFutures (and Optionals, Streams, etc.):

  List<CompletableFuture<String>> listOfFutures;


CompletableFuture<ListX<String>> sequence =CompletableFutures.sequence(listOfFutures);

You can get Spotify's CompletableFutures library and use its allAsList method. I think it's inspired by Guava's Futures.allAsList method.

public static <T> CompletableFuture<List<T>> allAsList(
List<? extends CompletionStage<? extends T>> stages) {

And here is a simple implementation if you don't want to use a library:

public <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) {
return CompletableFuture.allOf(
futures.toArray(new CompletableFuture[futures.size()])
).thenApply(ignored ->
futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
);
}

In addition to the Spotify Futures library, you might try my code located here: https://github.com/vsilaev/java-async-await/blob/master/net.tascalate.async.examples/src/main/java/net/tascalate/concurrent/CompletionStages.java (it has dependencies on other classes in the same package).

It implements logic to return "at least N out of M" CompletionStage-s, with a policy for how many errors it is allowed to tolerate. There are convenient methods for the all/any cases, plus a cancellation policy for the remaining futures, and the code deals with CompletionStage-s (the interface) rather than CompletableFuture (the concrete class).

Javaslang has a very convenient Future API. It also lets you make a future of a collection out of a collection of futures.

List<Future<String>> listOfFutures = ...
Future<Seq<String>> futureOfList = Future.sequence(listOfFutures);

See http://static.javadoc.io/io.javaslang/javaslang/2.0.5/javaslang/concurrent/Future.html#sequence-java.lang.Iterable-

To add to the accepted answer by @Misha, it can be further expanded into a collector:

 public static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> sequenceCollector() {
return Collectors.collectingAndThen(Collectors.toList(), com -> sequence(com));
}

Now you can:

Stream<CompletableFuture<Integer>> stream = Stream.of(
CompletableFuture.completedFuture(1),
CompletableFuture.completedFuture(2),
CompletableFuture.completedFuture(3)
);
CompletableFuture<List<Integer>> ans = stream.collect(sequenceCollector());

Disclaimer: This will not completely answer the initial question. It will lack the "fail all if one fails" part. However, I can't answer the actual, more generic question, because it was closed as a duplicate of this one: Java 8 CompletableFuture.allOf(...) with Collection or List. So I will answer here:

How to convert List<CompletableFuture<V>> to CompletableFuture<List<V>> using Java 8's stream API?

Summary: Use the following:

private <V> CompletableFuture<List<V>> sequence(List<CompletableFuture<V>> listOfFutures) {
CompletableFuture<List<V>> identity = CompletableFuture.completedFuture(new ArrayList<>());


BiFunction<CompletableFuture<List<V>>, CompletableFuture<V>, CompletableFuture<List<V>>> accumulator = (futureList, futureValue) ->
futureValue.thenCombine(futureList, (value, list) -> {
List<V> newList = new ArrayList<>(list.size() + 1);
newList.addAll(list);
newList.add(value);
return newList;
});


BinaryOperator<CompletableFuture<List<V>>> combiner = (futureList1, futureList2) -> futureList1.thenCombine(futureList2, (list1, list2) -> {
List<V> newList = new ArrayList<>(list1.size() + list2.size());
newList.addAll(list1);
newList.addAll(list2);
return newList;
});


return listOfFutures.stream().reduce(identity, accumulator, combiner);
}

Example usage:

List<CompletableFuture<String>> listOfFutures = IntStream.range(0, numThreads)
.mapToObj(i -> loadData(i, executor)).collect(toList());


CompletableFuture<List<String>> futureList = sequence(listOfFutures);

Complete Example:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.stream.IntStream;


import static java.util.stream.Collectors.toList;


public class ListOfFuturesToFutureOfList {


public static void main(String[] args) {
ListOfFuturesToFutureOfList test = new ListOfFuturesToFutureOfList();
test.load(10);
}


public void load(int numThreads) {
final ExecutorService executor = Executors.newFixedThreadPool(numThreads);


List<CompletableFuture<String>> listOfFutures = IntStream.range(0, numThreads)
.mapToObj(i -> loadData(i, executor)).collect(toList());


CompletableFuture<List<String>> futureList = sequence(listOfFutures);


System.out.println("Future complete before blocking? " + futureList.isDone());


// this will block until all futures are completed
List<String> data = futureList.join();
System.out.println("Loaded data: " + data);


System.out.println("Future complete after blocking? " + futureList.isDone());


executor.shutdown();
}


public CompletableFuture<String> loadData(int dataPoint, Executor executor) {
return CompletableFuture.supplyAsync(() -> {
ThreadLocalRandom rnd = ThreadLocalRandom.current();


System.out.println("Starting to load test data " + dataPoint);


try {
Thread.sleep(500 + rnd.nextInt(1500));
} catch (InterruptedException e) {
e.printStackTrace();
}


System.out.println("Successfully loaded test data " + dataPoint);


return "data " + dataPoint;
}, executor);
}


private <V> CompletableFuture<List<V>> sequence(List<CompletableFuture<V>> listOfFutures) {
CompletableFuture<List<V>> identity = CompletableFuture.completedFuture(new ArrayList<>());


BiFunction<CompletableFuture<List<V>>, CompletableFuture<V>, CompletableFuture<List<V>>> accumulator = (futureList, futureValue) ->
futureValue.thenCombine(futureList, (value, list) -> {
List<V> newList = new ArrayList<>(list.size() + 1);
newList.addAll(list);
newList.add(value);
return newList;
});


BinaryOperator<CompletableFuture<List<V>>> combiner = (futureList1, futureList2) -> futureList1.thenCombine(futureList2, (list1, list2) -> {
List<V> newList = new ArrayList<>(list1.size() + list2.size());
newList.addAll(list1);
newList.addAll(list2);
return newList;
});


return listOfFutures.stream().reduce(identity, accumulator, combiner);
}


}

Your task can be done easily as follows:

final List<CompletableFuture<Module>> futures = ...
CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new)).join();
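Note that allOf yields a CompletableFuture<Void>, so after join() returns, the individual results still have to be read from the original futures. A minimal sketch of that extra step follows; the class and method names are hypothetical, and the generic T stands in for the Module type above.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class JoinAllSketch {

    // Blocks until every future is done, then gathers the results in list order.
    static <T> List<T> joinAll(List<CompletableFuture<T>> futures) {
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
        // All futures are complete here, so these join() calls return immediately.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }
}
```

Unlike the sequence methods in the other answers, this blocks the calling thread instead of returning a future.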