Create an infinite sequence of natural numbers using RxJava

I am trying to write a simple program using RxJava to generate an infinite sequence of natural numbers. So far I have found two ways to generate a sequence of numbers: Observable.timer() and Observable.interval(). I am not sure whether these functions are the right tools for this problem. I was expecting a simple function like the one Java 8 provides for generating an infinite sequence of natural numbers:

    IntStream.iterate(1, value -> value + 1).forEach(System.out::println);

I tried using IntStream with Observable, but it does not work correctly: it sends the infinite stream of numbers only to the first subscriber. How can I correctly generate an infinite sequence of natural numbers?

    import rx.Observable;
    import rx.functions.Action1;

    import java.util.stream.IntStream;

    public class NaturalNumbers {

        public static void main(String[] args) {
            Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> {
                IntStream stream = IntStream.iterate(1, val -> val + 1);
                stream.forEach(naturalNumber -> subscriber.onNext(naturalNumber));
            });

            Action1<Integer> first = naturalNumber -> System.out.println("First got " + naturalNumber);
            Action1<Integer> second = naturalNumber -> System.out.println("Second got " + naturalNumber);
            Action1<Integer> third = naturalNumber -> System.out.println("Third got " + naturalNumber);

            naturalNumbers.subscribe(first);
            naturalNumbers.subscribe(second);
            naturalNumbers.subscribe(third);
        }
    }
3 answers

The problem is that when you call naturalNumbers.subscribe(first), the OnSubscribe you implemented is invoked, and it runs forEach over an infinite stream. That call never returns, so the other subscribers are never reached and your program never terminates.

One way to solve this is to subscribe asynchronously, each subscriber on a different thread. To make the results easy to see, I had to add a sleep to the Stream processing:

    Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> {
        IntStream stream = IntStream.iterate(1, i -> i + 1);
        stream.peek(i -> {
            try {
                // Added to visibly see printing
                Thread.sleep(50);
            } catch (InterruptedException e) {
            }
        }).forEach(subscriber::onNext);
    });

    final Subscription subscribe1 = naturalNumbers
            .subscribeOn(Schedulers.newThread())
            .subscribe(first);
    final Subscription subscribe2 = naturalNumbers
            .subscribeOn(Schedulers.newThread())
            .subscribe(second);
    final Subscription subscribe3 = naturalNumbers
            .subscribeOn(Schedulers.newThread())
            .subscribe(third);

    Thread.sleep(1000);

    System.out.println("Unsubscribing");
    subscribe1.unsubscribe();
    subscribe2.unsubscribe();
    subscribe3.unsubscribe();

    Thread.sleep(1000);
    System.out.println("Stopping");

Observable.Generate is exactly the operator designed for this class of problem. I also assume this is a pedagogical example, since using an iterable for this is probably better anyway.

Your code generates the entire stream on the subscriber's thread. Since the stream is infinite, the subscribe call never returns. Apart from this obvious problem, unsubscribing is also problematic, because your loop never checks whether the subscription is still active.
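The missing check can be sketched without any Rx dependency. The following is a plain-JDK illustration, not RxJava API: the AtomicBoolean stands in for the subscription state (in RxJava 1 the corresponding check would be subscriber.isUnsubscribed()), and the names CancellableNaturals and emitUntilCancelled are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntConsumer;

final class CancellableNaturals {

    /**
     * Emits 1, 2, 3, ... to onNext until the flag is set.
     * The flag is re-checked before every value, so cancellation takes
     * effect at the next iteration. Returns how many values were emitted.
     */
    static int emitUntilCancelled(AtomicBoolean cancelled, IntConsumer onNext) {
        int emitted = 0;
        for (int n = 1; !cancelled.get(); n++) {
            onNext.accept(n);
            emitted++;
        }
        return emitted;
    }

    public static void main(String[] args) {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        List<Integer> seen = new ArrayList<>();
        int emitted = emitUntilCancelled(cancelled, n -> {
            seen.add(n);
            if (n == 5) {
                cancelled.set(true); // the consumer "unsubscribes" after five values
            }
        });
        System.out.println(seen + " (" + emitted + " values)"); // [1, 2, 3, 4, 5] (5 values)
    }
}
```

This still blocks the calling thread while it runs; it only shows how the loop can be made to stop.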

You want to use a scheduler to solve this problem. Do not use subscribeOn for this, as that would burden all observers. Instead, schedule the delivery of each number to onNext, and as the last step of each scheduled action, schedule the next one.

Essentially, this is what Observable.Generate gives you: each iteration is scheduled on the provided scheduler (if you do not specify one, a default that introduces concurrency is used). Scheduled operations can be cancelled, and they avoid starving other work.

Rx.NET solves it like this (actually there is an async/await model that is better, but not available in Java afaik):

    static IObservable<int> Range(int start, int count, IScheduler scheduler)
    {
        return Observable.Create<int>(observer =>
        {
            return scheduler.Schedule(0, (i, self) =>
            {
                if (i < count)
                {
                    Console.WriteLine("Iteration {0}", i);
                    observer.OnNext(start + i);
                    self(i + 1);
                }
                else
                {
                    observer.OnCompleted();
                }
            });
        });
    }

Two things can be noted here:

  • The Schedule call returns a subscription handle that is passed back to the observer.
  • The Schedule is recursive: the self parameter is a reference back to the scheduled action, used to schedule the next iteration. This allows unsubscribing to cancel the operation.

Not sure how it looks in RxJava, but the idea should be the same. Again, Observable.Generate will probably be easier for you as it was designed to take care of this scenario.
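For Java readers, here is a rough plain-JDK sketch of the same self-rescheduling idea. It is not RxJava API: an ExecutorService stands in for the Rx scheduler, and Handle, naturals, scheduleStep and runDemo are hypothetical names invented for this illustration. Each task emits one value, then schedules the next one as its last step, and checks for cancellation before every value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntConsumer;

final class SelfSchedulingNaturals {

    /** Hypothetical cancellation handle, standing in for the subscription returned by Schedule. */
    static final class Handle {
        private final AtomicBoolean cancelled = new AtomicBoolean(false);
        private final CountDownLatch stopped = new CountDownLatch(1);

        void cancel() {
            cancelled.set(true);
        }

        /** Blocks until the chain of scheduled steps has observed the cancellation. */
        void awaitStopped() {
            try {
                stopped.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Starts emitting 1, 2, 3, ... to onNext, one scheduled task per value. */
    static Handle naturals(ExecutorService scheduler, IntConsumer onNext) {
        Handle handle = new Handle();
        scheduleStep(scheduler, handle, onNext, 1);
        return handle;
    }

    private static void scheduleStep(ExecutorService scheduler, Handle handle,
                                     IntConsumer onNext, int n) {
        scheduler.execute(() -> {
            if (handle.cancelled.get()) { // checked before every value
                handle.stopped.countDown();
                return;
            }
            onNext.accept(n);
            scheduleStep(scheduler, handle, onNext, n + 1); // last step: schedule the next value
        });
    }

    /** Runs several independent sequences on ONE thread until each has at least minEach values. */
    static List<List<Integer>> runDemo(int subscribers, int minEach) {
        ExecutorService scheduler = Executors.newSingleThreadExecutor();
        List<List<Integer>> received = new ArrayList<>();
        List<Handle> handles = new ArrayList<>();
        for (int i = 0; i < subscribers; i++) {
            List<Integer> values = new CopyOnWriteArrayList<>();
            received.add(values);
            handles.add(naturals(scheduler, n -> values.add(n)));
        }
        for (List<Integer> values : received) {
            while (values.size() < minEach) {
                Thread.yield(); // wait for the scheduler thread to make progress
            }
        }
        handles.forEach(Handle::cancel);
        handles.forEach(Handle::awaitStopped); // every chain has ended, so shutdown is safe
        scheduler.shutdown();
        return received;
    }

    public static void main(String[] args) {
        List<List<Integer>> received = runDemo(3, 5);
        for (int i = 0; i < received.size(); i++) {
            System.out.println("Subscriber " + (i + 1) + " got " + received.get(i).size()
                    + " values, starting with " + received.get(i).subList(0, 5));
        }
    }
}
```

Because every value is a separate short task, three infinite sequences can share a single thread without one starving the others, and cancelling a handle ends its chain at the next step. How many values each subscriber receives before cancellation varies from run to run.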


When creating infinite sequences, you should pay attention to:

  • subscribe and observe on different threads; otherwise you will only serve a single subscriber
  • stop generating values as soon as the subscription is disposed; otherwise runaway loops will eat your CPU.

The first problem is solved using subscribeOn(), observeOn() and various schedulers.

The second problem is best solved using the library-provided methods Observable.generate() or Observable.fromIterable(). They perform the necessary checks for you.

Check this:

    Observable<Integer> naturalNumbers = Observable.<Integer, Integer>generate(() -> 1, (s, g) -> {
        logger.info("generating {}", s);
        g.onNext(s);
        return s + 1;
    }).subscribeOn(Schedulers.newThread());

    Disposable sub1 = naturalNumbers
            .subscribe(v -> logger.info("1 got {}", v));
    Disposable sub2 = naturalNumbers
            .subscribe(v -> logger.info("2 got {}", v));
    Disposable sub3 = naturalNumbers
            .subscribe(v -> logger.info("3 got {}", v));

    Thread.sleep(100);

    logger.info("unsubscribing...");
    sub1.dispose();
    sub2.dispose();
    sub3.dispose();

    Thread.sleep(1000);
    logger.info("done");
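The Observable.fromIterable() route needs an infinite Iterable to feed it. A minimal plain-JDK sketch of one follows. The class name NaturalNumbersIterable and the helper firstN are hypothetical, and the example deliberately stops short of calling RxJava so that it runs with the JDK alone.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class NaturalNumbersIterable implements Iterable<Integer> {

    /** Every call returns a fresh iterator, so each consumer starts again at 1. */
    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {
            private int next = 1;

            @Override
            public boolean hasNext() {
                return true; // infinite; note that int wraps around after Integer.MAX_VALUE
            }

            @Override
            public Integer next() {
                return next++;
            }
        };
    }

    /** Pulls at most n values; the consumer decides when to stop. */
    static List<Integer> firstN(Iterable<Integer> source, int n) {
        List<Integer> out = new ArrayList<>();
        Iterator<Integer> it = source.iterator();
        while (out.size() < n && it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    public static void main(String[] args) {
        NaturalNumbersIterable naturals = new NaturalNumbersIterable();
        System.out.println(firstN(naturals, 5)); // [1, 2, 3, 4, 5]
        System.out.println(firstN(naturals, 3)); // [1, 2, 3]
    }
}
```

Values are produced only when the consumer pulls them, which is what lets the library stop pulling once a subscription is disposed.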

Source: https://habr.com/ru/post/986548/

