json文件数据转入kafka主题 [英] Json file data into kafka topic
问题描述
如何使用kafka-console-producer将json文件数据插入kafka topic?每个json数据集可以存储为一条消息吗?
How to insert json file data into kafka topic using kafka-console-producer? Can each json data set be stored as a message?
示例-
{
"id": 1,
"first_name": "John",
"last_name": "Lindt",
"email": "jlindt@gmail.com",
"gender": "Male",
"ip_address": "1.2.3.4"
}
如果你使用这个命令 -
If you use this command -
cat sampledata.json|kafka-console-producer --broker-list localhost:9092 --topic stream-test-topic
每一行都被视为一个单独的消息.
Each line is taken as a separate message.
这样做的正确方法是什么?
What is the right way to do this?
谢谢!
ps-
该主题正在被 Elastic 搜索读取.示例 json 消息文件 -
The topic is being read by Elastic search. Sample json message file -
[{
"id": 1,
"first_name": "John",
"last_name": "Lindt",
"email": "jlindt@gmail.com",
"gender": "Male",
"ip_address": "1.2.3.4"
}, {
"id": 2,
"first_name": "Peter",
"last_name": "Friz",
"email": "Friz3@gmail.com",
"gender": "Male",
"ip_address": "4.5.6.7"
}, {
"id": 3,
"first_name": "Dell",
"last_name": "Chang",
"email": "Dellc@gmail.com",
"gender": "Female",
"ip_address": "8.9.10.11"
}, {
"id": 4,
"first_name": "Lolita",
"last_name": "John",
"email": "LolitaJ@gmail.com",
"gender": "Female",
"ip_address": "12.13.14.15"
}, {
"id": 5,
"first_name": "Pele",
"last_name": "Wang",
"email": "Pele@gmail.com",
"gender": "Male",
"ip_address": "16.17.18.19"
}, {
"id": 6,
"first_name": "Rene",
"last_name": "Charm",
"email": "Rene3@gmail.com",
"gender": "Male",
"ip_address": "20.21.22.23"
}]
推荐答案
如果文件中有 JSON 消息,可以使用以下方式写入 kafka 主题:
If you have JSON messages in the file, you can use following way to write in the kafka topic:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic user-timeline < samplerecords.json
Kafka 生产者使用默认的 LineMessageReader
逐行读取消息.默认键和值序列化器是 StringSerializer
.它不会验证是否存在正确的 json,而是将原始字符串对象视为发布到 kafka 主题.但是如果你想验证你可以在 console-producer 命令中定义下面的配置.
Kafka producers reads the messages line by line using default LineMessageReader
. Default Key and value serializers are StringSerializer
. It will not validate whether there is a proper json or not, instead consider as raw string object as publish to a kafka topic. But if you want to validate you can define below configuration in console-producer command.
key.serializer
value.serializer
示例:
kafka-console-producer --broker-list localhost:9092 --topic testTopic--property value.serializer=custom.class.serialization.JsonSerializer
在消费者方面,您可以采用类似的方法.使用 JsonDeserializer 读取数据.
At the consumer side, you can do the similar approach. Use JsonDeserializer to read the data.
这篇关于json文件数据转入kafka主题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!