Kafka Connect HDFS Sink for JSON format using JsonConverter
Problem description
Produce to / consume from Kafka in JSON. Save to HDFS in JSON using the properties below:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
Producer:
curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \
--data '{"schema": {"type": "boolean", "optional": false, "name": "bool", "version": 2, "doc": "the documentation", "parameters": {"foo": "bar" }}, "payload": true }' "http://localhost:8082/topics/test_hdfs_json"
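For reference, when schemas.enable=true the JsonConverter expects every message body to be this two-field schema/payload envelope. A minimal Python sketch (my own illustration, not connector code) of unpacking the body sent above:

```python
import json

# The envelope the JsonConverter expects when schemas.enable=true:
# a top-level object with a "schema" field and a "payload" field.
body = (
    '{"schema": {"type": "boolean", "optional": false, "name": "bool", '
    '"version": 2, "doc": "the documentation", "parameters": {"foo": "bar"}}, '
    '"payload": true}'
)

record = json.loads(body)
print(record["schema"]["type"])  # boolean
print(record["payload"])         # True
```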
Consumer:
./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-hdfs/quickstart-hdfs.properties
Issue-1:
key.converter.schemas.enable=true
value.converter.schemas.enable=true
Getting this exception:
org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:332)
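The exception comes from the strict envelope check in JsonConverter.toConnectData: with schemas.enable=true, the top-level JSON object must contain exactly the "schema" and "payload" fields. A rough Python sketch of that check (an illustration of the behavior, not the actual Java code):

```python
import json

def to_connect_data(raw: str):
    """Mimic the JsonConverter envelope check (illustration only)."""
    obj = json.loads(raw)
    # With schemas.enable=true, exactly these two fields are required.
    if not isinstance(obj, dict) or set(obj) != {"schema", "payload"}:
        raise ValueError(
            'JsonDeserializer with schemas.enable requires "schema" and '
            '"payload" fields and may not contain additional fields'
        )
    return obj["payload"]

# A bare JSON object (what a producer sends when schemas.enable=false)
# is rejected:
try:
    to_connect_data('{"user": "alice"}')
except ValueError as err:
    print("rejected:", err)

# The enveloped form is accepted:
print(to_connect_data('{"schema": {"type": "boolean"}, "payload": true}'))
```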
Issue-2:
Enabling the above two properties does not throw any error, but no data is written to HDFS.
Any suggestions would be highly appreciated.
Thanks
The converter determines how data from the Kafka topic is translated so it can be interpreted by the connector and written to HDFS. Out of the box, the HDFS connector only supports writing to HDFS in Avro or Parquet format. You can find information on how to extend the format to JSON here. If you make such an extension, I encourage you to contribute it to the connector's open source project.
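For what it's worth, the connector selects its output format via the format.class property in the sink configuration, so a JSON format you implement would be wired in there. A sketch of such a config (the JsonFormat class name below is hypothetical — you would have to write it, since only the Avro and Parquet formats ship with the connector; the other property names follow the connector's quickstart config):

```properties
# quickstart-hdfs.properties (sketch) -- format.class selects how records
# are serialized into HDFS files. com.example.connect.hdfs.json.JsonFormat
# is a hypothetical class you would implement yourself.
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
topics=test_hdfs_json
hdfs.url=hdfs://localhost:9000
format.class=com.example.connect.hdfs.json.JsonFormat
```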