How to 'wait' for two observables in RxJS

在我的应用程序中,我有这样的东西:

this._personService.getName(id)
.concat(this._documentService.getDocument())
.subscribe((response) => {
console.log(response)
this.showForm()
});


//Output:
// [getnameResult]
// [getDocumentResult]


// I want:
// [getnameResult][getDocumentResult]

然后我得到两个分离的结果,首先是 _personService,然后是 _documentService。如何等待两个结果,然后再调用 this.showForm()来完成然后操作每个结果。

119107 次浏览

You can use 'zip' or 'buffer' like the following.

function getName() {
return Observable.of('some name').delay(100);
}


function getDocument() {
return Observable.of('some document').delay(200);
}


// CASE1 : concurrent requests
Observable.zip(getName(), getDocument(), (name, document) => {
return `${name}-${document}`;
})
.subscribe(value => console.log(`concurrent: ${value}`));


// CASE2 : sequential requests
getName().concat(getDocument())
.bufferCount(2)
.map(values => `${values[0]}-${values[1]}`)
.subscribe(value => console.log(`sequential: ${value}`));

Have a look at the 'combineLatest' method, it might be appropriate here. Http://reactivex.io/rxjs/class/es6/observable.js~observable.html#static-method-combinelatest

const { Observable } = Rx


const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();


Observable
.combineLatest(name$, document$, (name, document) => ({ name, document }))
.first() // or not, implementation detail
.subscribe(({ name, document }) => {
// here we have both name and document
this.showForm()
})

最后更新: 2022年3月。

RxJS v7: 组合 LatestWith

来自 ReactiveX 文件:

无论何时,任何输入 Observer 发出一个值,它使用来自所有输入的最新值计算一个公式,然后发出该公式的输出。

// Observables to combine
const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();
    

name$.pipe(
combineLatestWith($document)
)
.subscribe(([name, document]) => {
this.name = name;
this.document = pair.document;
this.showForm();
})

(弃用) RxJS v6 bineLatest ()

来自 ReactiveX 文件:

Whenever any input Observable emits a value, it computes a formula using the latest values from all the inputs, then emits the output of that formula.

(Update: Feb, 2021):

// Deprecated (RxJS v6)
// Observables to combine
const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();
    

name$.combineLatest(document$, (name, document) => {name, document})
.subscribe(pair => {
this.name = pair.name;
this.document = pair.document;
this.showForm();
})

(替代语法) : comineLatest (观察到的)

// Deprecated (RxJS v6)
// Observables to combine
const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();
    

combineLatest(name$, document$, (name, document) => ({name, document}))
.subscribe(pair => {
this.name = pair.name;
this.document = pair.document;
this.showForm();
})

Zip 对组合

(Update: Oct, 2018) 我之前建议使用 zip方法。然而,对于某些用例,combineLatestzip有一些优势。所以了解这些差异很重要。

CombineLatest从可观测数据中发出最新的发出值。而 zip方法按 序列顺序发出发出的项。

For example if observable #1 emits its 第三名 item and observable #2 has emitted its 第五名 item. The result using zip method will be the 第三名 emitted values of both observables.

在这种情况下,使用 combineLatest的结果将是 第五名第三名。这感觉更自然。


Zip (观察到的)

(原答案: 2017年7月) Observer able.zip 方法在 ReactiveX 文档中有解释:

组合多个可观测数据,创建一个可观测数据,其值是根据每个输入可观测数据的值按顺序计算出来的。

// Observables to combine
const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();
    

Observable
.zip(name$, document$, (name: string, document: string) => ({name, document}))
.subscribe(pair => {
this.name = pair.name;
this.document = pair.document;
this.showForm();
})

附注(适用于两种方法)

The last parameter, where we have provided a function, (name: string, document: string) => ({name, document}) is optional. You can skip it, or do more complex operations:

如果最新的参数是一个函数,则使用该函数从输入值计算创建的值。否则,将返回输入值的数组。

So if you skip the last part, you get an array:

// Observables to combine
const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();
    

Observable
.zip(name$, document$)
.subscribe(pair => {
this.name = pair['0'];
this.document = pair['1'];
this.showForm();
})

使用观测值的 forkJoin()方法

来自 RXJS 医生

当您有一组可观测数据并且只关心每个数据的最终发出值时,最好使用此运算符。这种情况的一个常见用例是,如果您希望在页面加载时发出多个请求(或其他事件) ,并且只希望在为所有人接收到响应时采取操作。这种方式类似于您可能使用 Promise.all的方式

forkJoin([character, characterHomeworld]).subscribe(results => {
// results[0] is our character
// results[1] is our character homeworld
results[0].homeworld = results[1];
this.loadedCharacter = results[0];
});

代码来自: https://coryrylan.com/blog/angular-multiple-http-requests-with-rxjs

对我来说,这个 样本是最好的解决方案。

const source = Observable.interval(500);
const example = source.sample(Observable.interval(2000));
const subscribe = example.subscribe(val => console.log('sample', val));

所以. . 只有当第二个(例子)发出-您将看到第一个(源)的最后发出值。

In my task, I wait form validation and other DOM event.

The 用于傻瓜的 RxJS 操作符: forkJoin,zip,comineLatest,withLatestFrom helped me a lot. As the name states it describes the following combination operators:

他们中的任何一个都可能是你要找的东西,这取决于案情。查看文章了解更多信息。

改进了 哈米德 · 阿斯加里回答,它使用直接参数分解和自动添加类型(当您使用类型脚本时)

const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();


combineLatest([name$, document$]).subscribe(([name, document]) => {
this.name = name;
this.document = document;
this.showForm();
});

额外的好处 : 您也可以使用以下方法处理错误

import { combineLatest, of } from 'rxjs';
//...


const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();


combineLatest([
name$.pipe(     catchError( () => of(null as string  ) ) ),
document$.pipe( catchError( () => of(null as Document) ) ), // 'Document' is arbitrary type
]).subscribe(([name, document]) => {
this.name = name;          // or null if error
this.document = document;  // or null if error
this.showForm();
});

June 2021

与 rxjs 6.6.7

像这样使用 最新合并,否则不推荐使用

combineLatest([a$ , b$]).pipe(
map(([a, b]) => ({a, b})) //change to [a , b] if you want an array
)

Also see @nyxz post

爱情鸟,总是作为一个团队工作,触发只有当所有 可观测数据返回新值

组合最新 -各就各位,开始触发一次所有观察 返回新的值,然后不等人,每次触发时 可观察的返回新值。

随着 LatestFrom -主人奴隶,主人先等待奴隶,然后 只有当主人返回新的时候,操作才会被触发 价值。

forkJoin - the final destination, trigger once when all observables 已经完成了。

发信人: https://scotch.io/tutorials/rxjs-operators-for-dummies-forkjoin-zip-combinelatest-withlatestfrom/amp