This operation is not supported by the DataFrame API. You can zip two RDDs, but for that to work both RDDs must have the same number of partitions and the same number of elements per partition. Assuming that is the case:
```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, LongType}

val a: DataFrame = sc.parallelize(Seq(
  ("abc", 123), ("cde", 23))).toDF("column_1", "column_2")
val b: DataFrame = sc.parallelize(Seq(Tuple1(1), Tuple1(2))).toDF("column_3")

// Merge rows
val rows = a.rdd.zip(b.rdd).map {
  case (rowLeft, rowRight) => Row.fromSeq(rowLeft.toSeq ++ rowRight.toSeq)
}

// Merge schemas and build the combined DataFrame
val schema = StructType(a.schema.fields ++ b.schema.fields)
val ab: DataFrame = sqlContext.createDataFrame(rows, schema)
```
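As a quick sanity check, a minimal sketch assuming the snippet above was run in a Spark shell where `sc` and `sqlContext` are already in scope:

```scala
// Each row of `a` is extended with the positionally matching row of `b`
ab.show()
// +--------+--------+--------+
// |column_1|column_2|column_3|
// +--------+--------+--------+
// |     abc|     123|       1|
// |     cde|      23|       2|
// +--------+--------+--------+
```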
If the above conditions are not met, the only option that comes to mind is adding an index and joining on it:
```scala
def addIndex(df: DataFrame) = sqlContext.createDataFrame(
  // Add index
  df.rdd.zipWithIndex.map { case (r, i) => Row.fromSeq(r.toSeq :+ i) },
  // Create schema
  StructType(df.schema.fields :+ StructField("_index", LongType, false))
)

// Add indices
val aWithIndex = addIndex(a)
val bWithIndex = addIndex(b)

// Join and clean
val ab = aWithIndex
  .join(bWithIndex, Seq("_index"))
  .drop("_index")
```
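To see what `addIndex` produces, a hypothetical inspection of one of the intermediate frames (`_index` is just the helper column name chosen above; any name not already used by either DataFrame works):

```scala
// zipWithIndex assigns consecutive 0-based Long ids in row order
aWithIndex.show()
// +--------+--------+------+
// |column_1|column_2|_index|
// +--------+--------+------+
// |     abc|     123|     0|
// |     cde|      23|     1|
// +--------+--------+------+
```

Unlike the zip-based version, this works regardless of how the two DataFrames are partitioned, at the cost of an extra shuffle for the join.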