Apache Flink 中的并行度 [英] Degree of parallelism in Apache Flink
问题描述
我可以在 Flink 中为我们程序中任务的不同部分设置不同程度的并行度吗?例如,Flink 如何解释以下示例代码?两个自定义从业者MyPartitioner1、MyPartitioner2,将输入数据分为4个和2个分区.
Can I set different degree of parallelism for different part of the task in our program in Flink? For instance, how does Flink interpret the following sample code? The two custom practitioners MyPartitioner1, MyPartitioner2, partition the input data two 4 and 2 partitions.
partitionedData1 = inputData1
.partitionCustom(new MyPartitioner1(), 1);
env.setParallelism(4);
DataSet<Tuple2<Integer, Integer>> output1 = partitionedData1
.mapPartition(new calculateFun());
partitionedData2 = inputData2
.partitionCustom(new MyPartitioner2(), 2);
env.setParallelism(2);
DataSet<Tuple2<Integer, Integer>> output2 = partitionedData2
.mapPartition(new calculateFun());
我收到此代码的以下错误:
I get the following error for this code:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:80)
at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:92)
at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Unknown Source)
推荐答案
ExecutionEnvironment.setParallelism()
设置整个程序的并行度,即程序的所有操作符.
ExecutionEnvironment.setParallelism()
sets the parallelism for the whole program, i.e., all operators of the program.
您可以通过调用运算符的 setParallelism()
方法为每个单独的运算符指定并行度.
You can specify the parallelism for each individual operator by calling the setParallelism()
method on the operator.
抛出 ArrayIndexOutOfBoundsException
是因为您的自定义分区程序返回了一个无效的分区号,这可能是由于意外的并行度造成的.自定义分区器接收接收器的实际并行度作为其 partition(K key, int numPartitions)
方法中的参数.
The ArrayIndexOutOfBoundsException
is thrown because your custom partitioner returns an invalid partition number probably due to the unexpected degree of parallelism. The custom partitioner receives the actual parallelism of the receiver as a parameter in its partition(K key, int numPartitions)
method.
这篇关于Apache Flink 中的并行度的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!