Why is the code inside RDD.map not executing in YARN?

I am using Hortonworks 2.6 with 5 nodes. I spark-submit to YARN (with 16 GB of RAM and 4 cores).

I have an RDD transformation that works fine with the local master URL, but not with the yarn master URL.

rdd1 contains values such as:

id  name    date
1   john    10/05/2001 (dd/mm/yyyy)
2   steve   11/06/2015

I would like to change the date format from dd/mm/yyyy to mm/dd/yy, so I wrote a method transformations.transform that I use in RDD.map as follows:

rdd2 = rdd1.map { rec => (rec.split(",")(0), transformations.transform(rec)) }

The transformations.transform method is as follows:

object transformations {
  def transform(t: String): String = {
    val msg = s">>> transformations.transform($t)"
    println(msg)     
    msg
  }
}

The above code works fine locally, but not in the cluster. In the cluster, the output looks as if the map were written like this:

rdd2 = rdd1.map { rec => (rec.split(",")(0), rec) } 

rec does not seem to be passed to transformations.transform at all.

I run an action to force the transformation (and so transformations.transform) to execute, but no luck:

val rdd3 = rdd2.count()
println(rdd3)

Why does the println inside transformations.transform print nothing?

+4
3 answers

tl;dr Use yarn logs -applicationId to review the logs of your Hadoop application (that is where the println output from Spark executors ends up). If YARN misbehaves, restart it with sbin/stop-yarn.sh followed by sbin/start-yarn.sh (or sbin/stop-all.sh and sbin/start-all.sh).


The reason you don't see the println output on YARN is that when a Spark application is spark-submit'ed to YARN, YARN launches the application's ApplicationMaster and the Spark executors in containers on the cluster, so their output does not appear on your local console.

RDD.map is a transformation, i.e. code that Spark executes on executors (where the partitions of the RDD live), possibly on remote nodes of the cluster. The println output therefore goes to the stdout of those executors, not to the driver's console.

With the local master everything runs in a single JVM on your machine (driver and executors alike), which is why you see the output (it all ends up on your console).
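
As a quick way to convince yourself the transformation does run (a minimal sketch of my own, not part of the original code): collect the mapped records back to the driver and print them there, so the output lands on your console regardless of the master. Only do this with small test data, since collect() pulls the entire RDD into the driver's memory.

val transformed = rdd1
  .map(rec => (rec.split(",")(0), transformations.transform(rec)))
  .collect()                    // transform runs on executors; results come back to the driver
transformed.foreach(println)    // this println executes in the driver JVM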

So, to find the println output, open the ResourceManager web UI at http://localhost:8088/cluster, locate your Spark application, and drill into the logs of its containers.

[Screenshot: RM web interface]

I also recommend enabling log aggregation via yarn.log-aggregation-enable in YARN (described in this article):

<!-- etc/hadoop/yarn-site.xml -->
<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
<property>
  <name>yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds</name>
  <value>3600</value>
</property>

After you spark-submit --master yarn your Spark application, review the logs with yarn logs -applicationId (e.g. yarn logs -applicationId application_ID > output.txt and inspect output.txt).

Among the lines you should find >>> transformations.transform(1,john,10/05/2001).

For reference, the complete code I used to test this is as follows:

import org.apache.spark.SparkContext

object HelloRdd extends App {

  object transformations {
    def transform(t: String): String = {
      val msg = s">>> transformations.transform($t)"
      println(msg)
      msg
    }
  }

  val sc = SparkContext.getOrCreate()
  val rdd1 = sc.textFile(args(0))   // e.g. the hello.txt passed on the command line
  val rdd2 = rdd1.map { rec => (rec.split(",")(0), transformations.transform(rec)) }
  rdd2.count()                      // an action, to force the map to actually run
}
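
One caveat worth adding here: the Spark documentation recommends defining a main method rather than extending scala.App, because subclasses of scala.App may not work correctly. An equivalent sketch with an explicit main:

import org.apache.spark.SparkContext

object HelloRdd {
  // same helper as above
  object transformations {
    def transform(t: String): String = {
      val msg = s">>> transformations.transform($t)"
      println(msg)
      msg
    }
  }

  def main(args: Array[String]): Unit = {
    val sc = SparkContext.getOrCreate()
    val rdd1 = sc.textFile(args(0))
    val rdd2 = rdd1.map(rec => (rec.split(",")(0), transformations.transform(rec)))
    println(rdd2.count())
  }
}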

And I spark-submit'ed it as follows:

$ HADOOP_CONF_DIR=/tmp ~/dev/apps/spark/bin/spark-submit \
  --master yarn \
  target/scala-2.11/spark-project_2.11-0.1.jar `pwd`/hello.txt
+2

You haven't given enough information to pin this down, but here are the usual reasons code behaves differently under yarn than under local.

First, check whether your code takes a different branch on the cluster; an if whose else branch silently skips the transformation would produce exactly this symptom.

Second, the environment on the cluster nodes may differ from your local machine: missing jars or mismatched library versions on the classpath typically show up as ClassNotFoundException or AbstractMethodError, and Scala is especially sensitive to version mismatches.

Third, make sure your transformation does not depend on a var set in the driver. The value may be assigned correctly in the driver JVM (e.g. inside an if), but the closure that is serialized and shipped to each node carries its own copy of the variable, so the transformation running on a node never sees the driver's updates.

The last point is covered in https://spark.apache.org/docs/latest/rdd-programming-guide.html#local-vs-cluster-modes:

"In general, closures - constructs like loops or locally defined methods - should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that's just by accident and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed."
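
To make that last point concrete, here is a small sketch of mine (not from the original answer) that you can paste into spark-shell, where sc is already defined:

var counter = 0
val nums = sc.parallelize(1 to 100)

nums.foreach(_ => counter += 1)           // each task increments its own serialized copy
println(s"var counter = $counter")        // prints 0 when running on a cluster

val acc = sc.longAccumulator("counter")
nums.foreach(_ => acc.add(1))             // accumulator updates are merged back on the driver
println(s"accumulator = ${acc.value}")    // prints 100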

+1

"Why is the code inside RDD.map not executing in YARN?
I would like to change the date format from (dd/mm/yyyy) to (mm/dd/yy), so I wrote a method that I use in map()"

If all you want to change is the date format, I would suggest you not go through such difficulties, since it is very hard to analyze the cause of the problem there. I suggest you use DataFrames instead of RDDs, as they come with many built-in functions. For your specific requirement, the to_date and date_format built-in functions should do the trick.

First of all, read the data into a DataFrame:

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", true)
  .load("path to the data file")

Then just apply the to_date and date_format functions:

import org.apache.spark.sql.functions._
df.withColumn("date2", date_format(to_date(col("date"), "dd/MM/yyyy"), "MM/dd/yy")).show(false)

and you should get

+---+-----+----------+--------+
|id |name |date      |date2   |
+---+-----+----------+--------+
|1  |john |10/05/2001|05/10/01|
|2  |steve|11/06/2015|06/11/15|
+---+-----+----------+--------+

Simple, right?

0