Alternative pattern for massive numbers of Runnable or Callable tasks

For massively parallel computing, I tend to use executors and callables. When I have thousands of objects to compute, I am uneasy about instantiating a separate Runnable for each one.

So, I have two approaches to solving this issue:

I. Split the workload among a small number x of workers (splitting the list of y objects into x partitions of about y / x elements each).

import java.util.ArrayList;
import java.util.List;

public static <V> List<List<V>> partitions(List<V> list, int chunks) {
    final List<List<V>> lists = new ArrayList<>();
    final int listSize = list.size();
    // Ceiling division, so that at most `chunks` partitions are produced
    final int size = Math.max(1, (listSize + chunks - 1) / chunks);
    for (int start = 0; start < listSize; start += size) {
        // subList returns a view backed by the original list, not a copy
        lists.add(list.subList(start, Math.min(listSize, start + size)));
    }
    return lists;
}

II. Create x workers that fetch objects from a queue.
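Approach II could look roughly like the sketch below. It is only an illustration under assumptions: the class name QueueWorkers, the method sumOfSquares, and the squaring workload are hypothetical and not from the question. Each worker keeps polling a shared queue until it is empty.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

public class QueueWorkers {
    // Hypothetical example: sums the squares of all items using `workers` threads
    // that each pull the next item from a shared queue until it is drained.
    public static long sumOfSquares(List<Integer> items, int workers) {
        Queue<Integer> queue = new ConcurrentLinkedQueue<>(items);
        AtomicLong total = new AtomicLong();
        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            threads[w] = new Thread(() -> {
                Integer item;
                // poll() returns null once the queue is empty, which ends the worker
                while ((item = queue.poll()) != null) {
                    total.addAndGet((long) item * item);
                }
            });
            threads[w].start();
        }
        try {
            for (Thread t : threads) {
                t.join(); // wait for every worker to finish
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return total.get();
    }
}
```

Only x threads and one queue are created here, no matter how many items there are.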

Questions:

  • Is creating thousands of Runnables really expensive, and should it be avoided?
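  • Is there a general pattern/recommendation for implementing approach II?
  • Do you know of a different approach?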

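Creating thousands of Runnables (objects of a class that implements Runnable) is no more expensive than creating any other object. Creating and starting thousands of threads, on the other hand, can be very costly.

To avoid that, use Executors with a thread pool.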

+5

As for a different approach, have a look at Java 8 parallel streams.

+2

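Answering your questions in order:

Is creating thousands of Runnables really expensive, and should it be avoided?

No, not in itself. What can be costly is how you execute them (spawning a few thousand threads certainly has a price). So you would not want to do this: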

List<Computation> computations = ...
List<Thread> threads = new ArrayList<>();
for (Computation computation : computations) {
    // One thread per task: this is the pattern to avoid
    // (assumes Computation implements Runnable)
    Thread thread = new Thread(computation);
    threads.add(thread);
    thread.start();
}
// If you need to wait for completion:
for (Thread t : threads) {
    t.join();
}

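Doing so would 1) be needlessly costly in terms of OS resources (native threads, each with its own stack), 2) flood the OS scheduler with a massively concurrent workload, leading to many context switches and the associated cache invalidations at the CPU level, and 3) make catching and handling exceptions a nightmare (each thread would probably need an uncaught exception handler, and you would have to deal with failures by hand).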

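Instead, you would probably prefer a finite thread pool (of a few threads, "a few" being closely tied to your number of CPU cores) that handles many Callables.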

List<Computation> computations = ...
ExecutorService pool = Executors.newFixedThreadPool(someNumber);
List<Future<Result>> results = new ArrayList<>();
for (Computation computation : computations) {
    results.add(pool.submit(new ComputationCallable(computation)));
}
for (Future<Result> result : results) {
    // get() blocks until the task is done and rethrows its failure, if any
    doSomething(result.get());
}
pool.shutdown();

, , .
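On the exception-handling point, here is a minimal sketch of how a failure inside a pooled task reaches the caller through Future.get(), with no uncaught exception handler involved. The class name FutureErrors and the method runAndDescribe are hypothetical names used only for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureErrors {
    // Hypothetical helper: runs one task on a small pool and reports
    // either its result or the message of the exception it threw.
    public static String runAndDescribe(Callable<Integer> task) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> future = pool.submit(task);
            return "ok: " + future.get();
        } catch (ExecutionException e) {
            // The exception thrown inside the task is available as the cause
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }
}
```

For instance, runAndDescribe(() -> 21 * 2) yields "ok: 42", while a task that throws new IllegalStateException("boom") yields "failed: boom".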

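Is there a general pattern/recommendation for implementing approach II?

Yes. First of all, the partitioning step (going from a List to a List<List>) is already available in collection libraries such as Guava, so you do not have to write it yourself.

Beyond that, two patterns come to mind:

One is the Fork/Join framework with "divide and conquer" tasks (a task splits itself into subtasks when its range of arguments is too large):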

public static class Adder extends RecursiveTask<Integer> {
    protected List<Integer> globalList;
    protected int start;
    protected int stop;

    public Adder(List<Integer> globalList, int start, int stop) {
        super();
        this.globalList = globalList;
        this.start = start;
        this.stop = stop;
        System.out.println("Creating for " + start + " => " + stop);
    }

    @Override
    protected Integer compute() {
        if (stop - start > 1000) {
            // Too many elements: split the range into two halves
            int middle = start + (stop - start) / 2;
            Adder subTask1 = new Adder(globalList, start, middle);
            Adder subTask2 = new Adder(globalList, middle, stop);
            subTask2.fork(); // run the second half asynchronously
            return subTask1.compute() + subTask2.join();
        } else {
            // Manageable size: sum the elements in place
            int result = 0;
            for (int i = start; i < stop; i++) {
                result += globalList.get(i);
            }
            return result;
        }
    }
}

public void doWork() throws Exception {
    List<Integer> computation = new ArrayList<>();
    for (int i = 0; i < 10000; i++) {
        computation.add(i);
    }
    ForkJoinPool pool = new ForkJoinPool();

    RecursiveTask<Integer> masterTask = new Adder(computation, 0, computation.size());
    Future<Integer> future = pool.submit(masterTask);
    System.out.println(future.get());
}
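Another is Java 8 parallel streams, which make it easy to launch parallel computations (under the hood, Java parallel streams use the Fork/Join framework).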
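For comparison with the Adder task, the same sum can be sketched with a parallel stream. The class name ParallelSum and its two methods are hypothetical; the stream calls themselves are standard java.util.stream API.

```java
import java.util.ArrayList;
import java.util.List;

public class ParallelSum {
    // Sums the list in parallel; splitting and joining are handled by the stream
    public static int sum(List<Integer> values) {
        return values.parallelStream().mapToInt(Integer::intValue).sum();
    }

    // Builds the list 0, 1, ..., n - 1, like the loop in doWork()
    public static List<Integer> range(int n) {
        List<Integer> values = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            values.add(i);
        }
        return values;
    }
}
```

ParallelSum.sum(ParallelSum.range(10000)) returns 49995000, the same value the Fork/Join version computes for that input.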

, .

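Do you know of a different approach?

For a different take on concurrent programming (without explicit thread/task handling), have a look at the actor model: https://en.wikipedia.org/wiki/Actor_model ...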

+1

As another answer mentioned, with Java 8 you can simply do this:

<V> void processInParallel(List<V> list) {
    list.parallelStream().forEach(item -> {
        // do something
    });
}

If you want to specify the number of chunks, you can use a ForkJoinPool like this:

<V> void processInParallel(List<V> list, int chunks) {
    ForkJoinPool forkJoinPool = new ForkJoinPool(chunks);
    forkJoinPool.submit(() -> {
        list.parallelStream().forEach(item -> {
            // do something with each item
        });
    });
}

You can also pass the per-item action in as a functional interface:

<V> void processInParallel(List<V> list, int chunks, Consumer<V> processor) {
    ForkJoinPool forkJoinPool = new ForkJoinPool(chunks);
    forkJoinPool.submit(() -> {
        list.parallelStream().forEach(item -> processor.accept(item));
    });
}

Or, more compactly:

<V> void processInParallel(List<V> list, int chunks, Consumer<V> processor) {
    new ForkJoinPool(chunks).submit(() -> list.parallelStream().forEach(processor));
}

It can then be called like this:

processInParallel(myList, 2, item -> {
    // do something with each item
});

ForkJoinPool#submit() returns a ForkJoinTask, which is a Future, so you can use it to check the status or to wait for completion.

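Note that, for brevity, the code above creates a new ForkJoinPool on every call (and never shuts it down); in real code, you would reuse the pool or shut it down when done.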
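A variant that waits for the work to finish and then shuts the pool down might look like the sketch below. The class name BoundedParallel is hypothetical. Also, as far as I know, a parallel stream running inside the pool that submitted it is observed implementation behavior rather than a guarantee documented by the Stream API, so treat this as an assumption.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;

public class BoundedParallel {
    public static <V> void processInParallel(List<V> list, int parallelism, Consumer<V> processor) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // get() blocks until the whole stream has been processed
            // and surfaces any exception thrown by the processor
            pool.submit(() -> list.parallelStream().forEach(processor)).get();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // release the worker threads
        }
    }
}
```

Because the call blocks, the caller can rely on every item having been processed once processInParallel returns.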

0

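Is creating thousands of Runnables really expensive, and should it be avoided?

No. Creating and destroying objects is cheap in Java; the "expensive" part is the threads, not the task objects. Either way, the work has to be wrapped in a Runnable/Callable.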

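Is there a general pattern/recommendation for implementing approach II?

Approach 2 generally balances the load better than approach 1. With approach 1, the items are assigned up front, so if they take different amounts of time, a worker that finishes its y/x share early sits idle while the others are still busy. With approach 2, every worker stays busy as long as there is work in the queue (unless numWorkItems < numWorkers).

To implement approach 2, use the ExecutorService method invokeAll(Collection<? extends Callable<T>> list).

For example: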

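List<Callable<Result>> workList = ...; // a single list of all of your work
// A fixed pool keeps the thread count bounded; a cached pool could
// start one thread per task when thousands are submitted at once
ExecutorService es = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
List<Future<Result>> futures = es.invokeAll(workList);
es.shutdown();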

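In effect, the ExecutorService implements approach 2 for you: its worker threads take tasks from an internal queue, so you do not have to write that part yourself.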
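A self-contained sketch of the invokeAll approach follows. The class name InvokeAllExample, the method squares, and the squaring workload are hypothetical placeholders for real work items.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllExample {
    // Hypothetical workload: squares every input using a pool of `workers` threads
    public static List<Integer> squares(List<Integer> inputs, int workers) {
        List<Callable<Integer>> workList = new ArrayList<>();
        for (Integer n : inputs) {
            workList.add(() -> n * n); // one cheap Callable per work item
        }
        ExecutorService es = Executors.newFixedThreadPool(workers);
        try {
            List<Integer> results = new ArrayList<>();
            // invokeAll blocks until all tasks are done and returns the
            // futures in the same order as the submitted task list
            for (Future<Integer> f : es.invokeAll(workList)) {
                results.add(f.get());
            }
            return results;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            es.shutdown();
        }
    }
}
```

The number of Callables grows with the input, but the number of threads stays fixed at `workers`.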

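Do you know of a different approach?

Approaches 1 and 2 are the usual ways of doing this. There are different implementations to choose from (for example, java.util.concurrent or Java 8 Fork/Join), but the underlying idea is the same.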

0

Source: https://habr.com/ru/post/1616746/
