JSON file data into Kafka topic

Question

How do I insert data from a JSON file into a Kafka topic using kafka-console-producer? Can each JSON data set be stored as one message?

Example -

{
  "id": 1,
  "first_name": "John",
  "last_name": "Lindt",
  "email": "jlindt@gmail.com",
  "gender": "Male",
  "ip_address": "1.2.3.4"
}

If you use this command -

cat sampledata.json | kafka-console-producer --broker-list localhost:9092 --topic stream-test-topic

Each line is taken as a separate message.

What is the right way to do this?

Thanks!

PS -

The topic is being read by Elasticsearch. Sample JSON message file -

[{
  "id": 1,
  "first_name": "John",
  "last_name": "Lindt",
  "email": "jlindt@gmail.com",
  "gender": "Male",
  "ip_address": "1.2.3.4"
}, {
  "id": 2,
  "first_name": "Peter",
  "last_name": "Friz",
  "email": "Friz3@gmail.com",
  "gender": "Male",
  "ip_address": "4.5.6.7"
}, {
  "id": 3,
  "first_name": "Dell",
  "last_name": "Chang",
  "email": "Dellc@gmail.com",
  "gender": "Female",
  "ip_address": "8.9.10.11"
}, {
  "id": 4,
  "first_name": "Lolita",
  "last_name": "John",
  "email": "LolitaJ@gmail.com",
  "gender": "Female",
  "ip_address": "12.13.14.15"
}, {
  "id": 5,
  "first_name": "Pele",
  "last_name": "Wang",
  "email": "Pele@gmail.com",
  "gender": "Male",
  "ip_address": "16.17.18.19"
}, {
  "id": 6,
  "first_name": "Rene",
  "last_name": "Charm",
  "email": "Rene3@gmail.com",
  "gender": "Male",
  "ip_address": "20.21.22.23"
}]

Answer

If you have JSON messages in a file, you can write them to the Kafka topic as follows:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic user-timeline < samplerecords.json
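
Note that this yields one Kafka message per record only when the file already has one JSON object per line. If the file is a single JSON array, like the sample in the question, one way is to flatten it to one compact object per line first. A minimal sketch, assuming jq is installed (jq is not part of the original answer):

# emit each array element as single-line JSON, so each record becomes one Kafka message
jq -c '.[]' samplerecords.json | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic user-timeline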

The Kafka console producer reads messages line by line using the default LineMessageReader. The default key and value serializers are StringSerializer, so it does not validate whether each line is proper JSON; it simply publishes the raw string to the Kafka topic. If you want validation, you can set the following properties in the console-producer command:

key.serializer
value.serializer

Example:

kafka-console-producer --broker-list localhost:9092 --topic testTopic --property value.serializer=custom.class.serialization.JsonSerializer

On the consumer side, you can take a similar approach: use a JsonDeserializer to read the data.
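
To spot-check what actually landed on the topic, you can read it back with the console consumer; a quick example, assuming a broker at localhost:9092 and the topic name from the question:

# print every message on the topic from the beginning
kafka-console-consumer --bootstrap-server localhost:9092 --topic stream-test-topic --from-beginning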
