RxJS sequence equivalent to promise.then()?

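I used to develop a lot with promises, and now I am moving to RxJS. The RxJS documentation doesn't provide a very clear example of how to move from a promise chain to an observable sequence.

For example, I usually write promise chains with multiple steps, like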

// a function that returns a promise
getPromise()
.then(function(result) {
// do something
})
.then(function(result) {
// do something
})
.then(function(result) {
// do something
})
.catch(function(err) {
// handle error
});

How should I rewrite this promise chain in the RxJS style?


For data flow (equivalent to then):

Rx.Observable.fromPromise(...)
.flatMap(function(result) {
// do something
})
.flatMap(function(result) {
// do something
})
.subscribe(function onNext(result) {
// end of chain
}, function onError(error) {
// process the error
});

A promise can be converted into an observable with Rx.Observable.fromPromise.

Some promise operators have a direct translation. For instance RSVP.all, or jQuery.when can be replaced by Rx.Observable.forkJoin.
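As a rough sketch of the forkJoin point above, using the RxJS 6+ import style: the two sources below (user$ and settings$) are made-up stand-ins for independent async lookups, not anything from the question.

```typescript
import { forkJoin, of } from 'rxjs';

// Hypothetical stand-ins for two independent async lookups.
const user$ = of({ id: 1, name: 'Ada' });
const settings$ = of({ theme: 'dark' });

const combined: Array<[{ id: number; name: string }, { theme: string }]> = [];

// forkJoin waits for every source to complete, then emits a single array
// holding the last value from each source, much like Promise.all.
forkJoin([user$, settings$]).subscribe(pair => combined.push(pair));
```

Note that forkJoin only emits once all sources have completed, so a source that never completes keeps it silent.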

Keep in mind that you have a bunch of operators that let you transform data asynchronously and perform tasks that would be impossible or very hard to do with promises. RxJS reveals all its power with asynchronous sequences of data (a sequence meaning more than one asynchronous value).

For error management, the subject is a little bit more complex.

  • there are catch and finally operators too
  • retryWhen can also help to repeat a sequence in case of error
  • you can also deal with errors in the subscriber itself with the onError function.
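A minimal sketch of how these pieces might fit together, in the RxJS 6+ pipe style. Assumptions: flaky$ is an invented source that fails twice before succeeding, and I use retry (the simpler cousin of retryWhen) plus finalize, which is the pipeable name for finally.

```typescript
import { defer, of, throwError } from 'rxjs';
import { catchError, finalize, retry } from 'rxjs/operators';

let attempts = 0;
const log: string[] = [];

// Hypothetical flaky source: errors on the first two subscriptions, then succeeds.
const flaky$ = defer(() => {
  attempts += 1;
  return attempts < 3
    ? throwError(new Error(`attempt ${attempts} failed`))
    : of('payload');
});

flaky$.pipe(
  // Resubscribe to the source up to 2 more times if it errors.
  retry(2),
  // Like .catch(): swap an error for a fallback value (only reached if retries run out).
  catchError(err => of(`fallback: ${err.message}`)),
  // Like .finally(): runs when the stream completes, errors, or is unsubscribed.
  finalize(() => log.push('done'))
).subscribe(value => log.push(value));
```

If you lower retry(2) to retry(1), the second failure should fall through to catchError and the subscriber gets the fallback string instead.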

For precise semantics, have a deeper look at the documentation and examples you can find on the web, or ask specific questions here.

This would definitely be a good starting point for going deeper into error management with RxJS: https://xgrommx.github.io/rx-book/content/getting_started_with_rxjs/creating_and_querying_observable_sequences/error_handling.html

A more modern alternative:

import {from as fromPromise} from 'rxjs';
import {catchError, flatMap} from 'rxjs/operators';


fromPromise(...).pipe(
flatMap(result => {
// do something
}),
flatMap(result => {
// do something
}),
flatMap(result => {
// do something
}),
catchError(error => {
// handle error
})
)

Also note that for all this to work, you need to subscribe to this piped Observable somewhere, but I assume it's handled in some other part of the application.
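To illustrate the point about subscribing, here is a small sketch (the names source$ and doubled$ are just for illustration). Building the pipe does not run anything; the work starts when someone subscribes. One caveat: a promise you pass to from() has already started on its own, it is only the observable side that waits for a subscriber.

```typescript
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';

let started = false;

// The function passed to the constructor runs once per subscription, not at creation.
const source$ = new Observable<number>(subscriber => {
  started = true;
  subscriber.next(21);
  subscriber.complete();
});

// Piping operators only describes the chain; nothing has executed yet.
const doubled$ = source$.pipe(map(n => n * 2));
const startedBeforeSubscribe = started;

const received: number[] = [];
doubled$.subscribe(value => received.push(value));
```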

If the getPromise function is in the middle of a stream pipe, you should simply wrap it in one of mergeMap, switchMap or concatMap (usually mergeMap):

stream$.pipe(
mergeMap(data => getPromise(data)),
filter(...),
map(...)
).subscribe(...);

If you want to start your stream with getPromise(), wrap it in the from function:

import {from} from 'rxjs';


from(getPromise()).pipe(
filter(...),
map(...)
).subscribe(...);

As far as I can tell, if you return a plain value from flatMap, it is treated as a sequence and flattened: a returned string is emitted one character at a time, as if it were an array of characters.

But if you return an Observable, that observable can emit the string as a single value.
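A quick sketch to check this behaviour yourself, using mergeMap (flatMap is an alias for it in RxJS 6). The arrays chars and whole are only there to capture what each subscription receives.

```typescript
import { of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const chars: string[] = [];
const whole: string[] = [];

// Returning a bare string: mergeMap treats it as an array-like input,
// so each character is emitted as a separate value.
of(1).pipe(mergeMap(() => 'abc')).subscribe(c => chars.push(c));

// Returning an Observable that wraps the string: it arrives as one value.
of(1).pipe(mergeMap(() => of('abc'))).subscribe(s => whole.push(s));
```

So when the step is synchronous and just returns a value, map is usually the better fit; mergeMap is for steps that return a promise or an observable.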

If I understood correctly, you mean consuming the values, in which case you use subscribe, i.e.

const arrObservable = from([1,2,3,4,5,6,7,8]);
arrObservable.subscribe(num => console.log(num));

Additionally, you can turn the observable into a promise using toPromise(), which resolves with the last emitted value once the observable completes:

arrObservable.toPromise().then()

Update May 2019, using RxJs 6

I agree with the answers above, and want to add a concrete example with some toy data and simple promises (using setTimeout) in RxJS v6 for clarity.

Change the passed id (currently hard-coded as 1) to something that does not exist to exercise the error handling logic too. Importantly, note the use of of inside catchError to turn the error into an emitted message.

import { from as fromPromise, of } from "rxjs";
import { catchError, flatMap, tap } from "rxjs/operators";


const posts = [
{ title: "I love JavaScript", author: "Wes Bos", id: 1 },
{ title: "CSS!", author: "Chris Coyier", id: 2 },
{ title: "Dev tools tricks", author: "Addy Osmani", id: 3 }
];


const authors = [
{ name: "Wes Bos", twitter: "@wesbos", bio: "Canadian Developer" },
{
name: "Chris Coyier",
twitter: "@chriscoyier",
bio: "CSS Tricks and CodePen"
},
{ name: "Addy Osmani", twitter: "@addyosmani", bio: "Googler" }
];


function getPostById(id) {
return new Promise((resolve, reject) => {
setTimeout(() => {
const post = posts.find(post => post.id === id);
if (post) {
console.log("ok, post found!");
resolve(post);
} else {
reject(Error("Post not found!"));
}
}, 200);
});
}


function hydrateAuthor(post) {
return new Promise((resolve, reject) => {
setTimeout(() => {
const authorDetails = authors.find(person => person.name === post.author);
if (authorDetails) {
post.author = authorDetails;
console.log("ok, post hydrated with author info");
resolve(post);
} else {
reject(Error("Author not Found!"));
}
}, 200);
});
}


function dehydratePostTitle(post) {
return new Promise((resolve, reject) => {
setTimeout(() => {
delete post.title;
console.log("ok, applied transformation to remove title");
resolve(post);
}, 200);
});
}


// ok, here is how it looks regarding this question..
let source$ = fromPromise(getPostById(1)).pipe(
flatMap(post => {
return hydrateAuthor(post);
}),
flatMap(post => {
return dehydratePostTitle(post);
}),
catchError(error => of(`Caught error: ${error}`))
);


source$.subscribe(console.log);

Output Data:

ok, post found!
ok, post hydrated with author info
ok, applied transformation to remove title
{ author:
{ name: 'Wes Bos',
twitter: '@wesbos',
bio: 'Canadian Developer' },
id: 1 }

The key part is equivalent to the following plain promise control flow:

getPostById(1)
.then(post => {
return hydrateAuthor(post);
})
.then(post => {
return dehydratePostTitle(post);
})
.then(author => {
console.log(author);
})
.catch(err => {
console.error(err);
});

This is how I did it.

Previously

  public fetchContacts(onCompleteFn: (response: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => void) {
const request = gapi.client.people.people.connections.list({
resourceName: 'people/me',
pageSize: 100,
personFields: 'phoneNumbers,organizations,emailAddresses,names'
}).then(response => {
onCompleteFn(response as gapi.client.Response<gapi.client.people.ListConnectionsResponse>);
});
}


// caller:


this.gapi.fetchContacts((rsp: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
// handle rsp;
});

After(ly?)

public fetchContacts(): Observable<gapi.client.Response<gapi.client.people.ListConnectionsResponse>> {
return from(
new Promise((resolve, reject) => {
gapi.client.people.people.connections.list({
resourceName: 'people/me',
pageSize: 100,
personFields: 'phoneNumbers,organizations,emailAddresses,names'
}).then(result => {
resolve(result);
}, reject); // forward failures so the subscriber's error handler actually fires
})
).pipe(map((result: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
return result; // map is not really required if you are not changing anything in the response; you can just return the from() and let the caller subscribe to it.
}));
}


// caller


this.gapi.fetchContacts().subscribe(((rsp: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
// handle rsp
}), (error) => {
// handle error
});

RxJS sequence equivalent to promise.then()?

For example

function getdata1 (argument) {
return this.http.get(url)
.map((res: Response) => res.json());
}


function getdata2 (argument) {
return this.http.get(url)
.map((res: Response) => res.json());
}


// call the functions to get the observables before subscribing
getdata1().subscribe((data1: any) => {
console.log("got data one. get data 2 now");
getdata2().subscribe((data2: any) => {
console.log("got data one and two here");
});
});