How to improve join speed with a range condition in Spark?

I have two DataFrames, A and B. A is large (100 GB) and B is relatively small (100 MB). A has 8 partitions and B has 1.

A.join(broadcast(B), $"cur" >= $"low" &&  $"cur" <= $"high", "left_outer")

The join is rather slow (> 10 hours).

But if I change the join condition to:

A.join(broadcast(B), $"cur" === $"low" , "left_outer")

it becomes extremely fast (< 30 minutes). However, the condition cannot be changed.

So, is there any way to improve the join speed while keeping my original join condition?

1 answer

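The trick is to rewrite the join condition so that it contains an equality (=) component, which Spark can use to optimize the query and narrow down the possible matches. For numeric values, you can bucketize the data and use the buckets in the join condition.

Let's say your data looks like this: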

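import org.apache.spark.sql.functions._
import spark.implicits._  // assumes a SparkSession named `spark`, as in spark-shell

val a = spark.range(100000)
  .withColumn("cur", (rand(1) * 1000).cast("bigint"))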

val b = spark.range(100)
  .withColumn("low", (rand(42) * 1000).cast("bigint"))
  .withColumn("high", ($"low" + rand(-42) * 10).cast("bigint"))

First, choose a bucket size appropriate for your data. In this case we can use 50:

val bucketSize = 50L

Assign a bucket to each row of a:

val aBucketed = a.withColumn(
  "bucket", ($"cur" / bucketSize).cast("bigint") * bucketSize
)

Create a UDF that emits all the buckets covered by a range:

def get_buckets(bucketSize: Long) =
  udf((low: Long, high: Long) => {
    // First and last bucket touched by the inclusive range [low, high]
    val min = (low / bucketSize) * bucketSize
    val max = (high / bucketSize) * bucketSize
    (min to max by bucketSize).toSeq
  })

and use it to bucket b (one output row per bucket):

val bBucketed = b.withColumn(
  "bucket", explode(get_buckets(bucketSize)($"low",  $"high"))
)
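To see why the bucket equality does not lose any matches, here is a minimal plain-Scala sketch of the same arithmetic, with no Spark involved. The object and function names (`BucketExample`, `bucketOf`, `bucketsOf`) and the sample range [95, 104] are made up for illustration, and the sketch assumes non-negative values, as in the sample data.

```scala
object BucketExample {
  val bucketSize = 50L

  // Bucket of a single value: the largest multiple of bucketSize <= value
  // (valid for non-negative values, which is an assumption of this sketch).
  def bucketOf(value: Long): Long = (value / bucketSize) * bucketSize

  // Every bucket touched by the inclusive range [low, high].
  def bucketsOf(low: Long, high: Long): Seq[Long] =
    (bucketOf(low) to bucketOf(high) by bucketSize).toList

  def main(args: Array[String]): Unit = {
    // Hypothetical row of b whose range straddles the boundary at 100.
    val rangeBuckets = bucketsOf(95L, 104L)
    println(rangeBuckets)                       // List(50, 100)

    // Any cur inside [95, 104] lands in one of those buckets,
    // so the equality part of the join condition still finds it.
    println(rangeBuckets.contains(bucketOf(97L)))   // true (bucket 50)
    println(rangeBuckets.contains(bucketOf(102L)))  // true (bucket 100)
  }
}
```

A range that spans k buckets produces k rows after `explode`, so a smaller bucket size gives more selective matching at the cost of a larger broadcast side.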


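The bucketed DataFrames can then be joined as follows:

aBucketed.join(
  broadcast(bBucketed),
  aBucketed("bucket") === bBucketed("bucket") &&
    $"cur" >= $"low" &&
    $"cur" <= $"high",
  "left_outer"
)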

This way, Spark will use BroadcastHashJoin:

*BroadcastHashJoin [bucket#184L], [bucket#178L], LeftOuter, BuildRight, ((cur#98L >= low#105L) && (cur#98L <= high#109L))
:- *Project [id#95L, cur#98L, (cast((cast(cur#98L as double) / 50.0) as bigint) * 50) AS bucket#184L]
:  +- *Project [id#95L, cast((rand(1) * 1000.0) as bigint) AS cur#98L]
:     +- *Range (0, 100000, step=1, splits=Some(8))
+- BroadcastExchange HashedRelationBroadcastMode(List(input[3, bigint, false]))
   +- Generate explode(if ((isnull(low#105L) || isnull(high#109L))) null else UDF(low#105L, high#109L)), true, false, [bucket#178L]
      +- *Project [id#102L, low#105L, cast((cast(low#105L as double) + (rand(-42) * 10.0)) as bigint) AS high#109L]
         +- *Project [id#102L, cast((rand(42) * 1000.0) as bigint) AS low#105L]
            +- *Range (0, 100, step=1, splits=Some(8))


instead of the BroadcastNestedLoopJoin it picks for the original condition:

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, LeftOuter, ((cur#98L >= low#105L) && (cur#98L <= high#109L))
:- *Project [id#95L, cast((rand(1) * 1000.0) as bigint) AS cur#98L]
:  +- *Range (0, 100000, step=1, splits=Some(8))
+- BroadcastExchange IdentityBroadcastMode
   +- *Project [id#102L, low#105L, cast((cast(low#105L as double) + (rand(-42) * 10.0)) as bigint) AS high#109L]
      +- *Project [id#102L, cast((rand(42) * 1000.0) as bigint) AS low#105L]
         +- *Range (0, 100, step=1, splits=Some(8))


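Alternatively, if a lower-level solution is acceptable, you can broadcast a sorted sequence with constant-time item access (for example, an Array or Vector) and look up the matching range inside a udf, for instance with binary search.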
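The lookup at the heart of such a lower-level approach could look roughly like the sketch below. It is plain Scala and makes a strong assumption that the original does not state: the ranges are sorted by `low` and do not overlap, so at most one range can contain a given `cur`. The names `IntervalLookup`, `Interval`, and `find` are hypothetical.

```scala
object IntervalLookup {
  // One row of B reduced to its range; hypothetical type for this sketch.
  final case class Interval(low: Long, high: Long)

  // Assumes `sorted` is ordered by `low` and the intervals do not overlap.
  def find(sorted: Array[Interval], cur: Long): Option[Interval] = {
    var lo = 0
    var hi = sorted.length - 1
    var candidate = -1
    // Binary search for the last interval whose low <= cur.
    while (lo <= hi) {
      val mid = lo + (hi - lo) / 2
      if (sorted(mid).low <= cur) {
        candidate = mid
        lo = mid + 1
      } else {
        hi = mid - 1
      }
    }
    // The candidate matches only if cur also respects its upper bound.
    if (candidate >= 0 && cur <= sorted(candidate).high) Some(sorted(candidate))
    else None
  }

  def main(args: Array[String]): Unit = {
    val intervals = Array(Interval(10, 20), Interval(30, 35), Interval(100, 150))
    println(find(intervals, 15L))  // Some(Interval(10,20))
    println(find(intervals, 25L))  // None
  }
}
```

In Spark, the sorted array would be collected from B, shipped with `spark.sparkContext.broadcast`, and `find` would be called from a udf on `cur`. If the ranges can overlap, as they may in the sample data, a single binary search is not enough and the lookup has to return every matching range.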

You should also take a look at the number of partitions. 8 partitions for 100 GB seems pretty low.
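As a rough back-of-the-envelope check on the partition count, using the figures from the question (100 GB in 8 partitions): the sketch below assumes a target of 128 MB per partition, which matches the default of `spark.sql.files.maxPartitionBytes` but is only a starting point, not a recommendation from the original answer. The helper name `partitionsFor` is hypothetical.

```scala
object PartitionEstimate {
  private val MB = 1024L * 1024L
  private val GB = 1024L * MB

  // Ceiling division: partitions needed so that none exceeds targetBytes.
  def partitionsFor(totalBytes: Long, targetBytes: Long): Long =
    (totalBytes + targetBytes - 1) / targetBytes

  def main(args: Array[String]): Unit = {
    val total = 100L * GB

    // Current layout from the question: 8 partitions.
    println(total / 8 / MB)                     // 12800 (MB per partition)

    // Assumed target of 128 MB per partition.
    println(partitionsFor(total, 128L * MB))    // 800
  }
}
```

With these assumptions, something like `A.repartition(800)` before the join would be a reasonable first experiment; the best value depends on the cluster and on how large the data is once deserialized.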



