Write to ConfluentCloud from Apache Beam (GCP Dataflow)


Problem description

I am trying to write to Confluent Cloud/Kafka from Dataflow (Apache Beam), using the following:

kafkaKnowledgeGraphKVRecords.apply("Write to Kafka", KafkaIO.<String, String>write()
                                .withBootstrapServers("<mybootstrapserver>.confluent.cloud:9092")
                                .withTopic("testtopic").withKeySerializer(StringSerializer.class)
                                .withProducerConfigUpdates(props).withValueSerializer(StringSerializer.class));

where Map<String, Object> props = new HashMap<>(); (i.e. empty for now)

In the logs, I get: send failed : 'Topic testtopic not present in metadata after 60000 ms.'

The topic does exist on this cluster - so my guess is that there is an issue with login, which makes sense as I couldn't find a way to pass the APIKey.

I did try various combinations of passing the API key/secret I have from Confluent Cloud to authenticate via the props above, but I couldn't find a working setup.

Accepted answer

Found a solution, thanks to the pointers in @RobinMoffatt's comments below the question.

This is my setup now:

Map<String, Object> props = new HashMap<>();

props.put("ssl.endpoint.identification.algorithm", "https");
props.put("sasl.mechanism", "PLAIN");
props.put("request.timeout.ms", 20000);
props.put("retry.backoff.ms", 500);
props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<APIKEY>\" password=\"<SECRET>\";");
props.put("security.protocol", "SASL_SSL");

kafkaKnowledgeGraphKVRecords.apply("Write to Kafka-TESTTOPIC", KafkaIO.<String, String>write()
    .withBootstrapServers("<CLUSTER>.confluent.cloud:9092")
    .withTopic("test").withKeySerializer(StringSerializer.class)
    .withProducerConfigUpdates(props).withValueSerializer(StringSerializer.class));

The key line I had wrong was sasl.jaas.config (note the ; at the end!)
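Since the trailing ; in the JAAS string is easy to forget, one way to avoid the mistake is to build it with String.format instead of writing the literal by hand. A minimal sketch (the plainJaas helper is my own naming, not part of the original answer or any library):

```java
import java.util.HashMap;
import java.util.Map;

public class JaasConfigBuilder {

    // Builds the SASL/PLAIN JAAS config string for Confluent Cloud,
    // always including the trailing ';' that PlainLoginModule requires.
    static String plainJaas(String apiKey, String secret) {
        return String.format(
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"%s\" password=\"%s\";",
            apiKey, secret);
    }

    public static void main(String[] args) {
        // Same producer config as in the answer above, with the JAAS
        // line generated rather than hand-written.
        Map<String, Object> props = new HashMap<>();
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("ssl.endpoint.identification.algorithm", "https");
        props.put("sasl.jaas.config", plainJaas("<APIKEY>", "<SECRET>"));
        System.out.println(props.get("sasl.jaas.config"));
    }
}
```

The resulting map can then be passed to KafkaIO via withProducerConfigUpdates(props) exactly as in the snippet above.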

