We use multiple services in our Android application. These services expose their data as infinite Observables, which are often created by combining the Observables of other services. Constructing these Observables can be expensive. In addition, the services are often consumed in several places, so their Observables should be shared among subscribers.
Example:
- LocationService provides an infinite Observable<Location> that emits the current location
- ReminderService provides an infinite Observable<List<Reminder>> that emits the list of all stored reminders after each change to the dataset
- LocationAwareReminderService provides an infinite Observable<List<Reminder>> of nearby reminders by applying Observable.combineLatest to the Observables of the previous two services
First approach: internal BehaviorSubjects as a cache
Each service combines the Observables it consumes and subscribes an internal BehaviorSubject to the resulting feed. Consumers can then subscribe to this BehaviorSubject. For example, LocationAwareReminderService:

    import java.util.List;

    import rx.Observable;
    import rx.functions.Func2;
    import rx.subjects.BehaviorSubject;

    public class LocationAwareReminderService {

        private final Observable<List<Reminder>> feed;

        public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
            BehaviorSubject<List<Reminder>> cache = BehaviorSubject.create();
            // Subscribing the cache here is eager: both upstream feeds stay
            // subscribed for the lifetime of this service.
            Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(),
                    new Func2<List<Reminder>, Location, List<Reminder>>() {
                        @Override
                        public List<Reminder> call(List<Reminder> reminders, Location location) {
                            return calculateNearbyReminders(reminders, location);
                        }
                    }).subscribe(cache);
            feed = cache.asObservable();
        }

        public Observable<List<Reminder>> getFeed() {
            return feed;
        }
    }

Disadvantages:
- Because of the BehaviorSubject, the subscriptions to the feeds of reminderService and locationService are never released, even if there is no consumer.
- This is especially problematic when the service depends on services such as LocationService, which emit new items frequently.
- Because of the subscribe(cache) call in the constructor, the service starts computing nearby reminders even if there is no subscriber.
Advantages:
- The resulting feed is shared by all subscribers.
- Because the feed is never unsubscribed, short periods without a subscriber do not tear down the whole pipeline.
Second approach: replay(1).refCount()

    import java.util.List;

    import rx.Observable;
    import rx.functions.Func2;

    public class LocationAwareReminderService {

        private final Observable<List<Reminder>> feed;

        public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
            // Share one upstream subscription and replay the latest list to late subscribers.
            feed = Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(),
                    new Func2<List<Reminder>, Location, List<Reminder>>() {
                        @Override
                        public List<Reminder> call(List<Reminder> reminders, Location location) {
                            return calculateNearbyReminders(reminders, location);
                        }
                    }).replay(1).refCount();
        }

        public Observable<List<Reminder>> getFeed() {
            return feed;
        }
    }

Disadvantages:
- Short periods without a Subscriber tear down the whole pipeline. On the next subscription, the whole pipeline has to be rebuilt.
- A transition from Activity A to Activity B, both of which subscribe to LocationAwareReminderService.getFeed(), leads to a complete teardown and reconstruction of the pipeline.
Advantages:
- After the last Subscriber has unsubscribed, LocationAwareReminderService also unsubscribes from the LocationService.getFeed() and ReminderService.getFeed() Observables.
- LocationAwareReminderService only starts computing nearby reminders after the first Subscriber has subscribed.
- The resulting feed is shared by all Subscribers.
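The teardown on an Activity transition can be illustrated with a small plain-Java model of reference counting. This is only a sketch under stated assumptions: RefCountModel and its methods are hypothetical names, the class does not use RxJava, and it merely counts how often the expensive pipeline would be built and torn down.

```java
/** Hypothetical stand-in for a refCount()-style shared source; not RxJava. */
class RefCountModel {
    private int subscribers = 0;
    private int connects = 0;     // how often the pipeline was built
    private int disconnects = 0;  // how often the pipeline was torn down

    /** The first subscriber triggers construction of the pipeline. */
    synchronized void subscribe() {
        if (subscribers++ == 0) {
            connects++;
        }
    }

    /** The last subscriber leaving tears the pipeline down immediately. */
    synchronized void unsubscribe() {
        if (subscribers == 0) {
            throw new IllegalStateException("no active subscriber");
        }
        if (--subscribers == 0) {
            disconnects++;
        }
    }

    synchronized int connects() {
        return connects;
    }

    synchronized int disconnects() {
        return disconnects;
    }
}
```

In this model, if Activity A unsubscribes before Activity B subscribes, two connects are recorded; if B subscribes before A unsubscribes, the pipeline is built only once.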
Third approach: refCount with an unsubscribe timeout
Therefore, I built a Transformer that keeps the upstream subscription alive for a certain period after the last Subscriber has unsubscribed.

    import java.util.concurrent.TimeUnit;

    import rx.Observable;
    import rx.Subscriber;

    public class RxPublishTimeoutCache<T> implements Observable.Transformer<T, T> {

        private long keepAlive;
        private TimeUnit timeUnit;

        public RxPublishTimeoutCache(long keepAlive, TimeUnit timeUnit) {
            this.keepAlive = keepAlive;
            this.timeUnit = timeUnit;
        }

        @Override
        public Observable<T> call(Observable<T> upstream) {
            final Observable<T> sharedUpstream = upstream.replay(1).refCount();
            return Observable.create(new Observable.OnSubscribe<T>() {
                @Override
                public void call(Subscriber<? super T> subscriber) {
                    if (subscriber.isUnsubscribed()) return;

LocationAwareReminderService using RxPublishTimeoutCache

    import java.util.List;
    import java.util.concurrent.TimeUnit;

    import rx.Observable;
    import rx.functions.Func2;

    public class LocationAwareReminderService {

        private final Observable<List<Reminder>> feed;

        public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
            // Keep the shared pipeline alive for 10 seconds after the last subscriber leaves.
            feed = Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(),
                    new Func2<List<Reminder>, Location, List<Reminder>>() {
                        @Override
                        public List<Reminder> call(List<Reminder> reminders, Location location) {
                            return calculateNearbyReminders(reminders, location);
                        }
                    }).compose(new RxPublishTimeoutCache<List<Reminder>>(10, TimeUnit.SECONDS));
        }

        public Observable<List<Reminder>> getFeed() {
            return feed;
        }
    }

Advantages:
- LocationAwareReminderService only starts computing nearby reminders after the first Subscriber has subscribed.
- The resulting feed is shared by all subscribers.
- Short periods without a subscriber do not tear down the whole pipeline.
- The whole pipeline is torn down once there has been no subscriber for a certain period of time.
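The keep-alive behaviour described here can be modeled in plain Java to make the bookkeeping explicit. This is a minimal sketch and not the RxPublishTimeoutCache implementation: KeepAliveRefCount, its methods, and the generation token are assumptions introduced for illustration, and the class only tracks whether the pipeline would be connected rather than forwarding any items.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical plain-Java model of refCount with a grace period; not RxJava. */
class KeepAliveRefCount {
    private final long keepAlive;
    private final TimeUnit timeUnit;
    private final ScheduledExecutorService scheduler;

    private int subscribers = 0;
    private boolean connected = false;
    private int connects = 0;      // how often the pipeline was (re)built
    private long generation = 0;   // invalidates stale disconnect tasks
    private ScheduledFuture<?> pendingDisconnect;

    KeepAliveRefCount(long keepAlive, TimeUnit timeUnit, ScheduledExecutorService scheduler) {
        this.keepAlive = keepAlive;
        this.timeUnit = timeUnit;
        this.scheduler = scheduler;
    }

    /** Cancels a pending teardown and connects only if currently disconnected. */
    synchronized void subscribe() {
        subscribers++;
        generation++;
        if (pendingDisconnect != null) {
            pendingDisconnect.cancel(false);
            pendingDisconnect = null;
        }
        if (!connected) {
            connected = true;
            connects++;
        }
    }

    /** When the last subscriber leaves, schedules the teardown instead of doing it now. */
    synchronized void unsubscribe() {
        if (subscribers == 0) {
            throw new IllegalStateException("no active subscriber");
        }
        if (--subscribers == 0) {
            final long token = ++generation;
            pendingDisconnect = scheduler.schedule(() -> disconnectIfIdle(token), keepAlive, timeUnit);
        }
    }

    /** Runs after the grace period; does nothing if anyone subscribed in the meantime. */
    private synchronized void disconnectIfIdle(long token) {
        if (subscribers == 0 && token == generation) {
            connected = false;
            pendingDisconnect = null;
        }
    }

    synchronized boolean isConnected() {
        return connected;
    }

    synchronized int connects() {
        return connects;
    }
}
```

The generation token guards against a disconnect task that was already running when a new subscriber arrived and could therefore no longer be cancelled. The caller owns the scheduler and should shut it down when the service is disposed.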
Disadvantages:
Questions:
- Is there any other way to achieve this in RxJava?
- Is there a general design flaw in RxPublishTimeoutCache?
- Is the overall strategy of composing such services with RxJava flawed?