未能找到主题的负责人; org.apache.kafka.common.utils.Utils.formatAddress上的java.lang.NullPointerException NullPointerException [英] Failed to find leader for topics; java.lang.NullPointerException NullPointerException at org.apache.kafka.common.utils.Utils.formatAddress
问题描述
当我们尝试从启用SSL的Kafka主题流式传输数据时,我们将面临以下error.您能在这个问题上帮助我们吗?
When we are trying to stream the data from SSL enabled Kafka topic we are facing below error . Can you please help us on this issue .
19/11/07 13:26:54 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1573151189884] Added fetcher for partitions ArrayBuffer()
19/11/07 13:26:54 WARN ConsumerFetcherManager$LeaderFinderThread: [spark-streaming-consumer_dvtcbddc101.corp.cox.com-1573151189725-d40a510f-leader-finder-thread], Failed to find leader for Set([inst_monitor_status_test,2], [inst_monitor_status_test,0], [inst_monitor_status_test,1])
java.lang.NullPointerException
at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:408)
at kafka.cluster.Broker.connectionString(Broker.scala:62)
at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Pyspark代码:
Pyspark code :
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import SimpleProducer, KafkaClient
from kafka import KafkaProducer
def handler(message):
records = message.collect()
for record in records:
print(record)
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 10)
zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
counts.pprint()
kvs.foreachRDD(handler)
ssc.start()
ssc.awaitTermination()
火花提交命令:
火花提交:
/usr/hdp/2.6.1.0-129/spark2/bin/spark-submit-打包org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0,org.apache. spark:spark-sql-kafka-0-10_2.11:2.1.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 dsstream2.py主机:2181 inst_monitor_status_test
/usr/hdp/2.6.1.0-129/spark2/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 dsstream2.py host:2181 inst_monitor_status_test
推荐答案
感谢您的投入.我已通过以下方法传递了SSL参数,并按预期正常工作.
Thanks for your inputs . I have passed the SSL parameters in following method and working fine as expected.
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.streaming import StreamingContext
import time
# Spark Streaming context :
spark = SparkSession.builder.appName('PythonStreamingDirectKafkaWordCount').getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 20)
# Kafka Topic Details :
KAFKA_TOPIC_NAME_CONS = "topic_name"
KAFKA_OUTPUT_TOPIC_NAME_CONS = "topic_to_hdfs"
KAFKA_BOOTSTRAP_SERVERS_CONS = 'kafka_server:9093'
# Creating readstream DataFrame :
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
.option("subscribe", KAFKA_TOPIC_NAME_CONS) \
.option("startingOffsets", "earliest") \
.option("kafka.security.protocol","SASL_SSL")\
.option("kafka.client.id" ,"Clinet_id")\
.option("kafka.sasl.kerberos.service.name","kafka")\
.option("kafka.ssl.truststore.location", "/home/path/kafka_trust.jks") \
.option("kafka.ssl.truststore.password", "password_rd") \
.option("kafka.sasl.kerberos.keytab","/home/path.keytab") \
.option("kafka.sasl.kerberos.principal","path") \
.load()
df1 = df.selectExpr( "CAST(value AS STRING)")
# Creating Writestream DataFrame :
df1.writeStream \
.option("path","target_directory") \
.format("csv") \
.option("checkpointLocation","chkpint_directory") \
.outputMode("append") \
.start()
ssc.awaitTermination()
这篇关于未能找到主题的负责人; org.apache.kafka.common.utils.Utils.formatAddress上的java.lang.NullPointerException NullPointerException的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!