Sending a byte array of approximately fixed size each time to a different method

I have a method that takes a Partition enum parameter. This method will be called by several background threads (15 max) at the same time, each passing a different Partition value. Here, dataHoldersByPartition is an ImmutableMap of Partition to ConcurrentLinkedQueue<DataHolder>.

    private final ImmutableMap<Partition, ConcurrentLinkedQueue<DataHolder>> dataHoldersByPartition;

    //... some code to populate entry in `dataHoldersByPartition` map

    private void validateAndSend(final Partition partition) {
        ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
        Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
        int totalSize = 0;
        DataHolder dataHolder;
        while ((dataHolder = dataHolders.poll()) != null) {
            byte[] clientKeyBytes = dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8);
            if (clientKeyBytes.length > 255)
                continue;

            byte[] processBytes = dataHolder.getProcessBytes();
            int clientKeyLength = clientKeyBytes.length;
            int processBytesLength = processBytes.length;

            int additionalLength = clientKeyLength + processBytesLength;
            if (totalSize + additionalLength > 50000) {
                Message message = new Message(clientKeyBytesAndProcessBytesHolder, partition);
                // here size of `message.serialize()` byte array should always be less than 50k at all cost
                sendToDatabase(message.getAddress(), message.serialize());
                clientKeyBytesAndProcessBytesHolder = new HashMap<>();
                totalSize = 0;
            }
            clientKeyBytesAndProcessBytesHolder.put(clientKeyBytes, processBytes);
            totalSize += additionalLength;
        }
        // calling again with remaining values only if clientKeyBytesAndProcessBytesHolder is not empty
        if (!clientKeyBytesAndProcessBytesHolder.isEmpty()) {
            // argument order matches the Message(Map, Partition) constructor
            Message message = new Message(clientKeyBytesAndProcessBytesHolder, partition);
            // here size of `message.serialize()` byte array should always be less than 50k at all cost
            sendToDatabase(message.getAddress(), message.serialize());
        }
    }

And below is my Message class:

    public final class Message {
        private final byte dataCenter;
        private final byte recordVersion;
        private final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder;
        private final long address;
        private final long addressFrom;
        private final long addressOrigin;
        private final byte recordsPartition;
        private final byte replicated;

        public Message(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder, Partition recordPartition) {
            this.clientKeyBytesAndProcessBytesHolder = clientKeyBytesAndProcessBytesHolder;
            this.recordsPartition = (byte) recordPartition.getPartition();
            this.dataCenter = Utils.CURRENT_LOCATION.get().datacenter();
            this.recordVersion = 1;
            this.replicated = 0;
            long packedAddress = new Data().packAddress();
            this.address = packedAddress;
            this.addressFrom = 0L;
            this.addressOrigin = packedAddress;
        }

        // Output of this method should always be less than 50k always
        public byte[] serialize() {
            // 36 + dataSize + 1 + 1 + keyLength + 8 + 2;
            int bufferCapacity = getBufferCapacity(clientKeyBytesAndProcessBytesHolder);

            ByteBuffer byteBuffer = ByteBuffer.allocate(bufferCapacity).order(ByteOrder.BIG_ENDIAN);
            // header layout
            byteBuffer.put(dataCenter).put(recordVersion).putInt(clientKeyBytesAndProcessBytesHolder.size())
                .putInt(bufferCapacity).putLong(address).putLong(addressFrom).putLong(addressOrigin)
                .put(recordsPartition).put(replicated);

            // data layout
            for (Map.Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
                byte keyType = 0;
                byte[] key = entry.getKey();
                byte[] value = entry.getValue();
                byte keyLength = (byte) key.length;
                short valueLength = (short) value.length;

                ByteBuffer dataBuffer = ByteBuffer.wrap(value);
                long timestamp = valueLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();

                byteBuffer.put(keyType).put(keyLength).put(key).putLong(timestamp).putShort(valueLength)
                    .put(value);
            }
            return byteBuffer.array();
        }

        private int getBufferCapacity(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
            int size = 36;
            for (Map.Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
                size += 1 + 1 + 8 + 2;
                size += entry.getKey().length;
                size += entry.getValue().length;
            }
            return size;
        }

        // getters and to string method here
    }

Basically, I have to make sure that whenever sendToDatabase is called in the validateAndSend method, the size of the message.serialize() byte array is always less than 50k, at all costs. My sendToDatabase method sends the byte array produced by the serialize method. So, for example, if I have 60k records in the dataHolders CLQ, then validateAndSend will send them in two chunks:

  • First, I will build a byte array that is less than 50k (meaning the byte array coming from message.serialize() is less than 50k) and call the sendToDatabase method on it.
  • Second, I will call the sendToDatabase method for the remaining part.

To accomplish the above, I have the totalSize variable in the validateAndSend method, which tries to track the 50k limit, but it looks like my approach may be wrong, and I guess I may be dropping some records or sending more than 50k each time.

It looks like my Message class knows about the clientKeyBytesAndProcessBytesHolder map, so can I use this map to determine the exact size by calling the getBufferCapacity method, and call the sendToDatabase method only if it is less than 50k?
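The idea in the last paragraph can be sketched as follows. This is a minimal sketch, not the asker's or either answerer's code: the class ExactSizeBatcher and its methods are hypothetical names, and the constants 36 and 12 are taken from getBufferCapacity() above. It treats maxBytes as an inclusive upper bound, so pass 49999 if the serialized message must be strictly less than 50000 bytes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: splits entries into batches by their exact serialized size.
final class ExactSizeBatcher {
    static final int HEADER_SIZE = 36;        // 1+1+4+4+8+8+8+1+1, written once per message
    static final int PER_ENTRY_OVERHEAD = 12; // keyType(1) + keyLength(1) + timestamp(8) + valueLength(2)

    /** Serialized size of one entry, mirroring getBufferCapacity(). */
    static int entrySize(byte[] key, byte[] value) {
        return PER_ENTRY_OVERHEAD + key.length + value.length;
    }

    /** Serialized size of a whole batch: one header plus all entries. */
    static int serializedSize(Map<byte[], byte[]> batch) {
        int size = HEADER_SIZE;
        for (Map.Entry<byte[], byte[]> e : batch.entrySet()) {
            size += entrySize(e.getKey(), e.getValue());
        }
        return size;
    }

    /** Splits entries into batches whose serialized size never exceeds maxBytes. */
    static List<Map<byte[], byte[]>> split(Map<byte[], byte[]> entries, int maxBytes) {
        List<Map<byte[], byte[]>> batches = new ArrayList<>();
        Map<byte[], byte[]> current = new LinkedHashMap<>();
        int size = HEADER_SIZE;
        for (Map.Entry<byte[], byte[]> e : entries.entrySet()) {
            int extra = entrySize(e.getKey(), e.getValue());
            if (HEADER_SIZE + extra > maxBytes) {
                // fail loudly instead of silently dropping the record
                throw new IllegalArgumentException("single entry exceeds the size limit");
            }
            if (size + extra > maxBytes) {
                // current batch is full: close it and start a new one
                batches.add(current);
                current = new LinkedHashMap<>();
                size = HEADER_SIZE;
            }
            current.put(e.getKey(), e.getValue());
            size += extra;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Each returned batch could then be wrapped in a Message and passed to sendToDatabase. Counting the header and the 12 bytes of per-entry overhead is what the totalSize variable in the question leaves out.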

2 answers

You can get cleaner code by sorting out responsibilities. The Message class is currently responsible for converting DataHolder items into serialized form. But it is also expected to respect the size limit. Unfortunately, the calling method checks the size expectations without knowing anything about the size requirements of the Message class.

I suggest moving the responsibility for sending properly sized chunks of data into the Message class, and thus moving the "knowledge of the proper formatting of chunked data" into the Message class itself.

You may also notice that the current implementation accounts for the full size of the header for each element, while the header is added only once per serialize().

The following is a rough sketch of the proposed improvement. The code will need additional polishing, but it is primarily intended to illustrate basic improvements in structure and readability/maintainability.

To decouple the sendToDatabase() functionality from the Message class, I simply added an interface:

    // decoupling the sending logic from the formatting
    // if external requirements suggest linking such functionality into the message class
    // such interface would be unnecessary
    public interface DatabaseDelivery {
        void sendToDatabase(long address, byte[] messagePayload);
    }

The Message class has been changed to handle chunking and size limits. It is now Closeable to indicate that you should call close() when you are finished. (This way you can use try-with-resources with current versions of Java.)

    public final class Message implements Closeable {
        // or initialize it from some external source if this might change dynamically
        private static final int MAX_SIZE = 50000;
        // better determine this in sync with addHeader() method
        private static final int HEADER_SIZE = 36;

        private final byte dataCenter;
        private final byte recordVersion;
        private final long address;
        private final long addressFrom;
        private final long addressOrigin;
        private final byte recordsPartition;
        private final byte replicated;
        private final DatabaseDelivery delivery;
        private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
        private int pendingItems = 0;

        public Message(final Partition recordPartition, final DatabaseDelivery databaseDelivery) {
            this.recordsPartition = (byte) recordPartition.getPartition();
            this.dataCenter = Utils.CURRENT_LOCATION.get().datacenter();
            this.recordVersion = 1;
            this.replicated = 0;
            final long packedAddress = new Data().packAddress();
            this.address = packedAddress;
            this.addressFrom = 0L;
            this.addressOrigin = packedAddress;
            this.delivery = databaseDelivery;
        }

        private void addHeader(final ByteBuffer buffer, final int items) {
            buffer.put(dataCenter)
                  .put(recordVersion)
                  .putInt(items)
                  .putInt(buffer.capacity())
                  .putLong(address)
                  .putLong(addressFrom)
                  .putLong(addressOrigin)
                  .put(recordsPartition)
                  .put(replicated);
        }

        private void sendData() {
            if (itemBuffer.position() == 0) {
                // no data to be sent
                //Properties: itemBuffer serialized size == 0
                return;
            }
            itemBuffer.flip();
            // allocate exactly header + items, so buffer.capacity() is the real message size
            final ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE + itemBuffer.remaining());
            addHeader(buffer, pendingItems);
            buffer.put(itemBuffer);
            delivery.sendToDatabase(address, buffer.array());
            itemBuffer.clear();
            pendingItems = 0;
            //Properties: itemBuffer serialized size == 0
        }

        public void addAndSendJunked(final byte[] key, final byte[] data) {
            if (key.length > 255) {
                return;
            }
            // the value length is written as a short in the data layout
            if (data.length > Short.MAX_VALUE) {
                return;
            }
            // do the arithmetic on int lengths; a byte cast would turn 128..255 negative
            final int additionalSize = data.length + key.length + 1 + 1 + 8 + 2;
            final int newSize = itemBuffer.position() + additionalSize;
            //Properties: itemBuffer serialized size < MAX
            if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
                sendData();
            }
            if (additionalSize >= (MAX_SIZE - HEADER_SIZE)) {
                //XXX Use exception that is appropriate for your application
                //XXX You might add sizes involved for ease of analysis
                throw new AppConfigurationException("Size of single item exceeds maximum size");
            }
            //Properties: itemBuffer size (old+new or new) < MAX

            final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
            final long timestamp = data.length > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
            // data layout
            itemBuffer.put((byte) 0)
                      .put((byte) key.length)
                      .put(key)
                      .putLong(timestamp)
                      .putShort((short) data.length)
                      .put(data);
            pendingItems++;
            //Properties: itemBuffer size < MAX
        }

        @Override
        public void close() {
            if (pendingItems > 0) {
                sendData();
            }
        }
    }

Finally, your calling code changes to:

    private void validateAndSend(final Partition partition) {
        final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);

        // the instance providing sendToDatabase() method
        // just for cutting off details external to the discussion
        final DatabaseDelivery delivery = this;
        final Message message = new Message(partition, delivery);

        DataHolder dataHolder;
        while ((dataHolder = dataHolders.poll()) != null) {
            // XXX: why is client key using explicit encoding while process bytes is not?
            message.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
                                     dataHolder.getProcessBytes());
        }
        message.close();
    }

Please note that I have added some markers (XXX) in places that may require attention. (These may well be explained by information outside of what was provided.)
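Since the reworked Message is Closeable, the explicit close() call could presumably be replaced by try-with-resources. The following is a minimal, self-contained sketch of that pattern only. Batch is a hypothetical stand-in for Message, and the list named sink stands in for the database.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

final class ClosingDemo {
    /** Hypothetical stand-in for Message: buffers items and flushes them on close(). */
    static final class Batch implements Closeable {
        private final List<String> pending = new ArrayList<>();
        private final List<String> sink;

        Batch(List<String> sink) {
            this.sink = sink;
        }

        void add(String item) {
            pending.add(item);
        }

        @Override
        public void close() { // declares no checked exception, so callers need no catch block
            sink.addAll(pending);
            pending.clear();
        }
    }

    /** Adds two items and relies on try-with-resources to flush them. */
    static List<String> run() {
        List<String> sent = new ArrayList<>();
        try (Batch batch = new Batch(sent)) {
            batch.add("a");
            batch.add("b");
        } // close() runs here, also when the body throws
        return sent;
    }
}
```

The benefit over a trailing close() call is that the remaining items are still flushed if the loop body throws. Whether flushing after a failure is desirable depends on the application.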

There are a few more details that could be considered. For instance, I'm not sure that ByteBuffer is the right collection for this use case (in most places).

Edit: Regarding testing, given the small size of the code, you might consider applying formal verification (at least in part). This is similar to what modern compilers do with static code analysis: you walk through your code (using paper and pencil) and derive the properties that hold at each point. I added comments to the code above (marked with //Properties) to illustrate what you can do. (Mind: this is a simple illustration, and a real analysis would need more properties to be derived and checked for each statement.) I only stated some minimal properties for the resulting buffer size, using MAX as a placeholder for the maximum acceptable size of the item part of the final buffer, i.e. MAX_SIZE - HEADER_SIZE.

Of course, people can (correctly) suggest writing tests for the critical cases. Those would be white-box tests, checking the correct behavior of the code in the corner cases of the (known) implementation. You will also need black-box tests that check the behavior of your code against the specification.

You can also add runtime checks to ensure correct behavior in critical parts. For instance, when calling sendToDatabase() you can check the maximum size. However, such checks only help if testing supplies inputs that actually exercise the relevant behavior. Properties derived from the code through static analysis can provide evidence of correct behavior without the lingering doubt that you simply did not find the one test case that would cause a failure.
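One way such a runtime check could look is a decorator around the delivery interface. This is a sketch under assumptions: SizeCheckedDelivery is a hypothetical name, the DatabaseDelivery interface is repeated here only so the example compiles on its own, and maxBytes is treated as an inclusive upper bound.

```java
final class SizeCheckDemo {
    /** Same shape as the DatabaseDelivery interface above, repeated so the sketch is self-contained. */
    interface DatabaseDelivery {
        void sendToDatabase(long address, byte[] messagePayload);
    }

    /** Hypothetical decorator that rejects oversized payloads before delegating. */
    static final class SizeCheckedDelivery implements DatabaseDelivery {
        private final DatabaseDelivery delegate;
        private final int maxBytes;

        SizeCheckedDelivery(DatabaseDelivery delegate, int maxBytes) {
            this.delegate = delegate;
            this.maxBytes = maxBytes;
        }

        @Override
        public void sendToDatabase(long address, byte[] messagePayload) {
            if (messagePayload.length > maxBytes) {
                // fail fast: an oversized message indicates a bug in the chunking logic
                throw new IllegalStateException(
                        "payload of " + messagePayload.length + " bytes exceeds limit of " + maxBytes);
            }
            delegate.sendToDatabase(address, messagePayload);
        }
    }
}
```

Wrapping the real delivery once at construction time keeps the check out of the chunking code, and the exception message records the sizes involved for later analysis.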


So here is my attempt (the question would probably be better addressed to the Code Review community, but anyway). It relies on some design changes to Message that make it more like the Builder pattern. The buffer becomes part of the message. Its filling is controlled by reacting to a BufferOverflowException. As soon as that happens, the buffer is rolled back to the last successfully added entry, a new message is allocated, and the same piece of data is added again. Once the buffer is complete, the total number of records and the total size are written to the header, and the entire buffer is dumped into a byte array (I would probably try to avoid this additional conversion and work with the buffer directly in sendToDatabase, but that is out of scope here):

    // TODO: structure has been adjusted for testing purposes
    enum Partition {
        A(0x1);

        private final int _partition;

        int getPartition() {
            return _partition;
        }

        Partition(final int partition) {
            _partition = partition;
        }
    }

    // TODO: test stand-in for the map from the question (added so the snippet compiles)
    static final Map<Partition, ConcurrentLinkedQueue<DataHolder>> dataHoldersByPartition =
            ImmutableMap.of(Partition.A, new ConcurrentLinkedQueue<DataHolder>());

    // TODO: structure has been adjusted for testing purposes
    final static class DataHolder {
        private final String _clientKey;
        private final byte[] _processBytes;

        public DataHolder(final String clientKey, final String value) {
            _clientKey = clientKey;
            byte[] valueBytes = value.getBytes();
            // simulate payload including extra bytes for the header
            final ByteBuffer buffer = ByteBuffer.allocate(4 + 8 + valueBytes.length)
                    .order(ByteOrder.BIG_ENDIAN);
            buffer.putInt(0).putLong(System.currentTimeMillis()).put(valueBytes);
            _processBytes = readToBytes(buffer);
        }

        String getClientKey() {
            return _clientKey;
        }

        byte[] getProcessBytes() {
            return _processBytes;
        }
    }

    // API has been changed to something more like the Builder pattern
    final static class Message {
        private final long address;
        private final long addressFrom;
        private final long addressOrigin;
        private final byte recordsPartition;
        private final byte replicated;
        private final ByteBuffer buffer;
        private final int writeStatsPosition;
        private int payloadCount;

        Message(Partition recordPartition, int sizeLimit) {
            this.recordsPartition = (byte) recordPartition.getPartition();
            this.replicated = 0;
            // TODO: temporarily replaced with a hard-coded constant
            long packedAddress = 123456789L;
            this.address = packedAddress;
            this.addressFrom = 0L;
            this.addressOrigin = packedAddress;
            buffer = ByteBuffer.allocate(sizeLimit).order(ByteOrder.BIG_ENDIAN);
            // TODO: temporarily replaced with a hard-coded constant
            byte dataCenter = 0x1;
            byte recordVersion = 1;
            buffer.put(dataCenter).put(recordVersion);
            // reserve space for the record count and the total size; serialize() fills them in
            writeStatsPosition = buffer.position();
            buffer.putInt(0).putInt(0);
            buffer.putLong(address).putLong(addressFrom).putLong(addressOrigin)
                    .put(recordsPartition).put(replicated);
        }

        /**
         * Tries to add another pair of client key and process bytes to
         * the current message. Returns true if successfully added, false -
         * if the data cannot be accommodated due to message binary size limit.
         */
        boolean add(byte[] key, byte[] value) {
            try {
                byte keyType = 0;
                byte keyLength = (byte) key.length;
                short valueLength = (short) value.length;
                ByteBuffer valueAsBuffer = ByteBuffer.wrap(value);
                long timestamp = valueAsBuffer.capacity() > 10
                        ? valueAsBuffer.getLong(2) : System.currentTimeMillis();
                payloadCount++;
                // remember position in the buffer to roll back to in case of overflow
                buffer.mark();
                buffer.put(keyType).put(keyLength).put(key);
                buffer.putLong(timestamp).putShort(valueLength).put(value);
                return true;
            } catch (BufferOverflowException e) {
                payloadCount--;
                buffer.reset();
                return false;
            }
        }

        byte[] serialize() {
            int finalPosition = buffer.position();
            // adjust the message header with the totals
            buffer.putInt(writeStatsPosition, payloadCount)
                    .putInt(writeStatsPosition + 4, finalPosition);
            return readToBytes(buffer);
        }
    }

    static void validateAndSend(final Partition partition, final Supplier<Message> messageFactory) {
        final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
        Message message = messageFactory.get();
        DataHolder dataHolder;
        while ((dataHolder = dataHolders.poll()) != null) {
            final byte[] keyBytes = dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8);
            final int keyLength = keyBytes.length;
            if (keyLength > 255) {
                continue;
            }
            while (!message.add(keyBytes, dataHolder.getProcessBytes())) {
                // TODO: consider proper handling of the case when the buffer size is too small to accept even a single pair
                Preconditions.checkState(message.payloadCount > 0,
                        "buffer size too small to accommodate payload");
                final byte[] serializedMessage = message.serialize();
                // TODO: makes sense to introduce a message consumer interface and call it here instead of sendToDatabase() - simplifies testing
                sendToDatabase(message.address, serializedMessage);
                message = messageFactory.get();
            }
        }
        if (message.payloadCount > 0) {
            byte[] serializedMessage = message.serialize();
            sendToDatabase(message.address, serializedMessage);
        }
    }

    static void sendToDatabase(long address, byte[] serializedMessage) {
        // TODO: added simulating activity
        System.out.printf("Sending %d bytes to %d: %s%n", serializedMessage.length, address,
                DatatypeConverter.printHexBinary(serializedMessage));
    }

    static byte[] readToBytes(ByteBuffer buffer) {
        buffer.flip();
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        // TODO: using small value for testing - must be set to 50K in real case
        final int maxMessageSize = 80;
        final Supplier<Message> messageFactory = new Supplier<Message>() {
            @Override
            public Message get() {
                return new Message(Partition.A, maxMessageSize);
            }
        };
        final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(Partition.A);
        dataHolders.add(new DataHolder("0000000001", "alpha"));
        dataHolders.add(new DataHolder("0000000002", "bravo"));
        dataHolders.add(new DataHolder("0000000003", "charlie"));
        dataHolders.add(new DataHolder("0000000004", "delta"));
        dataHolders.add(new DataHolder("0000000005", "echo"));
        dataHolders.add(new DataHolder("0000000006", "foxtrot"));
        validateAndSend(Partition.A, messageFactory);
    }
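The rollback step this answer relies on can be shown in isolation. The following is a minimal sketch using only standard java.nio calls. RollbackDemo and tryAppend are hypothetical names, and the record layout (a two-byte length prefix followed by the bytes) is simplified for illustration.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

final class RollbackDemo {
    /**
     * Tries to append a length-prefixed record. If the record does not fit,
     * the buffer position is restored to where it was before the attempt.
     */
    static boolean tryAppend(ByteBuffer buffer, byte[] record) {
        buffer.mark(); // remember the position before writing anything
        try {
            buffer.putShort((short) record.length).put(record);
            return true;
        } catch (BufferOverflowException e) {
            buffer.reset(); // discard the partially written record
            return false;
        }
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(8);
        System.out.println(tryAppend(buffer, new byte[4])); // fits: 2 + 4 bytes
        System.out.println(tryAppend(buffer, new byte[4])); // length prefix fits, body does not
        System.out.println(buffer.position());              // back at the end of the first record
    }
}
```

The second call overflows only after the length prefix has been written, which is why the mark has to be set before the first put of each record.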

Source: https://habr.com/ru/post/1273011/

