Say the interface you defined for Retrofit contains a method similar to this:
public Observable<GameState> loadGameState(@Query("id") String gameId);
Retrofit methods can be defined in one of three ways:
1.) simple synchronous:
public GameState loadGameState(@Query("id") String gameId);
2.) one that accepts a Callback for asynchronous processing:
public void loadGameState(@Query("id") String gameId, Callback<GameState> callback);
3.) and one that returns an RxJava Observable, see above. I think that if you are going to use Retrofit in combination with RxJava, it makes the most sense to use this version.
Thus, you can simply use Observable for a single query, directly as follows:
mApiService.loadGameState(mGameId)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<GameState>() {
        @Override
        public void onNext(GameState gameState) {
            // use the current game state here
        }

        // onError and onCompleted go here as well
    });
If you want to poll the server repeatedly, you can provide the "pulse" using timer() or interval():
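Observable.timer(0, 2000, TimeUnit.MILLISECONDS)
    // flatMap expects a function from the tick to an Observable, not the Observable itself
    .flatMap(tick -> mApiService.loadGameState(mGameId))
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<GameState>() {
        @Override
        public void onNext(GameState gameState) {
            // use the current game state here
        }

        // onError and onCompleted go here as well
    });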
It is important to note that flatMap is used here instead of map, because the return value of loadGameState(mGameId) is itself an Observable.
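To illustrate the difference without pulling in RxJava, here is a small sketch using java.util.stream, which has the same map/flatMap distinction. The class MapVersusFlatMap and its loadGameState stub are made up for this example and are not part of Retrofit or RxJava; the stub just stands in for a call that returns a container of results.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class MapVersusFlatMap {
    // Hypothetical stand-in for an API call that itself returns a stream of results.
    static Stream<String> loadGameState(long tick) {
        return Stream.of("state@" + tick);
    }

    // map keeps each inner stream as an element: the result is a stream of streams.
    static long countWithMap(List<Long> ticks) {
        Stream<Stream<String>> nested = ticks.stream().map(MapVersusFlatMap::loadGameState);
        return nested.count(); // one inner stream per tick, not one state per tick
    }

    // flatMap merges the inner streams into a single flat stream of states.
    static List<String> statesWithFlatMap(List<Long> ticks) {
        return ticks.stream()
                .flatMap(MapVersusFlatMap::loadGameState)
                .collect(Collectors.toList());
    }
}
```

With map you would end up holding the inner containers and would still have to unwrap each one yourself; flatMap does that unwrapping for you, which is what you want when every tick triggers a call that returns an Observable.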
But the version that you use in your update should also work:
Observable.interval(2, TimeUnit.SECONDS, Schedulers.io())
    .map(tick -> Api.ReceiveGameTurn())
    .doOnError(err -> Log.e("Polling", "Error retrieving messages" + err))
    .retry()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(sub);
That is, if ReceiveGameTurn() is defined synchronously, like my option 1.) above, you should use map instead of flatMap.
In both cases, your Subscriber's onNext will be called every two seconds with the latest game state from the server. You can process them one after another, or limit the emission to a single item by inserting take(1) before subscribe().
However, regarding the first version: a single network error will be delivered to onError, after which the Observable stops emitting items, leaving your Subscriber useless and without input (remember, onError can only be called once). To get around this, you can use any of RxJava's onError* methods to "redirect" the failure to onNext.
For instance:
Observable.timer(0, 2000, TimeUnit.MILLISECONDS)
    .flatMap(new Func1<Long, Observable<GameState>>() {
        @Override
        public Observable<GameState> call(Long tick) {
            return mApiService.loadGameState(mGameId)
                .doOnError(err -> Log.e("Polling", "Error retrieving messages" + err))
                .onErrorResumeNext(new Func1<Throwable, Observable<GameState>>() {
                    @Override
                    public Observable<GameState> call(Throwable throwable) {
                        // swallow the error for this tick and emit nothing
                        return Observable.empty();
                    }
                });
        }
    })
    .filter(/* predicate that keeps only valid game states */)
    .take(1)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<GameState>() {
        @Override
        public void onNext(GameState gameState) {
            // use the current game state here
        }

        // onError and onCompleted go here as well
    });
Every two seconds, this will:
* use Retrofit to get the current game state from the server
* filter out invalid states
* take the first valid one
* and unsubscribe

In case of an error:
* it will log an error message in doOnError
* and otherwise ignore the error: onErrorResumeNext will "consume" the onError event (i.e. your Subscriber's onError will not be called) and replace it with nothing (Observable.empty())
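If it helps to see the same control flow outside of RxJava, here is a rough plain-Java sketch of the steps above: try a request on every tick, turn a failure into "nothing", skip invalid results, and stop at the first valid one. PollUntilValid, attempt and firstValid are names I made up for this illustration, and the blocking loop with Thread.sleep is only a stand-in for the timer; it is not how you would do it on Android's main thread.

```java
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;

class PollUntilValid {
    // Runs one request and turns any failure into "no result",
    // so a single error cannot end the whole polling loop
    // (roughly what onErrorResumeNext(Observable.empty()) does per tick).
    static <T> Optional<T> attempt(Supplier<T> request) {
        try {
            return Optional.ofNullable(request.get());
        } catch (RuntimeException err) {
            System.err.println("Polling: error retrieving state: " + err);
            return Optional.empty();
        }
    }

    // Polls up to maxTicks times and returns the first valid result, if any.
    static <T> Optional<T> firstValid(Supplier<T> request, Predicate<T> isValid,
                                      int maxTicks, long periodMillis) {
        for (int tick = 0; tick < maxTicks; tick++) {
            Optional<T> valid = attempt(request).filter(isValid);
            if (valid.isPresent()) {
                return valid; // like take(1): stop after the first valid item
            }
            try {
                Thread.sleep(periodMillis); // wait for the next tick
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Optional.empty();
            }
        }
        return Optional.empty();
    }
}
```

A call like PollUntilValid.firstValid(() -> api.load(), s -> s != null, 10, 2000) would then poll a (hypothetical) api up to ten times, two seconds apart, and survive individual failures along the way.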
As for the second version: in the event of a network error, retry will immediately resubscribe to the interval, and since interval emits the first integer immediately after the subscription, the next request is also sent immediately, and not after the full polling period, as you probably want ...
A final note: if your game state is quite large, you could also first poll the server just to ask whether a new state is available, and only if the answer is yes, load the new game state.
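A minimal sketch of that idea in plain Java, assuming the server can report some cheap change marker such as a version number (that is my assumption, your API may expose a timestamp or a boolean instead). VersionedPoller and both suppliers are hypothetical names for this example only.

```java
import java.util.Optional;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

class VersionedPoller<T> {
    private final LongSupplier versionCheck; // cheap request: current version on the server (assumed)
    private final Supplier<T> fullLoad;      // expensive request: the whole game state
    private long lastSeenVersion = -1;       // -1 means nothing has been loaded yet

    VersionedPoller(LongSupplier versionCheck, Supplier<T> fullLoad) {
        this.versionCheck = versionCheck;
        this.fullLoad = fullLoad;
    }

    // Call once per tick; downloads the full state only if the version changed.
    Optional<T> poll() {
        long current = versionCheck.getAsLong();
        if (current == lastSeenVersion) {
            return Optional.empty(); // nothing new, skip the expensive call
        }
        T state = fullLoad.get();
        lastSeenVersion = current; // remember the version only after a successful load
        return Optional.of(state);
    }
}
```

In the RxJava version, the same check could sit in front of the loadGameState call inside flatMap, returning Observable.empty() when nothing has changed.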
If you need more complex examples, please ask.
UPDATE I rewrote portions of this post and added more information in between.
UPDATE 2 . I have added a complete error handling example using onErrorResumeNext .