Let's say I have two PySpark DataFrames, df1 and df2:
```
df1:        df2:
 'a'         'b'
  1           3
  2           6
  5
```
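For reference, the two example DataFrames can be created like this (a minimal sketch; it assumes a standard SparkSession named spark, which is not part of my original setup):

```python
from pyspark.sql import SparkSession

# Hypothetical session, just to make the example self-contained
spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([(1,), (2,), (5,)], ["a"])
df2 = spark.createDataFrame([(3,), (6,)], ["b"])
```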
And I want to find the closest value in df2['b'] for each value in df1['a'], and add these closest values as a new column to df1.
In other words, for each value x in df1['a'], I want to find the y in df2['b'] that achieves min(abs(x - y)) (note: it can be assumed that there is only one y that reaches the minimum distance), and the result would be
```
'a' 'b'
 1   3
 2   3
 5   6
```
I tried the following code to first build a distance matrix (before finding the values that reach the minimum distance):
```python
from pyspark.sql import SQLContext
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

# UDF that computes the absolute distance between two column values
def dist(x, y):
    return abs(x - y)

udf_dict = udf(dist, IntegerType())
sql_sc = SQLContext(sc)

udf_dict(df1.a, df2.b)
```
which gives

```
Column<PythonUDF
```
Then I tried

```python
sql_sc.createDataFrame(udf_dict(df1.a, df2.b))
```

which runs forever without producing any error or output.
My questions:
- As I am new to Spark, is my way of building the output DataFrame efficient? (My approach would create a distance matrix for all values of a and b, and then find the minimum for each; see the sketch after these questions.)
- What happens in the last line of my code, and how can I fix it?
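To make the distance-matrix idea concrete, here is an untested sketch of what I have in mind (it assumes Spark 2.x, where crossJoin is available, and the df1/df2 from the example; the window step is just one way to pick the closest b):

```python
from pyspark.sql import functions as F
from pyspark.sql import Window

# Pair every 'a' with every 'b' and compute the distance of each pair
pairs = df1.crossJoin(df2).withColumn("dist", F.abs(F.col("a") - F.col("b")))

# For each 'a', keep only the row with the smallest distance
w = Window.partitionBy("a").orderBy("dist")
closest = (pairs
           .withColumn("rn", F.row_number().over(w))
           .filter(F.col("rn") == 1)
           .select("a", "b"))
```

If this is right, closest.show() should reproduce the expected table above, but I am unsure whether the full cross join is the efficient way to do it.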