I tested some new features in Java 9. So I put together a test to have a publisher emitting numbers at a given speed. I also implemented a subscriber to listen to these publications and simply printed them for the console.
Although I can not fully understand how to use this Api, because the method onNext()does not print anything, but getLastItem()returns only 0.
The only part that seems to work is the onSubscribe()one that correctly initializes the variable lastItem.
@Test
public void testReactiveStreams(){
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
TestIntegerSubscriber subscriber = new TestIntegerSubscriber();
publisher.subscribe(subscriber);
assertTrue(publisher.hasSubscribers());
System.out.println("Publishing Items...");
List.of(1,2,3,4,5).stream().forEach(i -> {
publisher.submit(i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
});
assertEquals(5, subscriber.getLastItem());
publisher.close();
}
private class TestIntegerSubscriber implements Flow.Subscriber<Integer> {
private int lastItem;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("Subscribed");
lastItem = 0;
}
@Override
public void onNext(Integer item) {
System.out.println("Received : "+item);
lastItem += 1;
assertTrue(lastItem == item);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
System.out.println("Completed");
}
public int getLastItem(){
return lastItem;
}
}
Can someone tell me what I'm doing wrong in my testing, please? I expect the test to print these numbers and return 5 as the last element.
, Observables Subjects Angular2, .