RxJS: How to not subscribe to initial value and/or undefined?

Being new to RxJS I often create a subject which holds values in the future, but is initially undefined. It can only be undefined the first time. I currently use a filter to skip undefined values, but this is quite cumbersome as I do it everywhere as I need only once. (Maybe I do something wrong here?) Can I somehow subscribe to mySubject only after it got its first value via onNext?

var mySubject = new Rx.BehaviorSubject(undefined);


mySubject.filter(function(value) {
return value !== undefined;
}).subscribe(function(value) {
// do something with the value
});
50258 次浏览

Use new Rx.ReplaySubject(1) instead of BehaviorSubject.

As mentioned by Will you should be able to just skip the first value by using the skip operator:

var mySubject = new Rx.BehaviorSubject(undefined);


mySubject.pipe(skip(1)).subscribe(function(value) {
// do something with the value
});

For now I am using the filter operator, but I don't know if it's a good solution:

var mySubject = new Rx.BehaviorSubject().filter(x => !!x);


mySubject.subscribe(value => { /* will receive value from below */);


mySubject.next('value');


mySubject.subscribe(value => { /* also receives the value */ });

mySubject.pipe( skipWhile( v => !v ) );

Sometimes there will be a need for behaviourSubject, where the initial value does not matter and the current value is needed asynchronously while working inside a stream, Just in our case multiple chain promises are handled with user cancel while processing or during fetching the data from anywhere inside the stream.

This can be achieved using the following way.

// for user related commands
this.commandSource = new BehaviorSubject(CONTINUE);
// filtering over initial value which is continue to make it as a different pipe
const stopPipe = commandSource.pipe(filter(val => val === STOP));
const fetchStream = Observable.fromPromise(this.fetchDetails);


merge(fetchStream, stopPipe).pipe(
take(1),
takeWhile(() => commandSource.value === CONTINUE),
concatMap((response) => {
// fetch Another response you can return promise directly in concatMap
// return array of response [1 ,2 ,3];
return this.fetchYetAnotherDetails;
}),
// we can add this to stop stream in multiple places while processing the response
takeWhile(() => commandSource.value === CONTINUE),
// triggers parallelly values from the concatMap that is 1, 2 , 3
mergeMap(() => // massage the response parallelly using )
finalize(() => thi
commandSource.complete())
).subscribe(res => {
// handle each response 1, 2, 3 mapped
}, () => {
// handle error
}, () => {
// handle complete of the stream
});


// when user, clicks cancel, this should stop the stream.
commandSource.next(STOP)

I have found this frustrating in both RxJS and RxSwift. (Wanting a value subject combined with the ability to wait on the first value).

For JS I am currently just tucking away a filtered version in the subject, something like this:

    let mySubject = new Rx.BehaviorSubject();
mySubject.wait = mySubject.pipe(filter(v=>v!==undefined));

So the subject is still exposed for publishing but clients don't have to repeat the filter.

    mySubject.wait.subscribe((v)=>{...});