Choosing a thread to perform a barrier action - Java CyclicBarrier

Looking at the javadocs for CyclicBarrier, I found the following instruction in the class documentation, which I do not fully understand. From javadoc :

If the action of the barrier does not depend on whether the parties are suspended when it is executed, then any of the threads in the side can perform this action when it is released. To facilitate this, each call to await () returns the arrival index of this stream to the barrier. Then you can choose which thread should perform the barrier action, for example:

if (barrier.await() == 0) { // log the completion of this iteration } 

Can someone explain how to assign a specific thread to perform a barrier action as soon as all parties have called .await () and possibly provided an example?

+6
source share
3 answers

CyclicBarrier allows you to assign an ORDER stream:

Assigning a stream returning in SPECIFIC order is possible if, as you say, you enclose the logic for terminating the barrier in a conditional expression that is specific to the stream index. Thus, your implementation above will work in accordance with the documentation you specify.

However, the point of confusion here is that the documentation speaks of the identity of the flows in terms of the order of return to the barrier, and not in the identifier of the stream object. Thus, thread 0 refers to the 0th thread to complete.

Alternative: stream assignment using other mechanisms.

If you want a specific thread to perform a specific action after other work has been completed, you can use a different mechanism — for example, a semaphore. If you want this behavior, you may not need a cyclic barrier.

To check what is meant by the documentation, run the class (modified by http://programmingexamples.wikidot.com/cyclicbarrier ), where ive included your snippet.

An example of what is understood in the docs for CyclicBarrier

packet stream; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier;

 public class CyclicBarrierExample { private static int matrix[][] = { { 1 }, { 2, 2 }, { 3, 3, 3 }, { 4, 4, 4, 4 }, { 5, 5, 5, 5, 5 } }; static final int rows = matrix.length; private static int results[]=new int[rows]; static int threadId=0; private static class Summer extends Thread { int row; CyclicBarrier barrier; Summer(CyclicBarrier barrier, int row) { this.barrier = barrier; this.row = row; } public void run() { int columns = matrix[row].length; int sum = 0; for (int i = 0; i < columns; i++) { sum += matrix[row][i]; } results[row] = sum; System.out.println("Results for row " + row + " are : " + sum); // wait for the others // Try commenting the below block, and watch what happens. try { int w = barrier.await(); if(w==0) { System.out.println("merging now !"); int fullSum = 0; for (int i = 0; i < rows; i++) { fullSum += results[i]; } System.out.println("Results are: " + fullSum); } } catch(Exception e) { e.printStackTrace(); } } } public static void main(String args[]) { /* * public CyclicBarrier(int parties,Runnable barrierAction) * Creates a new CyclicBarrier that will trip when the given number * of parties (threads) are waiting upon it, and which will execute * the merger task when the barrier is tripped, performed * by the last thread entering the barrier. */ CyclicBarrier barrier = new CyclicBarrier(rows ); for (int i = 0; i < rows; i++) { System.out.println("Creating summer " + i); new Summer(barrier, i).start(); } System.out.println("Waiting..."); } } 
+1
source

OK, pretend that RuPaul needs work flows, but only the third who has finished must complete the barrier task (Say “Sasha, Chante”).

 import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; public class Main { private static class Worker implements Runnable { private CyclicBarrier barrier; public Worker(CyclicBarrier b) { barrier = b; } public void run() { final String threadName = Thread.currentThread().getName(); System.out.printf("%s: You better work!%n", threadName); // simulate the workin' it part Random rnd = new Random(); int secondsToWorkIt = rnd.nextInt(10) + 1; try { TimeUnit.SECONDS.sleep(secondsToWorkIt); } catch (InterruptedException ex) { /* ...*/ } System.out.printf("%s worked it, girl!%n", threadName); try { int n = barrier.await(); final int myOrder = barrier.getParties() - n; System.out.printf("Turn number: %s was %s%n", myOrder, threadName); // MAGIC CODE HERE!!! if (myOrder == 3) { // the third one that finished System.out.printf("%s: Sashay Chante!%n", myOrder); } // END MAGIC CODE } catch (BrokenBarrierException ex) { /* ... */ } catch (InterruptedException ex) { /* ... */ } } } private final int numThreads = 5; public void work() { /* * I want the 3rd thread that finished to say "Sashay Chante!" * when everyone has called await. * So I'm not going to put my "barrier action" in the CyclicBarrier constructor, * where only the last thread will run it! I'm going to put it in the Runnable * that calls await. */ CyclicBarrier b = new CyclicBarrier(numThreads); for (int i= 0; i < numThreads; i++) { Worker task = new Worker(b); Thread thread = new Thread(task); thread.start(); } } public static void main(String[] args) { Main main = new Main(); main.work(); } } 

Here is an example output:

 Thread-0: You better work! Thread-4: You better work! Thread-2: You better work! Thread-1: You better work! Thread-3: You better work! Thread-1 worked it, girl! Thread-4 worked it, girl! Thread-0 worked it, girl! Thread-3 worked it, girl! Thread-2 worked it, girl! Turn number: 5 was Thread-2 Turn number: 3 was Thread-0 3: Sashay Chante! Turn number: 1 was Thread-1 Turn number: 4 was Thread-3 Turn number: 2 was Thread-4 

As you can see, the thread that occupied the third was Thread-0, so Thread-0 was the one that did the “barrier action”.

Say you can name your topics:

 thread.setName("My Thread " + i); 

Then you can perform the action in the stream of that name ... I do not know how possible this is for you.

+3
source

I think that in the documentation section there is an alternative to the Runnable barrier action, and not a specific way to use it. Pay attention to how it says (my attention):

If the action of the barrier does not depend on the suspended parties , when it is performed

If you specify the action of the barrier as runnable, then it ...

It starts once per barrier point, after the last stream in the lot arrives, but before any flows are released.

So, although the threads are suspended (although, since it is started by the last thread, it does not stop, but at least its normal execution thread is suspended until the barrier expires).

The business of using await() return value is what you can do if you don't need your actions to be performed while the threads are paused.

Examples of documentation are indicative. In the example that uses the Runnable barrier action, the work of some other threads is coordinated - string concatenation and job verification. Other threads must wait until he finds out if they have more work. Therefore, it should work while they are paused. An example of using the return value from await() is some registration. Other threads are independent of logging. Thus, this can happen when other threads started to do more work.

+2
source

Source: https://habr.com/ru/post/904408/


All Articles