Json文件数据转换为kafka主题 [英] Json file data into kafka topic

查看:161
本文介绍了Json文件数据转换为kafka主题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如何使用kafka-console-producer将json文件数据插入kafka主题?每个json数据集都可以存储为消息吗?

How to insert json file data into kafka topic using kafka-console-producer? Can each json data set be stored as a message?

示例-

{
  "id": 1,
  "first_name": "John",
  "last_name": "Lindt",
  "email": "jlindt@gmail.com",
  "gender": "Male",
  "ip_address": "1.2.3.4"
}

如果使用此命令-

cat sampledata.json|kafka-console-producer --broker-list localhost:9092 --topic  stream-test-topic

每行都被当作单独的消息.

Each line is taken as a separate message.

正确的方法是什么?

谢谢!

ps-

Elastic搜索正在读取该主题.样本JSON消息文件-

The topic is being read by Elastic search. Sample json message file -

[{
  "id": 1,
  "first_name": "John",
  "last_name": "Lindt",
  "email": "jlindt@gmail.com",
  "gender": "Male",
  "ip_address": "1.2.3.4"
}, {
  "id": 2,
  "first_name": "Peter",
  "last_name": "Friz",
  "email": "Friz3@gmail.com",
  "gender": "Male",
  "ip_address": "4.5.6.7"
}, {
  "id": 3,
  "first_name": "Dell",
  "last_name": "Chang",
  "email": "Dellc@gmail.com",
  "gender": "Female",
  "ip_address": "8.9.10.11"
}, {
  "id": 4,
  "first_name": "Lolita",
  "last_name": "John",
  "email": "LolitaJ@gmail.com",
  "gender": "Female",
  "ip_address": "12.13.14.15"
}, {
  "id": 5,
  "first_name": "Pele",
  "last_name": "Wang",
  "email": "Pele@gmail.com",
  "gender": "Male",
  "ip_address": "16.17.18.19"
}, {
  "id": 6,
  "first_name": "Rene",
  "last_name": "Charm",
  "email": "Rene3@gmail.com",
  "gender": "Male",
  "ip_address": "20.21.22.23"
}]

推荐答案

从Kafka的角度来看,每个消息都是字节数组.如何处理客户的应用程序取决于客户的应用程序(生产者,消费者等).Kafka Producer,Consumer使用Deserializer,Serializer在业务对象(字符串,POJO)之间来回转换字节数组.

From point of view of Kafka each message is array of bytes. It's up to client's application (producer, consumer, etc), how it treats it. Kafka Producer, Consumer uses Deserializer, Serializer to transform from/to Array of bytes to/from business object (String, POJO)

您所面对的问题是Kafka Console生产者从标准输入中读取消息的方式.默认情况下,它使用 LineMessageReader ,它将每行视为新消息.您可以实现自己的实现,也可以在发送之前将json中的每个换行符转换为其他空白.

Issue with which you are facing is way that Kafka Console producer read message from standard input. Be default it uses LineMessageReader, which treats each line as new message. You can implement your own, or before sending translate each new line character in json to some other white space.

例如,您可以使用以下命令:

For example you can use following command:

jq -rc.sampledata.json |kafka-console-producer --broker-list localhost:9092 --topic stream-test-topic

这篇关于Json文件数据转换为kafka主题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆