You can get cleaner code by sorting. The Message class is currently responsible for converting DataHolder elements to serialized form. But t is also expected to provide a size limit. Unfortunately, the call method checks size expectations without knowing anything about the size requirements of the Message class.
I suggest taking responsibility for sending the appropriate data files to the Message class, and thus remove the βknowledge of the proper formatting of unwanted dataβ in the Message class itself.
You may also notice that the current implementation takes into account the full size of the header for each element, while the header is added only once per serialize()
The following is a brief overview of the proposed improvement. The code will need additional polishing. But first of all, it is intended to illustrate elementary improvements in structure and readability / maintainability.
To highlight sendToDatabase() functionality from the Message class, I simply added a simple interface:
The message class has been changed to handle add and size restrictions. Now Closeable indicates that you should call close() definitively. (This way you can use the correct constructs with current versions of java)
public final class Message implements Closeable { // or initialize it from some external source if this might change dynamically private static final int MAX_SIZE = 50000; // better determine this in sync with addHeader() method private static final int HEADER_SIZE = 36; private final byte dataCenter; private final byte recordVersion; private final long address; private final long addressFrom; private final long addressOrigin; private final byte recordsPartition; private final byte replicated; private final DatabaseDelivery delivery; private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE); private int pendingItems = 0; public Message(final Partition recordPartition, final DatabaseDelivery databaseDelivery) { this.recordsPartition = (byte) recordPartition.getPartition(); this.dataCenter = Utils.CURRENT_LOCATION.get().datacenter(); this.recordVersion = 1; this.replicated = 0; final long packedAddress = new Data().packAddress(); this.address = packedAddress; this.addressFrom = 0L; this.addressOrigin = packedAddress; this.delivery = databaseDelivery; } private void addHeader(final ByteBuffer buffer, final int items) { buffer.put(dataCenter) .put(recordVersion) .putInt(items) .putInt(buffer.capacity()) .putLong(address) .putLong(addressFrom) .putLong(addressOrigin) .put(recordsPartition) .put(replicated); } private void sendData() { if (itemBuffer.position() == 0) { // no data to be sent //Properties: itemBuffer serialized size == 0 return; } final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE); addHeader(buffer, pendingItems); itembuffer.flip(); buffer.put(itemBuffer); delivery.sendToDatabase(address, Arrays.copyOf(buffer.array(),buffer.position()); itemBuffer.clear(); pendingItems = 0; //Properties: itemBuffer serialized size == 0 } public void addAndSendJunked(final byte[] key, final byte[] data) { if (key.length > 255) { return; } if (data.length > 255) { return; } final byte keyLength = (byte) key.length; final byte dataLength = (byte) data.length; final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2; final int newSize = itemBuffer.position() + additionalSize; //Properties: itemBuffer serialized size < MAX if (newSize >= (MAX_SIZE-HEADER_SIZE)) { sendData(); } if (additionalSize > (MAX_SIZE-HEADER_SIZE)) { //XXX Use exception that is appropriate for your application //XXX You might add sizes involved for ease of analysis throw new AppConfigurationException("Size of single item exceeds maximum size"); } //Properties: itemBuffer size (old+new or new) < MAX final ByteBuffer dataBuffer = ByteBuffer.wrap(data); final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis(); // data layout itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength).put(data); pendingItems++ ;
// Properties: itemBuffer size <MAXIMUM}
@Override public void close() { if (pendingItems > 0) { sendData(); } }
Finally, your calling code will mutate:
private void validateAndSend(final Partition partition) { final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition); // the instance providing sendToDatabase() method // just for cutting off details external to the discussion final DatabaseDelivery delivery = this; final Message message = new Message(partition, this); DataHolder dataHolder; while ((dataHolder = dataHolders.poll()) != null) { // XXX: why is client key using explicit encoding while process bytes is not? message.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8), dataHolder.getProcessBytes()); } message.close(); }
Please note that I have added some markers ( XXX ) in places that may require attention. (However, this can be explained from information external to what was provided).
There are a few details that could be considered. For instance. I'm not sure that using ByteBuffer is the right collection for this use case (in most places).
Edit: Regarding testing, due to the small size of the code, you might consider applying formal verification (at least in part). This is similar to the fact that modern compilers are associated with static code analysis: you look at your code (using paper and a pencil) and get the properties that are in this very place. I added comments to the code above (marked with //Properties ) to illustrate what you can do. (Reason: this is a simple illustration, and it will definitely require more properties to output and execute for each statement). I just made some minimal attributes for the resulting buffer size. (using MAX' as placeholder for the maximum acceptable size of the item part of the final buffer, aka MAX_SIZE-HEADER_SIZE`).
Of course, people can (correctly) suggest writing tests for critical cases. In this case, it will be tests with white boxes. testing the correct functionality of the code in the corner cases of a (known) implementation. You will also need to verify that Blackbox is working in a test mode of your code for specifications.
You can also add runtime checks to ensure correct behavior in critical parts. For instance. when doing sendToDatabase() you can check the maximum size. However, such testing will require appropriate input to plug-in the proper behavior. Using properties obtained from the code through static analysis can provide evidence of good behavior without the final doubt that he did not find one test case that would cause a crash.