Extend SequenceFileInputFormat to include file name + offset

I would like to be able to create a custom InputFormat that reads the sequence files, but additionally provides the file path and offset in the file where the record is located.

To take a step back, here is my use case: I have a sequence file containing variable-sized data. The keys are mostly irrelevant, and the values are up to several megabytes each and contain many different fields. I would like to index some of these fields in Elasticsearch along with the file name and offset. That way, I can query these fields in Elasticsearch, and then use the file name and offset to go back to the sequence file and fetch the original record, rather than storing it all in ES.

I have this whole process working as one java program. The SequenceFile.Reader class conveniently provides the getPosition and seek methods for this to happen.

However, in the end there will be many terabytes of data, so I will need to convert this to a MapReduce job (probably map-only). Since the actual keys in the sequence file do not matter, the approach I was hoping to take is to create a custom InputFormat that extends or otherwise uses SequenceFileInputFormat, but instead of returning the actual keys, returns a composite key consisting of the file path and offset.

However, in practice this turns out to be harder than expected. It seems like it should be possible, but given the actual APIs and what they expose, I cannot find a clean way to do it. Any ideas? Maybe there is an alternative approach that I should take?

+6
source share
1 answer

In case anyone is faced with a similar problem, here is the solution I came up with. In the end, I just duplicated part of the code in SequenceFileInputFormat / SequenceFileRecordReader and modified it. I was hoping to write either a subclass, or a decorator, or something similar ... this approach is not very pretty, but it works:

SequenceFileOffsetInputFormat.java:

 import java.io.IOException; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; public class SequenceFileOffsetInputFormat<V extends Writable> extends FileInputFormat<PathOffsetWritable, V> { private static class SequenceFileOffsetRecordReader<V extends Writable> extends RecordReader<PathOffsetWritable, V> { private SequenceFile.Reader in; private long start; private long end; private boolean more = true; private PathOffsetWritable key = null; private Writable k = null; private V value = null; private Configuration conf; @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { FileSplit fileSplit = (FileSplit) split; conf = context.getConfiguration(); Path path = fileSplit.getPath(); FileSystem fs = path.getFileSystem(conf); this.in = new SequenceFile.Reader(fs, path, conf); try { this.k = (Writable) in.getKeyClass().newInstance(); this.value = (V) in.getValueClass().newInstance(); } catch (InstantiationException e) { throw new IOException(e); } catch (IllegalAccessException e) { throw new IOException(e); } this.end = fileSplit.getStart() + fileSplit.getLength(); if (fileSplit.getStart() > in.getPosition()) { in.sync(fileSplit.getStart()); } this.start = in.getPosition(); more = start < end; key = new PathOffsetWritable(path, start); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!more) { return false; } long pos 
= in.getPosition(); more = in.next(k, value); if (!more || (pos >= end && in.syncSeen())) { key = null; value = null; more = false; } else { key.setOffset(pos); } return more; } @Override public PathOffsetWritable getCurrentKey() { return key; } @Override public V getCurrentValue() { return value; } @Override public float getProgress() throws IOException, InterruptedException { if (end == start) { return 0.0f; } else { return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start)); } } @Override public void close() throws IOException { in.close(); } } @Override public RecordReader<PathOffsetWritable, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { return new SequenceFileOffsetRecordReader<V>(); } @Override public List<InputSplit> getSplits(JobContext context) throws IOException { return new SequenceFileInputFormat<PathOffsetWritable, V>().getSplits(context); } @Override public long getFormatMinSplitSize() { return SequenceFile.SYNC_INTERVAL; } } 

PathOffsetWritable.java:

 import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; public class PathOffsetWritable implements WritableComparable<PathOffsetWritable> { private Text t = new Text(); private Path path; private long offset; public PathOffsetWritable(Path path, long offset) { this.path = path; this.offset = offset; } public Path getPath() { return path; } public long getOffset() { return offset; } public void setPath(Path path) { this.path = path; } public void setOffset(long offset) { this.offset = offset; } @Override public void readFields(DataInput in) throws IOException { t.readFields(in); path = new Path(t.toString()); offset = in.readLong(); } @Override public void write(DataOutput out) throws IOException { t.set(path.toString()); t.write(out); out.writeLong(offset); } @Override public int compareTo(PathOffsetWritable o) { int x = path.compareTo(o.path); if (x != 0) { return x; } else { return Long.valueOf(offset).compareTo(Long.valueOf(o.offset)); } } } 
+5
source

Source: https://habr.com/ru/post/953264/


All Articles