Transformation to / from reactive is mandatory?
One way to fix this is val x = Observable.fromIterable((0 to 10).map(i => (s"a $i", s"b $i"))), but it will throw an OutOfMemoryError for infinity streams.
Another way is to use .multicast(Pipe.publish[])and then obs.connect()down the code:
import monix.execution.Scheduler.Implicits.global
val x = Observable.fromIterator((0 to 10).map(i => (s"a $i", s"b $i")).iterator)
val y = Observable.toReactive(x)
val obsY = Observable.fromReactivePublisher(y)
val connectY = obsY.multicast(Pipe.publish[(String, String)])
val fileStream = connectY.mapAsync(5)(a => Task{println(a._1); a._1})
val dateStream = connectY.mapAsync(5)(a => Task{println(a._2); a._2})
fileStream.zip(dateStream)
.map(println)
.subscribe()
connectY.connect()
Thread.sleep(5000)
source
share