How to emit items from a collection with a delay in RxJava?

I have code emitting items from a collection.

    // Fills the given coll with some items and
    // returns the observable emitting the filled collection.
    // Does ASYNC work.
    Observable<Collection> generate(Collection coll) { ... }

    def coll = []
    generate(coll).flatMap({ filledColl ->
        rx.Observable.from(filledColl)
    }).subscribe({ ... })

The problem is that this collection can contain thousands of elements. Because generate works asynchronously, this code calls the subscriber thousands of times almost instantly, which the work I do inside the observer does not need.

How can I modify this code to emit items from the collection with a delay? For example: emit 100 elements, wait 100 ms, then emit another 100 elements; or wait 10 ms before emitting each next element.

1 answer

Inside flatMap, you need to split filledColl into smaller chunks, delay each chunk, and merge them all into one observable, which you then return from flatMap.

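    import java.util.concurrent.TimeUnit

    generate(coll).flatMap({ filledColl ->
        def chunkSize = 100
        // Start from empty() rather than never(), so the merged stream can complete.
        def resultStream = rx.Observable.empty()
        // collate() splits the list into chunks of at most chunkSize items,
        // including a shorter last chunk.
        filledColl.toList().collate(chunkSize).eachWithIndex { chunk, i ->
            // Chunk i is emitted after i * 100 ms.
            resultStream = resultStream.mergeWith(
                rx.Observable.from(chunk).delay(100 * i, TimeUnit.MILLISECONDS)
            )
        }
        resultStream
    }).subscribe({ ... })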

This is just a rough idea; you will still need to test, tune, and fix it according to your needs. It might also make sense to move this logic into the generate function, but that is up to you, since I cannot know what is inside generate().
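The chunk boundaries and delays used above can be checked separately from RxJava. The following is a minimal sketch in plain Java, assuming a hypothetical helper class ChunkSchedule with a plan method (neither is part of RxJava or of the original code). It only computes which items belong to each chunk and how long each chunk should be delayed; it does not emit anything itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of RxJava: splits a list into chunks
// and assigns each chunk the delay after which it should be emitted.
final class ChunkSchedule {

    // One planned emission: the items of a chunk plus its delay in milliseconds.
    static final class Step<T> {
        final List<T> items;
        final long delayMs;

        Step(List<T> items, long delayMs) {
            this.items = items;
            this.delayMs = delayMs;
        }
    }

    // Chunk i covers indices [i * chunkSize, min((i + 1) * chunkSize, size))
    // and is delayed by i * intervalMs, so the first chunk has no delay.
    static <T> List<Step<T>> plan(List<T> source, int chunkSize, long intervalMs) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<Step<T>> steps = new ArrayList<>();
        for (int start = 0, i = 0; start < source.size(); start += chunkSize, i++) {
            int end = Math.min(start + chunkSize, source.size());
            // Copy the sublist so later changes to source do not affect the plan.
            steps.add(new Step<>(new ArrayList<>(source.subList(start, end)), i * intervalMs));
        }
        return steps;
    }
}
```

For 250 items with a chunk size of 100 and an interval of 100 ms, plan returns three steps of 100, 100, and 50 items with delays of 0, 100, and 200 ms. Each step corresponds to one rx.Observable.from(chunk).delay(...) call in the answer. The "wait 10 ms before the next element" variant from the question is the same plan with a chunk size of 1 and an interval of 10 ms.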


Source: https://habr.com/ru/post/981627/

