Pyspark Structured Streaming Kafka configuration error


Question

I've been using pyspark for Spark Streaming (Spark 2.0.2) with Kafka (0.10.1.0) successfully before, but my purposes are better suited for Structured Streaming. I've attempted to use the example online: https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html

With code similar to the following:

# `spark` is the SparkSession (provided automatically in the pyspark shell)
ds1 = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topic1")
    .load())

# Print each micro-batch to the console
query = (ds1
    .writeStream
    .outputMode("append")
    .format("console")
    .start())

query.awaitTermination()

However, I always end up with the following error:

: org.apache.kafka.common.config.ConfigException: 
Missing required configuration "partition.assignment.strategy" which has no default value

I also tried adding this to my set of options when creating ds1:

.option("partition.assignment.strategy", "range")

But even explicitly assigning it a value didn't stop the error, nor did any other value (like "roundrobin") that I could find online or in the Kafka documentation.

I also tried this with the "assign" option and got the same error (our Kafka host is set up for assign: each consumer is assigned exactly one partition, and we don't do any rebalancing). The attempt looked roughly like the sketch below.
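
For reference, a sketch of the "assign" variant; in the Structured Streaming Kafka source, "assign" takes a JSON string mapping each topic to an explicit list of partitions (the topic name and partition id here are placeholders):

# "assign" pins the consumer to explicit partitions instead of subscribing;
# topic name and partition id are placeholders.
ds1 = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("assign", '{"topic1": [0]}')
    .load())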

Any idea what's going on here? The documentation isn't helpful (probably because it's still experimental). Also, is there any way to do Structured Streaming using KafkaUtils? Or is this the only gateway?

Answer

1. There is a known issue in the Kafka 0.10.1.* client, and you should not use it with Spark because it may return wrong answers due to https://issues.apache.org/jira/browse/KAFKA-4547 . You can use the 0.10.0.1 client instead; it works with a 0.10.1.* Kafka cluster. One way to get it onto the classpath is sketched below.
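
One way to keep the correct client on the classpath is to let Spark resolve the official connector artifact rather than adding kafka-clients jars by hand, e.g. by passing --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 to spark-submit, or equivalently from code. A minimal sketch follows; the artifact version should match your Spark version, and as of Spark 2.1/2.2 it pulls in the 0.10.0.1 client:

from pyspark.sql import SparkSession

# Let Spark resolve the Kafka source and its bundled Kafka client.
# spark.jars.packages must be set before the session (and JVM) starts.
spark = (SparkSession.builder
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0")
    .getOrCreate())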

2. To pass a Kafka configuration through to the underlying Kafka consumer in Structured Streaming, you need to add the kafka. prefix, e.g. .option("kafka.partition.assignment.strategy", "range"). However, you shouldn't need to set kafka.partition.assignment.strategy at all, because it has a default value. My hunch is that you have both Kafka 0.8.* and 0.10.* jars on the classpath and the wrong classes are being loaded; see the sketch below for what an explicit override would look like.
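
If you really did want to override the assignor, a sketch follows. Note that, as far as I know, the 0.10 client expects a fully qualified PartitionAssignor class name as the value, not a short name like "range"; brokers and topic are placeholders:

# Consumer properties are passed through with the "kafka." prefix.
# The 0.10 client expects an assignor class name as the value.
ds1 = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topic1")
    .option("kafka.partition.assignment.strategy",
            "org.apache.kafka.clients.consumer.RangeAssignor")
    .load())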

3. Which API in KafkaUtils do you want to use that is missing in Structured Streaming? Spark 2.2.0 has just been released, and you can run both batch and streaming queries against Kafka in Structured Streaming. Read http://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html for examples; a batch-read sketch follows.
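
For instance, a minimal sketch of a batch (non-streaming) read from Kafka, available since Spark 2.2.0; the broker, topic, and offset bounds are placeholders:

# Batch read: spark.read instead of spark.readStream (Spark 2.2.0+).
df = (spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1")
    .option("subscribe", "topic1")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load())

# key and value arrive as binary; cast them to strings to inspect.
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show()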
