Java Network: Socket / InputStream Event

I am implementing an event-oriented layer on top of Java sockets, and I was wondering if there is a way to determine if there is data waiting to be read.

My usual approach would be to read from the socket to the buffer and call the provided callbacks when the buffer is full for a given number of bytes (which could be 0 if the callback needs to be fired every time something comes up), but I I suspect Java is already buffering for me.

Is the available() InputStream method reliable for this? Should I just read() and do my own buffering on top of Socket? Or is there another way?

+6
source share
2 answers

In short, no. available() not reliable (at least it was not for me). I recommend using java.nio.channels.SocketChannel associated with Selector and SelectionKey . This solution is somewhat event-based, but more complex than simple sockets.

For clients:

  • Make a channel for the socket ( socket ), open the selector ( selector = Selector.open(); ).
  • Use non-blocking socket.configureBlocking(false);
  • Register selector for socket.register(selector, SelectionKey.OP_CONNECT); connections socket.register(selector, SelectionKey.OP_CONNECT);
  • Connect socket.connect(new InetSocketAddress(host, port));
  • See if there is anything new selector.select();
  • If β€œnew” refers to a successful connection, register the selector for OP_READ ; if "new" refers to the data available, just read from the socket.

However, in order to have an asynchronous interface, you will need to configure a separate thread (although the socket is created as unlocked, the thread will still block), which checks if something has arrived or not.

There is ServerSocketChannel for the servers, and you use OP_ACCEPT for it.

For reference, this is my code (client), should give you a hint:

  private Thread readingThread = new ListeningThread(); /** * Listening thread - reads messages in a separate thread so the application does not get blocked. */ private class ListeningThread extends Thread { public void run() { running = true; try { while(!close) listen(); messenger.close(); } catch(ConnectException ce) { doNotifyConnectionFailed(ce); } catch(Exception e) { // e.printStackTrace(); messenger.close(); } running = false; } } /** * Connects to host and port. * @param host Host to connect to. * @param port Port of the host machine to connect to. */ public void connect(String host, int port) { try { SocketChannel socket = SocketChannel.open(); socket.configureBlocking(false); socket.register(this.selector, SelectionKey.OP_CONNECT); socket.connect(new InetSocketAddress(host, port)); } catch(IOException e) { this.doNotifyConnectionFailed(e); } } /** * Waits for an event to happen, processes it and then returns. * @throws IOException when something goes wrong. */ protected void listen() throws IOException { // see if there are any new things going on this.selector.select(); // process events Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while(iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); // check validity if(key.isValid()) { // if connectable... if(key.isConnectable()) { // ...establish connection, make messenger, and notify everyone SocketChannel client = (SocketChannel)key.channel(); // now this is tricky, registering for OP_READ earlier causes the selector not to wait for incoming bytes, which results in 100% cpu usage very, very fast if(client!=null && client.finishConnect()) { client.register(this.selector, SelectionKey.OP_READ); } } // if readable, tell messenger to read bytes else if(key.isReadable() && (SocketChannel)key.channel()==this.messenger.getSocket()) { // read message here } } } } /** * Starts the client. */ public void start() { // start a reading thread if(!this.running) { this.readingThread = new ListeningThread(); this.readingThread.start(); } } /** * Tells the client to close at nearest possible moment. */ public void close() { this.close = true; } 

And for the server:

  /** * Constructs a server. * @param port Port to listen to. * @param protocol Protocol of messages. * @throws IOException when something goes wrong. */ public ChannelMessageServer(int port) throws IOException { this.server = ServerSocketChannel.open(); this.server.configureBlocking(false); this.server.socket().bind(new InetSocketAddress(port)); this.server.register(this.selector, SelectionKey.OP_ACCEPT); } /** * Waits for event, then exits. * @throws IOException when something goes wrong. */ protected void listen() throws IOException { // see if there are any new things going on this.selector.select(); // process events Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while(iter.hasNext()) { SelectionKey key = iter.next(); // do something with the connected socket iter.remove(); if(key.isValid()) this.process(key); } } /** * Processes a selection key. * @param key SelectionKey. * @throws IOException when something is wrong. */ protected void process(SelectionKey key) throws IOException { // if incoming connection if(key.isAcceptable()) { // get client SocketChannel client = (((ServerSocketChannel)key.channel()).accept()); try { client.configureBlocking(false); client.register(this.selector, SelectionKey.OP_READ); } catch(Exception e) { // catch } } // if readable, tell messenger to read else if(key.isReadable()) { // read } } 

Hope this helps.

+8
source

Accessible () will only tell you if you can read data without going to the OS. This is not very useful here.

You can either block or not block reading, as you prefer. Non-blocking reads simply return when there is no data to read, so that may be what you want.

0
source

Source: https://habr.com/ru/post/887869/


All Articles