How to implement NOT IN for two DataFrames with different structure in Apache Spark

I am using Apache Spark in my Java application. I have two DataFrames: df1 and df2. df1 contains rows with email, firstName and lastName. df2 contains rows with email only.

I want to create a DataFrame df3 that contains all the rows in df1 whose email address is not in df2.

Is there any way to do this with Apache Spark? I tried turning df1 and df2 into JavaRDD<String> with toJavaRDD(), mapping each to just the email column, and then using subtract, but I don't know how to map the resulting JavaRDD back to df1 to get a DataFrame.

Basically I need all the rows in df1 whose email address is not in df2.

DataFrame customers = sqlContext.cassandraSql(
    "SELECT email, first_name, last_name FROM customer ");

DataFrame customersWhoOrderedTheProduct = sqlContext.cassandraSql(
    "SELECT email FROM customer_bought_product " +
    "WHERE product_id = '" + productId + "'");

JavaRDD<String> customersBoughtEmail = customersWhoOrderedTheProduct
    .toJavaRDD()
    .map(row -> row.getString(0));

List<String> notBoughtEmails = customers.javaRDD()
    .map(row -> row.getString(0))
    .subtract(customersBoughtEmail)
    .collect();
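For what it's worth, after collecting notBoughtEmails I think I could get back to a DataFrame by filtering customers with isin — a rough sketch of what I have in mind, assuming the collected list is small enough to fit on the driver (isin is available from Spark 1.5):

import static org.apache.spark.sql.functions.col;

// Keep only the customers whose email survived the subtract,
// i.e. who did not buy the product. The collected list must be
// small enough to ship back from the driver.
DataFrame customersWhoHaventOrderedTheProduct =
    customers.filter(col("email").isin(notBoughtEmails.toArray()));

Is this the right way to do it, or is there a better approach?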
2 answers

Spark 2.0.0+

You can use NOT IN directly.
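For example, a minimal sketch from Java (assuming a Spark 2.x SparkSession named spark; the view names are just illustrative):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Expose both DataFrames to SQL as temporary views.
customers.createOrReplaceTempView("customers");
customersWhoOrderedTheProduct.createOrReplaceTempView("customers_who_ordered");

// NOT IN with an uncorrelated subquery is supported from Spark 2.0.0 on.
Dataset<Row> customersWhoHaventOrderedTheProduct = spark.sql(
    "SELECT * FROM customers " +
    "WHERE email NOT IN (SELECT email FROM customers_who_ordered)");

customersWhoHaventOrderedTheProduct.show();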

Spark < 2.0.0

It can be expressed using an outer join and a filter.

val customers = sc.parallelize(Seq(
  ("john@example.com", "John", "Doe"),
  ("jane@example.com", "Jane", "Doe")
)).toDF("email", "first_name", "last_name")

val customersWhoOrderedTheProduct = sc.parallelize(Seq(
  Tuple1("jane@example.com")
)).toDF("email")

val customersWhoHaventOrderedTheProduct = customers.join(
    customersWhoOrderedTheProduct.select($"email".alias("email_")),
    $"email" === $"email_", "leftouter")
  .where($"email_".isNull).drop("email_")

customersWhoHaventOrderedTheProduct.show

// +----------------+----------+---------+
// |           email|first_name|last_name|
// +----------------+----------+---------+
// |john@example.com|      John|      Doe|
// +----------------+----------+---------+

Raw SQL equivalent:

customers.registerTempTable("customers")
customersWhoOrderedTheProduct.registerTempTable(
  "customersWhoOrderedTheProduct")

val query = """SELECT c.* FROM customers c LEFT OUTER JOIN
               customersWhoOrderedTheProduct o
               ON c.email = o.email WHERE o.email IS NULL"""

sqlContext.sql(query).show

// +----------------+----------+---------+
// |           email|first_name|last_name|
// +----------------+----------+---------+
// |john@example.com|      John|      Doe|
// +----------------+----------+---------+
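Since your code is in Java, roughly the same outer join plus filter could look like this (a sketch against the 1.x Java DataFrame API, using org.apache.spark.sql.functions.col; not tested against your schema):

import static org.apache.spark.sql.functions.col;

// Alias the right-hand email column so it can be told apart from
// customers.email after the join, then keep only the rows with no match.
DataFrame customersWhoHaventOrderedTheProduct = customers
    .join(
        customersWhoOrderedTheProduct.select(col("email").alias("email_")),
        customers.col("email").equalTo(col("email_")),
        "leftouter")
    .where(col("email_").isNull())
    .drop("email_");

customersWhoHaventOrderedTheProduct.show();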

I did this in Python. Additionally, I suggest you use integers as keys instead of strings.

from pyspark.sql.types import *

samples = sc.parallelize([
    ("abonsanto@fakemail.com", "Alberto", "Bonsanto"),
    ("mbonsanto@fakemail.com", "Miguel", "Bonsanto"),
    ("stranger@fakemail.com", "Stranger", "Weirdo"),
    ("dbonsanto@fakemail.com", "Dakota", "Bonsanto")
])

keys = sc.parallelize([
    ("abonsanto@fakemail.com",),
    ("mbonsanto@fakemail.com",),
    ("dbonsanto@fakemail.com",)
])

complex_schema = StructType([
    StructField("email", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True)
])

simple_schema = StructType([
    StructField("email", StringType(), True)
])

df1 = sqlContext.createDataFrame(samples, complex_schema)
df2 = sqlContext.createDataFrame(keys, simple_schema)

df1.show()
df2.show()

df3 = df1.join(df2, df1.email == df2.email, "left_outer").where(df2.email.isNull())
df3.show()
