CompletableFuture | then Apply vs then Compose

我不能理解 thenApplythenCompose之间的区别。

那么,有人能提供一个有效的用例吗?

来自 Java 文档:

thenApply(Function<? super T,? extends U> fn)

返回一个新的 CompletionStage,当这个阶段结束时 在正常情况下,使用此阶段的结果作为 提供的功能。

thenCompose(Function<? super T,? extends CompletionStage<U>> fn)

返回一个新的 CompletionStage,当这个阶段结束时 通常,在这个阶段执行,作为所提供的 功能。

我得到了 thenCompose的第2个参数扩展了 CompletionStage,而 thenApply没有。

有人能提供一个例子,在哪种情况下,我必须使用 thenApply和时 thenCompose

88041 次浏览

如果具有同步映射函数,则使用 thenApply

CompletableFuture<Integer> future =
CompletableFuture.supplyAsync(() -> 1)
.thenApply(x -> x+1);

如果具有异步映射函数(即返回 CompletableFuture的函数) ,则使用 thenCompose。然后它将直接返回一个带有结果的未来,而不是一个嵌套的未来。

CompletableFuture<Integer> future =
CompletableFuture.supplyAsync(() -> 1)
.thenCompose(x -> CompletableFuture.supplyAsync(() -> x+1));

Java9中更新的 Javadocs 可能有助于更好地理解它:

那就申请吧

<U> CompletionStage<U> thenApply​(Function<? super T,? extends U> fn)

返回一个新的 CompletionStage,当这个阶段结束时 在正常情况下,使用此阶段的结果作为 提供的功能。

该方法类似于 Optional.mapStream.map

请参阅 CompletionStage文档以了解相关规则 非凡的成就。

然后创作

<U> CompletionStage<U> thenCompose​(Function<? super T,? extends CompletionStage<U>> fn)

返回一个新的 CompletionStage,该 CompletionStage使用相同的 值作为给定函数返回的 CompletionStage

当这个阶段正常完成时,调用给定的函数 这个阶段的结果作为参数,返回另一个 当该阶段正常完成时, 此方法返回的 CompletionStage使用相同的 价值。

为了确保进度,提供的函数必须最终安排 完成其结果。

这种方法类似于 Optional.flatMapStream.flatMap .

请参阅 CompletionStage文档以了解相关规则 非凡的成就。

我认为@Joe C 的回复是误导性的。

让我用一个例子来解释 thenApplythenCompose之间的区别。

假设我们有两个方法: getUserInfo(int userId)getUserRating(UserInfo userInfo):

public CompletableFuture<UserInfo> getUserInfo(userId)


public CompletableFuture<UserRating> getUserRating(UserInfo)

两种方法返回类型都是 CompletableFuture

我们希望首先调用 getUserInfo(),然后在它完成时,调用带有结果 UserInfogetUserRating()

在完成 getUserInfo()方法后,让我们同时尝试 thenApplythenCompose。不同之处在于返回类型:

CompletableFuture<CompletableFuture<UserRating>> f =
userInfo.thenApply(this::getUserRating);


CompletableFuture<UserRating> relevanceFuture =
userInfo.thenCompose(this::getUserRating);

thenCompose()的工作原理类似于 Scala 是 flatMap,它可以平化嵌套期货。

thenApply()返回嵌套的未来,但是 thenCompose()将嵌套的 CompletableFutures压平,以便更容易链接更多的方法调用。

thenApplythenComposeCompletableFuture的两种方法。当你想用 FunctionCompletableFuture的结果做些什么的时候使用它们。

thenApplythenCompose都返回一个 CompletableFuture作为它们自己的结果。可以将多个 thenApplythenCompose连接在一起。为每个调用提供一个 Function,其结果将作为下一个 Function的输入。

您提供的 Function有时需要同步执行某些操作。Function的返回类型应该是非 Future类型。在这种情况下,您应该使用 thenApply

CompletableFuture.completedFuture(1)
.thenApply((x)->x+1) // adding one to the result synchronously, returns int
.thenApply((y)->System.println(y)); // value of y is 1 + 1 = 2

在其他情况下,您可能希望在此 Function中执行异步处理。在这种情况下,您应该使用 thenComposeFunction的返回类型应该是 CompletionStage。链中的下一个 Function将获得该 CompletionStage作为输入的结果,从而展开 CompletionStage

// addOneAsync may be implemented by using another thread, or calling a remote method
abstract CompletableFuture<Integer> addOneAsync(int input);


CompletableFuture.completedFuture(1)
.thenCompose((x)->addOneAsync(x)) // doing something asynchronous, returns CompletableFuture<Integer>
.thenApply((y)->System.println(y)); // y is an Integer, the result of CompletableFuture<Integer> above

这与 Javascript 的 Promise类似。Promise.then可以接受返回值或返回值的 Promise的函数。这两个方法在 Java 中有不同名称的原因是由于 Promise.then1。Function<? super T,? extends U> fnFunction<? super T,? extends CompletionStage<U>> fn被认为是相同的运行时类型 -Function。因此,thenApplythenCompose必须有明确的名称,否则 Java 编译器会抱怨方法签名相同。最终的结果是,Javascript 的 Promise.then在 Java 中由 thenApplythenCompose两部分实现。

如果对相关函数 thenApplyAsync也感到困惑,可以阅读 我的另一个答案

那么 Compose () 更适合链接 CompletableFuture。

那么 Apply () 更适合于 Completable future 的转换结果。

您可以使用这两种技术来实现您的目标,但是一种技术更适合于一种用例而不是另一种。

public CompletableFuture<Integer> process(Integer i) {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(
() -> new HeavyTask(i).execute());
return completableFuture;
}


@SneakyThrows
public CompletableFuture<Integer> thenApplyVsThenCompose() {
// each time calling thenApply() function returns CompletionState
// so you will get nested Futures
// you can think about it like map() java optional
CompletableFuture<Future<Integer>> cf1 = CompletableFuture.supplyAsync(
() -> new HeavyTask().execute())
.thenApply(i -> process(i));


// to get result you will have to get nested get() calls
Integer resultFromThenApply = cf1.get().get();


// when calling thenCompose() nested Futures are flatten
// you can think about it like flatMap() java optional
CompletableFuture<Integer> cf2;
cf2 = CompletableFuture.supplyAsync(
() -> new HeavyTask().execute())
.thenCompose(this::process);


// you have to just call one get() since thenCompose was flatten
Integer resultFromThenCompose = cf2.get();
return null;
}

还有一个问题可以看出这两者之间的区别

当您不知道要应用 Apply ()/thenCompose ()(例如递归方法)多少次时,您将如何实现解决方案?

public void nested() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFutureToCompose = CompletableFuture.completedFuture(1);
for (int i = 0; i < 10; i++) {
log.info("Composing");
completableFutureToCompose = completableFutureToCompose.thenCompose(this::process);
}
completableFutureToCompose.get();


// not achievable using then apply
CompletableFuture<Integer> completableFutureToApply = CompletableFuture.completedFuture(1);
for (int i = 0; i < 10; i++) {
log.info("Applying");
completableFutureToApply = completableFutureToApply.thenApply(this::process).get();
}
completableFutureToCompose.get();
}


public CompletableFuture<Integer> process(Integer i) {
log.info("PROCESSING");
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(
() -> new HeavyTask(i).execute());
return completableFuture;
}
  • 使用组合,您首先创建收据期货是如何传递给其他,然后执行
  • 在每次应用程序调用之后执行逻辑

JoeC 的回答是正确的,但是我认为更好的比较可以明确 然后创作的用途是 那就申请吧那就申请吧之间的比较!同步映射一次传递给它,异步映射一次传递给它。

如果传递给 那就申请吧的映射返回一个 绳子(非将来,因此映射是同步的) ,那么它的结果将是 CompletableFuture<String>。现在类似地,当传递给它的映射返回一个 CompletableFuture < String > (一个将来,所以映射是异步的)时,那就申请吧的结果是什么?最终的结果将是 CompletableFuture<CompletableFuture<String>>,这是不必要的嵌套(未来的未来仍然是未来!).在这里,我们可以使用 然后创作在彼此之间“组合”(嵌套)多个异步任务,而不需要将未来嵌套在结果中。

Private void test1()抛出 ExectionException,InterruptedException {

    //thenApply返回的是之前的CompletableFuture
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1)
.thenApply((x) -> {
x = x + 1;
log.info("thenApply, 1, x:{}", x);
return x;
});


System.out.println(future.get());
}


//thenCompose返回的是新的CompletableFuture
private void test2() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1)
.thenCompose((x) -> {
return CompletableFuture.supplyAsync(() -> {
Integer y = x + 1;
log.info("thenCompose, 1, x:{}", y);
return y;
});
});


System.out.println(future.get());
}

如果你还是不明白当我使用 Apply vs thenCompose 和嵌套的未来是什么样子的时候代码的真正区别是什么,那么请看完整的工作示例。

package com.graphql.demo.garphqlapi;


import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;


public class ComposeVsThenApply {


public static void main(String[] args) {
ComposeVsThenApply cva = new ComposeVsThenApply();
//thenCompose usage : Using the thenCompose for simplifying the return type of dependent processing.
System.out.println("Starting thenCompose demo");
CompletableFuture<StockRating> flattenedFuture = cva.getStockDetails("Apple").thenCompose((stock) -> {
return cva.getRating(stock);
});
//Retrive results
try {
StockRating stockViaThenCompose = flattenedFuture.get();
System.out.println("\n\t\tStock summery :" + stockViaThenCompose.getStockName() + ", Rating :" + stockViaThenCompose.getRating());
System.out.println("--- thenCompose demo ended ---");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}


//ThenAply: thenApply is good for result transformation but sucks when we have two asynchronous dependent processing. Now you get nested future.
System.out.println("\n\n\nStarting thenApply demo");
CompletableFuture<CompletableFuture<StockRating>> nestedFuture = cva.getStockDetails("Apple").thenApply((stock) -> {
return cva.getRating(stock);
});
//Retrive results
try {
StockRating stockrViaThenApply = nestedFuture.get().get();
System.out.println("\n\t\tStock summery :" + stockrViaThenApply.getStockName() + ", Rating :" + stockrViaThenApply.getRating());
System.out.println("--- thenApply demo ended---");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}


class Stock {
private String name;


public Stock(String name) {
this.name = name;
}


public String getName() {
return name;
}


public void setName(String name) {
this.name = name;
}
}


class StockRating {
private Double rating;


public boolean isBuyCall() {
return buyCall;
}


public void setBuyCall(boolean buyCall) {
this.buyCall = buyCall;
}


public String getStockName() {
return stockName;
}


public void setStockName(String stockName) {
this.stockName = stockName;
}


private boolean buyCall;


public StockRating(Double rating, boolean buyCall, String stockName) {
this.rating = rating;
this.buyCall = buyCall;
this.stockName = stockName;
}


private String stockName;


public StockRating(Double rating) {
this.rating = rating;
}


public Double getRating() {
return rating;
}


public void setRating(Double rating) {
this.rating = rating;
}
}


class StockSupplier implements Supplier<Stock> {
private String name;


public StockSupplier(String name) {
this.name = name;
}


@Override
public Stock get() {
try {
System.out.println("\n\t\tRetriving details for " + this.name);
TimeUnit.SECONDS.sleep(4);
System.out.println("\n\t\tDone with details retrival for " + this.name);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new Stock(name);
}
}


class RatingSupplier implements Supplier<StockRating> {
private Stock stock;


public RatingSupplier(Stock stock) {
this.stock = stock;
}


@Override
public StockRating get() {
try {
System.out.println("\n\t\tRetriving stock rating for " + this.stock.getName());
TimeUnit.SECONDS.sleep(4);
System.out.println("\n\t\tDone with rating retrival for " + this.stock.getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
return new StockRating(10d, true, this.stock.getName());
}
}


public CompletableFuture<Stock> getStockDetails(String name) {
return CompletableFuture.supplyAsync(new StockSupplier(name));
}


public CompletableFuture<StockRating> getRating(Stock stock) {
return CompletableFuture.supplyAsync(new RatingSupplier(stock));
}


public String getSummeryReport(Stock stock, StockRating sr) {
return stock.getName() + "/n " + sr.getRating();
}
}

我的理解是,通过前一步的结果,如果您希望执行复杂的编排,那么 Compose 将比 Apply 具有优势。

下面的示例是,通过第一步的结果,到两个不同的地方进行计算,无论谁返回得更快,您都可以看到它们之间的差异

    CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() -> 1);
// thenCompose
System.out.println(result.thenCompose(i -> CompletableFuture.anyOf(CompletableFuture.supplyAsync(() -> i + 1), CompletableFuture.supplyAsync(() -> i + 2))).join());
System.out.println(result.thenCompose(i -> CompletableFuture.supplyAsync(() -> i + 1).applyToEither(CompletableFuture.supplyAsync(() -> i + 2), j -> j)).join());
// ----- thenApply
System.out.println(result.thenApply(i -> CompletableFuture.anyOf(CompletableFuture.supplyAsync(() -> i + 1), CompletableFuture.supplyAsync(() -> i + 2)).join()).join());
System.out.println(result.thenApply(i -> CompletableFuture.supplyAsync(() -> i + 1).applyToEither(CompletableFuture.supplyAsync(() -> i + 2), j -> j).join()).join());