Spark DataFrame does not obey the schema and treats everything as a String

I have run into a problem that I have not been able to get past for ages.

  1. I'm on Spark 1.4 and Scala 2.10. I cannot update at the moment (large distributed infrastructure)

  2. I have a file with several hundred columns, only 2 of which are string and the rest are long. I want to convert this data to a Label / Features data frame.

  3. I managed to get it in LibSVM format.

  4. I just can't get it in the Label / Features format.

The reason is:

  1. I cannot use toDF() as shown here: https://spark.apache.org/docs/1.5.1/ml-ensembles.html

    val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF() 

    because it is not supported in 1.4.

  2. So I first converted the text file to a DataFrame, using something like this:

    def getSFColumnDType(columnName: String): StructField = {
      if ((columnName == "strcol1") || (columnName == "strcol2"))
        return StructField(columnName, StringType, false)
      else
        return StructField(columnName, LongType, false)
    }

    def getDataFrameFromTxtFile(sc: SparkContext, staticfeatures_filepath: String, schemaConf: String): DataFrame = {
      val sfRDD = sc.textFile(staticfeatures_filepath)
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)

      // reads a space-delimited string from the application.properties file
      val schemaString = readConf(Array(schemaConf)).get(schemaConf).getOrElse("")

      // generate the schema based on the schema string
      val schema = StructType(
        schemaString.split(" ").map(fieldName => getSFColumnDType(fieldName)))

      val data = sfRDD
        .map(line => line.split(","))
        .map(p => Row.fromSeq(p.toSeq))

      var df = sqlContext.createDataFrame(data, schema)

      // schemaString.split(" ").drop(4)
      // .map(s => df = convertColumn(df, s, "int"))

      return df
    }

When I run df.na.drop() and df.printSchema() on the returned DataFrame, I get exactly the schema I expect:

    root
     |-- rand_entry: long (nullable = false)
     |-- strcol1: string (nullable = false)
     |-- label: long (nullable = false)
     |-- strcol2: string (nullable = false)
     |-- f1: long (nullable = false)
     |-- f2: long (nullable = false)
     |-- f3: long (nullable = false)

and so on till around f300.

But, and this is the sad part, whatever I try to do with df (see below), I always get a ClassCastException (java.lang.String cannot be cast to java.lang.Long):

    val featureColumns = Array("f1", "f2", ....., "f300")

    assertEquals(-99, df.select("f1").head().getLong(0))
    assertEquals(-99, df.first().get(4))

    val transformeddf = new VectorAssembler()
      .setInputCols(featureColumns)
      .setOutputCol("features")
      .transform(df)

So, although the schema says Long, df still internally treats everything as a String.
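For illustration, here is a quick check (not part of my original code, just a sketch) that shows the values stored in the rows are still Java Strings, which is consistent with the assertion failures:

    // Illustrative diagnostic only: the Rows were built with Row.fromSeq over split(",")
    // results, so every cell is still a java.lang.String regardless of the declared schema.
    val raw = df.first()             // collecting a row returns whatever was stored in it
    println(raw.get(4))              // prints -99 as the String "-99"
    println(raw.get(4).getClass)     // class java.lang.String
    // raw.getLong(4)                // would throw java.lang.ClassCastException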

Edit:

Adding a simple example

Say I have a file like this

    1,A,20,P,-99,1,0,0,8,1,1,1,1,131153
    1,B,23,P,-99,0,1,0,7,1,1,0,1,65543
    1,C,24,P,-99,0,1,0,9,1,1,1,1,262149
    1,D,7,P,-99,0,0,0,8,1,1,1,1,458759

and

 sf-schema=f0 strCol1 f1 strCol2 f2 f3 f4 f5 f6 f7 f8 f9 f10 f11 

(column names really don't matter, so you can ignore these details)

All I'm trying to do is create a Label / Features style DataFrame, in which my 3rd column becomes the label and the 5th-11th columns become a [Vector] column, so that I can follow the rest of the steps at https://spark.apache.org/docs/latest/ml-classification-regression.html#tree-ensembles.
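For reference, a tiny sketch of the layout I am after (illustrative values only, assuming a sqlContext is in scope; this pattern mirrors the Spark ML guide examples):

    import org.apache.spark.mllib.linalg.Vectors

    // Target layout: one numeric label column plus one Vector column holding the features.
    val target = sqlContext.createDataFrame(Seq(
      (20.0, Vectors.dense(-99.0, 1.0, 0.0, 0.0, 8.0, 1.0, 1.0)),
      (23.0, Vectors.dense(-99.0, 0.0, 1.0, 0.0, 7.0, 1.0, 1.0))
    )).toDF("label", "features")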

I cast the columns as suggested by zero323:

    val types = Map("strCol1" -> "string", "strCol2" -> "string")
      .withDefault(_ => "bigint")

    df = df.select(df.columns.map(c => df.col(c).cast(types(c)).alias(c)): _*)
    df = df.drop("f0")
    df = df.drop("strCol1")
    df = df.drop("strCol2")

But the asserts and the VectorAssembler still fail, with featureColumns = Array("f2", "f3", ....., "f11"). This is the whole sequence that I want to run once I have df:

    var transformeddf = new VectorAssembler()
      .setInputCols(featureColumns)
      .setOutputCol("features")
      .transform(df)

    transformeddf.show(2)

    transformeddf = new StringIndexer()
      .setInputCol("f1")
      .setOutputCol("indexedF1")
      .fit(transformeddf)
      .transform(transformeddf)

    transformeddf.show(2)

    transformeddf = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexedFeatures")
      .setMaxCategories(5)
      .fit(transformeddf)
      .transform(transformeddf)

Exception trace from ScalaIDE; it appears only once execution reaches the VectorAssembler, as shown below:

    java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
        at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
        at scala.math.Numeric$LongIsIntegral$.toDouble(Numeric.scala:117)
        at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5.apply(Cast.scala:364)
        at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5.apply(Cast.scala:364)
        at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:436)
        at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
        at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75)
        at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:75)
        at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:56)
        at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:72)
        at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70)
        at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
        at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
        at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
        at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

1 answer

You get a ClassCastException because that is exactly what should happen. The schema argument is not used for automatic casting (some data sources can use the schema this way, but not methods like createDataFrame). It only declares what types of values are stored in the rows. It is your responsibility to provide data that conforms to the schema, not the other way around.

While the DataFrame shows the schema you declared, that schema is checked only at runtime, hence the runtime exception. If you want to convert the data to specific types, you have to cast it explicitly.
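To make that concrete, here is a minimal sketch (illustrative only, not from the original answer) of why the problem surfaces only at runtime: building the DataFrame with a mismatched schema succeeds, and the exception appears only once an expression actually needs a Long:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{LongType, StructField, StructType}

    // The Row values are Strings, but the schema claims LongType.
    val rows = sc.parallelize(Seq(Row("1", "-99"), Row("2", "42")))
    val schema = StructType(Seq(
      StructField("id", LongType, false),
      StructField("f1", LongType, false)))

    val broken = sqlContext.createDataFrame(rows, schema)  // succeeds, no validation here
    broken.printSchema()                                    // happily reports both columns as long
    // broken.select(broken("f1").cast("double")).show()   // java.lang.ClassCastException:
    //                                                     // java.lang.String cannot be cast to java.lang.Long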

  • First read all columns as StringType:

    val rows = sc.textFile(staticfeatures_filepath)
      .map(line => Row.fromSeq(line.split(",")))

    val schema = StructType(
      schemaString.split(" ").map(
        columnName => StructField(columnName, StringType, false)
      )
    )

    val df = sqlContext.createDataFrame(rows, schema)
  • Then cast the selected columns to the desired type:

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.{LongType, StringType}

    val types = Map("strcol1" -> StringType, "strcol2" -> StringType)
      .withDefault(_ => LongType)

    val casted = df.select(df.columns.map(c => col(c).cast(types(c)).alias(c)): _*)
  • Use the assembler:

    val transformeddf = new VectorAssembler()
      .setInputCols(featureColumns)
      .setOutputCol("features")
      .transform(casted)

You can simply cover steps 1 and 2 with spark-csv:

    // As originally
    val schema = StructType(
      schemaString.split(" ").map(fieldName => getSFColumnDType(fieldName)))

    val df = sqlContext
      .read.schema(schema)
      .format("com.databricks.spark.csv")
      .option("header", "false")
      .load(staticfeatures_filepath)
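As a side note (an addition, not from the original answer): if maintaining the schema string ever becomes inconvenient, spark-csv can also infer column types from the data via its inferSchema option, at the cost of an extra pass over the file. The inferred column names and exact numeric types may differ from the hand-written schema, so check the result with printSchema():

    // Alternative sketch: let spark-csv infer the types instead of supplying a schema.
    // inferSchema triggers one extra pass over the input file.
    val inferred = sqlContext
      .read.format("com.databricks.spark.csv")
      .option("header", "false")
      .option("inferSchema", "true")
      .load(staticfeatures_filepath)

    inferred.printSchema()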

See also Correct reading of types from a file in PySpark

