You can use window and share source Observable. There's also a little trick with bufferCount(2, 1) :
const str = 'aaaaabbbbccccddd-e'; const source = Observable.from(str.split('-'), Rx.Scheduler.async).share(); source .bufferCount(2, 1) // delay emission by one item .map(arr => arr[0]) .window(source .bufferCount(2, 1) // keep the previous and current item .filter(([oldValue, newValue]) => oldValue !== newValue) ) .concatMap(obs => obs.toArray()) .subscribe(console.log);
This prints (due to toArray() ):
[ 'a', 'a', 'a', 'a', 'a' ] [ 'b', 'b', 'b', 'b' ] [ 'c', 'c', 'c', 'c' ] [ 'd', 'd', 'd' ] [ 'e' ]
The problem with this solution is the order of subscriptions to source . We need to notify window in order to subscribe before the first bufferCount . Otherwise, the element is first placed further, and then it is checked whether it differs from the previous one with .filter(([oldValue, newValue]) ...) .
This means that you need to delay the radiation by one before the window (which is the first .bufferCount(2, 1).map(arr => arr[0]) .
Or maybe it’s easier to control the subscription order of publish() :
const str = 'aaaaabbbbccccddd-e'; const source = Observable.from(str.split('-'), Rx.Scheduler.async).share(); const connectable = source.publish(); connectable .window(source .bufferCount(2, 1) // keep the previous and current item .filter(([oldValue, newValue]) => oldValue !== newValue) ) .concatMap(obs => obs.toArray()) .subscribe(console.log); connectable.connect();
The conclusion is the same.