Convert RDD to DataFrame in Spark / Scala

I have an RDD whose collected contents have the type Array[Array[String]] and the following values:

  Array[Array[String]] = Array(Array(4580056797, 0, 2015-07-29 10:38:42, 0, 1, 1), Array(4580056797, 0, 2015-07-29 10:38:42, 0, 1, 1), Array(4580056797, 0, 2015-07-29 10:38:42, 0, 1, 1), Array(4580057445, 0, 2015-07-29 10:40:37, 0, 1, 1), Array(4580057445, 0, 2015-07-29 10:40:37, 0, 1, 1)) 

I want to create a DataFrame with a schema:

 val schemaString = "callId oCallId callTime duration calltype swId" 

Next steps:

 scala> val rowRDD = rdd.map(p => Array(p(0), p(1), p(2), p(3), p(4), p(5).trim))
 rowRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:39

 scala> val calDF = sqlContext.createDataFrame(rowRDD, schema)

This gives the following error:

 <console>:45: error: overloaded method value createDataFrame with alternatives:
   (rdd: org.apache.spark.api.java.JavaRDD[_], beanClass: Class[_]) org.apache.spark.sql.DataFrame
   (rdd: org.apache.spark.rdd.RDD[_], beanClass: Class[_]) org.apache.spark.sql.DataFrame
   (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row], schema: org.apache.spark.sql.types.StructType) org.apache.spark.sql.DataFrame
   (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row], schema: org.apache.spark.sql.types.StructType) org.apache.spark.sql.DataFrame
  cannot be applied to (org.apache.spark.rdd.RDD[Array[String]], org.apache.spark.sql.types.StructType)
        val calDF = sqlContext.createDataFrame(rowRDD, schema)

3 answers

Just paste this into spark-shell:

 val a = Array(
   Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"),
   Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"))

 val rdd = sc.makeRDD(a)

 case class X(callId: String, oCallId: String, callTime: String, duration: String, calltype: String, swId: String)

Then map() over the RDD to instantiate the case class, and create a DataFrame using toDF():

 scala> val df = rdd.map { case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5) }.toDF()
 df: org.apache.spark.sql.DataFrame = [callId: string, oCallId: string, callTime: string, duration: string, calltype: string, swId: string]

This infers the schema from the case class.

Then you can continue:

 scala> df.printSchema()
 root
  |-- callId: string (nullable = true)
  |-- oCallId: string (nullable = true)
  |-- callTime: string (nullable = true)
  |-- duration: string (nullable = true)
  |-- calltype: string (nullable = true)
  |-- swId: string (nullable = true)

 scala> df.show()
 +----------+-------+-------------------+--------+--------+----+
 |    callId|oCallId|           callTime|duration|calltype|swId|
 +----------+-------+-------------------+--------+--------+----+
 |4580056797|      0|2015-07-29 10:38:42|       0|       1|   1|
 |4580056797|      0|2015-07-29 10:38:42|       0|       1|   1|
 +----------+-------+-------------------+--------+--------+----+

If you want to use toDF() in a regular program (and not in spark-shell), make sure to:

  • Import sqlContext.implicits._ immediately after creating the SQLContext
  • Define the case class outside of the method that uses toDF()
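
The two points above can be put together in a standalone application. This is only a minimal sketch, assuming the Spark 1.x API used in this question (SQLContext) and a local master; the object name CallsApp and the application name are hypothetical:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// The case class lives at the top level, outside the method that calls toDF().
case class X(callId: String, oCallId: String, callTime: String,
             duration: String, calltype: String, swId: String)

// Hypothetical application object, for illustration only.
object CallsApp {
  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions; adjust for a real cluster.
    val conf = new SparkConf().setAppName("calls-to-df").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Imported right after the SQLContext is created; this brings toDF() into scope.
    import sqlContext.implicits._

    val rdd = sc.makeRDD(Seq(
      Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"),
      Array("4580057445", "0", "2015-07-29 10:40:37", "0", "1", "1")))

    // Each six-element array becomes one X; the schema is inferred from X.
    val df = rdd.map { case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5) }.toDF()
    df.printSchema()
    df.show()

    sc.stop()
  }
}
```

Note that the pattern match throws a MatchError for any array that does not have exactly six elements, so malformed input lines should be filtered out beforehand.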

You need to convert each Array to a Row, and then define the schema. I assumed that most of your fields are Long.

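  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

  val rdd: RDD[Array[String]] = ???

  // Each array has six elements; the numeric ones are converted to Long.
  val rows: RDD[Row] = rdd map {
    case Array(callId, oCallId, callTime, duration, calltype, swId) =>
      Row(callId.toLong, oCallId.toLong, callTime, duration.toLong, calltype.toLong, swId.trim.toLong)
  }

  // The field types must match the values placed in each Row above.
  object schema {
    val callId = StructField("callId", LongType)
    val oCallId = StructField("oCallId", LongType)
    val callTime = StructField("callTime", StringType)
    val duration = StructField("duration", LongType)
    val calltype = StructField("calltype", LongType)
    val swId = StructField("swId", LongType)
    val struct = StructType(Array(callId, oCallId, callTime, duration, calltype, swId))
  }

  sqlContext.createDataFrame(rows, schema.struct)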

I assume your schema, as in the Spark Guide, is as follows:

 val schema = StructType( schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) 

If you look at the signatures of createDataFrame, this is the one that takes a StructType as its second argument (for Scala):

 def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

Creates a DataFrame from RDDs containing rows using the specified schema.

So its first argument must be an RDD[Row]. What you have in rowRDD is an RDD[Array[String]], so the types do not match.

Do you need an RDD[Array[String]]?

If not, you can use the following to create the RDD[Row] for your DataFrame:

 val rowRDD = rdd.map(p => Row(p(0), p(1), p(2),p(3),p(4),p(5).trim)) 
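
Putting the pieces of this answer together, here is a sketch of the whole conversion, assuming every column stays a string. The object CallsSchema and its helper toDataFrame are hypothetical names introduced only for illustration; the schema string is the one from the question:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical wrapper object, not part of Spark.
object CallsSchema {
  val schemaString = "callId oCallId callTime duration calltype swId"

  // One nullable string column per name in schemaString.
  val schema: StructType = StructType(
    schemaString.split(" ").map(name => StructField(name, StringType, nullable = true)))

  // Hypothetical helper: wraps each six-element array in a Row,
  // then applies the schema through the (RDD[Row], StructType) overload.
  def toDataFrame(sqlContext: SQLContext, rdd: RDD[Array[String]]): DataFrame = {
    val rowRDD: RDD[Row] = rdd.map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5).trim))
    sqlContext.createDataFrame(rowRDD, schema)
  }
}
```

In spark-shell, where sqlContext and the question's rdd are already defined, this would be called as `val calDF = CallsSchema.toDataFrame(sqlContext, rdd)`.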

Source: https://habr.com/ru/post/1233709/

