RxJava: an observable that contains an asynchronous call

I am trying to understand RxJava and work in the following situation.

Consider the following method that returns an observable that calls NsdManager.registerService . The registerService method needs a listener that is called when registration is successful (or unsuccessful).

 public Observable<Boolean> registerService() { return Observable.create(new Observable.OnSubscribe<Boolean>() { @Override public void call(Subscriber<? super Boolean> subscriber) { nsdManager.registerService(serviceInfo, NsdManager.PROTOCOL_DNS_SD, registrationListener); // how to proceed? } }); } 

The observer can provide a notification only after the listener is called, but the listener is called asynchronously.

How can I do this using RxJava?


I came up with the following using a BehaviorSubject. I don't know if this is the best solution, but it works.

 private BehaviorSubject<Boolean> registrationSubject; public Observable<Boolean> registerService() { registrationSubject = BehaviorSubject.create(); Observable.create(new Observable.OnSubscribe<Boolean>() { @Override public void call(Subscriber<? super Boolean> subscriber) { NsdServiceInfo serviceInfo = new NsdServiceInfo(); serviceInfo.setServiceName(serviceName); serviceInfo.setServiceType(NSD_SERVICE_TYPE); serviceInfo.setPort(serverSocket.getLocalPort()); nsdManager.registerService(serviceInfo, NsdManager.PROTOCOL_DNS_SD, registrationListener); } }).subscribe(registrationSubject); return registrationSubject; } private NsdManager.RegistrationListener registrationListener = new NsdManager.RegistrationListener() { @Override public void onRegistrationFailed(NsdServiceInfo serviceInfo, int errorCode) { registrationSubject.onNext(false); registrationSubject.onCompleted(); } @Override public void onServiceRegistered(NsdServiceInfo serviceInfo) { registrationSubject.onNext(true); registrationSubject.onCompleted(); } @Override public void onUnregistrationFailed(NsdServiceInfo serviceInfo, int errorCode) { } @Override public void onServiceUnregistered(NsdServiceInfo serviceInfo) {} }; 
+6
source share
2 answers

I think it's best to avoid using Subjects whenever possible. In your solution, you only use a theme to call onNext and onCompleted . However, as part of the Observable.create() method, you already have access to a subscriber where you can call these methods. In other words, you can complete the full setup of the event handler inside the Observable.create() method.

 public Observable<Boolean> registerService() { return Observable.create(new Observable.OnSubscribe<Boolean>() { @Override public void call(final Subscriber<? super Boolean> subscriber) { NsdServiceInfo serviceInfo = new NsdServiceInfo(); serviceInfo.setServiceName(serviceName); serviceInfo.setServiceType(NSD_SERVICE_TYPE); serviceInfo.setPort(serverSocket.getLocalPort()); nsdManager.registerService(serviceInfo, NsdManager.PROTOCOL_DNS_SD, new NsdManager.RegistrationListener() { @Override public void onRegistrationFailed(NsdServiceInfo serviceInfo, int errorCode) { if (!subscriber.isUnsubscribed()) { subscriber.onNext(false); subscriber.onCompleted(); } } @Override public void onServiceRegistered(NsdServiceInfo serviceInfo) { if (!subscriber.isUnsubscribed()) { subscriber.onNext(true); subscriber.onCompleted(); } } @Override public void onUnregistrationFailed(NsdServiceInfo serviceInfo, int errorCode) { } @Override public void onServiceUnregistered(NsdServiceInfo serviceInfo) {} } ); } }); } 
+3
source

Inside the listener implementation call:

 subscriber.onNext(result) subscriber.onComplete() 

result is a boolean passed to the listener.

+2
source

Source: https://habr.com/ru/post/973700/


All Articles