How to call separate logic for diff file name in spark mode

I have 3 log files in folders. how

foldera = emplog,deptlog,companylog folderb = emplog,deptlog,companylog folderc = emplog,deptlog,companylog 

I have 3 diff scala program file for extracting data from each of them.

 employee.scala department.scala companylog.scala 

Each of them has the following code:

I want to combine all these files and execute them in parallel.

  package com.sample import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD.rddToPairRDDFunctions import org.apache.spark.sql.SQLContext import org.apache.log4j.{Level, Logger} object logparser { def main(args: Array[String]) = { Logger.getLogger("org").setLevel(Level.OFF) Logger.getLogger("akka").setLevel(Level.OFF) //Start the Spark context val conf = new SparkConf() .setAppName("Parser") .setMaster("local") val sc = new SparkContext(conf) val sqlContext= new SQLContext(sc) val test = sc.wholeTextFiles("C:\\mkdir\\*\\*") .map{l => if(l._1.endsWith("emplog.txt")){ empparser(l._2,sc,sqlContext) } l } .foreach{println} } def empparser(record:String,sc:SparkContext,sqlContext:SQLContext) = { val emppattern="""[(](\d+)[)]\s([\w\s._]{30})\s+""".r import sqlContext.implicits._ val indrecs = emppattern.findAllIn(record) .map{ line => val emppattern(eid,ename) = line (eid,ename) } .toSeq .toDF("eid","ename") .show() } } 

I tried using my code to bind each method to the same object.

Now 2 questions arise Q1. When I compile, I get

 Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext Serialization stack: - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@6b0615ae ) - field (class: com.sample.logparser$$anonfun$1, name: sc$1, type: class org.apache.spark.SparkContext) - object (class com.sample.logparser$$anonfun$1, <function1>) 

As far as I know (just a beginner) Spark context cannot be serialized. If I do not pass the sc parameter as a parameter, I get a Nullpointer Exception. How to solve this?

Q2: I will embed the hive table code in the empparser method after converting to DF. Once this is done, I do not want to do anything in my core. But my card code will not be executed if after that I have no actions. that is why i have foreacch println after that. Is there any way to overcome this problem?

+5
source share
1 answer

To try to answer the question, I'm going to assume that the result of processing an employee or department leads to the same type of record. I would expect this to be different for each data type, so I separately process the processing of different records to provide this "setting with reality."

First, we define a case class entry and parsers for different types or types of entries. (Here I copy the same for simplicity)

 case class Record(id:String, name: String) val empParser: String => Option[Record] = { record => val pattern="""^[(](\d+)[)]\s([\w\s._]{30})\s+$""".r record match { case pattern(eid,ename) => Some(Record(eid, ename)) case _ => None } } val deptParser: String => Option[Record] = { record => val pattern="""^[(](\d+)[)]\s([\w\s._]{30})\s+$""".r record match { case pattern(eid,ename) => Some(Record(eid, ename)) case _ => None } } val companyParser: String => Option[Record] = { record => val pattern="""^[(](\d+)[)]\s([\w\s._]{30})\s+$""".r record match { case pattern(eid,ename) => Some(Record(eid, ename)) case _ => None } } 

Loading data using wholeFiles :

 val dataPath = "/.../data/wholefiles/*/*" val logFiles = sc.wholeTextFiles(dataPath) 

And then we process different types of records, filtering the files to get the files we need, and apply the parser, which we defined above. Notice how we practically repeat the same process. It can be abstracted.

 val empLogs = logFiles.filter{case (filename, content) => filename.endsWith("emplog.txt")}.flatMap{case (_,content) => content.split("\n").flatMap(line=> empParser(line))} val deptLogs = logFiles.filter{case (filename, content) => filename.endsWith("deptlog.txt")}.flatMap{case (_,content) => content.split("\n").flatMap(line=> deptParser(line))} val compLogs = logFiles.filter{case (filename, content) => filename.endsWith("companylog.txt")}.flatMap{case (_,content) => content.split("\n").flatMap(line=> companyParser(line))} 

Now we convert to DataFrame

 val empDF = empLogs.toDF 

And we could do the same for other types of records.

There is enough space in this process to reduce code duplication, depending on whether we can find common features in processes of different data types.

+1
source

Source: https://habr.com/ru/post/1261446/


All Articles