Transform Java Future into a CompletableFuture

Java 8 introduces CompletableFuture, a new implementation of Future that is composable (includes a bunch of thenXxx methods). I'd like to use this exclusively, but many of the libraries I want to use return only non-composable Future instances.

Is there a way to wrap up a returned Future instances inside of a CompleteableFuture so that I can compose it?

67366 次浏览

有一种方法,但你不会喜欢。下面的方法将 Future<T>转换成 CompletableFuture<T>:

public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
if (future.isDone())
return transformDoneFuture(future);
return CompletableFuture.supplyAsync(() -> {
try {
if (!future.isDone())
awaitFutureIsDoneInForkJoinPool(future);
return future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
// Normally, this should never happen inside ForkJoinPool
Thread.currentThread().interrupt();
// Add the following statement if the future doesn't have side effects
// future.cancel(true);
throw new RuntimeException(e);
}
});
}


private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
CompletableFuture<T> cf = new CompletableFuture<>();
T result;
try {
result = future.get();
} catch (Throwable ex) {
cf.completeExceptionally(ex);
return cf;
}
cf.complete(result);
return cf;
}


private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
throws InterruptedException {
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
@Override public boolean block() throws InterruptedException {
try {
future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
return true;
}
@Override public boolean isReleasable() {
return future.isDone();
}
});
}

显然,这种方法的问题在于,对于每个 未来,都会阻塞一个线程,以等待 未来的结果——这与期货的想法相矛盾。在某些情况下,可能会做得更好。但是,一般来说,如果不积极等待 未来的结果,就没有解决方案。

如果要使用的库除了 Future 样式之外还提供了回调样式方法,则可以为其提供一个处理程序,该处理程序完成 CompletableFuture,而不会出现任何额外的线程阻塞。像这样:

    AsynchronousFileChannel open = AsynchronousFileChannel.open(Paths.get("/some/file"));
// ...
CompletableFuture<ByteBuffer> completableFuture = new CompletableFuture<ByteBuffer>();
open.read(buffer, position, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
completableFuture.complete(buffer);
}


@Override
public void failed(Throwable exc, Void attachment) {
completableFuture.completeExceptionally(exc);
}
});
completableFuture.thenApply(...)

没有回调,我认为解决这个问题的另一种方法是使用轮询循环,将所有的 Future.isDone()检查放在一个线程上,然后每当 Future 是 gettable 时调用 complete。

让我提出另一个(希望是更好的)选择: Https://github.com/vsilaev/java-async-await/tree/master/com.farata.lang.async.examples/src/main/java/com/farata/concurrent

简而言之,这个想法如下:

  1. 介绍 CompletableTask<V>接口—— CompletionStage<V> + RunnableFuture<V>
  2. 翘曲 ExecutorServicesubmit(...)方法返回 CompletableTask(而不是 Future<V>)
  3. 完成,我们有可运行和可组合的未来。

实现使用了一个替代的 CompletionStage 实现(注意,是 完成阶段而不是 CompletableFuture) :

用法:

J8ExecutorService exec = J8Executors.newCachedThreadPool();
CompletionStage<String> = exec
.submit( someCallableA )
.thenCombineAsync( exec.submit(someCallableB), (a, b) -> a + " " + b)
.thenCombine( exec.submit(someCallableC), (ab, b) -> ab + " " + c);

我发表了一个小的 未来项目,试图做得比 直截了当的方式更好的答案。

主要思想是只使用一个线程(当然不仅仅是一个自旋循环)来检查内部的所有未来状态,这有助于避免在每个 Future-> CompletableFuture 转换的池中阻塞一个线程。

用法例子:

Future oldFuture = ...;
CompletableFuture profit = Futurity.shift(oldFuture);

建议:

Http://www.thedevpiece.com/converting-old-java-future-to-completablefuture/

但是,基本上:

public class CompletablePromiseContext {
private static final ScheduledExecutorService SERVICE = Executors.newSingleThreadScheduledExecutor();


public static void schedule(Runnable r) {
SERVICE.schedule(r, 1, TimeUnit.MILLISECONDS);
}
}

还有,完美的承诺:

public class CompletablePromise<V> extends CompletableFuture<V> {
private Future<V> future;


public CompletablePromise(Future<V> future) {
this.future = future;
CompletablePromiseContext.schedule(this::tryToComplete);
}


private void tryToComplete() {
if (future.isDone()) {
try {
complete(future.get());
} catch (InterruptedException e) {
completeExceptionally(e);
} catch (ExecutionException e) {
completeExceptionally(e.getCause());
}
return;
}


if (future.isCancelled()) {
cancel(true);
return;
}


CompletablePromiseContext.schedule(this::tryToComplete);
}
}

例如:

public class Main {
public static void main(String[] args) {
final ExecutorService service = Executors.newSingleThreadExecutor();
final Future<String> stringFuture = service.submit(() -> "success");
final CompletableFuture<String> completableFuture = new CompletablePromise<>(stringFuture);


completableFuture.whenComplete((result, failure) -> {
System.out.println(result);
});
}
}

如果您的 Future是对 ExecutorService方法(例如 submit())的调用的结果,那么最简单的方法就是使用 CompletableFuture.runAsync(Runnable, Executor)方法。

Runnbale myTask = ... ;
Future<?> future = myExecutor.submit(myTask);

Runnbale myTask = ... ;
CompletableFuture<?> future = CompletableFuture.runAsync(myTask, myExecutor);

然后“本地”创建 CompletableFuture

编辑: 追踪@SamMeffd 的评论,@MartinAndersson 更正,如果你想通过 Callable,你需要调用 supplyAsync(),将 Callable<T>转换成 Supplier<T>,例如:

CompletableFuture.supplyAsync(() -> {
try { return myCallable.call(); }
catch (Exception ex) { throw new CompletionException(ex); } // Or return default value
}, myExecutor);

因为 T Callable.call() throws Exception;抛出异常,而 T Supplier.get();不抛出异常,所以必须捕获异常,这样原型才能兼容。

关于异常处理的说明

get()方法没有指定 throws,这意味着它不应该抛出 检查过了异常。但是,可以使用 不受约束异常。CompletableFuture中的代码显示使用了 CompletionException并且未选中(即 RuntimeException) ,因此 catch/throw 将任何异常包装到 CompletionException中。

另外,正如@WeGa 所指出的,您可以使用 handle()方法来处理结果可能引发的异常:

CompletableFuture<T> future = CompletableFuture.supplyAsync(...);
future.handle((ex,res) -> {
if (ex != null) {
// An exception occurred ...
} else {
// No exception was thrown, 'res' is valid and can be handled here
}
});
public static <T> CompletableFuture<T> fromFuture(Future<T> f) {
return CompletableFuture.completedFuture(null).thenCompose(avoid -> {
try {
return CompletableFuture.completedFuture(f.get());
} catch (InterruptedException e) {
return CompletableFuture.failedFuture(e);
} catch (ExecutionException e) {
return CompletableFuture.failedFuture(e.getCause());
}
});
}

主要思想是这样的:

Future<?> future = null;
return CompletableFuture.supplyAsync(future::get);

但是,您将收到来自编译器的一些警告。

所以,这是第一个选择。

Future<?> future = null;
return CompletableFuture.supplyAsync(
()->{
try {
return future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
});

第二个选项,通过强制转换函数接口隐藏 try... catch。

    @FunctionalInterface
public interface MySupplier<T> extends Supplier<T> {
@Override
default T get() {
try {
return getInternal();
} catch (Exception e) {
throw new RuntimeException(e);
}
}


T getInternal() throws Exception;
}




public static void main(String[] args) {
Future<?> future = null;
return CompletableFuture.supplyAsync((MySupplier<?>) future::get);


}




第三个选项,找出一些已经提供了这种功能接口的第三方库。

参见: 抛出异常的 Java8 Lambda 函数?