RxJava share with latest value for new subscribers

I created an Observable by applying a long chain of operators to a BehaviorSubject. Now I want to share the values of this Observable, so that the operator chain is not re-executed for each new subscriber. I also want the shared copy to behave like the original, i.e. a newly arrived subscriber should receive the last emitted value immediately after subscribing.

In 0.20.x you could use multicast(subjectFactory).refCount() with a factory of BehaviorSubject, or just use share(initialValue), which in turn used a BehaviorSubject instead of a PublishSubject.

How can I achieve the same behavior in 1.0.x?

1 answer

I think you can replace multicast(behaviorSubjectFactory).refCount() with replay(1).refCount().
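For intuition, the behavior expected from replay(1).refCount() can be modeled without any library. This is only a sketch under stated assumptions: LatestValueShare and LatestValueShareDemo are hypothetical names, not RxJava or RxScala classes, and the model leaves out the connect/disconnect bookkeeping that refCount performs. It only shows that each upstream value is processed once and that a late subscriber immediately receives the most recent value.

```scala
import scala.collection.mutable

// Hypothetical, library-free model of "share and replay the latest value".
// This is NOT RxJava's implementation; it only mirrors the visible behavior.
final class LatestValueShare[A] {
  private var latest: Option[A] = None
  private val subscribers = mutable.LinkedHashMap.empty[Int, A => Unit]
  private var nextId = 0

  /** Registers a subscriber, replays the cached value (if any),
    * and returns a function that unsubscribes it again. */
  def subscribe(onNext: A => Unit): () => Unit = {
    val id = nextId
    nextId += 1
    subscribers(id) = onNext
    latest.foreach(onNext) // late subscribers get the last value at once
    () => { subscribers.remove(id); () }
  }

  /** Called once per upstream value, however many subscribers exist. */
  def emit(value: A): Unit = {
    latest = Some(value)
    subscribers.values.toList.foreach(_(value))
  }
}

object LatestValueShareDemo {
  /** Runs a small scenario and returns the log of delivered values. */
  def run(): List[String] = {
    val log = mutable.ListBuffer.empty[String]
    val shared = new LatestValueShare[Int]

    shared.emit(100) // plays the role of startWith(100)
    val unsubscribeA = shared.subscribe(i => log += s"A got $i")
    shared.emit(101)
    val unsubscribeB = shared.subscribe(i => log += s"B got $i")
    shared.emit(102)
    unsubscribeB()
    shared.emit(103)
    unsubscribeA()

    log.toList
  }
}
```

In this model, B subscribes after 101 was emitted and still receives 101 first, which is the behavior the question asks for.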

To make the discussion more specific, here is a complete example (in Scala):

    import scala.concurrent.duration._
    import rx.lang.scala.Observable

    @volatile var startTime: Long = 0

    def printTimestamped(s: String): Unit = {
      println(s"[t=${System.currentTimeMillis - startTime}] $s")
    }

    // Suppose for simplicity that the UI events are just ticks of a
    // hot timer observable.
    val uiEvents = Observable.timer(1000.millis, 1000.millis)
      .doOnEach(i => printTimestamped("producing " + i))
      .publish

    // Now apply all the transformations
    val transformed = uiEvents.map(i => i + 101)
      .doOnEach(i => printTimestamped("transformed to " + i))

    // And set a default start value
    val o1 = transformed.startWith(100)

    // Share and make sure new subscribers get the last element replayed
    // immediately after they subscribe:
    val o2 = o1.replay(1).refCount

    // startTime is just before we start the uiEvents observable
    startTime = System.currentTimeMillis
    val subscriptionUiEvents = uiEvents.connect

    Thread.sleep(500)
    printTimestamped("subscribing A")
    val subscriptionA = o2.subscribe(i => printTimestamped("A got " + i))

    Thread.sleep(2000)
    printTimestamped("subscribing B")
    val subscriptionB = o2.subscribe(i => printTimestamped("B got " + i))

    Thread.sleep(2000)
    printTimestamped("unsubscribing B")
    subscriptionB.unsubscribe()

    Thread.sleep(2000)
    printTimestamped("unsubscribing A")
    subscriptionA.unsubscribe()

    // Now the transformations will stop being executed, but the UI
    // events will still be produced
    Thread.sleep(2000)

    // Finally, also stop the UI events:
    subscriptionUiEvents.unsubscribe()

Output:

    [t=505] subscribing A
    [t=519] A got 100
    [t=1002] producing 0
    [t=1003] transformed to 101
    [t=1003] A got 101
    [t=2002] producing 1
    [t=2002] transformed to 102
    [t=2002] A got 102
    [t=2520] subscribing B
    [t=2521] B got 102
    [t=3003] producing 2
    [t=3003] transformed to 103
    [t=3003] A got 103
    [t=3003] B got 103
    [t=4002] producing 3
    [t=4002] transformed to 104
    [t=4002] A got 104
    [t=4002] B got 104
    [t=4521] unsubscribing B
    [t=5003] producing 4
    [t=5003] transformed to 105
    [t=5003] A got 105
    [t=6002] producing 5
    [t=6002] transformed to 106
    [t=6002] A got 106
    [t=6522] unsubscribing A
    [t=7003] producing 6
    [t=8002] producing 7

Original answer:

Quoting the release notes for 1.0.0:

Removed all method overloads that took an initial value, since the startWith operator already solves this generically.

So instead of share(initialValue) just use share().startWith(initialValue).
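One caveat worth checking against the requirement in the question: startWith is applied per subscription, so with share().startWith(initialValue) a late subscriber would be expected to see initialValue first rather than the latest emitted value. The library-free sketch below models that difference. PublishShare, subscribeStartingWith and ShareStartWithSketch are hypothetical names used only for illustration, not RxJava or RxScala APIs.

```scala
import scala.collection.mutable

object ShareStartWithSketch {
  // Hypothetical model of share(): values are multicast, nothing is cached.
  final class PublishShare[A] {
    private val subscribers = mutable.ListBuffer.empty[A => Unit]
    def subscribe(onNext: A => Unit): Unit = subscribers += onNext
    def emit(value: A): Unit = subscribers.toList.foreach(_(value))
  }

  // Hypothetical model of startWith(initial) placed after share():
  // every new subscriber first receives `initial`, then the live values.
  def subscribeStartingWith[A](shared: PublishShare[A], initial: A)(onNext: A => Unit): Unit = {
    onNext(initial)
    shared.subscribe(onNext)
  }

  /** Runs a small scenario and returns the log of delivered values. */
  def run(): List[String] = {
    val log = mutable.ListBuffer.empty[String]
    val shared = new PublishShare[Int]

    subscribeStartingWith(shared, 100)(i => log += s"A got $i")
    shared.emit(101)
    // B arrives after 101 was emitted but still starts with 100
    subscribeStartingWith(shared, 100)(i => log += s"B got $i")
    shared.emit(102)

    log.toList
  }
}
```

Here B starts with 100 even though 101 has already been emitted, which is the difference from the replay(1).refCount() approach given earlier in this answer.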


Source: https://habr.com/ru/post/1207309/

