I have three log files in each of several folders, like this:

    foldera = emplog, deptlog, companylog
    folderb = emplog, deptlog, companylog
    folderc = emplog, deptlog, companylog
I have three different Scala program files, one for extracting data from each type of log:
employee.scala, department.scala, companylog.scala
I want to combine all of these files and execute them in parallel. Each of them has code like the following:
    package com.sample

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
    import org.apache.spark.sql.SQLContext
    import org.apache.log4j.{Level, Logger}

    object logparser {
      def main(args: Array[String]) = {
        Logger.getLogger("org").setLevel(Level.OFF)
        Logger.getLogger("akka").setLevel(Level.OFF)
        //Start the Spark context
        val conf = new SparkConf()
          .setAppName("Parser")
          .setMaster("local")

        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        val test = sc.wholeTextFiles("C:\\mkdir\\*\\*")
          .map { l =>
            if (l._1.endsWith("emplog.txt")) {
              empparser(l._2, sc, sqlContext)
            }
            l
          }
          .foreach { println }
      }

      def empparser(record: String, sc: SparkContext, sqlContext: SQLContext) = {
        val emppattern = """[(](\d+)[)]\s([\w\s._]{30})\s+""".r

        import sqlContext.implicits._
        val indrecs = emppattern.findAllIn(record)
          .map { line =>
            val emppattern(eid, ename) = line
            (eid, ename)
          }
          .toSeq
          .toDF("eid", "ename")
          .show()
      }
    }
In the code above, I tried to combine each method into the same object.
Now two questions arise.

Q1: When I run this, I get:
    Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
    Serialization stack:
        - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@6b0615ae)
        - field (class: com.sample.logparser$$anonfun$1, name: sc$1, type: class org.apache.spark.SparkContext)
        - object (class com.sample.logparser$$anonfun$1, <function1>)
As far as I know (I am just a beginner), the SparkContext cannot be serialized. If I do not pass sc as a parameter, I get a NullPointerException. How can I solve this?
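For reference, one common way to avoid this kind of error is to keep the SparkContext and SQLContext out of the closure entirely: do only pure string parsing inside the RDD transformation, and build the DataFrame on the driver afterwards. The following is a minimal sketch under that assumption, not a definitive fix. The names `EmpParser` and `LogParserSketch` are hypothetical, and `local[*]` is assumed so that local mode uses all available cores.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Pure parsing logic: nothing here references SparkContext or SQLContext,
// so a closure that calls it does not capture anything non-serializable.
object EmpParser {
  // Same pattern as in the question.
  private val empPattern = """[(](\d+)[)]\s([\w\s._]{30})\s+""".r

  // Returns one (eid, ename) pair per match found in the file content.
  def parse(record: String): Seq[(String, String)] =
    empPattern.findAllMatchIn(record)
      .map(m => (m.group(1), m.group(2)))
      .toList
}

object LogParserSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Parser").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val files = sc.wholeTextFiles("C:\\mkdir\\*\\*")

    // Runs on the executors; only EmpParser.parse is used inside the closures.
    val empRows = files
      .filter { case (path, _) => path.endsWith("emplog.txt") }
      .flatMap { case (_, content) => EmpParser.parse(content) }

    // Runs on the driver: the distributed RDD is turned into a DataFrame here.
    val empDF = empRows.toDF("eid", "ename")
    empDF.show() // show() is an action, so the pipeline above executes

    sc.stop()
  }
}
```

The department and company logs could presumably follow the same shape, each with its own filter on the file name and its own pure parse function.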
Q2: I will add the Hive table code to the empparser method after converting to a DataFrame. Once that is done, I do not want to do anything else in my main method. But my map code will not be executed unless an action follows it, which is why I have foreach println after it. Is there any way to overcome this problem?
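As a point of reference for Q2: transformations such as map are lazy, but writing a DataFrame out is itself an action, so a write to a table triggers the upstream work without a trailing println. The sketch below assumes the DataFrame is built on the driver from a Hive-enabled context (a HiveContext in Spark 1.x, or a SparkSession with Hive support in later versions). `HiveSink` and `tableName` are hypothetical names used only for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

object HiveSink {
  // Appends the rows of `df` to the table `tableName` (a hypothetical name).
  // The write is an action: it forces evaluation of the transformations
  // that produced `df`, so no extra foreach(println) is required.
  def save(df: DataFrame, tableName: String): Unit =
    df.write.mode(SaveMode.Append).saveAsTable(tableName)
}
```

If the only goal is a per-record side effect rather than a table write, calling foreach directly on the RDD instead of map followed by foreach would also work, since foreach is an action.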