Adding a thread pool to an RxJava flow

I was wondering if I can use the RxJava library to add some concurrency to the following use case:

  • Read String column values sequentially from an existing ResultSet with a custom Observable (something like ResultSetObservable.create(resultSet))
  • Call a web service for each of these values (for example, with an instance of InvokeWebServiceFunc1<String, Pair<String, Integer>>()) to get some statistics related to the String (note that the String in the Pair is the same one that was passed as input)
  • Print everything in CSV format (using ExportAsCSVAction1<Pair<String, Integer>>(PrintStream printStream)).

So here is what I have:

    ResultSetObservable.create(resultSet)
        .map(new InvokeWebServiceFunc1<String, Pair<String, Integer>>())
        .subscribe(new ExportAsCSVAction1<Pair<String, Integer>>(System.out));

It works well, but since the web service may take some time for some String inputs, I want to add concurrency by backing the mapping step with a thread pool (of 10 threads, for example). However, I need ExportAsCSVAction1 to always be called on the same thread (ideally the current thread).

Could you help me? I can't figure out whether the toBlocking().forEach() pattern is the right one to use here, and I don't understand where to use Schedulers.from(fixedThreadPool) (in observeOn() or in subscribeOn()).

Thanks for the help!

1 answer

I figured it out myself!

    package radium.rx;

    import java.util.Arrays;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import rx.Observable;
    import rx.schedulers.Schedulers;

    public class TryRx {

        public static Random RANDOM = new Random();

        public static void main(String[] arguments) throws Throwable {
            final List<Integer> inputs = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
            final ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2);

            // Each input gets its own inner Observable subscribed on the pool,
            // so the heavy work runs on pool threads.
            Iterable<Integer> outputs = Observable.<Integer>from(inputs)
                    .flatMap((Integer input) -> deferHeavyWeightStuff(input)
                            .subscribeOn(Schedulers.from(threadPoolExecutor)))
                    .toBlocking()
                    .toIterable();

            // Consumed on the calling (main) thread.
            for (Integer output : outputs) {
                System.out.println(output);
            }

            threadPoolExecutor.shutdown();
        }

        public static void sleepQuietly(int duration, TimeUnit unit) {
            try {
                Thread.sleep(unit.toMillis(duration));
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of silently dropping it.
                Thread.currentThread().interrupt();
            }
        }

        // defer() postpones the heavy call until subscription time.
        public static Observable<Integer> deferHeavyWeightStuff(final int input) {
            return Observable.defer(() -> Observable.just(doHeavyWeightStuff(input)));
        }

        public static int randomInt(int min, int max) {
            return RANDOM.nextInt((max - min) + 1) + min;
        }

        // Simulates slow work (1 to 5 seconds), then returns the square of the input.
        public static int doHeavyWeightStuff(int input) {
            sleepQuietly(randomInt(1, 5), TimeUnit.SECONDS);
            int output = (int) Math.pow(input, 2);
            return output;
        }
    }

Hooray!
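
For anyone trying to picture the threading model behind this, here is a rough plain-JDK analogue. It is not RxJava and not the code from the answer, just a sketch using java.util.concurrent. The slow work runs on a fixed pool, and every result is collected on the calling thread, which is roughly the division of labor that subscribeOn() inside flatMap() followed by toBlocking() gives you. The class name PoolThenCallerThread and the fetchStats method are hypothetical stand-ins for the web service call in the question. As with flatMap(), results arrive in completion order rather than input order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolThenCallerThread {

    // Hypothetical stand-in for the web service call: returns a CSV row "input,length".
    public static String fetchStats(String input) {
        return input + "," + input.length();
    }

    // Runs fetchStats on a fixed pool, but gathers every result on the calling thread.
    public static List<String> process(List<String> inputs, int poolSize)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CompletionService<String> completion = new ExecutorCompletionService<>(pool);
        try {
            for (String input : inputs) {
                completion.submit(() -> fetchStats(input)); // executes on a pool thread
            }
            List<String> rows = new ArrayList<>();
            for (int i = 0; i < inputs.size(); i++) {
                // take() blocks the caller until some task has finished, so rows
                // are added in completion order, not input order.
                rows.add(completion.take().get());
            }
            return rows;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        for (String row : process(Arrays.asList("alpha", "be", "gamma"), 10)) {
            System.out.println(row); // printed by the main thread only
        }
    }
}
```

In the RxJava version, the equivalent of the second loop is the for loop over toIterable() (or toBlocking().forEach()), which is why the CSV export there can stay on the current thread.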


Source: https://habr.com/ru/post/979318/

