最佳答案
我正在尝试将 List<CompletableFuture<X>>
转换为 CompletableFuture<List<T>>
。这非常有用,因为当您有许多异步任务并且需要获得所有这些任务的结果时。
如果其中任何一个失败了,那么最终的未来也将失败:
public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
if(com.isEmpty()){
throw new IllegalArgumentException();
}
Stream<? extends CompletableFuture<T>> stream = com.stream();
CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
x.add(y);
return x;
},exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
ls1.addAll(ls2);
return ls1;
},exec));
}
运行它:
ExecutorService executorService = Executors.newCachedThreadPool();
Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep((long) (Math.random() * 10));
} catch (InterruptedException e) {
e.printStackTrace();
}
return x;
}, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);
如果其中任何一个失败了,那么它就失败了。即使有一百万种期货,它也能提供预期的产量。我的问题是: 假设有超过5000种期货,如果其中任何一种失败了,我得到的是 StackOverflowError
:
线程“ pool-1-thread-2611”java.lang. StackOverfloError 中的异常 在 并发。 CompletableFuture.interalComplete (CompletableFuture.java: 210) 在 并发. CompletableFuture $ThenCompos.run (CompletableFuture.java: 1487) 在 并发。 CompletableFuture.postComplete (CompletableFuture.java: 193) 在 并发。 CompletableFuture.interalComplete (CompletableFuture.java: 210) 在 并发. CompletableFuture $ThenCompos.run (CompletableFuture.java: 1487)
我哪里做错了?
注意: 当任何一个未来失败时,以上返回的未来将失败。接受的答案也应该采取这一点。