When are cache and persist actually executed and saved (since they don't seem to be actions)?

I am working on a Spark application; below is an example fragment (not the actual code):

val rdd1 = sc.textFile(HDFS_PATH)
val rdd2 = rdd1.map(func)
rdd2.persist(StorageLevel.MEMORY_AND_DISK)
println(rdd2.count)

When I check the performance of this code in the Spark UI, I see an entry for the count action, but none for persist. The DAG for the count action also has a node for the map transformation (line 2 of the code above).

Is it correct to conclude that the map transformation is executed when count is reached (on the last line), and not when persist is reached?

Also, at what point is rdd2 actually persisted? I understand that only two types of operations can be invoked on RDDs: transformations and actions. If the RDD is persisted lazily when the count action is called, is persist considered a transformation, an action, or neither?

+4
2 answers

Is it possible to conclude that the map transformation is performed when count is encountered (on the last line), and not when persist is encountered?

Yes

Also, at what point is rdd2 actually persisted?

The data is read, mapped, and persisted at the same time, during the execution of the count statement.

Will persist be considered a transformation, an action, or neither?

Neither. Think of persist as a hint: it marks the RDD so that, once an action forces it to be computed, the computed partitions are kept in memory and/or on disk. Spark does no work when persist itself is called; the hint only takes effect the first time an action such as count materializes the RDD.
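A minimal sketch of that behavior (the path and the map function are placeholders): persist returns the same RDD and only records the requested storage level, and no job runs until count.

import org.apache.spark.storage.StorageLevel

val rdd1 = sc.textFile("hdfs:///some/placeholder/path")
val rdd2 = rdd1.map(_.toUpperCase)

// persist returns rdd2 itself and merely records the storage level;
// no Spark job is launched by this call.
rdd2.persist(StorageLevel.MEMORY_AND_DISK)
println(rdd2.getStorageLevel)  // e.g. StorageLevel(disk, memory, deserialized, 1 replicas)

// The first action computes the map and fills the cache as a side effect.
println(rdd2.count)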

+1

To be precise, cache and persist are lazy: by themselves they do nothing (they only mark the RDD to be cached once it has been computed by an action).

Quoting the official Spark documentation on RDD persistence:

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark's cache is fault-tolerant: if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.
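For reference, a small sketch of how the two methods relate (the path is a placeholder): for RDDs, cache() is just shorthand for persist() with the default MEMORY_ONLY storage level.

import org.apache.spark.storage.StorageLevel

val words = sc.textFile("hdfs:///some/placeholder/path").flatMap(_.split("\\s+"))

// cache() delegates to persist(StorageLevel.MEMORY_ONLY), so this...
words.cache()
// ...is equivalent to:
// words.persist(StorageLevel.MEMORY_ONLY)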

That is the reason why some people (and, more importantly, Spark SQL itself!) do the following:

rdd2.persist(StorageLevel.MEMORY_AND_DISK).count

to trigger the caching eagerly.

count is an action that runs a Spark job to materialize the RDD, and as a side effect the data is cached (because the RDD was marked to be persisted).
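A hedged illustration of that side effect (the path is a placeholder): after the eager persist-and-count, the RDD's lineage reports cached partitions, and later actions read from the cache instead of re-reading HDFS.

import org.apache.spark.storage.StorageLevel

val rdd2 = sc.textFile("hdfs:///some/placeholder/path").map(identity)
rdd2.persist(StorageLevel.MEMORY_AND_DISK).count

// After the count, toDebugString shows the cached partitions, e.g.
// "CachedPartitions: 8; MemorySize: ..." (exact wording varies by version).
println(rdd2.toDebugString)

rdd2.count  // served from the cache, not recomputed from HDFS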

Coming back to your question:

Will persist be considered a transformation or an action, or neither?

I'd say neither: it is just an optimization hint (it merely marks the RDD for caching once it is computed).


" -", , ( RDD) .

[screenshot: DAG visualization in the web UI with the cached RDD marked by a green dot]
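Beyond the UI, you can query the SparkContext for everything currently marked as persistent; a small sketch using the getPersistentRDDs developer API:

// Map of RDD id -> RDD for every RDD marked as persistent in this context.
sc.getPersistentRDDs.foreach { case (id, rdd) =>
  // rdd.name is null unless setName was called on the RDD.
  println(s"RDD $id (${rdd.name}): ${rdd.getStorageLevel}")
}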

Whether cache or persist has taken effect can also be checked with explain (or directly via QueryExecution.optimizedPlan):

val q1 = spark.range(10).groupBy('id % 5).count.cache
scala> q1.explain
== Physical Plan ==
InMemoryTableScan [(id % 5)#84L, count#83L]
   +- InMemoryRelation [(id % 5)#84L, count#83L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *HashAggregate(keys=[(id#77L % 5)#88L], functions=[count(1)])
            +- Exchange hashpartitioning((id#77L % 5)#88L, 200)
               +- *HashAggregate(keys=[(id#77L % 5) AS (id#77L % 5)#88L], functions=[partial_count(1)])
                  +- *Range (0, 10, step=1, splits=8)

scala> println(q1.queryExecution.optimizedPlan.numberedTreeString)
00 InMemoryRelation [(id % 5)#84L, count#83L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
01    +- *HashAggregate(keys=[(id#77L % 5)#88L], functions=[count(1)], output=[(id % 5)#84L, count#83L])
02       +- Exchange hashpartitioning((id#77L % 5)#88L, 200)
03          +- *HashAggregate(keys=[(id#77L % 5) AS (id#77L % 5)#88L], functions=[partial_count(1)], output=[(id#77L % 5)#88L, count#90L])
04             +- *Range (0, 10, step=1, splits=8)
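For Datasets there is also a direct accessor, storageLevel (available since Spark 2.1), which returns the current storage level, or StorageLevel.NONE when the Dataset is not cached. A sketch (exact REPL output may vary by version):

scala> q1.storageLevel
res0: org.apache.spark.storage.StorageLevel = StorageLevel(disk, memory, deserialized, 1 replicas)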

// Cache sample table range5 using pure SQL
// That registers range5 to contain the output of range(5) function
spark.sql("CACHE TABLE range5 AS SELECT * FROM range(5)")
val q2 = spark.sql("SELECT * FROM range5")
scala> q2.explain
== Physical Plan ==
InMemoryTableScan [id#0L]
   +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `range5`
         +- *Range (0, 5, step=1, splits=8)

The InMemoryTableScan physical operator (over an InMemoryRelation) is how you can make sure that the query is cached and that, when executed, it will read the data from the cache rather than recompute it.
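You can also ask the catalog directly whether a given table is cached; a small sketch using the Catalog API:

scala> spark.catalog.isCached("range5")
res1: Boolean = true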


One more thing worth noting: Spark SQL makes caching eager for the SQL CACHE TABLE query, immediately running a count over the table (unlike RDD caching, which is lazy):

if (!isLazy) {
  // Performs eager caching
  sparkSession.table(tableIdent).count()
}

So if you have to choose between the two APIs, keep the difference in mind: cache and persist are lazy, while SQL's CACHE TABLE is eager by default.
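If you want the SQL route with lazy semantics instead, CACHE TABLE accepts a LAZY keyword (the table name below is a fresh placeholder):

// LAZY defers the actual caching until the first action touches the table.
spark.sql("CACHE LAZY TABLE lazy5 AS SELECT * FROM range(5)")

// UNCACHE TABLE removes the table from the cache again.
spark.sql("UNCACHE TABLE lazy5")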

+8

Source: https://habr.com/ru/post/1677256/

