如何修复不兼容的类型:org.apache.beam.sdk.options.ValueProvider<java.lang.String>无法转换为java.lang.String" [英] How to fix "incompatible types: org.apache.beam.sdk.options.ValueProvider<java.lang.String> cannot be converted to java.lang.String"

查看:0
本文介绍了如何修复不兼容的类型:org.apache.beam.sdk.options.ValueProvider<java.lang.String>无法转换为java.lang.String"的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我按照this link创建了一个模板,该模板构建了一个从KafkaIO读取的光束管道。但我总是遇到"不兼容的类型:org.apache.beam.sdk.options.ValueProvider无法转换为java.lang.String"。导致错误的是行".withBootstrapServers(options.getKafkaServer())"。BEAM版本为2.9.0,以下是我的部分代码。

public interface Options extends PipelineOptions {
    @Description("Kafka server")
    @Required
    ValueProvider<String> getKafkaServer();

    void setKafkaServer(ValueProvider<String> value);

    @Description("Topic to read from")
    @Required
    ValueProvider<String> getInputTopic();

    void setInputTopic(ValueProvider<String> value);

    @Description("Topic to write to")
    @Required
    ValueProvider<String> getOutputTopic();

    void setOutputTopic(ValueProvider<String> value);

    @Description("File path to write to")
    @Required
    ValueProvider<String> getOutput();

    void setOutput(ValueProvider<String> value);
}

public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline p = Pipeline.create(options);

    PCollection<String> processedData = p.apply(KafkaIO.<Long, String>read()
            .withBootstrapServers(options.getKafkaServer())
            .withTopic(options.getInputTopic())
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata() 
    )

以下是我运行代码的方式:

mvn compile exec:java 
-Dexec.mainClass=${MyClass} 
-Pdataflow-runner -Dexec.args=" 
--project=${MyClass} 
--stagingLocation=gs://${MyBucket}/staging 
--tempLocation=gs://${MyBucket}/temp 
--templateLocation=gs://${MyBucket}/templates/${MyClass} 
--runner=DataflowRunner"

推荐答案

要通过ValueProvider访问值,需要使用get方法,然后获取具有其具体类型的值。

例如: 当有选项时:

ValueProvider<String> getKafkaServer();

可以通过以下方式访问:

getKafkaServer().get()这将返回您的字符串对象。

KafkaIo Api似乎需要获取字符串参数,而不是ValueProvider,您必须从ValueProvider包装中提取该值。

这篇关于如何修复不兼容的类型:org.apache.beam.sdk.options.ValueProvider&lt;java.lang.String&gt;无法转换为java.lang.String&quot;的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆