How to zip two (or more) DataFrames in Spark

I have two DataFrames, a and b. a looks like:

 Column 1 | Column 2
 abc      | 123
 cde      | 23

and b looks like:

 Column 1
 1
 2

I want to zip a and b (or even more DataFrames) into something like:

 Column 1 | Column 2 | Column 3
 abc      | 123      | 1
 cde      | 23       | 2

How can I do this?

+5
2 answers

This operation is not supported by the DataFrame API. It is possible to zip two RDDs, but for that to work you have to match both the number of partitions and the number of elements per partition. Assuming that is the case:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructField, StructType, LongType}

    val a: DataFrame = sc.parallelize(Seq(
      ("abc", 123),
      ("cde", 23))).toDF("column_1", "column_2")

    val b: DataFrame = sc.parallelize(Seq(Tuple1(1), Tuple1(2))).toDF("column_3")

    // Merge rows
    val rows = a.rdd.zip(b.rdd).map{
      case (rowLeft, rowRight) => Row.fromSeq(rowLeft.toSeq ++ rowRight.toSeq)}

    // Merge schemas
    val schema = StructType(a.schema.fields ++ b.schema.fields)

    // Create new data frame
    val ab: DataFrame = sqlContext.createDataFrame(rows, schema)
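
Before relying on the zip, it is worth checking those preconditions explicitly. This is a small sanity-check sketch of my own, not part of the answer above; partitionSizes is a hypothetical helper, and a and b are assumed to be the DataFrames defined above:

    // Sanity-check sketch: RDD.zip requires equal partition counts
    // and equal element counts per partition.
    val samePartitionCount = a.rdd.partitions.length == b.rdd.partitions.length

    // Count the elements in each partition, in partition order
    def partitionSizes(df: DataFrame): Array[Int] =
      df.rdd.mapPartitions(it => Iterator(it.size)).collect()

    val sameSizesPerPartition = partitionSizes(a).sameElements(partitionSizes(b))

    require(samePartitionCount && sameSizesPerPartition,
      "a.rdd and b.rdd cannot be zipped; fall back to the index join below")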

If the above conditions are not met, the only option that comes to mind is adding an index and joining:

    def addIndex(df: DataFrame) = sqlContext.createDataFrame(
      // Add index
      df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)},
      // Create schema
      StructType(df.schema.fields :+ StructField("_index", LongType, false))
    )

    // Add indices
    val aWithIndex = addIndex(a)
    val bWithIndex = addIndex(b)

    // Join and clean
    val ab = aWithIndex
      .join(bWithIndex, Seq("_index"))
      .drop("_index")
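
Since the question also asks about zipping more than two DataFrames, the same index trick folds naturally over a whole sequence. Here is a sketch under the assumption that all column names across the inputs are distinct; zipDataFrames is my own hypothetical helper, reusing addIndex from above:

    // Sketch: zip any number of DataFrames via the index join.
    // Assumes distinct column names across the inputs.
    def zipDataFrames(dfs: Seq[DataFrame]): DataFrame =
      dfs.map(addIndex)
        .reduce((left, right) => left.join(right, Seq("_index")))
        .drop("_index")

    // zipDataFrames(Seq(a, b)) produces the same result as ab above
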
+16

In Scala's DataFrame implementation, there is no simple way to concatenate two DataFrames into one. We can work around this limitation by adding an index to each row and then doing an inner join on those indices. This is my stub code for that approach:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.monotonicallyIncreasingId

    val a: DataFrame = sc.parallelize(Seq(("abc", 123), ("cde", 23))).toDF("column_1", "column_2")
    val aWithId: DataFrame = a.withColumn("id", monotonicallyIncreasingId)

    val b: DataFrame = sc.parallelize(Seq((1), (2))).toDF("column_3")
    val bWithId: DataFrame = b.withColumn("id", monotonicallyIncreasingId)

    aWithId.join(bWithId, "id")
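
One caveat this answer does not mention: monotonicallyIncreasingId only guarantees increasing, unique ids, not consecutive ones, so the ids of a and b are only certain to line up when both DataFrames have identical partitioning. In Spark 2.x+, where the function is spelled monotonically_increasing_id, one way I would sketch a safer variant is with row_number, which does produce consecutive ids, at the cost of shuffling everything into a single partition:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{monotonically_increasing_id, row_number}

    // Sketch for Spark 2.x+ (assumption: data small enough for one partition).
    // row_number over the increasing ids yields consecutive ids 1, 2, 3, ...
    val w = Window.orderBy(monotonically_increasing_id())
    val aSeq = a.withColumn("id", row_number().over(w))
    val bSeq = b.withColumn("id", row_number().over(w))

    aSeq.join(bSeq, "id").drop("id")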

For a slightly easier read, see how Python does it!

+1

Source: https://habr.com/ru/post/1232685/

