Hadoop CompositeInputFormat does not join all data

I am currently working with Hadoop 0.20.2 and the old API. What I want to do is a map-side join. My graph dataset consists of two files, one with edges and one with nodes. The edges are lines of the form "sourceID targetID label", the nodes lines of the form "nodeID label 0", and both files are sorted by their first value.

Everything works fine on a small toy example (6 nodes and 8 edges), but when I scale it up, only the first part of the input files gets joined. I cannot find a reason for this, since everything should be in the correct format and sorted. Am I missing something?

For "debugging" I simply printed the key together with the values of the TupleWritable, including their index, in the map phase. You can see the output here: http://pastebin.com/rcNx2r5c

The nodes file is 9416 bytes and the edges file is 15797 bytes, and both are read in completely. But after key 98 the join breaks down: from then on it first emits the edges and then the nodes, even though both the node and the edge carry key 99.

My job setup for CompositeInputFormat:

 conf.setInputFormat(CompositeInputFormat.class);
 Path[] input = new Path[] { inputNodes, inputEdges };
 conf.set("key.value.separator.in.input.line", "\t");
 conf.set("mapred.join.expr", CompositeInputFormat.compose(
     "outer", KeyValueTextInputFormat.class, input));

Any help would be appreciated, so thanks in advance!

EDIT: I solved the problem. For those interested: the culprit was KeyValueTextInputFormat. It produces both key and value as Text, where I needed a LongWritable key and a Text value. I thought this would not matter, but it does: Text keys compare lexicographically, so the join no longer sees my numerically sorted keys as sorted. So I wrote my own input format based on KeyValueTextInputFormat.
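The underlying issue can be demonstrated with plain Java, without any Hadoop classes: Text keys compare byte-wise (lexicographically, like String), while numeric keys compare by value. The two orders diverge as soon as keys reach three digits, which matches the join breaking right after key 98. A minimal sketch (the key values are illustrative, not taken from the actual dataset):

```java
public class KeyOrderDemo {
    public static void main(String[] args) {
        // Lexicographic order, as used by Text keys: "100" sorts BEFORE "99",
        // because '1' < '9' in the first byte.
        assert "100".compareTo("99") < 0;

        // Numeric order, as used by LongWritable keys: 100 comes after 99.
        assert Long.compare(100L, 99L) > 0;

        // So a file sorted numerically (..., 98, 99, 100, ...) looks unsorted
        // to a merge join that compares its keys as text.
        System.out.println("lexicographic and numeric key order diverge at 3 digits");
    }
}
```

This is why the toy example (single-digit and low two-digit keys) worked: below 99 the two orderings happen to agree.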

 public class KeyValueLongInputFormat extends FileInputFormat<LongWritable, Text> implements JobConfigurable {

   private CompressionCodecFactory compressionCodecs = null;

   @Override
   public void configure(JobConf conf) {
     compressionCodecs = new CompressionCodecFactory(conf);
   }

   protected boolean isSplitable(FileSystem fs, Path file) {
     return compressionCodecs.getCodec(file) == null;
   }

   @Override
   public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit,
       JobConf job, Reporter reporter) throws IOException {
     reporter.setStatus(genericSplit.toString());
     return new KeyValueLongLineRecordReader(job, (FileSplit) genericSplit);
   }
 }

and

 public class KeyValueLongLineRecordReader implements RecordReader<LongWritable, Text> {

   private final LineRecordReader lineRecordReader;
   private byte separator = (byte) '\t';
   private LongWritable dummyKey;
   private Text innerValue;

   public Class getKeyClass() {
     return LongWritable.class;
   }

   public LongWritable createKey() {
     return new LongWritable();
   }

   public Text createValue() {
     return new Text();
   }

   public KeyValueLongLineRecordReader(Configuration job, FileSplit split) throws IOException {
     lineRecordReader = new LineRecordReader(job, split);
     dummyKey = lineRecordReader.createKey();
     innerValue = lineRecordReader.createValue();
     String sepStr = job.get("key.value.separator.in.input.line", "\t");
     this.separator = (byte) sepStr.charAt(0);
   }

   public static int findSeparator(byte[] utf, int start, int length, byte sep) {
     for (int i = start; i < (start + length); i++) {
       if (utf[i] == sep) {
         return i;
       }
     }
     return -1;
   }

   /** Read key/value pair in a line. */
   public synchronized boolean next(LongWritable key, Text value) throws IOException {
     LongWritable tKey = key;
     Text tValue = value;
     byte[] line = null;
     int lineLen = -1;
     if (lineRecordReader.next(dummyKey, innerValue)) {
       line = innerValue.getBytes();
       lineLen = innerValue.getLength();
     } else {
       return false;
     }
     if (line == null) {
       return false;
     }
     int pos = findSeparator(line, 0, lineLen, this.separator);
     if (pos == -1) {
       // No separator: the whole line is the key, value is empty.
       tKey.set(Long.valueOf(new String(line, 0, lineLen)));
       tValue.set("");
     } else {
       int keyLen = pos;
       byte[] keyBytes = new byte[keyLen];
       System.arraycopy(line, 0, keyBytes, 0, keyLen);
       int valLen = lineLen - keyLen - 1;
       byte[] valBytes = new byte[valLen];
       System.arraycopy(line, pos + 1, valBytes, 0, valLen);
       tKey.set(Long.valueOf(new String(keyBytes)));
       tValue.set(valBytes);
     }
     return true;
   }

   public float getProgress() {
     return lineRecordReader.getProgress();
   }

   public synchronized long getPos() throws IOException {
     return lineRecordReader.getPos();
   }

   public synchronized void close() throws IOException {
     lineRecordReader.close();
   }
 }
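The record reader's splitting logic can be sanity-checked in isolation with plain byte arrays, no Hadoop classes required (the sample line below is made up; findSeparator is the same routine as in the reader above):

```java
import java.nio.charset.StandardCharsets;

public class SeparatorDemo {
    // Same logic as KeyValueLongLineRecordReader.findSeparator.
    static int findSeparator(byte[] utf, int start, int length, byte sep) {
        for (int i = start; i < start + length; i++) {
            if (utf[i] == sep) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        byte[] line = "42\tsome label".getBytes(StandardCharsets.UTF_8);
        int pos = findSeparator(line, 0, line.length, (byte) '\t');

        // Key: the bytes before the tab, parsed as a long.
        long key = Long.parseLong(new String(line, 0, pos, StandardCharsets.UTF_8));
        // Value: everything after the tab.
        String value = new String(line, pos + 1, line.length - pos - 1, StandardCharsets.UTF_8);

        System.out.println(key + " -> " + value);  // prints: 42 -> some label
    }
}
```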

That fixed the problem.


Source: https://habr.com/ru/post/1446431/
