Is there a better way by which I can allocate the size that I use for the ByteBuffer instead of using a hard-coded 70000?
There are at least two approaches, and they don't overlap, so you can use both.
One of them is a buffer pool. You need to find out how many buffers you need during peak periods and set the pool limit somewhat above that maximum, for example max + max / 2, max + average, max + mode, or 2 * max.
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Consumer;
import java.util.function.Function;

// A pool of same-capacity heap buffers; a limit of 0 disables pooling entirely.
public class ByteBufferPool {

    private final int bufferCapacity;
    private final LinkedBlockingDeque<ByteBuffer> queue;

    public ByteBufferPool(int limit, int bufferCapacity) {
        if (limit < 0)
            throw new IllegalArgumentException("limit must not be negative.");
        if (bufferCapacity < 0)
            throw new IllegalArgumentException("bufferCapacity must not be negative.");

        this.bufferCapacity = bufferCapacity;
        this.queue = (limit == 0) ? null : new LinkedBlockingDeque<>(limit);
    }

    // Returns a pooled buffer if one is available, otherwise allocates a new one.
    public ByteBuffer acquire() {
        ByteBuffer buffer = (queue == null) ? null : queue.pollFirst();
        if (buffer == null) {
            buffer = ByteBuffer.allocate(bufferCapacity);
        } else {
            buffer.clear();
            buffer.order(ByteOrder.BIG_ENDIAN);
        }
        return buffer;
    }

    // Returns the buffer to the pool; false means the pool was full (or disabled)
    // and the buffer is simply left for garbage collection.
    public boolean release(ByteBuffer buffer) {
        if (buffer == null)
            throw new IllegalArgumentException("buffer must not be null.");
        if (buffer.capacity() != bufferCapacity)
            throw new IllegalArgumentException("buffer has unsupported capacity.");
        if (buffer.isDirect())
            throw new IllegalArgumentException("buffer must not be direct.");
        if (buffer.isReadOnly())
            throw new IllegalArgumentException("buffer must not be read-only.");

        return (queue == null) ? false : queue.offerFirst(buffer);
    }

    public void withBuffer(Consumer<ByteBuffer> action) {
        if (action == null)
            throw new IllegalArgumentException("action must not be null.");

        ByteBuffer buffer = acquire();
        try {
            action.accept(buffer);
        } finally {
            release(buffer);
        }
    }

    public <T> T withBuffer(Function<ByteBuffer, T> function) {
        if (function == null)
            throw new IllegalArgumentException("function must not be null.");

        ByteBuffer buffer = acquire();
        try {
            return function.apply(buffer);
        } finally {
            release(buffer);
        }
    }

    // The buffer is released when the returned stage completes, or immediately
    // if the asynchronous function itself throws.
    public <T> CompletionStage<T> withBufferAsync(Function<ByteBuffer, CompletionStage<T>> asyncFunction) {
        if (asyncFunction == null)
            throw new IllegalArgumentException("asyncFunction must not be null.");

        ByteBuffer buffer = acquire();
        CompletionStage<T> future = null;
        try {
            future = asyncFunction.apply(buffer);
        } finally {
            if (future == null) {
                release(buffer);
            } else {
                future = future.whenComplete((result, throwable) -> release(buffer));
            }
        }
        return future;
    }
}
The withBuffer methods allow straightforward use of the pool, while acquire and release let you separate the acquisition and release points.
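For illustration, usage could look roughly like the sketch below; the limit (16) and capacity (70000) are placeholder values, and the lambda parameter type is written out explicitly so that the Consumer overload of withBuffer is selected:

import java.nio.ByteBuffer;

public class PoolUsageExample {

    public static void main(String[] args) {
        // Placeholder sizes, not recommendations.
        ByteBufferPool pool = new ByteBufferPool(16, 70000);

        // Scoped use: the buffer is acquired and released around the action.
        // The explicit parameter type picks the Consumer overload.
        pool.withBuffer((ByteBuffer buffer) -> {
            buffer.putInt(42);
            buffer.flip();
            // ... write the buffer to a channel, etc.
        });

        // Separate acquisition and release points, e.g. around an asynchronous send.
        ByteBuffer acquired = pool.acquire();
        try {
            acquired.putLong(System.nanoTime());
            acquired.flip();
            // ... hand the buffer off for writing ...
        } finally {
            pool.release(acquired);
        }
    }
}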
The other is to extract a serialization interface, e.g. put, putInt and putLong, which you can then implement both as a byte-counting class and as the actual byte-buffering class. You should add a method to such an interface so callers can find out whether the serializer is counting bytes or actually buffering them, to avoid generating bytes unnecessarily, and another method to bump the byte count directly, which is useful e.g. when computing the size of a string in some encoding without actually serializing it.
public interface ByteSerializer {
    ByteSerializer put(byte value);
    ByteSerializer putInt(int value);
    ByteSerializer putLong(long value);

    boolean isSerializing();
    ByteSerializer add(int bytes);
    int position();
}
public class ByteCountSerializer implements ByteSerializer {

    private int count = 0;

    @Override
    public ByteSerializer put(byte value) {
        count += 1;
        return this;
    }

    @Override
    public ByteSerializer putInt(int value) {
        count += 4;
        return this;
    }

    @Override
    public ByteSerializer putLong(long value) {
        count += 8;
        return this;
    }

    @Override
    public boolean isSerializing() {
        return false;
    }

    @Override
    public ByteSerializer add(int bytes) {
        if (bytes < 0)
            throw new IllegalArgumentException("bytes must not be negative.");

        count += bytes;
        return this;
    }

    @Override
    public int position() {
        return count;
    }
}
import java.nio.ByteBuffer;

public class ByteBufferSerializer implements ByteSerializer {

    private final ByteBuffer buffer;

    public ByteBufferSerializer(int bufferCapacity) {
        if (bufferCapacity < 0)
            throw new IllegalArgumentException("bufferCapacity must not be negative.");

        this.buffer = ByteBuffer.allocate(bufferCapacity);
    }

    @Override
    public ByteSerializer put(byte value) {
        buffer.put(value);
        return this;
    }

    @Override
    public ByteSerializer putInt(int value) {
        buffer.putInt(value);
        return this;
    }

    @Override
    public ByteSerializer putLong(long value) {
        buffer.putLong(value);
        return this;
    }

    @Override
    public boolean isSerializing() {
        return true;
    }

    @Override
    public ByteSerializer add(int bytes) {
        if (bytes < 0)
            throw new IllegalArgumentException("bytes must not be negative.");

        for (int b = 0; b < bytes; b++) {
            buffer.put((byte) 0);
        }
        return this;
    }

    @Override
    public int position() {
        return buffer.position();
    }

    // Accessor used in the usage example below to retrieve the filled buffer.
    public ByteBuffer buffer() {
        return buffer;
    }
}
In your code, you would do something along these lines (not tested):
ByteCountSerializer counter = new ByteCountSerializer();
dataFrame.serialize(counter);
ByteBufferSerializer serializer = new ByteBufferSerializer(counter.position());
dataFrame.serialize(serializer);
ByteBuffer buffer = serializer.buffer();
Your DataFrame.serialize method would have to be refactored to accept a ByteSerializer, and in the places where it generates data it should check isSerializing to decide whether it should only compute the size or actually write the bytes.
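For illustration only, a refactored serialize method might look something like the sketch below; the class and field names (ExampleFrame, opCode, payload) are invented and are not taken from your DataFrame:

// Hypothetical example; field names are made up and do not come from the original code.
public class ExampleFrame {

    private final byte opCode;
    private final byte[] payload;

    public ExampleFrame(byte opCode, byte[] payload) {
        this.opCode = opCode;
        this.payload = payload;
    }

    public void serialize(ByteSerializer out) {
        out.put(opCode)
           .putInt(payload.length);

        if (out.isSerializing()) {
            // Actually writing: emit the payload bytes.
            for (byte b : payload) {
                out.put(b);
            }
        } else {
            // Only counting: account for the payload size without generating the bytes.
            out.add(payload.length);
        }
    }
}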
I leave the combination of both approaches as an exercise, mainly because it depends on how you decide to do it.
For example, you can make ByteBufferSerializer use the pool directly and keep a single capacity (for example, your 70000), you can pool ByteBuffers by capacity (but instead of the exact required capacity, use the lowest power of 2 greater than the required capacity, and set the buffer's limit before returning it from acquire), or you can pool ByteBufferSerializers directly, adding a reset() method to them.
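As a rough sketch of the capacity-based pooling variant, rounding the requested size up to a power of two and setting the limit before handing the buffer out could look like this; the class and method names here are hypothetical and not part of the ByteBufferPool above:

import java.nio.ByteBuffer;

// Hypothetical helper, for illustration only.
public final class CapacityRounding {

    private CapacityRounding() {
    }

    // Smallest power of two that is >= requiredCapacity (valid for 1 <= requiredCapacity <= 2^30).
    public static int roundUpToPowerOfTwo(int requiredCapacity) {
        if (requiredCapacity <= 1) {
            return 1;
        }
        return Integer.highestOneBit(requiredCapacity - 1) << 1;
    }

    // A per-capacity pool would hold buffers of the rounded size; before returning
    // one from acquire, the limit is set so callers only see the size they asked for.
    public static ByteBuffer acquireWithLimit(int requiredCapacity) {
        int roundedCapacity = roundUpToPowerOfTwo(requiredCapacity);
        ByteBuffer buffer = ByteBuffer.allocate(roundedCapacity); // or taken from a pool keyed by roundedCapacity
        buffer.clear();
        buffer.limit(requiredCapacity);
        return buffer;
    }
}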
Also, is there a better way compared to what I'm doing, which packs my header and data into a single byte array?
Yes. Pass a byte-buffering instance around instead of having some methods return byte arrays that get discarded the moment their length or contents have been checked.
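To make that concrete, here is a hedged before/after sketch; the method and field names (packWithArrays, packWithSerializer, opCode, body) are invented for illustration:

// Hypothetical illustration only; names do not come from the original code.
public class HeaderAndBodyExample {

    private final byte opCode = 0x1;
    private final byte[] body = new byte[] { 1, 2, 3 };

    // Before: every part builds a temporary array that is copied and then discarded.
    public byte[] packWithArrays() {
        byte[] header = new byte[] { opCode, (byte) body.length };
        byte[] frame = new byte[header.length + body.length];
        System.arraycopy(header, 0, frame, 0, header.length);
        System.arraycopy(body, 0, frame, header.length, body.length);
        return frame;
    }

    // After: every part writes itself straight into the shared serializer,
    // so no intermediate arrays are created.
    public void packWithSerializer(ByteSerializer out) {
        out.put(opCode).put((byte) body.length);
        for (byte b : body) {
            out.put(b);
        }
    }
}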
I also need to make sure it is thread safe, as it can be called by multiple threads.
As long as each buffer is used by only one thread at a time, with proper synchronization, you don't have to worry.
Proper synchronization means that your pool manager provides acquire and release semantics in its methods, and that, if a buffer is used by several threads between being fetched from and returned to the pool, you add release semantics in the thread that stops using the buffer and acquire semantics in the thread that starts using it. For example, if you pass your buffers around through CompletableFutures, or hand them between threads explicitly using an Exchanger or a correct implementation of BlockingQueue, you don't have to worry about this.
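For example, a sketch of handing a pooled buffer to another thread through a BlockingQueue, which, as quoted below, already provides the needed release/acquire semantics (the pool and queue sizes are placeholders):

import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch only: the producer's writes to the buffer happen-before the consumer's reads
// because the hand-off goes through a BlockingQueue.
public class BufferHandOff {

    public static void main(String[] args) throws InterruptedException {
        ByteBufferPool pool = new ByteBufferPool(4, 70000); // placeholder sizes
        BlockingQueue<ByteBuffer> handOff = new ArrayBlockingQueue<>(4);

        Thread producer = new Thread(() -> {
            ByteBuffer buffer = pool.acquire();
            buffer.putInt(42);
            buffer.flip();
            handOff.offer(buffer); // release semantics happen here
        });

        Thread consumer = new Thread(() -> {
            try {
                ByteBuffer buffer = handOff.take(); // acquire semantics happen here
                System.out.println(buffer.getInt());
                pool.release(buffer); // return to the pool when done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}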
From the java.util.concurrent package description:
The methods of all classes in java.util.concurrent and its subpackages extend these guarantees to higher-level synchronization. In particular:
Actions in a thread prior to placing an object into any concurrent collection happen-before actions subsequent to the access or removal of that element from the collection in another thread.
Actions in a thread prior to the submission of a Runnable to an Executor happen-before its execution begins. Similarly for Callables submitted to an ExecutorService.
Actions taken by the asynchronous computation represented by a Future happen-before actions subsequent to the retrieval of the result via Future.get() in another thread.
Actions prior to "releasing" synchronizer methods such as Lock.unlock, Semaphore.release and CountDownLatch.countDown happen-before actions subsequent to a successful "acquiring" method such as Lock.lock, Semaphore.acquire, Condition.await and CountDownLatch.await on the same synchronizer object in another thread.
For each pair of threads that successfully exchange objects via an Exchanger, actions prior to the exchange() in each thread happen-before those subsequent to the corresponding exchange() in another thread.
Actions prior to calling CyclicBarrier.await and Phaser.awaitAdvance (as well as its variants) happen-before actions performed by the barrier action, and actions performed by the barrier action happen-before actions subsequent to a successful return from the corresponding await in other threads.
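Tying that back to the pool above, here is a sketch of the CompletableFuture case: the buffer is filled in one stage and read in a dependent stage, relying on the guarantees quoted above; the executor and sizes are placeholders:

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: placeholder sizes and executor. The writes made in the first stage
// are visible to the dependent stage, and the pool releases the buffer once the
// returned stage completes.
public class AsyncBufferExample {

    public static void main(String[] args) {
        ByteBufferPool pool = new ByteBufferPool(4, 70000);
        ExecutorService executor = Executors.newFixedThreadPool(2);

        CompletionStage<Integer> result = pool.withBufferAsync(buffer ->
                CompletableFuture
                        .runAsync(() -> {
                            buffer.putInt(42); // written on one executor thread
                            buffer.flip();
                        }, executor)
                        .thenApplyAsync(ignored -> buffer.getInt(), executor)); // read on another

        System.out.println("Read back: " + result.toCompletableFuture().join());

        executor.shutdown();
    }
}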