Spark SQL - reading csv using schema

I encountered a problem while trying to use Spark to simply read a CSV file. After such an operation, I would like to make sure that:

  • data types are correct (using the provided schema)
  • the headers are correct with respect to the schema provided

This is the code I use and have problems with:

import org.apache.spark.sql.Encoders

val schema = Encoders.product[T].schema
val df = spark.read
 .schema(schema)
 .option("header", "true")
 .csv(fileName)

T is a subtype of Product, i.e. a case class. This works, but it does not validate the column names, so I can provide a different file and, as long as the data types are correct, no error occurs. I then have no way of knowing that the user provided the wrong file, one that just happens to have the correct data types in the right order.

I also tried the option that infers the schema and then calling .as[T] on the Dataset. However, if a column of any type other than String contains only nulls, Spark interprets it as a String column, while in my schema it is an Integer. A cast exception therefore occurs, although the column names are checked correctly.

To summarize: I found one solution that enforces the correct data types but does not check the headers, and another that checks the headers but has problems with the data types. Is there a way to achieve both, i.e. a complete check of headers and types?

I am using Spark 2.2.0.

1 answer

It looks like you have to do it yourself by reading the file header twice.

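When you supply your own schema, Spark does not check the header names in the file against it (which matches what the question describes), so it will not fail on such a mismatch by itself.

To do the comparison yourself: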

import org.apache.spark.sql.Encoders

val schema = Encoders.product[T].schema

// read the actual schema; This shouldn't be too expensive as Spark's
// laziness would avoid actually reading the entire file 
val fileSchema = spark.read
  .option("header", "true")
  .csv("test.csv").schema

// read the file using your own schema. You can later use this DF
val df = spark.read.schema(schema)
  .option("header", "true")
  .csv("test.csv")

// compare actual and expected column names:
val badColumnNames = fileSchema.fields.map(_.name)
  .zip(schema.fields.map(_.name))
  .filter { case (actual, expected) => actual != expected }

// fail if any inconsistency found:
assert(badColumnNames.isEmpty, 
  s"file schema does not match expected; Bad column names: ${badColumnNames.mkString("; ")}")
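
Note that `zip` stops at the shorter of the two sequences, so a file with extra or missing columns could still pass a purely pairwise check. The name comparison can also be factored into a small helper with no Spark dependency, which makes it easy to test on its own. This is only a minimal sketch: the name `headerMismatches` and the `"<missing>"` placeholder are hypothetical and do not come from Spark or from the original answer.

```scala
// Hypothetical helper: returns (actual, expected) pairs for every position
// where the column names differ. The shorter side is padded with a
// placeholder so that extra or missing columns are reported as well.
def headerMismatches(
    actual: Seq[String],
    expected: Seq[String],
    placeholder: String = "<missing>"): Seq[(String, String)] =
  actual
    .zipAll(expected, placeholder, placeholder)
    .filter { case (a, e) => a != e }

// Plain-Scala examples (no Spark needed):
val sameHeaders = headerMismatches(Seq("id", "name"), Seq("id", "name"))
val extraColumn = headerMismatches(Seq("id", "name", "age"), Seq("id", "name"))
```

With the values from the snippet above, it could presumably be called as `headerMismatches(fileSchema.fieldNames.toSeq, schema.fieldNames.toSeq)` and the result passed to the same `assert`.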

Source: https://habr.com/ru/post/1687419/