如何增加Flink taskmanager.numberOfTaskSlots以在没有Flink服务器的情况下运行它(在IDE或胖子中) [英] How to increase Flink taskmanager.numberOfTaskSlots to run it without Flink server(in IDE or fat jar)

查看:1234
本文介绍了如何增加Flink taskmanager.numberOfTaskSlots以在没有Flink服务器的情况下运行它(在IDE或胖子中)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个问题,关于在IDE中或以胖子形式运行Flink流作业而不将其部署到Flink服务器上.

I have one questions about running Flink streaming job in IDE or as fat jar without deploying it to Flink server.

问题是,当我的工作中有多个任务槽时,我无法在IDE中运行它.

The problem is I cannot run it in IDE when I have more than 1 taskslot in my job.

public class StreamingJob {

public static void main(String[] args) throws Exception {
    // set up the streaming execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties kafkaProperties = new Properties();
    kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
    kafkaProperties.setProperty("group.id", "test");
    env.setParallelism(1);

    DataStream<String> kafkaSource = env
        .addSource(new FlinkKafkaConsumer010<>("flink-source", new SimpleStringSchema(), kafkaProperties))
        .name("Kafka-Source")
        .slotSharingGroup("Kafka-Source");

    kafkaSource.print().slotSharingGroup("Print");

    env.execute("Flink Streaming Java API Skeleton");
}

}

我知道这项工作需要2个插槽,并且Flink集群中可以有两个任务管理器,但是如何在IDE中本地运行它.

I know that job need 2 slot for this job and I can have two taskmanagers in Flink cluster, but how can I run it locally in IDE.

当前,我必须为本地所有操作员指定相同的slotSharingGroup名称,以拥有一个插槽.但这并不灵活.

Currently I have to specify the same slotSharingGroup name for all operator locally to have one slot. But it's not flexible.

您如何处理?

推荐答案

这是您正在描述的已知错误.您可以在此处找到相应的JIRA问题.

This is a known bug which you are describing. You can find the corresponding JIRA issue here.

规避此问题的方法是手动设置启动TaskExecutor的任务插槽的数量.您可以通过TaskManagerOptions.NUM_TASK_SLOTS配置选项来完成此操作:

The way to circumvent this problem is to manually set the number of task slots with which the TaskExecutor is started. You can do this via the TaskManagerOptions.NUM_TASK_SLOTS configuration option:

final int parallelism = ...;
final Configuration configuration = new Configuration();
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, configuration);

这篇关于如何增加Flink taskmanager.numberOfTaskSlots以在没有Flink服务器的情况下运行它(在IDE或胖子中)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆