Using "skipWhile" in combination with "repeatWhen" in RxJava to implement server polling

I really like RxJava. It is a great tool, but sometimes it's very difficult to understand how it works. We use Retrofit with RxJava in our Android project, and there is the following use case:

I need to poll the server, with some delay between retries, while the server is doing a certain job. When the server has finished, I must deliver the result. I have managed to do this with RxJava by combining "skipWhile" with "repeatWhen". Here is the code snippet:

    Subscription checkJobSubscription = mDataManager.checkJob(prepareTweakJob)
            .skipWhile(new Func1<CheckJobResponse, Boolean>() {
                @Override
                public Boolean call(CheckJobResponse checkJobResponse) {
                    boolean shouldSkip = false;
                    if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, jobStatus " + checkJobResponse.getJobStatus());
                    switch (checkJobResponse.getJobStatus()) {
                        case CheckJobResponse.PROCESSING:
                            shouldSkip = true;
                            break;
                        case CheckJobResponse.DONE:
                        case CheckJobResponse.ERROR:
                            shouldSkip = false;
                            break;
                    }
                    if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, shouldSkip " + shouldSkip);
                    return shouldSkip;
                }
            })
            .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
                @Override
                public Observable<?> call(Observable<? extends Void> observable) {
                    if (SHOW_LOGS) Logger.v(TAG, "checkJob, repeatWhen " + observable);
                    return observable.delay(1, TimeUnit.SECONDS);
                }
            })
            .subscribe(new Subscriber<CheckJobResponse>() {
                @Override
                public void onNext(CheckJobResponse response) {
                    if (SHOW_LOGS) Logger.v(TAG, "checkJob, onSuccess, response " + response);
                }

                @Override
                public void onError(BaseError error) {
                    if (SHOW_LOGS) Logger.v(TAG, "checkJob, onError, canEditTimeline, error " + error);
                    Toast.makeText(ChoseEditOptionActivity.this, R.string.NETWORK__no_internet_message, Toast.LENGTH_LONG).show();
                }

                @Override
                public void onCompleted() {
                    if (SHOW_LOGS) Logger.v(TAG, "onCompleted");
                }
            });

The code works fine:

When the server replies that the job is still being processed, I return "true" from the "skipWhile" predicate, the original Observable waits 1 second, and the HTTP request is executed again. This repeats until I return "false" from the "skipWhile" predicate.

Here are a few things I don't understand:

  • I saw in the "skipWhile" documentation that it will not pass anything on (onError, onNext, onCompleted) from the original Observable until I return "false" from its "call" method. So, if it does not emit anything, why does the "repeatWhen" Observable do its job? It waits one second and makes the request again. What triggers it?

  • Second question: why does the Observable from "repeatWhen" not run forever? I mean, why does it stop repeating when I return "false" from "skipWhile"? If I return "false", my subscriber receives onNext successfully.

  • The "repeatWhen" documentation says that my subscriber should eventually receive an "onCompleted" call, but "onCompleted" is never called.

  • It doesn't matter if I change the order of the "skipWhile" and "repeatWhen" operators in the chain. Why is this?

I understand that RxJava is open source, and I could just read the code, but as I said, it is very difficult to understand.

Thanks.

1 answer

I had not worked with repeatWhen before, but this question made me curious, so I did some research.

skipWhile does forward onError and onCompleted, even while it is still skipping items (that is, even if its predicate has never returned false). Thus, repeatWhen is triggered every time checkJob() emits onCompleted. This answers question #1.

The remaining questions are based on false assumptions. Your subscription runs forever because your repeatWhen never terminates. This is because repeatWhen is a more complex beast than you realize. The Observable passed to it emits null whenever it gets onCompleted from the source. If you take that and return onCompleted, the repetition ends; if you emit anything instead, it resubscribes to the source. Since delay simply takes each emission and delays it, it always emits the null again. As a result, the chain keeps resubscribing.

The answer to #2, then, is that it does run forever; you are probably doing something else outside this code to unsubscribe. For #3, you never get onCompleted because the stream never completes. For #4, the order does not matter because you are repeating endlessly either way.
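To make the endless repetition concrete, here is a rough plain-Java model of the original skipWhile + repeatWhen chain. It does not use RxJava and is not how the library is implemented; it only mirrors the behaviour described above. The class name `RepeatForeverModel`, the string statuses, and the `maxRounds` cap are all hypothetical, and the model assumes the server keeps answering DONE once the job has finished.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Plain-Java model of skipWhile + repeatWhen(delay); no RxJava involved.
class RepeatForeverModel {

    /**
     * Each round stands for one subscription to the source: it emits one
     * status and then completes. maxRounds is a hypothetical cap so that the
     * model terminates; the real chain has no such limit.
     * Returns every item that would reach the subscriber's onNext.
     */
    static List<String> run(Iterator<String> statuses, int maxRounds) {
        List<String> delivered = new ArrayList<>();
        for (int round = 0; round < maxRounds && statuses.hasNext(); round++) {
            String status = statuses.next();
            // skipWhile: drop the item while the job is still processing.
            // Its state is fresh on every round because it sits upstream
            // of repeatWhen.
            if (!"PROCESSING".equals(status)) {
                delivered.add(status);
            }
            // The source completes here. The repeatWhen handler re-emits the
            // (delayed) notification, which triggers another subscription,
            // so onCompleted is never passed on to the subscriber.
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<String> statuses =
                Arrays.asList("PROCESSING", "PROCESSING", "DONE", "DONE", "DONE");
        // Prints the items seen by the subscriber over five rounds.
        System.out.println("delivered = " + run(statuses.iterator(), 5));
    }
}
```

Under these assumptions the subscriber sees the finished result again on every later round and never sees a completion, which is consistent with the chain only stopping when something else unsubscribes.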

The question is, how do you get the right behavior? It is as simple as using takeUntil instead of skipWhile. That way, you keep repeating until you get the desired result, which terminates the stream exactly when you want it to end.

Here is some sample code:

    Observable<Boolean> source = ...; // Something that eventually emits true

    source
        .repeatWhen(completed -> completed.delay(1, TimeUnit.SECONDS))
        .takeUntil(result -> result)
        .filter(result -> result)
        .subscribe(
            res -> System.out.println("onNext(" + res + ")"),
            err -> System.out.println("onError()"),
            () -> System.out.println("onCompleted()")
        );

In this example, source emits boolean values. I resubscribe to it every 1 second until it emits true. takeUntil keeps taking items until result is true and then completes the stream. filter drops every false item, so the subscriber receives nothing until the result is true.
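For readers who find the operator chain hard to follow, the same control flow can be sketched as an ordinary blocking loop. This is only a plain-Java model of what the subscriber observes, not RxJava code; the class `PollUntilTrueModel`, its `Outcome` holder, and the `delayMillis` parameter are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Plain-Java model of repeatWhen(delay) + takeUntil + filter; no RxJava involved.
class PollUntilTrueModel {

    /** What the subscriber observed, plus how many polls were needed. */
    static final class Outcome {
        final List<Boolean> delivered = new ArrayList<>();
        int polls = 0;
        boolean completed = false;
    }

    /** Each call to results.next() stands for one subscription to the source. */
    static Outcome run(Iterator<Boolean> results, long delayMillis) {
        Outcome outcome = new Outcome();
        while (results.hasNext()) {
            boolean result = results.next();
            outcome.polls++;
            if (result) {
                outcome.delivered.add(result); // filter lets only true through
                outcome.completed = true;      // takeUntil ends the stream here
                break;
            }
            try {
                Thread.sleep(delayMillis);     // the delay before resubscribing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;                         // stop polling if interrupted
            }
        }
        return outcome;
    }

    public static void main(String[] args) {
        Outcome outcome = run(Arrays.asList(false, false, true, true).iterator(), 10);
        System.out.println("polls = " + outcome.polls);
        System.out.println("delivered = " + outcome.delivered);
        System.out.println("completed = " + outcome.completed);
    }
}
```

With the sample input the loop stops at the third poll: the single true value is delivered, the stream is marked completed, and the fourth value is never requested.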


Source: https://habr.com/ru/post/1241211/

