如何使用 RxJava2的 CompositeDisposable?

在 RxJava1中有 CompositeSubscription,但是在 RxJava2中没有,在 rxJava2中有一些 CompositeDisposable。如何在 RxJava2中使用 CompositeDisposable 或 Disposable?

62932 次浏览
private final CompositeDisposable disposables = new CompositeDisposable();




// adding an Observable to the disposable
disposables.add(sampleObservable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<String>() {
@Override
public void onComplete() {
}


@Override
public void onError(Throwable e) {
}


@Override
public void onNext(String value) {
}
}));


static Observable<String> sampleObservable() {
return Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() throws Exception {
// Do some long running operation
SystemClock.sleep(2000);
return Observable.just("one", "two", "three", "four", "five");
}
});
}




// Using clear will clear all, but can accept new disposable
disposables.clear();
// Using dispose will clear all and set isDisposed = true, so it will not accept any new disposable
disposables.dispose();

参考文章

// clearing or unsubscibing
disposables.clear();

这地方用的是 dispose()

CompositeDisposable在被处置之后不能被重用。如果你想让一次性生命周期与 Android 活动生命周期同步,可以通过制作一个简单的包装器来实现。

class AndroidDisposable {
private var compositeDisposable: CompositeDisposable? = null


fun add(disposable: Disposable) {
if (compositeDisposable == null) {
compositeDisposable = CompositeDisposable()
}
compositeDisposable?.add(disposable)
}


fun dispose() {
compositeDisposable?.dispose()
compositeDisposable = null
}
}

使用方法:

class MainActivity : AppCompatActivity() {
private disposable = AndroidDisposable()


override fun onStart() {
super.onStart()
disposable.add(/* Some disposable */)
}


override fun onStop() {
disposable.dispose()
super.onStop()
}
}