Chaining Observables in RxJS

I'm learning RxJS and Angular 2. Say I have a promise chain with multiple async function calls, each depending on the previous one's result, which looks like this:

var promiseChain = new Promise((resolve, reject) => {
    setTimeout(() => {
        resolve(1);
    }, 1000);
}).then((result) => {
    console.log(result);

    return new Promise((resolve, reject) => {
        setTimeout(() => {
            resolve(result + 2);
        }, 1000);
    });
}).then((result) => {
    console.log(result);

    return new Promise((resolve, reject) => {
        setTimeout(() => {
            resolve(result + 3);
        }, 1000);
    });
});

promiseChain.then((finalResult) => {
    console.log(finalResult);
});

My attempt at doing the same using only RxJS, without promises, produced the following:

var observableChain = Observable.create((observer) => {
    setTimeout(() => {
        observer.next(1);
        observer.complete();
    }, 1000);
}).flatMap((result) => {
    console.log(result);

    return Observable.create((observer) => {
        setTimeout(() => {
            observer.next(result + 2);
            observer.complete();
        }, 1000);
    });
}).flatMap((result) => {
    console.log(result);

    return Observable.create((observer) => {
        setTimeout(() => {
            observer.next(result + 3);
            observer.complete();
        }, 1000);
    });
});

observableChain.subscribe((finalResult) => {
    console.log(finalResult);
});

It yields the same output as the promise chain. My questions are:

  1. Am I doing this right? Are there any RxJS-related improvements that I can make to the above code?

  2. How do I get this observable chain to execute repeatedly? For example, adding another subscription at the end just produces an additional 6, though I expected it to print 1, 3 and 6 again:

    observableChain.subscribe((finalResult) => { console.log(finalResult); });

    observableChain.subscribe((finalResult) => { console.log(finalResult); });

    1 3 6 6


Promise composition vs. RxJS is a frequently asked question, so you can refer to a number of previously asked questions on SO.

Basically, flatMap is the equivalent of Promise.then.
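As a rough sketch of that equivalence, here is the chain from the question written with pipeable operators. It assumes RxJS 6 or 7, where flatMap is available under the name mergeMap. The helper addLater and the seen array are hypothetical names introduced for illustration; addLater emits synchronously via of to keep the example short, whereas a real async step would emit later.

```typescript
import { Observable, of } from 'rxjs';
import { mergeMap, tap } from 'rxjs/operators';

// Hypothetical stand-in for an async call that depends on a previous result.
// It emits immediately here; a real call would emit at some later time.
const addLater = (base: number, increment: number): Observable<number> =>
  of(base + increment);

// Collects every intermediate and final value, in the order they are seen.
const seen: number[] = [];

const chain$ = of(1).pipe(
  tap(value => seen.push(value)),        // side effect, like console.log inside .then
  mergeMap(value => addLater(value, 2)), // like returning a new promise from .then
  tap(value => seen.push(value)),
  mergeMap(value => addLater(value, 3)),
);

chain$.subscribe(finalResult => {
  seen.push(finalResult);
  console.log(seen); // seen now holds 1, 3, 6
});
```

Keeping the logging in tap rather than inside the mergeMap callback separates side effects from the step that produces the next observable, which is one possible improvement over the code in the question.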

For your second question, do you want to replay values already emitted, or do you want to process new values as they arrive? In the first case, check the publishReplay operator. In the second case, a standard subscription is enough. However, you may need to be aware of the cold vs. hot dichotomy depending on your source (cf. "Hot and Cold observables: are there 'hot' and 'cold' operators?" for an illustrated explanation of the concept).
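To illustrate the cold case and the replay case side by side, here is a minimal sketch, assuming RxJS 6 or 7. Note that it uses shareReplay rather than publishReplay, since shareReplay needs no explicit connect call; the names cold$, replayed$ and producerRuns are made up for this example.

```typescript
import { Observable } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

// Counts how many times the producer function below is executed.
let producerRuns = 0;

// Cold source: the producer function runs once for every subscription.
const cold$ = new Observable<number>(observer => {
  producerRuns += 1;
  observer.next(6);
  observer.complete();
});

// Two plain subscriptions: the whole source executes twice.
const coldResults: number[] = [];
cold$.subscribe(value => coldResults.push(value));
cold$.subscribe(value => coldResults.push(value));
const runsWhenCold = producerRuns; // 2

// With shareReplay(1) the source executes once and the last value
// is replayed to the later subscriber.
producerRuns = 0;
const replayed$ = cold$.pipe(shareReplay(1));
const replayedResults: number[] = [];
replayed$.subscribe(value => replayedResults.push(value));
replayed$.subscribe(value => replayedResults.push(value));
const runsWhenReplayed = producerRuns; // 1

console.log(runsWhenCold, runsWhenReplayed);
```

Both subscribers receive 6 in either variant; the difference is only in how many times the source itself is executed.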

Example for clarification:

The top of the pipe can emit n values (this answers "How do I get this observable chain to execute repeatedly"), but each subsequent chained stream emits one value (hence mimicking promises).

import { of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

// Emit three values into the top of this pipe
const topOfPipe = of<string>('chaining', 'some', 'observables');

// If any of the chained observables emit more than 1 value
// then don't use this unless you understand what is going to happen.
const firstObservablePipe = of(1);
const secondObservablePipe = of(2);
const thirdObservablePipe = of(3);
const fourthObservablePipe = of(4);

// Appends the value emitted by the current stream to the previous result
const addToPreviousStream = (previous: string) => map((current: number) => previous + current);
const first = (one: string) => firstObservablePipe.pipe(addToPreviousStream(one));
const second = (two: string) => secondObservablePipe.pipe(addToPreviousStream(two));
const third = (three: string) => thirdObservablePipe.pipe(addToPreviousStream(three));
const fourth = (four: string) => fourthObservablePipe.pipe(addToPreviousStream(four));

topOfPipe.pipe(
    mergeMap(first),
    mergeMap(second),
    mergeMap(third),
    mergeMap(fourth),
).subscribe(console.log);

// Output:
// chaining1234
// some1234
// observables1234

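You could also use concatMap or switchMap. They differ in how they handle overlapping inner observables: mergeMap runs them concurrently, concatMap runs them one at a time in order, and switchMap unsubscribes from the previous inner observable when a new outer value arrives. See the RxJS docs for details: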

mergeMap: https://www.learnrxjs.io/learn-rxjs/operators/transformation/mergemap

concatMap: https://www.learnrxjs.io/learn-rxjs/operators/transformation/concatmap

switchMap: https://www.learnrxjs.io/learn-rxjs/operators/transformation/switchmap
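To make the difference between the three operators concrete, here is a small sketch, assuming RxJS 6 or 7. The run helper is hypothetical: it drives two inner streams by hand with the same fixed sequence of events, so the only thing that varies is the flattening operator. ReplaySubject is used for the inner streams so that a late subscription (as happens with concatMap) still sees earlier values.

```typescript
import { Observable, OperatorFunction, ReplaySubject, Subject } from 'rxjs';
import { concatMap, mergeMap, switchMap } from 'rxjs/operators';

type Project = (key: string) => Observable<string>;

// Hypothetical helper: replays one fixed event sequence through the given operator.
function run(flatten: (project: Project) => OperatorFunction<string, string>): string[] {
  const outer = new Subject<string>();
  const a = new ReplaySubject<string>();
  const b = new ReplaySubject<string>();
  const out: string[] = [];

  outer
    .pipe(flatten(key => (key === 'a' ? a : b)))
    .subscribe(value => out.push(value));

  outer.next('a'); // inner stream "a" starts
  outer.next('b'); // inner stream "b" is requested while "a" is still active
  a.next('a1');
  b.next('b1');
  a.next('a2');
  a.complete();
  b.next('b2');
  b.complete();
  return out;
}

const merged = run(project => mergeMap(project));        // a1, b1, a2, b2 (interleaved)
const concatenated = run(project => concatMap(project)); // a1, a2, b1, b2 (b waits for a)
const switched = run(project => switchMap(project));     // b1, b2 (a is dropped)

console.log(merged, concatenated, switched);
```

When every inner observable emits a single value and completes before the next outer value arrives, as in the example above with of, all three operators give the same result.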