How to update observables in RxJava?

I have an observable that wraps an HTTP request:

    mObservable = retryObservable(
            mService.getAddressList(getUserId(), true, 1, Integer.MAX_VALUE, "id", true)
                    .map(r -> r.getItems())
                    .observeOn(AndroidSchedulers.mainThread()));

Then I subscribe to it:

    mSubscription = mObservable.subscribe(
            items -> { mAddressAdapter.swapItems(items); },
            getActivityBase()::showError);

When the subscription is created, the cold observable is activated and the HTTP request is fired. Now I know that the underlying data has changed, and I need to do the same thing again with a fresh request. I tried

 mSubscription.unsubscribe(); 

then call

 mObservable.subscribe(items -> {doSomething();}) 

again, since I expected a new subscription to restart the observable, but it does not work. Any suggestions?

1 answer

Once an Observable has terminated, it does not emit any further items. This is part of the Rx contract.

Wrap your code in a method and create a new observable every time.

    Observable<?> getObservable() {
        return retryObservable(
                mService.getAddressList(getUserId(), true, 1, Integer.MAX_VALUE, "id", true)
                        .map(r -> r.getItems())
                        .observeOn(AndroidSchedulers.mainThread()));
    }


As mentioned in a comment by @DaveSexton, it is even better to use the defer operator in RxJava. From the RxJava wiki:

Do not create the Observable until a subscriber subscribes; create a fresh Observable on each subscription.

Pass defer() an Observable factory function (a function that generates Observables), and defer() will return an Observable that calls this function to generate its observable sequence afresh each time a new subscriber subscribes.

More details here: https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#defer
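The per-subscription behaviour described above can be modelled in plain Java without the RxJava dependency. This is only a rough sketch of the idea: MiniObservable, DeferSketch, fetchAddresses and the request counter are hypothetical names made up for illustration, not part of RxJava, and the "request" is just a counter standing in for the HTTP call.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for a cold observable; this is NOT the RxJava API.
interface MiniObservable<T> {
    void subscribe(Consumer<T> onNext);

    // Emits one value that was already computed when the observable was built.
    static <T> MiniObservable<T> just(T value) {
        return onNext -> onNext.accept(value);
    }

    // Models defer: the factory is invoked once per subscribe() call,
    // so every subscriber gets a freshly built observable.
    static <T> MiniObservable<T> defer(Supplier<MiniObservable<T>> factory) {
        return onNext -> factory.get().subscribe(onNext);
    }
}

final class DeferSketch {
    // Assumption: counts how many "requests" were made, in place of a real HTTP call.
    static final AtomicInteger requests = new AtomicInteger();

    // Hypothetical request: returns a different payload on every call.
    static List<String> fetchAddresses() {
        int n = requests.incrementAndGet();
        return Collections.singletonList("address list v" + n);
    }

    public static void main(String[] args) {
        // The request runs once, here, when the observable is created.
        MiniObservable<List<String>> eager = MiniObservable.just(fetchAddresses());

        // The request runs inside the factory, once per subscription.
        MiniObservable<List<String>> deferred =
                MiniObservable.defer(() -> MiniObservable.just(fetchAddresses()));

        eager.subscribe(items -> System.out.println("eager:    " + items));
        eager.subscribe(items -> System.out.println("eager:    " + items));
        deferred.subscribe(items -> System.out.println("deferred: " + items));
        deferred.subscribe(items -> System.out.println("deferred: " + items));
    }
}
```

In this model both eager subscribers see the same payload, while each deferred subscription triggers a new request. With RxJava itself, the same effect would come from wrapping the request chain from the answer in Observable.defer(() -> ...), so that the chain is rebuilt on every subscribe.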


Source: https://habr.com/ru/post/978962/

