Creating a synchronized buffer for the Producer / Consumer pattern in Java

I implemented a buffer for the producer / consumer pattern, however, it seems that the consumer never acquires a lock, so starvation occurs. I can't determine why this is happening, since both put () and take () seem to release the lock correctly ...

I know there are BlockingQueue and other good implementations, but I want to implement this using wait () and notify () as an exercise.

public class ProducerConsumerRaw { public static void main(String[] args) { IntBuffer buffer = new IntBuffer(8); ConsumerRaw consumer = new ConsumerRaw(buffer); ProducerRaw producer = new ProducerRaw(buffer); Thread t1 = new Thread(consumer); Thread t2 = new Thread(producer); t1.start(); t2.start(); } } class ConsumerRaw implements Runnable{ private final IntBuffer buffer; public ConsumerRaw(IntBuffer b){ buffer = b; } public void run() { while(!buffer.isEmpty()) { int i = buffer.take(); System.out.println("Consumer reads "+i); // this print may not be in the order } } } class ProducerRaw implements Runnable{ private final IntBuffer buffer; ProducerRaw(IntBuffer b) { this.buffer = b; } public void run(){ for (int i = 0; i < 20; i++) { int n = (int) (Math.random()*100); buffer.put(n); System.out.println("Producer puts "+n); } } } class IntBuffer{ private final int[] storage; private volatile int end; private volatile int start; public IntBuffer(int size) { this.storage = new int[size]; end = 0; start = 0; } public void put(int n) { // puts add the END synchronized(storage) { boolean full = (start == (end+storage.length+1)%storage.length); while(full){ // queue is full try { storage.notifyAll(); storage.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } this.storage[end] = n; end = incrementMod(end); storage.notifyAll(); } } public int take(){ synchronized(storage) { while (end == start) { // empty queue try { storage.notifyAll(); // notify waiting producers storage.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } int index = start; start = incrementMod(start); storage.notifyAll(); // notify waiting producers return this.storage[index]; } } private int incrementMod(int index) { synchronized (storage) { if (index == storage.length-1) return 0; else return index+1; } } public boolean isEmpty(){ synchronized (storage) { return (start == end); } } } 
+4
source share
1 answer

This is at least one problem in your put method:

 boolean full = (start == (end+storage.length+1)%storage.length); while(full){ // queue is full // Code that doesn't change full } 

If full initialized to true , how do you expect the loop to complete?

Another problem is the consumerโ€™s cycle:

 while(!buffer.isEmpty()) { int i = buffer.take(); System.out.println("Consumer reads "+i); } 

You assume that the producer will never empty the buffer - if the consumer starts work in front of the producer, he will immediately stop.

Instead, you need a way to tell the buffer that you stopped producing. The consumer must continue to accept until the queue is empty and receives more data.

+6
source

Source: https://habr.com/ru/post/1487568/


All Articles