java.nio Selectors and SocketChannels for continuous streaming

I am currently using java.nio.channels.Selector and SocketChannels for an application that will open one-to-many connections for continuous streaming to a server. My application has three threads: StreamWriteWorker, which performs write operations on the SocketChannel; StreamReadWorker, which reads bytes from the buffer and parses the contents; and StreamTaskDispatcher, which calls select on the Selector to get the readyOps and dispatches new runnables to the worker threads.

Problem: when I call the select method, it returns a value > 0 (valid readyOps) only on the first call. I can write and send data on all ready channels one time, but every subsequent call to select returns 0.

Question: do I need to re-register the SocketChannel after each read/write (I hope not!)? If not, what could be the reason the SocketChannels do not become ready for any further read/write operations?

I'm sorry that I can't post the code, but I hope I have explained the problem clearly enough for someone to help. I have searched for answers and found that a SocketChannel cannot be reused once it has been closed, but my channel should not be closed: the server never receives an end-of-stream (EOF) result.

I have made some progress: the write operation was not happening in the server application because of a JSON parsing error. Now the SocketChannel in my client code becomes ready for another write operation after it has processed the read operation. I assume this is the nature of TCP SocketChannels. However, the SocketChannel on the server side does not become ready for another read operation. Is this normal behavior for SocketChannels? Do I need to close the client-side connection after a read operation and establish a new connection?

Here is a code sample of what I am trying to do:

    package org.stream.socket;

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.nio.ByteBuffer;
    import java.nio.CharBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.nio.charset.CharsetDecoder;
    import java.nio.charset.CodingErrorAction;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.UUID;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;

    import org.apache.commons.lang3.RandomStringUtils;

    import com.google.gson.Gson;
    import com.google.gson.JsonElement;
    import com.google.gson.JsonObject;
    import com.google.gson.JsonParser;
    import com.google.gson.JsonPrimitive;
    import com.google.gson.stream.JsonToken;

    public class ClientServerTest {

        private LinkedBlockingQueue<byte[]> dataQueue = new LinkedBlockingQueue<byte[]>();
        private ExecutorService executor = Executors.newFixedThreadPool(1);
        private HashMap<String, Integer> uuidToSize = new HashMap<String, Integer>();

        private class StreamWriteTask implements Runnable {
            private ByteBuffer buffer;
            private SelectionKey key;
            private Selector selector;

            private StreamWriteTask(ByteBuffer buffer, SelectionKey key, Selector selector) {
                this.buffer = buffer;
                this.key = key;
                this.selector = selector;
            }

            @Override
            public void run() {
                SocketChannel sc = (SocketChannel) key.channel();
                byte[] data = (byte[]) key.attachment();
                buffer.clear();
                buffer.put(data);
                buffer.flip();
                int results = 0;
                while (buffer.hasRemaining()) {
                    try {
                        results = sc.write(buffer);
                    } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    if (results == 0) {
                        buffer.compact();
                        buffer.flip();
                        data = new byte[buffer.remaining()];
                        buffer.get(data);
                        key.interestOps(SelectionKey.OP_WRITE);
                        key.attach(data);
                        selector.wakeup();
                        return;
                    }
                }
                key.interestOps(SelectionKey.OP_READ);
                key.attach(null);
                selector.wakeup();
            }
        }

        private class StreamReadTask implements Runnable {
            private ByteBuffer buffer;
            private SelectionKey key;
            private Selector selector;

            private StreamReadTask(ByteBuffer buffer, SelectionKey key, Selector selector) {
                this.buffer = buffer;
                this.key = key;
                this.selector = selector;
            }

            private boolean checkUUID(byte[] data) {
                return uuidToSize.containsKey(new String(data));
            }

            @Override
            public void run() {
                SocketChannel sc = (SocketChannel) key.channel();
                buffer.clear();
                byte[] data = (byte[]) key.attachment();
                if (data != null) {
                    buffer.put(data);
                }
                int count = 0;
                int readAttempts = 0;
                try {
                    while ((count = sc.read(buffer)) > 0) {
                        readAttempts++;
                    }
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                if (count == 0) {
                    buffer.flip();
                    data = new byte[buffer.limit()];
                    buffer.get(data);
                    if (checkUUID(data)) {
                        key.interestOps(SelectionKey.OP_READ);
                        key.attach(data);
                    } else {
                        System.out.println("Clinet Read - uuid ~~~~ " + new String(data));
                        key.interestOps(SelectionKey.OP_WRITE);
                        key.attach(null);
                    }
                }
                if (count == -1) {
                    try {
                        sc.close();
                    } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
                selector.wakeup();
            }
        }

        private class ClientWorker implements Runnable {
            @Override
            public void run() {
                try {
                    Selector selector = Selector.open();
                    SocketChannel sc = SocketChannel.open();
                    sc.configureBlocking(false);
                    sc.connect(new InetSocketAddress("127.0.0.1", 9001));
                    sc.register(selector, SelectionKey.OP_CONNECT);
                    ByteBuffer buffer = ByteBuffer.allocateDirect(65535);
                    while (selector.isOpen()) {
                        int count = selector.select(10);
                        if (count == 0) {
                            continue;
                        }
                        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                        while (it.hasNext()) {
                            final SelectionKey key = it.next();
                            it.remove();
                            if (!key.isValid()) {
                                continue;
                            }
                            if (key.isConnectable()) {
                                sc = (SocketChannel) key.channel();
                                if (!sc.finishConnect()) {
                                    continue;
                                }
                                sc.register(selector, SelectionKey.OP_WRITE);
                            }
                            if (key.isReadable()) {
                                key.interestOps(0);
                                executor.execute(new StreamReadTask(buffer, key, selector));
                            }
                            if (key.isWritable()) {
                                key.interestOps(0);
                                if (key.attachment() == null) {
                                    key.attach(dataQueue.take());
                                }
                                executor.execute(new StreamWriteTask(buffer, key, selector));
                            }
                        }
                    }
                } catch (IOException ex) {
                    // Handle Exception
                } catch (InterruptedException ex) {
                }
            }
        }

        private class ServerWorker implements Runnable {
            @Override
            public void run() {
                try {
                    Selector selector = Selector.open();
                    ServerSocketChannel ssc = ServerSocketChannel.open();
                    ServerSocket socket = ssc.socket();
                    socket.bind(new InetSocketAddress(9001));
                    ssc.configureBlocking(false);
                    ssc.register(selector, SelectionKey.OP_ACCEPT);
                    ByteBuffer buffer = ByteBuffer.allocateDirect(65535);
                    DataHandler handler = new DataHandler();
                    while (selector.isOpen()) {
                        int count = selector.select(10);
                        if (count == 0) {
                            continue;
                        }
                        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                        while (it.hasNext()) {
                            final SelectionKey key = it.next();
                            it.remove();
                            if (!key.isValid()) {
                                continue;
                            }
                            if (key.isAcceptable()) {
                                ssc = (ServerSocketChannel) key.channel();
                                SocketChannel sc = ssc.accept();
                                sc.configureBlocking(false);
                                sc.register(selector, SelectionKey.OP_READ);
                            }
                            if (key.isReadable()) {
                                handler.readSocket(buffer, key);
                            }
                            if (key.isWritable()) {
                                handler.writeToSocket(buffer, key);
                            }
                        }
                    }
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }

        private class DataHandler {

            private JsonObject parseData(StringBuilder builder) {
                if (!builder.toString().endsWith("}")) {
                    return null;
                }
                JsonParser parser = new JsonParser();
                JsonObject obj = (JsonObject) parser.parse(builder.toString());
                return obj;
            }

            private void readSocket(ByteBuffer buffer, SelectionKey key) throws IOException {
                SocketChannel sc = (SocketChannel) key.channel();
                buffer.clear();
                int count = Integer.MAX_VALUE;
                int readAttempts = 0;
                try {
                    while ((count = sc.read(buffer)) > 0) {
                        readAttempts++;
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                if (count == 0) {
                    buffer.flip();
                    StringBuilder builder = key.attachment() instanceof StringBuilder
                            ? (StringBuilder) key.attachment()
                            : new StringBuilder();
                    Charset charset = Charset.forName("UTF-8");
                    CharsetDecoder decoder = charset.newDecoder();
                    decoder.onMalformedInput(CodingErrorAction.IGNORE);
                    System.out.println(buffer);
                    CharBuffer charBuffer = decoder.decode(buffer);
                    String content = charBuffer.toString();
                    charBuffer = null;
                    builder.append(content);
                    System.out.println(content);
                    JsonObject obj = parseData(builder);
                    if (obj == null) {
                        key.attach(builder);
                        key.interestOps(SelectionKey.OP_READ);
                    } else {
                        System.out.println("data ~~~~~~~ " + builder.toString());
                        JsonPrimitive uuid = obj.get("uuid").getAsJsonPrimitive();
                        key.attach(uuid.toString().getBytes());
                        key.interestOps(SelectionKey.OP_WRITE);
                    }
                }
                if (count == -1) {
                    key.attach(null);
                    sc.close();
                }
            }

            private void writeToSocket(ByteBuffer buffer, SelectionKey key) throws IOException {
                SocketChannel sc = (SocketChannel) key.channel();
                byte[] data = (byte[]) key.attachment();
                buffer.clear();
                buffer.put(data);
                buffer.flip();
                int writeAttempts = 0;
                while (buffer.hasRemaining()) {
                    int results = sc.write(buffer);
                    writeAttempts++;
                    System.out.println("Write Attempt #" + writeAttempts);
                    if (results == 0) {
                        buffer.compact();
                        buffer.flip();
                        data = new byte[buffer.remaining()];
                        buffer.get(data);
                        key.attach(data);
                        key.interestOps(SelectionKey.OP_WRITE);
                        break;
                    }
                }
                key.interestOps(SelectionKey.OP_READ);
                key.attach(null);
            }
        }

        public ClientServerTest() {
            for (int index = 0; index < 1000; index++) {
                JsonObject obj = new JsonObject();
                String uuid = UUID.randomUUID().toString();
                uuidToSize.put(uuid, uuid.length());
                obj.addProperty("uuid", uuid);
                String data = RandomStringUtils.randomAlphanumeric(10000);
                obj.addProperty("event", data);
                dataQueue.add(obj.toString().getBytes());
            }
            Thread serverWorker = new Thread(new ServerWorker());
            serverWorker.start();
            Thread clientWorker = new Thread(new ClientWorker());
            clientWorker.start();
        }

        /**
         * @param args
         */
        public static void main(String[] args) {
            ClientServerTest test = new ClientServerTest();
            for (;;) {
            }
        }
    }

1 answer
  • The correct way to handle OP_CONNECT is to attempt finishConnect() once and, if it succeeds, deregister OP_CONNECT and register OP_READ or OP_WRITE, probably the latter since you are a client. Looping and sleeping in non-blocking mode makes no sense. If finishConnect() returns false, OP_CONNECT will fire again.

  • Your handling of !key.isAcceptable(), !key.isReadable() and !key.isWritable() makes absolutely zero sense. If the key is acceptable, call accept(). If it is readable, call read(). If it is writable, call write(). It is as simple as that.

  • You need to be aware that channels are almost always writable, except for brief periods when their socket send buffer is full. So register for OP_WRITE only when you have something to write, or better still only after you have attempted a write and got a zero return; then, when OP_WRITE fires, retry the write and deregister OP_WRITE unless you got another zero.

  • You are being far too economical with your ByteBuffer. In practice you need one per channel. You can store it as the key attachment so that you can get it back when you need it. Otherwise you have no way of accumulating partial reads, which are certain to happen, and no way of retrying writes either.
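
The points above can be put together in a minimal single-threaded sketch. It is not a fix of the code in the question, only an illustration under stated assumptions: the class SelectorSketch and the methods roundTrip, handleClient and echo are hypothetical names, the server is an in-process echo server on a loopback port chosen by the OS, and there is no JSON handling. It tries finishConnect() once, keeps one ByteBuffer per accepted channel as the key attachment, and leaves OP_WRITE registered only while a write has left bytes pending.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/** Hypothetical loopback echo round trip; every name here is illustrative. */
class SelectorSketch {

    /** Sends message to an in-process echo server and returns what comes back. */
    static String roundTrip(String message) throws IOException {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel client = SocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            ByteBuffer out = ByteBuffer.wrap(payload);            // client's pending output
            ByteBuffer in = ByteBuffer.allocate(payload.length);  // client's accumulated input
            client.configureBlocking(false);
            boolean connected = client.connect(server.getLocalAddress());
            SelectionKey clientKey = client.register(selector,
                    connected ? SelectionKey.OP_WRITE : SelectionKey.OP_CONNECT);

            long deadline = System.nanoTime() + 10_000_000_000L;  // safety net for the sketch
            while (in.hasRemaining() && System.nanoTime() < deadline) {
                selector.select(100);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        SocketChannel peer = server.accept();
                        if (peer != null) {
                            peer.configureBlocking(false);
                            // One buffer per channel, stored as the key attachment.
                            peer.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(4096));
                        }
                    } else if (key == clientKey) {
                        handleClient(key, out, in);
                    } else {
                        echo(key);
                    }
                }
            }
            for (SelectionKey key : selector.keys()) {
                key.channel().close();  // also closes the accepted server-side channel
            }
            if (in.hasRemaining()) {
                throw new IOException("timed out waiting for the echo");
            }
            return new String(in.array(), StandardCharsets.UTF_8);
        }
    }

    private static void handleClient(SelectionKey key, ByteBuffer out, ByteBuffer in) throws IOException {
        SocketChannel ch = (SocketChannel) key.channel();
        // Try finishConnect() once; if it returns false, OP_CONNECT simply fires again.
        if (key.isConnectable() && !ch.finishConnect()) {
            return;
        }
        if (key.isReadable() && ch.read(in) < 0) {
            throw new IOException("server closed the connection early");
        }
        ch.write(out);  // attempt the write first; it may be partial or write nothing
        // Keep OP_WRITE only while output is still pending.
        key.interestOps(out.hasRemaining()
                ? SelectionKey.OP_READ | SelectionKey.OP_WRITE
                : SelectionKey.OP_READ);
    }

    private static void echo(SelectionKey key) throws IOException {
        SocketChannel ch = (SocketChannel) key.channel();
        ByteBuffer buf = (ByteBuffer) key.attachment();  // this channel's own buffer
        if (key.isReadable() && ch.read(buf) < 0) {
            ch.close();  // peer sent EOF
            return;
        }
        buf.flip();
        ch.write(buf);  // echo back whatever has accumulated
        // Switch to OP_WRITE only if the write left bytes behind; otherwise go back to reading.
        key.interestOps(buf.hasRemaining() ? SelectionKey.OP_WRITE : SelectionKey.OP_READ);
        buf.compact();  // keep any unwritten bytes for the retry
    }
}
```

Calling SelectorSketch.roundTrip("ping") is expected to return the same text once the echo has been read back in full. Note that neither side closes and reopens the connection between messages: the same registered keys keep firing as long as the interest set matches what each side is waiting for.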


Source: https://habr.com/ru/post/916430/

