How can I make spark / hadoop ignore the .gz extension in a file and consider it uncompressed plain text?

I have the code in the lines:

val lines: RDD[String] = sparkSession.sparkContext.textFile("s3://mybucket/file.gz")

The URL ends with .gz, but this is the result of legacy code. A file is plain text without compression. However, the spark insists on reading it as a gzip file, which obviously fails. How can I make it ignore the extension and just read the file as text?

Based on this article, I tried configuring in different places where the GZIP codec is not enabled, for example:

sparkContext.getConf.set("spark.hadoop.io.compression.codecs", classOf[DefaultCodec].getCanonicalName)

This has no effect.

Since the files are on S3, I cannot just rename them without copying the whole file.

+4
source share
1

: GzipCodec

/ GzipCodec, org.apache.hadoop.io.compress, java :

public String getDefaultExtension() {
  return ".gz";
}

:

public String getDefaultExtension() {
  return ".whatever";
}

GzipCodec , ( GzipCodec).

, textFile() , , gzip .

. @AlexHall, , (), GzipCodec ( ).


: newAPIHadoopFile / TextInputFormat

(TextInputFormat). , TextInputFormat ( , sparkContext.textFile(), RDD), .gz , , .

textFile,

sparkContext.textFile("s3://mybucket/file.gz")

sparkContext.newAPIHadoopFile:

import org.apache.hadoop.mapreduce.lib.input.FakeGzInputFormat
import org.apache.spark.SparkContext
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}

sparkContext.newAPIHadoopFile(
  "s3://mybucket/file.gz",
  classOf[FakeGzInputFormat],
  classOf[LongWritable],
  classOf[Text],
  new Configuration(sparkContext.hadoopConfiguration)
)
.map { case (_, text) => text.toString }

newAPIHadoopFile TextInputFormat. , ".gz". FakeGzInputFormat, .

, FakeGzInputFormat, TextInputFormat (, src/mainjava/org/apache/hadoop/mapreduce/lib/input/FakeGzInputFormat.java):

package org.apache.hadoop.mapreduce.lib.input;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import com.google.common.base.Charsets;

public class FakeGzInputFormat extends TextInputFormat {

  public RecordReader<LongWritable, Text> createRecordReader(
    InputSplit split,
    TaskAttemptContext context
  ) {

    String delimiter =
      context.getConfiguration().get("textinputformat.record.delimiter");

    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);

    // return new LineRecordReader(recordDelimiterBytes);
    return new FakeGzLineRecordReader(recordDelimiterBytes);
  }
}

LineRecordReader .

, LineRecordReader (in src/mainjava/org/apache/hadoop/mapreduce/lib/input/FakeGzLineRecordReader.java ) initialize(InputSplit genericSplit, TaskAttemptContext context), , gz, :

package org.apache.hadoop.mapreduce.lib.input;
// same imports as original

// public class LineRecordReader extends RecordReader<LongWritable, Text> {
public class FakeGzLineRecordReader extends RecordReader<LongWritable, Text> {

// ...

public FakeGzLineRecordReader(byte[] recordDelimiter) {
  this.recordDelimiterBytes = recordDelimiter;
}

// ...

// Here we remove all the stuff related to compressed files:
public void initialize(InputSplit genericSplit,
                       TaskAttemptContext context) throws IOException {
  FileSplit split = (FileSplit) genericSplit;
  Configuration job = context.getConfiguration();
  this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
  start = split.getStart();
  end = start + split.getLength();
  final Path file = split.getPath();

  // open the file and seek to the start of the split
  final FileSystem fs = file.getFileSystem(job);
  fileIn = fs.open(file);

  fileIn.seek(start);
  in = new UncompressedSplitLineReader(
      fileIn, job, this.recordDelimiterBytes, split.getLength());
  filePosition = fileIn;

  if (start != 0) {
    start += in.readLine(new Text(), 0, maxBytesToConsume(start));
  }
  this.pos = start;
}

: Spark 2.2. Spark 2.1, TextInputFormat 2.1 2.2, TextInputFormat, Spark 2.1.

( gz ) .

+2

Source: https://habr.com/ru/post/1694473/


All Articles