SparkContext not serializable inside a companion object

I'm currently trying to extend a machine learning application written in Scala and Spark. I'm reusing the structure of an earlier project by Dieterich Lawson that I found on GitHub:

https://github.com/dieterichlawson/admm

The project mainly uses the SparkContext to build an RDD of blocks of training samples, and then performs purely local computations on each block (for example, solving a linear system).

I followed the same pattern, but for my local computation I need to run the L-BFGS algorithm on each block of training samples. For this I wanted to use the L-BFGS implementation from MLlib, which has the following signature:

runLBFGS(RDD<scala.Tuple2<Object,Vector>> data, Gradient gradient, 
         Updater updater, int numCorrections, double convergenceTol, 
         int maxNumIterations, double regParam, Vector initialWeights)
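In the Scala API, where the boxed Object is just the Double label, the same method reads:

def runLBFGS(
    data: RDD[(Double, Vector)],
    gradient: Gradient,
    updater: Updater,
    numCorrections: Int,
    convergenceTol: Double,
    maxNumIterations: Int,
    regParam: Double,
    initialWeights: Vector): (Vector, Array[Double])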

That is, the method expects the training samples as an RDD[(Double, Vector)]. The problem is that locally, on each worker, I no longer have the data as an RDD. So I tried calling sc.parallelize on each matrix block to rebuild one, and that is where I get a serialization exception. (The exact exception message is at the end of the question.)
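For reference, this is roughly how runLBFGS is normally driven from the driver, with a single layer of parallelism. It is only a sketch: LogisticGradient and SquaredL2Updater are MLlib's stock implementations standing in for my custom gradient, and the assumed file layout (label in the last column, 1023 features) is illustrative.

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}
import org.apache.spark.rdd.RDD

// Labeled samples in the shape MLlib expects: (label, feature vector).
val data: RDD[(Double, Vector)] = sc.textFile("ds1.csv").map { line =>
  val cols = line.split(',').map(_.toDouble)
  (cols.last, Vectors.dense(cols.dropRight(1)))
}

val numFeatures = 1023 // illustrative: columns of ds1.csv minus the label
val initialWeights = Vectors.dense(new Array[Double](numFeatures))

val (weights, lossHistory) = LBFGS.runLBFGS(
  data,
  new LogisticGradient(),
  new SquaredL2Updater(),
  10,   // numCorrections
  1e-4, // convergenceTol
  20,   // maxNumIterations
  0.1,  // regParam
  initialWeights)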

This is a detailed explanation of how I handle SparkContext.

First, in the main application, the SparkContext is used to read a text file, and it is then passed to the factory method of the LogRegressionXUpdate class:

val A = sc.textFile("ds1.csv")
A.checkpoint
val f = LogRegressionXUpdate.fromTextFile(A,params.rho,1024,sc)

In the application, the LogRegressionXUpdate class is implemented as follows:

class LogRegressionXUpdate(val training: RDD[(Double, NV)],
                           val rho: Double)
    extends Function1[BDV[Double], Double] with Prox with Serializable {

  // Runs MLlib's distributed L-BFGS on this block's training RDD.
  def prox(x: BDV[Double], rho: Double): BDV[Double] = {
    val numCorrections = 10
    val convergenceTol = 1e-4
    val maxNumIterations = 20
    val regParam = 0.1
    val (weights, loss) = LBFGS.runLBFGS(
      training,
      new GradientForLogRegADMM(rho, fromBreeze(x)),
      new SimpleUpdater(),
      numCorrections,
      convergenceTol,
      maxNumIterations,
      regParam,
      fromBreeze(x))
    toBreeze(weights.toArray).toDenseVector
  }

  // Placeholder objective value (always 1.0).
  def apply(x: BDV[Double]): Double = {
    Math.pow(1, 2.0)
  }
}

With the following companion object:

object LogRegressionXUpdate {
  def fromTextFile(file: RDD[String],
                   rho: Double,
                   blockHeight: Int = 1024,
                   @transient sc: SparkContext): RDF[LogRegressionXUpdate] = {
    val fns = new BlockMatrix(file, blockHeight).blocks.map { X =>
      new LogRegressionXUpdate(
        sc.parallelize(
          X(*, ::).map(fila => (fila(-1), fromBreeze(fila(0 to -2)))).toArray),
        rho)
    }
    new RDF[LogRegressionXUpdate](fns, 0L)
  }
}

I believe the problem comes from using the SparkContext inside an RDD operation. I have read that SparkContext is not serializable, and marking the parameter @transient did not solve the problem either. If anyone has an idea of how to build this "second layer of RDDs", or any workaround that avoids using SparkContext on the workers to run L-BFGS, I would be very grateful!

Here is the exact exception:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:294)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:293)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.map(RDD.scala:293)
at admm.functions.LogRegressionXUpdate$.fromTextFile(LogRegressionXUpdate.scala:70)
at admm.examples.Lasso$.run(Lasso.scala:96)
at admm.examples.Lasso$$anonfun$main$1.apply(Lasso.scala:70)
at admm.examples.Lasso$$anonfun$main$1.apply(Lasso.scala:69)
at scala.Option.map(Option.scala:145)
at admm.examples.Lasso$.main(Lasso.scala:69)
at admm.examples.Lasso.main(Lasso.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@20576557)
- field (class: admm.functions.LogRegressionXUpdate$$anonfun$1, name: sc$1, type: class org.apache.spark.SparkContext)
- object (class admm.functions.LogRegressionXUpdate$$anonfun$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
... 21 more

An RDD is a distributed data structure that only has meaning on the driver. When you write something like

myRDD.map(someObject.someMethod)

Spark serializes someObject so that someMethod can run on the workers, and ships a copy to each task. If someObject holds a reference to something non-serializable, such as a SparkContext, that serialization fails with exactly the exception you are seeing.
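A minimal reproduction, assuming nothing beyond a live SparkContext named sc:

val numbers = sc.parallelize(1 to 3)

// The closure below captures `sc`. Spark's ClosureCleaner tries to
// serialize it before shipping the task to the workers and throws
// "org.apache.spark.SparkException: Task not serializable".
numbers.map(i => sc.parallelize(Seq(i)).count()).collect()

This is exactly the shape of your fromTextFile: the function passed to map references sc, so sc ends up in the serialized closure (that is the sc$1 field shown in your serialization stack).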

Even if you could somehow serialize the SparkContext, it would not help, because there is a more fundamental restriction: RDDs cannot be nested. An RDD only exists on the driver; inside a task running on a worker, a block is just ordinary local data. Process it with plain Scala (or Breeze) collections instead of trying to wrap it in a new RDD. Spark simply does not support creating an RDD from within another RDD's operation.
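A sketch of that pattern using glom(), which turns each partition into a single local array (the data here is made up):

import org.apache.spark.rdd.RDD

val samples: RDD[(Double, Array[Double])] = sc.parallelize(Seq(
  (1.0, Array(0.5, 1.5)),
  (0.0, Array(2.0, 0.25)),
  (1.0, Array(1.0, 1.0))), numSlices = 3)

// One local Array per partition: no nested RDD, no SparkContext needed.
val blocks: RDD[Array[(Double, Array[Double])]] = samples.glom()

val blockMeans: RDD[Double] = blocks.map { block =>
  // Ordinary Scala from here on; this runs entirely inside one task.
  block.map(_._1).sum / block.length
}

Your BlockMatrix already gives you exactly this kind of per-block local structure, so there is no need to re-wrap each block in an RDD.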

"... , SparkContext RDD " - , sc.parallelize. - ( ) L-BFGS.


Source: https://habr.com/ru/post/1598170/

