The general approach to multithreading a recursive algorithm is to replace the recursion with an explicit stack (LIFO or FIFO). In such an implementation, the number of threads is a fixed parameter of the algorithm (for example, the number of cores).
To implement it, the language call stack is replaced by an explicit stack that stores each intermediate context as a checkpoint; a context produces no further work once the tested condition ends the recursion. In your case, that condition is either k=0 or coeff values that add up to the target sum.
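As a rough illustration of that replacement, here is a minimal single-threaded sketch. The class name ExplicitStackSketch, the Frame type and countCombinations are hypothetical names chosen for this example, and it only counts solutions instead of storing coeff. It assumes data is non-empty and holds positive values.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class ExplicitStackSketch {

    // One pending unit of work: "reach sum using data[0..k]".
    // It plays the role of the arguments of one recursive call.
    static final class Frame {
        final int k;
        final int sum;

        Frame(int k, int sum) {
            this.k = k;
            this.sum = sum;
        }
    }

    // Counts the ways to write targetSum as a non-negative integer
    // combination of the values in data, without any recursive call.
    static int countCombinations(int[] data, int targetSum) {
        Deque<Frame> stack = new ArrayDeque<>();
        stack.push(new Frame(data.length - 1, targetSum));
        int solutions = 0;
        while (!stack.isEmpty()) {
            Frame f = stack.pop();
            int x = data[f.k];
            for (int c = f.sum / x; c >= 0; c--) {
                int rest = f.sum - c * x;
                if (rest == 0) {
                    // end condition: the target is reached
                    solutions++;
                } else if (f.k > 0) {
                    // where a recursive version would call itself
                    stack.push(new Frame(f.k - 1, rest));
                }
            }
        }
        return solutions;
    }

    public static void main(String[] args) {
        int[] data = { 5, 10, 20, 25, 40, 50 };
        System.out.println(countCombinations(data, 100));
    }
}
```

Using pop() gives LIFO order; swapping it for pollLast() would give FIFO order without changing the result, only the order in which contexts are visited.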
Once the recursion has been removed, a first implementation is to start several threads that all consume the same stack, but access to that stack becomes a contention point because it has to be synchronized.
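To make the contention visible, here is a hedged sketch of that first variant, not code from your question. SharedStackSketch, the pending counter and the int[] {k, sum} encoding are assumptions made for this example. Every push and poll from every thread goes through the same deque and the same counter.

```java
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;

final class SharedStackSketch {

    // Counts the combinations reaching targetSum with several threads that
    // all consume one shared stack. Assumes non-empty data, positive values.
    static int countCombinations(int[] data, int targetSum, int numberOfThreads) {
        ConcurrentLinkedDeque<int[]> stack = new ConcurrentLinkedDeque<>();
        // contexts pushed but not yet fully processed; 0 means "all done"
        AtomicInteger pending = new AtomicInteger(1);
        AtomicInteger solutions = new AtomicInteger();
        stack.push(new int[] { data.length - 1, targetSum });

        Runnable worker = () -> {
            while (pending.get() > 0) {
                int[] f = stack.pollFirst();
                if (f == null) {
                    // nothing to take right now, another thread may still push
                    Thread.yield();
                    continue;
                }
                int k = f[0];
                int sum = f[1];
                int x = data[k];
                for (int c = sum / x; c >= 0; c--) {
                    int rest = sum - c * x;
                    if (rest == 0) {
                        solutions.incrementAndGet();
                    } else if (k > 0) {
                        // count the child before publishing it
                        pending.incrementAndGet();
                        stack.push(new int[] { k - 1, rest });
                    }
                }
                // this context is finished only after its children are counted
                pending.decrementAndGet();
            }
        };

        Thread[] threads = new Thread[numberOfThreads];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(worker);
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return solutions.get();
    }

    public static void main(String[] args) {
        int[] data = { 5, 10, 20, 25, 40, 50 };
        System.out.println(countCombinations(data, 100, 2));
    }
}
```

The sketch uses a lock-free deque, yet the threads still compete on the head of the deque and on the counter, and idle threads spin while waiting for work.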
The most scalable solution is to dedicate a stack to each thread, but each of those stacks then has to be seeded with initial contexts before its thread can start working.
I suggest a mixed approach in which the main thread works recursively over a limited number of values of k, the maximum recursion depth: 2 for the small data set of the example, but I recommend 3 for larger ones. This first part then delegates the intermediate contexts it generates to a thread pool, which processes the remaining values of k with a non-recursive implementation. The code below is not based on the complex algorithm that you use, but on a rather "basic" implementation:

    import java.util.ArrayDeque;
    import java.util.Arrays;
    import java.util.Queue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class MixedParallel {

        // pre-requisite: sorted values !!
        private static final int[] data = new int[] { 5, 10, 20, 25, 40, 50 };

        // Context to store intermediate computation or a solution
        static class Context {
            int k;
            int sum;
            int[] coeff;

            Context(int k, int sum, int[] coeff) {
                this.k = k;
                this.sum = sum;
                this.coeff = coeff;
            }
        }

        // Thread pool for parallel execution
        private static ExecutorService executor;

        // Queue to collect solutions
        private static Queue<Context> solutions;

        static {
            final int numberOfThreads = 2;
            executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 1000, TimeUnit.SECONDS,
                    new LinkedBlockingDeque<Runnable>());
            // concurrent because of multi-threaded insertions
            solutions = new ConcurrentLinkedQueue<Context>();
        }

        public static void main(String[] args) throws InterruptedException {
            int target_sum = 100;
            // result vector, init to 0
            int[] coeff = new int[data.length];
            Arrays.fill(coeff, 0);
            mixedPartialSum(data.length - 1, target_sum, coeff);

            executor.shutdown();
            // shutdown() does not block: wait for the submitted jobs to finish
            // before dumping, otherwise late solutions could be missed
            executor.awaitTermination(1000, TimeUnit.SECONDS);
            // System.out.println("Over. Dumping results");
            while (!solutions.isEmpty()) {
                Context s = solutions.poll();
                printResult(s.coeff);
            }
        }

        private static void printResult(int[] coeff) {
            StringBuilder sb = new StringBuilder();
            for (int i = coeff.length - 1; i >= 0; i--) {
                if (coeff[i] > 0) {
                    sb.append(data[i]).append(" * ").append(coeff[i]).append(" ");
                }
            }
            System.out.println(sb.append("from ").append(Thread.currentThread()));
        }

        // Recursive part, run by the main thread for the first levels only
        private static void mixedPartialSum(int k, int sum, int[] coeff) {
            int x_k = data[k];
            for (int c = sum / x_k; c >= 0; c--) {
                coeff[k] = c;
                int[] newcoeff = Arrays.copyOf(coeff, coeff.length);
                if (c * x_k == sum) {
                    //printResult(newcoeff);
                    solutions.add(new Context(0, 0, newcoeff));
                    continue;
                } else if (k > 0) {
                    if (data.length - k < 2) {
                        mixedPartialSum(k - 1, sum - c * x_k, newcoeff);
                        // for loop on "c" goes on with previous coeff content
                    } else {
                        // no longer recursive. delegate to thread pool
                        executor.submit(new ComputePartialSum(new Context(k - 1, sum - c * x_k, newcoeff)));
                    }
                }
            }
        }

        // Non-recursive part, run by the pool threads
        static class ComputePartialSum implements Callable<Void> {

            // queue with contexts to process, private to this job
            private Queue<Context> contexts;

            ComputePartialSum(Context request) {
                contexts = new ArrayDeque<Context>();
                contexts.add(request);
            }

            public Void call() {
                while (!contexts.isEmpty()) {
                    Context current = contexts.poll();
                    int x_k = data[current.k];
                    for (int c = current.sum / x_k; c >= 0; c--) {
                        current.coeff[current.k] = c;
                        int[] newcoeff = Arrays.copyOf(current.coeff, current.coeff.length);
                        if (c * x_k == current.sum) {
                            //printResult(newcoeff);
                            solutions.add(new Context(0, 0, newcoeff));
                            continue;
                        } else if (current.k > 0) {
                            contexts.add(new Context(current.k - 1, current.sum - c * x_k, newcoeff));
                        }
                    }
                }
                return null;
            }
        }
    }

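You can check which thread found each result and see whether all of them are involved: the main thread in recursive mode and the two pool threads in context stack mode. To see this, uncomment the printResult calls at the points where a solution is found, since the final dump always runs in the main thread.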
This implementation now scales when data.length is large:
- the maximum recursion depth is kept low, since only the main thread recurses and only over the first levels
- each thread from the pool works with its own context stack without competing with the others
- the parameters to configure are now numberOfThreads and maxRecursionDepth
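One way to reason about these two parameters is to count how many jobs a given split depth hands to the pool, and compare that with the number of threads. The sketch below is an assumption-laden helper, not part of the code above. SplitDepthSketch and countDelegatedJobs are hypothetical names, and the depth test generalizes the hard-coded `data.length - k < 2` check of mixedPartialSum.

```java
final class SplitDepthSketch {

    // Mirrors the control flow of mixedPartialSum, but only counts how many
    // contexts would be submitted to the thread pool for a given depth.
    // Assumes sorted, positive values, as in the answer's data set.
    static int countDelegatedJobs(int[] data, int k, int sum, int maxRecursionDepth) {
        int x = data[k];
        int jobs = 0;
        for (int c = sum / x; c >= 0; c--) {
            int rest = sum - c * x;
            if (rest == 0 || k == 0) {
                // a solution or a dead end: nothing to delegate
                continue;
            }
            if (data.length - k < maxRecursionDepth) {
                // still in the recursive part handled by the main thread
                jobs += countDelegatedJobs(data, k - 1, rest, maxRecursionDepth);
            } else {
                // this context would go to the pool
                jobs++;
            }
        }
        return jobs;
    }

    public static void main(String[] args) {
        int[] data = { 5, 10, 20, 25, 40, 50 };
        // assumption: one pool thread per available core
        int numberOfThreads = Runtime.getRuntime().availableProcessors();
        System.out.println("threads: " + numberOfThreads);
        for (int depth = 1; depth <= 3; depth++) {
            int jobs = countDelegatedJobs(data, data.length - 1, 100, depth);
            System.out.println("maxRecursionDepth=" + depth + " -> jobs: " + jobs);
        }
    }
}
```

If the job count is well below numberOfThreads, some threads stay idle and a larger depth is worth trying. The jobs are not of equal size, so this is only a rough guide.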
So yes, your algorithm can be parallelized. Here is a fully recursive implementation based on your code:

    import java.awt.Point;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class OriginalParallel {

        static final int numberOfThreads = 2;
        static final int maxRecursionDepth = 3;

        public static void main(String[] args) {
            int target_sum = 100;
            int[] data = new int[] { 50, 40, 25, 20, 10, 5 };
            List<Point> T = tableGeneator(target_sum, data);
            int[] coeff = new int[data.length];
            Arrays.fill(coeff, 0);
            RecursivelyListAllThatWork(data.length, target_sum, T, coeff, data);
            executor.shutdown();
        }

        private static void printResult(int[] coeff, int[] data) {
            StringBuilder sb = new StringBuilder();
            for (int i = coeff.length - 1; i >= 0; i--) {
                if (coeff[i] > 0) {
                    sb.append(data[i]).append(" * ").append(coeff[i]).append(" ");
                }
            }
            System.out.println(sb.append("from ").append(Thread.currentThread()));
        }

        // Thread pool for parallel execution
        private static ExecutorService executor;

        static {
            executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 1000, TimeUnit.SECONDS,
                    new LinkedBlockingDeque<Runnable>());
        }

        private static void RecursivelyListAllThatWork(int k, int sum, List<Point> T, int[] coeff, int[] data) {
            if (k == 0) {
                printResult(coeff, data);
                return;
            }
            int x_k = data[k - 1];
            // Recursive step: Try all coefficients, but only if they work.
            for (int c = 0; c <= sum / x_k; c++) { //the c variable caps the percent
                if (T.contains(new Point((sum - c * x_k), (k - 1)))) {
                    // mark the coefficient of x_k to be c
                    coeff[k - 1] = c;
                    if (data.length - k != maxRecursionDepth) {
                        RecursivelyListAllThatWork((k - 1), (sum - c * x_k), T, coeff, data);
                    } else {
                        // delegate to thread pool when reaching depth 3
                        int[] newcoeff = Arrays.copyOf(coeff, coeff.length);
                        executor.submit(new RecursiveThread(k - 1, sum - c * x_k, T, newcoeff, data));
                    }
                    // unmark the coefficient of x_k
                    coeff[k - 1] = 0;
                }
            }
        }

        static class RecursiveThread implements Callable<Void> {
            int k;
            int sum;
            int[] coeff;
            int[] data;
            List<Point> T;

            RecursiveThread(int k, int sum, List<Point> T, int[] coeff, int[] data) {
                this.k = k;
                this.sum = sum;
                this.T = T;
                this.coeff = coeff;
                this.data = data;
                System.out.println("New job for k=" + k);
            }

            public Void call() {
                RecursivelyListAllThatWork(k, sum, T, coeff, data);
                return null;
            }
        }

        public static List<Point> tableGeneator(int target_sum, int[] data) {
            List<Point> T = new ArrayList<Point>();
            T.add(new Point(0, 0));

            float max_percent = 1;
            int R = (int) (target_sum * max_percent * data.length);
            for (int i = 0; i < data.length; i++) {
                for (int s = -R; s < R + 1; s++) {
                    int max_value = (int) Math.abs((target_sum * max_percent) / data[i]);
                    for (int c = 0; c < max_value + 1; c++) {
                        if (T.contains(new Point(s - c * data[i], i))) {
                            Point p = new Point(s, i + 1);
                            if (!T.contains(p)) {
                                T.add(p);
                            }
                        }
                    }
                }
            }
            return T;
        }
    }