I have an AWS cluster running Cloudera Hadoop Distribution (CDH) 5.4, configured with 1 namenode and 2 datanodes.
I have an HBase table with 100K records, and I run a scan over this table from Java. Based on the choices the user makes at the front end, I build filter conditions on different columns of the table to retrieve the matching records, as shown in the code further below.
When several filter conditions are applied and I try to retrieve data from the HBase table, the datanode logs the following exception:
node3.xxx.com:50010:DataXceiver error processing WRITE_BLOCK operation src: /xxx.xx.xx.194:58862 dst: /xxx.xx.xx.193:50010
java.io.IOException: Premature EOF from inputStream
at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:201)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:466)
at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:780)
at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:783)
at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:137)
at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:74)
at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:243)
at java.lang.Thread.run(Thread.java:745)
node3.xxx.com:50010:DataXceiver error processing WRITE_BLOCK operation src: /xxx.xx.xx.194:35615 dst: /xxx.xx.xx.193:50010
java.io.IOException: Premature EOF from inputStream
at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:201)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:466)
at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:780)
at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:783)
at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:137)
at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:74)
at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:243)
at java.lang.Thread.run(Thread.java:745)
Below is the Maven dependency I added to connect to HBase from Java:
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.0.0-cdh5.4.3</version>
</dependency>
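The config object referenced in the code further below is a regular HBase client Configuration created elsewhere in the class. A minimal sketch of how such a configuration is typically built (the quorum host and port here are placeholders, not my actual cluster values):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class HBaseConfigSketch {
    // Minimal sketch only: the quorum host and client port are placeholders.
    static Configuration buildConfig() {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "node1.xxx.com");      // placeholder host
        config.set("hbase.zookeeper.property.clientPort", "2181");  // default ZooKeeper client port
        return config;
    }
}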
Below is the Java code I use to retrieve records from the HBase table:
public synchronized List<Map<String, String>> getFilterData(Map<String, String> map) {
    Connection connection = null;
    Table table = null;
    ResultScanner resultScanner = null;
    List<Map<String, String>> resultList = new ArrayList<Map<String, String>>();
    try {
        connection = ConnectionFactory.createConnection(config);
        table = connection.getTable(TableName.valueOf(fileReader.getProperty("HBASE_FILTER_DATA_TABLE_NAME")));

        Scan scan = new Scan();
        scan.setCaching(1000);
        scan.addColumn(Bytes.toBytes("data"), Bytes.toBytes("avgValue"));
        scan.addColumn(Bytes.toBytes("data"), Bytes.toBytes("ts"));
        scan.addColumn(Bytes.toBytes("filter"), Bytes.toBytes("dataType"));

        // Default filters match the placeholder value "-1"; each one is replaced
        // below when the user supplied a value for the corresponding column.
        SingleColumnValueFilter ageGroupFilter = new SingleColumnValueFilter(Bytes.toBytes("filter"), Bytes.toBytes("ageGroup"), CompareOp.EQUAL, Bytes.toBytes("-1"));
        SingleColumnValueFilter applicationFilter = new SingleColumnValueFilter(Bytes.toBytes("filter"), Bytes.toBytes("appName"), CompareOp.EQUAL, Bytes.toBytes("-1"));
        SingleColumnValueFilter deviceFilter = new SingleColumnValueFilter(Bytes.toBytes("filter"), Bytes.toBytes("deviceModel"), CompareOp.EQUAL, Bytes.toBytes("-1"));
        SingleColumnValueFilter genderFilter = new SingleColumnValueFilter(Bytes.toBytes("filter"), Bytes.toBytes("gender"), CompareOp.EQUAL, Bytes.toBytes("-1"));

        if (map.containsKey("ageGroup")) {
            ageGroupFilter = new SingleColumnValueFilter(Bytes.toBytes("filter"), Bytes.toBytes("ageGroup"), CompareOp.EQUAL, Bytes.toBytes(map.get("ageGroup")));
            scan.addColumn(Bytes.toBytes("filter"), Bytes.toBytes("ageGroup"));
        }
        if (map.containsKey("appName")) {
            applicationFilter = new SingleColumnValueFilter(Bytes.toBytes("filter"), Bytes.toBytes("appName"), CompareOp.EQUAL, Bytes.toBytes(map.get("appName").toLowerCase().replace(" ", "_")));
            scan.addColumn(Bytes.toBytes("filter"), Bytes.toBytes("appName"));
        }
        if (map.containsKey("deviceModel")) {
            deviceFilter = new SingleColumnValueFilter(Bytes.toBytes("filter"), Bytes.toBytes("deviceModel"), CompareOp.EQUAL, Bytes.toBytes(map.get("deviceModel").toLowerCase().replace(" ", "_")));
            scan.addColumn(Bytes.toBytes("filter"), Bytes.toBytes("deviceModel"));
        }
        if (map.containsKey("gender")) {
            genderFilter = new SingleColumnValueFilter(Bytes.toBytes("filter"), Bytes.toBytes("gender"), CompareOp.EQUAL, Bytes.toBytes(map.get("gender").toLowerCase()));
            scan.addColumn(Bytes.toBytes("filter"), Bytes.toBytes("gender"));
        }

        // Every filter must pass for a row to be returned.
        FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        filters.addFilter(ageGroupFilter);
        filters.addFilter(applicationFilter);
        filters.addFilter(deviceFilter);
        filters.addFilter(genderFilter);

        if (map.containsKey("dataType")) {
            SingleColumnValueFilter dataTypeFilter = new SingleColumnValueFilter(Bytes.toBytes("filter"), Bytes.toBytes("dataType"), CompareOp.EQUAL, Bytes.toBytes(map.get("dataType").toLowerCase()));
            filters.addFilter(dataTypeFilter);
        }
        if (map.containsKey("startDate") && map.containsKey("endDate")) {
            // The front end sends epoch milliseconds as strings; they are formatted
            // to match the representation stored in data:hour before comparison.
            String startDate = dateFormat.format(new Date(Long.parseLong(map.get("startDate"))));
            String endDate = dateFormat.format(new Date(Long.parseLong(map.get("endDate"))));
            SingleColumnValueFilter startDateFilter = new SingleColumnValueFilter(Bytes.toBytes("data"), Bytes.toBytes("hour"), CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(startDate));
            SingleColumnValueFilter endDateFilter = new SingleColumnValueFilter(Bytes.toBytes("data"), Bytes.toBytes("hour"), CompareOp.LESS_OR_EQUAL, Bytes.toBytes(endDate));
            filters.addFilter(startDateFilter);
            filters.addFilter(endDateFilter);
        }

        scan.setFilter(filters);
        resultScanner = table.getScanner(scan);

        // Flatten each matching row into a qualifier -> value map.
        for (Result result = resultScanner.next(); result != null; result = resultScanner.next()) {
            Map<String, String> row = new HashMap<String, String>();
            for (Cell cell : result.listCells()) {
                row.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
            }
            resultList.add(row);
        }
    } catch (Exception e) {
        logger.error("Exception in getFilterData() : " + e.getMessage(), e);
    } finally {
        try {
            if (resultScanner != null) {
                resultScanner.close();
            }
            if (table != null) {
                table.close();
            }
            if (connection != null && !connection.isClosed()) {
                connection.close();
            }
        } catch (Exception e2) {
            logger.error("Exception while closing HBase resources : " + e2.getMessage(), e2);
        }
    }
    return resultList;
}
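For illustration, the multi-filter case that triggers the exception is reached with a map along these lines; the keys match what getFilterData() checks, but the values below are made-up examples, not my actual front-end payload:

import java.util.HashMap;
import java.util.Map;

// Illustrative input only: example filter map mirroring the keys the method checks.
Map<String, String> filterMap = new HashMap<String, String>();
filterMap.put("ageGroup", "18-24");
filterMap.put("appName", "My App");          // lower-cased and spaces replaced with "_" inside getFilterData()
filterMap.put("deviceModel", "Galaxy S5");
filterMap.put("gender", "Male");
filterMap.put("dataType", "usage");
filterMap.put("startDate", "1438387200000"); // epoch milliseconds as strings
filterMap.put("endDate", "1438473600000");

List<Map<String, String>> rows = getFilterData(filterMap);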