I've run into a problem that I haven't been able to get past for a long time. I'm on Spark 1.4 and Scala 2.10, and I can't upgrade at the moment (large distributed infrastructure).
I have a file with several hundred columns, only 2 of which are strings; the rest are longs. I want to convert this data into a label / features DataFrame. I managed to get it into LibSVM format; I just can't get it into the label / features format.
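For reference, by label / features format I mean the shape the ML pipeline examples expect: a DataFrame with a numeric label column and a Vector features column. A minimal sketch of that target, with made-up values, assuming the spark-shell sqlContext and the mllib LabeledPoint / Vectors classes that ship with 1.4:

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    // Illustration only: two hand-written rows in the label / features shape
    val target = sqlContext.createDataFrame(Seq(
      LabeledPoint(1.0, Vectors.dense(20.0, -99.0, 0.0)),
      LabeledPoint(0.0, Vectors.dense(23.0, -99.0, 1.0))
    ))
    target.printSchema()
    // root
    //  |-- label: double (nullable = false)
    //  |-- features: vector (nullable = true)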
The reason is that I cannot use toDF() as shown at https://spark.apache.org/docs/1.5.1/ml-ensembles.html

    val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF()

because it is not supported in 1.4.
So I first converted the text file to a DataFrame myself, using something like this:
    def getColumnDType(columnName: String): StructField = {
      if ((columnName == "strcol1") || (columnName == "strcol2"))
        StructField(columnName, StringType, false)
      else
        StructField(columnName, LongType, false)
    }

    def getDataFrameFromTxtFile(sc: SparkContext, staticfeatures_filepath: String, schemaConf: String): DataFrame = {
      val sfRDD = sc.textFile(staticfeatures_filepath)
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)

      // reads a space-delimited string from the application.properties file
      val schemaString = readConf(Array(schemaConf)).get(schemaConf).getOrElse("")

      // generate the schema from the schema string
      val schema = StructType(
        schemaString.split(" ").map(fieldName => getColumnDType(fieldName)))

      val data = sfRDD
        .map(line => line.split(","))
        .map(p => Row.fromSeq(p.toSeq))

      var df = sqlContext.createDataFrame(data, schema)
      // schemaString.split(" ").drop(4)
      //   .map(s => df = convertColumn(df, s, "int"))
      df
    }
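As an aside (this is not what I'm running for the results below), I also sketched out what parsing the values while building the Rows would look like, since I'm not sure createDataFrame(rowRDD, schema) converts anything on its own. Assuming strcol1 and strcol2 are the only string columns:

    // Sketch only: parse every non-string field to Long while building the Row,
    // so the runtime values match the LongType fields declared in the schema.
    val stringCols = Set("strcol1", "strcol2")
    val fieldNames = schemaString.split(" ")
    val typedData = sfRDD
      .map(_.split(","))
      .map { parts =>
        Row.fromSeq(parts.zip(fieldNames).map {
          case (value, name) if stringCols(name) => value
          case (value, _) => value.toLong
        })
      }
    val typedDf = sqlContext.createDataFrame(typedData, schema)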
When I do

    df.na.drop()
    df.printSchema()

on the DataFrame returned by getDataFrameFromTxtFile, I get exactly the schema I expect:
    root
     |-- rand_entry: long (nullable = false)
     |-- strcol1: string (nullable = false)
     |-- label: long (nullable = false)
     |-- strcol2: string (nullable = false)
     |-- f1: long (nullable = false)
     |-- f2: long (nullable = false)
     |-- f3: long (nullable = false)

and so on until around f300.
But here is the sad part: whatever I try to do with df (see below), I always get a ClassCastException (java.lang.String cannot be cast to java.lang.Long):
    val featureColumns = Array("f1", "f2", ....."f300")

    assertEquals(-99, df.select("f1").head().getLong(0))
    assertEquals(-99, df.first().get(4))

    val transformeddf = new VectorAssembler()
      .setInputCols(featureColumns)
      .setOutputCol("features")
      .transform(df)
So, frustratingly, although the schema says long, df still seems to treat everything internally as String.
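For what it's worth, here is a quick way to see what is actually stored (given the exception above, I would expect this to print java.lang.String even though the schema claims LongType):

    // The schema says f1 is a LongType, but the object handed back at runtime
    // is what matters for the cast; printing its class shows what df really holds.
    println(df.select("f1").head().get(0).getClass)
    // expected (based on the ClassCastException): class java.lang.String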
edit
Adding a simple example
Say I have a file like this
    1,A,20,P,-99,1,0,0,8,1,1,1,1,131153
    1,B,23,P,-99,0,1,0,7,1,1,0,1,65543
    1,C,24,P,-99,0,1,0,9,1,1,1,1,262149
    1,D,7,P,-99,0,0,0,8,1,1,1,1,458759
and
    sf-schema=f0 strCol1 f1 strCol2 f2 f3 f4 f5 f6 f7 f8 f9 f10 f11
(column names really don't matter, so you can ignore these details)
All I'm trying to do is create a label / features DataFrame, in which my 3rd column (f1) becomes the label and the remaining feature columns (f2 through f11) become a [Vector] features column, so that I can follow the rest of the steps at https://spark.apache.org/docs/latest/ml-classification-regression.html#tree-ensembles .
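Concretely, for the first sample line above the row I want looks like this (hand-constructed purely to show the target, assuming the mllib Vectors type is what VectorAssembler produces):

    import org.apache.spark.mllib.linalg.Vectors

    // label = 20 (the 3rd column, f1); features = the values of f2 .. f11
    val wantedFirstRow = (20.0, Vectors.dense(-99, 1, 0, 0, 8, 1, 1, 1, 1, 131153))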
I cast the columns as suggested by zero323:
    val types = Map("strCol1" -> "string", "strCol2" -> "string")
      .withDefault(_ => "bigint")

    df = df.select(df.columns.map(c => df.col(c).cast(types(c)).alias(c)): _*)
    df = df.drop("f0")
    df = df.drop("strCol1")
    df = df.drop("strCol2")
But the assert and the VectorAssembler still fail. Now the feature columns are

    featureColumns = Array("f2", "f3", ..... "f11")

This is the whole sequence that I want to run once I have df:
    var transformeddf = new VectorAssembler()
      .setInputCols(featureColumns)
      .setOutputCol("features")
      .transform(df)

    transformeddf.show(2)

    transformeddf = new StringIndexer()
      .setInputCol("f1")
      .setOutputCol("indexedF1")
      .fit(transformeddf)
      .transform(transformeddf)

    transformeddf.show(2)

    transformeddf = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexedFeatures")
      .setMaxCategories(5)
      .fit(transformeddf)
      .transform(transformeddf)
Exception trace from ScalaIDE; it only fails once it gets into the VectorAssembler, as shown below:
    java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
        at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
        at scala.math.Numeric$LongIsIntegral$.toDouble(Numeric.scala:117)
        at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5.apply(Cast.scala:364)
        at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5.apply(Cast.scala:364)
        at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:436)
        at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
        at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75)
        at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:75)
        at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:56)
        at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:72)
        at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70)
        at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
        at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
        at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
        at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)