Kryo serializer throwing exception on underlying Scala class WrappedArray

Two questions; the answer to the general one will tell me how minimal I can make an MVCE.

1) How could I have known to register WrappedArray up front (and every other Scala class I might use)? Is it normal to have to register classes from libraries with Kryo?

and specific:

2) How do I fix it? (I'm willing to admit I might have something else going on that shows up as a false error here, so don't kill yourself trying to reproduce this.)

DESCRIPTION

Testing a Spark program written in Java that uses our in-house genetics and statistics classes, on Spark 1.4.1 and Scala 2.11.5, with the following SparkConf settings:

    // for the Kryo serializer it wants to register all classes that need to be serialized
    Class[] kryoClassArray = new Class[]{DropResult.class, DropEvaluation.class, PrintHetSharing.class};

    SparkConf sparkConf = new SparkConf().setAppName("PipeLinkageData")
            <SNIP other settings to declare master>
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            // require registration of all classes with Kryo
            .set("spark.kryo.registrationRequired", "true")
            .registerKryoClasses(kryoClassArray);

Getting this error (repeated at the end of a long list of errors):

    Caused by: java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofRef
    Note: To register this class use: kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);

But I never call that class from my code. I can add scala.collection.mutable.WrappedArray to kryoClassArray, but that doesn't fix the problem. If I add scala.collection.mutable.WrappedArray$ofRef.class (as the error suggests), that's a syntax error; do I take it that I can't refer to that nested class as a class literal in Java?
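The only way I can see to do it from the Java side is a runtime lookup by the JVM binary name, since the $ name can't be written as a class literal in Java source. A minimal sketch (the other class names are the ones from my config above; note that Class.forName throws a checked ClassNotFoundException, so it needs a throws clause or try/catch):

    // Sketch: load the nested Scala class by its binary name at runtime,
    // because scala.collection.mutable.WrappedArray$ofRef.class is not
    // valid Java source syntax.
    Class<?> wrappedArrayOfRef =
            Class.forName("scala.collection.mutable.WrappedArray$ofRef");

    Class<?>[] kryoClassArray = new Class<?>[]{
            DropResult.class,
            DropEvaluation.class,
            PrintHetSharing.class,
            wrappedArrayOfRef // the class the error message names
    };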

MVCE: I've started on an MVCE, but the problem is that reproducing it with our classes pulls in outside libraries and text files. As soon as I strip out our classes, the problem goes away. If someone can answer the general question, it may help guide how much of an MVCE I can come up with.

As I write this question I got the OK to upgrade to 1.5.2; I'll see whether anything changes there and update the question if so.

Short of an MVCE, here are my class declarations:

    public class MVCEPipeLinkageInterface extends LinkageInterface implements Serializable {

    class PrintHetSharing implements VoidFunction<DropResult> {

    class SparkDoDrop implements Function<Integer, Integer> {

Complete error listing:

    16/01/08 10:54:54 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
    16/01/08 10:54:55 INFO SparkDeploySchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@155.100.214.138:55646/user/Executor#214759698]) with ID 0
    16/01/08 10:54:55 ERROR TaskSetManager: Failed to serialize task 0, not attempting to retry it.
    java.io.IOException: java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofRef
    Note: To register this class use: kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242)
        at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
        at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168)
        at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:226)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:295)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:293)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:293)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.makeOffers(CoarseGrainedSchedulerBackend.scala:167)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receiveAndReply$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:143)
        at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:178)
        at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:127)
        at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:198)
        at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:126)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
        at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:93)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofRef
    Note: To register this class use: kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);
2 answers

In Scala, you fix this problem by adding scala.collection.mutable.WrappedArray.ofRef[_] as a registered class, as in the following snippet (Person here is just an example class):

    conf.registerKryoClasses(
      Array(
        ...
        classOf[Person],
        classOf[Array[Person]],
        ...
        classOf[scala.collection.mutable.WrappedArray.ofRef[_]]
      )
    )
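For the Java configuration in the question, a rough equivalent would be the following sketch (it reuses the question's class names and loads the nested class by its binary name, since Java source cannot write WrappedArray$ofRef.class directly; Class.forName declares ClassNotFoundException, so the enclosing method must declare or catch it):

    // Sketch of the same fix applied to the question's Java SparkConf.
    SparkConf sparkConf = new SparkConf()
            .setAppName("PipeLinkageData")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.kryo.registrationRequired", "true")
            .registerKryoClasses(new Class<?>[]{
                    DropResult.class,
                    DropEvaluation.class,
                    PrintHetSharing.class,
                    // binary-name lookup for the nested Scala class
                    Class.forName("scala.collection.mutable.WrappedArray$ofRef")
            });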

You don't need to make everything serializable, whether or not it's part of a client library. But you do need to make any lambdas that execute on the executors serializable. Those don't run on the master node, so there's no way to avoid serializing them (and you wouldn't want to, since the whole point of Spark is distributed computing). A minimal sketch follows.
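As an illustration (a sketch with a hypothetical PrintLine class; in Spark 1.x the Java function interfaces such as org.apache.spark.api.java.function.VoidFunction already extend java.io.Serializable, so implementing one is what makes the function shippable):

    import org.apache.spark.api.java.function.VoidFunction;

    // Sketch: a function object that Spark ships to executors. The class and
    // any fields it carries must be serializable; driver-side classes that
    // never leave the master node do not need to be.
    class PrintLine implements VoidFunction<String> {
        private final String prefix; // captured state travels with the function

        PrintLine(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public void call(String line) {
            System.out.println(prefix + line);
        }
    }

Something like rdd.foreach(new PrintLine("> ")) would then run on the executors, serializing only the PrintLine instance and its prefix field.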

For examples and more (and if you haven't fully grasped the concept), check the official documentation on this.


Source: https://habr.com/ru/post/1240402/

