如何使一个可观测序列等待另一个完成之前发射?

假设我有一个 Observable,像这样:

var one = someObservable.take(1);


one.subscribe(function(){ /* do something */ });

然后,我有第二个 Observable:

var two = someOtherObservable.take(1);

现在,我想从 subscribe()two,但是我想确保 onetwo订阅者被激活之前已经完成。

我可以在 two上使用什么样的缓冲方法来使第二个等待第一个完成?

我想我要暂停 two直到 one完成。

211563 次浏览

我能想到几个办法

import {take, publish} from 'rxjs/operators'
import {concat} from 'rxjs'


//Method one


var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1));
concat(one, two).subscribe(function() {/*do something */});


//Method two, if they need to be separate for some reason
var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1), publish());
two.subscribe(function(){/*do something */});
one.subscribe(function(){/*do something */}, null, two.connect.bind(two));

如果第二个观测值是 性感,那么就有 another way暂停/恢复:

var pauser = new Rx.Subject();
var source1 = Rx.Observable.interval(1000).take(1);
/* create source and pause */
var source2 = Rx.Observable.interval(1000).pausable(pauser);


source1.doOnCompleted(function () {
/* resume paused source2 */
pauser.onNext(true);
}).subscribe(function(){
// do something
});


source2.subscribe(function(){
// start to recieve data
});

还可以使用缓冲版本 pausableBuffered在暂停期间保持数据处于打开状态。

如果您想确保执行顺序被保留,您可以使用 latMap 作为下面的示例

const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i));
const second = Rx.Observable.of(11).delay(500).do(i => console.log(i));
const third = Rx.Observable.of(111).do(i => console.log(i));


first
.flatMap(() => second)
.flatMap(() => third)
.subscribe(()=> console.log('finished'));

结果将是:

"1"
"11"
"111"
"finished"

这里还有另一种利用 switchMap 的结果选择器的可能性

var one$ = someObservable.take(1);
var two$ = someOtherObservable.take(1);
two$.switchMap(
/** Wait for first Observable */
() => one$,
/** Only return the value we're actually interested in */
(value2, value1) => value2
)
.subscribe((value2) => {
/* do something */
});

由于 switchMap 的结果选择器已经折旧,下面是一个更新版本

const one$ = someObservable.pipe(take(1));
const two$ = someOtherObservable.pipe(
take(1),
switchMap(value2 => one$.map(_ => value2))
);
two$.subscribe(value2 => {
/* do something */
});

这里还有另外一个方法,但是我觉得更加直接和直观(或者至少自然,如果你习惯了承诺) ,方法。基本上,您可以使用 Observable.create()创建一个 Observer,将 onetwo包装为一个 Observer。这与 Promise.all()的工作方式非常相似。

var first = someObservable.take(1);
var second = Observable.create((observer) => {
return first.subscribe(
function onNext(value) {
/* do something with value like: */
// observer.next(value);
},
function onError(error) {
observer.error(error);
},
function onComplete() {
someOtherObservable.take(1).subscribe(
function onNext(value) {
observer.next(value);
},
function onError(error) {
observer.error(error);
},
function onComplete() {
observer.complete();
}
);
}
);
});

这是怎么回事?首先,我们创建一个新的。传递给 Observable.create()的函数被恰当地命名为 onSubscription,它被传递给观察者(根据传递给 subscribe()的参数构建) ,这类似于在创建一个新承诺时将 resolvereject组合成一个对象。这就是我们施展魔法的方法。

onSubscription中,我们订阅了第一个观察值(在上面的示例中,这个值称为 one)。我们如何处理 nexterror取决于你,但默认提供在我的样本应该是适当的一般来说。但是,当我们接收到 complete事件(这意味着 one现在已经完成)时,我们可以订阅下一个可观察事件; 因此在第一个完成后触发第二个可观察事件。

为第二个观察者提供的示例观察者相当简单。基本上,second现在的行为就像您期望的 two在 OP 中的行为一样。更具体地说,如果没有错误,second将发出第一个也是唯一一个由 someOtherObservable发出的值(因为是 take(1)) ,然后完成。

例子

下面是一个完整的工作示例,如果你想看到我的示例在现实生活中的工作效果,你可以复制/粘贴:

var someObservable = Observable.from([1, 2, 3, 4, 5]);
var someOtherObservable = Observable.from([6, 7, 8, 9]);


var first = someObservable.take(1);
var second = Observable.create((observer) => {
return first.subscribe(
function onNext(value) {
/* do something with value like: */
observer.next(value);
},
function onError(error) {
observer.error(error);
},
function onComplete() {
someOtherObservable.take(1).subscribe(
function onNext(value) {
observer.next(value);
},
function onError(error) {
observer.error(error);
},
function onComplete() {
observer.complete();
}
);
}
);
}).subscribe(
function onNext(value) {
console.log(value);
},
function onError(error) {
console.error(error);
},
function onComplete() {
console.log("Done!");
}
);

如果您观察控制台,上面的示例将打印:

1

6

Done!

skipUntil() with last()

忽略发出的项目,直到另一个可观察的项目发出

Last: 从序列 发出 last 值(即等到它完成后再发出)

请注意,任何从传递给 skipUntil的可观测数据发出的信息都将取消跳跃,这就是为什么我们需要添加 last()-以等待流完成。

main$.skipUntil(sequence2$.pipe(last()))

官方: https://rxjs-dev.firebaseapp.com/api/operators/skipUntil


Possible issue: Note that last() by itself 会出错 if nothing is emitted. The last() operator does have a default parameter but only when used in conjunction with a predicate. I think if this situation is a problem for you (if sequence2$ may complete without emitting) then one of these should work (currently untested):

main$.skipUntil(sequence2$.pipe(defaultIfEmpty(undefined), last()))
main$.skipUntil(sequence2$.pipe(last(), catchError(() => of(undefined))

请注意,undefined是要发出的有效项,但实际上可以是任何值。还要注意,这是连接到 sequence2$的管道,而不是连接到 main$的管道。

这里有一个可重用的方法(它是一个打印脚本,但是你可以将它改编成 js) :

function waitFor<T>(signal: Observable<any>) {
return (source: Observable<T>) => signal.pipe(
first(),
switchMap(_ => source),
);
}

你可以像任何操作员一样使用它:

var two = someOtherObservable.pipe(waitFor(one), take(1));

它基本上是一个运算符,它将订阅推迟到可观测源,直到可观测信号发出第一个事件。

由于 合并地图(或者他的别名 平面地图)操作符的原因,您可以像下面这样使用从以前的 Observer 中发出的结果:

 const one = Observable.of('https://api.github.com/users');
const two = (c) => ajax(c);//ajax from Rxjs/dom library


one.mergeMap(two).subscribe(c => console.log(c))

我知道这很旧了,但我想你需要的是:

var one = someObservable.take(1);


var two = someOtherObservable.pipe(
concatMap((twoRes) => one.pipe(mapTo(twoRes))),
take(1)
).subscribe((twoRes) => {
// one is completed and we get two's subscription.
})

Here's a custom operator written with TypeScript that waits for a signal before emitting results:

export function waitFor<T>(
signal$: Observable<any>
) {
return (source$: Observable<T>) =>
new Observable<T>(observer => {
// combineLatest emits the first value only when
// both source and signal emitted at least once
combineLatest([
source$,
signal$.pipe(
first(),
),
])
.subscribe(([v]) => observer.next(v));
});
}

You can use it like this:

two.pipe(waitFor(one))
.subscribe(value => ...);

也许您可以使用 delayWhen操作符。

我们有两个观察到的 one$two$。首先观察发射 1后1秒延迟,然后完成。第二个可观察到的发射物 2仅在 one$发射之后:

const one$ = of(1).pipe(
delay(1000),
tap(() => console.log('one$ emitted'))
);


const two$ = of(2).pipe(
delayWhen(() => one$),
tap(() => console.log('two$ emitted')),
);


two$.subscribe(n => {
console.log(`n=${n}`);
});
<script src="https://unpkg.com/rxjs@7.5.5/dist/bundles/rxjs.umd.min.js"></script>
<script>
const {of} = rxjs;
const {delay, delayWhen, tap} = rxjs.operators;
</script>