Spark Scala serialization error from RDD map

I have an RDD of type RDD[((Long, Long), (Long, Long))], and I need to map/convert it to an RDD[((Long, Long), (Long, Long, Long, Long))], where the second element of each tuple in the new RDD is computed by a function of the second element in the original RDD.

I am trying to do this with a map-based function, but I think I'm doing something wrong here. Please help me solve the problem.
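To make the shape of the conversion concrete, here is a toy illustration (assuming a live SparkContext named sc and the usual Spark imports; the values and the expand function are made up placeholders, not my real logic):

// Toy data in the current shape: RDD[((Long, Long), (Long, Long))]
val input: RDD[((Long, Long), (Long, Long))] =
  sc.parallelize(Seq(((1L, 2L), (1L, 0L)), ((3L, 4L), (0L, 1L))))

// Placeholder for the real function: any (Long, Long) => (Long, Long, Long, Long) works here.
def expand(a: Long, b: Long): (Long, Long, Long, Long) = (a, b, 0L, 0L)

// Keep the key, rebuild the value pair into a 4-tuple.
val output: RDD[((Long, Long), (Long, Long, Long, Long))] =
  input.mapValues { case (a, b) => expand(a, b) }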

Here is the complete code:

package com.ranker.correlation.listitem
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import scala.collection.Map

class ListItemCorrelation(sc: SparkContext) extends Serializable {

  def up_down(dirX: Long, dirY: Long): (Long, Long, Long, Long) = {
    if (dirX == 1L) {
      if (dirY == 1L) {
        return (1, 0, 0, 0)
      } else {
        return (0, 1, 0, 0)
      }
    } else {
      if (dirY == 1L) {
        return (0, 0, 1, 0)
      } else {
        return (0, 0, 0, 1)
      }
    }
  }

  def run(votes: String): RDD[((Long, Long), (Long, Long, Long, Long))] = {
    val userVotes = sc.textFile(votes)
    val userVotesPairs = userVotes.map { t =>
      val p = t.split(",")
      (p(0).toLong, (p(1).toLong, p(2).toLong))
    }
    val jn = userVotesPairs.join(userVotesPairs).values.filter(t => t._1._1 < t._2._1)
    val first = jn.map(t => ((t._1._1, t._2._1), (t._1._2, t._2._2)))
    var second = first.map(t => ((t._1._1, t._2._1), up_down(t._1._2, t._2._2)))
    //More functionality
    return result
  }

}
object ListItemCorrelation extends Serializable {
  def main(args: Array[String]) {
    val votes = args(0)
    val conf = new SparkConf().setAppName("SparkJoins").setMaster("local")
    val context = new SparkContext(conf)
    val job = new ListItemCorrelation(context)
    val results = job.run(votes)
    val output = args(1)
    results.saveAsTextFile(output)
    context.stop()
  }
}

When I try to run this script, I get the following error:

"main" org.apache.spark.SparkException: org.apache.spark.util.ClosureCleaner $.ensureSerializable(ClosureCleaner.scala: 298)    org.apache.spark.util.ClosureCleaner $.org $ $ $Util $ClosureCleaner $$ (ClosureCleaner.scala: 288)    org.apache.spark.util.ClosureCleaner $. (ClosureCleaner.scala: 108)    org.apache.spark.SparkContext.clean(SparkContext.scala: 2094) org.apache.spark.rdd.RDD $$ anonfun $map $1.apply(RDD.scala: 370) at org.apache.spark.rdd.RDD $$ anonfun $map $1.apply(RDD.scala: 369) at org.apache.spark.rdd.RDDOperationScope $.withScope(RDDOperationScope.scala: 151)    org.apache.spark.rdd.RDDOperationScope $.withScope(RDDOperationScope.scala: 112)    org.apache.spark.rdd.RDD.withScope(RDD.scala: 362) at org.apache.spark.rdd.RDD.map(RDD.scala: 369) at com.ranker.correlation.listitem.ListItemCorrelation.run(ListItemCorrelation.scala: 34)    com.ranker.correlation.listitem.ListItemCorrelation $.main(ListItemCorrelation.scala: 47)    com.ranker.correlation.listitem.ListItemCorrelation.main(ListItemCorrelation.scala) : java.io.NotSerializableException: org.apache.spark.SparkContext :   - (: org.apache.spark.SparkContext, : org.apache.spark.SparkContext@4248e66b)   - (: com.ranker.correlation.listitem.ListItemCorrelation, name: sc, type: class org.apache.spark.SparkContext)   - ( com.ranker.correlation.listitem.ListItemCorrelation, com.ranker.correlation.listitem.ListItemCorrelation@270b6b5e)   - (: com.ranker.correlation.listitem.ListItemCorrelation $$ anonfun $4, name: $external, : com.ranker.correlation.listitem.ListItemCorrelation)   - ( com.ranker.correlation.listitem.ListItemCorrelation $$ anonfun $4, ) org.apache.spark.serializer.SerializationDebugger $.improveException(SerializationDebugger.scala: 40)    org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala: 46)    org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala: 100)    org.apache.spark.util.ClosureCleaner $.ensureSerializable(ClosureCleaner.scala: 295)   ... 12

The error points to this line:

var second = first.map(t => ((t._1._1, t._2._1), up_down(t._1._2, t._2._2)))



Put the up_down method on the ListItemCorrelation companion object (or on any other object that holds no reference to the SparkContext). When you call an instance method inside an RDD operation, Spark has to serialize the whole enclosing instance, and your class holds a SparkContext, which is not serializable. Try the following code:

package com.ranker.correlation.listitem
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import scala.collection.Map

object ListItemCorrelation extends Serializable {

  // up_down lives on the companion object, so the closure passed to map()
  // no longer drags the ListItemCorrelation instance (and its SparkContext) along.
  def up_down(dirX: Long, dirY: Long): (Long, Long, Long, Long) = {
    if (dirX == 1L) {
      if (dirY == 1L) {
        return (1, 0, 0, 0)
      } else {
        return (0, 1, 0, 0)
      }
    } else {
      if (dirY == 1L) {
        return (0, 0, 1, 0)
      } else {
        return (0, 0, 0, 1)
      }
    }
  }

  def main(args: Array[String]) {
    val votes = args(0)
    val conf = new SparkConf().setAppName("SparkJoins").setMaster("local")
    val context = new SparkContext(conf)
    val job = new ListItemCorrelation(context)
    val results = job.run(votes)
    val output = args(1)
    results.saveAsTextFile(output)
    context.stop()
  }
}


class ListItemCorrelation(sc: SparkContext) extends Serializable {

  def run(votes: String): RDD[((Long, Long), (Long, Long, Long, Long))] = {
    val userVotes = sc.textFile(votes)
    val userVotesPairs = userVotes.map { t =>
      val p = t.split(",")
      (p(0).toLong, (p(1).toLong, p(2).toLong))
    }
    val jn = userVotesPairs.join(userVotesPairs).values.filter(t => t._1._1 < t._2._1)
    val first = jn.map(t => ((t._1._1, t._2._1), (t._1._2, t._2._2)))
    var second = first.map(t => ((t._1._1, t._2._1), ListItemCorrelation.up_down(t._1._2, t._2._2)))
    //More functionality
    return result
  }

}
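
As a side note (only a sketch under assumptions, not part of the fix above): another common way to get rid of the same NotSerializableException is to leave up_down on the class but keep the SparkContext out of the serialized object graph, for example by storing it in a @transient field. The class name below is made up for illustration, and up_down must never touch sc, since the context only exists on the driver.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._

// Hypothetical alternative: the enclosing instance is still serialized with the
// map closure, but its SparkContext field is @transient, so Java serialization
// skips it and "NotSerializableException: org.apache.spark.SparkContext" goes away.
class ListItemCorrelationTransient(ctx: SparkContext) extends Serializable {

  // Body-level @transient val: the backing JVM field is marked transient.
  // It is only used on the driver (inside run), never inside a closure.
  @transient private val sc: SparkContext = ctx

  def up_down(dirX: Long, dirY: Long): (Long, Long, Long, Long) =
    if (dirX == 1L) { if (dirY == 1L) (1, 0, 0, 0) else (0, 1, 0, 0) }
    else { if (dirY == 1L) (0, 0, 1, 0) else (0, 0, 0, 1) }

  def run(votes: String): RDD[((Long, Long), (Long, Long, Long, Long))] = {
    val userVotesPairs = sc.textFile(votes).map { t =>
      val p = t.split(",")
      (p(0).toLong, (p(1).toLong, p(2).toLong))
    }
    val jn = userVotesPairs.join(userVotesPairs).values.filter(t => t._1._1 < t._2._1)
    // up_down is still an instance method, so the closure captures this,
    // but serializing this now succeeds because sc is transient.
    jn.map(t => ((t._1._1, t._2._1), up_down(t._1._2, t._2._2)))
  }
}

Whether this is nicer than moving the method onto the object is a matter of taste; the object version above avoids capturing the instance at all.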

Source: https://habr.com/ru/post/1671307/

