Option 1: shadow GzipCodec.
Create your own Java class named GzipCodec in the package org.apache.hadoop.io.compress, placed ahead of Hadoop's on the classpath, and override getDefaultExtension. Hadoop's version is:
// Hadoop's built-in GzipCodec ties the gzip codec to the ".gz" file suffix:
public String getDefaultExtension() {
return ".gz";
}
In your shadowing copy, return something else:
// Shadowing override: report a suffix that never matches real paths, so
// ".gz" files are no longer routed through the gzip codec and are read as-is.
public String getDefaultExtension() {
return ".whatever";
}
Because your class has the same fully-qualified name and comes first on the classpath, it shadows Hadoop's GzipCodec.
After that, textFile() no longer treats ".gz" files as gzip-compressed and reads them as plain text.
Credit to @AlexHall for suggesting the shadowing trick; it is a hack (it affects every job on that classpath), but it works without touching your read code.
Option 2: newAPIHadoopFile with a custom TextInputFormat.
The cleaner, more targeted solution is a custom input format (a TextInputFormat subclass). The stock TextInputFormat — which sparkContext.textFile() uses under the hood to build its RDD — automatically decompresses any file ending in ".gz", which fails when the content is not actually gzipped.
Replace the textFile call,
sparkContext.textFile("s3://mybucket/file.gz")
with sparkContext.newAPIHadoopFile:
import org.apache.hadoop.mapreduce.lib.input.FakeGzInputFormat
import org.apache.spark.SparkContext
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
// Read the ".gz"-suffixed file WITHOUT decompression by passing the custom
// input format explicitly instead of relying on textFile()'s default.
sparkContext.newAPIHadoopFile(
"s3://mybucket/file.gz",
classOf[FakeGzInputFormat], // input format that skips the gzip codec (defined below)
classOf[LongWritable],      // key: byte offset of each record
classOf[Text],              // value: the record text itself
new Configuration(sparkContext.hadoopConfiguration)
)
.map { case (_, text) => text.toString } // drop offsets, keep only the lines
newAPIHadoopFile lets you supply the input format yourself instead of the default TextInputFormat, which would still decompress anything named "*.gz". FakeGzInputFormat is the class that skips that step.
Here is FakeGzInputFormat; it extends TextInputFormat (place it at src/main/java/org/apache/hadoop/mapreduce/lib/input/FakeGzInputFormat.java):
package org.apache.hadoop.mapreduce.lib.input;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import com.google.common.base.Charsets;
public class FakeGzInputFormat extends TextInputFormat {
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit split,
TaskAttemptContext context
) {
String delimiter =
context.getConfiguration().get("textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter)
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
return new FakeGzLineRecordReader(recordDelimiterBytes);
}
}
It returns a custom record reader in place of the standard LineRecordReader.
FakeGzLineRecordReader is essentially a copy of LineRecordReader (put it in src/main/java/org/apache/hadoop/mapreduce/lib/input/FakeGzLineRecordReader.java) whose initialize(InputSplit genericSplit, TaskAttemptContext context) simply omits the compression-codec lookup, so the file is always read uncompressed:
package org.apache.hadoop.mapreduce.lib.input;
// NOTE(review): excerpt only — field declarations, imports, the remaining
// RecordReader methods (nextKeyValue, getCurrentKey, ...), and the closing
// brace are elided here; copy them from Hadoop's LineRecordReader.
public class FakeGzLineRecordReader extends RecordReader<LongWritable, Text> {
// Stores the custom record delimiter (may be null, meaning default newlines).
public FakeGzLineRecordReader(byte[] recordDelimiter) {
this.recordDelimiterBytes = recordDelimiter;
}
// Same as LineRecordReader.initialize, except there is no compression-codec
// lookup: the split is always opened as uncompressed text.
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
// Open the raw file and seek straight to the split — no codec involved.
final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file);
fileIn.seek(start);
// Always use the uncompressed line reader, regardless of file suffix.
in = new UncompressedSplitLineReader(
fileIn, job, this.recordDelimiterBytes, split.getLength());
filePosition = fileIn;
// If this split does not begin at byte 0, skip the partial first line —
// it belongs to (and is read by) the previous split.
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
}
Note: this was written against Spark 2.2. If you are on Spark 2.1, check whether TextInputFormat/LineRecordReader changed between the Hadoop versions bundled with 2.1 and 2.2, and base your copied reader on the code your Spark 2.1 distribution ships.
With this in place, files carrying a .gz extension (but not actually gzip-compressed) are read as ordinary text.