Spark Scala 2.10 tuple limit

I have a DataFrame with 66 columns to process (almost every column value needs to be changed somehow), so I run the following statement

    // Build one tuple per row, with one transformed element per column.
    val result = data.map(row=> (
        // Look up column "XX" by name and pass its string value through `modify`.
        modify(row.getString(row.fieldIndex("XX"))),
        // (...) stands for the remaining column expressions.
        (...)
        )
    )

and so on, up to the 66th column. Because Scala 2.10 limits tuples to at most 22 elements, I cannot do it this way. Is there any workaround for this? After all the row operations, I convert the result to a DataFrame with specific column names:

   // Name the columns, then register the resulting DataFrame (not `result` itself)
   // as a temporary table so it can be queried with SQL.
   result.toDF("c1",...,"c66")
     .registerTempTable("someFancyResult")

The `modify` function is just an example to illustrate my point.

+4
source share
2 answers

If all you do is modify the values of an existing DataFrame, it is better to use a UDF than to map over an RDD:

import org.apache.spark.sql.functions.udf

// Wrap the existing `modify` function as a user-defined function usable on columns.
val modifyUdf = udf(modify)
// Replace column "c1" with its modified value; repeat for every column that needs changing.
// NOTE(review): the `$"c1"` syntax presumably needs the SQLContext implicits in scope — confirm.
data.withColumn("c1", modifyUdf($"c1"))

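If, for some reason, that does not fit your needs, the simplest alternative is to rebuild the DataFrame from an RDD[Row]. For example: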

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}


// Map every input row to a generic Row. A Row is built from a plain sequence of
// values, so it is not subject to the 22-element tuple limit.
val result: RDD[Row] = data.map(row => {
  // Collects the output values of this row in column order.
  val buffer = ArrayBuffer.empty[Any]

  // Add value to buffer
  buffer.append(modify(row.getAs[String]("c1")))

  // ... repeat for other values

  // Build row
  Row.fromSeq(buffer)
})

// Create schema: one StructField per output column, in the same order as the
// values appended to the buffer above.
val schema = StructType(Seq(
  StructField("c1", StringType, nullable = false),
  // ...  
  StructField("c66", StringType, nullable = false)
))

// Combine the rows and the explicit schema back into a DataFrame.
sqlContext.createDataFrame(result, schema)
+6

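Alternatively, if you need more than 22 fields, you can define your own class that implements Product instead of using a tuple or case class: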

/** Example application: registers a record type with more than 22 fields as a
  * Spark SQL table and queries one of its columns.
  */
object SimpleApp {
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext

  /** A 24-field record that implements `Product` by hand, since tuples and case
    * classes cannot hold that many fields in Scala 2.10.
    *
    * The `...` placeholders stand for the fields/cases x4 to x23, which follow
    * the same pattern as the ones shown.
    */
  class Record(val x1: String, val x2: String, val x3: String, ... val x24:String) extends Product with Serializable {
    def canEqual(that: Any): Boolean = that.isInstanceOf[Record]

    /** Number of fields exposed through `productElement`. */
    def productArity: Int = 24

    /** Returns the n-th field (zero-based).
      *
      * @throws IndexOutOfBoundsException if `n` is outside `0 until productArity`
      */
    def productElement(n: Int): Any = n match {
      case 0 => x1
      case 1 => x2
      case 2 => x3
      ...
      case 23 => x24
      // Product specifies IndexOutOfBoundsException for a bad index; without
      // this case an out-of-range `n` would surface as a MatchError instead.
      case _ => throw new IndexOutOfBoundsException(n.toString)
    }
  }

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("Product Test")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val record = new Record("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x")

    // Brings `sql` into scope (used unqualified below).
    // NOTE(review): presumably also supplies the implicit conversion that makes
    // `registerAsTable` available on an RDD of Products — confirm for your Spark version.
    import sqlContext._
    // NOTE(review): `registerAsTable` looks like the early Spark 1.0 name; later
    // 1.x releases appear to use `registerTempTable` — confirm before upgrading.
    sc.parallelize(record :: Nil).registerAsTable("records")

    sql("SELECT x1 FROM records").collect()
  }
}
+1

Source: https://habr.com/ru/post/1616736/


All Articles