How to chain two Completables in RxJava 2

I have two Completables. I would like the following scenario: if the first Completable reaches onComplete, continue with the second Completable. The final result would be the onComplete of the second Completable.

This is how I do it when I have a Single, getUserIdAlreadySavedInDevice(), and a Completable, login():

@Override
public Completable loginUserThatIsAlreadySavedInDevice(String password) {
    // Take the user id emitted by the Single and continue with the login Completable
    return getUserIdAlreadySavedInDevice()
            .flatMapCompletable(s -> login(password, s));
}

You are looking for the andThen operator.

Returns a Completable that first runs this Completable and then the other completable.

firstCompletable
.andThen(secondCompletable)

In general, this operator is a "replacement" for a flatMap on Completable:

Completable       andThen(CompletableSource next)
<T> Maybe<T>      andThen(MaybeSource<T> next)
<T> Observable<T> andThen(ObservableSource<T> next)
<T> Flowable<T>   andThen(Publisher<T> next)
<T> Single<T>     andThen(SingleSource<T> next)

Try

Completable.concat

Returns a Completable which completes only when all sources complete, one after another.

http://reactivex.io/RxJava/javadoc/io/reactivex/Completable.html#concat(java.lang.Iterable)

TL;DR: the other answers miss a subtlety. Use doThingA().andThen(doThingB()) if you want the equivalent of concat; use doThingA().andThen(Completable.defer(() -> doThingB())) if you want the equivalent of flatMap.


EDIT: a more complete reference

  • flatMap() is the mapping version of merge()
  • concatMap() is the mapping version of concat()
  • For a Completable you need defer() to make the function call lazy, the way the mapping functions for Single or Observable already are. Preferably, write your methods so that nothing happens until subscribe: this is a good convention to follow, and it is used in the official Rx libraries as well as every Rx extension I've encountered. (For advanced users: this convention describes cold completables only, but most people can ignore that.)
  • the only difference between Completable.concatArray(a, b) and a.andThen(b) is syntax (concat itself takes an Iterable or a Publisher of sources)

Some examples:

  • foo(a).andThen(bar(b)) will:

    1. call foo(a)
    2. immediately call bar(b), even if the completable returned by step 1 later signals an error
    3. subscribe to whatever step 1 returns
    4. subscribe to the result of bar(b) only if the previous step completed successfully
  • foo(a).andThen(Completable.defer(() -> bar(b))) will:

    1. call foo(a)
    2. subscribe to the result of step 1
    3. call bar(b), and subscribe to its result, only if the completable returned by foo(a) succeeds
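
The two orderings above follow from Java evaluating method arguments before the call itself, so they can be reproduced without RxJava on the classpath. Below is a minimal JDK-only sketch under that assumption. MiniCompletable is a hypothetical stand-in, not RxJava's API, and foo, bar and the log strings are illustrative names. In the main method, foo fails when subscribed to:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

final class EagerVsDeferred {

    /** Hypothetical stand-in for a Completable; NOT RxJava's API. */
    static final class MiniCompletable {
        // Runs on subscribe; true models onComplete, false models onError.
        private final BooleanSupplier onSubscribe;

        private MiniCompletable(BooleanSupplier onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        static MiniCompletable of(BooleanSupplier work) {
            return new MiniCompletable(work);
        }

        // Models defer: the factory is invoked only at subscribe time.
        static MiniCompletable defer(Supplier<MiniCompletable> factory) {
            return new MiniCompletable(() -> factory.get().subscribe());
        }

        // Models andThen: subscribe to next only if this one completed.
        MiniCompletable andThen(MiniCompletable next) {
            return new MiniCompletable(() -> subscribe() && next.subscribe());
        }

        boolean subscribe() {
            return onSubscribe.getAsBoolean();
        }
    }

    private final List<String> log = new ArrayList<>();
    private final boolean fooSucceeds;

    private EagerVsDeferred(boolean fooSucceeds) {
        this.fooSucceeds = fooSucceeds;
    }

    MiniCompletable foo() {
        log.add("call foo"); // happens when the method is called
        return MiniCompletable.of(() -> { log.add("subscribe foo"); return fooSucceeds; });
    }

    MiniCompletable bar() {
        log.add("call bar"); // happens when the method is called
        return MiniCompletable.of(() -> { log.add("subscribe bar"); return true; });
    }

    // bar() is evaluated as an argument, before anything is subscribed.
    static List<String> eager(boolean fooSucceeds) {
        EagerVsDeferred d = new EagerVsDeferred(fooSucceeds);
        d.foo().andThen(d.bar()).subscribe();
        return d.log;
    }

    // bar() is invoked only after foo's result has completed.
    static List<String> deferred(boolean fooSucceeds) {
        EagerVsDeferred d = new EagerVsDeferred(fooSucceeds);
        d.foo().andThen(MiniCompletable.defer(d::bar)).subscribe();
        return d.log;
    }

    public static void main(String[] args) {
        System.out.println(eager(false));    // [call foo, call bar, subscribe foo]
        System.out.println(deferred(false)); // [call foo, subscribe foo]
    }
}
```

With a foo that succeeds, both variants subscribe to bar's result; the only difference is whether "call bar" is logged before or after "subscribe foo".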

I'm going to leave out the treatment of merge() since it gets a bit more complicated, but long story short that's the one to call if you want "parallelism".


The above answers are sort of correct, but I found them misleading because they miss a subtlety about eager evaluation.

doThingA().andThen(doThingB()) will call doThingB() immediately, but will only subscribe to the Completable returned by doThingB() once the Completable returned by doThingA() completes.

doThingA().andThen(Completable.defer(() -> doThingB())) will call doThingB() only after thing A has completed.

This matters only if doThingB() has side effects before it is subscribed to, e.g. Single.just(sideEffect(1)).toCompletable().

An implementation that doesn't have side effects before the subscribe event (a true cold observable) might be Single.just(1).doOnSuccess(i -> sideEffect(i)).toCompletable().

In the case that has just bitten me, thing A is some validation logic and doThingB() immediately kicks off an asynchronous database update that completes a Vert.x ObservableFuture. This is bad. Arguably doThingB() should be written to update the database only upon subscribe, and I'm going to try to design things that way in the future.
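
To make "only upon subscribe" concrete: the method body should do nothing except describe the work. With RxJava that would typically mean wrapping the update in something like Completable.fromAction. The sketch below models the same idea with a plain Runnable so it runs on the JDK alone; run() plays the role of subscribe, and updateEagerly, updateOnSubscribe and the dbWrites counter are hypothetical names, not part of any library:

```java
import java.util.concurrent.atomic.AtomicInteger;

final class LazyUpdateSketch {

    // Eager: the write happens as soon as the method is called,
    // before anything has "subscribed" (here: before run() is invoked).
    static Runnable updateEagerly(AtomicInteger dbWrites) {
        dbWrites.incrementAndGet();
        return () -> { };
    }

    // Lazy: the method only describes the work; the write happens in run().
    static Runnable updateOnSubscribe(AtomicInteger dbWrites) {
        return () -> dbWrites.incrementAndGet();
    }

    // Number of writes observed after merely calling the method.
    static int writesAfterCall(boolean lazy) {
        AtomicInteger dbWrites = new AtomicInteger();
        Runnable unused = lazy ? updateOnSubscribe(dbWrites) : updateEagerly(dbWrites);
        return dbWrites.get();
    }

    // Number of writes observed after calling the method and running the result.
    static int writesAfterRun(boolean lazy) {
        AtomicInteger dbWrites = new AtomicInteger();
        Runnable task = lazy ? updateOnSubscribe(dbWrites) : updateEagerly(dbWrites);
        task.run();
        return dbWrites.get();
    }

    public static void main(String[] args) {
        System.out.println(writesAfterCall(false)); // 1
        System.out.println(writesAfterCall(true));  // 0
        System.out.println(writesAfterRun(true));   // 1
    }
}
```

A method written the lazy way is safe to pass straight to andThen, because calling it early has no effect until the result is actually run.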

I had the same problem and had to use the concatWith operator to make it work. In my case, I have two functions that return Completable.

fun makeTwoThings(): Completable {
    // The second Completable is subscribed to only after the first one completes
    return makeFirstThing().concatWith(makeSecondThing())
}

fun makeFirstThing(): Completable {
    TODO()
}

fun makeSecondThing(): Completable {
    TODO()
}

Note that the most upvoted answer here is a little misleading. The example below walks through a few test scenarios to show how Completable chaining with the andThen operator behaves.

private fun doThingAFail(): Completable {
    print("Do thingA Called\n")
    return Completable.fromCallable {
        print("calling stream A\n")
        throw(Exception("The excep"))
    }
}

private fun doThingB(): Completable {
    print("Do thingB Called\n")
    return Completable.fromCallable {
        print("calling stream B\n")
    }
}

private fun doThingA(): Completable {
    print("Do thingA Called\n")
    return Completable.fromCallable {
        print("calling stream A\n")
    }
}


Observe that for the test below:

@Test
fun testCallAPlusB() {
doThingA().andThen(doThingB())
}

the output would be:

Do thingA Called
Do thingB Called

Quick note here: Observe that we are not subscribing to these Completables in this snippet.

For the test:

@Test
fun theTestSubscribe() {
doThingA().andThen(doThingB()).subscribe()
}

The output would be:

Do thingA Called
Do thingB Called
calling stream A
calling stream B


Lastly, if the first Completable fails, the second Completable is not executed.

@Test
fun theTestFailThingA() {
doThingAFail().andThen(doThingB()).subscribe()
}

the output would be:

Do thingA Called
Do thingB Called
calling stream A

The key concept here is that the code in the method body and the code inside the Completable do not run at the same time. The "Do thingA Called" and "Do thingB Called" lines are printed as soon as doThingA() and doThingB() are called, whereas the "calling stream A" and "calling stream B" lines are printed only when someone subscribes to the Completables those methods return.

The second concept is how the andThen operator handles errors. In the example above, when the Completable returned by doThingAFail() ends with an error, the stream terminates and the "calling stream B" line is never printed.