Degree of parallelism in Apache Flink

Problem Description

Can I set different degrees of parallelism for different parts of the task in a Flink program? For instance, how does Flink interpret the following sample code? The two custom partitioners, MyPartitioner1 and MyPartitioner2, partition the input data into 4 and 2 partitions, respectively.

partitionedData1 = inputData1
  .partitionCustom(new MyPartitioner1(), 1);
env.setParallelism(4);
DataSet<Tuple2<Integer, Integer>> output1 = partitionedData1
  .mapPartition(new calculateFun());

partitionedData2 = inputData2
  .partitionCustom(new MyPartitioner2(), 2);
env.setParallelism(2);
DataSet<Tuple2<Integer, Integer>> output2 = partitionedData2
  .mapPartition(new calculateFun());

This code fails with the following error:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:80)
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:92)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Unknown Source)

Recommended Answer

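ExecutionEnvironment.setParallelism() sets the default parallelism for the whole program, i.e., for all operators of the program that do not set their own. The default is applied when the program is executed, so in your code the second call, env.setParallelism(2), applies to every operator, including the one that receives the output of MyPartitioner1.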

You can specify the parallelism for each individual operator by calling the setParallelism() method on the operator.
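
To make the difference concrete, here is a toy model in plain Java of how an environment-wide default interacts with per-operator settings. It is not Flink's API: ToyOperator, ToyEnvironment and the USE_DEFAULT marker are hypothetical, and the model assumes the default is resolved only when the plan is built.

```java
// Toy model, NOT Flink's API: each operator either has an explicit
// parallelism or a marker meaning "use the environment default".
class ToyOperator {
    static final int USE_DEFAULT = -1; // hypothetical marker
    final String name;
    int parallelism = USE_DEFAULT;

    ToyOperator(String name) {
        this.name = name;
    }

    // Analogous to calling setParallelism() on an individual operator.
    ToyOperator setParallelism(int parallelism) {
        this.parallelism = parallelism;
        return this;
    }
}

class ToyEnvironment {
    private int defaultParallelism = 1;

    // Analogous to ExecutionEnvironment.setParallelism(): only changes the default.
    void setParallelism(int parallelism) {
        this.defaultParallelism = parallelism;
    }

    ToyOperator add(String name) {
        return new ToyOperator(name);
    }

    // Models plan construction: operators without their own setting
    // pick up whatever default is current at that moment.
    int resolvedParallelism(ToyOperator op) {
        return op.parallelism == ToyOperator.USE_DEFAULT ? defaultParallelism : op.parallelism;
    }
}

public class ParallelismModel {
    public static void main(String[] args) {
        // Pattern from the question: two environment-level calls.
        ToyEnvironment env = new ToyEnvironment();
        env.setParallelism(4);
        ToyOperator map1 = env.add("mapPartition-1");
        env.setParallelism(2);
        ToyOperator map2 = env.add("mapPartition-2");
        System.out.println(map1.name + ": " + env.resolvedParallelism(map1)); // 2, not 4
        System.out.println(map2.name + ": " + env.resolvedParallelism(map2)); // 2

        // A per-operator setting is kept regardless of the default.
        ToyOperator map3 = env.add("mapPartition-3").setParallelism(4);
        System.out.println(map3.name + ": " + env.resolvedParallelism(map3)); // 4
    }
}
```

In the real DataSet API the per-operator form is a call on the operator that a transformation returns, for example `partitionedData1.mapPartition(new calculateFun()).setParallelism(4)`; the operator returned by `partitionCustom(...)` accepts `setParallelism(...)` as well.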

The ArrayIndexOutOfBoundsException is thrown because your custom partitioner returns an invalid partition number, probably because the receiving operator runs with a different degree of parallelism than you expected. A custom partitioner receives the actual parallelism of the receiver as the numPartitions parameter of its partition(K key, int numPartitions) method and must return a value between 0 and numPartitions - 1.
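
The sketch below contrasts a partitioner that hard-codes four partitions with one that derives its result from numPartitions. It is plain Java so that it runs without Flink: the Partitioner interface here is only a stand-in with the same method shape as Flink's org.apache.flink.api.common.functions.Partitioner, and FixedFourPartitioner, ModuloPartitioner and the route() helper (a rough imitation of a writer that holds one output channel per receiver) are hypothetical.

```java
import java.util.Arrays;

// Stand-in with the same method shape as Flink's Partitioner<K>.
interface Partitioner<K> {
    int partition(K key, int numPartitions);
}

// Hypothetical "bad" partitioner: assumes there are always 4 receivers.
class FixedFourPartitioner implements Partitioner<Integer> {
    @Override
    public int partition(Integer key, int numPartitions) {
        return Math.floorMod(key, 4); // ignores numPartitions
    }
}

// Hypothetical "good" partitioner: always returns a value in [0, numPartitions).
class ModuloPartitioner implements Partitioner<Integer> {
    @Override
    public int partition(Integer key, int numPartitions) {
        return Math.floorMod(key, numPartitions);
    }
}

public class PartitionerDemo {
    // Mimics a writer with one channel per receiver that indexes the
    // channel array with the partitioner's result.
    static int[] route(Partitioner<Integer> p, int[] keys, int numPartitions) {
        int[] recordsPerChannel = new int[numPartitions];
        for (int key : keys) {
            recordsPerChannel[p.partition(key, numPartitions)]++;
        }
        return recordsPerChannel;
    }

    public static void main(String[] args) {
        int[] keys = {0, 1, 2, 3, 4, 5, 6, 7};
        // The receiver actually runs with parallelism 2.
        System.out.println(Arrays.toString(route(new ModuloPartitioner(), keys, 2))); // [4, 4]
        try {
            route(new FixedFourPartitioner(), keys, 2); // key 2 maps to channel 2
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("FixedFourPartitioner failed: " + e.getMessage());
        }
    }
}
```

In a real job, MyPartitioner1 would implement Flink's Partitioner<K> and compute its result from numPartitions in the same way, so it stays valid whatever parallelism the receiver ends up with. Note also that the second argument of the DataSet API's partitionCustom(partitioner, field) selects the key field; the number of partitions is not passed there.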
