Using RxJava for a chain of operations - where to go next?

Scenario

I built an API for my application, which sits behind a gateway with request throttling. Before I built the API, the application coordinated the requests itself, and so could fire many requests within milliseconds to synchronize its data across the 9 providers it uses. Now that this logic has moved into my API adapter layer, I need to think about how to control the number of requests per second so that I do not hit my limits. Raising the rate limit is not an option, because the gateway provider would require a pricing tier that I do not want to pay for.

I started using RxJava

Drawn by its strong momentum in the Java community, I decided to use RxJava together with Retrofit and Retrolambda for the API SDK I created. This has been largely successful and runs in production without problems.

My application

My application lets users save "spots" that, when synchronized, retrieve the local weather, tide and swell conditions for that area. Each spot uses 4 API resources to get a complete set of data, specifically:

/luna/locations/xtide/{id} - Luna Event detail (read: tide times)
/solar/locations/xtide/{id} - Solar Event detail (read: sunrise/sunset)
/water/locations/{provider}/{id}{?daysData} - Water Event detail (read: swell measures)
/meteo/wwo/weather{?query,daysData} - Meteo Event detail (read: weather data)

The application allows any number of spots, n, which means that with the current code I make 4n requests per sync. For example, if I have 10 spots saved and try to synchronize everything, I trigger 4 * 10 = 40 API requests, all fired within about 0.75s!
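To put rough numbers on this, the arithmetic can be sketched as below. The class and method names are hypothetical, and the 5 requests per second used in main is an assumed limit for illustration only, not the gateway's real quota.

```java
import java.time.Duration;

/** Back-of-the-envelope numbers for a full sync (hypothetical helper). */
final class SyncBudget {
    /** Four resources per spot: luna, solar, water and meteo. */
    static final int RESOURCES_PER_SPOT = 4;

    /** Total API requests needed to sync the given number of spots. */
    static int requestCount(int spots) {
        return RESOURCES_PER_SPOT * spots;
    }

    /**
     * Rough minimum time over which the requests must be spread to stay
     * at or under maxPerSecond (an assumed limit, not the real gateway's).
     */
    static Duration minimumSpread(int requests, int maxPerSecond) {
        // Integer ceiling of requests / maxPerSecond, expressed in milliseconds.
        long millis = (requests * 1000L + maxPerSecond - 1) / maxPerSecond;
        return Duration.ofMillis(millis);
    }

    public static void main(String[] args) {
        int requests = requestCount(10);
        System.out.println(requests + " requests");
        System.out.println(minimumSpread(requests, 5).toMillis() + " ms at 5 req/s");
    }
}
```

With 10 spots and the assumed limit of 5 requests per second, the 40 requests would need to be spread over roughly 8 seconds instead of 0.75s.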

Self-throttling

I want to use Rx to simplify the process of self-throttling my API requests. Here is what I want to achieve, as a (hopefully accurate) marble diagram:

Figure 1: Marble Diagram Showing Desired Stream Composition
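The pacing idea in the diagram, one spot released per timer tick, can be sketched without Rx as follows. This is a plain-JDK analogue built on ScheduledExecutorService, not the RxJava code from the service; PacedEmitter and emitPaced are hypothetical names, and the 20 ms tick in main is an arbitrary value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical plain-JDK sketch: hand out one item per timer tick. */
final class PacedEmitter {
    /**
     * Passes one item to the consumer per tick, in list order, and blocks
     * until every item has been delivered. The first item is delivered
     * after one full tick.
     */
    static <T> void emitPaced(List<T> items, long tickMillis, Consumer<T> consumer) {
        if (items.isEmpty()) {
            return;
        }
        // Copy so later changes to the caller's list cannot affect iteration.
        Iterator<T> it = new ArrayList<>(items).iterator();
        CountDownLatch done = new CountDownLatch(items.size());
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            timer.scheduleAtFixedRate(() -> {
                if (!it.hasNext()) {
                    return;
                }
                T next = it.next();
                try {
                    consumer.accept(next);
                } catch (RuntimeException e) {
                    // Swallowed so one failing item does not cancel later ticks.
                    System.err.println("item failed: " + e);
                } finally {
                    done.countDown();
                }
            }, tickMillis, tickMillis, TimeUnit.MILLISECONDS);
            done.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop waiting.
            Thread.currentThread().interrupt();
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        emitPaced(Arrays.asList("spot-1", "spot-2", "spot-3"), 20,
                spot -> System.out.println("sync " + spot));
    }
}
```

Each delivered item stands for the point where a spot's requests would be started, so the tick length sets the upper bound on how fast spots are released.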

The SynchronisationService.java code looks something like this:

    Observable.zip(
            Observable.from(spots),
            Observable.interval(SYNC_TICK, TimeUnit.MILLISECONDS),
            (obs, timer) -> obs)
        .subscribeOn(scheduler)
        .observeOn(scheduler)
        .unsubscribeOn(scheduler)
        .flatMap(spot -> syncTidePosition.get().buildObservable(spot))
        .subscribe(spotAndTideEvent -> new TideEventSubscriber(
            lunaEventService,
            synchronisationIntentProvider.get(),
            spotAndTideEvent.spot,
            String.format(
                getString(string.tide_error_message),
                spotAndTideEvent.spot.getTidePosition()),
            errorHandlerService,
            localBroadcastManager));

... and the call to buildObservable looks like this:

    Observable<SpotAndTideEventTuple> buildObservable(final Spot spot) {
        return Observable.zip(
            Observable.just(spot),
            lunaEventsProvider.listTideTimes(
                spot.getTideOperator(),
                Integer.toString(spot.getTidePosition())),
            SpotAndTideEventTuple::new);
    }

... and the lunaEventsProvider.listTideTimes(...) method is as follows:

    public Observable<List<TideEvent>> listTideTimes(@NonNull final LunaProvider provider,
                                                     @NonNull final String identifier) {
        return getRetrofitServiceImpl(LunaEventsProviderDefinition.class)
            .listTideTimes(provider, identifier)
            .map(TideEventsTemplate::buildModels);
    }

Problem

As an Rx fan, I have read most of the documentation to get this far, but now that I have hit a bug in the code, I don't know where to go next. Either the subscription does not cause any emissions to start (as with the snippets shown), or, with a small change in the setup, I get an unhelpful low-level NPE (rx.Scheduler).

Where do I go next? Am I on the right track using Rx for the described scenario? Any help would be appreciated.

1 answer

Somewhat embarrassingly, the NPE errors I was seeing had nothing to do with Rx. The scheduler I specified to run the operation is injected into an android.app.Service, but because of a small misconfiguration (I omitted the @Inject annotation!) the scheduler variable was null.

A small comfort is knowing why I missed it: my injected scheduler also carries a qualifier annotation, which means it "looked" just like my other declarations at the top of the class:

    @Inject @IoScheduler Scheduler scheduler;
    @Inject LocalBroadcastManager localBroadcastManager;
    @Inject NotificationManager notificationManager;
    @Inject SharedPreferences sharedPrefs;
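One way to make this kind of mistake surface with a clearer message is to check the injected field before using it. The sketch below is only an illustration under assumptions: SyncComponent and startSync are hypothetical names, and java.util.concurrent.Executor stands in for rx.Scheduler so that the example runs on the plain JDK.

```java
import java.util.Objects;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for a service whose collaborator is field-injected. */
final class SyncComponent {
    // Stand-in for the injected rx.Scheduler field; stays null if injection is skipped.
    Executor scheduler;

    void startSync(Runnable work) {
        // Fail at the entry point with a message that names the field,
        // rather than with an NPE from deep inside library code.
        Objects.requireNonNull(scheduler,
                "scheduler was not injected; is @Inject missing on the field?");
        scheduler.execute(work);
    }

    public static void main(String[] args) {
        SyncComponent component = new SyncComponent();
        try {
            component.startSync(() -> System.out.println("syncing"));
        } catch (NullPointerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The exception is still an NPE, but it is raised in the service itself and its message points at the missing injection.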

Still, I had fun building these marble diagrams and broadening my understanding of Rx. The current call now coordinates all 4 API requests and looks like this:

    Observable.zip(
            Observable.from(spots),
            Observable.interval(SYNC_TICK, TimeUnit.MILLISECONDS),
            (obs, timer) -> obs)
        .subscribeOn(scheduler)
        .observeOn(scheduler)
        .unsubscribeOn(scheduler)
        .flatMap(this::buildObservable)
        .subscribe(
            new EventSubscriber(
                lunaEventService,
                solarService,
                swellService,
                conditionsService,
                synchronisationIntentProvider.get(),
                errorHandlerService,
                localBroadcastManager,
                TRENDING_LENGTH_DAYS));

I am partway through refactoring this service, so I expect it to change a bit more, especially when it comes to getting the tests green. I am glad I stuck with Rx; I literally delete 50 to 100 lines of code every time I learn a new feature!


Source: https://habr.com/ru/post/1261784/

