Java8流和 RxJava 可观察数据之间的差异

Java8流是否类似于 RxJava 可观察数据?

Java8流定义:

新的 java.util.stream包中的类提供了一个 支持元素流上的函数式操作。

50726 次浏览

Java8Stream 和 RxJava 看起来非常相似。它们具有外观相似的操作符(filter、 map、 latMap...) ,但不是为相同的用途而构建的。

可以使用 RxJava 执行异步任务。

使用 Java8流,您将遍历集合中的项。

你可以在 RxJava 中做几乎相同的事情(遍历集合中的条目) ,但是,由于 RxJava 专注于并发任务,... ,它使用同步,闩锁,... ,所以使用 RxJava 的同一个任务可能比使用 Java8流慢。

RxJava 可以与 CompletableFuture进行比较,但是它可以计算多个值。

有一些技术和概念上的差异,例如,Java8流是单一使用的、基于拉的、同步的值序列,而 RxJava 观察值是可重新观察的、自适应的、基于推拉的、可能是异步的值序列。RxJava 的目标是 Java6 + ,也可以在 Android 上运行。

Java8流是基于拉的。您在使用每个条目的 Java8流上进行迭代。它可能是一条永无止境的溪流。

RXJavaObservable默认是基于推的。您订阅了一个观察对象,当下一个项目到达时(onNext) ,或者当流完成时(onCompleted) ,或者当错误发生时(onError) ,您将得到通知。 因为使用 Observable可以接收 onNextonCompletedonError事件,所以可以执行一些强大的功能,比如将不同的 Observable组合成新的 Observable(zipmergeconcat)。其他你可以做的事情是缓存,节流,..。 它在不同的语言中使用差不多相同的 API (RxJava,C # 中的 RX,RxJS,...)

默认情况下,RxJava 是单线程的。除非您开始使用调度程序,否则所有事情都将发生在同一个线程上。

RxJava 也与 反应流初始化反应流初始化密切相关,并认为它是反应流 API 的简单实现(例如,与 Akka 流实现相比)。主要的区别在于,反应流被设计成能够处理反向压力,但是如果你看一下反应流页面,你就会明白这个想法。他们很好地描述了他们的目标,这些流也与 被动宣言密切相关。

Java8流基本上是无界集合的实现,非常类似于 Scala StreamClojure 懒惰系列

长话短说

所有的序列/流处理库都为管道构建提供了非常相似的 API。不同之处在于处理多线程和管道组合的 API。

答案很长

RxJava 与 Stream 有很大的不同。在所有 JDK 的东西中,最接近于 rx.Observable的可能是 java.util.stream.Collector Stream + CompletableFuture组合(这是以处理额外的单子层为代价的,即必须处理 Stream<CompletableFuture<T>>CompletableFuture<Stream<T>>之间的转换)。

“可观测”和“数据流”之间存在显著差异:

  • 流是基于拉的,可观测的是基于推的。这听起来可能太抽象了,但是它有非常具体的重要后果。
  • 流只能使用一次,可观察可订阅多次。
  • Stream#parallel()将序列分割成多个分区,而 Observable#subscribeOn()Observable#observeOn()不这样做; 它很难用 Observer 模拟 Stream#parallel()的行为,它曾经使用过 .parallel()方法,但是这种方法造成了很大的混乱,以至于 .parallel()支持被转移到单独的存储库: ReactiveX/RxJava并行: RxJava 的实验并行扩展。更多细节在 另一个答案中。
  • Stream#parallel()不允许指定要使用的线程池,这与大多数接受可选调度器的 RxJava 方法不同。由于 JVM 中的 所有流实例使用相同的 fork-join 池,因此添加 .parallel()可能会意外地影响程序的另一个模块中的行为。
  • 流是缺乏时间相关的操作,如 Observable#interval()Observable#window()和许多其他; 这主要是因为流是基于拉,上游没有控制 什么时候发出下一个元素下游。
  • 与 RxJava 相比,流提供有限的操作集。例如,流缺乏截止操作(takeWhile()takeUntil()) ; 使用 Stream#anyMatch()的解决方案是有限的: 它是终端操作,因此您不能在每个流中多次使用它
  • 至于 JDK 8,没有 Stream#zip()操作,这有时非常有用。
  • 流是很难自己构建的,可观察的可以通过很多方式来构建。正如评论中提到的,有很多方法可以构建流。然而,由于没有非终端短路,您不能轻松地在文件中生成行流(JDK 提供了 Files#lines()BufferedReader#lines()开箱即用,其他类似的场景可以通过从 Iterator 构建 Stream 来管理)。
  • 可观察提供资源管理功能(Observable#using()) ; 你可以用它包装 IO 流或互斥锁,并确保用户不会忘记释放资源——它会在订阅终止时自动释放; Stream 有 onClose(Runnable)方法,但你必须手动或通过 try-with-resources 调用它。例如,您必须记住,Files#lines() 必须的被封闭在 try-with-resources 块中。
  • 可观测数据在整个过程中都是同步的(我实际上并没有检查 Streams 是否也是如此)。这使您不必考虑基本操作是否是线程安全的(答案总是“是”,除非存在 bug) ,但是与并发相关的开销将存在,无论您的代码是否需要它。

围捕

RxJava 与 Streams 有很大的不同。真正的 RxJava 替代品是 ReactiveStreams的其他实现,例如 Akka 的相关部分。

更新

对于 Stream#parallel使用非默认的 fork-join 池有一些技巧,请参见 Java8并行流中的自定义线程池

更新

以上所有内容都是基于使用 RxJava 1.x 的经验,现在使用 RxJava 2.x 在这里,这个答案可能已经过时了。

Java8Streams 支持有效地处理真正大的集合,同时利用多核架构。相比之下,RxJava 默认情况下是单线程的(没有调度器)。所以 RxJava 不会利用多核机器,除非你自己编写那个逻辑。

现有的答案是全面和正确的,但缺乏一个明确的例子,为初学者。请允许我在“推/拉式”和“可再观察”等术语后面加入一些具体内容。注意: 我讨厌术语 Observable(看在上帝的份上,它是一个流) ,所以我们简单地指 J8和 RX 流。

考虑一个整数列表,

digits = [1,2,3,4,5]

J8 Stream 是一个修改集合的实用程序,

evens = digits.stream().filter(x -> x%2).collect(Collectors.toList())

这基本上就是 Python 的 映射,过滤,缩小,是 Java 的一个非常好的(而且早就应该)补充。但是,如果数字没有提前收集——如果数字在应用程序运行时流入——我们能实时过滤偶数吗。

假设一个单独的线程进程在应用程序运行时随机输出整数(---表示时间)

digits = 12345---6------7--8--9-10--------11--12

在 RX 中,even可以对每个新数字进行 反应,并实时应用滤波器

even = -2-4-----6---------8----10------------12

不需要存储输入和输出列表。如果你 想要一个输出列表,没有问题,这也是流式的。事实上,是 一切都是一条小溪。

evens_stored = even.collect()

这就是为什么像“无状态”和“功能”这样的术语更多地与 RX 联系在一起