如何使用睡觉接口在没有使用的情况下从Sink(或一般称为RichFunction)获取作业名称? [英] How to get job name from a Sink (or RichFunction in general) w/o using REST API?

查看:37
本文介绍了如何使用睡觉接口在没有使用的情况下从Sink(或一般称为RichFunction)获取作业名称?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如标题所示。虽然getJobIdRuntimeContext中可用,但作业名称不可用。

https://nightlies.apache.org/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html

尝试从配置中获取似乎效果不佳:

  @Override
  public void open(Configuration parameters) throws Exception {
    String jobName = parameters.getString(PipelineOptions.NAME); // this is null
  }

我们如何运行独立示例管道:

  public static void main(String... args) {
    try {
      ParameterTool parameterTool = ParameterTool.fromArgs(args);

      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      // some pipeline setup
      env.execute("This-is-job-name");
    } catch (Exception e) {
      // logging
    }

推荐答案

假设您将作业名称作为参数传递给作业,您希望将其设置如下:

public static void main(String... args) {
    ParameterTool parameterTool = ParameterTool.fromArgs(args);
    final StreamExecutionEnvironment env =
      StreamExecutionEnvironment.getExecutionEnvironment();

    env.getConfig().setGlobalJobParameters(parameters);

然后这应该会起作用

@Override
public void open(Configuration parameters) throws Exception {
    ParameterTool params = (ParameterTool)
      getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
    
    String jobName = params.get(nameOfParameterWithJobName);
}
传递给open的配置始终为空--这是一种不再使用的过时机制。未更改方法签名以避免破坏公共API。

将此类信息传递给RichFunction的另一个好方法是将其传递给构造函数。

这篇关于如何使用睡觉接口在没有使用的情况下从Sink(或一般称为RichFunction)获取作业名称?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆