Kafka Connect HDFS Sink for JSON format using JsonConverter


Problem Description



Produce to / consume from Kafka in JSON. Save to HDFS in JSON using the properties below:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

Producer:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \
      --data '{"schema": {"type": "boolean", "optional": false, "name": "bool", "version": 2, "doc": "the documentation", "parameters": {"foo": "bar" }}, "payload": true }' "http://localhost:8082/topics/test_hdfs_json"

Consumer:

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-hdfs/quickstart-hdfs.properties
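
For context, the stock quickstart-hdfs.properties that ships with the connector's quickstart looks roughly like the following; the values here are the quickstart defaults rather than anything taken from the question, and topics would have to be changed to test_hdfs_json to match the producer above, with hdfs.url pointing at the actual namenode:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_hdfs_json
hdfs.url=hdfs://localhost:9000
flush.size=3

Note that flush.size controls how many records accumulate before a file is committed to HDFS, which matters when checking whether any data has landed.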

Issue-1:

key.converter.schemas.enable=true

value.converter.schemas.enable=true

Getting this exception:

org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:332)
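
For reference, with schemas.enable=true the JsonConverter requires every message (key and value are checked independently) to be exactly this two-field envelope, with nothing else at the top level:

{"schema": {"type": "boolean", "optional": false}, "payload": true}

Messages produced as bare JSON (the schemas.enable=false style) fail this check with the DataException above.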

Issue-2:

Enabling the above two properties does not throw any error, but no data is written to HDFS.

Any suggestions would be highly appreciated.

Thanks

Solution

The converter refers to how the data from the Kafka topic will be translated for the connector to interpret and write to HDFS. Out of the box, the HDFS connector only supports writing to HDFS in Avro or Parquet. You can find information on how to extend the format to JSON here. If you make such an extension, I encourage you to contribute it to the connector's open source project.
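
To make that concrete: the core of such a JSON extension is a record writer that serializes each SinkRecord as a line of JSON. The sketch below shows only that core, reusing Connect's own JsonConverter for serialization; the class name is hypothetical, and the Format/RecordWriterProvider plumbing the HDFS connector actually requires around it is version-specific and omitted here.

import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;

import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical core of a JSON record writer for the HDFS connector.
public class JsonRecordWriterSketch {

  private final JsonConverter converter = new JsonConverter();
  private final OutputStream out; // e.g. an HDFS output stream for the target file

  public JsonRecordWriterSketch(OutputStream out) {
    this.out = out;
    // Emit bare JSON rather than the schema/payload envelope.
    converter.configure(Collections.singletonMap("schemas.enable", "false"), false);
  }

  public void write(SinkRecord record) throws IOException {
    // fromConnectData turns the Connect value back into JSON bytes.
    byte[] json = converter.fromConnectData(record.topic(), record.valueSchema(), record.value());
    out.write(json);
    out.write('\n'); // newline-delimited JSON, one record per line
  }

  public void close() throws IOException {
    out.close();
  }
}

Reusing the JsonConverter keeps the on-disk JSON consistent with what the converter accepts on the way in.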
