Using a module with a udf defined inside freezes the PySpark job - explanation?

Here is the situation:

We have a module where we define some functions that return pyspark.sql.DataFrame (DF). To build these DFs we use some pyspark.sql.functions.udf, defined either in the same file or in auxiliary modules. When we actually write the job for pyspark to execute, we only import the functions from our modules (we provide the .zip file to --py-files) and then just save the resulting dataframe to HDFS.
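To make the setup concrete, here is a minimal sketch of the pattern; the module name, function names, column names and paths (my_module, _clean, name, /data/...) are hypothetical placeholders, not our actual code:

    # my_module.py (hypothetical) -- the udf is built at import time, at module level
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    def _clean(value):
        return value.strip().lower() if value is not None else None

    clean_udf = udf(_clean, StringType())   # created as soon as the module is imported

    def add_clean_column(df):
        return df.withColumn("name_clean", clean_udf(df["name"]))

    # job.py -- shipped with --py-files; only imports the function and saves to HDFS
    from pyspark.sql import SparkSession
    from my_module import add_clean_column

    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("/data/in")
    add_clean_column(df).write.parquet("/data/out")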

The problem is that when we do this, the udf functions freeze our job. The ugly fix we discovered was to define the udf functions inside the job itself and pass them the plain functions imported from our module (a sketch of this workaround appears after the class example below). Another fix I found here is to define a class:

    from pyspark.sql.functions import udf

    class Udf(object):
        def __init__(s, func, spark_type):
            s.func, s.spark_type = func, spark_type
        def __call__(s, *args):
            # the real udf is only created here, when the wrapper is called
            # inside the job, not when the module is imported
            return udf(s.func, s.spark_type)(*args)

Then we use this class to define the udfs in our module, and it works!
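For illustration, a sketch of how the module looks with the wrapper (the names are again hypothetical):

    # my_module.py, rewritten with the wrapper: udf() is only called when the
    # column expression is built inside the job, not at import time
    from pyspark.sql.types import StringType

    clean_udf = Udf(_clean, StringType())    # no SparkContext needed at import

    def add_clean_column(df):
        return df.withColumn("name_clean", clean_udf(df["name"]))

And for comparison, the ugly workaround mentioned above looks roughly like this: the module exports only plain Python functions, and the job wraps them in udf() itself after the SparkSession exists:

    # job.py -- workaround: build the udfs inside the job
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType
    from my_module import _clean             # plain function only, no udf in the module

    spark = SparkSession.builder.getOrCreate()
    clean_udf = udf(_clean, StringType())    # udf created after the session exists

    df = spark.read.parquet("/data/in")
    df.withColumn("name_clean", clean_udf(df["name"])).write.parquet("/data/out")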

Can someone explain why we have this problem in the first place? And why does the last fix (the one with the class definition) work?

Additional information: PySpark 2.1.0, submitting jobs to YARN in cluster mode.
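For reference, the jobs are submitted roughly like this (the archive and script names are placeholders):

    spark-submit --master yarn --deploy-mode cluster --py-files modules.zip job.py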

Thanks!
