I have a module that I wrote containing functions that act on PySpark DataFrames. They perform column-by-column conversion in a DataFrame, and then return a new DataFrame. Here is a sample code abbreviated to include only one of the functions:
from pyspark.sql import functions as F from pyspark.sql import types as t import pandas as pd import numpy as np metadta=pd.DataFrame(pd.read_csv("metadata.csv"))
So this is my module, name it mymodule.py. If I run the PySpark shell, and I do the following:
import mymodule as mm myf=sqlContext.sql("select * from tablename lim 10")
I am checking myf (PySpark DataFrame) and that is fine. I verify that I actually imported mymodule while trying to use the str2num function:
mm.str2num('a') 97
Thus, it actually imports the module. Then if I try this:
df2=mm.letConvNum(df)
And do this to verify that it worked:
df2.show()
He tries to execute the action, but then it fails:
16/03/10 16:10:44 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 365) org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main command = pickleSer._read_with_length(infile) File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length return self.loads(obj) File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads return pickle.loads(obj) File "test2.py", line 16, in <module> str2numUDF=F.udf(lambda s: str2num(s), t.IntegerType()) File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1460, in udf return UserDefinedFunction(f, returnType) File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1422, in __init__ self._judf = self._create_judf(name) File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1430, in _create_judf pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self) File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2317, in _prepare_for_python_RDD [x._jbroadcast for x in sc._pickled_broadcast_vars], AttributeError: 'NoneType' object has no attribute '_pickled_broadcast_vars' at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:397) at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:362) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 16/03/10 16:10:44 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/usr/hdp/2.3.4.0-3485/spark/python/pyspark/sql/dataframe.py", line 256, in show print(self._jdf.showString(n, truncate)) File "/usr/hdp/2.3.4.0-3485/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ File "/usr/hdp/2.3.4.0-3485/spark/python/pyspark/sql/utils.py", line 36, in deco return f(*a, **kw) File "/usr/hdp/2.3.4.0-3485/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o7299.showString. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 365, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main command = pickleSer._read_with_length(infile) File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length return self.loads(obj) File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads return pickle.loads(obj) File "test2.py", line 16, in <module> str2numUDF=F.udf(lambda s: str2num(s), t.IntegerType()) File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1460, in udf return UserDefinedFunction(f, returnType) File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1422, in __init__ self._judf = self._create_judf(name) File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1430, in _create_judf pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self) File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2317, in _prepare_for_python_RDD [x._jbroadcast for x in sc._pickled_broadcast_vars], AttributeError: 'NoneType' object has no attribute '_pickled_broadcast_vars' at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:397) at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:362) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215) at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207) at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385) at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903) at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384) at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1314) at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1377) at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main command = pickleSer._read_with_length(infile) File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length return self.loads(obj) File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads return pickle.loads(obj) File "test2.py", line 16, in <module> str2numUDF=F.udf(lambda s: str2num(s), t.IntegerType()) File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1460, in udf return UserDefinedFunction(f, returnType) File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1422, in __init__ self._judf = self._create_judf(name) File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1430, in _create_judf pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self) File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2317, in _prepare_for_python_RDD [x._jbroadcast for x in sc._pickled_broadcast_vars], AttributeError: 'NoneType' object has no attribute '_pickled_broadcast_vars' at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:397) at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:362) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ... 1 more
As a check, I opened a clean shell and instead of importing the module, I just defined the str2num and UDF function in the interactive shell. Then I entered the contents of the last function and did the same final check:
df2.show()
This time I return the converted DataFrame that I expected.
Why does this work when functions are entered interactively and not when they are read from a module? I know that he is reading a module since the regular str2num function works.