Applying IndexToString to a Vector in Spark

Context: I have a data frame where all categorical values ​​have been indexed using StringIndexer.

val categoricalColumns = df.schema.collect { case StructField(name, StringType, nullable, meta) => name } val categoryIndexers = categoricalColumns.map { col => new StringIndexer().setInputCol(col).setOutputCol(s"${col}Indexed") } 

Then I used VectorAssembler to vectorize all function columns (including indexed categorical ones).

 val assembler = new VectorAssembler() .setInputCols(dfIndexed.columns.diff(List("label") ++ categoricalColumns)) .setOutputCol("features") 

After applying the classifier and several additional steps, I get a data frame with a label, functions and forecast. I would like to expand my vector of column separator objects to convert indexed values ​​back to their original String form.

 val categoryConverters = categoricalColumns.zip(categoryIndexers).map { colAndIndexer => new IndexToString().setInputCol(s"${colAndIndexer._1}Indexed").setOutputCol(colAndIndexer._1).setLabels(colAndIndexer._2.fit(df).labels) } 

Question: Is there an easy way to do this or a better approach to somehow bind a forecast column to a frame of test data?

What I tried:

 val featureSlicers = categoricalColumns.map { col => new VectorSlicer().setInputCol("features").setOutputCol(s"${col}Indexed").setNames(Array(s"${col}Indexed")) } 

Applying this, I need the columns that I want, but they are in vector form (as it should be done), and not of type Double.

Edit: The desired result is the original data frame (i.e. Categorical functions as String not index) with an additional column representing the predicted label (which in my case is 0 or 1).

For example, let's say that the result of my classifier looked something like this:

 +-----+---------+----------+ |label| features|prediction| +-----+---------+----------+ | 1.0|[0.0,3.0]| 1.0| +-----+---------+----------+ 

Using VectorSlicer for each function, I get:

 +-----+---------+----------+-------------+-------------+ |label| features|prediction|statusIndexed|artistIndexed| +-----+---------+----------+-------------+-------------+ | 1.0|[0.0,3.0]| 1.0| [0.0]| [3.0]| +-----+---------+----------+-------------+-------------+ 

This is great, but I need:

 +-----+---------+----------+-------------+-------------+ |label| features|prediction|statusIndexed|artistIndexed| +-----+---------+----------+-------------+-------------+ | 1.0|[0.0,3.0]| 1.0| 0.0 | 3.0 | +-----+---------+----------+-------------+-------------+ 

To be able to use IndexToString and convert it to:

 +-----+---------+----------+-------------+-------------+ |label| features|prediction| status | artist | +-----+---------+----------+-------------+-------------+ | 1.0|[0.0,3.0]| 1.0| good | Pink Floyd | +-----+---------+----------+-------------+-------------+ 

or even:

 +-----+----------+-------------+-------------+ |label|prediction| status | artist | +-----+----------+-------------+-------------+ | 1.0| 1.0| good | Pink Floyd | +-----+----------+-------------+-------------+ 
+5
source share
1 answer

Well, this is not a very useful operation, but it should be possible to extract the required information using column metadata and simple UDF. I assume that your data has created a pipeline similar to this:

 import org.apache.spark.ml.feature.{VectorSlicer, VectorAssembler, StringIndexer} import org.apache.spark.ml.Pipeline val df = sc.parallelize(Seq( (1L, "a", "foo", 1.0), (2L, "b", "bar", 2.0), (3L, "a", "bar", 3.0) )).toDF("id", "x1", "x2", "x3") val featureCols = Array("x1", "x2", "x3") val featureColsIdx = featureCols.map(c => s"${c}_i") val indexers = featureCols.map( c => new StringIndexer().setInputCol(c).setOutputCol(s"${c}_i") ) val assembler = new VectorAssembler() .setInputCols(featureColsIdx) .setOutputCol("features") val slicer = new VectorSlicer() .setInputCol("features") .setOutputCol("string_features") .setNames(featureColsIdx.init) val transformed = new Pipeline() .setStages(indexers :+ assembler :+ slicer) .fit(df) .transform(df) 

First, we can extract the desired metadata from the functions:

 val meta = transformed.select($"string_features") .schema.fields.head.metadata .getMetadata("ml_attr") .getMetadata("attrs") .getMetadataArray("nominal") 

and convert it to something more convenient to use

 case class NominalMetadataWrapper(idx: Long, name: String, vals: Array[String]) // In general it could a good idea to make it a broadcast variable val lookup = meta.map(m => NominalMetadataWrapper( m.getLong("idx"), m.getString("name"), m.getStringArray("vals") )) 

Finally, a little UDF:

 import scala.util.Try val transFeatures = udf((v: Vector) => lookup.map{ m => Try(m.vals(v(m.idx.toInt).toInt)).toOption }) transformed.select(transFeatures($"string_features")). 
+3
source

Source: https://habr.com/ru/post/1243144/


All Articles