I have a problem with my Observable chains not completing when I use flatMap.
Here is my example:
int count = Observable.just(1, 2, 3)
    .flatMap(s -> Observable.<Integer>create(subscr -> {
        subscr.onNext(s);
        if (s > 2) {
            subscr.onCompleted();
        }
    }))
    .doOnEach(a -> System.err.println(a.getKind() + " -> " + a.toString()))
    .count()
    .toBlocking()
    .first();
System.err.println("Count: " + count);
I would expect doOnEach to report three onNext events followed by onCompleted, and then the chain should terminate.
However, the output is as follows:
OnNext -> [rx.Notification@c9e6d24d OnNext 1]
OnNext -> [rx.Notification@c9e6d24e OnNext 2]
OnNext -> [rx.Notification@c9e6d24f OnNext 3]
(and then it hangs)
If I remove the flatMap operator:
int count = Observable.just(1, 2, 3)
    .doOnEach(a -> System.err.println(a.getKind() + " -> " + a.toString()))
    .count()
    .toBlocking()
    .first();
System.err.println("Count: " + count);
... it works exactly as expected:
CREATED!
OnNext -> [rx.Notification@e3598bd9 OnNext 1]
OnNext -> [rx.Notification@e3598bda OnNext 2]
OnNext -> [rx.Notification@e3598bdb OnNext 3]
OnCompleted -> [rx.Notification@3834d63f OnCompleted]
Count: 3
I assume I am doing something wrong (I cannot imagine a bug in such a basic scenario), but I cannot see what.
Any help is appreciated ... (I am using RxJava 1.1.0)
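One possible explanation, sketched under assumptions: flatMap emits onCompleted only after the outer Observable and every inner Observable have completed, and in the example above the inner Observables created for 1 and 2 never call onCompleted. The snippet below is a plain-Java model of that rule, not RxJava code; MergeState, run and the alwaysComplete flag are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class FlatMapCompletionModel {

    /** Hypothetical stand-in for the merged stream: records events and tracks open sources. */
    static final class MergeState {
        final List<String> events = new ArrayList<>();
        private int activeInner = 0;      // inner sources subscribed but not yet completed
        private boolean outerDone = false;
        private boolean completed = false;

        void innerSubscribed() { activeInner++; }
        void innerNext(int value) { events.add("OnNext " + value); }
        void innerCompleted() { activeInner--; tryComplete(); }
        void outerCompleted() { outerDone = true; tryComplete(); }

        // Downstream completion requires the outer source AND all inner sources to be done.
        private void tryComplete() {
            if (outerDone && activeInner == 0 && !completed) {
                completed = true;
                events.add("OnCompleted");
            }
        }
    }

    /** Replays the question's scenario for the values 1, 2, 3. */
    static List<String> run(boolean alwaysComplete) {
        MergeState merged = new MergeState();
        for (int s = 1; s <= 3; s++) {
            merged.innerSubscribed();
            merged.innerNext(s);
            // The question's code completes the inner source only when s > 2.
            if (alwaysComplete || s > 2) {
                merged.innerCompleted();
            }
        }
        merged.outerCompleted();
        return merged.events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // no OnCompleted: two inner sources stay open
        System.out.println(run(true));  // OnCompleted arrives after the three values
    }
}
```

If this model matches what RxJava does here, calling subscr.onCompleted() unconditionally after subscr.onNext(s) should let count() finish and return 3.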