Spark programming: the best way to organize contexts and imports across multiple functions

In toy examples it is easy to show how to program with Spark: you import, create, use and stop everything in one small function.

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._   // for udf() and col()
import org.apache.spark.sql.hive.HiveContext

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("example")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  val hiveContext = new HiveContext(sc)
  import hiveContext.implicits._
  import hiveContext.sql

  // load data from hdfs
  val df1 = sc.textFile("hdfs://.../myfile.csv").map(...)
  val df1B = sc.broadcast(df1)

  // load data from hive
  val df2 = sql("select * from mytable")
  // transform df2 with df1B
  val cleanCol = udf(cleanMyCol(df1B)).apply("myCol")
  val df2_new = df2.withColumn("myCol", cleanCol)

  ...

  sc.stop()
}

In the real world, I find myself writing several functions to modularize the tasks. For example, I have several functions just to load the different data tables, and in those load functions I call other functions to do the necessary data cleaning / transformation as the data is loaded. Then I end up passing the contexts around like this:

def loadHdfsFileAndBroadcast(sc: SparkContext) = {
  // use sc here
  val df = sc.textFile("hdfs://.../myfile.csv").map(...)
  val dfB = sc.broadcast(df)
  dfB
}

def loadHiveTable(hiveContext: HiveContext, df1B: Broadcast[Map[String, String]]) = {
  import hiveContext.implicits._
  val data = hiveContext.sql("select * from myHiveTable")
  // data cleaning
  val cleanCol = udf(cleanMyCol(df1B)).apply(col("myCol"))
  val df_cleaned = data.withColumn("myCol", cleanCol)
  df_cleaned
}

As you can see, the signatures of these load functions are getting pretty heavy.
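
To illustrate, a hypothetical helper that needed the SparkContext, the HiveContext and a broadcast variable all at once (the name and parameters below are made up, not part of my actual code) would end up with a signature like this:

// Hypothetical signature only: each extra dependency becomes another
// parameter that every caller has to thread through.
def loadAndJoinTables(sc: SparkContext,
                      hiveContext: HiveContext,
                      df1B: Broadcast[Map[String, String]]): DataFrame = {
  ...
}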

Is there a better (more elegant) way to organize this, instead of passing the SparkContext, the HiveContext and the broadcast variables into every function that needs them?

+4

2 answers

One thing you can do is use Scala traits: declare the Spark contexts (and other shared dependencies) that your code needs as abstract members of small traits, and put the functions that use them into those traits.

That way the functions themselves no longer carry the contexts in their signatures; any class that mixes the traits in just has to provide concrete values for the abstract members.

For example:

trait UsingSparkContextTrait {
   def sc: SparkContext

   def loadHdfsFileAndBroadcast = {
      val df = sc.textFile("hdfs://.../myfile.csv").map(...)
      sc.broadcast(df)
   }
}

trait UsingHiveContextTrait {
   def hiveContext: HiveContext
   def df1B: Broadcast[Map[String, String]]

   def loadHiveTable = {
      val data = hiveContext.sql("select * from myHiveTable")
      val cleanCol = udf(cleanMyCol(df1B)).apply(col("myCol"))
      data.withColumn("myCol", cleanCol)
   }
}

Then the class that does the actual work simply mixes these traits in and provides the required members:

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext

class ClassDoingWork extends UsingSparkContextTrait with UsingHiveContextTrait {
   val conf = new SparkConf().setAppName("example")
   val sc = new SparkContext(conf) //Satisfies UsingSparkContextTrait
   val sqlContext = new SQLContext(sc)

   val hiveContext = new HiveContext(sc) //Satisfies UsingHiveContextTrait
   val df1B = loadHdfsFileAndBroadcast  //Satisfies df1B in UsingHiveContextTrait
   import hiveContext.implicits._
   import hiveContext.sql

   def doAnalytics = {
      val dfCleaned = loadHiveTable
      ...
   }
}

The nice thing about this approach is that every piece of code declares exactly what it depends on, and if you forget to provide one of the dependencies the class simply won't compile.
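
For instance, a minimal (hypothetical) entry point is enough to run the job, and the compile-time check is easy to see; this sketch is not from the answer above:

object AnalyticsJob {
  def main(args: Array[String]): Unit = {
    // ClassDoingWork creates its own contexts in its constructor above
    new ClassDoingWork().doAnalytics
  }
}

// Forgetting to provide an abstract member is a compile error, e.g.:
//   class Broken extends UsingSparkContextTrait {}
//   => error: class Broken needs to be abstract, since method sc in trait
//      UsingSparkContextTrait of type org.apache.spark.SparkContext is not defined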

Incidentally, if you already have an RDD in hand, you can also recover its SparkContext via rdd.context, which is sometimes handy.
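
A quick sketch of that (the helper name is made up):

import org.apache.spark.rdd.RDD
import org.apache.spark.broadcast.Broadcast

// Hypothetical helper: no SparkContext parameter needed; the context is
// recovered from the RDD itself via rdd.context.
def broadcastAsSet(rdd: RDD[String]): Broadcast[Set[String]] =
  rdd.context.broadcast(rdd.collect().toSet)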

+3

What I do is pass the SparkContext / SQLContext as implicit parameters: create them as implicit vals in the driver, and let the classes / functions that need them pick them up implicitly, so the signatures stay clean.

The driver looks like this:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

object testObject {
  def main(args: Array[String]): Unit = {
    val sconf = new SparkConf().setMaster("local[2]").setAppName("testObj")
    val rootLogger = Logger.getRootLogger
    rootLogger.setLevel(Level.ERROR)
    implicit val sc = new SparkContext(sconf)
    implicit val sqlContext = new SQLContext(sc)
    new foo().run()
  }
}

and the class that uses the contexts looks like this:

case class OneVal(value: String)
class foo(implicit val sc: SparkContext, implicit val sqlC: SQLContext){
  import sqlC.implicits._
  def run(): Unit ={
    doStuff().show(1)
    doOtherStuff().show(1)
  }
  def doStuff(): DataFrame ={
    sc.parallelize(List(OneVal("test"))).toDF()
  }
  def doOtherStuff(): DataFrame ={
    sc.parallelize(List(OneVal("differentTest"))).toDF()
  }
}

Note the import of sqlC.implicits._ inside the class: that is what brings toDF into scope.

The output is:

+-----+
|value|
+-----+
| test|
+-----+

+-------------+
|        value|
+-------------+
|differentTest|
+-------------+
+1

Source: https://habr.com/ru/post/1674031/

