I really like RxJava. It is a great tool, but sometimes it's very difficult to understand how it works. We use Retrofit with RxJava in our Android project, and we have the following use case:
I need to poll the server, with a delay between attempts, while the server is processing a job. When the server has finished, I must deliver the result. I implemented this successfully in RxJava using "skipWhile" together with "repeatWhen". Here is the code snippet:
    Subscription checkJobSubscription = mDataManager.checkJob(prepareTweakJob)
            .skipWhile(new Func1<CheckJobResponse, Boolean>() {
                @Override
                public Boolean call(CheckJobResponse checkJobResponse) {
                    boolean shouldSkip = false;
                    if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, jobStatus " + checkJobResponse.getJobStatus());
                    switch (checkJobResponse.getJobStatus()) {
                        case CheckJobResponse.PROCESSING:
                            shouldSkip = true;
                            break;
                        case CheckJobResponse.DONE:
                        case CheckJobResponse.ERROR:
                            shouldSkip = false;
                            break;
                    }
                    if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, shouldSkip " + shouldSkip);
                    return shouldSkip;
                }
            })
            .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
                @Override
                public Observable<?> call(Observable<? extends Void> observable) {
                    if (SHOW_LOGS) Logger.v(TAG, "checkJob, repeatWhen " + observable);
                    return observable.delay(1, TimeUnit.SECONDS);
                }
            })
            .subscribe(new Subscriber<CheckJobResponse>() {
                @Override
                public void onNext(CheckJobResponse response) {
                    if (SHOW_LOGS) Logger.v(TAG, "checkJob, onSuccess, response " + response);
                }

                @Override
                public void onError(BaseError error) {
                    if (SHOW_LOGS) Logger.v(TAG, "checkJob, onError, canEditTimeline, error " + error);
                    Toast.makeText(ChoseEditOptionActivity.this, R.string.NETWORK__no_internet_message, Toast.LENGTH_LONG).show();
                }

                @Override
                public void onCompleted() {
                    if (SHOW_LOGS) Logger.v(TAG, "onCompleted");
                }
            });
The code works fine:
When the server replies that the job is still being processed, I return "true" from "skipWhile"; the original Observable then waits 1 second and executes the HTTP request again. This repeats until I return "false" from "skipWhile".
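To make the intended behavior concrete, here it is written as a plain blocking loop without RxJava. This is only a sketch of what I want to achieve, not of how RxJava implements it: `PollingSketch`, `JobStatus`, and the fake server in `main` are hypothetical stand-ins, not code from our project.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class PollingSketch {

    // Hypothetical stand-in for the status values carried by CheckJobResponse.
    enum JobStatus { PROCESSING, DONE, ERROR }

    /**
     * Calls checkJob repeatedly, sleeping delayMillis between attempts,
     * and returns the first status that is not PROCESSING.
     */
    static JobStatus pollUntilFinished(Supplier<JobStatus> checkJob, long delayMillis) {
        while (true) {
            JobStatus status = checkJob.get(); // stands in for one HTTP request
            if (status != JobStatus.PROCESSING) {
                return status; // DONE or ERROR: stop polling and deliver the result
            }
            try {
                TimeUnit.MILLISECONDS.sleep(delayMillis); // wait before the next attempt
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                throw new IllegalStateException("Polling interrupted", e);
            }
        }
    }

    public static void main(String[] args) {
        // Fake server (an assumption for the demo): PROCESSING twice, then DONE.
        Iterator<JobStatus> replies = Arrays.asList(
                JobStatus.PROCESSING, JobStatus.PROCESSING, JobStatus.DONE).iterator();

        JobStatus result = pollUntilFinished(replies::next, 10);
        System.out.println("Final status: " + result); // prints "Final status: DONE"
    }
}
```

The RxJava chain above is meant to do the same thing without blocking a thread: one request per attempt, a pause while the status is PROCESSING, and a single result once the status is DONE or ERROR.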
Here are a few things I don't understand:
I read in the "skipWhile" documentation that it will not emit anything (onError, onNext, onCompleted) from the original Observable until I return "false" from its "call" method. So, if it does not emit anything, why does the "repeatWhen" Observable do its job? It waits one second and issues the request again. What triggers it?
Second question: why does the Observable from "repeatWhen" not repeat forever? I mean, why does it stop repeating when I return "false" from "skipWhile"? When I return "false", I successfully receive onNext in my subscriber.
The "repeatWhen" documentation says that I should eventually get an "onCompleted" call in my subscriber, but "onCompleted" is never called.
It doesn't matter if I change the order of "skipWhile" and "repeatWhen" in the chain; the behavior is the same. Why is this?
I understand that RxJava is open source, and I could just read the code, but as I said, it is very difficult to understand.
Thanks.