How to pass an object instance to a thread in Java so that the thread using it does not see later updates to the instance

I am developing an application that constantly receives messages. I store these messages in an in-memory data structure (e.g. a List). I want to write them to a file, but only once the list size reaches a certain threshold, say 100 messages (i.e. process the messages in batches). One way is to simply check the size of the list after each message arrives and call the function that writes the messages to a file once the threshold is reached. But this approach has the following problems:

  • The function call may block for an indefinite time until all messages have been written to the file
  • Messages that arrive in the meantime may be lost, or may have to wait until the list is available again.

Another way could be to create a new thread that writes the messages to a file independently. But when I pass the list (containing the messages) to that thread to perform the write, the list keeps being updated with the new messages that are constantly arriving. As a result, the newly received messages are also written to the file, which is not what I expect.

This should not happen, since I intend to write the new messages in the next batch.

Can someone suggest a solution for this requirement, or any improvement to the approach above that would solve my problems?

+4
5 answers

It is important to understand that you never pass an object in Java - only ever a reference (or a primitive value).

Options:

  • Create a copy of the list and pass the reference to this copy to a new thread
  • Use a producer/consumer queue, so that your "producing" thread only ever adds values to the queue, and your consumer thread only ever takes items from the queue to write them to disk. You need to think about how big you want the queue to get before it stops accepting more records, of course.

I would recommend the latter approach, using the classes in java.util.concurrent to implement it, in particular BlockingQueue<E>.
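For comparison, the first option (handing the writer a copy) could look roughly like the sketch below. The class name SnapshotDemo and the helper takeSnapshot are made up for illustration, and the sketch assumes that every access to the live list is synchronized on the list itself.

```java
import java.util.ArrayList;
import java.util.List;

class SnapshotDemo {
    // Hypothetical helper: copies the current messages and empties the live list,
    // so the returned batch is never modified by the receiving thread again.
    static List<String> takeSnapshot(List<String> live) {
        synchronized (live) {
            List<String> batch = new ArrayList<>(live);
            live.clear();
            return batch;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            synchronized (messages) {
                messages.add("msg-" + i);
            }
        }

        // The writer thread only ever sees the copy.
        List<String> batch = takeSnapshot(messages);
        Thread writer = new Thread(
                () -> System.out.println("writing " + batch.size() + " messages"));
        writer.start();

        // A message arriving during the write lands in the live list, not in the batch.
        synchronized (messages) {
            messages.add("msg-100");
        }
        writer.join();
        System.out.println("pending for next batch: " + messages.size());
    }
}
```

Because only a reference is ever passed, the copy is what isolates the writer from later additions. The cost is one extra list allocation per batch.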

+1

I believe that a cleaner solution is to support automatic batching, i.e. where the batch size is controlled by the rate of the incoming data.

For this you can use a BlockingQueue:

    // An unbounded queue will not block the producer.
    final BlockingQueue<T> queue = new LinkedBlockingQueue<T>();

    // To add an element:
    queue.add(element);

    // To get a batch of data:
    List<T> list = new ArrayList<T>(maxElements);
    while (writing) {
        T t = queue.take(); // wait for at least one element
        list.add(t);
        queue.drainTo(list, maxElements - 1);
        // process list, e.g. write to a file
        list.clear();
    }

The advantage of this approach is that if the producer is very slow, elements do not sit waiting for an unreasonably long time, but as the rate increases, the batch size naturally grows to keep up with the producer, which means you don't have to decide which batch size is best.
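A self-contained version of that loop might look like the sketch below. The class name DrainBatchDemo, the helper nextBatch, the message count of 250 and the cap of 100 are all assumptions chosen for illustration. Printing the batch size stands in for the actual file write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class DrainBatchDemo {
    // Hypothetical helper: blocks until one element is available, then drains
    // whatever else is already queued, up to maxElements in total.
    // Returns an empty list if the calling thread is interrupted while waiting.
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxElements) {
        List<T> batch = new ArrayList<>(maxElements);
        try {
            batch.add(queue.take());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return batch;
        }
        queue.drainTo(batch, maxElements - 1);
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        int total = 250;

        // The producer only ever adds to the queue.
        Thread producer = new Thread(() -> {
            for (int i = 0; i < total; i++) {
                queue.add("msg-" + i);
            }
        });
        producer.start();

        // The consumer takes batches whose size depends on how fast messages arrive.
        int written = 0;
        while (written < total) {
            List<String> batch = nextBatch(queue, 100);
            if (batch.isEmpty()) {
                break; // interrupted
            }
            written += batch.size();
            System.out.println("batch of " + batch.size());
        }
        producer.join();
    }
}
```

The batch sizes printed by main vary from run to run, because they depend on how far ahead the producer is when the consumer drains the queue.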

+5

I would suggest the following approach:

  • Hold the reference to the message list in an AtomicReference.
  • When the list is full enough, replace it with a new, empty list.
  • Pass the full list to a worker thread that will save the messages to a file.

If you write to the list from a single thread, a plain reference is enough instead of an AtomicReference.
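The steps above can be sketched as follows. The class name ListSwapDemo, the method receive and the threshold of 100 are assumptions made for illustration, and the sketch assumes a single receiving thread. With several receiving threads, adding to the list and swapping it would need additional synchronization.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class ListSwapDemo {
    static final int THRESHOLD = 100; // assumed batch size, taken from the question

    private final AtomicReference<List<String>> current =
            new AtomicReference<List<String>>(new ArrayList<String>());

    // Called only from the single receiving thread. Returns the full list once the
    // threshold is reached (after swapping in a fresh one), otherwise null.
    List<String> receive(String message) {
        List<String> list = current.get();
        list.add(message);
        if (list.size() < THRESHOLD) {
            return null;
        }
        return current.getAndSet(new ArrayList<String>());
    }

    public static void main(String[] args) throws InterruptedException {
        ListSwapDemo demo = new ListSwapDemo();
        List<Thread> writers = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            List<String> full = demo.receive("msg-" + i);
            if (full != null) {
                // The receiver no longer holds a reference to this list, so the
                // worker thread can write it out without seeing new messages.
                Thread t = new Thread(
                        () -> System.out.println("writing " + full.size() + " messages"));
                writers.add(t);
                t.start();
            }
        }
        for (Thread t : writers) {
            t.join();
        }
    }
}
```

No copying is needed here: after the swap, new messages go into the fresh list, and the old one belongs to the worker thread alone.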

+1

Why not have the main message-receiving thread create a new message list when it hands the old one over to the file-writing thread?

0

You can implement a custom BoundedBuffer with a Condition, which collects 100 objects and then writes them all at once.

You can then share this BoundedBuffer instance between the different threads that put objects into it, and have one thread that calls the writeAll() method for as long as you want to keep writing.

    BoundedBuffer boundedBuffer = new BoundedBuffer();
    boundedBuffer.put("test");
    .......

From the writer thread, call:

    boundedBuffer.writeAll();

Below is the sample code:

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    class BoundedBuffer {
        final Lock lock = new ReentrantLock();
        final Condition full = lock.newCondition();  // producers wait here while the buffer is full
        final Condition empty = lock.newCondition(); // the writer waits here until the buffer is full
        final Object[] items = new Object[100];
        int count;

        public void put(Object x) throws InterruptedException {
            lock.lock();
            try {
                while (count == items.length) {
                    empty.signal();
                    full.await();
                }
                items[count] = x;
                ++count;
                if (count == items.length) {
                    empty.signal(); // wake the writer as soon as the batch is complete
                }
            } finally {
                lock.unlock();
            }
        }

        public void writeAll() throws InterruptedException {
            lock.lock();
            try {
                while (count < items.length)
                    empty.await();
                // Write to file here.
                // After the write has finished, signal the full condition.
                count = 0;
                full.signalAll(); // wake every blocked producer, not just one
            } finally {
                lock.unlock();
            }
        }
    }

0

Source: https://habr.com/ru/post/1438666/

