Flink doesn't start my application due to invalidtypes error when using java8 lambdas

I use Flink and Java8. When using lambda functions with tuples and generic types, my compiler ends with an exception

    /Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/bin/java -Didea.launcher.port=7536 "-Didea.launcher.bin.path=/Applications/IntelliJ IDEA.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath "/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/javaws.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/jfxswt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/management-agent.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/plugin.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/lib/ant-javafx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/lib/dt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/lib/javafx-mx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/lib/jconsole.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/lib/packager.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/lib/sa-jdi.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/lib/tools.jar:/Users/hasan.guercan/Git/flink-java-project/target/classes:/Users/hasan.guercan/.m2/repository/org/apache/flink/flink-java/1.0.3/flink-java-1.0.3.jar:/Users/hasan.guercan/.m2/repository/org/apache/flink/flink-core/1.0.3/flink-core-1.0.3.jar:/Users/hasan.guercan/.m2/repository/org/apache/flink/flink-annotations/1.0.3/flink-annotations-1.0.3.jar:/Users/hasan.guercan/.m2/repository/com/esotericsoftware/kryo/kryo/2.24.0/kryo-2.24.0.jar:/Users/hasan.guercan/.m2/repository/com/esotericsoftware/minlog/minlog/1.2/minlog-1.2.jar:/Users/hasan.guercan/.m2/repository/org/objenesis/objenesis/2.1/objenesis-2.1.jar:/Users/hasan.guercan/.m2/repository/org/apache/avro/avro/1.7.6/avro-1.7.6.jar:/Users/hasan.guercan/.m2/repository/org/apache/flink/flink-shaded-hadoop2/1.0.3/flink-shaded-hadoop2-1.0.3.jar:/Users/hasan.guercan/.m2/repository/xmlenc/xmlenc/0.52/xmlenc-0.52.jar:/Users/hasan.guercan/.m2/repository/commons-codec/commons-codec/1.4/commons-codec-1.4.jar:/Users/hasan.guercan/.m2/repository/commons-io/commons-io/2.4/commons-io-2.4.jar:/Users/hasan.guercan/.m2/repository/commons-net/commons-net/3.1/commons-net-3.1.jar:/Users/hasan.guercan/.m2/repository/commons-collections/commons-collections/3.2.1/commons-collections-3.2.1.jar:/Users/hasan.guercan/.m2/repository/javax/servlet/servlet-api/2.5/servlet-api-2.5.jar:/Users/hasan.guercan/.m2/repository/org/mortbay/jetty/jetty-util/6.1.26/jetty-util-6.1.26.jar:/Users/hasan.guercan/.m2/repository/com/sun/jersey/jersey-core/1.9/jersey-core-1.9.jar:/Users/hasan.guercan/.m2/repository/commons-el/commons-el/1.0/commons-el-1.0.jar:/Users/hasan.guercan/.m2/repository/commons-logging/commons-logging/1.1.3/commons-logging-1.1.3.jar:/Users/hasan.guercan/.m2/repository/com/jamesmurty/utils/java-xmlbuilder/0.4/java-xmlbuilder-0.4.jar:/Users/hasan.guercan/.m2/repository/commons-lang/commons-lang/2.6/commons-lang-2.6.jar:/Users/hasan.guercan/.m2/repository/commons-configuration/commons-configuration/1.7/commons-configuration-1.7.jar:/Users/hasan.guercan/.m2/repository/commons-digester/commons-digester/1.8.1/commons-digester-1.8.1.jar:/Users/hasan.guercan/.m2/repository/org/codehaus/jackson/jackson-core-asl/1.8.8/jackson-core-asl-1.8.8.jar:/Users/hasan.guercan/.m2/repository/org/codehaus/jackson/jackson-mapper-asl/1.8.8/jackson-mapper-asl-1.8.8.jar:/Users/hasan.guercan/.m2/repository/com/thoughtworks/paranamer/paranamer/2.3/paranamer-2.3.jar:/Users/hasan.guercan/.m2/repository/org/xerial/snappy/snappy-java/1.0.5/snappy-java-1.0.5.jar:/Users/hasan.guercan/.m2/repository/com/jcraft/jsch/0.1.42/jsch-0.1.42.jar:/Users/hasan.guercan/.m2/repository/org/apache/zookeeper/zookeeper/3.4.6/zookeeper-3.4.6.jar:/Users/hasan.guercan/.m2/repository/io/netty/netty/3.7.0.Final/netty-3.7.0.Final.jar:/Users/hasan.guercan/.m2/repository/org/apache/commons/commons-compress/1.4.1/commons-compress-1.4.1.jar:/Users/hasan.guercan/.m2/repository/org/tukaani/xz/1.0/xz-1.0.jar:/Users/hasan.guercan/.m2/repository/commons-beanutils/commons-beanutils-bean-collections/1.8.3/commons-beanutils-bean-collections-1.8.3.jar:/Users/hasan.guercan/.m2/repository/commons-daemon/commons-daemon/1.0.13/commons-daemon-1.0.13.jar:/Users/hasan.guercan/.m2/repository/javax/xml/bind/jaxb-api/2.2.2/jaxb-api-2.2.2.jar:/Users/hasan.guercan/.m2/repository/javax/xml/stream/stax-api/1.0-2/stax-api-1.0-2.jar:/Users/hasan.guercan/.m2/repository/javax/activation/activation/1.1/activation-1.1.jar:/Users/hasan.guercan/.m2/repository/com/google/inject/guice/3.0/guice-3.0.jar:/Users/hasan.guercan/.m2/repository/javax/inject/javax.inject/1/javax.inject-1.jar:/Users/hasan.guercan/.m2/repository/aopalliance/aopalliance/1.0/aopalliance-1.0.jar:/Users/hasan.guercan/.m2/repository/org/apache/commons/commons-math3/3.5/commons-math3-3.5.jar:/Users/hasan.guercan/.m2/repository/org/slf4j/slf4j-api/1.7.7/slf4j-api-1.7.7.jar:/Users/hasan.guercan/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar:/Users/hasan.guercan/.m2/repository/log4j/log4j/1.2.17/log4j-1.2.17.jar:/Users/hasan.guercan/.m2/repository/org/apache/flink/force-shading/1.0.3/force-shading-1.0.3.jar:/Users/hasan.guercan/.m2/repository/org/apache/flink/flink-streaming-java_2.10/1.0.3/flink-streaming-java_2.10-1.0.3.jar:/Users/hasan.guercan/.m2/repository/org/apache/flink/flink-runtime_2.10/1.0.3/flink-runtime_2.10-1.0.3.jar:/Users/hasan.guercan/.m2/repository/io/netty/netty-all/4.0.27.Final/netty-all-4.0.27.Final.jar:/Users/hasan.guercan/.m2/repository/org/javassist/javassist/3.18.2-GA/javassist-3.18.2-GA.jar:/Users/hasan.guercan/.m2/repository/org/scala-lang/scala-library/2.10.4/scala-library-2.10.4.jar:/Users/hasan.guercan/.m2/repository/com/typesafe/akka/akka-actor_2.10/2.3.7/akka-actor_2.10-2.3.7.jar:/Users/hasan.guercan/.m2/repository/com/typesafe/config/1.2.1/config-1.2.1.jar:/Users/hasan.guercan/.m2/repository/com/typesafe/akka/akka-remote_2.10/2.3.7/akka-remote_2.10-2.3.7.jar:/Users/hasan.guercan/.m2/repository/com/google/protobuf/protobuf-java/2.5.0/protobuf-java-2.5.0.jar:/Users/hasan.guercan/.m2/repository/org/uncommons/maths/uncommons-maths/1.2.2a/uncommons-maths-1.2.2a.jar:/Users/hasan.guercan/.m2/repository/com/typesafe/akka/akka-slf4j_2.10/2.3.7/akka-slf4j_2.10-2.3.7.jar:/Users/hasan.guercan/.m2/repository/org/clapper/grizzled-slf4j_2.10/1.0.2/grizzled-slf4j_2.10-1.0.2.jar:/Users/hasan.guercan/.m2/repository/com/github/scopt/scopt_2.10/3.2.0/scopt_2.10-3.2.0.jar:/Users/hasan.guercan/.m2/repository/io/dropwizard/metrics/metrics-core/3.1.0/metrics-core-3.1.0.jar:/Users/hasan.guercan/.m2/repository/io/dropwizard/metrics/metrics-jvm/3.1.0/metrics-jvm-3.1.0.jar:/Users/hasan.guercan/.m2/repository/io/dropwizard/metrics/metrics-json/3.1.0/metrics-json-3.1.0.jar:/Users/hasan.guercan/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.4.2/jackson-databind-2.4.2.jar:/Users/hasan.guercan/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.4.0/jackson-annotations-2.4.0.jar:/Users/hasan.guercan/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.4.2/jackson-core-2.4.2.jar:/Users/hasan.guercan/.m2/repository/com/twitter/chill_2.10/0.7.4/chill_2.10-0.7.4.jar:/Users/hasan.guercan/.m2/repository/com/twitter/chill-java/0.7.4/chill-java-0.7.4.jar:/Users/hasan.guercan/.m2/repository/org/apache/commons/commons-math/2.2/commons-math-2.2.jar:/Users/hasan.guercan/.m2/repository/org/apache/sling/org.apache.sling.commons.json/2.0.6/org.apache.sling.commons.json-2.0.6.jar:/Users/hasan.guercan/.m2/repository/org/apache/flink/flink-clients_2.10/1.0.3/flink-clients_2.10-1.0.3.jar:/Users/hasan.guercan/.m2/repository/org/apache/flink/flink-optimizer_2.10/1.0.3/flink-optimizer_2.10-1.0.3.jar:/Users/hasan.guercan/.m2/repository/commons-cli/commons-cli/1.2/commons-cli-1.2.jar:/Users/hasan.guercan/.m2/repository/org/apache/commons/commons-lang3/3.0.1/commons-lang3-3.0.1.jar:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar" com.intellij.rt.execution.application.AppMain org.apache.flink.quickstart.exercise2.ReplyGraph
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'retrieve(ReplyGraph.java:33)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
    at org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
    at org.apache.flink.api.java.DataSet.collect(DataSet.java:407)
    at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
    at org.apache.flink.quickstart.exercise2.ReplyGraph.retrieve(ReplyGraph.java:41)
    at org.apache.flink.quickstart.exercise2.ReplyGraph.main(ReplyGraph.java:56)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple3' are missing. 
It seems that your compiler has not stored them into the .class file. 
Currently, only the Eclipse JDT compiler preserves the type information necessary to use the lambdas feature type-safely. 
See the documentation for more information about how to compile jobs containing lambda expressions.
    at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameter(TypeExtractor.java:1316)
    at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameters(TypeExtractor.java:1302)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:346)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:304)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:119)
    at org.apache.flink.api.java.DataSet.map(DataSet.java:215)
    at org.apache.flink.quickstart.exercise2.ReplyGraph.retrieve(ReplyGraph.java:33)
    ... 6 more

Therefore, I must create at least an anonymous class to solve the problem. The first piece of code is code that leads to the described exception:

DataSet<MailEntry> filteredUserReplyMails = replyMails.filter(entryTuple -> {
            String sender = entryTuple.getField(1).toString();
            return !sender.contains("git@") && !sender.contains("jira@");
        }).map((entry -> {
            MailEntry mailEntry = new MailEntry();
            mailEntry.messageId = entry.f0.replaceAll("<", "").replaceAll(">", "");
            mailEntry.sender = entry.f1;
            mailEntry.replyTo = entry.f2;
            return mailEntry;
        });

The following works when creating an anonymous class:

DataSet<MailEntry> filteredUserReplyMails = replyMails.filter(entryTuple -> {
            String sender = entryTuple.getField(1).toString();
            return !sender.contains("git@") && !sender.contains("jira@");
        }).map(new MapFunction<Tuple3<String, String, String>, MailEntry>() {
            @Override
            public MailEntry map(Tuple3<String, String, String> entry) throws Exception {
                MailEntry mailEntry = new MailEntry();
                mailEntry.messageId = entry.f0.replaceAll("<", "").replaceAll(">", "");
                mailEntry.sender = entry.f1;
                mailEntry.replyTo = entry.f2;
                return mailEntry;
            }
        });

The Javas lambda function is very neat. How can I solve this problem without creating an anonymous class?

+4
source share
2 answers

Try using the return method after the card:

DataSet<MailEntry> filteredUserReplyMails = replyMails.filter(entryTuple -> {
            String sender = entryTuple.getField(1).toString();
            return !sender.contains("git@") && !sender.contains("jira@");
        }).map(entry -> {
            MailEntry mailEntry = new MailEntry();
            mailEntry.messageId = entry.f0.replaceAll("<", "").replaceAll(">", "");
            mailEntry.sender = entry.f1;
            mailEntry.replyTo = entry.f2;
            return mailEntry;
        }).returns(MailEntry.class);
+3
source

Lambda Flink, javac, . https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/java8.html

Flink , Lambda Expressions, , Eclipse JDT, Eclipse Luna 4.4.2 ( ).

, Eclipse. , :

  • pom.xml, ( , )

            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source> <!-- ${java.version} -->
                    <target>1.8</target>
                    <compilerId>jdt</compilerId>
                </configuration>
                <dependencies>
                    <dependency>
                        <groupId>org.eclipse.tycho</groupId>
                        <artifactId>tycho-compiler-jdt</artifactId>
                        <version>0.21.0</version>
                    </dependency>
                </dependencies>
            </plugin>
    

    JDT- javac

  • , . " " (. ), " " ( , "-" ) " Maven Goal" ( "+" ), "" " " (. ).

, , Flink

0

Source: https://habr.com/ru/post/1653917/


All Articles