We have a 4-datanode cluster running CDH 5.0.2, installed with Cloudera Manager packages. To import 13M user rows into HBase, we wrote a simple Python script and fed it the data as a stream on stdin. It works as expected up to about 100 thousand lines. And then... then, one by one, the region servers on all the datanodes go down with the same message:
The health test result for REGION_SERVER_GC_DURATION has become bad:
Average time spent in garbage collection was 44.8 second(s) (74.60%)
per minute over the previous 5 minute(s).
Critical threshold: 60.00%.
Every attempt to solve the problem by following tips found on the Internet (for example [1], [2], [3]) got nowhere near a solution. Playing with the Java heap size was useless. The only thing that "solved" the situation was raising the monitoring period of the garbage collection duration check for the region servers from 5 minutes to 50 minutes. Arguably a dirty workaround.
We don't have the manpower to set up proper monitoring of our GC usage right now. Eventually we will, but in the meantime I'm wondering how importing 13M rows into HBase can reliably bring down every region server. Is there a clean solution?
Edit:
JVM options on the datanodes:
-XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-CMSConcurrentMTEnabled -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled
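The options above don't enable GC logging. When we do get time to look at the GC behaviour, I assume the first step would be to append something like the following to the region server JVM options (the log path is just a placeholder):

-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:/var/log/hbase/regionserver-gc.log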
The datanodes are machines running CentOS 6.5 with 32 GB of RAM and one quad-core CPU @ 2.30 GHz.
Here is the Python script. In short: for each input line, the user ID is used as the rowkey of the main table, each remaining column is written with its own put, and COL3/COL4 are additionally written into an index table.
import sys
import json
import logging
import happybase

logging.basicConfig(level=logging.INFO)

# master_ip is a placeholder for the host running the HBase Thrift server.
connection = happybase.Connection(host=master_ip)
hbase_main_table = connection.table('users_table')
hbase_index_table = connection.table('users_index_table')
header = ['ID', 'COL1', 'COL2', 'COL3', 'COL4']
indexed = ['COL3', 'COL4']

for line in sys.stdin:
    l = line.replace('"', '').strip("\n").split("\t")
    # Skip the header line of the input.
    if l[header.index("ID")] == "ID":
        continue
    try:
        # The user ID is the rowkey of the main table.
        row_id = str(l[header.index("ID")])

        # One put per column into the main table.
        for h in header[1:]:
            col = 'info:' + h.lower()
            val = l[header.index(h)].strip()
            hbase_main_table.put(row_id, {col: val})

        # Maintain a reverse index: indexed value -> JSON list of user IDs.
        for typ in indexed:
            idx = l[header.index(typ)].strip()
            if len(idx) == 0:
                continue
            row = hbase_index_table.row(idx)
            old_ids = row.get('d:s')
            if old_ids is not None:
                ids = json.dumps(list(set(json.loads(old_ids)).union([row_id])))
            else:
                ids = json.dumps([row_id])
            hbase_index_table.put(idx, {
                'd:s': ids,
                'd:t': typ,
                'd:b': 'ame'
            })
    except Exception:
        msg = 'ERROR ' + str(l[header.index("ID")])
        logging.info(msg, exc_info=True)
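For what it's worth, the script above issues one put (one RPC) per column and per index update, so a 13M-row import turns into tens of millions of tiny writes. One thing we could try, though we haven't verified it fixes the GC problem, is grouping the puts with happybase's batch API. A minimal sketch, assuming the same tables as above and an arbitrarily chosen batch_size of 1000 (master_ip is the same placeholder):

import sys
import happybase

header = ['ID', 'COL1', 'COL2', 'COL3', 'COL4']

connection = happybase.Connection(host=master_ip)
table = connection.table('users_table')

# batch() buffers mutations client-side and flushes them in groups,
# so each row costs one buffered put with all its columns instead of
# one RPC per column.
with table.batch(batch_size=1000) as batch:
    for line in sys.stdin:
        l = line.replace('"', '').strip("\n").split("\t")
        if l[header.index("ID")] == "ID":
            continue
        row_id = str(l[header.index("ID")])
        batch.put(row_id, {
            'info:' + h.lower(): l[header.index(h)].strip()
            for h in header[1:]
        })

The index table could get a second batch in the same loop; the read-modify-write on 'd:s' is the part that doesn't batch as cleanly.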