Degree of parallelism in Apache Flink

Can I set a different degree of parallelism for different parts of a job in a Flink program? For example, how does Flink interpret the following code sample? Two custom partitioners, MyPartitioner1 and MyPartitioner2, split the input into 4 and 2 partitions, respectively.

```java
partitionedData1 = inputData1
        .partitionCustom(new MyPartitioner1(), 1);
env.setParallelism(4);
DataSet<Tuple2<Integer, Integer>> output1 = partitionedData1
        .mapPartition(new calculateFun());

partitionedData2 = inputData2
        .partitionCustom(new MyPartitioner2(), 2);
env.setParallelism(2);
DataSet<Tuple2<Integer, Integer>> output2 = partitionedData2
        .mapPartition(new calculateFun());
```

I get the following error for this code:

```
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:80)
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:92)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Unknown Source)
```
1 answer

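ExecutionEnvironment.setParallelism() sets the default parallelism for the whole program, i.e. for all operators of the program. It does not apply only to the operators defined after the call, so calling it twice in one program does not give the two parts different parallelism.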

You can specify the parallelism of each individual operator by calling the setParallelism() method on that operator.
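A minimal sketch of the question's program rewritten with per-operator parallelism, assuming the Flink DataSet API (org.apache.flink.api.java) is on the classpath. ModPartitioner and CountAndSum are hypothetical stand-ins for MyPartitioner1/MyPartitioner2 and calculateFun, and the input tuples are made up for illustration. The environment-wide value only acts as a default; each branch overrides it on its own operators.

```java
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class PerOperatorParallelism {

    // Hypothetical stand-in for MyPartitioner1/MyPartitioner2:
    // maps the key into the range [0, numPartitions).
    public static class ModPartitioner implements Partitioner<Integer> {
        @Override
        public int partition(Integer key, int numPartitions) {
            return Math.floorMod(key, numPartitions);
        }
    }

    // Hypothetical stand-in for calculateFun:
    // emits (record count, sum of field 1) for each non-empty partition.
    public static class CountAndSum
            implements MapPartitionFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
        @Override
        public void mapPartition(Iterable<Tuple2<Integer, Integer>> values,
                                 Collector<Tuple2<Integer, Integer>> out) {
            int count = 0;
            int sum = 0;
            for (Tuple2<Integer, Integer> value : values) {
                count++;
                sum += value.f1;
            }
            if (count > 0) {
                out.collect(new Tuple2<>(count, sum));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // default for operators that do not set their own

        DataSet<Tuple2<Integer, Integer>> inputData1 = env.fromElements(
                new Tuple2<>(1, 10), new Tuple2<>(2, 20), new Tuple2<>(3, 30), new Tuple2<>(4, 40));
        DataSet<Tuple2<Integer, Integer>> inputData2 = env.fromElements(
                new Tuple2<>(1, 5), new Tuple2<>(2, 6), new Tuple2<>(3, 7));

        // First branch: partitioned by field 0 into 4 partitions, 4 parallel mapPartition instances.
        DataSet<Tuple2<Integer, Integer>> output1 = inputData1
                .partitionCustom(new ModPartitioner(), 0).setParallelism(4)
                .mapPartition(new CountAndSum()).setParallelism(4);

        // Second branch: partitioned by field 0 into 2 partitions, 2 parallel mapPartition instances.
        DataSet<Tuple2<Integer, Integer>> output2 = inputData2
                .partitionCustom(new ModPartitioner(), 0).setParallelism(2)
                .mapPartition(new CountAndSum()).setParallelism(2);

        // print() triggers execution of the DataSet program.
        output1.union(output2).print();
    }
}
```

In this sketch the same value is set on partitionCustom and on the mapPartition that consumes it, so each partitioner is meant to see the number of sections its branch is intended to have.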

The ArrayIndexOutOfBoundsException is thrown because your custom partitioner returns an invalid partition number, probably due to an unexpected degree of parallelism. A custom partitioner receives the actual parallelism of the receiving operator as the numPartitions parameter of its partition(K key, int numPartitions) method, and it must return a value between 0 and numPartitions - 1.
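To illustrate, here is a sketch, assuming only Flink's org.apache.flink.api.common.functions.Partitioner interface on the classpath, that contrasts a hypothetical partitioner which hard-codes 4 partitions with one that derives its result from numPartitions. Both class names are made up for illustration, and main() calls partition() directly instead of running a job.

```java
import org.apache.flink.api.common.functions.Partitioner;

public class PartitionerRange {

    // Hypothetical buggy partitioner: assumes 4 partitions and ignores numPartitions.
    // If the receiver actually runs with parallelism 2, indices 2 and 3 are out of range.
    public static class HardCodedPartitioner implements Partitioner<Integer> {
        @Override
        public int partition(Integer key, int numPartitions) {
            return Math.floorMod(key, 4);
        }
    }

    // Hypothetical fix: derive the index from numPartitions,
    // so the result always lies in [0, numPartitions - 1].
    public static class RangeSafePartitioner implements Partitioner<Integer> {
        @Override
        public int partition(Integer key, int numPartitions) {
            return Math.floorMod(key, numPartitions);
        }
    }

    public static void main(String[] args) {
        int numPartitions = 2; // e.g. the receiving operator's actual parallelism
        HardCodedPartitioner hardCoded = new HardCodedPartitioner();
        RangeSafePartitioner rangeSafe = new RangeSafePartitioner();
        for (int key = 0; key < 4; key++) {
            System.out.printf("key=%d hard-coded=%d range-safe=%d%n", key,
                    hardCoded.partition(key, numPartitions),
                    rangeSafe.partition(key, numPartitions));
        }
    }
}
```

With numPartitions = 2, the hard-coded version returns 2 for key 2, an index that does not exist for a receiver with two parallel instances, while the range-safe version returns 0.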


Source: https://habr.com/ru/post/1237266/

