可观察与可流动的 rxJava2

我一直在看新的 rx java 2,我不太确定我理解的想法 backpressure了..。

我知道我们有 Observable没有 backpressure的支持和 Flowable有它。

基于这个例子,假设我有 flowableinterval:

        Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});

这将在大约128个值之后崩溃,很明显,我的消费速度比得到物品的速度慢。

但是 Observable也是一样的

     Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});

这将不会崩溃在所有,即使当我把一些延迟消费它仍然工作。为了使 Flowable工作,让我们说我把 onBackpressureDrop操作符,崩溃消失了,但并非所有的值都被发出。

因此,基本问题,我无法找到答案目前在我的头脑是,为什么我应该关心的 backpressure时,我可以使用普通的 Observable仍然接收所有的值而不管理的 buffer?或者也许从另一方面来说,backpressure在管理和处理消费方面给了我什么优势?

50781 次浏览

事实上,你的 Flowable崩溃后发出128个值没有反压处理并不意味着它将总是崩溃后,正好128个值: 有时它会崩溃后10,有时它不会崩溃在所有。我相信这就是当你用 Observable尝试这个例子时发生的情况-碰巧没有反压力,所以你的代码工作正常,下次可能就不会了。RxJava2的不同之处在于,在 Observable中不再有反压的概念,也没有办法处理它。如果您正在设计一个反应序列,可能需要明确的背压处理-那么 Flowable是您的最佳选择。

反向压力在实践中体现的是有界缓冲区,Flowable.observeOn有一个包含128个元素的缓冲区,这个缓冲区以下游能够承受的最快速度被排出。您可以单独增加这个缓冲区大小来处理突发源,所有的反压力管理实践仍然适用于1.x。Observable.observeOn有一个无限的缓冲区,它不断地收集元素,你的应用程序可能会耗尽内存。

例如,你可以使用 Observable:

  • 处理 GUI 事件
  • 使用短序列(总共少于1000个元素)

例如,你可以使用 Flowable:

  • 冷源和非定时源
  • 类似发电机的电源
  • 网络和数据库访问器

反向压力是当你的观察者(发布者)创建的事件超出了你的订阅者能够处理的范围。因此,您可以让订阅者错过事件,或者您可以得到一个巨大的事件队列,最终导致内存不足。Flowable考虑反压。Observable则不然。就是这样。

它让我想起一个漏斗,当它有太多的液体溢出。Flowable 可以帮助避免这种情况的发生:

背负着巨大的压力:

enter image description here

但是使用可流动的,背压就小得多了:

enter image description here

Rxjava2有一些背压策略,您可以根据您的用例使用这些策略。我所说的策略是指 Rxjava2提供了一种处理由于溢出(反压力)而无法处理的对象的方法。

策略如下。 我不会把它们都看一遍,但是举个例子,如果你不想担心那些溢出的项目,你可以使用下面这样的删除策略:

ToFlowable (BackpressureStrategies y.DROP)

据我所知,应该有一个128项目的队列限制,之后可能会有溢出(反压力)。即使不是128也接近这个数字。希望这对谁有帮助。

如果你需要改变缓冲区大小从128它看起来像是可以做到的 像这样(但注意任何内存限制:

myObservable.toFlowable(BackpressureStrategy.MISSING).buffer(256); //but using MISSING might be slower.

在软件开发通常回压策略意味着你告诉发射器放慢一点,因为消费者不能处理你的发射事件的速度。