Apache Flink 中的并行度 [英] Degree of parallelism in Apache Flink

查看:24
本文介绍了Apache Flink 中的并行度的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我可以在 Flink 中为我们程序中任务的不同部分设置不同程度的并行度吗?例如,Flink 如何解释以下示例代码?两个自定义从业者MyPartitioner1、MyPartitioner2,将输入数据分为4个和2个分区.

Can I set different degree of parallelism for different part of the task in our program in Flink? For instance, how does Flink interpret the following sample code? The two custom practitioners MyPartitioner1, MyPartitioner2, partition the input data two 4 and 2 partitions.

partitionedData1 = inputData1
  .partitionCustom(new MyPartitioner1(), 1);
env.setParallelism(4);
DataSet<Tuple2<Integer, Integer>> output1 = partitionedData1
  .mapPartition(new calculateFun());

partitionedData2 = inputData2
  .partitionCustom(new MyPartitioner2(), 2);
env.setParallelism(2);
DataSet<Tuple2<Integer, Integer>> output2 = partitionedData2
  .mapPartition(new calculateFun());

我收到此代码的以下错误:

I get the following error for this code:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:80)
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:92)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Unknown Source)

推荐答案

ExecutionEnvironment.setParallelism() 设置整个程序的并行度,即程序的所有操作符.

ExecutionEnvironment.setParallelism() sets the parallelism for the whole program, i.e., all operators of the program.

您可以通过调用运算符的 setParallelism() 方法为每个单独的运算符指定并行度.

You can specify the parallelism for each individual operator by calling the setParallelism() method on the operator.

抛出 ArrayIndexOutOfBoundsException 是因为您的自定义分区程序返回了一个无效的分区号,这可能是由于意外的并行度造成的.自定义分区器接收接收器的实际并行度作为其 partition(K key, int numPartitions) 方法中的参数.

The ArrayIndexOutOfBoundsException is thrown because your custom partitioner returns an invalid partition number probably due to the unexpected degree of parallelism. The custom partitioner receives the actual parallelism of the receiver as a parameter in its partition(K key, int numPartitions) method.

这篇关于Apache Flink 中的并行度的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆