What you've run into is really strange. After experimenting a little, I suspect the problem lies in the optimizer engine: the issue does not seem to be the UDF itself, but the struct column it receives.
I can make it work (tested on Spark 1.6.3) by caching groupedData; without the cache, I get your reported exception:
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object Demo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local[1]"))
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._
    import org.apache.spark.sql.functions._

    def processName = udf((input: Row) => {
      Map(
        "firstName" -> input.getAs[String]("firstname"),
        "lastName"  -> input.getAs[String]("lastname")
      )
    })

    val inputData = sc.parallelize(
      Seq(("1", "Kevin", "Costner"))
    ).toDF("id", "firstname", "lastname")

    val groupedData = inputData
      .withColumn("name", struct(inputData("firstname"), inputData("lastname")))
      .cache() // does not work without cache

    val udfApply = groupedData.withColumn("newName", processName(groupedData("name")))
    udfApply.show()
  }
}
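If you want to dig into why the cache matters, one way (a debugging sketch, not part of the fix) is to print the analyzed/optimized/physical plans with explain(true), which is available on DataFrame in Spark 1.6, and compare the output with and without .cache():

    // Debugging sketch: compare the plans with and without .cache()
    // to see how the optimizer rewrites the struct column before
    // it reaches the UDF.
    udfApply.explain(true)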
Alternatively, you can drop down to the RDD API and build the struct yourself with a case class, although this is less elegant:
case class Name(firstname: String, lastname: String) // define outside main

val groupedData = inputData.rdd
  .map { r =>
    (
      r.getAs[String]("id"),
      Name(
        r.getAs[String]("firstname"),
        r.getAs[String]("lastname")
      )
    )
  }
  .toDF("id", "name")
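For completeness, a minimal sketch of applying the same UDF to this RDD-built DataFrame (assuming processName and inputData from the first example are still in scope). The case class fields are named firstname/lastname, so the getAs calls inside the UDF resolve the same way as before, and no cache() is needed here:

    // Sketch: apply the same UDF to the RDD-built DataFrame.
    // Assumes processName and inputData from the first example are in scope.
    val udfApply = groupedData.withColumn("newName", processName(groupedData("name")))
    udfApply.show()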