Functions from a custom module do not work in PySpark, but they work when typed interactively

I wrote a module containing functions that act on PySpark DataFrames. They perform column-by-column conversions on a DataFrame and then return a new DataFrame. Here is sample code, abbreviated to include only one of the functions:

    from pyspark.sql import functions as F
    from pyspark.sql import types as t
    import pandas as pd
    import numpy as np

    metadta=pd.DataFrame(pd.read_csv("metadata.csv"))  # this contains metadata on my dataset

    def str2num(text):
        if type(text)==None or text=='' or text=='NULL' or text=='null':
            return 0
        elif len(text)==1:
            return ord(text)
        else:
            newnum=''
            for lettr in text:
                newnum=newnum+str(ord(lettr))
            return int(newnum)

    str2numUDF = F.udf(lambda s: str2num(s), t.IntegerType())

    def letConvNum(df):  # df is a PySpark DataFrame
        #Get a list of columns that I want to transform, using the metadata Pandas DataFrame
        chng_cols=metadta[(metadta.comments=='letter conversion to num')].col_name.tolist()
        for curcol in chng_cols:
            df=df.withColumn(curcol, str2numUDF(df[curcol]))
        return df

So this is my module; call it mymodule.py. If I start the PySpark shell and do the following:

    import mymodule as mm
    myf=sqlContext.sql("select * from tablename limit 10")

I check myf (a PySpark DataFrame) and it is fine. I verify that mymodule was actually imported by calling the str2num function:

    mm.str2num('a')
    97

So the module really is imported. Then I try this:

    df2=mm.letConvNum(myf)

And do this to verify that it worked:

 df2.show() 

Spark tries to execute the action, but then it fails:

    16/03/10 16:10:44 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 365)
    org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
        command = pickleSer._read_with_length(infile)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
        return self.loads(obj)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
        return pickle.loads(obj)
      File "test2.py", line 16, in <module>
        str2numUDF=F.udf(lambda s: str2num(s), t.IntegerType())
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1460, in udf
        return UserDefinedFunction(f, returnType)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1422, in __init__
        self._judf = self._create_judf(name)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1430, in _create_judf
        pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2317, in _prepare_for_python_RDD
        [x._jbroadcast for x in sc._pickled_broadcast_vars],
    AttributeError: 'NoneType' object has no attribute '_pickled_broadcast_vars'
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
        at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
        at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
        at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:397)
        at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:362)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    16/03/10 16:10:44 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/usr/hdp/2.3.4.0-3485/spark/python/pyspark/sql/dataframe.py", line 256, in show
        print(self._jdf.showString(n, truncate))
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
      File "/usr/hdp/2.3.4.0-3485/spark/python/pyspark/sql/utils.py", line 36, in deco
        return f(*a, **kw)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o7299.showString.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 365, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
        command = pickleSer._read_with_length(infile)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
        return self.loads(obj)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
        return pickle.loads(obj)
      File "test2.py", line 16, in <module>
        str2numUDF=F.udf(lambda s: str2num(s), t.IntegerType())
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1460, in udf
        return UserDefinedFunction(f, returnType)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1422, in __init__
        self._judf = self._create_judf(name)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1430, in _create_judf
        pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2317, in _prepare_for_python_RDD
        [x._jbroadcast for x in sc._pickled_broadcast_vars],
    AttributeError: 'NoneType' object has no attribute '_pickled_broadcast_vars'
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
        at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
        at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
        at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:397)
        at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:362)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
        at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
        at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
        at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
        at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
        at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1314)
        at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1377)
        at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
        command = pickleSer._read_with_length(infile)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
        return self.loads(obj)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
        return pickle.loads(obj)
      File "test2.py", line 16, in <module>
        str2numUDF=F.udf(lambda s: str2num(s), t.IntegerType())
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1460, in udf
        return UserDefinedFunction(f, returnType)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1422, in __init__
        self._judf = self._create_judf(name)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1430, in _create_judf
        pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self)
      File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2317, in _prepare_for_python_RDD
        [x._jbroadcast for x in sc._pickled_broadcast_vars],
    AttributeError: 'NoneType' object has no attribute '_pickled_broadcast_vars'
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
        at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
        at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
        at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:397)
        at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:362)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        ... 1 more

As a check, I opened a clean shell and, instead of importing the module, defined the str2num function and the UDF directly in the interactive shell. Then I typed in the contents of the last function and ran the same final check:

 df2.show() 

This time I get back the converted DataFrame that I expected.

Why does this work when the functions are entered interactively but not when they are read from a module? I know the module is being read, since the plain str2num function works.

+4
source share
4 answers

I had the same error and followed the stack trace.

In my case, I was building an Egg file and then passing it to Spark using the --py-files option.

As for the error, I think it comes down to the fact that when you call F.udf(str2num, t.IntegerType()), a UserDefinedFunction instance is created before Spark is running, so it holds an empty reference to some SparkContext; call it sc. When you run the UDF, it references sc._pickled_broadcast_vars, and this throws the AttributeError in your output.

My workaround is to avoid creating the UDF until Spark is up and running (and therefore there is an active SparkContext). In your case, you can just change your definition of letConvNum:
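One possible reading of the trace, offered as a sketch rather than a confirmed diagnosis: the worker reaches test2.py, line 16 from inside pickle.loads, which suggests the module is imported again on the worker while a function is being unpickled, so the module-level F.udf(...) call runs there without a SparkContext. Plain pickle stores a module-level function as a reference (module name plus function name) and resolves it by importing the module on load. The assumption here is that PySpark's serializer treats functions from importable modules the same way, while functions typed into the shell are shipped by value. The helper name pickled_reference is made up for this illustration, and json.dumps merely stands in for any function that lives in a module.

```python
import pickle
from json import dumps as json_dumps  # stand-in for a function defined in a module


def pickled_reference(func):
    """Return the bytes that plain pickle produces for a module-level function."""
    return pickle.dumps(func)


payload = pickled_reference(json_dumps)

# The payload carries only the module name and the function name,
# not the function body.
print(b"json" in payload, b"dumps" in payload)

# Loading resolves the reference by importing the module; if the module has
# not been imported in that process yet, its module-level statements run then.
restored = pickle.loads(payload)
print(restored is json_dumps)
```

If that assumption holds, anything at the top level of mymodule.py has to be safe to execute in a process that has no SparkContext.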

    def letConvNum(df):  # df is a PySpark DataFrame
        #Get a list of columns that I want to transform, using the metadata Pandas DataFrame
        chng_cols=metadta[(metadta.comments=='letter conversion to num')].col_name.tolist()
        str2numUDF = F.udf(str2num, t.IntegerType())  # create UDF on demand
        for curcol in chng_cols:
            df=df.withColumn(curcol, str2numUDF(df[curcol]))
        return df

Note: I have not actually tested the code above, but the change in my own code was similar, and everything worked fine.

Also, for the interested reader, see the Spark source code for UserDefinedFunction.
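If you would rather keep a module-level name for the UDF, a small variation is to defer its construction until first use and then cache it. The following is only a sketch under assumptions: lazy_udf, text_length and _make_text_length_udf are hypothetical names invented for this example, and text_length is a simplified stand-in for str2num.

```python
import functools


def lazy_udf(factory):
    """Defer calling factory() until the wrapper is first used, then reuse the result."""
    build_once = functools.lru_cache(maxsize=None)(factory)

    def apply(*columns):
        # The factory runs on the first call only; later calls hit the cache.
        return build_once()(*columns)

    return apply


def text_length(text):
    # Hypothetical stand-in for str2num: plain Python, safe to import anywhere.
    return 0 if text is None else len(text)


def _make_text_length_udf():
    # Imported here so that merely importing this module never touches Spark.
    from pyspark.sql import functions as F
    from pyspark.sql import types as t
    return F.udf(text_length, t.IntegerType())


# Safe at module level: nothing Spark-related runs until the first call.
text_length_udf = lazy_udf(_make_text_length_udf)
```

Inside letConvNum you would then write df.withColumn(curcol, text_length_udf(df[curcol])), which builds the real UDF the first time it runs on the driver.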

+3
source

If you only use the UDF inside other functions, you can do this:

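    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    class Udf(object):
        def __init__(self, func, spark_type):
            self.func, self.spark_type = func, spark_type

        def __call__(self, *args):
            # the real UDF is only built here, when the wrapper is called
            return udf(self.func, self.spark_type)(*args)

    myfunc_udf = Udf(myfunc, StringType())  # myfunc is your own plain Python function

    def processing():
        df_new = df.select(myfunc_udf('somefield'))  # df is an existing DataFrame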
+1
source

Which Spark version are you on, by the way?

Convert the function to a UDF as follows:

 str2numUDF = F.udf(str2num, t.IntegerType()) 

You do not need the lambda function here.
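A side note on str2num itself: the check type(text)==None can never be true, because type() always returns a class, so a null column value would fall through to len(text) and raise a TypeError. Below is a sketch of a version that tests for None directly. It keeps the character-code concatenation from the question; fits_int32 is a hypothetical helper added only to show that the results for longer strings quickly exceed the signed 32-bit range, so they may not fit an IntegerType column.

```python
INT32_MIN, INT32_MAX = -2**31, 2**31 - 1


def str2num(text):
    """Concatenate the code point of each character; map missing values to 0."""
    if text is None or text in ("", "NULL", "null"):
        return 0
    return int("".join(str(ord(letter)) for letter in text))


def fits_int32(value):
    """Return True if value lies within the signed 32-bit range."""
    return INT32_MIN <= value <= INT32_MAX


print(str2num(None), str2num("a"), str2num("ab"))
print(fits_int32(str2num("abcde")))
```

If longer strings are expected, declaring the UDF with a wider return type such as LongType or StringType may be worth considering; that is a suggestion, not something tested against the setup in the question.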

0
source

I had been banging my head against this problem for 20 hours. Thank you all for solving it!

Here is my version, in case anyone is interested in how I solved the same problem, although it is basically derived from the code and answers above.

The goal is simply to transform the string columns to show their length, but you can do whatever you want (I do data validation and error tracking in my main application).

What I actually use the UDF for is much more complicated, but this is what I did to verify that my UDF now works.

This assumes your DataFrame is all StringType(); in my case I had 4 string columns.

Solution:

I made a separate .py file called myfunctions.

Inside it:

    from pyspark.sql import functions as F
    from pyspark.sql.types import IntegerType
    import logging

    def str2num(text):
        # treat missing or placeholder values as length 0
        if text is None or text == '' or text == 'NULL' or text == 'null':
            return 0
        else:
            return len(text)

    def letConvNum(df, columns):
        str2numUDF = F.udf(str2num, IntegerType())  # created on demand, inside the function
        logging.info(columns)
        for curcol in columns:
            df = df.withColumn(curcol, str2numUDF(df[curcol]))
        return df

Then, in my main script, I add the new .py file to the SparkContext:

    from pyspark.sql.types import StructField, StructType, StringType

    # my understanding is that this ensures your function is shipped to all Spark nodes
    sc.addPyFile("./myfunctions.py")
    import myfunctions as mf  # assumed import; the snippet uses mf without showing it

    # dynamically create headers based on config - simplified for example
    schemaString = "YearMonth,IMEI,IMSI,MSISDN"
    fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(",")]
    schema = StructType(fields)

    df = sqlContext.read.format('com.databricks.spark.csv').options(header='false', inferschema='false', delimiter='|').load('/app/teacosy/invictus/kenya/SAF_QUALCOMM_IMEI_20170321.txt', schema=schema)

    # read and write the file to get parquet; please note this was to optimize MASSIVE files 50-200g
    df.write.parquet("data.parquet", mode='overwrite')
    dataframe = sqlContext.read.parquet("data.parquet")

    df2 = mf.letConvNum(dataframe, schemaString.split(","))
    df2.show()

Input:

    +---------+---------------+---------------+------------+
    |YearMonth|           IMEI|           IMSI|      MSISDN|
    +---------+---------------+---------------+------------+
    |   201609|869859025975610|639021005869699|254724884336|
    |   201609|359521062182040|639021025339132|254721224577|
    |   201609|353121070662770|639021025339132|254721224577|
    |   201609|868096015837410|639021025339132|254721224577|
    |   201609|866204020015610|639021025339132|254721224577|
    |   201609|356051060479107|639028040455896|254710404131|
    |   201609|353071062803703|639027641207269|254725555262|
    |   201609|356899067316490|639027841002602|254711955201|
    |   201609|860357020164930|639028550063234|254715570856|
    |   201609|862245026673900|639028940332785|254728412070|
    |   201609|352441075290910|639029340152407|254714582871|
    |   201609|862074027499277|639029340152407|254714582871|
    |   201609|357036073532528|639028500408346|254715408346|
    |   201609|356546060475230|639021011628783|254722841516|
    |   201609|356546060475220|639021011628783|254722841516|
    |   201609|866838023727117|639028840277749|254718492024|
    |   201609|354210053950950|639029440054836|254729308302|
    |   201609|866912020393040|639029870328080|254725528182|
    |   201609|357921070054540|639028340694869|254710255083|
    |   201609|357977056264767|639027141561199|254721977494|

Output:

    +---------+----+----+------+
    |YearMonth|IMEI|IMSI|MSISDN|
    +---------+----+----+------+
    |        6|  15|  15|    12|
    |        6|  15|  15|    12|
    |        6|  15|  15|    12|
    |        6|  15|  15|    12|
    |        6|  15|  15|    12|
    |        6|  15|  15|    12|
    |        6|  15|  15|    12|
    |        6|  15|  15|    12|
    |        6|  15|  15|    12|
    |        6|  15|  15|    12|
    |        6|  15|  15|    12|
    |        6|  15|  15|    12|
    |        6|  15|  15|    12|
    |        6|  15|  15|    12|
    |        6|  15|  15|    12|
    |        6|  15|  15|    12|
    |        6|  15|  15|    12|
    |        6|  15|  15|    12|
    |        6|  15|  15|    12|
    |        6|  15|  15|    12|

I hope this helps anyone struggling with their PySpark applications freezing or hanging; it is so frustrating.

0
source

Source: https://habr.com/ru/post/1269818/

