KafkaSpout tuple replay throws null pointer exception

Problem description

I am using Storm 1.0.1 and Kafka 0.10.0.0 with storm-kafka-client 1.0.3.

My spout configuration code is below.

kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.ByteArrayDeserializer");

KafkaSpoutStreams kafkaSpoutStreams = new KafkaSpoutStreamsNamedTopics.Builder(new Fields(fieldNames), topics)
        .build();

KafkaSpoutRetryService retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
        TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));

KafkaSpoutTuplesBuilder tuplesBuilder = new KafkaSpoutTuplesBuilderNamedTopics.Builder(new TestTupleBuilder(topics))
        .build();

KafkaSpoutConfig kafkaSpoutConfig = new KafkaSpoutConfig.Builder<String, String>(kafkaConsumerProps, kafkaSpoutStreams, tuplesBuilder, retryService)
        .setOffsetCommitPeriodMs(10_000)
        .setFirstPollOffsetStrategy(LATEST)
        .setMaxRetries(5)
        .setMaxUncommittedOffsets(250)
        .build();

When I fail a tuple, it is not replayed. Instead, the spout throws the error below. Why is it throwing a NullPointerException?

53501 [Thread-359-test-spout-executor[295 295]] ERROR o.a.s.util - Async loop died!
java.lang.NullPointerException
    at org.apache.storm.kafka.spout.KafkaSpout.doSeekRetriableTopicPartitions(KafkaSpout.java:260) ~[storm-kafka-client-1.0.3.jar:1.0.3]
    at org.apache.storm.kafka.spout.KafkaSpout.pollKafkaBroker(KafkaSpout.java:248) ~[storm-kafka-client-1.0.3.jar:1.0.3]
    at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:203) ~[storm-kafka-client-1.0.3.jar:1.0.3]
    at org.apache.storm.daemon.executor$fn__7885$fn__7900$fn__7931.invoke(executor.clj:645) ~[storm-core-1.0.1.jar:1.0.1]
    at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484) [storm-core-1.0.1.jar:1.0.1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.8.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_102]
53501 [Thread-359-test-spout-executor[295 295]] ERROR o.a.s.d.executor - 
java.lang.NullPointerException
    at org.apache.storm.kafka.spout.KafkaSpout.doSeekRetriableTopicPartitions(KafkaSpout.java:260) ~[storm-kafka-client-1.0.3.jar:1.0.3]
    at org.apache.storm.kafka.spout.KafkaSpout.pollKafkaBroker(KafkaSpout.java:248) ~[storm-kafka-client-1.0.3.jar:1.0.3]
    at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:203) ~[storm-kafka-client-1.0.3.jar:1.0.3]
    at org.apache.storm.daemon.executor$fn__7885$fn__7900$fn__7931.invoke(executor.clj:645) ~[storm-core-1.0.1.jar:1.0.1]
    at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484) [storm-core-1.0.1.jar:1.0.1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.8.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_102]
53527 [Thread-359-test-spout-executor[295 295]] ERROR o.a.s.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.0.1.jar:1.0.1]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.8.0.jar:?]
    at org.apache.storm.daemon.worker$fn__8554$fn__8555.invoke(worker.clj:761) [storm-core-1.0.1.jar:1.0.1]
    at org.apache.storm.daemon.executor$mk_executor_data$fn__7773$fn__7774.invoke(executor.clj:271) [storm-core-1.0.1.jar:1.0.1]
    at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:494) [storm-core-1.0.1.jar:1.0.1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.8.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_102]

The complete set of consumer properties passed to the spout is below:

{key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer, value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer, group.id=test-group, ssl.keystore.location=C:/test.jks, bootstrap.servers=localhost:1000, auto.commit.interval.ms=1000, security.protocol=SSL, enable.auto.commit=true, ssl.truststore.location=C:/test1.jks, ssl.keystore.password=pass123, ssl.key.password=pass123, ssl.truststore.password=pass123, session.timeout.ms=30000, auto.offset.reset=latest}

Recommended answer

I had enable.auto.commit=true in the consumer properties. Setting it to false resolved the issue for me.
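
A minimal sketch of that change, using only java.util so it runs without Storm or Kafka on the classpath. The class name ConsumerPropsSketch and the helper withAutoCommitDisabled are hypothetical names for illustration. "enable.auto.commit" is the standard Kafka consumer property name; passing the value as the string "false" is an assumption that mirrors the string values in the properties dump above. A plausible reading of the stack trace, not confirmed by the source, is that with auto-commit on the spout keeps no per-partition offset state of its own, so the retry path has nothing to seek from.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class, not part of storm-kafka-client.
class ConsumerPropsSketch {

    /** Returns a copy of the given consumer properties with Kafka auto-commit turned off. */
    static Map<String, Object> withAutoCommitDisabled(Map<String, Object> props) {
        Map<String, Object> copy = new HashMap<>(props);
        // Set the flag explicitly rather than removing the key, so the
        // intended value does not depend on any default.
        // Assumption: the value is supplied as a string, as in the question's config.
        copy.put("enable.auto.commit", "false");
        return copy;
    }

    public static void main(String[] args) {
        // Subset of the properties from the question, for illustration only.
        Map<String, Object> kafkaConsumerProps = new HashMap<>();
        kafkaConsumerProps.put("group.id", "test-group");
        kafkaConsumerProps.put("enable.auto.commit", "true");

        Map<String, Object> fixed = withAutoCommitDisabled(kafkaConsumerProps);
        System.out.println(fixed.get("enable.auto.commit")); // prints "false"
    }
}
```

The resulting map would then be passed to KafkaSpoutConfig.Builder in place of the original kafkaConsumerProps, leaving the rest of the spout configuration unchanged.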
