I am studying RxJava and its applicability to Android, and I am trying to implement a simple cache usage example, as shown in the following ASCII diagram:
                           ---------------
             --- failure --|  Load data  |-- success ---
             |             ---------------             |
             V                                         V
    -------------------                          -------------
    | Get from cache  |                          |  Filter   |
    -------------------                          -------------
             |                                         |
             |                                         V
             |          ----------------         -------------
             ---------->|   Display    |<--------|   Cache   |
                        ----------------         -------------
Here is the code I originally came up with:
    subscription = AndroidObservable.bindFragment(this, restClient.getItems())
            .onErrorReturn(new Func1<Throwable, List<Item>>() {
                @Override
                public List<Item> call(Throwable throwable) {
                    return itemsDao.getCachedItems();
                }
            })
            .flatMap(new Func1<ItemContainer, Observable<Item>>() {
                @Override
                public Observable<Item> call(ItemContainer itemContainer) {
                    return Observable.from(itemContainer.getItems());
                }
            })
            .filter(new Func1<Item, Boolean>() {
                @Override
                public Boolean call(Item item) {
                    return item.getName().startsWith("B");
                }
            })
            .toList()
            .map(new Func1<List<Item>, List<Item>>() {
                @Override
                public List<Item> call(List<Item> items) {
                    itemsDao.cacheItems(items);
                    return items;
                }
            })
            .subscribeOn(Schedulers.from(AsyncTask.THREAD_POOL_EXECUTOR))
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<List<Item>>() {
                @Override
                public void call(List<Item> items) {
                    displayData(items);
                }
            });
As expected, network communication and caching are performed on a background thread, and the data is displayed on the UI thread. The problem is that the data returned by onErrorReturn() goes through the same filtering and caching steps, which is redundant. However, if I change the code to this:
    subscription = AndroidObservable.bindFragment(this, restClient.getItems())
            .flatMap(new Func1<ItemContainer, Observable<Item>>() {
                @Override
                public Observable<Item> call(ItemContainer itemContainer) {
                    return Observable.from(itemContainer.getItems());
                }
            })
            .filter(new Func1<Item, Boolean>() {
                @Override
                public Boolean call(Item item) {
                    return item.getName().startsWith("B");
                }
            })
            .toList()
            .map(new Func1<List<Item>, List<Item>>() {
                @Override
                public List<Item> call(List<Item> items) {
                    itemsDao.cacheItems(items);
                    return items;
                }
            })
            .onErrorReturn(new Func1<Throwable, List<Item>>() {
                @Override
                public List<Item> call(Throwable throwable) {
                    return itemsDao.getCachedItems();
                }
            })
            .subscribeOn(Schedulers.from(AsyncTask.THREAD_POOL_EXECUTOR))
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<List<Item>>() {
                @Override
                public void call(List<Item> items) {
                    displayData(items);
                }
            });
displayData() is never called. What would be the right way to compose these observables to implement the flow shown in the diagram?
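To make the intended flow concrete, here is a minimal plain-Java sketch of the behavior I am after, written without RxJava so it can run on its own. It is only a model of the diagram, not a proposed Rx solution: the class CacheFlowSketch, the methods loadFromNetwork() and itemsToDisplay(), and the in-memory cache list are all hypothetical stand-ins for restClient, itemsDao, and displayData(), and items are reduced to plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the diagram; every name here is a hypothetical stand-in.
final class CacheFlowSketch {
    // Stand-in for itemsDao: an in-memory cache of already filtered names.
    static List<String> cache = new ArrayList<>(Arrays.asList("Bob"));

    // Stand-in for restClient.getItems(); throws to simulate a network failure.
    static List<String> loadFromNetwork(boolean fail) {
        if (fail) {
            throw new IllegalStateException("network unavailable");
        }
        return Arrays.asList("Apple", "Banana", "Blueberry", "Cherry");
    }

    // Returns the list that would be handed to displayData().
    static List<String> itemsToDisplay(boolean networkFails) {
        List<String> loaded;
        try {
            loaded = loadFromNetwork(networkFails);
        } catch (RuntimeException e) {
            // Failure branch: cached data goes straight to display,
            // skipping both the filter and the cache write.
            return new ArrayList<>(cache);
        }
        // Success branch: filter, then cache, then display.
        List<String> filtered = new ArrayList<>();
        for (String name : loaded) {
            if (name.startsWith("B")) {
                filtered.add(name);
            }
        }
        cache = new ArrayList<>(filtered);
        return filtered;
    }

    public static void main(String[] args) {
        System.out.println(itemsToDisplay(true));  // failure: served from cache
        System.out.println(itemsToDisplay(false)); // success: filtered and cached
        System.out.println(itemsToDisplay(true));  // failure: served from updated cache
    }
}
```

The point of the sketch is that only the load step is guarded: the fallback returns the cached list directly, so it is neither filtered nor written back to the cache. That is the shape I would like the observable chain to have.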