kafka connect - ExtractTopic transformation with hdfs sink connector throws NullPointerException

Problem description

I am using the Confluent HDFS sink connector 5.0.0 with Kafka 2.0.0, and I need to use the ExtractTopic transformation (https://docs.confluent.io/current/connect/transforms/extracttopic.html). My connector works fine, but when I add this transformation I get a NullPointerException, even on a simple data sample with only two attributes.

ERROR Task hive-table-test-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
java.lang.NullPointerException
    at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:352)
    at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748) 

Here is the configuration of the connector:

name=hive-table-test
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=hive_table_test

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=${env.SCHEMA_REGISTRY_URL}
value.converter.schema.registry.url=${env.SCHEMA_REGISTRY_URL}
schema.compatibility=BACKWARD

# HDFS configuration
# Use store.url instead of hdfs.url (deprecated) in later versions; property store.url does not work yet in this version
hdfs.url=${env.HDFS_URL}
hadoop.conf.dir=/etc/hadoop/conf
hadoop.home=/opt/cloudera/parcels/CDH/lib/hadoop
topics.dir=${env.HDFS_TOPICS_DIR}

# Connector configuration
format.class=io.confluent.connect.hdfs.avro.AvroFormat
flush.size=100
rotate.interval.ms=60000

# Hive integration
hive.integration=true
hive.metastore.uris=${env.HIVE_METASTORE_URIS}
hive.conf.dir=/etc/hive/conf
hive.home=/opt/cloudera/parcels/CDH/lib/hive
hive.database=kafka_connect

# Transformations
transforms=InsertMetadata, ExtractTopic
transforms.InsertMetadata.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertMetadata.partition.field=partition
transforms.InsertMetadata.offset.field=offset

transforms.ExtractTopic.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.ExtractTopic.field=name
transforms.ExtractTopic.skip.missing.or.null=true
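For clarity, here is a minimal sketch of what a field-based topic rename like ExtractTopic$Value does conceptually. This is an illustration only, not Confluent's actual source code, and the class name is made up; it just mirrors the config above (field=name, skip.missing.or.null=true):

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

// Illustrative sketch only -- not Confluent's implementation.
// With the config above, the extracted field is "name" and
// skip.missing.or.null=true lets records without it pass through unchanged.
class ExtractTopicSketch {
    SinkRecord apply(SinkRecord record) {
        Struct value = (Struct) record.value();
        Object newTopic = (value == null || value.schema().field("name") == null)
                ? null : value.get("name");
        if (newTopic == null) {
            return record; // skip.missing.or.null=true: leave the record as-is
        }
        // Same key, value and schemas -- only the destination topic changes.
        return record.newRecord(newTopic.toString(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                record.valueSchema(), value, record.timestamp());
    }
}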

I am using Schema Registry, the data is in Avro format, and I am sure the given attribute name is not null. Any suggestions? What I need, basically, is to extract the content of a given field and use it as the topic name.

It happens even on a simple JSON like this, serialized as Avro:

{
   "attr": "tmp",
   "name": "topic1"
}

Answer

The short answer: it happens because you change the topic name in your transformation.

The HDFS connector keeps a separate TopicPartitionWriter for each topic partition. When the SinkTask responsible for processing messages is created, a TopicPartitionWriter is created for each assigned partition in its open(...) method (see the sketch after the next paragraph).

When the task processes SinkRecords, it looks up the TopicPartitionWriter by topic name and partition number and tries to append each record to that writer's buffer. In your case it cannot find a writer for the message: the transformation changed the topic name, and no TopicPartitionWriter was ever created for the new (topic, partition) pair, so the lookup returns null and the write throws the NullPointerException.
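A simplified sketch of that mechanism, with assumed names and shapes rather than the connector's exact source (the real lookup is the DataWriter.write line 352 visible in the stack trace above):

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;

// Illustrative sketch only -- assumed names, not the connector's exact source.
class WriterLookupSketch {
    // Stand-in for the connector's TopicPartitionWriter.
    interface Writer { void buffer(SinkRecord record); }

    private final Map<TopicPartition, Writer> writers = new HashMap<>();

    // Kafka Connect calls open(...) with the assigned partitions,
    // e.g. hive_table_test-0, and a writer is registered for each of them.
    void open(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            writers.put(tp, record -> { /* append to this partition's buffer */ });
        }
    }

    void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            // After ExtractTopic, record.topic() is e.g. "topic1" -- a key that
            // was never registered in open(...), so the lookup returns null:
            TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
            writers.get(tp).buffer(record); // NullPointerException here
        }
    }
}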

The SinkRecords passed to HdfsSinkTask::put(Collection<SinkRecord> records) already have their topic and partition set, so you don't have to apply any transformation that changes them.
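One way to apply that advice to the configuration above (my suggestion, not an official fix) is to drop the ExtractTopic step and keep only the InsertField transform:

transforms=InsertMetadata
transforms.InsertMetadata.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertMetadata.partition.field=partition
transforms.InsertMetadata.offset.field=offset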

I think io.confluent.connect.transforms.ExtractTopic should rather be used with a SourceConnector, where the transformation runs before the record is written to Kafka, so the renamed topic is the one the record actually lands in.
