Merge two Spark DataFrames based on the size of the intersection of two array columns

I have two DataFrames in Spark (v1.5.0):

aDF = [user_id: Int, user_purchases: array<int>]
bDF = [user_id: Int, user_purchases: array<int>]

What I want to do is join these two DataFrames, but I only need the rows where the intersection between aDF.user_purchases and bDF.user_purchases has more than two elements (intersection size > 2).

Do I need to use the RDD API, or can I do this with functions from org.apache.spark.sql.functions?
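
For example, with made-up data like this, only the pair of users (1, 10) should survive, since they share three purchases:

val aDF = sc.parallelize(Seq(
  (1, Array(1, 2, 3, 4)),
  (2, Array(1, 2))
)).toDF("user_id", "user_purchases")

val bDF = sc.parallelize(Seq(
  (10, Array(1, 2, 3)), // shares {1, 2, 3} with user 1 -> keep
  (20, Array(1, 5))     // shares at most one element with anyone -> drop
)).toDF("user_id", "user_purchases")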

2 answers

I don't see a built-in function for this, but you can use a UDF:

import org.apache.spark.sql.functions.udf
import scala.collection.mutable.WrappedArray

// count elements of a that also occur in b
// (duplicates in a are counted more than once)
val intersect = udf((a: WrappedArray[Int], b: WrappedArray[Int]) =>
  a.count(x => b.contains(x))
)
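
Note that this counts duplicates in a separately. If the arrays may contain duplicates and you want the true set-intersection size, a minimal variant (same imports as above) would be:

val intersectSize = udf((a: WrappedArray[Int], b: WrappedArray[Int]) =>
  // size of the set intersection, ignoring duplicates
  a.toSet.intersect(b.toSet).size
)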
// test data sets
val one = sc.parallelize(List(
    (1, Array(1, 2, 3)),
    (2, Array(1, 2, 3, 4)),
    (3, Array(1, 2, 3)),
    (4, Array(1, 2))
  )).toDF("user", "arr")

val two = sc.parallelize(List(
    (1, Array(1, 2, 3)),
    (2, Array(1, 2, 3, 4)),
    (3, Array(1, 2, 3)),
    (4, Array(1))
  )).toDF("user", "arr")

// usage
import org.apache.spark.sql.functions.col

one.join(two, one("user") === two("user"))
  .select(one("user"), intersect(one("arr"), two("arr")).as("intersect"))
  .where(col("intersect") > 2).show
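
With the test data above, this should print something along these lines (user 4 is filtered out because the intersection has only one element):

+----+---------+
|user|intersect|
+----+---------+
|   1|        3|
|   2|        4|
|   3|        3|
+----+---------+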

// version from the comments: a cartesian join keeping both user columns
one.join(two)
  .select(one("user"), two("user"), intersect(one("arr"), two("arr")).as("intersect"))
  .where('intersect > 2).show

You can also do this without a UDF, using explode. You'll need:

import org.apache.spark.sql.functions.explode

First rename the columns so they don't clash after the join:

val aDF_ = aDF.toDF("a_user_id", "a_user_purchases")
val bDF_ = bDF.toDF("b_user_id", "b_user_purchases")

Then explode, join on the individual purchases, group, count and filter:

val filtered = aDF_.withColumn("purchase", explode($"a_user_purchases"))
  .join(bDF_.withColumn("purchase", explode($"b_user_purchases")), Seq("purchase"))
  .groupBy("a_user_id", "b_user_id")
  .count()
  .where($"count" > 2)

Finally, join the result back with the original data:

filtered.join(aDF_, Seq("a_user_id")).join(bDF_, Seq("b_user_id")).drop("count")
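
One caveat: the count equals the true intersection size only if the purchase arrays contain no duplicates. If they might, a sketch that de-duplicates the exploded pairs first (reusing the renamed columns above):

val aExploded = aDF_
  .select($"a_user_id", explode($"a_user_purchases").as("purchase"))
  .distinct()

val bExploded = bDF_
  .select($"b_user_id", explode($"b_user_purchases").as("purchase"))
  .distinct()

// count now reflects distinct shared purchases
val filteredDistinct = aExploded
  .join(bExploded, Seq("purchase"))
  .groupBy("a_user_id", "b_user_id")
  .count()
  .where($"count" > 2)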

In Spark 2.4 you can also do this directly:

import org.apache.spark.sql.functions.{size, array_intersect}

aDF_
  .crossJoin(bDF_)
  .where(size(
    array_intersect($"a_user_purchases", $"b_user_purchases")
  ) > 2)

Keep in mind that the cross join is likely to be more expensive than the explode-based approach.
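
Equivalently, you can pass the same condition straight to join; since there is no equality predicate, Spark still has to plan a cartesian product, so this is a stylistic choice rather than an optimization:

aDF_.join(
  bDF_,
  size(array_intersect($"a_user_purchases", $"b_user_purchases")) > 2
)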


Source: https://habr.com/ru/post/1662021/

