In RxJava, I cannot emit onComplete from flatMap

I have a problem with my Observable chains not ending when I use a flat map.

I applied my example:

int count = Observable.just(1,2,3)
        .flatMap(s -> Observable.<Integer>create(subscr-> {
            subscr.onNext(s);
            if(s>2) {
                subscr.onCompleted();
            }
        }))
        .doOnEach(a->System.err.println(a.getKind()+" -> "+a.toString()))
        .count()
        .toBlocking()
        .first();
    System.err.println("Count: "+count);

I would expect "doOnEach" to report three onNext events, followed by onCompleted, and finally the chain should end.

However, the conclusion is as follows:

OnNext -> [rx.Notification@c9e6d24d OnNext 1]
OnNext -> [rx.Notification@c9e6d24e OnNext 2]
OnNext -> [rx.Notification@c9e6d24f OnNext 3]

(and then continues to hang)

If I remove the flatMap operator:

int count = Observable.just(1,2,3)
        .doOnEach(a->System.err.println(a.getKind()+" -> "+a.toString()))
        .count()
        .toBlocking()
        .first();
System.err.println("Count: "+count);

... it works exactly as expected:

CREATED!
OnNext -> [rx.Notification@e3598bd9 OnNext 1]
OnNext -> [rx.Notification@e3598bda OnNext 2]
OnNext -> [rx.Notification@e3598bdb OnNext 3]
OnCompleted -> [rx.Notification@3834d63f OnCompleted]
Count: 3

I think that I am doing something wrong (I cannot imagine an error in such a basic scenario), but I do not see this.

Any help is appreciated ... (I am using RxJava 1.1.0)

+4
source share
1 answer

The problem is the following lines:

        if(s>2) {
            subscr.onCompleted();
        }

Observable s == 1 s == 2 onCompleted. flatMap onCompleted, Observable , onCompleted .

+8

Source: https://habr.com/ru/post/1624350/


All Articles