Combine a list of Observables and wait until all have completed

TL;DR: How do I convert Task.whenAll(List<Task>) into RxJava?

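My existing code uses Bolts to build up a list of asynchronous tasks and waits until all of those tasks finish before performing other steps. Essentially, it builds up a List<Task> and returns a single Task that is marked as completed when all tasks in the list complete, as per the example on the Bolts site.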

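I'm looking to replace Bolts with RxJava, and I'm assuming this approach of building up a list of async tasks (size not known in advance) and wrapping them all into a single Observable is possible, but I don't know how.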

I've tried looking at merge, zip, concat etc., but I can't get them to work on the List<Observable> that I'd be building up, as they all seem geared to working on just two Observables at a time, if I understand the docs correctly.

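I'm trying to learn RxJava and am still very new to it, so forgive me if this is an obvious question or explained somewhere in the docs; I have tried searching. Any help would be much appreciated.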


You probably looked at the zip operator that works with 2 Observables.

There is also the static method Observable.zip. It has one form which should be useful for you:

zip(java.lang.Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction)

You can check out the javadoc for more.

It sounds like you're looking for the Zip operator.

There are a few different ways of using it, so let's look at an example. Say we have a few simple observables of different types:

Observable<Integer> obs1 = Observable.just(1);
Observable<String> obs2 = Observable.just("Blah");
Observable<Boolean> obs3 = Observable.just(true);

The simplest way to wait for them all is something like this:

Observable.zip(obs1, obs2, obs3, (Integer i, String s, Boolean b) -> i + " " + s + " " + b)
.subscribe(str -> System.out.println(str));

Note that in the zip function, the parameters have concrete types that correspond to the types of the observables being zipped.

Zipping a list of observables is also possible, either directly:

List<Observable<?>> obsList = Arrays.asList(obs1, obs2, obs3);


Observable.zip(obsList, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

...or by wrapping the list into an Observable<Observable<?>>:

Observable<Observable<?>> obsObs = Observable.from(obsList);


Observable.zip(obsObs, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

However, in both of these cases the zip function can only accept a single Object[] parameter, since neither the types nor the number of the observables in the list are known in advance. This means that the zip function has to check the number of parameters and cast them accordingly.
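To make the "check and cast" step concrete, here is a minimal plain-Java sketch of what such a zip function body might look like. The class name ZipArgs and the method combine are hypothetical names introduced for illustration, and the sketch assumes the three observables from the example above (Integer, String, Boolean, in that order).

```java
final class ZipArgs {
    // Hypothetical helper: validates and casts the Object[] that zip passes
    // to its function when the sources come from a list.
    static String combine(Object... args) {
        if (args.length != 3) {
            throw new IllegalArgumentException("expected 3 values, got " + args.length);
        }
        // The casts mirror the order of the observables in the list.
        Integer i = (Integer) args[0];
        String s = (String) args[1];
        Boolean b = (Boolean) args[2];
        return i + " " + s + " " + b;
    }

    public static void main(String[] unused) {
        System.out.println(combine(1, "Blah", true)); // prints "1 Blah true"
    }
}
```

With the list from above, this could presumably be plugged in as Observable.zip(obsList, args -> ZipArgs.combine(args)).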

Regardless, all of the above examples will eventually print 1 Blah true

EDIT: When using Zip, make sure that the Observables being zipped all emit the same number of items. In the above examples all three observables emitted a single item. If we were to change them to something like this:

Observable<Integer> obs1 = Observable.from(new Integer[]{1,2,3}); //Emits three items
Observable<String> obs2 = Observable.from(new String[]{"Blah","Hello"}); //Emits two items
Observable<Boolean> obs3 = Observable.from(new Boolean[]{true,true}); //Emits two items

Then 1, Blah, true and 2, Hello, true would be the only items passed into the zip function(s). The item 3 would never be zipped since the other observables have completed.
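The pairing rule can be modelled without RxJava at all. The following is only a plain-Java sketch of the behaviour described above, not the library's implementation; ZipTruncation and zipAll are hypothetical names, and each list stands in for the items one observable emits.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ZipTruncation {
    // Model of zip's pairing rule: row n is built from the n-th item of
    // every source, and rows stop at the length of the shortest source.
    static List<String> zipAll(List<List<?>> sources) {
        int rows = sources.isEmpty() ? 0 : Integer.MAX_VALUE;
        for (List<?> source : sources) {
            rows = Math.min(rows, source.size());
        }
        List<String> out = new ArrayList<>();
        for (int n = 0; n < rows; n++) {
            StringBuilder row = new StringBuilder();
            for (int k = 0; k < sources.size(); k++) {
                if (k > 0) {
                    row.append(' ');
                }
                row.append(sources.get(k).get(n));
            }
            out.add(row.toString());
        }
        return out;
    }

    public static void main(String[] unused) {
        List<List<?>> sources = new ArrayList<>();
        sources.add(Arrays.asList(1, 2, 3));          // three items
        sources.add(Arrays.asList("Blah", "Hello"));  // two items
        sources.add(Arrays.asList(true, true));       // two items
        System.out.println(zipAll(sources)); // prints "[1 Blah true, 2 Hello true]"
    }
}
```

The third item of the first source never appears in the output, which matches the behaviour described for zip.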

You can use flatMap if the set of tasks is composed dynamically. Something like this:

public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) {
    return Observable.from(tasks)
            // execute in parallel: subscribeOn makes each task run on its own
            // computation thread (observeOn would only move the emitted results)
            .flatMap(task -> task.subscribeOn(Schedulers.computation()))
            // wait until all tasks are executed
            // be aware: every observable must signal completion,
            // otherwise you will wait forever
            .toList()
            // could implement more intelligent logic, e.g. check that everything is successful
            .map(results -> true);
}

Another good example of parallel execution

Note: I do not really know your requirements for error handling. For example, what to do if only one task fails. I think you should verify this scenario.

I'm writing some computation-heavy code in Kotlin with RxJava Observables and RxKotlin. I want to wait for a list of observables to complete while receiving updates with the progress and the latest result along the way. At the end it returns the best calculation result. An extra requirement was to run the Observables in parallel to use all my CPU cores. I ended up with this solution:

@Volatile var results: MutableList<CalculationResult> = mutableListOf()

fun doALotOfCalculations(listOfCalculations: List<Calculation>): Observable<Pair<String, CalculationResult>> {
    return Observable.create { subscriber ->
        // concatEager subscribes to all sources at once but emits results in list order
        Observable.concatEager(listOfCalculations.map { calculation: Calculation ->
            // doCalculation returns an Observable with only one result
            doCalculation(calculation).subscribeOn(Schedulers.computation())
        }).subscribeBy(
            onNext = {
                results.add(it)
                subscriber.onNext(Pair("A calculation is ready", it))
            },
            onComplete = {
                subscriber.onNext(Pair("Finished: ${results.size}", findBestCalculation(results)))
                subscriber.onComplete()
            },
            onError = {
                subscriber.onError(it)
            }
        )
    }
}

With Kotlin

Observable.zip(obs1, obs2, BiFunction { t1 : Boolean, t2:Boolean ->


})

It's important to set the types of the function's arguments, or you will get compilation errors.

The type of the last argument changes with the number of sources: BiFunction for 2, Function3 for 3, Function4 for 4, and so on.

Of the suggestions proposed, zip() actually combines the observables' results with each other, which may or may not be wanted, but it was not asked for in the question. All the question wanted was execution of each of the operations, either one by one or in parallel (this was not specified, but the linked Bolts example was about parallel execution). Also, zip() completes as soon as any of the observables completes, so it violates the requirements.

For parallel execution of Observables, the flatMap() presented in the other answer is fine, but merge() would be more straightforward. Note that merge exits on the first error from any of the Observables; if you would rather postpone the exit until all observables have finished, look at mergeDelayError().

For one-by-one execution, I think the static Observable.concat() method should be used. Its javadoc states:

concat(java.lang.Iterable<? extends Observable<? extends T>> sequences) Flattens an Iterable of Observables into one Observable, one after the other, without interleaving them

which sounds like what you're after if you don't want parallel execution.

Also, if you're only interested in the completion of your task, not return values, you should probably look into Completable instead of Observable.

TL;DR: for one-by-one execution of tasks with a completion event once they have all finished, I think Completable.concat() is best suited. For parallel execution, Completable.merge() or Completable.mergeDelayError() sounds like the solution. The former stops immediately on an error from any completable; the latter executes them all even if one of them fails, and only then reports the error.

I had a similar problem: I needed to fetch search items from a REST call while also integrating saved suggestions from a RecentSearchProvider.AUTHORITY and combining them into one unified list. I tried to use @MyDogTom's solution, but unfortunately there is no Observable.from in RxJava 2 (it was split into fromArray, fromIterable and so on). After some research I got a solution that worked for me.

fun getSearchedResultsSuggestions(context: Context, query: String): Single<ArrayList<ArrayList<SearchItem>>> {
    val fetchedItems = ArrayList<Observable<ArrayList<SearchItem>>>(0)
    fetchedItems.add(fetchSearchSuggestions(context, query).toObservable())
    fetchedItems.add(getSearchResults(query).toObservable())

    return Observable.fromArray(fetchedItems)
        .flatMapIterable { data -> data }
        .flatMap { task -> task.observeOn(Schedulers.io()) }
        .toList()
        .map { ArrayList(it) }
}

I created an observable from the array of observables, which contain the lists of suggestions and the results from the internet for the given query. After that you just go over those tasks with flatMapIterable and run them using flatMap, placing the results in an array that can later be shown in a RecyclerView.

If you use Project Reactor, you can use Mono.when.

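Mono.when(publisher1, publisher2)
    // In Reactor 3.1+ Mono.when returns Mono<Void>, which completes without
    // emitting an item, so a map() here would never run; use doOnSuccess instead.
    .doOnSuccess(ignored -> System.out.println("everything is done!"))
    .block();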