RxJS: chunking a stream and adding delays?

In short, I'm trying to split a really large array into chunks of 10 and wait 5 seconds before emitting the next 10.

This is what I currently have:

Rx.Observable
   .from(hugeArray)
   .bufferCount(10) 
   .delay(5000) //want to wait 5 secs
   .flatMap(e => e) // this needs to go after to flatten the array, buffer spits out arrays of entries
   .flatMap( (data, index) => Rx.Observable.create(observer => {
       // going to render stuff here
       observer.onNext(data)
       observer.onCompleted();  

   }))
   .subscribe(val => console.log('Buffered Values:', val));

I just want to emit 10 items every 5 seconds, but I only get the initial delay, and then everything else is emitted at once.

1 answer

Your chain emits all the chunks at once. delay() then schedules each chunk to be re-emitted 5 seconds later, but all of those timers start at the same moment, so they all expire together.

One solution is to use concatMap(), which subscribes to each inner Observable only after the previous one has completed.

Rx.Observable
    .from(hugeArray)
    .bufferCount(10)
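    // wrap each chunk in an inner Observable that waits 5 seconds;
    // concatMap runs these one after another, so the delays add up
    .concatMap(data => Rx.Observable.of(data).delay(5000))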
    .flatMap(e => e) // or mergeAll() or concatAll()
    .flatMap( (data, index) => Rx.Observable.create(observer => {
        // going to render stuff here
        observer.onNext(data);
        observer.onCompleted();
    }))
    .subscribe(val => console.log('Buffered Values:', val));
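
To make the timing difference concrete, here is a small sketch in plain JavaScript (no RxJS involved) that only computes when each chunk would be emitted under the two chains. The helper names chunk, emissionTimesWithDelay and emissionTimesWithConcatMap are made up for this illustration, and the model assumes the source array is emitted synchronously at t = 0.

```javascript
// Hypothetical helper: split an array into pieces of `size`,
// roughly what bufferCount(size) produces for a finite source.
function chunk(items, size) {
  const chunks = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Model of .bufferCount(n).delay(ms): every chunk arrives at t = 0,
// so every chunk's timer fires at the same time, t = ms.
function emissionTimesWithDelay(chunkCount, ms) {
  return Array.from({ length: chunkCount }, () => ms);
}

// Model of .concatMap(c => of(c).delay(ms)): the timer for chunk k
// starts only after chunk k - 1 has been emitted, so the delays add up.
function emissionTimesWithConcatMap(chunkCount, ms) {
  return Array.from({ length: chunkCount }, (_, k) => (k + 1) * ms);
}

// 25 items in chunks of 10 gives 3 chunks (10, 10 and 5 items).
const chunks = chunk(Array.from({ length: 25 }, (_, i) => i), 10);

console.log(emissionTimesWithDelay(chunks.length, 5000));     // [ 5000, 5000, 5000 ]
console.log(emissionTimesWithConcatMap(chunks.length, 5000)); // [ 5000, 10000, 15000 ]
```

Note that in this model the first chunk also waits 5 seconds in the concatMap() version, because every inner Observable carries its own delay.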

Source: https://habr.com/ru/post/1665374/

