Java CompletableFuture 的 Apply 和 ApplicyAsync 之间的区别是什么?

假设我有以下代码:

CompletableFuture<Integer> future
= CompletableFuture.supplyAsync( () -> 0);

thenApply个案:

future.thenApply( x -> x + 1 )
.thenApply( x -> x + 1 )
.thenAccept( x -> System.out.println(x));

这里输出为2,现在对于 thenApplyAsync:

future.thenApplyAsync( x -> x + 1 )   // first step
.thenApplyAsync( x -> x + 1 )   // second step
.thenAccept( x -> System.out.println(x)); // third step

我在这个 博客中读到,每个 thenApplyAsync在一个单独的线程中执行,并且“在同一时间”(这意味着在 thenApplyAsyncs完成之前跟随 thenApplyAsyncs开始) ,如果是这样,如果第一步没有完成,那么第二步的输入参数值是多少?

如果不采取第二步措施,第一步的结果将何去何从? 第三步将采取哪一步的结果?

如果第二步必须等待第一步的结果,那么 Async的意义是什么?

这里 x-> x + 1只是为了说明这一点,我想知道的是在计算时间很长的情况下。

30443 次浏览

以下是关于 CompletableFuture's thenApplyAsync的文档说明:

返回一个新的 CompletionStage,当该阶段完成时 正常情况下,使用此阶段的默认异步执行 执行工具,将此阶段的结果作为 提供的功能。

因此,thenApplyAsync必须等待之前的 thenApplyAsync's结果:

在您的情况下,首先执行同步工作,然后执行异步工作。因此,第二个是异步的并不重要,因为它只有在同步工作完成之后才会启动。

我们换一下。在某些情况下,首先打印“异步结果: 2”,在某些情况下,首先打印“同步结果: 2”。这里有一个区别,因为调用1和2都可以异步运行,在单独的线程上调用1,在其他线程上调用2,这些线程可能是主线程。

CompletableFuture<Integer> future
= CompletableFuture.supplyAsync(() -> 0);


future.thenApplyAsync(x -> x + 1) // call 1
.thenApplyAsync(x -> x + 1)
.thenAccept(x -> System.out.println("async result: " + x));


future.thenApply(x -> x + 1) // call 2
.thenApply(x -> x + 1)
.thenAccept(x -> System.out.println("sync result:" + x));

这种差异与负责运行代码的 Executor有关。CompletableFuture上的每个操作符通常有3个版本。

  1. thenApply(fn)-在调用它的 CompleteableFuture所定义的线程上运行 fn,因此通常无法知道在哪里执行它。如果结果已经可用,它可能会立即执行。
  2. thenApplyAsync(fn)-在环境定义的执行器上运行 fn,不管环境如何。对于 CompletableFuture,这通常是 ForkJoinPool.commonPool()
  3. exec上运行 fn

最终的结果是一样的,但是调度行为取决于方法的选择。

你错误地引用了文章的例子,所以你错误地应用了文章的结论。我在你的问题中看到了两个问题:

.then___()的正确用法是什么

在您引用的两个示例中(本文中没有引用) ,第二个函数必须等待第一个函数完成。无论何时调用 a.then___(b -> ...),输入 b都是 a的结果,必须等待 a完成,无论您是否使用名为 Async的方法。这篇文章的结论不适用,因为你错误地引用了它。

本文中的例子实际上是

CompletableFuture<String> receiver = CompletableFuture.supplyAsync(this::findReceiver);


receiver.thenApplyAsync(this::sendMsg);
receiver.thenApplyAsync(this::sendMsg);

注意,thenApplyAsync都应用于 receiver,而不是链接在同一个语句中。这意味着一旦 receiver完成,这两个函数都可以以未指定的顺序启动。(任何顺序的假设都取决于实现。)

更明确地说:

a.thenApply(b).thenApply(c);表示顺序是 a完成,然后 b开始,b完成,然后 c开始。
a b c之间的顺序而言,a.thenApplyAsync(b).thenApplyAsync(c);的表现将与上述完全相同。

a.thenApply(b); a.thenApply(c);意味着 a完成,然后 bc可以开始,在任何顺序。bc不必相互等待。
就订单而言,a.thenApplyAync(b); a.thenApplyAsync(c);的工作原理是相同的。

在阅读下面的内容之前,你应该了解上面的内容。以上涉及到异步编程,没有它,您将无法正确使用 API。下面是关于线程管理的,您可以使用它来优化程序并避免性能陷阱。但是,如果不能正确地编写程序,就无法优化程序。


标题: Java CompletableFuture 的 thenApplythenApplyAsync之间的区别?

我必须指出,编写 JSR 的人一定混淆了技术术语“异步编程”,并且选择了现在让新手和老手都感到困惑的名字。首先,从这些方法的契约来看,在 thenApplyAsync中没有比 thenApply更异步的东西了。

两者之间的区别与函数在哪个线程上运行有关。提供给 thenApply 可以在任何线程上运行的函数

  1. 呼叫 complete
  2. 在同一实例上调用 thenApply

thenApplyAsync的两个过载

  1. 使用默认的 Executor(也称为线程池) ,或者
  2. 使用提供的 Executor

从中可以看出,对于 thenApply,运行时承诺最终将使用您不能控制的某个执行器来运行函数。如果希望控制线程,请使用异步变体。

如果函数是轻量级的,那么运行函数的线程并不重要。

如果函数的 CPU 占用率很高,那么不希望将它留给运行时。如果运行时选择网络线程来运行您的函数,网络线程就不能花时间处理网络请求,从而导致网络请求在队列中等待更长时间,并使服务器无法响应。在这种情况下,您希望将 thenApplyAsync与您自己的线程池一起使用。


有趣的事实: 异步! = 线程

thenApply/thenApplyAsync和它们的对应物 thenCompose/thenComposeAsynchandle/handleAsyncthenAccept/thenAcceptAsync都是异步的!这些函数的异步特性与异步操作 thenApplyAsync0调用 completecompleteExceptionally有关。这个想法来自 Javascript,它的确是异步的,但不是多线程的。

第二步(即计算)总是在第一步之后执行。

如果第二步必须等待第一步的结果,那么异步的意义是什么?

在这种情况下,Async 意味着您可以保证方法将快速返回,并且计算将在不同的线程中执行。

当调用 thenApply(不使用异步)时,就没有这样的保证。在这种情况下,计算可以同步执行,即在调用 thenApply的同一个线程中,如果 CompletableFuture 在调用该方法时已经完成。但是计算也可以由完成未来的线程或在同一 CompletableFuture上调用方法的其他线程异步执行。这个问题的答案是: https://stackoverflow.com/a/46062939/1235217详细解释了 Apply 能做什么和不能保证什么。

那么什么时候应该使用 thenApply,什么时候应该使用 thenApplyAsync呢? 我使用下面的经验法则:

  • 非异步: 仅当任务非常小且非阻塞时,因为在这种情况下,我们不关心哪个可能的线程执行它
  • (通常有一个显式的执行器作为参数) : 用于所有其他任务

thenApplyAsyncthenApply中,传递给这些方法的 Consumer<? super T> action将被异步调用,并且不会阻塞指定使用者的线程。

不同之处在于哪个线程负责调用方法 Consumer#accept(T t):

考虑如下 AsyncHttpClient调用: 注意下面打印的线程名。我希望这能让你明白其中的区别:

// running in the main method
// public static void main(String[] args) ....


CompletableFuture<Response> future =
asyncHttpClient.prepareGet(uri).execute().toCompletableFuture();


log.info("Current Thread " + Thread.currentThread().getName());


//Prints "Current Thread main"

thenApply将使用完成未来的同一个线程。

//will use the dispatcher threads from the asyncHttpClient to call `consumer.apply`
//The thread that completed the future will be blocked by the execution of the method `Consumer#accept(T t)`.
future.thenApply(myResult -> {
log.info("Applier Thread " + Thread.currentThread().getName());
return myResult;
})


//Prints: "Applier Thread httpclient-dispatch-8"

thenApplyAsync将使用执行程序池中的 a 线程。

//will use the threads from the CommonPool to call `consumer.accept`
//The thread that completed the future WON'T be blocked by the execution of the method `Consumer#accept(T t)`.
future.thenApplyAsync(myResult -> {
log.info("Applier Thread " + Thread.currentThread().getName());
return myResult;
})


//Prints: "Applier Thread ForkJoinPool.commonPool-worker-7"

将阻塞主线程。

//If called, `.get()` may block the main thread if the CompletableFuture is not completed.
future.get();

结论

方法 thenApplyAsync中的 Async后缀意味着完成未来的线程不会被执行 Consumer#accept(T t) method阻塞。

thenApplyAsyncthenApply的用法取决于您是否希望阻塞完成未来的线程。