如何增加 Flink taskmanager.numberOfTaskSlots 以在没有 Flink 服务器的情况下运行它(在 IDE 或 fat jar 中) [英] How to increase Flink taskmanager.numberOfTaskSlots to run it without Flink server(in IDE or fat jar)
问题描述
我有一个关于在 IDE 中运行 Flink 流作业或作为胖 jar 而不将其部署到 Flink 服务器的问题.
I have one questions about running Flink streaming job in IDE or as fat jar without deploying it to Flink server.
问题是当我的工作中有超过 1 个任务槽时,我无法在 IDE 中运行它.
The problem is I cannot run it in IDE when I have more than 1 taskslot in my job.
public class StreamingJob {
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
kafkaProperties.setProperty("group.id", "test");
env.setParallelism(1);
DataStream<String> kafkaSource = env
.addSource(new FlinkKafkaConsumer010<>("flink-source", new SimpleStringSchema(), kafkaProperties))
.name("Kafka-Source")
.slotSharingGroup("Kafka-Source");
kafkaSource.print().slotSharingGroup("Print");
env.execute("Flink Streaming Java API Skeleton");
}
}
我知道这个作业需要 2 个插槽,我可以在 Flink 集群中有两个任务管理器,但是我如何在 IDE 中本地运行它.
I know that job need 2 slot for this job and I can have two taskmanagers in Flink cluster, but how can I run it locally in IDE.
目前我必须在本地为所有运营商指定相同的 slotSharingGroup 名称才能拥有一个插槽.但它不灵活.
Currently I have to specify the same slotSharingGroup name for all operator locally to have one slot. But it's not flexible.
你是如何处理的?
推荐答案
这是您描述的已知错误.您可以在此处找到相应的 JIRA 问题.
This is a known bug which you are describing. You can find the corresponding JIRA issue here.
绕过这个问题的方法是手动设置启动TaskExecutor
的任务槽数.您可以通过 TaskManagerOptions.NUM_TASK_SLOTS
配置选项执行此操作:
The way to circumvent this problem is to manually set the number of task slots with which the TaskExecutor
is started. You can do this via the TaskManagerOptions.NUM_TASK_SLOTS
configuration option:
final int parallelism = ...;
final Configuration configuration = new Configuration();
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, configuration);
这篇关于如何增加 Flink taskmanager.numberOfTaskSlots 以在没有 Flink 服务器的情况下运行它(在 IDE 或 fat jar 中)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!