Limit throughput with RxJava

The case I am dealing with is hard to explain, so I will describe a simplified version of the problem.

I have an Observable.from() that emits a sequence of files taken from an ArrayList of files. All of these files must be uploaded to the server. For this, I have a function that does the job and returns an Observable:

 Observable<Response> uploadFile(File file); 

When I run this code, it goes crazy: Observable.from() emits all the files, and they are uploaded all at once, or at least on as many threads as it can handle.

I want at most 2 file uploads running in parallel. Is there an operator that can handle this for me?

I tried buffer, window, and some others, but they only seem to group two elements together rather than keep two uploads running in parallel at all times. I also tried limiting the thread pool size in the upload part, but that is not an option in my case.

There should be a simple operator for this, right? Did I miss something?

1 answer

I think all the files are uploaded in parallel because you use flatMap(), which subscribes to all the inner observables at once. You could use concatMap() instead, which processes one inner observable after another. To run two uploads in parallel, call window(2) on your observable of files and then call flatMap() inside each window, as in your code.

 Observable<Response> responses = files
     .window(2)
     .concatMap(windowFiles -> windowFiles.flatMap(file -> uploadFile(file)));
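
To make the batching behaviour of this approach concrete, here is a minimal sketch in plain Java (java.util.concurrent only, not RxJava). It is an analogy rather than the operators themselves: chunks() plays the role of window(2), and the loop over batches plays the role of concatMap(). The class name, the method names, and the sleeping stand-in for the upload are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BatchedUploads {
    // Splits the list into consecutive chunks of at most `size` items (the window(size) analogy).
    static <T> List<List<T>> chunks(List<T> items, int size) {
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < items.size(); i += size) {
            out.add(new ArrayList<>(items.subList(i, Math.min(i + size, items.size()))));
        }
        return out;
    }

    // Processes one chunk at a time; the items inside a chunk run in parallel.
    static List<String> uploadInBatches(List<String> files, int size) {
        ExecutorService pool = Executors.newFixedThreadPool(size);
        List<String> done = new ArrayList<>();
        try {
            for (List<String> batch : chunks(files, size)) {
                List<Callable<String>> tasks = new ArrayList<>();
                for (String file : batch) {
                    // Hypothetical stand-in for uploadFile(): sleeps instead of doing network I/O.
                    tasks.add(() -> { Thread.sleep(10); return "uploaded " + file; });
                }
                // invokeAll blocks until every task in this batch has finished,
                // so the next batch cannot start while a slow upload is still running.
                for (Future<String> result : pool.invokeAll(tasks)) {
                    done.add(result.get());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return done;
    }

    public static void main(String[] args) {
        System.out.println(uploadInBatches(Arrays.asList("a", "b", "c"), 2));
    }
}
```

The point of the sketch is the limitation: there are never more than two uploads in flight, but when one file in a pair is slow, the second slot stays idle until the whole pair is done.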

UPDATE

I found a better solution that does exactly what you want. There is an overload of flatMap() that takes the maximum number of concurrent subscriptions.

 Observable<Response> responses = files
     .onBackpressureBuffer()
     .flatMap(file -> uploadFile(file).subscribeOn(Schedulers.io()), 2);
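
For readers who want to see what a concurrency limit of 2 means in practice, here is a rough plain-Java analogy (java.util.concurrent only, not RxJava and not how flatMap() is implemented). A Semaphore with two permits keeps at most two fake uploads in flight and starts the next one as soon as either finishes. The names BoundedUploads, runWithLimit, and fakeUpload are hypothetical, and the upload is simulated with a short sleep.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedUploads {
    // Hypothetical stand-in for uploadFile(): sleeps briefly instead of doing network I/O.
    static void fakeUpload(String name) throws InterruptedException {
        Thread.sleep(20);
    }

    // Runs every fake upload with at most maxConcurrent in flight and
    // returns the highest number of simultaneous uploads that was observed.
    static int runWithLimit(List<String> files, int maxConcurrent) {
        Semaphore permits = new Semaphore(maxConcurrent);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            for (String file : files) {
                permits.acquire(); // waits here until one of the running uploads finishes
                pool.submit(() -> {
                    int now = inFlight.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        fakeUpload(file);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release(); // frees a slot for the next file
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // no-op if the pool has already terminated
        }
        return peak.get();
    }

    public static void main(String[] args) {
        int peak = runWithLimit(Arrays.asList("a", "b", "c", "d", "e"), 2);
        System.out.println("peak concurrent uploads: " + peak);
    }
}
```

Unlike batching in pairs, a free slot is refilled immediately, so a single slow upload delays only itself.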

Source: https://habr.com/ru/post/1246327/

