How to use java.time.LocalDate in datasets (crash with java.lang.UnsupportedOperationException: encoder not found)?

  • Spark 2.1.1
  • Scala 2.11.8
  • Java 8
  • Linux Ubuntu 16.04 LTS

I would like to convert my RDD to a dataset. For this, I use the implicits toDS() method, which gives me the following error:

 Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate
 - field (class: "java.time.LocalDate", name: "date")
 - root class: "observatory.TemperatureRow"
     at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602)
     at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596)
     at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587)
     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
     at scala.collection.immutable.List.foreach(List.scala:381)
     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
     at scala.collection.immutable.List.flatMap(List.scala:344)

In my case, I have to use the java.time.LocalDate type; I cannot use java.sql.Date. I read that I need to tell Spark how to convert the Java type to an SQL type, so I wrote the two implicit conversions below:

 implicit def toSerialized(t: TemperatureRow): EncodedTemperatureRow =
   EncodedTemperatureRow(t.date.toString, t.location, t.temperature)

 implicit def fromSerialized(t: EncodedTemperatureRow): TemperatureRow =
   TemperatureRow(LocalDate.parse(t.date), t.location, t.temperature)
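These two conversions do round-trip, which can be checked in isolation (a minimal sketch, assuming the ISO-8601 string form that LocalDate.toString produces; the implicits are written as plain defs here to keep the sketch self-contained):

```scala
import java.time.LocalDate

case class Location(lat: Double, lon: Double)
case class TemperatureRow(date: LocalDate, location: Location, temperature: Double)
case class EncodedTemperatureRow(date: String, location: Location, temperature: Double)

// LocalDate.toString emits ISO-8601 (e.g. "2017-01-01"), which LocalDate.parse
// accepts, so the string round trip is lossless
def toSerialized(t: TemperatureRow): EncodedTemperatureRow =
  EncodedTemperatureRow(t.date.toString, t.location, t.temperature)

def fromSerialized(t: EncodedTemperatureRow): TemperatureRow =
  TemperatureRow(LocalDate.parse(t.date), t.location, t.temperature)
```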

Below is the code of my application:

 case class Location(lat: Double, lon: Double)

 case class TemperatureRow(
   date: LocalDate,
   location: Location,
   temperature: Double
 )

 case class EncodedTemperatureRow(
   date: String,
   location: Location,
   temperature: Double
 )

 val s = Seq[TemperatureRow](
   TemperatureRow(LocalDate.parse("2017-01-01"), Location(1.4, 5.1), 4.9),
   TemperatureRow(LocalDate.parse("2014-04-05"), Location(1.5, 2.5), 5.5)
 )

 import spark.implicits._
 val temps: RDD[TemperatureRow] = sc.parallelize(s)
 val tempsDS = temps.toDS

I don't know why Spark is looking for an encoder for java.time.LocalDate; I provide implicit conversions between TemperatureRow and EncodedTemperatureRow...

1 answer

java.time.LocalDate is not supported up to and including Spark 2.2 (and I spent some time trying to write an Encoder for the type, and failed).

You need to convert java.time.LocalDate to some supported type; java.sql.Timestamp and java.sql.Date are two of the supported candidates.
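One workaround along those lines is to keep java.time.LocalDate in the domain model and map to a parallel case class carrying java.sql.Date just before calling toDS. A sketch (SqlTemperatureRow is a made-up name; the Spark-side call is only in the usage note below, the conversions themselves need nothing beyond the JDK):

```scala
import java.sql.Date
import java.time.LocalDate

case class Location(lat: Double, lon: Double)
case class TemperatureRow(date: LocalDate, location: Location, temperature: Double)

// Hypothetical mirror of TemperatureRow using a date type Spark 2.1 can encode
case class SqlTemperatureRow(date: Date, location: Location, temperature: Double)

// java.sql.Date round-trips with java.time.LocalDate via valueOf/toLocalDate (both Java 8+)
def toSql(t: TemperatureRow): SqlTemperatureRow =
  SqlTemperatureRow(Date.valueOf(t.date), t.location, t.temperature)

def fromSql(r: SqlTemperatureRow): TemperatureRow =
  TemperatureRow(r.date.toLocalDate, r.location, r.temperature)
```

With that in place, something like `temps.map(toSql).toDS` should succeed, since a case class over java.sql.Date, Double, and nested case classes is covered by the built-in product encoder; convert rows back with fromSql after collecting.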


Source: https://habr.com/ru/post/1269985/

