Producer-consumer communication between threads

I have problems with inter-thread communication and have "solved" them by using "dummy messages" everywhere. Is this a bad idea? What are the possible solutions?

Here is an example of what I have.

The main thread starts a thread that processes records and inserts them into the database. The main thread reads a possibly huge file and puts one record (object) after another into a BlockingQueue. The processing thread reads from the queue and does the work.

How do I tell the "worker thread" to stop? The queue may be empty while the work is not yet done, and the main thread does not know when the processing thread has finished its work, so it cannot simply interrupt it.

So, the processing thread:

    while (queue.size() > 0 || !Thread.currentThread().isInterrupted()) {
        MyObject object = queue.poll(100, TimeUnit.MILLISECONDS);
        if (object != null) {
            String data = object.getData();
            if (data.equals("END")) {
                break;
            }
            // do work
        }
    }
    // clean-up
    synchronized (queue) {
        queue.notifyAll();
    }
    return;

and the main thread:

    // ...start processing thread...
    while (reader.hasNext()) {
        // ...read whole file and put data in queue...
    }
    MyObject dummy = new MyObject();
    dummy.setData("END");
    queue.put(dummy);
    // Note: empty queue here means work is done
    while (queue.size() > 0) {
        synchronized (queue) {
            queue.wait(500); // over-cautious locking prevention, I guess
        }
    }

Please note that the inserts must happen in the same transaction, and the transaction cannot be handled on the main thread.

What would be the best way to do this? (I am learning and do not want to start "doing it wrong".)

+6
3 answers

Your dummy message is a valid approach. It is called a "poison pill": something that the producer sends to the consumer to make it stop.
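A minimal sketch of the idea, not the asker's actual code: the records are plain strings, and the names PoisonPillDemo, POISON and process are made up for illustration. The pill is a dedicated object compared by identity, so a real record that happens to contain "END" is not mistaken for the stop signal, and Thread.join() stands in for the wait/notify loop in the main thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PoisonPillDemo {
    // Hypothetical sentinel: a distinct String instance, compared by identity below.
    static final String POISON = new String("END");

    // Feeds the records to a consumer thread and returns what the consumer processed.
    static List<String> process(List<String> records) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> processed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String record = queue.take();   // blocks until an element arrives
                    if (record == POISON) {         // identity check on purpose
                        break;
                    }
                    processed.add(record);          // stand-in for the real work
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (String record : records) {
                queue.put(record);
            }
            queue.put(POISON);                      // always the last element
            consumer.join();                        // returns once the consumer has stopped
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList("a", "END", "b")));
    }
}
```

Because the consumer uses a blocking take(), it needs neither a polling timeout nor a queue.size() check.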

Another possibility is to call Thread.interrupt() on the worker thread from the main thread, and to catch and handle the InterruptedException accordingly in the worker thread.
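A rough sketch of the interrupt variant, assuming the producer interrupts the worker only after it has queued everything. The class and method names are hypothetical. Since the interrupt can arrive while elements are still waiting in the queue, the worker drains what is left before exiting.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class InterruptStopDemo {
    // Queues `count` items, interrupts the worker, and returns how many items it handled.
    static int runAndInterrupt(int count) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        AtomicInteger handled = new AtomicInteger();

        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    queue.take();                   // throws InterruptedException on interrupt
                    handled.incrementAndGet();      // stand-in for the real work
                }
            } catch (InterruptedException e) {
                // Stop requested: finish whatever is already queued, without blocking.
                while (queue.poll() != null) {
                    handled.incrementAndGet();
                }
            }
        });
        worker.start();

        try {
            for (int i = 0; i < count; i++) {
                queue.put(i);
            }
            worker.interrupt();                     // the stop signal
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled.get();
    }
}
```

Compared with a poison pill, this couples the producer to the worker's Thread object, which may or may not be acceptable in your design.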

+4

I have "solved" them by using "dummy messages" everywhere. Is this a bad idea? What are the possible solutions?

It is not a bad idea. It is called a "poison pill", and it is a sensible way to stop a thread-based service.

But it works only when the number of producers and consumers is known.

There are two threads in the code you posted: the "main thread", which produces the data, and the "processing thread", which consumes it. "Poison pills" work well in this situation.

But imagine you have several producers. The consumer can stop only when all producers have sent their "poison pills", so you need to know the exact number of producers and count the "poison pills" in the consumer. When the count equals the number of producers, all producers have stopped working and the consumer can stop too.

In the "main thread" you need to catch InterruptedException; otherwise the "main thread" may never get to send the "poison pill". You can do this as below:
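The counting could look roughly like this. It is only a sketch: the names MultiProducerDemo, POISON and consumeAll are invented, the items are plain integers, and the consumer runs on the calling thread to keep the example short.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class MultiProducerDemo {
    // Hypothetical sentinel object; real items are Integers, so it cannot collide with them.
    static final Object POISON = new Object();

    // Starts `producers` threads and consumes until every one of them has sent its pill.
    static int consumeAll(int producers, int itemsPerProducer) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

        for (int p = 0; p < producers; p++) {
            new Thread(() -> {
                try {
                    for (int i = 0; i < itemsPerProducer; i++) {
                        queue.put(i);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    queue.add(POISON);              // unbounded queue, so add() does not block
                }
            }).start();
        }

        int consumed = 0;
        int pillsSeen = 0;
        try {
            while (pillsSeen < producers) {         // stop only after the last pill
                Object item = queue.take();
                if (item == POISON) {
                    pillsSeen++;
                } else {
                    consumed++;                     // stand-in for the real work
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }
}
```

Each producer sends its pill in a finally block, so the consumer is released even if a producer is interrupted halfway.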

    ...
    try {
        // do normal processing
    } catch (InterruptedException e) {
        /* fall through */
    } finally {
        MyObject dummy = new MyObject();
        dummy.setData("END");
        ...
    }
    ...

Alternatively, you can use an ExecutorService, which takes care of the worker thread's lifecycle for you.

(This works when you just need to do some work and then stop once everything is complete.)

    void doWorks(Set<String> works, long timeout, TimeUnit unit) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        try {
            for (final String work : works) {
                exec.execute(new Runnable() {
                    public void run() {
                        ...
                    }
                });
            }
        } finally {
            exec.shutdown();
            exec.awaitTermination(timeout, unit);
        }
    }
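Applied to the question, a variation might use a single-threaded executor instead of a cached pool, so that the tasks run one after another on a single worker thread. This is a sketch under assumptions: the "insert" is simulated by adding to a list, and the names SingleWorkerDemo and insertAll are hypothetical. Whether it fits your transaction handling depends on how the transaction is bound to the thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SingleWorkerDemo {
    // Hands each record to one worker thread and waits until all of them are done.
    static List<String> insertAll(List<String> records) {
        List<String> inserted = Collections.synchronizedList(new ArrayList<>());
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            for (String record : records) {
                worker.execute(() -> inserted.add(record)); // stand-in for the database insert
            }
        } finally {
            worker.shutdown();                      // no new tasks; queued ones still run
        }
        try {
            if (!worker.awaitTermination(1, TimeUnit.MINUTES)) {
                worker.shutdownNow();               // give up after the timeout
            }
        } catch (InterruptedException e) {
            worker.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return inserted;
    }
}
```

Here shutdown() plays the role of the "END" message, and awaitTermination() replaces the wait loop in the main thread.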

I am learning and do not want to start "doing it wrong"

You may want to read the book "Java Concurrency in Practice". Believe me, it is the best.

+2

What you could do (and what I did in a recent project) is wrap the queue and add an 'isOpen()' method.

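    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    class ClosableQ<T> {
        // volatile so that close() in one thread becomes visible to the other
        volatile boolean isOpen = true;
        private final LinkedBlockingQueue<T> lbq = new LinkedBlockingQueue<T>();

        public void put(T someObject) throws InterruptedException {
            if (isOpen) {
                lbq.put(someObject);
            }
        }

        // Returns null if the queue is closed or nothing arrives within 100 ms,
        // so the caller can re-check isOpen() instead of blocking forever.
        public T get() throws InterruptedException {
            return isOpen ? lbq.poll(100, TimeUnit.MILLISECONDS) : null;
        }

        public boolean isOpen() { return isOpen; }
        public void open() { isOpen = true; }
        public void close() { isOpen = false; }
    }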

So your writer thread will become something like:

    while (reader.hasNext()) {
        // read the file and put it into the queue
        dataQ.put(someObject);
    }
    // now we're done
    dataQ.close();

and the reader thread:

    while (dataQ.isOpen()) {
        someObject = dataQ.get();
    }

You could, of course, extend the queue class instead, but that gives the user a level of access that you might not want. You also need to add some thread safety to this code, for example an AtomicBoolean for the flag.
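One way the thread-safe version might look, as a sketch only. It assumes a single writer that calls close() itself after its last put, and all names here (DrainableQueue, isFinished, DrainableQueueDemo) are hypothetical. The reader keeps going until the queue is both closed and empty, so elements still queued at the moment of close() are not lost.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical variant of the wrapper; assumes one writer that closes the queue itself.
class DrainableQueue<T> {
    private final AtomicBoolean open = new AtomicBoolean(true);
    private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();

    void put(T item) {
        if (!open.get()) {
            throw new IllegalStateException("queue is closed");
        }
        queue.add(item);                            // unbounded queue, so add() does not block
    }

    void close() {
        open.set(false);
    }

    // True only once the writer has closed the queue and every element was taken.
    boolean isFinished() {
        return !open.get() && queue.isEmpty();
    }

    // Waits briefly; returns null on timeout so the caller can re-check isFinished().
    T poll() throws InterruptedException {
        return queue.poll(50, TimeUnit.MILLISECONDS);
    }
}

class DrainableQueueDemo {
    // Writes `items` elements from a second thread and counts what the reader receives.
    static int countConsumed(int items) {
        DrainableQueue<Integer> dataQ = new DrainableQueue<>();
        Thread writer = new Thread(() -> {
            for (int i = 0; i < items; i++) {
                dataQ.put(i);
            }
            dataQ.close();                          // now we're done
        });
        writer.start();

        int consumed = 0;
        try {
            while (!dataQ.isFinished()) {
                if (dataQ.poll() != null) {
                    consumed++;                     // stand-in for the real work
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }
}
```

With several writers this check is no longer sufficient, because one writer could close the queue while another is still adding to it.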

0

Source: https://habr.com/ru/post/904267/

