How to verify that the observable uses the correct schedulers in RxJava?

I have the following class (simplified):

    import rx.Observable;
    import rx.Scheduler;
    import rx.schedulers.Schedulers;

    public class Usecase<T> {
        private final Observable<T> get;
        private final Scheduler observeScheduler;

        public Usecase(Observable<T> get, Scheduler observeScheduler) {
            this.get = get;
            this.observeScheduler = observeScheduler;
        }

        public Observable<T> execute() {
            return get.subscribeOn(Schedulers.io()).observeOn(observeScheduler);
        }
    }

I am writing unit tests for it. How can I verify that subscribeOn and observeOn were called with the correct values?

I tried the following:

    Observable<String> observable = mock(Observable.class);
    Usecase<String> usecase = new Usecase(observable, Schedulers.computation());
    usecase.execute();

    // should fail here, but passes
    verify(observable).subscribeOn(Schedulers.computation());
    // should pass, but fails: Missing method call for verify(mock) here
    verify(observable).observeOn(Schedulers.computation());

The above does not work (I think) because subscribeOn and observeOn are final methods. Is there another way to make sure that the observable uses the right schedulers?

+5
2 answers

There is a way to indirectly access both the thread the Observable is subscribed on and the thread it is observed on, which means you can actually verify that the Observable uses the correct schedulers.

We are limited to verifying the threads by name. Fortunately, the threads used by Schedulers.io() are named with a consistent prefix that we can match against. Here is a (complete?) list of the schedulers that have unique thread-name prefixes:

  • Schedulers.io() - RxCachedThreadScheduler
  • Schedulers.newThread() - RxNewThreadScheduler
  • Schedulers.computation() - RxComputationThreadPool
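
The prefix matching can be wrapped in a small helper so that tests do not repeat string literals. The following is a minimal sketch in plain Java with no RxJava dependency. The class name SchedulerThreadNames and the method schedulerFor are hypothetical, and the prefixes are the ones this answer matches against (with the computation prefix spelled RxComputationThreadPool). They are implementation details of RxJava and may differ between versions, so check them against the version you use.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, not part of RxJava: maps a thread name to the
// scheduler whose worker threads carry that name prefix.
final class SchedulerThreadNames {
    private static final Map<String, String> PREFIXES = new LinkedHashMap<>();
    static {
        // Assumed prefixes; verify them against your RxJava version.
        PREFIXES.put("RxCachedThreadScheduler", "Schedulers.io()");
        PREFIXES.put("RxNewThreadScheduler", "Schedulers.newThread()");
        PREFIXES.put("RxComputationThreadPool", "Schedulers.computation()");
    }

    private SchedulerThreadNames() {}

    // Returns the scheduler label for a thread name, or empty if no prefix matches.
    static Optional<String> schedulerFor(String threadName) {
        return PREFIXES.entrySet().stream()
                .filter(e -> threadName.startsWith(e.getKey()))
                .map(Map.Entry::getValue)
                .findFirst();
    }
}
```

A test could then pass a captured thread name to SchedulerThreadNames.schedulerFor and compare the result with the expected scheduler label. A name that matches no prefix, such as main, yields an empty Optional.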

To verify that an Observable is subscribed on the IO thread:

    // Calling Thread.currentThread() inside Observable.OnSubscribe returns the
    // thread the Observable is running on (Schedulers.io() in our case)
    Observable<String> obs = Observable.create((Subscriber<? super String> s) -> {
        s.onNext(Thread.currentThread().getName());
        s.onCompleted();
    });

    // Schedule the Observable
    Usecase<String> usecase = new Usecase<>(obs, Schedulers.immediate());
    Observable<String> usecaseObservable = usecase.execute();

    // Verify the Observable emitted the name of the IO thread
    String subscribingThread = usecaseObservable.toBlocking().first();
    assertThat(subscribingThread).startsWith("RxCachedThreadScheduler");

To verify that the Observable is observed on the computation thread, you can use TestSubscriber#getLastSeenThread to access the last thread used for observation.

    TestSubscriber<Object> subscriber = TestSubscriber.create();
    Usecase<Object> usecase = new Usecase<>(Observable.empty(), Schedulers.computation());
    usecase.execute().subscribe(subscriber);

    // The observable runs asynchronously, so wait for it to complete
    subscriber.awaitTerminalEvent();
    subscriber.assertNoErrors();

    // Verify the observable was observed on the computation thread
    String observingThread = subscriber.getLastSeenThread().getName();
    assertThat(observingThread).startsWith("RxComputationThreadPool");

No third-party libraries or mocking are needed, although I use AssertJ for the fluent startsWith assertion.

+3

Applying an operator to an Observable returns a new Observable, so any subsequent operator is applied to a different object. You would have to walk the composition graph of the Observable to find out which operators were applied and with which parameters.

This is not supported in RxJava, so you would have to rely on internal details and reflection.
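
The point about each operator returning a new object can be seen in a plain-Java sketch. The Stage class below is a hypothetical stand-in for a chainable type such as Observable, not RxJava code, and it records calls only to show which instance receives them. A verification against the original object can only ever see the first call in the chain.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a chainable type such as Observable.
// Each operator records the call on its receiver and returns a NEW instance.
final class Stage {
    final List<String> callsSeen = new ArrayList<>();

    Stage subscribeOn(String scheduler) {
        callsSeen.add("subscribeOn(" + scheduler + ")");
        return new Stage();
    }

    Stage observeOn(String scheduler) {
        callsSeen.add("observeOn(" + scheduler + ")");
        return new Stage();
    }
}

final class ChainDemo {
    public static void main(String[] args) {
        Stage source = new Stage();
        Stage afterSubscribeOn = source.subscribeOn("io");
        afterSubscribeOn.observeOn("computation");

        // The original object never sees observeOn; the intermediate one does.
        System.out.println(source.callsSeen);           // [subscribeOn(io)]
        System.out.println(afterSubscribeOn.callsSeen); // [observeOn(computation)]
    }
}
```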

In general, you cannot assume anything about the thread the events come from, but you can use observeOn to make sure that they arrive on the thread of your choice.
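
The same idea can be sketched with java.util.concurrent alone. This is an analogue, not RxJava: thenApplyAsync with an explicit executor plays the role that observeOn plays above, and the class name HopDemo, the method deliveredOn, and the thread name my-observer-thread are made up for illustration. The value is produced on a thread we do not control and is then handed to a thread we chose.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class HopDemo {
    // Produces a value on some pool thread, delivers it on `target`,
    // and returns the name of the thread that received it.
    static String deliveredOn(ExecutorService target) {
        return CompletableFuture
                .supplyAsync(() -> "event") // producer thread: not under our control
                .thenApplyAsync(e -> Thread.currentThread().getName(), target)
                .join();
    }

    public static void main(String[] args) {
        // Single worker thread with a name we can assert on.
        ExecutorService target = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "my-observer-thread"));
        try {
            System.out.println(deliveredOn(target)); // my-observer-thread
        } finally {
            target.shutdown();
        }
    }
}
```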

+2

Source: https://habr.com/ru/post/1246707/

