RxJs splits a thread into multiple threads

How can I split an infinite stream into several finite flows based on the grouping method?

--a--aaaab---bb--bcc---ccddde...> 

into these observables

 --a--aaaa-| b---bb--b-| cc---cc-| ddd-| e...> 

As you can see, a is at the beginning, and after getting b I no longer get a , so it should be finished. This is why a normal groupBy not suitable.

+5
source share
3 answers

You can use window and share source Observable. There's also a little trick with bufferCount(2, 1) :

 const str = 'aaaaabbbbccccddd-e'; const source = Observable.from(str.split('-'), Rx.Scheduler.async).share(); source .bufferCount(2, 1) // delay emission by one item .map(arr => arr[0]) .window(source .bufferCount(2, 1) // keep the previous and current item .filter(([oldValue, newValue]) => oldValue !== newValue) ) .concatMap(obs => obs.toArray()) .subscribe(console.log); 

This prints (due to toArray() ):

 [ 'a', 'a', 'a', 'a', 'a' ] [ 'b', 'b', 'b', 'b' ] [ 'c', 'c', 'c', 'c' ] [ 'd', 'd', 'd' ] [ 'e' ] 

The problem with this solution is the order of subscriptions to source . We need to notify window in order to subscribe before the first bufferCount . Otherwise, the element is first placed further, and then it is checked whether it differs from the previous one with .filter(([oldValue, newValue]) ...) .

This means that you need to delay the radiation by one before the window (which is the first .bufferCount(2, 1).map(arr => arr[0]) .

Or maybe it’s easier to control the subscription order of publish() :

 const str = 'aaaaabbbbccccddd-e'; const source = Observable.from(str.split('-'), Rx.Scheduler.async).share(); const connectable = source.publish(); connectable .window(source .bufferCount(2, 1) // keep the previous and current item .filter(([oldValue, newValue]) => oldValue !== newValue) ) .concatMap(obs => obs.toArray()) .subscribe(console.log); connectable.connect(); 

The conclusion is the same.

+5
source

Maybe someone can come up with something simpler, but it works (fiddle: https://fiddle.jshell.net/uk01njgc/ ) ...

 let counter = 0; let items = Rx.Observable.interval(1000) .map(value => Math.floor(value / 3)) .publish(); let distinct = items.distinctUntilChanged() .publish(); distinct .map(value => { return items .startWith(value) .takeUntil(distinct); }) .subscribe(obs => { let obsIndex = counter++; console.log('New observable'); obs.subscribe( value => { console.log(obsIndex.toString() + ': ' + value.toString()); }, err => console.log(err), () => console.log('Completed observable') ); }); distinct.connect(); items.connect(); 
+3
source

Here's an option that completes subscription sharing for you ...

 const stream = ...; // an Observable<Observable<T>> // each inner observable completes when the value changes const split = Observable .create(o => { const connected = stream.publish(); // signals each time the values change (ignore the initial value) const newWindowSignal = connected.distinctUntilChanged().skip(1); // send the observables to our observer connected.window(newWindowSignal).subscribe(o); // now "start" return connected.connect(); }); 
+2
source

Source: https://habr.com/ru/post/1272756/


All Articles