发出的每个列表项的 RxJava 延迟

我正在努力实现一些我认为在 Rx 中会相当简单的东西。

我有一个项目列表,我希望每个项目都有一个延迟发出。

似乎 Rx 延迟()操作符只是按指定的延迟转移所有项目的发射,而不是每个单独的项目。

这是一些测试代码。它对列表中的项进行分组。然后,每个组在发出之前都应该应用一个延迟。

Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.delay(50, TimeUnit.MILLISECONDS)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();

结果是:

154ms
[5]


155ms
[2]


155ms
[1]


155ms
[3]


155ms
[4]

但我希望看到的是这样的情况:

174ms
[5]


230ms
[2]


285ms
[1]


345ms
[3]


399ms
[4]

我做错了什么?

62557 次浏览

我觉得你想要这个:

Observable.range(1, 5)
.delay(50, TimeUnit.MILLISECONDS)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();

这样,它将延迟数字进入组,而不是延迟减少名单5秒。

一种方法是使用 zip结合你的观察和一个 Interval观察延迟输出。

Observable.zip(Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList()),
Observable.interval(50, TimeUnit.MILLISECONDS),
(obs, timer) -> obs)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();

为了延迟每个组,您可以更改 flatMap()以返回一个可观察值,从而延迟发出组。

Observable
.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g ->
Observable
.timer(50, TimeUnit.MILLISECONDS)
.flatMap(t -> g.toList())
)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();

您可以实现 自定义 rx 运算符,例如 MinRegularIntervalDelayOperator,然后将其与 lift函数一起使用

Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.lift(new MinRegularIntervalDelayOperator<Integer>(50L))
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();

一种不那么干净的方法是使用.  延迟(Func1)运算符通过迭代来改变延迟。

Observable.range(1, 5)
.delay(n -> n*50)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();

还有其他方法可以使用 concatMap,因为 concatMap 返回可观察到的源项。所以我们可以增加观察到的延迟。

这就是我所尝试的。

Observable.range(1, 5)
.groupBy(n -> n % 5)
.concatMap(integerIntegerGroupedObservable ->
integerIntegerGroupedObservable.delay(2000, TimeUnit.MILLISECONDS))
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();

可以通过使用 latMap、 maxConcurrent 和  延迟()在发出的项之间添加延迟

下面是一个示例-发出带有延迟的0..4

@Test
fun testEmitWithDelays() {
val DELAY = 500L
val COUNT = 5


val latch = CountDownLatch(1)
val startMoment = System.currentTimeMillis()
var endMoment : Long = 0


Observable
.range(0, COUNT)
.flatMap( { Observable.just(it).delay(DELAY, TimeUnit.MILLISECONDS) }, 1) // maxConcurrent = 1
.subscribe(
{ println("... value: $it, ${System.currentTimeMillis() - startMoment}") },
{},
{
endMoment = System.currentTimeMillis()
latch.countDown()
})


latch.await()


assertTrue { endMoment - startMoment >= DELAY * COUNT }
}


... value: 0, 540
... value: 1, 1042
... value: 2, 1544
... value: 3, 2045
... value: 4, 2547

实现这一点的最简单方法似乎就是使用 concatMap并将每个项目包装在一个延迟的可观察项目中。

long startTime = System.currentTimeMillis();
Observable.range(1, 5)
.concatMap(i-> Observable.just(i).delay(50, TimeUnit.MILLISECONDS))
.doOnNext(i-> System.out.println(
"Item: " + i + ", Time: " + (System.currentTimeMillis() - startTime) +"ms"))
.toCompletable().await();

印刷品:

Item: 1, Time: 51ms
Item: 2, Time: 101ms
Item: 3, Time: 151ms
Item: 4, Time: 202ms
Item: 5, Time: 252ms

只需共享一个简单的方法来以间隔发出集合中的每个项:

Observable.just(1,2,3,4,5)
.zipWith(Observable.interval(500, TimeUnit.MILLISECONDS), (item, interval) -> item)
.subscribe(System.out::println);

每个项目将每500毫秒发出一次

你应该能够做到这一点,通过使用 Timer运算符。我尝试用 delay,但不能达到理想的输出。注意在 flatmap运算符中执行的嵌套操作。

    Observable.range(1,5)
.flatMap(x -> Observable.timer(50 * x, TimeUnit.MILLISECONDS)
.map(y -> x))
// attach timestamp
.timestamp()
.subscribe(timedIntegers ->
Log.i(TAG, "Timed String: "
+ timedIntegers.value()
+ " "
+ timedIntegers.time()));

我觉得这正是你需要的,看看:

long startTime = System.currentTimeMillis();
Observable.intervalRange(1, 5, 0, 50, TimeUnit.MILLISECONDS)
.timestamp(TimeUnit.MILLISECONDS)
.subscribe(emitTime -> {
System.out.println(emitTime.time() - startTime);
});

你可以用

   Observable.interval(1, TimeUnit.SECONDS)
.map(new Function<Long, Integer>() {
@Override
public Integer apply(Long aLong) throws Exception {
return aLong.intValue() + 1;
}
})
.startWith(0)
.take(listInput.size())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer index) throws Exception {
Log.d(TAG, "---index of your list --" + index);
}
});

这个代码上面没有重复值(索引)。“我确定”

对于 kotlin 用户,我为“间隔压缩”方法编写了一个扩展函数

import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import java.util.concurrent.TimeUnit


fun <T> Observable<T>.delayEach(interval: Long, timeUnit: TimeUnit): Observable<T> =
Observable.zip(
this,
Observable.interval(interval, timeUnit),
BiFunction { item, _ -> item }
)

它的工作原理是一样的,但是这使得它可以重用。例如:

Observable.range(1, 5)
.delayEach(1, TimeUnit.SECONDS)

在每个发出的物品之间引入延迟是有用的:

List<String> letters = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));


Observable.fromIterable(letters)
.concatMap(item -> Observable.interval(1, TimeUnit.SECONDS)
.take(1)
.map(second -> item))
.subscribe(System.out::println);

https://github.com/ReactiveX/RxJava/issues/3505有更多好的选择

Observable.just("A", "B", "C", "D", "E", "F")
.flatMap { item -> Thread.sleep(2000)
Observable.just( item ) }
.subscribe { println( it ) }

这篇文章建议迅速扩展这两种方法。

联系

import RxSwift


extension Observable {
public func delayEach(_ dueTime: RxSwift.RxTimeInterval, scheduler: RxSwift.SchedulerType) -> RxSwift.Observable<Element> {
return self.concatMap { Observable.just($0).delay(dueTime, scheduler: scheduler) }
}
}

什么都没有

import RxSwift


extension Observable {
public func delayEach(_ period: RxSwift.RxTimeInterval, scheduler: RxSwift.SchedulerType) -> RxSwift.Observable<Element> {
return Observable.zip(
Observable<Int>.interval(period, scheduler: scheduler),
self
) { $1 }
}
}

用法

Observable.range(start: 1, count: 5)
.delayEach(.seconds(1), scheduler: MainScheduler.instance)

我个人倾向于使用 concat 方法,因为当上游以比延迟间隔更慢的速度发出项目时,它也将按预期工作。

是的,最初的帖子是 RxJava 特有的,但是 Google 也把你带到这里来查询 RxSwift。

关于 eis 的评论“ 为什么没有一个答案能回答这个问题,为什么这个不起作用,它有什么问题?”:

它的行为不同于预期,因为延迟一个项目意味着其排放时间延迟 相对于该项在其他情况下会发射的时间-不相对于前一个项目。

想象一下 OP 的可观察 没有的任何延迟: 所有项目都在快速连续发出(在相同的毫秒内)。延迟,每个项目发出以后。但是,由于相同的延迟适用于每个项目,他们的相对发射时间没有改变。它们仍然在一毫秒内发射出来。

想象一个人在14:00进入房间,另一个人在14:01进入。 如果你延迟一个小时,他们会在15:00和15:01进入。他们之间还有一分钟。