I want to generate an Observable in real time from the results of a list of Futures.
In the simplest case, suppose I have a list of futures that I run with Future.sequence, and I just track their progress with an Observable that notifies me every time one of them completes. I basically do it like this:
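    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}
    import rx.lang.scala.Observable

    def observeFuturesProgress(futs: List[Future[Int]]): Observable[String] = {
      Observable[String](observer => {
        // Attach a progress callback to every future; these may run on different threads.
        val loudFutures: List[Future[Int]] = futs.map(f => {
          f onComplete {
            case Success(a) => observer.onNext(s"just did $a more")
            case Failure(e) => observer.onError(e)
          }
          f
        })
        // Signal completion (or failure) once all futures have finished.
        Future.sequence(loudFutures) onComplete {
          case Success(_) => observer.onCompleted()
          case Failure(e) => observer.onError(e)
        }
      })
    }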
This works fine in my test environment. But I just read that onNext should not be called from different threads, at least not without making sure that the calls never overlap. What is the recommended way to fix this? It seems that many real-world uses of Observables would require onNext to be called from async code like this, but I cannot find a similar example in the docs.