Working with a large gzip file in Spark

I have a large (about 85 GB compressed) gzipped file in S3 that I am trying to process with Spark on AWS EMR (currently with one m4.xlarge master instance and two m4.10xlarge core instances, each with a 100 GB EBS volume). I know that gzip is a non-splittable file format, and I have seen it suggested that you should repartition the data, because Spark initially gives you an RDD with a single partition. However, after running

scala> val raw = spark.read.format("com.databricks.spark.csv").
     | options(Map("delimiter" -> "\\t", "codec" -> "org.apache.hadoop.io.compress.GzipCodec")).
     | load("s3://path/to/file.gz").
     | repartition(sc.defaultParallelism * 3)
raw: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_c0: string, _c1: string ... 48 more fields]
scala> raw.count()

and looking at the Spark application UI, I still see only one active executor (the remaining 14 are dead) running a single task, and that task never finishes (or at least I did not wait long enough for it to).

  • What's going on here? Can someone help me understand how Spark works in this example?
  • Should I use a different cluster configuration?
  • Unfortunately, I have no control over the compression mode, but is there an alternative way to deal with such a file?
+3
2 answers

If the file format is not splittable, then there is no way to avoid reading the file in its entirety on one core. In order to parallelize the work, you need a way to assign chunks of the work to different machines. In the gzip case, suppose you cut it up into 128 MB chunks: the nth chunk depends on the (n-1)th chunk's position information to know how to decompress it, which in turn depends on the (n-2)nd chunk, and so on down to the first.

So to get parallelism you effectively have to decompress the file once on a single machine and rewrite it, either uncompressed or with a splittable compression codec (rather than gzip), before running your real job on it.
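
One way to do that, sketched below in Spark Scala (this is not the answer author's code, and the Parquet output path is a made-up placeholder): read the gzipped CSV once as a single task, write it back out in a splittable format such as Parquet, and run the actual analysis against the rewritten copy.

val raw = spark.read.
  option("delimiter", "\t").
  csv("s3://path/to/file.gz")            // gzip => the whole read is one task

raw.write.
  mode("overwrite").
  parquet("s3://path/to/file-parquet/")  // hypothetical path; Parquet is splittable

// Subsequent jobs read the rewritten copy with normal parallelism.
val parallel = spark.read.parquet("s3://path/to/file-parquet/")
parallel.rdd.getNumPartitions            // now well above 1
parallel.count()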

+4

I ran into the same problem.

Instead of passing the .gz file to Spark directly, unzip it first; then Spark can take advantage of its parallelism.

For example, in Python you can decompress the .gz file like this:

import gzip
import shutil

# Stream-decompress file.txt.gz to file.txt without loading it all into memory.
with gzip.open('file.txt.gz', 'rb') as f_in, open('file.txt', 'wb') as f_out:
    shutil.copyfileobj(f_in, f_out)
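
Once the decompressed copy is somewhere the cluster can reach (for example, uploaded back to S3), Spark splits it as usual. A minimal Scala sketch, where the uncompressed path is an assumed placeholder:

// "s3://path/to/file.txt" is an assumed location for the decompressed copy.
// An uncompressed text/CSV file is splittable, so Spark reads it with many
// partitions and the count runs in parallel.
val df = spark.read.
  option("delimiter", "\t").
  csv("s3://path/to/file.txt")

df.rdd.getNumPartitions   // well above 1 for a large uncompressed file
df.count()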
0

Source: https://habr.com/ru/post/1672151/

