Create a Scalding Source like TextLine that combines several files into a single mapper

We have many small files that need to be combined. In Scalding, you can use TextLine to read files as text lines. The problem is that we get one mapper per file, but we want several files to be combined so that they are processed by a single mapper.

I understand that we need to change the input format to an implementation of CombineFileInputFormat, and this might be done using Cascading's CombinedHfs. We cannot figure out how to do this, but it should take only a few lines of code to define our own Scalding source, say CombineTextLine.

Many thanks to anyone who can provide code for this.

As a side issue, some of our data is in S3. It would be great if this solution worked for S3 files too - I assume that depends on whether CombineFileInputFormat or CombinedHfs works for S3.

+4
scala hadoop cascading scalding
May 28 '14 at 16:45
2 answers

You got the idea right in your question, so here is what might be the solution for you.

Create your own input format that extends CombineFileInputFormat and uses its own RecordReader. I am showing you Java code, but you can easily convert it to Scala if you want.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.LineRecordReader;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
    import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
    import org.apache.hadoop.mapred.lib.CombineFileSplit;

    public class CombinedInputFormat<K, V> extends CombineFileInputFormat<K, V> {

        public static class MyKeyValueLineRecordReader implements RecordReader<LongWritable, Text> {

            private final RecordReader<LongWritable, Text> delegate;

            public MyKeyValueLineRecordReader(CombineFileSplit split, Configuration conf,
                                              Reporter reporter, Integer idx) throws IOException {
                // Carve the single file at index idx out of the combined split
                // and delegate reading it to the ordinary line reader.
                FileSplit fileSplit = new FileSplit(split.getPath(idx), split.getOffset(idx),
                                                    split.getLength(idx), split.getLocations());
                delegate = new LineRecordReader(conf, fileSplit);
            }

            @Override
            public boolean next(LongWritable key, Text value) throws IOException {
                return delegate.next(key, value);
            }

            @Override
            public LongWritable createKey() {
                return delegate.createKey();
            }

            @Override
            public Text createValue() {
                return delegate.createValue();
            }

            @Override
            public long getPos() throws IOException {
                return delegate.getPos();
            }

            @Override
            public void close() throws IOException {
                delegate.close();
            }

            @Override
            public float getProgress() throws IOException {
                return delegate.getProgress();
            }
        }

        @Override
        public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
            // CombineFileRecordReader iterates over the files in the combined split,
            // instantiating one MyKeyValueLineRecordReader per file.
            return new CombineFileRecordReader(job, (CombineFileSplit) split, reporter,
                                               (Class) MyKeyValueLineRecordReader.class);
        }
    }

Then you need to extend the TextLine class and make it use the input format you just defined (Scala from now on).

    import cascading.scheme.hadoop.TextLine
    import cascading.flow.FlowProcess
    import org.apache.hadoop.mapred.{OutputCollector, RecordReader, JobConf}
    import cascading.tap.Tap
    import com.twitter.scalding.{FixedPathSource, TextLineScheme}
    import cascading.scheme.Scheme

    class CombineFileTextLine extends TextLine {
      override def sourceConfInit(flowProcess: FlowProcess[JobConf],
                                  tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]],
                                  conf: JobConf) {
        super.sourceConfInit(flowProcess, tap, conf)
        // Swap in the combining input format defined above.
        conf.setInputFormat(classOf[CombinedInputFormat[String, String]])
      }
    }
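One caveat, not from the original answer: without a cap on the combined split size, CombineFileInputFormat may pack all files into a single split, i.e. one mapper for everything. A maximum split size can be set through configuration, but the property key differs between Hadoop versions, so treat the sketch below as an assumption to verify against your cluster.

    // Hypothetical addition inside sourceConfInit, capping combined splits at 256 MB.
    // Newer Hadoop releases read "mapreduce.input.fileinputformat.split.maxsize";
    // older ones may expect "mapred.max.split.size" - check your version's docs.
    conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 256L * 1024 * 1024)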

Create a scheme for the combined input.

    trait CombineFileTextLineScheme extends TextLineScheme {
      override def hdfsScheme = new CombineFileTextLine()
        .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
    }

Finally, create your source class:

    case class CombineFileMultipleTextLine(p: String*) extends FixedPathSource(p: _*)
      with CombineFileTextLineScheme

If you want a single path instead of several, the change to the source class is trivial.
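For completeness, a minimal sketch of how the new source might be used in a Scalding job. The job name, field names, and paths are made up for illustration; only CombineFileMultipleTextLine comes from the code above.

    import com.twitter.scalding._

    // Hypothetical job: reads many small files through the combining source,
    // so globs matching thousands of files no longer mean thousands of mappers.
    class CombinedWordCountJob(args: Args) extends Job(args) {
      CombineFileMultipleTextLine("input/part-*", "input2/part-*")
        .flatMap('line -> 'word) { line: String => line.split("""\s+""") }
        .groupBy('word) { _.size }
        .write(Tsv(args("output")))
    }

The pipeline itself is ordinary Scalding; only the source tap changes, which is the point of wrapping the input format in a Scheme.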

I hope this helps.

+9
May 29 '14 at 14:06

That should do the trick, as far as I know - https://wiki.apache.org/hadoop/HowManyMapsAndReduces

0
May 28 '14 at 20:41