I am currently using java.nio.channel.Selectors and SocketChannels for an application that will open one-to-many connections to continue streaming to the server. I have three streams for my application: StreamWriteWorker - performs a write operation to SocketChannel, StreamReadWorker - reads bytes from the buffer and parsing contents, and StreamTaskDispatcher - selects a Selector for readyOps and sends new runnables for workflows.
Problem. When calling the selection method, only the value> 0 (valid readyOps) is returned on the first call; I can write and send data on all ready channels at a time, but all subsequent calls to the select method return 0.
Question: Do I need to activate SocketChannel after each read / write (I hope not!)? If this is not the case, what could be the reason that the SocketChannels are not available for any read / write operations?
I'm sorry that I canβt post the code, but I hope I have explained the problem clearly enough for someone to help. I was looking for answers, and I see that you cannot reuse the SocketChannel connection after closing it, but my channel should not be close, the server never gets the result of the EOF stream.
I made some progress and realized that the write operation did not occur in the server application due to a json parsing error. So, now my SocketChannel in the client application code becomes ready for another write operation after processing the read operation. I assume this is the nature of TCP SocketChannels. However, SocketChannel does not become available for another server-side read operation. Is this normal behavior for SocketChannels? Do I need to close the client-side connection after a read operation and establish a new connection?
Here is an example of the code I'm trying to do:
package org.stream.socket; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.nio.charset.CodingErrorAction; import java.util.HashMap; import java.util.Iterator; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.lang3.RandomStringUtils; import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonPrimitive; import com.google.gson.stream.JsonToken; public class ClientServerTest { private LinkedBlockingQueue<byte[]> dataQueue = new LinkedBlockingQueue<byte[]>(); private ExecutorService executor = Executors.newFixedThreadPool(1); private HashMap<String, Integer> uuidToSize = new HashMap<String, Integer>(); private class StreamWriteTask implements Runnable { private ByteBuffer buffer; private SelectionKey key; private Selector selector; private StreamWriteTask(ByteBuffer buffer, SelectionKey key, Selector selector) { this.buffer = buffer; this.key = key; this.selector = selector; } @Override public void run() { SocketChannel sc = (SocketChannel) key.channel(); byte[] data = (byte[]) key.attachment(); buffer.clear(); buffer.put(data); buffer.flip(); int results = 0; while (buffer.hasRemaining()) { try { results = sc.write(buffer); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } if (results == 0) { buffer.compact(); buffer.flip(); data = new byte[buffer.remaining()]; buffer.get(data); key.interestOps(SelectionKey.OP_WRITE); key.attach(data); selector.wakeup(); return; } } key.interestOps(SelectionKey.OP_READ); key.attach(null); selector.wakeup(); } } private class StreamReadTask implements Runnable { private ByteBuffer buffer; private SelectionKey key; private Selector selector; private StreamReadTask(ByteBuffer buffer, SelectionKey key, Selector selector) { this.buffer = buffer; this.key = key; this.selector = selector; } private boolean checkUUID(byte[] data) { return uuidToSize.containsKey(new String(data)); } @Override public void run() { SocketChannel sc = (SocketChannel) key.channel(); buffer.clear(); byte[] data = (byte[]) key.attachment(); if (data != null) { buffer.put(data); } int count = 0; int readAttempts = 0; try { while ((count = sc.read(buffer)) > 0) { readAttempts++; } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } if (count == 0) { buffer.flip(); data = new byte[buffer.limit()]; buffer.get(data); if (checkUUID(data)) { key.interestOps(SelectionKey.OP_READ); key.attach(data); } else { System.out.println("Clinet Read - uuid ~~~~ " + new String(data)); key.interestOps(SelectionKey.OP_WRITE); key.attach(null); } } if (count == -1) { try { sc.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } selector.wakeup(); } } private class ClientWorker implements Runnable { @Override public void run() { try { Selector selector = Selector.open(); SocketChannel sc = SocketChannel.open(); sc.configureBlocking(false); sc.connect(new InetSocketAddress("127.0.0.1", 9001)); sc.register(selector, SelectionKey.OP_CONNECT); ByteBuffer buffer = ByteBuffer.allocateDirect(65535); while (selector.isOpen()) { int count = selector.select(10); if (count == 0) { continue; } Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { final SelectionKey key = it.next(); it.remove(); if (!key.isValid()) { continue; } if (key.isConnectable()) { sc = (SocketChannel) key.channel(); if (!sc.finishConnect()) { continue; } sc.register(selector, SelectionKey.OP_WRITE); } if (key.isReadable()) { key.interestOps(0); executor.execute(new StreamReadTask(buffer, key, selector)); } if (key.isWritable()) { key.interestOps(0); if(key.attachment() == null){ key.attach(dataQueue.take()); } executor.execute(new StreamWriteTask(buffer, key, selector)); } } } } catch (IOException ex) { // Handle Exception }catch(InterruptedException ex){ } } } private class ServerWorker implements Runnable { @Override public void run() { try { Selector selector = Selector.open(); ServerSocketChannel ssc = ServerSocketChannel.open(); ServerSocket socket = ssc.socket(); socket.bind(new InetSocketAddress(9001)); ssc.configureBlocking(false); ssc.register(selector, SelectionKey.OP_ACCEPT); ByteBuffer buffer = ByteBuffer.allocateDirect(65535); DataHandler handler = new DataHandler(); while (selector.isOpen()) { int count = selector.select(10); if (count == 0) { continue; } Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { final SelectionKey key = it.next(); it.remove(); if (!key.isValid()) { continue; } if (key.isAcceptable()) { ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); } if (key.isReadable()) { handler.readSocket(buffer, key); } if (key.isWritable()) { handler.writeToSocket(buffer, key); } } } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } private class DataHandler { private JsonObject parseData(StringBuilder builder) { if (!builder.toString().endsWith("}")) { return null; } JsonParser parser = new JsonParser(); JsonObject obj = (JsonObject) parser.parse(builder.toString()); return obj; } private void readSocket(ByteBuffer buffer, SelectionKey key) throws IOException { SocketChannel sc = (SocketChannel) key.channel(); buffer.clear(); int count = Integer.MAX_VALUE; int readAttempts = 0; try { while ((count = sc.read(buffer)) > 0) { readAttempts++; } } catch (IOException e) { e.printStackTrace(); } if (count == 0) { buffer.flip(); StringBuilder builder = key.attachment() instanceof StringBuilder ? (StringBuilder) key .attachment() : new StringBuilder(); Charset charset = Charset.forName("UTF-8"); CharsetDecoder decoder = charset.newDecoder(); decoder.onMalformedInput(CodingErrorAction.IGNORE); System.out.println(buffer); CharBuffer charBuffer = decoder.decode(buffer); String content = charBuffer.toString(); charBuffer = null; builder.append(content); System.out.println(content); JsonObject obj = parseData(builder); if (obj == null) { key.attach(builder); key.interestOps(SelectionKey.OP_READ); } else { System.out.println("data ~~~~~~~ " + builder.toString()); JsonPrimitive uuid = obj.get("uuid").getAsJsonPrimitive(); key.attach(uuid.toString().getBytes()); key.interestOps(SelectionKey.OP_WRITE); } } if (count == -1) { key.attach(null); sc.close(); } } private void writeToSocket(ByteBuffer buffer, SelectionKey key) throws IOException { SocketChannel sc = (SocketChannel) key.channel(); byte[] data = (byte[]) key.attachment(); buffer.clear(); buffer.put(data); buffer.flip(); int writeAttempts = 0; while (buffer.hasRemaining()) { int results = sc.write(buffer); writeAttempts++; System.out.println("Write Attempt #" + writeAttempts); if (results == 0) { buffer.compact(); buffer.flip(); data = new byte[buffer.remaining()]; buffer.get(data); key.attach(data); key.interestOps(SelectionKey.OP_WRITE); break; } } key.interestOps(SelectionKey.OP_READ); key.attach(null); } } public ClientServerTest() { for (int index = 0; index < 1000; index++) { JsonObject obj = new JsonObject(); String uuid = UUID.randomUUID().toString(); uuidToSize.put(uuid, uuid.length()); obj.addProperty("uuid", uuid); String data = RandomStringUtils.randomAlphanumeric(10000); obj.addProperty("event", data); dataQueue.add(obj.toString().getBytes()); } Thread serverWorker = new Thread(new ServerWorker()); serverWorker.start(); Thread clientWorker = new Thread(new ClientWorker()); clientWorker.start(); } /** * @param args */ public static void main(String[] args) { ClientServerTest test = new ClientServerTest(); for(;;){ } } }