How to get rid of NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe error in Spark Streaming + Kafka


Problem Description

I would like to use Spark Streaming and connect it to Kafka. However, I keep getting a NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe error, and I do not know what to try next.

My setup:

- Ubuntu 16.04
- Scala 2.11
- Kafka 2.11-1.0.0 (I have also tried 2.11-0.10.0.0)
- Spark 2.2.1
- Hadoop 2.9.0

I cannot even run the example script:

from pyspark.sql import SparkSession, Row, SQLContext
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import *

sparkSession = SparkSession.builder.appName("example-pyspark-read-and-write").getOrCreate()

# Subscribe to 1 topic and read it as a streaming DataFrame
df = sparkSession\
  .readStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "localhost:9092")\
  .option("kafka.partition.assignment.strategy", "range")\
  .option("subscribe", "test")\
  .option("startingOffsets", "earliest")\
  .load()

# Echo each micro-batch to the console and block until the query stops
query = df.writeStream\
    .format("console")\
    .start()

query.awaitTermination()

I submit it with

spark-submit --master local[2] --jars /home/some_path/spark-sql-kafka-0-10_2.11-2.2.1.jar spark_streaming_kafka_example.py 

And I get the following error:

Exception in thread "stream execution thread for [id = 38ee73d5-4f20-41d0-ac89-a29c3f3255d1, runId = dadfc8ab-8e4c-464f-b4ef-495426aafc88]" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
    at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:63)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:297)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:88)
    at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:244)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:158)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:155)
    at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
    at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:155)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:153)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
    at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan$lzycompute(StreamExecution.scala:153)
    at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan(StreamExecution.scala:147)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:276)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
Traceback (most recent call last):
  File "/home/some_path/spark_streaming_kafka_example.py", line 41, in <module>
    query.awaitTermination()
  File "/usr/local/spark/spark-2.2.1-bin-without-hadoop/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 106, in awaitTermination
  File "/usr/local/spark/spark-2.2.1-bin-without-hadoop/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/local/spark/spark-2.2.1-bin-without-hadoop/python/lib/pyspark.zip/pyspark/sql/utils.py", line 75, in deco
pyspark.sql.utils.StreamingQueryException: u'org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V\n=== Streaming Query ===\nIdentifier: [id = 38ee73d5-4f20-41d0-ac89-a29c3f3255d1, runId = dadfc8ab-8e4c-464f-b4ef-495426aafc88]\nCurrent Committed Offsets: {}\nCurrent Available Offsets: {}\n\nCurrent State: INITIALIZING\nThread State: RUNNABLE'

I have also tried to include spark-streaming-kafka-0-10-assembly_2.11-2.2.1 in --jars (instead of spark-sql-kafka-0-10_2.11-2.2.1.jar), but it didn't help either.

I have also tried putting these two lines in spark-defaults.conf:

spark.driver.extraClassPath /home/some_path/spark-streaming-kafka-0-10-assembly_2.11-2.2.1.jar
spark.executor.extraClassPath /home/some_path/spark-streaming-kafka-0-10-assembly_2.11-2.2.1.jar

OR

spark.driver.extraClassPath /home/some_path/spark-sql-kafka-0-10_2.11-2.2.1.jar
spark.executor.extraClassPath /home/some_path/spark-sql-kafka-0-10_2.11-2.2.1.jar

Recommended Answer

Try running export SPARK_KAFKA_VERSION=0.10 before your spark2-submit command.

Some background on why this works: in Kafka 0.9 the consumer's subscribe method took a java.util.List, and only in 0.10 did it become subscribe(java.util.Collection), which is the signature the spark-sql-kafka-0-10 connector is compiled against. The NoSuchMethodError therefore means an older kafka-clients jar is being loaded ahead of a 0.10+ one. On Cloudera's Spark 2 distribution (which provides spark2-submit), the SPARK_KAFKA_VERSION environment variable selects which bundled Kafka client version is placed on the classpath.
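For example, a minimal sketch (the jar and script paths are the ones from the question; spark2-submit assumes a Cloudera/CDH installation):

export SPARK_KAFKA_VERSION=0.10
spark2-submit --master local[2] --jars /home/some_path/spark-sql-kafka-0-10_2.11-2.2.1.jar spark_streaming_kafka_example.py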

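If you are running plain Apache Spark rather than CDH, SPARK_KAFKA_VERSION has no effect. A common alternative (my suggestion, not part of the original answer) is to let spark-submit resolve the connector together with its matching kafka-clients dependency via --packages, instead of passing the single connector jar through --jars, and to remove the extraClassPath lines from spark-defaults.conf so that no older Kafka jar shadows it:

spark-submit --master local[2] --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.1 spark_streaming_kafka_example.py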

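To confirm which Kafka client the driver actually loads, you can ask the JVM where the KafkaConsumer class comes from. This is a hypothetical diagnostic that relies on pyspark's internal _jvm gateway, so treat it as a debugging aid only; place it right after getOrCreate() in the script above:

# Hypothetical debugging snippet: print the jar that provides KafkaConsumer.
# The printed path should contain a 0.10.x (or newer) kafka-clients jar.
loader = sparkSession._jvm.java.lang.Thread.currentThread().getContextClassLoader()
clazz = loader.loadClass("org.apache.kafka.clients.consumer.KafkaConsumer")
print(clazz.getProtectionDomain().getCodeSource().getLocation().toString())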