"Queue full" error when working with multiple consumers and producers

I would like to simulate the following scenario: several producer and consumer threads working on modifying some data, as follows.

Setup

    BlockingQueue<String> q1 = new SynchronousQueue<String>();
    BlockingQueue<String> q2 = new SynchronousQueue<String>();

    Producer dataProducer = new Producer(q1); // publish to q1
    Filter1 filter1 = new Filter1(q1, q2);    // read from q1, publish to q2
    Filter2 filter2 = new Filter2(q2);        // read from q2

    new Thread(dataProducer, "Producer-Thread").start();
    new Thread(filter1, "Filter1-Thread").start();
    new Thread(filter2, "Filter2-Thread").start();

Producer

    public void run() {
        try {
            while (true) {
                this.q.put(saySomething());
            }
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    public String saySomething() {
        return "Something";
    }

Filter 1

    public void run() {
        try {
            while (true) {
                consume(qIn.take());
            }
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    private void consume(String take) {
        // Modify data according to some rules
        String newData = take.replace("m", "-");
        produce(newData);
    }

    private void produce(String newData) {
        // put new data in queue out
        qOut.add(newData); // <-- Stacktrace points here
    }

Filter 2

    public void run() {
        try {
            while (true) {
                consume(qIn.take());
            }
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    private void consume(String s) {
        System.out.println("Something became: " + s);
    }

To recap: the Producer puts something into the queue that Filter 1 reads from. Filter 1 modifies the data and publishes it to a second queue that Filter 2 reads from. Filter 2 prints the final data.

This code fails with:

 Exception in thread "Thread-2" java.lang.IllegalStateException: Queue full 

Could you help me understand why?

1 answer

You should use put(), not add(). A SynchronousQueue is always both full and empty; it has no capacity. put() blocks the calling thread until another thread arrives to take the item from the queue.
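To illustrate the hand-off, here is a minimal sketch, separate from the question's classes. The class name PutHandoffDemo and the method handOff are made up for this example. A producer thread calls put() and stays blocked until the main thread calls take().

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the question's code.
class PutHandoffDemo {

    // Hands one item from a producer thread to the calling thread.
    static String handOff(String item) {
        BlockingQueue<String> q = new SynchronousQueue<>();

        Thread producer = new Thread(() -> {
            try {
                q.put(item); // blocks until the take() below receives the item
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }, "Producer-Thread");
        producer.start();

        try {
            String received = q.take(); // blocks until the producer arrives
            producer.join();
            return received;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("Received: " + handOff("Something"));
    }
}
```

In the question's Filter 1, the equivalent change would be calling qOut.put(newData) in produce(). Since put() throws InterruptedException, produce() and consume() would then need to declare or handle it.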

The add() method succeeds only if another thread is already waiting to take the item; if no thread is waiting, you get the "Queue full" exception.
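The difference can be checked with a small sketch like the one below. The class AddWithoutConsumerDemo and its method names are invented for illustration. It assumes that a thread blocked in take() reports Thread.State.WAITING, which is how the example detects that a consumer is ready before calling add().

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the question's code.
class AddWithoutConsumerDemo {

    // Returns true if add() is rejected when no thread is waiting in take().
    static boolean addFailsWithoutConsumer() {
        BlockingQueue<String> q = new SynchronousQueue<>();
        try {
            q.add("Something");
            return false;
        } catch (IllegalStateException ex) {
            return true; // same exception type as in the question's stack trace
        }
    }

    // Returns true if add() succeeds once a consumer is blocked in take().
    static boolean addSucceedsWithWaitingConsumer() {
        BlockingQueue<String> q = new SynchronousQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                q.take();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer-Thread");
        consumer.start();

        try {
            // Assumption: a consumer parked in take() is in the WAITING state.
            while (consumer.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            boolean added = q.add("Something"); // a taker is waiting, so this succeeds
            consumer.join();
            return added;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("add() rejected without consumer: " + addFailsWithoutConsumer());
        System.out.println("add() accepted with waiting consumer: " + addSucceedsWithWaitingConsumer());
    }
}
```

In a pipeline like the one in the question, whether Filter 2 is already waiting when Filter 1 calls add() depends on thread scheduling, so the failure can show up immediately or only after some iterations.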


Source: https://habr.com/ru/post/907364/

