org.apache.kafka.connect.errors.DataException: Invalid JSON for record default value: null


Question

I have a Kafka Avro topic produced with KafkaAvroSerializer.
My standalone worker properties are below.
I am running Kafka Connect with Confluent 4.0.0.

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=<schema_registry_hostname>:8081
value.converter.schema.registry.url=<schema_registry_hostname>:8081
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

When I run the Kafka connector for the HDFS sink in standalone mode, I get this error message:

[2018-06-27 17:47:41,746] ERROR WorkerSinkTask{id=camus-email-service-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.DataException: Invalid JSON for record default value: null
    at io.confluent.connect.avro.AvroData.defaultValueFromAvro(AvroData.java:1640)
    at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1527)
    at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1410)
    at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1290)
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1014)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:88)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:454)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2018-06-27 17:47:41,748] ERROR WorkerSinkTask{id=camus-email-service-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2018-06-27 17:52:19,554] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect).

When I use kafka-avro-console-consumer and pass the schema registry URL, the Kafka messages are deserialized correctly.

i.e.:

/usr/bin/kafka-avro-console-consumer --bootstrap-server <kafka-host>:9092 --topic <KafkaTopicName> --property schema.registry.url=<schema_registry_hostname>:8081
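
One way to locate the offending field is to pull the registered value schema from Schema Registry and look for any field whose "default" is null but whose type is not a union starting with "null". This is a sketch assuming the default TopicNameStrategy subject naming (`<topic>-value`); the placeholders match the ones used in the question:

```shell
# Fetch the latest registered value schema for the topic and inspect
# each field's "type" and "default" entries.
curl -s http://<schema_registry_hostname>:8081/subjects/<KafkaTopicName>-value/versions/latest
```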

Answer

Changing the "subscription" field's datatype to a union type fixed the issue. The AvroConverter was then able to deserialize the messages.
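
The underlying rule comes from the Avro specification: a field's default value must match the first branch of its declared type, so `"default": null` is only valid when the type is a union whose first branch is `"null"`. A minimal before/after sketch of the likely schema change (the exact field declaration is an assumption; only the field name "subscription" appears in the question). The rejected form, which makes `AvroData.defaultValueFromAvro` throw:

```json
{"name": "subscription", "type": "string", "default": null}
```

And the accepted form, with `"null"` as the first union branch:

```json
{"name": "subscription", "type": ["null", "string"], "default": null}
```

Note that after changing the field type you must register a new schema version, and under the default BACKWARD compatibility mode a plain-type-to-union change of this kind is generally accepted by Schema Registry.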

