Sending data to a database with a limited amount of fragments

I have a method that takes an Partitionenum parameter . This method will be called by several background threads (15 max.) For the same period of time, passing a different value Partition. Here dataHoldersByPartitionis the mapping Partitionand ConcurrentLinkedQueue<DataHolder>.

  private final ImmutableMap<Partition, ConcurrentLinkedQueue<DataHolder>> dataHoldersByPartition;

  //... some code to populate entry in `dataHoldersByPartition`

  private void validateAndSend(final Partition partition) {  
    ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
    Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
    int totalSize = 0;      
    DataHolder dataHolder;
    while ((dataHolder = dataHolders.poll())  != null) {      
      byte[] clientKeyBytes = dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8);
      if (clientKeyBytes.length > 255)
        continue;

      byte[] processBytes = dataHolder.getProcessBytes();
      int clientKeyLength = clientKeyBytes.length;
      int processBytesLength = processBytes.length;

      int additionalLength = clientKeyLength + processBytesLength;
      if (totalSize + additionalLength > 50000) {
        Message message = new Message(clientKeyBytesAndProcessBytesHolder, partition);
        // here size of `message.serialize()` byte array should always be less than 50k at all cost
        sendToDatabase(message.getAddress(), message.serialize());
        clientKeyBytesAndProcessBytesHolder = new HashMap<>();
        totalSize = 0;
      }
      clientKeyBytesAndProcessBytesHolder.put(clientKeyBytes, processBytes);
      totalSize += additionalLength;
    }
    // calling again with remaining values only if clientKeyBytesAndProcessBytesHolder is not empty
    if(!clientKeyBytesAndProcessBytesHolder.isEmpty()) {
        Message message = new Message(partition, clientKeyBytesAndProcessBytesHolder);
        // here size of `message.serialize()` byte array should always be less than 50k at all cost
        sendToDatabase(message.getAddress(), message.serialize());      
    }
  }

And below is my class Message:

public final class Message {
  private final byte dataCenter;
  private final byte recordVersion;
  private final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder;
  private final long address;
  private final long addressFrom;
  private final long addressOrigin;
  private final byte recordsPartition;
  private final byte replicated;

  public Message(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder, Partition recordPartition) {
    this.clientKeyBytesAndProcessBytesHolder = clientKeyBytesAndProcessBytesHolder;
    this.recordsPartition = (byte) recordPartition.getPartition();
    this.dataCenter = Utils.CURRENT_LOCATION.get().datacenter();
    this.recordVersion = 1;
    this.replicated = 0;
    long packedAddress = new Data().packAddress();
    this.address = packedAddress;
    this.addressFrom = 0L;
    this.addressOrigin = packedAddress;
  }

  // Output of this method should always be less than 50k always
  public byte[] serialize() {
    int bufferCapacity = getBufferCapacity(clientKeyBytesAndProcessBytesHolder); // 36 + dataSize + 1 + 1 + keyLength + 8 + 2;

    ByteBuffer byteBuffer = ByteBuffer.allocate(bufferCapacity).order(ByteOrder.BIG_ENDIAN);
    // header layout
    byteBuffer.put(dataCenter).put(recordVersion).putInt(clientKeyBytesAndProcessBytesHolder.size())
        .putInt(bufferCapacity).putLong(address).putLong(addressFrom).putLong(addressOrigin)
        .put(recordsPartition).put(replicated);

    // now the data layout
    for (Map.Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
      byte keyType = 0;
      byte[] key = entry.getKey();
      byte[] value = entry.getValue();
      byte keyLength = (byte) key.length;
      short valueLength = (short) value.length;

      ByteBuffer dataBuffer = ByteBuffer.wrap(value);
      long timestamp = valueLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();

      byteBuffer.put(keyType).put(keyLength).put(key).putLong(timestamp).putShort(valueLength)
          .put(value);
    }
    return byteBuffer.array();
  }

  private int getBufferCapacity(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
    int size = 36;
    for (Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
      size += 1 + 1 + 8 + 2;
      size += entry.getKey().length;
      size += entry.getValue().length;
    }
    return size;
  }

    // getters and to string method here
}

, , , sendToDatabase, message.serialize() 50 . sendToDatabase , serialize. - . dataHolders CLQ, clientKeyBytes processBytes. , :

  • clientKeyBytes 255, .
  • totalSize, clientKeyLength processBytesLength, totalSize 50000 .
  • 50000, clientKeyBytesAndProcessBytesHolder sendToDatabase , reset totalSize 0 .
  • , dataHolders , , .

, , - , , - - , . , 50k- , , getBufferCapacity, sendToDatabase?

+4
2

, . , , 50 ., 50 . 50K, if if (totalSize + additionalLength >= 50000).

- , , totalSize + additionalLength 50 , .

50 , , . - , dataHoldersByPartition. , . , . , : -

synchronized(this){
    ConcurrentLinkedQueue<DataHolder> dataHolders =  dataHoldersByPartition.get(partition);
}

. if (totalSize + additionalLength > 50000) clientKeyBytesAndProcessBytesHolder if(sizeof(clientKeyBytesAndProcessBytesHolder) >= 50000) ( sizeof java). - , , . , , , .

, validateAndSend . validateAndSend , . , , - , . , , . , , , , , . , , , , , , .

, : -

  `private synchronize void validateAndSend`

: : -

           synchronize(this){
            Message message = new Message(clientKeyBytesAndProcessBytesHolder, partition);                  
            sendToDatabase(message.getAddress(), message.serialize());
     }

, , .

+1

validateAndSend . , . , . , . - / . , . - netflix/hystrix.

+1

Source: https://habr.com/ru/post/1688040/


All Articles