Java: Producer = Consumer, how do I know when to stop?

I have several workers that use ArrayBlockingQueue.

Each worker takes one object from the queue, processes it, and may produce several new objects as a result, which are put back into the queue for further processing. So, worker = producer + consumer.

Worker:

    import java.util.ArrayList;
    import java.util.concurrent.BlockingQueue;

    public class Worker implements Runnable {
        private BlockingQueue<String> processQueue = null;

        public Worker(BlockingQueue<String> processQueue) {
            this.processQueue = processQueue;
        }

        public void run() {
            try {
                do {
                    // Blocks until an item is available
                    String item = this.processQueue.take();
                    ArrayList<String> resultItems = this.processItem(item);
                    // Every result goes back into the same queue
                    for (String resultItem : resultItems) {
                        this.processQueue.put(resultItem);
                    }
                } while (true);
            } catch (Exception e) {
                ...
            }
        }

        private ArrayList<String> processItem(String item) throws Exception {
            ...
        }
    }

Main:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;

    public class Test {
        public static void main(String[] args) throws Exception {
            new Test().run();
        }

        private void run() throws Exception {
            BlockingQueue<String> processQueue = new ArrayBlockingQueue<>(10000);
            processQueue.put("lalala");

            Executor service = Executors.newFixedThreadPool(100);
            for (int i = 0; i < 100; ++i) {
                service.execute(new Worker(processQueue));
            }
        }
    }

What is the best way to stop workers when there is no more work?

The first thing that comes to mind is to periodically check the number of items in the queue and the number of items currently being processed. If both are zero, then call something like shutdownNow() on the ExecutorService. But I'm not sure this is the best way.

3 answers

If there is no more work to do, put a message into the queue that says so, and let the workers shut themselves down when they receive it. Because each worker finishes its current item before exiting, this is a good way to prevent data corruption.

If you also need to notify another thread that all workers have gone home, you can use a CountDownLatch.
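A minimal sketch of this idea follows. The answer does not say how to detect that no work is left when every worker is also a producer, so the sketch assumes a shared AtomicInteger (here called pending) that counts items that are queued or being processed. The names PoisonPillDemo, POISON, pending and runDemo are hypothetical, and processItem is only a stand-in that returns the string minus its last character.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class PoisonPillDemo {
    // Hypothetical "no more work" message; compared by identity so it cannot clash with real data.
    private static final String POISON = new String("<no more work>");

    // Stand-in for the question's processItem (assumption): the item minus its last character.
    static List<String> processItem(String item) {
        List<String> results = new ArrayList<>();
        if (!item.isEmpty()) {
            results.add(item.substring(0, item.length() - 1));
        }
        return results;
    }

    /** Processes the seed and everything derived from it; returns how many items were handled. */
    static int runDemo(String seed, int workers) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10000);
        AtomicInteger pending = new AtomicInteger(1);      // the seed is the only item so far
        AtomicInteger processed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(workers); // one count per worker
        ExecutorService service = Executors.newFixedThreadPool(workers);

        queue.put(seed);
        for (int i = 0; i < workers; i++) {
            service.execute(() -> {
                try {
                    while (true) {
                        String item = queue.take();
                        if (item == POISON) {
                            break; // told that no work is left
                        }
                        List<String> results = processItem(item);
                        // Count the children before publishing them, then retire the parent.
                        pending.addAndGet(results.size());
                        for (String result : results) {
                            queue.put(result);
                        }
                        processed.incrementAndGet();
                        if (pending.decrementAndGet() == 0) {
                            // Nothing queued and nothing in progress: one message per worker.
                            for (int k = 0; k < workers; k++) {
                                queue.put(POISON);
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown(); // this worker has gone home
                }
            });
        }

        done.await();       // the "other thread" waiting for all workers
        service.shutdown(); // all tasks have returned, so this completes immediately
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processed " + runDemo("lalala", 4) + " items");
    }
}
```

Only the worker that brings the counter to zero sends the messages, and it sends exactly one per worker, so every worker consumes one and exits. With a bounded queue a worker can still block in put when the queue is full; the sketch does not address that case.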


It seems that you already have your solution: use a separate in-progress queue, whose size is the number of items currently being processed. If you follow the convention that every access to either queue happens inside a synchronized(theArrayBlockingQueue) block, then everything should be fine. In particular, when moving an item to the processing state, remove it from the ArrayBlockingQueue and add it to the in-progress queue within the same synchronized block.
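One way this convention could look in code is sketched below. The class and method names (InProgressTracking, submit, claim, complete, isFinished, runDemo) are hypothetical, and the processing step is a stand-in that returns the string minus its last character. The sketch uses the non-blocking poll() rather than take(), on the assumption that a thread should never block while holding the lock.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class InProgressTracking {
    private final BlockingQueue<String> processQueue = new ArrayBlockingQueue<>(10000);
    // Items currently being processed; only touched inside synchronized (processQueue).
    private final Queue<String> inProgress = new ArrayDeque<>();

    void submit(String item) {
        synchronized (processQueue) {
            processQueue.add(item); // throws IllegalStateException if the queue is full
        }
    }

    /** Moves one item to the in-progress state, or returns null if nothing is queued right now. */
    String claim() {
        synchronized (processQueue) {
            String item = processQueue.poll(); // non-blocking on purpose
            if (item != null) {
                inProgress.add(item);
            }
            return item;
        }
    }

    /** Publishes the results of a claimed item and retires it in one atomic step. */
    void complete(String item, List<String> results) {
        synchronized (processQueue) {
            processQueue.addAll(results);
            inProgress.remove(item);
        }
    }

    /** True only when nothing is queued and nothing is being processed. */
    boolean isFinished() {
        synchronized (processQueue) {
            return processQueue.isEmpty() && inProgress.isEmpty();
        }
    }

    /** Runs the given number of workers until all work is done; returns the number of items handled. */
    static int runDemo(String seed, int workers) throws InterruptedException {
        InProgressTracking tracker = new InProgressTracking();
        AtomicInteger processed = new AtomicInteger();
        tracker.submit(seed); // seed before starting workers, so isFinished() starts out false

        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(() -> {
                while (!tracker.isFinished()) {
                    String item = tracker.claim();
                    if (item == null) {
                        Thread.yield(); // other workers still hold items; try again
                        continue;
                    }
                    // Stand-in processing step (assumption): the item minus its last character.
                    List<String> results = item.isEmpty()
                            ? Collections.<String>emptyList()
                            : Collections.singletonList(item.substring(0, item.length() - 1));
                    tracker.complete(item, results);
                    processed.incrementAndGet();
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processed " + runDemo("lalala", 4) + " items");
    }
}
```

Because complete() adds the results and removes the parent under the same lock, isFinished() can never observe a moment where an item has left one queue but its results have not yet reached the other. The busy-wait with Thread.yield() keeps the sketch short; waiting on the same monitor would avoid spinning.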


I changed your code a bit. I'm not sure this is what you expect, but at least it terminates! If you use shutdownNow instead of shutdown, your workers will be interrupted and, unless you put them back to work, will exit with no guarantee that the queue is empty.

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class Test {
        public static void main(String[] args) throws Exception {
            new Test().run();
        }

        private void run() throws Exception {
            BlockingQueue<String> processQueue = new ArrayBlockingQueue<>(10000);
            // A little longer to make sure there is enough to process
            processQueue.put("lalalalalalalalalalalalala");

            ExecutorService service = Executors.newFixedThreadPool(100);
            for (int i = 0; i < 100; ++i) {
                service.execute(new Worker(processQueue));
            }
            // Orderly shutdown: lets the tasks finish what they are doing
            service.shutdown();
            // Blocks until all tasks have finished or the timeout elapses
            // (returns false on timeout; it does not throw)
            service.awaitTermination(1, TimeUnit.SECONDS);
        }

        public static class Worker implements Runnable {
            private BlockingQueue<String> processQueue = null;
            private int count = 0;

            public Worker(BlockingQueue<String> processQueue) {
                this.processQueue = processQueue;
            }

            @Override
            public void run() {
                try {
                    do {
                        // Tries to get something from the queue for 100 ms
                        // and returns null if nothing arrived in that time
                        String item = this.processQueue.poll(100, TimeUnit.MILLISECONDS);
                        if (item == null) break; // Ends the job because the queue was empty
                        count++;
                        List<String> resultItems = this.processItem(item);
                        for (String resultItem : resultItems) {
                            this.processQueue.put(resultItem);
                        }
                    } while (true);
                } catch (InterruptedException e) {
                    System.out.println("Interrupted");
                    Thread.currentThread().interrupt();
                }
                if (count != 0) {
                    System.out.println(Thread.currentThread() + ": processed " + count + " entries");
                }
            }

            private List<String> processItem(String item) {
                // Put the string back minus its final character
                if (item.isEmpty()) {
                    return Collections.<String>emptyList();
                } else {
                    return Arrays.asList(item.substring(0, item.length() - 1));
                }
            }
        }
    }
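The difference between shutdown and shutdownNow can be seen in a small helper that tries the orderly path first and only falls back to interruption when the timeout elapses. This is a sketch along the lines of the usage pattern in the ExecutorService documentation; ShutdownDemo and shutdownAndAwait are hypothetical names, and the timeouts are arbitrary.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ShutdownDemo {
    /** Returns true if the pool terminated on its own within the timeout. */
    static boolean shutdownAndAwait(ExecutorService service, long timeout, TimeUnit unit) {
        service.shutdown(); // no new tasks are accepted; running tasks continue
        try {
            if (service.awaitTermination(timeout, unit)) {
                return true; // every task finished in time
            }
            service.shutdownNow();                   // interrupts the remaining workers
            service.awaitTermination(timeout, unit); // give them a chance to react
            return false;
        } catch (InterruptedException e) {
            service.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        // A pool whose only task returns immediately terminates in an orderly way.
        ExecutorService quick = Executors.newFixedThreadPool(2);
        quick.execute(() -> { });
        System.out.println("quick pool orderly: " + shutdownAndAwait(quick, 1, TimeUnit.SECONDS));

        // A pool with a long-running task has to be interrupted.
        ExecutorService stuck = Executors.newFixedThreadPool(1);
        stuck.execute(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit promptly when interrupted
            }
        });
        System.out.println("stuck pool orderly: "
                + shutdownAndAwait(stuck, 100, TimeUnit.MILLISECONDS));
    }
}
```

Checking the boolean returned by awaitTermination is what tells the caller whether the workers really drained the queue or were cut short.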

Source: https://habr.com/ru/post/912263/

