PySpark error: Method __getnewargs__([]) does not exist

I have a set of files. The paths to the files are saved in another file, say "all_files.txt". Using Apache Spark, I need to perform an operation on all the files and combine the results.

The steps I want to take are as follows:

  • Create an RDD by reading "all_files.txt"
  • For each line in "all_files.txt" (each line is a path to some file), read the contents of that file into an RDD
  • Then do the operation with all the contents

This is the code I wrote for it:

    from pyspark.sql import SparkSession

    def return_contents_from_file(file_name):
        return spark.read.text(file_name).rdd.map(lambda r: r[0])

    def run_spark():
        file_name = 'path_to_file'
        spark = SparkSession \
            .builder \
            .appName("PythonWordCount") \
            .getOrCreate()

        counts = (spark.read.text(file_name).rdd
                  .map(lambda r: r[0])                               # this is supposed to return the path to each file
                  .flatMap(return_contents_from_file)                # here I am expecting to club the contents of all files
                  .flatMap(do_operation_on_each_line_of_all_files))  # here I am expecting to do an operation on each line of all files

This causes an error:

    line 323, in get_return_value
    py4j.protocol.Py4JError: An error occurred while calling o25.__getnewargs__. Trace:
    py4j.Py4JException: Method __getnewargs__([]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
        at py4j.Gateway.invoke(Gateway.java:272)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:745)

Can someone please tell me what I am doing wrong and how I should proceed? Thanks in advance.

2 answers

Using spark inside flatMap, or inside any transformation that runs on the executors, is not allowed (the SparkSession is available only on the driver). It is also not possible to create RDDs from within an RDD (see: Is it possible to create nested RDDs in Apache Spark?).

But you can achieve this transformation in a different way: read the contents of all_files.txt to the driver, use a local map to turn each path into a DataFrame, and a local reduce to union them all. See the example:

>>> filenames = spark.read.text('all_files.txt').collect()
>>> dataframes = map(lambda r: spark.read.text(r[0]), filenames)
>>> all_lines_df = reduce(lambda df1, df2: df1.unionAll(df2), dataframes)
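For reference, here is a fuller, self-contained sketch of the same approach (a sketch, not the answer's exact code: it assumes Python 3, where reduce must be imported from functools, and uses union, which supersedes the deprecated unionAll in Spark 2.0+; splitting lines into words stands in for the real per-line operation):

    from functools import reduce  # in Python 3, reduce is no longer a builtin
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("ReadAllFiles").getOrCreate()

    # Runs on the driver: collect the file paths as a plain Python list of Rows
    filenames = spark.read.text('all_files.txt').collect()

    # Still on the driver: build one DataFrame per file, then union them all
    dataframes = [spark.read.text(r[0]) for r in filenames]
    all_lines_df = reduce(lambda df1, df2: df1.union(df2), dataframes)

    # The per-line operation is now an ordinary transformation on the executors
    words = all_lines_df.rdd.flatMap(lambda r: r[0].split())
    print(words.take(10))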

I ran into this problem today and finally figured out that I was referencing a spark.DataFrame object inside a pandas_udf, which leads to this error.

Conclusion:

You cannot use the sparkSession, a spark.DataFrame, or other distributed Spark objects inside a udf or pandas_udf, because they cannot be pickled and shipped to the executors.

If you encounter this error while using a udf, check it carefully; there is probably a related problem.
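To make that conclusion concrete, here is a small sketch of the anti-pattern and one common fix (not code from this answer; the toy data, column names, and the lookup dict are invented for illustration, and it assumes Spark 3.x with pyarrow installed so that pandas_udf can use Python type hints):

    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import pandas_udf
    from pyspark.sql.types import LongType

    spark = SparkSession.builder.appName("PandasUdfExample").getOrCreate()

    df = spark.createDataFrame([(1,), (2,), (3,)], ["x"])
    lookup_df = spark.createDataFrame([(1, 10), (2, 20)], ["x", "y"])

    # BROKEN: referencing the distributed lookup_df inside the udf body.
    # The udf runs on the executors, where a DataFrame cannot be pickled,
    # which surfaces as the __getnewargs__ error above.
    #
    # @pandas_udf(LongType())
    # def broken(x: pd.Series) -> pd.Series:
    #     return x.map(lambda v: lookup_df.filter(lookup_df.x == v).count())

    # WORKING: pull the (small) lookup table to the driver as a plain dict,
    # so only picklable objects are captured by the udf.
    lookup = {row["x"]: row["y"] for row in lookup_df.collect()}

    @pandas_udf(LongType())
    def map_with_lookup(x: pd.Series) -> pd.Series:
        return x.map(lambda v: lookup.get(v, 0))

    df.withColumn("y", map_with_lookup("x")).show()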


Source: https://habr.com/ru/post/1012115/

