Spark struct field names changed in UDF

I am trying to pass a struct to a Spark UDF, but the field names get changed: they are renamed to positional names based on the column position. How can I fix this?

    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.functions._
    import org.apache.spark.{SparkConf, SparkContext}

    object TestCSV {

      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("localTest").setMaster("local")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        val inputData = sqlContext.read.format("com.databricks.spark.csv")
          .option("delimiter", "|")
          .option("header", "true")
          .load("test.csv")

        inputData.printSchema()
        inputData.show()

        val groupedData = inputData.withColumn("name",
          struct(inputData("firstname"), inputData("lastname")))

        val udfApply = groupedData.withColumn("newName", processName(groupedData("name")))
        udfApply.show()
      }

      def processName = udf((input: Row) => {
        println(input)
        println(input.schema)
        Map("firstName" -> input.getAs[String]("firstname"),
            "lastName" -> input.getAs[String]("lastname"))
      })
    }

Output:

    root
     |-- id: string (nullable = true)
     |-- firstname: string (nullable = true)
     |-- lastname: string (nullable = true)

    +---+---------+--------+
    | id|firstname|lastname|
    +---+---------+--------+
    |  1|     jack| reacher|
    |  2|     john|     Doe|
    +---+---------+--------+

Error:

    [jack,reacher]
    StructType(StructField(i[1],StringType,true), StructField(i[2],StringType,true))
    17/03/08 09:45:35 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
    java.lang.IllegalArgumentException: Field "firstname" does not exist.

1 answer

What you have run into is really strange. After playing around a bit, I concluded that it is likely caused by a problem in the optimizer. It seems the problem is not the UDF but the struct function.
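
One way to convince yourself that the struct definition itself is fine: the analyzed schema of the name column still lists the original field names; the rewritten positional names (i[1], i[2]) only show up in the Row the UDF receives at runtime, as the println(input.schema) in your code demonstrates. A quick check, as a sketch reusing your groupedData from above:

    // Print the analyzed (logical) schema of the struct column; it still
    // shows firstname/lastname, while the runtime Row inside the UDF
    // reports the rewritten names i[1], i[2].
    groupedData.select("name").printSchema()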

I can make it work (Spark 1.6.3) when I cache groupedData; without the caching I get your reported exception:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.{SparkConf, SparkContext}

    object Demo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local[1]"))
        val sqlContext = new HiveContext(sc)
        import sqlContext.implicits._
        import org.apache.spark.sql.functions._

        def processName = udf((input: Row) => {
          Map("firstName" -> input.getAs[String]("firstname"),
              "lastName" -> input.getAs[String]("lastname"))
        })

        val inputData = sc.parallelize(
          Seq(("1", "Kevin", "Costner"))
        ).toDF("id", "firstname", "lastname")

        val groupedData = inputData.withColumn("name",
            struct(inputData("firstname"), inputData("lastname")))
          .cache() // does not work without cache

        val udfApply = groupedData.withColumn("newName", processName(groupedData("name")))
        udfApply.show()
      }
    }
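
If you would rather not rely on caching, a defensive variant of the UDF is to read the struct fields by position instead of by name. This is only a sketch, under the assumption that the optimizer rewrites the field names but preserves their order (which matches the i[1], i[2] naming in the exception); processNameByIndex is a hypothetical name:

    // Access fields by ordinal position, so the UDF keeps working even if
    // the runtime schema carries rewritten field names. Position 0 is
    // firstname and position 1 is lastname, matching the order of the
    // columns passed to struct(...) above.
    def processNameByIndex = udf((input: Row) => {
      Map("firstName" -> input.getString(0),
          "lastName" -> input.getString(1))
    })

    val udfApplyByIndex = groupedData.withColumn("newName", processNameByIndex(groupedData("name")))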

Alternatively, you can use the RDD API to build the struct yourself, although this is not very elegant:

    case class Name(firstname: String, lastname: String) // define outside main

    val groupedData = inputData.rdd
      .map { r =>
        (r.getAs[String]("id"),
         Name(
           r.getAs[String]("firstname"),
           r.getAs[String]("lastname")
         ))
      }
      .toDF("id", "name")
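
Since the case class gives the struct well-defined field names, the original UDF can then be applied unchanged (a sketch, assuming the processName UDF from above is in scope):

    // The name column now has a proper schema with fields
    // firstname/lastname, so getAs[String]("firstname") resolves again.
    groupedData.withColumn("newName", processName(groupedData("name"))).show()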

Source: https://habr.com/ru/post/1265187/

