Failed to find leader for topics; java.lang.NullPointerException at org.apache.kafka.common.utils.Utils.formatAddress


Problem Description

When we try to stream data from an SSL-enabled Kafka topic, we hit the error below. Can you please help us with this issue?

19/11/07 13:26:54 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1573151189884] Added fetcher for partitions ArrayBuffer()
19/11/07 13:26:54 WARN ConsumerFetcherManager$LeaderFinderThread: [spark-streaming-consumer_dvtcbddc101.corp.cox.com-1573151189725-d40a510f-leader-finder-thread], Failed to find leader for Set([inst_monitor_status_test,2], [inst_monitor_status_test,0], [inst_monitor_status_test,1])
java.lang.NullPointerException
        at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:408)
        at kafka.cluster.Broker.connectionString(Broker.scala:62)
        at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
        at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
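
For context: the receiver-based consumer used here discovers brokers through ZooKeeper, and a broker that exposes only SSL/SASL listeners registers itself with a null legacy `host` field, which is what `Utils.formatAddress` trips over in the stack trace above. One way to confirm this (the kafka-broker path and broker id below are assumptions to adapt) is to inspect the broker registration in ZooKeeper:

# Sketch only: the install path and broker id 1001 are placeholders for your cluster
/usr/hdp/current/kafka-broker/bin/zookeeper-shell.sh host:2181 get /brokers/ids/1001

# An SSL-only broker registers roughly like this; note "host":null and "port":-1,
# which the old 0.8-style consumer then tries to format into an address:
# {"endpoints":["SSL://kafka_server:9093"],"host":null,"port":-1,"version":3,...}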

PySpark code:

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def handler(message):
    # Collect each micro-batch RDD to the driver and print its records
    records = message.collect()
    for record in records:
        print(record)

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        sys.exit(-1)
    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 10)  # 10-second batch interval

    zkQuorum, topic = sys.argv[1:]
    # Receiver-based stream (spark-streaming-kafka-0-8); brokers are discovered via ZooKeeper
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
    counts.pprint()
    kvs.foreachRDD(handler)

    ssc.start()
    ssc.awaitTermination()

Spark submit command:

/usr/hdp/2.6.1.0-129/spark2/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 dsstream2.py host:2181 inst_monitor_status_test

Recommended Answer

Thanks for your inputs. The receiver-based createStream API from spark-streaming-kafka-0-8 goes through ZooKeeper and has no SSL support, so I passed the SSL parameters through the Structured Streaming Kafka source instead, and it is working fine as expected.

from pyspark.sql import SparkSession

#  Spark session (Structured Streaming; no DStream StreamingContext is needed):

spark = SparkSession.builder.appName('PythonStreamingDirectKafkaWordCount').getOrCreate()

#  Kafka topic details (server names, paths, and credentials are placeholders):

KAFKA_TOPIC_NAME_CONS = "topic_name"
KAFKA_OUTPUT_TOPIC_NAME_CONS = "topic_to_hdfs"
KAFKA_BOOTSTRAP_SERVERS_CONS = 'kafka_server:9093'

#  Creating the readStream DataFrame; options with the "kafka." prefix are
#  passed straight through to the underlying Kafka consumer:

df = spark.readStream \
     .format("kafka") \
     .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
     .option("subscribe", KAFKA_TOPIC_NAME_CONS) \
     .option("startingOffsets", "earliest") \
     .option("kafka.security.protocol", "SASL_SSL") \
     .option("kafka.client.id", "Clinet_id") \
     .option("kafka.sasl.kerberos.service.name", "kafka") \
     .option("kafka.ssl.truststore.location", "/home/path/kafka_trust.jks") \
     .option("kafka.ssl.truststore.password", "password_rd") \
     .option("kafka.sasl.kerberos.keytab", "/home/path.keytab") \
     .option("kafka.sasl.kerberos.principal", "path") \
     .load()

df1 = df.selectExpr("CAST(value AS STRING)")

#  Creating the writeStream query and blocking until it terminates:

query = df1.writeStream \
    .option("path", "target_directory") \
    .format("csv") \
    .option("checkpointLocation", "chkpint_directory") \
    .outputMode("append") \
    .start()

query.awaitTermination()
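
For reference, a matching submit command is sketched below; it only needs the Structured Streaming Kafka package, not the 0-8 one. The package version, the jaas.conf, and the keytab distribution are assumptions that depend on how Kerberos is wired on the cluster:

# Sketch only: package version, jaas.conf, and keytab paths are assumptions to adapt
/usr/hdp/2.6.1.0-129/spark2/bin/spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 \
  --files kafka_jaas.conf,/home/path.keytab \
  --driver-java-options "-Djava.security.auth.login.config=kafka_jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_jaas.conf" \
  dsstream2.py

If the cluster already injects a Kerberos JAAS configuration for Spark (as HDP installations often do), the --files and java.security.auth.login.config parts may be unnecessary.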
