Pyspark Structured Streaming Kafka configuration error


Question

I've been using pyspark for Spark Streaming (Spark 2.0.2) with Kafka (0.10.1.0) successfully, but my use case is better suited to Structured Streaming. I've attempted to use the example online: https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html

with code similar to the following:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the Kafka topic as a streaming DataFrame
ds1 = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("subscribe", "topic1") \
    .load()

# Echo each micro-batch to the console
query = ds1 \
    .writeStream \
    .outputMode('append') \
    .format('console') \
    .start()

query.awaitTermination()

However, I always end up with the following error:

: org.apache.kafka.common.config.ConfigException: 
Missing required configuration "partition.assignment.strategy" which has no default value

I also tried adding this to my set of options when creating ds1:

.option("partition.assignment.strategy", "range")

But even explicitly assigning it a value didn't stop the error, nor did any other value (like "roundrobin") that I could find online or in the Kafka documentation.

I also tried this with the "assign" option and got the same error (our Kafka host is set up for assign: each consumer is assigned exactly one partition, and we don't do any rebalancing).
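For reference, here is a minimal sketch of the "assign" variant I tried (the partition mapping below is a placeholder; the "assign" option takes a JSON string mapping topics to partition lists):

# "assign" replaces "subscribe": the consumer reads only the listed partitions.
ds1 = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("assign", '{"topic1": [0]}') \
    .load()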

Any idea what's going on here? The documentation isn't helpful (probably since it's still in an experimental phase). Also, is there any way to do Structured Streaming using KafkaUtils? Or is this the only gateway?

Answer

1. There is a known issue in the Kafka 0.10.1.* client, and you should not use it with Spark because it can return wrong answers due to https://issues.apache.org/jira/browse/KAFKA-4547. Use the 0.10.0.1 client instead; it should work with a 0.10.1.* Kafka cluster.
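If the 0.10.1.* client is coming in transitively, one way to pull in the connector together with its kafka-clients 0.10.0.1 dependency is --packages; the Scala/Spark versions and script name below are assumptions, adjust them to your build:

spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 \
  your_streaming_app.py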

2. To pass a Kafka configuration through to the Kafka consumer client in Structured Streaming, you need to add the kafka. prefix, e.g. .option("kafka.partition.assignment.strategy", "range"). However, you don't need to set kafka.partition.assignment.strategy at all, because it has a default value. My hunch is that you have both Kafka 0.8.* and 0.10.* jars on the classpath and the wrong classes are being loaded.
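A minimal sketch of how a prefixed consumer property is passed (shown only to illustrate the kafka. prefix; as noted above, this particular setting is normally unnecessary):

# Consumer properties reach the Kafka client only with the "kafka." prefix.
ds1 = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("subscribe", "topic1") \
    .option("kafka.partition.assignment.strategy", "range") \
    .load()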

3. Which API from KafkaUtils do you want to use that is missing in Structured Streaming? Spark 2.2.0 has just been released, and with it you can run both batch and streaming queries against Kafka in Structured Streaming. See http://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html for examples.
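For instance, a one-off batch read of a topic looks like this (a minimal sketch using the host and topic placeholders from the question):

# Batch query: read the topic's current contents as a static DataFrame.
df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("subscribe", "topic1") \
    .load()

# key/value arrive as binary; cast to strings before inspecting.
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show()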
