I would like to simulate the following scenario: several consumers, producer flows working on changing some data as
Customization
BlockingQueue<String> q1 = new SynchronousQueue<String>(); BlockingQueue<String> q2 = new SynchronousQueue<String>(); Producer dataProducer = new Producer(q1); // publish to q1 Filter1 filter1 = new Filter1(q1, q2); // read from q1, publish to q2 Filter2 filter2 = new Filter2(q2); // read from q2 new Thread(dataProducer, "Producer-Thread").start(); new Thread(filter1, "Filter1-Thread").start(); new Thread(filter2, "Filter2-Thread").start();
Manufacturer
public void run() { try { while (true) { this.q.put(saySomething()); } } catch (InterruptedException ex) { ex.printStackTrace(); } } public String saySomething() { return "Something"; }
Filter 1
public void run() { try { while (true) { consume(qIn.take()); } } catch (InterruptedException ex) { ex.printStackTrace(); } } private void consume(String take) {
Filter 2
public void run() { try { while (true) { consume(qIn.take()); } } catch (InterruptedException ex) { ex.printStackTrace(); } } private void consume(String s) { System.out.println("Something became: " + s); }
So, to repeat: The producer puts something in the queue from which Filter 1 is read. He changes the data and publishes them in another queue from which Filter 2 is read. Filter 2 prints the final data.
This code does not work with
Exception in thread "Thread-2" java.lang.IllegalStateException: Queue full
Could you help me understand why?
source share