How to load CSV with timestamps in a custom format?

I have a timestamp field in a CSV file that I load into a DataFrame using the Spark CSV library. The same code works on my local machine with Spark 2.0, but it throws an error on Azure Hortonworks (HDP 3.5 and 3.6).

I checked, and Azure HDInsight 3.5 uses the same Spark version, so I don't think the Spark version is the problem.

import org.apache.spark.sql.types._
val sourceFile = "C:\\2017\\datetest"
val sourceSchemaStruct = new StructType()
  .add("EventDate",DataTypes.TimestampType)
  .add("Name",DataTypes.StringType)
val df = spark.read
  .format("com.databricks.spark.csv")
  .option("header","true")
  .option("delimiter","|")
  .option("mode","FAILFAST")
  .option("inferSchema","false")
  .option("dateFormat","yyyy/MM/dd HH:mm:ss.SSS")
  .schema(sourceSchemaStruct)
  .load(sourceFile)

The full exception looks like this:

Caused by: java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
  at java.sql.Timestamp.valueOf(Timestamp.java:237)
  at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:179)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply$mcJ$sp(UnivocityParser.scala:142)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply(UnivocityParser.scala:142)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply(UnivocityParser.scala:142)
  at scala.util.Try.getOrElse(Try.scala:79)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13.apply(UnivocityParser.scala:139)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13.apply(UnivocityParser.scala:135)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$nullSafeDatum(UnivocityParser.scala:179)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9.apply(UnivocityParser.scala:135)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9.apply(UnivocityParser.scala:134)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert(UnivocityParser.scala:215)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.parse(UnivocityParser.scala:187)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:304)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:304)
  at org.apache.spark.sql.execution.datasources.FailureSafeParser.parse(FailureSafeParser.scala:61)
  ... 27 more

The CSV file contains a header and a single data row:

"EventDate"|"Name"
"2016/12/19 00:43:27.583"|"adam"
2 answers

TL;DR Use the timestampFormat option (not dateFormat).


I was able to reproduce the problem on Spark 2.3.0-SNAPSHOT.

// OS shell
$ cat so-43259485.csv
"EventDate"|"Name"
"2016/12/19 00:43:27.583"|"adam"

// spark-shell
scala> spark.version
res1: String = 2.3.0-SNAPSHOT

case class Event(EventDate: java.sql.Timestamp, Name: String)
import org.apache.spark.sql.Encoders
val schema = Encoders.product[Event].schema

scala> spark
  .read
  .format("csv")
  .option("header", true)
  .option("mode","FAILFAST")
  .option("delimiter","|")
  .schema(schema)
  .load("so-43259485.csv")
  .show(false)
17/04/08 11:03:42 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 7)
java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
    at java.sql.Timestamp.valueOf(Timestamp.java:237)
    at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:167)
    at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply$mcJ$sp(UnivocityParser.scala:146)
    at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply(UnivocityParser.scala:146)
    at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply(UnivocityParser.scala:146)
    at scala.util.Try.getOrElse(Try.scala:79)

The following line in the Spark sources is the "root cause" of the problem:

Timestamp.valueOf(s)

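According to the javadoc of Timestamp.valueOf, the argument must be:

a timestamp in the format yyyy-[m]m-[d]d hh:mm:ss[.f...]. The fractional seconds may be omitted. The leading zero for mm and dd may also be omitted.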
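The format requirement quoted above can be checked without Spark, since Timestamp.valueOf is plain JDK code. The following is a minimal sketch (the object name TimestampValueOfDemo is made up for illustration) that feeds the value from the CSV file to Timestamp.valueOf, first as-is and then with the slashes replaced by dashes.

```scala
import java.sql.Timestamp
import scala.util.Try

// Hypothetical demo object, not part of Spark.
object TimestampValueOfDemo {
  // The value exactly as it appears in the CSV file from the question.
  val raw = "2016/12/19 00:43:27.583"

  // Slashes are rejected: valueOf throws IllegalArgumentException.
  val slashes: Try[Timestamp] = Try(Timestamp.valueOf(raw))

  // The same value with dashes matches yyyy-[m]m-[d]d hh:mm:ss[.f...].
  val dashes: Try[Timestamp] = Try(Timestamp.valueOf(raw.replace('/', '-')))

  // The fractional seconds may be omitted entirely.
  val noFraction: Try[Timestamp] = Try(Timestamp.valueOf("2016-12-19 00:43:27"))

  def main(args: Array[String]): Unit = {
    println(slashes.isFailure) // true
    println(dashes)            // Success(2016-12-19 00:43:27.583)
    println(noFraction.isSuccess)
  }
}
```

So the fractional seconds are not what breaks the parse. The separators are, which is why a custom pattern is needed for this file.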

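Note that "the fractional seconds may be omitted", so one option is to first load EventDate as a String, strip the fractional seconds, and only then convert it to a Timestamp.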

val eventsAsString = spark.read.format("csv")
  .option("header", true)
  .option("mode","FAILFAST")
  .option("delimiter","|")
  .load("so-43259485.csv")

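It turns out that for fields of TimestampType, Spark first uses the timestampFormat option, if it is defined, and only otherwise falls back to the code that calls Timestamp.valueOf.

So the fix is simply to use the timestampFormat option (not dateFormat!).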

val df = spark.read
  .format("com.databricks.spark.csv")
  .option("header","true")
  .option("delimiter","|")
  .option("mode","FAILFAST")
  .option("inferSchema","false")
  .option("timestampFormat","yyyy/MM/dd HH:mm:ss.SSS")
  .schema(sourceSchemaStruct)
  .load(sourceFile)
scala> df.show(false)
+-----------------------+----+
|EventDate              |Name|
+-----------------------+----+
|2016-12-19 00:43:27.583|adam|
+-----------------------+----+

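Spark 2.1.0

Use schema inference for CSV via the inferSchema option together with your custom timestampFormat.

Note that schema inference has to be triggered with inferSchema for timestampFormat to take effect here.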

val events = spark.read
  .format("csv")
  .option("header", true)
  .option("mode","FAILFAST")
  .option("delimiter","|")
  .option("inferSchema", true)
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
  .load("so-43259485.csv")

scala> events.show(false)
+-------------------+----+
|EventDate          |Name|
+-------------------+----+
|2016-12-19 00:43:27|adam|
+-------------------+----+

scala> events.printSchema
root
 |-- EventDate: timestamp (nullable = true)
 |-- Name: string (nullable = true)

"Manual" transformation

val events = eventsAsString
  .withColumn("date", split($"EventDate", " ")(0))
  .withColumn("date", translate($"date", "/", "-"))
  .withColumn("time", split($"EventDate", " ")(1))
  .withColumn("time", split($"time", "[.]")(0))    // <-- remove millis part
  .withColumn("EventDate", concat($"date", lit(" "), $"time")) // <-- make EventDate right
  .select($"EventDate" cast "timestamp", $"Name")

scala> events.printSchema
root
 |-- EventDate: timestamp (nullable = true)
 |-- Name: string (nullable = true)

scala> events.show
+-------------------+----+
|          EventDate|Name|
+-------------------+----+
|2016-12-19 00:43:27|adam|
+-------------------+----+

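Spark 2.2.0

As of Spark 2.2, you can use the to_timestamp function to convert a string to a timestamp. Note that the fractional seconds are dropped in the output below.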

scala> eventsAsString.select($"EventDate", to_timestamp($"EventDate", "yyyy/MM/dd HH:mm:ss.SSS")).show(false)
+-----------------------+----------------------------------------------------+
|EventDate              |to_timestamp(`EventDate`, 'yyyy/MM/dd HH:mm:ss.SSS')|
+-----------------------+----------------------------------------------------+
|2016/12/19 00:43:27.583|2016-12-19 00:43:27                                 |
+-----------------------+----------------------------------------------------+

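I searched for this issue and found the GitHub pull request https://github.com/databricks/spark-csv/pull/280, which fixed a bug related to parsing data with a custom date format. I looked through the source code and, as far as I can tell, the cause of your problem is that inferSchema is set to false.

inferSchema: automatically infers column types. It requires one extra pass over the data and is false by default

Try setting inferSchema to true for your date format yyyy/MM/dd HH:mm:ss.SSS, which follows the SimpleDateFormat pattern syntax.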
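Before passing a pattern to the CSV reader, it can help to confirm outside Spark that the pattern really matches the data. Here is a minimal sketch using the JDK's SimpleDateFormat directly; the object name PatternCheck and its parse helper are made up for illustration, and the sketch does not depend on which formatter Spark uses internally.

```scala
import java.sql.Timestamp
import java.text.SimpleDateFormat

// Hypothetical helper, not part of Spark or spark-csv.
object PatternCheck {
  // MM is month and mm is minute in SimpleDateFormat patterns;
  // the "yyyy-mm-dd" in the JDBC error message uses a different notation.
  val pattern = "yyyy/MM/dd HH:mm:ss.SSS"

  private val format = new SimpleDateFormat(pattern)
  format.setLenient(false) // reject values that do not match the pattern

  // Parses a value in the JVM default time zone; throws ParseException on mismatch.
  def parse(s: String): Timestamp = new Timestamp(format.parse(s).getTime)

  def main(args: Array[String]): Unit =
    println(parse("2016/12/19 00:43:27.583")) // 2016-12-19 00:43:27.583
}
```

If the value parses here with the milliseconds intact, the same pattern string is a reasonable candidate for the reader option.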


Source: https://habr.com/ru/post/1674221/

