I am launching my Spark application in yarn-client mode; the spark-submit command looks like this:
spark-submit --master yarn --deploy-mode client --num-executors 256 --driver-memory 2g --executor-memory 10G --class sparkapps.SparkApp1 /home/hadoop/SparkSBT-assembly-1.0.jar 20150805 256 s3path
and my program is this:
import java.text.SimpleDateFormat
import java.util.Locale

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{get_json_object, json_tuple}

object SparkApp1 {

  case class RTBRow(receiveTime: String, tag: String, jsonData: String)

  // "MM" is month; the lowercase "mm" in the original pattern meant minutes
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss Z", Locale.CHINA)

  def sparkJob() = {
    val spark = SparkSession.builder
      .appName("HiveFromSpark")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.yarn.executor.memoryOverhead", "2048m")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._
    import spark.sql

    val lines = spark.read.textFile(s"s3:...$date/*")
    val rtbRows = lines.map(_.split("\t", 3)).map(l => RTBRow(l(0), l(1), l(2))).repartition(1024).toDF()

    val retValue = rtbRows
      // first pass: pull the nested JSON objects out of jsonData
      .select($"receiveTime", $"tag",
        get_json_object($"jsonData", "$.req.device").alias("device"),
        get_json_object($"jsonData", "$.req.imp").alias("imp"),
        get_json_object($"jsonData", "$.req.app.content.keywords").alias("app_keywords"),
        get_json_object($"jsonData", "$.req.app.content.ext").alias("app_ext"),
        get_json_object($"jsonData", "$.req.app.content.title").alias("app_title"),
        get_json_object($"jsonData", "$.req.app.name").alias("app_name"),
        get_json_object($"jsonData", "$.req.iploc").alias("iploc"))
      // second pass: flatten the extracted objects into plain string columns
      .select($"receiveTime", $"tag",
        json_tuple($"device", "didmd5", "os", "osv", "ua", "ip").as(Array("didmd5", "os", "osv", "ua", "ip")),
        get_json_object($"imp", "$[0].tagid").alias("tagid"), $"app_keywords",
        get_json_object($"app_ext", "$.cs").alias("cs"), get_json_object($"app_ext", "$.s").alias("s"),
        get_json_object($"app_ext", "$.vid").alias("vid"),
        get_json_object($"app_ext", "$.channel").alias("channel"), $"app_title", $"app_name", $"iploc")
      .filter(l => !l.anyNull).na.fill("", Seq("iploc")).rdd
      .map {
        case Row(receiveTime: String, tag: String, didmd5: String, os: String, osv: String, ua: String, ip: String, tagid: String,
                 app_keywords: String, cs: String, s: String, vid: String, channel: String, app_title: String, app_name: String, iploc: String) =>
          val uas = ua.split(";")
          val locArray = iploc.split("\t").slice(0, 2)
          var province = ""
          var city = ""
          if (locArray.length == 2) {
            province = locArray(0)
            city = locArray(1)
          } else if (locArray.length == 1) {
            // assign the outer vars; the original `val`s only created dead local shadows
            province = locArray(0)
            city = ""
          }
          ((didmd5, os, uas(uas.length - 1), province, city), (app_name, app_keywords))
      }
      .groupByKey(numPartitions = 1024)
      .map { l =>
        val app_names = l._2.map(p => p._1).toSeq.distinct.slice(0, 10).mkString("|")
        val keywords = l._2.map(p => p._2).flatMap(_.split("\\|")).toSeq.distinct.slice(0, 10).mkString("|")
        Array(l._1._1, l._1._2, l._1._3, l._1._4, l._1._5, app_names, keywords).mkString("\001")
      }

    retValue.repartition(part).saveAsTextFile(output_path)
    spark.stop()
  }

  var date = 20160710
  var part = 64
  var output_path = ""

  def main(args: Array[String]) {
    if (args.length == 0)
      sparkJob()
    else {
      args match {
        case Array(d: String, p: String, o: String) =>
          this.date = d.toInt; this.part = p.toInt; this.output_path = o
          sparkJob()
      }
    }
  }
}
The program is not very complicated, and this time I have not forgotten to call spark.stop(). Early on, the program would not stop even after every step had completed, because back then I really had forgotten the stop() method. But now, when an exception occurs, for example "the output path already exists" or a type-match error, the error message is printed on the screen yet the application does not stop automatically. I have googled and searched Stack Overflow, but no one seems to have the same problem. Does anyone have any idea what is going on?
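For what it is worth, the only workaround I can think of is to move spark.stop() into a finally block so it runs even when saveAsTextFile (or anything else) throws. The sketch below only illustrates that idea (the object name and the elided body are placeholders), and I have not confirmed it actually cures the hang:

import org.apache.spark.sql.SparkSession

object StopOnFailureSketch {
  def sparkJob(): Unit = {
    val spark = SparkSession.builder
      .appName("HiveFromSpark")
      .enableHiveSupport()
      .getOrCreate()
    try {
      // ... same reads, transformations and saveAsTextFile(output_path) as above ...
    } finally {
      spark.stop() // executed even if an exception is thrown in the try block
    }
  }
}

Even so, I would like to understand why the application keeps running after the exception in the first place.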