This does not work because, despite using Rx, your data layer is not reactive.
By its nature, Realm is a reactive data source, and its managed objects are both mutable (updated in place by Realm) and thread-confined (accessible only on the thread where the Realm was opened).
For your code to work, you need to copy the data out of Realm.
    @Override
    public Single<List<ResponseZone>> entityList() {
        return Single.fromCallable(() -> {
            try(Realm realm = Realm.getDefaultInstance()) {
                return realm.copyFromRealm(realm.where(ResponseZone.class).findAll());
            }
        });
    }
I took the liberty of representing your result as a Single<List<ResponseZone>>, because it is not really an Observable: it does not listen for changes, and there is only one event, the list itself. Sending it through an ObservableEmitter makes little sense when no further events are emitted.
This is why I said that your data layer is not reactive. You are not listening for changes. You just obtain the data directly and are never notified of any change, despite using Rx.
I drew a few diagrams in Paint to illustrate my point (blue means side effects).

In your case, you call a one-off operation to retrieve data from several data sources (cache, local, remote). Once you have the data, you do not listen for changes. If you edit the data in one place and display it in another, the only way to update is to force the cache to fetch the new data manually, and for that you must know that the data was modified somewhere else. That is why you need a way to call a callback directly, or to send a message or event: a change notification.
So in a way, you must create a cache invalidation notification event. If you listen to it, the solution can become reactive again, except that you are doing all of this by hand.
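As a rough illustration of what such a hand-rolled invalidation event looks like, here is a minimal plain-Java sketch. NotifyingCache, observe() and replaceAll() are hypothetical names made up for this example; they are not part of Realm or RxJava.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical hand-written cache: every write notifies the registered listeners,
// so readers do not have to know that the data was modified somewhere else.
final class NotifyingCache<T> {
    private final List<Consumer<List<T>>> listeners = new CopyOnWriteArrayList<>();
    private List<T> items = new ArrayList<>();

    // Registers a listener, hands it the current snapshot, and returns an unsubscribe handle.
    synchronized Runnable observe(Consumer<List<T>> listener) {
        listeners.add(listener);
        listener.accept(snapshot());
        return () -> listeners.remove(listener);
    }

    // Replaces the cached data and sends the change notification.
    synchronized void replaceAll(List<T> newItems) {
        items = new ArrayList<>(newItems);
        List<T> snapshot = snapshot();
        for (Consumer<List<T>> listener : listeners) {
            listener.accept(snapshot);
        }
    }

    // Read-only copy, so listeners cannot mutate the cache contents.
    private List<T> snapshot() {
        return Collections.unmodifiableList(new ArrayList<>(items));
    }
}
```

Any code path that writes through replaceAll() updates every observer, which is the part Realm already does for you with its own change listeners.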
----------------------------------------------------------------------
Given that Realm is already a reactive data source (similar to SQLBrite for SQLite), it can provide change notifications by which you can "invalidate the cache."
In fact, if your local data source is the only data source, and any write coming from the network is a change that you listen to, then your "cache" can be written as replay(1).publish().refCount() (replay the latest data for new subscribers, and replace the data when new data arrives), which is what RxReplayingShare does.
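To make the replay-the-latest-value and reference-counting behavior more concrete, here is a small plain-Java sketch of the idea. LatestValueShare is a hypothetical class written only for this illustration; it is not how RxJava or RxReplayingShare are implemented, and it ignores errors and completion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for "replay the latest value, share one upstream subscription".
final class LatestValueShare<T> {
    // Upstream: given a sink, starts emitting into it and returns a handle that stops it.
    private final Function<Consumer<T>, Runnable> upstream;
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private Runnable stopUpstream;
    private T latest;
    private boolean hasLatest;

    LatestValueShare(Function<Consumer<T>, Runnable> upstream) {
        this.upstream = upstream;
    }

    // New subscribers get the cached value first; the first subscriber connects upstream.
    synchronized Runnable subscribe(Consumer<T> subscriber) {
        if (hasLatest) {
            subscriber.accept(latest);
        }
        subscribers.add(subscriber);
        if (subscribers.size() == 1) {
            stopUpstream = upstream.apply(this::emit);
        }
        return () -> unsubscribe(subscriber);
    }

    // New data replaces the cached value and is forwarded to every current subscriber.
    private synchronized void emit(T value) {
        latest = value;
        hasLatest = true;
        for (Consumer<T> subscriber : new ArrayList<>(subscribers)) {
            subscriber.accept(value);
        }
    }

    // The last subscriber to leave disconnects the upstream.
    private synchronized void unsubscribe(Consumer<T> subscriber) {
        if (subscribers.remove(subscriber) && subscribers.isEmpty()) {
            stopUpstream.run();
            stopUpstream = null;
        }
    }
}
```

In this sketch the upstream plays the role of the Realm query: it is opened once for all subscribers and closed when nobody is listening any more.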

Using a Scheduler created from the looper of a handler thread, you can listen to Realm changes on a background thread and build a reactive data source that returns up-to-date unmanaged copies that you can pass between threads (although mapping directly to immutable domain models is preferable to copyFromRealm() if you choose this route, the route being clean architecture).
    return io.reactivex.Observable.create(new ObservableOnSubscribe<List<ResponseZone>>() {
        @Override
        public void subscribe(ObservableEmitter<List<ResponseZone>> emitter) throws Exception {
            // Opened on the looper thread; closed again when the subscription is disposed.
            final Realm observableRealm = Realm.getDefaultInstance();
            final RealmResults<ResponseZone> results = observableRealm.where(ResponseZone.class).findAllAsync();
            // Called on the looper thread whenever the query results change.
            final RealmChangeListener<RealmResults<ResponseZone>> listener = updated -> {
                if(!emitter.isDisposed()) {
                    if(updated.isValid() && updated.isLoaded()) {
                        // Emit an unmanaged copy that can be passed to other threads.
                        emitter.onNext(observableRealm.copyFromRealm(updated));
                    }
                }
            };
            emitter.setDisposable(Disposables.fromRunnable(() -> {
                if(results.isValid()) {
                    results.removeChangeListener(listener);
                }
                observableRealm.close();
            }));
            results.addChangeListener(listener);
        }
    }).subscribeOn(looperScheduler).unsubscribeOn(looperScheduler); // subscribe and dispose on the looper thread
Where the looper scheduler is obtained as:
    handlerThread = new HandlerThread("LOOPER_SCHEDULER");
    handlerThread.start();
    synchronized(handlerThread) {
        looperScheduler = AndroidSchedulers.from(handlerThread.getLooper());
    }
And that's how you create a reactive clean architecture with Realm.
ADDED:
The LooperScheduler is only needed if you intend to actually enforce Clean Architecture on top of Realm. By default, Realm encourages you to use your data objects as domain models, and in return it provides lazy-loaded, thread-local views that are mutated in place when the data is updated. Clean Architecture, however, says that you should use immutable domain models instead (independent of your data layer). So if you want a reactive clean architecture where you copy from Realm on a background thread every time Realm changes, you need a looper scheduler (or you can observe on a background thread, but do the copying from a refreshed Realm on Schedulers.io()).
With Realm, you generally want to use RealmObjects as your domain models and rely on lazy evaluation. In that case you do not use copyFromRealm(), and you do not map the RealmResults to anything else, but you can expose it as a Flowable or a LiveData.
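For illustration, mapping to immutable domain models could look roughly like the sketch below. The fields of ResponseZone are not shown in the question, so ZoneData (standing in for the managed data object), Zone, ZoneMapper and the id and name fields are all assumptions made up for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical mutable data-layer object, standing in for a managed RealmObject.
// The fields are invented for illustration only.
final class ZoneData {
    long id;
    String name;
}

// Immutable domain model: safe to hand to any thread once constructed.
final class Zone {
    private final long id;
    private final String name;

    Zone(long id, String name) {
        this.id = id;
        this.name = name;
    }

    long id() { return id; }
    String name() { return name; }
}

final class ZoneMapper {
    private ZoneMapper() {}

    // Copies every field eagerly, so the result keeps no reference to the data layer.
    static List<Zone> toDomain(List<ZoneData> source) {
        List<Zone> zones = new ArrayList<>(source.size());
        for (ZoneData data : source) {
            zones.add(new Zone(data.id, data.name));
        }
        return Collections.unmodifiableList(zones);
    }
}
```

In the reactive data source, a mapper like this would take the place of copyFromRealm() inside the change listener, so only domain models ever leave the looper thread.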
You can read the related information here .