在 RxJava 中,如何在链接可观察对象时传递变量?

我正在使用 RxJava 连接异步操作,我想传递一些下游变量:

Observable
.from(modifications)
.flatmap( (data1) -> { return op1(data1); })
...
.flatmap( (data2) -> {
// How to access data1 here ?
return op2(data2);
})

这似乎是一个常见的模式,但我无法找到有关它的信息。

26746 次浏览

One possibility would be to use a function call:

private static Observable<T> myFunc(final Object data1) {
return op1(data1)
...
.flatmap( (data2) -> {
// I can access data1 here
return op2(data2);
});
}


Observable
.from(modifications)
.flatmap( (data1) -> { return myFunc(data1); })

BUT: correct me if I'm wrong but it doesn't feel like the reactive-programming way of doing it

Another possibility is to map the result of op1 to a org.apache.commons.lang3.tuple.Pair that contains the variable and pass that along:

Observable
.from(modifications)
.flatmap( (data1) -> {
return op1(data1).map( obj -> { return Pair.of(data1,obj); });
})
...
.flatmap( (dataPair) -> {
// data1 is dataPair.getLeft()
return op2(dataPair.getRight());
})

It works but it feels a bit uncomfortable to have variables hidden inside a Pair/Triple/... and it gets very verbose if you use the Java 6 notation.

I wonder if there is a better solution, maybe some RxJava operator could help?

The advice I got from the Couchbase forum is to use nested observables:

Observable
.from(modifications)
.flatmap( (data1) -> {
return op1(data1)
...
.flatmap( (data2) -> {
// I can access data1 here
return op2(data2);
})
});

EDIT: I'll mark this as the accepted answer as it seems to be the most recommended. If your processing is too complex to nest everything you can also check the solution with function calls.

solution on this thread works, but for complex chains it makes code difficult to read, I had to pass multiple values and what i did was create a private class with all parameters, I find code to be more readable this way,

private class CommonData{
private string data1;
private string data2;


*getters and setters*
}
...
final CommonData data = new CommonData();
Observable
.from(modifications)
.flatmap( (data1) -> {
data.setData1(data1);
return op1(data1);
})
...
.flatmap( (data2) -> {
data2 = data.getData1() + "data 2... ";
data.setData2(data2);
return op2(data2);
})

hope it helps

flatmap can take a second arg:

Observable.just("foo")
.flatMap(foo -> Observable.range(1, 5), Pair::of)
.subscribe(pair -> System.out.println("Result: " + pair.getFirst() + " Foo: " + pair.getSecond()));

source: https://medium.com/rxjava-tidbits/rxjava-tidbits-1-use-flatmap-and-retain-original-source-value-4ec6a2de52d4

You can use "global" variable to achive this:

 Object[] data1Wrapper = new Object[]{null};
Object[] data2Wrapper = new Object[]{null};
Observable
.from(modifications)
.flatmap(data1 -> {
data1Wrapper[0] = data1;
return op1(data1)
})
...
.flatmap(data2 -> {
// I can access data1 here use data1Wrapper[0]
Object data1 = data1Wrapper[0];
data2Wrapper[0] = data2;
return op2(data2);
})

I know this is an old question, but using RxJava2 & lambda, You can use something like:

Observable
.from(modifications)
.flatMap((Function<Data1, ObservableSource<Data2>>) data1 -> {
//Get data 2 obeservable


return Observable.just(new Data2())
}
}, Pair::of)

On the next flow (flatmap/map) your output pair will be (data1, data2)

Actually we have library, that simplify call chains.

https://github.com/pakoito/Komprehensions

Adding as Gradle dependency:

implementation 'io.reactivex.rxjava2:rxjava:2.2.1'
implementation 'com.github.pakoito.Komprehensions:komprehensions-rx2:1.3.2'

Usage (Kotlin):

val observable = doFlatMap(
{ Observable.from(modifications) },
{ data1 -> op1(data1) },
{ data1, data2 -> op2(data2) },
{ data1, data2, data3 -> op3(data1, data2, data3) }
)

you can use resultSelector BiFunction<? super T, ? super U, ? extends R> resultSelector the second parameter in flatmap, you can choose which result to return.