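RxJava: Can I use retry() but with a delay?

I am using RxJava in my Android app to handle network requests asynchronously. Now I would like to retry a failed network request, but only after a certain time has passed.

Is there any way to use retry() on an Observable, but to retry only after a certain delay?

Is there a way to let the Observable know that it is currently being retried (as opposed to tried for the first time)?

I had a look at debounce()/throttleWithTimeout(), but they seem to be doing something different.

Edit:

I think I found one way to do it, but I'd be interested in either confirmation that this is the correct way to do it, or in other, better ways.

What I am doing is this: in the call() method of my Observable.OnSubscribe, before I call the Subscriber's onError() method, I simply let the Thread sleep for the desired amount of time. So, to retry every 1000 milliseconds, I do something like this: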

@Override
public void call(Subscriber<? super List<ProductNode>> subscriber) {
    try {
        Log.d(TAG, "trying to load all products with pid: " + pid);
        subscriber.onNext(productClient.getProductNodesForParentId(pid));
        subscriber.onCompleted();
    } catch (Exception e) {
        try {
            // Sleep before reporting the error, so that retry() re-subscribes late.
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        subscriber.onError(e);
    }
}

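Since this method is running on an IO thread anyway, it does not block the UI. The only problem I can see is that even the first error is reported with a delay, so the delay is there even if there is no retry(). I'd like it better if the delay were applied not after an error but before a retry (though obviously not before the first try).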
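To make the desired timing concrete, here is a minimal plain-Java sketch that does not use RxJava at all. The class `DelayedRetry` and the method `callWithRetry` are hypothetical names invented for this illustration. The loop sleeps only between attempts, so neither the first attempt nor the final error is delayed.

```java
import java.util.concurrent.Callable;

// Hypothetical helper, not part of RxJava: a blocking retry loop that
// waits only between attempts.
final class DelayedRetry {

    // Runs the task, retrying up to maxRetries times after a failure.
    static <T> T callWithRetry(Callable<T> task, int maxRetries, long delayMillis)
            throws Exception {
        int failures = 0;
        while (true) {
            try {
                return task.call();
            } catch (Exception e) {
                if (failures >= maxRetries) {
                    // Retries exhausted: report the error right away, no sleep.
                    throw e;
                }
                failures++;
                // The delay is applied before the retry only.
                Thread.sleep(delayMillis);
            }
        }
    }
}
```

Being blocking, a loop like this would have to run off the UI thread, just like the code in the question.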


Instead of using MyRequestObservable.retry, I use a wrapper function retryObservable(MyRequestObservable, retrycount, seconds) which returns a new Observable that handles the indirection for the delay, so I can do:

retryObservable(restApi.getObservableStuff(), 3, 30)
    .subscribe(new Action1<BonusIndividualList>() {
        @Override
        public void call(BonusIndividualList arg0) {
            // success!
        }
    }, new Action1<Throwable>() {
        @Override
        public void call(Throwable arg0) {
            // failed after the 3 retries!
        }
    });

// wrapper code
private static <T> Observable<T> retryObservable(
        final Observable<T> requestObservable, final int nbRetry,
        final long seconds) {

    return Observable.create(new Observable.OnSubscribe<T>() {

        @Override
        public void call(final Subscriber<? super T> subscriber) {
            requestObservable.subscribe(new Action1<T>() {
                @Override
                public void call(T arg0) {
                    subscriber.onNext(arg0);
                    subscriber.onCompleted();
                }
            },
            new Action1<Throwable>() {
                @Override
                public void call(Throwable error) {
                    if (nbRetry > 0) {
                        Observable.just(requestObservable)
                            .delay(seconds, TimeUnit.SECONDS)
                            .observeOn(mainThread())
                            .subscribe(new Action1<Observable<T>>() {
                                @Override
                                public void call(Observable<T> observable) {
                                    retryObservable(observable,
                                            nbRetry - 1, seconds)
                                        .subscribe(subscriber);
                                }
                            });
                    } else {
                        // still fail after retries
                        subscriber.onError(error);
                    }
                }
            });
        }
    });
}

You can use the retryWhen() operator to add retry logic to any Observable.

The following class contains the retry logic:

RxJava 2.x

import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.functions.Function;

public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> apply(final Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Function<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> apply(final Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

RxJava 1.x

import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.functions.Func1;

public class RetryWithDelay implements
        Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

Usage:

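// Add retry logic to an existing observable.
// With maxRetries = 3 the source is subscribed at most 3 times in total
// (the first attempt plus 2 retries), with a 2-second delay before each retry.
observable
    .retryWhen(new RetryWithDelay(3, 2000));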

This is a solution based on Ben Christensen's snippets I saw, RetryWhen Example, and RetryWhenTestsConditional (I had to change n.getThrowable() to n for it to work). I used evant/gradle-retrolambda to make the lambda notation work on Android, but you don't have to use lambdas (although it's highly recommended). For the delay I implemented exponential back-off, but you can plug in whatever back-off logic you want there. For completeness I added the subscribeOn and observeOn operators. I'm using ReactiveX/RxAndroid for AndroidSchedulers.mainThread().

int ATTEMPT_COUNT = 10;

public class Tuple<X, Y> {
    public final X x;
    public final Y y;

    public Tuple(X x, Y y) {
        this.x = x;
        this.y = y;
    }
}

observable
    .subscribeOn(Schedulers.io())
    .retryWhen(
        attempts -> {
            return attempts.zipWith(Observable.range(1, ATTEMPT_COUNT + 1), (n, i) -> new Tuple<Throwable, Integer>(n, i))
                .flatMap(
                    ni -> {
                        if (ni.y > ATTEMPT_COUNT)
                            return Observable.error(ni.x);
                        return Observable.timer((long) Math.pow(2, ni.y), TimeUnit.SECONDS);
                    });
        })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);
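For reference, the delays produced by the 2^attempt rule above grow quickly: 2, 4, 8 and so on, up to 1024 seconds for the tenth retry. The following plain-Java sketch computes such a schedule. The class `BackoffSchedule` and the upper bound `capSeconds` are assumptions added for illustration; the snippet above has no cap.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: exponential back-off delays with an upper bound.
final class BackoffSchedule {

    // Delay in seconds before retry number `attempt` (1-based): 2^attempt,
    // limited to capSeconds.
    static long delaySeconds(int attempt, long capSeconds) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        // 1L << 63 would overflow, so treat very large attempts as "huge".
        long raw = attempt >= 63 ? Long.MAX_VALUE : 1L << attempt;
        return Math.min(raw, capSeconds);
    }

    // Delays for retries 1..attemptCount, in order.
    static List<Long> schedule(int attemptCount, long capSeconds) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= attemptCount; attempt++) {
            delays.add(delaySeconds(attempt, capSeconds));
        }
        return delays;
    }
}
```

In the retryWhen handler, a value like `BackoffSchedule.delaySeconds(ni.y, 60)` could replace the Math.pow expression if a bounded wait is preferred.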

Simply do it like this:

Observable.just("")
    .delay(2, TimeUnit.SECONDS) // delay
    .flatMap(new Func1<String, Observable<File>>() {
        @Override
        public Observable<File> call(String s) {
            L.from(TAG).d("postAvatar=");

            File file = PhotoPickUtil.getTempFile();
            if (file.length() <= 0) {
                throw new NullPointerException();
            }
            return Observable.just(file);
        }
    })
    .retry(6)
    .subscribe(new Action1<File>() {
        @Override
        public void call(File file) {
            postAvatar(file);
        }
    }, new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
        }
    });

retryWhen is a complicated, perhaps even buggy, operator. The official doc and at least one answer here use the range operator, which will fail if there are no retries to be made. See my discussion with ReactiveX member David Karnok.

I improved upon kjones' answer by changing flatMap to concatMap and by adding a RetryDelayStrategy class. flatMap doesn't preserve the order of emission while concatMap does, which is important for delays with back-off. The RetryDelayStrategy, as the name indicates, lets the user choose from various modes of generating retry delays, including back-off. The code is available on my GitHub, complete with the following test cases:

  1. Succeeds on 1st attempt (no retries)
  2. Fails after 1 retry
  3. Attempts to retry 3 times but succeeds on 2nd hence doesn't retry 3rd time
  4. Succeeds on 3rd retry

See setRandomJokes method.
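The actual RetryDelayStrategy lives in the linked repository and is not reproduced here. As a rough idea of what choosing between delay modes could look like, here is a hypothetical plain-Java sketch; the class `RetryDelays`, the enum `Mode` and the three modes are assumptions made for this illustration, not the author's code.

```java
// Hypothetical sketch; not the RetryDelayStrategy from the answer's repository.
final class RetryDelays {

    enum Mode { CONSTANT, LINEAR, EXPONENTIAL }

    // Delay in milliseconds before retry number `retry` (1-based).
    static long delayMillis(Mode mode, long baseMillis, int retry) {
        if (retry < 1 || retry > 30) {
            throw new IllegalArgumentException("retry must be between 1 and 30");
        }
        switch (mode) {
            case CONSTANT:
                return baseMillis;                       // base, base, base, ...
            case LINEAR:
                return baseMillis * retry;               // base, 2*base, 3*base, ...
            case EXPONENTIAL:
                return baseMillis * (1L << (retry - 1)); // base, 2*base, 4*base, ...
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }
}
```

Since every mode except CONSTANT depends on the retry number, the errors have to reach the handler in order, which is the reason given above for preferring concatMap.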

You can add a delay to the Observable returned inside the retryWhen operator.

/**
 * Here we can see how onErrorResumeNext works: it emits an item when an error
 * occurs in the pipeline and an exception is propagated.
 */
@Test
public void observableOnErrorResumeNext() {
    // count is assumed to be an int field of the enclosing test class.
    Subscription subscription = Observable.just(null)
            .map(Object::toString)
            .doOnError(failure -> System.out.println("Error:" + failure.getCause()))
            .retryWhen(errors -> errors.doOnNext(o -> count++)
                            .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)),
                    Schedulers.newThread())
            .onErrorResumeNext(t -> {
                System.out.println("Error after all retries:" + t.getCause());
                return Observable.just("I save the world for extinction!");
            })
            .subscribe(s -> System.out.println(s));
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
}

You can see more examples here: https://github.com/politrons/reactive

Now with RxJava version 1.0+ you can use zipWith to achieve retry with delay.

Adding modifications to kjones's answer.

Modified

public class RetryWithDelay implements
        Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int MAX_RETRIES;
    private final int DELAY_DURATION;
    private final int START_RETRY;

    /**
     * Provide the number of retries and the delay in seconds between retries.
     *
     * @param maxRetries             Number of retries.
     * @param delayDurationInSeconds Seconds to delay before each retry.
     */
    public RetryWithDelay(int maxRetries, int delayDurationInSeconds) {
        MAX_RETRIES = maxRetries;
        DELAY_DURATION = delayDurationInSeconds;
        START_RETRY = 1;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> observable) {
        return observable
                .delay(DELAY_DURATION, TimeUnit.SECONDS)
                .zipWith(Observable.range(START_RETRY, MAX_RETRIES),
                        new Func2<Throwable, Integer, Integer>() {
                            @Override
                            public Integer call(Throwable throwable, Integer attempt) {
                                return attempt;
                            }
                        });
    }
}

Inspired by Paul's answer, and if you are not concerned with the retryWhen problems stated by Abhijit Sarkar, the simplest way to delay resubscription unconditionally with RxJava 2 is:

source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS))

You may want to see more samples and explanations on retryWhen and repeatWhen.

Same answer as kjones's, but updated for the latest RxJava 2.x version ('io.reactivex.rxjava2:rxjava:2.1.3'):

import java.util.concurrent.TimeUnit;

import org.reactivestreams.Publisher;

import io.reactivex.Flowable;
import io.reactivex.functions.Function;

public class RetryWithDelay implements Function<Flowable<Throwable>, Publisher<?>> {

    private final int maxRetries;
    private final long retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
        return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() {
            @Override
            public Publisher<?> apply(Throwable throwable) throws Exception {
                if (++retryCount < maxRetries) {
                    // When this Flowable calls onNext, the original
                    // source will be retried (i.e. re-subscribed).
                    return Flowable.timer(retryDelayMillis,
                            TimeUnit.MILLISECONDS);
                }

                // Max retries hit. Just pass the error along.
                return Flowable.error(throwable);
            }
        });
    }
}

Usage:

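// Add retry logic to an existing source.
// With maxRetries = 3 the source is subscribed at most 3 times in total
// (the first attempt plus 2 retries), with a 2-second delay before each retry.
observable
    .retryWhen(new RetryWithDelay(3, 2000));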

Kotlin version for RxJava 1:

class RetryWithDelay(private val MAX_RETRIES: Int, private val DELAY_DURATION_IN_SECONDS: Long)
    : Function1<Observable<out Throwable>, Observable<*>> {

    private val START_RETRY: Int = 1

    override fun invoke(observable: Observable<out Throwable>): Observable<*> {
        return observable.delay(DELAY_DURATION_IN_SECONDS, TimeUnit.SECONDS)
            .zipWith(Observable.range(START_RETRY, MAX_RETRIES),
                object : Function2<Throwable, Int, Int> {
                    override fun invoke(throwable: Throwable, attempt: Int): Int {
                        return attempt
                    }
                })
    }
}

(Kotlin) I slightly improved the code with exponential backoff and made the emission of Observable.range() defensive:

fun testOnRetryWithDelayExponentialBackoff() {
    val interval = 1
    val maxCount = 3
    val ai = AtomicInteger(1)
    val source = Observable.create<Unit> { emitter ->
        val attempt = ai.getAndIncrement()
        println("Subscribe ${attempt}")
        if (attempt >= maxCount) {
            emitter.onNext(Unit)
            emitter.onComplete()
        } else {
            // Signal the error only if the source has not already completed.
            emitter.onError(RuntimeException("Test $attempt"))
        }
    }

    // Below implementation of "retryWhen" function, remove all "println()" for real code.
    val sourceWithRetry: Observable<Unit> = source.retryWhen { throwableRx ->
        throwableRx.doOnNext({ println("Error: $it") })
            .zipWith(Observable.range(1, maxCount)
                .concatMap { Observable.just(it).delay(0, TimeUnit.MILLISECONDS) },
                BiFunction { t1: Throwable, t2: Int -> t1 to t2 }
            )
            .flatMap { pair ->
                if (pair.second >= maxCount) {
                    Observable.error(pair.first)
                } else {
                    val delay = interval * 2F.pow(pair.second)
                    println("retry delay: $delay")
                    Observable.timer(delay.toLong(), TimeUnit.SECONDS)
                }
            }
    }

    // Code to print the result in terminal.
    sourceWithRetry
        .doOnComplete { println("Complete") }
        .doOnError({ println("Final Error: $it") })
        .blockingForEach { println("$it") }
}

This example works with RxJava 2.2.2:

Retry without delay:

Single.just(somePaylodData)
    .map(data -> someConnection.send(data))
    .retry(5)
    .doOnSuccess(status -> log.info("Yay! {}", status));

Retry with delay:

Single.just(somePaylodData)
    .map(data -> someConnection.send(data))
    .retryWhen((Flowable<Throwable> f) -> f.take(5).delay(300, TimeUnit.MILLISECONDS))
    .doOnSuccess(status -> log.info("Yay! {}", status))
    .doOnError((Throwable error)
        -> log.error("I tried five times with a 300ms break"
            + " delay in between. But it was in vain."));

Our source single fails if someConnection.send() fails. When that happens, the observable of failures inside retryWhen emits the error. We delay that emission by 300ms and send it back to signal a retry. take(5) guarantees that our signaling observable will terminate after we receive five errors. retryWhen sees the termination and doesn't retry after the fifth failure.

If you need to print the retry count, you can use the example provided on RxJava's wiki page: https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators

observable.retryWhen(errors ->
    // Count and increment the number of errors.
    errors.map(error -> 1).scan((i, j) -> i + j)
        .doOnNext(errorCount -> System.out.println(" -> query errors #: " + errorCount))
        // Limit the maximum number of retries.
        .takeWhile(errorCount -> errorCount < retryCounts)
        // Signal resubscribe event after some delay.
        .flatMapSingle(errorCount -> Single.timer(errorCount, TimeUnit.SECONDS)));

Based on kjones's answer, here is a Kotlin version of RxJava 2.x retry with a delay, written as an extension function. Replace Observable with Flowable to create the same extension for Flowable.

fun <T> Observable<T>.retryWithDelay(maxRetries: Int, retryDelayMillis: Int): Observable<T> {
    var retryCount = 0

    return retryWhen { thObservable ->
        thObservable.flatMap { throwable ->
            if (++retryCount < maxRetries) {
                Observable.timer(retryDelayMillis.toLong(), TimeUnit.MILLISECONDS)
            } else {
                Observable.error(throwable)
            }
        }
    }
}

Then just use it on an observable: observable.retryWithDelay(3, 1000)

Use retryWhen

/**
 * Retry handler support.
 *
 * @param errors         the stream of errors passed in by retryWhen
 * @param predicate      decides which errors may be retried
 * @param maxTry         error count at which retrying stops
 * @param periodStrategy maps the current error count to the delay before the next retry
 * @param timeUnit       unit of the delay
 * @return a Flowable that emits once per retry, after the computed delay
 */
private Flowable<?> retrySupport(Flowable<Throwable> errors, Predicate<? super Throwable> predicate, Integer maxTry, Function<Long, Long> periodStrategy, TimeUnit timeUnit) {
    LongAdder errorCount = new LongAdder();
    return errors
            .doOnNext(e -> {
                errorCount.increment();
                long currentCount = errorCount.longValue();
                boolean tryContinue = predicate.test(e) && currentCount < maxTry;
                Logger.i("No. of errors: %d , %s", currentCount,
                        tryContinue ? String.format("please wait %d %s.", periodStrategy.apply(currentCount), timeUnit.name()) : "skip and throw");
                if (!tryContinue)
                    throw e;
            })
            .flatMapSingle(e -> Single.timer(periodStrategy.apply(errorCount.longValue()), timeUnit));
}


Sample

private Single<DeviceInfo> getErrorToken(String device) {
    return Single.error(new IOException("network is disconnect!"));
}

// Only retry when an IOException is emitted.
// With maxTry = 5 the retries are delayed by 1s, 2s, 4s and 8s; the fifth error is rethrown.

this.getErrorToken(this.deviceCode)
    .retryWhen(error -> retrySupport(error,
        e -> e instanceof IOException,
        5,
        count -> (long) Math.pow(2, count - 1), TimeUnit.SECONDS))
    .subscribe(deviceInfo1 -> Logger.i("----Get Device Info---"),
        e -> Logger.e(e, "On Error"),
        () -> Logger.i("<<<<<no more>>>>>"));


Worked for me with:

// Retry up to retryCount times, waiting 1 second before each retry.
observable.retryWhen(throwableFlowable -> {
    return throwableFlowable.take(retryCount).delay(1, TimeUnit.SECONDS);
});

I'm a bit too late for this one, but just in case this could still be useful for someone, I created a Kotlin extension function for RxJava 2 that will retry with an exponential backoff strategy:

private fun <T> Observable<T>.retryWithExponentialBackoff(): Observable<T> {
    val retriesSubject = BehaviorSubject.createDefault(0)
    return doOnNext { retriesSubject.onNext(0) }
        .retryWhen {
            it.withLatestFrom(retriesSubject) { _, retryCount ->
                retriesSubject.onNext(retryCount + 1)
                retryCount
            }.flatMap { retryCount ->
                when (retryCount) {
                    MAX_RETRY_COUNT -> Observable.error(RuntimeException("Max number of retries reached"))
                    else -> Observable.timer(2.0.pow(retryCount).toLong(), SECONDS)
                }
            }
        }
}