How to know the number of Spark jobs and stages in a (broadcast) join query?

I am using Spark 2.1.2.

I am trying to understand what is shown in the Jobs tab of the web UI when a query is executed. I am using spark-shell --master local and executing the following join query:

val df = Seq(
  (55, "Canada", -1, "", 0),
  (77, "Ontario", 55, "/55", 1),
  (100, "Toronto", 77, "/55/77", 2),
  (104, "Brampton", 100, "/55/77/100", 3)
).toDF("id", "name", "parentId", "path", "depth")

val dfWithPar = df.as("df1").
  join(df.as("df2"), $"df1.parentId" === $"df2.Id", "leftouter").
  select($"df1.*", $"df2.name" as "parentName")

dfWithPar.show

This is the physical query plan:

== Physical Plan ==
*Project [Id#11, name#12, parentId#13, path#14, depth#15, name#25 AS parentName#63]
+- *BroadcastHashJoin [parentId#13], [Id#24], LeftOuter, BuildRight
   :- LocalTableScan [Id#11, name#12, parentId#13, path#14, depth#15]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- LocalTableScan [Id#24, name#25]

I have two questions about query execution.

  • Why are there two jobs for the query?

    The Jobs view in the Spark web UI

  • Why is the stage view the same for both jobs? Below is a screenshot of the stage view of job id 1, which is exactly the same as that of job id 0. Why?

    The stage view of job 1, which is exactly the same as that of job 0

+4
1 answer

I am using Spark 2.3.0 here (2.3.1-SNAPSHOT, actually) since it is the latest at the time of writing. That changes very little about the explanation, if anything at all, since the physical query plans in your 2.1.2 and my 2.3.0 are fairly similar (with the expected minor differences).


Below is the physical query plan of dfWithPar.show (the structured query written with the Spark SQL Dataset API in Scala), reproduced here for easier reference.

scala> dfWithPar.explain
== Physical Plan ==
*(1) Project [Id#11, name#12, parentId#13, path#14, depth#15, name#24 AS parentName#58]
+- *(1) BroadcastHashJoin [parentId#13], [Id#23], LeftOuter, BuildRight
   :- LocalTableScan [Id#11, name#12, parentId#13, path#14, depth#15]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [Id#23, name#24]

Why are there two jobs for the query?

I would say there are actually three Spark jobs.

The Spark jobs of the broadcast join query in the web UI

tl;dr One Spark job is for the BroadcastHashJoinExec physical operator, whereas the other two are for Dataset.show.

To understand the query execution and the number of Spark jobs of a structured query, you have to understand the difference between structured queries (described with the Dataset API) and the RDD API.

Spark SQL's Datasets and Spark Core's RDDs both describe distributed computations in Spark. RDDs are Spark's "assembly" language (something like JVM bytecode), while Datasets are higher-level descriptions of structured queries in a SQL-like language (something like JVM languages such as Scala or Java, compared to the bytecode they compile to).

What matters is that a structured query written with the Dataset API eventually ends up as an RDD-based distributed computation (which can be compared to how the Java or Scala compilers transform higher-level languages into JVM bytecode).

The Dataset API is an abstraction over the RDD API, and when you call an action on a DataFrame or Dataset, it is translated into computations over RDDs.
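You can inspect that compilation boundary yourself; the following is just a quick sketch (toDebugString simply prints the lineage of the RDD the query compiles to):

// The RDD the structured query compiles down to (rows exposed as the external Row type)
val compiled = dfWithPar.rdd

// The RDD lineage behind the Dataset, roughly Spark SQL's "assembly" output
println(compiled.toDebugString)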

With that, you should not be surprised that Dataset.show in the end calls an RDD action which, in turn, runs zero, one or many Spark jobs.

Dataset.show (with numRows equal to 20 by default) in the end calls showString, which does take(numRows + 1) to get an Array[Row]:

val takeResult = newDf.select(castCols: _*).take(numRows + 1)

In other words, dfWithPar.show() is equivalent to dfWithPar.take(21), which in turn is equivalent to dfWithPar.head(21), as far as the number of Spark jobs is concerned.
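If you want to convince yourself of that equivalence, a simple sketch is to run the three actions one after another and compare the Jobs tab after each one (the exact number of jobs can vary with the number of partitions):

// All three actions go through the same take-based code path,
// so they should trigger the same number of Spark jobs for this query.
dfWithPar.show()     // take(21) under the covers: 20 rows plus 1 to detect truncation
dfWithPar.take(21)   // returns Array[Row]
dfWithPar.head(21)   // same as take(21)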

You can see them and their number of Spark jobs in the SQL tab. They should all be equal.

SQL tab in web user interface

show, take and head all lead to collectFromPlan, which triggers the Spark jobs (by calling executeCollect).

I am sure that to answer your question about the number of jobs, you have to know how all the physical operators in the query work and whether they trigger Spark jobs at runtime.


BroadcastHashJoinExec and BroadcastExchangeExec Physical Operators

The BroadcastHashJoinExec binary physical operator is used when the right side of a join can be broadcast (i.e. its size is below spark.sql.autoBroadcastJoinThreshold, which is 10M by default).
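As a side note, you can check or change that threshold in your own session; a minimal sketch follows (setting the value to -1 disables automatic broadcast joins, in which case the planner typically falls back to a sort-merge join for this equi-join):

// Current value in bytes (10485760, i.e. 10M, by default)
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

// Disable automatic broadcast joins, then build the join again and re-check the plan
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
df.as("df1")
  .join(df.as("df2"), $"df1.parentId" === $"df2.Id", "leftouter")
  .select($"df1.*", $"df2.name" as "parentName")
  .explain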

The BroadcastExchangeExec unary physical operator is used to broadcast the rows of a relation (a dataset) to worker nodes (in support of BroadcastHashJoinExec).

When BroadcastHashJoinExec is executed (to produce an RDD[InternalRow]), it creates a broadcast variable which, in turn, executes BroadcastExchangeExec (on a separate thread).

That is why Spark job 0 with the ThreadPoolExecutor.java:1149 call site was run (the broadcast happens on a separate thread).

You would have seen that single Spark job run if you had executed the following:

// Just a single Spark job for the broadcast variable
val r = dfWithPar.rdd

That requires the structured query to be executed so it can produce the RDD that then becomes the target of whatever action produces the final result.


You would not have had that Spark job at all if you had not ended up with a broadcast join query.

RDD.take Operator

What I missed completely when I first thought about the question is that the Dataset operators, i.e. show, take and head, eventually lead to RDD.take.

take(num: Int): Array[T] — Take the first num elements of the RDD. It works by first scanning one partition, and use the results from that partition to estimate the number of additional partitions needed to satisfy the limit.

Please note the part where take says "It works by first scanning one partition, and use the results from that partition to estimate the number of additional partitions needed to satisfy the limit." That is the key to understanding the number of Spark jobs in your broadcast join query.

Every such iteration is a separate Spark job, starting with the very first partition and scanning 4 times as many partitions in every following iteration:

// RDD.take
def take(num: Int): Array[T] = withScope {
  ...
  while (buf.size < num && partsScanned < totalParts) {
    ...
    val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
    ...
  }
}

Have a look at the following RDD.take with 21 rows:

// The other two Spark jobs
r.take(21)

You will get 2 Spark jobs, just as in your query.


Now guess how many Spark jobs you would have if you executed dfWithPar.show(1).

Why are the stages the same?

Why is the stage view the same for both jobs? Below is a screenshot of the stage view of job id 1, which is exactly the same as that of job id 0. Why?

That is easy to answer since both Spark jobs come from the same RDD.take(20).

The first Spark job scans the first partition and, since there were not enough rows, another Spark job was run to scan more partitions.
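If you prefer not to count the jobs by eye in the web UI, here is a minimal sketch (the listener and counter are just illustrative names, not part of any Spark API) that registers a SparkListener and counts onJobStart events while the query runs:

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Count every Spark job that starts while the action below runs
val jobCount = new AtomicInteger(0)
val countingListener = new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    jobCount.incrementAndGet()
  }
}

spark.sparkContext.addSparkListener(countingListener)
dfWithPar.show()
// Listener events are delivered asynchronously, so give the bus a moment to drain
Thread.sleep(1000)
spark.sparkContext.removeSparkListener(countingListener)
println(s"Spark jobs triggered: ${jobCount.get}")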

+6

Source: https://habr.com/ru/post/1695111/
