Cannot multithread scalable method

UPDATE: To help clarify what I'm asking, I have posted some Java code that uses this idea.

Some time ago I asked a question about an algorithm for breaking down a set of numbers. The idea was to give it a list of numbers (1, 2, 3, 4, 5) and a target total (10), and it would work out every combination of multiples of those numbers that adds up to the total ('1*10', or '1*1, 1*2, 1*3, 1*4', or '2*5', etc.). It was the first programming exercise I had ever done, so it took me a while, but I got it working. Now I want to find out whether I can scale it. The person who answered the original question said it is scalable, but I'm confused about how to do that. The part I'm stuck on is the recursive step, which combines all the results (the table it refers to is not scalable, but by applying caching I can build it quickly).

I have the following algorithm (pseudocode):

    // generates table
    for i = 1 to k
        for z = 0 to sum:
            for c = 0 to z / x_i:
                if T[z - c * x_i][i - 1] is true:
                    set T[z][i] to true

    // uses table to bring all the parts together
    function RecursivelyListAllThatWork(k, sum) // Using last k variables, make sum
        /* Base case: If we've assigned all the variables correctly, list this
         * solution.
         */
        if k == 0:
            print what we have so far
            return

        /* Recursive step: Try all coefficients, but only if they work. */
        for c = 0 to sum / x_k:
            if T[sum - c * x_k][k - 1] is true:
                mark the coefficient of x_k to be c
                call RecursivelyListAllThatWork(k - 1, sum - c * x_k)
                unmark the coefficient of x_k

I'm really at a loss as to how to thread / multiprocess the RecursivelyListAllThatWork function. I know that if I pass it a smaller K (an int, the total number of items in the list), it will process that subset, but I don't know how to handle solutions that combine results across subsets. For example, if the list is [1,2,3,4,5,6,7,8,9,10] and I pass K = 3, then only 1, 2, 3 are processed, which is fine, but what about results that include both 1 and 10? I tried changing the table (the T variable) so that it contains only the subset I want, but that still does not work because, like the approach above, it handles the subset but cannot produce answers that require a wider range.

I don't need code; it would be enough if someone could explain conceptually how to break up this recursive step so that other cores / machines can be used.

UPDATE: I still can't figure out how to turn RecursivelyListAllThatWork into a Runnable. (I know how to implement Runnable in general, but I don't understand how to change the RecursivelyListAllThatWork algorithm so that it can run in parallel. The other parts are only here to make the example work; I only need to implement Runnable for the RecursivelyListAllThatWork method.) Here is the Java code:

    import java.awt.Point;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class main {

        public static void main(String[] args) {
            System.out.println("starting..");
            int target_sum = 100;
            int[] data = new int[] { 10, 5, 50, 20, 25, 40 };
            List T = tableGeneator(target_sum, data);
            List<Integer> coeff = create_coeff(data.length);
            RecursivelyListAllThatWork(data.length, target_sum, T, coeff, data);
        }

        private static List<Integer> create_coeff(int i) {
            // TODO Auto-generated method stub
            Integer[] integers = new Integer[i];
            Arrays.fill(integers, 0);
            List<Integer> integerList = Arrays.asList(integers);
            return integerList;
        }

        private static void RecursivelyListAllThatWork(int k, int sum, List T, List<Integer> coeff, int[] data) {
            // TODO Auto-generated method stub
            if (k == 0) {
                //# print what we have so far
                for (int i = 0; i < coeff.size(); i++) {
                    System.out.println(data[i] + " = " + coeff.get(i));
                }
                System.out.println("*******************");
                return;
            }

            Integer x_k = data[k-1];
            // Recursive step: Try all coefficients, but only if they work.
            for (int c = 0; c <= sum/x_k; c++) { //the c variable caps the percent
                if (T.contains(new Point((sum - c * x_k), (k-1)))) {
                    // mark the coefficient of x_k to be c
                    coeff.set((k-1), c);
                    RecursivelyListAllThatWork((k - 1), (sum - c * x_k), T, coeff, data);
                    // unmark the coefficient of x_k
                    coeff.set((k-1), 0);
                }
            }
        }

        public static List tableGeneator(int target_sum, int[] data) {
            List T = new ArrayList();
            T.add(new Point(0, 0));
            float max_percent = 1;
            int R = (int) (target_sum * max_percent * data.length);
            for (int i = 0; i < data.length; i++) {
                for (int s = -R; s < R + 1; s++) {
                    int max_value = (int) Math.abs((target_sum * max_percent) / data[i]);
                    for (int c = 0; c < max_value + 1; c++) {
                        if (T.contains(new Point(s - c * data[i], i))) {
                            Point p = new Point(s, i + 1);
                            if (!T.contains(p)) {
                                T.add(p);
                            }
                        }
                    }
                }
            }
            return T;
        }
    }
+6
3 answers

The general answer to multithreading is to de-recursivate the recursive implementation by means of a stack (LIFO or FIFO). When implementing such an algorithm, the number of threads is a fixed parameter of the algorithm (for example, the number of cores).

To implement it, the language call stack is replaced with an explicit stack that stores the latest context as a checkpoint, and processing of a context stops when the tested condition ends the recursion. In your case, that condition is either k = 0 or the coeff values matching the target sum.
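To make the idea concrete, here is a minimal single-threaded sketch of replacing the call stack with an explicit one. It does not use the asker's table T; the names `ExplicitStackSketch`, `Frame` and `solve` are made up for illustration, and it assumes all input values are positive.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class ExplicitStackSketch {

    // Hypothetical checkpoint: everything one recursive call would have held.
    static final class Frame {
        final int k;          // index of the next number to assign (counts down)
        final int remaining;  // part of the target sum still to be covered
        final int[] coeff;    // private copy of the coefficients chosen so far

        Frame(int k, int remaining, int[] coeff) {
            this.k = k;
            this.remaining = remaining;
            this.coeff = coeff;
        }
    }

    // Lists every coefficient vector c with sum(c[i] * data[i]) == target.
    // Assumption: every value in data is positive.
    static List<int[]> solve(int[] data, int target) {
        List<int[]> solutions = new ArrayList<>();
        Deque<Frame> stack = new ArrayDeque<>();
        stack.push(new Frame(data.length - 1, target, new int[data.length]));

        while (!stack.isEmpty()) {
            Frame f = stack.pop();
            if (f.remaining == 0) {       // end condition: target reached
                solutions.add(f.coeff);
                continue;
            }
            if (f.k < 0) {                // end condition: no numbers left, dead end
                continue;
            }
            int x = data[f.k];
            for (int c = 0; c <= f.remaining / x; c++) {
                // Each pushed context owns its array, so nothing is shared.
                int[] next = Arrays.copyOf(f.coeff, f.coeff.length);
                next[f.k] = c;
                stack.push(new Frame(f.k - 1, f.remaining - c * x, next));
            }
        }
        return solutions;
    }

    public static void main(String[] args) {
        for (int[] c : solve(new int[] { 1, 2, 3 }, 4)) {
            System.out.println(Arrays.toString(c));
        }
    }
}
```

Because every `Frame` carries its own copy of the coefficients, any frame could be handed to another thread without further coordination.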

After de-recursivation, a first implementation is to run multiple threads that consume the stack, but access to the stack then becomes a point of contention, because it may require synchronization.

A more scalable solution is to dedicate a stack to each thread, but this requires an initial production of contexts to fill those stacks.

I suggest a mixed approach in which a first thread works recursively for a limited number of k levels, used as the maximum recursion depth: 2 for the small data set in the example, but I recommend 3 if it is larger. This first part then delegates the generated intermediate contexts to a thread pool, which processes the remaining k with a non-recursive implementation. This code is not based on the complex algorithm that you use, but on a rather "basic" implementation:

    import java.util.Arrays;
    import java.util.ArrayDeque;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class MixedParallel {

        // pre-requisite: sorted values !!
        private static final int[] data = new int[] { 5, 10, 20, 25, 40, 50 };

        // Context to store intermediate computation or a solution
        static class Context {
            int k;
            int sum;
            int[] coeff;

            Context(int k, int sum, int[] coeff) {
                this.k = k;
                this.sum = sum;
                this.coeff = coeff;
            }
        }

        // Thread pool for parallel execution
        private static ExecutorService executor;
        // Queue to collect solutions
        private static Queue<Context> solutions;

        static {
            final int numberOfThreads = 2;
            executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 1000, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
            // concurrent because of multi-threaded insertions
            solutions = new ConcurrentLinkedQueue<Context>();
        }

        public static void main(String[] args) throws InterruptedException {
            int target_sum = 100;
            // result vector, init to 0
            int[] coeff = new int[data.length];
            Arrays.fill(coeff, 0);
            mixedPartialSum(data.length - 1, target_sum, coeff);
            executor.shutdown();
            // shutdown() does not wait: let the submitted jobs finish before dumping
            executor.awaitTermination(1, TimeUnit.MINUTES);
            // System.out.println("Over. Dumping results");
            while(!solutions.isEmpty()) {
                Context s = solutions.poll();
                printResult(s.coeff);
            }
        }

        private static void printResult(int[] coeff) {
            StringBuffer sb = new StringBuffer();
            for (int i = coeff.length - 1; i >= 0; i--) {
                if (coeff[i] > 0) {
                    sb.append(data[i]).append(" * ").append(coeff[i]).append(" ");
                }
            }
            System.out.println(sb.append("from ").append(Thread.currentThread()));
        }

        private static void mixedPartialSum(int k, int sum, int[] coeff) {
            int x_k = data[k];
            for (int c = sum / x_k; c >= 0; c--) {
                coeff[k] = c;
                int[] newcoeff = Arrays.copyOf(coeff, coeff.length);
                if (c * x_k == sum) {
                    //printResult(newcoeff);
                    solutions.add(new Context(0, 0, newcoeff));
                    continue;
                } else if (k > 0) {
                    if (data.length - k < 2) {
                        mixedPartialSum(k - 1, sum - c * x_k, newcoeff);
                        // for loop on "c" goes on with previous coeff content
                    } else {
                        // no longer recursive. delegate to thread pool
                        executor.submit(new ComputePartialSum(new Context(k - 1, sum - c * x_k, newcoeff)));
                    }
                }
            }
        }

        static class ComputePartialSum implements Callable<Void> {
            // queue with contexts to process
            private Queue<Context> contexts;

            ComputePartialSum(Context request) {
                contexts = new ArrayDeque<Context>();
                contexts.add(request);
            }

            public Void call() {
                while(!contexts.isEmpty()) {
                    Context current = contexts.poll();
                    int x_k = data[current.k];
                    for (int c = current.sum / x_k; c >= 0; c--) {
                        current.coeff[current.k] = c;
                        int[] newcoeff = Arrays.copyOf(current.coeff, current.coeff.length);
                        if (c * x_k == current.sum) {
                            //printResult(newcoeff);
                            solutions.add(new Context(0, 0, newcoeff));
                            continue;
                        } else if (current.k > 0) {
                            contexts.add(new Context(current.k - 1, current.sum - c * x_k, newcoeff));
                        }
                    }
                }
                return null;
            }
        }
    }

You can check which thread found each result and see that all of them take part: the main thread in recursive mode and the two pool threads in context-stack mode.

This implementation now scales when data.length is large:

  • the maximum recursion depth is limited: only the main thread recurses, and only to a shallow level
  • each thread from the pool works with its own context stack without contending with the others
  • the parameters to tune are now numberOfThreads and maxRecursionDepth

So yes, your algorithm can be parallelized. Here is a fully recursive implementation based on your code:

    import java.awt.Point;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.ArrayDeque;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class OriginalParallel {

        static final int numberOfThreads = 2;
        static final int maxRecursionDepth = 3;

        public static void main(String[] args) {
            int target_sum = 100;
            int[] data = new int[] { 50, 40, 25, 20, 10, 5 };
            List T = tableGeneator(target_sum, data);
            int[] coeff = new int[data.length];
            Arrays.fill(coeff, 0);
            RecursivelyListAllThatWork(data.length, target_sum, T, coeff, data);
            executor.shutdown();
        }

        private static void printResult(int[] coeff, int[] data) {
            StringBuffer sb = new StringBuffer();
            for (int i = coeff.length - 1; i >= 0; i--) {
                if (coeff[i] > 0) {
                    sb.append(data[i]).append(" * ").append(coeff[i]).append(" ");
                }
            }
            System.out.println(sb.append("from ").append(Thread.currentThread()));
        }

        // Thread pool for parallel execution
        private static ExecutorService executor;

        static {
            executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 1000, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
        }

        private static void RecursivelyListAllThatWork(int k, int sum, List T, int[] coeff, int[] data) {
            if (k == 0) {
                printResult(coeff, data);
                return;
            }

            Integer x_k = data[k-1];
            // Recursive step: Try all coefficients, but only if they work.
            for (int c = 0; c <= sum/x_k; c++) { //the c variable caps the percent
                if (T.contains(new Point((sum - c * x_k), (k-1)))) {
                    // mark the coefficient of x_k to be c
                    coeff[k-1] = c;
                    if (data.length - k != maxRecursionDepth) {
                        RecursivelyListAllThatWork((k - 1), (sum - c * x_k), T, coeff, data);
                    } else {
                        // delegate to thread pool when reaching depth 3
                        int[] newcoeff = Arrays.copyOf(coeff, coeff.length);
                        executor.submit(new RecursiveThread(k - 1, sum - c * x_k, T, newcoeff, data));
                    }
                    // unmark the coefficient of x_k
                    coeff[k-1] = 0;
                }
            }
        }

        static class RecursiveThread implements Callable<Void> {
            int k;
            int sum;
            int[] coeff;
            int[] data;
            List T;

            RecursiveThread(int k, int sum, List T, int[] coeff, int[] data) {
                this.k = k;
                this.sum = sum;
                this.T = T;
                this.coeff = coeff;
                this.data = data;
                System.out.println("New job for k=" + k);
            }

            public Void call() {
                RecursivelyListAllThatWork(k, sum, T, coeff, data);
                return null;
            }
        }

        public static List tableGeneator(int target_sum, int[] data) {
            List T = new ArrayList();
            T.add(new Point(0, 0));
            float max_percent = 1;
            int R = (int) (target_sum * max_percent * data.length);
            for (int i = 0; i < data.length; i++) {
                for (int s = -R; s < R + 1; s++) {
                    int max_value = (int) Math.abs((target_sum * max_percent) / data[i]);
                    for (int c = 0; c < max_value + 1; c++) {
                        if (T.contains(new Point(s - c * data[i], i))) {
                            Point p = new Point(s, i + 1);
                            if (!T.contains(p)) {
                                T.add(p);
                            }
                        }
                    }
                }
            }
            return T;
        }
    }
+8

1) Instead of

    if k == 0:
        print what we have so far
        return

you can check how many coefficients are non-zero; if that count is greater than a certain threshold (3 in your example), just don't print the solution. (Hint: this will be closely related to the

    mark the coefficient of x_k to be c

lines.)
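One possible reading of that hint, as a rough sketch: carry the number of non-zero coefficients down the recursion and update it where the coefficient is marked. The class name `NonZeroLimitSketch`, the method `list` and the parameter `maxNonZero` are invented here, and the table lookup from the question is left out to keep the example short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class NonZeroLimitSketch {

    // Collects solutions that use at most maxNonZero different numbers.
    // Assumption: every value in data is positive.
    static List<int[]> list(int[] data, int target, int maxNonZero) {
        List<int[]> out = new ArrayList<>();
        recurse(data.length, target, 0, new int[data.length], data, maxNonZero, out);
        return out;
    }

    private static void recurse(int k, int remaining, int used, int[] coeff,
                                int[] data, int maxNonZero, List<int[]> out) {
        if (used > maxNonZero) {
            return; // too many non-zero coefficients already: skip this branch
        }
        if (k == 0) {
            if (remaining == 0) {
                out.add(coeff.clone());
            }
            return;
        }
        int x = data[k - 1];
        for (int c = 0; c <= remaining / x; c++) {
            coeff[k - 1] = c; // "mark": the count grows only when c is non-zero
            recurse(k - 1, remaining - c * x, used + (c > 0 ? 1 : 0),
                    coeff, data, maxNonZero, out);
        }
        coeff[k - 1] = 0; // "unmark"
    }

    public static void main(String[] args) {
        for (int[] c : list(new int[] { 1, 2, 3 }, 4, 1)) {
            System.out.println(Arrays.toString(c));
        }
    }
}
```

Checking the count on entry rather than only when printing also prunes the branch early, which is a small bonus over filtering at the base case.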

2) Recursive functions like this one are usually exponential in nature, which means that as the input grows, the execution time increases dramatically.

With this in mind, you can apply multithreading both to computing the table and to the recursive function.

For the table, consider which parts of the loop affect each other and must be done sequentially; conversely, find the parts that do not affect each other and can be run in parallel.
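For the table in the question, one way to look at it (a sketch, not the asker's code): column i only reads column i - 1, so the columns must be built in order, but the entries for different sums within one column are independent. The example below assumes a plain boolean array indexed as `t[i][z]` instead of the question's list of Point objects, and the class name `ParallelTableSketch` is made up.

```java
import java.util.stream.IntStream;

class ParallelTableSketch {

    // t[i][z] is true when z can be formed from the first i numbers.
    // Assumption: every value in data is positive.
    static boolean[][] buildTable(int[] data, int target) {
        boolean[][] t = new boolean[data.length + 1][target + 1];
        t[0][0] = true;
        for (int i = 1; i <= data.length; i++) { // sequential: needs row i - 1 complete
            final int x = data[i - 1];
            final boolean[] prev = t[i - 1];
            final boolean[] cur = t[i];
            // Parallel: each z writes only cur[z] and reads only the finished prev row.
            IntStream.rangeClosed(0, target).parallel().forEach(z -> {
                for (int c = 0; c * x <= z; c++) {
                    if (prev[z - c * x]) {
                        cur[z] = true;
                        break;
                    }
                }
            });
        }
        return t;
    }

    public static void main(String[] args) {
        boolean[][] t = buildTable(new int[] { 10, 5, 50, 20, 25, 40 }, 100);
        System.out.println("100 reachable: " + t[6][100]);
    }
}
```

Whether the parallel stream pays off depends on how large the target sum is; for small tables the sequential loop is likely just as fast.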

As for the recursive function, your best bet is probably to apply multithreading to the branching part.
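As a rough illustration of splitting at the branching part: each choice of the top-level coefficient becomes an independent job, and the jobs share nothing but the read-only input. To stay short this sketch only counts solutions and skips the table; `BranchParallelSketch`, `countSequential` and `countParallel` are invented names, and the input is assumed to be non-empty with positive values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BranchParallelSketch {

    // Plain recursion: number of ways to form `remaining` from the first k numbers.
    static int countSequential(int k, int remaining, int[] data) {
        if (k == 0) {
            return remaining == 0 ? 1 : 0;
        }
        int x = data[k - 1];
        int total = 0;
        for (int c = 0; c <= remaining / x; c++) {
            total += countSequential(k - 1, remaining - c * x, data);
        }
        return total;
    }

    // One job per value of the top-level coefficient.
    static int countParallel(int[] data, int target, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            final int k = data.length;
            final int x = data[k - 1];
            List<Future<Integer>> futures = new ArrayList<>();
            for (int c = 0; c <= target / x; c++) {
                final int rest = target - c * x;
                Callable<Integer> job = () -> countSequential(k - 1, rest, data);
                futures.add(pool.submit(job));
            }
            int total = 0;
            for (Future<Integer> f : futures) {
                total += f.get(); // blocks until that branch is done
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("branch computation failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] data = { 10, 5, 50, 20, 25, 40 };
        System.out.println(countParallel(data, 100, 2) + " solutions");
    }
}
```

Splitting only at the top level gives as many jobs as there are values of the first coefficient; if that is too few or too uneven, the same split can be applied one level deeper.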

+1

The key to making this multithreaded is simply to make sure you have no unnecessary global data structures, such as your "marks" on the coefficients.

Say you have K numbers n[0] ... n[K-1] in your table, and the sum you want to reach is S. I assume below that the array n[] is sorted from smallest to largest.

Here is a simple enumeration algorithm. i is the index into the list of numbers, s is the sum built up so far, and cs is the list of coefficients for the numbers 0 .. i - 1:

    function enumerate(i, s, cs):
        if (s == S):
            output_solution(cs)
        else if (i == K):
            return // dead end
        else if ((S - s) < n[i]):
            return // no solution can be found
        else:
            for c in 0 .. floor((S - s) / n[i]): // note: floor(...) > 0
                enumerate(i + 1, s + c * n[i], append(cs, c))

To start the process:

  enumerate(0, 0, make_empty_list()) 

Now there are no global data structures except the table n[] (constant data), and enumerate does not return anything either, so you can make any recursive call run in its own thread at will. For example, you can spawn a new thread for a recursive enumerate() call unless too many threads are already running, in which case you wait.
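A possible Java rendering of enumerate, as a sketch only: instead of creating raw threads and waiting by hand, it hands each recursive call to a ForkJoinPool, which caps the number of worker threads on its own. That substitution, along with the names `EnumerateSketch`, `Enumerate` and `run`, is my assumption and not part of the pseudocode above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class EnumerateSketch {

    // One task per enumerate(i, s, cs) call; the only shared object is the output queue.
    static final class Enumerate extends RecursiveAction {
        private final int[] n;                  // numbers, sorted ascending (constant data)
        private final int S;                    // target sum
        private final int i;                    // index of the next number
        private final int s;                    // sum reached so far
        private final List<Integer> cs;         // coefficients for n[0] .. n[i-1]
        private final Queue<List<Integer>> out; // thread-safe sink for solutions

        Enumerate(int[] n, int S, int i, int s, List<Integer> cs, Queue<List<Integer>> out) {
            this.n = n;
            this.S = S;
            this.i = i;
            this.s = s;
            this.cs = cs;
            this.out = out;
        }

        @Override
        protected void compute() {
            if (s == S) {
                out.add(cs);   // solution; coefficients after i - 1 are implicitly 0
                return;
            }
            if (i == n.length) {
                return;        // dead end
            }
            if (S - s < n[i]) {
                return;        // no solution possible, relies on n being sorted
            }
            List<Enumerate> children = new ArrayList<>();
            for (int c = 0; c <= (S - s) / n[i]; c++) {
                List<Integer> next = new ArrayList<>(cs); // append(cs, c) without mutation
                next.add(c);
                children.add(new Enumerate(n, S, i + 1, s + c * n[i], next, out));
            }
            invokeAll(children); // run the branches in the pool and wait for them
        }
    }

    static List<List<Integer>> run(int[] sortedNumbers, int target, int parallelism) {
        Queue<List<Integer>> out = new ConcurrentLinkedQueue<>();
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            pool.invoke(new Enumerate(sortedNumbers, target, 0, 0, new ArrayList<Integer>(), out));
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(out);
    }

    public static void main(String[] args) {
        for (List<Integer> cs : run(new int[] { 5, 10, 20, 25, 40, 50 }, 100, 2)) {
            System.out.println(cs);
        }
    }
}
```

The order in which solutions arrive in the queue depends on scheduling, so the output order can differ between runs.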

+1

Source: https://habr.com/ru/post/911714/

