The correct solution is probably to use NIO in some way. I already commented on how Hadoop did it here using nio underneath .
But a simpler solution is in the Dexter answer . I also came across a response from EJP that suggests using BufferedOutputStream to control when data is output. So I combined these two to come to the TimedOutputStream shown below. It does not give full control over the buffering of output to the remote (most of them are performed by the OS), but combining the appropriate buffer size and write timeout provides at least some control (see the second program for testing TimedOutputStream ).
I have not fully tested TimedOutputStream , so do your own due diligence.
Edit: An updated recording method to better correlate between buffer size and write latency, as well as a modified test program. Comments about unsafe asynchronous closing of the output stream of sockets are added.
import java.io.*; import java.util.concurrent.*; public class TimedOutputStream extends FilterOutputStream { protected int timeoutMs = 50_000; protected final boolean closeExecutor; protected final ScheduledExecutorService executor; protected ScheduledFuture<?> timeoutTask; protected volatile boolean writeTimedout; protected volatile IOException writeTimeoutCloseException; public void setTimeoutMs(int timeoutMs) { this.timeoutMs = timeoutMs; } public int getTimeoutMs() { return timeoutMs; } public boolean isWriteTimeout() { return writeTimedout; } public IOException getWriteTimeoutCloseException() { return writeTimeoutCloseException; } public ScheduledExecutorService getScheduledExecutor() { return executor; } @Override public void close() throws IOException { try { super.close();
And the test program:
import java.io.*; import java.net.*; import java.util.concurrent.*; public class TestTimedSocketOut implements Runnable, Closeable { public static void main(String[] args) { TestTimedSocketOut m = new TestTimedSocketOut(); try { m.run(); } finally { m.close(); } } final int clients = 3; // 2 is minimum, client 1 is expected to fail. final int timeOut = 1000; final int bufSize = 4096; final long maxWait = 5000L; // need a large array to write, else the OS just buffers everything and makes it work byte[] largeMsg = new byte[28_602]; final ThreadPoolExecutor tp = (ThreadPoolExecutor) Executors.newCachedThreadPool(); final ScheduledThreadPoolExecutor stp = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1); final ConcurrentLinkedQueue<Closeable> closeables = new ConcurrentLinkedQueue<Closeable>(); final CountDownLatch[] serversReady = new CountDownLatch[clients]; final CountDownLatch clientsDone = new CountDownLatch(clients); final CountDownLatch serversDone = new CountDownLatch(clients); ServerSocket ss; int port; @Override public void run() { stp.setRemoveOnCancelPolicy(true); try { ss = new ServerSocket(); ss.bind(null); port = ss.getLocalPort(); tp.execute(new SocketAccept()); for (int i = 0; i < clients; i++) { serversReady[i] = new CountDownLatch(1); ClientSideSocket css = new ClientSideSocket(i); closeables.add(css); tp.execute(css); // need sleep to ensure client 0 connects first. Thread.sleep(50L); } if (!clientsDone.await(maxWait, TimeUnit.MILLISECONDS)) { println("CLIENTS DID NOT FINISH"); } else { if (!serversDone.await(maxWait, TimeUnit.MILLISECONDS)) { println("SERVERS DID NOT FINISH"); } else { println("Finished"); } } } catch (Exception e) { e.printStackTrace(); } } @Override public void close() { try { if (ss != null) ss.close(); } catch (Exception ignored) {} Closeable c = null; while ((c = closeables.poll()) != null) { try { c.close(); } catch (Exception ignored) {} } tp.shutdownNow(); println("Scheduled tasks executed: " + stp.getTaskCount() + ", max. threads: " + stp.getLargestPoolSize()); stp.shutdownNow(); } class SocketAccept implements Runnable { @Override public void run() { try { for (int i = 0; i < clients; i++) { SeverSideSocket sss = new SeverSideSocket(ss.accept(), i); closeables.add(sss); tp.execute(sss); } } catch (Exception e) { e.printStackTrace(); } } } class SeverSideSocket implements Runnable, Closeable { Socket s; int number, cnumber; boolean completed; public SeverSideSocket(Socket s, int number) { this.s = s; this.number = number; cnumber = -1; } @Override public void run() { String t = "nothing"; try { DataInputStream in = new DataInputStream(s.getInputStream()); DataOutputStream out = new DataOutputStream(s.getOutputStream()); serversReady[number].countDown(); Thread.sleep(timeOut); t = in.readUTF(); in.readFully(new byte[largeMsg.length], 0, largeMsg.length); t += in.readUTF(); out.writeByte(1); out.flush(); cnumber = in.readInt(); completed = true; } catch (Exception e) { println("server side " + number + " stopped after " + e); // e.printStackTrace(); } finally { println("server side " + number + " received: " + t); if (completed && cnumber != number) { println("server side " + number + " expected client number " + number + " but got " + cnumber); } close(); serversDone.countDown(); } } @Override public void close() { TestTimedSocketOut.close(s); s = null; } } class ClientSideSocket implements Runnable, Closeable { Socket s; int number; public ClientSideSocket(int number) { this.number = number; } @SuppressWarnings("resource") @Override public void run() { Byte b = -1; TimedOutputStream tout = null; try { s = new Socket(); s.connect(new InetSocketAddress(port)); DataInputStream in = new DataInputStream(s.getInputStream()); tout = new TimedOutputStream(s.getOutputStream(), bufSize, stp); if (number == 1) { // expect fail tout.setTimeoutMs(timeOut / 2); } else { // expect all OK tout.setTimeoutMs(timeOut * 2); } DataOutputStream out = new DataOutputStream(tout); if (!serversReady[number].await(maxWait, TimeUnit.MILLISECONDS)) { throw new RuntimeException("Server side for client side " + number + " not ready."); } out.writeUTF("client side " + number + " starting transfer"); out.write(largeMsg); out.writeUTF(" - client side " + number + " completed transfer"); out.flush(); b = in.readByte(); out.writeInt(number); out.flush(); } catch (Exception e) { println("client side " + number + " stopped after " + e); // e.printStackTrace(); } finally { println("client side " + number + " result: " + b); if (tout != null) { if (tout.isWriteTimeout()) { println("client side " + number + " had write timeout, close exception: " + tout.getWriteTimeoutCloseException()); } else { println("client side " + number + " had no write timeout"); } } close(); clientsDone.countDown(); } } @Override public void close() { TestTimedSocketOut.close(s); s = null; } } private static void close(Socket s) { try { if (s != null) s.close(); } catch (Exception ignored) {} } private static final long START_TIME = System.currentTimeMillis(); private static void println(String msg) { System.out.println((System.currentTimeMillis() - START_TIME) + "\t " + msg); } }
source share