Error Flink 1.4 AvroUtils

I am trying to send a job to Flink 1.4 and get the following exception.

Any idea how to solve the problem?

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.VerifyError: Bad type on operand stack Exception Details: Location: org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.addAvroGenericDataArrayRegistration(Ljava/util/LinkedHashMap;)V @23: invokespecial Reason: Type 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList' (current frame, stack[7]) is not assignable to 'com/esotericsoftware/kryo/Serializer' Current Frame: bci: @23 flags: { } locals: { 'org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils', 'java/util/LinkedHashMap' } stack: { 'java/util/LinkedHashMap', 'java/lang/String', uninitialized 6, uninitialized 6, 'java/lang/Class', uninitialized 12, uninitialized 12, 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList' } Bytecode: 0x0000000: 2b12 05b6 000b bb00 0c59 1205 bb00 0d59 0x0000010: bb00 0659 b700 0eb7 000f b700 10b6 0011 0x0000020: 57b1 at java.lang.Class.getDeclaredConstructors0(Native Method) at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671) at java.lang.Class.getConstructor0(Class.java:3075) at java.lang.Class.getConstructor(Class.java:1825) at org.apache.flink.api.java.typeutils.AvroUtils.getAvroUtils(AvroUtils.java:48) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.buildKryoRegistrations(KryoSerializer.java:481) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.<init>(KryoSerializer.java:119) at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:90) at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107) at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52) at org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:102) at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:253) at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:520) at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:165) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:692) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Thread.java:748) 
+5
source share
2 answers

thanks toch in the previous answer - he gave me the direction to solve

 <exclude>com.esotericsoftware.kryo:kryo</exclude> 

already been excluded in our pom.xml

I noticed that kryo-shaded was turned on

I excluded pom from adding

 <exclude>com.esotericsoftware:kryo-shaded</exclude> 

and he solves the problem

+2
source

This is a conflict between Kryo lib and jar and one already loaded into Flink.

Indeed, Serializers $ SpecificInstanceCollectionSerializerForArrayList inherits from com.esotericsoftware.kryo.Serializer through the SpecificInstanceCollectionSerializer and CollectionSerializer. (see Serializers.java in Flink and CollectionSerializer.java in Kryo).

We will fix it by excluding Kryo lib from the work of Jar. In Gradle build.gradle, this is done with (if you use the ShadowJar plugin):

 configurations { runtime.exclude module: 'kryo' , group: 'com.esotericsoftware.kryo' } 

In Maven pom.xml, this is done with (if you are using a shadow plugin):

 <exclude>com.esotericsoftware.kryo:kryo</exclude> 
+5
source

Source: https://habr.com/ru/post/1274251/


All Articles