Retrieving Items from a SubmissionPublisher Reactive Stream

I have been trying out some of the new features in Java 9, so I put together a test with a publisher that emits numbers at a given rate. I also implemented a subscriber that listens for these publications and simply prints them to the console.

However, I don't fully understand how to use this API: the onNext() method never prints anything, and getLastItem() only ever returns 0.

The only part that seems to work is onSubscribe(), which correctly initializes the lastItem variable.

@Test
public void testReactiveStreams(){
    //Create Publisher
    SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

    //Register Subscriber
    TestIntegerSubscriber subscriber = new TestIntegerSubscriber();
    publisher.subscribe(subscriber);

    assertTrue(publisher.hasSubscribers());

    //Publish items
    System.out.println("Publishing Items...");

    List.of(1,2,3,4,5).stream().forEach(i -> {
        publisher.submit(i);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // blah
        }
    });
    assertEquals(5, subscriber.getLastItem());

    publisher.close();
}


private class TestIntegerSubscriber implements Flow.Subscriber<Integer> {

    private int lastItem;
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("Subscribed");
        lastItem = 0;
    }

    @Override
    public void onNext(Integer item) {
        System.out.println("Received : "+item);
        lastItem += 1; // expect increment by 1
        assertTrue(lastItem == item);
    }

    @Override
    public void onError(Throwable throwable) {
        // nothing for the moment
    }

    @Override
    public void onComplete() {
        System.out.println("Completed");
    }

    public int getLastItem(){
        return lastItem;
    }
}

Can someone tell me what I'm doing wrong in my test, please? I expect the test to print these numbers and return 5 as the last item.

For reference, I am familiar with Observables and Subjects from Angular2.

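This API (like RxJava) is built around backpressure: the subscriber decides how many items it is ready to receive. In JDK 9, a subscriber that never requests anything never receives anything.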

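In your TestIntegerSubscriber, request some items in onSubscribe, for example 10, then count them in onNext and, once all of them have arrived, request another 10.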
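
One way the batching suggestion above might look is sketched below. The names BatchDemo, BatchingSubscriber and BATCH_SIZE, and the latch used to wait for completion, are assumptions made for this illustration and do not come from the original test.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class BatchDemo {

    // Hypothetical subscriber that asks for items in batches of BATCH_SIZE.
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        static final int BATCH_SIZE = 10;

        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;
        private int receivedInBatch;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(BATCH_SIZE); // ask for the first batch
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (++receivedInBatch == BATCH_SIZE) {
                receivedInBatch = 0;
                subscription.request(BATCH_SIZE); // batch consumed, ask for the next one
            }
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes 1..count and returns everything the subscriber received.
    static List<Integer> publish(int count) {
        BatchingSubscriber subscriber = new BatchingSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i);
            }
        } // leaving the block calls close(), which signals onComplete
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(publish(25).size() + " items received");
    }
}
```

Requesting in batches keeps the number of request calls low while still capping how many items can be in flight at once; a batch size of 1 would behave the same way, only with one request per item.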

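This is how the Flow API works, step by step:

  • Create a Publisher and a Subscriber.
  • Subscribe the subscriber with Publisher::subscribe.
  • The publisher creates a Subscription and calls Subscriber::onSubscribe with it, so the subscriber can store it.
  • At some point the subscriber calls Subscription::request to request a number of items.
  • The publisher starts handing items to the subscriber by calling Subscriber::onNext. It never publishes more than the requested number of items.
  • At some point the publisher may run dry or run into trouble and call Subscriber::onComplete or Subscriber::onError, respectively.
  • The subscriber can either keep requesting more items or cut the connection by calling Subscription::cancel.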
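
The steps above can be traced with a small sketch that records every callback. LifecycleDemo, RecordingSubscriber and the limit parameter are hypothetical names chosen for this illustration; only Flow.Subscriber, Flow.Subscription and SubmissionPublisher come from the JDK.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class LifecycleDemo {

    // Hypothetical subscriber that records each callback and cancels after `limit` items.
    static final class RecordingSubscriber implements Flow.Subscriber<String> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);
        private final int limit;
        private Flow.Subscription subscription;
        private int count;

        RecordingSubscriber(int limit) {
            this.limit = limit;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            events.add("onSubscribe");
            this.subscription = subscription; // store the subscription
            subscription.request(1);          // request the first item
        }

        @Override
        public void onNext(String item) {
            events.add("onNext " + item);
            if (++count < limit) {
                subscription.request(1);      // keep requesting
            } else {
                events.add("cancel");
                subscription.cancel();        // cut the connection
                finished.countDown();
            }
        }

        @Override
        public void onError(Throwable throwable) {
            events.add("onError");
            finished.countDown();
        }

        @Override
        public void onComplete() {
            events.add("onComplete");
            finished.countDown();
        }
    }

    // Publishes the given items to a subscriber that cancels after `limit` of them.
    static List<String> run(int limit, String... items) {
        RecordingSubscriber subscriber = new RecordingSubscriber(limit);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (String item : items) {
                publisher.submit(item);
            }
        } // close() makes the publisher "run dry"
        try {
            if (!subscriber.finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the subscriber");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.events;
    }

    public static void main(String[] args) {
        System.out.println(run(2, "a", "b", "c")); // subscriber cancels after two items
        System.out.println(run(5, "a", "b"));      // publisher completes first
    }
}
```

The first call in main exercises the cancel path and the second the onComplete path, so both possible endings from the list are covered.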

To receive items, the Subscriber has to store its Subscription and request items from it:

public class MySubscriber implements Flow.Subscriber<Integer> {
    private Flow.Subscription subscription;

...

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // Keep the subscription so that onNext can request more items later.
        this.subscription = subscription;
        System.out.println("Subscribed");
        lastItem = 0;
        // Request the first item; nothing is delivered without a request.
        subscription.request(1);
    }

...

    @Override
    public void onNext(Integer item) {
        System.out.println("Received : " + item);
        lastItem += 1; // expect increment by 1
        assertTrue(lastItem == item);
        // This item has been processed, so request the next one.
        subscription.request(1);
    }

...
}

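subscription.request(1); tells the publisher that the subscriber is ready to receive 1 more item. Without such a call, the publisher never invokes onNext.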
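
Because SubmissionPublisher delivers items asynchronously, it can also help to wait for onComplete before reading getLastItem(), rather than relying on Thread.sleep. The following is a minimal sketch of that idea in plain Java, without JUnit; LastItemDemo, the latch and the awaitCompletion helper are assumptions added for the illustration, not part of the original test.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class LastItemDemo {

    // The subscriber from the question, with a stored subscription and
    // a latch (added for this sketch) that opens when the stream ends.
    static final class TestIntegerSubscriber implements Flow.Subscriber<Integer> {
        private final CountDownLatch completed = new CountDownLatch(1);
        private Flow.Subscription subscription;
        private int lastItem; // read by the caller only after the latch has opened

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println("Subscribed");
            this.subscription = subscription;
            lastItem = 0;
            subscription.request(1);
        }

        @Override
        public void onNext(Integer item) {
            System.out.println("Received : " + item);
            lastItem = item;
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
            completed.countDown();
        }

        @Override
        public void onComplete() {
            System.out.println("Completed");
            completed.countDown();
        }

        int getLastItem() {
            return lastItem;
        }

        // Hypothetical helper: blocks until onComplete or onError has been called.
        void awaitCompletion() {
            try {
                if (!completed.await(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("timed out waiting for completion");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    // Publishes 1..5, closes the publisher, and returns the last item seen.
    static int publishAndGetLastItem() {
        TestIntegerSubscriber subscriber = new TestIntegerSubscriber();
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        publisher.subscribe(subscriber);
        for (int i = 1; i <= 5; i++) {
            publisher.submit(i);
        }
        publisher.close();            // signals onComplete after the pending items
        subscriber.awaitCompletion(); // wait instead of sleeping
        return subscriber.getLastItem();
    }

    public static void main(String[] args) {
        System.out.println("Last item: " + publishAndGetLastItem());
    }
}
```

In a JUnit test, the value returned by publishAndGetLastItem() could be passed to assertEquals(5, ...) on the test thread, which keeps the assertion out of the subscriber's callback.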

Source: https://habr.com/ru/post/1672294/

