Spark Structured Streaming with Confluent Cloud Kafka connectivity issue


Problem description

I am writing a Spark Structured Streaming application in PySpark to read data from Kafka in Confluent Cloud. The documentation for Spark's readStream() function is shallow and doesn't say much about the optional parameters, especially the auth mechanism options. I am not sure which parameter is wrong and breaks the connection. Can anyone with Spark experience help me get this connection working?

Required parameters

Consumer({
    'bootstrap.servers': 'cluster.gcp.confluent.cloud:9092',
    'sasl.username': 'xxx',
    'sasl.password': 'xxx',
    'sasl.mechanisms': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'group.id': 'python_example_group_1',
    'auto.offset.reset': 'earliest'
})
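For context, these settings come from the confluent-kafka Python client, where they do work. A minimal sketch of that client using the same placeholder broker, credentials, and topic (the polling loop is illustrative and not part of the original post):

from confluent_kafka import Consumer

# Same settings as above; the 'xxx' values are placeholders.
consumer = Consumer({
    'bootstrap.servers': 'cluster.gcp.confluent.cloud:9092',
    'sasl.username': 'xxx',
    'sasl.password': 'xxx',
    'sasl.mechanisms': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'group.id': 'python_example_group_1',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['test-topic'])
try:
    while True:
        msg = consumer.poll(1.0)  # wait up to 1s for a message
        if msg is None:
            continue
        if msg.error():
            print('Consumer error:', msg.error())
            continue
        print(msg.value().decode('utf-8'))
finally:
    consumer.close()  # leave the group and commit final offsets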

Here is my PySpark code:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "cluster.gcp.confluent.cloud:9092") \
  .option("subscribe", "test-topic") \
  .option("kafka.sasl.mechanisms", "PLAIN")\
  .option("kafka.security.protocol", "SASL_SSL")\
  .option("kafka.sasl.username","xxx")\
  .option("kafka.sasl.password", "xxx")\
  .option("startingOffsets", "latest")\
  .option("kafka.group.id", "python_example_group_1")\
  .load()
display(df)

However, I keep getting an error:

kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

Databricks notebook - for testing

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/4673082066872014/3543014086288496/1802788104169533/latest.html

Documentation

Answer

We need to specify kafka.sasl.jaas.config to pass the username and password for the Confluent Kafka SASL_SSL auth method. The Java Kafka client that Spark uses does not accept the sasl.username / sasl.password options (those belong to librdkafka-based clients such as confluent-kafka-python); instead, credentials go through the JAAS config. Its value looks a bit odd, but it works.

df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "pkc-43n10.us-central1.gcp.confluent.cloud:9092") \
      .option("subscribe", "wallet_txn_log") \
      .option("startingOffsets", "earliest") \
      .option("kafka.security.protocol", "SASL_SSL") \
      .option("kafka.sasl.mechanism", "PLAIN") \
      .option("kafka.sasl.jaas.config",
              """kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";""") \
      .load()
display(df)
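Two hedged notes beyond the original answer. The kafkashaded. prefix on the login module class is specific to Databricks runtimes, which shade the Kafka client; on a plain Apache Spark distribution the class is org.apache.kafka.common.security.plain.PlainLoginModule, and the Kafka source must be on the classpath (for example via --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<your-spark-version>). Also, display() only exists in Databricks notebooks; outside them, a minimal sketch of draining the stream to the console sink instead (the key and value columns come from Spark's Kafka source schema):

# display() is Databricks-only: decode the Kafka payload and print batches
# to stdout with the console sink instead. Illustrative sketch, not part
# of the original answer.
query = df \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .start()
query.awaitTermination()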
