Merging datasets in Apache Flink

I am trying to merge a Seq[DataSet[(Long, Long, Double)]] into a single DataSet[(Long, Long, Double)] in Flink:

    val neighbors = graph
      .map(el => zKnn.neighbors(results, el.vector, 150, metric))
      .reduce((a, b) => a.union(b))
      .collect()

Here graph is a regular Scala collection, but it can be converted to a DataSet; results is a DataSet[Vector] that should not be collected and is needed inside the neighbors method.
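For context, a plain Scala collection can be turned into a DataSet with env.fromCollection. This is only a minimal sketch: GraphElement is a hypothetical stand-in for the real element type (only the vector field used above is assumed), and Array[Double] is a placeholder because the concrete Vector type is not shown in the question.

    import org.apache.flink.api.scala._

    // Hypothetical stand-in for the graph element type in the question; only the
    // `vector` field is assumed, and Array[Double] is a placeholder for the
    // unspecified Vector type.
    case class GraphElement(vector: Array[Double])

    val env = ExecutionEnvironment.getExecutionEnvironment

    // Stand-in for the regular Scala collection mentioned above.
    val graph: Seq[GraphElement] = Seq(GraphElement(Array(0.0, 1.0)))

    // Converting the plain collection into a Flink DataSet:
    val graphDS: DataSet[GraphElement] = env.fromCollection(graph)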

I always get a Flink runtime exception:

    org.apache.flink.optimizer.CompilerException: Cannot process nodes with more than 64 outputs.
        at org.apache.flink.optimizer.dag.OptimizerNode.addOutgoingConnection(OptimizerNode.java:47)
        at org.apache.flink.optimizer.dag.SingleInputNode.setInput(SingleInputNode.java:202)

1 answer

Flink's optimizer currently cannot handle plan nodes with more than 64 outgoing connections, which is exactly what your chain of unions produces.

You can work around the 64-output limit by building the union hierarchically: union at most 64 data sets, put an identity mapper behind each group, and then union the results of those groups. For example:

    DataSet level1a = data1.union(data2.union(data3...(data64))).map(new IDMapper());
    DataSet level1b = data65.union(data66...(data128)).map(new IDMapper());
    DataSet level2 = level1a.union(level1b);
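Applied to the Seq[DataSet[(Long, Long, Double)]] from the question, the same idea can be sketched in Scala as follows. This is an untested sketch: unionAll is a helper name introduced here, not a Flink API, and the group size 64 simply mirrors the limit described above.

    import org.apache.flink.api.scala._

    // Union a sequence of DataSets without giving any single node more than 64
    // outgoing connections: union within groups of at most 64, put an identity
    // mapper behind each group, then union the per-group results.
    def unionAll(sets: Seq[DataSet[(Long, Long, Double)]]): DataSet[(Long, Long, Double)] = {
      require(sets.nonEmpty, "need at least one DataSet")
      sets
        .grouped(64)                              // at most 64 data sets per group
        .map(group => group.reduce(_ union _)     // union inside the group ...
                           .map(x => x))          // ... followed by an identity mapper
        .reduce(_ union _)                        // union the (few) group results
    }

Usage with the Seq produced in the question would then look roughly like:

    val neighbors = unionAll(
      graph.map(el => zKnn.neighbors(results, el.vector, 150, metric))
    ).collect()

If there were more than 64 groups (i.e. more than 64 * 64 input data sets), the grouping step would have to be repeated once more.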

Source: https://habr.com/ru/post/1599556/

