Sending JSON events to Kafka in non-stringified format


Question

I have created a dataframe like the one below, where I used the to_json() method to create a JSON array value.

+-----------------------------------------------------------------------------------------------------------+
|json_data                                                                                                  |
+-----------------------------------------------------------------------------------------------------------+
|{"name":"sensor1","value-array":[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]}|
+-----------------------------------------------------------------------------------------------------------+

I am using the method below to send the dataframe to a Kafka topic. But when I consume the data that has been sent to the topic, I can see that the JSON data got stringified.

Code that pushes the data to Kafka:

outgoingDF.selectExpr("CAST(Key as STRING) as key", "to_json(struct(*)) AS value")
        .write
        .format("kafka")
        .option("topic", "topic_test")
        .option("kafka.bootstrap.servers", "localhost:9093")
        .option("checkpointLocation", checkpointPath)
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("truncate", false)
        .save()

Stringified data received in Kafka:

{
    "name": "sensor1",
    "value-array": "[{\"time\":\"2020-11-27T01:01:00.000Z\",\"sensorvalue\":11.0,\"tag1\":\"tagvalue\"}]"
}

How can we send the data to the Kafka topic so that we don't see stringified JSON as output?

Answer

json_data is of type string, and you are passing json_data to the to_json(struct("*")) function again. Because the column already holds serialized JSON text, to_json encodes it a second time as a string field, which is why the quotes arrive escaped.

Check the value column that is going to Kafka:

df.withColumn("value",to_json(struct($"*"))).show(false)
+-----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
|json_data                                                                                                  |value                                                                                                                                      |
+-----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
|{"name":"sensor1","value-array":[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]}|{"json_data":"{\"name\":\"sensor1\",\"value-array\":[{\"time\":\"2020-11-27T01:01:00.000Z\",\"sensorvalue\":11.0,\"tag1\":\"tagvalue\"}]}"}|
+-----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
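To see why the consumer receives escaped quotes, the sketch below reproduces the value column above in plain Scala, without Spark. DoubleEncodingDemo, jsonString and wrapRow are hypothetical names used only for illustration; they approximate what to_json(struct(*)) produces for a row with a single string column and are not Spark's implementation.

```scala
object DoubleEncodingDemo {
  // Hypothetical helper: write a Scala String as a JSON string literal.
  // Only " and \ are escaped, which is enough for this input.
  def jsonString(s: String): String = {
    val sb = new StringBuilder("\"")
    s.foreach {
      case '"'  => sb.append("\\\"")
      case '\\' => sb.append("\\\\")
      case c    => sb.append(c)
    }
    sb.append('"').toString
  }

  // Mimics to_json(struct(*)) over a row whose only column is a string:
  // the column's text becomes a JSON *string* value, so its quotes get escaped
  def wrapRow(column: String, value: String): String =
    "{" + jsonString(column) + ":" + jsonString(value) + "}"

  def main(args: Array[String]): Unit = {
    // json_data already holds serialized JSON text
    val jsonData =
      """{"name":"sensor1","value-array":[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]}"""
    // Prints the same text as the value column in the table above
    println(wrapRow("json_data", jsonData))
  }
}
```

Each extra serialization of text that is already JSON adds another layer of escaping, so the fix is to serialize the structured data exactly once.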

Try the code below.

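import org.apache.spark.sql.functions.{array, struct, to_json}
import spark.implicits._ // enables the $"column" syntax

df
  // Build value-array as a real array<struct>, not as a JSON string
  .withColumn("value-array", array(struct($"time", $"sensorvalue", $"tag1")))
  // selectExpr only accepts SQL strings, so use select when passing Column expressions;
  // to_json is applied exactly once, to the whole record
  .select($"Key".cast("string").as("key"), to_json(struct($"name", $"value-array")).as("value"))
  .write
  .format("kafka")
  .option("topic", "topic_test")
  .option("kafka.bootstrap.servers", "localhost:9093")
  .option("checkpointLocation", checkpointPath)
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("truncate", false)
  .save()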
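The corrected code keeps value-array as a nested array of structs and calls to_json once on the whole record. The same contrast between the question's output and the intended output can be sketched in plain Scala. The JValue types and the render function below are hypothetical stand-ins for Spark's struct/array column types and to_json; they are not Spark APIs.

```scala
// Hypothetical mini JSON model standing in for Spark struct/array columns
sealed trait JValue
final case class JStr(s: String) extends JValue
final case class JNum(d: Double) extends JValue
final case class JArr(items: List[JValue]) extends JValue
final case class JObj(fields: List[(String, JValue)]) extends JValue

object EncodeOnce {
  // Write a String as a JSON string literal (escapes only " and \)
  private def quote(s: String): String = {
    val sb = new StringBuilder("\"")
    s.foreach {
      case '"'  => sb.append("\\\"")
      case '\\' => sb.append("\\\\")
      case c    => sb.append(c)
    }
    sb.append('"').toString
  }

  // Serialize a JValue tree to compact JSON text (stand-in for to_json)
  def render(v: JValue): String = v match {
    case JStr(s)  => quote(s)
    case JNum(d)  => d.toString
    case JArr(xs) => xs.map(render).mkString("[", ",", "]")
    case JObj(fs) => fs.map { case (k, x) => quote(k) + ":" + render(x) }.mkString("{", ",", "}")
  }

  val reading: JValue = JObj(List(
    "time" -> JStr("2020-11-27T01:01:00.000Z"),
    "sensorvalue" -> JNum(11.0),
    "tag1" -> JStr("tagvalue")))

  // Fix: value-array stays a nested array and the record is serialized once
  val once: String =
    render(JObj(List("name" -> JStr("sensor1"), "value-array" -> JArr(List(reading)))))

  // Problem: value-array was already rendered to text, so it is embedded as a string
  val twice: String =
    render(JObj(List("name" -> JStr("sensor1"), "value-array" -> JStr(render(JArr(List(reading)))))))

  def main(args: Array[String]): Unit = {
    println(once)  // nested array, as the question wants
    println(twice) // escaped string, as the question observed in Kafka
  }
}
```

The design point is the same as in the Spark fix: keep the data structured until the last step and serialize it a single time.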


