Scala & Spark: cast multiple columns at once

Since VectorAssembler crashes if a passed column has any type other than NumericType or BooleanType, and I am dealing with many TimestampType columns, I want to know:

Is there an easy way to create multiple columns at once?

Based on this answer, I already have a convenient way to cast a single column:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.DataType

def castColumnTo(df: DataFrame,
    columnName: String,
    targetType: DataType): DataFrame = {
  df.withColumn(columnName, df(columnName).cast(targetType))
}
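
For example, a minimal usage sketch (assuming a DataFrame df with a TimestampType column named "created_at", which is not part of the original post):

import org.apache.spark.sql.types.LongType

// Hypothetical call: cast one timestamp column to LongType (epoch seconds)
// so that VectorAssembler will accept it.
val dfFixed = castColumnTo(df, "created_at", LongType)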

I was thinking of calling castColumnTo recursively, but I strongly doubt that this is the (best) way to do it.

+5
4 answers

Based on the comments (thanks!) I came up with the following code (no error handling):

def castAllTypedColumnsTo(df: DataFrame,
    sourceType: DataType, targetType: DataType): DataFrame = {

  val columnsToBeCasted = df.schema
    .filter(s => s.dataType == sourceType)

  // Optional logging of which columns will be cast:
  // if (columnsToBeCasted.nonEmpty) {
  //   println(s"Found ${columnsToBeCasted.length} columns " +
  //     s"(${columnsToBeCasted.map(s => s.name).mkString(",")})" +
  //     s" - casting to ${targetType.typeName.capitalize}Type")
  // }

  // Fold over the matching columns, casting each one to the requested targetType
  columnsToBeCasted.foldLeft(df) { (foldedDf, col) =>
    castColumnTo(foldedDf, col.name, targetType)
  }
}
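
For the original TimestampType problem, a usage sketch might look like this (df is assumed, not taken from the answer):

import org.apache.spark.sql.types.{LongType, TimestampType}

// Hypothetical call: convert every TimestampType column to LongType
// before handing the DataFrame to VectorAssembler.
val numericDf = castAllTypedColumnsTo(df, TimestampType, LongType)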

The foldLeft makes an explicit for loop and a mutable var unnecessary.
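
For comparison, a sketch of the same logic written imperatively, which is exactly what the foldLeft replaces:

// Equivalent version with a mutable var and a for loop (illustration only).
def castAllTypedColumnsToMutable(df: DataFrame,
    sourceType: DataType, targetType: DataType): DataFrame = {
  var result = df
  for (field <- df.schema.filter(_.dataType == sourceType)) {
    result = castColumnTo(result, field.name, targetType)
  }
  result
}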

+5


def castAllTypedColumnsTo(df: DataFrame, sourceType: DataType, targetType: DataType) = {
  df.schema.filter(_.dataType == sourceType).foldLeft(df) {
    case (acc, col) => acc.withColumn(col.name, df(col.name).cast(targetType))
  }
}
+7
from pyspark.sql.types import StringType, FloatType, IntegerType

FastDf = spark.read.csv("Something.csv", header=False, mode="DROPMALFORMED")
FastDf.OldTypes = [field.dataType for field in FastDf.schema.fields]
FastDf.NewTypes = [StringType(), FloatType(), FloatType(), IntegerType()]
FastDf.OldColnames = FastDf.columns
FastDf.NewColnames = ['S_tring', 'F_loat', 'F_loat2', 'I_nteger']
FastDfSchema = FastDf.select(*(FastDf[colnumber]
                               .cast(FastDf.NewTypes[colnumber])
                               .alias(FastDf.NewColnames[colnumber])
                               for colnumber in range(len(FastDf.NewTypes))))

I know this is PySpark, but the logic may still be useful.
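
A rough Scala equivalent of that positional-cast idea (a sketch; the target types and new names below come from this example, not from the asker's schema):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DataType, FloatType, IntegerType, StringType}

// Pair each existing column with a new type and name by position,
// then build a single select containing all the casts.
val newTypes: Seq[DataType] = Seq(StringType, FloatType, FloatType, IntegerType)
val newNames = Seq("S_tring", "F_loat", "F_loat2", "I_nteger")

val castedDf = df.select(
  df.columns.zipWithIndex.map { case (c, i) =>
    col(c).cast(newTypes(i)).alias(newNames(i))
  }: _*
)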

0

I am translating a Scala program to Python and found a reasonable answer to your problem. The columns are named V1 - V28, Time, Amount, and Class. (I'm not a Scala pro.) The solution is as follows:

// Cast all the columns to Double.
val df = raw.select(
  ((1 to 28).map(i => "V" + i) ++ Array("Time", "Amount", "Class"))
    .map(s => col(s).cast("Double")): _*
)

Link: https://github.com/intel-analytics/analytics-zoo/blob/master/apps/fraudDetection/Fraud%20Detction.ipynb
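
Adapted to the question, the same select-with-casts pattern can be driven by the schema instead of a hard-coded name list (a sketch, not taken from the linked notebook):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.TimestampType

// Cast only the TimestampType columns to long, leave the rest untouched.
val casted = df.select(df.schema.map { f =>
  if (f.dataType == TimestampType) col(f.name).cast("long") else col(f.name)
}: _*)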

0

Source: https://habr.com/ru/post/1668593/
