CompletableFuture, Future和RxJava's Observable的区别

我想知道两者的区别 CompletableFutureFutureObservable RxJava.

我所知道的都是异步的,但是

Future.get()阻塞线程

CompletableFuture给出了回调方法

RxJava Observable——类似于CompletableFuture,但有其他好处(不确定)

例如:如果客户端需要进行多个服务调用,当我们使用Futures (Java)时,Future.get()将按顺序执行…我想知道它在RxJava中是如何更好的。

文档http://reactivex.io/intro.html

使用Futures来优化组合有条件异步执行流是很困难的(或者不可能,因为每个请求的延迟在运行时是不同的)。当然,这是可以做到的,但很快就会变得复杂(因此容易出错),或者过早地阻塞Future.get(),这就消除了异步执行的好处。

真的有兴趣知道RxJava如何解决这个问题。我发现从文档中很难理解。

101502 次浏览

期货

期货在Java 5(2004)中引入。它们基本上是一个尚未完成的操作结果的占位符。一旦操作完成,Future将包含该结果。例如,操作可以是提交给ExecutorService可运行的可调用的实例。操作的提交者可以使用Future对象检查操作是否结束(),或者使用阻塞get ()方法等待操作完成。

例子:

/**
* A task that sleeps for a second, then returns 1
**/
public static class MyCallable implements Callable<Integer> {


@Override
public Integer call() throws Exception {
Thread.sleep(1000);
return 1;
}


}


public static void main(String[] args) throws Exception{
ExecutorService exec = Executors.newSingleThreadExecutor();
Future<Integer> f = exec.submit(new MyCallable());


System.out.println(f.isDone()); //False


System.out.println(f.get()); //Waits until the task is done, then prints 1
}

CompletableFutures

CompletableFutures在Java 8(2014)中引入。它们实际上是常规期货的进化,灵感来自谷歌的值得一听的期货番石榴库的一部分。它们是future,还允许您将任务串成一个链。你可以用它们告诉一些工作线程“去做一些任务X,当你完成了,去做另一件事使用X的结果”。使用CompletableFutures,你可以对操作的结果做一些事情,而不需要阻塞线程来等待结果。这里有一个简单的例子:

/**
* A supplier that sleeps for a second, and then returns one
**/
public static class MySupplier implements Supplier<Integer> {


@Override
public Integer get() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//Do nothing
}
return 1;
}
}


/**
* A (pure) function that adds one to a given Integer
**/
public static class PlusOne implements Function<Integer, Integer> {


@Override
public Integer apply(Integer x) {
return x + 1;
}
}


public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newSingleThreadExecutor();
CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec);
System.out.println(f.isDone()); // False
CompletableFuture<Integer> f2 = f.thenApply(new PlusOne());
System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2
}

RxJava

RxJava是在Netflix上创建的反应性编程的整个库。乍一看,它与Java 8的流类似。是的,但它更强大。

与Futures类似,RxJava可以用来将一堆同步或异步操作串在一起,以创建一个处理管道。与Futures是一次性使用的不同,RxJava工作在零个或多个项目的上。包括包含无限项的永不结束的流。它也更加灵活和强大,这要归功于令人难以置信的丰富操作符集

与Java 8的流不同,RxJava也有反压力机制,这允许它处理处理管道的不同部分在不同线程中操作的情况,以不同的速度

RxJava的缺点是,尽管有可靠的文档,但由于涉及到范式转换,它是一个具有挑战性的库。Rx代码也可能是调试的噩梦,特别是在涉及多个线程的情况下,如果需要反向压力,情况会更糟。

如果你想深入了解,官方网站上有完整的各种教程页面,加上官方的文档Javadoc。你也可以看看一些视频,比如这一个,它简要介绍了Rx,还讨论了Rx和期货之间的区别。

附加功能:Java 9反应流

Java 9的反应流又名流API是由各种反应流库(如RxJava 2Akka流Vertx)实现的一组接口。它们允许这些反应库相互连接,同时保留所有重要的反压力。

我从0.9开始使用Rx Java,现在是1.3.2,很快就会迁移到2。x我在一个私人项目中使用这个,我已经工作了8年。

如果没有这个库,我就不会再编程了。一开始我是持怀疑态度的,但这是你需要创造的一种完全不同的心态。一开始很困难。有时我一看就是几个小时。哈哈

这只是一个实践的问题,真正了解流程(又名可观察对象和观察者的契约),一旦你达到了那里,你就会讨厌这样做。

对我来说,这个库并没有什么缺点。

< p >用例: 我有一个监视器视图,包含9个仪表(cpu, mem,网络等…)在启动视图时,视图将自己订阅到一个系统监视器类,该类返回一个可观察对象(interval),其中包含9米的所有数据。 它将每秒钟向视图推送一个新的结果(所以不是轮询!!) 该可观察对象使用一个平面映射来同时(异步!)从9个不同的来源获取数据,并将结果压缩到你的视图将在onNext()上获得的新模型中

你要怎么用期货,完全期权等等来做呢?好运!:)

Rx Java为我解决了编程中的许多问题,并在某种程度上使编程变得更简单……

优点:

  • Statelss ! !(重要的一点,可能是最重要的一点)
  • 线程管理开箱即用
  • 构建具有自己生命周期的序列
  • 所有东西都是可观察的,所以链接很容易
  • 编写的代码更少
  • 类路径上的单个jar(非常轻量级)
  • 高并发
  • 再也没有回调了
  • 基于用户的(消费者和生产者之间的紧密契约)
  • 反压策略(断路器之类)
  • 出色的错误处理和恢复
  • 非常好的文档(marbles <3)
  • 完全控制
  • 还有更多……
< p >缺点: -难以测试

与普通Future相比,CompletableFuture的主要优点是它利用了极其强大的流API,并为你提供了回调处理程序来连接你的任务,如果你使用普通Future,这是绝对不存在的。除了提供异步架构外,CompletableFuture是处理计算量大的map-reduce任务的一种方式,而不用太担心应用程序的性能。

所有三个接口都用于将值从生产者传输到消费者。消费者可以分为两类:

  • 同步:消费者进行阻塞调用,当值准备好时返回
  • 异步:当值准备好时,调用使用者的回调方法

此外,通信接口在其他方面也有所不同:

  • 能够传输单个值或多个值
  • 如果有多个值,则支持或不支持反压

结果是:

  • Future使用同步接口传输单个值

  • CompletableFuture使用同步和异步接口传递单个值

  • Rx使用带背压的异步接口传递多个值

此外,所有这些通信设施都支持传输异常。但情况并非总是如此。例如,BlockingQueue没有。

Java的Future是一个占位符,用于保存将来用阻塞API完成的事情。你必须使用它的` isDone()方法定期轮询它,以检查该任务是否已完成。当然,您可以实现自己的异步代码来管理轮询逻辑。但是,这会导致更多的样板代码和调试开销。

Java的CompletableFuture是由Scala的Future创新的。它携带一个内部回调方法。一旦完成,回调方法将被触发,并告诉线程应该执行下游操作。这就是为什么它有thenApply方法来对包装在CompletableFuture中的对象做进一步的操作。

RxJava的ObservableCompletableFuture的增强版本。它可以让你处理反压。在我们上面提到的thenApply方法(甚至还有它的兄弟thenApplyAsync)中,这种情况可能会发生:下游方法想要调用有时可能不可用的外部服务。在这种情况下,CompleteableFuture将完全失败,你必须自己处理错误。然而,Observable允许你处理反压,并在外部服务可用时继续执行。

此外,还有一个类似的接口Observable: Flowable。它们的设计目的各不相同。通常Flowable专门用于处理冷的和非计时的操作,而Observable专门用于处理需要即时响应的执行。在这里查看官方文件:https://github.com/ReactiveX/RxJava#backpressure