How can I show that a theft occurs in Java Fork / Join?

I would like to improve my fork / join example to show that theft occurs while working with the Java Fork / Join runtime.

What changes do I need to make for the following code? The purpose of the example: simply conduct a linear study of the decomposition of value between multiple threads.

package com.stackoverflow.questions; import java.util.LinkedList; import java.util.List; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class CounterFJ<T extends Comparable<T>> extends RecursiveTask<Integer> { private static final long serialVersionUID = 5075739389907066763L; private List<T> _list; private T _test; private int _lastCount = -1; private int _start; private int _end; private int _divideFactor = 4; private static final int THRESHOLD = 20; public CounterFJ(List<T> list, T test, int start, int end, int factor) { _list = list; _test = test; _start = start; _end = end; _divideFactor = factor; } public CounterFJ(List<T> list, T test, int factor) { this(list, test, 0, list.size(), factor); } @Override protected Integer compute() { if (_end - _start < THRESHOLD) { int count = 0; for (int i = _start; i < _end; i++) { if (_list.get(i).compareTo(_test) == 0) { count++; } } _lastCount = count; return new Integer(count); } LinkedList<CounterFJ<T>> taskList = new LinkedList<>(); int step = (_end - _start) / _divideFactor; for (int j = 0; j < _divideFactor; j++) { CounterFJ<T> task = null; if (j == 0) task = new CounterFJ<T>(_list, _test, _start, _start + step, _divideFactor); else if (j == _divideFactor - 1) task = new CounterFJ<T>(_list, _test, _start + (step * j), _end, _divideFactor); else task = new CounterFJ<T>(_list, _test, _start + (step * j), _start + (step * (j + 1)), _divideFactor); // task.fork(); taskList.add(task); } invokeAll(taskList); _lastCount = 0; for (CounterFJ<T> task : taskList) { _lastCount += task.join(); } return new Integer(_lastCount); } public int getResult() { return _lastCount; } public static void main(String[] args) { LinkedList<Long> list = new LinkedList<Long>(); long range = 200; Random r = new Random(42); for (int i = 0; i < 1000; i++) { list.add(new Long((long) (r.nextDouble() * range))); } CounterFJ<Long> counter = new CounterFJ<>(list, new Long(100), 4); ForkJoinPool pool = new ForkJoinPool(); long time = System.currentTimeMillis(); pool.invoke(counter); System.out.println("Fork join counter in " + (System.currentTimeMillis() - time)); System.out.println("Occurrences:" + counter.getResult()); } } 
+2
source share
1 answer

I finally managed how to do it, and it’s not difficult, so I leave it for future readers.

In the RecursiveTask cost constructor, save the stream that the instance itself created. In the compute method, check if the execution thread is the same or not. If there was no work theft.

So I added this member variable

 private long _threadId = -1; private static int stolen_tasks = 0; 

changed the constructor as follows:

 public CounterFJ(List<T> list, T test, int start, int end, int factor) { _list = list; _threadId = Thread.currentThread().getId(); //added _test = test; _start = start; _end = end; _branchFactor = factor; } 

and added a comparison to the compute method:

  @Override protected Integer compute() { long thisThreadId = Thread.currentThread().getId(); if (_threadId != thisThreadId){ stolen_tasks++; } // rest of the method 
+6
source

Source: https://habr.com/ru/post/1479468/


All Articles