Asynchronous Server Using Java NIO

I am using this tutorial on creating a java nio server without a section for writing.

Everything works fine except for one interesting thing:

  • When the client sends packets too quickly, the server does not receive all messages, the server always receives the first and second packets, but nothing more.
  • If the client sends packets slowly, the server receives all packets.

Any idea?

I am adding a server class code if you need another class mentioned in the code below, I am here :).

NIOServer Class:

package server; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; import java.util.*; import javax.xml.parsers.ParserConfigurationException; import org.xml.sax.SAXException; public class NioServer implements Runnable { // The host:port combination to listen on private InetAddress hostAddress; private int port; // The channel on which we'll accept connections private ServerSocketChannel serverChannel; // The selector we'll be monitoring private Selector selector; //the cach will hundle the messages that came private Cache cache; // The buffer into which we'll read data when it available private ByteBuffer readBuffer = ByteBuffer.allocate(8192); public NioServer(InetAddress hostAddress, int port , Cache cache) throws IOException { this.cache = cache; this.hostAddress = hostAddress; this.port = port; this.selector = this.initSelector(); } private Selector initSelector() throws IOException { // Create a new selector Selector socketSelector = SelectorProvider.provider().openSelector(); // Create a new non-blocking server socket channel this.serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); // Bind the server socket to the specified address and port InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port); serverChannel.socket().bind(isa); // Register the server socket channel, indicating an interest in // accepting new connections serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT); return socketSelector; } private void accept(SelectionKey key) throws IOException { // For an accept to be pending the channel must be a server socket channel. ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); // Accept the connection and make it non-blocking SocketChannel socketChannel = serverSocketChannel.accept(); Socket socket = socketChannel.socket(); socketChannel.configureBlocking(false); // Register the new SocketChannel with our Selector, indicating // we'd like to be notified when there data waiting to be read socketChannel.register(this.selector, SelectionKey.OP_READ); } private void read(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); // Clear out our read buffer so it ready for new data this.readBuffer.clear(); // Attempt to read off the channel int numRead; try { numRead = socketChannel.read(this.readBuffer); String test = new String(this.readBuffer.array()); System.out.print(test); } catch (IOException e) { // The remote forcibly closed the connection, cancel // the selection key and close the channel. // key.cancel(); // socketChannel.close(); return; } if (numRead == -1) { // Remote entity shut the socket down cleanly. Do the // same from our end and cancel the channel. key.channel().close(); key.cancel(); return; } // Hand the data off to our worker thread this.cache.processData(this, socketChannel, this.readBuffer.array(), numRead); } public void run() { while (true) { try { // Wait for an event one of the registered channels this.selector.select(); // Iterate over the set of keys for which events are available Iterator selectedKeys = this.selector.selectedKeys().iterator(); while (selectedKeys.hasNext()) { SelectionKey key = (SelectionKey) selectedKeys.next(); selectedKeys.remove(); if (!key.isValid()) { continue; } // Check what event is available and deal with it if (key.isAcceptable()) { this.accept(key); } else if (key.isReadable()) { this.read(key); } } } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) throws ParserConfigurationException, SAXException { try { Cache cache = new Cache(); new Thread(cache).start(); new Thread(new NioServer(null, 9090,cache)).start(); } catch (IOException e) { e.printStackTrace(); } } 
+6
source share
1 answer

I would expect if you read UDP. Notice how slowly you process packets using the read method. You print them to system.out, which is very slow, and not sure how fast you can process data in another thread using the processData method. This library that I wrote can help you make cross-threaded non-blocking communication if this is the source of your delay. You should also check the size of your base socket buffer. The larger it is, the more space you have to be quick and catch up before the packages start to drop. For TCP, you are likely to get an IOException on the channel if the underlying socket buffer is full. For UDP packets are silently deleted.

To access the base socket buffer size you can do:

 final Socket socket = channel.socket(); System.out.println(socket.getReceiveBufferSize()); socket.setReceiveBufferSize(newSize); 

Note. AFAIK Linux may require some OS configuration so that you can resize the base buffer. If setReceiveBufferSize has no effect (read it again to see if it has changed), report it to google. :)

+1
source

Source: https://habr.com/ru/post/917170/


All Articles