Difference between df.repartition and DataFrameWriter partitionBy?

What is the difference between the DataFrame repartition() and DataFrameWriter partitionBy() methods?

I assume both are used to "partition data based on a DataFrame column"? Or is there any difference?

+41
3 answers

If you run repartition(COL) , you change the partitioning during the computation: you will get spark.sql.shuffle.partitions (default: 200) partitions. If you then call .write , you will get one directory with many files.

If you run .write.partitionBy(COL) , you will end up with as many directories as there are unique values in COL. This speeds up subsequent reads of the data (if you filter by the partitioning column) and saves some storage space (the partitioning column is removed from the data files).
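A minimal sketch of both behaviors, assuming a local SparkSession named spark; the sample rows and output path are only illustrative:

 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions.col

 val spark = SparkSession.builder().master("local[*]").appName("partitioning-demo").getOrCreate()
 import spark.implicits._

 val df = Seq(
   ("Ernesto", "Guevara", "Argentina"),
   ("Bruce", "Lee", "China"),
   ("Jack", "Ma", "China")
 ).toDF("first_name", "last_name", "country")

 // repartition(COL) only reshuffles the data in memory; the partition count
 // falls back to spark.sql.shuffle.partitions (200 by default).
 println(df.repartition(col("country")).rdd.getNumPartitions)

 // partitionBy(COL) controls the layout on disk: one country=... directory
 // per distinct value of the column (hypothetical output path).
 df.write.partitionBy("country").csv("/tmp/by_country_demo")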

UPDATE : see @conradlee's answer. It explains in detail not only how the directory structure differs after applying each method, but also how the number of resulting files is determined in both scenarios.

+24

Beware: I believe the accepted answer is not quite right! I'm glad you asked this question, because the behavior of these identically named functions differs in important and unexpected ways that are not well documented in the official Spark documentation.

The first part of the accepted answer is correct: calling df.repartition(COL, numPartitions=k) will create a DataFrame with k partitions using a hash-based partitioner. COL defines the partitioning key here; it can be a single column or a list of columns. The hash-based partitioner takes each input row's partition key and hashes it into the space of k partitions via something like partition = hash(partitionKey) % k . This guarantees that all rows with the same partition key end up in the same partition. However, rows with different partition keys can also end up in the same partition (when the hashes of the partition keys collide), and some partitions may be empty.

So the non-intuitive aspects of df.repartition(COL, numPartitions=k) , illustrated in the sketch after this list, are that

  • partitions will not strictly segregate partition keys
  • some of your k partitions may be empty, while others may contain rows from multiple partition keys.
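A quick way to see both points is to group by spark_partition_id(). This is only a sketch; it assumes a DataFrame df with a day column, as in the scenario discussed next.

 import org.apache.spark.sql.functions.{col, spark_partition_id}

 // Repartition into 5 partitions by the (hypothetical) "day" column, then
 // count rows per (partition, day). Typically fewer than 5 partitions show
 // up (the rest are empty), and a single partition can hold several days.
 val byDay = df.repartition(5, col("day"))
 byDay
   .groupBy(spark_partition_id().as("partition"), col("day"))
   .count()
   .orderBy("partition")
   .show()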

The behavior of df.write.partitionBy is quite different, in a way many users do not expect. Let's say you want your output files to be partitioned by date, and your data spans 7 days. Suppose also that df has 10 partitions to begin with. When you run df.write.partitionBy('day') , how many output files should you expect? The answer is "it depends." If each of the starting partitions in df contains data from every day, then the answer is 70. If each of the starting partitions in df contains data from exactly one day, then the answer is 10.

How can we explain this behavior? When you run df.write , each of the original partitions in df is written independently. That is, each of your 10 starting partitions is sub-partitioned separately on the "day" column, and a separate file is written for each sub-partition.

I find this behavior quite annoying and wish there were a way to do a global repartition when writing data.
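One common workaround, sketched below with the same hypothetical day column, is to repartition on the write column just before the write. All rows for a given day then sit in a single in-memory partition, so each day=... directory ends up with one output file instead of up to one file per input partition:

 import org.apache.spark.sql.functions.col

 df.repartition(col("day"))          // every day's rows end up in one in-memory partition
   .write
   .partitionBy("day")               // one directory per day, now with a single file inside
   .csv("/tmp/partitioned_by_day")   // hypothetical output path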

+131

repartition() is used to partition data in memory, and partitionBy is used to partition data on disk. They are often used together, as described in this blog post.

Both repartition() and partitionBy can be used to "partition data based on a DataFrame column", but repartition() partitions the data in memory, while partitionBy partitions the data on disk.

repartition()

Let's play with some code to better understand partitioning. Suppose you have the following CSV data.

 first_name,last_name,country
 Ernesto,Guevara,Argentina
 Vladimir,Putin,Russia
 Maria,Sharapova,Russia
 Bruce,Lee,China
 Jack,Ma,China

df.repartition(col("country")) repartitions the data by country in memory.

Let's write out the data so we can inspect the contents of each memory partition.

 val outputPath = new java.io.File("./tmp/partitioned_by_country/").getCanonicalPath df.repartition(col("country")) .write .csv(outputPath) 

Here's how the data is written to disk:

 partitioned_by_country/
   part-00002-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv
   part-00044-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv
   part-00059-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv

Each file contains data for one country - for example, the file part-00059-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv contains data for China:

 Bruce,Lee,China
 Jack,Ma,China

partitionBy()

Let's write the data to disk with partitionBy and see how the filesystem output differs.

Here is the code that writes the data out in disk partitions.

 val outputPath = new java.io.File("./tmp/partitionedBy_disk/").getCanonicalPath df .write .partitionBy("country") .csv(outputPath) 

Here's what the data looks like on disk:

 partitionedBy_disk/
   country=Argentina/
     part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000.csv
   country=China/
     part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000.csv
   country=Russia/
     part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000.csv

Why partition data on disk?

Partitioning data on disk can make some queries much faster, as described in this blog post.
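Here is a sketch of the effect, assuming the partitionedBy_disk output written above and the same SparkSession: when a query filters on the partitioning column, Spark can prune whole country=... directories instead of scanning every file.

 import org.apache.spark.sql.functions.col

 // The country column is reconstructed from the country=... directory names.
 val people = spark.read.csv("./tmp/partitionedBy_disk/")

 // Filtering on the partitioning column lets Spark skip the Argentina and
 // China directories entirely (partition pruning).
 people.filter(col("country") === "Russia").show()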

0

Source: https://habr.com/ru/post/1012010/

