How to avoid shuffles when joining DataFrames on unique keys?

I have two DataFrames, A and B:

  • A has columns (id, info1, info2), with approximately 200 million rows
  • B has only the column id, with 1 million rows

The column id is unique in both DataFrames.

I want a new DataFrame that filters A down to the ids that also appear in B.

If B were very small, I know I could do something along the lines of

A.filter($"id".isin(bIds: _*)) // with B's id values collected as bIds
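Spelled out (a sketch only; it assumes id is a Long and that B's ids are first collected to the driver):

import spark.implicits._ // the usual SparkSession named spark

// Collect B's ids to the driver, then filter A locally on each executor.
val bIds: Array[Long] = B.select("id").as[Long].collect()
val filteredA = A.filter($"id".isin(bIds: _*))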

But B is still quite large, so it might not all fit in a broadcast variable.

I also know I can use

A.join(B, Seq("id"))

but this does not exploit the uniqueness of id, and I am afraid it will cause unnecessary shuffles.

Is there a way to do this join without shuffling the whole of A?

+4

3 answers:

  • Use a broadcast join if you can. With a broadcast (map-side) join, Spark sends a full copy of the smaller table to every executor, so the rows of the large table never leave the machine they are already on (no shuffle of A).

    A table consisting of a single id column and a million rows is usually only a few megabytes, so it may well fit. You can raise spark.sql.autoBroadcastJoinThreshold so that Spark broadcasts B automatically, or use an explicit broadcast hint; see the sketch after this list.

  • Use the same partitioner. This is an RDD-level technique: if two key-value RDDs were partitioned with the same partitioner (and the same number of partitions) and persisted, records with a given key already sit in corresponding partitions, so a join between them does not need a full shuffle. You pay for one up-front repartitioning, which pays off when the partitioned data is reused, for example for several joins on the same key.

  • If the data is huge and/or your cluster cannot grow to the point where even (2) leads to OOM, use a two-pass approach: first re-partition the data and persist it as partitioned tables (dataframe.write.partitionBy()), then join the sub-partitions one by one in a loop, "appending" to the same final result table.
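To illustrate the first point: the 100 MB value below is only an example, and whether B is actually broadcast depends on its estimated size, but raising the threshold is often enough for a one-column, 1-million-row table.

// Raise the automatic broadcast threshold (in bytes); tables estimated below this size are broadcast.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100L * 1024 * 1024)
val result = A.join(B, Seq("id")) // should now be planned as a broadcast join
result.explain()                  // verify: no shuffle (Exchange) on A's side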

+4

You have two options for joining DataFrame A: the default "Join and Shuffle", or a join that reuses a known partitioner.

Without a known partitioner:

A.join(B, Seq("id"))

By default Spark knows nothing about how either dataset is partitioned, so it hashes the join key on both sides, sends rows with the same key hash across the network to the same machine, and joins them there. That network transfer is the shuffle, and with 200 million rows in A it is the expensive part.

With a HashPartitioner: if you partitionBy() the dataset first and persist it, Spark knows that it is hash-partitioned, and a subsequent join() takes advantage of that information. When you then call A.join(B, Seq("id")), Spark shuffles only B. Since B is far smaller than A, moving B to A's partitions is much cheaper than shuffling A.

For example:

 import org.apache.spark.HashPartitioner
 // An RDD of (id, info) pairs, hash-partitioned once and cached so that
 // later joins on id reuse this partitioning instead of re-shuffling A.
 val aById = sc.sequenceFile[String, String]("hdfs://...")
     .partitionBy(new HashPartitioner(100)) // Create 100 partitions
     .persist()
 aById.join(bById) // bById: B keyed by id; only bById is shuffled
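The snippet above uses the RDD API. A rough DataFrame-level analogue, as a sketch only, is to hash-partition A by id once and cache it; whether the later join actually reuses this partitioning depends on the number of partitions and the Spark version, so check the plan:

import org.apache.spark.sql.functions.col

val aPartitioned = A.repartition(col("id")).persist()
aPartitioned.count() // materialize the cached, partitioned copy
val joined = aPartitioned.join(B, Seq("id"))
joined.explain()     // check whether an Exchange still appears on A's side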


+3

As noted above, if DataFrame B is small enough to be sent to every node, then the matching (i.e. checking each id of DataFrame A against B) happens locally on each node, without a shuffle (i.e. without moving A's rows across the network).

In code, that looks like this:

import org.apache.spark.sql.functions.broadcast

val joinExpr = A.col("id") === B.col("id")

val filtered_A = A.join(broadcast(B), joinExpr, "left_semi")

Run filtered_A.explain() to confirm that the plan uses a broadcast join and does not shuffle A.

+1

Source: https://habr.com/ru/post/1676592/

