Convert Pandas DataFrame to Spark DataFrame

I asked a previous question about how to convert a scipy sparse matrix to a pyspark.sql.dataframe.DataFrame and made some progress after reading the answer provided there, as well as this article. In the end, I came up with the following code to convert a scipy.sparse.csc_matrix to a pandas DataFrame:

 df = pd.DataFrame(csc_mat.todense()).to_sparse(fill_value=0)
 df.columns = header
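
For reference, here is a minimal self-contained version of that step (the matrix and header below are made-up stand-ins for my real data, and to_sparse only works on older pandas versions):

 import numpy as np
 import pandas as pd
 from scipy.sparse import csc_matrix

 # Toy stand-ins for the real csc_mat and header
 csc_mat = csc_matrix(np.array([[0, 1, 0], [2, 0, 3]]))
 header = ["a", "b", "c"]

 df = pd.DataFrame(csc_mat.todense()).to_sparse(fill_value=0)
 df.columns = header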

Then I tried to convert the pandas DataFrame to a Spark DataFrame using the suggested syntax:

 spark_df = sqlContext.createDataFrame(df) 

However, I get the following error:

 ValueError: cannot create an RDD from type: <type 'list'> 

I do not believe this has anything to do with sqlContext, as I was able to convert another pandas DataFrame of about the same size to a Spark DataFrame without a problem. Any thoughts?

2 answers

to_sparse(fill_value=0) is deprecated. Just use the standard (dense) version:

 sqlContext.createDataFrame(pd.DataFrame(csc_mat.todense())) 

and as long as the types are compatible, everything will be fine.
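
On a more recent pySpark the same idea can be written with a SparkSession instead of sqlContext (a sketch, assuming csc_mat and header are the objects from the question):

 import pandas as pd
 from pyspark.sql import SparkSession

 spark = SparkSession.builder.getOrCreate()

 # Build a dense pandas DataFrame with the desired column names,
 # then let Spark infer the schema from the pandas dtypes.
 pdf = pd.DataFrame(csc_mat.todense(), columns=header)
 spark_df = spark.createDataFrame(pdf)
 spark_df.printSchema()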


I'm not sure if this question is still relevant for the current version of pySpark, but here is a solution that I developed a couple of weeks after posting this question. The code is pretty ugly and possibly inefficient, but I am posting it here because of the ongoing interest in this subject.

 from pyspark import SparkContext
 from pyspark.sql import HiveContext
 from pyspark import SparkConf
 from py4j.protocol import Py4JJavaError

 myConf = SparkConf(loadDefaults=True)
 sc = SparkContext(conf=myConf)
 hc = HiveContext(sc)

 def chunks(lst, k):
     """Yield k chunks of close to equal size."""
     n = max(1, len(lst) // k)  # chunk size (at least 1)
     for i in range(0, len(lst), n):
         yield lst[i: i + n]

 def reconstruct_rdd(lst, num_parts):
     """Parallelize lst in chunks and union the pieces into a single RDD."""
     prime_rdd = None
     for part, partition in enumerate(chunks(lst, num_parts)):
         # partition is a list of lists (one inner list per row)
         print "Partition ", part, " started..."
         if prime_rdd is None:
             prime_rdd = sc.parallelize(partition)
         else:
             prime_rdd = prime_rdd.union(sc.parallelize(partition))
         print "Partition ", part, " complete!"
     return prime_rdd

 def build_col_name_list(len_cols):
     """Build the default Spark column names _1, _2, ..., _len_cols."""
     name_lst = []
     for i in range(1, len_cols + 1):
         name_lst.append("_" + str(i))
     return name_lst

 def set_spark_df_header(header, sdf):
     """Rename the default _1, _2, ... columns to the names in header."""
     oldColumns = build_col_name_list(len(sdf.columns))
     newColumns = header
     sdf = reduce(lambda df, idx: df.withColumnRenamed(oldColumns[idx], newColumns[idx]),
                  xrange(len(oldColumns)), sdf)
     return sdf

 def convert_pdf_matrix_to_sdf(pdf, sdf_header, num_of_parts):
     """Convert a pandas DataFrame to a Spark DataFrame, falling back to a
     chunked parallelize/union when the direct conversion fails."""
     try:
         sdf = hc.createDataFrame(pdf)
     except ValueError:
         lst = pdf.values.tolist()  # need a list of lists to parallelize
         try:
             rdd = sc.parallelize(lst)
         except Py4JJavaError:
             rdd = reconstruct_rdd(lst, num_of_parts)
         sdf = hc.createDataFrame(rdd)
         sdf = set_spark_df_header(sdf_header, sdf)
     return sdf
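
A hypothetical call, with pdf built from csc_mat as in the question, header the list of column names, and an arbitrary number of parts:

 spark_df = convert_pdf_matrix_to_sdf(pdf, header, 20)
 spark_df.printSchema()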

Source: https://habr.com/ru/post/1011997/

