I was curious to use cases for ConnectableObservableand thought it might be useful to turn expensive outliers from a cold observable (for example, from a database query) and emit them like hot ones. In this way, costly repetitions could be avoided, and a single set of emissions would be provided for all operators and subscribers.
However, after some thought experiments, I had some concerns that self-promotion in flatMaps could cause problems.
For example, let's say I emit values from 1 to 10 through ConnectableObservable. But I flatMap()each value sums all the values, and then subtracts the current value.
ConnectableObservable<Integer> source = Observable.range(1,10)
.doOnNext(System.out::println)
.publish();
source.flatMap(i -> source.reduce(0,(x,y) -> x + y).map(sum -> sum - i))
.subscribe(sum -> System.out.println("SUM - i: " + sum));
source.connect();
I was hoping that I would get this result.
1
2
3
4
5
6
7
8
9
10
SUM - i: 54
SUM - i: 53
SUM - i: 52
SUM - i: 51
SUM - i: 50
SUM - i: 49
SUM - i: 48
SUM - i: 47
SUM - i: 46
SUM - i: 45
But instead, I got it.
1
2
3
4
5
6
7
8
9
10
SUM - i: 53
SUM - i: 50
SUM - i: 46
SUM - i: 41
SUM - i: 35
SUM - i: 28
SUM - i: 20
SUM - i: 11
SUM - i: 1
SUM - i: -10
, flatMap() , , . , cache(), , flatMap().
Observable<Integer> source = Observable.range(1,10)
.doOnNext(System.out::println)
.cache();
source.flatMap(i -> source.reduce(0,(x,y) -> x + y).map(sum -> sum - i))
.subscribe(sum -> System.out.println("SUM - i: " + sum));
:
ConnectableObservable? , ?
, ConnectableObervable , ? cache() ?