How to tear down shared, infinite Observables with a delay after the last subscriber unsubscribed

We use multiple services in our Android application. These services provide their data as infinite Observables, which are often constructed by combining the Observables of other services. Constructing these Observables can be expensive. In addition, the services are often consumed in several places, so their Observable should be shared among subscribers.

Example:

  • LocationService, provides an infinite Observable<Location> that emits the current location
  • ReminderService, provides an infinite Observable<List<Reminder>> that emits the list of all stored reminders after each change to the dataset
  • LocationAwareReminderService, provides an infinite Observable<List<Reminder>> of nearby reminders by combining the Observables of the previous two services with Observable.combineLatest

First approach: internal BehaviorSubjects as a cache

Each service combines the consumed Observables and subscribes an internal BehaviorSubject to the resulting feed. Consumers can then subscribe to this BehaviorSubject. LocationAwareReminderService, for example:

    public class LocationAwareReminderService {

        Observable<List<Reminder>> feed;

        public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
            BehaviorSubject<List<Reminder>> cache = BehaviorSubject.create();

            Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(),
                    new Func2<List<Reminder>, Location, List<Reminder>>() {
                        @Override
                        public List<Reminder> call(List<Reminder> reminders, Location location) {
                            return calculateNearbyReminders(reminders, location);
                        }
                    }).subscribe(cache);

            feed = cache.asObservable();
        }

        public Observable<List<Reminder>> getFeed() {
            return feed;
        }
    }

Disadvantages:

  • Because of the BehaviorSubject, the feeds of reminderService and locationService are never unsubscribed, even if there is no consumer.
  • This is especially problematic when they depend on services such as LocationService, which publish new items frequently.
  • Because of the subscribe(cache) in the constructor, the service starts computing nearby reminders even when there is no subscriber.

Advantages:

  • The resulting feed is shared by all subscribers.
  • Because the feed is never unsubscribed, short periods without a subscriber do not tear down the whole pipeline.

Second approach: replay(1).refCount()

    public class LocationAwareReminderService {

        Observable<List<Reminder>> feed;

        public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
            feed = Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(),
                    new Func2<List<Reminder>, Location, List<Reminder>>() {
                        @Override
                        public List<Reminder> call(List<Reminder> reminders, Location location) {
                            return calculateNearbyReminders(reminders, location);
                        }
                    }).replay(1).refCount();
        }

        public Observable<List<Reminder>> getFeed() {
            return feed;
        }
    }

Disadvantages:

  • Short periods without a Subscriber tear down the whole pipeline. On the next subscription, the whole pipeline has to be rebuilt.
  • A transition from Activity A to Activity B, which both subscribe to LocationAwareReminderService.getFeed(), leads to a complete teardown and reconstruction of the pipeline.

Advantages:

  • After the last Subscriber has unsubscribed, LocationAwareReminderService also unsubscribes from the LocationService.getFeed() and ReminderService.getFeed() Observables.
  • LocationAwareReminderService only starts computing nearby reminders once the first Subscriber has subscribed.
  • The resulting feed is shared by all Subscribers.
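
The reconstruction cost of plain reference counting can be reproduced without RxJava. The following is a minimal sketch in plain Java, not RxJava's actual implementation: SimpleRefCount and its connect/disconnect callbacks are hypothetical names that stand in for refCount() and for building and tearing down the upstream pipeline.

```java
/** Hypothetical stand-in for refCount(): connect on the first subscriber, disconnect on the last. */
final class SimpleRefCount {
    private final Runnable connect;    // builds the expensive upstream pipeline
    private final Runnable disconnect; // tears the pipeline down again
    private int subscribers;           // guarded by "this"

    SimpleRefCount(Runnable connect, Runnable disconnect) {
        this.connect = connect;
        this.disconnect = disconnect;
    }

    synchronized void subscribe() {
        if (subscribers++ == 0) {
            connect.run();             // first subscriber: build the pipeline
        }
    }

    synchronized void unsubscribe() {
        if (subscribers > 0 && --subscribers == 0) {
            disconnect.run();          // last subscriber left: tear down immediately
        }
    }

    public static void main(String[] args) {
        int[] builds = {0};
        SimpleRefCount feed = new SimpleRefCount(() -> builds[0]++, () -> { });
        feed.subscribe();   // Activity A starts
        feed.unsubscribe(); // Activity A stops, the count drops to zero
        feed.subscribe();   // Activity B starts a moment later
        System.out.println("pipeline built " + builds[0] + " times"); // prints "pipeline built 2 times"
    }
}
```

Even a gap of a few milliseconds between the two Activities is enough to pay the construction cost twice, which is the drawback listed above.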

Third approach: make refCount unsubscribe after a timeout

Therefore I built a Transformer that keeps the upstream subscription alive for a certain period after the last Subscriber has unsubscribed:

    import java.util.concurrent.TimeUnit;

    import rx.Observable;
    import rx.Subscriber;
    import rx.Subscription;
    import rx.functions.Action0;
    import rx.functions.Action1;
    import rx.subscriptions.Subscriptions;

    public class RxPublishTimeoutCache<T> implements Observable.Transformer<T, T> {

        private final long keepAlive;
        private final TimeUnit timeUnit;

        public RxPublishTimeoutCache(long keepAlive, TimeUnit timeUnit) {
            this.keepAlive = keepAlive;
            this.timeUnit = timeUnit;
        }

        @Override
        public Observable<T> call(Observable<T> upstream) {
            final Observable<T> sharedUpstream = upstream.replay(1).refCount();

            return Observable.create(new Observable.OnSubscribe<T>() {
                @Override
                public void call(Subscriber<? super T> subscriber) {
                    if (subscriber.isUnsubscribed())
                        return;

                    // subscribe an empty Subscriber that keeps the subscription of refCount() alive
                    final Subscription keepAliveSubscription = sharedUpstream.subscribe(new NopSubscriber<T>());

                    // listen to unsubscribe from the subscriber
                    subscriber.add(Subscriptions.create(new Action0() {
                        @Override
                        public void call() {
                            // the subscriber unsubscribed
                            Observable.timer(keepAlive, timeUnit).subscribe(new Action1<Long>() {
                                @Override
                                public void call(Long tick) {
                                    // unsubscribe the keep alive subscription
                                    keepAliveSubscription.unsubscribe();
                                }
                            });
                        }
                    }));

                    sharedUpstream.subscribe(subscriber);
                }
            });
        }

        public static class NopSubscriber<T> extends Subscriber<T> {
            @Override
            public void onCompleted() {}

            @Override
            public void onError(Throwable e) {}

            @Override
            public void onNext(T o) {}
        }
    }

LocationAwareReminderService using RxPublishTimeoutCache

    public class LocationAwareReminderService {

        Observable<List<Reminder>> feed;

        public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
            feed = Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(),
                    new Func2<List<Reminder>, Location, List<Reminder>>() {
                        @Override
                        public List<Reminder> call(List<Reminder> reminders, Location location) {
                            return calculateNearbyReminders(reminders, location);
                        }
                    }).compose(new RxPublishTimeoutCache<List<Reminder>>(10, TimeUnit.SECONDS));
        }

        public Observable<List<Reminder>> getFeed() {
            return feed;
        }
    }

Advantages:

  • LocationAwareReminderService only starts computing nearby reminders once the first Subscriber has subscribed.
  • The resulting feed is shared by all subscribers.
  • Short periods without a subscriber do not tear down the whole pipeline.
  • The whole pipeline is torn down once there has been no subscriber for a certain period of time.

Disadvantages:

  • Maybe some general flaws?
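
The keep-alive behaviour can also be sketched independently of RxJava. The following is a minimal illustration in plain Java under stated assumptions, not a replacement for the Transformer above: DelayedRefCount, its connect/disconnect callbacks and the generation counter are hypothetical names. Unlike RxPublishTimeoutCache, it schedules one teardown when the count reaches zero and ignores that teardown if a subscriber has returned in the meantime, instead of holding one keep-alive subscription per subscriber.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of RxJava: delays the teardown after the last unsubscribe. */
final class DelayedRefCount {
    private final Runnable connect;     // builds the expensive upstream pipeline
    private final Runnable disconnect;  // tears the pipeline down again
    private final long keepAlive;
    private final TimeUnit unit;
    private final ScheduledExecutorService timer;

    private int subscribers;   // guarded by "this"
    private boolean connected; // guarded by "this"
    private long generation;   // bumped on every subscribe and every scheduled teardown

    DelayedRefCount(Runnable connect, Runnable disconnect,
                    long keepAlive, TimeUnit unit, ScheduledExecutorService timer) {
        this.connect = connect;
        this.disconnect = disconnect;
        this.keepAlive = keepAlive;
        this.unit = unit;
        this.timer = timer;
    }

    synchronized void subscribe() {
        subscribers++;
        generation++;          // invalidates any teardown that is still pending
        if (!connected) {
            connected = true;
            connect.run();
        }
    }

    synchronized void unsubscribe() {
        if (subscribers == 0) {
            return;            // ignore unbalanced calls
        }
        if (--subscribers == 0) {
            final long ticket = ++generation;
            timer.schedule(() -> teardownIfStillIdle(ticket), keepAlive, unit);
        }
    }

    private synchronized void teardownIfStillIdle(long ticket) {
        // Tear down only if nobody subscribed since this teardown was scheduled.
        if (ticket == generation && connected) {
            connected = false;
            disconnect.run();
        }
    }

    synchronized boolean isConnected() {
        return connected;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        DelayedRefCount feed = new DelayedRefCount(
                () -> System.out.println("connect upstream"),
                () -> System.out.println("disconnect upstream"),
                200, TimeUnit.MILLISECONDS, timer);
        feed.subscribe();   // Activity A subscribes, the upstream is connected
        feed.unsubscribe(); // Activity A leaves, a teardown is scheduled
        feed.subscribe();   // Activity B arrives in time, the pending teardown becomes stale
        feed.unsubscribe(); // Activity B leaves, a new teardown is scheduled
        Thread.sleep(500);  // nobody returns within the keep-alive window
        timer.shutdown();
    }
}
```

The generation counter is one possible way to handle a timer that fires just as a new subscriber arrives; cancelling the scheduled task would work as well, but a task that has already started running still needs such a check.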

Questions:

  • Is there any other way to achieve this in RxJava?
  • Is there any general design flaw in RxPublishTimeoutCache?
  • Is the overall strategy of composing such services with RxJava flawed?
2 answers

I thought this was an interesting problem and that such an operator would be useful, so I added Transformers.delayFinalUnsubscribe to rxjava-extras:

    observable
        .publish()
        .refCount()
        .compose(Transformers.delayFinalUnsubscribe(1, TimeUnit.MINUTES));

It is available in rxjava-extras from 0.7.9.1 on Maven Central. Give it a spin if you want, and see if there are any problems.


There is now a refCount overload that takes a timeout and does just that (in RxJava 2.2 and later: refCount(long timeout, TimeUnit unit) on ConnectableObservable).


Source: https://habr.com/ru/post/1247033/

