Has Spark 2.3 changed the way it handles small files?

I just started playing with Spark 2+ (version 2.3), and I noticed something strange when I looked at the Spark UI. I have a list of directories in an HDFS cluster containing a total of 24,000 small files.

When I run a Spark action on them, Spark 1.5 creates a separate task for each input file, which is the behavior I have been used to so far. I know that each HDFS block (in my case, one small file is one block) produces one partition in Spark, and each partition is processed by a separate task.

Spark 1.5 UI screenshot

In addition, the command my_dataframe.rdd.getNumPartitions() outputs 24,000.
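
The one-task-per-block behavior I am used to can be modelled in a few lines of plain Python. This is only a sketch of my mental model, not Spark code: the function name, the 128 MB block size and the 1 MB file size are assumptions made up for illustration.

```python
import math


def partitions_one_per_block(file_sizes, block_size=128 * 1024 * 1024):
    """Count partitions when every HDFS block becomes its own partition.

    block_size is an assumed HDFS block size (128 MB), not a value
    taken from my cluster.
    """
    # A file occupies at least one block, even if it is far smaller than a block.
    return sum(max(1, math.ceil(size / block_size)) for size in file_sizes)


# 24,000 small files of a hypothetical 1 MB each:
# one block, and therefore one partition and one task, per file.
small_files = [1024 * 1024] * 24000
print(partitions_one_per_block(small_files))
```

Under this model the partition count can never drop below the number of files, which is why the number reported by Spark 2.3 surprised me.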

Now about Spark 2.3. For the same input, the command my_dataframe.rdd.getNumPartitions() outputs 1089. The Spark UI also shows 1089 tasks for my Spark action. You can also see that the number of generated jobs is greater in Spark 2.3 than in 1.5.

Spark 2.3 UI screenshot

The code is the same for both versions of Spark (I had to slightly change the data structure, paths and column names, because this is code from work):

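%pyspark
from pyspark.sql.functions import col

# Read all the small Parquet files under the input directories.
dataframe = sqlContext.\
                read.\
                parquet(path_to_my_files)
# Number of partitions Spark created for the input.
dataframe.rdd.getNumPartitions()
# Filter on three columns and print up to 100 untruncated rows.
dataframe.\
    where((col("col1") == 21379051) & (col("col2") == 2281643649) & (col("col3") == 229939942)).\
    select("col1", "col2", "col3").\
    show(100, False)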

Here are the physical plans generated by:

dataframe.where(...).select(...).explain(True)
Spark 1.5
== Physical Plan ==
Filter (((col1 = 21379051) && (col2 = 2281643649)) && (col3 = 229939942))
 Scan ParquetRelation[hdfs://cluster1ns/path_to_file][col1#27,col2#29L,col3#30L]
Code Generation: true

Spark 2.3
== Physical Plan ==
*(1) Project [col1#0, col2#2L, col3#3L]
+- *(1) Filter (((isnotnull(col1#0) && (col1#0 = 21383478)) && (col2 = 2281643641)) && (col3 = 229979603))
   +- *(1) FileScan parquet [col1,col2,col3] Batched: false, Format: Parquet, Location: InMemoryFileIndex[hdfs://cluster1ns/path_to_file..., PartitionFilters: [], PushedFilters: [IsNotNull(col1)], ReadSchema: struct<col1:bigint,col2:bigint,col3:bigint>....

The jobs above were launched from Zeppelin using pyspark. Has anyone else encountered this behavior with Spark 2.3? I have to say that I like the new way of handling many small files, but I would also like to know what changed inside Spark.
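
One thing I plan to check, as an assumption rather than a confirmed explanation: as far as I can tell from the Spark 2.x source, file-based sources such as Parquet pack several small files into one partition, driven by spark.sql.files.maxPartitionBytes (default 128 MB) and spark.sql.files.openCostInBytes (default 4 MB). The plain-Python sketch below mimics that packing. The function name, the file sizes and the parallelism value are made up, and the sketch ignores the splitting of files larger than the target size.

```python
def pack_files(file_sizes, default_parallelism,
               max_partition_bytes=128 * 1024 * 1024,
               open_cost_bytes=4 * 1024 * 1024):
    """Greedily pack files into partitions, loosely following Spark 2.x.

    Simplification: files larger than the target split size are not split.
    """
    # Each file is charged its length plus a fixed "open cost".
    total_bytes = sum(size + open_cost_bytes for size in file_sizes)
    bytes_per_core = total_bytes // default_parallelism
    max_split_bytes = min(max_partition_bytes,
                          max(open_cost_bytes, bytes_per_core))

    partitions, current, current_size = [], [], 0
    # Largest files first, as in a simple decreasing-size bin packing.
    for size in sorted(file_sizes, reverse=True):
        # Close the current partition when the next file would overflow it.
        if current and current_size + size > max_split_bytes:
            partitions.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size + open_cost_bytes
    if current:
        partitions.append(current)
    return partitions


# Hypothetical input: 24,000 files of 1 MB each and a parallelism of 200.
parts = pack_files([1024 * 1024] * 24000, default_parallelism=200)
print(len(parts))
```

With these invented sizes the result is far below 24,000 partitions, which at least matches the direction of what I see. The exact 1089 depends on my real file sizes and cluster settings, which this sketch does not know.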


Source: https://habr.com/ru/post/1695456/

