Spark: the most efficient way to sort and partition data that needs to be written as Parquet

My data is basically a table that contains an ID column and a GROUP_ID column, among other "data".

In the first step, I read CSV data in Spark, do some processing to prepare it for the second step, and write the result as Parquet. The second step consists of a lot of groupBy('GROUP_ID') and Window.partitionBy('GROUP_ID').orderBy('ID') operations.
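
For illustration, the second step is roughly of this shape (a minimal PySpark sketch; the lag/count calls and output column names are placeholders, not the real processing):

    from pyspark.sql import Window
    import pyspark.sql.functions as F

    # df is the DataFrame prepared in step 1
    # window over each group, ordered by ID (placeholder window function)
    w = Window.partitionBy('GROUP_ID').orderBy('ID')
    with_prev = df.withColumn('prev_id', F.lag('ID').over(w))

    # one of many group-level aggregations
    per_group = df.groupBy('GROUP_ID').agg(F.count('*').alias('n_rows'))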

Now the goal is to avoid shuffling in the second step by writing/loading the data efficiently in the first step, since the first step is a one-time operation.

Question Part 1: AFAIK, Spark preserves the partitioning when loading from Parquet (which is actually the basis of any "optimized write" considerations) - correct?

I came up with three possibilities (TRIP_ID below refers to the grouping column called GROUP_ID above):

  • df.orderBy('ID').write.partitionBy('TRIP_ID').parquet('/path/to/parquet')
  • df.orderBy('ID').repartition(n, 'TRIP_ID').write.parquet('/path/to/parquet')
  • df.repartition(n, 'TRIP_ID').sortWithinPartitions('ID').write.parquet('/path/to/parquet')

I would set n so that the individual parquet files are ~ 100 MB.
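
For what it's worth, here is a minimal sketch of how I would pick n for the third option (the 50 GB input size is an assumption for illustration only, and the CSV size is just a rough proxy for the Parquet output size):

    import math

    input_size_bytes = 50 * 1024**3        # assumed total input size (~50 GB)
    target_file_bytes = 100 * 1024**2      # ~100 MB per output file
    n = max(1, math.ceil(input_size_bytes / target_file_bytes))

    # df is the DataFrame prepared in step 1
    (df.repartition(n, 'TRIP_ID')
       .sortWithinPartitions('ID')
       .write.parquet('/path/to/parquet'))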

Question Part 2: Is it correct that the three options produce "the same" / similar results with respect to the goal (avoiding shuffling in the second step)? If not, what is the difference? And which one is "better"?

Question Part 3: Which of the three options performs best with respect to step 1?

Thank you for sharing your knowledge!


EDIT 2017-07-24

After performing some tests (writing to and reading from Parquet), it seems that Spark cannot restore the partitionBy and orderBy information by default in the second step. The number of partitions (as obtained from df.rdd.getNumPartitions()) is apparently determined by the number of cores and/or by spark.default.parallelism (if set), but not by the number of Parquet partitions. So the answer to question 1 is WRONG, and questions 2 and 3 become irrelevant.
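
A minimal way to reproduce this observation (using the path and column name from the examples above):

    df2 = spark.read.parquet('/path/to/parquet')
    print(df2.rdd.getNumPartitions())  # driven by read-time splitting/defaults, not by how step 1 wrote the data

    # a group-wise query on the reloaded data still plans a shuffle (look for an Exchange node)
    df2.groupBy('TRIP_ID').count().explain()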

So it turns out that the REAL QUESTION is: is there any way to tell Spark that the data is already partitioned by column X and sorted by column Y?

3 answers

As far as I know, NO, there is no way to read data from Parquet and tell Spark that it is already partitioned by some expression and ordered.

In short, one file on HDFS etc. is too big for a single Spark partition. And even if you read the whole file into one partition, playing with Parquet properties such as parquet.split.files=false, parquet.task.side.metadata=true, etc., would cost more than a single shuffle.
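
For completeness, here is one way such Parquet properties could be passed - a sketch only, using the spark.hadoop.* config prefix at session creation; as said above, this is unlikely to beat a single shuffle:

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .config("spark.hadoop.parquet.split.files", "false")        # do not split Parquet files into multiple input splits
             .config("spark.hadoop.parquet.task.side.metadata", "true")  # read Parquet metadata on the task side
             .getOrCreate())

    df = spark.read.parquet('/path/to/parquet')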


Try bucketBy. Partition discovery may also help.
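
A minimal PySpark sketch of bucketBy (the bucket count and table name are made up; note that bucketed output has to be saved as a table with saveAsTable, not with plain .parquet()):

    # df is the DataFrame prepared in step 1
    (df.write
       .bucketBy(50, 'TRIP_ID')         # hash the grouping column into 50 buckets (arbitrary count)
       .sortBy('ID')                    # sort rows by ID within each bucket file
       .mode('overwrite')
       .format('parquet')
       .saveAsTable('trips_bucketed'))  # hypothetical table name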


You might be interested in bucketing support in Spark.

See details here https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-bucketing.html

    large.write
      .bucketBy(4, "id")
      .sortBy("id")
      .mode(SaveMode.Overwrite)
      .saveAsTable(bucketedTableName)

Note that Spark 2.4 adds support for bucket pruning (which is like partition pruning, but for buckets).
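
For example, with the bucketed table sketched earlier (hypothetical name and filter value), an equality filter on the bucketing column lets Spark 2.4+ skip the non-matching buckets:

    import pyspark.sql.functions as F

    spark.table('trips_bucketed').where(F.col('TRIP_ID') == 12345).count()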

The functionality that more directly matches what you are looking for is Hive bucketed sorted tables: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-BucketedSortedTables. This is not yet available in Spark (see the PS section below).

Also note that the sorting information will not be loaded by Spark automatically, but since the data is already sorted, a sort operation on it will actually be much faster, as there is not much work to do: for example, a single pass over the data just to confirm that it is already sorted.
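
As a usage sketch (again with the hypothetical table name from above), you can read the bucketed table back and inspect the plan; bucketing can remove the Exchange for TRIP_ID-wise operations, and any remaining sort over already-sorted data should be comparatively cheap:

    trips = spark.table('trips_bucketed')
    trips.groupBy('TRIP_ID').count().explain()  # check whether an Exchange (shuffle) still appears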

PS. Spark bucketing and Hive bucketing are slightly different. This is the Spark ticket for compatibility with bucketed tables created in Hive: https://issues.apache.org/jira/browse/SPARK-19256


Source: https://habr.com/ru/post/1270034/

