How can I efficiently pack a header and data into a single byte array using ByteBuffer?

I have a header and data that I need to represent in a single byte array. There is a specific format for packing the header into a byte array, and another format for packing the data into a byte array. Once I have those two, I need to combine them into one final byte array.

Below is the layout, which is defined in C++; accordingly, I have to reproduce it in Java.

    // below is my header offsets layout

    // addressedCenter must be the first byte
    static constexpr uint32_t addressedCenter = 0;
    static constexpr uint32_t version = addressedCenter + 1;
    static constexpr uint32_t numberOfRecords = version + 1;
    static constexpr uint32_t bufferUsed = numberOfRecords + sizeof(uint32_t);
    static constexpr uint32_t location = bufferUsed + sizeof(uint32_t);
    static constexpr uint32_t locationFrom = location + sizeof(CustomerAddress);
    static constexpr uint32_t locationOrigin = locationFrom + sizeof(CustomerAddress);
    static constexpr uint32_t partition = locationOrigin + sizeof(CustomerAddress);
    static constexpr uint32_t copy = partition + 1;

    // this is the full size of the header
    static constexpr uint32_t headerOffset = copy + 1;

And CustomerAddress is a typedef for uint64_t, and it is structured like this:

    typedef uint64_t CustomerAddress;

    void client_data(uint8_t datacenter, uint16_t clientId, uint8_t dataId,
                     uint32_t dataCounter, CustomerAddress& customerAddress)
    {
        customerAddress = (uint64_t(datacenter) << 56) + (uint64_t(clientId) << 40)
                        + (uint64_t(dataId) << 32) + dataCounter;
    }

And below is my data layout -

    // below is my data layout -
    //
    // key type  - 1 byte
    // key len   - 1 byte
    // key       (variable size = key_len)
    // timestamp (sizeof uint64_t)
    // data size (sizeof uint16_t)
    // data      (variable size = data size)

Problem:

Now, for this part of the project, I'm trying to encapsulate all of this in one Java class, so that I can simply pass in the necessary fields and it will produce the final byte array for me, with the header first and then the data:

Below is my DataFrame class:

    public final class DataFrame {
      private final byte addressedCenter;
      private final byte version;
      private final Map<byte[], byte[]> keyDataHolder;
      private final long location;
      private final long locationFrom;
      private final long locationOrigin;
      private final byte partition;
      private final byte copy;

      public DataFrame(byte addressedCenter, byte version, Map<byte[], byte[]> keyDataHolder,
          long location, long locationFrom, long locationOrigin, byte partition, byte copy) {
        this.addressedCenter = addressedCenter;
        this.version = version;
        this.keyDataHolder = keyDataHolder;
        this.location = location;
        this.locationFrom = locationFrom;
        this.locationOrigin = locationOrigin;
        this.partition = partition;
        this.copy = copy;
      }

      public byte[] serialize() {
        // All of the data is embedded in a binary array with fixed maximum size 70000
        ByteBuffer byteBuffer = ByteBuffer.allocate(70000);
        byteBuffer.order(ByteOrder.BIG_ENDIAN);

        int numOfRecords = keyDataHolder.size();
        int bufferUsed = getBufferUsed(keyDataHolder); // 36 + dataSize + 1 + 1 + keyLength + 8 + 2;

        // header layout
        byteBuffer.put(addressedCenter); // byte
        byteBuffer.put(version); // byte
        byteBuffer.putInt(numOfRecords); // int
        byteBuffer.putInt(bufferUsed); // int
        byteBuffer.putLong(location); // long
        byteBuffer.putLong(locationFrom); // long
        byteBuffer.putLong(locationOrigin); // long
        byteBuffer.put(partition); // byte
        byteBuffer.put(copy); // byte

        // now the data layout
        for (Map.Entry<byte[], byte[]> entry : keyDataHolder.entrySet()) {
          byte keyType = 0;
          byte keyLength = (byte) entry.getKey().length;
          byte[] key = entry.getKey();
          byte[] data = entry.getValue();
          short dataSize = (short) data.length;

          ByteBuffer dataBuffer = ByteBuffer.wrap(data);
          long timestamp = 0;
          if (dataSize > 10) {
            timestamp = dataBuffer.getLong(2);
          }

          byteBuffer.put(keyType);
          byteBuffer.put(keyLength);
          byteBuffer.put(key);
          byteBuffer.putLong(timestamp);
          byteBuffer.putShort(dataSize);
          byteBuffer.put(data);
        }
        return byteBuffer.array();
      }

      private int getBufferUsed(final Map<byte[], byte[]> keyDataHolder) {
        int size = 36;
        for (Map.Entry<byte[], byte[]> entry : keyDataHolder.entrySet()) {
          size += 1 + 1 + 8 + 2;
          size += entry.getKey().length;
          size += entry.getValue().length;
        }
        return size;
      }
    }

And the following shows how I use my DataFrame class:

    public static void main(String[] args) throws IOException {
      // header layout
      byte addressedCenter = 0;
      byte version = 1;

      long location = packCustomerAddress((byte) 12, (short) 13, (byte) 32, (int) 120);
      long locationFrom = packCustomerAddress((byte) 21, (short) 23, (byte) 41, (int) 130);
      long locationOrigin = packCustomerAddress((byte) 21, (short) 24, (byte) 41, (int) 140);
      byte partition = 3;
      byte copy = 0;

      // this map will have key as the actual key and value as the actual data, both in byte array
      // for now I am storing only two entries in this map
      Map<byte[], byte[]> keyDataHolder = new HashMap<byte[], byte[]>();
      for (int i = 1; i <= 2; i++) {
        keyDataHolder.put(generateKey(), getMyData());
      }

      DataFrame records = new DataFrame(addressedCenter, version, keyDataHolder, location,
          locationFrom, locationOrigin, partition, copy);

      // this will give me final packed byte array
      // which will have header and data in it.
      byte[] packedArray = records.serialize();
    }

    private static long packCustomerAddress(byte datacenter, short clientId, byte dataId,
        int dataCounter) {
      return ((long) (datacenter) << 56) | ((long) clientId << 40) | ((long) dataId << 32)
          | ((long) dataCounter);
    }

As you can see in my DataFrame class, I allocate a ByteBuffer with a predefined size of 70000. Is there a better way to determine the size I should use when allocating the ByteBuffer, instead of using a hard-coded 70000?

Also, is there a better way compared to what I'm doing, which packs my header and data into a single byte array? I also have to make sure it is thread safe, as it can be called by multiple threads.

2 answers

Is there a better way to determine the size I should use when allocating the ByteBuffer, instead of using a hard-coded 70000?

There are at least two approaches, and they are not mutually exclusive. You can use both.

One of them is a buffer pool. You need to find out how many buffers you need during peak periods and size the pool somewhat above that, e.g. max + max/2, max + average, max + mode, or 2 * max.

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.util.concurrent.CompletionStage;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.function.Consumer;
    import java.util.function.Function;

    public class ByteBufferPool {
        private final int bufferCapacity;
        private final LinkedBlockingDeque<ByteBuffer> queue;

        public ByteBufferPool(int limit, int bufferCapacity) {
            if (limit < 0) throw new IllegalArgumentException("limit must not be negative.");
            if (bufferCapacity < 0) throw new IllegalArgumentException("bufferCapacity must not be negative.");

            this.bufferCapacity = bufferCapacity;
            this.queue = (limit == 0) ? null : new LinkedBlockingDeque<>(limit);
        }

        public ByteBuffer acquire() {
            ByteBuffer buffer = (queue == null) ? null : queue.pollFirst();
            if (buffer == null) {
                buffer = ByteBuffer.allocate(bufferCapacity);
            } else {
                buffer.clear();
                buffer.order(ByteOrder.BIG_ENDIAN);
            }
            return buffer;
        }

        public boolean release(ByteBuffer buffer) {
            if (buffer == null) throw new IllegalArgumentException("buffer must not be null.");
            if (buffer.capacity() != bufferCapacity) throw new IllegalArgumentException("buffer has unsupported capacity.");
            if (buffer.isDirect()) throw new IllegalArgumentException("buffer must not be direct.");
            if (buffer.isReadOnly()) throw new IllegalArgumentException("buffer must not be read-only.");

            return (queue == null) ? false : queue.offerFirst(buffer);
        }

        public void withBuffer(Consumer<ByteBuffer> action) {
            if (action == null) throw new IllegalArgumentException("action must not be null.");

            ByteBuffer buffer = acquire();
            try {
                action.accept(buffer);
            } finally {
                release(buffer);
            }
        }

        public <T> T withBuffer(Function<ByteBuffer, T> function) {
            if (function == null) throw new IllegalArgumentException("function must not be null.");

            ByteBuffer buffer = acquire();
            try {
                return function.apply(buffer);
            } finally {
                release(buffer);
            }
        }

        public <T> CompletionStage<T> withBufferAsync(Function<ByteBuffer, CompletionStage<T>> asyncFunction) {
            if (asyncFunction == null) throw new IllegalArgumentException("asyncFunction must not be null.");

            ByteBuffer buffer = acquire();
            CompletionStage<T> future = null;
            try {
                future = asyncFunction.apply(buffer);
            } finally {
                if (future == null) {
                    release(buffer);
                } else {
                    future = future.whenComplete((result, throwable) -> release(buffer));
                }
            }
            return future;
        }
    }

The withBuffer methods allow direct use of the pool, while acquire and release let you separate the points where a buffer is acquired and released.
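
For illustration, a minimal usage sketch of the pool above could look like this (the pool limit of 8 and the capacity of 70000 are placeholder values, not recommendations):

    import java.nio.ByteBuffer;

    public class ByteBufferPoolUsageExample {
        public static void main(String[] args) {
            // Placeholder pool parameters; tune them to your measured peak load.
            ByteBufferPool pool = new ByteBufferPool(8, 70000);

            // Scoped use: acquire, run the action, release -- all handled by withBuffer.
            byte[] packed = pool.withBuffer((ByteBuffer buffer) -> {
                buffer.put((byte) 1);                     // header and record writes would go here
                byte[] out = new byte[buffer.position()]; // copy out only what was written
                buffer.flip();
                buffer.get(out);
                return out;
            });
            System.out.println("packed " + packed.length + " byte(s)");

            // Split use: acquire here, release elsewhere (e.g. when asynchronous I/O completes).
            ByteBuffer buffer = pool.acquire();
            try {
                buffer.putLong(42L);
            } finally {
                pool.release(buffer);
            }
        }
    }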

The other is to extract a serialization interface, e.g. with put, putInt and putLong, and implement both a byte-counting class and an actual byte-buffering class. You should add a method to such an interface to tell whether the serializer is counting bytes or actually buffering them, so that you can avoid generating bytes unnecessarily, and another method to advance the byte count directly, which is useful e.g. for computing the size of a string in some encoding without actually serializing it.

    public interface ByteSerializer {
        ByteSerializer put(byte value);

        ByteSerializer putInt(int value);

        ByteSerializer putLong(long value);

        boolean isSerializing();

        ByteSerializer add(int bytes);

        int position();
    }

    public class ByteCountSerializer implements ByteSerializer {
        private int count = 0;

        @Override
        public ByteSerializer put(byte value) {
            count += 1;
            return this;
        }

        @Override
        public ByteSerializer putInt(int value) {
            count += 4;
            return this;
        }

        @Override
        public ByteSerializer putLong(long value) {
            count += 8;
            return this;
        }

        @Override
        public boolean isSerializing() {
            return false;
        }

        @Override
        public ByteSerializer add(int bytes) {
            if (bytes < 0) throw new IllegalArgumentException("bytes must not be negative.");

            count += bytes;
            return this;
        }

        @Override
        public int position() {
            return count;
        }
    }

    import java.nio.ByteBuffer;

    public class ByteBufferSerializer implements ByteSerializer {
        private final ByteBuffer buffer;

        public ByteBufferSerializer(int bufferCapacity) {
            if (bufferCapacity < 0) throw new IllegalArgumentException("bufferCapacity must not be negative.");

            this.buffer = ByteBuffer.allocate(bufferCapacity);
        }

        @Override
        public ByteSerializer put(byte value) {
            buffer.put(value);
            return this;
        }

        @Override
        public ByteSerializer putInt(int value) {
            buffer.putInt(value);
            return this;
        }

        @Override
        public ByteSerializer putLong(long value) {
            buffer.putLong(value);
            return this;
        }

        @Override
        public boolean isSerializing() {
            return true;
        }

        @Override
        public ByteSerializer add(int bytes) {
            if (bytes < 0) throw new IllegalArgumentException("bytes must not be negative.");

            for (int b = 0; b < bytes; b++) {
                buffer.put((byte) 0);
            }
            return this; // or throw new UnsupportedOperationException();
        }

        @Override
        public int position() {
            return buffer.position();
        }

        public ByteBuffer buffer() {
            return buffer;
        }
    }

In your code, you would then do something along these lines (not tested):

    ByteCountSerializer counter = new ByteCountSerializer();
    dataFrame.serialize(counter);

    ByteBufferSerializer serializer = new ByteBufferSerializer(counter.position());
    dataFrame.serialize(serializer);

    ByteBuffer buffer = serializer.buffer();
    // ... write buffer, ?, profit ...

Your DataFrame.serialize method would need to be refactored to accept a ByteSerializer, and wherever it generates data it should check isSerializing() to decide whether it only needs to compute the size or actually write the bytes.
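
As a rough sketch of what that refactoring might look like inside DataFrame (not tested; the putBytes helper and the two single-byte writes that stand in for a missing putShort are my own additions, not part of the interface above):

    public void serialize(ByteSerializer out) {
        // header layout
        out.put(addressedCenter)
           .put(version)
           .putInt(keyDataHolder.size())
           .putInt(getBufferUsed(keyDataHolder))
           .putLong(location)
           .putLong(locationFrom)
           .putLong(locationOrigin)
           .put(partition)
           .put(copy);

        // data layout
        for (Map.Entry<byte[], byte[]> entry : keyDataHolder.entrySet()) {
            byte[] key = entry.getKey();
            byte[] data = entry.getValue();
            short dataSize = (short) data.length;
            long timestamp = (dataSize > 10) ? ByteBuffer.wrap(data).getLong(2) : 0L;

            out.put((byte) 0)                  // key type
               .put((byte) key.length);        // key length
            putBytes(out, key);                // key (variable size)
            out.putLong(timestamp)             // timestamp
               .put((byte) (dataSize >>> 8))   // data size written as two big-endian bytes,
               .put((byte) dataSize);          //   since the sketch interface has no putShort
            putBytes(out, data);               // data (variable size)
        }
    }

    private static void putBytes(ByteSerializer out, byte[] bytes) {
        if (out.isSerializing()) {
            for (byte b : bytes) {
                out.put(b);                    // actually write the bytes
            }
        } else {
            out.add(bytes.length);             // only counting: advance the size directly
        }
    }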

I leave the combination of both approaches as an exercise, mainly because it depends on how you decide to do it.

For example, you could make ByteBufferSerializer use the pool directly while keeping a single fixed capacity (e.g. your 70000), you could pool ByteBuffers by capacity (but instead of the exact required capacity, use the lowest power of 2 greater than or equal to it, and set the buffer's limit before returning it from acquire), or you could pool the ByteBufferSerializer objects themselves, provided you add a reset() method.
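
As a small sketch of the power-of-two bucketing just mentioned (my own helper, not part of the classes above):

    // Rounds a required capacity up to the nearest power of two, for bucketing
    // pooled buffers by capacity. Intended for capacities up to 2^30.
    static int bucketCapacity(int requiredCapacity) {
        if (requiredCapacity < 0) throw new IllegalArgumentException("requiredCapacity must not be negative.");
        if (requiredCapacity <= 1) return 1;
        int highest = Integer.highestOneBit(requiredCapacity);
        return (highest == requiredCapacity) ? highest : highest << 1;
    }

    // A capacity-bucketed pool's acquire(requiredCapacity) would then fetch or allocate
    // a buffer of bucketCapacity(requiredCapacity) bytes and call buffer.clear() followed
    // by buffer.limit(requiredCapacity) before handing it out, as suggested above.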

Also, is there a better way compared to what I'm doing, which packs my header and data into a single byte array?

Yes. Pass a byte-buffering instance around instead of having methods return byte arrays that are discarded the moment their length or contents have been inspected.

I also have to make sure it is thread safe, as it can be called by multiple threads.

As long as each buffer is used by only one thread, with proper synchronization, you do not need to worry.

Proper synchronization means that your pool manager has acquire and release semantics in its methods, and that if a buffer is used by several threads between being taken from and returned to the pool, you add release semantics in the thread that stops using the buffer and acquire semantics in the thread that starts using it. For example, if you pass the buffer around through CompletableFutures you don't need to worry about this, nor if you hand it over between threads explicitly with an Exchanger or a proper implementation of BlockingQueue (see the sketch after the quoted list below).

From the java.util.concurrent package description:

The methods of all classes in java.util.concurrent and its subpackages extend these guarantees to higher-level synchronization. In particular:

  • Actions in a thread prior to placing an object into any concurrent collection happen-before actions subsequent to the access or removal of that element from the collection in another thread.

  • Actions in a thread prior to the submission of a Runnable to an Executor happen-before its execution begins. Similarly for Callables submitted to an ExecutorService.

  • Actions taken by the asynchronous computation represented by a Future happen-before actions subsequent to the retrieval of the result via Future.get() in another thread.

  • Actions prior to "releasing" synchronizer methods such as Lock.unlock, Semaphore.release, and CountDownLatch.countDown happen-before actions subsequent to a successful "acquiring" method such as Lock.lock, Semaphore.acquire, Condition.await, and CountDownLatch.await on the same synchronizer object in another thread.

  • For each pair of threads that successfully exchange objects via an Exchanger, actions prior to the exchange() in each thread happen-before those subsequent to the corresponding exchange() in the other thread.

  • Actions prior to calling CyclicBarrier.await and Phaser.awaitAdvance (as well as its variants) happen-before actions performed by the barrier action, and actions performed by the barrier action happen-before actions subsequent to a successful return from the corresponding await in other threads.
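
As a concrete illustration of the hand-off case mentioned above (a sketch using the ByteBufferPool from this answer; the sizes and values are placeholders), placing a filled buffer into a BlockingQueue in one thread and taking it in another is enough to establish the happens-before edge:

    import java.nio.ByteBuffer;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class BufferHandOffExample {
        public static void main(String[] args) throws InterruptedException {
            ByteBufferPool pool = new ByteBufferPool(4, 70000);        // placeholder sizes
            BlockingQueue<ByteBuffer> handOff = new ArrayBlockingQueue<>(16);

            Thread producer = new Thread(() -> {
                ByteBuffer buffer = pool.acquire();
                buffer.putLong(System.currentTimeMillis());            // fill the buffer
                buffer.flip();
                handOff.offer(buffer); // placing it in the queue happens-before the take() below
            });

            Thread consumer = new Thread(() -> {
                try {
                    ByteBuffer buffer = handOff.take();                 // acquire semantics
                    System.out.println("received " + buffer.getLong()); // safe to read here
                    pool.release(buffer);                               // return the buffer to the pool
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            producer.start();
            consumer.start();
            producer.join();
            consumer.join();
        }
    }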


Another way to do this would be via a DataOutputStream wrapped around a ByteArrayOutputStream, but you should concentrate performance tuning on the places that need it, and this is not one of them. Efficiency isn't a concern here; network I/O will dominate by orders of magnitude.

Another reason for using ByteArrayOutputStream is that you do not need to guess the size of the buffer in advance: it will grow as needed.

To ensure thread safety, use only local variables.
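
A minimal sketch of that approach, with only local variables and the fields reduced to a couple of header values for brevity (the remaining header fields and the per-record loop from the question would follow the same pattern):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class StreamSerializeSketch {
        static byte[] serialize(byte addressedCenter, byte version, long location) throws IOException {
            // Everything here is a local variable, so concurrent calls cannot interfere.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream(); // grows as needed
            DataOutputStream out = new DataOutputStream(bytes);        // writes values big-endian

            out.writeByte(addressedCenter);
            out.writeByte(version);
            out.writeLong(location);
            // ... the remaining header fields and the per-record loop from the question go here ...

            out.flush();
            return bytes.toByteArray();
        }

        public static void main(String[] args) throws IOException {
            byte[] packed = serialize((byte) 0, (byte) 1, 42L);
            System.out.println("packed " + packed.length + " byte(s)"); // 1 + 1 + 8 = 10 bytes
        }
    }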


Source: https://habr.com/ru/post/1262902/

