RxJS: wait for all observables in an array to complete (or error)

I'm pushing observables into an array like so...

var tasks$ = [];
tasks$.push(Observable.timer(1000));
tasks$.push(Observable.timer(3000));
tasks$.push(Observable.timer(10000));

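I would like an observable that emits when all of tasks$ have completed. Keep in mind that, in practice, tasks$ doesn't contain a known number of observables.

I've tried Observable.zip(tasks$).subscribe(), but it seems to fail when there is only one task, which leads me to believe that zip requires an even number of elements to work the way I expect.

I've tried Observable.concat(tasks$).subscribe(), but the result of the concat operator just seems to be an array of observables, i.e. basically the same as the input. You can't even call subscribe on it.

In C# this would be akin to Task.WhenAll(). In ES6 it would be akin to Promise.all().

I've come across a number of SO questions, but they all seem to deal with waiting on a known number of streams (e.g. mapping them together).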


If you want to compose an observable that emits when all of the source observables complete, you can use forkJoin:

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/forkJoin';
import 'rxjs/add/observable/timer'; // needed for Observable.timer
import 'rxjs/add/operator/first';

var tasks$ = [];
tasks$.push(Observable.timer(1000).first());
tasks$.push(Observable.timer(3000).first());
tasks$.push(Observable.timer(10000).first());
Observable.forkJoin(...tasks$).subscribe(results => { console.log(results); });
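
For readers on a newer RxJS, here is a minimal sketch of the same idea, assuming RxJS 6 or later, where forkJoin is a standalone function that accepts the array directly. The `of` sources are hypothetical stand-ins for the timers above, chosen only because they complete immediately; `emitted` and `completed` are illustrative names.

```typescript
import { forkJoin, of, Observable } from 'rxjs';

// Hypothetical stand-ins for the question's timers: `of` emits its
// arguments synchronously and then completes.
const tasks$: Observable<number>[] = [];
tasks$.push(of(1));
tasks$.push(of(2, 3)); // forkJoin keeps only the last value of each source
tasks$.push(of(4));

const emitted: number[][] = [];
let completed = false;

// The array can hold any number of observables; no spread is needed here.
forkJoin(tasks$).subscribe({
  next: results => emitted.push(results), // one array, in input order
  complete: () => { completed = true; },
});

console.log(emitted, completed);
```

forkJoin emits once, after every source has completed, and then completes itself. If any source errors, forkJoin errors right away, and with an empty array it completes without emitting, so callers that need a value in those cases have to handle them separately.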

For me, the sample operator was the best solution.

const source = Observable.interval(500);
const example = source.sample(Observable.interval(2000));
const subscribe = example.subscribe(val => console.log('sample', val));

So only when the second observable (the notifier passed to sample, here Observable.interval(2000)) emits will you see the last emitted value of the first (source).

In my case, I wait for form validation and another DOM event.

You can use zip.

Combines multiple Observables to create an Observable whose values are calculated from the values, in order, of each of its input Observables.

const obsvA = this._service.getObjA();
const obsvB = this._service.getObjB();
// or with array
// const obsvArray = [obsvA, obsvB];


const zip = Observable.zip(obsvA, obsvB);
// or with array
// const zip = Observable.zip(...obsvArray);
zip.subscribe(
result => console.log(result), // result is an array with the responses [respA, respB]
);

Things to consider:

  • Doesn't need to be an even number of observables.
  • As said here,

    The zip operator will subscribe to all inner observables, waiting for each to emit a value. Once this occurs, all values with the corresponding index will be emitted. This will continue until at least one inner observable completes.

  • When one of the observables throws an error (or even both of them), the subscription is closed (the error callback is called and the complete callback is not), and with an onError handler you only get the first error.

zip.subscribe(
  result => console.log(result), // result is an array with the responses [respA, respB]
  error => console.log(error), // receives the first error thrown by any input; the subscription then closes
  () => console.log('completed') // called only when zip completes without an error
);
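
The points above can be checked with a small sketch, assuming RxJS 6 or later where zip is imported as a standalone function. The `of` sources and the `pairs` and `single` arrays are hypothetical names used only for illustration.

```typescript
import { zip, of } from 'rxjs';

// Two sources of different lengths: zip pairs values by index and
// completes once the shorter source has completed.
const pairs: Array<[number, string]> = [];
zip(of(1, 2, 3), of('a', 'b')).subscribe(pair => pairs.push(pair));

// A single source also works, so the count does not have to be even.
const single: Array<[number]> = [];
zip(of(10, 20)).subscribe(values => single.push(values));

console.log(pairs);  // the value 3 is never paired, so it is never emitted
console.log(single);
```

Because zip emits once per index rather than once at the end, it fits the "wait for everything" case only when each source emits exactly one value.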
    
import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';

// Waits for all observables, regardless of whether each one succeeds or fails.
// Emits a single array with one item per input, in input order.
// Each item is either the first value of the observable or its error.
// Note: the result is a plain Subject, so subscribe to it before the inputs
// emit; it emits once and never completes.
export function waitAll(args: Observable<any>[]): Observable<any[]> {
  const final = new Subject<any[]>();
  const flags = new Array(args.length);  // flags[i] is true once input i has settled
  const result = new Array(args.length);
  let total = args.length;               // number of inputs still pending
  for (let i = 0; i < args.length; i++) {
    flags[i] = false;
    args[i].subscribe(
      res => {
        console.info('waitAll ' + i + ' ok ', res);
        if (flags[i] === false) {
          flags[i] = true;
          result[i] = res;
          total--;
          if (total < 1) {
            final.next(result);
          }
        }
      },
      error => {
        console.error('waitAll ' + i + ' failed ', error);
        if (flags[i] === false) {
          flags[i] = true;
          result[i] = error;
          total--;
          if (total < 1) {
            final.next(result);
          }
        }
      }
    );
  }
  return final.asObservable();
}
    

Unit test:

describe('waitAll', () => {
  it('should wait for all observables', () => {
    const o1 = new Subject();
    const o2 = new Subject();
    const o3 = new Subject();

    const o = waitAll([o1, o2, o3]);
    const res = {arr: []};
    o.subscribe(result => res.arr = result, err => res.arr = []);

    expect(res.arr).toEqual([]);
    o1.next('success1');
    expect(res.arr).toEqual([]);
    o2.error('failed2');
    expect(res.arr).toEqual([]);
    o3.next('success3');
    expect(res.arr).toEqual(['success1', 'failed2', 'success3']);

    // Later emissions are ignored once every input has settled.
    o1.next('success1*');
    expect(res.arr).toEqual(['success1', 'failed2', 'success3']);
    o2.error('failed2*');
    expect(res.arr).toEqual(['success1', 'failed2', 'success3']);
    o3.next('success3*');
    expect(res.arr).toEqual(['success1', 'failed2', 'success3']);
  });
});
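
As a point of comparison, similar "first value or error per input" behavior can probably be composed from built-in operators. This is only a sketch under assumptions: it targets RxJS 6 or later, and `waitAllSettled`, `failing`, and `settled` are hypothetical names that do not appear in the answer above.

```typescript
import { forkJoin, of, Observable } from 'rxjs';
import { catchError, take } from 'rxjs/operators';

// Hypothetical helper: resolves each input to its first value or its error,
// then emits all of them as one array in input order.
function waitAllSettled(sources: Observable<unknown>[]): Observable<unknown[]> {
  return forkJoin(
    sources.map(source =>
      source.pipe(
        take(1),                    // keep only the first value, then complete
        catchError(err => of(err)), // turn an error into an ordinary value
      ),
    ),
  );
}

// An observable that fails immediately, built without version-specific helpers.
const failing = new Observable<string>(subscriber => subscriber.error('failed2'));

const settled: unknown[][] = [];
waitAllSettled([of('success1', 'ignored'), failing, of('success3')])
  .subscribe(result => settled.push(result));

console.log(settled);
```

One difference from waitAll: if an input completes without emitting anything, forkJoin completes without emitting a result, so this sketch assumes every input either emits at least once or errors.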