I am trying to understand RxJava and have run into the following situation.
Consider the following method, which returns an observable that calls NsdManager.registerService. The registerService method takes a listener that is called when registration succeeds (or fails).
    public Observable<Boolean> registerService() {
        return Observable.create(new Observable.OnSubscribe<Boolean>() {
            @Override
            public void call(Subscriber<? super Boolean> subscriber) {
                nsdManager.registerService(serviceInfo, NsdManager.PROTOCOL_DNS_SD, registrationListener);
            }
        });
    }
The subscriber can be notified only after the listener has been called, but the listener is called asynchronously, after call() has already returned.
How can I do this using RxJava?
I came up with the following using a BehaviorSubject. I don't know if this is the best solution, but it works.
    private BehaviorSubject<Boolean> registrationSubject;

    public Observable<Boolean> registerService() {
        registrationSubject = BehaviorSubject.create();
        Observable.create(new Observable.OnSubscribe<Boolean>() {
            @Override
            public void call(Subscriber<? super Boolean> subscriber) {
                NsdServiceInfo serviceInfo = new NsdServiceInfo();
                serviceInfo.setServiceName(serviceName);
                serviceInfo.setServiceType(NSD_SERVICE_TYPE);
                serviceInfo.setPort(serverSocket.getLocalPort());
                nsdManager.registerService(serviceInfo, NsdManager.PROTOCOL_DNS_SD, registrationListener);
            }
        }).subscribe(registrationSubject);
        return registrationSubject;
    }

    private NsdManager.RegistrationListener registrationListener = new NsdManager.RegistrationListener() {
        @Override
        public void onRegistrationFailed(NsdServiceInfo serviceInfo, int errorCode) {
            registrationSubject.onNext(false);
            registrationSubject.onCompleted();
        }

        @Override
        public void onServiceRegistered(NsdServiceInfo serviceInfo) {
            registrationSubject.onNext(true);
            registrationSubject.onCompleted();
        }

        @Override
        public void onUnregistrationFailed(NsdServiceInfo serviceInfo, int errorCode) {
        }

        @Override
        public void onServiceUnregistered(NsdServiceInfo serviceInfo) {
        }
    };
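For comparison, here is a rough sketch of the general shape I am after: a listener created per call that forwards the asynchronous result to whoever is waiting, instead of going through a shared field. It is plain Java, not RxJava, so that it runs without Android. CompletableFuture stands in for the observable, and FakeRegistrar and the cut-down RegistrationListener are hypothetical stand-ins for NsdManager and NsdManager.RegistrationListener, not the real APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CallbackBridgeSketch {

    /** Hypothetical stand-in for NsdManager.RegistrationListener (two callbacks only). */
    interface RegistrationListener {
        void onServiceRegistered(String serviceName);
        void onRegistrationFailed(String serviceName, int errorCode);
    }

    /** Hypothetical stand-in for NsdManager: calls the listener later, on another thread. */
    static final class FakeRegistrar {
        private final ExecutorService worker = Executors.newSingleThreadExecutor();

        void registerService(String serviceName, boolean succeed, RegistrationListener listener) {
            worker.submit(() -> {
                if (succeed) {
                    listener.onServiceRegistered(serviceName);
                } else {
                    listener.onRegistrationFailed(serviceName, 3); // arbitrary error code
                }
            });
            worker.shutdown(); // the already submitted task still runs
        }
    }

    /** Returns immediately; the future completes when the listener fires. */
    static CompletableFuture<Boolean> registerService(
            FakeRegistrar registrar, String serviceName, boolean succeed) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        // The listener is created per call and captures the sink it reports to,
        // so no mutable field is shared between registrations.
        registrar.registerService(serviceName, succeed, new RegistrationListener() {
            @Override
            public void onServiceRegistered(String name) {
                result.complete(true);
            }

            @Override
            public void onRegistrationFailed(String name, int errorCode) {
                result.complete(false);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        boolean ok = registerService(new FakeRegistrar(), "demo", true).join();
        boolean failed = registerService(new FakeRegistrar(), "demo", false).join();
        System.out.println(ok + " " + failed); // prints "true false"
    }
}
```

In RxJava terms I assume the equivalent would be to build the listener inside call(...) and have it invoke subscriber.onNext(...) and subscriber.onCompleted() directly, but I have not verified that against NsdManager.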