PySpark isin function

I'm a Spark beginner. I am converting my old Python (pandas) code to Spark using PySpark.

I would like to get the PySpark equivalent of the pandas code below:

usersofinterest = actdataall[actdataall['ORDValue'].isin(orddata['ORDER_ID'].unique())]['User ID'] 

Both actdataall and orddata are Spark DataFrames.

I do not want to use the toPandas() function, given the disadvantages associated with it.

Any help is appreciated.

3 answers

So, you have two DataFrames: one is actdataall and the other is orddata. Use the following command to get the result you want:

 usersofinterest = actdataall.where(actdataall['ORDValue'].isin(orddata.select('ORDER_ID').distinct().rdd.flatMap(lambda x: x).collect())).select('User ID')
  • If both datasets are large, you should consider using an inner join, which will act as a filter:

    First, create a DataFrame containing the order IDs we want to keep:

     orderid_df = orddata.select(orddata.ORDER_ID.alias("ORDValue")).distinct() 

    Now join it to our actdataall DataFrame:

     usersofinterest = actdataall.join(orderid_df, "ORDValue", "inner").select('User ID').distinct() 
  • If your target list of order IDs is small, you can use the pyspark.sql isin function shown in furianpandit's answer; don't forget to broadcast your variable before using it (Spark will copy the object to each node, which makes its tasks much faster). A sketch of applying the broadcast value follows after this list:

 orderid_list = orddata.select('ORDER_ID').distinct().rdd.flatMap(lambda x: x).collect()
 orderid_list_broadcast = sc.broadcast(orderid_list)
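
To then use the broadcast list in a filter, read it back through its .value attribute (a minimal sketch, assuming the orderid_list_broadcast handle defined above):

 # filter using the broadcast list via its .value attribute
 usersofinterest = actdataall.where(actdataall['ORDValue'].isin(orderid_list_broadcast.value)).select('User ID')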

The most direct translation of your code:

 from pyspark.sql import functions as F

 # collect all the unique ORDER_IDs to the driver
 order_ids = [x.ORDER_ID for x in orddata.select('ORDER_ID').distinct().collect()]

 # filter ORDValue column by list of order_ids, then select only 'User ID' column
 usersofinterest = actdataall.filter(F.col('ORDValue').isin(order_ids)).select('User ID')

However, you should only filter like this if the number of distinct ORDER_IDs is definitely small (perhaps fewer than 100,000 or so).
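If you are unsure, you can check the cardinality first; the sketch below uses only standard DataFrame operations:

 # count distinct ORDER_IDs to decide which approach to use
 n_order_ids = orddata.select('ORDER_ID').distinct().count()
 print(n_order_ids)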

If the number of distinct ORDER_IDs is large, you should use a broadcast variable, which sends the order_ids list to each executor so it can compare against order_ids locally for faster processing. Note: this will work even if the list of ORDER_IDs is small.

 order_ids = [x.ORDER_ID for x in orddata.select('ORDER_ID').distinct().collect()]
 order_ids_broadcast = sc.broadcast(order_ids)  # send to broadcast variable
 usersofinterest = actdataall.filter(F.col('ORDValue').isin(order_ids_broadcast.value)).select('User ID')

For more information on broadcast variables, check out: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-broadcast.html
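
For completeness, here is a minimal, self-contained sketch of the direct-translation approach with toy data; the SparkSession setup and the sample rows are illustrative, not taken from the question:

 from pyspark.sql import SparkSession
 from pyspark.sql import functions as F

 spark = SparkSession.builder.master('local[*]').appName('isin-demo').getOrCreate()

 # toy stand-ins for the question's DataFrames (illustrative data)
 actdataall = spark.createDataFrame([('u1', 1), ('u2', 2), ('u3', 3)], ['User ID', 'ORDValue'])
 orddata = spark.createDataFrame([(1,), (3,), (3,)], ['ORDER_ID'])

 # collect distinct ORDER_IDs to the driver, then filter with isin
 order_ids = [x.ORDER_ID for x in orddata.select('ORDER_ID').distinct().collect()]
 usersofinterest = actdataall.filter(F.col('ORDValue').isin(order_ids)).select('User ID')
 usersofinterest.show()  # expect rows for u1 and u3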


Source: https://habr.com/ru/post/1271071/

