连接Confluent Kafka和InfluxDB时发生NullPointerException [英] NullPointerException when connecting Confluent Kafka and InfluxDB

查看:244
本文介绍了连接Confluent Kafka和InfluxDB时发生NullPointerException的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用Confluent InfluxDB Sink连接器将数据从kafka主题获取到我的InfluxDB中.

I'm trying to use the Confluent InfluxDB Sink Connector to get data from a kafka topic into my InfluxDB.

首先,我使用nifi将数据从日志文件传输到kafka主题,并且效果很好. kafka主题获取数据,如下所示:

Firstly, I transmit data to kafka topic from a log file by using nifi, and it works well. The kafka topic get the data, like below:

  {
    "topic": "testDB5",
    "key": null,
    "value": {
      "timestamp": "2019-03-20 01:24:29,461",
      "measurement": "INFO",
      "thread": "NiFi Web Server-795",
      "class": "org.apache.nifi.web.filter.RequestLogger",
      "message": "Attempting request for (anonymous) 
    },
    "partition": 0,
    "offset": 0
  }

然后,我通过Kafka Connect UI创建InfluxDB接收器连接器,并且出现以下异常:

Then, I create InfluxDB sink connector through the Kafka Connect UI , and I get the following exception:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at io.confluent.influxdb.InfluxDBSinkTask.put(InfluxDBSinkTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
    ... 10 more

但是,如果我使用手动将数据输入到另一个主题testDB1

But if I manually input data to another topic testDB1 by using

./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic testDB1 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"measurement","type":"string"},{"name":"timestamp","type":"string"}]}'

有效,我的influxDB可以获取数据.

It works, my influxDB can get the data.

这是连接配置:

connector.class=io.confluent.influxdb.InfluxDBSinkConnector
influxdb.url=http://myurl
tasks.max=1
topics=testDB5

连接主题testDB1的配置与主题名称相同.

the configuration of connecting topic testDB1 is the same except topics name.

nifi是否有任何问题?但是它可以很好地将数据传输到主题.

Is there any problems in nifi ? But it can transmit data to topic well.

推荐答案

我找到了原因.这是因为在Nifi中,我使用PublishKafka_0_10将数据发布到Kafka主题,但是其版本太低了!

I found the reason. It's because in Nifi, I used PublishKafka_0_10 to publish the data to Kafka topic, but its version is to low!

当我在ksql中进行查询时,它说

When I make a query in ksql, it says that

Input record ConsumerRecord(..data..) has invalid (negative) timestamp.
Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, 
or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.

因此,我将其更改为PublishKafka_1_0,然后重新开始,它可以正常工作!我的influxDB可以获取数据.我无语.

So, I change it to PublishKafka_1_0 , and start again, and it works! My influxDB can get the data. I'm speechless.

感谢Robin Moffatt的回复,这对我很有帮助.

And thanks Robin Moffatt for the reply, its very helpful to me.

这篇关于连接Confluent Kafka和InfluxDB时发生NullPointerException的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆