ConnectableObservable vs flatMap () self-reference?

I was curious to use cases for ConnectableObservableand thought it might be useful to turn expensive outliers from a cold observable (for example, from a database query) and emit them like hot ones. In this way, costly repetitions could be avoided, and a single set of emissions would be provided for all operators and subscribers.

However, after some thought experiments, I had some concerns that self-promotion in flatMaps could cause problems.

For example, let's say I emit values ​​from 1 to 10 through ConnectableObservable. But I flatMap()each value sums all the values, and then subtracts the current value.

    ConnectableObservable<Integer> source = Observable.range(1,10)
            .doOnNext(System.out::println)
            .publish();

    source.flatMap(i -> source.reduce(0,(x,y) -> x + y).map(sum -> sum - i))
            .subscribe(sum -> System.out.println("SUM - i: " + sum));

    source.connect();

I was hoping that I would get this result.

1
2
3
4
5
6
7
8
9
10
SUM - i: 54
SUM - i: 53
SUM - i: 52
SUM - i: 51
SUM - i: 50
SUM - i: 49
SUM - i: 48
SUM - i: 47
SUM - i: 46
SUM - i: 45

But instead, I got it.

1
2
3
4
5
6
7
8
9
10
SUM - i: 53
SUM - i: 50
SUM - i: 46
SUM - i: 41
SUM - i: 35
SUM - i: 28
SUM - i: 20
SUM - i: 11
SUM - i: 1
SUM - i: -10

, flatMap() , , . , cache(), , flatMap().

    Observable<Integer> source = Observable.range(1,10)
            .doOnNext(System.out::println)
            .cache();

    source.flatMap(i -> source.reduce(0,(x,y) -> x + y).map(sum -> sum - i))
            .subscribe(sum -> System.out.println("SUM - i: " + sum));

:

  • ConnectableObservable? , ?

  • , ConnectableObervable , ? cache() ?

+4
1

ConnectableObservable? , ?

, , , , . , 1 , 2 10.

, ConnectableObervable , ? () ?

ConnectableObservable, publish, Subscriber : , , .

+2

Source: https://habr.com/ru/post/1615065/


All Articles