Convert Pandas DataFrame to Spark DataFrame

I previously asked how to convert a scipy sparse matrix to a pyspark.sql.dataframe.DataFrame, and made some progress after reading the answer provided there, as well as this article. In the end, I came up with the following code to convert a scipy.sparse.csc_matrix to a pandas DataFrame:

    df = pd.DataFrame(csc_mat.todense()).to_sparse(fill_value=0)
    df.columns = header

Then I tried to convert the pandas DataFrame to a Spark DataFrame using the suggested syntax:

 spark_df = sqlContext.createDataFrame(df) 

However, I get the following error:

 ValueError: cannot create an RDD from type: <type 'list'> 

I do not believe this has anything to do with sqlContext, as I was able to convert another pandas DataFrame of about the same size into a Spark DataFrame with no problem. Any thoughts?

2 answers

to_sparse(fill_value=0) is essentially obsolete. Just use the standard version and, as long as the types are compatible, everything will be fine.
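The answer does not show the call it has in mind. Here is a minimal sketch of what "the standard version" could look like, assuming it means a plain dense pandas DataFrame built without to_sparse. The helper names `csc_to_dense_pdf` and `to_spark_df` and the sample data are made up for illustration; `sql_context` stands for an existing SQLContext such as the `sqlContext` from the question.

```python
import numpy as np
import pandas as pd
from scipy import sparse


def csc_to_dense_pdf(csc_mat, header):
    """Build an ordinary (dense) pandas DataFrame from a SciPy sparse matrix."""
    # toarray() returns a plain ndarray; todense() would return an np.matrix.
    return pd.DataFrame(csc_mat.toarray(), columns=header)


def to_spark_df(sql_context, pdf):
    """Hand the dense frame to Spark (hypothetical wrapper, not called here)."""
    return sql_context.createDataFrame(pdf)


# Small illustrative matrix (made-up data, not from the question).
csc_mat = sparse.csc_matrix(np.array([[1, 0, 0], [0, 0, 2]]))
header = ["a", "b", "c"]
pdf = csc_to_dense_pdf(csc_mat, header)
print(pdf.dtypes)
```

Note that this materializes every zero in memory, so it probably only suits matrices whose dense form fits comfortably on the driver.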


I'm not sure if this question is still relevant for the current version of PySpark, but here is a solution that I developed a couple of weeks after posting this question. The code is pretty ugly and possibly inefficient, but I am posting it here because of the ongoing interest in this subject.

    from functools import reduce

    from py4j.protocol import Py4JJavaError
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import HiveContext

    myConf = SparkConf(loadDefaults=True)
    sc = SparkContext(conf=myConf)
    hc = HiveContext(sc)


    def chunks(lst, k):
        """Yield about k chunks of close to equal size."""
        n = max(1, len(lst) // k)  # integer chunk length, at least 1
        for i in range(0, len(lst), n):
            yield lst[i: i + n]


    def reconstruct_rdd(lst, num_parts):
        """Parallelize lst chunk by chunk and union the pieces into one RDD."""
        prime_rdd = None
        for part, partition in enumerate(chunks(lst, num_parts)):
            print("Partition", part, "started...")
            # partition is a list of lists
            part_rdd = sc.parallelize(partition)
            prime_rdd = part_rdd if prime_rdd is None else prime_rdd.union(part_rdd)
            print("Partition", part, "complete!")
        return prime_rdd


    def build_col_name_list(len_cols):
        """Return Spark's default column names: _1, _2, ..., _<len_cols>."""
        return ["_" + str(i) for i in range(1, len_cols + 1)]


    def set_spark_df_header(header, sdf):
        """Rename the default columns of sdf to the names in header."""
        oldColumns = build_col_name_list(len(sdf.columns))
        newColumns = header
        return reduce(
            lambda df, idx: df.withColumnRenamed(oldColumns[idx], newColumns[idx]),
            range(len(oldColumns)),
            sdf,
        )


    def convert_pdf_matrix_to_sdf(pdf, sdf_header, num_of_parts):
        try:
            sdf = hc.createDataFrame(pdf)
        except ValueError:
            # Need to convert to a list of lists to parallelize
            lst = pdf.values.tolist()
            try:
                rdd = sc.parallelize(lst)
            except Py4JJavaError:
                rdd = reconstruct_rdd(lst, num_of_parts)
            sdf = hc.createDataFrame(rdd)
            sdf = set_spark_df_header(sdf_header, sdf)
        return sdf
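A step-size slicer like `chunks` can yield one chunk more than `k` when the list length is not divisible by `k`. If exactly `k` pieces matter, a variant along these lines might be used instead. This is only a sketch: `split_into_k` is a hypothetical name that is not part of the answer above, and the sample rows are made up.

```python
def split_into_k(lst, k):
    """Yield exactly k contiguous chunks whose sizes differ by at most one."""
    if k <= 0:
        raise ValueError("k must be positive")
    base, extra = divmod(len(lst), k)
    start = 0
    for i in range(k):
        # The first `extra` chunks each get one additional element.
        size = base + (1 if i < extra else 0)
        yield lst[start:start + size]
        start += size


# Ten made-up rows split into three parts.
rows = [[i, i * 2] for i in range(10)]
parts = list(split_into_k(rows, 3))
print([len(p) for p in parts])  # [4, 3, 3]
```

Each part could then be passed to `sc.parallelize` in turn, as `reconstruct_rdd` does. When `k` exceeds the number of rows, some chunks come out empty.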


