Java8流是否类似于 RxJava 可观察数据?
Java8流定义:
新的 java.util.stream包中的类提供了一个 支持元素流上的函数式操作。
java.util.stream
Java8Stream 和 RxJava 看起来非常相似。它们具有外观相似的操作符(filter、 map、 latMap...) ,但不是为相同的用途而构建的。
可以使用 RxJava 执行异步任务。
使用 Java8流,您将遍历集合中的项。
你可以在 RxJava 中做几乎相同的事情(遍历集合中的条目) ,但是,由于 RxJava 专注于并发任务,... ,它使用同步,闩锁,... ,所以使用 RxJava 的同一个任务可能比使用 Java8流慢。
RxJava 可以与 CompletableFuture进行比较,但是它可以计算多个值。
CompletableFuture
有一些技术和概念上的差异,例如,Java8流是单一使用的、基于拉的、同步的值序列,而 RxJava 观察值是可重新观察的、自适应的、基于推拉的、可能是异步的值序列。RxJava 的目标是 Java6 + ,也可以在 Android 上运行。
Java8流是基于拉的。您在使用每个条目的 Java8流上进行迭代。它可能是一条永无止境的溪流。
RXJavaObservable默认是基于推的。您订阅了一个观察对象,当下一个项目到达时(onNext) ,或者当流完成时(onCompleted) ,或者当错误发生时(onError) ,您将得到通知。 因为使用 Observable可以接收 onNext、 onCompleted、 onError事件,所以可以执行一些强大的功能,比如将不同的 Observable组合成新的 Observable(zip、 merge、 concat)。其他你可以做的事情是缓存,节流,..。 它在不同的语言中使用差不多相同的 API (RxJava,C # 中的 RX,RxJS,...)
Observable
onNext
onCompleted
onError
zip
merge
concat
默认情况下,RxJava 是单线程的。除非您开始使用调度程序,否则所有事情都将发生在同一个线程上。
RxJava 也与 反应流初始化反应流初始化密切相关,并认为它是反应流 API 的简单实现(例如,与 Akka 流实现相比)。主要的区别在于,反应流被设计成能够处理反向压力,但是如果你看一下反应流页面,你就会明白这个想法。他们很好地描述了他们的目标,这些流也与 被动宣言密切相关。
Java8流基本上是无界集合的实现,非常类似于 Scala Stream或 Clojure 懒惰系列。
所有的序列/流处理库都为管道构建提供了非常相似的 API。不同之处在于处理多线程和管道组合的 API。
RxJava 与 Stream 有很大的不同。在所有 JDK 的东西中,最接近于 rx.Observable的可能是 java.util.stream.Collector Stream + CompletableFuture组合(这是以处理额外的单子层为代价的,即必须处理 Stream<CompletableFuture<T>>和 CompletableFuture<Stream<T>>之间的转换)。
rx.Observable
java.util.stream.Collector
Stream
Stream<CompletableFuture<T>>
CompletableFuture<Stream<T>>
“可观测”和“数据流”之间存在显著差异:
Stream#parallel()
Observable#subscribeOn()
Observable#observeOn()
.parallel()
Observable#interval()
Observable#window()
takeWhile()
takeUntil()
Stream#anyMatch()
Stream#zip()
Files#lines()
BufferedReader#lines()
Observable#using()
onClose(Runnable)
RxJava 与 Streams 有很大的不同。真正的 RxJava 替代品是 ReactiveStreams的其他实现,例如 Akka 的相关部分。
对于 Stream#parallel使用非默认的 fork-join 池有一些技巧,请参见 Java8并行流中的自定义线程池。
Stream#parallel
以上所有内容都是基于使用 RxJava 1.x 的经验,现在使用 RxJava 2.x 在这里,这个答案可能已经过时了。
Java8Streams 支持有效地处理真正大的集合,同时利用多核架构。相比之下,RxJava 默认情况下是单线程的(没有调度器)。所以 RxJava 不会利用多核机器,除非你自己编写那个逻辑。
现有的答案是全面和正确的,但缺乏一个明确的例子,为初学者。请允许我在“推/拉式”和“可再观察”等术语后面加入一些具体内容。注意: 我讨厌术语 Observable(看在上帝的份上,它是一个流) ,所以我们简单地指 J8和 RX 流。
考虑一个整数列表,
digits = [1,2,3,4,5]
J8 Stream 是一个修改集合的实用程序,
evens = digits.stream().filter(x -> x%2).collect(Collectors.toList())
这基本上就是 Python 的 映射,过滤,缩小,是 Java 的一个非常好的(而且早就应该)补充。但是,如果数字没有提前收集——如果数字在应用程序运行时流入——我们能实时过滤偶数吗。
假设一个单独的线程进程在应用程序运行时随机输出整数(---表示时间)
---
digits = 12345---6------7--8--9-10--------11--12
在 RX 中,even可以对每个新数字进行 反应,并实时应用滤波器
even
even = -2-4-----6---------8----10------------12
不需要存储输入和输出列表。如果你 想要一个输出列表,没有问题,这也是流式的。事实上,是 一切都是一条小溪。
evens_stored = even.collect()
这就是为什么像“无状态”和“功能”这样的术语更多地与 RX 联系在一起