I want to generate an Observable in real time from the results of a list of Futures.
In the simplest case, suppose I have a list of futures that I combine with Future.sequence, and I just want to track their progress with an Observable that emits an item every time one of them completes. I basically do it like this:
    def observeFuturesProgress(futs: List[Future[Int]]): Observable[String] = {
      Observable[String](observer => {
        val loudFutures: List[Future[Int]] = futs.map(f => {
          f onComplete {
            case Success(a) => observer.onNext(s"just did $a more")
            case Failure(e) => observer.onError(e)
          }
          f
        })
        Future.sequence(loudFutures) onComplete {
          case Success(_) => observer.onCompleted()
          case Failure(e) => observer.onError(e)
        }
      })
    }
This works fine in my test environment. But I just read that onNext should not be called from different threads, at least not without making sure that the calls never overlap. What is the recommended way to fix this? It seems that many real-world uses of Observables require calling onNext from async code like this, but I cannot find a similar example in the docs.
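To make the concern concrete, here is a minimal sketch of what "no overlapping calls" could look like, written against the Scala standard library only. `SimpleObserver`, `SerializedObserver` and the `ProgressSketch` object are hypothetical names made up for illustration and are not part of any Rx library; the wrapper just takes a lock around every call and drops anything that arrives after a terminal event. RxJava 1.x ships `rx.observers.SerializedSubscriber` for the same purpose, which may be the better choice in real code.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object ProgressSketch {

  // Hypothetical stand-in for the Rx observer type (not a library API).
  trait SimpleObserver[T] {
    def onNext(value: T): Unit
    def onError(error: Throwable): Unit
    def onCompleted(): Unit
  }

  // Hypothetical wrapper: calls are serialized with a lock, and nothing is
  // forwarded after the first terminal event (onError or onCompleted).
  final class SerializedObserver[T](underlying: SimpleObserver[T])
      extends SimpleObserver[T] {
    private var done = false // guarded by this object's monitor

    def onNext(value: T): Unit = synchronized {
      if (!done) underlying.onNext(value)
    }

    def onError(error: Throwable): Unit = synchronized {
      if (!done) { done = true; underlying.onError(error) }
    }

    def onCompleted(): Unit = synchronized {
      if (!done) { done = true; underlying.onCompleted() }
    }
  }

  // Reports progress for each future through the serialized wrapper.
  // The returned future completes once the terminal event has been delivered.
  def observeFuturesProgress(
      futs: List[Future[Int]],
      observer: SimpleObserver[String]
  ): Future[Unit] = {
    val safe = new SerializedObserver(observer)

    // Emit inside map so that each mapped future finishes only after its
    // onNext call has returned; onCompleted therefore cannot overtake it.
    val loudFutures = futs.map { f =>
      f.map { a =>
        safe.onNext(s"just did $a more")
        a
      }
    }

    Future.sequence(loudFutures)
      .map(_ => safe.onCompleted())
      .recover { case e: Throwable => safe.onError(e) }
  }
}
```

Compared with my original code, this version emits inside `map` rather than in a separate `onComplete` callback, because callbacks registered on the same future are not guaranteed to run in any particular order, so the completion signal could otherwise arrive before the last progress message. The `done` flag also means that `onError` is delivered at most once even if several futures fail.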