Java share data between stream

I have a java process that reads data from a socket server. So I have a BufferedReader object and a PrintWriter corresponding to this socket.

Now in the same java process, I have a multi-threaded Java server that accepts client connections. I want to get functionality in which all these clients that I accept can read data from the BufferedReader object that I mentioned above. (So โ€‹โ€‹that they can multiplex the data)

How to make these separate client threads read data from a single BuffereReader object? Sorry for the confusion.

+4
source share
3 answers

I would say that they do not have direct access to the BufferedReader . Assuming you know the data format and each client is trying to read the same format (if itโ€™s not, I donโ€™t see how it can work at all), I would suggest creating one stream to read from BufferedReader and put the work items in Queue . There are many examples of using the producer / consumer queue in Java, and it is also likely to simplify the verification of client code.

Having just one thread accessing the BufferedReader means you don't have to worry about defining an atomic operation for which all other threads will have content. - The read stream efficiently defines this operation, deciding to add a work item to the queue.

EDIT: if all clients should see all the data, this reinforces my suggestion to have one more reader - except that instead of Queue data from which the elements were deleted, you will have a collection of which clients can read all existing data . You will need to use the appropriate thread safe collection, but there are many in Java.

EDIT: just by reading your comment, which says that every client should just see the last item read from the reader, which makes it even easier. Read one data stream, but just save one variable with a link to "last item read". You probably want to either synchronize access to it or use AtomicReference , but both of them are easy.

+6
source

The easiest way is probably to create a temporary file and use java.nio.channels.FileChannel . The reason is that it already gives the semantics of the reader / writer you want. An alternative is to reimplement this semantics using a memory scheme.

What would you do, read the reader and add at the end of the file. Then your readers will receive file channels that initially point to the end of the file. You have to be careful to make sure that they donโ€™t think that they have reached the end of the stream when you are just waiting for more input from the reader. You can either make the top-down logic aware of this, or provide some kind of wrapper around the readers.

Edit: this was written with the understanding that you want all readers to see everything. It is too complicated if that is not what you want.

+1
source

If you cannot store all the data in memory, you can use something like this: (Uses the classic observer pattern, where one thread controls the source and the rest are notified of new data)

 class Broadcaster { private final ConcurrentSkipListSet<Listener> listeners = new ConcurrentSkipListSet<Listener>(); private final BufferedReader source = //inject source // register / de-gegister code public void register(Listener listener); public void deRegister(Listener listener); // usually it used like broadcaster.register(this); public void broadcast(){ String read = null; while((read = source.readLine())!=null){ for(Listener listener : listeners){ listener.send(read); } } } } 

and

 class Listener implements Runnable { private final Queue<String> queue = new LinkedBlockingQueue<String>(); public void send(String read){ queue.add(read); } public void run(){ while(!Thread.currentThread().isInterrupted()){ String got = queue.get(); System.out.println(got); } } } 

I have not tested this thing or anything else, so have mercy if it doesn't work!

EDIT . If you have a memory limit, you can also do this:

 Queue<String> queue = new LinkedBlockingQueue<String>(2000); 

This will lead to an upper limit on how many items can queue in the queue, and when this limit is reached, threads that want to put items on it will have to wait (if add used, that is), so Broadcaster will block (wait) when the listeners cannot quickly use the data (while in this simple scheme, another listener might starve).

EDIT2:. When you do this, you should most likely only queue unchanging things. If you put, for example, byte[] , an impolite listener may modify the data, and other threads may be surprised. If this is somehow impossible, you should put another copy in each queue, for example. byteArray.clone() . This may result in some performance degradation.

+1
source

Source: https://habr.com/ru/post/1342247/


All Articles