Can't read from Kafka with KafkaIO in Beam

Question

I have written a very simple pipeline in Apache Beam, shown below, to read data from my Kafka cluster on Confluent Cloud:

        Pipeline pipeline = Pipeline.create(options);

        Map<String, Object> propertyBuilder = new HashMap<>();
        propertyBuilder.put("ssl.endpoint.identification.algorithm", "https");
        propertyBuilder.put("sasl.mechanism","PLAIN");
        propertyBuilder.put("request.timeout.ms","20000");
        propertyBuilder.put("retry.backoff.ms","500");

        pipeline
            .apply(KafkaIO.<byte[], byte[]>readBytes()
               .withBootstrapServers("pkc-epgnk.us-central1.gcp.confluent.cloud:9092")
               .withTopic("gcp-ingestion-1")  
               .withKeyDeserializer(ByteArrayDeserializer.class)
               .withValueDeserializer(ByteArrayDeserializer.class)
               .updateConsumerProperties(propertyBuilder)             
               .withoutMetadata() // PCollection<KV<byte[], byte[]>>
            ) .apply(Values.<byte[]>create());

However, I get an exception when I run the code above to read data from my Kafka cluster.

I am running this on the direct Java runner, using Beam 2.8.

I can otherwise consume from and produce messages to my Confluent Kafka cluster, just not with the code above.

Recommended answer

If you follow the stack trace, it appears that the code tries to cast the timeout configuration property to Integer: https://github.com/apache/beam/blob/2e759fecf63d62d110f29265f9438128e3bdc8ab/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L112

Instead, it gets a String. My guess is that this is because you set it as a String here: propertyBuilder.put("request.timeout.ms","20000"). I assume the correct fix is to set it as an Integer, e.g. propertyBuilder.put("request.timeout.ms", 20000) (no quotes around the timeout value).

You may have similar issues with other configuration properties (e.g. the retry backoff), so double-check the type each property expects.
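The type mismatch described in this answer can be reproduced without Beam or Kafka on the classpath. The following is a minimal sketch, not Beam's actual code: the class KafkaConsumerPropsSketch and the method readTimeoutMs are hypothetical stand-ins that mimic a reader casting the configured value straight to Integer. The Integer value for retry.backoff.ms is an assumption based on the answer's suggestion; check the type that setting actually expects.

```java
import java.util.HashMap;
import java.util.Map;

final class KafkaConsumerPropsSketch {

    // Consumer overrides from the question, with the numeric settings
    // stored as Integer values instead of quoted strings.
    static Map<String, Object> typedProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put("ssl.endpoint.identification.algorithm", "https");
        props.put("sasl.mechanism", "PLAIN");
        props.put("request.timeout.ms", 20000); // Integer, not "20000"
        props.put("retry.backoff.ms", 500);     // assumption: verify the expected type
        return props;
    }

    // Hypothetical stand-in for a reader that casts the raw config value
    // directly to Integer. A String value here throws ClassCastException.
    static int readTimeoutMs(Map<String, Object> props) {
        return (Integer) props.get("request.timeout.ms");
    }

    public static void main(String[] args) {
        // Works: the value is an Integer.
        System.out.println(readTimeoutMs(typedProperties()));

        // Fails: same key, but the value is a String as in the question.
        Map<String, Object> stringTyped = new HashMap<>(typedProperties());
        stringTyped.put("request.timeout.ms", "20000");
        try {
            readTimeoutMs(stringTyped);
        } catch (ClassCastException e) {
            System.out.println("String value rejected: ClassCastException");
        }
    }
}
```

In the pipeline from the question, the map returned by typedProperties() would take the place of propertyBuilder in the updateConsumerProperties(...) call.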
