Synchronized Matrix Multiplication Streams

I have been doing this all night and have not found a solution, so if anyone can help me, I would really appreciate it! I probably missed something super obvious. This task is to understand synchronization, when we take the previous destination, where we used streams to multiply 2 matrices. In the previous task, each thread multiplied a row, so there were as many threads as there were rows.

In this task, we should use only 5 threads: all threads should start with one row / column, and as soon as the stream is complete, it should select the next available row / column using synchronization, so now two threads will end up on the same column .

This question helped me in the right direction, but I have to do something wrong with the implementation, because so far I only got the program:

  • only the first 5 lines are executed - 5 threads are executed once, each of which calculates a line or
  • I added a loop (which is now commented out in my code), so the thread will continue to execute, but when I do this, only the first thread does any work.

This is my class with my main and several helper methods:

import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; import java.util.Random; import java.util.Scanner; import java.util.concurrent.Semaphore; import java.util.concurrent.locks.Lock; public class MatrixMult { public static void main(String[] args){ int[][] matrixA; int[][] matrixB; int colA = 0; int rowA = 0; int colB = 0; int rowB = 0; Scanner userInput = new Scanner( System.in ); System.out.println("Please enter the dimensions of matrix A"); do{ System.out.print("column for matrix A: "); colA = userInput.nextInt(); System.out.println(); } while(!validDimension(colA)); rowB = colA; do{ System.out.print("row for matrix A: "); rowA = userInput.nextInt(); System.out.println(); } while(!validDimension(rowA)); matrixA = new int[rowA][colA]; System.out.println("Please enter the dimensions of matrix B:"); do{ System.out.print("column for matrix B: "); colB = userInput.nextInt(); System.out.println(); } while(!validDimension(colB)); matrixB = new int[rowB][colB]; fillMatrix(matrixA); fillMatrix(matrixB); System.out.println("Would you like to print out matrix A and B? (y/n)"); String userResponse = userInput.next(); if(userResponse.equalsIgnoreCase("y")){ System.out.println("Matrix A:"); printBackMatrix(matrixA); System.out.println(); System.out.println("Matrix B:"); printBackMatrix(matrixB); System.out.println(); } int[][] matrixProduct3 = multMatrixWithThreadsSync(matrixA, matrixB); String fileName = "C:/matrix.txt"; System.out.println("Matrix product is being written to "+fileName); try { printMatrixToFile(matrixProduct3, fileName); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private static int[][] multMatrixWithThreadsSync(int[][] matrixA, int[][] matrixB) { int[][] matrixProduct = new int[matrixA.length][matrixB[0].length]; int[] matrixProductColumn = new int[matrixA.length]; Runnable task = new MultMatrixByRow(matrixA, matrixB, matrixProduct); for(int i=0; i<5; i++){ Thread worker = new Thread(task); worker.start(); // System.out.println(worker.getName()); try { worker.join(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } return matrixProduct; } private static void printMatrixToFile(int[][] matrix, String fileName) throws IOException{ PrintWriter userOutput = new PrintWriter(new FileWriter(fileName)); for(int i=0; i<matrix.length; i++){ for(int j=0; j<matrix[0].length; j++){ userOutput.print(matrix[i][j]+" "); } userOutput.println(); } userOutput.close(); } private static void printBackMatrix(int[][] matrix) { for(int i=0; i<matrix.length; i++){ for(int j=0; j<matrix[0].length; j++){ System.out.print(matrix[i][j]+" "); } System.out.println(); } } private static void fillMatrix(int[][] matrix) { Random rand = new Random(); for(int i=0; i<matrix.length; i++){ for(int j=0; j<matrix[0].length; j++){ matrix[i][j] = rand.nextInt(100) + 1; } } } public static boolean validDimension(int dim){ if (dim <= 0 || dim >1000){ System.err.println("Dimension value entered is not valid"); return false; } return true; } } 

and this is my class with runnable:

 public class MultMatrixByRow implements Runnable { private int i; private int[][] matrixA; private int[][] matrixB; private int[][] matrixProduct; public MultMatrixByRow(int[][] A, int[][] B, int[][] C) { this.matrixA = A; this.matrixB = B; this.matrixProduct = C; } @Override public void run(){ // while(i < matrixProduct.length){ int rowToWork = 0; synchronized (this){ // System.out.println("i is "+i); if ( i < matrixProduct.length){ rowToWork = i; i++; } else{ return; } } for(int j = 0; j < matrixB[0].length; j++){ for(int k=0; k < matrixA[0].length; k++){ matrixProduct[rowToWork][j] += matrixA[rowToWork][k]*matrixB[k][j]; } } // } } } 

Again, any help would really be appreciated! Thank you very much.

+4
source share
4 answers
  • You do not synchronize to ressource, you need to split the lock object (in a static context or using a constructor)
  • I really am not able to understand what needs to be synchronized in your program when you do not even allow them to work synchronously .... You start a stream and directly expect it to stop. I think you should start them first, and then in a different loop, call the union in each thread.

In addition, I'm not quite sure that your Threads should work separately, I think that they all develop the entire product matrix. You need to separate the variable used to identify already processed lines that you are accessing to synchronize.

I could fix your code, but I would appreciate it if you did this work yourself, as it is a task to understand the topic of concurrency.

EDIT: Sync Explanation:
Synchronized takes an object as a lock for which only one thread can hold the monitor. If there is a monitor to block, the thread can process the block; if not, it must wait to receive the monitor.
In your case, you can use private static final Object lock = new Object(); like a lock, you will be synchronized.

EDIT 2: I completely built your code
I'm not proud to have done all your work, but it doesn't matter, here it is.

 package anything.synchronize_stackoverflow_post; /** * @date 21.11.2012 * @author Thomas Jahoda */ public class ConcurrentMatrixMultiplyingTask implements Runnable { private int[][] matrixA; private int[][] matrixB; private int[][] matrixProduct; // private final ConcurrencyContext context; public ConcurrentMatrixMultiplyingTask(ConcurrencyContext context, int[][] A, int[][] B, int[][] C) { if (context == null) { throw new IllegalArgumentException("context can not be null"); } this.context = context; this.matrixA = A; this.matrixB = B; this.matrixProduct = C; } @Override public void run() { while (true) { int row; synchronized (context) { if (context.isFullyProcessed()) { break; } row = context.nextRowNum(); } System.out.println(Thread.currentThread().getName() + " is going to process row " + row); // i'm not really sure if this matrix algorithm here is right, idk.. for (int j = 0; j < matrixB[0].length; j++) { for (int k = 0; k < matrixA[0].length; k++) { matrixProduct[row][j] += matrixA[row][k] * matrixB[k][j]; } } } } public static class ConcurrencyContext { private final int rowCount; private int nextRow = 0; public ConcurrencyContext(int rowCount) { this.rowCount = rowCount; } public synchronized int nextRowNum() { if (isFullyProcessed()) { throw new IllegalStateException("Already fully processed"); } return nextRow++; } public synchronized boolean isFullyProcessed() { return nextRow == rowCount; } } } 

And ProcessingTask

 package anything.synchronize_stackoverflow_post; import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; import java.util.Random; import java.util.Scanner; import java.util.logging.Level; import java.util.logging.Logger; /** * @date 21.11.2012 * @author Thomas Jahoda */ public class MatrixMulti { public static void main(String[] args) { int[][] matrixA; int[][] matrixB; int colA = 0; int rowA = 0; int colB = 0; int rowB = 0; Scanner userInput = new Scanner(System.in); System.out.println("Please enter the dimensions of matrix A"); do { System.out.print("column for matrix A: "); colA = userInput.nextInt(); System.out.println(); } while (!validDimension(colA)); rowB = colA; do { System.out.print("row for matrix A: "); rowA = userInput.nextInt(); System.out.println(); } while (!validDimension(rowA)); matrixA = new int[rowA][colA]; System.out.println("Please enter the dimensions of matrix B:"); do { System.out.print("column for matrix B: "); colB = userInput.nextInt(); System.out.println(); } while (!validDimension(colB)); matrixB = new int[rowB][colB]; fillMatrix(matrixA); fillMatrix(matrixB); System.out.println("Would you like to print out matrix A and B? (y/n)"); String userResponse = userInput.next(); if (userResponse.equalsIgnoreCase("y")) { System.out.println("Matrix A:"); printBackMatrix(matrixA); System.out.println(); System.out.println("Matrix B:"); printBackMatrix(matrixB); System.out.println(); } int[][] matrixProduct3 = multMatrixWithThreadsSync(matrixA, matrixB); String fileName = "test.txt"; System.out.println("Matrix product is being written to " + fileName); try { printMatrixToFile(matrixProduct3, fileName); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private static int[][] multMatrixWithThreadsSync(int[][] matrixA, int[][] matrixB) { int[][] matrixProduct = new int[matrixA.length][matrixB[0].length]; int[] matrixProductColumn = new int[matrixA.length]; // ConcurrentMatrixMultiplyingTask.ConcurrencyContext context = new ConcurrentMatrixMultiplyingTask.ConcurrencyContext(matrixProduct.length); // Runnable task = new ConcurrentMatrixMultiplyingTask(context, matrixA, matrixB, matrixProduct); Thread[] workers = new Thread[5]; for (int i = 0; i < workers.length; i++) { workers[i] = new Thread(task, "Worker-"+i); } for (int i = 0; i < workers.length; i++) { Thread worker = workers[i]; worker.start(); } for (int i = 0; i < workers.length; i++) { Thread worker = workers[i]; try { worker.join(); } catch (InterruptedException ex) { Logger.getLogger(MatrixMulti.class.getName()).log(Level.SEVERE, null, ex); } } return matrixProduct; } private static void printMatrixToFile(int[][] matrix, String fileName) throws IOException { PrintWriter userOutput = new PrintWriter(new FileWriter(fileName)); for (int i = 0; i < matrix.length; i++) { for (int j = 0; j < matrix[0].length; j++) { userOutput.print(matrix[i][j] + " "); } userOutput.println(); } userOutput.close(); } private static void printBackMatrix(int[][] matrix) { for (int i = 0; i < matrix.length; i++) { for (int j = 0; j < matrix[0].length; j++) { System.out.print(matrix[i][j] + " "); } System.out.println(); } } private static void fillMatrix(int[][] matrix) { Random rand = new Random(); for (int i = 0; i < matrix.length; i++) { for (int j = 0; j < matrix[0].length; j++) { matrix[i][j] = rand.nextInt(100) + 1; } } } public static boolean validDimension(int dim) { if (dim <= 0 || dim > 1000) { System.err.println("Dimension value entered is not valid"); return false; } return true; } } 
+3
source

To approach your problem, you need to determine what makes a "unit of work." This "unit of work" (or task) is what each thread will perform. Once this is determined, you can reason that this unit of work should do its job.

In the case of matrix multiplication, the natural unit of work is each cell of the resulting matrix. So, given the matrix A [i, j] and B [j, k], your calculation can focus on the point product of the vector A.row (x) (point) B. column (y) for each (0<=x<i,0<=y<k) .

The next step is to present each task. The ideal structure for "feeding" tasks to threads is the queue. java.util.concurrent.BlockingQueue is such an example when synchronization work is performed under the hood. Given that you are asked to explain the synchronization "manually", you can use another container, such as List (or even an array). Your structure will contain each cell that defines the resulting matrix. Maybe something like this:

 class Cell; // int x, int y, getters, setters, ... // build the structure that contains the work to be shared List<Cell> cells = new LinkedList<Cell>(); for (int i=0;i<a.rows;i++) { for (int j=0;j<b.columns;j++) { cells.add(new Cell(i,j)); // represent the cells of my result matrix } } 

Now you need the task given by the cell and the matrices A and B, can calculate the value of this cell. This is your unit of work and therefore what works in the context of the stream. Here you also need to decide if you want the result to be placed. In java, you can use futures and collect your matrix outside the context of the stream, but to keep things simple, I use an array that will contain the results. (Since, by definition, there will be no collisions)

 class DotProduct implements Runnable { int[][] a; int[][] b; int[][] result; List<Cell> cells; public DotProduct(int[][] a, int[][] b, int[][]result, List<Cell> cells) { ... } public void run() { while(true) { Cell cell = null; synchronized(cells) { // here, we ensure exclusive access to the shared mutable structure if (cells.isEmpty()) return; // when there're no more cells, we are done. Cell cell = cells.get(0); // get the first cell not calculated yet cells.remove(cell); // remove it, so nobody else will work on it } int x = cell.getX(); int y = cell.getY(); z = a.row(x) (dot) b.column(y); synchronized (result) { result[x][y] = z; } } } 

Now you are almost done. The only thing you still need to do is create the threads, “feed them” using the DotProduct task and wait until they are completed. Note that I synchronized on result to update the result matrix. Although by definition there is no possibility of simultaneous access to the same cell (since each thread works in a different cell), you need to make sure that the result is “safely published” to other threads by explicitly synchronizing the object. You can also do this by declaring result volatile , but I'm not sure if you already covered this place.

Hope this helps to understand how to approach the concurrency issue.

+1
source

You use the whole spectrum of synchronization primitives: Semaphore, Lock, synchronized. Better to start with synchronization just to find out things. In fact, you need a resource that indicates that the next row / column is being processed, if any. All threads access it using a synchronized block, read the next row / column, move the row / column to the next cell, exit the block and process the resulting row / column.

If the end of the matrix is ​​complete, the workflow simply exits. The main thread waits for all worker threads to exit Thread.join ().

0
source

You really misunderstood the answer to your previous question. rowToWork should be shared between threads. And the thread should probably call the method when building to get its initial value. You should understand that your critical section is the attribute of the next line for this stream.

0
source

Source: https://habr.com/ru/post/1447500/


All Articles