Kryo: deserialize an old version of a class

I need to change a class by adding two new parameters. This class is serialized with Kryo. I currently persist the information related to this class, among other things, as an RDD every time I stop my stream. When I restart the stream, I load the previously persisted information and use it to stay consistent between the moment I stopped and the moment I restart.

Since the class I persist needs these new parameters, I changed the class and the serializer, adding kryo.writeObject(output, object, ObjectSerializer) and kryo.readObject(input, classOf[Object], ObjectSerializer) calls for the new parameters.

Now, when I restart my stream, I get an exception: "Encountered unregistered class ...".

This seems obvious, because I'm trying to deserialize data that is not contained in the information I saved when I stopped the stream. If I delete that data and start the stream as if it had no previous run, the exception does not occur.

Is there any way to avoid this exception? Maybe by specifying default values when these parameters are absent?

Thanks.

EDIT:

I found something useful that I had not seen before: Kryo issue 194.

This guy implemented versioning by simply writing a long that identifies which version of the deserializer should be used. This is a simple solution, but since the company that wrote the code I'm working on did not think about forward compatibility, I think I will have to throw away all the data that was persisted before the new serializer.
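A minimal sketch of that version-prefix idea, under stated assumptions: it uses plain java.io streams rather than Kryo so that it runs without dependencies, and Record, VersionedRecordCodec, writeV1 and the field names are hypothetical names made up for illustration. In a Kryo serializer the same pattern would go through output.writeLong and input.readLong.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical record: `score` and `tag` are the two fields added in version 2.
final case class Record(id: Long, name: String, score: Int = 0, tag: String = "")

object VersionedRecordCodec {
  val CurrentVersion: Long = 2L

  // Always writes the newest layout, prefixed with its version number.
  def write(r: Record): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeLong(CurrentVersion)
    out.writeLong(r.id)
    out.writeUTF(r.name)
    out.writeInt(r.score)
    out.writeUTF(r.tag)
    out.flush()
    bytes.toByteArray
  }

  // The version 1 layout, kept only to produce old payloads for the usage example.
  def writeV1(id: Long, name: String): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeLong(1L)
    out.writeLong(id)
    out.writeUTF(name)
    out.flush()
    bytes.toByteArray
  }

  // Reads the version first, then picks the matching layout.
  // Fields missing from an old layout fall back to the case class defaults.
  def read(data: Array[Byte]): Record = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    in.readLong() match {
      case 1L => Record(in.readLong(), in.readUTF())
      case 2L => Record(in.readLong(), in.readUTF(), in.readInt(), in.readUTF())
      case v  => throw new IllegalArgumentException(s"Unsupported version: $v")
    }
  }
}
```

Reading a payload produced by writeV1 yields a Record whose new fields hold the defaults, while a payload produced by write round-trips unchanged. The approach only helps for data that was written with the version prefix in the first place, which is why data persisted before such a change cannot be rescued this way.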


EDIT 2:

[Text lost in extraction; the surviving fragments mention CompatibleFieldSerializer and a java.lang.NullPointerException.]


[Text lost in extraction; the surviving fragment mentions the Kryo FieldSerializer.]

The class:

case class ClassA(field1 : Long, field2 : String)

Its serializer:

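import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}

object ClassASerializer extends Serializer[ClassA] with Serializable {
  // Write the fields in a fixed order.
  override def write(kryo: Kryo, output: Output, t: ClassA): Unit = {
    output.writeLong(t.field1)
    output.writeString(t.field2)
  }

  // Read the fields back in the same order they were written.
  override def read(kryo: Kryo, input: Input, aClass: Class[ClassA]): ClassA =
    ClassA(
      field1 = input.readLong(),
      field2 = input.readString()
    )
}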

The classes and their serializers are kept in a Seq and registered with Kryo:

    // (class, serializer) pairs to register with Kryo.
    protected def registry: Seq[(Class[_], Serializer[_])] = ...
    final def register(kryo: Kryo): Unit = {
      registry.foreach { case (aClass, serializer) => kryo.register(aClass, serializer) }
    }


Kryo's "Optional" annotation, applied to the new field:

...
import com.esotericsoftware.kryo.serializers.FieldSerializer.Optional
import scala.annotation.meta.field
...

case class ClassA(field1: Long, field2: String, @(Optional @field)("field3") field3: ClassB)

The serializer, now with 3 fields:

object ClassASerializer extends Serializer[ClassA] with Serializable {
  override def write(kryo: Kryo, output: Output, t: ClassA): Unit = {
    output.writeLong(t.field1)
    output.writeString(t.field2)
    // A null field3 is replaced by ClassB.default before writing.
    kryo.writeObject(output, Option(t.field3).getOrElse(ClassB.default), ClassBSerializer)
  }

  // field3 is not read from the input here; it is set to ClassB.default.
  override def read(kryo: Kryo, input: Input, aClass: Class[ClassA]): ClassA =
    ClassA(
      field1 = input.readLong(),
      field2 = input.readString(),
      field3 = ClassB.default
    )
}

The Kryo registration:

    import com.esotericsoftware.kryo.util.ObjectMap

    protected def registry: Seq[(Class[_], Serializer[_])] = ...
    def optionals = Seq("field3")

    final def register(kryo: Kryo): Unit = {
      // Put each optional field key into the Kryo context before registering.
      optionals.foreach { optional =>
        kryo.getContext.asInstanceOf[ObjectMap[Any, Any]].put(optional, true)
      }
      registry.foreach { case (aClass, serializer) => kryo.register(aClass, serializer) }
    }

[Text lost in extraction; the surviving fragment mentions FieldSerializer.]


Source: https://habr.com/ru/post/1652207/

