Java 9: Behavior of the Flow SubmissionPublisher offer method

I have been experimenting with the offer method of Java's Flow SubmissionPublisher, but even after reading the documentation and running my test I do not understand how it works.

Here is my test:

    @Test
    public void offer() throws InterruptedException {
        // Create a publisher for String items
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        // Register subscribers
        publisher.subscribe(new CustomSubscriber<>());
        publisher.subscribe(new CustomSubscriber<>());
        publisher.subscribe(new CustomSubscriber<>());
        publisher.offer("item", (subscriber, value) -> false);
        Thread.sleep(500);
    }

The offer method takes the item to emit and a BiPredicate. As I understand the documentation, the item is emitted only if the predicate returns true.

After running the test, the output is:

    Subscription done:
    Subscription done:
    Subscription done:
    Got : item --> onNext() callback
    Got : item --> onNext() callback
    Got : item --> onNext() callback

The output does not change if I return true instead of false.

Can someone explain this method to me a little better, please?

+5
2 answers

No, the predicate is used to decide whether the publish operation should be retried after a drop, as stated in the docs:

onDrop - if non-null, the handler invoked upon a drop to a subscriber, with arguments of the subscriber and item; if it returns true, an offer is re-attempted (once)

This does not affect whether the item should initially be sent.
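To make this concrete, here is a minimal sketch under a few assumptions: a publisher with the default buffer size, a single subscriber that requests everything, and one offered item. The class name OfferWithoutDrops and the helper method are hypothetical and only serve the illustration. Since nothing is dropped in this setup, the handler should never run, so its return value cannot influence the delivery.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class OfferWithoutDrops {

    /** Offers one item and returns how many times the onDrop handler was called. */
    public static int handlerCallsWhenNothingIsDropped(boolean onDropAnswer)
            throws InterruptedException {
        AtomicInteger handlerCalls = new AtomicInteger();
        CountDownLatch delivered = new CountDownLatch(1);

        // Default constructor: large default buffer, so a single item always fits.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(String item) { delivered.countDown(); }
                @Override public void onError(Throwable t) { t.printStackTrace(); }
                @Override public void onComplete() { }
            });

            publisher.offer("item", (subscriber, item) -> {
                handlerCalls.incrementAndGet(); // only reached if the item was dropped
                return onDropAnswer;
            });

            // Wait until the subscriber has actually received the item.
            if (!delivered.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("item was not delivered");
            }
        }
        return handlerCalls.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("onDrop calls when returning false: "
                + handlerCallsWhenNothingIsDropped(false));
        System.out.println("onDrop calls when returning true:  "
                + handlerCallsWhenNothingIsDropped(true));
    }
}
```

In both runs the item reaches the subscriber and the handler count stays at zero, which matches the behavior observed in the question.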

EDIT: An example of how drops can occur when using the offer method

I put together an example of how drops can occur when calling offer. I do not think the output is 100% deterministic, but the difference is clear when you run it several times. Change the handler to return true instead of false to see how retrying reduces the drops caused by saturated buffers. In this example drops usually occur because the maximum buffer capacity is explicitly small (it is passed to the SubmissionPublisher constructor). When the retry is enabled after a short wait, the drops disappear:

    import java.util.concurrent.Flow;
    import java.util.concurrent.Flow.Subscription;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.SubmissionPublisher;

    public class SubmissionPubliserDropTest {

        public static void main(String[] args) throws InterruptedException {
            // Create a publisher for String items.
            // Note the small maximum buffer capacity, chosen to provoke drops.
            SubmissionPublisher<String> publisher =
                    new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2);

            // Register subscribers
            publisher.subscribe(new CustomSubscriber<>());
            publisher.subscribe(new CustomSubscriber<>());
            publisher.subscribe(new CustomSubscriber<>());

            // Publish 3 items to each subscriber
            for (int i = 0; i < 3; i++) {
                int result = publisher.offer("item" + i, (subscriber, value) -> {
                    // Sleep for a short period before deciding whether to retry
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return false; // switch to true to see that drops are reduced
                });
                // A negative result is the negated number of drops
                if (result < 0) {
                    System.err.println("dropped: " + result);
                }
            }

            Thread.sleep(3000);
            publisher.close();
        }
    }

    class CustomSubscriber<T> implements Flow.Subscriber<T> {

        private Subscription sub;

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }

        @Override
        public void onError(Throwable th) {
            th.printStackTrace();
            sub.cancel();
        }

        @Override
        public void onNext(T arg0) {
            System.out.println("Got : " + arg0 + " --> onNext() callback");
            sub.request(1);
        }

        @Override
        public void onSubscribe(Subscription sub) {
            System.out.println("Subscription done");
            this.sub = sub;
            sub.request(1);
        }
    }
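If the timing-dependent output above is hard to interpret, a variant that forces drops may help. The sketch below is not part of the example above. It assumes a hypothetical StalledSubscriber that never calls request(), so its buffer of capacity 2 can only fill up, and it offers more items than the buffer can hold. The class and method names are made up for the illustration.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

public class OfferForcedDrops {

    /** Hypothetical subscriber that never requests items, so nothing is ever consumed. */
    static final class StalledSubscriber<T> implements Flow.Subscriber<T> {
        @Override public void onSubscribe(Flow.Subscription s) { /* deliberately no request() */ }
        @Override public void onNext(T item) { }
        @Override public void onError(Throwable t) { t.printStackTrace(); }
        @Override public void onComplete() { }
    }

    /** Returns { number of onDrop invocations, number of offers that reported a drop }. */
    public static int[] run(boolean retry) {
        AtomicInteger handlerCalls = new AtomicInteger();
        int offersWithDrops = 0;

        // Maximum buffer capacity of 2, as in the example above.
        try (SubmissionPublisher<String> publisher =
                     new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2)) {
            publisher.subscribe(new StalledSubscriber<>());

            // Offer far more items than the buffer can hold.
            for (int i = 0; i < 10; i++) {
                int result = publisher.offer("item" + i, (subscriber, item) -> {
                    handlerCalls.incrementAndGet();
                    return retry; // true asks for one more attempt, false gives up
                });
                if (result < 0) {
                    offersWithDrops++; // negative result: at least one drop
                }
            }
        }
        return new int[] { handlerCalls.get(), offersWithDrops };
    }

    public static void main(String[] args) {
        int[] noRetry = run(false);
        int[] withRetry = run(true);
        System.out.println("no retry:   handler calls=" + noRetry[0] + ", offers with drops=" + noRetry[1]);
        System.out.println("with retry: handler calls=" + withRetry[0] + ", offers with drops=" + withRetry[1]);
    }
}
```

Because the subscriber never consumes anything, the handler is invoked whether it returns true or false, and the retry cannot succeed either. The retry only pays off when the subscriber catches up in the meantime, which is what the sleep in the handler above gives it time to do.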
+5

The documentation for SubmissionPublisher.offer states:

The item may be dropped by one or more subscribers if resource limits are exceeded, in which case the given handler (if non-null) is invoked, and if it returns true, retried once.

To be clear about your two calls:

    publisher.offer("item", (subscriber, value) -> true);  // on a drop, the offer is retried once
    publisher.offer("item", (subscriber, value) -> false); // on a drop, the offer is not retried

In both cases the publisher still publishes the item to each of its current subscribers, which is what happens in your scenario.


To check whether the handler you supplied is invoked, try to reproduce the scenario with tight resource limits, as the documentation suggests:

The item may be dropped by one or more subscribers if resource limits are exceeded, in which case the given handler (if non-null) is invoked, and if it returns true, retried once.

You can also try to provoke drops with the timeout set to a very small value, using the overloaded method offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)

timeout - how long to wait for resources for any subscriber before giving up, in units of unit

unit - a TimeUnit determining how to interpret the timeout parameter

The offer methods may drop items (either immediately or with a bounded timeout), but they give you the opportunity to interpose a handler and then retry.
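A rough sketch of the timed overload, under stated assumptions: the subscriber below never requests items (a hypothetical setup chosen so that the buffer of capacity 2 stays full), and the class and method names are invented for the illustration. Each offer that finds the buffer full waits up to the timeout before the item is dropped and the handler is called.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TimedOfferDemo {

    /** Returns { number of onDrop invocations, number of offers that reported a drop }. */
    public static int[] run(long timeoutMillis) {
        AtomicInteger handlerCalls = new AtomicInteger();
        int offersWithDrops = 0;

        try (SubmissionPublisher<String> publisher =
                     new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2)) {
            // Subscriber that never calls request(), so buffered items are never consumed.
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) { }
                @Override public void onNext(String item) { }
                @Override public void onError(Throwable t) { t.printStackTrace(); }
                @Override public void onComplete() { }
            });

            for (int i = 0; i < 4; i++) {
                // Waits up to timeoutMillis for buffer space before dropping the item.
                int result = publisher.offer("item" + i, timeoutMillis, TimeUnit.MILLISECONDS,
                        (subscriber, item) -> {
                            handlerCalls.incrementAndGet();
                            return false; // do not retry
                        });
                if (result < 0) {
                    offersWithDrops++;
                }
            }
        }
        return new int[] { handlerCalls.get(), offersWithDrops };
    }

    public static void main(String[] args) {
        int[] counts = run(20);
        System.out.println("handler calls=" + counts[0] + ", offers with drops=" + counts[1]);
    }
}
```

With a subscriber that does consume items, a longer timeout gives it more time to free buffer space, so fewer offers should end in a drop.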

+4

Source: https://habr.com/ru/post/1272247/

