Kafka JDBC Sink 连接器为具有可选字段的架构的消息提供空指针异常 [英] Kafka JDBC Sink Connector gives a Null Pointer Exception for message with schema having an optional field
问题描述
Kafka JDBC Sink 连接器为具有此处为parentId"的可选字段的架构的消息提供空指针异常.我错过了什么吗?我正在使用开箱即用的 JSONConverter 和 JDBC Sink Connector
Kafka JDBC Sink Connector gives a Null Pointer Exception for a message with schema having an optional field here 'parentId'. Have I missed anything? I am using out of the box JSONConverter and JDBC Sink Connector
关于 Kafka 主题的消息是
A message on Kafka topic is
{
"schema":{
"type":"struct",
"fields":[
{
"field":"id",
"type":"string"
},
{
"field":"type",
"type":"string"
},
{
"field":"eventId",
"type":"string"
},
{
"field":"parentId",
"type":"string",
"optional":true
},
{
"field":"created",
"type":"int64",
"name":"org.apache.kafka.connect.data.Timestamp",
"version":1
}
]
},
"payload":{
"id":"asset-1",
"type":"parcel",
"eventId":"evt-1",
"created":1501834166000
}
}
而连接器具有这些属性
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=admin
topics=asset-topic
tasks.max=1
batch.size=1
auto.evolve=true
connection.user=admin
auto.create=true
connection.url=jdbc:postgresql://postgres-db:5432/fabricdb
value.converter=org.apache.kafka.connect.json.JsonConverter
pk.mode=record_value
pk.fields=id
但是 JDBC Sink 连接器对于可选字段 parentId 失败
But JDBC Sink Connector is failing for the optional field parentId as
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:698)
at org.apache.kafka.connect.json.JsonConverter.access$000(JsonConverter.java:61)
at org.apache.kafka.connect.json.JsonConverter$12.convert(JsonConverter.java:181)
at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:742)
at org.apache.kafka.connect.json.JsonConverter.jsonToConnect(JsonConverter.java:361)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:350)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
推荐答案
根据 JsonConverter
字段值的源代码,标记为 optional
的字段必须在消息负载.
According to source code of JsonConverter
value for fields, that are marked as optional
has to be in message payload.
您可以在JsonConverter
类中找到将jsonValue 转换为
Object`
You can find in JsonConverter
class in method that converts jsonValue to
Object`
private static Object convertToConnect(Schema schema, JsonNode jsonValue) {
final Schema.Type schemaType;
if (schema != null) {
schemaType = schema.type();
if (jsonValue.isNull()) {
if (schema.defaultValue() != null)
return schema.defaultValue(); // any logical type conversions should already have been applied
if (schema.isOptional())
return null;
throw new DataException("Invalid null value for required " + schemaType + " field");
}
}
如果您的案例中存在架构,则总结如下:
Summaries if schema is present in your case it is:
{
"field":"parentId",
"type":"string",
"optional":true
}
值必须在消息有效负载中.它可以是null,但必须是.
Value has to be in the message payload. It can be null, but it has to be.
如果您查看其他站点的代码.代码,负责序列化.它为空引用添加了 NullNode
.
If you look at code from other site. Code, that is responsible for serialization. It adds NullNode
for null reference.
private static JsonNode convertToJson(Schema schema, Object logicalValue) {
if (logicalValue == null) {
if (schema == null) // Any schema is valid and we don't have a default, so treat this as an optional schema
return null;
if (schema.defaultValue() != null)
return convertToJson(schema, schema.defaultValue());
if (schema.isOptional())
return JsonNodeFactory.instance.nullNode();
throw new DataException("Conversion error: null value for field that is required and has no default value");
}
这篇关于Kafka JDBC Sink 连接器为具有可选字段的架构的消息提供空指针异常的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!