Observable.Generate is exactly the operator that decisively solves this class of problem. I also assume this is a pedagogical example, since using an iterative for this is probably better in any case.
Your code generates the entire stream in the subscriber stream. Since this is an endless stream, the subscribe call will never end. Apart from this obvious problem, canceling your subscription will also be problematic as you are not checking it in your cycle.
You want to use a scheduler to solve this problem - of course, do not use subscribeOn , as this will burden all observers. Schedule the delivery of each number to onNext - and as the last step in each planned action, plan the next.
Essentially, this is what Observable.Generate gives - each iteration is scheduled on the provided scheduler (the default value that introduces concurrency if you do not specify it is used). Scheduler operations can be undone and avoid starvation.
Rx.NET solves it like this (actually there is an async/await model that is better, but not available in Java afaik):
static IObservable<int> Range(int start, int count, IScheduler scheduler) { return Observable.Create<int>(observer => { return scheduler.Schedule(0, (i, self) => { if (i < count) { Console.WriteLine("Iteration {0}", i); observer.OnNext(start + i); self(i + 1); } else { observer.OnCompleted(); } }); }); }
Two things can be noted here:
- The schedule call returns a subscription descriptor that is passed back to the observer
- The schedule is recursive - the
self parameter is a reference to the scheduler used to invoke the next iteration. This allows you to unsubscribe to cancel the operation.
Not sure how it looks in RxJava, but the idea should be the same. Again, Observable.Generate will probably be easier for you as it was designed to take care of this scenario.
source share